summaryrefslogtreecommitdiff
path: root/src/mongo/s/query/async_results_merger.cpp
diff options
context:
space:
mode:
authorJustin Seyster <justin.seyster@mongodb.com>2019-10-29 16:27:46 +0000
committerevergreen <evergreen@mongodb.com>2019-10-29 16:27:46 +0000
commite14dbefec5fb18a7e9fc8739d3ef529bb1338ab4 (patch)
tree20d3c6259b884a1689b47dd3b6ec17f7fc2b1412 /src/mongo/s/query/async_results_merger.cpp
parent29cd14ad598a3594529201fa095cc1fa3dc68e07 (diff)
downloadmongo-e14dbefec5fb18a7e9fc8739d3ef529bb1338ab4.tar.gz
SERVER-42713 Consistent sort key format for change streams pipelines
Diffstat (limited to 'src/mongo/s/query/async_results_merger.cpp')
-rw-r--r--src/mongo/s/query/async_results_merger.cpp11
1 files changed, 8 insertions, 3 deletions
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index ae6aaf092b4..51bbcf84db4 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -192,7 +192,10 @@ BSONObj AsyncResultsMerger::getHighWaterMark() {
stdx::lock_guard<Latch> lk(_mutex);
auto minPromisedSortKey = _getMinPromisedSortKey(lk);
if (!minPromisedSortKey.isEmpty() && !_ready(lk)) {
- _highWaterMark = minPromisedSortKey;
+ // When 'minPromisedSortKey' contains the "high watermark" resume token, it's stored in
+ // sort-key format: {"": <high watermark>}. We copy the <high watermark> part of of the
+ // sort key, which looks like {_data: ..., _typeBits: ...}, and return that.
+ _highWaterMark = minPromisedSortKey.firstElement().Obj().getOwned();
}
return _highWaterMark;
}
@@ -519,8 +522,10 @@ void AsyncResultsMerger::_updateRemoteMetadata(WithLock,
invariant(!response.getPostBatchResumeToken()->isEmpty());
// The most recent minimum sort key should never be smaller than the previous promised
- // minimum sort key for this remote, if one exists.
- auto newMinSortKey = *response.getPostBatchResumeToken();
+ // minimum sort key for this remote, if one exists. Note that the post-batch resume token is
+ // an object (with format {_data: ..., _typeBits: ...}) that we must wrap in a sort key so
+ // that it can compare correctly with sort keys from other streams.
+ auto newMinSortKey = BSON("" << *response.getPostBatchResumeToken());
if (auto& oldMinSortKey = remote.promisedMinSortKey) {
invariant(compareSortKeys(newMinSortKey, *oldMinSortKey, *_params.getSort()) >= 0);
invariant(_promisedMinSortKeys.size() <= _remotes.size());