summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYana Chernysheva (GitHub) <59469418+ychernysheva@users.noreply.github.com>2020-10-19 23:45:10 +0300
committerGitHub <noreply@github.com>2020-10-19 16:45:10 -0400
commit4304cc9c71f6d65688ec0c191b66f5c30ef64ca6 (patch)
treeb5512a3073d63a67f24d1ef23e2aee7876aba7bf
parentad32fca1735ff424f20c574cfde62395382f861a (diff)
downloadsdl_core-4304cc9c71f6d65688ec0c191b66f5c30ef64ca6.tar.gz
Fix synchronization in WaypointsPendingResumptionHandler (#3531)
There was a problem with duplicate subscriptions to shared data due to time gap between check for already subscribed apps in SDLRPCPlugin and further processing of corresponding requests in WayPointsPendingResumptionHandler. That's why this check was moved to WayPointsPendingResumptionHandler. Also refactoring of some methods WayPointsPendingResumptionHandler class was done to make their logic clearer and easier to understand.
-rw-r--r--src/components/application_manager/rpc_plugins/sdl_rpc_plugin/include/sdl_rpc_plugin/waypoints_pending_resumption_handler.h63
-rw-r--r--src/components/application_manager/rpc_plugins/sdl_rpc_plugin/src/sdl_rpc_plugin.cc8
-rw-r--r--src/components/application_manager/rpc_plugins/sdl_rpc_plugin/src/waypoints_pending_resumption_handler.cc204
3 files changed, 138 insertions, 137 deletions
diff --git a/src/components/application_manager/rpc_plugins/sdl_rpc_plugin/include/sdl_rpc_plugin/waypoints_pending_resumption_handler.h b/src/components/application_manager/rpc_plugins/sdl_rpc_plugin/include/sdl_rpc_plugin/waypoints_pending_resumption_handler.h
index 5dad6d07ac..871359bc9a 100644
--- a/src/components/application_manager/rpc_plugins/sdl_rpc_plugin/include/sdl_rpc_plugin/waypoints_pending_resumption_handler.h
+++ b/src/components/application_manager/rpc_plugins/sdl_rpc_plugin/include/sdl_rpc_plugin/waypoints_pending_resumption_handler.h
@@ -58,35 +58,50 @@ class WayPointsPendingResumptionHandler
private:
/**
* @brief RaiseFakeSuccessfulResponse raise event for the subscriber that
- * contains emulated successful response from HMI To avoid double subscription
- * WayPointsPendingResumptionHandler freezes sending requests to HMI. But
- * resumption_data_processor().SubscribeOnResponse() need to be called to
- * provide information that some data need to be resumed. So if pending
- * request exists, SDL creates preliminary requests to HMI, subscribe the
- * subscriber to this request but do not send it to HMI. It freezes this
- * requests to precess the one by one when a response to the current request
- * will be received When SDL receives a response from HMI it may satisfy some
- * frozen requests. If it does SDL will create faked HMI response(based on the
- * real one with corr_id replacement) and raise event. So that
- * subscriber::on_event will be called with an appropriate response to the
- * request that SDL was not sent to HMI.
- * @param response message that will be raised with corr_id replacement
- * @param corr_id correlation id that will be replaced in response to notify
- * the subscriber
+ * contains emulated successful response from HMI. To avoid double
+ * subscription WayPointsPendingResumptionHandler freezes sending requests to
+ * HMI. But resumption_data_processor().SubscribeOnResponse() need to be
+ * called to provide information that some data need to be resumed. So if
+ * pending request exists, SDL creates preliminary requests to HMI, subscribe
+ * the subscriber to this request but do not send it to HMI. It freezes this
+ * requests to process the one by one when a response to the current request
+ * will be received. When SDL receives a response from HMI it may satisfy some
+ * frozen requests. If it does SDL will create fake successful HMI
+ * response and raise event. So that subscriber::on_event will be called with
+ * an appropriate response to the request that SDL was not sent to HMI.
+ * @param corr_id correlation id of next pending request
*/
- void RaiseFakeSuccessfulResponse(smart_objects::SmartObject response,
- const int32_t corr_id);
+ void RaiseFakeSuccessfulResponse(const int32_t corr_id);
smart_objects::SmartObjectSPtr CreateSubscriptionRequest();
- struct ResumptionAwaitingHandling {
- const uint32_t app_id;
- WayPointsAppExtension& ext;
- resumption::ResumptionRequest request_to_send_;
+ /**
+ * @brief ProcessNextPendingResumption is responsible for processing of next
+ * pending request. If any application is already subscribed to waypoints,
+ * this method ensures that current application will be subscribed to
+ * waypoints too or send request to HMI otherwise
+ */
+ void ProcessNextPendingResumption();
+
+ struct PendingRequest {
+ explicit PendingRequest(const uint32_t app_id, const uint32_t corr_id)
+ : app_id_(app_id)
+ , corr_id_(corr_id)
+ , waiting_for_hmi_response_(false) {}
+ uint32_t app_id_;
+ uint32_t corr_id_;
+ bool waiting_for_hmi_response_;
};
- std::vector<ResumptionAwaitingHandling> frozen_resumptions_;
- std::map<uint32_t, smart_objects::SmartObject> pending_requests_;
- std::queue<uint32_t> app_ids_;
+ /**
+ * @brief SendPendingHMIRequest is responsible for creating and sending of
+ * next pending request to HMI. Also here this request is marked as waiting
+ * for response.
+ * @param pending_request Next pending request
+ */
+ void SendPendingHMIRequest(PendingRequest& pending_request);
+
+ std::deque<PendingRequest> pending_requests_;
+ sync_primitives::RecursiveLock pending_resumption_lock_;
};
} // namespace sdl_rpc_plugin
diff --git a/src/components/application_manager/rpc_plugins/sdl_rpc_plugin/src/sdl_rpc_plugin.cc b/src/components/application_manager/rpc_plugins/sdl_rpc_plugin/src/sdl_rpc_plugin.cc
index e01165269a..4756c8b9a2 100644
--- a/src/components/application_manager/rpc_plugins/sdl_rpc_plugin/src/sdl_rpc_plugin.cc
+++ b/src/components/application_manager/rpc_plugins/sdl_rpc_plugin/src/sdl_rpc_plugin.cc
@@ -115,14 +115,6 @@ void SDLRPCPlugin::ProcessResumptionSubscription(
application_manager::Application& app, WayPointsAppExtension& ext) {
SDL_LOG_AUTO_TRACE();
- if (application_manager_->IsAnyAppSubscribedForWayPoints()) {
- SDL_LOG_DEBUG(
- "Subscription to waypoint already exist, no need to send "
- "request to HMI");
- application_manager_->SubscribeAppForWayPoints(app.app_id());
- return;
- }
-
pending_resumption_handler_->HandleResumptionSubscriptionRequest(ext, app);
}
diff --git a/src/components/application_manager/rpc_plugins/sdl_rpc_plugin/src/waypoints_pending_resumption_handler.cc b/src/components/application_manager/rpc_plugins/sdl_rpc_plugin/src/waypoints_pending_resumption_handler.cc
index 8b5d117f54..7aa6dd5273 100644
--- a/src/components/application_manager/rpc_plugins/sdl_rpc_plugin/src/waypoints_pending_resumption_handler.cc
+++ b/src/components/application_manager/rpc_plugins/sdl_rpc_plugin/src/waypoints_pending_resumption_handler.cc
@@ -36,6 +36,8 @@ namespace sdl_rpc_plugin {
SDL_CREATE_LOG_VARIABLE("SdlRPCPlugin")
+using hmi_apis::FunctionID::Navigation_SubscribeWayPoints;
+
WayPointsPendingResumptionHandler::WayPointsPendingResumptionHandler(
application_manager::ApplicationManager& application_manager)
: PendingResumptionHandler(application_manager) {}
@@ -43,9 +45,10 @@ WayPointsPendingResumptionHandler::WayPointsPendingResumptionHandler(
smart_objects::SmartObjectSPtr
WayPointsPendingResumptionHandler::CreateSubscriptionRequest() {
SDL_LOG_AUTO_TRACE();
+
auto subscribe_waypoints_msg =
application_manager::MessageHelper::CreateMessageForHMI(
- hmi_apis::FunctionID::Navigation_SubscribeWayPoints,
+ Navigation_SubscribeWayPoints,
application_manager_.GetNextHMICorrelationID());
(*subscribe_waypoints_msg)[application_manager::strings::params]
[application_manager::strings::message_type] =
@@ -53,40 +56,52 @@ WayPointsPendingResumptionHandler::CreateSubscriptionRequest() {
return subscribe_waypoints_msg;
}
+void WayPointsPendingResumptionHandler::SendPendingHMIRequest(
+ PendingRequest& pending_request) {
+ SDL_LOG_AUTO_TRACE();
+ using namespace application_manager;
+
+ SDL_LOG_DEBUG("Sending request with function id: "
+ << Navigation_SubscribeWayPoints
+ << " and correlation_id: " << pending_request.corr_id_);
+
+ auto request = MessageHelper::CreateMessageForHMI(
+ Navigation_SubscribeWayPoints, pending_request.corr_id_);
+ (*request)[strings::params][strings::message_type] =
+ hmi_apis::messageType::request;
+ subscribe_on_event(Navigation_SubscribeWayPoints, pending_request.corr_id_);
+ application_manager_.GetRPCService().ManageHMICommand(request);
+ pending_request.waiting_for_hmi_response_ = true;
+}
+
void WayPointsPendingResumptionHandler::HandleResumptionSubscriptionRequest(
application_manager::AppExtension& extension,
application_manager::Application& app) {
SDL_LOG_AUTO_TRACE();
- WayPointsAppExtension& ext = dynamic_cast<WayPointsAppExtension&>(extension);
- smart_objects::SmartObjectSPtr request = CreateSubscriptionRequest();
- smart_objects::SmartObject& request_ref = *request;
- const auto function_id = static_cast<hmi_apis::FunctionID::eType>(
- request_ref[application_manager::strings::params]
- [application_manager::strings::function_id]
- .asInt());
- const uint32_t corr_id =
- request_ref[application_manager::strings::params]
- [application_manager::strings::correlation_id]
- .asUInt();
+ using namespace application_manager;
+ sync_primitives::AutoLock lock(pending_resumption_lock_);
+ UNUSED(extension);
+
+ if (application_manager_.IsAnyAppSubscribedForWayPoints()) {
+ SDL_LOG_DEBUG(
+ "Subscription to waypoint already exist, no need to send "
+ "request to HMI");
+ application_manager_.SubscribeAppForWayPoints(app.app_id());
+ return;
+ }
+ const auto request = CreateSubscriptionRequest();
+ const auto corr_id =
+ (*request)[strings::params][strings::correlation_id].asInt();
auto resumption_request =
- MakeResumptionRequest(corr_id, function_id, *request);
- app_ids_.push(app.app_id());
+ MakeResumptionRequest(corr_id, Navigation_SubscribeWayPoints, *request);
- if (pending_requests_.empty()) {
- SDL_LOG_DEBUG("There are no pending requests for app_id: " << app.app_id());
- pending_requests_[corr_id] = request_ref;
- subscribe_on_event(function_id, corr_id);
- SDL_LOG_DEBUG("Sending request with function id: "
- << function_id << " and correlation_id: " << corr_id);
-
- application_manager_.GetRPCService().ManageHMICommand(request);
- } else {
- SDL_LOG_DEBUG("There are pending requests. Frozen resumption for app id "
- << app.app_id() << " corr id = " << corr_id);
- ResumptionAwaitingHandling frozen_res{
- app.app_id(), ext, resumption_request};
- frozen_resumptions_.push_back(frozen_res);
+ PendingRequest pending_request(app.app_id(), corr_id);
+ pending_requests_.push_back(pending_request);
+ SDL_LOG_DEBUG("Add to pending resumptins corr_id = " << corr_id);
+
+ if (pending_requests_.size() == 1) {
+ SendPendingHMIRequest(pending_requests_.front());
}
resumption_data_processor().SubscribeToResponse(app.app_id(),
resumption_request);
@@ -94,42 +109,32 @@ void WayPointsPendingResumptionHandler::HandleResumptionSubscriptionRequest(
void WayPointsPendingResumptionHandler::OnResumptionRevert() {
SDL_LOG_AUTO_TRACE();
- using namespace application_manager;
+ sync_primitives::AutoLock lock(pending_resumption_lock_);
- if (!pending_requests_.empty()) {
- SDL_LOG_DEBUG("Still waiting for some response");
+ if (pending_requests_.empty()) {
+ SDL_LOG_DEBUG("No pending resumptions");
return;
}
- if (!frozen_resumptions_.empty()) {
- ResumptionAwaitingHandling frozen_resumption = frozen_resumptions_.back();
- frozen_resumptions_.pop_back();
-
- auto request = std::make_shared<smart_objects::SmartObject>(
- frozen_resumption.request_to_send_.message);
- const uint32_t cid =
- (*request)[strings::params][strings::correlation_id].asUInt();
- const auto fid = static_cast<hmi_apis::FunctionID::eType>(
- (*request)[strings::params][strings::function_id].asInt());
-
- SDL_LOG_DEBUG("Subscribing for event with function id: "
- << fid << " correlation id: " << cid);
- subscribe_on_event(fid, cid);
- pending_requests_[cid] = *request;
- SDL_LOG_DEBUG("Sending request with fid: " << fid << " and cid: " << cid);
- application_manager_.GetRPCService().ManageHMICommand(request);
+ auto& pending_request = pending_requests_.front();
+ if (pending_request.waiting_for_hmi_response_) {
+ SDL_LOG_DEBUG("Pending resumption for "
+ << pending_request.app_id_
+ << " is already waiting for HMI response");
+ return;
}
+ SendPendingHMIRequest(pending_request);
}
void WayPointsPendingResumptionHandler::RaiseFakeSuccessfulResponse(
- ns_smart_device_link::ns_smart_objects::SmartObject response,
const int32_t corr_id) {
using namespace application_manager;
- response[strings::params][strings::correlation_id] = corr_id;
- auto fid = static_cast<hmi_apis::FunctionID::eType>(
- response[strings::params][strings::function_id].asInt());
- event_engine::Event event(fid);
- event.set_smart_object(response);
+
+ auto response = MessageHelper::CreateResponseMessageFromHmi(
+ Navigation_SubscribeWayPoints, corr_id, hmi_apis::Common_Result::SUCCESS);
+
+ event_engine::Event event(Navigation_SubscribeWayPoints);
+ event.set_smart_object(*response);
SDL_LOG_TRACE("Raise fake response for subscriber. corr_id : " << corr_id);
event.raise(application_manager_.event_dispatcher());
@@ -139,6 +144,14 @@ void WayPointsPendingResumptionHandler::on_event(
const application_manager::event_engine::Event& event) {
using namespace application_manager;
SDL_LOG_AUTO_TRACE();
+ sync_primitives::AutoLock lock(pending_resumption_lock_);
+
+ unsubscribe_from_event(Navigation_SubscribeWayPoints);
+
+ if (pending_requests_.empty()) {
+ SDL_LOG_DEBUG("Not waiting for any response");
+ return;
+ }
const smart_objects::SmartObject& response = event.smart_object();
const uint32_t corr_id = event.smart_object_correlation_id();
@@ -146,64 +159,45 @@ void WayPointsPendingResumptionHandler::on_event(
SDL_LOG_TRACE("Received event with function id: "
<< event.id() << " and correlation id: " << corr_id);
- smart_objects::SmartObject pending_request;
- if (pending_requests_.find(corr_id) == pending_requests_.end()) {
- SDL_LOG_ERROR("corr id " << corr_id << " NOT found");
- return;
- }
- pending_request = pending_requests_[corr_id];
- pending_requests_.erase(corr_id);
- if (app_ids_.empty()) {
- SDL_LOG_ERROR("app_ids is empty");
- return;
- }
- uint32_t app_id = app_ids_.front();
- app_ids_.pop();
- auto app = application_manager_.application(app_id);
+ auto current_pending = pending_requests_.front();
+ pending_requests_.pop_front();
+
+ auto app = application_manager_.application(current_pending.app_id_);
if (!app) {
- SDL_LOG_ERROR("Application not found " << app_id);
+ SDL_LOG_WARN("Application not found " << current_pending.app_id_);
return;
}
if (resumption::IsResponseSuccessful(response)) {
- SDL_LOG_DEBUG("Resumption of subscriptions is successful");
-
+ SDL_LOG_DEBUG("Resumption of waypoints is successful");
application_manager_.SubscribeAppForWayPoints(app);
+ }
+ ProcessNextPendingResumption();
+}
- for (auto& frozen_resumption : frozen_resumptions_) {
- auto corr_id = frozen_resumption.request_to_send_
- .message[strings::params][strings::correlation_id]
- .asInt();
- RaiseFakeSuccessfulResponse(response, corr_id);
- application_manager_.SubscribeAppForWayPoints(frozen_resumption.app_id);
- }
- frozen_resumptions_.clear();
- } else {
- SDL_LOG_DEBUG("Resumption of subscriptions is NOT successful");
-
- if (frozen_resumptions_.empty()) {
- SDL_LOG_DEBUG("frozen resumptions list is empty");
- return;
- }
-
- ResumptionAwaitingHandling frozen_resumption = frozen_resumptions_.back();
- frozen_resumptions_.pop_back();
- auto resumption_req = frozen_resumption.request_to_send_;
- const uint32_t cid =
- resumption_req.message[strings::params][strings::correlation_id]
- .asInt();
- const hmi_apis::FunctionID::eType fid =
- static_cast<hmi_apis::FunctionID::eType>(
- resumption_req.message[strings::params][strings::function_id]
- .asInt());
- subscribe_on_event(fid, cid);
- auto request =
- std::make_shared<smart_objects::SmartObject>(resumption_req.message);
- SDL_LOG_DEBUG("Subscribing for event with function id: "
- << fid << " correlation id: " << cid);
- pending_requests_[cid] = *request;
- SDL_LOG_DEBUG("Sending request with fid: " << fid << " and cid: " << cid);
- application_manager_.GetRPCService().ManageHMICommand(request);
+void WayPointsPendingResumptionHandler::ProcessNextPendingResumption() {
+ SDL_LOG_AUTO_TRACE();
+ if (pending_requests_.empty()) {
+ SDL_LOG_DEBUG("No more pending resumptions");
+ return;
+ }
+ auto& pending = pending_requests_.front();
+ if (pending.waiting_for_hmi_response_) {
+ SDL_LOG_DEBUG("Request was already sent to HMI for " << pending.app_id_);
+ return;
}
+
+ if (!application_manager_.IsAnyAppSubscribedForWayPoints()) {
+ SendPendingHMIRequest(pending);
+ return;
+ }
+
+ auto pending_copy = pending;
+ pending_requests_.pop_front();
+ auto app = application_manager_.application(pending_copy.app_id_);
+ application_manager_.SubscribeAppForWayPoints(app);
+ RaiseFakeSuccessfulResponse(pending_copy.corr_id_);
+ ProcessNextPendingResumption();
}
+
} // namespace sdl_rpc_plugin