summaryrefslogtreecommitdiff
path: root/test/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'test/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp')
-rw-r--r--test/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp1305
1 files changed, 692 insertions, 613 deletions
diff --git a/test/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp b/test/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp
index 9668c66..6d597cb 100644
--- a/test/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp
+++ b/test/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp
@@ -51,6 +51,12 @@ protected:
std::thread io_thread_;
};
+/*
+ * @test Send 16 subscriptions to the service and check that every
+ * subscription is answered with an SubscribeEventgroupAck entry by the service.
+ * Check that the subscription is active at the end of the test and check that
+ * the notifications send by the service receive the client
+ */
TEST_F(pending_subscription, send_multiple_subscriptions)
{
std::promise<bool> trigger_notifications;
@@ -61,90 +67,88 @@ TEST_F(pending_subscription, send_multiple_subscriptions)
udp_socket.set_option(boost::asio::socket_base::linger(true, 0));
std::thread receive_thread([&](){
- std::atomic<bool> keep_receiving(true);
- std::function<void()> receive;
+ bool keep_receiving(true);
std::vector<std::uint8_t> receive_buffer(4096);
std::vector<vsomeip::event_t> its_received_events;
+ std::uint32_t subscribe_acks_receiveid = 0;
+ std::uint32_t events_received = 0;
- const std::function<void(const boost::system::error_code&, std::size_t)> receive_cbk = [&](
- const boost::system::error_code& error, std::size_t bytes_transferred) {
+ while (keep_receiving) {
+ boost::system::error_code error;
+ std::size_t bytes_transferred = udp_socket.receive(
+ boost::asio::buffer(receive_buffer, receive_buffer.capacity()), 0, error);
if (error) {
keep_receiving = false;
ADD_FAILURE() << __func__ << " error: " << error.message();
- return;
- }
- #if 0
- std::stringstream str;
- for (size_t i = 0; i < bytes_transferred; i++) {
- str << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(receive_buffer[i]) << " ";
- }
- std::cout << __func__ << " received: " << std::dec << bytes_transferred << " bytes: " << str.str() << std::endl;
- #endif
-
- vsomeip::deserializer its_deserializer(&receive_buffer[0], bytes_transferred, 0);
- vsomeip::service_t its_service = VSOMEIP_BYTES_TO_WORD(receive_buffer[VSOMEIP_SERVICE_POS_MIN],
- receive_buffer[VSOMEIP_SERVICE_POS_MAX]);
- vsomeip::method_t its_method = VSOMEIP_BYTES_TO_WORD(receive_buffer[VSOMEIP_METHOD_POS_MIN],
- receive_buffer[VSOMEIP_METHOD_POS_MAX]);
- if (its_service == vsomeip::sd::service && its_method == vsomeip::sd::method) {
- vsomeip::sd::message_impl sd_msg;
- EXPECT_TRUE(sd_msg.deserialize(&its_deserializer));
- EXPECT_EQ(2u, sd_msg.get_entries().size());
- for (auto e : sd_msg.get_entries()) {
- EXPECT_TRUE(e->is_eventgroup_entry());
- EXPECT_EQ(vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK, e->get_type());
- EXPECT_EQ(3u, e->get_ttl());
- EXPECT_EQ(pending_subscription_test::service.service_id, e->get_service());
- EXPECT_EQ(pending_subscription_test::service.instance_id, e->get_instance());
- if (e->get_type() == vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK) {
- std::shared_ptr<vsomeip::sd::eventgroupentry_impl> its_casted_entry =
- std::static_pointer_cast<vsomeip::sd::eventgroupentry_impl>(e);
- EXPECT_TRUE(its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id ||
- its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id+1);
+ } else {
+
+ std::uint32_t its_pos = 0;
+
+ while (bytes_transferred > 0) {
+ const std::uint32_t its_message_size = VSOMEIP_BYTES_TO_LONG(
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 1],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 2],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 3]) + VSOMEIP_SOMEIP_HEADER_SIZE;
+ vsomeip::deserializer its_deserializer(&receive_buffer[its_pos], its_message_size, 0);
+
+ vsomeip::service_t its_service = VSOMEIP_BYTES_TO_WORD(receive_buffer[its_pos + VSOMEIP_SERVICE_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_SERVICE_POS_MAX]);
+ vsomeip::method_t its_method = VSOMEIP_BYTES_TO_WORD(receive_buffer[its_pos + VSOMEIP_METHOD_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_METHOD_POS_MAX]);
+ its_pos += its_message_size;
+ bytes_transferred -= its_message_size;
+
+ if (its_service == vsomeip::sd::service && its_method == vsomeip::sd::method) {
+ vsomeip::sd::message_impl sd_msg;
+ EXPECT_TRUE(sd_msg.deserialize(&its_deserializer));
+ EXPECT_EQ(2u, sd_msg.get_entries().size());
+ for (const auto& e : sd_msg.get_entries()) {
+ EXPECT_TRUE(e->is_eventgroup_entry());
+ EXPECT_EQ(vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK, e->get_type());
+ EXPECT_EQ(3u, e->get_ttl());
+ EXPECT_EQ(pending_subscription_test::service.service_id, e->get_service());
+ EXPECT_EQ(pending_subscription_test::service.instance_id, e->get_instance());
+ if (e->get_type() == vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK) {
+ std::shared_ptr<vsomeip::sd::eventgroupentry_impl> its_casted_entry =
+ std::static_pointer_cast<vsomeip::sd::eventgroupentry_impl>(e);
+ EXPECT_TRUE(its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id ||
+ its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id+1);
+ subscribe_acks_receiveid++;
+ }
+ }
+ EXPECT_EQ(0u, sd_msg.get_options().size());
+ } else { // non-sd-message
+ vsomeip::message_impl msg;
+ EXPECT_TRUE(msg.deserialize(&its_deserializer));
+ if (msg.get_message_type() == vsomeip::message_type_e::MT_RESPONSE) {
+ EXPECT_EQ(vsomeip::message_type_e::MT_RESPONSE, msg.get_message_type());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(pending_subscription_test::service.shutdown_method_id, msg.get_method());
+ EXPECT_EQ(0x2222, msg.get_client());
+ } else if (msg.get_message_type() == vsomeip::message_type_e::MT_NOTIFICATION) {
+ its_received_events.push_back(msg.get_method());
+ if (its_received_events.size() == 2) {
+ EXPECT_EQ(pending_subscription_test::service.event_id, its_received_events[0]);
+ EXPECT_EQ(static_cast<vsomeip::event_t>(pending_subscription_test::service.event_id + 1u), its_received_events[1]);
+ events_received = 2;
+ }
+ EXPECT_EQ(1u, msg.get_payload()->get_length());
+ EXPECT_EQ(0xDD, *msg.get_payload()->get_data());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(0x0, msg.get_client());
+ }
}
}
- EXPECT_EQ(0u, sd_msg.get_options().size());
- } else { // non-sd-message
- vsomeip::message_impl msg;
- EXPECT_TRUE(msg.deserialize(&its_deserializer));
- if (msg.get_message_type() == vsomeip::message_type_e::MT_RESPONSE) {
- EXPECT_EQ(vsomeip::message_type_e::MT_RESPONSE, msg.get_message_type());
- EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
- EXPECT_EQ(pending_subscription_test::service.shutdown_method_id, msg.get_method());
- EXPECT_EQ(0x2222, msg.get_client());
- } else if (msg.get_message_type() == vsomeip::message_type_e::MT_NOTIFICATION) {
- its_received_events.push_back(msg.get_method());
- if (its_received_events.size() == 2) {
- EXPECT_EQ(pending_subscription_test::service.event_id, its_received_events[0]);
- EXPECT_EQ(static_cast<vsomeip::event_t>(pending_subscription_test::service.event_id + 1u), its_received_events[1]);
- }
- EXPECT_EQ(1u, msg.get_payload()->get_length());
- EXPECT_EQ(0xDD, *msg.get_payload()->get_data());
- EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
- EXPECT_EQ(0x0, msg.get_client());
+ if (subscribe_acks_receiveid == 30) { // all subscribeAcks received
+ trigger_notifications.set_value(true);
+ subscribe_acks_receiveid++; // don't set promise value again
+ }
+ if (its_received_events.size() == 2 && events_received == 2) {
+ // all events received as well
+ keep_receiving = false;
}
}
-
- static int called = 0;
- if (++called == 15) { // all subscribeAcks received
- trigger_notifications.set_value(true);
- }
- if (called == 18) { // events were received as well
- keep_receiving = false;
- }
- if (!error && keep_receiving) {
- receive();
- }
- };
-
- receive = [&]() {
- udp_socket.async_receive(boost::asio::buffer(receive_buffer, receive_buffer.capacity()),
- receive_cbk);
- };
-
- receive();
- while(keep_receiving) {
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
});
@@ -221,6 +225,13 @@ TEST_F(pending_subscription, send_multiple_subscriptions)
udp_socket.close(ec);
}
+/*
+ * @test Send 16 subscriptions to the service while alternating between Subscribe
+ * and Unsubscribe and check that every SubscribeEventgroupEntry (ttl > 0)
+ * is answered with an SubscribeEventgroupAck entry by the service.
+ * Check that the subscription is active at the end of the test and check that
+ * the notifications send by the service receive the client
+ */
TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe)
{
std::promise<bool> trigger_notifications;
@@ -242,97 +253,101 @@ TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe)
bool triggered_notifications(false);
- std::function<void()> receive;
std::vector<std::uint8_t> receive_buffer(4096);
std::vector<vsomeip::event_t> its_received_events;
- const std::function<void(const boost::system::error_code&, std::size_t)> receive_cbk = [&](
- const boost::system::error_code& error, std::size_t bytes_transferred) {
+ bool keep_receiving(true);
+
+ while (keep_receiving) {
+ boost::system::error_code error;
+ std::size_t bytes_transferred = udp_socket.receive(
+ boost::asio::buffer(receive_buffer, receive_buffer.capacity()), 0, error);
if (error) {
+ keep_receiving = false;
acks_received = expected_acks;
responses_received = expected_responses;
ADD_FAILURE() << __func__ << " error: " << error.message();
- return;
- }
- #if 0
- std::stringstream str;
- for (size_t i = 0; i < bytes_transferred; i++) {
- str << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(receive_buffer[i]) << " ";
- }
- std::cout << __func__ << " received: " << std::dec << bytes_transferred << " bytes: " << str.str() << std::endl;
- #endif
-
- vsomeip::deserializer its_deserializer(&receive_buffer[0], bytes_transferred, 0);
- vsomeip::service_t its_service = VSOMEIP_BYTES_TO_WORD(receive_buffer[VSOMEIP_SERVICE_POS_MIN],
- receive_buffer[VSOMEIP_SERVICE_POS_MAX]);
- vsomeip::method_t its_method = VSOMEIP_BYTES_TO_WORD(receive_buffer[VSOMEIP_METHOD_POS_MIN],
- receive_buffer[VSOMEIP_METHOD_POS_MAX]);
- if (its_service == vsomeip::sd::service && its_method == vsomeip::sd::method) {
- vsomeip::sd::message_impl sd_msg;
- EXPECT_TRUE(sd_msg.deserialize(&its_deserializer));
- EXPECT_EQ(2u, sd_msg.get_entries().size());
- for (auto e : sd_msg.get_entries()) {
- EXPECT_TRUE(e->is_eventgroup_entry());
- EXPECT_EQ(vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK, e->get_type());
- EXPECT_EQ(16u, e->get_ttl());
- EXPECT_EQ(pending_subscription_test::service.service_id, e->get_service());
- EXPECT_EQ(pending_subscription_test::service.instance_id, e->get_instance());
- if (e->get_type() == vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK) {
- std::shared_ptr<vsomeip::sd::eventgroupentry_impl> its_casted_entry =
- std::static_pointer_cast<vsomeip::sd::eventgroupentry_impl>(e);
- EXPECT_TRUE(its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id ||
- its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id+1);
- }
+ } else {
+ #if 0
+ std::stringstream str;
+ for (size_t i = 0; i < bytes_transferred; i++) {
+ str << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(receive_buffer[i]) << " ";
}
- EXPECT_EQ(0u, sd_msg.get_options().size());
- acks_received++;
- } else { // non-sd-message
- vsomeip::message_impl msg;
- EXPECT_TRUE(msg.deserialize(&its_deserializer));
- if (msg.get_message_type() == vsomeip::message_type_e::MT_RESPONSE) {
- EXPECT_EQ(vsomeip::message_type_e::MT_RESPONSE, msg.get_message_type());
- EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
- EXPECT_EQ(pending_subscription_test::service.shutdown_method_id, msg.get_method());
- EXPECT_EQ(0x2222, msg.get_client());
- responses_received++;
- } else if (msg.get_message_type() == vsomeip::message_type_e::MT_NOTIFICATION) {
- its_received_events.push_back(msg.get_method());
- if (its_received_events.size() == 2) {
- EXPECT_EQ(pending_subscription_test::service.event_id, its_received_events[0]);
- EXPECT_EQ(static_cast<vsomeip::event_t>(pending_subscription_test::service.event_id + 1u), its_received_events[1]);
+ std::cout << __func__ << " received: " << std::dec << bytes_transferred << " bytes: " << str.str() << std::endl;
+ #endif
+ std::uint32_t its_pos = 0;
+
+ while (bytes_transferred > 0) {
+ const std::uint32_t its_message_size = VSOMEIP_BYTES_TO_LONG(
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 1],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 2],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 3]) + VSOMEIP_SOMEIP_HEADER_SIZE;
+ vsomeip::deserializer its_deserializer(&receive_buffer[its_pos], its_message_size, 0);
+
+ vsomeip::service_t its_service = VSOMEIP_BYTES_TO_WORD(receive_buffer[its_pos + VSOMEIP_SERVICE_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_SERVICE_POS_MAX]);
+ vsomeip::method_t its_method = VSOMEIP_BYTES_TO_WORD(receive_buffer[its_pos + VSOMEIP_METHOD_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_METHOD_POS_MAX]);
+ its_pos += its_message_size;
+ bytes_transferred -= its_message_size;
+
+ if (its_service == vsomeip::sd::service && its_method == vsomeip::sd::method) {
+ vsomeip::sd::message_impl sd_msg;
+ EXPECT_TRUE(sd_msg.deserialize(&its_deserializer));
+ EXPECT_EQ(2u, sd_msg.get_entries().size());
+ for (const auto& e : sd_msg.get_entries()) {
+ EXPECT_TRUE(e->is_eventgroup_entry());
+ EXPECT_EQ(vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK, e->get_type());
+ EXPECT_EQ(16u, e->get_ttl());
+ EXPECT_EQ(pending_subscription_test::service.service_id, e->get_service());
+ EXPECT_EQ(pending_subscription_test::service.instance_id, e->get_instance());
+ if (e->get_type() == vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK) {
+ std::shared_ptr<vsomeip::sd::eventgroupentry_impl> its_casted_entry =
+ std::static_pointer_cast<vsomeip::sd::eventgroupentry_impl>(e);
+ EXPECT_TRUE(its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id ||
+ its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id+1);
+ }
+ }
+ EXPECT_EQ(0u, sd_msg.get_options().size());
+ acks_received++;
+ } else { // non-sd-message
+ vsomeip::message_impl msg;
+ EXPECT_TRUE(msg.deserialize(&its_deserializer));
+ if (msg.get_message_type() == vsomeip::message_type_e::MT_RESPONSE) {
+ EXPECT_EQ(vsomeip::message_type_e::MT_RESPONSE, msg.get_message_type());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(pending_subscription_test::service.shutdown_method_id, msg.get_method());
+ EXPECT_EQ(0x2222, msg.get_client());
+ responses_received++;
+ } else if (msg.get_message_type() == vsomeip::message_type_e::MT_NOTIFICATION) {
+ its_received_events.push_back(msg.get_method());
+ if (its_received_events.size() == 2) {
+ EXPECT_EQ(pending_subscription_test::service.event_id, its_received_events[0]);
+ EXPECT_EQ(static_cast<vsomeip::event_t>(pending_subscription_test::service.event_id + 1u), its_received_events[1]);
+ }
+ EXPECT_EQ(1u, msg.get_payload()->get_length());
+ EXPECT_EQ(0xDD, *msg.get_payload()->get_data());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(0x0, msg.get_client());
+ notifications_received++;
+ }
}
- EXPECT_EQ(1u, msg.get_payload()->get_length());
- EXPECT_EQ(0xDD, *msg.get_payload()->get_data());
- EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
- EXPECT_EQ(0x0, msg.get_client());
- notifications_received++;
- }
- }
-
- if (!triggered_notifications && acks_received == expected_acks) { // all subscribeAcks received
- trigger_notifications.set_value(true);
- triggered_notifications = true;
+ if (!triggered_notifications && acks_received == expected_acks) { // all subscribeAcks received
+ trigger_notifications.set_value(true);
+ triggered_notifications = true;
+ }
+ }
}
-
- if (!error && (acks_received != expected_acks ||
- responses_received != expected_responses ||
- notifications_received != expected_notifications)) {
- receive();
+ if (acks_received == expected_acks &&
+ responses_received == expected_responses &&
+ notifications_received == expected_notifications) {
+ keep_receiving = false;
}
- };
-
- receive = [&]() {
- udp_socket.async_receive(boost::asio::buffer(receive_buffer, receive_buffer.capacity()),
- receive_cbk);
- };
-
- receive();
- while(acks_received < expected_acks ||
- responses_received < expected_responses ||
- notifications_received < expected_notifications) {
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
+
+
EXPECT_EQ(expected_acks, acks_received);
EXPECT_EQ(expected_responses, responses_received);
EXPECT_EQ(expected_notifications, notifications_received);
@@ -418,6 +433,14 @@ TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe)
udp_socket.close(ec);
}
+/*
+ * @test Send 16 subscriptions to the service while only two contain a
+ * SubscribeEventgroupEntry and the rest contain StopSubscribeEventgroupEntries
+ * and check that all subscriptions with SubscribeEventgroupEntries are
+ * answered with an SubscribeEventgroupAck entry by the service.
+ * Check that the subscription is active at the end of the test and check that
+ * the notifications send by the service receive the client
+ */
TEST_F(pending_subscription, send_multiple_unsubscriptions)
{
std::promise<bool> trigger_notifications;
@@ -439,97 +462,98 @@ TEST_F(pending_subscription, send_multiple_unsubscriptions)
bool triggered_notifications(false);
- std::function<void()> receive;
std::vector<std::uint8_t> receive_buffer(4096);
std::vector<vsomeip::event_t> its_received_events;
- const std::function<void(const boost::system::error_code&, std::size_t)> receive_cbk = [&](
- const boost::system::error_code& error, std::size_t bytes_transferred) {
+ bool keep_receiving(true);
+
+ while (keep_receiving) {
+ boost::system::error_code error;
+ std::size_t bytes_transferred = udp_socket.receive(
+ boost::asio::buffer(receive_buffer, receive_buffer.capacity()), 0, error);
if (error) {
+ keep_receiving = false;
acks_received = expected_acks;
responses_received = expected_responses;
ADD_FAILURE() << __func__ << " error: " << error.message();
- return;
- }
- #if 0
- std::stringstream str;
- for (size_t i = 0; i < bytes_transferred; i++) {
- str << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(receive_buffer[i]) << " ";
- }
- std::cout << __func__ << " received: " << std::dec << bytes_transferred << " bytes: " << str.str() << std::endl;
- #endif
-
- vsomeip::deserializer its_deserializer(&receive_buffer[0], bytes_transferred, 0);
- vsomeip::service_t its_service = VSOMEIP_BYTES_TO_WORD(receive_buffer[VSOMEIP_SERVICE_POS_MIN],
- receive_buffer[VSOMEIP_SERVICE_POS_MAX]);
- vsomeip::method_t its_method = VSOMEIP_BYTES_TO_WORD(receive_buffer[VSOMEIP_METHOD_POS_MIN],
- receive_buffer[VSOMEIP_METHOD_POS_MAX]);
- if (its_service == vsomeip::sd::service && its_method == vsomeip::sd::method) {
- vsomeip::sd::message_impl sd_msg;
- EXPECT_TRUE(sd_msg.deserialize(&its_deserializer));
- EXPECT_EQ(2u, sd_msg.get_entries().size());
- for (auto e : sd_msg.get_entries()) {
- EXPECT_TRUE(e->is_eventgroup_entry());
- EXPECT_EQ(vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK, e->get_type());
- EXPECT_EQ(16u, e->get_ttl());
- EXPECT_EQ(pending_subscription_test::service.service_id, e->get_service());
- EXPECT_EQ(pending_subscription_test::service.instance_id, e->get_instance());
- if (e->get_type() == vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK) {
- std::shared_ptr<vsomeip::sd::eventgroupentry_impl> its_casted_entry =
- std::static_pointer_cast<vsomeip::sd::eventgroupentry_impl>(e);
- EXPECT_TRUE(its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id ||
- its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id+1);
- }
+ } else {
+ #if 0
+ std::stringstream str;
+ for (size_t i = 0; i < bytes_transferred; i++) {
+ str << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(receive_buffer[i]) << " ";
}
- EXPECT_EQ(0u, sd_msg.get_options().size());
- acks_received++;
- } else { // non-sd-message
- vsomeip::message_impl msg;
- EXPECT_TRUE(msg.deserialize(&its_deserializer));
- if (msg.get_message_type() == vsomeip::message_type_e::MT_RESPONSE) {
- EXPECT_EQ(vsomeip::message_type_e::MT_RESPONSE, msg.get_message_type());
- EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
- EXPECT_EQ(pending_subscription_test::service.shutdown_method_id, msg.get_method());
- EXPECT_EQ(0x2222, msg.get_client());
- responses_received++;
- } else if (msg.get_message_type() == vsomeip::message_type_e::MT_NOTIFICATION) {
- its_received_events.push_back(msg.get_method());
- if (its_received_events.size() == 2) {
- EXPECT_EQ(pending_subscription_test::service.event_id, its_received_events[0]);
- EXPECT_EQ(static_cast<vsomeip::event_t>(pending_subscription_test::service.event_id + 1u), its_received_events[1]);
+ std::cout << __func__ << " received: " << std::dec << bytes_transferred << " bytes: " << str.str() << std::endl;
+ #endif
+ std::uint32_t its_pos = 0;
+ while (bytes_transferred > 0) {
+ const std::uint32_t its_message_size = VSOMEIP_BYTES_TO_LONG(
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 1],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 2],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 3]) + VSOMEIP_SOMEIP_HEADER_SIZE;
+ vsomeip::deserializer its_deserializer(&receive_buffer[its_pos], its_message_size, 0);
+ vsomeip::service_t its_service = VSOMEIP_BYTES_TO_WORD(receive_buffer[its_pos + VSOMEIP_SERVICE_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_SERVICE_POS_MAX]);
+ vsomeip::method_t its_method = VSOMEIP_BYTES_TO_WORD(receive_buffer[its_pos + VSOMEIP_METHOD_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_METHOD_POS_MAX]);
+ its_pos += its_message_size;
+ bytes_transferred -= its_message_size;
+
+ if (its_service == vsomeip::sd::service && its_method == vsomeip::sd::method) {
+ vsomeip::sd::message_impl sd_msg;
+ EXPECT_TRUE(sd_msg.deserialize(&its_deserializer));
+ EXPECT_EQ(2u, sd_msg.get_entries().size());
+ for (const auto& e : sd_msg.get_entries()) {
+ EXPECT_TRUE(e->is_eventgroup_entry());
+ EXPECT_EQ(vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK, e->get_type());
+ EXPECT_EQ(16u, e->get_ttl());
+ EXPECT_EQ(pending_subscription_test::service.service_id, e->get_service());
+ EXPECT_EQ(pending_subscription_test::service.instance_id, e->get_instance());
+ if (e->get_type() == vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK) {
+ std::shared_ptr<vsomeip::sd::eventgroupentry_impl> its_casted_entry =
+ std::static_pointer_cast<vsomeip::sd::eventgroupentry_impl>(e);
+ EXPECT_TRUE(its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id ||
+ its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id+1);
+ }
+ }
+ EXPECT_EQ(0u, sd_msg.get_options().size());
+ acks_received++;
+ } else { // non-sd-message
+ vsomeip::message_impl msg;
+ EXPECT_TRUE(msg.deserialize(&its_deserializer));
+ if (msg.get_message_type() == vsomeip::message_type_e::MT_RESPONSE) {
+ EXPECT_EQ(vsomeip::message_type_e::MT_RESPONSE, msg.get_message_type());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(pending_subscription_test::service.shutdown_method_id, msg.get_method());
+ EXPECT_EQ(0x2222, msg.get_client());
+ responses_received++;
+ } else if (msg.get_message_type() == vsomeip::message_type_e::MT_NOTIFICATION) {
+ its_received_events.push_back(msg.get_method());
+ if (its_received_events.size() == 2) {
+ EXPECT_EQ(pending_subscription_test::service.event_id, its_received_events[0]);
+ EXPECT_EQ(static_cast<vsomeip::event_t>(pending_subscription_test::service.event_id + 1u), its_received_events[1]);
+ }
+ EXPECT_EQ(1u, msg.get_payload()->get_length());
+ EXPECT_EQ(0xDD, *msg.get_payload()->get_data());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(0x0, msg.get_client());
+ notifications_received++;
+ }
}
- EXPECT_EQ(1u, msg.get_payload()->get_length());
- EXPECT_EQ(0xDD, *msg.get_payload()->get_data());
- EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
- EXPECT_EQ(0x0, msg.get_client());
- notifications_received++;
+ }
+ if (!triggered_notifications && acks_received == expected_acks) { // all subscribeAcks received
+ trigger_notifications.set_value(true);
+ triggered_notifications = true;
}
}
-
-
- if (!triggered_notifications && acks_received == expected_acks) { // all subscribeAcks received
- trigger_notifications.set_value(true);
- triggered_notifications = true;
- }
-
- if (!error && (acks_received != expected_acks ||
- responses_received != expected_responses ||
- notifications_received != expected_notifications)) {
- receive();
+ if (acks_received == expected_acks &&
+ responses_received == expected_responses &&
+ notifications_received == expected_notifications) {
+ std::cerr << "every thing received" << std::endl;
+ keep_receiving = false;
}
- };
-
- receive = [&]() {
- udp_socket.async_receive(boost::asio::buffer(receive_buffer, receive_buffer.capacity()),
- receive_cbk);
- };
-
- receive();
- while(acks_received < expected_acks ||
- responses_received < expected_responses ||
- notifications_received < expected_notifications) {
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
+
EXPECT_EQ(expected_acks, acks_received);
EXPECT_EQ(expected_responses, responses_received);
EXPECT_EQ(expected_notifications, notifications_received);
@@ -615,6 +639,12 @@ TEST_F(pending_subscription, send_multiple_unsubscriptions)
udp_socket.close(ec);
}
+/*
+ * @test Send 16 subscriptions to the service and check that every second
+ * subscription is answered with an SubscribeEventgroupNack entry by the service.
+ * Check that the subscription is active at the end of the test and check that
+ * the notifications send by the service receive the client
+ */
TEST_F(pending_subscription, send_alternating_subscribe_nack_unsubscribe)
{
std::promise<bool> trigger_notifications;
@@ -638,107 +668,110 @@ TEST_F(pending_subscription, send_alternating_subscribe_nack_unsubscribe)
std::atomic<std::uint32_t> notifications_received(0);
bool triggered_notifications(false);
+ bool keep_receiving(true);
- std::function<void()> receive;
std::vector<std::uint8_t> receive_buffer(4096);
std::vector<vsomeip::event_t> its_received_events;
- const std::function<void(const boost::system::error_code&, std::size_t)> receive_cbk = [&](
- const boost::system::error_code& error, std::size_t bytes_transferred) {
+ while (keep_receiving) {
+ boost::system::error_code error;
+ std::size_t bytes_transferred = udp_socket.receive(
+ boost::asio::buffer(receive_buffer, receive_buffer.capacity()), 0, error);
if (error) {
+ keep_receiving = false;
acks_received = expected_acks;
responses_received = expected_responses;
nacks_received = expected_nacks;
ADD_FAILURE() << __func__ << " error: " << error.message();
- return;
- }
- #if 0
- std::stringstream str;
- for (size_t i = 0; i < bytes_transferred; i++) {
- str << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(receive_buffer[i]) << " ";
- }
- std::cout << __func__ << " received: " << std::dec << bytes_transferred << " bytes: " << str.str() << std::endl;
- #endif
-
- vsomeip::deserializer its_deserializer(&receive_buffer[0], bytes_transferred, 0);
- vsomeip::service_t its_service = VSOMEIP_BYTES_TO_WORD(receive_buffer[VSOMEIP_SERVICE_POS_MIN],
- receive_buffer[VSOMEIP_SERVICE_POS_MAX]);
- vsomeip::method_t its_method = VSOMEIP_BYTES_TO_WORD(receive_buffer[VSOMEIP_METHOD_POS_MIN],
- receive_buffer[VSOMEIP_METHOD_POS_MAX]);
- if (its_service == vsomeip::sd::service && its_method == vsomeip::sd::method) {
- vsomeip::sd::message_impl sd_msg;
- EXPECT_TRUE(sd_msg.deserialize(&its_deserializer));
- EXPECT_EQ(2u, sd_msg.get_entries().size());
- for (auto e : sd_msg.get_entries()) {
- EXPECT_TRUE(e->is_eventgroup_entry());
- EXPECT_EQ(vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK, e->get_type());
- if (e->get_ttl()) {
- EXPECT_EQ(16u, e->get_ttl());
- acks_received++;
- } else {
- EXPECT_EQ(0u, e->get_ttl());
- nacks_received++;
- }
- EXPECT_EQ(pending_subscription_test::service.service_id, e->get_service());
- EXPECT_EQ(pending_subscription_test::service.instance_id, e->get_instance());
- if (e->get_type() == vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK) {
- std::shared_ptr<vsomeip::sd::eventgroupentry_impl> its_casted_entry =
- std::static_pointer_cast<vsomeip::sd::eventgroupentry_impl>(e);
- EXPECT_TRUE(its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id ||
- its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id+1);
- }
+ } else {
+ #if 0
+ std::stringstream str;
+ for (size_t i = 0; i < bytes_transferred; i++) {
+ str << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(receive_buffer[i]) << " ";
}
- EXPECT_EQ(0u, sd_msg.get_options().size());
- } else { // non-sd-message
- vsomeip::message_impl msg;
- EXPECT_TRUE(msg.deserialize(&its_deserializer));
- if (msg.get_message_type() == vsomeip::message_type_e::MT_RESPONSE) {
- EXPECT_EQ(vsomeip::message_type_e::MT_RESPONSE, msg.get_message_type());
- EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
- EXPECT_EQ(pending_subscription_test::service.shutdown_method_id, msg.get_method());
- EXPECT_EQ(0x2222, msg.get_client());
- responses_received++;
- } else if (msg.get_message_type() == vsomeip::message_type_e::MT_NOTIFICATION) {
- its_received_events.push_back(msg.get_method());
- if (its_received_events.size() == 2) {
- EXPECT_EQ(pending_subscription_test::service.event_id, its_received_events[0]);
- EXPECT_EQ(static_cast<vsomeip::event_t>(pending_subscription_test::service.event_id + 1u), its_received_events[1]);
+ std::cout << __func__ << " received: " << std::dec << bytes_transferred << " bytes: " << str.str() << std::endl;
+ #endif
+ std::uint32_t its_pos = 0;
+ while (bytes_transferred > 0) {
+ const std::uint32_t its_message_size = VSOMEIP_BYTES_TO_LONG(
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 1],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 2],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 3]) + VSOMEIP_SOMEIP_HEADER_SIZE;
+ vsomeip::deserializer its_deserializer(&receive_buffer[its_pos], its_message_size, 0);
+
+ vsomeip::service_t its_service = VSOMEIP_BYTES_TO_WORD(receive_buffer[its_pos + VSOMEIP_SERVICE_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_SERVICE_POS_MAX]);
+ vsomeip::method_t its_method = VSOMEIP_BYTES_TO_WORD(receive_buffer[its_pos + VSOMEIP_METHOD_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_METHOD_POS_MAX]);
+ its_pos += its_message_size;
+ bytes_transferred -= its_message_size;
+
+ if (its_service == vsomeip::sd::service && its_method == vsomeip::sd::method) {
+ vsomeip::sd::message_impl sd_msg;
+ EXPECT_TRUE(sd_msg.deserialize(&its_deserializer));
+ EXPECT_EQ(2u, sd_msg.get_entries().size());
+ for (const auto& e : sd_msg.get_entries()) {
+ EXPECT_TRUE(e->is_eventgroup_entry());
+ EXPECT_EQ(vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK, e->get_type());
+ if (e->get_ttl()) {
+ EXPECT_EQ(16u, e->get_ttl());
+ acks_received++;
+ } else {
+ EXPECT_EQ(0u, e->get_ttl());
+ nacks_received++;
+ }
+ EXPECT_EQ(pending_subscription_test::service.service_id, e->get_service());
+ EXPECT_EQ(pending_subscription_test::service.instance_id, e->get_instance());
+ if (e->get_type() == vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK) {
+ std::shared_ptr<vsomeip::sd::eventgroupentry_impl> its_casted_entry =
+ std::static_pointer_cast<vsomeip::sd::eventgroupentry_impl>(e);
+ EXPECT_TRUE(its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id ||
+ its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id+1);
+ }
+ }
+ EXPECT_EQ(0u, sd_msg.get_options().size());
+ } else { // non-sd-message
+ vsomeip::message_impl msg;
+ EXPECT_TRUE(msg.deserialize(&its_deserializer));
+ if (msg.get_message_type() == vsomeip::message_type_e::MT_RESPONSE) {
+ EXPECT_EQ(vsomeip::message_type_e::MT_RESPONSE, msg.get_message_type());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(pending_subscription_test::service.shutdown_method_id, msg.get_method());
+ EXPECT_EQ(0x2222, msg.get_client());
+ responses_received++;
+ } else if (msg.get_message_type() == vsomeip::message_type_e::MT_NOTIFICATION) {
+ its_received_events.push_back(msg.get_method());
+ if (its_received_events.size() == 2) {
+ EXPECT_EQ(pending_subscription_test::service.event_id, its_received_events[0]);
+ EXPECT_EQ(static_cast<vsomeip::event_t>(pending_subscription_test::service.event_id + 1u), its_received_events[1]);
+ }
+ EXPECT_EQ(1u, msg.get_payload()->get_length());
+ EXPECT_EQ(0xDD, *msg.get_payload()->get_data());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(0x0, msg.get_client());
+ notifications_received++;
+ }
}
- EXPECT_EQ(1u, msg.get_payload()->get_length());
- EXPECT_EQ(0xDD, *msg.get_payload()->get_data());
- EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
- EXPECT_EQ(0x0, msg.get_client());
- notifications_received++;
- }
- }
- if (!triggered_notifications && acks_received == expected_acks &&
- nacks_received == expected_nacks) { // all subscribeAcks received
- trigger_notifications.set_value(true);
- triggered_notifications = true;
+ if (!triggered_notifications && acks_received == expected_acks &&
+ nacks_received == expected_nacks) { // all subscribeAcks received
+ trigger_notifications.set_value(true);
+ triggered_notifications = true;
+ }
+ }
}
-
- if (!error && (acks_received != expected_acks ||
- responses_received != expected_responses ||
- notifications_received != expected_notifications ||
- nacks_received != expected_nacks)) {
- receive();
+ if (nacks_received == expected_nacks &&
+ acks_received == expected_acks &&
+ notifications_received == expected_notifications &&
+ responses_received == expected_responses) {
+ keep_receiving = false;
}
- };
-
- receive = [&]() {
- udp_socket.async_receive(boost::asio::buffer(receive_buffer, receive_buffer.capacity()),
- receive_cbk);
- };
-
- receive();
- while(acks_received < expected_acks ||
- responses_received < expected_responses ||
- notifications_received < expected_notifications) {
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
+
EXPECT_EQ(expected_acks, acks_received);
+ EXPECT_EQ(expected_nacks, nacks_received);
EXPECT_EQ(expected_responses, responses_received);
EXPECT_EQ(expected_notifications, notifications_received);
});
@@ -823,6 +856,14 @@ TEST_F(pending_subscription, send_alternating_subscribe_nack_unsubscribe)
udp_socket.close(ec);
}
+/*
+ * @test Send 16 subscriptions containing an UDP and TCP endpoint option
+ * to the service while alternating between Subscribe
+ * and Unsubscribe and check that every SubscribeEventgroupEntry (ttl > 0)
+ * is answered with an SubscribeEventgroupAck entry by the service.
+ * Check that the subscription is active at the end of the test and check that
+ * the notifications send by the service receive the client
+ */
TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe_same_port)
{
std::promise<bool> trigger_notifications;
@@ -848,7 +889,6 @@ TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe_same_port)
bool triggered_notifications(false);
- std::function<void()> receive;
std::vector<std::uint8_t> receive_buffer(4096);
std::vector<vsomeip::event_t> its_received_events;
@@ -858,93 +898,93 @@ TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe_same_port)
ASSERT_EQ(0, ec.value());
tcp_connected.set_value();
- const std::function<void(const boost::system::error_code&, std::size_t)> receive_cbk = [&](
- const boost::system::error_code& error, std::size_t bytes_transferred) {
+ bool keep_receiving(true);
+
+ while (keep_receiving) {
+ boost::system::error_code error;
+ std::size_t bytes_transferred = udp_socket.receive(
+ boost::asio::buffer(receive_buffer, receive_buffer.capacity()), 0, error);
if (error) {
+ keep_receiving = false;
acks_received = expected_acks;
responses_received = expected_responses;
ADD_FAILURE() << __func__ << " error: " << error.message();
- return;
- }
- #if 0
- std::stringstream str;
- for (size_t i = 0; i < bytes_transferred; i++) {
- str << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(receive_buffer[i]) << " ";
- }
- std::cout << __func__ << " received: " << std::dec << bytes_transferred << " bytes: " << str.str() << std::endl;
- #endif
-
- vsomeip::deserializer its_deserializer(&receive_buffer[0], bytes_transferred, 0);
- vsomeip::service_t its_service = VSOMEIP_BYTES_TO_WORD(receive_buffer[VSOMEIP_SERVICE_POS_MIN],
- receive_buffer[VSOMEIP_SERVICE_POS_MAX]);
- vsomeip::method_t its_method = VSOMEIP_BYTES_TO_WORD(receive_buffer[VSOMEIP_METHOD_POS_MIN],
- receive_buffer[VSOMEIP_METHOD_POS_MAX]);
- if (its_service == vsomeip::sd::service && its_method == vsomeip::sd::method) {
- vsomeip::sd::message_impl sd_msg;
- EXPECT_TRUE(sd_msg.deserialize(&its_deserializer));
- EXPECT_EQ(2u, sd_msg.get_entries().size());
- for (auto e : sd_msg.get_entries()) {
- EXPECT_TRUE(e->is_eventgroup_entry());
- EXPECT_EQ(vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK, e->get_type());
- EXPECT_EQ(16u, e->get_ttl());
- EXPECT_EQ(pending_subscription_test::service.service_id, e->get_service());
- EXPECT_EQ(pending_subscription_test::service.instance_id, e->get_instance());
- if (e->get_type() == vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK) {
- std::shared_ptr<vsomeip::sd::eventgroupentry_impl> its_casted_entry =
- std::static_pointer_cast<vsomeip::sd::eventgroupentry_impl>(e);
- EXPECT_TRUE(its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id ||
- its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id+1);
- }
- }
- EXPECT_EQ(0u, sd_msg.get_options().size());
- acks_received++;
- } else { // non-sd-message
- vsomeip::message_impl msg;
- EXPECT_TRUE(msg.deserialize(&its_deserializer));
- if (msg.get_message_type() == vsomeip::message_type_e::MT_RESPONSE) {
- EXPECT_EQ(vsomeip::message_type_e::MT_RESPONSE, msg.get_message_type());
- EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
- EXPECT_EQ(pending_subscription_test::service.shutdown_method_id, msg.get_method());
- EXPECT_EQ(0x2222, msg.get_client());
- responses_received++;
- } else if (msg.get_message_type() == vsomeip::message_type_e::MT_NOTIFICATION) {
- its_received_events.push_back(msg.get_method());
- if (its_received_events.size() == 2) {
- EXPECT_EQ(pending_subscription_test::service.event_id, its_received_events[0]);
- EXPECT_EQ(static_cast<vsomeip::event_t>(pending_subscription_test::service.event_id + 1u), its_received_events[1]);
+ } else {
+
+ std::uint32_t its_pos = 0;
+
+ while (bytes_transferred > 0) {
+ const std::uint32_t its_message_size = VSOMEIP_BYTES_TO_LONG(
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 1],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 2],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 3]) + VSOMEIP_SOMEIP_HEADER_SIZE;
+ vsomeip::deserializer its_deserializer(&receive_buffer[its_pos], its_message_size, 0);
+
+ vsomeip::service_t its_service = VSOMEIP_BYTES_TO_WORD(receive_buffer[its_pos + VSOMEIP_SERVICE_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_SERVICE_POS_MAX]);
+ vsomeip::method_t its_method = VSOMEIP_BYTES_TO_WORD(receive_buffer[its_pos + VSOMEIP_METHOD_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_METHOD_POS_MAX]);
+ its_pos += its_message_size;
+ bytes_transferred -= its_message_size;
+
+ if (its_service == vsomeip::sd::service && its_method == vsomeip::sd::method) {
+ vsomeip::sd::message_impl sd_msg;
+ EXPECT_TRUE(sd_msg.deserialize(&its_deserializer));
+ EXPECT_EQ(2u, sd_msg.get_entries().size());
+ for (const auto& e : sd_msg.get_entries()) {
+ EXPECT_TRUE(e->is_eventgroup_entry());
+ EXPECT_EQ(vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK, e->get_type());
+ EXPECT_EQ(16u, e->get_ttl());
+ EXPECT_EQ(pending_subscription_test::service.service_id, e->get_service());
+ EXPECT_EQ(pending_subscription_test::service.instance_id, e->get_instance());
+ if (e->get_type() == vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK) {
+ std::shared_ptr<vsomeip::sd::eventgroupentry_impl> its_casted_entry =
+ std::static_pointer_cast<vsomeip::sd::eventgroupentry_impl>(e);
+ EXPECT_TRUE(its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id ||
+ its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id+1);
+ }
+ }
+ EXPECT_EQ(0u, sd_msg.get_options().size());
+ acks_received++;
+ } else { // non-sd-message
+ vsomeip::message_impl msg;
+ EXPECT_TRUE(msg.deserialize(&its_deserializer));
+ if (msg.get_message_type() == vsomeip::message_type_e::MT_RESPONSE) {
+ EXPECT_EQ(vsomeip::message_type_e::MT_RESPONSE, msg.get_message_type());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(pending_subscription_test::service.shutdown_method_id, msg.get_method());
+ EXPECT_EQ(0x2222, msg.get_client());
+ responses_received++;
+ } else if (msg.get_message_type() == vsomeip::message_type_e::MT_NOTIFICATION) {
+ its_received_events.push_back(msg.get_method());
+ if (its_received_events.size() == 2) {
+ EXPECT_EQ(pending_subscription_test::service.event_id, its_received_events[0]);
+ EXPECT_EQ(static_cast<vsomeip::event_t>(pending_subscription_test::service.event_id + 1u), its_received_events[1]);
+ }
+ EXPECT_EQ(1u, msg.get_payload()->get_length());
+ EXPECT_EQ(0xDD, *msg.get_payload()->get_data());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(0x0, msg.get_client());
+ notifications_received++;
+ }
}
- EXPECT_EQ(1u, msg.get_payload()->get_length());
- EXPECT_EQ(0xDD, *msg.get_payload()->get_data());
- EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
- EXPECT_EQ(0x0, msg.get_client());
- notifications_received++;
- }
- }
- if (!triggered_notifications && acks_received == expected_acks) { // all subscribeAcks received
- trigger_notifications.set_value(true);
- triggered_notifications = true;
+ if (!triggered_notifications && acks_received == expected_acks) { // all subscribeAcks received
+ trigger_notifications.set_value(true);
+ triggered_notifications = true;
+ }
+ }
}
-
- if (!error && (acks_received != expected_acks ||
- responses_received != expected_responses ||
- notifications_received != expected_notifications)) {
- receive();
+ if (acks_received == expected_acks &&
+ responses_received == expected_responses &&
+ notifications_received == expected_notifications) {
+ keep_receiving = false;
}
- };
-
- receive = [&]() {
- udp_socket.async_receive(boost::asio::buffer(receive_buffer, receive_buffer.capacity()),
- receive_cbk);
- };
-
- receive();
- while(acks_received < expected_acks ||
- responses_received < expected_responses ||
- notifications_received < expected_notifications) {
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
+
+
EXPECT_EQ(expected_acks, acks_received);
EXPECT_EQ(expected_responses, responses_received);
EXPECT_EQ(expected_notifications, notifications_received);
@@ -1056,99 +1096,115 @@ TEST_F(pending_subscription, subscribe_resubscribe_mixed)
udp_socket.set_option(boost::asio::socket_base::linger(true, 0));
std::thread receive_thread([&](){
- std::atomic<bool> keep_receiving(true);
- std::function<void()> receive;
std::vector<std::uint8_t> receive_buffer(4096);
std::vector<vsomeip::event_t> its_received_events;
- const std::function<void(const boost::system::error_code&, std::size_t)> receive_cbk = [&](
- const boost::system::error_code& error, std::size_t bytes_transferred) {
+ const std::uint32_t expected_acks(3);
+ std::atomic<std::uint32_t> acks_received(0);
+
+ const std::uint32_t expected_responses(1);
+ std::atomic<std::uint32_t> responses_received(0);
+
+ const std::uint32_t expected_notifications(2);
+ std::atomic<std::uint32_t> notifications_received(0);
+
+ bool keep_receiving(true);
+ bool first_initial_event_checked(false);
+ bool second_initial_event_checked(false);
+
+
+ while (keep_receiving) {
+ boost::system::error_code error;
+ std::size_t bytes_transfered = udp_socket.receive(
+ boost::asio::buffer(receive_buffer, receive_buffer.capacity()), 0, error);
if (error) {
keep_receiving = false;
ADD_FAILURE() << __func__ << " error: " << error.message();
- return;
- }
- #if 0
- std::stringstream str;
- for (size_t i = 0; i < bytes_transferred; i++) {
- str << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(receive_buffer[i]) << " ";
- }
- std::cout << __func__ << " received: " << std::dec << bytes_transferred << " bytes: " << str.str() << std::endl;
- #endif
-
- vsomeip::deserializer its_deserializer(&receive_buffer[0], bytes_transferred, 0);
- vsomeip::service_t its_service = VSOMEIP_BYTES_TO_WORD(receive_buffer[VSOMEIP_SERVICE_POS_MIN],
- receive_buffer[VSOMEIP_SERVICE_POS_MAX]);
- vsomeip::method_t its_method = VSOMEIP_BYTES_TO_WORD(receive_buffer[VSOMEIP_METHOD_POS_MIN],
- receive_buffer[VSOMEIP_METHOD_POS_MAX]);
- if (its_service == vsomeip::sd::service && its_method == vsomeip::sd::method) {
- vsomeip::sd::message_impl sd_msg;
- EXPECT_TRUE(sd_msg.deserialize(&its_deserializer));
- EXPECT_GE(2u, sd_msg.get_entries().size());
- for (auto e : sd_msg.get_entries()) {
- EXPECT_TRUE(e->is_eventgroup_entry());
- EXPECT_EQ(vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK, e->get_type());
- EXPECT_EQ(3u, e->get_ttl());
- EXPECT_EQ(pending_subscription_test::service.service_id, e->get_service());
- EXPECT_EQ(pending_subscription_test::service.instance_id, e->get_instance());
- if (e->get_type() == vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK) {
- std::shared_ptr<vsomeip::sd::eventgroupentry_impl> its_casted_entry =
- std::static_pointer_cast<vsomeip::sd::eventgroupentry_impl>(e);
- EXPECT_TRUE(its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id ||
- its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id+1);
+ } else {
+
+ std::uint32_t its_pos = 0;
+
+ while (bytes_transfered > 0) {
+ const std::uint32_t its_message_size = VSOMEIP_BYTES_TO_LONG(
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 1],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 2],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 3]) + VSOMEIP_SOMEIP_HEADER_SIZE;
+ vsomeip::deserializer its_deserializer(&receive_buffer[its_pos], its_message_size, 0);
+
+ vsomeip::service_t its_service = VSOMEIP_BYTES_TO_WORD(receive_buffer[its_pos + VSOMEIP_SERVICE_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_SERVICE_POS_MAX]);
+ vsomeip::method_t its_method = VSOMEIP_BYTES_TO_WORD(receive_buffer[its_pos + VSOMEIP_METHOD_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_METHOD_POS_MAX]);
+ its_pos += its_message_size;
+ bytes_transfered -= its_message_size;
+ if (its_service == vsomeip::sd::service && its_method == vsomeip::sd::method) {
+ vsomeip::sd::message_impl sd_msg;
+ EXPECT_TRUE(sd_msg.deserialize(&its_deserializer));
+ EXPECT_GE(2u, sd_msg.get_entries().size());
+ for (const auto& e : sd_msg.get_entries()) {
+ EXPECT_TRUE(e->is_eventgroup_entry());
+ EXPECT_EQ(vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK, e->get_type());
+ EXPECT_EQ(3u, e->get_ttl());
+ EXPECT_EQ(pending_subscription_test::service.service_id, e->get_service());
+ EXPECT_EQ(pending_subscription_test::service.instance_id, e->get_instance());
+ if (e->get_type() == vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK) {
+ acks_received++;
+ std::shared_ptr<vsomeip::sd::eventgroupentry_impl> its_casted_entry =
+ std::static_pointer_cast<vsomeip::sd::eventgroupentry_impl>(e);
+ EXPECT_TRUE(its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id ||
+ its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id+1);
+ }
+ }
+ EXPECT_EQ(0u, sd_msg.get_options().size());
+ } else { // non-sd-message
+ vsomeip::message_impl msg;
+ EXPECT_TRUE(msg.deserialize(&its_deserializer));
+ if (msg.get_message_type() == vsomeip::message_type_e::MT_RESPONSE) {
+ EXPECT_EQ(vsomeip::message_type_e::MT_RESPONSE, msg.get_message_type());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(pending_subscription_test::service.shutdown_method_id, msg.get_method());
+ EXPECT_EQ(0x2222, msg.get_client());
+ responses_received++;
+ } else if (msg.get_message_type() == vsomeip::message_type_e::MT_NOTIFICATION) {
+ its_received_events.push_back(msg.get_method());
+ if (its_received_events.size() == 2) {
+ EXPECT_EQ(static_cast<vsomeip::event_t>(pending_subscription_test::service.event_id + 1u), its_received_events[0]);
+ EXPECT_EQ(pending_subscription_test::service.event_id, its_received_events[1]);
+ }
+ EXPECT_EQ(1u, msg.get_payload()->get_length());
+ EXPECT_EQ(0xDD, *msg.get_payload()->get_data());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(0x0, msg.get_client());
+ notifications_received++;
+ }
}
- }
- EXPECT_EQ(0u, sd_msg.get_options().size());
- } else { // non-sd-message
- vsomeip::message_impl msg;
- EXPECT_TRUE(msg.deserialize(&its_deserializer));
- if (msg.get_message_type() == vsomeip::message_type_e::MT_RESPONSE) {
- EXPECT_EQ(vsomeip::message_type_e::MT_RESPONSE, msg.get_message_type());
- EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
- EXPECT_EQ(pending_subscription_test::service.shutdown_method_id, msg.get_method());
- EXPECT_EQ(0x2222, msg.get_client());
- } else if (msg.get_message_type() == vsomeip::message_type_e::MT_NOTIFICATION) {
- its_received_events.push_back(msg.get_method());
- if (its_received_events.size() == 2) {
+
+ if (!first_initial_event_checked && notifications_received == 1) {
+ EXPECT_EQ(1u, its_received_events.size());
+ EXPECT_EQ(static_cast<vsomeip::event_t>(pending_subscription_test::service.event_id + 1u), its_received_events[0]);
+ // all subscribeAcks and one initial event of first event received
+ first_initial_event_received.set_value();
+ first_initial_event_checked = true;
+ }
+
+ if (!second_initial_event_checked && notifications_received == 2) { // events were received as well
+ // all subscribeAcks and one initial event of second event received
+ EXPECT_EQ(2u, its_received_events.size());
EXPECT_EQ(static_cast<vsomeip::event_t>(pending_subscription_test::service.event_id + 1u), its_received_events[0]);
EXPECT_EQ(pending_subscription_test::service.event_id, its_received_events[1]);
+ second_initial_event_received.set_value();
+ second_initial_event_checked = true;
+ }
+ if (notifications_received == 2 && responses_received == 1) {
+ keep_receiving = false;
}
- EXPECT_EQ(1u, msg.get_payload()->get_length());
- EXPECT_EQ(0xDD, *msg.get_payload()->get_data());
- EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
- EXPECT_EQ(0x0, msg.get_client());
}
}
-
- static int called = 0;
- if (++called == 2) {
- EXPECT_EQ(1u, its_received_events.size());
- EXPECT_EQ(static_cast<vsomeip::event_t>(pending_subscription_test::service.event_id + 1u), its_received_events[0]);
- // all subscribeAcks and one initial event of first event received
- first_initial_event_received.set_value();
- }
- if (called == 4) { // events were received as well
- // all subscribeAcks and one initial event of second event received
- EXPECT_EQ(2u, its_received_events.size());
- EXPECT_EQ(static_cast<vsomeip::event_t>(pending_subscription_test::service.event_id + 1u), its_received_events[0]);
- EXPECT_EQ(pending_subscription_test::service.event_id, its_received_events[1]);
- keep_receiving = false;
- second_initial_event_received.set_value();
- }
- if (!error && keep_receiving) {
- receive();
- }
- };
-
- receive = [&]() {
- udp_socket.async_receive(boost::asio::buffer(receive_buffer, receive_buffer.capacity()),
- receive_cbk);
- };
-
- receive();
- while(keep_receiving) {
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
+ EXPECT_EQ(expected_acks, acks_received);
+ EXPECT_EQ(expected_notifications, notifications_received);
+ EXPECT_EQ(expected_responses, responses_received);
});
std::thread send_thread([&]() {
@@ -1261,7 +1317,7 @@ TEST_F(pending_subscription, send_subscribe_stop_subscribe_subscribe)
tcp_socket.set_option(boost::asio::socket_base::linger(true, 0));
std::thread receive_thread([&](){
- const std::uint32_t expected_acks(1);
+ const std::uint32_t expected_acks(2);
std::atomic<std::uint32_t> acks_received(0);
const std::uint32_t expected_responses(1);
@@ -1272,7 +1328,6 @@ TEST_F(pending_subscription, send_subscribe_stop_subscribe_subscribe)
bool triggered_notifications(false);
- std::function<void()> receive;
std::vector<std::uint8_t> receive_buffer(4096);
std::vector<vsomeip::event_t> its_received_events;
@@ -1282,88 +1337,93 @@ TEST_F(pending_subscription, send_subscribe_stop_subscribe_subscribe)
ASSERT_EQ(0, ec.value());
tcp_connected.set_value();
- const std::function<void(const boost::system::error_code&, std::size_t)> receive_cbk = [&](
- const boost::system::error_code& error, std::size_t bytes_transferred) {
+ bool keep_receiving(true);
+
+ while (keep_receiving) {
+ boost::system::error_code error;
+ std::size_t bytes_transferred = udp_socket.receive(
+ boost::asio::buffer(receive_buffer, receive_buffer.capacity()), 0, error);
if (error) {
+ keep_receiving = false;
acks_received = expected_acks;
responses_received = expected_responses;
ADD_FAILURE() << __func__ << " error: " << error.message();
- return;
- }
- #if 0
- std::stringstream str;
- for (size_t i = 0; i < bytes_transferred; i++) {
- str << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(receive_buffer[i]) << " ";
- }
- std::cout << __func__ << " received: " << std::dec << bytes_transferred << " bytes: " << str.str() << std::endl;
- #endif
-
- vsomeip::deserializer its_deserializer(&receive_buffer[0], bytes_transferred, 0);
- vsomeip::service_t its_service = VSOMEIP_BYTES_TO_WORD(receive_buffer[VSOMEIP_SERVICE_POS_MIN],
- receive_buffer[VSOMEIP_SERVICE_POS_MAX]);
- vsomeip::method_t its_method = VSOMEIP_BYTES_TO_WORD(receive_buffer[VSOMEIP_METHOD_POS_MIN],
- receive_buffer[VSOMEIP_METHOD_POS_MAX]);
- if (its_service == vsomeip::sd::service && its_method == vsomeip::sd::method) {
- vsomeip::sd::message_impl sd_msg;
- EXPECT_TRUE(sd_msg.deserialize(&its_deserializer));
- EXPECT_EQ(1u, sd_msg.get_entries().size());
- for (auto e : sd_msg.get_entries()) {
- EXPECT_TRUE(e->is_eventgroup_entry());
- EXPECT_EQ(vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK, e->get_type());
- EXPECT_EQ(16u, e->get_ttl());
- EXPECT_EQ(pending_subscription_test::service.service_id, e->get_service());
- EXPECT_EQ(pending_subscription_test::service.instance_id, e->get_instance());
- if (e->get_type() == vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK) {
- std::shared_ptr<vsomeip::sd::eventgroupentry_impl> its_casted_entry =
- std::static_pointer_cast<vsomeip::sd::eventgroupentry_impl>(e);
- EXPECT_TRUE(its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id);
- }
- }
- EXPECT_EQ(0u, sd_msg.get_options().size());
- acks_received++;
- } else { // non-sd-message
- vsomeip::message_impl msg;
- EXPECT_TRUE(msg.deserialize(&its_deserializer));
- if (msg.get_message_type() == vsomeip::message_type_e::MT_RESPONSE) {
- EXPECT_EQ(vsomeip::message_type_e::MT_RESPONSE, msg.get_message_type());
- EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
- EXPECT_EQ(pending_subscription_test::service.shutdown_method_id, msg.get_method());
- EXPECT_EQ(0x2222, msg.get_client());
- responses_received++;
- } else if (msg.get_message_type() == vsomeip::message_type_e::MT_NOTIFICATION) {
- its_received_events.push_back(msg.get_method());
- EXPECT_EQ(1u, its_received_events.size());
- EXPECT_EQ(1u, msg.get_payload()->get_length());
- EXPECT_EQ(0xDD, *msg.get_payload()->get_data());
- EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
- EXPECT_EQ(0x0, msg.get_client());
- notifications_received++;
- }
- }
+ } else {
+ std::uint32_t its_pos = 0;
- if (!triggered_notifications && acks_received == expected_acks) { // all subscribeAcks received
- trigger_notifications.set_value(true);
- triggered_notifications = true;
- }
+ while (bytes_transferred > 0) {
+ #if 0
+ std::stringstream str;
+ for (size_t i = 0; i < bytes_transferred; i++) {
+ str << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(receive_buffer[i]) << " ";
+ }
+ std::cout << __func__ << " received: " << std::dec << bytes_transferred << " bytes: " << str.str() << std::endl;
+ #endif
+
+ const std::uint32_t its_message_size = VSOMEIP_BYTES_TO_LONG(
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 1],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 2],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 3]) + VSOMEIP_SOMEIP_HEADER_SIZE;
+ vsomeip::deserializer its_deserializer(&receive_buffer[its_pos], its_message_size, 0);
+
+ vsomeip::service_t its_service = VSOMEIP_BYTES_TO_WORD(receive_buffer[its_pos + VSOMEIP_SERVICE_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_SERVICE_POS_MAX]);
+ vsomeip::method_t its_method = VSOMEIP_BYTES_TO_WORD(receive_buffer[its_pos + VSOMEIP_METHOD_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_METHOD_POS_MAX]);
+ its_pos += its_message_size;
+ bytes_transferred -= its_message_size;
+
+ if (its_service == vsomeip::sd::service && its_method == vsomeip::sd::method) {
+ vsomeip::sd::message_impl sd_msg;
+ EXPECT_TRUE(sd_msg.deserialize(&its_deserializer));
+ EXPECT_EQ(1u, sd_msg.get_entries().size());
+ for (const auto& e : sd_msg.get_entries()) {
+ EXPECT_TRUE(e->is_eventgroup_entry());
+ EXPECT_EQ(vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK, e->get_type());
+ EXPECT_EQ(16u, e->get_ttl());
+ EXPECT_EQ(pending_subscription_test::service.service_id, e->get_service());
+ EXPECT_EQ(pending_subscription_test::service.instance_id, e->get_instance());
+ if (e->get_type() == vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK) {
+ std::shared_ptr<vsomeip::sd::eventgroupentry_impl> its_casted_entry =
+ std::static_pointer_cast<vsomeip::sd::eventgroupentry_impl>(e);
+ EXPECT_TRUE(its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id);
+ }
+ }
+ EXPECT_EQ(0u, sd_msg.get_options().size());
+ acks_received++;
+ } else { // non-sd-message
+ vsomeip::message_impl msg;
+ EXPECT_TRUE(msg.deserialize(&its_deserializer));
+ if (msg.get_message_type() == vsomeip::message_type_e::MT_RESPONSE) {
+ EXPECT_EQ(vsomeip::message_type_e::MT_RESPONSE, msg.get_message_type());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(pending_subscription_test::service.shutdown_method_id, msg.get_method());
+ EXPECT_EQ(0x2222, msg.get_client());
+ responses_received++;
+ } else if (msg.get_message_type() == vsomeip::message_type_e::MT_NOTIFICATION) {
+ its_received_events.push_back(msg.get_method());
+ EXPECT_EQ(1u, its_received_events.size());
+ EXPECT_EQ(1u, msg.get_payload()->get_length());
+ EXPECT_EQ(0xDD, *msg.get_payload()->get_data());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(0x0, msg.get_client());
+ notifications_received++;
+ }
+ }
- if (!error && (acks_received != expected_acks ||
- responses_received != expected_responses ||
- notifications_received != expected_notifications)) {
- receive();
+ if (!triggered_notifications && acks_received == expected_acks) { // all subscribeAcks received
+ trigger_notifications.set_value(true);
+ triggered_notifications = true;
+ }
+ }
+ if (acks_received == expected_acks &&
+ responses_received == expected_responses &&
+ notifications_received == expected_notifications) {
+ keep_receiving = false;
+ }
}
- };
-
- receive = [&]() {
- udp_socket.async_receive(boost::asio::buffer(receive_buffer, receive_buffer.capacity()),
- receive_cbk);
- };
-
- receive();
- while(acks_received < expected_acks ||
- responses_received < expected_responses ||
- notifications_received < expected_notifications) {
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
EXPECT_EQ(expected_acks, acks_received);
EXPECT_EQ(expected_responses, responses_received);
@@ -1376,10 +1436,26 @@ TEST_F(pending_subscription, send_subscribe_stop_subscribe_subscribe)
}
try {
+ std::uint8_t its_normal_subscribe_message[] = {
+ 0xff, 0xff, 0x81, 0x00,
+ 0x00, 0x00, 0x00, 0x30, // length
+ 0x00, 0x00, 0x00, 0x01,
+ 0x01, 0x01, 0x02, 0x00,
+ 0xc0, 0x00, 0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x10, // length entries array
+ 0x06, 0x00, 0x00, 0x10, // subscribe Eventgroup entry
+ 0x11, 0x22, 0x00, 0x01, // service / instance
+ 0x00, 0x00, 0x00, 0x10, // 16 seconds TTL
+ 0x00, 0x00, 0x10, 0x00, // eventgroup
+ 0x00, 0x00, 0x00, 0x0c, // length options array
+ 0x00, 0x09, 0x04, 0x00,
+ 0xff, 0xff, 0xff, 0xff, // ip address
+ 0x00, 0x11, 0x77, 0x1a
+ };
std::uint8_t its_subscribe_message[] = {
0xff, 0xff, 0x81, 0x00,
0x00, 0x00, 0x00, 0x50, // length
- 0x00, 0x00, 0x00, 0x01,
+ 0x00, 0x00, 0x00, 0x02,
0x01, 0x01, 0x02, 0x00,
0xc0, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x30, // length entries array
@@ -1404,10 +1480,13 @@ TEST_F(pending_subscription, send_subscribe_stop_subscribe_subscribe)
boost::asio::ip::address its_local_address =
boost::asio::ip::address::from_string(std::string(local_address));
std::memcpy(&its_subscribe_message[80], &its_local_address.to_v4().to_bytes()[0], 4);
+ std::memcpy(&its_normal_subscribe_message[48], &its_local_address.to_v4().to_bytes()[0], 4);
boost::asio::ip::udp::socket::endpoint_type target_sd(
boost::asio::ip::address::from_string(std::string(remote_address)),
30490);
+
+ udp_socket.send_to(boost::asio::buffer(its_normal_subscribe_message), target_sd);
udp_socket.send_to(boost::asio::buffer(its_subscribe_message), target_sd);
if (std::future_status::timeout == trigger_notifications.get_future().wait_for(std::chrono::seconds(10))) {
@@ -1466,80 +1545,80 @@ TEST_F(pending_subscription, send_request_to_sd_port)
udp_socket.set_option(boost::asio::socket_base::linger(true, 0));
std::thread receive_thread([&](){
- std::atomic<bool> keep_receiving(true);
- std::function<void()> receive;
+ bool keep_receiving(true);
std::vector<std::uint8_t> receive_buffer(4096);
std::vector<vsomeip::event_t> its_received_events;
- const std::function<void(const boost::system::error_code&, std::size_t)> receive_cbk = [&](
- const boost::system::error_code& error, std::size_t bytes_transferred) {
+ while (keep_receiving) {
+ boost::system::error_code error;
+ std::size_t bytes_transferred = udp_socket.receive(
+ boost::asio::buffer(receive_buffer, receive_buffer.capacity()), 0, error);
if (error) {
keep_receiving = false;
ADD_FAILURE() << __func__ << " error: " << error.message();
- return;
- }
- #if 0
- std::stringstream str;
- for (size_t i = 0; i < bytes_transferred; i++) {
- str << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(receive_buffer[i]) << " ";
- }
- std::cout << __func__ << " received: " << std::dec << bytes_transferred << " bytes: " << str.str() << std::endl;
- #endif
-
- static int offers_received = 0;
- static int responses_received = 0;
- vsomeip::deserializer its_deserializer(&receive_buffer[0], bytes_transferred, 0);
- vsomeip::service_t its_service = VSOMEIP_BYTES_TO_WORD(receive_buffer[VSOMEIP_SERVICE_POS_MIN],
- receive_buffer[VSOMEIP_SERVICE_POS_MAX]);
- vsomeip::method_t its_method = VSOMEIP_BYTES_TO_WORD(receive_buffer[VSOMEIP_METHOD_POS_MIN],
- receive_buffer[VSOMEIP_METHOD_POS_MAX]);
- if (its_service == vsomeip::sd::service && its_method == vsomeip::sd::method) {
- vsomeip::sd::message_impl sd_msg;
- EXPECT_TRUE(sd_msg.deserialize(&its_deserializer));
- EXPECT_EQ(1u, sd_msg.get_entries().size());
- EXPECT_EQ(2u, sd_msg.get_options().size());
- for (auto e : sd_msg.get_entries()) {
- EXPECT_TRUE(e->is_service_entry());
- EXPECT_EQ(vsomeip::sd::entry_type_e::OFFER_SERVICE, e->get_type());
- EXPECT_EQ(0xffffffu, e->get_ttl());
- EXPECT_EQ(pending_subscription_test::service.service_id, e->get_service());
- EXPECT_EQ(pending_subscription_test::service.instance_id, e->get_instance());
- offers_received++;
- }
- } else { // non-sd-message
- vsomeip::message_impl msg;
- EXPECT_TRUE(msg.deserialize(&its_deserializer));
- if (msg.get_message_type() == vsomeip::message_type_e::MT_RESPONSE) {
- EXPECT_EQ(vsomeip::message_type_e::MT_RESPONSE, msg.get_message_type());
- EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
- EXPECT_EQ(pending_subscription_test::service.shutdown_method_id, msg.get_method());
- EXPECT_EQ(0x2222, msg.get_client());
- responses_received++;
- }
- }
+ } else {
+ std::uint32_t its_pos = 0;
+ while (bytes_transferred > 0) {
+ const std::uint32_t its_message_size = VSOMEIP_BYTES_TO_LONG(
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 1],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 2],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 3]) + VSOMEIP_SOMEIP_HEADER_SIZE;
+ vsomeip::deserializer its_deserializer(&receive_buffer[its_pos], its_message_size, 0);
+
+ vsomeip::service_t its_service = VSOMEIP_BYTES_TO_WORD(receive_buffer[its_pos + VSOMEIP_SERVICE_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_SERVICE_POS_MAX]);
+ vsomeip::method_t its_method = VSOMEIP_BYTES_TO_WORD(receive_buffer[its_pos + VSOMEIP_METHOD_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_METHOD_POS_MAX]);
+ its_pos += its_message_size;
+ bytes_transferred -= its_message_size;
+
+ #if 0
+ std::stringstream str;
+ for (size_t i = 0; i < bytes_transferred; i++) {
+ str << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(receive_buffer[i]) << " ";
+ }
+ std::cout << __func__ << " received: " << std::dec << bytes_transferred << " bytes: " << str.str() << std::endl;
+ #endif
+ static int offers_received = 0;
+ static int responses_received = 0;
+
+ if (its_service == vsomeip::sd::service && its_method == vsomeip::sd::method) {
+ vsomeip::sd::message_impl sd_msg;
+ EXPECT_TRUE(sd_msg.deserialize(&its_deserializer));
+ EXPECT_EQ(1u, sd_msg.get_entries().size());
+ EXPECT_EQ(2u, sd_msg.get_options().size());
+ for (const auto& e : sd_msg.get_entries()) {
+ EXPECT_TRUE(e->is_service_entry());
+ EXPECT_EQ(vsomeip::sd::entry_type_e::OFFER_SERVICE, e->get_type());
+ EXPECT_EQ(0xffffffu, e->get_ttl());
+ EXPECT_EQ(pending_subscription_test::service.service_id, e->get_service());
+ EXPECT_EQ(pending_subscription_test::service.instance_id, e->get_instance());
+ offers_received++;
+ }
+ } else { // non-sd-message
+ vsomeip::message_impl msg;
+ EXPECT_TRUE(msg.deserialize(&its_deserializer));
+ if (msg.get_message_type() == vsomeip::message_type_e::MT_RESPONSE) {
+ EXPECT_EQ(vsomeip::message_type_e::MT_RESPONSE, msg.get_message_type());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(pending_subscription_test::service.shutdown_method_id, msg.get_method());
+ EXPECT_EQ(0x2222, msg.get_client());
+ responses_received++;
+ }
+ }
- if (responses_received == 1) { // resonse to shutdown method was received as well
- keep_receiving = false;
- } else if (offers_received == 3 ) { // all multiple offers received
- try {
- all_offers_received.set_value(true);
- } catch (const std::future_error& e) {
+ if (responses_received == 1) { // response to shutdown method was received as well
+ keep_receiving = false;
+ } else if (offers_received == 3 ) { // all multiple offers received
+ try {
+ all_offers_received.set_value(true);
+ } catch (const std::future_error& e) {
+ }
+ }
}
}
- if (!error && keep_receiving) {
- receive();
- }
- };
-
- receive = [&]() {
- udp_socket.async_receive(boost::asio::buffer(receive_buffer, receive_buffer.capacity()),
- receive_cbk);
- };
-
- receive();
- while(keep_receiving) {
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
});