diff options
Diffstat (limited to 'implementation/endpoints/src/udp_server_endpoint_impl.cpp')
-rw-r--r-- | implementation/endpoints/src/udp_server_endpoint_impl.cpp | 186 |
1 files changed, 104 insertions, 82 deletions
diff --git a/implementation/endpoints/src/udp_server_endpoint_impl.cpp b/implementation/endpoints/src/udp_server_endpoint_impl.cpp index 0a65b2e..9d22d51 100644 --- a/implementation/endpoints/src/udp_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/udp_server_endpoint_impl.cpp @@ -37,6 +37,7 @@ udp_server_endpoint_impl::udp_server_endpoint_impl( _configuration), unicast_socket_(_io, _local.protocol()), unicast_recv_buffer_(VSOMEIP_MAX_UDP_MESSAGE_SIZE, 0), + multicast_id_(0), joined_group_(false), local_port_(_local.port()), tp_reassembler_(std::make_shared<tp::tp_reassembler>(_configuration->get_max_message_size_unreliable(), _io)), @@ -77,6 +78,7 @@ udp_server_endpoint_impl::udp_server_endpoint_impl( boost::asio::socket_base::broadcast option(true); unicast_socket_.set_option(option, ec); boost::asio::detail::throw_error(ec, "broadcast option"); + const std::uint32_t its_udp_recv_buffer_size = configuration_->get_udp_receive_buffer_size(); unicast_socket_.set_option(boost::asio::socket_base::receive_buffer_size( @@ -124,9 +126,8 @@ void udp_server_endpoint_impl::start() { void udp_server_endpoint_impl::stop() { server_endpoint_impl::stop(); - { - std::lock_guard<std::mutex> its_lock(unicast_socket_mutex_); + std::lock_guard<std::mutex> its_lock(unicast_mutex_); if (unicast_socket_.is_open()) { boost::system::error_code its_error; @@ -136,7 +137,7 @@ void udp_server_endpoint_impl::stop() { } { - std::lock_guard<std::mutex> its_lock(multicast_socket_mutex_); + std::lock_guard<std::mutex> its_lock(multicast_mutex_); if (multicast_socket_ && multicast_socket_->is_open()) { boost::system::error_code its_error; @@ -150,11 +151,11 @@ void udp_server_endpoint_impl::stop() { void udp_server_endpoint_impl::receive() { receive_unicast(); - receive_multicast(); } void udp_server_endpoint_impl::receive_unicast() { - std::lock_guard<std::mutex> its_lock(unicast_socket_mutex_); + + std::lock_guard<std::mutex> its_lock(unicast_mutex_); if(unicast_socket_.is_open()) { unicast_socket_.async_receive_from( @@ -172,12 +173,14 @@ void udp_server_endpoint_impl::receive_unicast() { } } -void udp_server_endpoint_impl::receive_multicast() { - std::lock_guard<std::mutex> its_lock(multicast_socket_mutex_); +// +// receive_multicast is called with multicast_mutex_ being hold +// +void udp_server_endpoint_impl::receive_multicast(uint8_t _multicast_id) { - if (multicast_socket_ && multicast_socket_->is_open()) { + if (_multicast_id == multicast_id_ && multicast_socket_ && multicast_socket_->is_open()) { multicast_socket_->async_receive_from( - boost::asio::buffer(&(*multicast_recv_buffer_)[0], max_message_size_), + boost::asio::buffer(&multicast_recv_buffer_[0], max_message_size_), multicast_remote_, std::bind( &udp_server_endpoint_impl::on_multicast_received, @@ -185,7 +188,8 @@ void udp_server_endpoint_impl::receive_multicast() { udp_server_endpoint_impl >(shared_from_this()), std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3 + std::placeholders::_3, + _multicast_id ) ); } @@ -194,6 +198,7 @@ void udp_server_endpoint_impl::receive_multicast() { bool udp_server_endpoint_impl::send_to( const std::shared_ptr<endpoint_definition> _target, const byte_t *_data, uint32_t _size) { + std::lock_guard<std::mutex> its_lock(mutex_); endpoint_type its_target(_target->get_address(), _target->get_port()); return send_intern(its_target, _data, _size); @@ -202,6 +207,7 @@ bool udp_server_endpoint_impl::send_to( bool udp_server_endpoint_impl::send_error( const std::shared_ptr<endpoint_definition> _target, const byte_t *_data, uint32_t _size) { + bool ret(false); std::lock_guard<std::mutex> its_lock(mutex_); const endpoint_type its_target(_target->get_address(), _target->get_port()); @@ -236,64 +242,72 @@ void udp_server_endpoint_impl::send_queued( << (int)(*its_buffer)[i] << " "; VSOMEIP_INFO << msg.str(); #endif - std::lock_guard<std::mutex> its_lock(unicast_socket_mutex_); + std::lock_guard<std::mutex> its_lock(unicast_mutex_); unicast_socket_.async_send_to( - boost::asio::buffer(*its_buffer), - _queue_iterator->first, - std::bind( - &udp_server_endpoint_base_impl::send_cbk, - shared_from_this(), - _queue_iterator, - std::placeholders::_1, - std::placeholders::_2 - ) - ); + boost::asio::buffer(*its_buffer), + _queue_iterator->first, + std::bind( + &udp_server_endpoint_base_impl::send_cbk, + shared_from_this(), + _queue_iterator, + std::placeholders::_1, + std::placeholders::_2 + ) + ); } void udp_server_endpoint_impl::get_configured_times_from_endpoint( service_t _service, method_t _method, std::chrono::nanoseconds *_debouncing, std::chrono::nanoseconds *_maximum_retention) const { + configuration_->get_configured_timing_responses(_service, udp_server_endpoint_base_impl::local_.address().to_string(), udp_server_endpoint_base_impl::local_.port(), _method, _debouncing, _maximum_retention); } +// +// Both is_joined - methods must be called with multicast_mutex_ being hold! +// bool udp_server_endpoint_impl::is_joined(const std::string &_address) const { - std::lock_guard<std::mutex> its_lock(joined_mutex_); + return (joined_.find(_address) != joined_.end()); } bool udp_server_endpoint_impl::is_joined( const std::string &_address, bool* _received) const { - *_received = false; - std::lock_guard<std::mutex> its_lock(joined_mutex_); + const auto found_address = joined_.find(_address); if (found_address != joined_.end()) { *_received = found_address->second; + } else { + *_received = false; } + return (found_address != joined_.end()); } void udp_server_endpoint_impl::join(const std::string &_address) { + bool has_received(false); + // + // join_func must be called with multicast_mutex_ being hold! + // auto join_func = [this](const std::string &_address) { try { - VSOMEIP_TRACE << "Joining to multicast group " << _address + VSOMEIP_DEBUG << "Joining to multicast group " << _address << " from " << local_.address().to_string(); boost::system::error_code ec; - if (!multicast_recv_buffer_) { - multicast_recv_buffer_ = std::unique_ptr<message_buffer_t>( - new message_buffer_t(VSOMEIP_MAX_UDP_MESSAGE_SIZE, 0)); - } + if (multicast_recv_buffer_.empty()) + multicast_recv_buffer_.resize(VSOMEIP_MAX_UDP_MESSAGE_SIZE, 0); - if (!multicast_ep_) { - multicast_ep_ = std::unique_ptr<endpoint_type>( + if (!multicast_local_) { + multicast_local_ = std::unique_ptr<endpoint_type>( new endpoint_type(boost::asio::ip::address_v4::any(), local_port_)); } @@ -305,7 +319,7 @@ void udp_server_endpoint_impl::join(const std::string &_address) { multicast_socket_->set_option(optionReuseAddress, ec); boost::asio::detail::throw_error(ec, "reuse address in multicast"); - multicast_socket_->bind(*multicast_ep_, ec); + multicast_socket_->bind(*multicast_local_, ec); boost::asio::detail::throw_error(ec, "bind multicast"); const std::uint32_t its_udp_recv_buffer_size = @@ -342,8 +356,8 @@ void udp_server_endpoint_impl::join(const std::string &_address) { ::setsockopt(multicast_socket_->native_handle(), IPPROTO_IP, IP_PKTINFO, &optval, sizeof(optval)); #endif - - receive_multicast(); + multicast_id_++; + receive_multicast(multicast_id_); } bool is_v4(false); @@ -355,55 +369,50 @@ void udp_server_endpoint_impl::join(const std::string &_address) { } if (is_v4) { - std::lock_guard<std::mutex> its_lock(multicast_socket_mutex_); multicast_socket_->set_option(ip::udp_ext::socket::reuse_address(true)); multicast_socket_->set_option( boost::asio::ip::multicast::enable_loopback(false)); -#ifdef _WIN32 multicast_socket_->set_option(boost::asio::ip::multicast::join_group( boost::asio::ip::address::from_string(_address).to_v4(), local_.address().to_v4())); -#else - multicast_socket_->set_option(boost::asio::ip::multicast::join_group( - boost::asio::ip::address::from_string(_address).to_v4())); -#endif } else if (is_v6) { - std::lock_guard<std::mutex> its_lock(multicast_socket_mutex_); multicast_socket_->set_option(ip::udp_ext::socket::reuse_address(true)); multicast_socket_->set_option( boost::asio::ip::multicast::enable_loopback(false)); -#ifdef _WIN32 multicast_socket_->set_option(boost::asio::ip::multicast::join_group( boost::asio::ip::address::from_string(_address).to_v6(), local_.address().to_v6().scope_id())); -#else - multicast_socket_->set_option(boost::asio::ip::multicast::join_group( - boost::asio::ip::address::from_string(_address).to_v6())); -#endif - } - { - std::lock_guard<std::mutex> its_lock(joined_mutex_); - joined_[_address] = false; } + + joined_[_address] = false; joined_group_ = true; + } catch (const std::exception &e) { VSOMEIP_ERROR << "udp_server_endpoint_impl::join" << ":" << e.what(); } }; + std::lock_guard<std::mutex> its_lock(multicast_mutex_); if (!is_joined(_address, &has_received)) { join_func(_address); } else if (!has_received) { // joined the multicast group but didn't receive a event yet -> rejoin - leave(_address); + leave_unlocked(_address); join_func(_address); } } void udp_server_endpoint_impl::leave(const std::string &_address) { + + std::lock_guard<std::mutex> its_lock(multicast_mutex_); + leave_unlocked(_address); +} + +void udp_server_endpoint_impl::leave_unlocked(const std::string &_address) { + try { if (is_joined(_address)) { - VSOMEIP_TRACE << "Leaving the multicast group " << _address + VSOMEIP_DEBUG << "Leaving the multicast group " << _address << " from " << local_.address().to_string(); bool is_v4(false); @@ -414,24 +423,22 @@ void udp_server_endpoint_impl::leave(const std::string &_address) { is_v6 = local_.address().is_v6(); } if (is_v4) { - std::lock_guard<std::mutex> its_lock(multicast_socket_mutex_); multicast_socket_->set_option(boost::asio::ip::multicast::leave_group( boost::asio::ip::address::from_string(_address))); } else if (is_v6) { - std::lock_guard<std::mutex> its_lock(multicast_socket_mutex_); multicast_socket_->set_option(boost::asio::ip::multicast::leave_group( boost::asio::ip::address::from_string(_address))); } - { - std::lock_guard<std::mutex> its_lock(joined_mutex_); - joined_.erase(_address); - if (!joined_.size()) { - joined_group_ = false; - - multicast_socket_.reset(nullptr); - multicast_ep_.reset(nullptr); - multicast_recv_buffer_.reset(nullptr); - } + + joined_.erase(_address); + if (0 == joined_.size()) { + joined_group_ = false; + + boost::system::error_code ec; + multicast_socket_->cancel(ec); + + multicast_socket_.reset(nullptr); + multicast_local_.reset(nullptr); } } } @@ -473,21 +480,36 @@ void udp_server_endpoint_impl::on_unicast_received( boost::system::error_code const &_error, std::size_t _bytes, boost::asio::ip::address const &_destination) { - on_message_received(_error, _bytes, _destination, unicast_remote_, unicast_recv_buffer_); - receive_unicast(); + + if (_error != boost::asio::error::operation_aborted) { + { + // By locking the multicast mutex here it is ensured that unicast + // & multicast messages are not processed in parallel. This aligns + // the behavior of endpoints with one and two active sockets. + std::lock_guard<std::mutex> its_lock(multicast_mutex_); + on_message_received(_error, _bytes, _destination, + unicast_remote_, unicast_recv_buffer_); + } + receive_unicast(); + } } void udp_server_endpoint_impl::on_multicast_received( boost::system::error_code const &_error, std::size_t _bytes, - boost::asio::ip::address const &_destination) { + boost::asio::ip::address const &_destination, + uint8_t _multicast_id) { + + std::lock_guard<std::mutex> its_lock(multicast_mutex_); + if (_error != boost::asio::error::operation_aborted) { + // Filter messages sent from the same source address + if (multicast_remote_.address() != local_.address()) { + on_message_received(_error, _bytes, _destination, + multicast_remote_, multicast_recv_buffer_); + } - // Filter messages sent from the same source address - if (multicast_remote_.address() != local_.address()) { - on_message_received(_error, _bytes, _destination, multicast_remote_, *multicast_recv_buffer_); + receive_multicast(_multicast_id); } - - receive_multicast(); } void udp_server_endpoint_impl::on_message_received( @@ -579,7 +601,6 @@ void udp_server_endpoint_impl::on_message_received( } else if (its_service != VSOMEIP_SD_SERVICE && utility::is_notification(_buffer[i + VSOMEIP_MESSAGE_TYPE_POS]) && joined_group_) { - std::lock_guard<std::mutex> its_lock(joined_mutex_); boost::system::error_code ec; const auto found_address = joined_.find(_destination.to_string(ec)); if (found_address != joined_.end()) { @@ -599,14 +620,12 @@ void udp_server_endpoint_impl::on_message_received( const session_t its_session = VSOMEIP_BYTES_TO_WORD( res.second[VSOMEIP_SESSION_POS_MIN], res.second[VSOMEIP_SESSION_POS_MAX]); - clients_mutex_.lock(); + std::lock_guard<std::mutex> its_client_lock(clients_mutex_); clients_[its_client][its_session] = _remote; - clients_mutex_.unlock(); } } else if (its_service != VSOMEIP_SD_SERVICE && utility::is_notification(res.second[VSOMEIP_MESSAGE_TYPE_POS]) && joined_group_) { - std::lock_guard<std::mutex> its_lock(joined_mutex_); boost::system::error_code ec; const auto found_address = joined_.find(_destination.to_string(ec)); if (found_address != joined_.end()) { @@ -668,9 +687,10 @@ void udp_server_endpoint_impl::print_status() { VSOMEIP_INFO << "status use: " << std::dec << local_port_ << " number queues: " << std::dec << queues_.size() - << " recv_buffer: " << std::dec << unicast_recv_buffer_.capacity() - << " multicast_recv_buffer: " << std::dec - << (multicast_recv_buffer_ ? multicast_recv_buffer_->capacity() : 0); + << " recv_buffer: " + << std::dec << unicast_recv_buffer_.capacity() + << " multicast_recv_buffer: " + << std::dec << multicast_recv_buffer_.capacity(); for (const auto &c : queues_) { std::size_t its_data_size(0); @@ -706,7 +726,8 @@ bool udp_server_endpoint_impl::is_reliable() const { } const std::string udp_server_endpoint_impl::get_address_port_local() const { - std::lock_guard<std::mutex> its_lock(unicast_socket_mutex_); + + std::lock_guard<std::mutex> its_lock(unicast_mutex_); std::string its_address_port; its_address_port.reserve(21); boost::system::error_code ec; @@ -721,8 +742,9 @@ const std::string udp_server_endpoint_impl::get_address_port_local() const { return its_address_port; } -bool udp_server_endpoint_impl::tp_segmentation_enabled(service_t _service, - method_t _method) const { +bool udp_server_endpoint_impl::tp_segmentation_enabled( + service_t _service, method_t _method) const { + return configuration_->tp_segment_messages_service_to_client(_service, local_.address().to_string(), local_.port(), _method); |