diff options
5 files changed, 71 insertions, 5 deletions
diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml index 958a60e5234..6235d5f59c3 100644 --- a/etc/backports_required_for_multiversion_tests.yml +++ b/etc/backports_required_for_multiversion_tests.yml @@ -204,6 +204,8 @@ last-continuous: test_file: jstests/sharding/resharding_change_stream_namespace_filtering.js - ticket: SERVER-65261 test_file: jstests/replsets/capped_deletes.js + - ticket: SERVER-65022 + test_file: jstests/sharding/change_stream_shard_failover.js # Tests that should only be excluded from particular suites should be listed under that suite. suites: @@ -543,6 +545,8 @@ last-lts: test_file: jstests/sharding/refine_collection_shard_key_basic.js - ticket: SERVER-65261 test_file: jstests/replsets/capped_deletes.js + - ticket: SERVER-65022 + test_file: jstests/sharding/change_stream_shard_failover.js # Tests that should only be excluded from particular suites should be listed under that suite. suites: diff --git a/jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js b/jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js index 43df7cfd1bb..d268938640b 100644 --- a/jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js +++ b/jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js @@ -266,6 +266,10 @@ for (let csConfig of [{fullDocument: "updateLookup"}]) { for (let i = 0; i < nonOptimizedOutput.length; ++i) { try { assert(i < optimizedOutput.length); + if (optimizedOutput[i].hasOwnProperty("wallTime") && + nonOptimizedOutput[i].hasOwnProperty("wallTime")) { + optimizedOutput[i].wallTime = nonOptimizedOutput[i].wallTime; + } assert(friendlyEqual(optimizedOutput[i], nonOptimizedOutput[i])); } catch (error) { failedTestCases.push({ @@ -283,4 +287,4 @@ for (let csConfig of [{fullDocument: "updateLookup"}]) { // Assert that there were no failed test cases. assert(failedTestCases.length == 0, failedTestCases); -})();
\ No newline at end of file +})(); diff --git a/jstests/change_streams/shell_helper.js b/jstests/change_streams/shell_helper.js index de7cf737280..65f40508694 100644 --- a/jstests/change_streams/shell_helper.js +++ b/jstests/change_streams/shell_helper.js @@ -73,6 +73,7 @@ resumeToken = change._id; // Remove the fields we cannot predict, then test that the change is as expected. delete change._id; delete change.clusterTime; +delete change.wallTime; assert.docEq(change, expected); jsTestLog("Testing watch() with pipeline"); diff --git a/src/mongo/db/pipeline/change_stream_event_transform.cpp b/src/mongo/db/pipeline/change_stream_event_transform.cpp index 1731a99f328..644ced702c7 100644 --- a/src/mongo/db/pipeline/change_stream_event_transform.cpp +++ b/src/mongo/db/pipeline/change_stream_event_transform.cpp @@ -408,12 +408,12 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum // Note: If the UUID is a missing value (which can be true for events like 'dropDatabase'), // 'addField' will not add anything to the document. doc.addField(DocumentSourceChangeStream::kCollectionUuidField, uuid); - - const auto wallTime = input[repl::OplogEntry::kWallClockTimeFieldName]; - checkValueType(wallTime, repl::OplogEntry::kWallClockTimeFieldName, BSONType::Date); - doc.addField(DocumentSourceChangeStream::kWallTimeField, wallTime); } + const auto wallTime = input[repl::OplogEntry::kWallClockTimeFieldName]; + checkValueType(wallTime, repl::OplogEntry::kWallClockTimeFieldName, BSONType::Date); + doc.addField(DocumentSourceChangeStream::kWallTimeField, wallTime); + // Invalidation, topology change, and resharding events have fewer fields. if (operationType == DocumentSourceChangeStream::kInvalidateOpType || operationType == DocumentSourceChangeStream::kNewShardDetectedOpType || diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index ae9edbcd177..5d11196a08b 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -358,6 +358,7 @@ public: {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 1}}}, { @@ -648,6 +649,7 @@ TEST_F(ChangeStreamStageTest, TransformInsertDocKeyXAndId) { makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1))}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}}}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"x", 2}, {"_id", 1}}}, // Note _id <-> x reversal. @@ -676,6 +678,7 @@ TEST_F(ChangeStreamStageTest, TransformInsertDocKeyIdAndX) { makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << 1 << "x" << 2))}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kFullDocumentField, D{{"x", 2}, {"_id", 1}}}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, // _id first @@ -695,6 +698,7 @@ TEST_F(ChangeStreamStageTest, TransformInsertDocKeyJustId) { {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << 1))}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}}}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 1}}}, @@ -729,6 +733,7 @@ TEST_F(ChangeStreamStageTest, TransformInsertFromMigrateShowMigrations) { makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << 1 << "x" << 2))}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kFullDocumentField, D{{"x", 2}, {"_id", 1}}}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, // _id first @@ -751,6 +756,7 @@ TEST_F(ChangeStreamStageTest, TransformUpdateFields) { {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, { @@ -931,6 +937,7 @@ TEST_F(ChangeStreamStageTest, TransformUpdateFieldsLegacyNoId) { {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"x", 1}, {"y", 1}}}, { @@ -956,6 +963,7 @@ TEST_F(ChangeStreamStageTest, TransformRemoveFields) { {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, Document{{"_id", 1}, {"x", 2}}}, { @@ -980,6 +988,7 @@ TEST_F(ChangeStreamStageTest, TransformReplace) { {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}, {"y", 1}}}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, @@ -1025,6 +1034,7 @@ TEST_F(ChangeStreamStageTest, TransformDelete) { {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, }; @@ -1100,6 +1110,7 @@ TEST_F(ChangeStreamStageTest, TransformDeleteFromMigrateShowMigrations) { {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 1}}}, }; @@ -1114,6 +1125,7 @@ TEST_F(ChangeStreamStageTest, TransformDrop) { {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, {DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, }; Document expectedInvalidate{ @@ -1122,6 +1134,7 @@ TEST_F(ChangeStreamStageTest, TransformDrop) { kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, }; checkTransformation(dropColl, expectedDrop, kDefaultSpec, expectedInvalidate); @@ -1188,6 +1201,7 @@ TEST_F(ChangeStreamStageTest, TransformRename) { {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, {DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, }; Document expectedInvalidate{ @@ -1196,6 +1210,7 @@ TEST_F(ChangeStreamStageTest, TransformRename) { kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, }; checkTransformation(rename, expectedRename, kDefaultSpec, expectedInvalidate); @@ -1264,6 +1279,7 @@ TEST_F(ChangeStreamStageTest, TransformRenameTarget) { {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, {DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kNamespaceField, D{{"db", otherColl.db()}, {"coll", otherColl.coll()}}}, }; Document expectedInvalidate{ @@ -1272,6 +1288,7 @@ TEST_F(ChangeStreamStageTest, TransformRenameTarget) { kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, }; checkTransformation(rename, expectedRename, kDefaultSpec, expectedInvalidate); @@ -1295,6 +1312,7 @@ TEST_F(ChangeStreamStageTest, TransformNewShardDetected) { {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << o2Field))}, {DSChangeStream::kOperationTypeField, DSChangeStream::kNewShardDetectedOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, }; getExpCtx()->needsMerge = true; @@ -1322,6 +1340,7 @@ TEST_F(ChangeStreamStageTest, TransformReshardBegin) { makeResumeToken(kDefaultTs, uuid, BSON("_id" << o2Field.toBSON()))}, {DSChangeStream::kOperationTypeField, DSChangeStream::kReshardBeginOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, }; checkTransformation(reshardingBegin, expectedReshardingBegin, spec); } @@ -1351,6 +1370,7 @@ TEST_F(ChangeStreamStageTest, TransformReshardDoneCatchUp) { makeResumeToken(kDefaultTs, reshardingUuid, BSON("_id" << o2Field.toBSON()))}, {DSChangeStream::kOperationTypeField, DSChangeStream::kReshardDoneCatchUpOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, }; checkTransformation(reshardDoneCatchUp, expectedReshardingDoneCatchUp, spec); @@ -1529,6 +1549,7 @@ TEST_F(ChangeStreamStageTest, CommitCommandReturnsOperationsFromPreparedTransact {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), BSONObj())}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kFullDocumentField, D{{"_id", 123}}}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{}}, @@ -2268,6 +2289,7 @@ TEST_F(ChangeStreamStageTest, ClusterTimeMatchesOplogEntry) { {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), o2)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType}, {DSChangeStream::kClusterTimeField, ts}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, { @@ -2285,6 +2307,7 @@ TEST_F(ChangeStreamStageTest, ClusterTimeMatchesOplogEntry) { {DSChangeStream::kIdField, makeResumeToken(ts, testUuid())}, {DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType}, {DSChangeStream::kClusterTimeField, ts}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, }; checkTransformation(dropColl, expectedDrop); @@ -2303,6 +2326,7 @@ TEST_F(ChangeStreamStageTest, ClusterTimeMatchesOplogEntry) { {DSChangeStream::kIdField, makeResumeToken(ts, testUuid())}, {DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType}, {DSChangeStream::kClusterTimeField, ts}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, }; checkTransformation(rename, expectedRename); @@ -2517,6 +2541,7 @@ TEST_F(ChangeStreamStageTest, CloseCursorOnInvalidateEntries) { {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, {DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, }; Document expectedInvalidate{ @@ -2525,6 +2550,7 @@ TEST_F(ChangeStreamStageTest, CloseCursorOnInvalidateEntries) { kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, }; auto next = lastStage->getNext(); @@ -2578,6 +2604,7 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldIncludeShardKeyFromResumeTokenWhe {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, {DSChangeStream::kClusterTimeField, ts}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kFullDocumentField, D{{"_id", 2}, {"shardKey", 3}}}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 2}, {"shardKey", 3}}}, @@ -2621,6 +2648,7 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldPrioritizeO2FieldOverDocumentKeyC {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, {DSChangeStream::kClusterTimeField, ts}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kFullDocumentField, D{{"_id", 2}, {"shardKey", 3}}}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 2}, {"shardKey", 3}}}, @@ -2663,6 +2691,7 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPres {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, {DSChangeStream::kClusterTimeField, ts}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kFullDocumentField, D{{"_id", 2}}}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 2}}}, @@ -2707,6 +2736,7 @@ TEST_F(ChangeStreamStageTest, RenameFromSystemToUserCollectionShouldIncludeNotif {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, {DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kNamespaceField, D{{"db", systemColl.db()}, {"coll", systemColl.coll()}}}, }; checkTransformation(rename, expectedRename); @@ -2725,6 +2755,7 @@ TEST_F(ChangeStreamStageTest, RenameFromUserToSystemCollectionShouldIncludeNotif {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, {DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, }; checkTransformation(rename, expectedRename); @@ -2798,6 +2829,7 @@ TEST_F(ChangeStreamStageDBTest, TransformInsert) { makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1))}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}}}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"x", 2}, {"_id", 1}}}, // Note _id <-> x reversal. @@ -2842,6 +2874,7 @@ TEST_F(ChangeStreamStageDBTest, InsertOnOtherCollections) { makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1))}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}}}, {DSChangeStream::kNamespaceField, D{{"db", otherNss.db()}, {"coll", otherNss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"x", 2}, {"_id", 1}}}, // Note _id <-> x reversal. @@ -2900,6 +2933,7 @@ TEST_F(ChangeStreamStageDBTest, TransformsEntriesForLegalClientCollectionsWithSy {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << 1))}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kFullDocumentField, D{{"_id", 1}}}, {DSChangeStream::kNamespaceField, D{{"db", ns.db()}, {"coll", ns.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 1}}}, @@ -2917,6 +2951,7 @@ TEST_F(ChangeStreamStageDBTest, TransformUpdateFields) { {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, {"updateDescription", D{{"updatedFields", D{{"y", 1}}}, {"removedFields", vector<V>()}}}, @@ -2939,6 +2974,7 @@ TEST_F(ChangeStreamStageDBTest, TransformRemoveFields) { {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, { @@ -2963,6 +2999,7 @@ TEST_F(ChangeStreamStageDBTest, TransformReplace) { {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}, {"y", 1}}}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, @@ -2984,6 +3021,7 @@ TEST_F(ChangeStreamStageDBTest, TransformDelete) { {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, }; @@ -3028,6 +3066,7 @@ TEST_F(ChangeStreamStageDBTest, TransformDeleteFromMigrateShowMigrations) { {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, }; @@ -3041,6 +3080,7 @@ TEST_F(ChangeStreamStageDBTest, TransformDrop) { {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, {DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, }; checkTransformation(dropColl, expectedDrop); @@ -3057,6 +3097,7 @@ TEST_F(ChangeStreamStageDBTest, TransformRename) { {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, {DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, }; checkTransformation(rename, expectedRename); @@ -3070,6 +3111,7 @@ TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) { {DSChangeStream::kIdField, makeResumeToken(kDefaultTs)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kDropDatabaseOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}}}, }; Document expectedInvalidate{ @@ -3078,6 +3120,7 @@ TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) { kDefaultTs, Value(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, }; checkTransformation(dropDB, expectedDropDatabase, kDefaultSpec, expectedInvalidate); @@ -3153,6 +3196,7 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForDelete) { {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, documentKey}, }; @@ -3166,6 +3210,7 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForDelete) { {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, documentKey}, {DSChangeStream::kFullDocumentBeforeChangeField, preImageObj}, @@ -3245,6 +3290,7 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForUpdate) { {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, documentKey}, { @@ -3262,6 +3308,7 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForUpdate) { {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, documentKey}, { @@ -3345,6 +3392,7 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForReplace) { {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kFullDocumentField, replacementDoc}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, documentKey}, @@ -3359,6 +3407,7 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForReplace) { {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kFullDocumentField, replacementDoc}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, documentKey}, @@ -3421,6 +3470,7 @@ TEST_F(ChangeStreamStageDBTest, RenameFromSystemToUserCollectionShouldIncludeNot {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, {DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kNamespaceField, D{{"db", systemColl.db()}, {"coll", systemColl.coll()}}}, }; checkTransformation(rename, expectedRename); @@ -3439,6 +3489,7 @@ TEST_F(ChangeStreamStageDBTest, RenameFromUserToSystemCollectionShouldIncludeNot {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, {DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, }; checkTransformation(rename, expectedRename); @@ -3480,6 +3531,7 @@ TEST_F(ChangeStreamStageDBTest, {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, {DSChangeStream::kClusterTimeField, ts}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kFullDocumentField, D{{"_id", 2}, {"shardKey", 3}}}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 2}, {"shardKey", 3}}}, @@ -3518,6 +3570,7 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldPrioritizeO2FieldOverDocumentKe {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, {DSChangeStream::kClusterTimeField, ts}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kFullDocumentField, D{{"_id", 2}, {"shardKey", 3}}}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 2}, {"shardKey", 3}}}, @@ -3556,6 +3609,7 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPr {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, {DSChangeStream::kClusterTimeField, ts}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kFullDocumentField, D{{"_id", 2}}}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 2}}}, @@ -3593,6 +3647,7 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyIfResumeToken {DSChangeStream::kIdField, makeResumeToken(ts, uuid, BSON("_id" << 2))}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, {DSChangeStream::kClusterTimeField, ts}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kFullDocumentField, D{{"_id", 2}, {"shardKey", 3}}}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 2}}}, @@ -3645,6 +3700,7 @@ TEST_F(ChangeStreamStageDBTest, ResumeAfterWithTokenFromDropDatabase) { {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), insertDoc)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kFullDocumentField, D{{"_id", 2}}}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 2}}}, @@ -3673,6 +3729,7 @@ TEST_F(ChangeStreamStageDBTest, StartAfterSucceedsEvenIfResumeTokenDoesNotContai {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, uuid, insertDoc)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kFullDocumentField, D{{"_id", 2}}}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 2}}}, |