summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJackLivio <jack@livio.io>2018-02-02 17:07:26 -0500
committerJackLivio <jack@livio.io>2018-02-02 17:07:26 -0500
commit5b6515d1e63bab3e22d81cc4d2f6184026cfbbee (patch)
tree786aa7a7064d04075cbfb08378f55abc5782df93
parenteadcb0bff5b73c6010fef82c7bc34e3d5352e323 (diff)
downloadsdl_core-5b6515d1e63bab3e22d81cc4d2f6184026cfbbee.tar.gz
Fixes for PR comment
-rw-r--r--src/3rd_party/CMakeLists.txt9
-rw-r--r--src/appMain/CMakeLists.txt2
-rw-r--r--src/appMain/life_cycle.cc14
-rw-r--r--src/appMain/life_cycle.h18
-rw-r--r--src/appMain/main.cc10
-rw-r--r--src/components/hmi_message_handler/CMakeLists.txt4
-rw-r--r--src/components/hmi_message_handler/include/hmi_message_handler/mb_controller.h175
-rw-r--r--src/components/hmi_message_handler/include/hmi_message_handler/messagebroker_adapter.h10
-rw-r--r--src/components/hmi_message_handler/include/hmi_message_handler/websocket_session.h170
-rw-r--r--src/components/hmi_message_handler/src/mb_controller.cc896
-rw-r--r--src/components/hmi_message_handler/src/messagebroker_adapter.cc6
-rw-r--r--src/components/hmi_message_handler/src/websocket_session.cc610
-rw-r--r--src/components/hmi_message_handler/test/CMakeLists.txt9
-rw-r--r--src/components/hmi_message_handler/test/hmi_message_handler_impl_test.cc8
14 files changed, 932 insertions, 1009 deletions
diff --git a/src/3rd_party/CMakeLists.txt b/src/3rd_party/CMakeLists.txt
index 81a9a038be..238a28feed 100644
--- a/src/3rd_party/CMakeLists.txt
+++ b/src/3rd_party/CMakeLists.txt
@@ -321,11 +321,12 @@ endif()
if (HMIADAPTER STREQUAL "messagebroker")
find_package(Boost 1.66.0 COMPONENTS system)
- message(STATUS ${Boost_FOUND})
+ set(BOOST_LIB_SOURCE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/boost_src)
+ set(BOOST_LIBS_DIRECTORY ${3RD_PARTY_INSTALL_PREFIX}/lib)
+ SET_PROPERTY(GLOBAL PROPERTY GLOBAL_BOOST_LIBS ${BOOST_LIBS_DIRECTORY})
+ set(BOOST_INCLUDE_DIRECTORY ${3RD_PARTY_INSTALL_PREFIX}/include )
if (NOT ${Boost_FOUND})
- set(BOOST_LIB_SOURCE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/boost_src)
- set(BOOST_LIBS_DIRECTORY ${3RD_PARTY_INSTALL_PREFIX}/lib)
- set(BOOST_INCLUDE_DIRECTORY ${3RD_PARTY_INSTALL_PREFIX}/include )
+ message(STATUS "Did not find boost. Downloading and installing boost 1.66")
include(ExternalProject)
ExternalProject_Add(
Boost
diff --git a/src/appMain/CMakeLists.txt b/src/appMain/CMakeLists.txt
index 47bd286d9b..7cf1a6b01f 100644
--- a/src/appMain/CMakeLists.txt
+++ b/src/appMain/CMakeLists.txt
@@ -154,8 +154,6 @@ endif()
target_link_libraries(${PROJECT} ${LIBRARIES})
-
-
add_dependencies(${PROJECT} Policy)
file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/log4cxx.properties DESTINATION ${CMAKE_CURRENT_BINARY_DIR})
diff --git a/src/appMain/life_cycle.cc b/src/appMain/life_cycle.cc
index a3a563244a..1863fd8654 100644
--- a/src/appMain/life_cycle.cc
+++ b/src/appMain/life_cycle.cc
@@ -75,10 +75,6 @@ LifeCycle::LifeCycle(const profile::Profile& profile)
#endif // DBUS_HMIADAPTER
#ifdef MESSAGEBROKER_HMIADAPTER
, mb_adapter_(NULL)
- , message_broker_(NULL)
- , message_broker_server_(NULL)
- , mb_thread_(NULL)
- , mb_server_thread_(NULL)
, mb_adapter_thread_(NULL)
#endif // MESSAGEBROKER_HMIADAPTER
, profile_(profile) {
@@ -174,17 +170,17 @@ bool LifeCycle::StartComponents() {
}
#ifdef MESSAGEBROKER_HMIADAPTER
-bool LifeCycle::InitMessageSystem(boost::asio::io_context& ioc) {
- DCHECK(!message_broker_)
+bool LifeCycle::InitMessageSystem() {
mb_adapter_ = new hmi_message_handler::MessageBrokerAdapter(
- hmi_handler_, profile_.server_address(), profile_.server_port(), ioc);
+ hmi_handler_, profile_.server_address(), profile_.server_port());
- if(!mb_adapter_->StartListener()) {
+ if (!mb_adapter_->StartListener()) {
return false;
}
hmi_handler_->AddHMIMessageAdapter(mb_adapter_);
- mb_adapter_thread_ = new std::thread(&hmi_message_handler::MessageBrokerAdapter::Run, mb_adapter_);
+ mb_adapter_thread_ = new std::thread(
+ &hmi_message_handler::MessageBrokerAdapter::Run, mb_adapter_);
return true;
}
#endif // MESSAGEBROKER_HMIADAPTER
diff --git a/src/appMain/life_cycle.h b/src/appMain/life_cycle.h
index 9f31da827d..f6c556e50e 100644
--- a/src/appMain/life_cycle.h
+++ b/src/appMain/life_cycle.h
@@ -43,7 +43,6 @@
#endif // DBUS_HMIADAPTER
#if (defined(MESSAGEBROKER_HMIADAPTER) || defined(PASA_HMI))
#include "hmi_message_handler/messagebroker_adapter.h"
-//#include "hmi_message_handler/mb_controller.h"
#endif // #if ( defined (MESSAGEBROKER_HMIADAPTER) || defined(PASA_HMI) )
#include "application_manager/application_manager_impl.h"
#ifdef SDL_REMOTE_CONTROL
@@ -59,11 +58,6 @@
#include "telemetry_monitor/telemetry_monitor.h"
#endif
-//#if ( defined (MESSAGEBROKER_HMIADAPTER) || defined(PASA_HMI) )
-#ifdef MESSAGEBROKER_HMIADAPTER
-
-#endif // MESSAGEBROKER_HMIADAPTER
-
#ifdef ENABLE_SECURITY
namespace security_manager {
class CryptoManager;
@@ -81,14 +75,10 @@ class LifeCycle {
* Initialize MessageBroker component
* @return true if success otherwise false.
*/
- #ifdef MESSAGEBROKER_HMIADAPTER
- bool InitMessageSystem(boost::asio::io_context& ioc);
- #else
bool InitMessageSystem();
- #endif
/**
- * \brief Main loop
- */
+ * \brief Main loop
+ */
void Run();
void StopComponents();
@@ -115,10 +105,6 @@ class LifeCycle {
#ifdef MESSAGEBROKER_HMIADAPTER
hmi_message_handler::MessageBrokerAdapter* mb_adapter_;
- void* message_broker_;
- void* message_broker_server_;
- void* mb_thread_;
- void* mb_server_thread_;
std::thread* mb_adapter_thread_;
#endif // MESSAGEBROKER_HMIADAPTER
diff --git a/src/appMain/main.cc b/src/appMain/main.cc
index 120848afde..feb5b5830e 100644
--- a/src/appMain/main.cc
+++ b/src/appMain/main.cc
@@ -68,8 +68,6 @@ const std::string kBrowserName = "chromium-browser";
const std::string kBrowserParams = "--auth-schemes=basic,digest,ntlm";
const std::string kLocalHostAddress = "127.0.0.1";
-boost::asio::io_context ioc;
-
#ifdef WEB_HMI
/**
* Initialize HTML based HMI.
@@ -146,17 +144,14 @@ int32_t main(int32_t argc, char** argv) {
// --------------------------------------------------------------------------
// Third-Party components initialization.
-
- if (!life_cycle.InitMessageSystem(ioc)) {
+ if (!life_cycle.InitMessageSystem()) {
LOG4CXX_FATAL(logger_, "Failed to init message system");
life_cycle.StopComponents();
DEINIT_LOGGER();
_exit(EXIT_FAILURE);
}
LOG4CXX_INFO(logger_, "InitMessageBroker successful");
- //ioc.run();
- LOG4CXX_INFO(logger_, "IOC RUN");
if (profile_instance.launch_hmi()) {
if (profile_instance.server_address() == kLocalHostAddress) {
LOG4CXX_INFO(logger_, "Start HMI on localhost");
@@ -174,9 +169,8 @@ int32_t main(int32_t argc, char** argv) {
life_cycle.Run();
LOG4CXX_INFO(logger_, "Stop SDL due to caught signal");
-
+
life_cycle.StopComponents();
- ioc.stop();
LOG4CXX_INFO(logger_, "Application has been stopped successfuly");
DEINIT_LOGGER();
diff --git a/src/components/hmi_message_handler/CMakeLists.txt b/src/components/hmi_message_handler/CMakeLists.txt
index 4d799b92dd..13d5177046 100644
--- a/src/components/hmi_message_handler/CMakeLists.txt
+++ b/src/components/hmi_message_handler/CMakeLists.txt
@@ -54,11 +54,8 @@ include_directories (
set(PATHS
${CMAKE_CURRENT_SOURCE_DIR}/include
${CMAKE_CURRENT_SOURCE_DIR}/src
-
)
-message(${PATHS})
-
if (HMIADAPTER STREQUAL "dbus")
set(EXCLUDE_PATHS)
set(DBUS_ADAPTER DBus)
@@ -79,7 +76,6 @@ set(LIBRARIES
${RTLIB}
)
-
add_library("HMIMessageHandler" ${SOURCES})
if (HMIADAPTER STREQUAL "messagebroker")
diff --git a/src/components/hmi_message_handler/include/hmi_message_handler/mb_controller.h b/src/components/hmi_message_handler/include/hmi_message_handler/mb_controller.h
index 6ef5dc4e3e..98d5260259 100644
--- a/src/components/hmi_message_handler/include/hmi_message_handler/mb_controller.h
+++ b/src/components/hmi_message_handler/include/hmi_message_handler/mb_controller.h
@@ -1,5 +1,5 @@
-#ifndef MB_CONTROLLER_H
-#define MB_CONTROLLER_H
+#ifndef MB_CONTROLLER_H
+#define MB_CONTROLLER_H
#include <iostream>
#include <boost/beast/core.hpp>
@@ -20,146 +20,137 @@
#include <vector>
#include <map>
#include "json/json.h"
-
+#include "utils/macro.h"
#include "utils/lock.h"
#include "utils/atomic_object.h"
+#include "websocket_session.h"
using namespace boost::beast::websocket;
-#ifdef DEBUG_ON
-/**
-* \def DBG_MSG
-* \brief Debug message output with file name and line number.
-* \param x formatted debug message.
-* \return printf construction.
-*/
-#define DBG_MSG(x) printf("%s:%d ", __FILE__, __LINE__);\
- printf x
-#else
-#define DBG_MSG(x)
-#endif
-
-#define DBG_MSG_ERROR(x) printf("ERROR!!! %s:%d ", __FILE__, __LINE__);\
- printf x
-
-
-namespace NsMessageBroker {
-
- enum ErrorCode {
- CONTROLLER_EXISTS = -32000,
- SUBSCRIBTION_EXISTS = -32001,
- INVALID_REQUEST = -32600
- };
-
- class WebsocketSession;
-
- class CMessageBrokerController : public std::enable_shared_from_this<CMessageBrokerController> {
- public:
- CMessageBrokerController(const std::string& address, uint16_t port, std::string name, int num_ports, boost::asio::io_context& ioc);
+namespace hmi_message_handler {
- ~CMessageBrokerController();
+CREATE_LOGGERPTR_GLOBAL(mb_logger_, "HMIMessageHandler")
- bool StartListener();
+enum ErrorCode {
+ CONTROLLER_EXISTS = -32000,
+ SUBSCRIBTION_EXISTS = -32001,
+ INVALID_REQUEST = -32600
+};
- bool Run();
+class WebsocketSession;
- void WaitForConnection();
+class CMessageBrokerController
+ : public std::enable_shared_from_this<CMessageBrokerController> {
+ public:
+ CMessageBrokerController(const std::string& address,
+ uint16_t port,
+ std::string name,
+ int num_ports);
- void StartSession(boost::system::error_code ec);
+ ~CMessageBrokerController();
- void OnAccept(boost::system::error_code ec, boost::asio::strand<boost::asio::io_context::executor_type>& strand, stream<boost::asio::ip::tcp::socket>& ws);
+ bool StartListener();
- void OnRead(boost::system::error_code ec, std::size_t bytes_transferred);
+ bool Run();
- bool isNotification(Json::Value& message);
+ void WaitForConnection();
- void sendNotification(Json::Value& message);
+ void StartSession(boost::system::error_code ec);
- bool isResponse(Json::Value& message);
+ void OnAccept(
+ boost::system::error_code ec,
+ boost::asio::strand<boost::asio::io_context::executor_type>& strand,
+ stream<boost::asio::ip::tcp::socket>& ws);
- void sendResponse(Json::Value& message);
+ void OnRead(boost::system::error_code ec, std::size_t bytes_transferred);
- void sendJsonMessage(Json::Value& message);
+ bool isNotification(Json::Value& message);
- void subscribeTo(std::string property);
+ void sendNotification(Json::Value& message);
- void registerController(int id = 0);
+ bool isResponse(Json::Value& message);
- void unregisterController();
+ void sendResponse(Json::Value& message);
- void* MethodForReceiverThread(void * arg);
+ void sendJsonMessage(Json::Value& message);
- bool Connect();
+ void subscribeTo(std::string property);
- void exitReceivingThread();
+ void registerController(int id = 0);
- virtual void processResponse(std::string method, Json::Value& root) = 0;
+ void unregisterController();
- virtual void processRequest(Json::Value& root) = 0;
+ void* MethodForReceiverThread(void* arg);
- virtual void processNotification(Json::Value& root) = 0;
+ bool Connect();
- std::string getMethodName(std::string& method);
+ void exitReceivingThread();
- std::string GetComponentName(std::string& method);
+ virtual void processResponse(std::string method, Json::Value& root) = 0;
- void processInternalRequest(Json::Value& message, WebsocketSession* ws_session );
+ virtual void processRequest(Json::Value& root) = 0;
- void pushRequest(Json::Value& message, WebsocketSession* ws_session);
+ virtual void processNotification(Json::Value& root) = 0;
- //Registry
- bool addController(WebsocketSession* ws_session, std::string name);
+ std::string getMethodName(std::string& method);
- void deleteController(WebsocketSession* ws_session);
+ std::string GetComponentName(std::string& method);
- void deleteController(std::string name);
+ void processInternalRequest(Json::Value& message,
+ WebsocketSession* ws_session);
- void removeSubscribersBySession(const WebsocketSession* ws);
+ void pushRequest(Json::Value& message, WebsocketSession* ws_session);
- bool addSubscriber(WebsocketSession* ws_session, std::string name);
+ // Registry
+ bool addController(WebsocketSession* ws_session, std::string name);
- void deleteSubscriber(WebsocketSession* ws, std::string name);
+ void deleteController(WebsocketSession* ws_session);
- int getSubscribersFd(std::string name, std::vector<WebsocketSession*>& result);
+ void deleteController(std::string name);
- int getNextControllerIdDiapason();
+ void removeSubscribersBySession(const WebsocketSession* ws);
- private:
- boost::asio::io_context ioc_;
- const std::string& address_;
- uint16_t port_;
- std::string name_;
- int num_threads_;
- std::vector<std::thread> thread_vector_;
-
- boost::asio::ip::tcp::acceptor acceptor_;
- boost::asio::ip::tcp::socket socket_;
- boost::beast::multi_buffer buffer_;
- boost::asio::ip::tcp::endpoint endpoint_;
+ bool addSubscriber(WebsocketSession* ws_session, std::string name);
- int mControllersIdCounter;
+ void deleteSubscriber(WebsocketSession* ws, std::string name);
- //Registry
- std::vector <std::shared_ptr<NsMessageBroker::WebsocketSession>> mConnectionList;
- sync_primitives::Lock mConnectionListLock;
+ int getSubscribersFd(std::string name,
+ std::vector<WebsocketSession*>& result);
- std::map <std::string, WebsocketSession*> mControllersList;
- sync_primitives::Lock mControllersListLock;
+ int getNextControllerId();
- std::multimap <std::string, WebsocketSession*> mSubscribersList;
- sync_primitives::Lock mSubscribersListLock;
+ private:
+ boost::asio::io_context ioc_;
+ const std::string& address_;
+ uint16_t port_;
+ std::string name_;
+ int num_threads_;
+ std::vector<std::thread> thread_vector_;
- std::map <std::string, WebsocketSession*> mRequestList;
- sync_primitives::Lock mRequestListLock;
+ boost::asio::ip::tcp::acceptor acceptor_;
+ boost::asio::ip::tcp::socket socket_;
+ boost::beast::multi_buffer buffer_;
+ boost::asio::ip::tcp::endpoint endpoint_;
- std::atomic_bool shutdown_;
+ int mControllersIdCounter;
+ // Registry
+ std::vector<std::shared_ptr<hmi_message_handler::WebsocketSession> >
+ mConnectionList;
+ sync_primitives::Lock mConnectionListLock;
- };
+ std::map<std::string, WebsocketSession*> mControllersList;
+ sync_primitives::Lock mControllersListLock;
-} //NsMessageBroker
+ std::multimap<std::string, WebsocketSession*> mSubscribersList;
+ sync_primitives::Lock mSubscribersListLock;
+ std::map<std::string, WebsocketSession*> mRequestList;
+ sync_primitives::Lock mRequestListLock;
+ std::atomic_bool shutdown_;
+};
+} // hmi_message_handler
#endif /* MB_CONTROLLER_H */ \ No newline at end of file
diff --git a/src/components/hmi_message_handler/include/hmi_message_handler/messagebroker_adapter.h b/src/components/hmi_message_handler/include/hmi_message_handler/messagebroker_adapter.h
index 81ab823e1c..423f331297 100644
--- a/src/components/hmi_message_handler/include/hmi_message_handler/messagebroker_adapter.h
+++ b/src/components/hmi_message_handler/include/hmi_message_handler/messagebroker_adapter.h
@@ -41,14 +41,14 @@
namespace hmi_message_handler {
-class MessageBrokerAdapter : public HMIMessageAdapterImpl,
- public NsMessageBroker::CMessageBrokerController,
- public threads::SingleThreadValidator {
+class MessageBrokerAdapter
+ : public HMIMessageAdapterImpl,
+ public hmi_message_handler::CMessageBrokerController,
+ public threads::SingleThreadValidator {
public:
MessageBrokerAdapter(HMIMessageHandler* handler_param,
const std::string& server_address,
- uint16_t port,
- boost::asio::io_context& ioc);
+ uint16_t port);
~MessageBrokerAdapter();
void SendMessageToHMI(MessageSharedPointer message);
diff --git a/src/components/hmi_message_handler/include/hmi_message_handler/websocket_session.h b/src/components/hmi_message_handler/include/hmi_message_handler/websocket_session.h
index 24d85dbaff..c084bd7769 100644
--- a/src/components/hmi_message_handler/include/hmi_message_handler/websocket_session.h
+++ b/src/components/hmi_message_handler/include/hmi_message_handler/websocket_session.h
@@ -1,5 +1,5 @@
-#ifndef WEBSOCKET_SESSION_H
-#define WEBSOCKET_SESSION_H
+#ifndef WEBSOCKET_SESSION_H
+#define WEBSOCKET_SESSION_H
#include <iostream>
#include <boost/beast/core.hpp>
@@ -21,162 +21,158 @@
#include <mutex>
#include <queue>
#include "json/json.h"
-
+#include "utils/macro.h"
#include "utils/lock.h"
#include "utils/atomic_object.h"
#include "utils/threads/thread.h"
#include "utils/threads/message_loop_thread.h"
#include "utils/message_queue.h"
-//#include "hmi_message_handler/mb_controller.h"
-
using namespace boost::beast::websocket;
using ::utils::MessageQueue;
-#ifdef DEBUG_ON
+#ifdef DEBUG_ON
/**
* \def DBG_MSG
* \brief Debug message output with file name and line number.
* \param x formatted debug message.
* \return printf construction.
*/
-#define DBG_MSG(x) printf("%s:%d ", __FILE__, __LINE__);\
- printf x
+#define DBG_MSG(x) \
+ printf("%s:%d ", __FILE__, __LINE__); \
+ printf x
#else
#define DBG_MSG(x)
#endif
-#define DBG_MSG_ERROR(x) printf("ERROR!!! %s:%d ", __FILE__, __LINE__);\
- printf x
+#define DBG_MSG_ERROR(x) \
+ printf("ERROR!!! %s:%d ", __FILE__, __LINE__); \
+ printf x
-typedef std::queue<std::shared_ptr<std::string>> AsyncQueue;
+typedef std::queue<std::shared_ptr<std::string> > AsyncQueue;
typedef std::shared_ptr<std::string> Message;
+namespace hmi_message_handler {
-namespace NsMessageBroker {
-
- class CMessageBrokerController;
+CREATE_LOGGERPTR_GLOBAL(ws_logger_, "HMIMessageHandler")
- class WebsocketSession :
- public std::enable_shared_from_this<WebsocketSession> {
+class CMessageBrokerController;
+class WebsocketSession : public std::enable_shared_from_this<WebsocketSession> {
boost::beast::websocket::stream<boost::asio::ip::tcp::socket> ws_;
boost::asio::strand<boost::asio::io_context::executor_type> strand_;
boost::beast::multi_buffer buffer_;
boost::beast::multi_buffer send_buffer_;
- CMessageBrokerController* controller_;
-
- public:
- WebsocketSession(boost::asio::ip::tcp::socket socket, CMessageBrokerController* controller);
-
- ~WebsocketSession();
+ CMessageBrokerController* controller_;
- void Accept();
+ public:
+ WebsocketSession(boost::asio::ip::tcp::socket socket,
+ CMessageBrokerController* controller);
- void Shutdown();
+ ~WebsocketSession();
- bool IsShuttingDown();
+ void Accept();
- void Recv(boost::system::error_code ec);
+ void Shutdown();
- void Send(std::string& message, Json::Value& json_message);
+ bool IsShuttingDown();
- void SendFromQueue();
+ void Recv(boost::system::error_code ec);
- void sendJsonMessage(Json::Value& message);
+ void Send(std::string& message, Json::Value& json_message);
- void OnWrite(boost::system::error_code ec, std::size_t bytes_transferred, std::shared_ptr<std::string> message);
+ void SendFromQueue();
- void Read(boost::system::error_code ec, std::size_t bytes_transferred);
+ void sendJsonMessage(Json::Value& message);
- int getNextMessageId();
+ void OnWrite(boost::system::error_code ec,
+ std::size_t bytes_transferred,
+ std::shared_ptr<std::string> message);
- void prepareMessage(Json::Value& root);
+ void Read(boost::system::error_code ec, std::size_t bytes_transferred);
- void prepareErrorMessage(int errCode, std::string errMessage, Json::Value& error);
+ int getNextMessageId();
- std::string getDestinationComponentName(Json::Value& root);
+ void prepareMessage(Json::Value& root);
- bool isNotification(Json::Value& root);
+ void prepareErrorMessage(int errCode,
+ std::string errMessage,
+ Json::Value& error);
- bool isResponse(Json::Value& root);
+ std::string getDestinationComponentName(Json::Value& root);
- std::string findMethodById(std::string id);
+ bool isNotification(Json::Value& root);
- void registerController(int id = 0);
+ bool isResponse(Json::Value& root);
- void unregisterController();
+ std::string findMethodById(std::string id);
- void subscribeTo(std::string property);
+ void registerController(int id = 0);
- void unsubscribeFrom(std::string property);
+ void unregisterController();
- bool checkMessage(Json::Value& root, Json::Value& error);
+ void subscribeTo(std::string property);
- std::string getControllersName();
+ void unsubscribeFrom(std::string property);
- std::string GetComponentName(std::string& method);
+ bool checkMessage(Json::Value& root, Json::Value& error);
- protected:
+ std::string getControllersName();
- sync_primitives::atomic_bool stop;
+ std::string GetComponentName(std::string& method);
- private:
- void onMessageReceived(Json::Value message);
+ protected:
+ sync_primitives::atomic_bool stop;
- std::map<std::string, std::string> mWaitResponseQueue;
+ private:
+ void onMessageReceived(Json::Value message);
- std::string m_receivingBuffer;
+ std::map<std::string, std::string> mWaitResponseQueue;
- int mControllersIdStart;
+ std::string m_receivingBuffer;
- int mControllersIdCurrent;
+ int mControllersIdStart;
- Json::Reader m_reader;
- Json::FastWriter m_writer;
- Json::FastWriter m_receiverWriter;
-
+ int mControllersIdCurrent;
- sync_primitives::Lock queue_lock_;
- sync_primitives::Lock message_queue_lock_;
- std::atomic_bool shutdown_;
+ Json::Reader m_reader;
+ Json::FastWriter m_writer;
+ Json::FastWriter m_receiverWriter;
+ sync_primitives::Lock queue_lock_;
+ sync_primitives::Lock message_queue_lock_;
+ std::atomic_bool shutdown_;
- MessageQueue<Message, AsyncQueue> message_queue_;
+ MessageQueue<Message, AsyncQueue> message_queue_;
- class LoopThreadDelegate : public threads::ThreadDelegate {
- public:
- LoopThreadDelegate(MessageQueue<Message, AsyncQueue>* message_queue,
- WebsocketSession* handler);
+ class LoopThreadDelegate : public threads::ThreadDelegate {
+ public:
+ LoopThreadDelegate(MessageQueue<Message, AsyncQueue>* message_queue,
+ WebsocketSession* handler);
- virtual void threadMain() OVERRIDE;
- virtual void exitThreadMain() OVERRIDE;
+ virtual void threadMain() OVERRIDE;
+ virtual void exitThreadMain() OVERRIDE;
- void OnWrite();
+ void OnWrite();
- void SetShutdown();
+ void SetShutdown();
- private:
- void DrainQueue();
- MessageQueue<Message, AsyncQueue>& message_queue_;
- WebsocketSession& handler_;
- sync_primitives::Lock queue_lock_;
- sync_primitives::ConditionalVariable queue_new_items_;
- std::atomic_bool write_pending_;
- std::atomic_bool shutdown_;
-
- sync_primitives::Lock write_lock_;
-
- };
-
- LoopThreadDelegate* thread_delegate_;
- threads::Thread* thread_;
+ private:
+ void DrainQueue();
+ MessageQueue<Message, AsyncQueue>& message_queue_;
+ WebsocketSession& handler_;
+ sync_primitives::Lock queue_lock_;
+ sync_primitives::ConditionalVariable queue_new_items_;
+ std::atomic_bool write_pending_;
+ std::atomic_bool shutdown_;
+ sync_primitives::Lock write_lock_;
};
-} //NsMessageBroker
-
-
+ LoopThreadDelegate* thread_delegate_;
+ threads::Thread* thread_;
+};
+} // hmi_message_handler
#endif /* WEBSOCKET_SESSION_H */ \ No newline at end of file
diff --git a/src/components/hmi_message_handler/src/mb_controller.cc b/src/components/hmi_message_handler/src/mb_controller.cc
index 871121fb4b..ec0d1f8bd8 100644
--- a/src/components/hmi_message_handler/src/mb_controller.cc
+++ b/src/components/hmi_message_handler/src/mb_controller.cc
@@ -1,479 +1,475 @@
#include "hmi_message_handler/mb_controller.h"
-#include "websocket_session.cc"
using namespace boost::beast::websocket;
-namespace NsMessageBroker
-{
- CMessageBrokerController::CMessageBrokerController(
- const std::string& address, uint16_t port, std::string name,
- int num_threads, boost::asio::io_context& ioc):address_(address), acceptor_(ioc_), socket_(ioc_), mControllersIdCounter(1)
- {
-
- port_ = port;
- name_ = name;
- num_threads_ = num_threads;
- endpoint_ = {boost::asio::ip::make_address(address), static_cast<unsigned short>(port)};
- shutdown_ = false;
-
- }
-
- CMessageBrokerController::~CMessageBrokerController()
- {
- boost::system::error_code ec;
- socket_.close();
- acceptor_.close(ec);
- if(ec) {
- std::cerr << "ErrorMessage Close: " << ": " << ec.message() << "\n";
- }
- shutdown_ = true;
- ioc_.stop();
- }
-
- bool CMessageBrokerController::StartListener() {
- boost::system::error_code error;
- acceptor_.open(endpoint_.protocol(), error);
- if(error) {
- std::cerr << "ErrorOpen: " << ": " << error.message() << "\n";
- return false;
- }
-
- acceptor_.set_option(boost::asio::socket_base::reuse_address(true), error);
- if(error) {
- std::cerr << "ErrorSetOption: " << ": " << error.message() << "\n";
- return false;
- }
- acceptor_.bind(endpoint_, error);
- if(error) {
- std::cerr << "ErrorBind: " << ": " << error.message() << "\n";
- return false;
- }
- acceptor_.listen(boost::asio::socket_base::max_listen_connections, error);
- if(error) {
- std::cerr << "ErrorListen: " << ": " << error.message() << "\n";
- return false;
- }
- return true;
- }
-
- bool CMessageBrokerController::Run(){
- if(acceptor_.is_open() && !shutdown_) {
- acceptor_.async_accept(
- socket_,
- std::bind(
- &CMessageBrokerController::StartSession,
- this,
- std::placeholders::_1
- ));
- ioc_.run();
- return true;
+namespace hmi_message_handler {
+
+CMessageBrokerController::CMessageBrokerController(const std::string& address,
+ uint16_t port,
+ std::string name,
+ int num_threads)
+ : address_(address)
+ , acceptor_(ioc_)
+ , socket_(ioc_)
+ , mControllersIdCounter(1) {
+ port_ = port;
+ name_ = name;
+ num_threads_ = num_threads;
+ endpoint_ = {boost::asio::ip::make_address(address),
+ static_cast<unsigned short>(port)};
+ shutdown_ = false;
+}
- }
- return false;
- }
-
- void CMessageBrokerController::WaitForConnection() {
- if(acceptor_.is_open() && !shutdown_) {
- acceptor_.async_accept(
- socket_,
- std::bind(
- &CMessageBrokerController::StartSession,
- this,
- std::placeholders::_1
- )
- );
- }
- }
+CMessageBrokerController::~CMessageBrokerController() {
+ boost::system::error_code ec;
+ socket_.close();
+ acceptor_.close(ec);
+ if (ec) {
+ std::string str_err = "ErrorMessage Close: " + ec.message();
+ LOG4CXX_ERROR(mb_logger_, str_err);
+ }
+ shutdown_ = true;
+ ioc_.stop();
+}
- void CMessageBrokerController::StartSession(boost::system::error_code ec){
- if(ec){
- std::cerr << "ErrorMessage: " << ": " << ec.message() << "\n";
- ioc_.stop();
- return;
- }
- if(shutdown_) {
- return;
- }
- std::shared_ptr<WebsocketSession> ws_ptr = std::make_shared<WebsocketSession>(std::move(socket_), this);
- ws_ptr->Accept();
-
- mConnectionListLock.Acquire();
- mConnectionList.push_back(ws_ptr);
- mConnectionListLock.Release();
-
- WaitForConnection();
-
- }
-
-
- bool CMessageBrokerController::isNotification(Json::Value& message) {
- DBG_MSG(("CMessageBroker::isNotification()\n"));
- bool ret = false;
- if (false == message.isMember("id")) {
- ret = true;
- }
- DBG_MSG(("Result: %d\n", ret));
- return ret;
- }
-
- void CMessageBrokerController::sendNotification(Json::Value& message) {
- DBG_MSG(("CMessageBroker::processNotification()\n"));
- std::string methodName = message["method"].asString();
- DBG_MSG(("Property: %s\n", methodName.c_str()));
- std::vector<WebsocketSession*> result;
- int subscribersCount = getSubscribersFd(methodName, result);
- if (0 < subscribersCount) {
- std::vector<WebsocketSession*>::iterator it;
- for (it = result.begin(); it != result.end(); it++) {
- (*it)->sendJsonMessage(message);
- }
- } else {
- DBG_MSG(("No subscribers for this property!\n"));
- }
+bool CMessageBrokerController::StartListener() {
+ boost::system::error_code error;
+ acceptor_.open(endpoint_.protocol(), error);
+ if (error) {
+ std::string str_err = "ErrorOpen: " + error.message();
+ LOG4CXX_ERROR(mb_logger_, str_err);
+ return false;
+ }
+
+ acceptor_.set_option(boost::asio::socket_base::reuse_address(true), error);
+ if (error) {
+ std::string str_err = "ErrorSetOption: " + error.message();
+ LOG4CXX_ERROR(mb_logger_, str_err);
+ return false;
+ }
+ acceptor_.bind(endpoint_, error);
+ if (error) {
+ std::string str_err = "ErrorBind: " + error.message();
+ LOG4CXX_ERROR(mb_logger_, str_err);
+ return false;
+ }
+ acceptor_.listen(boost::asio::socket_base::max_listen_connections, error);
+ if (error) {
+ std::string str_err = "ErrorListen: " + error.message();
+ LOG4CXX_ERROR(mb_logger_, str_err);
+ return false;
+ }
+ return true;
+}
- }
+bool CMessageBrokerController::Run() {
+ if (acceptor_.is_open() && !shutdown_) {
+ acceptor_.async_accept(socket_,
+ std::bind(&CMessageBrokerController::StartSession,
+ this,
+ std::placeholders::_1));
+ ioc_.run();
+ return true;
+ }
+ return false;
+}
- bool CMessageBrokerController::isResponse(Json::Value& message) {
- DBG_MSG(("CMessageBroker::isResponse()\n"));
- bool ret = false;
- if ((true == message.isMember("result")) || (true == message.isMember("error"))) {
- ret = true;
- }
- DBG_MSG(("Result: %d\n", ret));
- return ret;
- }
-
- void CMessageBrokerController::sendResponse(Json::Value& message) {
- WebsocketSession* ws;
- std::map <std::string, WebsocketSession*>::iterator it;
- sync_primitives::AutoLock request_lock(mRequestListLock);
-
- std::string id = message["id"].asString();
- it = mRequestList.find(id);
- if (it != mRequestList.end()) {
- ws = it->second;
- ws->sendJsonMessage(message);
- mRequestList.erase(it);
- }
- }
+void CMessageBrokerController::WaitForConnection() {
+ if (acceptor_.is_open() && !shutdown_) {
+ acceptor_.async_accept(socket_,
+ std::bind(&CMessageBrokerController::StartSession,
+ this,
+ std::placeholders::_1));
+ }
+}
- void CMessageBrokerController::sendJsonMessage(Json::Value& message){
+void CMessageBrokerController::StartSession(boost::system::error_code ec) {
+ if (ec) {
+ std::string str_err = "ErrorMessage: " + ec.message();
+ LOG4CXX_ERROR(mb_logger_, str_err);
+ ioc_.stop();
+ return;
+ }
+ if (shutdown_) {
+ return;
+ }
+ std::shared_ptr<WebsocketSession> ws_ptr =
+ std::make_shared<WebsocketSession>(std::move(socket_), this);
+ ws_ptr->Accept();
+
+ mConnectionListLock.Acquire();
+ mConnectionList.push_back(ws_ptr);
+ mConnectionListLock.Release();
+
+ WaitForConnection();
+}
- if(isNotification(message)) {
- sendNotification(message);
- return;
- } else if (isResponse(message)) {
- sendResponse(message);
- return;
- }
+bool CMessageBrokerController::isNotification(Json::Value& message) {
+ bool ret = false;
+ if (false == message.isMember("id")) {
+ ret = true;
+ }
+ return ret;
+}
- //Send request
- WebsocketSession* ws;
- std::map <std::string, WebsocketSession*>::iterator it;
- std::string method = message["method"].asString();
- std::string component_name = GetComponentName(method);
-
- sync_primitives::AutoLock lock(mControllersListLock);
- it = mControllersList.find(component_name);
- if (it != mControllersList.end()) {
- ws = it->second;
- ws->sendJsonMessage(message);
- }
- }
+void CMessageBrokerController::sendNotification(Json::Value& message) {
+ std::string methodName = message["method"].asString();
+ std::vector<WebsocketSession*> result;
+ int subscribersCount = getSubscribersFd(methodName, result);
+ if (0 < subscribersCount) {
+ std::vector<WebsocketSession*>::iterator it;
+ for (it = result.begin(); it != result.end(); it++) {
+ (*it)->sendJsonMessage(message);
+ }
+ } else {
+ LOG4CXX_ERROR(mb_logger_, ("No subscribers for this property!\n"));
+ }
+}
+
+bool CMessageBrokerController::isResponse(Json::Value& message) {
+ bool ret = false;
+ if ((true == message.isMember("result")) ||
+ (true == message.isMember("error"))) {
+ ret = true;
+ }
+ return ret;
+}
- void CMessageBrokerController::subscribeTo(std::string property){}
+void CMessageBrokerController::sendResponse(Json::Value& message) {
+ WebsocketSession* ws;
+ std::map<std::string, WebsocketSession*>::iterator it;
+ sync_primitives::AutoLock request_lock(mRequestListLock);
+
+ std::string id = message["id"].asString();
+ it = mRequestList.find(id);
+ if (it != mRequestList.end()) {
+ ws = it->second;
+ ws->sendJsonMessage(message);
+ mRequestList.erase(it);
+ }
+}
- void CMessageBrokerController::registerController(int id){}
+void CMessageBrokerController::sendJsonMessage(Json::Value& message) {
+ if (isNotification(message)) {
+ sendNotification(message);
+ return;
+ } else if (isResponse(message)) {
+ sendResponse(message);
+ return;
+ }
+
+ // Send request
+ WebsocketSession* ws;
+ std::map<std::string, WebsocketSession*>::iterator it;
+ std::string method = message["method"].asString();
+ std::string component_name = GetComponentName(method);
+
+ sync_primitives::AutoLock lock(mControllersListLock);
+ it = mControllersList.find(component_name);
+ if (it != mControllersList.end()) {
+ ws = it->second;
+ ws->sendJsonMessage(message);
+ }
+}
- void CMessageBrokerController::unregisterController(){}
+void CMessageBrokerController::subscribeTo(std::string property) {}
- void* CMessageBrokerController::MethodForReceiverThread(void * arg){
- return NULL;
- }
+void CMessageBrokerController::registerController(int id) {}
- bool CMessageBrokerController::Connect(){
- return true;
- }
+void CMessageBrokerController::unregisterController() {}
- void CMessageBrokerController::exitReceivingThread(){
- shutdown_ = true;
- mConnectionListLock.Acquire();
- std::vector<std::shared_ptr<NsMessageBroker::WebsocketSession>>::iterator it;
- for(it = mConnectionList.begin(); it != mConnectionList.end(); it++) {
- (*it)->Shutdown();
- mConnectionList.erase(it);
- }
- mConnectionListLock.Release();
+void* CMessageBrokerController::MethodForReceiverThread(void* arg) {
+ return NULL;
+}
- boost::system::error_code ec;
- socket_.close();
- acceptor_.cancel(ec);
- if(ec) {
- std::cerr << "ErrorMessage Cancel: " << ": " << ec.message() << "\n";
- }
- acceptor_.close(ec);
- if(ec) {
- std::cerr << "ErrorMessage Close: " << ": " << ec.message() << "\n";
- }
- ioc_.stop();
- }
-
- std::string CMessageBrokerController::getMethodName(std::string& method) {
- std::string return_string="";
- if(method != "") {
- int position = method.find(".");
- if(position != -1) {
- return_string = method.substr(position + 1);
- }
- }
- return return_string;
- }
-
- std::string CMessageBrokerController::GetComponentName(std::string& method) {
- std::string return_string="";
- if(method != "") {
- int position = method.find(".");
- if(position != -1) {
- return_string = method.substr(0, position);
- }
- }
- return return_string;
- }
-
- bool CMessageBrokerController::addController(WebsocketSession* ws_session, std::string name){
- bool result = false;
- std::map <std::string, WebsocketSession*>::iterator it;
-
- sync_primitives::AutoLock lock(mControllersListLock);
- it = mControllersList.find(name);
- if (it == mControllersList.end()) {
- mControllersList.insert(std::map <std::string, WebsocketSession*>::value_type(name, ws_session));
- result = true;
+bool CMessageBrokerController::Connect() {
+ return true;
+}
+
+void CMessageBrokerController::exitReceivingThread() {
+ shutdown_ = true;
+ mConnectionListLock.Acquire();
+ std::vector<std::shared_ptr<hmi_message_handler::WebsocketSession> >::iterator
+ it;
+ for (it = mConnectionList.begin(); it != mConnectionList.end();) {
+ (*it)->Shutdown();
+ mConnectionList.erase(it++);
+ }
+ mConnectionListLock.Release();
+
+ boost::system::error_code ec;
+ socket_.close();
+ acceptor_.cancel(ec);
+ if (ec) {
+ std::string str_err = "ErrorMessage Cancel: " + ec.message();
+ LOG4CXX_ERROR(mb_logger_, str_err);
+ }
+ acceptor_.close(ec);
+ if (ec) {
+ std::string str_err = "ErrorMessage Close: " + ec.message();
+ }
+ ioc_.stop();
+}
+
+std::string CMessageBrokerController::getMethodName(std::string& method) {
+ std::string return_string = "";
+ if (method != "") {
+ int position = method.find(".");
+ if (position != -1) {
+ return_string = method.substr(position + 1);
+ }
+ }
+ return return_string;
+}
+
+std::string CMessageBrokerController::GetComponentName(std::string& method) {
+ std::string return_string = "";
+ if (method != "") {
+ int position = method.find(".");
+ if (position != -1) {
+ return_string = method.substr(0, position);
+ }
+ }
+ return return_string;
+}
+
+bool CMessageBrokerController::addController(WebsocketSession* ws_session,
+ std::string name) {
+ bool result = false;
+ std::map<std::string, WebsocketSession*>::iterator it;
+
+ sync_primitives::AutoLock lock(mControllersListLock);
+ it = mControllersList.find(name);
+ if (it == mControllersList.end()) {
+ mControllersList.insert(
+ std::map<std::string, WebsocketSession*>::value_type(name, ws_session));
+ result = true;
+ } else {
+ LOG4CXX_ERROR(mb_logger_, ("Controller already exists!\n"));
+ }
+ return result;
+}
+
+void CMessageBrokerController::deleteController(WebsocketSession* ws_session) {
+ {
+ sync_primitives::AutoLock lock(mControllersListLock);
+ std::map<std::string, WebsocketSession*>::iterator it;
+ for (it = mControllersList.begin(); it != mControllersList.end();) {
+ if (it->second == ws_session) {
+ mControllersList.erase(it++);
} else {
- DBG_MSG(("Controller already exists!\n"));
+ it++;
}
+ }
+ }
+ removeSubscribersBySession(ws_session);
+}
- DBG_MSG(("Count of controllers: %zu\n", mControllersList.size()));
- return result;
- }
-
- void CMessageBrokerController::deleteController(WebsocketSession* ws_session) {
- {
- sync_primitives::AutoLock lock(mControllersListLock);
- std::map <std::string, WebsocketSession*>::iterator it;
- for(it = mControllersList.begin(); it != mControllersList.end(); it++) {
- if(it->second == ws_session) {
- mControllersList.erase(it);
- }
- }
- }
- removeSubscribersBySession(ws_session);
- }
-
- void CMessageBrokerController::deleteController(std::string name) {
- std::map <std::string, WebsocketSession*>::iterator it;
- WebsocketSession* ws;
- {
- sync_primitives::AutoLock lock(mControllersListLock);
- it = mControllersList.find(name);
- if (it != mControllersList.end())
- {
- ws = it->second;
- mControllersList.erase(it);
- } else {
- return;
- }
- }
- removeSubscribersBySession(ws);
- }
-
- void CMessageBrokerController::removeSubscribersBySession(const WebsocketSession* ws) {
- sync_primitives::AutoLock lock(mSubscribersListLock);
- std::multimap <std::string, WebsocketSession*>::iterator it_s = mSubscribersList.begin();
- for (; it_s !=mSubscribersList.end(); ) {
- if (it_s->second == ws) {
- mSubscribersList.erase(it_s++);
- } else {
- ++it_s;
- }
- }
- }
-
- void CMessageBrokerController::pushRequest(Json::Value& message, WebsocketSession* ws_session) {
- sync_primitives::AutoLock lock(mRequestListLock);
- std::string id = message["id"].asString();
- mRequestList.insert(std::map <std::string, WebsocketSession*>::value_type(id, ws_session));
- }
-
- bool CMessageBrokerController::addSubscriber(WebsocketSession* ws_session, std::string name) {
- bool result = true;
- sync_primitives::AutoLock lock(mSubscribersListLock);
- std::pair<std::multimap <std::string, WebsocketSession*>::iterator, std::multimap <std::string, WebsocketSession*>::iterator> p = mSubscribersList.equal_range(name);
- if (p.first != p.second)
- {
- std::multimap <std::string, WebsocketSession*>::iterator itr;
- for (itr = p.first; itr != p.second; itr++)
- {
- if (ws_session == itr->second)
- {
- result = false;
- DBG_MSG(("Subscriber already exists!\n"));
- }
- }
- }
- if (result)
- {
- mSubscribersList.insert(std::map <std::string, WebsocketSession*>::value_type(name, ws_session));
- }
- DBG_MSG(("Count of subscribers: %zu\n", mSubscribersList.size()));
- return result;
- }
-
- void CMessageBrokerController::deleteSubscriber(WebsocketSession* ws, std::string name) {
-
- sync_primitives::AutoLock lock(mSubscribersListLock);
- std::pair<std::multimap <std::string, WebsocketSession*>::iterator, std::multimap <std::string, WebsocketSession*>::iterator> p = mSubscribersList.equal_range(name);
- if (p.first != p.second) {
- std::multimap <std::string, WebsocketSession*>::iterator itr;
- for (itr = p.first; itr != p.second; ) {
- if (ws == itr->second) {
- mSubscribersList.erase(itr++);
- } else {
- ++itr;
- }
- }
- }
+void CMessageBrokerController::deleteController(std::string name) {
+ std::map<std::string, WebsocketSession*>::iterator it;
+ WebsocketSession* ws;
+ {
+ sync_primitives::AutoLock lock(mControllersListLock);
+ it = mControllersList.find(name);
+ if (it != mControllersList.end()) {
+ ws = it->second;
+ mControllersList.erase(it);
+ } else {
+ return;
+ }
+ }
+ removeSubscribersBySession(ws);
+}
+
+void CMessageBrokerController::removeSubscribersBySession(
+ const WebsocketSession* ws) {
+ sync_primitives::AutoLock lock(mSubscribersListLock);
+ std::multimap<std::string, WebsocketSession*>::iterator it_s =
+ mSubscribersList.begin();
+ for (; it_s != mSubscribersList.end();) {
+ if (it_s->second == ws) {
+ mSubscribersList.erase(it_s++);
+ } else {
+ ++it_s;
+ }
+ }
+}
+
+void CMessageBrokerController::pushRequest(Json::Value& message,
+ WebsocketSession* ws_session) {
+ sync_primitives::AutoLock lock(mRequestListLock);
+ std::string id = message["id"].asString();
+ mRequestList.insert(
+ std::map<std::string, WebsocketSession*>::value_type(id, ws_session));
+}
- DBG_MSG(("Count of subscribers: %zu\n", mSubscribersList.size()));
- }
-
- int CMessageBrokerController::getSubscribersFd(std::string name, std::vector<WebsocketSession*>& result) {
- DBG_MSG(("CMessageBrokerRegistry::getSubscribersFd()\n"));
- int res = 0;
- std::map <std::string, WebsocketSession*>::iterator it;
-
- sync_primitives::AutoLock lock(mSubscribersListLock);
- std::pair<std::multimap <std::string, WebsocketSession*>::iterator, std::multimap <std::string, WebsocketSession*>::iterator> p = mSubscribersList.equal_range(name);
- if (p.first != p.second)
- {
- std::multimap <std::string, WebsocketSession*>::iterator itr;
- for (itr = p.first; itr != p.second; itr++)
- {
- result.push_back(itr->second);
- DBG_MSG(("Controllers Fd: %d\n", itr->second));
- }
+bool CMessageBrokerController::addSubscriber(WebsocketSession* ws_session,
+ std::string name) {
+ bool result = true;
+ sync_primitives::AutoLock lock(mSubscribersListLock);
+ std::pair<std::multimap<std::string, WebsocketSession*>::iterator,
+ std::multimap<std::string, WebsocketSession*>::iterator> p =
+ mSubscribersList.equal_range(name);
+ if (p.first != p.second) {
+ std::multimap<std::string, WebsocketSession*>::iterator itr;
+ for (itr = p.first; itr != p.second; itr++) {
+ if (ws_session == itr->second) {
+ result = false;
+ LOG4CXX_ERROR(mb_logger_, ("Subscriber already exists!\n"));
}
+ }
+ }
+ if (result) {
+ mSubscribersList.insert(
+ std::map<std::string, WebsocketSession*>::value_type(name, ws_session));
+ }
+ return result;
+}
- res = result.size();
- DBG_MSG(("Result vector size: %d\n", res));
- return res;
- }
-
- void CMessageBrokerController::processInternalRequest(Json::Value& message, WebsocketSession* ws_session){
- std::string method = message["method"].asString();
- std::string methodName = getMethodName(method);
- if (methodName == "registerComponent") {
- Json::Value params = message["params"];
- if (params.isMember("componentName") && params["componentName"].isString()) {
- std::string controllerName = params["componentName"].asString();
- if (addController(ws_session, controllerName)) {
- Json::Value response;
- response["id"] = message["id"];
- response["jsonrpc"] = "2.0";
- response["result"] = getNextControllerIdDiapason();
- ws_session->sendJsonMessage(response);
- } else {
- Json::Value error, err;
- error["id"] = message["id"];
- error["jsonrpc"] = "2.0";
- err["code"] = CONTROLLER_EXISTS;
- err["message"] = "Controller has been already registered.";
- error["error"] = err;
- ws_session->sendJsonMessage(error);
- }
- } else {
- Json::Value error, err;
- error["id"] = message["id"];
- error["jsonrpc"] = "2.0";
- err["code"] = INVALID_REQUEST;
- err["message"] = "Wrong method parameter.";
- error["error"] = err;
- ws_session->sendJsonMessage(error);
- }
- } else if (methodName == "subscribeTo") {
- Json::Value params = message["params"];
- if (params.isMember("propertyName") && params["propertyName"].isString()) {
- std::string propertyName = params["propertyName"].asString();
- if (addSubscriber(ws_session, propertyName)) {
- Json::Value response;
- response["id"] = message["id"];
- response["jsonrpc"] = "2.0";
- response["result"] = "OK";
- ws_session->sendJsonMessage(response);
- } else {
- Json::Value error, err;
- error["id"] = message["id"];
- error["jsonrpc"] = "2.0";
- err["code"] = CONTROLLER_EXISTS;
- err["message"] = "Subscribe has been already registered.";
- error["error"] = err;
- ws_session->sendJsonMessage(error);
- }
- } else {
- Json::Value error, err;
- error["id"] = message["id"];
- error["jsonrpc"] = "2.0";
- err["code"] = INVALID_REQUEST;
- err["message"] = "Wrong method parameter.";
- error["error"] = err;
- ws_session->sendJsonMessage(error);
- }
-
- } else if (methodName == "unregisterComponent" ) {
- Json::Value params = message["params"];
- if (params.isMember("componentName") && params["componentName"].isString()) {
- std::string controllerName = params["componentName"].asString();
- deleteController(controllerName);
- Json::Value response;
- response["id"] = message["id"];
- response["jsonrpc"] = "2.0";
- response["result"] = "OK";
- ws_session->sendJsonMessage(response);
- } else {
- Json::Value error, err;
- error["id"] = message["id"];
- error["jsonrpc"] = "2.0";
- err["code"] = INVALID_REQUEST;
- err["message"] = "Wrong method parameter.";
- error["error"] = err;
- ws_session->sendJsonMessage(error);
- }
- } else if (methodName == "unsubscribeFrom") {
- Json::Value params = message["params"];
- if (params.isMember("propertyName") && params["propertyName"].isString()) {
- std::string propertyName = params["propertyName"].asString();
- deleteSubscriber(ws_session, propertyName);
- Json::Value response;
- response["id"] = message["id"];
- response["jsonrpc"] = "2.0";
- response["result"] = "OK";
- ws_session->sendJsonMessage(response);
- } else {
- Json::Value error, err;
- error["id"] = message["id"];
- error["jsonrpc"] = "2.0";
- err["code"] = INVALID_REQUEST;
- err["message"] = "Wrong method parameter.";
- error["error"] = err;
- ws_session->sendJsonMessage(error);
- }
+void CMessageBrokerController::deleteSubscriber(WebsocketSession* ws,
+ std::string name) {
+ sync_primitives::AutoLock lock(mSubscribersListLock);
+ std::pair<std::multimap<std::string, WebsocketSession*>::iterator,
+ std::multimap<std::string, WebsocketSession*>::iterator> p =
+ mSubscribersList.equal_range(name);
+ if (p.first != p.second) {
+ std::multimap<std::string, WebsocketSession*>::iterator itr;
+ for (itr = p.first; itr != p.second;) {
+ if (ws == itr->second) {
+ mSubscribersList.erase(itr++);
} else {
-
+ ++itr;
}
- }
+ }
+ }
+}
- int CMessageBrokerController::getNextControllerIdDiapason() {
- return 1000 * mControllersIdCounter++;
- }
+int CMessageBrokerController::getSubscribersFd(
+ std::string name, std::vector<WebsocketSession*>& result) {
+ int res = 0;
+ std::map<std::string, WebsocketSession*>::iterator it;
+
+ sync_primitives::AutoLock lock(mSubscribersListLock);
+ std::pair<std::multimap<std::string, WebsocketSession*>::iterator,
+ std::multimap<std::string, WebsocketSession*>::iterator> p =
+ mSubscribersList.equal_range(name);
+ if (p.first != p.second) {
+ std::multimap<std::string, WebsocketSession*>::iterator itr;
+ for (itr = p.first; itr != p.second; itr++) {
+ result.push_back(itr->second);
+ }
+ }
+
+ res = result.size();
+ return res;
+}
+
+void CMessageBrokerController::processInternalRequest(
+ Json::Value& message, WebsocketSession* ws_session) {
+ std::string method = message["method"].asString();
+ std::string methodName = getMethodName(method);
+ if (methodName == "registerComponent") {
+ Json::Value params = message["params"];
+ if (params.isMember("componentName") &&
+ params["componentName"].isString()) {
+ std::string controllerName = params["componentName"].asString();
+ if (addController(ws_session, controllerName)) {
+ Json::Value response;
+ response["id"] = message["id"];
+ response["jsonrpc"] = "2.0";
+ response["result"] = getNextControllerId();
+ ws_session->sendJsonMessage(response);
+ } else {
+ Json::Value error, err;
+ error["id"] = message["id"];
+ error["jsonrpc"] = "2.0";
+ err["code"] = CONTROLLER_EXISTS;
+ err["message"] = "Controller has been already registered.";
+ error["error"] = err;
+ ws_session->sendJsonMessage(error);
+ }
+ } else {
+ Json::Value error, err;
+ error["id"] = message["id"];
+ error["jsonrpc"] = "2.0";
+ err["code"] = INVALID_REQUEST;
+ err["message"] = "Wrong method parameter.";
+ error["error"] = err;
+ ws_session->sendJsonMessage(error);
+ }
+ } else if (methodName == "subscribeTo") {
+ Json::Value params = message["params"];
+ if (params.isMember("propertyName") && params["propertyName"].isString()) {
+ std::string propertyName = params["propertyName"].asString();
+ if (addSubscriber(ws_session, propertyName)) {
+ Json::Value response;
+ response["id"] = message["id"];
+ response["jsonrpc"] = "2.0";
+ response["result"] = "OK";
+ ws_session->sendJsonMessage(response);
+ } else {
+ Json::Value error, err;
+ error["id"] = message["id"];
+ error["jsonrpc"] = "2.0";
+ err["code"] = CONTROLLER_EXISTS;
+ err["message"] = "Subscribe has been already registered.";
+ error["error"] = err;
+ ws_session->sendJsonMessage(error);
+ }
+ } else {
+ Json::Value error, err;
+ error["id"] = message["id"];
+ error["jsonrpc"] = "2.0";
+ err["code"] = INVALID_REQUEST;
+ err["message"] = "Wrong method parameter.";
+ error["error"] = err;
+ ws_session->sendJsonMessage(error);
+ }
+
+ } else if (methodName == "unregisterComponent") {
+ Json::Value params = message["params"];
+ if (params.isMember("componentName") &&
+ params["componentName"].isString()) {
+ std::string controllerName = params["componentName"].asString();
+ deleteController(controllerName);
+ Json::Value response;
+ response["id"] = message["id"];
+ response["jsonrpc"] = "2.0";
+ response["result"] = "OK";
+ ws_session->sendJsonMessage(response);
+ } else {
+ Json::Value error, err;
+ error["id"] = message["id"];
+ error["jsonrpc"] = "2.0";
+ err["code"] = INVALID_REQUEST;
+ err["message"] = "Wrong method parameter.";
+ error["error"] = err;
+ ws_session->sendJsonMessage(error);
+ }
+ } else if (methodName == "unsubscribeFrom") {
+ Json::Value params = message["params"];
+ if (params.isMember("propertyName") && params["propertyName"].isString()) {
+ std::string propertyName = params["propertyName"].asString();
+ deleteSubscriber(ws_session, propertyName);
+ Json::Value response;
+ response["id"] = message["id"];
+ response["jsonrpc"] = "2.0";
+ response["result"] = "OK";
+ ws_session->sendJsonMessage(response);
+ } else {
+ Json::Value error, err;
+ error["id"] = message["id"];
+ error["jsonrpc"] = "2.0";
+ err["code"] = INVALID_REQUEST;
+ err["message"] = "Wrong method parameter.";
+ error["error"] = err;
+ ws_session->sendJsonMessage(error);
+ }
+ } else {
+ }
+}
+int CMessageBrokerController::getNextControllerId() {
+ return 1000 * mControllersIdCounter++;
+}
}
diff --git a/src/components/hmi_message_handler/src/messagebroker_adapter.cc b/src/components/hmi_message_handler/src/messagebroker_adapter.cc
index 337b3c73ca..53a0d84362 100644
--- a/src/components/hmi_message_handler/src/messagebroker_adapter.cc
+++ b/src/components/hmi_message_handler/src/messagebroker_adapter.cc
@@ -39,13 +39,13 @@ namespace hmi_message_handler {
CREATE_LOGGERPTR_GLOBAL(logger_, "HMIMessageHandler")
-typedef NsMessageBroker::CMessageBrokerController MessageBrokerController;
+typedef hmi_message_handler::CMessageBrokerController MessageBrokerController;
MessageBrokerAdapter::MessageBrokerAdapter(HMIMessageHandler* handler_param,
const std::string& server_address,
- uint16_t port, boost::asio::io_context& ioc)
+ uint16_t port)
: HMIMessageAdapterImpl(handler_param)
- , MessageBrokerController(server_address, port, "SDL", 8, ioc) {
+ , MessageBrokerController(server_address, port, "SDL", 8) {
LOG4CXX_TRACE(logger_, "Created MessageBrokerAdapter");
}
diff --git a/src/components/hmi_message_handler/src/websocket_session.cc b/src/components/hmi_message_handler/src/websocket_session.cc
index 84c786fac3..2e9ec1890b 100644
--- a/src/components/hmi_message_handler/src/websocket_session.cc
+++ b/src/components/hmi_message_handler/src/websocket_session.cc
@@ -1,332 +1,300 @@
-
#include "hmi_message_handler/websocket_session.h"
#include "hmi_message_handler/mb_controller.h"
#include <unistd.h>
using namespace boost::beast::websocket;
-namespace NsMessageBroker
-{
- WebsocketSession::WebsocketSession(boost::asio::ip::tcp::socket socket, CMessageBrokerController* controller) :
- ws_(std::move(socket)),
- strand_(ws_.get_executor()),
- controller_(controller),
- stop(false),
- m_receivingBuffer(""),
- mControllersIdStart(-1),
- mControllersIdCurrent(0),
- shutdown_(false),
- thread_delegate_(new LoopThreadDelegate(&message_queue_, this)),
- thread_(threads::CreateThread("WS Async Send", thread_delegate_)){
- thread_->start(threads::ThreadOptions());
- }
-
- WebsocketSession::~WebsocketSession(){}
-
- void WebsocketSession::Accept() {
- ws_.async_accept(
- boost::asio::bind_executor(
- strand_,
- std::bind(
- &WebsocketSession::Recv,
- shared_from_this(), std::placeholders::_1)));
- }
-
- void WebsocketSession::Shutdown() {
- shutdown_ = true;
- thread_delegate_->SetShutdown();
- thread_->join();
- delete thread_delegate_;
- threads::DeleteThread(thread_);
- }
-
- bool WebsocketSession::IsShuttingDown() {
- return shutdown_;
- }
-
- void WebsocketSession::Recv(boost::system::error_code ec) {
- if(shutdown_) {
- return;
- }
-
- if(ec){
- std::cerr << "ErrorMessage: " << ": " << ec.message() << "\n";
- shutdown_ = true;
- thread_delegate_->SetShutdown();
- controller_->deleteController(this);
- return;
-
- }
-
- ws_.async_read(
- buffer_,
- boost::asio::bind_executor(
- strand_,
- std::bind(
- &WebsocketSession::Read,
- shared_from_this(),
- std::placeholders::_1,
- std::placeholders::_2
- )
- )
- );
- }
-
- void WebsocketSession::Send(std::string& message, Json::Value& json_message) {
- if (shutdown_) {
- return;
- }
- std::shared_ptr<std::string> message_ptr = std::make_shared<std::string>( message );
- message_queue_.push(message_ptr);
- }
-
- void WebsocketSession::sendJsonMessage(Json::Value& message) {
- std::string str_msg = m_writer.write(message);
- sync_primitives::AutoLock auto_lock(queue_lock_);
- if (!isNotification(message) && !isResponse(message)) {
- mWaitResponseQueue.insert(std::map<std::string, std::string>::value_type(message["id"].asString(), message["method"].asString()));
- }
-
- Send(str_msg, message);
- }
-
- void WebsocketSession::Read(boost::system::error_code ec, std::size_t bytes_transferred) {
- boost::ignore_unused(bytes_transferred);
- if(ec){
- std::cerr << "ErrorMessage: " << ": " << ec.message() << "\n";
- shutdown_ = true;
- thread_delegate_->SetShutdown();
- controller_->deleteController(this);
- buffer_.consume(buffer_.size());
- return;
- }
-
- std::string data = boost::beast::buffers_to_string(buffer_.data());
- m_receivingBuffer += data;
-
- Json::Value root;
- if (!m_reader.parse(m_receivingBuffer, root))
- {
- std::cerr << "Invalid JSON Message" << ": " << data << "\n";
- return;
- }
-
- std::string wmes = m_receiverWriter.write(root);
- ssize_t beginpos = m_receivingBuffer.find(wmes);
- if (-1 != beginpos)
- {
- m_receivingBuffer.erase(0, beginpos + wmes.length());
- DBG_MSG(("Buffer after cut is:%s\n", m_receivingBuffer.c_str()));
- } else
- {
- m_receivingBuffer.clear();
- }
- onMessageReceived(root);
-
-
- buffer_.consume(buffer_.size());
-
- Recv(ec);
-
- }
-
- std::string WebsocketSession::GetComponentName(std::string& method) {
- std::string return_string="";
- if(method != "") {
- int position = method.find(".");
- if(position != -1) {
- return_string = method.substr(0, position);
- }
- }
- return return_string;
- }
-
- void WebsocketSession::onMessageReceived(Json::Value message) {
- // Determine message type and process...
- Json::Value error;
- if (checkMessage(message, error)) {
- if (isNotification(message)) {
- DBG_MSG(("Message is notification!\n"));
- controller_->processNotification(message);
- } else if (isResponse(message)) {
- std::string id = message["id"].asString();
- std::string method = findMethodById(id);
- DBG_MSG(("Message is response on: %s\n", method.c_str()));
- if ("" != method) {
- if ("MB.registerComponent" == method) {
- if (message.isMember("result") && message["result"].isInt())
- {
- mControllersIdStart = message["result"].asInt();
- } else
- {
- DBG_MSG_ERROR(("Not possible to initialize mControllersIdStart!\n"));
- }
- } else if ("MB.subscribeTo" == method || "MB.unregisterComponent" == method || "MB.unsubscribeFrom" == method)
- {
- //nothing to do for now
- } else
- {
- controller_->processResponse(method, message);
- }
- } else {
- DBG_MSG_ERROR(("Request with id %s has not been found!\n", id.c_str()));
- }
- } else {
- DBG_MSG(("Message is request!\n"));
- std::string method = message["method"].asString();
- std::string component_name = GetComponentName(method);
-
- if(component_name == "MB") {
- controller_->processInternalRequest(message, this);
- } else {
- controller_->pushRequest(message, this);
- controller_->processRequest(message);
- }
- }
+namespace hmi_message_handler {
+
+WebsocketSession::WebsocketSession(boost::asio::ip::tcp::socket socket,
+ CMessageBrokerController* controller)
+ : ws_(std::move(socket))
+ , strand_(ws_.get_executor())
+ , controller_(controller)
+ , stop(false)
+ , m_receivingBuffer("")
+ , mControllersIdStart(-1)
+ , mControllersIdCurrent(0)
+ , shutdown_(false)
+ , thread_delegate_(new LoopThreadDelegate(&message_queue_, this))
+ , thread_(threads::CreateThread("WS Async Send", thread_delegate_)) {
+ thread_->start(threads::ThreadOptions());
+}
+
+WebsocketSession::~WebsocketSession() {}
+
+void WebsocketSession::Accept() {
+ ws_.async_accept(boost::asio::bind_executor(
+ strand_,
+ std::bind(
+ &WebsocketSession::Recv, shared_from_this(), std::placeholders::_1)));
+}
+
+void WebsocketSession::Shutdown() {
+ shutdown_ = true;
+ thread_delegate_->SetShutdown();
+ thread_->join();
+ delete thread_delegate_;
+ threads::DeleteThread(thread_);
+}
+
+bool WebsocketSession::IsShuttingDown() {
+ return shutdown_;
+}
+
+void WebsocketSession::Recv(boost::system::error_code ec) {
+ if (shutdown_) {
+ return;
+ }
+
+ if (ec) {
+ std::string str_err = "ErrorMessage: " + ec.message();
+ LOG4CXX_ERROR(ws_logger_, str_err);
+ shutdown_ = true;
+ thread_delegate_->SetShutdown();
+ controller_->deleteController(this);
+ return;
+ }
+
+ ws_.async_read(buffer_,
+ boost::asio::bind_executor(strand_,
+ std::bind(&WebsocketSession::Read,
+ shared_from_this(),
+ std::placeholders::_1,
+ std::placeholders::_2)));
+}
+
+void WebsocketSession::Send(std::string& message, Json::Value& json_message) {
+ if (shutdown_) {
+ return;
+ }
+ std::shared_ptr<std::string> message_ptr =
+ std::make_shared<std::string>(message);
+ message_queue_.push(message_ptr);
+}
+
+void WebsocketSession::sendJsonMessage(Json::Value& message) {
+ std::string str_msg = m_writer.write(message);
+ sync_primitives::AutoLock auto_lock(queue_lock_);
+ if (!isNotification(message) && !isResponse(message)) {
+ mWaitResponseQueue.insert(std::map<std::string, std::string>::value_type(
+ message["id"].asString(), message["method"].asString()));
+ }
+
+ Send(str_msg, message);
+}
+
+void WebsocketSession::Read(boost::system::error_code ec,
+ std::size_t bytes_transferred) {
+ boost::ignore_unused(bytes_transferred);
+ if (ec) {
+ std::string str_err = "ErrorMessage: " + ec.message();
+ LOG4CXX_ERROR(ws_logger_, str_err);
+ shutdown_ = true;
+ thread_delegate_->SetShutdown();
+ controller_->deleteController(this);
+ buffer_.consume(buffer_.size());
+ return;
+ }
+
+ std::string data = boost::beast::buffers_to_string(buffer_.data());
+ m_receivingBuffer += data;
+
+ Json::Value root;
+ if (!m_reader.parse(m_receivingBuffer, root)) {
+ std::string str_err = "Invalid JSON Message: " + data;
+ LOG4CXX_ERROR(ws_logger_, str_err);
+ return;
+ }
+
+ std::string wmes = m_receiverWriter.write(root);
+ ssize_t beginpos = m_receivingBuffer.find(wmes);
+ if (-1 != beginpos) {
+ m_receivingBuffer.erase(0, beginpos + wmes.length());
+ } else {
+ m_receivingBuffer.clear();
+ }
+ onMessageReceived(root);
+
+ buffer_.consume(buffer_.size());
+
+ Recv(ec);
+}
+
+std::string WebsocketSession::GetComponentName(std::string& method) {
+ std::string return_string = "";
+ if (method != "") {
+ int position = method.find(".");
+ if (position != -1) {
+ return_string = method.substr(0, position);
+ }
+ }
+ return return_string;
+}
+
+void WebsocketSession::onMessageReceived(Json::Value message) {
+ // Determine message type and process...
+ Json::Value error;
+ if (checkMessage(message, error)) {
+ if (isNotification(message)) {
+ controller_->processNotification(message);
+ } else if (isResponse(message)) {
+ std::string id = message["id"].asString();
+ std::string method = findMethodById(id);
+ if ("" != method) {
+ if ("MB.registerComponent" == method) {
+ if (message.isMember("result") && message["result"].isInt()) {
+ mControllersIdStart = message["result"].asInt();
+ } else {
+ LOG4CXX_ERROR(ws_logger_,
+ "Not possible to initialize mControllersIdStart!");
+ }
+ } else if ("MB.subscribeTo" == method ||
+ "MB.unregisterComponent" == method ||
+ "MB.unsubscribeFrom" == method) {
+ // nothing to do for now
+ } else {
+ controller_->processResponse(method, message);
+ }
} else {
- DBG_MSG_ERROR(("Message contains wrong data!\n"));
- sendJsonMessage(error);
- }
- }
-
- bool WebsocketSession::isNotification(Json::Value& root) {
- DBG_MSG(("CMessageBrokerController::isNotification()\n"));
- bool ret = false;
- if (false == root.isMember("id"))
- {
- ret = true;
+ LOG4CXX_ERROR(ws_logger_,
+ "Request with id: " + id + " has not been found!");
}
- DBG_MSG(("Result: %d\n", ret));
- return ret;
- }
-
- bool WebsocketSession::isResponse(Json::Value& root) {
- DBG_MSG(("CMessageBrokerController::isResponse()\n"));
- bool ret = false;
- if ((true == root.isMember("result")) || (true == root.isMember("error")))
- {
- ret = true;
- }
- DBG_MSG(("Result: %d\n", ret));
- return ret;
- }
-
- std::string WebsocketSession::findMethodById(std::string id) {
- DBG_MSG(("CMessageBrokerController::findMethodById()\n"));
- sync_primitives::AutoLock auto_lock(queue_lock_);
- std::string res = "";
- std::map <std::string, std::string>::iterator it;
- it = mWaitResponseQueue.find(id);
- if (it != mWaitResponseQueue.end())
- {
- res = (*it).second;
- mWaitResponseQueue.erase(it);
- }
- return res;
- }
-
- bool WebsocketSession::checkMessage(Json::Value& root, Json::Value& error)
- {
- DBG_MSG(("CMessageBrokerController::checkMessage()\n"));
- Json::Value err;
-
- try
- {
- /* check the JSON-RPC version => 2.0 */
- if (!root.isObject() || !root.isMember("jsonrpc") || root["jsonrpc"] != "2.0")
- {
- error["id"] = Json::Value::null;
- error["jsonrpc"] = "2.0";
- err["code"] = NsMessageBroker::INVALID_REQUEST;
- err["message"] = "Invalid MessageBroker request.";
- error["error"] = err;
- return false;
- }
-
- if (root.isMember("id") && (root["id"].isArray() || root["id"].isObject()))
- {
- error["id"] = Json::Value::null;
- error["jsonrpc"] = "2.0";
- err["code"] = NsMessageBroker::INVALID_REQUEST;
- err["message"] = "Invalid MessageBroker request.";
- error["error"] = err;
- return false;
- }
-
- if (root.isMember("result") && root.isMember("error"))
- {
- /* message can't contain simultaneously result and error*/
- return false;
- }
-
- if (root.isMember("method"))
- {
- if (!root["method"].isString())
- {
- error["id"] = Json::Value::null;
- error["jsonrpc"] = "2.0";
- err["code"] = NsMessageBroker::INVALID_REQUEST;
- err["message"] = "Invalid MessageBroker request.";
- error["error"] = err;
- return false;
- }
- /* Check the params is an object*/
- if (root.isMember("params") && !root["params"].isObject())
- {
- error["id"] = Json::Value::null;
- error["jsonrpc"] = "2.0";
- err["code"] = INVALID_REQUEST;
- err["message"] = "Invalid JSONRPC params.";
- error["error"] = err;
- return false;
- }
- } else if (!root.isMember("result") && !root.isMember("error"))
- {
- return false;
- }
- return true;
- } catch (...)
- {
- DBG_MSG_ERROR(("CMessageBrokerController::checkMessage() EXCEPTION has been caught!\n"));
- return false;
- }
- }
-
- WebsocketSession::LoopThreadDelegate::LoopThreadDelegate(
- MessageQueue<Message, AsyncQueue>* message_queue,
- WebsocketSession* handler)
- : message_queue_(*message_queue),
- handler_(*handler),
- shutdown_(false) {
- }
-
- void WebsocketSession::LoopThreadDelegate::threadMain() {
- while(!message_queue_.IsShuttingDown() && !shutdown_) {
- DrainQueue();
- message_queue_.wait();
- }
- DrainQueue();
- }
+ } else {
+ std::string method = message["method"].asString();
+ std::string component_name = GetComponentName(method);
- void WebsocketSession::LoopThreadDelegate::exitThreadMain() {
- shutdown_= true;
- if(!message_queue_.IsShuttingDown()) {
- message_queue_.Shutdown();
+ if (component_name == "MB") {
+ controller_->processInternalRequest(message, this);
+ } else {
+ controller_->pushRequest(message, this);
+ controller_->processRequest(message);
}
- }
-
- void WebsocketSession::LoopThreadDelegate::DrainQueue() {
- while(!message_queue_.empty()) {
- Message message_ptr;
- message_queue_.pop(message_ptr);
- if(!shutdown_) {
- handler_.ws_.write(boost::asio::buffer(*message_ptr)) ;
- };
+ }
+ } else {
+ LOG4CXX_ERROR(ws_logger_, "Message contains wrong data!\n");
+ sendJsonMessage(error);
+ }
+}
+
+bool WebsocketSession::isNotification(Json::Value& root) {
+ bool ret = false;
+ if (false == root.isMember("id")) {
+ ret = true;
+ }
+ return ret;
+}
+
+bool WebsocketSession::isResponse(Json::Value& root) {
+ bool ret = false;
+ if ((true == root.isMember("result")) || (true == root.isMember("error"))) {
+ ret = true;
+ }
+ return ret;
+}
+
+std::string WebsocketSession::findMethodById(std::string id) {
+ sync_primitives::AutoLock auto_lock(queue_lock_);
+ std::string res = "";
+ std::map<std::string, std::string>::iterator it;
+ it = mWaitResponseQueue.find(id);
+ if (it != mWaitResponseQueue.end()) {
+ res = (*it).second;
+ mWaitResponseQueue.erase(it);
+ }
+ return res;
+}
+
+bool WebsocketSession::checkMessage(Json::Value& root, Json::Value& error) {
+ Json::Value err;
+ try {
+ /* check the JSON-RPC version => 2.0 */
+ if (!root.isObject() || !root.isMember("jsonrpc") ||
+ root["jsonrpc"] != "2.0") {
+ error["id"] = Json::Value::null;
+ error["jsonrpc"] = "2.0";
+ err["code"] = hmi_message_handler::INVALID_REQUEST;
+ err["message"] = "Invalid MessageBroker request.";
+ error["error"] = err;
+ return false;
+ }
+
+ if (root.isMember("id") &&
+ (root["id"].isArray() || root["id"].isObject())) {
+ error["id"] = Json::Value::null;
+ error["jsonrpc"] = "2.0";
+ err["code"] = hmi_message_handler::INVALID_REQUEST;
+ err["message"] = "Invalid MessageBroker request.";
+ error["error"] = err;
+ return false;
+ }
+
+ if (root.isMember("result") && root.isMember("error")) {
+ /* message can't contain simultaneously result and error*/
+ return false;
+ }
+
+ if (root.isMember("method")) {
+ if (!root["method"].isString()) {
+ error["id"] = Json::Value::null;
+ error["jsonrpc"] = "2.0";
+ err["code"] = hmi_message_handler::INVALID_REQUEST;
+ err["message"] = "Invalid MessageBroker request.";
+ error["error"] = err;
+ return false;
}
- }
-
- void WebsocketSession::LoopThreadDelegate::SetShutdown(){
- shutdown_ = true;
- if(!message_queue_.IsShuttingDown()) {
- message_queue_.Shutdown();
+ /* Check the params is an object*/
+ if (root.isMember("params") && !root["params"].isObject()) {
+ error["id"] = Json::Value::null;
+ error["jsonrpc"] = "2.0";
+ err["code"] = INVALID_REQUEST;
+ err["message"] = "Invalid JSONRPC params.";
+ error["error"] = err;
+ return false;
}
- }
+ } else if (!root.isMember("result") && !root.isMember("error")) {
+ return false;
+ }
+ return true;
+ } catch (...) {
+ LOG4CXX_ERROR(
+ ws_logger_,
+ "CMessageBrokerController::checkMessage() EXCEPTION has been caught!");
+ return false;
+ }
+}
+
+WebsocketSession::LoopThreadDelegate::LoopThreadDelegate(
+ MessageQueue<Message, AsyncQueue>* message_queue, WebsocketSession* handler)
+ : message_queue_(*message_queue), handler_(*handler), shutdown_(false) {}
+
+void WebsocketSession::LoopThreadDelegate::threadMain() {
+ while (!message_queue_.IsShuttingDown() && !shutdown_) {
+ DrainQueue();
+ message_queue_.wait();
+ }
+ DrainQueue();
+}
+
+void WebsocketSession::LoopThreadDelegate::exitThreadMain() {
+ shutdown_ = true;
+ if (!message_queue_.IsShuttingDown()) {
+ message_queue_.Shutdown();
+ }
+}
+
+void WebsocketSession::LoopThreadDelegate::DrainQueue() {
+ while (!message_queue_.empty()) {
+ Message message_ptr;
+ message_queue_.pop(message_ptr);
+ if (!shutdown_) {
+ handler_.ws_.write(boost::asio::buffer(*message_ptr));
+ };
+ }
+}
+
+void WebsocketSession::LoopThreadDelegate::SetShutdown() {
+ shutdown_ = true;
+ if (!message_queue_.IsShuttingDown()) {
+ message_queue_.Shutdown();
+ }
+}
} \ No newline at end of file
diff --git a/src/components/hmi_message_handler/test/CMakeLists.txt b/src/components/hmi_message_handler/test/CMakeLists.txt
index 9d72f3d15e..6d4041e0bf 100644
--- a/src/components/hmi_message_handler/test/CMakeLists.txt
+++ b/src/components/hmi_message_handler/test/CMakeLists.txt
@@ -37,10 +37,6 @@ include_directories (
${COMPONENTS_DIR}/hmi_message_handler/test/include
)
-if (HMIADAPTER STREQUAL "messagebroker")
- add_dependencies("hmi_message_handler_test" Boost)
-endif()
-
set(EXCLUDE_PATHS)
set(LIBRARIES
@@ -61,4 +57,9 @@ endif()
collect_sources(SOURCES "${CMAKE_CURRENT_SOURCE_DIR}" "${EXCLUDE_PATHS}")
+if (HMIADAPTER STREQUAL "messagebroker")
+ GET_PROPERTY(BOOST_LIBS_DIRECTORY GLOBAL PROPERTY GLOBAL_BOOST_LIBS)
+ list(APPEND LIBRARIES ${BOOST_LIBS_DIRECTORY}/libboost_system.so)
+endif()
+
create_test(hmi_message_handler_test "${SOURCES}" "${LIBRARIES}")
diff --git a/src/components/hmi_message_handler/test/hmi_message_handler_impl_test.cc b/src/components/hmi_message_handler/test/hmi_message_handler_impl_test.cc
index bf44c3e2d1..fd459ea094 100644
--- a/src/components/hmi_message_handler/test/hmi_message_handler_impl_test.cc
+++ b/src/components/hmi_message_handler/test/hmi_message_handler_impl_test.cc
@@ -52,12 +52,12 @@ class HMIMessageHandlerImplTest : public ::testing::Test {
: mb_adapter_(NULL)
, hmi_handler_(NULL)
, mock_hmi_message_observer_(NULL) {}
- boost::asio::io_context ioc_;
+
protected:
hmi_message_handler::MessageBrokerAdapter* mb_adapter_;
hmi_message_handler::HMIMessageHandlerImpl* hmi_handler_;
MockHMIMessageObserver* mock_hmi_message_observer_;
-
+
testing::NiceMock<MockHMIMessageHandlerSettings>
mock_hmi_message_handler_settings;
const uint64_t stack_size = 1000u;
@@ -69,7 +69,7 @@ class HMIMessageHandlerImplTest : public ::testing::Test {
mock_hmi_message_handler_settings);
ASSERT_TRUE(NULL != hmi_handler_);
mb_adapter_ = new hmi_message_handler::MessageBrokerAdapter(
- hmi_handler_, "127.0.0.1", 8087, ioc_);
+ hmi_handler_, "127.0.0.1", 8087);
ASSERT_TRUE(NULL != mb_adapter_);
mock_hmi_message_observer_ = new MockHMIMessageObserver();
ASSERT_TRUE(NULL != mock_hmi_message_observer_);
@@ -125,7 +125,7 @@ TEST_F(HMIMessageHandlerImplTest,
AddHMIMessageAdapter_AddExistedAdapter_ExpectAdded) {
// Check before action
EXPECT_TRUE(hmi_handler_->message_adapters().empty());
- // Act
+ // Act
hmi_handler_->AddHMIMessageAdapter(mb_adapter_);
// Check after action
EXPECT_EQ(1u, hmi_handler_->message_adapters().size());