summaryrefslogtreecommitdiff
path: root/src/mongo/executor
diff options
context:
space:
mode:
authorTed Tuckman <ted.tuckman@mongodb.com>2022-03-02 22:20:32 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-03-02 23:11:24 +0000
commitf25675cc8ea9d89672ce063f49dbdaa39e63ce1b (patch)
treeb1867e1b1e3db4dc5083da673bd62aa4cc218767 /src/mongo/executor
parent27ce39ba637159ae0be6e7734b1d7f114af7141c (diff)
downloadmongo-f25675cc8ea9d89672ce063f49dbdaa39e63ce1b.tar.gz
SERVER-62535 Allow sharded aggregation to return two cursors
Diffstat (limited to 'src/mongo/executor')
-rw-r--r--src/mongo/executor/network_test_env.cpp4
-rw-r--r--src/mongo/executor/task_executor.h3
-rw-r--r--src/mongo/executor/task_executor_cursor.cpp19
-rw-r--r--src/mongo/executor/task_executor_cursor.h25
4 files changed, 47 insertions, 4 deletions
diff --git a/src/mongo/executor/network_test_env.cpp b/src/mongo/executor/network_test_env.cpp
index a52c3ddf9b3..9a128705054 100644
--- a/src/mongo/executor/network_test_env.cpp
+++ b/src/mongo/executor/network_test_env.cpp
@@ -113,7 +113,7 @@ void NetworkTestEnv::onFindCommand(OnFindCommandFunction func) {
const NamespaceString nss =
NamespaceString(request.dbname, request.cmdObj.firstElement().String());
BSONObjBuilder result;
- appendCursorResponseObject(0LL, nss.toString(), arr.arr(), &result);
+ appendCursorResponseObject(0LL, nss.toString(), arr.arr(), boost::none, &result);
return result.obj();
});
@@ -139,7 +139,7 @@ void NetworkTestEnv::onFindWithMetadataCommand(OnFindCommandWithMetadataFunction
const NamespaceString nss =
NamespaceString(request.dbname, request.cmdObj.firstElement().String());
BSONObjBuilder resultBuilder(std::move(metadata));
- appendCursorResponseObject(0LL, nss.toString(), arr.arr(), &resultBuilder);
+ appendCursorResponseObject(0LL, nss.toString(), arr.arr(), boost::none, &resultBuilder);
return RemoteCommandResponse(resultBuilder.obj(), Milliseconds(1));
});
diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h
index ad8cab93436..2680fa3d916 100644
--- a/src/mongo/executor/task_executor.h
+++ b/src/mongo/executor/task_executor.h
@@ -410,7 +410,8 @@ public:
*
* NOTE: Do not call from a callback running in the executor.
*
- * Prefer the version that takes an OperationContext* to this version.
+ * Prefer passing an OperationContext* or other interruptible as the second argument to leaving
+ * as not interruptible.
*/
virtual void wait(const CallbackHandle& cbHandle,
Interruptible* interruptible = Interruptible::notInterruptible()) = 0;
diff --git a/src/mongo/executor/task_executor_cursor.cpp b/src/mongo/executor/task_executor_cursor.cpp
index 9bc01121b38..46d1bd846b4 100644
--- a/src/mongo/executor/task_executor_cursor.cpp
+++ b/src/mongo/executor/task_executor_cursor.cpp
@@ -86,6 +86,9 @@ TaskExecutorCursor::TaskExecutorCursor(TaskExecutorCursor&& other)
if (other._cursorVars) {
_cursorVars = other._cursorVars->getOwned();
}
+ if (other._cursorType) {
+ _cursorType = other._cursorType;
+ }
// Other is no longer responsible for this cursor id.
other._cursorId = 0;
// Other should not cancel the callback on destruction.
@@ -127,6 +130,19 @@ boost::optional<BSONObj> TaskExecutorCursor::getNext(OperationContext* opCtx) {
return std::move(*_batchIter++);
}
+void TaskExecutorCursor::populateCursor(OperationContext* opCtx) {
+ tassert(6253502,
+ "populateCursors should only be called before cursor is initialized",
+ _cursorId == kUnitializedCursorId);
+ tassert(6253503,
+ "populateCursors should only be called after a remote command has been run",
+ _cbHandle);
+ // We really only care about populating the cursor "first batch" fields, but at some point we'll
+ // have to do all of the work done by this function anyway. This would have been called by
+ // getNext() the first time it was called.
+ _getNextBatch(opCtx);
+}
+
const RemoteCommandRequest& TaskExecutorCursor::_createRequest(OperationContext* opCtx,
const BSONObj& cmd) {
// we pull this every time for updated client metadata
@@ -171,8 +187,9 @@ void TaskExecutorCursor::_processResponse(OperationContext* opCtx, CursorRespons
if (_cursorId == kUnitializedCursorId) {
_ns = response.getNSS();
_rcr.dbname = _ns.db().toString();
- // 'vars' are only included in the first batch.
+ // 'vars' and type are only included in the first batch.
_cursorVars = response.getVarsField();
+ _cursorType = response.getCursorType();
}
_cursorId = response.getCursorId();
diff --git a/src/mongo/executor/task_executor_cursor.h b/src/mongo/executor/task_executor_cursor.h
index bfc916d0e13..f7f545de2a5 100644
--- a/src/mongo/executor/task_executor_cursor.h
+++ b/src/mongo/executor/task_executor_cursor.h
@@ -114,6 +114,16 @@ public:
*/
boost::optional<BSONObj> getNext(OperationContext* opCtx);
+ /**
+ * Read the response from the remote command issued by this cursor and parse it into this
+ * object. Performs the same work as getNext() above does on the first call to it, and so this
+ * can throw any error that getNext can throw.
+ *
+ * Should not be called once getNext() has been called or the cursor has been otherwise
+ * initialized.
+ */
+ void populateCursor(OperationContext* opCtx);
+
const CursorId getCursorId() const {
return _cursorId;
}
@@ -128,6 +138,10 @@ public:
return _cursorVars;
}
+ auto getType() {
+ return _cursorType;
+ }
+
long long getBatchNum() {
return _batchNum;
}
@@ -144,6 +158,14 @@ public:
return _additionalCursors.size();
}
+ /**
+ * Return the callback that this cursor is waiting on. Can be used to block on getting a
+ * response to this request. Can be boost::none.
+ */
+ auto getCallbackHandle() {
+ return _cbHandle;
+ }
+
private:
/**
* Runs a remote command and pipes the output back to this object
@@ -186,6 +208,9 @@ private:
// Variables sent alongside the results in the cursor.
boost::optional<BSONObj> _cursorVars = boost::none;
+ // For commands that return multiple cursors, the type of the cursor.
+ boost::optional<std::string> _cursorType;
+
// This is a sum of the time spent waiting on remote calls.
Milliseconds _millisecondsWaiting = Milliseconds(0);