summaryrefslogtreecommitdiff
path: root/implementation/endpoints/src/tcp_server_endpoint_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/endpoints/src/tcp_server_endpoint_impl.cpp')
-rw-r--r--implementation/endpoints/src/tcp_server_endpoint_impl.cpp124
1 files changed, 98 insertions, 26 deletions
diff --git a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp
index 0084d8e..fc31850 100644
--- a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp
+++ b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp
@@ -1,4 +1,4 @@
-// Copyright (C) 2014-2015 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
@@ -46,28 +46,61 @@ void tcp_server_endpoint_impl::start() {
}
void tcp_server_endpoint_impl::stop() {
+ server_endpoint_impl::stop();
for (auto& i : connections_)
i.second->stop();
- acceptor_.close();
+ if(acceptor_.is_open()) {
+ boost::system::error_code its_error;
+ acceptor_.close(its_error);
+ }
}
bool tcp_server_endpoint_impl::send_to(
const std::shared_ptr<endpoint_definition> _target,
const byte_t *_data,
uint32_t _size, bool _flush) {
+ std::lock_guard<std::mutex> its_lock(mutex_);
endpoint_type its_target(_target->get_address(), _target->get_port());
return send_intern(its_target, _data, _size, _flush);
}
void tcp_server_endpoint_impl::send_queued(queue_iterator_type _queue_iterator) {
auto connection_iterator = connections_.find(_queue_iterator->first);
- if (connection_iterator != connections_.end())
+ if (connection_iterator != connections_.end()) {
connection_iterator->second->send_queued(_queue_iterator);
+ } else {
+ VSOMEIP_DEBUG << "Didn't find connection: "
+ << _queue_iterator->first.address().to_string() << ":" << std::dec
+ << static_cast<std::uint16_t>(_queue_iterator->first.port())
+ << " dropping message.";
+ _queue_iterator->second.pop_front();
+ }
+}
+
+bool tcp_server_endpoint_impl::is_established(std::shared_ptr<endpoint_definition> _endpoint) {
+ bool is_connected = false;
+ endpoint_type endpoint(_endpoint->get_address(), _endpoint->get_port());
+ auto connection_iterator = connections_.find(endpoint);
+ if (connection_iterator != connections_.end()) {
+#if 0
+ VSOMEIP_DEBUG << "tcp_server_endpoint_impl::is_established(): subscribers TCP connection for "
+ << endpoint.address().to_string() << ":" << std::dec
+ << static_cast<std::uint16_t>(endpoint.port())
+ << " is established!" ;
+#endif
+ is_connected = true;
+ } else {
+ VSOMEIP_DEBUG << "Didn't find TCP connection: Subscription rejected for: "
+ << endpoint.address().to_string() << ":" << std::dec
+ << static_cast<std::uint16_t>(endpoint.port());
+ }
+ return is_connected;
}
tcp_server_endpoint_impl::endpoint_type
tcp_server_endpoint_impl::get_remote() const {
- return current_->get_socket().remote_endpoint();
+ boost::system::error_code its_error;
+ return current_->get_socket().remote_endpoint(its_error);
}
bool tcp_server_endpoint_impl::get_remote_address(
@@ -77,9 +110,7 @@ bool tcp_server_endpoint_impl::get_remote_address(
boost::system::error_code its_error;
tcp_server_endpoint_impl::endpoint_type its_endpoint =
current_->get_socket().remote_endpoint(its_error);
- if (its_error) {
- return false;
- } else {
+ if (!its_error) {
boost::asio::ip::address its_address = its_endpoint.address();
if (!its_address.is_unspecified()) {
_address = its_address;
@@ -90,7 +121,19 @@ bool tcp_server_endpoint_impl::get_remote_address(
return false;
}
-bool tcp_server_endpoint_impl::get_multicast(service_t, event_t,
+unsigned short tcp_server_endpoint_impl::get_remote_port() const {
+ if (current_) {
+ boost::system::error_code its_error;
+ tcp_server_endpoint_impl::endpoint_type its_endpoint =
+ current_->get_socket().remote_endpoint(its_error);
+ if (!its_error) {
+ return its_endpoint.port();
+ }
+ }
+ return 0;
+}
+
+bool tcp_server_endpoint_impl::get_default_target(service_t,
tcp_server_endpoint_impl::endpoint_type &) const {
return false;
}
@@ -100,17 +143,23 @@ void tcp_server_endpoint_impl::accept_cbk(connection::ptr _connection,
if (!_error) {
socket_type &new_connection_socket = _connection->get_socket();
- endpoint_type remote = new_connection_socket.remote_endpoint();
-
- connections_[remote] = _connection;
- _connection->start();
-
+ boost::system::error_code its_error;
+ endpoint_type remote = new_connection_socket.remote_endpoint(its_error);
+ if(!its_error) {
+ connections_[remote] = _connection;
+ _connection->start();
+ }
+ }
+ if (_error != boost::asio::error::operation_aborted) {
start();
+ } else {
+ VSOMEIP_DEBUG << "Endpoint was stopped, don't starting again";
}
}
unsigned short tcp_server_endpoint_impl::get_local_port() const {
- return acceptor_.local_endpoint().port();
+ boost::system::error_code its_error;
+ return acceptor_.local_endpoint(its_error).port();
}
bool tcp_server_endpoint_impl::is_reliable() const {
@@ -163,8 +212,10 @@ void tcp_server_endpoint_impl::connection::receive() {
void tcp_server_endpoint_impl::connection::stop() {
std::lock_guard<std::mutex> its_lock(stop_mutex_);
if(socket_.is_open()) {
- socket_.shutdown(socket_.shutdown_both);
- socket_.close();
+ boost::system::error_code its_shutdown_error;
+ socket_.shutdown(socket_.shutdown_both, its_shutdown_error);
+ boost::system::error_code its_close_error;
+ socket_.close(its_close_error);
}
}
@@ -172,8 +223,9 @@ void tcp_server_endpoint_impl::connection::send_queued(
queue_iterator_type _queue_iterator) {
message_buffer_ptr_t its_buffer = _queue_iterator->second.front();
- if (server_->has_enabled_magic_cookies_)
+ if (server_->has_enabled_magic_cookies_) {
send_magic_cookie(its_buffer);
+ }
boost::asio::async_write(socket_, boost::asio::buffer(*its_buffer),
std::bind(&tcp_server_endpoint_base_impl::send_cbk,
@@ -240,20 +292,27 @@ void tcp_server_endpoint_impl::connection::receive_cbk(
}
}
if (needs_forwarding) {
- if (utility::is_request(recv_buffer_[VSOMEIP_MESSAGE_TYPE_POS])) {
+ if (utility::is_request(
+ recv_buffer_[its_iteration_gap
+ + VSOMEIP_MESSAGE_TYPE_POS])) {
client_t its_client;
std::memcpy(&its_client,
- &recv_buffer_[VSOMEIP_CLIENT_POS_MIN],
+ &recv_buffer_[its_iteration_gap + VSOMEIP_CLIENT_POS_MIN],
sizeof(client_t));
session_t its_session;
std::memcpy(&its_session,
- &recv_buffer_[VSOMEIP_SESSION_POS_MIN],
+ &recv_buffer_[its_iteration_gap + VSOMEIP_SESSION_POS_MIN],
sizeof(session_t));
{
std::lock_guard<std::mutex> its_lock(stop_mutex_);
if (socket_.is_open()) {
- server_->clients_[its_client][its_session] =
- socket_.remote_endpoint();
+ server_->clients_mutex_.lock();
+ boost::system::error_code its_error;
+ endpoint_type its_endpoint(socket_.remote_endpoint(its_error));
+ if (!its_error) {
+ server_->clients_[its_client][its_session] = its_endpoint;
+ }
+ server_->clients_mutex_.unlock();
server_->current_ = this;
}
}
@@ -290,10 +349,22 @@ void tcp_server_endpoint_impl::connection::receive_cbk(
}
}
} else if (current_message_size > max_message_size_) {
- VSOMEIP_ERROR << "Message exceeds maximum message size ("
- << std::dec << current_message_size
- << "). Resetting receiver.";
- recv_buffer_size_ = 0;
+ if (server_->has_enabled_magic_cookies_) {
+ VSOMEIP_ERROR << "Received a TCP message which exceeds "
+ << "maximum message size ("
+ << std::dec << current_message_size
+ << "). Magic Cookies are enabled: "
+ << "Resetting receiver.";
+ recv_buffer_size_ = 0;
+ } else {
+ VSOMEIP_ERROR << "Received a TCP message which exceeds "
+ << "maximum message size ("
+ << std::dec << current_message_size
+ << ") Magic cookies are disabled: "
+ << "Connection will be disabled!";
+ recv_buffer_size_ = 0;
+ return;
+ }
}
} while (has_full_message && recv_buffer_size_);
if (its_iteration_gap) {
@@ -317,6 +388,7 @@ client_t tcp_server_endpoint_impl::get_client(std::shared_ptr<endpoint_definitio
}
client_t tcp_server_endpoint_impl::connection::get_client(endpoint_type _endpoint_type) {
+ std::lock_guard<std::mutex> its_lock(server_->clients_mutex_);
for (auto its_client : server_->clients_) {
for (auto its_session : server_->clients_[its_client.first]) {
auto endpoint = its_session.second;