summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMickey. J Winters <mickey.winters@mongodb.com>2022-04-21 18:37:35 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-04-21 19:01:45 +0000
commit7d6df44172ec584a241315f64444e982c3016979 (patch)
treea53a9a154719e9b2faa1412c4ac28ae2e11d1b00
parent8b954eecbd5560075d21ff2243e5c1514f9a69e2 (diff)
downloadmongo-7d6df44172ec584a241315f64444e982c3016979.tar.gz
SERVER-65022 always report 'wallTime' in change stream event output
-rw-r--r--jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js6
-rw-r--r--jstests/change_streams/shell_helper.js1
-rw-r--r--src/mongo/db/pipeline/change_stream_event_transform.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp57
4 files changed, 69 insertions, 1 deletions
diff --git a/jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js b/jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js
index 43df7cfd1bb..d268938640b 100644
--- a/jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js
+++ b/jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js
@@ -266,6 +266,10 @@ for (let csConfig of [{fullDocument: "updateLookup"}]) {
for (let i = 0; i < nonOptimizedOutput.length; ++i) {
try {
assert(i < optimizedOutput.length);
+ if (optimizedOutput[i].hasOwnProperty("wallTime") &&
+ nonOptimizedOutput[i].hasOwnProperty("wallTime")) {
+ optimizedOutput[i].wallTime = nonOptimizedOutput[i].wallTime;
+ }
assert(friendlyEqual(optimizedOutput[i], nonOptimizedOutput[i]));
} catch (error) {
failedTestCases.push({
@@ -283,4 +287,4 @@ for (let csConfig of [{fullDocument: "updateLookup"}]) {
// Assert that there were no failed test cases.
assert(failedTestCases.length == 0, failedTestCases);
-})(); \ No newline at end of file
+})();
diff --git a/jstests/change_streams/shell_helper.js b/jstests/change_streams/shell_helper.js
index de7cf737280..65f40508694 100644
--- a/jstests/change_streams/shell_helper.js
+++ b/jstests/change_streams/shell_helper.js
@@ -73,6 +73,7 @@ resumeToken = change._id;
// Remove the fields we cannot predict, then test that the change is as expected.
delete change._id;
delete change.clusterTime;
+delete change.wallTime;
assert.docEq(change, expected);
jsTestLog("Testing watch() with pipeline");
diff --git a/src/mongo/db/pipeline/change_stream_event_transform.cpp b/src/mongo/db/pipeline/change_stream_event_transform.cpp
index 1731a99f328..12dfbc2ceb4 100644
--- a/src/mongo/db/pipeline/change_stream_event_transform.cpp
+++ b/src/mongo/db/pipeline/change_stream_event_transform.cpp
@@ -408,7 +408,13 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
// Note: If the UUID is a missing value (which can be true for events like 'dropDatabase'),
// 'addField' will not add anything to the document.
doc.addField(DocumentSourceChangeStream::kCollectionUuidField, uuid);
+ }
+ // Check if the FCV is <= 5.3
+ bool FCVLessThanEq53 = serverGlobalParams.featureCompatibility.isVersionInitialized() &&
+ serverGlobalParams.featureCompatibility.isLessThanOrEqualTo(
+ multiversion::FeatureCompatibilityVersion::kVersion_5_3);
+ if ((FCVLessThanEq53 && _changeStreamSpec.getShowExpandedEvents()) || !FCVLessThanEq53) {
const auto wallTime = input[repl::OplogEntry::kWallClockTimeFieldName];
checkValueType(wallTime, repl::OplogEntry::kWallClockTimeFieldName, BSONType::Date);
doc.addField(DocumentSourceChangeStream::kWallTimeField, wallTime);
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index ae9edbcd177..5d11196a08b 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -358,6 +358,7 @@ public:
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}}},
{
@@ -648,6 +649,7 @@ TEST_F(ChangeStreamStageTest, TransformInsertDocKeyXAndId) {
makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1))},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"x", 2}, {"_id", 1}}}, // Note _id <-> x reversal.
@@ -676,6 +678,7 @@ TEST_F(ChangeStreamStageTest, TransformInsertDocKeyIdAndX) {
makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << 1 << "x" << 2))},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"x", 2}, {"_id", 1}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, // _id first
@@ -695,6 +698,7 @@ TEST_F(ChangeStreamStageTest, TransformInsertDocKeyJustId) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << 1))},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}}},
@@ -729,6 +733,7 @@ TEST_F(ChangeStreamStageTest, TransformInsertFromMigrateShowMigrations) {
makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << 1 << "x" << 2))},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"x", 2}, {"_id", 1}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, // _id first
@@ -751,6 +756,7 @@ TEST_F(ChangeStreamStageTest, TransformUpdateFields) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
{
@@ -931,6 +937,7 @@ TEST_F(ChangeStreamStageTest, TransformUpdateFieldsLegacyNoId) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"x", 1}, {"y", 1}}},
{
@@ -956,6 +963,7 @@ TEST_F(ChangeStreamStageTest, TransformRemoveFields) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, Document{{"_id", 1}, {"x", 2}}},
{
@@ -980,6 +988,7 @@ TEST_F(ChangeStreamStageTest, TransformReplace) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}, {"y", 1}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
@@ -1025,6 +1034,7 @@ TEST_F(ChangeStreamStageTest, TransformDelete) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
};
@@ -1100,6 +1110,7 @@ TEST_F(ChangeStreamStageTest, TransformDeleteFromMigrateShowMigrations) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}}},
};
@@ -1114,6 +1125,7 @@ TEST_F(ChangeStreamStageTest, TransformDrop) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
};
Document expectedInvalidate{
@@ -1122,6 +1134,7 @@ TEST_F(ChangeStreamStageTest, TransformDrop) {
kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
};
checkTransformation(dropColl, expectedDrop, kDefaultSpec, expectedInvalidate);
@@ -1188,6 +1201,7 @@ TEST_F(ChangeStreamStageTest, TransformRename) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
};
Document expectedInvalidate{
@@ -1196,6 +1210,7 @@ TEST_F(ChangeStreamStageTest, TransformRename) {
kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
};
checkTransformation(rename, expectedRename, kDefaultSpec, expectedInvalidate);
@@ -1264,6 +1279,7 @@ TEST_F(ChangeStreamStageTest, TransformRenameTarget) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", otherColl.db()}, {"coll", otherColl.coll()}}},
};
Document expectedInvalidate{
@@ -1272,6 +1288,7 @@ TEST_F(ChangeStreamStageTest, TransformRenameTarget) {
kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
};
checkTransformation(rename, expectedRename, kDefaultSpec, expectedInvalidate);
@@ -1295,6 +1312,7 @@ TEST_F(ChangeStreamStageTest, TransformNewShardDetected) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << o2Field))},
{DSChangeStream::kOperationTypeField, DSChangeStream::kNewShardDetectedOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
};
getExpCtx()->needsMerge = true;
@@ -1322,6 +1340,7 @@ TEST_F(ChangeStreamStageTest, TransformReshardBegin) {
makeResumeToken(kDefaultTs, uuid, BSON("_id" << o2Field.toBSON()))},
{DSChangeStream::kOperationTypeField, DSChangeStream::kReshardBeginOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
};
checkTransformation(reshardingBegin, expectedReshardingBegin, spec);
}
@@ -1351,6 +1370,7 @@ TEST_F(ChangeStreamStageTest, TransformReshardDoneCatchUp) {
makeResumeToken(kDefaultTs, reshardingUuid, BSON("_id" << o2Field.toBSON()))},
{DSChangeStream::kOperationTypeField, DSChangeStream::kReshardDoneCatchUpOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
};
checkTransformation(reshardDoneCatchUp, expectedReshardingDoneCatchUp, spec);
@@ -1529,6 +1549,7 @@ TEST_F(ChangeStreamStageTest, CommitCommandReturnsOperationsFromPreparedTransact
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), BSONObj())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 123}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{}},
@@ -2268,6 +2289,7 @@ TEST_F(ChangeStreamStageTest, ClusterTimeMatchesOplogEntry) {
{DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), o2)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
{DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
{
@@ -2285,6 +2307,7 @@ TEST_F(ChangeStreamStageTest, ClusterTimeMatchesOplogEntry) {
{DSChangeStream::kIdField, makeResumeToken(ts, testUuid())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType},
{DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
};
checkTransformation(dropColl, expectedDrop);
@@ -2303,6 +2326,7 @@ TEST_F(ChangeStreamStageTest, ClusterTimeMatchesOplogEntry) {
{DSChangeStream::kIdField, makeResumeToken(ts, testUuid())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType},
{DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
};
checkTransformation(rename, expectedRename);
@@ -2517,6 +2541,7 @@ TEST_F(ChangeStreamStageTest, CloseCursorOnInvalidateEntries) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
};
Document expectedInvalidate{
@@ -2525,6 +2550,7 @@ TEST_F(ChangeStreamStageTest, CloseCursorOnInvalidateEntries) {
kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
};
auto next = lastStage->getNext();
@@ -2578,6 +2604,7 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldIncludeShardKeyFromResumeTokenWhe
{DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 2}, {"shardKey", 3}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 2}, {"shardKey", 3}}},
@@ -2621,6 +2648,7 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldPrioritizeO2FieldOverDocumentKeyC
{DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 2}, {"shardKey", 3}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 2}, {"shardKey", 3}}},
@@ -2663,6 +2691,7 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPres
{DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 2}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 2}}},
@@ -2707,6 +2736,7 @@ TEST_F(ChangeStreamStageTest, RenameFromSystemToUserCollectionShouldIncludeNotif
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", systemColl.db()}, {"coll", systemColl.coll()}}},
};
checkTransformation(rename, expectedRename);
@@ -2725,6 +2755,7 @@ TEST_F(ChangeStreamStageTest, RenameFromUserToSystemCollectionShouldIncludeNotif
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
};
checkTransformation(rename, expectedRename);
@@ -2798,6 +2829,7 @@ TEST_F(ChangeStreamStageDBTest, TransformInsert) {
makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1))},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"x", 2}, {"_id", 1}}}, // Note _id <-> x reversal.
@@ -2842,6 +2874,7 @@ TEST_F(ChangeStreamStageDBTest, InsertOnOtherCollections) {
makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1))},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}}},
{DSChangeStream::kNamespaceField, D{{"db", otherNss.db()}, {"coll", otherNss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"x", 2}, {"_id", 1}}}, // Note _id <-> x reversal.
@@ -2900,6 +2933,7 @@ TEST_F(ChangeStreamStageDBTest, TransformsEntriesForLegalClientCollectionsWithSy
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << 1))},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 1}}},
{DSChangeStream::kNamespaceField, D{{"db", ns.db()}, {"coll", ns.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}}},
@@ -2917,6 +2951,7 @@ TEST_F(ChangeStreamStageDBTest, TransformUpdateFields) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
{"updateDescription", D{{"updatedFields", D{{"y", 1}}}, {"removedFields", vector<V>()}}},
@@ -2939,6 +2974,7 @@ TEST_F(ChangeStreamStageDBTest, TransformRemoveFields) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
{
@@ -2963,6 +2999,7 @@ TEST_F(ChangeStreamStageDBTest, TransformReplace) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}, {"y", 1}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
@@ -2984,6 +3021,7 @@ TEST_F(ChangeStreamStageDBTest, TransformDelete) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
};
@@ -3028,6 +3066,7 @@ TEST_F(ChangeStreamStageDBTest, TransformDeleteFromMigrateShowMigrations) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
};
@@ -3041,6 +3080,7 @@ TEST_F(ChangeStreamStageDBTest, TransformDrop) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
};
checkTransformation(dropColl, expectedDrop);
@@ -3057,6 +3097,7 @@ TEST_F(ChangeStreamStageDBTest, TransformRename) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
};
checkTransformation(rename, expectedRename);
@@ -3070,6 +3111,7 @@ TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDropDatabaseOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}}},
};
Document expectedInvalidate{
@@ -3078,6 +3120,7 @@ TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) {
kDefaultTs, Value(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
};
checkTransformation(dropDB, expectedDropDatabase, kDefaultSpec, expectedInvalidate);
@@ -3153,6 +3196,7 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForDelete) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, documentKey},
};
@@ -3166,6 +3210,7 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForDelete) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, documentKey},
{DSChangeStream::kFullDocumentBeforeChangeField, preImageObj},
@@ -3245,6 +3290,7 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForUpdate) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, documentKey},
{
@@ -3262,6 +3308,7 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForUpdate) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, documentKey},
{
@@ -3345,6 +3392,7 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForReplace) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, replacementDoc},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, documentKey},
@@ -3359,6 +3407,7 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForReplace) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, replacementDoc},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, documentKey},
@@ -3421,6 +3470,7 @@ TEST_F(ChangeStreamStageDBTest, RenameFromSystemToUserCollectionShouldIncludeNot
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", systemColl.db()}, {"coll", systemColl.coll()}}},
};
checkTransformation(rename, expectedRename);
@@ -3439,6 +3489,7 @@ TEST_F(ChangeStreamStageDBTest, RenameFromUserToSystemCollectionShouldIncludeNot
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
};
checkTransformation(rename, expectedRename);
@@ -3480,6 +3531,7 @@ TEST_F(ChangeStreamStageDBTest,
{DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 2}, {"shardKey", 3}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 2}, {"shardKey", 3}}},
@@ -3518,6 +3570,7 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldPrioritizeO2FieldOverDocumentKe
{DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 2}, {"shardKey", 3}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 2}, {"shardKey", 3}}},
@@ -3556,6 +3609,7 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPr
{DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 2}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 2}}},
@@ -3593,6 +3647,7 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyIfResumeToken
{DSChangeStream::kIdField, makeResumeToken(ts, uuid, BSON("_id" << 2))},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 2}, {"shardKey", 3}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 2}}},
@@ -3645,6 +3700,7 @@ TEST_F(ChangeStreamStageDBTest, ResumeAfterWithTokenFromDropDatabase) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), insertDoc)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 2}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 2}}},
@@ -3673,6 +3729,7 @@ TEST_F(ChangeStreamStageDBTest, StartAfterSucceedsEvenIfResumeTokenDoesNotContai
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, uuid, insertDoc)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 2}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 2}}},