diff options
-rw-r--r--  src/mongo/s/async_requests_sender.cpp | 12
-rw-r--r--  src/mongo/s/query/establish_cursors_test.cpp | 66
2 files changed, 46 insertions, 32 deletions
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp index 4ee7bf893c7..36f91459d29 100644 --- a/src/mongo/s/async_requests_sender.cpp +++ b/src/mongo/s/async_requests_sender.cpp @@ -58,6 +58,7 @@ namespace { const int kMaxNumFailedHostRetryAttempts = 3; MONGO_FAIL_POINT_DEFINE(hangBeforeSchedulingRemoteCommand); +MONGO_FAIL_POINT_DEFINE(hangBeforePollResponse); } // namespace @@ -89,6 +90,17 @@ AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx, AsyncRequestsSender::Response AsyncRequestsSender::next() noexcept { invariant(!done()); + hangBeforePollResponse.executeIf( + [&](const BSONObj& data) { + while (MONGO_unlikely(hangBeforePollResponse.shouldFail())) { + LOGV2(4840900, "Hanging in ARS::next due to 'hangBeforePollResponse' failpoint"); + sleepmillis(100); + } + }, + [&](const BSONObj& data) { + return MONGO_unlikely(_remotesLeft == (size_t)data.getIntField("remotesLeft")); + }); + _remotesLeft--; // If we've been interrupted, the response queue should be filled with interrupted answers, go diff --git a/src/mongo/s/query/establish_cursors_test.cpp b/src/mongo/s/query/establish_cursors_test.cpp index eb38a322293..5091b44758b 100644 --- a/src/mongo/s/query/establish_cursors_test.cpp +++ b/src/mongo/s/query/establish_cursors_test.cpp @@ -717,10 +717,18 @@ TEST_F(EstablishCursorsTest, InterruptedWithDanglingRemoteRequest) { {kTestShardIds[1], cmdObj}, }; - // Hang before sending the command to shard 1. - auto fp = globalFailPointRegistry().find("hangBeforeSchedulingRemoteCommand"); - invariant(fp); - fp->setMode(FailPoint::alwaysOn, 0, BSON("hostAndPort" << kTestShardHosts[1].toString())); + // Hang in ARS before it sends the request to remotes[1]. 
+ auto fpSend = globalFailPointRegistry().find("hangBeforeSchedulingRemoteCommand"); + invariant(fpSend); + auto timesHitSend = fpSend->setMode( + FailPoint::alwaysOn, 0, BSON("hostAndPort" << kTestShardHosts[1].toString())); + + // Also hang in ARS::next when there is exactly 1 remote that hasn't replied yet. + // This failpoint is important to ensure establishCursors' check for _interruptStatus.isOK() + // happens after this unittest does opCtx->killOperation(). + auto fpNext = globalFailPointRegistry().find("hangBeforePollResponse"); + invariant(fpNext); + auto timesHitNext = fpNext->setMode(FailPoint::alwaysOn, 0, BSON("remotesLeft" << 1)); auto future = launchAsync([&] { ASSERT_THROWS(establishCursors(operationContext(), @@ -732,16 +740,6 @@ TEST_F(EstablishCursorsTest, InterruptedWithDanglingRemoteRequest) { ExceptionFor<ErrorCodes::CursorKilled>); }); - // Verify that the failpoint is hit. - fp->waitForTimesEntered(5ULL); - - // Mark the OperationContext as killed. - { - stdx::lock_guard<Client> lk(*operationContext()->getClient()); - operationContext()->getServiceContext()->killOperation( - lk, operationContext(), ErrorCodes::CursorKilled); - } - // First remote responds. onCommand([&](const RemoteCommandRequest& request) { ASSERT_EQ(_nss.coll(), request.cmdObj.firstElement().valueStringData()); @@ -750,27 +748,31 @@ TEST_F(EstablishCursorsTest, InterruptedWithDanglingRemoteRequest) { return cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse); }); - // Disable the failpoint to enable the ARS to continue. Once interrupted, it will then trigger a - // killOperations for the two remotes. - fp->setMode(FailPoint::off); + // Wait for ars._remotes[1] to try to send its request. We want to test the case where the + // opCtx is killed after this happens. + fpSend->waitForTimesEntered(timesHitSend + 1); - // The second remote operation may be in flight before the killOperations cleanup, so relax the - // assertions on the mocked responses. 
- auto killsReceived = 0; - while (killsReceived < 2) { - onCommand([&](const RemoteCommandRequest& request) { - if (request.dbname == "admin" && request.cmdObj.hasField("_killOperations")) { - killsReceived++; - return BSON("ok" << 1); - } + // Mark the OperationContext as killed. + { + stdx::lock_guard<Client> lk(*operationContext()->getClient()); + operationContext()->getServiceContext()->killOperation( + lk, operationContext(), ErrorCodes::CursorKilled); + } - // Its not a killOperations, so expect a normal remote command. - ASSERT_EQ(_nss.coll(), request.cmdObj.firstElement().valueStringData()); + // Allow ars._remotes[1] to send its request. + fpSend->setMode(FailPoint::off); - CursorResponse cursorResponse(_nss, CursorId(123), {}); - return cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse); - }); - } + // Wait for establishCursors to call ars.next. + fpNext->waitForTimesEntered(timesHitNext + 1); + + // Disable the ARS::next failpoint to allow establishCursors to handle that response. + // Now ARS::next should check that the opCtx has been marked killed, and return a + // failing response to establishCursors, which should clean up by sending kill commands. + fpNext->setMode(FailPoint::off); + + // Because we paused the ARS using hangBeforePollResponse, we know the ARS will detect the + // killed opCtx before sending any more requests. So we know only _killOperations will be sent. + expectKillOperations(2); future.default_timed_get(); } |