diff options
author | Justin Seyster <justin.seyster@mongodb.com> | 2019-10-29 16:27:46 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-10-29 16:27:46 +0000 |
commit | e14dbefec5fb18a7e9fc8739d3ef529bb1338ab4 (patch) | |
tree | 20d3c6259b884a1689b47dd3b6ec17f7fc2b1412 /src | |
parent | 29cd14ad598a3594529201fa095cc1fa3dc68e07 (diff) | |
download | mongo-e14dbefec5fb18a7e9fc8739d3ef529bb1338ab4.tar.gz |
SERVER-42713 Consistent sort key format for change streams pipelines
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/exec/document_value/document.cpp | 31 | ||||
-rw-r--r-- | src/mongo/db/pipeline/aggregation_request.cpp | 27 | ||||
-rw-r--r-- | src/mongo/db/pipeline/aggregation_request.h | 13 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/pipeline/sharded_agg_helpers.cpp | 6 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger.cpp | 11 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger_test.cpp | 56 |
7 files changed, 78 insertions, 75 deletions
diff --git a/src/mongo/db/exec/document_value/document.cpp b/src/mongo/db/exec/document_value/document.cpp index 2f6660bd568..c50648c0e20 100644 --- a/src/mongo/db/exec/document_value/document.cpp +++ b/src/mongo/db/exec/document_value/document.cpp @@ -383,35 +383,14 @@ void DocumentStorage::loadLazyMetadata() const { } else if (fieldName == Document::metaFieldSortKey) { auto bsonSortKey = elem.Obj(); - bool isSingleElementKey = false; - + // If the sort key has exactly one field, we say it is a "single element key." BSONObjIterator sortKeyIt(bsonSortKey); uassert(31282, "Empty sort key in metadata", sortKeyIt.more()); - auto firstElementName = sortKeyIt.next().fieldNameStringData(); - - // If the sort key has exactly one field, we say it is a "single element key." - boost::optional<StringData> secondElementName; - if (!sortKeyIt.more()) { - isSingleElementKey = true; - } else { - secondElementName = sortKeyIt.next().fieldNameStringData(); - } + bool isSingleElementKey = !(++sortKeyIt).more(); - // If the sort key looks like {_data: ...} or {_data: ..., _typeBits: ...}, we know - // that it came from a change stream, and we also treat it as a "single element - // key." - if (!sortKeyIt.more() && (firstElementName == ResumeToken::kDataFieldName) && - (!secondElementName || secondElementName == ResumeToken::kTypeBitsFieldName)) { - // TODO (SERVER-43361): In 4.2 and earlier, the "sort key" for a change stream - // document gets serialized differently than sort keys for normal pipeline - // documents. 
- isSingleElementKey = true; - _metadataFields.setSortKey(Value(bsonSortKey), isSingleElementKey); - } else { - _metadataFields.setSortKey( - DocumentMetadataFields::deserializeSortKey(isSingleElementKey, bsonSortKey), - isSingleElementKey); - } + _metadataFields.setSortKey( + DocumentMetadataFields::deserializeSortKey(isSingleElementKey, bsonSortKey), + isSingleElementKey); } else if (fieldName == Document::metaFieldGeoNearDistance) { _metadataFields.setGeoNearDistance(elem.Double()); } else if (fieldName == Document::metaFieldGeoNearPoint) { diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp index 626867a499e..bea15219efe 100644 --- a/src/mongo/db/pipeline/aggregation_request.cpp +++ b/src/mongo/db/pipeline/aggregation_request.cpp @@ -48,20 +48,6 @@ namespace mongo { -constexpr StringData AggregationRequest::kCommandName; -constexpr StringData AggregationRequest::kCursorName; -constexpr StringData AggregationRequest::kBatchSizeName; -constexpr StringData AggregationRequest::kFromMongosName; -constexpr StringData AggregationRequest::kNeedsMergeName; -constexpr StringData AggregationRequest::kPipelineName; -constexpr StringData AggregationRequest::kCollationName; -constexpr StringData AggregationRequest::kExplainName; -constexpr StringData AggregationRequest::kAllowDiskUseName; -constexpr StringData AggregationRequest::kHintName; -constexpr StringData AggregationRequest::kExchangeName; - -constexpr long long AggregationRequest::kDefaultBatchSize; - StatusWith<AggregationRequest> AggregationRequest::parseFromBSON( const std::string& dbName, const BSONObj& cmdObj, @@ -215,6 +201,18 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON( // 4.4 upgrade purposes, since a 4.2 mongoS will always send {mergeByPBRT:true} to the // shards. We do nothing with it because mergeByPBRT is the only mode available in 4.4. // Remove this final vestige of mergeByPBRT during the 4.5 development cycle. 
+ } else if (fieldName == kUse44SortKeys) { + // TODO (SERVER-43361): After branching for 4.5, we will accept this option but ignore + // it, as we will be able to assume that any supported mongoS will be recent enough to + // understand the 4.4 sort key format. In the version that follows, we will be able to + // completely remove this option. + if (elem.type() != BSONType::Bool) { + return {ErrorCodes::TypeMismatch, + str::stream() << kUse44SortKeys << " must be a boolean, not a " + << typeName(elem.type())}; + } + + request.setUse44SortKeys(elem.boolean()); } else if (!isGenericArgument(fieldName)) { return {ErrorCodes::FailedToParse, str::stream() << "unrecognized field '" << elem.fieldName() << "'"}; @@ -313,6 +311,7 @@ Document AggregationRequest::serializeToCommandObj() const { _writeConcern ? Value(_writeConcern->toBSON()) : Value()}, // Only serialize runtime constants if any were specified. {kRuntimeConstants, _runtimeConstants ? Value(_runtimeConstants->toBSON()) : Value()}, + {kUse44SortKeys, _use44SortKeys ? Value(true) : Value()}, }; } } // namespace mongo diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h index 053797eb389..223aa172c3d 100644 --- a/src/mongo/db/pipeline/aggregation_request.h +++ b/src/mongo/db/pipeline/aggregation_request.h @@ -63,6 +63,7 @@ public: static constexpr StringData kHintName = "hint"_sd; static constexpr StringData kExchangeName = "exchange"_sd; static constexpr StringData kRuntimeConstants = "runtimeConstants"_sd; + static constexpr StringData kUse44SortKeys = "use44SortKeys"_sd; static constexpr long long kDefaultBatchSize = 101; @@ -216,6 +217,10 @@ public: return _runtimeConstants; } + bool getUse44SortKeys() const { + return _use44SortKeys; + } + // // Setters for optional fields. 
// @@ -280,6 +285,10 @@ public: _runtimeConstants = std::move(runtimeConstants); } + void setUse44SortKeys(bool use44SortKeys) { + _use44SortKeys = use44SortKeys; + } + private: // Required fields. const NamespaceString _nss; @@ -329,5 +338,9 @@ private: // A document containing runtime constants; i.e. values that do not change once computed (e.g. // $$NOW). boost::optional<RuntimeConstants> _runtimeConstants; + + // All aggregation requests from mongos-4.4 set this flag, indicating that shard results should + // use the updated sort key format when returning change stream results. + bool _use44SortKeys = false; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 0ce917467f5..3bdd0a61fa2 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -653,10 +653,11 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep invariant(expCtx->tailableMode == TailableModeEnum::kTailableAndAwaitData); plannerOpts |= QueryPlannerParams::TRACK_LATEST_OPLOG_TS; - // TODO (SERVER-42713): When we change the format of Change Stream sort keys for 4.4, this - // function will determine whether we use the new format, based on the AggregationRequest - // parameters. For now, we always use the old 4.2 format. - expCtx->use42ChangeStreamSortKeys = true; + // SERVER-42713: If "use44SortKeys" isn't set, then this aggregation request is from an + // earlier version of mongos, and we must fall back to the old way of serializing change + // stream sort keys from 4.2 and earlier. + invariant(aggRequest); + expCtx->use42ChangeStreamSortKeys = !aggRequest->getUse44SortKeys(); } // If there is a sort stage eligible for pushdown, serialize its SortPattern to a BSONObj. 
The diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index 08f970cd34b..4e58492af14 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -178,6 +178,12 @@ BSONObj genericTransformForShards(MutableDocument&& cmdForShards, Value(static_cast<long long>(*opCtx->getTxnNumber())); } + // TODO (SERVER-43361): We set this flag to indicate to the shards that the mongos will be able + // to understand change stream sort keys in the new format. After branching for 4.5, there will + // only be one sort key format for change streams, so there will be no need to set this flag + // anymore. This flag has no effect on pipelines without a change stream. + cmdForShards[AggregationRequest::kUse44SortKeys] = Value(true); + return appendAllowImplicitCreate(cmdForShards.freeze().toBson(), false); } diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index ae6aaf092b4..51bbcf84db4 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -192,7 +192,10 @@ BSONObj AsyncResultsMerger::getHighWaterMark() { stdx::lock_guard<Latch> lk(_mutex); auto minPromisedSortKey = _getMinPromisedSortKey(lk); if (!minPromisedSortKey.isEmpty() && !_ready(lk)) { - _highWaterMark = minPromisedSortKey; + // When 'minPromisedSortKey' contains the "high watermark" resume token, it's stored in + // sort-key format: {"": <high watermark>}. We copy the <high watermark> part of the + // sort key, which looks like {_data: ..., _typeBits: ...}, and return that. 
+ _highWaterMark = minPromisedSortKey.firstElement().Obj().getOwned(); } return _highWaterMark; } @@ -519,8 +522,10 @@ void AsyncResultsMerger::_updateRemoteMetadata(WithLock, invariant(!response.getPostBatchResumeToken()->isEmpty()); // The most recent minimum sort key should never be smaller than the previous promised - // minimum sort key for this remote, if one exists. - auto newMinSortKey = *response.getPostBatchResumeToken(); + // minimum sort key for this remote, if one exists. Note that the post-batch resume token is + // an object (with format {_data: ..., _typeBits: ...}) that we must wrap in a sort key so + // that it can compare correctly with sort keys from other streams. + auto newMinSortKey = BSON("" << *response.getPostBatchResumeToken()); if (auto& oldMinSortKey = remote.promisedMinSortKey) { invariant(compareSortKeys(newMinSortKey, *oldMinSortKey, *_params.getSort()) >= 0); invariant(_promisedMinSortKeys.size() <= _remotes.size()); diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index 575f3cc8509..bbaa35734f6 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -1344,10 +1344,10 @@ DEATH_TEST_F(AsyncResultsMergerTest, // Create one cursor whose initial response has a postBatchResumeToken. 
auto pbrtFirstCursor = makePostBatchResumeToken(Timestamp(1, 5)); auto firstDocSortKey = makeResumeToken(Timestamp(1, 4), uuid, BSON("_id" << 1)); - auto firstCursorResponse = - fromjson(str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: '" - << uuid.toString() << "', documentKey: {_id: 1}}, $sortKey: {'': '" - << firstDocSortKey.firstElement().String() << "'}}"); + auto firstCursorResponse = fromjson( + str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: '" << uuid.toString() + << "', documentKey: {_id: 1}}, $sortKey: {'': {_data: '" + << firstDocSortKey.firstElement().String() << "'}}}"); cursors.push_back(makeRemoteCursor( kTestShardIds[0], kTestShardHosts[0], @@ -1377,10 +1377,10 @@ DEATH_TEST_F(AsyncResultsMergerTest, std::vector<RemoteCursor> cursors; BSONObj pbrtFirstCursor; auto firstDocSortKey = makeResumeToken(Timestamp(1, 4), uuid, BSON("_id" << 1)); - auto firstCursorResponse = - fromjson(str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: '" - << uuid.toString() << "', documentKey: {_id: 1}}, $sortKey: {'': '" - << firstDocSortKey.firstElement().String() << "'}}"); + auto firstCursorResponse = fromjson( + str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: '" << uuid.toString() + << "', documentKey: {_id: 1}}, $sortKey: {'': {_data: '" + << firstDocSortKey.firstElement().String() << "'}}}"); cursors.push_back(makeRemoteCursor( kTestShardIds[0], kTestShardHosts[0], @@ -1405,10 +1405,10 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfRemoteHasLowerPostB std::vector<RemoteCursor> cursors; auto pbrtFirstCursor = makePostBatchResumeToken(Timestamp(1, 5)); auto firstDocSortKey = makeResumeToken(Timestamp(1, 4), uuid, BSON("_id" << 1)); - auto firstCursorResponse = - fromjson(str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: '" - << uuid.toString() << "', documentKey: {_id: 1}}, $sortKey: {'': '" - << firstDocSortKey.firstElement().String() << "'}}"); + auto 
firstCursorResponse = fromjson( + str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: '" << uuid.toString() + << "', documentKey: {_id: 1}}, $sortKey: {'': {data: '" + << firstDocSortKey.firstElement().String() << "'}}}"); cursors.push_back(makeRemoteCursor( kTestShardIds[0], kTestShardHosts[0], @@ -1456,10 +1456,10 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting) std::vector<CursorResponse> responses; auto firstDocSortKey = makeResumeToken(Timestamp(1, 4), uuid, BSON("_id" << 1)); auto pbrtFirstCursor = makePostBatchResumeToken(Timestamp(1, 6)); - auto firstCursorResponse = - fromjson(str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: '" - << uuid.toString() << "', documentKey: {_id: 1}}, $sortKey: {'': '" - << firstDocSortKey.firstElement().String() << "'}}"); + auto firstCursorResponse = fromjson( + str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: '" << uuid.toString() + << "', documentKey: {_id: 1}}, $sortKey: {'': {_data: '" + << firstDocSortKey.firstElement().String() << "'}}}"); std::vector<BSONObj> batch1 = {firstCursorResponse}; auto firstDoc = batch1.front(); responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, pbrtFirstCursor); @@ -1485,10 +1485,10 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting) responses.clear(); auto secondDocSortKey = makeResumeToken(Timestamp(1, 5), uuid, BSON("_id" << 2)); auto pbrtSecondCursor = makePostBatchResumeToken(Timestamp(1, 6)); - auto secondCursorResponse = - fromjson(str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 5)}, uuid: '" - << uuid.toString() << "', documentKey: {_id: 2}}, $sortKey: {'': '" - << secondDocSortKey.firstElement().String() << "'}}"); + auto secondCursorResponse = fromjson( + str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 5)}, uuid: '" << uuid.toString() + << "', documentKey: {_id: 2}}, $sortKey: {'': {_data: '" + << 
secondDocSortKey.firstElement().String() << "'}}}"); std::vector<BSONObj> batch2 = {secondCursorResponse}; auto secondDoc = batch2.front(); responses.emplace_back(kTestNss, CursorId(456), batch2, boost::none, pbrtSecondCursor); @@ -1534,10 +1534,10 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting std::vector<CursorResponse> responses; auto firstDocSortKey = makeResumeToken(Timestamp(1, 4), uuid, BSON("_id" << 1)); auto pbrtFirstCursor = makePostBatchResumeToken(Timestamp(1, 5)); - auto firstCursorResponse = - fromjson(str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: '" - << uuid.toString() << "', documentKey: {_id: 1}}, $sortKey: {'': '" - << firstDocSortKey.firstElement().String() << "'}}"); + auto firstCursorResponse = fromjson( + str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 4)}, uuid: '" << uuid.toString() + << "', documentKey: {_id: 1}}, $sortKey: {'': {_data: '" + << firstDocSortKey.firstElement().String() << "'}}}"); std::vector<BSONObj> batch1 = {firstCursorResponse}; responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, pbrtFirstCursor); scheduleNetworkResponses(std::move(responses)); @@ -1562,10 +1562,10 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting responses.clear(); auto secondDocSortKey = makeResumeToken(Timestamp(1, 3), uuid, BSON("_id" << 2)); auto pbrtSecondCursor = makePostBatchResumeToken(Timestamp(1, 5)); - auto secondCursorResponse = - fromjson(str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 3)}, uuid: '" - << uuid.toString() << "', documentKey: {_id: 2}}, $sortKey: {'': '" - << secondDocSortKey.firstElement().String() << "'}}"); + auto secondCursorResponse = fromjson( + str::stream() << "{_id: {clusterTime: {ts: Timestamp(1, 3)}, uuid: '" << uuid.toString() + << "', documentKey: {_id: 2}}, $sortKey: {'': {_data: '" + << secondDocSortKey.firstElement().String() << "'}}}"); std::vector<BSONObj> batch2 = 
{secondCursorResponse}; // The last observed time should still be later than the first shard, so we can get the data // from it. |