diff options
3 files changed, 141 insertions, 141 deletions
diff --git a/src/components/hmi_message_handler/include/hmi_message_handler/mb_controller.h b/src/components/hmi_message_handler/include/hmi_message_handler/mb_controller.h index 6e0ad66199..2b7f9d6976 100644 --- a/src/components/hmi_message_handler/include/hmi_message_handler/mb_controller.h +++ b/src/components/hmi_message_handler/include/hmi_message_handler/mb_controller.h @@ -66,8 +66,7 @@ enum ErrorCode { class WebsocketSession; -class CMessageBrokerController - : public std::enable_shared_from_this<CMessageBrokerController> { +class CMessageBrokerController { public: CMessageBrokerController(const std::string& address, uint16_t port, @@ -123,26 +122,26 @@ class CMessageBrokerController std::string GetComponentName(std::string& method); - void processInternalRequest(Json::Value& message, - WebsocketSession* ws_session); + void processInternalRequest(const Json::Value& message, + WebsocketSession& ws_session); - void pushRequest(Json::Value& message, WebsocketSession* ws_session); + bool pushRequest(Json::Value& message, WebsocketSession& ws_session); // Registry - bool addController(WebsocketSession* ws_session, std::string name); + bool addController(WebsocketSession& ws_session, const std::string& name); - void deleteController(WebsocketSession* ws_session); + void deleteController(WebsocketSession& ws_session); - void deleteController(std::string name); + void deleteController(const std::string& name); - void removeSubscribersBySession(const WebsocketSession* ws); + void removeSubscribersBySession(const WebsocketSession& ws); - bool addSubscriber(WebsocketSession* ws_session, std::string name); + bool addSubscriber(WebsocketSession& ws_session, const std::string& name); - void deleteSubscriber(WebsocketSession* ws, std::string name); + void deleteSubscriber(const WebsocketSession& ws, const std::string& name); - int getSubscribersFd(std::string name, - std::vector<WebsocketSession*>& result); + std::vector<std::shared_ptr<WebsocketSession> > getSubscribersFd( + const std::string& name); int getNextControllerId(); @@ -168,13 +167,13 @@ class CMessageBrokerController mConnectionList; sync_primitives::Lock mConnectionListLock; - std::map<std::string, WebsocketSession*> mControllersList; + std::map<std::string, std::weak_ptr<WebsocketSession> > mControllersList; sync_primitives::Lock mControllersListLock; - std::multimap<std::string, WebsocketSession*> mSubscribersList; + std::multimap<std::string, std::weak_ptr<WebsocketSession> > mSubscribersList; sync_primitives::Lock mSubscribersListLock; - std::map<std::string, WebsocketSession*> mRequestList; + std::map<std::string, std::weak_ptr<WebsocketSession> > mRequestList; sync_primitives::Lock mRequestListLock; std::atomic_bool shutdown_; diff --git a/src/components/hmi_message_handler/src/mb_controller.cc b/src/components/hmi_message_handler/src/mb_controller.cc index b0388a5bf2..bea05cde1a 100644 --- a/src/components/hmi_message_handler/src/mb_controller.cc +++ b/src/components/hmi_message_handler/src/mb_controller.cc @@ -137,15 +137,13 @@ bool CMessageBrokerController::isNotification(Json::Value& message) { void CMessageBrokerController::sendNotification(Json::Value& message) { std::string methodName = message["method"].asString(); - std::vector<WebsocketSession*> result; - int subscribersCount = getSubscribersFd(methodName, result); - if (0 < subscribersCount) { - std::vector<WebsocketSession*>::iterator it; - for (it = result.begin(); it != result.end(); ++it) { - (*it)->sendJsonMessage(message); - } - } else { - SDL_LOG_ERROR(("No subscribers for this property!\n")); + const auto result = getSubscribersFd(methodName); + if (result.empty()) { + SDL_LOG_ERROR("No subscribers for method: " << methodName); + } + + for (const auto& ws : result) { + ws->sendJsonMessage(message); } } @@ -159,15 +157,22 @@ bool CMessageBrokerController::isResponse(Json::Value& message) { } void CMessageBrokerController::sendResponse(Json::Value& message) { - std::map<std::string, WebsocketSession*>::iterator it; - sync_primitives::AutoLock request_lock(mRequestListLock); - + std::weak_ptr<WebsocketSession> weak_ws; std::string id = message["id"].asString(); - it = mRequestList.find(id); - if (it != mRequestList.end()) { - WebsocketSession* ws = it->second; + + { + sync_primitives::AutoLock request_lock(mRequestListLock); + const auto it = mRequestList.find(id); + if (it != mRequestList.end()) { + std::swap(weak_ws, it->second); + mRequestList.erase(it); + } + } + + if (auto ws = weak_ws.lock()) { ws->sendJsonMessage(message); - mRequestList.erase(it); + } else { + SDL_LOG_ERROR("A request is not found for id: " << id); } } @@ -181,16 +186,29 @@ void CMessageBrokerController::sendJsonMessage(Json::Value& message) { } // Send request - std::map<std::string, WebsocketSession*>::iterator it; + std::shared_ptr<WebsocketSession> ws; std::string method = message["method"].asString(); std::string component_name = GetComponentName(method); - sync_primitives::AutoLock lock(mControllersListLock); - it = mControllersList.find(component_name); - if (it != mControllersList.end()) { - WebsocketSession* ws = it->second; - ws->sendJsonMessage(message); + { + sync_primitives::AutoLock lock(mControllersListLock); + const auto it = mControllersList.find(component_name); + if (it != mControllersList.end()) { + ws = it->second.lock(); + if (!ws) { + // Clear expired + mControllersList.erase(it); + } + } } + + if (!ws) { + SDL_LOG_ERROR( + "A controller is not found for the method: " << component_name); + return; + } + + ws->sendJsonMessage(message); } void CMessageBrokerController::subscribeTo(std::string property) {} @@ -243,138 +261,121 @@ std::string CMessageBrokerController::GetComponentName(std::string& method) { return return_string; } -bool CMessageBrokerController::addController(WebsocketSession* ws_session, - std::string name) { - bool result = false; - std::map<std::string, WebsocketSession*>::iterator it; - +bool CMessageBrokerController::addController(WebsocketSession& ws_session, + const std::string& name) { sync_primitives::AutoLock lock(mControllersListLock); - it = mControllersList.find(name); - if (it == mControllersList.end()) { - mControllersList.insert( - std::map<std::string, WebsocketSession*>::value_type(name, ws_session)); - result = true; - } else { - SDL_LOG_ERROR(("Controller already exists!\n")); - } - return result; + return mControllersList.emplace(name, ws_session.shared_from_this()).second; } -void CMessageBrokerController::deleteController(WebsocketSession* ws_session) { - { - sync_primitives::AutoLock lock(mControllersListLock); - std::map<std::string, WebsocketSession*>::iterator it; - for (it = mControllersList.begin(); it != mControllersList.end();) { - if (it->second == ws_session) { - mControllersList.erase(it++); - } else { - ++it; - } +void CMessageBrokerController::deleteController(WebsocketSession& ws_session) { + removeSubscribersBySession(ws_session); + sync_primitives::AutoLock lock(mControllersListLock); + for (auto it = mControllersList.cbegin(); it != mControllersList.cend();) { + const std::shared_ptr<WebsocketSession> ws = it->second.lock(); + if (!ws || ws.get() == &ws_session) { + it = mControllersList.erase(it); + } else { + ++it; } } - removeSubscribersBySession(ws_session); } -void CMessageBrokerController::deleteController(std::string name) { - std::map<std::string, WebsocketSession*>::iterator it; - WebsocketSession* ws; +void CMessageBrokerController::deleteController(const std::string& name) { + std::weak_ptr<WebsocketSession> weak_ws; { sync_primitives::AutoLock lock(mControllersListLock); - it = mControllersList.find(name); + const auto it = mControllersList.find(name); if (it != mControllersList.end()) { - ws = it->second; + std::swap(weak_ws, it->second); mControllersList.erase(it); - } else { - return; } } - removeSubscribersBySession(ws); + + if (auto ws = weak_ws.lock()) { + removeSubscribersBySession(*ws); + } else { + SDL_LOG_ERROR("A controller is not found for the method: " << name); + } } void CMessageBrokerController::removeSubscribersBySession( - const WebsocketSession* ws) { + const WebsocketSession& ws) { sync_primitives::AutoLock lock(mSubscribersListLock); - std::multimap<std::string, WebsocketSession*>::iterator it_s = - mSubscribersList.begin(); - for (; it_s != mSubscribersList.end();) { - if (it_s->second == ws) { - mSubscribersList.erase(it_s++); + for (auto it_s = mSubscribersList.cbegin(); + it_s != mSubscribersList.cend();) { + auto ws_session = it_s->second.lock(); + if (!ws_session || ws_session.get() == &ws) { + it_s = mSubscribersList.erase(it_s); } else { ++it_s; } } } -void CMessageBrokerController::pushRequest(Json::Value& message, - WebsocketSession* ws_session) { - sync_primitives::AutoLock lock(mRequestListLock); +bool CMessageBrokerController::pushRequest(Json::Value& message, + WebsocketSession& ws_session) { std::string id = message["id"].asString(); - mRequestList.insert( - std::map<std::string, WebsocketSession*>::value_type(id, ws_session)); + sync_primitives::AutoLock lock(mRequestListLock); + return mRequestList.emplace(std::move(id), ws_session.shared_from_this()) + .second; } -bool CMessageBrokerController::addSubscriber(WebsocketSession* ws_session, - std::string name) { - bool result = true; +bool CMessageBrokerController::addSubscriber(WebsocketSession& ws_session, + const std::string& name) { sync_primitives::AutoLock lock(mSubscribersListLock); - std::pair<std::multimap<std::string, WebsocketSession*>::iterator, - std::multimap<std::string, WebsocketSession*>::iterator> - p = mSubscribersList.equal_range(name); - if (p.first != p.second) { - std::multimap<std::string, WebsocketSession*>::iterator itr; - for (itr = p.first; itr != p.second; ++itr) { - if (ws_session == itr->second) { - result = false; - SDL_LOG_ERROR(("Subscriber already exists!\n")); - } + auto p = mSubscribersList.equal_range(name); + for (auto it = p.first; it != p.second;) { + const std::shared_ptr<WebsocketSession> ws = it->second.lock(); + if (!ws) { + // Clear expired + it = mSubscribersList.erase(it); + } else if (ws.get() == &ws_session) { + // Found an element: {name, ws_session} + return false; + } else { + ++it; } } - if (result) { - mSubscribersList.insert( - std::map<std::string, WebsocketSession*>::value_type(name, ws_session)); - } - return result; + + // Not found an element: {name, ws_session} + mSubscribersList.emplace_hint(p.second, name, ws_session.shared_from_this()); + return true; } -void CMessageBrokerController::deleteSubscriber(WebsocketSession* ws, - std::string name) { +void CMessageBrokerController::deleteSubscriber(const WebsocketSession& ws, + const std::string& name) { sync_primitives::AutoLock lock(mSubscribersListLock); - std::pair<std::multimap<std::string, WebsocketSession*>::iterator, - std::multimap<std::string, WebsocketSession*>::iterator> - p = mSubscribersList.equal_range(name); - if (p.first != p.second) { - std::multimap<std::string, WebsocketSession*>::iterator itr; - for (itr = p.first; itr != p.second;) { - if (ws == itr->second) { - mSubscribersList.erase(itr++); - } else { - ++itr; - } + auto p = mSubscribersList.equal_range(name); + for (auto it = p.first; it != p.second;) { + const std::shared_ptr<WebsocketSession> ws_session = it->second.lock(); + if (!ws_session || &ws == ws_session.get()) { + it = mSubscribersList.erase(it); + } else { + ++it; } } } -int CMessageBrokerController::getSubscribersFd( - std::string name, std::vector<WebsocketSession*>& result) { - int res = 0; - +std::vector<std::shared_ptr<WebsocketSession> > +CMessageBrokerController::getSubscribersFd(const std::string& name) { + std::vector<std::shared_ptr<WebsocketSession> > result; sync_primitives::AutoLock lock(mSubscribersListLock); - std::pair<std::multimap<std::string, WebsocketSession*>::iterator, - std::multimap<std::string, WebsocketSession*>::iterator> - p = mSubscribersList.equal_range(name); - if (p.first != p.second) { - std::multimap<std::string, WebsocketSession*>::iterator itr; - for (itr = p.first; itr != p.second; ++itr) { - result.push_back(itr->second); + auto p = mSubscribersList.equal_range(name); + for (auto it = p.first; it != p.second;) { + if (std::shared_ptr<WebsocketSession> ws = it->second.lock()) { + result.push_back(std::move(ws)); + ++it; + } else { + // Clear expired + it = mSubscribersList.erase(it); } } - res = result.size(); - return res; + return result; } void CMessageBrokerController::processInternalRequest( - Json::Value& message, WebsocketSession* ws_session) { + const Json::Value& message, WebsocketSession& ws_session) { std::string method = message["method"].asString(); std::string methodName = getMethodName(method); if (methodName == "registerComponent") { @@ -387,7 +388,7 @@ void CMessageBrokerController::processInternalRequest( response["id"] = message["id"]; response["jsonrpc"] = "2.0"; response["result"] = getNextControllerId(); - ws_session->sendJsonMessage(response); + ws_session.sendJsonMessage(response); } else { Json::Value error, err; error["id"] = message["id"]; @@ -395,7 +396,7 @@ void CMessageBrokerController::processInternalRequest( err["code"] = CONTROLLER_EXISTS; err["message"] = "Controller has been already registered."; error["error"] = err; - ws_session->sendJsonMessage(error); + ws_session.sendJsonMessage(error); } } else { Json::Value error, err; @@ -404,7 +405,7 @@ void CMessageBrokerController::processInternalRequest( err["code"] = INVALID_REQUEST; err["message"] = "Wrong method parameter."; error["error"] = err; - ws_session->sendJsonMessage(error); + ws_session.sendJsonMessage(error); } } else if (methodName == "subscribeTo") { Json::Value params = message["params"]; @@ -415,7 +416,7 @@ void CMessageBrokerController::processInternalRequest( response["id"] = message["id"]; response["jsonrpc"] = "2.0"; response["result"] = "OK"; - ws_session->sendJsonMessage(response); + ws_session.sendJsonMessage(response); } else { Json::Value error, err; error["id"] = message["id"]; @@ -423,7 +424,7 @@ void CMessageBrokerController::processInternalRequest( err["code"] = CONTROLLER_EXISTS; err["message"] = "Subscribe has been already registered."; error["error"] = err; - ws_session->sendJsonMessage(error); + ws_session.sendJsonMessage(error); } } else { Json::Value error, err; @@ -432,7 +433,7 @@ void CMessageBrokerController::processInternalRequest( err["code"] = INVALID_REQUEST; err["message"] = "Wrong method parameter."; error["error"] = err; - ws_session->sendJsonMessage(error); + ws_session.sendJsonMessage(error); } } else if (methodName == "unregisterComponent") { @@ -445,7 +446,7 @@ void CMessageBrokerController::processInternalRequest( response["id"] = message["id"]; response["jsonrpc"] = "2.0"; response["result"] = "OK"; - ws_session->sendJsonMessage(response); + ws_session.sendJsonMessage(response); } else { Json::Value error, err; error["id"] = message["id"]; @@ -453,7 +454,7 @@ void CMessageBrokerController::processInternalRequest( err["code"] = INVALID_REQUEST; err["message"] = "Wrong method parameter."; error["error"] = err; - ws_session->sendJsonMessage(error); + ws_session.sendJsonMessage(error); } } else if (methodName == "unsubscribeFrom") { Json::Value params = message["params"]; @@ -464,7 +465,7 @@ void CMessageBrokerController::processInternalRequest( response["id"] = message["id"]; response["jsonrpc"] = "2.0"; response["result"] = "OK"; - ws_session->sendJsonMessage(response); + ws_session.sendJsonMessage(response); } else { Json::Value error, err; error["id"] = message["id"]; @@ -472,7 +473,7 @@ void CMessageBrokerController::processInternalRequest( err["code"] = INVALID_REQUEST; err["message"] = "Wrong method parameter."; error["error"] = err; - ws_session->sendJsonMessage(error); + ws_session.sendJsonMessage(error); } } else { } diff --git a/src/components/hmi_message_handler/src/websocket_session.cc b/src/components/hmi_message_handler/src/websocket_session.cc index f2cb265770..a387b639a2 100644 --- a/src/components/hmi_message_handler/src/websocket_session.cc +++ b/src/components/hmi_message_handler/src/websocket_session.cc @@ -85,7 +85,7 @@ void WebsocketSession::Recv(boost::system::error_code ec) { SDL_LOG_ERROR(str_err); shutdown_ = true; thread_delegate_->SetShutdown(); - controller_->deleteController(this); + controller_->deleteController(*this); return; } @@ -126,7 +126,7 @@ void WebsocketSession::Read(boost::system::error_code ec, SDL_LOG_ERROR(str_err); shutdown_ = true; thread_delegate_->SetShutdown(); - controller_->deleteController(this); + controller_->deleteController(*this); buffer_.consume(buffer_.size()); return; } @@ -198,9 +198,9 @@ void WebsocketSession::onMessageReceived(Json::Value message) { std::string component_name = GetComponentName(method); if (component_name == "MB") { - controller_->processInternalRequest(message, this); + controller_->processInternalRequest(message, *this); } else { - controller_->pushRequest(message, this); + controller_->pushRequest(message, *this); controller_->processRequest(message); } } |