summaryrefslogtreecommitdiff
path: root/implementation/endpoints/src/server_endpoint_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/endpoints/src/server_endpoint_impl.cpp')
-rw-r--r--implementation/endpoints/src/server_endpoint_impl.cpp64
1 files changed, 54 insertions, 10 deletions
diff --git a/implementation/endpoints/src/server_endpoint_impl.cpp b/implementation/endpoints/src/server_endpoint_impl.cpp
index 8ffbfdb..17e26df 100644
--- a/implementation/endpoints/src/server_endpoint_impl.cpp
+++ b/implementation/endpoints/src/server_endpoint_impl.cpp
@@ -25,8 +25,10 @@ namespace vsomeip {
template<typename Protocol>
server_endpoint_impl<Protocol>::server_endpoint_impl(
std::shared_ptr<endpoint_host> _host, endpoint_type _local,
- boost::asio::io_service &_io, std::uint32_t _max_message_size)
- : endpoint_impl<Protocol>(_host, _local, _io, _max_message_size),
+ boost::asio::io_service &_io, std::uint32_t _max_message_size,
+ configuration::endpoint_queue_limit_t _queue_limit)
+ : endpoint_impl<Protocol>(_host, _local, _io, _max_message_size,
+ _queue_limit),
flush_timer_(_io) {
}
@@ -140,25 +142,65 @@ bool server_endpoint_impl<Protocol>::send_intern(
target_queue_iterator = queues_.insert(queues_.begin(),
std::make_pair(
_target,
- std::deque<message_buffer_ptr_t>()
+ std::make_pair(std::size_t(0),
+ std::deque<message_buffer_ptr_t>())
));
}
// TODO compare against value from configuration here
- const bool queue_size_zero_on_entry(target_queue_iterator->second.empty());
+ const bool queue_size_zero_on_entry(target_queue_iterator->second.second.empty());
if (target_packetizer->size() + _size
> endpoint_impl<Protocol>::max_message_size_
&& !target_packetizer->empty()) {
- target_queue_iterator->second.push_back(target_packetizer);
+ target_queue_iterator->second.second.push_back(target_packetizer);
+ target_queue_iterator->second.first += target_packetizer->size();
target_packetizer = std::make_shared<message_buffer_t>();
packetizer_[_target] = target_packetizer;
}
+ if (endpoint_impl<Protocol>::queue_limit_ != QUEUE_SIZE_UNLIMITED
+ && target_queue_iterator->second.first + _size >
+ endpoint_impl<Protocol>::queue_limit_) {
+ service_t its_service(0);
+ method_t its_method(0);
+ client_t its_client(0);
+ session_t its_session(0);
+ if (_size >= VSOMEIP_SESSION_POS_MAX) {
+ // this will yield wrong IDs for local communication as the commands
+ // are prepended to the actual payload
+ // it will print:
+ // (lowbyte service ID + highbyte methoid)
+ // [(Command + lowerbyte sender's client ID).
+ // highbyte sender's client ID + lowbyte command size.
+ // lowbyte methodid + highbyte vsomeipd length]
+ its_service = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SERVICE_POS_MIN],
+ _data[VSOMEIP_SERVICE_POS_MAX]);
+ its_method = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN],
+ _data[VSOMEIP_METHOD_POS_MAX]);
+ its_client = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_CLIENT_POS_MIN],
+ _data[VSOMEIP_CLIENT_POS_MAX]);
+ its_session = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SESSION_POS_MIN],
+ _data[VSOMEIP_SESSION_POS_MAX]);
+ }
+ VSOMEIP_ERROR << "sei::send_intern: queue size limit (" << std::dec
+ << endpoint_impl<Protocol>::queue_limit_
+ << ") reached. Dropping message ("
+ << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
+ << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
+ << std::hex << std::setw(4) << std::setfill('0') << its_session << "]"
+ << " queue_size: " << std::dec << target_queue_iterator->second.first
+ << " data size: " << std::dec << _size;
+ return false;
+ }
+
+
target_packetizer->insert(target_packetizer->end(), _data, _data + _size);
if (_flush) {
flush_timer_.cancel();
- target_queue_iterator->second.push_back(target_packetizer);
+ target_queue_iterator->second.second.push_back(target_packetizer);
+ target_queue_iterator->second.first += target_packetizer->size();
packetizer_[_target] = std::make_shared<message_buffer_t>();
} else {
std::chrono::milliseconds flush_timeout(VSOMEIP_DEFAULT_FLUSH_TIMEOUT);
@@ -170,7 +212,7 @@ bool server_endpoint_impl<Protocol>::send_intern(
std::placeholders::_1));
}
- if (queue_size_zero_on_entry && !target_queue_iterator->second.empty()) { // no writing in progress
+ if (queue_size_zero_on_entry && !target_queue_iterator->second.second.empty()) { // no writing in progress
send_queued(target_queue_iterator);
}
@@ -183,7 +225,7 @@ bool server_endpoint_impl<Protocol>::flush(
bool is_flushed = false;
std::lock_guard<std::mutex> its_lock(mutex_);
auto queue_iterator = queues_.find(_target);
- if (queue_iterator != queues_.end() && !queue_iterator->second.empty()) {
+ if (queue_iterator != queues_.end() && !queue_iterator->second.second.empty()) {
send_queued(queue_iterator);
is_flushed = true;
}
@@ -205,8 +247,10 @@ void server_endpoint_impl<Protocol>::send_cbk(
std::lock_guard<std::mutex> its_lock(mutex_);
if (!_error) {
- _queue_iterator->second.pop_front();
- if (_queue_iterator->second.size() > 0) {
+ _queue_iterator->second.first -=
+ _queue_iterator->second.second.front()->size();
+ _queue_iterator->second.second.pop_front();
+ if (_queue_iterator->second.second.size() > 0) {
send_queued(_queue_iterator);
}
} else {