diff options
author | JackLivio <jack@livio.io> | 2018-02-02 12:00:25 -0500 |
---|---|---|
committer | JackLivio <jack@livio.io> | 2018-02-02 12:00:25 -0500 |
commit | b12728724da99ff832ee57af5c105910732e8760 (patch) | |
tree | 68f8d4d4328dbd288649e3ba5a543544abda2de5 /src/components | |
parent | dfe8b5ef72fc97c8c60b85006704e4d42d97d34e (diff) | |
download | sdl_core-b12728724da99ff832ee57af5c105910732e8760.tar.gz |
Add Boost Websocket HMI Adapter
Repleaces old message broker with new boost::beast websocket library. No changes required for setup or connecting with HMI.
Diffstat (limited to 'src/components')
9 files changed, 1175 insertions, 11 deletions
diff --git a/src/components/hmi_message_handler/CMakeLists.txt b/src/components/hmi_message_handler/CMakeLists.txt index c1dfca5e67..4d799b92dd 100644 --- a/src/components/hmi_message_handler/CMakeLists.txt +++ b/src/components/hmi_message_handler/CMakeLists.txt @@ -46,13 +46,19 @@ include_directories ( ${COMPONENTS_DIR}/dbus/include/ ${CMAKE_SOURCE_DIR}/ ${LOG4CXX_INCLUDE_DIRECTORY} + ${COMPONENTS_DIR}/hmi_message_handler/include + ${COMPONENTS_DIR}/hmi_message_handler/src + ${BOOST_INCLUDE_DIR} ) set(PATHS ${CMAKE_CURRENT_SOURCE_DIR}/include ${CMAKE_CURRENT_SOURCE_DIR}/src + ) +message(${PATHS}) + if (HMIADAPTER STREQUAL "dbus") set(EXCLUDE_PATHS) set(DBUS_ADAPTER DBus) @@ -73,7 +79,13 @@ set(LIBRARIES ${RTLIB} ) + add_library("HMIMessageHandler" ${SOURCES}) + +if (HMIADAPTER STREQUAL "messagebroker") + add_dependencies("HMIMessageHandler" Boost) +endif() + target_link_libraries("HMIMessageHandler" ${LIBRARIES}) if(ENABLE_LOG) diff --git a/src/components/hmi_message_handler/include/hmi_message_handler/mb_controller.h b/src/components/hmi_message_handler/include/hmi_message_handler/mb_controller.h new file mode 100644 index 0000000000..a8ec3f88bb --- /dev/null +++ b/src/components/hmi_message_handler/include/hmi_message_handler/mb_controller.h @@ -0,0 +1,162 @@ +#ifndef MB_CONTROLLER_H +#define MB_CONTROLLER_H + +#include <iostream> +#include <boost/beast/core.hpp> +#include <boost/beast/websocket.hpp> +#include <boost/asio/bind_executor.hpp> +#include <boost/asio/strand.hpp> +#include <boost/asio/placeholders.hpp> +#include <boost/asio/ip/tcp.hpp> +#include <boost/make_shared.hpp> +#include <boost/thread/thread.hpp> +#include <algorithm> +#include <cstdlib> +#include <functional> +#include <iostream> +#include <memory> +#include <string> +#include <thread> +#include <vector> +#include <map> +#include "json/json.h" + +#include "utils/lock.h" +#include "utils/atomic_object.h" + +using namespace boost::beast::websocket; + +#ifdef DEBUG_ON +/** +* \def DBG_MSG +* \brief Debug message output with file name and line number. +* \param x formatted debug message. +* \return printf construction. +*/ +#define DBG_MSG(x) printf("%s:%d ", __FILE__, __LINE__);\ + printf x +#else +#define DBG_MSG(x) +#endif + +#define DBG_MSG_ERROR(x) printf("ERROR!!! %s:%d ", __FILE__, __LINE__);\ + printf x + + +namespace NsMessageBroker { + + enum ErrorCode { + CONTROLLER_EXISTS = -32000, + SUBSCRIBTION_EXISTS = -32001, + INVALID_REQUEST = -32600 + }; + + class WebsocketSession; + + class CMessageBrokerController : public std::enable_shared_from_this<CMessageBrokerController> { + public: + CMessageBrokerController(const std::string& address, uint16_t port, std::string name, int num_ports, boost::asio::io_context& ioc); + + ~CMessageBrokerController(); + + bool StartListener(); + + bool Run(); + + void WaitForConnection(); + + void StartSession(boost::system::error_code ec); + + void OnAccept(boost::system::error_code ec, boost::asio::strand<boost::asio::io_context::executor_type>& strand, stream<boost::asio::ip::tcp::socket>& ws); + + void OnRead(boost::system::error_code ec, std::size_t bytes_transferred); + + bool isNotification(Json::Value& message); + + void sendNotification(Json::Value& message); + + bool isResponse(Json::Value& message); + + void sendResponse(Json::Value& message); + + void sendJsonMessage(Json::Value& message); + + void subscribeTo(std::string property); + + void registerController(int id = 0); + + void unregisterController(); + + void* MethodForReceiverThread(void * arg); + + bool Connect(); + + void exitReceivingThread(); + + virtual void processResponse(std::string method, Json::Value& root) = 0; + + virtual void processRequest(Json::Value& root) = 0; + + virtual void processNotification(Json::Value& root) = 0; + + std::string getMethodName(std::string& method); + + std::string GetComponentName(std::string& method); + + void processInternalRequest(Json::Value& message, WebsocketSession* ws_session ); + + void pushRequest(Json::Value& message, WebsocketSession* ws_session); + + //Registry + bool addController(WebsocketSession* ws_session, std::string name); + + void deleteController(WebsocketSession* ws_session); + + void deleteController(std::string name); + + void removeSubscribersBySession(const WebsocketSession* ws); + + bool addSubscriber(WebsocketSession* ws_session, std::string name); + + void deleteSubscriber(WebsocketSession* ws, std::string name); + + int getSubscribersFd(std::string name, std::vector<WebsocketSession*>& result); + + int getNextControllerIdDiapason(); + + private: + boost::asio::io_context ioc_; + const std::string& address_; + uint16_t port_; + std::string name_; + int num_threads_; + std::vector<std::thread> thread_vector_; + + boost::asio::ip::tcp::acceptor acceptor_; + boost::asio::ip::tcp::socket socket_; + boost::beast::multi_buffer buffer_; + boost::asio::ip::tcp::endpoint endpoint_; + + int mControllersIdCounter; + + //Registry + std::map <std::string, WebsocketSession*> mControllersList; + sync_primitives::Lock mControllersListLock; + + std::multimap <std::string, WebsocketSession*> mSubscribersList; + sync_primitives::Lock mSubscribersListLock; + + std::map <std::string, WebsocketSession*> mRequestList; + sync_primitives::Lock mRequestListLock; + + std::atomic_bool shutdown_; + + + }; + +} //NsMessageBroker + + + + +#endif /* MB_CONTROLLER_H */
\ No newline at end of file diff --git a/src/components/hmi_message_handler/include/hmi_message_handler/messagebroker_adapter.h b/src/components/hmi_message_handler/include/hmi_message_handler/messagebroker_adapter.h index f582cb2b81..81ab823e1c 100644 --- a/src/components/hmi_message_handler/include/hmi_message_handler/messagebroker_adapter.h +++ b/src/components/hmi_message_handler/include/hmi_message_handler/messagebroker_adapter.h @@ -35,7 +35,7 @@ #include <string> -#include "mb_controller.hpp" +#include "hmi_message_handler/mb_controller.h" #include "hmi_message_handler/hmi_message_adapter_impl.h" #include "utils/threads/thread_validator.h" @@ -47,7 +47,8 @@ class MessageBrokerAdapter : public HMIMessageAdapterImpl, public: MessageBrokerAdapter(HMIMessageHandler* handler_param, const std::string& server_address, - uint16_t port); + uint16_t port, + boost::asio::io_context& ioc); ~MessageBrokerAdapter(); void SendMessageToHMI(MessageSharedPointer message); diff --git a/src/components/hmi_message_handler/include/hmi_message_handler/websocket_session.h b/src/components/hmi_message_handler/include/hmi_message_handler/websocket_session.h new file mode 100644 index 0000000000..d8a79e7ef7 --- /dev/null +++ b/src/components/hmi_message_handler/include/hmi_message_handler/websocket_session.h @@ -0,0 +1,184 @@ +#ifndef WEBSOCKET_SESSION_H +#define WEBSOCKET_SESSION_H + +#include <iostream> +#include <boost/beast/core.hpp> +#include <boost/beast/websocket.hpp> +#include <boost/asio/bind_executor.hpp> +#include <boost/asio/strand.hpp> +#include <boost/asio/placeholders.hpp> +#include <boost/asio/ip/tcp.hpp> +#include <boost/make_shared.hpp> +#include <boost/thread/thread.hpp> +#include <algorithm> +#include <cstdlib> +#include <functional> +#include <iostream> +#include <memory> +#include <string> +#include <thread> +#include <vector> +#include <mutex> +#include <queue> +#include "json/json.h" + +#include "utils/lock.h" +#include "utils/atomic_object.h" +#include "utils/threads/thread.h" +#include "utils/threads/message_loop_thread.h" +#include "utils/message_queue.h" + +//#include "hmi_message_handler/mb_controller.h" + +using namespace boost::beast::websocket; +using ::utils::MessageQueue; + +#ifdef DEBUG_ON +/** +* \def DBG_MSG +* \brief Debug message output with file name and line number. +* \param x formatted debug message. +* \return printf construction. +*/ +#define DBG_MSG(x) printf("%s:%d ", __FILE__, __LINE__);\ + printf x +#else +#define DBG_MSG(x) +#endif + +#define DBG_MSG_ERROR(x) printf("ERROR!!! %s:%d ", __FILE__, __LINE__);\ + printf x + +typedef std::queue<std::shared_ptr<std::string>> AsyncQueue; +typedef std::shared_ptr<std::string> Message; + + +namespace NsMessageBroker { + + class CMessageBrokerController; + + class WebsocketSession : + public std::enable_shared_from_this<WebsocketSession> { + + boost::beast::websocket::stream<boost::asio::ip::tcp::socket> ws_; + boost::asio::strand<boost::asio::io_context::executor_type> strand_; + boost::beast::multi_buffer buffer_; + boost::beast::multi_buffer send_buffer_; + CMessageBrokerController* controller_; + + public: + WebsocketSession(boost::asio::ip::tcp::socket socket, CMessageBrokerController* controller); + + ~WebsocketSession(); + + void Accept(); + + void Close(); + + void Shutdown(); + + bool IsShuttingDown(); + + void Recv(boost::system::error_code ec); + + void Send(std::string& message, Json::Value& json_message); + + void SendFromQueue(); + + void sendJsonMessage(Json::Value& message); + + void OnWrite(boost::system::error_code ec, std::size_t bytes_transferred, std::shared_ptr<std::string> message); + + void Read(boost::system::error_code ec, std::size_t bytes_transferred); + + int getNextMessageId(); + + void prepareMessage(Json::Value& root); + + void prepareErrorMessage(int errCode, std::string errMessage, Json::Value& error); + + std::string getDestinationComponentName(Json::Value& root); + + bool isNotification(Json::Value& root); + + bool isResponse(Json::Value& root); + + std::string findMethodById(std::string id); + + void registerController(int id = 0); + + void unregisterController(); + + void subscribeTo(std::string property); + + void unsubscribeFrom(std::string property); + + bool checkMessage(Json::Value& root, Json::Value& error); + + std::string getControllersName(); + + std::string GetComponentName(std::string& method); + + protected: + + sync_primitives::atomic_bool stop; + + private: + void onMessageReceived(Json::Value message); + + std::map<std::string, std::string> mWaitResponseQueue; + + std::string m_receivingBuffer; + + int mControllersIdStart; + + int mControllersIdCurrent; + + Json::Reader m_reader; + Json::FastWriter m_writer; + Json::FastWriter m_receiverWriter; + + + sync_primitives::Lock queue_lock_; + sync_primitives::Lock message_queue_lock_; + std::atomic_bool shutdown_; + + + MessageQueue<Message, AsyncQueue> message_queue_; + + class LoopThreadDelegate : public threads::ThreadDelegate { + public: + LoopThreadDelegate(MessageQueue<Message, AsyncQueue>* message_queue, + WebsocketSession* handler); + + virtual void threadMain() OVERRIDE; + virtual void exitThreadMain() OVERRIDE; + + void OnWrite(); + + void SetShutdown(); + + private: + void DrainQueue(); + MessageQueue<Message, AsyncQueue>& message_queue_; + WebsocketSession& handler_; + sync_primitives::Lock queue_lock_; + sync_primitives::ConditionalVariable queue_new_items_; + std::atomic_bool write_pending_; + std::atomic_bool shutdown_; + + sync_primitives::Lock write_lock_; + + }; + + LoopThreadDelegate* thread_delegate_; + threads::Thread* thread_; + + }; + +} //NsMessageBroker + + + + +#endif /* WEBSOCKET_SESSION_H */
\ No newline at end of file diff --git a/src/components/hmi_message_handler/src/mb_controller.cc b/src/components/hmi_message_handler/src/mb_controller.cc new file mode 100644 index 0000000000..d7fb432df2 --- /dev/null +++ b/src/components/hmi_message_handler/src/mb_controller.cc @@ -0,0 +1,472 @@ + +#include "hmi_message_handler/mb_controller.h" +#include "websocket_session.cc" + +using namespace boost::beast::websocket; +namespace NsMessageBroker +{ + CMessageBrokerController::CMessageBrokerController( + const std::string& address, uint16_t port, std::string name, + int num_threads, boost::asio::io_context& ioc):address_(address), acceptor_(ioc_), socket_(ioc_), mControllersIdCounter(1) + { + + port_ = port; + name_ = name; + num_threads_ = num_threads; + endpoint_ = {boost::asio::ip::make_address(address), static_cast<unsigned short>(port)}; + shutdown_ = false; + + } + + CMessageBrokerController::~CMessageBrokerController() + { + boost::system::error_code ec; + socket_.close(); + acceptor_.close(ec); + if(ec) { + std::cerr << "ErrorMessage Close: " << ": " << ec.message() << "\n"; + } + shutdown_ = true; + ioc_.stop(); + } + + bool CMessageBrokerController::StartListener() { + boost::system::error_code error; + acceptor_.open(endpoint_.protocol(), error); + if(error) { + std::cerr << "ErrorOpen: " << ": " << error.message() << "\n"; + return false; + } + + acceptor_.set_option(boost::asio::socket_base::reuse_address(true), error); + if(error) { + std::cerr << "ErrorSetOption: " << ": " << error.message() << "\n"; + return false; + } + acceptor_.bind(endpoint_, error); + if(error) { + std::cerr << "ErrorBind: " << ": " << error.message() << "\n"; + return false; + } + acceptor_.listen(boost::asio::socket_base::max_listen_connections, error); + if(error) { + std::cerr << "ErrorListen: " << ": " << error.message() << "\n"; + return false; + } + return true; + } + + bool CMessageBrokerController::Run(){ + if(acceptor_.is_open() && !shutdown_) { + acceptor_.async_accept( + socket_, + std::bind( + &CMessageBrokerController::StartSession, + this, + std::placeholders::_1 + )); + ioc_.run(); + return true; + + } + return false; + } + + void CMessageBrokerController::WaitForConnection() { + if(acceptor_.is_open() && !shutdown_) { + acceptor_.async_accept( + socket_, + std::bind( + &CMessageBrokerController::StartSession, + this, + std::placeholders::_1 + ) + ); + } + } + + void CMessageBrokerController::StartSession(boost::system::error_code ec){ + if(ec){ + std::cerr << "ErrorMessage: " << ": " << ec.message() << "\n"; + ioc_.stop(); + return; + } + if(shutdown_) { + return; + } + std::make_shared<WebsocketSession>(std::move(socket_), this)->Accept(); + WaitForConnection(); + + } + + + bool CMessageBrokerController::isNotification(Json::Value& message) { + DBG_MSG(("CMessageBroker::isNotification()\n")); + bool ret = false; + if (false == message.isMember("id")) { + ret = true; + } + DBG_MSG(("Result: %d\n", ret)); + return ret; + } + + void CMessageBrokerController::sendNotification(Json::Value& message) { + DBG_MSG(("CMessageBroker::processNotification()\n")); + std::string methodName = message["method"].asString(); + DBG_MSG(("Property: %s\n", methodName.c_str())); + std::vector<WebsocketSession*> result; + int subscribersCount = getSubscribersFd(methodName, result); + if (0 < subscribersCount) { + std::vector<WebsocketSession*>::iterator it; + for (it = result.begin(); it != result.end(); it++) { + (*it)->sendJsonMessage(message); + } + } else { + DBG_MSG(("No subscribers for this property!\n")); + } + + } + + bool CMessageBrokerController::isResponse(Json::Value& message) { + DBG_MSG(("CMessageBroker::isResponse()\n")); + bool ret = false; + if ((true == message.isMember("result")) || (true == message.isMember("error"))) { + ret = true; + } + DBG_MSG(("Result: %d\n", ret)); + return ret; + } + + void CMessageBrokerController::sendResponse(Json::Value& message) { + WebsocketSession* ws; + std::map <std::string, WebsocketSession*>::iterator it; + sync_primitives::AutoLock request_lock(mRequestListLock); + + std::string id = message["id"].asString(); + it = mRequestList.find(id); + if (it != mRequestList.end()) { + ws = it->second; + ws->sendJsonMessage(message); + mRequestList.erase(it); + } + } + + void CMessageBrokerController::sendJsonMessage(Json::Value& message){ + + if(isNotification(message)) { + sendNotification(message); + return; + } else if (isResponse(message)) { + sendResponse(message); + return; + } + + //Send request + WebsocketSession* ws; + std::map <std::string, WebsocketSession*>::iterator it; + std::string method = message["method"].asString(); + std::string component_name = GetComponentName(method); + + sync_primitives::AutoLock lock(mControllersListLock); + it = mControllersList.find(component_name); + if (it != mControllersList.end()) { + ws = it->second; + ws->sendJsonMessage(message); + } + } + + void CMessageBrokerController::subscribeTo(std::string property){} + + void CMessageBrokerController::registerController(int id){} + + void CMessageBrokerController::unregisterController(){} + + void* CMessageBrokerController::MethodForReceiverThread(void * arg){ + return NULL; + } + + bool CMessageBrokerController::Connect(){ + return true; + } + + void CMessageBrokerController::exitReceivingThread(){ + sync_primitives::AutoLock lock(mControllersListLock); + shutdown_ = true; + std::map <std::string, WebsocketSession*>::iterator it; + for(it = mControllersList.begin(); it != mControllersList.end(); it++) { + if(!it->second->IsShuttingDown()) { + it->second->Shutdown(); + } + } + boost::system::error_code ec; + socket_.close(); + acceptor_.cancel(ec); + if(ec) { + std::cerr << "ErrorMessage Cancel: " << ": " << ec.message() << "\n"; + } + acceptor_.close(ec); + if(ec) { + std::cerr << "ErrorMessage Close: " << ": " << ec.message() << "\n"; + } + ioc_.stop(); + } + + std::string CMessageBrokerController::getMethodName(std::string& method) { + std::string return_string=""; + if(method != "") { + int position = method.find("."); + if(position != -1) { + return_string = method.substr(position + 1); + } + } + return return_string; + } + + std::string CMessageBrokerController::GetComponentName(std::string& method) { + std::string return_string=""; + if(method != "") { + int position = method.find("."); + if(position != -1) { + return_string = method.substr(0, position); + } + } + return return_string; + } + + bool CMessageBrokerController::addController(WebsocketSession* ws_session, std::string name){ + bool result = false; + std::map <std::string, WebsocketSession*>::iterator it; + + sync_primitives::AutoLock lock(mControllersListLock); + it = mControllersList.find(name); + if (it == mControllersList.end()) { + mControllersList.insert(std::map <std::string, WebsocketSession*>::value_type(name, ws_session)); + result = true; + } else { + DBG_MSG(("Controller already exists!\n")); + } + + DBG_MSG(("Count of controllers: %zu\n", mControllersList.size())); + return result; + } + + void CMessageBrokerController::deleteController(WebsocketSession* ws_session) { + { + sync_primitives::AutoLock lock(mControllersListLock); + std::map <std::string, WebsocketSession*>::iterator it; + for(it = mControllersList.begin(); it != mControllersList.end(); it++) { + if(it->second == ws_session) { + mControllersList.erase(it); + } + } + } + removeSubscribersBySession(ws_session); + } + + void CMessageBrokerController::deleteController(std::string name) { + std::map <std::string, WebsocketSession*>::iterator it; + WebsocketSession* ws; + { + sync_primitives::AutoLock lock(mControllersListLock); + it = mControllersList.find(name); + if (it != mControllersList.end()) + { + ws = it->second; + mControllersList.erase(it); + } else { + return; + } + } + removeSubscribersBySession(ws); + } + + void CMessageBrokerController::removeSubscribersBySession(const WebsocketSession* ws) { + sync_primitives::AutoLock lock(mSubscribersListLock); + std::multimap <std::string, WebsocketSession*>::iterator it_s = mSubscribersList.begin(); + for (; it_s !=mSubscribersList.end(); ) { + if (it_s->second == ws) { + mSubscribersList.erase(it_s++); + } else { + ++it_s; + } + } + } + + void CMessageBrokerController::pushRequest(Json::Value& message, WebsocketSession* ws_session) { + sync_primitives::AutoLock lock(mRequestListLock); + std::string id = message["id"].asString(); + mRequestList.insert(std::map <std::string, WebsocketSession*>::value_type(id, ws_session)); + } + + bool CMessageBrokerController::addSubscriber(WebsocketSession* ws_session, std::string name) { + bool result = true; + sync_primitives::AutoLock lock(mSubscribersListLock); + std::pair<std::multimap <std::string, WebsocketSession*>::iterator, std::multimap <std::string, WebsocketSession*>::iterator> p = mSubscribersList.equal_range(name); + if (p.first != p.second) + { + std::multimap <std::string, WebsocketSession*>::iterator itr; + for (itr = p.first; itr != p.second; itr++) + { + if (ws_session == itr->second) + { + result = false; + DBG_MSG(("Subscriber already exists!\n")); + } + } + } + if (result) + { + mSubscribersList.insert(std::map <std::string, WebsocketSession*>::value_type(name, ws_session)); + } + DBG_MSG(("Count of subscribers: %zu\n", mSubscribersList.size())); + return result; + } + + void CMessageBrokerController::deleteSubscriber(WebsocketSession* ws, std::string name) { + + sync_primitives::AutoLock lock(mSubscribersListLock); + std::pair<std::multimap <std::string, WebsocketSession*>::iterator, std::multimap <std::string, WebsocketSession*>::iterator> p = mSubscribersList.equal_range(name); + if (p.first != p.second) { + std::multimap <std::string, WebsocketSession*>::iterator itr; + for (itr = p.first; itr != p.second; ) { + if (ws == itr->second) { + mSubscribersList.erase(itr++); + } else { + ++itr; + } + } + } + + DBG_MSG(("Count of subscribers: %zu\n", mSubscribersList.size())); + } + + int CMessageBrokerController::getSubscribersFd(std::string name, std::vector<WebsocketSession*>& result) { + DBG_MSG(("CMessageBrokerRegistry::getSubscribersFd()\n")); + int res = 0; + std::map <std::string, WebsocketSession*>::iterator it; + + sync_primitives::AutoLock lock(mSubscribersListLock); + std::pair<std::multimap <std::string, WebsocketSession*>::iterator, std::multimap <std::string, WebsocketSession*>::iterator> p = mSubscribersList.equal_range(name); + if (p.first != p.second) + { + std::multimap <std::string, WebsocketSession*>::iterator itr; + for (itr = p.first; itr != p.second; itr++) + { + result.push_back(itr->second); + DBG_MSG(("Controllers Fd: %d\n", itr->second)); + } + } + + res = result.size(); + DBG_MSG(("Result vector size: %d\n", res)); + return res; + } + + void CMessageBrokerController::processInternalRequest(Json::Value& message, WebsocketSession* ws_session){ + std::string method = message["method"].asString(); + std::string methodName = getMethodName(method); + if (methodName == "registerComponent") { + Json::Value params = message["params"]; + if (params.isMember("componentName") && params["componentName"].isString()) { + std::string controllerName = params["componentName"].asString(); + if (addController(ws_session, controllerName)) { + Json::Value response; + response["id"] = message["id"]; + response["jsonrpc"] = "2.0"; + response["result"] = getNextControllerIdDiapason(); + ws_session->sendJsonMessage(response); + } else { + Json::Value error, err; + error["id"] = message["id"]; + error["jsonrpc"] = "2.0"; + err["code"] = CONTROLLER_EXISTS; + err["message"] = "Controller has been already registered."; + error["error"] = err; + ws_session->sendJsonMessage(error); + } + } else { + Json::Value error, err; + error["id"] = message["id"]; + error["jsonrpc"] = "2.0"; + err["code"] = INVALID_REQUEST; + err["message"] = "Wrong method parameter."; + error["error"] = err; + ws_session->sendJsonMessage(error); + } + } else if (methodName == "subscribeTo") { + Json::Value params = message["params"]; + if (params.isMember("propertyName") && params["propertyName"].isString()) { + std::string propertyName = params["propertyName"].asString(); + if (addSubscriber(ws_session, propertyName)) { + Json::Value response; + response["id"] = message["id"]; + response["jsonrpc"] = "2.0"; + response["result"] = "OK"; + ws_session->sendJsonMessage(response); + } else { + Json::Value error, err; + error["id"] = message["id"]; + error["jsonrpc"] = "2.0"; + err["code"] = CONTROLLER_EXISTS; + err["message"] = "Subscribe has been already registered."; + error["error"] = err; + ws_session->sendJsonMessage(error); + } + } else { + Json::Value error, err; + error["id"] = message["id"]; + error["jsonrpc"] = "2.0"; + err["code"] = INVALID_REQUEST; + err["message"] = "Wrong method parameter."; + error["error"] = err; + ws_session->sendJsonMessage(error); + } + + } else if (methodName == "unregisterComponent" ) { + Json::Value params = message["params"]; + if (params.isMember("componentName") && params["componentName"].isString()) { + std::string controllerName = params["componentName"].asString(); + deleteController(controllerName); + Json::Value response; + response["id"] = message["id"]; + response["jsonrpc"] = "2.0"; + response["result"] = "OK"; + ws_session->sendJsonMessage(response); + } else { + Json::Value error, err; + error["id"] = message["id"]; + error["jsonrpc"] = "2.0"; + err["code"] = INVALID_REQUEST; + err["message"] = "Wrong method parameter."; + error["error"] = err; + ws_session->sendJsonMessage(error); + } + } else if (methodName == "unsubscribeFrom") { + Json::Value params = message["params"]; + if (params.isMember("propertyName") && params["propertyName"].isString()) { + std::string propertyName = params["propertyName"].asString(); + deleteSubscriber(ws_session, propertyName); + Json::Value response; + response["id"] = message["id"]; + response["jsonrpc"] = "2.0"; + response["result"] = "OK"; + ws_session->sendJsonMessage(response); + } else { + Json::Value error, err; + error["id"] = message["id"]; + error["jsonrpc"] = "2.0"; + err["code"] = INVALID_REQUEST; + err["message"] = "Wrong method parameter."; + error["error"] = err; + ws_session->sendJsonMessage(error); + } + } else { + + } + } + + int CMessageBrokerController::getNextControllerIdDiapason() { + return 1000 * mControllersIdCounter++; + } + +} diff --git a/src/components/hmi_message_handler/src/messagebroker_adapter.cc b/src/components/hmi_message_handler/src/messagebroker_adapter.cc index ff5a6d687e..337b3c73ca 100644 --- a/src/components/hmi_message_handler/src/messagebroker_adapter.cc +++ b/src/components/hmi_message_handler/src/messagebroker_adapter.cc @@ -43,9 +43,9 @@ typedef NsMessageBroker::CMessageBrokerController MessageBrokerController; MessageBrokerAdapter::MessageBrokerAdapter(HMIMessageHandler* handler_param, const std::string& server_address, - uint16_t port) + uint16_t port, boost::asio::io_context& ioc) : HMIMessageAdapterImpl(handler_param) - , MessageBrokerController(server_address, port, "SDL") { + , MessageBrokerController(server_address, port, "SDL", 8, ioc) { LOG4CXX_TRACE(logger_, "Created MessageBrokerAdapter"); } @@ -153,6 +153,7 @@ void* MessageBrokerAdapter::SubscribeAndBeginReceiverThread(void* param) { void MessageBrokerAdapter::ProcessRecievedFromMB(Json::Value& root) { LOG4CXX_AUTO_TRACE(logger_); + LOG4CXX_INFO(logger_, "MB_Adapter: " << root); if (root.isNull()) { // LOG return; diff --git a/src/components/hmi_message_handler/src/websocket_session.cc b/src/components/hmi_message_handler/src/websocket_session.cc new file mode 100644 index 0000000000..3484278c1a --- /dev/null +++ b/src/components/hmi_message_handler/src/websocket_session.cc @@ -0,0 +1,334 @@ + +#include "hmi_message_handler/websocket_session.h" +#include "hmi_message_handler/mb_controller.h" +#include <unistd.h> +using namespace boost::beast::websocket; +namespace NsMessageBroker +{ + WebsocketSession::WebsocketSession(boost::asio::ip::tcp::socket socket, CMessageBrokerController* controller) : + ws_(std::move(socket)), + strand_(ws_.get_executor()), + controller_(controller), + stop(false), + m_receivingBuffer(""), + mControllersIdStart(-1), + mControllersIdCurrent(0), + shutdown_(false), + thread_delegate_(new LoopThreadDelegate(&message_queue_, this)), + thread_(threads::CreateThread("WS Async Send", thread_delegate_)){ + thread_->start(threads::ThreadOptions()); + } + + WebsocketSession::~WebsocketSession(){} + + void WebsocketSession::Accept() { + ws_.async_accept( + boost::asio::bind_executor( + strand_, + std::bind( + &WebsocketSession::Recv, + shared_from_this(), std::placeholders::_1))); + } + + void WebsocketSession::Close() { + ws_.async_close({}, [](boost::system::error_code){}); + } + + void WebsocketSession::Shutdown() { + shutdown_ = true; + thread_delegate_->SetShutdown(); + thread_->join(); + delete thread_delegate_; + threads::DeleteThread(thread_); + Close(); + + } + + bool WebsocketSession::IsShuttingDown() { + return shutdown_; + } + + void WebsocketSession::Recv(boost::system::error_code ec) { + if(shutdown_) { + return; + } + + if(ec){ + std::cerr << "ErrorMessage: " << ": " << ec.message() << "\n"; + shutdown_ = true; + thread_delegate_->SetShutdown(); + controller_->deleteController(this); + return; + + } + + ws_.async_read( + buffer_, + boost::asio::bind_executor( + strand_, + std::bind( + &WebsocketSession::Read, + shared_from_this(), + std::placeholders::_1, + std::placeholders::_2 + ) + ) + ); + } + + void WebsocketSession::Send(std::string& message, Json::Value& json_message) { + if (shutdown_) { + return; + } + std::shared_ptr<std::string> message_ptr = std::make_shared<std::string>( message ); + message_queue_.push(message_ptr); + } + + void WebsocketSession::sendJsonMessage(Json::Value& message) { + std::string str_msg = m_writer.write(message); + sync_primitives::AutoLock auto_lock(queue_lock_); + if (!isNotification(message) && !isResponse(message)) { + mWaitResponseQueue.insert(std::map<std::string, std::string>::value_type(message["id"].asString(), message["method"].asString())); + } + + Send(str_msg, message); + } + + void WebsocketSession::Read(boost::system::error_code ec, std::size_t bytes_transferred) { + boost::ignore_unused(bytes_transferred); + if(ec){ + std::cerr << "ErrorMessage: " << ": " << ec.message() << "\n"; + shutdown_ = true; + thread_delegate_->SetShutdown(); + controller_->deleteController(this); + buffer_.consume(buffer_.size()); + return; + } + + std::string data = boost::beast::buffers_to_string(buffer_.data()); + m_receivingBuffer += data; + + Json::Value root; + if (!m_reader.parse(m_receivingBuffer, root)) + { + std::cerr << "Invalid JSON Message" << ": " << data << "\n"; + return; + } + + std::string wmes = m_receiverWriter.write(root); + ssize_t beginpos = m_receivingBuffer.find(wmes); + if (-1 != beginpos) + { + m_receivingBuffer.erase(0, beginpos + wmes.length()); + DBG_MSG(("Buffer after cut is:%s\n", m_receivingBuffer.c_str())); + } else + { + m_receivingBuffer.clear(); + } + onMessageReceived(root); + + + buffer_.consume(buffer_.size()); + + Recv(ec); + + } + + std::string WebsocketSession::GetComponentName(std::string& method) { + std::string return_string=""; + if(method != "") { + int position = method.find("."); + if(position != -1) { + return_string = method.substr(0, position); + } + } + return return_string; + } + + void WebsocketSession::onMessageReceived(Json::Value message) { + // Determine message type and process... + Json::Value error; + if (checkMessage(message, error)) { + if (isNotification(message)) { + DBG_MSG(("Message is notification!\n")); + controller_->processNotification(message); + } else if (isResponse(message)) { + std::string id = message["id"].asString(); + std::string method = findMethodById(id); + DBG_MSG(("Message is response on: %s\n", method.c_str())); + if ("" != method) { + if ("MB.registerComponent" == method) { + if (message.isMember("result") && message["result"].isInt()) + { + mControllersIdStart = message["result"].asInt(); + } else + { + DBG_MSG_ERROR(("Not possible to initialize mControllersIdStart!\n")); + } + } else if ("MB.subscribeTo" == method || "MB.unregisterComponent" == method || "MB.unsubscribeFrom" == method) + { + //nothing to do for now + } else + { + controller_->processResponse(method, message); + } + } else { + DBG_MSG_ERROR(("Request with id %s has not been found!\n", id.c_str())); + } + } else { + DBG_MSG(("Message is request!\n")); + std::string method = message["method"].asString(); + std::string component_name = GetComponentName(method); + + if(component_name == "MB") { + controller_->processInternalRequest(message, this); + } else { + controller_->pushRequest(message, this); + controller_->processRequest(message); + } + } + } else { + DBG_MSG_ERROR(("Message contains wrong data!\n")); + sendJsonMessage(error); + } + } + + bool WebsocketSession::isNotification(Json::Value& root) { + DBG_MSG(("CMessageBrokerController::isNotification()\n")); + bool ret = false; + if (false == root.isMember("id")) + { + ret = true; + } + DBG_MSG(("Result: %d\n", ret)); + return ret; + } + + bool WebsocketSession::isResponse(Json::Value& root) { + DBG_MSG(("CMessageBrokerController::isResponse()\n")); + bool ret = false; + if ((true == root.isMember("result")) || (true == root.isMember("error"))) + { + ret = true; + } + DBG_MSG(("Result: %d\n", ret)); + return ret; + } + + std::string WebsocketSession::findMethodById(std::string id) { + DBG_MSG(("CMessageBrokerController::findMethodById()\n")); + sync_primitives::AutoLock auto_lock(queue_lock_); + std::string res = ""; + std::map <std::string, std::string>::iterator it; + it = mWaitResponseQueue.find(id); + if (it != mWaitResponseQueue.end()) + { + res = (*it).second; + mWaitResponseQueue.erase(it); + } + return res; + } + + bool WebsocketSession::checkMessage(Json::Value& root, Json::Value& error) + { + DBG_MSG(("CMessageBrokerController::checkMessage()\n")); + Json::Value err; + + try + { + /* check the JSON-RPC version => 2.0 */ + if (!root.isObject() || !root.isMember("jsonrpc") || root["jsonrpc"] != "2.0") + { + error["id"] = Json::Value::null; + error["jsonrpc"] = "2.0"; + err["code"] = NsMessageBroker::INVALID_REQUEST; + err["message"] = "Invalid MessageBroker request."; + error["error"] = err; + return false; + } + + if (root.isMember("id") && (root["id"].isArray() || root["id"].isObject())) + { + error["id"] = Json::Value::null; + error["jsonrpc"] = "2.0"; + err["code"] = NsMessageBroker::INVALID_REQUEST; + err["message"] = "Invalid MessageBroker request."; + error["error"] = err; + return false; + } + + if (root.isMember("result") && root.isMember("error")) + { + /* message can't contain simultaneously result and error*/ + return false; + } + + if (root.isMember("method")) + { + if (!root["method"].isString()) + { + error["id"] = Json::Value::null; + error["jsonrpc"] = "2.0"; + err["code"] = NsMessageBroker::INVALID_REQUEST; + err["message"] = "Invalid MessageBroker request."; + error["error"] = err; + return false; + } + /* Check the params is an object*/ + if (root.isMember("params") && !root["params"].isObject()) + { + error["id"] = Json::Value::null; + error["jsonrpc"] = "2.0"; + err["code"] = INVALID_REQUEST; + err["message"] = "Invalid JSONRPC params."; + error["error"] = err; + return false; + } + } else if (!root.isMember("result") && !root.isMember("error")) + { + return false; + } + return true; + } catch (...) + { + DBG_MSG_ERROR(("CMessageBrokerController::checkMessage() EXCEPTION has been caught!\n")); + return false; + } + } + + WebsocketSession::LoopThreadDelegate::LoopThreadDelegate( + MessageQueue<Message, AsyncQueue>* message_queue, + WebsocketSession* handler) + : message_queue_(*message_queue), + handler_(*handler), + shutdown_(false) { + } + + void WebsocketSession::LoopThreadDelegate::threadMain() { + while(!message_queue_.IsShuttingDown() && !shutdown_) { + DrainQueue(); + message_queue_.wait(); + } + DrainQueue(); + } + + void WebsocketSession::LoopThreadDelegate::exitThreadMain() { + shutdown_= true; + message_queue_.Shutdown(); + } + + void WebsocketSession::LoopThreadDelegate::DrainQueue() { + while(!message_queue_.empty()) { + Message message_ptr; + message_queue_.pop(message_ptr); + if(!shutdown_) { + handler_.ws_.write(boost::asio::buffer(*message_ptr)) ; + }; + } + } + + void WebsocketSession::LoopThreadDelegate::SetShutdown(){ + shutdown_ = true; + message_queue_.Shutdown(); + } +}
\ No newline at end of file diff --git a/src/components/hmi_message_handler/test/CMakeLists.txt b/src/components/hmi_message_handler/test/CMakeLists.txt index 0d7ccd9ee1..9d72f3d15e 100644 --- a/src/components/hmi_message_handler/test/CMakeLists.txt +++ b/src/components/hmi_message_handler/test/CMakeLists.txt @@ -38,10 +38,7 @@ include_directories ( ) if (HMIADAPTER STREQUAL "messagebroker") - set (BROKER_LIBRARIES - message_broker_client - message_broker_server - ) + add_dependencies("hmi_message_handler_test" Boost) endif() set(EXCLUDE_PATHS) diff --git a/src/components/hmi_message_handler/test/hmi_message_handler_impl_test.cc b/src/components/hmi_message_handler/test/hmi_message_handler_impl_test.cc index 33be4a9228..bf44c3e2d1 100644 --- a/src/components/hmi_message_handler/test/hmi_message_handler_impl_test.cc +++ b/src/components/hmi_message_handler/test/hmi_message_handler_impl_test.cc @@ -52,11 +52,12 @@ class HMIMessageHandlerImplTest : public ::testing::Test { : mb_adapter_(NULL) , hmi_handler_(NULL) , mock_hmi_message_observer_(NULL) {} - + boost::asio::io_context ioc_; protected: hmi_message_handler::MessageBrokerAdapter* mb_adapter_; hmi_message_handler::HMIMessageHandlerImpl* hmi_handler_; MockHMIMessageObserver* mock_hmi_message_observer_; + testing::NiceMock<MockHMIMessageHandlerSettings> mock_hmi_message_handler_settings; const uint64_t stack_size = 1000u; @@ -68,7 +69,7 @@ class HMIMessageHandlerImplTest : public ::testing::Test { mock_hmi_message_handler_settings); ASSERT_TRUE(NULL != hmi_handler_); mb_adapter_ = new hmi_message_handler::MessageBrokerAdapter( - hmi_handler_, "localhost", 22); + hmi_handler_, "127.0.0.1", 8087, ioc_); ASSERT_TRUE(NULL != mb_adapter_); mock_hmi_message_observer_ = new MockHMIMessageObserver(); ASSERT_TRUE(NULL != mock_hmi_message_observer_); @@ -124,7 +125,7 @@ TEST_F(HMIMessageHandlerImplTest, AddHMIMessageAdapter_AddExistedAdapter_ExpectAdded) { // Check before action EXPECT_TRUE(hmi_handler_->message_adapters().empty()); - // Act + // Act hmi_handler_->AddHMIMessageAdapter(mb_adapter_); // Check after action EXPECT_EQ(1u, hmi_handler_->message_adapters().size()); |