author     Bernard Gorman <bernard.gorman@gmail.com>  2018-11-27 03:50:42 +0000
committer  Bernard Gorman <bernard.gorman@gmail.com>  2019-01-09 07:16:40 +0000
commit     560a0c3a3e20924b362fc2b159c30255d62e81d2 (patch)
tree       18a082ebad4c1e50f988025e8a53036836175400 /src/mongo
parent     ec104311f774165b5b77b41b78c89e4f29baaca9 (diff)
download   mongo-560a0c3a3e20924b362fc2b159c30255d62e81d2.tar.gz
SERVER-38411 Propagate postBatchResumeToken through mongoS to client
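In practice, this change means that a mongoS reply for a $changeStream cursor now carries cursor.postBatchResumeToken even when nextBatch is empty, so a client can record its place in the stream without having seen a new event. Below is a minimal sketch of the reply shape, using the same fromjson helper that the tests in this diff use; the cursor id, namespace, and '_data' value are hypothetical placeholders, not real values.

    #include "mongo/db/json.h"

    // Illustrative only: approximate shape of a mongos change-stream reply after this
    // change. All literal values below are hypothetical placeholders.
    auto exampleReply = mongo::fromjson(
        "{cursor: {id: 123, ns: 'test.collection', nextBatch: [],"
        " postBatchResumeToken: {_data: '<opaque resume token>'}},"
        " ok: 1}");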
Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/db/query/cursor_response.cpp                 3
-rw-r--r--  src/mongo/s/query/async_results_merger.cpp             78
-rw-r--r--  src/mongo/s/query/async_results_merger.h               36
-rw-r--r--  src/mongo/s/query/async_results_merger_test.cpp        149
-rw-r--r--  src/mongo/s/query/blocking_results_merger.h            4
-rw-r--r--  src/mongo/s/query/cluster_aggregate.cpp                12
-rw-r--r--  src/mongo/s/query/cluster_client_cursor.h              6
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_impl.cpp       4
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_impl.h         2
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_mock.cpp       4
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_mock.h         2
-rw-r--r--  src/mongo/s/query/cluster_cursor_manager.cpp           5
-rw-r--r--  src/mongo/s/query/cluster_cursor_manager.h             5
-rw-r--r--  src/mongo/s/query/cluster_find.cpp                     15
-rw-r--r--  src/mongo/s/query/document_source_merge_cursors.cpp    7
-rw-r--r--  src/mongo/s/query/document_source_merge_cursors.h      7
-rw-r--r--  src/mongo/s/query/results_merger_test_fixture.h        9
-rw-r--r--  src/mongo/s/query/router_exec_stage.h                  10
-rw-r--r--  src/mongo/s/query/router_stage_pipeline.cpp            4
-rw-r--r--  src/mongo/s/query/router_stage_pipeline.h              2
20 files changed, 280 insertions(+), 84 deletions(-)
diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp
index b246d6d2b5b..9578ac09538 100644
--- a/src/mongo/db/query/cursor_response.cpp
+++ b/src/mongo/db/query/cursor_response.cpp
@@ -277,8 +277,7 @@ void CursorResponse::addToBSON(CursorResponse::ResponseType responseType,
}
batchBuilder.doneFast();
- if (_postBatchResumeToken) {
- invariant(!_postBatchResumeToken->isEmpty());
+ if (_postBatchResumeToken && !_postBatchResumeToken->isEmpty()) {
cursorBuilder.append(kPostBatchResumeTokenField, *_postBatchResumeToken);
}
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 25736f9af2e..119a9bef9ce 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -92,9 +92,9 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx,
// since that is not supported we treat boost::none (unspecified) to mean 'kNormal'.
_tailableMode(params.getTailableMode().value_or(TailableModeEnum::kNormal)),
_params(std::move(params)),
- _mergeQueue(MergingComparator(_remotes,
- _params.getSort() ? *_params.getSort() : BSONObj(),
- _params.getCompareWholeSortKey())) {
+ _mergeQueue(MergingComparator(
+ _remotes, _params.getSort().value_or(BSONObj()), _params.getCompareWholeSortKey())),
+ _promisedMinSortKeys(PromisedMinSortKeyComparator(_params.getSort().value_or(BSONObj()))) {
if (params.getTxnNumber()) {
invariant(params.getSessionId());
}
@@ -110,6 +110,9 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx,
_addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.getCursorResponse());
++remoteIndex;
}
+ // If this is a change stream, then we expect to have already received PBRTs from every shard.
+ invariant(_promisedMinSortKeys.empty() || _promisedMinSortKeys.size() == _remotes.size());
+ _highWaterMark = _promisedMinSortKeys.empty() ? BSONObj() : _promisedMinSortKeys.begin()->first;
}
AsyncResultsMerger::~AsyncResultsMerger() {
@@ -175,13 +178,32 @@ void AsyncResultsMerger::reattachToOperationContext(OperationContext* opCtx) {
void AsyncResultsMerger::addNewShardCursors(std::vector<RemoteCursor>&& newCursors) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
+ // Create a new entry in the '_remotes' list for each new shard, and add the first cursor batch
+ // to its buffer. This ensures the shard's initial high water mark is respected, if it exists.
for (auto&& remote : newCursors) {
+ const auto newIndex = _remotes.size();
_remotes.emplace_back(remote.getHostAndPort(),
remote.getCursorResponse().getNSS(),
remote.getCursorResponse().getCursorId());
+ _addBatchToBuffer(lk, newIndex, remote.getCursorResponse());
}
}
+BSONObj AsyncResultsMerger::getHighWaterMark() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ auto minPromisedSortKey = _getMinPromisedSortKey(lk);
+ if (!minPromisedSortKey.isEmpty() && !_ready(lk)) {
+ _highWaterMark = minPromisedSortKey;
+ }
+ return _highWaterMark;
+}
+
+BSONObj AsyncResultsMerger::_getMinPromisedSortKey(WithLock) {
+ // We cannot return the minimum promised sort key unless all shards have reported one.
+ return _promisedMinSortKeys.size() < _remotes.size() ? BSONObj()
+ : _promisedMinSortKeys.begin()->first;
+}
+
bool AsyncResultsMerger::_ready(WithLock lk) {
if (_lifecycleState != kAlive) {
return true;
@@ -220,7 +242,7 @@ bool AsyncResultsMerger::_readySorted(WithLock lk) {
return true;
}
-bool AsyncResultsMerger::_readySortedTailable(WithLock) {
+bool AsyncResultsMerger::_readySortedTailable(WithLock lk) {
if (_mergeQueue.empty()) {
return false;
}
@@ -229,19 +251,10 @@ bool AsyncResultsMerger::_readySortedTailable(WithLock) {
auto smallestResult = _remotes[smallestRemote].docBuffer.front();
auto keyWeWantToReturn =
extractSortKey(*smallestResult.getResult(), _params.getCompareWholeSortKey());
- for (const auto& remote : _remotes) {
- if (!remote.promisedMinSortKey) {
- // In order to merge sorted tailable cursors, we need this value to be populated.
- return false;
- }
- if (compareSortKeys(keyWeWantToReturn, *remote.promisedMinSortKey, *_params.getSort()) >
- 0) {
- // The key we want to return is not guaranteed to be smaller than future results from
- // this remote, so we can't yet return it.
- return false;
- }
- }
- return true;
+ // We should always have a minPromisedSortKey from every shard in the sorted tailable case.
+ auto minPromisedSortKey = _getMinPromisedSortKey(lk);
+ invariant(!minPromisedSortKey.isEmpty());
+ return compareSortKeys(keyWeWantToReturn, minPromisedSortKey, *_params.getSort()) <= 0;
}
bool AsyncResultsMerger::_readyUnsorted(WithLock) {
@@ -301,6 +314,12 @@ ClusterQueryResult AsyncResultsMerger::_nextReadySorted(WithLock) {
_mergeQueue.push(smallestRemote);
}
+ // For sorted tailable awaitData cursors, update the high water mark to the document's sort key.
+ if (_tailableMode == TailableModeEnum::kTailableAndAwaitData) {
+ _highWaterMark =
+ extractSortKey(*front.getResult(), _params.getCompareWholeSortKey()).getOwned();
+ }
+
return front;
}
@@ -485,10 +504,12 @@ StatusWith<CursorResponse> AsyncResultsMerger::_parseCursorResponse(
return std::move(cursorResponse);
}
-void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote,
- const CursorResponse& response) {
+void AsyncResultsMerger::_updateRemoteMetadata(WithLock,
+ size_t remoteIndex,
+ const CursorResponse& response) {
// Update the cursorId; it is sent as '0' when the cursor has been exhausted on the shard.
- remote->cursorId = response.getCursorId();
+ auto& remote = _remotes[remoteIndex];
+ remote.cursorId = response.getCursorId();
if (response.getPostBatchResumeToken()) {
// We only expect to see this for change streams.
invariant(_params.getSort());
@@ -501,15 +522,18 @@ void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote,
// The most recent minimum sort key should never be smaller than the previous promised
// minimum sort key for this remote, if one exists.
auto newMinSortKey = *response.getPostBatchResumeToken();
- if (auto& oldMinSortKey = remote->promisedMinSortKey) {
+ if (auto& oldMinSortKey = remote.promisedMinSortKey) {
invariant(compareSortKeys(newMinSortKey, *oldMinSortKey, *_params.getSort()) >= 0);
+ invariant(_promisedMinSortKeys.size() <= _remotes.size());
+ _promisedMinSortKeys.erase({*oldMinSortKey, remoteIndex});
}
- remote->promisedMinSortKey = newMinSortKey;
+ _promisedMinSortKeys.insert({newMinSortKey, remoteIndex});
+ remote.promisedMinSortKey = newMinSortKey;
} else {
// If we don't have a postBatchResumeToken, then we should never have an oplog timestamp.
// TODO SERVER-38539: remove this validation when $internalLatestOplogTimestamp is removed.
invariant(!response.getLastOplogTimestamp(),
- str::stream() << "Host " << remote->shardHostAndPort
+ str::stream() << "Host " << remote.shardHostAndPort
<< " returned a cursor which has an oplog timestamp but does not "
"have a postBatchResumeToken, suggesting that one or more shards"
" are running an older version of MongoDB. This configuration "
@@ -621,7 +645,7 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk,
size_t remoteIndex,
const CursorResponse& response) {
auto& remote = _remotes[remoteIndex];
- updateRemoteMetadata(&remote, response);
+ _updateRemoteMetadata(lk, remoteIndex, response);
for (const auto& obj : response.getBatch()) {
// If there's a sort, we're expecting the remote node to have given us back a sort key.
if (_params.getSort()) {
@@ -771,4 +795,10 @@ bool AsyncResultsMerger::MergingComparator::operator()(const size_t& lhs, const
_sort) > 0;
}
+bool AsyncResultsMerger::PromisedMinSortKeyComparator::operator()(
+ const MinSortKeyRemoteIdPair& lhs, const MinSortKeyRemoteIdPair& rhs) const {
+ auto sortKeyComp = compareSortKeys(lhs.first, rhs.first, _sort);
+ return sortKeyComp < 0 || (sortKeyComp == 0 && lhs.second < rhs.second);
+}
+
} // namespace mongo
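The heart of the ARM changes above is the _promisedMinSortKeys set: each remote's latest postBatchResumeToken is a promise that no future result from that remote will sort below it, and the merger's high water mark is the minimum such promise once every remote has made one. The following is a self-contained sketch of that bookkeeping, with std::string standing in for BSON sort keys; the class and method names are hypothetical, not the real ARM interface.

    #include <cstddef>
    #include <iostream>
    #include <set>
    #include <string>
    #include <utility>
    #include <vector>

    // Each remote "promises" that no future result will sort below its latest
    // postBatchResumeToken. The high water mark is the minimum promised key, and is
    // only meaningful once every remote has reported one.
    class HighWaterMarkTracker {
    public:
        explicit HighWaterMarkTracker(std::size_t numRemotes) : _latestKeyByRemote(numRemotes) {}

        // Record a new promised minimum sort key for one remote. As in the ARM, a
        // remote's promised keys are expected to be monotonically non-decreasing.
        void onPostBatchResumeToken(std::size_t remoteIndex, const std::string& newKey) {
            auto& oldKey = _latestKeyByRemote[remoteIndex];
            if (!oldKey.empty()) {
                _promisedMinSortKeys.erase({oldKey, remoteIndex});
            }
            _promisedMinSortKeys.insert({newKey, remoteIndex});
            oldKey = newKey;
        }

        // Mirrors _getMinPromisedSortKey: empty until all remotes have reported a key.
        std::string minPromisedSortKey() const {
            if (_promisedMinSortKeys.size() < _latestKeyByRemote.size()) {
                return {};
            }
            return _promisedMinSortKeys.begin()->first;
        }

    private:
        std::vector<std::string> _latestKeyByRemote;
        // std::pair's default ordering compares the key first, then the remote index,
        // which is the same tie-break PromisedMinSortKeyComparator applies.
        std::set<std::pair<std::string, std::size_t>> _promisedMinSortKeys;
    };

    int main() {
        HighWaterMarkTracker tracker(3);
        tracker.onPostBatchResumeToken(0, "ts:5");
        tracker.onPostBatchResumeToken(1, "ts:1");
        std::cout << "[" << tracker.minPromisedSortKey() << "]\n";  // [] - remote 2 has not reported yet
        tracker.onPostBatchResumeToken(2, "ts:4");
        std::cout << tracker.minPromisedSortKey() << "\n";          // ts:1
        tracker.onPostBatchResumeToken(1, "ts:6");
        std::cout << tracker.minPromisedSortKey() << "\n";          // ts:4
    }

The tie-break on remote index matches PromisedMinSortKeyComparator, so two remotes that promise identical keys remain distinct entries in the set.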
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index 87466e15a92..163ff7e643c 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -216,6 +216,14 @@ public:
}
/**
+ * For sorted tailable cursors, returns the most recent available sort key. This guarantees that
+ * we will never return any future results which precede this key. If no results are ready to be
+ * returned, this method may cause the high water mark to advance to the lowest promised sort key
+ * received from the shards. Returns an empty BSONObj if no such sort key is available.
+ */
+ BSONObj getHighWaterMark();
+
+ /**
* Starts shutting down this ARM by canceling all pending requests and scheduling killCursors
* on all of the unexhausted remotes. Returns a handle to an event that is signaled when this
* ARM is safe to destroy.
@@ -317,6 +325,18 @@ private:
const bool _compareWholeSortKey;
};
+ using MinSortKeyRemoteIdPair = std::pair<BSONObj, size_t>;
+
+ class PromisedMinSortKeyComparator {
+ public:
+ PromisedMinSortKeyComparator(BSONObj sort) : _sort(std::move(sort)) {}
+
+ bool operator()(const MinSortKeyRemoteIdPair& lhs, const MinSortKeyRemoteIdPair& rhs) const;
+
+ private:
+ BSONObj _sort;
+ };
+
enum LifecycleState { kAlive, kKillStarted, kKillComplete };
/**
@@ -405,6 +425,11 @@ private:
*/
bool _haveOutstandingBatchRequests(WithLock);
+ /**
+ * If a promisedMinSortKey has been obtained from all remotes, returns the lowest such key.
+ * Otherwise, returns an empty BSONObj.
+ */
+ BSONObj _getMinPromisedSortKey(WithLock);
/**
* Schedules a getMore on any remote hosts which we need another batch from.
@@ -417,9 +442,9 @@ private:
void _scheduleKillCursors(WithLock, OperationContext* opCtx);
/**
- * Updates 'remote's metadata (e.g. the cursor id) based on information in 'response'.
+ * Updates the given remote's metadata (e.g. the cursor id) based on information in 'response'.
*/
- void updateRemoteMetadata(RemoteCursorData* remote, const CursorResponse& response);
+ void _updateRemoteMetadata(WithLock, size_t remoteIndex, const CursorResponse& response);
OperationContext* _opCtx;
executor::TaskExecutor* _executor;
@@ -450,6 +475,13 @@ private:
boost::optional<Milliseconds> _awaitDataTimeout;
+ // An ordered set of (promisedMinSortKey, remoteIndex) pairs received from the shards. The first
+ // element in the set will be the lowest sort key across all shards.
+ std::set<MinSortKeyRemoteIdPair, PromisedMinSortKeyComparator> _promisedMinSortKeys;
+
+ // For sorted tailable cursors, records the current high-water-mark sort key. Empty otherwise.
+ BSONObj _highWaterMark;
+
//
// Killing
//
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index ef905803447..b42f4b6c6bb 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -1335,14 +1335,28 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
scheduleNetworkResponses(std::move(responses));
}
-TEST_F(AsyncResultsMergerTest,
- SortedTailableCursorNotReadyIfOneOrMoreRemotesHasNoPostBatchResumeToken) {
+DEATH_TEST_F(AsyncResultsMergerTest,
+ SortedTailableInvariantsIfInitialBatchHasNoPostBatchResumeToken,
+ "Invariant failure _promisedMinSortKeys.empty() || _promisedMinSortKeys.size() == "
+ "_remotes.size()") {
AsyncResultsMergerParams params;
params.setNss(kTestNss);
UUID uuid = UUID::gen();
std::vector<RemoteCursor> cursors;
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 123, {})));
+ // Create one cursor whose initial response has a postBatchResumeToken.
+ auto pbrtFirstCursor = makePostBatchResumeToken(Timestamp(1, 5));
+ auto firstDocSortKey = makeResumeToken(Timestamp(1, 4), uuid, BSON("_id" << 1));
+ auto firstCursorResponse = fromjson(
+ str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: '" << uuid.toString()
+ << "', documentKey: {_id: 1}}, $sortKey: {'': '"
+ << firstDocSortKey.firstElement().String()
+ << "'}}");
+ cursors.push_back(makeRemoteCursor(
+ kTestShardIds[0],
+ kTestShardHosts[0],
+ CursorResponse(
+ kTestNss, 123, {firstCursorResponse}, boost::none, boost::none, pbrtFirstCursor)));
+ // Create a second cursor whose initial batch has no PBRT.
cursors.push_back(
makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 456, {})));
params.setRemotes(std::move(cursors));
@@ -1352,57 +1366,10 @@ TEST_F(AsyncResultsMergerTest,
stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params));
auto readyEvent = unittest::assertGet(arm->nextEvent());
-
- ASSERT_FALSE(arm->ready());
-
- // Schedule one response with a postBatchResumeToken in it.
- auto pbrtFirstCursor = makePostBatchResumeToken(Timestamp(1, 6));
- auto firstDocSortKey = makeResumeToken(Timestamp(1, 4), uuid, BSON("_id" << 1));
- std::vector<CursorResponse> responses;
- auto firstCursorResult = fromjson(
- str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: '" << uuid.toString()
- << "', documentKey: {_id: 1}}, $sortKey: {'': '"
- << firstDocSortKey.firstElement().String()
- << "'}}");
- std::vector<BSONObj> batch1{firstCursorResult};
- responses.emplace_back(
- kTestNss, CursorId(123), std::move(batch1), boost::none, boost::none, pbrtFirstCursor);
- scheduleNetworkResponses(std::move(responses));
-
- // Still shouldn't be ready, we don't have a guarantee from each shard.
ASSERT_FALSE(arm->ready());
- // Schedule another response from the other shard with a later postBatchResumeToken.
- responses.clear();
- auto pbrtSecondCursor = makePostBatchResumeToken(Timestamp(1, 7));
- auto secondDocSortKey = makeResumeToken(Timestamp(1, 5), uuid, BSON("_id" << 2));
- auto secondCursorResult =
- fromjson(str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 5)}, uuid: '"
- << uuid.toString() + "', documentKey: {_id: 2}}, $sortKey: {'': '"
- << secondDocSortKey.firstElement().String()
- << "'}}");
- std::vector<BSONObj> batch2{secondCursorResult};
- responses.emplace_back(
- kTestNss, CursorId(456), std::move(batch2), boost::none, boost::none, pbrtSecondCursor);
- scheduleNetworkResponses(std::move(responses));
- executor()->waitForEvent(readyEvent);
- ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(firstCursorResult, *unittest::assertGet(arm->nextReady()).getResult());
- ASSERT_TRUE(arm->ready());
- ASSERT_BSONOBJ_EQ(secondCursorResult, *unittest::assertGet(arm->nextReady()).getResult());
- ASSERT_FALSE(arm->ready());
-
- readyEvent = unittest::assertGet(arm->nextEvent());
-
- // Clean up the cursors.
- responses.clear();
- std::vector<BSONObj> batch3 = {};
- responses.emplace_back(kTestNss, CursorId(0), batch3);
- scheduleNetworkResponses(std::move(responses));
- responses.clear();
- std::vector<BSONObj> batch4 = {};
- responses.emplace_back(kTestNss, CursorId(0), batch4);
- scheduleNetworkResponses(std::move(responses));
+ // We should be dead by now.
+ MONGO_UNREACHABLE;
}
DEATH_TEST_F(AsyncResultsMergerTest,
@@ -1718,6 +1685,82 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting
scheduleNetworkResponses(std::move(responses));
}
+TEST_F(AsyncResultsMergerTest, SortedTailableCursorReturnsHighWaterMarkSortKey) {
+ AsyncResultsMergerParams params;
+ params.setNss(kTestNss);
+ std::vector<RemoteCursor> cursors;
+ // Create three cursors with empty initial batches. Each batch has a PBRT.
+ auto pbrtFirstCursor = makePostBatchResumeToken(Timestamp(1, 5));
+ cursors.push_back(makeRemoteCursor(
+ kTestShardIds[0],
+ kTestShardHosts[0],
+ CursorResponse(kTestNss, 123, {}, boost::none, boost::none, pbrtFirstCursor)));
+ auto pbrtSecondCursor = makePostBatchResumeToken(Timestamp(1, 1));
+ cursors.push_back(makeRemoteCursor(
+ kTestShardIds[1],
+ kTestShardHosts[1],
+ CursorResponse(kTestNss, 456, {}, boost::none, boost::none, pbrtSecondCursor)));
+ auto pbrtThirdCursor = makePostBatchResumeToken(Timestamp(1, 4));
+ cursors.push_back(makeRemoteCursor(
+ kTestShardIds[2],
+ kTestShardHosts[2],
+ CursorResponse(kTestNss, 789, {}, boost::none, boost::none, pbrtThirdCursor)));
+ params.setRemotes(std::move(cursors));
+ params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
+ params.setSort(change_stream_constants::kSortSpec);
+ auto arm =
+ stdx::make_unique<AsyncResultsMerger>(operationContext(), executor(), std::move(params));
+
+ // We have no results to return, so the ARM is not ready.
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
+ ASSERT_FALSE(arm->ready());
+
+ // The high water mark should be the second cursor's PBRT, since it is the lowest of the three.
+ ASSERT_BSONOBJ_EQ(arm->getHighWaterMark(), pbrtSecondCursor);
+
+ // Advance the PBRT of the second cursor. It should still be the lowest. The fixture expects
+ // each cursor to be updated in-order, so we keep the first and third PBRTs constant.
+ pbrtSecondCursor = makePostBatchResumeToken(Timestamp(1, 3));
+ std::vector<BSONObj> emptyBatch = {};
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(123), emptyBatch, boost::none, boost::none, pbrtFirstCursor});
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(456), emptyBatch, boost::none, boost::none, pbrtSecondCursor});
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(789), emptyBatch, boost::none, boost::none, pbrtThirdCursor});
+ ASSERT_BSONOBJ_EQ(arm->getHighWaterMark(), pbrtSecondCursor);
+ ASSERT_FALSE(arm->ready());
+
+ // Advance the second cursor again, so that it surpasses the other two. The third cursor becomes
+ // the new high water mark.
+ pbrtSecondCursor = makePostBatchResumeToken(Timestamp(1, 6));
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(123), emptyBatch, boost::none, boost::none, pbrtFirstCursor});
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(456), emptyBatch, boost::none, boost::none, pbrtSecondCursor});
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(789), emptyBatch, boost::none, boost::none, pbrtThirdCursor});
+ ASSERT_BSONOBJ_EQ(arm->getHighWaterMark(), pbrtThirdCursor);
+ ASSERT_FALSE(arm->ready());
+
+ // Advance the third cursor such that the first cursor becomes the high water mark.
+ pbrtThirdCursor = makePostBatchResumeToken(Timestamp(1, 7));
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(123), emptyBatch, boost::none, boost::none, pbrtFirstCursor});
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(456), emptyBatch, boost::none, boost::none, pbrtSecondCursor});
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(789), emptyBatch, boost::none, boost::none, pbrtThirdCursor});
+ ASSERT_BSONOBJ_EQ(arm->getHighWaterMark(), pbrtFirstCursor);
+ ASSERT_FALSE(arm->ready());
+
+ // Clean up the cursors.
+ std::vector<BSONObj> cleanupBatch = {};
+ scheduleNetworkResponse({kTestNss, CursorId(0), cleanupBatch});
+ scheduleNetworkResponse({kTestNss, CursorId(0), cleanupBatch});
+ scheduleNetworkResponse({kTestNss, CursorId(0), cleanupBatch});
+}
+
TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) {
BSONObj findCmd = fromjson("{find: 'testcoll'}");
std::vector<RemoteCursor> cursors;
diff --git a/src/mongo/s/query/blocking_results_merger.h b/src/mongo/s/query/blocking_results_merger.h
index 7d82cdff49c..04421f8c49d 100644
--- a/src/mongo/s/query/blocking_results_merger.h
+++ b/src/mongo/s/query/blocking_results_merger.h
@@ -71,6 +71,10 @@ public:
return _arm.getNumRemotes();
}
+ BSONObj getHighWaterMark() {
+ return _arm.getHighWaterMark();
+ }
+
void addNewShardCursors(std::vector<RemoteCursor>&& newCursors) {
_arm.addNewShardCursors(std::move(newCursors));
}
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index ef6e1b45caf..568f96021f4 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -35,7 +35,6 @@
#include "mongo/s/query/cluster_aggregate.h"
#include <boost/intrusive_ptr.hpp>
-#include <mongo/rpc/op_msg_rpc_impls.h>
#include "mongo/bson/util/bson_extract.h"
#include "mongo/db/auth/authorization_session.h"
@@ -58,6 +57,7 @@
#include "mongo/db/write_concern_options.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/rpc/op_msg_rpc_impls.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/cluster_commands_helpers.h"
@@ -326,6 +326,7 @@ BSONObj establishMergingMongosCursor(
options.isInitialResponse = true;
CursorResponseBuilder responseBuilder(&replyBuilder, options);
+ bool stashedResult = false;
for (long long objCount = 0; objCount < request.getBatchSize(); ++objCount) {
ClusterQueryResult next;
@@ -357,12 +358,21 @@ BSONObj establishMergingMongosCursor(
if (!FindCommon::haveSpaceForNext(nextObj, objCount, responseBuilder.bytesUsed())) {
ccc->queueResult(nextObj);
+ stashedResult = true;
break;
}
+ // Set the postBatchResumeToken. For non-$changeStream aggregations, this will be empty.
+ responseBuilder.setPostBatchResumeToken(ccc->getPostBatchResumeToken());
responseBuilder.append(nextObj);
}
+ // For empty batches, or in the case where the final result was added to the batch rather than
+ // being stashed, we update the PBRT here to ensure that it is the most recent available.
+ if (!stashedResult) {
+ responseBuilder.setPostBatchResumeToken(ccc->getPostBatchResumeToken());
+ }
+
ccc->detachFromOperationContext();
int nShards = ccc->getNumRemotes();
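The stashedResult flag above maintains an invariant worth calling out: the postBatchResumeToken returned to the client must never be newer than the last document actually placed in the batch, or a client resuming from that token could skip the stashed result. The same pattern appears in ClusterFind::runGetMore further down. Here is a rough sketch of the idea, written against a hypothetical Cursor interface rather than the real ClusterClientCursor and CursorResponseBuilder.

    #include <cstddef>
    #include <string>
    #include <vector>

    // Hypothetical stand-ins for the real types.
    struct Doc {
        std::string json;
    };

    struct BatchResult {
        std::vector<Doc> batch;
        std::string postBatchResumeToken;  // empty for non-$changeStream cursors
    };

    template <typename Cursor>
    BatchResult buildBatch(Cursor& cursor, std::size_t batchSize) {
        BatchResult out;
        bool stashedResult = false;
        while (out.batch.size() < batchSize) {
            auto next = cursor.next();  // e.g. an optional<Doc>
            if (!next) {
                break;  // nothing more available right now
            }
            if (!cursor.haveSpaceFor(*next)) {
                cursor.stash(*next);  // will be returned by the next getMore instead
                stashedResult = true;
                break;
            }
            out.batch.push_back(*next);
            // Safe to advance: the token now covers everything up to and including this doc.
            out.postBatchResumeToken = cursor.postBatchResumeToken();
        }
        // For empty batches, or when the final result made it into the batch rather than
        // being stashed, take the freshest token available.
        if (!stashedResult) {
            out.postBatchResumeToken = cursor.postBatchResumeToken();
        }
        return out;
    }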
diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h
index 772067748e6..06d7efb4abf 100644
--- a/src/mongo/s/query/cluster_client_cursor.h
+++ b/src/mongo/s/query/cluster_client_cursor.h
@@ -118,6 +118,12 @@ public:
virtual std::size_t getNumRemotes() const = 0;
/**
+ * Returns the current most-recent resume token for this cursor, or an empty object if this is
+ * not a $changeStream cursor.
+ */
+ virtual BSONObj getPostBatchResumeToken() const = 0;
+
+ /**
* Returns the number of result documents returned so far by this cursor via the next() method.
*/
virtual long long getNumReturnedSoFar() const = 0;
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index 097a5ae1e5c..3c620b77133 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -162,6 +162,10 @@ std::size_t ClusterClientCursorImpl::getNumRemotes() const {
return _root->getNumRemotes();
}
+BSONObj ClusterClientCursorImpl::getPostBatchResumeToken() const {
+ return _root->getPostBatchResumeToken();
+}
+
long long ClusterClientCursorImpl::getNumReturnedSoFar() const {
return _numReturnedSoFar;
}
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h
index 06c1bad2fa9..c01cd89dfe3 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.h
+++ b/src/mongo/s/query/cluster_client_cursor_impl.h
@@ -118,6 +118,8 @@ public:
std::size_t getNumRemotes() const final;
+ BSONObj getPostBatchResumeToken() const final;
+
long long getNumReturnedSoFar() const final;
void queueResult(const ClusterQueryResult& result) final;
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp
index 753c88c7fdb..9fdc048819d 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp
@@ -75,6 +75,10 @@ std::size_t ClusterClientCursorMock::getNumRemotes() const {
MONGO_UNREACHABLE;
}
+BSONObj ClusterClientCursorMock::getPostBatchResumeToken() const {
+ MONGO_UNREACHABLE;
+}
+
long long ClusterClientCursorMock::getNumReturnedSoFar() const {
return _numReturnedSoFar;
}
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h
index a2286cb9a89..960cb59d218 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.h
+++ b/src/mongo/s/query/cluster_client_cursor_mock.h
@@ -74,6 +74,8 @@ public:
std::size_t getNumRemotes() const final;
+ BSONObj getPostBatchResumeToken() const final;
+
long long getNumReturnedSoFar() const final;
void queueResult(const ClusterQueryResult& result) final;
diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp
index 7ba8c456276..8ce0c769651 100644
--- a/src/mongo/s/query/cluster_cursor_manager.cpp
+++ b/src/mongo/s/query/cluster_cursor_manager.cpp
@@ -154,6 +154,11 @@ std::size_t ClusterCursorManager::PinnedCursor::getNumRemotes() const {
return _cursor->getNumRemotes();
}
+BSONObj ClusterCursorManager::PinnedCursor::getPostBatchResumeToken() const {
+ invariant(_cursor);
+ return _cursor->getPostBatchResumeToken();
+}
+
CursorId ClusterCursorManager::PinnedCursor::getCursorId() const {
return _cursorId;
}
diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h
index 1a86765cd90..9e7d7a7b61c 100644
--- a/src/mongo/s/query/cluster_cursor_manager.h
+++ b/src/mongo/s/query/cluster_cursor_manager.h
@@ -199,6 +199,11 @@ public:
std::size_t getNumRemotes() const;
/**
+ * If applicable, returns the current most-recent resume token for this cursor.
+ */
+ BSONObj getPostBatchResumeToken() const;
+
+ /**
* Returns the cursor id for the underlying cursor, or zero if no cursor is owned.
*/
CursorId getCursorId() const;
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index 8b893f178f7..9770c4a02be 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -595,6 +595,8 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
long long batchSize = request.batchSize.value_or(0);
long long startingFrom = pinnedCursor.getValue().getNumReturnedSoFar();
auto cursorState = ClusterCursorManager::CursorState::NotExhausted;
+ BSONObj postBatchResumeToken;
+ bool stashedResult = false;
while (!FindCommon::enoughForGetMore(batchSize, batch.size())) {
auto context = batch.empty()
@@ -632,6 +634,7 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
if (!FindCommon::haveSpaceForNext(
*next.getValue().getResult(), batch.size(), bytesBuffered)) {
pinnedCursor.getValue().queueResult(*next.getValue().getResult());
+ stashedResult = true;
break;
}
@@ -640,6 +643,15 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
bytesBuffered +=
(next.getValue().getResult()->objsize() + kPerDocumentOverheadBytesUpperBound);
batch.push_back(std::move(*next.getValue().getResult()));
+
+ // Update the postBatchResumeToken. For non-$changeStream aggregations, this will be empty.
+ postBatchResumeToken = pinnedCursor.getValue().getPostBatchResumeToken();
+ }
+
+ // For empty batches, or in the case where the final result was added to the batch rather than
+ // being stashed, we update the PBRT here to ensure that it is the most recent available.
+ if (!stashedResult) {
+ postBatchResumeToken = pinnedCursor.getValue().getPostBatchResumeToken();
}
pinnedCursor.getValue().setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros());
@@ -663,7 +675,8 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
"waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch");
}
- return CursorResponse(request.nss, idToReturn, std::move(batch), startingFrom);
+ return CursorResponse(
+ request.nss, idToReturn, std::move(batch), startingFrom, boost::none, postBatchResumeToken);
}
} // namespace mongo
diff --git a/src/mongo/s/query/document_source_merge_cursors.cpp b/src/mongo/s/query/document_source_merge_cursors.cpp
index fbe93ea1b53..53be78aea27 100644
--- a/src/mongo/s/query/document_source_merge_cursors.cpp
+++ b/src/mongo/s/query/document_source_merge_cursors.cpp
@@ -63,6 +63,13 @@ std::size_t DocumentSourceMergeCursors::getNumRemotes() const {
return _blockingResultsMerger->getNumRemotes();
}
+BSONObj DocumentSourceMergeCursors::getHighWaterMark() {
+ if (!_blockingResultsMerger) {
+ populateMerger();
+ }
+ return _blockingResultsMerger->getHighWaterMark();
+}
+
bool DocumentSourceMergeCursors::remotesExhausted() const {
if (_armParams) {
// We haven't started iteration yet.
diff --git a/src/mongo/s/query/document_source_merge_cursors.h b/src/mongo/s/query/document_source_merge_cursors.h
index 5249da51a8e..f733d2c3088 100644
--- a/src/mongo/s/query/document_source_merge_cursors.h
+++ b/src/mongo/s/query/document_source_merge_cursors.h
@@ -99,6 +99,13 @@ public:
std::size_t getNumRemotes() const;
+ /**
+ * Returns the high water mark sort key for the given cursor, if it exists; otherwise, returns
+ * an empty BSONObj. Calling this method causes the underlying BlockingResultsMerger to be
+ * populated, at which point it takes ownership of the remote cursors.
+ */
+ BSONObj getHighWaterMark();
+
bool remotesExhausted() const;
void setExecContext(RouterExecStage::ExecContext execContext) {
diff --git a/src/mongo/s/query/results_merger_test_fixture.h b/src/mongo/s/query/results_merger_test_fixture.h
index 2db6b564751..7b714c568ae 100644
--- a/src/mongo/s/query/results_merger_test_fixture.h
+++ b/src/mongo/s/query/results_merger_test_fixture.h
@@ -153,6 +153,15 @@ protected:
}
/**
+ * Schedules a single cursor response to be returned by the mock network.
+ */
+ void scheduleNetworkResponse(CursorResponse&& response) {
+ std::vector<CursorResponse> responses;
+ responses.push_back(std::move(response));
+ scheduleNetworkResponses(std::move(responses));
+ }
+
+ /**
* Schedules a list of raw BSON command responses to be returned by the mock network.
*/
void scheduleNetworkResponseObjs(std::vector<BSONObj> objs) {
diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h
index dbcc156d0d5..5f45baf8052 100644
--- a/src/mongo/s/query/router_exec_stage.h
+++ b/src/mongo/s/query/router_exec_stage.h
@@ -124,6 +124,14 @@ public:
}
/**
+ * Returns the postBatchResumeToken if this RouterExecStage tree is executing a $changeStream;
+ * otherwise, returns an empty BSONObj. Default implementation forwards to the stage's child.
+ */
+ virtual BSONObj getPostBatchResumeToken() const {
+ return _child ? _child->getPostBatchResumeToken() : BSONObj();
+ }
+
+ /**
* Sets the current operation context to be used by the router stage.
*/
void reattachToOperationContext(OperationContext* opCtx) {
@@ -181,7 +189,7 @@ protected:
/**
* Returns an unowned pointer to the child stage, or nullptr if there is no child.
*/
- RouterExecStage* getChildStage() {
+ RouterExecStage* getChildStage() const {
return _child.get();
}
diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp
index b00d7d2cdf2..f79812092c3 100644
--- a/src/mongo/s/query/router_stage_pipeline.cpp
+++ b/src/mongo/s/query/router_stage_pipeline.cpp
@@ -87,6 +87,10 @@ std::size_t RouterStagePipeline::getNumRemotes() const {
return 0;
}
+BSONObj RouterStagePipeline::getPostBatchResumeToken() const {
+ return _mergeCursorsStage ? _mergeCursorsStage->getHighWaterMark() : BSONObj();
+}
+
bool RouterStagePipeline::remotesExhausted() {
return !_mergeCursorsStage || _mergeCursorsStage->remotesExhausted();
}
diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h
index 0efe1b0aa1d..7afc6bfd19f 100644
--- a/src/mongo/s/query/router_stage_pipeline.h
+++ b/src/mongo/s/query/router_stage_pipeline.h
@@ -54,6 +54,8 @@ public:
std::size_t getNumRemotes() const final;
+ BSONObj getPostBatchResumeToken() const final;
+
protected:
Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
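Taken together, these changes let the router surface the high-water-mark token to drivers through the cursor response. The intended client-side use is sketched below with the same fromjson helper; the collection name and '_data' value are hypothetical placeholders, and a real resume token is an opaque value previously returned by the server.

    #include "mongo/db/json.h"

    // Illustrative only: a client that saved the postBatchResumeToken from an earlier,
    // possibly empty, batch can hand it back to resume the stream from that point.
    auto resumeCmd = mongo::fromjson(
        "{aggregate: 'collection',"
        " pipeline: [{$changeStream: {resumeAfter: {_data: '<opaque resume token>'}}}],"
        " cursor: {}}");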