diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2018-04-10 13:25:49 -0400 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2018-04-12 14:00:17 -0400 |
commit | ec25294c8d0c1c60ff786ea99198749dc4788dd1 (patch) | |
tree | 7d7f55565974f226f3c8c9fb60e36c4f33ded665 /src/mongo | |
parent | 49ab42843f85a50a215687ddbd1dd5db6a94738c (diff) | |
download | mongo-ec25294c8d0c1c60ff786ea99198749dc4788dd1.tar.gz |
SERVER-34181 Include 'clusterTime' in each change stream
Diffstat (limited to 'src/mongo')
3 files changed, 134 insertions, 66 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index fc0614a2b8e..2f7f185dcb2 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -75,7 +75,6 @@ constexpr StringData DocumentSourceChangeStream::kNamespaceField; constexpr StringData DocumentSourceChangeStream::kUuidField; constexpr StringData DocumentSourceChangeStream::kOperationTypeField; constexpr StringData DocumentSourceChangeStream::kStageName; -constexpr StringData DocumentSourceChangeStream::kTimestampField; constexpr StringData DocumentSourceChangeStream::kClusterTimeField; constexpr StringData DocumentSourceChangeStream::kUpdateOpType; constexpr StringData DocumentSourceChangeStream::kDeleteOpType; @@ -688,6 +687,7 @@ Document DocumentSourceChangeStream::Transformation::applyTransformation(const D resumeTokenData.uuid = uuid.getUuid(); doc.addField(kIdField, Value(ResumeToken(resumeTokenData).toDocument())); doc.addField(kOperationTypeField, Value(operationType)); + doc.addField(kClusterTimeField, Value(resumeTokenData.clusterTime)); // If we're in a sharded environment, we'll need to merge the results by their sort key, so add // that as metadata. diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index bbaa868169b..6d7ac9e3eba 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -150,18 +150,13 @@ public: // transformation. static constexpr StringData kOperationTypeField = "operationType"_sd; - // The name of this stage. - static constexpr StringData kStageName = "$changeStream"_sd; - // The name of the field where the clusterTime of the change will be located after the // transformation. The cluster time will be located inside the change identifier, so the full // path to the cluster time will be kIdField + "." + kClusterTimeField. static constexpr StringData kClusterTimeField = "clusterTime"_sd; - // The name of the field where the timestamp of the change will be located after the - // transformation. The timestamp will be located inside the cluster time, so the full path - // to the timestamp will be kIdField + "." + kClusterTimeField + "." + kTimestampField. - static constexpr StringData kTimestampField = "ts"_sd; + // The name of this stage. + static constexpr StringData kStageName = "$changeStream"_sd; // The different types of operations we can use for the operation type. static constexpr StringData kUpdateOpType = "update"_sd; diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index e89e9e0395b..5e51059853c 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -69,8 +69,8 @@ using V = Value; using DSChangeStream = DocumentSourceChangeStream; -static const Timestamp ts(100, 1); -static const repl::OpTime optime(ts, 1); +static const Timestamp kDefaultTs(100, 1); +static const repl::OpTime kDefaultOpTime(kDefaultTs, 1); static const NamespaceString nss("unittests.change_stream"); class ChangeStreamStageTestNoSetup : public AggregationContextFixture { @@ -160,13 +160,15 @@ public: OplogEntry createCommand(const BSONObj& oField, const boost::optional<UUID> uuid = boost::none, - const boost::optional<bool> fromMigrate = boost::none) { + const boost::optional<bool> fromMigrate = boost::none, + boost::optional<repl::OpTime> opTime = boost::none) { return makeOplogEntry(OpTypeEnum::kCommand, // op type nss.getCommandNS(), // namespace oField, // o uuid, // uuid fromMigrate, // fromMigrate - boost::none); // o2 + boost::none, // o2 + opTime); // opTime } Document makeResumeToken(Timestamp ts, @@ -197,21 +199,22 @@ public: BSONObj object, boost::optional<UUID> uuid = testUuid(), boost::optional<bool> fromMigrate = boost::none, - boost::optional<BSONObj> object2 = boost::none) { + boost::optional<BSONObj> object2 = boost::none, + boost::optional<repl::OpTime> opTime = boost::none) { long long hash = 1LL; - return repl::OplogEntry(optime, // optime - hash, // hash - opType, // opType - nss, // namespace - uuid, // uuid - fromMigrate, // fromMigrate - repl::OplogEntry::kOplogVersion, // version - object, // o - object2, // o2 - {}, // sessionInfo - boost::none, // upsert - boost::none, // wall clock time - boost::none, // statement id + return repl::OplogEntry(opTime ? *opTime : kDefaultOpTime, // optime + hash, // hash + opType, // opType + nss, // namespace + uuid, // uuid + fromMigrate, // fromMigrate + repl::OplogEntry::kOplogVersion, // version + object, // o + object2, // o2 + {}, // sessionInfo + boost::none, // upsert + boost::none, // wall clock time + boost::none, // statement id boost::none, // optime of previous write within same transaction boost::none, // pre-image optime boost::none); // post-image optime @@ -260,10 +263,11 @@ TEST_F(ChangeStreamStageTest, ShouldRejectBothResumeAfterClusterTimeAndResumeAft ASSERT_THROWS_CODE( DSChangeStream::createFromBson( - BSON(DSChangeStream::kStageName << BSON( - "resumeAfter" << makeResumeToken(ts, testUuid(), BSON("x" << 2 << "_id" << 1)) - << "$_resumeAfterClusterTime" - << BSON("ts" << ts))) + BSON(DSChangeStream::kStageName + << BSON("resumeAfter" + << makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1)) + << "$_resumeAfterClusterTime" + << BSON("ts" << kDefaultTs))) .firstElement(), expCtx), AssertionException, @@ -279,10 +283,11 @@ TEST_F(ChangeStreamStageTest, ShouldRejectBothStartAtClusterTimeAndResumeAfterOp ASSERT_THROWS_CODE( DSChangeStream::createFromBson( - BSON(DSChangeStream::kStageName << BSON( - "resumeAfter" << makeResumeToken(ts, testUuid(), BSON("x" << 2 << "_id" << 1)) - << "startAtClusterTime" - << BSON("ts" << ts))) + BSON(DSChangeStream::kStageName + << BSON("resumeAfter" + << makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1)) + << "startAtClusterTime" + << BSON("ts" << kDefaultTs))) .firstElement(), expCtx), AssertionException, @@ -296,15 +301,15 @@ TEST_F(ChangeStreamStageTest, ShouldRejectBothStartAtAndResumeAfterClusterTimeOp Collection collection(stdx::make_unique<CollectionMock>(nss)); UUIDCatalog::get(expCtx->opCtx).onCreateCollection(expCtx->opCtx, &collection, testUuid()); - ASSERT_THROWS_CODE( - DSChangeStream::createFromBson( - BSON(DSChangeStream::kStageName - << BSON("$_resumeAfterClusterTime" << BSON("ts" << ts) << "startAtClusterTime" - << BSON("ts" << ts))) - .firstElement(), - expCtx), - AssertionException, - 50573); + ASSERT_THROWS_CODE(DSChangeStream::createFromBson( + BSON(DSChangeStream::kStageName + << BSON("$_resumeAfterClusterTime" << BSON("ts" << kDefaultTs) + << "startAtClusterTime" + << BSON("ts" << kDefaultTs))) + .firstElement(), + expCtx), + AssertionException, + 50573); } TEST_F(ChangeStreamStageTestNoSetup, FailsWithNoReplicationCoordinator) { @@ -339,8 +344,10 @@ TEST_F(ChangeStreamStageTest, TransformInsertDocKeyXAndId) { boost::none); // o2 Document expectedInsert{ - {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), BSON("x" << 2 << "_id" << 1))}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1))}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}}}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"x", 2}, {"_id", 1}}}, // Note _id <-> x reversal. @@ -365,8 +372,10 @@ TEST_F(ChangeStreamStageTest, TransformInsertDocKeyIdAndX) { boost::none); // o2 Document expectedInsert{ - {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), BSON("_id" << 1 << "x" << 2))}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << 1 << "x" << 2))}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kFullDocumentField, D{{"x", 2}, {"_id", 1}}}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, // _id first @@ -383,8 +392,9 @@ TEST_F(ChangeStreamStageTest, TransformInsertDocKeyJustId) { boost::none); // o2 Document expectedInsert{ - {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), BSON("_id" << 1))}, + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << 1))}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}}}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 1}}}, @@ -416,8 +426,9 @@ TEST_F(ChangeStreamStageTest, TransformUpdateFields) { // Update fields Document expectedUpdateField{ - {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), o2)}, + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, { @@ -441,8 +452,9 @@ TEST_F(ChangeStreamStageTest, TransformUpdateFieldsLegacyNoId) { // Update fields Document expectedUpdateField{ - {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), o2)}, + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"x", 1}, {"y", 1}}}, { @@ -464,8 +476,9 @@ TEST_F(ChangeStreamStageTest, TransformRemoveFields) { // Remove fields Document expectedRemoveField{ - {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), o2)}, + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{{"_id", 1}, {"x", 2}}}}, { @@ -486,8 +499,9 @@ TEST_F(ChangeStreamStageTest, TransformReplace) { // Replace Document expectedReplace{ - {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), o2)}, + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}, {"y", 1}}}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, @@ -506,8 +520,9 @@ TEST_F(ChangeStreamStageTest, TransformDelete) { // Delete Document expectedDelete{ - {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), o)}, + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, }; @@ -547,8 +562,9 @@ TEST_F(ChangeStreamStageTest, TransformInvalidate) { // Invalidate entry doesn't have a document id. Document expectedInvalidate{ - {DSChangeStream::kIdField, makeResumeToken(ts, testUuid())}, + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, }; for (auto& entry : {dropColl, rename}) { checkTransformation(entry, expectedInvalidate); @@ -556,8 +572,9 @@ TEST_F(ChangeStreamStageTest, TransformInvalidate) { // Drop database invalidate entry doesn't have a UUID. Document expectedInvalidateDropDatabase{ - {DSChangeStream::kIdField, makeResumeToken(ts)}, + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, }; checkTransformation(dropDB, expectedInvalidateDropDatabase); } @@ -592,8 +609,9 @@ TEST_F(ChangeStreamStageTest, TransformInvalidateRenameDropTarget) { boost::none); // o2 Document expectedInvalidate{ - {DSChangeStream::kIdField, makeResumeToken(ts, testUuid())}, + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, }; checkTransformation(rename, expectedInvalidate); } @@ -608,12 +626,54 @@ TEST_F(ChangeStreamStageTest, TransformNewShardDetected) { o2Field.toBson()); Document expectedNewShardDetected{ - {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), BSON("_id" << o2Field))}, + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << o2Field))}, {DSChangeStream::kOperationTypeField, DSChangeStream::kNewShardDetectedOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, }; checkTransformation(newShardDetected, expectedNewShardDetected); } +TEST_F(ChangeStreamStageTest, ClusterTimeMatchesOplogEntry) { + const Timestamp ts(3, 45); + const long long term = 4; + const auto opTime = repl::OpTime(ts, term); + + // Test the 'clusterTime' field is copied from the oplog entry for an update. + BSONObj o = BSON("$set" << BSON("y" << 1)); + BSONObj o2 = BSON("_id" << 1 << "x" << 2); + auto updateField = makeOplogEntry(OpTypeEnum::kUpdate, // op type + nss, // namespace + o, // o + testUuid(), // uuid + boost::none, // fromMigrate + o2, // o2 + opTime); // opTime + + Document expectedUpdateField{ + {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), o2)}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType}, + {DSChangeStream::kClusterTimeField, ts}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, + { + "updateDescription", D{{"updatedFields", D{{"y", 1}}}, {"removedFields", vector<V>()}}, + }, + }; + checkTransformation(updateField, expectedUpdateField); + + // Test the 'clusterTime' field is copied from the oplog entry for an invalidation. + OplogEntry dropColl = + createCommand(BSON("drop" << nss.coll()), testUuid(), boost::none, opTime); + + // Invalidate entry doesn't have a document id. + Document expectedInvalidate{ + {DSChangeStream::kIdField, makeResumeToken(ts, testUuid())}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, + {DSChangeStream::kClusterTimeField, ts}, + }; + checkTransformation(dropColl, expectedInvalidate); +} + TEST_F(ChangeStreamStageTest, MatchFiltersCreateCollection) { auto collSpec = D{{"create", "foo"_sd}, @@ -699,8 +759,9 @@ TEST_F(ChangeStreamStageTest, CloseCursorOnInvalidateEntries) { auto closeCursor = stages.back(); Document expectedInvalidate{ - {DSChangeStream::kIdField, makeResumeToken(ts, testUuid())}, + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, }; auto next = closeCursor->getNext(); @@ -761,8 +822,10 @@ TEST_F(ChangeStreamStageDBTest, TransformInsert) { auto insert = makeOplogEntry(OpTypeEnum::kInsert, nss, BSON("_id" << 1 << "x" << 2)); Document expectedInsert{ - {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), BSON("x" << 2 << "_id" << 1))}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1))}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}}}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"x", 2}, {"_id", 1}}}, // Note _id <-> x reversal. @@ -777,8 +840,10 @@ TEST_F(ChangeStreamStageDBTest, InsertOnOtherCollections) { // Insert on another collection in the same database. Document expectedInsert{ - {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), BSON("x" << 2 << "_id" << 1))}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1))}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}}}, {DSChangeStream::kNamespaceField, D{{"db", otherNss.db()}, {"coll", otherNss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"x", 2}, {"_id", 1}}}, // Note _id <-> x reversal. @@ -833,8 +898,9 @@ TEST_F(ChangeStreamStageDBTest, TransformsEntriesForLegalClientCollectionsWithSy for (auto& ns : allowedNamespaces) { auto insert = makeOplogEntry(OpTypeEnum::kInsert, ns, BSON("_id" << 1)); Document expectedInsert{ - {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), BSON("_id" << 1))}, + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << 1))}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kFullDocumentField, D{{"_id", 1}}}, {DSChangeStream::kNamespaceField, D{{"db", ns.db()}, {"coll", ns.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 1}}}, @@ -849,8 +915,9 @@ TEST_F(ChangeStreamStageDBTest, TransformUpdateFields) { auto updateField = makeOplogEntry(OpTypeEnum::kUpdate, nss, o, testUuid(), boost::none, o2); Document expectedUpdateField{ - {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), o2)}, + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, {"updateDescription", D{{"updatedFields", D{{"y", 1}}}, {"removedFields", vector<V>()}}}, @@ -870,8 +937,9 @@ TEST_F(ChangeStreamStageDBTest, TransformRemoveFields) { // Remove fields Document expectedRemoveField{ - {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), o2)}, + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{{"_id", 1}, {"x", 2}}}}, { @@ -892,8 +960,9 @@ TEST_F(ChangeStreamStageDBTest, TransformReplace) { // Replace Document expectedReplace{ - {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), o2)}, + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}, {"y", 1}}}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, @@ -912,8 +981,9 @@ TEST_F(ChangeStreamStageDBTest, TransformDelete) { // Delete Document expectedDelete{ - {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), o)}, + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, }; @@ -953,8 +1023,9 @@ TEST_F(ChangeStreamStageDBTest, TransformInvalidate) { // Invalidate entry doesn't have a document id. Document expectedInvalidate{ - {DSChangeStream::kIdField, makeResumeToken(ts, testUuid())}, + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, }; for (auto& entry : {dropColl, rename}) { checkTransformation(entry, expectedInvalidate); @@ -962,8 +1033,9 @@ TEST_F(ChangeStreamStageDBTest, TransformInvalidate) { // Drop database invalidate entry doesn't have a UUID. Document expectedInvalidateDropDatabase{ - {DSChangeStream::kIdField, makeResumeToken(ts)}, + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, }; checkTransformation(dropDB, expectedInvalidateDropDatabase); } @@ -972,8 +1044,9 @@ TEST_F(ChangeStreamStageDBTest, SystemCollectionsDropOrRenameShouldInvalidate) { NamespaceString systemColl(nss.db() + ".system.users"); NamespaceString renamedSystemColl(nss.db() + ".system.users_new"); Document expectedInvalidate{ - {DSChangeStream::kIdField, makeResumeToken(ts, testUuid())}, + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, }; OplogEntry dropColl = createCommand(BSON("drop" << systemColl.coll()), testUuid()); |