diff options
29 files changed, 365 insertions, 207 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharded_causally_consistent_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/sharded_causally_consistent_jscore_passthrough.yml index 2f3e11b8a15..5b16e4026e6 100644 --- a/buildscripts/resmokeconfig/suites/sharded_causally_consistent_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/sharded_causally_consistent_jscore_passthrough.yml @@ -42,6 +42,7 @@ selector: - jstests/core/startup_log.js # "local" database. - jstests/core/storageDetailsCommand.js # diskStorageStats. - jstests/core/tailable_skip_limit.js # capped collections. + - jstests/core/tailable_getmore_batch_size.js # capped collections. - jstests/core/top.js # top. # The following tests fail because mongos behaves differently from mongod when testing certain # functionality. The differences are in a comment next to the failing test. diff --git a/buildscripts/resmokeconfig/suites/sharded_collections_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/sharded_collections_jscore_passthrough.yml index 38405f89481..964a6d437cf 100644 --- a/buildscripts/resmokeconfig/suites/sharded_collections_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/sharded_collections_jscore_passthrough.yml @@ -40,6 +40,7 @@ selector: - jstests/core/startup_log.js # "local" database. - jstests/core/storageDetailsCommand.js # diskStorageStats. - jstests/core/tailable_skip_limit.js # capped collections. + - jstests/core/tailable_getmore_batch_size.js # capped collections. - jstests/core/top.js # top. # The following tests fail because mongos behaves differently from mongod when testing certain # functionality. The differences are in a comment next to the failing test. diff --git a/jstests/core/tailable_getmore_batch_size.js b/jstests/core/tailable_getmore_batch_size.js new file mode 100644 index 00000000000..be3e21f0a67 --- /dev/null +++ b/jstests/core/tailable_getmore_batch_size.js @@ -0,0 +1,94 @@ +// Tests for the behavior of combining the tailable and awaitData options to the getMore command +// with the batchSize option. +(function() { + "use strict"; + + const collName = "tailable_getmore_batch_size"; + const coll = db[collName]; + const batchSize = 2; + + function dropAndRecreateColl({numDocs}) { + coll.drop(); + assert.commandWorked(db.createCollection(collName, {capped: true, size: 1024})); + const bulk = coll.initializeUnorderedBulkOp(); + for (let i = 0; i < numDocs; ++i) { + bulk.insert({_id: i}); + } + assert.writeOK(bulk.execute()); + } + + // Test that running a find with the 'tailable' option will return results immediately, even if + // there are fewer than the specified batch size. + dropAndRecreateColl({numDocs: batchSize - 1}); + let findRes = + assert.commandWorked(db.runCommand({find: collName, tailable: true, batchSize: batchSize})); + assert.eq(findRes.cursor.firstBatch.length, batchSize - 1); + assert.neq(findRes.cursor.id, 0); + // Test that the same is true for a find with the 'tailable' and 'awaitData' options set. + findRes = assert.commandWorked( + db.runCommand({find: collName, tailable: true, awaitData: true, batchSize: batchSize})); + assert.eq(findRes.cursor.firstBatch.length, batchSize - 1); + assert.neq(findRes.cursor.id, 0); + + /** + * Runs a find command with a batchSize of 'batchSize' to establish a cursor. Asserts that the + * command worked and that the cursor id is not 0, then returns the cursor id. + */ + function openCursor({batchSize, tailable, awaitData}) { + const findRes = assert.commandWorked(db.runCommand( + {find: collName, tailable: tailable, awaitData: awaitData, batchSize: batchSize})); + assert.eq(findRes.cursor.firstBatch.length, batchSize); + assert.neq(findRes.cursor.id, 0); + assert.eq(findRes.cursor.ns, coll.getFullName()); + return findRes.cursor.id; + } + + // Test that specifying a batch size to a getMore on a tailable cursor produces a batch of the + // desired size when the number of results is larger than the batch size. + + // One batch's worth for the find and one more than one batch's worth for the getMore. + dropAndRecreateColl({numDocs: batchSize + (batchSize + 1)}); + let cursorId = openCursor({batchSize: batchSize, tailable: true, awaitData: false}); + let getMoreRes = assert.commandWorked( + db.runCommand({getMore: cursorId, collection: collName, batchSize: batchSize})); + assert.eq(getMoreRes.cursor.nextBatch.length, batchSize); + + // Test that the same is true for a tailable, *awaitData* cursor. + cursorId = openCursor({batchSize: batchSize, tailable: true, awaitData: true}); + getMoreRes = assert.commandWorked( + db.runCommand({getMore: cursorId, collection: collName, batchSize: batchSize})); + assert.eq(getMoreRes.cursor.nextBatch.length, batchSize); + + // Test that specifying a batch size to a getMore on a tailable cursor returns all + // new results immediately, even if the batch size is larger than the number of new results. + // One batch's worth for the find and one less than one batch's worth for the getMore. + dropAndRecreateColl({numDocs: batchSize + (batchSize - 1)}); + cursorId = openCursor({batchSize: batchSize, tailable: true, awaitData: false}); + getMoreRes = assert.commandWorked( + db.runCommand({getMore: cursorId, collection: collName, batchSize: batchSize})); + assert.eq(getMoreRes.cursor.nextBatch.length, batchSize - 1); + + // Test that the same is true for a tailable, *awaitData* cursor. + cursorId = openCursor({batchSize: batchSize, tailable: true, awaitData: true}); + getMoreRes = assert.commandWorked( + db.runCommand({getMore: cursorId, collection: collName, batchSize: batchSize})); + assert.eq(getMoreRes.cursor.nextBatch.length, batchSize - 1); + + // Test that using a smaller batch size than there are results will return all results without + // empty batches in between (SERVER-30799). + dropAndRecreateColl({numDocs: batchSize * 3}); + cursorId = openCursor({batchSize: batchSize, tailable: true, awaitData: false}); + getMoreRes = assert.commandWorked( + db.runCommand({getMore: cursorId, collection: collName, batchSize: batchSize})); + assert.eq(getMoreRes.cursor.nextBatch.length, batchSize); + getMoreRes = assert.commandWorked( + db.runCommand({getMore: cursorId, collection: collName, batchSize: batchSize})); + assert.eq(getMoreRes.cursor.nextBatch.length, batchSize); + getMoreRes = assert.commandWorked( + db.runCommand({getMore: cursorId, collection: collName, batchSize: batchSize})); + assert.eq(getMoreRes.cursor.nextBatch.length, 0); + + // Avoid leaving the cursor open. Cursors above are killed by drops, but we'll avoid dropping + // the collection at the end so other consistency checks like validate can be run against it. + assert.commandWorked(db.runCommand({killCursors: collName, cursors: [getMoreRes.cursor.id]})); +}()); diff --git a/jstests/core/tailable_skip_limit.js b/jstests/core/tailable_skip_limit.js index 7e09d1bfbfa..c7b1ce4c926 100644 --- a/jstests/core/tailable_skip_limit.js +++ b/jstests/core/tailable_skip_limit.js @@ -6,16 +6,16 @@ var collname = "jstests_tailable_skip_limit"; var t = db[collname]; t.drop(); - db.createCollection(collname, {capped: true, size: 1024}); + assert.commandWorked(db.createCollection(collname, {capped: true, size: 1024})); - t.save({_id: 1}); - t.save({_id: 2}); + assert.writeOK(t.insert({_id: 1})); + assert.writeOK(t.insert({_id: 2})); // Non-tailable with skip var cursor = t.find().skip(1); assert.eq(2, cursor.next()["_id"]); assert(!cursor.hasNext()); - t.save({_id: 3}); + assert.writeOK(t.insert({_id: 3})); assert(!cursor.hasNext()); // Non-tailable with limit @@ -24,7 +24,7 @@ assert.eq(i, cursor.next()["_id"]); } assert(!cursor.hasNext()); - t.save({_id: 4}); + assert.writeOK(t.insert({_id: 4})); assert(!cursor.hasNext()); // Non-tailable with negative limit @@ -33,14 +33,14 @@ assert.eq(i, cursor.next()["_id"]); } assert(!cursor.hasNext()); - t.save({_id: 5}); + assert.writeOK(t.insert({_id: 5})); assert(!cursor.hasNext()); // Tailable with skip cursor = t.find().addOption(2).skip(4); assert.eq(5, cursor.next()["_id"]); assert(!cursor.hasNext()); - t.save({_id: 6}); + assert.writeOK(t.insert({_id: 6})); assert(cursor.hasNext()); assert.eq(6, cursor.next()["_id"]); @@ -50,7 +50,7 @@ assert.eq(i, cursor.next()["_id"]); } assert(!cursor.hasNext()); - t.save({_id: 7}); + assert.writeOK(t.insert({_id: 7})); assert(cursor.hasNext()); assert.eq(7, cursor.next()["_id"]); @@ -76,7 +76,7 @@ // Tests that a tailable cursor over an empty capped collection produces a dead cursor, intended // to be run on both mongod and mongos. For SERVER-20720. t.drop(); - db.createCollection(t.getName(), {capped: true, size: 1024}); + assert.commandWorked(db.createCollection(t.getName(), {capped: true, size: 1024})); var cmdRes = db.runCommand({find: t.getName(), tailable: true}); assert.commandWorked(cmdRes); diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index ad2987e7603..4626948ece9 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -221,8 +221,7 @@ void CollectionCloner::shutdown() { void CollectionCloner::_cancelRemainingWork_inlock() { if (_arm) { Client::initThreadIfNotAlready(); - auto opCtx = cc().getOperationContext(); - _killArmHandle = _arm->kill(opCtx); + _killArmHandle = _arm->kill(cc().getOperationContext()); } _countScheduler.shutdown(); _listIndexesFetcher.shutdown(); @@ -577,7 +576,9 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa _clusterClientCursorParams = stdx::make_unique<ClusterClientCursorParams>(_sourceNss, UserNameIterator()); _clusterClientCursorParams->remotes = std::move(remoteCursors); - _arm = stdx::make_unique<AsyncResultsMerger>(_executor, _clusterClientCursorParams.get()); + Client::initThreadIfNotAlready(); + _arm = stdx::make_unique<AsyncResultsMerger>( + cc().getOperationContext(), _executor, _clusterClientCursorParams.get()); // This completion guard invokes _finishCallback on destruction. auto cancelRemainingWorkInLock = [this]() { _cancelRemainingWork_inlock(); }; @@ -590,6 +591,7 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa // outside the mutex. This is a necessary condition to invoke _finishCallback. stdx::lock_guard<stdx::mutex> lock(_mutex); Status scheduleStatus = _scheduleNextARMResultsCallback(onCompletionGuard); + _arm->detachFromOperationContext(); if (!scheduleStatus.isOK()) { onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, scheduleStatus); return; @@ -613,6 +615,9 @@ StatusWith<std::vector<BSONElement>> CollectionCloner::_parseParallelCollectionS } Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) { + Client::initThreadIfNotAlready(); + auto opCtx = cc().getOperationContext(); + _arm->reattachToOperationContext(opCtx); while (_arm->ready()) { auto armResultStatus = _arm->nextReady(); if (!armResultStatus.getStatus().isOK()) { @@ -626,6 +631,7 @@ Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) { _documentsToInsert.push_back(std::move(*queryResult)); } } + _arm->detachFromOperationContext(); return Status::OK(); } @@ -633,8 +639,9 @@ Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) { Status CollectionCloner::_scheduleNextARMResultsCallback( std::shared_ptr<OnCompletionGuard> onCompletionGuard) { Client::initThreadIfNotAlready(); - auto opCtx = cc().getOperationContext(); - auto nextEvent = _arm->nextEvent(opCtx); + _arm->reattachToOperationContext(cc().getOperationContext()); + auto nextEvent = _arm->nextEvent(); + _arm->detachFromOperationContext(); if (!nextEvent.isOK()) { return nextEvent.getStatus(); } diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index 995ebd1a872..f2ba6b2b839 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -331,7 +331,6 @@ BSONObj establishMergingMongosCursor( params.mergePipeline = std::move(pipelineForMerging); params.remotes = std::move(cursors); - params.batchSize = request.getBatchSize(); auto ccc = ClusterClientCursorImpl::make( opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), std::move(params)); @@ -341,8 +340,6 @@ BSONObj establishMergingMongosCursor( CursorResponseBuilder responseBuilder(true, &cursorResponse); - ccc->reattachToOperationContext(opCtx); - for (long long objCount = 0; objCount < request.getBatchSize(); ++objCount) { auto next = uassertStatusOK(ccc->next()); diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 5a550ae7e24..19281785be0 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -52,9 +52,11 @@ const int kMaxNumFailedHostRetryAttempts = 3; } // namespace -AsyncResultsMerger::AsyncResultsMerger(executor::TaskExecutor* executor, +AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx, + executor::TaskExecutor* executor, ClusterClientCursorParams* params) - : _executor(executor), + : _opCtx(opCtx), + _executor(executor), _params(params), _mergeQueue(MergingComparator(_remotes, _params->sort)) { size_t remoteIndex = 0; @@ -112,14 +114,28 @@ bool AsyncResultsMerger::ready() { return ready_inlock(); } +void AsyncResultsMerger::detachFromOperationContext() { + _opCtx = nullptr; + // If we were about ready to return a boost::none because a tailable cursor reached the end of + // the batch, that should no longer apply to the next use - when we are reattached to a + // different OperationContext, it signals that the caller is ready for a new batch, and wants us + // to request a new batch from the tailable cursor. + _eofNext = false; +} + +void AsyncResultsMerger::reattachToOperationContext(OperationContext* opCtx) { + invariant(!_opCtx); + _opCtx = opCtx; +} + bool AsyncResultsMerger::ready_inlock() { if (_lifecycleState != kAlive) { return true; } if (_eofNext) { - // We are ready to return boost::none due to reaching the end of a batch of results from a - // tailable cursor. + // Mark this operation as ready to return boost::none due to reaching the end of a batch of + // results from a tailable cursor. return true; } @@ -239,7 +255,7 @@ ClusterQueryResult AsyncResultsMerger::nextReadyUnsorted() { return {}; } -Status AsyncResultsMerger::askForNextBatch_inlock(OperationContext* opCtx, size_t remoteIndex) { +Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) { auto& remote = _remotes[remoteIndex]; invariant(!remote.cbHandle.isValid()); @@ -262,15 +278,12 @@ Status AsyncResultsMerger::askForNextBatch_inlock(OperationContext* opCtx, size_ .toBSON(); executor::RemoteCommandRequest request( - remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, _metadataObj, opCtx); - - auto callbackStatus = - _executor->scheduleRemoteCommand(request, - stdx::bind(&AsyncResultsMerger::handleBatchResponse, - this, - stdx::placeholders::_1, - opCtx, - remoteIndex)); + remote.getTargetHost(), _params->nsString.db().toString(), cmdObj, _metadataObj, _opCtx); + + auto callbackStatus = _executor->scheduleRemoteCommand( + request, + stdx::bind( + &AsyncResultsMerger::handleBatchResponse, this, stdx::placeholders::_1, remoteIndex)); if (!callbackStatus.isOK()) { return callbackStatus.getStatus(); } @@ -287,8 +300,7 @@ Status AsyncResultsMerger::askForNextBatch_inlock(OperationContext* opCtx, size_ * 2. Remotes that already has some result will have a non-empty buffer. * 3. Remotes that reached maximum retries will be in 'exhausted' state. */ -StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent( - OperationContext* opCtx) { +StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent() { stdx::lock_guard<stdx::mutex> lk(_mutex); if (_lifecycleState != kAlive) { @@ -315,7 +327,7 @@ StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent( if (!remote.hasNext() && !remote.exhausted() && !remote.cbHandle.isValid()) { // If this remote is not exhausted and there is no outstanding request for it, schedule // work to retrieve the next batch. - auto nextBatchStatus = askForNextBatch_inlock(opCtx, i); + auto nextBatchStatus = askForNextBatch_inlock(i); if (!nextBatchStatus.isOK()) { return nextBatchStatus; } @@ -358,9 +370,7 @@ StatusWith<CursorResponse> AsyncResultsMerger::parseCursorResponse(const BSONObj } void AsyncResultsMerger::handleBatchResponse( - const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, - OperationContext* opCtx, - size_t remoteIndex) { + const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, size_t remoteIndex) { stdx::lock_guard<stdx::mutex> lk(_mutex); auto& remote = _remotes[remoteIndex]; @@ -385,7 +395,7 @@ void AsyncResultsMerger::handleBatchResponse( // If the event handle is invalid, then the executor is in the middle of shutting down, // and we can't schedule any more work for it to complete. if (_killCursorsScheduledEvent.isValid()) { - scheduleKillCursors_inlock(opCtx); + scheduleKillCursors_inlock(_opCtx); _executor->signalEvent(_killCursorsScheduledEvent); } @@ -442,7 +452,7 @@ void AsyncResultsMerger::handleBatchResponse( // We do not ask for the next batch if the cursor is tailable, as batches received from remote // tailable cursors should be passed through to the client without asking for more batches. if (!_params->isTailable && !remote.hasNext() && !remote.exhausted()) { - remote.status = askForNextBatch_inlock(opCtx, remoteIndex); + remote.status = askForNextBatch_inlock(remoteIndex); if (!remote.status.isOK()) { return; } diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h index 6b30730392f..04262309a99 100644 --- a/src/mongo/s/query/async_results_merger.h +++ b/src/mongo/s/query/async_results_merger.h @@ -83,8 +83,14 @@ public: * buffered results onto _mergeQueue. * * The TaskExecutor* must remain valid for the lifetime of the ARM. + * + * If 'opCtx' may be deleted before this AsyncResultsMerger, the caller must call + * detachFromOperationContext() before deleting 'opCtx', and call reattachToOperationContext() + * with a new, valid OperationContext before the next use. */ - AsyncResultsMerger(executor::TaskExecutor* executor, ClusterClientCursorParams* params); + AsyncResultsMerger(OperationContext* opCtx, + executor::TaskExecutor* executor, + ClusterClientCursorParams* params); /** * In order to be destroyed, either the ARM must have been kill()'ed or all cursors must have @@ -107,6 +113,18 @@ public: Status setAwaitDataTimeout(Milliseconds awaitDataTimeout); /** + * Signals to the AsyncResultsMerger that the caller is finished using it in the current + * context. + */ + void detachFromOperationContext(); + + /** + * Provides a new OperationContext to be used by the AsyncResultsMerger - the caller must call + * detachFromOperationContext() before 'opCtx' is deleted. + */ + void reattachToOperationContext(OperationContext* opCtx); + + /** * Returns true if there is no need to schedule remote work in order to take the next action. * This means that either * --there is a buffered result which we can return, @@ -157,7 +175,7 @@ public: * non-exhausted remotes. * If there is no sort, the event is signaled when some remote has a buffered result. */ - StatusWith<executor::TaskExecutor::EventHandle> nextEvent(OperationContext* opCtx); + StatusWith<executor::TaskExecutor::EventHandle> nextEvent(); /** * Starts shutting down this ARM by canceling all pending requests. Returns a handle to an event @@ -170,6 +188,10 @@ public: * killing is considered complete and the ARM may be destroyed immediately. * * May be called multiple times (idempotent). + * + * Note that 'opCtx' may or may not be the same as the operation context to which this cursor is + * currently attached. This is so that a killing thread may call this method with its own + * operation context. */ executor::TaskExecutor::EventHandle kill(OperationContext* opCtx); @@ -263,7 +285,7 @@ private: * * Returns success if the command to retrieve the next batch was scheduled successfully. */ - Status askForNextBatch_inlock(OperationContext* opCtx, size_t remoteIndex); + Status askForNextBatch_inlock(size_t remoteIndex); /** * Checks whether or not the remote cursors are all exhausted. @@ -294,7 +316,6 @@ private: * buffered. */ void handleBatchResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, - OperationContext* opCtx, size_t remoteIndex); /** * Adds the batch of results to the RemoteCursorData. Returns false if there was an error @@ -321,10 +342,8 @@ private: */ void scheduleKillCursors_inlock(OperationContext* opCtx); - // Not owned here. + OperationContext* _opCtx; executor::TaskExecutor* _executor; - - // Not owned here. ClusterClientCursorParams* _params; // The metadata obj to pass along with the command request. Used to indicate that the command is diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index bca910ce347..6076a559867 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -132,7 +132,7 @@ protected: _params->isAllowPartialResults = qr->isAllowPartialResults(); } - arm = stdx::make_unique<AsyncResultsMerger>(executor(), _params.get()); + arm = stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), _params.get()); } /** @@ -234,7 +234,7 @@ TEST_F(AsyncResultsMergerTest, SingleShardUnsorted) { ASSERT_FALSE(arm->remotesExhausted()); // Schedule requests. - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + auto readyEvent = unittest::assertGet(arm->nextEvent()); // Before any responses are delivered, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); @@ -278,7 +278,7 @@ TEST_F(AsyncResultsMergerTest, SingleShardSorted) { ASSERT_FALSE(arm->remotesExhausted()); // Schedule requests. - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + auto readyEvent = unittest::assertGet(arm->nextEvent()); // Before any responses are delivered, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); @@ -322,7 +322,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) { ASSERT_FALSE(arm->remotesExhausted()); // Schedule requests. - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + auto readyEvent = unittest::assertGet(arm->nextEvent()); // Before any responses are delivered, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); @@ -354,7 +354,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) { ASSERT_FALSE(arm->ready()); // Make next event to be signaled. - readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + readyEvent = unittest::assertGet(arm->nextEvent()); // Second shard responds; the handleBatchResponse callback is run and ARM's remote gets updated. responses.clear(); @@ -394,7 +394,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardSorted) { ASSERT_FALSE(arm->remotesExhausted()); // Schedule requests. - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + auto readyEvent = unittest::assertGet(arm->nextEvent()); // Before any responses are delivered, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); @@ -457,7 +457,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) { ASSERT_FALSE(arm->remotesExhausted()); // Schedule requests. - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + auto readyEvent = unittest::assertGet(arm->nextEvent()); // First shard responds; the handleBatchResponse callback is run and ARM's remote gets updated. std::vector<CursorResponse> responses; @@ -486,7 +486,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) { ASSERT_FALSE(arm->ready()); // Make next event to be signaled. - readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + readyEvent = unittest::assertGet(arm->nextEvent()); // Second shard responds; the handleBatchResponse callback is run and ARM's remote gets updated. responses.clear(); @@ -514,7 +514,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) { ASSERT_FALSE(arm->ready()); // Make next event to be signaled. - readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + readyEvent = unittest::assertGet(arm->nextEvent()); // First shard returns remainder of results. responses.clear(); @@ -552,7 +552,7 @@ TEST_F(AsyncResultsMergerTest, CompoundSortKey) { // Schedule requests. ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + auto readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); // Deliver responses. @@ -604,7 +604,7 @@ TEST_F(AsyncResultsMergerTest, SortedButNoSortKey) { makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + auto readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); // Parsing the batch results in an error because the sort key is missing. @@ -621,7 +621,7 @@ TEST_F(AsyncResultsMergerTest, SortedButNoSortKey) { ASSERT_EQ(statusWithNext.getStatus().code(), ErrorCodes::InternalError); // Required to kill the 'arm' on error before destruction. - auto killEvent = arm->kill(nullptr); + auto killEvent = arm->kill(operationContext()); executor()->waitForEvent(killEvent); } @@ -650,7 +650,7 @@ TEST_F(AsyncResultsMergerTest, HasFirstBatch) { ASSERT_FALSE(arm->ready()); // Schedule requests. - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + auto readyEvent = unittest::assertGet(arm->nextEvent()); // Before any responses are delivered, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); @@ -709,7 +709,7 @@ TEST_F(AsyncResultsMergerTest, OneShardHasInitialBatchOtherShardExhausted) { ASSERT_FALSE(arm->ready()); // Schedule requests. - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + auto readyEvent = unittest::assertGet(arm->nextEvent()); // Before any responses are delivered, ARM is not ready to return results. ASSERT_FALSE(arm->ready()); @@ -749,7 +749,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { makeCursorFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + auto readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); // Both shards respond with the first batch. @@ -772,7 +772,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { ASSERT_BSONOBJ_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); // When we ask the shards for their next batch, the first shard responds and the second shard @@ -791,7 +791,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { ASSERT_BSONOBJ_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); // We can continue to return results from first shard, while second shard remains unresponsive. @@ -810,7 +810,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { // Kill cursor before deleting it, as the second remote cursor has not been exhausted. We don't // wait on 'killEvent' here, as the blackholed request's callback will only run on shutdown of // the network interface. - auto killEvent = arm->kill(nullptr); + auto killEvent = arm->kill(operationContext()); ASSERT_TRUE(killEvent.isValid()); } @@ -820,7 +820,7 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) { makeCursorFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + auto readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); std::vector<CursorResponse> responses; @@ -834,7 +834,7 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) { ASSERT(!arm->nextReady().isOK()); // Required to kill the 'arm' on error before destruction. - auto killEvent = arm->kill(nullptr); + auto killEvent = arm->kill(operationContext()); executor()->waitForEvent(killEvent); } @@ -846,7 +846,7 @@ TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) { makeCursorFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + auto readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; @@ -863,7 +863,7 @@ TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) { ASSERT(!statusWithNext.isOK()); // Required to kill the 'arm' on error before destruction. - auto killEvent = arm->kill(nullptr); + auto killEvent = arm->kill(operationContext()); executor()->waitForEvent(killEvent); } @@ -875,7 +875,7 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) { makeCursorFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + auto readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); std::vector<CursorResponse> responses; @@ -896,7 +896,7 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) { ASSERT_EQ(statusWithNext.getStatus().reason(), "bad thing happened"); // Required to kill the 'arm' on error before destruction. - auto killEvent = arm->kill(nullptr); + auto killEvent = arm->kill(operationContext()); executor()->waitForEvent(killEvent); } @@ -906,10 +906,10 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) { makeCursorFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + auto readyEvent = unittest::assertGet(arm->nextEvent()); - // Error to call nextEvent() before the previous event is signaled. - ASSERT_NOT_OK(arm->nextEvent(nullptr).getStatus()); + // Error to call nextEvent()() before the previous event is signaled. + ASSERT_NOT_OK(arm->nextEvent().getStatus()); std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; @@ -926,7 +926,7 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) { ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); // Required to kill the 'arm' on error before destruction. - auto killEvent = arm->kill(nullptr); + auto killEvent = arm->kill(operationContext()); executor()->waitForEvent(killEvent); } @@ -936,8 +936,8 @@ TEST_F(AsyncResultsMergerTest, NextEventAfterTaskExecutorShutdown) { makeCursorFromExistingCursors(std::move(cursors)); executor()->shutdown(); - ASSERT_EQ(ErrorCodes::ShutdownInProgress, arm->nextEvent(nullptr).getStatus()); - auto killEvent = arm->kill(nullptr); + ASSERT_EQ(ErrorCodes::ShutdownInProgress, arm->nextEvent().getStatus()); + auto killEvent = arm->kill(operationContext()); ASSERT_FALSE(killEvent.isValid()); } @@ -948,13 +948,13 @@ TEST_F(AsyncResultsMergerTest, KillAfterTaskExecutorShutdownWithOutstandingBatch // Make a request to the shard that will never get answered. ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + auto readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); blackHoleNextRequest(); // Executor shuts down before a response is received. executor()->shutdown(); - auto killEvent = arm->kill(nullptr); + auto killEvent = arm->kill(operationContext()); ASSERT_FALSE(killEvent.isValid()); } @@ -964,7 +964,7 @@ TEST_F(AsyncResultsMergerTest, KillNoBatchesRequested) { makeCursorFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); - auto killedEvent = arm->kill(nullptr); + auto killedEvent = arm->kill(operationContext()); // Killed cursors are considered ready, but return an error when you try to receive the next // doc. @@ -982,7 +982,7 @@ TEST_F(AsyncResultsMergerTest, KillAllBatchesReceived) { makeCursorFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + auto readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); std::vector<CursorResponse> responses; @@ -996,7 +996,7 @@ TEST_F(AsyncResultsMergerTest, KillAllBatchesReceived) { CursorResponse::ResponseType::SubsequentResponse); // Kill should be able to return right away if there are no pending batches. - auto killedEvent = arm->kill(nullptr); + auto killedEvent = arm->kill(operationContext()); ASSERT_TRUE(arm->ready()); ASSERT_NOT_OK(arm->nextReady().getStatus()); executor()->waitForEvent(killedEvent); @@ -1010,7 +1010,7 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) { makeCursorFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + auto readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); std::vector<CursorResponse> responses; @@ -1020,7 +1020,7 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) { CursorResponse::ResponseType::SubsequentResponse); // Kill event will only be signalled once the callbacks for the pending batches have run. - auto killedEvent = arm->kill(nullptr); + auto killedEvent = arm->kill(operationContext()); // The pending requests have been canceled, so run their callbacks. runReadyCallbacks(); @@ -1036,7 +1036,7 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) { makeCursorFromExistingCursors(std::move(cursors)); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + auto readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); std::vector<CursorResponse> responses; @@ -1045,10 +1045,10 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) { scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::SubsequentResponse); - auto killedEvent = arm->kill(nullptr); + auto killedEvent = arm->kill(operationContext()); // Attempting to schedule more network operations on a killed arm is an error. - ASSERT_NOT_OK(arm->nextEvent(nullptr).getStatus()); + ASSERT_NOT_OK(arm->nextEvent().getStatus()); executor()->waitForEvent(killedEvent); } @@ -1057,9 +1057,9 @@ TEST_F(AsyncResultsMergerTest, KillCalledTwice) { std::vector<ClusterClientCursorParams::RemoteCursor> cursors; cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 1, {})); makeCursorFromExistingCursors(std::move(cursors)); - auto killedEvent1 = arm->kill(nullptr); + auto killedEvent1 = arm->kill(operationContext()); ASSERT(killedEvent1.isValid()); - auto killedEvent2 = arm->kill(nullptr); + auto killedEvent2 = arm->kill(operationContext()); ASSERT(killedEvent2.isValid()); executor()->waitForEvent(killedEvent1); executor()->waitForEvent(killedEvent2); @@ -1072,7 +1072,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) { makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + auto readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); std::vector<CursorResponse> responses; @@ -1093,7 +1093,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) { ASSERT_FALSE(arm->remotesExhausted()); ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); responses.clear(); @@ -1110,7 +1110,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) { ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); ASSERT_FALSE(arm->remotesExhausted()); - auto killedEvent = arm->kill(nullptr); + auto killedEvent = arm->kill(operationContext()); executor()->waitForEvent(killedEvent); } @@ -1121,7 +1121,7 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) { makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + auto readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); // Remote responds with an empty batch and a non-zero cursor id. @@ -1138,7 +1138,7 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) { ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); ASSERT_FALSE(arm->remotesExhausted()); - auto killedEvent = arm->kill(nullptr); + auto killedEvent = arm->kill(operationContext()); executor()->waitForEvent(killedEvent); } @@ -1149,7 +1149,7 @@ TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) { makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + auto readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); // Remote responds with an empty batch and a zero cursor id. @@ -1174,7 +1174,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) { makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + auto readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); std::vector<CursorResponse> responses; @@ -1195,7 +1195,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) { std::vector<BSONObj> batch2 = {fromjson("{_id: 3}")}; responses.emplace_back(_nss, CursorId(0), batch2); - readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + readyEvent = unittest::assertGet(arm->nextEvent()); BSONObj scheduledCmd = getFirstPendingRequest().cmdObj; auto request = GetMoreRequest::parseFromBSON("anydbname", scheduledCmd); @@ -1221,7 +1221,7 @@ TEST_F(AsyncResultsMergerTest, SendsSecondaryOkAsMetadata) { ReadPreferenceSetting(ReadPreference::Nearest)); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + auto readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); BSONObj cmdRequestMetadata = getFirstPendingRequest().metadata; @@ -1251,7 +1251,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) { makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + auto readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); // An error occurs with the first host. @@ -1275,7 +1275,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) { ASSERT_BSONOBJ_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); // Now the second host becomes unreachable. We should still be willing to return results from @@ -1294,7 +1294,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) { ASSERT_BSONOBJ_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); // Once the last reachable shard indicates that its cursor is closed, we're done. @@ -1316,7 +1316,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) { makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + auto readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); std::vector<CursorResponse> responses; @@ -1332,7 +1332,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) { ASSERT_BSONOBJ_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); // The lone host involved in this query returns an error. This should simply cause us to return @@ -1350,7 +1350,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsOnRetriableErrorNoRetries) { makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + auto readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); // First host returns single result @@ -1380,7 +1380,7 @@ TEST_F(AsyncResultsMergerTest, ReturnsErrorOnRetriableError) { makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + auto readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); // Both hosts return network (retriable) errors. @@ -1396,7 +1396,7 @@ TEST_F(AsyncResultsMergerTest, ReturnsErrorOnRetriableError) { ASSERT_EQ(statusWithNext.getStatus().reason(), "host unreachable"); // Required to kill the 'arm' on error before destruction. - auto killEvent = arm->kill(nullptr); + auto killEvent = arm->kill(operationContext()); executor()->waitForEvent(killEvent); } @@ -1407,7 +1407,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) { makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + auto readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); std::vector<CursorResponse> responses; @@ -1425,7 +1425,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) { ASSERT_OK(arm->setAwaitDataTimeout(Milliseconds(789))); ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); // Pending getMore request should include maxTimeMS. @@ -1456,7 +1456,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) { makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789))); - auto killEvent = arm->kill(nullptr); + auto killEvent = arm->kill(operationContext()); executor()->waitForEvent(killEvent); } @@ -1467,7 +1467,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutAwaitDataCantHaveMaxTime) { makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789))); - auto killEvent = arm->kill(nullptr); + auto killEvent = arm->kill(operationContext()); executor()->waitForEvent(killEvent); } @@ -1478,13 +1478,13 @@ TEST_F(AsyncResultsMergerTest, ShardCanErrorInBetweenReadyAndNextEvent) { makeCursorFromExistingCursors(std::move(cursors), findCmd); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); + auto readyEvent = unittest::assertGet(arm->nextEvent()); scheduleErrorResponse({ErrorCodes::BadValue, "bad thing happened"}); - ASSERT_EQ(ErrorCodes::BadValue, arm->nextEvent(nullptr).getStatus()); + ASSERT_EQ(ErrorCodes::BadValue, arm->nextEvent().getStatus()); // Required to kill the 'arm' on error before destruction. - auto killEvent = arm->kill(nullptr); + auto killEvent = arm->kill(operationContext()); executor()->waitForEvent(killEvent); } diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h index aca0f54c704..d4b89d74dfc 100644 --- a/src/mongo/s/query/cluster_client_cursor.h +++ b/src/mongo/s/query/cluster_client_cursor.h @@ -77,13 +77,13 @@ public: virtual void kill(OperationContext* opCtx) = 0; /** - * Sets the operation context for the cursor. Must be called before the first call to next(). - * The cursor attaches to a new OperationContext on each getMore. + * Sets the operation context for the cursor. */ virtual void reattachToOperationContext(OperationContext* opCtx) = 0; /** - * Detaches the cursor from its current OperationContext. + * Detaches the cursor from its current OperationContext. Must be called before the + * OperationContext in use is deleted. */ virtual void detachFromOperationContext() = 0; diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp index 0e6b5a197ce..e7716355c0f 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp @@ -63,15 +63,16 @@ std::unique_ptr<ClusterClientCursor> ClusterClientCursorGuard::releaseCursor() { ClusterClientCursorGuard ClusterClientCursorImpl::make(OperationContext* opCtx, executor::TaskExecutor* executor, ClusterClientCursorParams&& params) { - std::unique_ptr<ClusterClientCursor> cursor( - new ClusterClientCursorImpl(executor, std::move(params), opCtx->getLogicalSessionId())); + std::unique_ptr<ClusterClientCursor> cursor(new ClusterClientCursorImpl( + opCtx, executor, std::move(params), opCtx->getLogicalSessionId())); return ClusterClientCursorGuard(opCtx, std::move(cursor)); } -ClusterClientCursorImpl::ClusterClientCursorImpl(executor::TaskExecutor* executor, +ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx, + executor::TaskExecutor* executor, ClusterClientCursorParams&& params, boost::optional<LogicalSessionId> lsid) - : _params(std::move(params)), _root(buildMergerPlan(executor, &_params)), _lsid(lsid) {} + : _params(std::move(params)), _root(buildMergerPlan(opCtx, executor, &_params)), _lsid(lsid) {} ClusterClientCursorImpl::ClusterClientCursorImpl(std::unique_ptr<RouterStageMock> root, ClusterClientCursorParams&& params, @@ -140,7 +141,7 @@ boost::optional<LogicalSessionId> ClusterClientCursorImpl::getLsid() const { } std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan( - executor::TaskExecutor* executor, ClusterClientCursorParams* params) { + OperationContext* opCtx, executor::TaskExecutor* executor, ClusterClientCursorParams* params) { const auto skip = params->skip; const auto limit = params->limit; const bool hasSort = !params->sort.isEmpty(); @@ -153,18 +154,19 @@ std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan( return stdx::make_unique<RouterStageAggregationMerge>(std::move(params->mergePipeline)); } - std::unique_ptr<RouterExecStage> root = stdx::make_unique<RouterStageMerge>(executor, params); + std::unique_ptr<RouterExecStage> root = + stdx::make_unique<RouterStageMerge>(opCtx, executor, params); if (skip) { - root = stdx::make_unique<RouterStageSkip>(std::move(root), *skip); + root = stdx::make_unique<RouterStageSkip>(opCtx, std::move(root), *skip); } if (limit) { - root = stdx::make_unique<RouterStageLimit>(std::move(root), *limit); + root = stdx::make_unique<RouterStageLimit>(opCtx, std::move(root), *limit); } if (hasSort) { - root = stdx::make_unique<RouterStageRemoveSortKey>(std::move(root)); + root = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(root)); } return root; diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h index 07dbab2094d..8b5e5b0a855 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.h +++ b/src/mongo/s/query/cluster_client_cursor_impl.h @@ -123,7 +123,8 @@ public: /** * Constructs a cluster client cursor. */ - ClusterClientCursorImpl(executor::TaskExecutor* executor, + ClusterClientCursorImpl(OperationContext* opCtx, + executor::TaskExecutor* executor, ClusterClientCursorParams&& params, boost::optional<LogicalSessionId> lsid); @@ -131,7 +132,8 @@ private: /** * Constructs the pipeline of MergerPlanStages which will be used to answer the query. */ - std::unique_ptr<RouterExecStage> buildMergerPlan(executor::TaskExecutor* executor, + std::unique_ptr<RouterExecStage> buildMergerPlan(OperationContext* opCtx, + executor::TaskExecutor* executor, ClusterClientCursorParams* params); ClusterClientCursorParams _params; diff --git a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp index 8598fe192dc..08dcdb8c303 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp @@ -40,13 +40,12 @@ namespace mongo { namespace { -// Note: Though the next() method on RouterExecStage and its subclasses depend on an -// OperationContext* provided via a preceding call to reattachToOperationContext(), these stages are -// mocked in this test using RouterStageMock. RouterStageMock does not actually use the -// OperationContext, so we omit the call to rettachToOperationContext in these tests. +// These tests use RouterStageMock, which does not actually use its OperationContext, so rather than +// going through the trouble of making one, we'll just use nullptr throughout. +OperationContext* opCtx = nullptr; TEST(ClusterClientCursorImpl, NumReturnedSoFar) { - auto mockStage = stdx::make_unique<RouterStageMock>(); + auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); for (int i = 1; i < 10; ++i) { mockStage->queueResult(BSON("a" << i)); } @@ -71,7 +70,7 @@ TEST(ClusterClientCursorImpl, NumReturnedSoFar) { } TEST(ClusterClientCursorImpl, QueueResult) { - auto mockStage = stdx::make_unique<RouterStageMock>(); + auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); mockStage->queueResult(BSON("a" << 1)); mockStage->queueResult(BSON("a" << 4)); @@ -110,7 +109,7 @@ TEST(ClusterClientCursorImpl, QueueResult) { } TEST(ClusterClientCursorImpl, RemotesExhausted) { - auto mockStage = stdx::make_unique<RouterStageMock>(); + auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); mockStage->queueResult(BSON("a" << 1)); mockStage->queueResult(BSON("a" << 2)); mockStage->markRemotesExhausted(); @@ -141,7 +140,7 @@ TEST(ClusterClientCursorImpl, RemotesExhausted) { } TEST(ClusterClientCursorImpl, ForwardsAwaitDataTimeout) { - auto mockStage = stdx::make_unique<RouterStageMock>(); + auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); auto mockStagePtr = mockStage.get(); ASSERT_NOT_OK(mockStage->getAwaitDataTimeout().getStatus()); @@ -157,13 +156,13 @@ TEST(ClusterClientCursorImpl, ForwardsAwaitDataTimeout) { TEST(ClusterClientCursorImpl, LogicalSessionIdsOnCursors) { // Make a cursor with no lsid - auto mockStage = stdx::make_unique<RouterStageMock>(); + auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); ClusterClientCursorParams params(NamespaceString("test"), {}); ClusterClientCursorImpl cursor{std::move(mockStage), std::move(params), boost::none}; ASSERT(!cursor.getLsid()); // Make a cursor with an lsid - auto mockStage2 = stdx::make_unique<RouterStageMock>(); + auto mockStage2 = stdx::make_unique<RouterStageMock>(opCtx); ClusterClientCursorParams params2(NamespaceString("test"), {}); auto lsid = makeLogicalSessionIdForTest(); ClusterClientCursorImpl cursor2{std::move(mockStage2), std::move(params2), lsid}; diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index b7a5ecc6b34..27afa51d5a5 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -274,8 +274,6 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx, auto cursorState = ClusterCursorManager::CursorState::NotExhausted; int bytesBuffered = 0; - ccc->reattachToOperationContext(opCtx); - while (!FindCommon::enoughForFirstBatch(query.getQueryRequest(), results->size())) { auto next = ccc->next(); diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h index 1f2fd2a9e7f..ac074d92b62 100644 --- a/src/mongo/s/query/router_exec_stage.h +++ b/src/mongo/s/query/router_exec_stage.h @@ -52,8 +52,9 @@ class OperationContext; */ class RouterExecStage { public: - RouterExecStage() = default; - RouterExecStage(std::unique_ptr<RouterExecStage> child) : _child(std::move(child)) {} + RouterExecStage(OperationContext* opCtx) : _opCtx(opCtx) {} + RouterExecStage(OperationContext* opCtx, std::unique_ptr<RouterExecStage> child) + : _opCtx(opCtx), _child(std::move(child)) {} virtual ~RouterExecStage() = default; @@ -71,6 +72,10 @@ public: /** * Must be called before destruction to abandon a not-yet-exhausted plan. May block waiting for * responses from remote hosts. + * + * Note that 'opCtx' may or may not be the same as the operation context to which this cursor is + * currently attached. This is so that a killing thread may call this method with its own + * operation context. */ virtual void kill(OperationContext* opCtx) = 0; @@ -144,8 +149,8 @@ protected: } private: - std::unique_ptr<RouterExecStage> _child; OperationContext* _opCtx = nullptr; + std::unique_ptr<RouterExecStage> _child; }; } // namespace mongo diff --git a/src/mongo/s/query/router_stage_aggregation_merge.cpp b/src/mongo/s/query/router_stage_aggregation_merge.cpp index 6fdfdc4fea2..a4273dbd4a7 100644 --- a/src/mongo/s/query/router_stage_aggregation_merge.cpp +++ b/src/mongo/s/query/router_stage_aggregation_merge.cpp @@ -37,7 +37,8 @@ namespace mongo { RouterStageAggregationMerge::RouterStageAggregationMerge( std::unique_ptr<Pipeline, Pipeline::Deleter> mergePipeline) - : _mergePipeline(std::move(mergePipeline)) {} + : RouterExecStage(mergePipeline->getContext()->opCtx), + _mergePipeline(std::move(mergePipeline)) {} StatusWith<ClusterQueryResult> RouterStageAggregationMerge::next() { // Pipeline::getNext will return a boost::optional<Document> or boost::none if EOF. diff --git a/src/mongo/s/query/router_stage_limit.cpp b/src/mongo/s/query/router_stage_limit.cpp index 03711279e07..feb8f11626f 100644 --- a/src/mongo/s/query/router_stage_limit.cpp +++ b/src/mongo/s/query/router_stage_limit.cpp @@ -34,8 +34,10 @@ namespace mongo { -RouterStageLimit::RouterStageLimit(std::unique_ptr<RouterExecStage> child, long long limit) - : RouterExecStage(std::move(child)), _limit(limit) { +RouterStageLimit::RouterStageLimit(OperationContext* opCtx, + std::unique_ptr<RouterExecStage> child, + long long limit) + : RouterExecStage(opCtx, std::move(child)), _limit(limit) { invariant(limit > 0); } diff --git a/src/mongo/s/query/router_stage_limit.h b/src/mongo/s/query/router_stage_limit.h index cb2fd708835..42ef46c21ab 100644 --- a/src/mongo/s/query/router_stage_limit.h +++ b/src/mongo/s/query/router_stage_limit.h @@ -37,7 +37,9 @@ namespace mongo { */ class RouterStageLimit final : public RouterExecStage { public: - RouterStageLimit(std::unique_ptr<RouterExecStage> child, long long limit); + RouterStageLimit(OperationContext* opCtx, + std::unique_ptr<RouterExecStage> child, + long long limit); StatusWith<ClusterQueryResult> next() final; diff --git a/src/mongo/s/query/router_stage_limit_test.cpp b/src/mongo/s/query/router_stage_limit_test.cpp index 61689e4cd6a..ffbacf1f17d 100644 --- a/src/mongo/s/query/router_stage_limit_test.cpp +++ b/src/mongo/s/query/router_stage_limit_test.cpp @@ -40,18 +40,17 @@ namespace mongo { namespace { -// Note: Though the next() method on RouterExecStage and its subclasses depend on an -// OperationContext* provided via a preceding call to reattachToOperationContext(), these stages are -// mocked in this test using RouterStageMock. RouterStageMock does not actually use the -// OperationContext, so we omit the call to rettachToOperationContext in these tests. +// These tests use router stages, which do not actually use their OperationContext, so rather than +// going through the trouble of making one, we'll just use nullptr throughout. +OperationContext* opCtx = nullptr; TEST(RouterStageLimitTest, LimitIsOne) { - auto mockStage = stdx::make_unique<RouterStageMock>(); + auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); mockStage->queueResult({BSON("a" << 1)}); mockStage->queueResult({BSON("a" << 2)}); mockStage->queueResult({BSON("a" << 3)}); - auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 1); + auto limitStage = stdx::make_unique<RouterStageLimit>(opCtx, std::move(mockStage), 1); auto firstResult = limitStage->next(); ASSERT_OK(firstResult.getStatus()); @@ -69,12 +68,12 @@ TEST(RouterStageLimitTest, LimitIsOne) { } TEST(RouterStageLimitTest, LimitIsTwo) { - auto mockStage = stdx::make_unique<RouterStageMock>(); + auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); mockStage->queueResult(BSON("a" << 1)); mockStage->queueResult(BSON("a" << 2)); mockStage->queueError(Status(ErrorCodes::BadValue, "bad thing happened")); - auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 2); + auto limitStage = stdx::make_unique<RouterStageLimit>(opCtx, std::move(mockStage), 2); auto firstResult = limitStage->next(); ASSERT_OK(firstResult.getStatus()); @@ -92,13 +91,13 @@ TEST(RouterStageLimitTest, LimitIsTwo) { } TEST(RouterStageLimitTest, LimitStagePropagatesError) { - auto mockStage = stdx::make_unique<RouterStageMock>(); + auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); mockStage->queueResult(BSON("a" << 1)); mockStage->queueError(Status(ErrorCodes::BadValue, "bad thing happened")); mockStage->queueResult(BSON("a" << 2)); mockStage->queueResult(BSON("a" << 3)); - auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 3); + auto limitStage = stdx::make_unique<RouterStageLimit>(opCtx, std::move(mockStage), 3); auto firstResult = limitStage->next(); ASSERT_OK(firstResult.getStatus()); @@ -115,13 +114,13 @@ TEST(RouterStageLimitTest, LimitStageToleratesMidStreamEOF) { // Here we're mocking the tailable case, where there may be a boost::none returned before the // remote cursor is closed. Our goal is to make sure that the limit stage handles this properly, // not counting boost::none towards the limit. - auto mockStage = stdx::make_unique<RouterStageMock>(); + auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); mockStage->queueResult(BSON("a" << 1)); mockStage->queueEOF(); mockStage->queueResult(BSON("a" << 2)); mockStage->queueResult(BSON("a" << 3)); - auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 2); + auto limitStage = stdx::make_unique<RouterStageLimit>(opCtx, std::move(mockStage), 2); auto firstResult = limitStage->next(); ASSERT_OK(firstResult.getStatus()); @@ -143,12 +142,12 @@ TEST(RouterStageLimitTest, LimitStageToleratesMidStreamEOF) { } TEST(RouterStageLimitTest, LimitStageRemotesExhausted) { - auto mockStage = stdx::make_unique<RouterStageMock>(); + auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); mockStage->queueResult(BSON("a" << 1)); mockStage->queueResult(BSON("a" << 2)); mockStage->markRemotesExhausted(); - auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 100); + auto limitStage = stdx::make_unique<RouterStageLimit>(opCtx, std::move(mockStage), 100); ASSERT_TRUE(limitStage->remotesExhausted()); auto firstResult = limitStage->next(); @@ -170,11 +169,11 @@ TEST(RouterStageLimitTest, LimitStageRemotesExhausted) { } TEST(RouterStageLimitTest, ForwardsAwaitDataTimeout) { - auto mockStage = stdx::make_unique<RouterStageMock>(); + auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); auto mockStagePtr = mockStage.get(); ASSERT_NOT_OK(mockStage->getAwaitDataTimeout().getStatus()); - auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 100); + auto limitStage = stdx::make_unique<RouterStageLimit>(opCtx, std::move(mockStage), 100); ASSERT_OK(limitStage->setAwaitDataTimeout(Milliseconds(789))); auto awaitDataTimeout = mockStagePtr->getAwaitDataTimeout(); diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp index 72fe7a06624..78ee1a3475a 100644 --- a/src/mongo/s/query/router_stage_merge.cpp +++ b/src/mongo/s/query/router_stage_merge.cpp @@ -36,13 +36,14 @@ namespace mongo { -RouterStageMerge::RouterStageMerge(executor::TaskExecutor* executor, +RouterStageMerge::RouterStageMerge(OperationContext* opCtx, + executor::TaskExecutor* executor, ClusterClientCursorParams* params) - : _executor(executor), _arm(executor, params) {} + : RouterExecStage(opCtx), _executor(executor), _arm(opCtx, executor, params) {} StatusWith<ClusterQueryResult> RouterStageMerge::next() { while (!_arm.ready()) { - auto nextEventStatus = _arm.nextEvent(getOpCtx()); + auto nextEventStatus = _arm.nextEvent(); if (!nextEventStatus.isOK()) { return nextEventStatus.getStatus(); } diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h index caae43877c6..23503c664f6 100644 --- a/src/mongo/s/query/router_stage_merge.h +++ b/src/mongo/s/query/router_stage_merge.h @@ -43,7 +43,9 @@ namespace mongo { */ class RouterStageMerge final : public RouterExecStage { public: - RouterStageMerge(executor::TaskExecutor* executor, ClusterClientCursorParams* params); + RouterStageMerge(OperationContext* opCtx, + executor::TaskExecutor* executor, + ClusterClientCursorParams* params); StatusWith<ClusterQueryResult> next() final; @@ -53,6 +55,15 @@ public: Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; +protected: + void doReattachToOperationContext() override { + _arm.reattachToOperationContext(getOpCtx()); + } + + virtual void doDetachFromOperationContext() { + _arm.detachFromOperationContext(); + } + private: // Not owned here. executor::TaskExecutor* _executor; diff --git a/src/mongo/s/query/router_stage_mock.h b/src/mongo/s/query/router_stage_mock.h index 18baaeacd74..e2f8e7adab5 100644 --- a/src/mongo/s/query/router_stage_mock.h +++ b/src/mongo/s/query/router_stage_mock.h @@ -42,6 +42,7 @@ namespace mongo { */ class RouterStageMock final : public RouterExecStage { public: + RouterStageMock(OperationContext* opCtx) : RouterExecStage(opCtx) {} ~RouterStageMock() final {} StatusWith<ClusterQueryResult> next() final; diff --git a/src/mongo/s/query/router_stage_remove_sortkey.cpp b/src/mongo/s/query/router_stage_remove_sortkey.cpp index fecf5440898..fe7a8cf0f7d 100644 --- a/src/mongo/s/query/router_stage_remove_sortkey.cpp +++ b/src/mongo/s/query/router_stage_remove_sortkey.cpp @@ -38,8 +38,9 @@ namespace mongo { -RouterStageRemoveSortKey::RouterStageRemoveSortKey(std::unique_ptr<RouterExecStage> child) - : RouterExecStage(std::move(child)) {} +RouterStageRemoveSortKey::RouterStageRemoveSortKey(OperationContext* opCtx, + std::unique_ptr<RouterExecStage> child) + : RouterExecStage(opCtx, std::move(child)) {} StatusWith<ClusterQueryResult> RouterStageRemoveSortKey::next() { auto childResult = getChildStage()->next(); diff --git a/src/mongo/s/query/router_stage_remove_sortkey.h b/src/mongo/s/query/router_stage_remove_sortkey.h index c2329bbc93d..ba71364dfa9 100644 --- a/src/mongo/s/query/router_stage_remove_sortkey.h +++ b/src/mongo/s/query/router_stage_remove_sortkey.h @@ -39,7 +39,7 @@ namespace mongo { */ class RouterStageRemoveSortKey final : public RouterExecStage { public: - RouterStageRemoveSortKey(std::unique_ptr<RouterExecStage> child); + RouterStageRemoveSortKey(OperationContext* opCtx, std::unique_ptr<RouterExecStage> child); StatusWith<ClusterQueryResult> next() final; diff --git a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp b/src/mongo/s/query/router_stage_remove_sortkey_test.cpp index 5db61b7b0a9..5767549ad64 100644 --- a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp +++ b/src/mongo/s/query/router_stage_remove_sortkey_test.cpp @@ -40,20 +40,19 @@ namespace mongo { namespace { -// Note: Though the next() method on RouterExecStage and its subclasses depend on an -// OperationContext* provided via a preceding call to reattachToOperationContext(), these stages are -// mocked in this test using RouterStageMock. RouterStageMock does not actually use the -// OperationContext, so we omit the call to rettachToOperationContext in these tests. +// These tests use router stages, which do not actually use their OperationContext, so rather than +// going through the trouble of making one, we'll just use nullptr throughout. +OperationContext* opCtx = nullptr; TEST(RouterStageRemoveSortKeyTest, RemovesSortKey) { - auto mockStage = stdx::make_unique<RouterStageMock>(); + auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); mockStage->queueResult(BSON("a" << 4 << "$sortKey" << 1 << "b" << 3)); mockStage->queueResult(BSON("$sortKey" << BSON("" << 3) << "c" << BSON("d" << "foo"))); mockStage->queueResult(BSON("a" << 3)); mockStage->queueResult(BSONObj()); - auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(std::move(mockStage)); + auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(mockStage)); auto firstResult = sortKeyStage->next(); ASSERT_OK(firstResult.getStatus()); @@ -83,11 +82,11 @@ TEST(RouterStageRemoveSortKeyTest, RemovesSortKey) { } TEST(RouterStageRemoveSortKeyTest, PropagatesError) { - auto mockStage = stdx::make_unique<RouterStageMock>(); + auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); mockStage->queueResult(BSON("$sortKey" << 1)); mockStage->queueError(Status(ErrorCodes::BadValue, "bad thing happened")); - auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(std::move(mockStage)); + auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(mockStage)); auto firstResult = sortKeyStage->next(); ASSERT_OK(firstResult.getStatus()); @@ -101,12 +100,12 @@ TEST(RouterStageRemoveSortKeyTest, PropagatesError) { } TEST(RouterStageRemoveSortKeyTest, ToleratesMidStreamEOF) { - auto mockStage = stdx::make_unique<RouterStageMock>(); + auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); mockStage->queueResult(BSON("a" << 1 << "$sortKey" << 1 << "b" << 1)); mockStage->queueEOF(); mockStage->queueResult(BSON("a" << 2 << "$sortKey" << 1 << "b" << 2)); - auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(std::move(mockStage)); + auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(mockStage)); auto firstResult = sortKeyStage->next(); ASSERT_OK(firstResult.getStatus()); @@ -128,12 +127,12 @@ TEST(RouterStageRemoveSortKeyTest, ToleratesMidStreamEOF) { } TEST(RouterStageRemoveSortKeyTest, RemotesExhausted) { - auto mockStage = stdx::make_unique<RouterStageMock>(); + auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); mockStage->queueResult(BSON("a" << 1 << "$sortKey" << 1 << "b" << 1)); mockStage->queueResult(BSON("a" << 2 << "$sortKey" << 1 << "b" << 2)); mockStage->markRemotesExhausted(); - auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(std::move(mockStage)); + auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(mockStage)); ASSERT_TRUE(sortKeyStage->remotesExhausted()); auto firstResult = sortKeyStage->next(); @@ -155,11 +154,11 @@ TEST(RouterStageRemoveSortKeyTest, RemotesExhausted) { } TEST(RouterStageRemoveSortKeyTest, ForwardsAwaitDataTimeout) { - auto mockStage = stdx::make_unique<RouterStageMock>(); + auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); auto mockStagePtr = mockStage.get(); ASSERT_NOT_OK(mockStage->getAwaitDataTimeout().getStatus()); - auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(std::move(mockStage)); + auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(mockStage)); ASSERT_OK(sortKeyStage->setAwaitDataTimeout(Milliseconds(789))); auto awaitDataTimeout = mockStagePtr->getAwaitDataTimeout(); diff --git a/src/mongo/s/query/router_stage_skip.cpp b/src/mongo/s/query/router_stage_skip.cpp index 7510e3aefd6..50d2107b14c 100644 --- a/src/mongo/s/query/router_stage_skip.cpp +++ b/src/mongo/s/query/router_stage_skip.cpp @@ -34,8 +34,10 @@ namespace mongo { -RouterStageSkip::RouterStageSkip(std::unique_ptr<RouterExecStage> child, long long skip) - : RouterExecStage(std::move(child)), _skip(skip) { +RouterStageSkip::RouterStageSkip(OperationContext* opCtx, + std::unique_ptr<RouterExecStage> child, + long long skip) + : RouterExecStage(opCtx, std::move(child)), _skip(skip) { invariant(skip > 0); } diff --git a/src/mongo/s/query/router_stage_skip.h b/src/mongo/s/query/router_stage_skip.h index c6dc1adda39..49051128577 100644 --- a/src/mongo/s/query/router_stage_skip.h +++ b/src/mongo/s/query/router_stage_skip.h @@ -37,7 +37,9 @@ namespace mongo { */ class RouterStageSkip final : public RouterExecStage { public: - RouterStageSkip(std::unique_ptr<RouterExecStage> child, long long skip); + RouterStageSkip(OperationContext* opCtx, + std::unique_ptr<RouterExecStage> child, + long long skip); StatusWith<ClusterQueryResult> next() final; diff --git a/src/mongo/s/query/router_stage_skip_test.cpp b/src/mongo/s/query/router_stage_skip_test.cpp index f1a58371b5c..1e2ca91dba5 100644 --- a/src/mongo/s/query/router_stage_skip_test.cpp +++ b/src/mongo/s/query/router_stage_skip_test.cpp @@ -40,18 +40,17 @@ namespace mongo { namespace { -// Note: Though the next() method on RouterExecStage and its subclasses depend on an -// OperationContext* provided via a preceding call to reattachToOperationContext(), these stages are -// mocked in this test using RouterStageMock. RouterStageMock does not actually use the -// OperationContext, so we omit the call to rettachToOperationContext in these tests. +// These tests use RouterStageMock, which does not actually use its OperationContext, so rather than +// going through the trouble of making one, we'll just use nullptr throughout. +OperationContext* opCtx = nullptr; TEST(RouterStageSkipTest, SkipIsOne) { - auto mockStage = stdx::make_unique<RouterStageMock>(); + auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); mockStage->queueResult(BSON("a" << 1)); mockStage->queueResult(BSON("a" << 2)); mockStage->queueResult(BSON("a" << 3)); - auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 1); + auto skipStage = stdx::make_unique<RouterStageSkip>(opCtx, std::move(mockStage), 1); auto firstResult = skipStage->next(); ASSERT_OK(firstResult.getStatus()); @@ -74,13 +73,13 @@ TEST(RouterStageSkipTest, SkipIsOne) { } TEST(RouterStageSkipTest, SkipIsThree) { - auto mockStage = stdx::make_unique<RouterStageMock>(); + auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); mockStage->queueResult(BSON("a" << 1)); mockStage->queueResult(BSON("a" << 2)); mockStage->queueResult(BSON("a" << 3)); mockStage->queueResult(BSON("a" << 4)); - auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 3); + auto skipStage = stdx::make_unique<RouterStageSkip>(opCtx, std::move(mockStage), 3); auto firstResult = skipStage->next(); ASSERT_OK(firstResult.getStatus()); @@ -93,12 +92,12 @@ TEST(RouterStageSkipTest, SkipIsThree) { } TEST(RouterStageSkipTest, SkipEqualToResultSetSize) { - auto mockStage = stdx::make_unique<RouterStageMock>(); + auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); mockStage->queueResult(BSON("a" << 1)); mockStage->queueResult(BSON("a" << 2)); mockStage->queueResult(BSON("a" << 3)); - auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 3); + auto skipStage = stdx::make_unique<RouterStageSkip>(opCtx, std::move(mockStage), 3); auto firstResult = skipStage->next(); ASSERT_OK(firstResult.getStatus()); @@ -106,12 +105,12 @@ TEST(RouterStageSkipTest, SkipEqualToResultSetSize) { } TEST(RouterStageSkipTest, SkipExceedsResultSetSize) { - auto mockStage = stdx::make_unique<RouterStageMock>(); + auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); mockStage->queueResult(BSON("a" << 1)); mockStage->queueResult(BSON("a" << 2)); mockStage->queueResult(BSON("a" << 3)); - auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 100); + auto skipStage = stdx::make_unique<RouterStageSkip>(opCtx, std::move(mockStage), 100); auto firstResult = skipStage->next(); ASSERT_OK(firstResult.getStatus()); @@ -119,13 +118,13 @@ TEST(RouterStageSkipTest, SkipExceedsResultSetSize) { } TEST(RouterStageSkipTest, ErrorWhileSkippingResults) { - auto mockStage = stdx::make_unique<RouterStageMock>(); + auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); mockStage->queueResult(BSON("a" << 1)); mockStage->queueError(Status(ErrorCodes::BadValue, "bad thing happened")); mockStage->queueResult(BSON("a" << 2)); mockStage->queueResult(BSON("a" << 3)); - auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 2); + auto skipStage = stdx::make_unique<RouterStageSkip>(opCtx, std::move(mockStage), 2); auto firstResult = skipStage->next(); ASSERT_NOT_OK(firstResult.getStatus()); @@ -134,13 +133,13 @@ TEST(RouterStageSkipTest, ErrorWhileSkippingResults) { } TEST(RouterStageSkipTest, ErrorAfterSkippingResults) { - auto mockStage = stdx::make_unique<RouterStageMock>(); + auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); mockStage->queueResult(BSON("a" << 1)); mockStage->queueResult(BSON("a" << 2)); mockStage->queueResult(BSON("a" << 3)); mockStage->queueError(Status(ErrorCodes::BadValue, "bad thing happened")); - auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 2); + auto skipStage = stdx::make_unique<RouterStageSkip>(opCtx, std::move(mockStage), 2); auto firstResult = skipStage->next(); ASSERT_OK(firstResult.getStatus()); @@ -155,13 +154,13 @@ TEST(RouterStageSkipTest, ErrorAfterSkippingResults) { TEST(RouterStageSkipTest, SkipStageToleratesMidStreamEOF) { // Skip stage must propagate a boost::none, but not count it towards the skip value. - auto mockStage = stdx::make_unique<RouterStageMock>(); + auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); mockStage->queueResult(BSON("a" << 1)); mockStage->queueEOF(); mockStage->queueResult(BSON("a" << 2)); mockStage->queueResult(BSON("a" << 3)); - auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 2); + auto skipStage = stdx::make_unique<RouterStageSkip>(opCtx, std::move(mockStage), 2); auto firstResult = skipStage->next(); ASSERT_OK(firstResult.getStatus()); @@ -178,13 +177,13 @@ TEST(RouterStageSkipTest, SkipStageToleratesMidStreamEOF) { } TEST(RouterStageSkipTest, SkipStageRemotesExhausted) { - auto mockStage = stdx::make_unique<RouterStageMock>(); + auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); mockStage->queueResult(BSON("a" << 1)); mockStage->queueResult(BSON("a" << 2)); mockStage->queueResult(BSON("a" << 3)); mockStage->markRemotesExhausted(); - auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 1); + auto skipStage = stdx::make_unique<RouterStageSkip>(opCtx, std::move(mockStage), 1); ASSERT_TRUE(skipStage->remotesExhausted()); auto firstResult = skipStage->next(); @@ -206,11 +205,11 @@ TEST(RouterStageSkipTest, SkipStageRemotesExhausted) { } TEST(RouterStageSkipTest, ForwardsAwaitDataTimeout) { - auto mockStage = stdx::make_unique<RouterStageMock>(); + auto mockStage = stdx::make_unique<RouterStageMock>(opCtx); auto mockStagePtr = mockStage.get(); ASSERT_NOT_OK(mockStage->getAwaitDataTimeout().getStatus()); - auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 3); + auto skipStage = stdx::make_unique<RouterStageSkip>(opCtx, std::move(mockStage), 3); ASSERT_OK(skipStage->setAwaitDataTimeout(Milliseconds(789))); auto awaitDataTimeout = mockStagePtr->getAwaitDataTimeout(); diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp index a2f912971d4..dce282c5892 100644 --- a/src/mongo/s/query/store_possible_cursor.cpp +++ b/src/mongo/s/query/store_possible_cursor.cpp @@ -70,6 +70,9 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, auto ccc = ClusterClientCursorImpl::make(opCtx, executor, std::move(params)); + // We don't expect to use this cursor until a subsequent getMore, so detach from the current + // OperationContext until then. + ccc->detachFromOperationContext(); auto clusterCursorId = cursorManager->registerCursor(opCtx, ccc.releaseCursor(), |