From 4530bcb094d5017599699a3da074a061a493f2b1 Mon Sep 17 00:00:00 2001 From: Ruoxin Xu Date: Sun, 4 Oct 2020 10:57:34 +0100 Subject: SERVER-25497 Fix sharded query path to handle shutdown of the mongos process --- src/mongo/s/query/async_results_merger.cpp | 41 ++++------ src/mongo/s/query/async_results_merger.h | 37 ++++++--- src/mongo/s/query/async_results_merger_test.cpp | 102 +++++++++++------------- src/mongo/s/query/blocking_results_merger.cpp | 7 +- src/mongo/stdx/future.h | 1 + 5 files changed, 91 insertions(+), 97 deletions(-) diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 98aec3332ec..f97dd6c6da9 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -685,12 +685,10 @@ void AsyncResultsMerger::_cleanUpKilledBatch(WithLock lk) { invariant(_lifecycleState == kKillStarted); // If this is the last callback to run then we are ready to free the ARM. We signal the - // '_killCompleteEvent', which the caller of kill() may be waiting on. + // '_killCompleteInfo', which the caller of kill() may be waiting on. if (!_haveOutstandingBatchRequests(lk)) { - // If the event is invalid then '_executor' is in shutdown, so we cannot signal events. - if (_killCompleteEvent.isValid()) { - _executor->signalEvent(_killCompleteEvent); - } + invariant(_killCompleteInfo); + _killCompleteInfo->signalFutures(); _lifecycleState = kKillComplete; } @@ -818,7 +816,7 @@ bool AsyncResultsMerger::_haveOutstandingBatchRequests(WithLock) { } void AsyncResultsMerger::_scheduleKillCursors(WithLock, OperationContext* opCtx) { - invariant(_killCompleteEvent.isValid()); + invariant(_killCompleteInfo); for (const auto& remote : _remotes) { if (remote.status.isOK() && remote.cursorId && !remote.exhausted()) { @@ -834,48 +832,37 @@ void AsyncResultsMerger::_scheduleKillCursors(WithLock, OperationContext* opCtx) } } -executor::TaskExecutor::EventHandle AsyncResultsMerger::kill(OperationContext* opCtx) { +stdx::shared_future AsyncResultsMerger::kill(OperationContext* opCtx) { stdx::lock_guard lk(_mutex); - if (_killCompleteEvent.isValid()) { + if (_killCompleteInfo) { invariant(_lifecycleState != kAlive); - return _killCompleteEvent; + return _killCompleteInfo->getFuture(); } invariant(_lifecycleState == kAlive); _lifecycleState = kKillStarted; - // Make '_killCompleteEvent', which we will signal as soon as all of our callbacks - // have finished running. - auto statusWithEvent = _executor->makeEvent(); - if (ErrorCodes::isShutdownError(statusWithEvent.getStatus().code())) { - // The underlying task executor is shutting down. - if (!_haveOutstandingBatchRequests(lk)) { - _lifecycleState = kKillComplete; - } - return executor::TaskExecutor::EventHandle(); - } - fassert(28716, statusWithEvent); - _killCompleteEvent = statusWithEvent.getValue(); + // Create "_killCompleteInfo", which we will signal as soon as all of our callbacks have + // finished running. + _killCompleteInfo.emplace(); _scheduleKillCursors(lk, opCtx); if (!_haveOutstandingBatchRequests(lk)) { _lifecycleState = kKillComplete; - // Signal the event right now, as there's nothing to wait for. - _executor->signalEvent(_killCompleteEvent); - return _killCompleteEvent; + // Signal the future right now, as there's nothing to wait for. + _killCompleteInfo->signalFutures(); + return _killCompleteInfo->getFuture(); } - _lifecycleState = kKillStarted; - // Cancel all of our callbacks. Once they all complete, the event will be signaled. for (const auto& remote : _remotes) { if (remote.cbHandle.isValid()) { _executor->cancel(remote.cbHandle); } } - return _killCompleteEvent; + return _killCompleteInfo->getFuture(); } // diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h index 5e247d20391..11fc6eb2569 100644 --- a/src/mongo/s/query/async_results_merger.h +++ b/src/mongo/s/query/async_results_merger.h @@ -40,6 +40,7 @@ #include "mongo/platform/mutex.h" #include "mongo/s/query/async_results_merger_params_gen.h" #include "mongo/s/query/cluster_query_result.h" +#include "mongo/stdx/future.h" #include "mongo/util/concurrency/with_lock.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/time_support.h" @@ -231,22 +232,19 @@ public: /** * Starts shutting down this ARM by canceling all pending requests and scheduling killCursors - * on all of the unexhausted remotes. Returns a handle to an event that is signaled when this - * ARM is safe to destroy. + * on all of the unexhausted remotes. Returns a 'future' that is signaled when this ARM is safe + * to destroy. * - * If there are no pending requests, schedules killCursors and signals the event immediately. + * If there are no pending requests, schedules killCursors and signals the future immediately. * Otherwise, the last callback that runs after kill() is called signals the event. * - * Returns an invalid handle if the underlying task executor is shutting down. In this case, - * killing is considered complete and the ARM may be destroyed immediately. - * * May be called multiple times (idempotent). * * Note that 'opCtx' may or may not be the same as the operation context to which this cursor is * currently attached. This is so that a killing thread may call this method with its own * operation context. */ - executor::TaskExecutor::EventHandle kill(OperationContext* opCtx); + stdx::shared_future kill(OperationContext* opCtx); private: /** @@ -517,9 +515,28 @@ private: LifecycleState _lifecycleState = kAlive; - // Signaled when all outstanding batch request callbacks have run after kill() has been - // called. This means that the ARM is safe to delete. - executor::TaskExecutor::EventHandle _killCompleteEvent; + // Handles the promise/future mechanism used to cleanly shut down the ARM. This avoids race + // conditions in cases where the underlying TaskExecutor is simultaneously being torn down. + struct KillCompletePromiseFuture { + KillCompletePromiseFuture() : _future(_promise.get_future()){}; + + // Multiple calls to kill() can be made and each must return a future that will be notified + // when the ARM has been cleaned up. + stdx::shared_future getFuture() { + return _future; + } + + // Called by the ARM when all outstanding requests have run. Notifies all threads waiting on + // shared futures that the ARM has been cleaned up. + void signalFutures() { + _promise.set_value(); + } + + private: + stdx::promise _promise; + stdx::shared_future _future; + }; + boost::optional _killCompleteInfo; }; } // namespace mongo diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index 3c0a626ec4a..00112add13b 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -465,8 +465,8 @@ TEST_F(AsyncResultsMergerTest, SortedButNoSortKey) { ASSERT_EQ(statusWithNext.getStatus().code(), ErrorCodes::InternalError); // Required to kill the 'arm' on error before destruction. - auto killEvent = arm->kill(operationContext()); - executor()->waitForEvent(killEvent); + auto killFuture = arm->kill(operationContext()); + killFuture.wait(); } TEST_F(AsyncResultsMergerTest, HasFirstBatch) { @@ -649,13 +649,9 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { ASSERT_TRUE(arm->ready()); ASSERT_BSONOBJ_EQ(fromjson("{_id: 8}"), *unittest::assertGet(arm->nextReady()).getResult()); - // Kill cursor before deleting it, as the second remote cursor has not been exhausted. We don't - // wait on 'killEvent' here, as the blackholed request's callback will only run on shutdown of - // the network interface. - auto killEvent = arm->kill(operationContext()); - ASSERT_TRUE(killEvent.isValid()); + auto killFuture = arm->kill(operationContext()); executor()->shutdown(); - executor()->waitForEvent(killEvent); + killFuture.wait(); } TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) { @@ -678,8 +674,8 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) { ASSERT(!arm->nextReady().isOK()); // Required to kill the 'arm' on error before destruction. - auto killEvent = arm->kill(operationContext()); - executor()->waitForEvent(killEvent); + auto killFuture = arm->kill(operationContext()); + killFuture.wait(); } TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) { @@ -711,8 +707,8 @@ TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) { ASSERT(!statusWithNext.isOK()); // Required to kill the 'arm' on error before destruction. - auto killEvent = arm->kill(operationContext()); - executor()->waitForEvent(killEvent); + auto killFuture = arm->kill(operationContext()); + killFuture.wait(); } TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) { @@ -746,8 +742,8 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) { ASSERT_EQ(statusWithNext.getStatus().reason(), "bad thing happened"); // Required to kill the 'arm' on error before destruction. - auto killEvent = arm->kill(operationContext()); - executor()->waitForEvent(killEvent); + auto killFuture = arm->kill(operationContext()); + killFuture.wait(); } TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) { @@ -776,8 +772,8 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) { ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); // Required to kill the 'arm' on error before destruction. - auto killEvent = arm->kill(operationContext()); - executor()->waitForEvent(killEvent); + auto killFuture = arm->kill(operationContext()); + killFuture.wait(); } TEST_F(AsyncResultsMergerTest, NextEventAfterTaskExecutorShutdown) { @@ -788,8 +784,8 @@ TEST_F(AsyncResultsMergerTest, NextEventAfterTaskExecutorShutdown) { executor()->shutdown(); ASSERT_EQ(ErrorCodes::ShutdownInProgress, arm->nextEvent().getStatus()); - auto killEvent = arm->kill(operationContext()); - ASSERT_FALSE(killEvent.isValid()); + auto killFuture = arm->kill(operationContext()); + killFuture.wait(); } TEST_F(AsyncResultsMergerTest, KillAfterTaskExecutorShutdownWithOutstandingBatches) { @@ -806,8 +802,8 @@ TEST_F(AsyncResultsMergerTest, KillAfterTaskExecutorShutdownWithOutstandingBatch // Executor shuts down before a response is received. executor()->shutdown(); - auto killEvent = arm->kill(operationContext()); - ASSERT_FALSE(killEvent.isValid()); + auto killFuture = arm->kill(operationContext()); + killFuture.wait(); // Ensure that the executor finishes all of the outstanding callbacks before the ARM is freed. executor()->join(); @@ -820,7 +816,7 @@ TEST_F(AsyncResultsMergerTest, KillNoBatchesRequested) { auto arm = makeARMFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); - auto killedEvent = arm->kill(operationContext()); + auto killFuture = arm->kill(operationContext()); assertKillCusorsCmdHasCursorId(getNthPendingRequest(0u).cmdObj, 1); // Killed cursors are considered ready, but return an error when you try to receive the next @@ -828,7 +824,7 @@ TEST_F(AsyncResultsMergerTest, KillNoBatchesRequested) { ASSERT_TRUE(arm->ready()); ASSERT_NOT_OK(arm->nextReady().getStatus()); - executor()->waitForEvent(killedEvent); + killFuture.wait(); } TEST_F(AsyncResultsMergerTest, KillAllRemotesExhausted) { @@ -854,14 +850,14 @@ TEST_F(AsyncResultsMergerTest, KillAllRemotesExhausted) { responses.emplace_back(kTestNss, CursorId(0), batch3); scheduleNetworkResponses(std::move(responses)); - auto killedEvent = arm->kill(operationContext()); + auto killFuture = arm->kill(operationContext()); // ARM shouldn't schedule killCursors on anything since all of the remotes are exhausted. ASSERT_FALSE(networkHasReadyRequests()); ASSERT_TRUE(arm->ready()); ASSERT_NOT_OK(arm->nextReady().getStatus()); - executor()->waitForEvent(killedEvent); + killFuture.wait(); } TEST_F(AsyncResultsMergerTest, KillNonExhaustedCursorWithoutPendingRequest) { @@ -888,14 +884,14 @@ TEST_F(AsyncResultsMergerTest, KillNonExhaustedCursorWithoutPendingRequest) { responses.emplace_back(kTestNss, CursorId(123), batch3); scheduleNetworkResponses(std::move(responses)); - auto killedEvent = arm->kill(operationContext()); + auto killFuture = arm->kill(operationContext()); // ARM should schedule killCursors on cursor 123 assertKillCusorsCmdHasCursorId(getNthPendingRequest(0u).cmdObj, 123); ASSERT_TRUE(arm->ready()); ASSERT_NOT_OK(arm->nextReady().getStatus()); - executor()->waitForEvent(killedEvent); + killFuture.wait(); } TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) { @@ -918,7 +914,7 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) { scheduleNetworkResponses(std::move(responses)); // Kill event will only be signalled once the callbacks for the pending batches have run. - auto killedEvent = arm->kill(operationContext()); + auto killFuture = arm->kill(operationContext()); // Check that the ARM kills both batches. assertKillCusorsCmdHasCursorId(getNthPendingRequest(0u).cmdObj, 2); @@ -929,7 +925,7 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) { // Ensure that we properly signal those waiting for more results to be ready. executor()->waitForEvent(readyEvent); - executor()->waitForEvent(killedEvent); + killFuture.wait(); } TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) { @@ -947,12 +943,12 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) { responses.emplace_back(kTestNss, CursorId(1), batch1); scheduleNetworkResponses(std::move(responses)); - auto killedEvent = arm->kill(operationContext()); + auto killFuture = arm->kill(operationContext()); // Attempting to schedule more network operations on a killed arm is an error. ASSERT_NOT_OK(arm->nextEvent().getStatus()); - executor()->waitForEvent(killedEvent); + killFuture.wait(); } TEST_F(AsyncResultsMergerTest, KillCalledTwice) { @@ -960,12 +956,10 @@ TEST_F(AsyncResultsMergerTest, KillCalledTwice) { cursors.push_back( makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); auto arm = makeARMFromExistingCursors(std::move(cursors)); - auto killedEvent1 = arm->kill(operationContext()); - ASSERT(killedEvent1.isValid()); - auto killedEvent2 = arm->kill(operationContext()); - ASSERT(killedEvent2.isValid()); - executor()->waitForEvent(killedEvent1); - executor()->waitForEvent(killedEvent2); + auto killFuture1 = arm->kill(operationContext()); + auto killFuture2 = arm->kill(operationContext()); + killFuture1.wait(); + killFuture2.wait(); } TEST_F(AsyncResultsMergerTest, TailableBasic) { @@ -1012,8 +1006,8 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) { ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); ASSERT_FALSE(arm->remotesExhausted()); - auto killedEvent = arm->kill(operationContext()); - executor()->waitForEvent(killedEvent); + auto killFuture = arm->kill(operationContext()); + killFuture.wait(); } TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) { @@ -1040,8 +1034,8 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) { ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); ASSERT_FALSE(arm->remotesExhausted()); - auto killedEvent = arm->kill(operationContext()); - executor()->waitForEvent(killedEvent); + auto killFuture = arm->kill(operationContext()); + killFuture.wait(); } TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) { @@ -1270,8 +1264,8 @@ TEST_F(AsyncResultsMergerTest, ReturnsErrorOnRetriableError) { ASSERT_EQ(statusWithNext.getStatus().reason(), "host unreachable"); // Required to kill the 'arm' on error before destruction. - auto killEvent = arm->kill(operationContext()); - executor()->waitForEvent(killEvent); + auto killFuture = arm->kill(operationContext()); + killFuture.wait(); } TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) { @@ -1433,8 +1427,8 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfRemoteHasLowerPostB std::vector responses; responses.emplace_back(kTestNss, CursorId(0), std::vector{}); scheduleNetworkResponses(std::move(responses)); - auto killEvent = arm->kill(operationContext()); - executor()->waitForEvent(killEvent); + auto killFuture = arm->kill(operationContext()); + killFuture.wait(); } TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting) { @@ -1801,8 +1795,8 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) { auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789))); - auto killEvent = arm->kill(operationContext()); - executor()->waitForEvent(killEvent); + auto killFuture = arm->kill(operationContext()); + killFuture.wait(); } TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutAwaitDataCantHaveMaxTime) { @@ -1813,8 +1807,8 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutAwaitDataCantHaveMaxTime) { auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789))); - auto killEvent = arm->kill(operationContext()); - executor()->waitForEvent(killEvent); + auto killFuture = arm->kill(operationContext()); + killFuture.wait(); } TEST_F(AsyncResultsMergerTest, ShardCanErrorInBetweenReadyAndNextEvent) { @@ -1831,8 +1825,8 @@ TEST_F(AsyncResultsMergerTest, ShardCanErrorInBetweenReadyAndNextEvent) { ASSERT_EQ(ErrorCodes::BadValue, arm->nextEvent().getStatus()); // Required to kill the 'arm' on error before destruction. - auto killEvent = arm->kill(operationContext()); - executor()->waitForEvent(killEvent); + auto killFuture = arm->kill(operationContext()); + killFuture.wait(); } TEST_F(AsyncResultsMergerTest, KillShouldNotWaitForRemoteCommandsBeforeSchedulingKillCursors) { @@ -1854,7 +1848,7 @@ TEST_F(AsyncResultsMergerTest, KillShouldNotWaitForRemoteCommandsBeforeSchedulin // Kill the ARM while a batch is still outstanding. The callback for the outstanding batch // should be canceled. - auto killEvent = arm->kill(operationContext()); + auto killFuture = arm->kill(operationContext()); // Check that the ARM will run killCursors. assertKillCusorsCmdHasCursorId(getNthPendingRequest(0u).cmdObj, 1); @@ -1863,7 +1857,7 @@ TEST_F(AsyncResultsMergerTest, KillShouldNotWaitForRemoteCommandsBeforeSchedulin runReadyCallbacks(); executor()->waitForEvent(readyEvent); - executor()->waitForEvent(killEvent); + killFuture.wait(); } TEST_F(AsyncResultsMergerTest, GetMoresShouldNotIncludeLSIDOrTxnNumberIfNoneSpecified) { @@ -2036,8 +2030,8 @@ TEST_F(AsyncResultsMergerTest, ShouldNotScheduleGetMoresWithoutAnOperationContex ASSERT_FALSE(arm->ready()); ASSERT_FALSE(arm->remotesExhausted()); - auto killedEvent = arm->kill(operationContext()); - executor()->waitForEvent(killedEvent); + auto killFuture = arm->kill(operationContext()); + killFuture.wait(); } } // namespace diff --git a/src/mongo/s/query/blocking_results_merger.cpp b/src/mongo/s/query/blocking_results_merger.cpp index fd2a52f7625..7f9edc2cb0e 100644 --- a/src/mongo/s/query/blocking_results_merger.cpp +++ b/src/mongo/s/query/blocking_results_merger.cpp @@ -178,12 +178,7 @@ StatusWith BlockingResultsMerger::getNextEv } void BlockingResultsMerger::kill(OperationContext* opCtx) { - auto killEvent = _arm.kill(opCtx); - if (!killEvent) { - // We are shutting down. - return; - } - _executor->waitForEvent(killEvent); + _arm.kill(opCtx).wait(); } } // namespace mongo diff --git a/src/mongo/stdx/future.h b/src/mongo/stdx/future.h index 58bdccfa400..548d4863862 100644 --- a/src/mongo/stdx/future.h +++ b/src/mongo/stdx/future.h @@ -40,6 +40,7 @@ using ::std::future_status; // NOLINT using ::std::launch; // NOLINT using ::std::packaged_task; // NOLINT using ::std::promise; // NOLINT +using ::std::shared_future; // NOLINT } // namespace stdx } // namespace mongo -- cgit v1.2.1