From 31e5c31a79b2043d0fa4288c0435fdfce4348343 Mon Sep 17 00:00:00 2001 From: Esha Maharishi Date: Thu, 9 Feb 2017 15:08:31 -0500 Subject: SERVER-27965 thread OperationContext* down through ClusterClientCursor's next() and kill() methods --- src/mongo/s/query/async_results_merger.cpp | 46 +++--- src/mongo/s/query/async_results_merger.h | 16 +- src/mongo/s/query/async_results_merger_test.cpp | 168 ++++++++++----------- src/mongo/s/query/cluster_client_cursor.h | 11 +- src/mongo/s/query/cluster_client_cursor_impl.cpp | 24 ++- src/mongo/s/query/cluster_client_cursor_impl.h | 12 +- .../s/query/cluster_client_cursor_impl_test.cpp | 26 ++-- src/mongo/s/query/cluster_client_cursor_mock.cpp | 9 +- src/mongo/s/query/cluster_client_cursor_mock.h | 6 +- src/mongo/s/query/cluster_client_cursor_params.h | 4 - src/mongo/s/query/cluster_cursor_manager.cpp | 23 ++- src/mongo/s/query/cluster_cursor_manager.h | 13 +- src/mongo/s/query/cluster_cursor_manager_test.cpp | 162 +++++++++++++------- src/mongo/s/query/cluster_find.cpp | 9 +- src/mongo/s/query/router_exec_stage.h | 11 +- src/mongo/s/query/router_stage_limit.cpp | 12 +- src/mongo/s/query/router_stage_limit.h | 6 +- src/mongo/s/query/router_stage_limit_test.cpp | 36 +++-- src/mongo/s/query/router_stage_merge.cpp | 12 +- src/mongo/s/query/router_stage_merge.h | 6 +- src/mongo/s/query/router_stage_mock.cpp | 8 +- src/mongo/s/query/router_stage_mock.h | 6 +- src/mongo/s/query/router_stage_remove_sortkey.cpp | 12 +- src/mongo/s/query/router_stage_remove_sortkey.h | 6 +- .../s/query/router_stage_remove_sortkey_test.cpp | 32 ++-- src/mongo/s/query/router_stage_skip.cpp | 14 +- src/mongo/s/query/router_stage_skip.h | 6 +- src/mongo/s/query/router_stage_skip_test.cpp | 40 ++--- src/mongo/s/query/store_possible_cursor.cpp | 8 +- src/mongo/s/query/store_possible_cursor.h | 4 +- src/mongo/s/query/store_possible_cursor_test.cpp | 9 +- 31 files changed, 374 insertions(+), 383 deletions(-) (limited to 'src/mongo/s/query') diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 95223473d5e..1db4a63c79a 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -114,11 +114,6 @@ Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { return Status::OK(); } -void AsyncResultsMerger::setOperationContext(OperationContext* txn) { - stdx::lock_guard lk(_mutex); - _params.txn = txn; -} - bool AsyncResultsMerger::ready() { stdx::lock_guard lk(_mutex); return ready_inlock(); @@ -258,7 +253,7 @@ ClusterQueryResult AsyncResultsMerger::nextReadyUnsorted() { return {}; } -Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) { +Status AsyncResultsMerger::askForNextBatch_inlock(OperationContext* txn, size_t remoteIndex) { auto& remote = _remotes[remoteIndex]; invariant(!remote.cbHandle.isValid()); @@ -295,16 +290,16 @@ Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) { cmdObj = *remote.initialCmdObj; } - executor::RemoteCommandRequest request(remote.getTargetHost(), - _params.nsString.db().toString(), - cmdObj, - _metadataObj, - _params.txn); + executor::RemoteCommandRequest request( + remote.getTargetHost(), _params.nsString.db().toString(), cmdObj, _metadataObj, txn); - auto callbackStatus = _executor->scheduleRemoteCommand( - request, - stdx::bind( - &AsyncResultsMerger::handleBatchResponse, this, stdx::placeholders::_1, remoteIndex)); + auto callbackStatus = + _executor->scheduleRemoteCommand(request, + stdx::bind(&AsyncResultsMerger::handleBatchResponse, + this, + stdx::placeholders::_1, + txn, + remoteIndex)); if (!callbackStatus.isOK()) { return callbackStatus.getStatus(); } @@ -321,7 +316,8 @@ Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) { * 2. Remotes that already has some result will have a non-empty buffer. * 3. Remotes that reached maximum retries will be in 'exhausted' state. */ -StatusWith AsyncResultsMerger::nextEvent() { +StatusWith AsyncResultsMerger::nextEvent( + OperationContext* txn) { stdx::lock_guard lk(_mutex); if (_lifecycleState != kAlive) { @@ -349,7 +345,7 @@ StatusWith AsyncResultsMerger::nextEvent() // If we already have established a cursor with this remote, and there is no outstanding // request for which we have a valid callback handle, then schedule work to retrieve the // next batch. - auto nextBatchStatus = askForNextBatch_inlock(i); + auto nextBatchStatus = askForNextBatch_inlock(txn, i); if (!nextBatchStatus.isOK()) { return nextBatchStatus; } @@ -394,7 +390,9 @@ StatusWith AsyncResultsMerger::parseCursorResponse(const BSONObj } void AsyncResultsMerger::handleBatchResponse( - const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, size_t remoteIndex) { + const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, + OperationContext* txn, + size_t remoteIndex) { stdx::lock_guard lk(_mutex); auto& remote = _remotes[remoteIndex]; @@ -428,7 +426,7 @@ void AsyncResultsMerger::handleBatchResponse( // If the event handle is invalid, then the executor is in the middle of shutting down, // and we can't schedule any more work for it to complete. if (_killCursorsScheduledEvent.isValid()) { - scheduleKillCursors_inlock(); + scheduleKillCursors_inlock(txn); _executor->signalEvent(_killCursorsScheduledEvent); } @@ -583,7 +581,7 @@ void AsyncResultsMerger::handleBatchResponse( // We do not ask for the next batch if the cursor is tailable, as batches received from remote // tailable cursors should be passed through to the client without asking for more batches. if (!_params.isTailable && !remote.hasNext() && !remote.exhausted()) { - remote.status = askForNextBatch_inlock(remoteIndex); + remote.status = askForNextBatch_inlock(txn, remoteIndex); if (!remote.status.isOK()) { return; } @@ -614,7 +612,7 @@ bool AsyncResultsMerger::haveOutstandingBatchRequests_inlock() { return false; } -void AsyncResultsMerger::scheduleKillCursors_inlock() { +void AsyncResultsMerger::scheduleKillCursors_inlock(OperationContext* txn) { invariant(_lifecycleState == kKillStarted); invariant(_killCursorsScheduledEvent.isValid()); @@ -625,7 +623,7 @@ void AsyncResultsMerger::scheduleKillCursors_inlock() { BSONObj cmdObj = KillCursorsRequest(_params.nsString, {*remote.cursorId}).toBSON(); executor::RemoteCommandRequest request( - remote.getTargetHost(), _params.nsString.db().toString(), cmdObj, _params.txn); + remote.getTargetHost(), _params.nsString.db().toString(), cmdObj, txn); _executor->scheduleRemoteCommand( request, @@ -639,7 +637,7 @@ void AsyncResultsMerger::handleKillCursorsResponse( // We just ignore any killCursors command responses. } -executor::TaskExecutor::EventHandle AsyncResultsMerger::kill() { +executor::TaskExecutor::EventHandle AsyncResultsMerger::kill(OperationContext* txn) { stdx::lock_guard lk(_mutex); if (_killCursorsScheduledEvent.isValid()) { invariant(_lifecycleState != kAlive); @@ -665,7 +663,7 @@ executor::TaskExecutor::EventHandle AsyncResultsMerger::kill() { // remotes now. Otherwise, we have to wait until all responses are back, and then we can kill // the remote cursors. if (!haveOutstandingBatchRequests_inlock()) { - scheduleKillCursors_inlock(); + scheduleKillCursors_inlock(txn); _lifecycleState = kKillComplete; _executor->signalEvent(_killCursorsScheduledEvent); } diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h index fb3cd3262c5..1bee485e09e 100644 --- a/src/mongo/s/query/async_results_merger.h +++ b/src/mongo/s/query/async_results_merger.h @@ -101,13 +101,6 @@ public: */ Status setAwaitDataTimeout(Milliseconds awaitDataTimeout); - /** - * Update the operation context for remote requests. - * - * Network requests depend on having a valid operation context for user initiated actions. - */ - void setOperationContext(OperationContext* txn); - /** * Returns true if there is no need to schedule remote work in order to take the next action. * This means that either @@ -161,7 +154,7 @@ public: * the caller should call nextEvent() to retry the request on the hosts that errored. If * ready() is true, then either the error was not retriable or it has exhausted max retries. */ - StatusWith nextEvent(); + StatusWith nextEvent(OperationContext* txn); /** * Starts shutting down this ARM. Returns a handle to an event which is signaled when this @@ -176,7 +169,7 @@ public: * * May be called multiple times (idempotent). */ - executor::TaskExecutor::EventHandle kill(); + executor::TaskExecutor::EventHandle kill(OperationContext* txn); private: /** @@ -298,7 +291,7 @@ private: * * Returns success if the command to retrieve the next batch was scheduled successfully. */ - Status askForNextBatch_inlock(size_t remoteIndex); + Status askForNextBatch_inlock(OperationContext* txn, size_t remoteIndex); /** * Checks whether or not the remote cursors are all exhausted. @@ -329,6 +322,7 @@ private: * buffered. */ void handleBatchResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, + OperationContext* txn, size_t remoteIndex); /** @@ -348,7 +342,7 @@ private: /** * Schedules a killCursors command to be run on all remote hosts that have open cursors. */ - void scheduleKillCursors_inlock(); + void scheduleKillCursors_inlock(OperationContext* txn); // Not owned here. executor::TaskExecutor* _executor; diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index 94a6acf31ec..d9bd5c058ac 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -228,7 +228,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFind) { makeCursorFromFindCmd(findCmd, kTestShardIds); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); ASSERT_FALSE(arm->remotesExhausted()); @@ -276,7 +276,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) { makeCursorFromFindCmd(findCmd, kTestShardIds); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); std::vector responses; @@ -304,7 +304,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) { ASSERT_BSONOBJ_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent()); + readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); ASSERT_FALSE(arm->remotesExhausted()); @@ -330,7 +330,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) { ASSERT_BSONOBJ_EQ(fromjson("{_id: 9}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent()); + readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); ASSERT_FALSE(arm->remotesExhausted()); @@ -353,7 +353,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindSorted) { makeCursorFromFindCmd(findCmd, kTestShardIds); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); std::vector responses; @@ -396,7 +396,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMoreSorted) { makeCursorFromFindCmd(findCmd, kTestShardIds); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); std::vector responses; @@ -426,7 +426,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMoreSorted) { *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent()); + readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); responses.clear(); @@ -448,7 +448,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMoreSorted) { *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent()); + readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); responses.clear(); @@ -477,7 +477,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindCompoundSortKey) { makeCursorFromFindCmd(findCmd, kTestShardIds); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); std::vector responses; @@ -520,7 +520,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindSortedButNoSortKey) { makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); // Parsing the batch results in an error because the sort key is missing. @@ -536,7 +536,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindSortedButNoSortKey) { ASSERT_EQ(statusWithNext.getStatus().code(), ErrorCodes::InternalError); // Required to kill the 'arm' on error before destruction. - auto killEvent = arm->kill(); + auto killEvent = arm->kill(nullptr); executor()->waitForEvent(killEvent); } @@ -549,7 +549,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindInitialBatchSizeIsZero) { makeCursorFromFindCmd(findCmd, {kTestShardIds[0], kTestShardIds[1]}, getMoreBatchSize); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); // Both shards give back empty responses. Second shard doesn't have any results so it @@ -574,7 +574,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindInitialBatchSizeIsZero) { ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent()); + readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); // The shard responds with another empty batch but leaves the cursor open. It probably shouldn't @@ -604,7 +604,7 @@ TEST_F(AsyncResultsMergerTest, ReceivedViewDefinitionFromShard) { makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); std::string inputNs = "views_sharded.coll"; @@ -634,7 +634,7 @@ TEST_F(AsyncResultsMergerTest, ExistingCursors) { makeCursorFromExistingCursors({{kTestShardHosts[0], 5}, {kTestShardHosts[1], 6}}); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); std::vector responses; @@ -664,7 +664,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { makeCursorFromFindCmd(findCmd, {kTestShardIds[0], kTestShardIds[0]}); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); // Both shards respond with the first batch. @@ -686,7 +686,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { ASSERT_BSONOBJ_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent()); + readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); // When we ask the shards for their next batch, the first shard responds and the second shard @@ -705,7 +705,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { ASSERT_BSONOBJ_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent()); + readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); // We can continue to return results from first shard, while second shard remains unresponsive. @@ -724,7 +724,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { // Kill cursor before deleting it, as the second remote cursor has not been exhausted. We don't // wait on 'killEvent' here, as the blackholed request's callback will only run on shutdown of // the network interface. - auto killEvent = arm->kill(); + auto killEvent = arm->kill(nullptr); ASSERT_TRUE(killEvent.isValid()); } @@ -733,7 +733,7 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) { makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); std::vector responses; @@ -751,7 +751,7 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) { ASSERT_BSONOBJ_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent()); + readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); responses.clear(); @@ -766,7 +766,7 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) { ASSERT(!arm->nextReady().isOK()); // Required to kill the 'arm' on error before destruction. - auto killEvent = arm->kill(); + auto killEvent = arm->kill(nullptr); executor()->waitForEvent(killEvent); } @@ -775,7 +775,7 @@ TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) { makeCursorFromFindCmd(findCmd, kTestShardIds); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); std::vector batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; @@ -793,7 +793,7 @@ TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) { ASSERT(!statusWithNext.isOK()); // Required to kill the 'arm' on error before destruction. - auto killEvent = arm->kill(); + auto killEvent = arm->kill(nullptr); executor()->waitForEvent(killEvent); } @@ -802,7 +802,7 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) { makeCursorFromFindCmd(findCmd, kTestShardIds); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); std::vector responses; @@ -822,7 +822,7 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) { ASSERT_EQ(statusWithNext.getStatus().reason(), "bad thing happened"); // Required to kill the 'arm' on error before destruction. - auto killEvent = arm->kill(); + auto killEvent = arm->kill(nullptr); executor()->waitForEvent(killEvent); } @@ -831,10 +831,10 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) { makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); // Error to call nextEvent() before the previous event is signaled. - ASSERT_NOT_OK(arm->nextEvent().getStatus()); + ASSERT_NOT_OK(arm->nextEvent(nullptr).getStatus()); std::vector responses; std::vector batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; @@ -850,7 +850,7 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) { ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); // Required to kill the 'arm' on error before destruction. - auto killEvent = arm->kill(); + auto killEvent = arm->kill(nullptr); executor()->waitForEvent(killEvent); } @@ -858,8 +858,8 @@ TEST_F(AsyncResultsMergerTest, NextEventAfterTaskExecutorShutdown) { BSONObj findCmd = fromjson("{find: 'testcoll'}"); makeCursorFromFindCmd(findCmd, kTestShardIds); executor()->shutdown(); - ASSERT_EQ(ErrorCodes::ShutdownInProgress, arm->nextEvent().getStatus()); - auto killEvent = arm->kill(); + ASSERT_EQ(ErrorCodes::ShutdownInProgress, arm->nextEvent(nullptr).getStatus()); + auto killEvent = arm->kill(nullptr); ASSERT_FALSE(killEvent.isValid()); } @@ -869,13 +869,13 @@ TEST_F(AsyncResultsMergerTest, KillAfterTaskExecutorShutdownWithOutstandingBatch // Make a request to the shard that will never get answered. ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); blackHoleNextRequest(); // Executor shuts down before a response is received. executor()->shutdown(); - auto killEvent = arm->kill(); + auto killEvent = arm->kill(nullptr); ASSERT_FALSE(killEvent.isValid()); } @@ -884,7 +884,7 @@ TEST_F(AsyncResultsMergerTest, KillNoBatchesRequested) { makeCursorFromFindCmd(findCmd, kTestShardIds); ASSERT_FALSE(arm->ready()); - auto killedEvent = arm->kill(); + auto killedEvent = arm->kill(nullptr); // Killed cursors are considered ready, but return an error when you try to receive the next // doc. @@ -899,7 +899,7 @@ TEST_F(AsyncResultsMergerTest, KillAllBatchesReceived) { makeCursorFromFindCmd(findCmd, kTestShardIds); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); std::vector responses; @@ -912,7 +912,7 @@ TEST_F(AsyncResultsMergerTest, KillAllBatchesReceived) { scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); // Kill should be able to return right away if there are no pending batches. - auto killedEvent = arm->kill(); + auto killedEvent = arm->kill(nullptr); ASSERT_TRUE(arm->ready()); ASSERT_NOT_OK(arm->nextReady().getStatus()); executor()->waitForEvent(killedEvent); @@ -923,7 +923,7 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) { makeCursorFromFindCmd(findCmd, kTestShardIds); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); std::vector responses; @@ -932,7 +932,7 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) { scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); // Kill event will only be signalled once the pending batches are received. - auto killedEvent = arm->kill(); + auto killedEvent = arm->kill(nullptr); // After the kill, the ARM waits for outstanding batches to come back. This ensures that we // receive cursor ids for any established remote cursors, and can clean them up by issuing @@ -963,7 +963,7 @@ TEST_F(AsyncResultsMergerTest, KillOutstandingGetMore) { makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); std::vector responses; @@ -979,10 +979,10 @@ TEST_F(AsyncResultsMergerTest, KillOutstandingGetMore) { // This will schedule a getMore on cursor id 123. ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent()); + readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); - auto killedEvent = arm->kill(); + auto killedEvent = arm->kill(nullptr); // The kill can't complete until the getMore response is received. responses.clear(); @@ -1010,7 +1010,7 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) { makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); std::vector responses; @@ -1018,10 +1018,10 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) { responses.emplace_back(_nss, CursorId(1), batch1); scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); - auto killedEvent = arm->kill(); + auto killedEvent = arm->kill(nullptr); // Attempting to schedule more network operations on a killed arm is an error. - ASSERT_NOT_OK(arm->nextEvent().getStatus()); + ASSERT_NOT_OK(arm->nextEvent(nullptr).getStatus()); executor()->waitForEvent(killedEvent); } @@ -1029,9 +1029,9 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) { TEST_F(AsyncResultsMergerTest, KillCalledTwice) { BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}"); makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); - auto killedEvent1 = arm->kill(); + auto killedEvent1 = arm->kill(nullptr); ASSERT(killedEvent1.isValid()); - auto killedEvent2 = arm->kill(); + auto killedEvent2 = arm->kill(nullptr); ASSERT(killedEvent2.isValid()); executor()->waitForEvent(killedEvent1); executor()->waitForEvent(killedEvent2); @@ -1042,7 +1042,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) { makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); std::vector responses; @@ -1062,7 +1062,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) { ASSERT_FALSE(arm->remotesExhausted()); ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent()); + readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); responses.clear(); @@ -1079,7 +1079,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) { ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); ASSERT_FALSE(arm->remotesExhausted()); - auto killedEvent = arm->kill(); + auto killedEvent = arm->kill(nullptr); executor()->waitForEvent(killedEvent); } @@ -1088,7 +1088,7 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) { makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); // Remote responds with an empty batch and a non-zero cursor id. @@ -1104,7 +1104,7 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) { ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); ASSERT_FALSE(arm->remotesExhausted()); - auto killedEvent = arm->kill(); + auto killedEvent = arm->kill(nullptr); executor()->waitForEvent(killedEvent); } @@ -1113,7 +1113,7 @@ TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) { makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); // Remote responds with an empty batch and a zero cursor id. @@ -1135,7 +1135,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) { makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); std::vector responses; @@ -1155,7 +1155,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) { std::vector batch2 = {fromjson("{_id: 3}")}; responses.emplace_back(_nss, CursorId(0), batch2); - readyEvent = unittest::assertGet(arm->nextEvent()); + readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); BSONObj scheduledCmd = getFirstPendingRequest().cmdObj; auto request = GetMoreRequest::parseFromBSON("anydbname", scheduledCmd); @@ -1178,7 +1178,7 @@ TEST_F(AsyncResultsMergerTest, SendsSecondaryOkAsMetadata) { findCmd, {kTestShardIds[0]}, boost::none, ReadPreferenceSetting(ReadPreference::Nearest)); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); BSONObj cmdRequestMetadata = getFirstPendingRequest().metadata; @@ -1201,7 +1201,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) { makeCursorFromFindCmd(findCmd, kTestShardIds); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); // An unretriable error occurs with the first host. @@ -1224,7 +1224,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) { ASSERT_BSONOBJ_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent()); + readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); // Now the second host becomes unreachable. We should still be willing to return results from @@ -1243,7 +1243,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) { ASSERT_BSONOBJ_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent()); + readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); // Once the last reachable shard indicates that its cursor is closed, we're done. @@ -1263,7 +1263,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) { makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); std::vector responses; @@ -1278,7 +1278,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) { ASSERT_BSONOBJ_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult()); ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent()); + readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); // The lone host involved in this query returns an error. This should simply cause us to return @@ -1295,19 +1295,19 @@ TEST_F(AsyncResultsMergerTest, RetryOnNotMasterNoSlaveOkSingleNode) { ASSERT_FALSE(arm->ready()); // First and second attempts return an error. - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"}); executor()->waitForEvent(readyEvent); ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent()); + readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"}); executor()->waitForEvent(readyEvent); ASSERT_FALSE(arm->ready()); // Third attempt succeeds. - readyEvent = unittest::assertGet(arm->nextEvent()); + readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); std::vector responses; std::vector batch = {fromjson("{_id: 1}")}; responses.emplace_back(_nss, CursorId(0), batch); @@ -1326,7 +1326,7 @@ TEST_F(AsyncResultsMergerTest, RetryOnNotMasterNoSlaveOkAllFailSingleNode) { makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); // All attempts return an error (one attempt plus three retries) @@ -1334,19 +1334,19 @@ TEST_F(AsyncResultsMergerTest, RetryOnNotMasterNoSlaveOkAllFailSingleNode) { executor()->waitForEvent(readyEvent); ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent()); + readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"}); executor()->waitForEvent(readyEvent); ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent()); + readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"}); executor()->waitForEvent(readyEvent); ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent()); + readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"}); executor()->waitForEvent(readyEvent); @@ -1356,7 +1356,7 @@ TEST_F(AsyncResultsMergerTest, RetryOnNotMasterNoSlaveOkAllFailSingleNode) { ASSERT_EQ(status.getStatus().code(), ErrorCodes::NotMasterNoSlaveOk); // Protocol is to kill the 'arm' on error before destruction. - auto killEvent = arm->kill(); + auto killEvent = arm->kill(nullptr); executor()->waitForEvent(killEvent); } @@ -1365,7 +1365,7 @@ TEST_F(AsyncResultsMergerTest, RetryOnHostUnreachableAllowPartialResults) { makeCursorFromFindCmd(findCmd, {kTestShardIds[0], kTestShardIds[1]}); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); // First host returns single result @@ -1379,23 +1379,23 @@ TEST_F(AsyncResultsMergerTest, RetryOnHostUnreachableAllowPartialResults) { executor()->waitForEvent(readyEvent); ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent()); + readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"}); executor()->waitForEvent(readyEvent); ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent()); + readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"}); executor()->waitForEvent(readyEvent); ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent()); + readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"}); executor()->waitForEvent(readyEvent); - readyEvent = unittest::assertGet(arm->nextEvent()); + readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -1410,7 +1410,7 @@ TEST_F(AsyncResultsMergerTest, ErrorAtFirstAttemptAtSameTimeShouldEventuallyRetu makeCursorFromFindCmd(findCmd, {kTestShardIds[0], kTestShardIds[1]}); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); // Both hosts return an error which indicates that the request should be retried. @@ -1420,7 +1420,7 @@ TEST_F(AsyncResultsMergerTest, ErrorAtFirstAttemptAtSameTimeShouldEventuallyRetu executor()->waitForEvent(readyEvent); ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent()); + readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); @@ -1458,7 +1458,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) { makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); std::vector responses; @@ -1475,7 +1475,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) { ASSERT_OK(arm->setAwaitDataTimeout(Milliseconds(789))); ASSERT_FALSE(arm->ready()); - readyEvent = unittest::assertGet(arm->nextEvent()); + readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); ASSERT_FALSE(arm->ready()); // Pending getMore request should include maxTimeMS. @@ -1502,7 +1502,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) { BSONObj findCmd = fromjson("{find: 'testcoll'}"); makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789))); - auto killEvent = arm->kill(); + auto killEvent = arm->kill(nullptr); executor()->waitForEvent(killEvent); } @@ -1510,7 +1510,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutAwaitDataCantHaveMaxTime) { BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}"); makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789))); - auto killEvent = arm->kill(); + auto killEvent = arm->kill(nullptr); executor()->waitForEvent(killEvent); } @@ -1519,13 +1519,13 @@ TEST_F(AsyncResultsMergerTest, ShardCanErrorInBetweenReadyAndNextEvent) { makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); ASSERT_FALSE(arm->ready()); - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); scheduleErrorResponse({ErrorCodes::BadValue, "bad thing happened"}); - ASSERT_EQ(ErrorCodes::BadValue, arm->nextEvent().getStatus()); + ASSERT_EQ(ErrorCodes::BadValue, arm->nextEvent(nullptr).getStatus()); // Required to kill the 'arm' on error before destruction. - auto killEvent = arm->kill(); + auto killEvent = arm->kill(nullptr); executor()->waitForEvent(killEvent); } @@ -1536,11 +1536,11 @@ TEST_F(AsyncResultsMergerTest, RetryWhenShardHasRetriableErrorInBetweenReadyAndN ASSERT_FALSE(arm->ready()); // First attempt returns a retriable error. - auto readyEvent = unittest::assertGet(arm->nextEvent()); + auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"}); // We expect to be able to retrieve another event, and be waiting on the retry to succeed. - readyEvent = unittest::assertGet(arm->nextEvent()); + readyEvent = unittest::assertGet(arm->nextEvent(nullptr)); std::vector responses; std::vector batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; responses.emplace_back(_nss, CursorId(0), batch); diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h index c4bb1c9373e..46b92eab642 100644 --- a/src/mongo/s/query/cluster_client_cursor.h +++ b/src/mongo/s/query/cluster_client_cursor.h @@ -64,7 +64,7 @@ public: * * A non-ok status is returned in case of any error. */ - virtual StatusWith next() = 0; + virtual StatusWith next(OperationContext* txn) = 0; /** * Must be called before destruction to abandon a not-yet-exhausted cursor. If next() has @@ -72,7 +72,7 @@ public: * * May block waiting for responses from remote hosts. */ - virtual void kill() = 0; + virtual void kill(OperationContext* txn) = 0; /** * Returns whether or not this cursor is tailing a capped collection on a shard. @@ -108,13 +108,6 @@ public: * the cursor is not tailable + awaitData). */ virtual Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) = 0; - - /** - * Update the operation context for remote requests. - * - * Network requests depend on having a valid operation context for user initiated actions. - */ - virtual void setOperationContext(OperationContext* txn) = 0; }; } // namespace mongo diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp index 1498164f9ac..6d0a81be161 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp @@ -41,12 +41,13 @@ namespace mongo { -ClusterClientCursorGuard::ClusterClientCursorGuard(std::unique_ptr ccc) - : _ccc(std::move(ccc)) {} +ClusterClientCursorGuard::ClusterClientCursorGuard(OperationContext* txn, + std::unique_ptr ccc) + : _txn(txn), _ccc(std::move(ccc)) {} ClusterClientCursorGuard::~ClusterClientCursorGuard() { if (_ccc && !_ccc->remotesExhausted()) { - _ccc->kill(); + _ccc->kill(_txn); } } @@ -58,11 +59,12 @@ std::unique_ptr ClusterClientCursorGuard::releaseCursor() { return std::move(_ccc); } -ClusterClientCursorGuard ClusterClientCursorImpl::make(executor::TaskExecutor* executor, +ClusterClientCursorGuard ClusterClientCursorImpl::make(OperationContext* txn, + executor::TaskExecutor* executor, ClusterClientCursorParams&& params) { std::unique_ptr cursor( new ClusterClientCursorImpl(executor, std::move(params))); - return ClusterClientCursorGuard(std::move(cursor)); + return ClusterClientCursorGuard(txn, std::move(cursor)); } ClusterClientCursorImpl::ClusterClientCursorImpl(executor::TaskExecutor* executor, @@ -72,7 +74,7 @@ ClusterClientCursorImpl::ClusterClientCursorImpl(executor::TaskExecutor* executo ClusterClientCursorImpl::ClusterClientCursorImpl(std::unique_ptr root) : _root(std::move(root)) {} -StatusWith ClusterClientCursorImpl::next() { +StatusWith ClusterClientCursorImpl::next(OperationContext* txn) { // First return stashed results, if there are any. if (!_stash.empty()) { auto front = std::move(_stash.front()); @@ -81,15 +83,15 @@ StatusWith ClusterClientCursorImpl::next() { return {front}; } - auto next = _root->next(); + auto next = _root->next(txn); if (next.isOK() && !next.getValue().isEOF()) { ++_numReturnedSoFar; } return next; } -void ClusterClientCursorImpl::kill() { - _root->kill(); +void ClusterClientCursorImpl::kill(OperationContext* txn) { + _root->kill(txn); } bool ClusterClientCursorImpl::isTailable() const { @@ -122,10 +124,6 @@ Status ClusterClientCursorImpl::setAwaitDataTimeout(Milliseconds awaitDataTimeou return _root->setAwaitDataTimeout(awaitDataTimeout); } -void ClusterClientCursorImpl::setOperationContext(OperationContext* txn) { - return _root->setOperationContext(txn); -} - std::unique_ptr ClusterClientCursorImpl::buildMergerPlan( executor::TaskExecutor* executor, ClusterClientCursorParams&& params) { const auto skip = params.skip; diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h index 71f018deec7..1974344abf5 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.h +++ b/src/mongo/s/query/cluster_client_cursor_impl.h @@ -50,7 +50,7 @@ class ClusterClientCursorGuard final { MONGO_DISALLOW_COPYING(ClusterClientCursorGuard); public: - ClusterClientCursorGuard(std::unique_ptr ccc); + ClusterClientCursorGuard(OperationContext* txn, std::unique_ptr ccc); /** * If a cursor is owned, safely destroys the cursor, cleaning up remote cursor state if @@ -74,6 +74,7 @@ public: std::unique_ptr releaseCursor(); private: + OperationContext* _txn; std::unique_ptr _ccc; }; @@ -84,7 +85,8 @@ public: /** * Constructs a CCC whose safe cleanup is ensured by an RAII object. */ - static ClusterClientCursorGuard make(executor::TaskExecutor* executor, + static ClusterClientCursorGuard make(OperationContext* txn, + executor::TaskExecutor* executor, ClusterClientCursorParams&& params); /** @@ -92,9 +94,9 @@ public: */ ClusterClientCursorImpl(std::unique_ptr root); - StatusWith next() final; + StatusWith next(OperationContext* txn) final; - void kill() final; + void kill(OperationContext* txn) final; bool isTailable() const final; @@ -106,8 +108,6 @@ public: Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; - void setOperationContext(OperationContext* txn) final; - private: /** * Constructs a cluster client cursor. diff --git a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp index 0b87488e651..ff3a666551d 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp @@ -40,6 +40,10 @@ namespace mongo { namespace { +// Note: Though the next() method on RouterExecStage and its subclasses takes an OperationContext*, +// these stages are mocked in this test using RouterStageMock. RouterStageMock does not actually use +// the OperationContext, so we pass a nullptr OperationContext* to next() in these tests. + TEST(ClusterClientCursorImpl, NumReturnedSoFar) { auto mockStage = stdx::make_unique(); for (int i = 1; i < 10; ++i) { @@ -51,13 +55,13 @@ TEST(ClusterClientCursorImpl, NumReturnedSoFar) { ASSERT_EQ(cursor.getNumReturnedSoFar(), 0); for (int i = 1; i < 10; ++i) { - auto result = cursor.next(); + auto result = cursor.next(nullptr); ASSERT(result.isOK()); ASSERT_BSONOBJ_EQ(*result.getValue().getResult(), BSON("a" << i)); ASSERT_EQ(cursor.getNumReturnedSoFar(), i); } // Now check that if nothing is fetched the getNumReturnedSoFar stays the same. - auto result = cursor.next(); + auto result = cursor.next(nullptr); ASSERT_OK(result.getStatus()); ASSERT_TRUE(result.getValue().isEOF()); ASSERT_EQ(cursor.getNumReturnedSoFar(), 9LL); @@ -70,7 +74,7 @@ TEST(ClusterClientCursorImpl, QueueResult) { ClusterClientCursorImpl cursor(std::move(mockStage)); - auto firstResult = cursor.next(); + auto firstResult = cursor.next(nullptr); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1)); @@ -78,22 +82,22 @@ TEST(ClusterClientCursorImpl, QueueResult) { cursor.queueResult(BSON("a" << 2)); cursor.queueResult(BSON("a" << 3)); - auto secondResult = cursor.next(); + auto secondResult = cursor.next(nullptr); ASSERT_OK(secondResult.getStatus()); ASSERT(secondResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 2)); - auto thirdResult = cursor.next(); + auto thirdResult = cursor.next(nullptr); ASSERT_OK(thirdResult.getStatus()); ASSERT(thirdResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*thirdResult.getValue().getResult(), BSON("a" << 3)); - auto fourthResult = cursor.next(); + auto fourthResult = cursor.next(nullptr); ASSERT_OK(fourthResult.getStatus()); ASSERT(fourthResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*fourthResult.getValue().getResult(), BSON("a" << 4)); - auto fifthResult = cursor.next(); + auto fifthResult = cursor.next(nullptr); ASSERT_OK(fifthResult.getStatus()); ASSERT(fifthResult.getValue().isEOF()); @@ -114,7 +118,7 @@ TEST(ClusterClientCursorImpl, CursorPropagatesViewDefinition) { ClusterClientCursorImpl cursor(std::move(mockStage)); - auto result = cursor.next(); + auto result = cursor.next(nullptr); ASSERT_OK(result.getStatus()); ASSERT(!result.getValue().getResult()); ASSERT(result.getValue().getViewDefinition()); @@ -130,19 +134,19 @@ TEST(ClusterClientCursorImpl, RemotesExhausted) { ClusterClientCursorImpl cursor(std::move(mockStage)); ASSERT_TRUE(cursor.remotesExhausted()); - auto firstResult = cursor.next(); + auto firstResult = cursor.next(nullptr); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1)); ASSERT_TRUE(cursor.remotesExhausted()); - auto secondResult = cursor.next(); + auto secondResult = cursor.next(nullptr); ASSERT_OK(secondResult.getStatus()); ASSERT(secondResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 2)); ASSERT_TRUE(cursor.remotesExhausted()); - auto thirdResult = cursor.next(); + auto thirdResult = cursor.next(nullptr); ASSERT_OK(thirdResult.getStatus()); ASSERT_TRUE(thirdResult.getValue().isEOF()); ASSERT_TRUE(cursor.remotesExhausted()); diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp index bd4136ab7f2..82ada207939 100644 --- a/src/mongo/s/query/cluster_client_cursor_mock.cpp +++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp @@ -43,7 +43,7 @@ ClusterClientCursorMock::~ClusterClientCursorMock() { invariant(_exhausted || _killed); } -StatusWith ClusterClientCursorMock::next() { +StatusWith ClusterClientCursorMock::next(OperationContext* txn) { invariant(!_killed); if (_resultsQueue.empty()) { @@ -66,7 +66,7 @@ long long ClusterClientCursorMock::getNumReturnedSoFar() const { return _numReturnedSoFar; } -void ClusterClientCursorMock::kill() { +void ClusterClientCursorMock::kill(OperationContext* txn) { _killed = true; if (_killCallback) { _killCallback(); @@ -97,9 +97,4 @@ Status ClusterClientCursorMock::setAwaitDataTimeout(Milliseconds awaitDataTimeou MONGO_UNREACHABLE; } - -void ClusterClientCursorMock::setOperationContext(OperationContext* txn) { - // Do nothing -} - } // namespace mongo diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h index d9e0ba789e3..8820069b0e3 100644 --- a/src/mongo/s/query/cluster_client_cursor_mock.h +++ b/src/mongo/s/query/cluster_client_cursor_mock.h @@ -43,9 +43,9 @@ public: ~ClusterClientCursorMock(); - StatusWith next() final; + StatusWith next(OperationContext* txn) final; - void kill() final; + void kill(OperationContext* txn) final; bool isTailable() const final; @@ -55,8 +55,6 @@ public: Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; - void setOperationContext(OperationContext* txn) final; - /** * Returns true unless marked as having non-exhausted remote cursors via * markRemotesNotExhausted(). diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h index fce3bcf12cb..5e21b25ea8b 100644 --- a/src/mongo/s/query/cluster_client_cursor_params.h +++ b/src/mongo/s/query/cluster_client_cursor_params.h @@ -137,10 +137,6 @@ struct ClusterClientCursorParams { // Whether the client indicated that it is willing to receive partial results in the case of an // unreachable host. bool isAllowPartialResults = false; - - // OperationContext of the calling thread. Used to append Client dependent metadata to remote - // requests. - OperationContext* txn; }; } // mongo diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp index 48d233239d9..2b4e68ac2cf 100644 --- a/src/mongo/s/query/cluster_cursor_manager.cpp +++ b/src/mongo/s/query/cluster_cursor_manager.cpp @@ -110,9 +110,9 @@ ClusterCursorManager::PinnedCursor& ClusterCursorManager::PinnedCursor::operator return *this; } -StatusWith ClusterCursorManager::PinnedCursor::next() { +StatusWith ClusterCursorManager::PinnedCursor::next(OperationContext* txn) { invariant(_cursor); - return _cursor->next(); + return _cursor->next(txn); } bool ClusterCursorManager::PinnedCursor::isTailable() const { @@ -152,11 +152,6 @@ Status ClusterCursorManager::PinnedCursor::setAwaitDataTimeout(Milliseconds awai return _cursor->setAwaitDataTimeout(awaitDataTimeout); } -void ClusterCursorManager::PinnedCursor::setOperationContext(OperationContext* txn) { - return _cursor->setOperationContext(txn); -} - - void ClusterCursorManager::PinnedCursor::returnAndKillCursor() { invariant(_cursor); @@ -192,6 +187,7 @@ void ClusterCursorManager::shutdown() { } StatusWith ClusterCursorManager::registerCursor( + OperationContext* txn, std::unique_ptr cursor, const NamespaceString& nss, CursorType cursorType, @@ -203,7 +199,7 @@ StatusWith ClusterCursorManager::registerCursor( if (_inShutdown) { lk.unlock(); - cursor->kill(); + cursor->kill(txn); return Status(ErrorCodes::ShutdownInProgress, "Cannot register new cursors as we are in the process of shutting down"); } @@ -274,7 +270,6 @@ StatusWith ClusterCursorManager::checkOutCur } entry->setLastActive(now); - cursor->setOperationContext(txn); // Note that pinning a cursor transfers ownership of the underlying ClusterClientCursor object // to the pin; the CursorEntry is left with a null ClusterClientCursor. @@ -289,9 +284,6 @@ void ClusterCursorManager::checkInCursor(std::unique_ptr cu invariant(cursor); - // Reset OperationContext so that non-user initiated operations do not try to use an invalid - // operation context - cursor->setOperationContext(nullptr); const bool remotesExhausted = cursor->remotesExhausted(); CursorEntry* entry = getEntry_inlock(nss, cursorId); @@ -400,8 +392,11 @@ std::size_t ClusterCursorManager::reapZombieCursors() { } lk.unlock(); - zombieCursor.getValue()->setOperationContext(nullptr); - zombieCursor.getValue()->kill(); + // Pass a null OperationContext, because this call should not actually schedule any remote + // work: the cursor is already pending kill, meaning the killCursors commands are already + // being scheduled to be sent to the remote shard hosts. This method will just wait for them + // all to be scheduled. + zombieCursor.getValue()->kill(nullptr); zombieCursor.getValue().reset(); lk.lock(); diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h index ab7a874e69c..6126ef0757e 100644 --- a/src/mongo/s/query/cluster_cursor_manager.h +++ b/src/mongo/s/query/cluster_cursor_manager.h @@ -154,7 +154,7 @@ public: * * Can block. */ - StatusWith next(); + StatusWith next(OperationContext* txn); /** * Returns whether or not the underlying cursor is tailing a capped collection. Cannot be @@ -202,14 +202,6 @@ public: */ Status setAwaitDataTimeout(Milliseconds awaitDataTimeout); - - /** - * Update the operation context for remote requests. - * - * Network requests depend on having a valid operation context for user initiated actions. - */ - void setOperationContext(OperationContext* txn); - private: // ClusterCursorManager is a friend so that its methods can call the PinnedCursor // constructor declared below, which is private to prevent clients from calling it directly. @@ -269,7 +261,8 @@ public: * * Does not block. */ - StatusWith registerCursor(std::unique_ptr cursor, + StatusWith registerCursor(OperationContext* txn, + std::unique_ptr cursor, const NamespaceString& nss, CursorType cursorType, CursorLifetime cursorLifetime); diff --git a/src/mongo/s/query/cluster_cursor_manager_test.cpp b/src/mongo/s/query/cluster_cursor_manager_test.cpp index b12b332375b..1ff8bace8c5 100644 --- a/src/mongo/s/query/cluster_cursor_manager_test.cpp +++ b/src/mongo/s/query/cluster_cursor_manager_test.cpp @@ -109,17 +109,18 @@ TEST_F(ClusterCursorManagerTest, RegisterCursor) { auto cursor = allocateMockCursor(); cursor->queueResult(BSON("a" << 1)); auto cursorId = assertGet( - getManager()->registerCursor(std::move(cursor), + getManager()->registerCursor(nullptr, + std::move(cursor), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(pinnedCursor.getStatus()); - auto nextResult = pinnedCursor.getValue().next(); + auto nextResult = pinnedCursor.getValue().next(nullptr); ASSERT_OK(nextResult.getStatus()); ASSERT(nextResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(BSON("a" << 1), *nextResult.getValue().getResult()); - nextResult = pinnedCursor.getValue().next(); + nextResult = pinnedCursor.getValue().next(nullptr); ASSERT_OK(nextResult.getStatus()); ASSERT_TRUE(nextResult.getValue().isEOF()); } @@ -127,7 +128,8 @@ TEST_F(ClusterCursorManagerTest, RegisterCursor) { // Test that registering a cursor returns a non-zero cursor id. TEST_F(ClusterCursorManagerTest, RegisterCursorReturnsNonZeroId) { auto cursorId = assertGet( - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); @@ -139,18 +141,19 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorBasic) { auto cursor = allocateMockCursor(); cursor->queueResult(BSON("a" << 1)); auto cursorId = assertGet( - getManager()->registerCursor(std::move(cursor), + getManager()->registerCursor(nullptr, + std::move(cursor), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); auto checkedOutCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(checkedOutCursor.getStatus()); ASSERT_EQ(cursorId, checkedOutCursor.getValue().getCursorId()); - auto nextResult = checkedOutCursor.getValue().next(); + auto nextResult = checkedOutCursor.getValue().next(nullptr); ASSERT_OK(nextResult.getStatus()); ASSERT(nextResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(BSON("a" << 1), *nextResult.getValue().getResult()); - nextResult = checkedOutCursor.getValue().next(); + nextResult = checkedOutCursor.getValue().next(nullptr); ASSERT_OK(nextResult.getStatus()); ASSERT_TRUE(nextResult.getValue().isEOF()); } @@ -164,7 +167,8 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorMultipleCursors) { auto cursor = allocateMockCursor(); cursor->queueResult(BSON("a" << i)); cursorIds[i] = assertGet( - getManager()->registerCursor(std::move(cursor), + getManager()->registerCursor(nullptr, + std::move(cursor), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); @@ -172,11 +176,11 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorMultipleCursors) { for (int i = 0; i < numCursors; ++i) { auto pinnedCursor = getManager()->checkOutCursor(nss, cursorIds[i], nullptr); ASSERT_OK(pinnedCursor.getStatus()); - auto nextResult = pinnedCursor.getValue().next(); + auto nextResult = pinnedCursor.getValue().next(nullptr); ASSERT_OK(nextResult.getStatus()); ASSERT(nextResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(BSON("a" << i), *nextResult.getValue().getResult()); - nextResult = pinnedCursor.getValue().next(); + nextResult = pinnedCursor.getValue().next(nullptr); ASSERT_OK(nextResult.getStatus()); ASSERT_TRUE(nextResult.getValue().isEOF()); } @@ -185,7 +189,8 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorMultipleCursors) { // Test that checking out a pinned cursor returns an error with code ErrorCodes::CursorInUse. TEST_F(ClusterCursorManagerTest, CheckOutCursorPinned) { auto cursorId = assertGet( - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); @@ -198,7 +203,8 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorPinned) { // Test that checking out a killed cursor returns an error with code ErrorCodes::CursorNotFound. TEST_F(ClusterCursorManagerTest, CheckOutCursorKilled) { auto cursorId = assertGet( - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); @@ -219,7 +225,8 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongNamespace) { const NamespaceString correctNamespace("test.correct"); const NamespaceString incorrectNamespace("test.incorrect"); auto cursorId = assertGet( - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), correctNamespace, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); @@ -231,7 +238,8 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongNamespace) { // even if there is an existing cursor with the same namespace but a different cursor id. TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongCursorId) { auto cursorId = assertGet( - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); @@ -243,7 +251,8 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongCursorId) { // current time. TEST_F(ClusterCursorManagerTest, CheckOutCursorUpdateActiveTime) { auto cursorId = assertGet( - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); @@ -261,7 +270,8 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorUpdateActiveTime) { // Test that killing a pinned cursor by id successfully kills the cursor. TEST_F(ClusterCursorManagerTest, KillCursorBasic) { auto cursorId = assertGet( - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); @@ -282,7 +292,8 @@ TEST_F(ClusterCursorManagerTest, KillCursorMultipleCursors) { // Register cursors and populate 'cursorIds' with the returned cursor ids. for (size_t i = 0; i < numCursors; ++i) { cursorIds[i] = assertGet( - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); @@ -308,7 +319,8 @@ TEST_F(ClusterCursorManagerTest, KillCursorWrongNamespace) { const NamespaceString correctNamespace("test.correct"); const NamespaceString incorrectNamespace("test.incorrect"); auto cursorId = assertGet( - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), correctNamespace, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); @@ -320,7 +332,8 @@ TEST_F(ClusterCursorManagerTest, KillCursorWrongNamespace) { // even if there is an existing cursor with the same namespace but a different cursor id. TEST_F(ClusterCursorManagerTest, KillCursorWrongCursorId) { auto cursorId = assertGet( - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); @@ -330,7 +343,8 @@ TEST_F(ClusterCursorManagerTest, KillCursorWrongCursorId) { // Test that killing all mortal expired cursors correctly kills a mortal expired cursor. TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceBasic) { - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal); @@ -344,7 +358,8 @@ TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceBasic) { TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceSkipUnexpired) { Date_t timeBeforeCursorCreation = getClockSource()->now(); getClockSource()->advance(Milliseconds(1)); - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal); @@ -356,7 +371,8 @@ TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceSkipUnexpired) { // Test that killing all mortal expired cursors does not kill a cursor that is immortal. TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceSkipImmortal) { - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Immortal); @@ -376,7 +392,8 @@ TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceMultipleCursors) if (i < numKilledCursorsExpected) { cutoff = getClockSource()->now(); } - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal); @@ -400,7 +417,8 @@ TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceMultipleCursors) TEST_F(ClusterCursorManagerTest, KillAllCursors) { const size_t numCursors = 10; for (size_t i = 0; i < numCursors; ++i) { - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal); @@ -419,7 +437,8 @@ TEST_F(ClusterCursorManagerTest, KillAllCursors) { // cursor. TEST_F(ClusterCursorManagerTest, ReapZombieCursorsBasic) { auto cursorId = assertGet( - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); @@ -433,7 +452,8 @@ TEST_F(ClusterCursorManagerTest, ReapZombieCursorsBasic) { // that is still pinned. TEST_F(ClusterCursorManagerTest, ReapZombieCursorsSkipPinned) { auto cursorId = assertGet( - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); @@ -446,7 +466,8 @@ TEST_F(ClusterCursorManagerTest, ReapZombieCursorsSkipPinned) { // Test that reaping does not call kill() on the underlying ClusterClientCursor for cursors that // haven't been killed. TEST_F(ClusterCursorManagerTest, ReapZombieCursorsSkipNonZombies) { - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal); @@ -464,7 +485,8 @@ TEST_F(ClusterCursorManagerTest, StatsInitAsZero) { // Test that registering a sharded cursor updates the corresponding counter in stats(). TEST_F(ClusterCursorManagerTest, StatsRegisterShardedCursor) { - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceSharded, ClusterCursorManager::CursorLifetime::Mortal); @@ -473,7 +495,8 @@ TEST_F(ClusterCursorManagerTest, StatsRegisterShardedCursor) { // Test that registering a not-sharded cursor updates the corresponding counter in stats(). TEST_F(ClusterCursorManagerTest, StatsRegisterNotShardedCursor) { - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal); @@ -483,7 +506,8 @@ TEST_F(ClusterCursorManagerTest, StatsRegisterNotShardedCursor) { // Test that checking out a cursor updates the pinned counter in stats(). TEST_F(ClusterCursorManagerTest, StatsPinCursor) { auto cursorId = - assertGet(getManager()->registerCursor(allocateMockCursor(), + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceSharded, ClusterCursorManager::CursorLifetime::Mortal)); @@ -496,7 +520,8 @@ TEST_F(ClusterCursorManagerTest, StatsPinCursor) { TEST_F(ClusterCursorManagerTest, StatsRegisterMultipleCursors) { const size_t numShardedCursors = 10; for (size_t i = 0; i < numShardedCursors; ++i) { - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceSharded, ClusterCursorManager::CursorLifetime::Mortal); @@ -505,7 +530,8 @@ TEST_F(ClusterCursorManagerTest, StatsRegisterMultipleCursors) { } const size_t numNotShardedCursors = 10; for (size_t i = 0; i < numNotShardedCursors; ++i) { - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal); @@ -517,7 +543,8 @@ TEST_F(ClusterCursorManagerTest, StatsRegisterMultipleCursors) { // Test that killing a sharded cursor decrements the corresponding counter in stats(). TEST_F(ClusterCursorManagerTest, StatsKillShardedCursor) { auto cursorId = - assertGet(getManager()->registerCursor(allocateMockCursor(), + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceSharded, ClusterCursorManager::CursorLifetime::Mortal)); @@ -529,7 +556,8 @@ TEST_F(ClusterCursorManagerTest, StatsKillShardedCursor) { // Test that killing a not-sharded cursor decrements the corresponding counter in stats(). TEST_F(ClusterCursorManagerTest, StatsKillNotShardedCursor) { auto cursorId = assertGet( - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); @@ -541,7 +569,8 @@ TEST_F(ClusterCursorManagerTest, StatsKillNotShardedCursor) { // Test that killing a pinned cursor decrements the corresponding counter in stats(). TEST_F(ClusterCursorManagerTest, StatsKillPinnedCursor) { auto cursorId = - assertGet(getManager()->registerCursor(allocateMockCursor(), + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceSharded, ClusterCursorManager::CursorLifetime::Mortal)); @@ -554,13 +583,14 @@ TEST_F(ClusterCursorManagerTest, StatsKillPinnedCursor) { // Test that exhausting a sharded cursor decrements the corresponding counter in stats(). TEST_F(ClusterCursorManagerTest, StatsExhaustShardedCursor) { auto cursorId = - assertGet(getManager()->registerCursor(allocateMockCursor(), + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceSharded, ClusterCursorManager::CursorLifetime::Mortal)); auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(pinnedCursor.getStatus()); - ASSERT_OK(pinnedCursor.getValue().next().getStatus()); + ASSERT_OK(pinnedCursor.getValue().next(nullptr).getStatus()); ASSERT_EQ(1U, getManager()->stats().cursorsSharded); pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::Exhausted); ASSERT_EQ(0U, getManager()->stats().cursorsSharded); @@ -569,13 +599,14 @@ TEST_F(ClusterCursorManagerTest, StatsExhaustShardedCursor) { // Test that exhausting a not-sharded cursor decrements the corresponding counter in stats(). TEST_F(ClusterCursorManagerTest, StatsExhaustNotShardedCursor) { auto cursorId = assertGet( - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(pinnedCursor.getStatus()); - ASSERT_OK(pinnedCursor.getValue().next().getStatus()); + ASSERT_OK(pinnedCursor.getValue().next(nullptr).getStatus()); ASSERT_EQ(1U, getManager()->stats().cursorsNotSharded); pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::Exhausted); ASSERT_EQ(0U, getManager()->stats().cursorsNotSharded); @@ -585,13 +616,14 @@ TEST_F(ClusterCursorManagerTest, StatsExhaustNotShardedCursor) { // stats(). TEST_F(ClusterCursorManagerTest, StatsExhaustPinnedCursor) { auto cursorId = assertGet( - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(pinnedCursor.getStatus()); - ASSERT_OK(pinnedCursor.getValue().next().getStatus()); + ASSERT_OK(pinnedCursor.getValue().next(nullptr).getStatus()); ASSERT_EQ(1U, getManager()->stats().cursorsPinned); pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::Exhausted); ASSERT_EQ(0U, getManager()->stats().cursorsPinned); @@ -601,13 +633,14 @@ TEST_F(ClusterCursorManagerTest, StatsExhaustPinnedCursor) { // stats(). TEST_F(ClusterCursorManagerTest, StatsCheckInWithoutExhaustingPinnedCursor) { auto cursorId = assertGet( - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(pinnedCursor.getStatus()); - ASSERT_OK(pinnedCursor.getValue().next().getStatus()); + ASSERT_OK(pinnedCursor.getValue().next(nullptr).getStatus()); ASSERT_EQ(1U, getManager()->stats().cursorsPinned); pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted); ASSERT_EQ(0U, getManager()->stats().cursorsPinned); @@ -616,7 +649,8 @@ TEST_F(ClusterCursorManagerTest, StatsCheckInWithoutExhaustingPinnedCursor) { // Test that getting the namespace for a cursor returns the correct namespace. TEST_F(ClusterCursorManagerTest, GetNamespaceForCursorIdBasic) { auto cursorId = assertGet( - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); @@ -633,7 +667,8 @@ TEST_F(ClusterCursorManagerTest, GetNamespaceForCursorIdMultipleCursorsSameNames std::vector cursorIds(numCursors); for (size_t i = 0; i < numCursors; ++i) { cursorIds[i] = assertGet( - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); @@ -654,7 +689,8 @@ TEST_F(ClusterCursorManagerTest, GetNamespaceForCursorIdMultipleCursorsDifferent for (size_t i = 0; i < numCursors; ++i) { NamespaceString cursorNamespace(std::string(str::stream() << "test.collection" << i)); auto cursorId = assertGet( - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), cursorNamespace, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); @@ -684,7 +720,8 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorDefaultConstructor) { // cursor. TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorNotExhausted) { auto cursorId = assertGet( - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); @@ -702,7 +739,8 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorNotExhausted) { // cursor, and leaves the pin owning no cursor. TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhausted) { auto cursorId = assertGet( - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); @@ -710,7 +748,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhausted) { ASSERT_OK(registeredCursor.getStatus()); ASSERT_EQ(cursorId, registeredCursor.getValue().getCursorId()); ASSERT_NE(0, cursorId); - ASSERT_OK(registeredCursor.getValue().next().getStatus()); + ASSERT_OK(registeredCursor.getValue().next(nullptr).getStatus()); registeredCursor.getValue().returnCursor(ClusterCursorManager::CursorState::Exhausted); ASSERT_EQ(0, registeredCursor.getValue().getCursorId()); @@ -733,7 +771,8 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhaustedWithNonExhaust mockCursor->markRemotesNotExhausted(); auto cursorId = assertGet( - getManager()->registerCursor(std::move(mockCursor), + getManager()->registerCursor(nullptr, + std::move(mockCursor), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); @@ -741,7 +780,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhaustedWithNonExhaust ASSERT_OK(registeredCursor.getStatus()); ASSERT_EQ(cursorId, registeredCursor.getValue().getCursorId()); ASSERT_NE(0, cursorId); - ASSERT_OK(registeredCursor.getValue().next().getStatus()); + ASSERT_OK(registeredCursor.getValue().next(nullptr).getStatus()); registeredCursor.getValue().returnCursor(ClusterCursorManager::CursorState::Exhausted); ASSERT_EQ(0, registeredCursor.getValue().getCursorId()); @@ -756,7 +795,8 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhaustedWithNonExhaust // been returned. TEST_F(ClusterCursorManagerTest, PinnedCursorMoveAssignmentKill) { auto cursorId = assertGet( - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); @@ -771,7 +811,8 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorMoveAssignmentKill) { TEST_F(ClusterCursorManagerTest, PinnedCursorDestructorKill) { { auto cursorId = assertGet( - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); @@ -789,7 +830,8 @@ TEST_F(ClusterCursorManagerTest, RemotesExhausted) { ASSERT_FALSE(mockCursor->remotesExhausted()); auto cursorId = assertGet( - getManager()->registerCursor(std::move(mockCursor), + getManager()->registerCursor(nullptr, + std::move(mockCursor), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); @@ -801,7 +843,8 @@ TEST_F(ClusterCursorManagerTest, RemotesExhausted) { // Test that killed cursors which are still pinned are not reaped. TEST_F(ClusterCursorManagerTest, DoNotReapKilledPinnedCursors) { auto cursorId = assertGet( - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); @@ -822,7 +865,8 @@ TEST_F(ClusterCursorManagerTest, DoNotReapKilledPinnedCursors) { } TEST_F(ClusterCursorManagerTest, CannotRegisterCursorDuringShutdown) { - ASSERT_OK(getManager()->registerCursor(allocateMockCursor(), + ASSERT_OK(getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); @@ -834,7 +878,8 @@ TEST_F(ClusterCursorManagerTest, CannotRegisterCursorDuringShutdown) { ASSERT_EQUALS( ErrorCodes::ShutdownInProgress, - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); @@ -842,7 +887,8 @@ TEST_F(ClusterCursorManagerTest, CannotRegisterCursorDuringShutdown) { TEST_F(ClusterCursorManagerTest, CannotCheckoutCursorDuringShutdown) { auto cursorId = assertGet( - getManager()->registerCursor(allocateMockCursor(), + getManager()->registerCursor(nullptr, + allocateMockCursor(), nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index c1d32e5c6be..b8f0a2b6eba 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -187,7 +187,6 @@ StatusWith runQueryWithoutRetrying(OperationContext* txn, params.isTailable = query.getQueryRequest().isTailable(); params.isAwaitData = query.getQueryRequest().isAwaitData(); params.isAllowPartialResults = query.getQueryRequest().isAllowPartialResults(); - params.txn = txn; // This is the batchSize passed to each subsequent getMore command issued by the cursor. We // usually use the batchSize associated with the initial find, but as it is illegal to send a @@ -232,12 +231,12 @@ StatusWith runQueryWithoutRetrying(OperationContext* txn, } auto ccc = ClusterClientCursorImpl::make( - Grid::get(txn)->getExecutorPool()->getArbitraryExecutor(), std::move(params)); + txn, Grid::get(txn)->getExecutorPool()->getArbitraryExecutor(), std::move(params)); auto cursorState = ClusterCursorManager::CursorState::NotExhausted; int bytesBuffered = 0; while (!FindCommon::enoughForFirstBatch(query.getQueryRequest(), results->size())) { - auto next = ccc->next(); + auto next = ccc->next(txn); if (!next.isOK()) { return next.getStatus(); } @@ -294,7 +293,7 @@ StatusWith runQueryWithoutRetrying(OperationContext* txn, ? ClusterCursorManager::CursorLifetime::Immortal : ClusterCursorManager::CursorLifetime::Mortal; return cursorManager->registerCursor( - ccc.releaseCursor(), query.nss(), cursorType, cursorLifetime); + txn, ccc.releaseCursor(), query.nss(), cursorType, cursorLifetime); } } // namespace @@ -389,7 +388,7 @@ StatusWith ClusterFind::runGetMore(OperationContext* txn, long long startingFrom = pinnedCursor.getValue().getNumReturnedSoFar(); auto cursorState = ClusterCursorManager::CursorState::NotExhausted; while (!FindCommon::enoughForGetMore(batchSize, batch.size())) { - auto next = pinnedCursor.getValue().next(); + auto next = pinnedCursor.getValue().next(txn); if (!next.isOK()) { return next.getStatus(); } diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h index 726bd2df97b..5fcb6053e58 100644 --- a/src/mongo/s/query/router_exec_stage.h +++ b/src/mongo/s/query/router_exec_stage.h @@ -66,13 +66,13 @@ public: * holding on to a subset of the returned results and need to minimize memory usage, call copy() * on the BSONObjs. */ - virtual StatusWith next() = 0; + virtual StatusWith next(OperationContext* txn) = 0; /** * Must be called before destruction to abandon a not-yet-exhausted plan. May block waiting for * responses from remote hosts. */ - virtual void kill() = 0; + virtual void kill(OperationContext* txn) = 0; /** * Returns whether or not all the remote cursors are exhausted. @@ -88,13 +88,6 @@ public: */ virtual Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) = 0; - /** - * Update the operation context for remote requests. - * - * Network requests depend on having a valid operation context for user initiated actions. - */ - virtual void setOperationContext(OperationContext* txn) = 0; - protected: /** * Returns an unowned pointer to the child stage, or nullptr if there is no child. diff --git a/src/mongo/s/query/router_stage_limit.cpp b/src/mongo/s/query/router_stage_limit.cpp index 4756423249c..4a1a428a533 100644 --- a/src/mongo/s/query/router_stage_limit.cpp +++ b/src/mongo/s/query/router_stage_limit.cpp @@ -39,12 +39,12 @@ RouterStageLimit::RouterStageLimit(std::unique_ptr child, long invariant(limit > 0); } -StatusWith RouterStageLimit::next() { +StatusWith RouterStageLimit::next(OperationContext* txn) { if (_returnedSoFar >= _limit) { return {ClusterQueryResult()}; } - auto childResult = getChildStage()->next(); + auto childResult = getChildStage()->next(txn); if (!childResult.isOK()) { return childResult; } @@ -55,8 +55,8 @@ StatusWith RouterStageLimit::next() { return childResult; } -void RouterStageLimit::kill() { - getChildStage()->kill(); +void RouterStageLimit::kill(OperationContext* txn) { + getChildStage()->kill(txn); } bool RouterStageLimit::remotesExhausted() { @@ -67,8 +67,4 @@ Status RouterStageLimit::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { return getChildStage()->setAwaitDataTimeout(awaitDataTimeout); } -void RouterStageLimit::setOperationContext(OperationContext* txn) { - return getChildStage()->setOperationContext(txn); -} - } // namespace mongo diff --git a/src/mongo/s/query/router_stage_limit.h b/src/mongo/s/query/router_stage_limit.h index 8b29b56f291..29fb85dd458 100644 --- a/src/mongo/s/query/router_stage_limit.h +++ b/src/mongo/s/query/router_stage_limit.h @@ -39,16 +39,14 @@ class RouterStageLimit final : public RouterExecStage { public: RouterStageLimit(std::unique_ptr child, long long limit); - StatusWith next() final; + StatusWith next(OperationContext* txn) final; - void kill() final; + void kill(OperationContext* txn) final; bool remotesExhausted() final; Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; - void setOperationContext(OperationContext* txn) final; - private: long long _limit; diff --git a/src/mongo/s/query/router_stage_limit_test.cpp b/src/mongo/s/query/router_stage_limit_test.cpp index 11c245f67ec..b0b70a90cc8 100644 --- a/src/mongo/s/query/router_stage_limit_test.cpp +++ b/src/mongo/s/query/router_stage_limit_test.cpp @@ -40,6 +40,10 @@ namespace mongo { namespace { +// Note: Though the next() method on RouterExecStage and its subclasses takes an OperationContext*, +// these stages are mocked in this test using RouterStageMock. RouterStageMock does not actually use +// the OperationContext, so we pass a nullptr OperationContext* to next() in these tests. + TEST(RouterStageLimitTest, LimitIsOne) { auto mockStage = stdx::make_unique(); mockStage->queueResult({BSON("a" << 1)}); @@ -48,17 +52,17 @@ TEST(RouterStageLimitTest, LimitIsOne) { auto limitStage = stdx::make_unique(std::move(mockStage), 1); - auto firstResult = limitStage->next(); + auto firstResult = limitStage->next(nullptr); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1)); - auto secondResult = limitStage->next(); + auto secondResult = limitStage->next(nullptr); ASSERT_OK(secondResult.getStatus()); ASSERT(!secondResult.getValue().getResult()); // Once end-of-stream is reached, the limit stage should keep returning no results. - auto thirdResult = limitStage->next(); + auto thirdResult = limitStage->next(nullptr); ASSERT_OK(thirdResult.getStatus()); ASSERT(!thirdResult.getValue().getResult()); } @@ -71,17 +75,17 @@ TEST(RouterStageLimitTest, LimitIsTwo) { auto limitStage = stdx::make_unique(std::move(mockStage), 2); - auto firstResult = limitStage->next(); + auto firstResult = limitStage->next(nullptr); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1)); - auto secondResult = limitStage->next(); + auto secondResult = limitStage->next(nullptr); ASSERT_OK(secondResult.getStatus()); ASSERT(firstResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 2)); - auto thirdResult = limitStage->next(); + auto thirdResult = limitStage->next(nullptr); ASSERT_OK(thirdResult.getStatus()); ASSERT(!thirdResult.getValue().getResult()); } @@ -95,12 +99,12 @@ TEST(RouterStageLimitTest, LimitStagePropagatesError) { auto limitStage = stdx::make_unique(std::move(mockStage), 3); - auto firstResult = limitStage->next(); + auto firstResult = limitStage->next(nullptr); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1)); - auto secondResult = limitStage->next(); + auto secondResult = limitStage->next(nullptr); ASSERT_NOT_OK(secondResult.getStatus()); ASSERT_EQ(secondResult.getStatus(), ErrorCodes::BadValue); ASSERT_EQ(secondResult.getStatus().reason(), "bad thing happened"); @@ -120,7 +124,7 @@ TEST(RouterStageLimitTest, LimitStagePropagatesViewDefinition) { auto limitStage = stdx::make_unique(std::move(mockStage), 3); - auto result = limitStage->next(); + auto result = limitStage->next(nullptr); ASSERT_OK(result.getStatus()); ASSERT(!result.getValue().getResult()); ASSERT(result.getValue().getViewDefinition()); @@ -139,21 +143,21 @@ TEST(RouterStageLimitTest, LimitStageToleratesMidStreamEOF) { auto limitStage = stdx::make_unique(std::move(mockStage), 2); - auto firstResult = limitStage->next(); + auto firstResult = limitStage->next(nullptr); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1)); - auto secondResult = limitStage->next(); + auto secondResult = limitStage->next(nullptr); ASSERT_OK(secondResult.getStatus()); ASSERT(secondResult.getValue().isEOF()); - auto thirdResult = limitStage->next(); + auto thirdResult = limitStage->next(nullptr); ASSERT_OK(thirdResult.getStatus()); ASSERT(thirdResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*thirdResult.getValue().getResult(), BSON("a" << 2)); - auto fourthResult = limitStage->next(); + auto fourthResult = limitStage->next(nullptr); ASSERT_OK(fourthResult.getStatus()); ASSERT(fourthResult.getValue().isEOF()); } @@ -167,19 +171,19 @@ TEST(RouterStageLimitTest, LimitStageRemotesExhausted) { auto limitStage = stdx::make_unique(std::move(mockStage), 100); ASSERT_TRUE(limitStage->remotesExhausted()); - auto firstResult = limitStage->next(); + auto firstResult = limitStage->next(nullptr); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1)); ASSERT_TRUE(limitStage->remotesExhausted()); - auto secondResult = limitStage->next(); + auto secondResult = limitStage->next(nullptr); ASSERT_OK(secondResult.getStatus()); ASSERT(secondResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 2)); ASSERT_TRUE(limitStage->remotesExhausted()); - auto thirdResult = limitStage->next(); + auto thirdResult = limitStage->next(nullptr); ASSERT_OK(thirdResult.getStatus()); ASSERT(thirdResult.getValue().isEOF()); ASSERT_TRUE(limitStage->remotesExhausted()); diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp index a3976ffb40d..f5ecc75728a 100644 --- a/src/mongo/s/query/router_stage_merge.cpp +++ b/src/mongo/s/query/router_stage_merge.cpp @@ -40,9 +40,9 @@ RouterStageMerge::RouterStageMerge(executor::TaskExecutor* executor, ClusterClientCursorParams&& params) : _executor(executor), _arm(executor, std::move(params)) {} -StatusWith RouterStageMerge::next() { +StatusWith RouterStageMerge::next(OperationContext* txn) { while (!_arm.ready()) { - auto nextEventStatus = _arm.nextEvent(); + auto nextEventStatus = _arm.nextEvent(txn); if (!nextEventStatus.isOK()) { return nextEventStatus.getStatus(); } @@ -55,8 +55,8 @@ StatusWith RouterStageMerge::next() { return _arm.nextReady(); } -void RouterStageMerge::kill() { - auto killEvent = _arm.kill(); +void RouterStageMerge::kill(OperationContext* txn) { + auto killEvent = _arm.kill(txn); if (!killEvent) { // Mongos is shutting down. return; @@ -72,8 +72,4 @@ Status RouterStageMerge::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { return _arm.setAwaitDataTimeout(awaitDataTimeout); } -void RouterStageMerge::setOperationContext(OperationContext* txn) { - return _arm.setOperationContext(txn); -} - } // namespace mongo diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h index 9f2c2e9e0c4..4c69925e0d3 100644 --- a/src/mongo/s/query/router_stage_merge.h +++ b/src/mongo/s/query/router_stage_merge.h @@ -45,16 +45,14 @@ class RouterStageMerge final : public RouterExecStage { public: RouterStageMerge(executor::TaskExecutor* executor, ClusterClientCursorParams&& params); - StatusWith next() final; + StatusWith next(OperationContext* txn) final; - void kill() final; + void kill(OperationContext* txn) final; bool remotesExhausted() final; Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; - void setOperationContext(OperationContext* txn) final; - private: // Not owned here. executor::TaskExecutor* _executor; diff --git a/src/mongo/s/query/router_stage_mock.cpp b/src/mongo/s/query/router_stage_mock.cpp index b2aa83e3ed6..c348018fe6f 100644 --- a/src/mongo/s/query/router_stage_mock.cpp +++ b/src/mongo/s/query/router_stage_mock.cpp @@ -50,7 +50,7 @@ void RouterStageMock::markRemotesExhausted() { _remotesExhausted = true; } -StatusWith RouterStageMock::next() { +StatusWith RouterStageMock::next(OperationContext* txn) { if (_resultsQueue.empty()) { return {ClusterQueryResult()}; } @@ -60,7 +60,7 @@ StatusWith RouterStageMock::next() { return out; } -void RouterStageMock::kill() { +void RouterStageMock::kill(OperationContext* txn) { // No child to kill. } @@ -73,10 +73,6 @@ Status RouterStageMock::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { return Status::OK(); } -void RouterStageMock::setOperationContext(OperationContext* txn) { - // Do nothing -} - StatusWith RouterStageMock::getAwaitDataTimeout() { if (!_awaitDataTimeout) { return Status(ErrorCodes::BadValue, "no awaitData timeout set"); diff --git a/src/mongo/s/query/router_stage_mock.h b/src/mongo/s/query/router_stage_mock.h index ef093b04fe4..dce077d8122 100644 --- a/src/mongo/s/query/router_stage_mock.h +++ b/src/mongo/s/query/router_stage_mock.h @@ -44,16 +44,14 @@ class RouterStageMock final : public RouterExecStage { public: ~RouterStageMock() final {} - StatusWith next() final; + StatusWith next(OperationContext* txn) final; - void kill() final; + void kill(OperationContext* txn) final; bool remotesExhausted() final; Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; - void setOperationContext(OperationContext* txn) final; - /** * Queues a BSONObj to be returned. */ diff --git a/src/mongo/s/query/router_stage_remove_sortkey.cpp b/src/mongo/s/query/router_stage_remove_sortkey.cpp index 77d3e26afd0..9c58e489b13 100644 --- a/src/mongo/s/query/router_stage_remove_sortkey.cpp +++ b/src/mongo/s/query/router_stage_remove_sortkey.cpp @@ -41,8 +41,8 @@ namespace mongo { RouterStageRemoveSortKey::RouterStageRemoveSortKey(std::unique_ptr child) : RouterExecStage(std::move(child)) {} -StatusWith RouterStageRemoveSortKey::next() { - auto childResult = getChildStage()->next(); +StatusWith RouterStageRemoveSortKey::next(OperationContext* txn) { + auto childResult = getChildStage()->next(txn); if (!childResult.isOK() || !childResult.getValue().getResult()) { return childResult; } @@ -59,8 +59,8 @@ StatusWith RouterStageRemoveSortKey::next() { return {builder.obj()}; } -void RouterStageRemoveSortKey::kill() { - getChildStage()->kill(); +void RouterStageRemoveSortKey::kill(OperationContext* txn) { + getChildStage()->kill(txn); } bool RouterStageRemoveSortKey::remotesExhausted() { @@ -71,8 +71,4 @@ Status RouterStageRemoveSortKey::setAwaitDataTimeout(Milliseconds awaitDataTimeo return getChildStage()->setAwaitDataTimeout(awaitDataTimeout); } -void RouterStageRemoveSortKey::setOperationContext(OperationContext* txn) { - return getChildStage()->setOperationContext(txn); -} - } // namespace mongo diff --git a/src/mongo/s/query/router_stage_remove_sortkey.h b/src/mongo/s/query/router_stage_remove_sortkey.h index 79294aeb20a..291cf01a803 100644 --- a/src/mongo/s/query/router_stage_remove_sortkey.h +++ b/src/mongo/s/query/router_stage_remove_sortkey.h @@ -41,15 +41,13 @@ class RouterStageRemoveSortKey final : public RouterExecStage { public: RouterStageRemoveSortKey(std::unique_ptr child); - StatusWith next() final; + StatusWith next(OperationContext* txn) final; - void kill() final; + void kill(OperationContext* txn) final; bool remotesExhausted() final; Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; - - void setOperationContext(OperationContext* txn) final; }; } // namespace mongo diff --git a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp b/src/mongo/s/query/router_stage_remove_sortkey_test.cpp index d8e1f51605d..e9f338b9e5f 100644 --- a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp +++ b/src/mongo/s/query/router_stage_remove_sortkey_test.cpp @@ -40,6 +40,10 @@ namespace mongo { namespace { +// Note: Though the next() method on RouterExecStage and its subclasses takes an OperationContext*, +// these stages are mocked in this test using RouterStageMock. RouterStageMock does not actually use +// the OperationContext, so we pass a nullptr OperationContext* to next() in these tests. + TEST(RouterStageRemoveSortKeyTest, RemovesSortKey) { auto mockStage = stdx::make_unique(); mockStage->queueResult(BSON("a" << 4 << "$sortKey" << 1 << "b" << 3)); @@ -50,29 +54,29 @@ TEST(RouterStageRemoveSortKeyTest, RemovesSortKey) { auto sortKeyStage = stdx::make_unique(std::move(mockStage)); - auto firstResult = sortKeyStage->next(); + auto firstResult = sortKeyStage->next(nullptr); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 4 << "b" << 3)); - auto secondResult = sortKeyStage->next(); + auto secondResult = sortKeyStage->next(nullptr); ASSERT_OK(secondResult.getStatus()); ASSERT(secondResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("c" << BSON("d" << "foo"))); - auto thirdResult = sortKeyStage->next(); + auto thirdResult = sortKeyStage->next(nullptr); ASSERT_OK(thirdResult.getStatus()); ASSERT(thirdResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*thirdResult.getValue().getResult(), BSON("a" << 3)); - auto fourthResult = sortKeyStage->next(); + auto fourthResult = sortKeyStage->next(nullptr); ASSERT_OK(fourthResult.getStatus()); ASSERT(fourthResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*fourthResult.getValue().getResult(), BSONObj()); - auto fifthResult = sortKeyStage->next(); + auto fifthResult = sortKeyStage->next(nullptr); ASSERT_OK(fifthResult.getStatus()); ASSERT(fifthResult.getValue().isEOF()); } @@ -84,12 +88,12 @@ TEST(RouterStageRemoveSortKeyTest, PropagatesError) { auto sortKeyStage = stdx::make_unique(std::move(mockStage)); - auto firstResult = sortKeyStage->next(); + auto firstResult = sortKeyStage->next(nullptr); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSONObj()); - auto secondResult = sortKeyStage->next(); + auto secondResult = sortKeyStage->next(nullptr); ASSERT_NOT_OK(secondResult.getStatus()); ASSERT_EQ(secondResult.getStatus(), ErrorCodes::BadValue); ASSERT_EQ(secondResult.getStatus().reason(), "bad thing happened"); @@ -103,21 +107,21 @@ TEST(RouterStageRemoveSortKeyTest, ToleratesMidStreamEOF) { auto sortKeyStage = stdx::make_unique(std::move(mockStage)); - auto firstResult = sortKeyStage->next(); + auto firstResult = sortKeyStage->next(nullptr); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1 << "b" << 1)); - auto secondResult = sortKeyStage->next(); + auto secondResult = sortKeyStage->next(nullptr); ASSERT_OK(secondResult.getStatus()); ASSERT(secondResult.getValue().isEOF()); - auto thirdResult = sortKeyStage->next(); + auto thirdResult = sortKeyStage->next(nullptr); ASSERT_OK(thirdResult.getStatus()); ASSERT(thirdResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*thirdResult.getValue().getResult(), BSON("a" << 2 << "b" << 2)); - auto fourthResult = sortKeyStage->next(); + auto fourthResult = sortKeyStage->next(nullptr); ASSERT_OK(fourthResult.getStatus()); ASSERT(fourthResult.getValue().isEOF()); } @@ -131,19 +135,19 @@ TEST(RouterStageRemoveSortKeyTest, RemotesExhausted) { auto sortKeyStage = stdx::make_unique(std::move(mockStage)); ASSERT_TRUE(sortKeyStage->remotesExhausted()); - auto firstResult = sortKeyStage->next(); + auto firstResult = sortKeyStage->next(nullptr); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1 << "b" << 1)); ASSERT_TRUE(sortKeyStage->remotesExhausted()); - auto secondResult = sortKeyStage->next(); + auto secondResult = sortKeyStage->next(nullptr); ASSERT_OK(secondResult.getStatus()); ASSERT(secondResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 2 << "b" << 2)); ASSERT_TRUE(sortKeyStage->remotesExhausted()); - auto thirdResult = sortKeyStage->next(); + auto thirdResult = sortKeyStage->next(nullptr); ASSERT_OK(thirdResult.getStatus()); ASSERT(thirdResult.getValue().isEOF()); ASSERT_TRUE(sortKeyStage->remotesExhausted()); diff --git a/src/mongo/s/query/router_stage_skip.cpp b/src/mongo/s/query/router_stage_skip.cpp index f9a63e515ff..e961fe60f13 100644 --- a/src/mongo/s/query/router_stage_skip.cpp +++ b/src/mongo/s/query/router_stage_skip.cpp @@ -39,9 +39,9 @@ RouterStageSkip::RouterStageSkip(std::unique_ptr child, long lo invariant(skip > 0); } -StatusWith RouterStageSkip::next() { +StatusWith RouterStageSkip::next(OperationContext* txn) { while (_skippedSoFar < _skip) { - auto next = getChildStage()->next(); + auto next = getChildStage()->next(txn); if (!next.isOK()) { return next; } @@ -57,11 +57,11 @@ StatusWith RouterStageSkip::next() { ++_skippedSoFar; } - return getChildStage()->next(); + return getChildStage()->next(txn); } -void RouterStageSkip::kill() { - getChildStage()->kill(); +void RouterStageSkip::kill(OperationContext* txn) { + getChildStage()->kill(txn); } bool RouterStageSkip::remotesExhausted() { @@ -72,8 +72,4 @@ Status RouterStageSkip::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { return getChildStage()->setAwaitDataTimeout(awaitDataTimeout); } -void RouterStageSkip::setOperationContext(OperationContext* txn) { - return getChildStage()->setOperationContext(txn); -} - } // namespace mongo diff --git a/src/mongo/s/query/router_stage_skip.h b/src/mongo/s/query/router_stage_skip.h index fda4201f9cb..c949271f79e 100644 --- a/src/mongo/s/query/router_stage_skip.h +++ b/src/mongo/s/query/router_stage_skip.h @@ -39,16 +39,14 @@ class RouterStageSkip final : public RouterExecStage { public: RouterStageSkip(std::unique_ptr child, long long skip); - StatusWith next() final; + StatusWith next(OperationContext* txn) final; - void kill() final; + void kill(OperationContext* txn) final; bool remotesExhausted() final; Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; - void setOperationContext(OperationContext* txn) final; - private: long long _skip; diff --git a/src/mongo/s/query/router_stage_skip_test.cpp b/src/mongo/s/query/router_stage_skip_test.cpp index 242a032375a..56182fb18a5 100644 --- a/src/mongo/s/query/router_stage_skip_test.cpp +++ b/src/mongo/s/query/router_stage_skip_test.cpp @@ -40,6 +40,10 @@ namespace mongo { namespace { +// Note: Though the next() method on RouterExecStage and its subclasses takes an OperationContext*, +// these stages are mocked in this test using RouterStageMock. RouterStageMock does not actually use +// the OperationContext, so we pass a nullptr OperationContext* to next() in these tests. + TEST(RouterStageSkipTest, SkipIsOne) { auto mockStage = stdx::make_unique(); mockStage->queueResult(BSON("a" << 1)); @@ -48,22 +52,22 @@ TEST(RouterStageSkipTest, SkipIsOne) { auto skipStage = stdx::make_unique(std::move(mockStage), 1); - auto firstResult = skipStage->next(); + auto firstResult = skipStage->next(nullptr); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 2)); - auto secondResult = skipStage->next(); + auto secondResult = skipStage->next(nullptr); ASSERT_OK(secondResult.getStatus()); ASSERT(secondResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 3)); // Once end-of-stream is reached, the skip stage should keep returning boost::none. - auto thirdResult = skipStage->next(); + auto thirdResult = skipStage->next(nullptr); ASSERT_OK(thirdResult.getStatus()); ASSERT(thirdResult.getValue().isEOF()); - auto fourthResult = skipStage->next(); + auto fourthResult = skipStage->next(nullptr); ASSERT_OK(thirdResult.getStatus()); ASSERT(thirdResult.getValue().isEOF()); } @@ -77,12 +81,12 @@ TEST(RouterStageSkipTest, SkipIsThree) { auto skipStage = stdx::make_unique(std::move(mockStage), 3); - auto firstResult = skipStage->next(); + auto firstResult = skipStage->next(nullptr); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 4)); - auto secondResult = skipStage->next(); + auto secondResult = skipStage->next(nullptr); ASSERT_OK(secondResult.getStatus()); ASSERT(secondResult.getValue().isEOF()); } @@ -95,7 +99,7 @@ TEST(RouterStageSkipTest, SkipEqualToResultSetSize) { auto skipStage = stdx::make_unique(std::move(mockStage), 3); - auto firstResult = skipStage->next(); + auto firstResult = skipStage->next(nullptr); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().isEOF()); } @@ -108,7 +112,7 @@ TEST(RouterStageSkipTest, SkipExceedsResultSetSize) { auto skipStage = stdx::make_unique(std::move(mockStage), 100); - auto firstResult = skipStage->next(); + auto firstResult = skipStage->next(nullptr); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().isEOF()); } @@ -122,7 +126,7 @@ TEST(RouterStageSkipTest, ErrorWhileSkippingResults) { auto skipStage = stdx::make_unique(std::move(mockStage), 2); - auto firstResult = skipStage->next(); + auto firstResult = skipStage->next(nullptr); ASSERT_NOT_OK(firstResult.getStatus()); ASSERT_EQ(firstResult.getStatus(), ErrorCodes::BadValue); ASSERT_EQ(firstResult.getStatus().reason(), "bad thing happened"); @@ -137,12 +141,12 @@ TEST(RouterStageSkipTest, ErrorAfterSkippingResults) { auto skipStage = stdx::make_unique(std::move(mockStage), 2); - auto firstResult = skipStage->next(); + auto firstResult = skipStage->next(nullptr); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 3)); - auto secondResult = skipStage->next(); + auto secondResult = skipStage->next(nullptr); ASSERT_NOT_OK(secondResult.getStatus()); ASSERT_EQ(secondResult.getStatus(), ErrorCodes::BadValue); ASSERT_EQ(secondResult.getStatus().reason(), "bad thing happened"); @@ -160,7 +164,7 @@ TEST(RouterStageSkipTest, SkipStagePropagatesViewDefinition) { auto skipStage = stdx::make_unique(std::move(mockStage), 3); - auto result = skipStage->next(); + auto result = skipStage->next(nullptr); ASSERT_OK(result.getStatus()); ASSERT(!result.getValue().getResult()); ASSERT(result.getValue().getViewDefinition()); @@ -181,16 +185,16 @@ TEST(RouterStageSkipTest, SkipStageToleratesMidStreamEOF) { auto skipStage = stdx::make_unique(std::move(mockStage), 2); - auto firstResult = skipStage->next(); + auto firstResult = skipStage->next(nullptr); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().isEOF()); - auto secondResult = skipStage->next(); + auto secondResult = skipStage->next(nullptr); ASSERT_OK(secondResult.getStatus()); ASSERT(secondResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 3)); - auto thirdResult = skipStage->next(); + auto thirdResult = skipStage->next(nullptr); ASSERT_OK(thirdResult.getStatus()); ASSERT(thirdResult.getValue().isEOF()); } @@ -205,19 +209,19 @@ TEST(RouterStageSkipTest, SkipStageRemotesExhausted) { auto skipStage = stdx::make_unique(std::move(mockStage), 1); ASSERT_TRUE(skipStage->remotesExhausted()); - auto firstResult = skipStage->next(); + auto firstResult = skipStage->next(nullptr); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 2)); ASSERT_TRUE(skipStage->remotesExhausted()); - auto secondResult = skipStage->next(); + auto secondResult = skipStage->next(nullptr); ASSERT_OK(secondResult.getStatus()); ASSERT(secondResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 3)); ASSERT_TRUE(skipStage->remotesExhausted()); - auto thirdResult = skipStage->next(); + auto thirdResult = skipStage->next(nullptr); ASSERT_OK(thirdResult.getStatus()); ASSERT(thirdResult.getValue().isEOF()); ASSERT_TRUE(skipStage->remotesExhausted()); diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp index 13477f1e14e..1e3b7d03306 100644 --- a/src/mongo/s/query/store_possible_cursor.cpp +++ b/src/mongo/s/query/store_possible_cursor.cpp @@ -39,7 +39,8 @@ namespace mongo { -StatusWith storePossibleCursor(const HostAndPort& server, +StatusWith storePossibleCursor(OperationContext* txn, + const HostAndPort& server, const BSONObj& cmdResult, const NamespaceString& requestedNss, executor::TaskExecutor* executor, @@ -61,10 +62,11 @@ StatusWith storePossibleCursor(const HostAndPort& server, params.remotes.emplace_back(server, incomingCursorResponse.getValue().getCursorId()); - auto ccc = ClusterClientCursorImpl::make(executor, std::move(params)); + auto ccc = ClusterClientCursorImpl::make(txn, executor, std::move(params)); auto clusterCursorId = - cursorManager->registerCursor(ccc.releaseCursor(), + cursorManager->registerCursor(txn, + ccc.releaseCursor(), requestedNss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal); diff --git a/src/mongo/s/query/store_possible_cursor.h b/src/mongo/s/query/store_possible_cursor.h index e290556f7ea..f06c959b41c 100644 --- a/src/mongo/s/query/store_possible_cursor.h +++ b/src/mongo/s/query/store_possible_cursor.h @@ -29,6 +29,7 @@ #pragma once #include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" namespace mongo { @@ -56,7 +57,8 @@ class TaskExecutor; * BSONObj response document describing the newly-created cursor, which is suitable for returning to * the client. */ -StatusWith storePossibleCursor(const HostAndPort& server, +StatusWith storePossibleCursor(OperationContext* txn, + const HostAndPort& server, const BSONObj& cmdResult, const NamespaceString& requestedNss, executor::TaskExecutor* executor, diff --git a/src/mongo/s/query/store_possible_cursor_test.cpp b/src/mongo/s/query/store_possible_cursor_test.cpp index 7e7b5d671f0..d542e1ee760 100644 --- a/src/mongo/s/query/store_possible_cursor_test.cpp +++ b/src/mongo/s/query/store_possible_cursor_test.cpp @@ -62,7 +62,8 @@ TEST_F(StorePossibleCursorTest, ReturnsValidCursorResponse) { std::vector batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; CursorResponse cursorResponse(nss, CursorId(0), batch); auto outgoingCursorResponse = - storePossibleCursor(hostAndPort, + storePossibleCursor(nullptr, // OperationContext* + hostAndPort, cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse), nss, nullptr, // TaskExecutor @@ -80,7 +81,8 @@ TEST_F(StorePossibleCursorTest, ReturnsValidCursorResponse) { // Test that storePossibleCursor() propagates an error if it cannot parse the cursor response. TEST_F(StorePossibleCursorTest, FailsGracefullyOnBadCursorResponseDocument) { - auto outgoingCursorResponse = storePossibleCursor(hostAndPort, + auto outgoingCursorResponse = storePossibleCursor(nullptr, // OperationContext* + hostAndPort, fromjson("{ok: 1, cursor: {}}"), nss, nullptr, // TaskExecutor @@ -94,7 +96,8 @@ TEST_F(StorePossibleCursorTest, FailsGracefullyOnBadCursorResponseDocument) { TEST_F(StorePossibleCursorTest, PassesUpCommandResultIfItDoesNotDescribeACursor) { BSONObj notACursorObj = BSON("not" << "cursor"); - auto outgoingCursorResponse = storePossibleCursor(hostAndPort, + auto outgoingCursorResponse = storePossibleCursor(nullptr, // OperationContext* + hostAndPort, notACursorObj, nss, nullptr, // TaskExecutor -- cgit v1.2.1