summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--etc/backports_required_for_multiversion_tests.yml4
-rw-r--r--jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js6
-rw-r--r--jstests/change_streams/shell_helper.js1
-rw-r--r--src/mongo/db/pipeline/change_stream_event_transform.cpp8
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp57
5 files changed, 71 insertions, 5 deletions
diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml
index 958a60e5234..6235d5f59c3 100644
--- a/etc/backports_required_for_multiversion_tests.yml
+++ b/etc/backports_required_for_multiversion_tests.yml
@@ -204,6 +204,8 @@ last-continuous:
test_file: jstests/sharding/resharding_change_stream_namespace_filtering.js
- ticket: SERVER-65261
test_file: jstests/replsets/capped_deletes.js
+ - ticket: SERVER-65022
+ test_file: jstests/sharding/change_stream_shard_failover.js
# Tests that should only be excluded from particular suites should be listed under that suite.
suites:
@@ -543,6 +545,8 @@ last-lts:
test_file: jstests/sharding/refine_collection_shard_key_basic.js
- ticket: SERVER-65261
test_file: jstests/replsets/capped_deletes.js
+ - ticket: SERVER-65022
+ test_file: jstests/sharding/change_stream_shard_failover.js
# Tests that should only be excluded from particular suites should be listed under that suite.
suites:
diff --git a/jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js b/jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js
index 43df7cfd1bb..d268938640b 100644
--- a/jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js
+++ b/jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js
@@ -266,6 +266,10 @@ for (let csConfig of [{fullDocument: "updateLookup"}]) {
for (let i = 0; i < nonOptimizedOutput.length; ++i) {
try {
assert(i < optimizedOutput.length);
+ if (optimizedOutput[i].hasOwnProperty("wallTime") &&
+ nonOptimizedOutput[i].hasOwnProperty("wallTime")) {
+ optimizedOutput[i].wallTime = nonOptimizedOutput[i].wallTime;
+ }
assert(friendlyEqual(optimizedOutput[i], nonOptimizedOutput[i]));
} catch (error) {
failedTestCases.push({
@@ -283,4 +287,4 @@ for (let csConfig of [{fullDocument: "updateLookup"}]) {
// Assert that there were no failed test cases.
assert(failedTestCases.length == 0, failedTestCases);
-})(); \ No newline at end of file
+})();
diff --git a/jstests/change_streams/shell_helper.js b/jstests/change_streams/shell_helper.js
index de7cf737280..65f40508694 100644
--- a/jstests/change_streams/shell_helper.js
+++ b/jstests/change_streams/shell_helper.js
@@ -73,6 +73,7 @@ resumeToken = change._id;
// Remove the fields we cannot predict, then test that the change is as expected.
delete change._id;
delete change.clusterTime;
+delete change.wallTime;
assert.docEq(change, expected);
jsTestLog("Testing watch() with pipeline");
diff --git a/src/mongo/db/pipeline/change_stream_event_transform.cpp b/src/mongo/db/pipeline/change_stream_event_transform.cpp
index 1731a99f328..644ced702c7 100644
--- a/src/mongo/db/pipeline/change_stream_event_transform.cpp
+++ b/src/mongo/db/pipeline/change_stream_event_transform.cpp
@@ -408,12 +408,12 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
// Note: If the UUID is a missing value (which can be true for events like 'dropDatabase'),
// 'addField' will not add anything to the document.
doc.addField(DocumentSourceChangeStream::kCollectionUuidField, uuid);
-
- const auto wallTime = input[repl::OplogEntry::kWallClockTimeFieldName];
- checkValueType(wallTime, repl::OplogEntry::kWallClockTimeFieldName, BSONType::Date);
- doc.addField(DocumentSourceChangeStream::kWallTimeField, wallTime);
}
+ const auto wallTime = input[repl::OplogEntry::kWallClockTimeFieldName];
+ checkValueType(wallTime, repl::OplogEntry::kWallClockTimeFieldName, BSONType::Date);
+ doc.addField(DocumentSourceChangeStream::kWallTimeField, wallTime);
+
// Invalidation, topology change, and resharding events have fewer fields.
if (operationType == DocumentSourceChangeStream::kInvalidateOpType ||
operationType == DocumentSourceChangeStream::kNewShardDetectedOpType ||
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index ae9edbcd177..5d11196a08b 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -358,6 +358,7 @@ public:
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}}},
{
@@ -648,6 +649,7 @@ TEST_F(ChangeStreamStageTest, TransformInsertDocKeyXAndId) {
makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1))},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"x", 2}, {"_id", 1}}}, // Note _id <-> x reversal.
@@ -676,6 +678,7 @@ TEST_F(ChangeStreamStageTest, TransformInsertDocKeyIdAndX) {
makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << 1 << "x" << 2))},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"x", 2}, {"_id", 1}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, // _id first
@@ -695,6 +698,7 @@ TEST_F(ChangeStreamStageTest, TransformInsertDocKeyJustId) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << 1))},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}}},
@@ -729,6 +733,7 @@ TEST_F(ChangeStreamStageTest, TransformInsertFromMigrateShowMigrations) {
makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << 1 << "x" << 2))},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"x", 2}, {"_id", 1}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, // _id first
@@ -751,6 +756,7 @@ TEST_F(ChangeStreamStageTest, TransformUpdateFields) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
{
@@ -931,6 +937,7 @@ TEST_F(ChangeStreamStageTest, TransformUpdateFieldsLegacyNoId) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"x", 1}, {"y", 1}}},
{
@@ -956,6 +963,7 @@ TEST_F(ChangeStreamStageTest, TransformRemoveFields) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, Document{{"_id", 1}, {"x", 2}}},
{
@@ -980,6 +988,7 @@ TEST_F(ChangeStreamStageTest, TransformReplace) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}, {"y", 1}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
@@ -1025,6 +1034,7 @@ TEST_F(ChangeStreamStageTest, TransformDelete) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
};
@@ -1100,6 +1110,7 @@ TEST_F(ChangeStreamStageTest, TransformDeleteFromMigrateShowMigrations) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}}},
};
@@ -1114,6 +1125,7 @@ TEST_F(ChangeStreamStageTest, TransformDrop) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
};
Document expectedInvalidate{
@@ -1122,6 +1134,7 @@ TEST_F(ChangeStreamStageTest, TransformDrop) {
kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
};
checkTransformation(dropColl, expectedDrop, kDefaultSpec, expectedInvalidate);
@@ -1188,6 +1201,7 @@ TEST_F(ChangeStreamStageTest, TransformRename) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
};
Document expectedInvalidate{
@@ -1196,6 +1210,7 @@ TEST_F(ChangeStreamStageTest, TransformRename) {
kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
};
checkTransformation(rename, expectedRename, kDefaultSpec, expectedInvalidate);
@@ -1264,6 +1279,7 @@ TEST_F(ChangeStreamStageTest, TransformRenameTarget) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", otherColl.db()}, {"coll", otherColl.coll()}}},
};
Document expectedInvalidate{
@@ -1272,6 +1288,7 @@ TEST_F(ChangeStreamStageTest, TransformRenameTarget) {
kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
};
checkTransformation(rename, expectedRename, kDefaultSpec, expectedInvalidate);
@@ -1295,6 +1312,7 @@ TEST_F(ChangeStreamStageTest, TransformNewShardDetected) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << o2Field))},
{DSChangeStream::kOperationTypeField, DSChangeStream::kNewShardDetectedOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
};
getExpCtx()->needsMerge = true;
@@ -1322,6 +1340,7 @@ TEST_F(ChangeStreamStageTest, TransformReshardBegin) {
makeResumeToken(kDefaultTs, uuid, BSON("_id" << o2Field.toBSON()))},
{DSChangeStream::kOperationTypeField, DSChangeStream::kReshardBeginOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
};
checkTransformation(reshardingBegin, expectedReshardingBegin, spec);
}
@@ -1351,6 +1370,7 @@ TEST_F(ChangeStreamStageTest, TransformReshardDoneCatchUp) {
makeResumeToken(kDefaultTs, reshardingUuid, BSON("_id" << o2Field.toBSON()))},
{DSChangeStream::kOperationTypeField, DSChangeStream::kReshardDoneCatchUpOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
};
checkTransformation(reshardDoneCatchUp, expectedReshardingDoneCatchUp, spec);
@@ -1529,6 +1549,7 @@ TEST_F(ChangeStreamStageTest, CommitCommandReturnsOperationsFromPreparedTransact
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), BSONObj())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 123}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{}},
@@ -2268,6 +2289,7 @@ TEST_F(ChangeStreamStageTest, ClusterTimeMatchesOplogEntry) {
{DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), o2)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
{DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
{
@@ -2285,6 +2307,7 @@ TEST_F(ChangeStreamStageTest, ClusterTimeMatchesOplogEntry) {
{DSChangeStream::kIdField, makeResumeToken(ts, testUuid())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType},
{DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
};
checkTransformation(dropColl, expectedDrop);
@@ -2303,6 +2326,7 @@ TEST_F(ChangeStreamStageTest, ClusterTimeMatchesOplogEntry) {
{DSChangeStream::kIdField, makeResumeToken(ts, testUuid())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType},
{DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
};
checkTransformation(rename, expectedRename);
@@ -2517,6 +2541,7 @@ TEST_F(ChangeStreamStageTest, CloseCursorOnInvalidateEntries) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
};
Document expectedInvalidate{
@@ -2525,6 +2550,7 @@ TEST_F(ChangeStreamStageTest, CloseCursorOnInvalidateEntries) {
kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
};
auto next = lastStage->getNext();
@@ -2578,6 +2604,7 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldIncludeShardKeyFromResumeTokenWhe
{DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 2}, {"shardKey", 3}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 2}, {"shardKey", 3}}},
@@ -2621,6 +2648,7 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldPrioritizeO2FieldOverDocumentKeyC
{DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 2}, {"shardKey", 3}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 2}, {"shardKey", 3}}},
@@ -2663,6 +2691,7 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPres
{DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 2}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 2}}},
@@ -2707,6 +2736,7 @@ TEST_F(ChangeStreamStageTest, RenameFromSystemToUserCollectionShouldIncludeNotif
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", systemColl.db()}, {"coll", systemColl.coll()}}},
};
checkTransformation(rename, expectedRename);
@@ -2725,6 +2755,7 @@ TEST_F(ChangeStreamStageTest, RenameFromUserToSystemCollectionShouldIncludeNotif
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
};
checkTransformation(rename, expectedRename);
@@ -2798,6 +2829,7 @@ TEST_F(ChangeStreamStageDBTest, TransformInsert) {
makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1))},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"x", 2}, {"_id", 1}}}, // Note _id <-> x reversal.
@@ -2842,6 +2874,7 @@ TEST_F(ChangeStreamStageDBTest, InsertOnOtherCollections) {
makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1))},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}}},
{DSChangeStream::kNamespaceField, D{{"db", otherNss.db()}, {"coll", otherNss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"x", 2}, {"_id", 1}}}, // Note _id <-> x reversal.
@@ -2900,6 +2933,7 @@ TEST_F(ChangeStreamStageDBTest, TransformsEntriesForLegalClientCollectionsWithSy
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << 1))},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 1}}},
{DSChangeStream::kNamespaceField, D{{"db", ns.db()}, {"coll", ns.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}}},
@@ -2917,6 +2951,7 @@ TEST_F(ChangeStreamStageDBTest, TransformUpdateFields) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
{"updateDescription", D{{"updatedFields", D{{"y", 1}}}, {"removedFields", vector<V>()}}},
@@ -2939,6 +2974,7 @@ TEST_F(ChangeStreamStageDBTest, TransformRemoveFields) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
{
@@ -2963,6 +2999,7 @@ TEST_F(ChangeStreamStageDBTest, TransformReplace) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}, {"y", 1}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
@@ -2984,6 +3021,7 @@ TEST_F(ChangeStreamStageDBTest, TransformDelete) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
};
@@ -3028,6 +3066,7 @@ TEST_F(ChangeStreamStageDBTest, TransformDeleteFromMigrateShowMigrations) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
};
@@ -3041,6 +3080,7 @@ TEST_F(ChangeStreamStageDBTest, TransformDrop) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
};
checkTransformation(dropColl, expectedDrop);
@@ -3057,6 +3097,7 @@ TEST_F(ChangeStreamStageDBTest, TransformRename) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
};
checkTransformation(rename, expectedRename);
@@ -3070,6 +3111,7 @@ TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDropDatabaseOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}}},
};
Document expectedInvalidate{
@@ -3078,6 +3120,7 @@ TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) {
kDefaultTs, Value(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
};
checkTransformation(dropDB, expectedDropDatabase, kDefaultSpec, expectedInvalidate);
@@ -3153,6 +3196,7 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForDelete) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, documentKey},
};
@@ -3166,6 +3210,7 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForDelete) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, documentKey},
{DSChangeStream::kFullDocumentBeforeChangeField, preImageObj},
@@ -3245,6 +3290,7 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForUpdate) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, documentKey},
{
@@ -3262,6 +3308,7 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForUpdate) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, documentKey},
{
@@ -3345,6 +3392,7 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForReplace) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, replacementDoc},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, documentKey},
@@ -3359,6 +3407,7 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForReplace) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, replacementDoc},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, documentKey},
@@ -3421,6 +3470,7 @@ TEST_F(ChangeStreamStageDBTest, RenameFromSystemToUserCollectionShouldIncludeNot
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", systemColl.db()}, {"coll", systemColl.coll()}}},
};
checkTransformation(rename, expectedRename);
@@ -3439,6 +3489,7 @@ TEST_F(ChangeStreamStageDBTest, RenameFromUserToSystemCollectionShouldIncludeNot
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
};
checkTransformation(rename, expectedRename);
@@ -3480,6 +3531,7 @@ TEST_F(ChangeStreamStageDBTest,
{DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 2}, {"shardKey", 3}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 2}, {"shardKey", 3}}},
@@ -3518,6 +3570,7 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldPrioritizeO2FieldOverDocumentKe
{DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 2}, {"shardKey", 3}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 2}, {"shardKey", 3}}},
@@ -3556,6 +3609,7 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPr
{DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 2}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 2}}},
@@ -3593,6 +3647,7 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyIfResumeToken
{DSChangeStream::kIdField, makeResumeToken(ts, uuid, BSON("_id" << 2))},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 2}, {"shardKey", 3}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 2}}},
@@ -3645,6 +3700,7 @@ TEST_F(ChangeStreamStageDBTest, ResumeAfterWithTokenFromDropDatabase) {
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), insertDoc)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 2}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 2}}},
@@ -3673,6 +3729,7 @@ TEST_F(ChangeStreamStageDBTest, StartAfterSucceedsEvenIfResumeTokenDoesNotContai
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, uuid, insertDoc)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kFullDocumentField, D{{"_id", 2}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 2}}},