summaryrefslogtreecommitdiff
path: root/src/components
diff options
context:
space:
mode:
authorJackLivio <jack@livio.io>2018-02-02 12:00:25 -0500
committerJackLivio <jack@livio.io>2018-02-02 12:00:25 -0500
commitb12728724da99ff832ee57af5c105910732e8760 (patch)
tree68f8d4d4328dbd288649e3ba5a543544abda2de5 /src/components
parentdfe8b5ef72fc97c8c60b85006704e4d42d97d34e (diff)
downloadsdl_core-b12728724da99ff832ee57af5c105910732e8760.tar.gz
Add Boost Websocket HMI Adapter
Repleaces old message broker with new boost::beast websocket library. No changes required for setup or connecting with HMI.
Diffstat (limited to 'src/components')
-rw-r--r--src/components/hmi_message_handler/CMakeLists.txt12
-rw-r--r--src/components/hmi_message_handler/include/hmi_message_handler/mb_controller.h162
-rw-r--r--src/components/hmi_message_handler/include/hmi_message_handler/messagebroker_adapter.h5
-rw-r--r--src/components/hmi_message_handler/include/hmi_message_handler/websocket_session.h184
-rw-r--r--src/components/hmi_message_handler/src/mb_controller.cc472
-rw-r--r--src/components/hmi_message_handler/src/messagebroker_adapter.cc5
-rw-r--r--src/components/hmi_message_handler/src/websocket_session.cc334
-rw-r--r--src/components/hmi_message_handler/test/CMakeLists.txt5
-rw-r--r--src/components/hmi_message_handler/test/hmi_message_handler_impl_test.cc7
9 files changed, 1175 insertions, 11 deletions
diff --git a/src/components/hmi_message_handler/CMakeLists.txt b/src/components/hmi_message_handler/CMakeLists.txt
index c1dfca5e67..4d799b92dd 100644
--- a/src/components/hmi_message_handler/CMakeLists.txt
+++ b/src/components/hmi_message_handler/CMakeLists.txt
@@ -46,13 +46,19 @@ include_directories (
${COMPONENTS_DIR}/dbus/include/
${CMAKE_SOURCE_DIR}/
${LOG4CXX_INCLUDE_DIRECTORY}
+ ${COMPONENTS_DIR}/hmi_message_handler/include
+ ${COMPONENTS_DIR}/hmi_message_handler/src
+ ${BOOST_INCLUDE_DIR}
)
set(PATHS
${CMAKE_CURRENT_SOURCE_DIR}/include
${CMAKE_CURRENT_SOURCE_DIR}/src
+
)
+message(${PATHS})
+
if (HMIADAPTER STREQUAL "dbus")
set(EXCLUDE_PATHS)
set(DBUS_ADAPTER DBus)
@@ -73,7 +79,13 @@ set(LIBRARIES
${RTLIB}
)
+
add_library("HMIMessageHandler" ${SOURCES})
+
+if (HMIADAPTER STREQUAL "messagebroker")
+ add_dependencies("HMIMessageHandler" Boost)
+endif()
+
target_link_libraries("HMIMessageHandler" ${LIBRARIES})
if(ENABLE_LOG)
diff --git a/src/components/hmi_message_handler/include/hmi_message_handler/mb_controller.h b/src/components/hmi_message_handler/include/hmi_message_handler/mb_controller.h
new file mode 100644
index 0000000000..a8ec3f88bb
--- /dev/null
+++ b/src/components/hmi_message_handler/include/hmi_message_handler/mb_controller.h
@@ -0,0 +1,162 @@
+#ifndef MB_CONTROLLER_H
+#define MB_CONTROLLER_H
+
+#include <iostream>
+#include <boost/beast/core.hpp>
+#include <boost/beast/websocket.hpp>
+#include <boost/asio/bind_executor.hpp>
+#include <boost/asio/strand.hpp>
+#include <boost/asio/placeholders.hpp>
+#include <boost/asio/ip/tcp.hpp>
+#include <boost/make_shared.hpp>
+#include <boost/thread/thread.hpp>
+#include <algorithm>
+#include <cstdlib>
+#include <functional>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+#include <map>
+#include "json/json.h"
+
+#include "utils/lock.h"
+#include "utils/atomic_object.h"
+
+using namespace boost::beast::websocket;
+
+#ifdef DEBUG_ON
+/**
+* \def DBG_MSG
+* \brief Debug message output with file name and line number.
+* \param x formatted debug message.
+* \return printf construction.
+*/
+#define DBG_MSG(x) printf("%s:%d ", __FILE__, __LINE__);\
+ printf x
+#else
+#define DBG_MSG(x)
+#endif
+
+#define DBG_MSG_ERROR(x) printf("ERROR!!! %s:%d ", __FILE__, __LINE__);\
+ printf x
+
+
+namespace NsMessageBroker {
+
+ enum ErrorCode {
+ CONTROLLER_EXISTS = -32000,
+ SUBSCRIBTION_EXISTS = -32001,
+ INVALID_REQUEST = -32600
+ };
+
+ class WebsocketSession;
+
+ class CMessageBrokerController : public std::enable_shared_from_this<CMessageBrokerController> {
+ public:
+ CMessageBrokerController(const std::string& address, uint16_t port, std::string name, int num_ports, boost::asio::io_context& ioc);
+
+ ~CMessageBrokerController();
+
+ bool StartListener();
+
+ bool Run();
+
+ void WaitForConnection();
+
+ void StartSession(boost::system::error_code ec);
+
+ void OnAccept(boost::system::error_code ec, boost::asio::strand<boost::asio::io_context::executor_type>& strand, stream<boost::asio::ip::tcp::socket>& ws);
+
+ void OnRead(boost::system::error_code ec, std::size_t bytes_transferred);
+
+ bool isNotification(Json::Value& message);
+
+ void sendNotification(Json::Value& message);
+
+ bool isResponse(Json::Value& message);
+
+ void sendResponse(Json::Value& message);
+
+ void sendJsonMessage(Json::Value& message);
+
+ void subscribeTo(std::string property);
+
+ void registerController(int id = 0);
+
+ void unregisterController();
+
+ void* MethodForReceiverThread(void * arg);
+
+ bool Connect();
+
+ void exitReceivingThread();
+
+ virtual void processResponse(std::string method, Json::Value& root) = 0;
+
+ virtual void processRequest(Json::Value& root) = 0;
+
+ virtual void processNotification(Json::Value& root) = 0;
+
+ std::string getMethodName(std::string& method);
+
+ std::string GetComponentName(std::string& method);
+
+ void processInternalRequest(Json::Value& message, WebsocketSession* ws_session );
+
+ void pushRequest(Json::Value& message, WebsocketSession* ws_session);
+
+ //Registry
+ bool addController(WebsocketSession* ws_session, std::string name);
+
+ void deleteController(WebsocketSession* ws_session);
+
+ void deleteController(std::string name);
+
+ void removeSubscribersBySession(const WebsocketSession* ws);
+
+ bool addSubscriber(WebsocketSession* ws_session, std::string name);
+
+ void deleteSubscriber(WebsocketSession* ws, std::string name);
+
+ int getSubscribersFd(std::string name, std::vector<WebsocketSession*>& result);
+
+ int getNextControllerIdDiapason();
+
+ private:
+ boost::asio::io_context ioc_;
+ const std::string& address_;
+ uint16_t port_;
+ std::string name_;
+ int num_threads_;
+ std::vector<std::thread> thread_vector_;
+
+ boost::asio::ip::tcp::acceptor acceptor_;
+ boost::asio::ip::tcp::socket socket_;
+ boost::beast::multi_buffer buffer_;
+ boost::asio::ip::tcp::endpoint endpoint_;
+
+ int mControllersIdCounter;
+
+ //Registry
+ std::map <std::string, WebsocketSession*> mControllersList;
+ sync_primitives::Lock mControllersListLock;
+
+ std::multimap <std::string, WebsocketSession*> mSubscribersList;
+ sync_primitives::Lock mSubscribersListLock;
+
+ std::map <std::string, WebsocketSession*> mRequestList;
+ sync_primitives::Lock mRequestListLock;
+
+ std::atomic_bool shutdown_;
+
+
+ };
+
+} //NsMessageBroker
+
+
+
+
+#endif /* MB_CONTROLLER_H */ \ No newline at end of file
diff --git a/src/components/hmi_message_handler/include/hmi_message_handler/messagebroker_adapter.h b/src/components/hmi_message_handler/include/hmi_message_handler/messagebroker_adapter.h
index f582cb2b81..81ab823e1c 100644
--- a/src/components/hmi_message_handler/include/hmi_message_handler/messagebroker_adapter.h
+++ b/src/components/hmi_message_handler/include/hmi_message_handler/messagebroker_adapter.h
@@ -35,7 +35,7 @@
#include <string>
-#include "mb_controller.hpp"
+#include "hmi_message_handler/mb_controller.h"
#include "hmi_message_handler/hmi_message_adapter_impl.h"
#include "utils/threads/thread_validator.h"
@@ -47,7 +47,8 @@ class MessageBrokerAdapter : public HMIMessageAdapterImpl,
public:
MessageBrokerAdapter(HMIMessageHandler* handler_param,
const std::string& server_address,
- uint16_t port);
+ uint16_t port,
+ boost::asio::io_context& ioc);
~MessageBrokerAdapter();
void SendMessageToHMI(MessageSharedPointer message);
diff --git a/src/components/hmi_message_handler/include/hmi_message_handler/websocket_session.h b/src/components/hmi_message_handler/include/hmi_message_handler/websocket_session.h
new file mode 100644
index 0000000000..d8a79e7ef7
--- /dev/null
+++ b/src/components/hmi_message_handler/include/hmi_message_handler/websocket_session.h
@@ -0,0 +1,184 @@
+#ifndef WEBSOCKET_SESSION_H
+#define WEBSOCKET_SESSION_H
+
+#include <iostream>
+#include <boost/beast/core.hpp>
+#include <boost/beast/websocket.hpp>
+#include <boost/asio/bind_executor.hpp>
+#include <boost/asio/strand.hpp>
+#include <boost/asio/placeholders.hpp>
+#include <boost/asio/ip/tcp.hpp>
+#include <boost/make_shared.hpp>
+#include <boost/thread/thread.hpp>
+#include <algorithm>
+#include <cstdlib>
+#include <functional>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+#include <mutex>
+#include <queue>
+#include "json/json.h"
+
+#include "utils/lock.h"
+#include "utils/atomic_object.h"
+#include "utils/threads/thread.h"
+#include "utils/threads/message_loop_thread.h"
+#include "utils/message_queue.h"
+
+//#include "hmi_message_handler/mb_controller.h"
+
+using namespace boost::beast::websocket;
+using ::utils::MessageQueue;
+
+#ifdef DEBUG_ON
+/**
+* \def DBG_MSG
+* \brief Debug message output with file name and line number.
+* \param x formatted debug message.
+* \return printf construction.
+*/
+#define DBG_MSG(x) printf("%s:%d ", __FILE__, __LINE__);\
+ printf x
+#else
+#define DBG_MSG(x)
+#endif
+
+#define DBG_MSG_ERROR(x) printf("ERROR!!! %s:%d ", __FILE__, __LINE__);\
+ printf x
+
+typedef std::queue<std::shared_ptr<std::string>> AsyncQueue;
+typedef std::shared_ptr<std::string> Message;
+
+
+namespace NsMessageBroker {
+
+ class CMessageBrokerController;
+
+ class WebsocketSession :
+ public std::enable_shared_from_this<WebsocketSession> {
+
+ boost::beast::websocket::stream<boost::asio::ip::tcp::socket> ws_;
+ boost::asio::strand<boost::asio::io_context::executor_type> strand_;
+ boost::beast::multi_buffer buffer_;
+ boost::beast::multi_buffer send_buffer_;
+ CMessageBrokerController* controller_;
+
+ public:
+ WebsocketSession(boost::asio::ip::tcp::socket socket, CMessageBrokerController* controller);
+
+ ~WebsocketSession();
+
+ void Accept();
+
+ void Close();
+
+ void Shutdown();
+
+ bool IsShuttingDown();
+
+ void Recv(boost::system::error_code ec);
+
+ void Send(std::string& message, Json::Value& json_message);
+
+ void SendFromQueue();
+
+ void sendJsonMessage(Json::Value& message);
+
+ void OnWrite(boost::system::error_code ec, std::size_t bytes_transferred, std::shared_ptr<std::string> message);
+
+ void Read(boost::system::error_code ec, std::size_t bytes_transferred);
+
+ int getNextMessageId();
+
+ void prepareMessage(Json::Value& root);
+
+ void prepareErrorMessage(int errCode, std::string errMessage, Json::Value& error);
+
+ std::string getDestinationComponentName(Json::Value& root);
+
+ bool isNotification(Json::Value& root);
+
+ bool isResponse(Json::Value& root);
+
+ std::string findMethodById(std::string id);
+
+ void registerController(int id = 0);
+
+ void unregisterController();
+
+ void subscribeTo(std::string property);
+
+ void unsubscribeFrom(std::string property);
+
+ bool checkMessage(Json::Value& root, Json::Value& error);
+
+ std::string getControllersName();
+
+ std::string GetComponentName(std::string& method);
+
+ protected:
+
+ sync_primitives::atomic_bool stop;
+
+ private:
+ void onMessageReceived(Json::Value message);
+
+ std::map<std::string, std::string> mWaitResponseQueue;
+
+ std::string m_receivingBuffer;
+
+ int mControllersIdStart;
+
+ int mControllersIdCurrent;
+
+ Json::Reader m_reader;
+ Json::FastWriter m_writer;
+ Json::FastWriter m_receiverWriter;
+
+
+ sync_primitives::Lock queue_lock_;
+ sync_primitives::Lock message_queue_lock_;
+ std::atomic_bool shutdown_;
+
+
+ MessageQueue<Message, AsyncQueue> message_queue_;
+
+ class LoopThreadDelegate : public threads::ThreadDelegate {
+ public:
+ LoopThreadDelegate(MessageQueue<Message, AsyncQueue>* message_queue,
+ WebsocketSession* handler);
+
+ virtual void threadMain() OVERRIDE;
+ virtual void exitThreadMain() OVERRIDE;
+
+ void OnWrite();
+
+ void SetShutdown();
+
+ private:
+ void DrainQueue();
+ MessageQueue<Message, AsyncQueue>& message_queue_;
+ WebsocketSession& handler_;
+ sync_primitives::Lock queue_lock_;
+ sync_primitives::ConditionalVariable queue_new_items_;
+ std::atomic_bool write_pending_;
+ std::atomic_bool shutdown_;
+
+ sync_primitives::Lock write_lock_;
+
+ };
+
+ LoopThreadDelegate* thread_delegate_;
+ threads::Thread* thread_;
+
+ };
+
+} //NsMessageBroker
+
+
+
+
+#endif /* WEBSOCKET_SESSION_H */ \ No newline at end of file
diff --git a/src/components/hmi_message_handler/src/mb_controller.cc b/src/components/hmi_message_handler/src/mb_controller.cc
new file mode 100644
index 0000000000..d7fb432df2
--- /dev/null
+++ b/src/components/hmi_message_handler/src/mb_controller.cc
@@ -0,0 +1,472 @@
+
+#include "hmi_message_handler/mb_controller.h"
+#include "websocket_session.cc"
+
+using namespace boost::beast::websocket;
+namespace NsMessageBroker
+{
+ CMessageBrokerController::CMessageBrokerController(
+ const std::string& address, uint16_t port, std::string name,
+ int num_threads, boost::asio::io_context& ioc):address_(address), acceptor_(ioc_), socket_(ioc_), mControllersIdCounter(1)
+ {
+
+ port_ = port;
+ name_ = name;
+ num_threads_ = num_threads;
+ endpoint_ = {boost::asio::ip::make_address(address), static_cast<unsigned short>(port)};
+ shutdown_ = false;
+
+ }
+
+ CMessageBrokerController::~CMessageBrokerController()
+ {
+ boost::system::error_code ec;
+ socket_.close();
+ acceptor_.close(ec);
+ if(ec) {
+ std::cerr << "ErrorMessage Close: " << ": " << ec.message() << "\n";
+ }
+ shutdown_ = true;
+ ioc_.stop();
+ }
+
+ bool CMessageBrokerController::StartListener() {
+ boost::system::error_code error;
+ acceptor_.open(endpoint_.protocol(), error);
+ if(error) {
+ std::cerr << "ErrorOpen: " << ": " << error.message() << "\n";
+ return false;
+ }
+
+ acceptor_.set_option(boost::asio::socket_base::reuse_address(true), error);
+ if(error) {
+ std::cerr << "ErrorSetOption: " << ": " << error.message() << "\n";
+ return false;
+ }
+ acceptor_.bind(endpoint_, error);
+ if(error) {
+ std::cerr << "ErrorBind: " << ": " << error.message() << "\n";
+ return false;
+ }
+ acceptor_.listen(boost::asio::socket_base::max_listen_connections, error);
+ if(error) {
+ std::cerr << "ErrorListen: " << ": " << error.message() << "\n";
+ return false;
+ }
+ return true;
+ }
+
+ bool CMessageBrokerController::Run(){
+ if(acceptor_.is_open() && !shutdown_) {
+ acceptor_.async_accept(
+ socket_,
+ std::bind(
+ &CMessageBrokerController::StartSession,
+ this,
+ std::placeholders::_1
+ ));
+ ioc_.run();
+ return true;
+
+ }
+ return false;
+ }
+
+ void CMessageBrokerController::WaitForConnection() {
+ if(acceptor_.is_open() && !shutdown_) {
+ acceptor_.async_accept(
+ socket_,
+ std::bind(
+ &CMessageBrokerController::StartSession,
+ this,
+ std::placeholders::_1
+ )
+ );
+ }
+ }
+
+ void CMessageBrokerController::StartSession(boost::system::error_code ec){
+ if(ec){
+ std::cerr << "ErrorMessage: " << ": " << ec.message() << "\n";
+ ioc_.stop();
+ return;
+ }
+ if(shutdown_) {
+ return;
+ }
+ std::make_shared<WebsocketSession>(std::move(socket_), this)->Accept();
+ WaitForConnection();
+
+ }
+
+
+ bool CMessageBrokerController::isNotification(Json::Value& message) {
+ DBG_MSG(("CMessageBroker::isNotification()\n"));
+ bool ret = false;
+ if (false == message.isMember("id")) {
+ ret = true;
+ }
+ DBG_MSG(("Result: %d\n", ret));
+ return ret;
+ }
+
+ void CMessageBrokerController::sendNotification(Json::Value& message) {
+ DBG_MSG(("CMessageBroker::processNotification()\n"));
+ std::string methodName = message["method"].asString();
+ DBG_MSG(("Property: %s\n", methodName.c_str()));
+ std::vector<WebsocketSession*> result;
+ int subscribersCount = getSubscribersFd(methodName, result);
+ if (0 < subscribersCount) {
+ std::vector<WebsocketSession*>::iterator it;
+ for (it = result.begin(); it != result.end(); it++) {
+ (*it)->sendJsonMessage(message);
+ }
+ } else {
+ DBG_MSG(("No subscribers for this property!\n"));
+ }
+
+ }
+
+ bool CMessageBrokerController::isResponse(Json::Value& message) {
+ DBG_MSG(("CMessageBroker::isResponse()\n"));
+ bool ret = false;
+ if ((true == message.isMember("result")) || (true == message.isMember("error"))) {
+ ret = true;
+ }
+ DBG_MSG(("Result: %d\n", ret));
+ return ret;
+ }
+
+ void CMessageBrokerController::sendResponse(Json::Value& message) {
+ WebsocketSession* ws;
+ std::map <std::string, WebsocketSession*>::iterator it;
+ sync_primitives::AutoLock request_lock(mRequestListLock);
+
+ std::string id = message["id"].asString();
+ it = mRequestList.find(id);
+ if (it != mRequestList.end()) {
+ ws = it->second;
+ ws->sendJsonMessage(message);
+ mRequestList.erase(it);
+ }
+ }
+
+ void CMessageBrokerController::sendJsonMessage(Json::Value& message){
+
+ if(isNotification(message)) {
+ sendNotification(message);
+ return;
+ } else if (isResponse(message)) {
+ sendResponse(message);
+ return;
+ }
+
+ //Send request
+ WebsocketSession* ws;
+ std::map <std::string, WebsocketSession*>::iterator it;
+ std::string method = message["method"].asString();
+ std::string component_name = GetComponentName(method);
+
+ sync_primitives::AutoLock lock(mControllersListLock);
+ it = mControllersList.find(component_name);
+ if (it != mControllersList.end()) {
+ ws = it->second;
+ ws->sendJsonMessage(message);
+ }
+ }
+
+ void CMessageBrokerController::subscribeTo(std::string property){}
+
+ void CMessageBrokerController::registerController(int id){}
+
+ void CMessageBrokerController::unregisterController(){}
+
+ void* CMessageBrokerController::MethodForReceiverThread(void * arg){
+ return NULL;
+ }
+
+ bool CMessageBrokerController::Connect(){
+ return true;
+ }
+
+ void CMessageBrokerController::exitReceivingThread(){
+ sync_primitives::AutoLock lock(mControllersListLock);
+ shutdown_ = true;
+ std::map <std::string, WebsocketSession*>::iterator it;
+ for(it = mControllersList.begin(); it != mControllersList.end(); it++) {
+ if(!it->second->IsShuttingDown()) {
+ it->second->Shutdown();
+ }
+ }
+ boost::system::error_code ec;
+ socket_.close();
+ acceptor_.cancel(ec);
+ if(ec) {
+ std::cerr << "ErrorMessage Cancel: " << ": " << ec.message() << "\n";
+ }
+ acceptor_.close(ec);
+ if(ec) {
+ std::cerr << "ErrorMessage Close: " << ": " << ec.message() << "\n";
+ }
+ ioc_.stop();
+ }
+
+ std::string CMessageBrokerController::getMethodName(std::string& method) {
+ std::string return_string="";
+ if(method != "") {
+ int position = method.find(".");
+ if(position != -1) {
+ return_string = method.substr(position + 1);
+ }
+ }
+ return return_string;
+ }
+
+ std::string CMessageBrokerController::GetComponentName(std::string& method) {
+ std::string return_string="";
+ if(method != "") {
+ int position = method.find(".");
+ if(position != -1) {
+ return_string = method.substr(0, position);
+ }
+ }
+ return return_string;
+ }
+
+ bool CMessageBrokerController::addController(WebsocketSession* ws_session, std::string name){
+ bool result = false;
+ std::map <std::string, WebsocketSession*>::iterator it;
+
+ sync_primitives::AutoLock lock(mControllersListLock);
+ it = mControllersList.find(name);
+ if (it == mControllersList.end()) {
+ mControllersList.insert(std::map <std::string, WebsocketSession*>::value_type(name, ws_session));
+ result = true;
+ } else {
+ DBG_MSG(("Controller already exists!\n"));
+ }
+
+ DBG_MSG(("Count of controllers: %zu\n", mControllersList.size()));
+ return result;
+ }
+
+ void CMessageBrokerController::deleteController(WebsocketSession* ws_session) {
+ {
+ sync_primitives::AutoLock lock(mControllersListLock);
+ std::map <std::string, WebsocketSession*>::iterator it;
+ for(it = mControllersList.begin(); it != mControllersList.end(); it++) {
+ if(it->second == ws_session) {
+ mControllersList.erase(it);
+ }
+ }
+ }
+ removeSubscribersBySession(ws_session);
+ }
+
+ void CMessageBrokerController::deleteController(std::string name) {
+ std::map <std::string, WebsocketSession*>::iterator it;
+ WebsocketSession* ws;
+ {
+ sync_primitives::AutoLock lock(mControllersListLock);
+ it = mControllersList.find(name);
+ if (it != mControllersList.end())
+ {
+ ws = it->second;
+ mControllersList.erase(it);
+ } else {
+ return;
+ }
+ }
+ removeSubscribersBySession(ws);
+ }
+
+ void CMessageBrokerController::removeSubscribersBySession(const WebsocketSession* ws) {
+ sync_primitives::AutoLock lock(mSubscribersListLock);
+ std::multimap <std::string, WebsocketSession*>::iterator it_s = mSubscribersList.begin();
+ for (; it_s !=mSubscribersList.end(); ) {
+ if (it_s->second == ws) {
+ mSubscribersList.erase(it_s++);
+ } else {
+ ++it_s;
+ }
+ }
+ }
+
+ void CMessageBrokerController::pushRequest(Json::Value& message, WebsocketSession* ws_session) {
+ sync_primitives::AutoLock lock(mRequestListLock);
+ std::string id = message["id"].asString();
+ mRequestList.insert(std::map <std::string, WebsocketSession*>::value_type(id, ws_session));
+ }
+
+ bool CMessageBrokerController::addSubscriber(WebsocketSession* ws_session, std::string name) {
+ bool result = true;
+ sync_primitives::AutoLock lock(mSubscribersListLock);
+ std::pair<std::multimap <std::string, WebsocketSession*>::iterator, std::multimap <std::string, WebsocketSession*>::iterator> p = mSubscribersList.equal_range(name);
+ if (p.first != p.second)
+ {
+ std::multimap <std::string, WebsocketSession*>::iterator itr;
+ for (itr = p.first; itr != p.second; itr++)
+ {
+ if (ws_session == itr->second)
+ {
+ result = false;
+ DBG_MSG(("Subscriber already exists!\n"));
+ }
+ }
+ }
+ if (result)
+ {
+ mSubscribersList.insert(std::map <std::string, WebsocketSession*>::value_type(name, ws_session));
+ }
+ DBG_MSG(("Count of subscribers: %zu\n", mSubscribersList.size()));
+ return result;
+ }
+
+ void CMessageBrokerController::deleteSubscriber(WebsocketSession* ws, std::string name) {
+
+ sync_primitives::AutoLock lock(mSubscribersListLock);
+ std::pair<std::multimap <std::string, WebsocketSession*>::iterator, std::multimap <std::string, WebsocketSession*>::iterator> p = mSubscribersList.equal_range(name);
+ if (p.first != p.second) {
+ std::multimap <std::string, WebsocketSession*>::iterator itr;
+ for (itr = p.first; itr != p.second; ) {
+ if (ws == itr->second) {
+ mSubscribersList.erase(itr++);
+ } else {
+ ++itr;
+ }
+ }
+ }
+
+ DBG_MSG(("Count of subscribers: %zu\n", mSubscribersList.size()));
+ }
+
+ int CMessageBrokerController::getSubscribersFd(std::string name, std::vector<WebsocketSession*>& result) {
+ DBG_MSG(("CMessageBrokerRegistry::getSubscribersFd()\n"));
+ int res = 0;
+ std::map <std::string, WebsocketSession*>::iterator it;
+
+ sync_primitives::AutoLock lock(mSubscribersListLock);
+ std::pair<std::multimap <std::string, WebsocketSession*>::iterator, std::multimap <std::string, WebsocketSession*>::iterator> p = mSubscribersList.equal_range(name);
+ if (p.first != p.second)
+ {
+ std::multimap <std::string, WebsocketSession*>::iterator itr;
+ for (itr = p.first; itr != p.second; itr++)
+ {
+ result.push_back(itr->second);
+ DBG_MSG(("Controllers Fd: %d\n", itr->second));
+ }
+ }
+
+ res = result.size();
+ DBG_MSG(("Result vector size: %d\n", res));
+ return res;
+ }
+
+ void CMessageBrokerController::processInternalRequest(Json::Value& message, WebsocketSession* ws_session){
+ std::string method = message["method"].asString();
+ std::string methodName = getMethodName(method);
+ if (methodName == "registerComponent") {
+ Json::Value params = message["params"];
+ if (params.isMember("componentName") && params["componentName"].isString()) {
+ std::string controllerName = params["componentName"].asString();
+ if (addController(ws_session, controllerName)) {
+ Json::Value response;
+ response["id"] = message["id"];
+ response["jsonrpc"] = "2.0";
+ response["result"] = getNextControllerIdDiapason();
+ ws_session->sendJsonMessage(response);
+ } else {
+ Json::Value error, err;
+ error["id"] = message["id"];
+ error["jsonrpc"] = "2.0";
+ err["code"] = CONTROLLER_EXISTS;
+ err["message"] = "Controller has been already registered.";
+ error["error"] = err;
+ ws_session->sendJsonMessage(error);
+ }
+ } else {
+ Json::Value error, err;
+ error["id"] = message["id"];
+ error["jsonrpc"] = "2.0";
+ err["code"] = INVALID_REQUEST;
+ err["message"] = "Wrong method parameter.";
+ error["error"] = err;
+ ws_session->sendJsonMessage(error);
+ }
+ } else if (methodName == "subscribeTo") {
+ Json::Value params = message["params"];
+ if (params.isMember("propertyName") && params["propertyName"].isString()) {
+ std::string propertyName = params["propertyName"].asString();
+ if (addSubscriber(ws_session, propertyName)) {
+ Json::Value response;
+ response["id"] = message["id"];
+ response["jsonrpc"] = "2.0";
+ response["result"] = "OK";
+ ws_session->sendJsonMessage(response);
+ } else {
+ Json::Value error, err;
+ error["id"] = message["id"];
+ error["jsonrpc"] = "2.0";
+ err["code"] = CONTROLLER_EXISTS;
+ err["message"] = "Subscribe has been already registered.";
+ error["error"] = err;
+ ws_session->sendJsonMessage(error);
+ }
+ } else {
+ Json::Value error, err;
+ error["id"] = message["id"];
+ error["jsonrpc"] = "2.0";
+ err["code"] = INVALID_REQUEST;
+ err["message"] = "Wrong method parameter.";
+ error["error"] = err;
+ ws_session->sendJsonMessage(error);
+ }
+
+ } else if (methodName == "unregisterComponent" ) {
+ Json::Value params = message["params"];
+ if (params.isMember("componentName") && params["componentName"].isString()) {
+ std::string controllerName = params["componentName"].asString();
+ deleteController(controllerName);
+ Json::Value response;
+ response["id"] = message["id"];
+ response["jsonrpc"] = "2.0";
+ response["result"] = "OK";
+ ws_session->sendJsonMessage(response);
+ } else {
+ Json::Value error, err;
+ error["id"] = message["id"];
+ error["jsonrpc"] = "2.0";
+ err["code"] = INVALID_REQUEST;
+ err["message"] = "Wrong method parameter.";
+ error["error"] = err;
+ ws_session->sendJsonMessage(error);
+ }
+ } else if (methodName == "unsubscribeFrom") {
+ Json::Value params = message["params"];
+ if (params.isMember("propertyName") && params["propertyName"].isString()) {
+ std::string propertyName = params["propertyName"].asString();
+ deleteSubscriber(ws_session, propertyName);
+ Json::Value response;
+ response["id"] = message["id"];
+ response["jsonrpc"] = "2.0";
+ response["result"] = "OK";
+ ws_session->sendJsonMessage(response);
+ } else {
+ Json::Value error, err;
+ error["id"] = message["id"];
+ error["jsonrpc"] = "2.0";
+ err["code"] = INVALID_REQUEST;
+ err["message"] = "Wrong method parameter.";
+ error["error"] = err;
+ ws_session->sendJsonMessage(error);
+ }
+ } else {
+
+ }
+ }
+
+ int CMessageBrokerController::getNextControllerIdDiapason() {
+ return 1000 * mControllersIdCounter++;
+ }
+
+}
diff --git a/src/components/hmi_message_handler/src/messagebroker_adapter.cc b/src/components/hmi_message_handler/src/messagebroker_adapter.cc
index ff5a6d687e..337b3c73ca 100644
--- a/src/components/hmi_message_handler/src/messagebroker_adapter.cc
+++ b/src/components/hmi_message_handler/src/messagebroker_adapter.cc
@@ -43,9 +43,9 @@ typedef NsMessageBroker::CMessageBrokerController MessageBrokerController;
MessageBrokerAdapter::MessageBrokerAdapter(HMIMessageHandler* handler_param,
const std::string& server_address,
- uint16_t port)
+ uint16_t port, boost::asio::io_context& ioc)
: HMIMessageAdapterImpl(handler_param)
- , MessageBrokerController(server_address, port, "SDL") {
+ , MessageBrokerController(server_address, port, "SDL", 8, ioc) {
LOG4CXX_TRACE(logger_, "Created MessageBrokerAdapter");
}
@@ -153,6 +153,7 @@ void* MessageBrokerAdapter::SubscribeAndBeginReceiverThread(void* param) {
void MessageBrokerAdapter::ProcessRecievedFromMB(Json::Value& root) {
LOG4CXX_AUTO_TRACE(logger_);
+ LOG4CXX_INFO(logger_, "MB_Adapter: " << root);
if (root.isNull()) {
// LOG
return;
diff --git a/src/components/hmi_message_handler/src/websocket_session.cc b/src/components/hmi_message_handler/src/websocket_session.cc
new file mode 100644
index 0000000000..3484278c1a
--- /dev/null
+++ b/src/components/hmi_message_handler/src/websocket_session.cc
@@ -0,0 +1,334 @@
+
+#include "hmi_message_handler/websocket_session.h"
+#include "hmi_message_handler/mb_controller.h"
+#include <unistd.h>
+using namespace boost::beast::websocket;
+namespace NsMessageBroker
+{
+ WebsocketSession::WebsocketSession(boost::asio::ip::tcp::socket socket, CMessageBrokerController* controller) :
+ ws_(std::move(socket)),
+ strand_(ws_.get_executor()),
+ controller_(controller),
+ stop(false),
+ m_receivingBuffer(""),
+ mControllersIdStart(-1),
+ mControllersIdCurrent(0),
+ shutdown_(false),
+ thread_delegate_(new LoopThreadDelegate(&message_queue_, this)),
+ thread_(threads::CreateThread("WS Async Send", thread_delegate_)){
+ thread_->start(threads::ThreadOptions());
+ }
+
+ WebsocketSession::~WebsocketSession(){}
+
+ void WebsocketSession::Accept() {
+ ws_.async_accept(
+ boost::asio::bind_executor(
+ strand_,
+ std::bind(
+ &WebsocketSession::Recv,
+ shared_from_this(), std::placeholders::_1)));
+ }
+
+ void WebsocketSession::Close() {
+ ws_.async_close({}, [](boost::system::error_code){});
+ }
+
+ void WebsocketSession::Shutdown() {
+ shutdown_ = true;
+ thread_delegate_->SetShutdown();
+ thread_->join();
+ delete thread_delegate_;
+ threads::DeleteThread(thread_);
+ Close();
+
+ }
+
+ bool WebsocketSession::IsShuttingDown() {
+ return shutdown_;
+ }
+
+ void WebsocketSession::Recv(boost::system::error_code ec) {
+ if(shutdown_) {
+ return;
+ }
+
+ if(ec){
+ std::cerr << "ErrorMessage: " << ": " << ec.message() << "\n";
+ shutdown_ = true;
+ thread_delegate_->SetShutdown();
+ controller_->deleteController(this);
+ return;
+
+ }
+
+ ws_.async_read(
+ buffer_,
+ boost::asio::bind_executor(
+ strand_,
+ std::bind(
+ &WebsocketSession::Read,
+ shared_from_this(),
+ std::placeholders::_1,
+ std::placeholders::_2
+ )
+ )
+ );
+ }
+
+ void WebsocketSession::Send(std::string& message, Json::Value& json_message) {
+ if (shutdown_) {
+ return;
+ }
+ std::shared_ptr<std::string> message_ptr = std::make_shared<std::string>( message );
+ message_queue_.push(message_ptr);
+ }
+
+ void WebsocketSession::sendJsonMessage(Json::Value& message) {
+ std::string str_msg = m_writer.write(message);
+ sync_primitives::AutoLock auto_lock(queue_lock_);
+ if (!isNotification(message) && !isResponse(message)) {
+ mWaitResponseQueue.insert(std::map<std::string, std::string>::value_type(message["id"].asString(), message["method"].asString()));
+ }
+
+ Send(str_msg, message);
+ }
+
+ void WebsocketSession::Read(boost::system::error_code ec, std::size_t bytes_transferred) {
+ boost::ignore_unused(bytes_transferred);
+ if(ec){
+ std::cerr << "ErrorMessage: " << ": " << ec.message() << "\n";
+ shutdown_ = true;
+ thread_delegate_->SetShutdown();
+ controller_->deleteController(this);
+ buffer_.consume(buffer_.size());
+ return;
+ }
+
+ std::string data = boost::beast::buffers_to_string(buffer_.data());
+ m_receivingBuffer += data;
+
+ Json::Value root;
+ if (!m_reader.parse(m_receivingBuffer, root))
+ {
+ std::cerr << "Invalid JSON Message" << ": " << data << "\n";
+ return;
+ }
+
+ std::string wmes = m_receiverWriter.write(root);
+ ssize_t beginpos = m_receivingBuffer.find(wmes);
+ if (-1 != beginpos)
+ {
+ m_receivingBuffer.erase(0, beginpos + wmes.length());
+ DBG_MSG(("Buffer after cut is:%s\n", m_receivingBuffer.c_str()));
+ } else
+ {
+ m_receivingBuffer.clear();
+ }
+ onMessageReceived(root);
+
+
+ buffer_.consume(buffer_.size());
+
+ Recv(ec);
+
+ }
+
+ std::string WebsocketSession::GetComponentName(std::string& method) {
+ std::string return_string="";
+ if(method != "") {
+ int position = method.find(".");
+ if(position != -1) {
+ return_string = method.substr(0, position);
+ }
+ }
+ return return_string;
+ }
+
+ void WebsocketSession::onMessageReceived(Json::Value message) {
+ // Determine message type and process...
+ Json::Value error;
+ if (checkMessage(message, error)) {
+ if (isNotification(message)) {
+ DBG_MSG(("Message is notification!\n"));
+ controller_->processNotification(message);
+ } else if (isResponse(message)) {
+ std::string id = message["id"].asString();
+ std::string method = findMethodById(id);
+ DBG_MSG(("Message is response on: %s\n", method.c_str()));
+ if ("" != method) {
+ if ("MB.registerComponent" == method) {
+ if (message.isMember("result") && message["result"].isInt())
+ {
+ mControllersIdStart = message["result"].asInt();
+ } else
+ {
+ DBG_MSG_ERROR(("Not possible to initialize mControllersIdStart!\n"));
+ }
+ } else if ("MB.subscribeTo" == method || "MB.unregisterComponent" == method || "MB.unsubscribeFrom" == method)
+ {
+ //nothing to do for now
+ } else
+ {
+ controller_->processResponse(method, message);
+ }
+ } else {
+ DBG_MSG_ERROR(("Request with id %s has not been found!\n", id.c_str()));
+ }
+ } else {
+ DBG_MSG(("Message is request!\n"));
+ std::string method = message["method"].asString();
+ std::string component_name = GetComponentName(method);
+
+ if(component_name == "MB") {
+ controller_->processInternalRequest(message, this);
+ } else {
+ controller_->pushRequest(message, this);
+ controller_->processRequest(message);
+ }
+ }
+ } else {
+ DBG_MSG_ERROR(("Message contains wrong data!\n"));
+ sendJsonMessage(error);
+ }
+ }
+
+ bool WebsocketSession::isNotification(Json::Value& root) {
+ DBG_MSG(("CMessageBrokerController::isNotification()\n"));
+ bool ret = false;
+ if (false == root.isMember("id"))
+ {
+ ret = true;
+ }
+ DBG_MSG(("Result: %d\n", ret));
+ return ret;
+ }
+
+ bool WebsocketSession::isResponse(Json::Value& root) {
+ DBG_MSG(("CMessageBrokerController::isResponse()\n"));
+ bool ret = false;
+ if ((true == root.isMember("result")) || (true == root.isMember("error")))
+ {
+ ret = true;
+ }
+ DBG_MSG(("Result: %d\n", ret));
+ return ret;
+ }
+
+ std::string WebsocketSession::findMethodById(std::string id) {
+ DBG_MSG(("CMessageBrokerController::findMethodById()\n"));
+ sync_primitives::AutoLock auto_lock(queue_lock_);
+ std::string res = "";
+ std::map <std::string, std::string>::iterator it;
+ it = mWaitResponseQueue.find(id);
+ if (it != mWaitResponseQueue.end())
+ {
+ res = (*it).second;
+ mWaitResponseQueue.erase(it);
+ }
+ return res;
+ }
+
+ bool WebsocketSession::checkMessage(Json::Value& root, Json::Value& error)
+ {
+ DBG_MSG(("CMessageBrokerController::checkMessage()\n"));
+ Json::Value err;
+
+ try
+ {
+ /* check the JSON-RPC version => 2.0 */
+ if (!root.isObject() || !root.isMember("jsonrpc") || root["jsonrpc"] != "2.0")
+ {
+ error["id"] = Json::Value::null;
+ error["jsonrpc"] = "2.0";
+ err["code"] = NsMessageBroker::INVALID_REQUEST;
+ err["message"] = "Invalid MessageBroker request.";
+ error["error"] = err;
+ return false;
+ }
+
+ if (root.isMember("id") && (root["id"].isArray() || root["id"].isObject()))
+ {
+ error["id"] = Json::Value::null;
+ error["jsonrpc"] = "2.0";
+ err["code"] = NsMessageBroker::INVALID_REQUEST;
+ err["message"] = "Invalid MessageBroker request.";
+ error["error"] = err;
+ return false;
+ }
+
+ if (root.isMember("result") && root.isMember("error"))
+ {
+ /* message can't contain simultaneously result and error*/
+ return false;
+ }
+
+ if (root.isMember("method"))
+ {
+ if (!root["method"].isString())
+ {
+ error["id"] = Json::Value::null;
+ error["jsonrpc"] = "2.0";
+ err["code"] = NsMessageBroker::INVALID_REQUEST;
+ err["message"] = "Invalid MessageBroker request.";
+ error["error"] = err;
+ return false;
+ }
+ /* Check the params is an object*/
+ if (root.isMember("params") && !root["params"].isObject())
+ {
+ error["id"] = Json::Value::null;
+ error["jsonrpc"] = "2.0";
+ err["code"] = INVALID_REQUEST;
+ err["message"] = "Invalid JSONRPC params.";
+ error["error"] = err;
+ return false;
+ }
+ } else if (!root.isMember("result") && !root.isMember("error"))
+ {
+ return false;
+ }
+ return true;
+ } catch (...)
+ {
+ DBG_MSG_ERROR(("CMessageBrokerController::checkMessage() EXCEPTION has been caught!\n"));
+ return false;
+ }
+ }
+
+ WebsocketSession::LoopThreadDelegate::LoopThreadDelegate(
+ MessageQueue<Message, AsyncQueue>* message_queue,
+ WebsocketSession* handler)
+ : message_queue_(*message_queue),
+ handler_(*handler),
+ shutdown_(false) {
+ }
+
+ void WebsocketSession::LoopThreadDelegate::threadMain() {
+ while(!message_queue_.IsShuttingDown() && !shutdown_) {
+ DrainQueue();
+ message_queue_.wait();
+ }
+ DrainQueue();
+ }
+
+ void WebsocketSession::LoopThreadDelegate::exitThreadMain() {
+ shutdown_= true;
+ message_queue_.Shutdown();
+ }
+
+ void WebsocketSession::LoopThreadDelegate::DrainQueue() {
+ while(!message_queue_.empty()) {
+ Message message_ptr;
+ message_queue_.pop(message_ptr);
+ if(!shutdown_) {
+ handler_.ws_.write(boost::asio::buffer(*message_ptr)) ;
+ };
+ }
+ }
+
+ void WebsocketSession::LoopThreadDelegate::SetShutdown(){
+ shutdown_ = true;
+ message_queue_.Shutdown();
+ }
+} \ No newline at end of file
diff --git a/src/components/hmi_message_handler/test/CMakeLists.txt b/src/components/hmi_message_handler/test/CMakeLists.txt
index 0d7ccd9ee1..9d72f3d15e 100644
--- a/src/components/hmi_message_handler/test/CMakeLists.txt
+++ b/src/components/hmi_message_handler/test/CMakeLists.txt
@@ -38,10 +38,7 @@ include_directories (
)
if (HMIADAPTER STREQUAL "messagebroker")
- set (BROKER_LIBRARIES
- message_broker_client
- message_broker_server
- )
+ add_dependencies("hmi_message_handler_test" Boost)
endif()
set(EXCLUDE_PATHS)
diff --git a/src/components/hmi_message_handler/test/hmi_message_handler_impl_test.cc b/src/components/hmi_message_handler/test/hmi_message_handler_impl_test.cc
index 33be4a9228..bf44c3e2d1 100644
--- a/src/components/hmi_message_handler/test/hmi_message_handler_impl_test.cc
+++ b/src/components/hmi_message_handler/test/hmi_message_handler_impl_test.cc
@@ -52,11 +52,12 @@ class HMIMessageHandlerImplTest : public ::testing::Test {
: mb_adapter_(NULL)
, hmi_handler_(NULL)
, mock_hmi_message_observer_(NULL) {}
-
+ boost::asio::io_context ioc_;
protected:
hmi_message_handler::MessageBrokerAdapter* mb_adapter_;
hmi_message_handler::HMIMessageHandlerImpl* hmi_handler_;
MockHMIMessageObserver* mock_hmi_message_observer_;
+
testing::NiceMock<MockHMIMessageHandlerSettings>
mock_hmi_message_handler_settings;
const uint64_t stack_size = 1000u;
@@ -68,7 +69,7 @@ class HMIMessageHandlerImplTest : public ::testing::Test {
mock_hmi_message_handler_settings);
ASSERT_TRUE(NULL != hmi_handler_);
mb_adapter_ = new hmi_message_handler::MessageBrokerAdapter(
- hmi_handler_, "localhost", 22);
+ hmi_handler_, "127.0.0.1", 8087, ioc_);
ASSERT_TRUE(NULL != mb_adapter_);
mock_hmi_message_observer_ = new MockHMIMessageObserver();
ASSERT_TRUE(NULL != mock_hmi_message_observer_);
@@ -124,7 +125,7 @@ TEST_F(HMIMessageHandlerImplTest,
AddHMIMessageAdapter_AddExistedAdapter_ExpectAdded) {
// Check before action
EXPECT_TRUE(hmi_handler_->message_adapters().empty());
- // Act
+ // Act
hmi_handler_->AddHMIMessageAdapter(mb_adapter_);
// Check after action
EXPECT_EQ(1u, hmi_handler_->message_adapters().size());