diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2022-04-11 17:09:06 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-04-11 23:28:08 +0000 |
commit | 9f0897f74297a4ae415a3b5ebf482ff78afc2058 (patch) | |
tree | 9829721a2bcded15f410b23a8af5962bfd5e6a2d | |
parent | bbb9ba057afddf16a714c3c39688bee9fe8ccb56 (diff) | |
download | mongo-9f0897f74297a4ae415a3b5ebf482ff78afc2058.tar.gz |
SERVER-65282 Add $_generateV2ResumeTokens parameter to aggregate command
33 files changed, 459 insertions, 31 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_v2_resume_token_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_v2_resume_token_passthrough.yml new file mode 100644 index 00000000000..4179f872b7f --- /dev/null +++ b/buildscripts/resmokeconfig/suites/change_streams_v2_resume_token_passthrough.yml @@ -0,0 +1,59 @@ +test_kind: js_test +# TODO SERVER-65370: remove this suite when v2 tokens become the default, or rework it to test v1. +selector: + roots: + - jstests/change_streams/**/*.js + exclude_files: + # This test explicitly compares v1 and v2 tokens, and must be able to generate the former. + - jstests/change_streams/generate_v2_resume_token.js + exclude_with_any_tags: + ## + # The next tags correspond to the special errors thrown by the + # set_read_and_write_concerns.js override when it refuses to replace the readConcern or + # writeConcern of a particular command. Above each tag are the message(s) that cause the tag to be + # warranted. + ## + # "Cowardly refusing to override write concern of command: ..." + - assumes_write_concern_unchanged + +executor: + archive: + hooks: + - CheckReplDBHash + - CheckReplOplogs + - ValidateCollections + config: + shell_options: + global_vars: + TestData: + defaultReadConcernLevel: null + enableMajorityReadConcern: '' + # Enable causal consistency for change streams suites using 1 node replica sets. Some tests + # rely on the assumption that a w:majority write will be visible immediately in a subsequently + # opened change stream. In 1 node replica sets, an operation that majority commits at + # timestamp T will force the majority snapshot to advance to T, but the oplog visibility point + # may not have advanced to T yet. Subsequent majority snapshot reads will see this write in + # the oplog, but speculative majority reads may not, since they read from a local snapshot and + # are bound to the oplog visibility rules. Using causal consistency forces the visibility + # point to advance to the timestamp of the last write before doing a new read. + eval: >- + var testingReplication = true; + load('jstests/libs/override_methods/set_read_and_write_concerns.js'); + load('jstests/libs/override_methods/enable_causal_consistency_without_read_pref.js'); + load('jstests/libs/override_methods/implicit_v2_resume_token_changestreams.js'); + hooks: + # The CheckReplDBHash hook waits until all operations have replicated to and have been applied + # on the secondaries, so we run the ValidateCollections hook after it to ensure we're + # validating the entire contents of the collection. + - class: CheckReplOplogs + - class: CheckReplDBHash + - class: ValidateCollections + - class: CleanEveryN + n: 20 + fixture: + class: ReplicaSetFixture + mongod_options: + bind_ip_all: '' + set_parameters: + enableTestCommands: 1 + num_nodes: 2 diff --git a/buildscripts/resmokeconfig/suites/change_streams_v2_resume_token_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_v2_resume_token_sharded_collections_passthrough.yml new file mode 100644 index 00000000000..a0f93f572b0 --- /dev/null +++ b/buildscripts/resmokeconfig/suites/change_streams_v2_resume_token_sharded_collections_passthrough.yml @@ -0,0 +1,61 @@ +test_kind: js_test +# TODO SERVER-65370: remove this suite when v2 tokens become the default, or rework it to test v1. +selector: + roots: + - jstests/change_streams/**/*.js + exclude_files: + # This test explicitly compares v1 and v2 tokens, and must be able to generate the former. + - jstests/change_streams/generate_v2_resume_token.js + exclude_with_any_tags: + ## + # The next tags correspond to the special errors thrown by the + # set_read_and_write_concerns.js override when it refuses to replace the readConcern or + # writeConcern of a particular command. Above each tag are the message(s) that cause the tag to be + # warranted. + ## + # "Cowardly refusing to override write concern of command: ..." + - assumes_write_concern_unchanged + # Exclude any that assume sharding is disabled + - assumes_against_mongod_not_mongos + - assumes_unsharded_collection + +executor: + archive: + hooks: + - CheckReplDBHash + - ValidateCollections + config: + shell_options: + global_vars: + TestData: + defaultReadConcernLevel: null + enableMajorityReadConcern: '' + # Enable causal consistency for change streams suites using 1 node replica sets. See + # change_streams.yml for detailed explanation. + eval: >- + var testingReplication = true; + load('jstests/libs/override_methods/set_read_and_write_concerns.js'); + load('jstests/libs/override_methods/implicitly_shard_accessed_collections.js'); + load('jstests/libs/override_methods/enable_causal_consistency_without_read_pref.js'); + load('jstests/libs/override_methods/implicit_v2_resume_token_changestreams.js'); + hooks: + - class: CheckReplDBHash + - class: ValidateCollections + - class: CleanEveryN + n: 20 + fixture: + class: ShardedClusterFixture + mongos_options: + bind_ip_all: '' + set_parameters: + enableTestCommands: 1 + mongod_options: + bind_ip_all: '' + set_parameters: + enableTestCommands: 1 + writePeriodicNoops: 1 + periodicNoopIntervalSecs: 1 + coordinateCommitReturnImmediatelyAfterPersistingDecision: true + num_shards: 2 + enable_sharding: + - test diff --git a/etc/evergreen_yml_components/definitions.yml b/etc/evergreen_yml_components/definitions.yml index 98b9c802a79..5b7eae3aa68 100644 --- a/etc/evergreen_yml_components/definitions.yml +++ b/etc/evergreen_yml_components/definitions.yml @@ -4201,6 +4201,13 @@ tasks: - func: "do setup" - func: "run tests" +- <<: *task_template + name: change_streams_v2_resume_token_passthrough + tags: ["change_streams"] + commands: + - func: "do setup" + - func: "run tests" + - <<: *gen_task_template name: change_streams_multiversion_gen tags: ["multiversion", "multiversion_passthrough"] @@ -4258,6 +4265,15 @@ tasks: - func: "do setup" - func: "run tests" +- <<: *task_template + name: change_streams_v2_resume_token_sharded_collections_passthrough + tags: ["change_streams"] + depends_on: + - name: change_streams + commands: + - func: "do setup" + - func: "run tests" + - <<: *gen_task_template name: change_streams_sharded_collections_multiversion_gen tags: ["multiversion_passthrough", "multiversion"] diff --git a/jstests/aggregation/api_version_stage_allowance_checks.js b/jstests/aggregation/api_version_stage_allowance_checks.js index 751e35663f2..f47b27fd630 100644 --- a/jstests/aggregation/api_version_stage_allowance_checks.js +++ b/jstests/aggregation/api_version_stage_allowance_checks.js @@ -143,6 +143,18 @@ result = testDB.runCommand({ }); assert.commandWorked(result); +// Tests that the internal '$_generateV2ResumeTokens' option does not fail with 'apiStrict: true'. +result = testDB.runCommand({ + aggregate: collName, + pipeline: [{$project: {_id: 0}}], + cursor: {}, + writeConcern: {w: "majority"}, + $_generateV2ResumeTokens: true, + apiVersion: "1", + apiStrict: true +}); +assert.commandWorked(result); + // Tests that time-series collection can be queried (invoking $_internalUnpackBucket stage) // from an external client with 'apiStrict'. (function testInternalUnpackBucketAllowance() { diff --git a/jstests/change_streams/generate_v2_resume_token.js b/jstests/change_streams/generate_v2_resume_token.js new file mode 100644 index 00000000000..4c2fc5229fb --- /dev/null +++ b/jstests/change_streams/generate_v2_resume_token.js @@ -0,0 +1,35 @@ +/** + * Test that the $_generateV2ResumeTokens parameter can be used to force change streams to return v2 + * tokens. + */ +(function() { +"use strict"; + +load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection. + +const coll = assertDropAndRecreateCollection(db, jsTestName()); + +// Create one stream that returns v1 tokens, the default. +const v1Stream = coll.watch([]); + +// Create a second stream that explicitly requests v2 tokens. +const v2Stream = coll.watch([], {$_generateV2ResumeTokens: true}); + +// Insert a test document into the collection. +assert.commandWorked(coll.insert({_id: 1})); + +// Wait until both streams have encountered the insert operation. +assert.soon(() => v1Stream.hasNext() && v2Stream.hasNext()); +const v1Event = v1Stream.next(); +const v2Event = v2Stream.next(); + +// Confirm that the streams see the same event, but the resume tokens differ. +const v1ResumeToken = v1Event._id; +const v2ResumeToken = v2Event._id; + +delete v1Event._id; +delete v2Event._id; + +assert.docEq(v1Event, v2Event); +assert.neq(v1ResumeToken, v2ResumeToken); +})();
\ No newline at end of file diff --git a/jstests/libs/override_methods/implicit_v2_resume_token_changestreams.js b/jstests/libs/override_methods/implicit_v2_resume_token_changestreams.js new file mode 100644 index 00000000000..068d121fff7 --- /dev/null +++ b/jstests/libs/override_methods/implicit_v2_resume_token_changestreams.js @@ -0,0 +1,26 @@ +/** + * Loading this file overrides 'runCommand' with a function that modifies any $changeStream + * aggregation to use $_generateV2ResumeTokens:true. + * TODO SERVER-65370: remove this suite when v2 tokens become the default, or rework it to test v1. + */ +(function() { +"use strict"; + +load("jstests/libs/override_methods/override_helpers.js"); // For 'OverrideHelpers'. + +// Override runCommand to set $_generateV2ResumeTokens on all $changeStreams. +function runCommandV2Tokens(conn, dbName, cmdName, cmdObj, originalRunCommand, makeRunCommandArgs) { + if (OverrideHelpers.isAggregationWithChangeStreamStage(cmdName, cmdObj)) { + // Make a copy to avoid mutating the user's original command object. + cmdObj = Object.assign({}, cmdObj, {$_generateV2ResumeTokens: true}); + } + return originalRunCommand.apply(conn, makeRunCommandArgs(cmdObj)); +} + +// Always apply the override if a test spawns a parallel shell. +OverrideHelpers.prependOverrideInParallelShell( + "jstests/libs/override_methods/implicit_v2_resume_token_changestreams.js"); + +// Override the default runCommand with our custom version. +OverrideHelpers.overrideRunCommand(runCommandV2Tokens); +})();
\ No newline at end of file diff --git a/jstests/noPassthrough/change_stream_mongos_explicitly_requests_v1_resume_token.js b/jstests/noPassthrough/change_stream_mongos_explicitly_requests_v1_resume_token.js new file mode 100644 index 00000000000..5139bff5b56 --- /dev/null +++ b/jstests/noPassthrough/change_stream_mongos_explicitly_requests_v1_resume_token.js @@ -0,0 +1,109 @@ +/** + * Test that mongoS explicitly sets the value of $_generateV2ResumeTokens to 'false' on the commands + * it sends to the shards if no value was specified by the client, and that if a value was + * specified, it forwards it to the shards. On a replica set, no explicit value is set; the + * aggregation simply treats it as default-false. + * TODO SERVER-65370: remove or rework this test when v2 tokens become the default. + * @tags: [ + * uses_change_streams, + * requires_sharding, + * requires_replication, + * requires_wiredtiger, + * ] + */ +(function() { +"use strict"; + +load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection. +load("jstests/libs/profiler.js"); // For profilerHasSingleMatchingEntryOrThrow. + +// Create a sharding fixture with a single one-node replset shard and a one-node replset config +// server. The latter is to ensure that there is only one node that the internal new-shard monitor +// $changeStream can be sent to, since it is dispatched with secondaryPreferred readPreference. +const st = new ShardingTest({shards: 1, rs: {nodes: 1}, config: {nodes: 1}}); + +const mongosDB = st.s.getDB("test"); +const shardDB = st.rs0.getPrimary().getDB(mongosDB.getName()); +const configDB = st.configRS.getPrimary().getDB("config"); + +const mongosColl = assertDropAndRecreateCollection(mongosDB, jsTestName()); +const shardColl = shardDB[mongosColl.getName()]; +const configColl = configDB.shards; + +// Enable profiling on the shard and config server. +assert.commandWorked(shardDB.setProfilingLevel(2)); +assert.commandWorked(configDB.setProfilingLevel(2)); + +// Create one stream on mongoS that returns v1 tokens, the default. +const v1MongosStream = mongosColl.watch([], {comment: "v1MongosStream"}); + +// Create a second stream on mongoS that explicitly requests v2 tokens. +const v2MongosStream = + mongosColl.watch([], {comment: "v2MongosStream", $_generateV2ResumeTokens: true}); + +// Create a stream directly on the shard which returns the default v1 tokens. +const v1ShardStream = shardColl.watch([], {comment: "v1ShardStream"}); + +// Insert a test document into the collection. +assert.commandWorked(mongosColl.insert({_id: 1})); + +// Wait until all streams have encountered the insert operation. +assert.soon(() => v1MongosStream.hasNext() && v2MongosStream.hasNext() && v1ShardStream.hasNext()); + +// Confirm that in a sharded cluster, mongoS explicitly sets $_generateV2ResumeTokens to false on +// the command that it sends to the shards if nothing was specified by the client. +profilerHasAtLeastOneMatchingEntryOrThrow({ + profileDB: shardDB, + filter: { + "originatingCommand.aggregate": mongosColl.getName(), + "originatingCommand.comment": "v1MongosStream", + "originatingCommand.$_generateV2ResumeTokens": false + } +}); + +// Confirm that we also set $_generateV2ResumeTokens to false on the internal new-shard monitoring +// $changeStream that we dispatch to the config servers. +profilerHasAtLeastOneMatchingEntryOrThrow({ + profileDB: configDB, + filter: { + "originatingCommand.aggregate": configColl.getName(), + "originatingCommand.comment": "v1MongosStream", + "originatingCommand.$_generateV2ResumeTokens": false + } +}); + +// Confirm that mongoS correctly forwards the value of $_generateV2ResumeTokens to the shards if it +// is specified by the client. +profilerHasAtLeastOneMatchingEntryOrThrow({ + profileDB: shardDB, + filter: { + "originatingCommand.aggregate": mongosColl.getName(), + "originatingCommand.comment": "v2MongosStream", + "originatingCommand.$_generateV2ResumeTokens": true + } +}); + +// Confirm that we also forward the value of $_generateV2ResumeTokens to the config servers. +profilerHasAtLeastOneMatchingEntryOrThrow({ + profileDB: configDB, + filter: { + "originatingCommand.aggregate": configColl.getName(), + "originatingCommand.comment": "v2MongosStream", + "originatingCommand.$_generateV2ResumeTokens": true + } +}); + +// Confirm that on a replica set - in this case, a direct connection to the shard - no value is set +// for $_generateV2ResumeTokens if the client did not specify one. The aggregation defaults to +// treating the value as false. +profilerHasAtLeastOneMatchingEntryOrThrow({ + profileDB: shardDB, + filter: { + "originatingCommand.aggregate": mongosColl.getName(), + "originatingCommand.comment": "v1ShardStream", + "originatingCommand.$_generateV2ResumeTokens": {$exists: false} + } +}); + +st.stop(); +})();
\ No newline at end of file diff --git a/jstests/noPassthrough/change_stream_only_generates_v1_tokens_with_test_commands_disabled.js b/jstests/noPassthrough/change_stream_only_generates_v1_tokens_with_test_commands_disabled.js new file mode 100644 index 00000000000..2a1fe1fbad0 --- /dev/null +++ b/jstests/noPassthrough/change_stream_only_generates_v1_tokens_with_test_commands_disabled.js @@ -0,0 +1,27 @@ +/** + * Test that the $_generateV2ResumeTokens parameter cannot be used when test commands are disabled. + * @tags: [ + * uses_change_streams, + * requires_sharding, + * requires_replication, + * requires_wiredtiger, + * ] + */ +(function() { +"use strict"; + +// Signal to the ShardingTest that we want to disable test commands. +TestData.enableTestCommands = false; + +// Create a sharding fixture with test commands disabled. +const st = new ShardingTest({shards: 1, rs: {nodes: 1}}); + +// Confirm that attempting to run change streams with $_generateV2ResumeTokens:true fails on mongos. +assert.throwsWithCode(() => st.s.watch([], {$_generateV2ResumeTokens: true}).hasNext(), 6528201); + +// Confirm that attempting to run change streams with $_generateV2ResumeTokens:true fails on shards. +assert.throwsWithCode( + () => st.rs0.getPrimary().watch([], {$_generateV2ResumeTokens: true}).hasNext(), 6528200); + +st.stop(); +})();
\ No newline at end of file diff --git a/jstests/sharding/api_version_stage_allowance_checks.js b/jstests/sharding/api_version_stage_allowance_checks.js index 79edd460778..ac4e49181d7 100644 --- a/jstests/sharding/api_version_stage_allowance_checks.js +++ b/jstests/sharding/api_version_stage_allowance_checks.js @@ -19,6 +19,17 @@ const mongos = st.s0; const dbName = jsTestName(); const db = mongos.getDB(dbName); +// Test that a $changeStream can be opened with 'apiStrict: true'. +const result = db.runCommand({ + aggregate: 1, + pipeline: [{$changeStream: {}}], + cursor: {}, + writeConcern: {w: "majority"}, + apiVersion: "1", + apiStrict: true +}); +assert.commandWorked(result); + // Tests that sharded time-series collection can be queried (invoking $_internalUnpackBucket stage) // from an external client with 'apiStrict'. (function testInternalUnpackBucketAllowance() { diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index e21bcfbffdf..127898b875e 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -442,6 +442,14 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext( expCtx->collationMatchesDefault = collationMatchesDefault; expCtx->forPerShardCursor = request.getPassthroughToShard().has_value(); + // If the request specified v2 resume tokens for change streams, set this on the expCtx. On 6.0 + // we only expect this to occur during testing. + // TODO SERVER-65370: after 6.0, assume true unless present and explicitly false. + if (request.getGenerateV2ResumeTokens()) { + uassert(6528200, "Invalid request for v2 resume tokens", getTestCommandsEnabled()); + expCtx->changeStreamTokenVersion = 2; + } + return expCtx; } diff --git a/src/mongo/db/pipeline/aggregate_command.idl b/src/mongo/db/pipeline/aggregate_command.idl index 0e69d47c6ba..b84513a30e6 100644 --- a/src/mongo/db/pipeline/aggregate_command.idl +++ b/src/mongo/db/pipeline/aggregate_command.idl @@ -261,6 +261,13 @@ commands: cpp_name: passthroughToShard optional: true unstable: true + # TODO SERVER-65370: after 6.0, $_generateV2ResumeTokens should be assumed true if absent. + # TODO SERVER-65369: $_generateV2ResumeTokens can be removed after 7.0. + $_generateV2ResumeTokens: + description: "Internal parameter to signal whether v2 resume tokens should be generated." + type: optionalBool + cpp_name: generateV2ResumeTokens + unstable: false encryptionInformation: description: "Encryption Information schema and other tokens for CRUD commands" type: EncryptionInformation diff --git a/src/mongo/db/pipeline/change_stream_event_transform.cpp b/src/mongo/db/pipeline/change_stream_event_transform.cpp index 1cd6fe39c74..1731a99f328 100644 --- a/src/mongo/db/pipeline/change_stream_event_transform.cpp +++ b/src/mongo/db/pipeline/change_stream_event_transform.cpp @@ -75,9 +75,12 @@ void setResumeTokenForEvent(const ResumeTokenData& resumeTokenData, MutableDocum } // namespace ChangeStreamEventTransformation::ChangeStreamEventTransformation( + const boost::intrusive_ptr<ExpressionContext>& expCtx, const DocumentSourceChangeStreamSpec& spec) - : _changeStreamSpec(spec) { - _resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(_changeStreamSpec); + : _changeStreamSpec(spec), _expCtx(expCtx) { + // Extract the resume token from the spec and store it. + _resumeToken = + DocumentSourceChangeStream::resolveResumeTokenFromSpec(_expCtx, _changeStreamSpec); // Determine whether the user requested a point-in-time pre-image, which will affect this // stage's output. @@ -104,7 +107,7 @@ ResumeTokenData ChangeStreamEventTransformation::makeResumeToken(Value tsVal, // If we have a resume token, we need to match the version with which it was generated until we // have surpassed it, at which point we can begin generating tokens with our default version. - auto version = (clusterTime > _resumeToken.clusterTime) ? ResumeTokenData::kDefaultTokenVersion + auto version = (clusterTime > _resumeToken.clusterTime) ? _expCtx->changeStreamTokenVersion : _resumeToken.version; // Construct and return the final resume token. @@ -114,7 +117,7 @@ ResumeTokenData ChangeStreamEventTransformation::makeResumeToken(Value tsVal, ChangeStreamDefaultEventTransformation::ChangeStreamDefaultEventTransformation( const boost::intrusive_ptr<ExpressionContext>& expCtx, const DocumentSourceChangeStreamSpec& spec) - : ChangeStreamEventTransformation(spec) { + : ChangeStreamEventTransformation(expCtx, spec) { _documentKeyCache = std::make_unique<change_stream_legacy::DocumentKeyCache>(expCtx, _resumeToken); } @@ -471,6 +474,11 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum return doc.freeze(); } +ChangeStreamViewDefinitionEventTransformation::ChangeStreamViewDefinitionEventTransformation( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const DocumentSourceChangeStreamSpec& spec) + : ChangeStreamEventTransformation(expCtx, spec) {} + std::set<std::string> ChangeStreamViewDefinitionEventTransformation::getFieldNameDependencies() const { return std::set<std::string>{repl::OplogEntry::kOpTypeFieldName.toString(), @@ -541,7 +549,8 @@ ChangeStreamEventTransformer::ChangeStreamEventTransformer( const boost::intrusive_ptr<ExpressionContext>& expCtx, const DocumentSourceChangeStreamSpec& spec) { _defaultEventBuilder = std::make_unique<ChangeStreamDefaultEventTransformation>(expCtx, spec); - _viewNsEventBuilder = std::make_unique<ChangeStreamViewDefinitionEventTransformation>(spec); + _viewNsEventBuilder = + std::make_unique<ChangeStreamViewDefinitionEventTransformation>(expCtx, spec); _isSingleCollStream = DocumentSourceChangeStream::getChangeStreamType(expCtx->ns) == DocumentSourceChangeStream::ChangeStreamType::kSingleCollection; } diff --git a/src/mongo/db/pipeline/change_stream_event_transform.h b/src/mongo/db/pipeline/change_stream_event_transform.h index 8bb24eaf2bd..a884fe1c672 100644 --- a/src/mongo/db/pipeline/change_stream_event_transform.h +++ b/src/mongo/db/pipeline/change_stream_event_transform.h @@ -40,7 +40,8 @@ namespace mongo { */ class ChangeStreamEventTransformation { public: - ChangeStreamEventTransformation(const DocumentSourceChangeStreamSpec& spec); + ChangeStreamEventTransformation(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const DocumentSourceChangeStreamSpec& spec); virtual ~ChangeStreamEventTransformation() {} @@ -64,6 +65,7 @@ protected: Value opDescription) const; const DocumentSourceChangeStreamSpec _changeStreamSpec; + boost::intrusive_ptr<ExpressionContext> _expCtx; ResumeTokenData _resumeToken; // Set to true if the pre-image should be included in the output documents. @@ -94,8 +96,9 @@ private: */ class ChangeStreamViewDefinitionEventTransformation final : public ChangeStreamEventTransformation { public: - ChangeStreamViewDefinitionEventTransformation(const DocumentSourceChangeStreamSpec& spec) - : ChangeStreamEventTransformation(spec) {} + ChangeStreamViewDefinitionEventTransformation( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const DocumentSourceChangeStreamSpec& spec); Document applyTransformation(const Document& fromDoc) const override; diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp index 07678a0235a..af7cd71034f 100644 --- a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp +++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp @@ -64,8 +64,9 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline( if (!userRequestedResumePoint) { // Make sure we update the 'resumeAfter' in the 'spec' so that we serialize the // correct resume token when sending it to the shards. - spec.setResumeAfter(ResumeToken::makeHighWaterMarkToken( - DocumentSourceChangeStream::getStartTimeForNewStream(expCtx))); + auto clusterTime = DocumentSourceChangeStream::getStartTimeForNewStream(expCtx); + spec.setResumeAfter( + ResumeToken::makeHighWaterMarkToken(clusterTime, expCtx->changeStreamTokenVersion)); } // Unfold the $changeStream into its constituent stages and add them to the pipeline. diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 860fdce238c..4fc0041cc0e 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -227,13 +227,16 @@ std::string DocumentSourceChangeStream::regexEscapeNsForChangeStream(StringData } ResumeTokenData DocumentSourceChangeStream::resolveResumeTokenFromSpec( + const boost::intrusive_ptr<ExpressionContext>& expCtx, const DocumentSourceChangeStreamSpec& spec) { if (spec.getStartAfter()) { return spec.getStartAfter()->getData(); } else if (spec.getResumeAfter()) { return spec.getResumeAfter()->getData(); } else if (spec.getStartAtOperationTime()) { - return ResumeToken::makeHighWaterMarkToken(*spec.getStartAtOperationTime()).getData(); + return ResumeToken::makeHighWaterMarkToken(*spec.getStartAtOperationTime(), + expCtx->changeStreamTokenVersion) + .getData(); } tasserted(5666901, "Expected one of 'startAfter', 'resumeAfter' or 'startAtOperationTime' to be " @@ -292,7 +295,7 @@ std::list<boost::intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::_bui } // Obtain the resume token from the spec. This will be used when building the pipeline. - auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec); + auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(expCtx, spec); // Unfold the $changeStream into its constituent stages and add them to the pipeline. stages.push_back(DocumentSourceChangeStreamOplogMatch::create(expCtx, spec)); @@ -441,7 +444,7 @@ void DocumentSourceChangeStream::assertIsLegalSpecification( !spec.getResumeAfter() || !spec.getStartAfter()); auto resumeToken = (spec.getResumeAfter() || spec.getStartAfter()) - ? resolveResumeTokenFromSpec(spec) + ? resolveResumeTokenFromSpec(expCtx, spec) : boost::optional<ResumeTokenData>(); uassert(40674, diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index aebb966cf72..60013e64444 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -287,7 +287,9 @@ public: * returns the equivalent high-watermark token. This method should only ever be called on a spec * where one of 'resumeAfter', 'startAfter', or 'startAtOperationTime' is populated. */ - static ResumeTokenData resolveResumeTokenFromSpec(const DocumentSourceChangeStreamSpec& spec); + static ResumeTokenData resolveResumeTokenFromSpec( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const DocumentSourceChangeStreamSpec& spec); /** * For a change stream with no resume information supplied by the user, returns the clusterTime diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp index a3c6f46f0e2..8121fedb3e7 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp @@ -71,7 +71,7 @@ DocumentSourceChangeStreamCheckInvalidate::create( const DocumentSourceChangeStreamSpec& spec) { // If resuming from an "invalidate" using "startAfter", pass along the resume token data to // DSCSCheckInvalidate to signify that another invalidate should not be generated. - auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec); + auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(expCtx, spec); return new DocumentSourceChangeStreamCheckInvalidate( expCtx, boost::make_optional(resumeToken.fromInvalidate, std::move(resumeToken))); } diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp index cc3787a44ba..3861b21693d 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp @@ -130,7 +130,7 @@ DocumentSourceChangeStreamCheckResumability::DocumentSourceChangeStreamCheckResu intrusive_ptr<DocumentSourceChangeStreamCheckResumability> DocumentSourceChangeStreamCheckResumability::create(const intrusive_ptr<ExpressionContext>& expCtx, const DocumentSourceChangeStreamSpec& spec) { - auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec); + auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(expCtx, spec); return new DocumentSourceChangeStreamCheckResumability(expCtx, std::move(resumeToken)); } diff --git a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp index d100b6ace48..15fcb30d58f 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp @@ -45,7 +45,7 @@ boost::intrusive_ptr<DocumentSourceChangeStreamEnsureResumeTokenPresent> DocumentSourceChangeStreamEnsureResumeTokenPresent::create( const boost::intrusive_ptr<ExpressionContext>& expCtx, const DocumentSourceChangeStreamSpec& spec) { - auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec); + auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(expCtx, spec); tassert(5666902, "Expected non-high-water-mark resume token", !ResumeToken::isHighWaterMarkToken(resumeToken)); diff --git a/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp index 8b3463f015b..2f0772a29bc 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp @@ -174,8 +174,8 @@ BSONObj DocumentSourceChangeStreamHandleTopologyChange::createUpdatedCommandForN Timestamp shardAddedTime) { // We must start the new cursor from the moment at which the shard became visible. const auto newShardAddedTime = LogicalTime{shardAddedTime}; - auto resumeTokenForNewShard = - ResumeToken::makeHighWaterMarkToken(newShardAddedTime.addTicks(1).asTimestamp()); + auto resumeTokenForNewShard = ResumeToken::makeHighWaterMarkToken( + newShardAddedTime.addTicks(1).asTimestamp(), pExpCtx->changeStreamTokenVersion); // Create a new shard command object containing the new resume token. auto shardCommand = replaceResumeTokenInCommand(resumeTokenForNewShard.toDocument()); diff --git a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp index 85a59f2db68..c11cb0a1aeb 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp @@ -104,7 +104,7 @@ DocumentSourceChangeStreamOplogMatch::DocumentSourceChangeStreamOplogMatch( boost::intrusive_ptr<DocumentSourceChangeStreamOplogMatch> DocumentSourceChangeStreamOplogMatch::create(const boost::intrusive_ptr<ExpressionContext>& expCtx, const DocumentSourceChangeStreamSpec& spec) { - auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec); + auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(expCtx, spec); return make_intrusive<DocumentSourceChangeStreamOplogMatch>(resumeToken.clusterTime, expCtx); } diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index 5f26ba73267..774e7a2dd23 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -4515,7 +4515,7 @@ TEST_F(MultiTokenFormatVersionTest, CanResumeFromV2HighWaterMark) { auto oplogAfterResumeTime = makeAnOplogEntry(afterResumeTs, documentKey); // Create a v2 high water mark token which sorts immediately before 'firstOplogAtResumeTime'. - ResumeTokenData resumeToken = ResumeToken::makeHighWaterMarkToken(resumeTs).getData(); + ResumeTokenData resumeToken = ResumeToken::makeHighWaterMarkToken(resumeTs, 2).getData(); resumeToken.version = 2; auto expCtx = getExpCtxRaw(); expCtx->ns = NamespaceString::makeCollectionlessAggregateNSS("unittests"); diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index cf31f61b536..766150a6e54 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -74,7 +74,8 @@ DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform( _isIndependentOfAnyCollection(expCtx->ns.isCollectionlessAggregateNS()) { // Extract the resume token or high-water-mark from the spec. - auto tokenData = DocumentSourceChangeStream::resolveResumeTokenFromSpec(_changeStreamSpec); + auto tokenData = + DocumentSourceChangeStream::resolveResumeTokenFromSpec(expCtx, _changeStreamSpec); // Set the initialPostBatchResumeToken on the expression context. expCtx->initialPostBatchResumeToken = ResumeToken(tokenData).toBSON(); diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp index 5a225a35fca..28603f10a30 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp @@ -385,7 +385,9 @@ protected: } intrusive_ptr<DocumentSourceChangeStreamCheckResumability> createDSCheckResumability( Timestamp ts) { - return createDSCheckResumability(ResumeToken::makeHighWaterMarkToken(ts).getData()); + return createDSCheckResumability( + ResumeToken::makeHighWaterMarkToken(ts, ResumeTokenData::kDefaultTokenVersion) + .getData()); } }; diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp index 38f7680e2ce..52036933e6f 100644 --- a/src/mongo/db/pipeline/expression_context.cpp +++ b/src/mongo/db/pipeline/expression_context.cpp @@ -218,6 +218,9 @@ intrusive_ptr<ExpressionContext> ExpressionContext::copyWith( expCtx->exprDeprectedForApiV1 = exprDeprectedForApiV1; expCtx->initialPostBatchResumeToken = initialPostBatchResumeToken.getOwned(); + expCtx->changeStreamTokenVersion = changeStreamTokenVersion; + expCtx->changeStreamSpec = changeStreamSpec; + expCtx->originalAggregateCommand = originalAggregateCommand.getOwned(); // Note that we intentionally skip copying the value of '_interruptCounter' because 'expCtx' is diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index 833a2da9ef1..2fe36d51ab7 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -472,6 +472,9 @@ public: // If present, the spec associated with the current change stream pipeline. boost::optional<DocumentSourceChangeStreamSpec> changeStreamSpec; + // The resume token version that should be generated by a change stream. + int changeStreamTokenVersion = ResumeTokenData::kDefaultTokenVersion; + // True if the expression context is the original one for a given pipeline. // False if another context is created for the same pipeline. Used to disable duplicate // expression counting. diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.cpp b/src/mongo/db/pipeline/plan_executor_pipeline.cpp index 9bd24748396..0b26a7db813 100644 --- a/src/mongo/db/pipeline/plan_executor_pipeline.cpp +++ b/src/mongo/db/pipeline/plan_executor_pipeline.cpp @@ -174,7 +174,8 @@ void PlanExecutorPipeline::_performChangeStreamsAccounting(const boost::optional // high-water-mark token at the current clusterTime. auto highWaterMark = PipelineD::getLatestOplogTimestamp(_pipeline.get()); if (highWaterMark > _latestOplogTimestamp) { - auto token = ResumeToken::makeHighWaterMarkToken(highWaterMark); + auto token = ResumeToken::makeHighWaterMarkToken( + highWaterMark, _pipeline->getContext()->changeStreamTokenVersion); _postBatchResumeToken = token.toDocument().toBson(); _latestOplogTimestamp = highWaterMark; _setSpeculativeReadTimestamp(); diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp index d1a54e4cb52..cc6d3631fd3 100644 --- a/src/mongo/db/pipeline/resume_token.cpp +++ b/src/mongo/db/pipeline/resume_token.cpp @@ -50,9 +50,9 @@ namespace { // Helper function for makeHighWaterMarkToken and isHighWaterMarkToken. ResumeTokenData makeHighWaterMarkResumeTokenData(Timestamp clusterTime, int version) { ResumeTokenData tokenData; + tokenData.version = version; tokenData.clusterTime = clusterTime; tokenData.tokenType = ResumeTokenData::kHighWaterMarkToken; - tokenData.version = version; return tokenData; } } // namespace @@ -297,9 +297,8 @@ ResumeToken ResumeToken::parse(const Document& resumeDoc) { return ResumeToken(resumeDoc); } -ResumeToken ResumeToken::makeHighWaterMarkToken(Timestamp clusterTime) { - return ResumeToken( - makeHighWaterMarkResumeTokenData(clusterTime, ResumeTokenData::kDefaultTokenVersion)); +ResumeToken ResumeToken::makeHighWaterMarkToken(Timestamp clusterTime, int version) { + return ResumeToken(makeHighWaterMarkResumeTokenData(clusterTime, version)); } bool ResumeToken::isHighWaterMarkToken(const ResumeTokenData& tokenData) { diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h index 1c5380b7f12..a157c97fed9 100644 --- a/src/mongo/db/pipeline/resume_token.h +++ b/src/mongo/db/pipeline/resume_token.h @@ -142,7 +142,7 @@ public: /** * Generate a high-water-mark token for 'clusterTime', with no UUID or documentKey. */ - static ResumeToken makeHighWaterMarkToken(Timestamp clusterTime); + static ResumeToken makeHighWaterMarkToken(Timestamp clusterTime, int version); /** * Returns true if the given token data represents a valid high-water-mark resume token; that diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index f44a6526c00..f63f6c61bc6 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -113,6 +113,12 @@ RemoteCursor openChangeStreamNewShardMonitor(const boost::intrusive_ptr<Expressi << DocumentSourceChangeStreamSpec::kAllowToRunOnConfigDBFieldName << true))}); aggReq.setFromMongos(true); aggReq.setNeedsMerge(true); + + // TODO SERVER-65370: from 6.1 onwards, we will default to v2 and this block should be removed. + if (expCtx->inMongos) { + aggReq.setGenerateV2ResumeTokens(expCtx->changeStreamTokenVersion == 2); + } + SimpleCursorOptions cursor; cursor.setBatchSize(0); aggReq.setCursor(cursor); @@ -145,6 +151,17 @@ BSONObj genericTransformForShards(MutableDocument&& cmdForShards, cmdForShards[AggregateCommandRequest::kCollationFieldName] = Value(collationObj); } + // We explicitly set $_generateV2ResumeTokens to false, if not already set, to indicate that the + // shards should NOT produce v2 resume tokens for change streams; instead, they should continue + // generating v1 tokens. This facilitates upgrade between 6.0 (which produces v1 by default) + // and 7.0 (which will produce v2 by default, but will be capable of generating v1) by ensuring + // that a 6.0 mongoS on a mixed 6.0/7.0 cluster will see only v1 tokens in the stream. + // TODO SERVER-65370: from 6.1 onwards, we will default to v2 and this block should be removed. + const auto& v2FieldName = AggregateCommandRequest::kGenerateV2ResumeTokensFieldName; + if (auto cmdObj = cmdForShards.peek(); expCtx->inMongos && cmdObj[v2FieldName].missing()) { + cmdForShards[v2FieldName] = Value(false); + } + // If this is a request for an aggregation explain, then we must wrap the aggregate inside an // explain command. if (explainVerbosity) { diff --git a/src/mongo/db/query/cursor_response_test.cpp b/src/mongo/db/query/cursor_response_test.cpp index 183c27e05d6..56498b71024 100644 --- a/src/mongo/db/query/cursor_response_test.cpp +++ b/src/mongo/db/query/cursor_response_test.cpp @@ -395,7 +395,9 @@ TEST(CursorResponseTest, addToBSONSubsequentResponse) { TEST(CursorResponseTest, serializePostBatchResumeToken) { std::vector<BSONObj> batch = {BSON("_id" << 1), BSON("_id" << 2)}; auto postBatchResumeToken = - ResumeToken::makeHighWaterMarkToken(Timestamp(1, 2)).toDocument().toBson(); + ResumeToken::makeHighWaterMarkToken(Timestamp(1, 2), ResumeTokenData::kDefaultTokenVersion) + .toDocument() + .toBson(); CursorResponse response( NamespaceString("db.coll"), CursorId(123), batch, boost::none, postBatchResumeToken); auto serialized = response.toBSON(CursorResponse::ResponseType::SubsequentResponse); diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index d089ac691c9..80600272e63 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -54,7 +54,10 @@ LogicalSessionId parseSessionIdFromCmd(BSONObj cmdObj) { } BSONObj makePostBatchResumeToken(Timestamp clusterTime) { - auto pbrt = ResumeToken::makeHighWaterMarkToken(clusterTime).toDocument().toBson(); + auto pbrt = + ResumeToken::makeHighWaterMarkToken(clusterTime, ResumeTokenData::kDefaultTokenVersion) + .toDocument() + .toBson(); invariant(pbrt.firstElement().type() == BSONType::String); return pbrt; } diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index 4bcd150206b..b3df71582da 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -133,6 +133,14 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext( mergeCtx->inMongos = true; + // If the request specified v2 resume tokens for change streams, set this on the expCtx. On 6.0 + // we only expect this to occur during testing. + // TODO SERVER-65370: after 6.0, assume true unless present and explicitly false. + if (request.getGenerateV2ResumeTokens()) { + uassert(6528201, "Invalid request for v2 resume tokens", getTestCommandsEnabled()); + mergeCtx->changeStreamTokenVersion = 2; + } + // Serialize the 'AggregateCommandRequest' and save it so that the original command can be // reconstructed for dispatch to a new shard, which is sometimes necessary for change streams // pipelines. |