diff options
author | jannaerin <golden.janna@gmail.com> | 2020-03-04 12:08:42 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-03-25 04:30:03 +0000 |
commit | 8124a8d047ce142f6d6defc089e5e71192721a5c (patch) | |
tree | dddfd98d579eb288b31bb28cb29072dbc37b2c20 /src/mongo/transport | |
parent | 4601bd54dfd3f3ab20b357b71a4b17667143c0fb (diff) | |
download | mongo-8124a8d047ce142f6d6defc089e5e71192721a5c.tar.gz |
SERVER-44954 Streamable RSM uses exhaust isMaster
Diffstat (limited to 'src/mongo/transport')
-rw-r--r-- | src/mongo/transport/transport_layer_asio_integration_test.cpp | 84 |
1 files changed, 27 insertions, 57 deletions
diff --git a/src/mongo/transport/transport_layer_asio_integration_test.cpp b/src/mongo/transport/transport_layer_asio_integration_test.cpp index ec7981b615a..9cf669ec978 100644 --- a/src/mongo/transport/transport_layer_asio_integration_test.cpp +++ b/src/mongo/transport/transport_layer_asio_integration_test.cpp @@ -152,42 +152,6 @@ TEST(TransportLayerASIO, asyncConnectTimeoutCleansUpSocket) { ASSERT_EQ(client.getStatus(), ErrorCodes::NetworkTimeout); } -class ExhaustRequestHandlerUtil { -public: - AsyncDBClient::RemoteCommandCallbackFn&& getExhaustRequestCallbackFn() { - return std::move(_callbackFn); - } - - executor::RemoteCommandResponse getReplyObjectWhenReady() { - stdx::unique_lock<Latch> lk(_mutex); - _cv.wait(_mutex, [&] { return _replyUpdated; }); - _replyUpdated = false; - return _reply; - } - -private: - // holds the server's response once it sent one - executor::RemoteCommandResponse _reply; - // set to true once 'reply' has been set. Used to indicate that a new response has been set and - // should be inspected. - bool _replyUpdated = false; - - Mutex _mutex = MONGO_MAKE_LATCH(); - stdx::condition_variable _cv; - - // called when a server sends a new isMaster exhaust response. Updates _reply and _replyUpdated. - AsyncDBClient::RemoteCommandCallbackFn _callbackFn = - [&](const executor::RemoteCommandResponse& response, bool isMoreToComeSet) { - { - stdx::unique_lock<Latch> lk(_mutex); - _reply = response; - _replyUpdated = true; - } - - _cv.notify_all(); - }; -}; - TEST(TransportLayerASIO, exhaustIsMasterShouldReceiveMultipleReplies) { auto connectionString = unittest::getFixtureConnectionString(); auto server = connectionString.getServers().front(); @@ -217,29 +181,27 @@ TEST(TransportLayerASIO, exhaustIsMasterShouldReceiveMultipleReplies) { BSONObj(), nullptr}; - ExhaustRequestHandlerUtil exhaustRequestHandler; - Future<void> exhaustFuture = handle->runExhaustCommandRequest( - isMasterRequest, exhaustRequestHandler.getExhaustRequestCallbackFn()); + Future<executor::RemoteCommandResponse> beginExhaustFuture = + handle->beginExhaustCommandRequest(isMasterRequest); Date_t prevTime; TopologyVersion topologyVersion; { - auto reply = exhaustRequestHandler.getReplyObjectWhenReady(); + auto reply = beginExhaustFuture.get(); - ASSERT(!exhaustFuture.isReady()); ASSERT_OK(reply.status); + ASSERT(reply.moreToCome); prevTime = reply.data.getField("localTime").Date(); topologyVersion = TopologyVersion::parse(IDLParserErrorContext("TopologyVersion"), reply.data.getField("topologyVersion").Obj()); } + Future<executor::RemoteCommandResponse> awaitExhaustFuture = handle->awaitExhaustCommand(); { - auto reply = exhaustRequestHandler.getReplyObjectWhenReady(); + auto reply = awaitExhaustFuture.get(); - // The moreToCome bit is still set - ASSERT(!exhaustFuture.isReady()); ASSERT_OK(reply.status); - + ASSERT(reply.moreToCome); auto replyTime = reply.data.getField("localTime").Date(); ASSERT_GT(replyTime, prevTime); @@ -249,12 +211,22 @@ TEST(TransportLayerASIO, exhaustIsMasterShouldReceiveMultipleReplies) { ASSERT_EQ(replyTopologyVersion.getCounter(), topologyVersion.getCounter()); } - handle->cancel(); - handle->end(); - auto error = exhaustFuture.getNoThrow(); - // exhaustFuture will resolve with CallbackCanceled unless the socket is already closed, in - // which case it will resolve with HostUnreachable. - ASSERT((error == ErrorCodes::CallbackCanceled) || (error == ErrorCodes::HostUnreachable)); + Future<executor::RemoteCommandResponse> cancelExhaustFuture = handle->awaitExhaustCommand(); + { + handle->cancel(); + handle->end(); + auto swReply = cancelExhaustFuture.getNoThrow(); + + // The original isMaster request has maxAwaitTimeMs = 1000 ms, if the cancel executes before + // the 1000ms then we expect the future to resolve with an error. It should resolve with + // CallbackCanceled unless the socket is already closed, in which case it will resolve with + // HostUnreachable. If the network is slow, the server may response before the cancel + // executes however. + if (!swReply.getStatus().isOK()) { + ASSERT((swReply.getStatus() == ErrorCodes::CallbackCanceled) || + (swReply.getStatus() == ErrorCodes::HostUnreachable)); + } + } } TEST(TransportLayerASIO, exhaustIsMasterShouldStopOnFailure) { @@ -323,16 +295,14 @@ TEST(TransportLayerASIO, exhaustIsMasterShouldStopOnFailure) { BSONObj(), nullptr}; - ExhaustRequestHandlerUtil exhaustRequestHandler; - Future<void> exhaustFuture = isMasterHandle->runExhaustCommandRequest( - isMasterRequest, exhaustRequestHandler.getExhaustRequestCallbackFn()); - + Future<executor::RemoteCommandResponse> beginExhaustFuture = + isMasterHandle->beginExhaustCommandRequest(isMasterRequest); { - auto reply = exhaustRequestHandler.getReplyObjectWhenReady(); + auto reply = beginExhaustFuture.get(); - exhaustFuture.get(); ASSERT_OK(reply.status); ASSERT_EQ(reply.data["ok"].Double(), 0.0); + ASSERT(!reply.moreToCome); } } |