diff options
Diffstat (limited to 'test/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp')
-rw-r--r-- | test/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp | 239 |
1 files changed, 239 insertions, 0 deletions
diff --git a/test/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp b/test/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp index dd7ed5a..d8108a5 100644 --- a/test/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp +++ b/test/pending_subscription_tests/pending_subscription_test_sd_msg_sender.cpp @@ -57,6 +57,8 @@ TEST_F(pending_subscription, send_multiple_subscriptions) boost::asio::ip::udp::socket udp_socket(io_, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 30490)); + udp_socket.set_option(boost::asio::socket_base::reuse_address(true)); + std::thread receive_thread([&](){ std::atomic<bool> keep_receiving(true); std::function<void()> receive; @@ -213,6 +215,9 @@ TEST_F(pending_subscription, send_multiple_subscriptions) send_thread.join(); receive_thread.join(); + boost::system::error_code ec; + udp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec); + udp_socket.close(ec); } TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe) @@ -221,6 +226,8 @@ TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe) boost::asio::ip::udp::socket udp_socket(io_, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 30490)); + udp_socket.set_option(boost::asio::socket_base::reuse_address(true)); + std::thread receive_thread([&](){ const std::uint32_t expected_acks(8); std::atomic<std::uint32_t> acks_received(0); @@ -404,6 +411,9 @@ TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe) send_thread.join(); receive_thread.join(); + boost::system::error_code ec; + udp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec); + udp_socket.close(ec); } TEST_F(pending_subscription, send_multiple_unsubscriptions) @@ -412,6 +422,8 @@ TEST_F(pending_subscription, send_multiple_unsubscriptions) boost::asio::ip::udp::socket udp_socket(io_, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 30490)); + udp_socket.set_option(boost::asio::socket_base::reuse_address(true)); + std::thread receive_thread([&](){ const std::uint32_t expected_acks(2); std::atomic<std::uint32_t> acks_received(0); @@ -595,6 +607,9 @@ TEST_F(pending_subscription, send_multiple_unsubscriptions) send_thread.join(); receive_thread.join(); + boost::system::error_code ec; + udp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec); + udp_socket.close(ec); } TEST_F(pending_subscription, send_alternating_subscribe_nack_unsubscribe) @@ -603,6 +618,8 @@ TEST_F(pending_subscription, send_alternating_subscribe_nack_unsubscribe) boost::asio::ip::udp::socket udp_socket(io_, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 30490)); + udp_socket.set_option(boost::asio::socket_base::reuse_address(true)); + std::thread receive_thread([&](){ const std::uint32_t expected_acks(8); std::atomic<std::uint32_t> acks_received(0); @@ -797,6 +814,9 @@ TEST_F(pending_subscription, send_alternating_subscribe_nack_unsubscribe) send_thread.join(); receive_thread.join(); + boost::system::error_code ec; + udp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec); + udp_socket.close(ec); } TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe_same_port) @@ -805,8 +825,11 @@ TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe_same_port) std::promise<void> tcp_connected; boost::asio::ip::udp::socket udp_socket(io_, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 30490)); + udp_socket.set_option(boost::asio::socket_base::reuse_address(true)); boost::asio::ip::tcp::socket tcp_socket(io_, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 30490)); + tcp_socket.set_option(boost::asio::socket_base::reuse_address(true)); + std::thread receive_thread([&](){ const std::uint32_t expected_acks(8); std::atomic<std::uint32_t> acks_received(0); @@ -1004,6 +1027,11 @@ TEST_F(pending_subscription, send_alternating_subscribe_unsubscribe_same_port) send_thread.join(); receive_thread.join(); + boost::system::error_code ec; + tcp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec); + udp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec); + tcp_socket.close(ec); + udp_socket.close(ec); } /* @@ -1018,6 +1046,8 @@ TEST_F(pending_subscription, subscribe_resubscribe_mixed) boost::asio::ip::udp::socket udp_socket(io_, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 30490)); + udp_socket.set_option(boost::asio::socket_base::reuse_address(true)); + std::thread receive_thread([&](){ std::atomic<bool> keep_receiving(true); std::function<void()> receive; @@ -1201,6 +1231,213 @@ TEST_F(pending_subscription, subscribe_resubscribe_mixed) send_thread.join(); receive_thread.join(); + boost::system::error_code ec; + udp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec); + udp_socket.close(ec); +} + +/* + * @test Send a SD message containing a Subscription followed by a StopSubscribe + * Subscribe entry to the same service. Check to receive an initial event + */ +TEST_F(pending_subscription, send_subscribe_stop_subscribe_subscribe) +{ + std::promise<bool> trigger_notifications; + std::promise<void> tcp_connected; + boost::asio::ip::udp::socket udp_socket(io_, + boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 30490)); + udp_socket.set_option(boost::asio::socket_base::reuse_address(true)); + boost::asio::ip::tcp::socket tcp_socket(io_, + boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 30490)); + tcp_socket.set_option(boost::asio::socket_base::reuse_address(true)); + + std::thread receive_thread([&](){ + const std::uint32_t expected_acks(1); + std::atomic<std::uint32_t> acks_received(0); + + const std::uint32_t expected_responses(1); + std::atomic<std::uint32_t> responses_received(0); + + const std::uint32_t expected_notifications(1); + std::atomic<std::uint32_t> notifications_received(0); + + bool triggered_notifications(false); + + std::function<void()> receive; + std::vector<std::uint8_t> receive_buffer(4096); + std::vector<vsomeip::event_t> its_received_events; + + boost::system::error_code ec; + tcp_socket.connect(boost::asio::ip::tcp::endpoint( + boost::asio::ip::address::from_string(remote_address), 40001), ec); + ASSERT_EQ(0, ec.value()); + tcp_connected.set_value(); + + const std::function<void(const boost::system::error_code&, std::size_t)> receive_cbk = [&]( + const boost::system::error_code& error, std::size_t bytes_transferred) { + if (error) { + acks_received = expected_acks; + responses_received = expected_responses; + ADD_FAILURE() << __func__ << " error: " << error.message(); + return; + } + #if 0 + std::stringstream str; + for (size_t i = 0; i < bytes_transferred; i++) { + str << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(receive_buffer[i]) << " "; + } + std::cout << __func__ << " received: " << std::dec << bytes_transferred << " bytes: " << str.str() << std::endl; + #endif + + vsomeip::deserializer its_deserializer(&receive_buffer[0], bytes_transferred, 0); + vsomeip::service_t its_service = VSOMEIP_BYTES_TO_WORD(receive_buffer[VSOMEIP_SERVICE_POS_MIN], + receive_buffer[VSOMEIP_SERVICE_POS_MAX]); + vsomeip::method_t its_method = VSOMEIP_BYTES_TO_WORD(receive_buffer[VSOMEIP_METHOD_POS_MIN], + receive_buffer[VSOMEIP_METHOD_POS_MAX]); + if (its_service == vsomeip::sd::service && its_method == vsomeip::sd::method) { + vsomeip::sd::message_impl sd_msg; + EXPECT_TRUE(sd_msg.deserialize(&its_deserializer)); + EXPECT_EQ(1u, sd_msg.get_entries().size()); + for (auto e : sd_msg.get_entries()) { + EXPECT_TRUE(e->is_eventgroup_entry()); + EXPECT_EQ(vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK, e->get_type()); + EXPECT_EQ(16u, e->get_ttl()); + EXPECT_EQ(pending_subscription_test::service.service_id, e->get_service()); + EXPECT_EQ(pending_subscription_test::service.instance_id, e->get_instance()); + if (e->get_type() == vsomeip::sd::entry_type_e::SUBSCRIBE_EVENTGROUP_ACK) { + std::shared_ptr<vsomeip::sd::eventgroupentry_impl> its_casted_entry = + std::static_pointer_cast<vsomeip::sd::eventgroupentry_impl>(e); + EXPECT_TRUE(its_casted_entry->get_eventgroup() == pending_subscription_test::service.eventgroup_id); + } + } + EXPECT_EQ(0u, sd_msg.get_options().size()); + acks_received++; + } else { // non-sd-message + vsomeip::message_impl msg; + EXPECT_TRUE(msg.deserialize(&its_deserializer)); + if (msg.get_message_type() == vsomeip::message_type_e::MT_RESPONSE) { + EXPECT_EQ(vsomeip::message_type_e::MT_RESPONSE, msg.get_message_type()); + EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service()); + EXPECT_EQ(pending_subscription_test::service.shutdown_method_id, msg.get_method()); + EXPECT_EQ(0x2222, msg.get_client()); + responses_received++; + } else if (msg.get_message_type() == vsomeip::message_type_e::MT_NOTIFICATION) { + its_received_events.push_back(msg.get_method()); + EXPECT_EQ(1u, its_received_events.size()); + EXPECT_EQ(1u, msg.get_payload()->get_length()); + EXPECT_EQ(0xDD, *msg.get_payload()->get_data()); + EXPECT_EQ(pending_subscription_test::service.service_id, msg.get_service()); + EXPECT_EQ(0x0, msg.get_client()); + notifications_received++; + } + } + + + if (!triggered_notifications && acks_received == expected_acks) { // all subscribeAcks received + trigger_notifications.set_value(true); + triggered_notifications = true; + } + + if (!error && (acks_received != expected_acks || + responses_received != expected_responses || + notifications_received != expected_notifications)) { + receive(); + } + }; + + receive = [&]() { + udp_socket.async_receive(boost::asio::buffer(receive_buffer, receive_buffer.capacity()), + receive_cbk); + }; + + receive(); + while(acks_received < expected_acks || + responses_received < expected_responses || + notifications_received < expected_notifications) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + EXPECT_EQ(expected_acks, acks_received); + EXPECT_EQ(expected_responses, responses_received); + EXPECT_EQ(expected_notifications, notifications_received); + }); + + std::thread send_thread([&]() { + if (std::future_status::timeout == tcp_connected.get_future().wait_for(std::chrono::seconds(10))) { + ADD_FAILURE() << "Didn't establish tcp connection within time"; + } + + try { + std::uint8_t its_subscribe_message[] = { + 0xff, 0xff, 0x81, 0x00, + 0x00, 0x00, 0x00, 0x50, // length + 0x00, 0x00, 0x00, 0x01, + 0x01, 0x01, 0x02, 0x00, + 0xc0, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x30, // length entries array + 0x06, 0x00, 0x00, 0x10, // subscribe Eventgroup entry + 0x11, 0x22, 0x00, 0x01, // service / instance + 0x00, 0x00, 0x00, 0x10, // 16 seconds TTL + 0x00, 0x00, 0x10, 0x00, // eventgroup + 0x06, 0x00, 0x00, 0x10, // Stop subscribe Eventgroup entry + 0x11, 0x22, 0x00, 0x01, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x10, 0x00, + 0x06, 0x00, 0x00, 0x10, // subscribe Eventgroup entry + 0x11, 0x22, 0x00, 0x01, + 0x00, 0x00, 0x00, 0x10, + 0x00, 0x00, 0x10, 0x00, + 0x00, 0x00, 0x00, 0x0c, // length options array + 0x00, 0x09, 0x04, 0x00, + 0xff, 0xff, 0xff, 0xff, // ip address + 0x00, 0x11, 0x77, 0x1a + }; + + boost::asio::ip::address its_local_address = + boost::asio::ip::address::from_string(std::string(local_address)); + std::memcpy(&its_subscribe_message[80], &its_local_address.to_v4().to_bytes()[0], 4); + + boost::asio::ip::udp::socket::endpoint_type target_sd( + boost::asio::ip::address::from_string(std::string(remote_address)), + 30490); + udp_socket.send_to(boost::asio::buffer(its_subscribe_message), target_sd); + + if (std::future_status::timeout == trigger_notifications.get_future().wait_for(std::chrono::seconds(10))) { + ADD_FAILURE() << "Didn't receive all SubscribeAcks within time"; + } else { + // call notify method + std::uint8_t trigger_notifications_call[] = { + 0x11, 0x22, 0x42, 0x42, + 0x00, 0x00, 0x00, 0x08, + 0x22, 0x22, 0x00, 0x01, + 0x01, 0x00, 0x01, 0x00 }; + boost::asio::ip::udp::socket::endpoint_type target_service( + boost::asio::ip::address::from_string(std::string(remote_address)), + 30001); + udp_socket.send_to(boost::asio::buffer(trigger_notifications_call), target_service); + } + + // call shutdown method + std::uint8_t shutdown_call[] = { + 0x11, 0x22, 0x14, 0x04, + 0x00, 0x00, 0x00, 0x08, + 0x22, 0x22, 0x00, 0x01, + 0x01, 0x00, 0x00, 0x00 }; + boost::asio::ip::udp::socket::endpoint_type target_service( + boost::asio::ip::address::from_string(std::string(remote_address)), + 30001); + udp_socket.send_to(boost::asio::buffer(shutdown_call), target_service); + } catch (...) { + ASSERT_FALSE(true); + } + + }); + send_thread.join(); + receive_thread.join(); + boost::system::error_code ec; + tcp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec); + udp_socket.shutdown(boost::asio::socket_base::shutdown_both, ec); + tcp_socket.close(ec); + udp_socket.close(ec); } #ifndef _WIN32 @@ -1227,6 +1464,8 @@ int main(int argc, char** argv) { ::testing::GTEST_FLAG(filter) = "*send_alternating_subscribe_unsubscribe_same_port"; } else if (its_testmode == std::string("SUBSCRIBE_RESUBSCRIBE_MIXED")) { ::testing::GTEST_FLAG(filter) = "*subscribe_resubscribe_mixed"; + } else if (its_testmode == std::string("SUBSCRIBE_STOPSUBSCRIBE_SUBSCRIBE")) { + ::testing::GTEST_FLAG(filter) = "*send_subscribe_stop_subscribe_subscribe"; } return RUN_ALL_TESTS(); } |