diff options
author | Ted Tuckman <ted.tuckman@mongodb.com> | 2022-03-02 22:20:32 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-03-02 23:11:24 +0000 |
commit | f25675cc8ea9d89672ce063f49dbdaa39e63ce1b (patch) | |
tree | b1867e1b1e3db4dc5083da673bd62aa4cc218767 /src/mongo/executor | |
parent | 27ce39ba637159ae0be6e7734b1d7f114af7141c (diff) | |
download | mongo-f25675cc8ea9d89672ce063f49dbdaa39e63ce1b.tar.gz |
SERVER-62535 Allow sharded aggregation to return two cursors
Diffstat (limited to 'src/mongo/executor')
-rw-r--r-- | src/mongo/executor/network_test_env.cpp | 4 | ||||
-rw-r--r-- | src/mongo/executor/task_executor.h | 3 | ||||
-rw-r--r-- | src/mongo/executor/task_executor_cursor.cpp | 19 | ||||
-rw-r--r-- | src/mongo/executor/task_executor_cursor.h | 25 |
4 files changed, 47 insertions, 4 deletions
diff --git a/src/mongo/executor/network_test_env.cpp b/src/mongo/executor/network_test_env.cpp index a52c3ddf9b3..9a128705054 100644 --- a/src/mongo/executor/network_test_env.cpp +++ b/src/mongo/executor/network_test_env.cpp @@ -113,7 +113,7 @@ void NetworkTestEnv::onFindCommand(OnFindCommandFunction func) { const NamespaceString nss = NamespaceString(request.dbname, request.cmdObj.firstElement().String()); BSONObjBuilder result; - appendCursorResponseObject(0LL, nss.toString(), arr.arr(), &result); + appendCursorResponseObject(0LL, nss.toString(), arr.arr(), boost::none, &result); return result.obj(); }); @@ -139,7 +139,7 @@ void NetworkTestEnv::onFindWithMetadataCommand(OnFindCommandWithMetadataFunction const NamespaceString nss = NamespaceString(request.dbname, request.cmdObj.firstElement().String()); BSONObjBuilder resultBuilder(std::move(metadata)); - appendCursorResponseObject(0LL, nss.toString(), arr.arr(), &resultBuilder); + appendCursorResponseObject(0LL, nss.toString(), arr.arr(), boost::none, &resultBuilder); return RemoteCommandResponse(resultBuilder.obj(), Milliseconds(1)); }); diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h index ad8cab93436..2680fa3d916 100644 --- a/src/mongo/executor/task_executor.h +++ b/src/mongo/executor/task_executor.h @@ -410,7 +410,8 @@ public: * * NOTE: Do not call from a callback running in the executor. * - * Prefer the version that takes an OperationContext* to this version. + * Prefer passing an OperationContext* or other interruptible as the second argument to leaving + * as not interruptible. */ virtual void wait(const CallbackHandle& cbHandle, Interruptible* interruptible = Interruptible::notInterruptible()) = 0; diff --git a/src/mongo/executor/task_executor_cursor.cpp b/src/mongo/executor/task_executor_cursor.cpp index 9bc01121b38..46d1bd846b4 100644 --- a/src/mongo/executor/task_executor_cursor.cpp +++ b/src/mongo/executor/task_executor_cursor.cpp @@ -86,6 +86,9 @@ TaskExecutorCursor::TaskExecutorCursor(TaskExecutorCursor&& other) if (other._cursorVars) { _cursorVars = other._cursorVars->getOwned(); } + if (other._cursorType) { + _cursorType = other._cursorType; + } // Other is no longer responsible for this cursor id. other._cursorId = 0; // Other should not cancel the callback on destruction. @@ -127,6 +130,19 @@ boost::optional<BSONObj> TaskExecutorCursor::getNext(OperationContext* opCtx) { return std::move(*_batchIter++); } +void TaskExecutorCursor::populateCursor(OperationContext* opCtx) { + tassert(6253502, + "populateCursors should only be called before cursor is initialized", + _cursorId == kUnitializedCursorId); + tassert(6253503, + "populateCursors should only be called after a remote command has been run", + _cbHandle); + // We really only care about populating the cursor "first batch" fields, but at some point we'll + // have to do all of the work done by this function anyway. This would have been called by + // getNext() the first time it was called. + _getNextBatch(opCtx); +} + const RemoteCommandRequest& TaskExecutorCursor::_createRequest(OperationContext* opCtx, const BSONObj& cmd) { // we pull this every time for updated client metadata @@ -171,8 +187,9 @@ void TaskExecutorCursor::_processResponse(OperationContext* opCtx, CursorRespons if (_cursorId == kUnitializedCursorId) { _ns = response.getNSS(); _rcr.dbname = _ns.db().toString(); - // 'vars' are only included in the first batch. + // 'vars' and type are only included in the first batch. _cursorVars = response.getVarsField(); + _cursorType = response.getCursorType(); } _cursorId = response.getCursorId(); diff --git a/src/mongo/executor/task_executor_cursor.h b/src/mongo/executor/task_executor_cursor.h index bfc916d0e13..f7f545de2a5 100644 --- a/src/mongo/executor/task_executor_cursor.h +++ b/src/mongo/executor/task_executor_cursor.h @@ -114,6 +114,16 @@ public: */ boost::optional<BSONObj> getNext(OperationContext* opCtx); + /** + * Read the response from the remote command issued by this cursor and parse it into this + * object. Performs the same work as getNext() above does on the first call to it, and so this + * can throw any error that getNext can throw. + * + * Should not be called once getNext() has been called or the cursor has been otherwise + * initialized. + */ + void populateCursor(OperationContext* opCtx); + const CursorId getCursorId() const { return _cursorId; } @@ -128,6 +138,10 @@ public: return _cursorVars; } + auto getType() { + return _cursorType; + } + long long getBatchNum() { return _batchNum; } @@ -144,6 +158,14 @@ public: return _additionalCursors.size(); } + /** + * Return the callback that this cursor is waiting on. Can be used to block on getting a + * response to this request. Can be boost::none. + */ + auto getCallbackHandle() { + return _cbHandle; + } + private: /** * Runs a remote command and pipes the output back to this object @@ -186,6 +208,9 @@ private: // Variables sent alongside the results in the cursor. boost::optional<BSONObj> _cursorVars = boost::none; + // For commands that return multiple cursors, the type of the cursor. + boost::optional<std::string> _cursorType; + // This is a sum of the time spent waiting on remote calls. Milliseconds _millisecondsWaiting = Milliseconds(0); |