summaryrefslogtreecommitdiff
path: root/implementation/endpoints/src/udp_server_endpoint_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/endpoints/src/udp_server_endpoint_impl.cpp')
-rw-r--r--implementation/endpoints/src/udp_server_endpoint_impl.cpp186
1 files changed, 104 insertions, 82 deletions
diff --git a/implementation/endpoints/src/udp_server_endpoint_impl.cpp b/implementation/endpoints/src/udp_server_endpoint_impl.cpp
index 0a65b2e..9d22d51 100644
--- a/implementation/endpoints/src/udp_server_endpoint_impl.cpp
+++ b/implementation/endpoints/src/udp_server_endpoint_impl.cpp
@@ -37,6 +37,7 @@ udp_server_endpoint_impl::udp_server_endpoint_impl(
_configuration),
unicast_socket_(_io, _local.protocol()),
unicast_recv_buffer_(VSOMEIP_MAX_UDP_MESSAGE_SIZE, 0),
+ multicast_id_(0),
joined_group_(false),
local_port_(_local.port()),
tp_reassembler_(std::make_shared<tp::tp_reassembler>(_configuration->get_max_message_size_unreliable(), _io)),
@@ -77,6 +78,7 @@ udp_server_endpoint_impl::udp_server_endpoint_impl(
boost::asio::socket_base::broadcast option(true);
unicast_socket_.set_option(option, ec);
boost::asio::detail::throw_error(ec, "broadcast option");
+
const std::uint32_t its_udp_recv_buffer_size =
configuration_->get_udp_receive_buffer_size();
unicast_socket_.set_option(boost::asio::socket_base::receive_buffer_size(
@@ -124,9 +126,8 @@ void udp_server_endpoint_impl::start() {
void udp_server_endpoint_impl::stop() {
server_endpoint_impl::stop();
-
{
- std::lock_guard<std::mutex> its_lock(unicast_socket_mutex_);
+ std::lock_guard<std::mutex> its_lock(unicast_mutex_);
if (unicast_socket_.is_open()) {
boost::system::error_code its_error;
@@ -136,7 +137,7 @@ void udp_server_endpoint_impl::stop() {
}
{
- std::lock_guard<std::mutex> its_lock(multicast_socket_mutex_);
+ std::lock_guard<std::mutex> its_lock(multicast_mutex_);
if (multicast_socket_ && multicast_socket_->is_open()) {
boost::system::error_code its_error;
@@ -150,11 +151,11 @@ void udp_server_endpoint_impl::stop() {
void udp_server_endpoint_impl::receive() {
receive_unicast();
- receive_multicast();
}
void udp_server_endpoint_impl::receive_unicast() {
- std::lock_guard<std::mutex> its_lock(unicast_socket_mutex_);
+
+ std::lock_guard<std::mutex> its_lock(unicast_mutex_);
if(unicast_socket_.is_open()) {
unicast_socket_.async_receive_from(
@@ -172,12 +173,14 @@ void udp_server_endpoint_impl::receive_unicast() {
}
}
-void udp_server_endpoint_impl::receive_multicast() {
- std::lock_guard<std::mutex> its_lock(multicast_socket_mutex_);
+//
+// receive_multicast is called with multicast_mutex_ being hold
+//
+void udp_server_endpoint_impl::receive_multicast(uint8_t _multicast_id) {
- if (multicast_socket_ && multicast_socket_->is_open()) {
+ if (_multicast_id == multicast_id_ && multicast_socket_ && multicast_socket_->is_open()) {
multicast_socket_->async_receive_from(
- boost::asio::buffer(&(*multicast_recv_buffer_)[0], max_message_size_),
+ boost::asio::buffer(&multicast_recv_buffer_[0], max_message_size_),
multicast_remote_,
std::bind(
&udp_server_endpoint_impl::on_multicast_received,
@@ -185,7 +188,8 @@ void udp_server_endpoint_impl::receive_multicast() {
udp_server_endpoint_impl >(shared_from_this()),
std::placeholders::_1,
std::placeholders::_2,
- std::placeholders::_3
+ std::placeholders::_3,
+ _multicast_id
)
);
}
@@ -194,6 +198,7 @@ void udp_server_endpoint_impl::receive_multicast() {
bool udp_server_endpoint_impl::send_to(
const std::shared_ptr<endpoint_definition> _target,
const byte_t *_data, uint32_t _size) {
+
std::lock_guard<std::mutex> its_lock(mutex_);
endpoint_type its_target(_target->get_address(), _target->get_port());
return send_intern(its_target, _data, _size);
@@ -202,6 +207,7 @@ bool udp_server_endpoint_impl::send_to(
bool udp_server_endpoint_impl::send_error(
const std::shared_ptr<endpoint_definition> _target,
const byte_t *_data, uint32_t _size) {
+
bool ret(false);
std::lock_guard<std::mutex> its_lock(mutex_);
const endpoint_type its_target(_target->get_address(), _target->get_port());
@@ -236,64 +242,72 @@ void udp_server_endpoint_impl::send_queued(
<< (int)(*its_buffer)[i] << " ";
VSOMEIP_INFO << msg.str();
#endif
- std::lock_guard<std::mutex> its_lock(unicast_socket_mutex_);
+ std::lock_guard<std::mutex> its_lock(unicast_mutex_);
unicast_socket_.async_send_to(
- boost::asio::buffer(*its_buffer),
- _queue_iterator->first,
- std::bind(
- &udp_server_endpoint_base_impl::send_cbk,
- shared_from_this(),
- _queue_iterator,
- std::placeholders::_1,
- std::placeholders::_2
- )
- );
+ boost::asio::buffer(*its_buffer),
+ _queue_iterator->first,
+ std::bind(
+ &udp_server_endpoint_base_impl::send_cbk,
+ shared_from_this(),
+ _queue_iterator,
+ std::placeholders::_1,
+ std::placeholders::_2
+ )
+ );
}
void udp_server_endpoint_impl::get_configured_times_from_endpoint(
service_t _service, method_t _method,
std::chrono::nanoseconds *_debouncing,
std::chrono::nanoseconds *_maximum_retention) const {
+
configuration_->get_configured_timing_responses(_service,
udp_server_endpoint_base_impl::local_.address().to_string(),
udp_server_endpoint_base_impl::local_.port(), _method,
_debouncing, _maximum_retention);
}
+//
+// Both is_joined - methods must be called with multicast_mutex_ being hold!
+//
bool udp_server_endpoint_impl::is_joined(const std::string &_address) const {
- std::lock_guard<std::mutex> its_lock(joined_mutex_);
+
return (joined_.find(_address) != joined_.end());
}
bool udp_server_endpoint_impl::is_joined(
const std::string &_address, bool* _received) const {
- *_received = false;
- std::lock_guard<std::mutex> its_lock(joined_mutex_);
+
const auto found_address = joined_.find(_address);
if (found_address != joined_.end()) {
*_received = found_address->second;
+ } else {
+ *_received = false;
}
+
return (found_address != joined_.end());
}
void udp_server_endpoint_impl::join(const std::string &_address) {
+
bool has_received(false);
+ //
+ // join_func must be called with multicast_mutex_ being hold!
+ //
auto join_func = [this](const std::string &_address) {
try {
- VSOMEIP_TRACE << "Joining to multicast group " << _address
+ VSOMEIP_DEBUG << "Joining to multicast group " << _address
<< " from " << local_.address().to_string();
boost::system::error_code ec;
- if (!multicast_recv_buffer_) {
- multicast_recv_buffer_ = std::unique_ptr<message_buffer_t>(
- new message_buffer_t(VSOMEIP_MAX_UDP_MESSAGE_SIZE, 0));
- }
+ if (multicast_recv_buffer_.empty())
+ multicast_recv_buffer_.resize(VSOMEIP_MAX_UDP_MESSAGE_SIZE, 0);
- if (!multicast_ep_) {
- multicast_ep_ = std::unique_ptr<endpoint_type>(
+ if (!multicast_local_) {
+ multicast_local_ = std::unique_ptr<endpoint_type>(
new endpoint_type(boost::asio::ip::address_v4::any(), local_port_));
}
@@ -305,7 +319,7 @@ void udp_server_endpoint_impl::join(const std::string &_address) {
multicast_socket_->set_option(optionReuseAddress, ec);
boost::asio::detail::throw_error(ec, "reuse address in multicast");
- multicast_socket_->bind(*multicast_ep_, ec);
+ multicast_socket_->bind(*multicast_local_, ec);
boost::asio::detail::throw_error(ec, "bind multicast");
const std::uint32_t its_udp_recv_buffer_size =
@@ -342,8 +356,8 @@ void udp_server_endpoint_impl::join(const std::string &_address) {
::setsockopt(multicast_socket_->native_handle(), IPPROTO_IP, IP_PKTINFO,
&optval, sizeof(optval));
#endif
-
- receive_multicast();
+ multicast_id_++;
+ receive_multicast(multicast_id_);
}
bool is_v4(false);
@@ -355,55 +369,50 @@ void udp_server_endpoint_impl::join(const std::string &_address) {
}
if (is_v4) {
- std::lock_guard<std::mutex> its_lock(multicast_socket_mutex_);
multicast_socket_->set_option(ip::udp_ext::socket::reuse_address(true));
multicast_socket_->set_option(
boost::asio::ip::multicast::enable_loopback(false));
-#ifdef _WIN32
multicast_socket_->set_option(boost::asio::ip::multicast::join_group(
boost::asio::ip::address::from_string(_address).to_v4(),
local_.address().to_v4()));
-#else
- multicast_socket_->set_option(boost::asio::ip::multicast::join_group(
- boost::asio::ip::address::from_string(_address).to_v4()));
-#endif
} else if (is_v6) {
- std::lock_guard<std::mutex> its_lock(multicast_socket_mutex_);
multicast_socket_->set_option(ip::udp_ext::socket::reuse_address(true));
multicast_socket_->set_option(
boost::asio::ip::multicast::enable_loopback(false));
-#ifdef _WIN32
multicast_socket_->set_option(boost::asio::ip::multicast::join_group(
boost::asio::ip::address::from_string(_address).to_v6(),
local_.address().to_v6().scope_id()));
-#else
- multicast_socket_->set_option(boost::asio::ip::multicast::join_group(
- boost::asio::ip::address::from_string(_address).to_v6()));
-#endif
- }
- {
- std::lock_guard<std::mutex> its_lock(joined_mutex_);
- joined_[_address] = false;
}
+
+ joined_[_address] = false;
joined_group_ = true;
+
} catch (const std::exception &e) {
VSOMEIP_ERROR << "udp_server_endpoint_impl::join" << ":" << e.what();
}
};
+ std::lock_guard<std::mutex> its_lock(multicast_mutex_);
if (!is_joined(_address, &has_received)) {
join_func(_address);
} else if (!has_received) {
// joined the multicast group but didn't receive a event yet -> rejoin
- leave(_address);
+ leave_unlocked(_address);
join_func(_address);
}
}
void udp_server_endpoint_impl::leave(const std::string &_address) {
+
+ std::lock_guard<std::mutex> its_lock(multicast_mutex_);
+ leave_unlocked(_address);
+}
+
+void udp_server_endpoint_impl::leave_unlocked(const std::string &_address) {
+
try {
if (is_joined(_address)) {
- VSOMEIP_TRACE << "Leaving the multicast group " << _address
+ VSOMEIP_DEBUG << "Leaving the multicast group " << _address
<< " from " << local_.address().to_string();
bool is_v4(false);
@@ -414,24 +423,22 @@ void udp_server_endpoint_impl::leave(const std::string &_address) {
is_v6 = local_.address().is_v6();
}
if (is_v4) {
- std::lock_guard<std::mutex> its_lock(multicast_socket_mutex_);
multicast_socket_->set_option(boost::asio::ip::multicast::leave_group(
boost::asio::ip::address::from_string(_address)));
} else if (is_v6) {
- std::lock_guard<std::mutex> its_lock(multicast_socket_mutex_);
multicast_socket_->set_option(boost::asio::ip::multicast::leave_group(
boost::asio::ip::address::from_string(_address)));
}
- {
- std::lock_guard<std::mutex> its_lock(joined_mutex_);
- joined_.erase(_address);
- if (!joined_.size()) {
- joined_group_ = false;
-
- multicast_socket_.reset(nullptr);
- multicast_ep_.reset(nullptr);
- multicast_recv_buffer_.reset(nullptr);
- }
+
+ joined_.erase(_address);
+ if (0 == joined_.size()) {
+ joined_group_ = false;
+
+ boost::system::error_code ec;
+ multicast_socket_->cancel(ec);
+
+ multicast_socket_.reset(nullptr);
+ multicast_local_.reset(nullptr);
}
}
}
@@ -473,21 +480,36 @@ void udp_server_endpoint_impl::on_unicast_received(
boost::system::error_code const &_error,
std::size_t _bytes,
boost::asio::ip::address const &_destination) {
- on_message_received(_error, _bytes, _destination, unicast_remote_, unicast_recv_buffer_);
- receive_unicast();
+
+ if (_error != boost::asio::error::operation_aborted) {
+ {
+ // By locking the multicast mutex here it is ensured that unicast
+ // & multicast messages are not processed in parallel. This aligns
+ // the behavior of endpoints with one and two active sockets.
+ std::lock_guard<std::mutex> its_lock(multicast_mutex_);
+ on_message_received(_error, _bytes, _destination,
+ unicast_remote_, unicast_recv_buffer_);
+ }
+ receive_unicast();
+ }
}
void udp_server_endpoint_impl::on_multicast_received(
boost::system::error_code const &_error,
std::size_t _bytes,
- boost::asio::ip::address const &_destination) {
+ boost::asio::ip::address const &_destination,
+ uint8_t _multicast_id) {
+
+ std::lock_guard<std::mutex> its_lock(multicast_mutex_);
+ if (_error != boost::asio::error::operation_aborted) {
+ // Filter messages sent from the same source address
+ if (multicast_remote_.address() != local_.address()) {
+ on_message_received(_error, _bytes, _destination,
+ multicast_remote_, multicast_recv_buffer_);
+ }
- // Filter messages sent from the same source address
- if (multicast_remote_.address() != local_.address()) {
- on_message_received(_error, _bytes, _destination, multicast_remote_, *multicast_recv_buffer_);
+ receive_multicast(_multicast_id);
}
-
- receive_multicast();
}
void udp_server_endpoint_impl::on_message_received(
@@ -579,7 +601,6 @@ void udp_server_endpoint_impl::on_message_received(
} else if (its_service != VSOMEIP_SD_SERVICE
&& utility::is_notification(_buffer[i + VSOMEIP_MESSAGE_TYPE_POS])
&& joined_group_) {
- std::lock_guard<std::mutex> its_lock(joined_mutex_);
boost::system::error_code ec;
const auto found_address = joined_.find(_destination.to_string(ec));
if (found_address != joined_.end()) {
@@ -599,14 +620,12 @@ void udp_server_endpoint_impl::on_message_received(
const session_t its_session = VSOMEIP_BYTES_TO_WORD(
res.second[VSOMEIP_SESSION_POS_MIN],
res.second[VSOMEIP_SESSION_POS_MAX]);
- clients_mutex_.lock();
+ std::lock_guard<std::mutex> its_client_lock(clients_mutex_);
clients_[its_client][its_session] = _remote;
- clients_mutex_.unlock();
}
} else if (its_service != VSOMEIP_SD_SERVICE
&& utility::is_notification(res.second[VSOMEIP_MESSAGE_TYPE_POS])
&& joined_group_) {
- std::lock_guard<std::mutex> its_lock(joined_mutex_);
boost::system::error_code ec;
const auto found_address = joined_.find(_destination.to_string(ec));
if (found_address != joined_.end()) {
@@ -668,9 +687,10 @@ void udp_server_endpoint_impl::print_status() {
VSOMEIP_INFO << "status use: " << std::dec << local_port_
<< " number queues: " << std::dec << queues_.size()
- << " recv_buffer: " << std::dec << unicast_recv_buffer_.capacity()
- << " multicast_recv_buffer: " << std::dec
- << (multicast_recv_buffer_ ? multicast_recv_buffer_->capacity() : 0);
+ << " recv_buffer: "
+ << std::dec << unicast_recv_buffer_.capacity()
+ << " multicast_recv_buffer: "
+ << std::dec << multicast_recv_buffer_.capacity();
for (const auto &c : queues_) {
std::size_t its_data_size(0);
@@ -706,7 +726,8 @@ bool udp_server_endpoint_impl::is_reliable() const {
}
const std::string udp_server_endpoint_impl::get_address_port_local() const {
- std::lock_guard<std::mutex> its_lock(unicast_socket_mutex_);
+
+ std::lock_guard<std::mutex> its_lock(unicast_mutex_);
std::string its_address_port;
its_address_port.reserve(21);
boost::system::error_code ec;
@@ -721,8 +742,9 @@ const std::string udp_server_endpoint_impl::get_address_port_local() const {
return its_address_port;
}
-bool udp_server_endpoint_impl::tp_segmentation_enabled(service_t _service,
- method_t _method) const {
+bool udp_server_endpoint_impl::tp_segmentation_enabled(
+ service_t _service, method_t _method) const {
+
return configuration_->tp_segment_messages_service_to_client(_service,
local_.address().to_string(),
local_.port(), _method);