summaryrefslogtreecommitdiff
path: root/implementation/routing/src/eventgroupinfo.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/routing/src/eventgroupinfo.cpp')
-rw-r--r--implementation/routing/src/eventgroupinfo.cpp167
1 files changed, 125 insertions, 42 deletions
diff --git a/implementation/routing/src/eventgroupinfo.cpp b/implementation/routing/src/eventgroupinfo.cpp
index d09ed1b..50bbdb6 100644
--- a/implementation/routing/src/eventgroupinfo.cpp
+++ b/implementation/routing/src/eventgroupinfo.cpp
@@ -26,13 +26,14 @@ eventgroupinfo::eventgroupinfo()
threshold_(0),
id_(PENDING_SUBSCRIPTION_ID),
reliability_(reliability_type_e::RT_UNKNOWN),
- reliability_auto_mode_(false) {
+ reliability_auto_mode_(false),
+ max_remote_subscribers_(VSOMEIP_DEFAULT_MAX_REMOTE_SUBSCRIBERS) {
}
eventgroupinfo::eventgroupinfo(
const service_t _service, const instance_t _instance,
const eventgroup_t _eventgroup, const major_version_t _major,
- const ttl_t _ttl)
+ const ttl_t _ttl, const uint8_t _max_remote_subscribers)
: service_(_service),
instance_(_instance),
eventgroup_(_eventgroup),
@@ -42,7 +43,8 @@ eventgroupinfo::eventgroupinfo(
threshold_(0),
id_(PENDING_SUBSCRIPTION_ID),
reliability_(reliability_type_e::RT_UNKNOWN),
- reliability_auto_mode_(false) {
+ reliability_auto_mode_(false),
+ max_remote_subscribers_(_max_remote_subscribers) {
}
eventgroupinfo::~eventgroupinfo() {
@@ -213,65 +215,123 @@ eventgroupinfo::update_remote_subscription(
const std::chrono::steady_clock::time_point &_expiration,
std::set<client_t> &_changed, remote_subscription_id_t &_id,
const bool _is_subscribe) {
- std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
- for (const auto& its_item : subscriptions_) {
- if (its_item.second->equals(_subscription)) {
- // update existing subscription
- _changed = its_item.second->update(
- _subscription->get_clients(), _expiration, _is_subscribe);
- _id = its_item.second->get_id();
-
- // Copy acknowledgment states from existing subscription
- for (const auto& its_client : _subscription->get_clients()) {
- _subscription->set_client_state(its_client,
- its_item.second->get_client_state(its_client));
- }
+ bool its_result(false);
+ std::shared_ptr<endpoint_definition> its_subscriber;
+ std::set<std::shared_ptr<event> > its_events;
- if (_is_subscribe) {
- if (!_changed.empty()) {
- // New clients:
- // Let this be a child subscription
- _subscription->set_parent(its_item.second);
- update_id();
- _subscription->set_id(id_);
- subscriptions_[id_] = _subscription;
- } else {
- if (!_subscription->is_pending()) {
- if (!_subscription->force_initial_events()) {
- _subscription->set_initial(false);
- }
+ {
+ std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
+
+ for (const auto& its_item : subscriptions_) {
+ if (its_item.second->equals(_subscription)) {
+ // update existing subscription
+ _changed = its_item.second->update(
+ _subscription->get_clients(), _expiration, _is_subscribe);
+ _id = its_item.second->get_id();
+
+ // Copy acknowledgment states from existing subscription
+ for (const auto& its_client : _subscription->get_clients()) {
+ const auto its_state = its_item.second->get_client_state(its_client);
+ if (_is_subscribe &&
+ its_state == remote_subscription_state_e::SUBSCRIPTION_UNKNOWN) {
+ _subscription->set_client_state(its_client,
+ remote_subscription_state_e::SUBSCRIPTION_PENDING);
+ _changed.insert(its_client);
} else {
- its_item.second->set_answers(
- its_item.second->get_answers() + 1);
- _subscription->set_parent(its_item.second);
- _subscription->set_answers(0);
+ _subscription->set_client_state(its_client, its_state);
}
}
- } else {
- if (its_item.second->is_pending()) {
- for (const auto &its_event : events_)
- its_event->remove_pending(
- its_item.second->get_subscriber());
+
+ if (_is_subscribe) {
+ if (!_changed.empty()) {
+ // New clients:
+ // Let this be a child subscription
+ _subscription->set_parent(its_item.second);
+ update_id();
+ _subscription->set_id(id_);
+ subscriptions_[id_] = _subscription;
+ } else {
+ if (!_subscription->is_pending()) {
+ if (!_subscription->force_initial_events()) {
+ _subscription->set_initial(false);
+ }
+ } else {
+ its_item.second->set_answers(
+ its_item.second->get_answers() + 1);
+ _subscription->set_parent(its_item.second);
+ _subscription->set_answers(0);
+ }
+ }
+ } else {
+ if (its_item.second->is_pending()) {
+ its_subscriber = its_item.second->get_subscriber();
+ }
}
+
+ its_result = true;
+ break;
}
+ }
+ }
- return true;
+ if (its_subscriber) {
+ {
+ // Build set of events first to avoid having to
+ // hold the "events_mutex_" in parallel to the internal event mutexes.
+ std::lock_guard<std::mutex> its_lock(events_mutex_);
+ for (const auto &its_event : events_)
+ its_events.insert(its_event);
}
+ for (const auto &its_event : its_events)
+ its_event->remove_pending(its_subscriber);
}
- return false;
+ return (its_result);
+}
+
+bool
+eventgroupinfo::is_remote_subscription_limit_reached(
+ const std::shared_ptr<remote_subscription> &_subscription) {
+ bool limit_reached(false);
+
+ if (subscriptions_.size() <= max_remote_subscribers_) {
+ return false;
+ }
+
+ boost::asio::ip::address its_address;
+ if (_subscription->get_ip_address(its_address)) {
+ auto find_address = remote_subscribers_count_.find(its_address);
+ if (find_address != remote_subscribers_count_.end()) {
+ if (find_address->second > max_remote_subscribers_) {
+ VSOMEIP_WARNING << ": remote subscriber limit [" << std::dec
+ << (uint32_t)max_remote_subscribers_ << "] to ["
+ << std::hex << std::setw(4) << std::setfill('0') << service_ << "."
+ << std::hex << std::setw(4) << std::setfill('0') << instance_ << "."
+ << std::hex << std::setw(4) << std::setfill('0') << eventgroup_ << "]"
+ << " reached for remote address: " << its_address.to_string()
+ << " rejecting subscription!";
+ return true;
+ }
+ }
+ }
+ return limit_reached;
}
remote_subscription_id_t
eventgroupinfo::add_remote_subscription(
const std::shared_ptr<remote_subscription> &_subscription) {
std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
+
update_id();
_subscription->set_id(id_);
subscriptions_[id_] = _subscription;
+ boost::asio::ip::address its_address;
+ if (_subscription->get_ip_address(its_address)) {
+ remote_subscribers_count_[its_address]++;
+ }
return id_;
}
@@ -291,6 +351,20 @@ void
eventgroupinfo::remove_remote_subscription(
const remote_subscription_id_t _id) {
std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
+
+ auto find_subscription = subscriptions_.find(_id);
+ if (find_subscription != subscriptions_.end()) {
+ boost::asio::ip::address its_address;
+ if (find_subscription->second->get_ip_address(its_address)) {
+ auto find_address = remote_subscribers_count_.find(its_address);
+ if (find_address != remote_subscribers_count_.end()) {
+ if(find_address->second != 0) {
+ find_address->second--;
+ }
+ }
+ }
+ }
+
subscriptions_.erase(_id);
}
@@ -298,6 +372,7 @@ void
eventgroupinfo::clear_remote_subscriptions() {
std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
subscriptions_.clear();
+ remote_subscribers_count_.clear();
}
std::set<std::shared_ptr<endpoint_definition> >
@@ -390,11 +465,19 @@ eventgroupinfo::send_initial_events(
}
// Send events
- for (const auto& its_event : its_reliable_events)
+ for (const auto &its_event : its_reliable_events)
its_event->notify_one(VSOMEIP_ROUTING_CLIENT, _reliable);
- for (const auto& its_event : its_unreliable_events)
+ for (const auto &its_event : its_unreliable_events)
its_event->notify_one(VSOMEIP_ROUTING_CLIENT, _unreliable);
}
+uint8_t eventgroupinfo::get_max_remote_subscribers() const {
+ return max_remote_subscribers_;
+}
+
+void eventgroupinfo::set_max_remote_subscribers(uint8_t _max_remote_subscribers) {
+ max_remote_subscribers_ = _max_remote_subscribers;
+}
+
} // namespace vsomeip_v3