diff options
Diffstat (limited to 'implementation/routing/src/routing_manager_stub.cpp')
-rw-r--r-- | implementation/routing/src/routing_manager_stub.cpp | 340 |
1 files changed, 244 insertions, 96 deletions
diff --git a/implementation/routing/src/routing_manager_stub.cpp b/implementation/routing/src/routing_manager_stub.cpp index c048fe6..4146f59 100644 --- a/implementation/routing/src/routing_manager_stub.cpp +++ b/implementation/routing/src/routing_manager_stub.cpp @@ -95,8 +95,11 @@ void routing_manager_stub::start() { } void routing_manager_stub::stop() { - client_registration_running_ = false; - client_registration_condition_.notify_one(); + { + std::lock_guard<std::mutex> its_lock(client_registration_mutex_); + client_registration_running_ = false; + client_registration_condition_.notify_one(); + } if (client_registration_thread_->joinable()) { client_registration_thread_->join(); } @@ -134,7 +137,7 @@ void routing_manager_stub::stop() { { std::lock_guard<std::mutex> its_lock(routing_info_mutex_); - broadcast_routing_info(true); + broadcast_routing_stop(); } } @@ -308,14 +311,16 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size, sizeof(its_eventgroup)); std::memcpy(&its_major, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 6], sizeof(its_major)); - std::memcpy(&is_remote_subscriber, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 7], + std::memcpy(&its_event, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 7], + sizeof(its_event)); + std::memcpy(&is_remote_subscriber, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 9], sizeof(is_remote_subscriber)); - std::memcpy(&its_subscription_type, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 8], + std::memcpy(&its_subscription_type, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 10], sizeof(its_subscription_type)); if (configuration_->is_client_allowed(its_client, its_service, its_instance)) { - host_->subscribe(its_client, its_service, - its_instance, its_eventgroup, its_major, its_subscription_type); + host_->subscribe(its_client, its_service, its_instance, + its_eventgroup, its_major, its_event, its_subscription_type); } else { VSOMEIP_WARNING << "Security: Client " << std::hex << its_client << " subscribes to service/instance " @@ -335,8 +340,11 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size, sizeof(its_instance)); std::memcpy(&its_eventgroup, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 4], sizeof(its_eventgroup)); + std::memcpy(&its_event, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 6], + sizeof(its_event)); + host_->unsubscribe(its_client, its_service, - its_instance, its_eventgroup); + its_instance, its_eventgroup, its_event); break; case VSOMEIP_SUBSCRIBE_ACK: @@ -352,12 +360,16 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size, sizeof(its_eventgroup)); std::memcpy(&its_subscriber, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 6], sizeof(its_subscriber)); - host_->on_subscribe_ack(its_subscriber, its_service, its_instance, its_eventgroup); + std::memcpy(&its_event, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 8], + sizeof(its_event)); + host_->on_subscribe_ack(its_subscriber, its_service, + its_instance, its_eventgroup, its_event); VSOMEIP_INFO << "SUBSCRIBE ACK(" << std::hex << std::setw(4) << std::setfill('0') << its_client << "): [" << std::hex << std::setw(4) << std::setfill('0') << its_service << "." << std::hex << std::setw(4) << std::setfill('0') << its_instance << "." - << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup << "]"; + << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup << "." + << std::hex << std::setw(4) << std::setfill('0') << its_event << "]"; break; case VSOMEIP_SUBSCRIBE_NACK: @@ -373,12 +385,16 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size, sizeof(its_eventgroup)); std::memcpy(&its_subscriber, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 6], sizeof(its_subscriber)); - host_->on_subscribe_nack(its_subscriber, its_service, its_instance, its_eventgroup); + std::memcpy(&its_event, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 8], + sizeof(its_event)); + host_->on_subscribe_nack(its_subscriber, its_service, + its_instance, its_eventgroup, its_event); VSOMEIP_INFO << "SUBSCRIBE NACK(" << std::hex << std::setw(4) << std::setfill('0') << its_client << "): [" << std::hex << std::setw(4) << std::setfill('0') << its_service << "." << std::hex << std::setw(4) << std::setfill('0') << its_instance << "." - << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup << "]"; + << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup << "." + << std::hex << std::setw(4) << std::setfill('0') << its_event << "]"; break; case VSOMEIP_SEND: { @@ -488,6 +504,7 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size, std::memcpy(&its_instance, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 2], sizeof(its_instance)); + host_->release_service(its_client, its_service, its_instance); break; @@ -587,8 +604,6 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size, case VSOMEIP_REGISTERED_ACK: VSOMEIP_INFO << "REGISTERED_ACK(" << std::hex << std::setw(4) << std::setfill('0') << its_client << ")"; - std::lock_guard<std::mutex> its_guard(routing_info_mutex_); - broadcast_routing_info(false, its_client); break; } } @@ -660,12 +675,30 @@ void routing_manager_stub::client_registration_func(void) { // endpoint error to avoid writing in an already closed socket if (b != registration_type_e::DEREGISTER_ERROR_CASE) { std::lock_guard<std::mutex> its_guard(routing_info_mutex_); - send_routing_info(r.first); + send_routing_info_delta(r.first, + b == registration_type_e::REGISTER ? + routing_info_entry_e::RIE_ADD_CLIENT : + routing_info_entry_e::RIE_DEL_CLIENT, + r.first); } if (b != registration_type_e::REGISTER) { { std::lock_guard<std::mutex> its_guard(routing_info_mutex_); - broadcast_routing_info(false, r.first); + auto its_connection = connection_matrix_.find(r.first); + if (its_connection != connection_matrix_.end()) { + for (auto its_client : its_connection->second) { + if (its_client != r.first && + its_client != VSOMEIP_ROUTING_CLIENT) { + send_routing_info_delta(its_client, + routing_info_entry_e::RIE_DEL_CLIENT, r.first); + } + } + connection_matrix_.erase(r.first); + } + for (auto its_client : connection_matrix_) { + connection_matrix_[its_client.first].erase(r.first); + } + service_requests_.erase(r.first); } host_->remove_local(r.first); } @@ -764,7 +797,8 @@ void routing_manager_stub::on_offer_service(client_t _client, configuration_->is_offer_allowed(_client, _service, _instance)) { std::lock_guard<std::mutex> its_guard(routing_info_mutex_); routing_info_[_client].second[_service][_instance] = std::make_pair(_major, _minor); - broadcast_routing_info(); + inform_requesters(_client, _service, _instance, _major, _minor, + routing_info_entry_e::RIE_ADD_SERVICE_INSTANCE, true); } else { VSOMEIP_WARNING << std::hex << "Security: Client 0x" << _client << " isn't allow to offer the following service/instance " @@ -788,26 +822,30 @@ void routing_manager_stub::on_stop_offer_service(client_t _client, if (0 == found_service->second.size()) { found_client->second.second.erase(_service); } - broadcast_routing_info(); + inform_requesters(_client, _service, _instance, _major, _minor, + routing_info_entry_e::RIE_DEL_SERVICE_INSTANCE, false); } else if( _major == DEFAULT_MAJOR && _minor == DEFAULT_MINOR) { found_service->second.erase(_instance); if (0 == found_service->second.size()) { found_client->second.second.erase(_service); } - broadcast_routing_info(); + inform_requesters(_client, _service, _instance, _major, _minor, + routing_info_entry_e::RIE_DEL_SERVICE_INSTANCE, false); } } } } } -void routing_manager_stub::send_routing_info(client_t _client, bool _empty) { - std::shared_ptr<endpoint> its_endpoint = host_->find_local(_client); +void routing_manager_stub::send_routing_info_delta(client_t _target, + routing_info_entry_e _entry, + client_t _client, service_t _service, instance_t _instance, + major_version_t _major, minor_version_t _minor) { + std::shared_ptr<endpoint> its_endpoint = host_->find_local(_target); if (its_endpoint) { - // Create the command vector & reserve some bytes initially.. - // ..to avoid reallocation for smaller messages! + connection_matrix_[_target].insert(_client); + std::vector<byte_t> its_command; - its_command.reserve(routingCommandSize_); // Routing command its_command.push_back(VSOMEIP_ROUTING_INFO); @@ -825,77 +863,65 @@ void routing_manager_stub::send_routing_info(client_t _client, bool _empty) { its_command.push_back(size_placeholder); } - // Routing info loop - for (auto &info : routing_info_) { - if (_empty) { - its_command.push_back(0x0); - break; + // Routing Info State Change + for (uint32_t i = 0; i < sizeof(routing_info_entry_e); ++i) { + its_command.push_back( + reinterpret_cast<const byte_t*>(&_entry)[i]); + } + + std::size_t its_size_pos = its_command.size(); + std::size_t its_entry_size = its_command.size(); + + // Client size placeholder + byte_t placeholder = 0x0; + for (uint32_t i = 0; i < sizeof(uint32_t); ++i) { + its_command.push_back(placeholder); + } + // Client + for (uint32_t i = 0; i < sizeof(client_t); ++i) { + its_command.push_back( + reinterpret_cast<const byte_t*>(&_client)[i]); + } + + if (_entry == routing_info_entry_e::RIE_ADD_SERVICE_INSTANCE || + _entry == routing_info_entry_e::RIE_DEL_SERVICE_INSTANCE) { + //Service + uint32_t its_service_entry_size = uint32_t(sizeof(service_t) + + sizeof(instance_t) + sizeof(major_version_t) + sizeof(minor_version_t)); + for (uint32_t i = 0; i < sizeof(its_service_entry_size); ++i) { + its_command.push_back( + reinterpret_cast<const byte_t*>(&its_service_entry_size)[i]); } - std::size_t its_size_pos = its_command.size(); - std::size_t its_entry_size = its_command.size(); - // Client size placeholder - byte_t placeholder = 0x0; - for (uint32_t i = 0; i < sizeof(uint32_t); ++i) { - its_command.push_back(placeholder); + for (uint32_t i = 0; i < sizeof(service_t); ++i) { + its_command.push_back( + reinterpret_cast<const byte_t*>(&_service)[i]); } - // Client - for (uint32_t i = 0; i < sizeof(client_t); ++i) { - its_command.push_back( - reinterpret_cast<const byte_t*>(&info.first)[i]); + // Instance + for (uint32_t i = 0; i < sizeof(instance_t); ++i) { + its_command.push_back( + reinterpret_cast<const byte_t*>(&_instance)[i]); } - // Iterate over all services - for (auto &service : info.second.second) { - // Service entry size - uint32_t its_service_entry_size = uint32_t(sizeof(service_t) - + service.second.size() * (sizeof(instance_t) - + sizeof(major_version_t) + sizeof(minor_version_t))); - for (uint32_t i = 0; i < sizeof(its_service_entry_size); ++i) { - its_command.push_back( - reinterpret_cast<const byte_t*>(&its_service_entry_size)[i]); - } - // Service - for (uint32_t i = 0; i < sizeof(service_t); ++i) { - its_command.push_back( - reinterpret_cast<const byte_t*>(&service.first)[i]); - } - // Iterate over all instances - for (auto &instance : service.second) { - // Instance - for (uint32_t i = 0; i < sizeof(instance_t); ++i) { - its_command.push_back( - reinterpret_cast<const byte_t*>(&instance)[i]); - } - // Major version - for (uint32_t i = 0; i < sizeof(major_version_t); ++i) { - its_command.push_back( - reinterpret_cast<const byte_t*>(&instance.second.first)[i]); - } - // Minor version - for (uint32_t i = 0; i < sizeof(minor_version_t); ++i) { - its_command.push_back( - reinterpret_cast<const byte_t*>(&instance.second.second)[i]); - } - } + // Major version + for (uint32_t i = 0; i < sizeof(major_version_t); ++i) { + its_command.push_back( + reinterpret_cast<const byte_t*>(&_major)[i]); + } + // Minor version + for (uint32_t i = 0; i < sizeof(minor_version_t); ++i) { + its_command.push_back( + reinterpret_cast<const byte_t*>(&_minor)[i]); } - // File client size - its_entry_size = its_command.size() - its_entry_size - uint32_t(sizeof(uint32_t)); - std::memcpy(&its_command[its_size_pos], &its_entry_size, sizeof(uint32_t)); } + // File client size + its_entry_size = its_command.size() - its_entry_size - uint32_t(sizeof(uint32_t)); + std::memcpy(&its_command[its_size_pos], &its_entry_size, sizeof(uint32_t)); + // File overall size std::size_t its_size = its_command.size() - VSOMEIP_COMMAND_PAYLOAD_POS; - if (_empty) { - its_size = 1; // Indicates stopping routing! - } std::memcpy(&its_command[VSOMEIP_COMMAND_SIZE_POS_MIN], &its_size, sizeof(uint32_t)); its_size += VSOMEIP_COMMAND_PAYLOAD_POS; - // Double init size until it fits into the actual size for next run - size_t newInitSize; - for (newInitSize = VSOMEIP_ROUTING_INFO_SIZE_INIT; - newInitSize < its_size; newInitSize *= 2); - routingCommandSize_ = newInitSize; - #if 0 std::stringstream msg; msg << "rms::send_routing_info "; @@ -914,12 +940,70 @@ void routing_manager_stub::send_routing_info(client_t _client, bool _empty) { } } -void routing_manager_stub::broadcast_routing_info(bool _empty, client_t _ignore) { +void routing_manager_stub::inform_requesters(client_t _hoster, service_t _service, + instance_t _instance, major_version_t _major, minor_version_t _minor, + routing_info_entry_e _entry, bool _inform_service) { + for (auto its_client : service_requests_) { + auto its_service = its_client.second.find(_service); + if (its_service != its_client.second.end()) { + bool send(false); + for (auto its_instance : its_service->second) { + if (its_instance.first == ANY_INSTANCE || + its_instance.first == _instance) { + send = true; + } + } + if (send) { + if (_inform_service) { + if (_hoster != VSOMEIP_ROUTING_CLIENT && + _hoster != host_->get_client()) { + if (!is_already_connected(_hoster, its_client.first)) { + send_routing_info_delta(_hoster, + routing_info_entry_e::RIE_ADD_CLIENT, + its_client.first); + } + } + } + send_routing_info_delta(its_client.first, _entry, _hoster, + _service, _instance, _major, _minor); + } + } + } +} + +bool routing_manager_stub::is_already_connected(client_t _source, client_t _sink) { + return connection_matrix_[_source].find(_sink) != connection_matrix_[_source].end(); +} + +void routing_manager_stub::broadcast_routing_stop() { + std::vector<byte_t> its_command; + + // Routing command + its_command.push_back(VSOMEIP_ROUTING_INFO); + + // Sender client + client_t client = get_client(); + for (uint32_t i = 0; i < sizeof(client_t); ++i) { + its_command.push_back( + reinterpret_cast<const byte_t*>(&client)[i]); + } + + // Overall size ~> 1 indicates routing stop + uint32_t size = 0x1; + for (uint32_t i = 0; i < sizeof(uint32_t); ++i) { + its_command.push_back( + reinterpret_cast<const byte_t*>(&size)[i]); + } + + // Stop Placeholder + its_command.push_back(0x0); + for (auto& info : routing_info_) { - if (info.first != VSOMEIP_ROUTING_CLIENT && - info.first != host_->get_client() && - info.first != _ignore) { - send_routing_info(info.first, _empty); + if (info.first != VSOMEIP_ROUTING_CLIENT && info.first != host_->get_client()) { + std::shared_ptr<endpoint> its_endpoint = host_->find_local(info.first); + if (its_endpoint) { + its_endpoint->send(&its_command[0], uint32_t(its_command.size()), true); + } } } } @@ -939,7 +1023,8 @@ void routing_manager_stub::broadcast(const std::vector<byte_t> &_command) const void routing_manager_stub::send_subscribe(std::shared_ptr<vsomeip::endpoint> _target, client_t _client, service_t _service, instance_t _instance, - eventgroup_t _eventgroup, major_version_t _major, bool _is_remote_subscriber) { + eventgroup_t _eventgroup, major_version_t _major, + event_t _event, bool _is_remote_subscriber) { if (_target) { byte_t its_command[VSOMEIP_SUBSCRIBE_COMMAND_SIZE]; uint32_t its_size = VSOMEIP_SUBSCRIBE_COMMAND_SIZE @@ -956,10 +1041,12 @@ void routing_manager_stub::send_subscribe(std::shared_ptr<vsomeip::endpoint> _ta std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4], &_eventgroup, sizeof(_eventgroup)); its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6] = _major; - its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 7] = _is_remote_subscriber; + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 7], &_event, + sizeof(_event)); + its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 9] = _is_remote_subscriber; // set byte for subscription_type to zero. It's only used // in subscribe messages sent from rm_proxies to rm_stub. - its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 8] = 0x0; + its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 10] = 0x0; _target->send(its_command, sizeof(its_command)); } @@ -967,7 +1054,7 @@ void routing_manager_stub::send_subscribe(std::shared_ptr<vsomeip::endpoint> _ta void routing_manager_stub::send_unsubscribe(std::shared_ptr<vsomeip::endpoint> _target, client_t _client, service_t _service, instance_t _instance, - eventgroup_t _eventgroup, bool _is_remote_subscriber) { + eventgroup_t _eventgroup, event_t _event, bool _is_remote_subscriber) { if (_target) { byte_t its_command[VSOMEIP_UNSUBSCRIBE_COMMAND_SIZE]; uint32_t its_size = VSOMEIP_UNSUBSCRIBE_COMMAND_SIZE @@ -983,7 +1070,9 @@ void routing_manager_stub::send_unsubscribe(std::shared_ptr<vsomeip::endpoint> _ sizeof(_instance)); std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4], &_eventgroup, sizeof(_eventgroup)); - std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6], &_is_remote_subscriber, + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6], &_event, + sizeof(_event)); + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 8], &_is_remote_subscriber, sizeof(_is_remote_subscriber)); _target->send(its_command, sizeof(its_command)); @@ -991,7 +1080,7 @@ void routing_manager_stub::send_unsubscribe(std::shared_ptr<vsomeip::endpoint> _ } void routing_manager_stub::send_subscribe_ack(client_t _client, service_t _service, - instance_t _instance, eventgroup_t _eventgroup) { + instance_t _instance, eventgroup_t _eventgroup, event_t _event) { std::shared_ptr<endpoint> its_endpoint = host_->find_local(_client); if (its_endpoint) { @@ -1013,13 +1102,15 @@ void routing_manager_stub::send_subscribe_ack(client_t _client, service_t _servi sizeof(_eventgroup)); std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6], &_client, sizeof(_client)); + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 8], &_event, + sizeof(_event)); its_endpoint->send(&its_command[0], sizeof(its_command), true); } } void routing_manager_stub::send_subscribe_nack(client_t _client, service_t _service, - instance_t _instance, eventgroup_t _eventgroup) { + instance_t _instance, eventgroup_t _eventgroup, event_t _event) { std::shared_ptr<endpoint> its_endpoint = host_->find_local(_client); if (its_endpoint) { @@ -1041,6 +1132,8 @@ void routing_manager_stub::send_subscribe_nack(client_t _client, service_t _serv sizeof(_eventgroup)); std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6], &_client, sizeof(_client)); + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 8], &_event, + sizeof(_event)); its_endpoint->send(&its_command[0], sizeof(its_command), true); } @@ -1355,6 +1448,61 @@ client_t routing_manager_stub::get_client() const { return host_->get_client(); } +void routing_manager_stub::on_request_service(client_t _client, service_t _service, + instance_t _instance, major_version_t _major, + minor_version_t _minor) { + + std::lock_guard<std::mutex> its_guard(routing_info_mutex_); + service_requests_[_client][_service][_instance] = std::make_pair(_major, _minor); + + for (auto found_client : routing_info_) { + auto found_service = found_client.second.second.find(_service); + if (found_service != found_client.second.second.end()) { + if (_instance == ANY_INSTANCE) { + if (found_client.first != VSOMEIP_ROUTING_CLIENT && + found_client.first != host_->get_client()) { + if (!is_already_connected(found_client.first, _client)) { + send_routing_info_delta(found_client.first, + routing_info_entry_e::RIE_ADD_CLIENT, _client); + } + } + if (_client != VSOMEIP_ROUTING_CLIENT && + _client != host_->get_client()) { + for (auto instance : found_service->second) { + send_routing_info_delta(_client, + routing_info_entry_e::RIE_ADD_SERVICE_INSTANCE, + found_client.first, _service, instance.first, + instance.second.first, instance.second.second); + } + } + + break; + } else { + auto found_instance = found_service->second.find(_instance); + if (found_instance != found_service->second.end()) { + if (found_client.first != VSOMEIP_ROUTING_CLIENT && + found_client.first != host_->get_client()) { + if (!is_already_connected(found_client.first, _client)) { + send_routing_info_delta(found_client.first, + routing_info_entry_e::RIE_ADD_CLIENT, _client); + } + } + if (_client != VSOMEIP_ROUTING_CLIENT && + _client != host_->get_client()) { + send_routing_info_delta(_client, + routing_info_entry_e::RIE_ADD_SERVICE_INSTANCE, + found_client.first, _service, _instance, + found_instance->second.first, + found_instance->second.second); + } + + break; + } + } + } + } +} + #ifndef _WIN32 bool routing_manager_stub::check_credentials(client_t _client, uid_t _uid, gid_t _gid) { return configuration_->check_credentials(_client, _uid, _gid); |