diff options
Diffstat (limited to 'src/mongo')
20 files changed, 280 insertions, 84 deletions
diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp index b246d6d2b5b..9578ac09538 100644 --- a/src/mongo/db/query/cursor_response.cpp +++ b/src/mongo/db/query/cursor_response.cpp @@ -277,8 +277,7 @@ void CursorResponse::addToBSON(CursorResponse::ResponseType responseType, } batchBuilder.doneFast(); - if (_postBatchResumeToken) { - invariant(!_postBatchResumeToken->isEmpty()); + if (_postBatchResumeToken && !_postBatchResumeToken->isEmpty()) { cursorBuilder.append(kPostBatchResumeTokenField, *_postBatchResumeToken); } diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 25736f9af2e..119a9bef9ce 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -92,9 +92,9 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx, // since that is not supported we treat boost::none (unspecified) to mean 'kNormal'. _tailableMode(params.getTailableMode().value_or(TailableModeEnum::kNormal)), _params(std::move(params)), - _mergeQueue(MergingComparator(_remotes, - _params.getSort() ? *_params.getSort() : BSONObj(), - _params.getCompareWholeSortKey())) { + _mergeQueue(MergingComparator( + _remotes, _params.getSort().value_or(BSONObj()), _params.getCompareWholeSortKey())), + _promisedMinSortKeys(PromisedMinSortKeyComparator(_params.getSort().value_or(BSONObj()))) { if (params.getTxnNumber()) { invariant(params.getSessionId()); } @@ -110,6 +110,9 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx, _addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.getCursorResponse()); ++remoteIndex; } + // If this is a change stream, then we expect to have already received PBRTs from every shard. + invariant(_promisedMinSortKeys.empty() || _promisedMinSortKeys.size() == _remotes.size()); + _highWaterMark = _promisedMinSortKeys.empty() ? BSONObj() : _promisedMinSortKeys.begin()->first; } AsyncResultsMerger::~AsyncResultsMerger() { @@ -175,13 +178,32 @@ void AsyncResultsMerger::reattachToOperationContext(OperationContext* opCtx) { void AsyncResultsMerger::addNewShardCursors(std::vector<RemoteCursor>&& newCursors) { stdx::lock_guard<stdx::mutex> lk(_mutex); + // Create a new entry in the '_remotes' list for each new shard, and add the first cursor batch + // to its buffer. This ensures the shard's initial high water mark is respected, if it exists. for (auto&& remote : newCursors) { + const auto newIndex = _remotes.size(); _remotes.emplace_back(remote.getHostAndPort(), remote.getCursorResponse().getNSS(), remote.getCursorResponse().getCursorId()); + _addBatchToBuffer(lk, newIndex, remote.getCursorResponse()); } } +BSONObj AsyncResultsMerger::getHighWaterMark() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + auto minPromisedSortKey = _getMinPromisedSortKey(lk); + if (!minPromisedSortKey.isEmpty() && !_ready(lk)) { + _highWaterMark = minPromisedSortKey; + } + return _highWaterMark; +} + +BSONObj AsyncResultsMerger::_getMinPromisedSortKey(WithLock) { + // We cannot return the minimum promised sort key unless all shards have reported one. + return _promisedMinSortKeys.size() < _remotes.size() ? BSONObj() + : _promisedMinSortKeys.begin()->first; +} + bool AsyncResultsMerger::_ready(WithLock lk) { if (_lifecycleState != kAlive) { return true; @@ -220,7 +242,7 @@ bool AsyncResultsMerger::_readySorted(WithLock lk) { return true; } -bool AsyncResultsMerger::_readySortedTailable(WithLock) { +bool AsyncResultsMerger::_readySortedTailable(WithLock lk) { if (_mergeQueue.empty()) { return false; } @@ -229,19 +251,10 @@ bool AsyncResultsMerger::_readySortedTailable(WithLock) { auto smallestResult = _remotes[smallestRemote].docBuffer.front(); auto keyWeWantToReturn = extractSortKey(*smallestResult.getResult(), _params.getCompareWholeSortKey()); - for (const auto& remote : _remotes) { - if (!remote.promisedMinSortKey) { - // In order to merge sorted tailable cursors, we need this value to be populated. - return false; - } - if (compareSortKeys(keyWeWantToReturn, *remote.promisedMinSortKey, *_params.getSort()) > - 0) { - // The key we want to return is not guaranteed to be smaller than future results from - // this remote, so we can't yet return it. - return false; - } - } - return true; + // We should always have a minPromisedSortKey from every shard in the sorted tailable case. + auto minPromisedSortKey = _getMinPromisedSortKey(lk); + invariant(!minPromisedSortKey.isEmpty()); + return compareSortKeys(keyWeWantToReturn, minPromisedSortKey, *_params.getSort()) <= 0; } bool AsyncResultsMerger::_readyUnsorted(WithLock) { @@ -301,6 +314,12 @@ ClusterQueryResult AsyncResultsMerger::_nextReadySorted(WithLock) { _mergeQueue.push(smallestRemote); } + // For sorted tailable awaitData cursors, update the high water mark to the document's sort key. + if (_tailableMode == TailableModeEnum::kTailableAndAwaitData) { + _highWaterMark = + extractSortKey(*front.getResult(), _params.getCompareWholeSortKey()).getOwned(); + } + return front; } @@ -485,10 +504,12 @@ StatusWith<CursorResponse> AsyncResultsMerger::_parseCursorResponse( return std::move(cursorResponse); } -void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote, - const CursorResponse& response) { +void AsyncResultsMerger::_updateRemoteMetadata(WithLock, + size_t remoteIndex, + const CursorResponse& response) { // Update the cursorId; it is sent as '0' when the cursor has been exhausted on the shard. - remote->cursorId = response.getCursorId(); + auto& remote = _remotes[remoteIndex]; + remote.cursorId = response.getCursorId(); if (response.getPostBatchResumeToken()) { // We only expect to see this for change streams. invariant(_params.getSort()); @@ -501,15 +522,18 @@ void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote, // The most recent minimum sort key should never be smaller than the previous promised // minimum sort key for this remote, if one exists. auto newMinSortKey = *response.getPostBatchResumeToken(); - if (auto& oldMinSortKey = remote->promisedMinSortKey) { + if (auto& oldMinSortKey = remote.promisedMinSortKey) { invariant(compareSortKeys(newMinSortKey, *oldMinSortKey, *_params.getSort()) >= 0); + invariant(_promisedMinSortKeys.size() <= _remotes.size()); + _promisedMinSortKeys.erase({*oldMinSortKey, remoteIndex}); } - remote->promisedMinSortKey = newMinSortKey; + _promisedMinSortKeys.insert({newMinSortKey, remoteIndex}); + remote.promisedMinSortKey = newMinSortKey; } else { // If we don't have a postBatchResumeToken, then we should never have an oplog timestamp. // TODO SERVER-38539: remove this validation when $internalLatestOplogTimestamp is removed. invariant(!response.getLastOplogTimestamp(), - str::stream() << "Host " << remote->shardHostAndPort + str::stream() << "Host " << remote.shardHostAndPort << " returned a cursor which has an oplog timestamp but does not " "have a postBatchResumeToken, suggesting that one or more shards" " are running an older version of MongoDB. This configuration " @@ -621,7 +645,7 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk, size_t remoteIndex, const CursorResponse& response) { auto& remote = _remotes[remoteIndex]; - updateRemoteMetadata(&remote, response); + _updateRemoteMetadata(lk, remoteIndex, response); for (const auto& obj : response.getBatch()) { // If there's a sort, we're expecting the remote node to have given us back a sort key. if (_params.getSort()) { @@ -771,4 +795,10 @@ bool AsyncResultsMerger::MergingComparator::operator()(const size_t& lhs, const _sort) > 0; } +bool AsyncResultsMerger::PromisedMinSortKeyComparator::operator()( + const MinSortKeyRemoteIdPair& lhs, const MinSortKeyRemoteIdPair& rhs) const { + auto sortKeyComp = compareSortKeys(lhs.first, rhs.first, _sort); + return sortKeyComp < 0 || (sortKeyComp == 0 && lhs.second < rhs.second); +} + } // namespace mongo diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h index 87466e15a92..163ff7e643c 100644 --- a/src/mongo/s/query/async_results_merger.h +++ b/src/mongo/s/query/async_results_merger.h @@ -216,6 +216,14 @@ public: } /** + * For sorted tailable cursors, returns the most recent available sort key. This guarantees that + * we will never return any future results which precede this key. If no results are ready to be + * returned, this method may cause the high water mark to advance to the lowest promised sortkey + * received from the shards. Returns an empty BSONObj if no such sort key is available. + */ + BSONObj getHighWaterMark(); + + /** * Starts shutting down this ARM by canceling all pending requests and scheduling killCursors * on all of the unexhausted remotes. Returns a handle to an event that is signaled when this * ARM is safe to destroy. @@ -317,6 +325,18 @@ private: const bool _compareWholeSortKey; }; + using MinSortKeyRemoteIdPair = std::pair<BSONObj, size_t>; + + class PromisedMinSortKeyComparator { + public: + PromisedMinSortKeyComparator(BSONObj sort) : _sort(std::move(sort)) {} + + bool operator()(const MinSortKeyRemoteIdPair& lhs, const MinSortKeyRemoteIdPair& rhs) const; + + private: + BSONObj _sort; + }; + enum LifecycleState { kAlive, kKillStarted, kKillComplete }; /** @@ -405,6 +425,11 @@ private: */ bool _haveOutstandingBatchRequests(WithLock); + /** + * If a promisedMinSortKey has been obtained from all remotes, returns the lowest such key. + * Otherwise, returns an empty BSONObj. + */ + BSONObj _getMinPromisedSortKey(WithLock); /** * Schedules a getMore on any remote hosts which we need another batch from. @@ -417,9 +442,9 @@ private: void _scheduleKillCursors(WithLock, OperationContext* opCtx); /** - * Updates 'remote's metadata (e.g. the cursor id) based on information in 'response'. + * Updates the given remote's metadata (e.g. the cursor id) based on information in 'response'. */ - void updateRemoteMetadata(RemoteCursorData* remote, const CursorResponse& response); + void _updateRemoteMetadata(WithLock, size_t remoteIndex, const CursorResponse& response); OperationContext* _opCtx; executor::TaskExecutor* _executor; @@ -450,6 +475,13 @@ private: boost::optional<Milliseconds> _awaitDataTimeout; + // An ordered set of (promisedMinSortKey, remoteIndex) pairs received from the shards. The first + // element in the set will be the lowest sort key across all shards. + std::set<MinSortKeyRemoteIdPair, PromisedMinSortKeyComparator> _promisedMinSortKeys; + + // For sorted tailable cursors, records the current high-water-mark sort key. Empty otherwise. + BSONObj _highWaterMark; + // // Killing // diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index ef905803447..b42f4b6c6bb 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -1335,14 +1335,28 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) { scheduleNetworkResponses(std::move(responses)); } -TEST_F(AsyncResultsMergerTest, - SortedTailableCursorNotReadyIfOneOrMoreRemotesHasNoPostBatchResumeToken) { +DEATH_TEST_F(AsyncResultsMergerTest, + SortedTailableInvariantsIfInitialBatchHasNoPostBatchResumeToken, + "Invariant failure _promisedMinSortKeys.empty() || _promisedMinSortKeys.size() == " + "_remotes.size()") { AsyncResultsMergerParams params; params.setNss(kTestNss); UUID uuid = UUID::gen(); std::vector<RemoteCursor> cursors; - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {}))); + // Create one cursor whose initial response has a postBatchResumeToken. + auto pbrtFirstCursor = makePostBatchResumeToken(Timestamp(1, 5)); + auto firstDocSortKey = makeResumeToken(Timestamp(1, 4), uuid, BSON("_id" << 1)); + auto firstCursorResponse = fromjson( + str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: '" << uuid.toString() + << "', documentKey: {_id: 1}}, $sortKey: {'': '" + << firstDocSortKey.firstElement().String() + << "'}}"); + cursors.push_back(makeRemoteCursor( + kTestShardIds[0], + kTestShardHosts[0], + CursorResponse( + kTestNss, 123, {firstCursorResponse}, boost::none, boost::none, pbrtFirstCursor))); + // Create a second cursor whose initial batch has no PBRT. cursors.push_back( makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 456, {}))); params.setRemotes(std::move(cursors)); @@ -1352,57 +1366,10 @@ TEST_F(AsyncResultsMergerTest, stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params)); auto readyEvent = unittest::assertGet(arm->nextEvent()); - - ASSERT_FALSE(arm->ready()); - - // Schedule one response with a postBatchResumeToken in it. - auto pbrtFirstCursor = makePostBatchResumeToken(Timestamp(1, 6)); - auto firstDocSortKey = makeResumeToken(Timestamp(1, 4), uuid, BSON("_id" << 1)); - std::vector<CursorResponse> responses; - auto firstCursorResult = fromjson( - str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: '" << uuid.toString() - << "', documentKey: {_id: 1}}, $sortKey: {'': '" - << firstDocSortKey.firstElement().String() - << "'}}"); - std::vector<BSONObj> batch1{firstCursorResult}; - responses.emplace_back( - kTestNss, CursorId(123), std::move(batch1), boost::none, boost::none, pbrtFirstCursor); - scheduleNetworkResponses(std::move(responses)); - - // Still shouldn't be ready, we don't have a guarantee from each shard. ASSERT_FALSE(arm->ready()); - // Schedule another response from the other shard with a later postBatchResumeToken. - responses.clear(); - auto pbrtSecondCursor = makePostBatchResumeToken(Timestamp(1, 7)); - auto secondDocSortKey = makeResumeToken(Timestamp(1, 5), uuid, BSON("_id" << 2)); - auto secondCursorResult = - fromjson(str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 5)}, uuid: '" - << uuid.toString() + "', documentKey: {_id: 2}}, $sortKey: {'': '" - << secondDocSortKey.firstElement().String() - << "'}}"); - std::vector<BSONObj> batch2{secondCursorResult}; - responses.emplace_back( - kTestNss, CursorId(456), std::move(batch2), boost::none, boost::none, pbrtSecondCursor); - scheduleNetworkResponses(std::move(responses)); - executor()->waitForEvent(readyEvent); - ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(firstCursorResult, *unittest::assertGet(arm->nextReady()).getResult()); - ASSERT_TRUE(arm->ready()); - ASSERT_BSONOBJ_EQ(secondCursorResult, *unittest::assertGet(arm->nextReady()).getResult()); - ASSERT_FALSE(arm->ready()); - - readyEvent = unittest::assertGet(arm->nextEvent()); - - // Clean up the cursors. - responses.clear(); - std::vector<BSONObj> batch3 = {}; - responses.emplace_back(kTestNss, CursorId(0), batch3); - scheduleNetworkResponses(std::move(responses)); - responses.clear(); - std::vector<BSONObj> batch4 = {}; - responses.emplace_back(kTestNss, CursorId(0), batch4); - scheduleNetworkResponses(std::move(responses)); + // We should be dead by now. + MONGO_UNREACHABLE; } DEATH_TEST_F(AsyncResultsMergerTest, @@ -1718,6 +1685,82 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting scheduleNetworkResponses(std::move(responses)); } +TEST_F(AsyncResultsMergerTest, SortedTailableCursorReturnsHighWaterMarkSortKey) { + AsyncResultsMergerParams params; + params.setNss(kTestNss); + std::vector<RemoteCursor> cursors; + // Create three cursors with empty initial batches. Each batch has a PBRT. + auto pbrtFirstCursor = makePostBatchResumeToken(Timestamp(1, 5)); + cursors.push_back(makeRemoteCursor( + kTestShardIds[0], + kTestShardHosts[0], + CursorResponse(kTestNss, 123, {}, boost::none, boost::none, pbrtFirstCursor))); + auto pbrtSecondCursor = makePostBatchResumeToken(Timestamp(1, 1)); + cursors.push_back(makeRemoteCursor( + kTestShardIds[1], + kTestShardHosts[1], + CursorResponse(kTestNss, 456, {}, boost::none, boost::none, pbrtSecondCursor))); + auto pbrtThirdCursor = makePostBatchResumeToken(Timestamp(1, 4)); + cursors.push_back(makeRemoteCursor( + kTestShardIds[2], + kTestShardHosts[2], + CursorResponse(kTestNss, 789, {}, boost::none, boost::none, pbrtThirdCursor))); + params.setRemotes(std::move(cursors)); + params.setTailableMode(TailableModeEnum::kTailableAndAwaitData); + params.setSort(change_stream_constants::kSortSpec); + auto arm = + stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params)); + + // We have no results to return, so the ARM is not ready. + auto readyEvent = unittest::assertGet(arm->nextEvent()); + ASSERT_FALSE(arm->ready()); + + // The high water mark should be the second cursor's PBRT, since it is the lowest of the three. + ASSERT_BSONOBJ_EQ(arm->getHighWaterMark(), pbrtSecondCursor); + + // Advance the PBRT of the second cursor. It should still be the lowest. The fixture expects + // each cursor to be updated in-order, so we keep the first and third PBRTs constant. + pbrtSecondCursor = makePostBatchResumeToken(Timestamp(1, 3)); + std::vector<BSONObj> emptyBatch = {}; + scheduleNetworkResponse( + {kTestNss, CursorId(123), emptyBatch, boost::none, boost::none, pbrtFirstCursor}); + scheduleNetworkResponse( + {kTestNss, CursorId(456), emptyBatch, boost::none, boost::none, pbrtSecondCursor}); + scheduleNetworkResponse( + {kTestNss, CursorId(789), emptyBatch, boost::none, boost::none, pbrtThirdCursor}); + ASSERT_BSONOBJ_EQ(arm->getHighWaterMark(), pbrtSecondCursor); + ASSERT_FALSE(arm->ready()); + + // Advance the second cursor again, so that it surpasses the other two. The third cursor becomes + // the new high water mark. + pbrtSecondCursor = makePostBatchResumeToken(Timestamp(1, 6)); + scheduleNetworkResponse( + {kTestNss, CursorId(123), emptyBatch, boost::none, boost::none, pbrtFirstCursor}); + scheduleNetworkResponse( + {kTestNss, CursorId(456), emptyBatch, boost::none, boost::none, pbrtSecondCursor}); + scheduleNetworkResponse( + {kTestNss, CursorId(789), emptyBatch, boost::none, boost::none, pbrtThirdCursor}); + ASSERT_BSONOBJ_EQ(arm->getHighWaterMark(), pbrtThirdCursor); + ASSERT_FALSE(arm->ready()); + + // Advance the third cursor such that the first cursor becomes the high water mark. + pbrtThirdCursor = makePostBatchResumeToken(Timestamp(1, 7)); + scheduleNetworkResponse( + {kTestNss, CursorId(123), emptyBatch, boost::none, boost::none, pbrtFirstCursor}); + scheduleNetworkResponse( + {kTestNss, CursorId(456), emptyBatch, boost::none, boost::none, pbrtSecondCursor}); + scheduleNetworkResponse( + {kTestNss, CursorId(789), emptyBatch, boost::none, boost::none, pbrtThirdCursor}); + ASSERT_BSONOBJ_EQ(arm->getHighWaterMark(), pbrtFirstCursor); + ASSERT_FALSE(arm->ready()); + + // Clean up the cursors. + std::vector<BSONObj> cleanupBatch = {}; + scheduleNetworkResponse({kTestNss, CursorId(0), cleanupBatch}); + scheduleNetworkResponse({kTestNss, CursorId(0), cleanupBatch}); + scheduleNetworkResponse({kTestNss, CursorId(0), cleanupBatch}); +} + TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) { BSONObj findCmd = fromjson("{find: 'testcoll'}"); std::vector<RemoteCursor> cursors; diff --git a/src/mongo/s/query/blocking_results_merger.h b/src/mongo/s/query/blocking_results_merger.h index 7d82cdff49c..04421f8c49d 100644 --- a/src/mongo/s/query/blocking_results_merger.h +++ b/src/mongo/s/query/blocking_results_merger.h @@ -71,6 +71,10 @@ public: return _arm.getNumRemotes(); } + BSONObj getHighWaterMark() { + return _arm.getHighWaterMark(); + } + void addNewShardCursors(std::vector<RemoteCursor>&& newCursors) { _arm.addNewShardCursors(std::move(newCursors)); } diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index ef6e1b45caf..568f96021f4 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -35,7 +35,6 @@ #include "mongo/s/query/cluster_aggregate.h" #include <boost/intrusive_ptr.hpp> -#include <mongo/rpc/op_msg_rpc_impls.h> #include "mongo/bson/util/bson_extract.h" #include "mongo/db/auth/authorization_session.h" @@ -58,6 +57,7 @@ #include "mongo/db/write_concern_options.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/rpc/op_msg_rpc_impls.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/cluster_commands_helpers.h" @@ -326,6 +326,7 @@ BSONObj establishMergingMongosCursor( options.isInitialResponse = true; CursorResponseBuilder responseBuilder(&replyBuilder, options); + bool stashedResult = false; for (long long objCount = 0; objCount < request.getBatchSize(); ++objCount) { ClusterQueryResult next; @@ -357,12 +358,21 @@ BSONObj establishMergingMongosCursor( if (!FindCommon::haveSpaceForNext(nextObj, objCount, responseBuilder.bytesUsed())) { ccc->queueResult(nextObj); + stashedResult = true; break; } + // Set the postBatchResumeToken. For non-$changeStream aggregations, this will be empty. + responseBuilder.setPostBatchResumeToken(ccc->getPostBatchResumeToken()); responseBuilder.append(nextObj); } + // For empty batches, or in the case where the final result was added to the batch rather than + // being stashed, we update the PBRT here to ensure that it is the most recent available. + if (!stashedResult) { + responseBuilder.setPostBatchResumeToken(ccc->getPostBatchResumeToken()); + } + ccc->detachFromOperationContext(); int nShards = ccc->getNumRemotes(); diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h index 772067748e6..06d7efb4abf 100644 --- a/src/mongo/s/query/cluster_client_cursor.h +++ b/src/mongo/s/query/cluster_client_cursor.h @@ -118,6 +118,12 @@ public: virtual std::size_t getNumRemotes() const = 0; /** + * Returns the current most-recent resume token for this cursor, or an empty object if this is + * not a $changeStream cursor. + */ + virtual BSONObj getPostBatchResumeToken() const = 0; + + /** * Returns the number of result documents returned so far by this cursor via the next() method. */ virtual long long getNumReturnedSoFar() const = 0; diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp index 097a5ae1e5c..3c620b77133 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp @@ -162,6 +162,10 @@ std::size_t ClusterClientCursorImpl::getNumRemotes() const { return _root->getNumRemotes(); } +BSONObj ClusterClientCursorImpl::getPostBatchResumeToken() const { + return _root->getPostBatchResumeToken(); +} + long long ClusterClientCursorImpl::getNumReturnedSoFar() const { return _numReturnedSoFar; } diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h index 06c1bad2fa9..c01cd89dfe3 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.h +++ b/src/mongo/s/query/cluster_client_cursor_impl.h @@ -118,6 +118,8 @@ public: std::size_t getNumRemotes() const final; + BSONObj getPostBatchResumeToken() const final; + long long getNumReturnedSoFar() const final; void queueResult(const ClusterQueryResult& result) final; diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp index 753c88c7fdb..9fdc048819d 100644 --- a/src/mongo/s/query/cluster_client_cursor_mock.cpp +++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp @@ -75,6 +75,10 @@ std::size_t ClusterClientCursorMock::getNumRemotes() const { MONGO_UNREACHABLE; } +BSONObj ClusterClientCursorMock::getPostBatchResumeToken() const { + MONGO_UNREACHABLE; +} + long long ClusterClientCursorMock::getNumReturnedSoFar() const { return _numReturnedSoFar; } diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h index a2286cb9a89..960cb59d218 100644 --- a/src/mongo/s/query/cluster_client_cursor_mock.h +++ b/src/mongo/s/query/cluster_client_cursor_mock.h @@ -74,6 +74,8 @@ public: std::size_t getNumRemotes() const final; + BSONObj getPostBatchResumeToken() const final; + long long getNumReturnedSoFar() const final; void queueResult(const ClusterQueryResult& result) final; diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp index 7ba8c456276..8ce0c769651 100644 --- a/src/mongo/s/query/cluster_cursor_manager.cpp +++ b/src/mongo/s/query/cluster_cursor_manager.cpp @@ -154,6 +154,11 @@ std::size_t ClusterCursorManager::PinnedCursor::getNumRemotes() const { return _cursor->getNumRemotes(); } +BSONObj ClusterCursorManager::PinnedCursor::getPostBatchResumeToken() const { + invariant(_cursor); + return _cursor->getPostBatchResumeToken(); +} + CursorId ClusterCursorManager::PinnedCursor::getCursorId() const { return _cursorId; } diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h index 1a86765cd90..9e7d7a7b61c 100644 --- a/src/mongo/s/query/cluster_cursor_manager.h +++ b/src/mongo/s/query/cluster_cursor_manager.h @@ -199,6 +199,11 @@ public: std::size_t getNumRemotes() const; /** + * If applicable, returns the current most-recent resume token for this cursor. + */ + BSONObj getPostBatchResumeToken() const; + + /** * Returns the cursor id for the underlying cursor, or zero if no cursor is owned. */ CursorId getCursorId() const; diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 8b893f178f7..9770c4a02be 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -595,6 +595,8 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, long long batchSize = request.batchSize.value_or(0); long long startingFrom = pinnedCursor.getValue().getNumReturnedSoFar(); auto cursorState = ClusterCursorManager::CursorState::NotExhausted; + BSONObj postBatchResumeToken; + bool stashedResult = false; while (!FindCommon::enoughForGetMore(batchSize, batch.size())) { auto context = batch.empty() @@ -632,6 +634,7 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, if (!FindCommon::haveSpaceForNext( *next.getValue().getResult(), batch.size(), bytesBuffered)) { pinnedCursor.getValue().queueResult(*next.getValue().getResult()); + stashedResult = true; break; } @@ -640,6 +643,15 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, bytesBuffered += (next.getValue().getResult()->objsize() + kPerDocumentOverheadBytesUpperBound); batch.push_back(std::move(*next.getValue().getResult())); + + // Update the postBatchResumeToken. For non-$changeStream aggregations, this will be empty. + postBatchResumeToken = pinnedCursor.getValue().getPostBatchResumeToken(); + } + + // For empty batches, or in the case where the final result was added to the batch rather than + // being stashed, we update the PBRT here to ensure that it is the most recent available. + if (!stashedResult) { + postBatchResumeToken = pinnedCursor.getValue().getPostBatchResumeToken(); } pinnedCursor.getValue().setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros()); @@ -663,7 +675,8 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, "waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch"); } - return CursorResponse(request.nss, idToReturn, std::move(batch), startingFrom); + return CursorResponse( + request.nss, idToReturn, std::move(batch), startingFrom, boost::none, postBatchResumeToken); } } // namespace mongo diff --git a/src/mongo/s/query/document_source_merge_cursors.cpp b/src/mongo/s/query/document_source_merge_cursors.cpp index fbe93ea1b53..53be78aea27 100644 --- a/src/mongo/s/query/document_source_merge_cursors.cpp +++ b/src/mongo/s/query/document_source_merge_cursors.cpp @@ -63,6 +63,13 @@ std::size_t DocumentSourceMergeCursors::getNumRemotes() const { return _blockingResultsMerger->getNumRemotes(); } +BSONObj DocumentSourceMergeCursors::getHighWaterMark() { + if (!_blockingResultsMerger) { + populateMerger(); + } + return _blockingResultsMerger->getHighWaterMark(); +} + bool DocumentSourceMergeCursors::remotesExhausted() const { if (_armParams) { // We haven't started iteration yet. diff --git a/src/mongo/s/query/document_source_merge_cursors.h b/src/mongo/s/query/document_source_merge_cursors.h index 5249da51a8e..f733d2c3088 100644 --- a/src/mongo/s/query/document_source_merge_cursors.h +++ b/src/mongo/s/query/document_source_merge_cursors.h @@ -99,6 +99,13 @@ public: std::size_t getNumRemotes() const; + /** + * Returns the high water mark sort key for the given cursor, if it exists; otherwise, returns + * an empty BSONObj. Calling this method causes the underlying BlockingResultsMerger to be + * populated and assumes ownership of the remote cursors. + */ + BSONObj getHighWaterMark(); + bool remotesExhausted() const; void setExecContext(RouterExecStage::ExecContext execContext) { diff --git a/src/mongo/s/query/results_merger_test_fixture.h b/src/mongo/s/query/results_merger_test_fixture.h index 2db6b564751..7b714c568ae 100644 --- a/src/mongo/s/query/results_merger_test_fixture.h +++ b/src/mongo/s/query/results_merger_test_fixture.h @@ -153,6 +153,15 @@ protected: } /** + * Schedules a single cursor response to be returned by the mock network. + */ + void scheduleNetworkResponse(CursorResponse&& response) { + std::vector<CursorResponse> responses; + responses.push_back(std::move(response)); + scheduleNetworkResponses(std::move(responses)); + } + + /** * Schedules a list of raw BSON command responses to be returned by the mock network. */ void scheduleNetworkResponseObjs(std::vector<BSONObj> objs) { diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h index dbcc156d0d5..5f45baf8052 100644 --- a/src/mongo/s/query/router_exec_stage.h +++ b/src/mongo/s/query/router_exec_stage.h @@ -124,6 +124,14 @@ public: } /** + * Returns the postBatchResumeToken if this RouterExecStage tree is executing a $changeStream; + * otherwise, returns an empty BSONObj. Default implementation forwards to the stage's child. + */ + virtual BSONObj getPostBatchResumeToken() const { + return _child ? _child->getPostBatchResumeToken() : BSONObj(); + } + + /** * Sets the current operation context to be used by the router stage. */ void reattachToOperationContext(OperationContext* opCtx) { @@ -181,7 +189,7 @@ protected: /** * Returns an unowned pointer to the child stage, or nullptr if there is no child. */ - RouterExecStage* getChildStage() { + RouterExecStage* getChildStage() const { return _child.get(); } diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp index b00d7d2cdf2..f79812092c3 100644 --- a/src/mongo/s/query/router_stage_pipeline.cpp +++ b/src/mongo/s/query/router_stage_pipeline.cpp @@ -87,6 +87,10 @@ std::size_t RouterStagePipeline::getNumRemotes() const { return 0; } +BSONObj RouterStagePipeline::getPostBatchResumeToken() const { + return _mergeCursorsStage ? _mergeCursorsStage->getHighWaterMark() : BSONObj(); +} + bool RouterStagePipeline::remotesExhausted() { return !_mergeCursorsStage || _mergeCursorsStage->remotesExhausted(); } diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h index 0efe1b0aa1d..7afc6bfd19f 100644 --- a/src/mongo/s/query/router_stage_pipeline.h +++ b/src/mongo/s/query/router_stage_pipeline.h @@ -54,6 +54,8 @@ public: std::size_t getNumRemotes() const final; + BSONObj getPostBatchResumeToken() const final; + protected: Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final; |