diff options
author | George Wangensteen <george.wangensteen@10gen.com> | 2019-08-05 17:18:36 -0400 |
---|---|---|
committer | George Wangensteen <george.wangensteen@10gen.com> | 2019-08-09 15:57:02 -0400 |
commit | d9efdd52ccc13a0be3fe1ff4c6c575fb98ae2bf4 (patch) | |
tree | b9b420ad080ac1f579d62638fe28301a05e6aa86 /src/mongo/s | |
parent | 7d8dbd3bb7900e470125158d81c8536032a7b5c8 (diff) | |
download | mongo-d9efdd52ccc13a0be3fe1ff4c6c575fb98ae2bf4.tar.gz |
SERVER-42456 Improve access to cursor underlying a ClusterCursorManager::PinnedCursor
Diffstat (limited to 'src/mongo/s')
-rw-r--r-- | src/mongo/s/query/cluster_cursor_manager.cpp | 112 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_cursor_manager.h | 122 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_cursor_manager_test.cpp | 34 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_find.cpp | 36 |
4 files changed, 52 insertions, 252 deletions
diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp index f5b3290a59a..e31f42793b6 100644 --- a/src/mongo/s/query/cluster_cursor_manager.cpp +++ b/src/mongo/s/query/cluster_cursor_manager.cpp @@ -112,28 +112,6 @@ ClusterCursorManager::PinnedCursor& ClusterCursorManager::PinnedCursor::operator return *this; } -StatusWith<ClusterQueryResult> ClusterCursorManager::PinnedCursor::next( - RouterExecStage::ExecContext execContext) { - invariant(_cursor); - return _cursor->next(execContext); -} - -bool ClusterCursorManager::PinnedCursor::isTailable() const { - invariant(_cursor); - return _cursor->isTailable(); -} - -bool ClusterCursorManager::PinnedCursor::isTailableAndAwaitData() const { - invariant(_cursor); - return _cursor->isTailableAndAwaitData(); -} - -boost::optional<ReadPreferenceSetting> ClusterCursorManager::PinnedCursor::getReadPreference() - const { - invariant(_cursor); - return _cursor->getReadPreference(); -} - void ClusterCursorManager::PinnedCursor::returnCursor(CursorState cursorState) { invariant(_cursor); // Note that unpinning a cursor transfers ownership of the underlying ClusterClientCursor object @@ -142,88 +120,26 @@ void ClusterCursorManager::PinnedCursor::returnCursor(CursorState cursorState) { *this = PinnedCursor(); } -BSONObj ClusterCursorManager::PinnedCursor::getOriginatingCommand() const { - invariant(_cursor); - return _cursor->getOriginatingCommand(); -} - -const PrivilegeVector& ClusterCursorManager::PinnedCursor::getOriginatingPrivileges() const& { - invariant(_cursor); - return _cursor->getOriginatingPrivileges(); -} - -std::size_t ClusterCursorManager::PinnedCursor::getNumRemotes() const { - invariant(_cursor); - return _cursor->getNumRemotes(); -} - -BSONObj ClusterCursorManager::PinnedCursor::getPostBatchResumeToken() const { - invariant(_cursor); - return _cursor->getPostBatchResumeToken(); -} - CursorId ClusterCursorManager::PinnedCursor::getCursorId() const { return _cursorId; } -long long ClusterCursorManager::PinnedCursor::getNumReturnedSoFar() const { - invariant(_cursor); - return _cursor->getNumReturnedSoFar(); -} - -Date_t ClusterCursorManager::PinnedCursor::getLastUseDate() const { - invariant(_cursor); - return _cursor->getLastUseDate(); -} - -void ClusterCursorManager::PinnedCursor::setLastUseDate(Date_t now) { - invariant(_cursor); - _cursor->setLastUseDate(now); -} -Date_t ClusterCursorManager::PinnedCursor::getCreatedDate() const { - invariant(_cursor); - return _cursor->getCreatedDate(); -} -void ClusterCursorManager::PinnedCursor::incNBatches() { - invariant(_cursor); - return _cursor->incNBatches(); -} - -long long ClusterCursorManager::PinnedCursor::getNBatches() const { - invariant(_cursor); - return _cursor->getNBatches(); -} - -void ClusterCursorManager::PinnedCursor::queueResult(const ClusterQueryResult& result) { - invariant(_cursor); - _cursor->queueResult(result); -} - -bool ClusterCursorManager::PinnedCursor::remotesExhausted() { - invariant(_cursor); - return _cursor->remotesExhausted(); -} - GenericCursor ClusterCursorManager::PinnedCursor::toGenericCursor() const { + invariant(_cursor); GenericCursor gc; gc.setCursorId(getCursorId()); gc.setNs(_nss); - gc.setLsid(getLsid()); - gc.setNDocsReturned(getNumReturnedSoFar()); - gc.setTailable(isTailable()); - gc.setAwaitData(isTailableAndAwaitData()); - gc.setOriginatingCommand(getOriginatingCommand()); - gc.setLastAccessDate(getLastUseDate()); - gc.setCreatedDate(getCreatedDate()); - gc.setNBatchesReturned(getNBatches()); + gc.setLsid(_cursor->getLsid()); + gc.setNDocsReturned(_cursor->getNumReturnedSoFar()); + gc.setTailable(_cursor->isTailable()); + gc.setAwaitData(_cursor->isTailableAndAwaitData()); + gc.setOriginatingCommand(_cursor->getOriginatingCommand()); + gc.setLastAccessDate(_cursor->getLastUseDate()); + gc.setCreatedDate(_cursor->getCreatedDate()); + gc.setNBatchesReturned(_cursor->getNBatches()); return gc; } -Status ClusterCursorManager::PinnedCursor::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { - invariant(_cursor); - return _cursor->setAwaitDataTimeout(awaitDataTimeout); -} - void ClusterCursorManager::PinnedCursor::returnAndKillCursor() { invariant(_cursor); @@ -231,16 +147,6 @@ void ClusterCursorManager::PinnedCursor::returnAndKillCursor() { returnCursor(CursorState::Exhausted); } -boost::optional<LogicalSessionId> ClusterCursorManager::PinnedCursor::getLsid() const { - invariant(_cursor); - return _cursor->getLsid(); -} - -boost::optional<TxnNumber> ClusterCursorManager::PinnedCursor::getTxnNumber() const { - invariant(_cursor); - return _cursor->getTxnNumber(); -} - ClusterCursorManager::ClusterCursorManager(ClockSource* clockSource) : _clockSource(clockSource), _pseudoRandom(std::unique_ptr<SecureRandom>(SecureRandom::create())->nextInt64()) { diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h index 99ded693398..c8c7fa2ced2 100644 --- a/src/mongo/s/query/cluster_cursor_manager.h +++ b/src/mongo/s/query/cluster_cursor_manager.h @@ -159,25 +159,13 @@ public: PinnedCursor& operator=(PinnedCursor&& other); /** - * Calls next() on the underlying cursor. Cannot be called after returnCursor() is called. - * A cursor must be owned. - * - * Can block. + * Returns a pointer to the ClusterClientCursor that this PinnedCursor owns. A cursor must + * be owned. */ - StatusWith<ClusterQueryResult> next(RouterExecStage::ExecContext); - - /** - * Returns whether or not the underlying cursor is tailing a capped collection. Cannot be - * called after returnCursor() is called. A cursor must be owned. - */ - bool isTailable() const; - - /** - * Returns whether or not the underlying cursor is tailing a capped collection and was - * created with the 'awaitData' flag set. Cannot be called after returnCursor() is called. - * A cursor must be owned. - */ - bool isTailableAndAwaitData() const; + ClusterClientCursor* operator->() const { + invariant(_cursor); + return _cursor.get(); + } /** * Transfers ownership of the underlying cursor back to the manager, and detaches it from @@ -190,113 +178,15 @@ public: void returnCursor(CursorState cursorState); /** - * Returns the command object which originally created this cursor. - */ - BSONObj getOriginatingCommand() const; - - /** - * Returns the privleges for the original command object which created this cursor. - */ - - const PrivilegeVector& getOriginatingPrivileges() const&; - void getOriginatingPrivileges() && = delete; - - /** - * Returns a reference to the vector of remote hosts involved in this operation. - */ - std::size_t getNumRemotes() const; - - /** - * If applicable, returns the current most-recent resume token for this cursor. - */ - BSONObj getPostBatchResumeToken() const; - - /** * Returns the cursor id for the underlying cursor, or zero if no cursor is owned. */ CursorId getCursorId() const; /** - * Returns the read preference setting for this cursor. - */ - boost::optional<ReadPreferenceSetting> getReadPreference() const; - - /** - * Returns the number of result documents returned so far by this cursor via the next() - * method. - */ - long long getNumReturnedSoFar() const; - - /** - * Returns the creation date of the cursor. - */ - Date_t getCreatedDate() const; - - /** - * Returns the time the cursor was last used. - */ - Date_t getLastUseDate() const; - - /** - * Set the cursor's lastUseDate to the given time. - */ - void setLastUseDate(Date_t now); - - /** - * Increment the number of batches returned by this cursor. - */ - void incNBatches(); - - /** - * Get the number of batches returned by this cursor. - */ - long long getNBatches() const; - - /** * Returns a GenericCursor version of the pinned cursor. */ GenericCursor toGenericCursor() const; - /** - * Stashes 'obj' to be returned later by this cursor. A cursor must be owned. - */ - void queueResult(const ClusterQueryResult& result); - - /** - * Returns whether or not all the remote cursors underlying this cursor have been - * exhausted. Cannot be called after returnCursor() is called. A cursor must be owned. - */ - bool remotesExhausted(); - - /** - * Sets the maxTimeMS value that the cursor should forward with any internally issued - * getMore requests. A cursor must be owned. - * - * Returns a non-OK status if this cursor type does not support maxTimeMS on getMore (i.e. - * if the cursor is not tailable + awaitData). - */ - Status setAwaitDataTimeout(Milliseconds awaitDataTimeout); - - /** - * Returns the logical session id of the command that created the underlying cursor. - */ - boost::optional<LogicalSessionId> getLsid() const; - - /** - * Returns the transaction number of the command that created the underlying cursor. - */ - boost::optional<TxnNumber> getTxnNumber() const; - - Microseconds getLeftoverMaxTimeMicros() const { - invariant(_cursor); - return _cursor->getLeftoverMaxTimeMicros(); - } - - void setLeftoverMaxTimeMicros(Microseconds leftoverMaxTimeMicros) { - invariant(_cursor); - _cursor->setLeftoverMaxTimeMicros(leftoverMaxTimeMicros); - } - private: // ClusterCursorManager is a friend so that its methods can call the PinnedCursor // constructor declared below, which is private to prevent clients from calling it directly. diff --git a/src/mongo/s/query/cluster_cursor_manager_test.cpp b/src/mongo/s/query/cluster_cursor_manager_test.cpp index ddd2adaa516..44be5f0eb7e 100644 --- a/src/mongo/s/query/cluster_cursor_manager_test.cpp +++ b/src/mongo/s/query/cluster_cursor_manager_test.cpp @@ -147,11 +147,11 @@ TEST_F(ClusterCursorManagerTest, RegisterCursor) { auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); ASSERT_OK(pinnedCursor.getStatus()); - auto nextResult = pinnedCursor.getValue().next(RouterExecStage::ExecContext::kInitialFind); + auto nextResult = pinnedCursor.getValue()->next(RouterExecStage::ExecContext::kInitialFind); ASSERT_OK(nextResult.getStatus()); ASSERT(nextResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(BSON("a" << 1), *nextResult.getValue().getResult()); - nextResult = pinnedCursor.getValue().next(RouterExecStage::ExecContext::kInitialFind); + nextResult = pinnedCursor.getValue()->next(RouterExecStage::ExecContext::kInitialFind); ASSERT_OK(nextResult.getStatus()); ASSERT_TRUE(nextResult.getValue().isEOF()); } @@ -183,11 +183,11 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorBasic) { getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); ASSERT_OK(checkedOutCursor.getStatus()); ASSERT_EQ(cursorId, checkedOutCursor.getValue().getCursorId()); - auto nextResult = checkedOutCursor.getValue().next(RouterExecStage::ExecContext::kInitialFind); + auto nextResult = checkedOutCursor.getValue()->next(RouterExecStage::ExecContext::kInitialFind); ASSERT_OK(nextResult.getStatus()); ASSERT(nextResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(BSON("a" << 1), *nextResult.getValue().getResult()); - nextResult = checkedOutCursor.getValue().next(RouterExecStage::ExecContext::kInitialFind); + nextResult = checkedOutCursor.getValue()->next(RouterExecStage::ExecContext::kInitialFind); ASSERT_OK(nextResult.getStatus()); ASSERT_TRUE(nextResult.getValue().isEOF()); } @@ -212,11 +212,11 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorMultipleCursors) { auto pinnedCursor = getManager()->checkOutCursor(nss, cursorIds[i], _opCtx.get(), successAuthChecker); ASSERT_OK(pinnedCursor.getStatus()); - auto nextResult = pinnedCursor.getValue().next(RouterExecStage::ExecContext::kInitialFind); + auto nextResult = pinnedCursor.getValue()->next(RouterExecStage::ExecContext::kInitialFind); ASSERT_OK(nextResult.getStatus()); ASSERT(nextResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(BSON("a" << i), *nextResult.getValue().getResult()); - nextResult = pinnedCursor.getValue().next(RouterExecStage::ExecContext::kInitialFind); + nextResult = pinnedCursor.getValue()->next(RouterExecStage::ExecContext::kInitialFind); ASSERT_OK(nextResult.getStatus()); ASSERT_TRUE(nextResult.getValue().isEOF()); } @@ -672,7 +672,8 @@ TEST_F(ClusterCursorManagerTest, StatsExhaustShardedCursor) { auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); ASSERT_OK(pinnedCursor.getStatus()); - ASSERT_OK(pinnedCursor.getValue().next(RouterExecStage::ExecContext::kInitialFind).getStatus()); + ASSERT_OK( + pinnedCursor.getValue()->next(RouterExecStage::ExecContext::kInitialFind).getStatus()); ASSERT_EQ(1U, getManager()->stats().cursorsMultiTarget); pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::Exhausted); ASSERT_EQ(0U, getManager()->stats().cursorsMultiTarget); @@ -690,7 +691,8 @@ TEST_F(ClusterCursorManagerTest, StatsExhaustNotShardedCursor) { auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); ASSERT_OK(pinnedCursor.getStatus()); - ASSERT_OK(pinnedCursor.getValue().next(RouterExecStage::ExecContext::kInitialFind).getStatus()); + ASSERT_OK( + pinnedCursor.getValue()->next(RouterExecStage::ExecContext::kInitialFind).getStatus()); ASSERT_EQ(1U, getManager()->stats().cursorsSingleTarget); pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::Exhausted); ASSERT_EQ(0U, getManager()->stats().cursorsSingleTarget); @@ -709,7 +711,8 @@ TEST_F(ClusterCursorManagerTest, StatsExhaustPinnedCursor) { auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); ASSERT_OK(pinnedCursor.getStatus()); - ASSERT_OK(pinnedCursor.getValue().next(RouterExecStage::ExecContext::kInitialFind).getStatus()); + ASSERT_OK( + pinnedCursor.getValue()->next(RouterExecStage::ExecContext::kInitialFind).getStatus()); ASSERT_EQ(1U, getManager()->stats().cursorsPinned); pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::Exhausted); ASSERT_EQ(0U, getManager()->stats().cursorsPinned); @@ -728,7 +731,8 @@ TEST_F(ClusterCursorManagerTest, StatsCheckInWithoutExhaustingPinnedCursor) { auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); ASSERT_OK(pinnedCursor.getStatus()); - ASSERT_OK(pinnedCursor.getValue().next(RouterExecStage::ExecContext::kInitialFind).getStatus()); + ASSERT_OK( + pinnedCursor.getValue()->next(RouterExecStage::ExecContext::kInitialFind).getStatus()); ASSERT_EQ(1U, getManager()->stats().cursorsPinned); pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted); ASSERT_EQ(0U, getManager()->stats().cursorsPinned); @@ -845,7 +849,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhausted) { ASSERT_EQ(cursorId, registeredCursor.getValue().getCursorId()); ASSERT_NE(0, cursorId); ASSERT_OK( - registeredCursor.getValue().next(RouterExecStage::ExecContext::kInitialFind).getStatus()); + registeredCursor.getValue()->next(RouterExecStage::ExecContext::kInitialFind).getStatus()); registeredCursor.getValue().returnCursor(ClusterCursorManager::CursorState::Exhausted); ASSERT_EQ(0, registeredCursor.getValue().getCursorId()); @@ -876,7 +880,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhaustedWithNonExhaust ASSERT_EQ(cursorId, registeredCursor.getValue().getCursorId()); ASSERT_NE(0, cursorId); ASSERT_OK( - registeredCursor.getValue().next(RouterExecStage::ExecContext::kInitialFind).getStatus()); + registeredCursor.getValue()->next(RouterExecStage::ExecContext::kInitialFind).getStatus()); registeredCursor.getValue().returnCursor(ClusterCursorManager::CursorState::Exhausted); ASSERT_EQ(0, registeredCursor.getValue().getCursorId()); @@ -934,7 +938,7 @@ TEST_F(ClusterCursorManagerTest, RemotesExhausted) { auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, _opCtx.get(), successAuthChecker); ASSERT_OK(pinnedCursor.getStatus()); - ASSERT_FALSE(pinnedCursor.getValue().remotesExhausted()); + ASSERT_FALSE(pinnedCursor.getValue()->remotesExhausted()); } // Test that killed cursors which are still pinned are not destroyed immediately. @@ -1256,8 +1260,8 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnsUnderlyingCursorTxnNumber) { ASSERT_OK(pinnedCursor.getStatus()); // The underlying cursor's txnNumber should be returned. - ASSERT(pinnedCursor.getValue().getTxnNumber()); - ASSERT_EQ(txnNumber, *pinnedCursor.getValue().getTxnNumber()); + ASSERT(pinnedCursor.getValue()->getTxnNumber()); + ASSERT_EQ(txnNumber, *pinnedCursor.getValue()->getTxnNumber()); } } // namespace diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 6592e16a41c..d89049be635 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -366,7 +366,7 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx, */ Status setUpOperationContextStateForGetMore(OperationContext* opCtx, const GetMoreRequest& request, - ClusterCursorManager::PinnedCursor* cursor) { + const ClusterCursorManager::PinnedCursor& cursor) { if (auto readPref = cursor->getReadPreference()) { ReadPreferenceSetting::get(opCtx) = *readPref; } @@ -496,7 +496,7 @@ CursorId ClusterFind::runQuery(OperationContext* opCtx, */ void validateLSID(OperationContext* opCtx, const GetMoreRequest& request, - ClusterCursorManager::PinnedCursor* cursor) { + const ClusterCursorManager::PinnedCursor& cursor) { if (opCtx->getLogicalSessionId() && !cursor->getLsid()) { uasserted(50799, str::stream() << "Cannot run getMore on cursor " << request.cursorid @@ -526,7 +526,7 @@ void validateLSID(OperationContext* opCtx, */ void validateTxnNumber(OperationContext* opCtx, const GetMoreRequest& request, - ClusterCursorManager::PinnedCursor* cursor) { + const ClusterCursorManager::PinnedCursor& cursor) { if (opCtx->getTxnNumber() && !cursor->getTxnNumber()) { uasserted(50802, str::stream() << "Cannot run getMore on cursor " << request.cursorid @@ -559,8 +559,8 @@ void validateOperationSessionInfo(OperationContext* opCtx, ClusterCursorManager::PinnedCursor* cursor) { auto returnCursorGuard = makeGuard( [cursor] { cursor->returnCursor(ClusterCursorManager::CursorState::NotExhausted); }); - validateLSID(opCtx, request, cursor); - validateTxnNumber(opCtx, request, cursor); + validateLSID(opCtx, request, *cursor); + validateTxnNumber(opCtx, request, *cursor); returnCursorGuard.dismiss(); } @@ -586,7 +586,7 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, // Ensure that the client still has the privileges to run the originating command. if (!authzSession->isAuthorizedForPrivileges( - pinnedCursor.getValue().getOriginatingPrivileges())) { + pinnedCursor.getValue()->getOriginatingPrivileges())) { uasserted(ErrorCodes::Unauthorized, str::stream() << "not authorized for getMore with cursor id " << request.cursorid); @@ -594,11 +594,11 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, // Set the originatingCommand object and the cursorID in CurOp. { - CurOp::get(opCtx)->debug().nShards = pinnedCursor.getValue().getNumRemotes(); + CurOp::get(opCtx)->debug().nShards = pinnedCursor.getValue()->getNumRemotes(); CurOp::get(opCtx)->debug().cursorid = request.cursorid; stdx::lock_guard<Client> lk(*opCtx->getClient()); CurOp::get(opCtx)->setOriginatingCommand_inlock( - pinnedCursor.getValue().getOriginatingCommand()); + pinnedCursor.getValue()->getOriginatingCommand()); CurOp::get(opCtx)->setGenericCursor_inlock(pinnedCursor.getValue().toGenericCursor()); } @@ -612,7 +612,7 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, } auto opCtxSetupStatus = - setUpOperationContextStateForGetMore(opCtx, request, &pinnedCursor.getValue()); + setUpOperationContextStateForGetMore(opCtx, request, pinnedCursor.getValue()); if (!opCtxSetupStatus.isOK()) { return opCtxSetupStatus; } @@ -620,7 +620,7 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, std::vector<BSONObj> batch; int bytesBuffered = 0; long long batchSize = request.batchSize.value_or(0); - long long startingFrom = pinnedCursor.getValue().getNumReturnedSoFar(); + long long startingFrom = pinnedCursor.getValue()->getNumReturnedSoFar(); auto cursorState = ClusterCursorManager::CursorState::NotExhausted; BSONObj postBatchResumeToken; bool stashedResult = false; @@ -641,7 +641,7 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, StatusWith<ClusterQueryResult> next = Status{ErrorCodes::InternalError, "uninitialized cluster query result"}; try { - next = pinnedCursor.getValue().next(context); + next = pinnedCursor.getValue()->next(context); } catch (const ExceptionFor<ErrorCodes::CloseChangeStream>&) { // This exception is thrown when a $changeStream stage encounters an event // that invalidates the cursor. We should close the cursor and return without @@ -659,8 +659,8 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, // exhausted. If it is tailable, usually we keep it open (i.e. "NotExhausted") even when // we reach end-of-stream. However, if all the remote cursors are exhausted, there is no // hope of returning data and thus we need to close the mongos cursor as well. - if (!pinnedCursor.getValue().isTailable() || - pinnedCursor.getValue().remotesExhausted()) { + if (!pinnedCursor.getValue()->isTailable() || + pinnedCursor.getValue()->remotesExhausted()) { cursorState = ClusterCursorManager::CursorState::Exhausted; } break; @@ -668,7 +668,7 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, if (!FindCommon::haveSpaceForNext( *next.getValue().getResult(), batch.size(), bytesBuffered)) { - pinnedCursor.getValue().queueResult(*next.getValue().getResult()); + pinnedCursor.getValue()->queueResult(*next.getValue().getResult()); stashedResult = true; break; } @@ -680,7 +680,7 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, batch.push_back(std::move(*next.getValue().getResult())); // Update the postBatchResumeToken. For non-$changeStream aggregations, this will be empty. - postBatchResumeToken = pinnedCursor.getValue().getPostBatchResumeToken(); + postBatchResumeToken = pinnedCursor.getValue()->getPostBatchResumeToken(); } // If the cursor has been exhausted, we will communicate this by returning a CursorId of zero. @@ -691,11 +691,11 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, // For empty batches, or in the case where the final result was added to the batch rather than // being stashed, we update the PBRT here to ensure that it is the most recent available. if (idToReturn && !stashedResult) { - postBatchResumeToken = pinnedCursor.getValue().getPostBatchResumeToken(); + postBatchResumeToken = pinnedCursor.getValue()->getPostBatchResumeToken(); } - pinnedCursor.getValue().setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros()); - pinnedCursor.getValue().incNBatches(); + pinnedCursor.getValue()->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros()); + pinnedCursor.getValue()->incNBatches(); // Upon successful completion, transfer ownership of the cursor back to the cursor manager. If // the cursor has been exhausted, the cursor manager will clean it up for us. pinnedCursor.getValue().returnCursor(cursorState); |