summaryrefslogtreecommitdiff
path: root/implementation/routing/src/routing_manager_base.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/routing/src/routing_manager_base.cpp')
-rw-r--r--implementation/routing/src/routing_manager_base.cpp84
1 files changed, 75 insertions, 9 deletions
diff --git a/implementation/routing/src/routing_manager_base.cpp b/implementation/routing/src/routing_manager_base.cpp
index e146d65..89a5843 100644
--- a/implementation/routing/src/routing_manager_base.cpp
+++ b/implementation/routing/src/routing_manager_base.cpp
@@ -20,12 +20,14 @@ routing_manager_base::routing_manager_base(routing_manager_host *_host) :
io_(host_->get_io()),
client_(host_->get_client()),
configuration_(host_->get_configuration()),
- serializer_(std::make_shared<serializer>()),
- deserializer_(std::make_shared<deserializer>())
+ serializer_(std::make_shared<serializer>())
#ifdef USE_DLT
, tc_(tc::trace_connector::get())
#endif
{
+ for (int i = 0; i < VSOMEIP_MAX_DESERIALIZER; ++i) {
+ deserializers_.push(std::make_shared<deserializer>());
+ }
}
routing_manager_base::~routing_manager_base() {
@@ -303,7 +305,7 @@ void routing_manager_base::subscribe(client_t _client, service_t _service,
= its_eventgroup->get_events();
for (auto e : its_events) {
if (e->is_field())
- e->notify_one(_client);
+ e->notify_one(_client, true); // TODO: use _flush to send all events together!
}
}
}
@@ -325,10 +327,11 @@ void routing_manager_base::unsubscribe(client_t _client, service_t _service,
}
void routing_manager_base::notify(service_t _service, instance_t _instance,
- event_t _event, std::shared_ptr<payload> _payload, bool _force) {
+ event_t _event, std::shared_ptr<payload> _payload,
+ bool _force, bool _flush) {
std::shared_ptr<event> its_event = find_event(_service, _instance, _event);
if (its_event) {
- its_event->set_payload(_payload, _force);
+ its_event->set_payload(_payload, _force, _flush);
} else {
VSOMEIP_WARNING << "Attempt to update the undefined event/field ["
<< std::hex << _service << "." << _instance << "." << _event
@@ -338,7 +341,7 @@ void routing_manager_base::notify(service_t _service, instance_t _instance,
void routing_manager_base::notify_one(service_t _service, instance_t _instance,
event_t _event, std::shared_ptr<payload> _payload,
- client_t _client, bool _force) {
+ client_t _client, bool _force, bool _flush) {
std::shared_ptr<event> its_event = find_event(_service, _instance, _event);
if (its_event) {
// Event is valid for service/instance
@@ -354,7 +357,7 @@ void routing_manager_base::notify_one(service_t _service, instance_t _instance,
}
}
if (found_eventgroup) {
- its_event->set_payload(_payload, _client, _force);
+ its_event->set_payload(_payload, _client, _force, _flush);
}
} else {
VSOMEIP_WARNING << "Attempt to update the undefined event/field ["
@@ -757,11 +760,16 @@ bool routing_manager_base::send_local(
const byte_t *_data, uint32_t _size, instance_t _instance,
bool _flush, bool _reliable, uint8_t _command) const {
+ client_t sender = get_client();
+ size_t additional_size = 0;
+ if (_command == VSOMEIP_NOTIFY_ONE) {
+ additional_size +=sizeof(client_t);
+ }
std::vector<byte_t> its_command(
VSOMEIP_COMMAND_HEADER_SIZE + _size + sizeof(instance_t)
- + sizeof(bool) + sizeof(bool));
+ + sizeof(bool) + sizeof(bool) + additional_size);
its_command[VSOMEIP_COMMAND_TYPE_POS] = _command;
- std::memcpy(&its_command[VSOMEIP_COMMAND_CLIENT_POS], &_client,
+ std::memcpy(&its_command[VSOMEIP_COMMAND_CLIENT_POS], &sender,
sizeof(client_t));
std::memcpy(&its_command[VSOMEIP_COMMAND_SIZE_POS_MIN], &_size,
sizeof(_size));
@@ -773,6 +781,11 @@ bool routing_manager_base::send_local(
+ sizeof(instance_t)], &_flush, sizeof(bool));
std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + _size
+ sizeof(instance_t) + sizeof(bool)], &_reliable, sizeof(bool));
+ if (_command == VSOMEIP_NOTIFY_ONE) {
+ // Add target client
+ std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + _size
+ + sizeof(instance_t) + sizeof(bool) + sizeof(bool)], &_client, sizeof(client_t));
+ }
return _target->send(&its_command[0], uint32_t(its_command.size()));
}
@@ -804,4 +817,57 @@ void routing_manager_base::on_clientendpoint_error(client_t _client) {
remove_local(_client);
}
+std::shared_ptr<deserializer> routing_manager_base::get_deserializer() {
+ std::unique_lock<std::mutex> its_lock(deserializer_mutex_);
+ while (deserializers_.empty()) {
+ VSOMEIP_INFO << std::hex << "client " << get_client() <<
+ "routing_manager_base::get_deserializer ~> all in use!";
+ deserializer_condition_.wait(its_lock);
+ VSOMEIP_INFO << std::hex << "client " << get_client() <<
+ "routing_manager_base::get_deserializer ~> wait finished!";
+ }
+ auto deserializer = deserializers_.front();
+ deserializers_.pop();
+ return deserializer;
+}
+
+void routing_manager_base::put_deserializer(std::shared_ptr<deserializer> _deserializer) {
+ {
+ std::lock_guard<std::mutex> its_lock(deserializer_mutex_);
+ deserializers_.push(_deserializer);
+ }
+ deserializer_condition_.notify_one();
+}
+
+#ifndef WIN32
+bool routing_manager_base::check_credentials(client_t _client, uid_t _uid, gid_t _gid) {
+ return configuration_->check_credentials(_client, _uid, _gid);
+}
+#endif
+
+void routing_manager_base::send_pending_subscriptions(service_t _service,
+ instance_t _instance, major_version_t _major) {
+ for (auto &ps : pending_subscriptions_) {
+ if (ps.service_ == _service &&
+ ps.instance_ == _instance && ps.major_ == _major) {
+ send_subscribe(client_, ps.service_, ps.instance_,
+ ps.eventgroup_, ps.major_, ps.subscription_type_);
+ }
+ }
+}
+
+void routing_manager_base::remove_pending_subscription(service_t _service,
+ instance_t _instance) {
+ auto it = pending_subscriptions_.begin();
+ while (it != pending_subscriptions_.end()) {
+ if (it->service_ == _service
+ && it->instance_ == _instance) {
+ break;
+ }
+ it++;
+ }
+ if (it != pending_subscriptions_.end()) pending_subscriptions_.erase(it);
+}
+
+
} // namespace vsomeip