summaryrefslogtreecommitdiff
path: root/test/network_tests/pending_subscription_tests
diff options
context:
space:
mode:
Diffstat (limited to 'test/network_tests/pending_subscription_tests')
-rw-r--r--test/network_tests/pending_subscription_tests/conf/pending_subscription_test_master.json.in55
-rwxr-xr-xtest/network_tests/pending_subscription_tests/conf/pending_subscription_test_master_starter.sh.in72
-rw-r--r--test/network_tests/pending_subscription_tests/pending_subscription_test_globals.hpp36
-rw-r--r--test/network_tests/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp1735
-rw-r--r--test/network_tests/pending_subscription_tests/pending_subscription_test_service.cpp389
5 files changed, 2287 insertions, 0 deletions
diff --git a/test/network_tests/pending_subscription_tests/conf/pending_subscription_test_master.json.in b/test/network_tests/pending_subscription_tests/conf/pending_subscription_test_master.json.in
new file mode 100644
index 0000000..5bcbe9a
--- /dev/null
+++ b/test/network_tests/pending_subscription_tests/conf/pending_subscription_test_master.json.in
@@ -0,0 +1,55 @@
+{
+ "unicast":"@TEST_IP_MASTER@",
+ "logging":
+ {
+ "level":"info",
+ "console":"true",
+ "file":
+ {
+ "enable":"false",
+ "path":"/tmp/vsomeip.log"
+ },
+ "dlt":"false"
+ },
+ "applications" :
+ [
+ {
+ "name" : "pending_subscription_test_service",
+ "id" : "0xCAFE",
+ "max_dispatch_time" : "1000"
+ }
+ ],
+ "services":
+ [
+ {
+ "service":"0x1122",
+ "instance":"0x0001",
+ "unreliable":"30001",
+ "reliable":
+ {
+ "port":"40001",
+ "enable-magic-cookies":"false"
+ },
+ "events" :
+ [
+ {
+ "event" : "0x1111",
+ "is_reliable" : "false"
+ },
+ {
+ "event" : "0x1112",
+ "is_reliable" : "false"
+ }
+ ]
+ }
+ ],
+ "routing":"routingmanagerd",
+ "service-discovery":
+ {
+ "enable":"true",
+ "multicast":"224.0.23.1",
+ "port":"30490",
+ "protocol":"udp",
+ "cyclic_offer_delay" : "1000"
+ }
+} \ No newline at end of file
diff --git a/test/network_tests/pending_subscription_tests/conf/pending_subscription_test_master_starter.sh.in b/test/network_tests/pending_subscription_tests/conf/pending_subscription_test_master_starter.sh.in
new file mode 100755
index 0000000..dc4b1cd
--- /dev/null
+++ b/test/network_tests/pending_subscription_tests/conf/pending_subscription_test_master_starter.sh.in
@@ -0,0 +1,72 @@
+#!/bin/bash
+# Copyright (C) 2015-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+# Purpose: This script is needed to start the services with
+# one command. This is necessary as ctest - which is used to run the
+# tests - isn't able to start multiple binaries for one testcase. Therefore
+# the testcase simply executes this script. This script then runs the services
+# and checks that all exit successfully.
+
+FAIL=0
+
+if [ $# -lt 1 ]
+then
+ echo "Please pass a test mode to this script."
+ echo "For example: $0 SUSCRIBE"
+ echo "Valid subscription types include:"
+ echo " [SUBSCRIBE, SUBSCRIBE_UNSUBSCRIBE, UNSUBSCRIBE, SUBSCRIBE_UNSUBSCRIBE_NACK, SUBSCRIBE_UNSUBSCRIBE_SAME_PORT, SUBSCRIBE_RESUBSCRIBE_MIXED, SUBSCRIBE_STOPSUBSCRIBE_SUBSCRIBE, REQUEST_TO_SD]"
+ exit 1
+fi
+TESTMODE=$1
+export VSOMEIP_CONFIGURATION=pending_subscription_test_master.json
+# start daemon
+../../examples/routingmanagerd/./routingmanagerd &
+PID_VSOMEIPD=$!
+# Start the services
+./pending_subscription_test_service $1 &
+PID_SERIVCE=$!
+
+sleep 1
+
+if [ ! -z "$USE_LXC_TEST" ]; then
+ echo "Waiting for 5s"
+ sleep 5
+ echo "starting offer test on slave LXC offer_test_external_slave_starter.sh"
+ ssh -tt -i $SANDBOX_ROOT_DIR/commonapi_main/lxc-config/.ssh/mgc_lxc/rsa_key_file.pub -o StrictHostKeyChecking=no root@$LXC_TEST_SLAVE_IP "bash -ci \"set -m; cd \\\$SANDBOX_TARGET_DIR/vsomeip_lib/test/network_tests; ./pending_subscription_test_sd_msg_sender @TEST_IP_MASTER@ @TEST_IP_SLAVE@ $TESTMODE\"" &
+ echo "remote ssh pid: $!"
+elif [ ! -z "$USE_DOCKER" ]; then
+ echo "Waiting for 5s"
+ sleep 5
+ docker exec $DOCKER_IMAGE sh -c "cd $DOCKER_TESTS && ./pending_subscription_test_sd_msg_sender @TEST_IP_MASTER@ @TEST_IP_SLAVE@ $TESTMODE" &
+else
+cat <<End-of-message
+*******************************************************************************
+*******************************************************************************
+** Please now run:
+** pending_subscription_test_sd_msg_sender @TEST_IP_MASTER@ @TEST_IP_SLAVE@ $TESTMODE
+** from an external host to successfully complete this test.
+**
+** You probably will need to adapt the 'unicast' settings in
+** pending_subscription_test_master.json to your personal setup.
+*******************************************************************************
+*******************************************************************************
+End-of-message
+fi
+
+# Wait until all clients and services are finished
+for job in $PID_SERIVCE
+do
+ # Fail gets incremented if a client exits with a non-zero exit code
+ echo "waiting for $job"
+ wait $job || FAIL=$(($FAIL+1))
+done
+
+# kill the services
+kill $PID_VSOMEIPD
+sleep 1
+
+# Check if everything went well
+exit $FAIL
diff --git a/test/network_tests/pending_subscription_tests/pending_subscription_test_globals.hpp b/test/network_tests/pending_subscription_tests/pending_subscription_test_globals.hpp
new file mode 100644
index 0000000..59ad88e
--- /dev/null
+++ b/test/network_tests/pending_subscription_tests/pending_subscription_test_globals.hpp
@@ -0,0 +1,36 @@
+// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#ifndef PENDING_SUBSCRIPTION_TEST_GLOBALS_HPP_
+#define PENDING_SUBSCRIPTION_TEST_GLOBALS_HPP_
+
+namespace pending_subscription_test {
+
+struct service_info {
+ vsomeip::service_t service_id;
+ vsomeip::instance_t instance_id;
+ vsomeip::method_t method_id;
+ vsomeip::event_t event_id;
+ vsomeip::eventgroup_t eventgroup_id;
+ vsomeip::method_t shutdown_method_id;
+ vsomeip::method_t notify_method_id;
+};
+
+struct service_info service = { 0x1122, 0x1, 0x1111, 0x1111, 0x1000, 0x1404, 0x4242 };
+
+enum test_mode_e {
+ SUBSCRIBE,
+ SUBSCRIBE_UNSUBSCRIBE,
+ UNSUBSCRIBE,
+ SUBSCRIBE_UNSUBSCRIBE_NACK,
+ SUBSCRIBE_UNSUBSCRIBE_SAME_PORT,
+ SUBSCRIBE_RESUBSCRIBE_MIXED,
+ SUBSCRIBE_STOPSUBSCRIBE_SUBSCRIBE,
+ REQUEST_TO_SD
+};
+
+}
+
+#endif /* PENDING_SUBSCRIPTION_TEST_GLOBALS_HPP_ */
diff --git a/test/network_tests/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp b/test/network_tests/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp
new file mode 100644
index 0000000..75fbd1f
--- /dev/null
+++ b/test/network_tests/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp
@@ -0,0 +1,1735 @@
+// Copyright (C) 2015-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <iostream>
+#include <memory>
+#include <thread>
+#include <chrono>
+#include <cstring>
+#include <future>
+
+#include <gtest/gtest.h>
+
+#include <boost/asio.hpp>
+
+#include <vsomeip/vsomeip.hpp>
+
+#include "../../implementation/utility/include/byteorder.hpp"
+#include "../../implementation/message/include/deserializer.hpp"
+#include "../../implementation/service_discovery/include/service_discovery.hpp"
+#include "../../implementation/service_discovery/include/message_impl.hpp"
+#include "../../implementation/service_discovery/include/constants.hpp"
+#include "../../implementation/service_discovery/include/enumeration_types.hpp"
+#include "../../implementation/service_discovery/include/eventgroupentry_impl.hpp"
+#include "../../implementation/message/include/message_impl.hpp"
+#include "pending_subscription_test_globals.hpp"
+
+static char* remote_address;
+static char* local_address;
+
+class pending_subscription : public ::testing::Test {
+public:
+ pending_subscription() :
+ work_(std::make_shared<boost::asio::io_context::work>(io_)),
+ io_thread_(std::bind(&pending_subscription::io_run, this)) {}
+protected:
+
+ void TearDown() {
+ work_.reset();
+ io_thread_.join();
+ io_.stop();
+ }
+
+ void io_run() {
+ io_.run();
+ }
+
+ boost::asio::io_context io_;
+ std::shared_ptr<boost::asio::io_context::work> work_;
+ std::thread io_thread_;
+};
+
+/*
+ * @test Send 16 subscriptions to the service and check that every
+ * subscription is answered with an SubscribeEventgroupAck entry by the service.
+ * Check that the subscription is active at the end of the test and check that
+ * the notifications send by the service receive the client
+ */
+TEST_F(pending_subscription, send_multiple_subscriptions)
+{
+ std::promise<bool> trigger_notifications;
+
+ boost::asio::ip::udp::socket udp_socket(io_,
+ boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 30490));
+ udp_socket.set_option(boost::asio::socket_base::reuse_address(true));
+ udp_socket.set_option(boost::asio::socket_base::linger(true, 0));
+
+ std::thread receive_thread([&](){
+ bool keep_receiving(true);
+ std::vector<std::uint8_t> receive_buffer(4096);
+ std::vector<vsomeip::event_t> its_received_events;
+ std::uint32_t subscribe_acks_receiveid = 0;
+ std::uint32_t events_received = 0;
+
+ while (keep_receiving) {
+ boost::system::error_code error;
+ std::size_t bytes_transferred = udp_socket.receive(
+ boost::asio::buffer(receive_buffer, receive_buffer.capacity()), 0, error);
+ if (error) {
+ keep_receiving = false;
+ ADD_FAILURE() << __func__ << " error: " << error.message();
+ } else {
+
+ std::uint32_t its_pos = 0;
+
+ while (bytes_transferred > 0) {
+ const std::uint32_t its_message_size = VSOMEIP_BYTES_TO_LONG(
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 1],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 2],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 3]) + VSOMEIP_SOMEIP_HEADER_SIZE;
+ vsomeip::deserializer its_deserializer(&receive_buffer[its_pos], its_message_size, 0);
+
+ vsomeip::service_t its_service = VSOMEIP_BYTES_TO_WORD(receive_buffer[its_pos + VSOMEIP_SERVICE_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_SERVICE_POS_MAX]);
+ vsomeip::method_t its_method = VSOMEIP_BYTES_TO_WORD(receive_buffer[its_pos + VSOMEIP_METHOD_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_METHOD_POS_MAX]);
+ its_pos += its_message_size;
+ bytes_transferred -= its_message_size;
+
+ if (its_service == vsomeip::sd::service && its_method == vsomeip::sd::method) {
+ vsomeip::sd::message_impl sd_msg;
+ EXPECT_TRUE(sd_msg.deserialize(&its_deserializer));
+ EXPECT_EQ(2u, sd_msg.get_entries().size());
+ for (const auto& e : sd_msg.get_entries()) {
+ EXPECT_TRUE(e->is_eventgroup_entry());
+ EXPECT_EQ(vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK, e->get_type());
+ EXPECT_EQ(3u, e->get_ttl());
+ EXPECT_EQ(pending_subscription_test::service.service_id, e->get_service());
+ EXPECT_EQ(pending_subscription_test::service.instance_id, e->get_instance());
+ if (e->get_type() == vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK) {
+ std::shared_ptr<vsomeip::sd::eventgroupentry_impl> its_casted_entry =
+ std::static_pointer_cast<vsomeip::sd::eventgroupentry_impl>(e);
+ EXPECT_TRUE(its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id ||
+ its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id+1);
+ subscribe_acks_receiveid++;
+ }
+ }
+ EXPECT_EQ(0u, sd_msg.get_options().size());
+ } else { // non-sd-message
+ vsomeip::message_impl msg;
+ EXPECT_TRUE(msg.deserialize(&its_deserializer));
+ if (msg.get_message_type() == vsomeip::message_type_e::MT_RESPONSE) {
+ EXPECT_EQ(vsomeip::message_type_e::MT_RESPONSE, msg.get_message_type());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(pending_subscription_test::service.shutdown_method_id, msg.get_method());
+ EXPECT_EQ(0x2222, msg.get_client());
+ } else if (msg.get_message_type() == vsomeip::message_type_e::MT_NOTIFICATION) {
+ its_received_events.push_back(msg.get_method());
+ if (its_received_events.size() == 2) {
+ EXPECT_EQ(pending_subscription_test::service.event_id, its_received_events[0]);
+ EXPECT_EQ(static_cast<vsomeip::event_t>(pending_subscription_test::service.event_id + 1u), its_received_events[1]);
+ events_received = 2;
+ }
+ EXPECT_EQ(1u, msg.get_payload()->get_length());
+ EXPECT_EQ(0xDD, *msg.get_payload()->get_data());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(0x0, msg.get_client());
+ }
+ }
+ }
+ if (subscribe_acks_receiveid == 30) { // all subscribeAcks received
+ trigger_notifications.set_value(true);
+ subscribe_acks_receiveid++; // don't set promise value again
+ }
+ if (its_received_events.size() == 2 && events_received == 2) {
+ // all events received as well
+ keep_receiving = false;
+ }
+ }
+ }
+ });
+
+ std::thread send_thread([&]() {
+ try {
+ std::uint8_t its_subscribe_message[] = {
+ 0xff, 0xff, 0x81, 0x00,
+ 0x00, 0x00, 0x00, 0x40, // length
+ 0x00, 0x00, 0x00, 0x01,
+ 0x01, 0x01, 0x02, 0x00,
+ 0xc0, 0x00, 0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x20, // length entries array
+ 0x06, 0x00, 0x00, 0x10,
+ 0x11, 0x22, 0x00, 0x01, // service / instance
+ 0x00, 0x00, 0x00, 0x03,
+ 0x00, 0x00, 0x10, 0x00, // eventgroup
+ 0x06, 0x00, 0x00, 0x10,
+ 0x11, 0x22, 0x00, 0x01, // service / instance
+ 0x00, 0x00, 0x00, 0x03,
+ 0x00, 0x00, 0x10, 0x01, // eventgroup 2
+ 0x00, 0x00, 0x00, 0x0c, // length options array
+ 0x00, 0x09, 0x04, 0x00,
+ 0xff, 0xff, 0xff, 0xff, // ip address
+ 0x00, 0x11, 0x77, 0x1a
+ };
+ boost::asio::ip::address its_local_address =
+ boost::asio::ip::address::from_string(std::string(local_address));
+ std::memcpy(&its_subscribe_message[64], &its_local_address.to_v4().to_bytes()[0], 4);
+
+ boost::asio::ip::udp::socket::endpoint_type target_sd(
+ boost::asio::ip::address::from_string(std::string(remote_address)),
+ 30490);
+ for (int var = 0; var < 15; ++var) {
+ udp_socket.send_to(boost::asio::buffer(its_subscribe_message), target_sd);
+ ++its_subscribe_message[11];
+ }
+
+
+ if (std::future_status::timeout == trigger_notifications.get_future().wait_for(std::chrono::seconds(10))) {
+ ADD_FAILURE() << "Didn't receive all SubscribeAcks within time";
+ } else {
+ // call notify method
+ std::uint8_t trigger_notifications_call[] = {
+ 0x11, 0x22, 0x42, 0x42,
+ 0x00, 0x00, 0x00, 0x08,
+ 0x22, 0x22, 0x00, 0x01,
+ 0x01, 0x00, 0x01, 0x00 };
+ boost::asio::ip::udp::socket::endpoint_type target_service(
+ boost::asio::ip::address::from_string(std::string(remote_address)),
+ 30001);
+ udp_socket.send_to(boost::asio::buffer(trigger_notifications_call), target_service);
+ }
+
+ // call shutdown method
+ std::uint8_t shutdown_call[] = {
+ 0x11, 0x22, 0x14, 0x04,
+ 0x00, 0x00, 0x00, 0x08,
+ 0x22, 0x22, 0x00, 0x01,
+ 0x01, 0x00, 0x00, 0x00 };
+ boost::asio::ip::udp::socket::endpoint_type target_service(
+ boost::asio::ip::address::from_string(std::string(remote_address)),
+ 30001);
+ udp_socket.send_to(boost::asio::buffer(shutdown_call), target_service);
+ } catch (...) {
+ ASSERT_FALSE(true);
+ }
+
+ });
+
+ send_thread.join();
+ receive_thread.join();
+ boost::system::error_code ec;
+ udp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec);
+ udp_socket.close(ec);
+}
+
+/*
+ * @test Send 16 subscriptions to the service while alternating between Subscribe
+ * and Unsubscribe and check that every SubscribeEventgroupEntry (ttl > 0)
+ * is answered with an SubscribeEventgroupAck entry by the service.
+ * Check that the subscription is active at the end of the test and check that
+ * the notifications send by the service receive the client
+ */
+TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe)
+{
+ std::promise<bool> trigger_notifications;
+
+ boost::asio::ip::udp::socket udp_socket(io_,
+ boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 30490));
+ udp_socket.set_option(boost::asio::socket_base::reuse_address(true));
+ udp_socket.set_option(boost::asio::socket_base::linger(true, 0));
+
+ std::thread receive_thread([&](){
+ const std::uint32_t expected_acks(8);
+ std::atomic<std::uint32_t> acks_received(0);
+
+ const std::uint32_t expected_responses(1);
+ std::atomic<std::uint32_t> responses_received(0);
+
+ const std::uint32_t expected_notifications(2);
+ std::atomic<std::uint32_t> notifications_received(0);
+
+ bool triggered_notifications(false);
+
+ std::vector<std::uint8_t> receive_buffer(4096);
+ std::vector<vsomeip::event_t> its_received_events;
+
+ bool keep_receiving(true);
+
+ while (keep_receiving) {
+ boost::system::error_code error;
+ std::size_t bytes_transferred = udp_socket.receive(
+ boost::asio::buffer(receive_buffer, receive_buffer.capacity()), 0, error);
+ if (error) {
+ keep_receiving = false;
+ acks_received = expected_acks;
+ responses_received = expected_responses;
+ ADD_FAILURE() << __func__ << " error: " << error.message();
+ } else {
+ #if 0
+ std::stringstream str;
+ for (size_t i = 0; i < bytes_transferred; i++) {
+ str << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(receive_buffer[i]) << " ";
+ }
+ std::cout << __func__ << " received: " << std::dec << bytes_transferred << " bytes: " << str.str() << std::endl;
+ #endif
+ std::uint32_t its_pos = 0;
+
+ while (bytes_transferred > 0) {
+ const std::uint32_t its_message_size = VSOMEIP_BYTES_TO_LONG(
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 1],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 2],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 3]) + VSOMEIP_SOMEIP_HEADER_SIZE;
+ vsomeip::deserializer its_deserializer(&receive_buffer[its_pos], its_message_size, 0);
+
+ vsomeip::service_t its_service = VSOMEIP_BYTES_TO_WORD(receive_buffer[its_pos + VSOMEIP_SERVICE_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_SERVICE_POS_MAX]);
+ vsomeip::method_t its_method = VSOMEIP_BYTES_TO_WORD(receive_buffer[its_pos + VSOMEIP_METHOD_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_METHOD_POS_MAX]);
+ its_pos += its_message_size;
+ bytes_transferred -= its_message_size;
+
+ if (its_service == vsomeip::sd::service && its_method == vsomeip::sd::method) {
+ vsomeip::sd::message_impl sd_msg;
+ EXPECT_TRUE(sd_msg.deserialize(&its_deserializer));
+ EXPECT_EQ(2u, sd_msg.get_entries().size());
+ for (const auto& e : sd_msg.get_entries()) {
+ EXPECT_TRUE(e->is_eventgroup_entry());
+ EXPECT_EQ(vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK, e->get_type());
+ EXPECT_EQ(16u, e->get_ttl());
+ EXPECT_EQ(pending_subscription_test::service.service_id, e->get_service());
+ EXPECT_EQ(pending_subscription_test::service.instance_id, e->get_instance());
+ if (e->get_type() == vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK) {
+ std::shared_ptr<vsomeip::sd::eventgroupentry_impl> its_casted_entry =
+ std::static_pointer_cast<vsomeip::sd::eventgroupentry_impl>(e);
+ EXPECT_TRUE(its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id ||
+ its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id+1);
+ }
+ }
+ EXPECT_EQ(0u, sd_msg.get_options().size());
+ acks_received++;
+ } else { // non-sd-message
+ vsomeip::message_impl msg;
+ EXPECT_TRUE(msg.deserialize(&its_deserializer));
+ if (msg.get_message_type() == vsomeip::message_type_e::MT_RESPONSE) {
+ EXPECT_EQ(vsomeip::message_type_e::MT_RESPONSE, msg.get_message_type());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(pending_subscription_test::service.shutdown_method_id, msg.get_method());
+ EXPECT_EQ(0x2222, msg.get_client());
+ responses_received++;
+ } else if (msg.get_message_type() == vsomeip::message_type_e::MT_NOTIFICATION) {
+ its_received_events.push_back(msg.get_method());
+ if (its_received_events.size() == 2) {
+ EXPECT_EQ(pending_subscription_test::service.event_id, its_received_events[0]);
+ EXPECT_EQ(static_cast<vsomeip::event_t>(pending_subscription_test::service.event_id + 1u), its_received_events[1]);
+ }
+ EXPECT_EQ(1u, msg.get_payload()->get_length());
+ EXPECT_EQ(0xDD, *msg.get_payload()->get_data());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(0x0, msg.get_client());
+ notifications_received++;
+ }
+ }
+
+ if (!triggered_notifications && acks_received == expected_acks) { // all subscribeAcks received
+ trigger_notifications.set_value(true);
+ triggered_notifications = true;
+ }
+ }
+ }
+ if (acks_received == expected_acks &&
+ responses_received == expected_responses &&
+ notifications_received == expected_notifications) {
+ keep_receiving = false;
+ }
+ }
+
+
+ EXPECT_EQ(expected_acks, acks_received);
+ EXPECT_EQ(expected_responses, responses_received);
+ EXPECT_EQ(expected_notifications, notifications_received);
+ });
+
+ std::thread send_thread([&]() {
+ try {
+ std::uint8_t its_subscribe_message[] = {
+ 0xff, 0xff, 0x81, 0x00,
+ 0x00, 0x00, 0x00, 0x40, // length
+ 0x00, 0x00, 0x00, 0x01,
+ 0x01, 0x01, 0x02, 0x00,
+ 0xc0, 0x00, 0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x20, // length entries array
+ 0x06, 0x00, 0x00, 0x10,
+ 0x11, 0x22, 0x00, 0x01, // service / instance
+ 0x00, 0x00, 0x00, 0x10, // 16 seconds TTL
+ 0x00, 0x00, 0x10, 0x00, // eventgroup
+ 0x06, 0x00, 0x00, 0x10,
+ 0x11, 0x22, 0x00, 0x01, // service / instance
+ 0x00, 0x00, 0x00, 0x10, // 16 seconds TTL
+ 0x00, 0x00, 0x10, 0x01, // eventgroup 2
+ 0x00, 0x00, 0x00, 0x0c, // length options array
+ 0x00, 0x09, 0x04, 0x00,
+ 0xff, 0xff, 0xff, 0xff, // ip address
+ 0x00, 0x11, 0x77, 0x1a
+ };
+
+ boost::asio::ip::address its_local_address =
+ boost::asio::ip::address::from_string(std::string(local_address));
+ std::memcpy(&its_subscribe_message[64], &its_local_address.to_v4().to_bytes()[0], 4);
+
+ boost::asio::ip::udp::socket::endpoint_type target_sd(
+ boost::asio::ip::address::from_string(std::string(remote_address)),
+ 30490);
+ for (int var = 0; var < 15; ++var) {
+ udp_socket.send_to(boost::asio::buffer(its_subscribe_message), target_sd);
+ ++its_subscribe_message[11];
+ if (its_subscribe_message[11] % 2) {
+ its_subscribe_message[35] = 16;
+ its_subscribe_message[51] = 16;
+ } else {
+ its_subscribe_message[35] = 0;
+ its_subscribe_message[51] = 0;
+ }
+ }
+
+ if (std::future_status::timeout == trigger_notifications.get_future().wait_for(std::chrono::seconds(10))) {
+ ADD_FAILURE() << "Didn't receive all SubscribeAcks within time";
+ } else {
+ // call notify method
+ std::uint8_t trigger_notifications_call[] = {
+ 0x11, 0x22, 0x42, 0x42,
+ 0x00, 0x00, 0x00, 0x08,
+ 0x22, 0x22, 0x00, 0x01,
+ 0x01, 0x00, 0x01, 0x00 };
+ boost::asio::ip::udp::socket::endpoint_type target_service(
+ boost::asio::ip::address::from_string(std::string(remote_address)),
+ 30001);
+ udp_socket.send_to(boost::asio::buffer(trigger_notifications_call), target_service);
+ }
+
+ // call shutdown method
+ std::uint8_t shutdown_call[] = {
+ 0x11, 0x22, 0x14, 0x04,
+ 0x00, 0x00, 0x00, 0x08,
+ 0x22, 0x22, 0x00, 0x01,
+ 0x01, 0x00, 0x00, 0x00 };
+ boost::asio::ip::udp::socket::endpoint_type target_service(
+ boost::asio::ip::address::from_string(std::string(remote_address)),
+ 30001);
+ udp_socket.send_to(boost::asio::buffer(shutdown_call), target_service);
+ } catch (...) {
+ ASSERT_FALSE(true);
+ }
+
+ });
+
+ send_thread.join();
+ receive_thread.join();
+ boost::system::error_code ec;
+ udp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec);
+ udp_socket.close(ec);
+}
+
+/*
+ * @test Send 16 subscriptions to the service while only two contain a
+ * SubscribeEventgroupEntry and the rest contain StopSubscribeEventgroupEntries
+ * and check that all subscriptions with SubscribeEventgroupEntries are
+ * answered with an SubscribeEventgroupAck entry by the service.
+ * Check that the subscription is active at the end of the test and check that
+ * the notifications send by the service receive the client
+ */
+TEST_F(pending_subscription, send_multiple_unsubscriptions)
+{
+ std::promise<bool> trigger_notifications;
+
+ boost::asio::ip::udp::socket udp_socket(io_,
+ boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 30490));
+ udp_socket.set_option(boost::asio::socket_base::reuse_address(true));
+ udp_socket.set_option(boost::asio::socket_base::linger(true, 0));
+
+ std::thread receive_thread([&](){
+ const std::uint32_t expected_acks(2);
+ std::atomic<std::uint32_t> acks_received(0);
+
+ const std::uint32_t expected_responses(1);
+ std::atomic<std::uint32_t> responses_received(0);
+
+ const std::uint32_t expected_notifications(2);
+ std::atomic<std::uint32_t> notifications_received(0);
+
+ bool triggered_notifications(false);
+
+ std::vector<std::uint8_t> receive_buffer(4096);
+ std::vector<vsomeip::event_t> its_received_events;
+
+ bool keep_receiving(true);
+
+ while (keep_receiving) {
+ boost::system::error_code error;
+ std::size_t bytes_transferred = udp_socket.receive(
+ boost::asio::buffer(receive_buffer, receive_buffer.capacity()), 0, error);
+ if (error) {
+ keep_receiving = false;
+ acks_received = expected_acks;
+ responses_received = expected_responses;
+ ADD_FAILURE() << __func__ << " error: " << error.message();
+ } else {
+ #if 0
+ std::stringstream str;
+ for (size_t i = 0; i < bytes_transferred; i++) {
+ str << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(receive_buffer[i]) << " ";
+ }
+ std::cout << __func__ << " received: " << std::dec << bytes_transferred << " bytes: " << str.str() << std::endl;
+ #endif
+ std::uint32_t its_pos = 0;
+ while (bytes_transferred > 0) {
+ const std::uint32_t its_message_size = VSOMEIP_BYTES_TO_LONG(
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 1],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 2],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 3]) + VSOMEIP_SOMEIP_HEADER_SIZE;
+ vsomeip::deserializer its_deserializer(&receive_buffer[its_pos], its_message_size, 0);
+ vsomeip::service_t its_service = VSOMEIP_BYTES_TO_WORD(receive_buffer[its_pos + VSOMEIP_SERVICE_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_SERVICE_POS_MAX]);
+ vsomeip::method_t its_method = VSOMEIP_BYTES_TO_WORD(receive_buffer[its_pos + VSOMEIP_METHOD_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_METHOD_POS_MAX]);
+ its_pos += its_message_size;
+ bytes_transferred -= its_message_size;
+
+ if (its_service == vsomeip::sd::service && its_method == vsomeip::sd::method) {
+ vsomeip::sd::message_impl sd_msg;
+ EXPECT_TRUE(sd_msg.deserialize(&its_deserializer));
+ EXPECT_EQ(2u, sd_msg.get_entries().size());
+ for (const auto& e : sd_msg.get_entries()) {
+ EXPECT_TRUE(e->is_eventgroup_entry());
+ EXPECT_EQ(vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK, e->get_type());
+ EXPECT_EQ(16u, e->get_ttl());
+ EXPECT_EQ(pending_subscription_test::service.service_id, e->get_service());
+ EXPECT_EQ(pending_subscription_test::service.instance_id, e->get_instance());
+ if (e->get_type() == vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK) {
+ std::shared_ptr<vsomeip::sd::eventgroupentry_impl> its_casted_entry =
+ std::static_pointer_cast<vsomeip::sd::eventgroupentry_impl>(e);
+ EXPECT_TRUE(its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id ||
+ its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id+1);
+ }
+ }
+ EXPECT_EQ(0u, sd_msg.get_options().size());
+ acks_received++;
+ } else { // non-sd-message
+ vsomeip::message_impl msg;
+ EXPECT_TRUE(msg.deserialize(&its_deserializer));
+ if (msg.get_message_type() == vsomeip::message_type_e::MT_RESPONSE) {
+ EXPECT_EQ(vsomeip::message_type_e::MT_RESPONSE, msg.get_message_type());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(pending_subscription_test::service.shutdown_method_id, msg.get_method());
+ EXPECT_EQ(0x2222, msg.get_client());
+ responses_received++;
+ } else if (msg.get_message_type() == vsomeip::message_type_e::MT_NOTIFICATION) {
+ its_received_events.push_back(msg.get_method());
+ if (its_received_events.size() == 2) {
+ EXPECT_EQ(pending_subscription_test::service.event_id, its_received_events[0]);
+ EXPECT_EQ(static_cast<vsomeip::event_t>(pending_subscription_test::service.event_id + 1u), its_received_events[1]);
+ }
+ EXPECT_EQ(1u, msg.get_payload()->get_length());
+ EXPECT_EQ(0xDD, *msg.get_payload()->get_data());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(0x0, msg.get_client());
+ notifications_received++;
+ }
+ }
+ }
+ if (!triggered_notifications && acks_received == expected_acks) { // all subscribeAcks received
+ trigger_notifications.set_value(true);
+ triggered_notifications = true;
+ }
+ }
+ if (acks_received == expected_acks &&
+ responses_received == expected_responses &&
+ notifications_received == expected_notifications) {
+ std::cerr << "every thing received" << std::endl;
+ keep_receiving = false;
+ }
+ }
+
+ EXPECT_EQ(expected_acks, acks_received);
+ EXPECT_EQ(expected_responses, responses_received);
+ EXPECT_EQ(expected_notifications, notifications_received);
+ });
+
+ std::thread send_thread([&]() {
+ try {
+ std::uint8_t its_subscribe_message[] = {
+ 0xff, 0xff, 0x81, 0x00,
+ 0x00, 0x00, 0x00, 0x40, // length
+ 0x00, 0x00, 0x00, 0x01,
+ 0x01, 0x01, 0x02, 0x00,
+ 0xc0, 0x00, 0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x20, // length entries array
+ 0x06, 0x00, 0x00, 0x10,
+ 0x11, 0x22, 0x00, 0x01, // service / instance
+ 0x00, 0x00, 0x00, 0x10, // 16 seconds TTL
+ 0x00, 0x00, 0x10, 0x00, // eventgroup
+ 0x06, 0x00, 0x00, 0x10,
+ 0x11, 0x22, 0x00, 0x01, // service / instance
+ 0x00, 0x00, 0x00, 0x10, // 16 seconds TTL
+ 0x00, 0x00, 0x10, 0x01, // eventgroup 2
+ 0x00, 0x00, 0x00, 0x0c, // length options array
+ 0x00, 0x09, 0x04, 0x00,
+ 0xff, 0xff, 0xff, 0xff, // ip address
+ 0x00, 0x11, 0x77, 0x1a
+ };
+
+ boost::asio::ip::address its_local_address =
+ boost::asio::ip::address::from_string(std::string(local_address));
+ std::memcpy(&its_subscribe_message[64], &its_local_address.to_v4().to_bytes()[0], 4);
+
+ boost::asio::ip::udp::socket::endpoint_type target_sd(
+ boost::asio::ip::address::from_string(std::string(remote_address)),
+ 30490);
+ for (int var = 0; var < 15; ++var) {
+ if (its_subscribe_message[11] == 15 || its_subscribe_message[11] == 0x1) {
+ its_subscribe_message[35] = 16;
+ its_subscribe_message[51] = 16;
+ } else {
+ its_subscribe_message[35] = 0;
+ its_subscribe_message[51] = 0;
+ }
+ udp_socket.send_to(boost::asio::buffer(its_subscribe_message), target_sd);
+ ++its_subscribe_message[11];
+ }
+
+ if (std::future_status::timeout == trigger_notifications.get_future().wait_for(std::chrono::seconds(10))) {
+ ADD_FAILURE() << "Didn't receive all SubscribeAcks within time";
+ } else {
+ // call notify method
+ std::uint8_t trigger_notifications_call[] = {
+ 0x11, 0x22, 0x42, 0x42,
+ 0x00, 0x00, 0x00, 0x08,
+ 0x22, 0x22, 0x00, 0x01,
+ 0x01, 0x00, 0x01, 0x00 };
+ boost::asio::ip::udp::socket::endpoint_type target_service(
+ boost::asio::ip::address::from_string(std::string(remote_address)),
+ 30001);
+ udp_socket.send_to(boost::asio::buffer(trigger_notifications_call), target_service);
+ }
+
+ // call shutdown method
+ std::uint8_t shutdown_call[] = {
+ 0x11, 0x22, 0x14, 0x04,
+ 0x00, 0x00, 0x00, 0x08,
+ 0x22, 0x22, 0x00, 0x01,
+ 0x01, 0x00, 0x00, 0x00 };
+ boost::asio::ip::udp::socket::endpoint_type target_service(
+ boost::asio::ip::address::from_string(std::string(remote_address)),
+ 30001);
+ udp_socket.send_to(boost::asio::buffer(shutdown_call), target_service);
+ } catch (...) {
+ ASSERT_FALSE(true);
+ }
+
+ });
+
+ send_thread.join();
+ receive_thread.join();
+ boost::system::error_code ec;
+ udp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec);
+ udp_socket.close(ec);
+}
+
+/*
+ * @test Send 16 subscriptions to the service and check that every second
+ * subscription is answered with an SubscribeEventgroupNack entry by the service.
+ * Check that the subscription is active at the end of the test and check that
+ * the notifications send by the service receive the client
+ */
+TEST_F(pending_subscription, send_alternating_subscribe_nack_unsubscribe)
+{
+ std::promise<bool> trigger_notifications;
+
+ boost::asio::ip::udp::socket udp_socket(io_,
+ boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 30490));
+ udp_socket.set_option(boost::asio::socket_base::reuse_address(true));
+ udp_socket.set_option(boost::asio::socket_base::linger(true, 0));
+
+ std::thread receive_thread([&](){
+ const std::uint32_t expected_acks(8);
+ std::atomic<std::uint32_t> acks_received(0);
+
+ const std::uint32_t expected_nacks(8);
+ std::atomic<std::uint32_t> nacks_received(0);
+
+ const std::uint32_t expected_responses(1);
+ std::atomic<std::uint32_t> responses_received(0);
+
+ const std::uint32_t expected_notifications(2);
+ std::atomic<std::uint32_t> notifications_received(0);
+
+ bool triggered_notifications(false);
+ bool keep_receiving(true);
+
+ std::vector<std::uint8_t> receive_buffer(4096);
+ std::vector<vsomeip::event_t> its_received_events;
+
+ while (keep_receiving) {
+ boost::system::error_code error;
+ std::size_t bytes_transferred = udp_socket.receive(
+ boost::asio::buffer(receive_buffer, receive_buffer.capacity()), 0, error);
+ if (error) {
+ keep_receiving = false;
+ acks_received = expected_acks;
+ responses_received = expected_responses;
+ nacks_received = expected_nacks;
+ ADD_FAILURE() << __func__ << " error: " << error.message();
+ } else {
+ #if 0
+ std::stringstream str;
+ for (size_t i = 0; i < bytes_transferred; i++) {
+ str << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(receive_buffer[i]) << " ";
+ }
+ std::cout << __func__ << " received: " << std::dec << bytes_transferred << " bytes: " << str.str() << std::endl;
+ #endif
+ std::uint32_t its_pos = 0;
+ while (bytes_transferred > 0) {
+ const std::uint32_t its_message_size = VSOMEIP_BYTES_TO_LONG(
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 1],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 2],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 3]) + VSOMEIP_SOMEIP_HEADER_SIZE;
+ vsomeip::deserializer its_deserializer(&receive_buffer[its_pos], its_message_size, 0);
+
+ vsomeip::service_t its_service = VSOMEIP_BYTES_TO_WORD(receive_buffer[its_pos + VSOMEIP_SERVICE_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_SERVICE_POS_MAX]);
+ vsomeip::method_t its_method = VSOMEIP_BYTES_TO_WORD(receive_buffer[its_pos + VSOMEIP_METHOD_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_METHOD_POS_MAX]);
+ its_pos += its_message_size;
+ bytes_transferred -= its_message_size;
+
+ if (its_service == vsomeip::sd::service && its_method == vsomeip::sd::method) {
+ vsomeip::sd::message_impl sd_msg;
+ EXPECT_TRUE(sd_msg.deserialize(&its_deserializer));
+ EXPECT_EQ(2u, sd_msg.get_entries().size());
+ for (const auto& e : sd_msg.get_entries()) {
+ EXPECT_TRUE(e->is_eventgroup_entry());
+ EXPECT_EQ(vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK, e->get_type());
+ if (e->get_ttl()) {
+ EXPECT_EQ(16u, e->get_ttl());
+ acks_received++;
+ } else {
+ EXPECT_EQ(0u, e->get_ttl());
+ nacks_received++;
+ }
+ EXPECT_EQ(pending_subscription_test::service.service_id, e->get_service());
+ EXPECT_EQ(pending_subscription_test::service.instance_id, e->get_instance());
+ if (e->get_type() == vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK) {
+ std::shared_ptr<vsomeip::sd::eventgroupentry_impl> its_casted_entry =
+ std::static_pointer_cast<vsomeip::sd::eventgroupentry_impl>(e);
+ EXPECT_TRUE(its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id ||
+ its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id+1);
+ }
+ }
+ EXPECT_EQ(0u, sd_msg.get_options().size());
+ } else { // non-sd-message
+ vsomeip::message_impl msg;
+ EXPECT_TRUE(msg.deserialize(&its_deserializer));
+ if (msg.get_message_type() == vsomeip::message_type_e::MT_RESPONSE) {
+ EXPECT_EQ(vsomeip::message_type_e::MT_RESPONSE, msg.get_message_type());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(pending_subscription_test::service.shutdown_method_id, msg.get_method());
+ EXPECT_EQ(0x2222, msg.get_client());
+ responses_received++;
+ } else if (msg.get_message_type() == vsomeip::message_type_e::MT_NOTIFICATION) {
+ its_received_events.push_back(msg.get_method());
+ if (its_received_events.size() == 2) {
+ EXPECT_EQ(pending_subscription_test::service.event_id, its_received_events[0]);
+ EXPECT_EQ(static_cast<vsomeip::event_t>(pending_subscription_test::service.event_id + 1u), its_received_events[1]);
+ }
+ EXPECT_EQ(1u, msg.get_payload()->get_length());
+ EXPECT_EQ(0xDD, *msg.get_payload()->get_data());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(0x0, msg.get_client());
+ notifications_received++;
+ }
+ }
+
+
+ if (!triggered_notifications && acks_received == expected_acks &&
+ nacks_received == expected_nacks) { // all subscribeAcks received
+ trigger_notifications.set_value(true);
+ triggered_notifications = true;
+ }
+ }
+ }
+ if (nacks_received == expected_nacks &&
+ acks_received == expected_acks &&
+ notifications_received == expected_notifications &&
+ responses_received == expected_responses) {
+ keep_receiving = false;
+ }
+ }
+
+ EXPECT_EQ(expected_acks, acks_received);
+ EXPECT_EQ(expected_nacks, nacks_received);
+ EXPECT_EQ(expected_responses, responses_received);
+ EXPECT_EQ(expected_notifications, notifications_received);
+ });
+
+ std::thread send_thread([&]() {
+ try {
+ std::uint8_t its_subscribe_message[] = {
+ 0xff, 0xff, 0x81, 0x00,
+ 0x00, 0x00, 0x00, 0x40, // length
+ 0x00, 0x00, 0x00, 0x01,
+ 0x01, 0x01, 0x02, 0x00,
+ 0xc0, 0x00, 0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x20, // length entries array
+ 0x06, 0x00, 0x00, 0x10,
+ 0x11, 0x22, 0x00, 0x01, // service / instance
+ 0x00, 0x00, 0x00, 0x10, // 16 seconds TTL
+ 0x00, 0x00, 0x10, 0x00, // eventgroup
+ 0x06, 0x00, 0x00, 0x10,
+ 0x11, 0x22, 0x00, 0x01, // service / instance
+ 0x00, 0x00, 0x00, 0x10, // 16 seconds TTL
+ 0x00, 0x00, 0x10, 0x01, // eventgroup 2
+ 0x00, 0x00, 0x00, 0x0c, // length options array
+ 0x00, 0x09, 0x04, 0x00,
+ 0xff, 0xff, 0xff, 0xff, // ip address
+ 0x00, 0x11, 0x77, 0x1a
+ };
+
+ boost::asio::ip::address its_local_address =
+ boost::asio::ip::address::from_string(std::string(local_address));
+ std::memcpy(&its_subscribe_message[64], &its_local_address.to_v4().to_bytes()[0], 4);
+
+ boost::asio::ip::udp::socket::endpoint_type target_sd(
+ boost::asio::ip::address::from_string(std::string(remote_address)),
+ 30490);
+ for (int var = 0; var < 15; ++var) {
+ udp_socket.send_to(boost::asio::buffer(its_subscribe_message), target_sd);
+ ++its_subscribe_message[11];
+ if (its_subscribe_message[11] % 2) {
+ its_subscribe_message[35] = 16;
+ its_subscribe_message[51] = 16;
+ } else {
+ its_subscribe_message[35] = 0;
+ its_subscribe_message[51] = 0;
+ }
+ }
+
+ if (std::future_status::timeout == trigger_notifications.get_future().wait_for(std::chrono::seconds(10))) {
+ ADD_FAILURE() << "Didn't receive all SubscribeAcks within time";
+ } else {
+ // call notify method
+ std::uint8_t trigger_notifications_call[] = {
+ 0x11, 0x22, 0x42, 0x42,
+ 0x00, 0x00, 0x00, 0x08,
+ 0x22, 0x22, 0x00, 0x01,
+ 0x01, 0x00, 0x01, 0x00 };
+ boost::asio::ip::udp::socket::endpoint_type target_service(
+ boost::asio::ip::address::from_string(std::string(remote_address)),
+ 30001);
+ udp_socket.send_to(boost::asio::buffer(trigger_notifications_call), target_service);
+ }
+
+ // call shutdown method
+ std::uint8_t shutdown_call[] = {
+ 0x11, 0x22, 0x14, 0x04,
+ 0x00, 0x00, 0x00, 0x08,
+ 0x22, 0x22, 0x00, 0x01,
+ 0x01, 0x00, 0x00, 0x00 };
+ boost::asio::ip::udp::socket::endpoint_type target_service(
+ boost::asio::ip::address::from_string(std::string(remote_address)),
+ 30001);
+ udp_socket.send_to(boost::asio::buffer(shutdown_call), target_service);
+ } catch (...) {
+ ASSERT_FALSE(true);
+ }
+
+ });
+
+ send_thread.join();
+ receive_thread.join();
+ boost::system::error_code ec;
+ udp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec);
+ udp_socket.close(ec);
+}
+
+/*
+ * @test Send 16 subscriptions containing an UDP and TCP endpoint option
+ * to the service while alternating between Subscribe
+ * and Unsubscribe and check that every SubscribeEventgroupEntry (ttl > 0)
+ * is answered with an SubscribeEventgroupAck entry by the service.
+ * Check that the subscription is active at the end of the test and check that
+ * the notifications send by the service receive the client
+ */
+TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe_same_port)
+{
+ std::promise<bool> trigger_notifications;
+ std::promise<void> tcp_connected;
+ boost::asio::ip::udp::socket udp_socket(io_,
+ boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 30490));
+ udp_socket.set_option(boost::asio::socket_base::reuse_address(true));
+ udp_socket.set_option(boost::asio::socket_base::linger(true, 0));
+ boost::asio::ip::tcp::socket tcp_socket(io_,
+ boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 30490));
+ tcp_socket.set_option(boost::asio::socket_base::reuse_address(true));
+ tcp_socket.set_option(boost::asio::socket_base::linger(true, 0));
+
+ std::thread receive_thread([&](){
+ const std::uint32_t expected_acks(8);
+ std::atomic<std::uint32_t> acks_received(0);
+
+ const std::uint32_t expected_responses(1);
+ std::atomic<std::uint32_t> responses_received(0);
+
+ const std::uint32_t expected_notifications(2);
+ std::atomic<std::uint32_t> notifications_received(0);
+
+ bool triggered_notifications(false);
+
+ std::vector<std::uint8_t> receive_buffer(4096);
+ std::vector<vsomeip::event_t> its_received_events;
+
+ boost::system::error_code ec;
+ tcp_socket.connect(boost::asio::ip::tcp::endpoint(
+ boost::asio::ip::address::from_string(remote_address), 40001), ec);
+ ASSERT_EQ(0, ec.value());
+ tcp_connected.set_value();
+
+ bool keep_receiving(true);
+
+ while (keep_receiving) {
+ boost::system::error_code error;
+ std::size_t bytes_transferred = udp_socket.receive(
+ boost::asio::buffer(receive_buffer, receive_buffer.capacity()), 0, error);
+ if (error) {
+ keep_receiving = false;
+ acks_received = expected_acks;
+ responses_received = expected_responses;
+ ADD_FAILURE() << __func__ << " error: " << error.message();
+ } else {
+
+ std::uint32_t its_pos = 0;
+
+ while (bytes_transferred > 0) {
+ const std::uint32_t its_message_size = VSOMEIP_BYTES_TO_LONG(
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 1],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 2],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 3]) + VSOMEIP_SOMEIP_HEADER_SIZE;
+ vsomeip::deserializer its_deserializer(&receive_buffer[its_pos], its_message_size, 0);
+
+ vsomeip::service_t its_service = VSOMEIP_BYTES_TO_WORD(receive_buffer[its_pos + VSOMEIP_SERVICE_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_SERVICE_POS_MAX]);
+ vsomeip::method_t its_method = VSOMEIP_BYTES_TO_WORD(receive_buffer[its_pos + VSOMEIP_METHOD_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_METHOD_POS_MAX]);
+ its_pos += its_message_size;
+ bytes_transferred -= its_message_size;
+
+ if (its_service == vsomeip::sd::service && its_method == vsomeip::sd::method) {
+ vsomeip::sd::message_impl sd_msg;
+ EXPECT_TRUE(sd_msg.deserialize(&its_deserializer));
+ EXPECT_EQ(2u, sd_msg.get_entries().size());
+ for (const auto& e : sd_msg.get_entries()) {
+ EXPECT_TRUE(e->is_eventgroup_entry());
+ EXPECT_EQ(vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK, e->get_type());
+ EXPECT_EQ(16u, e->get_ttl());
+ EXPECT_EQ(pending_subscription_test::service.service_id, e->get_service());
+ EXPECT_EQ(pending_subscription_test::service.instance_id, e->get_instance());
+ if (e->get_type() == vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK) {
+ std::shared_ptr<vsomeip::sd::eventgroupentry_impl> its_casted_entry =
+ std::static_pointer_cast<vsomeip::sd::eventgroupentry_impl>(e);
+ EXPECT_TRUE(its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id ||
+ its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id+1);
+ }
+ }
+ EXPECT_EQ(0u, sd_msg.get_options().size());
+ acks_received++;
+ } else { // non-sd-message
+ vsomeip::message_impl msg;
+ EXPECT_TRUE(msg.deserialize(&its_deserializer));
+ if (msg.get_message_type() == vsomeip::message_type_e::MT_RESPONSE) {
+ EXPECT_EQ(vsomeip::message_type_e::MT_RESPONSE, msg.get_message_type());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(pending_subscription_test::service.shutdown_method_id, msg.get_method());
+ EXPECT_EQ(0x2222, msg.get_client());
+ responses_received++;
+ } else if (msg.get_message_type() == vsomeip::message_type_e::MT_NOTIFICATION) {
+ its_received_events.push_back(msg.get_method());
+ if (its_received_events.size() == 2) {
+ EXPECT_EQ(pending_subscription_test::service.event_id, its_received_events[0]);
+ EXPECT_EQ(static_cast<vsomeip::event_t>(pending_subscription_test::service.event_id + 1u), its_received_events[1]);
+ }
+ EXPECT_EQ(1u, msg.get_payload()->get_length());
+ EXPECT_EQ(0xDD, *msg.get_payload()->get_data());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(0x0, msg.get_client());
+ notifications_received++;
+ }
+ }
+
+
+ if (!triggered_notifications && acks_received == expected_acks) { // all subscribeAcks received
+ trigger_notifications.set_value(true);
+ triggered_notifications = true;
+ }
+ }
+ }
+ if (acks_received == expected_acks &&
+ responses_received == expected_responses &&
+ notifications_received == expected_notifications) {
+ keep_receiving = false;
+ }
+ }
+
+
+ EXPECT_EQ(expected_acks, acks_received);
+ EXPECT_EQ(expected_responses, responses_received);
+ EXPECT_EQ(expected_notifications, notifications_received);
+ });
+
+ std::thread send_thread([&]() {
+ if (std::future_status::timeout == tcp_connected.get_future().wait_for(std::chrono::seconds(10))) {
+ ADD_FAILURE() << "Didn't establish tcp connection within time";
+ }
+
+ try {
+ std::uint8_t its_subscribe_message[] = {
+ 0xff, 0xff, 0x81, 0x00,
+ 0x00, 0x00, 0x00, 0x4C, // length
+ 0x00, 0x00, 0x00, 0x01,
+ 0x01, 0x01, 0x02, 0x00,
+ 0xc0, 0x00, 0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x20, // length entries array
+ 0x06, 0x00, 0x00, 0x20,
+ 0x11, 0x22, 0x00, 0x01, // service / instance
+ 0x00, 0x00, 0x00, 0x10, // 16 seconds TTL
+ 0x00, 0x00, 0x10, 0x00, // eventgroup
+ 0x06, 0x00, 0x00, 0x20,
+ 0x11, 0x22, 0x00, 0x01, // service / instance
+ 0x00, 0x00, 0x00, 0x10, // 16 seconds TTL
+ 0x00, 0x00, 0x10, 0x01, // eventgroup 2
+ 0x00, 0x00, 0x00, 0x18, // length options array
+ 0x00, 0x09, 0x04, 0x00,
+ 0xff, 0xff, 0xff, 0xff, // ip address
+ 0x00, 0x11, 0x77, 0x1a,
+ 0x00, 0x09, 0x04, 0x00,
+ 0xff, 0xff, 0xff, 0xff, // ip address
+ 0x00, 0x06, 0x77, 0x1a
+ };
+
+ boost::asio::ip::address its_local_address =
+ boost::asio::ip::address::from_string(std::string(local_address));
+ std::memcpy(&its_subscribe_message[64], &its_local_address.to_v4().to_bytes()[0], 4);
+ std::memcpy(&its_subscribe_message[76], &its_local_address.to_v4().to_bytes()[0], 4);
+
+ boost::asio::ip::udp::socket::endpoint_type target_sd(
+ boost::asio::ip::address::from_string(std::string(remote_address)),
+ 30490);
+ for (int var = 0; var < 15; ++var) {
+ udp_socket.send_to(boost::asio::buffer(its_subscribe_message), target_sd);
+ ++its_subscribe_message[11];
+ if (its_subscribe_message[11] % 2) {
+ its_subscribe_message[35] = 16;
+ its_subscribe_message[51] = 16;
+ } else {
+ its_subscribe_message[35] = 0;
+ its_subscribe_message[51] = 0;
+ }
+ }
+
+ if (std::future_status::timeout == trigger_notifications.get_future().wait_for(std::chrono::seconds(10))) {
+ ADD_FAILURE() << "Didn't receive all SubscribeAcks within time";
+ } else {
+ // call notify method
+ std::uint8_t trigger_notifications_call[] = {
+ 0x11, 0x22, 0x42, 0x42,
+ 0x00, 0x00, 0x00, 0x08,
+ 0x22, 0x22, 0x00, 0x01,
+ 0x01, 0x00, 0x01, 0x00 };
+ boost::asio::ip::udp::socket::endpoint_type target_service(
+ boost::asio::ip::address::from_string(std::string(remote_address)),
+ 30001);
+ udp_socket.send_to(boost::asio::buffer(trigger_notifications_call), target_service);
+ }
+
+ // call shutdown method
+ std::uint8_t shutdown_call[] = {
+ 0x11, 0x22, 0x14, 0x04,
+ 0x00, 0x00, 0x00, 0x08,
+ 0x22, 0x22, 0x00, 0x01,
+ 0x01, 0x00, 0x00, 0x00 };
+ boost::asio::ip::udp::socket::endpoint_type target_service(
+ boost::asio::ip::address::from_string(std::string(remote_address)),
+ 30001);
+ udp_socket.send_to(boost::asio::buffer(shutdown_call), target_service);
+ } catch (...) {
+ ASSERT_FALSE(true);
+ }
+
+ });
+
+ send_thread.join();
+ receive_thread.join();
+ boost::system::error_code ec;
+ tcp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec);
+ udp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec);
+ tcp_socket.close(ec);
+ udp_socket.close(ec);
+}
+
+/*
+ * @test Send a subscription as single message and afterwards send a
+ * resubscription containing a new subscription in the same message and check
+ * to receive initial event
+ */
+TEST_F(pending_subscription, subscribe_resubscribe_mixed)
+{
+ std::promise<void> first_initial_event_received;
+ std::promise<void> second_initial_event_received;
+
+ boost::asio::ip::udp::socket udp_socket(io_,
+ boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 30490));
+ udp_socket.set_option(boost::asio::socket_base::reuse_address(true));
+ udp_socket.set_option(boost::asio::socket_base::linger(true, 0));
+
+ std::thread receive_thread([&](){
+ std::vector<std::uint8_t> receive_buffer(4096);
+ std::vector<vsomeip::event_t> its_received_events;
+
+ const std::uint32_t expected_acks(3);
+ std::atomic<std::uint32_t> acks_received(0);
+
+ const std::uint32_t expected_responses(1);
+ std::atomic<std::uint32_t> responses_received(0);
+
+ const std::uint32_t expected_notifications(2);
+ std::atomic<std::uint32_t> notifications_received(0);
+
+ bool keep_receiving(true);
+ bool first_initial_event_checked(false);
+ bool second_initial_event_checked(false);
+
+
+ while (keep_receiving) {
+ boost::system::error_code error;
+ std::size_t bytes_transfered = udp_socket.receive(
+ boost::asio::buffer(receive_buffer, receive_buffer.capacity()), 0, error);
+ if (error) {
+ keep_receiving = false;
+ ADD_FAILURE() << __func__ << " error: " << error.message();
+ } else {
+
+ std::uint32_t its_pos = 0;
+
+ while (bytes_transfered > 0) {
+ const std::uint32_t its_message_size = VSOMEIP_BYTES_TO_LONG(
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 1],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 2],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 3]) + VSOMEIP_SOMEIP_HEADER_SIZE;
+ vsomeip::deserializer its_deserializer(&receive_buffer[its_pos], its_message_size, 0);
+
+ vsomeip::service_t its_service = VSOMEIP_BYTES_TO_WORD(receive_buffer[its_pos + VSOMEIP_SERVICE_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_SERVICE_POS_MAX]);
+ vsomeip::method_t its_method = VSOMEIP_BYTES_TO_WORD(receive_buffer[its_pos + VSOMEIP_METHOD_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_METHOD_POS_MAX]);
+ its_pos += its_message_size;
+ bytes_transfered -= its_message_size;
+ if (its_service == vsomeip::sd::service && its_method == vsomeip::sd::method) {
+ vsomeip::sd::message_impl sd_msg;
+ EXPECT_TRUE(sd_msg.deserialize(&its_deserializer));
+ EXPECT_GE(2u, sd_msg.get_entries().size());
+ for (const auto& e : sd_msg.get_entries()) {
+ EXPECT_TRUE(e->is_eventgroup_entry());
+ EXPECT_EQ(vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK, e->get_type());
+ EXPECT_EQ(3u, e->get_ttl());
+ EXPECT_EQ(pending_subscription_test::service.service_id, e->get_service());
+ EXPECT_EQ(pending_subscription_test::service.instance_id, e->get_instance());
+ if (e->get_type() == vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK) {
+ acks_received++;
+ std::shared_ptr<vsomeip::sd::eventgroupentry_impl> its_casted_entry =
+ std::static_pointer_cast<vsomeip::sd::eventgroupentry_impl>(e);
+ EXPECT_TRUE(its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id ||
+ its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id+1);
+ }
+ }
+ EXPECT_EQ(0u, sd_msg.get_options().size());
+ } else { // non-sd-message
+ vsomeip::message_impl msg;
+ EXPECT_TRUE(msg.deserialize(&its_deserializer));
+ if (msg.get_message_type() == vsomeip::message_type_e::MT_RESPONSE) {
+ EXPECT_EQ(vsomeip::message_type_e::MT_RESPONSE, msg.get_message_type());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(pending_subscription_test::service.shutdown_method_id, msg.get_method());
+ EXPECT_EQ(0x2222, msg.get_client());
+ responses_received++;
+ } else if (msg.get_message_type() == vsomeip::message_type_e::MT_NOTIFICATION) {
+ its_received_events.push_back(msg.get_method());
+ if (its_received_events.size() == 2) {
+ EXPECT_EQ(static_cast<vsomeip::event_t>(pending_subscription_test::service.event_id + 1u), its_received_events[0]);
+ EXPECT_EQ(pending_subscription_test::service.event_id, its_received_events[1]);
+ }
+ EXPECT_EQ(1u, msg.get_payload()->get_length());
+ EXPECT_EQ(0xDD, *msg.get_payload()->get_data());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(0x0, msg.get_client());
+ notifications_received++;
+ }
+ }
+
+ if (!first_initial_event_checked && notifications_received == 1) {
+ EXPECT_EQ(1u, its_received_events.size());
+ EXPECT_EQ(static_cast<vsomeip::event_t>(pending_subscription_test::service.event_id + 1u), its_received_events[0]);
+ // all subscribeAcks and one initial event of first event received
+ first_initial_event_received.set_value();
+ first_initial_event_checked = true;
+ }
+
+ if (!second_initial_event_checked && notifications_received == 2) { // events were received as well
+ // all subscribeAcks and one initial event of second event received
+ EXPECT_EQ(2u, its_received_events.size());
+ EXPECT_EQ(static_cast<vsomeip::event_t>(pending_subscription_test::service.event_id + 1u), its_received_events[0]);
+ EXPECT_EQ(pending_subscription_test::service.event_id, its_received_events[1]);
+ second_initial_event_received.set_value();
+ second_initial_event_checked = true;
+ }
+ if (notifications_received == 2 && responses_received == 1) {
+ keep_receiving = false;
+ }
+ }
+ }
+ }
+ EXPECT_EQ(expected_acks, acks_received);
+ EXPECT_EQ(expected_notifications, notifications_received);
+ EXPECT_EQ(expected_responses, responses_received);
+ });
+
+ std::thread send_thread([&]() {
+ try {
+ // call notify method to ensure to receive initial events
+ std::uint8_t trigger_notifications_call[] = {
+ 0x11, 0x22, 0x42, 0x42,
+ 0x00, 0x00, 0x00, 0x08,
+ 0x22, 0x22, 0x00, 0x01,
+ 0x01, 0x00, 0x01, 0x00 };
+ boost::asio::ip::udp::socket::endpoint_type target_service(
+ boost::asio::ip::address::from_string(std::string(remote_address)),
+ 30001);
+ udp_socket.send_to(boost::asio::buffer(trigger_notifications_call), target_service);
+
+ std::uint8_t its_subscribe_message[] = {
+ 0xff, 0xff, 0x81, 0x00,
+ 0x00, 0x00, 0x00, 0x30, // length
+ 0x00, 0x00, 0x00, 0x01,
+ 0x01, 0x01, 0x02, 0x00,
+ 0xc0, 0x00, 0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x10, // length entries array
+ 0x06, 0x00, 0x00, 0x10,
+ 0x11, 0x22, 0x00, 0x01, // service / instance
+ 0x00, 0x00, 0x00, 0x03,
+ 0x00, 0x00, 0x10, 0x01, // eventgroup
+ 0x00, 0x00, 0x00, 0x0c, // length options array
+ 0x00, 0x09, 0x04, 0x00,
+ 0xff, 0xff, 0xff, 0xff, // ip address
+ 0x00, 0x11, 0x77, 0x1a
+ };
+ boost::asio::ip::address its_local_address =
+ boost::asio::ip::address::from_string(std::string(local_address));
+ std::memcpy(&its_subscribe_message[48], &its_local_address.to_v4().to_bytes()[0], 4);
+
+ boost::asio::ip::udp::socket::endpoint_type target_sd(
+ boost::asio::ip::address::from_string(std::string(remote_address)),
+ 30490);
+
+ udp_socket.send_to(boost::asio::buffer(its_subscribe_message), target_sd);
+
+
+
+ if (std::future_status::timeout == first_initial_event_received.get_future().wait_for(std::chrono::seconds(10))) {
+ ADD_FAILURE() << "Didn't receive all SubscribeAck of first subscription within time";
+ }
+
+ // send second subscription with resubscription and new subscription
+ std::uint8_t its_subscribe_resubscribe_message[] = {
+ 0xff, 0xff, 0x81, 0x00,
+ 0x00, 0x00, 0x00, 0x40, // length
+ 0x00, 0x00, 0x00, 0x02,
+ 0x01, 0x01, 0x02, 0x00,
+ 0xc0, 0x00, 0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x20, // length entries array
+ 0x06, 0x00, 0x00, 0x10,
+ 0x11, 0x22, 0x00, 0x01, // service / instance
+ 0x00, 0x00, 0x00, 0x03,
+ 0x00, 0x00, 0x10, 0x00, // eventgroup
+ 0x06, 0x00, 0x00, 0x10,
+ 0x11, 0x22, 0x00, 0x01, // service / instance
+ 0x00, 0x00, 0x00, 0x03,
+ 0x00, 0x00, 0x10, 0x01, // eventgroup 2
+ 0x00, 0x00, 0x00, 0x0c, // length options array
+ 0x00, 0x09, 0x04, 0x00,
+ 0xff, 0xff, 0xff, 0xff, // ip address
+ 0x00, 0x11, 0x77, 0x1a
+ };
+ std::memcpy(&its_subscribe_resubscribe_message[64], &its_local_address.to_v4().to_bytes()[0], 4);
+ udp_socket.send_to(boost::asio::buffer(its_subscribe_resubscribe_message), target_sd);
+
+ if (std::future_status::timeout == second_initial_event_received.get_future().wait_for(std::chrono::seconds(10))) {
+ ADD_FAILURE() << "Didn't receive all SubscribeAck of second subscription within time";
+ }
+ // call shutdown method
+ std::uint8_t shutdown_call[] = {
+ 0x11, 0x22, 0x14, 0x04,
+ 0x00, 0x00, 0x00, 0x08,
+ 0x22, 0x22, 0x00, 0x01,
+ 0x01, 0x00, 0x00, 0x00 };
+ udp_socket.send_to(boost::asio::buffer(shutdown_call), target_service);
+ } catch (...) {
+ ASSERT_FALSE(true);
+ }
+
+ });
+
+ send_thread.join();
+ receive_thread.join();
+ boost::system::error_code ec;
+ udp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec);
+ udp_socket.close(ec);
+}
+
+/*
+ * @test Send a SD message containing a Subscription followed by a StopSubscribe
+ * Subscribe entry to the same service. Check to receive an initial event
+ */
+TEST_F(pending_subscription, send_subscribe_stop_subscribe_subscribe)
+{
+ std::promise<bool> trigger_notifications;
+ std::promise<void> tcp_connected;
+ boost::asio::ip::udp::socket udp_socket(io_,
+ boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 30490));
+ udp_socket.set_option(boost::asio::socket_base::reuse_address(true));
+ udp_socket.set_option(boost::asio::socket_base::linger(true, 0));
+ boost::asio::ip::tcp::socket tcp_socket(io_,
+ boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 30490));
+ tcp_socket.set_option(boost::asio::socket_base::reuse_address(true));
+ tcp_socket.set_option(boost::asio::socket_base::linger(true, 0));
+
+ std::thread receive_thread([&](){
+ const std::uint32_t expected_acks(2);
+ std::atomic<std::uint32_t> acks_received(0);
+
+ const std::uint32_t expected_responses(1);
+ std::atomic<std::uint32_t> responses_received(0);
+
+ const std::uint32_t expected_notifications(1);
+ std::atomic<std::uint32_t> notifications_received(0);
+
+ bool triggered_notifications(false);
+
+ std::vector<std::uint8_t> receive_buffer(4096);
+ std::vector<vsomeip::event_t> its_received_events;
+
+ boost::system::error_code ec;
+ tcp_socket.connect(boost::asio::ip::tcp::endpoint(
+ boost::asio::ip::address::from_string(remote_address), 40001), ec);
+ ASSERT_EQ(0, ec.value());
+ tcp_connected.set_value();
+
+ bool keep_receiving(true);
+
+ while (keep_receiving) {
+ boost::system::error_code error;
+ std::size_t bytes_transferred = udp_socket.receive(
+ boost::asio::buffer(receive_buffer, receive_buffer.capacity()), 0, error);
+ if (error) {
+ keep_receiving = false;
+ acks_received = expected_acks;
+ responses_received = expected_responses;
+ ADD_FAILURE() << __func__ << " error: " << error.message();
+ } else {
+
+ std::uint32_t its_pos = 0;
+
+ while (bytes_transferred > 0) {
+ #if 0
+ std::stringstream str;
+ for (size_t i = 0; i < bytes_transferred; i++) {
+ str << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(receive_buffer[i]) << " ";
+ }
+ std::cout << __func__ << " received: " << std::dec << bytes_transferred << " bytes: " << str.str() << std::endl;
+ #endif
+
+ const std::uint32_t its_message_size = VSOMEIP_BYTES_TO_LONG(
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 1],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 2],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 3]) + VSOMEIP_SOMEIP_HEADER_SIZE;
+ vsomeip::deserializer its_deserializer(&receive_buffer[its_pos], its_message_size, 0);
+
+ vsomeip::service_t its_service = VSOMEIP_BYTES_TO_WORD(receive_buffer[its_pos + VSOMEIP_SERVICE_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_SERVICE_POS_MAX]);
+ vsomeip::method_t its_method = VSOMEIP_BYTES_TO_WORD(receive_buffer[its_pos + VSOMEIP_METHOD_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_METHOD_POS_MAX]);
+ its_pos += its_message_size;
+ bytes_transferred -= its_message_size;
+
+ if (its_service == vsomeip::sd::service && its_method == vsomeip::sd::method) {
+ vsomeip::sd::message_impl sd_msg;
+ EXPECT_TRUE(sd_msg.deserialize(&its_deserializer));
+ EXPECT_EQ(1u, sd_msg.get_entries().size());
+ for (const auto& e : sd_msg.get_entries()) {
+ EXPECT_TRUE(e->is_eventgroup_entry());
+ EXPECT_EQ(vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK, e->get_type());
+ EXPECT_EQ(16u, e->get_ttl());
+ EXPECT_EQ(pending_subscription_test::service.service_id, e->get_service());
+ EXPECT_EQ(pending_subscription_test::service.instance_id, e->get_instance());
+ if (e->get_type() == vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK) {
+ std::shared_ptr<vsomeip::sd::eventgroupentry_impl> its_casted_entry =
+ std::static_pointer_cast<vsomeip::sd::eventgroupentry_impl>(e);
+ EXPECT_TRUE(its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id);
+ }
+ }
+ EXPECT_EQ(0u, sd_msg.get_options().size());
+ acks_received++;
+ } else { // non-sd-message
+ vsomeip::message_impl msg;
+ EXPECT_TRUE(msg.deserialize(&its_deserializer));
+ if (msg.get_message_type() == vsomeip::message_type_e::MT_RESPONSE) {
+ EXPECT_EQ(vsomeip::message_type_e::MT_RESPONSE, msg.get_message_type());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(pending_subscription_test::service.shutdown_method_id, msg.get_method());
+ EXPECT_EQ(0x2222, msg.get_client());
+ responses_received++;
+ } else if (msg.get_message_type() == vsomeip::message_type_e::MT_NOTIFICATION) {
+ its_received_events.push_back(msg.get_method());
+ EXPECT_EQ(1u, its_received_events.size());
+ EXPECT_EQ(1u, msg.get_payload()->get_length());
+ EXPECT_EQ(0xDD, *msg.get_payload()->get_data());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(0x0, msg.get_client());
+ notifications_received++;
+ }
+ }
+
+ if (!triggered_notifications && acks_received == expected_acks) { // all subscribeAcks received
+ trigger_notifications.set_value(true);
+ triggered_notifications = true;
+ }
+ }
+ if (acks_received == expected_acks &&
+ responses_received == expected_responses &&
+ notifications_received == expected_notifications) {
+ keep_receiving = false;
+ }
+ }
+ }
+ EXPECT_EQ(expected_acks, acks_received);
+ EXPECT_EQ(expected_responses, responses_received);
+ EXPECT_EQ(expected_notifications, notifications_received);
+ });
+
+ std::thread send_thread([&]() {
+ if (std::future_status::timeout == tcp_connected.get_future().wait_for(std::chrono::seconds(10))) {
+ ADD_FAILURE() << "Didn't establish tcp connection within time";
+ }
+
+ try {
+ std::uint8_t its_normal_subscribe_message[] = {
+ 0xff, 0xff, 0x81, 0x00,
+ 0x00, 0x00, 0x00, 0x30, // length
+ 0x00, 0x00, 0x00, 0x01,
+ 0x01, 0x01, 0x02, 0x00,
+ 0xc0, 0x00, 0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x10, // length entries array
+ 0x06, 0x00, 0x00, 0x10, // subscribe Eventgroup entry
+ 0x11, 0x22, 0x00, 0x01, // service / instance
+ 0x00, 0x00, 0x00, 0x10, // 16 seconds TTL
+ 0x00, 0x00, 0x10, 0x00, // eventgroup
+ 0x00, 0x00, 0x00, 0x0c, // length options array
+ 0x00, 0x09, 0x04, 0x00,
+ 0xff, 0xff, 0xff, 0xff, // ip address
+ 0x00, 0x11, 0x77, 0x1a
+ };
+ std::uint8_t its_subscribe_message[] = {
+ 0xff, 0xff, 0x81, 0x00,
+ 0x00, 0x00, 0x00, 0x50, // length
+ 0x00, 0x00, 0x00, 0x02,
+ 0x01, 0x01, 0x02, 0x00,
+ 0xc0, 0x00, 0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x30, // length entries array
+ 0x06, 0x00, 0x00, 0x10, // subscribe Eventgroup entry
+ 0x11, 0x22, 0x00, 0x01, // service / instance
+ 0x00, 0x00, 0x00, 0x10, // 16 seconds TTL
+ 0x00, 0x00, 0x10, 0x00, // eventgroup
+ 0x06, 0x00, 0x00, 0x10, // Stop subscribe Eventgroup entry
+ 0x11, 0x22, 0x00, 0x01,
+ 0x00, 0x00, 0x00, 0x00,
+ 0x00, 0x00, 0x10, 0x00,
+ 0x06, 0x00, 0x00, 0x10, // subscribe Eventgroup entry
+ 0x11, 0x22, 0x00, 0x01,
+ 0x00, 0x00, 0x00, 0x10,
+ 0x00, 0x00, 0x10, 0x00,
+ 0x00, 0x00, 0x00, 0x0c, // length options array
+ 0x00, 0x09, 0x04, 0x00,
+ 0xff, 0xff, 0xff, 0xff, // ip address
+ 0x00, 0x11, 0x77, 0x1a
+ };
+
+ boost::asio::ip::address its_local_address =
+ boost::asio::ip::address::from_string(std::string(local_address));
+ std::memcpy(&its_subscribe_message[80], &its_local_address.to_v4().to_bytes()[0], 4);
+ std::memcpy(&its_normal_subscribe_message[48], &its_local_address.to_v4().to_bytes()[0], 4);
+
+ boost::asio::ip::udp::socket::endpoint_type target_sd(
+ boost::asio::ip::address::from_string(std::string(remote_address)),
+ 30490);
+
+ udp_socket.send_to(boost::asio::buffer(its_normal_subscribe_message), target_sd);
+ udp_socket.send_to(boost::asio::buffer(its_subscribe_message), target_sd);
+
+ if (std::future_status::timeout == trigger_notifications.get_future().wait_for(std::chrono::seconds(10))) {
+ ADD_FAILURE() << "Didn't receive all SubscribeAcks within time";
+ } else {
+ // call notify method
+ std::uint8_t trigger_notifications_call[] = {
+ 0x11, 0x22, 0x42, 0x42,
+ 0x00, 0x00, 0x00, 0x08,
+ 0x22, 0x22, 0x00, 0x01,
+ 0x01, 0x00, 0x01, 0x00 };
+ boost::asio::ip::udp::socket::endpoint_type target_service(
+ boost::asio::ip::address::from_string(std::string(remote_address)),
+ 30001);
+ udp_socket.send_to(boost::asio::buffer(trigger_notifications_call), target_service);
+ }
+
+ // call shutdown method
+ std::uint8_t shutdown_call[] = {
+ 0x11, 0x22, 0x14, 0x04,
+ 0x00, 0x00, 0x00, 0x08,
+ 0x22, 0x22, 0x00, 0x01,
+ 0x01, 0x00, 0x00, 0x00 };
+ boost::asio::ip::udp::socket::endpoint_type target_service(
+ boost::asio::ip::address::from_string(std::string(remote_address)),
+ 30001);
+ udp_socket.send_to(boost::asio::buffer(shutdown_call), target_service);
+ } catch (...) {
+ ASSERT_FALSE(true);
+ }
+
+ });
+ send_thread.join();
+ receive_thread.join();
+ boost::system::error_code ec;
+ tcp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec);
+ udp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec);
+ tcp_socket.close(ec);
+ udp_socket.close(ec);
+}
+
+/*
+ * @test Send a message with message type 0x0 (REQUEST) to the remote SD port
+ * and check if the remote SD continues to send offers
+ */
+TEST_F(pending_subscription, send_request_to_sd_port)
+{
+ std::promise<bool> all_offers_received;
+
+ boost::asio::ip::udp::socket udp_socket(io_,
+ boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 30490));
+ udp_socket.set_option(boost::asio::ip::multicast::enable_loopback(false));
+ udp_socket.set_option(boost::asio::ip::multicast::join_group(
+ boost::asio::ip::address::from_string("224.0.23.1").to_v4()));
+ udp_socket.set_option(boost::asio::socket_base::reuse_address(true));
+ udp_socket.set_option(boost::asio::socket_base::linger(true, 0));
+
+ std::thread receive_thread([&](){
+ bool keep_receiving(true);
+ std::vector<std::uint8_t> receive_buffer(4096);
+ std::vector<vsomeip::event_t> its_received_events;
+
+ while (keep_receiving) {
+ boost::system::error_code error;
+ std::size_t bytes_transferred = udp_socket.receive(
+ boost::asio::buffer(receive_buffer, receive_buffer.capacity()), 0, error);
+ if (error) {
+ keep_receiving = false;
+ ADD_FAILURE() << __func__ << " error: " << error.message();
+ } else {
+ std::uint32_t its_pos = 0;
+ while (bytes_transferred > 0) {
+ const std::uint32_t its_message_size = VSOMEIP_BYTES_TO_LONG(
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 1],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 2],
+ receive_buffer[its_pos + VSOMEIP_LENGTH_POS_MIN + 3]) + VSOMEIP_SOMEIP_HEADER_SIZE;
+ vsomeip::deserializer its_deserializer(&receive_buffer[its_pos], its_message_size, 0);
+
+ vsomeip::service_t its_service = VSOMEIP_BYTES_TO_WORD(receive_buffer[its_pos + VSOMEIP_SERVICE_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_SERVICE_POS_MAX]);
+ vsomeip::method_t its_method = VSOMEIP_BYTES_TO_WORD(receive_buffer[its_pos + VSOMEIP_METHOD_POS_MIN],
+ receive_buffer[its_pos + VSOMEIP_METHOD_POS_MAX]);
+ its_pos += its_message_size;
+ bytes_transferred -= its_message_size;
+
+ #if 0
+ std::stringstream str;
+ for (size_t i = 0; i < bytes_transferred; i++) {
+ str << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(receive_buffer[i]) << " ";
+ }
+ std::cout << __func__ << " received: " << std::dec << bytes_transferred << " bytes: " << str.str() << std::endl;
+ #endif
+ static int offers_received = 0;
+ static int responses_received = 0;
+
+ if (its_service == vsomeip::sd::service && its_method == vsomeip::sd::method) {
+ vsomeip::sd::message_impl sd_msg;
+ EXPECT_TRUE(sd_msg.deserialize(&its_deserializer));
+ EXPECT_EQ(1u, sd_msg.get_entries().size());
+ EXPECT_EQ(2u, sd_msg.get_options().size());
+ for (const auto& e : sd_msg.get_entries()) {
+ EXPECT_TRUE(e->is_service_entry());
+ EXPECT_EQ(vsomeip::sd::entry_type_e::OFFER_SERVICE, e->get_type());
+ EXPECT_EQ(0xffffffu, e->get_ttl());
+ EXPECT_EQ(pending_subscription_test::service.service_id, e->get_service());
+ EXPECT_EQ(pending_subscription_test::service.instance_id, e->get_instance());
+ offers_received++;
+ }
+ } else { // non-sd-message
+ vsomeip::message_impl msg;
+ EXPECT_TRUE(msg.deserialize(&its_deserializer));
+ if (msg.get_message_type() == vsomeip::message_type_e::MT_RESPONSE) {
+ EXPECT_EQ(vsomeip::message_type_e::MT_RESPONSE, msg.get_message_type());
+ EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service());
+ EXPECT_EQ(pending_subscription_test::service.shutdown_method_id, msg.get_method());
+ EXPECT_EQ(0x2222, msg.get_client());
+ responses_received++;
+ }
+ }
+
+ if (responses_received == 1) { // response to shutdown method was received as well
+ keep_receiving = false;
+ } else if (offers_received == 3 ) { // all multiple offers received
+ try {
+ all_offers_received.set_value(true);
+ } catch (const std::future_error& e) {
+
+ }
+ }
+ }
+ }
+ }
+ });
+
+ std::thread send_thread([&]() {
+ try {
+ std::uint8_t its_subscribe_message[] = {
+ 0xff, 0xff, 0x81, 0x00,
+ 0x00, 0x00, 0x00, 0x40, // length
+ 0x00, 0x00, 0x10, 0x01,
+ 0x01, 0x01, 0x00, 0x00,
+ 0xc0, 0x00, 0x00, 0x00, // message type is set to 0x0 (REQUEST)
+ 0x00, 0x00, 0x00, 0x20, // length entries array
+ 0x06, 0x00, 0x00, 0x10,
+ 0x11, 0x22, 0x00, 0x01, // service / instance
+ 0x00, 0x00, 0x00, 0x03,
+ 0x00, 0x00, 0x10, 0x00, // eventgroup
+ 0x06, 0x00, 0x00, 0x10,
+ 0x11, 0x22, 0x00, 0x01, // service / instance
+ 0x00, 0x00, 0x00, 0x03,
+ 0x00, 0x00, 0x10, 0x01, // eventgroup 2
+ 0x00, 0x00, 0x00, 0x0c, // length options array
+ 0x00, 0x09, 0x04, 0x00,
+ 0xff, 0xff, 0xff, 0xff, // ip address
+ 0x00, 0x11, 0x77, 0x1a
+ };
+ boost::asio::ip::address its_local_address =
+ boost::asio::ip::address::from_string(std::string(local_address));
+ std::memcpy(&its_subscribe_message[64], &its_local_address.to_v4().to_bytes()[0], 4);
+
+ boost::asio::ip::udp::socket::endpoint_type target_sd(
+ boost::asio::ip::address::from_string(std::string(remote_address)),
+ 30490);
+ for (int var = 0; var < 15; ++var) {
+ udp_socket.send_to(boost::asio::buffer(its_subscribe_message), target_sd);
+ ++its_subscribe_message[11];
+ }
+
+
+ if (std::future_status::timeout == all_offers_received.get_future().wait_for(std::chrono::seconds(10))) {
+ ADD_FAILURE() << "Didn't receive all Offers within time";
+ }
+
+ {
+ // call notify method (but don't expect notifications) to allow
+ // service to exit
+ std::uint8_t trigger_notifications_call[] = {
+ 0x11, 0x22, 0x42, 0x42,
+ 0x00, 0x00, 0x00, 0x08,
+ 0x22, 0x22, 0x00, 0x01,
+ 0x01, 0x00, 0x01, 0x00 };
+ boost::asio::ip::udp::socket::endpoint_type target_service(
+ boost::asio::ip::address::from_string(std::string(remote_address)),
+ 30001);
+ udp_socket.send_to(boost::asio::buffer(trigger_notifications_call), target_service);
+ }
+
+ {
+ // call shutdown method
+ std::uint8_t shutdown_call[] = {
+ 0x11, 0x22, 0x14, 0x04,
+ 0x00, 0x00, 0x00, 0x08,
+ 0x22, 0x22, 0x00, 0x01,
+ 0x01, 0x00, 0x00, 0x00 };
+ boost::asio::ip::udp::socket::endpoint_type target_service(
+ boost::asio::ip::address::from_string(std::string(remote_address)),
+ 30001);
+ udp_socket.send_to(boost::asio::buffer(shutdown_call), target_service);
+ }
+
+ } catch (...) {
+ ASSERT_FALSE(true);
+ }
+
+ });
+
+ send_thread.join();
+ receive_thread.join();
+ boost::system::error_code ec;
+ udp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec);
+ udp_socket.close(ec);
+}
+
+#if defined(__linux__) || defined(ANDROID)
+int main(int argc, char** argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ if(argc < 4) {
+ std::cerr << "Please pass an target and local IP address and test mode to this binary like: "
+ << argv[0] << " 10.0.3.1 10.0.3.202 SUBSCRIBE" << std::endl;
+ std::cerr << "Testmodes are [SUBSCRIBE, SUBSCRIBE_UNSUBSCRIBE, UNSUBSCRIBE]" << std::endl;
+ exit(1);
+ }
+ remote_address = argv[1];
+ local_address = argv[2];
+ std::string its_testmode = argv[3];
+ if (its_testmode == std::string("SUBSCRIBE")) {
+ ::testing::GTEST_FLAG(filter) = "*send_multiple_subscriptions";
+ } else if (its_testmode == std::string("SUBSCRIBE_UNSUBSCRIBE")) {
+ ::testing::GTEST_FLAG(filter) = "*send_alternating_subscribe_unsubscribe";
+ } else if (its_testmode == std::string("UNSUBSCRIBE")) {
+ ::testing::GTEST_FLAG(filter) = "*send_multiple_unsubscriptions";
+ } else if (its_testmode == std::string("SUBSCRIBE_UNSUBSCRIBE_NACK")) {
+ ::testing::GTEST_FLAG(filter) = "*send_alternating_subscribe_nack_unsubscribe";
+ } else if (its_testmode == std::string("SUBSCRIBE_UNSUBSCRIBE_SAME_PORT")) {
+ ::testing::GTEST_FLAG(filter) = "*send_alternating_subscribe_unsubscribe_same_port";
+ } else if (its_testmode == std::string("SUBSCRIBE_RESUBSCRIBE_MIXED")) {
+ ::testing::GTEST_FLAG(filter) = "*subscribe_resubscribe_mixed";
+ } else if (its_testmode == std::string("SUBSCRIBE_STOPSUBSCRIBE_SUBSCRIBE")) {
+ ::testing::GTEST_FLAG(filter) = "*send_subscribe_stop_subscribe_subscribe";
+ } else if (its_testmode == std::string("REQUEST_TO_SD")) {
+ ::testing::GTEST_FLAG(filter) = "*send_request_to_sd_port";
+ }
+ return RUN_ALL_TESTS();
+}
+#endif
diff --git a/test/network_tests/pending_subscription_tests/pending_subscription_test_service.cpp b/test/network_tests/pending_subscription_tests/pending_subscription_test_service.cpp
new file mode 100644
index 0000000..53b2cde
--- /dev/null
+++ b/test/network_tests/pending_subscription_tests/pending_subscription_test_service.cpp
@@ -0,0 +1,389 @@
+// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <chrono>
+#include <condition_variable>
+#include <iomanip>
+#include <iostream>
+#include <sstream>
+#include <thread>
+#include <map>
+#include <algorithm>
+#include <atomic>
+#include <future>
+
+#include <gtest/gtest.h>
+
+#include <vsomeip/vsomeip.hpp>
+#include <vsomeip/internal/logger.hpp>
+
+#include "pending_subscription_test_globals.hpp"
+
+class pending_subscription_test_service {
+public:
+ pending_subscription_test_service(struct pending_subscription_test::service_info _service_info, pending_subscription_test::test_mode_e _testmode) :
+ service_info_(_service_info),
+ testmode_(_testmode),
+ app_(vsomeip::runtime::get()->create_application("pending_subscription_test_service")),
+ wait_until_registered_(true),
+ wait_until_shutdown_method_called_(true),
+ subscription_accepted_asynchronous_(false),
+ subscription_accepted_synchronous_(false),
+ offer_thread_(std::bind(&pending_subscription_test_service::run, this)) {
+ if (!app_->init()) {
+ ADD_FAILURE() << "Couldn't initialize application";
+ return;
+ }
+ app_->register_state_handler(
+ std::bind(&pending_subscription_test_service::on_state, this,
+ std::placeholders::_1));
+
+ // offer field
+ std::set<vsomeip::eventgroup_t> its_eventgroups;
+ its_eventgroups.insert(_service_info.eventgroup_id);
+ app_->offer_event(service_info_.service_id, 0x1,
+ service_info_.event_id,
+ its_eventgroups, vsomeip::event_type_e::ET_FIELD,
+ std::chrono::milliseconds::zero(),
+ false, true, nullptr, vsomeip::reliability_type_e::RT_UNRELIABLE);
+
+ its_eventgroups.clear();
+ its_eventgroups.insert(static_cast<vsomeip::eventgroup_t>(_service_info.eventgroup_id+1u));
+
+ app_->offer_event(service_info_.service_id, 0x1,
+ static_cast<vsomeip::event_t>(service_info_.event_id+1u),
+ its_eventgroups, vsomeip::event_type_e::ET_FIELD,
+ std::chrono::milliseconds::zero(),
+ false, true, nullptr, vsomeip::reliability_type_e::RT_UNRELIABLE);
+
+ app_->register_message_handler(vsomeip::ANY_SERVICE,
+ vsomeip::ANY_INSTANCE, service_info_.shutdown_method_id,
+ std::bind(&pending_subscription_test_service::on_shutdown_method_called, this,
+ std::placeholders::_1));
+
+ app_->register_message_handler(vsomeip::ANY_SERVICE,
+ vsomeip::ANY_INSTANCE, service_info_.notify_method_id,
+ std::bind(&pending_subscription_test_service::on_notify_method_called, this,
+ std::placeholders::_1));
+
+ app_->register_async_subscription_handler(service_info_.service_id,
+ 0x1, service_info_.eventgroup_id,
+ std::bind(&pending_subscription_test_service::subscription_handler_async,
+ this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3,
+ std::placeholders::_4, std::placeholders::_5));
+ app_->register_subscription_handler(service_info_.service_id,
+ 0x1, static_cast<vsomeip::eventgroup_t>(service_info_.eventgroup_id+1u),
+ std::bind(&pending_subscription_test_service::subscription_handler,
+ this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3,
+ std::placeholders::_4));
+ app_->start();
+ }
+
+ ~pending_subscription_test_service() {
+ offer_thread_.join();
+ }
+
+ void offer() {
+ app_->offer_service(service_info_.service_id, 0x1);
+ }
+
+ void stop() {
+ app_->stop_offer_service(service_info_.service_id, 0x1);
+ app_->clear_all_handler();
+ app_->stop();
+ }
+
+ void on_state(vsomeip::state_type_e _state) {
+ VSOMEIP_INFO << "Application " << app_->get_name() << " is "
+ << (_state == vsomeip::state_type_e::ST_REGISTERED ?
+ "registered." : "deregistered.");
+
+ if (_state == vsomeip::state_type_e::ST_REGISTERED) {
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ wait_until_registered_ = false;
+ condition_.notify_one();
+ }
+ }
+
+ void on_shutdown_method_called(const std::shared_ptr<vsomeip::message> &_message) {
+ app_->send(vsomeip::runtime::get()->create_response(_message));
+ VSOMEIP_WARNING << "************************************************************";
+ VSOMEIP_WARNING << "Shutdown method called -> going down!";
+ VSOMEIP_WARNING << "************************************************************";
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ wait_until_shutdown_method_called_ = false;
+ condition_.notify_one();
+ }
+
+ void on_notify_method_called(const std::shared_ptr<vsomeip::message> &_message) {
+ (void)_message;
+ std::shared_ptr<vsomeip::payload> its_payload = vsomeip::runtime::get()->create_payload();
+ its_payload->set_data( {0xDD});
+ app_->notify(service_info_.service_id, service_info_.instance_id,
+ service_info_.event_id, its_payload);
+ app_->notify(service_info_.service_id, service_info_.instance_id,
+ static_cast<vsomeip::event_t>(service_info_.event_id + 1u) , its_payload);
+ notify_method_called_.set_value(true);
+ }
+
+ void run() {
+ VSOMEIP_DEBUG << "[" << std::setw(4) << std::setfill('0') << std::hex
+ << service_info_.service_id << "] Running";
+ std::unique_lock<std::mutex> its_lock(mutex_);
+ while (wait_until_registered_) {
+ condition_.wait(its_lock);
+ }
+
+ VSOMEIP_DEBUG << "[" << std::setw(4) << std::setfill('0') << std::hex
+ << service_info_.service_id << "] Offering";
+ offer();
+
+ if (testmode_ == pending_subscription_test::test_mode_e::REQUEST_TO_SD) {
+ // this testcase won't send valid subscriptions -> ensure to exit
+ subscription_accepted_asynchronous_ = true;
+ subscription_accepted_synchronous_ = true;
+ }
+
+ while (!subscription_accepted_asynchronous_ || !subscription_accepted_synchronous_) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
+ switch (testmode_) {
+ case pending_subscription_test::test_mode_e::SUBSCRIBE:
+ async_subscription_handler_(true);
+ break;
+ case pending_subscription_test::test_mode_e::SUBSCRIBE_UNSUBSCRIBE:
+ case pending_subscription_test::test_mode_e::UNSUBSCRIBE:
+ case pending_subscription_test::test_mode_e::SUBSCRIBE_UNSUBSCRIBE_NACK:
+ case pending_subscription_test::test_mode_e::SUBSCRIBE_UNSUBSCRIBE_SAME_PORT:
+ case pending_subscription_test::test_mode_e::SUBSCRIBE_RESUBSCRIBE_MIXED:
+ case pending_subscription_test::test_mode_e::SUBSCRIBE_STOPSUBSCRIBE_SUBSCRIBE:
+ case pending_subscription_test::test_mode_e::REQUEST_TO_SD:
+ default:
+ break;
+ }
+
+ std::future<bool> itsFuture = notify_method_called_.get_future();
+ if (std::future_status::timeout == itsFuture.wait_for(std::chrono::seconds(30))) {
+ ADD_FAILURE() << "notify method wasn't called within time!";
+ } else {
+ EXPECT_TRUE(itsFuture.get());
+ }
+ while (wait_until_shutdown_method_called_) {
+ condition_.wait(its_lock);
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(2000));
+ stop();
+ }
+
+ void subscription_handler_async(vsomeip::client_t _client, std::uint32_t _uid, std::uint32_t _gid,
+ bool _subscribed, const std::function<void(const bool)>& _cbk) {
+ (void)_uid;
+ (void)_gid;
+ VSOMEIP_WARNING << __func__ << " " << std::hex << _client << " subscribed." << _subscribed;
+ if (testmode_ == pending_subscription_test::test_mode_e::SUBSCRIBE) {
+ async_subscription_handler_ = _cbk;
+ static int was_called = 0;
+ was_called++;
+ EXPECT_EQ(1, was_called);
+ EXPECT_TRUE(_subscribed);
+ subscription_accepted_asynchronous_ = true;
+ } else if (testmode_ == pending_subscription_test::test_mode_e::SUBSCRIBE_UNSUBSCRIBE) {
+ static int count_subscribe = 0;
+ static int count_unsubscribe = 0;
+ _subscribed ? count_subscribe++ : count_unsubscribe++;
+ if (count_subscribe == 1) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
+ }
+ _cbk(true);
+ if (count_subscribe == 8 || count_unsubscribe == 7) {
+ subscription_accepted_asynchronous_ = true;
+ }
+ } else if (testmode_ == pending_subscription_test::test_mode_e::UNSUBSCRIBE) {
+ static int count_subscribe = 0;
+ static int count_unsubscribe = 0;
+ _subscribed ? count_subscribe++ : count_unsubscribe++;
+ if (count_subscribe == 1) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
+ }
+ _cbk(true);
+ if (count_subscribe == 2 || count_unsubscribe == 1) {
+ subscription_accepted_asynchronous_ = true;
+ }
+ } else if (testmode_ == pending_subscription_test::test_mode_e::SUBSCRIBE_UNSUBSCRIBE_NACK) {
+ static int count_subscribe = 0;
+ static int count_unsubscribe = 0;
+ _subscribed ? count_subscribe++ : count_unsubscribe++;
+ if (count_subscribe == 1) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
+ }
+ if (_subscribed) {
+ _cbk(((count_subscribe + 1) % 2)); // nack every second subscription
+ } else {
+ _cbk(true);
+ }
+ if (count_subscribe == 8 || count_unsubscribe == 7) {
+ subscription_accepted_asynchronous_ = true;
+ }
+ } else if (testmode_ == pending_subscription_test::test_mode_e::SUBSCRIBE_UNSUBSCRIBE_SAME_PORT) {
+ static int count_subscribe = 0;
+ static int count_unsubscribe = 0;
+ _subscribed ? count_subscribe++ : count_unsubscribe++;
+ if (count_subscribe == 1) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
+ }
+ _cbk(true);
+ if (count_subscribe == 8 || count_unsubscribe == 7) {
+ subscription_accepted_asynchronous_ = true;
+ }
+ } else if (testmode_ == pending_subscription_test::test_mode_e::SUBSCRIBE_RESUBSCRIBE_MIXED) {
+ static int was_called = 0;
+ was_called++;
+ EXPECT_EQ(1, was_called);
+ EXPECT_TRUE(_subscribed);
+ _cbk(true);
+ subscription_accepted_asynchronous_ = true;
+ } else if (testmode_ == pending_subscription_test::test_mode_e::SUBSCRIBE_STOPSUBSCRIBE_SUBSCRIBE) {
+ static int was_called = 0;
+ was_called++;
+ EXPECT_EQ(1, was_called);
+ EXPECT_TRUE(_subscribed);
+ subscription_accepted_asynchronous_ = true;
+ // this test doesn't subscribe to the second eventgroup which is handled by the asynchronous
+ // subscription handler, set it to true here:
+ subscription_accepted_synchronous_ = true;
+ _cbk(true);
+ }
+ }
+
+ bool subscription_handler(vsomeip::client_t _client, std::uint32_t _uid, std::uint32_t _gid, bool _subscribed) {
+ (void)_subscribed;
+ (void)_uid;
+ (void)_gid;
+ bool ret(false);
+ VSOMEIP_WARNING << __func__ << " " << std::hex << _client << " subscribed. " << _subscribed;
+ if (testmode_ == pending_subscription_test::test_mode_e::SUBSCRIBE) {
+ static int was_called = 0;
+ was_called++;
+ EXPECT_EQ(1, was_called);
+ EXPECT_TRUE(_subscribed);
+ subscription_accepted_synchronous_ = true;
+ ret = true;
+ } else if (testmode_ == pending_subscription_test::test_mode_e::SUBSCRIBE_UNSUBSCRIBE) {
+ static int count_subscribed = 0;
+ static int count_unsubscribe = 0;
+ _subscribed ? count_subscribed++ : count_unsubscribe++;
+ if (count_subscribed == 1) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
+ }
+ if (count_subscribed == 8 && count_unsubscribe == 7) {
+ subscription_accepted_synchronous_ = true;
+ }
+ ret = true;
+ } else if (testmode_ == pending_subscription_test::test_mode_e::UNSUBSCRIBE) {
+ static int count_subscribed = 0;
+ static int count_unsubscribe = 0;
+ _subscribed ? count_subscribed++ : count_unsubscribe++;
+ if (count_subscribed == 1) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
+ }
+ if (count_subscribed == 2 && count_unsubscribe == 1) {
+ subscription_accepted_synchronous_ = true;
+ }
+ ret = true;
+ } else if (testmode_ == pending_subscription_test::test_mode_e::SUBSCRIBE_UNSUBSCRIBE_NACK) {
+ static int count_subscribed = 0;
+ static int count_unsubscribe = 0;
+ _subscribed ? count_subscribed++ : count_unsubscribe++;
+ if (count_subscribed == 1) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
+ }
+ if (count_subscribed == 8 && count_unsubscribe == 7) {
+ subscription_accepted_synchronous_ = true;
+ }
+ if (_subscribed) {
+ ret = ((count_subscribed + 1) % 2); // nack every second subscription
+ } else {
+ ret = true;
+ }
+ } else if (testmode_ == pending_subscription_test::test_mode_e::SUBSCRIBE_UNSUBSCRIBE_SAME_PORT) {
+ static int count_subscribed = 0;
+ static int count_unsubscribe = 0;
+ _subscribed ? count_subscribed++ : count_unsubscribe++;
+
+ if (count_subscribed == 1) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
+ }
+ if (count_subscribed == 8 && count_unsubscribe == 7) {
+ subscription_accepted_synchronous_ = true;
+ }
+ ret = true;
+ } else if (testmode_ == pending_subscription_test::test_mode_e::SUBSCRIBE_RESUBSCRIBE_MIXED) {
+ static int was_called = 0;
+ was_called++;
+ EXPECT_EQ(1, was_called);
+ EXPECT_TRUE(_subscribed);
+ subscription_accepted_synchronous_ = true;
+ ret = true;
+ }
+ return ret;
+ }
+
+private:
+ struct pending_subscription_test::service_info service_info_;
+ pending_subscription_test::test_mode_e testmode_;
+ std::shared_ptr<vsomeip::application> app_;
+
+ bool wait_until_registered_;
+ bool wait_until_shutdown_method_called_;
+ std::mutex mutex_;
+ std::condition_variable condition_;
+ std::atomic<bool> subscription_accepted_asynchronous_;
+ std::atomic<bool> subscription_accepted_synchronous_;
+ std::thread offer_thread_;
+ std::function<void(const bool)> async_subscription_handler_;
+ std::promise<bool> notify_method_called_;
+};
+
+pending_subscription_test::test_mode_e its_testmode(pending_subscription_test::test_mode_e::SUBSCRIBE);
+
+TEST(someip_pending_subscription_test, block_subscription_handler)
+{
+ pending_subscription_test_service its_sample(pending_subscription_test::service, its_testmode);
+}
+
+
+#if defined(__linux__) || defined(ANDROID)
+int main(int argc, char** argv)
+{
+ ::testing::InitGoogleTest(&argc, argv);
+ if (argc < 2) {
+ std::cerr << "Please pass a test mode to this binary like: "
+ << argv[0] << " SUBSCRIBE" << std::endl;
+ std::cerr << "Testmodes are [SUBSCRIBE, SUBSCRIBE_UNSUBSCRIBE, UNSUBSCRIBE, SUBSCRIBE_UNSUBSCRIBE_NACK, SUBSCRIBE_UNSUBSCRIBE_SAME_PORT]" << std::endl;
+ exit(1);
+ }
+
+ std::string its_pased_testmode = argv[1];
+ if (its_pased_testmode == std::string("SUBSCRIBE")) {
+ its_testmode = pending_subscription_test::test_mode_e::SUBSCRIBE;
+ } else if (its_pased_testmode == std::string("SUBSCRIBE_UNSUBSCRIBE")) {
+ its_testmode = pending_subscription_test::test_mode_e::SUBSCRIBE_UNSUBSCRIBE;
+ } else if (its_pased_testmode == std::string("UNSUBSCRIBE")) {
+ its_testmode = pending_subscription_test::test_mode_e::UNSUBSCRIBE;
+ } else if (its_pased_testmode == std::string("SUBSCRIBE_UNSUBSCRIBE_NACK")) {
+ its_testmode = pending_subscription_test::test_mode_e::SUBSCRIBE_UNSUBSCRIBE_NACK;
+ } else if (its_pased_testmode == std::string("SUBSCRIBE_UNSUBSCRIBE_SAME_PORT")) {
+ its_testmode = pending_subscription_test::test_mode_e::SUBSCRIBE_UNSUBSCRIBE_SAME_PORT;
+ } else if (its_pased_testmode == std::string("SUBSCRIBE_RESUBSCRIBE_MIXED")) {
+ its_testmode = pending_subscription_test::test_mode_e::SUBSCRIBE_RESUBSCRIBE_MIXED;
+ } else if (its_pased_testmode == std::string("SUBSCRIBE_STOPSUBSCRIBE_SUBSCRIBE")) {
+ its_testmode = pending_subscription_test::test_mode_e::SUBSCRIBE_STOPSUBSCRIBE_SUBSCRIBE;
+ } else if (its_pased_testmode == std::string("REQUEST_TO_SD")) {
+ its_testmode = pending_subscription_test::test_mode_e::REQUEST_TO_SD;
+ }
+
+ return RUN_ALL_TESTS();
+}
+#endif