summaryrefslogtreecommitdiff
path: root/src/components/hmi_message_handler/include/hmi_message_handler/mb_controller.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/components/hmi_message_handler/include/hmi_message_handler/mb_controller.h')
-rw-r--r--src/components/hmi_message_handler/include/hmi_message_handler/mb_controller.h156
1 files changed, 156 insertions, 0 deletions
diff --git a/src/components/hmi_message_handler/include/hmi_message_handler/mb_controller.h b/src/components/hmi_message_handler/include/hmi_message_handler/mb_controller.h
new file mode 100644
index 0000000000..98d5260259
--- /dev/null
+++ b/src/components/hmi_message_handler/include/hmi_message_handler/mb_controller.h
@@ -0,0 +1,156 @@
+#ifndef MB_CONTROLLER_H
+#define MB_CONTROLLER_H
+
+#include <iostream>
+#include <boost/beast/core.hpp>
+#include <boost/beast/websocket.hpp>
+#include <boost/asio/bind_executor.hpp>
+#include <boost/asio/strand.hpp>
+#include <boost/asio/placeholders.hpp>
+#include <boost/asio/ip/tcp.hpp>
+#include <boost/make_shared.hpp>
+#include <boost/thread/thread.hpp>
+#include <algorithm>
+#include <cstdlib>
+#include <functional>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+#include <map>
+#include "json/json.h"
+#include "utils/macro.h"
+#include "utils/lock.h"
+#include "utils/atomic_object.h"
+#include "websocket_session.h"
+
+using namespace boost::beast::websocket;
+
+namespace hmi_message_handler {
+
+CREATE_LOGGERPTR_GLOBAL(mb_logger_, "HMIMessageHandler")
+
+enum ErrorCode {
+ CONTROLLER_EXISTS = -32000,
+ SUBSCRIBTION_EXISTS = -32001,
+ INVALID_REQUEST = -32600
+};
+
+class WebsocketSession;
+
+class CMessageBrokerController
+ : public std::enable_shared_from_this<CMessageBrokerController> {
+ public:
+ CMessageBrokerController(const std::string& address,
+ uint16_t port,
+ std::string name,
+ int num_ports);
+
+ ~CMessageBrokerController();
+
+ bool StartListener();
+
+ bool Run();
+
+ void WaitForConnection();
+
+ void StartSession(boost::system::error_code ec);
+
+ void OnAccept(
+ boost::system::error_code ec,
+ boost::asio::strand<boost::asio::io_context::executor_type>& strand,
+ stream<boost::asio::ip::tcp::socket>& ws);
+
+ void OnRead(boost::system::error_code ec, std::size_t bytes_transferred);
+
+ bool isNotification(Json::Value& message);
+
+ void sendNotification(Json::Value& message);
+
+ bool isResponse(Json::Value& message);
+
+ void sendResponse(Json::Value& message);
+
+ void sendJsonMessage(Json::Value& message);
+
+ void subscribeTo(std::string property);
+
+ void registerController(int id = 0);
+
+ void unregisterController();
+
+ void* MethodForReceiverThread(void* arg);
+
+ bool Connect();
+
+ void exitReceivingThread();
+
+ virtual void processResponse(std::string method, Json::Value& root) = 0;
+
+ virtual void processRequest(Json::Value& root) = 0;
+
+ virtual void processNotification(Json::Value& root) = 0;
+
+ std::string getMethodName(std::string& method);
+
+ std::string GetComponentName(std::string& method);
+
+ void processInternalRequest(Json::Value& message,
+ WebsocketSession* ws_session);
+
+ void pushRequest(Json::Value& message, WebsocketSession* ws_session);
+
+ // Registry
+ bool addController(WebsocketSession* ws_session, std::string name);
+
+ void deleteController(WebsocketSession* ws_session);
+
+ void deleteController(std::string name);
+
+ void removeSubscribersBySession(const WebsocketSession* ws);
+
+ bool addSubscriber(WebsocketSession* ws_session, std::string name);
+
+ void deleteSubscriber(WebsocketSession* ws, std::string name);
+
+ int getSubscribersFd(std::string name,
+ std::vector<WebsocketSession*>& result);
+
+ int getNextControllerId();
+
+ private:
+ boost::asio::io_context ioc_;
+ const std::string& address_;
+ uint16_t port_;
+ std::string name_;
+ int num_threads_;
+ std::vector<std::thread> thread_vector_;
+
+ boost::asio::ip::tcp::acceptor acceptor_;
+ boost::asio::ip::tcp::socket socket_;
+ boost::beast::multi_buffer buffer_;
+ boost::asio::ip::tcp::endpoint endpoint_;
+
+ int mControllersIdCounter;
+
+ // Registry
+ std::vector<std::shared_ptr<hmi_message_handler::WebsocketSession> >
+ mConnectionList;
+ sync_primitives::Lock mConnectionListLock;
+
+ std::map<std::string, WebsocketSession*> mControllersList;
+ sync_primitives::Lock mControllersListLock;
+
+ std::multimap<std::string, WebsocketSession*> mSubscribersList;
+ sync_primitives::Lock mSubscribersListLock;
+
+ std::map<std::string, WebsocketSession*> mRequestList;
+ sync_primitives::Lock mRequestListLock;
+
+ std::atomic_bool shutdown_;
+};
+
+} // hmi_message_handler
+
+#endif /* MB_CONTROLLER_H */ \ No newline at end of file