diff options
Diffstat (limited to 'src/3rd_party-static/message_broker/src')
21 files changed, 4563 insertions, 0 deletions
diff --git a/src/3rd_party-static/message_broker/src/client/mb_client.cpp b/src/3rd_party-static/message_broker/src/client/mb_client.cpp new file mode 100644 index 0000000000..6342c776a8 --- /dev/null +++ b/src/3rd_party-static/message_broker/src/client/mb_client.cpp @@ -0,0 +1,75 @@ +/** + * \file mb_client.cpp + * \brief MessageBroker client. + * \author AKara + */ + +#include <cstring> + +#include "mb_client.hpp" + +namespace NsMessageBroker +{ + + Client::Client() + { + } + + Client::Client(const std::string& address, uint16_t port) + { + m_sock = -1; + m_address = address; + m_port = port; + memset(&m_sockaddr, 0x00, sizeof(struct sockaddr_storage)); + m_sockaddrlen = 0; + } + + Client::~Client() + { + if(m_sock != -1) + { + Close(); + } + } + + int Client::GetSocket() const + { + return m_sock; + } + + std::string Client::GetAddress() const + { + return m_address; + } + + void Client::SetAddress(const std::string& address) + { + m_address = address; + } + + void Client::SetPort(uint16_t port) + { + m_port = port; + } + + uint16_t Client::GetPort() const + { + return m_port; + } + + bool Client::Connect() + { + m_sock = networking::connect(m_protocol, GetAddress(), GetPort(), &m_sockaddr, &m_sockaddrlen); + + return (m_sock != -1) ? true : false; + } + + void Client::Close() + { + shutdown(m_sock, SHUT_RDWR); + + close(m_sock); + m_sock = -1; + } + +} /* namespace NsMessageBroker */ diff --git a/src/3rd_party-static/message_broker/src/client/mb_controller.cpp b/src/3rd_party-static/message_broker/src/client/mb_controller.cpp new file mode 100644 index 0000000000..8a4a77cf30 --- /dev/null +++ b/src/3rd_party-static/message_broker/src/client/mb_controller.cpp @@ -0,0 +1,352 @@ +/** + * \file mb_controller.cpp + * \brief MessageBroker Controller. + * \author AKara + */ + +#include "mb_controller.hpp" + +#include "MBDebugHelper.h" +#include "CMessageBroker.hpp" + +namespace NsMessageBroker +{ + CMessageBrokerController::CMessageBrokerController(const std::string& address, uint16_t port, std::string name): + TcpClient(address, port), + stop(false), + m_receivingBuffer(""), + mControllersIdStart(-1), + mControllersIdCurrent(0) + { + mControllersName = name; + } + + std::string CMessageBrokerController::getControllersName() + { + return mControllersName; + } + + CMessageBrokerController::~CMessageBrokerController() + { + } + + ssize_t CMessageBrokerController::Recv(std::string& data) + { + DBG_MSG(("CMessageBrokerController::Recv()\n")); + ssize_t recv = TcpClient::Recv(data); + DBG_MSG(("Received message: %s\n", data.c_str())); + m_receivingBuffer += data; + while (!stop) + { + Json::Value root; + if (!m_reader.parse(m_receivingBuffer, root)) + { + DBG_MSG(("Received not JSON string! %s\n", m_receivingBuffer.c_str())); + return recv; + } + std::string wmes = m_receiverWriter.write(root); + DBG_MSG(("Parsed JSON string:%s; length: %d\n", wmes.c_str(), wmes.length())); + DBG_MSG(("Buffer is:%s\n", m_receivingBuffer.c_str())); + ssize_t beginpos = m_receivingBuffer.find(wmes); + if (-1 != beginpos) + { + m_receivingBuffer.erase(0, beginpos + wmes.length()); + DBG_MSG(("Buffer after cut is:%s\n", m_receivingBuffer.c_str())); + } else + { + m_receivingBuffer.clear(); + } + onMessageReceived(root); + } + return recv; + } + + void CMessageBrokerController::onMessageReceived(Json::Value message) + { + // Determine message type and process... + Json::Value error; + if (checkMessage(message, error)) + { + if (isNotification(message)) + { + DBG_MSG(("Message is notification!\n")); + processNotification(message); + } else if (isResponse(message)) + { + std::string id = message["id"].asString(); + std::string method = findMethodById(id); + DBG_MSG(("Message is response on: %s\n", method.c_str())); + if ("" != method) + { + if ("MB.registerComponent" == method) + { // initialize mControllersIdStart + if (message.isMember("result") && message["result"].isInt()) + { + mControllersIdStart = message["result"].asInt(); + } else + { + DBG_MSG_ERROR(("Not possible to initialize mControllersIdStart!\n")); + } + } else if ("MB.subscribeTo" == method || "MB.unregisterComponent" == method || "MB.unsubscribeFrom" == method) + { + //nothing to do for now + } else + { + processResponse(method, message); + } + } else + { + DBG_MSG_ERROR(("Request with id %s has not been found!\n", id.c_str())); + } + } else + { + DBG_MSG(("Message is request!\n")); + processRequest(message); + } + } else + { + DBG_MSG_ERROR(("Message contains wrong data!\n")); + } + } + + ssize_t CMessageBrokerController::Send(const std::string& data) + { + return TcpClient::Send(data); + } + + void CMessageBrokerController::sendJsonMessage(Json::Value& message) + { + DBG_MSG(("CMessageBrokerController::sendJsonMessage()\n")); + sync_primitives::AutoLock auto_lock(queue_lock_); + std::string mes = m_writer.write(message); + if (!isNotification(message) && !isResponse(message)) + {// not notification, not a response, store id and method name to recognize an answer + mWaitResponseQueue.insert(std::map<std::string, std::string>::value_type(message["id"].asString(), message["method"].asString())); + } + int bytesSent = Send(mes); + bytesSent = bytesSent; // to prevent compiler warnings in case DBG_MSG off + DBG_MSG(("Length:%d, Sent: %d bytes\n", mes.length(), bytesSent)); + } + + std::string CMessageBrokerController::findMethodById(std::string id) + { + DBG_MSG(("CMessageBrokerController::findMethodById()\n")); + sync_primitives::AutoLock auto_lock(queue_lock_); + std::string res = ""; + std::map <std::string, std::string>::iterator it; + it = mWaitResponseQueue.find(id); + if (it != mWaitResponseQueue.end()) + { + res = (*it).second; + mWaitResponseQueue.erase(it); + } + return res; + } + + int CMessageBrokerController::getNextMessageId() + { + if (mControllersIdCurrent < (mControllersIdStart+1000)) + { + return mControllersIdCurrent++; + } else + { + return mControllersIdCurrent = mControllersIdStart; + } + } + + void CMessageBrokerController::prepareMessage(Json::Value& root) + { + root["jsonrpc"] = "2.0"; + root["id"] = getNextMessageId(); + } + + void CMessageBrokerController::prepareErrorMessage(int errCode, std::string errMessage, Json::Value& error) + { + DBG_MSG(("CMessageBrokerController::prepareErrorMessage()\n")); + Json::Value err; + err["code"] = errCode; + err["message"] = errMessage; + error["error"] = err; + } + + std::string CMessageBrokerController::getDestinationComponentName(Json::Value& root) + { + DBG_MSG(("CMessageBrokerController::getDestinationComponentName()\n")); + std::string ret = ""; + std::string method = root["method"].asString(); + int pos = method.find("."); + if (-1 != pos) + { + ret = method.substr(0, pos); + } + DBG_MSG(("Destination component is: %s\n", ret.c_str())); + return ret; + } + + std::string CMessageBrokerController::getMethodName(Json::Value& root) + { + DBG_MSG(("CMessageBrokerController::getMethodName()\n")); + std::string ret = ""; + std::string method = root["method"].asString(); + int pos = method.find("."); + if (-1 != pos) + { + ret = method.substr(pos+1); + } + DBG_MSG(("Method is: %s\n", ret.c_str())); + return ret; + } + + bool CMessageBrokerController::isNotification(Json::Value& root) + { + DBG_MSG(("CMessageBrokerController::isNotification()\n")); + bool ret = false; + if (false == root.isMember("id")) + { + ret = true; + } + DBG_MSG(("Result: %d\n", ret)); + return ret; + } + + bool CMessageBrokerController::isResponse(Json::Value& root) + { + DBG_MSG(("CMessageBrokerController::isResponse()\n")); + bool ret = false; + if ((true == root.isMember("result")) || (true == root.isMember("error"))) + { + ret = true; + } + DBG_MSG(("Result: %d\n", ret)); + return ret; + } + + void CMessageBrokerController::registerController(int id) + { + DBG_MSG(("CMessageBrokerController::registerController()\n")); + Json::Value root; + Json::Value params; + prepareMessage(root); + root["id"] = id; + root["method"] = "MB.registerComponent"; + params["componentName"] = mControllersName; + root["params"] = params; + sendJsonMessage(root); + } + + void CMessageBrokerController::unregisterController() + { + DBG_MSG(("CMessageBrokerController::unregisterController()\n")); + Json::Value root; + Json::Value params; + prepareMessage(root); + root["method"] = "MB.unregisterComponent"; + params["componentName"] = mControllersName; + root["params"] = params; + sendJsonMessage(root); + } + + void CMessageBrokerController::subscribeTo(std::string property) + { + DBG_MSG(("CMessageBrokerController::subscribeTo()\n")); + Json::Value root; + Json::Value params; + prepareMessage(root); + root["method"] = "MB.subscribeTo"; + params["propertyName"] = property; + root["params"] = params; + sendJsonMessage(root); + } + + void CMessageBrokerController::unsubscribeFrom(std::string property) + { + DBG_MSG(("CMessageBrokerController::unsubscribeFrom()\n")); + Json::Value root; + Json::Value params; + prepareMessage(root); + root["method"] = "MB.unsubscribeFrom"; + params["propertyName"] = property; + root["params"] = params; + sendJsonMessage(root); + } + + void* CMessageBrokerController::MethodForReceiverThread(void * arg) + { + stop = false; + arg = arg; // to avoid compiler warnings + while(!stop) + { + std::string data = ""; + Recv(data); + } + return NULL; + } + + bool CMessageBrokerController::checkMessage(Json::Value& root, Json::Value& error) + { + DBG_MSG(("CMessageBrokerController::checkMessage()\n")); + Json::Value err; + + try + { + /* check the JSON-RPC version => 2.0 */ + if (!root.isObject() || !root.isMember("jsonrpc") || root["jsonrpc"] != "2.0") + { + error["id"] = Json::Value::null; + error["jsonrpc"] = "2.0"; + err["code"] = NsMessageBroker::INVALID_REQUEST; + err["message"] = "Invalid MessageBroker request."; + error["error"] = err; + return false; + } + + if (root.isMember("id") && (root["id"].isArray() || root["id"].isObject())) + { + error["id"] = Json::Value::null; + error["jsonrpc"] = "2.0"; + err["code"] = NsMessageBroker::INVALID_REQUEST; + err["message"] = "Invalid MessageBroker request."; + error["error"] = err; + return false; + } + + if (root.isMember("result") && root.isMember("error")) + { + /* message can't contain simultaneously result and error*/ + return false; + } + + if (root.isMember("method")) + { + if (!root["method"].isString()) + { + error["id"] = Json::Value::null; + error["jsonrpc"] = "2.0"; + err["code"] = NsMessageBroker::INVALID_REQUEST; + err["message"] = "Invalid MessageBroker request."; + error["error"] = err; + return false; + } + /* Check the params is an object*/ + if (root.isMember("params") && !root["params"].isObject()) + { + error["id"] = Json::Value::null; + error["jsonrpc"] = "2.0"; + err["code"] = INVALID_REQUEST; + err["message"] = "Invalid JSONRPC params."; + error["error"] = err; + return false; + } + } else if (!root.isMember("result") && !root.isMember("error")) + { + return false; + } + return true; + } catch (...) + { + DBG_MSG_ERROR(("CMessageBrokerController::checkMessage() EXCEPTION has been caught!\n")); + return false; + } + } + +} /* namespace NsMessageBroker */ diff --git a/src/3rd_party-static/message_broker/src/client/mb_tcpclient.cpp b/src/3rd_party-static/message_broker/src/client/mb_tcpclient.cpp new file mode 100644 index 0000000000..02db417c26 --- /dev/null +++ b/src/3rd_party-static/message_broker/src/client/mb_tcpclient.cpp @@ -0,0 +1,58 @@ +/** + * \file mb_tcpclient.cpp + * \brief MessageBroker TCP client. + * \author AKara + */ + +#include "mb_tcpclient.hpp" +#include "MBDebugHelper.h" + +namespace NsMessageBroker +{ + + TcpClient::TcpClient(const std::string& address, uint16_t port) : Client(address, port) + { + m_protocol = networking::TCP; + } + + TcpClient::~TcpClient() + { + } + + ssize_t TcpClient::Send(const std::string& data) + { + std::string rep = data; + int bytesToSend = rep.length(); + const char* ptrBuffer = rep.c_str(); + do + { + int retVal = send(m_sock, ptrBuffer, bytesToSend, 0); + if(retVal == -1) + { + return -1; + } + bytesToSend -= retVal; + ptrBuffer += retVal; + }while(bytesToSend > 0); + return rep.length(); + } + + ssize_t TcpClient::Recv(std::string& data) + { + char buf[1500]; + ssize_t nb = -1; + + if((nb = ::recv(m_sock, buf, sizeof(buf), 0)) == -1) + { + std::cerr << "Error while receiving" << std::endl; + return -1; + } + + data = std::string(buf, nb); + DBG_MSG(("Received from server: %s\n", data.c_str())); + + return nb; + } + +} /* namespace NsMessageBroker */ + diff --git a/src/3rd_party-static/message_broker/src/example/MessageBrokerControllerAVA.cpp b/src/3rd_party-static/message_broker/src/example/MessageBrokerControllerAVA.cpp new file mode 100644 index 0000000000..77ab1ca2c1 --- /dev/null +++ b/src/3rd_party-static/message_broker/src/example/MessageBrokerControllerAVA.cpp @@ -0,0 +1,59 @@ +/**
+ * \file MessageBrokerControllerAVA.cpp
+ * \brief MessageBroker Controller for Avatar.
+ * \author AKara
+ */
+
+#include <ctime>
+
+#include "MessageBrokerControllerAVA.hpp"
+
+#include "MBDebugHelper.h"
+
+extern int start;
+
+namespace NsMessageBroker
+{
+ CMessageBrokerControllerAVA::CMessageBrokerControllerAVA(const std::string& address, uint16_t port):
+ CMessageBrokerController(address, port, "AVA")
+ {
+ }
+
+ CMessageBrokerControllerAVA::~CMessageBrokerControllerAVA()
+ {
+ }
+
+ void CMessageBrokerControllerAVA::processRequest(Json::Value& root)
+ {
+ DBG_MSG(("CMessageBrokerControllerAVA::processRequest()\n"));
+ root=root;//to prevent compiler warning
+ }
+
+ void CMessageBrokerControllerAVA::processNotification(Json::Value& root)
+ {
+ DBG_MSG(("CMessageBrokerControllerAVA::processNotification()\n"));
+ root=root;//to prevent compiler warning
+ }
+
+ void CMessageBrokerControllerAVA::processResponse(std::string method, Json::Value& root)
+ {
+ DBG_MSG(("CMessageBrokerControllerAVA::processResponse()\n"));
+ DWORD stop = GetTickCount();
+ int diff = stop - start;
+ std::string id = root["id"].asString();
+ printf("Execution time for id %s is %d ms!\n", id.c_str(), diff);
+ root=root;//to prevent compiler warning
+ method=method;//to prevent compiler warning
+ }
+
+ void CMessageBrokerControllerAVA::makeCall(std::string phoneNumber)
+ {
+ DBG_MSG(("CMessageBrokerControllerAVA::makeCall()\n"));
+ Json::Value request, params;
+ prepareMessage(request);
+ request["method"] = "Phone.makeCall";
+ params["phoneNumber"] = phoneNumber;
+ request["params"] = params;
+ sendJsonMessage(request);
+ }
+} /* namespace NsMessageBroker */
diff --git a/src/3rd_party-static/message_broker/src/example/MessageBrokerControllerAVA.hpp b/src/3rd_party-static/message_broker/src/example/MessageBrokerControllerAVA.hpp new file mode 100644 index 0000000000..47e684a7e4 --- /dev/null +++ b/src/3rd_party-static/message_broker/src/example/MessageBrokerControllerAVA.hpp @@ -0,0 +1,68 @@ +/**
+ * \file MessageBrokerControllerAVA.hpp
+ * \brief MessageBroker Controller AVA.
+ * \author AKara
+ */
+
+#ifndef MB_CONTROLLERAVA_H
+#define MB_CONTROLLERAVA_H
+
+#include <iostream>
+
+#include "json/json.h"
+
+#include "mb_controller.hpp"
+
+/**
+ * \namespace NsMessageBroker
+ * \brief MessageBroker related functions.
+ */
+namespace NsMessageBroker
+{
+ /**
+ * \class CMessageBrokerControllerAVA
+ * \brief MessageBroker Controller.
+ */
+ class CMessageBrokerControllerAVA : public CMessageBrokerController
+ {
+ public:
+ /**
+ * \brief Constructor.
+ * \param address remote network address or FQDN
+ * \param port remote local port
+ */
+ CMessageBrokerControllerAVA(const std::string& address, uint16_t port);
+
+ /**
+ * \brief Destructor.
+ */
+ ~CMessageBrokerControllerAVA();
+
+ /**
+ * \brief process request.
+ * \param root JSON message.
+ */
+ void processRequest(Json::Value& root);
+
+ /**
+ * \brief process notification.
+ * \param root JSON message.
+ */
+ void processNotification(Json::Value& root);
+
+ /**
+ * \brief process response.
+ * \param method method name which has been called.
+ * \param root JSON message.
+ */
+ void processResponse(std::string method, Json::Value& root);
+
+ /**
+ * \brief sends message to the phone to make call.
+ * \param phoneNumber number for call.
+ */
+ void makeCall(std::string phoneNumber);
+
+ };
+}/* namespace NsMessageBroker */
+#endif /* MB_CONTROLLERAVA_H */
\ No newline at end of file diff --git a/src/3rd_party-static/message_broker/src/example/MessageBrokerControllerBackend.cpp b/src/3rd_party-static/message_broker/src/example/MessageBrokerControllerBackend.cpp new file mode 100644 index 0000000000..cbbe39492b --- /dev/null +++ b/src/3rd_party-static/message_broker/src/example/MessageBrokerControllerBackend.cpp @@ -0,0 +1,125 @@ +/**
+ * \file MessageBrokerControllerBackend.cpp
+ * \brief MessageBroker Controller for Backend.
+ * \author AKara
+ */
+
+#include "MessageBrokerControllerBackend.hpp"
+
+#include "MBDebugHelper.h"
+
+namespace NsMessageBroker
+{
+ CMessageBrokerControllerBackend::CMessageBrokerControllerBackend(const std::string& address, unsigned short port):
+CMessageBrokerController(address, port, std::string("Backend"))
+ {
+ }
+
+ CMessageBrokerControllerBackend::~CMessageBrokerControllerBackend()
+ {
+ }
+
+ void CMessageBrokerControllerBackend::processRequest(Json::Value& root)
+ {
+ DBG_MSG(("CMessageBrokerControllerBackend::processRequest()\n"));
+ if (getControllersName() == getDestinationComponentName(root))
+ {
+ Json::Value response;
+ response["jsonrpc"] = root["jsonrpc"];
+ response["id"] = root["id"];
+ if ("isFirstStart" == getMethodName(root))
+ {
+ isFirstStart(response);
+ } else if ("isFullScreen" == getMethodName(root))
+ {
+ isFullScreen(response);
+ } else if ("getWindowSize" == getMethodName(root))
+ {
+ getWindowSize(response);
+ } else if ("getWindowDensity" == getMethodName(root))
+ {
+ getWindowDensity(response);
+ } else if ("getOSInfo" == getMethodName(root))
+ {
+ getOSInfo(response);
+ } else if ("logToOS" == getMethodName(root))
+ {
+ logToOS(response);
+ } else
+ {
+ DBG_MSG_ERROR(("Method has not been found!\n"));
+ Json::Value err;
+ err["code"] = NsMessageBroker::INVALID_REQUEST;
+ err["message"] = "Method has not been found.";
+ response["error"] = err;
+ }
+ sendJsonMessage(response);
+ } else
+ {
+ DBG_MSG_ERROR(("Wrong message destination!\n"));
+ }
+ }
+
+ void CMessageBrokerControllerBackend::processNotification(Json::Value& root)
+ {
+ DBG_MSG(("CMessageBrokerControllerBackend::processNotification()\n"));
+ root=root;//to prevent compiler warning
+ }
+
+ void CMessageBrokerControllerBackend::processResponse(std::string method, Json::Value& root)
+ {
+ DBG_MSG(("CMessageBrokerControllerBackend::processResponse()\n"));
+ root=root;//to prevent compiler warning
+ method=method;//to prevent compiler warning
+ }
+
+ void CMessageBrokerControllerBackend::isFirstStart(Json::Value& response)
+ {
+ DBG_MSG(("CMessageBrokerControllerBackend::isFirstStart()\n"));
+ Json::Value res;
+ res["isFirstStart"] = false;
+ response["result"] = res;
+ }
+
+ void CMessageBrokerControllerBackend::isFullScreen(Json::Value& response)
+ {
+ DBG_MSG(("CMessageBrokerControllerBackend::isFullScreen()\n"));
+ Json::Value res;
+ res["isFullScreen"] = false;
+ response["result"] = res;
+ }
+
+ void CMessageBrokerControllerBackend::getWindowSize(Json::Value& response)
+ {
+ DBG_MSG(("CMessageBrokerControllerBackend::getWindowSize()\n"));
+ Json::Value res;
+ res["width"] = 800;
+ res["height"] = 480;
+ response["result"] = res;
+ }
+
+ void CMessageBrokerControllerBackend::getWindowDensity(Json::Value& response)
+ {
+ DBG_MSG(("CMessageBrokerControllerBackend::getWindowDensity()\n"));
+ Json::Value res;
+ res["windowDensity"] = 1;
+ response["result"] = res;
+ }
+
+ void CMessageBrokerControllerBackend::getOSInfo(Json::Value& response)
+ {
+ DBG_MSG(("CMessageBrokerControllerBackend::getOSInfo()\n"));
+ Json::Value res;
+ res["osType"] = "";
+ res["osVersion"] = "";
+ res["isNativeApplication"] = true;
+ response["result"] = res;
+ }
+
+ void CMessageBrokerControllerBackend::logToOS(Json::Value& response)
+ {
+ DBG_MSG(("CMessageBrokerControllerBackend::logToOS()\n"));
+ response["result"] = "";
+ }
+
+} /* namespace NsMessageBroker */
diff --git a/src/3rd_party-static/message_broker/src/example/MessageBrokerControllerBackend.hpp b/src/3rd_party-static/message_broker/src/example/MessageBrokerControllerBackend.hpp new file mode 100644 index 0000000000..b79411dc98 --- /dev/null +++ b/src/3rd_party-static/message_broker/src/example/MessageBrokerControllerBackend.hpp @@ -0,0 +1,96 @@ +/**
+ * \file MessageBrokerControllerBackend.hpp
+ * \brief MessageBroker Controller Backend.
+ * \author AKara
+ */
+#pragma once
+
+#include <iostream>
+
+#include "json/json.h"
+
+#include "mb_controller.hpp"
+#include "CMessageBroker.hpp"
+
+/**
+ * \namespace NsMessageBroker
+ * \brief MessageBroker related functions.
+ */
+namespace NsMessageBroker
+{
+ /**
+ * \class CMessageBrokerControllerBackend
+ * \brief MessageBroker Controller.
+ */
+
+ class CMessageBrokerControllerBackend : public CMessageBrokerController
+ {
+ public:
+ /**
+ * \brief Constructor.
+ * \param address remote network address or FQDN
+ * \param port remote local port
+ */
+ CMessageBrokerControllerBackend(const std::string& address, unsigned short port);
+
+ /**
+ * \brief Destructor.
+ */
+ ~CMessageBrokerControllerBackend();
+
+ /**
+ * \brief process request.
+ * \param root JSON message.
+ */
+ void processRequest(Json::Value& root);
+
+ /**
+ * \brief process notification.
+ * \param root JSON message.
+ */
+ void processNotification(Json::Value& root);
+
+ /**
+ * \brief process response.
+ * \param method method name which has been called.
+ * \param root JSON message.
+ */
+ void processResponse(std::string method, Json::Value& root);
+ private:
+ /**
+ * \brief Checks first start.
+ * \param response container for response
+ */
+ void isFirstStart(Json::Value& response);
+
+ /**
+ * \brief Checks first start.
+ * \param response container for response
+ */
+ void isFullScreen(Json::Value& response);
+
+ /**
+ * \brief Checks first start.
+ * \param response container for response
+ */
+ void getWindowSize(Json::Value& response);
+
+ /**
+ * \brief Checks first start.
+ * \param response container for response
+ */
+ void getWindowDensity(Json::Value& response);
+
+ /**
+ * \brief Checks first start.
+ * \param response container for response
+ */
+ void getOSInfo(Json::Value& response);
+
+ /**
+ * \brief Checks first start.
+ * \param response container for response
+ */
+ void logToOS(Json::Value& response);
+ };
+}/* namespace NsMessageBroker */
diff --git a/src/3rd_party-static/message_broker/src/example/MessageBrokerControllerPhone.cpp b/src/3rd_party-static/message_broker/src/example/MessageBrokerControllerPhone.cpp new file mode 100644 index 0000000000..6f308e71c2 --- /dev/null +++ b/src/3rd_party-static/message_broker/src/example/MessageBrokerControllerPhone.cpp @@ -0,0 +1,191 @@ +/**
+ * \file MessageBrokerControllerPhone.cpp
+ * \brief MessageBroker Controller for Phone.
+ * \author AKara
+ */
+
+#include "MessageBrokerControllerPhone.hpp"
+
+#include "CMessageBroker.hpp"
+
+#include "MBDebugHelper.h"
+
+namespace NsMessageBroker
+{
+ CMessageBrokerControllerPhone::CMessageBrokerControllerPhone(const std::string& address, uint16_t port):
+ CMessageBrokerController(address, port, "Phone")
+ {
+ }
+
+ CMessageBrokerControllerPhone::~CMessageBrokerControllerPhone()
+ {
+ }
+
+ void CMessageBrokerControllerPhone::processRequest(Json::Value& root)
+ {
+ DBG_MSG(("CMessageBrokerControllerPhone::processRequest()\n"));
+ if (getControllersName() == getDestinationComponentName(root))
+ {
+ Json::Value response;
+ response["jsonrpc"] = root["jsonrpc"];
+ response["id"] = root["id"];
+ if ("makeCall" == getMethodName(root))
+ {
+ if (root.isMember("params"))
+ {
+ Json::Value params = root["params"];
+ if (params.isMember("phoneNumber") && params["phoneNumber"].isString())
+ {
+ makeCall(params["phoneNumber"].asString(), response);
+ } else
+ {
+ DBG_MSG_ERROR(("Wrong params!\n"));
+ prepareErrorMessage(NsMessageBroker::INVALID_PARAMS, "Wrong params!", response);
+ }
+ } else
+ {
+ DBG_MSG_ERROR(("Not possible to parse phone number!\n"));
+ prepareErrorMessage(NsMessageBroker::INVALID_PARAMS, "Not possible to parse phone number!", response);
+ }
+ } else if ("endCall" == getMethodName(root))
+ {
+ endCall(root["params"].asString(), response);
+ } else if ("holdCall" == getMethodName(root))
+ {
+ holdCall(root["params"].asString(), response);
+ } else if ("getContacts" == getMethodName(root))
+ {
+ if (root.isMember("params"))
+ {
+ Json::Value params = root["params"];
+ if (params.isMember("firstLetter") && params["firstLetter"].isString()
+ && params.isMember("offset") && params["offset"].isInt()
+ && params.isMember("numberOfItems") && params["numberOfItems"].isInt())
+ {
+ getContacts(params["firstLetter"].asString(), params["offset"].asInt(), params["numberOfItems"].asInt(), response);
+ } else
+ {
+ DBG_MSG_ERROR(("Wrong params of getContacts()!\n"));
+ prepareErrorMessage(NsMessageBroker::INVALID_PARAMS, "Wrong params of getContacts()!", response);
+ }
+ } else
+ {
+ DBG_MSG_ERROR(("Params is not an object!\n"));
+ prepareErrorMessage(NsMessageBroker::INVALID_PARAMS, "Params is not an object!", response);
+ }
+ } else if ("getHistory" == getMethodName(root))
+ {
+ if (root.isMember("params"))
+ {
+ Json::Value params = root["params"];
+ if (params.isMember("typeOfContacts") && params["typeOfContacts"].isString()
+ && params.isMember("offset") && params["offset"].isInt()
+ && params.isMember("numberOfItems") && params["numberOfItems"].isInt())
+ {
+ getHistory(params["typeOfContacts"].asString(), params["offset"].asInt(), params["numberOfItems"].asInt(), response);
+ } else
+ {
+ DBG_MSG_ERROR(("Wrong params of getHistory()!\n"));
+ prepareErrorMessage(NsMessageBroker::INVALID_PARAMS, "Wrong params of getHistory()!", response);
+ }
+ } else
+ {
+ DBG_MSG_ERROR(("Params is not an object!\n"));
+ prepareErrorMessage(NsMessageBroker::INVALID_PARAMS, "Params is not an object!", response);
+ }
+ } else
+ {
+ DBG_MSG_ERROR(("Method has not been found!\n"));
+ Json::Value err;
+ err["code"] = NsMessageBroker::INVALID_REQUEST;
+ err["message"] = "Method has not been found.";
+ response["error"] = err;
+ }
+ sendJsonMessage(response);
+ } else
+ {
+ DBG_MSG_ERROR(("Wrong message destination!\n"));
+ }
+ }
+
+ void CMessageBrokerControllerPhone::makeCall(std::string phoneNumber, Json::Value& response)
+ {
+ DBG_MSG(("CMessageBrokerControllerPhone::makeCall()\n"));
+ phoneNumber = phoneNumber; // to avoid compiler's warning
+ response["result"] = "OK";
+ }
+
+ void CMessageBrokerControllerPhone::endCall(std::string phoneNumber, Json::Value& response)
+ {
+ DBG_MSG(("CMessageBrokerControllerPhone::endCall()\n"));
+ phoneNumber = phoneNumber; // to avoid compiler's warning
+ response["result"] = "OK";
+ }
+
+ void CMessageBrokerControllerPhone::holdCall(std::string phoneNumber, Json::Value& response)
+ {
+ DBG_MSG(("CMessageBrokerControllerPhone::holdCall()\n"));
+ phoneNumber = phoneNumber; // to avoid compiler's warning
+ response["result"] = "OK";
+ }
+
+ void CMessageBrokerControllerPhone::getContacts(std::string firstLetter, int offset, int numberOfItems, Json::Value& response)
+ {
+ DBG_MSG(("CMessageBrokerControllerPhone::getContacts()\n"));
+ firstLetter = firstLetter; // to avoid compiler's warning
+ offset = offset; // to avoid compiler's warning
+ numberOfItems = numberOfItems; // to avoid compiler's warning
+ response["result"] = "OK";
+ }
+
+ void CMessageBrokerControllerPhone::getHistory(std::string firstLetter, int offset, int numberOfItems, Json::Value& response)
+ {
+ DBG_MSG(("CMessageBrokerControllerPhone::getHistory()\n"));
+ firstLetter = firstLetter; // to avoid compiler's warning
+ offset = offset; // to avoid compiler's warning
+ numberOfItems = numberOfItems; // to avoid compiler's warning
+ response["result"] = "OK";
+ }
+
+ void CMessageBrokerControllerPhone::onCallStatusChanged(int callStatus)
+ {
+ DBG_MSG(("CMessageBrokerControllerPhone::onCallStatusChanged()\n"));
+ Json::Value request, params;
+ request["jsonrpc"] = "2.0";
+ request["method"] = "Phone.onCallStatusChanged";
+ params["callStatus"] = callStatus;
+ request["params"] = params;
+ sendJsonMessage(request);
+ }
+
+ void CMessageBrokerControllerPhone::onContactsUpdated()
+ {
+ DBG_MSG(("CMessageBrokerControllerPhone::onContactsUpdated()\n"));
+ Json::Value request;
+ request["jsonrpc"] = "2.0";
+ request["method"] = "Phone.onContactsUpdated";
+ sendJsonMessage(request);
+ }
+
+ void CMessageBrokerControllerPhone::onHistoryUpdated()
+ {
+ DBG_MSG(("CMessageBrokerControllerPhone::onHistoryUpdated()\n"));
+ Json::Value request;
+ request["jsonrpc"] = "2.0";
+ request["method"] = "Phone.onHistoryUpdated";
+ sendJsonMessage(request);
+ }
+
+ void CMessageBrokerControllerPhone::processNotification(Json::Value& root)
+ {
+ DBG_MSG(("CMessageBrokerControllerPhone::processNotification()\n"));
+ root=root;//to prevent compiler warning
+ }
+
+ void CMessageBrokerControllerPhone::processResponse(std::string method, Json::Value& root)
+ {
+ DBG_MSG(("CMessageBrokerControllerPhone::processResponse()\n"));
+ root=root;//to prevent compiler warning
+ method = method;
+ }
+} /* namespace NsMessageBroker */
diff --git a/src/3rd_party-static/message_broker/src/example/MessageBrokerControllerPhone.hpp b/src/3rd_party-static/message_broker/src/example/MessageBrokerControllerPhone.hpp new file mode 100644 index 0000000000..e6f4898226 --- /dev/null +++ b/src/3rd_party-static/message_broker/src/example/MessageBrokerControllerPhone.hpp @@ -0,0 +1,117 @@ +/**
+ * \file MessageBrokerControllerPhone.hpp
+ * \brief MessageBroker Controller Phone.
+ * \author AKara
+ */
+
+#ifndef MB_CONTROLLERPHONE_H
+#define MB_CONTROLLERPHONE_H
+
+#include <iostream>
+
+#include "json/json.h"
+
+#include "mb_controller.hpp"
+
+/**
+ * \namespace NsMessageBroker
+ * \brief MessageBroker related functions.
+ */
+namespace NsMessageBroker
+{
+ /**
+ * \class CMessageBrokerControllerPhone
+ * \brief MessageBroker Controller Phone.
+ */
+ class CMessageBrokerControllerPhone : public CMessageBrokerController
+ {
+ public:
+ /**
+ * \brief Constructor.
+ * \param address remote network address or FQDN
+ * \param port remote local port
+ */
+ CMessageBrokerControllerPhone(const std::string& address, uint16_t port);
+
+ /**
+ * \brief Destructor.
+ */
+ ~CMessageBrokerControllerPhone();
+
+ /**
+ * \brief process request.
+ * \param root JSON message.
+ */
+ void processRequest(Json::Value& root);
+
+ /**
+ * \brief process notification.
+ * \param root JSON message.
+ */
+ void processNotification(Json::Value& root);
+
+ /**
+ * \brief process response.
+ * \param method method name which has been called.
+ * \param root JSON message.
+ */
+ void processResponse(std::string method, Json::Value& root);
+ public://Notifications
+ /**
+ * \brief Notifies Call Status changing.
+ * \param callStatus status of current active call
+ */
+ void onCallStatusChanged(int callStatus);
+
+ /**
+ * \brief Notifies Contacts list updated.
+ */
+ void onContactsUpdated();
+
+ /**
+ * \brief Notifies History list updated.
+ */
+ void onHistoryUpdated();
+ private:
+ /**
+ * \brief Makes call.
+ * \param phoneNumber number for call.
+ * \param response container for response
+ */
+ void makeCall(std::string phoneNumber, Json::Value& response);
+
+ /**
+ * \brief Ends call.
+ * \param phoneNumber number of call.
+ * \param response container for response
+ */
+ void endCall(std::string phoneNumber, Json::Value& response);
+
+ /**
+ * \brief Holds call.
+ * \param phoneNumber number of call.
+ * \param response container for response
+ */
+ void holdCall(std::string phoneNumber, Json::Value& response);
+
+ /**
+ * \brief Gets contacts.
+ * \param firstLetter first letter of list.
+ * \param offset offset from first item
+ * \param numberOfItems number of expected items
+ * \param response container for response
+ */
+ void getContacts(std::string firstLetter, int offset, int numberOfItems, Json::Value& response);
+
+ /**
+ * \brief Gets history.
+ * \param typeOfContacts type of contacts (incoming/outgoing/missed/all calls).
+ * \param offset offset from first item
+ * \param numberOfItems number of expected items
+ * \param response container for response
+ */
+ void getHistory(std::string typeOfContacts, int offset, int numberOfItems, Json::Value& response);
+
+ };
+}/* namespace NsMessageBroker */
+#endif /* MB_CONTROLLERPHONE_H */
\ No newline at end of file diff --git a/src/3rd_party-static/message_broker/src/example/MessageBrokerServer.cpp b/src/3rd_party-static/message_broker/src/example/MessageBrokerServer.cpp new file mode 100644 index 0000000000..405b3fcbd5 --- /dev/null +++ b/src/3rd_party-static/message_broker/src/example/MessageBrokerServer.cpp @@ -0,0 +1,245 @@ +/**
+ * \file MessageBrokerServer.cpp
+ * \brief MessageBrokerServer sources
+ * \author AKara
+ */
+
+#include <cstdio>
+#include <cstdlib>
+#include <csignal>
+#include <iostream>
+#include <ctime>
+
+#include "system.h"
+
+#include "MBDebugHelper.h"
+
+#include "mb_tcpserver.hpp"
+#include "mb_tcpclient.hpp"
+#include "CMessageBroker.hpp"
+
+#include "MessageBrokerControllerAVA.hpp"
+#include "MessageBrokerControllerPhone.hpp"
+#include "MessageBrokerControllerBackend.hpp"
+
+/**
+ * \brief Signal management.
+ * \param code signal code
+ */
+ static void signal_handler(int code)
+ {
+ switch(code)
+ {
+ case SIGINT:
+ case SIGTERM:
+ break;
+ default:
+ break;
+ }
+}
+
+/**
+ * \brief stores start time of test operation.
+ */
+int start;
+
+/**
+ * \brief Entry point of the program.
+ * \param argc number of argument
+ * \param argv array of arguments
+ * \return EXIT_SUCCESS or EXIT_FAILURE
+ */
+int main(int argc, char** argv)
+{
+ NsMessageBroker::CMessageBroker* mpMessageBroker = NsMessageBroker::CMessageBroker::getInstance();
+ if (!mpMessageBroker)
+ {
+ DBG_MSG_ERROR(("NULL pointer\n"));
+ exit(EXIT_FAILURE);
+ }
+
+
+ NsMessageBroker::TcpServer server(std::string("127.0.0.1"), 8086, mpMessageBroker);
+
+ DBG_MSG(("Start MessageBroker component\n"));
+ mpMessageBroker->startMessageBroker(&server);
+
+ NsMessageBroker::CMessageBrokerControllerAVA tcpControllerAVA(std::string("127.0.0.1"), 8086);
+ NsMessageBroker::CMessageBrokerControllerPhone tcpControllerPhone(std::string("127.0.0.1"), 8086);
+ NsMessageBroker::CMessageBrokerControllerBackend tcpControllerBackend(std::string("127.0.0.1"), 8086);
+
+ /* avoid compilation warnings */
+ argc = argc;
+ argv = argv;
+
+ if(!networking::init())
+ {
+ DBG_MSG_ERROR(("Networking initialization failed!\n"));
+
+ }
+
+ if(signal(SIGTERM, signal_handler) == SIG_ERR)
+ {
+ DBG_MSG_ERROR(("Error signal SIGTERM will not be handled!\n"));
+ }
+
+ if(signal(SIGINT, signal_handler) == SIG_ERR)
+ {
+ DBG_MSG_ERROR(("Error signal SIGINT will not be handled!\n"));
+ }
+
+ if(!server.Bind())
+ {
+ DBG_MSG_ERROR(("Bind failed!\n"));
+ exit(EXIT_FAILURE);
+ } else
+ {
+ DBG_MSG(("Bind successful!\n"));
+ }
+
+ if(!server.Listen())
+ {
+ DBG_MSG_ERROR(("Listen failed!\n"));
+ exit(EXIT_FAILURE);
+ } else
+ {
+ DBG_MSG(("Listen successful!\n"));
+ }
+
+ if(!tcpControllerAVA.Connect())
+ {
+ DBG_MSG_ERROR(("Cannot connect to remote peer!\n"));
+ exit(EXIT_FAILURE);
+ } else
+ {
+ DBG_MSG(("ClientAVA connected to the server! SocketID = %d\n", tcpControllerAVA.GetSocket()));
+ }
+
+ if(!tcpControllerPhone.Connect())
+ {
+ DBG_MSG_ERROR(("Cannot connect to remote peer!\n"));
+ exit(EXIT_FAILURE);
+ } else
+ {
+ DBG_MSG(("ClientPhone connected to the server! SocketID = %d\n",tcpControllerPhone.GetSocket()));
+ }
+
+ if(!tcpControllerBackend.Connect())
+ {
+ DBG_MSG_ERROR(("Cannot connect to remote peer!\n"));
+ exit(EXIT_FAILURE);
+ } else
+ {
+ DBG_MSG(("ClientBackend connected to the server! SocketID = %d\n",tcpControllerBackend.GetSocket()));
+ }
+
+ DBG_MSG(("Start CMessageBroker thread!\n"));
+ System::Thread th1(new System::ThreadArgImpl<NsMessageBroker::CMessageBroker>(*mpMessageBroker, &NsMessageBroker::CMessageBroker::MethodForThread, NULL));
+ th1.Start(false);
+
+ DBG_MSG(("Start MessageBroker TCP server thread!\n"));
+ System::Thread th2(new System::ThreadArgImpl<NsMessageBroker::TcpServer>(server, &NsMessageBroker::TcpServer::MethodForThread, NULL));
+ th2.Start(false);
+
+ DBG_MSG(("Start tcpControllerAVA receiver thread!\n"));
+ System::Thread th3(new System::ThreadArgImpl<NsMessageBroker::CMessageBrokerControllerAVA>(tcpControllerAVA, &NsMessageBroker::CMessageBrokerControllerAVA::MethodForReceiverThread, NULL));
+ th3.Start(false);
+
+ DBG_MSG(("Start tcpControllerPhone receiver thread!\n"));
+ System::Thread th4(new System::ThreadArgImpl<NsMessageBroker::CMessageBrokerControllerPhone>(tcpControllerPhone, &NsMessageBroker::CMessageBrokerControllerPhone::MethodForReceiverThread, NULL));
+ th4.Start(false);
+
+ DBG_MSG(("Start tcpControllerBackend receiver thread!\n"));
+ System::Thread th5(new System::ThreadArgImpl<NsMessageBroker::CMessageBrokerControllerBackend>(tcpControllerBackend, &NsMessageBroker::CMessageBrokerControllerBackend::MethodForReceiverThread, NULL));
+ th5.Start(false);
+
+ bool loop = true;
+ while(loop)
+ {
+ DBG_MSG(("Enter command code:\n"));
+ int i;
+ std::cin >> i;
+ switch(i)
+ {
+ case 0:
+ {
+ DBG_MSG(("Exit!\n"));
+ mpMessageBroker->stopMessageBroker();
+ th1.Stop();
+ th2.Stop();
+ th3.Stop();
+ th4.Stop();
+ server.Close();
+ loop = false;
+ break;
+ }
+ case 1:// register component
+ {
+ DBG_MSG(("tcpControllerAVA.register()\n"));
+ tcpControllerAVA.registerController(0);
+ DBG_MSG(("tcpControllerPhone.register()\n"));
+ tcpControllerPhone.registerController(1);
+ DBG_MSG(("tcpControllerBackend.register()\n"));
+ tcpControllerBackend.registerController(2);
+ break;
+ }
+ case 2:// external message
+ {
+ DBG_MSG(("tcpControllerAVA.Phone.Call()\n"));
+ start = GetTickCount();
+ tcpControllerAVA.makeCall("+380677639550");
+ break;
+ }
+ case 3://subscribe
+ {
+ DBG_MSG(("tcpControllerAVA.subscribeTo()\n"));
+ tcpControllerAVA.subscribeTo("Phone.onContactsUpdated");
+ break;
+ }
+ case 4://notify
+ {
+ DBG_MSG(("tcpControllerPhone.onContactsUpdated()\n"));
+ tcpControllerPhone.onContactsUpdated();
+ break;
+ }
+ case 5: //unsubscribe
+ {
+ DBG_MSG(("tcpControllerAVA.unsubscribeFrom()\n"));
+ tcpControllerAVA.unsubscribeFrom("Phone.onContactsUpdated");
+ break;
+ }
+ case 6: //unregister
+ {
+ DBG_MSG(("tcpControllerPhone.unregister()\n"));
+ tcpControllerPhone.unregisterController();
+ break;
+ }
+ case 7: //stress test
+ {
+
+ DBG_MSG(("tcpControllerAVA.Phone.Call() 500 times\n"));
+ start = GetTickCount();
+ for (int c =0; c<1000; c++)
+ {
+ tcpControllerAVA.makeCall("+380677639550");
+ }
+ int stop = GetTickCount();
+ int diff = stop - start;
+ printf("Requests execution time is %d ms!\n", diff);
+ break;
+ }
+ case 8: //parser test
+ {
+ DBG_MSG(("Parser test\n"));
+ mpMessageBroker->Test();
+ break;
+ }
+ default:
+ {
+ DBG_MSG(("Entered: %d\n", i));
+ break;
+ }
+ }
+ }
+ return EXIT_SUCCESS;
+}
\ No newline at end of file diff --git a/src/3rd_party-static/message_broker/src/example/MessageBrokerServer.hpp b/src/3rd_party-static/message_broker/src/example/MessageBrokerServer.hpp new file mode 100644 index 0000000000..884ed0309f --- /dev/null +++ b/src/3rd_party-static/message_broker/src/example/MessageBrokerServer.hpp @@ -0,0 +1,19 @@ +/**
+ * \file MessageBrokerServer.hpp
+ * \brief MessageBrokerServer header
+ * \author AKara
+ */
+
+#ifndef MESSAGEBROKERSERVER_H
+#define MESSAGEBROKERSERVER_H
+
+/**
+ * \namespace NsMessageBroker
+ * \brief MessageBroker related functions.
+ */
+namespace NsMessageBroker
+{
+
+} /* namespace NsMessageBroker */
+
+#endif /* MESSAGEBROKERSERVER_H */
diff --git a/src/3rd_party-static/message_broker/src/lib_messagebroker/CMessageBroker.cpp b/src/3rd_party-static/message_broker/src/lib_messagebroker/CMessageBroker.cpp new file mode 100644 index 0000000000..3b14489a8c --- /dev/null +++ b/src/3rd_party-static/message_broker/src/lib_messagebroker/CMessageBroker.cpp @@ -0,0 +1,958 @@ +/** + * \file CMessageBroker.cpp + * \brief CMessageBroker singletone class implementation. + * \author AKara + */ + +#include <cassert> +#include <stdio.h> +#include <vector> + +#include <string> + +#include "CMessageBroker.hpp" +#include "CMessageBrokerRegistry.hpp" + +#include "system.h" + +#include "json/json.h" + +#include "libMBDebugHelper.h" + +namespace NsMessageBroker { +/** + * \class CMessage + * \brief CMessage class implementation. + */ +class CMessage { + public: + /** + * \brief Constructor. + */ + CMessage(int aSenderFp, Json::Value aMessage) { + mSenderFd = aSenderFp; + mMessage = aMessage; + } + + /** + * \brief Destructor. + */ + ~CMessage() { + } + + /** + * \brief getter for Json::Value message. + * \return Json::Value message. + */ + Json::Value getMessage() const { + return mMessage; + } + + /** + * \brief getter for sender FileDescriptor. + * \return sender FileDescriptor. + */ + int getSenderFd() const { + return mSenderFd; + } + private: + /** + * \brief sender FileDescriptor. + */ + int mSenderFd; + + /** + * \brief Json::Value message. + */ + Json::Value mMessage; +}; + + +class CMessageBroker_Private { + public: + /** + * \brief Constructor. + */ + CMessageBroker_Private(); + + /** + * \brief Check if que empty (Thread safe). + * \return True when empty. + */ + bool isEventQueueEmpty(); + + /** + * \brief Pop message from que (Thread safe). + * \return Pointer to CMessage. + */ + CMessage* popMessage(); + + /** + * \brief Push message to que (Thread safe). + * \param pMessage pointer to new CMessage object. + */ + void pushMessage(CMessage* pMessage); + + /** + * \brief gets destination component name. + * \param pMessage JSON message. + * \return string destination component name. + */ + std::string getDestinationComponentName(CMessage* pMessage); + + /** + * \brief gets method name. + * \param pMessage JSON message. + * \return string method name. + */ + std::string getMethodName(CMessage* pMessage); + + /** + * \brief checks is message notification or not. + * \param pMessage JSON message. + * \return true if notification. + */ + bool isNotification(CMessage* pMessage); + + /** + * \brief checks is message response or not. + * \param pMessage JSON message. + * \return true if response. + */ + bool isResponse(CMessage* pMessage); + + /** + * \brief checks message. + * \param pMessage JSON message. + * \param error JSON message to fill in case of any errors. + * \return true if message is good. + */ + bool checkMessage(CMessage* pMessage, Json::Value& error); + + /** + * \brief Process internal MessageBrocker message + * + * \brief Register controller in MessageBroker. + * Use following JSON command to register new component: + * \code + * {"jsonrpc": "2.0", "method": "MB.registerComponent", "params": "<ComponentName>"} + * \endcode + * + * \brief Unregister controller in MessageBroker. + * Use following JSON command to unregister component: + * \code + * {"jsonrpc": "2.0", "method": "MB.unregisterComponent", "params": "<ComponentName>"} + * \endcode + * + * \brief Subscribe controller on property change. + * Use following JSON command to subscribe to notifications: + * \code + * {"jsonrpc": "2.0", "method": "MB.subscribeTo", "params": "<ComponentName>.<NotificationName>"} + * \endcode + * + * \brief Unsubscribe controller from property change. + * Use following JSON command to unsubscribe from notifications: + * \code + * {"jsonrpc": "2.0", "method": "MB.unsubscribeFrom", "params": "<ComponentName>.<NotificationName>"} + * \endcode + * + * \param pMessage JSON message. + */ + void processInternalMessage(CMessage* pMessage); + + /** + * \brief process external message. + * \param pMessage JSON message. + */ + void processExternalMessage(CMessage* pMessage); + + /** + * \brief process response. + * \param pMessage JSON message. + */ + void processResponse(CMessage* pMessage); + + /** + * \brief Process notification message. + * \brief Notify subscribers about property change. + * expected notification format example: + * \code + * {"jsonrpc": "2.0", "method": "<ComponentName>.<NotificationName>", "params": <list of params>} + * \endcode + * \param pMessage JSON message. + */ + void processNotification(CMessage* pMessage); + + /** + * \brief send error message. + * \param pMessage JSON message. + */ + void processError(CMessage* pMessage); + + /** + * \brief send Json message. + * \param fd FileDescriptor of socket. + * \param message JSON message. + */ + void sendJsonMessage(int fd, Json::Value message); + + /** + * \brief push message to wait response que. + * \param pMessage JSON message. + */ + void pushMessageToWaitQue(CMessage* pMessage); + + /** + * \brief Returns start position for Id's generator of controller. + * \return start position for Id's generator of controller (1000 id's). + */ + int getNextControllerIdDiapason() { + return 1000 * mControllersIdCounter++; + } + + /** + * \brief pop message from wait response que. + * \param pMessage JSON message. + */ + int popMessageFromWaitQue(CMessage* pMessage); + + /** + * \brief Tries to remove the parsed part of the buffer + * \param root Parsed JSON value + * \param aJSONData The string buffer + * \return true on success, false on failure + */ + bool cutParsedJSON(const Json::Value& root, std::string& aJSONData); + + /** + * \brief Finds the position just after a JSON object or array in a buffer + * \param isObject Must be true for object, false for array + * \param aJSONData The string buffer + * \return The position in the buffer after the object or array on success, + * std::strin::npos on failure + */ + size_t jumpOverJSONObjectOrArray(bool isObject, const std::string& aJSONData); + + /** + * \brief Finds the position just after a JSON string in a buffer + * \param aJSONData The string buffer + * \return The position in the buffer after the string on success, + * std::strin::npos on failure + */ + size_t jumpOverJSONString(const std::string& aJSONData); + + /** + * \brief Que of messages. + */ + std::deque<CMessage*> mMessagesQueue; + + /** + * \brief Counter of messages Id's diapason for the next controllers + * From mControllersIdCounter*1000 to mControllersIdCounter*1000+999. + */ + int mControllersIdCounter; + + /** + * \brief Que of messages which are waiting the response in format: MessageId:SenderFd. + */ + std::map<int, int> mWaitResponseQueue; + + /** + * \brief Pointer to sender. + */ + CSender* mpSender; + + /** + * \brief Pointer to registry. + */ + CMessageBrokerRegistry* mpRegistry; + + /** + * \brief JSON reader. + */ + Json::Reader m_reader; + + /** + * \brief JSON writer. + */ + Json::FastWriter m_writer; + + /** + * \brief JSON writer for receiver. + */ + Json::FastWriter m_recieverWriter; + + /** + * \brief Messages que mutex. + */ + System::Mutex mMessagesQueueMutex; + + /** + * \brief Binary semaphore that is used to notify the + * messaging thread that a new message is available. + */ + System::BinarySemaphore m_messageQueueSemaphore; +}; + +CMessageBroker_Private::CMessageBroker_Private() : + mControllersIdCounter(1), + mpSender(NULL) { + mpRegistry = CMessageBrokerRegistry::getInstance(); +} + + +CMessageBroker::CMessageBroker() : + p(new CMessageBroker_Private()) { +} + +CMessageBroker::~CMessageBroker() { + delete p, p = 0; +} + +CMessageBroker* CMessageBroker::getInstance() { + static CMessageBroker instance; + return &instance; +} + + +size_t CMessageBroker_Private::jumpOverJSONObjectOrArray(bool isObject, + const std::string& aJSONData) { + const char openBracket = isObject? '{' : '['; + const char closeBracket = isObject? '}' : ']'; + int open_minus_close_brackets(1); + size_t position = aJSONData.find(openBracket); // Find the beginning of the object + + while ((position != std::string::npos) && (open_minus_close_brackets > 0)) { + position = aJSONData.find_first_of(std::string("\"")+openBracket+closeBracket, + position+1); + if (std::string::npos == position) { + break; + } + if ('"' == aJSONData[position]) { + // Ignore string interior, which might contain brackets and escaped "-s + do { + position = aJSONData.find('"', position+1); // Find the closing quote + } while ((std::string::npos != position) && ('\\' == aJSONData[position-1])); + } else if (openBracket == aJSONData[position]) { + ++open_minus_close_brackets; + } else if (closeBracket == aJSONData[position]) { + --open_minus_close_brackets; + } + } + + if ((0 == open_minus_close_brackets) && (std::string::npos != position)) { + ++position; // Move after the closing bracket + } else { + position = std::string::npos; + } + + return position; +} + + +size_t CMessageBroker_Private::jumpOverJSONString(const std::string& aJSONData) { + size_t position = aJSONData.find('"'); // Find the beginning of the string + + do { + position = aJSONData.find('"', position+1); // Find the closing quote + } while ((std::string::npos != position) && ('\\' == aJSONData[position-1])); + + if (std::string::npos != position) { + ++position; // Move after the closing quote + } + + return position; +} + + +bool CMessageBroker_Private::cutParsedJSON(const Json::Value& root, + std::string& aJSONData) { + if (root.isNull() || aJSONData.empty()) { + DBG_MSG_ERROR(("JSON is null or the buffer is empty!\n")); + return false; + } + + std::string parsed_json_str = m_recieverWriter.write(root); + DBG_MSG(("Parsed JSON string: '%s'\n", parsed_json_str.c_str())); + + // Trim front spaces (if any) + const size_t nonempty_position = aJSONData.find_first_not_of(" \t\n\v\f\r"); + aJSONData.erase(0, nonempty_position); + if (std::string::npos == nonempty_position) { + DBG_MSG_ERROR(("Buffer contains only blanks!\n")); + return false; + } + + // JSON writer puts '\n' at the end. Remove it. + const size_t final_lf_pos = parsed_json_str.rfind('\n'); + if (final_lf_pos == parsed_json_str.length()-1) { + parsed_json_str.erase(final_lf_pos, 1); + } + + /* RFC 4627: "A JSON value MUST be an object, array, number, or string, or + * one of the following three literal names: false null true" + * So we will try to find the borders of the parsed part based on its type. */ + + size_t position(std::string::npos); + + if (0 == aJSONData.find(parsed_json_str)) { + // If by chance parsed JSON is the same in the buffer and is at the beginning + position = parsed_json_str.length(); + } else if (root.isObject() || root.isArray()) { + position = jumpOverJSONObjectOrArray(root.isObject(), aJSONData); + } else if (root.isString()) { + position = jumpOverJSONString(aJSONData); + } else if (root.isNumeric()) { + position = aJSONData.find_first_not_of("+-0123456789.eE"); + } else if (root.isBool() || ("null" == parsed_json_str)) { + position = aJSONData.find(parsed_json_str); + if (std::string::npos != position) { + position += parsed_json_str.length(); + } + } else { + DBG_MSG_ERROR(("Unknown JSON type!\n")); + } + + if (std::string::npos == position) { + DBG_MSG_ERROR(("Error finding JSON object boundaries!\n")); + /* This should not happen, because the string is already parsed as a + * valid JSON. If this happens then above code is wrong. It is better + * to assert() than just return here, because otherwise we may enter an + * endless cycle - fail to process one and the same message again and + * again. Or we may clear the buffer and return, but in this way we will + * loose the next messages, miss a bug here, and create another bug. */ + assert(std::string::npos != position); + return false; // For release version + } + + if ((position >= aJSONData.length()) || + ((position == aJSONData.length()-1) && isspace(aJSONData[position]))) { + // No next object. Clear entire aJSONData. + aJSONData = ""; + } else { + // There is another object. Clear the current one. + aJSONData.erase(0, position); + } + + return true; +} + + +void CMessageBroker::onMessageReceived(int fd, std::string& aJSONData, bool tryHard) { + DBG_MSG(("CMessageBroker::onMessageReceived(%d, '%s')\n", fd, aJSONData.c_str())); + + while (! aJSONData.empty()) { + Json::Value root; + if ((! p->m_reader.parse(aJSONData, root)) || root.isNull()) { + DBG_MSG_ERROR(("Unable to parse JSON!")); + if (! tryHard) { + return; + } + uint8_t first_byte = static_cast<uint8_t>(aJSONData[0]); + if ((first_byte <= 0x08) || ((first_byte >= 0x80) && (first_byte <= 0x88))) { + DBG_MSG((" There is an unparsed websocket header probably.\n")); + /* Websocket headers can have FIN flag set in the first byte (0x80). + * Then there are 3 zero bits and 4 bits for opcode (from 0x00 to 0x0A). + * But actually we don't use opcodes above 0x08. + * Use this fact to distinguish websocket header from payload text data. + * It can be a coincidence of course, but we have to give it a try. */ + return; + } else if ('{' == aJSONData[0]) { + DBG_MSG_ERROR((" Incomplete JSON object probably.\n")); + return; + } else { + DBG_MSG_ERROR((" Step in the buffer and try again...\n")); + aJSONData.erase(0, 1); + DBG_MSG_ERROR(("Buffer after cut is: '%s'\n", aJSONData.c_str())); + continue; + } + + } else if (! root.isObject()) { + /* JSON RPC 2.0 messages are objects. Batch calls must be pre-rpocessed, + * so no need for "and !root.isArray()" */ + DBG_MSG_ERROR(("Parsed JSON is not an object!\n")); + if (! tryHard) { + return; + } + // Cut parsed data from the buffer below and continue + + } else if ((!root.isMember("jsonrpc")) || (root["jsonrpc"]!="2.0")) { + DBG_MSG_ERROR(("'jsonrpc' is not set correctly in parsed JSON!\n")); + if (! tryHard) { + return; + } + // Cut parsed object from the buffer below and continue + + } else { + // Parsing successful. Pass the message up. + p->pushMessage(new CMessage(fd, root)); + } + + p->cutParsedJSON(root, aJSONData); + + DBG_MSG(("Buffer after cut is: '%s'\n", aJSONData.c_str())); + } +} + +void CMessageBroker::Test() { + Json::Value root, err; + std::string ReceivingBuffer = + "{\"id\":0,\"jsonrpc\":\"2.0\",\"method\":\"MB.registerComponent\",\"params\":{\"componentName\":\"AVA\"}}123{\"id\":0,\"jsonrpc\":\"2.0\",\"method\":\"MB.registerComponent\",\"params\":{\"componentName\":\"AVA\"}}"; + DBG_MSG(("String is:%s\n", ReceivingBuffer.c_str())); + while (1) { + if (!p->m_reader.parse(ReceivingBuffer, root)) { + DBG_MSG_ERROR(("Received not JSON string! %s\n", ReceivingBuffer.c_str())); + return; + } + std::string wmes = p->m_recieverWriter.write(root); + DBG_MSG(("Parsed JSON string:%s; length: %d\n", wmes.c_str(), wmes.length())); + DBG_MSG(("Buffer is:%s\n", ReceivingBuffer.c_str())); + ssize_t beginpos = ReceivingBuffer.find(wmes); + ReceivingBuffer.erase(0, beginpos + wmes.length()); + DBG_MSG(("Buffer after cut is:%s\n", ReceivingBuffer.c_str())); + CMessage message(0, root); + if (p->checkMessage(&message, err)) { + //here put message to que + } else { + DBG_MSG_ERROR(("Wrong message:%s\n", wmes.c_str())); + } + } +} + +void CMessageBroker::OnSocketClosed(const int fd) { + DBG_MSG(("CMessageBroker::OnSocketClosed(%d)\n", fd)); + if (p->mpRegistry) { + p->mpRegistry->removeControllersByDescriptor(fd); + } +} + +void CMessageBroker::startMessageBroker(CSender* pSender) { + DBG_MSG(("CMessageBroker::startMessageBroker()\n")); + p->mpSender = pSender; +} + +void CMessageBroker::stopMessageBroker() { + p->mpSender = NULL; + DBG_MSG(("CMessageBroker::stopMessageBroker()\n")); +} + +CMessage* CMessageBroker_Private::popMessage() { + CMessage* ret = NULL; + DBG_MSG(("CMessageBroker::popMessage()\n")); + mMessagesQueueMutex.Lock(); + if (false == mMessagesQueue.empty()) { + ret = mMessagesQueue.front(); + mMessagesQueue.pop_front();// delete message from que + } else { + DBG_MSG(("Que is empty!\n")); + } + mMessagesQueueMutex.Unlock(); + return ret; +} + +void CMessageBroker_Private::pushMessage(CMessage* pMessage) { + DBG_MSG(("CMessageBroker::pushMessage()\n")); + mMessagesQueueMutex.Lock(); + if (pMessage) { + mMessagesQueue.push_back(pMessage); + } else { + DBG_MSG_ERROR(("NULL pointer!\n")); + } + mMessagesQueueMutex.Unlock(); + + m_messageQueueSemaphore.Notify(); +} + +bool CMessageBroker_Private::isEventQueueEmpty() { + bool bResult = true; + mMessagesQueueMutex.Lock(); + bResult = mMessagesQueue.empty(); + mMessagesQueueMutex.Unlock(); + return bResult; +} + +std::string CMessageBroker_Private::getDestinationComponentName(CMessage* pMessage) { + DBG_MSG(("CMessageBroker::getDestinationComponentName()\n")); + std::string ret = ""; + if (pMessage) { + Json::Value mes = pMessage->getMessage(); + std::string method = mes["method"].asString(); + int pos = method.find("."); + if (-1 != pos) { + ret = method.substr(0, pos); + } + DBG_MSG(("Destination component is: %s\n", ret.c_str())); + } else { + DBG_MSG_ERROR(("NULL pointer!\n")); + } + return ret; +} + +std::string CMessageBroker_Private::getMethodName(CMessage* pMessage) { + DBG_MSG(("CMessageBroker::getMethodName()\n")); + std::string ret = ""; + if (pMessage) { + Json::Value mes = pMessage->getMessage(); + std::string method = mes["method"].asString(); + int pos = method.find("."); + if (-1 != pos) { + ret = method.substr(pos + 1); + } + DBG_MSG(("Method is: %s\n", ret.c_str())); + } else { + DBG_MSG_ERROR(("NULL pointer!\n")); + } + return ret; +} + +bool CMessageBroker_Private::isNotification(CMessage* pMessage) { + DBG_MSG(("CMessageBroker::isNotification()\n")); + bool ret = false; + Json::Value mes = pMessage->getMessage(); + if (false == mes.isMember("id")) { + ret = true; + } + DBG_MSG(("Result: %d\n", ret)); + return ret; +} + +bool CMessageBroker_Private::isResponse(CMessage* pMessage) { + DBG_MSG(("CMessageBroker::isResponse()\n")); + bool ret = false; + Json::Value mes = pMessage->getMessage(); + if ((true == mes.isMember("result")) || (true == mes.isMember("error"))) { + ret = true; + } + DBG_MSG(("Result: %d\n", ret)); + return ret; +} + +void CMessageBroker_Private::pushMessageToWaitQue(CMessage* pMessage) { + DBG_MSG(("CMessageBroker::pushMessageToWaitQue()\n")); + if (pMessage) { + Json::Value root = pMessage->getMessage(); + mWaitResponseQueue.insert(std::map<int, int>::value_type(root["id"].asInt(), pMessage->getSenderFd())); + } else { + DBG_MSG_ERROR(("NULL pointer!\n")); + } +} + +int CMessageBroker_Private::popMessageFromWaitQue(CMessage* pMessage) { + DBG_MSG(("CMessageBroker::popMessageFromWaitQue()\n")); + int result = -1; + if (pMessage) { + Json::Value root = pMessage->getMessage(); + int messageId = root["id"].asInt(); + std::map <int, int>::iterator it; + it = mWaitResponseQueue.find(messageId); + if (it != mWaitResponseQueue.end()) { + result = (*it).second; + mWaitResponseQueue.erase(it); + } + } else { + DBG_MSG_ERROR(("NULL pointer!\n")); + } + DBG_MSG(("Senders Fd: %d\n", result)); + return result; +} + +void CMessageBroker_Private::processInternalMessage(CMessage* pMessage) { + DBG_MSG(("CMessageBroker::processInternalMessage()\n")); + if (pMessage) { + std::string amethodName = getMethodName(pMessage); + DBG_MSG(("Method: %s\n", amethodName.c_str())); + Json::Value root = pMessage->getMessage(); + if ("registerComponent" == amethodName) { + Json::Value params = root["params"]; + if (params.isMember("componentName") && params["componentName"].isString()) { + std::string controllerName = params["componentName"].asString(); + if (mpRegistry->addController(pMessage->getSenderFd(), controllerName)) { + Json::Value response; + response["id"] = root["id"]; + response["jsonrpc"] = "2.0"; + response["result"] = getNextControllerIdDiapason(); + sendJsonMessage(pMessage->getSenderFd(), response); + } else { + Json::Value error, err; + error["id"] = root["id"]; + error["jsonrpc"] = "2.0"; + err["code"] = CONTROLLER_EXISTS; + err["message"] = "Controller has been already registered."; + error["error"] = err; + processError(new CMessage(pMessage->getSenderFd(), error)); + } + } else { + Json::Value error, err; + error["id"] = root["id"]; + error["jsonrpc"] = "2.0"; + err["code"] = INVALID_REQUEST; + err["message"] = "Wrong method parameter."; + error["error"] = err; + processError(new CMessage(pMessage->getSenderFd(), error)); + } + } else if ("subscribeTo" == amethodName) { + Json::Value params = root["params"]; + if (params.isMember("propertyName") && params["propertyName"].isString()) { + std::string propertyName = params["propertyName"].asString(); + if (mpRegistry->addSubscriber(pMessage->getSenderFd(), propertyName)) { + Json::Value response; + response["id"] = root["id"]; + response["jsonrpc"] = "2.0"; + response["result"] = "OK"; + sendJsonMessage(pMessage->getSenderFd(), response); + } else { + Json::Value error, err; + error["id"] = root["id"]; + error["jsonrpc"] = "2.0"; + err["code"] = CONTROLLER_EXISTS; + err["message"] = "Subscribe has been already registered."; + error["error"] = err; + processError(new CMessage(pMessage->getSenderFd(), error)); + } + } else { + Json::Value error, err; + error["id"] = root["id"]; + error["jsonrpc"] = "2.0"; + err["code"] = INVALID_REQUEST; + err["message"] = "Wrong method parameter."; + error["error"] = err; + processError(new CMessage(pMessage->getSenderFd(), error)); + } + } else if ("unregisterComponent" == amethodName) { + Json::Value params = root["params"]; + if (params.isMember("componentName") && params["componentName"].isString()) { + std::string controllerName = params["componentName"].asString(); + mpRegistry->deleteController(controllerName); + Json::Value response; + response["id"] = root["id"]; + response["jsonrpc"] = "2.0"; + response["result"] = "OK"; + sendJsonMessage(pMessage->getSenderFd(), response); + } else { + Json::Value error, err; + error["id"] = root["id"]; + error["jsonrpc"] = "2.0"; + err["code"] = INVALID_REQUEST; + err["message"] = "Wrong method parameter."; + error["error"] = err; + processError(new CMessage(pMessage->getSenderFd(), error)); + } + } else if ("unsubscribeFrom" == amethodName) { + Json::Value params = root["params"]; + if (params.isMember("propertyName") && params["propertyName"].isString()) { + std::string propertyName = params["propertyName"].asString(); + mpRegistry->deleteSubscriber(pMessage->getSenderFd(), propertyName); + Json::Value response; + response["id"] = root["id"]; + response["jsonrpc"] = "2.0"; + response["result"] = "OK"; + sendJsonMessage(pMessage->getSenderFd(), response); + } else { + Json::Value error, err; + error["id"] = root["id"]; + error["jsonrpc"] = "2.0"; + err["code"] = INVALID_REQUEST; + err["message"] = "Wrong method parameter."; + error["error"] = err; + processError(new CMessage(pMessage->getSenderFd(), error)); + } + } else { + DBG_MSG(("Unknown method!\n")); + Json::Value error; + Json::Value err; + error["id"] = root["id"]; + error["jsonrpc"] = "2.0"; + err["code"] = INVALID_REQUEST; + err["message"] = "Invalid MessageBroker method."; + error["error"] = err; + processError(new CMessage(pMessage->getSenderFd(), error)); + } + } else { + DBG_MSG_ERROR(("NULL pointer!\n")); + } +} + +void CMessageBroker_Private::processExternalMessage(CMessage* pMessage) { + DBG_MSG(("CMessageBroker::processExternalMessage()\n")); + if (pMessage) { + std::string destComponentName = getDestinationComponentName(pMessage); + int destFd = mpRegistry->getDestinationFd(destComponentName); + Json::Value root = pMessage->getMessage(); + if (0 < destFd) { + sendJsonMessage(destFd, root); + pushMessageToWaitQue(pMessage); + } else { + // error, controller not found in the registry + DBG_MSG(("Unknown method!\n")); + Json::Value error; + Json::Value err; + Json::Value error_data; + error["id"] = root["id"]; + error["jsonrpc"] = "2.0"; + err["code"] = UNSUPPORTED_RESOURCE; + err["message"] = "Destination controller not found!"; + error_data["method"] = root["method"]; + err["data"] = error_data; + error["error"] = err; + processError(new CMessage(pMessage->getSenderFd(), error)); + } + } else { + DBG_MSG_ERROR(("NULL pointer\n")); + } +} + +void CMessageBroker_Private::processResponse(CMessage* pMessage) { + DBG_MSG(("CMessageBroker::processResponse()\n")); + if (pMessage) { + int senderFd = popMessageFromWaitQue(pMessage); + if (-1 != senderFd) { + sendJsonMessage(senderFd, pMessage->getMessage()); + } + } else { + DBG_MSG_ERROR(("NULL pointer\n")); + } +} + +void CMessageBroker_Private::processNotification(CMessage* pMessage) { + DBG_MSG(("CMessageBroker::processNotification()\n")); + if (pMessage) { + Json::Value root = pMessage->getMessage(); + std::string methodName = root["method"].asString(); + DBG_MSG(("Property: %s\n", methodName.c_str())); + std::vector<int> result; + int subscribersCount = mpRegistry->getSubscribersFd(methodName, result); + if (0 < subscribersCount) { + std::vector<int>::iterator it; + for (it = result.begin(); it != result.end(); it++) { + sendJsonMessage(*it, root); + } + } else { + DBG_MSG(("No subscribers for this property!\n")); + } + } else { + DBG_MSG_ERROR(("NULL pointer\n")); + } +} + +void CMessageBroker_Private::processError(CMessage* pMessage) { + DBG_MSG(("CMessageBroker::processError()\n")); + if (pMessage) { + sendJsonMessage(pMessage->getSenderFd(), pMessage->getMessage()); + delete pMessage;// delete CMessage object with error description!!! + } else { + DBG_MSG_ERROR(("NULL pointer\n")); + } +} + +void CMessageBroker_Private::sendJsonMessage(int fd, Json::Value message) { + DBG_MSG(("CMessageBroker::sendJsonMessage(%d)\n", fd)); + if (mpSender) { + std::string mes = m_writer.write(message); + int retVal = mpSender->Send(fd, mes); + if (retVal == -1) { + DBG_MSG_ERROR(("Message hasn't been sent!\n")); + return; + } + DBG_MSG(("Length:%d, Sent: %d bytes\n", mes.length(), retVal)); + } else { + DBG_MSG_ERROR(("mpSender NULL pointer\n")); + } +} + +void* CMessageBroker::MethodForThread(void* arg) { + arg = arg; // to avoid compiler warnings + while (1) { + while (!p->isEventQueueEmpty()) { + CMessage* message = p->popMessage(); + if (message) { + Json::Value error; + if (p->checkMessage(message, error)) { + if (p->isNotification(message)) { + DBG_MSG(("Message is notification!\n")); + p->processNotification(message); + } else if (p->isResponse(message)) { + DBG_MSG(("Message is response!\n")); + p->processResponse(message); + } else { + if ("MB" == p->getDestinationComponentName(message)) { + DBG_MSG(("Internal MessageBroker method!\n")); + p->processInternalMessage(message); + } else { + DBG_MSG(("Not MessageBroker method!\n")); + p->processExternalMessage(message); + } + } + } else { + DBG_MSG_ERROR(("Message contains wrong data!\n")); + CMessage* errMessage = new CMessage(message->getSenderFd(), error); + if (NULL != errMessage) { + p->processError(errMessage); + } else { + DBG_MSG_ERROR(("NULL pointer!\n")); + } + } + delete message;// delete message object + } + } + p->m_messageQueueSemaphore.Wait(); + } + + return NULL; +} + +bool CMessageBroker_Private::checkMessage(CMessage* pMessage, Json::Value& error) { + DBG_MSG(("CMessageBroker::checkMessage()\n")); + Json::Value root; + root = pMessage->getMessage(); + Json::Value err; + + /* check the JSON-RPC version => 2.0 */ + if (!root.isObject() || !root.isMember("jsonrpc") || root["jsonrpc"] != "2.0") { + error["id"] = Json::Value::null; + error["jsonrpc"] = "2.0"; + err["code"] = INVALID_REQUEST; + err["message"] = "Invalid JSON RPC version."; + error["error"] = err; + return false; + } + + /* Check the id of message */ + if (root.isMember("id") && (root["id"].isArray() || root["id"].isObject() || root["id"].isString())) { + error["id"] = Json::Value::null; + error["jsonrpc"] = "2.0"; + err["code"] = INVALID_REQUEST; + err["message"] = "Invalid ID of message."; + error["error"] = err; + return false; + } + + /* extract "method" attribute */ + if (root.isMember("method")) { + if (!root["method"].isString()) { + error["id"] = Json::Value::null; + error["jsonrpc"] = "2.0"; + err["code"] = INVALID_REQUEST; + err["message"] = "Invalid JSONRPC method."; + error["error"] = err; + return false; + } + /* Check the params is an object*/ + if (root.isMember("params") && !root["params"].isObject()) { + error["id"] = Json::Value::null; + error["jsonrpc"] = "2.0"; + err["code"] = INVALID_REQUEST; + err["message"] = "Invalid JSONRPC params."; + error["error"] = err; + return false; + } + } else if (!(root.isMember("result") || root.isMember("error"))) { + error["id"] = Json::Value::null; + error["jsonrpc"] = "2.0"; + err["code"] = INVALID_REQUEST; + err["message"] = "Unknwn message type."; + error["error"] = err; + return false; + } + return true; +} +} /* namespace NsMessageBroker */ diff --git a/src/3rd_party-static/message_broker/src/lib_messagebroker/CMessageBrokerRegistry.cpp b/src/3rd_party-static/message_broker/src/lib_messagebroker/CMessageBrokerRegistry.cpp new file mode 100644 index 0000000000..fb24d08f1c --- /dev/null +++ b/src/3rd_party-static/message_broker/src/lib_messagebroker/CMessageBrokerRegistry.cpp @@ -0,0 +1,191 @@ +/** + * \file CMessageBrokerRegistry.cpp + * \brief CMessageBrokerRegistry singletone class implementation. + * \author AKara + */ + +#include "CMessageBrokerRegistry.hpp" +#include "libMBDebugHelper.h" + +#include <vector> +#include <string> + +namespace NsMessageBroker +{ + CMessageBrokerRegistry::CMessageBrokerRegistry() + { + } + + CMessageBrokerRegistry::~CMessageBrokerRegistry() + { + } + + CMessageBrokerRegistry* CMessageBrokerRegistry::getInstance() + { + static CMessageBrokerRegistry instance; + return &instance; + } + + bool CMessageBrokerRegistry::addController(int fd, std::string name) + { + DBG_MSG(("CMessageBrokerRegistry::addController()\n")); + bool result = false; + std::map <std::string, int>::iterator it; + + sync_primitives::AutoLock lock(mControllersListLock); + it = mControllersList.find(name); + if (it == mControllersList.end()) + { + mControllersList.insert(std::map <std::string, int>::value_type(name, fd)); + result = true; + } else + { + DBG_MSG(("Controller already exists!\n")); + } + + DBG_MSG(("Count of controllers: %d\n", mControllersList.size())); + return result; + } + + void CMessageBrokerRegistry::deleteController(std::string name) + { + DBG_MSG(("CMessageBrokerRegistry::deleteController()\n")); + std::map <std::string, int>::iterator it; + + int fd; + { + sync_primitives::AutoLock lock(mControllersListLock); + it = mControllersList.find(name); + if (it != mControllersList.end()) + { + fd = it->second; + mControllersList.erase(it); + } else { + DBG_MSG(("No such controller in the list!\n")); + return; + } + DBG_MSG(("Count of controllers: %d\n", mControllersList.size())); + } + removeSubscribersByDescriptor(fd); + } + + void CMessageBrokerRegistry::removeControllersByDescriptor(const int fd) { + DBG_MSG(("CMessageBrokerRegistry::removeControllersByDescriptor(%d)\n", + fd)); + { + sync_primitives::AutoLock lock(mControllersListLock); + std::map <std::string, int>::iterator it = mControllersList.begin(); + for (; it != mControllersList.end();) { + if (it->second == fd) { + mControllersList.erase(it++); + } else { + ++it; + } + } + } + removeSubscribersByDescriptor(fd); + } + + void CMessageBrokerRegistry::removeSubscribersByDescriptor(const int fd) { + DBG_MSG(("CMessageBrokerRegistry::removeSubscribersByDescriptor(%d)\n", + fd)); + sync_primitives::AutoLock lock(mSubscribersListLock); + std::multimap <std::string, int>::iterator it_s = mSubscribersList.begin(); + for (; it_s !=mSubscribersList.end(); ) { + if (it_s->second == fd) { + mSubscribersList.erase(it_s++); + } else { + ++it_s; + } + } + } + + bool CMessageBrokerRegistry::addSubscriber(int fd, std::string name) + { + DBG_MSG(("CMessageBrokerRegistry::addSubscriber()\n")); + bool result = true; + + sync_primitives::AutoLock lock(mSubscribersListLock); + std::pair<std::multimap <std::string, int>::iterator, std::multimap <std::string, int>::iterator> p = mSubscribersList.equal_range(name); + if (p.first != p.second) + { + std::multimap <std::string, int>::iterator itr; + for (itr = p.first; itr != p.second; itr++) + { + if (fd == itr->second) + { + result = false; + DBG_MSG(("Subscriber already exists!\n")); + } + } + } + if (result) + { + mSubscribersList.insert(std::map <std::string, int>::value_type(name, fd)); + } + + DBG_MSG(("Count of subscribers: %d\n", mSubscribersList.size())); + return result; + } + + void CMessageBrokerRegistry::deleteSubscriber(int fd, std::string name) + { + DBG_MSG(("CMessageBrokerRegistry::deleteSubscriber()\n")); + + sync_primitives::AutoLock lock(mSubscribersListLock); + std::pair<std::multimap <std::string, int>::iterator, std::multimap <std::string, int>::iterator> p = mSubscribersList.equal_range(name); + if (p.first != p.second) { + std::multimap <std::string, int>::iterator itr; + for (itr = p.first; itr != p.second; ) { + DBG_MSG(("My for loop %s, %d", itr->first.c_str() ,itr->second)); + if (fd == itr->second) { + mSubscribersList.erase(itr++); + } else { + ++itr; + } + } + } + + DBG_MSG(("Count of subscribers: %d\n", mSubscribersList.size())); + } + + int CMessageBrokerRegistry::getDestinationFd(std::string name) + { + DBG_MSG(("CMessageBrokerRegistry::getDestinationFd()\n")); + int result = -1; + std::map <std::string, int>::iterator it; + + sync_primitives::AutoLock lock(mControllersListLock); + it = mControllersList.find(name); + if (it != mControllersList.end()) + { + result = it->second; + } + + DBG_MSG(("Controllers Fd: %d\n", result)); + return result; + } + + int CMessageBrokerRegistry::getSubscribersFd(std::string name, std::vector<int>& result) + { + DBG_MSG(("CMessageBrokerRegistry::getSubscribersFd()\n")); + int res = 0; + std::map <std::string, int>::iterator it; + + sync_primitives::AutoLock lock(mSubscribersListLock); + std::pair<std::multimap <std::string, int>::iterator, std::multimap <std::string, int>::iterator> p = mSubscribersList.equal_range(name); + if (p.first != p.second) + { + std::multimap <std::string, int>::iterator itr; + for (itr = p.first; itr != p.second; itr++) + { + result.push_back(itr->second); + DBG_MSG(("Controllers Fd: %d\n", itr->second)); + } + } + + res = result.size(); + DBG_MSG(("Result vector size: %d\n", res)); + return res; + } +} /* namespace NsMessageBroker */ diff --git a/src/3rd_party-static/message_broker/src/lib_messagebroker/libMBDebugHelper.h b/src/3rd_party-static/message_broker/src/lib_messagebroker/libMBDebugHelper.h new file mode 100644 index 0000000000..0d5260cdda --- /dev/null +++ b/src/3rd_party-static/message_broker/src/lib_messagebroker/libMBDebugHelper.h @@ -0,0 +1,43 @@ +/** + * \file libMBDebugHelper.h + * \brief DebugHelper. + * \author AKara + */ + +#ifndef MB_DEBUG_HELPER_H +#define MB_DEBUG_HELPER_H + +#include <cstdio> + +/** +* \def DEBUG_MB_ON +* \brief Switches on MessageBroker debug messages. +*/ +#ifdef DEBUG_MB_ON + +/** +* \def DBG_MSG +* \brief Debug message output with file name and line number. +* \param x formatted debug message. +* \return printf construction. +*/ +#define DBG_MSG(x) printf("%s:%d_lib ", __FILE__, __LINE__);\ + printf x + +/** + * \def DBG_MSG_ERROR + * \brief Debug ERROR message output with file name and line number. + * \param x formatted debug message. + * \return printf construction. + */ +#define DBG_MSG_ERROR(x) printf("ERROR!!! %s:%d_lib ", __FILE__, __LINE__);\ + printf x + +#else + +#define DBG_MSG(x) +#define DBG_MSG_ERROR(x) + +#endif + +#endif /*MB_DEBUG_HELPER_H*/ diff --git a/src/3rd_party-static/message_broker/src/lib_messagebroker/md5.cpp b/src/3rd_party-static/message_broker/src/lib_messagebroker/md5.cpp new file mode 100644 index 0000000000..b3e347d9d7 --- /dev/null +++ b/src/3rd_party-static/message_broker/src/lib_messagebroker/md5.cpp @@ -0,0 +1,373 @@ +/* MD5 + converted to C++ class by Frank Thilo (thilo@unix-ag.org) + for bzflag (http://www.bzflag.org) + + based on: + + md5.h and md5.c + reference implemantion of RFC 1321 + + Copyright (C) 1991-2, RSA Data Security, Inc. Created 1991. All + rights reserved. + + License to copy and use this software is granted provided that it + is identified as the "RSA Data Security, Inc. MD5 Message-Digest + Algorithm" in all material mentioning or referencing this software + or this function. + + License is also granted to make and use derivative works provided + that such works are identified as "derived from the RSA Data + Security, Inc. MD5 Message-Digest Algorithm" in all material + mentioning or referencing the derived work. + + RSA Data Security, Inc. makes no representations concerning either + the merchantability of this software or the suitability of this + software for any particular purpose. It is provided "as is" + without express or implied warranty of any kind. + + These notices must be retained in any copies of any part of this + documentation and/or software. + + */ + +/* interface header */ +#include "md5.h" + +/* system implementation headers */ +#include <stdio.h> +#include <memory.h> + +// Constants for MD5Transform routine. +#define S11 7 +#define S12 12 +#define S13 17 +#define S14 22 +#define S21 5 +#define S22 9 +#define S23 14 +#define S24 20 +#define S31 4 +#define S32 11 +#define S33 16 +#define S34 23 +#define S41 6 +#define S42 10 +#define S43 15 +#define S44 21 + +/////////////////////////////////////////////// + +// F, G, H and I are basic MD5 functions. +inline MD5::uint4 MD5::F(uint4 x, uint4 y, uint4 z) { + return (x&y) | (~x&z); +} + +inline MD5::uint4 MD5::G(uint4 x, uint4 y, uint4 z) { + return (x&z) | (y&~z); +} + +inline MD5::uint4 MD5::H_(uint4 x, uint4 y, uint4 z) { + return x^y^z; +} + +inline MD5::uint4 MD5::I(uint4 x, uint4 y, uint4 z) { + return y ^ (x | ~z); +} + +// rotate_left rotates x left n bits. +inline MD5::uint4 MD5::rotate_left(uint4 x, int n) { + return (x << n) | (x >> (32-n)); +} + +// FF, GG, HH, and II transformations for rounds 1, 2, 3, and 4. +// Rotation is separate from addition to prevent recomputation. +inline void MD5::FF(uint4 &a, uint4 b, uint4 c, uint4 d, uint4 x, uint4 s, uint4 ac) { + a = rotate_left(a+ F(b,c,d) + x + ac, s) + b; +} + +inline void MD5::GG(uint4 &a, uint4 b, uint4 c, uint4 d, uint4 x, uint4 s, uint4 ac) { + a = rotate_left(a + G(b,c,d) + x + ac, s) + b; +} + +inline void MD5::HH(uint4 &a, uint4 b, uint4 c, uint4 d, uint4 x, uint4 s, uint4 ac) { + a = rotate_left(a + H_(b,c,d) + x + ac, s) + b; +} + +inline void MD5::II(uint4 &a, uint4 b, uint4 c, uint4 d, uint4 x, uint4 s, uint4 ac) { + a = rotate_left(a + I(b,c,d) + x + ac, s) + b; +} + +////////////////////////////////////////////// + +// default ctor, just initailize +MD5::MD5() +{ + init(); +} + +////////////////////////////////////////////// + +// nifty shortcut ctor, compute MD5 for string and finalize it right away +MD5::MD5(const std::string &text) +{ + init(); + update(text.c_str(), text.length()); + finalize(); +} + +////////////////////////////// + +void MD5::init() +{ + finalized=false; + + count[0] = 0; + count[1] = 0; + + // load magic initialization constants. + state[0] = 0x67452301; + state[1] = 0xefcdab89; + state[2] = 0x98badcfe; + state[3] = 0x10325476; +} + +////////////////////////////// + +// decodes input (unsigned char) into output (uint4). Assumes len is a multiple of 4. +void MD5::decode(uint4 output[], const uint1 input[], size_type len) +{ + for (unsigned int i = 0, j = 0; j < len; i++, j += 4) + output[i] = ((uint4)input[j]) | (((uint4)input[j+1]) << 8) | + (((uint4)input[j+2]) << 16) | (((uint4)input[j+3]) << 24); +} + +////////////////////////////// + +// encodes input (uint4) into output (unsigned char). Assumes len is +// a multiple of 4. +void MD5::encode(uint1 output[], const uint4 input[], size_type len) +{ + for (size_type i = 0, j = 0; j < len; i++, j += 4) { + output[j] = input[i] & 0xff; + output[j+1] = (input[i] >> 8) & 0xff; + output[j+2] = (input[i] >> 16) & 0xff; + output[j+3] = (input[i] >> 24) & 0xff; + } +} + +////////////////////////////// + +// apply MD5 algo on a block +void MD5::transform(const uint1 block[blocksize]) +{ + uint4 a = state[0], b = state[1], c = state[2], d = state[3], x[16]; + decode (x, block, blocksize); + + /* Round 1 */ + FF (a, b, c, d, x[ 0], S11, 0xd76aa478); /* 1 */ + FF (d, a, b, c, x[ 1], S12, 0xe8c7b756); /* 2 */ + FF (c, d, a, b, x[ 2], S13, 0x242070db); /* 3 */ + FF (b, c, d, a, x[ 3], S14, 0xc1bdceee); /* 4 */ + FF (a, b, c, d, x[ 4], S11, 0xf57c0faf); /* 5 */ + FF (d, a, b, c, x[ 5], S12, 0x4787c62a); /* 6 */ + FF (c, d, a, b, x[ 6], S13, 0xa8304613); /* 7 */ + FF (b, c, d, a, x[ 7], S14, 0xfd469501); /* 8 */ + FF (a, b, c, d, x[ 8], S11, 0x698098d8); /* 9 */ + FF (d, a, b, c, x[ 9], S12, 0x8b44f7af); /* 10 */ + FF (c, d, a, b, x[10], S13, 0xffff5bb1); /* 11 */ + FF (b, c, d, a, x[11], S14, 0x895cd7be); /* 12 */ + FF (a, b, c, d, x[12], S11, 0x6b901122); /* 13 */ + FF (d, a, b, c, x[13], S12, 0xfd987193); /* 14 */ + FF (c, d, a, b, x[14], S13, 0xa679438e); /* 15 */ + FF (b, c, d, a, x[15], S14, 0x49b40821); /* 16 */ + + /* Round 2 */ + GG (a, b, c, d, x[ 1], S21, 0xf61e2562); /* 17 */ + GG (d, a, b, c, x[ 6], S22, 0xc040b340); /* 18 */ + GG (c, d, a, b, x[11], S23, 0x265e5a51); /* 19 */ + GG (b, c, d, a, x[ 0], S24, 0xe9b6c7aa); /* 20 */ + GG (a, b, c, d, x[ 5], S21, 0xd62f105d); /* 21 */ + GG (d, a, b, c, x[10], S22, 0x2441453); /* 22 */ + GG (c, d, a, b, x[15], S23, 0xd8a1e681); /* 23 */ + GG (b, c, d, a, x[ 4], S24, 0xe7d3fbc8); /* 24 */ + GG (a, b, c, d, x[ 9], S21, 0x21e1cde6); /* 25 */ + GG (d, a, b, c, x[14], S22, 0xc33707d6); /* 26 */ + GG (c, d, a, b, x[ 3], S23, 0xf4d50d87); /* 27 */ + GG (b, c, d, a, x[ 8], S24, 0x455a14ed); /* 28 */ + GG (a, b, c, d, x[13], S21, 0xa9e3e905); /* 29 */ + GG (d, a, b, c, x[ 2], S22, 0xfcefa3f8); /* 30 */ + GG (c, d, a, b, x[ 7], S23, 0x676f02d9); /* 31 */ + GG (b, c, d, a, x[12], S24, 0x8d2a4c8a); /* 32 */ + + /* Round 3 */ + HH (a, b, c, d, x[ 5], S31, 0xfffa3942); /* 33 */ + HH (d, a, b, c, x[ 8], S32, 0x8771f681); /* 34 */ + HH (c, d, a, b, x[11], S33, 0x6d9d6122); /* 35 */ + HH (b, c, d, a, x[14], S34, 0xfde5380c); /* 36 */ + HH (a, b, c, d, x[ 1], S31, 0xa4beea44); /* 37 */ + HH (d, a, b, c, x[ 4], S32, 0x4bdecfa9); /* 38 */ + HH (c, d, a, b, x[ 7], S33, 0xf6bb4b60); /* 39 */ + HH (b, c, d, a, x[10], S34, 0xbebfbc70); /* 40 */ + HH (a, b, c, d, x[13], S31, 0x289b7ec6); /* 41 */ + HH (d, a, b, c, x[ 0], S32, 0xeaa127fa); /* 42 */ + HH (c, d, a, b, x[ 3], S33, 0xd4ef3085); /* 43 */ + HH (b, c, d, a, x[ 6], S34, 0x4881d05); /* 44 */ + HH (a, b, c, d, x[ 9], S31, 0xd9d4d039); /* 45 */ + HH (d, a, b, c, x[12], S32, 0xe6db99e5); /* 46 */ + HH (c, d, a, b, x[15], S33, 0x1fa27cf8); /* 47 */ + HH (b, c, d, a, x[ 2], S34, 0xc4ac5665); /* 48 */ + + /* Round 4 */ + II (a, b, c, d, x[ 0], S41, 0xf4292244); /* 49 */ + II (d, a, b, c, x[ 7], S42, 0x432aff97); /* 50 */ + II (c, d, a, b, x[14], S43, 0xab9423a7); /* 51 */ + II (b, c, d, a, x[ 5], S44, 0xfc93a039); /* 52 */ + II (a, b, c, d, x[12], S41, 0x655b59c3); /* 53 */ + II (d, a, b, c, x[ 3], S42, 0x8f0ccc92); /* 54 */ + II (c, d, a, b, x[10], S43, 0xffeff47d); /* 55 */ + II (b, c, d, a, x[ 1], S44, 0x85845dd1); /* 56 */ + II (a, b, c, d, x[ 8], S41, 0x6fa87e4f); /* 57 */ + II (d, a, b, c, x[15], S42, 0xfe2ce6e0); /* 58 */ + II (c, d, a, b, x[ 6], S43, 0xa3014314); /* 59 */ + II (b, c, d, a, x[13], S44, 0x4e0811a1); /* 60 */ + II (a, b, c, d, x[ 4], S41, 0xf7537e82); /* 61 */ + II (d, a, b, c, x[11], S42, 0xbd3af235); /* 62 */ + II (c, d, a, b, x[ 2], S43, 0x2ad7d2bb); /* 63 */ + II (b, c, d, a, x[ 9], S44, 0xeb86d391); /* 64 */ + + state[0] += a; + state[1] += b; + state[2] += c; + state[3] += d; + + // Zeroize sensitive information. + memset(x, 0, sizeof x); +} + +////////////////////////////// + +// MD5 block update operation. Continues an MD5 message-digest +// operation, processing another message block +void MD5::update(const unsigned char input[], size_type length) +{ + // compute number of bytes mod 64 + size_type index = count[0] / 8 % blocksize; + + // Update number of bits + if ((count[0] += (length << 3)) < (length << 3)) + count[1]++; + count[1] += (length >> 29); + + // number of bytes we need to fill in buffer + size_type firstpart = 64 - index; + + size_type i; + + // transform as many times as possible. + if (length >= firstpart) + { + // fill buffer first, transform + memcpy(&buffer[index], input, firstpart); + transform(buffer); + + // transform chunks of blocksize (64 bytes) + for (i = firstpart; i + blocksize <= length; i += blocksize) + transform(&input[i]); + + index = 0; + } + else + i = 0; + + // buffer remaining input + memcpy(&buffer[index], &input[i], length-i); +} + +////////////////////////////// + +// for convenience provide a verson with signed char +void MD5::update(const char input[], size_type length) +{ + update((const unsigned char*)input, length); +} + +////////////////////////////// + +// MD5 finalization. Ends an MD5 message-digest operation, writing the +// the message digest and zeroizing the context. +MD5& MD5::finalize() +{ + static unsigned char padding[64] = { + 0x80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 + }; + + if (!finalized) { + // Save number of bits + unsigned char bits[8]; + encode(bits, count, 8); + + // pad out to 56 mod 64. + size_type index = count[0] / 8 % 64; + size_type padLen = (index < 56) ? (56 - index) : (120 - index); + update(padding, padLen); + + // Append length (before padding) + update(bits, 8); + + // Store state in digest + encode(digest, state, 16); + + // Zeroize sensitive information. + memset(buffer, 0, sizeof buffer); + memset(count, 0, sizeof count); + + finalized=true; + } + + return *this; +} + +////////////////////////////// + +void MD5::getdigest(char *digest) const +{ + if (!finalized) + return; + + if (digest == NULL) + return; + + memcpy(digest, this->digest, 16); +} + +// return hex representation of digest as string +std::string MD5::hexdigest() const +{ + if (!finalized) + return ""; + + char buf[33]; + for (int i=0; i<16; i++) + sprintf(buf+i*2, "%02x", digest[i]); + buf[32]=0; + + return std::string(buf); +} + +////////////////////////////// + +std::ostream& operator<<(std::ostream& out, MD5 md5) +{ + return out << md5.hexdigest(); +} + +////////////////////////////// + +std::string md5(const std::string str) +{ + MD5 md5 = MD5(str); + + return md5.hexdigest(); +}
\ No newline at end of file diff --git a/src/3rd_party-static/message_broker/src/lib_messagebroker/md5.h b/src/3rd_party-static/message_broker/src/lib_messagebroker/md5.h new file mode 100644 index 0000000000..2c54c03b1b --- /dev/null +++ b/src/3rd_party-static/message_broker/src/lib_messagebroker/md5.h @@ -0,0 +1,93 @@ +/* MD5 + converted to C++ class by Frank Thilo (thilo@unix-ag.org) + for bzflag (http://www.bzflag.org) + + based on: + + md5.h and md5.c + reference implementation of RFC 1321 + + Copyright (C) 1991-2, RSA Data Security, Inc. Created 1991. All + rights reserved. + + License to copy and use this software is granted provided that it + is identified as the "RSA Data Security, Inc. MD5 Message-Digest + Algorithm" in all material mentioning or referencing this software + or this function. + + License is also granted to make and use derivative works provided + that such works are identified as "derived from the RSA Data + Security, Inc. MD5 Message-Digest Algorithm" in all material + mentioning or referencing the derived work. + + RSA Data Security, Inc. makes no representations concerning either + the merchantability of this software or the suitability of this + software for any particular purpose. It is provided "as is" + without express or implied warranty of any kind. + + These notices must be retained in any copies of any part of this + documentation and/or software. + + */ + +#ifndef BZF_MD5_H +#define BZF_MD5_H + +#include <string> +#include <iostream> + +// a small class for calculating MD5 hashes of strings or byte arrays +// it is not meant to be fast or secure +// +// usage: 1) feed it blocks of uchars with update() +// 2) finalize() +// 3) get hexdigest() string +// or +// MD5(std::string).hexdigest() +// +// assumes that char is 8 bit and int is 32 bit +class MD5 +{ +public: + typedef unsigned int size_type; // must be 32bit + + MD5(); + MD5(const std::string& text); + void update(const unsigned char *buf, size_type length); + void update(const char *buf, size_type length); + MD5& finalize(); + void getdigest(char *digest) const; // digest must be 16 bytes long + std::string hexdigest() const; + friend std::ostream& operator<<(std::ostream&, MD5 md5); + +private: + void init(); + typedef unsigned char uint1; // 8bit + typedef unsigned int uint4; // 32bit + enum {blocksize = 64}; // VC6 won't eat a const static int here + + void transform(const uint1 block[blocksize]); + static void decode(uint4 output[], const uint1 input[], size_type len); + static void encode(uint1 output[], const uint4 input[], size_type len); + + bool finalized; + uint1 buffer[blocksize]; // bytes that didn't fit in last 64 byte chunk + uint4 count[2]; // 64bit counter for number of bits (lo, hi) + uint4 state[4]; // digest so far + uint1 digest[16]; // the result + + // low level logic operations + static inline uint4 F(uint4 x, uint4 y, uint4 z); + static inline uint4 G(uint4 x, uint4 y, uint4 z); + static inline uint4 H_(uint4 x, uint4 y, uint4 z); + static inline uint4 I(uint4 x, uint4 y, uint4 z); + static inline uint4 rotate_left(uint4 x, int n); + static inline void FF(uint4 &a, uint4 b, uint4 c, uint4 d, uint4 x, uint4 s, uint4 ac); + static inline void GG(uint4 &a, uint4 b, uint4 c, uint4 d, uint4 x, uint4 s, uint4 ac); + static inline void HH(uint4 &a, uint4 b, uint4 c, uint4 d, uint4 x, uint4 s, uint4 ac); + static inline void II(uint4 &a, uint4 b, uint4 c, uint4 d, uint4 x, uint4 s, uint4 ac); +}; + +std::string md5(const std::string str); + +#endif
\ No newline at end of file diff --git a/src/3rd_party-static/message_broker/src/lib_messagebroker/system.cpp b/src/3rd_party-static/message_broker/src/lib_messagebroker/system.cpp new file mode 100644 index 0000000000..456362f9d8 --- /dev/null +++ b/src/3rd_party-static/message_broker/src/lib_messagebroker/system.cpp @@ -0,0 +1,267 @@ +/* + * JsonRpc-Cpp - JSON-RPC implementation. + * Copyright (C) 2008-2011 Sebastien Vincent <sebastien.vincent@cppextrem.com> + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +/** + * \file system.cpp + * \brief System utils. + * \author Sebastien Vincent + */ + +#include <time.h> +#include <signal.h> + +#include "system.h" + +namespace System { + +void msleep(unsigned long ms) { +#ifdef _WIN32 + Sleep(ms); +#else + /* Unix */ + struct timespec req; + req.tv_sec = ms / 1000; + req.tv_nsec = (ms % 1000) * 1000000; + nanosleep(&req, NULL); +#endif +} + +ThreadArg::~ThreadArg() { +} + +#ifndef WIN32 + +/* POSIX specific part for thread and mutex */ + +Thread::Thread(ThreadArg* arg) { + m_arg = arg; +} + +Thread::~Thread() { + delete m_arg; +} + +bool Thread::Start(bool detach) { + pthread_attr_t attr; + int ret = -1; + + /* must have valid object argument */ + if (m_arg == NULL) { + return false; + } + + /* set the detach state value */ + if (pthread_attr_init(&attr) != 0) { + return false; + } + + if (pthread_attr_setdetachstate(&attr, detach ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE) != 0) { + pthread_attr_destroy(&attr); + return false; + } + + /* create thread */ + ret = pthread_create(&m_id, &attr, &Thread::Call, this); + pthread_setname_np(m_id, "MB Thread"); + pthread_attr_destroy(&attr); + return ret == 0; +} + +bool Thread::Stop() { + pthread_cancel(m_id); + return false;// Android does not support 'pthread_cancel'; +} + +bool Thread::Join(void** ret) { + return pthread_join(m_id, ret) == 0; +} + +void* Thread::Call(void* arg) { + // Disable system signals receiving in thread + // by setting empty signal mask + // (system signals processes only in the main thread) + sigset_t set; + sigfillset(&set); + pthread_sigmask(SIG_SETMASK, &set, NULL); + + Thread* thread = static_cast<Thread*>(arg); + + /* call our specific object method */ + return thread->m_arg->Call(); +} + +Mutex::Mutex() { + pthread_mutexattr_t attr; + + pthread_mutexattr_init(&attr); + pthread_mutex_init(&m_mutex, &attr); + pthread_mutexattr_destroy(&attr); +} + +Mutex::~Mutex() { + pthread_mutex_destroy(&m_mutex); +} + +bool Mutex::Lock() { + return !pthread_mutex_lock(&m_mutex); +} + +bool Mutex::Unlock() { + return !pthread_mutex_unlock(&m_mutex); +} + + +// Based on Binary Semaphores example at +// http://www.mathcs.emory.edu/~cheung/Courses/455/Syllabus/5c-pthreads/sync.html +BinarySemaphore::BinarySemaphore() : + m_mutex(PTHREAD_MUTEX_INITIALIZER), + m_cond(PTHREAD_COND_INITIALIZER), + m_isUp(false) { + pthread_mutex_init(&m_mutex, NULL); + pthread_cond_init(&m_cond, NULL); +} + +BinarySemaphore::~BinarySemaphore() { + pthread_cond_destroy(&m_cond); + pthread_mutex_destroy(&m_mutex); +} + +void BinarySemaphore::Wait() { + // try to get exclusive access to the flag + pthread_mutex_lock(&m_mutex); + // success: no other thread can get here unless + // the current thread unlocks the mutex + + // wait until the flag is up + while (!m_isUp) { + pthread_cond_wait(&m_cond, &m_mutex); + // when the current thread executes this, it will be + // blocked on m_cond, and automatically unlocks the + // mutex! Unlocking the mutex will let other threads + // in to test the flag. + } + + // here we know that flag is upand this thread has now + // successfully passed the semaphore + + // this will cause all other threads that execute the Wait() + // call to wait in the above loop + m_isUp = false; + + // release the exclusive access to the flag + pthread_mutex_unlock(&m_mutex); +} + +void BinarySemaphore::Notify() { + // try to get exclusive access to the flag + pthread_mutex_lock(&m_mutex); + + // this call may resume a thread that is blocked on m_cond + // (in the Wait() call). if there was none, this does nothing + pthread_cond_signal(&m_cond); + + // up the flag + m_isUp = true; + + // release the exclusive access to the flag + pthread_mutex_unlock(&m_mutex); +} + +#else + +/* Windows specific part for thread and mutex */ + +Thread::Thread(ThreadArg* arg) { + m_arg = arg; +} + +Thread::~Thread() { + delete m_arg; +} + +bool Thread::Start(bool detach) { + detach = detach; /* unused parameter */ + + m_id = CreateThread(NULL, /* default security attributes */ + 0, /* use default stack size */ + &Thread::Call, /* thread function name */ + this, /* argument to thread function */ + 0, /* use default creation flags */ + NULL); /* returns the thread identifier */ + + return m_id != NULL; +} + +bool Thread::Stop() { + return TerminateThread(m_id, (DWORD) - 1); +} + +bool Thread::Join(void** ret) { + DWORD val = 0; + WaitForSingleObject(m_id, INFINITE); + GetExitCodeThread(m_id, &val); + CloseHandle(m_id); + m_id = NULL; + *ret = (void*)val; + return true; +} + +DWORD WINAPI Thread::Call(LPVOID arg) { + Thread* thread = static_cast<Thread*>(arg); + + /* call our specific object method */ +#ifdef _WIN64 + return (DWORD64)thread->m_arg->Call(); +#else + return (DWORD)thread->m_arg->Call(); +#endif +} + +Mutex::Mutex() { + m_mutex = CreateMutex(NULL, /* no security attribute */ + 0, /* not initial owner (i.e. no first lock) */ + NULL); /* no name */ +} + +Mutex::~Mutex() { + /* free mutex */ + if (m_mutex) { + CloseHandle(m_mutex); + } +} + +bool Mutex::Lock() { + if (!m_mutex) { + return false; + } + + return (WaitForSingleObject(m_mutex, INFINITE) == WAIT_OBJECT_0); +} + +bool Mutex::Unlock() { + if (!m_mutex) { + return false; + } + + return ReleaseMutex(m_mutex); +} + +#endif + +} /* namespace System */ + diff --git a/src/3rd_party-static/message_broker/src/lib_messagebroker/websocket_handler.cpp b/src/3rd_party-static/message_broker/src/lib_messagebroker/websocket_handler.cpp new file mode 100644 index 0000000000..7d3890b7a8 --- /dev/null +++ b/src/3rd_party-static/message_broker/src/lib_messagebroker/websocket_handler.cpp @@ -0,0 +1,667 @@ +/** + * \file websocket_handler.cpp + * \brief WebSocket Handler. + * \author AKara + */ + +#include <cstdio> + +#include <cstring> +#include <sstream> +#include <netinet/in.h> + +#ifdef _WIN32 +#include <winsock2.h> +#endif//_WIN32 + +#include "websocket_handler.hpp" + +#include "libMBDebugHelper.h" +#include "md5.h" + +namespace NsMessageBroker +{ + + unsigned int CWebSocketHandler::parseWebSocketDataLength( + const char* Buffer, unsigned int& b_size) { + + unsigned char payload = + (unsigned char)((Buffer[1] & 0x40) | (Buffer[1] & 0x20) | + (Buffer[1] & 0x10) | (Buffer[1] & 0x08) | (Buffer[1] & 0x04) | + (Buffer[1] & 0x02) | (Buffer[1] & 0x01)); + unsigned long length = 0; + unsigned char position = 2; // current buffer position + + switch(payload) { + case 126: + { + length = (unsigned char)Buffer[position++]; + length <<=8; + length |= (unsigned char)Buffer[position++]; + break; + } + case 127: + { + length = (unsigned char)Buffer[position++]; + length <<=8; + length |= (unsigned char)Buffer[position++]; + length <<=8; + length |= (unsigned char)Buffer[position++]; + length <<=8; + length |= (unsigned char)Buffer[position++]; + length <<=8; + length |= (unsigned char)Buffer[position++]; + length <<=8; + length |= (unsigned char)Buffer[position++]; + length <<=8; + length |= (unsigned char)Buffer[position++]; + length <<=8; + length |= (unsigned char)Buffer[position++]; + break; + } + default: + { + length = payload; + return length; + } + } + + return length; + } + + int CWebSocketHandler::parseWebSocketData(char* Buffer, unsigned int& b_size) { + // Please see RFC6455 standard protocol specification: + //http://tools.ietf.org/html/rfc6455 + // Chapter 5.2 + DBG_MSG(("CWebSocketHandler::parseWebSocketData()b_size = %d\n", b_size)); + char* recBuffer = Buffer; + unsigned int parsedBufferPosition = 0; + unsigned char position = 0; // current buffer position + unsigned int size = b_size; + + static uint32_t minimum_heade_size = 4; + while (minimum_heade_size < size) { + + bool fin = ((recBuffer[0] & 0x80) | (recBuffer[0] & 0x01)) == 0x81; + bool rsv1 = (recBuffer[0] & 0x40) == 0x40; + bool rsv2 = (recBuffer[0] & 0x20) == 0x20; + bool rsv3 = (recBuffer[0] & 0x10) == 0x10; + unsigned char opCode = ((recBuffer[0] & 0x08) | (recBuffer[0] & 0x04) | + (recBuffer[0] & 0x02) | (recBuffer[0] & 0x01)); + + bool mask = (recBuffer[1] & 0x80) == 0x80; + + DBG_MSG(("CWebSocketHandler::fin = %d recBuffer[0] = 0x%02X\n" + " parsedlength = %d b_size= %d parsedBufferPosition = %d\n" + "rsv1 = %d, rsv2 = %d, rsv3 = %d, opCode = %u\n", + fin, recBuffer[0], parsedBufferPosition + position, + size, parsedBufferPosition, rsv1, rsv2, rsv3, opCode)); + + if ((rsv1)|(rsv2)|(rsv3)) { + DBG_MSG(("rsv1 or rsv2 or rsv3 is 0 \n")); + break; + } + + switch(opCode) { + case 0x0: break; //Continuation frame + case 0x1: break; //Text Frame + case 0x2: break; //Binary Frame + case 0x8: break; //Connection close Frame + case 0x9: break; //ping Frame + case 0xA: break; //Pong Frame + default: break; //Unknown frame + } + + if (false == fin) { + break; + } + + unsigned char payload = (unsigned char) + ((recBuffer[1] & 0x40) | (recBuffer[1] & 0x20) | (recBuffer[1] & 0x10) | + (recBuffer[1] & 0x08) | (recBuffer[1] & 0x04) | (recBuffer[1] & 0x02) | + (recBuffer[1] & 0x01)); + + unsigned long length = parseWebSocketDataLength(recBuffer, size); + position = 2; + + if (length > size) { + DBG_MSG_ERROR(("Incomplete message")); + break; + } + + switch(payload) { + case 126: { + position +=2; + break; + } + case 127: { + position +=8; + break; + } + default: { + break; + } + } + + if (mask) { + unsigned char maskKeys[4]; + maskKeys[0] = recBuffer[position++]; + maskKeys[1] = recBuffer[position++]; + maskKeys[2] = recBuffer[position++]; + maskKeys[3] = recBuffer[position++]; + DBG_MSG(("CWebSocketHandler::parseWebSocketData()maskKeys[0]:0x%02X;" + "maskKeys[1]:0x%02X; maskKeys[2]:0x%02X; maskKeys[3]:0x%02X\n" + , maskKeys[0], maskKeys[1], maskKeys[2], maskKeys[3])); + for (unsigned long i = position; i < position+length; i++) + { + recBuffer[i] = recBuffer[i] ^ maskKeys[(i-position)%4]; + } + } + DBG_MSG(("CWebSocketHandler::parseWebSocketData()length:%d; size:%d;" + " position:%d\n", (int)length, size, position)); + + for (unsigned long i = 0; (i < size); i++) { + Buffer[parsedBufferPosition + i] = recBuffer[i+position]; + } + b_size -= position; + parsedBufferPosition += length; + recBuffer += length; + size -= length+position; + } + return b_size; + } + + int CWebSocketHandler::prepareWebSocketDataHeader(unsigned char* Buffer, + unsigned long long b_size) + { + unsigned int headerLength = 2; + unsigned char payload; + + memset(Buffer, 0, headerLength); + Buffer[0] = 0x81; // 129 + + if (b_size <= 125) + { + payload = b_size; + Buffer[1] = b_size; // string length + } else if (b_size >= 126 && b_size <= 65535) + { + headerLength += 2; + payload = 126; + Buffer[1] = 0x7E; // 126 + } else + { + headerLength += 8; + payload = 127; + Buffer[1] = 0x7F; // 127 + } + + + if (payload == 126) + { + Buffer[2] = (b_size>>8); + Buffer[3] = b_size; + } else if (payload == 127) + { + Buffer[9] = (b_size & 0xFF); + Buffer[8] = ((b_size>>8) & 0xFF); + Buffer[7] = ((b_size>>16) & 0xFF); + Buffer[6] = ((b_size>>24) & 0xFF); + Buffer[5] = ((b_size>>32) & 0xFF); + Buffer[4] = ((b_size>>40) & 0xFF); + Buffer[3] = ((b_size>>48) & 0xFF); + Buffer[2] = ((b_size>>56) & 0xFF); + } + return headerLength; +} + + void CWebSocketHandler::handshake_0405(std::string &key) + { + static const char *websocket_magic_guid_04 = + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + char accept_buf[MAX_WEBSOCKET_04_KEY_LEN + 37]; + unsigned char hash[20] = {0xb3, 0x7a, 0x4f, 0x2c, 0xc0, 0x62, 0x4f, 0x16, 0x90, 0xf6, 0x46, 0x06, 0xcf, 0x38, 0x59, 0x45, 0xb2, 0xbe, 0xc4, 0xea}; + int accept_len; + + strncpy(accept_buf, key.c_str(), MAX_WEBSOCKET_04_KEY_LEN + 37); + strncpy(accept_buf + key.length(), websocket_magic_guid_04, + MAX_WEBSOCKET_04_KEY_LEN + 37 - strlen(key.c_str())); + + SHA1((unsigned char *)accept_buf, key.length() + strlen(websocket_magic_guid_04), hash); + + accept_len = lws_b64_encode_string((char *)hash, 20, accept_buf, sizeof accept_buf); + if (accept_len < 0) + { + fprintf(stderr, "Base64 encoded hash too long\n"); + } + fprintf(stderr, "accept_buf: %s\n", accept_buf); + key = accept_buf; + } + + void CWebSocketHandler::sha1_step(struct sha1_ctxt *ctxt) + { + unsigned int a, b, c, d, e; + size_t t, s; + unsigned int tmp; + + struct sha1_ctxt tctxt; + + memcpy(&tctxt.m.b8[0], &ctxt->m.b8[0], 64); + ctxt->m.b8[0] = tctxt.m.b8[3]; ctxt->m.b8[1] = tctxt.m.b8[2]; + ctxt->m.b8[2] = tctxt.m.b8[1]; ctxt->m.b8[3] = tctxt.m.b8[0]; + ctxt->m.b8[4] = tctxt.m.b8[7]; ctxt->m.b8[5] = tctxt.m.b8[6]; + ctxt->m.b8[6] = tctxt.m.b8[5]; ctxt->m.b8[7] = tctxt.m.b8[4]; + ctxt->m.b8[8] = tctxt.m.b8[11]; ctxt->m.b8[9] = tctxt.m.b8[10]; + ctxt->m.b8[10] = tctxt.m.b8[9]; ctxt->m.b8[11] = tctxt.m.b8[8]; + ctxt->m.b8[12] = tctxt.m.b8[15]; ctxt->m.b8[13] = tctxt.m.b8[14]; + ctxt->m.b8[14] = tctxt.m.b8[13]; ctxt->m.b8[15] = tctxt.m.b8[12]; + ctxt->m.b8[16] = tctxt.m.b8[19]; ctxt->m.b8[17] = tctxt.m.b8[18]; + ctxt->m.b8[18] = tctxt.m.b8[17]; ctxt->m.b8[19] = tctxt.m.b8[16]; + ctxt->m.b8[20] = tctxt.m.b8[23]; ctxt->m.b8[21] = tctxt.m.b8[22]; + ctxt->m.b8[22] = tctxt.m.b8[21]; ctxt->m.b8[23] = tctxt.m.b8[20]; + ctxt->m.b8[24] = tctxt.m.b8[27]; ctxt->m.b8[25] = tctxt.m.b8[26]; + ctxt->m.b8[26] = tctxt.m.b8[25]; ctxt->m.b8[27] = tctxt.m.b8[24]; + ctxt->m.b8[28] = tctxt.m.b8[31]; ctxt->m.b8[29] = tctxt.m.b8[30]; + ctxt->m.b8[30] = tctxt.m.b8[29]; ctxt->m.b8[31] = tctxt.m.b8[28]; + ctxt->m.b8[32] = tctxt.m.b8[35]; ctxt->m.b8[33] = tctxt.m.b8[34]; + ctxt->m.b8[34] = tctxt.m.b8[33]; ctxt->m.b8[35] = tctxt.m.b8[32]; + ctxt->m.b8[36] = tctxt.m.b8[39]; ctxt->m.b8[37] = tctxt.m.b8[38]; + ctxt->m.b8[38] = tctxt.m.b8[37]; ctxt->m.b8[39] = tctxt.m.b8[36]; + ctxt->m.b8[40] = tctxt.m.b8[43]; ctxt->m.b8[41] = tctxt.m.b8[42]; + ctxt->m.b8[42] = tctxt.m.b8[41]; ctxt->m.b8[43] = tctxt.m.b8[40]; + ctxt->m.b8[44] = tctxt.m.b8[47]; ctxt->m.b8[45] = tctxt.m.b8[46]; + ctxt->m.b8[46] = tctxt.m.b8[45]; ctxt->m.b8[47] = tctxt.m.b8[44]; + ctxt->m.b8[48] = tctxt.m.b8[51]; ctxt->m.b8[49] = tctxt.m.b8[50]; + ctxt->m.b8[50] = tctxt.m.b8[49]; ctxt->m.b8[51] = tctxt.m.b8[48]; + ctxt->m.b8[52] = tctxt.m.b8[55]; ctxt->m.b8[53] = tctxt.m.b8[54]; + ctxt->m.b8[54] = tctxt.m.b8[53]; ctxt->m.b8[55] = tctxt.m.b8[52]; + ctxt->m.b8[56] = tctxt.m.b8[59]; ctxt->m.b8[57] = tctxt.m.b8[58]; + ctxt->m.b8[58] = tctxt.m.b8[57]; ctxt->m.b8[59] = tctxt.m.b8[56]; + ctxt->m.b8[60] = tctxt.m.b8[63]; ctxt->m.b8[61] = tctxt.m.b8[62]; + ctxt->m.b8[62] = tctxt.m.b8[61]; ctxt->m.b8[63] = tctxt.m.b8[60]; + + a = H(0); b = H(1); c = H(2); d = H(3); e = H(4); + + for (t = 0; t < 20; t++) + { + s = t & 0x0f; + if (t >= 16) + W(s) = S(1, W((s+13) & 0x0f) ^ W((s+8) & 0x0f) ^ + W((s+2) & 0x0f) ^ W(s)); + + tmp = S(5, a) + F0(b, c, d) + e + W(s) + K(t); + e = d; d = c; c = S(30, b); b = a; a = tmp; + } + for (t = 20; t < 40; t++) + { + s = t & 0x0f; + W(s) = S(1, W((s+13) & 0x0f) ^ W((s+8) & 0x0f) ^ + W((s+2) & 0x0f) ^ W(s)); + tmp = S(5, a) + F1(b, c, d) + e + W(s) + K(t); + e = d; d = c; c = S(30, b); b = a; a = tmp; + } + for (t = 40; t < 60; t++) + { + s = t & 0x0f; + W(s) = S(1, W((s+13) & 0x0f) ^ W((s+8) & 0x0f) ^ + W((s+2) & 0x0f) ^ W(s)); + tmp = S(5, a) + F2(b, c, d) + e + W(s) + K(t); + e = d; d = c; c = S(30, b); b = a; a = tmp; + } + for (t = 60; t < 80; t++) + { + s = t & 0x0f; + W(s) = S(1, W((s+13) & 0x0f) ^ W((s+8) & 0x0f) ^ + W((s+2) & 0x0f) ^ W(s)); + tmp = S(5, a) + F3(b, c, d) + e + W(s) + K(t); + e = d; d = c; c = S(30, b); b = a; a = tmp; + } + + H(0) = H(0) + a; + H(1) = H(1) + b; + H(2) = H(2) + c; + H(3) = H(3) + d; + H(4) = H(4) + e; + + memset(&ctxt->m.b8[0], 0, 64); + } + + void CWebSocketHandler::sha1_init(struct sha1_ctxt *ctxt) + { + memset(ctxt, 0, sizeof(struct sha1_ctxt)); + H(0) = 0x67452301; + H(1) = 0xefcdab89; + H(2) = 0x98badcfe; + H(3) = 0x10325476; + H(4) = 0xc3d2e1f0; + } + + void CWebSocketHandler::sha1_pad(struct sha1_ctxt *ctxt) + { + size_t padlen; /*pad length in bytes*/ + size_t padstart; + + PUTPAD(0x80); + + padstart = COUNT % 64; + padlen = 64 - padstart; + if (padlen < 8) + { + memset(&ctxt->m.b8[padstart], 0, padlen); + COUNT += padlen; + COUNT %= 64; + sha1_step(ctxt); + padstart = COUNT % 64; /* should be 0 */ + padlen = 64 - padstart; /* should be 64 */ + } + memset(&ctxt->m.b8[padstart], 0, padlen - 8); + COUNT += (padlen - 8); + COUNT %= 64; + + PUTPAD(ctxt->c.b8[7]); PUTPAD(ctxt->c.b8[6]); + PUTPAD(ctxt->c.b8[5]); PUTPAD(ctxt->c.b8[4]); + PUTPAD(ctxt->c.b8[3]); PUTPAD(ctxt->c.b8[2]); + PUTPAD(ctxt->c.b8[1]); PUTPAD(ctxt->c.b8[0]); + } + + void CWebSocketHandler::sha1_loop(struct sha1_ctxt *ctxt, const unsigned char *input, size_t len) + { + size_t gaplen; + size_t gapstart; + size_t off; + size_t copysiz; + + off = 0; + + while (off < len) + { + gapstart = COUNT % 64; + gaplen = 64 - gapstart; + + copysiz = (gaplen < len - off) ? gaplen : len - off; + memcpy(&ctxt->m.b8[gapstart], &input[off], copysiz); + COUNT += copysiz; + COUNT %= 64; + ctxt->c.b64[0] += copysiz * 8; + if (COUNT % 64 == 0) + sha1_step(ctxt); + off += copysiz; + } + } + + void CWebSocketHandler::sha1_result(struct sha1_ctxt *ctxt, unsigned char* digest0) + { + unsigned char *digest; + + digest = (unsigned char *)digest0; + sha1_pad(ctxt); + digest[0] = ctxt->h.b8[3]; digest[1] = ctxt->h.b8[2]; + digest[2] = ctxt->h.b8[1]; digest[3] = ctxt->h.b8[0]; + digest[4] = ctxt->h.b8[7]; digest[5] = ctxt->h.b8[6]; + digest[6] = ctxt->h.b8[5]; digest[7] = ctxt->h.b8[4]; + digest[8] = ctxt->h.b8[11]; digest[9] = ctxt->h.b8[10]; + digest[10] = ctxt->h.b8[9]; digest[11] = ctxt->h.b8[8]; + digest[12] = ctxt->h.b8[15]; digest[13] = ctxt->h.b8[14]; + digest[14] = ctxt->h.b8[13]; digest[15] = ctxt->h.b8[12]; + digest[16] = ctxt->h.b8[19]; digest[17] = ctxt->h.b8[18]; + digest[18] = ctxt->h.b8[17]; digest[19] = ctxt->h.b8[16]; + } + + /* + * This should look and work like the libcrypto implementation + */ + + unsigned char * CWebSocketHandler::SHA1(const unsigned char *d, size_t n, unsigned char *md) + { + struct sha1_ctxt ctx; + + sha1_init(&ctx); + sha1_loop(&ctx, d, n); + sha1_result(&ctx, (unsigned char*)md); + + return md; + } + + int CWebSocketHandler::lws_b64_encode_string(const char *in, int in_len, char *out, int out_size) + { + unsigned char triple[3]; + int i; + int len; + int line = 0; + int done = 0; + + while (in_len) + { + len = 0; + for (i = 0; i < 3; i++) + { + if (in_len) + { + triple[i] = *in++; + len++; + in_len--; + } else + triple[i] = 0; + } + if (len) + { + + if (done + 4 >= out_size) + return -1; + + *out++ = encode[triple[0] >> 2]; + *out++ = encode[((triple[0] & 0x03) << 4) | + ((triple[1] & 0xf0) >> 4)]; + *out++ = (len > 1 ? encode[((triple[1] & 0x0f) << 2) | + ((triple[2] & 0xc0) >> 6)] : '='); + *out++ = (len > 2 ? encode[triple[2] & 0x3f] : '='); + + done += 4; + line += 4; + } + if (line >= 72) + { + + if (done + 2 >= out_size) + return -1; + + *out++ = '\r'; + *out++ = '\n'; + done += 2; + line = 0; + } + } + + if (done + 1 >= out_size) + return -1; + + *out++ = '\0'; + + return done; + } + + /* + * returns length of decoded string in out, or -1 if out was too small + * according to out_size + */ + + int CWebSocketHandler::lws_b64_decode_string(const char *in, char *out, int out_size) + { + int len; + int i; + int done = 0; + unsigned char v; + unsigned char quad[4]; + + while (*in) + { + + len = 0; + for (i = 0; i < 4 && *in; i++) + { + + v = 0; + while (*in && !v) + { + + v = *in++; + v = (v < 43 || v > 122) ? 0 : decode[v - 43]; + if (v) + v = (v == '$') ? 0 : v - 61; + if (*in) + { + len++; + if (v) + quad[i] = v - 1; + } else + quad[i] = 0; + } + } + if (!len) + continue; + + if (out_size < (done + len - 1)) + /* out buffer is too small */ + return -1; + + if (len >= 2) + *out++ = quad[0] << 2 | quad[1] >> 4; + if (len >= 3) + *out++ = quad[1] << 4 | quad[2] >> 2; + if (len >= 4) + *out++ = ((quad[2] << 6) & 0xc0) | quad[3]; + + done += len - 1; + } + + if (done + 1 >= out_size) + return -1; + + *out++ = '\0'; + + return done; + } + + int CWebSocketHandler::lws_b64_selftest(void) + { + char buf[64]; + int n; + unsigned int test; + static const char *plaintext[] = {"sanity check base 64"}; + static const char *coded[] = {"c2FuaXR5IGNoZWNrIGJhc2UgNjQ="}; + + for (test = 0; test < sizeof plaintext / sizeof(plaintext[0]); test++) + { + + buf[sizeof(buf) - 1] = '\0'; + n = lws_b64_encode_string(plaintext[test], + strlen(plaintext[test]), buf, sizeof buf); + if (n != (int)strlen(coded[test]) || strcmp(buf, coded[test])) + { + fprintf(stderr, "Failed lws_b64 encode selftest " + "%d result '%s' %d\n", test, buf, n); + return -1; + } + + buf[sizeof(buf) - 1] = '\0'; + n = lws_b64_decode_string(coded[test], buf, sizeof buf); + if (n != (int)strlen(plaintext[test]) || + strcmp(buf, plaintext[test])) + { + fprintf(stderr, "Failed lws_b64 decode selftest " + "%d result '%s' %d\n", test, buf, n); + return -1; + } + } + + return 0; + } + + rawBytes CWebSocketHandler::handshake_hybi00(const std::string &key1, const std::string &key2, const rawBytes &key3) + { + if (key3.size() < 8) + { + DBG_MSG_ERROR(("key3's size is %d, less than 8 bytes\n", key3.size())); + return rawBytes(); + } + + unsigned long number1 = extractNumber(key1); + unsigned long number2 = extractNumber(key2); + DBG_MSG(("number1 is %ld, number2 is %ld\n", number1, number2)); + + if ((number1 == 0) || (number2 == 0)) + { + return rawBytes(); + } + + // represent the numbers in big-endian format (network-byte order) + unsigned long bigEndianNumber1 = htonl(number1); + unsigned long bigEndianNumber2 = htonl(number2); + + // the temporary key consists of bytes of the first and second numbers + // and the key3 + rawBytes key(8); + memcpy(&key[0], &bigEndianNumber1, 4); + memcpy(&key[4], &bigEndianNumber2, 4); + key.insert(key.end(), key3.begin(), key3.begin() + 8); + + MD5 md5(std::string(key.begin(), key.end())); + char digest[16]; + md5.getdigest(digest); + rawBytes resultBytes(&digest[0], &digest[16]); + + return resultBytes; + } + + unsigned long CWebSocketHandler::extractNumber(const std::string &key) const + { + // leave digits only + // and count the number of spaces in the key + std::string keyDigits; + int spacesCountKey = 0; + for (unsigned int index = 0; index < key.length(); ++index) + { + char keyChar = key[index]; + if (keyChar == ' ') + { + ++spacesCountKey; + } + else if (isdigit(keyChar)) + { + keyDigits += keyChar; + } + } + + unsigned long result = 0; + + // convert string to number + long long numberKey; + if (std::stringstream(keyDigits) >> numberKey) + { + if (spacesCountKey != 0) + { + if (numberKey % spacesCountKey == 0) + { + // divide the number by the count + result = numberKey / spacesCountKey; + } + else + { + // key is not an integral multiple of spaces count + } + } + else + { + // the denominator is 0 + } + } + else + { + // couldn't convert + } + + return result; + } + +} /* namespace NsMessageBroker */ + diff --git a/src/3rd_party-static/message_broker/src/server/mb_server.cpp b/src/3rd_party-static/message_broker/src/server/mb_server.cpp new file mode 100644 index 0000000000..25ec7fc9f3 --- /dev/null +++ b/src/3rd_party-static/message_broker/src/server/mb_server.cpp @@ -0,0 +1,46 @@ +/** + * \file mb_server.cpp + * \brief MessageBroker server. + * \author AKara + */ + +#include "mb_server.hpp" + +namespace NsMessageBroker { + +Server::Server(const std::string& address, uint16_t port) { + m_sock = -1; + m_address = address; + m_port = port; +} + +Server::~Server() { + if (m_sock != -1) { + Close(); + } +} + +int Server::GetSocket() const { + return m_sock; +} + +std::string Server::GetAddress() const { + return m_address; +} + +uint16_t Server::GetPort() const { + return m_port; +} + +bool Server::Bind() { + m_sock = networking::bind(m_protocol, m_address, m_port, NULL, NULL); + + return (m_sock != -1) ? true : false; +} + +void Server::Close() { + ::close(m_sock); + m_sock = -1; +} + +} /* namespace NsMessageBroker */ diff --git a/src/3rd_party-static/message_broker/src/server/mb_tcpserver.cpp b/src/3rd_party-static/message_broker/src/server/mb_tcpserver.cpp new file mode 100644 index 0000000000..bdd7b2bfdf --- /dev/null +++ b/src/3rd_party-static/message_broker/src/server/mb_tcpserver.cpp @@ -0,0 +1,331 @@ +/** + * \file mb_tcpserver.cpp + * \brief MessageBroker TCP server. + * \author AKara + */ + +#include <cstring> +#include <cerrno> +#include <iostream> +#include <algorithm> +#include <vector> +#include <assert.h> + +#include "MBDebugHelper.h" + +#include "mb_tcpserver.hpp" +#include "CMessageBroker.hpp" + +namespace NsMessageBroker { + +TcpServer::TcpServer(const std::string& address, uint16_t port, NsMessageBroker::CMessageBroker* pMessageBroker) : + Server(address, port) { + m_protocol = networking::TCP; + mpMessageBroker = pMessageBroker; +} + +TcpServer::~TcpServer() { + if (m_sock != -1) { + Close(); + } +} + +ssize_t TcpServer::Send(int fd, const std::string& data) { + DBG_MSG(("Send to %d: %s\n", fd, data.c_str())); + std::string rep = data; + if (isWebSocket(fd)) { + unsigned char buf[10] = {'\0'}; + ssize_t headerlen = mWebSocketHandler.prepareWebSocketDataHeader( + (unsigned char*)buf, (unsigned long)rep.length()); + std::string header = std::string((char*)buf, headerlen); + rep = header + rep; + } + int bytesToSend = rep.length(); + const char* ptrBuffer = rep.c_str(); + do { + int retVal = send(fd, ptrBuffer, bytesToSend, MSG_NOSIGNAL); + if (retVal == -1) { + if (EPIPE == errno) { + m_purge.push_back(fd); + } + return -1; + } + bytesToSend -= retVal; + ptrBuffer += retVal; + } while (bytesToSend > 0); + return rep.length(); +} + +bool TcpServer::Recv(int fd) { + DBG_MSG(("TcpServer::Recv(%d)\n", fd)); + + std::string* pReceivingBuffer = getBufferFor(fd); + bool buffer_was_not_empty = pReceivingBuffer->size() > 0; + + std::vector<char> buf; + buf.reserve(RECV_BUFFER_LENGTH + pReceivingBuffer->size()); + DBG_MSG(("Left in pReceivingBuffer: %d \n", + pReceivingBuffer->size())); + buf.assign(pReceivingBuffer->c_str(), + pReceivingBuffer->c_str() + pReceivingBuffer->size()); + buf.resize(RECV_BUFFER_LENGTH + pReceivingBuffer->size()); + + int received_bytes = recv(fd, &buf[pReceivingBuffer->size()], MAX_RECV_DATA, 0); + if (received_bytes <= 0) { + DBG_MSG(("Received %d bytes from %d; error = %d\n", + received_bytes, fd, errno)); + m_purge.push_back(fd); + return false; + } + + unsigned int nb = received_bytes; + std::vector<char> last_msg_buf(buf.begin()+pReceivingBuffer->size(), + buf.begin()+pReceivingBuffer->size()+nb); + DBG_MSG(("Recieved %d from %d\n", nb, fd)); + nb += static_cast<unsigned int>(pReceivingBuffer->size()); + DBG_MSG(("Recieved with buffer %d from %d\n", nb, fd)); + + if (nb > 0) { // This is redundant + if (isWebSocket(fd)) { + const unsigned int data_length = + mWebSocketHandler.parseWebSocketDataLength(&buf[0], nb); + + DBG_MSG(("Received %d actual data length %d\n", nb, data_length)); + + if (data_length > nb) { + DBG_MSG_ERROR(("Received %d actual data length %d\n", nb, data_length)); + DBG_MSG_ERROR(("Incomplete message")); + *pReceivingBuffer = std::string(&buf[0], nb); + return false; + } + mWebSocketHandler.parseWebSocketData(&buf[0], nb); + } + + *pReceivingBuffer = std::string(&buf[0], nb); + DBG_MSG(("pReceivingBuffer before onMessageReceived:%d : %s\n", + pReceivingBuffer->size(), pReceivingBuffer->c_str())); + + // we need to check for websocket handshake + if (!checkWebSocketHandShake(fd, pReceivingBuffer)) + { //JSON MESSAGE received. Send data in CMessageBroker. + if (mpMessageBroker) { + size_t buffer_size_before = pReceivingBuffer->size(); + mpMessageBroker->onMessageReceived(fd, *pReceivingBuffer, true); + + if (buffer_was_not_empty && (pReceivingBuffer->size() == buffer_size_before)) { + /* We couldn't parse the buffer (with the last message at the end) + * Try to parse ONLY the last message */ + DBG_MSG_ERROR(("Couldn't parse the whole buffer! Try only the last message.\n")); + + nb = static_cast<unsigned int>(last_msg_buf.size()); + if (isWebSocket(fd)) { + const unsigned int data_length = + mWebSocketHandler.parseWebSocketDataLength(&last_msg_buf[0], nb); + if (data_length > nb) { + DBG_MSG_ERROR(("The last message may be incomplete. Don't do anything.\n")); + /* Should we replace the buffer with the last message? + * Probably not. It may not be a real websocket message. + * Wait for a full message. */ + return false; + } + mWebSocketHandler.parseWebSocketData(&last_msg_buf[0], nb); + } + + std::string last_message = std::string(&last_msg_buf[0], nb); + buffer_size_before = last_message.size(); + mpMessageBroker->onMessageReceived(fd, last_message, false); + if ( last_message.size() < buffer_size_before ) { + /* Parsing last message successful! Discard the old data and + * keep only what is left from the last message */ + DBG_MSG_ERROR(("Parsing last message successful! Discard the old data.\n")); + *pReceivingBuffer = last_message; + } + } + } else { + return false; + } + } else { // message is a websocket handshake + ssize_t webSocketKeyPos = pReceivingBuffer->find("Sec-WebSocket-Key: "); + if (-1 != webSocketKeyPos) { + std::string handshakeResponse = + "HTTP/1.1 101 Switching Protocols\r\nUpgrade: WebSocket\r\n" + "Connection: Upgrade\r\nSec-WebSocket-Accept: "; + std::string wsKey = pReceivingBuffer->substr(webSocketKeyPos + 19, 24); + mWebSocketHandler.handshake_0405(wsKey); + handshakeResponse += wsKey; + handshakeResponse += "\r\n\r\n"; + pReceivingBuffer->clear(); + std::list<int>::iterator acceptedClientIt = + find(m_AcceptedClients.begin(), m_AcceptedClients.end(), fd); + if (m_AcceptedClients.end() != acceptedClientIt) { + m_AcceptedClients.erase(acceptedClientIt); + } + Send(fd, handshakeResponse); + m_WebSocketClients.push_back(fd); + } + } + } + return true; +} + +bool TcpServer::checkWebSocketHandShake(int fd, std::string* pReceivingBuffer) { + bool result = false; + std::list<int>::iterator acceptedClientIt = find(m_AcceptedClients.begin(), m_AcceptedClients.end(), fd); + if (m_AcceptedClients.end() != acceptedClientIt) { + ssize_t httpheader = pReceivingBuffer->find("GET / HTTP/1.1"); + if (-1 != httpheader) { // here is a header + DBG_MSG(("HTTP header detected!\n")); + result = true; + } else { // not winsocket client + m_AcceptedClients.erase(acceptedClientIt); + } + } + DBG_MSG(("TcpServer::checkWebSocket(): %d!\n", result)); + return result; +} + +bool TcpServer::isWebSocket(int fd) { + bool result = false; + std::list<int>::iterator wsClientIt = find(m_WebSocketClients.begin(), m_WebSocketClients.end(), fd); + if (m_WebSocketClients.end() != wsClientIt) { + result = true; + } + return result; +} + +std::string* TcpServer::getBufferFor(int fd) { + std::string* res = 0; + std::map <int, std::string*>::iterator it; + it = m_receivingBuffers.find(fd); + if (it != m_receivingBuffers.end()) { + res = (*it).second; + } else { // create a new buffer... + res = new std::string(""); + printf("getBufferFor method!\n"); + m_receivingBuffers.insert(std::map<int, std::string*>::value_type(fd, res)); + } + + return res; +} + +void TcpServer::WaitMessage(uint32_t ms) { + fd_set fdsr; + struct timeval tv; + int max_sock = m_sock; + + tv.tv_sec = ms / 1000; + tv.tv_usec = (ms % 1000) / 1000; + + FD_ZERO(&fdsr); + +#ifdef _WIN32 + /* on Windows, a socket is not an int but a SOCKET (unsigned int) */ + FD_SET((SOCKET)m_sock, &fdsr); +#else + FD_SET(m_sock, &fdsr); +#endif + + for (std::map<int, std::string*>::iterator it = m_receivingBuffers.begin(); + it != m_receivingBuffers.end() ; it++) { +#ifdef _WIN32 + FD_SET((SOCKET)((*it).first), &fdsr); +#else + FD_SET(((*it).first), &fdsr); +#endif + + if (((*it).first) > max_sock) { + max_sock = ((*it).first); + } + } + + max_sock++; + + if (select(max_sock, &fdsr, NULL, NULL, ms ? &tv : NULL) > 0) { + if (FD_ISSET(m_sock, &fdsr)) { + Accept(); + } + + for (std::map<int, std::string*>::iterator it = m_receivingBuffers.begin(); + it != m_receivingBuffers.end() ; it++) { + if (FD_ISSET(((*it).first), &fdsr)) { + Recv((*it).first); + } + } + + /* remove disconnect socket descriptor */ + for (std::list<int>::iterator it = m_purge.begin(); + it != m_purge.end() ; it++) { + std::map <int, std::string*>::iterator itr; + itr = m_receivingBuffers.find((*it)); + if (itr != m_receivingBuffers.end()) + { // delete receiving buffer of disconnected client + mpMessageBroker->OnSocketClosed(itr->first); + delete itr->second; + m_receivingBuffers.erase(itr); + } + } + + /* purge disconnected list */ + m_purge.erase(m_purge.begin(), m_purge.end()); + } + else { + /* error */ + } +} + +bool TcpServer::Listen() const { + if (m_sock == -1) { + return false; + } + + if (listen(m_sock, 5) == -1) { + return false; + } + + return true; +} + +bool TcpServer::Accept() { + + int client = -1; + socklen_t addrlen = sizeof(struct sockaddr_storage); + + if (m_sock == -1) { + return false; + } + + client = accept(m_sock, 0, &addrlen); + + if (client == -1) { + return false; + } + + std::string* res = new std::string(""); + m_receivingBuffers.insert(std::map<int, std::string*>::value_type(client, res)); + m_AcceptedClients.push_back(client); + return true; +} + +void TcpServer::Close() { + /* close all client sockets */ + for (std::map<int, std::string*>::iterator it = m_receivingBuffers.begin(); + it != m_receivingBuffers.end() ; it++) { + ::close((*it).first); + if ((*it).second) { + delete(*it).second; + } + } + m_receivingBuffers.clear(); + Server::Close(); + /* listen socket should be closed in Server destructor */ +} + +void* TcpServer::MethodForThread(void* arg) { + arg = arg; + while (1) { + WaitMessage(1000); + } + return NULL; +} + +} /* namespace NsMessageBroker */ diff --git a/src/3rd_party-static/message_broker/src/server/networking.cpp b/src/3rd_party-static/message_broker/src/server/networking.cpp new file mode 100644 index 0000000000..f054431690 --- /dev/null +++ b/src/3rd_party-static/message_broker/src/server/networking.cpp @@ -0,0 +1,189 @@ +/* + * JsonRpc-Cpp - JSON-RPC implementation. + * Copyright (C) 2008-2011 Sebastien Vincent <sebastien.vincent@cppextrem.com> + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +/** + * \file networking.cpp + * \brief Networking utils. + * \author Sebastien Vincent + */ + +#include <cstdio> +#include <cstring> + +#include "networking.h" + +namespace networking { +#ifdef _WIN32 +/** + * \var wsaData + * \brief MS Windows object to start + * networking stuff. + */ +static WSAData wsaData; +#endif + +bool init() { + bool ret = false; + +#ifdef _WIN32 + ret = (WSAStartup(MAKEWORD(2, 0), &wsaData) == 0); +#else + /* unix-like */ + ret = true; +#endif + + return ret; +} + +void cleanup() { +#ifdef _WIN32 + WSACleanup(); +#endif +} + +int connect(enum TransportProtocol protocol, + const std::string& address, + uint16_t port, struct sockaddr_storage* sockaddr, + socklen_t* addrlen) { + struct addrinfo hints; + struct addrinfo* res = NULL; + struct addrinfo* p = NULL; + char service[8]; + int sock = -1; + + if (!port || address == "") { + return -1; + } + + snprintf(service, sizeof(service), "%u", port); + service[sizeof(service) - 1] = 0x00; + + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = protocol == UDP ? SOCK_DGRAM : SOCK_STREAM; + hints.ai_protocol = protocol; + hints.ai_flags = 0; + + if (getaddrinfo(address.c_str(), + service, &hints, &res) != 0) { + return -1; + } + + for (p = res ; p ; p = p->ai_next) { + sock = socket(p->ai_family, p->ai_socktype, p->ai_protocol); + + if (sock == -1) { + continue; + } + + if (protocol == TCP && ::connect(sock, (struct sockaddr*)p->ai_addr, p->ai_addrlen) == -1) { + ::close(sock); + sock = -1; + continue; + } + + if (sockaddr) { + memcpy(sockaddr, p->ai_addr, p->ai_addrlen); + } + + if (addrlen) { + *addrlen = p->ai_addrlen; + } + + /* ok so now we have a socket bound, break the loop */ + break; + } + + freeaddrinfo(res); + p = NULL; + + return sock; +} + +int bind(enum TransportProtocol protocol, + const std::string& address, uint16_t port, + struct sockaddr_storage* sockaddr, socklen_t* addrlen) { + struct addrinfo hints; + struct addrinfo* res = NULL; + struct addrinfo* p = NULL; + char service[8]; + int sock = -1; + + if (!port || address == "") { + return -1; + } + + snprintf(service, sizeof(service), "%u", port); + service[sizeof(service) - 1] = 0x00; + + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = protocol == UDP ? SOCK_DGRAM : SOCK_STREAM; + hints.ai_protocol = protocol; + hints.ai_flags = AI_PASSIVE; + + if (getaddrinfo(address.c_str(), service, &hints, &res) != 0) { + return -1; + } + + for (p = res ; p ; p = p->ai_next) { + int on = 1; + on = on; + + sock = socket(p->ai_family, p->ai_socktype, p->ai_protocol); + + if (sock == -1) { + continue; + } + +#ifndef _WIN32 + setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(int)); + + /* accept IPv6 OR IPv4 on the same socket */ + on = 1; + setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on)); +#else + on = 0; +#endif + + if (::bind(sock, p->ai_addr, p->ai_addrlen) == -1) { + ::close(sock); + sock = -1; + continue; + } + + if (sockaddr) { + memcpy(sockaddr, p->ai_addr, p->ai_addrlen); + } + + if (addrlen) { + *addrlen = p->ai_addrlen; + } + + /* ok so now we have a socket bound, break the loop */ + break; + } + + freeaddrinfo(res); + p = NULL; + + return sock; +} + +} /* namespace networking */ + |