summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2017-08-22 22:53:08 -0400
committerCharlie Swanson <charlie.swanson@mongodb.com>2017-08-30 11:47:14 -0400
commitbfbeb0cbabd9ae85f34df430474c9e524b274862 (patch)
tree4697dbc84e56120a5f34a1bd82182dc1e8740131
parent88ef24561ef69ac7756b80256a86515180b830a3 (diff)
downloadmongo-bfbeb0cbabd9ae85f34df430474c9e524b274862.tar.gz
SERVER-30799 Avoid misleading empty batches with tailable cursors.
This bug impacts tailable cursors being sent through a mongos.
-rw-r--r--buildscripts/resmokeconfig/suites/sharded_causally_consistent_jscore_passthrough.yml1
-rw-r--r--buildscripts/resmokeconfig/suites/sharded_collections_jscore_passthrough.yml1
-rw-r--r--jstests/core/tailable_getmore_batch_size.js94
-rw-r--r--jstests/core/tailable_skip_limit.js18
-rw-r--r--src/mongo/db/repl/collection_cloner.cpp17
-rw-r--r--src/mongo/s/commands/cluster_aggregate.cpp3
-rw-r--r--src/mongo/s/query/async_results_merger.cpp54
-rw-r--r--src/mongo/s/query/async_results_merger.h33
-rw-r--r--src/mongo/s/query/async_results_merger_test.cpp132
-rw-r--r--src/mongo/s/query/cluster_client_cursor.h6
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.cpp20
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.h6
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl_test.cpp19
-rw-r--r--src/mongo/s/query/cluster_find.cpp2
-rw-r--r--src/mongo/s/query/router_exec_stage.h11
-rw-r--r--src/mongo/s/query/router_stage_aggregation_merge.cpp3
-rw-r--r--src/mongo/s/query/router_stage_limit.cpp6
-rw-r--r--src/mongo/s/query/router_stage_limit.h4
-rw-r--r--src/mongo/s/query/router_stage_limit_test.cpp31
-rw-r--r--src/mongo/s/query/router_stage_merge.cpp7
-rw-r--r--src/mongo/s/query/router_stage_merge.h13
-rw-r--r--src/mongo/s/query/router_stage_mock.h1
-rw-r--r--src/mongo/s/query/router_stage_remove_sortkey.cpp5
-rw-r--r--src/mongo/s/query/router_stage_remove_sortkey.h2
-rw-r--r--src/mongo/s/query/router_stage_remove_sortkey_test.cpp27
-rw-r--r--src/mongo/s/query/router_stage_skip.cpp6
-rw-r--r--src/mongo/s/query/router_stage_skip.h4
-rw-r--r--src/mongo/s/query/router_stage_skip_test.cpp43
-rw-r--r--src/mongo/s/query/store_possible_cursor.cpp3
29 files changed, 365 insertions, 207 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharded_causally_consistent_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/sharded_causally_consistent_jscore_passthrough.yml
index 2f3e11b8a15..5b16e4026e6 100644
--- a/buildscripts/resmokeconfig/suites/sharded_causally_consistent_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/sharded_causally_consistent_jscore_passthrough.yml
@@ -42,6 +42,7 @@ selector:
- jstests/core/startup_log.js # "local" database.
- jstests/core/storageDetailsCommand.js # diskStorageStats.
- jstests/core/tailable_skip_limit.js # capped collections.
+ - jstests/core/tailable_getmore_batch_size.js # capped collections.
- jstests/core/top.js # top.
# The following tests fail because mongos behaves differently from mongod when testing certain
# functionality. The differences are in a comment next to the failing test.
diff --git a/buildscripts/resmokeconfig/suites/sharded_collections_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/sharded_collections_jscore_passthrough.yml
index 38405f89481..964a6d437cf 100644
--- a/buildscripts/resmokeconfig/suites/sharded_collections_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/sharded_collections_jscore_passthrough.yml
@@ -40,6 +40,7 @@ selector:
- jstests/core/startup_log.js # "local" database.
- jstests/core/storageDetailsCommand.js # diskStorageStats.
- jstests/core/tailable_skip_limit.js # capped collections.
+ - jstests/core/tailable_getmore_batch_size.js # capped collections.
- jstests/core/top.js # top.
# The following tests fail because mongos behaves differently from mongod when testing certain
# functionality. The differences are in a comment next to the failing test.
diff --git a/jstests/core/tailable_getmore_batch_size.js b/jstests/core/tailable_getmore_batch_size.js
new file mode 100644
index 00000000000..be3e21f0a67
--- /dev/null
+++ b/jstests/core/tailable_getmore_batch_size.js
@@ -0,0 +1,94 @@
+// Tests for the behavior of combining the tailable and awaitData options to the getMore command
+// with the batchSize option.
+(function() {
+ "use strict";
+
+ const collName = "tailable_getmore_batch_size";
+ const coll = db[collName];
+ const batchSize = 2;
+
+ function dropAndRecreateColl({numDocs}) {
+ coll.drop();
+ assert.commandWorked(db.createCollection(collName, {capped: true, size: 1024}));
+ const bulk = coll.initializeUnorderedBulkOp();
+ for (let i = 0; i < numDocs; ++i) {
+ bulk.insert({_id: i});
+ }
+ assert.writeOK(bulk.execute());
+ }
+
+ // Test that running a find with the 'tailable' option will return results immediately, even if
+ // there are fewer than the specified batch size.
+ dropAndRecreateColl({numDocs: batchSize - 1});
+ let findRes =
+ assert.commandWorked(db.runCommand({find: collName, tailable: true, batchSize: batchSize}));
+ assert.eq(findRes.cursor.firstBatch.length, batchSize - 1);
+ assert.neq(findRes.cursor.id, 0);
+ // Test that the same is true for a find with the 'tailable' and 'awaitData' options set.
+ findRes = assert.commandWorked(
+ db.runCommand({find: collName, tailable: true, awaitData: true, batchSize: batchSize}));
+ assert.eq(findRes.cursor.firstBatch.length, batchSize - 1);
+ assert.neq(findRes.cursor.id, 0);
+
+ /**
+ * Runs a find command with a batchSize of 'batchSize' to establish a cursor. Asserts that the
+ * command worked and that the cursor id is not 0, then returns the cursor id.
+ */
+ function openCursor({batchSize, tailable, awaitData}) {
+ const findRes = assert.commandWorked(db.runCommand(
+ {find: collName, tailable: tailable, awaitData: awaitData, batchSize: batchSize}));
+ assert.eq(findRes.cursor.firstBatch.length, batchSize);
+ assert.neq(findRes.cursor.id, 0);
+ assert.eq(findRes.cursor.ns, coll.getFullName());
+ return findRes.cursor.id;
+ }
+
+ // Test that specifying a batch size to a getMore on a tailable cursor produces a batch of the
+ // desired size when the number of results is larger than the batch size.
+
+ // One batch's worth for the find and one more than one batch's worth for the getMore.
+ dropAndRecreateColl({numDocs: batchSize + (batchSize + 1)});
+ let cursorId = openCursor({batchSize: batchSize, tailable: true, awaitData: false});
+ let getMoreRes = assert.commandWorked(
+ db.runCommand({getMore: cursorId, collection: collName, batchSize: batchSize}));
+ assert.eq(getMoreRes.cursor.nextBatch.length, batchSize);
+
+ // Test that the same is true for a tailable, *awaitData* cursor.
+ cursorId = openCursor({batchSize: batchSize, tailable: true, awaitData: true});
+ getMoreRes = assert.commandWorked(
+ db.runCommand({getMore: cursorId, collection: collName, batchSize: batchSize}));
+ assert.eq(getMoreRes.cursor.nextBatch.length, batchSize);
+
+ // Test that specifying a batch size to a getMore on a tailable cursor returns all
+ // new results immediately, even if the batch size is larger than the number of new results.
+ // One batch's worth for the find and one less than one batch's worth for the getMore.
+ dropAndRecreateColl({numDocs: batchSize + (batchSize - 1)});
+ cursorId = openCursor({batchSize: batchSize, tailable: true, awaitData: false});
+ getMoreRes = assert.commandWorked(
+ db.runCommand({getMore: cursorId, collection: collName, batchSize: batchSize}));
+ assert.eq(getMoreRes.cursor.nextBatch.length, batchSize - 1);
+
+ // Test that the same is true for a tailable, *awaitData* cursor.
+ cursorId = openCursor({batchSize: batchSize, tailable: true, awaitData: true});
+ getMoreRes = assert.commandWorked(
+ db.runCommand({getMore: cursorId, collection: collName, batchSize: batchSize}));
+ assert.eq(getMoreRes.cursor.nextBatch.length, batchSize - 1);
+
+ // Test that using a smaller batch size than there are results will return all results without
+ // empty batches in between (SERVER-30799).
+ dropAndRecreateColl({numDocs: batchSize * 3});
+ cursorId = openCursor({batchSize: batchSize, tailable: true, awaitData: false});
+ getMoreRes = assert.commandWorked(
+ db.runCommand({getMore: cursorId, collection: collName, batchSize: batchSize}));
+ assert.eq(getMoreRes.cursor.nextBatch.length, batchSize);
+ getMoreRes = assert.commandWorked(
+ db.runCommand({getMore: cursorId, collection: collName, batchSize: batchSize}));
+ assert.eq(getMoreRes.cursor.nextBatch.length, batchSize);
+ getMoreRes = assert.commandWorked(
+ db.runCommand({getMore: cursorId, collection: collName, batchSize: batchSize}));
+ assert.eq(getMoreRes.cursor.nextBatch.length, 0);
+
+ // Avoid leaving the cursor open. Cursors above are killed by drops, but we'll avoid dropping
+ // the collection at the end so other consistency checks like validate can be run against it.
+ assert.commandWorked(db.runCommand({killCursors: collName, cursors: [getMoreRes.cursor.id]}));
+}());
diff --git a/jstests/core/tailable_skip_limit.js b/jstests/core/tailable_skip_limit.js
index 7e09d1bfbfa..c7b1ce4c926 100644
--- a/jstests/core/tailable_skip_limit.js
+++ b/jstests/core/tailable_skip_limit.js
@@ -6,16 +6,16 @@
var collname = "jstests_tailable_skip_limit";
var t = db[collname];
t.drop();
- db.createCollection(collname, {capped: true, size: 1024});
+ assert.commandWorked(db.createCollection(collname, {capped: true, size: 1024}));
- t.save({_id: 1});
- t.save({_id: 2});
+ assert.writeOK(t.insert({_id: 1}));
+ assert.writeOK(t.insert({_id: 2}));
// Non-tailable with skip
var cursor = t.find().skip(1);
assert.eq(2, cursor.next()["_id"]);
assert(!cursor.hasNext());
- t.save({_id: 3});
+ assert.writeOK(t.insert({_id: 3}));
assert(!cursor.hasNext());
// Non-tailable with limit
@@ -24,7 +24,7 @@
assert.eq(i, cursor.next()["_id"]);
}
assert(!cursor.hasNext());
- t.save({_id: 4});
+ assert.writeOK(t.insert({_id: 4}));
assert(!cursor.hasNext());
// Non-tailable with negative limit
@@ -33,14 +33,14 @@
assert.eq(i, cursor.next()["_id"]);
}
assert(!cursor.hasNext());
- t.save({_id: 5});
+ assert.writeOK(t.insert({_id: 5}));
assert(!cursor.hasNext());
// Tailable with skip
cursor = t.find().addOption(2).skip(4);
assert.eq(5, cursor.next()["_id"]);
assert(!cursor.hasNext());
- t.save({_id: 6});
+ assert.writeOK(t.insert({_id: 6}));
assert(cursor.hasNext());
assert.eq(6, cursor.next()["_id"]);
@@ -50,7 +50,7 @@
assert.eq(i, cursor.next()["_id"]);
}
assert(!cursor.hasNext());
- t.save({_id: 7});
+ assert.writeOK(t.insert({_id: 7}));
assert(cursor.hasNext());
assert.eq(7, cursor.next()["_id"]);
@@ -76,7 +76,7 @@
// Tests that a tailable cursor over an empty capped collection produces a dead cursor, intended
// to be run on both mongod and mongos. For SERVER-20720.
t.drop();
- db.createCollection(t.getName(), {capped: true, size: 1024});
+ assert.commandWorked(db.createCollection(t.getName(), {capped: true, size: 1024}));
var cmdRes = db.runCommand({find: t.getName(), tailable: true});
assert.commandWorked(cmdRes);
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index ad2987e7603..4626948ece9 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -221,8 +221,7 @@ void CollectionCloner::shutdown() {
void CollectionCloner::_cancelRemainingWork_inlock() {
if (_arm) {
Client::initThreadIfNotAlready();
- auto opCtx = cc().getOperationContext();
- _killArmHandle = _arm->kill(opCtx);
+ _killArmHandle = _arm->kill(cc().getOperationContext());
}
_countScheduler.shutdown();
_listIndexesFetcher.shutdown();
@@ -577,7 +576,9 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa
_clusterClientCursorParams =
stdx::make_unique<ClusterClientCursorParams>(_sourceNss, UserNameIterator());
_clusterClientCursorParams->remotes = std::move(remoteCursors);
- _arm = stdx::make_unique<AsyncResultsMerger>(_executor, _clusterClientCursorParams.get());
+ Client::initThreadIfNotAlready();
+ _arm = stdx::make_unique<AsyncResultsMerger>(
+ cc().getOperationContext(), _executor, _clusterClientCursorParams.get());
// This completion guard invokes _finishCallback on destruction.
auto cancelRemainingWorkInLock = [this]() { _cancelRemainingWork_inlock(); };
@@ -590,6 +591,7 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa
// outside the mutex. This is a necessary condition to invoke _finishCallback.
stdx::lock_guard<stdx::mutex> lock(_mutex);
Status scheduleStatus = _scheduleNextARMResultsCallback(onCompletionGuard);
+ _arm->detachFromOperationContext();
if (!scheduleStatus.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, scheduleStatus);
return;
@@ -613,6 +615,9 @@ StatusWith<std::vector<BSONElement>> CollectionCloner::_parseParallelCollectionS
}
Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) {
+ Client::initThreadIfNotAlready();
+ auto opCtx = cc().getOperationContext();
+ _arm->reattachToOperationContext(opCtx);
while (_arm->ready()) {
auto armResultStatus = _arm->nextReady();
if (!armResultStatus.getStatus().isOK()) {
@@ -626,6 +631,7 @@ Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) {
_documentsToInsert.push_back(std::move(*queryResult));
}
}
+ _arm->detachFromOperationContext();
return Status::OK();
}
@@ -633,8 +639,9 @@ Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) {
Status CollectionCloner::_scheduleNextARMResultsCallback(
std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
Client::initThreadIfNotAlready();
- auto opCtx = cc().getOperationContext();
- auto nextEvent = _arm->nextEvent(opCtx);
+ _arm->reattachToOperationContext(cc().getOperationContext());
+ auto nextEvent = _arm->nextEvent();
+ _arm->detachFromOperationContext();
if (!nextEvent.isOK()) {
return nextEvent.getStatus();
}
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index 995ebd1a872..f2ba6b2b839 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -331,7 +331,6 @@ BSONObj establishMergingMongosCursor(
params.mergePipeline = std::move(pipelineForMerging);
params.remotes = std::move(cursors);
- params.batchSize = request.getBatchSize();
auto ccc = ClusterClientCursorImpl::make(
opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), std::move(params));
@@ -341,8 +340,6 @@ BSONObj establishMergingMongosCursor(
CursorResponseBuilder responseBuilder(true, &cursorResponse);
- ccc->reattachToOperationContext(opCtx);
-
for (long long objCount = 0; objCount < request.getBatchSize(); ++objCount) {
auto next = uassertStatusOK(ccc->next());
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 5a550ae7e24..19281785be0 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -52,9 +52,11 @@ const int kMaxNumFailedHostRetryAttempts = 3;
} // namespace
-AsyncResultsMerger::AsyncResultsMerger(executor::TaskExecutor* executor,
+AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx,
+ executor::TaskExecutor* executor,
ClusterClientCursorParams* params)
- : _executor(executor),
+ : _opCtx(opCtx),
+ _executor(executor),
_params(params),
_mergeQueue(MergingComparator(_remotes, _params->sort)) {
size_t remoteIndex = 0;
@@ -112,14 +114,28 @@ bool AsyncResultsMerger::ready() {
return ready_inlock();
}
+void AsyncResultsMerger::detachFromOperationContext() {
+ _opCtx = nullptr;
+ // If we were about ready to return a boost::none because a tailable cursor reached the end of
+ // the batch, that should no longer apply to the next use - when we are reattached to a
+ // different OperationContext, it signals that the caller is ready for a new batch, and wants us
+ // to request a new batch from the tailable cursor.
+ _eofNext = false;
+}
+
+void AsyncResultsMerger::reattachToOperationContext(OperationContext* opCtx) {
+ invariant(!_opCtx);
+ _opCtx = opCtx;
+}
+
bool AsyncResultsMerger::ready_inlock() {
if (_lifecycleState != kAlive) {
return true;
}
if (_eofNext) {
- // We are ready to return boost::none due to reaching the end of a batch of results from a
- // tailable cursor.
+ // Mark this operation as ready to return boost::none due to reaching the end of a batch of
+ // results from a tailable cursor.
return true;
}
@@ -239,7 +255,7 @@ ClusterQueryResult AsyncResultsMerger::nextReadyUnsorted() {
return {};
}
-Status AsyncResultsMerger::askForNextBatch_inlock(OperationContext* opCtx, size_t remoteIndex) {
+Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) {
auto& remote = _remotes[remoteIndex];
invariant(!remote.cbHandle.isValid());
@@ -262,15 +278,12 @@ Status AsyncResultsMerger::askForNextBatch_inlock(OperationContext* opCtx, size_
.toBSON();
executor::RemoteCommandRequest request(
- remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, _metadataObj, opCtx);
-
- auto callbackStatus =
- _executor->scheduleRemoteCommand(request,
- stdx::bind(&AsyncResultsMerger::handleBatchResponse,
- this,
- stdx::placeholders::_1,
- opCtx,
- remoteIndex));
+ remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, _metadataObj, _opCtx);
+
+ auto callbackStatus = _executor->scheduleRemoteCommand(
+ request,
+ stdx::bind(
+ &AsyncResultsMerger::handleBatchResponse, this, stdx::placeholders::_1, remoteIndex));
if (!callbackStatus.isOK()) {
return callbackStatus.getStatus();
}
@@ -287,8 +300,7 @@ Status AsyncResultsMerger::askForNextBatch_inlock(OperationContext* opCtx, size_
* 2. Remotes that already has some result will have a non-empty buffer.
* 3. Remotes that reached maximum retries will be in 'exhausted' state.
*/
-StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent(
- OperationContext* opCtx) {
+StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_lifecycleState != kAlive) {
@@ -315,7 +327,7 @@ StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent(
if (!remote.hasNext() && !remote.exhausted() && !remote.cbHandle.isValid()) {
// If this remote is not exhausted and there is no outstanding request for it, schedule
// work to retrieve the next batch.
- auto nextBatchStatus = askForNextBatch_inlock(opCtx, i);
+ auto nextBatchStatus = askForNextBatch_inlock(i);
if (!nextBatchStatus.isOK()) {
return nextBatchStatus;
}
@@ -358,9 +370,7 @@ StatusWith<CursorResponse> AsyncResultsMerger::parseCursorResponse(const BSONObj
}
void AsyncResultsMerger::handleBatchResponse(
- const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData,
- OperationContext* opCtx,
- size_t remoteIndex) {
+ const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, size_t remoteIndex) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
auto& remote = _remotes[remoteIndex];
@@ -385,7 +395,7 @@ void AsyncResultsMerger::handleBatchResponse(
// If the event handle is invalid, then the executor is in the middle of shutting down,
// and we can't schedule any more work for it to complete.
if (_killCursorsScheduledEvent.isValid()) {
- scheduleKillCursors_inlock(opCtx);
+ scheduleKillCursors_inlock(_opCtx);
_executor->signalEvent(_killCursorsScheduledEvent);
}
@@ -442,7 +452,7 @@ void AsyncResultsMerger::handleBatchResponse(
// We do not ask for the next batch if the cursor is tailable, as batches received from remote
// tailable cursors should be passed through to the client without asking for more batches.
if (!_params->isTailable && !remote.hasNext() && !remote.exhausted()) {
- remote.status = askForNextBatch_inlock(opCtx, remoteIndex);
+ remote.status = askForNextBatch_inlock(remoteIndex);
if (!remote.status.isOK()) {
return;
}
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index 6b30730392f..04262309a99 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -83,8 +83,14 @@ public:
* buffered results onto _mergeQueue.
*
* The TaskExecutor* must remain valid for the lifetime of the ARM.
+ *
+ * If 'opCtx' may be deleted before this AsyncResultsMerger, the caller must call
+ * detachFromOperationContext() before deleting 'opCtx', and call reattachToOperationContext()
+ * with a new, valid OperationContext before the next use.
*/
- AsyncResultsMerger(executor::TaskExecutor* executor, ClusterClientCursorParams* params);
+ AsyncResultsMerger(OperationContext* opCtx,
+ executor::TaskExecutor* executor,
+ ClusterClientCursorParams* params);
/**
* In order to be destroyed, either the ARM must have been kill()'ed or all cursors must have
@@ -107,6 +113,18 @@ public:
Status setAwaitDataTimeout(Milliseconds awaitDataTimeout);
/**
+ * Signals to the AsyncResultsMerger that the caller is finished using it in the current
+ * context.
+ */
+ void detachFromOperationContext();
+
+ /**
+ * Provides a new OperationContext to be used by the AsyncResultsMerger - the caller must call
+ * detachFromOperationContext() before 'opCtx' is deleted.
+ */
+ void reattachToOperationContext(OperationContext* opCtx);
+
+ /**
* Returns true if there is no need to schedule remote work in order to take the next action.
* This means that either
* --there is a buffered result which we can return,
@@ -157,7 +175,7 @@ public:
* non-exhausted remotes.
* If there is no sort, the event is signaled when some remote has a buffered result.
*/
- StatusWith<executor::TaskExecutor::EventHandle> nextEvent(OperationContext* opCtx);
+ StatusWith<executor::TaskExecutor::EventHandle> nextEvent();
/**
* Starts shutting down this ARM by canceling all pending requests. Returns a handle to an event
@@ -170,6 +188,10 @@ public:
* killing is considered complete and the ARM may be destroyed immediately.
*
* May be called multiple times (idempotent).
+ *
+ * Note that 'opCtx' may or may not be the same as the operation context to which this cursor is
+ * currently attached. This is so that a killing thread may call this method with its own
+ * operation context.
*/
executor::TaskExecutor::EventHandle kill(OperationContext* opCtx);
@@ -263,7 +285,7 @@ private:
*
* Returns success if the command to retrieve the next batch was scheduled successfully.
*/
- Status askForNextBatch_inlock(OperationContext* opCtx, size_t remoteIndex);
+ Status askForNextBatch_inlock(size_t remoteIndex);
/**
* Checks whether or not the remote cursors are all exhausted.
@@ -294,7 +316,6 @@ private:
* buffered.
*/
void handleBatchResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData,
- OperationContext* opCtx,
size_t remoteIndex);
/**
* Adds the batch of results to the RemoteCursorData. Returns false if there was an error
@@ -321,10 +342,8 @@ private:
*/
void scheduleKillCursors_inlock(OperationContext* opCtx);
- // Not owned here.
+ OperationContext* _opCtx;
executor::TaskExecutor* _executor;
-
- // Not owned here.
ClusterClientCursorParams* _params;
// The metadata obj to pass along with the command request. Used to indicate that the command is
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index bca910ce347..6076a559867 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -132,7 +132,7 @@ protected:
_params->isAllowPartialResults = qr->isAllowPartialResults();
}
- arm = stdx::make_unique<AsyncResultsMerger>(executor(), _params.get());
+ arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), _params.get());
}
/**
@@ -234,7 +234,7 @@ TEST_F(AsyncResultsMergerTest, SingleShardUnsorted) {
ASSERT_FALSE(arm->remotesExhausted());
// Schedule requests.
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
// Before any responses are delivered, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
@@ -278,7 +278,7 @@ TEST_F(AsyncResultsMergerTest, SingleShardSorted) {
ASSERT_FALSE(arm->remotesExhausted());
// Schedule requests.
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
// Before any responses are delivered, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
@@ -322,7 +322,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) {
ASSERT_FALSE(arm->remotesExhausted());
// Schedule requests.
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
// Before any responses are delivered, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
@@ -354,7 +354,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) {
ASSERT_FALSE(arm->ready());
// Make next event to be signaled.
- readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ readyEvent = unittest::assertGet(arm->nextEvent());
// Second shard responds; the handleBatchResponse callback is run and ARM's remote gets updated.
responses.clear();
@@ -394,7 +394,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardSorted) {
ASSERT_FALSE(arm->remotesExhausted());
// Schedule requests.
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
// Before any responses are delivered, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
@@ -457,7 +457,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) {
ASSERT_FALSE(arm->remotesExhausted());
// Schedule requests.
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
// First shard responds; the handleBatchResponse callback is run and ARM's remote gets updated.
std::vector<CursorResponse> responses;
@@ -486,7 +486,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) {
ASSERT_FALSE(arm->ready());
// Make next event to be signaled.
- readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ readyEvent = unittest::assertGet(arm->nextEvent());
// Second shard responds; the handleBatchResponse callback is run and ARM's remote gets updated.
responses.clear();
@@ -514,7 +514,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) {
ASSERT_FALSE(arm->ready());
// Make next event to be signaled.
- readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ readyEvent = unittest::assertGet(arm->nextEvent());
// First shard returns remainder of results.
responses.clear();
@@ -552,7 +552,7 @@ TEST_F(AsyncResultsMergerTest, CompoundSortKey) {
// Schedule requests.
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
// Deliver responses.
@@ -604,7 +604,7 @@ TEST_F(AsyncResultsMergerTest, SortedButNoSortKey) {
makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
// Parsing the batch results in an error because the sort key is missing.
@@ -621,7 +621,7 @@ TEST_F(AsyncResultsMergerTest, SortedButNoSortKey) {
ASSERT_EQ(statusWithNext.getStatus().code(), ErrorCodes::InternalError);
// Required to kill the 'arm' on error before destruction.
- auto killEvent = arm->kill(nullptr);
+ auto killEvent = arm->kill(operationContext());
executor()->waitForEvent(killEvent);
}
@@ -650,7 +650,7 @@ TEST_F(AsyncResultsMergerTest, HasFirstBatch) {
ASSERT_FALSE(arm->ready());
// Schedule requests.
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
// Before any responses are delivered, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
@@ -709,7 +709,7 @@ TEST_F(AsyncResultsMergerTest, OneShardHasInitialBatchOtherShardExhausted) {
ASSERT_FALSE(arm->ready());
// Schedule requests.
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
// Before any responses are delivered, ARM is not ready to return results.
ASSERT_FALSE(arm->ready());
@@ -749,7 +749,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
// Both shards respond with the first batch.
@@ -772,7 +772,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
ASSERT_BSONOBJ_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
// When we ask the shards for their next batch, the first shard responds and the second shard
@@ -791,7 +791,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
ASSERT_BSONOBJ_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
// We can continue to return results from first shard, while second shard remains unresponsive.
@@ -810,7 +810,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
// Kill cursor before deleting it, as the second remote cursor has not been exhausted. We don't
// wait on 'killEvent' here, as the blackholed request's callback will only run on shutdown of
// the network interface.
- auto killEvent = arm->kill(nullptr);
+ auto killEvent = arm->kill(operationContext());
ASSERT_TRUE(killEvent.isValid());
}
@@ -820,7 +820,7 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) {
makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
std::vector<CursorResponse> responses;
@@ -834,7 +834,7 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) {
ASSERT(!arm->nextReady().isOK());
// Required to kill the 'arm' on error before destruction.
- auto killEvent = arm->kill(nullptr);
+ auto killEvent = arm->kill(operationContext());
executor()->waitForEvent(killEvent);
}
@@ -846,7 +846,7 @@ TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) {
makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
@@ -863,7 +863,7 @@ TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) {
ASSERT(!statusWithNext.isOK());
// Required to kill the 'arm' on error before destruction.
- auto killEvent = arm->kill(nullptr);
+ auto killEvent = arm->kill(operationContext());
executor()->waitForEvent(killEvent);
}
@@ -875,7 +875,7 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) {
makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
std::vector<CursorResponse> responses;
@@ -896,7 +896,7 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) {
ASSERT_EQ(statusWithNext.getStatus().reason(), "bad thing happened");
// Required to kill the 'arm' on error before destruction.
- auto killEvent = arm->kill(nullptr);
+ auto killEvent = arm->kill(operationContext());
executor()->waitForEvent(killEvent);
}
@@ -906,10 +906,10 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) {
makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
- // Error to call nextEvent() before the previous event is signaled.
- ASSERT_NOT_OK(arm->nextEvent(nullptr).getStatus());
+ // Error to call nextEvent()() before the previous event is signaled.
+ ASSERT_NOT_OK(arm->nextEvent().getStatus());
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
@@ -926,7 +926,7 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) {
ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
// Required to kill the 'arm' on error before destruction.
- auto killEvent = arm->kill(nullptr);
+ auto killEvent = arm->kill(operationContext());
executor()->waitForEvent(killEvent);
}
@@ -936,8 +936,8 @@ TEST_F(AsyncResultsMergerTest, NextEventAfterTaskExecutorShutdown) {
makeCursorFromExistingCursors(std::move(cursors));
executor()->shutdown();
- ASSERT_EQ(ErrorCodes::ShutdownInProgress, arm->nextEvent(nullptr).getStatus());
- auto killEvent = arm->kill(nullptr);
+ ASSERT_EQ(ErrorCodes::ShutdownInProgress, arm->nextEvent().getStatus());
+ auto killEvent = arm->kill(operationContext());
ASSERT_FALSE(killEvent.isValid());
}
@@ -948,13 +948,13 @@ TEST_F(AsyncResultsMergerTest, KillAfterTaskExecutorShutdownWithOutstandingBatch
// Make a request to the shard that will never get answered.
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
blackHoleNextRequest();
// Executor shuts down before a response is received.
executor()->shutdown();
- auto killEvent = arm->kill(nullptr);
+ auto killEvent = arm->kill(operationContext());
ASSERT_FALSE(killEvent.isValid());
}
@@ -964,7 +964,7 @@ TEST_F(AsyncResultsMergerTest, KillNoBatchesRequested) {
makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
- auto killedEvent = arm->kill(nullptr);
+ auto killedEvent = arm->kill(operationContext());
// Killed cursors are considered ready, but return an error when you try to receive the next
// doc.
@@ -982,7 +982,7 @@ TEST_F(AsyncResultsMergerTest, KillAllBatchesReceived) {
makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
std::vector<CursorResponse> responses;
@@ -996,7 +996,7 @@ TEST_F(AsyncResultsMergerTest, KillAllBatchesReceived) {
CursorResponse::ResponseType::SubsequentResponse);
// Kill should be able to return right away if there are no pending batches.
- auto killedEvent = arm->kill(nullptr);
+ auto killedEvent = arm->kill(operationContext());
ASSERT_TRUE(arm->ready());
ASSERT_NOT_OK(arm->nextReady().getStatus());
executor()->waitForEvent(killedEvent);
@@ -1010,7 +1010,7 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) {
makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
std::vector<CursorResponse> responses;
@@ -1020,7 +1020,7 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) {
CursorResponse::ResponseType::SubsequentResponse);
// Kill event will only be signalled once the callbacks for the pending batches have run.
- auto killedEvent = arm->kill(nullptr);
+ auto killedEvent = arm->kill(operationContext());
// The pending requests have been canceled, so run their callbacks.
runReadyCallbacks();
@@ -1036,7 +1036,7 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) {
makeCursorFromExistingCursors(std::move(cursors));
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
std::vector<CursorResponse> responses;
@@ -1045,10 +1045,10 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) {
scheduleNetworkResponses(std::move(responses),
CursorResponse::ResponseType::SubsequentResponse);
- auto killedEvent = arm->kill(nullptr);
+ auto killedEvent = arm->kill(operationContext());
// Attempting to schedule more network operations on a killed arm is an error.
- ASSERT_NOT_OK(arm->nextEvent(nullptr).getStatus());
+ ASSERT_NOT_OK(arm->nextEvent().getStatus());
executor()->waitForEvent(killedEvent);
}
@@ -1057,9 +1057,9 @@ TEST_F(AsyncResultsMergerTest, KillCalledTwice) {
std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {}));
makeCursorFromExistingCursors(std::move(cursors));
- auto killedEvent1 = arm->kill(nullptr);
+ auto killedEvent1 = arm->kill(operationContext());
ASSERT(killedEvent1.isValid());
- auto killedEvent2 = arm->kill(nullptr);
+ auto killedEvent2 = arm->kill(operationContext());
ASSERT(killedEvent2.isValid());
executor()->waitForEvent(killedEvent1);
executor()->waitForEvent(killedEvent2);
@@ -1072,7 +1072,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) {
makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
std::vector<CursorResponse> responses;
@@ -1093,7 +1093,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) {
ASSERT_FALSE(arm->remotesExhausted());
ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
responses.clear();
@@ -1110,7 +1110,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) {
ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
ASSERT_FALSE(arm->remotesExhausted());
- auto killedEvent = arm->kill(nullptr);
+ auto killedEvent = arm->kill(operationContext());
executor()->waitForEvent(killedEvent);
}
@@ -1121,7 +1121,7 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) {
makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
// Remote responds with an empty batch and a non-zero cursor id.
@@ -1138,7 +1138,7 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) {
ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
ASSERT_FALSE(arm->remotesExhausted());
- auto killedEvent = arm->kill(nullptr);
+ auto killedEvent = arm->kill(operationContext());
executor()->waitForEvent(killedEvent);
}
@@ -1149,7 +1149,7 @@ TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) {
makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
// Remote responds with an empty batch and a zero cursor id.
@@ -1174,7 +1174,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) {
makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
std::vector<CursorResponse> responses;
@@ -1195,7 +1195,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) {
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}")};
responses.emplace_back(_nss, CursorId(0), batch2);
- readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ readyEvent = unittest::assertGet(arm->nextEvent());
BSONObj scheduledCmd = getFirstPendingRequest().cmdObj;
auto request = GetMoreRequest::parseFromBSON("anydbname", scheduledCmd);
@@ -1221,7 +1221,7 @@ TEST_F(AsyncResultsMergerTest, SendsSecondaryOkAsMetadata) {
ReadPreferenceSetting(ReadPreference::Nearest));
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
BSONObj cmdRequestMetadata = getFirstPendingRequest().metadata;
@@ -1251,7 +1251,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
// An error occurs with the first host.
@@ -1275,7 +1275,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
ASSERT_BSONOBJ_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
// Now the second host becomes unreachable. We should still be willing to return results from
@@ -1294,7 +1294,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
ASSERT_BSONOBJ_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
// Once the last reachable shard indicates that its cursor is closed, we're done.
@@ -1316,7 +1316,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) {
makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
std::vector<CursorResponse> responses;
@@ -1332,7 +1332,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) {
ASSERT_BSONOBJ_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
// The lone host involved in this query returns an error. This should simply cause us to return
@@ -1350,7 +1350,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsOnRetriableErrorNoRetries) {
makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
// First host returns single result
@@ -1380,7 +1380,7 @@ TEST_F(AsyncResultsMergerTest, ReturnsErrorOnRetriableError) {
makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
// Both hosts return network (retriable) errors.
@@ -1396,7 +1396,7 @@ TEST_F(AsyncResultsMergerTest, ReturnsErrorOnRetriableError) {
ASSERT_EQ(statusWithNext.getStatus().reason(), "host unreachable");
// Required to kill the 'arm' on error before destruction.
- auto killEvent = arm->kill(nullptr);
+ auto killEvent = arm->kill(operationContext());
executor()->waitForEvent(killEvent);
}
@@ -1407,7 +1407,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
std::vector<CursorResponse> responses;
@@ -1425,7 +1425,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
ASSERT_OK(arm->setAwaitDataTimeout(Milliseconds(789)));
ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ readyEvent = unittest::assertGet(arm->nextEvent());
ASSERT_FALSE(arm->ready());
// Pending getMore request should include maxTimeMS.
@@ -1456,7 +1456,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) {
makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789)));
- auto killEvent = arm->kill(nullptr);
+ auto killEvent = arm->kill(operationContext());
executor()->waitForEvent(killEvent);
}
@@ -1467,7 +1467,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutAwaitDataCantHaveMaxTime) {
makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789)));
- auto killEvent = arm->kill(nullptr);
+ auto killEvent = arm->kill(operationContext());
executor()->waitForEvent(killEvent);
}
@@ -1478,13 +1478,13 @@ TEST_F(AsyncResultsMergerTest, ShardCanErrorInBetweenReadyAndNextEvent) {
makeCursorFromExistingCursors(std::move(cursors), findCmd);
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
scheduleErrorResponse({ErrorCodes::BadValue, "bad thing happened"});
- ASSERT_EQ(ErrorCodes::BadValue, arm->nextEvent(nullptr).getStatus());
+ ASSERT_EQ(ErrorCodes::BadValue, arm->nextEvent().getStatus());
// Required to kill the 'arm' on error before destruction.
- auto killEvent = arm->kill(nullptr);
+ auto killEvent = arm->kill(operationContext());
executor()->waitForEvent(killEvent);
}
diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h
index aca0f54c704..d4b89d74dfc 100644
--- a/src/mongo/s/query/cluster_client_cursor.h
+++ b/src/mongo/s/query/cluster_client_cursor.h
@@ -77,13 +77,13 @@ public:
virtual void kill(OperationContext* opCtx) = 0;
/**
- * Sets the operation context for the cursor. Must be called before the first call to next().
- * The cursor attaches to a new OperationContext on each getMore.
+ * Sets the operation context for the cursor.
*/
virtual void reattachToOperationContext(OperationContext* opCtx) = 0;
/**
- * Detaches the cursor from its current OperationContext.
+ * Detaches the cursor from its current OperationContext. Must be called before the
+ * OperationContext in use is deleted.
*/
virtual void detachFromOperationContext() = 0;
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index 0e6b5a197ce..e7716355c0f 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -63,15 +63,16 @@ std::unique_ptr<ClusterClientCursor> ClusterClientCursorGuard::releaseCursor() {
ClusterClientCursorGuard ClusterClientCursorImpl::make(OperationContext* opCtx,
executor::TaskExecutor* executor,
ClusterClientCursorParams&& params) {
- std::unique_ptr<ClusterClientCursor> cursor(
- new ClusterClientCursorImpl(executor, std::move(params), opCtx->getLogicalSessionId()));
+ std::unique_ptr<ClusterClientCursor> cursor(new ClusterClientCursorImpl(
+ opCtx, executor, std::move(params), opCtx->getLogicalSessionId()));
return ClusterClientCursorGuard(opCtx, std::move(cursor));
}
-ClusterClientCursorImpl::ClusterClientCursorImpl(executor::TaskExecutor* executor,
+ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx,
+ executor::TaskExecutor* executor,
ClusterClientCursorParams&& params,
boost::optional<LogicalSessionId> lsid)
- : _params(std::move(params)), _root(buildMergerPlan(executor, &_params)), _lsid(lsid) {}
+ : _params(std::move(params)), _root(buildMergerPlan(opCtx, executor, &_params)), _lsid(lsid) {}
ClusterClientCursorImpl::ClusterClientCursorImpl(std::unique_ptr<RouterStageMock> root,
ClusterClientCursorParams&& params,
@@ -140,7 +141,7 @@ boost::optional<LogicalSessionId> ClusterClientCursorImpl::getLsid() const {
}
std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan(
- executor::TaskExecutor* executor, ClusterClientCursorParams* params) {
+ OperationContext* opCtx, executor::TaskExecutor* executor, ClusterClientCursorParams* params) {
const auto skip = params->skip;
const auto limit = params->limit;
const bool hasSort = !params->sort.isEmpty();
@@ -153,18 +154,19 @@ std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan(
return stdx::make_unique<RouterStageAggregationMerge>(std::move(params->mergePipeline));
}
- std::unique_ptr<RouterExecStage> root = stdx::make_unique<RouterStageMerge>(executor, params);
+ std::unique_ptr<RouterExecStage> root =
+ stdx::make_unique<RouterStageMerge>(opCtx, executor, params);
if (skip) {
- root = stdx::make_unique<RouterStageSkip>(std::move(root), *skip);
+ root = stdx::make_unique<RouterStageSkip>(opCtx, std::move(root), *skip);
}
if (limit) {
- root = stdx::make_unique<RouterStageLimit>(std::move(root), *limit);
+ root = stdx::make_unique<RouterStageLimit>(opCtx, std::move(root), *limit);
}
if (hasSort) {
- root = stdx::make_unique<RouterStageRemoveSortKey>(std::move(root));
+ root = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(root));
}
return root;
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h
index 07dbab2094d..8b5e5b0a855 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.h
+++ b/src/mongo/s/query/cluster_client_cursor_impl.h
@@ -123,7 +123,8 @@ public:
/**
* Constructs a cluster client cursor.
*/
- ClusterClientCursorImpl(executor::TaskExecutor* executor,
+ ClusterClientCursorImpl(OperationContext* opCtx,
+ executor::TaskExecutor* executor,
ClusterClientCursorParams&& params,
boost::optional<LogicalSessionId> lsid);
@@ -131,7 +132,8 @@ private:
/**
* Constructs the pipeline of MergerPlanStages which will be used to answer the query.
*/
- std::unique_ptr<RouterExecStage> buildMergerPlan(executor::TaskExecutor* executor,
+ std::unique_ptr<RouterExecStage> buildMergerPlan(OperationContext* opCtx,
+ executor::TaskExecutor* executor,
ClusterClientCursorParams* params);
ClusterClientCursorParams _params;
diff --git a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
index 8598fe192dc..08dcdb8c303 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
@@ -40,13 +40,12 @@ namespace mongo {
namespace {
-// Note: Though the next() method on RouterExecStage and its subclasses depend on an
-// OperationContext* provided via a preceding call to reattachToOperationContext(), these stages are
-// mocked in this test using RouterStageMock. RouterStageMock does not actually use the
-// OperationContext, so we omit the call to rettachToOperationContext in these tests.
+// These tests use RouterStageMock, which does not actually use its OperationContext, so rather than
+// going through the trouble of making one, we'll just use nullptr throughout.
+OperationContext* opCtx = nullptr;
TEST(ClusterClientCursorImpl, NumReturnedSoFar) {
- auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
for (int i = 1; i < 10; ++i) {
mockStage->queueResult(BSON("a" << i));
}
@@ -71,7 +70,7 @@ TEST(ClusterClientCursorImpl, NumReturnedSoFar) {
}
TEST(ClusterClientCursorImpl, QueueResult) {
- auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
mockStage->queueResult(BSON("a" << 1));
mockStage->queueResult(BSON("a" << 4));
@@ -110,7 +109,7 @@ TEST(ClusterClientCursorImpl, QueueResult) {
}
TEST(ClusterClientCursorImpl, RemotesExhausted) {
- auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
mockStage->queueResult(BSON("a" << 1));
mockStage->queueResult(BSON("a" << 2));
mockStage->markRemotesExhausted();
@@ -141,7 +140,7 @@ TEST(ClusterClientCursorImpl, RemotesExhausted) {
}
TEST(ClusterClientCursorImpl, ForwardsAwaitDataTimeout) {
- auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
auto mockStagePtr = mockStage.get();
ASSERT_NOT_OK(mockStage->getAwaitDataTimeout().getStatus());
@@ -157,13 +156,13 @@ TEST(ClusterClientCursorImpl, ForwardsAwaitDataTimeout) {
TEST(ClusterClientCursorImpl, LogicalSessionIdsOnCursors) {
// Make a cursor with no lsid
- auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
ClusterClientCursorParams params(NamespaceString("test"), {});
ClusterClientCursorImpl cursor{std::move(mockStage), std::move(params), boost::none};
ASSERT(!cursor.getLsid());
// Make a cursor with an lsid
- auto mockStage2 = stdx::make_unique<RouterStageMock>();
+ auto mockStage2 = stdx::make_unique<RouterStageMock>(opCtx);
ClusterClientCursorParams params2(NamespaceString("test"), {});
auto lsid = makeLogicalSessionIdForTest();
ClusterClientCursorImpl cursor2{std::move(mockStage2), std::move(params2), lsid};
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index b7a5ecc6b34..27afa51d5a5 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -274,8 +274,6 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx,
auto cursorState = ClusterCursorManager::CursorState::NotExhausted;
int bytesBuffered = 0;
- ccc->reattachToOperationContext(opCtx);
-
while (!FindCommon::enoughForFirstBatch(query.getQueryRequest(), results->size())) {
auto next = ccc->next();
diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h
index 1f2fd2a9e7f..ac074d92b62 100644
--- a/src/mongo/s/query/router_exec_stage.h
+++ b/src/mongo/s/query/router_exec_stage.h
@@ -52,8 +52,9 @@ class OperationContext;
*/
class RouterExecStage {
public:
- RouterExecStage() = default;
- RouterExecStage(std::unique_ptr<RouterExecStage> child) : _child(std::move(child)) {}
+ RouterExecStage(OperationContext* opCtx) : _opCtx(opCtx) {}
+ RouterExecStage(OperationContext* opCtx, std::unique_ptr<RouterExecStage> child)
+ : _opCtx(opCtx), _child(std::move(child)) {}
virtual ~RouterExecStage() = default;
@@ -71,6 +72,10 @@ public:
/**
* Must be called before destruction to abandon a not-yet-exhausted plan. May block waiting for
* responses from remote hosts.
+ *
+ * Note that 'opCtx' may or may not be the same as the operation context to which this cursor is
+ * currently attached. This is so that a killing thread may call this method with its own
+ * operation context.
*/
virtual void kill(OperationContext* opCtx) = 0;
@@ -144,8 +149,8 @@ protected:
}
private:
- std::unique_ptr<RouterExecStage> _child;
OperationContext* _opCtx = nullptr;
+ std::unique_ptr<RouterExecStage> _child;
};
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_aggregation_merge.cpp b/src/mongo/s/query/router_stage_aggregation_merge.cpp
index 6fdfdc4fea2..a4273dbd4a7 100644
--- a/src/mongo/s/query/router_stage_aggregation_merge.cpp
+++ b/src/mongo/s/query/router_stage_aggregation_merge.cpp
@@ -37,7 +37,8 @@ namespace mongo {
RouterStageAggregationMerge::RouterStageAggregationMerge(
std::unique_ptr<Pipeline, Pipeline::Deleter> mergePipeline)
- : _mergePipeline(std::move(mergePipeline)) {}
+ : RouterExecStage(mergePipeline->getContext()->opCtx),
+ _mergePipeline(std::move(mergePipeline)) {}
StatusWith<ClusterQueryResult> RouterStageAggregationMerge::next() {
// Pipeline::getNext will return a boost::optional<Document> or boost::none if EOF.
diff --git a/src/mongo/s/query/router_stage_limit.cpp b/src/mongo/s/query/router_stage_limit.cpp
index 03711279e07..feb8f11626f 100644
--- a/src/mongo/s/query/router_stage_limit.cpp
+++ b/src/mongo/s/query/router_stage_limit.cpp
@@ -34,8 +34,10 @@
namespace mongo {
-RouterStageLimit::RouterStageLimit(std::unique_ptr<RouterExecStage> child, long long limit)
- : RouterExecStage(std::move(child)), _limit(limit) {
+RouterStageLimit::RouterStageLimit(OperationContext* opCtx,
+ std::unique_ptr<RouterExecStage> child,
+ long long limit)
+ : RouterExecStage(opCtx, std::move(child)), _limit(limit) {
invariant(limit > 0);
}
diff --git a/src/mongo/s/query/router_stage_limit.h b/src/mongo/s/query/router_stage_limit.h
index cb2fd708835..42ef46c21ab 100644
--- a/src/mongo/s/query/router_stage_limit.h
+++ b/src/mongo/s/query/router_stage_limit.h
@@ -37,7 +37,9 @@ namespace mongo {
*/
class RouterStageLimit final : public RouterExecStage {
public:
- RouterStageLimit(std::unique_ptr<RouterExecStage> child, long long limit);
+ RouterStageLimit(OperationContext* opCtx,
+ std::unique_ptr<RouterExecStage> child,
+ long long limit);
StatusWith<ClusterQueryResult> next() final;
diff --git a/src/mongo/s/query/router_stage_limit_test.cpp b/src/mongo/s/query/router_stage_limit_test.cpp
index 61689e4cd6a..ffbacf1f17d 100644
--- a/src/mongo/s/query/router_stage_limit_test.cpp
+++ b/src/mongo/s/query/router_stage_limit_test.cpp
@@ -40,18 +40,17 @@ namespace mongo {
namespace {
-// Note: Though the next() method on RouterExecStage and its subclasses depend on an
-// OperationContext* provided via a preceding call to reattachToOperationContext(), these stages are
-// mocked in this test using RouterStageMock. RouterStageMock does not actually use the
-// OperationContext, so we omit the call to rettachToOperationContext in these tests.
+// These tests use router stages, which do not actually use their OperationContext, so rather than
+// going through the trouble of making one, we'll just use nullptr throughout.
+OperationContext* opCtx = nullptr;
TEST(RouterStageLimitTest, LimitIsOne) {
- auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
mockStage->queueResult({BSON("a" << 1)});
mockStage->queueResult({BSON("a" << 2)});
mockStage->queueResult({BSON("a" << 3)});
- auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 1);
+ auto limitStage = stdx::make_unique<RouterStageLimit>(opCtx, std::move(mockStage), 1);
auto firstResult = limitStage->next();
ASSERT_OK(firstResult.getStatus());
@@ -69,12 +68,12 @@ TEST(RouterStageLimitTest, LimitIsOne) {
}
TEST(RouterStageLimitTest, LimitIsTwo) {
- auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
mockStage->queueResult(BSON("a" << 1));
mockStage->queueResult(BSON("a" << 2));
mockStage->queueError(Status(ErrorCodes::BadValue, "bad thing happened"));
- auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 2);
+ auto limitStage = stdx::make_unique<RouterStageLimit>(opCtx, std::move(mockStage), 2);
auto firstResult = limitStage->next();
ASSERT_OK(firstResult.getStatus());
@@ -92,13 +91,13 @@ TEST(RouterStageLimitTest, LimitIsTwo) {
}
TEST(RouterStageLimitTest, LimitStagePropagatesError) {
- auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
mockStage->queueResult(BSON("a" << 1));
mockStage->queueError(Status(ErrorCodes::BadValue, "bad thing happened"));
mockStage->queueResult(BSON("a" << 2));
mockStage->queueResult(BSON("a" << 3));
- auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 3);
+ auto limitStage = stdx::make_unique<RouterStageLimit>(opCtx, std::move(mockStage), 3);
auto firstResult = limitStage->next();
ASSERT_OK(firstResult.getStatus());
@@ -115,13 +114,13 @@ TEST(RouterStageLimitTest, LimitStageToleratesMidStreamEOF) {
// Here we're mocking the tailable case, where there may be a boost::none returned before the
// remote cursor is closed. Our goal is to make sure that the limit stage handles this properly,
// not counting boost::none towards the limit.
- auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
mockStage->queueResult(BSON("a" << 1));
mockStage->queueEOF();
mockStage->queueResult(BSON("a" << 2));
mockStage->queueResult(BSON("a" << 3));
- auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 2);
+ auto limitStage = stdx::make_unique<RouterStageLimit>(opCtx, std::move(mockStage), 2);
auto firstResult = limitStage->next();
ASSERT_OK(firstResult.getStatus());
@@ -143,12 +142,12 @@ TEST(RouterStageLimitTest, LimitStageToleratesMidStreamEOF) {
}
TEST(RouterStageLimitTest, LimitStageRemotesExhausted) {
- auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
mockStage->queueResult(BSON("a" << 1));
mockStage->queueResult(BSON("a" << 2));
mockStage->markRemotesExhausted();
- auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 100);
+ auto limitStage = stdx::make_unique<RouterStageLimit>(opCtx, std::move(mockStage), 100);
ASSERT_TRUE(limitStage->remotesExhausted());
auto firstResult = limitStage->next();
@@ -170,11 +169,11 @@ TEST(RouterStageLimitTest, LimitStageRemotesExhausted) {
}
TEST(RouterStageLimitTest, ForwardsAwaitDataTimeout) {
- auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
auto mockStagePtr = mockStage.get();
ASSERT_NOT_OK(mockStage->getAwaitDataTimeout().getStatus());
- auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 100);
+ auto limitStage = stdx::make_unique<RouterStageLimit>(opCtx, std::move(mockStage), 100);
ASSERT_OK(limitStage->setAwaitDataTimeout(Milliseconds(789)));
auto awaitDataTimeout = mockStagePtr->getAwaitDataTimeout();
diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp
index 72fe7a06624..78ee1a3475a 100644
--- a/src/mongo/s/query/router_stage_merge.cpp
+++ b/src/mongo/s/query/router_stage_merge.cpp
@@ -36,13 +36,14 @@
namespace mongo {
-RouterStageMerge::RouterStageMerge(executor::TaskExecutor* executor,
+RouterStageMerge::RouterStageMerge(OperationContext* opCtx,
+ executor::TaskExecutor* executor,
ClusterClientCursorParams* params)
- : _executor(executor), _arm(executor, params) {}
+ : RouterExecStage(opCtx), _executor(executor), _arm(opCtx, executor, params) {}
StatusWith<ClusterQueryResult> RouterStageMerge::next() {
while (!_arm.ready()) {
- auto nextEventStatus = _arm.nextEvent(getOpCtx());
+ auto nextEventStatus = _arm.nextEvent();
if (!nextEventStatus.isOK()) {
return nextEventStatus.getStatus();
}
diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h
index caae43877c6..23503c664f6 100644
--- a/src/mongo/s/query/router_stage_merge.h
+++ b/src/mongo/s/query/router_stage_merge.h
@@ -43,7 +43,9 @@ namespace mongo {
*/
class RouterStageMerge final : public RouterExecStage {
public:
- RouterStageMerge(executor::TaskExecutor* executor, ClusterClientCursorParams* params);
+ RouterStageMerge(OperationContext* opCtx,
+ executor::TaskExecutor* executor,
+ ClusterClientCursorParams* params);
StatusWith<ClusterQueryResult> next() final;
@@ -53,6 +55,15 @@ public:
Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+protected:
+ void doReattachToOperationContext() override {
+ _arm.reattachToOperationContext(getOpCtx());
+ }
+
+ virtual void doDetachFromOperationContext() {
+ _arm.detachFromOperationContext();
+ }
+
private:
// Not owned here.
executor::TaskExecutor* _executor;
diff --git a/src/mongo/s/query/router_stage_mock.h b/src/mongo/s/query/router_stage_mock.h
index 18baaeacd74..e2f8e7adab5 100644
--- a/src/mongo/s/query/router_stage_mock.h
+++ b/src/mongo/s/query/router_stage_mock.h
@@ -42,6 +42,7 @@ namespace mongo {
*/
class RouterStageMock final : public RouterExecStage {
public:
+ RouterStageMock(OperationContext* opCtx) : RouterExecStage(opCtx) {}
~RouterStageMock() final {}
StatusWith<ClusterQueryResult> next() final;
diff --git a/src/mongo/s/query/router_stage_remove_sortkey.cpp b/src/mongo/s/query/router_stage_remove_sortkey.cpp
index fecf5440898..fe7a8cf0f7d 100644
--- a/src/mongo/s/query/router_stage_remove_sortkey.cpp
+++ b/src/mongo/s/query/router_stage_remove_sortkey.cpp
@@ -38,8 +38,9 @@
namespace mongo {
-RouterStageRemoveSortKey::RouterStageRemoveSortKey(std::unique_ptr<RouterExecStage> child)
- : RouterExecStage(std::move(child)) {}
+RouterStageRemoveSortKey::RouterStageRemoveSortKey(OperationContext* opCtx,
+ std::unique_ptr<RouterExecStage> child)
+ : RouterExecStage(opCtx, std::move(child)) {}
StatusWith<ClusterQueryResult> RouterStageRemoveSortKey::next() {
auto childResult = getChildStage()->next();
diff --git a/src/mongo/s/query/router_stage_remove_sortkey.h b/src/mongo/s/query/router_stage_remove_sortkey.h
index c2329bbc93d..ba71364dfa9 100644
--- a/src/mongo/s/query/router_stage_remove_sortkey.h
+++ b/src/mongo/s/query/router_stage_remove_sortkey.h
@@ -39,7 +39,7 @@ namespace mongo {
*/
class RouterStageRemoveSortKey final : public RouterExecStage {
public:
- RouterStageRemoveSortKey(std::unique_ptr<RouterExecStage> child);
+ RouterStageRemoveSortKey(OperationContext* opCtx, std::unique_ptr<RouterExecStage> child);
StatusWith<ClusterQueryResult> next() final;
diff --git a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp b/src/mongo/s/query/router_stage_remove_sortkey_test.cpp
index 5db61b7b0a9..5767549ad64 100644
--- a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp
+++ b/src/mongo/s/query/router_stage_remove_sortkey_test.cpp
@@ -40,20 +40,19 @@ namespace mongo {
namespace {
-// Note: Though the next() method on RouterExecStage and its subclasses depend on an
-// OperationContext* provided via a preceding call to reattachToOperationContext(), these stages are
-// mocked in this test using RouterStageMock. RouterStageMock does not actually use the
-// OperationContext, so we omit the call to rettachToOperationContext in these tests.
+// These tests use router stages, which do not actually use their OperationContext, so rather than
+// going through the trouble of making one, we'll just use nullptr throughout.
+OperationContext* opCtx = nullptr;
TEST(RouterStageRemoveSortKeyTest, RemovesSortKey) {
- auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
mockStage->queueResult(BSON("a" << 4 << "$sortKey" << 1 << "b" << 3));
mockStage->queueResult(BSON("$sortKey" << BSON("" << 3) << "c" << BSON("d"
<< "foo")));
mockStage->queueResult(BSON("a" << 3));
mockStage->queueResult(BSONObj());
- auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(std::move(mockStage));
+ auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(mockStage));
auto firstResult = sortKeyStage->next();
ASSERT_OK(firstResult.getStatus());
@@ -83,11 +82,11 @@ TEST(RouterStageRemoveSortKeyTest, RemovesSortKey) {
}
TEST(RouterStageRemoveSortKeyTest, PropagatesError) {
- auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
mockStage->queueResult(BSON("$sortKey" << 1));
mockStage->queueError(Status(ErrorCodes::BadValue, "bad thing happened"));
- auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(std::move(mockStage));
+ auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(mockStage));
auto firstResult = sortKeyStage->next();
ASSERT_OK(firstResult.getStatus());
@@ -101,12 +100,12 @@ TEST(RouterStageRemoveSortKeyTest, PropagatesError) {
}
TEST(RouterStageRemoveSortKeyTest, ToleratesMidStreamEOF) {
- auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
mockStage->queueResult(BSON("a" << 1 << "$sortKey" << 1 << "b" << 1));
mockStage->queueEOF();
mockStage->queueResult(BSON("a" << 2 << "$sortKey" << 1 << "b" << 2));
- auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(std::move(mockStage));
+ auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(mockStage));
auto firstResult = sortKeyStage->next();
ASSERT_OK(firstResult.getStatus());
@@ -128,12 +127,12 @@ TEST(RouterStageRemoveSortKeyTest, ToleratesMidStreamEOF) {
}
TEST(RouterStageRemoveSortKeyTest, RemotesExhausted) {
- auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
mockStage->queueResult(BSON("a" << 1 << "$sortKey" << 1 << "b" << 1));
mockStage->queueResult(BSON("a" << 2 << "$sortKey" << 1 << "b" << 2));
mockStage->markRemotesExhausted();
- auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(std::move(mockStage));
+ auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(mockStage));
ASSERT_TRUE(sortKeyStage->remotesExhausted());
auto firstResult = sortKeyStage->next();
@@ -155,11 +154,11 @@ TEST(RouterStageRemoveSortKeyTest, RemotesExhausted) {
}
TEST(RouterStageRemoveSortKeyTest, ForwardsAwaitDataTimeout) {
- auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
auto mockStagePtr = mockStage.get();
ASSERT_NOT_OK(mockStage->getAwaitDataTimeout().getStatus());
- auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(std::move(mockStage));
+ auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(mockStage));
ASSERT_OK(sortKeyStage->setAwaitDataTimeout(Milliseconds(789)));
auto awaitDataTimeout = mockStagePtr->getAwaitDataTimeout();
diff --git a/src/mongo/s/query/router_stage_skip.cpp b/src/mongo/s/query/router_stage_skip.cpp
index 7510e3aefd6..50d2107b14c 100644
--- a/src/mongo/s/query/router_stage_skip.cpp
+++ b/src/mongo/s/query/router_stage_skip.cpp
@@ -34,8 +34,10 @@
namespace mongo {
-RouterStageSkip::RouterStageSkip(std::unique_ptr<RouterExecStage> child, long long skip)
- : RouterExecStage(std::move(child)), _skip(skip) {
+RouterStageSkip::RouterStageSkip(OperationContext* opCtx,
+ std::unique_ptr<RouterExecStage> child,
+ long long skip)
+ : RouterExecStage(opCtx, std::move(child)), _skip(skip) {
invariant(skip > 0);
}
diff --git a/src/mongo/s/query/router_stage_skip.h b/src/mongo/s/query/router_stage_skip.h
index c6dc1adda39..49051128577 100644
--- a/src/mongo/s/query/router_stage_skip.h
+++ b/src/mongo/s/query/router_stage_skip.h
@@ -37,7 +37,9 @@ namespace mongo {
*/
class RouterStageSkip final : public RouterExecStage {
public:
- RouterStageSkip(std::unique_ptr<RouterExecStage> child, long long skip);
+ RouterStageSkip(OperationContext* opCtx,
+ std::unique_ptr<RouterExecStage> child,
+ long long skip);
StatusWith<ClusterQueryResult> next() final;
diff --git a/src/mongo/s/query/router_stage_skip_test.cpp b/src/mongo/s/query/router_stage_skip_test.cpp
index f1a58371b5c..1e2ca91dba5 100644
--- a/src/mongo/s/query/router_stage_skip_test.cpp
+++ b/src/mongo/s/query/router_stage_skip_test.cpp
@@ -40,18 +40,17 @@ namespace mongo {
namespace {
-// Note: Though the next() method on RouterExecStage and its subclasses depend on an
-// OperationContext* provided via a preceding call to reattachToOperationContext(), these stages are
-// mocked in this test using RouterStageMock. RouterStageMock does not actually use the
-// OperationContext, so we omit the call to rettachToOperationContext in these tests.
+// These tests use RouterStageMock, which does not actually use its OperationContext, so rather than
+// going through the trouble of making one, we'll just use nullptr throughout.
+OperationContext* opCtx = nullptr;
TEST(RouterStageSkipTest, SkipIsOne) {
- auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
mockStage->queueResult(BSON("a" << 1));
mockStage->queueResult(BSON("a" << 2));
mockStage->queueResult(BSON("a" << 3));
- auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 1);
+ auto skipStage = stdx::make_unique<RouterStageSkip>(opCtx, std::move(mockStage), 1);
auto firstResult = skipStage->next();
ASSERT_OK(firstResult.getStatus());
@@ -74,13 +73,13 @@ TEST(RouterStageSkipTest, SkipIsOne) {
}
TEST(RouterStageSkipTest, SkipIsThree) {
- auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
mockStage->queueResult(BSON("a" << 1));
mockStage->queueResult(BSON("a" << 2));
mockStage->queueResult(BSON("a" << 3));
mockStage->queueResult(BSON("a" << 4));
- auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 3);
+ auto skipStage = stdx::make_unique<RouterStageSkip>(opCtx, std::move(mockStage), 3);
auto firstResult = skipStage->next();
ASSERT_OK(firstResult.getStatus());
@@ -93,12 +92,12 @@ TEST(RouterStageSkipTest, SkipIsThree) {
}
TEST(RouterStageSkipTest, SkipEqualToResultSetSize) {
- auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
mockStage->queueResult(BSON("a" << 1));
mockStage->queueResult(BSON("a" << 2));
mockStage->queueResult(BSON("a" << 3));
- auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 3);
+ auto skipStage = stdx::make_unique<RouterStageSkip>(opCtx, std::move(mockStage), 3);
auto firstResult = skipStage->next();
ASSERT_OK(firstResult.getStatus());
@@ -106,12 +105,12 @@ TEST(RouterStageSkipTest, SkipEqualToResultSetSize) {
}
TEST(RouterStageSkipTest, SkipExceedsResultSetSize) {
- auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
mockStage->queueResult(BSON("a" << 1));
mockStage->queueResult(BSON("a" << 2));
mockStage->queueResult(BSON("a" << 3));
- auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 100);
+ auto skipStage = stdx::make_unique<RouterStageSkip>(opCtx, std::move(mockStage), 100);
auto firstResult = skipStage->next();
ASSERT_OK(firstResult.getStatus());
@@ -119,13 +118,13 @@ TEST(RouterStageSkipTest, SkipExceedsResultSetSize) {
}
TEST(RouterStageSkipTest, ErrorWhileSkippingResults) {
- auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
mockStage->queueResult(BSON("a" << 1));
mockStage->queueError(Status(ErrorCodes::BadValue, "bad thing happened"));
mockStage->queueResult(BSON("a" << 2));
mockStage->queueResult(BSON("a" << 3));
- auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 2);
+ auto skipStage = stdx::make_unique<RouterStageSkip>(opCtx, std::move(mockStage), 2);
auto firstResult = skipStage->next();
ASSERT_NOT_OK(firstResult.getStatus());
@@ -134,13 +133,13 @@ TEST(RouterStageSkipTest, ErrorWhileSkippingResults) {
}
TEST(RouterStageSkipTest, ErrorAfterSkippingResults) {
- auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
mockStage->queueResult(BSON("a" << 1));
mockStage->queueResult(BSON("a" << 2));
mockStage->queueResult(BSON("a" << 3));
mockStage->queueError(Status(ErrorCodes::BadValue, "bad thing happened"));
- auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 2);
+ auto skipStage = stdx::make_unique<RouterStageSkip>(opCtx, std::move(mockStage), 2);
auto firstResult = skipStage->next();
ASSERT_OK(firstResult.getStatus());
@@ -155,13 +154,13 @@ TEST(RouterStageSkipTest, ErrorAfterSkippingResults) {
TEST(RouterStageSkipTest, SkipStageToleratesMidStreamEOF) {
// Skip stage must propagate a boost::none, but not count it towards the skip value.
- auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
mockStage->queueResult(BSON("a" << 1));
mockStage->queueEOF();
mockStage->queueResult(BSON("a" << 2));
mockStage->queueResult(BSON("a" << 3));
- auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 2);
+ auto skipStage = stdx::make_unique<RouterStageSkip>(opCtx, std::move(mockStage), 2);
auto firstResult = skipStage->next();
ASSERT_OK(firstResult.getStatus());
@@ -178,13 +177,13 @@ TEST(RouterStageSkipTest, SkipStageToleratesMidStreamEOF) {
}
TEST(RouterStageSkipTest, SkipStageRemotesExhausted) {
- auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
mockStage->queueResult(BSON("a" << 1));
mockStage->queueResult(BSON("a" << 2));
mockStage->queueResult(BSON("a" << 3));
mockStage->markRemotesExhausted();
- auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 1);
+ auto skipStage = stdx::make_unique<RouterStageSkip>(opCtx, std::move(mockStage), 1);
ASSERT_TRUE(skipStage->remotesExhausted());
auto firstResult = skipStage->next();
@@ -206,11 +205,11 @@ TEST(RouterStageSkipTest, SkipStageRemotesExhausted) {
}
TEST(RouterStageSkipTest, ForwardsAwaitDataTimeout) {
- auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
auto mockStagePtr = mockStage.get();
ASSERT_NOT_OK(mockStage->getAwaitDataTimeout().getStatus());
- auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 3);
+ auto skipStage = stdx::make_unique<RouterStageSkip>(opCtx, std::move(mockStage), 3);
ASSERT_OK(skipStage->setAwaitDataTimeout(Milliseconds(789)));
auto awaitDataTimeout = mockStagePtr->getAwaitDataTimeout();
diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp
index a2f912971d4..dce282c5892 100644
--- a/src/mongo/s/query/store_possible_cursor.cpp
+++ b/src/mongo/s/query/store_possible_cursor.cpp
@@ -70,6 +70,9 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
auto ccc = ClusterClientCursorImpl::make(opCtx, executor, std::move(params));
+ // We don't expect to use this cursor until a subsequent getMore, so detach from the current
+ // OperationContext until then.
+ ccc->detachFromOperationContext();
auto clusterCursorId =
cursorManager->registerCursor(opCtx,
ccc.releaseCursor(),