summaryrefslogtreecommitdiff
path: root/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/endpoints/src/tcp_client_endpoint_impl.cpp')
-rw-r--r--implementation/endpoints/src/tcp_client_endpoint_impl.cpp230
1 files changed, 168 insertions, 62 deletions
diff --git a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
index cb92619..41254f6 100644
--- a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
@@ -30,7 +30,9 @@ tcp_client_endpoint_impl::tcp_client_endpoint_impl(
std::uint32_t _max_message_size,
std::uint32_t _buffer_shrink_threshold,
std::chrono::milliseconds _send_timeout,
- configuration::endpoint_queue_limit_t _queue_limit)
+ configuration::endpoint_queue_limit_t _queue_limit,
+ std::uint32_t _tcp_restart_aborts_max,
+ std::uint32_t _tcp_connect_time_max)
: tcp_client_endpoint_base_impl(_host, _local, _remote, _io,
_max_message_size, _queue_limit),
recv_buffer_size_initial_(VSOMEIP_SOMEIP_HEADER_SIZE),
@@ -41,7 +43,10 @@ tcp_client_endpoint_impl::tcp_client_endpoint_impl(
remote_port_(_remote.port()),
last_cookie_sent_(std::chrono::steady_clock::now() - std::chrono::seconds(11)),
send_timeout_(_send_timeout),
- send_timeout_warning_(_send_timeout / 2) {
+ send_timeout_warning_(_send_timeout / 2),
+ tcp_restart_aborts_max_(_tcp_restart_aborts_max),
+ tcp_connect_time_max_(_tcp_connect_time_max),
+ aborted_restart_count_(0) {
is_supporting_magic_cookies_ = true;
}
@@ -62,7 +67,19 @@ void tcp_client_endpoint_impl::start() {
void tcp_client_endpoint_impl::restart(bool _force) {
if (!_force && state_ == cei_state_e::CONNECTING) {
- return;
+ std::chrono::steady_clock::time_point its_current
+ = std::chrono::steady_clock::now();
+ long its_connect_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
+ its_current - connect_timepoint_).count();
+ if (aborted_restart_count_ < tcp_restart_aborts_max_
+ && its_connect_duration < tcp_connect_time_max_) {
+ aborted_restart_count_++;
+ return;
+ } else {
+ VSOMEIP_WARNING << "tce::restart: maximum number of aborted restarts ["
+ << tcp_restart_aborts_max_ << "] reached! its_connect_duration: "
+ << its_connect_duration;
+ }
}
state_ = cei_state_e::CONNECTING;
std::string address_port_local;
@@ -72,6 +89,8 @@ void tcp_client_endpoint_impl::restart(bool _force) {
shutdown_and_close_socket_unlocked(true);
recv_buffer_ = std::make_shared<message_buffer_t>(recv_buffer_size_initial_, 0);
}
+ was_not_connected_ = true;
+ reconnect_counter_ = 0;
{
std::lock_guard<std::mutex> its_lock(mutex_);
for (const auto&m : queue_) {
@@ -150,7 +169,7 @@ void tcp_client_endpoint_impl::connect() {
<< " remote:" << get_address_port_remote();
try {
// don't connect on bind error to avoid using a random port
- service_.post(std::bind(&client_endpoint_impl::connect_cbk,
+ strand_.post(std::bind(&client_endpoint_impl::connect_cbk,
shared_from_this(), its_bind_error));
} catch (const std::exception &e) {
VSOMEIP_ERROR << "tcp_client_endpoint_impl::connect: "
@@ -160,17 +179,23 @@ void tcp_client_endpoint_impl::connect() {
}
}
state_ = cei_state_e::CONNECTING;
+ connect_timepoint_ = std::chrono::steady_clock::now();
+ aborted_restart_count_ = 0;
socket_->async_connect(
remote_,
- std::bind(
- &tcp_client_endpoint_base_impl::connect_cbk,
- shared_from_this(),
- std::placeholders::_1
+ strand_.wrap(
+ std::bind(
+ &tcp_client_endpoint_base_impl::connect_cbk,
+ shared_from_this(),
+ std::placeholders::_1
+ )
)
);
} else {
VSOMEIP_WARNING << "tcp_client_endpoint::connect: Error opening socket: "
<< its_error.message() << " remote:" << get_address_port_remote();
+ strand_.post(std::bind(&tcp_client_endpoint_base_impl::connect_cbk,
+ shared_from_this(), its_error));
}
}
@@ -200,6 +225,12 @@ void tcp_client_endpoint_impl::receive(message_buffer_ptr_t _recv_buffer,
if (its_capacity < its_required_capacity) {
_recv_buffer->reserve(its_required_capacity);
_recv_buffer->resize(its_required_capacity, 0x0);
+ if (_recv_buffer->size() > 1048576) {
+ VSOMEIP_INFO << "tce: recv_buffer size is: " <<
+ _recv_buffer->size()
+ << " local: " << get_address_port_local()
+ << " remote: " << get_address_port_remote();
+ }
}
buffer_size = _missing_capacity;
} else if (buffer_shrink_threshold_
@@ -217,13 +248,15 @@ void tcp_client_endpoint_impl::receive(message_buffer_ptr_t _recv_buffer,
}
socket_->async_receive(
boost::asio::buffer(&(*_recv_buffer)[_recv_buffer_size], buffer_size),
- std::bind(
- &tcp_client_endpoint_impl::receive_cbk,
- std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this()),
- std::placeholders::_1,
- std::placeholders::_2,
- _recv_buffer,
- _recv_buffer_size
+ strand_.wrap(
+ std::bind(
+ &tcp_client_endpoint_impl::receive_cbk,
+ std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this()),
+ std::placeholders::_1,
+ std::placeholders::_2,
+ _recv_buffer,
+ _recv_buffer_size
+ )
)
);
}
@@ -431,7 +464,7 @@ void tcp_client_endpoint_impl::receive_cbk(
return;
}
uint32_t current_message_size = static_cast<uint32_t>(read_message_size);
- has_full_message = (current_message_size > VSOMEIP_SOMEIP_HEADER_SIZE
+ has_full_message = (current_message_size > VSOMEIP_RETURN_CODE_POS
&& current_message_size <= _recv_buffer_size);
if (has_full_message) {
bool needs_forwarding(true);
@@ -472,58 +505,125 @@ void tcp_client_endpoint_impl::receive_cbk(
_recv_buffer_size -= current_message_size;
its_iteration_gap += current_message_size;
its_missing_capacity = 0;
- } else if (max_message_size_ != MESSAGE_SIZE_UNLIMITED &&
- current_message_size > max_message_size_) {
- _recv_buffer_size = 0;
- _recv_buffer->resize(recv_buffer_size_initial_, 0x0);
- _recv_buffer->shrink_to_fit();
- if (has_enabled_magic_cookies_) {
- VSOMEIP_ERROR << "Received a TCP message which exceeds "
- << "maximum message size ("
- << std::dec << current_message_size
- << "). Magic Cookies are enabled: "
- << "Resetting receiver. local: "
- << get_address_port_local() << " remote: "
- << get_address_port_remote();
- } else {
- VSOMEIP_ERROR << "Received a TCP message which exceeds "
- << "maximum message size ("
- << std::dec << current_message_size
- << ") Magic cookies are disabled: "
- << "Client will be disabled! local: "
- << get_address_port_local() << " remote: "
- << get_address_port_remote();
- return;
- }
- } else if (current_message_size > _recv_buffer_size) {
- its_missing_capacity = current_message_size
- - static_cast<std::uint32_t>(_recv_buffer_size);
- } else if (VSOMEIP_SOMEIP_HEADER_SIZE > _recv_buffer_size) {
- its_missing_capacity = VSOMEIP_SOMEIP_HEADER_SIZE
- - static_cast<std::uint32_t>(_recv_buffer_size);
} else if (has_enabled_magic_cookies_ && _recv_buffer_size > 0) {
- uint32_t its_offset = find_magic_cookie(&(*_recv_buffer)[its_iteration_gap], _recv_buffer_size);
+ const uint32_t its_offset = find_magic_cookie(
+ &(*_recv_buffer)[its_iteration_gap], _recv_buffer_size);
if (its_offset < _recv_buffer_size) {
_recv_buffer_size -= its_offset;
its_iteration_gap += its_offset;
has_full_message = true; // trigger next loop
- } else {
+ VSOMEIP_ERROR << "Detected Magic Cookie within message data."
+ << " Resyncing. local: " << get_address_port_local()
+ << " remote: " << get_address_port_remote();
+ }
+ }
+
+ if (!has_full_message) {
+ if (_recv_buffer_size > VSOMEIP_RETURN_CODE_POS &&
+ ((*recv_buffer_)[its_iteration_gap + VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION ||
+ !utility::is_valid_message_type(static_cast<message_type_e>((*recv_buffer_)[its_iteration_gap + VSOMEIP_MESSAGE_TYPE_POS])) ||
+ !utility::is_valid_return_code(static_cast<return_code_e>((*recv_buffer_)[its_iteration_gap + VSOMEIP_RETURN_CODE_POS]))
+ )) {
+ if ((*recv_buffer_)[its_iteration_gap + VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION) {
+ VSOMEIP_ERROR << "tce: Wrong protocol version: 0x"
+ << std::hex << std::setw(2) << std::setfill('0')
+ << std::uint32_t((*recv_buffer_)[its_iteration_gap + VSOMEIP_PROTOCOL_VERSION_POS])
+ << " local: " << get_address_port_local()
+ << " remote: " << get_address_port_remote();
+ // ensure to send back a message w/ wrong protocol version
+ its_lock.unlock();
+ its_host->on_message(&(*_recv_buffer)[its_iteration_gap],
+ VSOMEIP_SOMEIP_HEADER_SIZE + 8, this,
+ boost::asio::ip::address(),
+ VSOMEIP_ROUTING_CLIENT,
+ remote_address_,
+ remote_port_);
+ its_lock.lock();
+ } else if (!utility::is_valid_message_type(static_cast<message_type_e>(
+ (*recv_buffer_)[its_iteration_gap + VSOMEIP_MESSAGE_TYPE_POS]))) {
+ VSOMEIP_ERROR << "tce: Invalid message type: 0x"
+ << std::hex << std::setw(2) << std::setfill('0')
+ << std::uint32_t((*recv_buffer_)[its_iteration_gap + VSOMEIP_MESSAGE_TYPE_POS])
+ << " local: " << get_address_port_local()
+ << " remote: " << get_address_port_remote();
+ } else if (!utility::is_valid_return_code(static_cast<return_code_e>(
+ (*recv_buffer_)[its_iteration_gap + VSOMEIP_RETURN_CODE_POS]))) {
+ VSOMEIP_ERROR << "tce: Invalid return code: 0x"
+ << std::hex << std::setw(2) << std::setfill('0')
+ << std::uint32_t((*recv_buffer_)[its_iteration_gap + VSOMEIP_RETURN_CODE_POS])
+ << " local: " << get_address_port_local()
+ << " remote: " << get_address_port_remote();
+ }
+ state_ = cei_state_e::CONNECTING;
+ shutdown_and_close_socket_unlocked(false);
+ its_lock.unlock();
+ its_host->on_disconnect(shared_from_this());
+ restart(true);
+ return;
+ } else if (max_message_size_ != MESSAGE_SIZE_UNLIMITED &&
+ current_message_size > max_message_size_) {
+ _recv_buffer_size = 0;
+ _recv_buffer->resize(recv_buffer_size_initial_, 0x0);
+ _recv_buffer->shrink_to_fit();
+ if (has_enabled_magic_cookies_) {
+ VSOMEIP_ERROR << "Received a TCP message which exceeds "
+ << "maximum message size ("
+ << std::dec << current_message_size
+ << "). Magic Cookies are enabled: "
+ << "Resetting receiver. local: "
+ << get_address_port_local() << " remote: "
+ << get_address_port_remote();
+ } else {
+ VSOMEIP_ERROR << "Received a TCP message which exceeds "
+ << "maximum message size ("
+ << std::dec << current_message_size
+ << ") Magic cookies are disabled, "
+ << "Restarting connection. "
+ << "local: " << get_address_port_local()
+ << " remote: " << get_address_port_remote();
+ state_ = cei_state_e::CONNECTING;
+ shutdown_and_close_socket_unlocked(false);
+ its_lock.unlock();
+ its_host->on_disconnect(shared_from_this());
+ restart(true);
+ return;
+ }
+ } else if (current_message_size > _recv_buffer_size) {
+ its_missing_capacity = current_message_size
+ - static_cast<std::uint32_t>(_recv_buffer_size);
+ } else if (VSOMEIP_SOMEIP_HEADER_SIZE > _recv_buffer_size) {
+ its_missing_capacity = VSOMEIP_SOMEIP_HEADER_SIZE
+ - static_cast<std::uint32_t>(_recv_buffer_size);
+ } else if (has_enabled_magic_cookies_ && _recv_buffer_size > 0) {
+ // no need to check for magic cookie here again: has_full_message
+ // would have been set to true if there was one present in the data
_recv_buffer_size = 0;
+ _recv_buffer->resize(recv_buffer_size_initial_, 0x0);
+ _recv_buffer->shrink_to_fit();
its_missing_capacity = 0;
+ VSOMEIP_ERROR << "tce::c<" << this
+ << ">rcb: recv_buffer_capacity: "
+ << _recv_buffer->capacity()
+ << " local: " << get_address_port_local()
+ << " remote: " << get_address_port_remote()
+ << ". Didn't find magic cookie in broken data, trying to resync.";
+ } else {
+ VSOMEIP_ERROR << "tce::c<" << this
+ << ">rcb: recv_buffer_size is: " << std::dec
+ << _recv_buffer_size << " but couldn't read "
+ "out message_size. recv_buffer_capacity: "
+ << _recv_buffer->capacity()
+ << " its_iteration_gap: " << its_iteration_gap
+ << " local: " << get_address_port_local()
+ << " remote: " << get_address_port_remote()
+ << ". Restarting connection due to missing/broken data TCP stream.";
+ state_ = cei_state_e::CONNECTING;
+ shutdown_and_close_socket_unlocked(false);
+ its_lock.unlock();
+ its_host->on_disconnect(shared_from_this());
+ restart(true);
+ return;
}
- } else {
- VSOMEIP_ERROR << "tce::c<" << this
- << ">rcb: recv_buffer_size is: " << std::dec
- << _recv_buffer_size << " but couldn't read "
- "out message_size. recv_buffer_capacity: "
- << _recv_buffer->capacity()
- << " its_iteration_gap: " << its_iteration_gap
- << " local: " << get_address_port_local()
- << " remote: " << get_address_port_remote()
- << ". Restarting connection due to missing/broken data TCP stream.";
- its_lock.unlock();
- restart(true);
- return;
}
} while (has_full_message && _recv_buffer_size);
if (its_iteration_gap) {
@@ -555,7 +655,6 @@ void tcp_client_endpoint_impl::receive_cbk(
VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk restarting.";
state_ = cei_state_e::CONNECTING;
shutdown_and_close_socket_unlocked(false);
- was_not_connected_ = true;
its_lock.unlock();
its_host->on_disconnect(shared_from_this());
restart(true);
@@ -705,7 +804,6 @@ void tcp_client_endpoint_impl::send_cbk(boost::system::error_code const &_error,
} else {
state_ = cei_state_e::CONNECTING;
shutdown_and_close_socket(false);
- was_not_connected_ = true;
std::shared_ptr<endpoint_host> its_host = host_.lock();
if (its_host) {
its_host->on_disconnect(shared_from_this());
@@ -742,4 +840,12 @@ void tcp_client_endpoint_impl::send_cbk(boost::system::error_code const &_error,
}
}
+std::uint32_t tcp_client_endpoint_impl::get_max_allowed_reconnects() const {
+ return MAX_RECONNECTS_UNLIMITED;
+}
+
+void tcp_client_endpoint_impl::max_allowed_reconnects_reached() {
+ return;
+}
+
} // namespace vsomeip