diff options
Diffstat (limited to 'implementation/routing/src/eventgroupinfo.cpp')
-rw-r--r-- | implementation/routing/src/eventgroupinfo.cpp | 167 |
1 files changed, 125 insertions, 42 deletions
diff --git a/implementation/routing/src/eventgroupinfo.cpp b/implementation/routing/src/eventgroupinfo.cpp index d09ed1b..50bbdb6 100644 --- a/implementation/routing/src/eventgroupinfo.cpp +++ b/implementation/routing/src/eventgroupinfo.cpp @@ -26,13 +26,14 @@ eventgroupinfo::eventgroupinfo() threshold_(0), id_(PENDING_SUBSCRIPTION_ID), reliability_(reliability_type_e::RT_UNKNOWN), - reliability_auto_mode_(false) { + reliability_auto_mode_(false), + max_remote_subscribers_(VSOMEIP_DEFAULT_MAX_REMOTE_SUBSCRIBERS) { } eventgroupinfo::eventgroupinfo( const service_t _service, const instance_t _instance, const eventgroup_t _eventgroup, const major_version_t _major, - const ttl_t _ttl) + const ttl_t _ttl, const uint8_t _max_remote_subscribers) : service_(_service), instance_(_instance), eventgroup_(_eventgroup), @@ -42,7 +43,8 @@ eventgroupinfo::eventgroupinfo( threshold_(0), id_(PENDING_SUBSCRIPTION_ID), reliability_(reliability_type_e::RT_UNKNOWN), - reliability_auto_mode_(false) { + reliability_auto_mode_(false), + max_remote_subscribers_(_max_remote_subscribers) { } eventgroupinfo::~eventgroupinfo() { @@ -213,65 +215,123 @@ eventgroupinfo::update_remote_subscription( const std::chrono::steady_clock::time_point &_expiration, std::set<client_t> &_changed, remote_subscription_id_t &_id, const bool _is_subscribe) { - std::lock_guard<std::mutex> its_lock(subscriptions_mutex_); - for (const auto& its_item : subscriptions_) { - if (its_item.second->equals(_subscription)) { - // update existing subscription - _changed = its_item.second->update( - _subscription->get_clients(), _expiration, _is_subscribe); - _id = its_item.second->get_id(); - - // Copy acknowledgment states from existing subscription - for (const auto& its_client : _subscription->get_clients()) { - _subscription->set_client_state(its_client, - its_item.second->get_client_state(its_client)); - } + bool its_result(false); + std::shared_ptr<endpoint_definition> its_subscriber; + std::set<std::shared_ptr<event> > its_events; - if (_is_subscribe) { - if (!_changed.empty()) { - // New clients: - // Let this be a child subscription - _subscription->set_parent(its_item.second); - update_id(); - _subscription->set_id(id_); - subscriptions_[id_] = _subscription; - } else { - if (!_subscription->is_pending()) { - if (!_subscription->force_initial_events()) { - _subscription->set_initial(false); - } + { + std::lock_guard<std::mutex> its_lock(subscriptions_mutex_); + + for (const auto& its_item : subscriptions_) { + if (its_item.second->equals(_subscription)) { + // update existing subscription + _changed = its_item.second->update( + _subscription->get_clients(), _expiration, _is_subscribe); + _id = its_item.second->get_id(); + + // Copy acknowledgment states from existing subscription + for (const auto& its_client : _subscription->get_clients()) { + const auto its_state = its_item.second->get_client_state(its_client); + if (_is_subscribe && + its_state == remote_subscription_state_e::SUBSCRIPTION_UNKNOWN) { + _subscription->set_client_state(its_client, + remote_subscription_state_e::SUBSCRIPTION_PENDING); + _changed.insert(its_client); } else { - its_item.second->set_answers( - its_item.second->get_answers() + 1); - _subscription->set_parent(its_item.second); - _subscription->set_answers(0); + _subscription->set_client_state(its_client, its_state); } } - } else { - if (its_item.second->is_pending()) { - for (const auto &its_event : events_) - its_event->remove_pending( - its_item.second->get_subscriber()); + + if (_is_subscribe) { + if (!_changed.empty()) { + // New clients: + // Let this be a child subscription + _subscription->set_parent(its_item.second); + update_id(); + _subscription->set_id(id_); + subscriptions_[id_] = _subscription; + } else { + if (!_subscription->is_pending()) { + if (!_subscription->force_initial_events()) { + _subscription->set_initial(false); + } + } else { + its_item.second->set_answers( + its_item.second->get_answers() + 1); + _subscription->set_parent(its_item.second); + _subscription->set_answers(0); + } + } + } else { + if (its_item.second->is_pending()) { + its_subscriber = its_item.second->get_subscriber(); + } } + + its_result = true; + break; } + } + } - return true; + if (its_subscriber) { + { + // Build set of events first to avoid having to + // hold the "events_mutex_" in parallel to the internal event mutexes. + std::lock_guard<std::mutex> its_lock(events_mutex_); + for (const auto &its_event : events_) + its_events.insert(its_event); } + for (const auto &its_event : its_events) + its_event->remove_pending(its_subscriber); } - return false; + return (its_result); +} + +bool +eventgroupinfo::is_remote_subscription_limit_reached( + const std::shared_ptr<remote_subscription> &_subscription) { + bool limit_reached(false); + + if (subscriptions_.size() <= max_remote_subscribers_) { + return false; + } + + boost::asio::ip::address its_address; + if (_subscription->get_ip_address(its_address)) { + auto find_address = remote_subscribers_count_.find(its_address); + if (find_address != remote_subscribers_count_.end()) { + if (find_address->second > max_remote_subscribers_) { + VSOMEIP_WARNING << ": remote subscriber limit [" << std::dec + << (uint32_t)max_remote_subscribers_ << "] to [" + << std::hex << std::setw(4) << std::setfill('0') << service_ << "." + << std::hex << std::setw(4) << std::setfill('0') << instance_ << "." + << std::hex << std::setw(4) << std::setfill('0') << eventgroup_ << "]" + << " reached for remote address: " << its_address.to_string() + << " rejecting subscription!"; + return true; + } + } + } + return limit_reached; } remote_subscription_id_t eventgroupinfo::add_remote_subscription( const std::shared_ptr<remote_subscription> &_subscription) { std::lock_guard<std::mutex> its_lock(subscriptions_mutex_); + update_id(); _subscription->set_id(id_); subscriptions_[id_] = _subscription; + boost::asio::ip::address its_address; + if (_subscription->get_ip_address(its_address)) { + remote_subscribers_count_[its_address]++; + } return id_; } @@ -291,6 +351,20 @@ void eventgroupinfo::remove_remote_subscription( const remote_subscription_id_t _id) { std::lock_guard<std::mutex> its_lock(subscriptions_mutex_); + + auto find_subscription = subscriptions_.find(_id); + if (find_subscription != subscriptions_.end()) { + boost::asio::ip::address its_address; + if (find_subscription->second->get_ip_address(its_address)) { + auto find_address = remote_subscribers_count_.find(its_address); + if (find_address != remote_subscribers_count_.end()) { + if(find_address->second != 0) { + find_address->second--; + } + } + } + } + subscriptions_.erase(_id); } @@ -298,6 +372,7 @@ void eventgroupinfo::clear_remote_subscriptions() { std::lock_guard<std::mutex> its_lock(subscriptions_mutex_); subscriptions_.clear(); + remote_subscribers_count_.clear(); } std::set<std::shared_ptr<endpoint_definition> > @@ -390,11 +465,19 @@ eventgroupinfo::send_initial_events( } // Send events - for (const auto& its_event : its_reliable_events) + for (const auto &its_event : its_reliable_events) its_event->notify_one(VSOMEIP_ROUTING_CLIENT, _reliable); - for (const auto& its_event : its_unreliable_events) + for (const auto &its_event : its_unreliable_events) its_event->notify_one(VSOMEIP_ROUTING_CLIENT, _unreliable); } +uint8_t eventgroupinfo::get_max_remote_subscribers() const { + return max_remote_subscribers_; +} + +void eventgroupinfo::set_max_remote_subscribers(uint8_t _max_remote_subscribers) { + max_remote_subscribers_ = _max_remote_subscribers; +} + } // namespace vsomeip_v3 |