diff options
author | Arun Banala <arun.banala@mongodb.com> | 2022-04-29 12:56:44 +0100 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-05-23 07:52:57 +0000 |
commit | 9d559b3ad8942ade301b41284dee9e6b8c8f098c (patch) | |
tree | 2942842824443d062be972c637219e02136ac93c /src/mongo/db/pipeline | |
parent | fb4724113317c2279e340047b3a1cd8d6c72fb36 (diff) | |
download | mongo-9d559b3ad8942ade301b41284dee9e6b8c8f098c.tar.gz |
SERVER-65909 Make oplog and change stream formats of {op: "n"} events consistent
Diffstat (limited to 'src/mongo/db/pipeline')
7 files changed, 227 insertions, 77 deletions
diff --git a/src/mongo/db/pipeline/change_stream_event_transform.cpp b/src/mongo/db/pipeline/change_stream_event_transform.cpp index 64874656a89..63a8aeebac8 100644 --- a/src/mongo/db/pipeline/change_stream_event_transform.cpp +++ b/src/mongo/db/pipeline/change_stream_event_transform.cpp @@ -31,6 +31,7 @@ #include "mongo/db/pipeline/change_stream_document_diff_parser.h" #include "mongo/db/pipeline/change_stream_filter_helpers.h" +#include "mongo/db/pipeline/change_stream_helpers_legacy.h" #include "mongo/db/pipeline/change_stream_preimage_gen.h" #include "mongo/db/pipeline/document_path_support.h" #include "mongo/db/pipeline/document_source_change_stream_add_post_image.h" @@ -335,17 +336,21 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum break; } case repl::OpTypeEnum::kNoop: { + // TODO SERVER-66138: The legacy oplog format for some no-op operations can include + // 'type' field, which was removed post 6.0. We can remove all the logic related to the + // 'type' field once 7.0 is release. + const auto o2Field = change_stream_legacy::convertFromLegacyOplogFormat( + input[repl::OplogEntry::kObject2FieldName].getDocument(), nss); + // Check whether this is a shardCollection oplog entry. - if (!input.getNestedField("o2.shardCollection").missing()) { - const auto o2Field = input[repl::OplogEntry::kObject2FieldName].getDocument(); + if (!o2Field["shardCollection"].missing()) { operationType = DocumentSourceChangeStream::kShardCollectionOpType; operationDescription = Value(copyDocExceptFields(o2Field, {"shardCollection"_sd})); break; } // Check if this is a migration of the last chunk off a shard. - if (!input.getNestedField("o2.migrateLastChunkFromShard").missing()) { - const auto o2Field = input[repl::OplogEntry::kObject2FieldName].getDocument(); + if (!o2Field["migrateLastChunkFromShard"].missing()) { operationType = DocumentSourceChangeStream::kMigrateLastChunkFromShardOpType; operationDescription = Value(copyDocExceptFields(o2Field, {"migrateLastChunkFromShard"_sd})); @@ -353,8 +358,7 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum } // Check whether this is a refineCollectionShardKey oplog entry. - if (!input.getNestedField("o2.refineCollectionShardKey").missing()) { - const auto o2Field = input[repl::OplogEntry::kObject2FieldName].getDocument(); + if (!o2Field["refineCollectionShardKey"].missing()) { operationType = DocumentSourceChangeStream::kRefineCollectionShardKeyOpType; operationDescription = Value(copyDocExceptFields(o2Field, {"refineCollectionShardKey"_sd})); @@ -362,41 +366,39 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum } // Check whether this is a reshardCollection oplog entry. - if (!input.getNestedField("o2.reshardCollection").missing()) { - const auto o2Field = input[repl::OplogEntry::kObject2FieldName].getDocument(); + if (!o2Field["reshardCollection"].missing()) { operationType = DocumentSourceChangeStream::kReshardCollectionOpType; operationDescription = Value(copyDocExceptFields(o2Field, {"reshardCollection"_sd})); break; } - // Otherwise, o2.type determines the message type. - auto o2Type = input.getNestedField("o2.type"); - tassert(5052200, "o2.type is missing from noop oplog event", !o2Type.missing()); - - if (o2Type.getString() == "migrateChunkToNewShard"_sd) { - operationType = DocumentSourceChangeStream::kNewShardDetectedOpType; - // Generate a fake document Id for NewShardDetected operation so that we can - // resume after this operation. - documentKey = Value(Document{{DocumentSourceChangeStream::kIdField, - input[repl::OplogEntry::kObject2FieldName]}}); - } else if (o2Type.getString() == "reshardBegin"_sd) { + if (!o2Field["migrateChunkToNewShard"].missing()) { + operationType = change_stream_legacy::getNewShardDetectedOpName(_expCtx); + operationDescription = + Value(copyDocExceptFields(o2Field, {"migrateChunkToNewShard"_sd})); + break; + } + + if (!o2Field["reshardBegin"].missing()) { operationType = DocumentSourceChangeStream::kReshardBeginOpType; doc.addField(DocumentSourceChangeStream::kReshardingUuidField, - input.getNestedField("o2.reshardingUUID")); - documentKey = Value(Document{{DocumentSourceChangeStream::kIdField, - input[repl::OplogEntry::kObject2FieldName]}}); - } else if (o2Type.getString() == "reshardDoneCatchUp"_sd) { + o2Field["reshardingUUID"]); + operationDescription = Value(copyDocExceptFields(o2Field, {"reshardBegin"_sd})); + break; + } + + if (!o2Field["reshardDoneCatchUp"].missing()) { operationType = DocumentSourceChangeStream::kReshardDoneCatchUpOpType; doc.addField(DocumentSourceChangeStream::kReshardingUuidField, - input.getNestedField("o2.reshardingUUID")); - documentKey = Value(Document{{DocumentSourceChangeStream::kIdField, - input[repl::OplogEntry::kObject2FieldName]}}); - } else { - // We should never see an unknown noop entry. - MONGO_UNREACHABLE_TASSERT(5052201); + o2Field["reshardingUUID"]); + operationDescription = + Value(copyDocExceptFields(o2Field, {"reshardDoneCatchUp"_sd})); + break; } - break; + + // We should never see an unknown noop entry. + MONGO_UNREACHABLE_TASSERT(5052201); } default: { MONGO_UNREACHABLE_TASSERT(6330501); } } @@ -443,14 +445,6 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum checkValueType(wallTime, repl::OplogEntry::kWallClockTimeFieldName, BSONType::Date); doc.addField(DocumentSourceChangeStream::kWallTimeField, wallTime); - // Invalidation, topology change, and resharding events have fewer fields. - if (operationType == DocumentSourceChangeStream::kInvalidateOpType || - operationType == DocumentSourceChangeStream::kNewShardDetectedOpType || - operationType == DocumentSourceChangeStream::kReshardBeginOpType || - operationType == DocumentSourceChangeStream::kReshardDoneCatchUpOpType) { - return doc.freeze(); - } - // Add the post-image, pre-image id, namespace, documentKey and other fields as appropriate. doc.addField(DocumentSourceChangeStream::kFullDocumentField, std::move(fullDocument)); diff --git a/src/mongo/db/pipeline/change_stream_filter_helpers.cpp b/src/mongo/db/pipeline/change_stream_filter_helpers.cpp index bf8e0e1a4c6..5f552921008 100644 --- a/src/mongo/db/pipeline/change_stream_filter_helpers.cpp +++ b/src/mongo/db/pipeline/change_stream_filter_helpers.cpp @@ -294,7 +294,9 @@ std::unique_ptr<MatchExpression> buildInternalOpFilter( // Noop change events: // - reshardBegin: A resharding operation begins. // - reshardDoneCatchUp: "Catch up" phase of reshard operation completes. - std::vector<StringData> internalOpTypes = {"reshardBegin"_sd, "reshardDoneCatchUp"_sd}; + // - shardCollection: A shardCollection operation has completed. + std::vector<StringData> internalOpTypes = { + "reshardBegin"_sd, "reshardDoneCatchUp"_sd, "shardCollection"_sd}; // Noop change events that are only applicable when merging results on mongoS: // - migrateChunkToNewShard: A chunk migrated to a shard that didn't have any chunks. @@ -302,29 +304,26 @@ std::unique_ptr<MatchExpression> buildInternalOpFilter( internalOpTypes.push_back("migrateChunkToNewShard"_sd); } - // Build the oplog filter to match the required internal op types. - BSONArrayBuilder internalOpTypeOrBuilder; - for (const auto& eventName : internalOpTypes) { - internalOpTypeOrBuilder.append(BSON("o2.type" << eventName)); - } - - // Also filter for shardCollection events, which are recorded as {op: 'n'} in the oplog. - internalOpTypeOrBuilder.append(BSON("o2.shardCollection" << BSON("$exists" << true))); - // Only return the 'migrateLastChunkFromShard' event if 'showSystemEvents' is set. if (expCtx->changeStreamSpec->getShowSystemEvents()) { - internalOpTypeOrBuilder.append( - BSON("o2.migrateLastChunkFromShard" << BSON("$exists" << true))); + internalOpTypes.push_back("migrateLastChunkFromShard"_sd); } if (feature_flags::gFeatureFlagChangeStreamsFurtherEnrichedEvents.isEnabled( serverGlobalParams.featureCompatibility)) { - internalOpTypeOrBuilder.append( - BSON("o2.refineCollectionShardKey" << BSON("$exists" << true))); + internalOpTypes.push_back("refineCollectionShardKey"_sd); + internalOpTypes.push_back("reshardCollection"_sd); + } - internalOpTypeOrBuilder.append(BSON("o2.reshardCollection" << BSON("$exists" << true))); + // Build the oplog filter to match the required internal op types. + BSONArrayBuilder internalOpTypeOrBuilder; + for (const auto& eventName : internalOpTypes) { + internalOpTypeOrBuilder.append(BSON("o2." + eventName << BSON("$exists" << true))); } + // TODO SERVER-66138: This filter can be removed after 7.0 release. + change_stream_legacy::populateInternalOperationFilter(expCtx, &internalOpTypeOrBuilder); + // Finalize the array of $or filter predicates. internalOpTypeOrBuilder.done(); diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp index bf935d51da4..92998d18955 100644 --- a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp +++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp @@ -132,5 +132,47 @@ boost::optional<Document> legacyLookupPreImage(boost::intrusive_ptr<ExpressionCo return Document{opLogEntry.getObject().getOwned()}; } +// TODO SERVER-66138: This function can be removed after we branch for 7.0. +void populateInternalOperationFilter(const boost::intrusive_ptr<ExpressionContext>& expCtx, + BSONArrayBuilder* orBuilder) { + std::vector<StringData> opTypes = {"reshardBegin"_sd, "reshardDoneCatchUp"_sd}; + + // Noop change events that are only applicable when merging results on mongoS: + // - migrateChunkToNewShard: A chunk migrated to a shard that didn't have any chunks. + if (expCtx->inMongos || expCtx->needsMerge) { + opTypes.push_back("migrateChunkToNewShard"_sd); + } + + for (const auto& eventName : opTypes) { + // Legacy oplog messages used the "o2.type" field to indicate the message type. + orBuilder->append(BSON("o2.type" << eventName)); + } +} + +// TODO SERVER-66138: This function can be removed after we branch for 7.0. +Document convertFromLegacyOplogFormat(const Document& o2Entry, const NamespaceString& nss) { + auto type = o2Entry["type"]; + if (type.missing()) { + return o2Entry; + } + + MutableDocument doc(o2Entry); + doc.remove("type"); + + // This field would be the first field in the new format, but the current change stream code + // does not depend on the field order. + doc.addField(type.getString(), Value(nss.toString())); + return doc.freeze(); +} + +// TODO SERVER-66138: This function can be removed after we branch for 7.0. +StringData getNewShardDetectedOpName(const boost::intrusive_ptr<ExpressionContext>& expCtx) { + // The op name on 6.0 and older versions. + const StringData kNewShardDetectedOpTypeLegacyName = "kNewShardDetected"_sd; + return (expCtx->changeStreamTokenVersion == ResumeTokenData::kDefaultTokenVersion) + ? DocumentSourceChangeStream::kNewShardDetectedOpType + : kNewShardDetectedOpTypeLegacyName; +} + } // namespace change_stream_legacy } // namespace mongo diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.h b/src/mongo/db/pipeline/change_stream_helpers_legacy.h index fff3b770134..e2e17843b49 100644 --- a/src/mongo/db/pipeline/change_stream_helpers_legacy.h +++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.h @@ -66,4 +66,19 @@ static const std::set<StringData> kClassicOperationTypes = DocumentSourceChangeStream::kReshardDoneCatchUpOpType, DocumentSourceChangeStream::kNewShardDetectedOpType}; +/** + * Adds filtering for legacy-format {op: 'n'} oplog messages, which used the "o2.type" field to + * indicate the message type. + */ +void populateInternalOperationFilter(const boost::intrusive_ptr<ExpressionContext>& expCtx, + BSONArrayBuilder* filter); + +/** + * Converts legacy-format oplog o2 fields of type {type: <op name>, ...} to + * {..., <op name>: <namespace>}. Does nothing if the 'type' field is not present inside 'o2'. + */ +Document convertFromLegacyOplogFormat(const Document& legacyO2Entry, const NamespaceString& nss); + +StringData getNewShardDetectedOpName(const boost::intrusive_ptr<ExpressionContext>& expCtx); + } // namespace mongo::change_stream_legacy diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index ab7489222c6..1ecdde1d5b6 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -230,8 +230,9 @@ public: // The internal change events that are not exposed to the users. static constexpr StringData kReshardBeginOpType = "reshardBegin"_sd; static constexpr StringData kReshardDoneCatchUpOpType = "reshardDoneCatchUp"_sd; + // Internal op type to signal mongos to open cursors on new shards. - static constexpr StringData kNewShardDetectedOpType = "kNewShardDetected"_sd; + static constexpr StringData kNewShardDetectedOpType = "migrateChunkToNewShard"_sd; // These events are guarded behind the 'showExpandedEvents' flag. static constexpr StringData kCreateOpType = "create"_sd; diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.cpp index 26cdcb77181..73212fedac4 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.cpp @@ -31,6 +31,7 @@ #include "mongo/db/pipeline/document_source_change_stream_check_topology_change.h" +#include "mongo/db/pipeline/change_stream_helpers_legacy.h" #include "mongo/db/pipeline/change_stream_topology_change_info.h" namespace mongo { @@ -78,7 +79,7 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamCheckTopologyChange::doG // Throw the 'ChangeStreamTopologyChangeInfo' exception, wrapping the topology change event // along with its metadata. This will bypass the remainder of the pipeline and will be passed // directly up to mongoS. - if (eventOpType == DocumentSourceChangeStream::kNewShardDetectedOpType) { + if (eventOpType == change_stream_legacy::getNewShardDetectedOpName(pExpCtx)) { uasserted(ChangeStreamTopologyChangeInfo(eventDoc.toBsonWithMetaData()), "Collection migrated to new shard"); } diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index 8a48df33cbc..fcd7c965b37 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -1306,8 +1306,7 @@ TEST_F(ChangeStreamStageTest, MatchFiltersDropDatabaseCommand) { OplogEntry dropDB = createCommand(BSON("dropDatabase" << 1), boost::none, false); checkTransformation(dropDB, boost::none); } - -TEST_F(ChangeStreamStageTest, TransformNewShardDetected) { +TEST_F(ChangeStreamStageTest, TransformNewShardDetectedLegacyFormat) { auto o2Field = D{{"type", "migrateChunkToNewShard"_sd}}; auto newShardDetected = makeOplogEntry(OpTypeEnum::kNoop, nss, @@ -1318,14 +1317,12 @@ TEST_F(ChangeStreamStageTest, TransformNewShardDetected) { Document expectedNewShardDetected{ {DSChangeStream::kIdField, - makeResumeTokenWithEventId( - kDefaultTs, - testUuid(), - Document{{"operationType", DSChangeStream::kNewShardDetectedOpType}, - {"documentKey", BSON("_id" << o2Field)}})}, + makeResumeToken( + kDefaultTs, testUuid(), V{D{{}}}, DSChangeStream::kNewShardDetectedOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kNewShardDetectedOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, }; getExpCtx()->needsMerge = true; @@ -1333,11 +1330,70 @@ TEST_F(ChangeStreamStageTest, TransformNewShardDetected) { checkTransformation(newShardDetected, expectedNewShardDetected); } +TEST_F(ChangeStreamStageTest, TransformNewShardDetected) { + auto o2Field = D{{"migrateChunkToNewShard", nss.toString()}, + {"fromShardId", "fromShard"_sd}, + {"toShardId", "toShard"_sd}}; + auto newShardDetected = makeOplogEntry(OpTypeEnum::kNoop, + nss, + BSONObj(), + testUuid(), + boost::none, // fromMigrate + o2Field.toBson()); + + const auto opDesc = Value(D{{"fromShardId", "fromShard"_sd}, {"toShardId", "toShard"_sd}}); + Document expectedNewShardDetected{ + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), opDesc, DSChangeStream::kNewShardDetectedOpType)}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kNewShardDetectedOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kCollectionUuidField, testUuid()}, + {DSChangeStream::kWallTimeField, Date_t()}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeStream::kOperationDescriptionField, opDesc}, + }; + + getExpCtx()->needsMerge = true; + + checkTransformation(newShardDetected, expectedNewShardDetected, kShowExpandedEventsSpec); +} + +TEST_F(ChangeStreamStageTest, TransformReshardBeginLegacyFormat) { + auto uuid = UUID::gen(); + auto reshardingUuid = UUID::gen(); + + const auto o2FieldInLegacyFormat = BSON("type" + << "reshardBegin" + << "reshardingUUID" << reshardingUuid); + auto reshardingBegin = makeOplogEntry(OpTypeEnum::kNoop, + nss, + BSONObj(), + uuid, + true, // fromMigrate + o2FieldInLegacyFormat); + + auto spec = fromjson("{$changeStream: {showMigrationEvents: true, showExpandedEvents: true}}"); + + const auto opDesc = V{D{{"reshardingUUID", reshardingUuid}}}; + Document expectedReshardingBegin{ + {DSChangeStream::kReshardingUuidField, reshardingUuid}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, uuid, opDesc, DSChangeStream::kReshardBeginOpType)}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kReshardBeginOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kCollectionUuidField, uuid}, + {DSChangeStream::kWallTimeField, Date_t()}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeStream::kOperationDescriptionField, opDesc}, + }; + checkTransformation(reshardingBegin, expectedReshardingBegin, spec); +} + TEST_F(ChangeStreamStageTest, TransformReshardBegin) { auto uuid = UUID::gen(); auto reshardingUuid = UUID::gen(); - ReshardingChangeEventO2Field o2Field{reshardingUuid, ReshardingChangeEventEnum::kReshardBegin}; + ReshardBeginChangeEventO2Field o2Field{nss, reshardingUuid}; auto reshardingBegin = makeOplogEntry(OpTypeEnum::kNoop, nss, BSONObj(), @@ -1345,29 +1401,68 @@ TEST_F(ChangeStreamStageTest, TransformReshardBegin) { true, // fromMigrate o2Field.toBSON()); - auto spec = fromjson("{$changeStream: {showMigrationEvents: true}}"); + auto spec = fromjson("{$changeStream: {showMigrationEvents: true, showExpandedEvents: true}}"); + const auto opDesc = V{D{{"reshardingUUID", reshardingUuid}}}; Document expectedReshardingBegin{ {DSChangeStream::kReshardingUuidField, reshardingUuid}, {DSChangeStream::kIdField, - makeResumeTokenWithEventId(kDefaultTs, - uuid, - Document{{"operationType", DSChangeStream::kReshardBeginOpType}, - {"documentKey", BSON("_id" << o2Field.toBSON())}})}, + makeResumeToken(kDefaultTs, uuid, opDesc, DSChangeStream::kReshardBeginOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kReshardBeginOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kCollectionUuidField, uuid}, {DSChangeStream::kWallTimeField, Date_t()}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeStream::kOperationDescriptionField, opDesc}, }; checkTransformation(reshardingBegin, expectedReshardingBegin, spec); } +TEST_F(ChangeStreamStageTest, TransformReshardDoneCatchUpLegacyFormat) { + auto existingUuid = UUID::gen(); + auto reshardingUuid = UUID::gen(); + auto temporaryNs = constructTemporaryReshardingNss(nss.db(), existingUuid); + + const auto o2FieldInLegacyFormat = BSON("type" + << "reshardDoneCatchUp" + << "reshardingUUID" << reshardingUuid); + auto reshardDoneCatchUp = makeOplogEntry(OpTypeEnum::kNoop, + temporaryNs, + BSONObj(), + reshardingUuid, + true, // fromMigrate + o2FieldInLegacyFormat); + + auto spec = fromjson( + "{$changeStream: {showMigrationEvents: true, allowToRunOnSystemNS: true, " + "showExpandedEvents: true}}"); + auto expCtx = getExpCtx(); + expCtx->ns = temporaryNs; + + const auto opDesc = V{D{{"reshardingUUID", reshardingUuid}}}; + Document expectedReshardingDoneCatchUp{ + {DSChangeStream::kReshardingUuidField, reshardingUuid}, + {DSChangeStream::kIdField, + makeResumeToken( + kDefaultTs, reshardingUuid, opDesc, DSChangeStream::kReshardDoneCatchUpOpType)}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kReshardDoneCatchUpOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kCollectionUuidField, reshardingUuid}, + {DSChangeStream::kWallTimeField, Date_t()}, + {DSChangeStream::kNamespaceField, + D{{"db", temporaryNs.db()}, {"coll", temporaryNs.coll()}}}, + {DSChangeStream::kOperationDescriptionField, opDesc}, + }; + + checkTransformation(reshardDoneCatchUp, expectedReshardingDoneCatchUp, spec); +} + TEST_F(ChangeStreamStageTest, TransformReshardDoneCatchUp) { auto existingUuid = UUID::gen(); auto reshardingUuid = UUID::gen(); auto temporaryNs = constructTemporaryReshardingNss(nss.db(), existingUuid); - ReshardingChangeEventO2Field o2Field{reshardingUuid, - ReshardingChangeEventEnum::kReshardDoneCatchUp}; + ReshardDoneCatchUpChangeEventO2Field o2Field{temporaryNs, reshardingUuid}; auto reshardDoneCatchUp = makeOplogEntry(OpTypeEnum::kNoop, temporaryNs, BSONObj(), @@ -1375,22 +1470,25 @@ TEST_F(ChangeStreamStageTest, TransformReshardDoneCatchUp) { true, // fromMigrate o2Field.toBSON()); - auto spec = - fromjson("{$changeStream: {showMigrationEvents: true, allowToRunOnSystemNS: true}}"); + auto spec = fromjson( + "{$changeStream: {showMigrationEvents: true, allowToRunOnSystemNS: true, " + "showExpandedEvents: true}}"); auto expCtx = getExpCtx(); expCtx->ns = temporaryNs; + const auto opDesc = V{D{{"reshardingUUID", reshardingUuid}}}; Document expectedReshardingDoneCatchUp{ {DSChangeStream::kReshardingUuidField, reshardingUuid}, {DSChangeStream::kIdField, - makeResumeTokenWithEventId( - kDefaultTs, - reshardingUuid, - Document{{"operationType", DSChangeStream::kReshardDoneCatchUpOpType}, - {"documentKey", BSON("_id" << o2Field.toBSON())}})}, + makeResumeToken( + kDefaultTs, reshardingUuid, opDesc, DSChangeStream::kReshardDoneCatchUpOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kReshardDoneCatchUpOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kCollectionUuidField, reshardingUuid}, {DSChangeStream::kWallTimeField, Date_t()}, + {DSChangeStream::kNamespaceField, + D{{"db", temporaryNs.db()}, {"coll", temporaryNs.coll()}}}, + {DSChangeStream::kOperationDescriptionField, opDesc}, }; checkTransformation(reshardDoneCatchUp, expectedReshardingDoneCatchUp, spec); |