author    Bernard Gorman <bernard.gorman@gmail.com>  2018-04-18 12:37:03 +0100
committer Bernard Gorman <bernard.gorman@gmail.com>  2018-04-23 23:22:25 +0100
commit    0d104ecab0eba60fb780d2749e39896a69382e79 (patch)
tree      2e2709a614d7940363635b47635fe4557e3169e4
parent    bb5ec65bcca169956c20373b65d188e94e3ba10a (diff)
download  mongo-0d104ecab0eba60fb780d2749e39896a69382e79.tar.gz
SERVER-34499 Add passthrough suite(s) replacing single-collection and single-db $changeStream with whole-cluster streams filtered by namespace
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml | 74
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_whole_cluster_passthrough.yml | 55
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml | 90
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml | 65
-rw-r--r--  etc/evergreen.yml | 76
-rw-r--r--  jstests/change_streams/change_stream_apply_ops.js | 295
-rw-r--r--  jstests/change_streams/change_stream_apply_ops_resumability.js | 352
-rw-r--r--  jstests/change_streams/change_stream_shell_helper.js | 202
-rw-r--r--  jstests/change_streams/change_stream_whole_db_invalidations.js | 6
-rw-r--r--  jstests/change_streams/include_cluster_time.js | 64
-rw-r--r--  jstests/change_streams/lookup_post_image.js | 429
-rw-r--r--  jstests/libs/override_methods/implicit_whole_cluster_changestreams.js | 54
-rw-r--r--  jstests/libs/override_methods/implicit_whole_db_changestreams.js | 150
13 files changed, 1127 insertions(+), 785 deletions(-)
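The two new override files listed above (implicit_whole_cluster_changestreams.js and implicit_whole_db_changestreams.js) carry the core mechanism of this commit: they intercept each test's single-collection or single-db $changeStream request and upconvert it into a broader stream whose results are filtered back down to the original namespace. Their diffs are not reproduced in this excerpt, so the sketch below is only a hypothetical approximation of that idea; the function name and structure are invented, not the actual override source.

    // Hypothetical sketch of the upconversion idea (not the real override code):
    // rewrite a single-collection $changeStream into a whole-cluster stream,
    // then re-filter events by the original namespace.
    function upconvertToClusterWideStream(dbName, cmdObj) {
        const origStage = cmdObj.pipeline[0]["$changeStream"];
        const clusterStage = Object.assign({}, origStage, {allChangesForCluster: true});
        // Keep only events on the original namespace, but always pass
        // "invalidate" entries through so the stream's lifecycle is preserved.
        const nsFilter = {
            $match: {
                $or: [
                    {"ns.db": dbName, "ns.coll": cmdObj.aggregate},
                    {operationType: "invalidate"}
                ]
            }
        };
        // Whole-cluster streams are issued against the admin database using the
        // collectionless aggregate form ({aggregate: 1}).
        return Object.assign({}, cmdObj, {
            aggregate: 1,
            pipeline: [{$changeStream: clusterStage}, nsFilter].concat(cmdObj.pipeline.slice(1))
        });
    }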
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml
new file mode 100644
index 00000000000..a34bf9bda63
--- /dev/null
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml
@@ -0,0 +1,74 @@
+test_kind: js_test
+
+selector:
+ roots:
+ - jstests/change_streams/**/*.js
+ exclude_files:
+ # This test exercises an internal detail of mongos<->mongod communication and is not expected
+ # to work against a mongos.
+ - jstests/change_streams/report_latest_observed_oplog_timestamp.js
+ # TODO: SERVER-32088 should fix resuming a change stream when not all shards have data.
+ - jstests/change_streams/change_stream_shell_helper.js
+ - jstests/change_streams/lookup_post_image.js
+ - jstests/change_streams/include_cluster_time.js
+ exclude_with_any_tags:
+ ##
+ # The next three tags correspond to the special errors thrown by the
+ # set_read_and_write_concerns.js override when it refuses to replace the readConcern or
+ # writeConcern of a particular command. Above each tag are the message(s) that cause the tag to be
+ # warranted.
+ ##
+ # "Cowardly refusing to override read concern of command: ..."
+ - assumes_read_concern_unchanged
+ # "Cowardly refusing to override write concern of command: ..."
+ - assumes_write_concern_unchanged
+ # "Cowardly refusing to run test with overridden write concern when it uses a command that can
+ # only perform w=1 writes: ..."
+ - requires_eval_command
+ # Transactions not supported on sharded clusters.
+ - uses_transactions
+
+executor:
+ archive:
+ hooks:
+ - CheckReplDBHash
+ - ValidateCollections
+ config:
+ shell_options:
+ global_vars:
+ TestData:
+ defaultReadConcernLevel: majority
+ enableMajorityReadConcern: ''
+ eval: >-
+ var testingReplication = true;
+ load('jstests/libs/override_methods/set_read_and_write_concerns.js');
+ load('jstests/libs/override_methods/implicit_whole_cluster_changestreams.js');
+ readMode: commands
+ hooks:
+ - class: CheckReplDBHash
+ - class: ValidateCollections
+ - class: CleanEveryN
+ n: 20
+ fixture:
+ class: ShardedClusterFixture
+ # Use two shards to make sure we will only talk to the primary shard for the database and will
+ # not delay changes to wait for notifications or a clock advancement from other shards.
+ num_shards: 2
+ mongos_options:
+ bind_ip_all: ''
+ set_parameters:
+ enableTestCommands: 1
+ mongod_options:
+ bind_ip_all: ''
+ nopreallocj: ''
+ set_parameters:
+ enableTestCommands: 1
+ numInitialSyncAttempts: 1
+ periodicNoopIntervalSecs: 1
+ writePeriodicNoops: true
+ num_rs_nodes_per_shard: 1
+ # TODO SERVER-34138: This suite doesn't actually shard any collections, but enabling sharding
+ # will prevent read commands against non-existent databases from unconditionally returning a
+ # CursorId of 0.
+ enable_sharding:
+ - test
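As exercised by the test changes later in this diff, a whole-cluster change stream is opened against the admin database using the collectionless aggregate form with the allChangesForCluster flag. A minimal shell invocation looks like this:

    // Open a whole-cluster change stream from the shell (as the rewritten
    // tests below do via ChangeStreamTest.startWatchingChanges).
    const adminDB = db.getSiblingDB("admin");
    const res = assert.commandWorked(adminDB.runCommand({
        aggregate: 1,  // collectionless form required for cluster-wide streams
        pipeline: [{$changeStream: {allChangesForCluster: true}}],
        cursor: {}
    }));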
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_passthrough.yml
new file mode 100644
index 00000000000..8d63f933521
--- /dev/null
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_passthrough.yml
@@ -0,0 +1,55 @@
+test_kind: js_test
+
+selector:
+ roots:
+ - jstests/change_streams/**/*.js
+ exclude_with_any_tags:
+ ##
+ # The next three tags correspond to the special errors thrown by the
+ # set_read_and_write_concerns.js override when it refuses to replace the readConcern or
+ # writeConcern of a particular command. Above each tag are the message(s) that cause the tag to be
+ # warranted.
+ ##
+ # "Cowardly refusing to override read concern of command: ..."
+ - assumes_read_concern_unchanged
+ # "Cowardly refusing to override write concern of command: ..."
+ - assumes_write_concern_unchanged
+ # "Cowardly refusing to run test with overridden write concern when it uses a command that can
+ # only perform w=1 writes: ..."
+ - requires_eval_command
+
+executor:
+ archive:
+ hooks:
+ - CheckReplDBHash
+ - CheckReplOplogs
+ - ValidateCollections
+ config:
+ shell_options:
+ global_vars:
+ TestData:
+ defaultReadConcernLevel: majority
+ enableMajorityReadConcern: ''
+ eval: >-
+ var testingReplication = true;
+ load('jstests/libs/override_methods/set_read_and_write_concerns.js');
+ load('jstests/libs/override_methods/implicit_whole_cluster_changestreams.js');
+ readMode: commands
+ hooks:
+ # The CheckReplDBHash hook waits until all operations have replicated to and have been applied
+ # on the secondaries, so we run the ValidateCollections hook after it to ensure we're
+ # validating the entire contents of the collection.
+ - class: CheckReplOplogs
+ - class: CheckReplDBHash
+ - class: ValidateCollections
+ - class: CleanEveryN
+ n: 20
+ fixture:
+ class: ReplicaSetFixture
+ mongod_options:
+ bind_ip_all: ''
+ set_parameters:
+ enableTestCommands: 1
+ numInitialSyncAttempts: 1
+ num_nodes: 1
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml
new file mode 100644
index 00000000000..eff8a859473
--- /dev/null
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml
@@ -0,0 +1,90 @@
+test_kind: js_test
+
+selector:
+ roots:
+ - jstests/change_streams/**/*.js
+ exclude_files:
+ # This test starts a parallel shell which we want to read from the same node as the change stream,
+ # but the parallel shell performs an insert which will not work against the secondary.
+ - jstests/change_streams/only_wake_getmore_for_relevant_changes.js
+ # This test is not expected to work when run against a mongos.
+ - jstests/change_streams/report_latest_observed_oplog_timestamp.js
+ exclude_with_any_tags:
+ ##
+ # The next three tags correspond to the special errors thrown by the
+ # set_read_and_write_concerns.js override when it refuses to replace the readConcern or
+ # writeConcern of a particular command. Above each tag are the message(s) that cause the tag to be
+ # warranted.
+ ##
+ # "Cowardly refusing to override read concern of command: ..."
+ - assumes_read_concern_unchanged
+ # "Cowardly refusing to override write concern of command: ..."
+ - assumes_write_concern_unchanged
+ # "Cowardly refusing to run test with overridden write concern when it uses a command that can
+ # only perform w=1 writes: ..."
+ - requires_eval_command
+ ##
+ # The next tag corresponds to the special error thrown by the set_read_preference_secondary.js
+ # override when it refuses to replace the readPreference of a particular command. Above each tag
+ # are the message(s) that cause the tag to be warranted.
+ ##
+ # "Cowardly refusing to override read preference of command: ..."
+ # "Cowardly refusing to run test with overridden read preference when it reads from a
+ # non-replicated collection: ..."
+ - assumes_read_preference_unchanged
+ # Transactions not supported on sharded clusters.
+ - uses_transactions
+
+executor:
+ archive:
+ hooks:
+ - CheckReplDBHash
+ - ValidateCollections
+ config:
+ shell_options:
+ global_vars:
+ TestData:
+ defaultReadConcernLevel: majority
+ eval: >-
+ var testingReplication = true;
+ load('jstests/libs/override_methods/set_read_and_write_concerns.js');
+ load('jstests/libs/override_methods/set_read_preference_secondary.js');
+ load('jstests/libs/override_methods/implicit_whole_cluster_changestreams.js');
+ hooks:
+ - class: CheckReplDBHash
+ - class: ValidateCollections
+ - class: CleanEveryN
+ n: 20
+ fixture:
+ class: ShardedClusterFixture
+ mongos_options:
+ set_parameters:
+ enableTestCommands: 1
+ logComponentVerbosity:
+ verbosity: 0
+ command: 1
+ network:
+ verbosity: 1
+ asio: 2
+ tracking: 0
+ mongod_options:
+ nopreallocj: ''
+ set_parameters:
+ enableTestCommands: 1
+ logComponentVerbosity:
+ verbosity: 0
+ command: 1
+ query: 1
+ replication: 3
+ numInitialSyncAttempts: 1
+ # This suite requires w:"majority" writes to be applied on all shard secondaries.
+ # By setting shards to two node replsets and having secondaries vote, the majority
+ # is all voting nodes. TODO: use causal consistency instead (SERVER-32791).
+ num_rs_nodes_per_shard: 2
+ shard_options:
+ voting_secondaries: true
+ # TODO SERVER-34138: This suite doesn't actually shard any collections, but enabling sharding
+ # will prevent read commands against non-existent databases from unconditionally returning a
+ # CursorId of 0.
+ enable_sharding:
+ - test
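This suite additionally loads set_read_preference_secondary.js, which rewrites each command so that change stream reads target secondaries. In plain shell terms the behaviour it simulates is roughly the following (a sketch; the override works per-command rather than via setReadPref, and "coll" is a placeholder name):

    // Direct reads, including change stream getMores, to a secondary.
    db.getMongo().setReadPref("secondary");
    const stream = db.getCollection("coll").watch();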
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml
new file mode 100644
index 00000000000..6f659d668c9
--- /dev/null
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml
@@ -0,0 +1,65 @@
+test_kind: js_test
+
+selector:
+ roots:
+ - jstests/change_streams/**/*.js
+ exclude_files:
+ # Exercises an internal detail of mongos<->mongod communication. Not expected to work on mongos.
+ - jstests/change_streams/report_latest_observed_oplog_timestamp.js
+ exclude_with_any_tags:
+ ##
+ # The next three tags correspond to the special errors thrown by the
+ # set_read_and_write_concerns.js override when it refuses to replace the readConcern or
+ # writeConcern of a particular command. Above each tag are the message(s) that cause the tag to be
+ # warranted.
+ ##
+ # "Cowardly refusing to override read concern of command: ..."
+ - assumes_read_concern_unchanged
+ # "Cowardly refusing to override write concern of command: ..."
+ - assumes_write_concern_unchanged
+ # "Cowardly refusing to run test with overridden write concern when it uses a command that can
+ # only perform w=1 writes: ..."
+ - requires_eval_command
+ # Transactions not supported on sharded clusters.
+ - uses_transactions
+
+executor:
+ archive:
+ hooks:
+ - CheckReplDBHash
+ - ValidateCollections
+ config:
+ shell_options:
+ global_vars:
+ TestData:
+ defaultReadConcernLevel: majority
+ enableMajorityReadConcern: ''
+ eval: >-
+ var testingReplication = true;
+ load('jstests/libs/override_methods/set_read_and_write_concerns.js');
+ load('jstests/libs/override_methods/implicitly_shard_accessed_collections.js');
+ load('jstests/libs/override_methods/implicit_whole_cluster_changestreams.js');
+ readMode: commands
+ hooks:
+ - class: CheckReplDBHash
+ - class: ValidateCollections
+ - class: CleanEveryN
+ n: 20
+ fixture:
+ class: ShardedClusterFixture
+ mongos_options:
+ bind_ip_all: ''
+ set_parameters:
+ enableTestCommands: 1
+ mongod_options:
+ bind_ip_all: ''
+ nopreallocj: ''
+ set_parameters:
+ enableTestCommands: 1
+ writePeriodicNoops: 1
+ periodicNoopIntervalSecs: 1
+ numInitialSyncAttempts: 1
+ num_rs_nodes_per_shard: 1
+ num_shards: 2
+ enable_sharding:
+ - test
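Beyond the whole-cluster override, this suite loads implicitly_shard_accessed_collections.js so that the collections touched by the tests are sharded. Conceptually this corresponds to the standard shell helpers below (a sketch; the shard key shown is an assumption about what the override chooses):

    // Roughly the effect of the implicit-sharding override on a test collection.
    sh.enableSharding("test");
    sh.shardCollection("test.change_stream_apply_ops", {_id: "hashed"});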
diff --git a/etc/evergreen.yml b/etc/evergreen.yml
index bec044ea444..3eb0c9b77ad 100644
--- a/etc/evergreen.yml
+++ b/etc/evergreen.yml
@@ -3609,6 +3609,50 @@ tasks:
run_multiple_jobs: true
- <<: *task_template
+ name: change_streams_whole_cluster_passthrough
+ depends_on:
+ - name: change_streams
+ commands:
+ - func: "do setup"
+ - func: "run tests"
+ vars:
+ resmoke_args: --suites=change_streams_whole_cluster_passthrough --storageEngine=wiredTiger
+ run_multiple_jobs: true
+
+- <<: *task_template
+ name: change_streams_whole_cluster_mongos_passthrough
+ depends_on:
+ - name: change_streams_mongos_passthrough
+ commands:
+ - func: "do setup"
+ - func: "run tests"
+ vars:
+ resmoke_args: --suites=change_streams_whole_cluster_mongos_passthrough --storageEngine=wiredTiger
+ run_multiple_jobs: true
+
+- <<: *task_template
+ name: change_streams_whole_cluster_secondary_reads_passthrough
+ depends_on:
+ - name: change_streams_secondary_reads
+ commands:
+ - func: "do setup"
+ - func: "run tests"
+ vars:
+ resmoke_args: --suites=change_streams_whole_cluster_secondary_reads_passthrough --storageEngine=wiredTiger
+ run_multiple_jobs: true
+
+- <<: *task_template
+ name: change_streams_whole_cluster_sharded_collections_passthrough
+ depends_on:
+ - name: change_streams_sharded_collections_passthrough
+ commands:
+ - func: "do setup"
+ - func: "run tests"
+ vars:
+ resmoke_args: --suites=change_streams_whole_cluster_sharded_collections_passthrough --storageEngine=wiredTiger
+ run_multiple_jobs: true
+
+- <<: *task_template
name: dbtest
commands:
- func: "do setup"
@@ -6367,6 +6411,10 @@ buildvariants:
- name: change_streams_whole_db_mongos_passthrough
- name: change_streams_whole_db_secondary_reads_passthrough
- name: change_streams_whole_db_sharded_collections_passthrough
+ - name: change_streams_whole_cluster_passthrough
+ - name: change_streams_whole_cluster_mongos_passthrough
+ - name: change_streams_whole_cluster_secondary_reads_passthrough
+ - name: change_streams_whole_cluster_sharded_collections_passthrough
- name: dbtest
- name: disk_wiredtiger
- name: failpoints
@@ -6550,6 +6598,9 @@ buildvariants:
- name: change_streams_whole_db_passthrough
- name: change_streams_whole_db_mongos_passthrough
- name: change_streams_whole_db_sharded_collections_passthrough
+ - name: change_streams_whole_cluster_passthrough
+ - name: change_streams_whole_cluster_mongos_passthrough
+ - name: change_streams_whole_cluster_sharded_collections_passthrough
- name: concurrency
- name: concurrency_replication
- name: concurrency_sharded
@@ -7626,6 +7677,9 @@ buildvariants:
- name: change_streams_whole_db_passthrough
- name: change_streams_whole_db_mongos_passthrough
- name: change_streams_whole_db_sharded_collections_passthrough
+ - name: change_streams_whole_cluster_passthrough
+ - name: change_streams_whole_cluster_mongos_passthrough
+ - name: change_streams_whole_cluster_sharded_collections_passthrough
- name: dbtest
- name: disk_wiredtiger
- name: failpoints
@@ -8424,6 +8478,9 @@ buildvariants:
- name: change_streams_whole_db_passthrough
- name: change_streams_whole_db_mongos_passthrough
- name: change_streams_whole_db_sharded_collections_passthrough
+ - name: change_streams_whole_cluster_passthrough
+ - name: change_streams_whole_cluster_mongos_passthrough
+ - name: change_streams_whole_cluster_sharded_collections_passthrough
- name: concurrency
- name: concurrency_replication
- name: concurrency_sharded
@@ -9036,6 +9093,10 @@ buildvariants:
- name: change_streams_whole_db_mongos_passthrough
- name: change_streams_whole_db_secondary_reads_passthrough
- name: change_streams_whole_db_sharded_collections_passthrough
+ - name: change_streams_whole_cluster_passthrough
+ - name: change_streams_whole_cluster_mongos_passthrough
+ - name: change_streams_whole_cluster_secondary_reads_passthrough
+ - name: change_streams_whole_cluster_sharded_collections_passthrough
- name: concurrency
- name: concurrency_replication
- name: concurrency_sharded
@@ -9563,6 +9624,10 @@ buildvariants:
- name: change_streams_whole_db_mongos_passthrough
- name: change_streams_whole_db_secondary_reads_passthrough
- name: change_streams_whole_db_sharded_collections_passthrough
+ - name: change_streams_whole_cluster_passthrough
+ - name: change_streams_whole_cluster_mongos_passthrough
+ - name: change_streams_whole_cluster_secondary_reads_passthrough
+ - name: change_streams_whole_cluster_sharded_collections_passthrough
- name: concurrency
- name: concurrency_replication
- name: concurrency_sharded
@@ -11107,6 +11172,9 @@ buildvariants:
- name: change_streams_whole_db_passthrough
- name: change_streams_whole_db_mongos_passthrough
- name: change_streams_whole_db_sharded_collections_passthrough
+ - name: change_streams_whole_cluster_passthrough
+ - name: change_streams_whole_cluster_mongos_passthrough
+ - name: change_streams_whole_cluster_sharded_collections_passthrough
- name: concurrency
distros:
- rhel62-large # Some workloads require a lot of memory, use a bigger machine for this suite.
@@ -11730,6 +11798,10 @@ buildvariants:
- name: change_streams_whole_db_mongos_passthrough
- name: change_streams_whole_db_secondary_reads_passthrough
- name: change_streams_whole_db_sharded_collections_passthrough
+ - name: change_streams_whole_cluster_passthrough
+ - name: change_streams_whole_cluster_mongos_passthrough
+ - name: change_streams_whole_cluster_secondary_reads_passthrough
+ - name: change_streams_whole_cluster_sharded_collections_passthrough
- name: concurrency
- name: concurrency_replication
- name: concurrency_sharded
@@ -11893,6 +11965,10 @@ buildvariants:
- name: change_streams_whole_db_mongos_passthrough
- name: change_streams_whole_db_secondary_reads_passthrough
- name: change_streams_whole_db_sharded_collections_passthrough
+ - name: change_streams_whole_cluster_passthrough
+ - name: change_streams_whole_cluster_mongos_passthrough
+ - name: change_streams_whole_cluster_secondary_reads_passthrough
+ - name: change_streams_whole_cluster_sharded_collections_passthrough
- name: concurrency
- name: concurrency_replication
- name: concurrency_sharded
diff --git a/jstests/change_streams/change_stream_apply_ops.js b/jstests/change_streams/change_stream_apply_ops.js
index 45f4e78fafa..28948ed84b5 100644
--- a/jstests/change_streams/change_stream_apply_ops.js
+++ b/jstests/change_streams/change_stream_apply_ops.js
@@ -4,160 +4,147 @@
(function() {
"use strict";
- load("jstests/libs/change_stream_util.js");
+ load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
- var WatchMode = {
- kCollection: 1,
- kDb: 2,
- kCluster: 3,
- };
-
- function testChangeStreamsWithTransactions(watchMode) {
- let dbToStartTestOn = db;
- if (watchMode == WatchMode.kCluster) {
- dbToStartTestOn = db.getSiblingDB("admin");
- }
-
- const otherCollName = "change_stream_apply_ops_2";
- const coll = assertDropAndRecreateCollection(db, "change_stream_apply_ops");
- assertDropAndRecreateCollection(db, otherCollName);
-
- const otherDbName = "change_stream_apply_ops_db";
- const otherDbCollName = "someColl";
- assertDropAndRecreateCollection(db.getSiblingDB(otherDbName), otherDbCollName);
-
- // Insert a document that gets deleted as part of the transaction.
- const kDeletedDocumentId = 0;
- coll.insert({_id: kDeletedDocumentId, a: "I was here before the transaction"});
-
- let cst = new ChangeStreamTest(dbToStartTestOn);
-
- let changeStream = null;
- if (watchMode == WatchMode.kCluster) {
- changeStream = cst.startWatchingAllChangesForCluster();
- } else {
- const collArg = (watchMode == WatchMode.kCollection ? coll : 1);
- changeStream =
- cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: collArg});
- }
-
- const sessionOptions = {causalConsistency: false};
- const session = db.getMongo().startSession(sessionOptions);
- const sessionDb = session.getDatabase(db.getName());
- const sessionColl = sessionDb[coll.getName()];
-
- session.startTransaction({readConcern: {level: "snapshot"}, writeConcern: {w: "majority"}});
- assert.commandWorked(sessionColl.insert({_id: 1, a: 0}));
- assert.commandWorked(sessionColl.insert({_id: 2, a: 0}));
-
- // One insert on a collection that we're not watching. This should be skipped by the
- // single-collection changestream.
- assert.commandWorked(
- sessionDb[otherCollName].insert({_id: 111, a: "Doc on other collection"}));
-
- // This should be skipped by the single-collection and single-db changestreams.
- assert.commandWorked(session.getDatabase(otherDbName)[otherDbCollName].insert(
- {_id: 222, a: "Doc on other DB"}));
-
- assert.commandWorked(sessionColl.updateOne({_id: 1}, {$inc: {a: 1}}));
-
- assert.commandWorked(sessionColl.deleteOne({_id: kDeletedDocumentId}));
-
- session.commitTransaction();
-
- // Do applyOps on the collection that we care about. This is an "external" applyOps, though
- // (not run as part of a transaction) so its entries should be skipped in the change
- // stream. This checks that applyOps that don't have an 'lsid' and 'txnNumber' field do not
- // get unwound.
- assert.commandWorked(db.runCommand({
- applyOps: [
- {op: "i", ns: coll.getFullName(), o: {_id: 3, a: "SHOULD NOT READ THIS"}},
- ]
- }));
-
- // Check for the first insert.
- let change = cst.getOneChange(changeStream);
- assert.eq(change.fullDocument._id, 1);
- assert.eq(change.operationType, "insert", tojson(change));
- const firstChangeClusterTime = change.clusterTime;
- assert(firstChangeClusterTime instanceof Timestamp, tojson(change));
- const firstChangeTxnNumber = change.txnNumber;
- const firstChangeLsid = change.lsid;
- assert.eq(typeof firstChangeLsid, "object");
- assert.eq(change.ns.coll, coll.getName());
- assert.eq(change.ns.db, db.getName());
-
- // Check for the second insert.
- change = cst.getOneChange(changeStream);
- assert.eq(change.fullDocument._id, 2);
- assert.eq(change.operationType, "insert", tojson(change));
- assert.eq(firstChangeClusterTime, change.clusterTime);
- assert.eq(firstChangeTxnNumber.valueOf(), change.txnNumber);
- assert.eq(0, bsonWoCompare(firstChangeLsid, change.lsid));
- assert.eq(change.ns.coll, coll.getName());
- assert.eq(change.ns.db, db.getName());
-
- if (watchMode >= WatchMode.kDb) {
- // We should see the insert on the other collection.
- change = cst.getOneChange(changeStream);
- assert.eq(change.fullDocument._id, 111);
- assert.eq(change.operationType, "insert", tojson(change));
- assert.eq(firstChangeClusterTime, change.clusterTime);
- assert.eq(firstChangeTxnNumber.valueOf(), change.txnNumber);
- assert.eq(0, bsonWoCompare(firstChangeLsid, change.lsid));
- assert.eq(change.ns.coll, otherCollName);
- assert.eq(change.ns.db, db.getName());
- }
-
- if (watchMode >= WatchMode.kCluster) {
- // We should see the insert on the other db.
- change = cst.getOneChange(changeStream);
- assert.eq(change.fullDocument._id, 222);
- assert.eq(change.operationType, "insert", tojson(change));
- assert.eq(firstChangeClusterTime, change.clusterTime);
- assert.eq(firstChangeTxnNumber.valueOf(), change.txnNumber);
- assert.eq(0, bsonWoCompare(firstChangeLsid, change.lsid));
- assert.eq(change.ns.coll, otherDbCollName);
- assert.eq(change.ns.db, otherDbName);
- }
-
- // Check for the update.
- change = cst.getOneChange(changeStream);
- assert.eq(change.operationType, "update", tojson(change));
- assert.eq(tojson(change.updateDescription.updatedFields), tojson({"a": 1}));
- assert.eq(firstChangeClusterTime, change.clusterTime);
- assert.eq(firstChangeTxnNumber.valueOf(), change.txnNumber);
- assert.eq(0, bsonWoCompare(firstChangeLsid, change.lsid));
- assert.eq(change.ns.coll, coll.getName());
- assert.eq(change.ns.db, db.getName());
-
- // Check for the delete.
- change = cst.getOneChange(changeStream);
- assert.eq(change.documentKey._id, kDeletedDocumentId);
- assert.eq(change.operationType, "delete", tojson(change));
- assert.eq(firstChangeClusterTime, change.clusterTime);
- assert.eq(firstChangeTxnNumber.valueOf(), change.txnNumber);
- assert.eq(0, bsonWoCompare(firstChangeLsid, change.lsid));
- assert.eq(change.ns.coll, coll.getName());
- assert.eq(change.ns.db, db.getName());
-
- // Drop the collection. This will trigger an "invalidate" event.
- assert.commandWorked(db.runCommand({drop: coll.getName()}));
-
- // The drop should have invalidated the change stream.
- cst.assertNextChangesEqual({
- cursor: changeStream,
- expectedChanges: [{operationType: "invalidate"}],
- expectInvalidate: true
- });
-
- cst.cleanUp();
- }
-
- // TODO: SERVER-34302 should allow us to simplify this test, so we're not required to
- // explicitly run both against a single collection and against the whole DB.
- testChangeStreamsWithTransactions(WatchMode.kCollection);
- testChangeStreamsWithTransactions(WatchMode.kDb);
- testChangeStreamsWithTransactions(WatchMode.kCluster);
+ const otherCollName = "change_stream_apply_ops_2";
+ const coll = assertDropAndRecreateCollection(db, "change_stream_apply_ops");
+ assertDropAndRecreateCollection(db, otherCollName);
+
+ const otherDbName = "change_stream_apply_ops_db";
+ const otherDbCollName = "someColl";
+ assertDropAndRecreateCollection(db.getSiblingDB(otherDbName), otherDbCollName);
+
+ // Insert a document that gets deleted as part of the transaction.
+ const kDeletedDocumentId = 0;
+ coll.insert({_id: kDeletedDocumentId, a: "I was here before the transaction"});
+
+ let cst = new ChangeStreamTest(db);
+ let changeStream = cst.startWatchingChanges(
+ {pipeline: [{$changeStream: {}}, {$project: {"lsid.uid": 0}}], collection: coll});
+
+ const sessionOptions = {causalConsistency: false};
+ const session = db.getMongo().startSession(sessionOptions);
+ const sessionDb = session.getDatabase(db.getName());
+ const sessionColl = sessionDb[coll.getName()];
+
+ session.startTransaction({readConcern: {level: "snapshot"}, writeConcern: {w: "majority"}});
+ assert.commandWorked(sessionColl.insert({_id: 1, a: 0}));
+ assert.commandWorked(sessionColl.insert({_id: 2, a: 0}));
+
+ // One insert on a collection that we're not watching. This should be skipped by the
+ // single-collection changestream.
+ assert.commandWorked(sessionDb[otherCollName].insert({_id: 111, a: "Doc on other collection"}));
+
+ // One insert on a collection in a different database. This should be skipped by the single
+ // collection and single-db changestreams.
+ assert.commandWorked(
+ session.getDatabase(otherDbName)[otherDbCollName].insert({_id: 222, a: "Doc on other DB"}));
+
+ assert.commandWorked(sessionColl.updateOne({_id: 1}, {$inc: {a: 1}}));
+
+ assert.commandWorked(sessionColl.deleteOne({_id: kDeletedDocumentId}));
+
+ session.commitTransaction();
+
+ // Do applyOps on the collection that we care about. This is an "external" applyOps, though
+ // (not run as part of a transaction) so its entries should be skipped in the change
+ // stream. This checks that applyOps that don't have an 'lsid' and 'txnNumber' field do not
+ // get unwound.
+ assert.commandWorked(db.runCommand({
+ applyOps: [
+ {op: "i", ns: coll.getFullName(), o: {_id: 3, a: "SHOULD NOT READ THIS"}},
+ ]
+ }));
+
+ // Drop the collection. This will trigger an "invalidate" event at the end of the stream.
+ assert.commandWorked(db.runCommand({drop: coll.getName()}));
+
+ // Define the set of changes expected for the single-collection case per the operations above.
+ const expectedChanges = [
+ {
+ documentKey: {_id: 1},
+ fullDocument: {_id: 1, a: 0},
+ ns: {db: db.getName(), coll: coll.getName()},
+ operationType: "insert",
+ lsid: session.getSessionId(),
+ txnNumber: NumberLong(session._txnNumber),
+ },
+ {
+ documentKey: {_id: 2},
+ fullDocument: {_id: 2, a: 0},
+ ns: {db: db.getName(), coll: coll.getName()},
+ operationType: "insert",
+ lsid: session.getSessionId(),
+ txnNumber: NumberLong(session._txnNumber),
+ },
+ {
+ documentKey: {_id: 1},
+ ns: {db: db.getName(), coll: coll.getName()},
+ operationType: "update",
+ updateDescription: {removedFields: [], updatedFields: {a: 1}},
+ lsid: session.getSessionId(),
+ txnNumber: NumberLong(session._txnNumber),
+ },
+ {
+ documentKey: {_id: kDeletedDocumentId},
+ ns: {db: db.getName(), coll: coll.getName()},
+ operationType: "delete",
+ lsid: session.getSessionId(),
+ txnNumber: NumberLong(session._txnNumber),
+ },
+ {operationType: "invalidate"},
+ ];
+
+ // Verify that the stream returns the expected sequence of changes.
+ const changes = cst.assertNextChangesEqual(
+ {cursor: changeStream, expectedChanges: expectedChanges, expectInvalidate: true});
+
+ // Obtain the clusterTime from the first change.
+ const startTime = changes[0].clusterTime;
+
+ // Add an entry for the insert on db.otherColl into expectedChanges.
+ expectedChanges.splice(2, 0, {
+ documentKey: {_id: 111},
+ fullDocument: {_id: 111, a: "Doc on other collection"},
+ ns: {db: db.getName(), coll: otherCollName},
+ operationType: "insert",
+ lsid: session.getSessionId(),
+ txnNumber: NumberLong(session._txnNumber),
+ });
+
+ // Verify that a whole-db stream returns the expected sequence of changes, including the insert
+ // on the other collection but NOT the changes on the other DB or the manual applyOps.
+ changeStream = cst.startWatchingChanges({
+ pipeline:
+ [{$changeStream: {startAtClusterTime: {ts: startTime}}}, {$project: {"lsid.uid": 0}}],
+ collection: 1
+ });
+ cst.assertNextChangesEqual(
+ {cursor: changeStream, expectedChanges: expectedChanges, expectInvalidate: true});
+
+ // Add an entry for the insert on otherDb.otherDbColl into expectedChanges.
+ expectedChanges.splice(3, 0, {
+ documentKey: {_id: 222},
+ fullDocument: {_id: 222, a: "Doc on other DB"},
+ ns: {db: otherDbName, coll: otherDbCollName},
+ operationType: "insert",
+ lsid: session.getSessionId(),
+ txnNumber: NumberLong(session._txnNumber),
+ });
+
+ // Verify that a whole-cluster stream returns the expected sequence of changes, including the
+ // inserts on the other collection and the other database, but NOT the manual applyOps.
+ cst = new ChangeStreamTest(db.getSiblingDB("admin"));
+ changeStream = cst.startWatchingChanges({
+ pipeline: [
+ {$changeStream: {startAtClusterTime: {ts: startTime}, allChangesForCluster: true}},
+ {$project: {"lsid.uid": 0}}
+ ],
+ collection: 1
+ });
+ cst.assertNextChangesEqual(
+ {cursor: changeStream, expectedChanges: expectedChanges, expectInvalidate: true});
+
+ cst.cleanUp();
}());
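The rewritten test above captures the clusterTime of the first change and replays the full sequence through whole-db and whole-cluster streams started at that time. The same start-at-time option is exposed through the shell helper, which the change_stream_shell_helper.js diff below exercises:

    // Start a stream at a known cluster time; 'coll' and 'startTime' are as
    // defined in the test above.
    const resumedStream = coll.watch([], {startAtClusterTime: {ts: startTime}});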
diff --git a/jstests/change_streams/change_stream_apply_ops_resumability.js b/jstests/change_streams/change_stream_apply_ops_resumability.js
index 8a9c8d55a62..8d7ee90b8e6 100644
--- a/jstests/change_streams/change_stream_apply_ops_resumability.js
+++ b/jstests/change_streams/change_stream_apply_ops_resumability.js
@@ -4,195 +4,169 @@
(function() {
"use strict";
- load("jstests/libs/change_stream_util.js");
+ load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
- var WatchMode = {
- kCollection: 1,
- kDb: 2,
- kCluster: 3,
- };
-
- function getChangeStream({cst, watchMode, coll, resumeToken}) {
- const changeStreamDoc = {};
- if (resumeToken) {
- changeStreamDoc.resumeAfter = resumeToken;
- }
-
- if (watchMode == WatchMode.kCluster) {
- changeStreamDoc.allChangesForCluster = true;
- }
- const collArg = (watchMode == WatchMode.kCollection ? coll : 1);
-
- return cst.startWatchingChanges({
- pipeline: [{$changeStream: changeStreamDoc}],
- collection: collArg,
- // Use a batch size of 0 to prevent any notifications from being returned in the first
- // batch. These would be ignored by ChangeStreamTest.getOneChange().
- aggregateOptions: {cursor: {batchSize: 0}},
- });
- }
-
- function testChangeStreamsWithTransactions(watchMode) {
- let dbToStartTestOn = db;
- if (watchMode == WatchMode.kCluster) {
- dbToStartTestOn = db.getSiblingDB("admin");
- }
-
- const coll = assertDropAndRecreateCollection(db, "change_stream_apply_ops");
- const otherCollName = "change_stream_apply_ops_2";
- assertDropAndRecreateCollection(db, otherCollName);
-
- const otherDbName = "change_stream_apply_ops_db";
- const otherDbCollName = "someColl";
- assertDropAndRecreateCollection(db.getSiblingDB(otherDbName), otherDbCollName);
-
- const cst = new ChangeStreamTest(dbToStartTestOn);
-
- let changeStream = getChangeStream({cst: cst, watchMode: watchMode, coll: coll});
-
- // Do an insert outside of a transaction.
- assert.commandWorked(coll.insert({_id: 0, a: 123}));
- const nonTxnChange = cst.getOneChange(changeStream);
- assert.eq(nonTxnChange.operationType, "insert");
- assert.eq(nonTxnChange.documentKey, {_id: 0});
-
- const sessionOptions = {causalConsistency: false};
- const session = db.getMongo().startSession(sessionOptions);
- const sessionDb = session.getDatabase(db.getName());
- const sessionColl = sessionDb[coll.getName()];
-
- session.startTransaction({readConcern: {level: "snapshot"}, writeConcern: {w: "majority"}});
- assert.commandWorked(sessionColl.insert({_id: 1, a: 0}));
- assert.commandWorked(sessionColl.insert({_id: 2, a: 0}));
-
- // One insert on a collection that we're not watching. This should be skipped by the
- // single-collection change stream.
- assert.commandWorked(
- sessionDb[otherCollName].insert({_id: 111, a: "Doc on other collection"}));
-
- // This should be skipped by the single-collection and single-db changestreams.
- assert.commandWorked(session.getDatabase(otherDbName)[otherDbCollName].insert(
- {_id: 222, a: "Doc on other DB"}));
-
- assert.commandWorked(sessionColl.updateOne({_id: 1}, {$inc: {a: 1}}));
-
- session.commitTransaction();
-
- // Now insert another document, not part of a transaction.
- assert.commandWorked(coll.insert({_id: 3, a: 123}));
-
- // Check for the first insert.
- const firstTxnChange = cst.getOneChange(changeStream);
- assert.eq(firstTxnChange.fullDocument._id, 1);
- assert.eq(firstTxnChange.operationType, "insert", tojson(firstTxnChange));
-
- // Check for the second insert.
- const secondTxnChange = cst.getOneChange(changeStream);
- assert.eq(secondTxnChange.fullDocument._id, 2);
- assert.eq(secondTxnChange.operationType, "insert", tojson(secondTxnChange));
-
- // Resume after the first non-transaction change. Be sure we see the documents from the
- // transaction again.
- changeStream = getChangeStream(
- {cst: cst, watchMode: watchMode, coll: coll, resumeToken: nonTxnChange._id});
- assert.docEq(cst.getOneChange(changeStream), firstTxnChange);
- assert.docEq(cst.getOneChange(changeStream), secondTxnChange);
-
- // Resume after the first transaction change. Be sure we see the second change again.
- changeStream = getChangeStream(
- {cst: cst, watchMode: watchMode, coll: coll, resumeToken: firstTxnChange._id});
- assert.docEq(cst.getOneChange(changeStream), secondTxnChange);
-
- let change = secondTxnChange;
- if (watchMode >= WatchMode.kDb) {
- // We should see the insert on the other collection.
- change = cst.getOneChange(changeStream);
- assert.eq(change.fullDocument._id, 111);
- assert.eq(change.operationType, "insert", tojson(change));
-
- // Resume from the beginning again, be sure we see everything up until now.
- changeStream = getChangeStream(
- {cst: cst, watchMode: watchMode, coll: coll, resumeToken: nonTxnChange._id});
- assert.docEq(cst.getOneChange(changeStream), firstTxnChange);
- assert.docEq(cst.getOneChange(changeStream), secondTxnChange);
- assert.docEq(cst.getOneChange(changeStream), change);
- }
-
- if (watchMode >= WatchMode.kCluster) {
- // We should see the insert on the other db.
- change = cst.getOneChange(changeStream);
- assert.eq(change.fullDocument._id, 222);
- assert.eq(change.operationType, "insert", tojson(change));
-
- // Resume from the beginning again, be sure we see everything up until now.
- changeStream = getChangeStream(
- {cst: cst, watchMode: watchMode, coll: coll, resumeToken: nonTxnChange._id});
- assert.docEq(cst.getOneChange(changeStream), firstTxnChange);
- assert.docEq(cst.getOneChange(changeStream), secondTxnChange);
- // We should see the document which was inserted on the other _collection_.
- const changeFromOtherCollection = cst.getOneChange(changeStream);
- assert.eq(changeFromOtherCollection.fullDocument._id, 111);
-
- // Resume from the document in the other collection.
- changeStream = getChangeStream({
- cst: cst,
- watchMode: watchMode,
- coll: coll,
- resumeToken: changeFromOtherCollection._id
- });
-
- // We should again see the most recent document.
- assert.docEq(cst.getOneChange(changeStream), change);
- }
-
- // Try starting another change stream from the latest change, the _last_ change caused by
- // the transaction.
- let otherCursor =
- getChangeStream({cst: cst, watchMode: watchMode, coll: coll, resumeToken: change._id});
-
- // Check for the update.
- change = cst.getOneChange(changeStream);
- assert.eq(change.operationType, "update", tojson(change));
- assert.eq(tojson(change.updateDescription.updatedFields), tojson({"a": 1}));
-
- // Check for the update on the other stream.
- assert.docEq(change, cst.getOneChange(otherCursor));
-
- // Now test that we can resume from the _last_ change caused by a transaction. We will
- // check that both the initial change stream and the new one find the document that's
- // inserted outside of the transaction.
- otherCursor =
- getChangeStream({cst: cst, watchMode: watchMode, coll: coll, resumeToken: change._id});
-
- // Now check that the document inserted after the transaction is found.
- change = cst.getOneChange(changeStream);
- assert.eq(change.fullDocument._id, 3);
- assert.eq(change.operationType, "insert", tojson(change));
- assert.docEq(change, cst.getOneChange(otherCursor));
-
- // Drop the collection. This will trigger an "invalidate" event.
- assert.commandWorked(db.runCommand({drop: coll.getName()}));
-
- // The drop should have invalidated the change stream.
- cst.assertNextChangesEqual({
- cursor: changeStream,
- expectedChanges: [{operationType: "invalidate"}],
- expectInvalidate: true
- });
-
- cst.assertNextChangesEqual({
- cursor: otherCursor,
- expectedChanges: [{operationType: "invalidate"}],
- expectInvalidate: true
- });
-
- cst.cleanUp();
- }
-
- // TODO: SERVER-34302 should allow us to simplify this test, so we're not required to
- // explicitly run both against a single collection and against the whole DB.
- testChangeStreamsWithTransactions(WatchMode.kCollection);
- testChangeStreamsWithTransactions(WatchMode.kDb);
- testChangeStreamsWithTransactions(WatchMode.kCluster);
+ const coll = assertDropAndRecreateCollection(db, "change_stream_apply_ops");
+ const otherCollName = "change_stream_apply_ops_2";
+ assertDropAndRecreateCollection(db, otherCollName);
+
+ const otherDbName = "change_stream_apply_ops_db";
+ const otherDbCollName = "someColl";
+ assertDropAndRecreateCollection(db.getSiblingDB(otherDbName), otherDbCollName);
+
+ let cst = new ChangeStreamTest(db);
+ let changeStream = cst.startWatchingChanges(
+ {pipeline: [{$changeStream: {}}, {$project: {"lsid.uid": 0}}], collection: coll});
+
+ // Do an insert outside of a transaction.
+ assert.commandWorked(coll.insert({_id: 0, a: 123}));
+
+ // Open a session, and perform two writes within a transaction.
+ const sessionOptions = {causalConsistency: false};
+ const session = db.getMongo().startSession(sessionOptions);
+ const sessionDb = session.getDatabase(db.getName());
+ const sessionColl = sessionDb[coll.getName()];
+
+ session.startTransaction({readConcern: {level: "snapshot"}, writeConcern: {w: "majority"}});
+ assert.commandWorked(sessionColl.insert({_id: 1, a: 0}));
+ assert.commandWorked(sessionColl.insert({_id: 2, a: 0}));
+
+ // One insert on a collection that we're not watching. This should be skipped by the
+ // single-collection change stream.
+ assert.commandWorked(sessionDb[otherCollName].insert({_id: 111, a: "Doc on other collection"}));
+
+ // One insert on a collection in a different database. This should be skipped by the single
+ // collection and single-db changestreams.
+ assert.commandWorked(
+ session.getDatabase(otherDbName)[otherDbCollName].insert({_id: 222, a: "Doc on other DB"}));
+
+ assert.commandWorked(sessionColl.updateOne({_id: 1}, {$inc: {a: 1}}));
+
+ session.commitTransaction();
+
+ // Now insert another document, not part of a transaction.
+ assert.commandWorked(coll.insert({_id: 3, a: 123}));
+
+ // Define the set of changes expected for the single-collection case per the operations above.
+ const expectedChanges = [
+ {
+ documentKey: {_id: 0},
+ fullDocument: {_id: 0, a: 123},
+ ns: {db: db.getName(), coll: coll.getName()},
+ operationType: "insert",
+ },
+ {
+ documentKey: {_id: 1},
+ fullDocument: {_id: 1, a: 0},
+ ns: {db: db.getName(), coll: coll.getName()},
+ operationType: "insert",
+ lsid: session.getSessionId(),
+ txnNumber: NumberLong(session._txnNumber),
+ },
+ {
+ documentKey: {_id: 2},
+ fullDocument: {_id: 2, a: 0},
+ ns: {db: db.getName(), coll: coll.getName()},
+ operationType: "insert",
+ lsid: session.getSessionId(),
+ txnNumber: NumberLong(session._txnNumber),
+ },
+ {
+ documentKey: {_id: 1},
+ ns: {db: db.getName(), coll: coll.getName()},
+ operationType: "update",
+ updateDescription: {removedFields: [], updatedFields: {a: 1}},
+ lsid: session.getSessionId(),
+ txnNumber: NumberLong(session._txnNumber),
+ },
+ {
+ documentKey: {_id: 3},
+ fullDocument: {_id: 3, a: 123},
+ ns: {db: db.getName(), coll: coll.getName()},
+ operationType: "insert",
+ },
+ ];
+
+ // Verify that the stream returns the expected sequence of changes.
+ const changes =
+ cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges});
+
+ // Record the first (non-transaction) change and the first in-transaction change.
+ const nonTxnChange = changes[0], firstTxnChange = changes[1], secondTxnChange = changes[2];
+
+ // Resume after the first non-transaction change. Be sure we see the documents from the
+ // transaction again.
+ changeStream = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {resumeAfter: nonTxnChange._id}}, {$project: {"lsid.uid": 0}}],
+ collection: coll
+ });
+ cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges.slice(1)});
+
+ // Resume after the first transaction change. Be sure we see the second change again.
+ changeStream = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {resumeAfter: firstTxnChange._id}}, {$project: {"lsid.uid": 0}}],
+ collection: coll
+ });
+ cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges.slice(2)});
+
+ // Try starting another change stream from the _last_ change caused by the transaction. Verify
+ // that we can see the insert performed after the transaction was committed.
+ let otherCursor = cst.startWatchingChanges({
+ pipeline:
+ [{$changeStream: {resumeAfter: secondTxnChange._id}}, {$project: {"lsid.uid": 0}}],
+ collection: coll
+ });
+ cst.assertNextChangesEqual({cursor: otherCursor, expectedChanges: expectedChanges.slice(3)});
+
+ // Drop the collection. This will trigger an "invalidate" event at the end of the stream.
+ assert.commandWorked(db.runCommand({drop: coll.getName()}));
+ expectedChanges.push({operationType: "invalidate"});
+
+ // Add an entry for the insert on db.otherColl into expectedChanges.
+ expectedChanges.splice(3, 0, {
+ documentKey: {_id: 111},
+ fullDocument: {_id: 111, a: "Doc on other collection"},
+ ns: {db: db.getName(), coll: otherCollName},
+ operationType: "insert",
+ lsid: session.getSessionId(),
+ txnNumber: NumberLong(session._txnNumber),
+ });
+
+ // Verify that a whole-db stream can be resumed from the middle of the transaction, and that it
+ // will see all subsequent changes including the insert on the other collection but NOT the
+ // changes on the other DB.
+ changeStream = cst.startWatchingChanges({
+ pipeline:
+ [{$changeStream: {resumeAfter: secondTxnChange._id}}, {$project: {"lsid.uid": 0}}],
+ collection: 1
+ });
+ cst.assertNextChangesEqual(
+ {cursor: changeStream, expectedChanges: expectedChanges.slice(3), expectInvalidate: true});
+
+ // Add an entry for the insert on otherDb.otherDbColl into expectedChanges.
+ expectedChanges.splice(4, 0, {
+ documentKey: {_id: 222},
+ fullDocument: {_id: 222, a: "Doc on other DB"},
+ ns: {db: otherDbName, coll: otherDbCollName},
+ operationType: "insert",
+ lsid: session.getSessionId(),
+ txnNumber: NumberLong(session._txnNumber),
+ });
+
+ // Verify that a whole-cluster stream can be resumed from the middle of the transaction, and that
+ // it will see all subsequent changes including the insert on the other collection and the changes
+ // on the other DB.
+ cst = new ChangeStreamTest(db.getSiblingDB("admin"));
+ changeStream = cst.startWatchingChanges({
+ pipeline: [
+ {$changeStream: {resumeAfter: secondTxnChange._id, allChangesForCluster: true}},
+ {$project: {"lsid.uid": 0}}
+ ],
+ collection: 1
+ });
+ cst.assertNextChangesEqual(
+ {cursor: changeStream, expectedChanges: expectedChanges.slice(3), expectInvalidate: true});
+
+ cst.cleanUp();
}());
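The resumability test above drives resumeAfter through raw aggregate pipelines; the equivalent shell-helper form, also covered by the next file's diff, is:

    // Resume a stream after a previously observed event; 'resumeToken' is the
    // _id of a change document returned by an earlier cursor.
    const resumed = coll.watch([], {resumeAfter: resumeToken});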
diff --git a/jstests/change_streams/change_stream_shell_helper.js b/jstests/change_streams/change_stream_shell_helper.js
index f8ddcce5545..0150e1ba39d 100644
--- a/jstests/change_streams/change_stream_shell_helper.js
+++ b/jstests/change_streams/change_stream_shell_helper.js
@@ -1,8 +1,11 @@
-// Test change streams related shell helpers and options passed to them.
+// Test change streams related shell helpers and options passed to them. Note that, while we only
+// call the DBCollection.watch helper in this file, it will be redirected to the DB.watch or
+// Mongo.watch equivalents in the whole_db and whole_cluster passthroughs.
(function() {
"use strict";
load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+ load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
const coll = assertDropAndRecreateCollection(db, "change_stream_shell_helper");
@@ -40,96 +43,69 @@
}
jsTestLog("Testing watch() without options");
- let singleCollCursor = coll.watch();
- let wholeDbCursor = db.watch();
- let wholeClusterCursor = db.getMongo().watch();
+ let changeStreamCursor = coll.watch();
- [singleCollCursor, wholeDbCursor, wholeClusterCursor].forEach((cursor) =>
- assert(!cursor.hasNext()));
+ assert(!changeStreamCursor.hasNext());
// Write the first document into the collection. We will save the resume token from this change.
assert.writeOK(coll.insert({_id: 0, x: 1}));
let resumeToken;
// Test that each of the change stream cursors picks up the change.
- for (let cursor of[singleCollCursor, wholeDbCursor, wholeClusterCursor]) {
- print(`Running test on namespace '${cursor._ns}'`);
- assert.soon(() => cursor.hasNext());
- let change = cursor.next();
- assert(!cursor.hasNext());
- let expected = {
- documentKey: {_id: 0},
- fullDocument: {_id: 0, x: 1},
- ns: {db: "test", coll: coll.getName()},
- operationType: "insert",
- };
- assert("_id" in change, "Got unexpected change: " + tojson(change));
- // Remember the _id of the first op to resume the stream.
- resumeToken = change._id;
- // Remove the fields we cannot predict, then test that the change is as expected.
- delete change._id;
- delete change.clusterTime;
- assert.docEq(change, expected);
- }
+ assert.soon(() => changeStreamCursor.hasNext());
+ let change = changeStreamCursor.next();
+ assert(!changeStreamCursor.hasNext());
+ let expected = {
+ documentKey: {_id: 0},
+ fullDocument: {_id: 0, x: 1},
+ ns: {db: "test", coll: coll.getName()},
+ operationType: "insert",
+ };
+ assert("_id" in change, "Got unexpected change: " + tojson(change));
+ // Remember the _id of the first op to resume the stream.
+ resumeToken = change._id;
+ // Remove the fields we cannot predict, then test that the change is as expected.
+ delete change._id;
+ delete change.clusterTime;
+ assert.docEq(change, expected);
jsTestLog("Testing watch() with pipeline");
- singleCollCursor = coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}]);
- wholeDbCursor = db.watch([{$project: {_id: 0, docId: "$documentKey._id"}}]);
- wholeClusterCursor = db.getMongo().watch([{$project: {_id: 0, docId: "$documentKey._id"}}]);
+ changeStreamCursor = coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}]);
// Store the cluster time of the insert as the timestamp to start from.
const resumeTime =
assert.commandWorked(db.runCommand({insert: coll.getName(), documents: [{_id: 1, x: 1}]}))
.$clusterTime.clusterTime;
- checkNextChange(singleCollCursor, {docId: 1});
- checkNextChange(wholeDbCursor, {docId: 1});
- checkNextChange(wholeClusterCursor, {docId: 1});
+ checkNextChange(changeStreamCursor, {docId: 1});
jsTestLog("Testing watch() with pipeline and resumeAfter");
- singleCollCursor =
+ changeStreamCursor =
coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}], {resumeAfter: resumeToken});
- wholeDbCursor =
- db.watch([{$project: {_id: 0, docId: "$documentKey._id"}}], {resumeAfter: resumeToken});
- wholeClusterCursor = db.getMongo().watch([{$project: {_id: 0, docId: "$documentKey._id"}}],
- {resumeAfter: resumeToken});
- checkNextChange(singleCollCursor, {docId: 1});
- checkNextChange(wholeDbCursor, {docId: 1});
- checkNextChange(wholeClusterCursor, {docId: 1});
+ checkNextChange(changeStreamCursor, {docId: 1});
jsTestLog("Testing watch() with pipeline and startAtClusterTime");
- singleCollCursor = coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}],
- {startAtClusterTime: {ts: resumeTime}});
- wholeDbCursor = db.watch([{$project: {_id: 0, docId: "$documentKey._id"}}],
- {startAtClusterTime: {ts: resumeTime}});
- wholeClusterCursor = db.getMongo().watch([{$project: {_id: 0, docId: "$documentKey._id"}}],
- {startAtClusterTime: {ts: resumeTime}});
- checkNextChange(singleCollCursor, {docId: 1});
- checkNextChange(wholeDbCursor, {docId: 1});
- checkNextChange(wholeClusterCursor, {docId: 1});
+ changeStreamCursor = coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}],
+ {startAtClusterTime: {ts: resumeTime}});
+ checkNextChange(changeStreamCursor, {docId: 1});
jsTestLog("Testing watch() with updateLookup");
- singleCollCursor = coll.watch([{$project: {_id: 0}}], {fullDocument: "updateLookup"});
- wholeDbCursor = db.watch([{$project: {_id: 0}}], {fullDocument: "updateLookup"});
- wholeClusterCursor =
- db.getMongo().watch([{$project: {_id: 0}}], {fullDocument: "updateLookup"});
+ changeStreamCursor = coll.watch([{$project: {_id: 0}}], {fullDocument: "updateLookup"});
assert.writeOK(coll.update({_id: 0}, {$set: {x: 10}}));
- let expected = {
+ expected = {
documentKey: {_id: 0},
fullDocument: {_id: 0, x: 10},
ns: {db: "test", coll: coll.getName()},
operationType: "update",
updateDescription: {removedFields: [], updatedFields: {x: 10}},
};
- checkNextChange(singleCollCursor, expected);
- checkNextChange(wholeDbCursor, expected);
- checkNextChange(wholeClusterCursor, expected);
+ checkNextChange(changeStreamCursor, expected);
jsTestLog("Testing watch() with batchSize");
// Only test mongod because mongos uses batch size 0 for aggregate commands internally to
// establish cursors quickly. GetMore on mongos doesn't respect batch size due to SERVER-31992.
- const isMongos = db.runCommand({isdbgrid: 1}).isdbgrid;
+ const isMongos = FixtureHelpers.isMongos(db);
if (!isMongos) {
// Increase a field by 5 times and verify the batch size is respected.
for (let i = 0; i < 5; i++) {
@@ -137,61 +113,34 @@
}
// Only watch the "update" changes of the specific doc since the beginning.
- singleCollCursor = coll.watch(
- [{$match: {documentKey: {_id: 1}, operationType: "update"}}, {$project: {_id: 0}}],
- {resumeAfter: resumeToken, batchSize: 2});
- wholeDbCursor = db.watch(
- [{$match: {documentKey: {_id: 1}, operationType: "update"}}, {$project: {_id: 0}}],
- {resumeAfter: resumeToken, batchSize: 2});
- wholeClusterCursor = db.getMongo().watch(
+ changeStreamCursor = coll.watch(
[{$match: {documentKey: {_id: 1}, operationType: "update"}}, {$project: {_id: 0}}],
{resumeAfter: resumeToken, batchSize: 2});
- for (let cursor of[singleCollCursor, wholeDbCursor, wholeClusterCursor]) {
- print(`Running test on namespace '${cursor._ns}'`);
- // Check the first batch.
- assert.eq(cursor.objsLeftInBatch(), 2);
- // Consume the first batch.
- assert(cursor.hasNext());
- cursor.next();
- assert(cursor.hasNext());
- cursor.next();
- // Confirm that the batch is empty.
- assert.eq(cursor.objsLeftInBatch(), 0);
-
- // Check the batch returned by getMore.
- assert(cursor.hasNext());
- assert.eq(cursor.objsLeftInBatch(), 2);
- cursor.next();
- assert(cursor.hasNext());
- cursor.next();
- assert.eq(cursor.objsLeftInBatch(), 0);
- // There are more changes coming, just not in the batch.
- assert(cursor.hasNext());
- }
+ // Check the first batch.
+ assert.eq(changeStreamCursor.objsLeftInBatch(), 2);
+ // Consume the first batch.
+ assert(changeStreamCursor.hasNext());
+ changeStreamCursor.next();
+ assert(changeStreamCursor.hasNext());
+ changeStreamCursor.next();
+ // Confirm that the batch is empty.
+ assert.eq(changeStreamCursor.objsLeftInBatch(), 0);
+
+ // Check the batch returned by getMore.
+ assert(changeStreamCursor.hasNext());
+ assert.eq(changeStreamCursor.objsLeftInBatch(), 2);
+ changeStreamCursor.next();
+ assert(changeStreamCursor.hasNext());
+ changeStreamCursor.next();
+ assert.eq(changeStreamCursor.objsLeftInBatch(), 0);
+ // There are more changes coming, just not in the batch.
+ assert(changeStreamCursor.hasNext());
}
jsTestLog("Testing watch() with maxAwaitTimeMS");
- singleCollCursor = coll.watch([], {maxAwaitTimeMS: 500});
- testCommandIsCalled(() => assert(!singleCollCursor.hasNext()), (cmdObj) => {
- assert.eq("getMore",
- Object.keys(cmdObj)[0],
- "expected getMore command, but was: " + tojson(cmdObj));
- assert(cmdObj.hasOwnProperty("maxTimeMS"), "unexpected getMore command: " + tojson(cmdObj));
- assert.eq(500, cmdObj.maxTimeMS, "unexpected getMore command: " + tojson(cmdObj));
- });
-
- wholeDbCursor = db.watch([], {maxAwaitTimeMS: 500});
- testCommandIsCalled(() => assert(!wholeDbCursor.hasNext()), (cmdObj) => {
- assert.eq("getMore",
- Object.keys(cmdObj)[0],
- "expected getMore command, but was: " + tojson(cmdObj));
- assert(cmdObj.hasOwnProperty("maxTimeMS"), "unexpected getMore command: " + tojson(cmdObj));
- assert.eq(500, cmdObj.maxTimeMS, "unexpected getMore command: " + tojson(cmdObj));
- });
-
- wholeClusterCursor = db.getMongo().watch([], {maxAwaitTimeMS: 500});
- testCommandIsCalled(() => assert(!wholeClusterCursor.hasNext()), (cmdObj) => {
+ changeStreamCursor = coll.watch([], {maxAwaitTimeMS: 500});
+ testCommandIsCalled(() => assert(!changeStreamCursor.hasNext()), (cmdObj) => {
assert.eq("getMore",
Object.keys(cmdObj)[0],
"expected getMore command, but was: " + tojson(cmdObj));
@@ -200,9 +149,7 @@
});
jsTestLog("Testing the cursor gets closed when the collection gets dropped");
- singleCollCursor = coll.watch([{$project: {_id: 0, clusterTime: 0}}]);
- wholeDbCursor = db.watch([{$project: {_id: 0, clusterTime: 0}}]);
- wholeClusterCursor = db.getMongo().watch([{$project: {_id: 0, clusterTime: 0}}]);
+ changeStreamCursor = coll.watch([{$project: {_id: 0, clusterTime: 0}}]);
assert.writeOK(coll.insert({_id: 2, x: 1}));
expected = {
documentKey: {_id: 2},
@@ -210,32 +157,19 @@
ns: {db: "test", coll: coll.getName()},
operationType: "insert",
};
- checkNextChange(singleCollCursor, expected);
- assert(!singleCollCursor.hasNext());
- assert(!singleCollCursor.isClosed());
- assert(!singleCollCursor.isExhausted());
-
- checkNextChange(wholeDbCursor, expected);
- assert(!wholeDbCursor.hasNext());
- assert(!wholeDbCursor.isClosed());
- assert(!wholeDbCursor.isExhausted());
-
- checkNextChange(wholeClusterCursor, expected);
- assert(!wholeClusterCursor.hasNext());
- assert(!wholeClusterCursor.isClosed());
- assert(!wholeClusterCursor.isExhausted());
+ checkNextChange(changeStreamCursor, expected);
+ assert(!changeStreamCursor.hasNext());
+ assert(!changeStreamCursor.isClosed());
+ assert(!changeStreamCursor.isExhausted());
// Dropping the collection should invalidate any open change streams.
assertDropCollection(db, coll.getName());
- for (let cursor of[singleCollCursor, wholeDbCursor, wholeClusterCursor]) {
- print(`Running test on namespace '${cursor._ns}'`);
- assert.soon(() => cursor.hasNext());
- assert(cursor.isClosed());
- assert(!cursor.isExhausted());
- expected = {operationType: "invalidate"};
- checkNextChange(cursor, expected);
- assert(!cursor.hasNext());
- assert(cursor.isClosed());
- assert(cursor.isExhausted());
- }
+ assert.soon(() => changeStreamCursor.hasNext());
+ assert(changeStreamCursor.isClosed());
+ assert(!changeStreamCursor.isExhausted());
+ expected = {operationType: "invalidate"};
+ checkNextChange(changeStreamCursor, expected);
+ assert(!changeStreamCursor.hasNext());
+ assert(changeStreamCursor.isClosed());
+ assert(changeStreamCursor.isExhausted());
}());
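The drop-invalidation sequence verified above distinguishes three cursor states: hasNext() becomes true once the invalidate event is available, isClosed() is true as soon as the server kills the cursor, but isExhausted() only becomes true after the final buffered event is consumed. A condensed sketch of the lifecycle, assuming a collection 'coll' on database 'db':

    const cursor = coll.watch();
    assertDropCollection(db, coll.getName());
    assert.soon(() => cursor.hasNext());   // The 'invalidate' event becomes available.
    assert(cursor.isClosed());             // The server-side cursor is gone...
    assert(!cursor.isExhausted());         // ...but one buffered event remains.
    assert.eq(cursor.next().operationType, "invalidate");
    assert(cursor.isExhausted());          // Now fully drained.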
diff --git a/jstests/change_streams/change_stream_whole_db_invalidations.js b/jstests/change_streams/change_stream_whole_db_invalidations.js
index 455bff853fe..463359ec839 100644
--- a/jstests/change_streams/change_stream_whole_db_invalidations.js
+++ b/jstests/change_streams/change_stream_whole_db_invalidations.js
@@ -66,7 +66,11 @@
// Create collection on the database being watched.
coll = assertDropAndRecreateCollection(testDB, "change_stream_whole_db_invalidations");
- aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
+ // Create the $changeStream. We set 'doNotModifyInPassthroughs' so that this test will not be
+ // upconverted to a cluster-wide stream, which *would* be invalidated by dropping the collection
+ // in the other database.
+ aggCursor = cst.startWatchingChanges(
+ {pipeline: [{$changeStream: {}}], collection: 1, doNotModifyInPassthroughs: true});
// Drop the collection on the other database; this should *not* invalidate the change stream.
assertDropCollection(otherDB, otherDBColl.getName());
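The 'doNotModifyInPassthroughs' flag works because ChangeStreamTest adopts changeStreamPassthroughAwareRunCommand (defined in implicit_whole_db_changestreams.js, below) as its runCommand implementation; the flag is threaded through to the overridden DB.runCommand, which then selects the original, unmodified implementation. A sketch of the same opt-out for a raw command:

    // The trailing argument is the 'noPassthrough' flag consumed by the override.
    const res = changeStreamPassthroughAwareRunCommand(
        testDB,
        {aggregate: coll.getName(), pipeline: [{$changeStream: {}}], cursor: {}},
        true /* noPassthrough: run the single-collection stream as written */);
    assert.commandWorked(res);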
diff --git a/jstests/change_streams/include_cluster_time.js b/jstests/change_streams/include_cluster_time.js
index 387c368ecc1..6d0c33785e8 100644
--- a/jstests/change_streams/include_cluster_time.js
+++ b/jstests/change_streams/include_cluster_time.js
@@ -2,16 +2,12 @@
(function() {
"use strict";
- load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+ load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection.
// Drop and recreate the collections to be used in this set of tests.
const coll = assertDropAndRecreateCollection(db, "include_cluster_time");
- const collectionStream = coll.watch();
- const dbStream =
- db.watch([{$match: {$or: [{operationType: "invalidate"}, {"ns.coll": coll.getName()}]}}]);
- const clusterStream = db.getMongo().watch(
- [{$match: {$or: [{operationType: "invalidate"}, {"ns.coll": coll.getName()}]}}]);
+ const changeStream = coll.watch();
const insertClusterTime =
assert.commandWorked(coll.runCommand("insert", {documents: [{_id: 0}]}))
@@ -30,34 +26,30 @@
const dropClusterTime =
assert.commandWorked(db.runCommand({drop: coll.getName()})).$clusterTime.clusterTime;
- for (let changeStream of[collectionStream, dbStream, clusterStream]) {
- jsTestLog(`Testing stream on ns ${changeStream._ns}`);
-
- // Make sure each operation has a reasonable cluster time. Note that we should not assert
- // that the cluster times are equal, because the cluster time returned from the command is
- // generated by a second, independent read of the logical clock than the one used to
- // generate the oplog entry. It's possible that the system did something to advance the time
- // between the two reads of the clock.
- assert.soon(() => changeStream.hasNext());
- let next = changeStream.next();
- assert.eq(next.operationType, "insert");
- assert.lte(next.clusterTime, insertClusterTime);
-
- assert.soon(() => changeStream.hasNext());
- next = changeStream.next();
- assert.eq(next.operationType, "update");
- assert.lte(next.clusterTime, updateClusterTime);
-
- assert.soon(() => changeStream.hasNext());
- next = changeStream.next();
- assert.eq(next.operationType, "delete");
- assert.lte(next.clusterTime, deleteClusterTime);
-
- assert.soon(() => changeStream.hasNext());
- next = changeStream.next();
- assert.eq(next.operationType, "invalidate");
- assert.lte(next.clusterTime, dropClusterTime);
-
- changeStream.close();
- }
+ // Make sure each operation has a reasonable cluster time. Note that we should not assert
+ // that the cluster times are equal, because the cluster time returned from the command is
+ // generated by a second, independent read of the logical clock, separate from the one used
+ // to generate the oplog entry. It's possible that the system advanced the time between the
+ // two reads of the clock.
+ assert.soon(() => changeStream.hasNext());
+ let next = changeStream.next();
+ assert.eq(next.operationType, "insert");
+ assert.lte(next.clusterTime, insertClusterTime);
+
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
+ assert.eq(next.operationType, "update");
+ assert.lte(next.clusterTime, updateClusterTime);
+
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
+ assert.eq(next.operationType, "delete");
+ assert.lte(next.clusterTime, deleteClusterTime);
+
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
+ assert.eq(next.operationType, "invalidate");
+ assert.lte(next.clusterTime, dropClusterTime);
+
+ changeStream.close();
}());
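The pattern above generalizes to any write: the $clusterTime in the command response upper-bounds the clusterTime of the event that the write generates, since the response's time comes from a second read of the logical clock. A condensed sketch, reusing the names above as hypothetical:

    const cmdClusterTime =
        assert.commandWorked(coll.runCommand("insert", {documents: [{_id: 99}]}))
            .$clusterTime.clusterTime;
    assert.soon(() => changeStream.hasNext());
    // Equality is deliberately not asserted; only the ordering is guaranteed.
    assert.lte(changeStream.next().clusterTime, cmdClusterTime);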
diff --git a/jstests/change_streams/lookup_post_image.js b/jstests/change_streams/lookup_post_image.js
index 9b41cbbf7f0..61e51f989d2 100644
--- a/jstests/change_streams/lookup_post_image.js
+++ b/jstests/change_streams/lookup_post_image.js
@@ -11,229 +11,208 @@
load("jstests/replsets/libs/two_phase_drops.js"); // For 'TwoPhaseDropCollectionTest'.
const coll = assertDropAndRecreateCollection(db, "change_post_image");
-
- function testUpdateLookup(coll, collToWatch, changeStreamDB = db, changeStreamSpec = {}) {
- coll.drop();
-
- const cst = new ChangeStreamTest(changeStreamDB);
-
- jsTestLog("Testing change streams without 'fullDocument' specified");
- // Test that not specifying 'fullDocument' does include a 'fullDocument' in the result for
- // an insert.
- let cursor = cst.startWatchingChanges(
- {pipeline: [{$changeStream: changeStreamSpec}], collection: collToWatch});
- assert.writeOK(coll.insert({_id: "fullDocument not specified"}));
- let latestChange = cst.getOneChange(cursor);
- assert.eq(latestChange.operationType, "insert");
- assert.eq(latestChange.fullDocument, {_id: "fullDocument not specified"});
-
- // Test that not specifying 'fullDocument' does include a 'fullDocument' in the result for a
- // replacement-style update.
- assert.writeOK(coll.update({_id: "fullDocument not specified"},
- {_id: "fullDocument not specified", replaced: true}));
- latestChange = cst.getOneChange(cursor);
- assert.eq(latestChange.operationType, "replace");
- assert.eq(latestChange.fullDocument, {_id: "fullDocument not specified", replaced: true});
-
- // Test that not specifying 'fullDocument' does not include a 'fullDocument' in the result
- // for a non-replacement update.
- assert.writeOK(coll.update({_id: "fullDocument not specified"}, {$set: {updated: true}}));
- latestChange = cst.getOneChange(cursor);
- assert.eq(latestChange.operationType, "update");
- assert(!latestChange.hasOwnProperty("fullDocument"));
-
- jsTestLog("Testing change streams with 'fullDocument' specified as 'default'");
-
- // Test that specifying 'fullDocument' as 'default' does include a 'fullDocument' in the
- // result for an insert.
- const defaultFullDocSpec = Object.assign({fullDocument: "default"}, changeStreamSpec);
- cursor = cst.startWatchingChanges(
- {collection: collToWatch, pipeline: [{$changeStream: defaultFullDocSpec}]});
- assert.writeOK(coll.insert({_id: "fullDocument is default"}));
- latestChange = cst.getOneChange(cursor);
- assert.eq(latestChange.operationType, "insert");
- assert.eq(latestChange.fullDocument, {_id: "fullDocument is default"});
-
- // Test that specifying 'fullDocument' as 'default' does include a 'fullDocument' in the
- // result for a replacement-style update.
- assert.writeOK(coll.update({_id: "fullDocument is default"},
- {_id: "fullDocument is default", replaced: true}));
- latestChange = cst.getOneChange(cursor);
- assert.eq(latestChange.operationType, "replace");
- assert.eq(latestChange.fullDocument, {_id: "fullDocument is default", replaced: true});
-
- // Test that specifying 'fullDocument' as 'default' does not include a 'fullDocument' in the
- // result for a non-replacement update.
- assert.writeOK(coll.update({_id: "fullDocument is default"}, {$set: {updated: true}}));
- latestChange = cst.getOneChange(cursor);
- assert.eq(latestChange.operationType, "update");
- assert(!latestChange.hasOwnProperty("fullDocument"));
-
- jsTestLog("Testing change streams with 'fullDocument' specified as 'updateLookup'");
-
- // Test that specifying 'fullDocument' as 'updateLookup' does include a 'fullDocument' in
- // the result for an insert.
- const updateLookupSpec = Object.assign({fullDocument: "updateLookup"}, changeStreamSpec);
- cursor = cst.startWatchingChanges(
- {collection: collToWatch, pipeline: [{$changeStream: updateLookupSpec}]});
- assert.writeOK(coll.insert({_id: "fullDocument is lookup"}));
- latestChange = cst.getOneChange(cursor);
- assert.eq(latestChange.operationType, "insert");
- assert.eq(latestChange.fullDocument, {_id: "fullDocument is lookup"});
-
- // Test that specifying 'fullDocument' as 'updateLookup' does include a 'fullDocument' in
- // the result for a replacement-style update.
- assert.writeOK(coll.update({_id: "fullDocument is lookup"},
- {_id: "fullDocument is lookup", replaced: true}));
- latestChange = cst.getOneChange(cursor);
- assert.eq(latestChange.operationType, "replace");
- assert.eq(latestChange.fullDocument, {_id: "fullDocument is lookup", replaced: true});
-
- // Test that specifying 'fullDocument' as 'updateLookup' does include a 'fullDocument' in
- // the result for a non-replacement update.
- assert.writeOK(coll.update({_id: "fullDocument is lookup"}, {$set: {updated: true}}));
- latestChange = cst.getOneChange(cursor);
- assert.eq(latestChange.operationType, "update");
- assert.eq(latestChange.fullDocument,
- {_id: "fullDocument is lookup", replaced: true, updated: true});
-
- // Test that looking up the post image of an update after deleting the document will result
- // in a 'fullDocument' with a value of null.
- cursor = cst.startWatchingChanges({
- collection: collToWatch,
- pipeline: [{$changeStream: updateLookupSpec}, {$match: {operationType: "update"}}]
- });
- assert.writeOK(coll.update({_id: "fullDocument is lookup"}, {$set: {updatedAgain: true}}));
- assert.writeOK(coll.remove({_id: "fullDocument is lookup"}));
- // If this test is running with secondary read preference, it's necessary for the remove
- // to propagate to all secondary nodes and be available for majority reads before we can
- // assume looking up the document will fail.
- FixtureHelpers.awaitLastOpCommitted(db);
-
- latestChange = cst.getOneChange(cursor);
- assert.eq(latestChange.operationType, "update");
- assert(latestChange.hasOwnProperty("fullDocument"));
- assert.eq(latestChange.fullDocument, null);
- const deleteDocResumePoint = latestChange._id;
-
- // Test that looking up the post image of an update after the collection has been dropped
- // will result in 'fullDocument' with a value of null. This must be done using getMore
- // because new cursors cannot be established after a collection drop.
- assert.writeOK(coll.insert({_id: "fullDocument is lookup 2"}));
- assert.writeOK(coll.update({_id: "fullDocument is lookup 2"}, {$set: {updated: true}}));
-
- // Open a $changeStream cursor with batchSize 0, so that no oplog entries are retrieved yet.
- const resumeAfterDeleteAndUpdateLookupSpec = Object.assign(
- {fullDocument: "updateLookup", resumeAfter: deleteDocResumePoint}, changeStreamSpec);
- cursor = cst.startWatchingChanges({
- collection: collToWatch,
- pipeline: [
- {$changeStream: resumeAfterDeleteAndUpdateLookupSpec},
- {$match: {operationType: {$ne: "delete"}}}
- ],
- aggregateOptions: {cursor: {batchSize: 0}}
- });
-
- // Save another stream to test post-image lookup after the collection is recreated.
- let cursorBeforeDrop = cst.startWatchingChanges({
- collection: collToWatch,
- pipeline: [
- {$changeStream: resumeAfterDeleteAndUpdateLookupSpec},
- {$match: {operationType: {$ne: "delete"}}}
- ],
- aggregateOptions: {cursor: {batchSize: 0}}
- });
-
- // Retrieve the 'insert' operation from the latter stream. This is necessary on a sharded
- // collection so that the documentKey is retrieved before the collection is recreated;
- // otherwise, per SERVER-31691, a uassert will occur.
- // TODO SERVER-31847: all remaining operations on the old UUID should be visible even if we
- // have not retrieved the first oplog entry before the collection is recreated.
- latestChange = cst.getOneChange(cursorBeforeDrop);
- assert.eq(latestChange.operationType, "insert");
- assert(latestChange.hasOwnProperty("fullDocument"));
- assert.eq(latestChange.fullDocument, {_id: "fullDocument is lookup 2"});
-
- // Drop the collection and wait until two-phase drop finishes.
- assertDropCollection(db, coll.getName());
- assert.soon(function() {
- return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(db,
- coll.getName());
- });
- // If this test is running with secondary read preference, it's necessary for the drop
- // to propagate to all secondary nodes and be available for majority reads before we can
- // assume looking up the document will fail.
- FixtureHelpers.awaitLastOpCommitted(db);
-
- // Check the next $changeStream entry; this is the test document inserted above.
- latestChange = cst.getOneChange(cursor);
- assert.eq(latestChange.operationType, "insert");
- assert(latestChange.hasOwnProperty("fullDocument"));
- assert.eq(latestChange.fullDocument, {_id: "fullDocument is lookup 2"});
-
- // The next entry is the 'update' operation. Because the collection has been dropped, our
- // attempt to look up the post-image results in a null document.
- latestChange = cst.getOneChange(cursor);
- assert.eq(latestChange.operationType, "update");
- assert(latestChange.hasOwnProperty("fullDocument"));
- assert.eq(latestChange.fullDocument, null);
-
- // Test that looking up the post image of an update after the collection has been dropped
- // and created again will result in 'fullDocument' with a value of null. This must be done
- // using getMore because new cursors cannot be established after a collection drop.
-
- // Insert a document with the same _id, verify the change stream won't return it due to
- // different UUID.
- assertCreateCollection(db, coll.getName());
- assert.writeOK(coll.insert({_id: "fullDocument is lookup 2"}));
-
- // Confirm that the next entry's post-image is null since new collection has a different
- // UUID.
- latestChange = cst.getOneChange(cursorBeforeDrop);
- assert.eq(latestChange.operationType, "update");
- assert(latestChange.hasOwnProperty("fullDocument"));
- assert.eq(latestChange.fullDocument, null);
-
- // Test that invalidate entries don't have 'fullDocument' even if 'updateLookup' is
- // specified.
- const collInvalidate = assertDropAndRecreateCollection(db, "collInvalidate");
- cursor = cst.startWatchingChanges({
- collection: isNumber(collToWatch) ? collToWatch : collInvalidate.getName(),
- pipeline: [{$changeStream: updateLookupSpec}],
- aggregateOptions: {cursor: {batchSize: 0}}
- });
- assert.writeOK(collInvalidate.insert({_id: "testing invalidate"}));
- assertDropCollection(db, collInvalidate.getName());
- // Wait until two-phase drop finishes.
- assert.soon(function() {
- return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(
- db, collInvalidate.getName());
- });
- latestChange = cst.getOneChange(cursor);
- assert.eq(latestChange.operationType, "insert");
- latestChange = cst.getOneChange(cursor, true);
- assert.eq(latestChange.operationType, "invalidate");
- assert(!latestChange.hasOwnProperty("fullDocument"));
-
- jsTestLog("Testing full document lookup with a real getMore");
- assert.writeOK(coll.insert({_id: "getMoreEnabled"}));
-
- cursor = cst.startWatchingChanges({
- collection: collToWatch,
- pipeline: [{$changeStream: updateLookupSpec}],
- });
- assert.writeOK(coll.update({_id: "getMoreEnabled"}, {$set: {updated: true}}));
-
- let doc = cst.getOneChange(cursor);
- assert.docEq(doc["fullDocument"], {_id: "getMoreEnabled", updated: true});
- }
-
- // Test update lookup with a change stream on a single collection.
- testUpdateLookup(coll, coll.getName());
-
- // Test update lookup with a change stream on a whole database.
- testUpdateLookup(coll, 1);
-
- // Test update lookup with a change stream on the whole cluster.
- testUpdateLookup(coll, 1, db.getSiblingDB("admin"), {allChangesForCluster: true});
+ const cst = new ChangeStreamTest(db);
+
+ jsTestLog("Testing change streams without 'fullDocument' specified");
+ // Test that not specifying 'fullDocument' does include a 'fullDocument' in the result for
+ // an insert.
+ let cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: coll});
+ assert.writeOK(coll.insert({_id: "fullDocument not specified"}));
+ let latestChange = cst.getOneChange(cursor);
+ assert.eq(latestChange.operationType, "insert");
+ assert.eq(latestChange.fullDocument, {_id: "fullDocument not specified"});
+
+ // Test that not specifying 'fullDocument' does include a 'fullDocument' in the result for a
+ // replacement-style update.
+ assert.writeOK(coll.update({_id: "fullDocument not specified"},
+ {_id: "fullDocument not specified", replaced: true}));
+ latestChange = cst.getOneChange(cursor);
+ assert.eq(latestChange.operationType, "replace");
+ assert.eq(latestChange.fullDocument, {_id: "fullDocument not specified", replaced: true});
+
+ // Test that not specifying 'fullDocument' does not include a 'fullDocument' in the result
+ // for a non-replacement update.
+ assert.writeOK(coll.update({_id: "fullDocument not specified"}, {$set: {updated: true}}));
+ latestChange = cst.getOneChange(cursor);
+ assert.eq(latestChange.operationType, "update");
+ assert(!latestChange.hasOwnProperty("fullDocument"));
+
+ jsTestLog("Testing change streams with 'fullDocument' specified as 'default'");
+
+ // Test that specifying 'fullDocument' as 'default' does include a 'fullDocument' in the
+ // result for an insert.
+ cursor = cst.startWatchingChanges(
+ {collection: coll, pipeline: [{$changeStream: {fullDocument: "default"}}]});
+ assert.writeOK(coll.insert({_id: "fullDocument is default"}));
+ latestChange = cst.getOneChange(cursor);
+ assert.eq(latestChange.operationType, "insert");
+ assert.eq(latestChange.fullDocument, {_id: "fullDocument is default"});
+
+ // Test that specifying 'fullDocument' as 'default' does include a 'fullDocument' in the
+ // result for a replacement-style update.
+ assert.writeOK(coll.update({_id: "fullDocument is default"},
+ {_id: "fullDocument is default", replaced: true}));
+ latestChange = cst.getOneChange(cursor);
+ assert.eq(latestChange.operationType, "replace");
+ assert.eq(latestChange.fullDocument, {_id: "fullDocument is default", replaced: true});
+
+ // Test that specifying 'fullDocument' as 'default' does not include a 'fullDocument' in the
+ // result for a non-replacement update.
+ assert.writeOK(coll.update({_id: "fullDocument is default"}, {$set: {updated: true}}));
+ latestChange = cst.getOneChange(cursor);
+ assert.eq(latestChange.operationType, "update");
+ assert(!latestChange.hasOwnProperty("fullDocument"));
+
+ jsTestLog("Testing change streams with 'fullDocument' specified as 'updateLookup'");
+
+ // Test that specifying 'fullDocument' as 'updateLookup' does include a 'fullDocument' in
+ // the result for an insert.
+ cursor = cst.startWatchingChanges(
+ {collection: coll, pipeline: [{$changeStream: {fullDocument: "updateLookup"}}]});
+ assert.writeOK(coll.insert({_id: "fullDocument is lookup"}));
+ latestChange = cst.getOneChange(cursor);
+ assert.eq(latestChange.operationType, "insert");
+ assert.eq(latestChange.fullDocument, {_id: "fullDocument is lookup"});
+
+ // Test that specifying 'fullDocument' as 'updateLookup' does include a 'fullDocument' in
+ // the result for a replacement-style update.
+ assert.writeOK(coll.update({_id: "fullDocument is lookup"},
+ {_id: "fullDocument is lookup", replaced: true}));
+ latestChange = cst.getOneChange(cursor);
+ assert.eq(latestChange.operationType, "replace");
+ assert.eq(latestChange.fullDocument, {_id: "fullDocument is lookup", replaced: true});
+
+ // Test that specifying 'fullDocument' as 'updateLookup' does include a 'fullDocument' in
+ // the result for a non-replacement update.
+ assert.writeOK(coll.update({_id: "fullDocument is lookup"}, {$set: {updated: true}}));
+ latestChange = cst.getOneChange(cursor);
+ assert.eq(latestChange.operationType, "update");
+ assert.eq(latestChange.fullDocument,
+ {_id: "fullDocument is lookup", replaced: true, updated: true});
+
+ // Test that looking up the post image of an update after deleting the document will result
+ // in a 'fullDocument' with a value of null.
+ cursor = cst.startWatchingChanges({
+ collection: coll,
+ pipeline: [
+ {$changeStream: {fullDocument: "updateLookup"}},
+ {$match: {operationType: "update"}}
+ ]
+ });
+ assert.writeOK(coll.update({_id: "fullDocument is lookup"}, {$set: {updatedAgain: true}}));
+ assert.writeOK(coll.remove({_id: "fullDocument is lookup"}));
+ // If this test is running with secondary read preference, it's necessary for the remove
+ // to propagate to all secondary nodes and be available for majority reads before we can
+ // assume looking up the document will fail.
+ FixtureHelpers.awaitLastOpCommitted(db);
+
+ latestChange = cst.getOneChange(cursor);
+ assert.eq(latestChange.operationType, "update");
+ assert(latestChange.hasOwnProperty("fullDocument"));
+ assert.eq(latestChange.fullDocument, null);
+ const deleteDocResumePoint = latestChange._id;
+
+ // Test that looking up the post image of an update after the collection has been dropped
+ // will result in 'fullDocument' with a value of null. This must be done using getMore
+ // because new cursors cannot be established after a collection drop.
+ assert.writeOK(coll.insert({_id: "fullDocument is lookup 2"}));
+ assert.writeOK(coll.update({_id: "fullDocument is lookup 2"}, {$set: {updated: true}}));
+
+ // Open a $changeStream cursor with batchSize 0, so that no oplog entries are retrieved yet.
+ cursor = cst.startWatchingChanges({
+ collection: coll,
+ pipeline: [
+ {$changeStream: {fullDocument: "updateLookup", resumeAfter: deleteDocResumePoint}},
+ {$match: {operationType: {$ne: "delete"}}}
+ ],
+ aggregateOptions: {cursor: {batchSize: 0}}
+ });
+
+ // Save another stream to test post-image lookup after the collection is recreated.
+ const cursorBeforeDrop = cst.startWatchingChanges({
+ collection: coll,
+ pipeline: [
+ {$changeStream: {fullDocument: "updateLookup", resumeAfter: deleteDocResumePoint}},
+ {$match: {operationType: {$ne: "delete"}}}
+ ],
+ aggregateOptions: {cursor: {batchSize: 0}}
+ });
+
+ // Retrieve the 'insert' operation from the latter stream. This is necessary on a sharded
+ // collection so that the documentKey is retrieved before the collection is recreated;
+ // otherwise, per SERVER-31691, a uassert will occur.
+ latestChange = cst.getOneChange(cursorBeforeDrop);
+ assert.eq(latestChange.operationType, "insert");
+ assert(latestChange.hasOwnProperty("fullDocument"));
+ assert.eq(latestChange.fullDocument, {_id: "fullDocument is lookup 2"});
+
+ // Drop the collection and wait until two-phase drop finishes.
+ assertDropCollection(db, coll.getName());
+ assert.soon(function() {
+ return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(db, coll.getName());
+ });
+ // If this test is running with secondary read preference, it's necessary for the drop
+ // to propagate to all secondary nodes and be available for majority reads before we can
+ // assume looking up the document will fail.
+ FixtureHelpers.awaitLastOpCommitted(db);
+
+ // Check the next $changeStream entry; this is the test document inserted above.
+ latestChange = cst.getOneChange(cursor);
+ assert.eq(latestChange.operationType, "insert");
+ assert(latestChange.hasOwnProperty("fullDocument"));
+ assert.eq(latestChange.fullDocument, {_id: "fullDocument is lookup 2"});
+
+ // The next entry is the 'update' operation. Because the collection has been dropped, our
+ // attempt to look up the post-image results in a null document.
+ latestChange = cst.getOneChange(cursor);
+ assert.eq(latestChange.operationType, "update");
+ assert(latestChange.hasOwnProperty("fullDocument"));
+ assert.eq(latestChange.fullDocument, null);
+
+ // Test that looking up the post image of an update after the collection has been dropped
+ // and created again will result in 'fullDocument' with a value of null. This must be done
+ // using getMore because new cursors cannot be established after a collection drop.
+
+ // Insert a document with the same _id, and verify that the change stream won't return it,
+ // because the new collection has a different UUID.
+ assertCreateCollection(db, coll.getName());
+ assert.writeOK(coll.insert({_id: "fullDocument is lookup 2"}));
+
+ // Confirm that the next entry's post-image is null, since the new collection has a
+ // different UUID.
+ latestChange = cst.getOneChange(cursorBeforeDrop);
+ assert.eq(latestChange.operationType, "update");
+ assert(latestChange.hasOwnProperty("fullDocument"));
+ assert.eq(latestChange.fullDocument, null);
+
+ jsTestLog("Testing full document lookup with a real getMore");
+ assert.writeOK(coll.insert({_id: "getMoreEnabled"}));
+
+ cursor = cst.startWatchingChanges({
+ collection: coll,
+ pipeline: [{$changeStream: {fullDocument: "updateLookup"}}],
+ });
+ assert.writeOK(coll.update({_id: "getMoreEnabled"}, {$set: {updated: true}}));
+
+ const doc = cst.getOneChange(cursor);
+ assert.docEq(doc["fullDocument"], {_id: "getMoreEnabled", updated: true});
+
+ // Test that invalidate entries don't have 'fullDocument' even if 'updateLookup' is
+ // specified.
+ cursor = cst.startWatchingChanges({
+ collection: coll,
+ pipeline: [{$changeStream: {fullDocument: "updateLookup"}}],
+ aggregateOptions: {cursor: {batchSize: 0}}
+ });
+ assert.writeOK(coll.insert({_id: "testing invalidate"}));
+ assertDropCollection(db, coll.getName());
+ // Wait until two-phase drop finishes.
+ assert.soon(function() {
+ return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(db, coll.getName());
+ });
+ latestChange = cst.getOneChange(cursor);
+ assert.eq(latestChange.operationType, "insert");
+ latestChange = cst.getOneChange(cursor, true);
+ assert.eq(latestChange.operationType, "invalidate");
+ assert(!latestChange.hasOwnProperty("fullDocument"));
}());
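A point worth keeping in mind when reading the drop/recreate assertions above: change streams and their post-image lookups are bound to the collection's UUID, not its name. After the drop and recreate, the resumed stream still refers to the old UUID, so even a new document with a matching _id is invisible to the lookup and the post-image comes back null.

    // Sketch of the identity rule:
    //   stream opened on coll (UUID A) --> coll dropped --> coll recreated (UUID B)
    //   post-image lookup for a UUID-A event finds no UUID-A collection => fullDocument: null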
diff --git a/jstests/libs/override_methods/implicit_whole_cluster_changestreams.js b/jstests/libs/override_methods/implicit_whole_cluster_changestreams.js
new file mode 100644
index 00000000000..3ee93a6fb24
--- /dev/null
+++ b/jstests/libs/override_methods/implicit_whole_cluster_changestreams.js
@@ -0,0 +1,54 @@
+/**
+ * Loading this file overrides DB.prototype._runCommandImpl with a function that converts any
+ * attempt to run $changeStream on a single collection or single database into a cluster-wide
+ * $changeStream filtered by that database or namespace. Single-collection/db rules regarding
+ * internal collections and views are respected. Non-$changeStream commands, and commands which
+ * explicitly request to be exempted from modification by setting the 'noPassthrough' flag, are
+ * passed through as-is.
+ */
+
+// For the whole_cluster passthrough, we simply override the necessary methods in the whole_db
+// passthrough's ChangeStreamPassthroughHelpers.
+load("jstests/libs/override_methods/implicit_whole_db_changestreams.js");
+
+// Any valid single-collection or single-database request is upconvertable to cluster-wide.
+ChangeStreamPassthroughHelpers.isUpconvertableChangeStreamRequest =
+ ChangeStreamPassthroughHelpers.isValidChangeStreamRequest;
+
+ChangeStreamPassthroughHelpers.nsMatchFilter = function(db, collName) {
+ // The $match filter we inject into the pipeline will depend on whether this is a
+ // single-collection or whole-db stream.
+ const isSingleCollectionStream = (typeof collName === 'string');
+
+ return {
+ $match: {
+ $or: [
+ {
+ "ns.db": db.getName(),
+ "ns.coll": (isSingleCollectionStream ? collName : {$exists: true})
+ },
+ {operationType: "invalidate"}
+ ]
+ }
+ };
+};
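For illustration, with a hypothetical database 'test' and collection 'coll', the injected stage takes one of two shapes:

    // Upconverted single-collection stream:
    {$match: {$or: [{"ns.db": "test", "ns.coll": "coll"}, {operationType: "invalidate"}]}}
    // Upconverted whole-db stream ('ns.coll' need only exist):
    {$match: {$or: [{"ns.db": "test", "ns.coll": {$exists: true}}, {operationType: "invalidate"}]}}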
+
+ChangeStreamPassthroughHelpers.execDBName = function(db) {
+ return "admin";
+};
+
+ChangeStreamPassthroughHelpers.changeStreamSpec = function() {
+ return {allChangesForCluster: true};
+};
+
+// Redirect the DB's 'watch' function to use the cluster-wide version. The Collection.watch helper
+// has already been overridden to use DB.watch when we loaded 'implicit_whole_db_changestreams.js',
+// so this ensures that both the Collection and DB helpers will actually call the Mongo function.
+// Although calls to the shell helpers will ultimately resolve to the overridden runCommand anyway,
+// we need to override the helper to ensure that the Mongo.watch function itself is exercised by the
+// passthrough wherever Collection.watch or DB.watch is called.
+DB.prototype.watch = function(pipeline, options) {
+ pipeline = Object.assign([], pipeline);
+ pipeline.unshift(ChangeStreamPassthroughHelpers.nsMatchFilter(this, 1));
+ return this.getMongo().watch(pipeline, options);
+};
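The net effect of the two watch overrides, sketched end-to-end ('nsFilter' is shorthand for the nsMatchFilter helper above):

    // With both override files loaded:
    //   coll.watch(pipeline) --> db.watch([nsFilter(db, coll), ...pipeline])   (whole-db override)
    //   db.watch(pipeline)   --> db.getMongo().watch([nsFilter(db, 1), ...pipeline])
    // so every helper ultimately exercises Mongo.prototype.watch with an
    // {allChangesForCluster: true} stream filtered back down to the original namespace.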
diff --git a/jstests/libs/override_methods/implicit_whole_db_changestreams.js b/jstests/libs/override_methods/implicit_whole_db_changestreams.js
index deaf8f7c216..ae4ed192033 100644
--- a/jstests/libs/override_methods/implicit_whole_db_changestreams.js
+++ b/jstests/libs/override_methods/implicit_whole_db_changestreams.js
@@ -7,75 +7,121 @@
* 'noPassthrough' flag, are passed through as-is.
*/
+load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
+
// Helper function which tests can call to explicitly request that the command not be modified by
// the passthrough code. When defined, ChangeStreamTest will adopt this as its default runCommand
// implementation to allow individual tests to exempt themselves from modification.
const changeStreamPassthroughAwareRunCommand = (db, cmdObj, noPassthrough) =>
db.runCommand(cmdObj, undefined, undefined, noPassthrough);
+// Defines a set of functions to validate incoming $changeStream requests and transform
+// single-collection streams into equivalent whole-db streams. Separating these functions allows the
+// runCommand override to generically upconvert $changeStream requests, and allows the
+// ChangeStreamPassthroughHelpers themselves to be overridden by other passthroughs in order to
+// alter the behaviour of runCommand.
+const ChangeStreamPassthroughHelpers = {
+ isValidChangeStreamRequest: function(db, cmdObj) {
+ // Determine whether this command is a valid $changeStream aggregation on a single
+ // collection or database.
+ if (!(cmdObj && cmdObj.aggregate && Array.isArray(cmdObj.pipeline) &&
+ cmdObj.pipeline.length > 0 && cmdObj.pipeline[0].$changeStream)) {
+ return false;
+ }
+ // Single-collection and whole-db streams cannot be opened on internal databases.
+ if (db.getName() == "admin" || db.getName() == "config" || db.getName() == "local") {
+ return false;
+ }
+ // If the client's $changeStream spec already contains everything we intend to modify, pass
+ // the command through as-is.
+ const testSpec = this.changeStreamSpec(), csParams = Object.keys(testSpec);
+ if (csParams.length > 0 &&
+ csParams.every((csParam) =>
+ testSpec[csParam] === cmdObj.pipeline[0].$changeStream[csParam])) {
+ return false;
+ }
+ // The remaining checks are only relevant to single-collection streams. If the 'aggregate'
+ // field of the command object is not a string, validate that it is equal to 1.
+ if (typeof cmdObj.aggregate !== 'string') {
+ return cmdObj.aggregate == 1;
+ }
+ // Single-collection streams cannot be opened on internal collections in any database.
+ if (cmdObj.aggregate.startsWith("system.")) {
+ return false;
+ }
+ // Single-collection streams cannot be opened on views.
+ if (FixtureHelpers.getViewDefinition(db, cmdObj.aggregate)) {
+ return false;
+ }
+ // This is a well-formed request.
+ return true;
+ },
+ // All valid single-collection change stream requests are upconvertable in this passthrough.
+ isUpconvertableChangeStreamRequest: function(db, cmdObj) {
+ return this.isValidChangeStreamRequest(db, cmdObj) &&
+ (typeof cmdObj.aggregate === 'string');
+ },
+ nsMatchFilter: function(db, collName) {
+ return {
+ $match: {
+ $or:
+ [{"ns.db": db.getName(), "ns.coll": collName}, {operationType: "invalidate"}]
+ }
+ };
+ },
+ execDBName: function(db) {
+ return db.getName();
+ },
+ changeStreamSpec: function() {
+ return {};
+ },
+ upconvertChangeStreamRequest: function(db, cmdObj) {
+ // Take a copy of the command object such that the original is not altered.
+ cmdObj = Object.assign({}, cmdObj);
+
+ // To convert this command into a whole-db stream, we insert a $match stage just after
+ // the $changeStream stage that filters by database and collection name, and we update
+ // the command's execution 'namespace' to 1.
+ let pipeline = [{
+ $changeStream:
+ Object.assign({}, cmdObj.pipeline[0].$changeStream, this.changeStreamSpec())
+ }];
+ pipeline.push(this.nsMatchFilter(db, cmdObj.aggregate));
+ pipeline = pipeline.concat(cmdObj.pipeline.slice(1));
+ cmdObj.pipeline = pipeline;
+ cmdObj.aggregate = 1;
+
+ return [this.execDBName(db), cmdObj];
+ },
+ upconvertGetMoreRequest: function(db, cmdObj) {
+ return [this.execDBName(db), Object.assign({}, cmdObj, {collection: "$cmd.aggregate"})];
+ }
+};
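Putting upconvertChangeStreamRequest and upconvertGetMoreRequest together, a hypothetical single-collection request on database 'test' is rewritten as follows:

    // Before (as issued by the test):
    //   {aggregate: "coll", pipeline: [{$changeStream: {}}], cursor: {}}
    // After (as sent to the server):
    //   {aggregate: 1,
    //    pipeline: [{$changeStream: {}},
    //               {$match: {$or: [{"ns.db": "test", "ns.coll": "coll"},
    //                               {operationType: "invalidate"}]}}],
    //    cursor: {}}
    // Subsequent getMores on the upconverted cursor are retargeted at the
    // collectionless namespace: {getMore: <cursorId>, collection: "$cmd.aggregate"}.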
+
(function() {
'use strict';
- load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
-
const originalRunCommandImpl = DB.prototype._runCommandImpl;
const originalRunCommand = DB.prototype.runCommand;
const upconvertedCursors = new Set();
- function isValidChangeStreamRequest(db, cmdObj) {
- // Determine whether this command is a $changeStream aggregation on a single collection.
- if (cmdObj && typeof cmdObj.aggregate === 'string' && Array.isArray(cmdObj.pipeline) &&
- cmdObj.pipeline.length > 0 && cmdObj.pipeline[0].$changeStream) {
- // Single-collection streams cannot be opened on internal databases.
- if (db.getName() == "admin" || db.getName() == "config" || db.getName() == "local") {
- return false;
- }
- // Single-collection streams cannot be opened on internal collections in any database.
- if (cmdObj.aggregate.startsWith("system.")) {
- return false;
- }
- // Single-collection streams cannot be opened on views.
- if (FixtureHelpers.getViewDefinition(db, cmdObj.aggregate)) {
- return false;
- }
- // This is a well-formed single-collection request.
- return true;
- }
-
- return false;
- }
+ // Shadow the global 'db' so that the override functions below cannot accidentally reference it.
+ const db = null;
const passthroughRunCommandImpl = function(dbName, cmdObj, options) {
- // Check whether this command is a valid $changeStream request.
- let upconvertCursor = isValidChangeStreamRequest(this, cmdObj);
+ // Check whether this command is an upconvertable $changeStream request.
+ const upconvertCursor =
+ ChangeStreamPassthroughHelpers.isUpconvertableChangeStreamRequest(this, cmdObj);
if (upconvertCursor) {
- // Having validated the legality of the stream, take a copy of the command object such
- // that the original object is not altered.
- cmdObj = Object.assign({}, cmdObj);
-
- // To convert this command into a whole-db stream, we insert a $match stage just after
- // the $changeStream stage that filters by database and collection name, and we update
- // the command's execution 'namespace' to 1.
- let pipeline = [{$changeStream: Object.assign({}, cmdObj.pipeline[0].$changeStream)}];
- pipeline.push({
- $match: {
- $or: [
- {"ns.db": dbName, "ns.coll": cmdObj.aggregate},
- {operationType: "invalidate"}
- ]
- }
- });
- pipeline = pipeline.concat(cmdObj.pipeline.slice(1));
- cmdObj.pipeline = pipeline;
- cmdObj.aggregate = 1;
+ [dbName, cmdObj] =
+ ChangeStreamPassthroughHelpers.upconvertChangeStreamRequest(this, cmdObj);
}
// If the command is a getMore, it may be a $changeStream that we upconverted to run
// whole-db. Ensure that we update the 'collection' field to be the collectionless
// namespace.
if (cmdObj && cmdObj.getMore && upconvertedCursors.has(cmdObj.getMore.toString())) {
- cmdObj = Object.assign({}, cmdObj, {collection: "$cmd.aggregate"});
+ [dbName, cmdObj] = ChangeStreamPassthroughHelpers.upconvertGetMoreRequest(this, cmdObj);
}
// Pass the modified command to the original runCommand implementation.
@@ -89,6 +135,18 @@ const changeStreamPassthroughAwareRunCommand = (db, cmdObj, noPassthrough) =>
return res;
};
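Note the dispatch pattern completed at the bottom of this file: the overridden DB.runCommand installs either passthroughRunCommandImpl or the original implementation on 'this._runCommandImpl' depending on the 'noPassthrough' flag, then defers to the original runCommand, which calls through to _runCommandImpl.

    // Per-call choice of implementation (sketch):
    //   db.runCommand(cmdObj)                                     --> passthroughRunCommandImpl (may upconvert)
    //   changeStreamPassthroughAwareRunCommand(db, cmdObj, true)  --> originalRunCommandImpl (untouched)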
+ // Redirect the Collection's 'watch' function to use the whole-DB version. Although calls to the
+ // shell helpers will ultimately resolve to the overridden runCommand anyway, we need to
+ // override the helpers to ensure that the DB.watch function itself is exercised by the
+ // passthrough wherever Collection.watch is called.
+ DBCollection.prototype.watch = function(pipeline, options) {
+ pipeline = Object.assign([], pipeline);
+ pipeline.unshift(
+ ChangeStreamPassthroughHelpers.nsMatchFilter(this.getDB(), this.getName()));
+ return this.getDB().watch(pipeline, options);
+ };
+
+ // Override DB.runCommand to use the custom or original _runCommandImpl.
DB.prototype.runCommand = function(cmdObj, extra, queryOptions, noPassthrough) {
this._runCommandImpl = (noPassthrough ? originalRunCommandImpl : passthroughRunCommandImpl);
return originalRunCommand.apply(this, [cmdObj, extra, queryOptions]);