diff options
Diffstat (limited to 'implementation/endpoints/src/udp_server_endpoint_impl.cpp')
-rw-r--r-- | implementation/endpoints/src/udp_server_endpoint_impl.cpp | 111 |
1 files changed, 77 insertions, 34 deletions
diff --git a/implementation/endpoints/src/udp_server_endpoint_impl.cpp b/implementation/endpoints/src/udp_server_endpoint_impl.cpp index 28079d1..7a7a2e4 100644 --- a/implementation/endpoints/src/udp_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/udp_server_endpoint_impl.cpp @@ -1,4 +1,4 @@ -// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. @@ -28,7 +28,8 @@ udp_server_endpoint_impl::udp_server_endpoint_impl( : server_endpoint_impl<ip::udp_ext>( _host, _local, _io, VSOMEIP_MAX_UDP_MESSAGE_SIZE), socket_(_io, _local.protocol()), - recv_buffer_(VSOMEIP_MAX_UDP_MESSAGE_SIZE, 0) { + recv_buffer_(VSOMEIP_MAX_UDP_MESSAGE_SIZE, 0), + local_port_(_local.port()) { boost::system::error_code ec; boost::asio::socket_base::reuse_address optionReuseAddress(true); @@ -57,7 +58,7 @@ udp_server_endpoint_impl::udp_server_endpoint_impl( socket_.set_option(option, ec); boost::asio::detail::throw_error(ec, "broadcast option"); -#ifdef WIN32 +#ifdef _WIN32 const char* optval("0001"); ::setsockopt(socket_.native(), IPPROTO_IP, IP_PKTINFO, optval, sizeof(optval)); @@ -80,17 +81,19 @@ void udp_server_endpoint_impl::start() { } void udp_server_endpoint_impl::stop() { - std::lock_guard<std::mutex> its_lock(stop_mutex_); server_endpoint_impl::stop(); - if (socket_.is_open()) { - boost::system::error_code its_error; - socket_.shutdown(socket_type::shutdown_both, its_error); - socket_.close(its_error); + { + std::lock_guard<std::mutex> its_lock(socket_mutex_); + if (socket_.is_open()) { + boost::system::error_code its_error; + socket_.shutdown(socket_type::shutdown_both, its_error); + socket_.close(its_error); + } } } void udp_server_endpoint_impl::receive() { - std::lock_guard<std::mutex> its_lock(stop_mutex_); + std::lock_guard<std::mutex> its_lock(socket_mutex_); if(socket_.is_open()) { socket_.async_receive_from( boost::asio::buffer(&recv_buffer_[0], max_message_size_), @@ -131,31 +134,44 @@ void udp_server_endpoint_impl::send_queued( << (int)(*its_buffer)[i] << " "; VSOMEIP_INFO << msg.str(); #endif - socket_.async_send_to( - boost::asio::buffer(*its_buffer), - _queue_iterator->first, - std::bind( - &udp_server_endpoint_base_impl::send_cbk, - shared_from_this(), - _queue_iterator, - std::placeholders::_1, - std::placeholders::_2 - ) - ); + { + std::lock_guard<std::mutex> its_lock(socket_mutex_); + socket_.async_send_to( + boost::asio::buffer(*its_buffer), + _queue_iterator->first, + std::bind( + &udp_server_endpoint_base_impl::send_cbk, + shared_from_this(), + _queue_iterator, + std::placeholders::_1, + std::placeholders::_2 + ) + ); + } } bool udp_server_endpoint_impl::is_joined(const std::string &_address) const { + std::lock_guard<std::mutex> its_lock(joined_mutex_); return (joined_.find(_address) != joined_.end()); } void udp_server_endpoint_impl::join(const std::string &_address) { + try { if (!is_joined(_address)) { - if (local_.address().is_v4()) { + bool is_v4(false); + bool is_v6(false); + { + std::lock_guard<std::mutex> its_lock(local_mutex_); + is_v4 = local_.address().is_v4(); + is_v6 = local_.address().is_v6(); + } + if (is_v4) { + std::lock_guard<std::mutex> its_lock(socket_mutex_); socket_.set_option(ip::udp_ext::socket::reuse_address(true)); socket_.set_option( boost::asio::ip::multicast::enable_loopback(false)); -#ifdef WIN32 +#ifdef _WIN32 socket_.set_option(boost::asio::ip::multicast::join_group( boost::asio::ip::address::from_string(_address).to_v4(), local_.address().to_v4())); @@ -163,11 +179,12 @@ void udp_server_endpoint_impl::join(const std::string &_address) { socket_.set_option(boost::asio::ip::multicast::join_group( boost::asio::ip::address::from_string(_address).to_v4())); #endif - } else if (local_.address().is_v6()) { + } else if (is_v6) { + std::lock_guard<std::mutex> its_lock(socket_mutex_); socket_.set_option(ip::udp_ext::socket::reuse_address(true)); socket_.set_option( boost::asio::ip::multicast::enable_loopback(false)); -#ifdef WIN32 +#ifdef _WIN32 socket_.set_option(boost::asio::ip::multicast::join_group( boost::asio::ip::address::from_string(_address).to_v6(), local_.address().to_v6().scope_id())); @@ -176,48 +193,66 @@ void udp_server_endpoint_impl::join(const std::string &_address) { boost::asio::ip::address::from_string(_address).to_v6())); #endif } - joined_.insert(_address); + { + std::lock_guard<std::mutex> its_lock(joined_mutex_); + joined_.insert(_address); + } } else { VSOMEIP_INFO << "udp_server_endpoint_impl::join: " "Trying to join already joined address: " << _address; } } catch (const std::exception &e) { - VSOMEIP_ERROR << e.what(); + VSOMEIP_ERROR << __func__ << ":" << e.what(); } } void udp_server_endpoint_impl::leave(const std::string &_address) { try { if (is_joined(_address)) { - if (local_.address().is_v4()) { + bool is_v4(false); + bool is_v6(false); + { + std::lock_guard<std::mutex> its_lock(local_mutex_); + is_v4 = local_.address().is_v4(); + is_v6 = local_.address().is_v6(); + } + if (is_v4) { + std::lock_guard<std::mutex> its_lock(socket_mutex_); socket_.set_option(boost::asio::ip::multicast::leave_group( boost::asio::ip::address::from_string(_address))); - } else if (local_.address().is_v6()) { + } else if (is_v6) { + std::lock_guard<std::mutex> its_lock(socket_mutex_); socket_.set_option(boost::asio::ip::multicast::leave_group( boost::asio::ip::address::from_string(_address))); } - joined_.erase(_address); + { + std::lock_guard<std::mutex> its_lock(joined_mutex_); + joined_.erase(_address); + } } } catch (const std::exception &e) { - VSOMEIP_ERROR << e.what(); + VSOMEIP_ERROR << __func__ << ":" << e.what(); } } void udp_server_endpoint_impl::add_default_target( service_t _service, const std::string &_address, uint16_t _port) { + std::lock_guard<std::mutex> its_lock(default_targets_mutex_); endpoint_type its_endpoint( boost::asio::ip::address::from_string(_address), _port); default_targets_[_service] = its_endpoint; } void udp_server_endpoint_impl::remove_default_target(service_t _service) { + std::lock_guard<std::mutex> its_lock(default_targets_mutex_); default_targets_.erase(_service); } bool udp_server_endpoint_impl::get_default_target(service_t _service, udp_server_endpoint_impl::endpoint_type &_target) const { + std::lock_guard<std::mutex> its_lock(default_targets_mutex_); bool is_valid(false); auto find_service = default_targets_.find(_service); if (find_service != default_targets_.end()) { @@ -228,8 +263,7 @@ bool udp_server_endpoint_impl::get_default_target(service_t _service, } unsigned short udp_server_endpoint_impl::get_local_port() const { - boost::system::error_code its_error; - return socket_.local_endpoint(its_error).port(); + return local_port_; } // TODO: find a better way to structure the receive functions @@ -252,11 +286,20 @@ void udp_server_endpoint_impl::receive_cbk( const boost::asio::ip::address its_remote_address(remote_.address()); const std::uint16_t its_remote_port(remote_.port()); do { - uint32_t current_message_size + uint64_t read_message_size = utility::get_message_size(&this->recv_buffer_[i], - (uint32_t) remaining_bytes); + remaining_bytes); + if (read_message_size > MESSAGE_SIZE_UNLIMITED) { + VSOMEIP_ERROR << "Message size exceeds allowed maximum!"; + return; + } + uint32_t current_message_size = static_cast<uint32_t>(read_message_size); if (current_message_size > VSOMEIP_SOMEIP_HEADER_SIZE && current_message_size <= remaining_bytes) { + if (remaining_bytes - current_message_size > remaining_bytes) { + VSOMEIP_ERROR << "buffer underflow in udp client endpoint ~> abort!"; + return; + } remaining_bytes -= current_message_size; if (utility::is_request( recv_buffer_[i + VSOMEIP_MESSAGE_TYPE_POS])) { |