summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArun Banala <arun.banala@mongodb.com>2022-04-21 17:38:00 +0100
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-04-28 10:17:50 +0000
commitae0241b7e9f6ce79d4cc2e8141d5ba88178369c8 (patch)
treeb4d734bc77a5a571083b2e9e96c5e38a233eaffd
parent782e90f7ae48b583c0a33d555286ef035f35e396 (diff)
downloadmongo-ae0241b7e9f6ce79d4cc2e8141d5ba88178369c8.tar.gz
SERVER-64970 Start generating new v2 resume token format for all events by default
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_v1_resume_token_passthrough.yml (renamed from buildscripts/resmokeconfig/suites/change_streams_v2_resume_token_passthrough.yml)5
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_v1_resume_token_sharded_collections_passthrough.yml (renamed from buildscripts/resmokeconfig/suites/change_streams_v2_resume_token_sharded_collections_passthrough.yml)5
-rw-r--r--etc/evergreen_yml_components/definitions.yml4
-rw-r--r--jstests/aggregation/api_version_stage_allowance_checks.js2
-rw-r--r--jstests/change_streams/generate_v1_resume_token.js (renamed from jstests/change_streams/generate_v2_resume_token.js)14
-rw-r--r--jstests/libs/override_methods/implicit_v1_resume_token_changestreams.js (renamed from jstests/libs/override_methods/implicit_v2_resume_token_changestreams.js)11
-rw-r--r--jstests/noPassthrough/change_stream_generate_v2_tokens_flag_with_test_commands_disabled.js (renamed from jstests/noPassthrough/change_stream_only_generates_v1_tokens_with_test_commands_disabled.js)12
-rw-r--r--jstests/noPassthrough/change_stream_mongos_with_generate_v2_resume_tokens_flag.js (renamed from jstests/noPassthrough/change_stream_mongos_explicitly_requests_v1_resume_token.js)44
-rw-r--r--jstests/noPassthrough/change_stream_operation_metrics.js2
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp12
-rw-r--r--src/mongo/db/pipeline/aggregate_command.idl1
-rw-r--r--src/mongo/db/pipeline/change_stream_event_transform_test.cpp34
-rw-r--r--src/mongo/db/pipeline/change_stream_test_helpers.cpp18
-rw-r--r--src/mongo/db/pipeline/change_stream_test_helpers.h12
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp609
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token_test.cpp2
-rw-r--r--src/mongo/db/pipeline/resume_token.h2
-rw-r--r--src/mongo/db/pipeline/resume_token_test.cpp5
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp19
-rw-r--r--src/mongo/s/query/cluster_aggregate.cpp14
20 files changed, 459 insertions, 368 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_v2_resume_token_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_v1_resume_token_passthrough.yml
index 4179f872b7f..1e122fc5fd9 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_v2_resume_token_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_v1_resume_token_passthrough.yml
@@ -1,11 +1,10 @@
test_kind: js_test
-# TODO SERVER-65370: remove this suite when v2 tokens become the default, or rework it to test v1.
selector:
roots:
- jstests/change_streams/**/*.js
exclude_files:
# This test explicitly compares v1 and v2 tokens, and must be able to generate the former.
- - jstests/change_streams/generate_v2_resume_token.js
+ - jstests/change_streams/generate_v1_resume_token.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
@@ -40,7 +39,7 @@ executor:
var testingReplication = true;
load('jstests/libs/override_methods/set_read_and_write_concerns.js');
load('jstests/libs/override_methods/enable_causal_consistency_without_read_pref.js');
- load('jstests/libs/override_methods/implicit_v2_resume_token_changestreams.js');
+ load('jstests/libs/override_methods/implicit_v1_resume_token_changestreams.js');
hooks:
# The CheckReplDBHash hook waits until all operations have replicated to and have been applied
# on the secondaries, so we run the ValidateCollections hook after it to ensure we're
diff --git a/buildscripts/resmokeconfig/suites/change_streams_v2_resume_token_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_v1_resume_token_sharded_collections_passthrough.yml
index a0f93f572b0..90e6eccefb1 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_v2_resume_token_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_v1_resume_token_sharded_collections_passthrough.yml
@@ -1,11 +1,10 @@
test_kind: js_test
-# TODO SERVER-65370: remove this suite when v2 tokens become the default, or rework it to test v1.
selector:
roots:
- jstests/change_streams/**/*.js
exclude_files:
# This test explicitly compares v1 and v2 tokens, and must be able to generate the former.
- - jstests/change_streams/generate_v2_resume_token.js
+ - jstests/change_streams/generate_v1_resume_token.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
@@ -37,7 +36,7 @@ executor:
load('jstests/libs/override_methods/set_read_and_write_concerns.js');
load('jstests/libs/override_methods/implicitly_shard_accessed_collections.js');
load('jstests/libs/override_methods/enable_causal_consistency_without_read_pref.js');
- load('jstests/libs/override_methods/implicit_v2_resume_token_changestreams.js');
+ load('jstests/libs/override_methods/implicit_v1_resume_token_changestreams.js');
hooks:
- class: CheckReplDBHash
- class: ValidateCollections
diff --git a/etc/evergreen_yml_components/definitions.yml b/etc/evergreen_yml_components/definitions.yml
index 234f6834e4a..546e7530984 100644
--- a/etc/evergreen_yml_components/definitions.yml
+++ b/etc/evergreen_yml_components/definitions.yml
@@ -4203,7 +4203,7 @@ tasks:
- func: "run tests"
- <<: *task_template
- name: change_streams_v2_resume_token_passthrough
+ name: change_streams_v1_resume_token_passthrough
tags: ["change_streams"]
commands:
- func: "do setup"
@@ -4267,7 +4267,7 @@ tasks:
- func: "run tests"
- <<: *task_template
- name: change_streams_v2_resume_token_sharded_collections_passthrough
+ name: change_streams_v1_resume_token_sharded_collections_passthrough
tags: ["change_streams"]
depends_on:
- name: change_streams
diff --git a/jstests/aggregation/api_version_stage_allowance_checks.js b/jstests/aggregation/api_version_stage_allowance_checks.js
index f47b27fd630..1ef9d090e3f 100644
--- a/jstests/aggregation/api_version_stage_allowance_checks.js
+++ b/jstests/aggregation/api_version_stage_allowance_checks.js
@@ -149,7 +149,7 @@ result = testDB.runCommand({
pipeline: [{$project: {_id: 0}}],
cursor: {},
writeConcern: {w: "majority"},
- $_generateV2ResumeTokens: true,
+ $_generateV2ResumeTokens: false,
apiVersion: "1",
apiStrict: true
});
diff --git a/jstests/change_streams/generate_v2_resume_token.js b/jstests/change_streams/generate_v1_resume_token.js
index 016b127d33b..05ebc2f6d92 100644
--- a/jstests/change_streams/generate_v2_resume_token.js
+++ b/jstests/change_streams/generate_v1_resume_token.js
@@ -1,8 +1,8 @@
/**
- * Test that the $_generateV2ResumeTokens parameter can be used to force change streams to return v2
+ * Test that the $_generateV2ResumeTokens parameter can be used to force change streams to return v1
* tokens.
* @tags: [
- * requires_fcv_60
+ * requires_fcv_61
* ]
*/
(function() {
@@ -12,11 +12,11 @@ load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateC
const coll = assertDropAndRecreateCollection(db, jsTestName());
-// Create one stream that returns v1 tokens, the default.
-const v1Stream = coll.watch([]);
+// Create one stream that returns v2 tokens, the default.
+const v2Stream = coll.watch([]);
-// Create a second stream that explicitly requests v2 tokens.
-const v2Stream = coll.watch([], {$_generateV2ResumeTokens: true});
+// Create a second stream that explicitly requests v1 tokens.
+const v1Stream = coll.watch([], {$_generateV2ResumeTokens: false});
// Insert a test document into the collection.
assert.commandWorked(coll.insert({_id: 1}));
@@ -34,5 +34,5 @@ delete v1Event._id;
delete v2Event._id;
assert.docEq(v1Event, v2Event);
-assert.neq(v1ResumeToken, v2ResumeToken);
+assert.neq(v1ResumeToken, v2ResumeToken, {v1ResumeToken, v2ResumeToken});
})(); \ No newline at end of file
diff --git a/jstests/libs/override_methods/implicit_v2_resume_token_changestreams.js b/jstests/libs/override_methods/implicit_v1_resume_token_changestreams.js
index 068d121fff7..b0fc5dc0de8 100644
--- a/jstests/libs/override_methods/implicit_v2_resume_token_changestreams.js
+++ b/jstests/libs/override_methods/implicit_v1_resume_token_changestreams.js
@@ -1,7 +1,6 @@
/**
* Loading this file overrides 'runCommand' with a function that modifies any $changeStream
- * aggregation to use $_generateV2ResumeTokens:true.
- * TODO SERVER-65370: remove this suite when v2 tokens become the default, or rework it to test v1.
+ * aggregation to use $_generateV2ResumeTokens:false.
*/
(function() {
"use strict";
@@ -9,18 +8,18 @@
load("jstests/libs/override_methods/override_helpers.js"); // For 'OverrideHelpers'.
// Override runCommand to set $_generateV2ResumeTokens on all $changeStreams.
-function runCommandV2Tokens(conn, dbName, cmdName, cmdObj, originalRunCommand, makeRunCommandArgs) {
+function runCommandV1Tokens(conn, dbName, cmdName, cmdObj, originalRunCommand, makeRunCommandArgs) {
if (OverrideHelpers.isAggregationWithChangeStreamStage(cmdName, cmdObj)) {
// Make a copy to avoid mutating the user's original command object.
- cmdObj = Object.assign({}, cmdObj, {$_generateV2ResumeTokens: true});
+ cmdObj = Object.assign({}, cmdObj, {$_generateV2ResumeTokens: false});
}
return originalRunCommand.apply(conn, makeRunCommandArgs(cmdObj));
}
// Always apply the override if a test spawns a parallel shell.
OverrideHelpers.prependOverrideInParallelShell(
- "jstests/libs/override_methods/implicit_v2_resume_token_changestreams.js");
+ "jstests/libs/override_methods/implicit_v1_resume_token_changestreams.js");
// Override the default runCommand with our custom version.
-OverrideHelpers.overrideRunCommand(runCommandV2Tokens);
+OverrideHelpers.overrideRunCommand(runCommandV1Tokens);
})(); \ No newline at end of file
diff --git a/jstests/noPassthrough/change_stream_only_generates_v1_tokens_with_test_commands_disabled.js b/jstests/noPassthrough/change_stream_generate_v2_tokens_flag_with_test_commands_disabled.js
index 2a1fe1fbad0..5b1585da5c1 100644
--- a/jstests/noPassthrough/change_stream_only_generates_v1_tokens_with_test_commands_disabled.js
+++ b/jstests/noPassthrough/change_stream_generate_v2_tokens_flag_with_test_commands_disabled.js
@@ -1,5 +1,6 @@
/**
- * Test that the $_generateV2ResumeTokens parameter cannot be used when test commands are disabled.
+ * Test that the $_generateV2ResumeTokens parameter cannot be used on mongoS when test commands are
+ * disabled.
* @tags: [
* uses_change_streams,
* requires_sharding,
@@ -16,12 +17,19 @@ TestData.enableTestCommands = false;
// Create a sharding fixture with test commands disabled.
const st = new ShardingTest({shards: 1, rs: {nodes: 1}});
-// Confirm that attempting to run change streams with $_generateV2ResumeTokens:true fails on mongos.
+// Confirm that attempting to set any values for $_generateV2ResumeTokens field fails on mongos.
assert.throwsWithCode(() => st.s.watch([], {$_generateV2ResumeTokens: true}).hasNext(), 6528201);
+assert.throwsWithCode(() => st.s.watch([], {$_generateV2ResumeTokens: false}).hasNext(), 6528201);
// Confirm that attempting to run change streams with $_generateV2ResumeTokens:true fails on shards.
assert.throwsWithCode(
() => st.rs0.getPrimary().watch([], {$_generateV2ResumeTokens: true}).hasNext(), 6528200);
+// Explicitly requesting v1 tokens is allowed on a shard. This is to allow a 6.0 mongoS to
+// communicate with a 7.0 shard.
+const stream = st.rs0.getPrimary().watch([], {$_generateV2ResumeTokens: false});
+assert.commandWorked(st.s.getDB("test")["coll"].insert({x: 1}));
+assert.soon(() => stream.hasNext());
+
st.stop();
})(); \ No newline at end of file
diff --git a/jstests/noPassthrough/change_stream_mongos_explicitly_requests_v1_resume_token.js b/jstests/noPassthrough/change_stream_mongos_with_generate_v2_resume_tokens_flag.js
index 5139bff5b56..193c7e76825 100644
--- a/jstests/noPassthrough/change_stream_mongos_explicitly_requests_v1_resume_token.js
+++ b/jstests/noPassthrough/change_stream_mongos_with_generate_v2_resume_tokens_flag.js
@@ -1,9 +1,8 @@
/**
- * Test that mongoS explicitly sets the value of $_generateV2ResumeTokens to 'false' on the commands
- * it sends to the shards if no value was specified by the client, and that if a value was
- * specified, it forwards it to the shards. On a replica set, no explicit value is set; the
- * aggregation simply treats it as default-false.
- * TODO SERVER-65370: remove or rework this test when v2 tokens become the default.
+ * Test that mongoS does not set the value of $_generateV2ResumeTokens on the commands it sends to
+ * the shards, if no value was specified by the client. If a value was specified, mongoS forwards it
+ * to the shards. On a replica set, no explicit value is set; the aggregation simply treats it as
+ * default-true.
* @tags: [
* uses_change_streams,
* requires_sharding,
@@ -34,24 +33,24 @@ const configColl = configDB.shards;
assert.commandWorked(shardDB.setProfilingLevel(2));
assert.commandWorked(configDB.setProfilingLevel(2));
-// Create one stream on mongoS that returns v1 tokens, the default.
-const v1MongosStream = mongosColl.watch([], {comment: "v1MongosStream"});
+// Create one stream on mongoS that returns v2 tokens, the default.
+const v2MongosStream = mongosColl.watch([], {comment: "v2MongosStream"});
-// Create a second stream on mongoS that explicitly requests v2 tokens.
-const v2MongosStream =
- mongosColl.watch([], {comment: "v2MongosStream", $_generateV2ResumeTokens: true});
+// Create a second stream on mongoS that explicitly requests v1 tokens.
+const v1MongosStream =
+ mongosColl.watch([], {comment: "v1MongosStream", $_generateV2ResumeTokens: false});
-// Create a stream directly on the shard which returns the default v1 tokens.
-const v1ShardStream = shardColl.watch([], {comment: "v1ShardStream"});
+// Create a stream directly on the shard which returns the default v2 tokens.
+const v2ShardStream = shardColl.watch([], {comment: "v2ShardStream"});
// Insert a test document into the collection.
assert.commandWorked(mongosColl.insert({_id: 1}));
// Wait until all streams have encountered the insert operation.
-assert.soon(() => v1MongosStream.hasNext() && v2MongosStream.hasNext() && v1ShardStream.hasNext());
+assert.soon(() => v1MongosStream.hasNext() && v2MongosStream.hasNext() && v2ShardStream.hasNext());
-// Confirm that in a sharded cluster, mongoS explicitly sets $_generateV2ResumeTokens to false on
-// the command that it sends to the shards if nothing was specified by the client.
-// Confirm that in a sharded cluster, when a v1 token is explicitly requested, mongoS forwards
+// $_generateV2ResumeTokens:false to the shard.
profilerHasAtLeastOneMatchingEntryOrThrow({
profileDB: shardDB,
filter: {
@@ -72,35 +71,36 @@ profilerHasAtLeastOneMatchingEntryOrThrow({
}
});
-// Confirm that mongoS correctly forwards the value of $_generateV2ResumeTokens to the shards if it
-// is specified by the client.
+// Confirm that mongoS never sets the $_generateV2ResumeTokens field when client didn't explicitly
+// specify.
profilerHasAtLeastOneMatchingEntryOrThrow({
profileDB: shardDB,
filter: {
"originatingCommand.aggregate": mongosColl.getName(),
"originatingCommand.comment": "v2MongosStream",
- "originatingCommand.$_generateV2ResumeTokens": true
+ "originatingCommand.$_generateV2ResumeTokens": {$exists: false}
}
});
-// Confirm that we also forward the value of $_generateV2ResumeTokens to the config servers.
+// Confirm that we also do not set the $_generateV2ResumeTokens field on the request sent to the
+// config server.
profilerHasAtLeastOneMatchingEntryOrThrow({
profileDB: configDB,
filter: {
"originatingCommand.aggregate": configColl.getName(),
"originatingCommand.comment": "v2MongosStream",
- "originatingCommand.$_generateV2ResumeTokens": true
+ "originatingCommand.$_generateV2ResumeTokens": {$exists: false}
}
});
// Confirm that on a replica set - in this case, a direct connection to the shard - no value is set
// for $_generateV2ResumeTokens if the client did not specify one. The aggregation defaults to
-// treating the value as false.
+// treating the value as true.
profilerHasAtLeastOneMatchingEntryOrThrow({
profileDB: shardDB,
filter: {
"originatingCommand.aggregate": mongosColl.getName(),
- "originatingCommand.comment": "v1ShardStream",
+ "originatingCommand.comment": "v2ShardStream",
"originatingCommand.$_generateV2ResumeTokens": {$exists: false}
}
});
diff --git a/jstests/noPassthrough/change_stream_operation_metrics.js b/jstests/noPassthrough/change_stream_operation_metrics.js
index bf7c75a9e24..37407122954 100644
--- a/jstests/noPassthrough/change_stream_operation_metrics.js
+++ b/jstests/noPassthrough/change_stream_operation_metrics.js
@@ -164,7 +164,7 @@ let nextId = nDocs;
assert.gt(metrics[dbName].primaryMetrics.docUnitsRead, 0);
assert.gt(metrics[dbName].primaryMetrics.cursorSeeks, 0);
// Returns one large document
- assert.eq(metrics[dbName].primaryMetrics.docUnitsReturned, 3);
+ assert.eq(metrics[dbName].primaryMetrics.docUnitsReturned, 4);
});
// Update the document and ensure the metrics are aggregated.
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index ecf938a5e41..4a7a59f2fd1 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -445,12 +445,12 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext(
expCtx->forPerShardCursor = request.getPassthroughToShard().has_value();
expCtx->allowDiskUse = request.getAllowDiskUse().value_or(allowDiskUseByDefault.load());
- // If the request specified v2 resume tokens for change streams, set this on the expCtx. On 6.0
- // we only expect this to occur during testing.
- // TODO SERVER-65370: after 6.0, assume true unless present and explicitly false.
- if (request.getGenerateV2ResumeTokens()) {
- uassert(6528200, "Invalid request for v2 resume tokens", getTestCommandsEnabled());
- expCtx->changeStreamTokenVersion = 2;
+ // If the request explicitly specified NOT to use v2 resume tokens for change streams, set this
+    // on the expCtx. This can happen if the request originated from a 6.0 mongos, or in test mode.
+ if (request.getGenerateV2ResumeTokens().has_value()) {
+ // We only ever expect an explicit $_generateV2ResumeTokens to be false.
+ uassert(6528200, "Invalid request for v2 tokens", !request.getGenerateV2ResumeTokens());
+ expCtx->changeStreamTokenVersion = 1;
}
return expCtx;
diff --git a/src/mongo/db/pipeline/aggregate_command.idl b/src/mongo/db/pipeline/aggregate_command.idl
index b84513a30e6..03637da1041 100644
--- a/src/mongo/db/pipeline/aggregate_command.idl
+++ b/src/mongo/db/pipeline/aggregate_command.idl
@@ -261,7 +261,6 @@ commands:
cpp_name: passthroughToShard
optional: true
unstable: true
- # TODO SERVER-65370: after 6.0, $_generateV2ResumeTokens should be assumed true if absent.
# TODO SERVER-65369: $_generateV2ResumeTokens can be removed after 7.0.
$_generateV2ResumeTokens:
description: "Internal parameter to signal whether v2 resume tokens should be generated."
diff --git a/src/mongo/db/pipeline/change_stream_event_transform_test.cpp b/src/mongo/db/pipeline/change_stream_event_transform_test.cpp
index 2f4271e097f..89dabf3c73e 100644
--- a/src/mongo/db/pipeline/change_stream_event_transform_test.cpp
+++ b/src/mongo/db/pipeline/change_stream_event_transform_test.cpp
@@ -73,7 +73,10 @@ TEST(ChangeStreamEventTransformTest, TestDefaultUpdateTransform) {
// Update fields
Document expectedUpdateField{
{DocumentSourceChangeStream::kIdField,
- makeResumeToken(kDefaultTs, testUuid(), Value(documentKey))},
+ makeResumeToken(kDefaultTs,
+ testUuid(),
+ Value(documentKey),
+ DocumentSourceChangeStream::kUpdateOpType)},
{DocumentSourceChangeStream::kOperationTypeField,
DocumentSourceChangeStream::kUpdateOpType},
{DocumentSourceChangeStream::kClusterTimeField, kDefaultTs},
@@ -111,10 +114,7 @@ TEST(ChangeStreamEventTransformTest, TestCreateViewTransform) {
Document expectedDoc{
{DocumentSourceChangeStream::kIdField,
makeResumeToken(
- kDefaultTs,
- testUuid(),
- Value(Document{{"operationType", DocumentSourceChangeStream::kCreateOpType},
- {"operationDescription", opDescription}}))},
+ kDefaultTs, testUuid(), opDescription, DocumentSourceChangeStream::kCreateOpType)},
{DocumentSourceChangeStream::kOperationTypeField,
DocumentSourceChangeStream::kCreateOpType},
{DocumentSourceChangeStream::kClusterTimeField, kDefaultTs},
@@ -145,17 +145,19 @@ TEST(ChangeStreamEventTransformTest, TestCreateViewOnSingleCollection) {
boost::none, // fromMigrate
boost::none); // o2
- Document expectedDoc{{DocumentSourceChangeStream::kIdField,
- makeResumeToken(kDefaultTs, testUuid(), documentKey)},
- {DocumentSourceChangeStream::kOperationTypeField,
- DocumentSourceChangeStream::kInsertOpType},
- {DocumentSourceChangeStream::kClusterTimeField, kDefaultTs},
- {DocumentSourceChangeStream::kCollectionUuidField, testUuid()},
- {DocumentSourceChangeStream::kWallTimeField, Date_t()},
- {DocumentSourceChangeStream::kFullDocumentField, Document(document)},
- {DocumentSourceChangeStream::kNamespaceField,
- Document{{"db", systemViewNss.db()}, {"coll", systemViewNss.coll()}}},
- {DocumentSourceChangeStream::kDocumentKeyField, documentKey}};
+ Document expectedDoc{
+ {DocumentSourceChangeStream::kIdField,
+ makeResumeToken(
+ kDefaultTs, testUuid(), documentKey, DocumentSourceChangeStream::kInsertOpType)},
+ {DocumentSourceChangeStream::kOperationTypeField,
+ DocumentSourceChangeStream::kInsertOpType},
+ {DocumentSourceChangeStream::kClusterTimeField, kDefaultTs},
+ {DocumentSourceChangeStream::kCollectionUuidField, testUuid()},
+ {DocumentSourceChangeStream::kWallTimeField, Date_t()},
+ {DocumentSourceChangeStream::kFullDocumentField, Document(document)},
+ {DocumentSourceChangeStream::kNamespaceField,
+ Document{{"db", systemViewNss.db()}, {"coll", systemViewNss.coll()}}},
+ {DocumentSourceChangeStream::kDocumentKeyField, documentKey}};
ASSERT_DOCUMENT_EQ(applyTransformation(oplogEntry), expectedDoc);
}
diff --git a/src/mongo/db/pipeline/change_stream_test_helpers.cpp b/src/mongo/db/pipeline/change_stream_test_helpers.cpp
index b3a29103d94..ccb8966e5da 100644
--- a/src/mongo/db/pipeline/change_stream_test_helpers.cpp
+++ b/src/mongo/db/pipeline/change_stream_test_helpers.cpp
@@ -58,12 +58,26 @@ LogicalSessionFromClient testLsid() {
Document makeResumeToken(Timestamp ts,
ImplicitValue uuid,
- ImplicitValue docKey,
+ ImplicitValue docKeyOrOpDesc,
+ StringData operationType,
ResumeTokenData::FromInvalidate fromInvalidate,
size_t txnOpIndex) {
+ static const std::set<StringData> kCrudOps = {
+ "insert"_sd, "update"_sd, "replace"_sd, "delete"_sd};
+ auto eventId = Value(Document{
+ {"operationType", operationType},
+ {kCrudOps.count(operationType) ? "documentKey" : "operationDescription", docKeyOrOpDesc}});
+ return makeResumeTokenWithEventId(ts, uuid, eventId, fromInvalidate, txnOpIndex);
+}
+
+Document makeResumeTokenWithEventId(Timestamp ts,
+ ImplicitValue uuid,
+ ImplicitValue eventIdentifier,
+ ResumeTokenData::FromInvalidate fromInvalidate,
+ size_t txnOpIndex) {
ResumeTokenData tokenData;
tokenData.clusterTime = ts;
- tokenData.eventIdentifier = docKey;
+ tokenData.eventIdentifier = eventIdentifier;
tokenData.fromInvalidate = fromInvalidate;
tokenData.txnOpIndex = txnOpIndex;
if (!uuid.missing())
diff --git a/src/mongo/db/pipeline/change_stream_test_helpers.h b/src/mongo/db/pipeline/change_stream_test_helpers.h
index 2a2059cc582..b2492dbe5dd 100644
--- a/src/mongo/db/pipeline/change_stream_test_helpers.h
+++ b/src/mongo/db/pipeline/change_stream_test_helpers.h
@@ -57,12 +57,20 @@ const UUID& testUuid();
LogicalSessionFromClient testLsid();
Document makeResumeToken(Timestamp ts,
- ImplicitValue uuid = Value(),
- ImplicitValue docKey = Value(),
+ ImplicitValue uuid,
+ ImplicitValue docKeyOrOpDesc,
+ StringData operationType,
ResumeTokenData::FromInvalidate fromInvalidate =
ResumeTokenData::FromInvalidate::kNotFromInvalidate,
size_t txnOpIndex = 0);
+Document makeResumeTokenWithEventId(Timestamp ts,
+ ImplicitValue uuid,
+ ImplicitValue eventIdentifier,
+ ResumeTokenData::FromInvalidate fromInvalidate =
+ ResumeTokenData::FromInvalidate::kNotFromInvalidate,
+ size_t txnOpIndex = 0);
+
/**
* Creates an OplogEntry with given parameters and preset defaults for this test suite.
*/
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index cc559e218a5..a1b1c22bd5f 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -345,6 +345,24 @@ public:
return res;
}
+ Document makeExpectedUpdateEvent(Timestamp ts,
+ const NamespaceString& nss,
+ BSONObj documentKey,
+ Document upateMod,
+ bool expandedEvents = false) {
+ return Document{
+ {DSChangeStream::kIdField,
+ makeResumeToken(ts, testUuid(), V{documentKey}, DSChangeStream::kUpdateOpType)},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
+ {DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kCollectionUuidField, expandedEvents ? V{testUuid()} : Value()},
+ {DSChangeStream::kWallTimeField, Date_t()},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeStream::kDocumentKeyField, V{documentKey}},
+ {"updateDescription", upateMod},
+ };
+ }
+
/**
* Helper function to do a $v:2 delta oplog test.
*/
@@ -356,19 +374,9 @@ public:
testUuid(), // uuid
boost::none, // fromMigrate
o2); // o2
- // Update fields
- Document expectedUpdateField{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
- {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
- {DSChangeStream::kClusterTimeField, kDefaultTs},
- {DSChangeStream::kWallTimeField, Date_t()},
- {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
- {DSChangeStream::kDocumentKeyField, D{{"_id", 1}}},
- {
- "updateDescription",
- updateModificationEntry,
- },
- };
+
+ const auto expectedUpdateField =
+ makeExpectedUpdateEvent(kDefaultTs, nss, o2, updateModificationEntry);
checkTransformation(deltaOplog, expectedUpdateField);
}
@@ -513,16 +521,17 @@ TEST_F(ChangeStreamStageTest, ShouldRejectBothStartAtOperationTimeAndResumeAfter
catalog.registerCollection(expCtx->opCtx, testUuid(), std::move(collection));
});
- ASSERT_THROWS_CODE(
- DSChangeStream::createFromBson(
- BSON(DSChangeStream::kStageName
- << BSON("resumeAfter"
- << makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1))
- << "startAtOperationTime" << kDefaultTs))
- .firstElement(),
- expCtx),
- AssertionException,
- 40674);
+ ASSERT_THROWS_CODE(DSChangeStream::createFromBson(
+ BSON(DSChangeStream::kStageName << BSON(
+ "resumeAfter" << makeResumeToken(kDefaultTs,
+ testUuid(),
+ BSON("x" << 2 << "_id" << 1),
+ DSChangeStream::kInsertOpType)
+ << "startAtOperationTime" << kDefaultTs))
+ .firstElement(),
+ expCtx),
+ AssertionException,
+ 40674);
}
TEST_F(ChangeStreamStageTest, ShouldRejectBothStartAfterAndResumeAfterOptions) {
@@ -538,10 +547,15 @@ TEST_F(ChangeStreamStageTest, ShouldRejectBothStartAfterAndResumeAfterOptions) {
ASSERT_THROWS_CODE(
DSChangeStream::createFromBson(
BSON(DSChangeStream::kStageName
- << BSON("resumeAfter"
- << makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1))
- << "startAfter"
- << makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1))))
+ << BSON("resumeAfter" << makeResumeToken(kDefaultTs,
+ testUuid(),
+ BSON("x" << 2 << "_id" << 1),
+ DSChangeStream::kInsertOpType)
+ << "startAfter"
+ << makeResumeToken(kDefaultTs,
+ testUuid(),
+ BSON("x" << 2 << "_id" << 1),
+ DSChangeStream::kInsertOpType)))
.firstElement(),
expCtx),
AssertionException,
@@ -558,16 +572,17 @@ TEST_F(ChangeStreamStageTest, ShouldRejectBothStartAtOperationTimeAndStartAfterO
catalog.registerCollection(opCtx, testUuid(), std::move(collection));
});
- ASSERT_THROWS_CODE(
- DSChangeStream::createFromBson(
- BSON(DSChangeStream::kStageName
- << BSON("startAfter"
- << makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1))
- << "startAtOperationTime" << kDefaultTs))
- .firstElement(),
- expCtx),
- AssertionException,
- 40674);
+ ASSERT_THROWS_CODE(DSChangeStream::createFromBson(
+ BSON(DSChangeStream::kStageName << BSON(
+ "startAfter" << makeResumeToken(kDefaultTs,
+ testUuid(),
+ BSON("x" << 2 << "_id" << 1),
+ DSChangeStream::kInsertOpType)
+ << "startAtOperationTime" << kDefaultTs))
+ .firstElement(),
+ expCtx),
+ AssertionException,
+ 40674);
}
TEST_F(ChangeStreamStageTest, ShouldRejectResumeAfterWithResumeTokenMissingUUID) {
@@ -582,7 +597,9 @@ TEST_F(ChangeStreamStageTest, ShouldRejectResumeAfterWithResumeTokenMissingUUID)
ASSERT_THROWS_CODE(
DSChangeStream::createFromBson(
- BSON(DSChangeStream::kStageName << BSON("resumeAfter" << makeResumeToken(kDefaultTs)))
+ BSON(DSChangeStream::kStageName
+ << BSON("resumeAfter" << makeResumeToken(
+ kDefaultTs, Value(), Value(), DSChangeStream::kInsertOpType)))
.firstElement(),
expCtx),
AssertionException,
@@ -649,7 +666,8 @@ TEST_F(ChangeStreamStageTest, TransformInsertDocKeyXAndId) {
Document expectedInsert{
{DSChangeStream::kIdField,
- makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1))},
+ makeResumeToken(
+ kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1), DSChangeStream::kInsertOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -678,7 +696,8 @@ TEST_F(ChangeStreamStageTest, TransformInsertDocKeyIdAndX) {
Document expectedInsert{
{DSChangeStream::kIdField,
- makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << 1 << "x" << 2))},
+ makeResumeToken(
+ kDefaultTs, testUuid(), BSON("_id" << 1 << "x" << 2), DSChangeStream::kInsertOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -698,7 +717,8 @@ TEST_F(ChangeStreamStageTest, TransformInsertDocKeyJustId) {
BSON("_id" << 1)); // o2
Document expectedInsert{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << 1))},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << 1), DSChangeStream::kInsertOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -733,7 +753,8 @@ TEST_F(ChangeStreamStageTest, TransformInsertFromMigrateShowMigrations) {
auto spec = fromjson("{$changeStream: {showMigrationEvents: true}}");
Document expectedInsert{
{DSChangeStream::kIdField,
- makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << 1 << "x" << 2))},
+ makeResumeToken(
+ kDefaultTs, testUuid(), BSON("_id" << 1 << "x" << 2), DSChangeStream::kInsertOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -754,19 +775,8 @@ TEST_F(ChangeStreamStageTest, TransformUpdateFields) {
boost::none, // fromMigrate
o2); // o2
- // Update fields
- Document expectedUpdateField{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
- {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
- {DSChangeStream::kClusterTimeField, kDefaultTs},
- {DSChangeStream::kWallTimeField, Date_t()},
- {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
- {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
- {
- "updateDescription",
- D{{"updatedFields", D{{"y", 1}}}, {"removedFields", vector<V>()}},
- },
- };
+ const auto expectedUpdateField = makeExpectedUpdateEvent(
+ kDefaultTs, nss, o2, D{{"updatedFields", D{{"y", 1}}}, {"removedFields", vector<V>()}});
checkTransformation(updateField, expectedUpdateField);
}
@@ -780,20 +790,12 @@ TEST_F(ChangeStreamStageTest, TransformUpdateFieldsShowExpandedEvents) {
boost::none, // fromMigrate
o2); // o2
- // Update fields
- Document expectedUpdateField{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
- {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
- {DSChangeStream::kClusterTimeField, kDefaultTs},
- {DSChangeStream::kCollectionUuidField, testUuid()},
- {DSChangeStream::kWallTimeField, Date_t()},
- {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
- {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
- {
- "updateDescription",
- D{{"updatedFields", D{{"y", 1}}}, {"removedFields", vector<V>()}},
- },
- };
+ const auto expectedUpdateField =
+ makeExpectedUpdateEvent(kDefaultTs,
+ nss,
+ o2,
+ D{{"updatedFields", D{{"y", 1}}}, {"removedFields", vector<V>()}},
+ true /* expanded events */);
checkTransformation(updateField, expectedUpdateField, kShowExpandedEventsSpec);
}
@@ -935,19 +937,8 @@ TEST_F(ChangeStreamStageTest, TransformUpdateFieldsLegacyNoId) {
boost::none, // fromMigrate
o2); // o2
- // Update fields
- Document expectedUpdateField{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
- {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
- {DSChangeStream::kClusterTimeField, kDefaultTs},
- {DSChangeStream::kWallTimeField, Date_t()},
- {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
- {DSChangeStream::kDocumentKeyField, D{{"x", 1}, {"y", 1}}},
- {
- "updateDescription",
- D{{"updatedFields", D{{"y", 1}}}, {"removedFields", vector<V>()}},
- },
- };
+ const auto expectedUpdateField = makeExpectedUpdateEvent(
+ kDefaultTs, nss, o2, D{{"updatedFields", D{{"y", 1}}}, {"removedFields", vector<V>()}});
checkTransformation(updateField, expectedUpdateField);
}
@@ -961,19 +952,9 @@ TEST_F(ChangeStreamStageTest, TransformRemoveFields) {
boost::none, // fromMigrate
o2); // o2
- // Remove fields
- Document expectedRemoveField{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
- {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
- {DSChangeStream::kClusterTimeField, kDefaultTs},
- {DSChangeStream::kWallTimeField, Date_t()},
- {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
- {DSChangeStream::kDocumentKeyField, Document{{"_id", 1}, {"x", 2}}},
- {
- "updateDescription",
- D{{"updatedFields", D{}}, {"removedFields", {"y"_sd}}},
- }};
- checkTransformation(removeField, expectedRemoveField);
+ const auto expectedUpdateField = makeExpectedUpdateEvent(
+ kDefaultTs, nss, o2, D{{"updatedFields", D{}}, {"removedFields", {"y"_sd}}});
+ checkTransformation(removeField, expectedUpdateField);
} // namespace
TEST_F(ChangeStreamStageTest, TransformReplace) {
@@ -988,7 +969,8 @@ TEST_F(ChangeStreamStageTest, TransformReplace) {
// Replace
Document expectedReplace{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), o2, DSChangeStream::kReplaceOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -1011,7 +993,8 @@ TEST_F(ChangeStreamStageTest, TransformReplaceShowExpandedEvents) {
// Replace
Document expectedReplace{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), o2, DSChangeStream::kReplaceOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kCollectionUuidField, testUuid()},
@@ -1034,7 +1017,8 @@ TEST_F(ChangeStreamStageTest, TransformDelete) {
// Delete
Document expectedDelete{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o)},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), o, DSChangeStream::kDeleteOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -1065,7 +1049,8 @@ TEST_F(ChangeStreamStageTest, TransformDeleteShowExpandedEvents) {
// Delete
Document expectedDelete{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o)},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), o, DSChangeStream::kDeleteOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kCollectionUuidField, testUuid()},
@@ -1110,7 +1095,8 @@ TEST_F(ChangeStreamStageTest, TransformDeleteFromMigrateShowMigrations) {
auto spec = fromjson("{$changeStream: {showMigrationEvents: true}}");
Document expectedDelete{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o)},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), o, DSChangeStream::kDeleteOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -1125,7 +1111,8 @@ TEST_F(ChangeStreamStageTest, TransformDrop) {
OplogEntry dropColl = createCommand(BSON("drop" << nss.coll()), testUuid());
Document expectedDrop{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), Value(), DSChangeStream::kDropCollectionOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -1133,8 +1120,11 @@ TEST_F(ChangeStreamStageTest, TransformDrop) {
};
Document expectedInvalidate{
{DSChangeStream::kIdField,
- makeResumeToken(
- kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
+ makeResumeToken(kDefaultTs,
+ testUuid(),
+ Value(),
+ DSChangeStream::kDropCollectionOpType,
+ ResumeTokenData::FromInvalidate::kFromInvalidate)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -1147,7 +1137,8 @@ TEST_F(ChangeStreamStageTest, TransformDropShowExpandedEvents) {
OplogEntry dropColl = createCommand(BSON("drop" << nss.coll()), testUuid());
Document expectedDrop{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), Value(), DSChangeStream::kDropCollectionOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kCollectionUuidField, testUuid()},
@@ -1157,8 +1148,11 @@ TEST_F(ChangeStreamStageTest, TransformDropShowExpandedEvents) {
Document expectedInvalidate{
{DSChangeStream::kIdField,
- makeResumeToken(
- kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
+ makeResumeToken(kDefaultTs,
+ testUuid(),
+ Value(),
+ DSChangeStream::kDropCollectionOpType,
+ ResumeTokenData::FromInvalidate::kFromInvalidate)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -1178,10 +1172,7 @@ TEST_F(ChangeStreamStageTest, TransformCreate) {
Document expectedCreate{
{DSChangeStream::kIdField,
makeResumeToken(
- kDefaultTs,
- testUuid(),
- Value(Document{{"operationType", DocumentSourceChangeStream::kCreateOpType},
- {"operationDescription", expectedOpDescription}}))},
+ kDefaultTs, testUuid(), Value(expectedOpDescription), DSChangeStream::kCreateOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kCreateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kCollectionUuidField, testUuid()},
@@ -1201,7 +1192,8 @@ TEST_F(ChangeStreamStageTest, TransformRename) {
Document expectedRename{
{DSChangeStream::kRenameTargetNssField,
D{{"db", otherColl.db()}, {"coll", otherColl.coll()}}},
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), Value(), DSChangeStream::kRenameCollectionOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -1209,8 +1201,11 @@ TEST_F(ChangeStreamStageTest, TransformRename) {
};
Document expectedInvalidate{
{DSChangeStream::kIdField,
- makeResumeToken(
- kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
+ makeResumeToken(kDefaultTs,
+ testUuid(),
+ Value(),
+ DSChangeStream::kRenameCollectionOpType,
+ ResumeTokenData::FromInvalidate::kFromInvalidate)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -1226,25 +1221,30 @@ TEST_F(ChangeStreamStageTest, TransformRenameShowExpandedEvents) {
<< "dropTarget" << dropTarget),
testUuid());
+ const auto opDescription = D{
+ {"to", D{{"db", otherColl.db()}, {"coll", otherColl.coll()}}},
+ {"dropTarget", dropTarget},
+ };
Document expectedRename{
{DSChangeStream::kRenameTargetNssField,
D{{"db", otherColl.db()}, {"coll", otherColl.coll()}}},
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
+ {DSChangeStream::kIdField,
+ makeResumeToken(
+ kDefaultTs, testUuid(), V{opDescription}, DSChangeStream::kRenameCollectionOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kCollectionUuidField, testUuid()},
{DSChangeStream::kWallTimeField, Date_t()},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
- {DSChangeStream::kOperationDescriptionField,
- D{
- {"to", D{{"db", otherColl.db()}, {"coll", otherColl.coll()}}},
- {"dropTarget", dropTarget},
- }},
+ {DSChangeStream::kOperationDescriptionField, opDescription},
};
Document expectedInvalidate{
{DSChangeStream::kIdField,
- makeResumeToken(
- kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
+ makeResumeToken(kDefaultTs,
+ testUuid(),
+ V{opDescription},
+ DSChangeStream::kRenameCollectionOpType,
+ ResumeTokenData::FromInvalidate::kFromInvalidate)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -1279,7 +1279,8 @@ TEST_F(ChangeStreamStageTest, TransformRenameTarget) {
Document expectedRename{
{DSChangeStream::kRenameTargetNssField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), Value(), DSChangeStream::kRenameCollectionOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -1287,8 +1288,11 @@ TEST_F(ChangeStreamStageTest, TransformRenameTarget) {
};
Document expectedInvalidate{
{DSChangeStream::kIdField,
- makeResumeToken(
- kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
+ makeResumeToken(kDefaultTs,
+ testUuid(),
+ Value(),
+ DSChangeStream::kRenameCollectionOpType,
+ ResumeTokenData::FromInvalidate::kFromInvalidate)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -1312,7 +1316,12 @@ TEST_F(ChangeStreamStageTest, TransformNewShardDetected) {
o2Field.toBson());
Document expectedNewShardDetected{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << o2Field))},
+ {DSChangeStream::kIdField,
+ makeResumeTokenWithEventId(
+ kDefaultTs,
+ testUuid(),
+ Document{{"operationType", DSChangeStream::kNewShardDetectedOpType},
+ {"documentKey", BSON("_id" << o2Field)}})},
{DSChangeStream::kOperationTypeField, DSChangeStream::kNewShardDetectedOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -1340,7 +1349,10 @@ TEST_F(ChangeStreamStageTest, TransformReshardBegin) {
Document expectedReshardingBegin{
{DSChangeStream::kReshardingUuidField, reshardingUuid},
{DSChangeStream::kIdField,
- makeResumeToken(kDefaultTs, uuid, BSON("_id" << o2Field.toBSON()))},
+ makeResumeTokenWithEventId(kDefaultTs,
+ uuid,
+ Document{{"operationType", DSChangeStream::kReshardBeginOpType},
+ {"documentKey", BSON("_id" << o2Field.toBSON())}})},
{DSChangeStream::kOperationTypeField, DSChangeStream::kReshardBeginOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -1370,7 +1382,11 @@ TEST_F(ChangeStreamStageTest, TransformReshardDoneCatchUp) {
Document expectedReshardingDoneCatchUp{
{DSChangeStream::kReshardingUuidField, reshardingUuid},
{DSChangeStream::kIdField,
- makeResumeToken(kDefaultTs, reshardingUuid, BSON("_id" << o2Field.toBSON()))},
+ makeResumeTokenWithEventId(
+ kDefaultTs,
+ reshardingUuid,
+ Document{{"operationType", DSChangeStream::kReshardDoneCatchUpOpType},
+ {"documentKey", BSON("_id" << o2Field.toBSON())}})},
{DSChangeStream::kOperationTypeField, DSChangeStream::kReshardDoneCatchUpOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -1586,7 +1602,8 @@ TEST_F(ChangeStreamStageTest, CommitCommandReturnsOperationsFromPreparedTransact
Document expectedResult{
{DSChangeStream::kTxnNumberField, static_cast<int>(*sessionInfo.getTxnNumber())},
{DSChangeStream::kLsidField, Document{{sessionInfo.getSessionId()->toBSON()}}},
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), BSONObj())},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), BSONObj(), DSChangeStream::kInsertOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -1682,6 +1699,7 @@ TEST_F(ChangeStreamStageTest, TransactionWithMultipleOplogEntries) {
makeResumeToken(applyOpsOpTime2.getTimestamp(),
testUuid(),
V{D{{"_id", 123}}},
+ DSChangeStream::kInsertOpType,
ResumeTokenData::FromInvalidate::kNotFromInvalidate,
0));
@@ -1699,6 +1717,7 @@ TEST_F(ChangeStreamStageTest, TransactionWithMultipleOplogEntries) {
makeResumeToken(applyOpsOpTime2.getTimestamp(),
testUuid(),
V{D{{"_id", 456}}},
+ DSChangeStream::kInsertOpType,
ResumeTokenData::FromInvalidate::kNotFromInvalidate,
1));
@@ -1716,6 +1735,7 @@ TEST_F(ChangeStreamStageTest, TransactionWithMultipleOplogEntries) {
makeResumeToken(applyOpsOpTime2.getTimestamp(),
testUuid(),
V{D{{"_id", 789}}},
+ DSChangeStream::kInsertOpType,
ResumeTokenData::FromInvalidate::kNotFromInvalidate,
2));
}
@@ -1853,6 +1873,7 @@ TEST_F(ChangeStreamStageTest, TransactionWithEmptyOplogEntries) {
makeResumeToken(applyOpsOpTime5.getTimestamp(),
testUuid(),
V{D{{"_id", 123}}},
+ DSChangeStream::kInsertOpType,
ResumeTokenData::FromInvalidate::kNotFromInvalidate,
0));
@@ -1870,6 +1891,7 @@ TEST_F(ChangeStreamStageTest, TransactionWithEmptyOplogEntries) {
makeResumeToken(applyOpsOpTime5.getTimestamp(),
testUuid(),
V{D{{"_id", 456}}},
+ DSChangeStream::kInsertOpType,
ResumeTokenData::FromInvalidate::kNotFromInvalidate,
1));
}
@@ -2037,6 +2059,7 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionWithMultipleOplogEntries) {
makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand.
testUuid(),
V{D{{"_id", 123}}},
+ DSChangeStream::kInsertOpType,
ResumeTokenData::FromInvalidate::kNotFromInvalidate,
0));
@@ -2055,6 +2078,7 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionWithMultipleOplogEntries) {
makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand.
testUuid(),
V{D{{"_id", 456}}},
+ DSChangeStream::kInsertOpType,
ResumeTokenData::FromInvalidate::kNotFromInvalidate,
1));
@@ -2073,6 +2097,7 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionWithMultipleOplogEntries) {
makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand.
testUuid(),
V{D{{"_id", 789}}},
+ DSChangeStream::kInsertOpType,
ResumeTokenData::FromInvalidate::kNotFromInvalidate,
2));
@@ -2181,6 +2206,7 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionEndingWithEmptyApplyOps) {
makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand.
testUuid(),
V{D{{"_id", 123}}},
+ DSChangeStream::kInsertOpType,
ResumeTokenData::FromInvalidate::kNotFromInvalidate,
0));
@@ -2199,6 +2225,7 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionEndingWithEmptyApplyOps) {
makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand.
testUuid(),
V{D{{"_id", 456}}},
+ DSChangeStream::kInsertOpType,
ResumeTokenData::FromInvalidate::kNotFromInvalidate,
1));
@@ -2325,18 +2352,8 @@ TEST_F(ChangeStreamStageTest, ClusterTimeMatchesOplogEntry) {
o2, // o2
opTime); // opTime
- Document expectedUpdateField{
- {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), o2)},
- {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
- {DSChangeStream::kClusterTimeField, ts},
- {DSChangeStream::kWallTimeField, Date_t()},
- {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
- {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
- {
- "updateDescription",
- D{{"updatedFields", D{{"y", 1}}}, {"removedFields", vector<V>()}},
- },
- };
+ const auto expectedUpdateField = makeExpectedUpdateEvent(
+ ts, nss, o2, D{{"updatedFields", D{{"y", 1}}}, {"removedFields", vector<V>()}});
checkTransformation(updateField, expectedUpdateField);
// Test the 'clusterTime' field is copied from the oplog entry for a collection drop.
@@ -2344,7 +2361,8 @@ TEST_F(ChangeStreamStageTest, ClusterTimeMatchesOplogEntry) {
createCommand(BSON("drop" << nss.coll()), testUuid(), boost::none, opTime);
Document expectedDrop{
- {DSChangeStream::kIdField, makeResumeToken(ts, testUuid())},
+ {DSChangeStream::kIdField,
+ makeResumeToken(ts, testUuid(), Value(), DSChangeStream::kDropCollectionOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType},
{DSChangeStream::kClusterTimeField, ts},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -2363,7 +2381,8 @@ TEST_F(ChangeStreamStageTest, ClusterTimeMatchesOplogEntry) {
Document expectedRename{
{DSChangeStream::kRenameTargetNssField,
D{{"db", otherColl.db()}, {"coll", otherColl.coll()}}},
- {DSChangeStream::kIdField, makeResumeToken(ts, testUuid())},
+ {DSChangeStream::kIdField,
+ makeResumeToken(ts, testUuid(), Value(), DSChangeStream::kRenameCollectionOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType},
{DSChangeStream::kClusterTimeField, ts},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -2467,7 +2486,8 @@ TEST_F(ChangeStreamStageTest, DSCSTransformStageWithResumeTokenSerialize) {
auto expCtx = getExpCtx();
DocumentSourceChangeStreamSpec spec;
- spec.setResumeAfter(ResumeToken::parse(makeResumeToken(kDefaultTs, testUuid())));
+ spec.setResumeAfter(ResumeToken::parse(
+ makeResumeToken(kDefaultTs, testUuid(), Value(), DSChangeStream::kDropCollectionOpType)));
auto originalSpec = BSON("" << spec.toBSON());
// Verify that the 'initialPostBatchResumeToken' is populated while parsing.
@@ -2533,8 +2553,12 @@ TEST_F(ChangeStreamStageTest, DSCSCheckInvalidateStageSerialization) {
auto expCtx = getExpCtx();
DocumentSourceChangeStreamCheckInvalidateSpec spec;
- spec.setStartAfterInvalidate(ResumeToken::parse(makeResumeToken(
- kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)));
+ spec.setStartAfterInvalidate(
+ ResumeToken::parse(makeResumeToken(kDefaultTs,
+ testUuid(),
+ Value(),
+ DSChangeStream::kDropCollectionOpType,
+ ResumeTokenData::FromInvalidate::kFromInvalidate)));
auto stageSpecAsBSON = BSON("" << spec.toBSON());
validateDocumentSourceStageSerialization<DocumentSourceChangeStreamCheckInvalidate>(
@@ -2545,7 +2569,8 @@ TEST_F(ChangeStreamStageTest, DSCSResumabilityStageSerialization) {
auto expCtx = getExpCtx();
DocumentSourceChangeStreamCheckResumabilitySpec spec;
- spec.setResumeToken(ResumeToken::parse(makeResumeToken(kDefaultTs, testUuid())));
+ spec.setResumeToken(ResumeToken::parse(
+ makeResumeToken(kDefaultTs, testUuid(), Value(), DSChangeStream::kDropCollectionOpType)));
auto stageSpecAsBSON = BSON("" << spec.toBSON());
validateDocumentSourceStageSerialization<DocumentSourceChangeStreamCheckResumability>(
@@ -2578,7 +2603,8 @@ TEST_F(ChangeStreamStageTest, CloseCursorOnInvalidateEntries) {
auto lastStage = stages.back();
Document expectedDrop{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), Value(), DSChangeStream::kDropCollectionOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -2586,8 +2612,11 @@ TEST_F(ChangeStreamStageTest, CloseCursorOnInvalidateEntries) {
};
Document expectedInvalidate{
{DSChangeStream::kIdField,
- makeResumeToken(
- kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
+ makeResumeToken(kDefaultTs,
+ testUuid(),
+ Value(),
+ DSChangeStream::kDropCollectionOpType,
+ ResumeTokenData::FromInvalidate::kFromInvalidate)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -2629,7 +2658,7 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyWhenNoO2FieldIn
BSONObj docKey = BSON("_id" << 1 << "shardKey" << 2);
- auto resumeToken = makeResumeToken(ts, uuid, docKey);
+ auto resumeToken = makeResumeToken(ts, uuid, docKey, DSChangeStream::kInsertOpType);
BSONObj insertDoc = BSON("_id" << 2 << "shardKey" << 3);
auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type
@@ -2642,7 +2671,8 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyWhenNoO2FieldIn
// The documentKey should have just an _id in this case.
Document expectedInsert{
- {DSChangeStream::kIdField, makeResumeToken(ts, uuid, D{{"_id", 2}})},
+ {DSChangeStream::kIdField,
+ makeResumeToken(ts, uuid, D{{"_id", 2}}, DSChangeStream::kInsertOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, ts},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -2671,7 +2701,7 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldUseO2FieldInOplog) {
BSONObj docKey = BSON("_id" << 1);
- auto resumeToken = makeResumeToken(ts, uuid, docKey);
+ auto resumeToken = makeResumeToken(ts, uuid, docKey, DSChangeStream::kInsertOpType);
BSONObj insertDoc = BSON("_id" << 2 << "shardKey" << 3);
BSONObj o2 = BSON("_id" << 2 << "shardKey" << 3);
@@ -2684,7 +2714,8 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldUseO2FieldInOplog) {
opTime); // opTime
Document expectedInsert{
- {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)},
+ {DSChangeStream::kIdField,
+ makeResumeToken(ts, uuid, insertDoc, DSChangeStream::kInsertOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, ts},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -2712,7 +2743,7 @@ TEST_F(ChangeStreamStageTest, ResumeAfterFailsIfResumeTokenDoesNotContainUUID) {
});
// Create a resume token from only the timestamp.
- auto resumeToken = makeResumeToken(ts);
+ auto resumeToken = makeResumeToken(ts, Value(), Value(), DSChangeStream::kDropCollectionOpType);
ASSERT_THROWS_CODE(
DSChangeStream::createFromBson(
@@ -2731,7 +2762,8 @@ TEST_F(ChangeStreamStageTest, RenameFromSystemToUserCollectionShouldIncludeNotif
// Note that the collection rename does *not* have the queued invalidated field.
Document expectedRename{
{DSChangeStream::kRenameTargetNssField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), Value(), DSChangeStream::kRenameCollectionOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -2750,7 +2782,8 @@ TEST_F(ChangeStreamStageTest, RenameFromUserToSystemCollectionShouldIncludeNotif
Document expectedRename{
{DSChangeStream::kRenameTargetNssField,
D{{"db", systemColl.db()}, {"coll", systemColl.coll()}}},
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), Value(), DSChangeStream::kRenameCollectionOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -2772,6 +2805,7 @@ TEST_F(ChangeStreamStageTest, ResumeAfterWithTokenFromInvalidateShouldFail) {
makeResumeToken(kDefaultTs,
testUuid(),
BSON("x" << 2 << "_id" << 1),
+ DSChangeStream::kInsertOpType,
ResumeTokenData::FromInvalidate::kFromInvalidate);
ASSERT_THROWS_CODE(
@@ -2799,7 +2833,8 @@ TEST_F(ChangeStreamStageTest, UsesResumeTokenAsSortKeyIfNeedsMergeIsFalse) {
auto next = stages.back()->getNext();
- auto expectedSortKey = makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1));
+ auto expectedSortKey = makeResumeToken(
+ kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1), DSChangeStream::kInsertOpType);
ASSERT_TRUE(next.isAdvanced());
ASSERT_VALUE_EQ(next.releaseDocument().metadata().getSortKey(), Value(expectedSortKey));
@@ -2825,7 +2860,8 @@ TEST_F(ChangeStreamStageDBTest, TransformInsert) {
Document expectedInsert{
{DSChangeStream::kIdField,
- makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1))},
+ makeResumeToken(
+ kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1), DSChangeStream::kInsertOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -2846,7 +2882,8 @@ TEST_F(ChangeStreamStageDBTest, TransformInsertShowExpandedEvents) {
Document expectedInsert{
{DSChangeStream::kIdField,
- makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1))},
+ makeResumeToken(
+ kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1), DSChangeStream::kInsertOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kCollectionUuidField, testUuid()},
@@ -2870,7 +2907,8 @@ TEST_F(ChangeStreamStageDBTest, InsertOnOtherCollections) {
// Insert on another collection in the same database.
Document expectedInsert{
{DSChangeStream::kIdField,
- makeResumeToken(kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1))},
+ makeResumeToken(
+ kDefaultTs, testUuid(), BSON("x" << 2 << "_id" << 1), DSChangeStream::kInsertOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -2929,7 +2967,9 @@ TEST_F(ChangeStreamStageDBTest, TransformsEntriesForLegalClientCollectionsWithSy
auto insert = makeOplogEntry(
OpTypeEnum::kInsert, ns, BSON("_id" << 1), testUuid(), boost::none, BSON("_id" << 1));
Document expectedInsert{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << 1))},
+ {DSChangeStream::kIdField,
+ makeResumeToken(
+ kDefaultTs, testUuid(), BSON("_id" << 1), DSChangeStream::kInsertOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -2946,15 +2986,8 @@ TEST_F(ChangeStreamStageDBTest, TransformUpdateFields) {
BSONObj o2 = BSON("_id" << 1 << "x" << 2);
auto updateField = makeOplogEntry(OpTypeEnum::kUpdate, nss, o, testUuid(), boost::none, o2);
- Document expectedUpdateField{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
- {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
- {DSChangeStream::kClusterTimeField, kDefaultTs},
- {DSChangeStream::kWallTimeField, Date_t()},
- {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
- {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
- {"updateDescription", D{{"updatedFields", D{{"y", 1}}}, {"removedFields", vector<V>()}}},
- };
+ const auto expectedUpdateField = makeExpectedUpdateEvent(
+ kDefaultTs, nss, o2, D{{"updatedFields", D{{"y", 1}}}, {"removedFields", vector<V>()}});
checkTransformation(updateField, expectedUpdateField);
}
@@ -2968,18 +3001,8 @@ TEST_F(ChangeStreamStageDBTest, TransformRemoveFields) {
boost::none, // fromMigrate
o2); // o2
- // Remove fields
- Document expectedRemoveField{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
- {DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
- {DSChangeStream::kClusterTimeField, kDefaultTs},
- {DSChangeStream::kWallTimeField, Date_t()},
- {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
- {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
- {
- "updateDescription",
- D{{"updatedFields", D{}}, {"removedFields", {"y"_sd}}},
- }};
+ const auto expectedRemoveField = makeExpectedUpdateEvent(
+ kDefaultTs, nss, o2, D{{"updatedFields", D{}}, {"removedFields", {"y"_sd}}});
checkTransformation(removeField, expectedRemoveField);
}
@@ -2995,7 +3018,8 @@ TEST_F(ChangeStreamStageDBTest, TransformReplace) {
// Replace
Document expectedReplace{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o2)},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), o2, DSChangeStream::kReplaceOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -3017,7 +3041,8 @@ TEST_F(ChangeStreamStageDBTest, TransformDelete) {
// Delete
Document expectedDelete{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o)},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), o, DSChangeStream::kDeleteOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -3062,7 +3087,8 @@ TEST_F(ChangeStreamStageDBTest, TransformDeleteFromMigrateShowMigrations) {
// Delete
auto spec = fromjson("{$changeStream: {showMigrationEvents: true}}");
Document expectedDelete{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), o)},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), o, DSChangeStream::kDeleteOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -3076,7 +3102,8 @@ TEST_F(ChangeStreamStageDBTest, TransformDeleteFromMigrateShowMigrations) {
TEST_F(ChangeStreamStageDBTest, TransformDrop) {
OplogEntry dropColl = createCommand(BSON("drop" << nss.coll()), testUuid());
Document expectedDrop{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), Value(), DSChangeStream::kDropCollectionOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -3093,7 +3120,8 @@ TEST_F(ChangeStreamStageDBTest, TransformRename) {
Document expectedRename{
{DSChangeStream::kRenameTargetNssField,
D{{"db", otherColl.db()}, {"coll", otherColl.coll()}}},
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), Value(), DSChangeStream::kRenameCollectionOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -3107,7 +3135,8 @@ TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) {
// Drop database entry doesn't have a UUID.
Document expectedDropDatabase{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs)},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, Value(), Value(), DSChangeStream::kDropDatabaseOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDropDatabaseOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -3115,8 +3144,11 @@ TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) {
};
Document expectedInvalidate{
{DSChangeStream::kIdField,
- makeResumeToken(
- kDefaultTs, Value(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
+ makeResumeToken(kDefaultTs,
+ Value(),
+ Value(),
+ DSChangeStream::kDropDatabaseOpType,
+ ResumeTokenData::FromInvalidate::kFromInvalidate)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -3130,7 +3162,8 @@ TEST_F(ChangeStreamStageDBTest, TransformDropDatabaseShowExpandedEvents) {
// Drop database entry doesn't have a UUID.
Document expectedDropDatabase{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs)},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, Value(), Value(), DSChangeStream::kDropDatabaseOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDropDatabaseOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -3138,8 +3171,11 @@ TEST_F(ChangeStreamStageDBTest, TransformDropDatabaseShowExpandedEvents) {
};
Document expectedInvalidate{
{DSChangeStream::kIdField,
- makeResumeToken(
- kDefaultTs, Value(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
+ makeResumeToken(kDefaultTs,
+ Value(),
+ Value(),
+ DSChangeStream::kDropDatabaseOpType,
+ ResumeTokenData::FromInvalidate::kFromInvalidate)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -3192,7 +3228,8 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForDelete) {
auto spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
<< "off"));
Document expectedDeleteNoPreImage{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), documentKey, DSChangeStream::kDeleteOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -3206,7 +3243,8 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForDelete) {
spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
<< "whenAvailable"));
Document expectedDeleteWithPreImage{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), documentKey, DSChangeStream::kDeleteOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kDeleteOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -3286,7 +3324,8 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForUpdate) {
auto spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
<< "off"));
Document expectedUpdateNoPreImage{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), documentKey, DSChangeStream::kUpdateOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -3304,7 +3343,8 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForUpdate) {
spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
<< "whenAvailable"));
Document expectedUpdateWithPreImage{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), documentKey, DSChangeStream::kUpdateOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kUpdateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -3388,7 +3428,8 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForReplace) {
auto spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
<< "off"));
Document expectedReplaceNoPreImage{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), documentKey, DSChangeStream::kReplaceOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -3403,7 +3444,8 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForReplace) {
spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
<< "whenAvailable"));
Document expectedReplaceWithPreImage{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), documentKey)},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), documentKey, DSChangeStream::kReplaceOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kReplaceOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -3466,7 +3508,8 @@ TEST_F(ChangeStreamStageDBTest, RenameFromSystemToUserCollectionShouldIncludeNot
Document expectedRename{
{DSChangeStream::kRenameTargetNssField,
D{{"db", renamedColl.db()}, {"coll", renamedColl.coll()}}},
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), Value(), DSChangeStream::kRenameCollectionOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -3485,7 +3528,8 @@ TEST_F(ChangeStreamStageDBTest, RenameFromUserToSystemCollectionShouldIncludeNot
Document expectedRename{
{DSChangeStream::kRenameTargetNssField,
D{{"db", systemColl.db()}, {"coll", systemColl.coll()}}},
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), Value(), DSChangeStream::kRenameCollectionOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -3514,7 +3558,7 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyWhenNoO2Field
});
BSONObj docKey = BSON("_id" << 1 << "shardKey" << 2);
- auto resumeToken = makeResumeToken(ts, uuid, docKey);
+ auto resumeToken = makeResumeToken(ts, uuid, docKey, DSChangeStream::kInsertOpType);
BSONObj insertDoc = BSON("_id" << 2 << "shardKey" << 3);
auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type
@@ -3527,7 +3571,8 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyWhenNoO2Field
// The documentKey should just have an _id in this case.
Document expectedInsert{
- {DSChangeStream::kIdField, makeResumeToken(ts, uuid, D{{"_id", 2}})},
+ {DSChangeStream::kIdField,
+ makeResumeToken(ts, uuid, D{{"_id", 2}}, DSChangeStream::kInsertOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, ts},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -3551,7 +3596,7 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldUseO2FieldInOplog) {
});
BSONObj docKey = BSON("_id" << 1);
- auto resumeToken = makeResumeToken(ts, uuid, docKey);
+ auto resumeToken = makeResumeToken(ts, uuid, docKey, DSChangeStream::kInsertOpType);
BSONObj insertDoc = BSON("_id" << 2 << "shardKey" << 3);
BSONObj o2 = BSON("_id" << 2 << "shardKey" << 3);
@@ -3564,7 +3609,8 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldUseO2FieldInOplog) {
opTime); // opTime
Document expectedInsert{
- {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)},
+ {DSChangeStream::kIdField,
+ makeResumeToken(ts, uuid, insertDoc, DSChangeStream::kInsertOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, ts},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -3591,6 +3637,7 @@ TEST_F(ChangeStreamStageDBTest, ResumeAfterWithTokenFromInvalidateShouldFail) {
makeResumeToken(kDefaultTs,
testUuid(),
BSON("x" << 2 << "_id" << 1),
+ DSChangeStream::kInsertOpType,
ResumeTokenData::FromInvalidate::kFromInvalidate);
ASSERT_THROWS_CODE(
@@ -3611,15 +3658,19 @@ TEST_F(ChangeStreamStageDBTest, ResumeAfterWithTokenFromDropDatabase) {
});
// Create a resume token from only the timestamp, similar to a 'dropDatabase' entry.
- auto resumeToken = makeResumeToken(
- kDefaultTs, Value(), Value(), ResumeTokenData::FromInvalidate::kNotFromInvalidate);
+ auto resumeToken = makeResumeToken(kDefaultTs,
+ Value(),
+ Value(),
+ DSChangeStream::kInsertOpType,
+ ResumeTokenData::FromInvalidate::kNotFromInvalidate);
BSONObj insertDoc = BSON("_id" << 2);
auto insertEntry =
makeOplogEntry(OpTypeEnum::kInsert, nss, insertDoc, testUuid(), boost::none, insertDoc);
Document expectedInsert{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), insertDoc)},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, testUuid(), insertDoc, DSChangeStream::kInsertOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -3641,14 +3692,16 @@ TEST_F(ChangeStreamStageDBTest, StartAfterSucceedsEvenIfResumeTokenDoesNotContai
});
// Create a resume token from only the timestamp, similar to a 'dropDatabase' entry.
- auto resumeToken = makeResumeToken(kDefaultTs);
+ auto resumeToken =
+ makeResumeToken(kDefaultTs, Value(), Value(), DSChangeStream::kDropDatabaseOpType);
BSONObj insertDoc = BSON("_id" << 2);
auto insertEntry =
makeOplogEntry(OpTypeEnum::kInsert, nss, insertDoc, uuid, boost::none, insertDoc);
Document expectedInsert{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, uuid, insertDoc)},
+ {DSChangeStream::kIdField,
+ makeResumeToken(kDefaultTs, uuid, insertDoc, DSChangeStream::kInsertOpType)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kWallTimeField, Date_t()},
@@ -3717,10 +3770,11 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleMatchAndResumeToken) {
// We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
// filters out newly added events.
const std::vector<BSONObj> rawPipeline = {
- BSON("$changeStream" << BSON("resumeAfter"
- << makeResumeToken(kDefaultTs, testUuid())
- << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName
- << true)),
+ BSON("$changeStream"
+ << BSON("resumeAfter"
+ << makeResumeToken(
+ kDefaultTs, testUuid(), Value(), DSChangeStream::kDropCollectionOpType)
+ << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName << true)),
BSON("$match" << BSON("operationType"
<< "insert")),
BSON("$match" << BSON("operationType"
@@ -3796,10 +3850,11 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithMultipleProjectAndResumeToken) {
// We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
// filters out newly added events.
const std::vector<BSONObj> rawPipeline = {
- BSON("$changeStream" << BSON("resumeAfter"
- << makeResumeToken(kDefaultTs, testUuid())
- << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName
- << true)),
+ BSON("$changeStream"
+ << BSON("resumeAfter"
+ << makeResumeToken(
+ kDefaultTs, testUuid(), Value(), DSChangeStream::kDropCollectionOpType)
+ << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName << true)),
BSON("$project" << BSON("operationType" << 1)),
BSON("$project" << BSON("fullDocument" << 1))};
@@ -3826,10 +3881,11 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithProjectMatchAndResumeToken) {
// We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
// filters out newly added events.
const std::vector<BSONObj> rawPipeline = {
- BSON("$changeStream" << BSON("resumeAfter"
- << makeResumeToken(kDefaultTs, testUuid())
- << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName
- << true)),
+ BSON("$changeStream"
+ << BSON("resumeAfter"
+ << makeResumeToken(
+ kDefaultTs, testUuid(), Value(), DSChangeStream::kDropCollectionOpType)
+ << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName << true)),
BSON("$project" << BSON("operationType" << 1)),
BSON("$match" << BSON("operationType"
<< "insert"))};
@@ -3906,10 +3962,11 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithUnsetAndResumeToken) {
// We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
// filters out newly added events.
const std::vector<BSONObj> rawPipeline = {
- BSON("$changeStream" << BSON("resumeAfter"
- << makeResumeToken(kDefaultTs, testUuid())
- << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName
- << true)),
+ BSON("$changeStream"
+ << BSON("resumeAfter"
+ << makeResumeToken(
+ kDefaultTs, testUuid(), Value(), DSChangeStream::kDropCollectionOpType)
+ << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName << true)),
BSON("$unset"
<< "operationType")};
@@ -3983,10 +4040,11 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithAddFieldsAndResumeToken) {
// We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
// filters out newly added events.
const std::vector<BSONObj> rawPipeline = {
- BSON("$changeStream" << BSON("resumeAfter"
- << makeResumeToken(kDefaultTs, testUuid())
- << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName
- << true)),
+ BSON("$changeStream"
+ << BSON("resumeAfter"
+ << makeResumeToken(
+ kDefaultTs, testUuid(), Value(), DSChangeStream::kDropCollectionOpType)
+ << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName << true)),
BSON("$addFields" << BSON("stockPrice" << 100))};
auto pipeline = buildTestPipeline(rawPipeline);
@@ -4058,10 +4116,11 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithSetAndResumeToken) {
// We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
// filters out newly added events.
const std::vector<BSONObj> rawPipeline = {
- BSON("$changeStream" << BSON("resumeAfter"
- << makeResumeToken(kDefaultTs, testUuid())
- << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName
- << true)),
+ BSON("$changeStream"
+ << BSON("resumeAfter"
+ << makeResumeToken(
+ kDefaultTs, testUuid(), Value(), DSChangeStream::kDropCollectionOpType)
+ << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName << true)),
BSON("$set" << BSON("stockPrice" << 100))};
auto pipeline = buildTestPipeline(rawPipeline);
@@ -4109,10 +4168,11 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithReplaceRootAndResumeToken) {
// We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
// filters out newly added events.
const std::vector<BSONObj> rawPipeline = {
- BSON("$changeStream" << BSON("resumeAfter"
- << makeResumeToken(kDefaultTs, testUuid())
- << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName
- << true)),
+ BSON("$changeStream"
+ << BSON("resumeAfter"
+ << makeResumeToken(
+ kDefaultTs, testUuid(), Value(), DSChangeStream::kDropCollectionOpType)
+ << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName << true)),
BSON("$replaceRoot" << BSON("newRoot"
<< "$fullDocument"))};
@@ -4162,10 +4222,11 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithReplaceWithAndResumeToken) {
// We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
// filters out newly added events.
const std::vector<BSONObj> rawPipeline = {
- BSON("$changeStream" << BSON("resumeAfter"
- << makeResumeToken(kDefaultTs, testUuid())
- << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName
- << true)),
+ BSON("$changeStream"
+ << BSON("resumeAfter"
+ << makeResumeToken(
+ kDefaultTs, testUuid(), Value(), DSChangeStream::kDropCollectionOpType)
+ << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName << true)),
BSON("$replaceWith"
<< "$fullDocument")};
@@ -4278,23 +4339,23 @@ TEST_F(ChangeStreamStageTest, ChangeStreamWithShowExpandedEventsFalseAndUserProj
TEST_F(ChangeStreamStageTest, ChangeStreamWithAllStagesAndResumeToken) {
// We enable the 'showExpandedEvents' flag to avoid injecting an additional $match stage which
// filters out newly added events.
- const std::vector<BSONObj>
- rawPipeline = {BSON("$changeStream"
- << BSON("resumeAfter"
- << makeResumeToken(kDefaultTs, testUuid())
- << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName
- << true)),
- BSON("$project" << BSON("operationType" << 1)),
- BSON("$unset"
- << "_id"),
- BSON("$addFields" << BSON("stockPrice" << 100)),
- BSON("$set" << BSON("fullDocument.stockPrice" << 100)),
- BSON("$match" << BSON("operationType"
- << "insert")),
- BSON("$replaceRoot" << BSON("newRoot"
- << "$fullDocument")),
- BSON("$replaceWith"
- << "fullDocument.stockPrice")};
+ const std::vector<BSONObj> rawPipeline = {
+ BSON("$changeStream"
+ << BSON("resumeAfter"
+ << makeResumeToken(
+ kDefaultTs, testUuid(), Value(), DSChangeStream::kDropCollectionOpType)
+ << DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName << true)),
+ BSON("$project" << BSON("operationType" << 1)),
+ BSON("$unset"
+ << "_id"),
+ BSON("$addFields" << BSON("stockPrice" << 100)),
+ BSON("$set" << BSON("fullDocument.stockPrice" << 100)),
+ BSON("$match" << BSON("operationType"
+ << "insert")),
+ BSON("$replaceRoot" << BSON("newRoot"
+ << "$fullDocument")),
+ BSON("$replaceWith"
+ << "fullDocument.stockPrice")};
auto pipeline = buildTestPipeline(rawPipeline);
@@ -4379,14 +4440,16 @@ TEST_F(MultiTokenFormatVersionTest, CanResumeFromV2Token) {
Value(Document{{"operationType", "update"_sd}, {"documentKey", higherDocumentKey}}));
// The next event has a clusterTime later than the resume point, and should therefore start
- // using the default token version.
+ // using the default token version, which is 2.
next = lastStage->getNext();
ASSERT(next.isAdvanced());
const auto afterResumeTsResumeToken =
ResumeToken::parse(next.releaseDocument()["_id"].getDocument()).getData();
ASSERT_EQ(afterResumeTsResumeToken.clusterTime, afterResumeTs);
ASSERT_EQ(afterResumeTsResumeToken.version, ResumeTokenData::kDefaultTokenVersion);
- ASSERT_VALUE_EQ(afterResumeTsResumeToken.eventIdentifier, Value(midDocumentKey));
+ ASSERT_VALUE_EQ(
+ afterResumeTsResumeToken.eventIdentifier,
+ Value(Document{{"operationType", "update"_sd}, {"documentKey", midDocumentKey}}));
// Verify that no other events are returned.
next = lastStage->getNext();
@@ -4445,14 +4508,16 @@ TEST_F(MultiTokenFormatVersionTest, CanResumeFromV1Token) {
ResumeToken::parse(next.releaseDocument()["_id"].getDocument()).getData();
ASSERT_EQ(afterResumeTsResumeToken.clusterTime, afterResumeTs);
ASSERT_EQ(afterResumeTsResumeToken.version, ResumeTokenData::kDefaultTokenVersion);
- ASSERT_VALUE_EQ(afterResumeTsResumeToken.eventIdentifier, Value(midDocumentKey));
+ ASSERT_VALUE_EQ(
+ afterResumeTsResumeToken.eventIdentifier,
+ Value(Document{{"operationType", "update"_sd}, {"documentKey", midDocumentKey}}));
// Verify that no other events are returned.
next = lastStage->getNext();
ASSERT_FALSE(next.isAdvanced());
}
-TEST_F(MultiTokenFormatVersionTest, CanResumeFromV2HighWaterMark) {
+TEST_F(MultiTokenFormatVersionTest, CanResumeFromV1HighWaterMark) {
const auto beforeResumeTs = Timestamp(100, 1);
const auto resumeTs = Timestamp(100, 2);
const auto afterResumeTs = Timestamp(100, 3);
@@ -4467,7 +4532,7 @@ TEST_F(MultiTokenFormatVersionTest, CanResumeFromV2HighWaterMark) {
// Create a v2 high water mark token which sorts immediately before 'firstOplogAtResumeTime'.
ResumeTokenData resumeToken = ResumeToken::makeHighWaterMarkToken(resumeTs, 2).getData();
- resumeToken.version = 2;
+ resumeToken.version = 1;
auto expCtx = getExpCtxRaw();
expCtx->ns = NamespaceString::makeCollectionlessAggregateNSS(
TenantDatabaseName(boost::none, "unittests"));
@@ -4485,26 +4550,23 @@ TEST_F(MultiTokenFormatVersionTest, CanResumeFromV2HighWaterMark) {
// The high water mark token should be order ahead of every other entry with the same
// clusterTime. So we should see both entries that match the resumeToken's clusterTime, and both
- // should have inherited the token version 2 from the high water mark.
+ // should have inherited the token version 1 from the high water mark.
auto lastStage = stages.back();
auto next = lastStage->getNext();
ASSERT(next.isAdvanced());
const auto sameTsResumeToken1 =
ResumeToken::parse(next.releaseDocument()["_id"].getDocument()).getData();
ASSERT_EQ(sameTsResumeToken1.clusterTime, resumeTs);
- ASSERT_EQ(sameTsResumeToken1.version, 2);
- ASSERT_VALUE_EQ(sameTsResumeToken1.eventIdentifier,
- Value(Document{{"operationType", "update"_sd}, {"documentKey", documentKey}}));
+ ASSERT_EQ(sameTsResumeToken1.version, 1);
+ ASSERT_VALUE_EQ(sameTsResumeToken1.eventIdentifier, Value(documentKey));
next = lastStage->getNext();
ASSERT(next.isAdvanced());
const auto sameTsResumeToken2 =
ResumeToken::parse(next.releaseDocument()["_id"].getDocument()).getData();
ASSERT_EQ(sameTsResumeToken2.clusterTime, resumeTs);
- ASSERT_EQ(sameTsResumeToken2.version, 2);
- ASSERT_VALUE_EQ(
- sameTsResumeToken2.eventIdentifier,
- Value(Document{{"operationType", "update"_sd}, {"documentKey", higherDocumentKey}}));
+ ASSERT_EQ(sameTsResumeToken2.version, 1);
+ ASSERT_VALUE_EQ(sameTsResumeToken2.eventIdentifier, Value(higherDocumentKey));
// The resumeToken after the current clusterTime should start using the default version, and
// corresponding 'eventIdentifier' format.
@@ -4514,7 +4576,8 @@ TEST_F(MultiTokenFormatVersionTest, CanResumeFromV2HighWaterMark) {
ResumeToken::parse(next.releaseDocument()["_id"].getDocument()).getData();
ASSERT_EQ(afterResumeTsResumeToken.clusterTime, afterResumeTs);
ASSERT_EQ(afterResumeTsResumeToken.version, ResumeTokenData::kDefaultTokenVersion);
- ASSERT_VALUE_EQ(afterResumeTsResumeToken.eventIdentifier, Value(documentKey));
+ ASSERT_VALUE_EQ(afterResumeTsResumeToken.eventIdentifier,
+ Value(Document{{"operationType", "update"_sd}, {"documentKey", documentKey}}));
// Verify that no other events are returned.
next = lastStage->getNext();
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index 28603f10a30..701c5b495a7 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -655,6 +655,7 @@ TEST_F(CheckResumeTokenTest, ShouldSwallowInvalidateFromEachShardForStartAfterIn
invalidateToken.clusterTime = resumeTimestamp;
invalidateToken.uuid = uuids[0];
invalidateToken.fromInvalidate = ResumeTokenData::kFromInvalidate;
+ invalidateToken.eventIdentifier = Value(Document{{"operationType", "drop"_sd}});
auto checkResumeToken = createDSEnsureResumeTokenPresent(invalidateToken);
// Add three documents which each have the invalidate resume token. We expect to see this in the
@@ -695,6 +696,7 @@ TEST_F(CheckResumeTokenTest, ShouldNotSwallowUnrelatedInvalidateForStartAfterInv
invalidateToken.clusterTime = resumeTimestamp;
invalidateToken.uuid = uuids[0];
invalidateToken.fromInvalidate = ResumeTokenData::kFromInvalidate;
+ invalidateToken.eventIdentifier = Value(Document{{"operationType", "drop"_sd}});
auto checkResumeToken = createDSEnsureResumeTokenPresent(invalidateToken);
// Create a second invalidate token with the same clusterTime but a different UUID.
diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h
index a157c97fed9..589cf458ee7 100644
--- a/src/mongo/db/pipeline/resume_token.h
+++ b/src/mongo/db/pipeline/resume_token.h
@@ -44,7 +44,7 @@ struct ResumeTokenData {
/*
* The default or "preferred" token version generated by this version of the server.
*/
- static constexpr int kDefaultTokenVersion = 1;
+ static constexpr int kDefaultTokenVersion = 2;
/**
* Flag to indicate if the resume token is from an invalidate notification.
diff --git a/src/mongo/db/pipeline/resume_token_test.cpp b/src/mongo/db/pipeline/resume_token_test.cpp
index eed85344ddd..cbdd6b8b0eb 100644
--- a/src/mongo/db/pipeline/resume_token_test.cpp
+++ b/src/mongo/db/pipeline/resume_token_test.cpp
@@ -61,6 +61,7 @@ TEST(ResumeToken, EncodesTimestampOnlyTokenFromData) {
ResumeTokenData resumeTokenDataIn;
resumeTokenDataIn.clusterTime = ts;
+ resumeTokenDataIn.eventIdentifier = Value(Document{{"operationType", "drop"_sd}});
ResumeToken token(resumeTokenDataIn);
ResumeTokenData tokenData = token.getData();
@@ -91,6 +92,7 @@ TEST(ResumeToken, TimestampOnlyTokenShouldRoundTripThroughHexEncoding) {
ResumeTokenData resumeTokenDataIn;
resumeTokenDataIn.clusterTime = ts;
+ resumeTokenDataIn.eventIdentifier = Value(Document{{"operationType", "drop"_sd}});
// Test serialization/parsing through Document.
auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument().toBson());
@@ -243,6 +245,7 @@ TEST(ResumeToken, FailsToParseForInvalidTokenFormats) {
Timestamp ts(1010, 4);
ResumeTokenData tokenData;
tokenData.clusterTime = ts;
+ tokenData.eventIdentifier = Value(Document{{"operationType", "drop"_sd}});
auto goodTokenDocBinData = ResumeToken(tokenData).toDocument();
auto goodData = goodTokenDocBinData["_data"].getStringData();
ASSERT_THROWS(ResumeToken::parse(Document{{"_data"_sd, goodData}, {"_typeBits", "string"_sd}}),
@@ -259,6 +262,7 @@ TEST(ResumeToken, FailsToDecodeInvalidKeyString) {
ResumeTokenData tokenData;
tokenData.clusterTime = ts;
+ tokenData.eventIdentifier = Value(Document{{"operationType", "drop"_sd}});
auto goodTokenDocBinData = ResumeToken(tokenData).toDocument();
auto goodData = goodTokenDocBinData["_data"].getStringData();
@@ -382,6 +386,7 @@ TEST(ResumeToken, InvalidTxnOpIndex) {
ResumeTokenData resumeTokenDataIn;
resumeTokenDataIn.clusterTime = ts;
resumeTokenDataIn.txnOpIndex = 1234;
+ resumeTokenDataIn.eventIdentifier = Value(Document{{"operationType", "drop"_sd}});
// Should round trip with a non-negative txnOpIndex.
auto rtToken = ResumeToken::parse(ResumeToken(resumeTokenDataIn).toDocument().toBson());
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index f63f6c61bc6..b49422c9959 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -114,9 +114,11 @@ RemoteCursor openChangeStreamNewShardMonitor(const boost::intrusive_ptr<Expressi
aggReq.setFromMongos(true);
aggReq.setNeedsMerge(true);
- // TODO SERVER-65370: from 6.1 onwards, we will default to v2 and this block should be removed.
- if (expCtx->inMongos) {
- aggReq.setGenerateV2ResumeTokens(expCtx->changeStreamTokenVersion == 2);
+ // TODO SERVER-65369: This code block can be removed after 7.0.
+ if (expCtx->inMongos && expCtx->changeStreamTokenVersion == 1) {
+ // A request for v1 resume tokens on mongos should only be allowed in test mode.
+ tassert(6497000, "Invalid request for v1 resume tokens", getTestCommandsEnabled());
+ aggReq.setGenerateV2ResumeTokens(false);
}
SimpleCursorOptions cursor;
@@ -151,17 +153,6 @@ BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
cmdForShards[AggregateCommandRequest::kCollationFieldName] = Value(collationObj);
}
- // We explicitly set $_generateV2ResumeTokens to false, if not already set, to indicate that the
- // shards should NOT produce v2 resume tokens for change streams; instead, they should continue
- // generating v1 tokens. This facilitates upgrade between 6.0 (which produces v1 by default)
- // and 7.0 (which will produce v2 by default, but will be capable of generating v1) by ensuring
- // that a 6.0 mongoS on a mixed 6.0/7.0 cluster will see only v1 tokens in the stream.
- // TODO SERVER-65370: from 6.1 onwards, we will default to v2 and this block should be removed.
- const auto& v2FieldName = AggregateCommandRequest::kGenerateV2ResumeTokensFieldName;
- if (auto cmdObj = cmdForShards.peek(); expCtx->inMongos && cmdObj[v2FieldName].missing()) {
- cmdForShards[v2FieldName] = Value(false);
- }
-
// If this is a request for an aggregation explain, then we must wrap the aggregate inside an
// explain command.
if (explainVerbosity) {
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index b3df71582da..72f57512200 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -133,12 +133,14 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext(
mergeCtx->inMongos = true;
- // If the request specified v2 resume tokens for change streams, set this on the expCtx. On 6.0
- // we only expect this to occur during testing.
- // TODO SERVER-65370: after 6.0, assume true unless present and explicitly false.
- if (request.getGenerateV2ResumeTokens()) {
- uassert(6528201, "Invalid request for v2 resume tokens", getTestCommandsEnabled());
- mergeCtx->changeStreamTokenVersion = 2;
+ // If the request explicitly specified NOT to use v2 resume tokens for change streams, set this
+ // on the expCtx. We only ever expect to see an explicit value during testing.
+ if (request.getGenerateV2ResumeTokens().has_value()) {
+ // If $_generateV2ResumeTokens was specified, we must be testing and it must be false.
+ uassert(6528201,
+ "Invalid request for v2 resume tokens",
+ getTestCommandsEnabled() && !request.getGenerateV2ResumeTokens());
+ mergeCtx->changeStreamTokenVersion = 1;
}
// Serialize the 'AggregateCommandRequest' and save it so that the original command can be