summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
authorArun Banala <arun.banala@mongodb.com>2022-04-29 12:56:44 +0100
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-05-23 07:52:57 +0000
commit9d559b3ad8942ade301b41284dee9e6b8c8f098c (patch)
tree2942842824443d062be972c637219e02136ac93c /src/mongo/db/pipeline
parentfb4724113317c2279e340047b3a1cd8d6c72fb36 (diff)
downloadmongo-9d559b3ad8942ade301b41284dee9e6b8c8f098c.tar.gz
SERVER-65909 Make oplog and change stream formats of {op: "n"} events consistent
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--src/mongo/db/pipeline/change_stream_event_transform.cpp70
-rw-r--r--src/mongo/db/pipeline/change_stream_filter_helpers.cpp29
-rw-r--r--src/mongo/db/pipeline/change_stream_helpers_legacy.cpp42
-rw-r--r--src/mongo/db/pipeline/change_stream_helpers_legacy.h15
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h3
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_check_topology_change.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp142
7 files changed, 227 insertions, 77 deletions
diff --git a/src/mongo/db/pipeline/change_stream_event_transform.cpp b/src/mongo/db/pipeline/change_stream_event_transform.cpp
index 64874656a89..63a8aeebac8 100644
--- a/src/mongo/db/pipeline/change_stream_event_transform.cpp
+++ b/src/mongo/db/pipeline/change_stream_event_transform.cpp
@@ -31,6 +31,7 @@
#include "mongo/db/pipeline/change_stream_document_diff_parser.h"
#include "mongo/db/pipeline/change_stream_filter_helpers.h"
+#include "mongo/db/pipeline/change_stream_helpers_legacy.h"
#include "mongo/db/pipeline/change_stream_preimage_gen.h"
#include "mongo/db/pipeline/document_path_support.h"
#include "mongo/db/pipeline/document_source_change_stream_add_post_image.h"
@@ -335,17 +336,21 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
break;
}
case repl::OpTypeEnum::kNoop: {
+        // TODO SERVER-66138: The legacy oplog format for some no-op operations can include the
+        // 'type' field, which was removed post 6.0. We can remove all the logic related to the
+        // 'type' field once 7.0 is released.
+ const auto o2Field = change_stream_legacy::convertFromLegacyOplogFormat(
+ input[repl::OplogEntry::kObject2FieldName].getDocument(), nss);
+
// Check whether this is a shardCollection oplog entry.
- if (!input.getNestedField("o2.shardCollection").missing()) {
- const auto o2Field = input[repl::OplogEntry::kObject2FieldName].getDocument();
+ if (!o2Field["shardCollection"].missing()) {
operationType = DocumentSourceChangeStream::kShardCollectionOpType;
operationDescription = Value(copyDocExceptFields(o2Field, {"shardCollection"_sd}));
break;
}
// Check if this is a migration of the last chunk off a shard.
- if (!input.getNestedField("o2.migrateLastChunkFromShard").missing()) {
- const auto o2Field = input[repl::OplogEntry::kObject2FieldName].getDocument();
+ if (!o2Field["migrateLastChunkFromShard"].missing()) {
operationType = DocumentSourceChangeStream::kMigrateLastChunkFromShardOpType;
operationDescription =
Value(copyDocExceptFields(o2Field, {"migrateLastChunkFromShard"_sd}));
@@ -353,8 +358,7 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
}
// Check whether this is a refineCollectionShardKey oplog entry.
- if (!input.getNestedField("o2.refineCollectionShardKey").missing()) {
- const auto o2Field = input[repl::OplogEntry::kObject2FieldName].getDocument();
+ if (!o2Field["refineCollectionShardKey"].missing()) {
operationType = DocumentSourceChangeStream::kRefineCollectionShardKeyOpType;
operationDescription =
Value(copyDocExceptFields(o2Field, {"refineCollectionShardKey"_sd}));
@@ -362,41 +366,39 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
}
// Check whether this is a reshardCollection oplog entry.
- if (!input.getNestedField("o2.reshardCollection").missing()) {
- const auto o2Field = input[repl::OplogEntry::kObject2FieldName].getDocument();
+ if (!o2Field["reshardCollection"].missing()) {
operationType = DocumentSourceChangeStream::kReshardCollectionOpType;
operationDescription =
Value(copyDocExceptFields(o2Field, {"reshardCollection"_sd}));
break;
}
- // Otherwise, o2.type determines the message type.
- auto o2Type = input.getNestedField("o2.type");
- tassert(5052200, "o2.type is missing from noop oplog event", !o2Type.missing());
-
- if (o2Type.getString() == "migrateChunkToNewShard"_sd) {
- operationType = DocumentSourceChangeStream::kNewShardDetectedOpType;
- // Generate a fake document Id for NewShardDetected operation so that we can
- // resume after this operation.
- documentKey = Value(Document{{DocumentSourceChangeStream::kIdField,
- input[repl::OplogEntry::kObject2FieldName]}});
- } else if (o2Type.getString() == "reshardBegin"_sd) {
+ if (!o2Field["migrateChunkToNewShard"].missing()) {
+ operationType = change_stream_legacy::getNewShardDetectedOpName(_expCtx);
+ operationDescription =
+ Value(copyDocExceptFields(o2Field, {"migrateChunkToNewShard"_sd}));
+ break;
+ }
+
+ if (!o2Field["reshardBegin"].missing()) {
operationType = DocumentSourceChangeStream::kReshardBeginOpType;
doc.addField(DocumentSourceChangeStream::kReshardingUuidField,
- input.getNestedField("o2.reshardingUUID"));
- documentKey = Value(Document{{DocumentSourceChangeStream::kIdField,
- input[repl::OplogEntry::kObject2FieldName]}});
- } else if (o2Type.getString() == "reshardDoneCatchUp"_sd) {
+ o2Field["reshardingUUID"]);
+ operationDescription = Value(copyDocExceptFields(o2Field, {"reshardBegin"_sd}));
+ break;
+ }
+
+ if (!o2Field["reshardDoneCatchUp"].missing()) {
operationType = DocumentSourceChangeStream::kReshardDoneCatchUpOpType;
doc.addField(DocumentSourceChangeStream::kReshardingUuidField,
- input.getNestedField("o2.reshardingUUID"));
- documentKey = Value(Document{{DocumentSourceChangeStream::kIdField,
- input[repl::OplogEntry::kObject2FieldName]}});
- } else {
- // We should never see an unknown noop entry.
- MONGO_UNREACHABLE_TASSERT(5052201);
+ o2Field["reshardingUUID"]);
+ operationDescription =
+ Value(copyDocExceptFields(o2Field, {"reshardDoneCatchUp"_sd}));
+ break;
}
- break;
+
+ // We should never see an unknown noop entry.
+ MONGO_UNREACHABLE_TASSERT(5052201);
}
default: { MONGO_UNREACHABLE_TASSERT(6330501); }
}
@@ -443,14 +445,6 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
checkValueType(wallTime, repl::OplogEntry::kWallClockTimeFieldName, BSONType::Date);
doc.addField(DocumentSourceChangeStream::kWallTimeField, wallTime);
- // Invalidation, topology change, and resharding events have fewer fields.
- if (operationType == DocumentSourceChangeStream::kInvalidateOpType ||
- operationType == DocumentSourceChangeStream::kNewShardDetectedOpType ||
- operationType == DocumentSourceChangeStream::kReshardBeginOpType ||
- operationType == DocumentSourceChangeStream::kReshardDoneCatchUpOpType) {
- return doc.freeze();
- }
-
// Add the post-image, pre-image id, namespace, documentKey and other fields as appropriate.
doc.addField(DocumentSourceChangeStream::kFullDocumentField, std::move(fullDocument));
diff --git a/src/mongo/db/pipeline/change_stream_filter_helpers.cpp b/src/mongo/db/pipeline/change_stream_filter_helpers.cpp
index bf8e0e1a4c6..5f552921008 100644
--- a/src/mongo/db/pipeline/change_stream_filter_helpers.cpp
+++ b/src/mongo/db/pipeline/change_stream_filter_helpers.cpp
@@ -294,7 +294,9 @@ std::unique_ptr<MatchExpression> buildInternalOpFilter(
// Noop change events:
// - reshardBegin: A resharding operation begins.
// - reshardDoneCatchUp: "Catch up" phase of reshard operation completes.
- std::vector<StringData> internalOpTypes = {"reshardBegin"_sd, "reshardDoneCatchUp"_sd};
+ // - shardCollection: A shardCollection operation has completed.
+ std::vector<StringData> internalOpTypes = {
+ "reshardBegin"_sd, "reshardDoneCatchUp"_sd, "shardCollection"_sd};
// Noop change events that are only applicable when merging results on mongoS:
// - migrateChunkToNewShard: A chunk migrated to a shard that didn't have any chunks.
@@ -302,29 +304,26 @@ std::unique_ptr<MatchExpression> buildInternalOpFilter(
internalOpTypes.push_back("migrateChunkToNewShard"_sd);
}
- // Build the oplog filter to match the required internal op types.
- BSONArrayBuilder internalOpTypeOrBuilder;
- for (const auto& eventName : internalOpTypes) {
- internalOpTypeOrBuilder.append(BSON("o2.type" << eventName));
- }
-
- // Also filter for shardCollection events, which are recorded as {op: 'n'} in the oplog.
- internalOpTypeOrBuilder.append(BSON("o2.shardCollection" << BSON("$exists" << true)));
-
// Only return the 'migrateLastChunkFromShard' event if 'showSystemEvents' is set.
if (expCtx->changeStreamSpec->getShowSystemEvents()) {
- internalOpTypeOrBuilder.append(
- BSON("o2.migrateLastChunkFromShard" << BSON("$exists" << true)));
+ internalOpTypes.push_back("migrateLastChunkFromShard"_sd);
}
if (feature_flags::gFeatureFlagChangeStreamsFurtherEnrichedEvents.isEnabled(
serverGlobalParams.featureCompatibility)) {
- internalOpTypeOrBuilder.append(
- BSON("o2.refineCollectionShardKey" << BSON("$exists" << true)));
+ internalOpTypes.push_back("refineCollectionShardKey"_sd);
+ internalOpTypes.push_back("reshardCollection"_sd);
+ }
- internalOpTypeOrBuilder.append(BSON("o2.reshardCollection" << BSON("$exists" << true)));
+ // Build the oplog filter to match the required internal op types.
+ BSONArrayBuilder internalOpTypeOrBuilder;
+ for (const auto& eventName : internalOpTypes) {
+ internalOpTypeOrBuilder.append(BSON("o2." + eventName << BSON("$exists" << true)));
}
+    // TODO SERVER-66138: This filter can be removed after the 7.0 release.
+ change_stream_legacy::populateInternalOperationFilter(expCtx, &internalOpTypeOrBuilder);
+
// Finalize the array of $or filter predicates.
internalOpTypeOrBuilder.done();
diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
index bf935d51da4..92998d18955 100644
--- a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
+++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
@@ -132,5 +132,47 @@ boost::optional<Document> legacyLookupPreImage(boost::intrusive_ptr<ExpressionCo
return Document{opLogEntry.getObject().getOwned()};
}
+// TODO SERVER-66138: This function can be removed after we branch for 7.0.
+void populateInternalOperationFilter(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ BSONArrayBuilder* orBuilder) {
+ std::vector<StringData> opTypes = {"reshardBegin"_sd, "reshardDoneCatchUp"_sd};
+
+ // Noop change events that are only applicable when merging results on mongoS:
+ // - migrateChunkToNewShard: A chunk migrated to a shard that didn't have any chunks.
+ if (expCtx->inMongos || expCtx->needsMerge) {
+ opTypes.push_back("migrateChunkToNewShard"_sd);
+ }
+
+ for (const auto& eventName : opTypes) {
+ // Legacy oplog messages used the "o2.type" field to indicate the message type.
+ orBuilder->append(BSON("o2.type" << eventName));
+ }
+}
+
+// TODO SERVER-66138: This function can be removed after we branch for 7.0.
+Document convertFromLegacyOplogFormat(const Document& o2Entry, const NamespaceString& nss) {
+ auto type = o2Entry["type"];
+ if (type.missing()) {
+ return o2Entry;
+ }
+
+ MutableDocument doc(o2Entry);
+ doc.remove("type");
+
+ // This field would be the first field in the new format, but the current change stream code
+ // does not depend on the field order.
+ doc.addField(type.getString(), Value(nss.toString()));
+ return doc.freeze();
+}
+
+// TODO SERVER-66138: This function can be removed after we branch for 7.0.
+StringData getNewShardDetectedOpName(const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ // The op name on 6.0 and older versions.
+ const StringData kNewShardDetectedOpTypeLegacyName = "kNewShardDetected"_sd;
+ return (expCtx->changeStreamTokenVersion == ResumeTokenData::kDefaultTokenVersion)
+ ? DocumentSourceChangeStream::kNewShardDetectedOpType
+ : kNewShardDetectedOpTypeLegacyName;
+}
+
} // namespace change_stream_legacy
} // namespace mongo
diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.h b/src/mongo/db/pipeline/change_stream_helpers_legacy.h
index fff3b770134..e2e17843b49 100644
--- a/src/mongo/db/pipeline/change_stream_helpers_legacy.h
+++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.h
@@ -66,4 +66,19 @@ static const std::set<StringData> kClassicOperationTypes =
DocumentSourceChangeStream::kReshardDoneCatchUpOpType,
DocumentSourceChangeStream::kNewShardDetectedOpType};
+/**
+ * Adds filtering for legacy-format {op: 'n'} oplog messages, which used the "o2.type" field to
+ * indicate the message type.
+ */
+void populateInternalOperationFilter(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ BSONArrayBuilder* filter);
+
+/**
+ * Converts legacy-format oplog o2 fields of type {type: <op name>, ...} to
+ * {..., <op name>: <namespace>}. Does nothing if the 'type' field is not present inside 'o2'.
+ */
+Document convertFromLegacyOplogFormat(const Document& legacyO2Entry, const NamespaceString& nss);
+
+StringData getNewShardDetectedOpName(const boost::intrusive_ptr<ExpressionContext>& expCtx);
+
} // namespace mongo::change_stream_legacy
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index ab7489222c6..1ecdde1d5b6 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -230,8 +230,9 @@ public:
// The internal change events that are not exposed to the users.
static constexpr StringData kReshardBeginOpType = "reshardBegin"_sd;
static constexpr StringData kReshardDoneCatchUpOpType = "reshardDoneCatchUp"_sd;
+
// Internal op type to signal mongos to open cursors on new shards.
- static constexpr StringData kNewShardDetectedOpType = "kNewShardDetected"_sd;
+ static constexpr StringData kNewShardDetectedOpType = "migrateChunkToNewShard"_sd;
// These events are guarded behind the 'showExpandedEvents' flag.
static constexpr StringData kCreateOpType = "create"_sd;
diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.cpp
index 26cdcb77181..73212fedac4 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_check_topology_change.cpp
@@ -31,6 +31,7 @@
#include "mongo/db/pipeline/document_source_change_stream_check_topology_change.h"
+#include "mongo/db/pipeline/change_stream_helpers_legacy.h"
#include "mongo/db/pipeline/change_stream_topology_change_info.h"
namespace mongo {
@@ -78,7 +79,7 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamCheckTopologyChange::doG
// Throw the 'ChangeStreamTopologyChangeInfo' exception, wrapping the topology change event
// along with its metadata. This will bypass the remainder of the pipeline and will be passed
// directly up to mongoS.
- if (eventOpType == DocumentSourceChangeStream::kNewShardDetectedOpType) {
+ if (eventOpType == change_stream_legacy::getNewShardDetectedOpName(pExpCtx)) {
uasserted(ChangeStreamTopologyChangeInfo(eventDoc.toBsonWithMetaData()),
"Collection migrated to new shard");
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 8a48df33cbc..fcd7c965b37 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -1306,8 +1306,7 @@ TEST_F(ChangeStreamStageTest, MatchFiltersDropDatabaseCommand) {
OplogEntry dropDB = createCommand(BSON("dropDatabase" << 1), boost::none, false);
checkTransformation(dropDB, boost::none);
}
-
-TEST_F(ChangeStreamStageTest, TransformNewShardDetected) {
+TEST_F(ChangeStreamStageTest, TransformNewShardDetectedLegacyFormat) {
auto o2Field = D{{"type", "migrateChunkToNewShard"_sd}};
auto newShardDetected = makeOplogEntry(OpTypeEnum::kNoop,
nss,
@@ -1318,14 +1317,12 @@ TEST_F(ChangeStreamStageTest, TransformNewShardDetected) {
Document expectedNewShardDetected{
{DSChangeStream::kIdField,
- makeResumeTokenWithEventId(
- kDefaultTs,
- testUuid(),
- Document{{"operationType", DSChangeStream::kNewShardDetectedOpType},
- {"documentKey", BSON("_id" << o2Field)}})},
+ makeResumeToken(
+ kDefaultTs, testUuid(), V{D{{}}}, DSChangeStream::kNewShardDetectedOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kNewShardDetectedOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
};
getExpCtx()->needsMerge = true;
@@ -1333,11 +1330,70 @@ TEST_F(ChangeStreamStageTest, TransformNewShardDetected) {
checkTransformation(newShardDetected, expectedNewShardDetected);
}
+TEST_F(ChangeStreamStageTest, TransformNewShardDetected) {
+ auto o2Field = D{{"migrateChunkToNewShard", nss.toString()},
+ {"fromShardId", "fromShard"_sd},
+ {"toShardId", "toShard"_sd}};
+ auto newShardDetected = makeOplogEntry(OpTypeEnum::kNoop,
+ nss,
+ BSONObj(),
+ testUuid(),
+ boost::none, // fromMigrate
+ o2Field.toBson());
+
+ const auto opDesc = Value(D{{"fromShardId", "fromShard"_sd}, {"toShardId", "toShard"_sd}});
+ Document expectedNewShardDetected{
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), opDesc, DSChangeStream::kNewShardDetectedOpType)},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kNewShardDetectedOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kCollectionUuidField, testUuid()},
+ {DSChangeStream::kWallTimeField, Date_t()},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeStream::kOperationDescriptionField, opDesc},
+ };
+
+ getExpCtx()->needsMerge = true;
+
+ checkTransformation(newShardDetected, expectedNewShardDetected, kShowExpandedEventsSpec);
+}
+
+TEST_F(ChangeStreamStageTest, TransformReshardBeginLegacyFormat) {
+ auto uuid = UUID::gen();
+ auto reshardingUuid = UUID::gen();
+
+ const auto o2FieldInLegacyFormat = BSON("type"
+ << "reshardBegin"
+ << "reshardingUUID" << reshardingUuid);
+ auto reshardingBegin = makeOplogEntry(OpTypeEnum::kNoop,
+ nss,
+ BSONObj(),
+ uuid,
+ true, // fromMigrate
+ o2FieldInLegacyFormat);
+
+ auto spec = fromjson("{$changeStream: {showMigrationEvents: true, showExpandedEvents: true}}");
+
+ const auto opDesc = V{D{{"reshardingUUID", reshardingUuid}}};
+ Document expectedReshardingBegin{
+ {DSChangeStream::kReshardingUuidField, reshardingUuid},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, uuid, opDesc, DSChangeStream::kReshardBeginOpType)},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kReshardBeginOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kCollectionUuidField, uuid},
+ {DSChangeStream::kWallTimeField, Date_t()},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeStream::kOperationDescriptionField, opDesc},
+ };
+ checkTransformation(reshardingBegin, expectedReshardingBegin, spec);
+}
+
TEST_F(ChangeStreamStageTest, TransformReshardBegin) {
auto uuid = UUID::gen();
auto reshardingUuid = UUID::gen();
- ReshardingChangeEventO2Field o2Field{reshardingUuid, ReshardingChangeEventEnum::kReshardBegin};
+ ReshardBeginChangeEventO2Field o2Field{nss, reshardingUuid};
auto reshardingBegin = makeOplogEntry(OpTypeEnum::kNoop,
nss,
BSONObj(),
@@ -1345,29 +1401,68 @@ TEST_F(ChangeStreamStageTest, TransformReshardBegin) {
true, // fromMigrate
o2Field.toBSON());
- auto spec = fromjson("{$changeStream: {showMigrationEvents: true}}");
+ auto spec = fromjson("{$changeStream: {showMigrationEvents: true, showExpandedEvents: true}}");
+ const auto opDesc = V{D{{"reshardingUUID", reshardingUuid}}};
Document expectedReshardingBegin{
{DSChangeStream::kReshardingUuidField, reshardingUuid},
{DSChangeStream::kIdField,
- makeResumeTokenWithEventId(kDefaultTs,
- uuid,
- Document{{"operationType", DSChangeStream::kReshardBeginOpType},
- {"documentKey", BSON("_id" << o2Field.toBSON())}})},
+ makeResumeToken(kDefaultTs, uuid, opDesc, DSChangeStream::kReshardBeginOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kReshardBeginOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kCollectionUuidField, uuid},
{DSChangeStream::kWallTimeField, Date_t()},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeStream::kOperationDescriptionField, opDesc},
};
checkTransformation(reshardingBegin, expectedReshardingBegin, spec);
}
+TEST_F(ChangeStreamStageTest, TransformReshardDoneCatchUpLegacyFormat) {
+ auto existingUuid = UUID::gen();
+ auto reshardingUuid = UUID::gen();
+ auto temporaryNs = constructTemporaryReshardingNss(nss.db(), existingUuid);
+
+ const auto o2FieldInLegacyFormat = BSON("type"
+ << "reshardDoneCatchUp"
+ << "reshardingUUID" << reshardingUuid);
+ auto reshardDoneCatchUp = makeOplogEntry(OpTypeEnum::kNoop,
+ temporaryNs,
+ BSONObj(),
+ reshardingUuid,
+ true, // fromMigrate
+ o2FieldInLegacyFormat);
+
+ auto spec = fromjson(
+ "{$changeStream: {showMigrationEvents: true, allowToRunOnSystemNS: true, "
+ "showExpandedEvents: true}}");
+ auto expCtx = getExpCtx();
+ expCtx->ns = temporaryNs;
+
+ const auto opDesc = V{D{{"reshardingUUID", reshardingUuid}}};
+ Document expectedReshardingDoneCatchUp{
+ {DSChangeStream::kReshardingUuidField, reshardingUuid},
+ {DSChangeStream::kIdField,
+ makeResumeToken(
+ kDefaultTs, reshardingUuid, opDesc, DSChangeStream::kReshardDoneCatchUpOpType)},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kReshardDoneCatchUpOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kCollectionUuidField, reshardingUuid},
+ {DSChangeStream::kWallTimeField, Date_t()},
+ {DSChangeStream::kNamespaceField,
+ D{{"db", temporaryNs.db()}, {"coll", temporaryNs.coll()}}},
+ {DSChangeStream::kOperationDescriptionField, opDesc},
+ };
+
+ checkTransformation(reshardDoneCatchUp, expectedReshardingDoneCatchUp, spec);
+}
+
TEST_F(ChangeStreamStageTest, TransformReshardDoneCatchUp) {
auto existingUuid = UUID::gen();
auto reshardingUuid = UUID::gen();
auto temporaryNs = constructTemporaryReshardingNss(nss.db(), existingUuid);
- ReshardingChangeEventO2Field o2Field{reshardingUuid,
- ReshardingChangeEventEnum::kReshardDoneCatchUp};
+ ReshardDoneCatchUpChangeEventO2Field o2Field{temporaryNs, reshardingUuid};
auto reshardDoneCatchUp = makeOplogEntry(OpTypeEnum::kNoop,
temporaryNs,
BSONObj(),
@@ -1375,22 +1470,25 @@ TEST_F(ChangeStreamStageTest, TransformReshardDoneCatchUp) {
true, // fromMigrate
o2Field.toBSON());
- auto spec =
- fromjson("{$changeStream: {showMigrationEvents: true, allowToRunOnSystemNS: true}}");
+ auto spec = fromjson(
+ "{$changeStream: {showMigrationEvents: true, allowToRunOnSystemNS: true, "
+ "showExpandedEvents: true}}");
auto expCtx = getExpCtx();
expCtx->ns = temporaryNs;
+ const auto opDesc = V{D{{"reshardingUUID", reshardingUuid}}};
Document expectedReshardingDoneCatchUp{
{DSChangeStream::kReshardingUuidField, reshardingUuid},
{DSChangeStream::kIdField,
- makeResumeTokenWithEventId(
- kDefaultTs,
- reshardingUuid,
- Document{{"operationType", DSChangeStream::kReshardDoneCatchUpOpType},
- {"documentKey", BSON("_id" << o2Field.toBSON())}})},
+ makeResumeToken(
+ kDefaultTs, reshardingUuid, opDesc, DSChangeStream::kReshardDoneCatchUpOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kReshardDoneCatchUpOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kCollectionUuidField, reshardingUuid},
{DSChangeStream::kWallTimeField, Date_t()},
+ {DSChangeStream::kNamespaceField,
+ D{{"db", temporaryNs.db()}, {"coll", temporaryNs.coll()}}},
+ {DSChangeStream::kOperationDescriptionField, opDesc},
};
checkTransformation(reshardDoneCatchUp, expectedReshardingDoneCatchUp, spec);