diff options
author | JackLivio <jack@livio.io> | 2018-02-02 17:07:26 -0500 |
---|---|---|
committer | JackLivio <jack@livio.io> | 2018-02-02 17:07:26 -0500 |
commit | 5b6515d1e63bab3e22d81cc4d2f6184026cfbbee (patch) | |
tree | 786aa7a7064d04075cbfb08378f55abc5782df93 | |
parent | eadcb0bff5b73c6010fef82c7bc34e3d5352e323 (diff) | |
download | sdl_core-5b6515d1e63bab3e22d81cc4d2f6184026cfbbee.tar.gz |
Fixes for PR comment
14 files changed, 932 insertions, 1009 deletions
diff --git a/src/3rd_party/CMakeLists.txt b/src/3rd_party/CMakeLists.txt index 81a9a038be..238a28feed 100644 --- a/src/3rd_party/CMakeLists.txt +++ b/src/3rd_party/CMakeLists.txt @@ -321,11 +321,12 @@ endif() if (HMIADAPTER STREQUAL "messagebroker") find_package(Boost 1.66.0 COMPONENTS system) - message(STATUS ${Boost_FOUND}) + set(BOOST_LIB_SOURCE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/boost_src) + set(BOOST_LIBS_DIRECTORY ${3RD_PARTY_INSTALL_PREFIX}/lib) + SET_PROPERTY(GLOBAL PROPERTY GLOBAL_BOOST_LIBS ${BOOST_LIBS_DIRECTORY}) + set(BOOST_INCLUDE_DIRECTORY ${3RD_PARTY_INSTALL_PREFIX}/include ) if (NOT ${Boost_FOUND}) - set(BOOST_LIB_SOURCE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/boost_src) - set(BOOST_LIBS_DIRECTORY ${3RD_PARTY_INSTALL_PREFIX}/lib) - set(BOOST_INCLUDE_DIRECTORY ${3RD_PARTY_INSTALL_PREFIX}/include ) + message(STATUS "Did not find boost. Downloading and installing boost 1.66") include(ExternalProject) ExternalProject_Add( Boost diff --git a/src/appMain/CMakeLists.txt b/src/appMain/CMakeLists.txt index 47bd286d9b..7cf1a6b01f 100644 --- a/src/appMain/CMakeLists.txt +++ b/src/appMain/CMakeLists.txt @@ -154,8 +154,6 @@ endif() target_link_libraries(${PROJECT} ${LIBRARIES}) - - add_dependencies(${PROJECT} Policy) file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/log4cxx.properties DESTINATION ${CMAKE_CURRENT_BINARY_DIR}) diff --git a/src/appMain/life_cycle.cc b/src/appMain/life_cycle.cc index a3a563244a..1863fd8654 100644 --- a/src/appMain/life_cycle.cc +++ b/src/appMain/life_cycle.cc @@ -75,10 +75,6 @@ LifeCycle::LifeCycle(const profile::Profile& profile) #endif // DBUS_HMIADAPTER #ifdef MESSAGEBROKER_HMIADAPTER , mb_adapter_(NULL) - , message_broker_(NULL) - , message_broker_server_(NULL) - , mb_thread_(NULL) - , mb_server_thread_(NULL) , mb_adapter_thread_(NULL) #endif // MESSAGEBROKER_HMIADAPTER , profile_(profile) { @@ -174,17 +170,17 @@ bool LifeCycle::StartComponents() { } #ifdef MESSAGEBROKER_HMIADAPTER -bool LifeCycle::InitMessageSystem(boost::asio::io_context& ioc) { - DCHECK(!message_broker_) +bool LifeCycle::InitMessageSystem() { mb_adapter_ = new hmi_message_handler::MessageBrokerAdapter( - hmi_handler_, profile_.server_address(), profile_.server_port(), ioc); + hmi_handler_, profile_.server_address(), profile_.server_port()); - if(!mb_adapter_->StartListener()) { + if (!mb_adapter_->StartListener()) { return false; } hmi_handler_->AddHMIMessageAdapter(mb_adapter_); - mb_adapter_thread_ = new std::thread(&hmi_message_handler::MessageBrokerAdapter::Run, mb_adapter_); + mb_adapter_thread_ = new std::thread( + &hmi_message_handler::MessageBrokerAdapter::Run, mb_adapter_); return true; } #endif // MESSAGEBROKER_HMIADAPTER diff --git a/src/appMain/life_cycle.h b/src/appMain/life_cycle.h index 9f31da827d..f6c556e50e 100644 --- a/src/appMain/life_cycle.h +++ b/src/appMain/life_cycle.h @@ -43,7 +43,6 @@ #endif // DBUS_HMIADAPTER #if (defined(MESSAGEBROKER_HMIADAPTER) || defined(PASA_HMI)) #include "hmi_message_handler/messagebroker_adapter.h" -//#include "hmi_message_handler/mb_controller.h" #endif // #if ( defined (MESSAGEBROKER_HMIADAPTER) || defined(PASA_HMI) ) #include "application_manager/application_manager_impl.h" #ifdef SDL_REMOTE_CONTROL @@ -59,11 +58,6 @@ #include "telemetry_monitor/telemetry_monitor.h" #endif -//#if ( defined (MESSAGEBROKER_HMIADAPTER) || defined(PASA_HMI) ) -#ifdef MESSAGEBROKER_HMIADAPTER - -#endif // MESSAGEBROKER_HMIADAPTER - #ifdef ENABLE_SECURITY namespace security_manager { class CryptoManager; @@ -81,14 +75,10 @@ class LifeCycle { * Initialize MessageBroker component * @return true if success otherwise false. */ - #ifdef MESSAGEBROKER_HMIADAPTER - bool InitMessageSystem(boost::asio::io_context& ioc); - #else bool InitMessageSystem(); - #endif /** - * \brief Main loop - */ + * \brief Main loop + */ void Run(); void StopComponents(); @@ -115,10 +105,6 @@ class LifeCycle { #ifdef MESSAGEBROKER_HMIADAPTER hmi_message_handler::MessageBrokerAdapter* mb_adapter_; - void* message_broker_; - void* message_broker_server_; - void* mb_thread_; - void* mb_server_thread_; std::thread* mb_adapter_thread_; #endif // MESSAGEBROKER_HMIADAPTER diff --git a/src/appMain/main.cc b/src/appMain/main.cc index 120848afde..feb5b5830e 100644 --- a/src/appMain/main.cc +++ b/src/appMain/main.cc @@ -68,8 +68,6 @@ const std::string kBrowserName = "chromium-browser"; const std::string kBrowserParams = "--auth-schemes=basic,digest,ntlm"; const std::string kLocalHostAddress = "127.0.0.1"; -boost::asio::io_context ioc; - #ifdef WEB_HMI /** * Initialize HTML based HMI. @@ -146,17 +144,14 @@ int32_t main(int32_t argc, char** argv) { // -------------------------------------------------------------------------- // Third-Party components initialization. - - if (!life_cycle.InitMessageSystem(ioc)) { + if (!life_cycle.InitMessageSystem()) { LOG4CXX_FATAL(logger_, "Failed to init message system"); life_cycle.StopComponents(); DEINIT_LOGGER(); _exit(EXIT_FAILURE); } LOG4CXX_INFO(logger_, "InitMessageBroker successful"); - //ioc.run(); - LOG4CXX_INFO(logger_, "IOC RUN"); if (profile_instance.launch_hmi()) { if (profile_instance.server_address() == kLocalHostAddress) { LOG4CXX_INFO(logger_, "Start HMI on localhost"); @@ -174,9 +169,8 @@ int32_t main(int32_t argc, char** argv) { life_cycle.Run(); LOG4CXX_INFO(logger_, "Stop SDL due to caught signal"); - + life_cycle.StopComponents(); - ioc.stop(); LOG4CXX_INFO(logger_, "Application has been stopped successfuly"); DEINIT_LOGGER(); diff --git a/src/components/hmi_message_handler/CMakeLists.txt b/src/components/hmi_message_handler/CMakeLists.txt index 4d799b92dd..13d5177046 100644 --- a/src/components/hmi_message_handler/CMakeLists.txt +++ b/src/components/hmi_message_handler/CMakeLists.txt @@ -54,11 +54,8 @@ include_directories ( set(PATHS ${CMAKE_CURRENT_SOURCE_DIR}/include ${CMAKE_CURRENT_SOURCE_DIR}/src - ) -message(${PATHS}) - if (HMIADAPTER STREQUAL "dbus") set(EXCLUDE_PATHS) set(DBUS_ADAPTER DBus) @@ -79,7 +76,6 @@ set(LIBRARIES ${RTLIB} ) - add_library("HMIMessageHandler" ${SOURCES}) if (HMIADAPTER STREQUAL "messagebroker") diff --git a/src/components/hmi_message_handler/include/hmi_message_handler/mb_controller.h b/src/components/hmi_message_handler/include/hmi_message_handler/mb_controller.h index 6ef5dc4e3e..98d5260259 100644 --- a/src/components/hmi_message_handler/include/hmi_message_handler/mb_controller.h +++ b/src/components/hmi_message_handler/include/hmi_message_handler/mb_controller.h @@ -1,5 +1,5 @@ -#ifndef MB_CONTROLLER_H -#define MB_CONTROLLER_H +#ifndef MB_CONTROLLER_H +#define MB_CONTROLLER_H #include <iostream> #include <boost/beast/core.hpp> @@ -20,146 +20,137 @@ #include <vector> #include <map> #include "json/json.h" - +#include "utils/macro.h" #include "utils/lock.h" #include "utils/atomic_object.h" +#include "websocket_session.h" using namespace boost::beast::websocket; -#ifdef DEBUG_ON -/** -* \def DBG_MSG -* \brief Debug message output with file name and line number. -* \param x formatted debug message. -* \return printf construction. -*/ -#define DBG_MSG(x) printf("%s:%d ", __FILE__, __LINE__);\ - printf x -#else -#define DBG_MSG(x) -#endif - -#define DBG_MSG_ERROR(x) printf("ERROR!!! %s:%d ", __FILE__, __LINE__);\ - printf x - - -namespace NsMessageBroker { - - enum ErrorCode { - CONTROLLER_EXISTS = -32000, - SUBSCRIBTION_EXISTS = -32001, - INVALID_REQUEST = -32600 - }; - - class WebsocketSession; - - class CMessageBrokerController : public std::enable_shared_from_this<CMessageBrokerController> { - public: - CMessageBrokerController(const std::string& address, uint16_t port, std::string name, int num_ports, boost::asio::io_context& ioc); +namespace hmi_message_handler { - ~CMessageBrokerController(); +CREATE_LOGGERPTR_GLOBAL(mb_logger_, "HMIMessageHandler") - bool StartListener(); +enum ErrorCode { + CONTROLLER_EXISTS = -32000, + SUBSCRIBTION_EXISTS = -32001, + INVALID_REQUEST = -32600 +}; - bool Run(); +class WebsocketSession; - void WaitForConnection(); +class CMessageBrokerController + : public std::enable_shared_from_this<CMessageBrokerController> { + public: + CMessageBrokerController(const std::string& address, + uint16_t port, + std::string name, + int num_ports); - void StartSession(boost::system::error_code ec); + ~CMessageBrokerController(); - void OnAccept(boost::system::error_code ec, boost::asio::strand<boost::asio::io_context::executor_type>& strand, stream<boost::asio::ip::tcp::socket>& ws); + bool StartListener(); - void OnRead(boost::system::error_code ec, std::size_t bytes_transferred); + bool Run(); - bool isNotification(Json::Value& message); + void WaitForConnection(); - void sendNotification(Json::Value& message); + void StartSession(boost::system::error_code ec); - bool isResponse(Json::Value& message); + void OnAccept( + boost::system::error_code ec, + boost::asio::strand<boost::asio::io_context::executor_type>& strand, + stream<boost::asio::ip::tcp::socket>& ws); - void sendResponse(Json::Value& message); + void OnRead(boost::system::error_code ec, std::size_t bytes_transferred); - void sendJsonMessage(Json::Value& message); + bool isNotification(Json::Value& message); - void subscribeTo(std::string property); + void sendNotification(Json::Value& message); - void registerController(int id = 0); + bool isResponse(Json::Value& message); - void unregisterController(); + void sendResponse(Json::Value& message); - void* MethodForReceiverThread(void * arg); + void sendJsonMessage(Json::Value& message); - bool Connect(); + void subscribeTo(std::string property); - void exitReceivingThread(); + void registerController(int id = 0); - virtual void processResponse(std::string method, Json::Value& root) = 0; + void unregisterController(); - virtual void processRequest(Json::Value& root) = 0; + void* MethodForReceiverThread(void* arg); - virtual void processNotification(Json::Value& root) = 0; + bool Connect(); - std::string getMethodName(std::string& method); + void exitReceivingThread(); - std::string GetComponentName(std::string& method); + virtual void processResponse(std::string method, Json::Value& root) = 0; - void processInternalRequest(Json::Value& message, WebsocketSession* ws_session ); + virtual void processRequest(Json::Value& root) = 0; - void pushRequest(Json::Value& message, WebsocketSession* ws_session); + virtual void processNotification(Json::Value& root) = 0; - //Registry - bool addController(WebsocketSession* ws_session, std::string name); + std::string getMethodName(std::string& method); - void deleteController(WebsocketSession* ws_session); + std::string GetComponentName(std::string& method); - void deleteController(std::string name); + void processInternalRequest(Json::Value& message, + WebsocketSession* ws_session); - void removeSubscribersBySession(const WebsocketSession* ws); + void pushRequest(Json::Value& message, WebsocketSession* ws_session); - bool addSubscriber(WebsocketSession* ws_session, std::string name); + // Registry + bool addController(WebsocketSession* ws_session, std::string name); - void deleteSubscriber(WebsocketSession* ws, std::string name); + void deleteController(WebsocketSession* ws_session); - int getSubscribersFd(std::string name, std::vector<WebsocketSession*>& result); + void deleteController(std::string name); - int getNextControllerIdDiapason(); + void removeSubscribersBySession(const WebsocketSession* ws); - private: - boost::asio::io_context ioc_; - const std::string& address_; - uint16_t port_; - std::string name_; - int num_threads_; - std::vector<std::thread> thread_vector_; - - boost::asio::ip::tcp::acceptor acceptor_; - boost::asio::ip::tcp::socket socket_; - boost::beast::multi_buffer buffer_; - boost::asio::ip::tcp::endpoint endpoint_; + bool addSubscriber(WebsocketSession* ws_session, std::string name); - int mControllersIdCounter; + void deleteSubscriber(WebsocketSession* ws, std::string name); - //Registry - std::vector <std::shared_ptr<NsMessageBroker::WebsocketSession>> mConnectionList; - sync_primitives::Lock mConnectionListLock; + int getSubscribersFd(std::string name, + std::vector<WebsocketSession*>& result); - std::map <std::string, WebsocketSession*> mControllersList; - sync_primitives::Lock mControllersListLock; + int getNextControllerId(); - std::multimap <std::string, WebsocketSession*> mSubscribersList; - sync_primitives::Lock mSubscribersListLock; + private: + boost::asio::io_context ioc_; + const std::string& address_; + uint16_t port_; + std::string name_; + int num_threads_; + std::vector<std::thread> thread_vector_; - std::map <std::string, WebsocketSession*> mRequestList; - sync_primitives::Lock mRequestListLock; + boost::asio::ip::tcp::acceptor acceptor_; + boost::asio::ip::tcp::socket socket_; + boost::beast::multi_buffer buffer_; + boost::asio::ip::tcp::endpoint endpoint_; - std::atomic_bool shutdown_; + int mControllersIdCounter; + // Registry + std::vector<std::shared_ptr<hmi_message_handler::WebsocketSession> > + mConnectionList; + sync_primitives::Lock mConnectionListLock; - }; + std::map<std::string, WebsocketSession*> mControllersList; + sync_primitives::Lock mControllersListLock; -} //NsMessageBroker + std::multimap<std::string, WebsocketSession*> mSubscribersList; + sync_primitives::Lock mSubscribersListLock; + std::map<std::string, WebsocketSession*> mRequestList; + sync_primitives::Lock mRequestListLock; + std::atomic_bool shutdown_; +}; +} // hmi_message_handler #endif /* MB_CONTROLLER_H */
\ No newline at end of file diff --git a/src/components/hmi_message_handler/include/hmi_message_handler/messagebroker_adapter.h b/src/components/hmi_message_handler/include/hmi_message_handler/messagebroker_adapter.h index 81ab823e1c..423f331297 100644 --- a/src/components/hmi_message_handler/include/hmi_message_handler/messagebroker_adapter.h +++ b/src/components/hmi_message_handler/include/hmi_message_handler/messagebroker_adapter.h @@ -41,14 +41,14 @@ namespace hmi_message_handler { -class MessageBrokerAdapter : public HMIMessageAdapterImpl, - public NsMessageBroker::CMessageBrokerController, - public threads::SingleThreadValidator { +class MessageBrokerAdapter + : public HMIMessageAdapterImpl, + public hmi_message_handler::CMessageBrokerController, + public threads::SingleThreadValidator { public: MessageBrokerAdapter(HMIMessageHandler* handler_param, const std::string& server_address, - uint16_t port, - boost::asio::io_context& ioc); + uint16_t port); ~MessageBrokerAdapter(); void SendMessageToHMI(MessageSharedPointer message); diff --git a/src/components/hmi_message_handler/include/hmi_message_handler/websocket_session.h b/src/components/hmi_message_handler/include/hmi_message_handler/websocket_session.h index 24d85dbaff..c084bd7769 100644 --- a/src/components/hmi_message_handler/include/hmi_message_handler/websocket_session.h +++ b/src/components/hmi_message_handler/include/hmi_message_handler/websocket_session.h @@ -1,5 +1,5 @@ -#ifndef WEBSOCKET_SESSION_H -#define WEBSOCKET_SESSION_H +#ifndef WEBSOCKET_SESSION_H +#define WEBSOCKET_SESSION_H #include <iostream> #include <boost/beast/core.hpp> @@ -21,162 +21,158 @@ #include <mutex> #include <queue> #include "json/json.h" - +#include "utils/macro.h" #include "utils/lock.h" #include "utils/atomic_object.h" #include "utils/threads/thread.h" #include "utils/threads/message_loop_thread.h" #include "utils/message_queue.h" -//#include "hmi_message_handler/mb_controller.h" - using namespace boost::beast::websocket; using ::utils::MessageQueue; -#ifdef DEBUG_ON +#ifdef DEBUG_ON /** * \def DBG_MSG * \brief Debug message output with file name and line number. * \param x formatted debug message. * \return printf construction. */ -#define DBG_MSG(x) printf("%s:%d ", __FILE__, __LINE__);\ - printf x +#define DBG_MSG(x) \ + printf("%s:%d ", __FILE__, __LINE__); \ + printf x #else #define DBG_MSG(x) #endif -#define DBG_MSG_ERROR(x) printf("ERROR!!! %s:%d ", __FILE__, __LINE__);\ - printf x +#define DBG_MSG_ERROR(x) \ + printf("ERROR!!! %s:%d ", __FILE__, __LINE__); \ + printf x -typedef std::queue<std::shared_ptr<std::string>> AsyncQueue; +typedef std::queue<std::shared_ptr<std::string> > AsyncQueue; typedef std::shared_ptr<std::string> Message; +namespace hmi_message_handler { -namespace NsMessageBroker { - - class CMessageBrokerController; +CREATE_LOGGERPTR_GLOBAL(ws_logger_, "HMIMessageHandler") - class WebsocketSession : - public std::enable_shared_from_this<WebsocketSession> { +class CMessageBrokerController; +class WebsocketSession : public std::enable_shared_from_this<WebsocketSession> { boost::beast::websocket::stream<boost::asio::ip::tcp::socket> ws_; boost::asio::strand<boost::asio::io_context::executor_type> strand_; boost::beast::multi_buffer buffer_; boost::beast::multi_buffer send_buffer_; - CMessageBrokerController* controller_; - - public: - WebsocketSession(boost::asio::ip::tcp::socket socket, CMessageBrokerController* controller); - - ~WebsocketSession(); + CMessageBrokerController* controller_; - void Accept(); + public: + WebsocketSession(boost::asio::ip::tcp::socket socket, + CMessageBrokerController* controller); - void Shutdown(); + ~WebsocketSession(); - bool IsShuttingDown(); + void Accept(); - void Recv(boost::system::error_code ec); + void Shutdown(); - void Send(std::string& message, Json::Value& json_message); + bool IsShuttingDown(); - void SendFromQueue(); + void Recv(boost::system::error_code ec); - void sendJsonMessage(Json::Value& message); + void Send(std::string& message, Json::Value& json_message); - void OnWrite(boost::system::error_code ec, std::size_t bytes_transferred, std::shared_ptr<std::string> message); + void SendFromQueue(); - void Read(boost::system::error_code ec, std::size_t bytes_transferred); + void sendJsonMessage(Json::Value& message); - int getNextMessageId(); + void OnWrite(boost::system::error_code ec, + std::size_t bytes_transferred, + std::shared_ptr<std::string> message); - void prepareMessage(Json::Value& root); + void Read(boost::system::error_code ec, std::size_t bytes_transferred); - void prepareErrorMessage(int errCode, std::string errMessage, Json::Value& error); + int getNextMessageId(); - std::string getDestinationComponentName(Json::Value& root); + void prepareMessage(Json::Value& root); - bool isNotification(Json::Value& root); + void prepareErrorMessage(int errCode, + std::string errMessage, + Json::Value& error); - bool isResponse(Json::Value& root); + std::string getDestinationComponentName(Json::Value& root); - std::string findMethodById(std::string id); + bool isNotification(Json::Value& root); - void registerController(int id = 0); + bool isResponse(Json::Value& root); - void unregisterController(); + std::string findMethodById(std::string id); - void subscribeTo(std::string property); + void registerController(int id = 0); - void unsubscribeFrom(std::string property); + void unregisterController(); - bool checkMessage(Json::Value& root, Json::Value& error); + void subscribeTo(std::string property); - std::string getControllersName(); + void unsubscribeFrom(std::string property); - std::string GetComponentName(std::string& method); + bool checkMessage(Json::Value& root, Json::Value& error); - protected: + std::string getControllersName(); - sync_primitives::atomic_bool stop; + std::string GetComponentName(std::string& method); - private: - void onMessageReceived(Json::Value message); + protected: + sync_primitives::atomic_bool stop; - std::map<std::string, std::string> mWaitResponseQueue; + private: + void onMessageReceived(Json::Value message); - std::string m_receivingBuffer; + std::map<std::string, std::string> mWaitResponseQueue; - int mControllersIdStart; + std::string m_receivingBuffer; - int mControllersIdCurrent; + int mControllersIdStart; - Json::Reader m_reader; - Json::FastWriter m_writer; - Json::FastWriter m_receiverWriter; - + int mControllersIdCurrent; - sync_primitives::Lock queue_lock_; - sync_primitives::Lock message_queue_lock_; - std::atomic_bool shutdown_; + Json::Reader m_reader; + Json::FastWriter m_writer; + Json::FastWriter m_receiverWriter; + sync_primitives::Lock queue_lock_; + sync_primitives::Lock message_queue_lock_; + std::atomic_bool shutdown_; - MessageQueue<Message, AsyncQueue> message_queue_; + MessageQueue<Message, AsyncQueue> message_queue_; - class LoopThreadDelegate : public threads::ThreadDelegate { - public: - LoopThreadDelegate(MessageQueue<Message, AsyncQueue>* message_queue, - WebsocketSession* handler); + class LoopThreadDelegate : public threads::ThreadDelegate { + public: + LoopThreadDelegate(MessageQueue<Message, AsyncQueue>* message_queue, + WebsocketSession* handler); - virtual void threadMain() OVERRIDE; - virtual void exitThreadMain() OVERRIDE; + virtual void threadMain() OVERRIDE; + virtual void exitThreadMain() OVERRIDE; - void OnWrite(); + void OnWrite(); - void SetShutdown(); + void SetShutdown(); - private: - void DrainQueue(); - MessageQueue<Message, AsyncQueue>& message_queue_; - WebsocketSession& handler_; - sync_primitives::Lock queue_lock_; - sync_primitives::ConditionalVariable queue_new_items_; - std::atomic_bool write_pending_; - std::atomic_bool shutdown_; - - sync_primitives::Lock write_lock_; - - }; - - LoopThreadDelegate* thread_delegate_; - threads::Thread* thread_; + private: + void DrainQueue(); + MessageQueue<Message, AsyncQueue>& message_queue_; + WebsocketSession& handler_; + sync_primitives::Lock queue_lock_; + sync_primitives::ConditionalVariable queue_new_items_; + std::atomic_bool write_pending_; + std::atomic_bool shutdown_; + sync_primitives::Lock write_lock_; }; -} //NsMessageBroker - - + LoopThreadDelegate* thread_delegate_; + threads::Thread* thread_; +}; +} // hmi_message_handler #endif /* WEBSOCKET_SESSION_H */
\ No newline at end of file diff --git a/src/components/hmi_message_handler/src/mb_controller.cc b/src/components/hmi_message_handler/src/mb_controller.cc index 871121fb4b..ec0d1f8bd8 100644 --- a/src/components/hmi_message_handler/src/mb_controller.cc +++ b/src/components/hmi_message_handler/src/mb_controller.cc @@ -1,479 +1,475 @@ #include "hmi_message_handler/mb_controller.h" -#include "websocket_session.cc" using namespace boost::beast::websocket; -namespace NsMessageBroker -{ - CMessageBrokerController::CMessageBrokerController( - const std::string& address, uint16_t port, std::string name, - int num_threads, boost::asio::io_context& ioc):address_(address), acceptor_(ioc_), socket_(ioc_), mControllersIdCounter(1) - { - - port_ = port; - name_ = name; - num_threads_ = num_threads; - endpoint_ = {boost::asio::ip::make_address(address), static_cast<unsigned short>(port)}; - shutdown_ = false; - - } - - CMessageBrokerController::~CMessageBrokerController() - { - boost::system::error_code ec; - socket_.close(); - acceptor_.close(ec); - if(ec) { - std::cerr << "ErrorMessage Close: " << ": " << ec.message() << "\n"; - } - shutdown_ = true; - ioc_.stop(); - } - - bool CMessageBrokerController::StartListener() { - boost::system::error_code error; - acceptor_.open(endpoint_.protocol(), error); - if(error) { - std::cerr << "ErrorOpen: " << ": " << error.message() << "\n"; - return false; - } - - acceptor_.set_option(boost::asio::socket_base::reuse_address(true), error); - if(error) { - std::cerr << "ErrorSetOption: " << ": " << error.message() << "\n"; - return false; - } - acceptor_.bind(endpoint_, error); - if(error) { - std::cerr << "ErrorBind: " << ": " << error.message() << "\n"; - return false; - } - acceptor_.listen(boost::asio::socket_base::max_listen_connections, error); - if(error) { - std::cerr << "ErrorListen: " << ": " << error.message() << "\n"; - return false; - } - return true; - } - - bool CMessageBrokerController::Run(){ - if(acceptor_.is_open() && !shutdown_) { - acceptor_.async_accept( - socket_, - std::bind( - &CMessageBrokerController::StartSession, - this, - std::placeholders::_1 - )); - ioc_.run(); - return true; +namespace hmi_message_handler { + +CMessageBrokerController::CMessageBrokerController(const std::string& address, + uint16_t port, + std::string name, + int num_threads) + : address_(address) + , acceptor_(ioc_) + , socket_(ioc_) + , mControllersIdCounter(1) { + port_ = port; + name_ = name; + num_threads_ = num_threads; + endpoint_ = {boost::asio::ip::make_address(address), + static_cast<unsigned short>(port)}; + shutdown_ = false; +} - } - return false; - } - - void CMessageBrokerController::WaitForConnection() { - if(acceptor_.is_open() && !shutdown_) { - acceptor_.async_accept( - socket_, - std::bind( - &CMessageBrokerController::StartSession, - this, - std::placeholders::_1 - ) - ); - } - } +CMessageBrokerController::~CMessageBrokerController() { + boost::system::error_code ec; + socket_.close(); + acceptor_.close(ec); + if (ec) { + std::string str_err = "ErrorMessage Close: " + ec.message(); + LOG4CXX_ERROR(mb_logger_, str_err); + } + shutdown_ = true; + ioc_.stop(); +} - void CMessageBrokerController::StartSession(boost::system::error_code ec){ - if(ec){ - std::cerr << "ErrorMessage: " << ": " << ec.message() << "\n"; - ioc_.stop(); - return; - } - if(shutdown_) { - return; - } - std::shared_ptr<WebsocketSession> ws_ptr = std::make_shared<WebsocketSession>(std::move(socket_), this); - ws_ptr->Accept(); - - mConnectionListLock.Acquire(); - mConnectionList.push_back(ws_ptr); - mConnectionListLock.Release(); - - WaitForConnection(); - - } - - - bool CMessageBrokerController::isNotification(Json::Value& message) { - DBG_MSG(("CMessageBroker::isNotification()\n")); - bool ret = false; - if (false == message.isMember("id")) { - ret = true; - } - DBG_MSG(("Result: %d\n", ret)); - return ret; - } - - void CMessageBrokerController::sendNotification(Json::Value& message) { - DBG_MSG(("CMessageBroker::processNotification()\n")); - std::string methodName = message["method"].asString(); - DBG_MSG(("Property: %s\n", methodName.c_str())); - std::vector<WebsocketSession*> result; - int subscribersCount = getSubscribersFd(methodName, result); - if (0 < subscribersCount) { - std::vector<WebsocketSession*>::iterator it; - for (it = result.begin(); it != result.end(); it++) { - (*it)->sendJsonMessage(message); - } - } else { - DBG_MSG(("No subscribers for this property!\n")); - } +bool CMessageBrokerController::StartListener() { + boost::system::error_code error; + acceptor_.open(endpoint_.protocol(), error); + if (error) { + std::string str_err = "ErrorOpen: " + error.message(); + LOG4CXX_ERROR(mb_logger_, str_err); + return false; + } + + acceptor_.set_option(boost::asio::socket_base::reuse_address(true), error); + if (error) { + std::string str_err = "ErrorSetOption: " + error.message(); + LOG4CXX_ERROR(mb_logger_, str_err); + return false; + } + acceptor_.bind(endpoint_, error); + if (error) { + std::string str_err = "ErrorBind: " + error.message(); + LOG4CXX_ERROR(mb_logger_, str_err); + return false; + } + acceptor_.listen(boost::asio::socket_base::max_listen_connections, error); + if (error) { + std::string str_err = "ErrorListen: " + error.message(); + LOG4CXX_ERROR(mb_logger_, str_err); + return false; + } + return true; +} - } +bool CMessageBrokerController::Run() { + if (acceptor_.is_open() && !shutdown_) { + acceptor_.async_accept(socket_, + std::bind(&CMessageBrokerController::StartSession, + this, + std::placeholders::_1)); + ioc_.run(); + return true; + } + return false; +} - bool CMessageBrokerController::isResponse(Json::Value& message) { - DBG_MSG(("CMessageBroker::isResponse()\n")); - bool ret = false; - if ((true == message.isMember("result")) || (true == message.isMember("error"))) { - ret = true; - } - DBG_MSG(("Result: %d\n", ret)); - return ret; - } - - void CMessageBrokerController::sendResponse(Json::Value& message) { - WebsocketSession* ws; - std::map <std::string, WebsocketSession*>::iterator it; - sync_primitives::AutoLock request_lock(mRequestListLock); - - std::string id = message["id"].asString(); - it = mRequestList.find(id); - if (it != mRequestList.end()) { - ws = it->second; - ws->sendJsonMessage(message); - mRequestList.erase(it); - } - } +void CMessageBrokerController::WaitForConnection() { + if (acceptor_.is_open() && !shutdown_) { + acceptor_.async_accept(socket_, + std::bind(&CMessageBrokerController::StartSession, + this, + std::placeholders::_1)); + } +} - void CMessageBrokerController::sendJsonMessage(Json::Value& message){ +void CMessageBrokerController::StartSession(boost::system::error_code ec) { + if (ec) { + std::string str_err = "ErrorMessage: " + ec.message(); + LOG4CXX_ERROR(mb_logger_, str_err); + ioc_.stop(); + return; + } + if (shutdown_) { + return; + } + std::shared_ptr<WebsocketSession> ws_ptr = + std::make_shared<WebsocketSession>(std::move(socket_), this); + ws_ptr->Accept(); + + mConnectionListLock.Acquire(); + mConnectionList.push_back(ws_ptr); + mConnectionListLock.Release(); + + WaitForConnection(); +} - if(isNotification(message)) { - sendNotification(message); - return; - } else if (isResponse(message)) { - sendResponse(message); - return; - } +bool CMessageBrokerController::isNotification(Json::Value& message) { + bool ret = false; + if (false == message.isMember("id")) { + ret = true; + } + return ret; +} - //Send request - WebsocketSession* ws; - std::map <std::string, WebsocketSession*>::iterator it; - std::string method = message["method"].asString(); - std::string component_name = GetComponentName(method); - - sync_primitives::AutoLock lock(mControllersListLock); - it = mControllersList.find(component_name); - if (it != mControllersList.end()) { - ws = it->second; - ws->sendJsonMessage(message); - } - } +void CMessageBrokerController::sendNotification(Json::Value& message) { + std::string methodName = message["method"].asString(); + std::vector<WebsocketSession*> result; + int subscribersCount = getSubscribersFd(methodName, result); + if (0 < subscribersCount) { + std::vector<WebsocketSession*>::iterator it; + for (it = result.begin(); it != result.end(); it++) { + (*it)->sendJsonMessage(message); + } + } else { + LOG4CXX_ERROR(mb_logger_, ("No subscribers for this property!\n")); + } +} + +bool CMessageBrokerController::isResponse(Json::Value& message) { + bool ret = false; + if ((true == message.isMember("result")) || + (true == message.isMember("error"))) { + ret = true; + } + return ret; +} - void CMessageBrokerController::subscribeTo(std::string property){} +void CMessageBrokerController::sendResponse(Json::Value& message) { + WebsocketSession* ws; + std::map<std::string, WebsocketSession*>::iterator it; + sync_primitives::AutoLock request_lock(mRequestListLock); + + std::string id = message["id"].asString(); + it = mRequestList.find(id); + if (it != mRequestList.end()) { + ws = it->second; + ws->sendJsonMessage(message); + mRequestList.erase(it); + } +} - void CMessageBrokerController::registerController(int id){} +void CMessageBrokerController::sendJsonMessage(Json::Value& message) { + if (isNotification(message)) { + sendNotification(message); + return; + } else if (isResponse(message)) { + sendResponse(message); + return; + } + + // Send request + WebsocketSession* ws; + std::map<std::string, WebsocketSession*>::iterator it; + std::string method = message["method"].asString(); + std::string component_name = GetComponentName(method); + + sync_primitives::AutoLock lock(mControllersListLock); + it = mControllersList.find(component_name); + if (it != mControllersList.end()) { + ws = it->second; + ws->sendJsonMessage(message); + } +} - void CMessageBrokerController::unregisterController(){} +void CMessageBrokerController::subscribeTo(std::string property) {} - void* CMessageBrokerController::MethodForReceiverThread(void * arg){ - return NULL; - } +void CMessageBrokerController::registerController(int id) {} - bool CMessageBrokerController::Connect(){ - return true; - } +void CMessageBrokerController::unregisterController() {} - void CMessageBrokerController::exitReceivingThread(){ - shutdown_ = true; - mConnectionListLock.Acquire(); - std::vector<std::shared_ptr<NsMessageBroker::WebsocketSession>>::iterator it; - for(it = mConnectionList.begin(); it != mConnectionList.end(); it++) { - (*it)->Shutdown(); - mConnectionList.erase(it); - } - mConnectionListLock.Release(); +void* CMessageBrokerController::MethodForReceiverThread(void* arg) { + return NULL; +} - boost::system::error_code ec; - socket_.close(); - acceptor_.cancel(ec); - if(ec) { - std::cerr << "ErrorMessage Cancel: " << ": " << ec.message() << "\n"; - } - acceptor_.close(ec); - if(ec) { - std::cerr << "ErrorMessage Close: " << ": " << ec.message() << "\n"; - } - ioc_.stop(); - } - - std::string CMessageBrokerController::getMethodName(std::string& method) { - std::string return_string=""; - if(method != "") { - int position = method.find("."); - if(position != -1) { - return_string = method.substr(position + 1); - } - } - return return_string; - } - - std::string CMessageBrokerController::GetComponentName(std::string& method) { - std::string return_string=""; - if(method != "") { - int position = method.find("."); - if(position != -1) { - return_string = method.substr(0, position); - } - } - return return_string; - } - - bool CMessageBrokerController::addController(WebsocketSession* ws_session, std::string name){ - bool result = false; - std::map <std::string, WebsocketSession*>::iterator it; - - sync_primitives::AutoLock lock(mControllersListLock); - it = mControllersList.find(name); - if (it == mControllersList.end()) { - mControllersList.insert(std::map <std::string, WebsocketSession*>::value_type(name, ws_session)); - result = true; +bool CMessageBrokerController::Connect() { + return true; +} + +void CMessageBrokerController::exitReceivingThread() { + shutdown_ = true; + mConnectionListLock.Acquire(); + std::vector<std::shared_ptr<hmi_message_handler::WebsocketSession> >::iterator + it; + for (it = mConnectionList.begin(); it != mConnectionList.end();) { + (*it)->Shutdown(); + mConnectionList.erase(it++); + } + mConnectionListLock.Release(); + + boost::system::error_code ec; + socket_.close(); + acceptor_.cancel(ec); + if (ec) { + std::string str_err = "ErrorMessage Cancel: " + ec.message(); + LOG4CXX_ERROR(mb_logger_, str_err); + } + acceptor_.close(ec); + if (ec) { + std::string str_err = "ErrorMessage Close: " + ec.message(); + } + ioc_.stop(); +} + +std::string CMessageBrokerController::getMethodName(std::string& method) { + std::string return_string = ""; + if (method != "") { + int position = method.find("."); + if (position != -1) { + return_string = method.substr(position + 1); + } + } + return return_string; +} + +std::string CMessageBrokerController::GetComponentName(std::string& method) { + std::string return_string = ""; + if (method != "") { + int position = method.find("."); + if (position != -1) { + return_string = method.substr(0, position); + } + } + return return_string; +} + +bool CMessageBrokerController::addController(WebsocketSession* ws_session, + std::string name) { + bool result = false; + std::map<std::string, WebsocketSession*>::iterator it; + + sync_primitives::AutoLock lock(mControllersListLock); + it = mControllersList.find(name); + if (it == mControllersList.end()) { + mControllersList.insert( + std::map<std::string, WebsocketSession*>::value_type(name, ws_session)); + result = true; + } else { + LOG4CXX_ERROR(mb_logger_, ("Controller already exists!\n")); + } + return result; +} + +void CMessageBrokerController::deleteController(WebsocketSession* ws_session) { + { + sync_primitives::AutoLock lock(mControllersListLock); + std::map<std::string, WebsocketSession*>::iterator it; + for (it = mControllersList.begin(); it != mControllersList.end();) { + if (it->second == ws_session) { + mControllersList.erase(it++); } else { - DBG_MSG(("Controller already exists!\n")); + it++; } + } + } + removeSubscribersBySession(ws_session); +} - DBG_MSG(("Count of controllers: %zu\n", mControllersList.size())); - return result; - } - - void CMessageBrokerController::deleteController(WebsocketSession* ws_session) { - { - sync_primitives::AutoLock lock(mControllersListLock); - std::map <std::string, WebsocketSession*>::iterator it; - for(it = mControllersList.begin(); it != mControllersList.end(); it++) { - if(it->second == ws_session) { - mControllersList.erase(it); - } - } - } - removeSubscribersBySession(ws_session); - } - - void CMessageBrokerController::deleteController(std::string name) { - std::map <std::string, WebsocketSession*>::iterator it; - WebsocketSession* ws; - { - sync_primitives::AutoLock lock(mControllersListLock); - it = mControllersList.find(name); - if (it != mControllersList.end()) - { - ws = it->second; - mControllersList.erase(it); - } else { - return; - } - } - removeSubscribersBySession(ws); - } - - void CMessageBrokerController::removeSubscribersBySession(const WebsocketSession* ws) { - sync_primitives::AutoLock lock(mSubscribersListLock); - std::multimap <std::string, WebsocketSession*>::iterator it_s = mSubscribersList.begin(); - for (; it_s !=mSubscribersList.end(); ) { - if (it_s->second == ws) { - mSubscribersList.erase(it_s++); - } else { - ++it_s; - } - } - } - - void CMessageBrokerController::pushRequest(Json::Value& message, WebsocketSession* ws_session) { - sync_primitives::AutoLock lock(mRequestListLock); - std::string id = message["id"].asString(); - mRequestList.insert(std::map <std::string, WebsocketSession*>::value_type(id, ws_session)); - } - - bool CMessageBrokerController::addSubscriber(WebsocketSession* ws_session, std::string name) { - bool result = true; - sync_primitives::AutoLock lock(mSubscribersListLock); - std::pair<std::multimap <std::string, WebsocketSession*>::iterator, std::multimap <std::string, WebsocketSession*>::iterator> p = mSubscribersList.equal_range(name); - if (p.first != p.second) - { - std::multimap <std::string, WebsocketSession*>::iterator itr; - for (itr = p.first; itr != p.second; itr++) - { - if (ws_session == itr->second) - { - result = false; - DBG_MSG(("Subscriber already exists!\n")); - } - } - } - if (result) - { - mSubscribersList.insert(std::map <std::string, WebsocketSession*>::value_type(name, ws_session)); - } - DBG_MSG(("Count of subscribers: %zu\n", mSubscribersList.size())); - return result; - } - - void CMessageBrokerController::deleteSubscriber(WebsocketSession* ws, std::string name) { - - sync_primitives::AutoLock lock(mSubscribersListLock); - std::pair<std::multimap <std::string, WebsocketSession*>::iterator, std::multimap <std::string, WebsocketSession*>::iterator> p = mSubscribersList.equal_range(name); - if (p.first != p.second) { - std::multimap <std::string, WebsocketSession*>::iterator itr; - for (itr = p.first; itr != p.second; ) { - if (ws == itr->second) { - mSubscribersList.erase(itr++); - } else { - ++itr; - } - } - } +void CMessageBrokerController::deleteController(std::string name) { + std::map<std::string, WebsocketSession*>::iterator it; + WebsocketSession* ws; + { + sync_primitives::AutoLock lock(mControllersListLock); + it = mControllersList.find(name); + if (it != mControllersList.end()) { + ws = it->second; + mControllersList.erase(it); + } else { + return; + } + } + removeSubscribersBySession(ws); +} + +void CMessageBrokerController::removeSubscribersBySession( + const WebsocketSession* ws) { + sync_primitives::AutoLock lock(mSubscribersListLock); + std::multimap<std::string, WebsocketSession*>::iterator it_s = + mSubscribersList.begin(); + for (; it_s != mSubscribersList.end();) { + if (it_s->second == ws) { + mSubscribersList.erase(it_s++); + } else { + ++it_s; + } + } +} + +void CMessageBrokerController::pushRequest(Json::Value& message, + WebsocketSession* ws_session) { + sync_primitives::AutoLock lock(mRequestListLock); + std::string id = message["id"].asString(); + mRequestList.insert( + std::map<std::string, WebsocketSession*>::value_type(id, ws_session)); +} - DBG_MSG(("Count of subscribers: %zu\n", mSubscribersList.size())); - } - - int CMessageBrokerController::getSubscribersFd(std::string name, std::vector<WebsocketSession*>& result) { - DBG_MSG(("CMessageBrokerRegistry::getSubscribersFd()\n")); - int res = 0; - std::map <std::string, WebsocketSession*>::iterator it; - - sync_primitives::AutoLock lock(mSubscribersListLock); - std::pair<std::multimap <std::string, WebsocketSession*>::iterator, std::multimap <std::string, WebsocketSession*>::iterator> p = mSubscribersList.equal_range(name); - if (p.first != p.second) - { - std::multimap <std::string, WebsocketSession*>::iterator itr; - for (itr = p.first; itr != p.second; itr++) - { - result.push_back(itr->second); - DBG_MSG(("Controllers Fd: %d\n", itr->second)); - } +bool CMessageBrokerController::addSubscriber(WebsocketSession* ws_session, + std::string name) { + bool result = true; + sync_primitives::AutoLock lock(mSubscribersListLock); + std::pair<std::multimap<std::string, WebsocketSession*>::iterator, + std::multimap<std::string, WebsocketSession*>::iterator> p = + mSubscribersList.equal_range(name); + if (p.first != p.second) { + std::multimap<std::string, WebsocketSession*>::iterator itr; + for (itr = p.first; itr != p.second; itr++) { + if (ws_session == itr->second) { + result = false; + LOG4CXX_ERROR(mb_logger_, ("Subscriber already exists!\n")); } + } + } + if (result) { + mSubscribersList.insert( + std::map<std::string, WebsocketSession*>::value_type(name, ws_session)); + } + return result; +} - res = result.size(); - DBG_MSG(("Result vector size: %d\n", res)); - return res; - } - - void CMessageBrokerController::processInternalRequest(Json::Value& message, WebsocketSession* ws_session){ - std::string method = message["method"].asString(); - std::string methodName = getMethodName(method); - if (methodName == "registerComponent") { - Json::Value params = message["params"]; - if (params.isMember("componentName") && params["componentName"].isString()) { - std::string controllerName = params["componentName"].asString(); - if (addController(ws_session, controllerName)) { - Json::Value response; - response["id"] = message["id"]; - response["jsonrpc"] = "2.0"; - response["result"] = getNextControllerIdDiapason(); - ws_session->sendJsonMessage(response); - } else { - Json::Value error, err; - error["id"] = message["id"]; - error["jsonrpc"] = "2.0"; - err["code"] = CONTROLLER_EXISTS; - err["message"] = "Controller has been already registered."; - error["error"] = err; - ws_session->sendJsonMessage(error); - } - } else { - Json::Value error, err; - error["id"] = message["id"]; - error["jsonrpc"] = "2.0"; - err["code"] = INVALID_REQUEST; - err["message"] = "Wrong method parameter."; - error["error"] = err; - ws_session->sendJsonMessage(error); - } - } else if (methodName == "subscribeTo") { - Json::Value params = message["params"]; - if (params.isMember("propertyName") && params["propertyName"].isString()) { - std::string propertyName = params["propertyName"].asString(); - if (addSubscriber(ws_session, propertyName)) { - Json::Value response; - response["id"] = message["id"]; - response["jsonrpc"] = "2.0"; - response["result"] = "OK"; - ws_session->sendJsonMessage(response); - } else { - Json::Value error, err; - error["id"] = message["id"]; - error["jsonrpc"] = "2.0"; - err["code"] = CONTROLLER_EXISTS; - err["message"] = "Subscribe has been already registered."; - error["error"] = err; - ws_session->sendJsonMessage(error); - } - } else { - Json::Value error, err; - error["id"] = message["id"]; - error["jsonrpc"] = "2.0"; - err["code"] = INVALID_REQUEST; - err["message"] = "Wrong method parameter."; - error["error"] = err; - ws_session->sendJsonMessage(error); - } - - } else if (methodName == "unregisterComponent" ) { - Json::Value params = message["params"]; - if (params.isMember("componentName") && params["componentName"].isString()) { - std::string controllerName = params["componentName"].asString(); - deleteController(controllerName); - Json::Value response; - response["id"] = message["id"]; - response["jsonrpc"] = "2.0"; - response["result"] = "OK"; - ws_session->sendJsonMessage(response); - } else { - Json::Value error, err; - error["id"] = message["id"]; - error["jsonrpc"] = "2.0"; - err["code"] = INVALID_REQUEST; - err["message"] = "Wrong method parameter."; - error["error"] = err; - ws_session->sendJsonMessage(error); - } - } else if (methodName == "unsubscribeFrom") { - Json::Value params = message["params"]; - if (params.isMember("propertyName") && params["propertyName"].isString()) { - std::string propertyName = params["propertyName"].asString(); - deleteSubscriber(ws_session, propertyName); - Json::Value response; - response["id"] = message["id"]; - response["jsonrpc"] = "2.0"; - response["result"] = "OK"; - ws_session->sendJsonMessage(response); - } else { - Json::Value error, err; - error["id"] = message["id"]; - error["jsonrpc"] = "2.0"; - err["code"] = INVALID_REQUEST; - err["message"] = "Wrong method parameter."; - error["error"] = err; - ws_session->sendJsonMessage(error); - } +void CMessageBrokerController::deleteSubscriber(WebsocketSession* ws, + std::string name) { + sync_primitives::AutoLock lock(mSubscribersListLock); + std::pair<std::multimap<std::string, WebsocketSession*>::iterator, + std::multimap<std::string, WebsocketSession*>::iterator> p = + mSubscribersList.equal_range(name); + if (p.first != p.second) { + std::multimap<std::string, WebsocketSession*>::iterator itr; + for (itr = p.first; itr != p.second;) { + if (ws == itr->second) { + mSubscribersList.erase(itr++); } else { - + ++itr; } - } + } + } +} - int CMessageBrokerController::getNextControllerIdDiapason() { - return 1000 * mControllersIdCounter++; - } +int CMessageBrokerController::getSubscribersFd( + std::string name, std::vector<WebsocketSession*>& result) { + int res = 0; + std::map<std::string, WebsocketSession*>::iterator it; + + sync_primitives::AutoLock lock(mSubscribersListLock); + std::pair<std::multimap<std::string, WebsocketSession*>::iterator, + std::multimap<std::string, WebsocketSession*>::iterator> p = + mSubscribersList.equal_range(name); + if (p.first != p.second) { + std::multimap<std::string, WebsocketSession*>::iterator itr; + for (itr = p.first; itr != p.second; itr++) { + result.push_back(itr->second); + } + } + + res = result.size(); + return res; +} + +void CMessageBrokerController::processInternalRequest( + Json::Value& message, WebsocketSession* ws_session) { + std::string method = message["method"].asString(); + std::string methodName = getMethodName(method); + if (methodName == "registerComponent") { + Json::Value params = message["params"]; + if (params.isMember("componentName") && + params["componentName"].isString()) { + std::string controllerName = params["componentName"].asString(); + if (addController(ws_session, controllerName)) { + Json::Value response; + response["id"] = message["id"]; + response["jsonrpc"] = "2.0"; + response["result"] = getNextControllerId(); + ws_session->sendJsonMessage(response); + } else { + Json::Value error, err; + error["id"] = message["id"]; + error["jsonrpc"] = "2.0"; + err["code"] = CONTROLLER_EXISTS; + err["message"] = "Controller has been already registered."; + error["error"] = err; + ws_session->sendJsonMessage(error); + } + } else { + Json::Value error, err; + error["id"] = message["id"]; + error["jsonrpc"] = "2.0"; + err["code"] = INVALID_REQUEST; + err["message"] = "Wrong method parameter."; + error["error"] = err; + ws_session->sendJsonMessage(error); + } + } else if (methodName == "subscribeTo") { + Json::Value params = message["params"]; + if (params.isMember("propertyName") && params["propertyName"].isString()) { + std::string propertyName = params["propertyName"].asString(); + if (addSubscriber(ws_session, propertyName)) { + Json::Value response; + response["id"] = message["id"]; + response["jsonrpc"] = "2.0"; + response["result"] = "OK"; + ws_session->sendJsonMessage(response); + } else { + Json::Value error, err; + error["id"] = message["id"]; + error["jsonrpc"] = "2.0"; + err["code"] = CONTROLLER_EXISTS; + err["message"] = "Subscribe has been already registered."; + error["error"] = err; + ws_session->sendJsonMessage(error); + } + } else { + Json::Value error, err; + error["id"] = message["id"]; + error["jsonrpc"] = "2.0"; + err["code"] = INVALID_REQUEST; + err["message"] = "Wrong method parameter."; + error["error"] = err; + ws_session->sendJsonMessage(error); + } + + } else if (methodName == "unregisterComponent") { + Json::Value params = message["params"]; + if (params.isMember("componentName") && + params["componentName"].isString()) { + std::string controllerName = params["componentName"].asString(); + deleteController(controllerName); + Json::Value response; + response["id"] = message["id"]; + response["jsonrpc"] = "2.0"; + response["result"] = "OK"; + ws_session->sendJsonMessage(response); + } else { + Json::Value error, err; + error["id"] = message["id"]; + error["jsonrpc"] = "2.0"; + err["code"] = INVALID_REQUEST; + err["message"] = "Wrong method parameter."; + error["error"] = err; + ws_session->sendJsonMessage(error); + } + } else if (methodName == "unsubscribeFrom") { + Json::Value params = message["params"]; + if (params.isMember("propertyName") && params["propertyName"].isString()) { + std::string propertyName = params["propertyName"].asString(); + deleteSubscriber(ws_session, propertyName); + Json::Value response; + response["id"] = message["id"]; + response["jsonrpc"] = "2.0"; + response["result"] = "OK"; + ws_session->sendJsonMessage(response); + } else { + Json::Value error, err; + error["id"] = message["id"]; + error["jsonrpc"] = "2.0"; + err["code"] = INVALID_REQUEST; + err["message"] = "Wrong method parameter."; + error["error"] = err; + ws_session->sendJsonMessage(error); + } + } else { + } +} +int CMessageBrokerController::getNextControllerId() { + return 1000 * mControllersIdCounter++; +} } diff --git a/src/components/hmi_message_handler/src/messagebroker_adapter.cc b/src/components/hmi_message_handler/src/messagebroker_adapter.cc index 337b3c73ca..53a0d84362 100644 --- a/src/components/hmi_message_handler/src/messagebroker_adapter.cc +++ b/src/components/hmi_message_handler/src/messagebroker_adapter.cc @@ -39,13 +39,13 @@ namespace hmi_message_handler { CREATE_LOGGERPTR_GLOBAL(logger_, "HMIMessageHandler") -typedef NsMessageBroker::CMessageBrokerController MessageBrokerController; +typedef hmi_message_handler::CMessageBrokerController MessageBrokerController; MessageBrokerAdapter::MessageBrokerAdapter(HMIMessageHandler* handler_param, const std::string& server_address, - uint16_t port, boost::asio::io_context& ioc) + uint16_t port) : HMIMessageAdapterImpl(handler_param) - , MessageBrokerController(server_address, port, "SDL", 8, ioc) { + , MessageBrokerController(server_address, port, "SDL", 8) { LOG4CXX_TRACE(logger_, "Created MessageBrokerAdapter"); } diff --git a/src/components/hmi_message_handler/src/websocket_session.cc b/src/components/hmi_message_handler/src/websocket_session.cc index 84c786fac3..2e9ec1890b 100644 --- a/src/components/hmi_message_handler/src/websocket_session.cc +++ b/src/components/hmi_message_handler/src/websocket_session.cc @@ -1,332 +1,300 @@ - #include "hmi_message_handler/websocket_session.h" #include "hmi_message_handler/mb_controller.h" #include <unistd.h> using namespace boost::beast::websocket; -namespace NsMessageBroker -{ - WebsocketSession::WebsocketSession(boost::asio::ip::tcp::socket socket, CMessageBrokerController* controller) : - ws_(std::move(socket)), - strand_(ws_.get_executor()), - controller_(controller), - stop(false), - m_receivingBuffer(""), - mControllersIdStart(-1), - mControllersIdCurrent(0), - shutdown_(false), - thread_delegate_(new LoopThreadDelegate(&message_queue_, this)), - thread_(threads::CreateThread("WS Async Send", thread_delegate_)){ - thread_->start(threads::ThreadOptions()); - } - - WebsocketSession::~WebsocketSession(){} - - void WebsocketSession::Accept() { - ws_.async_accept( - boost::asio::bind_executor( - strand_, - std::bind( - &WebsocketSession::Recv, - shared_from_this(), std::placeholders::_1))); - } - - void WebsocketSession::Shutdown() { - shutdown_ = true; - thread_delegate_->SetShutdown(); - thread_->join(); - delete thread_delegate_; - threads::DeleteThread(thread_); - } - - bool WebsocketSession::IsShuttingDown() { - return shutdown_; - } - - void WebsocketSession::Recv(boost::system::error_code ec) { - if(shutdown_) { - return; - } - - if(ec){ - std::cerr << "ErrorMessage: " << ": " << ec.message() << "\n"; - shutdown_ = true; - thread_delegate_->SetShutdown(); - controller_->deleteController(this); - return; - - } - - ws_.async_read( - buffer_, - boost::asio::bind_executor( - strand_, - std::bind( - &WebsocketSession::Read, - shared_from_this(), - std::placeholders::_1, - std::placeholders::_2 - ) - ) - ); - } - - void WebsocketSession::Send(std::string& message, Json::Value& json_message) { - if (shutdown_) { - return; - } - std::shared_ptr<std::string> message_ptr = std::make_shared<std::string>( message ); - message_queue_.push(message_ptr); - } - - void WebsocketSession::sendJsonMessage(Json::Value& message) { - std::string str_msg = m_writer.write(message); - sync_primitives::AutoLock auto_lock(queue_lock_); - if (!isNotification(message) && !isResponse(message)) { - mWaitResponseQueue.insert(std::map<std::string, std::string>::value_type(message["id"].asString(), message["method"].asString())); - } - - Send(str_msg, message); - } - - void WebsocketSession::Read(boost::system::error_code ec, std::size_t bytes_transferred) { - boost::ignore_unused(bytes_transferred); - if(ec){ - std::cerr << "ErrorMessage: " << ": " << ec.message() << "\n"; - shutdown_ = true; - thread_delegate_->SetShutdown(); - controller_->deleteController(this); - buffer_.consume(buffer_.size()); - return; - } - - std::string data = boost::beast::buffers_to_string(buffer_.data()); - m_receivingBuffer += data; - - Json::Value root; - if (!m_reader.parse(m_receivingBuffer, root)) - { - std::cerr << "Invalid JSON Message" << ": " << data << "\n"; - return; - } - - std::string wmes = m_receiverWriter.write(root); - ssize_t beginpos = m_receivingBuffer.find(wmes); - if (-1 != beginpos) - { - m_receivingBuffer.erase(0, beginpos + wmes.length()); - DBG_MSG(("Buffer after cut is:%s\n", m_receivingBuffer.c_str())); - } else - { - m_receivingBuffer.clear(); - } - onMessageReceived(root); - - - buffer_.consume(buffer_.size()); - - Recv(ec); - - } - - std::string WebsocketSession::GetComponentName(std::string& method) { - std::string return_string=""; - if(method != "") { - int position = method.find("."); - if(position != -1) { - return_string = method.substr(0, position); - } - } - return return_string; - } - - void WebsocketSession::onMessageReceived(Json::Value message) { - // Determine message type and process... - Json::Value error; - if (checkMessage(message, error)) { - if (isNotification(message)) { - DBG_MSG(("Message is notification!\n")); - controller_->processNotification(message); - } else if (isResponse(message)) { - std::string id = message["id"].asString(); - std::string method = findMethodById(id); - DBG_MSG(("Message is response on: %s\n", method.c_str())); - if ("" != method) { - if ("MB.registerComponent" == method) { - if (message.isMember("result") && message["result"].isInt()) - { - mControllersIdStart = message["result"].asInt(); - } else - { - DBG_MSG_ERROR(("Not possible to initialize mControllersIdStart!\n")); - } - } else if ("MB.subscribeTo" == method || "MB.unregisterComponent" == method || "MB.unsubscribeFrom" == method) - { - //nothing to do for now - } else - { - controller_->processResponse(method, message); - } - } else { - DBG_MSG_ERROR(("Request with id %s has not been found!\n", id.c_str())); - } - } else { - DBG_MSG(("Message is request!\n")); - std::string method = message["method"].asString(); - std::string component_name = GetComponentName(method); - - if(component_name == "MB") { - controller_->processInternalRequest(message, this); - } else { - controller_->pushRequest(message, this); - controller_->processRequest(message); - } - } +namespace hmi_message_handler { + +WebsocketSession::WebsocketSession(boost::asio::ip::tcp::socket socket, + CMessageBrokerController* controller) + : ws_(std::move(socket)) + , strand_(ws_.get_executor()) + , controller_(controller) + , stop(false) + , m_receivingBuffer("") + , mControllersIdStart(-1) + , mControllersIdCurrent(0) + , shutdown_(false) + , thread_delegate_(new LoopThreadDelegate(&message_queue_, this)) + , thread_(threads::CreateThread("WS Async Send", thread_delegate_)) { + thread_->start(threads::ThreadOptions()); +} + +WebsocketSession::~WebsocketSession() {} + +void WebsocketSession::Accept() { + ws_.async_accept(boost::asio::bind_executor( + strand_, + std::bind( + &WebsocketSession::Recv, shared_from_this(), std::placeholders::_1))); +} + +void WebsocketSession::Shutdown() { + shutdown_ = true; + thread_delegate_->SetShutdown(); + thread_->join(); + delete thread_delegate_; + threads::DeleteThread(thread_); +} + +bool WebsocketSession::IsShuttingDown() { + return shutdown_; +} + +void WebsocketSession::Recv(boost::system::error_code ec) { + if (shutdown_) { + return; + } + + if (ec) { + std::string str_err = "ErrorMessage: " + ec.message(); + LOG4CXX_ERROR(ws_logger_, str_err); + shutdown_ = true; + thread_delegate_->SetShutdown(); + controller_->deleteController(this); + return; + } + + ws_.async_read(buffer_, + boost::asio::bind_executor(strand_, + std::bind(&WebsocketSession::Read, + shared_from_this(), + std::placeholders::_1, + std::placeholders::_2))); +} + +void WebsocketSession::Send(std::string& message, Json::Value& json_message) { + if (shutdown_) { + return; + } + std::shared_ptr<std::string> message_ptr = + std::make_shared<std::string>(message); + message_queue_.push(message_ptr); +} + +void WebsocketSession::sendJsonMessage(Json::Value& message) { + std::string str_msg = m_writer.write(message); + sync_primitives::AutoLock auto_lock(queue_lock_); + if (!isNotification(message) && !isResponse(message)) { + mWaitResponseQueue.insert(std::map<std::string, std::string>::value_type( + message["id"].asString(), message["method"].asString())); + } + + Send(str_msg, message); +} + +void WebsocketSession::Read(boost::system::error_code ec, + std::size_t bytes_transferred) { + boost::ignore_unused(bytes_transferred); + if (ec) { + std::string str_err = "ErrorMessage: " + ec.message(); + LOG4CXX_ERROR(ws_logger_, str_err); + shutdown_ = true; + thread_delegate_->SetShutdown(); + controller_->deleteController(this); + buffer_.consume(buffer_.size()); + return; + } + + std::string data = boost::beast::buffers_to_string(buffer_.data()); + m_receivingBuffer += data; + + Json::Value root; + if (!m_reader.parse(m_receivingBuffer, root)) { + std::string str_err = "Invalid JSON Message: " + data; + LOG4CXX_ERROR(ws_logger_, str_err); + return; + } + + std::string wmes = m_receiverWriter.write(root); + ssize_t beginpos = m_receivingBuffer.find(wmes); + if (-1 != beginpos) { + m_receivingBuffer.erase(0, beginpos + wmes.length()); + } else { + m_receivingBuffer.clear(); + } + onMessageReceived(root); + + buffer_.consume(buffer_.size()); + + Recv(ec); +} + +std::string WebsocketSession::GetComponentName(std::string& method) { + std::string return_string = ""; + if (method != "") { + int position = method.find("."); + if (position != -1) { + return_string = method.substr(0, position); + } + } + return return_string; +} + +void WebsocketSession::onMessageReceived(Json::Value message) { + // Determine message type and process... + Json::Value error; + if (checkMessage(message, error)) { + if (isNotification(message)) { + controller_->processNotification(message); + } else if (isResponse(message)) { + std::string id = message["id"].asString(); + std::string method = findMethodById(id); + if ("" != method) { + if ("MB.registerComponent" == method) { + if (message.isMember("result") && message["result"].isInt()) { + mControllersIdStart = message["result"].asInt(); + } else { + LOG4CXX_ERROR(ws_logger_, + "Not possible to initialize mControllersIdStart!"); + } + } else if ("MB.subscribeTo" == method || + "MB.unregisterComponent" == method || + "MB.unsubscribeFrom" == method) { + // nothing to do for now + } else { + controller_->processResponse(method, message); + } } else { - DBG_MSG_ERROR(("Message contains wrong data!\n")); - sendJsonMessage(error); - } - } - - bool WebsocketSession::isNotification(Json::Value& root) { - DBG_MSG(("CMessageBrokerController::isNotification()\n")); - bool ret = false; - if (false == root.isMember("id")) - { - ret = true; + LOG4CXX_ERROR(ws_logger_, + "Request with id: " + id + " has not been found!"); } - DBG_MSG(("Result: %d\n", ret)); - return ret; - } - - bool WebsocketSession::isResponse(Json::Value& root) { - DBG_MSG(("CMessageBrokerController::isResponse()\n")); - bool ret = false; - if ((true == root.isMember("result")) || (true == root.isMember("error"))) - { - ret = true; - } - DBG_MSG(("Result: %d\n", ret)); - return ret; - } - - std::string WebsocketSession::findMethodById(std::string id) { - DBG_MSG(("CMessageBrokerController::findMethodById()\n")); - sync_primitives::AutoLock auto_lock(queue_lock_); - std::string res = ""; - std::map <std::string, std::string>::iterator it; - it = mWaitResponseQueue.find(id); - if (it != mWaitResponseQueue.end()) - { - res = (*it).second; - mWaitResponseQueue.erase(it); - } - return res; - } - - bool WebsocketSession::checkMessage(Json::Value& root, Json::Value& error) - { - DBG_MSG(("CMessageBrokerController::checkMessage()\n")); - Json::Value err; - - try - { - /* check the JSON-RPC version => 2.0 */ - if (!root.isObject() || !root.isMember("jsonrpc") || root["jsonrpc"] != "2.0") - { - error["id"] = Json::Value::null; - error["jsonrpc"] = "2.0"; - err["code"] = NsMessageBroker::INVALID_REQUEST; - err["message"] = "Invalid MessageBroker request."; - error["error"] = err; - return false; - } - - if (root.isMember("id") && (root["id"].isArray() || root["id"].isObject())) - { - error["id"] = Json::Value::null; - error["jsonrpc"] = "2.0"; - err["code"] = NsMessageBroker::INVALID_REQUEST; - err["message"] = "Invalid MessageBroker request."; - error["error"] = err; - return false; - } - - if (root.isMember("result") && root.isMember("error")) - { - /* message can't contain simultaneously result and error*/ - return false; - } - - if (root.isMember("method")) - { - if (!root["method"].isString()) - { - error["id"] = Json::Value::null; - error["jsonrpc"] = "2.0"; - err["code"] = NsMessageBroker::INVALID_REQUEST; - err["message"] = "Invalid MessageBroker request."; - error["error"] = err; - return false; - } - /* Check the params is an object*/ - if (root.isMember("params") && !root["params"].isObject()) - { - error["id"] = Json::Value::null; - error["jsonrpc"] = "2.0"; - err["code"] = INVALID_REQUEST; - err["message"] = "Invalid JSONRPC params."; - error["error"] = err; - return false; - } - } else if (!root.isMember("result") && !root.isMember("error")) - { - return false; - } - return true; - } catch (...) - { - DBG_MSG_ERROR(("CMessageBrokerController::checkMessage() EXCEPTION has been caught!\n")); - return false; - } - } - - WebsocketSession::LoopThreadDelegate::LoopThreadDelegate( - MessageQueue<Message, AsyncQueue>* message_queue, - WebsocketSession* handler) - : message_queue_(*message_queue), - handler_(*handler), - shutdown_(false) { - } - - void WebsocketSession::LoopThreadDelegate::threadMain() { - while(!message_queue_.IsShuttingDown() && !shutdown_) { - DrainQueue(); - message_queue_.wait(); - } - DrainQueue(); - } + } else { + std::string method = message["method"].asString(); + std::string component_name = GetComponentName(method); - void WebsocketSession::LoopThreadDelegate::exitThreadMain() { - shutdown_= true; - if(!message_queue_.IsShuttingDown()) { - message_queue_.Shutdown(); + if (component_name == "MB") { + controller_->processInternalRequest(message, this); + } else { + controller_->pushRequest(message, this); + controller_->processRequest(message); } - } - - void WebsocketSession::LoopThreadDelegate::DrainQueue() { - while(!message_queue_.empty()) { - Message message_ptr; - message_queue_.pop(message_ptr); - if(!shutdown_) { - handler_.ws_.write(boost::asio::buffer(*message_ptr)) ; - }; + } + } else { + LOG4CXX_ERROR(ws_logger_, "Message contains wrong data!\n"); + sendJsonMessage(error); + } +} + +bool WebsocketSession::isNotification(Json::Value& root) { + bool ret = false; + if (false == root.isMember("id")) { + ret = true; + } + return ret; +} + +bool WebsocketSession::isResponse(Json::Value& root) { + bool ret = false; + if ((true == root.isMember("result")) || (true == root.isMember("error"))) { + ret = true; + } + return ret; +} + +std::string WebsocketSession::findMethodById(std::string id) { + sync_primitives::AutoLock auto_lock(queue_lock_); + std::string res = ""; + std::map<std::string, std::string>::iterator it; + it = mWaitResponseQueue.find(id); + if (it != mWaitResponseQueue.end()) { + res = (*it).second; + mWaitResponseQueue.erase(it); + } + return res; +} + +bool WebsocketSession::checkMessage(Json::Value& root, Json::Value& error) { + Json::Value err; + try { + /* check the JSON-RPC version => 2.0 */ + if (!root.isObject() || !root.isMember("jsonrpc") || + root["jsonrpc"] != "2.0") { + error["id"] = Json::Value::null; + error["jsonrpc"] = "2.0"; + err["code"] = hmi_message_handler::INVALID_REQUEST; + err["message"] = "Invalid MessageBroker request."; + error["error"] = err; + return false; + } + + if (root.isMember("id") && + (root["id"].isArray() || root["id"].isObject())) { + error["id"] = Json::Value::null; + error["jsonrpc"] = "2.0"; + err["code"] = hmi_message_handler::INVALID_REQUEST; + err["message"] = "Invalid MessageBroker request."; + error["error"] = err; + return false; + } + + if (root.isMember("result") && root.isMember("error")) { + /* message can't contain simultaneously result and error*/ + return false; + } + + if (root.isMember("method")) { + if (!root["method"].isString()) { + error["id"] = Json::Value::null; + error["jsonrpc"] = "2.0"; + err["code"] = hmi_message_handler::INVALID_REQUEST; + err["message"] = "Invalid MessageBroker request."; + error["error"] = err; + return false; } - } - - void WebsocketSession::LoopThreadDelegate::SetShutdown(){ - shutdown_ = true; - if(!message_queue_.IsShuttingDown()) { - message_queue_.Shutdown(); + /* Check the params is an object*/ + if (root.isMember("params") && !root["params"].isObject()) { + error["id"] = Json::Value::null; + error["jsonrpc"] = "2.0"; + err["code"] = INVALID_REQUEST; + err["message"] = "Invalid JSONRPC params."; + error["error"] = err; + return false; } - } + } else if (!root.isMember("result") && !root.isMember("error")) { + return false; + } + return true; + } catch (...) { + LOG4CXX_ERROR( + ws_logger_, + "CMessageBrokerController::checkMessage() EXCEPTION has been caught!"); + return false; + } +} + +WebsocketSession::LoopThreadDelegate::LoopThreadDelegate( + MessageQueue<Message, AsyncQueue>* message_queue, WebsocketSession* handler) + : message_queue_(*message_queue), handler_(*handler), shutdown_(false) {} + +void WebsocketSession::LoopThreadDelegate::threadMain() { + while (!message_queue_.IsShuttingDown() && !shutdown_) { + DrainQueue(); + message_queue_.wait(); + } + DrainQueue(); +} + +void WebsocketSession::LoopThreadDelegate::exitThreadMain() { + shutdown_ = true; + if (!message_queue_.IsShuttingDown()) { + message_queue_.Shutdown(); + } +} + +void WebsocketSession::LoopThreadDelegate::DrainQueue() { + while (!message_queue_.empty()) { + Message message_ptr; + message_queue_.pop(message_ptr); + if (!shutdown_) { + handler_.ws_.write(boost::asio::buffer(*message_ptr)); + }; + } +} + +void WebsocketSession::LoopThreadDelegate::SetShutdown() { + shutdown_ = true; + if (!message_queue_.IsShuttingDown()) { + message_queue_.Shutdown(); + } +} }
\ No newline at end of file diff --git a/src/components/hmi_message_handler/test/CMakeLists.txt b/src/components/hmi_message_handler/test/CMakeLists.txt index 9d72f3d15e..6d4041e0bf 100644 --- a/src/components/hmi_message_handler/test/CMakeLists.txt +++ b/src/components/hmi_message_handler/test/CMakeLists.txt @@ -37,10 +37,6 @@ include_directories ( ${COMPONENTS_DIR}/hmi_message_handler/test/include ) -if (HMIADAPTER STREQUAL "messagebroker") - add_dependencies("hmi_message_handler_test" Boost) -endif() - set(EXCLUDE_PATHS) set(LIBRARIES @@ -61,4 +57,9 @@ endif() collect_sources(SOURCES "${CMAKE_CURRENT_SOURCE_DIR}" "${EXCLUDE_PATHS}") +if (HMIADAPTER STREQUAL "messagebroker") + GET_PROPERTY(BOOST_LIBS_DIRECTORY GLOBAL PROPERTY GLOBAL_BOOST_LIBS) + list(APPEND LIBRARIES ${BOOST_LIBS_DIRECTORY}/libboost_system.so) +endif() + create_test(hmi_message_handler_test "${SOURCES}" "${LIBRARIES}") diff --git a/src/components/hmi_message_handler/test/hmi_message_handler_impl_test.cc b/src/components/hmi_message_handler/test/hmi_message_handler_impl_test.cc index bf44c3e2d1..fd459ea094 100644 --- a/src/components/hmi_message_handler/test/hmi_message_handler_impl_test.cc +++ b/src/components/hmi_message_handler/test/hmi_message_handler_impl_test.cc @@ -52,12 +52,12 @@ class HMIMessageHandlerImplTest : public ::testing::Test { : mb_adapter_(NULL) , hmi_handler_(NULL) , mock_hmi_message_observer_(NULL) {} - boost::asio::io_context ioc_; + protected: hmi_message_handler::MessageBrokerAdapter* mb_adapter_; hmi_message_handler::HMIMessageHandlerImpl* hmi_handler_; MockHMIMessageObserver* mock_hmi_message_observer_; - + testing::NiceMock<MockHMIMessageHandlerSettings> mock_hmi_message_handler_settings; const uint64_t stack_size = 1000u; @@ -69,7 +69,7 @@ class HMIMessageHandlerImplTest : public ::testing::Test { mock_hmi_message_handler_settings); ASSERT_TRUE(NULL != hmi_handler_); mb_adapter_ = new hmi_message_handler::MessageBrokerAdapter( - hmi_handler_, "127.0.0.1", 8087, ioc_); + hmi_handler_, "127.0.0.1", 8087); ASSERT_TRUE(NULL != mb_adapter_); mock_hmi_message_observer_ = new MockHMIMessageObserver(); ASSERT_TRUE(NULL != mock_hmi_message_observer_); @@ -125,7 +125,7 @@ TEST_F(HMIMessageHandlerImplTest, AddHMIMessageAdapter_AddExistedAdapter_ExpectAdded) { // Check before action EXPECT_TRUE(hmi_handler_->message_adapters().empty()); - // Act + // Act hmi_handler_->AddHMIMessageAdapter(mb_adapter_); // Check after action EXPECT_EQ(1u, hmi_handler_->message_adapters().size()); |