diff options
author | Ben Shteinfeld <ben.shteinfeld@mongodb.com> | 2023-05-10 20:44:04 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-05-10 21:26:03 +0000 |
commit | 680b65808f905ed796b4afe14c2990fc7d1d1646 (patch) | |
tree | 54146323c34c3f0d156dde5127c3e4556f672ea7 | |
parent | cf775ff4b185433ec54aba4363905e3b7d924718 (diff) | |
download | mongo-680b65808f905ed796b4afe14c2990fc7d1d1646.tar.gz |
SERVER-73959 Add option to TaskExecutorCursor to not prefetch next batch
-rw-r--r-- | src/mongo/executor/task_executor_cursor.cpp | 33 | ||||
-rw-r--r-- | src/mongo/executor/task_executor_cursor.h | 19 | ||||
-rw-r--r-- | src/mongo/executor/task_executor_cursor_test.cpp | 168 |
3 files changed, 205 insertions, 15 deletions
diff --git a/src/mongo/executor/task_executor_cursor.cpp b/src/mongo/executor/task_executor_cursor.cpp index dcbe6e1a945..b44034489c2 100644 --- a/src/mongo/executor/task_executor_cursor.cpp +++ b/src/mongo/executor/task_executor_cursor.cpp @@ -254,16 +254,34 @@ void TaskExecutorCursor::_processResponse(OperationContext* opCtx, CursorRespons _batch = response.releaseBatch(); _batchIter = _batch.begin(); - // If we got a cursor id back, pre-fetch the next batch - if (_cursorId) { - GetMoreCommandRequest getMoreRequest(_cursorId, _ns.coll().toString()); - getMoreRequest.setBatchSize(_options.batchSize); - _runRemoteCommand(_createRequest(opCtx, getMoreRequest.toBSON({}))); + // If the previous response contained a cursorId and pre-fetching is enabled, schedule the + // getMore. + if ((_cursorId != kClosedCursorId) && _options.preFetchNextBatch) { + _scheduleGetMore(opCtx); } } +void TaskExecutorCursor::_scheduleGetMore(OperationContext* opCtx) { + // The previous response must have returned an open cursor ID. + invariant(_cursorId >= kMinLegalCursorId); + // There cannot be an existing in-flight request. + invariant(!_cmdState); + GetMoreCommandRequest getMoreRequest(_cursorId, _ns.coll().toString()); + getMoreRequest.setBatchSize(_options.batchSize); + _runRemoteCommand(_createRequest(opCtx, getMoreRequest.toBSON({}))); +} + void TaskExecutorCursor::_getNextBatch(OperationContext* opCtx) { - invariant(_cmdState, "_getNextBatch() requires an async request to have already been sent."); + // If we don't have an in-flight request, schedule one. This will occur when the + // 'preFetchNextBatch' option is false. + if (!_cmdState) { + invariant(!_options.preFetchNextBatch); + _scheduleGetMore(opCtx); + } + + // There should be an in-flight request at this point, either sent asynchronously when we + // processed the previous response or just scheduled. 
+ invariant(_cmdState); invariant(_cursorId != kClosedCursorId); auto clock = opCtx->getServiceContext()->getPreciseClockSource(); @@ -291,12 +309,15 @@ void TaskExecutorCursor::_getNextBatch(OperationContext* opCtx) { tassert(6253100, "Expected at least one response for cursor", cursorResponses.size() > 0); CursorResponse cr = uassertStatusOK(std::move(cursorResponses[0])); _processResponse(opCtx, std::move(cr)); + // If we have more responses, build them into cursors then hold them until a caller accesses // them. Skip the first response, we used it to populate this cursor. // Ensure we update the RCR we give to each 'child cursor' with the current opCtx. auto freshRcr = _createRequest(opCtx, _rcr.cmdObj); auto copyOptions = [&] { TaskExecutorCursor::Options options; + // In the case that pinConnection is true, we need to ensure that additional cursors also + // pin their connection to the same socket as the original cursor. options.pinConnection = _options.pinConnection; return options; }; diff --git a/src/mongo/executor/task_executor_cursor.h b/src/mongo/executor/task_executor_cursor.h index 458e010f976..1016f8dcfd0 100644 --- a/src/mongo/executor/task_executor_cursor.h +++ b/src/mongo/executor/task_executor_cursor.h @@ -54,8 +54,8 @@ namespace executor { * * The main differentiator for this type over DBClientCursor is the use of a task executor (which * provides access to a different connection pool, as well as interruptibility) and the ability to - * overlap getMores. This starts fetching the next batch as soon as one is exhausted (rather than - * on a call to getNext()). + * overlap getMores. This starts fetching the next batch as soon as the previous one is received + * (rather than on a call to 'getNext()'). */ class TaskExecutorCursor { public: @@ -70,6 +70,10 @@ public: struct Options { boost::optional<int64_t> batchSize; bool pinConnection{gPinTaskExecCursorConns.load()}; + // If true, we will fetch the next batch as soon as the current one is received. 
+ // If false, we will fetch the next batch when the current batch is exhausted and + // 'getNext()' is invoked. + bool preFetchNextBatch{true}; Options() {} }; @@ -177,8 +181,7 @@ private: /** * Helper for '_getNextBatch' that handles the reading of the 'CursorResponse' object and - * storing of relevant values. This is also responsible for issuing a getMore request if it - * is required to populate the next batch. + * storing of relevant values. */ void _processResponse(OperationContext* opCtx, CursorResponse&& response); @@ -187,6 +190,14 @@ private: */ const RemoteCommandRequest& _createRequest(OperationContext* opCtx, const BSONObj& cmd); + /** + * Schedules a 'GetMore' request to run asynchronously. + * This function can only be invoked when: + * - There is no in-flight request ('_cmdState' is null). + * - We have an open '_cursorId'. + */ + void _scheduleGetMore(OperationContext* opCtx); + std::shared_ptr<executor::TaskExecutor> _executor; // If we are pinning connections, we need to keep a separate reference to the // non-pinning, normal executor, so that we can shut down the pinned executor diff --git a/src/mongo/executor/task_executor_cursor_test.cpp b/src/mongo/executor/task_executor_cursor_test.cpp index eaece63fd6c..784dedddf48 100644 --- a/src/mongo/executor/task_executor_cursor_test.cpp +++ b/src/mongo/executor/task_executor_cursor_test.cpp @@ -492,6 +492,56 @@ public: th.join(); } + /** + * Test that if 'preFetchNextBatch' is false, the TaskExecutorCursor does not request GetMores + * until the current batch is exhausted and 'getNext()' is invoked. + */ + void NoPrefetchGetMore() { + CursorId cursorId = 1; + RemoteCommandRequest rcr(HostAndPort("localhost"), + "test", + BSON("search" + << "foo"), + opCtx.get()); + + // Construction of the TaskExecutorCursor enqueues a request in the NetworkInterfaceMock. 
+ TaskExecutorCursor tec = makeTec(rcr, [] { + TaskExecutorCursor::Options opts; + opts.batchSize = 2; + opts.preFetchNextBatch = false; + return opts; + }()); + + // Mock the response for the first batch. + scheduleSuccessfulCursorResponse("firstBatch", 1, 2, cursorId); + + // Exhaust the first batch. + ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 1); + ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 2); + + // Assert that the TaskExecutorCursor has not requested a GetMore. This enforces that + // 'preFetchNextBatch' works as expected. + ASSERT_FALSE(hasReadyRequests()); + + // As soon as 'getNext()' is invoked, the TaskExecutorCursor will try to send a GetMore and + // that will block this thread in the NetworkInterfaceMock until there is a scheduled + // response. However, we cannot schedule the cursor response on the main thread before we + // call 'getNext()' as that will cause the NetworkInterfaceMock to block until there is + // a request enqueued ('getNext()' is the function which will enqueue such a request). + // To avoid this deadlock, we start a new thread which will schedule a response on the + // NetworkInterfaceMock. + stdx::thread t( + [this, cursorId] { scheduleSuccessfulCursorResponse("nextBatch", 3, 4, 0); }); + t.detach(); + + // Schedules the GetMore request and exhausts the cursor. + ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 3); + ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 4); + + // Assert no GetMore is requested. 
+ ASSERT_FALSE(hasReadyRequests()); + } + ServiceContext::UniqueServiceContext serviceCtx = ServiceContext::make(); ServiceContext::UniqueClient client; ServiceContext::UniqueOperationContext opCtx; @@ -505,10 +555,10 @@ public: launchExecutorThread(); } - BSONObj scheduleSuccessfulCursorResponse(StringData fieldName, - size_t start, - size_t end, - size_t cursorId) { + virtual BSONObj scheduleSuccessfulCursorResponse(StringData fieldName, + size_t start, + size_t end, + size_t cursorId) { NetworkInterfaceMock::InNetworkGuard ing(getNet()); @@ -653,10 +703,51 @@ public: } }; +class NoPrefetchTaskExecutorCursorTestFixture : public NonPinningTaskExecutorCursorTestFixture { +public: + TaskExecutorCursor makeTec(RemoteCommandRequest rcr, + TaskExecutorCursor::Options&& options = {}) { + options.preFetchNextBatch = false; + return TaskExecutorCursor(getExecutorPtr(), rcr, std::move(options)); + } + + BSONObj scheduleSuccessfulCursorResponse(StringData fieldName, + size_t start, + size_t end, + size_t cursorId) { + NetworkInterfaceMock::InNetworkGuard ing(getNet()); + // Don't assert that the network has requests like we do in other classes. This is to enable + // the test in 'NoPrefetchGetMore'. 
+ auto rcr = + ing->scheduleSuccessfulResponse(buildCursorResponse(fieldName, start, end, cursorId)); + ing->runReadyNetworkOperations(); + return rcr.cmdObj.getOwned(); + } +}; + +class NoPrefetchPinnedTaskExecutorCursorTestFixture + : public PinnedConnTaskExecutorCursorTestFixture { +public: + TaskExecutorCursor makeTec(RemoteCommandRequest rcr, + TaskExecutorCursor::Options&& options = {}) { + options.preFetchNextBatch = false; + options.pinConnection = true; + return TaskExecutorCursor(getExecutorPtr(), rcr, std::move(options)); + } +}; + TEST_F(NonPinningTaskExecutorCursorTestFixture, SingleBatchWorks) { SingleBatchWorksTest(); } +TEST_F(NoPrefetchTaskExecutorCursorTestFixture, SingleBatchWorks) { + SingleBatchWorksTest(); +} + +TEST_F(NoPrefetchPinnedTaskExecutorCursorTestFixture, SingleBatchWorks) { + SingleBatchWorksTest(); +} + TEST_F(PinnedConnTaskExecutorCursorTestFixture, SingleBatchWorks) { SingleBatchWorksTest(); } @@ -665,6 +756,14 @@ TEST_F(NonPinningTaskExecutorCursorTestFixture, MultipleCursorsSingleBatchSuccee MultipleCursorsSingleBatchSucceedsTest(); } +TEST_F(NoPrefetchTaskExecutorCursorTestFixture, MultipleCursorsSingleBatchSucceeds) { + MultipleCursorsSingleBatchSucceedsTest(); +} + +TEST_F(NoPrefetchPinnedTaskExecutorCursorTestFixture, MultipleCursorsSingleBatchSucceeds) { + MultipleCursorsSingleBatchSucceedsTest(); +} + TEST_F(PinnedConnTaskExecutorCursorTestFixture, MultipleCursorsSingleBatchSucceeds) { MultipleCursorsSingleBatchSucceedsTest(); } @@ -674,14 +773,33 @@ TEST_F(NonPinningTaskExecutorCursorTestFixture, ChildTaskExecutorCursorsAreSafeIfOriginalOpCtxDestructedTest(); } +TEST_F(NoPrefetchTaskExecutorCursorTestFixture, + ChildTaskExecutorCursorsAreSafeIfOriginalOpCtxDestructed) { + ChildTaskExecutorCursorsAreSafeIfOriginalOpCtxDestructedTest(); +} + +TEST_F(NoPrefetchPinnedTaskExecutorCursorTestFixture, + ChildTaskExecutorCursorsAreSafeIfOriginalOpCtxDestructed) { + ChildTaskExecutorCursorsAreSafeIfOriginalOpCtxDestructedTest(); 
+} + TEST_F(PinnedConnTaskExecutorCursorTestFixture, ChildTaskExecutorCursorsAreSafeIfOriginalOpCtxDestructed) { ChildTaskExecutorCursorsAreSafeIfOriginalOpCtxDestructedTest(); } + TEST_F(NonPinningTaskExecutorCursorTestFixture, MultipleCursorsGetMoreWorks) { MultipleCursorsGetMoreWorksTest(); } +TEST_F(NoPrefetchTaskExecutorCursorTestFixture, MultipleCursorsGetMoreWorks) { + MultipleCursorsGetMoreWorksTest(); +} + +TEST_F(NoPrefetchPinnedTaskExecutorCursorTestFixture, MultipleCursorsGetMoreWorks) { + MultipleCursorsGetMoreWorksTest(); +} + TEST_F(PinnedConnTaskExecutorCursorTestFixture, MultipleCursorsGetMoreWorks) { MultipleCursorsGetMoreWorksTest(); } @@ -690,13 +808,21 @@ TEST_F(NonPinningTaskExecutorCursorTestFixture, FailureInFind) { FailureInFindTest(); } +TEST_F(NoPrefetchTaskExecutorCursorTestFixture, FailureInFind) { + FailureInFindTest(); +} + +TEST_F(NoPrefetchPinnedTaskExecutorCursorTestFixture, FailureInFind) { + FailureInFindTest(); +} + TEST_F(PinnedConnTaskExecutorCursorTestFixture, FailureInFind) { FailureInFindTest(); } /** * Ensure early termination of the cursor calls killCursor (if we know about the cursor id) - * Only applicapble to the unpinned case - if the connection is pinned, and a getMore is + * Only applicable to the unpinned case - if the connection is pinned, and a getMore is * in progress and/or fails, the most we can do is kill the connection. We can't re-use * the connection to send killCursors. 
*/ @@ -730,6 +856,14 @@ TEST_F(NonPinningTaskExecutorCursorTestFixture, MultipleBatchesWorks) { MultipleBatchesWorksTest(); } +TEST_F(NoPrefetchTaskExecutorCursorTestFixture, MultipleBatchesWorks) { + MultipleBatchesWorksTest(); +} + +TEST_F(NoPrefetchPinnedTaskExecutorCursorTestFixture, MultipleBatchesWorks) { + MultipleBatchesWorksTest(); +} + TEST_F(PinnedConnTaskExecutorCursorTestFixture, MultipleBatchesWorks) { MultipleBatchesWorksTest(); } @@ -738,6 +872,14 @@ TEST_F(NonPinningTaskExecutorCursorTestFixture, EmptyFirstBatch) { EmptyFirstBatchTest(); } +TEST_F(NoPrefetchTaskExecutorCursorTestFixture, EmptyFirstBatch) { + EmptyFirstBatchTest(); +} + +TEST_F(NoPrefetchPinnedTaskExecutorCursorTestFixture, EmptyFirstBatch) { + EmptyFirstBatchTest(); +} + TEST_F(PinnedConnTaskExecutorCursorTestFixture, EmptyFirstBatch) { EmptyFirstBatchTest(); } @@ -746,6 +888,14 @@ TEST_F(NonPinningTaskExecutorCursorTestFixture, EmptyNonInitialBatch) { EmptyNonInitialBatchTest(); } +TEST_F(NoPrefetchTaskExecutorCursorTestFixture, EmptyNonInitialBatch) { + EmptyNonInitialBatchTest(); +} + +TEST_F(NoPrefetchPinnedTaskExecutorCursorTestFixture, EmptyNonInitialBatch) { + EmptyNonInitialBatchTest(); +} + TEST_F(PinnedConnTaskExecutorCursorTestFixture, EmptyNonInitialBatch) { EmptyNonInitialBatchTest(); } @@ -797,6 +947,14 @@ TEST_F(NonPinningTaskExecutorCursorTestFixture, LsidIsPassed) { ASSERT_FALSE(hasReadyRequests()); } +TEST_F(NoPrefetchTaskExecutorCursorTestFixture, NoPrefetchGetMore) { + NoPrefetchGetMore(); +} + +TEST_F(NoPrefetchPinnedTaskExecutorCursorTestFixture, NoPrefetchWithPinning) { + NoPrefetchGetMore(); +} + } // namespace } // namespace executor } // namespace mongo |