summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Tuckman <ted.tuckman@mongodb.com>2022-02-10 17:36:36 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-02-10 18:25:19 +0000
commit724d7f8cd512d3afde931d3d34220e7eb05e5a2d (patch)
tree99397b45f1892a3e012d8fa01394ccdf0a126a34
parentd8ff665343ad29cf286ee2cf4a1960d29371937b (diff)
downloadmongo-724d7f8cd512d3afde931d3d34220e7eb05e5a2d.tar.gz
SERVER-62531 Add ability for TaskExecutorCursor to parse multiple cursors
-rw-r--r--src/mongo/db/query/cursor_response.cpp15
-rw-r--r--src/mongo/db/query/cursor_response.h6
-rw-r--r--src/mongo/executor/task_executor_cursor.cpp90
-rw-r--r--src/mongo/executor/task_executor_cursor.h40
-rw-r--r--src/mongo/executor/task_executor_cursor_test.cpp135
5 files changed, 261 insertions, 25 deletions
diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp
index d31b6ab9274..b4042f559b1 100644
--- a/src/mongo/db/query/cursor_response.cpp
+++ b/src/mongo/db/query/cursor_response.cpp
@@ -159,7 +159,7 @@ std::vector<StatusWith<CursorResponse>> CursorResponse::parseFromBSONMany(
<< "Cursors array element contains non-object element: "
<< elt});
} else {
- cursors.push_back(parseFromBSON(elt.Obj()));
+ cursors.push_back(parseFromBSON(elt.Obj(), &cmdResponse));
}
}
}
@@ -167,7 +167,8 @@ std::vector<StatusWith<CursorResponse>> CursorResponse::parseFromBSONMany(
return cursors;
}
-StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdResponse) {
+StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdResponse,
+ const BSONObj* ownedObj) {
Status cmdStatus = getStatusFromCommandResult(cmdResponse);
if (!cmdStatus.isOK()) {
return cmdStatus;
@@ -231,8 +232,16 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo
batch.push_back(elt.Obj());
}
+ tassert(6253102,
+ "Must own one of the two arguments if there are documents in the batch",
+ batch.size() == 0 || cmdResponse.isOwned() || (ownedObj && ownedObj->isOwned()));
+
for (auto& doc : batch) {
- doc.shareOwnershipWith(cmdResponse);
+ if (ownedObj) {
+ doc.shareOwnershipWith(*ownedObj);
+ } else {
+ doc.shareOwnershipWith(cmdResponse);
+ }
}
auto postBatchResumeTokenElem = cursorObj[kPostBatchResumeTokenField];
diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h
index ce1950fe104..ca81974e78b 100644
--- a/src/mongo/db/query/cursor_response.h
+++ b/src/mongo/db/query/cursor_response.h
@@ -179,9 +179,11 @@ public:
static std::vector<StatusWith<CursorResponse>> parseFromBSONMany(const BSONObj& cmdResponse);
/**
- * Constructs a CursorResponse from the command BSON response.
+ * Constructs a CursorResponse from the command BSON response. If 'cmdResponse' is not owned,
+ * the second argument should be the object that owns the response.
*/
- static StatusWith<CursorResponse> parseFromBSON(const BSONObj& cmdResponse);
+ static StatusWith<CursorResponse> parseFromBSON(const BSONObj& cmdResponse,
+ const BSONObj* ownedObj = nullptr);
/**
* A throwing version of 'parseFromBSON'.
diff --git a/src/mongo/executor/task_executor_cursor.cpp b/src/mongo/executor/task_executor_cursor.cpp
index e22858df10f..9bc01121b38 100644
--- a/src/mongo/executor/task_executor_cursor.cpp
+++ b/src/mongo/executor/task_executor_cursor.cpp
@@ -34,7 +34,6 @@
#include "mongo/executor/task_executor_cursor.h"
#include "mongo/bson/bsonobjbuilder.h"
-#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/getmore_command_gen.h"
#include "mongo/db/query/kill_cursors_gen.h"
#include "mongo/util/scopeguard.h"
@@ -55,6 +54,44 @@ TaskExecutorCursor::TaskExecutorCursor(executor::TaskExecutor* executor,
_runRemoteCommand(_createRequest(_rcr.opCtx, _rcr.cmdObj));
}
+TaskExecutorCursor::TaskExecutorCursor(executor::TaskExecutor* executor,
+ CursorResponse&& response,
+ RemoteCommandRequest& rcr,
+ Options&& options)
+ : _executor(executor), _rcr(rcr), _options(std::move(options)), _batchIter(_batch.end()) {
+
+ tassert(6253101, "rcr must have an opCtx to use construct cursor from response", rcr.opCtx);
+ _lsid = rcr.opCtx->getLogicalSessionId();
+ _processResponse(rcr.opCtx, std::move(response));
+}
+
+TaskExecutorCursor::TaskExecutorCursor(TaskExecutorCursor&& other)
+ : _executor(other._executor),
+ _rcr(other._rcr),
+ _options(std::move(other._options)),
+ _lsid(other._lsid),
+ _cbHandle(std::move(other._cbHandle)),
+ _cursorId(other._cursorId),
+ _millisecondsWaiting(other._millisecondsWaiting),
+ _ns(other._ns),
+ _batchNum(other._batchNum),
+ _pipe(std::move(other._pipe)),
+ _additionalCursors(std::move(other._additionalCursors)) {
+ // Copy the status of the batch.
+ auto batchIterIndex = other._batchIter - other._batch.begin();
+ _batch = std::move(other._batch);
+ _batchIter = _batch.begin() + batchIterIndex;
+
+ // Get owned copy of the vars.
+ if (other._cursorVars) {
+ _cursorVars = other._cursorVars->getOwned();
+ }
+ // Other is no longer responsible for this cursor id.
+ other._cursorId = 0;
+ // Other should not cancel the callback on destruction.
+ other._cbHandle = boost::none;
+}
+
TaskExecutorCursor::~TaskExecutorCursor() {
try {
if (_cbHandle) {
@@ -129,6 +166,26 @@ void TaskExecutorCursor::_runRemoteCommand(const RemoteCommandRequest& rcr) {
}
}));
}
+void TaskExecutorCursor::_processResponse(OperationContext* opCtx, CursorResponse&& response) {
+ // If this was our first batch.
+ if (_cursorId == kUnitializedCursorId) {
+ _ns = response.getNSS();
+ _rcr.dbname = _ns.db().toString();
+ // 'vars' are only included in the first batch.
+ _cursorVars = response.getVarsField();
+ }
+
+ _cursorId = response.getCursorId();
+ _batch = response.releaseBatch();
+ _batchIter = _batch.begin();
+
+ // If we got a cursor id back, pre-fetch the next batch
+ if (_cursorId) {
+ GetMoreCommandRequest getMoreRequest(_cursorId, _ns.coll().toString());
+ getMoreRequest.setBatchSize(_options.batchSize);
+ _runRemoteCommand(_createRequest(opCtx, getMoreRequest.toBSON({})));
+ }
+}
void TaskExecutorCursor::_getNextBatch(OperationContext* opCtx) {
invariant(_cbHandle, "_getNextBatch() requires an async request to have already been sent.");
@@ -154,25 +211,18 @@ void TaskExecutorCursor::_getNextBatch(OperationContext* opCtx) {
// is done.
_cbHandle.reset();
- auto cr = uassertStatusOK(CursorResponse::parseFromBSON(out.getValue()));
-
- // If this was our first batch
- if (_cursorId == kUnitializedCursorId) {
- _ns = cr.getNSS();
- _rcr.dbname = _ns.db().toString();
- // 'vars' are only included in the first batch.
- _cursorVars = cr.getVarsField();
- }
-
- _cursorId = cr.getCursorId();
- _batch = cr.releaseBatch();
- _batchIter = _batch.begin();
-
- // If we got a cursor id back, pre-fetch the next batch
- if (_cursorId) {
- GetMoreCommandRequest getMoreRequest(_cursorId, _ns.coll().toString());
- getMoreRequest.setBatchSize(_options.batchSize);
- _runRemoteCommand(_createRequest(opCtx, getMoreRequest.toBSON({})));
+ // Parse into a vector in case the remote sent back multiple cursors.
+ auto cursorResponses = CursorResponse::parseFromBSONMany(out.getValue());
+ tassert(6253100, "Expected at least one response for cursor", cursorResponses.size() > 0);
+ CursorResponse cr = uassertStatusOK(std::move(cursorResponses[0]));
+ _processResponse(opCtx, std::move(cr));
+ // If we have more responses, build them into cursors then hold them until a caller accesses
+ // them. Skip the first response, we used it to populate this cursor.
+ for (unsigned int i = 1; i < cursorResponses.size(); ++i) {
+ _additionalCursors.emplace_back(_executor,
+ uassertStatusOK(std::move(cursorResponses[i])),
+ _rcr,
+ TaskExecutorCursor::Options());
}
}
diff --git a/src/mongo/executor/task_executor_cursor.h b/src/mongo/executor/task_executor_cursor.h
index 07643999579..bfc916d0e13 100644
--- a/src/mongo/executor/task_executor_cursor.h
+++ b/src/mongo/executor/task_executor_cursor.h
@@ -37,6 +37,7 @@
#include "mongo/db/cursor_id.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/query/cursor_response.h"
#include "mongo/executor/remote_command_request.h"
#include "mongo/executor/task_executor.h"
#include "mongo/util/duration.h"
@@ -82,6 +83,22 @@ public:
Options&& options = {});
/**
+ * Construct the cursor from a cursor response from a previously executed RemoteCommandRequest.
+ * The executor is used for subsequent getMore calls. Uses the original RemoteCommandRequest
+ * to build subsequent commands. Takes ownership of the CursorResponse and gives it to the new
+ * cursor.
+ */
+ TaskExecutorCursor(executor::TaskExecutor* executor,
+ CursorResponse&& response,
+ RemoteCommandRequest& rcr,
+ Options&& options = {});
+
+ /**
+ * Move constructor to enable storing cursors in vectors.
+ */
+ TaskExecutorCursor(TaskExecutorCursor&& other);
+
+ /**
* Asynchronously kills async ops and kills the underlying cursor on destruction.
*/
~TaskExecutorCursor();
@@ -115,6 +132,18 @@ public:
return _batchNum;
}
+ /**
+ * Returns the vector of cursors that were returned alongside this one. Calling this claims
+ * ownership of the cursors and will return an empty vector on subsequent calls.
+ */
+ std::vector<TaskExecutorCursor> releaseAdditionalCursors() {
+ return std::move(_additionalCursors);
+ }
+
+ auto getNumAdditionalCursors() {
+ return _additionalCursors.size();
+ }
+
private:
/**
* Runs a remote command and pipes the output back to this object
@@ -127,6 +156,14 @@ private:
void _getNextBatch(OperationContext* opCtx);
/**
+ * Helper for '_getNextBatch' that handles the reading of the 'CursorResponse' object and
+ * storing of relevant values. This is also responsible for issuing a getMore request if it
+ * is required to populate the next batch.
+ */
+ void _processResponse(OperationContext* opCtx, CursorResponse&& response);
+
+
+ /**
* Create a new request, annotating with lsid and current opCtx
*/
const RemoteCommandRequest& _createRequest(OperationContext* opCtx, const BSONObj& cmd);
@@ -163,6 +200,9 @@ private:
// Multi producer because we hold onto the producer side in this object, as well as placing it
// into callbacks for the task executor
MultiProducerSingleConsumerQueue<StatusWith<BSONObj>>::Pipe _pipe;
+
+ // Cursors built from the responses returned alongside the results for this cursor.
+ std::vector<TaskExecutorCursor> _additionalCursors;
};
} // namespace executor
diff --git a/src/mongo/executor/task_executor_cursor_test.cpp b/src/mongo/executor/task_executor_cursor_test.cpp
index 4f9132fd387..09d1a1c301b 100644
--- a/src/mongo/executor/task_executor_cursor_test.cpp
+++ b/src/mongo/executor/task_executor_cursor_test.cpp
@@ -96,6 +96,41 @@ public:
return rcr.cmdObj.getOwned();
}
+ BSONObj scheduleSuccessfulMultiCursorResponse(StringData fieldName,
+ size_t start,
+ size_t end,
+ std::vector<size_t> cursorIds) {
+ NetworkInterfaceMock::InNetworkGuard ing(getNet());
+
+ BSONObjBuilder bob;
+ {
+ BSONArrayBuilder cursors;
+ int baseCursorValue = 1;
+ for (auto cursorId : cursorIds) {
+ BSONObjBuilder cursor;
+ BSONArrayBuilder batch;
+ ASSERT(start < end && end < INT_MAX);
+ for (size_t i = start; i <= end; ++i) {
+ batch.append(BSON("x" << static_cast<int>(i) * baseCursorValue).getOwned());
+ }
+ cursor.append(fieldName, batch.arr());
+ cursor.append("id", (long long)(cursorId));
+ cursor.append("ns", "test.test");
+ auto cursorObj = BSON("cursor" << cursor.done() << "ok" << 1);
+ cursors.append(cursorObj.getOwned());
+ ++baseCursorValue;
+ }
+ bob.append("cursors", cursors.arr());
+ }
+ bob.append("ok", 1);
+
+ ASSERT(getNet()->hasReadyRequests());
+ auto rcr = getNet()->scheduleSuccessfulResponse(bob.obj());
+ getNet()->runReadyNetworkOperations();
+
+ return rcr.cmdObj.getOwned();
+ }
+
BSONObj scheduleSuccessfulKillCursorResponse(size_t cursorId) {
NetworkInterfaceMock::InNetworkGuard ing(getNet());
@@ -144,6 +179,106 @@ TEST_F(TaskExecutorCursorFixture, SingleBatchWorks) {
}
/**
+ * Ensure the firstBatch can be read correctly when multiple cursors are returned.
+ */
+TEST_F(TaskExecutorCursorFixture, MultipleCursorsSingleBatchSucceeds) {
+ const auto aggCmd = BSON("aggregate"
+ << "test"
+ << "pipeline" << BSON_ARRAY(BSON("returnMultipleCursors" << true)));
+
+ RemoteCommandRequest rcr(HostAndPort("localhost"), "test", aggCmd, opCtx.get());
+
+ TaskExecutorCursor tec(&getExecutor(), rcr);
+
+ ASSERT_BSONOBJ_EQ(aggCmd, scheduleSuccessfulMultiCursorResponse("firstBatch", 1, 2, {0, 0}));
+
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 1);
+
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 2);
+
+ ASSERT_FALSE(tec.getNext(opCtx.get()));
+
+ auto cursorVec = tec.releaseAdditionalCursors();
+ ASSERT_EQUALS(cursorVec.size(), 1);
+ auto secondCursor = std::move(cursorVec[0]);
+
+ ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).get()["x"].Int(), 2);
+ ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).get()["x"].Int(), 4);
+ ASSERT_FALSE(hasReadyRequests());
+
+ ASSERT_FALSE(secondCursor.getNext(opCtx.get()));
+}
+
+TEST_F(TaskExecutorCursorFixture, MultipleCursorsGetMoreWorks) {
+ const auto aggCmd = BSON("aggregate"
+ << "test"
+ << "pipeline" << BSON_ARRAY(BSON("returnMultipleCursors" << true)));
+
+ std::vector<size_t> cursorIds{1, 2};
+ RemoteCommandRequest rcr(HostAndPort("localhost"), "test", aggCmd, opCtx.get());
+
+ TaskExecutorCursor tec(&getExecutor(), rcr);
+
+ ASSERT_BSONOBJ_EQ(aggCmd, scheduleSuccessfulMultiCursorResponse("firstBatch", 1, 2, cursorIds));
+
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 1);
+
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 2);
+
+ auto cursorVec = tec.releaseAdditionalCursors();
+ ASSERT_EQUALS(cursorVec.size(), 1);
+
+ // If we try to getNext() at this point, we are interruptible and can timeout
+ ASSERT_THROWS_CODE(opCtx->runWithDeadline(Date_t::now() + Milliseconds(100),
+ ErrorCodes::ExceededTimeLimit,
+ [&] { tec.getNext(opCtx.get()); }),
+ DBException,
+ ErrorCodes::ExceededTimeLimit);
+
+ // We can pick up after that interruption though
+ ASSERT_BSONOBJ_EQ(BSON("getMore" << 1LL << "collection"
+ << "test"),
+ scheduleSuccessfulCursorResponse("nextBatch", 3, 5, cursorIds[0]));
+
+ // Repeat for second cursor.
+ auto secondCursor = std::move(cursorVec[0]);
+
+ ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).get()["x"].Int(), 2);
+ ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).get()["x"].Int(), 4);
+
+ ASSERT_THROWS_CODE(opCtx->runWithDeadline(Date_t::now() + Milliseconds(100),
+ ErrorCodes::ExceededTimeLimit,
+ [&] { secondCursor.getNext(opCtx.get()); }),
+ DBException,
+ ErrorCodes::ExceededTimeLimit);
+
+ ASSERT_BSONOBJ_EQ(BSON("getMore" << 2LL << "collection"
+ << "test"),
+ scheduleSuccessfulCursorResponse("nextBatch", 6, 8, cursorIds[1]));
+ // Read second batch on both cursors.
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 3);
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 4);
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 5);
+ ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).get()["x"].Int(), 6);
+ ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).get()["x"].Int(), 7);
+ ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).get()["x"].Int(), 8);
+
+ // Schedule EOF on both cursors.
+ scheduleSuccessfulCursorResponse("nextBatch", 6, 6, 0);
+ scheduleSuccessfulCursorResponse("nextBatch", 12, 12, 0);
+
+ // Read final document.
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 6);
+ ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).get()["x"].Int(), 12);
+
+ // Shouldn't have any more requests, both cursors are closed.
+ ASSERT_FALSE(hasReadyRequests());
+
+ ASSERT_FALSE(tec.getNext(opCtx.get()));
+ ASSERT_FALSE(secondCursor.getNext(opCtx.get()));
+}
+
+/**
* Ensure we work if find fails (and that we receive the error code it failed with)
*/
TEST_F(TaskExecutorCursorFixture, FailureInFind) {