diff options
Diffstat (limited to 'src/mongo/transport/grpc/mock_server_stream_test.cpp')
-rw-r--r-- | src/mongo/transport/grpc/mock_server_stream_test.cpp | 66 |
1 files changed, 54 insertions, 12 deletions
diff --git a/src/mongo/transport/grpc/mock_server_stream_test.cpp b/src/mongo/transport/grpc/mock_server_stream_test.cpp index 1452f9f3f4a..1f01a7b7ff0 100644 --- a/src/mongo/transport/grpc/mock_server_stream_test.cpp +++ b/src/mongo/transport/grpc/mock_server_stream_test.cpp @@ -33,6 +33,8 @@ #include <utility> #include <vector> +#include <grpcpp/support/status.h> + #include "mongo/db/concurrency/locker_noop_service_context_test_fixture.h" #include "mongo/platform/mutex.h" #include "mongo/rpc/message.h" @@ -40,6 +42,7 @@ #include "mongo/transport/grpc/metadata.h" #include "mongo/transport/grpc/mock_server_context.h" #include "mongo/transport/grpc/mock_server_stream.h" +#include "mongo/transport/grpc/mock_stub.h" #include "mongo/transport/grpc/test_fixtures.h" #include "mongo/unittest/assert.h" #include "mongo/unittest/death_test.h" @@ -47,6 +50,7 @@ #include "mongo/unittest/unittest.h" #include "mongo/util/concurrency/notification.h" #include "mongo/util/duration.h" +#include "mongo/util/future.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/scopeguard.h" #include "mongo/util/system_clock_source.h" @@ -58,32 +62,29 @@ template <class Base> class MockServerStreamBase : public Base { public: static constexpr Milliseconds kTimeout = Milliseconds(100); - static constexpr const char* kRemote = "abc:123"; virtual void setUp() override { Base::setUp(); - _fixtures = std::make_unique<MockStreamTestFixtures>( - HostAndPort{kRemote}, kTimeout, _clientMetadata); - } - MockStreamTestFixtures& getFixtures() { - return *_fixtures; + MockStubTestFixtures fixtures; + _fixtures = fixtures.makeStreamTestFixtures( + Base::getServiceContext()->getFastClockSource()->now() + kTimeout, _clientMetadata); } MockServerStream& getServerStream() { - return *getFixtures().serverStream; + return *_fixtures->serverStream; } MockServerContext& getServerContext() { - return *getFixtures().serverCtx; + return *_fixtures->serverCtx; } - MockClientStream& getClientStream() { - return *getFixtures().clientStream; + ClientStream& getClientStream() { + return *_fixtures->clientStream; } - MockClientContext& getClientContext() { - return *getFixtures().clientCtx; + ClientContext& getClientContext() { + return *_fixtures->clientCtx; } const Message& getClientFirstMessage() const { @@ -220,6 +221,7 @@ TEST_F(MockServerStreamTestWithMockedClockSource, DeadlineIsEnforced) { ASSERT_FALSE(getServerStream().write(makeUniqueMessage().sharedBuffer())); ASSERT_FALSE(getClientContext().getServerInitialMetadata()); ASSERT_FALSE(getClientStream().read()); + ASSERT_EQ(getClientStream().finish().error_code(), ::grpc::StatusCode::DEADLINE_EXCEEDED); } TEST_F(MockServerStreamTest, TryCancelSubsequentServerRead) { @@ -265,4 +267,44 @@ TEST_F(MockServerStreamTest, ClientMetadataIsAccessible) { ASSERT_EQ(getServerContext().getClientMetadata(), getClientMetadata()); } +TEST_F(MockServerStreamTest, ClientSideCancellation) { + ASSERT_TRUE(getClientStream().write(makeUniqueMessage().sharedBuffer())); + ASSERT_TRUE(getServerStream().read()); + + getClientContext().tryCancel(); + + ASSERT_FALSE(getClientStream().read()); + ASSERT_FALSE(getClientStream().write(makeUniqueMessage().sharedBuffer())); + ASSERT_FALSE(getServerStream().read()); + ASSERT_FALSE(getServerStream().write(makeUniqueMessage().sharedBuffer())); + ASSERT_TRUE(getServerContext().isCancelled()); + + ASSERT_EQ(getClientStream().finish().error_code(), ::grpc::StatusCode::CANCELLED); +} + +TEST_F(MockServerStreamTest, CancellationInterruptsFinish) { + auto pf = makePromiseFuture<::grpc::Status>(); + auto finishThread = + stdx::thread([&] { pf.promise.setWith([&] { return getClientStream().finish(); }); }); + ON_BLOCK_EXIT([&] { finishThread.join(); }); + + // finish() won't return until server end hangs up too. + ASSERT_FALSE(pf.future.isReady()); + + getServerContext().tryCancel(); + ASSERT_EQ(pf.future.get().error_code(), ::grpc::StatusCode::CANCELLED); +} + +TEST_F(MockServerStreamTestWithMockedClockSource, DeadlineExceededInterruptsFinish) { + auto pf = makePromiseFuture<::grpc::Status>(); + auto finishThread = + stdx::thread([&] { pf.promise.setWith([&] { return getClientStream().finish(); }); }); + ON_BLOCK_EXIT([&] { finishThread.join(); }); + + // finish() won't return until server end hangs up too. + ASSERT_FALSE(pf.future.isReady()); + + clockSource().advance(kTimeout * 2); + ASSERT_EQ(pf.future.get().error_code(), ::grpc::StatusCode::DEADLINE_EXCEEDED); +} } // namespace mongo::transport::grpc |