diff options
author | Juergen Gehring <juergen.gehring@bmw.de> | 2018-01-25 00:40:05 -0800 |
---|---|---|
committer | Juergen Gehring <juergen.gehring@bmw.de> | 2018-01-25 00:40:05 -0800 |
commit | 79fd5f7a34ed33392f71fa914a60b2e68b28de68 (patch) | |
tree | 5ea93513d0173ffe6dea57545cc5b28db591f082 /implementation/endpoints | |
parent | 5c43d511bd5b5e15eca521c4c71dfa69c6f1c90f (diff) | |
download | vSomeIP-79fd5f7a34ed33392f71fa914a60b2e68b28de68.tar.gz |
vsomeip 2.10.02.10.0
Diffstat (limited to 'implementation/endpoints')
12 files changed, 136 insertions, 63 deletions
diff --git a/implementation/endpoints/include/client_endpoint_impl.hpp b/implementation/endpoints/include/client_endpoint_impl.hpp index 57a3084..b656f8b 100644 --- a/implementation/endpoints/include/client_endpoint_impl.hpp +++ b/implementation/endpoints/include/client_endpoint_impl.hpp @@ -45,7 +45,7 @@ public: const byte_t *_data, uint32_t _size, bool _flush = true);
bool flush();
- void stop();
+ virtual void stop();
virtual void restart() = 0;
bool is_client() const;
diff --git a/implementation/endpoints/include/local_client_endpoint_impl.hpp b/implementation/endpoints/include/local_client_endpoint_impl.hpp index eb1b310..7ff96ff 100644 --- a/implementation/endpoints/include/local_client_endpoint_impl.hpp +++ b/implementation/endpoints/include/local_client_endpoint_impl.hpp @@ -39,6 +39,7 @@ public: virtual ~local_client_endpoint_impl(); void start(); + void stop(); bool is_local() const; diff --git a/implementation/endpoints/include/local_server_endpoint_impl.hpp b/implementation/endpoints/include/local_server_endpoint_impl.hpp index f9fa0ab..33658fb 100644 --- a/implementation/endpoints/include/local_server_endpoint_impl.hpp +++ b/implementation/endpoints/include/local_server_endpoint_impl.hpp @@ -60,7 +60,7 @@ public: bool send_to(const std::shared_ptr<endpoint_definition>, const byte_t *_data, uint32_t _size, bool _flush); - void send_queued(queue_iterator_type _queue_iterator); + void send_queued(const queue_iterator_type _queue_iterator); bool get_default_target(service_t, endpoint_type &) const; @@ -84,7 +84,7 @@ private: void start(); void stop(); - void send_queued(queue_iterator_type _queue_iterator); + void send_queued(const queue_iterator_type _queue_iterator); void set_bound_client(client_t _client); @@ -95,7 +95,6 @@ private: std::uint32_t _buffer_shrink_threshold, boost::asio::io_service &_io_service); - void send_magic_cookie(); void receive_cbk(boost::system::error_code const &_error, std::size_t _bytes); void calculate_shrink_count(); diff --git a/implementation/endpoints/include/server_endpoint_impl.hpp b/implementation/endpoints/include/server_endpoint_impl.hpp index 710df35..ce89c6b 100644 --- a/implementation/endpoints/include/server_endpoint_impl.hpp +++ b/implementation/endpoints/include/server_endpoint_impl.hpp @@ -46,7 +46,7 @@ public: public:
void connect_cbk(boost::system::error_code const &_error);
- void send_cbk(queue_iterator_type _queue_iterator,
+ void send_cbk(const queue_iterator_type _queue_iterator,
boost::system::error_code const &_error, std::size_t _bytes);
void flush_cbk(endpoint_type _target,
const boost::system::error_code &_error);
@@ -54,7 +54,7 @@ public: public:
virtual bool send_intern(endpoint_type _target, const byte_t *_data,
uint32_t _port, bool _flush);
- virtual void send_queued(queue_iterator_type _queue_iterator) = 0;
+ virtual void send_queued(const queue_iterator_type _queue_iterator) = 0;
virtual bool get_default_target(service_t _service,
endpoint_type &_target) const = 0;
diff --git a/implementation/endpoints/include/tcp_server_endpoint_impl.hpp b/implementation/endpoints/include/tcp_server_endpoint_impl.hpp index fd725d2..09c2ae5 100644 --- a/implementation/endpoints/include/tcp_server_endpoint_impl.hpp +++ b/implementation/endpoints/include/tcp_server_endpoint_impl.hpp @@ -38,7 +38,7 @@ public: bool send_to(const std::shared_ptr<endpoint_definition> _target,
const byte_t *_data, uint32_t _size, bool _flush);
- void send_queued(queue_iterator_type _queue_iterator);
+ void send_queued(const queue_iterator_type _queue_iterator);
VSOMEIP_EXPORT bool is_established(std::shared_ptr<endpoint_definition> _endpoint);
@@ -72,7 +72,7 @@ private: void stop();
void receive();
- void send_queued(queue_iterator_type _queue_iterator);
+ void send_queued(const queue_iterator_type _queue_iterator);
void set_remote_info(const endpoint_type &_remote);
diff --git a/implementation/endpoints/include/udp_server_endpoint_impl.hpp b/implementation/endpoints/include/udp_server_endpoint_impl.hpp index 787a824..aae7a6b 100644 --- a/implementation/endpoints/include/udp_server_endpoint_impl.hpp +++ b/implementation/endpoints/include/udp_server_endpoint_impl.hpp @@ -35,7 +35,7 @@ public: bool send_to(const std::shared_ptr<endpoint_definition> _target,
const byte_t *_data, uint32_t _size, bool _flush);
- void send_queued(queue_iterator_type _queue_iterator);
+ void send_queued(const queue_iterator_type _queue_iterator);
void join(const std::string &_address);
void leave(const std::string &_address);
diff --git a/implementation/endpoints/src/client_endpoint_impl.cpp b/implementation/endpoints/src/client_endpoint_impl.cpp index 9744a22..3783add 100644 --- a/implementation/endpoints/src/client_endpoint_impl.cpp +++ b/implementation/endpoints/src/client_endpoint_impl.cpp @@ -60,6 +60,8 @@ void client_endpoint_impl<Protocol>::stop() { {
std::lock_guard<std::mutex> its_lock(mutex_);
endpoint_impl<Protocol>::sending_blocked_ = true;
+ // delete unsent messages
+ queue_.clear();
}
{
std::lock_guard<std::mutex> its_lock(connect_timer_mutex_);
@@ -67,28 +69,6 @@ void client_endpoint_impl<Protocol>::stop() { connect_timer_.cancel(ec);
}
connect_timeout_ = VSOMEIP_DEFAULT_CONNECT_TIMEOUT;
-
- bool is_open(false);
- {
- std::lock_guard<std::mutex> its_lock(socket_mutex_);
- is_open = socket_->is_open();
- }
- if (is_open) {
- bool send_queue_empty(false);
- std::uint32_t times_slept(0);
-
- while (times_slept <= 50) {
- mutex_.lock();
- send_queue_empty = (queue_.size() == 0);
- mutex_.unlock();
- if (send_queue_empty) {
- break;
- } else {
- std::this_thread::sleep_for(std::chrono::milliseconds(10));
- times_slept++;
- }
- }
- }
shutdown_and_close_socket();
}
@@ -253,6 +233,10 @@ void client_endpoint_impl<Protocol>::send_cbk( std::lock_guard<std::mutex> its_lock(mutex_);
if (endpoint_impl<Protocol>::sending_blocked_) {
queue_.clear();
+ } else {
+ VSOMEIP_WARNING << "cei::send_cbk received error: "
+ << _error.message() << " (" << std::dec
+ << _error.value() << ") " << std::dec << queue_.size();
}
}
shutdown_and_close_socket();
@@ -264,6 +248,9 @@ void client_endpoint_impl<Protocol>::send_cbk( } else if (_error == boost::asio::error::operation_aborted) {
// endpoint was stopped
shutdown_and_close_socket();
+ } else {
+ VSOMEIP_WARNING << "cei::send_cbk received error: " << _error.message()
+ << " (" << std::dec << _error.value() << ")" ;
}
}
diff --git a/implementation/endpoints/src/local_client_endpoint_impl.cpp b/implementation/endpoints/src/local_client_endpoint_impl.cpp index f6e1b5c..52970ca 100644 --- a/implementation/endpoints/src/local_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/local_client_endpoint_impl.cpp @@ -62,6 +62,42 @@ void local_client_endpoint_impl::start() { connect();
}
+void local_client_endpoint_impl::stop() {
+ {
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ sending_blocked_ = true;
+ }
+ {
+ std::lock_guard<std::mutex> its_lock(connect_timer_mutex_);
+ boost::system::error_code ec;
+ connect_timer_.cancel(ec);
+ }
+ connect_timeout_ = VSOMEIP_DEFAULT_CONNECT_TIMEOUT;
+
+ bool is_open(false);
+ {
+ std::lock_guard<std::mutex> its_lock(socket_mutex_);
+ is_open = socket_->is_open();
+ }
+ if (is_open) {
+ bool send_queue_empty(false);
+ std::uint32_t times_slept(0);
+
+ while (times_slept <= 50) {
+ mutex_.lock();
+ send_queue_empty = (queue_.size() == 0);
+ mutex_.unlock();
+ if (send_queue_empty) {
+ break;
+ } else {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ times_slept++;
+ }
+ }
+ }
+ shutdown_and_close_socket();
+}
+
void local_client_endpoint_impl::connect() {
boost::system::error_code its_connect_error;
{
@@ -87,6 +123,11 @@ void local_client_endpoint_impl::connect() { its_host->get_client());
}
}
+ } else {
+ VSOMEIP_WARNING << "local_client_endpoint::connect: Couldn't "
+ << "connect to: " << remote_.path() << " ("
+ << its_connect_error.message() << " / " << std::dec
+ << its_connect_error.value() << ")";
}
#endif
diff --git a/implementation/endpoints/src/local_server_endpoint_impl.cpp b/implementation/endpoints/src/local_server_endpoint_impl.cpp index 5a44f2f..460fe40 100644 --- a/implementation/endpoints/src/local_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/local_server_endpoint_impl.cpp @@ -136,11 +136,29 @@ bool local_server_endpoint_impl::send_to( } void local_server_endpoint_impl::send_queued( - queue_iterator_type _queue_iterator) { - std::lock_guard<std::mutex> its_lock(connections_mutex_); - auto connection_iterator = connections_.find(_queue_iterator->first); - if (connection_iterator != connections_.end()) - connection_iterator->second->send_queued(_queue_iterator); + const queue_iterator_type _queue_iterator) { + connection::ptr its_connection; + { + std::lock_guard<std::mutex> its_lock(connections_mutex_); + auto connection_iterator = connections_.find(_queue_iterator->first); + if (connection_iterator != connections_.end()) { + connection_iterator->second->send_queued(_queue_iterator); + } else { + VSOMEIP_INFO << "Didn't find connection: " +#ifdef _WIN32 + << _queue_iterator->first.address().to_string() << ":" << std::dec + << static_cast<std::uint16_t>(_queue_iterator->first.port()) +#else + << _queue_iterator->first.path() +#endif + << " dropping outstanding messages (" << std::dec + << _queue_iterator->second.size() << ")."; + _queue_iterator->second.clear(); + } + } + if (its_connection) { + its_connection->send_queued(_queue_iterator); + } } void local_server_endpoint_impl::receive() { @@ -155,13 +173,25 @@ bool local_server_endpoint_impl::get_default_target( void local_server_endpoint_impl::remove_connection( local_server_endpoint_impl::connection *_connection) { - std::lock_guard<std::mutex> its_lock(connections_mutex_); - for (auto it = connections_.begin(); it != connections_.end();) { - if (it->second.get() == _connection) { - it = connections_.erase(it); - break; - } else { - ++it; + endpoint_type its_target; + { + std::lock_guard<std::mutex> its_lock(connections_mutex_); + for (auto it = connections_.begin(); it != connections_.end();) { + if (it->second.get() == _connection) { + its_target = it->first; + it = connections_.erase(it); + break; + } else { + ++it; + } + } + } + { + // delete outstanding responses for this connection as well + std::lock_guard<std::mutex> its_lock(mutex_); + const auto found_target = queues_.find(its_target); + if (found_target != queues_.end()) { + found_target->second.clear(); } } } @@ -318,7 +348,7 @@ void local_server_endpoint_impl::connection::stop() { } void local_server_endpoint_impl::connection::send_queued( - queue_iterator_type _queue_iterator) { + const queue_iterator_type _queue_iterator) { // TODO: We currently do _not_ use the send method of the local server // endpoints. If we ever need it, we need to add the "start tag", "data", @@ -355,9 +385,6 @@ void local_server_endpoint_impl::connection::send_queued( } } -void local_server_endpoint_impl::connection::send_magic_cookie() { -} - void local_server_endpoint_impl::connection::receive_cbk( boost::system::error_code const &_error, std::size_t _bytes) { diff --git a/implementation/endpoints/src/server_endpoint_impl.cpp b/implementation/endpoints/src/server_endpoint_impl.cpp index 3d00ca4..39c6be2 100644 --- a/implementation/endpoints/src/server_endpoint_impl.cpp +++ b/implementation/endpoints/src/server_endpoint_impl.cpp @@ -199,16 +199,20 @@ void server_endpoint_impl<Protocol>::connect_cbk( template<typename Protocol>
void server_endpoint_impl<Protocol>::send_cbk(
- queue_iterator_type _queue_iterator, boost::system::error_code const &_error,
- std::size_t _bytes) {
+ const queue_iterator_type _queue_iterator,
+ boost::system::error_code const &_error, std::size_t _bytes) {
(void)_bytes;
+ std::lock_guard<std::mutex> its_lock(mutex_);
if (!_error) {
- std::lock_guard<std::mutex> its_lock(mutex_);
_queue_iterator->second.pop_front();
if (_queue_iterator->second.size() > 0) {
send_queued(_queue_iterator);
}
+ } else {
+ // error: sending of outstanding responses isn't started again
+ // delete remaining outstanding responses
+ _queue_iterator->second.clear();
}
}
diff --git a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp index f91f7f6..e39589a 100644 --- a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp @@ -95,7 +95,7 @@ bool tcp_server_endpoint_impl::send_to( return send_intern(its_target, _data, _size, _flush); } -void tcp_server_endpoint_impl::send_queued(queue_iterator_type _queue_iterator) { +void tcp_server_endpoint_impl::send_queued(const queue_iterator_type _queue_iterator) { connection::ptr its_connection; { std::lock_guard<std::mutex> its_lock(connections_mutex_); @@ -106,8 +106,9 @@ void tcp_server_endpoint_impl::send_queued(queue_iterator_type _queue_iterator) VSOMEIP_INFO << "Didn't find connection: " << _queue_iterator->first.address().to_string() << ":" << std::dec << static_cast<std::uint16_t>(_queue_iterator->first.port()) - << " dropping message."; - _queue_iterator->second.pop_front(); + << " dropping outstanding messages (" << std::dec + << _queue_iterator->second.size() << ")."; + _queue_iterator->second.clear(); } } if (its_connection) { @@ -139,13 +140,25 @@ bool tcp_server_endpoint_impl::get_default_target(service_t, void tcp_server_endpoint_impl::remove_connection( tcp_server_endpoint_impl::connection *_connection) { - std::lock_guard<std::mutex> its_lock(connections_mutex_); - for (auto it = connections_.begin(); it != connections_.end();) { - if (it->second.get() == _connection) { - it = connections_.erase(it); - break; - } else { - ++it; + endpoint_type its_target; + { + std::lock_guard<std::mutex> its_lock(connections_mutex_); + for (auto it = connections_.begin(); it != connections_.end();) { + if (it->second.get() == _connection) { + its_target = it->first; + it = connections_.erase(it); + break; + } else { + ++it; + } + } + } + { + // delete outstanding responses for this connection as well + std::lock_guard<std::mutex> its_lock(mutex_); + const auto found_target = queues_.find(its_target); + if (found_target != queues_.end()) { + found_target->second.clear(); } } } @@ -295,7 +308,7 @@ void tcp_server_endpoint_impl::connection::stop() { } void tcp_server_endpoint_impl::connection::send_queued( - queue_iterator_type _queue_iterator) { + const queue_iterator_type _queue_iterator) { std::shared_ptr<tcp_server_endpoint_impl> its_server(server_.lock()); if (!its_server) { VSOMEIP_TRACE << "tcp_server_endpoint_impl::connection::send_queued " @@ -318,7 +331,8 @@ void tcp_server_endpoint_impl::connection::send_queued( boost::asio::async_write(socket_, boost::asio::buffer(*its_buffer), std::bind(&tcp_server_endpoint_base_impl::send_cbk, its_server, - _queue_iterator, std::placeholders::_1, + _queue_iterator, + std::placeholders::_1, std::placeholders::_2)); } } diff --git a/implementation/endpoints/src/udp_server_endpoint_impl.cpp b/implementation/endpoints/src/udp_server_endpoint_impl.cpp index ce1a276..7de96b4 100644 --- a/implementation/endpoints/src/udp_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/udp_server_endpoint_impl.cpp @@ -120,7 +120,7 @@ bool udp_server_endpoint_impl::send_to( } void udp_server_endpoint_impl::send_queued( - queue_iterator_type _queue_iterator) { + const queue_iterator_type _queue_iterator) { message_buffer_ptr_t its_buffer = _queue_iterator->second.front(); #if 0 std::stringstream msg; |