summaryrefslogtreecommitdiff
path: root/implementation
diff options
context:
space:
mode:
Diffstat (limited to 'implementation')
-rw-r--r--implementation/configuration/include/client.hpp6
-rw-r--r--implementation/configuration/include/configuration.hpp7
-rw-r--r--implementation/configuration/include/configuration_impl.hpp27
-rw-r--r--implementation/configuration/include/internal.hpp.in9
-rw-r--r--implementation/configuration/include/internal_android.hpp9
-rw-r--r--implementation/configuration/src/configuration_impl.cpp309
-rw-r--r--implementation/endpoints/include/client_endpoint_impl.hpp4
-rw-r--r--implementation/endpoints/include/endpoint.hpp1
-rw-r--r--implementation/endpoints/include/endpoint_host.hpp1
-rw-r--r--implementation/endpoints/include/endpoint_impl.hpp1
-rw-r--r--implementation/endpoints/include/endpoint_manager_base.hpp1
-rw-r--r--implementation/endpoints/include/endpoint_manager_impl.hpp13
-rw-r--r--implementation/endpoints/include/local_client_endpoint_impl.hpp2
-rw-r--r--implementation/endpoints/include/local_server_endpoint_impl.hpp1
-rw-r--r--implementation/endpoints/include/server_endpoint_impl.hpp7
-rw-r--r--implementation/endpoints/include/tcp_client_endpoint_impl.hpp2
-rw-r--r--implementation/endpoints/include/tcp_server_endpoint_impl.hpp1
-rw-r--r--implementation/endpoints/include/udp_client_endpoint_impl.hpp7
-rw-r--r--implementation/endpoints/include/udp_server_endpoint_impl.hpp1
-rw-r--r--implementation/endpoints/include/virtual_server_endpoint_impl.hpp1
-rw-r--r--implementation/endpoints/src/client_endpoint_impl.cpp54
-rw-r--r--implementation/endpoints/src/endpoint_manager_base.cpp9
-rw-r--r--implementation/endpoints/src/endpoint_manager_impl.cpp190
-rw-r--r--implementation/endpoints/src/local_client_endpoint_impl.cpp13
-rw-r--r--implementation/endpoints/src/local_server_endpoint_impl.cpp4
-rw-r--r--implementation/endpoints/src/server_endpoint_impl.cpp39
-rw-r--r--implementation/endpoints/src/tcp_client_endpoint_impl.cpp247
-rw-r--r--implementation/endpoints/src/tcp_server_endpoint_impl.cpp51
-rw-r--r--implementation/endpoints/src/udp_client_endpoint_impl.cpp263
-rw-r--r--implementation/endpoints/src/udp_server_endpoint_impl.cpp121
-rw-r--r--implementation/endpoints/src/virtual_server_endpoint_impl.cpp4
-rw-r--r--implementation/logger/src/message.cpp3
-rw-r--r--implementation/message/src/deserializer.cpp12
-rw-r--r--implementation/routing/include/eventgroupinfo.hpp12
-rw-r--r--implementation/routing/include/remote_subscription.hpp3
-rw-r--r--implementation/routing/include/routing_manager_base.hpp8
-rw-r--r--implementation/routing/include/routing_manager_impl.hpp14
-rw-r--r--implementation/routing/include/routing_manager_proxy.hpp2
-rw-r--r--implementation/routing/include/routing_manager_stub.hpp10
-rw-r--r--implementation/routing/include/routing_manager_stub_host.hpp2
-rw-r--r--implementation/routing/src/event.cpp46
-rw-r--r--implementation/routing/src/eventgroupinfo.cpp167
-rw-r--r--implementation/routing/src/remote_subscription.cpp30
-rw-r--r--implementation/routing/src/routing_manager_base.cpp60
-rw-r--r--implementation/routing/src/routing_manager_impl.cpp361
-rw-r--r--implementation/routing/src/routing_manager_proxy.cpp109
-rw-r--r--implementation/routing/src/routing_manager_stub.cpp93
-rw-r--r--implementation/runtime/src/application_impl.cpp2
-rw-r--r--implementation/security/src/policy.cpp4
-rw-r--r--implementation/security/src/security_impl.cpp16
-rw-r--r--implementation/service_discovery/include/service_discovery.hpp1
-rw-r--r--implementation/service_discovery/include/service_discovery_host.hpp2
-rw-r--r--implementation/service_discovery/include/service_discovery_impl.hpp5
-rw-r--r--implementation/service_discovery/src/service_discovery_impl.cpp258
54 files changed, 2014 insertions, 611 deletions
diff --git a/implementation/configuration/include/client.hpp b/implementation/configuration/include/client.hpp
index 7758eca..872974f 100644
--- a/implementation/configuration/include/client.hpp
+++ b/implementation/configuration/include/client.hpp
@@ -16,16 +16,20 @@ namespace vsomeip_v3 {
namespace cfg {
struct client {
- client() : service_(ANY_SERVICE), instance_(ANY_INSTANCE) {}
+ client() : service_(ANY_SERVICE),
+ instance_(ANY_INSTANCE) {
+ }
// ports for specific service / instance
service_t service_;
instance_t instance_;
std::map<bool, std::set<uint16_t> > ports_;
+ std::map<bool, uint16_t> last_used_specific_client_port_;
// client port ranges mapped to remote port ranges
std::map<bool, std::pair<uint16_t, uint16_t> > remote_ports_;
std::map<bool, std::pair<uint16_t, uint16_t> > client_ports_;
+ std::map<bool, uint16_t> last_used_client_port_;
};
} // namespace cfg
diff --git a/implementation/configuration/include/configuration.hpp b/implementation/configuration/include/configuration.hpp
index a962298..e107e17 100644
--- a/implementation/configuration/include/configuration.hpp
+++ b/implementation/configuration/include/configuration.hpp
@@ -254,7 +254,7 @@ public:
virtual bool is_secure_service(service_t _service, instance_t _instance) const = 0;
- virtual std::uint32_t get_udp_receive_buffer_size() const = 0;
+ virtual int get_udp_receive_buffer_size() const = 0;
virtual bool check_routing_credentials(client_t _client, uint32_t _uid, uint32_t _gid) const = 0;
@@ -273,6 +273,11 @@ public:
virtual uint32_t get_statistics_interval() const = 0;
virtual uint32_t get_statistics_min_freq() const = 0;
virtual uint32_t get_statistics_max_messages() const = 0;
+
+ virtual uint8_t get_max_remote_subscribers() const = 0;
+
+ virtual partition_id_t get_partition_id(
+ service_t _service, instance_t _instance) const = 0;
};
} // namespace vsomeip_v3
diff --git a/implementation/configuration/include/configuration_impl.hpp b/implementation/configuration/include/configuration_impl.hpp
index 66cfc84..cb6c284 100644
--- a/implementation/configuration/include/configuration_impl.hpp
+++ b/implementation/configuration/include/configuration_impl.hpp
@@ -220,7 +220,7 @@ public:
VSOMEIP_EXPORT bool is_secure_service(service_t _service, instance_t _instance) const;
- VSOMEIP_EXPORT std::uint32_t get_udp_receive_buffer_size() const;
+ VSOMEIP_EXPORT int get_udp_receive_buffer_size() const;
VSOMEIP_EXPORT bool has_overlay(const std::string &_name) const;
VSOMEIP_EXPORT void load_overlay(const std::string &_name);
@@ -239,6 +239,11 @@ public:
VSOMEIP_EXPORT uint32_t get_statistics_min_freq() const;
VSOMEIP_EXPORT uint32_t get_statistics_max_messages() const;
+ VSOMEIP_EXPORT uint8_t get_max_remote_subscribers() const;
+
+ VSOMEIP_EXPORT partition_id_t get_partition_id(
+ service_t _service, instance_t _instance) const;
+
private:
void read_data(const std::set<std::string> &_input,
std::vector<configuration_element> &_elements,
@@ -355,6 +360,9 @@ private:
instance_t _instance, eventgroup_t _eventgroup) const;
bool find_port(uint16_t &_port, uint16_t _remote, bool _reliable,
std::map<bool, std::set<uint16_t> > &_used_client_ports) const;
+ bool find_specific_port(uint16_t &_port, service_t _service,
+ instance_t _instance, bool _reliable,
+ std::map<bool, std::set<uint16_t> > &_used_client_ports) const;
void set_magic_cookies_unicast_address();
@@ -379,6 +387,9 @@ private:
void load_secure_services(const configuration_element &_element);
void load_secure_service(const boost::property_tree::ptree &_tree);
+ void load_partitions(const configuration_element &_element);
+ void load_partition(const boost::property_tree::ptree &_tree);
+
private:
std::mutex mutex_;
@@ -514,7 +525,9 @@ protected:
ET_PLUGIN_TYPE,
ET_ROUTING_CREDENTIALS,
ET_SHUTDOWN_TIMEOUT,
- ET_MAX = 42
+ ET_MAX_REMOTE_SUBSCRIBERS,
+ ET_PARTITIONS,
+ ET_MAX = 44
};
bool is_configured_[ET_MAX];
@@ -552,7 +565,7 @@ protected:
bool has_issued_methods_warning_;
bool has_issued_clients_warning_;
- std::uint32_t udp_receive_buffer_size_;
+ int udp_receive_buffer_size_;
std::chrono::nanoseconds npdu_default_debounce_requ_;
std::chrono::nanoseconds npdu_default_debounce_resp_;
@@ -568,6 +581,14 @@ protected:
uint32_t statistics_interval_;
uint32_t statistics_min_freq_;
uint32_t statistics_max_messages_;
+ uint8_t max_remote_subscribers_;
+
+ mutable std::mutex partitions_mutex_;
+ std::map<service_t,
+ std::map<instance_t,
+ partition_id_t
+ >
+ > partitions_;
};
} // namespace cfg
diff --git a/implementation/configuration/include/internal.hpp.in b/implementation/configuration/include/internal.hpp.in
index cc21616..efd9ba0 100644
--- a/implementation/configuration/include/internal.hpp.in
+++ b/implementation/configuration/include/internal.hpp.in
@@ -88,6 +88,8 @@
#define VSOMEIP_DEFAULT_STATISTICS_MIN_FREQ 50
#define VSOMEIP_DEFAULT_STATISTICS_INTERVAL 10000
+#define VSOMEIP_DEFAULT_MAX_REMOTE_SUBSCRIBERS 3
+
#define VSOMEIP_MAX_WAIT_SENT 5
#define VSOMEIP_COMMAND_HEADER_SIZE 7
@@ -138,6 +140,9 @@
#define VSOMEIP_UPDATE_SECURITY_CREDENTIALS 0x27
#define VSOMEIP_DISTRIBUTE_SECURITY_POLICIES 0x28
#define VSOMEIP_UPDATE_SECURITY_POLICY_INT 0x29
+#define VSOMEIP_EXPIRED_SUBSCRIPTION 0x2A
+
+#define VSOMEIP_SUSPEND 0x30
#define VSOMEIP_SEND_COMMAND_SIZE 13
#define VSOMEIP_SEND_COMMAND_INSTANCE_POS_MIN 7
@@ -170,6 +175,7 @@
#define VSOMEIP_REGISTER_APPLICATION_COMMAND_SIZE 7
#define VSOMEIP_DEREGISTER_APPLICATION_COMMAND_SIZE 7
#define VSOMEIP_REGISTERED_ACK_COMMAND_SIZE 7
+#define VSOMEIP_EXPIRED_SUBSCRIPTION_COMMAND_SIZE 17
#ifndef _WIN32
@@ -233,6 +239,9 @@ enum class port_type_e {
PT_UNKNOWN
};
+typedef uint8_t partition_id_t;
+const partition_id_t VSOMEIP_DEFAULT_PARTITION_ID = 0;
+
} // namespace vsomeip_v3
#endif // VSOMEIP_V3_INTERNAL_HPP_
diff --git a/implementation/configuration/include/internal_android.hpp b/implementation/configuration/include/internal_android.hpp
index 25ecb4e..8ecd2b5 100644
--- a/implementation/configuration/include/internal_android.hpp
+++ b/implementation/configuration/include/internal_android.hpp
@@ -72,6 +72,8 @@
#define VSOMEIP_DEFAULT_STATISTICS_MIN_FREQ 50
#define VSOMEIP_DEFAULT_STATISTICS_INTERVAL 10000
+#define VSOMEIP_DEFAULT_MAX_REMOTE_SUBSCRIBERS 3
+
#define VSOMEIP_MAX_WAIT_SENT 5
#define VSOMEIP_COMMAND_HEADER_SIZE 7
@@ -122,6 +124,9 @@
#define VSOMEIP_UPDATE_SECURITY_CREDENTIALS 0x27
#define VSOMEIP_DISTRIBUTE_SECURITY_POLICIES 0x28
#define VSOMEIP_UPDATE_SECURITY_POLICY_INT 0x29
+#define VSOMEIP_EXPIRED_SUBSCRIPTION 0x2A
+
+#define VSOMEIP_SUSPEND 0x30
#define VSOMEIP_SEND_COMMAND_SIZE 13
#define VSOMEIP_SEND_COMMAND_INSTANCE_POS_MIN 7
@@ -154,6 +159,7 @@
#define VSOMEIP_REGISTER_APPLICATION_COMMAND_SIZE 7
#define VSOMEIP_DEREGISTER_APPLICATION_COMMAND_SIZE 7
#define VSOMEIP_REGISTERED_ACK_COMMAND_SIZE 7
+#define VSOMEIP_EXPIRED_SUBSCRIPTION_COMMAND_SIZE 17
#include <pthread.h>
@@ -214,6 +220,9 @@ enum class port_type_e {
PT_UNKNOWN
};
+typedef uint8_t partition_id_t;
+const partition_id_t VSOMEIP_DEFAULT_PARTITION_ID = 0;
+
} // namespace vsomeip_v3
#endif // VSOMEIP_V3_INTERNAL_HPP_
diff --git a/implementation/configuration/src/configuration_impl.cpp b/implementation/configuration/src/configuration_impl.cpp
index 0e9ca51..6fdbb67 100644
--- a/implementation/configuration/src/configuration_impl.cpp
+++ b/implementation/configuration/src/configuration_impl.cpp
@@ -94,7 +94,8 @@ configuration_impl::configuration_impl()
log_statistics_(true),
statistics_interval_(VSOMEIP_DEFAULT_STATISTICS_INTERVAL),
statistics_min_freq_(VSOMEIP_DEFAULT_STATISTICS_MIN_FREQ),
- statistics_max_messages_(VSOMEIP_DEFAULT_STATISTICS_MAX_MSG) {
+ statistics_max_messages_(VSOMEIP_DEFAULT_STATISTICS_MAX_MSG),
+ max_remote_subscribers_(VSOMEIP_DEFAULT_MAX_REMOTE_SUBSCRIBERS) {
unicast_ = unicast_.from_string(VSOMEIP_UNICAST_ADDRESS);
netmask_ = netmask_.from_string(VSOMEIP_NETMASK);
for (auto i = 0; i < ET_MAX; i++)
@@ -199,6 +200,7 @@ configuration_impl::configuration_impl(const configuration_impl &_other)
statistics_interval_ = _other.statistics_interval_;
statistics_min_freq_ = _other.statistics_min_freq_;
statistics_max_messages_ = _other.statistics_max_messages_;
+ max_remote_subscribers_ = _other.max_remote_subscribers_;
}
configuration_impl::~configuration_impl() {
@@ -510,6 +512,7 @@ bool configuration_impl::load_data(const std::vector<configuration_element> &_el
load_debounce(e);
load_acceptances(e);
load_secure_services(e);
+ load_partitions(e);
}
}
@@ -1320,6 +1323,24 @@ void configuration_impl::load_service_discovery(
load_ttl_factors(i->second, &ttl_factors_subscriptions_);
is_configured_[ET_SERVICE_DISCOVERY_TTL_FACTOR_SUBSCRIPTIONS] = true;
}
+ } else if (its_key == "max_remote_subscribers") {
+ if (!is_overlay_ && is_configured_[ET_MAX_REMOTE_SUBSCRIBERS]) {
+ VSOMEIP_WARNING << "Multiple definitions for service_discovery.max_remote_subscribers."
+ " Ignoring definition from " << _element.name_;
+ } else {
+ int tmp;
+ its_converter << its_value;
+ its_converter >> tmp;
+ max_remote_subscribers_ = (tmp > (std::numeric_limits<std::uint8_t>::max)()) ?
+ (std::numeric_limits<std::uint8_t>::max)() :
+ static_cast<std::uint8_t>(tmp);
+ if (max_remote_subscribers_ == 0) {
+ VSOMEIP_WARNING << "max_remote_subscribers_ = 0 is not allowed. Using default ("
+ << std::dec << VSOMEIP_DEFAULT_MAX_REMOTE_SUBSCRIBERS << ")";
+ max_remote_subscribers_ = VSOMEIP_DEFAULT_MAX_REMOTE_SUBSCRIBERS;
+ }
+ is_configured_[ET_MAX_REMOTE_SUBSCRIBERS] = true;
+ }
}
}
} catch (...) {
@@ -1832,6 +1853,10 @@ void configuration_impl::load_client(const boost::property_tree::ptree &_tree) {
its_client->remote_ports_[false] = std::make_pair(ILLEGAL_PORT, ILLEGAL_PORT);
its_client->client_ports_[true] = std::make_pair(ILLEGAL_PORT, ILLEGAL_PORT);
its_client->client_ports_[false] = std::make_pair(ILLEGAL_PORT, ILLEGAL_PORT);
+ its_client->last_used_specific_client_port_[true] = ILLEGAL_PORT;
+ its_client->last_used_specific_client_port_[false] = ILLEGAL_PORT;
+ its_client->last_used_client_port_[true] = ILLEGAL_PORT;
+ its_client->last_used_client_port_[false] = ILLEGAL_PORT;
for (auto i = _tree.begin(); i != _tree.end(); ++i) {
std::string its_key(i->first);
@@ -2128,6 +2153,92 @@ void configuration_impl::load_selective_broadcasts_support(const configuration_e
}
}
+
+void
+configuration_impl::load_partitions(const configuration_element &_element) {
+
+ try {
+ auto its_partitions = _element.tree_.get_child("partitions");
+ for (auto i = its_partitions.begin(); i != its_partitions.end(); ++i) {
+ load_partition(i->second);
+ }
+ } catch (...) {
+ }
+}
+
+void
+configuration_impl::load_partition(const boost::property_tree::ptree &_tree) {
+
+ static partition_id_t its_partition_id(VSOMEIP_DEFAULT_PARTITION_ID);
+
+ try {
+
+ std::stringstream its_converter;
+ std::map<service_t, std::set<instance_t> > its_partition_members;
+
+ for (auto i = _tree.begin(); i != _tree.end(); ++i) {
+ service_t its_service(0x0);
+ instance_t its_instance(0x0);
+ std::string its_service_s, its_instance_s;
+
+ for (auto j = i->second.begin(); j != i->second.end(); ++j) {
+ std::string its_key(j->first);
+ std::string its_data(j->second.data());
+
+ its_converter.str("");
+ its_converter.clear();
+
+ if (its_data.find("0x") != std::string::npos)
+ its_converter << std::hex;
+ else
+ its_converter << std::dec;
+ its_converter << its_data;
+
+ if (its_key == "service") {
+ its_converter >> its_service;
+ its_service_s = its_data;
+ } else if (its_key == "instance") {
+ its_converter >> its_instance;
+ its_instance_s = its_data;
+ }
+ }
+
+ if (its_service > 0 && its_instance > 0)
+ its_partition_members[its_service].insert(its_instance);
+ else
+ VSOMEIP_ERROR << "P: <" << its_service_s << "."
+ << its_instance_s << "> is no valid service instance.";
+
+ }
+
+ if (!its_partition_members.empty()) {
+ std::lock_guard<std::mutex> its_lock(partitions_mutex_);
+ its_partition_id++;
+
+ std::stringstream its_log;
+ its_log << "P"
+ << std::dec << static_cast<int>(its_partition_id)
+ << " [";
+
+ for (const auto &i : its_partition_members) {
+ for (const auto j : i.second) {
+ partitions_[i.first][j] = its_partition_id;
+ its_log << "<"
+ << std::setw(4) << std::setfill('0') << std::hex
+ << i.first << "."
+ << std::setw(4) << std::setfill('0') << std::hex
+ << j
+ << ">";
+ }
+ }
+
+ its_log << "]";
+ VSOMEIP_INFO << its_log.str();
+ }
+ } catch (...) {
+ }
+}
+
///////////////////////////////////////////////////////////////////////////////
// Internal helper
///////////////////////////////////////////////////////////////////////////////
@@ -2351,19 +2462,14 @@ bool configuration_impl::get_client_port(
std::map<bool, std::set<uint16_t> > &_used_client_ports,
uint16_t &_client_port) const {
bool is_configured(false);
-
_client_port = ILLEGAL_PORT;
- auto its_client = find_client(_service, _instance);
- // Check for service, instance specific port configuration
- if (its_client && !its_client->ports_[_reliable].empty()) {
+ uint16_t its_specific_port(ILLEGAL_PORT);
+ if (find_specific_port(its_specific_port, _service, _instance, _reliable, _used_client_ports)) {
is_configured = true;
- for (auto its_port : its_client->ports_[_reliable]) {
- // Found free configured port
- if (_used_client_ports[_reliable].find(its_port) == _used_client_ports[_reliable].end()) {
- _client_port = its_port;
- return true;
- }
+ if (its_specific_port != ILLEGAL_PORT) {
+ _client_port = its_specific_port;
+ return true;
}
}
@@ -2615,18 +2721,120 @@ bool configuration_impl::find_port(uint16_t &_port, uint16_t _remote, bool _reli
std::list<std::shared_ptr<client>>::const_iterator it;
for (it = clients_.begin(); it != clients_.end(); ++it) {
- if (is_in_port_range(_remote, (*it)->remote_ports_[_reliable])) {
+ if (_remote != ILLEGAL_PORT && is_in_port_range(_remote,
+ (*it)->remote_ports_[_reliable])) {
is_configured = true;
- for (uint16_t its_port = (*it)->client_ports_[_reliable].first;
- its_port <= (*it)->client_ports_[_reliable].second; its_port++ ) {
- if (_used_client_ports[_reliable].find(its_port) == _used_client_ports[_reliable].end()) {
+ uint16_t its_port(ILLEGAL_PORT);
+ if ((*it)->last_used_client_port_[_reliable] != ILLEGAL_PORT &&
+ is_in_port_range(((*it)->last_used_client_port_[_reliable])++,
+ (*it)->client_ports_[_reliable])) {
+ its_port = ((*it)->last_used_client_port_[_reliable])++;
+ } else {
+ // on initial start of port search
+ if ((*it)->last_used_client_port_[_reliable] == ILLEGAL_PORT) {
+ its_port = (*it)->client_ports_[_reliable].first;
+ } else {
+ continue;
+ }
+ }
+ while (its_port <= (*it)->client_ports_[_reliable].second) {
+ if (_used_client_ports[_reliable].find(its_port)
+ == _used_client_ports[_reliable].end()) {
+ _port = its_port;
+ (*it)->last_used_client_port_[_reliable] = its_port;
+ return true;
+ }
+ its_port++;
+ }
+ }
+ }
+ // no free port was found in _used_client_ports or last_used_port
+ // cannot be incremented for any client port range
+ // -> reset last used port for all available client port ranges
+ for (it = clients_.begin(); it != clients_.end(); ++it) {
+ if (_remote != ILLEGAL_PORT && is_in_port_range(_remote,
+ (*it)->remote_ports_[_reliable])) {
+ (*it)->last_used_client_port_[_reliable] = ILLEGAL_PORT;
+ }
+ }
+ // ensure that all configured client ports are checked from beginning
+ for (it = clients_.begin(); it != clients_.end(); ++it) {
+ if (_remote != ILLEGAL_PORT && is_in_port_range(_remote,
+ (*it)->remote_ports_[_reliable])) {
+ uint16_t its_port(ILLEGAL_PORT);
+ its_port = (*it)->client_ports_[_reliable].first;
+ while (its_port <= (*it)->client_ports_[_reliable].second) {
+ if (_used_client_ports[_reliable].find(its_port)
+ == _used_client_ports[_reliable].end()) {
_port = its_port;
+ (*it)->last_used_client_port_[_reliable] = its_port;
return true;
}
+ its_port++;
}
}
}
+ return is_configured;
+}
+
+bool configuration_impl::find_specific_port(uint16_t &_port, service_t _service,
+ instance_t _instance, bool _reliable,
+ std::map<bool, std::set<uint16_t> > &_used_client_ports) const {
+ bool is_configured(false);
+ bool check_all(false);
+ std::list<std::shared_ptr<client>>::const_iterator it;
+ auto its_client = find_client(_service, _instance);
+ // Check for service, instance specific port configuration
+ if (its_client && !its_client->ports_[_reliable].empty()) {
+ is_configured = true;
+ std::set<uint16_t>::const_iterator it;
+ if (its_client->last_used_specific_client_port_[_reliable] == ILLEGAL_PORT) {
+ it = its_client->ports_[_reliable].begin();
+ } else {
+ it = its_client->ports_[_reliable].find(
+ its_client->last_used_specific_client_port_[_reliable]);
+ auto it_next = std::next(it, 1);
+ if (it_next != its_client->ports_[_reliable].end()) {
+ check_all = true;
+ it = it_next;
+ } else {
+ it = its_client->ports_[_reliable].begin();
+ }
+ }
+ while (it != its_client->ports_[_reliable].end()) {
+ if (_used_client_ports[_reliable].find(*it)
+ == _used_client_ports[_reliable].end()) {
+ _port = *it;
+ its_client->last_used_specific_client_port_[_reliable] = *it;
+ VSOMEIP_INFO << "configuration_impl:find_specific_port #1:"
+ << " service: " << std::hex << _service
+ << " instance: " << _instance
+ << " reliable: " << std::dec << _reliable
+ << " return specific port: " << (uint32_t)_port;
+ return true;
+ }
+ ++it;
+ }
+ if (check_all) {
+ // no free port was found
+ // ensure that all configured client ports are checked from beginning
+ for (auto its_port : _used_client_ports[_reliable]) {
+ if (_used_client_ports[_reliable].find(its_port)
+ == _used_client_ports[_reliable].end()) {
+ _port = its_port;
+ its_client->last_used_specific_client_port_[_reliable] = its_port;
+ VSOMEIP_INFO << "configuration_impl:find_specific_port #2:"
+ << " service: " << std::hex << _service
+ << " instance: " << _instance
+ << " reliable: " << std::dec << _reliable
+ << " return specific port: " << (uint32_t)_port;
+ return true;
+ }
+ }
+ }
+ its_client->last_used_specific_client_port_[_reliable] = ILLEGAL_PORT;
+ }
return is_configured;
}
@@ -3414,23 +3622,54 @@ configuration_impl::load_acceptance_data(
// If optional was not set, use default!
if (!has_optional) {
const auto its_optional_client = boost::icl::interval<std::uint16_t>::closed(30491, 30499);
+ const auto its_optional_client_spare = boost::icl::interval<std::uint16_t>::closed(30898, 30998);
const auto its_optional_server = boost::icl::interval<std::uint16_t>::closed(30501, 30599);
its_ports.operator [](is_reliable).first.insert(its_optional_client);
+ its_ports.operator [](is_reliable).first.insert(its_optional_client_spare);
its_ports.operator [](is_reliable).first.insert(its_optional_server);
}
// If secure was not set, use default!
if (!has_secure) {
const auto its_secure_client = boost::icl::interval<std::uint16_t>::closed(32491, 32499);
+ const auto its_secure_client_spare = boost::icl::interval<std::uint16_t>::closed(32898, 32998);
const auto its_secure_server = boost::icl::interval<std::uint16_t>::closed(32501, 32599);
its_ports.operator [](is_reliable).second.insert(its_secure_client);
+ its_ports.operator [](is_reliable).second.insert(its_secure_client_spare);
its_ports.operator [](is_reliable).second.insert(its_secure_server);
}
}
}
+ // If no ports are specified, use default!
+ if (its_ports.empty()) {
+ const auto its_optional_client = boost::icl::interval<std::uint16_t>::closed(30491, 30499);
+ const auto its_optional_client_spare = boost::icl::interval<std::uint16_t>::closed(30898, 30998);
+ const auto its_optional_server = boost::icl::interval<std::uint16_t>::closed(30501, 30599);
+
+ // optional
+ its_ports.operator [](false).first.insert(its_optional_client);
+ its_ports.operator [](false).first.insert(its_optional_client_spare);
+ its_ports.operator [](false).first.insert(its_optional_server);
+ its_ports.operator [](true).first.insert(its_optional_client);
+ its_ports.operator [](true).first.insert(its_optional_client_spare);
+ its_ports.operator [](true).first.insert(its_optional_server);
+
+ // secure
+ const auto its_secure_client = boost::icl::interval<std::uint16_t>::closed(32491, 32499);
+ const auto its_secure_client_spare = boost::icl::interval<std::uint16_t>::closed(32898, 32998);
+ const auto its_secure_server = boost::icl::interval<std::uint16_t>::closed(32501, 32599);
+
+ its_ports.operator [](false).second.insert(its_secure_client);
+ its_ports.operator [](false).second.insert(its_secure_client_spare);
+ its_ports.operator [](false).second.insert(its_secure_server);
+ its_ports.operator [](true).second.insert(its_secure_client);
+ its_ports.operator [](true).second.insert(its_secure_client_spare);
+ its_ports.operator [](true).second.insert(its_secure_server);
+ }
+
if (!its_address.is_unspecified()) {
sd_acceptance_rules_.insert(
std::make_pair(its_address,
@@ -3572,8 +3811,7 @@ configuration_impl::load_udp_receive_buffer_size(const configuration_element &_e
} else {
const std::string s(_element.tree_.get_child(urbs).data());
try {
- udp_receive_buffer_size_ = static_cast<std::uint32_t>(std::stoul(
- s.c_str(), NULL, 10));
+ udp_receive_buffer_size_ = std::stoi(s.c_str(), NULL, 10);
} catch (const std::exception &e) {
VSOMEIP_ERROR<< __func__ << ": " << urbs << " " << e.what();
}
@@ -3774,8 +4012,11 @@ void configuration_impl::set_sd_acceptance_rule(
std::lock_guard<std::mutex> its_lock(sd_acceptance_required_ips_mutex_);
const auto its_optional_client = boost::icl::interval<std::uint16_t>::closed(30491, 30499);
+ const auto its_optional_client_spare = boost::icl::interval<std::uint16_t>::closed(30898, 30998);
const auto its_optional_server = boost::icl::interval<std::uint16_t>::closed(30501, 30599);
+
const auto its_secure_client = boost::icl::interval<std::uint16_t>::closed(32491, 32499);
+ const auto its_secure_client_spare = boost::icl::interval<std::uint16_t>::closed(32898, 32998);
const auto its_secure_server = boost::icl::interval<std::uint16_t>::closed(32501, 32599);
const bool rules_active = (sd_acceptance_rules_active_.find(_address)
@@ -3801,8 +4042,10 @@ void configuration_impl::set_sd_acceptance_rule(
(found_reliability->second.first.empty()
&& found_reliability->second.second.empty())) {
found_reliability->second.first.add(its_optional_client);
+ found_reliability->second.first.add(its_optional_client_spare);
found_reliability->second.first.add(its_optional_server);
found_reliability->second.second.add(its_secure_client);
+ found_reliability->second.second.add(its_secure_client_spare);
found_reliability->second.second.add(its_secure_server);
if (!rules_active) {
sd_acceptance_rules_active_.insert(_address);
@@ -3819,8 +4062,10 @@ void configuration_impl::set_sd_acceptance_rule(
}
} else {
found_reliability->second.first.erase(its_optional_client);
+ found_reliability->second.first.erase(its_optional_client_spare);
found_reliability->second.first.erase(its_optional_server);
found_reliability->second.second.erase(its_secure_client);
+ found_reliability->second.second.erase(its_secure_client_spare);
found_reliability->second.second.erase(its_secure_server);
if (found_reliability->second.first.empty()
&& found_reliability->second.second.empty()) {
@@ -3836,9 +4081,11 @@ void configuration_impl::set_sd_acceptance_rule(
} else if (_enable) {
boost::icl::interval_set<std::uint16_t> its_optional_default;
its_optional_default.add(its_optional_client);
+ its_optional_default.add(its_optional_client_spare);
its_optional_default.add(its_optional_server);
boost::icl::interval_set<std::uint16_t> its_secure_default;
its_secure_default.add(its_secure_client);
+ its_secure_default.add(its_secure_client_spare);
its_secure_default.add(its_secure_server);
found_address->second.second.emplace(
@@ -3857,9 +4104,11 @@ void configuration_impl::set_sd_acceptance_rule(
} else if (_enable) {
boost::icl::interval_set<std::uint16_t> its_optional_default;
its_optional_default.add(its_optional_client);
+ its_optional_default.add(its_optional_client_spare);
its_optional_default.add(its_optional_server);
boost::icl::interval_set<std::uint16_t> its_secure_default;
its_secure_default.add(its_secure_client);
+ its_secure_default.add(its_secure_client_spare);
its_secure_default.add(its_secure_server);
sd_acceptance_rules_.emplace(std::make_pair(_address,
@@ -3921,7 +4170,7 @@ bool configuration_impl::is_secure_service(service_t _service, instance_t _insta
return (false);
}
-std::uint32_t configuration_impl::get_udp_receive_buffer_size() const {
+int configuration_impl::get_udp_receive_buffer_size() const {
return udp_receive_buffer_size_;
}
@@ -4001,5 +4250,27 @@ uint32_t configuration_impl::get_statistics_max_messages() const {
return statistics_max_messages_;
}
-} // namespace config
+uint8_t configuration_impl::get_max_remote_subscribers() const {
+ return max_remote_subscribers_;
+}
+
+partition_id_t
+configuration_impl::get_partition_id(
+ service_t _service, instance_t _instance) const {
+
+ partition_id_t its_id(VSOMEIP_DEFAULT_PARTITION_ID);
+
+ std::lock_guard<std::mutex> its_lock(partitions_mutex_);
+ auto find_service = partitions_.find(_service);
+ if (find_service != partitions_.end()) {
+ auto find_instance = find_service->second.find(_instance);
+ if (find_instance != find_service->second.end()) {
+ its_id = find_instance->second;
+ }
+ }
+
+ return (its_id);
+}
+
+} // namespace cfg
} // namespace vsomeip_v3
diff --git a/implementation/endpoints/include/client_endpoint_impl.hpp b/implementation/endpoints/include/client_endpoint_impl.hpp
index 4411a50..518e696 100644
--- a/implementation/endpoints/include/client_endpoint_impl.hpp
+++ b/implementation/endpoints/include/client_endpoint_impl.hpp
@@ -70,6 +70,7 @@ public:
virtual std::uint16_t get_remote_port() const;
std::uint16_t get_local_port() const;
+ void set_local_port(uint16_t _port);
virtual bool is_reliable() const = 0;
size_t get_queue_size() const;
@@ -93,7 +94,8 @@ protected:
CONNECTED,
ESTABLISHED
};
- virtual void send_queued() = 0;
+ message_buffer_ptr_t get_front();
+ virtual void send_queued(message_buffer_ptr_t _buffer) = 0;
virtual void get_configured_times_from_endpoint(
service_t _service, method_t _method,
std::chrono::nanoseconds *_debouncing,
diff --git a/implementation/endpoints/include/endpoint.hpp b/implementation/endpoints/include/endpoint.hpp
index ccfe96b..3eafc3a 100644
--- a/implementation/endpoints/include/endpoint.hpp
+++ b/implementation/endpoints/include/endpoint.hpp
@@ -52,6 +52,7 @@ public:
virtual void remove_default_target(service_t _service) = 0;
virtual std::uint16_t get_local_port() const = 0;
+ virtual void set_local_port(uint16_t _port) = 0;
virtual bool is_reliable() const = 0;
virtual bool is_local() const = 0;
diff --git a/implementation/endpoints/include/endpoint_host.hpp b/implementation/endpoints/include/endpoint_host.hpp
index 61c012c..2af1697 100644
--- a/implementation/endpoints/include/endpoint_host.hpp
+++ b/implementation/endpoints/include/endpoint_host.hpp
@@ -29,6 +29,7 @@ public:
virtual void on_connect(std::shared_ptr<endpoint> _endpoint) = 0;
virtual void on_disconnect(std::shared_ptr<endpoint> _endpoint) = 0;
+ virtual bool on_bind_error(std::shared_ptr<endpoint> _endpoint, uint16_t _remote_port) = 0;
virtual void on_error(const byte_t *_data, length_t _length,
endpoint* const _receiver,
const boost::asio::ip::address &_remote_address,
diff --git a/implementation/endpoints/include/endpoint_impl.hpp b/implementation/endpoints/include/endpoint_impl.hpp
index 953201c..76f4698 100644
--- a/implementation/endpoints/include/endpoint_impl.hpp
+++ b/implementation/endpoints/include/endpoint_impl.hpp
@@ -43,6 +43,7 @@ public:
void remove_default_target(service_t);
virtual std::uint16_t get_local_port() const = 0;
+ virtual void set_local_port(uint16_t _port) = 0;
virtual bool is_reliable() const = 0;
void increment_use_count();
diff --git a/implementation/endpoints/include/endpoint_manager_base.hpp b/implementation/endpoints/include/endpoint_manager_base.hpp
index 2a766f9..aa21269 100644
--- a/implementation/endpoints/include/endpoint_manager_base.hpp
+++ b/implementation/endpoints/include/endpoint_manager_base.hpp
@@ -50,6 +50,7 @@ public:
// endpoint_host interface
virtual void on_connect(std::shared_ptr<endpoint> _endpoint);
virtual void on_disconnect(std::shared_ptr<endpoint> _endpoint);
+ virtual bool on_bind_error(std::shared_ptr<endpoint> _endpoint, uint16_t _remote_port);
virtual void on_error(const byte_t *_data, length_t _length,
endpoint* const _receiver,
const boost::asio::ip::address &_remote_address,
diff --git a/implementation/endpoints/include/endpoint_manager_impl.hpp b/implementation/endpoints/include/endpoint_manager_impl.hpp
index a7d28c6..3354947 100644
--- a/implementation/endpoints/include/endpoint_manager_impl.hpp
+++ b/implementation/endpoints/include/endpoint_manager_impl.hpp
@@ -82,6 +82,8 @@ public:
// endpoint_host interface
void on_connect(std::shared_ptr<endpoint> _endpoint);
void on_disconnect(std::shared_ptr<endpoint> _endpoint);
+ bool on_bind_error(std::shared_ptr<endpoint> _endpoint,
+ std::uint16_t _remote_port);
void on_error(const byte_t *_data, length_t _length,
endpoint* const _receiver,
const boost::asio::ip::address &_remote_address,
@@ -113,8 +115,15 @@ private:
std::map<bool, std::shared_ptr<endpoint>>>> remote_services_t;
remote_services_t remote_services_;
- typedef std::map<boost::asio::ip::address, std::map<uint16_t,
- std::map<bool, std::shared_ptr<endpoint>>>> client_endpoints_by_ip_t;
+ typedef std::map<boost::asio::ip::address,
+ std::map<uint16_t,
+ std::map<bool,
+ std::map<partition_id_t,
+ std::shared_ptr<endpoint>
+ >
+ >
+ >
+ > client_endpoints_by_ip_t;
client_endpoints_by_ip_t client_endpoints_by_ip_;
std::map<service_t, std::map<endpoint *, instance_t> > service_instances_;
diff --git a/implementation/endpoints/include/local_client_endpoint_impl.hpp b/implementation/endpoints/include/local_client_endpoint_impl.hpp
index 0e81bf1..3eae191 100644
--- a/implementation/endpoints/include/local_client_endpoint_impl.hpp
+++ b/implementation/endpoints/include/local_client_endpoint_impl.hpp
@@ -62,7 +62,7 @@ public:
std::chrono::nanoseconds *_debouncing,
std::chrono::nanoseconds *_maximum_retention) const;
private:
- void send_queued();
+ void send_queued(message_buffer_ptr_t _buffer);
void send_magic_cookie();
diff --git a/implementation/endpoints/include/local_server_endpoint_impl.hpp b/implementation/endpoints/include/local_server_endpoint_impl.hpp
index 965f03c..8fbb619 100644
--- a/implementation/endpoints/include/local_server_endpoint_impl.hpp
+++ b/implementation/endpoints/include/local_server_endpoint_impl.hpp
@@ -83,6 +83,7 @@ public:
bool is_reliable() const;
std::uint16_t get_local_port() const;
+ void set_local_port(std::uint16_t _port);
client_t assign_client(const byte_t *_data, uint32_t _size);
diff --git a/implementation/endpoints/include/server_endpoint_impl.hpp b/implementation/endpoints/include/server_endpoint_impl.hpp
index 193043c..dfa22e6 100644
--- a/implementation/endpoints/include/server_endpoint_impl.hpp
+++ b/implementation/endpoints/include/server_endpoint_impl.hpp
@@ -59,6 +59,7 @@ public:
virtual bool is_reliable() const = 0;
virtual std::uint16_t get_local_port() const = 0;
+ virtual void set_local_port(uint16_t _port) = 0;
public:
void connect_cbk(boost::system::error_code const &_error);
@@ -98,8 +99,10 @@ protected:
protected:
queue_type queues_;
- std::mutex clients_mutex_;
- std::map<client_t, std::map<session_t, endpoint_type> > clients_;
+ std::mutex requests_mutex_;
+ std::map<client_t,
+ std::map<std::tuple<session_t, service_t, instance_t>, endpoint_type>
+ > requests_;
std::map<endpoint_type, std::shared_ptr<train>> trains_;
diff --git a/implementation/endpoints/include/tcp_client_endpoint_impl.hpp b/implementation/endpoints/include/tcp_client_endpoint_impl.hpp
index e8c2c9a..137571c 100644
--- a/implementation/endpoints/include/tcp_client_endpoint_impl.hpp
+++ b/implementation/endpoints/include/tcp_client_endpoint_impl.hpp
@@ -42,7 +42,7 @@ public:
void send_cbk(boost::system::error_code const &_error, std::size_t _bytes,
const message_buffer_ptr_t& _sent_msg);
private:
- void send_queued();
+ void send_queued(message_buffer_ptr_t _buffer);
void get_configured_times_from_endpoint(
service_t _service, method_t _method,
std::chrono::nanoseconds *_debouncing,
diff --git a/implementation/endpoints/include/tcp_server_endpoint_impl.hpp b/implementation/endpoints/include/tcp_server_endpoint_impl.hpp
index af9a724..bf0e1b9 100644
--- a/implementation/endpoints/include/tcp_server_endpoint_impl.hpp
+++ b/implementation/endpoints/include/tcp_server_endpoint_impl.hpp
@@ -52,6 +52,7 @@ public:
bool get_default_target(service_t, endpoint_type &) const;
std::uint16_t get_local_port() const;
+ void set_local_port(uint16_t _port);
bool is_reliable() const;
bool is_local() const;
diff --git a/implementation/endpoints/include/udp_client_endpoint_impl.hpp b/implementation/endpoints/include/udp_client_endpoint_impl.hpp
index 05c63c4..3a3fdcb 100644
--- a/implementation/endpoints/include/udp_client_endpoint_impl.hpp
+++ b/implementation/endpoints/include/udp_client_endpoint_impl.hpp
@@ -47,8 +47,11 @@ public:
bool is_local() const;
void print_status();
bool is_reliable() const;
+
+ void send_cbk(boost::system::error_code const &_error, std::size_t _bytes,
+ const message_buffer_ptr_t &_sent_msg);
private:
- void send_queued();
+ void send_queued(message_buffer_ptr_t _buffer);
void get_configured_times_from_endpoint(
service_t _service, method_t _method,
std::chrono::nanoseconds *_debouncing,
@@ -66,7 +69,7 @@ private:
private:
const boost::asio::ip::address remote_address_;
const std::uint16_t remote_port_;
- const std::uint32_t udp_receive_buffer_size_;
+ int udp_receive_buffer_size_;
std::shared_ptr<tp::tp_reassembler> tp_reassembler_;
};
diff --git a/implementation/endpoints/include/udp_server_endpoint_impl.hpp b/implementation/endpoints/include/udp_server_endpoint_impl.hpp
index 4ceefb6..fef09dc 100644
--- a/implementation/endpoints/include/udp_server_endpoint_impl.hpp
+++ b/implementation/endpoints/include/udp_server_endpoint_impl.hpp
@@ -57,6 +57,7 @@ public:
bool get_default_target(service_t _service, endpoint_type &_target) const;
std::uint16_t get_local_port() const;
+ void set_local_port(uint16_t _port);
bool is_local() const;
void print_status();
diff --git a/implementation/endpoints/include/virtual_server_endpoint_impl.hpp b/implementation/endpoints/include/virtual_server_endpoint_impl.hpp
index 38db284..adf3972 100644
--- a/implementation/endpoints/include/virtual_server_endpoint_impl.hpp
+++ b/implementation/endpoints/include/virtual_server_endpoint_impl.hpp
@@ -50,6 +50,7 @@ public:
bool get_remote_address(boost::asio::ip::address &_address) const;
std::uint16_t get_local_port() const;
+ void set_local_port(uint16_t _port);
std::uint16_t get_remote_port() const;
bool is_reliable() const;
bool is_local() const;
diff --git a/implementation/endpoints/src/client_endpoint_impl.cpp b/implementation/endpoints/src/client_endpoint_impl.cpp
index 9b31cc1..66b3138 100644
--- a/implementation/endpoints/src/client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/client_endpoint_impl.cpp
@@ -120,9 +120,24 @@ void client_endpoint_impl<Protocol>::stop() {
connect_timer_.cancel(ec);
}
connect_timeout_ = VSOMEIP_DEFAULT_CONNECT_TIMEOUT;
- shutdown_and_close_socket(false);
+
+ // bind to strand as stop() might be called from different thread
+ strand_.dispatch(std::bind(&client_endpoint_impl::shutdown_and_close_socket,
+ this->shared_from_this(),
+ false)
+ );
+}
+
+template<typename Protocol>
+message_buffer_ptr_t client_endpoint_impl<Protocol>::get_front() {
+ message_buffer_ptr_t its_buffer;
+ if (queue_.size())
+ its_buffer = queue_.front();
+
+ return (its_buffer);
}
+
template<typename Protocol>
bool client_endpoint_impl<Protocol>::send_to(
const std::shared_ptr<endpoint_definition> _target, const byte_t *_data,
@@ -317,7 +332,11 @@ void client_endpoint_impl<Protocol>::send_segments(
// respect minimal debounce time
wait_until_debounce_time_reached();
// ignore retention time and send immediately as the train is full anyway
- send_queued();
+ auto its_buffer = get_front();
+ if (its_buffer) {
+ strand_.dispatch(std::bind(&client_endpoint_impl::send_queued,
+ this->shared_from_this(), its_buffer));
+ }
}
train_.last_departure_ = std::chrono::steady_clock::now();
}
@@ -397,8 +416,10 @@ void client_endpoint_impl<Protocol>::connect_cbk(
if (was_not_connected_) {
was_not_connected_ = false;
std::lock_guard<std::mutex> its_lock(mutex_);
- if (queue_.size() > 0) {
- send_queued();
+ auto its_buffer = get_front();
+ if (its_buffer) {
+ strand_.dispatch(std::bind(&client_endpoint_impl::send_queued,
+ this->shared_from_this(), its_buffer));
VSOMEIP_WARNING << __func__ << ": resume sending to: "
<< get_remote_information();
}
@@ -415,7 +436,9 @@ template<typename Protocol>
void client_endpoint_impl<Protocol>::wait_connect_cbk(
boost::system::error_code const &_error) {
if (!_error && !client_endpoint_impl<Protocol>::sending_blocked_) {
- connect();
+ auto self = this->shared_from_this();
+ strand_.dispatch(std::bind(&client_endpoint_impl::connect,
+ this->shared_from_this()));
}
}
@@ -429,7 +452,9 @@ void client_endpoint_impl<Protocol>::send_cbk(
if (queue_.size() > 0) {
queue_size_ -= queue_.front()->size();
queue_.pop_front();
- send_queued();
+ auto its_buffer = get_front();
+ if (its_buffer)
+ send_queued(its_buffer);
}
} else if (_error == boost::asio::error::broken_pipe) {
state_ = cei_state_e::CLOSED;
@@ -475,7 +500,8 @@ void client_endpoint_impl<Protocol>::send_cbk(
}
was_not_connected_ = true;
shutdown_and_close_socket(true);
- connect();
+ strand_.dispatch(std::bind(&client_endpoint_impl::connect,
+ this->shared_from_this()));
} else if (_error == boost::asio::error::not_connected
|| _error == boost::asio::error::bad_descriptor
|| _error == boost::asio::error::no_permission) {
@@ -490,7 +516,8 @@ void client_endpoint_impl<Protocol>::send_cbk(
}
was_not_connected_ = true;
shutdown_and_close_socket(true);
- connect();
+ strand_.dispatch(std::bind(&client_endpoint_impl::connect,
+ this->shared_from_this()));
} else if (_error == boost::asio::error::operation_aborted) {
VSOMEIP_WARNING << "cei::send_cbk received error: " << _error.message();
// endpoint was stopped
@@ -585,6 +612,11 @@ std::uint16_t client_endpoint_impl<Protocol>::get_local_port() const {
}
template<typename Protocol>
+void client_endpoint_impl<Protocol>::set_local_port(uint16_t _port) {
+ local_port_ = _port;
+}
+
+template<typename Protocol>
void client_endpoint_impl<Protocol>::start_connect_timer() {
std::lock_guard<std::mutex> its_lock(connect_timer_mutex_);
connect_timer_.expires_from_now(
@@ -665,7 +697,11 @@ void client_endpoint_impl<Protocol>::queue_train(bool _queue_size_zero_on_entry)
queue_size_ += train_.buffer_->size();
train_.buffer_ = std::make_shared<message_buffer_t>();
if (_queue_size_zero_on_entry && !queue_.empty()) { // no writing in progress
- send_queued();
+ auto its_buffer = get_front();
+ if (its_buffer) {
+ strand_.dispatch(std::bind(&client_endpoint_impl::send_queued,
+ this->shared_from_this(), its_buffer));
+ }
}
}
diff --git a/implementation/endpoints/src/endpoint_manager_base.cpp b/implementation/endpoints/src/endpoint_manager_base.cpp
index a79ae48..cfddc87 100644
--- a/implementation/endpoints/src/endpoint_manager_base.cpp
+++ b/implementation/endpoints/src/endpoint_manager_base.cpp
@@ -123,6 +123,13 @@ void endpoint_manager_base::on_disconnect(std::shared_ptr<endpoint> _endpoint) {
rm_->on_disconnect(_endpoint);
}
+bool endpoint_manager_base::on_bind_error(std::shared_ptr<endpoint> _endpoint, uint16_t _remote_port) {
+ (void)_endpoint;
+ (void)_remote_port;
+ return true;
+ // intentionally left blank
+}
+
void endpoint_manager_base::on_error(
const byte_t *_data, length_t _length, endpoint* const _receiver,
const boost::asio::ip::address &_remote_address,
@@ -158,7 +165,7 @@ endpoint_manager_base::log_client_states() const {
{
std::lock_guard<std::mutex> its_lock(local_endpoint_mutex_);
- for (const auto& e : local_endpoints_) {
+ for (const auto &e : local_endpoints_) {
size_t its_queue_size = e.second->get_queue_size();
if (its_queue_size > VSOMEIP_DEFAULT_QUEUE_WARN_SIZE) {
its_client_queue_sizes.push_back(
diff --git a/implementation/endpoints/src/endpoint_manager_impl.cpp b/implementation/endpoints/src/endpoint_manager_impl.cpp
index a82a173..dbb2107 100644
--- a/implementation/endpoints/src/endpoint_manager_impl.cpp
+++ b/implementation/endpoints/src/endpoint_manager_impl.cpp
@@ -328,8 +328,10 @@ bool endpoint_manager_impl::remove_server_endpoint(uint16_t _port, bool _reliabl
return ret;
}
-void endpoint_manager_impl::clear_client_endpoints(service_t _service, instance_t _instance,
- bool _reliable) {
+void
+endpoint_manager_impl::clear_client_endpoints(
+ service_t _service, instance_t _instance, bool _reliable) {
+
std::shared_ptr<endpoint> endpoint_to_delete;
bool other_services_reachable_through_endpoint(false);
{
@@ -371,8 +373,12 @@ void endpoint_manager_impl::clear_client_endpoints(service_t _service, instance_
}
if (!other_services_reachable_through_endpoint) {
- std::uint16_t its_port(0);
+ partition_id_t its_partition;
boost::asio::ip::address its_address;
+ std::uint16_t its_port(0);
+
+ its_partition = configuration_->get_partition_id(_service, _instance);
+
if (_reliable) {
std::shared_ptr<tcp_client_endpoint_impl> ep =
std::dynamic_pointer_cast<tcp_client_endpoint_impl>(endpoint_to_delete);
@@ -392,15 +398,21 @@ void endpoint_manager_impl::clear_client_endpoints(service_t _service, instance_
if (found_ip != client_endpoints_by_ip_.end()) {
const auto found_port = found_ip->second.find(its_port);
if (found_port != found_ip->second.end()) {
- const auto found_reliable = found_port->second.find(_reliable);
+ auto found_reliable = found_port->second.find(_reliable);
if (found_reliable != found_port->second.end()) {
- if (found_reliable->second == endpoint_to_delete) {
- found_port->second.erase(_reliable);
- // delete if necessary
- if (!found_port->second.size()) {
- found_ip->second.erase(found_port);
- if (!found_ip->second.size()) {
- client_endpoints_by_ip_.erase(found_ip);
+ const auto found_partition = found_reliable->second.find(its_partition);
+ if (found_partition != found_reliable->second.end()) {
+ if (found_partition->second == endpoint_to_delete) {
+ found_reliable->second.erase(its_partition);
+ // delete if necessary
+ if (0 == found_reliable->second.size()) {
+ found_port->second.erase(_reliable);
+ if (0 == found_port->second.size()) {
+ found_ip->second.erase(found_port);
+ if (0 == found_ip->second.size()) {
+ client_endpoints_by_ip_.erase(found_ip);
+ }
+ }
}
}
}
@@ -455,8 +467,14 @@ void endpoint_manager_impl::find_or_create_multicast_endpoint(
std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
service_instances_multicast_[_service][_sender] = _instance;
}
- dynamic_cast<udp_server_endpoint_impl*>(its_endpoint.get())->join_unlocked(
- _address.to_string());
+
+ auto its_udp_server_endpoint
+ = std::dynamic_pointer_cast<udp_server_endpoint_impl>(its_endpoint);
+ if (_port != configuration_->get_sd_port()) {
+ its_udp_server_endpoint->join(_address.to_string());
+ } else {
+ its_udp_server_endpoint->join_unlocked(_address.to_string());
+ }
} else {
VSOMEIP_ERROR <<"Could not find/create multicast endpoint!";
}
@@ -540,11 +558,13 @@ void endpoint_manager_impl::print_status() const {
VSOMEIP_INFO << "status start remote client endpoints:";
std::uint32_t num_remote_client_endpoints(0);
// normal endpoints
- for (const auto &a : client_endpoints_by_ip) {
- for (const auto& p : a.second) {
- for (const auto& ru : p.second) {
- ru.second->print_status();
- num_remote_client_endpoints++;
+ for (const auto &its_address : client_endpoints_by_ip) {
+ for (const auto &its_port : its_address.second) {
+ for (const auto &its_reliability : its_port.second) {
+ for (const auto &its_partition : its_reliability.second) {
+ its_partition.second->print_status();
+ num_remote_client_endpoints++;
+ }
}
}
}
@@ -800,6 +820,37 @@ void endpoint_manager_impl::on_disconnect(std::shared_ptr<endpoint> _endpoint) {
}
}
+bool endpoint_manager_impl::on_bind_error(std::shared_ptr<endpoint> _endpoint, std::uint16_t _remote_port) {
+ std::lock_guard<std::recursive_mutex> its_ep_lock(endpoint_mutex_);
+ for (auto &its_service : remote_services_) {
+ for (auto &its_instance : its_service.second) {
+ const bool is_reliable = _endpoint->is_reliable();
+ auto found_endpoint = its_instance.second.find(is_reliable);
+ if (found_endpoint != its_instance.second.end()) {
+ if (found_endpoint->second == _endpoint) {
+ // get a new client port using service / instance / remote port
+ uint16_t its_old_local_port = _endpoint->get_local_port();
+ uint16_t its_new_local_port(ILLEGAL_PORT);
+
+ std::unique_lock<std::mutex> its_lock(used_client_ports_mutex_);
+ if (configuration_->get_client_port(its_service.first,
+ its_instance.first,
+ _remote_port,
+ is_reliable,
+ used_client_ports_,
+ its_new_local_port)) {
+ _endpoint->set_local_port(its_new_local_port);
+ its_lock.unlock();
+ release_port(its_old_local_port, _endpoint->is_reliable());
+ return true;
+ }
+ }
+ }
+ }
+ }
+ return false;
+}
+
void endpoint_manager_impl::on_error(
const byte_t *_data, length_t _length, endpoint* const _receiver,
const boost::asio::ip::address &_remote_address,
@@ -820,8 +871,10 @@ void endpoint_manager_impl::release_port(uint16_t _port, bool _reliable) {
used_client_ports_[_reliable].erase(_port);
}
-std::shared_ptr<endpoint> endpoint_manager_impl::find_remote_client(
+std::shared_ptr<endpoint>
+endpoint_manager_impl::find_remote_client(
service_t _service, instance_t _instance, bool _reliable) {
+
std::shared_ptr<endpoint> its_endpoint;
auto found_service = remote_services_.find(_service);
if (found_service != remote_services_.end()) {
@@ -837,35 +890,46 @@ std::shared_ptr<endpoint> endpoint_manager_impl::find_remote_client(
return its_endpoint;
}
- // If another service is hosted on the same server_endpoint
- // reuse the existing client_endpoint.
+ // Endpoint did not yet exist. Get the partition id to check
+ // whether the client endpoint for the partition does exist.
+ partition_id_t its_partition_id
+ = configuration_->get_partition_id(_service, _instance);
+
+ // If another service within the same partition is hosted on the
+ // same server_endpoint reuse the existing client_endpoint.
auto found_service_info = remote_service_info_.find(_service);
- if(found_service_info != remote_service_info_.end()) {
+ if (found_service_info != remote_service_info_.end()) {
auto found_instance = found_service_info->second.find(_instance);
- if(found_instance != found_service_info->second.end()) {
+ if (found_instance != found_service_info->second.end()) {
auto found_reliable = found_instance->second.find(_reliable);
- if(found_reliable != found_instance->second.end()) {
- std::shared_ptr<endpoint_definition> its_ep_def =
- found_reliable->second;
+ if (found_reliable != found_instance->second.end()) {
+ std::shared_ptr<endpoint_definition> its_ep_def
+ = found_reliable->second;
auto found_address = client_endpoints_by_ip_.find(
its_ep_def->get_address());
- if(found_address != client_endpoints_by_ip_.end()) {
+ if (found_address != client_endpoints_by_ip_.end()) {
auto found_port = found_address->second.find(
its_ep_def->get_remote_port());
- if(found_port != found_address->second.end()) {
- auto found_reliable2 = found_port->second.find(
- _reliable);
- if(found_reliable2 != found_port->second.end()) {
- its_endpoint = found_reliable2->second;
- // store the endpoint under this service/instance id
- // as well - needed for later cleanup
- remote_services_[_service][_instance][_reliable] =
- its_endpoint;
- service_instances_[_service][its_endpoint.get()] = _instance;
- // add endpoint to serviceinfo object
- auto found_service_info = rm_->find_service(_service,_instance);
- if (found_service_info) {
- found_service_info->set_endpoint(its_endpoint, _reliable);
+ if (found_port != found_address->second.end()) {
+ auto found_reliable2
+ = found_port->second.find(_reliable);
+ if (found_reliable2 != found_port->second.end()) {
+ auto found_partition
+ = found_reliable2->second.find(its_partition_id);
+ if (found_partition != found_reliable2->second.end()) {
+ its_endpoint = found_partition->second;
+
+ // store the endpoint under this service/instance id
+ // as well - needed for later cleanup
+ remote_services_[_service][_instance][_reliable]
+ = its_endpoint;
+ service_instances_[_service][its_endpoint.get()] = _instance;
+
+ // add endpoint to serviceinfo object
+ auto found_service_info = rm_->find_service(_service,_instance);
+ if (found_service_info) {
+ found_service_info->set_endpoint(its_endpoint, _reliable);
+ }
}
}
}
@@ -873,7 +937,8 @@ std::shared_ptr<endpoint> endpoint_manager_impl::find_remote_client(
}
}
}
- return its_endpoint;
+
+ return (its_endpoint);
}
std::shared_ptr<endpoint> endpoint_manager_impl::create_remote_client(
@@ -910,6 +975,8 @@ std::shared_ptr<endpoint> endpoint_manager_impl::create_remote_client(
}
if (its_endpoint) {
+ partition_id_t its_partition
+ = configuration_->get_partition_id(_service, _instance);
used_client_ports_[_reliable].insert(its_local_port);
its_lock.unlock();
service_instances_[_service][its_endpoint.get()] = _instance;
@@ -917,12 +984,19 @@ std::shared_ptr<endpoint> endpoint_manager_impl::create_remote_client(
client_endpoints_by_ip_[its_endpoint_def->get_address()]
[its_endpoint_def->get_port()]
- [_reliable] = its_endpoint;
+ [_reliable]
+ [its_partition]= its_endpoint;
// Set the basic route to the service in the service info
auto found_service_info = rm_->find_service(_service, _instance);
if (found_service_info) {
found_service_info->set_endpoint(its_endpoint, _reliable);
}
+ boost::system::error_code ec;
+ VSOMEIP_INFO << "endpoint_manager_impl::create_remote_client: "
+ << its_endpoint_def->get_address().to_string(ec)
+ << ":" << std::dec << its_endpoint_def->get_port()
+ << " reliable: " << _reliable
+ << " using local port: " << std::dec << its_local_port;
}
}
}
@@ -983,18 +1057,20 @@ endpoint_manager_impl::log_client_states() const {
its_client_endpoints = client_endpoints_by_ip_;
}
- for (const auto& its_address : its_client_endpoints) {
- for (const auto& its_port : its_address.second) {
- for (const auto& its_reliability : its_port.second) {
- size_t its_queue_size = its_reliability.second->get_queue_size();
- if (its_queue_size > VSOMEIP_DEFAULT_QUEUE_WARN_SIZE)
- its_client_queue_sizes.push_back(
- std::make_pair(
- std::make_tuple(
- its_address.first,
- its_port.first,
- its_reliability.first),
- its_queue_size));
+ for (const auto &its_address : its_client_endpoints) {
+ for (const auto &its_port : its_address.second) {
+ for (const auto &its_reliability : its_port.second) {
+ for (const auto &its_partition : its_reliability.second) {
+ size_t its_queue_size = its_partition.second->get_queue_size();
+ if (its_queue_size > VSOMEIP_DEFAULT_QUEUE_WARN_SIZE)
+ its_client_queue_sizes.push_back(
+ std::make_pair(
+ std::make_tuple(
+ its_address.first,
+ its_port.first,
+ its_reliability.first),
+ its_queue_size));
+ }
}
}
}
@@ -1040,8 +1116,8 @@ endpoint_manager_impl::log_server_states() const {
its_server_endpoints = server_endpoints_;
}
- for (const auto& its_port : its_server_endpoints) {
- for (const auto& its_reliability : its_port.second) {
+ for (const auto &its_port : its_server_endpoints) {
+ for (const auto &its_reliability : its_port.second) {
size_t its_queue_size = its_reliability.second->get_queue_size();
if (its_queue_size > VSOMEIP_DEFAULT_QUEUE_WARN_SIZE)
its_client_queue_sizes.push_back(
diff --git a/implementation/endpoints/src/local_client_endpoint_impl.cpp b/implementation/endpoints/src/local_client_endpoint_impl.cpp
index 7e58e67..04c7787 100644
--- a/implementation/endpoints/src/local_client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/local_client_endpoint_impl.cpp
@@ -203,20 +203,13 @@ bool local_client_endpoint_impl::send(const uint8_t *_data, uint32_t _size) {
return ret;
}
-void local_client_endpoint_impl::send_queued() {
+void local_client_endpoint_impl::send_queued(message_buffer_ptr_t _buffer) {
static const byte_t its_start_tag[] = { 0x67, 0x37, 0x6D, 0x07 };
static const byte_t its_end_tag[] = { 0x07, 0x6D, 0x37, 0x67 };
std::vector<boost::asio::const_buffer> bufs;
- message_buffer_ptr_t its_buffer;
- if(queue_.size()) {
- its_buffer = queue_.front();
- } else {
- return;
- }
-
bufs.push_back(boost::asio::buffer(its_start_tag));
- bufs.push_back(boost::asio::buffer(*its_buffer));
+ bufs.push_back(boost::asio::buffer(*_buffer));
bufs.push_back(boost::asio::buffer(its_end_tag));
{
@@ -231,7 +224,7 @@ void local_client_endpoint_impl::send_queued() {
>(shared_from_this()),
std::placeholders::_1,
std::placeholders::_2,
- its_buffer
+ _buffer
)
);
}
diff --git a/implementation/endpoints/src/local_server_endpoint_impl.cpp b/implementation/endpoints/src/local_server_endpoint_impl.cpp
index 1d412e4..ebf913f 100644
--- a/implementation/endpoints/src/local_server_endpoint_impl.cpp
+++ b/implementation/endpoints/src/local_server_endpoint_impl.cpp
@@ -946,6 +946,10 @@ std::uint16_t local_server_endpoint_impl::get_local_port() const {
return 0;
}
+void local_server_endpoint_impl::set_local_port(std::uint16_t _port) {
+ (void) _port;
+}
+
bool local_server_endpoint_impl::check_packetizer_space(
queue_iterator_type _queue_iterator, message_buffer_ptr_t* _packetizer,
std::uint32_t _size) {
diff --git a/implementation/endpoints/src/server_endpoint_impl.cpp b/implementation/endpoints/src/server_endpoint_impl.cpp
index cfb5ea1..ddf6b25 100644
--- a/implementation/endpoints/src/server_endpoint_impl.cpp
+++ b/implementation/endpoints/src/server_endpoint_impl.cpp
@@ -185,38 +185,45 @@ template<typename Protocol>bool server_endpoint_impl<Protocol>::send(const uint8
const service_t its_service = VSOMEIP_BYTES_TO_WORD(
_data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]);
+ const method_t its_method = VSOMEIP_BYTES_TO_WORD(
+ _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]);
const client_t its_client = VSOMEIP_BYTES_TO_WORD(
_data[VSOMEIP_CLIENT_POS_MIN], _data[VSOMEIP_CLIENT_POS_MAX]);
const session_t its_session = VSOMEIP_BYTES_TO_WORD(
_data[VSOMEIP_SESSION_POS_MIN], _data[VSOMEIP_SESSION_POS_MAX]);
- clients_mutex_.lock();
- auto found_client = clients_.find(its_client);
- if (found_client != clients_.end()) {
- auto found_session = found_client->second.find(its_session);
- if (found_session != found_client->second.end()) {
- its_target = found_session->second;
+ requests_mutex_.lock();
+ auto found_client = requests_.find(its_client);
+ if (found_client != requests_.end()) {
+ auto its_request = std::make_tuple(its_service, its_method, its_session);
+ auto found_request = found_client->second.find(its_request);
+ if (found_request != found_client->second.end()) {
+ its_target = found_request->second;
is_valid_target = true;
- found_client->second.erase(its_session);
+ found_client->second.erase(found_request);
} else {
- VSOMEIP_WARNING << "server_endpoint::send: session_id 0x"
- << std::hex << its_session
- << " not found for client 0x" << its_client;
- const method_t its_method =
- VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN],
- _data[VSOMEIP_METHOD_POS_MAX]);
+ VSOMEIP_WARNING << "server_endpoint::send: request ["
+ << std::hex << std::setw(4) << std::setfill('0')
+ << its_service << "."
+ << std::hex << std::setw(4) << std::setfill('0')
+ << its_method << "/"
+ << std::hex << std::setw(4) << std::setfill('0')
+ << its_client << "."
+ << std::hex << std::setw(4) << std::setfill('0')
+ << its_session
+ << "] could not be found.";
if (its_service == VSOMEIP_SD_SERVICE
&& its_method == VSOMEIP_SD_METHOD) {
VSOMEIP_ERROR << "Clearing clients map as a request was "
"received on SD port";
- clients_.clear();
+ requests_.clear();
is_valid_target = get_default_target(its_service, its_target);
}
}
} else {
is_valid_target = get_default_target(its_service, its_target);
}
- clients_mutex_.unlock();
+ requests_mutex_.unlock();
if (is_valid_target) {
is_valid_target = send_intern(its_target, _data, _size);
@@ -757,7 +764,7 @@ size_t server_endpoint_impl<Protocol>::get_queue_size() const {
{
std::lock_guard<std::mutex> its_lock(mutex_);
- for (const auto& q : queues_) {
+ for (const auto &q : queues_) {
its_queue_size += q.second.second.size();
}
}
diff --git a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
index 3debcc7..f88d2a2 100644
--- a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
@@ -50,6 +50,7 @@ tcp_client_endpoint_impl::tcp_client_endpoint_impl(
tcp_restart_aborts_max_(configuration_->get_max_tcp_restart_aborts()),
tcp_connect_time_max_(configuration_->get_max_tcp_connect_time()),
aborted_restart_count_(0),
+ is_sending_(false),
sent_timer_(_io) {
is_supporting_magic_cookies_ = true;
@@ -67,64 +68,71 @@ bool tcp_client_endpoint_impl::is_local() const {
}
void tcp_client_endpoint_impl::start() {
- connect();
+ strand_.dispatch(std::bind(&client_endpoint_impl::connect,
+ this->shared_from_this()));
}
void tcp_client_endpoint_impl::restart(bool _force) {
- if (!_force && state_ == cei_state_e::CONNECTING) {
- std::chrono::steady_clock::time_point its_current
- = std::chrono::steady_clock::now();
- long its_connect_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
- its_current - connect_timepoint_).count();
- if (aborted_restart_count_ < tcp_restart_aborts_max_
- && its_connect_duration < tcp_connect_time_max_) {
- aborted_restart_count_++;
- return;
- } else {
- VSOMEIP_WARNING << "tce::restart: maximum number of aborted restarts ["
- << tcp_restart_aborts_max_ << "] reached! its_connect_duration: "
- << its_connect_duration;
+ auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this());
+ auto restart_func = [self, _force] {
+ if (!_force && self->state_ == cei_state_e::CONNECTING) {
+ std::chrono::steady_clock::time_point its_current
+ = std::chrono::steady_clock::now();
+ long its_connect_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
+ its_current - self->connect_timepoint_).count();
+ if (self->aborted_restart_count_ < self->tcp_restart_aborts_max_
+ && its_connect_duration < self->tcp_connect_time_max_) {
+ self->aborted_restart_count_++;
+ return;
+ } else {
+ VSOMEIP_WARNING << "tce::restart: maximum number of aborted restarts ["
+ << self->tcp_restart_aborts_max_ << "] reached! its_connect_duration: "
+ << its_connect_duration;
+ }
}
- }
- state_ = cei_state_e::CONNECTING;
- std::string address_port_local;
- {
- std::lock_guard<std::mutex> its_lock(socket_mutex_);
- address_port_local = get_address_port_local();
- shutdown_and_close_socket_unlocked(true);
- recv_buffer_ = std::make_shared<message_buffer_t>(recv_buffer_size_initial_, 0);
- }
- was_not_connected_ = true;
- reconnect_counter_ = 0;
- {
- std::lock_guard<std::mutex> its_lock(mutex_);
- for (const auto&m : queue_) {
- const service_t its_service = VSOMEIP_BYTES_TO_WORD(
- (*m)[VSOMEIP_SERVICE_POS_MIN],
- (*m)[VSOMEIP_SERVICE_POS_MAX]);
- const method_t its_method = VSOMEIP_BYTES_TO_WORD(
- (*m)[VSOMEIP_METHOD_POS_MIN],
- (*m)[VSOMEIP_METHOD_POS_MAX]);
- const client_t its_client = VSOMEIP_BYTES_TO_WORD(
- (*m)[VSOMEIP_CLIENT_POS_MIN],
- (*m)[VSOMEIP_CLIENT_POS_MAX]);
- const session_t its_session = VSOMEIP_BYTES_TO_WORD(
- (*m)[VSOMEIP_SESSION_POS_MIN],
- (*m)[VSOMEIP_SESSION_POS_MAX]);
- VSOMEIP_WARNING << "tce::restart: dropping message: "
- << "remote:" << get_address_port_remote() << " ("
- << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
- << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
- << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
- << std::hex << std::setw(4) << std::setfill('0') << its_session << "]"
- << " size: " << std::dec << m->size();
+ self->state_ = cei_state_e::CONNECTING;
+ std::string address_port_local;
+ {
+ std::lock_guard<std::mutex> its_lock(self->socket_mutex_);
+ address_port_local = self->get_address_port_local();
+ self->shutdown_and_close_socket_unlocked(true);
+ self->recv_buffer_ = std::make_shared<message_buffer_t>(self->recv_buffer_size_initial_, 0);
}
- queue_.clear();
- queue_size_ = 0;
- }
- VSOMEIP_WARNING << "tce::restart: local: " << address_port_local
- << " remote: " << get_address_port_remote();
- start_connect_timer();
+ self->was_not_connected_ = true;
+ self->reconnect_counter_ = 0;
+ {
+ std::lock_guard<std::mutex> its_lock(self->mutex_);
+ for (const auto&m : self->queue_) {
+ const service_t its_service = VSOMEIP_BYTES_TO_WORD(
+ (*m)[VSOMEIP_SERVICE_POS_MIN],
+ (*m)[VSOMEIP_SERVICE_POS_MAX]);
+ const method_t its_method = VSOMEIP_BYTES_TO_WORD(
+ (*m)[VSOMEIP_METHOD_POS_MIN],
+ (*m)[VSOMEIP_METHOD_POS_MAX]);
+ const client_t its_client = VSOMEIP_BYTES_TO_WORD(
+ (*m)[VSOMEIP_CLIENT_POS_MIN],
+ (*m)[VSOMEIP_CLIENT_POS_MAX]);
+ const session_t its_session = VSOMEIP_BYTES_TO_WORD(
+ (*m)[VSOMEIP_SESSION_POS_MIN],
+ (*m)[VSOMEIP_SESSION_POS_MAX]);
+ VSOMEIP_WARNING << "tce::restart: dropping message: "
+ << "remote:" << self->get_address_port_remote() << " ("
+ << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
+ << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_session << "]"
+ << " size: " << std::dec << m->size();
+ }
+ self->queue_.clear();
+ self->queue_size_ = 0;
+ }
+ VSOMEIP_WARNING << "tce::restart: local: " << address_port_local
+ << " remote: " << self->get_address_port_remote();
+ self->start_connect_timer();
+ };
+ // bind to strand_ to avoid socket closure if
+ // parallel socket operation is currently active
+ strand_.dispatch(restart_func);
}
void tcp_client_endpoint_impl::connect() {
@@ -169,26 +177,54 @@ void tcp_client_endpoint_impl::connect() {
std::string its_device(configuration_->get_device());
if (its_device != "") {
if (setsockopt(socket_->native_handle(),
- SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (int)its_device.size()) == -1) {
+ SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (socklen_t)its_device.size()) == -1) {
VSOMEIP_WARNING << "TCP Client: Could not bind to device \"" << its_device << "\"";
}
}
#endif
- // Bind address and, optionally, port.
- boost::system::error_code its_bind_error;
- socket_->bind(local_, its_bind_error);
- if(its_bind_error) {
- VSOMEIP_WARNING << "tcp_client_endpoint::connect: "
- "Error binding socket: " << its_bind_error.message()
- << " remote:" << get_address_port_remote();
- try {
- // don't connect on bind error to avoid using a random port
- strand_.post(std::bind(&client_endpoint_impl::connect_cbk,
- shared_from_this(), its_bind_error));
- } catch (const std::exception &e) {
- VSOMEIP_ERROR << "tcp_client_endpoint_impl::connect: "
- << e.what() << " remote:" << get_address_port_remote();
+ // In case a client endpoint port was configured,
+ // bind to it before connecting
+ if (local_.port() != ILLEGAL_PORT) {
+ boost::system::error_code its_bind_error;
+ socket_->bind(local_, its_bind_error);
+ if(its_bind_error) {
+ VSOMEIP_WARNING << "tcp_client_endpoint::connect: "
+ "Error binding socket: " << its_bind_error.message()
+ << " local: " << local_.address().to_string()
+ << ":" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
+
+ std::shared_ptr<endpoint_host> its_host = endpoint_host_.lock();
+ if (its_host) {
+ // set new client port depending on service / instance / remote port
+ if (!its_host->on_bind_error(shared_from_this(), remote_port_)) {
+ VSOMEIP_WARNING << "tcp_client_endpoint::connect: "
+ "Failed to set new local port for tce: "
+ << " local: " << local_.address().to_string()
+ << ":" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
+ } else {
+ local_.port(local_port_);
+ VSOMEIP_INFO << "tcp_client_endpoint::connect: "
+ "Using new new local port for tce: "
+ << " local: " << local_.address().to_string()
+ << ":" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
+ }
+ }
+ try {
+ // don't connect on bind error to avoid using a random port
+ strand_.post(std::bind(&client_endpoint_impl::connect_cbk,
+ shared_from_this(), its_bind_error));
+ } catch (const std::exception &e) {
+ VSOMEIP_ERROR << "tcp_client_endpoint_impl::connect: "
+ << e.what()
+ << " local: " << local_.address().to_string()
+ << ":" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
+ }
+ return;
}
return;
}
@@ -220,7 +256,10 @@ void tcp_client_endpoint_impl::receive() {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
its_recv_buffer = recv_buffer_;
}
- receive(its_recv_buffer, 0, 0);
+ auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this());
+ strand_.dispatch([self, &its_recv_buffer](){
+ self->receive(its_recv_buffer, 0, 0);
+ });
}
void tcp_client_endpoint_impl::receive(message_buffer_ptr_t _recv_buffer,
@@ -277,32 +316,26 @@ void tcp_client_endpoint_impl::receive(message_buffer_ptr_t _recv_buffer,
}
}
-void tcp_client_endpoint_impl::send_queued() {
- message_buffer_ptr_t its_buffer;
- if(queue_.size()) {
- its_buffer = queue_.front();
- } else {
- return;
- }
+void tcp_client_endpoint_impl::send_queued(message_buffer_ptr_t _buffer) {
const service_t its_service = VSOMEIP_BYTES_TO_WORD(
- (*its_buffer)[VSOMEIP_SERVICE_POS_MIN],
- (*its_buffer)[VSOMEIP_SERVICE_POS_MAX]);
+ (*_buffer)[VSOMEIP_SERVICE_POS_MIN],
+ (*_buffer)[VSOMEIP_SERVICE_POS_MAX]);
const method_t its_method = VSOMEIP_BYTES_TO_WORD(
- (*its_buffer)[VSOMEIP_METHOD_POS_MIN],
- (*its_buffer)[VSOMEIP_METHOD_POS_MAX]);
+ (*_buffer)[VSOMEIP_METHOD_POS_MIN],
+ (*_buffer)[VSOMEIP_METHOD_POS_MAX]);
const client_t its_client = VSOMEIP_BYTES_TO_WORD(
- (*its_buffer)[VSOMEIP_CLIENT_POS_MIN],
- (*its_buffer)[VSOMEIP_CLIENT_POS_MAX]);
+ (*_buffer)[VSOMEIP_CLIENT_POS_MIN],
+ (*_buffer)[VSOMEIP_CLIENT_POS_MAX]);
const session_t its_session = VSOMEIP_BYTES_TO_WORD(
- (*its_buffer)[VSOMEIP_SESSION_POS_MIN],
- (*its_buffer)[VSOMEIP_SESSION_POS_MAX]);
+ (*_buffer)[VSOMEIP_SESSION_POS_MIN],
+ (*_buffer)[VSOMEIP_SESSION_POS_MAX]);
if (has_enabled_magic_cookies_) {
const std::chrono::steady_clock::time_point now =
std::chrono::steady_clock::now();
if (std::chrono::duration_cast<std::chrono::milliseconds>(
now - last_cookie_sent_) > std::chrono::milliseconds(10000)) {
- send_magic_cookie(its_buffer);
+ send_magic_cookie(_buffer);
last_cookie_sent_ = now;
}
}
@@ -312,9 +345,9 @@ void tcp_client_endpoint_impl::send_queued() {
std::stringstream msg;
msg << "tcei<" << remote_.address() << ":"
<< std::dec << remote_.port() << ">::sq: ";
- for (std::size_t i = 0; i < its_buffer->size(); i++)
+ for (std::size_t i = 0; i < _buffer->size(); i++)
msg << std::hex << std::setw(2) << std::setfill('0')
- << (int)(*its_buffer)[i] << " ";
+ << (int)(*_buffer)[i] << " ";
VSOMEIP_INFO << msg.str();
#endif
{
@@ -326,21 +359,23 @@ void tcp_client_endpoint_impl::send_queued() {
}
boost::asio::async_write(
*socket_,
- boost::asio::buffer(*its_buffer),
- std::bind(&tcp_client_endpoint_impl::write_completion_condition,
- std::static_pointer_cast<tcp_client_endpoint_impl>(shared_from_this()),
- std::placeholders::_1,
- std::placeholders::_2,
- its_buffer->size(),
- its_service, its_method, its_client, its_session,
- std::chrono::steady_clock::now()),
+ boost::asio::buffer(*_buffer),
std::bind(
+ &tcp_client_endpoint_impl::write_completion_condition,
+ std::static_pointer_cast<tcp_client_endpoint_impl>(shared_from_this()),
+ std::placeholders::_1,
+ std::placeholders::_2,
+ _buffer->size(),
+ its_service, its_method, its_client, its_session,
+ std::chrono::steady_clock::now()),
+ strand_.wrap(
+ std::bind(
&tcp_client_endpoint_base_impl::send_cbk,
shared_from_this(),
std::placeholders::_1,
std::placeholders::_2,
- its_buffer
- )
+ _buffer
+ ))
);
}
}
@@ -675,7 +710,10 @@ void tcp_client_endpoint_impl::receive_cbk(
}
}
its_lock.unlock();
- receive(_recv_buffer, _recv_buffer_size, its_missing_capacity);
+ auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this());
+ strand_.dispatch([self, &_recv_buffer, _recv_buffer_size, its_missing_capacity](){
+ self->receive(_recv_buffer, _recv_buffer_size, its_missing_capacity);
+ });
} else {
VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk: "
<< _error.message() << "(" << std::dec << _error.value()
@@ -700,7 +738,10 @@ void tcp_client_endpoint_impl::receive_cbk(
}
} else {
its_lock.unlock();
- receive(_recv_buffer, _recv_buffer_size, its_missing_capacity);
+ auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this());
+ strand_.dispatch([self, &_recv_buffer, _recv_buffer_size, its_missing_capacity](){
+ self->receive(_recv_buffer, _recv_buffer_size, its_missing_capacity);
+ });
}
}
}
@@ -838,7 +879,13 @@ void tcp_client_endpoint_impl::send_cbk(boost::system::error_code const &_error,
if (queue_.size() > 0) {
queue_size_ -= queue_.front()->size();
queue_.pop_front();
- send_queued();
+ auto its_buffer = get_front();
+ if (its_buffer) {
+ auto self = std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this());
+ strand_.dispatch(
+ [self, its_buffer]() { self->send_queued(its_buffer);}
+ );
+ }
}
} else if (_error == boost::system::errc::destination_address_required) {
VSOMEIP_WARNING << "tce::send_cbk received error: " << _error.message()
diff --git a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp
index 1cd2b5b..37db3f5 100644
--- a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp
+++ b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp
@@ -50,7 +50,7 @@ tcp_server_endpoint_impl::tcp_server_endpoint_impl(
std::string its_device(configuration_->get_device());
if (its_device != "") {
if (setsockopt(acceptor_.native_handle(),
- SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (int)its_device.size()) == -1) {
+ SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (socklen_t)its_device.size()) == -1) {
VSOMEIP_WARNING << "TCP Server: Could not bind to device \"" << its_device << "\"";
}
}
@@ -152,6 +152,38 @@ void tcp_server_endpoint_impl::send_queued(const queue_iterator_type _queue_iter
<< static_cast<std::uint16_t>(_queue_iterator->first.port())
<< " dropping outstanding messages (" << std::dec
<< _queue_iterator->second.second.size() << ").";
+
+ if (_queue_iterator->second.second.size()) {
+ std::set<service_t> its_services;
+
+ // check all outstanding messages of this connection
+ // whether stop handlers need to be called
+ for (const auto &its_buffer : _queue_iterator->second.second) {
+ if (its_buffer && its_buffer->size() > VSOMEIP_SESSION_POS_MAX) {
+ service_t its_service = VSOMEIP_BYTES_TO_WORD(
+ (*its_buffer)[VSOMEIP_SERVICE_POS_MIN],
+ (*its_buffer)[VSOMEIP_SERVICE_POS_MAX]);
+ its_services.insert(its_service);
+ }
+ }
+
+ for (auto its_service : its_services) {
+ auto found_cbk = prepare_stop_handlers_.find(its_service);
+ if (found_cbk != prepare_stop_handlers_.end()) {
+ VSOMEIP_INFO << "Calling prepare stop handler "
+ << "for service: 0x"
+ << std::hex << std::setw(4) << std::setfill('0')
+ << its_service;
+ auto handler = found_cbk->second;
+ auto ptr = this->shared_from_this();
+ service_.post([ptr, handler, its_service](){
+ handler(ptr, its_service);
+ });
+ prepare_stop_handlers_.erase(found_cbk);
+ }
+ }
+ }
+
queues_.erase(_queue_iterator->first);
}
}
@@ -259,6 +291,10 @@ std::uint16_t tcp_server_endpoint_impl::get_local_port() const {
return local_port_;
}
+void tcp_server_endpoint_impl::set_local_port(std::uint16_t _port) {
+ (void)_port;
+}
+
bool tcp_server_endpoint_impl::is_reliable() const {
return true;
}
@@ -551,12 +587,19 @@ void tcp_server_endpoint_impl::connection::receive_cbk(
recv_buffer_[its_iteration_gap + VSOMEIP_CLIENT_POS_MIN],
recv_buffer_[its_iteration_gap + VSOMEIP_CLIENT_POS_MAX]);
if (its_client != MAGIC_COOKIE_CLIENT) {
+ const service_t its_service = VSOMEIP_BYTES_TO_WORD(
+ recv_buffer_[its_iteration_gap + VSOMEIP_SERVICE_POS_MIN],
+ recv_buffer_[its_iteration_gap + VSOMEIP_SERVICE_POS_MAX]);
+ const method_t its_method = VSOMEIP_BYTES_TO_WORD(
+ recv_buffer_[its_iteration_gap + VSOMEIP_METHOD_POS_MIN],
+ recv_buffer_[its_iteration_gap + VSOMEIP_METHOD_POS_MAX]);
const session_t its_session = VSOMEIP_BYTES_TO_WORD(
recv_buffer_[its_iteration_gap + VSOMEIP_SESSION_POS_MIN],
recv_buffer_[its_iteration_gap + VSOMEIP_SESSION_POS_MAX]);
- its_server->clients_mutex_.lock();
- its_server->clients_[its_client][its_session] = remote_;
- its_server->clients_mutex_.unlock();
+
+ std::lock_guard<std::mutex> its_requests_guard(its_server->requests_mutex_);
+ its_server->requests_[its_client]
+ [std::make_tuple(its_service, its_method, its_session)] = remote_;
}
}
if (!magic_cookies_enabled_) {
diff --git a/implementation/endpoints/src/udp_client_endpoint_impl.cpp b/implementation/endpoints/src/udp_client_endpoint_impl.cpp
index dc7a7bf..3b9a212 100644
--- a/implementation/endpoints/src/udp_client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/udp_client_endpoint_impl.cpp
@@ -14,6 +14,8 @@
#include "../../routing/include/routing_host.hpp"
#include "../include/udp_client_endpoint_impl.hpp"
#include "../../utility/include/utility.hpp"
+#include "../../utility/include/byteorder.hpp"
+
namespace vsomeip_v3 {
@@ -61,27 +63,50 @@ void udp_client_endpoint_impl::connect() {
socket_->set_option(boost::asio::socket_base::reuse_address(true), its_error);
if (its_error) {
VSOMEIP_WARNING << "udp_client_endpoint_impl::connect: couldn't enable "
- << "SO_REUSEADDR: " << its_error.message() << " remote:"
- << get_address_port_remote();
+ << "SO_REUSEADDR: " << its_error.message()
+ << " local port:" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
}
+
socket_->set_option(boost::asio::socket_base::receive_buffer_size(
udp_receive_buffer_size_), its_error);
if (its_error) {
VSOMEIP_WARNING << "udp_client_endpoint_impl::connect: couldn't set "
- << "SO_RCVBUF: " << its_error.message() << " to: "
- << std::dec << udp_receive_buffer_size_ << " remote:"
- << get_address_port_remote();
- } else {
- boost::asio::socket_base::receive_buffer_size its_option;
- socket_->get_option(its_option, its_error);
- if (its_error) {
- VSOMEIP_WARNING << "udp_client_endpoint_impl::connect: couldn't get "
- << "SO_RCVBUF: " << its_error.message() << " remote:"
- << get_address_port_remote();
- } else {
- VSOMEIP_INFO << "udp_client_endpoint_impl::connect: SO_RCVBUF is: "
- << std::dec << its_option.value();
+ << "SO_RCVBUF: " << its_error.message()
+ << " to: " << std::dec << udp_receive_buffer_size_
+ << " local port:" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
+ }
+
+ boost::asio::socket_base::receive_buffer_size its_option;
+ socket_->get_option(its_option, its_error);
+ #ifdef __linux__
+ // If regular setting of the buffer size did not work, try to force
+ // (requires CAP_NET_ADMIN to be successful)
+ if (its_option.value() < 0
+ || its_option.value() < udp_receive_buffer_size_) {
+ its_error.assign(setsockopt(socket_->native_handle(),
+ SOL_SOCKET, SO_RCVBUFFORCE,
+ &udp_receive_buffer_size_, sizeof(udp_receive_buffer_size_)),
+ boost::system::generic_category());
+ if (!its_error) {
+ VSOMEIP_INFO << "udp_client_endpoint_impl::connect: "
+ << "SO_RCVBUFFORCE successful!";
}
+ socket_->get_option(its_option, its_error);
+ }
+ #endif
+ if (its_error) {
+ VSOMEIP_WARNING << "udp_client_endpoint_impl::connect: couldn't get "
+ << "SO_RCVBUF: " << its_error.message()
+ << " local port:" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
+ } else {
+ VSOMEIP_INFO << "udp_client_endpoint_impl::connect: SO_RCVBUF is: "
+ << std::dec << its_option.value()
+ << " (" << udp_receive_buffer_size_ << ")"
+ << " local port:" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
}
#ifndef _WIN32
@@ -89,26 +114,53 @@ void udp_client_endpoint_impl::connect() {
std::string its_device(configuration_->get_device());
if (its_device != "") {
if (setsockopt(socket_->native_handle(),
- SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (int)its_device.size()) == -1) {
+ SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (socklen_t)its_device.size()) == -1) {
VSOMEIP_WARNING << "UDP Client: Could not bind to device \"" << its_device << "\"";
}
}
#endif
- // Bind address and, optionally, port.
- boost::system::error_code its_bind_error;
- socket_->bind(local_, its_bind_error);
- if(its_bind_error) {
- VSOMEIP_WARNING << "udp_client_endpoint::connect: "
- "Error binding socket: " << its_bind_error.message()
- << " remote:" << get_address_port_remote();
- try {
- // don't connect on bind error to avoid using a random port
- strand_.post(std::bind(&client_endpoint_impl::connect_cbk,
- shared_from_this(), its_bind_error));
- } catch (const std::exception &e) {
- VSOMEIP_ERROR << "udp_client_endpoint_impl::connect: "
- << e.what() << " remote:" << get_address_port_remote();
+ // In case a client endpoint port was configured,
+ // bind to it before connecting
+ if (local_.port() != ILLEGAL_PORT) {
+ boost::system::error_code its_bind_error;
+ socket_->bind(local_, its_bind_error);
+ if(its_bind_error) {
+ VSOMEIP_WARNING << "udp_client_endpoint::connect: "
+ "Error binding socket: " << its_bind_error.message()
+ << " local: " << local_.address().to_string()
+ << ":" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
+
+ std::shared_ptr<endpoint_host> its_host = endpoint_host_.lock();
+ if (its_host) {
+ // set new client port depending on service / instance / remote port
+ if (!its_host->on_bind_error(shared_from_this(), remote_port_)) {
+ VSOMEIP_WARNING << "udp_client_endpoint::connect: "
+ "Failed to set new local port for uce: "
+ << " local: " << local_.address().to_string()
+ << ":" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
+ } else {
+ local_.port(local_port_);
+ VSOMEIP_INFO << "udp_client_endpoint::connect: "
+ "Using new new local port for uce: "
+ << " local: " << local_.address().to_string()
+ << ":" << std::dec << local_.port()
+ << " remote:" << get_address_port_remote();
+ }
+ }
+
+
+ try {
+ // don't connect on bind error to avoid using a random port
+ strand_.post(std::bind(&client_endpoint_impl::connect_cbk,
+ shared_from_this(), its_bind_error));
+ } catch (const std::exception &e) {
+ VSOMEIP_ERROR << "udp_client_endpoint_impl::connect: "
+ << e.what() << " remote:" << get_address_port_remote();
+ }
+ return;
}
return;
}
@@ -158,32 +210,26 @@ void udp_client_endpoint_impl::restart(bool _force) {
start_connect_timer();
}
-void udp_client_endpoint_impl::send_queued() {
- message_buffer_ptr_t its_buffer;
- if(queue_.size()) {
- its_buffer = queue_.front();
- } else {
- return;
- }
+void udp_client_endpoint_impl::send_queued(message_buffer_ptr_t _buffer) {
#if 0
std::stringstream msg;
msg << "ucei<" << remote_.address() << ":"
<< std::dec << remote_.port() << ">::sq: ";
- for (std::size_t i = 0; i < its_buffer->size(); i++)
+ for (std::size_t i = 0; i < _buffer->size(); i++)
msg << std::hex << std::setw(2) << std::setfill('0')
- << (int)(*its_buffer)[i] << " ";
+ << (int)(*_buffer)[i] << " ";
VSOMEIP_INFO << msg.str();
#endif
{
std::lock_guard<std::mutex> its_lock(socket_mutex_);
socket_->async_send(
- boost::asio::buffer(*its_buffer),
+ boost::asio::buffer(*_buffer),
std::bind(
&udp_client_endpoint_base_impl::send_cbk,
shared_from_this(),
std::placeholders::_1,
std::placeholders::_2,
- its_buffer
+ _buffer
)
);
}
@@ -354,7 +400,12 @@ void udp_client_endpoint_impl::receive_cbk(
receive();
} else {
if (_error == boost::asio::error::connection_refused) {
- shutdown_and_close_socket(false);
+ VSOMEIP_WARNING << "uce::receive_cbk: local: " << get_address_port_local()
+ << " remote: " << get_address_port_remote()
+ << " error: " << _error.message();
+ std::shared_ptr<endpoint_host> its_ep_host = endpoint_host_.lock();
+ its_ep_host->on_disconnect(shared_from_this());
+ restart(false);
} else {
receive();
}
@@ -415,6 +466,134 @@ std::string udp_client_endpoint_impl::get_remote_information() const {
+ std::to_string(remote_.port());
}
+void udp_client_endpoint_impl::send_cbk(boost::system::error_code const &_error, std::size_t _bytes,
+ const message_buffer_ptr_t &_sent_msg) {
+ (void)_bytes;
+ if (!_error) {
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ if (queue_.size() > 0) {
+ queue_size_ -= queue_.front()->size();
+ queue_.pop_front();
+ auto its_buffer = get_front();
+ if (its_buffer)
+ send_queued(its_buffer);
+ }
+ } else if (_error == boost::asio::error::broken_pipe) {
+ state_ = cei_state_e::CLOSED;
+ bool stopping(false);
+ {
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ stopping = sending_blocked_;
+ if (stopping) {
+ queue_.clear();
+ queue_size_ = 0;
+ } else {
+ service_t its_service(0);
+ method_t its_method(0);
+ client_t its_client(0);
+ session_t its_session(0);
+ if (_sent_msg && _sent_msg->size() > VSOMEIP_SESSION_POS_MAX) {
+ its_service = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_SERVICE_POS_MIN],
+ (*_sent_msg)[VSOMEIP_SERVICE_POS_MAX]);
+ its_method = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_METHOD_POS_MIN],
+ (*_sent_msg)[VSOMEIP_METHOD_POS_MAX]);
+ its_client = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_CLIENT_POS_MIN],
+ (*_sent_msg)[VSOMEIP_CLIENT_POS_MAX]);
+ its_session = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_SESSION_POS_MIN],
+ (*_sent_msg)[VSOMEIP_SESSION_POS_MAX]);
+ }
+ VSOMEIP_WARNING << "uce::send_cbk received error: "
+ << _error.message() << " (" << std::dec
+ << _error.value() << ") " << get_remote_information()
+ << " " << std::dec << queue_.size()
+ << " " << std::dec << queue_size_ << " ("
+ << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
+ << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_session << "]";
+ }
+ }
+ if (!stopping) {
+ print_status();
+ }
+ was_not_connected_ = true;
+ shutdown_and_close_socket(true);
+ strand_.dispatch(std::bind(&client_endpoint_impl::connect,
+ this->shared_from_this()));
+ } else if (_error == boost::asio::error::not_connected
+ || _error == boost::asio::error::bad_descriptor
+ || _error == boost::asio::error::no_permission) {
+ state_ = cei_state_e::CLOSED;
+ if (_error == boost::asio::error::no_permission) {
+ VSOMEIP_WARNING << "uce::send_cbk received error: " << _error.message()
+ << " (" << std::dec << _error.value() << ") "
+ << get_remote_information();
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ queue_.clear();
+ queue_size_ = 0;
+ }
+ was_not_connected_ = true;
+ shutdown_and_close_socket(true);
+ strand_.dispatch(std::bind(&client_endpoint_impl::connect,
+ this->shared_from_this()));
+ } else if (_error == boost::asio::error::operation_aborted) {
+ VSOMEIP_WARNING << "uce::send_cbk received error: " << _error.message();
+ // endpoint was stopped
+ sending_blocked_ = true;
+ shutdown_and_close_socket(false);
+ } else if (_error == boost::system::errc::destination_address_required) {
+ VSOMEIP_WARNING << "uce::send_cbk received error: " << _error.message()
+ << " (" << std::dec << _error.value() << ") "
+ << get_remote_information();
+ was_not_connected_ = true;
+ } else {
+ if (state_ == cei_state_e::CONNECTING) {
+ VSOMEIP_WARNING << "uce::send_cbk endpoint is already restarting:"
+ << get_remote_information();
+ } else {
+ state_ = cei_state_e::CONNECTING;
+ shutdown_and_close_socket(false);
+ std::shared_ptr<endpoint_host> its_host = endpoint_host_.lock();
+ if (its_host) {
+ its_host->on_disconnect(shared_from_this());
+ }
+ restart(true);
+ }
+ service_t its_service(0);
+ method_t its_method(0);
+ client_t its_client(0);
+ session_t its_session(0);
+ if (_sent_msg && _sent_msg->size() > VSOMEIP_SESSION_POS_MAX) {
+ its_service = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_SERVICE_POS_MIN],
+ (*_sent_msg)[VSOMEIP_SERVICE_POS_MAX]);
+ its_method = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_METHOD_POS_MIN],
+ (*_sent_msg)[VSOMEIP_METHOD_POS_MAX]);
+ its_client = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_CLIENT_POS_MIN],
+ (*_sent_msg)[VSOMEIP_CLIENT_POS_MAX]);
+ its_session = VSOMEIP_BYTES_TO_WORD(
+ (*_sent_msg)[VSOMEIP_SESSION_POS_MIN],
+ (*_sent_msg)[VSOMEIP_SESSION_POS_MAX]);
+ }
+ VSOMEIP_WARNING << "uce::send_cbk received error: " << _error.message()
+ << " (" << std::dec << _error.value() << ") "
+ << get_remote_information() << " "
+ << " " << std::dec << queue_.size()
+ << " " << std::dec << queue_size_ << " ("
+ << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
+ << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_session << "]";
+ print_status();
+ }
+}
+
bool udp_client_endpoint_impl::tp_segmentation_enabled(service_t _service,
method_t _method) const {
return configuration_->tp_segment_messages_client_to_service(_service,
diff --git a/implementation/endpoints/src/udp_server_endpoint_impl.cpp b/implementation/endpoints/src/udp_server_endpoint_impl.cpp
index 7aadf3f..bd44b48 100644
--- a/implementation/endpoints/src/udp_server_endpoint_impl.cpp
+++ b/implementation/endpoints/src/udp_server_endpoint_impl.cpp
@@ -55,7 +55,7 @@ udp_server_endpoint_impl::udp_server_endpoint_impl(
std::string its_device(configuration_->get_device());
if (its_device != "") {
if (setsockopt(unicast_socket_.native_handle(),
- SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (int)its_device.size()) == -1) {
+ SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (socklen_t)its_device.size()) == -1) {
VSOMEIP_WARNING << "UDP Server: Could not bind to device \"" << its_device << "\"";
}
}
@@ -79,28 +79,46 @@ udp_server_endpoint_impl::udp_server_endpoint_impl(
unicast_socket_.set_option(option, ec);
boost::asio::detail::throw_error(ec, "broadcast option");
- const std::uint32_t its_udp_recv_buffer_size =
+ const int its_udp_recv_buffer_size =
configuration_->get_udp_receive_buffer_size();
unicast_socket_.set_option(boost::asio::socket_base::receive_buffer_size(
its_udp_recv_buffer_size), ec);
-
if (ec) {
- VSOMEIP_WARNING << "udp_server_endpoint_impl:: couldn't set "
+ VSOMEIP_WARNING << "udp_server_endpoint_impl: couldn't set "
<< "SO_RCVBUF: " << ec.message() << " to: " << std::dec
<< its_udp_recv_buffer_size << " local port: " << std::dec
<< local_port_;
- } else {
- boost::asio::socket_base::receive_buffer_size its_option;
- unicast_socket_.get_option(its_option, ec);
- if (ec) {
- VSOMEIP_WARNING << "udp_server_endpoint_impl: couldn't get "
- << "SO_RCVBUF: " << ec.message() << " local port:"
- << std::dec << local_port_;
- } else {
- VSOMEIP_INFO << "udp_server_endpoint_impl: SO_RCVBUF is: "
- << std::dec << its_option.value();
+ }
+
+ boost::asio::socket_base::receive_buffer_size its_option;
+ unicast_socket_.get_option(its_option, ec);
+#ifdef __linux__
+ // If regular setting of the buffer size did not work, try to force
+ // (requires CAP_NET_ADMIN to be successful)
+ if (its_option.value() < 0
+ || its_option.value() < its_udp_recv_buffer_size) {
+ ec.assign(setsockopt(unicast_socket_.native_handle(),
+ SOL_SOCKET, SO_RCVBUFFORCE,
+ &its_udp_recv_buffer_size, sizeof(its_udp_recv_buffer_size)),
+ boost::system::generic_category());
+ if (!ec) {
+ VSOMEIP_INFO << "udp_server_endpoint_impl: "
+ << "SO_RCVBUFFORCE successful.";
}
+ unicast_socket_.get_option(its_option, ec);
}
+#endif
+ if (ec) {
+ VSOMEIP_WARNING << "udp_server_endpoint_impl: couldn't get "
+ << "SO_RCVBUF: " << ec.message() << " local port:"
+ << std::dec << local_port_;
+ } else {
+ VSOMEIP_INFO << "udp_server_endpoint_impl: SO_RCVBUF is: "
+ << std::dec << its_option.value()
+ << " (" << its_udp_recv_buffer_size << ") local port:"
+ << std::dec << local_port_;
+ }
+
#ifdef _WIN32
const char* optval("0001");
@@ -345,29 +363,44 @@ void udp_server_endpoint_impl::join_unlocked(const std::string &_address) {
multicast_socket_->bind(*multicast_local_, ec);
boost::asio::detail::throw_error(ec, "bind multicast");
- const std::uint32_t its_udp_recv_buffer_size =
+ const int its_udp_recv_buffer_size =
configuration_->get_udp_receive_buffer_size();
-
multicast_socket_->set_option(boost::asio::socket_base::receive_buffer_size(
its_udp_recv_buffer_size), ec);
-
if (ec) {
- VSOMEIP_WARNING << "udp_server_endpoint_impl:: couldn't set "
+ VSOMEIP_WARNING << "udp_server_endpoint_impl<multicast>: couldn't set "
<< "SO_RCVBUF: " << ec.message() << " to: " << std::dec
<< its_udp_recv_buffer_size << " local port: " << std::dec
<< local_port_;
- } else {
- boost::asio::socket_base::receive_buffer_size its_option;
- multicast_socket_->get_option(its_option, ec);
+ }
- if (ec) {
- VSOMEIP_WARNING << "udp_server_endpoint_impl: couldn't get "
- << "SO_RCVBUF: " << ec.message() << " local port:"
- << std::dec << local_port_;
- } else {
- VSOMEIP_INFO << "udp_server_endpoint_impl: SO_RCVBUF (Multicast) is: "
- << std::dec << its_option.value();
+ boost::asio::socket_base::receive_buffer_size its_option;
+ multicast_socket_->get_option(its_option, ec);
+ #ifdef __linux__
+ // If regular setting of the buffer size did not work, try to force
+ // (requires CAP_NET_ADMIN to be successful)
+ if (its_option.value() < 0
+ || its_option.value() < its_udp_recv_buffer_size) {
+ ec.assign(setsockopt(multicast_socket_->native_handle(),
+ SOL_SOCKET, SO_RCVBUFFORCE,
+ &its_udp_recv_buffer_size, sizeof(its_udp_recv_buffer_size)),
+ boost::system::generic_category());
+ if (!ec) {
+ VSOMEIP_INFO << "udp_server_endpoint_impl<multicast>: "
+ << "SO_RCVBUFFORCE: successful.";
}
+ multicast_socket_->get_option(its_option, ec);
+ }
+ #endif
+ if (ec) {
+ VSOMEIP_WARNING << "udp_server_endpoint_impl<multicast>: couldn't get "
+ << "SO_RCVBUF: " << ec.message() << " local port:"
+ << std::dec << local_port_;
+ } else {
+ VSOMEIP_INFO << "udp_server_endpoint_impl<multicast>: SO_RCVBUF is: "
+ << std::dec << its_option.value()
+ << " (" << its_udp_recv_buffer_size << ") local port:"
+ << std::dec << local_port_;
}
#ifdef _WIN32
@@ -413,7 +446,8 @@ void udp_server_endpoint_impl::join_unlocked(const std::string &_address) {
joined_group_ = true;
} catch (const std::exception &e) {
- VSOMEIP_ERROR << "udp_server_endpoint_impl::join" << ":" << e.what();
+ VSOMEIP_ERROR << "udp_server_endpoint_impl::join" << ":" << e.what()
+ << " address: " << _address;
}
};
@@ -467,7 +501,8 @@ void udp_server_endpoint_impl::leave_unlocked(const std::string &_address) {
}
}
catch (const std::exception &e) {
- VSOMEIP_ERROR << __func__ << ":" << e.what();
+ VSOMEIP_ERROR << __func__ << ":" << e.what()
+ << " address: " << _address;
}
}
@@ -500,6 +535,10 @@ std::uint16_t udp_server_endpoint_impl::get_local_port() const {
return local_port_;
}
+void udp_server_endpoint_impl::set_local_port(std::uint16_t _port) {
+ (void)_port;
+}
+
void udp_server_endpoint_impl::on_unicast_received(
boost::system::error_code const &_error,
std::size_t _bytes,
@@ -624,9 +663,13 @@ void udp_server_endpoint_impl::on_message_received(
const session_t its_session = VSOMEIP_BYTES_TO_WORD(
_buffer[i + VSOMEIP_SESSION_POS_MIN],
_buffer[i + VSOMEIP_SESSION_POS_MAX]);
- clients_mutex_.lock();
- clients_[its_client][its_session] = _remote;
- clients_mutex_.unlock();
+ const method_t its_method = VSOMEIP_BYTES_TO_WORD(
+ _buffer[i + VSOMEIP_METHOD_POS_MIN],
+ _buffer[i + VSOMEIP_METHOD_POS_MAX]);
+
+ std::lock_guard<std::mutex> its_requests_guard(requests_mutex_);
+ requests_[its_client]
+ [std::make_tuple(its_service, its_method, its_session)] = _remote;
}
} else if (its_service != VSOMEIP_SD_SERVICE
&& utility::is_notification(_buffer[i + VSOMEIP_MESSAGE_TYPE_POS])
@@ -656,11 +699,19 @@ void udp_server_endpoint_impl::on_message_received(
res.second[VSOMEIP_CLIENT_POS_MIN],
res.second[VSOMEIP_CLIENT_POS_MAX]);
if (its_client != MAGIC_COOKIE_CLIENT) {
+ const service_t its_service = VSOMEIP_BYTES_TO_WORD(
+ res.second[VSOMEIP_SERVICE_POS_MIN],
+ res.second[VSOMEIP_SERVICE_POS_MAX]);
+ const method_t its_method = VSOMEIP_BYTES_TO_WORD(
+ res.second[VSOMEIP_METHOD_POS_MIN],
+ res.second[VSOMEIP_METHOD_POS_MAX]);
const session_t its_session = VSOMEIP_BYTES_TO_WORD(
res.second[VSOMEIP_SESSION_POS_MIN],
res.second[VSOMEIP_SESSION_POS_MAX]);
- std::lock_guard<std::mutex> its_client_lock(clients_mutex_);
- clients_[its_client][its_session] = _remote;
+
+ std::lock_guard<std::mutex> its_requests_guard(requests_mutex_);
+ requests_[its_client]
+ [std::make_tuple(its_service, its_method, its_session)] = _remote;
}
} else if (its_service != VSOMEIP_SD_SERVICE
&& utility::is_notification(res.second[VSOMEIP_MESSAGE_TYPE_POS])
diff --git a/implementation/endpoints/src/virtual_server_endpoint_impl.cpp b/implementation/endpoints/src/virtual_server_endpoint_impl.cpp
index c2c917f..5c8981c 100644
--- a/implementation/endpoints/src/virtual_server_endpoint_impl.cpp
+++ b/implementation/endpoints/src/virtual_server_endpoint_impl.cpp
@@ -112,6 +112,10 @@ std::uint16_t virtual_server_endpoint_impl::get_local_port() const {
return port_;
}
+void virtual_server_endpoint_impl::set_local_port(std::uint16_t _port) {
+ port_ = _port;
+}
+
std::uint16_t virtual_server_endpoint_impl::get_remote_port() const {
return ILLEGAL_PORT;
}
diff --git a/implementation/logger/src/message.cpp b/implementation/logger/src/message.cpp
index e4a902f..3363416 100644
--- a/implementation/logger/src/message.cpp
+++ b/implementation/logger/src/message.cpp
@@ -135,7 +135,8 @@ message::~message() {
<< std::endl;
}
}
- } else if (its_configuration->has_dlt_log()) {
+ }
+ if (its_configuration->has_dlt_log()) {
#ifdef USE_DLT
its_logger->log(level_, buffer_.data_.str().c_str());
#endif // USE_DLT
diff --git a/implementation/message/src/deserializer.cpp b/implementation/message/src/deserializer.cpp
index c7464cb..0e33faa 100644
--- a/implementation/message/src/deserializer.cpp
+++ b/implementation/message/src/deserializer.cpp
@@ -111,12 +111,12 @@ bool deserializer::deserialize(uint8_t *_data, std::size_t _length) {
return true;
}
-bool deserializer::deserialize(std::string& _target, std::size_t _length) {
+bool deserializer::deserialize(std::string &_target, std::size_t _length) {
if (_length > remaining_ || _length > _target.capacity()) {
return false;
}
- _target.assign(position_, position_ + _length);
- position_ += _length;
+ _target.assign(position_, position_ + long(_length));
+ position_ += long(_length);
remaining_ -= _length;
return true;
@@ -135,7 +135,7 @@ bool deserializer::deserialize(std::vector< uint8_t >& _value) {
}
bool deserializer::look_ahead(std::size_t _index, uint8_t &_value) const {
- if (_index >= data_.size())
+ if (_index > remaining_)
return false;
_value = *(position_ + static_cast<std::vector<byte_t>::difference_type>(_index));
@@ -144,7 +144,7 @@ bool deserializer::look_ahead(std::size_t _index, uint8_t &_value) const {
}
bool deserializer::look_ahead(std::size_t _index, uint16_t &_value) const {
- if (_index+1 >= data_.size())
+ if (_index+1 > remaining_)
return false;
std::vector< uint8_t >::iterator i = position_ +
@@ -155,7 +155,7 @@ bool deserializer::look_ahead(std::size_t _index, uint16_t &_value) const {
}
bool deserializer::look_ahead(std::size_t _index, uint32_t &_value) const {
- if (_index+3 >= data_.size())
+ if (_index+3 > remaining_)
return false;
std::vector< uint8_t >::const_iterator i = position_ + static_cast<std::vector<byte_t>::difference_type>(_index);
diff --git a/implementation/routing/include/eventgroupinfo.hpp b/implementation/routing/include/eventgroupinfo.hpp
index 32ce5f2..8ec1ac6 100644
--- a/implementation/routing/include/eventgroupinfo.hpp
+++ b/implementation/routing/include/eventgroupinfo.hpp
@@ -42,7 +42,7 @@ public:
VSOMEIP_EXPORT eventgroupinfo(
const service_t _service, const service_t _instance,
const eventgroup_t _eventgroup, const major_version_t _major,
- const ttl_t _ttl);
+ const ttl_t _ttl, const uint8_t _max_remote_subscribers);
VSOMEIP_EXPORT ~eventgroupinfo();
VSOMEIP_EXPORT service_t get_service() const;
@@ -86,6 +86,9 @@ public:
std::set<client_t> &_changed, remote_subscription_id_t &_id,
const bool _is_subscribe);
+ bool is_remote_subscription_limit_reached(
+ const std::shared_ptr<remote_subscription> &_subscription);
+
remote_subscription_id_t add_remote_subscription(
const std::shared_ptr<remote_subscription> &_subscription);
@@ -107,6 +110,10 @@ public:
VSOMEIP_EXPORT void send_initial_events(
const std::shared_ptr<endpoint_definition> &_reliable,
const std::shared_ptr<endpoint_definition> &_unreliable) const;
+
+ VSOMEIP_EXPORT uint8_t get_max_remote_subscribers() const;
+ VSOMEIP_EXPORT void set_max_remote_subscribers(uint8_t _max_remote_subscribers);
+
private:
void update_id();
uint32_t get_unreliable_target_count() const;
@@ -131,9 +138,12 @@ private:
std::shared_ptr<remote_subscription>
> subscriptions_;
remote_subscription_id_t id_;
+ std::map<boost::asio::ip::address, uint8_t> remote_subscribers_count_;
std::atomic<reliability_type_e> reliability_;
std::atomic<bool> reliability_auto_mode_;
+
+ uint8_t max_remote_subscribers_;
};
} // namespace vsomeip_v3
diff --git a/implementation/routing/include/remote_subscription.hpp b/implementation/routing/include/remote_subscription.hpp
index 685aad9..ff94d5b 100644
--- a/implementation/routing/include/remote_subscription.hpp
+++ b/implementation/routing/include/remote_subscription.hpp
@@ -31,6 +31,7 @@ public:
bool operator==(const remote_subscription &_other) const;
bool equals(const std::shared_ptr<remote_subscription> &_other) const;
+ bool address_equals(const std::shared_ptr<remote_subscription> &_other) const;
VSOMEIP_EXPORT void reset(const std::set<client_t> &_clients);
@@ -89,6 +90,8 @@ public:
VSOMEIP_EXPORT std::uint32_t get_answers() const;
VSOMEIP_EXPORT void set_answers(const std::uint32_t _answers);
+ VSOMEIP_EXPORT bool get_ip_address(boost::asio::ip::address &_address) const;
+
private:
std::atomic<remote_subscription_id_t> id_;
std::atomic<bool> is_initial_;
diff --git a/implementation/routing/include/routing_manager_base.hpp b/implementation/routing/include/routing_manager_base.hpp
index acd57fe..270f4c4 100644
--- a/implementation/routing/include/routing_manager_base.hpp
+++ b/implementation/routing/include/routing_manager_base.hpp
@@ -121,6 +121,8 @@ public:
virtual void set_routing_state(routing_state_e _routing_state) = 0;
+ virtual routing_state_e get_routing_state();
+
virtual void register_client_error_handler(client_t _client,
const std::shared_ptr<endpoint> &_endpoint) = 0;
@@ -131,6 +133,7 @@ public:
std::shared_ptr<serviceinfo> find_service(service_t _service, instance_t _instance) const;
client_t find_local_client(service_t _service, instance_t _instance) const;
+ client_t find_local_client_unlocked(service_t _service, instance_t _instance) const;
std::shared_ptr<event> find_event(service_t _service, instance_t _instance,
event_t _event) const;
@@ -208,6 +211,8 @@ protected:
instance_t _instance, method_t _method);
bool is_subscribe_to_any_event_allowed(credentials_t _credentials, client_t _client,
service_t _service, instance_t _instance, eventgroup_t _eventgroup);
+ void unsubscribe_all(service_t _service, instance_t _instance);
+
#ifdef VSOMEIP_ENABLE_COMPAT
void set_incoming_subscription_state(client_t _client, service_t _service, instance_t _instance,
eventgroup_t _eventgroup, event_t _event, subscription_state_e _state);
@@ -259,6 +264,9 @@ protected:
std::mutex event_registration_mutex_;
+ std::mutex routing_state_mutex_;
+ routing_state_e routing_state_;
+
#ifdef USE_DLT
std::shared_ptr<trace::connector_impl> tc_;
#endif
diff --git a/implementation/routing/include/routing_manager_impl.hpp b/implementation/routing/include/routing_manager_impl.hpp
index b5f0ea0..3fd6eed 100644
--- a/implementation/routing/include/routing_manager_impl.hpp
+++ b/implementation/routing/include/routing_manager_impl.hpp
@@ -140,7 +140,7 @@ public:
void on_subscribe_nack(client_t _client, service_t _service,
instance_t _instance, eventgroup_t _eventgroup, event_t _event,
- remote_subscription_id_t _id);
+ remote_subscription_id_t _id, bool _simulated);
// interface to stub
@@ -396,6 +396,12 @@ private:
const std::set<client_t> &_removed,
const remote_subscription_id_t _id);
+ void send_expired_subscription(client_t _offering_client,
+ const service_t _service, const instance_t _instance,
+ const eventgroup_t _eventgroup,
+ const std::set<client_t> &_removed,
+ const remote_subscription_id_t _id);
+
void cleanup_server_endpoint(service_t _service,
const std::shared_ptr<endpoint>& _endpoint);
@@ -414,6 +420,8 @@ private:
method_t _method, length_t _length);
void statistics_log_timer_cbk(boost::system::error_code const & _error);
+ void send_suspend() const;
+
private:
std::shared_ptr<routing_manager_stub> stub_;
std::shared_ptr<sd::service_discovery> discovery_;
@@ -476,7 +484,6 @@ private:
pending_remote_offer_id_t pending_remote_offer_id_;
std::map<pending_remote_offer_id_t, std::pair<service_t, instance_t>> pending_remote_offers_;
- std::mutex last_resume_mutex_;
std::chrono::steady_clock::time_point last_resume_;
std::mutex offer_serialization_mutex_;
@@ -493,6 +500,9 @@ private:
msg_statistic_t> message_statistics_;
std::tuple<service_t, instance_t, method_t> message_to_discard_;
uint32_t ignored_statistics_counter_;
+
+ // synchronize update_remote_subscription() and send_(un)subscription()
+ std::mutex update_remote_subscription_mutex_;
};
} // namespace vsomeip_v3
diff --git a/implementation/routing/include/routing_manager_proxy.hpp b/implementation/routing/include/routing_manager_proxy.hpp
index fd546a0..2b5335a 100644
--- a/implementation/routing/include/routing_manager_proxy.hpp
+++ b/implementation/routing/include/routing_manager_proxy.hpp
@@ -194,6 +194,8 @@ private:
void on_update_security_credentials(const byte_t *_data, uint32_t _size);
void on_client_assign_ack(const client_t &_client);
+ void on_suspend();
+
private:
enum class inner_state_type_e : std::uint8_t {
ST_REGISTERED = 0x0,
diff --git a/implementation/routing/include/routing_manager_stub.hpp b/implementation/routing/include/routing_manager_stub.hpp
index 4c2ff01..aa5796e 100644
--- a/implementation/routing/include/routing_manager_stub.hpp
+++ b/implementation/routing/include/routing_manager_stub.hpp
@@ -66,6 +66,11 @@ public:
instance_t _instance, eventgroup_t _eventgroup,
event_t _event, remote_subscription_id_t _id);
+ bool send_expired_subscription(const std::shared_ptr<endpoint>& _target,
+ client_t _client, service_t _service,
+ instance_t _instance, eventgroup_t _eventgroup,
+ event_t _event, remote_subscription_id_t _id);
+
void send_subscribe_nack(client_t _client, service_t _service,
instance_t _instance, eventgroup_t _eventgroup, event_t _event);
@@ -115,6 +120,8 @@ public:
const std::set<std::shared_ptr<policy> > &_policies);
void remove_requester_policies(uid_t _uid, gid_t _gid);
+ void send_suspend() const;
+
private:
void broadcast(const std::vector<byte_t> &_command) const;
@@ -123,6 +130,9 @@ private:
void distribute_credentials(client_t _hoster, service_t _service, instance_t _instance);
+ void inform_provider(client_t _hoster, service_t _service,
+ instance_t _instance, major_version_t _major, minor_version_t _minor,
+ routing_info_entry_e _entry);
void inform_requesters(client_t _hoster, service_t _service,
instance_t _instance, major_version_t _major,
minor_version_t _minor, routing_info_entry_e _entry,
diff --git a/implementation/routing/include/routing_manager_stub_host.hpp b/implementation/routing/include/routing_manager_stub_host.hpp
index 4707948..6ad0ab1 100644
--- a/implementation/routing/include/routing_manager_stub_host.hpp
+++ b/implementation/routing/include/routing_manager_stub_host.hpp
@@ -51,7 +51,7 @@ public:
virtual void on_subscribe_nack(client_t _client, service_t _service,
instance_t _instance, eventgroup_t _eventgroup, event_t _event,
- remote_subscription_id_t _subscription_id) = 0;
+ remote_subscription_id_t _subscription_id, bool _simulated) = 0;
virtual void on_subscribe_ack(client_t _client, service_t _service,
instance_t _instance, eventgroup_t _eventgroup, event_t _event,
diff --git a/implementation/routing/src/event.cpp b/implementation/routing/src/event.cpp
index bc0ba3e..b63022c 100644
--- a/implementation/routing/src/event.cpp
+++ b/implementation/routing/src/event.cpp
@@ -130,8 +130,10 @@ void event::set_payload(const std::shared_ptr<payload> &_payload, bool _force) {
}
}
} else {
- VSOMEIP_INFO << "Can't set payload for event " << std::hex
- << message_->get_method() << " as it isn't provided";
+ VSOMEIP_INFO << "Can't set payload for event "
+ << std::hex << std::setw(4) << std::setfill('0')
+ << get_service() << "." << get_instance() << "." << get_event()
+ << " as it isn't provided";
}
}
@@ -146,8 +148,10 @@ void event::set_payload(const std::shared_ptr<payload> &_payload, client_t _clie
}
}
} else {
- VSOMEIP_INFO << "Can't set payload for event " << std::hex
- << message_->get_method() << " as it isn't provided";
+ VSOMEIP_INFO << "Can't set payload for event "
+ << std::hex << std::setw(4) << std::setfill('0')
+ << get_service() << "." << get_instance() << "." << get_event()
+ << ". It isn't provided";
}
}
@@ -164,8 +168,10 @@ void event::set_payload(const std::shared_ptr<payload> &_payload,
}
}
} else {
- VSOMEIP_INFO << "Can't set payload for event " << std::hex
- << message_->get_method() << " as it isn't provided";
+ VSOMEIP_INFO << "Can't set payload for event "
+ << std::hex << std::setw(4) << std::setfill('0')
+ << get_service() << "." << get_instance() << "." << get_event()
+ << ". It isn't provided";
}
}
@@ -281,7 +287,9 @@ void event::notify() {
routing_->send(VSOMEIP_ROUTING_CLIENT, message_);
} else {
VSOMEIP_INFO << __func__
- << ": Notifying " << std::hex << get_event()
+ << ": Notifying "
+ << std::hex << std::setw(4) << std::setfill('0')
+ << get_service() << "." << get_instance() << "." << get_event()
<< " failed. Event payload not (yet) set!";
}
}
@@ -293,7 +301,9 @@ void event::notify_one(client_t _client,
notify_one_unlocked(_client, _target);
} else {
VSOMEIP_WARNING << __func__
- << ": Notifying " << std::hex << get_event()
+ << ": Notifying "
+ << std::hex << std::setw(4) << std::setfill('0')
+ << get_service() << "." << get_instance() << "." << get_event()
<< " failed. Target undefined";
}
}
@@ -306,13 +316,17 @@ void event::notify_one_unlocked(client_t _client,
routing_->send_to(_client, _target, message_);
} else {
VSOMEIP_INFO << __func__
- << ": Notifying " << std::hex << get_event()
+ << ": Notifying "
+ << std::hex << std::setw(4) << std::setfill('0')
+ << get_service() << "." << get_instance() << "." << get_event()
<< " failed. Event payload not (yet) set!";
pending_.insert(_target);
}
} else {
VSOMEIP_WARNING << __func__
- << ": Notifying " << std::hex << get_event()
+ << ": Notifying "
+ << std::hex << std::setw(4) << std::setfill('0')
+ << get_service() << "." << get_instance() << "." << get_event()
<< " failed. Target undefined";
}
}
@@ -329,7 +343,8 @@ void event::notify_one_unlocked(client_t _client) {
} else {
VSOMEIP_INFO << __func__
<< ": Notifying "
- << std::hex << message_->get_method()
+ << std::hex << std::setw(4) << std::setfill('0')
+ << get_service() << "." << get_instance() << "." << get_event()
<< " to client " << _client
<< " failed. Event payload not set!";
}
@@ -403,9 +418,12 @@ bool event::add_subscriber(eventgroup_t _eventgroup, client_t _client, bool _for
ret = eventgroups_[_eventgroup].insert(_client).second;
} else {
VSOMEIP_WARNING << __func__ << ": Didnt' insert client "
- << std::hex << std::setw(4) << std::setfill('0') << _client
- << " to eventgroup 0x"
- << std::hex << std::setw(4) << std::setfill('0') << _eventgroup;
+ << std::hex << std::setw(4) << std::setfill('0')
+ << _client
+ << " to eventgroup "
+ << std::hex << std::setw(4) << std::setfill('0')
+ << get_service() << "." << get_instance() << "."
+ << _eventgroup;
}
return ret;
}
diff --git a/implementation/routing/src/eventgroupinfo.cpp b/implementation/routing/src/eventgroupinfo.cpp
index d09ed1b..50bbdb6 100644
--- a/implementation/routing/src/eventgroupinfo.cpp
+++ b/implementation/routing/src/eventgroupinfo.cpp
@@ -26,13 +26,14 @@ eventgroupinfo::eventgroupinfo()
threshold_(0),
id_(PENDING_SUBSCRIPTION_ID),
reliability_(reliability_type_e::RT_UNKNOWN),
- reliability_auto_mode_(false) {
+ reliability_auto_mode_(false),
+ max_remote_subscribers_(VSOMEIP_DEFAULT_MAX_REMOTE_SUBSCRIBERS) {
}
eventgroupinfo::eventgroupinfo(
const service_t _service, const instance_t _instance,
const eventgroup_t _eventgroup, const major_version_t _major,
- const ttl_t _ttl)
+ const ttl_t _ttl, const uint8_t _max_remote_subscribers)
: service_(_service),
instance_(_instance),
eventgroup_(_eventgroup),
@@ -42,7 +43,8 @@ eventgroupinfo::eventgroupinfo(
threshold_(0),
id_(PENDING_SUBSCRIPTION_ID),
reliability_(reliability_type_e::RT_UNKNOWN),
- reliability_auto_mode_(false) {
+ reliability_auto_mode_(false),
+ max_remote_subscribers_(_max_remote_subscribers) {
}
eventgroupinfo::~eventgroupinfo() {
@@ -213,65 +215,123 @@ eventgroupinfo::update_remote_subscription(
const std::chrono::steady_clock::time_point &_expiration,
std::set<client_t> &_changed, remote_subscription_id_t &_id,
const bool _is_subscribe) {
- std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
- for (const auto& its_item : subscriptions_) {
- if (its_item.second->equals(_subscription)) {
- // update existing subscription
- _changed = its_item.second->update(
- _subscription->get_clients(), _expiration, _is_subscribe);
- _id = its_item.second->get_id();
-
- // Copy acknowledgment states from existing subscription
- for (const auto& its_client : _subscription->get_clients()) {
- _subscription->set_client_state(its_client,
- its_item.second->get_client_state(its_client));
- }
+ bool its_result(false);
+ std::shared_ptr<endpoint_definition> its_subscriber;
+ std::set<std::shared_ptr<event> > its_events;
- if (_is_subscribe) {
- if (!_changed.empty()) {
- // New clients:
- // Let this be a child subscription
- _subscription->set_parent(its_item.second);
- update_id();
- _subscription->set_id(id_);
- subscriptions_[id_] = _subscription;
- } else {
- if (!_subscription->is_pending()) {
- if (!_subscription->force_initial_events()) {
- _subscription->set_initial(false);
- }
+ {
+ std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
+
+ for (const auto& its_item : subscriptions_) {
+ if (its_item.second->equals(_subscription)) {
+ // update existing subscription
+ _changed = its_item.second->update(
+ _subscription->get_clients(), _expiration, _is_subscribe);
+ _id = its_item.second->get_id();
+
+ // Copy acknowledgment states from existing subscription
+ for (const auto& its_client : _subscription->get_clients()) {
+ const auto its_state = its_item.second->get_client_state(its_client);
+ if (_is_subscribe &&
+ its_state == remote_subscription_state_e::SUBSCRIPTION_UNKNOWN) {
+ _subscription->set_client_state(its_client,
+ remote_subscription_state_e::SUBSCRIPTION_PENDING);
+ _changed.insert(its_client);
} else {
- its_item.second->set_answers(
- its_item.second->get_answers() + 1);
- _subscription->set_parent(its_item.second);
- _subscription->set_answers(0);
+ _subscription->set_client_state(its_client, its_state);
}
}
- } else {
- if (its_item.second->is_pending()) {
- for (const auto &its_event : events_)
- its_event->remove_pending(
- its_item.second->get_subscriber());
+
+ if (_is_subscribe) {
+ if (!_changed.empty()) {
+ // New clients:
+ // Let this be a child subscription
+ _subscription->set_parent(its_item.second);
+ update_id();
+ _subscription->set_id(id_);
+ subscriptions_[id_] = _subscription;
+ } else {
+ if (!_subscription->is_pending()) {
+ if (!_subscription->force_initial_events()) {
+ _subscription->set_initial(false);
+ }
+ } else {
+ its_item.second->set_answers(
+ its_item.second->get_answers() + 1);
+ _subscription->set_parent(its_item.second);
+ _subscription->set_answers(0);
+ }
+ }
+ } else {
+ if (its_item.second->is_pending()) {
+ its_subscriber = its_item.second->get_subscriber();
+ }
}
+
+ its_result = true;
+ break;
}
+ }
+ }
- return true;
+ if (its_subscriber) {
+ {
+ // Build set of events first to avoid having to
+ // hold the "events_mutex_" in parallel to the internal event mutexes.
+ std::lock_guard<std::mutex> its_lock(events_mutex_);
+ for (const auto &its_event : events_)
+ its_events.insert(its_event);
}
+ for (const auto &its_event : its_events)
+ its_event->remove_pending(its_subscriber);
}
- return false;
+ return (its_result);
+}
+
+bool
+eventgroupinfo::is_remote_subscription_limit_reached(
+ const std::shared_ptr<remote_subscription> &_subscription) {
+ bool limit_reached(false);
+
+ if (subscriptions_.size() <= max_remote_subscribers_) {
+ return false;
+ }
+
+ boost::asio::ip::address its_address;
+ if (_subscription->get_ip_address(its_address)) {
+ auto find_address = remote_subscribers_count_.find(its_address);
+ if (find_address != remote_subscribers_count_.end()) {
+ if (find_address->second > max_remote_subscribers_) {
+ VSOMEIP_WARNING << ": remote subscriber limit [" << std::dec
+ << (uint32_t)max_remote_subscribers_ << "] to ["
+ << std::hex << std::setw(4) << std::setfill('0') << service_ << "."
+ << std::hex << std::setw(4) << std::setfill('0') << instance_ << "."
+ << std::hex << std::setw(4) << std::setfill('0') << eventgroup_ << "]"
+ << " reached for remote address: " << its_address.to_string()
+ << " rejecting subscription!";
+ return true;
+ }
+ }
+ }
+ return limit_reached;
}
remote_subscription_id_t
eventgroupinfo::add_remote_subscription(
const std::shared_ptr<remote_subscription> &_subscription) {
std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
+
update_id();
_subscription->set_id(id_);
subscriptions_[id_] = _subscription;
+ boost::asio::ip::address its_address;
+ if (_subscription->get_ip_address(its_address)) {
+ remote_subscribers_count_[its_address]++;
+ }
return id_;
}
@@ -291,6 +351,20 @@ void
eventgroupinfo::remove_remote_subscription(
const remote_subscription_id_t _id) {
std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
+
+ auto find_subscription = subscriptions_.find(_id);
+ if (find_subscription != subscriptions_.end()) {
+ boost::asio::ip::address its_address;
+ if (find_subscription->second->get_ip_address(its_address)) {
+ auto find_address = remote_subscribers_count_.find(its_address);
+ if (find_address != remote_subscribers_count_.end()) {
+ if(find_address->second != 0) {
+ find_address->second--;
+ }
+ }
+ }
+ }
+
subscriptions_.erase(_id);
}
@@ -298,6 +372,7 @@ void
eventgroupinfo::clear_remote_subscriptions() {
std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
subscriptions_.clear();
+ remote_subscribers_count_.clear();
}
std::set<std::shared_ptr<endpoint_definition> >
@@ -390,11 +465,19 @@ eventgroupinfo::send_initial_events(
}
// Send events
- for (const auto& its_event : its_reliable_events)
+ for (const auto &its_event : its_reliable_events)
its_event->notify_one(VSOMEIP_ROUTING_CLIENT, _reliable);
- for (const auto& its_event : its_unreliable_events)
+ for (const auto &its_event : its_unreliable_events)
its_event->notify_one(VSOMEIP_ROUTING_CLIENT, _unreliable);
}
+uint8_t eventgroupinfo::get_max_remote_subscribers() const {
+ return max_remote_subscribers_;
+}
+
+void eventgroupinfo::set_max_remote_subscribers(uint8_t _max_remote_subscribers) {
+ max_remote_subscribers_ = _max_remote_subscribers;
+}
+
} // namespace vsomeip_v3
diff --git a/implementation/routing/src/remote_subscription.cpp b/implementation/routing/src/remote_subscription.cpp
index daec6f9..cb04d93 100644
--- a/implementation/routing/src/remote_subscription.cpp
+++ b/implementation/routing/src/remote_subscription.cpp
@@ -46,6 +46,21 @@ remote_subscription::equals(
return operator ==(*_other);
}
+bool
+remote_subscription::address_equals(
+ const std::shared_ptr<remote_subscription> &_other) const {
+ bool relibale_address_equals(false);
+ bool unrelibale_address_equals(false);
+
+ if (reliable_ && (*_other).reliable_)
+ relibale_address_equals = (reliable_->get_address()
+ == (*_other).reliable_->get_address());
+ if (unreliable_ && (*_other).unreliable_)
+ unrelibale_address_equals = (unreliable_->get_address()
+ == (*_other).unreliable_->get_address());
+ return (relibale_address_equals || unrelibale_address_equals);
+}
+
void
remote_subscription::reset(const std::set<client_t> &_clients) {
auto its_client_state = std::make_pair(
@@ -135,7 +150,7 @@ std::set<client_t>
remote_subscription::get_clients() const {
std::lock_guard<std::mutex> its_lock(mutex_);
std::set<client_t> its_clients;
- for (const auto& its_item : clients_)
+ for (const auto &its_item : clients_)
its_clients.insert(its_item.first);
return its_clients;
}
@@ -313,4 +328,17 @@ remote_subscription::set_answers(const std::uint32_t _answers) {
answers_ = _answers;
}
+bool
+remote_subscription::get_ip_address(boost::asio::ip::address &_address) const {
+ if (reliable_) {
+ _address = reliable_->get_address();
+ return true;
+ }
+ else if (unreliable_) {
+ _address = unreliable_->get_address();
+ return true;
+ }
+ return false;
+}
+
} // namespace vsomeip_v3
diff --git a/implementation/routing/src/routing_manager_base.cpp b/implementation/routing/src/routing_manager_base.cpp
index 960ae60..c787a9e 100644
--- a/implementation/routing/src/routing_manager_base.cpp
+++ b/implementation/routing/src/routing_manager_base.cpp
@@ -24,7 +24,8 @@ routing_manager_base::routing_manager_base(routing_manager_host *_host) :
host_(_host),
io_(host_->get_io()),
client_(host_->get_client()),
- configuration_(host_->get_configuration())
+ configuration_(host_->get_configuration()),
+ routing_state_(routing_state_e::RS_UNKNOWN)
#ifdef USE_DLT
, tc_(trace::connector_impl::get())
#endif
@@ -458,6 +459,8 @@ void routing_manager_base::register_event(client_t _client,
its_eventgroupinfo->set_service(_service);
its_eventgroupinfo->set_instance(_instance);
its_eventgroupinfo->set_eventgroup(eg);
+ its_eventgroupinfo->set_max_remote_subscribers(
+ configuration_->get_max_remote_subscribers());
std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
eventgroups_[_service][_instance][eg] = its_eventgroupinfo;
}
@@ -549,22 +552,24 @@ bool routing_manager_base::is_response_allowed(client_t _sender, service_t _serv
return true;
}
- if (_sender == find_local_client(_service, _instance)) {
- // sender is still offering the service
- return true;
- }
+ {
+ std::lock_guard<std::mutex> its_lock(local_services_mutex_);
+ if (_sender == find_local_client_unlocked(_service, _instance)) {
+ // sender is still offering the service
+ return true;
+ }
- std::lock_guard<std::mutex> its_lock(local_services_mutex_);
- auto found_service = local_services_history_.find(_service);
- if (found_service != local_services_history_.end()) {
- auto found_instance = found_service->second.find(_instance);
- if (found_instance != found_service->second.end()) {
- auto found_client = found_instance->second.find(_sender);
- if (found_client != found_instance->second.end()) {
- // sender was offering the service and is still connected
- return true;
+ auto found_service = local_services_history_.find(_service);
+ if (found_service != local_services_history_.end()) {
+ auto found_instance = found_service->second.find(_instance);
+ if (found_instance != found_service->second.end()) {
+ auto found_client = found_instance->second.find(_sender);
+ if (found_client != found_instance->second.end()) {
+ // sender was offering the service and is still connected
+ return true;
+ }
}
- }
+ }
}
// service is now offered by another client
@@ -648,6 +653,21 @@ void routing_manager_base::unsubscribe(client_t _client, uid_t _uid, gid_t _gid,
}
}
+void
+routing_manager_base::unsubscribe_all(
+ service_t _service, instance_t _instance) {
+
+ std::lock_guard<std::mutex> its_guard(events_mutex_);
+ auto find_service = events_.find(_service);
+ if (find_service != events_.end()) {
+ auto find_instance = find_service->second.find(_instance);
+ if (find_instance != find_service->second.end()) {
+ for (auto &e : find_instance->second)
+ e.second->clear_subscribers();
+ }
+ }
+}
+
void routing_manager_base::notify(service_t _service, instance_t _instance,
event_t _event, std::shared_ptr<payload> _payload,
bool _force) {
@@ -982,6 +1002,11 @@ std::set<client_t> routing_manager_base::find_local_clients(service_t _service,
client_t routing_manager_base::find_local_client(service_t _service,
instance_t _instance) const {
std::lock_guard<std::mutex> its_lock(local_services_mutex_);
+ return find_local_client_unlocked(_service, _instance);
+}
+
+client_t routing_manager_base::find_local_client_unlocked(service_t _service,
+ instance_t _instance) const {
client_t its_client(VSOMEIP_ROUTING_CLIENT);
auto its_service = local_services_.find(_service);
if (its_service != local_services_.end()) {
@@ -1396,6 +1421,11 @@ routing_manager_base::get_subscriptions(const client_t _client) {
return result;
}
+routing_state_e
+routing_manager_base::get_routing_state() {
+ return routing_state_;
+}
+
#ifdef VSOMEIP_ENABLE_COMPAT
void routing_manager_base::set_incoming_subscription_state(client_t _client, service_t _service, instance_t _instance,
eventgroup_t _eventgroup, event_t _event, subscription_state_e _state) {
diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp
index 34150fe..07c0740 100644
--- a/implementation/routing/src/routing_manager_impl.cpp
+++ b/implementation/routing/src/routing_manager_impl.cpp
@@ -532,8 +532,7 @@ void routing_manager_impl::request_service(client_t _client, service_t _service,
_minor, DEFAULT_TTL);
}
its_info->add_client(_client);
- ep_mgr_impl_->find_or_create_remote_client(
- _service, _instance, true);
+ ep_mgr_impl_->find_or_create_remote_client(_service, _instance);
}
}
}
@@ -633,6 +632,7 @@ void routing_manager_impl::subscribe(client_t _client, uid_t _uid, gid_t _gid,
std::unique_lock<std::mutex> its_critical(remote_subscription_state_mutex_);
bool inserted = insert_subscription(_service, _instance, _eventgroup,
_event, _client, &its_already_subscribed_events);
+ const bool subscriber_is_rm_host = (get_client() == _client);
if (inserted) {
if (0 == its_local_client) {
handle_subscription_state(_client, _service, _instance, _eventgroup, _event);
@@ -642,7 +642,11 @@ void routing_manager_impl::subscribe(client_t _client, uid_t _uid, gid_t _gid,
_eventgroup, _event, its_already_subscribed_events);
auto its_info = find_eventgroup(_service, _instance, _eventgroup);
- if (its_info) {
+ // if the subscriber is the rm_host itself: check if service
+ // is available before subscribing via SD otherwise we sent
+ // a StopSubscribe/Subscribe once the first offer is received
+ if (its_info &&
+ (!subscriber_is_rm_host || find_service(_service, _instance))) {
discovery_->subscribe(_service, _instance, _eventgroup,
_major, configured_ttl,
its_info->is_selective() ? _client : VSOMEIP_ROUTING_CLIENT,
@@ -657,7 +661,7 @@ void routing_manager_impl::subscribe(client_t _client, uid_t _uid, gid_t _gid,
}
}
}
- if (get_client() == _client) {
+ if (subscriber_is_rm_host) {
std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_);
subscription_data_t subscription = {
_service, _instance, _eventgroup, _major, _event, _uid, _gid
@@ -701,6 +705,10 @@ void routing_manager_impl::unsubscribe(client_t _client, uid_t _uid, gid_t _gid,
host_->on_subscription(_service, _instance, _eventgroup, _client, _uid, _gid, false,
[](const bool _subscription_accepted){ (void)_subscription_accepted; });
if (0 == find_local_client(_service, _instance)) {
+ if (get_client() == _client) {
+ std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_);
+ remove_pending_subscription(_service, _instance, _eventgroup, _event);
+ }
if (last_subscriber_removed) {
unset_all_eventpayloads(_service, _instance, _eventgroup);
{
@@ -784,7 +792,8 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data,
if (its_target) {
#ifdef USE_DLT
if ((is_request && its_client == get_client()) ||
- (is_response && find_local_client(its_service, _instance) == get_client())) {
+ (is_response && find_local_client(its_service, _instance) == get_client()) ||
+ (is_notification && find_local_client(its_service, _instance) == VSOMEIP_ROUTING_CLIENT)) {
const uint16_t its_data_size
= uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
@@ -1201,6 +1210,37 @@ void routing_manager_impl::notify_one(service_t _service, instance_t _instance,
void routing_manager_impl::on_availability(service_t _service, instance_t _instance,
bool _is_available, major_version_t _major, minor_version_t _minor) {
+
+ // insert subscriptions of routing manager into service discovery
+ // to send SubscribeEventgroup after StopOffer / Offer was received
+ if (_is_available) {
+ if (discovery_) {
+ const client_t its_local_client = find_local_client(_service, _instance);
+ // remote service
+ if (VSOMEIP_ROUTING_CLIENT == its_local_client) {
+ static const ttl_t configured_ttl(configuration_->get_sd_ttl());
+
+ std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_);
+ for (auto &ps : pending_subscriptions_) {
+ if (ps.service_ == _service
+ && ps.instance_ == _instance
+ && ps.major_ == _major) {
+ auto its_info = find_eventgroup(_service, _instance, ps.eventgroup_);
+ if (its_info) {
+ discovery_->subscribe(
+ _service,
+ _instance,
+ ps.eventgroup_,
+ _major,
+ configured_ttl,
+ its_info->is_selective() ? get_client() : VSOMEIP_ROUTING_CLIENT,
+ its_info);
+ }
+ }
+ }
+ }
+ }
+ }
host_->on_availability(_service, _instance, _is_available, _major, _minor);
}
@@ -1719,16 +1759,23 @@ void routing_manager_impl::on_stop_offer_service(client_t _client, service_t _se
erase_offer_command(_service, _instance);
}
- std::lock_guard<std::mutex> its_eventgroups_lock(eventgroups_mutex_);
- auto find_service = eventgroups_.find(_service);
- if (find_service != eventgroups_.end()) {
- auto find_instance = find_service->second.find(_instance);
- if (find_instance != find_service->second.end()) {
- for (auto e : find_instance->second) {
- e.second->clear_remote_subscriptions();
+ std::set<std::shared_ptr<eventgroupinfo> > its_eventgroup_info_set;
+ {
+ std::lock_guard<std::mutex> its_eventgroups_lock(eventgroups_mutex_);
+ auto find_service = eventgroups_.find(_service);
+ if (find_service != eventgroups_.end()) {
+ auto find_instance = find_service->second.find(_instance);
+ if (find_instance != find_service->second.end()) {
+ for (auto e : find_instance->second) {
+ its_eventgroup_info_set.insert(e.second);
+ }
}
}
}
+
+ for (auto e : its_eventgroup_info_set) {
+ e->clear_remote_subscriptions();
+ }
} else {
erase_offer_command(_service, _instance);
}
@@ -1902,6 +1949,11 @@ bool routing_manager_impl::deliver_notification(
}
}
if (!cache_event) {
+ VSOMEIP_WARNING << __func__ << ": dropping ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_event_id
+ << "]. No subscription to corresponding eventgroup.";
return true; // as there is nothing to do
}
}
@@ -1996,13 +2048,10 @@ std::shared_ptr<endpoint> routing_manager_impl::create_service_discovery_endpoin
its_service_endpoint->add_default_target(VSOMEIP_SD_SERVICE,
_address, _port);
if (!_reliable) {
-#if defined(_WIN32) || defined(ANDROID)
- dynamic_cast<udp_server_endpoint_impl*>(
- its_service_endpoint.get())->join(_address);
-#else
- reinterpret_cast<udp_server_endpoint_impl*>(
- its_service_endpoint.get())->join(_address);
-#endif
+ auto its_udp_server_endpoint_impl = std::dynamic_pointer_cast<
+ udp_server_endpoint_impl>(its_service_endpoint);
+ if (its_udp_server_endpoint_impl)
+ its_udp_server_endpoint_impl->join(_address);
}
} else {
VSOMEIP_ERROR<< "Service Discovery endpoint could not be created. "
@@ -2150,6 +2199,7 @@ bool routing_manager_impl::is_field(service_t _service, instance_t _instance,
return false;
}
+//only called from the SD
void routing_manager_impl::add_routing_info(
service_t _service, instance_t _instance,
major_version_t _major, minor_version_t _minor, ttl_t _ttl,
@@ -2158,6 +2208,12 @@ void routing_manager_impl::add_routing_info(
const boost::asio::ip::address &_unreliable_address,
uint16_t _unreliable_port) {
+ std::lock_guard<std::mutex> its_lock(routing_state_mutex_);
+ if (routing_state_ == routing_state_e::RS_SUSPENDED) {
+ VSOMEIP_INFO << "rmi::" << __func__ << " We are suspened --> do nothing.";
+ return;
+ }
+
// Create/Update service info
std::shared_ptr<serviceinfo> its_info(find_service(_service, _instance));
if (!its_info) {
@@ -2259,6 +2315,7 @@ void routing_manager_impl::add_routing_info(
}
} else if (_reliable_port != ILLEGAL_PORT && is_reliable_known) {
std::lock_guard<std::mutex> its_lock(requested_services_mutex_);
+ bool connected(false);
for(const auto &client_id : requested_services_) {
auto found_service = client_id.second.find(_service);
if (found_service != client_id.second.end()) {
@@ -2272,21 +2329,34 @@ void routing_manager_impl::add_routing_info(
|| _minor == DEFAULT_MINOR
|| major_minor_pair.second == ANY_MINOR)) {
std::shared_ptr<endpoint> ep = its_info->get_endpoint(true);
- if (ep && ep->is_established() &&
+ if (ep) {
+ if (ep->is_established() &&
!stub_->contained_in_routing_info(
VSOMEIP_ROUTING_CLIENT, _service, _instance,
its_info->get_major(),
its_info->get_minor())) {
- on_availability(_service, _instance,
- true, its_info->get_major(), its_info->get_minor());
- stub_->on_offer_service(VSOMEIP_ROUTING_CLIENT,
- _service, _instance,
- its_info->get_major(),
- its_info->get_minor());
- if (discovery_) {
- discovery_->on_endpoint_connected(
- _service, _instance, ep);
+ on_availability(_service, _instance,
+ true, its_info->get_major(), its_info->get_minor());
+ stub_->on_offer_service(VSOMEIP_ROUTING_CLIENT,
+ _service, _instance,
+ its_info->get_major(),
+ its_info->get_minor());
+ if (discovery_) {
+ discovery_->on_endpoint_connected(
+ _service, _instance, ep);
+ }
+ }
+ } else {
+ // no endpoint yet, but requested -> create one
+
+ // SWS_SD_00376 establish TCP connection to service
+ // service is marked as available later in on_connect()
+ if (!connected) {
+ ep_mgr_impl_->find_or_create_remote_client(
+ _service, _instance, true);
+ connected = true;
}
+ its_info->add_client(client_id.first);
}
break;
}
@@ -2358,7 +2428,8 @@ void routing_manager_impl::add_routing_info(
&& (major_minor_pair.second <= _minor
|| _minor == DEFAULT_MINOR
|| major_minor_pair.second == ANY_MINOR)) {
- if (!stub_->contained_in_routing_info(
+ if (_reliable_port == ILLEGAL_PORT && !is_reliable_known &&
+ !stub_->contained_in_routing_info(
VSOMEIP_ROUTING_CLIENT, _service, _instance,
its_info->get_major(),
its_info->get_minor())) {
@@ -2411,18 +2482,15 @@ void routing_manager_impl::del_routing_info(service_t _service, instance_t _inst
// do no longer exist and the last received payload is no
// longer valid.
for (auto &its_event : its_eventgroup.second->get_events()) {
- const auto its_subscribers = its_event->get_subscribers();
- for (const auto& its_subscriber : its_subscribers) {
+ const auto& its_subscribers = its_event->get_subscribers();
+ for (const auto its_subscriber : its_subscribers) {
if (its_subscriber != get_client()) {
its_event->remove_subscriber(
its_eventgroup.first, its_subscriber);
}
}
its_events.push_back(its_event);
- remove_pending_subscription(_service, _instance,
- its_eventgroup.first, its_event->get_event());
}
-
}
}
}
@@ -2436,14 +2504,14 @@ void routing_manager_impl::del_routing_info(service_t _service, instance_t _inst
std::set<std::tuple<
service_t, instance_t, eventgroup_t, client_t> > its_invalid;
- for (const auto& its_state : remote_subscription_state_) {
+ for (const auto &its_state : remote_subscription_state_) {
if (std::get<0>(its_state.first) == _service
&& std::get<1>(its_state.first) == _instance) {
its_invalid.insert(its_state.first);
}
}
- for (const auto& its_key : its_invalid)
+ for (const auto &its_key : its_invalid)
remote_subscription_state_.erase(its_key);
}
@@ -2599,8 +2667,15 @@ routing_manager_impl::expire_subscriptions(
const bool expire_all = (_range.first == ANY_PORT
&& _range.second == ANY_PORT);
- std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
- for (const auto &its_service : eventgroups_) {
+ std::map<service_t,
+ std::map<instance_t,
+ std::map<eventgroup_t,
+ std::shared_ptr<eventgroupinfo> > > >its_eventgroups;
+ {
+ std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
+ its_eventgroups = eventgroups_;
+ }
+ for (const auto &its_service : its_eventgroups) {
for (const auto &its_instance : its_service.second) {
for (const auto &its_eventgroup : its_instance.second) {
const auto its_info = its_eventgroup.second;
@@ -2743,10 +2818,12 @@ void routing_manager_impl::on_remote_subscribe(
// not exist or is still (partly) pending.
remote_subscription_id_t its_id;
std::set<client_t> its_added;
+ update_remote_subscription_mutex_.lock();
auto its_result = its_eventgroupinfo->update_remote_subscription(
_subscription, its_expiration, its_added, its_id, true);
if (its_result) {
if (!_subscription->is_pending()) { // resubscription without change
+ update_remote_subscription_mutex_.unlock();
_callback(_subscription);
} else if (!its_added.empty()) { // new clients for a selective subscription
const client_t its_offering_client
@@ -2754,6 +2831,7 @@ void routing_manager_impl::on_remote_subscribe(
send_subscription(its_offering_client,
its_service, its_instance, its_eventgroup, its_major,
its_added, _subscription->get_id());
+ update_remote_subscription_mutex_.unlock();
} else { // identical subscription is not yet processed
std::stringstream its_warning;
its_warning << __func__ << " a remote subscription is already pending ["
@@ -2774,9 +2852,21 @@ void routing_manager_impl::on_remote_subscribe(
if (its_reliable && its_unreliable)
its_warning << "]";
VSOMEIP_WARNING << its_warning.str();
+
+ update_remote_subscription_mutex_.unlock();
_callback(_subscription);
}
} else { // new subscription
+ if (its_eventgroupinfo->is_remote_subscription_limit_reached(
+ _subscription)) {
+ _subscription->set_all_client_states(
+ remote_subscription_state_e::SUBSCRIPTION_NACKED);
+
+ update_remote_subscription_mutex_.unlock();
+ _callback(_subscription);
+ return;
+ }
+
auto its_id
= its_eventgroupinfo->add_remote_subscription(_subscription);
@@ -2785,6 +2875,7 @@ void routing_manager_impl::on_remote_subscribe(
send_subscription(its_offering_client,
its_service, its_instance, its_eventgroup, its_major,
_subscription->get_clients(), its_id);
+ update_remote_subscription_mutex_.unlock();
}
}
@@ -2820,6 +2911,7 @@ void routing_manager_impl::on_remote_unsubscribe(
remote_subscription_id_t its_id(0);
std::set<client_t> its_removed;
+ update_remote_subscription_mutex_.lock();
auto its_result = its_info->update_remote_subscription(
_subscription, std::chrono::steady_clock::now(),
its_removed, its_id, false);
@@ -2831,6 +2923,8 @@ void routing_manager_impl::on_remote_unsubscribe(
its_service, its_instance, its_eventgroup, its_major,
its_removed, its_id);
}
+
+ update_remote_subscription_mutex_.unlock();
}
void routing_manager_impl::on_subscribe_ack_with_multicast(
@@ -2931,13 +3025,20 @@ std::shared_ptr<endpoint> routing_manager_impl::find_or_create_remote_client(
void routing_manager_impl::on_subscribe_nack(client_t _client,
service_t _service, instance_t _instance, eventgroup_t _eventgroup,
- event_t _event, remote_subscription_id_t _id) {
+ event_t _event, remote_subscription_id_t _id, bool _simulated) {
(void)_event; // TODO: Remove completely?
auto its_eventgroup = find_eventgroup(_service, _instance, _eventgroup);
if (its_eventgroup) {
auto its_subscription = its_eventgroup->get_remote_subscription(_id);
if (its_subscription) {
+ if (_simulated) {
+ // method was called because a subscription for unoffered
+ // service was received. Therefore, remove the remote_subscription
+ // from the eventgroupinfo to ensure subsequent similar
+ // subscriptions are handled like a new/unknown subscription
+ its_eventgroup->remove_remote_subscription(_id);
+ }
its_subscription->set_client_state(_client,
remote_subscription_state_e::SUBSCRIPTION_NACKED);
@@ -3110,6 +3211,10 @@ void routing_manager_impl::clear_remote_subscriber(
std::chrono::steady_clock::time_point
routing_manager_impl::expire_subscriptions(bool _force) {
+ std::map<service_t,
+ std::map<instance_t,
+ std::map<eventgroup_t,
+ std::shared_ptr<eventgroupinfo> > > >its_eventgroups;
std::map<std::shared_ptr<remote_subscription>,
std::set<client_t> > its_expired_subscriptions;
@@ -3119,24 +3224,25 @@ routing_manager_impl::expire_subscriptions(bool _force) {
= std::chrono::steady_clock::now() + std::chrono::hours(24);
{
std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
-
- for (auto &its_service : eventgroups_) {
- for (auto &its_instance : its_service.second) {
- for (auto &its_eventgroup : its_instance.second) {
- auto its_subscriptions
- = its_eventgroup.second->get_remote_subscriptions();
- for (auto &s : its_subscriptions) {
- for (auto its_client : s->get_clients()) {
- if (_force) {
- its_expired_subscriptions[s].insert(its_client);
- } else {
- auto its_expiration = s->get_expiration(its_client);
- if (its_expiration != std::chrono::steady_clock::time_point()) {
- if (its_expiration < now) {
- its_expired_subscriptions[s].insert(its_client);
- } else if (its_expiration < its_next_expiration) {
- its_next_expiration = its_expiration;
- }
+ its_eventgroups = eventgroups_;
+ }
+
+ for (auto &its_service : its_eventgroups) {
+ for (auto &its_instance : its_service.second) {
+ for (auto &its_eventgroup : its_instance.second) {
+ auto its_subscriptions
+ = its_eventgroup.second->get_remote_subscriptions();
+ for (auto &s : its_subscriptions) {
+ for (auto its_client : s->get_clients()) {
+ if (_force) {
+ its_expired_subscriptions[s].insert(its_client);
+ } else {
+ auto its_expiration = s->get_expiration(its_client);
+ if (its_expiration != std::chrono::steady_clock::time_point()) {
+ if (its_expiration < now) {
+ its_expired_subscriptions[s].insert(its_client);
+ } else if (its_expiration < its_next_expiration) {
+ its_next_expiration = its_expiration;
}
}
}
@@ -3152,22 +3258,53 @@ routing_manager_impl::expire_subscriptions(bool _force) {
auto its_service = its_info->get_service();
auto its_instance = its_info->get_instance();
auto its_eventgroup = its_info->get_eventgroup();
- auto its_major = its_info->get_major();
remote_subscription_id_t its_id;
+ update_remote_subscription_mutex_.lock();
auto its_result = its_info->update_remote_subscription(
s.first, std::chrono::steady_clock::now(),
s.second, its_id, false);
if (its_result) {
const client_t its_offering_client
= find_local_client(its_service, its_instance);
- send_unsubscription(its_offering_client,
- its_service, its_instance, its_eventgroup, its_major,
+ const auto its_subscription = its_info->get_remote_subscription(its_id);
+ if (its_subscription) {
+ its_info->remove_remote_subscription(its_id);
+
+ std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_);
+ remote_subscribers_[its_service][its_instance].erase(its_offering_client);
+
+ if (its_info->get_remote_subscriptions().size() == 0) {
+ for (const auto &its_event : its_info->get_events()) {
+ bool has_remote_subscriber(false);
+ for (const auto &its_eventgroup : its_event->get_eventgroups()) {
+ const auto its_eventgroup_info
+ = find_eventgroup(its_service, its_instance, its_eventgroup);
+ if (its_eventgroup_info
+ && its_eventgroup_info->get_remote_subscriptions().size() > 0) {
+ has_remote_subscriber = true;
+ }
+ }
+ if (!has_remote_subscriber && its_event->is_shadow()) {
+ its_event->unset_payload();
+ }
+ }
+ }
+ } else {
+ VSOMEIP_ERROR << __func__
+ << ": Unknown expired subscription " << std::dec << its_id << " for eventgroup ["
+ << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup << "]";
+ }
+ send_expired_subscription(its_offering_client,
+ its_service, its_instance, its_eventgroup,
s.second, s.first->get_id());
}
+ update_remote_subscription_mutex_.unlock();
if (s.first->get_unreliable()) {
- VSOMEIP_INFO << "Expired subscription ["
+ VSOMEIP_INFO << (_force ? "Removed" : "Expired") << " subscription ["
<< std::hex << std::setfill('0') << std::setw(4) << its_service << "."
<< std::hex << std::setfill('0') << std::setw(4) << its_instance << "."
<< std::hex << std::setfill('0') << std::setw(4) << its_eventgroup << "] unreliable from "
@@ -3176,7 +3313,7 @@ routing_manager_impl::expire_subscriptions(bool _force) {
}
if (s.first->get_reliable()) {
- VSOMEIP_INFO << "Expired subscription ["
+ VSOMEIP_INFO << (_force ? "Removed" : "Expired") << " subscription ["
<< std::hex << std::setfill('0') << std::setw(4) << its_service << "."
<< std::hex << std::setfill('0') << std::setw(4) << its_instance << "."
<< std::hex << std::setfill('0') << std::setw(4) << its_eventgroup << "] reliable from "
@@ -3205,7 +3342,7 @@ void routing_manager_impl::log_version_timer_cbk(boost::system::error_code const
}
std::stringstream its_last_resume;
{
- std::lock_guard<std::mutex> its_lock(last_resume_mutex_);
+ std::lock_guard<std::mutex> its_lock(routing_state_mutex_);
if (last_resume_ != std::chrono::steady_clock::time_point::min()) {
its_last_resume << " | " << std::dec
<< std::chrono::duration_cast<std::chrono::seconds>(
@@ -3491,15 +3628,31 @@ void routing_manager_impl::send_subscribe(client_t _client, service_t _service,
}
void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {
+
+ // Ignore setting to the current routing state
+ {
+ std::lock_guard<std::mutex> its_lock(routing_state_mutex_);
+ if (routing_state_ == _routing_state) {
+ VSOMEIP_INFO << "rmi::" << __func__ << " No routing state change --> do nothing.";
+ return;
+ }
+
+ routing_state_ = _routing_state;
+ }
+
if(discovery_) {
switch (_routing_state) {
case routing_state_e::RS_SUSPENDED:
{
- VSOMEIP_INFO << "Set routing to suspend mode, diagnosis mode is "
- << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
+ VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to suspend mode, diagnosis mode is "
+ << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
+
// stop processing of incoming SD messages
discovery_->stop();
+ VSOMEIP_INFO << "rmi::" << __func__ << " Inform all applications that we are going to suspend";
+ send_suspend();
+
// remove all remote subscriptions to remotely offered services on this node
expire_subscriptions(true);
@@ -3520,6 +3673,10 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {
std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_);
remote_subscription_state_.clear();
}
+
+ // send StopSubscribes and clear subscribed_ map
+ discovery_->unsubscribe_all_on_suspend();
+
// mark all external services as offline
services_t its_remote_services;
{
@@ -3528,11 +3685,6 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {
}
for (const auto &s : its_remote_services) {
for (const auto &i : s.second) {
- // determine existing subscriptions to remote service and send StopSubscribe
- for (auto its_eventgroup : get_subscribed_eventgroups(s.first, i.first)) {
- discovery_->unsubscribe(s.first, i.first, its_eventgroup, VSOMEIP_ROUTING_CLIENT);
- }
-
const bool has_reliable(i.second->get_endpoint(true));
const bool has_unreliable(i.second->get_endpoint(false));
del_routing_info(s.first, i.first, has_reliable, has_unreliable);
@@ -3541,14 +3693,18 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {
unset_all_eventpayloads(s.first, i.first);
}
}
+
+ VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to suspend mode done, diagnosis mode is "
+ << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
+
break;
}
case routing_state_e::RS_RESUMED:
{
- VSOMEIP_INFO << "Set routing to resume mode, diagnosis mode was "
- << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
+ VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to resume mode, diagnosis mode was "
+ << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
{
- std::lock_guard<std::mutex> its_lock(last_resume_mutex_);
+ std::lock_guard<std::mutex> its_lock(routing_state_mutex_);
last_resume_ = std::chrono::steady_clock::now();
}
@@ -3575,11 +3731,14 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {
discovery_->offer_service(its_instance.second);
}
}
+
+ VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to resume mode done, diagnosis mode was "
+ << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
break;
}
case routing_state_e::RS_DIAGNOSIS:
{
- VSOMEIP_INFO << "Set routing to diagnosis mode.";
+ VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to diagnosis mode.";
discovery_->set_diagnosis_mode(true);
// send StopOffer messages for all someip protocol services
@@ -3591,11 +3750,13 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {
}
}
}
+
+ VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to diagnosis mode done.";
break;
}
case routing_state_e::RS_RUNNING:
- VSOMEIP_INFO << "Set routing to running mode, diagnosis mode was "
- << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
+ VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to running mode, diagnosis mode was "
+ << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
// Reset relevant in service info
for (const auto &its_service : get_offered_services()) {
@@ -3619,6 +3780,9 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {
}
}
}
+
+ VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to running mode done, diagnosis mode was "
+ << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
break;
default:
break;
@@ -3724,7 +3888,7 @@ void routing_manager_impl::requested_service_remove(client_t _client,
if (!found_service->second.size()) {
found_client->second.erase(_service);
if (!found_client->second.size()) {
- requested_services_.erase(client_);
+ requested_services_.erase(_client);
}
}
}
@@ -4057,6 +4221,7 @@ routing_manager_impl::on_unsubscribe_ack(client_t _client,
std::shared_ptr<eventgroupinfo> its_info
= find_eventgroup(_service, _instance, _eventgroup);
if (its_info) {
+ update_remote_subscription_mutex_.lock();
const auto its_subscription = its_info->get_remote_subscription(_id);
if (its_subscription) {
its_info->remove_remote_subscription(_id);
@@ -4088,6 +4253,7 @@ routing_manager_impl::on_unsubscribe_ack(client_t _client,
<< std::hex << std::setw(4) << std::setfill('0') << _instance << "."
<< std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
}
+ update_remote_subscription_mutex_.unlock();
} else {
VSOMEIP_ERROR << __func__
<< ": Received StopSubscribe for unknown eventgroup: ("
@@ -4122,7 +4288,7 @@ void routing_manager_impl::send_subscription(
&routing_manager_stub_host::on_subscribe_nack,
std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()),
its_client, _service, _instance,
- _eventgroup, ANY_EVENT, _id);
+ _eventgroup, ANY_EVENT, _id, false);
io_.post(its_callback);
} else {
const auto its_callback = std::bind(
@@ -4146,7 +4312,7 @@ void routing_manager_impl::send_subscription(
&routing_manager_stub_host::on_subscribe_nack,
std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()),
its_client, _service, _instance, _eventgroup,
- ANY_EVENT, _id);
+ ANY_EVENT, _id, true);
io_.post(its_callback);
} catch (const std::exception &e) {
VSOMEIP_ERROR << __func__ << e.what();
@@ -4270,7 +4436,7 @@ routing_manager_impl::send_unsubscription(client_t _offering_client,
host_->on_subscription(_service, _instance,
_eventgroup, its_client, own_uid_, own_gid_, false,
[this, self, _service, _instance, _eventgroup,
- its_client, _id, _offering_client]
+ its_client, _id]
(const bool _is_accepted) {
(void)_is_accepted;
try {
@@ -4303,6 +4469,30 @@ routing_manager_impl::send_unsubscription(client_t _offering_client,
}
}
+void
+routing_manager_impl::send_expired_subscription(client_t _offering_client,
+ service_t _service, instance_t _instance,
+ eventgroup_t _eventgroup,
+ const std::set<client_t> &_removed,
+ remote_subscription_id_t _id) {
+
+ if (host_->get_client() == _offering_client) {
+ auto self = shared_from_this();
+ for (const auto its_client : _removed) {
+ host_->on_subscription(_service, _instance,
+ _eventgroup, its_client, own_uid_, own_gid_, false,
+ [] (const bool _subscription_accepted){
+ (void)_subscription_accepted;
+ });
+ }
+ } else {
+ for (const auto its_client : _removed) {
+ stub_->send_expired_subscription(find_local(_offering_client), its_client,
+ _service, _instance, _eventgroup, ANY_EVENT, _id);
+ }
+ }
+}
+
bool
routing_manager_impl::update_security_policy_configuration(
uint32_t _uid, uint32_t _gid,
@@ -4347,7 +4537,7 @@ bool routing_manager_impl::insert_event_statistics(service_t _service, instance_
// check list for entry with least counter value
uint32_t its_min_count(0xFFFFFFFF);
auto its_tuple_to_discard = std::make_tuple(0xFFFF, 0xFFFF, 0xFFFF);
- for (const auto& it : message_statistics_) {
+ for (const auto &it : message_statistics_) {
if (it.second.counter_ < its_min_count) {
its_min_count = it.second.counter_;
its_tuple_to_discard = it.first;
@@ -4389,7 +4579,7 @@ void routing_manager_impl::statistics_log_timer_cbk(boost::system::error_code co
std::stringstream its_log;
{
std::lock_guard<std::mutex> its_lock(message_statistics_mutex_);
- for (const auto& s : message_statistics_) {
+ for (const auto &s : message_statistics_) {
if (s.second.counter_ / (its_interval / 1000) >= its_min_freq) {
uint16_t its_subscribed(0);
std::shared_ptr<event> its_event = find_event(std::get<0>(s.first), std::get<1>(s.first), std::get<2>(s.first));
@@ -4431,4 +4621,9 @@ void routing_manager_impl::statistics_log_timer_cbk(boost::system::error_code co
}
}
+void routing_manager_impl::send_suspend() const {
+
+ stub_->send_suspend();
+}
+
} // namespace vsomeip_v3
diff --git a/implementation/routing/src/routing_manager_proxy.cpp b/implementation/routing/src/routing_manager_proxy.cpp
index 872067c..06573a5 100644
--- a/implementation/routing/src/routing_manager_proxy.cpp
+++ b/implementation/routing/src/routing_manager_proxy.cpp
@@ -1071,6 +1071,7 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size,
<< " ~> Skip message!";
return;
}
+ cache_event_payload(its_message);
}
}
#ifdef USE_DLT
@@ -1162,34 +1163,35 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size,
[this, self, its_client, its_service, its_instance,
its_eventgroup, its_event, its_subscription_id, its_major]
(const bool _subscription_accepted){
+ std::uint32_t its_count = 0;
if(_subscription_accepted) {
send_subscribe_ack(its_client, its_service, its_instance,
its_eventgroup, its_event, its_subscription_id);
+ std::set<event_t> its_already_subscribed_events;
+ bool inserted = insert_subscription(its_service, its_instance, its_eventgroup,
+ its_event, VSOMEIP_ROUTING_CLIENT, &its_already_subscribed_events);
+ if (inserted) {
+ notify_remote_initially(its_service, its_instance, its_eventgroup,
+ its_already_subscribed_events);
+ }
+#ifdef VSOMEIP_ENABLE_COMPAT
+ send_pending_notify_ones(its_service, its_instance, its_eventgroup, its_client, true);
+#endif
+ its_count = get_remote_subscriber_count(its_service, its_instance, its_eventgroup, true);
} else {
send_subscribe_nack(its_client, its_service, its_instance,
its_eventgroup, its_event, its_subscription_id);
}
- std::set<event_t> its_already_subscribed_events;
- bool inserted = insert_subscription(its_service, its_instance, its_eventgroup,
- its_event, VSOMEIP_ROUTING_CLIENT, &its_already_subscribed_events);
- if (inserted) {
- notify_remote_initially(its_service, its_instance, its_eventgroup,
- its_already_subscribed_events);
- }
-#ifdef VSOMEIP_ENABLE_COMPAT
- send_pending_notify_ones(its_service, its_instance, its_eventgroup, its_client, true);
-#endif
- std::uint32_t its_count = get_remote_subscriber_count(
- its_service, its_instance, its_eventgroup, true);
VSOMEIP_INFO << "SUBSCRIBE("
<< std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
<< std::hex << std::setw(4) << std::setfill('0') << its_service << "."
<< std::hex << std::setw(4) << std::setfill('0') << its_instance << "."
<< std::hex << std::setw(4) << std::setfill('0') << its_eventgroup << ":"
<< std::hex << std::setw(4) << std::setfill('0') << its_event << ":"
- << std::dec << (uint16_t)its_major << "]"
+ << std::dec << (uint16_t)its_major << "] "
<< (bool)(its_subscription_id != PENDING_SUBSCRIPTION_ID) << " "
- << std::dec << its_count;
+ << (_subscription_accepted ? std::to_string(its_count) : "-")
+ << (_subscription_accepted ? " ACCEPTED" : " NOT ACCEPTED");
#ifdef VSOMEIP_ENABLE_COMPAT
routing_manager_base::erase_incoming_subscription_state(its_client, its_service,
its_instance, its_eventgroup, its_event);
@@ -1282,7 +1284,7 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size,
case VSOMEIP_UNSUBSCRIBE:
if (_size != VSOMEIP_UNSUBSCRIBE_COMMAND_SIZE) {
- VSOMEIP_WARNING << "Received an UNSUBSCRIBE command with wrong ~> skip!";
+ VSOMEIP_WARNING << "Received an UNSUBSCRIBE command with wrong size ~> skip!";
break;
}
std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS],
@@ -1320,6 +1322,44 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size,
<< std::dec << its_remote_subscriber_count;
break;
+ case VSOMEIP_EXPIRED_SUBSCRIPTION:
+ if (_size != VSOMEIP_EXPIRED_SUBSCRIPTION_COMMAND_SIZE) {
+ VSOMEIP_WARNING << "Received an VSOMEIP_EXPIRED_SUBSCRIPTION command with wrong size ~> skip!";
+ break;
+ }
+ std::memcpy(&its_service, &_data[VSOMEIP_COMMAND_PAYLOAD_POS],
+ sizeof(its_service));
+ std::memcpy(&its_instance, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 2],
+ sizeof(its_instance));
+ std::memcpy(&its_eventgroup, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 4],
+ sizeof(its_eventgroup));
+ std::memcpy(&its_event, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 6],
+ sizeof(its_event));
+ std::memcpy(&its_subscription_id, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 8],
+ sizeof(its_subscription_id));
+ host_->on_subscription(its_service, its_instance, its_eventgroup, its_client, its_sender_uid, its_sender_gid, false, [](const bool _subscription_accepted){ (void)_subscription_accepted; });
+ if (its_subscription_id == PENDING_SUBSCRIPTION_ID) {
+ // Local subscriber: withdraw subscription
+ routing_manager_base::unsubscribe(its_client, its_sender_uid, its_sender_gid, its_service, its_instance, its_eventgroup, its_event);
+ } else {
+ // Remote subscriber: withdraw subscription only if no more remote subscriber exists
+ its_remote_subscriber_count = get_remote_subscriber_count(its_service,
+ its_instance, its_eventgroup, false);
+ if (!its_remote_subscriber_count) {
+ routing_manager_base::unsubscribe(VSOMEIP_ROUTING_CLIENT, ANY_UID, ANY_GID, its_service,
+ its_instance, its_eventgroup, its_event);
+ }
+ }
+ VSOMEIP_INFO << "UNSUBSCRIBE EXPIRED SUBSCRIPTION("
+ << std::hex << std::setw(4) << std::setfill('0') << its_client << "): ["
+ << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_event << "] "
+ << (bool)(its_subscription_id != PENDING_SUBSCRIPTION_ID) << " "
+ << std::dec << its_remote_subscriber_count;
+ break;
+
case VSOMEIP_SUBSCRIBE_NACK:
if (_size != VSOMEIP_SUBSCRIBE_NACK_COMMAND_SIZE) {
VSOMEIP_WARNING << "Received a VSOMEIP_SUBSCRIBE_NACK command with wrong size ~> skip!";
@@ -1538,6 +1578,11 @@ void routing_manager_proxy::on_message(const byte_t *_data, length_t _size,
}
break;
}
+
+ case VSOMEIP_SUSPEND:
+ on_suspend(); // cleanup remote subscribers
+ break;
+
default:
break;
}
@@ -1666,6 +1711,10 @@ void routing_manager_proxy::on_routing_info(const byte_t *_data,
j += uint32_t(sizeof(minor_version_t));
if (routing_info_entry == routing_info_entry_e::RIE_ADD_SERVICE_INSTANCE) {
+ if (get_routing_state() == routing_state_e::RS_SUSPENDED) {
+ VSOMEIP_INFO << "rmp::" <<__func__ << " We are in suspended mode, the service will not be added!";
+ return;
+ }
{
std::lock_guard<std::mutex> its_lock(known_clients_mutex_);
known_clients_.insert(its_client);
@@ -1704,6 +1753,14 @@ void routing_manager_proxy::on_routing_info(const byte_t *_data,
<< std::hex << std::setw(4) << std::setfill('0') << its_service << "."
<< std::hex << std::setw(4) << std::setfill('0') << its_instance
<< ":" << std::dec << int(its_major) << "." << std::dec << its_minor << "]";
+
+ if (its_client == get_client()) {
+ VSOMEIP_INFO << __func__
+ << ": Clearing subscriptions for service ["
+ << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_instance << "]";
+ unsubscribe_all(its_service, its_instance);
+ }
}
its_services_size -= uint32_t(sizeof(instance_t) + sizeof(major_version_t) + sizeof(minor_version_t) );
@@ -2652,4 +2709,26 @@ void routing_manager_proxy::on_client_assign_ack(const client_t &_client) {
}
}
+void routing_manager_proxy::on_suspend() {
+
+ VSOMEIP_INFO << __func__ << ": Application "
+ << std::hex << std::setw(4) << std::setfill('0')
+ << host_->get_client();
+
+ std::lock_guard<std::mutex> its_lock(remote_subscriber_count_mutex_);
+
+ // Unsubscribe everything that is left over.
+ for (const auto &s : remote_subscriber_count_) {
+ for (const auto &i : s.second) {
+ for (const auto e : i.second)
+ routing_manager_base::unsubscribe(
+ VSOMEIP_ROUTING_CLIENT, ANY_UID, ANY_GID,
+ s.first, i.first, e.first, ANY_EVENT);
+ }
+ }
+
+ // Remove all entries.
+ remote_subscriber_count_.clear();
+}
+
} // namespace vsomeip_v3
diff --git a/implementation/routing/src/routing_manager_stub.cpp b/implementation/routing/src/routing_manager_stub.cpp
index 9668352..406b0d9 100644
--- a/implementation/routing/src/routing_manager_stub.cpp
+++ b/implementation/routing/src/routing_manager_stub.cpp
@@ -399,7 +399,8 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size,
std::memcpy(&its_subscription_id, &_data[VSOMEIP_COMMAND_PAYLOAD_POS + 10],
sizeof(its_subscription_id));
host_->on_subscribe_nack(its_subscriber, its_service,
- its_instance, its_eventgroup, its_notifier, its_subscription_id);
+ its_instance, its_eventgroup, its_notifier,
+ its_subscription_id, false);
VSOMEIP_INFO << "SUBSCRIBE NACK("
<< std::hex << std::setw(4) << std::setfill('0') << its_client << "): ["
<< std::hex << std::setw(4) << std::setfill('0') << its_service << "."
@@ -946,6 +947,8 @@ void routing_manager_stub::on_stop_offer_service(client_t _client,
if (0 == found_service->second.size()) {
found_client->second.second.erase(_service);
}
+ inform_provider(_client, _service, _instance, _major, _minor,
+ routing_info_entry_e::RIE_DEL_SERVICE_INSTANCE);
inform_requesters(_client, _service, _instance, _major, _minor,
routing_info_entry_e::RIE_DEL_SERVICE_INSTANCE, false);
} else if( _major == DEFAULT_MAJOR && _minor == DEFAULT_MINOR) {
@@ -953,6 +956,8 @@ void routing_manager_stub::on_stop_offer_service(client_t _client,
if (0 == found_service->second.size()) {
found_client->second.second.erase(_service);
}
+ inform_provider(_client, _service, _instance, _major, _minor,
+ routing_info_entry_e::RIE_DEL_SERVICE_INSTANCE);
inform_requesters(_client, _service, _instance, _major, _minor,
routing_info_entry_e::RIE_DEL_SERVICE_INSTANCE, false);
}
@@ -1285,16 +1290,9 @@ void routing_manager_stub::distribute_credentials(client_t _hoster, service_t _s
for (auto its_requesting_client : service_requests_) {
auto its_service = its_requesting_client.second.find(_service);
if (its_service != its_requesting_client.second.end()) {
- for (auto its_instance : its_service->second) {
- if (its_instance.first == ANY_INSTANCE ||
- its_instance.first == _instance) {
- its_requesting_clients.insert(its_requesting_client.first);
- } else {
- auto found_instance = its_service->second.find(_instance);
- if (found_instance != its_service->second.end()) {
- its_requesting_clients.insert(its_requesting_client.first);
- }
- }
+ if (its_service->second.find(_instance) != its_service->second.end()
+ || its_service->second.find(ANY_INSTANCE) != its_service->second.end()) {
+ its_requesting_clients.insert(its_requesting_client.first);
}
}
}
@@ -1302,11 +1300,11 @@ void routing_manager_stub::distribute_credentials(client_t _hoster, service_t _s
// search for UID / GID linked with the client ID that offers the requested services
std::pair<uint32_t, uint32_t> its_uid_gid;
if (security::get()->get_client_to_uid_gid_mapping(_hoster, its_uid_gid)) {
+ its_credentials.insert(its_uid_gid);
for (auto its_requesting_client : its_requesting_clients) {
std::pair<uint32_t, uint32_t> its_requester_uid_gid;
if (security::get()->get_client_to_uid_gid_mapping(its_requesting_client, its_requester_uid_gid)) {
if (its_uid_gid != its_requester_uid_gid) {
- its_credentials.insert(std::make_pair(std::get<0>(its_uid_gid), std::get<1>(its_uid_gid)));
create_client_credentials_info(its_requesting_client);
insert_client_credentials_info(its_requesting_client, its_credentials);
send_client_credentials_info(its_requesting_client);
@@ -1316,20 +1314,27 @@ void routing_manager_stub::distribute_credentials(client_t _hoster, service_t _s
}
}
+void routing_manager_stub::inform_provider(client_t _hoster, service_t _service,
+ instance_t _instance, major_version_t _major, minor_version_t _minor,
+ routing_info_entry_e _entry) {
+
+ if (_hoster != VSOMEIP_ROUTING_CLIENT
+ && _hoster != host_->get_client()) {
+ create_client_routing_info(_hoster);
+ insert_client_routing_info(_hoster, _entry, _hoster,
+ _service, _instance, _major, _minor);
+ send_client_routing_info(_hoster);
+ }
+};
+
void routing_manager_stub::inform_requesters(client_t _hoster, service_t _service,
instance_t _instance, major_version_t _major, minor_version_t _minor,
routing_info_entry_e _entry, bool _inform_service) {
for (auto its_client : service_requests_) {
auto its_service = its_client.second.find(_service);
if (its_service != its_client.second.end()) {
- bool send(false);
- for (auto its_instance : its_service->second) {
- if (its_instance.first == ANY_INSTANCE ||
- its_instance.first == _instance) {
- send = true;
- }
- }
- if (send) {
+ if (its_service->second.find(_instance) != its_service->second.end()
+ || its_service->second.find(ANY_INSTANCE) != its_service->second.end()) {
if (_inform_service) {
if (_hoster != VSOMEIP_ROUTING_CLIENT &&
_hoster != host_->get_client()) {
@@ -1447,6 +1452,44 @@ bool routing_manager_stub::send_unsubscribe(
}
}
+bool routing_manager_stub::send_expired_subscription(
+ const std::shared_ptr<endpoint>& _target,
+ client_t _client, service_t _service, instance_t _instance,
+ eventgroup_t _eventgroup, event_t _event,
+ remote_subscription_id_t _id) {
+ if (_target) {
+ byte_t its_command[VSOMEIP_EXPIRED_SUBSCRIPTION_COMMAND_SIZE];
+ uint32_t its_size = VSOMEIP_EXPIRED_SUBSCRIPTION_COMMAND_SIZE
+ - VSOMEIP_COMMAND_HEADER_SIZE;
+ its_command[VSOMEIP_COMMAND_TYPE_POS] = VSOMEIP_EXPIRED_SUBSCRIPTION;
+ std::memcpy(&its_command[VSOMEIP_COMMAND_CLIENT_POS], &_client,
+ sizeof(_client));
+ std::memcpy(&its_command[VSOMEIP_COMMAND_SIZE_POS_MIN], &its_size,
+ sizeof(its_size));
+ std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS], &_service,
+ sizeof(_service));
+ std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 2], &_instance,
+ sizeof(_instance));
+ std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4], &_eventgroup,
+ sizeof(_eventgroup));
+ std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6], &_event,
+ sizeof(_event));
+ std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 8], &_id,
+ sizeof(_id));
+
+ return _target->send(its_command, sizeof(its_command));
+ } else {
+ VSOMEIP_WARNING << __func__ << " Couldn't send expired subscription to local client ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _event << "]"
+ << " subscriber: "<< std::hex << std::setw(4) << std::setfill('0')
+ << _client;
+ return false;
+ }
+}
+
void routing_manager_stub::send_subscribe_ack(client_t _client, service_t _service,
instance_t _instance, eventgroup_t _eventgroup, event_t _event) {
@@ -2257,7 +2300,7 @@ routing_manager_stub::send_requester_policies(const std::unordered_set<client_t>
pending_security_update_id_t its_policy_id;
// serialize the policies and send them...
- for (const auto& p : _policies) {
+ for (const auto &p : _policies) {
std::vector<byte_t> its_policy_data;
if (p->serialize(its_policy_data)) {
std::vector<byte_t> its_message;
@@ -2569,4 +2612,12 @@ void routing_manager_stub::on_security_update_response(
}
}
+void routing_manager_stub::send_suspend() const {
+
+ static const std::vector<byte_t> its_suspend(
+ { VSOMEIP_SUSPEND, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 });
+
+ broadcast(its_suspend);
+}
+
} // namespace vsomeip_v3
diff --git a/implementation/runtime/src/application_impl.cpp b/implementation/runtime/src/application_impl.cpp
index aba8647..467bca2 100644
--- a/implementation/runtime/src/application_impl.cpp
+++ b/implementation/runtime/src/application_impl.cpp
@@ -961,7 +961,7 @@ void application_impl::on_subscription(service_t _service, instance_t _instance,
}
}
- if(handler_found) {
+ if (handler_found) {
if(auto its_handler = its_handlers.first) {
// "normal" subscription handler exists
_accepted_cb(its_handler(_client, _uid, _gid, _subscribed));
diff --git a/implementation/security/src/policy.cpp b/implementation/security/src/policy.cpp
index 9630e5f..260198a 100644
--- a/implementation/security/src/policy.cpp
+++ b/implementation/security/src/policy.cpp
@@ -305,7 +305,7 @@ policy::serialize(std::vector<byte_t> &_data) const {
uint32_t its_requests_size(0);
serialize_u32(its_requests_size, _data);
- for (const auto& its_request : requests_) {
+ for (const auto &its_request : requests_) {
for (auto its_service = its_request.first.lower();
its_service <= its_request.first.upper();
its_service++) {
@@ -316,7 +316,7 @@ policy::serialize(std::vector<byte_t> &_data) const {
uint32_t its_instances_size(0);
serialize_u32(its_instances_size, _data);
- for (const auto& i : its_request.second) {
+ for (const auto &i : its_request.second) {
boost::icl::interval_set<instance_t> its_instances;
its_instances.insert(i.first);
serialize_interval_set(its_instances, _data);
diff --git a/implementation/security/src/security_impl.cpp b/implementation/security/src/security_impl.cpp
index 8a3d276..377e310 100644
--- a/implementation/security/src/security_impl.cpp
+++ b/implementation/security/src/security_impl.cpp
@@ -470,7 +470,7 @@ security_impl::update_security_policy(uint32_t _uid, uint32_t _gid,
}
if (its_matching_policy) {
- for (const auto& r : _policy->requests_) {
+ for (const auto &r : _policy->requests_) {
service_t its_lower, its_upper;
get_bounds(r.first, its_lower, its_upper);
for (auto s = its_lower; s <= its_upper; s++) {
@@ -479,7 +479,7 @@ security_impl::update_security_policy(uint32_t _uid, uint32_t _gid,
its_matching_policy->requests_ += std::make_pair(its_service, r.second);
}
}
- for (const auto& o : _policy->offers_) {
+ for (const auto &o : _policy->offers_) {
service_t its_lower, its_upper;
get_bounds(o.first, its_lower, its_upper);
for (auto s = its_lower; s <= its_upper; s++) {
@@ -1081,8 +1081,8 @@ security_impl::get_requester_policies(const std::shared_ptr<policy> _policy,
}
std::lock_guard<std::mutex> its_lock(_policy->mutex_);
- for (const auto& o : _policy->offers_) {
- for (const auto& p : its_policies) {
+ for (const auto &o : _policy->offers_) {
+ for (const auto &p : its_policies) {
if (p == _policy)
continue;
@@ -1091,7 +1091,7 @@ security_impl::get_requester_policies(const std::shared_ptr<policy> _policy,
auto its_policy = std::make_shared<policy>();
its_policy->credentials_ = p->credentials_;
- for (const auto& r : p->requests_) {
+ for (const auto &r : p->requests_) {
// o represents an offer by a service interval and its instances
// (a set of intervals)
// r represents a request by a service interval and its instances
@@ -1109,8 +1109,8 @@ security_impl::get_requester_policies(const std::shared_ptr<policy> _policy,
auto its_service_min = std::max(its_o_lower, its_r_lower);
auto its_service_max = std::min(its_r_upper, its_o_upper);
- for (const auto& i : o.second) {
- for (const auto& j : r.second) {
+ for (const auto &i : o.second) {
+ for (const auto &j : r.second) {
for (const auto& k : j.second) {
instance_t its_i_lower, its_i_upper, its_k_lower, its_k_upper;
get_bounds(i, its_i_lower, its_i_upper);
@@ -1151,7 +1151,7 @@ security_impl::get_clients(uid_t _uid, gid_t _gid,
std::unordered_set<client_t> &_clients) const {
std::lock_guard<std::mutex> its_lock(ids_mutex_);
- for (const auto& i : ids_) {
+ for (const auto &i : ids_) {
if (i.second.first == _uid && i.second.second == _gid)
_clients.insert(i.first);
}
diff --git a/implementation/service_discovery/include/service_discovery.hpp b/implementation/service_discovery/include/service_discovery.hpp
index 778ce08..77b4258 100644
--- a/implementation/service_discovery/include/service_discovery.hpp
+++ b/implementation/service_discovery/include/service_discovery.hpp
@@ -45,6 +45,7 @@ public:
virtual void unsubscribe(service_t _service, instance_t _instance,
eventgroup_t _eventgroup, client_t _client) = 0;
virtual void unsubscribe_all(service_t _service, instance_t _instance) = 0;
+ virtual void unsubscribe_all_on_suspend() = 0;
virtual bool send(bool _is_announcing) = 0;
diff --git a/implementation/service_discovery/include/service_discovery_host.hpp b/implementation/service_discovery/include/service_discovery_host.hpp
index f864571..0f992d7 100644
--- a/implementation/service_discovery/include/service_discovery_host.hpp
+++ b/implementation/service_discovery/include/service_discovery_host.hpp
@@ -86,7 +86,7 @@ public:
virtual void on_subscribe_nack(client_t _client,
service_t _service, instance_t _instance, eventgroup_t _eventgroup,
- event_t _event, remote_subscription_id_t _subscription_id) = 0;
+ event_t _event, remote_subscription_id_t _subscription_id, bool _simulated) = 0;
virtual std::chrono::steady_clock::time_point expire_subscriptions(bool _force) = 0;
diff --git a/implementation/service_discovery/include/service_discovery_impl.hpp b/implementation/service_discovery/include/service_discovery_impl.hpp
index 81bb91f..c4e3835 100644
--- a/implementation/service_discovery/include/service_discovery_impl.hpp
+++ b/implementation/service_discovery/include/service_discovery_impl.hpp
@@ -79,7 +79,8 @@ public:
void unsubscribe(service_t _service, instance_t _instance,
eventgroup_t _eventgroup, client_t _client);
void unsubscribe_all(service_t _service, instance_t _instance);
- void reset_subscriptions(service_t _service, instance_t _instance);
+ void unsubscribe_all_on_suspend();
+ void remove_subscriptions(service_t _service, instance_t _instance);
bool send(bool _is_announcing);
@@ -239,7 +240,7 @@ private:
const std::shared_ptr<endpoint> &_unreliable,
boost::asio::ip::address &_address) const;
- std::shared_ptr<request> find_request(service_t _service, instance_t _instance);
+ void update_request(service_t _service, instance_t _instance);
void start_offer_debounce_timer(bool _first_start);
void on_offer_debounce_timer_expired(const boost::system::error_code &_error);
diff --git a/implementation/service_discovery/src/service_discovery_impl.cpp b/implementation/service_discovery/src/service_discovery_impl.cpp
index ca4e131..74e509b 100644
--- a/implementation/service_discovery/src/service_discovery_impl.cpp
+++ b/implementation/service_discovery/src/service_discovery_impl.cpp
@@ -165,9 +165,10 @@ service_discovery_impl::start() {
}
}
if (endpoint_ && !reliable_) {
- // rejoin multicast group
- dynamic_cast<udp_server_endpoint_impl*>(
- endpoint_.get())->join(sd_multicast_);
+ auto its_endpoint = std::dynamic_pointer_cast<
+ udp_server_endpoint_impl>(endpoint_);
+ if (its_endpoint)
+ its_endpoint->join(sd_multicast_);
}
}
is_suspended_ = false;
@@ -213,17 +214,17 @@ service_discovery_impl::release_service(
}
}
-std::shared_ptr<request>
-service_discovery_impl::find_request(service_t _service, instance_t _instance) {
+void
+service_discovery_impl::update_request(service_t _service, instance_t _instance) {
std::lock_guard<std::mutex> its_lock(requested_mutex_);
auto find_service = requested_.find(_service);
if (find_service != requested_.end()) {
auto find_instance = find_service->second.find(_instance);
if (find_instance != find_service->second.end()) {
- return find_instance->second;
+ find_instance->second->set_sent_counter(
+ std::uint8_t(repetitions_max_ + 1));
}
}
- return nullptr;
}
void
@@ -232,6 +233,13 @@ service_discovery_impl::subscribe(
eventgroup_t _eventgroup, major_version_t _major,
ttl_t _ttl, client_t _client,
const std::shared_ptr<eventgroupinfo> &_info) {
+
+ if (is_suspended_) {
+ VSOMEIP_WARNING << "service_discovery::" << __func__
+ << ": Ignoring subscription as we are suspended.";
+ return;
+ }
+
#ifdef VSOMEIP_ENABLE_COMPAT
bool is_selective(_info ? _info->is_selective() : false);
#endif // VSOMEIP_ENABLE_COMPAT
@@ -248,8 +256,8 @@ service_discovery_impl::subscribe(
if (!its_subscription->is_selective() && is_selective) {
its_subscription->set_selective(true);
its_subscription->remove_client(VSOMEIP_ROUTING_CLIENT);
- for (const auto& e : _info->get_events()) {
- for (const auto& c : e->get_subscribers(_eventgroup)) {
+ for (const auto &e : _info->get_events()) {
+ for (const auto &c : e->get_subscribers(_eventgroup)) {
its_subscription->add_client(c);
}
}
@@ -517,23 +525,65 @@ service_discovery_impl::unsubscribe_all(
}
void
-service_discovery_impl::reset_subscriptions(
+service_discovery_impl::unsubscribe_all_on_suspend() {
+
+ std::map<boost::asio::ip::address,
+ std::vector<std::shared_ptr<message_impl> > > its_stopsubscribes;
+
+ {
+ std::lock_guard<std::mutex> its_lock(subscribed_mutex_);
+ for (auto its_service : subscribed_) {
+ for (auto its_instance : its_service.second) {
+ for (auto &its_eventgroup : its_instance.second) {
+ boost::asio::ip::address its_address;
+ auto its_current_message = std::make_shared<message_impl>();
+ auto its_subscription = its_eventgroup.second;
+ its_subscription->set_ttl(0);
+ const reliability_type_e its_reliability =
+ get_eventgroup_reliability(its_service.first, its_instance.first,
+ its_eventgroup.first, its_subscription);
+ auto its_data = create_eventgroup_entry(its_service.first, its_instance.first,
+ its_eventgroup.first, its_subscription, its_reliability);
+ auto its_reliable = its_subscription->get_endpoint(true);
+ auto its_unreliable = its_subscription->get_endpoint(false);
+ get_subscription_address(
+ its_reliable, its_unreliable, its_address);
+ if (its_data.entry_
+ && its_current_message->add_entry_data(its_data.entry_, its_data.options_)) {
+ its_stopsubscribes[its_address].push_back(its_current_message);
+ } else {
+ VSOMEIP_WARNING << __func__ << ": Failed to create StopSubscribe entry for: "
+ << std::hex << std::setw(4) << std::setfill('0') << its_service.first << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_instance.first << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup.first
+ << " address: " << its_address.to_string();
+ }
+ }
+ its_instance.second.clear();
+ }
+ its_service.second.clear();
+ }
+ subscribed_.clear();
+ }
+
+ for (auto its_address : its_stopsubscribes) {
+ if (!serialize_and_send(its_address.second, its_address.first)) {
+ VSOMEIP_WARNING << __func__ << ": Failed to send StopSubscribe to address: "
+ << its_address.first.to_string();
+ }
+ }
+}
+
+void
+service_discovery_impl::remove_subscriptions(
service_t _service, instance_t _instance) {
std::lock_guard<std::mutex> its_lock(subscribed_mutex_);
auto found_service = subscribed_.find(_service);
if (found_service != subscribed_.end()) {
- auto found_instance = found_service->second.find(_instance);
- if (found_instance != found_service->second.end()) {
- for (auto &its_eventgroup : found_instance->second) {
- auto its_subscription = its_eventgroup.second;
- for (auto its_client : its_subscription->get_clients()) {
- its_subscription->set_state(its_client,
- subscription_state_e::ST_UNKNOWN);
- }
- its_subscription->set_endpoint(nullptr, true);
- its_subscription->set_endpoint(nullptr, false);
- }
+ found_service->second.erase(_instance);
+ if (found_service->second.empty()) {
+ subscribed_.erase(found_service);
}
}
}
@@ -641,16 +691,16 @@ void
service_discovery_impl::insert_find_entries(
std::vector<std::shared_ptr<message_impl> > &_messages,
const requests_t &_requests) {
- std::lock_guard<std::mutex> its_lock(requested_mutex_);
entry_data_t its_data;
its_data.entry_ = its_data.other_ = nullptr;
for (const auto& its_service : _requests) {
for (const auto& its_instance : its_service.second) {
+ std::lock_guard<std::mutex> its_lock(requested_mutex_);
auto its_request = its_instance.second;
- // check if release_service was called
+ // check if release_service was called / offer was received
auto the_service = requested_.find(its_service.first);
if ( the_service != requested_.end() ) {
auto the_instance = the_service->second.find(its_instance.first);
@@ -722,17 +772,37 @@ service_discovery_impl::create_eventgroup_entry(
case reliability_type_e::RT_RELIABLE:
if (its_reliable_endpoint) {
insert_reliable = true;
+ } else {
+ VSOMEIP_WARNING << __func__ << ": Cannot create subscription as "
+ "reliable endpoint is zero: ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
}
break;
case reliability_type_e::RT_UNRELIABLE:
if (its_unreliable_endpoint) {
insert_unreliable = true;
+ } else {
+ VSOMEIP_WARNING << __func__ << ": Cannot create subscription as "
+ "unreliable endpoint is zero: ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
}
break;
case reliability_type_e::RT_BOTH:
if (its_reliable_endpoint && its_unreliable_endpoint) {
insert_reliable = true;
insert_unreliable = true;
+ } else {
+ VSOMEIP_WARNING << __func__ << ": Cannot create subscription as "
+ "endpoint is zero: ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"
+ << " reliable: " << !!its_reliable_endpoint
+ << " unreliable: " << !!its_unreliable_endpoint;
}
break;
default:
@@ -817,7 +887,6 @@ service_discovery_impl::create_eventgroup_entry(
its_entry->set_counter(0);
its_entry->set_major_version(_subscription->get_major());
its_entry->set_ttl(_subscription->get_ttl());
-
its_data.entry_ = its_entry;
}
@@ -1008,14 +1077,18 @@ service_discovery_impl::on_message(
}
const bool received_via_mcast = (_destination == sd_multicast_address_);
if (received_via_mcast) {
- std::lock_guard<std::mutex> its_lock(last_msg_received_timer_mutex_);
+ static bool must_start_last_msg_received_timer(true);
boost::system::error_code ec;
- last_msg_received_timer_.cancel(ec);
- last_msg_received_timer_.expires_from_now(
- last_msg_received_timer_timeout_, ec);
- last_msg_received_timer_.async_wait(
- std::bind(&service_discovery_impl::on_last_msg_received_timer_expired,
- shared_from_this(), std::placeholders::_1));
+
+ std::lock_guard<std::mutex> its_lock(last_msg_received_timer_mutex_);
+ if (0 < last_msg_received_timer_.cancel(ec) || must_start_last_msg_received_timer) {
+ must_start_last_msg_received_timer = false;
+ last_msg_received_timer_.expires_from_now(
+ last_msg_received_timer_timeout_, ec);
+ last_msg_received_timer_.async_wait(
+ std::bind(&service_discovery_impl::on_last_msg_received_timer_expired,
+ shared_from_this(), std::placeholders::_1));
+ }
}
current_remote_address_ = _sender;
@@ -1256,17 +1329,15 @@ service_discovery_impl::process_serviceentry(
default:
VSOMEIP_ERROR << __func__ << ": Unsupported service entry type";
}
- } else if (!_sd_ac_state.sd_acceptance_required_ || _sd_ac_state.accept_entries_) {
- std::shared_ptr<request> its_request = find_request(its_service, its_instance);
- if (its_request) {
- std::lock_guard<std::mutex> its_lock(requested_mutex_);
- // ID: SIP_SD_830
- its_request->set_sent_counter(std::uint8_t(repetitions_max_ + 1));
- }
+ } else if (its_type != entry_type_e::FIND_SERVICE
+ && (!_sd_ac_state.sd_acceptance_required_ || _sd_ac_state.accept_entries_)) {
+ // stop sending find service in repetition phase
+ update_request(its_service, its_instance);
+
remove_remote_offer_type(its_service, its_instance,
its_reliable_address, its_reliable_port,
its_unreliable_address, its_unreliable_port);
- reset_subscriptions(its_service, its_instance);
+ remove_subscriptions(its_service, its_instance);
if (!is_diagnosis_ && !is_suspended_) {
host_->del_routing_info(its_service, its_instance,
(its_reliable_port != ILLEGAL_PORT),
@@ -1304,11 +1375,9 @@ service_discovery_impl::process_offerservice_serviceentry(
return;
}
- std::shared_ptr<request> its_request = find_request(_service, _instance);
- if (its_request) {
- std::lock_guard<std::mutex> its_lock(requested_mutex_);
- its_request->set_sent_counter(std::uint8_t(repetitions_max_ + 1));
- }
+ // stop sending find service in repetition phase
+ update_request(_service, _instance);
+
remote_offer_type_e offer_type(remote_offer_type_e::UNKNOWN);
if (_reliable_port != ILLEGAL_PORT
&& _unreliable_port != ILLEGAL_PORT
@@ -1430,17 +1499,9 @@ service_discovery_impl::process_offerservice_serviceentry(
}
}
- host_->add_routing_info(_service, _instance,
- _major, _minor,
- _ttl * get_ttl_factor(_service, _instance, ttl_factor_offers_),
- _reliable_address, _reliable_port,
- _unreliable_address, _unreliable_port);
// No need to resubscribe for unicast offers
if (_received_via_mcast) {
- std::int32_t its_remaining = VSOMEIP_MAX_UDP_MESSAGE_SIZE;
- its_remaining -= _resubscribes.back()->get_size();
-
std::lock_guard<std::mutex> its_lock(subscribed_mutex_);
auto found_service = subscribed_.find(_service);
if (found_service != subscribed_.end()) {
@@ -1482,6 +1543,12 @@ service_discovery_impl::process_offerservice_serviceentry(
}
}
}
+
+ host_->add_routing_info(_service, _instance,
+ _major, _minor,
+ _ttl * get_ttl_factor(_service, _instance, ttl_factor_offers_),
+ _reliable_address, _reliable_port,
+ _unreliable_address, _unreliable_port);
}
void
@@ -1616,7 +1683,7 @@ service_discovery_impl::on_endpoint_connected(
its_subscription->set_endpoint(its_reliable, true);
its_subscription->set_endpoint(its_unreliable, false);
- for (const auto& its_client : its_subscription->get_clients())
+ for (const auto its_client : its_subscription->get_clients())
its_subscription->set_state(its_client,
subscription_state_e::ST_NOT_ACKNOWLEDGED);
@@ -1726,7 +1793,7 @@ service_discovery_impl::process_eventgroupentry(
// We received a subscription for a non-existing eventgroup.
// --> Create dummy eventgroupinfo to send Nack.
its_info = std::make_shared<eventgroupinfo>(its_service, its_instance,
- its_eventgroup, its_major, its_ttl);
+ its_eventgroup, its_major, its_ttl, VSOMEIP_DEFAULT_MAX_REMOTE_SUBSCRIBERS);
boost::system::error_code ec;
VSOMEIP_ERROR << __func__
<< ": Received a SubscribeEventGroup entry for unknown eventgroup "
@@ -1743,6 +1810,16 @@ service_discovery_impl::process_eventgroupentry(
// We received a subscription [n]ack for an eventgroup that does not exist.
// --> Remove subscription.
unsubscribe(its_service, its_instance, its_eventgroup, VSOMEIP_ROUTING_CLIENT);
+
+ boost::system::error_code ec;
+ VSOMEIP_WARNING << __func__
+ << ": Received a SubscribeEventGroup[N]Ack entry for unknown eventgroup "
+ << " from: " << its_sender.to_string(ec) << " for: ["
+ << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup
+ << "] session: " << std::hex << std::setw(4) << std::setfill('0')
+ << its_session << ", ttl: " << its_ttl;
}
return;
}
@@ -2226,7 +2303,7 @@ service_discovery_impl::handle_eventgroup_subscription(
if (_major != _info->get_major()) {
// Create a temporary info object with TTL=0 --> send NACK
auto its_info = std::make_shared<eventgroupinfo>(_service, _instance,
- _eventgroup, _major, 0);
+ _eventgroup, _major, 0, VSOMEIP_DEFAULT_MAX_REMOTE_SUBSCRIBERS);
boost::system::error_code ec;
// TODO: Add session id
VSOMEIP_ERROR << __func__
@@ -2325,42 +2402,44 @@ service_discovery_impl::handle_eventgroup_subscription(
}
}
- // Create subscription object
- auto its_subscription = std::make_shared<remote_subscription>();
- its_subscription->set_eventgroupinfo(_info);
- its_subscription->set_subscriber(its_subscriber);
- its_subscription->set_reliable(its_reliable);
- its_subscription->set_unreliable(its_unreliable);
- its_subscription->reset(_clients);
+ if (its_subscriber) {
+ // Create subscription object
+ auto its_subscription = std::make_shared<remote_subscription>();
+ its_subscription->set_eventgroupinfo(_info);
+ its_subscription->set_subscriber(its_subscriber);
+ its_subscription->set_reliable(its_reliable);
+ its_subscription->set_unreliable(its_unreliable);
+ its_subscription->reset(_clients);
- if (_ttl == 0) { // --> unsubscribe
- its_subscription->set_ttl(0);
- if (!_is_stop_subscribe_subscribe) {
- {
- std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_);
- pending_remote_subscriptions_[its_subscription] = _acknowledgement;
- _acknowledgement->add_subscription(its_subscription);
+ if (_ttl == 0) { // --> unsubscribe
+ its_subscription->set_ttl(0);
+ if (!_is_stop_subscribe_subscribe) {
+ {
+ std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_);
+ pending_remote_subscriptions_[its_subscription] = _acknowledgement;
+ _acknowledgement->add_subscription(its_subscription);
+ }
+ host_->on_remote_unsubscribe(its_subscription);
}
- host_->on_remote_unsubscribe(its_subscription);
+ return;
}
- return;
- }
- if (_force_initial_events) {
- its_subscription->set_force_initial_events(true);
- }
- its_subscription->set_ttl(_ttl
- * get_ttl_factor(_service, _instance, ttl_factor_subscriptions_));
+ if (_force_initial_events) {
+ its_subscription->set_force_initial_events(true);
+ }
+ its_subscription->set_ttl(_ttl
+ * get_ttl_factor(_service, _instance, ttl_factor_subscriptions_));
- {
- std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_);
- pending_remote_subscriptions_[its_subscription] = _acknowledgement;
- _acknowledgement->add_subscription(its_subscription);
- }
+ {
+ std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_);
+ pending_remote_subscriptions_[its_subscription] = _acknowledgement;
+ _acknowledgement->add_subscription(its_subscription);
+ }
- host_->on_remote_subscribe(its_subscription,
- std::bind(&service_discovery_impl::update_remote_subscription,
- shared_from_this(), std::placeholders::_1));
+ host_->on_remote_subscribe(its_subscription,
+ std::bind(&service_discovery_impl::update_remote_subscription,
+ shared_from_this(), std::placeholders::_1));
+ }
}
void
@@ -2380,7 +2459,7 @@ service_discovery_impl::handle_eventgroup_subscription_nack(
for (const auto& its_client : _clients) {
host_->on_subscribe_nack(its_client,
_service, _instance, _eventgroup, ANY_EVENT,
- PENDING_SUBSCRIPTION_ID); // TODO: This is a dummy call...
+ PENDING_SUBSCRIPTION_ID, false); // TODO: This is a dummy call...
}
@@ -3339,6 +3418,7 @@ service_discovery_impl::get_ttl_factor(
void
service_discovery_impl::on_last_msg_received_timer_expired(
const boost::system::error_code &_error) {
+
if (!_error) {
// We didn't receive a multicast message within 110% of the cyclic_offer_delay_
VSOMEIP_WARNING << "Didn't receive a multicast SD message for " <<
@@ -3346,8 +3426,10 @@ service_discovery_impl::on_last_msg_received_timer_expired(
// Rejoin multicast group
if (endpoint_ && !reliable_) {
- dynamic_cast<udp_server_endpoint_impl*>(
- endpoint_.get())->join(sd_multicast_);
+ auto its_endpoint = std::dynamic_pointer_cast<
+ udp_server_endpoint_impl>(endpoint_);
+ if (its_endpoint)
+ its_endpoint->join(sd_multicast_);
}
{
boost::system::error_code ec;