summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJustin Seyster <justin.seyster@mongodb.com>2019-10-29 16:27:46 +0000
committerevergreen <evergreen@mongodb.com>2019-10-29 16:27:46 +0000
commite14dbefec5fb18a7e9fc8739d3ef529bb1338ab4 (patch)
tree20d3c6259b884a1689b47dd3b6ec17f7fc2b1412 /src
parent29cd14ad598a3594529201fa095cc1fa3dc68e07 (diff)
downloadmongo-e14dbefec5fb18a7e9fc8739d3ef529bb1338ab4.tar.gz
SERVER-42713 Consistent sort key format for change streams pipelines
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/exec/document_value/document.cpp31
-rw-r--r--src/mongo/db/pipeline/aggregation_request.cpp27
-rw-r--r--src/mongo/db/pipeline/aggregation_request.h13
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp9
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp6
-rw-r--r--src/mongo/s/query/async_results_merger.cpp11
-rw-r--r--src/mongo/s/query/async_results_merger_test.cpp56
7 files changed, 78 insertions, 75 deletions
diff --git a/src/mongo/db/exec/document_value/document.cpp b/src/mongo/db/exec/document_value/document.cpp
index 2f6660bd568..c50648c0e20 100644
--- a/src/mongo/db/exec/document_value/document.cpp
+++ b/src/mongo/db/exec/document_value/document.cpp
@@ -383,35 +383,14 @@ void DocumentStorage::loadLazyMetadata() const {
} else if (fieldName == Document::metaFieldSortKey) {
auto bsonSortKey = elem.Obj();
- bool isSingleElementKey = false;
-
+ // If the sort key has exactly one field, we say it is a "single element key."
BSONObjIterator sortKeyIt(bsonSortKey);
uassert(31282, "Empty sort key in metadata", sortKeyIt.more());
- auto firstElementName = sortKeyIt.next().fieldNameStringData();
-
- // If the sort key has exactly one field, we say it is a "single element key."
- boost::optional<StringData> secondElementName;
- if (!sortKeyIt.more()) {
- isSingleElementKey = true;
- } else {
- secondElementName = sortKeyIt.next().fieldNameStringData();
- }
+ bool isSingleElementKey = !(++sortKeyIt).more();
- // If the sort key looks like {_data: ...} or {_data: ..., _typeBits: ...}, we know
- // that it came from a change stream, and we also treat it as a "single element
- // key."
- if (!sortKeyIt.more() && (firstElementName == ResumeToken::kDataFieldName) &&
- (!secondElementName || secondElementName == ResumeToken::kTypeBitsFieldName)) {
- // TODO (SERVER-43361): In 4.2 and earlier, the "sort key" for a change stream
- // document gets serialized differently than sort keys for normal pipeline
- // documents.
- isSingleElementKey = true;
- _metadataFields.setSortKey(Value(bsonSortKey), isSingleElementKey);
- } else {
- _metadataFields.setSortKey(
- DocumentMetadataFields::deserializeSortKey(isSingleElementKey, bsonSortKey),
- isSingleElementKey);
- }
+ _metadataFields.setSortKey(
+ DocumentMetadataFields::deserializeSortKey(isSingleElementKey, bsonSortKey),
+ isSingleElementKey);
} else if (fieldName == Document::metaFieldGeoNearDistance) {
_metadataFields.setGeoNearDistance(elem.Double());
} else if (fieldName == Document::metaFieldGeoNearPoint) {
diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp
index 626867a499e..bea15219efe 100644
--- a/src/mongo/db/pipeline/aggregation_request.cpp
+++ b/src/mongo/db/pipeline/aggregation_request.cpp
@@ -48,20 +48,6 @@
namespace mongo {
-constexpr StringData AggregationRequest::kCommandName;
-constexpr StringData AggregationRequest::kCursorName;
-constexpr StringData AggregationRequest::kBatchSizeName;
-constexpr StringData AggregationRequest::kFromMongosName;
-constexpr StringData AggregationRequest::kNeedsMergeName;
-constexpr StringData AggregationRequest::kPipelineName;
-constexpr StringData AggregationRequest::kCollationName;
-constexpr StringData AggregationRequest::kExplainName;
-constexpr StringData AggregationRequest::kAllowDiskUseName;
-constexpr StringData AggregationRequest::kHintName;
-constexpr StringData AggregationRequest::kExchangeName;
-
-constexpr long long AggregationRequest::kDefaultBatchSize;
-
StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
const std::string& dbName,
const BSONObj& cmdObj,
@@ -215,6 +201,18 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
// 4.4 upgrade purposes, since a 4.2 mongoS will always send {mergeByPBRT:true} to the
// shards. We do nothing with it because mergeByPBRT is the only mode available in 4.4.
// Remove this final vestige of mergeByPBRT during the 4.5 development cycle.
+ } else if (fieldName == kUse44SortKeys) {
+ // TODO (SERVER-43361): After branching for 4.5, we will accept this option but ignore
+ // it, as we will be able to assume that any supported mongoS will be recent enough to
+ // understand the 4.4 sort key format. In the version that follows, we will be able to
+ // completely remove this option.
+ if (elem.type() != BSONType::Bool) {
+ return {ErrorCodes::TypeMismatch,
+ str::stream() << kUse44SortKeys << " must be a boolean, not a "
+ << typeName(elem.type())};
+ }
+
+ request.setUse44SortKeys(elem.boolean());
} else if (!isGenericArgument(fieldName)) {
return {ErrorCodes::FailedToParse,
str::stream() << "unrecognized field '" << elem.fieldName() << "'"};
@@ -313,6 +311,7 @@ Document AggregationRequest::serializeToCommandObj() const {
_writeConcern ? Value(_writeConcern->toBSON()) : Value()},
// Only serialize runtime constants if any were specified.
{kRuntimeConstants, _runtimeConstants ? Value(_runtimeConstants->toBSON()) : Value()},
+ {kUse44SortKeys, _use44SortKeys ? Value(true) : Value()},
};
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h
index 053797eb389..223aa172c3d 100644
--- a/src/mongo/db/pipeline/aggregation_request.h
+++ b/src/mongo/db/pipeline/aggregation_request.h
@@ -63,6 +63,7 @@ public:
static constexpr StringData kHintName = "hint"_sd;
static constexpr StringData kExchangeName = "exchange"_sd;
static constexpr StringData kRuntimeConstants = "runtimeConstants"_sd;
+ static constexpr StringData kUse44SortKeys = "use44SortKeys"_sd;
static constexpr long long kDefaultBatchSize = 101;
@@ -216,6 +217,10 @@ public:
return _runtimeConstants;
}
+ bool getUse44SortKeys() const {
+ return _use44SortKeys;
+ }
+
//
// Setters for optional fields.
//
@@ -280,6 +285,10 @@ public:
_runtimeConstants = std::move(runtimeConstants);
}
+ void setUse44SortKeys(bool use44SortKeys) {
+ _use44SortKeys = use44SortKeys;
+ }
+
private:
// Required fields.
const NamespaceString _nss;
@@ -329,5 +338,9 @@ private:
// A document containing runtime constants; i.e. values that do not change once computed (e.g.
// $$NOW).
boost::optional<RuntimeConstants> _runtimeConstants;
+
+ // All aggregation requests from mongos-4.4 set this flag, indicating that shard results should
+ // use the updated sort key format when returning change stream results.
+ bool _use44SortKeys = false;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 0ce917467f5..3bdd0a61fa2 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -653,10 +653,11 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
invariant(expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData);
plannerOpts |= QueryPlannerParams::TRACK_LATEST_OPLOG_TS;
- // TODO (SERVER-42713): When we change the format of Change Stream sort keys for 4.4, this
- // function will determine whether we use the new format, based on the AggregationRequest
- // parameters. For now, we always use the old 4.2 format.
- expCtx->use42ChangeStreamSortKeys = true;
+ // SERVER-42713: If "use44SortKeys" isn't set, then this aggregation request is from an
+ // earlier version of mongos, and we must fall back to the old way of serializing change
+ // stream sort keys from 4.2 and earlier.
+ invariant(aggRequest);
+ expCtx->use42ChangeStreamSortKeys = !aggRequest->getUse44SortKeys();
}
// If there is a sort stage eligible for pushdown, serialize its SortPattern to a BSONObj. The
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index 08f970cd34b..4e58492af14 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -178,6 +178,12 @@ BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
Value(static_cast<long long>(*opCtx->getTxnNumber()));
}
+ // TODO (SERVER-43361): We set this flag to indicate to the shards that the mongos will be able
+ // to understand change stream sort keys in the new format. After branching for 4.5, there will
+ // only be one sort key format for changes streams, so there will be no need to set this flag
+ // anymore. This flag has no effect on pipelines without a change stream.
+ cmdForShards[AggregationRequest::kUse44SortKeys] = Value(true);
+
return appendAllowImplicitCreate(cmdForShards.freeze().toBson(), false);
}
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index ae6aaf092b4..51bbcf84db4 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -192,7 +192,10 @@ BSONObj AsyncResultsMerger::getHighWaterMark() {
stdx::lock_guard<Latch> lk(_mutex);
auto minPromisedSortKey = _getMinPromisedSortKey(lk);
if (!minPromisedSortKey.isEmpty() && !_ready(lk)) {
- _highWaterMark = minPromisedSortKey;
+ // When 'minPromisedSortKey' contains the "high watermark" resume token, it's stored in
+ // sort-key format: {"": <high watermark>}. We copy the <high watermark> part of the
+ // sort key, which looks like {_data: ..., _typeBits: ...}, and return that.
+ _highWaterMark = minPromisedSortKey.firstElement().Obj().getOwned();
}
return _highWaterMark;
}
@@ -519,8 +522,10 @@ void AsyncResultsMerger::_updateRemoteMetadata(WithLock,
invariant(!response.getPostBatchResumeToken()->isEmpty());
// The most recent minimum sort key should never be smaller than the previous promised
- // minimum sort key for this remote, if one exists.
- auto newMinSortKey = *response.getPostBatchResumeToken();
+ // minimum sort key for this remote, if one exists. Note that the post-batch resume token is
+ // an object (with format {_data: ..., _typeBits: ...}) that we must wrap in a sort key so
+ // that it can compare correctly with sort keys from other streams.
+ auto newMinSortKey = BSON("" << *response.getPostBatchResumeToken());
if (auto& oldMinSortKey = remote.promisedMinSortKey) {
invariant(compareSortKeys(newMinSortKey, *oldMinSortKey, *_params.getSort()) >= 0);
invariant(_promisedMinSortKeys.size() <= _remotes.size());
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index 575f3cc8509..bbaa35734f6 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -1344,10 +1344,10 @@ DEATH_TEST_F(AsyncResultsMergerTest,
// Create one cursor whose initial response has a postBatchResumeToken.
auto pbrtFirstCursor = makePostBatchResumeToken(Timestamp(1, 5));
auto firstDocSortKey = makeResumeToken(Timestamp(1, 4), uuid, BSON("_id" << 1));
- auto firstCursorResponse =
- fromjson(str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: '"
- << uuid.toString() << "', documentKey: {_id: 1}}, $sortKey: {'': '"
- << firstDocSortKey.firstElement().String() << "'}}");
+ auto firstCursorResponse = fromjson(
+ str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: '" << uuid.toString()
+ << "', documentKey: {_id: 1}}, $sortKey: {'': {_data: '"
+ << firstDocSortKey.firstElement().String() << "'}}}");
cursors.push_back(makeRemoteCursor(
kTestShardIds[0],
kTestShardHosts[0],
@@ -1377,10 +1377,10 @@ DEATH_TEST_F(AsyncResultsMergerTest,
std::vector<RemoteCursor> cursors;
BSONObj pbrtFirstCursor;
auto firstDocSortKey = makeResumeToken(Timestamp(1, 4), uuid, BSON("_id" << 1));
- auto firstCursorResponse =
- fromjson(str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: '"
- << uuid.toString() << "', documentKey: {_id: 1}}, $sortKey: {'': '"
- << firstDocSortKey.firstElement().String() << "'}}");
+ auto firstCursorResponse = fromjson(
+ str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: '" << uuid.toString()
+ << "', documentKey: {_id: 1}}, $sortKey: {'': {_data: '"
+ << firstDocSortKey.firstElement().String() << "'}}}");
cursors.push_back(makeRemoteCursor(
kTestShardIds[0],
kTestShardHosts[0],
@@ -1405,10 +1405,10 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfRemoteHasLowerPostB
std::vector<RemoteCursor> cursors;
auto pbrtFirstCursor = makePostBatchResumeToken(Timestamp(1, 5));
auto firstDocSortKey = makeResumeToken(Timestamp(1, 4), uuid, BSON("_id" << 1));
- auto firstCursorResponse =
- fromjson(str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: '"
- << uuid.toString() << "', documentKey: {_id: 1}}, $sortKey: {'': '"
- << firstDocSortKey.firstElement().String() << "'}}");
+ auto firstCursorResponse = fromjson(
+ str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: '" << uuid.toString()
+ << "', documentKey: {_id: 1}}, $sortKey: {'': {data: '"
+ << firstDocSortKey.firstElement().String() << "'}}}");
cursors.push_back(makeRemoteCursor(
kTestShardIds[0],
kTestShardHosts[0],
@@ -1456,10 +1456,10 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting)
std::vector<CursorResponse> responses;
auto firstDocSortKey = makeResumeToken(Timestamp(1, 4), uuid, BSON("_id" << 1));
auto pbrtFirstCursor = makePostBatchResumeToken(Timestamp(1, 6));
- auto firstCursorResponse =
- fromjson(str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: '"
- << uuid.toString() << "', documentKey: {_id: 1}}, $sortKey: {'': '"
- << firstDocSortKey.firstElement().String() << "'}}");
+ auto firstCursorResponse = fromjson(
+ str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: '" << uuid.toString()
+ << "', documentKey: {_id: 1}}, $sortKey: {'': {_data: '"
+ << firstDocSortKey.firstElement().String() << "'}}}");
std::vector<BSONObj> batch1 = {firstCursorResponse};
auto firstDoc = batch1.front();
responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, pbrtFirstCursor);
@@ -1485,10 +1485,10 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting)
responses.clear();
auto secondDocSortKey = makeResumeToken(Timestamp(1, 5), uuid, BSON("_id" << 2));
auto pbrtSecondCursor = makePostBatchResumeToken(Timestamp(1, 6));
- auto secondCursorResponse =
- fromjson(str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 5)}, uuid: '"
- << uuid.toString() << "', documentKey: {_id: 2}}, $sortKey: {'': '"
- << secondDocSortKey.firstElement().String() << "'}}");
+ auto secondCursorResponse = fromjson(
+ str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 5)}, uuid: '" << uuid.toString()
+ << "', documentKey: {_id: 2}}, $sortKey: {'': {_data: '"
+ << secondDocSortKey.firstElement().String() << "'}}}");
std::vector<BSONObj> batch2 = {secondCursorResponse};
auto secondDoc = batch2.front();
responses.emplace_back(kTestNss, CursorId(456), batch2, boost::none, pbrtSecondCursor);
@@ -1534,10 +1534,10 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting
std::vector<CursorResponse> responses;
auto firstDocSortKey = makeResumeToken(Timestamp(1, 4), uuid, BSON("_id" << 1));
auto pbrtFirstCursor = makePostBatchResumeToken(Timestamp(1, 5));
- auto firstCursorResponse =
- fromjson(str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: '"
- << uuid.toString() << "', documentKey: {_id: 1}}, $sortKey: {'': '"
- << firstDocSortKey.firstElement().String() << "'}}");
+ auto firstCursorResponse = fromjson(
+ str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: '" << uuid.toString()
+ << "', documentKey: {_id: 1}}, $sortKey: {'': {_data: '"
+ << firstDocSortKey.firstElement().String() << "'}}}");
std::vector<BSONObj> batch1 = {firstCursorResponse};
responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, pbrtFirstCursor);
scheduleNetworkResponses(std::move(responses));
@@ -1562,10 +1562,10 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting
responses.clear();
auto secondDocSortKey = makeResumeToken(Timestamp(1, 3), uuid, BSON("_id" << 2));
auto pbrtSecondCursor = makePostBatchResumeToken(Timestamp(1, 5));
- auto secondCursorResponse =
- fromjson(str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 3)}, uuid: '"
- << uuid.toString() << "', documentKey: {_id: 2}}, $sortKey: {'': '"
- << secondDocSortKey.firstElement().String() << "'}}");
+ auto secondCursorResponse = fromjson(
+ str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 3)}, uuid: '" << uuid.toString()
+ << "', documentKey: {_id: 2}}, $sortKey: {'': {_data: '"
+ << secondDocSortKey.firstElement().String() << "'}}}");
std::vector<BSONObj> batch2 = {secondCursorResponse};
// The last observed time should still be later than the first shard, so we can get the data
// from it.