summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2022-04-11 17:09:06 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-04-11 23:28:08 +0000
commit9f0897f74297a4ae415a3b5ebf482ff78afc2058 (patch)
tree9829721a2bcded15f410b23a8af5962bfd5e6a2d
parentbbb9ba057afddf16a714c3c39688bee9fe8ccb56 (diff)
downloadmongo-9f0897f74297a4ae415a3b5ebf482ff78afc2058.tar.gz
SERVER-65282 Add $_generateV2ResumeTokens parameter to aggregate command
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_v2_resume_token_passthrough.yml59
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_v2_resume_token_sharded_collections_passthrough.yml61
-rw-r--r--etc/evergreen_yml_components/definitions.yml16
-rw-r--r--jstests/aggregation/api_version_stage_allowance_checks.js12
-rw-r--r--jstests/change_streams/generate_v2_resume_token.js35
-rw-r--r--jstests/libs/override_methods/implicit_v2_resume_token_changestreams.js26
-rw-r--r--jstests/noPassthrough/change_stream_mongos_explicitly_requests_v1_resume_token.js109
-rw-r--r--jstests/noPassthrough/change_stream_only_generates_v1_tokens_with_test_commands_disabled.js27
-rw-r--r--jstests/sharding/api_version_stage_allowance_checks.js11
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp8
-rw-r--r--src/mongo/db/pipeline/aggregate_command.idl7
-rw-r--r--src/mongo/db/pipeline/change_stream_event_transform.cpp19
-rw-r--r--src/mongo/db/pipeline/change_stream_event_transform.h9
-rw-r--r--src/mongo/db/pipeline/change_stream_helpers_legacy.cpp5
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp9
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h4
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token_test.cpp4
-rw-r--r--src/mongo/db/pipeline/expression_context.cpp3
-rw-r--r--src/mongo/db/pipeline/expression_context.h3
-rw-r--r--src/mongo/db/pipeline/plan_executor_pipeline.cpp3
-rw-r--r--src/mongo/db/pipeline/resume_token.cpp7
-rw-r--r--src/mongo/db/pipeline/resume_token.h2
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp17
-rw-r--r--src/mongo/db/query/cursor_response_test.cpp4
-rw-r--r--src/mongo/s/query/async_results_merger_test.cpp5
-rw-r--r--src/mongo/s/query/cluster_aggregate.cpp8
33 files changed, 459 insertions, 31 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_v2_resume_token_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_v2_resume_token_passthrough.yml
new file mode 100644
index 00000000000..4179f872b7f
--- /dev/null
+++ b/buildscripts/resmokeconfig/suites/change_streams_v2_resume_token_passthrough.yml
@@ -0,0 +1,59 @@
+test_kind: js_test
+# TODO SERVER-65370: remove this suite when v2 tokens become the default, or rework it to test v1.
+selector:
+ roots:
+ - jstests/change_streams/**/*.js
+ exclude_files:
+ # This test explicitly compares v1 and v2 tokens, and must be able to generate the former.
+ - jstests/change_streams/generate_v2_resume_token.js
+ exclude_with_any_tags:
+ ##
+ # The next tags correspond to the special errors thrown by the
+ # set_read_and_write_concerns.js override when it refuses to replace the readConcern or
+ # writeConcern of a particular command. Above each tag are the message(s) that cause the tag to be
+ # warranted.
+ ##
+ # "Cowardly refusing to override write concern of command: ..."
+ - assumes_write_concern_unchanged
+
+executor:
+ archive:
+ hooks:
+ - CheckReplDBHash
+ - CheckReplOplogs
+ - ValidateCollections
+ config:
+ shell_options:
+ global_vars:
+ TestData:
+ defaultReadConcernLevel: null
+ enableMajorityReadConcern: ''
+ # Enable causal consistency for change streams suites using 1 node replica sets. Some tests
+ # rely on the assumption that a w:majority write will be visible immediately in a subsequently
+ # opened change stream. In 1 node replica sets, an operation that majority commits at
+ # timestamp T will force the majority snapshot to advance to T, but the oplog visibility point
+ # may not have advanced to T yet. Subsequent majority snapshot reads will see this write in
+ # the oplog, but speculative majority reads may not, since they read from a local snapshot and
+ # are bound to the oplog visibility rules. Using causal consistency forces the visibility
+ # point to advance to the timestamp of the last write before doing a new read.
+ eval: >-
+ var testingReplication = true;
+ load('jstests/libs/override_methods/set_read_and_write_concerns.js');
+ load('jstests/libs/override_methods/enable_causal_consistency_without_read_pref.js');
+ load('jstests/libs/override_methods/implicit_v2_resume_token_changestreams.js');
+ hooks:
+ # The CheckReplDBHash hook waits until all operations have replicated to and have been applied
+ # on the secondaries, so we run the ValidateCollections hook after it to ensure we're
+ # validating the entire contents of the collection.
+ - class: CheckReplOplogs
+ - class: CheckReplDBHash
+ - class: ValidateCollections
+ - class: CleanEveryN
+ n: 20
+ fixture:
+ class: ReplicaSetFixture
+ mongod_options:
+ bind_ip_all: ''
+ set_parameters:
+ enableTestCommands: 1
+ num_nodes: 2
diff --git a/buildscripts/resmokeconfig/suites/change_streams_v2_resume_token_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_v2_resume_token_sharded_collections_passthrough.yml
new file mode 100644
index 00000000000..a0f93f572b0
--- /dev/null
+++ b/buildscripts/resmokeconfig/suites/change_streams_v2_resume_token_sharded_collections_passthrough.yml
@@ -0,0 +1,61 @@
+test_kind: js_test
+# TODO SERVER-65370: remove this suite when v2 tokens become the default, or rework it to test v1.
+selector:
+ roots:
+ - jstests/change_streams/**/*.js
+ exclude_files:
+ # This test explicitly compares v1 and v2 tokens, and must be able to generate the former.
+ - jstests/change_streams/generate_v2_resume_token.js
+ exclude_with_any_tags:
+ ##
+ # The next tags correspond to the special errors thrown by the
+ # set_read_and_write_concerns.js override when it refuses to replace the readConcern or
+ # writeConcern of a particular command. Above each tag are the message(s) that cause the tag to be
+ # warranted.
+ ##
+ # "Cowardly refusing to override write concern of command: ..."
+ - assumes_write_concern_unchanged
+ # Exclude any that assume sharding is disabled
+ - assumes_against_mongod_not_mongos
+ - assumes_unsharded_collection
+
+executor:
+ archive:
+ hooks:
+ - CheckReplDBHash
+ - ValidateCollections
+ config:
+ shell_options:
+ global_vars:
+ TestData:
+ defaultReadConcernLevel: null
+ enableMajorityReadConcern: ''
+ # Enable causal consistency for change streams suites using 1 node replica sets. See
+ # change_streams.yml for detailed explanation.
+ eval: >-
+ var testingReplication = true;
+ load('jstests/libs/override_methods/set_read_and_write_concerns.js');
+ load('jstests/libs/override_methods/implicitly_shard_accessed_collections.js');
+ load('jstests/libs/override_methods/enable_causal_consistency_without_read_pref.js');
+ load('jstests/libs/override_methods/implicit_v2_resume_token_changestreams.js');
+ hooks:
+ - class: CheckReplDBHash
+ - class: ValidateCollections
+ - class: CleanEveryN
+ n: 20
+ fixture:
+ class: ShardedClusterFixture
+ mongos_options:
+ bind_ip_all: ''
+ set_parameters:
+ enableTestCommands: 1
+ mongod_options:
+ bind_ip_all: ''
+ set_parameters:
+ enableTestCommands: 1
+ writePeriodicNoops: 1
+ periodicNoopIntervalSecs: 1
+ coordinateCommitReturnImmediatelyAfterPersistingDecision: true
+ num_shards: 2
+ enable_sharding:
+ - test
diff --git a/etc/evergreen_yml_components/definitions.yml b/etc/evergreen_yml_components/definitions.yml
index 98b9c802a79..5b7eae3aa68 100644
--- a/etc/evergreen_yml_components/definitions.yml
+++ b/etc/evergreen_yml_components/definitions.yml
@@ -4201,6 +4201,13 @@ tasks:
- func: "do setup"
- func: "run tests"
+- <<: *task_template
+ name: change_streams_v2_resume_token_passthrough
+ tags: ["change_streams"]
+ commands:
+ - func: "do setup"
+ - func: "run tests"
+
- <<: *gen_task_template
name: change_streams_multiversion_gen
tags: ["multiversion", "multiversion_passthrough"]
@@ -4258,6 +4265,15 @@ tasks:
- func: "do setup"
- func: "run tests"
+- <<: *task_template
+ name: change_streams_v2_resume_token_sharded_collections_passthrough
+ tags: ["change_streams"]
+ depends_on:
+ - name: change_streams
+ commands:
+ - func: "do setup"
+ - func: "run tests"
+
- <<: *gen_task_template
name: change_streams_sharded_collections_multiversion_gen
tags: ["multiversion_passthrough", "multiversion"]
diff --git a/jstests/aggregation/api_version_stage_allowance_checks.js b/jstests/aggregation/api_version_stage_allowance_checks.js
index 751e35663f2..f47b27fd630 100644
--- a/jstests/aggregation/api_version_stage_allowance_checks.js
+++ b/jstests/aggregation/api_version_stage_allowance_checks.js
@@ -143,6 +143,18 @@ result = testDB.runCommand({
});
assert.commandWorked(result);
+// Tests that the internal '$_generateV2ResumeTokens' option does not fail with 'apiStrict: true'.
+result = testDB.runCommand({
+ aggregate: collName,
+ pipeline: [{$project: {_id: 0}}],
+ cursor: {},
+ writeConcern: {w: "majority"},
+ $_generateV2ResumeTokens: true,
+ apiVersion: "1",
+ apiStrict: true
+});
+assert.commandWorked(result);
+
// Tests that time-series collection can be queried (invoking $_internalUnpackBucket stage)
// from an external client with 'apiStrict'.
(function testInternalUnpackBucketAllowance() {
diff --git a/jstests/change_streams/generate_v2_resume_token.js b/jstests/change_streams/generate_v2_resume_token.js
new file mode 100644
index 00000000000..4c2fc5229fb
--- /dev/null
+++ b/jstests/change_streams/generate_v2_resume_token.js
@@ -0,0 +1,35 @@
+/**
+ * Test that the $_generateV2ResumeTokens parameter can be used to force change streams to return v2
+ * tokens.
+ */
+(function() {
+"use strict";
+
+load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection.
+
+const coll = assertDropAndRecreateCollection(db, jsTestName());
+
+// Create one stream that returns v1 tokens, the default.
+const v1Stream = coll.watch([]);
+
+// Create a second stream that explicitly requests v2 tokens.
+const v2Stream = coll.watch([], {$_generateV2ResumeTokens: true});
+
+// Insert a test document into the collection.
+assert.commandWorked(coll.insert({_id: 1}));
+
+// Wait until both streams have encountered the insert operation.
+assert.soon(() => v1Stream.hasNext() && v2Stream.hasNext());
+const v1Event = v1Stream.next();
+const v2Event = v2Stream.next();
+
+// Confirm that the streams see the same event, but the resume tokens differ.
+const v1ResumeToken = v1Event._id;
+const v2ResumeToken = v2Event._id;
+
+delete v1Event._id;
+delete v2Event._id;
+
+assert.docEq(v1Event, v2Event);
+assert.neq(v1ResumeToken, v2ResumeToken);
+})(); \ No newline at end of file
diff --git a/jstests/libs/override_methods/implicit_v2_resume_token_changestreams.js b/jstests/libs/override_methods/implicit_v2_resume_token_changestreams.js
new file mode 100644
index 00000000000..068d121fff7
--- /dev/null
+++ b/jstests/libs/override_methods/implicit_v2_resume_token_changestreams.js
@@ -0,0 +1,26 @@
+/**
+ * Loading this file overrides 'runCommand' with a function that modifies any $changeStream
+ * aggregation to use $_generateV2ResumeTokens:true.
+ * TODO SERVER-65370: remove this suite when v2 tokens become the default, or rework it to test v1.
+ */
+(function() {
+"use strict";
+
+load("jstests/libs/override_methods/override_helpers.js"); // For 'OverrideHelpers'.
+
+// Override runCommand to set $_generateV2ResumeTokens on all $changeStreams.
+function runCommandV2Tokens(conn, dbName, cmdName, cmdObj, originalRunCommand, makeRunCommandArgs) {
+ if (OverrideHelpers.isAggregationWithChangeStreamStage(cmdName, cmdObj)) {
+ // Make a copy to avoid mutating the user's original command object.
+ cmdObj = Object.assign({}, cmdObj, {$_generateV2ResumeTokens: true});
+ }
+ return originalRunCommand.apply(conn, makeRunCommandArgs(cmdObj));
+}
+
+// Always apply the override if a test spawns a parallel shell.
+OverrideHelpers.prependOverrideInParallelShell(
+ "jstests/libs/override_methods/implicit_v2_resume_token_changestreams.js");
+
+// Override the default runCommand with our custom version.
+OverrideHelpers.overrideRunCommand(runCommandV2Tokens);
+})(); \ No newline at end of file
diff --git a/jstests/noPassthrough/change_stream_mongos_explicitly_requests_v1_resume_token.js b/jstests/noPassthrough/change_stream_mongos_explicitly_requests_v1_resume_token.js
new file mode 100644
index 00000000000..5139bff5b56
--- /dev/null
+++ b/jstests/noPassthrough/change_stream_mongos_explicitly_requests_v1_resume_token.js
@@ -0,0 +1,109 @@
+/**
+ * Test that mongoS explicitly sets the value of $_generateV2ResumeTokens to 'false' on the commands
+ * it sends to the shards if no value was specified by the client, and that if a value was
+ * specified, it forwards it to the shards. On a replica set, no explicit value is set; the
+ * aggregation simply treats it as default-false.
+ * TODO SERVER-65370: remove or rework this test when v2 tokens become the default.
+ * @tags: [
+ * uses_change_streams,
+ * requires_sharding,
+ * requires_replication,
+ * requires_wiredtiger,
+ * ]
+ */
+(function() {
+"use strict";
+
+load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection.
+load("jstests/libs/profiler.js"); // For profilerHasSingleMatchingEntryOrThrow.
+
+// Create a sharding fixture with a single one-node replset shard and a one-node replset config
+// server. The latter is to ensure that there is only one node that the internal new-shard monitor
+// $changeStream can be sent to, since it is dispatched with secondaryPreferred readPreference.
+const st = new ShardingTest({shards: 1, rs: {nodes: 1}, config: {nodes: 1}});
+
+const mongosDB = st.s.getDB("test");
+const shardDB = st.rs0.getPrimary().getDB(mongosDB.getName());
+const configDB = st.configRS.getPrimary().getDB("config");
+
+const mongosColl = assertDropAndRecreateCollection(mongosDB, jsTestName());
+const shardColl = shardDB[mongosColl.getName()];
+const configColl = configDB.shards;
+
+// Enable profiling on the shard and config server.
+assert.commandWorked(shardDB.setProfilingLevel(2));
+assert.commandWorked(configDB.setProfilingLevel(2));
+
+// Create one stream on mongoS that returns v1 tokens, the default.
+const v1MongosStream = mongosColl.watch([], {comment: "v1MongosStream"});
+
+// Create a second stream on mongoS that explicitly requests v2 tokens.
+const v2MongosStream =
+ mongosColl.watch([], {comment: "v2MongosStream", $_generateV2ResumeTokens: true});
+
+// Create a stream directly on the shard which returns the default v1 tokens.
+const v1ShardStream = shardColl.watch([], {comment: "v1ShardStream"});
+
+// Insert a test document into the collection.
+assert.commandWorked(mongosColl.insert({_id: 1}));
+
+// Wait until all streams have encountered the insert operation.
+assert.soon(() => v1MongosStream.hasNext() && v2MongosStream.hasNext() && v1ShardStream.hasNext());
+
+// Confirm that in a sharded cluster, mongoS explicitly sets $_generateV2ResumeTokens to false on
+// the command that it sends to the shards if nothing was specified by the client.
+profilerHasAtLeastOneMatchingEntryOrThrow({
+ profileDB: shardDB,
+ filter: {
+ "originatingCommand.aggregate": mongosColl.getName(),
+ "originatingCommand.comment": "v1MongosStream",
+ "originatingCommand.$_generateV2ResumeTokens": false
+ }
+});
+
+// Confirm that we also set $_generateV2ResumeTokens to false on the internal new-shard monitoring
+// $changeStream that we dispatch to the config servers.
+profilerHasAtLeastOneMatchingEntryOrThrow({
+ profileDB: configDB,
+ filter: {
+ "originatingCommand.aggregate": configColl.getName(),
+ "originatingCommand.comment": "v1MongosStream",
+ "originatingCommand.$_generateV2ResumeTokens": false
+ }
+});
+
+// Confirm that mongoS correctly forwards the value of $_generateV2ResumeTokens to the shards if it
+// is specified by the client.
+profilerHasAtLeastOneMatchingEntryOrThrow({
+ profileDB: shardDB,
+ filter: {
+ "originatingCommand.aggregate": mongosColl.getName(),
+ "originatingCommand.comment": "v2MongosStream",
+ "originatingCommand.$_generateV2ResumeTokens": true
+ }
+});
+
+// Confirm that we also forward the value of $_generateV2ResumeTokens to the config servers.
+profilerHasAtLeastOneMatchingEntryOrThrow({
+ profileDB: configDB,
+ filter: {
+ "originatingCommand.aggregate": configColl.getName(),
+ "originatingCommand.comment": "v2MongosStream",
+ "originatingCommand.$_generateV2ResumeTokens": true
+ }
+});
+
+// Confirm that on a replica set - in this case, a direct connection to the shard - no value is set
+// for $_generateV2ResumeTokens if the client did not specify one. The aggregation defaults to
+// treating the value as false.
+profilerHasAtLeastOneMatchingEntryOrThrow({
+ profileDB: shardDB,
+ filter: {
+ "originatingCommand.aggregate": mongosColl.getName(),
+ "originatingCommand.comment": "v1ShardStream",
+ "originatingCommand.$_generateV2ResumeTokens": {$exists: false}
+ }
+});
+
+st.stop();
+})(); \ No newline at end of file
diff --git a/jstests/noPassthrough/change_stream_only_generates_v1_tokens_with_test_commands_disabled.js b/jstests/noPassthrough/change_stream_only_generates_v1_tokens_with_test_commands_disabled.js
new file mode 100644
index 00000000000..2a1fe1fbad0
--- /dev/null
+++ b/jstests/noPassthrough/change_stream_only_generates_v1_tokens_with_test_commands_disabled.js
@@ -0,0 +1,27 @@
+/**
+ * Test that the $_generateV2ResumeTokens parameter cannot be used when test commands are disabled.
+ * @tags: [
+ * uses_change_streams,
+ * requires_sharding,
+ * requires_replication,
+ * requires_wiredtiger,
+ * ]
+ */
+(function() {
+"use strict";
+
+// Signal to the ShardingTest that we want to disable test commands.
+TestData.enableTestCommands = false;
+
+// Create a sharding fixture with test commands disabled.
+const st = new ShardingTest({shards: 1, rs: {nodes: 1}});
+
+// Confirm that attempting to run change streams with $_generateV2ResumeTokens:true fails on mongos.
+assert.throwsWithCode(() => st.s.watch([], {$_generateV2ResumeTokens: true}).hasNext(), 6528201);
+
+// Confirm that attempting to run change streams with $_generateV2ResumeTokens:true fails on shards.
+assert.throwsWithCode(
+ () => st.rs0.getPrimary().watch([], {$_generateV2ResumeTokens: true}).hasNext(), 6528200);
+
+st.stop();
+})(); \ No newline at end of file
diff --git a/jstests/sharding/api_version_stage_allowance_checks.js b/jstests/sharding/api_version_stage_allowance_checks.js
index 79edd460778..ac4e49181d7 100644
--- a/jstests/sharding/api_version_stage_allowance_checks.js
+++ b/jstests/sharding/api_version_stage_allowance_checks.js
@@ -19,6 +19,17 @@ const mongos = st.s0;
const dbName = jsTestName();
const db = mongos.getDB(dbName);
+// Test that a $changeStream can be opened with 'apiStrict: true'.
+const result = db.runCommand({
+ aggregate: 1,
+ pipeline: [{$changeStream: {}}],
+ cursor: {},
+ writeConcern: {w: "majority"},
+ apiVersion: "1",
+ apiStrict: true
+});
+assert.commandWorked(result);
+
// Tests that sharded time-series collection can be queried (invoking $_internalUnpackBucket stage)
// from an external client with 'apiStrict'.
(function testInternalUnpackBucketAllowance() {
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index e21bcfbffdf..127898b875e 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -442,6 +442,14 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext(
expCtx->collationMatchesDefault = collationMatchesDefault;
expCtx->forPerShardCursor = request.getPassthroughToShard().has_value();
+ // If the request specified v2 resume tokens for change streams, set this on the expCtx. On 6.0
+ // we only expect this to occur during testing.
+ // TODO SERVER-65370: after 6.0, assume true unless present and explicitly false.
+ if (request.getGenerateV2ResumeTokens()) {
+ uassert(6528200, "Invalid request for v2 resume tokens", getTestCommandsEnabled());
+ expCtx->changeStreamTokenVersion = 2;
+ }
+
return expCtx;
}
diff --git a/src/mongo/db/pipeline/aggregate_command.idl b/src/mongo/db/pipeline/aggregate_command.idl
index 0e69d47c6ba..b84513a30e6 100644
--- a/src/mongo/db/pipeline/aggregate_command.idl
+++ b/src/mongo/db/pipeline/aggregate_command.idl
@@ -261,6 +261,13 @@ commands:
cpp_name: passthroughToShard
optional: true
unstable: true
+ # TODO SERVER-65370: after 6.0, $_generateV2ResumeTokens should be assumed true if absent.
+ # TODO SERVER-65369: $_generateV2ResumeTokens can be removed after 7.0.
+ $_generateV2ResumeTokens:
+ description: "Internal parameter to signal whether v2 resume tokens should be generated."
+ type: optionalBool
+ cpp_name: generateV2ResumeTokens
+ unstable: false
encryptionInformation:
description: "Encryption Information schema and other tokens for CRUD commands"
type: EncryptionInformation
diff --git a/src/mongo/db/pipeline/change_stream_event_transform.cpp b/src/mongo/db/pipeline/change_stream_event_transform.cpp
index 1cd6fe39c74..1731a99f328 100644
--- a/src/mongo/db/pipeline/change_stream_event_transform.cpp
+++ b/src/mongo/db/pipeline/change_stream_event_transform.cpp
@@ -75,9 +75,12 @@ void setResumeTokenForEvent(const ResumeTokenData& resumeTokenData, MutableDocum
} // namespace
ChangeStreamEventTransformation::ChangeStreamEventTransformation(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
const DocumentSourceChangeStreamSpec& spec)
- : _changeStreamSpec(spec) {
- _resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(_changeStreamSpec);
+ : _changeStreamSpec(spec), _expCtx(expCtx) {
+ // Extract the resume token from the spec and store it.
+ _resumeToken =
+ DocumentSourceChangeStream::resolveResumeTokenFromSpec(_expCtx, _changeStreamSpec);
// Determine whether the user requested a point-in-time pre-image, which will affect this
// stage's output.
@@ -104,7 +107,7 @@ ResumeTokenData ChangeStreamEventTransformation::makeResumeToken(Value tsVal,
// If we have a resume token, we need to match the version with which it was generated until we
// have surpassed it, at which point we can begin generating tokens with our default version.
- auto version = (clusterTime > _resumeToken.clusterTime) ? ResumeTokenData::kDefaultTokenVersion
+ auto version = (clusterTime > _resumeToken.clusterTime) ? _expCtx->changeStreamTokenVersion
: _resumeToken.version;
// Construct and return the final resume token.
@@ -114,7 +117,7 @@ ResumeTokenData ChangeStreamEventTransformation::makeResumeToken(Value tsVal,
ChangeStreamDefaultEventTransformation::ChangeStreamDefaultEventTransformation(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const DocumentSourceChangeStreamSpec& spec)
- : ChangeStreamEventTransformation(spec) {
+ : ChangeStreamEventTransformation(expCtx, spec) {
_documentKeyCache =
std::make_unique<change_stream_legacy::DocumentKeyCache>(expCtx, _resumeToken);
}
@@ -471,6 +474,11 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
return doc.freeze();
}
+ChangeStreamViewDefinitionEventTransformation::ChangeStreamViewDefinitionEventTransformation(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const DocumentSourceChangeStreamSpec& spec)
+ : ChangeStreamEventTransformation(expCtx, spec) {}
+
std::set<std::string> ChangeStreamViewDefinitionEventTransformation::getFieldNameDependencies()
const {
return std::set<std::string>{repl::OplogEntry::kOpTypeFieldName.toString(),
@@ -541,7 +549,8 @@ ChangeStreamEventTransformer::ChangeStreamEventTransformer(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const DocumentSourceChangeStreamSpec& spec) {
_defaultEventBuilder = std::make_unique<ChangeStreamDefaultEventTransformation>(expCtx, spec);
- _viewNsEventBuilder = std::make_unique<ChangeStreamViewDefinitionEventTransformation>(spec);
+ _viewNsEventBuilder =
+ std::make_unique<ChangeStreamViewDefinitionEventTransformation>(expCtx, spec);
_isSingleCollStream = DocumentSourceChangeStream::getChangeStreamType(expCtx->ns) ==
DocumentSourceChangeStream::ChangeStreamType::kSingleCollection;
}
diff --git a/src/mongo/db/pipeline/change_stream_event_transform.h b/src/mongo/db/pipeline/change_stream_event_transform.h
index 8bb24eaf2bd..a884fe1c672 100644
--- a/src/mongo/db/pipeline/change_stream_event_transform.h
+++ b/src/mongo/db/pipeline/change_stream_event_transform.h
@@ -40,7 +40,8 @@ namespace mongo {
*/
class ChangeStreamEventTransformation {
public:
- ChangeStreamEventTransformation(const DocumentSourceChangeStreamSpec& spec);
+ ChangeStreamEventTransformation(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const DocumentSourceChangeStreamSpec& spec);
virtual ~ChangeStreamEventTransformation() {}
@@ -64,6 +65,7 @@ protected:
Value opDescription) const;
const DocumentSourceChangeStreamSpec _changeStreamSpec;
+ boost::intrusive_ptr<ExpressionContext> _expCtx;
ResumeTokenData _resumeToken;
// Set to true if the pre-image should be included in the output documents.
@@ -94,8 +96,9 @@ private:
*/
class ChangeStreamViewDefinitionEventTransformation final : public ChangeStreamEventTransformation {
public:
- ChangeStreamViewDefinitionEventTransformation(const DocumentSourceChangeStreamSpec& spec)
- : ChangeStreamEventTransformation(spec) {}
+ ChangeStreamViewDefinitionEventTransformation(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const DocumentSourceChangeStreamSpec& spec);
Document applyTransformation(const Document& fromDoc) const override;
diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
index 07678a0235a..af7cd71034f 100644
--- a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
+++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
@@ -64,8 +64,9 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline(
if (!userRequestedResumePoint) {
// Make sure we update the 'resumeAfter' in the 'spec' so that we serialize the
// correct resume token when sending it to the shards.
- spec.setResumeAfter(ResumeToken::makeHighWaterMarkToken(
- DocumentSourceChangeStream::getStartTimeForNewStream(expCtx)));
+ auto clusterTime = DocumentSourceChangeStream::getStartTimeForNewStream(expCtx);
+ spec.setResumeAfter(
+ ResumeToken::makeHighWaterMarkToken(clusterTime, expCtx->changeStreamTokenVersion));
}
// Unfold the $changeStream into its constituent stages and add them to the pipeline.
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 860fdce238c..4fc0041cc0e 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -227,13 +227,16 @@ std::string DocumentSourceChangeStream::regexEscapeNsForChangeStream(StringData
}
ResumeTokenData DocumentSourceChangeStream::resolveResumeTokenFromSpec(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
const DocumentSourceChangeStreamSpec& spec) {
if (spec.getStartAfter()) {
return spec.getStartAfter()->getData();
} else if (spec.getResumeAfter()) {
return spec.getResumeAfter()->getData();
} else if (spec.getStartAtOperationTime()) {
- return ResumeToken::makeHighWaterMarkToken(*spec.getStartAtOperationTime()).getData();
+ return ResumeToken::makeHighWaterMarkToken(*spec.getStartAtOperationTime(),
+ expCtx->changeStreamTokenVersion)
+ .getData();
}
tasserted(5666901,
"Expected one of 'startAfter', 'resumeAfter' or 'startAtOperationTime' to be "
@@ -292,7 +295,7 @@ std::list<boost::intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::_bui
}
// Obtain the resume token from the spec. This will be used when building the pipeline.
- auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec);
+ auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(expCtx, spec);
// Unfold the $changeStream into its constituent stages and add them to the pipeline.
stages.push_back(DocumentSourceChangeStreamOplogMatch::create(expCtx, spec));
@@ -441,7 +444,7 @@ void DocumentSourceChangeStream::assertIsLegalSpecification(
!spec.getResumeAfter() || !spec.getStartAfter());
auto resumeToken = (spec.getResumeAfter() || spec.getStartAfter())
- ? resolveResumeTokenFromSpec(spec)
+ ? resolveResumeTokenFromSpec(expCtx, spec)
: boost::optional<ResumeTokenData>();
uassert(40674,
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index aebb966cf72..60013e64444 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -287,7 +287,9 @@ public:
* returns the equivalent high-watermark token. This method should only ever be called on a spec
* where one of 'resumeAfter', 'startAfter', or 'startAtOperationTime' is populated.
*/
- static ResumeTokenData resolveResumeTokenFromSpec(const DocumentSourceChangeStreamSpec& spec);
+ static ResumeTokenData resolveResumeTokenFromSpec(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const DocumentSourceChangeStreamSpec& spec);
/**
* For a change stream with no resume information supplied by the user, returns the clusterTime
diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp
index a3c6f46f0e2..8121fedb3e7 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_check_invalidate.cpp
@@ -71,7 +71,7 @@ DocumentSourceChangeStreamCheckInvalidate::create(
const DocumentSourceChangeStreamSpec& spec) {
// If resuming from an "invalidate" using "startAfter", pass along the resume token data to
// DSCSCheckInvalidate to signify that another invalidate should not be generated.
- auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec);
+ auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(expCtx, spec);
return new DocumentSourceChangeStreamCheckInvalidate(
expCtx, boost::make_optional(resumeToken.fromInvalidate, std::move(resumeToken)));
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp
index cc3787a44ba..3861b21693d 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp
@@ -130,7 +130,7 @@ DocumentSourceChangeStreamCheckResumability::DocumentSourceChangeStreamCheckResu
intrusive_ptr<DocumentSourceChangeStreamCheckResumability>
DocumentSourceChangeStreamCheckResumability::create(const intrusive_ptr<ExpressionContext>& expCtx,
const DocumentSourceChangeStreamSpec& spec) {
- auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec);
+ auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(expCtx, spec);
return new DocumentSourceChangeStreamCheckResumability(expCtx, std::move(resumeToken));
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp
index d100b6ace48..15fcb30d58f 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_ensure_resume_token_present.cpp
@@ -45,7 +45,7 @@ boost::intrusive_ptr<DocumentSourceChangeStreamEnsureResumeTokenPresent>
DocumentSourceChangeStreamEnsureResumeTokenPresent::create(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const DocumentSourceChangeStreamSpec& spec) {
- auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec);
+ auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(expCtx, spec);
tassert(5666902,
"Expected non-high-water-mark resume token",
!ResumeToken::isHighWaterMarkToken(resumeToken));
diff --git a/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp
index 8b3463f015b..2f0772a29bc 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_handle_topology_change.cpp
@@ -174,8 +174,8 @@ BSONObj DocumentSourceChangeStreamHandleTopologyChange::createUpdatedCommandForN
Timestamp shardAddedTime) {
// We must start the new cursor from the moment at which the shard became visible.
const auto newShardAddedTime = LogicalTime{shardAddedTime};
- auto resumeTokenForNewShard =
- ResumeToken::makeHighWaterMarkToken(newShardAddedTime.addTicks(1).asTimestamp());
+ auto resumeTokenForNewShard = ResumeToken::makeHighWaterMarkToken(
+ newShardAddedTime.addTicks(1).asTimestamp(), pExpCtx->changeStreamTokenVersion);
// Create a new shard command object containing the new resume token.
auto shardCommand = replaceResumeTokenInCommand(resumeTokenForNewShard.toDocument());
diff --git a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp
index 85a59f2db68..c11cb0a1aeb 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_oplog_match.cpp
@@ -104,7 +104,7 @@ DocumentSourceChangeStreamOplogMatch::DocumentSourceChangeStreamOplogMatch(
boost::intrusive_ptr<DocumentSourceChangeStreamOplogMatch>
DocumentSourceChangeStreamOplogMatch::create(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const DocumentSourceChangeStreamSpec& spec) {
- auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(spec);
+ auto resumeToken = DocumentSourceChangeStream::resolveResumeTokenFromSpec(expCtx, spec);
return make_intrusive<DocumentSourceChangeStreamOplogMatch>(resumeToken.clusterTime, expCtx);
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 5f26ba73267..774e7a2dd23 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -4515,7 +4515,7 @@ TEST_F(MultiTokenFormatVersionTest, CanResumeFromV2HighWaterMark) {
auto oplogAfterResumeTime = makeAnOplogEntry(afterResumeTs, documentKey);
// Create a v2 high water mark token which sorts immediately before 'firstOplogAtResumeTime'.
- ResumeTokenData resumeToken = ResumeToken::makeHighWaterMarkToken(resumeTs).getData();
+ ResumeTokenData resumeToken = ResumeToken::makeHighWaterMarkToken(resumeTs, 2).getData();
resumeToken.version = 2;
auto expCtx = getExpCtxRaw();
expCtx->ns = NamespaceString::makeCollectionlessAggregateNSS("unittests");
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index cf31f61b536..766150a6e54 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -74,7 +74,8 @@ DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform(
_isIndependentOfAnyCollection(expCtx->ns.isCollectionlessAggregateNS()) {
// Extract the resume token or high-water-mark from the spec.
- auto tokenData = DocumentSourceChangeStream::resolveResumeTokenFromSpec(_changeStreamSpec);
+ auto tokenData =
+ DocumentSourceChangeStream::resolveResumeTokenFromSpec(expCtx, _changeStreamSpec);
// Set the initialPostBatchResumeToken on the expression context.
expCtx->initialPostBatchResumeToken = ResumeToken(tokenData).toBSON();
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index 5a225a35fca..28603f10a30 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -385,7 +385,9 @@ protected:
}
intrusive_ptr<DocumentSourceChangeStreamCheckResumability> createDSCheckResumability(
Timestamp ts) {
- return createDSCheckResumability(ResumeToken::makeHighWaterMarkToken(ts).getData());
+ return createDSCheckResumability(
+ ResumeToken::makeHighWaterMarkToken(ts, ResumeTokenData::kDefaultTokenVersion)
+ .getData());
}
};
diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp
index 38f7680e2ce..52036933e6f 100644
--- a/src/mongo/db/pipeline/expression_context.cpp
+++ b/src/mongo/db/pipeline/expression_context.cpp
@@ -218,6 +218,9 @@ intrusive_ptr<ExpressionContext> ExpressionContext::copyWith(
expCtx->exprDeprectedForApiV1 = exprDeprectedForApiV1;
expCtx->initialPostBatchResumeToken = initialPostBatchResumeToken.getOwned();
+ expCtx->changeStreamTokenVersion = changeStreamTokenVersion;
+ expCtx->changeStreamSpec = changeStreamSpec;
+
expCtx->originalAggregateCommand = originalAggregateCommand.getOwned();
// Note that we intentionally skip copying the value of '_interruptCounter' because 'expCtx' is
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index 833a2da9ef1..2fe36d51ab7 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -472,6 +472,9 @@ public:
// If present, the spec associated with the current change stream pipeline.
boost::optional<DocumentSourceChangeStreamSpec> changeStreamSpec;
+ // The resume token version that should be generated by a change stream.
+ int changeStreamTokenVersion = ResumeTokenData::kDefaultTokenVersion;
+
// True if the expression context is the original one for a given pipeline.
// False if another context is created for the same pipeline. Used to disable duplicate
// expression counting.
diff --git a/src/mongo/db/pipeline/plan_executor_pipeline.cpp b/src/mongo/db/pipeline/plan_executor_pipeline.cpp
index 9bd24748396..0b26a7db813 100644
--- a/src/mongo/db/pipeline/plan_executor_pipeline.cpp
+++ b/src/mongo/db/pipeline/plan_executor_pipeline.cpp
@@ -174,7 +174,8 @@ void PlanExecutorPipeline::_performChangeStreamsAccounting(const boost::optional
// high-water-mark token at the current clusterTime.
auto highWaterMark = PipelineD::getLatestOplogTimestamp(_pipeline.get());
if (highWaterMark > _latestOplogTimestamp) {
- auto token = ResumeToken::makeHighWaterMarkToken(highWaterMark);
+ auto token = ResumeToken::makeHighWaterMarkToken(
+ highWaterMark, _pipeline->getContext()->changeStreamTokenVersion);
_postBatchResumeToken = token.toDocument().toBson();
_latestOplogTimestamp = highWaterMark;
_setSpeculativeReadTimestamp();
diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp
index d1a54e4cb52..cc6d3631fd3 100644
--- a/src/mongo/db/pipeline/resume_token.cpp
+++ b/src/mongo/db/pipeline/resume_token.cpp
@@ -50,9 +50,9 @@ namespace {
// Helper function for makeHighWaterMarkToken and isHighWaterMarkToken.
ResumeTokenData makeHighWaterMarkResumeTokenData(Timestamp clusterTime, int version) {
ResumeTokenData tokenData;
+ tokenData.version = version;
tokenData.clusterTime = clusterTime;
tokenData.tokenType = ResumeTokenData::kHighWaterMarkToken;
- tokenData.version = version;
return tokenData;
}
} // namespace
@@ -297,9 +297,8 @@ ResumeToken ResumeToken::parse(const Document& resumeDoc) {
return ResumeToken(resumeDoc);
}
-ResumeToken ResumeToken::makeHighWaterMarkToken(Timestamp clusterTime) {
- return ResumeToken(
- makeHighWaterMarkResumeTokenData(clusterTime, ResumeTokenData::kDefaultTokenVersion));
+ResumeToken ResumeToken::makeHighWaterMarkToken(Timestamp clusterTime, int version) {
+ return ResumeToken(makeHighWaterMarkResumeTokenData(clusterTime, version));
}
bool ResumeToken::isHighWaterMarkToken(const ResumeTokenData& tokenData) {
diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h
index 1c5380b7f12..a157c97fed9 100644
--- a/src/mongo/db/pipeline/resume_token.h
+++ b/src/mongo/db/pipeline/resume_token.h
@@ -142,7 +142,7 @@ public:
/**
* Generate a high-water-mark token for 'clusterTime', with no UUID or documentKey.
*/
- static ResumeToken makeHighWaterMarkToken(Timestamp clusterTime);
+ static ResumeToken makeHighWaterMarkToken(Timestamp clusterTime, int version);
/**
* Returns true if the given token data represents a valid high-water-mark resume token; that
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index f44a6526c00..f63f6c61bc6 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -113,6 +113,12 @@ RemoteCursor openChangeStreamNewShardMonitor(const boost::intrusive_ptr<Expressi
<< DocumentSourceChangeStreamSpec::kAllowToRunOnConfigDBFieldName << true))});
aggReq.setFromMongos(true);
aggReq.setNeedsMerge(true);
+
+ // TODO SERVER-65370: from 6.1 onwards, we will default to v2 and this block should be removed.
+ if (expCtx->inMongos) {
+ aggReq.setGenerateV2ResumeTokens(expCtx->changeStreamTokenVersion == 2);
+ }
+
SimpleCursorOptions cursor;
cursor.setBatchSize(0);
aggReq.setCursor(cursor);
@@ -145,6 +151,17 @@ BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
cmdForShards[AggregateCommandRequest::kCollationFieldName] = Value(collationObj);
}
+ // We explicitly set $_generateV2ResumeTokens to false, if not already set, to indicate that the
+ // shards should NOT produce v2 resume tokens for change streams; instead, they should continue
+ // generating v1 tokens. This facilitates upgrade between 6.0 (which produces v1 by default)
+ // and 7.0 (which will produce v2 by default, but will be capable of generating v1) by ensuring
+ // that a 6.0 mongoS on a mixed 6.0/7.0 cluster will see only v1 tokens in the stream.
+ // TODO SERVER-65370: from 6.1 onwards, we will default to v2 and this block should be removed.
+ const auto& v2FieldName = AggregateCommandRequest::kGenerateV2ResumeTokensFieldName;
+ if (auto cmdObj = cmdForShards.peek(); expCtx->inMongos && cmdObj[v2FieldName].missing()) {
+ cmdForShards[v2FieldName] = Value(false);
+ }
+
// If this is a request for an aggregation explain, then we must wrap the aggregate inside an
// explain command.
if (explainVerbosity) {
diff --git a/src/mongo/db/query/cursor_response_test.cpp b/src/mongo/db/query/cursor_response_test.cpp
index 183c27e05d6..56498b71024 100644
--- a/src/mongo/db/query/cursor_response_test.cpp
+++ b/src/mongo/db/query/cursor_response_test.cpp
@@ -395,7 +395,9 @@ TEST(CursorResponseTest, addToBSONSubsequentResponse) {
TEST(CursorResponseTest, serializePostBatchResumeToken) {
std::vector<BSONObj> batch = {BSON("_id" << 1), BSON("_id" << 2)};
auto postBatchResumeToken =
- ResumeToken::makeHighWaterMarkToken(Timestamp(1, 2)).toDocument().toBson();
+ ResumeToken::makeHighWaterMarkToken(Timestamp(1, 2), ResumeTokenData::kDefaultTokenVersion)
+ .toDocument()
+ .toBson();
CursorResponse response(
NamespaceString("db.coll"), CursorId(123), batch, boost::none, postBatchResumeToken);
auto serialized = response.toBSON(CursorResponse::ResponseType::SubsequentResponse);
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index d089ac691c9..80600272e63 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -54,7 +54,10 @@ LogicalSessionId parseSessionIdFromCmd(BSONObj cmdObj) {
}
BSONObj makePostBatchResumeToken(Timestamp clusterTime) {
- auto pbrt = ResumeToken::makeHighWaterMarkToken(clusterTime).toDocument().toBson();
+ auto pbrt =
+ ResumeToken::makeHighWaterMarkToken(clusterTime, ResumeTokenData::kDefaultTokenVersion)
+ .toDocument()
+ .toBson();
invariant(pbrt.firstElement().type() == BSONType::String);
return pbrt;
}
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index 4bcd150206b..b3df71582da 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -133,6 +133,14 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext(
mergeCtx->inMongos = true;
+ // If the request specified v2 resume tokens for change streams, set this on the expCtx. On 6.0
+ // we only expect this to occur during testing.
+ // TODO SERVER-65370: after 6.0, assume true unless present and explicitly false.
+ if (request.getGenerateV2ResumeTokens()) {
+ uassert(6528201, "Invalid request for v2 resume tokens", getTestCommandsEnabled());
+ mergeCtx->changeStreamTokenVersion = 2;
+ }
+
// Serialize the 'AggregateCommandRequest' and save it so that the original command can be
// reconstructed for dispatch to a new shard, which is sometimes necessary for change streams
// pipelines.