diff options
author | Arun Banala <arun.banala@mongodb.com> | 2022-04-21 17:38:00 +0100 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-04-28 10:17:50 +0000 |
commit | ae0241b7e9f6ce79d4cc2e8141d5ba88178369c8 (patch) | |
tree | b4d734bc77a5a571083b2e9e96c5e38a233eaffd | |
parent | 782e90f7ae48b583c0a33d555286ef035f35e396 (diff) | |
download | mongo-ae0241b7e9f6ce79d4cc2e8141d5ba88178369c8.tar.gz |
SERVER-64970 Start generating new v2 resume token format for all events by default
20 files changed, 459 insertions, 368 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_v2_resume_token_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_v1_resume_token_passthrough.yml index 4179f872b7f..1e122fc5fd9 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_v2_resume_token_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_v1_resume_token_passthrough.yml @@ -1,11 +1,10 @@ test_kind: js_test -# TODO SERVER-65370: remove this suite when v2 tokens become the default, or rework it to test v1. selector: roots: - jstests/change_streams/**/*.js exclude_files: # This test explicitly compares v1 and v2 tokens, and must be able to generate the former. - - jstests/change_streams/generate_v2_resume_token.js + - jstests/change_streams/generate_v1_resume_token.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the @@ -40,7 +39,7 @@ executor: var testingReplication = true; load('jstests/libs/override_methods/set_read_and_write_concerns.js'); load('jstests/libs/override_methods/enable_causal_consistency_without_read_pref.js'); - load('jstests/libs/override_methods/implicit_v2_resume_token_changestreams.js'); + load('jstests/libs/override_methods/implicit_v1_resume_token_changestreams.js'); hooks: # The CheckReplDBHash hook waits until all operations have replicated to and have been applied # on the secondaries, so we run the ValidateCollections hook after it to ensure we're diff --git a/buildscripts/resmokeconfig/suites/change_streams_v2_resume_token_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_v1_resume_token_sharded_collections_passthrough.yml index a0f93f572b0..90e6eccefb1 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_v2_resume_token_sharded_collections_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_v1_resume_token_sharded_collections_passthrough.yml @@ -1,11 +1,10 @@ test_kind: js_test -# TODO SERVER-65370: remove this suite when v2 tokens become the default, or rework it to test v1. selector: roots: - jstests/change_streams/**/*.js exclude_files: # This test explicitly compares v1 and v2 tokens, and must be able to generate the former. - - jstests/change_streams/generate_v2_resume_token.js + - jstests/change_streams/generate_v1_resume_token.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the @@ -37,7 +36,7 @@ executor: load('jstests/libs/override_methods/set_read_and_write_concerns.js'); load('jstests/libs/override_methods/implicitly_shard_accessed_collections.js'); load('jstests/libs/override_methods/enable_causal_consistency_without_read_pref.js'); - load('jstests/libs/override_methods/implicit_v2_resume_token_changestreams.js'); + load('jstests/libs/override_methods/implicit_v1_resume_token_changestreams.js'); hooks: - class: CheckReplDBHash - class: ValidateCollections diff --git a/etc/evergreen_yml_components/definitions.yml b/etc/evergreen_yml_components/definitions.yml index 234f6834e4a..546e7530984 100644 --- a/etc/evergreen_yml_components/definitions.yml +++ b/etc/evergreen_yml_components/definitions.yml @@ -4203,7 +4203,7 @@ tasks: - func: "run tests" - <<: *task_template - name: change_streams_v2_resume_token_passthrough + name: change_streams_v1_resume_token_passthrough tags: ["change_streams"] commands: - func: "do setup" @@ -4267,7 +4267,7 @@ tasks: - func: "run tests" - <<: *task_template - name: change_streams_v2_resume_token_sharded_collections_passthrough + name: change_streams_v1_resume_token_sharded_collections_passthrough tags: ["change_streams"] depends_on: - name: change_streams diff --git a/jstests/aggregation/api_version_stage_allowance_checks.js b/jstests/aggregation/api_version_stage_allowance_checks.js index f47b27fd630..1ef9d090e3f 100644 --- a/jstests/aggregation/api_version_stage_allowance_checks.js +++ b/jstests/aggregation/api_version_stage_allowance_checks.js @@ -149,7 +149,7 @@ result = testDB.runCommand({ pipeline: [{$project: {_id: 0}}], cursor: {}, writeConcern: {w: "majority"}, - $_generateV2ResumeTokens: true, + $_generateV2ResumeTokens: false, apiVersion: "1", apiStrict: true }); diff --git a/jstests/change_streams/generate_v2_resume_token.js b/jstests/change_streams/generate_v1_resume_token.js index 016b127d33b..05ebc2f6d92 100644 --- a/jstests/change_streams/generate_v2_resume_token.js +++ b/jstests/change_streams/generate_v1_resume_token.js @@ -1,8 +1,8 @@ /** - * Test that the $_generateV2ResumeTokens parameter can be used to force change streams to return v2 + * Test that the $_generateV2ResumeTokens parameter can be used to force change streams to return v1 * tokens. * @tags: [ - * requires_fcv_60 + * requires_fcv_61 * ] */ (function() { @@ -12,11 +12,11 @@ load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateC const coll = assertDropAndRecreateCollection(db, jsTestName()); -// Create one stream that returns v1 tokens, the default. -const v1Stream = coll.watch([]); +// Create one stream that returns v2 tokens, the default. +const v2Stream = coll.watch([]); -// Create a second stream that explicitly requests v2 tokens. -const v2Stream = coll.watch([], {$_generateV2ResumeTokens: true}); +// Create a second stream that explicitly requests v1 tokens. +const v1Stream = coll.watch([], {$_generateV2ResumeTokens: false}); // Insert a test document into the collection. assert.commandWorked(coll.insert({_id: 1})); @@ -34,5 +34,5 @@ delete v1Event._id; delete v2Event._id; assert.docEq(v1Event, v2Event); -assert.neq(v1ResumeToken, v2ResumeToken); +assert.neq(v1ResumeToken, v2ResumeToken, {v1ResumeToken, v2ResumeToken}); })();
\ No newline at end of file diff --git a/jstests/libs/override_methods/implicit_v2_resume_token_changestreams.js b/jstests/libs/override_methods/implicit_v1_resume_token_changestreams.js index 068d121fff7..b0fc5dc0de8 100644 --- a/jstests/libs/override_methods/implicit_v2_resume_token_changestreams.js +++ b/jstests/libs/override_methods/implicit_v1_resume_token_changestreams.js @@ -1,7 +1,6 @@ /** * Loading this file overrides 'runCommand' with a function that modifies any $changeStream - * aggregation to use $_generateV2ResumeTokens:true. - * TODO SERVER-65370: remove this suite when v2 tokens become the default, or rework it to test v1. + * aggregation to use $_generateV2ResumeTokens:false. */ (function() { "use strict"; @@ -9,18 +8,18 @@ load("jstests/libs/override_methods/override_helpers.js"); // For 'OverrideHelpers'. // Override runCommand to set $_generateV2ResumeTokens on all $changeStreams. -function runCommandV2Tokens(conn, dbName, cmdName, cmdObj, originalRunCommand, makeRunCommandArgs) { +function runCommandV1Tokens(conn, dbName, cmdName, cmdObj, originalRunCommand, makeRunCommandArgs) { if (OverrideHelpers.isAggregationWithChangeStreamStage(cmdName, cmdObj)) { // Make a copy to avoid mutating the user's original command object. - cmdObj = Object.assign({}, cmdObj, {$_generateV2ResumeTokens: true}); + cmdObj = Object.assign({}, cmdObj, {$_generateV2ResumeTokens: false}); } return originalRunCommand.apply(conn, makeRunCommandArgs(cmdObj)); } // Always apply the override if a test spawns a parallel shell. OverrideHelpers.prependOverrideInParallelShell( - "jstests/libs/override_methods/implicit_v2_resume_token_changestreams.js"); + "jstests/libs/override_methods/implicit_v1_resume_token_changestreams.js"); // Override the default runCommand with our custom version. -OverrideHelpers.overrideRunCommand(runCommandV2Tokens); +OverrideHelpers.overrideRunCommand(runCommandV1Tokens); })();
\ No newline at end of file diff --git a/jstests/noPassthrough/change_stream_only_generates_v1_tokens_with_test_commands_disabled.js b/jstests/noPassthrough/change_stream_generate_v2_tokens_flag_with_test_commands_disabled.js index 2a1fe1fbad0..5b1585da5c1 100644 --- a/jstests/noPassthrough/change_stream_only_generates_v1_tokens_with_test_commands_disabled.js +++ b/jstests/noPassthrough/change_stream_generate_v2_tokens_flag_with_test_commands_disabled.js @@ -1,5 +1,6 @@ /** - * Test that the $_generateV2ResumeTokens parameter cannot be used when test commands are disabled. + * Test that the $_generateV2ResumeTokens parameter cannot be used on mongoS when test commands are + * disabled. * @tags: [ * uses_change_streams, * requires_sharding, @@ -16,12 +17,19 @@ TestData.enableTestCommands = false; // Create a sharding fixture with test commands disabled. const st = new ShardingTest({shards: 1, rs: {nodes: 1}}); -// Confirm that attempting to run change streams with $_generateV2ResumeTokens:true fails on mongos. +// Confirm that attempting to set any values for $_generateV2ResumeTokens field fails on mongos. assert.throwsWithCode(() => st.s.watch([], {$_generateV2ResumeTokens: true}).hasNext(), 6528201); +assert.throwsWithCode(() => st.s.watch([], {$_generateV2ResumeTokens: false}).hasNext(), 6528201); // Confirm that attempting to run change streams with $_generateV2ResumeTokens:true fails on shards. assert.throwsWithCode( () => st.rs0.getPrimary().watch([], {$_generateV2ResumeTokens: true}).hasNext(), 6528200); +// Explicity requesting v1 tokens is allowed on a shard. This is to allow a 6.0 mongoS to +// communicate with a 7.0 shard. +const stream = st.rs0.getPrimary().watch([], {$_generateV2ResumeTokens: false}); +assert.commandWorked(st.s.getDB("test")["coll"].insert({x: 1})); +assert.soon(() => stream.hasNext()); + st.stop(); })();
\ No newline at end of file diff --git a/jstests/noPassthrough/change_stream_mongos_explicitly_requests_v1_resume_token.js b/jstests/noPassthrough/change_stream_mongos_with_generate_v2_resume_tokens_flag.js index 5139bff5b56..193c7e76825 100644 --- a/jstests/noPassthrough/change_stream_mongos_explicitly_requests_v1_resume_token.js +++ b/jstests/noPassthrough/change_stream_mongos_with_generate_v2_resume_tokens_flag.js @@ -1,9 +1,8 @@ /** - * Test that mongoS explicitly sets the value of $_generateV2ResumeTokens to 'false' on the commands - * it sends to the shards if no value was specified by the client, and that if a value was - * specified, it forwards it to the shards. On a replica set, no explicit value is set; the - * aggregation simply treats it as default-false. - * TODO SERVER-65370: remove or rework this test when v2 tokens become the default. + * Test that mongoS does not set the value of $_generateV2ResumeTokens on the commands it sends to + * the shards, if no value was specified by the client. If a value was specified, mongoS forwards it + * to the shards. On a replica set, no explicit value is set; the aggregation simply treats it as + * default-true. * @tags: [ * uses_change_streams, * requires_sharding, @@ -34,24 +33,24 @@ const configColl = configDB.shards; assert.commandWorked(shardDB.setProfilingLevel(2)); assert.commandWorked(configDB.setProfilingLevel(2)); -// Create one stream on mongoS that returns v1 tokens, the default. -const v1MongosStream = mongosColl.watch([], {comment: "v1MongosStream"}); +// Create one stream on mongoS that returns v2 tokens, the default. +const v2MongosStream = mongosColl.watch([], {comment: "v2MongosStream"}); -// Create a second stream on mongoS that explicitly requests v2 tokens. -const v2MongosStream = - mongosColl.watch([], {comment: "v2MongosStream", $_generateV2ResumeTokens: true}); +// Create a second stream on mongoS that explicitly requests v1 tokens. +const v1MongosStream = + mongosColl.watch([], {comment: "v1MongosStream", $_generateV2ResumeTokens: false}); -// Create a stream directly on the shard which returns the default v1 tokens. -const v1ShardStream = shardColl.watch([], {comment: "v1ShardStream"}); +// Create a stream directly on the shard which returns the default v2 tokens. +const v2ShardStream = shardColl.watch([], {comment: "v2ShardStream"}); // Insert a test document into the collection. assert.commandWorked(mongosColl.insert({_id: 1})); // Wait until all streams have encountered the insert operation. -assert.soon(() => v1MongosStream.hasNext() && v2MongosStream.hasNext() && v1ShardStream.hasNext()); +assert.soon(() => v1MongosStream.hasNext() && v2MongosStream.hasNext() && v2ShardStream.hasNext()); -// Confirm that in a sharded cluster, mongoS explicitly sets $_generateV2ResumeTokens to false on -// the command that it sends to the shards if nothing was specified by the client. +// Confirm that in a sharded cluster, when v1 token is explicitly requested, mongoS fowards +// $_generateV2ResumeTokens:false to the shard. profilerHasAtLeastOneMatchingEntryOrThrow({ profileDB: shardDB, filter: { @@ -72,35 +71,36 @@ profilerHasAtLeastOneMatchingEntryOrThrow({ } }); -// Confirm that mongoS correctly forwards the value of $_generateV2ResumeTokens to the shards if it -// is specified by the client. +// Confirm that mongoS never sets the $_generateV2ResumeTokens field when client didn't explicitly +// specify. profilerHasAtLeastOneMatchingEntryOrThrow({ profileDB: shardDB, filter: { "originatingCommand.aggregate": mongosColl.getName(), "originatingCommand.comment": "v2MongosStream", - "originatingCommand.$_generateV2ResumeTokens": true + "originatingCommand.$_generateV2ResumeTokens": {$exists: false} } }); -// Confirm that we also forward the value of $_generateV2ResumeTokens to the config servers. +// Confirm that we also do not set the $_generateV2ResumeTokens field on the request sent to the +// config server. profilerHasAtLeastOneMatchingEntryOrThrow({ profileDB: configDB, filter: { "originatingCommand.aggregate": configColl.getName(), "originatingCommand.comment": "v2MongosStream", - "originatingCommand.$_generateV2ResumeTokens": true + "originatingCommand.$_generateV2ResumeTokens": {$exists: false} } }); // Confirm that on a replica set - in this case, a direct connection to the shard - no value is set // for $_generateV2ResumeTokens if the client did not specify one. The aggregation defaults to -// treating the value as false. +// treating the value as true. profilerHasAtLeastOneMatchingEntryOrThrow({ profileDB: shardDB, filter: { "originatingCommand.aggregate": mongosColl.getName(), - "originatingCommand.comment": "v1ShardStream", + "originatingCommand.comment": "v2ShardStream", "originatingCommand.$_generateV2ResumeTokens": {$exists: false} } }); diff --git a/jstests/noPassthrough/change_stream_operation_metrics.js b/jstests/noPassthrough/change_stream_operation_metrics.js index bf7c75a9e24..37407122954 100644 --- a/jstests/noPassthrough/change_stream_operation_metrics.js +++ b/jstests/noPassthrough/change_stream_operation_metrics.js @@ -164,7 +164,7 @@ let nextId = nDocs; assert.gt(metrics[dbName].primaryMetrics.docUnitsRead, 0); assert.gt(metrics[dbName].primaryMetrics.cursorSeeks, 0); // Returns one large document - assert.eq(metrics[dbName].primaryMetrics.docUnitsReturned, 3); + assert.eq(metrics[dbName].primaryMetrics.docUnitsReturned, 4); }); // Update the document and ensure the metrics are aggregated. diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index ecf938a5e41..4a7a59f2fd1 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -445,12 +445,12 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext( expCtx->forPerShardCursor = request.getPassthroughToShard().has_value(); expCtx->allowDiskUse = request.getAllowDiskUse().value_or(allowDiskUseByDefault.load()); - // If the request specified v2 resume tokens for change streams, set this on the expCtx. On 6.0 - // we only expect this to occur during testing. - // TODO SERVER-65370: after 6.0, assume true unless present and explicitly false. - if (request.getGenerateV2ResumeTokens()) { - uassert(6528200, "Invalid request for v2 resume tokens", getTestCommandsEnabled()); - expCtx->changeStreamTokenVersion = 2; + // If the request explicitly specified NOT to use v2 resume tokens for change streams, set this + // on the expCtx. This can happen if a the request originated from 6.0 mongos, or in test mode. + if (request.getGenerateV2ResumeTokens().has_value()) { + // We only ever expect an explicit $_generateV2ResumeTokens to be false. + uassert(6528200, "Invalid request for v2 tokens", !request.getGenerateV2ResumeTokens()); + expCtx->changeStreamTokenVersion = 1; } return expCtx; diff --git a/src/mongo/db/pipeline/aggregate_command.idl b/src/mongo/db/pipeline/aggregate_command.idl index b84513a30e6..03637da1041 100644 --- a/src/mongo/db/pipeline/aggregate_command.idl +++ b/src/mongo/db/pipeline/aggregate_command.idl @@ -261,7 +261,6 @@ commands: cpp_name: passthroughToShard optional: true unstable: true - # TODO SERVER-65370: after 6.0, $_generateV2ResumeTokens should be assumed true if absent. # TODO SERVER-65369: $_generateV2ResumeTokens can be removed after 7.0. $_generateV2ResumeTokens: description: "Internal parameter to signal whether v2 resume tokens should be generated." diff --git a/src/mongo/db/pipeline/change_stream_event_transform_test.cpp b/src/mongo/db/pipeline/change_stream_event_transform_test.cpp index 2f4271e097f..89dabf3c73e 100644 --- a/src/mongo/db/pipeline/change_stream_event_transform_test.cpp +++ b/src/mongo/db/pipeline/change_stream_event_transform_test.cpp @@ -73,7 +73,10 @@ TEST(ChangeStreamEventTransformTest, TestDefaultUpdateTransform) { // Update fields Document expectedUpdateField{ {DocumentSourceChangeStream::kIdField, - makeResumeToken(kDefaultTs, testUuid(), Value(documentKey))}, + makeResumeToken(kDefaultTs, + testUuid(), + Value(documentKey), + DocumentSourceChangeStream::kUpdateOpType)}, {DocumentSourceChangeStream::kOperationTypeField, DocumentSourceChangeStream::kUpdateOpType}, {DocumentSourceChangeStream::kClusterTimeField, kDefaultTs}, @@ -111,10 +114,7 @@ TEST(ChangeStreamEventTransformTest, TestCreateViewTransform) { Document expectedDoc{ {DocumentSourceChangeStream::kIdField, makeResumeToken( - kDefaultTs, - testUuid(), - Value(Document{{"operationType", DocumentSourceChangeStream::kCreateOpType}, - {"operationDescription", opDescription}}))}, + kDefaultTs, testUuid(), opDescription, DocumentSourceChangeStream::kCreateOpType)}, {DocumentSourceChangeStream::kOperationTypeField, DocumentSourceChangeStream::kCreateOpType}, {DocumentSourceChangeStream::kClusterTimeField, kDefaultTs}, @@ -145,17 +145,19 @@ TEST(ChangeStreamEventTransformTest, TestCreateViewOnSingleCollection) { boost::none, // fromMigrate boost::none); // o2 - Document expectedDoc{{DocumentSourceChangeStream::kIdField, - makeResumeToken(kDefaultTs, testUuid(), documentKey)}, - {DocumentSourceChangeStream::kOperationTypeField, - DocumentSourceChangeStream::kInsertOpType}, - {DocumentSourceChangeStream::kClusterTimeField, kDefaultTs}, - {DocumentSourceChangeStream::kCollectionUuidField, testUuid()}, - {DocumentSourceChangeStream::kWallTimeField, Date_t()}, - {DocumentSourceChangeStream::kFullDocumentField, Document(document)}, - {DocumentSourceChangeStream::kNamespaceField, - Document{{"db", systemViewNss.db()}, {"coll", systemViewNss.coll()}}}, - {DocumentSourceChangeStream::kDocumentKeyField, documentKey}}; + Document expectedDoc{ + {DocumentSourceChangeStream::kIdField, + makeResumeToken( + kDefaultTs, testUuid(), documentKey, DocumentSourceChangeStream::kInsertOpType)}, + {DocumentSourceChangeStream::kOperationTypeField, + DocumentSourceChangeStream::kInsertOpType}, + {DocumentSourceChangeStream::kClusterTimeField, kDefaultTs}, + {DocumentSourceChangeStream::kCollectionUuidField, testUuid()}, + {DocumentSourceChangeStream::kWallTimeField, Date_t()}, + {DocumentSourceChangeStream::kFullDocumentField, Document(document)}, + {DocumentSourceChangeStream::kNamespaceField, + Document{{"db", systemViewNss.db()}, {"coll", systemViewNss.coll()}}}, + {DocumentSourceChangeStream::kDocumentKeyField, documentKey}}; ASSERT_DOCUMENT_EQ(applyTransformation(oplogEntry), expectedDoc); } diff --git a/src/mongo/db/pipeline/change_stream_test_helpers.cpp b/src/mongo/db/pipeline/change_stream_test_helpers.cpp index b3a29103d94..ccb8966e5da 100644 --- a/src/mongo/db/pipeline/change_stream_test_helpers.cpp +++ b/src/mongo/db/pipeline/change_stream_test_helpers.cpp @@ -58,12 +58,26 @@ LogicalSessionFromClient testLsid() { Document makeResumeToken(Timestamp ts, ImplicitValue uuid, - ImplicitValue docKey, + ImplicitValue docKeyOrOpDesc, + StringData operationType, ResumeTokenData::FromInvalidate fromInvalidate, size_t txnOpIndex) { + static const std::set<StringData> kCrudOps = { + "insert"_sd, "update"_sd, "replace"_sd, "delete"_sd}; + auto eventId = Value(Document{ + {"operationType", operationType}, + {kCrudOps.count(operationType) ? "documentKey" : "operationDescription", docKeyOrOpDesc}}); + return makeResumeTokenWithEventId(ts, uuid, eventId, fromInvalidate, txnOpIndex); +} + +Document makeResumeTokenWithEventId(Timestamp ts, + ImplicitValue uuid, + ImplicitValue eventIdentifier, + ResumeTokenData::FromInvalidate fromInvalidate, + size_t txnOpIndex) { ResumeTokenData tokenData; tokenData.clusterTime = ts; - tokenData.eventIdentifier = docKey; + tokenData.eventIdentifier = eventIdentifier; tokenData.fromInvalidate = fromInvalidate; tokenData.txnOpIndex = txnOpIndex; if (!uuid.missing()) diff --git a/src/mongo/db/pipeline/change_stream_test_helpers.h b/src/mongo/db/pipeline/change_stream_test_helpers.h index 2a2059cc582..b2492dbe5dd 100644 --- a/src/mongo/db/pipeline/change_stream_test_helpers.h +++ b/src/mongo/db/pipeline/change_stream_test_helpers.h @@ -57,12 +57,20 @@ const UUID& testUuid(); LogicalSessionFromClient testLsid(); Document makeResumeToken(Timestamp ts, - ImplicitValue uuid = Value(), - ImplicitValue docKey = Value(), + ImplicitValue uuid, + ImplicitValue docKeyOrOpDesc, + StringData operationType, ResumeTokenData::FromInvalidate fromInvalidate = ResumeTokenData::FromInvalidate::kNotFromInvalidate, size_t txnOpIndex = 0); +Document makeResumeTokenWithEventId(Timestamp ts, + ImplicitValue uuid, + ImplicitValue eventIdentifier, + ResumeTokenData::FromInvalidate fromInvalidate = + ResumeTokenData::FromInvalidate::kNotFromInvalidate, + size_t txnOpIndex = 0); + /** * Creates an OplogEntry with given parameters and preset defaults for this test suite. */ diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index cc559e218a5..a1b1c22bd5f 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -345,6 +345,24 @@ public: return res; } + Document makeExpectedUpdateEvent(Timestamp ts, + const NamespaceString& nss, + BSONObj documentKey, + Document upateMod, + bool expandedEvents = false) { + return Document{ + {DSChangeStream::kIdField, + makeResumeToken(ts, testUuid(), V{documentKey}, DSChangeStream::kUpdateOpType)}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType}, + {DSChangeStream::kClusterTimeField, ts}, + {DSChangeStream::kCollectionUuidField, expandedEvents ? V{testUuid()} : Value()}, + {DSChangeStream::kWallTimeField, Date_t()}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeStream::kDocumentKeyField, V{documentKey}}, + {"updateDescription", upateMod}, + }; + } + /** * Helper function to do a $v:2 delta oplog test. */ @@ -356,19 +374,9 @@ public: testUuid(), // uuid boost::none, // fromMigrate o2); // o2 - // Update fields - Document expectedUpdateField{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)}, - {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType}, - {DSChangeStream::kClusterTimeField, kDefaultTs}, - {DSChangeStream::kWallTimeField, Date_t()}, - {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, - {DSChangeStream::kDocumentKeyField, D{{"_id", 1}}}, - { - "updateDescription", - updateModificationEntry, - }, - }; + + const auto expectedUpdateField = + makeExpectedUpdateEvent(kDefaultTs, nss, o2, updateModificationEntry); checkTransformation(deltaOplog, expectedUpdateField); } @@ -513,16 +521,17 @@ TEST_F(ChangeStreamStageTest, ShouldRejectBothStartAtOperationTimeAndResumeAfter catalog.registerCollection(expCtx->opCtx, testUuid(), std::move(collection)); }); - ASSERT_THROWS_CODE( - DSChangeStream::createFromBson( - BSON(DSChangeStream::kStageName - << BSON("resumeAfter" - << makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1)) - << "startAtOperationTime" << kDefaultTs)) - .firstElement(), - expCtx), - AssertionException, - 40674); + ASSERT_THROWS_CODE(DSChangeStream::createFromBson( + BSON(DSChangeStream::kStageName << BSON( + "resumeAfter" << makeResumeToken(kDefaultTs, + testUuid(), + BSON("x" << 2 << "_id" << 1), + DSChangeStream::kInsertOpType) + << "startAtOperationTime" << kDefaultTs)) + .firstElement(), + expCtx), + AssertionException, + 40674); } TEST_F(ChangeStreamStageTest, ShouldRejectBothStartAfterAndResumeAfterOptions) { @@ -538,10 +547,15 @@ TEST_F(ChangeStreamStageTest, ShouldRejectBothStartAfterAndResumeAfterOptions) { ASSERT_THROWS_CODE( DSChangeStream::createFromBson( BSON(DSChangeStream::kStageName - << BSON("resumeAfter" - << makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1)) - << "startAfter" - << makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1)))) + << BSON("resumeAfter" << makeResumeToken(kDefaultTs, + testUuid(), + BSON("x" << 2 << "_id" << 1), + DSChangeStream::kInsertOpType) + << "startAfter" + << makeResumeToken(kDefaultTs, + testUuid(), + BSON("x" << 2 << "_id" << 1), + DSChangeStream::kInsertOpType))) .firstElement(), expCtx), AssertionException, @@ -558,16 +572,17 @@ TEST_F(ChangeStreamStageTest, ShouldRejectBothStartAtOperationTimeAndStartAfterO catalog.registerCollection(opCtx, testUuid(), std::move(collection)); }); - ASSERT_THROWS_CODE( - DSChangeStream::createFromBson( - BSON(DSChangeStream::kStageName - << BSON("startAfter" - << makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1)) - << "startAtOperationTime" << kDefaultTs)) - .firstElement(), - expCtx), - AssertionException, - 40674); + ASSERT_THROWS_CODE(DSChangeStream::createFromBson( + BSON(DSChangeStream::kStageName << BSON( + "startAfter" << makeResumeToken(kDefaultTs, + testUuid(), + BSON("x" << 2 << "_id" << 1), + DSChangeStream::kInsertOpType) + << "startAtOperationTime" << kDefaultTs)) + .firstElement(), + expCtx), + AssertionException, + 40674); } TEST_F(ChangeStreamStageTest, ShouldRejectResumeAfterWithResumeTokenMissingUUID) { @@ -582,7 +597,9 @@ TEST_F(ChangeStreamStageTest, ShouldRejectResumeAfterWithResumeTokenMissingUUID) ASSERT_THROWS_CODE( DSChangeStream::createFromBson( - BSON(DSChangeStream::kStageName << BSON("resumeAfter" << makeResumeToken(kDefaultTs))) + BSON(DSChangeStream::kStageName + << BSON("resumeAfter" << makeResumeToken( + kDefaultTs, Value(), Value(), DSChangeStream::kInsertOpType))) .firstElement(), expCtx), AssertionException, @@ -649,7 +666,8 @@ TEST_F(ChangeStreamStageTest, TransformInsertDocKeyXAndId) { Document expectedInsert{ {DSChangeStream::kIdField, - makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1))}, + makeResumeToken( + kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1), DSChangeStream::kInsertOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -678,7 +696,8 @@ TEST_F(ChangeStreamStageTest, TransformInsertDocKeyIdAndX) { Document expectedInsert{ {DSChangeStream::kIdField, - makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << 1 << "x" << 2))}, + makeResumeToken( + kDefaultTs, testUuid(), BSON("_id" << 1 << "x" << 2), DSChangeStream::kInsertOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -698,7 +717,8 @@ TEST_F(ChangeStreamStageTest, TransformInsertDocKeyJustId) { BSON("_id" << 1)); // o2 Document expectedInsert{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << 1))}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << 1), DSChangeStream::kInsertOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -733,7 +753,8 @@ TEST_F(ChangeStreamStageTest, TransformInsertFromMigrateShowMigrations) { auto spec = fromjson("{$changeStream: {showMigrationEvents: true}}"); Document expectedInsert{ {DSChangeStream::kIdField, - makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << 1 << "x" << 2))}, + makeResumeToken( + kDefaultTs, testUuid(), BSON("_id" << 1 << "x" << 2), DSChangeStream::kInsertOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -754,19 +775,8 @@ TEST_F(ChangeStreamStageTest, TransformUpdateFields) { boost::none, // fromMigrate o2); // o2 - // Update fields - Document expectedUpdateField{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)}, - {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType}, - {DSChangeStream::kClusterTimeField, kDefaultTs}, - {DSChangeStream::kWallTimeField, Date_t()}, - {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, - {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, - { - "updateDescription", - D{{"updatedFields", D{{"y", 1}}}, {"removedFields", vector<V>()}}, - }, - }; + const auto expectedUpdateField = makeExpectedUpdateEvent( + kDefaultTs, nss, o2, D{{"updatedFields", D{{"y", 1}}}, {"removedFields", vector<V>()}}); checkTransformation(updateField, expectedUpdateField); } @@ -780,20 +790,12 @@ TEST_F(ChangeStreamStageTest, TransformUpdateFieldsShowExpandedEvents) { boost::none, // fromMigrate o2); // o2 - // Update fields - Document expectedUpdateField{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)}, - {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType}, - {DSChangeStream::kClusterTimeField, kDefaultTs}, - {DSChangeStream::kCollectionUuidField, testUuid()}, - {DSChangeStream::kWallTimeField, Date_t()}, - {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, - {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, - { - "updateDescription", - D{{"updatedFields", D{{"y", 1}}}, {"removedFields", vector<V>()}}, - }, - }; + const auto expectedUpdateField = + makeExpectedUpdateEvent(kDefaultTs, + nss, + o2, + D{{"updatedFields", D{{"y", 1}}}, {"removedFields", vector<V>()}}, + true /* expanded events */); checkTransformation(updateField, expectedUpdateField, kShowExpandedEventsSpec); } @@ -935,19 +937,8 @@ TEST_F(ChangeStreamStageTest, TransformUpdateFieldsLegacyNoId) { boost::none, // fromMigrate o2); // o2 - // Update fields - Document expectedUpdateField{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)}, - {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType}, - {DSChangeStream::kClusterTimeField, kDefaultTs}, - {DSChangeStream::kWallTimeField, Date_t()}, - {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, - {DSChangeStream::kDocumentKeyField, D{{"x", 1}, {"y", 1}}}, - { - "updateDescription", - D{{"updatedFields", D{{"y", 1}}}, {"removedFields", vector<V>()}}, - }, - }; + const auto expectedUpdateField = makeExpectedUpdateEvent( + kDefaultTs, nss, o2, D{{"updatedFields", D{{"y", 1}}}, {"removedFields", vector<V>()}}); checkTransformation(updateField, expectedUpdateField); } @@ -961,19 +952,9 @@ TEST_F(ChangeStreamStageTest, TransformRemoveFields) { boost::none, // fromMigrate o2); // o2 - // Remove fields - Document expectedRemoveField{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)}, - {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType}, - {DSChangeStream::kClusterTimeField, kDefaultTs}, - {DSChangeStream::kWallTimeField, Date_t()}, - {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, - {DSChangeStream::kDocumentKeyField, Document{{"_id", 1}, {"x", 2}}}, - { - "updateDescription", - D{{"updatedFields", D{}}, {"removedFields", {"y"_sd}}}, - }}; - checkTransformation(removeField, expectedRemoveField); + const auto expectedUpdateField = makeExpectedUpdateEvent( + kDefaultTs, nss, o2, D{{"updatedFields", D{}}, {"removedFields", {"y"_sd}}}); + checkTransformation(removeField, expectedUpdateField); } // namespace TEST_F(ChangeStreamStageTest, TransformReplace) { @@ -988,7 +969,8 @@ TEST_F(ChangeStreamStageTest, TransformReplace) { // Replace Document expectedReplace{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), o2, DSChangeStream::kReplaceOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -1011,7 +993,8 @@ TEST_F(ChangeStreamStageTest, TransformReplaceShowExpandedEvents) { // Replace Document expectedReplace{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), o2, DSChangeStream::kReplaceOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kCollectionUuidField, testUuid()}, @@ -1034,7 +1017,8 @@ TEST_F(ChangeStreamStageTest, TransformDelete) { // Delete Document expectedDelete{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o)}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), o, DSChangeStream::kDeleteOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -1065,7 +1049,8 @@ TEST_F(ChangeStreamStageTest, TransformDeleteShowExpandedEvents) { // Delete Document expectedDelete{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o)}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), o, DSChangeStream::kDeleteOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kCollectionUuidField, testUuid()}, @@ -1110,7 +1095,8 @@ TEST_F(ChangeStreamStageTest, TransformDeleteFromMigrateShowMigrations) { auto spec = fromjson("{$changeStream: {showMigrationEvents: true}}"); Document expectedDelete{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o)}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), o, DSChangeStream::kDeleteOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -1125,7 +1111,8 @@ TEST_F(ChangeStreamStageTest, TransformDrop) { OplogEntry dropColl = createCommand(BSON("drop" << nss.coll()), testUuid()); Document expectedDrop{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), Value(), DSChangeStream::kDropCollectionOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -1133,8 +1120,11 @@ TEST_F(ChangeStreamStageTest, TransformDrop) { }; Document expectedInvalidate{ {DSChangeStream::kIdField, - makeResumeToken( - kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)}, + makeResumeToken(kDefaultTs, + testUuid(), + Value(), + DSChangeStream::kDropCollectionOpType, + ResumeTokenData::FromInvalidate::kFromInvalidate)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -1147,7 +1137,8 @@ TEST_F(ChangeStreamStageTest, TransformDropShowExpandedEvents) { OplogEntry dropColl = createCommand(BSON("drop" << nss.coll()), testUuid()); Document expectedDrop{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), Value(), DSChangeStream::kDropCollectionOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kCollectionUuidField, testUuid()}, @@ -1157,8 +1148,11 @@ TEST_F(ChangeStreamStageTest, TransformDropShowExpandedEvents) { Document expectedInvalidate{ {DSChangeStream::kIdField, - makeResumeToken( - kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)}, + makeResumeToken(kDefaultTs, + testUuid(), + Value(), + DSChangeStream::kDropCollectionOpType, + ResumeTokenData::FromInvalidate::kFromInvalidate)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -1178,10 +1172,7 @@ TEST_F(ChangeStreamStageTest, TransformCreate) { Document expectedCreate{ {DSChangeStream::kIdField, makeResumeToken( - kDefaultTs, - testUuid(), - Value(Document{{"operationType", DocumentSourceChangeStream::kCreateOpType}, - {"operationDescription", expectedOpDescription}}))}, + kDefaultTs, testUuid(), Value(expectedOpDescription), DSChangeStream::kCreateOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kCreateOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kCollectionUuidField, testUuid()}, @@ -1201,7 +1192,8 @@ TEST_F(ChangeStreamStageTest, TransformRename) { Document expectedRename{ {DSChangeStream::kRenameTargetNssField, D{{"db", otherColl.db()}, {"coll", otherColl.coll()}}}, - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), Value(), DSChangeStream::kRenameCollectionOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -1209,8 +1201,11 @@ TEST_F(ChangeStreamStageTest, TransformRename) { }; Document expectedInvalidate{ {DSChangeStream::kIdField, - makeResumeToken( - kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)}, + makeResumeToken(kDefaultTs, + testUuid(), + Value(), + DSChangeStream::kRenameCollectionOpType, + ResumeTokenData::FromInvalidate::kFromInvalidate)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -1226,25 +1221,30 @@ TEST_F(ChangeStreamStageTest, TransformRenameShowExpandedEvents) { << "dropTarget" << dropTarget), testUuid()); + const auto opDescription = D{ + {"to", D{{"db", otherColl.db()}, {"coll", otherColl.coll()}}}, + {"dropTarget", dropTarget}, + }; Document expectedRename{ {DSChangeStream::kRenameTargetNssField, D{{"db", otherColl.db()}, {"coll", otherColl.coll()}}}, - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, + {DSChangeStream::kIdField, + makeResumeToken( + kDefaultTs, testUuid(), V{opDescription}, DSChangeStream::kRenameCollectionOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kCollectionUuidField, testUuid()}, {DSChangeStream::kWallTimeField, Date_t()}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, - {DSChangeStream::kOperationDescriptionField, - D{ - {"to", D{{"db", otherColl.db()}, {"coll", otherColl.coll()}}}, - {"dropTarget", dropTarget}, - }}, + {DSChangeStream::kOperationDescriptionField, opDescription}, }; Document expectedInvalidate{ {DSChangeStream::kIdField, - makeResumeToken( - kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)}, + makeResumeToken(kDefaultTs, + testUuid(), + V{opDescription}, + DSChangeStream::kRenameCollectionOpType, + ResumeTokenData::FromInvalidate::kFromInvalidate)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -1279,7 +1279,8 @@ TEST_F(ChangeStreamStageTest, TransformRenameTarget) { Document expectedRename{ {DSChangeStream::kRenameTargetNssField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), Value(), DSChangeStream::kRenameCollectionOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -1287,8 +1288,11 @@ TEST_F(ChangeStreamStageTest, TransformRenameTarget) { }; Document expectedInvalidate{ {DSChangeStream::kIdField, - makeResumeToken( - kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)}, + makeResumeToken(kDefaultTs, + testUuid(), + Value(), + DSChangeStream::kRenameCollectionOpType, + ResumeTokenData::FromInvalidate::kFromInvalidate)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -1312,7 +1316,12 @@ TEST_F(ChangeStreamStageTest, TransformNewShardDetected) { o2Field.toBson()); Document expectedNewShardDetected{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << o2Field))}, + {DSChangeStream::kIdField, + makeResumeTokenWithEventId( + kDefaultTs, + testUuid(), + Document{{"operationType", DSChangeStream::kNewShardDetectedOpType}, + {"documentKey", BSON("_id" << o2Field)}})}, {DSChangeStream::kOperationTypeField, DSChangeStream::kNewShardDetectedOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -1340,7 +1349,10 @@ TEST_F(ChangeStreamStageTest, TransformReshardBegin) { Document expectedReshardingBegin{ {DSChangeStream::kReshardingUuidField, reshardingUuid}, {DSChangeStream::kIdField, - makeResumeToken(kDefaultTs, uuid, BSON("_id" << o2Field.toBSON()))}, + makeResumeTokenWithEventId(kDefaultTs, + uuid, + Document{{"operationType", DSChangeStream::kReshardBeginOpType}, + {"documentKey", BSON("_id" << o2Field.toBSON())}})}, {DSChangeStream::kOperationTypeField, DSChangeStream::kReshardBeginOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -1370,7 +1382,11 @@ TEST_F(ChangeStreamStageTest, TransformReshardDoneCatchUp) { Document expectedReshardingDoneCatchUp{ {DSChangeStream::kReshardingUuidField, reshardingUuid}, {DSChangeStream::kIdField, - makeResumeToken(kDefaultTs, reshardingUuid, BSON("_id" << o2Field.toBSON()))}, + makeResumeTokenWithEventId( + kDefaultTs, + reshardingUuid, + Document{{"operationType", DSChangeStream::kReshardDoneCatchUpOpType}, + {"documentKey", BSON("_id" << o2Field.toBSON())}})}, {DSChangeStream::kOperationTypeField, DSChangeStream::kReshardDoneCatchUpOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -1586,7 +1602,8 @@ TEST_F(ChangeStreamStageTest, CommitCommandReturnsOperationsFromPreparedTransact Document expectedResult{ {DSChangeStream::kTxnNumberField, static_cast<int>(*sessionInfo.getTxnNumber())}, {DSChangeStream::kLsidField, Document{{sessionInfo.getSessionId()->toBSON()}}}, - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), BSONObj())}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), BSONObj(), DSChangeStream::kInsertOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -1682,6 +1699,7 @@ TEST_F(ChangeStreamStageTest, TransactionWithMultipleOplogEntries) { makeResumeToken(applyOpsOpTime2.getTimestamp(), testUuid(), V{D{{"_id", 123}}}, + DSChangeStream::kInsertOpType, ResumeTokenData::FromInvalidate::kNotFromInvalidate, 0)); @@ -1699,6 +1717,7 @@ TEST_F(ChangeStreamStageTest, TransactionWithMultipleOplogEntries) { makeResumeToken(applyOpsOpTime2.getTimestamp(), testUuid(), V{D{{"_id", 456}}}, + DSChangeStream::kInsertOpType, ResumeTokenData::FromInvalidate::kNotFromInvalidate, 1)); @@ -1716,6 +1735,7 @@ TEST_F(ChangeStreamStageTest, TransactionWithMultipleOplogEntries) { makeResumeToken(applyOpsOpTime2.getTimestamp(), testUuid(), V{D{{"_id", 789}}}, + DSChangeStream::kInsertOpType, ResumeTokenData::FromInvalidate::kNotFromInvalidate, 2)); } @@ -1853,6 +1873,7 @@ TEST_F(ChangeStreamStageTest, TransactionWithEmptyOplogEntries) { makeResumeToken(applyOpsOpTime5.getTimestamp(), testUuid(), V{D{{"_id", 123}}}, + DSChangeStream::kInsertOpType, ResumeTokenData::FromInvalidate::kNotFromInvalidate, 0)); @@ -1870,6 +1891,7 @@ TEST_F(ChangeStreamStageTest, TransactionWithEmptyOplogEntries) { makeResumeToken(applyOpsOpTime5.getTimestamp(), testUuid(), V{D{{"_id", 456}}}, + DSChangeStream::kInsertOpType, ResumeTokenData::FromInvalidate::kNotFromInvalidate, 1)); } @@ -2037,6 +2059,7 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionWithMultipleOplogEntries) { makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand. testUuid(), V{D{{"_id", 123}}}, + DSChangeStream::kInsertOpType, ResumeTokenData::FromInvalidate::kNotFromInvalidate, 0)); @@ -2055,6 +2078,7 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionWithMultipleOplogEntries) { makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand. testUuid(), V{D{{"_id", 456}}}, + DSChangeStream::kInsertOpType, ResumeTokenData::FromInvalidate::kNotFromInvalidate, 1)); @@ -2073,6 +2097,7 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionWithMultipleOplogEntries) { makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand. testUuid(), V{D{{"_id", 789}}}, + DSChangeStream::kInsertOpType, ResumeTokenData::FromInvalidate::kNotFromInvalidate, 2)); @@ -2181,6 +2206,7 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionEndingWithEmptyApplyOps) { makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand. testUuid(), V{D{{"_id", 123}}}, + DSChangeStream::kInsertOpType, ResumeTokenData::FromInvalidate::kNotFromInvalidate, 0)); @@ -2199,6 +2225,7 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionEndingWithEmptyApplyOps) { makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand. testUuid(), V{D{{"_id", 456}}}, + DSChangeStream::kInsertOpType, ResumeTokenData::FromInvalidate::kNotFromInvalidate, 1)); @@ -2325,18 +2352,8 @@ TEST_F(ChangeStreamStageTest, ClusterTimeMatchesOplogEntry) { o2, // o2 opTime); // opTime - Document expectedUpdateField{ - {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), o2)}, - {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType}, - {DSChangeStream::kClusterTimeField, ts}, - {DSChangeStream::kWallTimeField, Date_t()}, - {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, - {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, - { - "updateDescription", - D{{"updatedFields", D{{"y", 1}}}, {"removedFields", vector<V>()}}, - }, - }; + const auto expectedUpdateField = makeExpectedUpdateEvent( + ts, nss, o2, D{{"updatedFields", D{{"y", 1}}}, {"removedFields", vector<V>()}}); checkTransformation(updateField, expectedUpdateField); // Test the 'clusterTime' field is copied from the oplog entry for a collection drop. @@ -2344,7 +2361,8 @@ TEST_F(ChangeStreamStageTest, ClusterTimeMatchesOplogEntry) { createCommand(BSON("drop" << nss.coll()), testUuid(), boost::none, opTime); Document expectedDrop{ - {DSChangeStream::kIdField, makeResumeToken(ts, testUuid())}, + {DSChangeStream::kIdField, + makeResumeToken(ts, testUuid(), Value(), DSChangeStream::kDropCollectionOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType}, {DSChangeStream::kClusterTimeField, ts}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -2363,7 +2381,8 @@ TEST_F(ChangeStreamStageTest, ClusterTimeMatchesOplogEntry) { Document expectedRename{ {DSChangeStream::kRenameTargetNssField, D{{"db", otherColl.db()}, {"coll", otherColl.coll()}}}, - {DSChangeStream::kIdField, makeResumeToken(ts, testUuid())}, + {DSChangeStream::kIdField, + makeResumeToken(ts, testUuid(), Value(), DSChangeStream::kRenameCollectionOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType}, {DSChangeStream::kClusterTimeField, ts}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -2467,7 +2486,8 @@ TEST_F(ChangeStreamStageTest, DSCSTransformStageWithResumeTokenSerialize) { auto expCtx = getExpCtx(); DocumentSourceChangeStreamSpec spec; - spec.setResumeAfter(ResumeToken::parse(makeResumeToken(kDefaultTs, testUuid()))); + spec.setResumeAfter(ResumeToken::parse( + makeResumeToken(kDefaultTs, testUuid(), Value(), DSChangeStream::kDropCollectionOpType))); auto originalSpec = BSON("" << spec.toBSON()); // Verify that the 'initialPostBatchResumeToken' is populated while parsing. @@ -2533,8 +2553,12 @@ TEST_F(ChangeStreamStageTest, DSCSCheckInvalidateStageSerialization) { auto expCtx = getExpCtx(); DocumentSourceChangeStreamCheckInvalidateSpec spec; - spec.setStartAfterInvalidate(ResumeToken::parse(makeResumeToken( - kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate))); + spec.setStartAfterInvalidate( + ResumeToken::parse(makeResumeToken(kDefaultTs, + testUuid(), + Value(), + DSChangeStream::kDropCollectionOpType, + ResumeTokenData::FromInvalidate::kFromInvalidate))); auto stageSpecAsBSON = BSON("" << spec.toBSON()); validateDocumentSourceStageSerialization<DocumentSourceChangeStreamCheckInvalidate>( @@ -2545,7 +2569,8 @@ TEST_F(ChangeStreamStageTest, DSCSResumabilityStageSerialization) { auto expCtx = getExpCtx(); DocumentSourceChangeStreamCheckResumabilitySpec spec; - spec.setResumeToken(ResumeToken::parse(makeResumeToken(kDefaultTs, testUuid()))); + spec.setResumeToken(ResumeToken::parse( + makeResumeToken(kDefaultTs, testUuid(), Value(), DSChangeStream::kDropCollectionOpType))); auto stageSpecAsBSON = BSON("" << spec.toBSON()); validateDocumentSourceStageSerialization<DocumentSourceChangeStreamCheckResumability>( @@ -2578,7 +2603,8 @@ TEST_F(ChangeStreamStageTest, CloseCursorOnInvalidateEntries) { auto lastStage = stages.back(); Document expectedDrop{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), Value(), DSChangeStream::kDropCollectionOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -2586,8 +2612,11 @@ TEST_F(ChangeStreamStageTest, CloseCursorOnInvalidateEntries) { }; Document expectedInvalidate{ {DSChangeStream::kIdField, - makeResumeToken( - kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)}, + makeResumeToken(kDefaultTs, + testUuid(), + Value(), + DSChangeStream::kDropCollectionOpType, + ResumeTokenData::FromInvalidate::kFromInvalidate)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -2629,7 +2658,7 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyWhenNoO2FieldIn BSONObj docKey = BSON("_id" << 1 << "shardKey" << 2); - auto resumeToken = makeResumeToken(ts, uuid, docKey); + auto resumeToken = makeResumeToken(ts, uuid, docKey, DSChangeStream::kInsertOpType); BSONObj insertDoc = BSON("_id" << 2 << "shardKey" << 3); auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type @@ -2642,7 +2671,8 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyWhenNoO2FieldIn // The documentKey should have just an _id in this case. Document expectedInsert{ - {DSChangeStream::kIdField, makeResumeToken(ts, uuid, D{{"_id", 2}})}, + {DSChangeStream::kIdField, + makeResumeToken(ts, uuid, D{{"_id", 2}}, DSChangeStream::kInsertOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, {DSChangeStream::kClusterTimeField, ts}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -2671,7 +2701,7 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldUseO2FieldInOplog) { BSONObj docKey = BSON("_id" << 1); - auto resumeToken = makeResumeToken(ts, uuid, docKey); + auto resumeToken = makeResumeToken(ts, uuid, docKey, DSChangeStream::kInsertOpType); BSONObj insertDoc = BSON("_id" << 2 << "shardKey" << 3); BSONObj o2 = BSON("_id" << 2 << "shardKey" << 3); @@ -2684,7 +2714,8 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldUseO2FieldInOplog) { opTime); // opTime Document expectedInsert{ - {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)}, + {DSChangeStream::kIdField, + makeResumeToken(ts, uuid, insertDoc, DSChangeStream::kInsertOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, {DSChangeStream::kClusterTimeField, ts}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -2712,7 +2743,7 @@ TEST_F(ChangeStreamStageTest, ResumeAfterFailsIfResumeTokenDoesNotContainUUID) { }); // Create a resume token from only the timestamp. - auto resumeToken = makeResumeToken(ts); + auto resumeToken = makeResumeToken(ts, Value(), Value(), DSChangeStream::kDropCollectionOpType); ASSERT_THROWS_CODE( DSChangeStream::createFromBson( @@ -2731,7 +2762,8 @@ TEST_F(ChangeStreamStageTest, RenameFromSystemToUserCollectionShouldIncludeNotif // Note that the collection rename does *not* have the queued invalidated field. Document expectedRename{ {DSChangeStream::kRenameTargetNssField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), Value(), DSChangeStream::kRenameCollectionOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -2750,7 +2782,8 @@ TEST_F(ChangeStreamStageTest, RenameFromUserToSystemCollectionShouldIncludeNotif Document expectedRename{ {DSChangeStream::kRenameTargetNssField, D{{"db", systemColl.db()}, {"coll", systemColl.coll()}}}, - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), Value(), DSChangeStream::kRenameCollectionOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -2772,6 +2805,7 @@ TEST_F(ChangeStreamStageTest, ResumeAfterWithTokenFromInvalidateShouldFail) { makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1), + DSChangeStream::kInsertOpType, ResumeTokenData::FromInvalidate::kFromInvalidate); ASSERT_THROWS_CODE( @@ -2799,7 +2833,8 @@ TEST_F(ChangeStreamStageTest, UsesResumeTokenAsSortKeyIfNeedsMergeIsFalse) { auto next = stages.back()->getNext(); - auto expectedSortKey = makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1)); + auto expectedSortKey = makeResumeToken( + kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1), DSChangeStream::kInsertOpType); ASSERT_TRUE(next.isAdvanced()); ASSERT_VALUE_EQ(next.releaseDocument().metadata().getSortKey(), Value(expectedSortKey)); @@ -2825,7 +2860,8 @@ TEST_F(ChangeStreamStageDBTest, TransformInsert) { Document expectedInsert{ {DSChangeStream::kIdField, - makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1))}, + makeResumeToken( + kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1), DSChangeStream::kInsertOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -2846,7 +2882,8 @@ TEST_F(ChangeStreamStageDBTest, TransformInsertShowExpandedEvents) { Document expectedInsert{ {DSChangeStream::kIdField, - makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1))}, + makeResumeToken( + kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1), DSChangeStream::kInsertOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kCollectionUuidField, testUuid()}, @@ -2870,7 +2907,8 @@ TEST_F(ChangeStreamStageDBTest, InsertOnOtherCollections) { // Insert on another collection in the same database. Document expectedInsert{ {DSChangeStream::kIdField, - makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1))}, + makeResumeToken( + kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1), DSChangeStream::kInsertOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -2929,7 +2967,9 @@ TEST_F(ChangeStreamStageDBTest, TransformsEntriesForLegalClientCollectionsWithSy auto insert = makeOplogEntry( OpTypeEnum::kInsert, ns, BSON("_id" << 1), testUuid(), boost::none, BSON("_id" << 1)); Document expectedInsert{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << 1))}, + {DSChangeStream::kIdField, + makeResumeToken( + kDefaultTs, testUuid(), BSON("_id" << 1), DSChangeStream::kInsertOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -2946,15 +2986,8 @@ TEST_F(ChangeStreamStageDBTest, TransformUpdateFields) { BSONObj o2 = BSON("_id" << 1 << "x" << 2); auto updateField = makeOplogEntry(OpTypeEnum::kUpdate, nss, o, testUuid(), boost::none, o2); - Document expectedUpdateField{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)}, - {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType}, - {DSChangeStream::kClusterTimeField, kDefaultTs}, - {DSChangeStream::kWallTimeField, Date_t()}, - {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, - {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, - {"updateDescription", D{{"updatedFields", D{{"y", 1}}}, {"removedFields", vector<V>()}}}, - }; + const auto expectedUpdateField = makeExpectedUpdateEvent( + kDefaultTs, nss, o2, D{{"updatedFields", D{{"y", 1}}}, {"removedFields", vector<V>()}}); checkTransformation(updateField, expectedUpdateField); } @@ -2968,18 +3001,8 @@ TEST_F(ChangeStreamStageDBTest, TransformRemoveFields) { boost::none, // fromMigrate o2); // o2 - // Remove fields - Document expectedRemoveField{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)}, - {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType}, - {DSChangeStream::kClusterTimeField, kDefaultTs}, - {DSChangeStream::kWallTimeField, Date_t()}, - {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, - {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, - { - "updateDescription", - D{{"updatedFields", D{}}, {"removedFields", {"y"_sd}}}, - }}; + const auto expectedRemoveField = makeExpectedUpdateEvent( + kDefaultTs, nss, o2, D{{"updatedFields", D{}}, {"removedFields", {"y"_sd}}}); checkTransformation(removeField, expectedRemoveField); } @@ -2995,7 +3018,8 @@ TEST_F(ChangeStreamStageDBTest, TransformReplace) { // Replace Document expectedReplace{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), o2, DSChangeStream::kReplaceOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -3017,7 +3041,8 @@ TEST_F(ChangeStreamStageDBTest, TransformDelete) { // Delete Document expectedDelete{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o)}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), o, DSChangeStream::kDeleteOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -3062,7 +3087,8 @@ TEST_F(ChangeStreamStageDBTest, TransformDeleteFromMigrateShowMigrations) { // Delete auto spec = fromjson("{$changeStream: {showMigrationEvents: true}}"); Document expectedDelete{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o)}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), o, DSChangeStream::kDeleteOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -3076,7 +3102,8 @@ TEST_F(ChangeStreamStageDBTest, TransformDeleteFromMigrateShowMigrations) { TEST_F(ChangeStreamStageDBTest, TransformDrop) { OplogEntry dropColl = createCommand(BSON("drop" << nss.coll()), testUuid()); Document expectedDrop{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), Value(), DSChangeStream::kDropCollectionOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -3093,7 +3120,8 @@ TEST_F(ChangeStreamStageDBTest, TransformRename) { Document expectedRename{ {DSChangeStream::kRenameTargetNssField, D{{"db", otherColl.db()}, {"coll", otherColl.coll()}}}, - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), Value(), DSChangeStream::kRenameCollectionOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -3107,7 +3135,8 @@ TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) { // Drop database entry doesn't have a UUID. Document expectedDropDatabase{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs)}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, Value(), Value(), DSChangeStream::kDropDatabaseOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kDropDatabaseOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -3115,8 +3144,11 @@ TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) { }; Document expectedInvalidate{ {DSChangeStream::kIdField, - makeResumeToken( - kDefaultTs, Value(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)}, + makeResumeToken(kDefaultTs, + Value(), + Value(), + DSChangeStream::kDropDatabaseOpType, + ResumeTokenData::FromInvalidate::kFromInvalidate)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -3130,7 +3162,8 @@ TEST_F(ChangeStreamStageDBTest, TransformDropDatabaseShowExpandedEvents) { // Drop database entry doesn't have a UUID. Document expectedDropDatabase{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs)}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, Value(), Value(), DSChangeStream::kDropDatabaseOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kDropDatabaseOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -3138,8 +3171,11 @@ TEST_F(ChangeStreamStageDBTest, TransformDropDatabaseShowExpandedEvents) { }; Document expectedInvalidate{ {DSChangeStream::kIdField, - makeResumeToken( - kDefaultTs, Value(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)}, + makeResumeToken(kDefaultTs, + Value(), + Value(), + DSChangeStream::kDropDatabaseOpType, + ResumeTokenData::FromInvalidate::kFromInvalidate)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -3192,7 +3228,8 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForDelete) { auto spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" << "off")); Document expectedDeleteNoPreImage{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), documentKey, DSChangeStream::kDeleteOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -3206,7 +3243,8 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForDelete) { spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" << "whenAvailable")); Document expectedDeleteWithPreImage{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), documentKey, DSChangeStream::kDeleteOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -3286,7 +3324,8 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForUpdate) { auto spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" << "off")); Document expectedUpdateNoPreImage{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), documentKey, DSChangeStream::kUpdateOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -3304,7 +3343,8 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForUpdate) { spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" << "whenAvailable")); Document expectedUpdateWithPreImage{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), documentKey, DSChangeStream::kUpdateOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -3388,7 +3428,8 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForReplace) { auto spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" << "off")); Document expectedReplaceNoPreImage{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), documentKey, DSChangeStream::kReplaceOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -3403,7 +3444,8 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForReplace) { spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" << "whenAvailable")); Document expectedReplaceWithPreImage{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), documentKey, DSChangeStream::kReplaceOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -3466,7 +3508,8 @@ TEST_F(ChangeStreamStageDBTest, RenameFromSystemToUserCollectionShouldIncludeNot Document expectedRename{ {DSChangeStream::kRenameTargetNssField, D{{"db", renamedColl.db()}, {"coll", renamedColl.coll()}}}, - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), Value(), DSChangeStream::kRenameCollectionOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -3485,7 +3528,8 @@ TEST_F(ChangeStreamStageDBTest, RenameFromUserToSystemCollectionShouldIncludeNot Document expectedRename{ {DSChangeStream::kRenameTargetNssField, D{{"db", systemColl.db()}, {"coll", systemColl.coll()}}}, - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), Value(), DSChangeStream::kRenameCollectionOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -3514,7 +3558,7 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyWhenNoO2Field }); BSONObj docKey = BSON("_id" << 1 << "shardKey" << 2); - auto resumeToken = makeResumeToken(ts, uuid, docKey); + auto resumeToken = makeResumeToken(ts, uuid, docKey, DSChangeStream::kInsertOpType); BSONObj insertDoc = BSON("_id" << 2 << "shardKey" << 3); auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type @@ -3527,7 +3571,8 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyWhenNoO2Field // The documentKey should just have an _id in this case. Document expectedInsert{ - {DSChangeStream::kIdField, makeResumeToken(ts, uuid, D{{"_id", 2}})}, + {DSChangeStream::kIdField, + makeResumeToken(ts, uuid, D{{"_id", 2}}, DSChangeStream::kInsertOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, {DSChangeStream::kClusterTimeField, ts}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -3551,7 +3596,7 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldUseO2FieldInOplog) { }); BSONObj docKey = BSON("_id" << 1); - auto resumeToken = makeResumeToken(ts, uuid, docKey); + auto resumeToken = makeResumeToken(ts, uuid, docKey, DSChangeStream::kInsertOpType); BSONObj insertDoc = BSON("_id" << 2 << "shardKey" << 3); BSONObj o2 = BSON("_id" << 2 << "shardKey" << 3); @@ -3564,7 +3609,8 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldUseO2FieldInOplog) { opTime); // opTime Document expectedInsert{ - {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)}, + {DSChangeStream::kIdField, + makeResumeToken(ts, uuid, insertDoc, DSChangeStream::kInsertOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, {DSChangeStream::kClusterTimeField, ts}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -3591,6 +3637,7 @@ TEST_F(ChangeStreamStageDBTest, ResumeAfterWithTokenFromInvalidateShouldFail) { makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1), + DSChangeStream::kInsertOpType, ResumeTokenData::FromInvalidate::kFromInvalidate); ASSERT_THROWS_CODE( @@ -3611,15 +3658,19 @@ TEST_F(ChangeStreamStageDBTest, ResumeAfterWithTokenFromDropDatabase) { }); // Create a resume token from only the timestamp, similar to a 'dropDatabase' entry. - auto resumeToken = makeResumeToken( - kDefaultTs, Value(), Value(), ResumeTokenData::FromInvalidate::kNotFromInvalidate); + auto resumeToken = makeResumeToken(kDefaultTs, + Value(), + Value(), + DSChangeStream::kInsertOpType, + ResumeTokenData::FromInvalidate::kNotFromInvalidate); BSONObj insertDoc = BSON("_id" << 2); auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, nss, insertDoc, testUuid(), boost::none, insertDoc); Document expectedInsert{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), insertDoc)}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, testUuid(), insertDoc, DSChangeStream::kInsertOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -3641,14 +3692,16 @@ TEST_F(ChangeStreamStageDBTest, StartAfterSucceedsEvenIfResumeTokenDoesNotContai }); // Create a resume token from only the timestamp, similar to a 'dropDatabase' entry. - auto resumeToken = makeResumeToken(kDefaultTs); + auto resumeToken = + makeResumeToken(kDefaultTs, Value(), Value(), DSChangeStream::kDropDatabaseOpType); BSONObj insertDoc = BSON("_id" << 2); auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, nss, insertDoc, uuid, boost::none, insertDoc); Document expectedInsert{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, uuid, insertDoc)}, + {DSChangeStream::kIdField, + makeResumeToken(kDefaultTs, uuid, insertDoc, DSChangeStream::kInsertOpType)}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, {DSChangeStream::kWallTimeField, Date_t()}, @@ -3717,10 +3770,11 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleMatchAndResumeToken) { // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which // filters out newly added events. const std::vector<BSONObj> rawPipeline = { - BSON("$changeStream" << BSON("resumeAfter" - << makeResumeToken(kDefaultTs, testUuid()) - << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName - << true)), + BSON("$changeStream" + << BSON("resumeAfter" + << makeResumeToken( + kDefaultTs, testUuid(), Value(), DSChangeStream::kDropCollectionOpType) + << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName << true)), BSON("$match" << BSON("operationType" << "insert")), BSON("$match" << BSON("operationType" @@ -3796,10 +3850,11 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleProjectAndResumeToken) { // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which // filters out newly added events. const std::vector<BSONObj> rawPipeline = { - BSON("$changeStream" << BSON("resumeAfter" - << makeResumeToken(kDefaultTs, testUuid()) - << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName - << true)), + BSON("$changeStream" + << BSON("resumeAfter" + << makeResumeToken( + kDefaultTs, testUuid(), Value(), DSChangeStream::kDropCollectionOpType) + << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName << true)), BSON("$project" << BSON("operationType" << 1)), BSON("$project" << BSON("fullDocument" << 1))}; @@ -3826,10 +3881,11 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithProjectMatchAndResumeToken) { // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which // filters out newly added events. const std::vector<BSONObj> rawPipeline = { - BSON("$changeStream" << BSON("resumeAfter" - << makeResumeToken(kDefaultTs, testUuid()) - << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName - << true)), + BSON("$changeStream" + << BSON("resumeAfter" + << makeResumeToken( + kDefaultTs, testUuid(), Value(), DSChangeStream::kDropCollectionOpType) + << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName << true)), BSON("$project" << BSON("operationType" << 1)), BSON("$match" << BSON("operationType" << "insert"))}; @@ -3906,10 +3962,11 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithUnsetAndResumeToken) { // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which // filters out newly added events. const std::vector<BSONObj> rawPipeline = { - BSON("$changeStream" << BSON("resumeAfter" - << makeResumeToken(kDefaultTs, testUuid()) - << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName - << true)), + BSON("$changeStream" + << BSON("resumeAfter" + << makeResumeToken( + kDefaultTs, testUuid(), Value(), DSChangeStream::kDropCollectionOpType) + << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName << true)), BSON("$unset" << "operationType")}; @@ -3983,10 +4040,11 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithAddFieldsAndResumeToken) { // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which // filters out newly added events. const std::vector<BSONObj> rawPipeline = { - BSON("$changeStream" << BSON("resumeAfter" - << makeResumeToken(kDefaultTs, testUuid()) - << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName - << true)), + BSON("$changeStream" + << BSON("resumeAfter" + << makeResumeToken( + kDefaultTs, testUuid(), Value(), DSChangeStream::kDropCollectionOpType) + << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName << true)), BSON("$addFields" << BSON("stockPrice" << 100))}; auto pipeline = buildTestPipeline(rawPipeline); @@ -4058,10 +4116,11 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithSetAndResumeToken) { // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which // filters out newly added events. const std::vector<BSONObj> rawPipeline = { - BSON("$changeStream" << BSON("resumeAfter" - << makeResumeToken(kDefaultTs, testUuid()) - << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName - << true)), + BSON("$changeStream" + << BSON("resumeAfter" + << makeResumeToken( + kDefaultTs, testUuid(), Value(), DSChangeStream::kDropCollectionOpType) + << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName << true)), BSON("$set" << BSON("stockPrice" << 100))}; auto pipeline = buildTestPipeline(rawPipeline); @@ -4109,10 +4168,11 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithReplaceRootAndResumeToken) { // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which // filters out newly added events. const std::vector<BSONObj> rawPipeline = { - BSON("$changeStream" << BSON("resumeAfter" - << makeResumeToken(kDefaultTs, testUuid()) - << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName - << true)), + BSON("$changeStream" + << BSON("resumeAfter" + << makeResumeToken( + kDefaultTs, testUuid(), Value(), DSChangeStream::kDropCollectionOpType) + << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName << true)), BSON("$replaceRoot" << BSON("newRoot" << "$fullDocument"))}; @@ -4162,10 +4222,11 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithReplaceWithAndResumeToken) { // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which // filters out newly added events. const std::vector<BSONObj> rawPipeline = { - BSON("$changeStream" << BSON("resumeAfter" - << makeResumeToken(kDefaultTs, testUuid()) - << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName - << true)), + BSON("$changeStream" + << BSON("resumeAfter" + << makeResumeToken( + kDefaultTs, testUuid(), Value(), DSChangeStream::kDropCollectionOpType) + << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName << true)), BSON("$replaceWith" << "$fullDocument")}; @@ -4278,23 +4339,23 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithShowExpandedEventsFalseAndUserProj TEST_F(ChangeStreamStageTest, ChangeStreamWithAllStagesAndResumeToken) { // We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which // filters out newly added events. - const std::vector<BSONObj> - rawPipeline = {BSON("$changeStream" - << BSON("resumeAfter" - << makeResumeToken(kDefaultTs, testUuid()) - << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName - << true)), - BSON("$project" << BSON("operationType" << 1)), - BSON("$unset" - << "_id"), - BSON("$addFields" << BSON("stockPrice" << 100)), - BSON("$set" << BSON("fullDocument.stockPrice" << 100)), - BSON("$match" << BSON("operationType" - << "insert")), - BSON("$replaceRoot" << BSON("newRoot" - << "$fullDocument")), - BSON("$replaceWith" - << "fullDocument.stockPrice")}; + const std::vector<BSONObj> rawPipeline = { + BSON("$changeStream" + << BSON("resumeAfter" + << makeResumeToken( + kDefaultTs, testUuid(), Value(), DSChangeStream::kDropCollectionOpType) + << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName << true)), + BSON("$project" << BSON("operationType" << 1)), + BSON("$unset" + << "_id"), + BSON("$addFields" << BSON("stockPrice" << 100)), + BSON("$set" << BSON("fullDocument.stockPrice" << 100)), + BSON("$match" << BSON("operationType" + << "insert")), + BSON("$replaceRoot" << BSON("newRoot" + << "$fullDocument")), + BSON("$replaceWith" + << "fullDocument.stockPrice")}; auto pipeline = buildTestPipeline(rawPipeline); @@ -4379,14 +4440,16 @@ TEST_F(MultiTokenFormatVersionTest, CanResumeFromV2Token) { Value(Document{{"operationType", "update"_sd}, {"documentKey", higherDocumentKey}})); // The next event has a clusterTime later than the resume point, and should therefore start - // using the default token version. + // using the default token version, which is 2. next = lastStage->getNext(); ASSERT(next.isAdvanced()); const auto afterResumeTsResumeToken = ResumeToken::parse(next.releaseDocument()["_id"].getDocument()).getData(); ASSERT_EQ(afterResumeTsResumeToken.clusterTime, afterResumeTs); ASSERT_EQ(afterResumeTsResumeToken.version, ResumeTokenData::kDefaultTokenVersion); - ASSERT_VALUE_EQ(afterResumeTsResumeToken.eventIdentifier, Value(midDocumentKey)); + ASSERT_VALUE_EQ( + afterResumeTsResumeToken.eventIdentifier, + Value(Document{{"operationType", "update"_sd}, {"documentKey", midDocumentKey}})); // Verify that no other events are returned. next = lastStage->getNext(); @@ -4445,14 +4508,16 @@ TEST_F(MultiTokenFormatVersionTest, CanResumeFromV1Token) { ResumeToken::parse(next.releaseDocument()["_id"].getDocument()).getData(); ASSERT_EQ(afterResumeTsResumeToken.clusterTime, afterResumeTs); ASSERT_EQ(afterResumeTsResumeToken.version, ResumeTokenData::kDefaultTokenVersion); - ASSERT_VALUE_EQ(afterResumeTsResumeToken.eventIdentifier, Value(midDocumentKey)); + ASSERT_VALUE_EQ( + afterResumeTsResumeToken.eventIdentifier, + Value(Document{{"operationType", "update"_sd}, {"documentKey", midDocumentKey}})); // Verify that no other events are returned. next = lastStage->getNext(); ASSERT_FALSE(next.isAdvanced()); } -TEST_F(MultiTokenFormatVersionTest, CanResumeFromV2HighWaterMark) { +TEST_F(MultiTokenFormatVersionTest, CanResumeFromV1HighWaterMark) { const auto beforeResumeTs = Timestamp(100, 1); const auto resumeTs = Timestamp(100, 2); const auto afterResumeTs = Timestamp(100, 3); @@ -4467,7 +4532,7 @@ TEST_F(MultiTokenFormatVersionTest, CanResumeFromV2HighWaterMark) { // Create a v2 high water mark token which sorts immediately before 'firstOplogAtResumeTime'. ResumeTokenData resumeToken = ResumeToken::makeHighWaterMarkToken(resumeTs, 2).getData(); - resumeToken.version = 2; + resumeToken.version = 1; auto expCtx = getExpCtxRaw(); expCtx->ns = NamespaceString::makeCollectionlessAggregateNSS( TenantDatabaseName(boost::none, "unittests")); @@ -4485,26 +4550,23 @@ TEST_F(MultiTokenFormatVersionTest, CanResumeFromV2HighWaterMark) { // The high water mark token should be order ahead of every other entry with the same // clusterTime. So we should see both entries that match the resumeToken's clusterTime, and both - // should have inherited the token version 2 from the high water mark. + // should have inherited the token version 1 from the high water mark. auto lastStage = stages.back(); auto next = lastStage->getNext(); ASSERT(next.isAdvanced()); const auto sameTsResumeToken1 = ResumeToken::parse(next.releaseDocument()["_id"].getDocument()).getData(); ASSERT_EQ(sameTsResumeToken1.clusterTime, resumeTs); - ASSERT_EQ(sameTsResumeToken1.version, 2); - ASSERT_VALUE_EQ(sameTsResumeToken1.eventIdentifier, - Value(Document{{"operationType", "update"_sd}, {"documentKey", documentKey}})); + ASSERT_EQ(sameTsResumeToken1.version, 1); + ASSERT_VALUE_EQ(sameTsResumeToken1.eventIdentifier, Value(documentKey)); next = lastStage->getNext(); ASSERT(next.isAdvanced()); const auto sameTsResumeToken2 = ResumeToken::parse(next.releaseDocument()["_id"].getDocument()).getData(); ASSERT_EQ(sameTsResumeToken2.clusterTime, resumeTs); - ASSERT_EQ(sameTsResumeToken2.version, 2); - ASSERT_VALUE_EQ( - sameTsResumeToken2.eventIdentifier, - Value(Document{{"operationType", "update"_sd}, {"documentKey", higherDocumentKey}})); + ASSERT_EQ(sameTsResumeToken2.version, 1); + ASSERT_VALUE_EQ(sameTsResumeToken2.eventIdentifier, Value(higherDocumentKey)); // The resumeToken after the current clusterTime should start using the default version, and // corresponding 'eventIdentifier' format. @@ -4514,7 +4576,8 @@ TEST_F(MultiTokenFormatVersionTest, CanResumeFromV2HighWaterMark) { ResumeToken::parse(next.releaseDocument()["_id"].getDocument()).getData(); ASSERT_EQ(afterResumeTsResumeToken.clusterTime, afterResumeTs); ASSERT_EQ(afterResumeTsResumeToken.version, ResumeTokenData::kDefaultTokenVersion); - ASSERT_VALUE_EQ(afterResumeTsResumeToken.eventIdentifier, Value(documentKey)); + ASSERT_VALUE_EQ(afterResumeTsResumeToken.eventIdentifier, + Value(Document{{"operationType", "update"_sd}, {"documentKey", documentKey}})); // Verify that no other events are returned. next = lastStage->getNext(); diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp index 28603f10a30..701c5b495a7 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp @@ -655,6 +655,7 @@ TEST_F(CheckResumeTokenTest, ShouldSwallowInvalidateFromEachShardForStartAfterIn invalidateToken.clusterTime = resumeTimestamp; invalidateToken.uuid = uuids[0]; invalidateToken.fromInvalidate = ResumeTokenData::kFromInvalidate; + invalidateToken.eventIdentifier = Value(Document{{"operationType", "drop"_sd}}); auto checkResumeToken = createDSEnsureResumeTokenPresent(invalidateToken); // Add three documents which each have the invalidate resume token. We expect to see this in the @@ -695,6 +696,7 @@ TEST_F(CheckResumeTokenTest, ShouldNotSwallowUnrelatedInvalidateForStartAfterInv invalidateToken.clusterTime = resumeTimestamp; invalidateToken.uuid = uuids[0]; invalidateToken.fromInvalidate = ResumeTokenData::kFromInvalidate; + invalidateToken.eventIdentifier = Value(Document{{"operationType", "drop"_sd}}); auto checkResumeToken = createDSEnsureResumeTokenPresent(invalidateToken); // Create a second invalidate token with the same clusterTime but a different UUID. diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h index a157c97fed9..589cf458ee7 100644 --- a/src/mongo/db/pipeline/resume_token.h +++ b/src/mongo/db/pipeline/resume_token.h @@ -44,7 +44,7 @@ struct ResumeTokenData { /* * The default or "preferred" token version generated by this version of the server. */ - static constexpr int kDefaultTokenVersion = 1; + static constexpr int kDefaultTokenVersion = 2; /** * Flag to indicate if the resume token is from an invalidate notification. diff --git a/src/mongo/db/pipeline/resume_token_test.cpp b/src/mongo/db/pipeline/resume_token_test.cpp index eed85344ddd..cbdd6b8b0eb 100644 --- a/src/mongo/db/pipeline/resume_token_test.cpp +++ b/src/mongo/db/pipeline/resume_token_test.cpp @@ -61,6 +61,7 @@ TEST(ResumeToken, EncodesTimestampOnlyTokenFromData) { ResumeTokenData resumeTokenDataIn; resumeTokenDataIn.clusterTime = ts; + resumeTokenDataIn.eventIdentifier = Value(Document{{"operationType", "drop"_sd}}); ResumeToken token(resumeTokenDataIn); ResumeTokenData tokenData = token.getData(); @@ -91,6 +92,7 @@ TEST(ResumeToken, TimestampOnlyTokenShouldRoundTripThroughHexEncoding) { ResumeTokenData resumeTokenDataIn; resumeTokenDataIn.clusterTime = ts; + resumeTokenDataIn.eventIdentifier = Value(Document{{"operationType", "drop"_sd}}); // Test serialization/parsing through Document. auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument().toBson()); @@ -243,6 +245,7 @@ TEST(ResumeToken, FailsToParseForInvalidTokenFormats) { Timestamp ts(1010, 4); ResumeTokenData tokenData; tokenData.clusterTime = ts; + tokenData.eventIdentifier = Value(Document{{"operationType", "drop"_sd}}); auto goodTokenDocBinData = ResumeToken(tokenData).toDocument(); auto goodData = goodTokenDocBinData["_data"].getStringData(); ASSERT_THROWS(ResumeToken::parse(Document{{"_data"_sd, goodData}, {"_typeBits", "string"_sd}}), @@ -259,6 +262,7 @@ TEST(ResumeToken, FailsToDecodeInvalidKeyString) { ResumeTokenData tokenData; tokenData.clusterTime = ts; + tokenData.eventIdentifier = Value(Document{{"operationType", "drop"_sd}}); auto goodTokenDocBinData = ResumeToken(tokenData).toDocument(); auto goodData = goodTokenDocBinData["_data"].getStringData(); @@ -382,6 +386,7 @@ TEST(ResumeToken, InvalidTxnOpIndex) { ResumeTokenData resumeTokenDataIn; resumeTokenDataIn.clusterTime = ts; resumeTokenDataIn.txnOpIndex = 1234; + resumeTokenDataIn.eventIdentifier = Value(Document{{"operationType", "drop"_sd}}); // Should round trip with a non-negative txnOpIndex. auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument().toBson()); diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index f63f6c61bc6..b49422c9959 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -114,9 +114,11 @@ RemoteCursor openChangeStreamNewShardMonitor(const boost::intrusive_ptr<Expressi aggReq.setFromMongos(true); aggReq.setNeedsMerge(true); - // TODO SERVER-65370: from 6.1 onwards, we will default to v2 and this block should be removed. - if (expCtx->inMongos) { - aggReq.setGenerateV2ResumeTokens(expCtx->changeStreamTokenVersion == 2); + // TODO SERVER-65369: This code block can be removed after 7.0. + if (expCtx->inMongos && expCtx->changeStreamTokenVersion == 1) { + // A request for v1 resume tokens on mongos should only be allowed in test mode. + tassert(6497000, "Invalid request for v1 resume tokens", getTestCommandsEnabled()); + aggReq.setGenerateV2ResumeTokens(false); } SimpleCursorOptions cursor; @@ -151,17 +153,6 @@ BSONObj genericTransformForShards(MutableDocument&& cmdForShards, cmdForShards[AggregateCommandRequest::kCollationFieldName] = Value(collationObj); } - // We explicitly set $_generateV2ResumeTokens to false, if not already set, to indicate that the - // shards should NOT produce v2 resume tokens for change streams; instead, they should continue - // generating v1 tokens. This facilitates upgrade between 6.0 (which produces v1 by default) - // and 7.0 (which will produce v2 by default, but will be capable of generating v1) by ensuring - // that a 6.0 mongoS on a mixed 6.0/7.0 cluster will see only v1 tokens in the stream. - // TODO SERVER-65370: from 6.1 onwards, we will default to v2 and this block should be removed. - const auto& v2FieldName = AggregateCommandRequest::kGenerateV2ResumeTokensFieldName; - if (auto cmdObj = cmdForShards.peek(); expCtx->inMongos && cmdObj[v2FieldName].missing()) { - cmdForShards[v2FieldName] = Value(false); - } - // If this is a request for an aggregation explain, then we must wrap the aggregate inside an // explain command. if (explainVerbosity) { diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index b3df71582da..72f57512200 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -133,12 +133,14 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext( mergeCtx->inMongos = true; - // If the request specified v2 resume tokens for change streams, set this on the expCtx. On 6.0 - // we only expect this to occur during testing. - // TODO SERVER-65370: after 6.0, assume true unless present and explicitly false. - if (request.getGenerateV2ResumeTokens()) { - uassert(6528201, "Invalid request for v2 resume tokens", getTestCommandsEnabled()); - mergeCtx->changeStreamTokenVersion = 2; + // If the request explicity specified NOT to use v2 resume tokens for change streams, set this + // on the expCtx. We only ever expect to see an explicit value during testing. + if (request.getGenerateV2ResumeTokens().has_value()) { + // If $_generateV2ResumeTokens was specified, we must be testing and it must be false. + uassert(6528201, + "Invalid request for v2 resume tokens", + getTestCommandsEnabled() && !request.getGenerateV2ResumeTokens()); + mergeCtx->changeStreamTokenVersion = 1; } // Serialize the 'AggregateCommandRequest' and save it so that the original command can be |