summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Shteinfeld <ben.shteinfeld@mongodb.com>2023-05-10 20:44:04 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-05-10 21:26:03 +0000
commit680b65808f905ed796b4afe14c2990fc7d1d1646 (patch)
tree54146323c34c3f0d156dde5127c3e4556f672ea7
parentcf775ff4b185433ec54aba4363905e3b7d924718 (diff)
downloadmongo-680b65808f905ed796b4afe14c2990fc7d1d1646.tar.gz
SERVER-73959 Add option to TaskExecutorCursor to not prefetch next batch
-rw-r--r--src/mongo/executor/task_executor_cursor.cpp33
-rw-r--r--src/mongo/executor/task_executor_cursor.h19
-rw-r--r--src/mongo/executor/task_executor_cursor_test.cpp168
3 files changed, 205 insertions, 15 deletions
diff --git a/src/mongo/executor/task_executor_cursor.cpp b/src/mongo/executor/task_executor_cursor.cpp
index dcbe6e1a945..b44034489c2 100644
--- a/src/mongo/executor/task_executor_cursor.cpp
+++ b/src/mongo/executor/task_executor_cursor.cpp
@@ -254,16 +254,34 @@ void TaskExecutorCursor::_processResponse(OperationContext* opCtx, CursorRespons
_batch = response.releaseBatch();
_batchIter = _batch.begin();
- // If we got a cursor id back, pre-fetch the next batch
- if (_cursorId) {
- GetMoreCommandRequest getMoreRequest(_cursorId, _ns.coll().toString());
- getMoreRequest.setBatchSize(_options.batchSize);
- _runRemoteCommand(_createRequest(opCtx, getMoreRequest.toBSON({})));
+ // If the previous response contained a cursorId and pre-fetching is enabled, schedule the
+ // getMore.
+ if ((_cursorId != kClosedCursorId) && _options.preFetchNextBatch) {
+ _scheduleGetMore(opCtx);
}
}
+void TaskExecutorCursor::_scheduleGetMore(OperationContext* opCtx) {
+ // The previous response must have returned an open cursor ID.
+ invariant(_cursorId >= kMinLegalCursorId);
+ // There cannot be an existing in-flight request.
+ invariant(!_cmdState);
+ GetMoreCommandRequest getMoreRequest(_cursorId, _ns.coll().toString());
+ getMoreRequest.setBatchSize(_options.batchSize);
+ _runRemoteCommand(_createRequest(opCtx, getMoreRequest.toBSON({})));
+}
+
void TaskExecutorCursor::_getNextBatch(OperationContext* opCtx) {
- invariant(_cmdState, "_getNextBatch() requires an async request to have already been sent.");
+ // If we don't have an in-flight request, schedule one. This will occur when the
+ // 'preFetchNextBatch' option is false.
+ if (!_cmdState) {
+ invariant(!_options.preFetchNextBatch);
+ _scheduleGetMore(opCtx);
+ }
+
+ // There should be an in-flight request at this point, either sent asyncronously when we
+ // processed the previous response or just scheduled.
+ invariant(_cmdState);
invariant(_cursorId != kClosedCursorId);
auto clock = opCtx->getServiceContext()->getPreciseClockSource();
@@ -291,12 +309,15 @@ void TaskExecutorCursor::_getNextBatch(OperationContext* opCtx) {
tassert(6253100, "Expected at least one response for cursor", cursorResponses.size() > 0);
CursorResponse cr = uassertStatusOK(std::move(cursorResponses[0]));
_processResponse(opCtx, std::move(cr));
+
// If we have more responses, build them into cursors then hold them until a caller accesses
// them. Skip the first response, we used it to populate this cursor.
// Ensure we update the RCR we give to each 'child cursor' with the current opCtx.
auto freshRcr = _createRequest(opCtx, _rcr.cmdObj);
auto copyOptions = [&] {
TaskExecutorCursor::Options options;
+ // In the case that pinConnection is true, we need to ensure that additional cursors also
+ // pin their connection to the same socket as the original cursor.
options.pinConnection = _options.pinConnection;
return options;
};
diff --git a/src/mongo/executor/task_executor_cursor.h b/src/mongo/executor/task_executor_cursor.h
index 458e010f976..1016f8dcfd0 100644
--- a/src/mongo/executor/task_executor_cursor.h
+++ b/src/mongo/executor/task_executor_cursor.h
@@ -54,8 +54,8 @@ namespace executor {
*
* The main differentiator for this type over DBClientCursor is the use of a task executor (which
* provides access to a different connection pool, as well as interruptibility) and the ability to
- * overlap getMores. This starts fetching the next batch as soon as one is exhausted (rather than
- * on a call to getNext()).
+ * overlap getMores. This starts fetching the next batch as soon as the previous one is received
+ * (rather than on a call to 'getNext()').
*/
class TaskExecutorCursor {
public:
@@ -70,6 +70,10 @@ public:
struct Options {
boost::optional<int64_t> batchSize;
bool pinConnection{gPinTaskExecCursorConns.load()};
+ // If true, we will fetch the next batch as soon as the current one is recieved.
+ // If false, we will fetch the next batch when the current batch is exhausted and
+ // 'getNext()' is invoked.
+ bool preFetchNextBatch{true};
Options() {}
};
@@ -177,8 +181,7 @@ private:
/**
* Helper for '_getNextBatch' that handles the reading of the 'CursorResponse' object and
- * storing of relevant values. This is also responsible for issuing a getMore request if it
- * is required to populate the next batch.
+ * storing of relevant values.
*/
void _processResponse(OperationContext* opCtx, CursorResponse&& response);
@@ -187,6 +190,14 @@ private:
*/
const RemoteCommandRequest& _createRequest(OperationContext* opCtx, const BSONObj& cmd);
+ /**
+ * Schedules a 'GetMore' request to run asyncronously.
+ * This function can only be invoked when:
+ * - There is no in-flight request ('_cmdState' is null).
+ * - We have an open '_cursorId'.
+ */
+ void _scheduleGetMore(OperationContext* opCtx);
+
std::shared_ptr<executor::TaskExecutor> _executor;
// If we are pinning connections, we need to keep a separate reference to the
// non-pinning, normal executor, so that we can shut down the pinned executor
diff --git a/src/mongo/executor/task_executor_cursor_test.cpp b/src/mongo/executor/task_executor_cursor_test.cpp
index eaece63fd6c..784dedddf48 100644
--- a/src/mongo/executor/task_executor_cursor_test.cpp
+++ b/src/mongo/executor/task_executor_cursor_test.cpp
@@ -492,6 +492,56 @@ public:
th.join();
}
+ /**
+ * Test that if 'preFetchNextBatch' is false, the TaskExecutorCursor does not request GetMores
+ * until the current batch is exhausted and 'getNext()' is invoked.
+ */
+ void NoPrefetchGetMore() {
+ CursorId cursorId = 1;
+ RemoteCommandRequest rcr(HostAndPort("localhost"),
+ "test",
+ BSON("search"
+ << "foo"),
+ opCtx.get());
+
+ // Construction of the TaskExecutorCursor enqueues a request in the NetworkInterfaceMock.
+ TaskExecutorCursor tec = makeTec(rcr, [] {
+ TaskExecutorCursor::Options opts;
+ opts.batchSize = 2;
+ opts.preFetchNextBatch = false;
+ return opts;
+ }());
+
+ // Mock the response for the first batch.
+ scheduleSuccessfulCursorResponse("firstBatch", 1, 2, cursorId);
+
+ // Exhaust the first batch.
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 1);
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 2);
+
+ // Assert that the TaskExecutorCursor has not requested a GetMore. This enforces that
+ // 'preFetchNextBatch' works as expected.
+ ASSERT_FALSE(hasReadyRequests());
+
+ // As soon as 'getNext()' is invoked, the TaskExecutorCursor will try to send a GetMore and
+ // that will block this thread in the NetworkInterfaceMock until there is a scheduled
+ // response. However, we cannot schedule the cursor response on the main thread before we
+ // call 'getNext()' as that will cause the NetworkInterfaceMock to block until there is
+ // request enqueued ('getNext()' is the function which will enqueue such as request).
+ // To avoid this deadlock, we start a new thread which will schedule a response on the
+ // NetworkInterfaceMock.
+ stdx::thread t(
+ [this, cursorId] { scheduleSuccessfulCursorResponse("nextBatch", 3, 4, 0); });
+ t.detach();
+
+ // Schedules the GetMore request and exhausts the cursor.
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 3);
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 4);
+
+ // Assert no GetMore is requested.
+ ASSERT_FALSE(hasReadyRequests());
+ }
+
ServiceContext::UniqueServiceContext serviceCtx = ServiceContext::make();
ServiceContext::UniqueClient client;
ServiceContext::UniqueOperationContext opCtx;
@@ -505,10 +555,10 @@ public:
launchExecutorThread();
}
- BSONObj scheduleSuccessfulCursorResponse(StringData fieldName,
- size_t start,
- size_t end,
- size_t cursorId) {
+ virtual BSONObj scheduleSuccessfulCursorResponse(StringData fieldName,
+ size_t start,
+ size_t end,
+ size_t cursorId) {
NetworkInterfaceMock::InNetworkGuard ing(getNet());
@@ -653,10 +703,51 @@ public:
}
};
+class NoPrefetchTaskExecutorCursorTestFixture : public NonPinningTaskExecutorCursorTestFixture {
+public:
+ TaskExecutorCursor makeTec(RemoteCommandRequest rcr,
+ TaskExecutorCursor::Options&& options = {}) {
+ options.preFetchNextBatch = false;
+ return TaskExecutorCursor(getExecutorPtr(), rcr, std::move(options));
+ }
+
+ BSONObj scheduleSuccessfulCursorResponse(StringData fieldName,
+ size_t start,
+ size_t end,
+ size_t cursorId) {
+ NetworkInterfaceMock::InNetworkGuard ing(getNet());
+ // Don't assert that the network has requests like we do in other classes. This is to enable
+ // the test in 'NoPrefetchGetMore'.
+ auto rcr =
+ ing->scheduleSuccessfulResponse(buildCursorResponse(fieldName, start, end, cursorId));
+ ing->runReadyNetworkOperations();
+ return rcr.cmdObj.getOwned();
+ }
+};
+
+class NoPrefetchPinnedTaskExecutorCursorTestFixture
+ : public PinnedConnTaskExecutorCursorTestFixture {
+public:
+ TaskExecutorCursor makeTec(RemoteCommandRequest rcr,
+ TaskExecutorCursor::Options&& options = {}) {
+ options.preFetchNextBatch = false;
+ options.pinConnection = true;
+ return TaskExecutorCursor(getExecutorPtr(), rcr, std::move(options));
+ }
+};
+
TEST_F(NonPinningTaskExecutorCursorTestFixture, SingleBatchWorks) {
SingleBatchWorksTest();
}
+TEST_F(NoPrefetchTaskExecutorCursorTestFixture, SingleBatchWorks) {
+ SingleBatchWorksTest();
+}
+
+TEST_F(NoPrefetchPinnedTaskExecutorCursorTestFixture, SingleBatchWorks) {
+ SingleBatchWorksTest();
+}
+
TEST_F(PinnedConnTaskExecutorCursorTestFixture, SingleBatchWorks) {
SingleBatchWorksTest();
}
@@ -665,6 +756,14 @@ TEST_F(NonPinningTaskExecutorCursorTestFixture, MultipleCursorsSingleBatchSuccee
MultipleCursorsSingleBatchSucceedsTest();
}
+TEST_F(NoPrefetchTaskExecutorCursorTestFixture, MultipleCursorsSingleBatchSucceeds) {
+ MultipleCursorsSingleBatchSucceedsTest();
+}
+
+TEST_F(NoPrefetchPinnedTaskExecutorCursorTestFixture, MultipleCursorsSingleBatchSucceeds) {
+ MultipleCursorsSingleBatchSucceedsTest();
+}
+
TEST_F(PinnedConnTaskExecutorCursorTestFixture, MultipleCursorsSingleBatchSucceeds) {
MultipleCursorsSingleBatchSucceedsTest();
}
@@ -674,14 +773,33 @@ TEST_F(NonPinningTaskExecutorCursorTestFixture,
ChildTaskExecutorCursorsAreSafeIfOriginalOpCtxDestructedTest();
}
+TEST_F(NoPrefetchTaskExecutorCursorTestFixture,
+ ChildTaskExecutorCursorsAreSafeIfOriginalOpCtxDestructed) {
+ ChildTaskExecutorCursorsAreSafeIfOriginalOpCtxDestructedTest();
+}
+
+TEST_F(NoPrefetchPinnedTaskExecutorCursorTestFixture,
+ ChildTaskExecutorCursorsAreSafeIfOriginalOpCtxDestructed) {
+ ChildTaskExecutorCursorsAreSafeIfOriginalOpCtxDestructedTest();
+}
+
TEST_F(PinnedConnTaskExecutorCursorTestFixture,
ChildTaskExecutorCursorsAreSafeIfOriginalOpCtxDestructed) {
ChildTaskExecutorCursorsAreSafeIfOriginalOpCtxDestructedTest();
}
+
TEST_F(NonPinningTaskExecutorCursorTestFixture, MultipleCursorsGetMoreWorks) {
MultipleCursorsGetMoreWorksTest();
}
+TEST_F(NoPrefetchTaskExecutorCursorTestFixture, MultipleCursorsGetMoreWorks) {
+ MultipleCursorsGetMoreWorksTest();
+}
+
+TEST_F(NoPrefetchPinnedTaskExecutorCursorTestFixture, MultipleCursorsGetMoreWorks) {
+ MultipleCursorsGetMoreWorksTest();
+}
+
TEST_F(PinnedConnTaskExecutorCursorTestFixture, MultipleCursorsGetMoreWorks) {
MultipleCursorsGetMoreWorksTest();
}
@@ -690,13 +808,21 @@ TEST_F(NonPinningTaskExecutorCursorTestFixture, FailureInFind) {
FailureInFindTest();
}
+TEST_F(NoPrefetchTaskExecutorCursorTestFixture, FailureInFind) {
+ FailureInFindTest();
+}
+
+TEST_F(NoPrefetchPinnedTaskExecutorCursorTestFixture, FailureInFind) {
+ FailureInFindTest();
+}
+
TEST_F(PinnedConnTaskExecutorCursorTestFixture, FailureInFind) {
FailureInFindTest();
}
/**
* Ensure early termination of the cursor calls killCursor (if we know about the cursor id)
- * Only applicapble to the unpinned case - if the connection is pinned, and a getMore is
+ * Only applicable to the unpinned case - if the connection is pinned, and a getMore is
* in progress and/or fails, the most we can do is kill the connection. We can't re-use
* the connection to send killCursors.
*/
@@ -730,6 +856,14 @@ TEST_F(NonPinningTaskExecutorCursorTestFixture, MultipleBatchesWorks) {
MultipleBatchesWorksTest();
}
+TEST_F(NoPrefetchTaskExecutorCursorTestFixture, MultipleBatchesWorks) {
+ MultipleBatchesWorksTest();
+}
+
+TEST_F(NoPrefetchPinnedTaskExecutorCursorTestFixture, MultipleBatchesWorks) {
+ MultipleBatchesWorksTest();
+}
+
TEST_F(PinnedConnTaskExecutorCursorTestFixture, MultipleBatchesWorks) {
MultipleBatchesWorksTest();
}
@@ -738,6 +872,14 @@ TEST_F(NonPinningTaskExecutorCursorTestFixture, EmptyFirstBatch) {
EmptyFirstBatchTest();
}
+TEST_F(NoPrefetchTaskExecutorCursorTestFixture, EmptyFirstBatch) {
+ EmptyFirstBatchTest();
+}
+
+TEST_F(NoPrefetchPinnedTaskExecutorCursorTestFixture, EmptyFirstBatch) {
+ EmptyFirstBatchTest();
+}
+
TEST_F(PinnedConnTaskExecutorCursorTestFixture, EmptyFirstBatch) {
EmptyFirstBatchTest();
}
@@ -746,6 +888,14 @@ TEST_F(NonPinningTaskExecutorCursorTestFixture, EmptyNonInitialBatch) {
EmptyNonInitialBatchTest();
}
+TEST_F(NoPrefetchTaskExecutorCursorTestFixture, EmptyNonInitialBatch) {
+ EmptyNonInitialBatchTest();
+}
+
+TEST_F(NoPrefetchPinnedTaskExecutorCursorTestFixture, EmptyNonInitialBatch) {
+ EmptyNonInitialBatchTest();
+}
+
TEST_F(PinnedConnTaskExecutorCursorTestFixture, EmptyNonInitialBatch) {
EmptyNonInitialBatchTest();
}
@@ -797,6 +947,14 @@ TEST_F(NonPinningTaskExecutorCursorTestFixture, LsidIsPassed) {
ASSERT_FALSE(hasReadyRequests());
}
+TEST_F(NoPrefetchTaskExecutorCursorTestFixture, NoPrefetchGetMore) {
+ NoPrefetchGetMore();
+}
+
+TEST_F(NoPrefetchPinnedTaskExecutorCursorTestFixture, NoPrefetchWithPinning) {
+ NoPrefetchGetMore();
+}
+
} // namespace
} // namespace executor
} // namespace mongo