summaryrefslogtreecommitdiff
path: root/src/mongo/transport/grpc/mock_server_stream_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/transport/grpc/mock_server_stream_test.cpp')
-rw-r--r--src/mongo/transport/grpc/mock_server_stream_test.cpp66
1 files changed, 54 insertions, 12 deletions
diff --git a/src/mongo/transport/grpc/mock_server_stream_test.cpp b/src/mongo/transport/grpc/mock_server_stream_test.cpp
index 1452f9f3f4a..1f01a7b7ff0 100644
--- a/src/mongo/transport/grpc/mock_server_stream_test.cpp
+++ b/src/mongo/transport/grpc/mock_server_stream_test.cpp
@@ -33,6 +33,8 @@
#include <utility>
#include <vector>
+#include <grpcpp/support/status.h>
+
#include "mongo/db/concurrency/locker_noop_service_context_test_fixture.h"
#include "mongo/platform/mutex.h"
#include "mongo/rpc/message.h"
@@ -40,6 +42,7 @@
#include "mongo/transport/grpc/metadata.h"
#include "mongo/transport/grpc/mock_server_context.h"
#include "mongo/transport/grpc/mock_server_stream.h"
+#include "mongo/transport/grpc/mock_stub.h"
#include "mongo/transport/grpc/test_fixtures.h"
#include "mongo/unittest/assert.h"
#include "mongo/unittest/death_test.h"
@@ -47,6 +50,7 @@
#include "mongo/unittest/unittest.h"
#include "mongo/util/concurrency/notification.h"
#include "mongo/util/duration.h"
+#include "mongo/util/future.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/scopeguard.h"
#include "mongo/util/system_clock_source.h"
@@ -58,32 +62,29 @@ template <class Base>
class MockServerStreamBase : public Base {
public:
static constexpr Milliseconds kTimeout = Milliseconds(100);
- static constexpr const char* kRemote = "abc:123";
virtual void setUp() override {
Base::setUp();
- _fixtures = std::make_unique<MockStreamTestFixtures>(
- HostAndPort{kRemote}, kTimeout, _clientMetadata);
- }
- MockStreamTestFixtures& getFixtures() {
- return *_fixtures;
+ MockStubTestFixtures fixtures;
+ _fixtures = fixtures.makeStreamTestFixtures(
+ Base::getServiceContext()->getFastClockSource()->now() + kTimeout, _clientMetadata);
}
MockServerStream& getServerStream() {
- return *getFixtures().serverStream;
+ return *_fixtures->serverStream;
}
MockServerContext& getServerContext() {
- return *getFixtures().serverCtx;
+ return *_fixtures->serverCtx;
}
- MockClientStream& getClientStream() {
- return *getFixtures().clientStream;
+ ClientStream& getClientStream() {
+ return *_fixtures->clientStream;
}
- MockClientContext& getClientContext() {
- return *getFixtures().clientCtx;
+ ClientContext& getClientContext() {
+ return *_fixtures->clientCtx;
}
const Message& getClientFirstMessage() const {
@@ -220,6 +221,7 @@ TEST_F(MockServerStreamTestWithMockedClockSource, DeadlineIsEnforced) {
ASSERT_FALSE(getServerStream().write(makeUniqueMessage().sharedBuffer()));
ASSERT_FALSE(getClientContext().getServerInitialMetadata());
ASSERT_FALSE(getClientStream().read());
+ ASSERT_EQ(getClientStream().finish().error_code(), ::grpc::StatusCode::DEADLINE_EXCEEDED);
}
TEST_F(MockServerStreamTest, TryCancelSubsequentServerRead) {
@@ -265,4 +267,44 @@ TEST_F(MockServerStreamTest, ClientMetadataIsAccessible) {
ASSERT_EQ(getServerContext().getClientMetadata(), getClientMetadata());
}
+TEST_F(MockServerStreamTest, ClientSideCancellation) {
+ ASSERT_TRUE(getClientStream().write(makeUniqueMessage().sharedBuffer()));
+ ASSERT_TRUE(getServerStream().read());
+
+ getClientContext().tryCancel();
+
+ ASSERT_FALSE(getClientStream().read());
+ ASSERT_FALSE(getClientStream().write(makeUniqueMessage().sharedBuffer()));
+ ASSERT_FALSE(getServerStream().read());
+ ASSERT_FALSE(getServerStream().write(makeUniqueMessage().sharedBuffer()));
+ ASSERT_TRUE(getServerContext().isCancelled());
+
+ ASSERT_EQ(getClientStream().finish().error_code(), ::grpc::StatusCode::CANCELLED);
+}
+
+TEST_F(MockServerStreamTest, CancellationInterruptsFinish) {
+ auto pf = makePromiseFuture<::grpc::Status>();
+ auto finishThread =
+ stdx::thread([&] { pf.promise.setWith([&] { return getClientStream().finish(); }); });
+ ON_BLOCK_EXIT([&] { finishThread.join(); });
+
+ // finish() won't return until server end hangs up too.
+ ASSERT_FALSE(pf.future.isReady());
+
+ getServerContext().tryCancel();
+ ASSERT_EQ(pf.future.get().error_code(), ::grpc::StatusCode::CANCELLED);
+}
+
+TEST_F(MockServerStreamTestWithMockedClockSource, DeadlineExceededInterruptsFinish) {
+ auto pf = makePromiseFuture<::grpc::Status>();
+ auto finishThread =
+ stdx::thread([&] { pf.promise.setWith([&] { return getClientStream().finish(); }); });
+ ON_BLOCK_EXIT([&] { finishThread.join(); });
+
+ // finish() won't return until server end hangs up too.
+ ASSERT_FALSE(pf.future.isReady());
+
+ clockSource().advance(kTimeout * 2);
+ ASSERT_EQ(pf.future.get().error_code(), ::grpc::StatusCode::DEADLINE_EXCEEDED);
+}
} // namespace mongo::transport::grpc