From 724d7f8cd512d3afde931d3d34220e7eb05e5a2d Mon Sep 17 00:00:00 2001 From: Ted Tuckman Date: Thu, 10 Feb 2022 17:36:36 +0000 Subject: SERVER-62531 Add ability for TaskExecutorCursor to parse multiple cursors --- src/mongo/db/query/cursor_response.cpp | 15 ++- src/mongo/db/query/cursor_response.h | 6 +- src/mongo/executor/task_executor_cursor.cpp | 90 +++++++++++---- src/mongo/executor/task_executor_cursor.h | 40 +++++++ src/mongo/executor/task_executor_cursor_test.cpp | 135 +++++++++++++++++++++++ 5 files changed, 261 insertions(+), 25 deletions(-) diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp index d31b6ab9274..b4042f559b1 100644 --- a/src/mongo/db/query/cursor_response.cpp +++ b/src/mongo/db/query/cursor_response.cpp @@ -159,7 +159,7 @@ std::vector> CursorResponse::parseFromBSONMany( << "Cursors array element contains non-object element: " << elt}); } else { - cursors.push_back(parseFromBSON(elt.Obj())); + cursors.push_back(parseFromBSON(elt.Obj(), &cmdResponse)); } } } @@ -167,7 +167,8 @@ std::vector> CursorResponse::parseFromBSONMany( return cursors; } -StatusWith CursorResponse::parseFromBSON(const BSONObj& cmdResponse) { +StatusWith CursorResponse::parseFromBSON(const BSONObj& cmdResponse, + const BSONObj* ownedObj) { Status cmdStatus = getStatusFromCommandResult(cmdResponse); if (!cmdStatus.isOK()) { return cmdStatus; @@ -231,8 +232,16 @@ StatusWith CursorResponse::parseFromBSON(const BSONObj& cmdRespo batch.push_back(elt.Obj()); } + tassert(6253102, + "Must own one of the two arguments if there are documents in the batch", + batch.size() == 0 || cmdResponse.isOwned() || (ownedObj && ownedObj->isOwned())); + for (auto& doc : batch) { - doc.shareOwnershipWith(cmdResponse); + if (ownedObj) { + doc.shareOwnershipWith(*ownedObj); + } else { + doc.shareOwnershipWith(cmdResponse); + } } auto postBatchResumeTokenElem = cursorObj[kPostBatchResumeTokenField]; diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h index ce1950fe104..ca81974e78b 100644 --- a/src/mongo/db/query/cursor_response.h +++ b/src/mongo/db/query/cursor_response.h @@ -179,9 +179,11 @@ public: static std::vector> parseFromBSONMany(const BSONObj& cmdResponse); /** - * Constructs a CursorResponse from the command BSON response. + * Constructs a CursorResponse from the command BSON response. If 'cmdResponse' is not owned, + * the second argument should be the object that owns the response. */ - static StatusWith parseFromBSON(const BSONObj& cmdResponse); + static StatusWith parseFromBSON(const BSONObj& cmdResponse, + const BSONObj* ownedObj = nullptr); /** * A throwing version of 'parseFromBSON'. diff --git a/src/mongo/executor/task_executor_cursor.cpp b/src/mongo/executor/task_executor_cursor.cpp index e22858df10f..9bc01121b38 100644 --- a/src/mongo/executor/task_executor_cursor.cpp +++ b/src/mongo/executor/task_executor_cursor.cpp @@ -34,7 +34,6 @@ #include "mongo/executor/task_executor_cursor.h" #include "mongo/bson/bsonobjbuilder.h" -#include "mongo/db/query/cursor_response.h" #include "mongo/db/query/getmore_command_gen.h" #include "mongo/db/query/kill_cursors_gen.h" #include "mongo/util/scopeguard.h" @@ -55,6 +54,44 @@ TaskExecutorCursor::TaskExecutorCursor(executor::TaskExecutor* executor, _runRemoteCommand(_createRequest(_rcr.opCtx, _rcr.cmdObj)); } +TaskExecutorCursor::TaskExecutorCursor(executor::TaskExecutor* executor, + CursorResponse&& response, + RemoteCommandRequest& rcr, + Options&& options) + : _executor(executor), _rcr(rcr), _options(std::move(options)), _batchIter(_batch.end()) { + + tassert(6253101, "rcr must have an opCtx to use construct cursor from response", rcr.opCtx); + _lsid = rcr.opCtx->getLogicalSessionId(); + _processResponse(rcr.opCtx, std::move(response)); +} + +TaskExecutorCursor::TaskExecutorCursor(TaskExecutorCursor&& other) + : _executor(other._executor), + _rcr(other._rcr), + _options(std::move(other._options)), + _lsid(other._lsid), + _cbHandle(std::move(other._cbHandle)), + _cursorId(other._cursorId), + _millisecondsWaiting(other._millisecondsWaiting), + _ns(other._ns), + _batchNum(other._batchNum), + _pipe(std::move(other._pipe)), + _additionalCursors(std::move(other._additionalCursors)) { + // Copy the status of the batch. + auto batchIterIndex = other._batchIter - other._batch.begin(); + _batch = std::move(other._batch); + _batchIter = _batch.begin() + batchIterIndex; + + // Get owned copy of the vars. + if (other._cursorVars) { + _cursorVars = other._cursorVars->getOwned(); + } + // Other is no longer responsible for this cursor id. + other._cursorId = 0; + // Other should not cancel the callback on destruction. + other._cbHandle = boost::none; +} + TaskExecutorCursor::~TaskExecutorCursor() { try { if (_cbHandle) { @@ -129,6 +166,26 @@ void TaskExecutorCursor::_runRemoteCommand(const RemoteCommandRequest& rcr) { } })); } +void TaskExecutorCursor::_processResponse(OperationContext* opCtx, CursorResponse&& response) { + // If this was our first batch. + if (_cursorId == kUnitializedCursorId) { + _ns = response.getNSS(); + _rcr.dbname = _ns.db().toString(); + // 'vars' are only included in the first batch. + _cursorVars = response.getVarsField(); + } + + _cursorId = response.getCursorId(); + _batch = response.releaseBatch(); + _batchIter = _batch.begin(); + + // If we got a cursor id back, pre-fetch the next batch + if (_cursorId) { + GetMoreCommandRequest getMoreRequest(_cursorId, _ns.coll().toString()); + getMoreRequest.setBatchSize(_options.batchSize); + _runRemoteCommand(_createRequest(opCtx, getMoreRequest.toBSON({}))); + } +} void TaskExecutorCursor::_getNextBatch(OperationContext* opCtx) { invariant(_cbHandle, "_getNextBatch() requires an async request to have already been sent."); @@ -154,25 +211,18 @@ void TaskExecutorCursor::_getNextBatch(OperationContext* opCtx) { // is done. _cbHandle.reset(); - auto cr = uassertStatusOK(CursorResponse::parseFromBSON(out.getValue())); - - // If this was our first batch - if (_cursorId == kUnitializedCursorId) { - _ns = cr.getNSS(); - _rcr.dbname = _ns.db().toString(); - // 'vars' are only included in the first batch. - _cursorVars = cr.getVarsField(); - } - - _cursorId = cr.getCursorId(); - _batch = cr.releaseBatch(); - _batchIter = _batch.begin(); - - // If we got a cursor id back, pre-fetch the next batch - if (_cursorId) { - GetMoreCommandRequest getMoreRequest(_cursorId, _ns.coll().toString()); - getMoreRequest.setBatchSize(_options.batchSize); - _runRemoteCommand(_createRequest(opCtx, getMoreRequest.toBSON({}))); + // Parse into a vector in case the remote sent back multiple cursors. + auto cursorResponses = CursorResponse::parseFromBSONMany(out.getValue()); + tassert(6253100, "Expected at least one response for cursor", cursorResponses.size() > 0); + CursorResponse cr = uassertStatusOK(std::move(cursorResponses[0])); + _processResponse(opCtx, std::move(cr)); + // If we have more responses, build them into cursors then hold them until a caller accesses + // them. Skip the first response, we used it to populate this cursor. + for (unsigned int i = 1; i < cursorResponses.size(); ++i) { + _additionalCursors.emplace_back(_executor, + uassertStatusOK(std::move(cursorResponses[i])), + _rcr, + TaskExecutorCursor::Options()); } } diff --git a/src/mongo/executor/task_executor_cursor.h b/src/mongo/executor/task_executor_cursor.h index 07643999579..bfc916d0e13 100644 --- a/src/mongo/executor/task_executor_cursor.h +++ b/src/mongo/executor/task_executor_cursor.h @@ -37,6 +37,7 @@ #include "mongo/db/cursor_id.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/query/cursor_response.h" #include "mongo/executor/remote_command_request.h" #include "mongo/executor/task_executor.h" #include "mongo/util/duration.h" @@ -81,6 +82,22 @@ public: const RemoteCommandRequest& rcr, Options&& options = {}); + /** + * Construct the cursor from a cursor response from a previously executed RemoteCommandRequest. + * The executor is used for subsequent getMore calls. Uses the original RemoteCommandRequest + * to build subsequent commands. Takes ownership of the CursorResponse and gives it to the new + * cursor. + */ + TaskExecutorCursor(executor::TaskExecutor* executor, + CursorResponse&& response, + RemoteCommandRequest& rcr, + Options&& options = {}); + + /** + * Move constructor to enable storing cursors in vectors. + */ + TaskExecutorCursor(TaskExecutorCursor&& other); + /** * Asynchronously kills async ops and kills the underlying cursor on destruction. */ @@ -115,6 +132,18 @@ public: return _batchNum; } + /** + * Returns the vector of cursors that were returned alongside this one. Calling this claims + * ownership of the cursors and will return an empty vector on subsequent calls. + */ + std::vector releaseAdditionalCursors() { + return std::move(_additionalCursors); + } + + auto getNumAdditionalCursors() { + return _additionalCursors.size(); + } + private: /** * Runs a remote command and pipes the output back to this object @@ -126,6 +155,14 @@ private: */ void _getNextBatch(OperationContext* opCtx); + /** + * Helper for '_getNextBatch' that handles the reading of the 'CursorResponse' object and + * storing of relevant values. This is also responsible for issuing a getMore request if it + * is required to populate the next batch. + */ + void _processResponse(OperationContext* opCtx, CursorResponse&& response); + + /** * Create a new request, annotating with lsid and current opCtx */ @@ -163,6 +200,9 @@ private: // Multi producer because we hold onto the producer side in this object, as well as placing it // into callbacks for the task executor MultiProducerSingleConsumerQueue>::Pipe _pipe; + + // Cursors built from the responses returned alongside the results for this cursor. + std::vector _additionalCursors; }; } // namespace executor diff --git a/src/mongo/executor/task_executor_cursor_test.cpp b/src/mongo/executor/task_executor_cursor_test.cpp index 4f9132fd387..09d1a1c301b 100644 --- a/src/mongo/executor/task_executor_cursor_test.cpp +++ b/src/mongo/executor/task_executor_cursor_test.cpp @@ -96,6 +96,41 @@ public: return rcr.cmdObj.getOwned(); } + BSONObj scheduleSuccessfulMultiCursorResponse(StringData fieldName, + size_t start, + size_t end, + std::vector cursorIds) { + NetworkInterfaceMock::InNetworkGuard ing(getNet()); + + BSONObjBuilder bob; + { + BSONArrayBuilder cursors; + int baseCursorValue = 1; + for (auto cursorId : cursorIds) { + BSONObjBuilder cursor; + BSONArrayBuilder batch; + ASSERT(start < end && end < INT_MAX); + for (size_t i = start; i <= end; ++i) { + batch.append(BSON("x" << static_cast(i) * baseCursorValue).getOwned()); + } + cursor.append(fieldName, batch.arr()); + cursor.append("id", (long long)(cursorId)); + cursor.append("ns", "test.test"); + auto cursorObj = BSON("cursor" << cursor.done() << "ok" << 1); + cursors.append(cursorObj.getOwned()); + ++baseCursorValue; + } + bob.append("cursors", cursors.arr()); + } + bob.append("ok", 1); + + ASSERT(getNet()->hasReadyRequests()); + auto rcr = getNet()->scheduleSuccessfulResponse(bob.obj()); + getNet()->runReadyNetworkOperations(); + + return rcr.cmdObj.getOwned(); + } + BSONObj scheduleSuccessfulKillCursorResponse(size_t cursorId) { NetworkInterfaceMock::InNetworkGuard ing(getNet()); @@ -143,6 +178,106 @@ TEST_F(TaskExecutorCursorFixture, SingleBatchWorks) { ASSERT_FALSE(tec.getNext(opCtx.get())); } +/** + * Ensure the firstBatch can be read correctly when multiple cursors are returned. + */ +TEST_F(TaskExecutorCursorFixture, MultipleCursorsSingleBatchSucceeds) { + const auto aggCmd = BSON("aggregate" + << "test" + << "pipeline" << BSON_ARRAY(BSON("returnMultipleCursors" << true))); + + RemoteCommandRequest rcr(HostAndPort("localhost"), "test", aggCmd, opCtx.get()); + + TaskExecutorCursor tec(&getExecutor(), rcr); + + ASSERT_BSONOBJ_EQ(aggCmd, scheduleSuccessfulMultiCursorResponse("firstBatch", 1, 2, {0, 0})); + + ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 1); + + ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 2); + + ASSERT_FALSE(tec.getNext(opCtx.get())); + + auto cursorVec = tec.releaseAdditionalCursors(); + ASSERT_EQUALS(cursorVec.size(), 1); + auto secondCursor = std::move(cursorVec[0]); + + ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).get()["x"].Int(), 2); + ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).get()["x"].Int(), 4); + ASSERT_FALSE(hasReadyRequests()); + + ASSERT_FALSE(secondCursor.getNext(opCtx.get())); +} + +TEST_F(TaskExecutorCursorFixture, MultipleCursorsGetMoreWorks) { + const auto aggCmd = BSON("aggregate" + << "test" + << "pipeline" << BSON_ARRAY(BSON("returnMultipleCursors" << true))); + + std::vector cursorIds{1, 2}; + RemoteCommandRequest rcr(HostAndPort("localhost"), "test", aggCmd, opCtx.get()); + + TaskExecutorCursor tec(&getExecutor(), rcr); + + ASSERT_BSONOBJ_EQ(aggCmd, scheduleSuccessfulMultiCursorResponse("firstBatch", 1, 2, cursorIds)); + + ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 1); + + ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 2); + + auto cursorVec = tec.releaseAdditionalCursors(); + ASSERT_EQUALS(cursorVec.size(), 1); + + // If we try to getNext() at this point, we are interruptible and can timeout + ASSERT_THROWS_CODE(opCtx->runWithDeadline(Date_t::now() + Milliseconds(100), + ErrorCodes::ExceededTimeLimit, + [&] { tec.getNext(opCtx.get()); }), + DBException, + ErrorCodes::ExceededTimeLimit); + + // We can pick up after that interruption though + ASSERT_BSONOBJ_EQ(BSON("getMore" << 1LL << "collection" + << "test"), + scheduleSuccessfulCursorResponse("nextBatch", 3, 5, cursorIds[0])); + + // Repeat for second cursor. + auto secondCursor = std::move(cursorVec[0]); + + ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).get()["x"].Int(), 2); + ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).get()["x"].Int(), 4); + + ASSERT_THROWS_CODE(opCtx->runWithDeadline(Date_t::now() + Milliseconds(100), + ErrorCodes::ExceededTimeLimit, + [&] { secondCursor.getNext(opCtx.get()); }), + DBException, + ErrorCodes::ExceededTimeLimit); + + ASSERT_BSONOBJ_EQ(BSON("getMore" << 2LL << "collection" + << "test"), + scheduleSuccessfulCursorResponse("nextBatch", 6, 8, cursorIds[1])); + // Read second batch on both cursors. + ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 3); + ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 4); + ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 5); + ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).get()["x"].Int(), 6); + ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).get()["x"].Int(), 7); + ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).get()["x"].Int(), 8); + + // Schedule EOF on both cursors. + scheduleSuccessfulCursorResponse("nextBatch", 6, 6, 0); + scheduleSuccessfulCursorResponse("nextBatch", 12, 12, 0); + + // Read final document. + ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 6); + ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).get()["x"].Int(), 12); + + // Shouldn't have any more requests, both cursors are closed. + ASSERT_FALSE(hasReadyRequests()); + + ASSERT_FALSE(tec.getNext(opCtx.get())); + ASSERT_FALSE(secondCursor.getNext(opCtx.get())); +} + /** * Ensure we work if find fails (and that we receive the error code it failed with) */ -- cgit v1.2.1