summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRuoxin Xu <ruoxin.xu@mongodb.com>2020-10-04 10:57:34 +0100
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-10-15 12:45:20 +0000
commit4530bcb094d5017599699a3da074a061a493f2b1 (patch)
treeb7381277bb1232091d87b68aa1c856c2662d2ceb
parent35d7e75bca7cae7bfc984db0dbc1a5099821ccc4 (diff)
downloadmongo-4530bcb094d5017599699a3da074a061a493f2b1.tar.gz
SERVER-25497 Fix sharded query path to handle shutdown of the mongos process
-rw-r--r--src/mongo/s/query/async_results_merger.cpp41
-rw-r--r--src/mongo/s/query/async_results_merger.h37
-rw-r--r--src/mongo/s/query/async_results_merger_test.cpp102
-rw-r--r--src/mongo/s/query/blocking_results_merger.cpp7
-rw-r--r--src/mongo/stdx/future.h1
5 files changed, 91 insertions, 97 deletions
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 98aec3332ec..f97dd6c6da9 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -685,12 +685,10 @@ void AsyncResultsMerger::_cleanUpKilledBatch(WithLock lk) {
invariant(_lifecycleState == kKillStarted);
// If this is the last callback to run then we are ready to free the ARM. We signal the
- // '_killCompleteEvent', which the caller of kill() may be waiting on.
+ // '_killCompleteInfo', which the caller of kill() may be waiting on.
if (!_haveOutstandingBatchRequests(lk)) {
- // If the event is invalid then '_executor' is in shutdown, so we cannot signal events.
- if (_killCompleteEvent.isValid()) {
- _executor->signalEvent(_killCompleteEvent);
- }
+ invariant(_killCompleteInfo);
+ _killCompleteInfo->signalFutures();
_lifecycleState = kKillComplete;
}
@@ -818,7 +816,7 @@ bool AsyncResultsMerger::_haveOutstandingBatchRequests(WithLock) {
}
void AsyncResultsMerger::_scheduleKillCursors(WithLock, OperationContext* opCtx) {
- invariant(_killCompleteEvent.isValid());
+ invariant(_killCompleteInfo);
for (const auto& remote : _remotes) {
if (remote.status.isOK() && remote.cursorId && !remote.exhausted()) {
@@ -834,48 +832,37 @@ void AsyncResultsMerger::_scheduleKillCursors(WithLock, OperationContext* opCtx)
}
}
-executor::TaskExecutor::EventHandle AsyncResultsMerger::kill(OperationContext* opCtx) {
+stdx::shared_future<void> AsyncResultsMerger::kill(OperationContext* opCtx) {
stdx::lock_guard<Latch> lk(_mutex);
- if (_killCompleteEvent.isValid()) {
+ if (_killCompleteInfo) {
invariant(_lifecycleState != kAlive);
- return _killCompleteEvent;
+ return _killCompleteInfo->getFuture();
}
invariant(_lifecycleState == kAlive);
_lifecycleState = kKillStarted;
- // Make '_killCompleteEvent', which we will signal as soon as all of our callbacks
- // have finished running.
- auto statusWithEvent = _executor->makeEvent();
- if (ErrorCodes::isShutdownError(statusWithEvent.getStatus().code())) {
- // The underlying task executor is shutting down.
- if (!_haveOutstandingBatchRequests(lk)) {
- _lifecycleState = kKillComplete;
- }
- return executor::TaskExecutor::EventHandle();
- }
- fassert(28716, statusWithEvent);
- _killCompleteEvent = statusWithEvent.getValue();
+ // Create "_killCompleteInfo", which we will signal as soon as all of our callbacks have
+ // finished running.
+ _killCompleteInfo.emplace();
_scheduleKillCursors(lk, opCtx);
if (!_haveOutstandingBatchRequests(lk)) {
_lifecycleState = kKillComplete;
- // Signal the event right now, as there's nothing to wait for.
- _executor->signalEvent(_killCompleteEvent);
- return _killCompleteEvent;
+ // Signal the future right now, as there's nothing to wait for.
+ _killCompleteInfo->signalFutures();
+ return _killCompleteInfo->getFuture();
}
- _lifecycleState = kKillStarted;
-
// Cancel all of our callbacks. Once they all complete, the event will be signaled.
for (const auto& remote : _remotes) {
if (remote.cbHandle.isValid()) {
_executor->cancel(remote.cbHandle);
}
}
- return _killCompleteEvent;
+ return _killCompleteInfo->getFuture();
}
//
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index 5e247d20391..11fc6eb2569 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -40,6 +40,7 @@
#include "mongo/platform/mutex.h"
#include "mongo/s/query/async_results_merger_params_gen.h"
#include "mongo/s/query/cluster_query_result.h"
+#include "mongo/stdx/future.h"
#include "mongo/util/concurrency/with_lock.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/time_support.h"
@@ -231,22 +232,19 @@ public:
/**
* Starts shutting down this ARM by canceling all pending requests and scheduling killCursors
- * on all of the unexhausted remotes. Returns a handle to an event that is signaled when this
- * ARM is safe to destroy.
+ * on all of the unexhausted remotes. Returns a 'future' that is signaled when this ARM is safe
+ * to destroy.
*
- * If there are no pending requests, schedules killCursors and signals the event immediately.
+ * If there are no pending requests, schedules killCursors and signals the future immediately.
* Otherwise, the last callback that runs after kill() is called signals the event.
*
- * Returns an invalid handle if the underlying task executor is shutting down. In this case,
- * killing is considered complete and the ARM may be destroyed immediately.
- *
* May be called multiple times (idempotent).
*
* Note that 'opCtx' may or may not be the same as the operation context to which this cursor is
* currently attached. This is so that a killing thread may call this method with its own
* operation context.
*/
- executor::TaskExecutor::EventHandle kill(OperationContext* opCtx);
+ stdx::shared_future<void> kill(OperationContext* opCtx);
private:
/**
@@ -517,9 +515,28 @@ private:
LifecycleState _lifecycleState = kAlive;
- // Signaled when all outstanding batch request callbacks have run after kill() has been
- // called. This means that the ARM is safe to delete.
- executor::TaskExecutor::EventHandle _killCompleteEvent;
+ // Handles the promise/future mechanism used to cleanly shut down the ARM. This avoids race
+ // conditions in cases where the underlying TaskExecutor is simultaneously being torn down.
+ struct KillCompletePromiseFuture {
+ KillCompletePromiseFuture() : _future(_promise.get_future()){};
+
+ // Multiple calls to kill() can be made and each must return a future that will be notified
+ // when the ARM has been cleaned up.
+ stdx::shared_future<void> getFuture() {
+ return _future;
+ }
+
+ // Called by the ARM when all outstanding requests have run. Notifies all threads waiting on
+ // shared futures that the ARM has been cleaned up.
+ void signalFutures() {
+ _promise.set_value();
+ }
+
+ private:
+ stdx::promise<void> _promise;
+ stdx::shared_future<void> _future;
+ };
+ boost::optional<KillCompletePromiseFuture> _killCompleteInfo;
};
} // namespace mongo
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index 3c0a626ec4a..00112add13b 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -465,8 +465,8 @@ TEST_F(AsyncResultsMergerTest, SortedButNoSortKey) {
ASSERT_EQ(statusWithNext.getStatus().code(), ErrorCodes::InternalError);
// Required to kill the 'arm' on error before destruction.
- auto killEvent = arm->kill(operationContext());
- executor()->waitForEvent(killEvent);
+ auto killFuture = arm->kill(operationContext());
+ killFuture.wait();
}
TEST_F(AsyncResultsMergerTest, HasFirstBatch) {
@@ -649,13 +649,9 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
ASSERT_TRUE(arm->ready());
ASSERT_BSONOBJ_EQ(fromjson("{_id: 8}"), *unittest::assertGet(arm->nextReady()).getResult());
- // Kill cursor before deleting it, as the second remote cursor has not been exhausted. We don't
- // wait on 'killEvent' here, as the blackholed request's callback will only run on shutdown of
- // the network interface.
- auto killEvent = arm->kill(operationContext());
- ASSERT_TRUE(killEvent.isValid());
+ auto killFuture = arm->kill(operationContext());
executor()->shutdown();
- executor()->waitForEvent(killEvent);
+ killFuture.wait();
}
TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) {
@@ -678,8 +674,8 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) {
ASSERT(!arm->nextReady().isOK());
// Required to kill the 'arm' on error before destruction.
- auto killEvent = arm->kill(operationContext());
- executor()->waitForEvent(killEvent);
+ auto killFuture = arm->kill(operationContext());
+ killFuture.wait();
}
TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) {
@@ -711,8 +707,8 @@ TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) {
ASSERT(!statusWithNext.isOK());
// Required to kill the 'arm' on error before destruction.
- auto killEvent = arm->kill(operationContext());
- executor()->waitForEvent(killEvent);
+ auto killFuture = arm->kill(operationContext());
+ killFuture.wait();
}
TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) {
@@ -746,8 +742,8 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) {
ASSERT_EQ(statusWithNext.getStatus().reason(), "bad thing happened");
// Required to kill the 'arm' on error before destruction.
- auto killEvent = arm->kill(operationContext());
- executor()->waitForEvent(killEvent);
+ auto killFuture = arm->kill(operationContext());
+ killFuture.wait();
}
TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) {
@@ -776,8 +772,8 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) {
ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
// Required to kill the 'arm' on error before destruction.
- auto killEvent = arm->kill(operationContext());
- executor()->waitForEvent(killEvent);
+ auto killFuture = arm->kill(operationContext());
+ killFuture.wait();
}
TEST_F(AsyncResultsMergerTest, NextEventAfterTaskExecutorShutdown) {
@@ -788,8 +784,8 @@ TEST_F(AsyncResultsMergerTest, NextEventAfterTaskExecutorShutdown) {
executor()->shutdown();
ASSERT_EQ(ErrorCodes::ShutdownInProgress, arm->nextEvent().getStatus());
- auto killEvent = arm->kill(operationContext());
- ASSERT_FALSE(killEvent.isValid());
+ auto killFuture = arm->kill(operationContext());
+ killFuture.wait();
}
TEST_F(AsyncResultsMergerTest, KillAfterTaskExecutorShutdownWithOutstandingBatches) {
@@ -806,8 +802,8 @@ TEST_F(AsyncResultsMergerTest, KillAfterTaskExecutorShutdownWithOutstandingBatch
// Executor shuts down before a response is received.
executor()->shutdown();
- auto killEvent = arm->kill(operationContext());
- ASSERT_FALSE(killEvent.isValid());
+ auto killFuture = arm->kill(operationContext());
+ killFuture.wait();
// Ensure that the executor finishes all of the outstanding callbacks before the ARM is freed.
executor()->join();
@@ -820,7 +816,7 @@ TEST_F(AsyncResultsMergerTest, KillNoBatchesRequested) {
auto arm = makeARMFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
- auto killedEvent = arm->kill(operationContext());
+ auto killFuture = arm->kill(operationContext());
assertKillCusorsCmdHasCursorId(getNthPendingRequest(0u).cmdObj, 1);
// Killed cursors are considered ready, but return an error when you try to receive the next
@@ -828,7 +824,7 @@ TEST_F(AsyncResultsMergerTest, KillNoBatchesRequested) {
ASSERT_TRUE(arm->ready());
ASSERT_NOT_OK(arm->nextReady().getStatus());
- executor()->waitForEvent(killedEvent);
+ killFuture.wait();
}
TEST_F(AsyncResultsMergerTest, KillAllRemotesExhausted) {
@@ -854,14 +850,14 @@ TEST_F(AsyncResultsMergerTest, KillAllRemotesExhausted) {
responses.emplace_back(kTestNss, CursorId(0), batch3);
scheduleNetworkResponses(std::move(responses));
- auto killedEvent = arm->kill(operationContext());
+ auto killFuture = arm->kill(operationContext());
// ARM shouldn't schedule killCursors on anything since all of the remotes are exhausted.
ASSERT_FALSE(networkHasReadyRequests());
ASSERT_TRUE(arm->ready());
ASSERT_NOT_OK(arm->nextReady().getStatus());
- executor()->waitForEvent(killedEvent);
+ killFuture.wait();
}
TEST_F(AsyncResultsMergerTest, KillNonExhaustedCursorWithoutPendingRequest) {
@@ -888,14 +884,14 @@ TEST_F(AsyncResultsMergerTest, KillNonExhaustedCursorWithoutPendingRequest) {
responses.emplace_back(kTestNss, CursorId(123), batch3);
scheduleNetworkResponses(std::move(responses));
- auto killedEvent = arm->kill(operationContext());
+ auto killFuture = arm->kill(operationContext());
// ARM should schedule killCursors on cursor 123
assertKillCusorsCmdHasCursorId(getNthPendingRequest(0u).cmdObj, 123);
ASSERT_TRUE(arm->ready());
ASSERT_NOT_OK(arm->nextReady().getStatus());
- executor()->waitForEvent(killedEvent);
+ killFuture.wait();
}
TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) {
@@ -918,7 +914,7 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) {
scheduleNetworkResponses(std::move(responses));
// Kill event will only be signalled once the callbacks for the pending batches have run.
- auto killedEvent = arm->kill(operationContext());
+ auto killFuture = arm->kill(operationContext());
// Check that the ARM kills both batches.
assertKillCusorsCmdHasCursorId(getNthPendingRequest(0u).cmdObj, 2);
@@ -929,7 +925,7 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) {
// Ensure that we properly signal those waiting for more results to be ready.
executor()->waitForEvent(readyEvent);
- executor()->waitForEvent(killedEvent);
+ killFuture.wait();
}
TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) {
@@ -947,12 +943,12 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) {
responses.emplace_back(kTestNss, CursorId(1), batch1);
scheduleNetworkResponses(std::move(responses));
- auto killedEvent = arm->kill(operationContext());
+ auto killFuture = arm->kill(operationContext());
// Attempting to schedule more network operations on a killed arm is an error.
ASSERT_NOT_OK(arm->nextEvent().getStatus());
- executor()->waitForEvent(killedEvent);
+ killFuture.wait();
}
TEST_F(AsyncResultsMergerTest, KillCalledTwice) {
@@ -960,12 +956,10 @@ TEST_F(AsyncResultsMergerTest, KillCalledTwice) {
cursors.push_back(
makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
auto arm = makeARMFromExistingCursors(std::move(cursors));
- auto killedEvent1 = arm->kill(operationContext());
- ASSERT(killedEvent1.isValid());
- auto killedEvent2 = arm->kill(operationContext());
- ASSERT(killedEvent2.isValid());
- executor()->waitForEvent(killedEvent1);
- executor()->waitForEvent(killedEvent2);
+ auto killFuture1 = arm->kill(operationContext());
+ auto killFuture2 = arm->kill(operationContext());
+ killFuture1.wait();
+ killFuture2.wait();
}
TEST_F(AsyncResultsMergerTest, TailableBasic) {
@@ -1012,8 +1006,8 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) {
ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
ASSERT_FALSE(arm->remotesExhausted());
- auto killedEvent = arm->kill(operationContext());
- executor()->waitForEvent(killedEvent);
+ auto killFuture = arm->kill(operationContext());
+ killFuture.wait();
}
TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) {
@@ -1040,8 +1034,8 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) {
ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
ASSERT_FALSE(arm->remotesExhausted());
- auto killedEvent = arm->kill(operationContext());
- executor()->waitForEvent(killedEvent);
+ auto killFuture = arm->kill(operationContext());
+ killFuture.wait();
}
TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) {
@@ -1270,8 +1264,8 @@ TEST_F(AsyncResultsMergerTest, ReturnsErrorOnRetriableError) {
ASSERT_EQ(statusWithNext.getStatus().reason(), "host unreachable");
// Required to kill the 'arm' on error before destruction.
- auto killEvent = arm->kill(operationContext());
- executor()->waitForEvent(killEvent);
+ auto killFuture = arm->kill(operationContext());
+ killFuture.wait();
}
TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
@@ -1433,8 +1427,8 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfRemoteHasLowerPostB
std::vector<CursorResponse> responses;
responses.emplace_back(kTestNss, CursorId(0), std::vector<BSONObj>{});
scheduleNetworkResponses(std::move(responses));
- auto killEvent = arm->kill(operationContext());
- executor()->waitForEvent(killEvent);
+ auto killFuture = arm->kill(operationContext());
+ killFuture.wait();
}
TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting) {
@@ -1801,8 +1795,8 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) {
auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789)));
- auto killEvent = arm->kill(operationContext());
- executor()->waitForEvent(killEvent);
+ auto killFuture = arm->kill(operationContext());
+ killFuture.wait();
}
TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutAwaitDataCantHaveMaxTime) {
@@ -1813,8 +1807,8 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutAwaitDataCantHaveMaxTime) {
auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789)));
- auto killEvent = arm->kill(operationContext());
- executor()->waitForEvent(killEvent);
+ auto killFuture = arm->kill(operationContext());
+ killFuture.wait();
}
TEST_F(AsyncResultsMergerTest, ShardCanErrorInBetweenReadyAndNextEvent) {
@@ -1831,8 +1825,8 @@ TEST_F(AsyncResultsMergerTest, ShardCanErrorInBetweenReadyAndNextEvent) {
ASSERT_EQ(ErrorCodes::BadValue, arm->nextEvent().getStatus());
// Required to kill the 'arm' on error before destruction.
- auto killEvent = arm->kill(operationContext());
- executor()->waitForEvent(killEvent);
+ auto killFuture = arm->kill(operationContext());
+ killFuture.wait();
}
TEST_F(AsyncResultsMergerTest, KillShouldNotWaitForRemoteCommandsBeforeSchedulingKillCursors) {
@@ -1854,7 +1848,7 @@ TEST_F(AsyncResultsMergerTest, KillShouldNotWaitForRemoteCommandsBeforeSchedulin
// Kill the ARM while a batch is still outstanding. The callback for the outstanding batch
// should be canceled.
- auto killEvent = arm->kill(operationContext());
+ auto killFuture = arm->kill(operationContext());
// Check that the ARM will run killCursors.
assertKillCusorsCmdHasCursorId(getNthPendingRequest(0u).cmdObj, 1);
@@ -1863,7 +1857,7 @@ TEST_F(AsyncResultsMergerTest, KillShouldNotWaitForRemoteCommandsBeforeSchedulin
runReadyCallbacks();
executor()->waitForEvent(readyEvent);
- executor()->waitForEvent(killEvent);
+ killFuture.wait();
}
TEST_F(AsyncResultsMergerTest, GetMoresShouldNotIncludeLSIDOrTxnNumberIfNoneSpecified) {
@@ -2036,8 +2030,8 @@ TEST_F(AsyncResultsMergerTest, ShouldNotScheduleGetMoresWithoutAnOperationContex
ASSERT_FALSE(arm->ready());
ASSERT_FALSE(arm->remotesExhausted());
- auto killedEvent = arm->kill(operationContext());
- executor()->waitForEvent(killedEvent);
+ auto killFuture = arm->kill(operationContext());
+ killFuture.wait();
}
} // namespace
diff --git a/src/mongo/s/query/blocking_results_merger.cpp b/src/mongo/s/query/blocking_results_merger.cpp
index fd2a52f7625..7f9edc2cb0e 100644
--- a/src/mongo/s/query/blocking_results_merger.cpp
+++ b/src/mongo/s/query/blocking_results_merger.cpp
@@ -178,12 +178,7 @@ StatusWith<executor::TaskExecutor::EventHandle> BlockingResultsMerger::getNextEv
}
void BlockingResultsMerger::kill(OperationContext* opCtx) {
- auto killEvent = _arm.kill(opCtx);
- if (!killEvent) {
- // We are shutting down.
- return;
- }
- _executor->waitForEvent(killEvent);
+ _arm.kill(opCtx).wait();
}
} // namespace mongo
diff --git a/src/mongo/stdx/future.h b/src/mongo/stdx/future.h
index 58bdccfa400..548d4863862 100644
--- a/src/mongo/stdx/future.h
+++ b/src/mongo/stdx/future.h
@@ -40,6 +40,7 @@ using ::std::future_status; // NOLINT
using ::std::launch; // NOLINT
using ::std::packaged_task; // NOLINT
using ::std::promise; // NOLINT
+using ::std::shared_future; // NOLINT
} // namespace stdx
} // namespace mongo