diff options
Diffstat (limited to 'src/3rd_party-static/message_broker/src/lib_messagebroker/CMessageBroker.cpp')
-rw-r--r-- | src/3rd_party-static/message_broker/src/lib_messagebroker/CMessageBroker.cpp | 968 |
1 files changed, 0 insertions, 968 deletions
diff --git a/src/3rd_party-static/message_broker/src/lib_messagebroker/CMessageBroker.cpp b/src/3rd_party-static/message_broker/src/lib_messagebroker/CMessageBroker.cpp deleted file mode 100644 index b1c29cb0ef..0000000000 --- a/src/3rd_party-static/message_broker/src/lib_messagebroker/CMessageBroker.cpp +++ /dev/null @@ -1,968 +0,0 @@ -/** - * \file CMessageBroker.cpp - * \brief CMessageBroker singletone class implementation. - * \author AKara - */ - -#include <cassert> -#include <stdio.h> -#include <vector> - -#include <string> - -#include "CMessageBroker.hpp" -#include "CMessageBrokerRegistry.hpp" - -#include "system.h" - -#include "json/json.h" - -#include "libMBDebugHelper.h" - -namespace NsMessageBroker { -/** - * \class CMessage - * \brief CMessage class implementation. - */ -class CMessage { - public: - /** - * \brief Constructor. - */ - CMessage(int aSenderFp, Json::Value aMessage) { - mSenderFd = aSenderFp; - mMessage = aMessage; - } - - /** - * \brief Destructor. - */ - ~CMessage() { - } - - /** - * \brief getter for Json::Value message. - * \return Json::Value message. - */ - Json::Value getMessage() const { - return mMessage; - } - - /** - * \brief getter for sender FileDescriptor. - * \return sender FileDescriptor. - */ - int getSenderFd() const { - return mSenderFd; - } - private: - /** - * \brief sender FileDescriptor. - */ - int mSenderFd; - - /** - * \brief Json::Value message. - */ - Json::Value mMessage; -}; - - -class CMessageBroker_Private { - public: - /** - * \brief Constructor. - */ - CMessageBroker_Private(); - - /** - * \brief Check if que empty (Thread safe). - * \return True when empty. - */ - bool isEventQueueEmpty(); - - /** - * \brief Pop message from que (Thread safe). - * \return Pointer to CMessage. - */ - CMessage* popMessage(); - - /** - * \brief Push message to que (Thread safe). - * \param pMessage pointer to new CMessage object. - */ - void pushMessage(CMessage* pMessage); - - /** - * \brief gets destination component name. - * \param pMessage JSON message. - * \return string destination component name. - */ - std::string getDestinationComponentName(CMessage* pMessage); - - /** - * \brief gets method name. - * \param pMessage JSON message. - * \return string method name. - */ - std::string getMethodName(CMessage* pMessage); - - /** - * \brief checks is message notification or not. - * \param pMessage JSON message. - * \return true if notification. - */ - bool isNotification(CMessage* pMessage); - - /** - * \brief checks is message response or not. - * \param pMessage JSON message. - * \return true if response. - */ - bool isResponse(CMessage* pMessage); - - /** - * \brief checks message. - * \param pMessage JSON message. - * \param error JSON message to fill in case of any errors. - * \return true if message is good. - */ - bool checkMessage(CMessage* pMessage, Json::Value& error); - - /** - * \brief Process internal MessageBrocker message - * - * \brief Register controller in MessageBroker. - * Use following JSON command to register new component: - * \code - * {"jsonrpc": "2.0", "method": "MB.registerComponent", "params": "<ComponentName>"} - * \endcode - * - * \brief Unregister controller in MessageBroker. - * Use following JSON command to unregister component: - * \code - * {"jsonrpc": "2.0", "method": "MB.unregisterComponent", "params": "<ComponentName>"} - * \endcode - * - * \brief Subscribe controller on property change. - * Use following JSON command to subscribe to notifications: - * \code - * {"jsonrpc": "2.0", "method": "MB.subscribeTo", "params": "<ComponentName>.<NotificationName>"} - * \endcode - * - * \brief Unsubscribe controller from property change. - * Use following JSON command to unsubscribe from notifications: - * \code - * {"jsonrpc": "2.0", "method": "MB.unsubscribeFrom", "params": "<ComponentName>.<NotificationName>"} - * \endcode - * - * \param pMessage JSON message. - */ - void processInternalMessage(CMessage* pMessage); - - /** - * \brief process external message. - * \param pMessage JSON message. - */ - void processExternalMessage(CMessage* pMessage); - - /** - * \brief process response. - * \param pMessage JSON message. - */ - void processResponse(CMessage* pMessage); - - /** - * \brief Process notification message. - * \brief Notify subscribers about property change. - * expected notification format example: - * \code - * {"jsonrpc": "2.0", "method": "<ComponentName>.<NotificationName>", "params": <list of params>} - * \endcode - * \param pMessage JSON message. - */ - void processNotification(CMessage* pMessage); - - /** - * \brief send error message. - * \param pMessage JSON message. - */ - void processError(CMessage* pMessage); - - /** - * \brief send Json message. - * \param fd FileDescriptor of socket. - * \param message JSON message. - */ - void sendJsonMessage(int fd, Json::Value message); - - /** - * \brief push message to wait response que. - * \param pMessage JSON message. - */ - void pushMessageToWaitQue(CMessage* pMessage); - - /** - * \brief Returns start position for Id's generator of controller. - * \return start position for Id's generator of controller (1000 id's). - */ - int getNextControllerIdDiapason() { - return 1000 * mControllersIdCounter++; - } - - /** - * \brief pop message from wait response que. - * \param pMessage JSON message. - */ - int popMessageFromWaitQue(CMessage* pMessage); - - /** - * \brief Tries to remove the parsed part of the buffer - * \param root Parsed JSON value - * \param aJSONData The string buffer - * \return true on success, false on failure - */ - bool cutParsedJSON(const Json::Value& root, std::string& aJSONData); - - /** - * \brief Finds the position just after a JSON object or array in a buffer - * \param isObject Must be true for object, false for array - * \param aJSONData The string buffer - * \return The position in the buffer after the object or array on success, - * std::strin::npos on failure - */ - size_t jumpOverJSONObjectOrArray(bool isObject, const std::string& aJSONData); - - /** - * \brief Finds the position just after a JSON string in a buffer - * \param aJSONData The string buffer - * \return The position in the buffer after the string on success, - * std::strin::npos on failure - */ - size_t jumpOverJSONString(const std::string& aJSONData); - - /** - * \brief Que of messages. - */ - std::deque<CMessage*> mMessagesQueue; - - /** - * \brief Counter of messages Id's diapason for the next controllers - * From mControllersIdCounter*1000 to mControllersIdCounter*1000+999. - */ - int mControllersIdCounter; - - /** - * \brief Que of messages which are waiting the response in format: MessageId:SenderFd. - */ - std::map<int, int> mWaitResponseQueue; - - /** - * \brief Pointer to sender. - */ - CSender* mpSender; - - /** - * \brief Pointer to registry. - */ - CMessageBrokerRegistry* mpRegistry; - - /** - * \brief JSON reader. - */ - Json::Reader m_reader; - - /** - * \brief JSON writer. - */ - Json::FastWriter m_writer; - - /** - * \brief JSON writer for receiver. - */ - Json::FastWriter m_recieverWriter; - - /** - * \brief Messages que mutex. - */ - System::Mutex mMessagesQueueMutex; - - /** - * \brief Binary semaphore that is used to notify the - * messaging thread that a new message is available. - */ - System::BinarySemaphore m_messageQueueSemaphore; -}; - -CMessageBroker_Private::CMessageBroker_Private() : - mControllersIdCounter(1), - mpSender(NULL) { - mpRegistry = CMessageBrokerRegistry::getInstance(); -} - - -CMessageBroker::CMessageBroker() : - p(new CMessageBroker_Private()) { -} - -CMessageBroker::~CMessageBroker() { - delete p, p = 0; -} - -CMessageBroker* CMessageBroker::getInstance() { - static CMessageBroker instance; - return &instance; -} - - -size_t CMessageBroker_Private::jumpOverJSONObjectOrArray(bool isObject, - const std::string& aJSONData) { - const char openBracket = isObject? '{' : '['; - const char closeBracket = isObject? '}' : ']'; - int open_minus_close_brackets(1); - size_t position = aJSONData.find(openBracket); // Find the beginning of the object - - while ((position != std::string::npos) && (open_minus_close_brackets > 0)) { - position = aJSONData.find_first_of(std::string("\"")+openBracket+closeBracket, - position+1); - if (std::string::npos == position) { - break; - } - if ('"' == aJSONData[position]) { - // Ignore string interior, which might contain brackets and escaped "-s - do { - position = aJSONData.find('"', position+1); // Find the closing quote - } while ((std::string::npos != position) && ('\\' == aJSONData[position-1])); - } else if (openBracket == aJSONData[position]) { - ++open_minus_close_brackets; - } else if (closeBracket == aJSONData[position]) { - --open_minus_close_brackets; - } - } - - if ((0 == open_minus_close_brackets) && (std::string::npos != position)) { - ++position; // Move after the closing bracket - } else { - position = std::string::npos; - } - - return position; -} - - -size_t CMessageBroker_Private::jumpOverJSONString(const std::string& aJSONData) { - size_t position = aJSONData.find('"'); // Find the beginning of the string - - do { - position = aJSONData.find('"', position+1); // Find the closing quote - } while ((std::string::npos != position) && ('\\' == aJSONData[position-1])); - - if (std::string::npos != position) { - ++position; // Move after the closing quote - } - - return position; -} - - -bool CMessageBroker_Private::cutParsedJSON(const Json::Value& root, - std::string& aJSONData) { - if (root.isNull() || aJSONData.empty()) { - DBG_MSG_ERROR(("JSON is null or the buffer is empty!\n")); - return false; - } - - std::string parsed_json_str = m_recieverWriter.write(root); - DBG_MSG(("Parsed JSON string: '%s'\n", parsed_json_str.c_str())); - - // Trim front spaces (if any) - const size_t nonempty_position = aJSONData.find_first_not_of(" \t\n\v\f\r"); - aJSONData.erase(0, nonempty_position); - if (std::string::npos == nonempty_position) { - DBG_MSG_ERROR(("Buffer contains only blanks!\n")); - return false; - } - - // JSON writer puts '\n' at the end. Remove it. - const size_t final_lf_pos = parsed_json_str.rfind('\n'); - if (final_lf_pos == parsed_json_str.length()-1) { - parsed_json_str.erase(final_lf_pos, 1); - } - - /* RFC 4627: "A JSON value MUST be an object, array, number, or string, or - * one of the following three literal names: false null true" - * So we will try to find the borders of the parsed part based on its type. */ - - size_t position(std::string::npos); - - if (0 == aJSONData.find(parsed_json_str)) { - // If by chance parsed JSON is the same in the buffer and is at the beginning - position = parsed_json_str.length(); - } else if (root.isObject() || root.isArray()) { - position = jumpOverJSONObjectOrArray(root.isObject(), aJSONData); - } else if (root.isString()) { - position = jumpOverJSONString(aJSONData); - } else if (root.isNumeric()) { - position = aJSONData.find_first_not_of("+-0123456789.eE"); - } else if (root.isBool() || ("null" == parsed_json_str)) { - position = aJSONData.find(parsed_json_str); - if (std::string::npos != position) { - position += parsed_json_str.length(); - } - } else { - DBG_MSG_ERROR(("Unknown JSON type!\n")); - } - - if (std::string::npos == position) { - DBG_MSG_ERROR(("Error finding JSON object boundaries!\n")); - /* This should not happen, because the string is already parsed as a - * valid JSON. If this happens then above code is wrong. It is better - * to assert() than just return here, because otherwise we may enter an - * endless cycle - fail to process one and the same message again and - * again. Or we may clear the buffer and return, but in this way we will - * loose the next messages, miss a bug here, and create another bug. */ - assert(std::string::npos != position); - return false; // For release version - } - - if ((position >= aJSONData.length()) || - ((position == aJSONData.length()-1) && isspace(aJSONData[position]))) { - // No next object. Clear entire aJSONData. - aJSONData = ""; - } else { - // There is another object. Clear the current one. - aJSONData.erase(0, position); - } - - return true; -} - - -void CMessageBroker::onMessageReceived(int fd, std::string& aJSONData, bool tryHard) { - DBG_MSG(("CMessageBroker::onMessageReceived(%d, '%s')\n", fd, aJSONData.c_str())); - - while (! aJSONData.empty()) { - Json::Value root; - if ((! p->m_reader.parse(aJSONData, root)) || root.isNull()) { - DBG_MSG_ERROR(("Unable to parse JSON!\n")); - if (! tryHard) { - return; - } - uint8_t first_byte = static_cast<uint8_t>(aJSONData[0]); - if ((first_byte <= 0x08) || ((first_byte >= 0x80) && (first_byte <= 0x88))) { - DBG_MSG(("There is an unparsed websocket header probably.\n")); - /* Websocket headers can have FIN flag set in the first byte (0x80). - * Then there are 3 zero bits and 4 bits for opcode (from 0x00 to 0x0A). - * But actually we don't use opcodes above 0x08. - * Use this fact to distinguish websocket header from payload text data. - * It can be a coincidence of course, but we have to give it a try. */ - return; - } else if ('{' == aJSONData[0]) { - const bool is_object = true; - const size_t next_object_pos = - p->jumpOverJSONObjectOrArray(is_object, aJSONData); - - if (next_object_pos != std::string::npos) { - DBG_MSG_ERROR(("Invalid JSON object probably. Skipping.\n")); - aJSONData.erase(0, next_object_pos); - DBG_MSG(("Buffer after cut is: '%s'\n", aJSONData.c_str())); - continue; - } - DBG_MSG_ERROR(("Incomplete JSON object probably.\n")); - return; - } else { - DBG_MSG_ERROR(("Step in the buffer and try again...\n")); - aJSONData.erase(0, 1); - DBG_MSG(("Buffer after cut is: '%s'\n", aJSONData.c_str())); - continue; - } - - } else if (! root.isObject()) { - /* JSON RPC 2.0 messages are objects. Batch calls must be pre-rpocessed, - * so no need for "and !root.isArray()" */ - DBG_MSG_ERROR(("Parsed JSON is not an object!\n")); - if (! tryHard) { - return; - } - // Cut parsed data from the buffer below and continue - - } else if ((!root.isMember("jsonrpc")) || (root["jsonrpc"]!="2.0")) { - DBG_MSG_ERROR(("'jsonrpc' is not set correctly in parsed JSON!\n")); - if (! tryHard) { - return; - } - // Cut parsed object from the buffer below and continue - - } else { - // Parsing successful. Pass the message up. - p->pushMessage(new CMessage(fd, root)); - } - - p->cutParsedJSON(root, aJSONData); - - DBG_MSG(("Buffer after cut is: '%s'\n", aJSONData.c_str())); - } -} - -void CMessageBroker::Test() { - Json::Value root, err; - std::string ReceivingBuffer = - "{\"id\":0,\"jsonrpc\":\"2.0\",\"method\":\"MB.registerComponent\",\"params\":{\"componentName\":\"AVA\"}}123{\"id\":0,\"jsonrpc\":\"2.0\",\"method\":\"MB.registerComponent\",\"params\":{\"componentName\":\"AVA\"}}"; - DBG_MSG(("String is:%s\n", ReceivingBuffer.c_str())); - while (1) { - if (!p->m_reader.parse(ReceivingBuffer, root)) { - DBG_MSG_ERROR(("Received not JSON string! %s\n", ReceivingBuffer.c_str())); - return; - } - std::string wmes = p->m_recieverWriter.write(root); - DBG_MSG(("Parsed JSON string:%s; length: %zu\n", wmes.c_str(), wmes.length())); - DBG_MSG(("Buffer is:%s\n", ReceivingBuffer.c_str())); - ssize_t beginpos = ReceivingBuffer.find(wmes); - ReceivingBuffer.erase(0, beginpos + wmes.length()); - DBG_MSG(("Buffer after cut is:%s\n", ReceivingBuffer.c_str())); - CMessage message(0, root); - if (p->checkMessage(&message, err)) { - //here put message to que - } else { - DBG_MSG_ERROR(("Wrong message:%s\n", wmes.c_str())); - } - } -} - -void CMessageBroker::OnSocketClosed(const int fd) { - DBG_MSG(("CMessageBroker::OnSocketClosed(%d)\n", fd)); - if (p->mpRegistry) { - p->mpRegistry->removeControllersByDescriptor(fd); - } -} - -void CMessageBroker::startMessageBroker(CSender* pSender) { - DBG_MSG(("CMessageBroker::startMessageBroker()\n")); - p->mpSender = pSender; -} - -void CMessageBroker::stopMessageBroker() { - p->mpSender = NULL; - DBG_MSG(("CMessageBroker::stopMessageBroker()\n")); -} - -CMessage* CMessageBroker_Private::popMessage() { - CMessage* ret = NULL; - DBG_MSG(("CMessageBroker::popMessage()\n")); - mMessagesQueueMutex.Lock(); - if (false == mMessagesQueue.empty()) { - ret = mMessagesQueue.front(); - mMessagesQueue.pop_front();// delete message from que - } else { - DBG_MSG(("Que is empty!\n")); - } - mMessagesQueueMutex.Unlock(); - return ret; -} - -void CMessageBroker_Private::pushMessage(CMessage* pMessage) { - DBG_MSG(("CMessageBroker::pushMessage()\n")); - mMessagesQueueMutex.Lock(); - if (pMessage) { - mMessagesQueue.push_back(pMessage); - } else { - DBG_MSG_ERROR(("NULL pointer!\n")); - } - mMessagesQueueMutex.Unlock(); - - m_messageQueueSemaphore.Notify(); -} - -bool CMessageBroker_Private::isEventQueueEmpty() { - bool bResult = true; - mMessagesQueueMutex.Lock(); - bResult = mMessagesQueue.empty(); - mMessagesQueueMutex.Unlock(); - return bResult; -} - -std::string CMessageBroker_Private::getDestinationComponentName(CMessage* pMessage) { - DBG_MSG(("CMessageBroker::getDestinationComponentName()\n")); - std::string ret = ""; - if (pMessage) { - Json::Value mes = pMessage->getMessage(); - std::string method = mes["method"].asString(); - int pos = method.find("."); - if (-1 != pos) { - ret = method.substr(0, pos); - } - DBG_MSG(("Destination component is: %s\n", ret.c_str())); - } else { - DBG_MSG_ERROR(("NULL pointer!\n")); - } - return ret; -} - -std::string CMessageBroker_Private::getMethodName(CMessage* pMessage) { - DBG_MSG(("CMessageBroker::getMethodName()\n")); - std::string ret = ""; - if (pMessage) { - Json::Value mes = pMessage->getMessage(); - std::string method = mes["method"].asString(); - int pos = method.find("."); - if (-1 != pos) { - ret = method.substr(pos + 1); - } - DBG_MSG(("Method is: %s\n", ret.c_str())); - } else { - DBG_MSG_ERROR(("NULL pointer!\n")); - } - return ret; -} - -bool CMessageBroker_Private::isNotification(CMessage* pMessage) { - DBG_MSG(("CMessageBroker::isNotification()\n")); - bool ret = false; - Json::Value mes = pMessage->getMessage(); - if (false == mes.isMember("id")) { - ret = true; - } - DBG_MSG(("Result: %d\n", ret)); - return ret; -} - -bool CMessageBroker_Private::isResponse(CMessage* pMessage) { - DBG_MSG(("CMessageBroker::isResponse()\n")); - bool ret = false; - Json::Value mes = pMessage->getMessage(); - if ((true == mes.isMember("result")) || (true == mes.isMember("error"))) { - ret = true; - } - DBG_MSG(("Result: %d\n", ret)); - return ret; -} - -void CMessageBroker_Private::pushMessageToWaitQue(CMessage* pMessage) { - DBG_MSG(("CMessageBroker::pushMessageToWaitQue()\n")); - if (pMessage) { - Json::Value root = pMessage->getMessage(); - mWaitResponseQueue.insert(std::map<int, int>::value_type(root["id"].asInt(), pMessage->getSenderFd())); - } else { - DBG_MSG_ERROR(("NULL pointer!\n")); - } -} - -int CMessageBroker_Private::popMessageFromWaitQue(CMessage* pMessage) { - DBG_MSG(("CMessageBroker::popMessageFromWaitQue()\n")); - int result = -1; - if (pMessage) { - Json::Value root = pMessage->getMessage(); - int messageId = root["id"].asInt(); - std::map <int, int>::iterator it; - it = mWaitResponseQueue.find(messageId); - if (it != mWaitResponseQueue.end()) { - result = (*it).second; - mWaitResponseQueue.erase(it); - } - } else { - DBG_MSG_ERROR(("NULL pointer!\n")); - } - DBG_MSG(("Senders Fd: %d\n", result)); - return result; -} - -void CMessageBroker_Private::processInternalMessage(CMessage* pMessage) { - DBG_MSG(("CMessageBroker::processInternalMessage()\n")); - if (pMessage) { - std::string amethodName = getMethodName(pMessage); - DBG_MSG(("Method: %s\n", amethodName.c_str())); - Json::Value root = pMessage->getMessage(); - if ("registerComponent" == amethodName) { - Json::Value params = root["params"]; - if (params.isMember("componentName") && params["componentName"].isString()) { - std::string controllerName = params["componentName"].asString(); - if (mpRegistry->addController(pMessage->getSenderFd(), controllerName)) { - Json::Value response; - response["id"] = root["id"]; - response["jsonrpc"] = "2.0"; - response["result"] = getNextControllerIdDiapason(); - sendJsonMessage(pMessage->getSenderFd(), response); - } else { - Json::Value error, err; - error["id"] = root["id"]; - error["jsonrpc"] = "2.0"; - err["code"] = CONTROLLER_EXISTS; - err["message"] = "Controller has been already registered."; - error["error"] = err; - processError(new CMessage(pMessage->getSenderFd(), error)); - } - } else { - Json::Value error, err; - error["id"] = root["id"]; - error["jsonrpc"] = "2.0"; - err["code"] = INVALID_REQUEST; - err["message"] = "Wrong method parameter."; - error["error"] = err; - processError(new CMessage(pMessage->getSenderFd(), error)); - } - } else if ("subscribeTo" == amethodName) { - Json::Value params = root["params"]; - if (params.isMember("propertyName") && params["propertyName"].isString()) { - std::string propertyName = params["propertyName"].asString(); - if (mpRegistry->addSubscriber(pMessage->getSenderFd(), propertyName)) { - Json::Value response; - response["id"] = root["id"]; - response["jsonrpc"] = "2.0"; - response["result"] = "OK"; - sendJsonMessage(pMessage->getSenderFd(), response); - } else { - Json::Value error, err; - error["id"] = root["id"]; - error["jsonrpc"] = "2.0"; - err["code"] = CONTROLLER_EXISTS; - err["message"] = "Subscribe has been already registered."; - error["error"] = err; - processError(new CMessage(pMessage->getSenderFd(), error)); - } - } else { - Json::Value error, err; - error["id"] = root["id"]; - error["jsonrpc"] = "2.0"; - err["code"] = INVALID_REQUEST; - err["message"] = "Wrong method parameter."; - error["error"] = err; - processError(new CMessage(pMessage->getSenderFd(), error)); - } - } else if ("unregisterComponent" == amethodName) { - Json::Value params = root["params"]; - if (params.isMember("componentName") && params["componentName"].isString()) { - std::string controllerName = params["componentName"].asString(); - mpRegistry->deleteController(controllerName); - Json::Value response; - response["id"] = root["id"]; - response["jsonrpc"] = "2.0"; - response["result"] = "OK"; - sendJsonMessage(pMessage->getSenderFd(), response); - } else { - Json::Value error, err; - error["id"] = root["id"]; - error["jsonrpc"] = "2.0"; - err["code"] = INVALID_REQUEST; - err["message"] = "Wrong method parameter."; - error["error"] = err; - processError(new CMessage(pMessage->getSenderFd(), error)); - } - } else if ("unsubscribeFrom" == amethodName) { - Json::Value params = root["params"]; - if (params.isMember("propertyName") && params["propertyName"].isString()) { - std::string propertyName = params["propertyName"].asString(); - mpRegistry->deleteSubscriber(pMessage->getSenderFd(), propertyName); - Json::Value response; - response["id"] = root["id"]; - response["jsonrpc"] = "2.0"; - response["result"] = "OK"; - sendJsonMessage(pMessage->getSenderFd(), response); - } else { - Json::Value error, err; - error["id"] = root["id"]; - error["jsonrpc"] = "2.0"; - err["code"] = INVALID_REQUEST; - err["message"] = "Wrong method parameter."; - error["error"] = err; - processError(new CMessage(pMessage->getSenderFd(), error)); - } - } else { - DBG_MSG(("Unknown method!\n")); - Json::Value error; - Json::Value err; - error["id"] = root["id"]; - error["jsonrpc"] = "2.0"; - err["code"] = INVALID_REQUEST; - err["message"] = "Invalid MessageBroker method."; - error["error"] = err; - processError(new CMessage(pMessage->getSenderFd(), error)); - } - } else { - DBG_MSG_ERROR(("NULL pointer!\n")); - } -} - -void CMessageBroker_Private::processExternalMessage(CMessage* pMessage) { - DBG_MSG(("CMessageBroker::processExternalMessage()\n")); - if (pMessage) { - std::string destComponentName = getDestinationComponentName(pMessage); - int destFd = mpRegistry->getDestinationFd(destComponentName); - Json::Value root = pMessage->getMessage(); - if (0 < destFd) { - sendJsonMessage(destFd, root); - pushMessageToWaitQue(pMessage); - } else { - // error, controller not found in the registry - DBG_MSG(("Unknown method!\n")); - Json::Value error; - Json::Value err; - Json::Value error_data; - error["id"] = root["id"]; - error["jsonrpc"] = "2.0"; - err["code"] = UNSUPPORTED_RESOURCE; - err["message"] = "Destination controller not found!"; - error_data["method"] = root["method"]; - err["data"] = error_data; - error["error"] = err; - processError(new CMessage(pMessage->getSenderFd(), error)); - } - } else { - DBG_MSG_ERROR(("NULL pointer\n")); - } -} - -void CMessageBroker_Private::processResponse(CMessage* pMessage) { - DBG_MSG(("CMessageBroker::processResponse()\n")); - if (pMessage) { - int senderFd = popMessageFromWaitQue(pMessage); - if (-1 != senderFd) { - sendJsonMessage(senderFd, pMessage->getMessage()); - } - } else { - DBG_MSG_ERROR(("NULL pointer\n")); - } -} - -void CMessageBroker_Private::processNotification(CMessage* pMessage) { - DBG_MSG(("CMessageBroker::processNotification()\n")); - if (pMessage) { - Json::Value root = pMessage->getMessage(); - std::string methodName = root["method"].asString(); - DBG_MSG(("Property: %s\n", methodName.c_str())); - std::vector<int> result; - int subscribersCount = mpRegistry->getSubscribersFd(methodName, result); - if (0 < subscribersCount) { - std::vector<int>::iterator it; - for (it = result.begin(); it != result.end(); it++) { - sendJsonMessage(*it, root); - } - } else { - DBG_MSG(("No subscribers for this property!\n")); - } - } else { - DBG_MSG_ERROR(("NULL pointer\n")); - } -} - -void CMessageBroker_Private::processError(CMessage* pMessage) { - DBG_MSG(("CMessageBroker::processError()\n")); - if (pMessage) { - sendJsonMessage(pMessage->getSenderFd(), pMessage->getMessage()); - delete pMessage;// delete CMessage object with error description!!! - } else { - DBG_MSG_ERROR(("NULL pointer\n")); - } -} - -void CMessageBroker_Private::sendJsonMessage(int fd, Json::Value message) { - DBG_MSG(("CMessageBroker::sendJsonMessage(%d)\n", fd)); - if (mpSender) { - std::string mes = m_writer.write(message); - int retVal = mpSender->Send(fd, mes); - if (retVal == -1) { - DBG_MSG_ERROR(("Message hasn't been sent!\n")); - return; - } - DBG_MSG(("Length: %zu, Sent: %d bytes\n", mes.length(), retVal)); - } else { - DBG_MSG_ERROR(("mpSender NULL pointer\n")); - } -} - -void* CMessageBroker::MethodForThread(void* arg) { - arg = arg; // to avoid compiler warnings - while (1) { - while (!p->isEventQueueEmpty()) { - CMessage* message = p->popMessage(); - if (message) { - Json::Value error; - if (p->checkMessage(message, error)) { - if (p->isNotification(message)) { - DBG_MSG(("Message is notification!\n")); - p->processNotification(message); - } else if (p->isResponse(message)) { - DBG_MSG(("Message is response!\n")); - p->processResponse(message); - } else { - if ("MB" == p->getDestinationComponentName(message)) { - DBG_MSG(("Internal MessageBroker method!\n")); - p->processInternalMessage(message); - } else { - DBG_MSG(("Not MessageBroker method!\n")); - p->processExternalMessage(message); - } - } - } else { - DBG_MSG_ERROR(("Message contains wrong data!\n")); - CMessage* errMessage = new CMessage(message->getSenderFd(), error); - if (NULL != errMessage) { - p->processError(errMessage); - } else { - DBG_MSG_ERROR(("NULL pointer!\n")); - } - } - delete message;// delete message object - } - } - p->m_messageQueueSemaphore.Wait(); - } - - return NULL; -} - -bool CMessageBroker_Private::checkMessage(CMessage* pMessage, Json::Value& error) { - DBG_MSG(("CMessageBroker::checkMessage()\n")); - Json::Value root; - root = pMessage->getMessage(); - Json::Value err; - - /* check the JSON-RPC version => 2.0 */ - if (!root.isObject() || !root.isMember("jsonrpc") || root["jsonrpc"] != "2.0") { - error["id"] = Json::Value::null; - error["jsonrpc"] = "2.0"; - err["code"] = INVALID_REQUEST; - err["message"] = "Invalid JSON RPC version."; - error["error"] = err; - return false; - } - - /* Check the id of message */ - if (root.isMember("id") && (root["id"].isArray() || root["id"].isObject() || root["id"].isString())) { - error["id"] = Json::Value::null; - error["jsonrpc"] = "2.0"; - err["code"] = INVALID_REQUEST; - err["message"] = "Invalid ID of message."; - error["error"] = err; - return false; - } - - /* extract "method" attribute */ - if (root.isMember("method")) { - if (!root["method"].isString()) { - error["id"] = Json::Value::null; - error["jsonrpc"] = "2.0"; - err["code"] = INVALID_REQUEST; - err["message"] = "Invalid JSONRPC method."; - error["error"] = err; - return false; - } - /* Check the params is an object*/ - if (root.isMember("params") && !root["params"].isObject()) { - error["id"] = Json::Value::null; - error["jsonrpc"] = "2.0"; - err["code"] = INVALID_REQUEST; - err["message"] = "Invalid JSONRPC params."; - error["error"] = err; - return false; - } - } else if (!(root.isMember("result") || root.isMember("error"))) { - error["id"] = Json::Value::null; - error["jsonrpc"] = "2.0"; - err["code"] = INVALID_REQUEST; - err["message"] = "Unknwn message type."; - error["error"] = err; - return false; - } - return true; -} -} /* namespace NsMessageBroker */ |