summaryrefslogtreecommitdiff
path: root/implementation/endpoints
diff options
context:
space:
mode:
authorJuergen Gehring <juergen.gehring@bmw.de>2018-01-25 00:40:05 -0800
committerJuergen Gehring <juergen.gehring@bmw.de>2018-01-25 00:40:05 -0800
commit79fd5f7a34ed33392f71fa914a60b2e68b28de68 (patch)
tree5ea93513d0173ffe6dea57545cc5b28db591f082 /implementation/endpoints
parent5c43d511bd5b5e15eca521c4c71dfa69c6f1c90f (diff)
downloadvSomeIP-79fd5f7a34ed33392f71fa914a60b2e68b28de68.tar.gz
vsomeip 2.10.02.10.0
Diffstat (limited to 'implementation/endpoints')
-rw-r--r--implementation/endpoints/include/client_endpoint_impl.hpp2
-rw-r--r--implementation/endpoints/include/local_client_endpoint_impl.hpp1
-rw-r--r--implementation/endpoints/include/local_server_endpoint_impl.hpp5
-rw-r--r--implementation/endpoints/include/server_endpoint_impl.hpp4
-rw-r--r--implementation/endpoints/include/tcp_server_endpoint_impl.hpp4
-rw-r--r--implementation/endpoints/include/udp_server_endpoint_impl.hpp2
-rw-r--r--implementation/endpoints/src/client_endpoint_impl.cpp31
-rw-r--r--implementation/endpoints/src/local_client_endpoint_impl.cpp41
-rw-r--r--implementation/endpoints/src/local_server_endpoint_impl.cpp59
-rw-r--r--implementation/endpoints/src/server_endpoint_impl.cpp10
-rw-r--r--implementation/endpoints/src/tcp_server_endpoint_impl.cpp38
-rw-r--r--implementation/endpoints/src/udp_server_endpoint_impl.cpp2
12 files changed, 136 insertions, 63 deletions
diff --git a/implementation/endpoints/include/client_endpoint_impl.hpp b/implementation/endpoints/include/client_endpoint_impl.hpp
index 57a3084..b656f8b 100644
--- a/implementation/endpoints/include/client_endpoint_impl.hpp
+++ b/implementation/endpoints/include/client_endpoint_impl.hpp
@@ -45,7 +45,7 @@ public:
const byte_t *_data, uint32_t _size, bool _flush = true);
bool flush();
- void stop();
+ virtual void stop();
virtual void restart() = 0;
bool is_client() const;
diff --git a/implementation/endpoints/include/local_client_endpoint_impl.hpp b/implementation/endpoints/include/local_client_endpoint_impl.hpp
index eb1b310..7ff96ff 100644
--- a/implementation/endpoints/include/local_client_endpoint_impl.hpp
+++ b/implementation/endpoints/include/local_client_endpoint_impl.hpp
@@ -39,6 +39,7 @@ public:
virtual ~local_client_endpoint_impl();
void start();
+ void stop();
bool is_local() const;
diff --git a/implementation/endpoints/include/local_server_endpoint_impl.hpp b/implementation/endpoints/include/local_server_endpoint_impl.hpp
index f9fa0ab..33658fb 100644
--- a/implementation/endpoints/include/local_server_endpoint_impl.hpp
+++ b/implementation/endpoints/include/local_server_endpoint_impl.hpp
@@ -60,7 +60,7 @@ public:
bool send_to(const std::shared_ptr<endpoint_definition>,
const byte_t *_data, uint32_t _size, bool _flush);
- void send_queued(queue_iterator_type _queue_iterator);
+ void send_queued(const queue_iterator_type _queue_iterator);
bool get_default_target(service_t, endpoint_type &) const;
@@ -84,7 +84,7 @@ private:
void start();
void stop();
- void send_queued(queue_iterator_type _queue_iterator);
+ void send_queued(const queue_iterator_type _queue_iterator);
void set_bound_client(client_t _client);
@@ -95,7 +95,6 @@ private:
std::uint32_t _buffer_shrink_threshold,
boost::asio::io_service &_io_service);
- void send_magic_cookie();
void receive_cbk(boost::system::error_code const &_error,
std::size_t _bytes);
void calculate_shrink_count();
diff --git a/implementation/endpoints/include/server_endpoint_impl.hpp b/implementation/endpoints/include/server_endpoint_impl.hpp
index 710df35..ce89c6b 100644
--- a/implementation/endpoints/include/server_endpoint_impl.hpp
+++ b/implementation/endpoints/include/server_endpoint_impl.hpp
@@ -46,7 +46,7 @@ public:
public:
void connect_cbk(boost::system::error_code const &_error);
- void send_cbk(queue_iterator_type _queue_iterator,
+ void send_cbk(const queue_iterator_type _queue_iterator,
boost::system::error_code const &_error, std::size_t _bytes);
void flush_cbk(endpoint_type _target,
const boost::system::error_code &_error);
@@ -54,7 +54,7 @@ public:
public:
virtual bool send_intern(endpoint_type _target, const byte_t *_data,
uint32_t _port, bool _flush);
- virtual void send_queued(queue_iterator_type _queue_iterator) = 0;
+ virtual void send_queued(const queue_iterator_type _queue_iterator) = 0;
virtual bool get_default_target(service_t _service,
endpoint_type &_target) const = 0;
diff --git a/implementation/endpoints/include/tcp_server_endpoint_impl.hpp b/implementation/endpoints/include/tcp_server_endpoint_impl.hpp
index fd725d2..09c2ae5 100644
--- a/implementation/endpoints/include/tcp_server_endpoint_impl.hpp
+++ b/implementation/endpoints/include/tcp_server_endpoint_impl.hpp
@@ -38,7 +38,7 @@ public:
bool send_to(const std::shared_ptr<endpoint_definition> _target,
const byte_t *_data, uint32_t _size, bool _flush);
- void send_queued(queue_iterator_type _queue_iterator);
+ void send_queued(const queue_iterator_type _queue_iterator);
VSOMEIP_EXPORT bool is_established(std::shared_ptr<endpoint_definition> _endpoint);
@@ -72,7 +72,7 @@ private:
void stop();
void receive();
- void send_queued(queue_iterator_type _queue_iterator);
+ void send_queued(const queue_iterator_type _queue_iterator);
void set_remote_info(const endpoint_type &_remote);
diff --git a/implementation/endpoints/include/udp_server_endpoint_impl.hpp b/implementation/endpoints/include/udp_server_endpoint_impl.hpp
index 787a824..aae7a6b 100644
--- a/implementation/endpoints/include/udp_server_endpoint_impl.hpp
+++ b/implementation/endpoints/include/udp_server_endpoint_impl.hpp
@@ -35,7 +35,7 @@ public:
bool send_to(const std::shared_ptr<endpoint_definition> _target,
const byte_t *_data, uint32_t _size, bool _flush);
- void send_queued(queue_iterator_type _queue_iterator);
+ void send_queued(const queue_iterator_type _queue_iterator);
void join(const std::string &_address);
void leave(const std::string &_address);
diff --git a/implementation/endpoints/src/client_endpoint_impl.cpp b/implementation/endpoints/src/client_endpoint_impl.cpp
index 9744a22..3783add 100644
--- a/implementation/endpoints/src/client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/client_endpoint_impl.cpp
@@ -60,6 +60,8 @@ void client_endpoint_impl<Protocol>::stop() {
{
std::lock_guard<std::mutex> its_lock(mutex_);
endpoint_impl<Protocol>::sending_blocked_ = true;
+ // delete unsent messages
+ queue_.clear();
}
{
std::lock_guard<std::mutex> its_lock(connect_timer_mutex_);
@@ -67,28 +69,6 @@ void client_endpoint_impl<Protocol>::stop() {
connect_timer_.cancel(ec);
}
connect_timeout_ = VSOMEIP_DEFAULT_CONNECT_TIMEOUT;
-
- bool is_open(false);
- {
- std::lock_guard<std::mutex> its_lock(socket_mutex_);
- is_open = socket_->is_open();
- }
- if (is_open) {
- bool send_queue_empty(false);
- std::uint32_t times_slept(0);
-
- while (times_slept <= 50) {
- mutex_.lock();
- send_queue_empty = (queue_.size() == 0);
- mutex_.unlock();
- if (send_queue_empty) {
- break;
- } else {
- std::this_thread::sleep_for(std::chrono::milliseconds(10));
- times_slept++;
- }
- }
- }
shutdown_and_close_socket();
}
@@ -253,6 +233,10 @@ void client_endpoint_impl<Protocol>::send_cbk(
std::lock_guard<std::mutex> its_lock(mutex_);
if (endpoint_impl<Protocol>::sending_blocked_) {
queue_.clear();
+ } else {
+ VSOMEIP_WARNING << "cei::send_cbk received error: "
+ << _error.message() << " (" << std::dec
+ << _error.value() << ") " << std::dec << queue_.size();
}
}
shutdown_and_close_socket();
@@ -264,6 +248,9 @@ void client_endpoint_impl<Protocol>::send_cbk(
} else if (_error == boost::asio::error::operation_aborted) {
// endpoint was stopped
shutdown_and_close_socket();
+ } else {
+ VSOMEIP_WARNING << "cei::send_cbk received error: " << _error.message()
+ << " (" << std::dec << _error.value() << ")" ;
}
}
diff --git a/implementation/endpoints/src/local_client_endpoint_impl.cpp b/implementation/endpoints/src/local_client_endpoint_impl.cpp
index f6e1b5c..52970ca 100644
--- a/implementation/endpoints/src/local_client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/local_client_endpoint_impl.cpp
@@ -62,6 +62,42 @@ void local_client_endpoint_impl::start() {
connect();
}
+void local_client_endpoint_impl::stop() {
+ {
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ sending_blocked_ = true;
+ }
+ {
+ std::lock_guard<std::mutex> its_lock(connect_timer_mutex_);
+ boost::system::error_code ec;
+ connect_timer_.cancel(ec);
+ }
+ connect_timeout_ = VSOMEIP_DEFAULT_CONNECT_TIMEOUT;
+
+ bool is_open(false);
+ {
+ std::lock_guard<std::mutex> its_lock(socket_mutex_);
+ is_open = socket_->is_open();
+ }
+ if (is_open) {
+ bool send_queue_empty(false);
+ std::uint32_t times_slept(0);
+
+ while (times_slept <= 50) {
+ mutex_.lock();
+ send_queue_empty = (queue_.size() == 0);
+ mutex_.unlock();
+ if (send_queue_empty) {
+ break;
+ } else {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ times_slept++;
+ }
+ }
+ }
+ shutdown_and_close_socket();
+}
+
void local_client_endpoint_impl::connect() {
boost::system::error_code its_connect_error;
{
@@ -87,6 +123,11 @@ void local_client_endpoint_impl::connect() {
its_host->get_client());
}
}
+ } else {
+ VSOMEIP_WARNING << "local_client_endpoint::connect: Couldn't "
+ << "connect to: " << remote_.path() << " ("
+ << its_connect_error.message() << " / " << std::dec
+ << its_connect_error.value() << ")";
}
#endif
diff --git a/implementation/endpoints/src/local_server_endpoint_impl.cpp b/implementation/endpoints/src/local_server_endpoint_impl.cpp
index 5a44f2f..460fe40 100644
--- a/implementation/endpoints/src/local_server_endpoint_impl.cpp
+++ b/implementation/endpoints/src/local_server_endpoint_impl.cpp
@@ -136,11 +136,29 @@ bool local_server_endpoint_impl::send_to(
}
void local_server_endpoint_impl::send_queued(
- queue_iterator_type _queue_iterator) {
- std::lock_guard<std::mutex> its_lock(connections_mutex_);
- auto connection_iterator = connections_.find(_queue_iterator->first);
- if (connection_iterator != connections_.end())
- connection_iterator->second->send_queued(_queue_iterator);
+ const queue_iterator_type _queue_iterator) {
+ connection::ptr its_connection;
+ {
+ std::lock_guard<std::mutex> its_lock(connections_mutex_);
+ auto connection_iterator = connections_.find(_queue_iterator->first);
+ if (connection_iterator != connections_.end()) {
+ connection_iterator->second->send_queued(_queue_iterator);
+ } else {
+ VSOMEIP_INFO << "Didn't find connection: "
+#ifdef _WIN32
+ << _queue_iterator->first.address().to_string() << ":" << std::dec
+ << static_cast<std::uint16_t>(_queue_iterator->first.port())
+#else
+ << _queue_iterator->first.path()
+#endif
+ << " dropping outstanding messages (" << std::dec
+ << _queue_iterator->second.size() << ").";
+ _queue_iterator->second.clear();
+ }
+ }
+ if (its_connection) {
+ its_connection->send_queued(_queue_iterator);
+ }
}
void local_server_endpoint_impl::receive() {
@@ -155,13 +173,25 @@ bool local_server_endpoint_impl::get_default_target(
void local_server_endpoint_impl::remove_connection(
local_server_endpoint_impl::connection *_connection) {
- std::lock_guard<std::mutex> its_lock(connections_mutex_);
- for (auto it = connections_.begin(); it != connections_.end();) {
- if (it->second.get() == _connection) {
- it = connections_.erase(it);
- break;
- } else {
- ++it;
+ endpoint_type its_target;
+ {
+ std::lock_guard<std::mutex> its_lock(connections_mutex_);
+ for (auto it = connections_.begin(); it != connections_.end();) {
+ if (it->second.get() == _connection) {
+ its_target = it->first;
+ it = connections_.erase(it);
+ break;
+ } else {
+ ++it;
+ }
+ }
+ }
+ {
+ // delete outstanding responses for this connection as well
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ const auto found_target = queues_.find(its_target);
+ if (found_target != queues_.end()) {
+ found_target->second.clear();
}
}
}
@@ -318,7 +348,7 @@ void local_server_endpoint_impl::connection::stop() {
}
void local_server_endpoint_impl::connection::send_queued(
- queue_iterator_type _queue_iterator) {
+ const queue_iterator_type _queue_iterator) {
// TODO: We currently do _not_ use the send method of the local server
// endpoints. If we ever need it, we need to add the "start tag", "data",
@@ -355,9 +385,6 @@ void local_server_endpoint_impl::connection::send_queued(
}
}
-void local_server_endpoint_impl::connection::send_magic_cookie() {
-}
-
void local_server_endpoint_impl::connection::receive_cbk(
boost::system::error_code const &_error, std::size_t _bytes) {
diff --git a/implementation/endpoints/src/server_endpoint_impl.cpp b/implementation/endpoints/src/server_endpoint_impl.cpp
index 3d00ca4..39c6be2 100644
--- a/implementation/endpoints/src/server_endpoint_impl.cpp
+++ b/implementation/endpoints/src/server_endpoint_impl.cpp
@@ -199,16 +199,20 @@ void server_endpoint_impl<Protocol>::connect_cbk(
template<typename Protocol>
void server_endpoint_impl<Protocol>::send_cbk(
- queue_iterator_type _queue_iterator, boost::system::error_code const &_error,
- std::size_t _bytes) {
+ const queue_iterator_type _queue_iterator,
+ boost::system::error_code const &_error, std::size_t _bytes) {
(void)_bytes;
+ std::lock_guard<std::mutex> its_lock(mutex_);
if (!_error) {
- std::lock_guard<std::mutex> its_lock(mutex_);
_queue_iterator->second.pop_front();
if (_queue_iterator->second.size() > 0) {
send_queued(_queue_iterator);
}
+ } else {
+ // error: sending of outstanding responses isn't started again
+ // delete remaining outstanding responses
+ _queue_iterator->second.clear();
}
}
diff --git a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp
index f91f7f6..e39589a 100644
--- a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp
+++ b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp
@@ -95,7 +95,7 @@ bool tcp_server_endpoint_impl::send_to(
return send_intern(its_target, _data, _size, _flush);
}
-void tcp_server_endpoint_impl::send_queued(queue_iterator_type _queue_iterator) {
+void tcp_server_endpoint_impl::send_queued(const queue_iterator_type _queue_iterator) {
connection::ptr its_connection;
{
std::lock_guard<std::mutex> its_lock(connections_mutex_);
@@ -106,8 +106,9 @@ void tcp_server_endpoint_impl::send_queued(queue_iterator_type _queue_iterator)
VSOMEIP_INFO << "Didn't find connection: "
<< _queue_iterator->first.address().to_string() << ":" << std::dec
<< static_cast<std::uint16_t>(_queue_iterator->first.port())
- << " dropping message.";
- _queue_iterator->second.pop_front();
+ << " dropping outstanding messages (" << std::dec
+ << _queue_iterator->second.size() << ").";
+ _queue_iterator->second.clear();
}
}
if (its_connection) {
@@ -139,13 +140,25 @@ bool tcp_server_endpoint_impl::get_default_target(service_t,
void tcp_server_endpoint_impl::remove_connection(
tcp_server_endpoint_impl::connection *_connection) {
- std::lock_guard<std::mutex> its_lock(connections_mutex_);
- for (auto it = connections_.begin(); it != connections_.end();) {
- if (it->second.get() == _connection) {
- it = connections_.erase(it);
- break;
- } else {
- ++it;
+ endpoint_type its_target;
+ {
+ std::lock_guard<std::mutex> its_lock(connections_mutex_);
+ for (auto it = connections_.begin(); it != connections_.end();) {
+ if (it->second.get() == _connection) {
+ its_target = it->first;
+ it = connections_.erase(it);
+ break;
+ } else {
+ ++it;
+ }
+ }
+ }
+ {
+ // delete outstanding responses for this connection as well
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ const auto found_target = queues_.find(its_target);
+ if (found_target != queues_.end()) {
+ found_target->second.clear();
}
}
}
@@ -295,7 +308,7 @@ void tcp_server_endpoint_impl::connection::stop() {
}
void tcp_server_endpoint_impl::connection::send_queued(
- queue_iterator_type _queue_iterator) {
+ const queue_iterator_type _queue_iterator) {
std::shared_ptr<tcp_server_endpoint_impl> its_server(server_.lock());
if (!its_server) {
VSOMEIP_TRACE << "tcp_server_endpoint_impl::connection::send_queued "
@@ -318,7 +331,8 @@ void tcp_server_endpoint_impl::connection::send_queued(
boost::asio::async_write(socket_, boost::asio::buffer(*its_buffer),
std::bind(&tcp_server_endpoint_base_impl::send_cbk,
its_server,
- _queue_iterator, std::placeholders::_1,
+ _queue_iterator,
+ std::placeholders::_1,
std::placeholders::_2));
}
}
diff --git a/implementation/endpoints/src/udp_server_endpoint_impl.cpp b/implementation/endpoints/src/udp_server_endpoint_impl.cpp
index ce1a276..7de96b4 100644
--- a/implementation/endpoints/src/udp_server_endpoint_impl.cpp
+++ b/implementation/endpoints/src/udp_server_endpoint_impl.cpp
@@ -120,7 +120,7 @@ bool udp_server_endpoint_impl::send_to(
}
void udp_server_endpoint_impl::send_queued(
- queue_iterator_type _queue_iterator) {
+ const queue_iterator_type _queue_iterator) {
message_buffer_ptr_t its_buffer = _queue_iterator->second.front();
#if 0
std::stringstream msg;