diff options
Diffstat (limited to 'implementation/routing/src/routing_manager_base.cpp')
-rw-r--r-- | implementation/routing/src/routing_manager_base.cpp | 84 |
1 files changed, 75 insertions, 9 deletions
diff --git a/implementation/routing/src/routing_manager_base.cpp b/implementation/routing/src/routing_manager_base.cpp index e146d65..89a5843 100644 --- a/implementation/routing/src/routing_manager_base.cpp +++ b/implementation/routing/src/routing_manager_base.cpp @@ -20,12 +20,14 @@ routing_manager_base::routing_manager_base(routing_manager_host *_host) : io_(host_->get_io()), client_(host_->get_client()), configuration_(host_->get_configuration()), - serializer_(std::make_shared<serializer>()), - deserializer_(std::make_shared<deserializer>()) + serializer_(std::make_shared<serializer>()) #ifdef USE_DLT , tc_(tc::trace_connector::get()) #endif { + for (int i = 0; i < VSOMEIP_MAX_DESERIALIZER; ++i) { + deserializers_.push(std::make_shared<deserializer>()); + } } routing_manager_base::~routing_manager_base() { @@ -303,7 +305,7 @@ void routing_manager_base::subscribe(client_t _client, service_t _service, = its_eventgroup->get_events(); for (auto e : its_events) { if (e->is_field()) - e->notify_one(_client); + e->notify_one(_client, true); // TODO: use _flush to send all events together! } } } @@ -325,10 +327,11 @@ void routing_manager_base::unsubscribe(client_t _client, service_t _service, } void routing_manager_base::notify(service_t _service, instance_t _instance, - event_t _event, std::shared_ptr<payload> _payload, bool _force) { + event_t _event, std::shared_ptr<payload> _payload, + bool _force, bool _flush) { std::shared_ptr<event> its_event = find_event(_service, _instance, _event); if (its_event) { - its_event->set_payload(_payload, _force); + its_event->set_payload(_payload, _force, _flush); } else { VSOMEIP_WARNING << "Attempt to update the undefined event/field [" << std::hex << _service << "." << _instance << "." << _event @@ -338,7 +341,7 @@ void routing_manager_base::notify(service_t _service, instance_t _instance, void routing_manager_base::notify_one(service_t _service, instance_t _instance, event_t _event, std::shared_ptr<payload> _payload, - client_t _client, bool _force) { + client_t _client, bool _force, bool _flush) { std::shared_ptr<event> its_event = find_event(_service, _instance, _event); if (its_event) { // Event is valid for service/instance @@ -354,7 +357,7 @@ void routing_manager_base::notify_one(service_t _service, instance_t _instance, } } if (found_eventgroup) { - its_event->set_payload(_payload, _client, _force); + its_event->set_payload(_payload, _client, _force, _flush); } } else { VSOMEIP_WARNING << "Attempt to update the undefined event/field [" @@ -757,11 +760,16 @@ bool routing_manager_base::send_local( const byte_t *_data, uint32_t _size, instance_t _instance, bool _flush, bool _reliable, uint8_t _command) const { + client_t sender = get_client(); + size_t additional_size = 0; + if (_command == VSOMEIP_NOTIFY_ONE) { + additional_size +=sizeof(client_t); + } std::vector<byte_t> its_command( VSOMEIP_COMMAND_HEADER_SIZE + _size + sizeof(instance_t) - + sizeof(bool) + sizeof(bool)); + + sizeof(bool) + sizeof(bool) + additional_size); its_command[VSOMEIP_COMMAND_TYPE_POS] = _command; - std::memcpy(&its_command[VSOMEIP_COMMAND_CLIENT_POS], &_client, + std::memcpy(&its_command[VSOMEIP_COMMAND_CLIENT_POS], &sender, sizeof(client_t)); std::memcpy(&its_command[VSOMEIP_COMMAND_SIZE_POS_MIN], &_size, sizeof(_size)); @@ -773,6 +781,11 @@ bool routing_manager_base::send_local( + sizeof(instance_t)], &_flush, sizeof(bool)); std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + _size + sizeof(instance_t) + sizeof(bool)], &_reliable, sizeof(bool)); + if (_command == VSOMEIP_NOTIFY_ONE) { + // Add target client + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + _size + + sizeof(instance_t) + sizeof(bool) + sizeof(bool)], &_client, sizeof(client_t)); + } return _target->send(&its_command[0], uint32_t(its_command.size())); } @@ -804,4 +817,57 @@ void routing_manager_base::on_clientendpoint_error(client_t _client) { remove_local(_client); } +std::shared_ptr<deserializer> routing_manager_base::get_deserializer() { + std::unique_lock<std::mutex> its_lock(deserializer_mutex_); + while (deserializers_.empty()) { + VSOMEIP_INFO << std::hex << "client " << get_client() << + "routing_manager_base::get_deserializer ~> all in use!"; + deserializer_condition_.wait(its_lock); + VSOMEIP_INFO << std::hex << "client " << get_client() << + "routing_manager_base::get_deserializer ~> wait finished!"; + } + auto deserializer = deserializers_.front(); + deserializers_.pop(); + return deserializer; +} + +void routing_manager_base::put_deserializer(std::shared_ptr<deserializer> _deserializer) { + { + std::lock_guard<std::mutex> its_lock(deserializer_mutex_); + deserializers_.push(_deserializer); + } + deserializer_condition_.notify_one(); +} + +#ifndef WIN32 +bool routing_manager_base::check_credentials(client_t _client, uid_t _uid, gid_t _gid) { + return configuration_->check_credentials(_client, _uid, _gid); +} +#endif + +void routing_manager_base::send_pending_subscriptions(service_t _service, + instance_t _instance, major_version_t _major) { + for (auto &ps : pending_subscriptions_) { + if (ps.service_ == _service && + ps.instance_ == _instance && ps.major_ == _major) { + send_subscribe(client_, ps.service_, ps.instance_, + ps.eventgroup_, ps.major_, ps.subscription_type_); + } + } +} + +void routing_manager_base::remove_pending_subscription(service_t _service, + instance_t _instance) { + auto it = pending_subscriptions_.begin(); + while (it != pending_subscriptions_.end()) { + if (it->service_ == _service + && it->instance_ == _instance) { + break; + } + it++; + } + if (it != pending_subscriptions_.end()) pending_subscriptions_.erase(it); +} + + } // namespace vsomeip |