summaryrefslogtreecommitdiff
path: root/src/mongo/transport
diff options
context:
space:
mode:
authorjannaerin <golden.janna@gmail.com>2020-03-04 12:08:42 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-03-25 04:30:03 +0000
commit8124a8d047ce142f6d6defc089e5e71192721a5c (patch)
treedddfd98d579eb288b31bb28cb29072dbc37b2c20 /src/mongo/transport
parent4601bd54dfd3f3ab20b357b71a4b17667143c0fb (diff)
downloadmongo-8124a8d047ce142f6d6defc089e5e71192721a5c.tar.gz
SERVER-44954 Streamable RSM uses exhaust isMaster
Diffstat (limited to 'src/mongo/transport')
-rw-r--r--src/mongo/transport/transport_layer_asio_integration_test.cpp84
1 files changed, 27 insertions, 57 deletions
diff --git a/src/mongo/transport/transport_layer_asio_integration_test.cpp b/src/mongo/transport/transport_layer_asio_integration_test.cpp
index ec7981b615a..9cf669ec978 100644
--- a/src/mongo/transport/transport_layer_asio_integration_test.cpp
+++ b/src/mongo/transport/transport_layer_asio_integration_test.cpp
@@ -152,42 +152,6 @@ TEST(TransportLayerASIO, asyncConnectTimeoutCleansUpSocket) {
ASSERT_EQ(client.getStatus(), ErrorCodes::NetworkTimeout);
}
-class ExhaustRequestHandlerUtil {
-public:
- AsyncDBClient::RemoteCommandCallbackFn&& getExhaustRequestCallbackFn() {
- return std::move(_callbackFn);
- }
-
- executor::RemoteCommandResponse getReplyObjectWhenReady() {
- stdx::unique_lock<Latch> lk(_mutex);
- _cv.wait(_mutex, [&] { return _replyUpdated; });
- _replyUpdated = false;
- return _reply;
- }
-
-private:
- // holds the server's response once it sent one
- executor::RemoteCommandResponse _reply;
- // set to true once 'reply' has been set. Used to indicate that a new response has been set and
- // should be inspected.
- bool _replyUpdated = false;
-
- Mutex _mutex = MONGO_MAKE_LATCH();
- stdx::condition_variable _cv;
-
- // called when a server sends a new isMaster exhaust response. Updates _reply and _replyUpdated.
- AsyncDBClient::RemoteCommandCallbackFn _callbackFn =
- [&](const executor::RemoteCommandResponse& response, bool isMoreToComeSet) {
- {
- stdx::unique_lock<Latch> lk(_mutex);
- _reply = response;
- _replyUpdated = true;
- }
-
- _cv.notify_all();
- };
-};
-
TEST(TransportLayerASIO, exhaustIsMasterShouldReceiveMultipleReplies) {
auto connectionString = unittest::getFixtureConnectionString();
auto server = connectionString.getServers().front();
@@ -217,29 +181,27 @@ TEST(TransportLayerASIO, exhaustIsMasterShouldReceiveMultipleReplies) {
BSONObj(),
nullptr};
- ExhaustRequestHandlerUtil exhaustRequestHandler;
- Future<void> exhaustFuture = handle->runExhaustCommandRequest(
- isMasterRequest, exhaustRequestHandler.getExhaustRequestCallbackFn());
+ Future<executor::RemoteCommandResponse> beginExhaustFuture =
+ handle->beginExhaustCommandRequest(isMasterRequest);
Date_t prevTime;
TopologyVersion topologyVersion;
{
- auto reply = exhaustRequestHandler.getReplyObjectWhenReady();
+ auto reply = beginExhaustFuture.get();
- ASSERT(!exhaustFuture.isReady());
ASSERT_OK(reply.status);
+ ASSERT(reply.moreToCome);
prevTime = reply.data.getField("localTime").Date();
topologyVersion = TopologyVersion::parse(IDLParserErrorContext("TopologyVersion"),
reply.data.getField("topologyVersion").Obj());
}
+ Future<executor::RemoteCommandResponse> awaitExhaustFuture = handle->awaitExhaustCommand();
{
- auto reply = exhaustRequestHandler.getReplyObjectWhenReady();
+ auto reply = awaitExhaustFuture.get();
- // The moreToCome bit is still set
- ASSERT(!exhaustFuture.isReady());
ASSERT_OK(reply.status);
-
+ ASSERT(reply.moreToCome);
auto replyTime = reply.data.getField("localTime").Date();
ASSERT_GT(replyTime, prevTime);
@@ -249,12 +211,22 @@ TEST(TransportLayerASIO, exhaustIsMasterShouldReceiveMultipleReplies) {
ASSERT_EQ(replyTopologyVersion.getCounter(), topologyVersion.getCounter());
}
- handle->cancel();
- handle->end();
- auto error = exhaustFuture.getNoThrow();
- // exhaustFuture will resolve with CallbackCanceled unless the socket is already closed, in
- // which case it will resolve with HostUnreachable.
- ASSERT((error == ErrorCodes::CallbackCanceled) || (error == ErrorCodes::HostUnreachable));
+ Future<executor::RemoteCommandResponse> cancelExhaustFuture = handle->awaitExhaustCommand();
+ {
+ handle->cancel();
+ handle->end();
+ auto swReply = cancelExhaustFuture.getNoThrow();
+
+ // The original isMaster request has maxAwaitTimeMs = 1000 ms, if the cancel executes before
+ // the 1000ms then we expect the future to resolve with an error. It should resolve with
+ // CallbackCanceled unless the socket is already closed, in which case it will resolve with
+ // HostUnreachable. If the network is slow, the server may response before the cancel
+ // executes however.
+ if (!swReply.getStatus().isOK()) {
+ ASSERT((swReply.getStatus() == ErrorCodes::CallbackCanceled) ||
+ (swReply.getStatus() == ErrorCodes::HostUnreachable));
+ }
+ }
}
TEST(TransportLayerASIO, exhaustIsMasterShouldStopOnFailure) {
@@ -323,16 +295,14 @@ TEST(TransportLayerASIO, exhaustIsMasterShouldStopOnFailure) {
BSONObj(),
nullptr};
- ExhaustRequestHandlerUtil exhaustRequestHandler;
- Future<void> exhaustFuture = isMasterHandle->runExhaustCommandRequest(
- isMasterRequest, exhaustRequestHandler.getExhaustRequestCallbackFn());
-
+ Future<executor::RemoteCommandResponse> beginExhaustFuture =
+ isMasterHandle->beginExhaustCommandRequest(isMasterRequest);
{
- auto reply = exhaustRequestHandler.getReplyObjectWhenReady();
+ auto reply = beginExhaustFuture.get();
- exhaustFuture.get();
ASSERT_OK(reply.status);
ASSERT_EQ(reply.data["ok"].Double(), 0.0);
+ ASSERT(!reply.moreToCome);
}
}