diff options
author | James Wahlin <james@mongodb.com> | 2019-03-20 09:36:58 -0400 |
---|---|---|
committer | James Wahlin <james@mongodb.com> | 2019-03-21 15:22:15 -0400 |
commit | e3970d050b4ff6523317616e76c0dc97d87b332e (patch) | |
tree | e6c15432104f029f72675f5e8c362a3abe10ead0 | |
parent | a51f50784adaa0c86ace974247d4d0c088152f8e (diff) | |
download | mongo-e3970d050b4ff6523317616e76c0dc97d87b332e.tar.gz |
SERVER-39678 Comprehensive test for resuming a Change Stream with prepared transactions
5 files changed, 489 insertions, 3 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml index a8b827152ce..e68178a3e81 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml @@ -32,6 +32,7 @@ selector: - jstests/sharding/aggregation_internal_parameters.js - jstests/sharding/agg_error_reports_shard_host_and_port.js - jstests/sharding/change_stream_metadata_notifications.js + - jstests/sharding/change_stream_transaction_sharded.js - jstests/sharding/change_streams.js - jstests/sharding/collation_lookup.js - jstests/sharding/collation_targeting.js diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js index 9f41255c599..396504f2439 100644 --- a/jstests/change_streams/change_stream.js +++ b/jstests/change_streams/change_stream.js @@ -186,7 +186,7 @@ cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); let t2cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t2}); assert.writeOK(db.t2.insert({_id: 100, c: 1})); - cst.assertNextChangesEqual({cursor: cursor, expectedChanges: []}); + cst.assertNoChange(cursor); expected = { documentKey: {_id: 100}, fullDocument: {_id: 100, c: 1}, @@ -208,8 +208,8 @@ const dne2cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.dne2}); assert.writeOK(db.t2.insert({_id: 101, renameCollection: "test.dne1", to: "test.dne2"})); - cst.assertNextChangesEqual({cursor: dne1cursor, expectedChanges: []}); - cst.assertNextChangesEqual({cursor: dne2cursor, expectedChanges: []}); + cst.assertNoChange(dne1cursor); + cst.assertNoChange(dne2cursor); if (!isMongos) { jsTestLog("Ensuring attempt to read with legacy operations fails."); diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js index dbf30e515dd..685a94f8040 100644 --- a/jstests/libs/change_stream_util.js +++ b/jstests/libs/change_stream_util.js @@ -256,6 +256,14 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") { }; /** + * Retrieves the next batch in the change stream and confirms that it is empty. + */ + self.assertNoChange = function(cursor) { + cursor = self.getNextBatch(cursor); + assert.eq(0, cursor.nextBatch.length, () => "Cursor had changes: " + tojson(cursor)); + }; + + /** * Gets the next document in the change stream. This always executes a 'getMore' first. * If the current batch has a document in it, that one will be ignored. */ diff --git a/jstests/noPassthrough/change_stream_transaction.js b/jstests/noPassthrough/change_stream_transaction.js new file mode 100644 index 00000000000..b8d14d24a0a --- /dev/null +++ b/jstests/noPassthrough/change_stream_transaction.js @@ -0,0 +1,209 @@ +// Confirms that change streams only see committed operations for prepared transactions. +// @tags: [uses_transactions,uses_change_streams,requires_majority_read_concern] +(function() { + "use strict"; + + load("jstests/core/txns/libs/prepare_helpers.js"); // For PrepareHelpers. + + const dbName = "test"; + const collName = "change_stream_transaction"; + + /** + * Asserts that the expected operation type and documentKey are found on the change stream + * cursor. Returns the change stream document. + */ + function assertWriteVisible(cursor, operationType, documentKey) { + assert.soon(() => cursor.hasNext()); + const changeDoc = cursor.next(); + assert.eq(operationType, changeDoc.operationType, changeDoc); + assert.eq(documentKey, changeDoc.documentKey, changeDoc); + return changeDoc; + } + + /** + * Asserts that the expected operation type and documentKey are found on the change stream + * cursor. Pushes the corresponding resume token and change stream document to an array. + */ + function assertWriteVisibleWithCapture(cursor, operationType, documentKey, changeList) { + const changeDoc = assertWriteVisible(cursor, operationType, documentKey); + changeList.push(changeDoc); + } + + /** + * Asserts that there are no changes waiting on the change stream cursor. + */ + function assertNoChanges(cursor) { + assert(!cursor.hasNext(), () => { + return "Unexpected change set: " + tojson(cursor.toArray()); + }); + } + + function runTest(conn) { + const db = conn.getDB(dbName); + const coll = db.getCollection(collName); + const unwatchedColl = db.getCollection(collName + "_unwatched"); + let changeList = []; + + // Collections must be created outside of any transaction. + assert.commandWorked(db.createCollection(coll.getName())); + assert.commandWorked(db.createCollection(unwatchedColl.getName())); + + // + // Start transaction 1. + // + const session1 = db.getMongo().startSession(); + const sessionDb1 = session1.getDatabase(dbName); + const sessionColl1 = sessionDb1[collName]; + session1.startTransaction({readConcern: {level: "majority"}}); + + // + // Start transaction 2. + // + const session2 = db.getMongo().startSession(); + const sessionDb2 = session2.getDatabase(dbName); + const sessionColl2 = sessionDb2[collName]; + session2.startTransaction({readConcern: {level: "majority"}}); + + // + // Start transaction 3. + // + const session3 = db.getMongo().startSession(); + const sessionDb3 = session3.getDatabase(dbName); + const sessionColl3 = sessionDb3[collName]; + session3.startTransaction({readConcern: {level: "majority"}}); + + // Open a change stream on the test collection. + const changeStreamCursor = coll.watch(); + + // Insert a document and confirm that the change stream has it. + assert.commandWorked(coll.insert({_id: "no-txn-doc-1"}, {writeConcern: {w: "majority"}})); + assertWriteVisibleWithCapture( + changeStreamCursor, "insert", {_id: "no-txn-doc-1"}, changeList); + + // Insert two documents under each transaction and confirm no change stream updates. + assert.commandWorked(sessionColl1.insert([{_id: "txn1-doc-1"}, {_id: "txn1-doc-2"}])); + assert.commandWorked(sessionColl2.insert([{_id: "txn2-doc-1"}, {_id: "txn2-doc-2"}])); + assertNoChanges(changeStreamCursor); + + // Update one document under each transaction and confirm no change stream updates. + assert.commandWorked(sessionColl1.update({_id: "txn1-doc-1"}, {$set: {"updated": 1}})); + assert.commandWorked(sessionColl2.update({_id: "txn2-doc-1"}, {$set: {"updated": 1}})); + assertNoChanges(changeStreamCursor); + + // Update and then remove the second doc under each transaction and confirm no change stream + // events are seen. + assert.commandWorked( + sessionColl1.update({_id: "txn1-doc-2"}, {$set: {"update-before-delete": 1}})); + assert.commandWorked( + sessionColl2.update({_id: "txn2-doc-2"}, {$set: {"update-before-delete": 1}})); + assert.commandWorked(sessionColl1.remove({_id: "txn1-doc-2"})); + assert.commandWorked(sessionColl2.remove({_id: "txn2-doc-2"})); + assertNoChanges(changeStreamCursor); + + // Perform a write to the 'session1' transaction in a collection that is not being watched + // by 'changeStreamCursor'. We do not expect to see this write in the change stream either + // now or on commit. + assert.commandWorked( + sessionDb1[unwatchedColl.getName()].insert({_id: "txn1-doc-unwatched-collection"})); + assertNoChanges(changeStreamCursor); + + // Perform a write to the 'session3' transaction in a collection that is not being watched + // by 'changeStreamCursor'. We do not expect to see this write in the change stream either + // now or on commit. + assert.commandWorked( + sessionDb3[unwatchedColl.getName()].insert({_id: "txn3-doc-unwatched-collection"})); + assertNoChanges(changeStreamCursor); + + // Perform a write outside of a transaction and confirm that the change stream sees only + // this write. + assert.commandWorked(coll.insert({_id: "no-txn-doc-2"}, {writeConcern: {w: "majority"}})); + assertWriteVisibleWithCapture( + changeStreamCursor, "insert", {_id: "no-txn-doc-2"}, changeList); + assertNoChanges(changeStreamCursor); + + let prepareTimestampTxn1; + prepareTimestampTxn1 = PrepareHelpers.prepareTransaction(session1); + assertNoChanges(changeStreamCursor); + + // TODO SERVER-39036: Change writeConcern to majority. Prior to this ticket a majority write + // will block on a prepared transaction. We should also be able to move the check for + // document existence prior to the transaction commit with this change. + // Perform a write at writeConcern w: local. + assert.commandWorked(coll.insert({_id: "no-txn-doc-3"}, {writeConcern: {w: 1}})); + + // + // Commit first transaction and confirm expected changes. + // + assert.commandWorked( + PrepareHelpers.commitTransactionAfterPrepareTS(session1, prepareTimestampTxn1)); + assertWriteVisibleWithCapture( + changeStreamCursor, "insert", {_id: "no-txn-doc-3"}, changeList); + assertWriteVisibleWithCapture( + changeStreamCursor, "insert", {_id: "txn1-doc-1"}, changeList); + assertWriteVisibleWithCapture( + changeStreamCursor, "insert", {_id: "txn1-doc-2"}, changeList); + assertWriteVisibleWithCapture( + changeStreamCursor, "update", {_id: "txn1-doc-1"}, changeList); + assertWriteVisibleWithCapture( + changeStreamCursor, "update", {_id: "txn1-doc-2"}, changeList); + assertWriteVisibleWithCapture( + changeStreamCursor, "delete", {_id: "txn1-doc-2"}, changeList); + assertNoChanges(changeStreamCursor); + + // Transition the second transaction to prepared. We skip capturing the prepare + // timestamp it is not required for abortTransaction(). + PrepareHelpers.prepareTransaction(session2); + assertNoChanges(changeStreamCursor); + + // TODO SERVER-39036: Change writeConcern to majority. Prior to this ticket a majority write + // will block on a prepared transaction. We should also be able to move the check for + // document existence prior to the transaction abort with this change. + // Perform a write at writeConcern w: local. + assert.commandWorked(coll.insert({_id: "no-txn-doc-4"}, {writeConcern: {w: 1}})); + + // + // Abort second transaction. + // + session2.abortTransaction(); + assertWriteVisibleWithCapture( + changeStreamCursor, "insert", {_id: "no-txn-doc-4"}, changeList); + assertNoChanges(changeStreamCursor); + changeStreamCursor.close(); + + // Test that change stream resume returns the expected set of documents at each point + // captured by this test. + for (let i = 0; i < changeList.length; ++i) { + const resumeCursor = coll.watch([], {startAfter: changeList[i]._id}); + + for (let x = (i + 1); x < changeList.length; ++x) { + const expectedChangeDoc = changeList[x]; + assertWriteVisible( + resumeCursor, expectedChangeDoc.operationType, expectedChangeDoc.documentKey); + } + + assertNoChanges(resumeCursor); + resumeCursor.close(); + } + + // + // Prepare and commit the third transaction and confirm that there are no visible changes. + // + let prepareTimestampTxn3; + prepareTimestampTxn3 = PrepareHelpers.prepareTransaction(session3); + assertNoChanges(changeStreamCursor); + + assert.commandWorked( + PrepareHelpers.commitTransactionAfterPrepareTS(session3, prepareTimestampTxn3)); + assertNoChanges(changeStreamCursor); + + assert.commandWorked(db.dropDatabase()); + } + + const rst = new ReplSetTest({nodes: 1}); + rst.startSet(); + rst.initiate(); + + runTest(rst.getPrimary()); + + rst.stopSet(); +})(); diff --git a/jstests/sharding/change_stream_transaction_sharded.js b/jstests/sharding/change_stream_transaction_sharded.js new file mode 100644 index 00000000000..8d7349e653d --- /dev/null +++ b/jstests/sharding/change_stream_transaction_sharded.js @@ -0,0 +1,268 @@ +// Confirms that change streams only see committed operations for sharded transactions. +// @tags: [ +// requires_sharding, +// uses_change_streams, +// uses_multi_shard_transaction, +// uses_transactions, +// ] +(function() { + "use strict"; + + const dbName = "test"; + const collName = "change_stream_transaction_sharded"; + const namespace = dbName + "." + collName; + + const st = new ShardingTest({ + shards: 2, + rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}} + }); + + const mongosConn = st.s; + assert.commandWorked(mongosConn.getDB(dbName).getCollection(collName).createIndex({shard: 1})); + st.ensurePrimaryShard(dbName, st.shard0.shardName); + // Shard the test collection and split it into two chunks: one that contains all {shard: 1} + // documents and one that contains all {shard: 2} documents. + st.shardColl(collName, + {shard: 1} /* shard key */, + {shard: 2} /* split at */, + {shard: 2} /* move the chunk containing {shard: 2} to its own shard */, + dbName, + true); + // Seed each chunk with an initial document. + assert.commandWorked(mongosConn.getDB(dbName).getCollection(collName).insert( + {shard: 1}, {writeConcern: {w: "majority"}})); + assert.commandWorked(mongosConn.getDB(dbName).getCollection(collName).insert( + {shard: 2}, {writeConcern: {w: "majority"}})); + + const db = mongosConn.getDB(dbName); + const coll = db.getCollection(collName); + let changeListShard1 = [], changeListShard2 = []; + + // + // Start transaction 1. + // + const session1 = db.getMongo().startSession({causalConsistency: true}); + const sessionDb1 = session1.getDatabase(dbName); + const sessionColl1 = sessionDb1[collName]; + session1.startTransaction({readConcern: {level: "majority"}}); + + // + // Start transaction 2. + // + const session2 = db.getMongo().startSession({causalConsistency: true}); + const sessionDb2 = session2.getDatabase(dbName); + const sessionColl2 = sessionDb2[collName]; + session2.startTransaction({readConcern: {level: "majority"}}); + + /** + * Asserts that there are no changes waiting on the change stream cursor. + */ + function assertNoChanges(cursor) { + assert(!cursor.hasNext(), () => { + return "Unexpected change set: " + tojson(cursor.toArray()); + }); + } + + // + // Perform writes both in and outside of transactions and confirm that the changes expected are + // returned by the change stream. + // + (function() { + /** + * Asserts that the expected changes are found on the change stream cursor. Pushes the + * corresponding change stream document (with resume token) to an array. When expected + * changes are provided for both shards, we must assume that either shard's changes could + * come first or that they are interleaved via applyOps index. This is because a cross shard + * transaction may commit at a different cluster time on each shard, which impacts the + * ordering of the change stream. + */ + function assertWritesVisibleWithCapture(cursor, + expectedChangesShard1, + expectedChangesShard2, + changeCaptureListShard1, + changeCaptureListShard2) { + function assertChangeEqualWithCapture(changeDoc, expectedChange, changeCaptureList) { + assert.eq(expectedChange.operationType, changeDoc.operationType); + assert.eq(expectedChange._id, changeDoc.documentKey._id); + changeCaptureList.push(changeDoc); + } + + while (expectedChangesShard1.length || expectedChangesShard2.length) { + assert.soon(() => cursor.hasNext()); + const changeDoc = cursor.next(); + + if (changeDoc.documentKey.shard === 1) { + assert(expectedChangesShard1.length); + assertChangeEqualWithCapture( + changeDoc, expectedChangesShard1[0], changeCaptureListShard1); + expectedChangesShard1.shift(); + } else { + assert(changeDoc.documentKey.shard === 2); + assert(expectedChangesShard2.length); + assertChangeEqualWithCapture( + changeDoc, expectedChangesShard2[0], changeCaptureListShard2); + expectedChangesShard2.shift(); + } + } + + assertNoChanges(cursor); + } + + // Open a change stream on the test collection. + const changeStreamCursor = coll.watch(); + + // Insert a document and confirm that the change stream has it. + assert.commandWorked( + coll.insert({shard: 1, _id: "no-txn-doc-1"}, {writeConcern: {w: "majority"}})); + assertWritesVisibleWithCapture(changeStreamCursor, + [{operationType: "insert", _id: "no-txn-doc-1"}], + [], + changeListShard1, + changeListShard2); + + // Insert two documents under each transaction and confirm no change stream updates. + assert.commandWorked( + sessionColl1.insert([{shard: 1, _id: "txn1-doc-1"}, {shard: 2, _id: "txn1-doc-2"}])); + assert.commandWorked( + sessionColl2.insert([{shard: 1, _id: "txn2-doc-1"}, {shard: 2, _id: "txn2-doc-2"}])); + assertNoChanges(changeStreamCursor); + + // Update one document under each transaction and confirm no change stream updates. + assert.commandWorked( + sessionColl1.update({shard: 1, _id: "txn1-doc-1"}, {$set: {"updated": 1}})); + assert.commandWorked( + sessionColl2.update({shard: 2, _id: "txn2-doc-2"}, {$set: {"updated": 1}})); + assertNoChanges(changeStreamCursor); + + // Update and then remove second doc under each transaction. + assert.commandWorked(sessionColl1.update({shard: 2, _id: "txn1-doc-2"}, + {$set: {"update-before-delete": 1}})); + assert.commandWorked(sessionColl2.update({shard: 1, _id: "txn2-doc-1"}, + {$set: {"update-before-delete": 1}})); + assert.commandWorked(sessionColl1.remove({shard: 2, _id: "txn1-doc-2"})); + assert.commandWorked(sessionColl2.remove({shard: 1, _id: "txn2-doc-2"})); + assertNoChanges(changeStreamCursor); + + // Perform a write outside of a transaction and confirm that the change stream sees only + // this write. + assert.commandWorked( + coll.insert({shard: 2, _id: "no-txn-doc-2"}, {writeConcern: {w: "majority"}})); + assertWritesVisibleWithCapture(changeStreamCursor, + [], + [{operationType: "insert", _id: "no-txn-doc-2"}], + changeListShard1, + changeListShard2); + assertNoChanges(changeStreamCursor); + + // Perform a write outside of the transaction. + assert.commandWorked( + coll.insert({shard: 1, _id: "no-txn-doc-3"}, {writeConcern: {w: "majority"}})); + + // Commit first transaction and confirm that the change stream sees the changes expected + // from each shard. + session1.commitTransaction(); + assertWritesVisibleWithCapture(changeStreamCursor, + [ + {operationType: "insert", _id: "no-txn-doc-3"}, + {operationType: "insert", _id: "txn1-doc-1"}, + {operationType: "update", _id: "txn1-doc-1"} + ], + [ + {operationType: "insert", _id: "txn1-doc-2"}, + {operationType: "update", _id: "txn1-doc-2"}, + {operationType: "delete", _id: "txn1-doc-2"} + ], + changeListShard1, + changeListShard2); + assertNoChanges(changeStreamCursor); + + // Perform a write outside of the transaction. + assert.commandWorked( + coll.insert({shard: 2, _id: "no-txn-doc-4"}, {writeConcern: {w: "majority"}})); + + // Abort second transaction and confirm that the change stream sees only the previous + // non-transaction write. + session2.abortTransaction(); + assertWritesVisibleWithCapture(changeStreamCursor, + [], + [{operationType: "insert", _id: "no-txn-doc-4"}], + changeListShard1, + changeListShard2); + assertNoChanges(changeStreamCursor); + changeStreamCursor.close(); + })(); + + // + // Open a change stream at each resume point captured for the previous writes. Confirm that the + // documents returned match what was returned for the initial change stream. + // + (function() { + + /** + * Iterates over a list of changes and returns the index of the change whose resume token is + * higher than that of 'changeDoc'. It is expected that 'changeList' entries at this index + * and beyond will be included in a change stream resumed at 'changeDoc._id'. + */ + function getPostTokenChangeIndex(changeDoc, changeList) { + for (let i = 0; i < changeList.length; ++i) { + if (changeDoc._id._data < changeList[i]._id._data) { + return i; + } + } + + return changeList.length; + } + + /** + * Confirms that the change represented by 'changeDoc' exists in 'shardChangeList' at index + * 'changeListIndex'. + */ + function shardHasDocumentAtChangeListIndex(changeDoc, shardChangeList, changeListIndex) { + assert(changeListIndex < shardChangeList.length); + + const expectedChangeDoc = shardChangeList[changeListIndex]; + assert.eq(changeDoc, expectedChangeDoc); + assert.eq(expectedChangeDoc.documentKey, + changeDoc.documentKey, + tojson(changeDoc) + ", " + tojson(expectedChangeDoc)); + } + + /** + * Test that change stream returns the expected set of documuments when resumed from each + * point captured by 'changeList'. + */ + function confirmResumeForChangeList(changeList, changeListShard1, changeListShard2) { + for (let i = 0; i < changeList.length; ++i) { + const resumeDoc = changeList[i]; + let indexShard1 = getPostTokenChangeIndex(resumeDoc, changeListShard1); + let indexShard2 = getPostTokenChangeIndex(resumeDoc, changeListShard2); + const resumeCursor = coll.watch([], {startAfter: resumeDoc._id}); + + while ((indexShard1 + indexShard2) < + (changeListShard1.length + changeListShard2.length)) { + assert.soon(() => resumeCursor.hasNext()); + const changeDoc = resumeCursor.next(); + + if (changeDoc.documentKey.shard === 1) { + shardHasDocumentAtChangeListIndex( + changeDoc, changeListShard1, indexShard1++); + } else { + assert(changeDoc.documentKey.shard === 2); + shardHasDocumentAtChangeListIndex( + changeDoc, changeListShard2, indexShard2++); + } + } + + assertNoChanges(resumeCursor); + resumeCursor.close(); + } + } + + // Confirm that the sequence of events returned by the stream is consistent when resuming + // from any point in the stream on either shard. + confirmResumeForChangeList(changeListShard1, changeListShard1, changeListShard2); + confirmResumeForChangeList(changeListShard2, changeListShard1, changeListShard2); + })(); + + st.stop(); +})(); |