author     Esha Maharishi <esha.maharishi@mongodb.com>  2017-02-09 15:08:31 -0500
committer  Esha Maharishi <esha.maharishi@mongodb.com>  2017-02-09 18:19:26 -0500
commit     31e5c31a79b2043d0fa4288c0435fdfce4348343 (patch)
tree       8c02d5cc58ea067dbc5af6722eccdf16707a9b9c /src/mongo/s/query
parent     0e3e4b1dbb3fa475abc29033171e26d194028391 (diff)
download   mongo-31e5c31a79b2043d0fa4288c0435fdfce4348343.tar.gz
SERVER-27965 thread OperationContext* down through ClusterClientCursor's next() and kill() methods
Diffstat (limited to 'src/mongo/s/query')
-rw-r--r--  src/mongo/s/query/async_results_merger.cpp              46
-rw-r--r--  src/mongo/s/query/async_results_merger.h                 16
-rw-r--r--  src/mongo/s/query/async_results_merger_test.cpp         168
-rw-r--r--  src/mongo/s/query/cluster_client_cursor.h                11
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_impl.cpp         24
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_impl.h           12
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_impl_test.cpp    26
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_mock.cpp          9
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_mock.h            6
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_params.h          4
-rw-r--r--  src/mongo/s/query/cluster_cursor_manager.cpp             23
-rw-r--r--  src/mongo/s/query/cluster_cursor_manager.h               13
-rw-r--r--  src/mongo/s/query/cluster_cursor_manager_test.cpp       162
-rw-r--r--  src/mongo/s/query/cluster_find.cpp                        9
-rw-r--r--  src/mongo/s/query/router_exec_stage.h                    11
-rw-r--r--  src/mongo/s/query/router_stage_limit.cpp                 12
-rw-r--r--  src/mongo/s/query/router_stage_limit.h                    6
-rw-r--r--  src/mongo/s/query/router_stage_limit_test.cpp            36
-rw-r--r--  src/mongo/s/query/router_stage_merge.cpp                 12
-rw-r--r--  src/mongo/s/query/router_stage_merge.h                    6
-rw-r--r--  src/mongo/s/query/router_stage_mock.cpp                   8
-rw-r--r--  src/mongo/s/query/router_stage_mock.h                     6
-rw-r--r--  src/mongo/s/query/router_stage_remove_sortkey.cpp        12
-rw-r--r--  src/mongo/s/query/router_stage_remove_sortkey.h           6
-rw-r--r--  src/mongo/s/query/router_stage_remove_sortkey_test.cpp   32
-rw-r--r--  src/mongo/s/query/router_stage_skip.cpp                  14
-rw-r--r--  src/mongo/s/query/router_stage_skip.h                     6
-rw-r--r--  src/mongo/s/query/router_stage_skip_test.cpp             40
-rw-r--r--  src/mongo/s/query/store_possible_cursor.cpp               8
-rw-r--r--  src/mongo/s/query/store_possible_cursor.h                 4
-rw-r--r--  src/mongo/s/query/store_possible_cursor_test.cpp          9
31 files changed, 374 insertions, 383 deletions
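
The diffs below follow one pattern throughout: the OperationContext* that used to be cached on the cursor (set via setOperationContext() and stored in ClusterClientCursorParams::txn) is now passed explicitly to next(), kill(), and nextEvent(). The following is a minimal standalone sketch of the before/after calling convention, using placeholder types rather than the real mongo classes:

// Minimal standalone sketch of the API change; OperationContext,
// ClusterQueryResult, and the cursor types below are simplified stand-ins,
// not the actual mongo classes.
#include <iostream>

struct OperationContext {};        // stand-in for mongo::OperationContext
struct ClusterQueryResult {};      // stand-in for mongo::ClusterQueryResult

// Before: the cursor cached a pointer that callers had to keep in sync.
struct OldStyleCursor {
    void setOperationContext(OperationContext* txn) { _txn = txn; }
    ClusterQueryResult next() { /* uses the cached _txn for remote requests */ return {}; }
    void kill() { /* uses the cached _txn */ }
    OperationContext* _txn = nullptr;
};

// After: the caller's OperationContext is passed explicitly on each call.
struct NewStyleCursor {
    ClusterQueryResult next(OperationContext* txn) { /* uses txn directly */ return {}; }
    void kill(OperationContext* txn) { /* uses txn directly */ }
};

int main() {
    OperationContext opCtx;

    OldStyleCursor oldCursor;
    oldCursor.setOperationContext(&opCtx);  // easy to forget or leave stale
    oldCursor.next();
    oldCursor.kill();
    oldCursor.setOperationContext(nullptr); // had to be reset on check-in

    NewStyleCursor newCursor;
    newCursor.next(&opCtx);                 // context is always the caller's
    newCursor.kill(&opCtx);
    std::cout << "ok\n";
    return 0;
}
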
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 95223473d5e..1db4a63c79a 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -114,11 +114,6 @@ Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
return Status::OK();
}
-void AsyncResultsMerger::setOperationContext(OperationContext* txn) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _params.txn = txn;
-}
-
bool AsyncResultsMerger::ready() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
return ready_inlock();
@@ -258,7 +253,7 @@ ClusterQueryResult AsyncResultsMerger::nextReadyUnsorted() {
return {};
}
-Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) {
+Status AsyncResultsMerger::askForNextBatch_inlock(OperationContext* txn, size_t remoteIndex) {
auto& remote = _remotes[remoteIndex];
invariant(!remote.cbHandle.isValid());
@@ -295,16 +290,16 @@ Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) {
cmdObj = *remote.initialCmdObj;
}
- executor::RemoteCommandRequest request(remote.getTargetHost(),
- _params.nsString.db().toString(),
- cmdObj,
- _metadataObj,
- _params.txn);
+ executor::RemoteCommandRequest request(
+ remote.getTargetHost(), _params.nsString.db().toString(), cmdObj, _metadataObj, txn);
- auto callbackStatus = _executor->scheduleRemoteCommand(
- request,
- stdx::bind(
- &AsyncResultsMerger::handleBatchResponse, this, stdx::placeholders::_1, remoteIndex));
+ auto callbackStatus =
+ _executor->scheduleRemoteCommand(request,
+ stdx::bind(&AsyncResultsMerger::handleBatchResponse,
+ this,
+ stdx::placeholders::_1,
+ txn,
+ remoteIndex));
if (!callbackStatus.isOK()) {
return callbackStatus.getStatus();
}
@@ -321,7 +316,8 @@ Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) {
* 2. Remotes that already have some result will have a non-empty buffer.
* 3. Remotes that reached maximum retries will be in 'exhausted' state.
*/
-StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent() {
+StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent(
+ OperationContext* txn) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_lifecycleState != kAlive) {
@@ -349,7 +345,7 @@ StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent()
// If we already have established a cursor with this remote, and there is no outstanding
// request for which we have a valid callback handle, then schedule work to retrieve the
// next batch.
- auto nextBatchStatus = askForNextBatch_inlock(i);
+ auto nextBatchStatus = askForNextBatch_inlock(txn, i);
if (!nextBatchStatus.isOK()) {
return nextBatchStatus;
}
@@ -394,7 +390,9 @@ StatusWith<CursorResponse> AsyncResultsMerger::parseCursorResponse(const BSONObj
}
void AsyncResultsMerger::handleBatchResponse(
- const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, size_t remoteIndex) {
+ const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData,
+ OperationContext* txn,
+ size_t remoteIndex) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
auto& remote = _remotes[remoteIndex];
@@ -428,7 +426,7 @@ void AsyncResultsMerger::handleBatchResponse(
// If the event handle is invalid, then the executor is in the middle of shutting down,
// and we can't schedule any more work for it to complete.
if (_killCursorsScheduledEvent.isValid()) {
- scheduleKillCursors_inlock();
+ scheduleKillCursors_inlock(txn);
_executor->signalEvent(_killCursorsScheduledEvent);
}
@@ -583,7 +581,7 @@ void AsyncResultsMerger::handleBatchResponse(
// We do not ask for the next batch if the cursor is tailable, as batches received from remote
// tailable cursors should be passed through to the client without asking for more batches.
if (!_params.isTailable && !remote.hasNext() && !remote.exhausted()) {
- remote.status = askForNextBatch_inlock(remoteIndex);
+ remote.status = askForNextBatch_inlock(txn, remoteIndex);
if (!remote.status.isOK()) {
return;
}
@@ -614,7 +612,7 @@ bool AsyncResultsMerger::haveOutstandingBatchRequests_inlock() {
return false;
}
-void AsyncResultsMerger::scheduleKillCursors_inlock() {
+void AsyncResultsMerger::scheduleKillCursors_inlock(OperationContext* txn) {
invariant(_lifecycleState == kKillStarted);
invariant(_killCursorsScheduledEvent.isValid());
@@ -625,7 +623,7 @@ void AsyncResultsMerger::scheduleKillCursors_inlock() {
BSONObj cmdObj = KillCursorsRequest(_params.nsString, {*remote.cursorId}).toBSON();
executor::RemoteCommandRequest request(
- remote.getTargetHost(), _params.nsString.db().toString(), cmdObj, _params.txn);
+ remote.getTargetHost(), _params.nsString.db().toString(), cmdObj, txn);
_executor->scheduleRemoteCommand(
request,
@@ -639,7 +637,7 @@ void AsyncResultsMerger::handleKillCursorsResponse(
// We just ignore any killCursors command responses.
}
-executor::TaskExecutor::EventHandle AsyncResultsMerger::kill() {
+executor::TaskExecutor::EventHandle AsyncResultsMerger::kill(OperationContext* txn) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_killCursorsScheduledEvent.isValid()) {
invariant(_lifecycleState != kAlive);
@@ -665,7 +663,7 @@ executor::TaskExecutor::EventHandle AsyncResultsMerger::kill() {
// remotes now. Otherwise, we have to wait until all responses are back, and then we can kill
// the remote cursors.
if (!haveOutstandingBatchRequests_inlock()) {
- scheduleKillCursors_inlock();
+ scheduleKillCursors_inlock(txn);
_lifecycleState = kKillComplete;
_executor->signalEvent(_killCursorsScheduledEvent);
}
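
In the hunks above, the OperationContext* flows from nextEvent() into askForNextBatch_inlock(), which attaches it to the RemoteCommandRequest and also binds it into the handleBatchResponse callback so a retry can reuse it. Below is a simplified, self-contained illustration of that callback-binding pattern; std::bind and the placeholder types stand in for stdx::bind and the executor machinery, so this is a sketch, not the real scheduling code.

// Sketch of binding an OperationContext* into a response callback so a retry
// can reuse it. All types here are simplified placeholders.
#include <cstddef>
#include <functional>
#include <iostream>

struct OperationContext {};
struct CallbackArgs { int responseCode = 0; };

class Merger {
public:
    void scheduleBatch(OperationContext* txn, std::size_t remoteIndex) {
        // Bind 'this', the txn, and the remote index; the executor would later
        // supply the CallbackArgs (placeholder _1).
        auto cb = std::bind(&Merger::handleBatchResponse, this,
                            std::placeholders::_1, txn, remoteIndex);
        cb(CallbackArgs{200});  // simulate the executor invoking the callback
    }

private:
    void handleBatchResponse(const CallbackArgs& cbData,
                             OperationContext* txn,
                             std::size_t remoteIndex) {
        std::cout << "remote " << remoteIndex << " responded "
                  << cbData.responseCode
                  << (txn ? " with" : " without") << " an OperationContext\n";
        // The real code would call askForNextBatch_inlock(txn, remoteIndex)
        // here when more results are needed.
    }
};

int main() {
    OperationContext opCtx;
    Merger merger;
    merger.scheduleBatch(&opCtx, 0);
    return 0;
}
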
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index fb3cd3262c5..1bee485e09e 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -102,13 +102,6 @@ public:
Status setAwaitDataTimeout(Milliseconds awaitDataTimeout);
/**
- * Update the operation context for remote requests.
- *
- * Network requests depend on having a valid operation context for user initiated actions.
- */
- void setOperationContext(OperationContext* txn);
-
- /**
* Returns true if there is no need to schedule remote work in order to take the next action.
* This means that either
* --there is a buffered result which we can return,
@@ -161,7 +154,7 @@ public:
* the caller should call nextEvent() to retry the request on the hosts that errored. If
* ready() is true, then either the error was not retriable or it has exhausted max retries.
*/
- StatusWith<executor::TaskExecutor::EventHandle> nextEvent();
+ StatusWith<executor::TaskExecutor::EventHandle> nextEvent(OperationContext* txn);
/**
* Starts shutting down this ARM. Returns a handle to an event which is signaled when this
@@ -176,7 +169,7 @@ public:
*
* May be called multiple times (idempotent).
*/
- executor::TaskExecutor::EventHandle kill();
+ executor::TaskExecutor::EventHandle kill(OperationContext* txn);
private:
/**
@@ -298,7 +291,7 @@ private:
*
* Returns success if the command to retrieve the next batch was scheduled successfully.
*/
- Status askForNextBatch_inlock(size_t remoteIndex);
+ Status askForNextBatch_inlock(OperationContext* txn, size_t remoteIndex);
/**
* Checks whether or not the remote cursors are all exhausted.
@@ -329,6 +322,7 @@ private:
* buffered.
*/
void handleBatchResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData,
+ OperationContext* txn,
size_t remoteIndex);
/**
@@ -348,7 +342,7 @@ private:
/**
* Schedules a killCursors command to be run on all remote hosts that have open cursors.
*/
- void scheduleKillCursors_inlock();
+ void scheduleKillCursors_inlock(OperationContext* txn);
// Not owned here.
executor::TaskExecutor* _executor;
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index 94a6acf31ec..d9bd5c058ac 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -228,7 +228,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFind) {
makeCursorFromFindCmd(findCmd, kTestShardIds);
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
ASSERT_FALSE(arm->remotesExhausted());
@@ -276,7 +276,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) {
makeCursorFromFindCmd(findCmd, kTestShardIds);
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
std::vector<CursorResponse> responses;
@@ -304,7 +304,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) {
ASSERT_BSONOBJ_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent());
+ readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
ASSERT_FALSE(arm->remotesExhausted());
@@ -330,7 +330,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) {
ASSERT_BSONOBJ_EQ(fromjson("{_id: 9}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent());
+ readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
ASSERT_FALSE(arm->remotesExhausted());
@@ -353,7 +353,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindSorted) {
makeCursorFromFindCmd(findCmd, kTestShardIds);
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
std::vector<CursorResponse> responses;
@@ -396,7 +396,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMoreSorted) {
makeCursorFromFindCmd(findCmd, kTestShardIds);
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
std::vector<CursorResponse> responses;
@@ -426,7 +426,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMoreSorted) {
*unittest::assertGet(arm->nextReady()).getResult());
ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent());
+ readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
responses.clear();
@@ -448,7 +448,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMoreSorted) {
*unittest::assertGet(arm->nextReady()).getResult());
ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent());
+ readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
responses.clear();
@@ -477,7 +477,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindCompoundSortKey) {
makeCursorFromFindCmd(findCmd, kTestShardIds);
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
std::vector<CursorResponse> responses;
@@ -520,7 +520,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindSortedButNoSortKey) {
makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
// Parsing the batch results in an error because the sort key is missing.
@@ -536,7 +536,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindSortedButNoSortKey) {
ASSERT_EQ(statusWithNext.getStatus().code(), ErrorCodes::InternalError);
// Required to kill the 'arm' on error before destruction.
- auto killEvent = arm->kill();
+ auto killEvent = arm->kill(nullptr);
executor()->waitForEvent(killEvent);
}
@@ -549,7 +549,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindInitialBatchSizeIsZero) {
makeCursorFromFindCmd(findCmd, {kTestShardIds[0], kTestShardIds[1]}, getMoreBatchSize);
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
// Both shards give back empty responses. Second shard doesn't have any results so it
@@ -574,7 +574,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindInitialBatchSizeIsZero) {
ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent());
+ readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
// The shard responds with another empty batch but leaves the cursor open. It probably shouldn't
@@ -604,7 +604,7 @@ TEST_F(AsyncResultsMergerTest, ReceivedViewDefinitionFromShard) {
makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
std::string inputNs = "views_sharded.coll";
@@ -634,7 +634,7 @@ TEST_F(AsyncResultsMergerTest, ExistingCursors) {
makeCursorFromExistingCursors({{kTestShardHosts[0], 5}, {kTestShardHosts[1], 6}});
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
std::vector<CursorResponse> responses;
@@ -664,7 +664,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
makeCursorFromFindCmd(findCmd, {kTestShardIds[0], kTestShardIds[0]});
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
// Both shards respond with the first batch.
@@ -686,7 +686,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
ASSERT_BSONOBJ_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent());
+ readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
// When we ask the shards for their next batch, the first shard responds and the second shard
@@ -705,7 +705,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
ASSERT_BSONOBJ_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent());
+ readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
// We can continue to return results from first shard, while second shard remains unresponsive.
@@ -724,7 +724,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
// Kill cursor before deleting it, as the second remote cursor has not been exhausted. We don't
// wait on 'killEvent' here, as the blackholed request's callback will only run on shutdown of
// the network interface.
- auto killEvent = arm->kill();
+ auto killEvent = arm->kill(nullptr);
ASSERT_TRUE(killEvent.isValid());
}
@@ -733,7 +733,7 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) {
makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
std::vector<CursorResponse> responses;
@@ -751,7 +751,7 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) {
ASSERT_BSONOBJ_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent());
+ readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
responses.clear();
@@ -766,7 +766,7 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) {
ASSERT(!arm->nextReady().isOK());
// Required to kill the 'arm' on error before destruction.
- auto killEvent = arm->kill();
+ auto killEvent = arm->kill(nullptr);
executor()->waitForEvent(killEvent);
}
@@ -775,7 +775,7 @@ TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) {
makeCursorFromFindCmd(findCmd, kTestShardIds);
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
@@ -793,7 +793,7 @@ TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) {
ASSERT(!statusWithNext.isOK());
// Required to kill the 'arm' on error before destruction.
- auto killEvent = arm->kill();
+ auto killEvent = arm->kill(nullptr);
executor()->waitForEvent(killEvent);
}
@@ -802,7 +802,7 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) {
makeCursorFromFindCmd(findCmd, kTestShardIds);
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
std::vector<CursorResponse> responses;
@@ -822,7 +822,7 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) {
ASSERT_EQ(statusWithNext.getStatus().reason(), "bad thing happened");
// Required to kill the 'arm' on error before destruction.
- auto killEvent = arm->kill();
+ auto killEvent = arm->kill(nullptr);
executor()->waitForEvent(killEvent);
}
@@ -831,10 +831,10 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) {
makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
// Error to call nextEvent() before the previous event is signaled.
- ASSERT_NOT_OK(arm->nextEvent().getStatus());
+ ASSERT_NOT_OK(arm->nextEvent(nullptr).getStatus());
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
@@ -850,7 +850,7 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) {
ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
// Required to kill the 'arm' on error before destruction.
- auto killEvent = arm->kill();
+ auto killEvent = arm->kill(nullptr);
executor()->waitForEvent(killEvent);
}
@@ -858,8 +858,8 @@ TEST_F(AsyncResultsMergerTest, NextEventAfterTaskExecutorShutdown) {
BSONObj findCmd = fromjson("{find: 'testcoll'}");
makeCursorFromFindCmd(findCmd, kTestShardIds);
executor()->shutdown();
- ASSERT_EQ(ErrorCodes::ShutdownInProgress, arm->nextEvent().getStatus());
- auto killEvent = arm->kill();
+ ASSERT_EQ(ErrorCodes::ShutdownInProgress, arm->nextEvent(nullptr).getStatus());
+ auto killEvent = arm->kill(nullptr);
ASSERT_FALSE(killEvent.isValid());
}
@@ -869,13 +869,13 @@ TEST_F(AsyncResultsMergerTest, KillAfterTaskExecutorShutdownWithOutstandingBatch
// Make a request to the shard that will never get answered.
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
blackHoleNextRequest();
// Executor shuts down before a response is received.
executor()->shutdown();
- auto killEvent = arm->kill();
+ auto killEvent = arm->kill(nullptr);
ASSERT_FALSE(killEvent.isValid());
}
@@ -884,7 +884,7 @@ TEST_F(AsyncResultsMergerTest, KillNoBatchesRequested) {
makeCursorFromFindCmd(findCmd, kTestShardIds);
ASSERT_FALSE(arm->ready());
- auto killedEvent = arm->kill();
+ auto killedEvent = arm->kill(nullptr);
// Killed cursors are considered ready, but return an error when you try to receive the next
// doc.
@@ -899,7 +899,7 @@ TEST_F(AsyncResultsMergerTest, KillAllBatchesReceived) {
makeCursorFromFindCmd(findCmd, kTestShardIds);
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
std::vector<CursorResponse> responses;
@@ -912,7 +912,7 @@ TEST_F(AsyncResultsMergerTest, KillAllBatchesReceived) {
scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
// Kill should be able to return right away if there are no pending batches.
- auto killedEvent = arm->kill();
+ auto killedEvent = arm->kill(nullptr);
ASSERT_TRUE(arm->ready());
ASSERT_NOT_OK(arm->nextReady().getStatus());
executor()->waitForEvent(killedEvent);
@@ -923,7 +923,7 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) {
makeCursorFromFindCmd(findCmd, kTestShardIds);
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
std::vector<CursorResponse> responses;
@@ -932,7 +932,7 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) {
scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
// Kill event will only be signalled once the pending batches are received.
- auto killedEvent = arm->kill();
+ auto killedEvent = arm->kill(nullptr);
// After the kill, the ARM waits for outstanding batches to come back. This ensures that we
// receive cursor ids for any established remote cursors, and can clean them up by issuing
@@ -963,7 +963,7 @@ TEST_F(AsyncResultsMergerTest, KillOutstandingGetMore) {
makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
std::vector<CursorResponse> responses;
@@ -979,10 +979,10 @@ TEST_F(AsyncResultsMergerTest, KillOutstandingGetMore) {
// This will schedule a getMore on cursor id 123.
ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent());
+ readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
- auto killedEvent = arm->kill();
+ auto killedEvent = arm->kill(nullptr);
// The kill can't complete until the getMore response is received.
responses.clear();
@@ -1010,7 +1010,7 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) {
makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
std::vector<CursorResponse> responses;
@@ -1018,10 +1018,10 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) {
responses.emplace_back(_nss, CursorId(1), batch1);
scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
- auto killedEvent = arm->kill();
+ auto killedEvent = arm->kill(nullptr);
// Attempting to schedule more network operations on a killed arm is an error.
- ASSERT_NOT_OK(arm->nextEvent().getStatus());
+ ASSERT_NOT_OK(arm->nextEvent(nullptr).getStatus());
executor()->waitForEvent(killedEvent);
}
@@ -1029,9 +1029,9 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) {
TEST_F(AsyncResultsMergerTest, KillCalledTwice) {
BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}");
makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
- auto killedEvent1 = arm->kill();
+ auto killedEvent1 = arm->kill(nullptr);
ASSERT(killedEvent1.isValid());
- auto killedEvent2 = arm->kill();
+ auto killedEvent2 = arm->kill(nullptr);
ASSERT(killedEvent2.isValid());
executor()->waitForEvent(killedEvent1);
executor()->waitForEvent(killedEvent2);
@@ -1042,7 +1042,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) {
makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
std::vector<CursorResponse> responses;
@@ -1062,7 +1062,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) {
ASSERT_FALSE(arm->remotesExhausted());
ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent());
+ readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
responses.clear();
@@ -1079,7 +1079,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) {
ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
ASSERT_FALSE(arm->remotesExhausted());
- auto killedEvent = arm->kill();
+ auto killedEvent = arm->kill(nullptr);
executor()->waitForEvent(killedEvent);
}
@@ -1088,7 +1088,7 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) {
makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
// Remote responds with an empty batch and a non-zero cursor id.
@@ -1104,7 +1104,7 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) {
ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
ASSERT_FALSE(arm->remotesExhausted());
- auto killedEvent = arm->kill();
+ auto killedEvent = arm->kill(nullptr);
executor()->waitForEvent(killedEvent);
}
@@ -1113,7 +1113,7 @@ TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) {
makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
// Remote responds with an empty batch and a zero cursor id.
@@ -1135,7 +1135,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) {
makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
std::vector<CursorResponse> responses;
@@ -1155,7 +1155,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) {
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}")};
responses.emplace_back(_nss, CursorId(0), batch2);
- readyEvent = unittest::assertGet(arm->nextEvent());
+ readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
BSONObj scheduledCmd = getFirstPendingRequest().cmdObj;
auto request = GetMoreRequest::parseFromBSON("anydbname", scheduledCmd);
@@ -1178,7 +1178,7 @@ TEST_F(AsyncResultsMergerTest, SendsSecondaryOkAsMetadata) {
findCmd, {kTestShardIds[0]}, boost::none, ReadPreferenceSetting(ReadPreference::Nearest));
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
BSONObj cmdRequestMetadata = getFirstPendingRequest().metadata;
@@ -1201,7 +1201,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
makeCursorFromFindCmd(findCmd, kTestShardIds);
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
// An unretriable error occurs with the first host.
@@ -1224,7 +1224,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
ASSERT_BSONOBJ_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent());
+ readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
// Now the second host becomes unreachable. We should still be willing to return results from
@@ -1243,7 +1243,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) {
ASSERT_BSONOBJ_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent());
+ readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
// Once the last reachable shard indicates that its cursor is closed, we're done.
@@ -1263,7 +1263,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) {
makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
std::vector<CursorResponse> responses;
@@ -1278,7 +1278,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) {
ASSERT_BSONOBJ_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()).getResult());
ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent());
+ readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
// The lone host involved in this query returns an error. This should simply cause us to return
@@ -1295,19 +1295,19 @@ TEST_F(AsyncResultsMergerTest, RetryOnNotMasterNoSlaveOkSingleNode) {
ASSERT_FALSE(arm->ready());
// First and second attempts return an error.
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"});
executor()->waitForEvent(readyEvent);
ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent());
+ readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"});
executor()->waitForEvent(readyEvent);
ASSERT_FALSE(arm->ready());
// Third attempt succeeds.
- readyEvent = unittest::assertGet(arm->nextEvent());
+ readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 1}")};
responses.emplace_back(_nss, CursorId(0), batch);
@@ -1326,7 +1326,7 @@ TEST_F(AsyncResultsMergerTest, RetryOnNotMasterNoSlaveOkAllFailSingleNode) {
makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
// All attempts return an error (one attempt plus three retries)
@@ -1334,19 +1334,19 @@ TEST_F(AsyncResultsMergerTest, RetryOnNotMasterNoSlaveOkAllFailSingleNode) {
executor()->waitForEvent(readyEvent);
ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent());
+ readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"});
executor()->waitForEvent(readyEvent);
ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent());
+ readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"});
executor()->waitForEvent(readyEvent);
ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent());
+ readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"});
executor()->waitForEvent(readyEvent);
@@ -1356,7 +1356,7 @@ TEST_F(AsyncResultsMergerTest, RetryOnNotMasterNoSlaveOkAllFailSingleNode) {
ASSERT_EQ(status.getStatus().code(), ErrorCodes::NotMasterNoSlaveOk);
// Protocol is to kill the 'arm' on error before destruction.
- auto killEvent = arm->kill();
+ auto killEvent = arm->kill(nullptr);
executor()->waitForEvent(killEvent);
}
@@ -1365,7 +1365,7 @@ TEST_F(AsyncResultsMergerTest, RetryOnHostUnreachableAllowPartialResults) {
makeCursorFromFindCmd(findCmd, {kTestShardIds[0], kTestShardIds[1]});
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
// First host returns single result
@@ -1379,23 +1379,23 @@ TEST_F(AsyncResultsMergerTest, RetryOnHostUnreachableAllowPartialResults) {
executor()->waitForEvent(readyEvent);
ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent());
+ readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"});
executor()->waitForEvent(readyEvent);
ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent());
+ readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"});
executor()->waitForEvent(readyEvent);
ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent());
+ readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"});
executor()->waitForEvent(readyEvent);
- readyEvent = unittest::assertGet(arm->nextEvent());
+ readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1410,7 +1410,7 @@ TEST_F(AsyncResultsMergerTest, ErrorAtFirstAttemptAtSameTimeShouldEventuallyRetu
makeCursorFromFindCmd(findCmd, {kTestShardIds[0], kTestShardIds[1]});
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
// Both hosts return an error which indicates that the request should be retried.
@@ -1420,7 +1420,7 @@ TEST_F(AsyncResultsMergerTest, ErrorAtFirstAttemptAtSameTimeShouldEventuallyRetu
executor()->waitForEvent(readyEvent);
ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent());
+ readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
@@ -1458,7 +1458,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
std::vector<CursorResponse> responses;
@@ -1475,7 +1475,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
ASSERT_OK(arm->setAwaitDataTimeout(Milliseconds(789)));
ASSERT_FALSE(arm->ready());
- readyEvent = unittest::assertGet(arm->nextEvent());
+ readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
ASSERT_FALSE(arm->ready());
// Pending getMore request should include maxTimeMS.
@@ -1502,7 +1502,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) {
BSONObj findCmd = fromjson("{find: 'testcoll'}");
makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789)));
- auto killEvent = arm->kill();
+ auto killEvent = arm->kill(nullptr);
executor()->waitForEvent(killEvent);
}
@@ -1510,7 +1510,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutAwaitDataCantHaveMaxTime) {
BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}");
makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789)));
- auto killEvent = arm->kill();
+ auto killEvent = arm->kill(nullptr);
executor()->waitForEvent(killEvent);
}
@@ -1519,13 +1519,13 @@ TEST_F(AsyncResultsMergerTest, ShardCanErrorInBetweenReadyAndNextEvent) {
makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
ASSERT_FALSE(arm->ready());
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
scheduleErrorResponse({ErrorCodes::BadValue, "bad thing happened"});
- ASSERT_EQ(ErrorCodes::BadValue, arm->nextEvent().getStatus());
+ ASSERT_EQ(ErrorCodes::BadValue, arm->nextEvent(nullptr).getStatus());
// Required to kill the 'arm' on error before destruction.
- auto killEvent = arm->kill();
+ auto killEvent = arm->kill(nullptr);
executor()->waitForEvent(killEvent);
}
@@ -1536,11 +1536,11 @@ TEST_F(AsyncResultsMergerTest, RetryWhenShardHasRetriableErrorInBetweenReadyAndN
ASSERT_FALSE(arm->ready());
// First attempt returns a retriable error.
- auto readyEvent = unittest::assertGet(arm->nextEvent());
+ auto readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
scheduleErrorResponse({ErrorCodes::NotMasterNoSlaveOk, "not master and not slave"});
// We expect to be able to retrieve another event, and be waiting on the retry to succeed.
- readyEvent = unittest::assertGet(arm->nextEvent());
+ readyEvent = unittest::assertGet(arm->nextEvent(nullptr));
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
responses.emplace_back(_nss, CursorId(0), batch);
diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h
index c4bb1c9373e..46b92eab642 100644
--- a/src/mongo/s/query/cluster_client_cursor.h
+++ b/src/mongo/s/query/cluster_client_cursor.h
@@ -64,7 +64,7 @@ public:
*
* A non-ok status is returned in case of any error.
*/
- virtual StatusWith<ClusterQueryResult> next() = 0;
+ virtual StatusWith<ClusterQueryResult> next(OperationContext* txn) = 0;
/**
* Must be called before destruction to abandon a not-yet-exhausted cursor. If next() has
@@ -72,7 +72,7 @@ public:
*
* May block waiting for responses from remote hosts.
*/
- virtual void kill() = 0;
+ virtual void kill(OperationContext* txn) = 0;
/**
* Returns whether or not this cursor is tailing a capped collection on a shard.
@@ -108,13 +108,6 @@ public:
* the cursor is not tailable + awaitData).
*/
virtual Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) = 0;
-
- /**
- * Update the operation context for remote requests.
- *
- * Network requests depend on having a valid operation context for user initiated actions.
- */
- virtual void setOperationContext(OperationContext* txn) = 0;
};
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index 1498164f9ac..6d0a81be161 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -41,12 +41,13 @@
namespace mongo {
-ClusterClientCursorGuard::ClusterClientCursorGuard(std::unique_ptr<ClusterClientCursor> ccc)
- : _ccc(std::move(ccc)) {}
+ClusterClientCursorGuard::ClusterClientCursorGuard(OperationContext* txn,
+ std::unique_ptr<ClusterClientCursor> ccc)
+ : _txn(txn), _ccc(std::move(ccc)) {}
ClusterClientCursorGuard::~ClusterClientCursorGuard() {
if (_ccc && !_ccc->remotesExhausted()) {
- _ccc->kill();
+ _ccc->kill(_txn);
}
}
@@ -58,11 +59,12 @@ std::unique_ptr<ClusterClientCursor> ClusterClientCursorGuard::releaseCursor() {
return std::move(_ccc);
}
-ClusterClientCursorGuard ClusterClientCursorImpl::make(executor::TaskExecutor* executor,
+ClusterClientCursorGuard ClusterClientCursorImpl::make(OperationContext* txn,
+ executor::TaskExecutor* executor,
ClusterClientCursorParams&& params) {
std::unique_ptr<ClusterClientCursor> cursor(
new ClusterClientCursorImpl(executor, std::move(params)));
- return ClusterClientCursorGuard(std::move(cursor));
+ return ClusterClientCursorGuard(txn, std::move(cursor));
}
ClusterClientCursorImpl::ClusterClientCursorImpl(executor::TaskExecutor* executor,
@@ -72,7 +74,7 @@ ClusterClientCursorImpl::ClusterClientCursorImpl(executor::TaskExecutor* executo
ClusterClientCursorImpl::ClusterClientCursorImpl(std::unique_ptr<RouterStageMock> root)
: _root(std::move(root)) {}
-StatusWith<ClusterQueryResult> ClusterClientCursorImpl::next() {
+StatusWith<ClusterQueryResult> ClusterClientCursorImpl::next(OperationContext* txn) {
// First return stashed results, if there are any.
if (!_stash.empty()) {
auto front = std::move(_stash.front());
@@ -81,15 +83,15 @@ StatusWith<ClusterQueryResult> ClusterClientCursorImpl::next() {
return {front};
}
- auto next = _root->next();
+ auto next = _root->next(txn);
if (next.isOK() && !next.getValue().isEOF()) {
++_numReturnedSoFar;
}
return next;
}
-void ClusterClientCursorImpl::kill() {
- _root->kill();
+void ClusterClientCursorImpl::kill(OperationContext* txn) {
+ _root->kill(txn);
}
bool ClusterClientCursorImpl::isTailable() const {
@@ -122,10 +124,6 @@ Status ClusterClientCursorImpl::setAwaitDataTimeout(Milliseconds awaitDataTimeou
return _root->setAwaitDataTimeout(awaitDataTimeout);
}
-void ClusterClientCursorImpl::setOperationContext(OperationContext* txn) {
- return _root->setOperationContext(txn);
-}
-
std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan(
executor::TaskExecutor* executor, ClusterClientCursorParams&& params) {
const auto skip = params.skip;
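
The guard change above means the OperationContext* supplied to ClusterClientCursorImpl::make() is remembered by the RAII guard solely so that its destructor can clean up an unreleased cursor. A standalone sketch of that idea, with simplified stand-in types rather than the actual ClusterClientCursorGuard:

// RAII guard that keeps the caller's OperationContext* so its destructor can
// pass it to kill() if the cursor was never released. Simplified stand-ins,
// not the mongo classes.
#include <iostream>
#include <memory>
#include <utility>

struct OperationContext {};

struct Cursor {
    bool remotesExhausted() const { return false; }
    void kill(OperationContext* txn) {
        std::cout << "killing cursor " << (txn ? "with" : "without") << " txn\n";
    }
};

class CursorGuard {
public:
    CursorGuard(OperationContext* txn, std::unique_ptr<Cursor> cursor)
        : _txn(txn), _cursor(std::move(cursor)) {}

    ~CursorGuard() {
        // Clean up remote state if the cursor was never handed off.
        if (_cursor && !_cursor->remotesExhausted()) {
            _cursor->kill(_txn);
        }
    }

    std::unique_ptr<Cursor> releaseCursor() { return std::move(_cursor); }

private:
    OperationContext* _txn;
    std::unique_ptr<Cursor> _cursor;
};

int main() {
    OperationContext opCtx;
    {
        CursorGuard guard(&opCtx, std::make_unique<Cursor>());
        // Guard goes out of scope without releaseCursor(), so kill(&opCtx) runs.
    }
    return 0;
}
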
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h
index 71f018deec7..1974344abf5 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.h
+++ b/src/mongo/s/query/cluster_client_cursor_impl.h
@@ -50,7 +50,7 @@ class ClusterClientCursorGuard final {
MONGO_DISALLOW_COPYING(ClusterClientCursorGuard);
public:
- ClusterClientCursorGuard(std::unique_ptr<ClusterClientCursor> ccc);
+ ClusterClientCursorGuard(OperationContext* txn, std::unique_ptr<ClusterClientCursor> ccc);
/**
* If a cursor is owned, safely destroys the cursor, cleaning up remote cursor state if
@@ -74,6 +74,7 @@ public:
std::unique_ptr<ClusterClientCursor> releaseCursor();
private:
+ OperationContext* _txn;
std::unique_ptr<ClusterClientCursor> _ccc;
};
@@ -84,7 +85,8 @@ public:
/**
* Constructs a CCC whose safe cleanup is ensured by an RAII object.
*/
- static ClusterClientCursorGuard make(executor::TaskExecutor* executor,
+ static ClusterClientCursorGuard make(OperationContext* txn,
+ executor::TaskExecutor* executor,
ClusterClientCursorParams&& params);
/**
@@ -92,9 +94,9 @@ public:
*/
ClusterClientCursorImpl(std::unique_ptr<RouterStageMock> root);
- StatusWith<ClusterQueryResult> next() final;
+ StatusWith<ClusterQueryResult> next(OperationContext* txn) final;
- void kill() final;
+ void kill(OperationContext* txn) final;
bool isTailable() const final;
@@ -106,8 +108,6 @@ public:
Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
- void setOperationContext(OperationContext* txn) final;
-
private:
/**
* Constructs a cluster client cursor.
diff --git a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
index 0b87488e651..ff3a666551d 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
@@ -40,6 +40,10 @@ namespace mongo {
namespace {
+// Note: Though the next() method on RouterExecStage and its subclasses takes an OperationContext*,
+// these stages are mocked in this test using RouterStageMock. RouterStageMock does not actually use
+// the OperationContext, so we pass a nullptr OperationContext* to next() in these tests.
+
TEST(ClusterClientCursorImpl, NumReturnedSoFar) {
auto mockStage = stdx::make_unique<RouterStageMock>();
for (int i = 1; i < 10; ++i) {
@@ -51,13 +55,13 @@ TEST(ClusterClientCursorImpl, NumReturnedSoFar) {
ASSERT_EQ(cursor.getNumReturnedSoFar(), 0);
for (int i = 1; i < 10; ++i) {
- auto result = cursor.next();
+ auto result = cursor.next(nullptr);
ASSERT(result.isOK());
ASSERT_BSONOBJ_EQ(*result.getValue().getResult(), BSON("a" << i));
ASSERT_EQ(cursor.getNumReturnedSoFar(), i);
}
// Now check that if nothing is fetched the getNumReturnedSoFar stays the same.
- auto result = cursor.next();
+ auto result = cursor.next(nullptr);
ASSERT_OK(result.getStatus());
ASSERT_TRUE(result.getValue().isEOF());
ASSERT_EQ(cursor.getNumReturnedSoFar(), 9LL);
@@ -70,7 +74,7 @@ TEST(ClusterClientCursorImpl, QueueResult) {
ClusterClientCursorImpl cursor(std::move(mockStage));
- auto firstResult = cursor.next();
+ auto firstResult = cursor.next(nullptr);
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1));
@@ -78,22 +82,22 @@ TEST(ClusterClientCursorImpl, QueueResult) {
cursor.queueResult(BSON("a" << 2));
cursor.queueResult(BSON("a" << 3));
- auto secondResult = cursor.next();
+ auto secondResult = cursor.next(nullptr);
ASSERT_OK(secondResult.getStatus());
ASSERT(secondResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 2));
- auto thirdResult = cursor.next();
+ auto thirdResult = cursor.next(nullptr);
ASSERT_OK(thirdResult.getStatus());
ASSERT(thirdResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*thirdResult.getValue().getResult(), BSON("a" << 3));
- auto fourthResult = cursor.next();
+ auto fourthResult = cursor.next(nullptr);
ASSERT_OK(fourthResult.getStatus());
ASSERT(fourthResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*fourthResult.getValue().getResult(), BSON("a" << 4));
- auto fifthResult = cursor.next();
+ auto fifthResult = cursor.next(nullptr);
ASSERT_OK(fifthResult.getStatus());
ASSERT(fifthResult.getValue().isEOF());
@@ -114,7 +118,7 @@ TEST(ClusterClientCursorImpl, CursorPropagatesViewDefinition) {
ClusterClientCursorImpl cursor(std::move(mockStage));
- auto result = cursor.next();
+ auto result = cursor.next(nullptr);
ASSERT_OK(result.getStatus());
ASSERT(!result.getValue().getResult());
ASSERT(result.getValue().getViewDefinition());
@@ -130,19 +134,19 @@ TEST(ClusterClientCursorImpl, RemotesExhausted) {
ClusterClientCursorImpl cursor(std::move(mockStage));
ASSERT_TRUE(cursor.remotesExhausted());
- auto firstResult = cursor.next();
+ auto firstResult = cursor.next(nullptr);
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1));
ASSERT_TRUE(cursor.remotesExhausted());
- auto secondResult = cursor.next();
+ auto secondResult = cursor.next(nullptr);
ASSERT_OK(secondResult.getStatus());
ASSERT(secondResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 2));
ASSERT_TRUE(cursor.remotesExhausted());
- auto thirdResult = cursor.next();
+ auto thirdResult = cursor.next(nullptr);
ASSERT_OK(thirdResult.getStatus());
ASSERT_TRUE(thirdResult.getValue().isEOF());
ASSERT_TRUE(cursor.remotesExhausted());
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp
index bd4136ab7f2..82ada207939 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp
@@ -43,7 +43,7 @@ ClusterClientCursorMock::~ClusterClientCursorMock() {
invariant(_exhausted || _killed);
}
-StatusWith<ClusterQueryResult> ClusterClientCursorMock::next() {
+StatusWith<ClusterQueryResult> ClusterClientCursorMock::next(OperationContext* txn) {
invariant(!_killed);
if (_resultsQueue.empty()) {
@@ -66,7 +66,7 @@ long long ClusterClientCursorMock::getNumReturnedSoFar() const {
return _numReturnedSoFar;
}
-void ClusterClientCursorMock::kill() {
+void ClusterClientCursorMock::kill(OperationContext* txn) {
_killed = true;
if (_killCallback) {
_killCallback();
@@ -97,9 +97,4 @@ Status ClusterClientCursorMock::setAwaitDataTimeout(Milliseconds awaitDataTimeou
MONGO_UNREACHABLE;
}
-
-void ClusterClientCursorMock::setOperationContext(OperationContext* txn) {
- // Do nothing
-}
-
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h
index d9e0ba789e3..8820069b0e3 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.h
+++ b/src/mongo/s/query/cluster_client_cursor_mock.h
@@ -43,9 +43,9 @@ public:
~ClusterClientCursorMock();
- StatusWith<ClusterQueryResult> next() final;
+ StatusWith<ClusterQueryResult> next(OperationContext* txn) final;
- void kill() final;
+ void kill(OperationContext* txn) final;
bool isTailable() const final;
@@ -55,8 +55,6 @@ public:
Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
- void setOperationContext(OperationContext* txn) final;
-
/**
* Returns true unless marked as having non-exhausted remote cursors via
* markRemotesNotExhausted().
diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h
index fce3bcf12cb..5e21b25ea8b 100644
--- a/src/mongo/s/query/cluster_client_cursor_params.h
+++ b/src/mongo/s/query/cluster_client_cursor_params.h
@@ -137,10 +137,6 @@ struct ClusterClientCursorParams {
// Whether the client indicated that it is willing to receive partial results in the case of an
// unreachable host.
bool isAllowPartialResults = false;
-
- // OperationContext of the calling thread. Used to append Client dependent metadata to remote
- // requests.
- OperationContext* txn;
};
} // mongo
diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp
index 48d233239d9..2b4e68ac2cf 100644
--- a/src/mongo/s/query/cluster_cursor_manager.cpp
+++ b/src/mongo/s/query/cluster_cursor_manager.cpp
@@ -110,9 +110,9 @@ ClusterCursorManager::PinnedCursor& ClusterCursorManager::PinnedCursor::operator
return *this;
}
-StatusWith<ClusterQueryResult> ClusterCursorManager::PinnedCursor::next() {
+StatusWith<ClusterQueryResult> ClusterCursorManager::PinnedCursor::next(OperationContext* txn) {
invariant(_cursor);
- return _cursor->next();
+ return _cursor->next(txn);
}
bool ClusterCursorManager::PinnedCursor::isTailable() const {
@@ -152,11 +152,6 @@ Status ClusterCursorManager::PinnedCursor::setAwaitDataTimeout(Milliseconds awai
return _cursor->setAwaitDataTimeout(awaitDataTimeout);
}
-void ClusterCursorManager::PinnedCursor::setOperationContext(OperationContext* txn) {
- return _cursor->setOperationContext(txn);
-}
-
-
void ClusterCursorManager::PinnedCursor::returnAndKillCursor() {
invariant(_cursor);
@@ -192,6 +187,7 @@ void ClusterCursorManager::shutdown() {
}
StatusWith<CursorId> ClusterCursorManager::registerCursor(
+ OperationContext* txn,
std::unique_ptr<ClusterClientCursor> cursor,
const NamespaceString& nss,
CursorType cursorType,
@@ -203,7 +199,7 @@ StatusWith<CursorId> ClusterCursorManager::registerCursor(
if (_inShutdown) {
lk.unlock();
- cursor->kill();
+ cursor->kill(txn);
return Status(ErrorCodes::ShutdownInProgress,
"Cannot register new cursors as we are in the process of shutting down");
}
@@ -274,7 +270,6 @@ StatusWith<ClusterCursorManager::PinnedCursor> ClusterCursorManager::checkOutCur
}
entry->setLastActive(now);
- cursor->setOperationContext(txn);
// Note that pinning a cursor transfers ownership of the underlying ClusterClientCursor object
// to the pin; the CursorEntry is left with a null ClusterClientCursor.
@@ -289,9 +284,6 @@ void ClusterCursorManager::checkInCursor(std::unique_ptr<ClusterClientCursor> cu
invariant(cursor);
- // Reset OperationContext so that non-user initiated operations do not try to use an invalid
- // operation context
- cursor->setOperationContext(nullptr);
const bool remotesExhausted = cursor->remotesExhausted();
CursorEntry* entry = getEntry_inlock(nss, cursorId);
@@ -400,8 +392,11 @@ std::size_t ClusterCursorManager::reapZombieCursors() {
}
lk.unlock();
- zombieCursor.getValue()->setOperationContext(nullptr);
- zombieCursor.getValue()->kill();
+ // Pass a null OperationContext, because this call should not actually schedule any remote
+ // work: the cursor is already pending kill, meaning the killCursors commands are already
+ // being scheduled to be sent to the remote shard hosts. This method will just wait for them
+ // all to be scheduled.
+ zombieCursor.getValue()->kill(nullptr);
zombieCursor.getValue().reset();
lk.lock();
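
The cluster_cursor_manager.cpp changes drop the setOperationContext()/reset-to-nullptr dance around check-out and check-in: user-initiated calls supply their own context, and reapZombieCursors() passes nullptr because the killCursors work for a pending-kill cursor is already scheduled. A rough stand-in sketch of that calling discipline (simplified types, not the real ClusterCursorManager):

// Sketch of the per-call context discipline; Cursor is a placeholder type.
#include <iostream>

struct OperationContext {};

struct Cursor {
    void next(OperationContext* txn) {
        std::cout << "next " << (txn ? "with txn\n" : "without txn\n");
    }
    void kill(OperationContext* txn) {
        std::cout << "kill " << (txn ? "with txn\n" : "without txn\n");
    }
};

int main() {
    OperationContext opCtx;
    Cursor cursor;

    // User-initiated getMore: the caller's context is threaded through.
    cursor.next(&opCtx);

    // Background reaping of a cursor already marked kill-pending: no user
    // context exists and no new remote work should be scheduled, so nullptr
    // is acceptable here (mirrors reapZombieCursors above).
    cursor.kill(nullptr);
    return 0;
}
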
diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h
index ab7a874e69c..6126ef0757e 100644
--- a/src/mongo/s/query/cluster_cursor_manager.h
+++ b/src/mongo/s/query/cluster_cursor_manager.h
@@ -154,7 +154,7 @@ public:
*
* Can block.
*/
- StatusWith<ClusterQueryResult> next();
+ StatusWith<ClusterQueryResult> next(OperationContext* txn);
/**
* Returns whether or not the underlying cursor is tailing a capped collection. Cannot be
@@ -202,14 +202,6 @@ public:
*/
Status setAwaitDataTimeout(Milliseconds awaitDataTimeout);
-
- /**
- * Update the operation context for remote requests.
- *
- * Network requests depend on having a valid operation context for user initiated actions.
- */
- void setOperationContext(OperationContext* txn);
-
private:
// ClusterCursorManager is a friend so that its methods can call the PinnedCursor
// constructor declared below, which is private to prevent clients from calling it directly.
@@ -269,7 +261,8 @@ public:
*
* Does not block.
*/
- StatusWith<CursorId> registerCursor(std::unique_ptr<ClusterClientCursor> cursor,
+ StatusWith<CursorId> registerCursor(OperationContext* txn,
+ std::unique_ptr<ClusterClientCursor> cursor,
const NamespaceString& nss,
CursorType cursorType,
CursorLifetime cursorLifetime);
diff --git a/src/mongo/s/query/cluster_cursor_manager_test.cpp b/src/mongo/s/query/cluster_cursor_manager_test.cpp
index b12b332375b..1ff8bace8c5 100644
--- a/src/mongo/s/query/cluster_cursor_manager_test.cpp
+++ b/src/mongo/s/query/cluster_cursor_manager_test.cpp
@@ -109,17 +109,18 @@ TEST_F(ClusterCursorManagerTest, RegisterCursor) {
auto cursor = allocateMockCursor();
cursor->queueResult(BSON("a" << 1));
auto cursorId = assertGet(
- getManager()->registerCursor(std::move(cursor),
+ getManager()->registerCursor(nullptr,
+ std::move(cursor),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(pinnedCursor.getStatus());
- auto nextResult = pinnedCursor.getValue().next();
+ auto nextResult = pinnedCursor.getValue().next(nullptr);
ASSERT_OK(nextResult.getStatus());
ASSERT(nextResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(BSON("a" << 1), *nextResult.getValue().getResult());
- nextResult = pinnedCursor.getValue().next();
+ nextResult = pinnedCursor.getValue().next(nullptr);
ASSERT_OK(nextResult.getStatus());
ASSERT_TRUE(nextResult.getValue().isEOF());
}
@@ -127,7 +128,8 @@ TEST_F(ClusterCursorManagerTest, RegisterCursor) {
// Test that registering a cursor returns a non-zero cursor id.
TEST_F(ClusterCursorManagerTest, RegisterCursorReturnsNonZeroId) {
auto cursorId = assertGet(
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
@@ -139,18 +141,19 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorBasic) {
auto cursor = allocateMockCursor();
cursor->queueResult(BSON("a" << 1));
auto cursorId = assertGet(
- getManager()->registerCursor(std::move(cursor),
+ getManager()->registerCursor(nullptr,
+ std::move(cursor),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
auto checkedOutCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(checkedOutCursor.getStatus());
ASSERT_EQ(cursorId, checkedOutCursor.getValue().getCursorId());
- auto nextResult = checkedOutCursor.getValue().next();
+ auto nextResult = checkedOutCursor.getValue().next(nullptr);
ASSERT_OK(nextResult.getStatus());
ASSERT(nextResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(BSON("a" << 1), *nextResult.getValue().getResult());
- nextResult = checkedOutCursor.getValue().next();
+ nextResult = checkedOutCursor.getValue().next(nullptr);
ASSERT_OK(nextResult.getStatus());
ASSERT_TRUE(nextResult.getValue().isEOF());
}
@@ -164,7 +167,8 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorMultipleCursors) {
auto cursor = allocateMockCursor();
cursor->queueResult(BSON("a" << i));
cursorIds[i] = assertGet(
- getManager()->registerCursor(std::move(cursor),
+ getManager()->registerCursor(nullptr,
+ std::move(cursor),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
@@ -172,11 +176,11 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorMultipleCursors) {
for (int i = 0; i < numCursors; ++i) {
auto pinnedCursor = getManager()->checkOutCursor(nss, cursorIds[i], nullptr);
ASSERT_OK(pinnedCursor.getStatus());
- auto nextResult = pinnedCursor.getValue().next();
+ auto nextResult = pinnedCursor.getValue().next(nullptr);
ASSERT_OK(nextResult.getStatus());
ASSERT(nextResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(BSON("a" << i), *nextResult.getValue().getResult());
- nextResult = pinnedCursor.getValue().next();
+ nextResult = pinnedCursor.getValue().next(nullptr);
ASSERT_OK(nextResult.getStatus());
ASSERT_TRUE(nextResult.getValue().isEOF());
}
@@ -185,7 +189,8 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorMultipleCursors) {
// Test that checking out a pinned cursor returns an error with code ErrorCodes::CursorInUse.
TEST_F(ClusterCursorManagerTest, CheckOutCursorPinned) {
auto cursorId = assertGet(
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
@@ -198,7 +203,8 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorPinned) {
// Test that checking out a killed cursor returns an error with code ErrorCodes::CursorNotFound.
TEST_F(ClusterCursorManagerTest, CheckOutCursorKilled) {
auto cursorId = assertGet(
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
@@ -219,7 +225,8 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongNamespace) {
const NamespaceString correctNamespace("test.correct");
const NamespaceString incorrectNamespace("test.incorrect");
auto cursorId = assertGet(
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
correctNamespace,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
@@ -231,7 +238,8 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongNamespace) {
// even if there is an existing cursor with the same namespace but a different cursor id.
TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongCursorId) {
auto cursorId = assertGet(
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
@@ -243,7 +251,8 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongCursorId) {
// current time.
TEST_F(ClusterCursorManagerTest, CheckOutCursorUpdateActiveTime) {
auto cursorId = assertGet(
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
@@ -261,7 +270,8 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorUpdateActiveTime) {
// Test that killing a pinned cursor by id successfully kills the cursor.
TEST_F(ClusterCursorManagerTest, KillCursorBasic) {
auto cursorId = assertGet(
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
@@ -282,7 +292,8 @@ TEST_F(ClusterCursorManagerTest, KillCursorMultipleCursors) {
// Register cursors and populate 'cursorIds' with the returned cursor ids.
for (size_t i = 0; i < numCursors; ++i) {
cursorIds[i] = assertGet(
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
@@ -308,7 +319,8 @@ TEST_F(ClusterCursorManagerTest, KillCursorWrongNamespace) {
const NamespaceString correctNamespace("test.correct");
const NamespaceString incorrectNamespace("test.incorrect");
auto cursorId = assertGet(
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
correctNamespace,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
@@ -320,7 +332,8 @@ TEST_F(ClusterCursorManagerTest, KillCursorWrongNamespace) {
// even if there is an existing cursor with the same namespace but a different cursor id.
TEST_F(ClusterCursorManagerTest, KillCursorWrongCursorId) {
auto cursorId = assertGet(
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
@@ -330,7 +343,8 @@ TEST_F(ClusterCursorManagerTest, KillCursorWrongCursorId) {
// Test that killing all mortal expired cursors correctly kills a mortal expired cursor.
TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceBasic) {
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal);
@@ -344,7 +358,8 @@ TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceBasic) {
TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceSkipUnexpired) {
Date_t timeBeforeCursorCreation = getClockSource()->now();
getClockSource()->advance(Milliseconds(1));
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal);
@@ -356,7 +371,8 @@ TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceSkipUnexpired) {
// Test that killing all mortal expired cursors does not kill a cursor that is immortal.
TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceSkipImmortal) {
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Immortal);
@@ -376,7 +392,8 @@ TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceMultipleCursors)
if (i < numKilledCursorsExpected) {
cutoff = getClockSource()->now();
}
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal);
@@ -400,7 +417,8 @@ TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceMultipleCursors)
TEST_F(ClusterCursorManagerTest, KillAllCursors) {
const size_t numCursors = 10;
for (size_t i = 0; i < numCursors; ++i) {
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal);
@@ -419,7 +437,8 @@ TEST_F(ClusterCursorManagerTest, KillAllCursors) {
// cursor.
TEST_F(ClusterCursorManagerTest, ReapZombieCursorsBasic) {
auto cursorId = assertGet(
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
@@ -433,7 +452,8 @@ TEST_F(ClusterCursorManagerTest, ReapZombieCursorsBasic) {
// that is still pinned.
TEST_F(ClusterCursorManagerTest, ReapZombieCursorsSkipPinned) {
auto cursorId = assertGet(
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
@@ -446,7 +466,8 @@ TEST_F(ClusterCursorManagerTest, ReapZombieCursorsSkipPinned) {
// Test that reaping does not call kill() on the underlying ClusterClientCursor for cursors that
// haven't been killed.
TEST_F(ClusterCursorManagerTest, ReapZombieCursorsSkipNonZombies) {
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal);
@@ -464,7 +485,8 @@ TEST_F(ClusterCursorManagerTest, StatsInitAsZero) {
// Test that registering a sharded cursor updates the corresponding counter in stats().
TEST_F(ClusterCursorManagerTest, StatsRegisterShardedCursor) {
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceSharded,
ClusterCursorManager::CursorLifetime::Mortal);
@@ -473,7 +495,8 @@ TEST_F(ClusterCursorManagerTest, StatsRegisterShardedCursor) {
// Test that registering a not-sharded cursor updates the corresponding counter in stats().
TEST_F(ClusterCursorManagerTest, StatsRegisterNotShardedCursor) {
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal);
@@ -483,7 +506,8 @@ TEST_F(ClusterCursorManagerTest, StatsRegisterNotShardedCursor) {
// Test that checking out a cursor updates the pinned counter in stats().
TEST_F(ClusterCursorManagerTest, StatsPinCursor) {
auto cursorId =
- assertGet(getManager()->registerCursor(allocateMockCursor(),
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceSharded,
ClusterCursorManager::CursorLifetime::Mortal));
@@ -496,7 +520,8 @@ TEST_F(ClusterCursorManagerTest, StatsPinCursor) {
TEST_F(ClusterCursorManagerTest, StatsRegisterMultipleCursors) {
const size_t numShardedCursors = 10;
for (size_t i = 0; i < numShardedCursors; ++i) {
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceSharded,
ClusterCursorManager::CursorLifetime::Mortal);
@@ -505,7 +530,8 @@ TEST_F(ClusterCursorManagerTest, StatsRegisterMultipleCursors) {
}
const size_t numNotShardedCursors = 10;
for (size_t i = 0; i < numNotShardedCursors; ++i) {
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal);
@@ -517,7 +543,8 @@ TEST_F(ClusterCursorManagerTest, StatsRegisterMultipleCursors) {
// Test that killing a sharded cursor decrements the corresponding counter in stats().
TEST_F(ClusterCursorManagerTest, StatsKillShardedCursor) {
auto cursorId =
- assertGet(getManager()->registerCursor(allocateMockCursor(),
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceSharded,
ClusterCursorManager::CursorLifetime::Mortal));
@@ -529,7 +556,8 @@ TEST_F(ClusterCursorManagerTest, StatsKillShardedCursor) {
// Test that killing a not-sharded cursor decrements the corresponding counter in stats().
TEST_F(ClusterCursorManagerTest, StatsKillNotShardedCursor) {
auto cursorId = assertGet(
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
@@ -541,7 +569,8 @@ TEST_F(ClusterCursorManagerTest, StatsKillNotShardedCursor) {
// Test that killing a pinned cursor decrements the corresponding counter in stats().
TEST_F(ClusterCursorManagerTest, StatsKillPinnedCursor) {
auto cursorId =
- assertGet(getManager()->registerCursor(allocateMockCursor(),
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceSharded,
ClusterCursorManager::CursorLifetime::Mortal));
@@ -554,13 +583,14 @@ TEST_F(ClusterCursorManagerTest, StatsKillPinnedCursor) {
// Test that exhausting a sharded cursor decrements the corresponding counter in stats().
TEST_F(ClusterCursorManagerTest, StatsExhaustShardedCursor) {
auto cursorId =
- assertGet(getManager()->registerCursor(allocateMockCursor(),
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceSharded,
ClusterCursorManager::CursorLifetime::Mortal));
auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(pinnedCursor.getStatus());
- ASSERT_OK(pinnedCursor.getValue().next().getStatus());
+ ASSERT_OK(pinnedCursor.getValue().next(nullptr).getStatus());
ASSERT_EQ(1U, getManager()->stats().cursorsSharded);
pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::Exhausted);
ASSERT_EQ(0U, getManager()->stats().cursorsSharded);
@@ -569,13 +599,14 @@ TEST_F(ClusterCursorManagerTest, StatsExhaustShardedCursor) {
// Test that exhausting a not-sharded cursor decrements the corresponding counter in stats().
TEST_F(ClusterCursorManagerTest, StatsExhaustNotShardedCursor) {
auto cursorId = assertGet(
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(pinnedCursor.getStatus());
- ASSERT_OK(pinnedCursor.getValue().next().getStatus());
+ ASSERT_OK(pinnedCursor.getValue().next(nullptr).getStatus());
ASSERT_EQ(1U, getManager()->stats().cursorsNotSharded);
pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::Exhausted);
ASSERT_EQ(0U, getManager()->stats().cursorsNotSharded);
@@ -585,13 +616,14 @@ TEST_F(ClusterCursorManagerTest, StatsExhaustNotShardedCursor) {
// stats().
TEST_F(ClusterCursorManagerTest, StatsExhaustPinnedCursor) {
auto cursorId = assertGet(
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(pinnedCursor.getStatus());
- ASSERT_OK(pinnedCursor.getValue().next().getStatus());
+ ASSERT_OK(pinnedCursor.getValue().next(nullptr).getStatus());
ASSERT_EQ(1U, getManager()->stats().cursorsPinned);
pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::Exhausted);
ASSERT_EQ(0U, getManager()->stats().cursorsPinned);
@@ -601,13 +633,14 @@ TEST_F(ClusterCursorManagerTest, StatsExhaustPinnedCursor) {
// stats().
TEST_F(ClusterCursorManagerTest, StatsCheckInWithoutExhaustingPinnedCursor) {
auto cursorId = assertGet(
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(pinnedCursor.getStatus());
- ASSERT_OK(pinnedCursor.getValue().next().getStatus());
+ ASSERT_OK(pinnedCursor.getValue().next(nullptr).getStatus());
ASSERT_EQ(1U, getManager()->stats().cursorsPinned);
pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted);
ASSERT_EQ(0U, getManager()->stats().cursorsPinned);
@@ -616,7 +649,8 @@ TEST_F(ClusterCursorManagerTest, StatsCheckInWithoutExhaustingPinnedCursor) {
// Test that getting the namespace for a cursor returns the correct namespace.
TEST_F(ClusterCursorManagerTest, GetNamespaceForCursorIdBasic) {
auto cursorId = assertGet(
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
@@ -633,7 +667,8 @@ TEST_F(ClusterCursorManagerTest, GetNamespaceForCursorIdMultipleCursorsSameNames
std::vector<CursorId> cursorIds(numCursors);
for (size_t i = 0; i < numCursors; ++i) {
cursorIds[i] = assertGet(
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
@@ -654,7 +689,8 @@ TEST_F(ClusterCursorManagerTest, GetNamespaceForCursorIdMultipleCursorsDifferent
for (size_t i = 0; i < numCursors; ++i) {
NamespaceString cursorNamespace(std::string(str::stream() << "test.collection" << i));
auto cursorId = assertGet(
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
cursorNamespace,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
@@ -684,7 +720,8 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorDefaultConstructor) {
// cursor.
TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorNotExhausted) {
auto cursorId = assertGet(
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
@@ -702,7 +739,8 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorNotExhausted) {
// cursor, and leaves the pin owning no cursor.
TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhausted) {
auto cursorId = assertGet(
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
@@ -710,7 +748,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhausted) {
ASSERT_OK(registeredCursor.getStatus());
ASSERT_EQ(cursorId, registeredCursor.getValue().getCursorId());
ASSERT_NE(0, cursorId);
- ASSERT_OK(registeredCursor.getValue().next().getStatus());
+ ASSERT_OK(registeredCursor.getValue().next(nullptr).getStatus());
registeredCursor.getValue().returnCursor(ClusterCursorManager::CursorState::Exhausted);
ASSERT_EQ(0, registeredCursor.getValue().getCursorId());
@@ -733,7 +771,8 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhaustedWithNonExhaust
mockCursor->markRemotesNotExhausted();
auto cursorId = assertGet(
- getManager()->registerCursor(std::move(mockCursor),
+ getManager()->registerCursor(nullptr,
+ std::move(mockCursor),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
@@ -741,7 +780,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhaustedWithNonExhaust
ASSERT_OK(registeredCursor.getStatus());
ASSERT_EQ(cursorId, registeredCursor.getValue().getCursorId());
ASSERT_NE(0, cursorId);
- ASSERT_OK(registeredCursor.getValue().next().getStatus());
+ ASSERT_OK(registeredCursor.getValue().next(nullptr).getStatus());
registeredCursor.getValue().returnCursor(ClusterCursorManager::CursorState::Exhausted);
ASSERT_EQ(0, registeredCursor.getValue().getCursorId());
@@ -756,7 +795,8 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhaustedWithNonExhaust
// been returned.
TEST_F(ClusterCursorManagerTest, PinnedCursorMoveAssignmentKill) {
auto cursorId = assertGet(
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
@@ -771,7 +811,8 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorMoveAssignmentKill) {
TEST_F(ClusterCursorManagerTest, PinnedCursorDestructorKill) {
{
auto cursorId = assertGet(
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
@@ -789,7 +830,8 @@ TEST_F(ClusterCursorManagerTest, RemotesExhausted) {
ASSERT_FALSE(mockCursor->remotesExhausted());
auto cursorId = assertGet(
- getManager()->registerCursor(std::move(mockCursor),
+ getManager()->registerCursor(nullptr,
+ std::move(mockCursor),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
@@ -801,7 +843,8 @@ TEST_F(ClusterCursorManagerTest, RemotesExhausted) {
// Test that killed cursors which are still pinned are not reaped.
TEST_F(ClusterCursorManagerTest, DoNotReapKilledPinnedCursors) {
auto cursorId = assertGet(
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
@@ -822,7 +865,8 @@ TEST_F(ClusterCursorManagerTest, DoNotReapKilledPinnedCursors) {
}
TEST_F(ClusterCursorManagerTest, CannotRegisterCursorDuringShutdown) {
- ASSERT_OK(getManager()->registerCursor(allocateMockCursor(),
+ ASSERT_OK(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
@@ -834,7 +878,8 @@ TEST_F(ClusterCursorManagerTest, CannotRegisterCursorDuringShutdown) {
ASSERT_EQUALS(
ErrorCodes::ShutdownInProgress,
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
@@ -842,7 +887,8 @@ TEST_F(ClusterCursorManagerTest, CannotRegisterCursorDuringShutdown) {
TEST_F(ClusterCursorManagerTest, CannotCheckoutCursorDuringShutdown) {
auto cursorId = assertGet(
- getManager()->registerCursor(allocateMockCursor(),
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index c1d32e5c6be..b8f0a2b6eba 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -187,7 +187,6 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn,
params.isTailable = query.getQueryRequest().isTailable();
params.isAwaitData = query.getQueryRequest().isAwaitData();
params.isAllowPartialResults = query.getQueryRequest().isAllowPartialResults();
- params.txn = txn;
// This is the batchSize passed to each subsequent getMore command issued by the cursor. We
// usually use the batchSize associated with the initial find, but as it is illegal to send a
@@ -232,12 +231,12 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn,
}
auto ccc = ClusterClientCursorImpl::make(
- Grid::get(txn)->getExecutorPool()->getArbitraryExecutor(), std::move(params));
+ txn, Grid::get(txn)->getExecutorPool()->getArbitraryExecutor(), std::move(params));
auto cursorState = ClusterCursorManager::CursorState::NotExhausted;
int bytesBuffered = 0;
while (!FindCommon::enoughForFirstBatch(query.getQueryRequest(), results->size())) {
- auto next = ccc->next();
+ auto next = ccc->next(txn);
if (!next.isOK()) {
return next.getStatus();
}
@@ -294,7 +293,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn,
? ClusterCursorManager::CursorLifetime::Immortal
: ClusterCursorManager::CursorLifetime::Mortal;
return cursorManager->registerCursor(
- ccc.releaseCursor(), query.nss(), cursorType, cursorLifetime);
+ txn, ccc.releaseCursor(), query.nss(), cursorType, cursorLifetime);
}
} // namespace
@@ -389,7 +388,7 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* txn,
long long startingFrom = pinnedCursor.getValue().getNumReturnedSoFar();
auto cursorState = ClusterCursorManager::CursorState::NotExhausted;
while (!FindCommon::enoughForGetMore(batchSize, batch.size())) {
- auto next = pinnedCursor.getValue().next();
+ auto next = pinnedCursor.getValue().next(txn);
if (!next.isOK()) {
return next.getStatus();
}
diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h
index 726bd2df97b..5fcb6053e58 100644
--- a/src/mongo/s/query/router_exec_stage.h
+++ b/src/mongo/s/query/router_exec_stage.h
@@ -66,13 +66,13 @@ public:
* holding on to a subset of the returned results and need to minimize memory usage, call copy()
* on the BSONObjs.
*/
- virtual StatusWith<ClusterQueryResult> next() = 0;
+ virtual StatusWith<ClusterQueryResult> next(OperationContext* txn) = 0;
/**
* Must be called before destruction to abandon a not-yet-exhausted plan. May block waiting for
* responses from remote hosts.
*/
- virtual void kill() = 0;
+ virtual void kill(OperationContext* txn) = 0;
/**
* Returns whether or not all the remote cursors are exhausted.
@@ -88,13 +88,6 @@ public:
*/
virtual Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) = 0;
- /**
- * Update the operation context for remote requests.
- *
- * Network requests depend on having a valid operation context for user initiated actions.
- */
- virtual void setOperationContext(OperationContext* txn) = 0;
-
protected:
/**
* Returns an unowned pointer to the child stage, or nullptr if there is no child.
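
With setOperationContext() removed from the interface, a stage receives the OperationContext on each next() and kill() call and forwards it to its child. A minimal sketch of a pass-through stage under the new interface (RouterStagePassthrough is hypothetical, not part of this commit):

    class RouterStagePassthrough final : public RouterExecStage {
    public:
        explicit RouterStagePassthrough(std::unique_ptr<RouterExecStage> child)
            : RouterExecStage(std::move(child)) {}

        StatusWith<ClusterQueryResult> next(OperationContext* txn) final {
            // The OperationContext is threaded through per call rather than stored.
            return getChildStage()->next(txn);
        }

        void kill(OperationContext* txn) final {
            getChildStage()->kill(txn);
        }

        bool remotesExhausted() final {
            return getChildStage()->remotesExhausted();
        }

        Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final {
            return getChildStage()->setAwaitDataTimeout(awaitDataTimeout);
        }
    };
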
diff --git a/src/mongo/s/query/router_stage_limit.cpp b/src/mongo/s/query/router_stage_limit.cpp
index 4756423249c..4a1a428a533 100644
--- a/src/mongo/s/query/router_stage_limit.cpp
+++ b/src/mongo/s/query/router_stage_limit.cpp
@@ -39,12 +39,12 @@ RouterStageLimit::RouterStageLimit(std::unique_ptr<RouterExecStage> child, long
invariant(limit > 0);
}
-StatusWith<ClusterQueryResult> RouterStageLimit::next() {
+StatusWith<ClusterQueryResult> RouterStageLimit::next(OperationContext* txn) {
if (_returnedSoFar >= _limit) {
return {ClusterQueryResult()};
}
- auto childResult = getChildStage()->next();
+ auto childResult = getChildStage()->next(txn);
if (!childResult.isOK()) {
return childResult;
}
@@ -55,8 +55,8 @@ StatusWith<ClusterQueryResult> RouterStageLimit::next() {
return childResult;
}
-void RouterStageLimit::kill() {
- getChildStage()->kill();
+void RouterStageLimit::kill(OperationContext* txn) {
+ getChildStage()->kill(txn);
}
bool RouterStageLimit::remotesExhausted() {
@@ -67,8 +67,4 @@ Status RouterStageLimit::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
return getChildStage()->setAwaitDataTimeout(awaitDataTimeout);
}
-void RouterStageLimit::setOperationContext(OperationContext* txn) {
- return getChildStage()->setOperationContext(txn);
-}
-
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_limit.h b/src/mongo/s/query/router_stage_limit.h
index 8b29b56f291..29fb85dd458 100644
--- a/src/mongo/s/query/router_stage_limit.h
+++ b/src/mongo/s/query/router_stage_limit.h
@@ -39,16 +39,14 @@ class RouterStageLimit final : public RouterExecStage {
public:
RouterStageLimit(std::unique_ptr<RouterExecStage> child, long long limit);
- StatusWith<ClusterQueryResult> next() final;
+ StatusWith<ClusterQueryResult> next(OperationContext* txn) final;
- void kill() final;
+ void kill(OperationContext* txn) final;
bool remotesExhausted() final;
Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
- void setOperationContext(OperationContext* txn) final;
-
private:
long long _limit;
diff --git a/src/mongo/s/query/router_stage_limit_test.cpp b/src/mongo/s/query/router_stage_limit_test.cpp
index 11c245f67ec..b0b70a90cc8 100644
--- a/src/mongo/s/query/router_stage_limit_test.cpp
+++ b/src/mongo/s/query/router_stage_limit_test.cpp
@@ -40,6 +40,10 @@ namespace mongo {
namespace {
+// Note: Though the next() method on RouterExecStage and its subclasses takes an OperationContext*,
+// these stages are mocked in this test using RouterStageMock. RouterStageMock does not actually use
+// the OperationContext, so we pass a nullptr OperationContext* to next() in these tests.
+
TEST(RouterStageLimitTest, LimitIsOne) {
auto mockStage = stdx::make_unique<RouterStageMock>();
mockStage->queueResult({BSON("a" << 1)});
@@ -48,17 +52,17 @@ TEST(RouterStageLimitTest, LimitIsOne) {
auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 1);
- auto firstResult = limitStage->next();
+ auto firstResult = limitStage->next(nullptr);
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1));
- auto secondResult = limitStage->next();
+ auto secondResult = limitStage->next(nullptr);
ASSERT_OK(secondResult.getStatus());
ASSERT(!secondResult.getValue().getResult());
// Once end-of-stream is reached, the limit stage should keep returning no results.
- auto thirdResult = limitStage->next();
+ auto thirdResult = limitStage->next(nullptr);
ASSERT_OK(thirdResult.getStatus());
ASSERT(!thirdResult.getValue().getResult());
}
@@ -71,17 +75,17 @@ TEST(RouterStageLimitTest, LimitIsTwo) {
auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 2);
- auto firstResult = limitStage->next();
+ auto firstResult = limitStage->next(nullptr);
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1));
- auto secondResult = limitStage->next();
+ auto secondResult = limitStage->next(nullptr);
ASSERT_OK(secondResult.getStatus());
ASSERT(secondResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 2));
- auto thirdResult = limitStage->next();
+ auto thirdResult = limitStage->next(nullptr);
ASSERT_OK(thirdResult.getStatus());
ASSERT(!thirdResult.getValue().getResult());
}
@@ -95,12 +99,12 @@ TEST(RouterStageLimitTest, LimitStagePropagatesError) {
auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 3);
- auto firstResult = limitStage->next();
+ auto firstResult = limitStage->next(nullptr);
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1));
- auto secondResult = limitStage->next();
+ auto secondResult = limitStage->next(nullptr);
ASSERT_NOT_OK(secondResult.getStatus());
ASSERT_EQ(secondResult.getStatus(), ErrorCodes::BadValue);
ASSERT_EQ(secondResult.getStatus().reason(), "bad thing happened");
@@ -120,7 +124,7 @@ TEST(RouterStageLimitTest, LimitStagePropagatesViewDefinition) {
auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 3);
- auto result = limitStage->next();
+ auto result = limitStage->next(nullptr);
ASSERT_OK(result.getStatus());
ASSERT(!result.getValue().getResult());
ASSERT(result.getValue().getViewDefinition());
@@ -139,21 +143,21 @@ TEST(RouterStageLimitTest, LimitStageToleratesMidStreamEOF) {
auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 2);
- auto firstResult = limitStage->next();
+ auto firstResult = limitStage->next(nullptr);
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1));
- auto secondResult = limitStage->next();
+ auto secondResult = limitStage->next(nullptr);
ASSERT_OK(secondResult.getStatus());
ASSERT(secondResult.getValue().isEOF());
- auto thirdResult = limitStage->next();
+ auto thirdResult = limitStage->next(nullptr);
ASSERT_OK(thirdResult.getStatus());
ASSERT(thirdResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*thirdResult.getValue().getResult(), BSON("a" << 2));
- auto fourthResult = limitStage->next();
+ auto fourthResult = limitStage->next(nullptr);
ASSERT_OK(fourthResult.getStatus());
ASSERT(fourthResult.getValue().isEOF());
}
@@ -167,19 +171,19 @@ TEST(RouterStageLimitTest, LimitStageRemotesExhausted) {
auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 100);
ASSERT_TRUE(limitStage->remotesExhausted());
- auto firstResult = limitStage->next();
+ auto firstResult = limitStage->next(nullptr);
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1));
ASSERT_TRUE(limitStage->remotesExhausted());
- auto secondResult = limitStage->next();
+ auto secondResult = limitStage->next(nullptr);
ASSERT_OK(secondResult.getStatus());
ASSERT(secondResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 2));
ASSERT_TRUE(limitStage->remotesExhausted());
- auto thirdResult = limitStage->next();
+ auto thirdResult = limitStage->next(nullptr);
ASSERT_OK(thirdResult.getStatus());
ASSERT(thirdResult.getValue().isEOF());
ASSERT_TRUE(limitStage->remotesExhausted());
diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp
index a3976ffb40d..f5ecc75728a 100644
--- a/src/mongo/s/query/router_stage_merge.cpp
+++ b/src/mongo/s/query/router_stage_merge.cpp
@@ -40,9 +40,9 @@ RouterStageMerge::RouterStageMerge(executor::TaskExecutor* executor,
ClusterClientCursorParams&& params)
: _executor(executor), _arm(executor, std::move(params)) {}
-StatusWith<ClusterQueryResult> RouterStageMerge::next() {
+StatusWith<ClusterQueryResult> RouterStageMerge::next(OperationContext* txn) {
while (!_arm.ready()) {
- auto nextEventStatus = _arm.nextEvent();
+ auto nextEventStatus = _arm.nextEvent(txn);
if (!nextEventStatus.isOK()) {
return nextEventStatus.getStatus();
}
@@ -55,8 +55,8 @@ StatusWith<ClusterQueryResult> RouterStageMerge::next() {
return _arm.nextReady();
}
-void RouterStageMerge::kill() {
- auto killEvent = _arm.kill();
+void RouterStageMerge::kill(OperationContext* txn) {
+ auto killEvent = _arm.kill(txn);
if (!killEvent) {
// Mongos is shutting down.
return;
@@ -72,8 +72,4 @@ Status RouterStageMerge::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
return _arm.setAwaitDataTimeout(awaitDataTimeout);
}
-void RouterStageMerge::setOperationContext(OperationContext* txn) {
- return _arm.setOperationContext(txn);
-}
-
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h
index 9f2c2e9e0c4..4c69925e0d3 100644
--- a/src/mongo/s/query/router_stage_merge.h
+++ b/src/mongo/s/query/router_stage_merge.h
@@ -45,16 +45,14 @@ class RouterStageMerge final : public RouterExecStage {
public:
RouterStageMerge(executor::TaskExecutor* executor, ClusterClientCursorParams&& params);
- StatusWith<ClusterQueryResult> next() final;
+ StatusWith<ClusterQueryResult> next(OperationContext* txn) final;
- void kill() final;
+ void kill(OperationContext* txn) final;
bool remotesExhausted() final;
Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
- void setOperationContext(OperationContext* txn) final;
-
private:
// Not owned here.
executor::TaskExecutor* _executor;
diff --git a/src/mongo/s/query/router_stage_mock.cpp b/src/mongo/s/query/router_stage_mock.cpp
index b2aa83e3ed6..c348018fe6f 100644
--- a/src/mongo/s/query/router_stage_mock.cpp
+++ b/src/mongo/s/query/router_stage_mock.cpp
@@ -50,7 +50,7 @@ void RouterStageMock::markRemotesExhausted() {
_remotesExhausted = true;
}
-StatusWith<ClusterQueryResult> RouterStageMock::next() {
+StatusWith<ClusterQueryResult> RouterStageMock::next(OperationContext* txn) {
if (_resultsQueue.empty()) {
return {ClusterQueryResult()};
}
@@ -60,7 +60,7 @@ StatusWith<ClusterQueryResult> RouterStageMock::next() {
return out;
}
-void RouterStageMock::kill() {
+void RouterStageMock::kill(OperationContext* txn) {
// No child to kill.
}
@@ -73,10 +73,6 @@ Status RouterStageMock::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
return Status::OK();
}
-void RouterStageMock::setOperationContext(OperationContext* txn) {
- // Do nothing
-}
-
StatusWith<Milliseconds> RouterStageMock::getAwaitDataTimeout() {
if (!_awaitDataTimeout) {
return Status(ErrorCodes::BadValue, "no awaitData timeout set");
diff --git a/src/mongo/s/query/router_stage_mock.h b/src/mongo/s/query/router_stage_mock.h
index ef093b04fe4..dce077d8122 100644
--- a/src/mongo/s/query/router_stage_mock.h
+++ b/src/mongo/s/query/router_stage_mock.h
@@ -44,16 +44,14 @@ class RouterStageMock final : public RouterExecStage {
public:
~RouterStageMock() final {}
- StatusWith<ClusterQueryResult> next() final;
+ StatusWith<ClusterQueryResult> next(OperationContext* txn) final;
- void kill() final;
+ void kill(OperationContext* txn) final;
bool remotesExhausted() final;
Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
- void setOperationContext(OperationContext* txn) final;
-
/**
* Queues a BSONObj to be returned.
*/
diff --git a/src/mongo/s/query/router_stage_remove_sortkey.cpp b/src/mongo/s/query/router_stage_remove_sortkey.cpp
index 77d3e26afd0..9c58e489b13 100644
--- a/src/mongo/s/query/router_stage_remove_sortkey.cpp
+++ b/src/mongo/s/query/router_stage_remove_sortkey.cpp
@@ -41,8 +41,8 @@ namespace mongo {
RouterStageRemoveSortKey::RouterStageRemoveSortKey(std::unique_ptr<RouterExecStage> child)
: RouterExecStage(std::move(child)) {}
-StatusWith<ClusterQueryResult> RouterStageRemoveSortKey::next() {
- auto childResult = getChildStage()->next();
+StatusWith<ClusterQueryResult> RouterStageRemoveSortKey::next(OperationContext* txn) {
+ auto childResult = getChildStage()->next(txn);
if (!childResult.isOK() || !childResult.getValue().getResult()) {
return childResult;
}
@@ -59,8 +59,8 @@ StatusWith<ClusterQueryResult> RouterStageRemoveSortKey::next() {
return {builder.obj()};
}
-void RouterStageRemoveSortKey::kill() {
- getChildStage()->kill();
+void RouterStageRemoveSortKey::kill(OperationContext* txn) {
+ getChildStage()->kill(txn);
}
bool RouterStageRemoveSortKey::remotesExhausted() {
@@ -71,8 +71,4 @@ Status RouterStageRemoveSortKey::setAwaitDataTimeout(Milliseconds awaitDataTimeo
return getChildStage()->setAwaitDataTimeout(awaitDataTimeout);
}
-void RouterStageRemoveSortKey::setOperationContext(OperationContext* txn) {
- return getChildStage()->setOperationContext(txn);
-}
-
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_remove_sortkey.h b/src/mongo/s/query/router_stage_remove_sortkey.h
index 79294aeb20a..291cf01a803 100644
--- a/src/mongo/s/query/router_stage_remove_sortkey.h
+++ b/src/mongo/s/query/router_stage_remove_sortkey.h
@@ -41,15 +41,13 @@ class RouterStageRemoveSortKey final : public RouterExecStage {
public:
RouterStageRemoveSortKey(std::unique_ptr<RouterExecStage> child);
- StatusWith<ClusterQueryResult> next() final;
+ StatusWith<ClusterQueryResult> next(OperationContext* txn) final;
- void kill() final;
+ void kill(OperationContext* txn) final;
bool remotesExhausted() final;
Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
-
- void setOperationContext(OperationContext* txn) final;
};
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp b/src/mongo/s/query/router_stage_remove_sortkey_test.cpp
index d8e1f51605d..e9f338b9e5f 100644
--- a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp
+++ b/src/mongo/s/query/router_stage_remove_sortkey_test.cpp
@@ -40,6 +40,10 @@ namespace mongo {
namespace {
+// Note: Though the next() method on RouterExecStage and its subclasses takes an OperationContext*,
+// these stages are mocked in this test using RouterStageMock. RouterStageMock does not actually use
+// the OperationContext, so we pass a nullptr OperationContext* to next() in these tests.
+
TEST(RouterStageRemoveSortKeyTest, RemovesSortKey) {
auto mockStage = stdx::make_unique<RouterStageMock>();
mockStage->queueResult(BSON("a" << 4 << "$sortKey" << 1 << "b" << 3));
@@ -50,29 +54,29 @@ TEST(RouterStageRemoveSortKeyTest, RemovesSortKey) {
auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(std::move(mockStage));
- auto firstResult = sortKeyStage->next();
+ auto firstResult = sortKeyStage->next(nullptr);
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 4 << "b" << 3));
- auto secondResult = sortKeyStage->next();
+ auto secondResult = sortKeyStage->next(nullptr);
ASSERT_OK(secondResult.getStatus());
ASSERT(secondResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(),
BSON("c" << BSON("d"
<< "foo")));
- auto thirdResult = sortKeyStage->next();
+ auto thirdResult = sortKeyStage->next(nullptr);
ASSERT_OK(thirdResult.getStatus());
ASSERT(thirdResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*thirdResult.getValue().getResult(), BSON("a" << 3));
- auto fourthResult = sortKeyStage->next();
+ auto fourthResult = sortKeyStage->next(nullptr);
ASSERT_OK(fourthResult.getStatus());
ASSERT(fourthResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*fourthResult.getValue().getResult(), BSONObj());
- auto fifthResult = sortKeyStage->next();
+ auto fifthResult = sortKeyStage->next(nullptr);
ASSERT_OK(fifthResult.getStatus());
ASSERT(fifthResult.getValue().isEOF());
}
@@ -84,12 +88,12 @@ TEST(RouterStageRemoveSortKeyTest, PropagatesError) {
auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(std::move(mockStage));
- auto firstResult = sortKeyStage->next();
+ auto firstResult = sortKeyStage->next(nullptr);
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSONObj());
- auto secondResult = sortKeyStage->next();
+ auto secondResult = sortKeyStage->next(nullptr);
ASSERT_NOT_OK(secondResult.getStatus());
ASSERT_EQ(secondResult.getStatus(), ErrorCodes::BadValue);
ASSERT_EQ(secondResult.getStatus().reason(), "bad thing happened");
@@ -103,21 +107,21 @@ TEST(RouterStageRemoveSortKeyTest, ToleratesMidStreamEOF) {
auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(std::move(mockStage));
- auto firstResult = sortKeyStage->next();
+ auto firstResult = sortKeyStage->next(nullptr);
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1 << "b" << 1));
- auto secondResult = sortKeyStage->next();
+ auto secondResult = sortKeyStage->next(nullptr);
ASSERT_OK(secondResult.getStatus());
ASSERT(secondResult.getValue().isEOF());
- auto thirdResult = sortKeyStage->next();
+ auto thirdResult = sortKeyStage->next(nullptr);
ASSERT_OK(thirdResult.getStatus());
ASSERT(thirdResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*thirdResult.getValue().getResult(), BSON("a" << 2 << "b" << 2));
- auto fourthResult = sortKeyStage->next();
+ auto fourthResult = sortKeyStage->next(nullptr);
ASSERT_OK(fourthResult.getStatus());
ASSERT(fourthResult.getValue().isEOF());
}
@@ -131,19 +135,19 @@ TEST(RouterStageRemoveSortKeyTest, RemotesExhausted) {
auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(std::move(mockStage));
ASSERT_TRUE(sortKeyStage->remotesExhausted());
- auto firstResult = sortKeyStage->next();
+ auto firstResult = sortKeyStage->next(nullptr);
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1 << "b" << 1));
ASSERT_TRUE(sortKeyStage->remotesExhausted());
- auto secondResult = sortKeyStage->next();
+ auto secondResult = sortKeyStage->next(nullptr);
ASSERT_OK(secondResult.getStatus());
ASSERT(secondResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 2 << "b" << 2));
ASSERT_TRUE(sortKeyStage->remotesExhausted());
- auto thirdResult = sortKeyStage->next();
+ auto thirdResult = sortKeyStage->next(nullptr);
ASSERT_OK(thirdResult.getStatus());
ASSERT(thirdResult.getValue().isEOF());
ASSERT_TRUE(sortKeyStage->remotesExhausted());
diff --git a/src/mongo/s/query/router_stage_skip.cpp b/src/mongo/s/query/router_stage_skip.cpp
index f9a63e515ff..e961fe60f13 100644
--- a/src/mongo/s/query/router_stage_skip.cpp
+++ b/src/mongo/s/query/router_stage_skip.cpp
@@ -39,9 +39,9 @@ RouterStageSkip::RouterStageSkip(std::unique_ptr<RouterExecStage> child, long lo
invariant(skip > 0);
}
-StatusWith<ClusterQueryResult> RouterStageSkip::next() {
+StatusWith<ClusterQueryResult> RouterStageSkip::next(OperationContext* txn) {
while (_skippedSoFar < _skip) {
- auto next = getChildStage()->next();
+ auto next = getChildStage()->next(txn);
if (!next.isOK()) {
return next;
}
@@ -57,11 +57,11 @@ StatusWith<ClusterQueryResult> RouterStageSkip::next() {
++_skippedSoFar;
}
- return getChildStage()->next();
+ return getChildStage()->next(txn);
}
-void RouterStageSkip::kill() {
- getChildStage()->kill();
+void RouterStageSkip::kill(OperationContext* txn) {
+ getChildStage()->kill(txn);
}
bool RouterStageSkip::remotesExhausted() {
@@ -72,8 +72,4 @@ Status RouterStageSkip::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
return getChildStage()->setAwaitDataTimeout(awaitDataTimeout);
}
-void RouterStageSkip::setOperationContext(OperationContext* txn) {
- return getChildStage()->setOperationContext(txn);
-}
-
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_skip.h b/src/mongo/s/query/router_stage_skip.h
index fda4201f9cb..c949271f79e 100644
--- a/src/mongo/s/query/router_stage_skip.h
+++ b/src/mongo/s/query/router_stage_skip.h
@@ -39,16 +39,14 @@ class RouterStageSkip final : public RouterExecStage {
public:
RouterStageSkip(std::unique_ptr<RouterExecStage> child, long long skip);
- StatusWith<ClusterQueryResult> next() final;
+ StatusWith<ClusterQueryResult> next(OperationContext* txn) final;
- void kill() final;
+ void kill(OperationContext* txn) final;
bool remotesExhausted() final;
Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
- void setOperationContext(OperationContext* txn) final;
-
private:
long long _skip;
diff --git a/src/mongo/s/query/router_stage_skip_test.cpp b/src/mongo/s/query/router_stage_skip_test.cpp
index 242a032375a..56182fb18a5 100644
--- a/src/mongo/s/query/router_stage_skip_test.cpp
+++ b/src/mongo/s/query/router_stage_skip_test.cpp
@@ -40,6 +40,10 @@ namespace mongo {
namespace {
+// Note: Though the next() method on RouterExecStage and its subclasses takes an OperationContext*,
+// these stages are mocked in this test using RouterStageMock. RouterStageMock does not actually use
+// the OperationContext, so we pass a nullptr OperationContext* to next() in these tests.
+
TEST(RouterStageSkipTest, SkipIsOne) {
auto mockStage = stdx::make_unique<RouterStageMock>();
mockStage->queueResult(BSON("a" << 1));
@@ -48,22 +52,22 @@ TEST(RouterStageSkipTest, SkipIsOne) {
auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 1);
- auto firstResult = skipStage->next();
+ auto firstResult = skipStage->next(nullptr);
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 2));
- auto secondResult = skipStage->next();
+ auto secondResult = skipStage->next(nullptr);
ASSERT_OK(secondResult.getStatus());
ASSERT(secondResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 3));
// Once end-of-stream is reached, the skip stage should keep returning boost::none.
- auto thirdResult = skipStage->next();
+ auto thirdResult = skipStage->next(nullptr);
ASSERT_OK(thirdResult.getStatus());
ASSERT(thirdResult.getValue().isEOF());
- auto fourthResult = skipStage->next();
+ auto fourthResult = skipStage->next(nullptr);
ASSERT_OK(fourthResult.getStatus());
ASSERT(fourthResult.getValue().isEOF());
}
@@ -77,12 +81,12 @@ TEST(RouterStageSkipTest, SkipIsThree) {
auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 3);
- auto firstResult = skipStage->next();
+ auto firstResult = skipStage->next(nullptr);
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 4));
- auto secondResult = skipStage->next();
+ auto secondResult = skipStage->next(nullptr);
ASSERT_OK(secondResult.getStatus());
ASSERT(secondResult.getValue().isEOF());
}
@@ -95,7 +99,7 @@ TEST(RouterStageSkipTest, SkipEqualToResultSetSize) {
auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 3);
- auto firstResult = skipStage->next();
+ auto firstResult = skipStage->next(nullptr);
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().isEOF());
}
@@ -108,7 +112,7 @@ TEST(RouterStageSkipTest, SkipExceedsResultSetSize) {
auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 100);
- auto firstResult = skipStage->next();
+ auto firstResult = skipStage->next(nullptr);
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().isEOF());
}
@@ -122,7 +126,7 @@ TEST(RouterStageSkipTest, ErrorWhileSkippingResults) {
auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 2);
- auto firstResult = skipStage->next();
+ auto firstResult = skipStage->next(nullptr);
ASSERT_NOT_OK(firstResult.getStatus());
ASSERT_EQ(firstResult.getStatus(), ErrorCodes::BadValue);
ASSERT_EQ(firstResult.getStatus().reason(), "bad thing happened");
@@ -137,12 +141,12 @@ TEST(RouterStageSkipTest, ErrorAfterSkippingResults) {
auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 2);
- auto firstResult = skipStage->next();
+ auto firstResult = skipStage->next(nullptr);
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 3));
- auto secondResult = skipStage->next();
+ auto secondResult = skipStage->next(nullptr);
ASSERT_NOT_OK(secondResult.getStatus());
ASSERT_EQ(secondResult.getStatus(), ErrorCodes::BadValue);
ASSERT_EQ(secondResult.getStatus().reason(), "bad thing happened");
@@ -160,7 +164,7 @@ TEST(RouterStageSkipTest, SkipStagePropagatesViewDefinition) {
auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 3);
- auto result = skipStage->next();
+ auto result = skipStage->next(nullptr);
ASSERT_OK(result.getStatus());
ASSERT(!result.getValue().getResult());
ASSERT(result.getValue().getViewDefinition());
@@ -181,16 +185,16 @@ TEST(RouterStageSkipTest, SkipStageToleratesMidStreamEOF) {
auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 2);
- auto firstResult = skipStage->next();
+ auto firstResult = skipStage->next(nullptr);
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().isEOF());
- auto secondResult = skipStage->next();
+ auto secondResult = skipStage->next(nullptr);
ASSERT_OK(secondResult.getStatus());
ASSERT(secondResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 3));
- auto thirdResult = skipStage->next();
+ auto thirdResult = skipStage->next(nullptr);
ASSERT_OK(thirdResult.getStatus());
ASSERT(thirdResult.getValue().isEOF());
}
@@ -205,19 +209,19 @@ TEST(RouterStageSkipTest, SkipStageRemotesExhausted) {
auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 1);
ASSERT_TRUE(skipStage->remotesExhausted());
- auto firstResult = skipStage->next();
+ auto firstResult = skipStage->next(nullptr);
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 2));
ASSERT_TRUE(skipStage->remotesExhausted());
- auto secondResult = skipStage->next();
+ auto secondResult = skipStage->next(nullptr);
ASSERT_OK(secondResult.getStatus());
ASSERT(secondResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 3));
ASSERT_TRUE(skipStage->remotesExhausted());
- auto thirdResult = skipStage->next();
+ auto thirdResult = skipStage->next(nullptr);
ASSERT_OK(thirdResult.getStatus());
ASSERT(thirdResult.getValue().isEOF());
ASSERT_TRUE(skipStage->remotesExhausted());
diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp
index 13477f1e14e..1e3b7d03306 100644
--- a/src/mongo/s/query/store_possible_cursor.cpp
+++ b/src/mongo/s/query/store_possible_cursor.cpp
@@ -39,7 +39,8 @@
namespace mongo {
-StatusWith<BSONObj> storePossibleCursor(const HostAndPort& server,
+StatusWith<BSONObj> storePossibleCursor(OperationContext* txn,
+ const HostAndPort& server,
const BSONObj& cmdResult,
const NamespaceString& requestedNss,
executor::TaskExecutor* executor,
@@ -61,10 +62,11 @@ StatusWith<BSONObj> storePossibleCursor(const HostAndPort& server,
params.remotes.emplace_back(server, incomingCursorResponse.getValue().getCursorId());
- auto ccc = ClusterClientCursorImpl::make(executor, std::move(params));
+ auto ccc = ClusterClientCursorImpl::make(txn, executor, std::move(params));
auto clusterCursorId =
- cursorManager->registerCursor(ccc.releaseCursor(),
+ cursorManager->registerCursor(txn,
+ ccc.releaseCursor(),
requestedNss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal);
diff --git a/src/mongo/s/query/store_possible_cursor.h b/src/mongo/s/query/store_possible_cursor.h
index e290556f7ea..f06c959b41c 100644
--- a/src/mongo/s/query/store_possible_cursor.h
+++ b/src/mongo/s/query/store_possible_cursor.h
@@ -29,6 +29,7 @@
#pragma once
#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
namespace mongo {
@@ -56,7 +57,8 @@ class TaskExecutor;
* BSONObj response document describing the newly-created cursor, which is suitable for returning to
* the client.
*/
-StatusWith<BSONObj> storePossibleCursor(const HostAndPort& server,
+StatusWith<BSONObj> storePossibleCursor(OperationContext* txn,
+ const HostAndPort& server,
const BSONObj& cmdResult,
const NamespaceString& requestedNss,
executor::TaskExecutor* executor,
diff --git a/src/mongo/s/query/store_possible_cursor_test.cpp b/src/mongo/s/query/store_possible_cursor_test.cpp
index 7e7b5d671f0..d542e1ee760 100644
--- a/src/mongo/s/query/store_possible_cursor_test.cpp
+++ b/src/mongo/s/query/store_possible_cursor_test.cpp
@@ -62,7 +62,8 @@ TEST_F(StorePossibleCursorTest, ReturnsValidCursorResponse) {
std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
CursorResponse cursorResponse(nss, CursorId(0), batch);
auto outgoingCursorResponse =
- storePossibleCursor(hostAndPort,
+ storePossibleCursor(nullptr, // OperationContext*
+ hostAndPort,
cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse),
nss,
nullptr, // TaskExecutor
@@ -80,7 +81,8 @@ TEST_F(StorePossibleCursorTest, ReturnsValidCursorResponse) {
// Test that storePossibleCursor() propagates an error if it cannot parse the cursor response.
TEST_F(StorePossibleCursorTest, FailsGracefullyOnBadCursorResponseDocument) {
- auto outgoingCursorResponse = storePossibleCursor(hostAndPort,
+ auto outgoingCursorResponse = storePossibleCursor(nullptr, // OperationContext*
+ hostAndPort,
fromjson("{ok: 1, cursor: {}}"),
nss,
nullptr, // TaskExecutor
@@ -94,7 +96,8 @@ TEST_F(StorePossibleCursorTest, FailsGracefullyOnBadCursorResponseDocument) {
TEST_F(StorePossibleCursorTest, PassesUpCommandResultIfItDoesNotDescribeACursor) {
BSONObj notACursorObj = BSON("not"
<< "cursor");
- auto outgoingCursorResponse = storePossibleCursor(hostAndPort,
+ auto outgoingCursorResponse = storePossibleCursor(nullptr, // OperationContext*
+ hostAndPort,
notACursorObj,
nss,
nullptr, // TaskExecutor