author      Bernard Gorman <bernard.gorman@gmail.com>  2020-04-02 10:37:26 +0100
committer   Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-05-03 18:08:47 +0000
commit      c7220a8080388da758230240a94ca0a15156148b (patch)
tree        8ed48297dc23336fa9d11dac6ce84ae4feb0c566
parent      71bb4d6683a27d2e5c484618433639f66e0c1bd7 (diff)
SERVER-46819 Allow transactions in change stream sharded passthroughs
(cherry picked from commit 8c9563e56b429bf609f47ac3a6f36920dd6807f3)
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml  2
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml  2
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml  2
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml  2
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml  2
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml  2
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml  2
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml  2
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml  2
-rw-r--r--  buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml  2
-rw-r--r--  jstests/change_streams/apply_ops.js  130
-rw-r--r--  jstests/change_streams/apply_ops_resumability.js  199
-rw-r--r--  jstests/change_streams/change_stream.js  2
-rw-r--r--  jstests/change_streams/report_post_batch_resume_token.js  56
-rw-r--r--  jstests/libs/auto_retry_transaction_in_sharding.js  97
-rw-r--r--  jstests/libs/change_stream_util.js  139
16 files changed, 464 insertions, 179 deletions
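The bulk of this patch removes the 'uses_transactions' exclusion tag from ten sharded change-stream suites, and rewrites the transaction tests so they tolerate the retries and cross-shard event interleaving that sharded clusters introduce. The central pattern is the new withTxnAndAutoRetryOnMongos() helper added under jstests/libs/; a condensed sketch of the converted tests' structure, with an illustrative collection name:

load("jstests/libs/auto_retry_transaction_in_sharding.js");  // For withTxnAndAutoRetryOnMongos.

const session = db.getMongo().startSession({causalConsistency: false});
const sessionColl = session.getDatabase(db.getName())["example_coll"];

// On mongoS the closure is retried until it commits or the server returns a
// non-TransientTransactionError; elsewhere it runs exactly once between
// startTransaction() and commitTransaction_forTesting().
withTxnAndAutoRetryOnMongos(session, () => {
    assert.commandWorked(sessionColl.insert({_id: 1, a: 0}));
}, {readConcern: {level: "snapshot"}, writeConcern: {w: "majority"}});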
diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml
index 62e5d1a2e1c..a70f485b5fc 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml
@@ -12,8 +12,6 @@ selector:
##
# "Cowardly refusing to override write concern of command: ..."
- assumes_write_concern_unchanged
- # Transactions not supported on sharded clusters.
- - uses_transactions
# Exclude any that assume sharding is disabled
- assumes_against_mongod_not_mongos
diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml
index 05357a578b5..9e82e7a67a5 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml
@@ -12,8 +12,6 @@ selector:
##
# "Cowardly refusing to override write concern of command: ..."
- assumes_write_concern_unchanged
- # Transactions not supported on sharded clusters.
- - uses_transactions
# Exclude any that assume sharding is disabled
- assumes_against_mongod_not_mongos
diff --git a/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml b/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml
index e1dc1e9a6fa..71a51f3ef09 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml
@@ -25,8 +25,6 @@ selector:
# "Cowardly refusing to run test with overridden read preference when it reads from a
# non-replicated collection: ..."
- assumes_read_preference_unchanged
- # Transactions not supported on sharded cluster.
- - uses_transactions
# Exclude any that assume sharding is disabled
- assumes_against_mongod_not_mongos
diff --git a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml
index 7a7cdd7320f..873cbcb8b6a 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml
@@ -13,8 +13,6 @@ selector:
##
# "Cowardly refusing to override write concern of command: ..."
- assumes_write_concern_unchanged
- # Transactions not supported on sharded clusters.
- - uses_transactions
# Exclude any that assume sharding is disabled
- assumes_against_mongod_not_mongos
- assumes_unsharded_collection
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml
index 968d2cd6b13..b154b5a6819 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml
@@ -12,8 +12,6 @@ selector:
##
# "Cowardly refusing to override write concern of command: ..."
- assumes_write_concern_unchanged
- # Transactions not supported on sharded clusters.
- - uses_transactions
# Not relevant for whole-cluster change streams.
- do_not_run_in_whole_cluster_passthrough
# Exclude any that assume sharding is disabled
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml
index 368b1d4d538..c646f91cf8c 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml
@@ -25,8 +25,6 @@ selector:
# "Cowardly refusing to run test with overridden read preference when it reads from a
# non-replicated collection: ..."
- assumes_read_preference_unchanged
- # Transactions not supported on sharded cluster.
- - uses_transactions
# Not relevant for whole-cluster change streams.
- do_not_run_in_whole_cluster_passthrough
# Exclude any that assume sharding is disabled
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml
index 4d38135bc0f..b2df0199282 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml
@@ -12,8 +12,6 @@ selector:
##
# "Cowardly refusing to override write concern of command: ..."
- assumes_write_concern_unchanged
- # Transactions not supported on sharded clusters.
- - uses_transactions
# Not relevant for whole-cluster change streams.
- do_not_run_in_whole_cluster_passthrough
# Exclude any that assume sharding is disabled
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml
index d9cd88e7a98..286881ae831 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml
@@ -15,8 +15,6 @@ selector:
##
# "Cowardly refusing to override write concern of command: ..."
- assumes_write_concern_unchanged
- # Transactions not supported on sharded clusters.
- - uses_transactions
# Exclude any that assume sharding is disabled
- assumes_against_mongod_not_mongos
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml
index 88f63128bab..7f327fe56e5 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml
@@ -27,8 +27,6 @@ selector:
# "Cowardly refusing to run test with overridden read preference when it reads from a
# non-replicated collection: ..."
- assumes_read_preference_unchanged
- # Transactions not supported on sharded cluster.
- - uses_transactions
# Exclude any that assume sharding is disabled
- assumes_against_mongod_not_mongos
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml
index a861822f3b2..f99f31f4ae2 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml
@@ -15,8 +15,6 @@ selector:
##
# "Cowardly refusing to override write concern of command: ..."
- assumes_write_concern_unchanged
- # Transactions not supported on sharded clusters.
- - uses_transactions
# Exclude any that assume sharding is disabled
- assumes_against_mongod_not_mongos
- assumes_unsharded_collection
diff --git a/jstests/change_streams/apply_ops.js b/jstests/change_streams/apply_ops.js
index fa232f77f1b..e8a13088337 100644
--- a/jstests/change_streams/apply_ops.js
+++ b/jstests/change_streams/apply_ops.js
@@ -1,11 +1,13 @@
// Tests that a change stream will correctly unwind applyOps entries generated by a transaction.
-// @tags: [uses_transactions]
+// @tags: [uses_transactions, requires_snapshot_read, requires_majority_read_concern]
(function() {
"use strict";
-load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
-load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+load("jstests/libs/auto_retry_transaction_in_sharding.js"); // For withTxnAndAutoRetryOnMongos.
+load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
+load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.isMongos.
const otherCollName = "change_stream_apply_ops_2";
const coll = assertDropAndRecreateCollection(db, "change_stream_apply_ops");
@@ -17,8 +19,14 @@ assertDropAndRecreateCollection(db.getSiblingDB(otherDbName), otherDbCollName);
// Insert a document that gets deleted as part of the transaction.
const kDeletedDocumentId = 0;
-coll.insert({_id: kDeletedDocumentId, a: "I was here before the transaction"},
- {writeConcern: {w: "majority"}});
+const insertRes = assert.commandWorked(coll.runCommand("insert", {
+ documents: [{_id: kDeletedDocumentId, a: "I was here before the transaction"}],
+ writeConcern: {w: "majority"}
+}));
+
+// Record the clusterTime of the insert, and increment it to give the test start time.
+const testStartTime = insertRes.$clusterTime.clusterTime;
+testStartTime.i++;
let cst = new ChangeStreamTest(db);
let changeStream = cst.startWatchingChanges({
@@ -31,38 +39,49 @@ let changeStream = cst.startWatchingChanges({
const sessionOptions = {
causalConsistency: false
};
+const txnOptions = {
+ readConcern: {level: "snapshot"},
+ writeConcern: {w: "majority"}
+};
+
const session = db.getMongo().startSession(sessionOptions);
+
+// Create these variables before starting the transaction. In sharded passthroughs, accessing
+// db[collname] may attempt to implicitly shard the collection, which is not allowed in a txn.
const sessionDb = session.getDatabase(db.getName());
const sessionColl = sessionDb[coll.getName()];
-
-session.startTransaction({readConcern: {level: "snapshot"}, writeConcern: {w: "majority"}});
-assert.commandWorked(sessionColl.insert({_id: 1, a: 0}));
-assert.commandWorked(sessionColl.insert({_id: 2, a: 0}));
-
-// One insert on a collection that we're not watching. This should be skipped by the
-// single-collection changestream.
-assert.commandWorked(sessionDb[otherCollName].insert({_id: 111, a: "Doc on other collection"}));
-
-// One insert on a collection in a different database. This should be skipped by the single
-// collection and single-db changestreams.
-assert.commandWorked(
- session.getDatabase(otherDbName)[otherDbCollName].insert({_id: 222, a: "Doc on other DB"}));
-
-assert.commandWorked(sessionColl.updateOne({_id: 1}, {$inc: {a: 1}}));
-
-assert.commandWorked(sessionColl.deleteOne({_id: kDeletedDocumentId}));
-
-assert.commandWorked(session.commitTransaction_forTesting());
-
-// Do applyOps on the collection that we care about. This is an "external" applyOps, though
-// (not run as part of a transaction) so its entries should be skipped in the change
-// stream. This checks that applyOps that don't have an 'lsid' and 'txnNumber' field do not
-// get unwound.
-assert.commandWorked(db.runCommand({
- applyOps: [
- {op: "i", ns: coll.getFullName(), o: {_id: 3, a: "SHOULD NOT READ THIS"}},
- ]
-}));
+const sessionOtherColl = sessionDb[otherCollName];
+const sessionOtherDbColl = session.getDatabase(otherDbName)[otherDbCollName];
+
+withTxnAndAutoRetryOnMongos(session, () => {
+ // Two inserts on the main test collection.
+ assert.commandWorked(sessionColl.insert({_id: 1, a: 0}));
+ assert.commandWorked(sessionColl.insert({_id: 2, a: 0}));
+
+ // One insert on a collection that we're not watching. This should be skipped by the
+ // single-collection changestream.
+ assert.commandWorked(sessionOtherColl.insert({_id: 111, a: "Doc on other collection"}));
+
+ // One insert on a collection in a different database. This should be skipped by the single
+ // collection and single-db changestreams.
+ assert.commandWorked(sessionOtherDbColl.insert({_id: 222, a: "Doc on other DB"}));
+
+ assert.commandWorked(sessionColl.updateOne({_id: 1}, {$inc: {a: 1}}));
+
+ assert.commandWorked(sessionColl.deleteOne({_id: kDeletedDocumentId}));
+}, txnOptions);
+
+// Do applyOps on the collection that we care about. This is an "external" applyOps (i.e. not run as
+// part of a transaction), so its entries should be skipped in the change stream. This checks that
+// applyOps entries without 'lsid' and 'txnNumber' fields do not get unwound. Skip if running in a
+// sharded passthrough, since the applyOps command does not exist on mongoS.
+if (!FixtureHelpers.isMongos(db)) {
+ assert.commandWorked(db.runCommand({
+ applyOps: [
+ {op: "i", ns: coll.getFullName(), o: {_id: 3, a: "SHOULD NOT READ THIS"}},
+ ]
+ }));
+}
// Drop the collection. This will trigger an "invalidate" event at the end of the stream.
assert.commandWorked(db.runCommand({drop: coll.getName()}));
@@ -106,18 +125,39 @@ const expectedChanges = [
},
];
+// If we are running in a sharded passthrough, then this may have been a multi-shard transaction.
+// Change streams will interleave the txn events from across the shards in (clusterTime, txnOpIndex)
+// order, and so may not reflect the ordering of writes in the test. We thus verify that exactly the
+// expected set of events is observed, but we relax the ordering requirements.
+function assertNextChangesEqual({cursor, expectedChanges, expectInvalidate}) {
+ const assertEqualFunc = FixtureHelpers.isMongos(db) ? cst.assertNextChangesEqualUnordered
+ : cst.assertNextChangesEqual;
+ return assertEqualFunc(
+ {cursor: cursor, expectedChanges: expectedChanges, expectInvalidate: expectInvalidate});
+}
+
+//
+// Test behavior of single-collection change streams with apply ops.
+//
+
// Verify that the stream returns the expected sequence of changes.
-const changes =
- cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges});
+const changes = assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges});
+
// Single collection change stream should also be invalidated by the drop.
-cst.assertNextChangesEqual({
+assertNextChangesEqual({
cursor: changeStream,
expectedChanges: [{operationType: "invalidate"}],
expectInvalidate: true
});
-// Obtain the clusterTime from the first change.
-const startTime = changes[0].clusterTime;
+//
+// Test behavior of whole-db change streams with apply ops.
+//
+
+// In a sharded cluster, whole-db-or-cluster streams will see a collection drop from each shard.
+for (let i = 1; i < FixtureHelpers.numberOfShardsForCollection(coll); ++i) {
+ expectedChanges.push({operationType: "drop", ns: {db: db.getName(), coll: coll.getName()}});
+}
// Add an entry for the insert on db.otherColl into expectedChanges.
expectedChanges.splice(2, 0, {
@@ -132,10 +172,14 @@ expectedChanges.splice(2, 0, {
// Verify that a whole-db stream returns the expected sequence of changes, including the insert
// on the other collection but NOT the changes on the other DB or the manual applyOps.
changeStream = cst.startWatchingChanges({
- pipeline: [{$changeStream: {startAtOperationTime: startTime}}, {$project: {"lsid.uid": 0}}],
+ pipeline: [{$changeStream: {startAtOperationTime: testStartTime}}, {$project: {"lsid.uid": 0}}],
collection: 1
});
-cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges});
+assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges});
+
+//
+// Test behavior of whole-cluster change streams with apply ops.
+//
// Add an entry for the insert on otherDb.otherDbColl into expectedChanges.
expectedChanges.splice(3, 0, {
@@ -152,12 +196,12 @@ expectedChanges.splice(3, 0, {
cst = new ChangeStreamTest(db.getSiblingDB("admin"));
changeStream = cst.startWatchingChanges({
pipeline: [
- {$changeStream: {startAtOperationTime: startTime, allChangesForCluster: true}},
+ {$changeStream: {startAtOperationTime: testStartTime, allChangesForCluster: true}},
{$project: {"lsid.uid": 0}}
],
collection: 1
});
-cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges});
+assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges});
cst.cleanUp();
}());
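A detail worth noting in the test above: instead of using the clusterTime of the first observed change (which, for a multi-shard transaction, need not precede every other event), the test derives its start point from the insert's own $clusterTime. A minimal standalone sketch of the same trick, with an illustrative collection name:

// Per the test's approach: take the $clusterTime from the insert response and
// bump the Timestamp's increment, giving a start point strictly after the
// insert itself.
const res = assert.commandWorked(db.runCommand({
    insert: "example_coll",
    documents: [{_id: 0}],
    writeConcern: {w: "majority"}
}));
const startTime = res.$clusterTime.clusterTime;  // BSON Timestamp {t, i}.
startTime.i++;
// A stream started here sees events after the insert, but not the insert.
const stream = db.example_coll.watch([], {startAtOperationTime: startTime});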
diff --git a/jstests/change_streams/apply_ops_resumability.js b/jstests/change_streams/apply_ops_resumability.js
index bf581d40ee1..5bd0de87047 100644
--- a/jstests/change_streams/apply_ops_resumability.js
+++ b/jstests/change_streams/apply_ops_resumability.js
@@ -1,11 +1,14 @@
-// Tests that a change stream will correctly unwind applyOps entries generated by a transaction.
-// @tags: [uses_transactions]
+// Tests that a change stream will correctly unwind applyOps entries generated by a transaction, and
+// that we can resume from any point within the transaction.
+// @tags: [uses_transactions, requires_snapshot_read, requires_majority_read_concern]
(function() {
"use strict";
-load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
-load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+load("jstests/libs/auto_retry_transaction_in_sharding.js"); // For withTxnAndAutoRetryOnMongos.
+load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
+load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.isMongos.
const coll = assertDropAndRecreateCollection(db, "change_stream_apply_ops");
const otherCollName = "change_stream_apply_ops_2";
@@ -19,6 +22,9 @@ let cst = new ChangeStreamTest(db);
let changeStream = cst.startWatchingChanges(
{pipeline: [{$changeStream: {}}, {$project: {"lsid.uid": 0}}], collection: coll});
+// Record the clusterTime at the outset of the test, before any writes are performed.
+const testStartTime = db.isMaster().$clusterTime.clusterTime;
+
// Do an insert outside of a transaction.
assert.commandWorked(coll.insert({_id: 0, a: 123}));
@@ -26,32 +32,45 @@ assert.commandWorked(coll.insert({_id: 0, a: 123}));
const sessionOptions = {
causalConsistency: false
};
+const txnOptions = {
+ readConcern: {level: "snapshot"},
+ writeConcern: {w: "majority"}
+};
+
const session = db.getMongo().startSession(sessionOptions);
+
+// Create these variables before starting the transaction. In sharded passthroughs, accessing
+// db[collname] may attempt to implicitly shard the collection, which is not allowed in a txn.
const sessionDb = session.getDatabase(db.getName());
const sessionColl = sessionDb[coll.getName()];
+const sessionOtherColl = sessionDb[otherCollName];
+const sessionOtherDbColl = session.getDatabase(otherDbName)[otherDbCollName];
-session.startTransaction({readConcern: {level: "snapshot"}, writeConcern: {w: "majority"}});
-assert.commandWorked(sessionColl.insert({_id: 1, a: 0}));
-assert.commandWorked(sessionColl.insert({_id: 2, a: 0}));
+withTxnAndAutoRetryOnMongos(session, () => {
+ // Two inserts on the main test collection.
+ assert.commandWorked(sessionColl.insert({_id: 1, a: 0}));
+ assert.commandWorked(sessionColl.insert({_id: 2, a: 0}));
-// One insert on a collection that we're not watching. This should be skipped by the
-// single-collection change stream.
-assert.commandWorked(sessionDb[otherCollName].insert({_id: 111, a: "Doc on other collection"}));
+ // One insert on a collection that we're not watching. This should be skipped by the
+ // single-collection change stream.
+ assert.commandWorked(sessionOtherColl.insert({_id: 111, a: "Doc on other collection"}));
-// One insert on a collection in a different database. This should be skipped by the single
-// collection and single-db changestreams.
-assert.commandWorked(
- session.getDatabase(otherDbName)[otherDbCollName].insert({_id: 222, a: "Doc on other DB"}));
+ // One insert on a collection in a different database. This should be skipped by the single
+ // collection and single-db changestreams.
+ assert.commandWorked(sessionOtherDbColl.insert({_id: 222, a: "Doc on other DB"}));
-assert.commandWorked(sessionColl.updateOne({_id: 1}, {$inc: {a: 1}}));
-
-assert.commandWorked(session.commitTransaction_forTesting());
+ assert.commandWorked(sessionColl.updateOne({_id: 1}, {$inc: {a: 1}}));
+}, txnOptions);
// Now insert another document, not part of a transaction.
assert.commandWorked(coll.insert({_id: 3, a: 123}));
-// Define the set of changes expected for the single-collection case per the operations above.
-const expectedChanges = [
+// Drop the collection. This will trigger a "drop" event, which in the case of the single-collection
+// stream will be followed by an "invalidate".
+assert.commandWorked(db.runCommand({drop: coll.getName()}));
+
+// Define the set of all changes expected to be generated by the operations above.
+let expectedChanges = [
{
documentKey: {_id: 0},
fullDocument: {_id: 0, a: 123},
@@ -75,6 +94,22 @@ const expectedChanges = [
txnNumber: session.getTxnNumber_forTesting(),
},
{
+ documentKey: {_id: 111},
+ fullDocument: {_id: 111, a: "Doc on other collection"},
+ ns: {db: db.getName(), coll: otherCollName},
+ operationType: "insert",
+ lsid: session.getSessionId(),
+ txnNumber: session.getTxnNumber_forTesting(),
+ },
+ {
+ documentKey: {_id: 222},
+ fullDocument: {_id: 222, a: "Doc on other DB"},
+ ns: {db: otherDbName, coll: otherDbCollName},
+ operationType: "insert",
+ lsid: session.getSessionId(),
+ txnNumber: session.getTxnNumber_forTesting(),
+ },
+ {
documentKey: {_id: 1},
ns: {db: db.getName(), coll: coll.getName()},
operationType: "update",
@@ -88,18 +123,66 @@ const expectedChanges = [
ns: {db: db.getName(), coll: coll.getName()},
operationType: "insert",
},
+ {operationType: "drop", ns: {db: db.getName(), coll: coll.getName()}}
];
+// Validate that we observe all expected changes in the stream, and replace the 'expectedChanges'
+// list with the changes returned by ChangeStreamTest. These will include the _id resume tokens for
+// each change, so subsequent tests will be able to resume from any point.
+(function validateExpectedChangesAndPopulateResumeTokens() {
+ const wholeClusterCST = new ChangeStreamTest(db.getSiblingDB("admin"));
+ const wholeClusterCursor = wholeClusterCST.startWatchingChanges({
+ pipeline: [
+ {$changeStream: {startAtOperationTime: testStartTime, allChangesForCluster: true}},
+ {$project: {"lsid.uid": 0}}
+ ],
+ collection: 1
+ });
+ // If we are running in a sharded passthrough, then this may have been a multi-shard txn. Change
+ // streams will interleave the txn events from across the shards in (clusterTime, txnOpIndex)
+ // order, and so may not reflect the ordering of writes in the test. The ordering of events is
+ // important for later tests, so if we are running on mongoS we verify that exactly the expected
+ // set of events is observed, and then we adopt the order in which they were returned.
+ if (FixtureHelpers.isMongos(db)) {
+ expectedChanges = wholeClusterCST.assertNextChangesEqualUnordered(
+ {cursor: wholeClusterCursor, expectedChanges: expectedChanges});
+ } else {
+ expectedChanges = wholeClusterCST.assertNextChangesEqual(
+ {cursor: wholeClusterCursor, expectedChanges: expectedChanges});
+ }
+})();
+
+// Helper function to find the first non-transaction event and the first two transaction events in
+// the given list of change stream events.
+function findMilestoneEvents(eventList) {
+ const nonTxnIdx = eventList.findIndex(event => !event.lsid),
+ firstTxnIdx = eventList.findIndex(event => event.lsid),
+ secondTxnIdx = eventList.findIndex((event, idx) => (idx > firstTxnIdx && event.lsid));
+ // Return the array indices of each event, and the events themselves.
+ return [
+ nonTxnIdx,
+ firstTxnIdx,
+ secondTxnIdx,
+ eventList[nonTxnIdx],
+ eventList[firstTxnIdx],
+ eventList[secondTxnIdx]
+ ];
+}
+
//
// Test behavior of single-collection change streams with apply ops.
//
+// Filter out any events that aren't on the main test collection namespace.
+const expectedSingleCollChanges = expectedChanges.filter(
+ event => (event.ns.db === db.getName() && event.ns.coll === coll.getName()));
+
// Verify that the stream returns the expected sequence of changes.
-const changes =
- cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges});
+cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedSingleCollChanges});
-// Record the first (non-transaction) change and the first in-transaction change.
-const nonTxnChange = changes[0], firstTxnChange = changes[1], secondTxnChange = changes[2];
+// Obtain the first non-transaction change and the first two in-transaction changes.
+let [nonTxnIdx, firstTxnIdx, secondTxnIdx, nonTxnChange, firstTxnChange, secondTxnChange] =
+ findMilestoneEvents(expectedSingleCollChanges);
// Resume after the first non-transaction change. Be sure we see the documents from the
// transaction again.
@@ -107,16 +190,18 @@ changeStream = cst.startWatchingChanges({
pipeline: [{$changeStream: {resumeAfter: nonTxnChange._id}}, {$project: {"lsid.uid": 0}}],
collection: coll
});
-cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges.slice(1)});
+cst.assertNextChangesEqual(
+ {cursor: changeStream, expectedChanges: expectedSingleCollChanges.slice(nonTxnIdx + 1)});
// Resume after the first transaction change. Be sure we see the second change again.
changeStream = cst.startWatchingChanges({
pipeline: [{$changeStream: {resumeAfter: firstTxnChange._id}}, {$project: {"lsid.uid": 0}}],
collection: coll
});
-cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges.slice(2)});
+cst.assertNextChangesEqual(
+ {cursor: changeStream, expectedChanges: expectedSingleCollChanges.slice(firstTxnIdx + 1)});
-// Try starting another change stream from the _last_ change caused by the transaction. Verify
+// Try starting another change stream from the second change caused by the transaction. Verify
// that we can see the insert performed after the transaction was committed.
let otherCursor = cst.startWatchingChanges({
pipeline: [{$changeStream: {resumeAfter: secondTxnChange._id}}, {$project: {"lsid.uid": 0}}],
@@ -124,34 +209,28 @@ let otherCursor = cst.startWatchingChanges({
doNotModifyInPassthroughs: true // A collection drop only invalidates single-collection
// change streams.
});
-cst.assertNextChangesEqual({cursor: otherCursor, expectedChanges: expectedChanges.slice(3)});
+cst.assertNextChangesEqual(
+ {cursor: otherCursor, expectedChanges: expectedSingleCollChanges.slice(secondTxnIdx + 1)});
-// Drop the collection. This will trigger a "drop" followed by an "invalidate" for the single
-// collection change stream.
-assert.commandWorked(db.runCommand({drop: coll.getName()}));
-let change = cst.getOneChange(otherCursor);
-assert.eq(change.operationType, "drop");
-assert.eq(change.ns, {db: db.getName(), coll: coll.getName()});
-change = cst.getOneChange(otherCursor, true);
-assert.eq(change.operationType, "invalidate");
+// Verify that the next event observed by the stream is an invalidate following the collection drop.
+const invalidateEvent = cst.getOneChange(otherCursor, true);
+assert.eq(invalidateEvent.operationType, "invalidate");
//
// Test behavior of whole-db change streams with apply ops.
//
-// For a whole-db or whole-cluster change stream, the collection drop should return a single
-// "drop" entry and not invalidate the stream.
-expectedChanges.push({operationType: "drop", ns: {db: db.getName(), coll: coll.getName()}});
-
-// Add an entry for the insert on db.otherColl into expectedChanges.
-expectedChanges.splice(3, 0, {
- documentKey: {_id: 111},
- fullDocument: {_id: 111, a: "Doc on other collection"},
- ns: {db: db.getName(), coll: otherCollName},
- operationType: "insert",
- lsid: session.getSessionId(),
- txnNumber: session.getTxnNumber_forTesting(),
-});
+// In a sharded cluster, whole-db-or-cluster streams will see a collection drop from each shard.
+for (let i = 1; i < FixtureHelpers.numberOfShardsForCollection(coll); ++i) {
+ expectedChanges.push({operationType: "drop", ns: {db: db.getName(), coll: coll.getName()}});
+}
+
+// Filter out any events that aren't on the main test database.
+const expectedSingleDBChanges = expectedChanges.filter(event => (event.ns.db === db.getName()));
+
+// Obtain the first non-transaction change and the first two in-transaction changes.
+[nonTxnIdx, firstTxnIdx, secondTxnIdx, nonTxnChange, firstTxnChange, secondTxnChange] =
+ findMilestoneEvents(expectedSingleDBChanges);
// Verify that a whole-db stream can be resumed from the middle of the transaction, and that it
// will see all subsequent changes including the insert on the other collection but NOT the
@@ -160,17 +239,16 @@ changeStream = cst.startWatchingChanges({
pipeline: [{$changeStream: {resumeAfter: secondTxnChange._id}}, {$project: {"lsid.uid": 0}}],
collection: 1,
});
-cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges.slice(3)});
-
-// Add an entry for the insert on otherDb.otherDbColl into expectedChanges.
-expectedChanges.splice(4, 0, {
- documentKey: {_id: 222},
- fullDocument: {_id: 222, a: "Doc on other DB"},
- ns: {db: otherDbName, coll: otherDbCollName},
- operationType: "insert",
- lsid: session.getSessionId(),
- txnNumber: session.getTxnNumber_forTesting(),
-});
+cst.assertNextChangesEqual(
+ {cursor: changeStream, expectedChanges: expectedSingleDBChanges.slice(secondTxnIdx + 1)});
+
+//
+// Test behavior of whole-cluster change streams with apply ops.
+//
+
+// Obtain the first non-transaction change and the first two in-transaction changes.
+[nonTxnIdx, firstTxnIdx, secondTxnIdx, nonTxnChange, firstTxnChange, secondTxnChange] =
+ findMilestoneEvents(expectedChanges);
// Verify that a whole-cluster stream can be resumed from the middle of the transaction, and
// that it will see all subsequent changes including the insert on the other collection and the
@@ -183,7 +261,8 @@ changeStream = cst.startWatchingChanges({
],
collection: 1
});
-cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges.slice(3)});
+cst.assertNextChangesEqual(
+ {cursor: changeStream, expectedChanges: expectedChanges.slice(secondTxnIdx + 1)});
cst.cleanUp();
}());
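The resume assertions above rely on two facts: every change event carries its resume token in '_id', and a stream opened with 'resumeAfter' replays everything that followed that event, including the remaining entries of an unwound transaction. A minimal sketch of the pattern, with illustrative names:

let cursor = db.example_coll.watch();
assert.commandWorked(db.example_coll.insert({_id: 1}));
assert.soon(() => cursor.hasNext());
const firstEvent = cursor.next();

// Resuming after the first event yields only the events that followed it.
cursor = db.example_coll.watch([], {resumeAfter: firstEvent._id});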
diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js
index faa6a816077..e98eef10694 100644
--- a/jstests/change_streams/change_stream.js
+++ b/jstests/change_streams/change_stream.js
@@ -180,7 +180,7 @@ expected = [
operationType: "delete",
}
];
-cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
+cst.assertNextChangesEqualUnordered({cursor: cursor, expectedChanges: expected});
jsTestLog("Testing intervening write on another collection");
cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
diff --git a/jstests/change_streams/report_post_batch_resume_token.js b/jstests/change_streams/report_post_batch_resume_token.js
index 1e9a110c99f..e7e9e862f88 100644
--- a/jstests/change_streams/report_post_batch_resume_token.js
+++ b/jstests/change_streams/report_post_batch_resume_token.js
@@ -7,6 +7,7 @@
"use strict";
load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+load("jstests/libs/fixture_helpers.js"); // For isMongos.
// Drop and recreate collections to assure a clean run.
const collName = "report_post_batch_resume_token";
@@ -14,6 +15,13 @@ const testCollection = assertDropAndRecreateCollection(db, collName);
const otherCollection = assertDropAndRecreateCollection(db, "unrelated_" + collName);
const adminDB = db.getSiblingDB("admin");
+// Helper function which swallows an assertion if we are running on a sharded cluster.
+assert.eqIfNotMongos = function(val1, val2, errMsg) {
+ if (!FixtureHelpers.isMongos(db)) {
+ assert.eq(val1, val2, errMsg);
+ }
+};
+
let docId = 0; // Tracks _id of documents inserted to ensure that we do not duplicate.
const batchSize = 2;
@@ -31,16 +39,16 @@ assert.eq(csCursor.objsLeftInBatch(), 0);
initialAggPBRT = csCursor.getResumeToken();
assert.neq(undefined, initialAggPBRT);
-// Write some documents to the test collection.
+// Verify that no events are returned and the PBRT does not advance or go backwards, even as
+// documents are written into the collection.
for (let i = 0; i < 5; ++i) {
+ assert(!csCursor.hasNext()); // Causes a getMore to be dispatched.
+ const getMorePBRT = csCursor.getResumeToken();
+ // TODO SERVER-47810: this should also be true on mongoS.
+ assert.eqIfNotMongos(bsonWoCompare(initialAggPBRT, getMorePBRT), 0);
assert.commandWorked(testCollection.insert({_id: docId++}));
}
-// Verify that no events are returned and the PBRT does not advance or go backwards.
-assert(!csCursor.hasNext()); // Causes a getMore to be dispatched.
-let getMorePBRT = csCursor.getResumeToken();
-assert.eq(bsonWoCompare(initialAggPBRT, getMorePBRT), 0);
-
// Test that postBatchResumeToken is present on empty initial aggregate batch.
csCursor = testCollection.watch();
assert.eq(csCursor.objsLeftInBatch(), 0);
@@ -49,7 +57,7 @@ assert.neq(undefined, initialAggPBRT);
// Test that postBatchResumeToken is present on empty getMore batch.
assert(!csCursor.hasNext()); // Causes a getMore to be dispatched.
-getMorePBRT = csCursor.getResumeToken();
+let getMorePBRT = csCursor.getResumeToken();
assert.neq(undefined, getMorePBRT);
assert.gte(bsonWoCompare(getMorePBRT, initialAggPBRT), 0);
@@ -59,11 +67,13 @@ assert.commandWorked(testCollection.insert({_id: docId++}));
assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched.
assert(csCursor.objsLeftInBatch() == 1);
-// Because the retrieved event is the most recent entry in the oplog, the PBRT should be equal
-// to the resume token of the last item in the batch and greater than the initial PBRT.
+// Because the retrieved event is the most recent entry in the oplog, the PBRT should be equal to
+// the resume token of the last item in the batch and greater than the initial PBRT.
let resumeTokenFromDoc = csCursor.next()._id;
getMorePBRT = csCursor.getResumeToken();
-assert.eq(bsonWoCompare(getMorePBRT, resumeTokenFromDoc), 0);
+// When running in a sharded passthrough, we cannot guarantee that the retrieved event was the last
+// item in the oplog, and so we cannot assert that the PBRT is equal to the event's resume token.
+assert.eqIfNotMongos(bsonWoCompare(getMorePBRT, resumeTokenFromDoc), 0);
assert.gt(bsonWoCompare(getMorePBRT, initialAggPBRT), 0);
// Now seed the collection with enough documents to fit in two batches.
@@ -90,7 +100,9 @@ while (csCursor.objsLeftInBatch()) {
resumeTokenFromDoc = eventFromCursor._id;
}
getMorePBRT = csCursor.getResumeToken();
-assert.eq(bsonWoCompare(resumeTokenFromDoc, getMorePBRT), 0);
+// When running in a sharded passthrough, we cannot guarantee that the retrieved event was the last
+// item in the oplog, and so we cannot assert that the PBRT is equal to the event's resume token.
+assert.eqIfNotMongos(bsonWoCompare(resumeTokenFromDoc, getMorePBRT), 0);
assert.gt(bsonWoCompare(getMorePBRT, initialAggPBRT), 0);
// Test that postBatchResumeToken advances with writes to an unrelated collection. First make
@@ -140,6 +152,15 @@ assert.gte(bsonWoCompare(previousGetMorePBRT, resumeTokenFromDoc), 0);
assert.gt(bsonWoCompare(resumeTokenFromSecondDoc, previousGetMorePBRT), 0);
assert.gte(bsonWoCompare(getMorePBRT, resumeTokenFromSecondDoc), 0);
+// Sharded collection passthroughs use prepared transactions, which require majority read concern.
+// If the collection is sharded and majority read concern is disabled, skip the transaction tests.
+const rcCmdRes = testCollection.runCommand("find", {readConcern: {level: "majority"}});
+if (FixtureHelpers.isSharded(testCollection) &&
+ rcCmdRes.code === ErrorCodes.ReadConcernMajorityNotEnabled) {
+ jsTestLog("Skipping transaction tests since majority read concern is disabled.");
+ return;
+}
+
// Test that the PBRT is correctly updated when reading events from within a transaction.
csCursor = testCollection.watch([], {cursor: {batchSize: batchSize}});
const session = db.getMongo().startSession();
@@ -165,11 +186,16 @@ assert.eq(csCursor.objsLeftInBatch(), 2);
// The clusterTime should be the same on each, but the resume token keeps advancing.
const txnEvent1 = csCursor.next(), txnEvent2 = csCursor.next();
const txnClusterTime = txnEvent1.clusterTime;
-assert.eq(txnEvent2.clusterTime, txnClusterTime);
+// On a sharded cluster, the events in the txn may be spread across multiple shards. Events from
+// each shard will all have the same clusterTime, but the clusterTimes may differ between shards.
+// Therefore, we cannot guarantee that the clusterTime of txnEvent2 is always the same as the
+// clusterTime of txnEvent1, since the events may have occurred on different shards.
+assert.eqIfNotMongos(txnEvent2.clusterTime, txnClusterTime);
assert.gt(bsonWoCompare(txnEvent1._id, previousGetMorePBRT), 0);
assert.gt(bsonWoCompare(txnEvent2._id, txnEvent1._id), 0);
-// The PBRT of the first transaction batch is equal to the last document's resumeToken.
+// The PBRT of the first transaction batch is equal to the last document's resumeToken. We have
+// more events to return from the transaction, and so the PBRT cannot have advanced any further.
getMorePBRT = csCursor.getResumeToken();
assert.eq(bsonWoCompare(getMorePBRT, txnEvent2._id), 0);
@@ -181,7 +207,9 @@ assert.eq(csCursor.objsLeftInBatch(), 1);
// The clusterTime of this event is the same as the two events from the previous batch, but its
// resume token is greater than the previous PBRT.
const txnEvent3 = csCursor.next();
-assert.eq(txnEvent3.clusterTime, txnClusterTime);
+// As before, we cannot guarantee that the clusterTime of txnEvent3 is always the same as that of
+// txnEvent1 when running in a sharded cluster. However, the PBRT should advance in any deployment.
+assert.eqIfNotMongos(txnEvent3.clusterTime, txnClusterTime);
assert.gt(bsonWoCompare(txnEvent3._id, previousGetMorePBRT), 0);
// Because we wrote to the unrelated collection, the final event in the transaction does not
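For readers following the token comparisons in this file: bsonWoCompare() returns a negative, zero, or positive number, so these assertions encode an ordering on resume tokens. The invariant the test repeatedly checks, sketched in isolation with an illustrative collection name:

const csCursor = db.example_coll.watch();
const initialAggPBRT = csCursor.getResumeToken();  // PBRT from the initial aggregate.
assert(!csCursor.hasNext());                       // Dispatches a getMore.
const getMorePBRT = csCursor.getResumeToken();
// The postBatchResumeToken may advance while the stream is idle, but it must
// never move backwards.
assert.gte(bsonWoCompare(getMorePBRT, initialAggPBRT), 0);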
diff --git a/jstests/libs/auto_retry_transaction_in_sharding.js b/jstests/libs/auto_retry_transaction_in_sharding.js
new file mode 100644
index 00000000000..f81a366970a
--- /dev/null
+++ b/jstests/libs/auto_retry_transaction_in_sharding.js
@@ -0,0 +1,97 @@
+'use strict';
+
+load('jstests/concurrency/fsm_workload_helpers/auto_retry_transaction.js');
+Random.setRandomSeed();
+
+var {
+ withTxnAndAutoRetryOnMongos,
+ retryOnceOnTransientOnMongos,
+ retryOnceOnTransientAndRestartTxnOnMongos
+} = (() => {
+ /**
+ * Runs 'func' inside of a transaction started with 'txnOptions', and automatically retries
+ * until it either succeeds or the server returns a non-TransientTransactionError error
+ * response.
+ *
+ * The caller should take care to ensure 'func' doesn't modify any captured variables in a
+ * speculative fashion where calling it multiple times would lead to unintended behavior. The
+ * transaction started by the withTxnAndAutoRetryOnMongos() function is only known to have
+ * committed after the withTxnAndAutoRetryOnMongos() function returns.
+ *
+ * This behaviour only applies if the client is a mongos.
+ *
+ * TODO SERVER-39704: Once completed, the usages of this function should be revisited to
+ * determine whether it is still necessary, or whether the retries performed by mongoS make
+ * it unnecessary.
+ */
+ function withTxnAndAutoRetryOnMongos(session, func, txnOptions) {
+ if (session.getClient().isMongos()) {
+ withTxnAndAutoRetry(session, func, {txnOptions});
+ } else {
+ session.startTransaction(txnOptions);
+ func();
+ assert.commandWorked(session.commitTransaction_forTesting());
+ }
+ }
+
+ /**
+ * Runs 'func' and retries it only once if a transient error occurred.
+ *
+ * This behaviour only applies if the client is a mongos.
+ *
+ * TODO SERVER-39704: Once completed, the usages of this function should be revisited to
+ * determine whether it is still necessary, or whether the retries performed by mongoS make
+ * it unnecessary.
+ */
+ function retryOnceOnTransientOnMongos(session, func) {
+ if (session.getClient().isMongos()) {
+ try {
+ func();
+ } catch (e) {
+ if ((e.hasOwnProperty('errorLabels') &&
+ e.errorLabels.includes('TransientTransactionError'))) {
+ func();
+ } else {
+ throw e;
+ }
+ }
+ } else {
+ func();
+ }
+ }
+
+ /**
+ * Runs 'func' and retries it only once, restarting the transaction, if a transient
+ * error occurred.
+ *
+ * This behaviour only applies if the client is a mongos.
+ *
+ * TODO SERVER-39704: Once completed, the usages of this function should be revisited to
+ * determine whether it is still necessary, or whether the retries performed by mongoS make
+ * it unnecessary.
+ */
+ function retryOnceOnTransientAndRestartTxnOnMongos(session, func, txnOptions) {
+ if (session.getClient().isMongos()) {
+ try {
+ func();
+ } catch (e) {
+ if ((e.hasOwnProperty('errorLabels') &&
+ e.errorLabels.includes('TransientTransactionError'))) {
+ session.abortTransaction_forTesting();
+ session.startTransaction(txnOptions);
+ func();
+ } else {
+ throw e;
+ }
+ }
+ } else {
+ func();
+ }
+ }
+
+ return {
+ withTxnAndAutoRetryOnMongos,
+ retryOnceOnTransientOnMongos,
+ retryOnceOnTransientAndRestartTxnOnMongos
+ };
+})(); \ No newline at end of file
diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js
index c505e47f39f..7431a64627c 100644
--- a/jstests/libs/change_stream_util.js
+++ b/jstests/libs/change_stream_util.js
@@ -59,27 +59,28 @@ function assertInvalidateOp({cursor, opType}) {
return null;
}
+function pruneOptionalFields(event, expected) {
+ if (!expected.hasOwnProperty("_id"))
+ delete event._id;
+
+ if (!expected.hasOwnProperty("clusterTime"))
+ delete event.clusterTime;
+
+ if (!expected.hasOwnProperty("txnNumber"))
+ delete event.txnNumber;
+
+ if (!expected.hasOwnProperty("lsid"))
+ delete event.lsid;
+
+ return event;
+}
/**
* Helper to check whether a change stream event matches the given expected event. Ignores the
* resume token and clusterTime unless they are explicitly listed in the expectedEvent.
*/
function assertChangeStreamEventEq(actualEvent, expectedEvent) {
- const testEvent = Object.assign({}, actualEvent);
- if (!expectedEvent.hasOwnProperty("_id")) {
- delete testEvent._id; // Remove the resume token, if present.
- }
- if (!expectedEvent.hasOwnProperty("clusterTime")) {
- delete testEvent.clusterTime; // Remove the cluster time, if present.
- }
+ const testEvent = pruneOptionalFields(Object.assign({}, actualEvent), expectedEvent);
- // The change stream transaction passthrough causes operations to have txnNumber and lsid
- // values that the test doesn't expect, which can cause comparisons to fail.
- if (!expectedEvent.hasOwnProperty("txnNumber")) {
- delete testEvent.txnNumber; // Remove the txnNumber, if present.
- }
- if (!expectedEvent.hasOwnProperty("lsid")) {
- delete testEvent.lsid; // Remove the lsid, if present.
- }
assert.docEq(testEvent,
expectedEvent,
"Change did not match expected change. Expected change: " + tojson(expectedEvent) +
@@ -170,6 +171,34 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") {
return nextBatch[0];
}
+ self.getNextChanges = function(cursor, numChanges, skipFirst) {
+ let changes = [];
+
+ for (let i = 0; i < numChanges; i++) {
+ // Since the first change may be on the original cursor, we need to check for that
+ // change on the cursor before we move the cursor forward.
+ if (i === 0 && !skipFirst) {
+ changes[0] = getNextDocFromCursor(cursor);
+ if (changes[0]) {
+ continue;
+ }
+ }
+
+ assert.soon(
+ () => {
+ cursor = self.getNextBatch(cursor);
+ changes[i] = getNextDocFromCursor(cursor);
+ return changes[i] !== null;
+ },
+ () => {
+ return "timed out waiting for another result from the change stream, observed changes: " +
+ tojson(changes) + ", expected changes: " + numChanges;
+ });
+ }
+
+ return changes;
+ };
+
/**
* Checks if the change has been invalidated.
*/
@@ -205,8 +234,14 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") {
*
* Returns a list of the changes seen.
*/
- self.assertNextChangesEqual = function(
- {cursor, expectedChanges, expectedNumChanges, expectInvalidate, skipFirstBatch}) {
+ self.assertNextChangesEqual = function({
+ cursor,
+ expectedChanges,
+ expectedNumChanges,
+ expectInvalidate,
+ skipFirstBatch,
+ ignoreOrder
+ }) {
expectInvalidate = expectInvalidate || false;
skipFirstBatch = skipFirstBatch || false;
@@ -230,25 +265,23 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") {
expectedNumChanges = expectedChanges.length;
}
- let changes = [];
- for (let i = 0; i < expectedNumChanges; i++) {
- // Since the first change may be on the original cursor, we need to check for that
- // change on the cursor before we move the cursor forward.
- if (i === 0 && !skipFirstBatch) {
- changes[0] = getNextDocFromCursor(cursor);
- if (changes[0]) {
- assertChangeIsExpected(expectedChanges, 0, changes, expectInvalidate);
- continue;
- }
+ let changes = self.getNextChanges(cursor, expectedNumChanges, skipFirstBatch);
+ if (ignoreOrder) {
+ const errMsgFunc = () => `${tojson(changes)} != ${tojson(expectedChanges)}`;
+ assert.eq(changes.length, expectedNumChanges, errMsgFunc);
+ for (let i = 0; i < changes.length; i++) {
+ assert(expectedChanges.some(expectedChange => {
+ return _convertExceptionToReturnStatus(() => {
+ assertChangeStreamEventEq(changes[i], expectedChange);
+ return true;
+ })();
+ }),
+ errMsgFunc);
+ }
+ } else {
+ for (let i = 0; i < changes.length; i++) {
+ assertChangeIsExpected(expectedChanges, i, changes, expectInvalidate);
}
-
- assert.soon(function() {
- // We need to replace the cursor variable so we return the correct cursor.
- cursor = self.getNextBatch(cursor);
- changes[i] = getNextDocFromCursor(cursor);
- return changes[i] !== null;
- }, "timed out waiting for another result from the change stream");
- assertChangeIsExpected(expectedChanges, i, changes, expectInvalidate);
}
// If we expect invalidation, the final change should have operation type "invalidate".
@@ -265,6 +298,25 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") {
};
/**
+ * Iterates through the change stream and asserts that the next changes are the expected ones.
+ * The order of the change events from the cursor relative to their order in the list of
+ * expected changes is ignored, however.
+ *
+ * Returns a list of the changes seen.
+ */
+ self.assertNextChangesEqualUnordered = function(
+ {cursor, expectedChanges, expectedNumChanges, expectInvalidate, skipFirstBatch}) {
+ return self.assertNextChangesEqual({
+ cursor: cursor,
+ expectedChanges: expectedChanges,
+ expectedNumChanges: expectedNumChanges,
+ expectInvalidate: expectInvalidate,
+ skipFirstBatch: skipFirstBatch,
+ ignoreOrder: true
+ });
+ };
+
+ /**
* Retrieves the next batch in the change stream and confirms that it is empty.
*/
self.assertNoChange = function(cursor) {
@@ -277,12 +329,17 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") {
* If the current batch has a document in it, that one will be ignored.
*/
self.getOneChange = function(cursor, expectInvalidate = false) {
- changes = self.assertNextChangesEqual({
- cursor: cursor,
- expectedNumChanges: 1,
- expectInvalidate: expectInvalidate,
- skipFirstBatch: true
- });
+ changes = self.getNextChanges(cursor, 1, true);
+
+ if (expectInvalidate) {
+ assert(isInvalidated(changes[changes.length - 1]),
+ "Last change was not invalidated when it was expected: " + tojson(changes));
+
+ // We make sure that the next batch kills the cursor after an invalidation entry.
+ let finalCursor = self.getNextBatch(cursor);
+ assert.eq(finalCursor.id, 0, "Final cursor was not killed: " + tojson(finalCursor));
+ }
+
return changes[0];
};
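A sketch of how the converted tests consume the new unordered assertion, with illustrative names: on mongoS, events from a multi-shard transaction can arrive in a different order than the writes were issued, so the caller verifies the set of events rather than their sequence.

const cst = new ChangeStreamTest(db);
const cursor = cst.startWatchingChanges(
    {pipeline: [{$changeStream: {}}], collection: db.example_coll});
assert.commandWorked(db.example_coll.insert({_id: 1}));
cst.assertNextChangesEqualUnordered({
    cursor: cursor,
    expectedChanges: [{
        documentKey: {_id: 1},
        fullDocument: {_id: 1},
        ns: {db: db.getName(), coll: "example_coll"},
        operationType: "insert"
    }]
});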