summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2018-04-10 13:25:49 -0400
committerCharlie Swanson <charlie.swanson@mongodb.com>2018-04-12 14:00:17 -0400
commitec25294c8d0c1c60ff786ea99198749dc4788dd1 (patch)
tree7d7f55565974f226f3c8c9fb60e36c4f33ded665 /src/mongo
parent49ab42843f85a50a215687ddbd1dd5db6a94738c (diff)
downloadmongo-ec25294c8d0c1c60ff786ea99198749dc4788dd1.tar.gz
SERVER-34181 Include 'clusterTime' in each change stream
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h9
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp189
3 files changed, 134 insertions, 66 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index fc0614a2b8e..2f7f185dcb2 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -75,7 +75,6 @@ constexpr StringData DocumentSourceChangeStream::kNamespaceField;
constexpr StringData DocumentSourceChangeStream::kUuidField;
constexpr StringData DocumentSourceChangeStream::kOperationTypeField;
constexpr StringData DocumentSourceChangeStream::kStageName;
-constexpr StringData DocumentSourceChangeStream::kTimestampField;
constexpr StringData DocumentSourceChangeStream::kClusterTimeField;
constexpr StringData DocumentSourceChangeStream::kUpdateOpType;
constexpr StringData DocumentSourceChangeStream::kDeleteOpType;
@@ -688,6 +687,7 @@ Document DocumentSourceChangeStream::Transformation::applyTransformation(const D
resumeTokenData.uuid = uuid.getUuid();
doc.addField(kIdField, Value(ResumeToken(resumeTokenData).toDocument()));
doc.addField(kOperationTypeField, Value(operationType));
+ doc.addField(kClusterTimeField, Value(resumeTokenData.clusterTime));
// If we're in a sharded environment, we'll need to merge the results by their sort key, so add
// that as metadata.
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index bbaa868169b..6d7ac9e3eba 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -150,18 +150,13 @@ public:
// transformation.
static constexpr StringData kOperationTypeField = "operationType"_sd;
- // The name of this stage.
- static constexpr StringData kStageName = "$changeStream"_sd;
-
// The name of the field where the clusterTime of the change will be located after the
// transformation. The cluster time will be located inside the change identifier, so the full
// path to the cluster time will be kIdField + "." + kClusterTimeField.
static constexpr StringData kClusterTimeField = "clusterTime"_sd;
- // The name of the field where the timestamp of the change will be located after the
- // transformation. The timestamp will be located inside the cluster time, so the full path
- // to the timestamp will be kIdField + "." + kClusterTimeField + "." + kTimestampField.
- static constexpr StringData kTimestampField = "ts"_sd;
+ // The name of this stage.
+ static constexpr StringData kStageName = "$changeStream"_sd;
// The different types of operations we can use for the operation type.
static constexpr StringData kUpdateOpType = "update"_sd;
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index e89e9e0395b..5e51059853c 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -69,8 +69,8 @@ using V = Value;
using DSChangeStream = DocumentSourceChangeStream;
-static const Timestamp ts(100, 1);
-static const repl::OpTime optime(ts, 1);
+static const Timestamp kDefaultTs(100, 1);
+static const repl::OpTime kDefaultOpTime(kDefaultTs, 1);
static const NamespaceString nss("unittests.change_stream");
class ChangeStreamStageTestNoSetup : public AggregationContextFixture {
@@ -160,13 +160,15 @@ public:
OplogEntry createCommand(const BSONObj& oField,
const boost::optional<UUID> uuid = boost::none,
- const boost::optional<bool> fromMigrate = boost::none) {
+ const boost::optional<bool> fromMigrate = boost::none,
+ boost::optional<repl::OpTime> opTime = boost::none) {
return makeOplogEntry(OpTypeEnum::kCommand, // op type
nss.getCommandNS(), // namespace
oField, // o
uuid, // uuid
fromMigrate, // fromMigrate
- boost::none); // o2
+ boost::none, // o2
+ opTime); // opTime
}
Document makeResumeToken(Timestamp ts,
@@ -197,21 +199,22 @@ public:
BSONObj object,
boost::optional<UUID> uuid = testUuid(),
boost::optional<bool> fromMigrate = boost::none,
- boost::optional<BSONObj> object2 = boost::none) {
+ boost::optional<BSONObj> object2 = boost::none,
+ boost::optional<repl::OpTime> opTime = boost::none) {
long long hash = 1LL;
- return repl::OplogEntry(optime, // optime
- hash, // hash
- opType, // opType
- nss, // namespace
- uuid, // uuid
- fromMigrate, // fromMigrate
- repl::OplogEntry::kOplogVersion, // version
- object, // o
- object2, // o2
- {}, // sessionInfo
- boost::none, // upsert
- boost::none, // wall clock time
- boost::none, // statement id
+ return repl::OplogEntry(opTime ? *opTime : kDefaultOpTime, // optime
+ hash, // hash
+ opType, // opType
+ nss, // namespace
+ uuid, // uuid
+ fromMigrate, // fromMigrate
+ repl::OplogEntry::kOplogVersion, // version
+ object, // o
+ object2, // o2
+ {}, // sessionInfo
+ boost::none, // upsert
+ boost::none, // wall clock time
+ boost::none, // statement id
boost::none, // optime of previous write within same transaction
boost::none, // pre-image optime
boost::none); // post-image optime
@@ -260,10 +263,11 @@ TEST_F(ChangeStreamStageTest, ShouldRejectBothResumeAfterClusterTimeAndResumeAft
ASSERT_THROWS_CODE(
DSChangeStream::createFromBson(
- BSON(DSChangeStream::kStageName << BSON(
- "resumeAfter" << makeResumeToken(ts, testUuid(), BSON("x" << 2 << "_id" << 1))
- << "$_resumeAfterClusterTime"
- << BSON("ts" << ts)))
+ BSON(DSChangeStream::kStageName
+ << BSON("resumeAfter"
+ << makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1))
+ << "$_resumeAfterClusterTime"
+ << BSON("ts" << kDefaultTs)))
.firstElement(),
expCtx),
AssertionException,
@@ -279,10 +283,11 @@ TEST_F(ChangeStreamStageTest, ShouldRejectBothStartAtClusterTimeAndResumeAfterOp
ASSERT_THROWS_CODE(
DSChangeStream::createFromBson(
- BSON(DSChangeStream::kStageName << BSON(
- "resumeAfter" << makeResumeToken(ts, testUuid(), BSON("x" << 2 << "_id" << 1))
- << "startAtClusterTime"
- << BSON("ts" << ts)))
+ BSON(DSChangeStream::kStageName
+ << BSON("resumeAfter"
+ << makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1))
+ << "startAtClusterTime"
+ << BSON("ts" << kDefaultTs)))
.firstElement(),
expCtx),
AssertionException,
@@ -296,15 +301,15 @@ TEST_F(ChangeStreamStageTest, ShouldRejectBothStartAtAndResumeAfterClusterTimeOp
Collection collection(stdx::make_unique<CollectionMock>(nss));
UUIDCatalog::get(expCtx->opCtx).onCreateCollection(expCtx->opCtx, &collection, testUuid());
- ASSERT_THROWS_CODE(
- DSChangeStream::createFromBson(
- BSON(DSChangeStream::kStageName
- << BSON("$_resumeAfterClusterTime" << BSON("ts" << ts) << "startAtClusterTime"
- << BSON("ts" << ts)))
- .firstElement(),
- expCtx),
- AssertionException,
- 50573);
+ ASSERT_THROWS_CODE(DSChangeStream::createFromBson(
+ BSON(DSChangeStream::kStageName
+ << BSON("$_resumeAfterClusterTime" << BSON("ts" << kDefaultTs)
+ << "startAtClusterTime"
+ << BSON("ts" << kDefaultTs)))
+ .firstElement(),
+ expCtx),
+ AssertionException,
+ 50573);
}
TEST_F(ChangeStreamStageTestNoSetup, FailsWithNoReplicationCoordinator) {
@@ -339,8 +344,10 @@ TEST_F(ChangeStreamStageTest, TransformInsertDocKeyXAndId) {
boost::none); // o2
Document expectedInsert{
- {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), BSON("x" << 2 << "_id" << 1))},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1))},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"x", 2}, {"_id", 1}}}, // Note _id <-> x reversal.
@@ -365,8 +372,10 @@ TEST_F(ChangeStreamStageTest, TransformInsertDocKeyIdAndX) {
boost::none); // o2
Document expectedInsert{
- {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), BSON("_id" << 1 << "x" << 2))},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << 1 << "x" << 2))},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kFullDocumentField, D{{"x", 2}, {"_id", 1}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, // _id first
@@ -383,8 +392,9 @@ TEST_F(ChangeStreamStageTest, TransformInsertDocKeyJustId) {
boost::none); // o2
Document expectedInsert{
- {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), BSON("_id" << 1))},
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << 1))},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}}},
@@ -416,8 +426,9 @@ TEST_F(ChangeStreamStageTest, TransformUpdateFields) {
// Update fields
Document expectedUpdateField{
- {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), o2)},
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
{
@@ -441,8 +452,9 @@ TEST_F(ChangeStreamStageTest, TransformUpdateFieldsLegacyNoId) {
// Update fields
Document expectedUpdateField{
- {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), o2)},
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"x", 1}, {"y", 1}}},
{
@@ -464,8 +476,9 @@ TEST_F(ChangeStreamStageTest, TransformRemoveFields) {
// Remove fields
Document expectedRemoveField{
- {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), o2)},
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{{"_id", 1}, {"x", 2}}}},
{
@@ -486,8 +499,9 @@ TEST_F(ChangeStreamStageTest, TransformReplace) {
// Replace
Document expectedReplace{
- {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), o2)},
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}, {"y", 1}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
@@ -506,8 +520,9 @@ TEST_F(ChangeStreamStageTest, TransformDelete) {
// Delete
Document expectedDelete{
- {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), o)},
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
};
@@ -547,8 +562,9 @@ TEST_F(ChangeStreamStageTest, TransformInvalidate) {
// Invalidate entry doesn't have a document id.
Document expectedInvalidate{
- {DSChangeStream::kIdField, makeResumeToken(ts, testUuid())},
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
};
for (auto& entry : {dropColl, rename}) {
checkTransformation(entry, expectedInvalidate);
@@ -556,8 +572,9 @@ TEST_F(ChangeStreamStageTest, TransformInvalidate) {
// Drop database invalidate entry doesn't have a UUID.
Document expectedInvalidateDropDatabase{
- {DSChangeStream::kIdField, makeResumeToken(ts)},
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
};
checkTransformation(dropDB, expectedInvalidateDropDatabase);
}
@@ -592,8 +609,9 @@ TEST_F(ChangeStreamStageTest, TransformInvalidateRenameDropTarget) {
boost::none); // o2
Document expectedInvalidate{
- {DSChangeStream::kIdField, makeResumeToken(ts, testUuid())},
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
};
checkTransformation(rename, expectedInvalidate);
}
@@ -608,12 +626,54 @@ TEST_F(ChangeStreamStageTest, TransformNewShardDetected) {
o2Field.toBson());
Document expectedNewShardDetected{
- {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), BSON("_id" << o2Field))},
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << o2Field))},
{DSChangeStream::kOperationTypeField, DSChangeStream::kNewShardDetectedOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
};
checkTransformation(newShardDetected, expectedNewShardDetected);
}
+TEST_F(ChangeStreamStageTest, ClusterTimeMatchesOplogEntry) {
+ const Timestamp ts(3, 45);
+ const long long term = 4;
+ const auto opTime = repl::OpTime(ts, term);
+
+ // Test the 'clusterTime' field is copied from the oplog entry for an update.
+ BSONObj o = BSON("$set" << BSON("y" << 1));
+ BSONObj o2 = BSON("_id" << 1 << "x" << 2);
+ auto updateField = makeOplogEntry(OpTypeEnum::kUpdate, // op type
+ nss, // namespace
+ o, // o
+ testUuid(), // uuid
+ boost::none, // fromMigrate
+ o2, // o2
+ opTime); // opTime
+
+ Document expectedUpdateField{
+ {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), o2)},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
+ {DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
+ {
+ "updateDescription", D{{"updatedFields", D{{"y", 1}}}, {"removedFields", vector<V>()}},
+ },
+ };
+ checkTransformation(updateField, expectedUpdateField);
+
+ // Test the 'clusterTime' field is copied from the oplog entry for an invalidation.
+ OplogEntry dropColl =
+ createCommand(BSON("drop" << nss.coll()), testUuid(), boost::none, opTime);
+
+ // Invalidate entry doesn't have a document id.
+ Document expectedInvalidate{
+ {DSChangeStream::kIdField, makeResumeToken(ts, testUuid())},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
+ {DSChangeStream::kClusterTimeField, ts},
+ };
+ checkTransformation(dropColl, expectedInvalidate);
+}
+
TEST_F(ChangeStreamStageTest, MatchFiltersCreateCollection) {
auto collSpec =
D{{"create", "foo"_sd},
@@ -699,8 +759,9 @@ TEST_F(ChangeStreamStageTest, CloseCursorOnInvalidateEntries) {
auto closeCursor = stages.back();
Document expectedInvalidate{
- {DSChangeStream::kIdField, makeResumeToken(ts, testUuid())},
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
};
auto next = closeCursor->getNext();
@@ -761,8 +822,10 @@ TEST_F(ChangeStreamStageDBTest, TransformInsert) {
auto insert = makeOplogEntry(OpTypeEnum::kInsert, nss, BSON("_id" << 1 << "x" << 2));
Document expectedInsert{
- {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), BSON("x" << 2 << "_id" << 1))},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1))},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"x", 2}, {"_id", 1}}}, // Note _id <-> x reversal.
@@ -777,8 +840,10 @@ TEST_F(ChangeStreamStageDBTest, InsertOnOtherCollections) {
// Insert on another collection in the same database.
Document expectedInsert{
- {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), BSON("x" << 2 << "_id" << 1))},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1))},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}}},
{DSChangeStream::kNamespaceField, D{{"db", otherNss.db()}, {"coll", otherNss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"x", 2}, {"_id", 1}}}, // Note _id <-> x reversal.
@@ -833,8 +898,9 @@ TEST_F(ChangeStreamStageDBTest, TransformsEntriesForLegalClientCollectionsWithSy
for (auto& ns : allowedNamespaces) {
auto insert = makeOplogEntry(OpTypeEnum::kInsert, ns, BSON("_id" << 1));
Document expectedInsert{
- {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), BSON("_id" << 1))},
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << 1))},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kFullDocumentField, D{{"_id", 1}}},
{DSChangeStream::kNamespaceField, D{{"db", ns.db()}, {"coll", ns.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}}},
@@ -849,8 +915,9 @@ TEST_F(ChangeStreamStageDBTest, TransformUpdateFields) {
auto updateField = makeOplogEntry(OpTypeEnum::kUpdate, nss, o, testUuid(), boost::none, o2);
Document expectedUpdateField{
- {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), o2)},
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
{"updateDescription", D{{"updatedFields", D{{"y", 1}}}, {"removedFields", vector<V>()}}},
@@ -870,8 +937,9 @@ TEST_F(ChangeStreamStageDBTest, TransformRemoveFields) {
// Remove fields
Document expectedRemoveField{
- {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), o2)},
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{{"_id", 1}, {"x", 2}}}},
{
@@ -892,8 +960,9 @@ TEST_F(ChangeStreamStageDBTest, TransformReplace) {
// Replace
Document expectedReplace{
- {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), o2)},
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}, {"y", 1}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
@@ -912,8 +981,9 @@ TEST_F(ChangeStreamStageDBTest, TransformDelete) {
// Delete
Document expectedDelete{
- {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), o)},
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
};
@@ -953,8 +1023,9 @@ TEST_F(ChangeStreamStageDBTest, TransformInvalidate) {
// Invalidate entry doesn't have a document id.
Document expectedInvalidate{
- {DSChangeStream::kIdField, makeResumeToken(ts, testUuid())},
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
};
for (auto& entry : {dropColl, rename}) {
checkTransformation(entry, expectedInvalidate);
@@ -962,8 +1033,9 @@ TEST_F(ChangeStreamStageDBTest, TransformInvalidate) {
// Drop database invalidate entry doesn't have a UUID.
Document expectedInvalidateDropDatabase{
- {DSChangeStream::kIdField, makeResumeToken(ts)},
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
};
checkTransformation(dropDB, expectedInvalidateDropDatabase);
}
@@ -972,8 +1044,9 @@ TEST_F(ChangeStreamStageDBTest, SystemCollectionsDropOrRenameShouldInvalidate) {
NamespaceString systemColl(nss.db() + ".system.users");
NamespaceString renamedSystemColl(nss.db() + ".system.users_new");
Document expectedInvalidate{
- {DSChangeStream::kIdField, makeResumeToken(ts, testUuid())},
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
};
OplogEntry dropColl = createCommand(BSON("drop" << systemColl.coll()), testUuid());