summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Wahlin <james@mongodb.com>2019-03-20 09:36:58 -0400
committerJames Wahlin <james@mongodb.com>2019-03-21 15:22:15 -0400
commite3970d050b4ff6523317616e76c0dc97d87b332e (patch)
treee6c15432104f029f72675f5e8c362a3abe10ead0
parenta51f50784adaa0c86ace974247d4d0c088152f8e (diff)
downloadmongo-e3970d050b4ff6523317616e76c0dc97d87b332e.tar.gz
SERVER-39678 Comprehensive test for resuming a Change Stream with prepared transactions
-rw-r--r--buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml1
-rw-r--r--jstests/change_streams/change_stream.js6
-rw-r--r--jstests/libs/change_stream_util.js8
-rw-r--r--jstests/noPassthrough/change_stream_transaction.js209
-rw-r--r--jstests/sharding/change_stream_transaction_sharded.js268
5 files changed, 489 insertions, 3 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index a8b827152ce..e68178a3e81 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -32,6 +32,7 @@ selector:
- jstests/sharding/aggregation_internal_parameters.js
- jstests/sharding/agg_error_reports_shard_host_and_port.js
- jstests/sharding/change_stream_metadata_notifications.js
+ - jstests/sharding/change_stream_transaction_sharded.js
- jstests/sharding/change_streams.js
- jstests/sharding/collation_lookup.js
- jstests/sharding/collation_targeting.js
diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js
index 9f41255c599..396504f2439 100644
--- a/jstests/change_streams/change_stream.js
+++ b/jstests/change_streams/change_stream.js
@@ -186,7 +186,7 @@
cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
let t2cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t2});
assert.writeOK(db.t2.insert({_id: 100, c: 1}));
- cst.assertNextChangesEqual({cursor: cursor, expectedChanges: []});
+ cst.assertNoChange(cursor);
expected = {
documentKey: {_id: 100},
fullDocument: {_id: 100, c: 1},
@@ -208,8 +208,8 @@
const dne2cursor =
cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.dne2});
assert.writeOK(db.t2.insert({_id: 101, renameCollection: "test.dne1", to: "test.dne2"}));
- cst.assertNextChangesEqual({cursor: dne1cursor, expectedChanges: []});
- cst.assertNextChangesEqual({cursor: dne2cursor, expectedChanges: []});
+ cst.assertNoChange(dne1cursor);
+ cst.assertNoChange(dne2cursor);
if (!isMongos) {
jsTestLog("Ensuring attempt to read with legacy operations fails.");
diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js
index dbf30e515dd..685a94f8040 100644
--- a/jstests/libs/change_stream_util.js
+++ b/jstests/libs/change_stream_util.js
@@ -256,6 +256,14 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") {
};
/**
+ * Retrieves the next batch in the change stream and confirms that it is empty.
+ */
+ self.assertNoChange = function(cursor) {
+ cursor = self.getNextBatch(cursor);
+ assert.eq(0, cursor.nextBatch.length, () => "Cursor had changes: " + tojson(cursor));
+ };
+
+ /**
* Gets the next document in the change stream. This always executes a 'getMore' first.
* If the current batch has a document in it, that one will be ignored.
*/
diff --git a/jstests/noPassthrough/change_stream_transaction.js b/jstests/noPassthrough/change_stream_transaction.js
new file mode 100644
index 00000000000..b8d14d24a0a
--- /dev/null
+++ b/jstests/noPassthrough/change_stream_transaction.js
@@ -0,0 +1,209 @@
+// Confirms that change streams only see committed operations for prepared transactions.
+// @tags: [uses_transactions,uses_change_streams,requires_majority_read_concern]
+(function() {
+ "use strict";
+
+ load("jstests/core/txns/libs/prepare_helpers.js"); // For PrepareHelpers.
+
+ const dbName = "test";
+ const collName = "change_stream_transaction";
+
+ /**
+ * Asserts that the expected operation type and documentKey are found on the change stream
+ * cursor. Returns the change stream document.
+ */
+ function assertWriteVisible(cursor, operationType, documentKey) {
+ assert.soon(() => cursor.hasNext());
+ const changeDoc = cursor.next();
+ assert.eq(operationType, changeDoc.operationType, changeDoc);
+ assert.eq(documentKey, changeDoc.documentKey, changeDoc);
+ return changeDoc;
+ }
+
+ /**
+ * Asserts that the expected operation type and documentKey are found on the change stream
+ * cursor. Pushes the corresponding resume token and change stream document to an array.
+ */
+ function assertWriteVisibleWithCapture(cursor, operationType, documentKey, changeList) {
+ const changeDoc = assertWriteVisible(cursor, operationType, documentKey);
+ changeList.push(changeDoc);
+ }
+
+ /**
+ * Asserts that there are no changes waiting on the change stream cursor.
+ */
+ function assertNoChanges(cursor) {
+ assert(!cursor.hasNext(), () => {
+ return "Unexpected change set: " + tojson(cursor.toArray());
+ });
+ }
+
+ function runTest(conn) {
+ const db = conn.getDB(dbName);
+ const coll = db.getCollection(collName);
+ const unwatchedColl = db.getCollection(collName + "_unwatched");
+ let changeList = [];
+
+ // Collections must be created outside of any transaction.
+ assert.commandWorked(db.createCollection(coll.getName()));
+ assert.commandWorked(db.createCollection(unwatchedColl.getName()));
+
+ //
+ // Start transaction 1.
+ //
+ const session1 = db.getMongo().startSession();
+ const sessionDb1 = session1.getDatabase(dbName);
+ const sessionColl1 = sessionDb1[collName];
+ session1.startTransaction({readConcern: {level: "majority"}});
+
+ //
+ // Start transaction 2.
+ //
+ const session2 = db.getMongo().startSession();
+ const sessionDb2 = session2.getDatabase(dbName);
+ const sessionColl2 = sessionDb2[collName];
+ session2.startTransaction({readConcern: {level: "majority"}});
+
+ //
+ // Start transaction 3.
+ //
+ const session3 = db.getMongo().startSession();
+ const sessionDb3 = session3.getDatabase(dbName);
+ const sessionColl3 = sessionDb3[collName];
+ session3.startTransaction({readConcern: {level: "majority"}});
+
+ // Open a change stream on the test collection.
+ const changeStreamCursor = coll.watch();
+
+ // Insert a document and confirm that the change stream has it.
+ assert.commandWorked(coll.insert({_id: "no-txn-doc-1"}, {writeConcern: {w: "majority"}}));
+ assertWriteVisibleWithCapture(
+ changeStreamCursor, "insert", {_id: "no-txn-doc-1"}, changeList);
+
+ // Insert two documents under each transaction and confirm no change stream updates.
+ assert.commandWorked(sessionColl1.insert([{_id: "txn1-doc-1"}, {_id: "txn1-doc-2"}]));
+ assert.commandWorked(sessionColl2.insert([{_id: "txn2-doc-1"}, {_id: "txn2-doc-2"}]));
+ assertNoChanges(changeStreamCursor);
+
+ // Update one document under each transaction and confirm no change stream updates.
+ assert.commandWorked(sessionColl1.update({_id: "txn1-doc-1"}, {$set: {"updated": 1}}));
+ assert.commandWorked(sessionColl2.update({_id: "txn2-doc-1"}, {$set: {"updated": 1}}));
+ assertNoChanges(changeStreamCursor);
+
+ // Update and then remove the second doc under each transaction and confirm no change stream
+ // events are seen.
+ assert.commandWorked(
+ sessionColl1.update({_id: "txn1-doc-2"}, {$set: {"update-before-delete": 1}}));
+ assert.commandWorked(
+ sessionColl2.update({_id: "txn2-doc-2"}, {$set: {"update-before-delete": 1}}));
+ assert.commandWorked(sessionColl1.remove({_id: "txn1-doc-2"}));
+ assert.commandWorked(sessionColl2.remove({_id: "txn2-doc-2"}));
+ assertNoChanges(changeStreamCursor);
+
+ // Perform a write to the 'session1' transaction in a collection that is not being watched
+ // by 'changeStreamCursor'. We do not expect to see this write in the change stream either
+ // now or on commit.
+ assert.commandWorked(
+ sessionDb1[unwatchedColl.getName()].insert({_id: "txn1-doc-unwatched-collection"}));
+ assertNoChanges(changeStreamCursor);
+
+ // Perform a write to the 'session3' transaction in a collection that is not being watched
+ // by 'changeStreamCursor'. We do not expect to see this write in the change stream either
+ // now or on commit.
+ assert.commandWorked(
+ sessionDb3[unwatchedColl.getName()].insert({_id: "txn3-doc-unwatched-collection"}));
+ assertNoChanges(changeStreamCursor);
+
+ // Perform a write outside of a transaction and confirm that the change stream sees only
+ // this write.
+ assert.commandWorked(coll.insert({_id: "no-txn-doc-2"}, {writeConcern: {w: "majority"}}));
+ assertWriteVisibleWithCapture(
+ changeStreamCursor, "insert", {_id: "no-txn-doc-2"}, changeList);
+ assertNoChanges(changeStreamCursor);
+
+ let prepareTimestampTxn1;
+ prepareTimestampTxn1 = PrepareHelpers.prepareTransaction(session1);
+ assertNoChanges(changeStreamCursor);
+
+ // TODO SERVER-39036: Change writeConcern to majority. Prior to this ticket a majority write
+ // will block on a prepared transaction. We should also be able to move the check for
+ // document existence prior to the transaction commit with this change.
+ // Perform a write at writeConcern w: local.
+ assert.commandWorked(coll.insert({_id: "no-txn-doc-3"}, {writeConcern: {w: 1}}));
+
+ //
+ // Commit first transaction and confirm expected changes.
+ //
+ assert.commandWorked(
+ PrepareHelpers.commitTransactionAfterPrepareTS(session1, prepareTimestampTxn1));
+ assertWriteVisibleWithCapture(
+ changeStreamCursor, "insert", {_id: "no-txn-doc-3"}, changeList);
+ assertWriteVisibleWithCapture(
+ changeStreamCursor, "insert", {_id: "txn1-doc-1"}, changeList);
+ assertWriteVisibleWithCapture(
+ changeStreamCursor, "insert", {_id: "txn1-doc-2"}, changeList);
+ assertWriteVisibleWithCapture(
+ changeStreamCursor, "update", {_id: "txn1-doc-1"}, changeList);
+ assertWriteVisibleWithCapture(
+ changeStreamCursor, "update", {_id: "txn1-doc-2"}, changeList);
+ assertWriteVisibleWithCapture(
+ changeStreamCursor, "delete", {_id: "txn1-doc-2"}, changeList);
+ assertNoChanges(changeStreamCursor);
+
+ // Transition the second transaction to prepared. We skip capturing the prepare
+ // timestamp it is not required for abortTransaction().
+ PrepareHelpers.prepareTransaction(session2);
+ assertNoChanges(changeStreamCursor);
+
+ // TODO SERVER-39036: Change writeConcern to majority. Prior to this ticket a majority write
+ // will block on a prepared transaction. We should also be able to move the check for
+ // document existence prior to the transaction abort with this change.
+ // Perform a write at writeConcern w: local.
+ assert.commandWorked(coll.insert({_id: "no-txn-doc-4"}, {writeConcern: {w: 1}}));
+
+ //
+ // Abort second transaction.
+ //
+ session2.abortTransaction();
+ assertWriteVisibleWithCapture(
+ changeStreamCursor, "insert", {_id: "no-txn-doc-4"}, changeList);
+ assertNoChanges(changeStreamCursor);
+ changeStreamCursor.close();
+
+ // Test that change stream resume returns the expected set of documents at each point
+ // captured by this test.
+ for (let i = 0; i < changeList.length; ++i) {
+ const resumeCursor = coll.watch([], {startAfter: changeList[i]._id});
+
+ for (let x = (i + 1); x < changeList.length; ++x) {
+ const expectedChangeDoc = changeList[x];
+ assertWriteVisible(
+ resumeCursor, expectedChangeDoc.operationType, expectedChangeDoc.documentKey);
+ }
+
+ assertNoChanges(resumeCursor);
+ resumeCursor.close();
+ }
+
+ //
+ // Prepare and commit the third transaction and confirm that there are no visible changes.
+ //
+ let prepareTimestampTxn3;
+ prepareTimestampTxn3 = PrepareHelpers.prepareTransaction(session3);
+ assertNoChanges(changeStreamCursor);
+
+ assert.commandWorked(
+ PrepareHelpers.commitTransactionAfterPrepareTS(session3, prepareTimestampTxn3));
+ assertNoChanges(changeStreamCursor);
+
+ assert.commandWorked(db.dropDatabase());
+ }
+
+ const rst = new ReplSetTest({nodes: 1});
+ rst.startSet();
+ rst.initiate();
+
+ runTest(rst.getPrimary());
+
+ rst.stopSet();
+})();
diff --git a/jstests/sharding/change_stream_transaction_sharded.js b/jstests/sharding/change_stream_transaction_sharded.js
new file mode 100644
index 00000000000..8d7349e653d
--- /dev/null
+++ b/jstests/sharding/change_stream_transaction_sharded.js
@@ -0,0 +1,268 @@
+// Confirms that change streams only see committed operations for sharded transactions.
+// @tags: [
+// requires_sharding,
+// uses_change_streams,
+// uses_multi_shard_transaction,
+// uses_transactions,
+// ]
+(function() {
+ "use strict";
+
+ const dbName = "test";
+ const collName = "change_stream_transaction_sharded";
+ const namespace = dbName + "." + collName;
+
+ const st = new ShardingTest({
+ shards: 2,
+ rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}}
+ });
+
+ const mongosConn = st.s;
+ assert.commandWorked(mongosConn.getDB(dbName).getCollection(collName).createIndex({shard: 1}));
+ st.ensurePrimaryShard(dbName, st.shard0.shardName);
+ // Shard the test collection and split it into two chunks: one that contains all {shard: 1}
+ // documents and one that contains all {shard: 2} documents.
+ st.shardColl(collName,
+ {shard: 1} /* shard key */,
+ {shard: 2} /* split at */,
+ {shard: 2} /* move the chunk containing {shard: 2} to its own shard */,
+ dbName,
+ true);
+ // Seed each chunk with an initial document.
+ assert.commandWorked(mongosConn.getDB(dbName).getCollection(collName).insert(
+ {shard: 1}, {writeConcern: {w: "majority"}}));
+ assert.commandWorked(mongosConn.getDB(dbName).getCollection(collName).insert(
+ {shard: 2}, {writeConcern: {w: "majority"}}));
+
+ const db = mongosConn.getDB(dbName);
+ const coll = db.getCollection(collName);
+ let changeListShard1 = [], changeListShard2 = [];
+
+ //
+ // Start transaction 1.
+ //
+ const session1 = db.getMongo().startSession({causalConsistency: true});
+ const sessionDb1 = session1.getDatabase(dbName);
+ const sessionColl1 = sessionDb1[collName];
+ session1.startTransaction({readConcern: {level: "majority"}});
+
+ //
+ // Start transaction 2.
+ //
+ const session2 = db.getMongo().startSession({causalConsistency: true});
+ const sessionDb2 = session2.getDatabase(dbName);
+ const sessionColl2 = sessionDb2[collName];
+ session2.startTransaction({readConcern: {level: "majority"}});
+
+ /**
+ * Asserts that there are no changes waiting on the change stream cursor.
+ */
+ function assertNoChanges(cursor) {
+ assert(!cursor.hasNext(), () => {
+ return "Unexpected change set: " + tojson(cursor.toArray());
+ });
+ }
+
+ //
+ // Perform writes both in and outside of transactions and confirm that the changes expected are
+ // returned by the change stream.
+ //
+ (function() {
+ /**
+ * Asserts that the expected changes are found on the change stream cursor. Pushes the
+ * corresponding change stream document (with resume token) to an array. When expected
+ * changes are provided for both shards, we must assume that either shard's changes could
+ * come first or that they are interleaved via applyOps index. This is because a cross shard
+ * transaction may commit at a different cluster time on each shard, which impacts the
+ * ordering of the change stream.
+ */
+ function assertWritesVisibleWithCapture(cursor,
+ expectedChangesShard1,
+ expectedChangesShard2,
+ changeCaptureListShard1,
+ changeCaptureListShard2) {
+ function assertChangeEqualWithCapture(changeDoc, expectedChange, changeCaptureList) {
+ assert.eq(expectedChange.operationType, changeDoc.operationType);
+ assert.eq(expectedChange._id, changeDoc.documentKey._id);
+ changeCaptureList.push(changeDoc);
+ }
+
+ while (expectedChangesShard1.length || expectedChangesShard2.length) {
+ assert.soon(() => cursor.hasNext());
+ const changeDoc = cursor.next();
+
+ if (changeDoc.documentKey.shard === 1) {
+ assert(expectedChangesShard1.length);
+ assertChangeEqualWithCapture(
+ changeDoc, expectedChangesShard1[0], changeCaptureListShard1);
+ expectedChangesShard1.shift();
+ } else {
+ assert(changeDoc.documentKey.shard === 2);
+ assert(expectedChangesShard2.length);
+ assertChangeEqualWithCapture(
+ changeDoc, expectedChangesShard2[0], changeCaptureListShard2);
+ expectedChangesShard2.shift();
+ }
+ }
+
+ assertNoChanges(cursor);
+ }
+
+ // Open a change stream on the test collection.
+ const changeStreamCursor = coll.watch();
+
+ // Insert a document and confirm that the change stream has it.
+ assert.commandWorked(
+ coll.insert({shard: 1, _id: "no-txn-doc-1"}, {writeConcern: {w: "majority"}}));
+ assertWritesVisibleWithCapture(changeStreamCursor,
+ [{operationType: "insert", _id: "no-txn-doc-1"}],
+ [],
+ changeListShard1,
+ changeListShard2);
+
+ // Insert two documents under each transaction and confirm no change stream updates.
+ assert.commandWorked(
+ sessionColl1.insert([{shard: 1, _id: "txn1-doc-1"}, {shard: 2, _id: "txn1-doc-2"}]));
+ assert.commandWorked(
+ sessionColl2.insert([{shard: 1, _id: "txn2-doc-1"}, {shard: 2, _id: "txn2-doc-2"}]));
+ assertNoChanges(changeStreamCursor);
+
+ // Update one document under each transaction and confirm no change stream updates.
+ assert.commandWorked(
+ sessionColl1.update({shard: 1, _id: "txn1-doc-1"}, {$set: {"updated": 1}}));
+ assert.commandWorked(
+ sessionColl2.update({shard: 2, _id: "txn2-doc-2"}, {$set: {"updated": 1}}));
+ assertNoChanges(changeStreamCursor);
+
+ // Update and then remove second doc under each transaction.
+ assert.commandWorked(sessionColl1.update({shard: 2, _id: "txn1-doc-2"},
+ {$set: {"update-before-delete": 1}}));
+ assert.commandWorked(sessionColl2.update({shard: 1, _id: "txn2-doc-1"},
+ {$set: {"update-before-delete": 1}}));
+ assert.commandWorked(sessionColl1.remove({shard: 2, _id: "txn1-doc-2"}));
+ assert.commandWorked(sessionColl2.remove({shard: 1, _id: "txn2-doc-2"}));
+ assertNoChanges(changeStreamCursor);
+
+ // Perform a write outside of a transaction and confirm that the change stream sees only
+ // this write.
+ assert.commandWorked(
+ coll.insert({shard: 2, _id: "no-txn-doc-2"}, {writeConcern: {w: "majority"}}));
+ assertWritesVisibleWithCapture(changeStreamCursor,
+ [],
+ [{operationType: "insert", _id: "no-txn-doc-2"}],
+ changeListShard1,
+ changeListShard2);
+ assertNoChanges(changeStreamCursor);
+
+ // Perform a write outside of the transaction.
+ assert.commandWorked(
+ coll.insert({shard: 1, _id: "no-txn-doc-3"}, {writeConcern: {w: "majority"}}));
+
+ // Commit first transaction and confirm that the change stream sees the changes expected
+ // from each shard.
+ session1.commitTransaction();
+ assertWritesVisibleWithCapture(changeStreamCursor,
+ [
+ {operationType: "insert", _id: "no-txn-doc-3"},
+ {operationType: "insert", _id: "txn1-doc-1"},
+ {operationType: "update", _id: "txn1-doc-1"}
+ ],
+ [
+ {operationType: "insert", _id: "txn1-doc-2"},
+ {operationType: "update", _id: "txn1-doc-2"},
+ {operationType: "delete", _id: "txn1-doc-2"}
+ ],
+ changeListShard1,
+ changeListShard2);
+ assertNoChanges(changeStreamCursor);
+
+ // Perform a write outside of the transaction.
+ assert.commandWorked(
+ coll.insert({shard: 2, _id: "no-txn-doc-4"}, {writeConcern: {w: "majority"}}));
+
+ // Abort second transaction and confirm that the change stream sees only the previous
+ // non-transaction write.
+ session2.abortTransaction();
+ assertWritesVisibleWithCapture(changeStreamCursor,
+ [],
+ [{operationType: "insert", _id: "no-txn-doc-4"}],
+ changeListShard1,
+ changeListShard2);
+ assertNoChanges(changeStreamCursor);
+ changeStreamCursor.close();
+ })();
+
+ //
+ // Open a change stream at each resume point captured for the previous writes. Confirm that the
+ // documents returned match what was returned for the initial change stream.
+ //
+ (function() {
+
+ /**
+ * Iterates over a list of changes and returns the index of the change whose resume token is
+ * higher than that of 'changeDoc'. It is expected that 'changeList' entries at this index
+ * and beyond will be included in a change stream resumed at 'changeDoc._id'.
+ */
+ function getPostTokenChangeIndex(changeDoc, changeList) {
+ for (let i = 0; i < changeList.length; ++i) {
+ if (changeDoc._id._data < changeList[i]._id._data) {
+ return i;
+ }
+ }
+
+ return changeList.length;
+ }
+
+ /**
+ * Confirms that the change represented by 'changeDoc' exists in 'shardChangeList' at index
+ * 'changeListIndex'.
+ */
+ function shardHasDocumentAtChangeListIndex(changeDoc, shardChangeList, changeListIndex) {
+ assert(changeListIndex < shardChangeList.length);
+
+ const expectedChangeDoc = shardChangeList[changeListIndex];
+ assert.eq(changeDoc, expectedChangeDoc);
+ assert.eq(expectedChangeDoc.documentKey,
+ changeDoc.documentKey,
+ tojson(changeDoc) + ", " + tojson(expectedChangeDoc));
+ }
+
+ /**
+ * Test that change stream returns the expected set of documuments when resumed from each
+ * point captured by 'changeList'.
+ */
+ function confirmResumeForChangeList(changeList, changeListShard1, changeListShard2) {
+ for (let i = 0; i < changeList.length; ++i) {
+ const resumeDoc = changeList[i];
+ let indexShard1 = getPostTokenChangeIndex(resumeDoc, changeListShard1);
+ let indexShard2 = getPostTokenChangeIndex(resumeDoc, changeListShard2);
+ const resumeCursor = coll.watch([], {startAfter: resumeDoc._id});
+
+ while ((indexShard1 + indexShard2) <
+ (changeListShard1.length + changeListShard2.length)) {
+ assert.soon(() => resumeCursor.hasNext());
+ const changeDoc = resumeCursor.next();
+
+ if (changeDoc.documentKey.shard === 1) {
+ shardHasDocumentAtChangeListIndex(
+ changeDoc, changeListShard1, indexShard1++);
+ } else {
+ assert(changeDoc.documentKey.shard === 2);
+ shardHasDocumentAtChangeListIndex(
+ changeDoc, changeListShard2, indexShard2++);
+ }
+ }
+
+ assertNoChanges(resumeCursor);
+ resumeCursor.close();
+ }
+ }
+
+ // Confirm that the sequence of events returned by the stream is consistent when resuming
+ // from any point in the stream on either shard.
+ confirmResumeForChangeList(changeListShard1, changeListShard1, changeListShard2);
+ confirmResumeForChangeList(changeListShard2, changeListShard1, changeListShard2);
+ })();
+
+ st.stop();
+})();