Changeset 2b2e9b8 in nscp
- Timestamp:
- 09/18/11 21:56:36 (20 months ago)
- Branches:
- master, 0.4.0, 0.4.1, 0.4.2
- Children:
- a629015
- Parents:
- 76d1076
- Files:
-
- 4 added
- 27 edited
-
CMakeLists.txt (modified) (1 diff)
-
changelog (modified) (1 diff)
-
include/client/command_line_parser.cpp (modified) (1 diff)
-
include/nscapi/nscapi_plugin_wrapper.hpp (modified) (1 diff)
-
include/nscp/client/socket.hpp (modified) (2 diffs)
-
include/nscp/handler.cpp (modified) (4 diffs)
-
include/nscp/handler.hpp (modified) (1 diff)
-
include/nscp/packet.hpp (modified) (19 diffs)
-
include/nscp/server/connection.cpp (modified) (1 diff)
-
include/nscp/server/connection.hpp (modified) (1 diff)
-
include/nscp/server/handler.hpp (modified) (1 diff)
-
include/nscp/server/parser.hpp (modified) (1 diff)
-
include/strEx.h (modified) (3 diffs)
-
include/zeromq/client.hpp (added)
-
include/zeromq/interface.hpp (added)
-
include/zmsg.hpp (modified) (9 diffs)
-
libs/protobuf/CMakeLists.txt (modified) (1 diff)
-
libs/protobuf/ipc.proto (modified) (3 diffs)
-
modules/DistributedServer/CMakeLists.txt (modified) (4 diffs)
-
modules/DistributedServer/DistributedServer.cpp (modified) (3 diffs)
-
modules/DistributedServer/DistributedServer.def (modified) (1 diff)
-
modules/DistributedServer/DistributedServer.h (modified) (3 diffs)
-
modules/DistributedServer/handler_impl.cpp (modified) (2 diffs)
-
modules/DistributedServer/handler_impl.hpp (modified) (1 diff)
-
modules/DistributedServer/module.cmake (modified) (1 diff)
-
modules/DistributedServer/queue_manager.hpp (added)
-
modules/DistributedServer/worker_manager.hpp (added)
-
modules/NRPEClient/CMakeLists.txt (modified) (1 diff)
-
modules/NRPEServer/CMakeLists.txt (modified) (1 diff)
-
modules/NSCPClient/NSCPClient.cpp (modified) (1 diff)
-
modules/NSCPClient/NSCPClient.h (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
-
CMakeLists.txt
r7515d00 r2b2e9b8 216 216 if(OPENSSL_FOUND) 217 217 MESSAGE(STATUS "Found OpenSSL: ${OPENSSL_INCLUDE_DIR}") 218 SET(NSCP_GLOBAL_DEFINES ${NSCP_GLOBAL_DEFINES} -DUSE_SSL)219 218 else(OPENSSL_FOUND) 220 219 MESSAGE(WARNING " OpenSSL NOT found (no ssl support, ${OPENSSL_INCLUDE_DIR})") -
changelog
r7515d00 r2b2e9b8 6 6 * Fix RtlStringFromGUID problem on NT4 7 7 8 2011-09-18 MickeM 9 * Implementd first version of DistributedClient and DistributedSServer which work so now we have a propper messager based transport. 10 Still a lot of rough edges such as cookie and authentication support is hardcoded. 11 We also need a proper two way distributed server as well as implement "all" payload types. 8 12 9 13 2011-09-11 MickeM -
include/client/command_line_parser.cpp
r7515d00 r2b2e9b8 162 162 std::string buffer, reply; 163 163 nscapi::functions::create_simple_query_request(config.data->command, config.data->arguments, buffer); 164 int ret = config.handler-> exec(config.data, buffer, reply);164 int ret = config.handler->query(config.data, buffer, reply); 165 165 nscapi::functions::parse_simple_query_response(reply, msg, perf); 166 166 return ret; -
include/nscapi/nscapi_plugin_wrapper.hpp
ra14aa07 r2b2e9b8 115 115 } 116 116 void register_command(std::wstring command, std::wstring description) { 117 get_core()->registerCommand(get_id(), _T("submit_nscp"), _T("Submit a query to a remote host via NSCP"));117 get_core()->registerCommand(get_id(), command, description); 118 118 } 119 119 -
include/nscp/client/socket.hpp
r7515d00 r2b2e9b8 78 78 while (left > 0) { 79 79 nscp::packet chunk; 80 std::vector<char> buf( sizeof(nscp::data::signature_packet));80 std::vector<char> buf(nscp::length::get_signature_size()); 81 81 if (!reader->read_and_wait(*socket_, get_socket(), boost::asio::buffer(buf))) { 82 82 get_socket().close(); … … 86 86 chunk.read_signature(buf); 87 87 std::wcout << _T("<<<") << chunk.signature.to_wstring() << std::endl; 88 buf.resize( chunk.signature.payload_length);88 buf.resize(nscp::length::get_payload_size(chunk.signature)); 89 89 90 90 if (!reader->read_and_wait(*socket_, get_socket(), boost::asio::buffer(buf))) { -
include/nscp/handler.cpp
r7515d00 r2b2e9b8 1 1 #include <nscp/handler.hpp> 2 3 2 #include <nscapi/functions.hpp> 4 5 3 6 4 std::list<nscp::packet> nscp::handler::process_all(const std::list<nscp::packet> &packets) { 7 5 std::list<nscp::packet> result; 8 6 BOOST_FOREACH(const nscp::packet &packet, packets) { 9 std::list<nscp::packet> tmp = process(packet); 10 result.insert(result.begin(), tmp.begin(), tmp.end()); 7 result.push_back(process(packet)); 11 8 } 12 9 return result; … … 14 11 15 12 16 std::list<nscp::packet> nscp::handler::process(const nscp::packet &packet) { 17 std::list<nscp::packet> result; 13 nscp::packet nscp::handler::process(const nscp::packet &packet) { 18 14 Plugin::Common::Header hdr; 19 20 15 if (nscp::checks::is_query_request(packet)) { 21 16 Plugin::QueryRequestMessage msg; … … 26 21 try { 27 22 NSCAPI::nagiosReturn returncode = handle_query_request(packet.payload, msg, reply); 28 if (returncode == NSCAPI::returnIgnored) {23 if (returncode == NSCAPI::returnIgnored) 29 24 nscapi::functions::create_simple_query_response_unknown(command, _T("Command was not found: ") + command, _T(""), reply); 30 }31 25 } catch (const nscp::nscp_exception &e) { 32 26 nscapi::functions::create_simple_query_response_unknown(command, _T("Processing error: ") + command + _T(": ") + utf8::cvt<std::wstring>(e.what()), _T(""), reply); … … 34 28 nscapi::functions::create_simple_query_response_unknown(command, _T("Unknown error processing: ") + command + _T(": ") + utf8::cvt<std::wstring>(e.what()), _T(""), reply); 35 29 } 30 return nscp::factory::create_query_response(reply); 36 31 } else if (nscp::checks::is_query_response(packet)) { 37 32 // @todo handle submission here 38 33 return nscp::factory::create_error(_T("Submissions currently not supported: ") + packet.to_wstring()); 39 34 } else { 40 35 this->log_error(__FILE__, __LINE__, _T("Unknown packet: ") + packet.to_wstring()); 41 re sult.push_back(nscp::factory::create_error(_T("Unknown packet: ") + packet.to_wstring()));36 return nscp::factory::create_error(_T("Unknown packet: ") + packet.to_wstring()); 42 37 } 43 return result;44 38 } -
include/nscp/handler.hpp
r7515d00 r2b2e9b8 8 8 9 9 struct handler : public nscp::server::server_handler { 10 virtual std::list<nscp::packet>process(const nscp::packet &packet);10 virtual nscp::packet process(const nscp::packet &packet); 11 11 virtual std::list<nscp::packet> process_all(const std::list<nscp::packet> &packet); 12 12 -
include/nscp/packet.hpp
r7515d00 r2b2e9b8 49 49 static const short exec_response = 21; 50 50 51 static const short message_envelope_request = 30; 52 static const short message_envelope_response = 31; 53 51 54 static const int nscp_magic_number = 12345; 52 55 static const short error = 100; … … 54 57 static const short version_1 = 1; 55 58 56 struct signature_packet { 59 struct signature_type; 60 struct tcp_signature_data { 57 61 int16_t version; 58 62 … … 65 69 u_int32_t additional_packet_count; 66 70 u_int32_t magic_number; 67 68 signature_packet() : magic_number(nscp_magic_number) {} 69 signature_packet(const signature_packet &other) 71 tcp_signature_data() : magic_number(nscp_magic_number) {} 72 tcp_signature_data(const tcp_signature_data &other) 70 73 : version(other.version) 71 74 , header_type(other.header_type) … … 76 79 , additional_packet_count(other.additional_packet_count) 77 80 {} 78 const signature_packet& operator=(const signature_packet&other) {81 const tcp_signature_data& operator=(const tcp_signature_data &other) { 79 82 version = other.version; 80 83 header_type = other.header_type; … … 86 89 return *this; 87 90 } 88 89 bool validate() const { 90 return magic_number == nscp_magic_number; 91 const tcp_signature_data& operator=(const signature_type &other) { 92 version = other.version; 93 header_type = other.header_type; 94 header_length = other.header_length; 95 payload_type = other.payload_type; 96 payload_length = other.payload_length; 97 additional_packet_count = other.additional_packet_count; 98 magic_number = other.magic_number; 99 return *this; 91 100 } 92 101 … … 99 108 << _T(", payload: ") << payload_type 100 109 << _T(", ") << payload_length 101 << _T(", count: ") << additional_packet_count ;110 << _T(", count: ") << additional_packet_count; 102 111 return ss.str(); 103 112 } … … 110 119 << ", payload: " << payload_type 111 120 << ", " << payload_length 112 << ", count: " << additional_packet_count ;121 << ", count: " << additional_packet_count; 113 122 return ss.str(); 114 123 } 115 124 }; 125 struct signature_type : public tcp_signature_data { 126 std::string cookie; 127 128 signature_type() {} 129 signature_type(const signature_type &other) : tcp_signature_data(other), cookie(other.cookie) {} 130 signature_type(const tcp_signature_data &other) : tcp_signature_data(other) {} 131 const signature_type& operator=(const signature_type &other) { 132 tcp_signature_data::operator =(other); 133 cookie = other.cookie; 134 return *this; 135 } 136 const signature_type& operator=(const tcp_signature_data &other) { 137 tcp_signature_data::operator =(other); 138 return *this; 139 } 140 141 bool validate() const { 142 return magic_number == nscp_magic_number; 143 } 144 145 std::wstring to_wstring() const { 146 std::wstringstream ss; 147 ss << _T("base: {") << tcp_signature_data::to_wstring() << _T("}"); 148 return ss.str(); 149 } 150 std::string to_string() const { 151 std::stringstream ss; 152 ss << "base: {" << tcp_signature_data::to_string() << "}, cookie: " << cookie ; 153 return ss.str(); 154 } 155 }; 156 157 116 158 }; 117 159 struct length { 118 static unsigned long get_signature_size() {119 return sizeof(data:: signature_packet);120 } 121 static unsigned long get_header_size(const data::signature_packet&signature) {160 static unsigned long long get_signature_size() { 161 return sizeof(data::tcp_signature_data); 162 } 163 static unsigned long long get_header_size(const data::tcp_signature_data &signature) { 122 164 return signature.header_length*sizeof(char); 123 165 } 124 static unsigned long get_payload_size(const data::signature_packet&signature) {166 static unsigned long long get_payload_size(const data::tcp_signature_data &signature) { 125 167 return signature.payload_length*sizeof(char); 126 168 } … … 141 183 142 184 struct packet { 143 nscp::data::signature_ packetsignature;185 nscp::data::signature_type signature; 144 186 std::string header; 145 187 std::string payload; 146 188 147 189 packet() {} 148 packet(const nscp::data:: signature_packet&sig) : signature(sig) {}149 packet(const nscp::data::signature_ packet&sig, std::string header, std::string payload) : signature(sig), header(header), payload(payload) {}190 packet(const nscp::data::tcp_signature_data &sig) : signature(sig) {} 191 packet(const nscp::data::signature_type &sig, std::string header, std::string payload) : signature(sig), header(header), payload(payload) {} 150 192 packet(const packet & other) : signature(other.signature), header(other.header), payload(other.payload) {} 151 193 const packet& operator=(const packet & other) { … … 165 207 return ret; 166 208 } 209 std::string write_signature() const { 210 std::string buffer; 211 write_signature(buffer); 212 return buffer; 213 } 214 std::string write_msg_signature() const { 215 std::string buffer; 216 write_msg_signature(buffer); 217 return buffer; 218 } 219 void write_msg_signature(std::string &buffer) const { 220 NSCPIPC::Signature sig; 221 sig.set_header_type(signature.header_type); 222 sig.set_payload_type(signature.payload_type); 223 sig.set_version(NSCPIPC::Common_Version_VERSION_1); 224 sig.set_cookie(signature.cookie); 225 sig.AppendToString(&buffer); 226 } 167 227 void write_signature(std::string &buffer) const { 168 // @todo: Optimize this away once this is working 169 char * tmpbuffer = new char[length::get_signature_size()+1]; 170 nscp::data::signature_packet *tmp = reinterpret_cast<nscp::data::signature_packet*>(tmpbuffer); 171 *tmp = signature; 172 buffer.append(tmpbuffer, length::get_signature_size()); 173 delete [] tmpbuffer; 228 nscp::data::tcp_signature_data data = signature; 229 buffer.append(reinterpret_cast<char*>(&data), length::get_signature_size()); 230 } 231 std::string write_header() const { 232 std::string buffer; 233 write_header(buffer); 234 return buffer; 174 235 } 175 236 inline void write_header(std::string &buffer) const { … … 177 238 buffer.insert(buffer.end(), header.begin(), header.end()); 178 239 } 240 std::string write_payload() const { 241 std::string buffer; 242 write_payload(buffer); 243 return buffer; 244 } 179 245 inline void write_payload(std::string &buffer) const { 180 246 if (!payload.empty()) … … 182 248 } 183 249 250 bool is_signature_valid() { 251 return signature.magic_number == nscp::data::nscp_magic_number; 252 } 253 184 254 ////////////////////////////////////////////////////////////////////////// 185 255 // Read from vector (string?) 256 257 void read_msg_signature(std::string &string) { 258 NSCPIPC::Signature sig; 259 sig.ParseFromString(string); 260 signature.header_length = 0; 261 signature.payload_length = 0; 262 signature.cookie = sig.cookie(); 263 signature.header_type = sig.header_type(); 264 signature.payload_type = sig.payload_type(); 265 signature.magic_number = nscp::data::nscp_magic_number; 266 if (sig.version() == NSCPIPC::Common_Version_VERSION_1) 267 signature.version = nscp::data::version_1; 268 signature.additional_packet_count = 0; 269 } 270 186 271 void read_signature(std::vector<char> &buf) { 187 272 assert(buf.size() >= nscp::length::get_signature_size()); 188 // @todo: Optimize this away once this is working189 nscp::data::signature_packet *tmp = reinterpret_cast<nscp::data::signature_packet*>(&(*buf.begin()));190 signature = *tmp;191 signature.payload_type = tmp->payload_type;192 signature.payload_length = tmp->payload_length;273 read_signature(reinterpret_cast<nscp::data::tcp_signature_data*>(&(*buf.begin()))); 274 } 275 void read_signature(std::string &string) { 276 assert(string.size() >= nscp::length::get_signature_size()); 277 read_signature(reinterpret_cast<nscp::data::tcp_signature_data*>(&(*string.begin()))); 193 278 } 194 279 void read_signature(std::string::iterator begin, std::string::iterator end) { 195 280 assert(end-begin >= nscp::length::get_signature_size()); 196 // @todo: Optimize this away once this is working 197 nscp::data::signature_packet *tmp = reinterpret_cast<nscp::data::signature_packet*>(&(*begin)); 198 signature = *tmp; 199 signature.payload_type = tmp->payload_type; 200 signature.payload_length = tmp->payload_length; 281 read_signature(reinterpret_cast<nscp::data::tcp_signature_data*>(&(*begin))); 282 } 283 void read_signature(nscp::data::signature_type *sig) { 284 signature = *sig; 285 } 286 void read_signature(nscp::data::tcp_signature_data *sig) { 287 signature = *sig; 201 288 } 202 289 void nibble_signature(std::string &buf) { 203 assert(buf.size() >= nscp::length::get_signature_size()); 204 // @todo: Optimize this away once this is working 205 nscp::data::signature_packet *tmp = reinterpret_cast<nscp::data::signature_packet*>(&(*buf.begin())); 206 signature = *tmp; 207 signature.payload_type = tmp->payload_type; 208 signature.payload_length = tmp->payload_length; 290 read_signature(buf); 209 291 buf.erase(buf.begin(), buf.begin()+nscp::length::get_signature_size()); 292 } 293 void read_header(std::string &string) { 294 read_header(string.begin(), string.end()); 210 295 } 211 296 void read_header(std::vector<char> &buf) { … … 219 304 header = std::string(buf.begin(), buf.begin()+nscp::length::get_header_size(signature)); 220 305 buf.erase(buf.begin(), buf.begin()+nscp::length::get_header_size(signature)); 306 } 307 void read_payload(std::string &string) { 308 read_payload(string.begin(), string.end()); 221 309 } 222 310 void read_payload(std::vector<char> &buf) { … … 255 343 256 344 257 static nscp::data::signature_ packetcreate_simple_sig(int payload_type, std::string::size_type size) {258 nscp::data::signature_ packetsignature;345 static nscp::data::signature_type create_simple_sig(int payload_type, std::string::size_type size) { 346 nscp::data::signature_type signature; 259 347 signature.header_length = 0; 260 348 signature.header_type = 0; … … 268 356 269 357 } 358 static nscp::data::signature_type create_sig(int payload_type, std::string::size_type header_size, std::string::size_type payload_size, unsigned long additional_packets = 0) { 359 nscp::data::signature_type signature; 360 signature.header_length = header_size; 361 signature.header_type = 0; 362 363 signature.additional_packet_count = additional_packets; 364 signature.version = nscp::data::version_1; 365 366 signature.payload_length = payload_size; 367 signature.payload_type = payload_type; 368 return signature; 369 370 } 270 371 static packet create_payload(unsigned long payload_type, std::string buffer, unsigned long additional_packets = 0) { 271 nscp::data::signature_ packetsignature;372 nscp::data::signature_type signature; 272 373 signature.header_length = 0; 273 374 signature.header_type = 0; … … 286 387 } 287 388 288 static packet create_envelope_request(unsigned long additionl_packets) { 289 nscp::data::signature_packet signature; 290 signature.header_length = 0; 291 signature.header_type = 0; 292 293 signature.additional_packet_count = additionl_packets; 294 signature.version = nscp::data::version_1; 295 389 static packet create_message_envelope_request(unsigned long additional_packets) { 390 std::string buffer; 391 NSCPIPC::MessageRequestEnvelope request_envelope; 392 request_envelope.mutable_envelope()->set_version(NSCPIPC::Common_Version_VERSION_1); 393 request_envelope.mutable_envelope()->set_max_supported_version(NSCPIPC::Common_Version_VERSION_1); 394 // @todo: set authentication stuff here 395 request_envelope.SerializeToString(&buffer); 396 return packet(create_sig(nscp::data::message_envelope_request, 0, buffer.size(), additional_packets), "", buffer); 397 } 398 static packet create_message_envelope_response(std::string cookie, int sequence) { 399 std::string buffer; 400 NSCPIPC::MessageResponseEnvelope envelope; 401 envelope.mutable_envelope()->set_version(NSCPIPC::Common_Version_VERSION_1); 402 envelope.mutable_envelope()->set_max_supported_version(NSCPIPC::Common_Version_VERSION_1); 403 envelope.set_cookie(cookie); 404 envelope.set_sequence(sequence); 405 envelope.SerializeToString(&buffer); 406 return packet(create_sig(nscp::data::message_envelope_response, 0, buffer.size(), 0), "", buffer); 407 } 408 409 410 static packet create_envelope_request(unsigned long additional_packets) { 296 411 std::string buffer; 297 412 NSCPIPC::RequestEnvelope request_envelope; … … 299 414 request_envelope.set_max_supported_version(NSCPIPC::Common_Version_VERSION_1); 300 415 request_envelope.SerializeToString(&buffer); 301 302 signature.payload_length = buffer.size(); 303 signature.payload_type = nscp::data::envelope_request; 304 305 return packet(signature, "", buffer); 306 } 307 308 static packet create_envelope_response(unsigned long additionl_packets) { 309 nscp::data::signature_packet signature; 310 signature.header_length = 0; 311 signature.header_type = 0; 312 313 signature.additional_packet_count = additionl_packets; 314 signature.version = nscp::data::version_1; 315 416 return packet(create_sig(nscp::data::envelope_request, 0, buffer.size(), additional_packets), "", buffer); 417 } 418 419 static packet create_envelope_response(unsigned long additional_packets) { 316 420 std::string buffer; 317 421 NSCPIPC::RequestEnvelope request_envelope; … … 319 423 request_envelope.set_max_supported_version(NSCPIPC::Common_Version_VERSION_1); 320 424 request_envelope.SerializeToString(&buffer); 321 322 signature.payload_length = buffer.size(); 323 signature.payload_type = nscp::data::envelope_response; 324 325 return packet(signature, "", buffer); 425 return packet(create_sig(nscp::data::envelope_response, 0, buffer.size(), additional_packets), "", buffer); 326 426 } 327 427 328 428 static packet create_error(std::wstring msg) { 329 nscp::data::signature_packet signature;330 signature.header_length = 0;331 signature.header_type = 0;332 333 signature.additional_packet_count = 0;334 signature.version = nscp::data::version_1;335 336 429 std::string buffer; 337 430 NSCPIPC::ErrorMessage message; … … 340 433 error->set_message(utf8::cvt<std::string>(msg)); 341 434 message.SerializeToString(&buffer); 342 343 signature.payload_length = buffer.size(); 344 signature.payload_type = nscp::data::error; 345 346 return packet(signature, "", buffer); 435 return packet(create_sig(nscp::data::error, 0, buffer.size()), "", buffer); 347 436 } 348 437 }; … … 355 444 return packet.signature.payload_type == nscp::data::envelope_response; 356 445 } 446 static bool is_message_envelope_request(const nscp::packet &packet) { 447 return packet.signature.payload_type == nscp::data::message_envelope_request; 448 } 449 static bool is_message_envelope_response(const nscp::packet &packet) { 450 return packet.signature.payload_type == nscp::data::message_envelope_response; 451 } 357 452 static bool is_query_request(const nscp::packet &packet) { 358 453 return packet.signature.payload_type == nscp::data::command_request; -
include/nscp/server/connection.cpp
r7515d00 r2b2e9b8 121 121 envelope.ParseFromString(packet.payload); 122 122 } else { 123 std::list<nscp::packet> result = handler_->process(packet); 124 outbound_queue_.insert(outbound_queue_.end(), result.begin(), result.end()); 123 outbound_queue_.push_back(handler_->process(packet)); 125 124 } 126 125 return boost::make_tuple(sig.additional_packet_count > 0, process_helper(&nscp::server::parser::digest_signature, &connection::process_signature)); -
include/nscp/server/connection.hpp
r7515d00 r2b2e9b8 73 73 boost::shared_ptr<nscp::server::server_handler> handler_; 74 74 75 nscp::data:: signature_packetsig;75 nscp::data::tcp_signature_data sig; 76 76 77 77 /// Buffer for incoming data. -
include/nscp/server/handler.hpp
r7515d00 r2b2e9b8 14 14 public: 15 15 server_handler() {} 16 virtual std::list<nscp::packet>process(const nscp::packet &packet) = 0;16 virtual nscp::packet process(const nscp::packet &packet) = 0; 17 17 virtual std::list<nscp::packet> process_all(const std::list<nscp::packet> &packet) = 0; 18 18 -
include/nscp/server/parser.hpp
r7515d00 r2b2e9b8 31 31 32 32 template <typename InputIterator> 33 InputIterator digest_header(InputIterator begin, InputIterator end, const nscp::data:: signature_packet&signature) {33 InputIterator digest_header(InputIterator begin, InputIterator end, const nscp::data::tcp_signature_data &signature) { 34 34 return digest_anything(begin, end, nscp::length::get_header_size(signature)); 35 35 } 36 36 37 boost::tuple<bool, char*> digest_payload(char* begin, char* end, const nscp::data:: signature_packet&signature) {37 boost::tuple<bool, char*> digest_payload(char* begin, char* end, const nscp::data::tcp_signature_data &signature) { 38 38 return digest_anything(begin, end, nscp::length::get_payload_size(signature)); 39 39 } 40 40 41 nscp::data:: signature_packetparse_signature() {42 assert(buffer_.size() >= sizeof(nscp::data::signature_packet));43 nscp::data:: signature_packet *tmp = reinterpret_cast<nscp::data::signature_packet*>(&(*buffer_.begin()));44 nscp::data:: signature_packetsignature = *tmp;41 nscp::data::tcp_signature_data parse_signature() { 42 assert(buffer_.size() >= nscp::length::get_signature_size()); 43 nscp::data::tcp_signature_data *tmp = reinterpret_cast<nscp::data::tcp_signature_data*>(&(*buffer_.begin())); 44 nscp::data::tcp_signature_data signature = *tmp; 45 45 buffer_.clear(); 46 46 return signature; 47 47 } 48 void parse_header(const nscp::data:: signature_packet&signature) {48 void parse_header(const nscp::data::tcp_signature_data &signature) { 49 49 unsigned long wanted = nscp::length::get_header_size(signature); 50 50 if (wanted == 0) -
include/strEx.h
r7515d00 r2b2e9b8 282 282 } 283 283 } 284 ss << chars; 284 285 return ss.str(); 285 286 } … … 304 305 chars += buf[i]; 305 306 } 306 return ss.str(); 307 ss << chars; 308 return ss.str(); 309 } 310 inline std::string format_buffer(std::string buf) { 311 return format_buffer(buf.c_str(), buf.size()); 307 312 } 308 313 inline std::string format_buffer(const std::vector<char> &buf) { … … 326 331 chars += buf[i]; 327 332 } 333 ss << chars; 328 334 return ss.str(); 329 335 } -
include/zmsg.hpp
r7515d00 r2b2e9b8 88 88 89 89 void set_part(size_t part_nbr, std::string data) { 90 if (part_nbr < m_part_data.size() && part_nbr >= 0) { 91 m_part_data[part_nbr] = data; 92 } 90 if (part_nbr < m_part_data.size() && part_nbr >= 0) { 91 m_part_data[part_nbr] = data; 92 } 93 } 94 std::string get_part(size_t part_nbr) { 95 return m_part_data[part_nbr]; 93 96 } 94 97 … … 109 112 } 110 113 char *data = reinterpret_cast<char*>(message.data()); 111 //std::cerr << "recv: \"" << (unsigned char*) message.data() << "\", size " << message.size() << std::endl;112 114 if (message.size() == 17 && data[0] == 0) { 113 115 push_back(encode_uuid(msg_to_string(message))); … … 125 127 } 126 128 127 voidsend(zmq::socket_t & socket) {129 bool send(zmq::socket_t & socket) { 128 130 for (size_t part_nbr = 0; part_nbr < m_part_data.size(); part_nbr++) { 129 131 zmq::message_t message; … … 133 135 message.rebuild(17); 134 136 memcpy(message.data(), uuidbin, 17); 135 delete uuidbin;137 delete uuidbin; 136 138 } else { 137 139 message.rebuild(data.size()); … … 139 141 } 140 142 try { 141 //dump();143 //dump(); 142 144 socket.send(message, part_nbr < m_part_data.size() - 1 ? ZMQ_SNDMORE : 0); 143 145 } catch (zmq::error_t error) { 144 //assert(error.num()!=0); 146 assert(false); 147 return false; 145 148 } 146 149 } 147 150 clear(); 151 return true; 148 152 } 149 153 … … 187 191 188 192 assert(data[0] == 0); 189 std::string uuidstr = std::string(3 4, ' ');193 std::string uuidstr = std::string(33, ' '); 190 194 uuidstr[0] = '@'; 191 195 int byte_nbr; 192 196 for (byte_nbr = 0; byte_nbr < 16; byte_nbr++) { 193 uuidstr[byte_nbr * 2 + 1] = hex_char[ data[byte_nbr + 1] >> 4];194 uuidstr[byte_nbr * 2 + 2] = hex_char[ data[byte_nbr + 1] & 15];197 uuidstr[byte_nbr * 2 + 1] = hex_char[(unsigned char)data[byte_nbr + 1] >> 4]; 198 uuidstr[byte_nbr * 2 + 2] = hex_char[(unsigned char)data[byte_nbr + 1] & 15]; 195 199 } 196 200 uuidstr[33] = 0; … … 226 230 = (hex_to_bin [uuidstr [byte_nbr * 2 + 1] & 127] << 4) 227 231 + (hex_to_bin [uuidstr [byte_nbr * 2 + 2] & 127]); 228 229 232 return (data); 230 233 } … … 281 284 282 285 // Dump the message as text or binary 283 int is_text = 1;284 for (unsigned int char_nbr = 0; char_nbr < data.size(); char_nbr++)285 if (data [char_nbr] < 32 || data [char_nbr] > 127)286 is_text = 0;287 288 286 std::cerr << "[" << std::setw(3) << std::setfill('0') << (int) data.size() << "] "; 289 287 for (unsigned int char_nbr = 0; char_nbr < data.size(); char_nbr++) { 290 if (is_text) { 291 std::cerr << (char) data [char_nbr]; 288 if (data [char_nbr] < 32 || data [char_nbr] == 127) { 289 std::cerr << "0x" << std::hex << std::setw(2) << std::setfill('0') << (short int) data [char_nbr]; 290 //std::cerr.unsetf(std::hex); 292 291 } else { 293 std::cerr << std::hex << std::setw(2) << std::setfill('0') << (short int) data [char_nbr];292 std::cerr << (char) data [char_nbr]; 294 293 } 295 294 } … … 386 385 }; 387 386 387 struct zxmsg : public zmq::message_t { 388 389 zxmsg(std::string string) : zmq::message_t(string.size()) { 390 memcpy(data(), string.data(), string.size()); 391 } 392 static std::string read(zmq::message_t &msg) { 393 return std::string(reinterpret_cast<char*>(msg.data()), msg.size()); 394 } 395 396 }; 397 388 398 #endif /* ZMSG_H_ */ -
libs/protobuf/CMakeLists.txt
rb38e845 r2b2e9b8 14 14 ) 15 15 16 add_library(${TARGET} ${SRCS})16 ADD_LIBRARY(${TARGET} ${SRCS}) 17 17 SET_TARGET_PROPERTIES(${TARGET} PROPERTIES FOLDER "libraries") 18 18 -
libs/protobuf/ipc.proto
r7515d00 r2b2e9b8 1 1 package NSCPIPC; 2 3 option optimize_for = LITE_RUNTIME; 2 4 3 5 message Common { … … 19 21 required int32 header_type = 2; 20 22 required int32 payload_type = 3; 21 required int32 additional_packet_count = 4; 23 required string cookie = 4; 24 }; 25 message MessageSignature { 26 required Signature signature = 1; 27 required string cookie = 2; 28 optional int64 sequence = 3; 22 29 }; 23 30 … … 57 64 }; 58 65 66 message MessageRequestEnvelope { 67 required RequestEnvelope envelope = 1; 68 optional string username = 2; 69 optional string password = 3; 70 optional int64 sequence = 4; 71 }; 72 73 message MessageResponseEnvelope { 74 required RequestEnvelope envelope = 1; 75 required string cookie = 2; 76 optional int64 sequence = 3; 77 } 78 79 80 59 81 message ErrorMessage { 60 82 -
modules/DistributedServer/CMakeLists.txt
r76d1076 r2b2e9b8 1 1 cmake_minimum_required(VERSION 2.6) 2 2 3 SET(TARGET ZeroMQServer)3 SET(TARGET DistributedServer) 4 4 5 5 PROJECT(${TARGET}) … … 22 22 SET(SRCS ${SRCS} 23 23 stdafx.h 24 "${TARGET}.h" 25 "${TARGET}.def" 26 "handler_impl.hpp" 24 ${TARGET}.h 25 ${TARGET}.def 26 handler_impl.hpp 27 queue_manager.hpp 28 worker_manager.hpp 27 29 ${NSCP_INCLUDEDIR}/nscp/packet.hpp 28 30 ${NSCP_INCLUDEDIR}/nscp/handler.hpp … … 36 38 ADD_DEFINITIONS(${NSCP_GLOBAL_DEFINES}) 37 39 ADD_LIBRARY(${TARGET} MODULE ${SRCS}) 38 INCLUDE_DIRECTORIES(${OPENSSL_INCLUDE_DIR})39 40 INCLUDE_DIRECTORIES(${ZEROMQ_INCLUDE_DIR}) 40 41 … … 42 43 ${Boost_FILESYSTEM_LIBRARY} 43 44 ${NSCP_DEF_PLUGIN_LIB} 44 ${OPENSSL_LIBRARIES}45 45 ${ZEROMQ_LIBRARY} 46 46 ${EXTRA_LIB} -
modules/DistributedServer/DistributedServer.cpp
r76d1076 r2b2e9b8 21 21 #include "stdafx.h" 22 22 23 #include <zmq.h> 24 #include <zmq.hpp> 25 26 #include "ZeroMQServer.h" 23 #include "DistributedServer.h" 27 24 #include <strEx.h> 28 #include <time.h>29 #include <queue>30 #include <list>31 #include <nscp/packet.hpp>32 #include <boost/thread.hpp>33 25 34 26 #include <config.h> 35 27 #include "handler_impl.hpp" 28 #include "queue_manager.hpp" 29 #include "worker_manager.hpp" 36 30 37 31 #include <settings/client/settings_client.hpp> 38 32 39 40 41 //42 // Asynchronous client-to-server (DEALER to ROUTER)43 //44 // While this example runs in a single process, that is just to make45 // it easier to start and stop the example. Each task has its own46 // context and conceptually acts as a separate process.47 48 #include <zmq.hpp>49 #include <zmsg.hpp>50 51 52 struct zxmsg : public zmq::message_t {53 54 zxmsg(std::string string) : zmq::message_t(string.size()) {55 memcpy(data(), string.data(), string.size());56 }57 static std::string read(zmq::message_t &msg) {58 return std::string(reinterpret_cast<char*>(msg.data()), msg.size());59 }60 61 };62 63 void start_queue(zmq::context_t *context, boost::thread_group *threads, int my_thread_count);64 65 void queue_thread(zmq::context_t *context) {66 67 zmq::socket_t frontend(*context, ZMQ_XREP);68 zmq::socket_t backend(*context, ZMQ_XREP);69 zmq::socket_t control(*context, ZMQ_REP);70 frontend.bind("tcp://*:5555");71 backend.bind("inproc://backend");72 control.bind("inproc://control");73 74 std::queue<std::string> worker_queue;75 76 try {77 while (1) {78 79 zmq::pollitem_t items [] = {80 { backend, 0, ZMQ_POLLIN, 0 },81 { control, 0, ZMQ_POLLIN, 0 },82 { frontend, 0, ZMQ_POLLIN, 0 }83 };84 // Poll frontend only if we have available workers85 if (worker_queue.size())86 zmq::poll(items, 3, -1);87 else88 zmq::poll(items, 2, -1);89 90 // Handle worker activity on backend91 if (items[0].revents & ZMQ_POLLIN) {92 NSC_DEBUG_MSG_STD(_T("---got worker msg---"));93 zmsg zm;94 if (!zm.recv(backend)) {95 NSC_DEBUG_MSG_STD(_T("---failed to read---"));96 return;97 }98 99 // Use worker address for LRU routing100 //assert (worker_queue.size() < worker_count);101 102 zmsg::opstring_t s = zm.unwrap();103 if (s)104 worker_queue.push(*s);105 106 // Return reply to client if it's not a READY107 zmsg::opstring_t a = zm.address();108 if (a && *a == "READY")109 zm.clear();110 else111 zm.send(frontend);112 }113 if (items[1].revents & ZMQ_POLLIN) {114 NSC_DEBUG_MSG_STD(_T("---got control msg---"));115 zmq::message_t query;116 if (!control.recv(&query)) {117 NSC_DEBUG_MSG_STD(_T("---failed to read---"));118 return;119 }120 std::string command = zxmsg::read(query);121 NSC_DEBUG_MSG_STD(_T("=>") + to_wstring(command));122 zxmsg resp("OK");123 control.send(resp);124 }125 if (items[2].revents & ZMQ_POLLIN) {126 NSC_DEBUG_MSG_STD(_T("---got client msg---"));127 // Now get next client request, route to next worker128 129 zmsg zm;130 if (!zm.recv(frontend)) {131 NSC_DEBUG_MSG_STD(_T("---failed to read---"));132 return;133 }134 135 // REQ socket in worker needs an envelope delimiter136 zm.wrap(worker_queue.front(), "");137 zm.send(backend);138 139 // Dequeue and drop the next worker address140 worker_queue.pop();141 }142 }143 } catch (zmq::error_t &e) {144 NSC_LOG_ERROR(_T("Failed in process: ") + to_wstring(e.what()));145 }146 }147 int worker_thread(zmq::context_t *context, std::string foo);148 149 void start_queue(zmq::context_t *context, boost::thread_group &threads, int my_thread_count) {150 boost::thread *t = new boost::thread(queue_thread, context);151 threads.add_thread(t);152 153 zmq::socket_t control(*context, ZMQ_REQ);154 for (int i=0;i<10;i++) {155 try {156 control.connect("inproc://control");157 NSC_LOG_ERROR_STD(_T("connected to control..."));158 zxmsg msg("start");159 if (!control.send(msg)) {160 NSC_LOG_ERROR_STD(_T("Failed to send start"));161 continue;162 }163 break;164 } catch (zmq::error_t &e) {165 NSC_LOG_ERROR_STD(_T("control: ") + utf8::cvt<std::wstring>(e.what()));166 }167 boost::this_thread::sleep(boost::posix_time::seconds(1));168 }169 zmq::message_t resp;170 control.recv(&resp);171 NSC_DEBUG_MSG(_T("Got: ") + to_wstring(zxmsg::read(resp)));172 173 174 for (std::size_t i = 0; i < my_thread_count; ++i) {175 boost::thread *t = new boost::thread(worker_thread, context, "wt_" + to_string(i));176 threads.add_thread(t);177 }178 }179 180 void stop_queue(zmq::context_t *context) {181 zmq::socket_t control(*context, ZMQ_REQ);182 for (int i=0;i<10;i++) {183 try {184 control.connect("inproc://control");185 NSC_LOG_ERROR_STD(_T("connected to control..."));186 zxmsg msg("stop");187 if (!control.send(msg)) {188 NSC_LOG_ERROR_STD(_T("Failed to send start"));189 continue;190 }191 break;192 } catch (zmq::error_t &e) {193 NSC_LOG_ERROR_STD(_T("control: ") + utf8::cvt<std::wstring>(e.what()));194 }195 boost::this_thread::sleep(boost::posix_time::seconds(1));196 }197 }198 199 int worker_count = 0;200 int worker_thread(zmq::context_t *context, std::string key) {201 //boost::shared_ptr<nscp::handler> handler = new handler_impl();202 try {203 //zmq::context_t context(1);204 zmq::socket_t worker(*context, ZMQ_REQ);205 206 // Set random identity to make tracing easier207 std::string identity = key;208 worker.connect("inproc://backend");209 210 // Tell queue we're ready for work211 NSC_DEBUG_MSG_STD(_T("I: (") + to_wstring(identity) + _T(") worker ready"));212 zxmsg msg("READY");213 if (!worker.send(msg)) {214 NSC_DEBUG_MSG_STD(_T("FAIELD TO SEND!!!!!!"));215 }216 217 while (1) {218 zmsg zm;219 if (!zm.recv(worker)) {220 NSC_DEBUG_MSG_STD(_T("Failed to read in (") + to_wstring(identity) + _T(")"));221 return 0;222 }223 NSC_DEBUG_MSG_STD(_T("I: (") + to_wstring(identity) + _T(") from: ") + to_wstring(*zm.address()));224 NSC_DEBUG_MSG_STD(_T("I: (") + to_wstring(identity) + _T(") sent: ") + to_wstring(*zm.body()));225 //sleep (1); // Do some heavy work226 zm.send(worker);227 }228 } catch (const zmq::error_t &e) {229 NSC_LOG_ERROR_STD(utf8::cvt<std::wstring>(e.what()));230 }231 return 0;232 }233 //////////////////////////////////////////////////////////////////////////234 235 static zmq::socket_t * s_client_socket(zmq::context_t* context) {236 NSC_DEBUG_MSG(_T("I: (re)connecting to server"));237 zmq::socket_t *client = new zmq::socket_t(*context, ZMQ_REQ);238 client->connect ("tcp://localhost:5555");239 240 // Configure socket to not wait at close time241 //int linger = 0;242 //client->setsockopt(ZMQ_LINGER, &linger, sizeof (linger));243 return client;244 }245 246 int start_client (zmq::context_t *context, int request_retries, int timeout) {247 zmq::socket_t * client = s_client_socket(context);248 249 int sequence = 0;250 int retries_left = request_retries;251 252 while (retries_left) {253 std::stringstream request;254 request << ++sequence;255 zxmsg msg(request.str());256 client->send(msg);257 //sleep (1);258 259 bool expect_reply = true;260 while (expect_reply) {261 // Poll socket for a reply, with timeout262 zmq::pollitem_t items[] = { { *client, 0, ZMQ_POLLIN, 0 } };263 zmq::poll (&items[0], 1, timeout * 1000);264 265 // If we got a reply, process it266 if (items[0].revents & ZMQ_POLLIN) {267 // We got a reply from the server, must match sequence268 zmq::message_t msg;269 client->recv(&msg);270 std::string reply(reinterpret_cast<char*>(msg.data()), msg.size());271 if (atoi (reply.c_str ()) == sequence) {272 NSC_DEBUG_MSG(_T("I: server replied OK (") + to_wstring(reply) + _T(")"));273 retries_left = request_retries;274 expect_reply = false;275 } else {276 NSC_DEBUG_MSG(_T("E: malformed reply from server (") + to_wstring(reply) + _T(")"));277 }278 }279 else280 if (--retries_left == 0) {281 NSC_DEBUG_MSG(_T("E: server seems to be offline, abandoning"));282 expect_reply = false;283 break;284 } else {285 NSC_DEBUG_MSG(_T("W: no response from server, retrying..."));286 // Old socket will be confused; close it and open a new one287 delete client;288 client = s_client_socket(context);289 // Send request again, on new socket290 zxmsg msg(request.str());291 client->send(msg);292 }293 }294 }295 delete client;296 return 0;297 }298 299 300 33 namespace sh = nscapi::settings_helper; 301 34 302 NSCPListener::NSCPListener() : context(NULL) {35 DistributedServer::DistributedServer() : context(NULL) { 303 36 } 304 NSCPListener::~NSCPListener() {37 DistributedServer::~DistributedServer() { 305 38 delete context; 306 39 } 307 40 308 bool NSCPListener::loadModule() {41 bool DistributedServer::loadModule() { 309 42 return false; 310 43 } 311 44 312 bool NSCPListener::loadModuleEx(std::wstring alias, NSCAPI::moduleLoadMode mode) { 45 bool DistributedServer::loadModuleEx(std::wstring alias, NSCAPI::moduleLoadMode mode) { 46 std::wstring host, suffix, server_mode; 47 unsigned int thread_count; 313 48 try { 314 /*315 49 sh::settings_registry settings(get_settings_proxy()); 316 settings.set_alias(_T(" nscp"), alias, _T("server"));50 settings.set_alias(_T("distributed"), alias, _T("server")); 317 51 318 52 settings.alias().add_path_to_settings() 319 (_T(" NSCP SERVER SECTION"), _T("Section for NSCP (NSCPListener.dll) (check_nscp) protocol options."))53 (_T("DISTRIBUTED NSCP SERVER SECTION"), _T("Section for Distributed NSCP (DistributedServer) (check_nscp) protocol options.")) 320 54 ; 321 55 322 56 settings.alias().add_key_to_settings() 323 (_T(" port"), sh::uint_key(&info_.port, 5668),324 _T(" PORT NUMBER"), _T("Port to use for NSCP."))57 (_T("host"), sh::wstring_key(&host, _T("tcp://*:5555")), 58 _T("HOST TO BIND/CONNECT TO"), _T("The host to bind/connect to")) 325 59 326 ; 60 (_T("suffix"), sh::wstring_key(&suffix, _T("ncsp.dist")), 61 _T("SUFFIX FOR INTERNAL CHANNELS"), _T("Has to be uniq on each server")) 327 62 328 settings.alias().add_parent(_T("/settings/default")).add_key_to_settings() 63 (_T("worker pool size"), sh::uint_key(&thread_count, 10), 64 _T("WORKER POOL SIZE"), _T("Number of threads to spawn for the worker pool")) 329 65 330 (_T("thread pool"), sh::uint_key(&info_.thread_pool_size, 10), 331 _T("THREAD POOL"), _T("")) 332 333 (_T("bind to"), sh::string_key(&info_.address), 334 _T("BIND TO ADDRESS"), _T("Allows you to bind server to a specific local address. This has to be a dotted ip address not a host name. Leaving this blank will bind to all available IP addresses.")) 335 336 (_T("socket queue size"), sh::int_key(&info_.back_log, 0), 337 _T("LISTEN QUEUE"), _T("Number of sockets to queue before starting to refuse new incoming connections. This can be used to tweak the amount of simultaneous sockets that the server accepts.")) 338 339 (_T("allowed hosts"), sh::string_fun_key<std::wstring>(boost::bind(&socket_helpers::allowed_hosts_manager::set_source, &info_.allowed_hosts, _1), _T("127.0.0.1")), 340 _T("ALLOWED HOSTS"), _T("A comaseparated list of allowed hosts. You can use netmasks (/ syntax) or * to create ranges.")) 341 342 (_T("cache allowed hosts"), sh::bool_key(&info_.allowed_hosts.cached, true), 343 _T("CACHE ALLOWED HOSTS"), _T("If hostnames should be cached, improves speed and security somewhat but wont allow you to have dynamic IPs for your nagios server.")) 344 345 (_T("timeout"), sh::uint_key(&info_.timeout, 30), 346 _T("TIMEOUT"), _T("Timeout when reading packets on incoming sockets. If the data has not arrived within this time we will bail out.")) 347 348 (_T("use ssl"), sh::bool_key(&info_.use_ssl, true), 349 _T("ENABLE SSL ENCRYPTION"), _T("This option controls if SSL should be enabled.")) 350 351 (_T("certificate"), sh::wpath_key(&info_.certificate, _T("${certificate-path}/nrpe_dh_512.pem")), 352 _T("SSL CERTIFICATE"), _T("")) 353 66 (_T("mode"), sh::wstring_key(&server_mode, _T("master")), 67 _T("OPERATION MODE"), _T("Mode of operation can only be master now but will add more later on (such as slave)")) 354 68 ; 355 69 356 70 settings.register_all(); 357 71 settings.notify(); 358 */359 72 360 context = new zmq::context_t(5); 361 start_queue(context, threads, 5); 362 //start_client(context, 5, 10); 363 //std::wcout << _T("---QUEUE STARTED---") << std::endl; 364 //start_client(10, 30); 73 context = new zmq::context_t(2); 74 75 zeromq_queue::connection_info queue_info(to_string(host), to_string(suffix)); 76 zeromq_queue::queue_manager queue; 77 queue.start(context, threads, queue_info); 78 79 zeromq_worker::connection_info worker_info(queue_info.get_backend(), to_string(suffix), thread_count); 80 zeromq_worker::worker_manager workers; 81 workers.start(context, threads, worker_info); 365 82 366 83 } catch (std::exception &e) { … … 371 88 return false; 372 89 } 373 374 375 90 return true; 376 91 } 377 92 378 void free_z_buffer(void *data, void *hint) {379 delete [] reinterpret_cast<char*>(data);380 }381 void* create_z_buffer(std::string &buffer) {382 char *tmp = new char[buffer.size()+1];383 memcpy(tmp, buffer.c_str(), buffer.size());384 return tmp;385 }386 93 387 388 389 struct zeromq_server { 390 zmq::socket_t socket; 391 boost::shared_ptr<nscp::handler> handler; 392 393 zeromq_server(zmq::context_t context, boost::shared_ptr<nscp::handler> handler) : socket(context, ZMQ_REP), handler(handler) { 394 } 395 396 void start_server(std::string address = "tcp://*:5555") { 397 socket.bind(address.c_str()); 398 399 while (true) { 400 zmq::message_t request; 401 socket.recv(&request); 402 403 std::list<nscp::packet> read_list; 404 bool has_more = true; 405 while (has_more) { 406 nscp::packet packet; 407 std::string buffer(reinterpret_cast<char*>(request.data()), request.size()); 408 packet.read_all(buffer); 409 read_list.push_back(packet); 410 std::cout << "<<< " << packet.to_string() << std::endl; 411 has_more = packet.signature.additional_packet_count > 0; 412 } 413 std::list<nscp::packet> result = handler->process_all(read_list); 414 send(result); 415 } 416 } 417 void send(std::list<nscp::packet> packets) { 418 unsigned long count = packets.size(); 419 BOOST_FOREACH(nscp::packet &packet, packets) { 420 packet.signature.additional_packet_count = count--; 421 std::cout << ">>> " << packet.to_string() << std::endl; 422 std::string buffer = packet.write_string(); 423 zmq::message_t msg(create_z_buffer(buffer), buffer.size(), &free_z_buffer); 424 socket.send(msg); 425 } 426 } 427 }; 428 429 bool NSCPListener::unloadModule() { 94 bool DistributedServer::unloadModule() { 430 95 try { 431 96 delete context; 432 97 context = NULL; 433 //stop_queue(context);434 98 threads.join_all(); 435 99 } catch (...) { … … 441 105 442 106 443 bool NSCPListener::hasCommandHandler() {107 bool DistributedServer::hasCommandHandler() { 444 108 return false; 445 109 } 446 bool NSCPListener::hasMessageHandler() {110 bool DistributedServer::hasMessageHandler() { 447 111 return false; 448 112 } 449 113 450 114 NSC_WRAP_DLL(); 451 NSC_WRAPPERS_MAIN_DEF( NSCPListener);115 NSC_WRAPPERS_MAIN_DEF(DistributedServer); 452 116 NSC_WRAPPERS_IGNORE_MSG_DEF(); 453 117 NSC_WRAPPERS_IGNORE_CMD_DEF(); -
modules/DistributedServer/DistributedServer.def
r76d1076 r2b2e9b8 1 LIBRARY NRPEListener1 LIBRARY DistributedServer 2 2 3 3 EXPORTS -
modules/DistributedServer/DistributedServer.h
r76d1076 r2b2e9b8 22 22 NSC_WRAPPERS_MAIN(); 23 23 24 class NSCPListener : public nscapi::impl::simple_plugin { 24 #include <zmq.hpp> 25 26 class DistributedServer : public nscapi::impl::simple_plugin { 25 27 public: 26 28 zmq::context_t *context; 27 29 boost::thread_group threads; 28 NSCPListener();29 virtual ~ NSCPListener();30 DistributedServer(); 31 virtual ~DistributedServer(); 30 32 // Module calls 31 33 bool loadModule(); … … 35 37 36 38 static std::wstring getModuleName() { 37 return _T(" ZeroMQ ServerNSCPserver");39 return _T("Distributed server"); 38 40 } 39 41 static nscapi::plugin_wrapper::module_version getModuleVersion() { … … 42 44 } 43 45 static std::wstring getModuleDescription() { 44 return _T("A simple server that listens for incoming NSCP connection and handles them.");46 return _T("A simple server that listens for incoming distributed requests."); 45 47 } 46 48 -
modules/DistributedServer/handler_impl.cpp
r76d1076 r2b2e9b8 7 7 #include "handler_impl.hpp" 8 8 9 std::list<nscp::packet> handler_impl::process(nscp::packet &packet) { 10 std::list<nscp::packet> result; 9 #include "stdafx.h" 11 10 11 #include <boost/asio.hpp> 12 #include <protobuf/plugin.pb.h> 13 #include <nscapi/functions.hpp> 14 15 #include "handler_impl.hpp" 16 17 NSCAPI::nagiosReturn handler_impl::handle_query_request(const std::string &request, Plugin::QueryRequestMessage &msg, std::string &reply) { 12 18 Plugin::Common::Header hdr; 19 hdr.CopyFrom(msg.header()); 13 20 14 if (nscp::checks::is_query_request(packet)) { 15 Plugin::QueryRequestMessage msg; 16 msg.ParseFromString(packet.payload); 17 hdr.CopyFrom(msg.header()); 21 Plugin::QueryResponseMessage response; 22 // @todo: swap data in the dhear (ie. sender /recipent) 23 response.mutable_header()->CopyFrom(hdr); 18 24 19 // @todo: Make split optional 20 // @todo: Make this return ONE response not multiple responses 25 // @todo: Make split optional 26 for (int i=0;i<msg.payload_size();i++) { 27 const Plugin::QueryRequestMessage_Request &payload = msg.payload(i); 28 std::string outBuffer; 29 std::wstring command = utf8::cvt<std::wstring>(payload.command()); 21 30 22 for (int i=0;i<msg.payload_size();i++) { 23 const Plugin::QueryRequestMessage_Request &payload = msg.payload(i); 24 std::string outBuffer; 25 std::wstring command = utf8::cvt<std::wstring>(payload.command()); 26 27 if (command.empty() || command == _T("_NSCP_CHECK")) { 28 nscapi::functions::create_simple_query_response(_T("_NSCP_CHECK"), NSCAPI::returnOK, _T("I (") + nscapi::plugin_singleton->get_core()->getApplicationVersionString() + _T(") seem to be doing fine..."), _T(""), outBuffer); 29 } else if (!allowArgs_ && payload.arguments_size() > 0) { 30 nscapi::functions::create_simple_query_response_unknown(command, _T("Arguments not allowed for command: ") + command, _T(""), outBuffer); 31 } else { 32 bool ok = true; 33 if (!allowNasty_) { 34 for (int j=0;j<payload.arguments_size();j++) { 35 if (payload.arguments(j).find_first_of(NASTY_METACHARS) != std::wstring::npos) { 36 ok = false; 37 break; 38 } 31 if (command.empty() || command == _T("_NSCP_CHECK")) { 32 nscapi::functions::create_simple_query_response(_T("_NSCP_CHECK"), NSCAPI::returnOK, _T("I (") + nscapi::plugin_singleton->get_core()->getApplicationVersionString() + _T(") seem to be doing fine..."), _T(""), outBuffer); 33 } else if (!allowArgs_ && payload.arguments_size() > 0) { 34 nscapi::functions::create_simple_query_response_unknown(command, _T("Arguments not allowed for command: ") + command, _T(""), reply); 35 } else { 36 bool ok = true; 37 if (!allowNasty_) { 38 for (int j=0;j<payload.arguments_size();j++) { 39 if (payload.arguments(j).find_first_of(NASTY_METACHARS) != std::wstring::npos) { 40 ok = false; 41 break; 39 42 } 40 43 } 41 if (ok) { 42 std::string tmpBuffer; 43 Plugin::QueryRequestMessage tmp; 44 tmp.mutable_header()->CopyFrom(hdr); 45 tmp.add_payload()->CopyFrom(payload); 46 tmp.SerializeToString(&tmpBuffer); 47 NSCAPI::nagiosReturn returncode = nscapi::plugin_singleton->get_core()->query(command, tmpBuffer, outBuffer); 48 if (returncode == NSCAPI::returnIgnored) { 49 nscapi::functions::create_simple_query_response_unknown(command, _T("Command was not found: ") + command, _T(""), outBuffer); 50 } 51 } else { 52 nscapi::functions::create_simple_query_response_unknown(command, _T("Nasty arguments not allowed for command: ") + command, _T(""), outBuffer); 44 } 45 if (ok) { 46 std::string tmpBuffer; 47 Plugin::QueryRequestMessage tmp; 48 tmp.mutable_header()->CopyFrom(hdr); 49 tmp.add_payload()->CopyFrom(payload); 50 tmp.SerializeToString(&tmpBuffer); 51 NSCAPI::nagiosReturn returncode = nscapi::plugin_singleton->get_core()->query(command, tmpBuffer, outBuffer); 52 if (returncode == NSCAPI::returnIgnored) { 53 nscapi::functions::create_simple_query_response_unknown(command, _T("Command was not found: ") + command, _T(""), outBuffer); 53 54 } 55 } else { 56 nscapi::functions::create_simple_query_response_unknown(command, _T("Nasty arguments not allowed for command: ") + command, _T(""), outBuffer); 54 57 } 55 result.push_back(nscp::factory::create_query_response(outBuffer));56 58 } 57 } else if (nscp::checks::is_query_response(packet)) { 58 59 // @todo handle submission here 60 61 } else { 62 NSC_LOG_ERROR(_T("Unknown packet: ") + packet.to_wstring()); 63 result.push_back(create_error(_T("Unknown packet: ") + packet.to_wstring())); 59 Plugin::QueryResponseMessage tmpResponse; 60 tmpResponse.ParseFromString(outBuffer); 61 for (int i=0;i<tmpResponse.payload_size();i++) { 62 response.add_payload()->CopyFrom(tmpResponse.payload(i)); 63 } 64 64 } 65 return result; 65 response.SerializeToString(&reply); 66 // @todo: fixme this should probably be an aggregate right? 67 return NSCAPI::isSuccess; 66 68 } 67 69 … … 69 71 70 72 73 74 75 -
modules/DistributedServer/handler_impl.hpp
r76d1076 r2b2e9b8 3 3 #include <nscp/packet.hpp> 4 4 #include <nscp/handler.hpp> 5 #include <boost/tuple/tuple.hpp>6 5 7 6 class handler_impl : public nscp::handler, private boost::noncopyable { 8 unsigned int payload_length_;9 7 bool allowArgs_; 10 8 bool allowNasty_; 11 9 bool noPerfData_; 12 10 public: 13 handler_impl( unsigned int payload_length) : payload_length_(payload_length),noPerfData_(false), allowNasty_(false), allowArgs_(false) {}11 handler_impl() : noPerfData_(false), allowNasty_(false), allowArgs_(false) {} 14 12 15 unsigned int get_payload_length() { 16 return payload_length_; 17 } 18 void set_payload_length(unsigned int payload) { 19 payload_length_ = payload; 20 } 21 22 std::list<nscp::packet> process(nscp::packet &buffer); 13 NSCAPI::nagiosReturn handle_query_request(const std::string &request, Plugin::QueryRequestMessage &msg, std::string &reply); 23 14 24 15 nscp::packet create_error(std::wstring msg) { -
modules/DistributedServer/module.cmake
r76d1076 r2b2e9b8 2 2 SET (BUILD_MODULE 1) 3 3 ELSE(ZEROMQ_FOUND) 4 MESSAGE(STATUS "Disabling ZeroMQServer since zeromq was not found")4 MESSAGE(STATUS "Disabling DistributedServer since zeromq was not found") 5 5 ENDIF(ZEROMQ_FOUND) 6 6 -
modules/NRPEClient/CMakeLists.txt
r438998b r2b2e9b8 15 15 16 16 ADD_DEFINITIONS(${NSCP_GLOBAL_DEFINES}) 17 IF(OPENSSL_FOUND) 18 ADD_DEFINITIONS(-DUSE_SSL) 19 ENDIF(OPENSSL_FOUND) 17 20 18 21 IF(WIN32) -
modules/NRPEServer/CMakeLists.txt
r438998b r2b2e9b8 23 23 24 24 ADD_DEFINITIONS(${NSCP_GLOBAL_DEFINES}) 25 IF(OPENSSL_FOUND) 26 ADD_DEFINITIONS(-DUSE_SSL) 27 ENDIF(OPENSSL_FOUND) 25 28 26 29 IF(WIN32) -
modules/NSCPClient/NSCPClient.cpp
r7515d00 r2b2e9b8 139 139 } 140 140 141 std::wstring NSCPClient::setup(client::configuration config, const std::wstring &command) {141 std::wstring NSCPClient::setup(client::configuration &config, const std::wstring &command) { 142 142 clp_handler_impl *handler = new clp_handler_impl(this); 143 143 add_local_options(config.local, handler->local_data); -
modules/NSCPClient/NSCPClient.h
r7515d00 r2b2e9b8 108 108 void add_command(std::wstring key, std::wstring args); 109 109 void add_target(std::wstring key, std::wstring args); 110 std::wstring setup(client::configuration config, const std::wstring &command);110 std::wstring setup(client::configuration &config, const std::wstring &command); 111 111 112 112 };
Note: See TracChangeset
for help on using the changeset viewer.








