summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/components/hmi_message_handler/include/hmi_message_handler/mb_controller.h31
-rw-r--r--src/components/hmi_message_handler/src/mb_controller.cc243
-rw-r--r--src/components/hmi_message_handler/src/websocket_session.cc8
3 files changed, 141 insertions, 141 deletions
diff --git a/src/components/hmi_message_handler/include/hmi_message_handler/mb_controller.h b/src/components/hmi_message_handler/include/hmi_message_handler/mb_controller.h
index 6e0ad66199..2b7f9d6976 100644
--- a/src/components/hmi_message_handler/include/hmi_message_handler/mb_controller.h
+++ b/src/components/hmi_message_handler/include/hmi_message_handler/mb_controller.h
@@ -66,8 +66,7 @@ enum ErrorCode {
class WebsocketSession;
-class CMessageBrokerController
- : public std::enable_shared_from_this<CMessageBrokerController> {
+class CMessageBrokerController {
public:
CMessageBrokerController(const std::string& address,
uint16_t port,
@@ -123,26 +122,26 @@ class CMessageBrokerController
std::string GetComponentName(std::string& method);
- void processInternalRequest(Json::Value& message,
- WebsocketSession* ws_session);
+ void processInternalRequest(const Json::Value& message,
+ WebsocketSession& ws_session);
- void pushRequest(Json::Value& message, WebsocketSession* ws_session);
+ bool pushRequest(Json::Value& message, WebsocketSession& ws_session);
// Registry
- bool addController(WebsocketSession* ws_session, std::string name);
+ bool addController(WebsocketSession& ws_session, const std::string& name);
- void deleteController(WebsocketSession* ws_session);
+ void deleteController(WebsocketSession& ws_session);
- void deleteController(std::string name);
+ void deleteController(const std::string& name);
- void removeSubscribersBySession(const WebsocketSession* ws);
+ void removeSubscribersBySession(const WebsocketSession& ws);
- bool addSubscriber(WebsocketSession* ws_session, std::string name);
+ bool addSubscriber(WebsocketSession& ws_session, const std::string& name);
- void deleteSubscriber(WebsocketSession* ws, std::string name);
+ void deleteSubscriber(const WebsocketSession& ws, const std::string& name);
- int getSubscribersFd(std::string name,
- std::vector<WebsocketSession*>& result);
+ std::vector<std::shared_ptr<WebsocketSession> > getSubscribersFd(
+ const std::string& name);
int getNextControllerId();
@@ -168,13 +167,13 @@ class CMessageBrokerController
mConnectionList;
sync_primitives::Lock mConnectionListLock;
- std::map<std::string, WebsocketSession*> mControllersList;
+ std::map<std::string, std::weak_ptr<WebsocketSession> > mControllersList;
sync_primitives::Lock mControllersListLock;
- std::multimap<std::string, WebsocketSession*> mSubscribersList;
+ std::multimap<std::string, std::weak_ptr<WebsocketSession> > mSubscribersList;
sync_primitives::Lock mSubscribersListLock;
- std::map<std::string, WebsocketSession*> mRequestList;
+ std::map<std::string, std::weak_ptr<WebsocketSession> > mRequestList;
sync_primitives::Lock mRequestListLock;
std::atomic_bool shutdown_;
diff --git a/src/components/hmi_message_handler/src/mb_controller.cc b/src/components/hmi_message_handler/src/mb_controller.cc
index b0388a5bf2..bea05cde1a 100644
--- a/src/components/hmi_message_handler/src/mb_controller.cc
+++ b/src/components/hmi_message_handler/src/mb_controller.cc
@@ -137,15 +137,13 @@ bool CMessageBrokerController::isNotification(Json::Value& message) {
void CMessageBrokerController::sendNotification(Json::Value& message) {
std::string methodName = message["method"].asString();
- std::vector<WebsocketSession*> result;
- int subscribersCount = getSubscribersFd(methodName, result);
- if (0 < subscribersCount) {
- std::vector<WebsocketSession*>::iterator it;
- for (it = result.begin(); it != result.end(); ++it) {
- (*it)->sendJsonMessage(message);
- }
- } else {
- SDL_LOG_ERROR(("No subscribers for this property!\n"));
+ const auto result = getSubscribersFd(methodName);
+ if (result.empty()) {
+ SDL_LOG_ERROR("No subscribers for method: " << methodName);
+ }
+
+ for (const auto& ws : result) {
+ ws->sendJsonMessage(message);
}
}
@@ -159,15 +157,22 @@ bool CMessageBrokerController::isResponse(Json::Value& message) {
}
void CMessageBrokerController::sendResponse(Json::Value& message) {
- std::map<std::string, WebsocketSession*>::iterator it;
- sync_primitives::AutoLock request_lock(mRequestListLock);
-
+ std::weak_ptr<WebsocketSession> weak_ws;
std::string id = message["id"].asString();
- it = mRequestList.find(id);
- if (it != mRequestList.end()) {
- WebsocketSession* ws = it->second;
+
+ {
+ sync_primitives::AutoLock request_lock(mRequestListLock);
+ const auto it = mRequestList.find(id);
+ if (it != mRequestList.end()) {
+ std::swap(weak_ws, it->second);
+ mRequestList.erase(it);
+ }
+ }
+
+ if (auto ws = weak_ws.lock()) {
ws->sendJsonMessage(message);
- mRequestList.erase(it);
+ } else {
+ SDL_LOG_ERROR("A request is not found for id: " << id);
}
}
@@ -181,16 +186,29 @@ void CMessageBrokerController::sendJsonMessage(Json::Value& message) {
}
// Send request
- std::map<std::string, WebsocketSession*>::iterator it;
+ std::shared_ptr<WebsocketSession> ws;
std::string method = message["method"].asString();
std::string component_name = GetComponentName(method);
- sync_primitives::AutoLock lock(mControllersListLock);
- it = mControllersList.find(component_name);
- if (it != mControllersList.end()) {
- WebsocketSession* ws = it->second;
- ws->sendJsonMessage(message);
+ {
+ sync_primitives::AutoLock lock(mControllersListLock);
+ const auto it = mControllersList.find(component_name);
+ if (it != mControllersList.end()) {
+ ws = it->second.lock();
+ if (!ws) {
+ // Clear expired
+ mControllersList.erase(it);
+ }
+ }
}
+
+ if (!ws) {
+ SDL_LOG_ERROR(
+ "A controller is not found for the method: " << component_name);
+ return;
+ }
+
+ ws->sendJsonMessage(message);
}
void CMessageBrokerController::subscribeTo(std::string property) {}
@@ -243,138 +261,121 @@ std::string CMessageBrokerController::GetComponentName(std::string& method) {
return return_string;
}
-bool CMessageBrokerController::addController(WebsocketSession* ws_session,
- std::string name) {
- bool result = false;
- std::map<std::string, WebsocketSession*>::iterator it;
-
+bool CMessageBrokerController::addController(WebsocketSession& ws_session,
+ const std::string& name) {
sync_primitives::AutoLock lock(mControllersListLock);
- it = mControllersList.find(name);
- if (it == mControllersList.end()) {
- mControllersList.insert(
- std::map<std::string, WebsocketSession*>::value_type(name, ws_session));
- result = true;
- } else {
- SDL_LOG_ERROR(("Controller already exists!\n"));
- }
- return result;
+ return mControllersList.emplace(name, ws_session.shared_from_this()).second;
}
-void CMessageBrokerController::deleteController(WebsocketSession* ws_session) {
- {
- sync_primitives::AutoLock lock(mControllersListLock);
- std::map<std::string, WebsocketSession*>::iterator it;
- for (it = mControllersList.begin(); it != mControllersList.end();) {
- if (it->second == ws_session) {
- mControllersList.erase(it++);
- } else {
- ++it;
- }
+void CMessageBrokerController::deleteController(WebsocketSession& ws_session) {
+ removeSubscribersBySession(ws_session);
+ sync_primitives::AutoLock lock(mControllersListLock);
+ for (auto it = mControllersList.cbegin(); it != mControllersList.cend();) {
+ const std::shared_ptr<WebsocketSession> ws = it->second.lock();
+ if (!ws || ws.get() == &ws_session) {
+ it = mControllersList.erase(it);
+ } else {
+ ++it;
}
}
- removeSubscribersBySession(ws_session);
}
-void CMessageBrokerController::deleteController(std::string name) {
- std::map<std::string, WebsocketSession*>::iterator it;
- WebsocketSession* ws;
+void CMessageBrokerController::deleteController(const std::string& name) {
+ std::weak_ptr<WebsocketSession> weak_ws;
{
sync_primitives::AutoLock lock(mControllersListLock);
- it = mControllersList.find(name);
+ const auto it = mControllersList.find(name);
if (it != mControllersList.end()) {
- ws = it->second;
+ std::swap(weak_ws, it->second);
mControllersList.erase(it);
- } else {
- return;
}
}
- removeSubscribersBySession(ws);
+
+ if (auto ws = weak_ws.lock()) {
+ removeSubscribersBySession(*ws);
+ } else {
+ SDL_LOG_ERROR("A controller is not found for the method: " << name);
+ }
}
void CMessageBrokerController::removeSubscribersBySession(
- const WebsocketSession* ws) {
+ const WebsocketSession& ws) {
sync_primitives::AutoLock lock(mSubscribersListLock);
- std::multimap<std::string, WebsocketSession*>::iterator it_s =
- mSubscribersList.begin();
- for (; it_s != mSubscribersList.end();) {
- if (it_s->second == ws) {
- mSubscribersList.erase(it_s++);
+ for (auto it_s = mSubscribersList.cbegin();
+ it_s != mSubscribersList.cend();) {
+ auto ws_session = it_s->second.lock();
+ if (!ws_session || ws_session.get() == &ws) {
+ it_s = mSubscribersList.erase(it_s);
} else {
++it_s;
}
}
}
-void CMessageBrokerController::pushRequest(Json::Value& message,
- WebsocketSession* ws_session) {
- sync_primitives::AutoLock lock(mRequestListLock);
+bool CMessageBrokerController::pushRequest(Json::Value& message,
+ WebsocketSession& ws_session) {
std::string id = message["id"].asString();
- mRequestList.insert(
- std::map<std::string, WebsocketSession*>::value_type(id, ws_session));
+ sync_primitives::AutoLock lock(mRequestListLock);
+ return mRequestList.emplace(std::move(id), ws_session.shared_from_this())
+ .second;
}
-bool CMessageBrokerController::addSubscriber(WebsocketSession* ws_session,
- std::string name) {
- bool result = true;
+bool CMessageBrokerController::addSubscriber(WebsocketSession& ws_session,
+ const std::string& name) {
sync_primitives::AutoLock lock(mSubscribersListLock);
- std::pair<std::multimap<std::string, WebsocketSession*>::iterator,
- std::multimap<std::string, WebsocketSession*>::iterator>
- p = mSubscribersList.equal_range(name);
- if (p.first != p.second) {
- std::multimap<std::string, WebsocketSession*>::iterator itr;
- for (itr = p.first; itr != p.second; ++itr) {
- if (ws_session == itr->second) {
- result = false;
- SDL_LOG_ERROR(("Subscriber already exists!\n"));
- }
+ auto p = mSubscribersList.equal_range(name);
+ for (auto it = p.first; it != p.second;) {
+ const std::shared_ptr<WebsocketSession> ws = it->second.lock();
+ if (!ws) {
+ // Clear expired
+ it = mSubscribersList.erase(it);
+ } else if (ws.get() == &ws_session) {
+ // Found an element: {name, ws_session}
+ return false;
+ } else {
+ ++it;
}
}
- if (result) {
- mSubscribersList.insert(
- std::map<std::string, WebsocketSession*>::value_type(name, ws_session));
- }
- return result;
+
+ // Not found an element: {name, ws_session}
+ mSubscribersList.emplace_hint(p.second, name, ws_session.shared_from_this());
+ return true;
}
-void CMessageBrokerController::deleteSubscriber(WebsocketSession* ws,
- std::string name) {
+void CMessageBrokerController::deleteSubscriber(const WebsocketSession& ws,
+ const std::string& name) {
sync_primitives::AutoLock lock(mSubscribersListLock);
- std::pair<std::multimap<std::string, WebsocketSession*>::iterator,
- std::multimap<std::string, WebsocketSession*>::iterator>
- p = mSubscribersList.equal_range(name);
- if (p.first != p.second) {
- std::multimap<std::string, WebsocketSession*>::iterator itr;
- for (itr = p.first; itr != p.second;) {
- if (ws == itr->second) {
- mSubscribersList.erase(itr++);
- } else {
- ++itr;
- }
+ auto p = mSubscribersList.equal_range(name);
+ for (auto it = p.first; it != p.second;) {
+ const std::shared_ptr<WebsocketSession> ws_session = it->second.lock();
+ if (!ws_session || &ws == ws_session.get()) {
+ it = mSubscribersList.erase(it);
+ } else {
+ ++it;
}
}
}
-int CMessageBrokerController::getSubscribersFd(
- std::string name, std::vector<WebsocketSession*>& result) {
- int res = 0;
-
+std::vector<std::shared_ptr<WebsocketSession> >
+CMessageBrokerController::getSubscribersFd(const std::string& name) {
+ std::vector<std::shared_ptr<WebsocketSession> > result;
sync_primitives::AutoLock lock(mSubscribersListLock);
- std::pair<std::multimap<std::string, WebsocketSession*>::iterator,
- std::multimap<std::string, WebsocketSession*>::iterator>
- p = mSubscribersList.equal_range(name);
- if (p.first != p.second) {
- std::multimap<std::string, WebsocketSession*>::iterator itr;
- for (itr = p.first; itr != p.second; ++itr) {
- result.push_back(itr->second);
+ auto p = mSubscribersList.equal_range(name);
+ for (auto it = p.first; it != p.second;) {
+ if (std::shared_ptr<WebsocketSession> ws = it->second.lock()) {
+ result.push_back(std::move(ws));
+ ++it;
+ } else {
+ // Clear expired
+ it = mSubscribersList.erase(it);
}
}
- res = result.size();
- return res;
+ return result;
}
void CMessageBrokerController::processInternalRequest(
- Json::Value& message, WebsocketSession* ws_session) {
+ const Json::Value& message, WebsocketSession& ws_session) {
std::string method = message["method"].asString();
std::string methodName = getMethodName(method);
if (methodName == "registerComponent") {
@@ -387,7 +388,7 @@ void CMessageBrokerController::processInternalRequest(
response["id"] = message["id"];
response["jsonrpc"] = "2.0";
response["result"] = getNextControllerId();
- ws_session->sendJsonMessage(response);
+ ws_session.sendJsonMessage(response);
} else {
Json::Value error, err;
error["id"] = message["id"];
@@ -395,7 +396,7 @@ void CMessageBrokerController::processInternalRequest(
err["code"] = CONTROLLER_EXISTS;
err["message"] = "Controller has been already registered.";
error["error"] = err;
- ws_session->sendJsonMessage(error);
+ ws_session.sendJsonMessage(error);
}
} else {
Json::Value error, err;
@@ -404,7 +405,7 @@ void CMessageBrokerController::processInternalRequest(
err["code"] = INVALID_REQUEST;
err["message"] = "Wrong method parameter.";
error["error"] = err;
- ws_session->sendJsonMessage(error);
+ ws_session.sendJsonMessage(error);
}
} else if (methodName == "subscribeTo") {
Json::Value params = message["params"];
@@ -415,7 +416,7 @@ void CMessageBrokerController::processInternalRequest(
response["id"] = message["id"];
response["jsonrpc"] = "2.0";
response["result"] = "OK";
- ws_session->sendJsonMessage(response);
+ ws_session.sendJsonMessage(response);
} else {
Json::Value error, err;
error["id"] = message["id"];
@@ -423,7 +424,7 @@ void CMessageBrokerController::processInternalRequest(
err["code"] = CONTROLLER_EXISTS;
err["message"] = "Subscribe has been already registered.";
error["error"] = err;
- ws_session->sendJsonMessage(error);
+ ws_session.sendJsonMessage(error);
}
} else {
Json::Value error, err;
@@ -432,7 +433,7 @@ void CMessageBrokerController::processInternalRequest(
err["code"] = INVALID_REQUEST;
err["message"] = "Wrong method parameter.";
error["error"] = err;
- ws_session->sendJsonMessage(error);
+ ws_session.sendJsonMessage(error);
}
} else if (methodName == "unregisterComponent") {
@@ -445,7 +446,7 @@ void CMessageBrokerController::processInternalRequest(
response["id"] = message["id"];
response["jsonrpc"] = "2.0";
response["result"] = "OK";
- ws_session->sendJsonMessage(response);
+ ws_session.sendJsonMessage(response);
} else {
Json::Value error, err;
error["id"] = message["id"];
@@ -453,7 +454,7 @@ void CMessageBrokerController::processInternalRequest(
err["code"] = INVALID_REQUEST;
err["message"] = "Wrong method parameter.";
error["error"] = err;
- ws_session->sendJsonMessage(error);
+ ws_session.sendJsonMessage(error);
}
} else if (methodName == "unsubscribeFrom") {
Json::Value params = message["params"];
@@ -464,7 +465,7 @@ void CMessageBrokerController::processInternalRequest(
response["id"] = message["id"];
response["jsonrpc"] = "2.0";
response["result"] = "OK";
- ws_session->sendJsonMessage(response);
+ ws_session.sendJsonMessage(response);
} else {
Json::Value error, err;
error["id"] = message["id"];
@@ -472,7 +473,7 @@ void CMessageBrokerController::processInternalRequest(
err["code"] = INVALID_REQUEST;
err["message"] = "Wrong method parameter.";
error["error"] = err;
- ws_session->sendJsonMessage(error);
+ ws_session.sendJsonMessage(error);
}
} else {
}
diff --git a/src/components/hmi_message_handler/src/websocket_session.cc b/src/components/hmi_message_handler/src/websocket_session.cc
index f2cb265770..a387b639a2 100644
--- a/src/components/hmi_message_handler/src/websocket_session.cc
+++ b/src/components/hmi_message_handler/src/websocket_session.cc
@@ -85,7 +85,7 @@ void WebsocketSession::Recv(boost::system::error_code ec) {
SDL_LOG_ERROR(str_err);
shutdown_ = true;
thread_delegate_->SetShutdown();
- controller_->deleteController(this);
+ controller_->deleteController(*this);
return;
}
@@ -126,7 +126,7 @@ void WebsocketSession::Read(boost::system::error_code ec,
SDL_LOG_ERROR(str_err);
shutdown_ = true;
thread_delegate_->SetShutdown();
- controller_->deleteController(this);
+ controller_->deleteController(*this);
buffer_.consume(buffer_.size());
return;
}
@@ -198,9 +198,9 @@ void WebsocketSession::onMessageReceived(Json::Value message) {
std::string component_name = GetComponentName(method);
if (component_name == "MB") {
- controller_->processInternalRequest(message, this);
+ controller_->processInternalRequest(message, *this);
} else {
- controller_->pushRequest(message, this);
+ controller_->pushRequest(message, *this);
controller_->processRequest(message);
}
}