diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2020-04-02 10:37:26 +0100 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-05-03 18:08:47 +0000 |
commit | c7220a8080388da758230240a94ca0a15156148b (patch) | |
tree | 8ed48297dc23336fa9d11dac6ce84ae4feb0c566 | |
parent | 71bb4d6683a27d2e5c484618433639f66e0c1bd7 (diff) | |
download | mongo-c7220a8080388da758230240a94ca0a15156148b.tar.gz |
SERVER-46819 Allow transactions in change stream sharded passthroughs
(cherry picked from commit 8c9563e56b429bf609f47ac3a6f36920dd6807f3)
16 files changed, 464 insertions, 179 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml index 62e5d1a2e1c..a70f485b5fc 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml @@ -12,8 +12,6 @@ selector: ## # "Cowardly refusing to override write concern of command: ..." - assumes_write_concern_unchanged - # Transactions not supported on sharded clusters. - - uses_transactions # Exclude any that assume sharding is disabled - assumes_against_mongod_not_mongos diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml index 05357a578b5..9e82e7a67a5 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml @@ -12,8 +12,6 @@ selector: ## # "Cowardly refusing to override write concern of command: ..." - assumes_write_concern_unchanged - # Transactions not supported on sharded clusters. - - uses_transactions # Exclude any that assume sharding is disabled - assumes_against_mongod_not_mongos diff --git a/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml b/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml index e1dc1e9a6fa..71a51f3ef09 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml @@ -25,8 +25,6 @@ selector: # "Cowardly refusing to run test with overridden read preference when it reads from a # non-replicated collection: ..." - assumes_read_preference_unchanged - # Transactions not supported on sharded cluster. - - uses_transactions # Exclude any that assume sharding is disabled - assumes_against_mongod_not_mongos diff --git a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml index 7a7cdd7320f..873cbcb8b6a 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml @@ -13,8 +13,6 @@ selector: ## # "Cowardly refusing to override write concern of command: ..." - assumes_write_concern_unchanged - # Transactions not supported on sharded clusters. - - uses_transactions # Exclude any that assume sharding is disabled - assumes_against_mongod_not_mongos - assumes_unsharded_collection diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml index 968d2cd6b13..b154b5a6819 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml @@ -12,8 +12,6 @@ selector: ## # "Cowardly refusing to override write concern of command: ..." - assumes_write_concern_unchanged - # Transactions not supported on sharded clusters. - - uses_transactions # Not relevant for whole-cluster change streams. - do_not_run_in_whole_cluster_passthrough # Exclude any that assume sharding is disabled diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml index 368b1d4d538..c646f91cf8c 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml @@ -25,8 +25,6 @@ selector: # "Cowardly refusing to run test with overridden read preference when it reads from a # non-replicated collection: ..." - assumes_read_preference_unchanged - # Transactions not supported on sharded cluster. - - uses_transactions # Not relevant for whole-cluster change streams. - do_not_run_in_whole_cluster_passthrough # Exclude any that assume sharding is disabled diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml index 4d38135bc0f..b2df0199282 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml @@ -12,8 +12,6 @@ selector: ## # "Cowardly refusing to override write concern of command: ..." - assumes_write_concern_unchanged - # Transactions not supported on sharded clusters. - - uses_transactions # Not relevant for whole-cluster change streams. - do_not_run_in_whole_cluster_passthrough # Exclude any that assume sharding is disabled diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml index d9cd88e7a98..286881ae831 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml @@ -15,8 +15,6 @@ selector: ## # "Cowardly refusing to override write concern of command: ..." - assumes_write_concern_unchanged - # Transactions not supported on sharded clusters. - - uses_transactions # Exclude any that assume sharding is disabled - assumes_against_mongod_not_mongos diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml index 88f63128bab..7f327fe56e5 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml @@ -27,8 +27,6 @@ selector: # "Cowardly refusing to run test with overridden read preference when it reads from a # non-replicated collection: ..." - assumes_read_preference_unchanged - # Transactions not supported on sharded cluster. - - uses_transactions # Exclude any that assume sharding is disabled - assumes_against_mongod_not_mongos diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml index a861822f3b2..f99f31f4ae2 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml @@ -15,8 +15,6 @@ selector: ## # "Cowardly refusing to override write concern of command: ..." - assumes_write_concern_unchanged - # Transactions not supported on sharded clusters. - - uses_transactions # Exclude any that assume sharding is disabled - assumes_against_mongod_not_mongos - assumes_unsharded_collection diff --git a/jstests/change_streams/apply_ops.js b/jstests/change_streams/apply_ops.js index fa232f77f1b..e8a13088337 100644 --- a/jstests/change_streams/apply_ops.js +++ b/jstests/change_streams/apply_ops.js @@ -1,11 +1,13 @@ // Tests that a change stream will correctly unwind applyOps entries generated by a transaction. -// @tags: [uses_transactions] +// @tags: [uses_transactions, requires_snapshot_read, requires_majority_read_concern] (function() { "use strict"; -load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. -load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. +load("jstests/libs/auto_retry_transaction_in_sharding.js"); // For withTxnAndAutoRetryOnMongos. +load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. +load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. +load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.isMongos. const otherCollName = "change_stream_apply_ops_2"; const coll = assertDropAndRecreateCollection(db, "change_stream_apply_ops"); @@ -17,8 +19,14 @@ assertDropAndRecreateCollection(db.getSiblingDB(otherDbName), otherDbCollName); // Insert a document that gets deleted as part of the transaction. const kDeletedDocumentId = 0; -coll.insert({_id: kDeletedDocumentId, a: "I was here before the transaction"}, - {writeConcern: {w: "majority"}}); +const insertRes = assert.commandWorked(coll.runCommand("insert", { + documents: [{_id: kDeletedDocumentId, a: "I was here before the transaction"}], + writeConcern: {w: "majority"} +})); + +// Record the clusterTime of the insert, and increment it to give the test start time. +const testStartTime = insertRes.$clusterTime.clusterTime; +testStartTime.i++; let cst = new ChangeStreamTest(db); let changeStream = cst.startWatchingChanges({ @@ -31,38 +39,49 @@ let changeStream = cst.startWatchingChanges({ const sessionOptions = { causalConsistency: false }; +const txnOptions = { + readConcern: {level: "snapshot"}, + writeConcern: {w: "majority"} +}; + const session = db.getMongo().startSession(sessionOptions); + +// Create these variables before starting the transaction. In sharded passthroughs, accessing +// db[collname] may attempt to implicitly shard the collection, which is not allowed in a txn. const sessionDb = session.getDatabase(db.getName()); const sessionColl = sessionDb[coll.getName()]; - -session.startTransaction({readConcern: {level: "snapshot"}, writeConcern: {w: "majority"}}); -assert.commandWorked(sessionColl.insert({_id: 1, a: 0})); -assert.commandWorked(sessionColl.insert({_id: 2, a: 0})); - -// One insert on a collection that we're not watching. This should be skipped by the -// single-collection changestream. -assert.commandWorked(sessionDb[otherCollName].insert({_id: 111, a: "Doc on other collection"})); - -// One insert on a collection in a different database. This should be skipped by the single -// collection and single-db changestreams. -assert.commandWorked( - session.getDatabase(otherDbName)[otherDbCollName].insert({_id: 222, a: "Doc on other DB"})); - -assert.commandWorked(sessionColl.updateOne({_id: 1}, {$inc: {a: 1}})); - -assert.commandWorked(sessionColl.deleteOne({_id: kDeletedDocumentId})); - -assert.commandWorked(session.commitTransaction_forTesting()); - -// Do applyOps on the collection that we care about. This is an "external" applyOps, though -// (not run as part of a transaction) so its entries should be skipped in the change -// stream. This checks that applyOps that don't have an 'lsid' and 'txnNumber' field do not -// get unwound. -assert.commandWorked(db.runCommand({ - applyOps: [ - {op: "i", ns: coll.getFullName(), o: {_id: 3, a: "SHOULD NOT READ THIS"}}, - ] -})); +const sessionOtherColl = sessionDb[otherCollName]; +const sessionOtherDbColl = session.getDatabase(otherDbName)[otherDbCollName]; + +withTxnAndAutoRetryOnMongos(session, () => { + // Two inserts on the main test collection. + assert.commandWorked(sessionColl.insert({_id: 1, a: 0})); + assert.commandWorked(sessionColl.insert({_id: 2, a: 0})); + + // One insert on a collection that we're not watching. This should be skipped by the + // single-collection changestream. + assert.commandWorked(sessionOtherColl.insert({_id: 111, a: "Doc on other collection"})); + + // One insert on a collection in a different database. This should be skipped by the single + // collection and single-db changestreams. + assert.commandWorked(sessionOtherDbColl.insert({_id: 222, a: "Doc on other DB"})); + + assert.commandWorked(sessionColl.updateOne({_id: 1}, {$inc: {a: 1}})); + + assert.commandWorked(sessionColl.deleteOne({_id: kDeletedDocumentId})); +}, txnOptions); + +// Do applyOps on the collection that we care about. This is an "external" applyOps, though (not run +// as part of a transaction) so its entries should be skipped in the change stream. This checks that +// applyOps that don't have an 'lsid' and 'txnNumber' field do not get unwound. Skip if running in a +// sharded passthrough, since the applyOps command does not exist on mongoS. +if (!FixtureHelpers.isMongos(db)) { + assert.commandWorked(db.runCommand({ + applyOps: [ + {op: "i", ns: coll.getFullName(), o: {_id: 3, a: "SHOULD NOT READ THIS"}}, + ] + })); +} // Drop the collection. This will trigger an "invalidate" event at the end of the stream. assert.commandWorked(db.runCommand({drop: coll.getName()})); @@ -106,18 +125,39 @@ const expectedChanges = [ }, ]; +// If we are running in a sharded passthrough, then this may have been a multi-shard transaction. +// Change streams will interleave the txn events from across the shards in (clusterTime, txnOpIndex) +// order, and so may not reflect the ordering of writes in the test. We thus verify that exactly the +// expected set of events are observed, but we relax the ordering requirements. +function assertNextChangesEqual({cursor, expectedChanges, expectInvalidate}) { + const assertEqualFunc = FixtureHelpers.isMongos(db) ? cst.assertNextChangesEqualUnordered + : cst.assertNextChangesEqual; + return assertEqualFunc( + {cursor: cursor, expectedChanges: expectedChanges, expectInvalidate: expectInvalidate}); +} + +// +// Test behavior of single-collection change streams with apply ops. +// + // Verify that the stream returns the expected sequence of changes. -const changes = - cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges}); +const changes = assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges}); + // Single collection change stream should also be invalidated by the drop. -cst.assertNextChangesEqual({ +assertNextChangesEqual({ cursor: changeStream, expectedChanges: [{operationType: "invalidate"}], expectInvalidate: true }); -// Obtain the clusterTime from the first change. -const startTime = changes[0].clusterTime; +// +// Test behavior of whole-db change streams with apply ops. +// + +// In a sharded cluster, whole-db-or-cluster streams will see a collection drop from each shard. +for (let i = 1; i < FixtureHelpers.numberOfShardsForCollection(coll); ++i) { + expectedChanges.push({operationType: "drop", ns: {db: db.getName(), coll: coll.getName()}}); +} // Add an entry for the insert on db.otherColl into expectedChanges. expectedChanges.splice(2, 0, { @@ -132,10 +172,14 @@ expectedChanges.splice(2, 0, { // Verify that a whole-db stream returns the expected sequence of changes, including the insert // on the other collection but NOT the changes on the other DB or the manual applyOps. changeStream = cst.startWatchingChanges({ - pipeline: [{$changeStream: {startAtOperationTime: startTime}}, {$project: {"lsid.uid": 0}}], + pipeline: [{$changeStream: {startAtOperationTime: testStartTime}}, {$project: {"lsid.uid": 0}}], collection: 1 }); -cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges}); +assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges}); + +// +// Test behavior of whole-cluster change streams with apply ops. +// // Add an entry for the insert on otherDb.otherDbColl into expectedChanges. expectedChanges.splice(3, 0, { @@ -152,12 +196,12 @@ expectedChanges.splice(3, 0, { cst = new ChangeStreamTest(db.getSiblingDB("admin")); changeStream = cst.startWatchingChanges({ pipeline: [ - {$changeStream: {startAtOperationTime: startTime, allChangesForCluster: true}}, + {$changeStream: {startAtOperationTime: testStartTime, allChangesForCluster: true}}, {$project: {"lsid.uid": 0}} ], collection: 1 }); -cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges}); +assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges}); cst.cleanUp(); }()); diff --git a/jstests/change_streams/apply_ops_resumability.js b/jstests/change_streams/apply_ops_resumability.js index bf581d40ee1..5bd0de87047 100644 --- a/jstests/change_streams/apply_ops_resumability.js +++ b/jstests/change_streams/apply_ops_resumability.js @@ -1,11 +1,14 @@ -// Tests that a change stream will correctly unwind applyOps entries generated by a transaction. -// @tags: [uses_transactions] +// Tests that a change stream will correctly unwind applyOps entries generated by a transaction, and +// that we can resume from any point within the transaction. +// @tags: [uses_transactions, requires_snapshot_read, requires_majority_read_concern] (function() { "use strict"; -load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. -load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. +load("jstests/libs/auto_retry_transaction_in_sharding.js"); // For withTxnAndAutoRetryOnMongos. +load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. +load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. +load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.isMongos. const coll = assertDropAndRecreateCollection(db, "change_stream_apply_ops"); const otherCollName = "change_stream_apply_ops_2"; @@ -19,6 +22,9 @@ let cst = new ChangeStreamTest(db); let changeStream = cst.startWatchingChanges( {pipeline: [{$changeStream: {}}, {$project: {"lsid.uid": 0}}], collection: coll}); +// Record the clusterTime at the outset of the test, before any writes are performed. +const testStartTime = db.isMaster().$clusterTime.clusterTime; + // Do an insert outside of a transaction. assert.commandWorked(coll.insert({_id: 0, a: 123})); @@ -26,32 +32,45 @@ assert.commandWorked(coll.insert({_id: 0, a: 123})); const sessionOptions = { causalConsistency: false }; +const txnOptions = { + readConcern: {level: "snapshot"}, + writeConcern: {w: "majority"} +}; + const session = db.getMongo().startSession(sessionOptions); + +// Create these variables before starting the transaction. In sharded passthroughs, accessing +// db[collname] may attempt to implicitly shard the collection, which is not allowed in a txn. const sessionDb = session.getDatabase(db.getName()); const sessionColl = sessionDb[coll.getName()]; +const sessionOtherColl = sessionDb[otherCollName]; +const sessionOtherDbColl = session.getDatabase(otherDbName)[otherDbCollName]; -session.startTransaction({readConcern: {level: "snapshot"}, writeConcern: {w: "majority"}}); -assert.commandWorked(sessionColl.insert({_id: 1, a: 0})); -assert.commandWorked(sessionColl.insert({_id: 2, a: 0})); +withTxnAndAutoRetryOnMongos(session, () => { + // Two inserts on the main test collection. + assert.commandWorked(sessionColl.insert({_id: 1, a: 0})); + assert.commandWorked(sessionColl.insert({_id: 2, a: 0})); -// One insert on a collection that we're not watching. This should be skipped by the -// single-collection change stream. -assert.commandWorked(sessionDb[otherCollName].insert({_id: 111, a: "Doc on other collection"})); + // One insert on a collection that we're not watching. This should be skipped by the + // single-collection change stream. + assert.commandWorked(sessionOtherColl.insert({_id: 111, a: "Doc on other collection"})); -// One insert on a collection in a different database. This should be skipped by the single -// collection and single-db changestreams. -assert.commandWorked( - session.getDatabase(otherDbName)[otherDbCollName].insert({_id: 222, a: "Doc on other DB"})); + // One insert on a collection in a different database. This should be skipped by the single + // collection and single-db changestreams. + assert.commandWorked(sessionOtherDbColl.insert({_id: 222, a: "Doc on other DB"})); -assert.commandWorked(sessionColl.updateOne({_id: 1}, {$inc: {a: 1}})); - -assert.commandWorked(session.commitTransaction_forTesting()); + assert.commandWorked(sessionColl.updateOne({_id: 1}, {$inc: {a: 1}})); +}, txnOptions); // Now insert another document, not part of a transaction. assert.commandWorked(coll.insert({_id: 3, a: 123})); -// Define the set of changes expected for the single-collection case per the operations above. -const expectedChanges = [ +// Drop the collection. This will trigger a "drop" event, which in the case of the single-collection +// stream will be followed by an "invalidate". +assert.commandWorked(db.runCommand({drop: coll.getName()})); + +// Define the set of all changes expected to be generated by the operations above. +let expectedChanges = [ { documentKey: {_id: 0}, fullDocument: {_id: 0, a: 123}, @@ -75,6 +94,22 @@ const expectedChanges = [ txnNumber: session.getTxnNumber_forTesting(), }, { + documentKey: {_id: 111}, + fullDocument: {_id: 111, a: "Doc on other collection"}, + ns: {db: db.getName(), coll: otherCollName}, + operationType: "insert", + lsid: session.getSessionId(), + txnNumber: session.getTxnNumber_forTesting(), + }, + { + documentKey: {_id: 222}, + fullDocument: {_id: 222, a: "Doc on other DB"}, + ns: {db: otherDbName, coll: otherDbCollName}, + operationType: "insert", + lsid: session.getSessionId(), + txnNumber: session.getTxnNumber_forTesting(), + }, + { documentKey: {_id: 1}, ns: {db: db.getName(), coll: coll.getName()}, operationType: "update", @@ -88,18 +123,66 @@ const expectedChanges = [ ns: {db: db.getName(), coll: coll.getName()}, operationType: "insert", }, + {operationType: "drop", ns: {db: db.getName(), coll: coll.getName()}} ]; +// Validate that we observe all expected changes in the stream, and replace the'expectedChanges' +// list with the changes returned by ChangeStreamTest. These will include the _id resume tokens for +// each change, so subsequent tests will be able to resume from any point. +(function validateExpectedChangesAndPopulateResumeTokens() { + const wholeClusterCST = new ChangeStreamTest(db.getSiblingDB("admin")); + const wholeClusterCursor = wholeClusterCST.startWatchingChanges({ + pipeline: [ + {$changeStream: {startAtOperationTime: testStartTime, allChangesForCluster: true}}, + {$project: {"lsid.uid": 0}} + ], + collection: 1 + }); + // If we are running in a sharded passthrough, then this may have been a multi-shard txn. Change + // streams will interleave the txn events from across the shards in (clusterTime, txnOpIndex) + // order, and so may not reflect the ordering of writes in the test. The ordering of events is + // important for later tests, so if we are running on mongoS we verify that exactly the expected + // set of events are observed, and then we adopt the order in which they were returned. + if (FixtureHelpers.isMongos(db)) { + expectedChanges = wholeClusterCST.assertNextChangesEqualUnordered( + {cursor: wholeClusterCursor, expectedChanges: expectedChanges}); + } else { + expectedChanges = wholeClusterCST.assertNextChangesEqual( + {cursor: wholeClusterCursor, expectedChanges: expectedChanges}); + } +})(); + +// Helper function to find the first non-transaction event and the first two transaction events in +// the given list of change stream events. +function findMilestoneEvents(eventList) { + const nonTxnIdx = eventList.findIndex(event => !event.lsid), + firstTxnIdx = eventList.findIndex(event => event.lsid), + secondTxnIdx = eventList.findIndex((event, idx) => (idx > firstTxnIdx && event.lsid)); + // Return the array indices of each event, and the events themselves. + return [ + nonTxnIdx, + firstTxnIdx, + secondTxnIdx, + eventList[nonTxnIdx], + eventList[firstTxnIdx], + eventList[secondTxnIdx] + ]; +} + // // Test behavior of single-collection change streams with apply ops. // +// Filter out any events that aren't on the main test collection namespace. +const expectedSingleCollChanges = expectedChanges.filter( + event => (event.ns.db === db.getName() && event.ns.coll === coll.getName())); + // Verify that the stream returns the expected sequence of changes. -const changes = - cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges}); +cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedSingleCollChanges}); -// Record the first (non-transaction) change and the first in-transaction change. -const nonTxnChange = changes[0], firstTxnChange = changes[1], secondTxnChange = changes[2]; +// Obtain the first non-transaction change and the first two in-transaction changes. +let [nonTxnIdx, firstTxnIdx, secondTxnIdx, nonTxnChange, firstTxnChange, secondTxnChange] = + findMilestoneEvents(expectedSingleCollChanges); // Resume after the first non-transaction change. Be sure we see the documents from the // transaction again. @@ -107,16 +190,18 @@ changeStream = cst.startWatchingChanges({ pipeline: [{$changeStream: {resumeAfter: nonTxnChange._id}}, {$project: {"lsid.uid": 0}}], collection: coll }); -cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges.slice(1)}); +cst.assertNextChangesEqual( + {cursor: changeStream, expectedChanges: expectedSingleCollChanges.slice(nonTxnIdx + 1)}); // Resume after the first transaction change. Be sure we see the second change again. changeStream = cst.startWatchingChanges({ pipeline: [{$changeStream: {resumeAfter: firstTxnChange._id}}, {$project: {"lsid.uid": 0}}], collection: coll }); -cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges.slice(2)}); +cst.assertNextChangesEqual( + {cursor: changeStream, expectedChanges: expectedSingleCollChanges.slice(firstTxnIdx + 1)}); -// Try starting another change stream from the _last_ change caused by the transaction. Verify +// Try starting another change stream from the second change caused by the transaction. Verify // that we can see the insert performed after the transaction was committed. let otherCursor = cst.startWatchingChanges({ pipeline: [{$changeStream: {resumeAfter: secondTxnChange._id}}, {$project: {"lsid.uid": 0}}], @@ -124,34 +209,28 @@ let otherCursor = cst.startWatchingChanges({ doNotModifyInPassthroughs: true // A collection drop only invalidates single-collection // change streams. }); -cst.assertNextChangesEqual({cursor: otherCursor, expectedChanges: expectedChanges.slice(3)}); +cst.assertNextChangesEqual( + {cursor: otherCursor, expectedChanges: expectedSingleCollChanges.slice(secondTxnIdx + 1)}); -// Drop the collection. This will trigger a "drop" followed by an "invalidate" for the single -// collection change stream. -assert.commandWorked(db.runCommand({drop: coll.getName()})); -let change = cst.getOneChange(otherCursor); -assert.eq(change.operationType, "drop"); -assert.eq(change.ns, {db: db.getName(), coll: coll.getName()}); -change = cst.getOneChange(otherCursor, true); -assert.eq(change.operationType, "invalidate"); +// Verify that the next event observed by the stream is an invalidate following the collection drop. +const invalidateEvent = cst.getOneChange(otherCursor, true); +assert.eq(invalidateEvent.operationType, "invalidate"); // // Test behavior of whole-db change streams with apply ops. // -// For a whole-db or whole-cluster change stream, the collection drop should return a single -// "drop" entry and not invalidate the stream. -expectedChanges.push({operationType: "drop", ns: {db: db.getName(), coll: coll.getName()}}); - -// Add an entry for the insert on db.otherColl into expectedChanges. -expectedChanges.splice(3, 0, { - documentKey: {_id: 111}, - fullDocument: {_id: 111, a: "Doc on other collection"}, - ns: {db: db.getName(), coll: otherCollName}, - operationType: "insert", - lsid: session.getSessionId(), - txnNumber: session.getTxnNumber_forTesting(), -}); +// In a sharded cluster, whole-db-or-cluster streams will see a collection drop from each shard. +for (let i = 1; i < FixtureHelpers.numberOfShardsForCollection(coll); ++i) { + expectedChanges.push({operationType: "drop", ns: {db: db.getName(), coll: coll.getName()}}); +} + +// Filter out any events that aren't on the main test database. +const expectedSingleDBChanges = expectedChanges.filter(event => (event.ns.db === db.getName())); + +// Obtain the first non-transaction change and the first two in-transaction changes. +[nonTxnIdx, firstTxnIdx, secondTxnIdx, nonTxnChange, firstTxnChange, secondTxnChange] = + findMilestoneEvents(expectedSingleDBChanges); // Verify that a whole-db stream can be resumed from the middle of the transaction, and that it // will see all subsequent changes including the insert on the other collection but NOT the @@ -160,17 +239,16 @@ changeStream = cst.startWatchingChanges({ pipeline: [{$changeStream: {resumeAfter: secondTxnChange._id}}, {$project: {"lsid.uid": 0}}], collection: 1, }); -cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges.slice(3)}); - -// Add an entry for the insert on otherDb.otherDbColl into expectedChanges. -expectedChanges.splice(4, 0, { - documentKey: {_id: 222}, - fullDocument: {_id: 222, a: "Doc on other DB"}, - ns: {db: otherDbName, coll: otherDbCollName}, - operationType: "insert", - lsid: session.getSessionId(), - txnNumber: session.getTxnNumber_forTesting(), -}); +cst.assertNextChangesEqual( + {cursor: changeStream, expectedChanges: expectedSingleDBChanges.slice(secondTxnIdx + 1)}); + +// +// Test behavior of whole-cluster change streams with apply ops. +// + +// Obtain the first non-transaction change and the first two in-transaction changes. +[nonTxnIdx, firstTxnIdx, secondTxnIdx, nonTxnChange, firstTxnChange, secondTxnChange] = + findMilestoneEvents(expectedChanges); // Verify that a whole-cluster stream can be resumed from the middle of the transaction, and // that it will see all subsequent changes including the insert on the other collection and the @@ -183,7 +261,8 @@ changeStream = cst.startWatchingChanges({ ], collection: 1 }); -cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges.slice(3)}); +cst.assertNextChangesEqual( + {cursor: changeStream, expectedChanges: expectedChanges.slice(secondTxnIdx + 1)}); cst.cleanUp(); }()); diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js index faa6a816077..e98eef10694 100644 --- a/jstests/change_streams/change_stream.js +++ b/jstests/change_streams/change_stream.js @@ -180,7 +180,7 @@ expected = [ operationType: "delete", } ]; -cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected}); +cst.assertNextChangesEqualUnordered({cursor: cursor, expectedChanges: expected}); jsTestLog("Testing intervening write on another collection"); cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); diff --git a/jstests/change_streams/report_post_batch_resume_token.js b/jstests/change_streams/report_post_batch_resume_token.js index 1e9a110c99f..e7e9e862f88 100644 --- a/jstests/change_streams/report_post_batch_resume_token.js +++ b/jstests/change_streams/report_post_batch_resume_token.js @@ -7,6 +7,7 @@ "use strict"; load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. +load("jstests/libs/fixture_helpers.js"); // For isMongos. // Drop and recreate collections to assure a clean run. const collName = "report_post_batch_resume_token"; @@ -14,6 +15,13 @@ const testCollection = assertDropAndRecreateCollection(db, collName); const otherCollection = assertDropAndRecreateCollection(db, "unrelated_" + collName); const adminDB = db.getSiblingDB("admin"); +// Helper function which swallows an assertion if we are running on a sharded cluster. +assert.eqIfNotMongos = function(val1, val2, errMsg) { + if (!FixtureHelpers.isMongos(db)) { + assert.eq(val1, val2, errMsg); + } +}; + let docId = 0; // Tracks _id of documents inserted to ensure that we do not duplicate. const batchSize = 2; @@ -31,16 +39,16 @@ assert.eq(csCursor.objsLeftInBatch(), 0); initialAggPBRT = csCursor.getResumeToken(); assert.neq(undefined, initialAggPBRT); -// Write some documents to the test collection. +// Verify that no events are returned and the PBRT does not advance or go backwards, even as +// documents are written into the collection. for (let i = 0; i < 5; ++i) { + assert(!csCursor.hasNext()); // Causes a getMore to be dispatched. + const getMorePBRT = csCursor.getResumeToken(); + // TODO SERVER-47810: this should also be true on mongoS. + assert.eqIfNotMongos(bsonWoCompare(initialAggPBRT, getMorePBRT), 0); assert.commandWorked(testCollection.insert({_id: docId++})); } -// Verify that no events are returned and the PBRT does not advance or go backwards. -assert(!csCursor.hasNext()); // Causes a getMore to be dispatched. -let getMorePBRT = csCursor.getResumeToken(); -assert.eq(bsonWoCompare(initialAggPBRT, getMorePBRT), 0); - // Test that postBatchResumeToken is present on empty initial aggregate batch. csCursor = testCollection.watch(); assert.eq(csCursor.objsLeftInBatch(), 0); @@ -49,7 +57,7 @@ assert.neq(undefined, initialAggPBRT); // Test that postBatchResumeToken is present on empty getMore batch. assert(!csCursor.hasNext()); // Causes a getMore to be dispatched. -getMorePBRT = csCursor.getResumeToken(); +let getMorePBRT = csCursor.getResumeToken(); assert.neq(undefined, getMorePBRT); assert.gte(bsonWoCompare(getMorePBRT, initialAggPBRT), 0); @@ -59,11 +67,13 @@ assert.commandWorked(testCollection.insert({_id: docId++})); assert.soon(() => csCursor.hasNext()); // Causes a getMore to be dispatched. assert(csCursor.objsLeftInBatch() == 1); -// Because the retrieved event is the most recent entry in the oplog, the PBRT should be equal -// to the resume token of the last item in the batch and greater than the initial PBRT. +// Because the retrieved event is the most recent entry in the oplog, the PBRT should be equal to +// the resume token of the last item in the batch and greater than the initial PBRT. let resumeTokenFromDoc = csCursor.next()._id; getMorePBRT = csCursor.getResumeToken(); -assert.eq(bsonWoCompare(getMorePBRT, resumeTokenFromDoc), 0); +// When running in a sharded passthrough, we cannot guarantee that the retrieved event was the last +// item in the oplog, and so we cannot assert that the PBRT is equal to the event's resume token. +assert.eqIfNotMongos(bsonWoCompare(getMorePBRT, resumeTokenFromDoc), 0); assert.gt(bsonWoCompare(getMorePBRT, initialAggPBRT), 0); // Now seed the collection with enough documents to fit in two batches. @@ -90,7 +100,9 @@ while (csCursor.objsLeftInBatch()) { resumeTokenFromDoc = eventFromCursor._id; } getMorePBRT = csCursor.getResumeToken(); -assert.eq(bsonWoCompare(resumeTokenFromDoc, getMorePBRT), 0); +// When running in a sharded passthrough, we cannot guarantee that the retrieved event was the last +// item in the oplog, and so we cannot assert that the PBRT is equal to the event's resume token. +assert.eqIfNotMongos(bsonWoCompare(resumeTokenFromDoc, getMorePBRT), 0); assert.gt(bsonWoCompare(getMorePBRT, initialAggPBRT), 0); // Test that postBatchResumeToken advances with writes to an unrelated collection. First make @@ -140,6 +152,15 @@ assert.gte(bsonWoCompare(previousGetMorePBRT, resumeTokenFromDoc), 0); assert.gt(bsonWoCompare(resumeTokenFromSecondDoc, previousGetMorePBRT), 0); assert.gte(bsonWoCompare(getMorePBRT, resumeTokenFromSecondDoc), 0); +// Sharded collection passthroughs use prepared transactions, which require majority read concern. +// If the collection is sharded and majority read concern is disabled, skip the transaction tests. +const rcCmdRes = testCollection.runCommand("find", {readConcern: {level: "majority"}}); +if (FixtureHelpers.isSharded(testCollection) && + rcCmdRes.code === ErrorCodes.ReadConcernMajorityNotEnabled) { + jsTestLog("Skipping transaction tests since majority read concern is disabled."); + return; +} + // Test that the PBRT is correctly updated when reading events from within a transaction. csCursor = testCollection.watch([], {cursor: {batchSize: batchSize}}); const session = db.getMongo().startSession(); @@ -165,11 +186,16 @@ assert.eq(csCursor.objsLeftInBatch(), 2); // The clusterTime should be the same on each, but the resume token keeps advancing. const txnEvent1 = csCursor.next(), txnEvent2 = csCursor.next(); const txnClusterTime = txnEvent1.clusterTime; -assert.eq(txnEvent2.clusterTime, txnClusterTime); +// On a sharded cluster, the events in the txn may be spread across multiple shards. Events from +// each shard will all have the same clusterTime, but the clusterTimes may differ between shards. +// Therefore, we cannot guarantee that the clusterTime of txnEvent2 is always the same as the +// clusterTime of txnEvent1, since the events may have occurred on different shards. +assert.eqIfNotMongos(txnEvent2.clusterTime, txnClusterTime); assert.gt(bsonWoCompare(txnEvent1._id, previousGetMorePBRT), 0); assert.gt(bsonWoCompare(txnEvent2._id, txnEvent1._id), 0); -// The PBRT of the first transaction batch is equal to the last document's resumeToken. +// The PBRT of the first transaction batch is equal to the last document's resumeToken. We have +// more events to return from the transaction, and so the PBRT cannot have advanced any further. getMorePBRT = csCursor.getResumeToken(); assert.eq(bsonWoCompare(getMorePBRT, txnEvent2._id), 0); @@ -181,7 +207,9 @@ assert.eq(csCursor.objsLeftInBatch(), 1); // The clusterTime of this event is the same as the two events from the previous batch, but its // resume token is greater than the previous PBRT. const txnEvent3 = csCursor.next(); -assert.eq(txnEvent3.clusterTime, txnClusterTime); +// As before, we cannot guarantee that the clusterTime of txnEvent3 is always the same as that of +// txnEvent1 when running in a sharded cluster. However, the PBRT should advance in any deployment. +assert.eqIfNotMongos(txnEvent3.clusterTime, txnClusterTime); assert.gt(bsonWoCompare(txnEvent3._id, previousGetMorePBRT), 0); // Because we wrote to the unrelated collection, the final event in the transaction does not diff --git a/jstests/libs/auto_retry_transaction_in_sharding.js b/jstests/libs/auto_retry_transaction_in_sharding.js new file mode 100644 index 00000000000..f81a366970a --- /dev/null +++ b/jstests/libs/auto_retry_transaction_in_sharding.js @@ -0,0 +1,97 @@ +'use strict'; + +load('jstests/concurrency/fsm_workload_helpers/auto_retry_transaction.js'); +Random.setRandomSeed(); + +var { + withTxnAndAutoRetryOnMongos, + retryOnceOnTransientOnMongos, + retryOnceOnTransientAndRestartTxnOnMongos +} = (() => { + /** + * Runs 'func' inside of a transaction started with 'txnOptions', and automatically retries + * until it either succeeds or the server returns a non-TransientTransactionError error + * response. + * + * The caller should take care to ensure 'func' doesn't modify any captured variables in a + * speculative fashion where calling it multiple times would lead to unintended behavior. The + * transaction started by the withTxnAndAutoRetryOnMongos() function is only known to have + * committed after the withTxnAndAutoRetryOnMongos() function returns. + * + * This behaviour only applies if the client is a mongos + * + * TODO SERVER-39704: Once completed, the usages of this function should be revisited to + * determine whether it is still necessary or the retries performed by MongoS make it + * unnecessary + */ + function withTxnAndAutoRetryOnMongos(session, func, txnOptions) { + if (session.getClient().isMongos()) { + withTxnAndAutoRetry(session, func, {txnOptions}); + } else { + session.startTransaction(txnOptions); + func(); + assert.commandWorked(session.commitTransaction_forTesting()); + } + } + + /** + * Runs 'func' and retries it only once if a transient error occurred. + * + * This behaviour only applies if the client is a mongos + * + * TODO SERVER-39704: Once completed, the usages of this function should be revisited to + * determine whether it is still necessary or the retries performed by MongoS make it + * unnecessary + */ + function retryOnceOnTransientOnMongos(session, func) { + if (session.getClient().isMongos()) { + try { + func(); + } catch (e) { + if ((e.hasOwnProperty('errorLabels') && + e.errorLabels.includes('TransientTransactionError'))) { + func(); + } else { + throw e; + } + } + } else { + func(); + } + } + + /** + * Runs 'func' and retries it only once restarting the transaction if a transient + * error occurred. + * + * This behaviour only applies if the client is a mongos + * + * TODO SERVER-39704: Once completed, the usages of this function should be revisited to + * determine whether it is still necessary or the retries performed by MongoS make it + * unnecessary + */ + function retryOnceOnTransientAndRestartTxnOnMongos(session, func, txnOptions) { + if (session.getClient().isMongos()) { + try { + func(); + } catch (e) { + if ((e.hasOwnProperty('errorLabels') && + e.errorLabels.includes('TransientTransactionError'))) { + session.abortTransaction_forTesting(); + session.startTransaction(txnOptions); + func(); + } else { + throw e; + } + } + } else { + func(); + } + } + + return { + withTxnAndAutoRetryOnMongos, + retryOnceOnTransientOnMongos, + retryOnceOnTransientAndRestartTxnOnMongos + }; +})();
\ No newline at end of file diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js index c505e47f39f..7431a64627c 100644 --- a/jstests/libs/change_stream_util.js +++ b/jstests/libs/change_stream_util.js @@ -59,27 +59,28 @@ function assertInvalidateOp({cursor, opType}) { return null; } +function pruneOptionalFields(event, expected) { + if (!expected.hasOwnProperty("_id")) + delete event._id; + + if (!expected.hasOwnProperty("clusterTime")) + delete event.clusterTime; + + if (!expected.hasOwnProperty("txnNumber")) + delete event.txnNumber; + + if (!expected.hasOwnProperty("lsid")) + delete event.lsid; + + return event; +} /** * Helper to check whether a change stream event matches the given expected event. Ignores the * resume token and clusterTime unless they are explicitly listed in the expectedEvent. */ function assertChangeStreamEventEq(actualEvent, expectedEvent) { - const testEvent = Object.assign({}, actualEvent); - if (!expectedEvent.hasOwnProperty("_id")) { - delete testEvent._id; // Remove the resume token, if present. - } - if (!expectedEvent.hasOwnProperty("clusterTime")) { - delete testEvent.clusterTime; // Remove the cluster time, if present. - } + const testEvent = pruneOptionalFields(Object.assign({}, actualEvent), expectedEvent); - // The change stream transaction passthrough causes operations to have txnNumber and lsid - // values that the test doesn't expect, which can cause comparisons to fail. - if (!expectedEvent.hasOwnProperty("txnNumber")) { - delete testEvent.txnNumber; // Remove the txnNumber, if present. - } - if (!expectedEvent.hasOwnProperty("lsid")) { - delete testEvent.lsid; // Remove the lsid, if present. - } assert.docEq(testEvent, expectedEvent, "Change did not match expected change. Expected change: " + tojson(expectedEvent) + @@ -170,6 +171,34 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") { return nextBatch[0]; } + self.getNextChanges = function(cursor, numChanges, skipFirst) { + let changes = []; + + for (let i = 0; i < numChanges; i++) { + // Since the first change may be on the original cursor, we need to check for that + // change on the cursor before we move the cursor forward. + if (i === 0 && !skipFirst) { + changes[0] = getNextDocFromCursor(cursor); + if (changes[0]) { + continue; + } + } + + assert.soon( + () => { + cursor = self.getNextBatch(cursor); + changes[i] = getNextDocFromCursor(cursor); + return changes[i] !== null; + }, + () => { + return "timed out waiting for another result from the change stream, observed changes: " + + tojson(changes) + ", expected changes: " + numChanges; + }); + } + + return changes; + }; + /** * Checks if the change has been invalidated. */ @@ -205,8 +234,14 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") { * * Returns a list of the changes seen. */ - self.assertNextChangesEqual = function( - {cursor, expectedChanges, expectedNumChanges, expectInvalidate, skipFirstBatch}) { + self.assertNextChangesEqual = function({ + cursor, + expectedChanges, + expectedNumChanges, + expectInvalidate, + skipFirstBatch, + ignoreOrder + }) { expectInvalidate = expectInvalidate || false; skipFirstBatch = skipFirstBatch || false; @@ -230,25 +265,23 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") { expectedNumChanges = expectedChanges.length; } - let changes = []; - for (let i = 0; i < expectedNumChanges; i++) { - // Since the first change may be on the original cursor, we need to check for that - // change on the cursor before we move the cursor forward. - if (i === 0 && !skipFirstBatch) { - changes[0] = getNextDocFromCursor(cursor); - if (changes[0]) { - assertChangeIsExpected(expectedChanges, 0, changes, expectInvalidate); - continue; - } + let changes = self.getNextChanges(cursor, expectedNumChanges, skipFirstBatch); + if (ignoreOrder) { + const errMsgFunc = () => `${tojson(changes)} != ${tojson(expectedChanges)}`; + assert.eq(changes.length, expectedNumChanges, errMsgFunc); + for (let i = 0; i < changes.length; i++) { + assert(expectedChanges.some(expectedChange => { + return _convertExceptionToReturnStatus(() => { + assertChangeStreamEventEq(changes[i], expectedChange); + return true; + })(); + }), + errMsgFunc); + } + } else { + for (let i = 0; i < changes.length; i++) { + assertChangeIsExpected(expectedChanges, i, changes, expectInvalidate); } - - assert.soon(function() { - // We need to replace the cursor variable so we return the correct cursor. - cursor = self.getNextBatch(cursor); - changes[i] = getNextDocFromCursor(cursor); - return changes[i] !== null; - }, "timed out waiting for another result from the change stream"); - assertChangeIsExpected(expectedChanges, i, changes, expectInvalidate); } // If we expect invalidation, the final change should have operation type "invalidate". @@ -265,6 +298,25 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") { }; /** + * Iterates through the change stream and asserts that the next changes are the expected ones. + * The order of the change events from the cursor relative to their order in the list of + * expected changes is ignored, however. + * + * Returns a list of the changes seen. + */ + self.assertNextChangesEqualUnordered = function( + {cursor, expectedChanges, expectedNumChanges, expectInvalidate, skipFirstBatch}) { + return self.assertNextChangesEqual({ + cursor: cursor, + expectedChanges: expectedChanges, + expectedNumChanges: expectedNumChanges, + expectInvalidate: expectInvalidate, + skipFirstBatch: skipFirstBatch, + ignoreOrder: true + }); + }; + + /** * Retrieves the next batch in the change stream and confirms that it is empty. */ self.assertNoChange = function(cursor) { @@ -277,12 +329,17 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") { * If the current batch has a document in it, that one will be ignored. */ self.getOneChange = function(cursor, expectInvalidate = false) { - changes = self.assertNextChangesEqual({ - cursor: cursor, - expectedNumChanges: 1, - expectInvalidate: expectInvalidate, - skipFirstBatch: true - }); + changes = self.getNextChanges(cursor, 1, true); + + if (expectInvalidate) { + assert(isInvalidated(changes[changes.length - 1]), + "Last change was not invalidated when it was expected: " + tojson(changes)); + + // We make sure that the next batch kills the cursor after an invalidation entry. + let finalCursor = self.getNextBatch(cursor); + assert.eq(finalCursor.id, 0, "Final cursor was not killed: " + tojson(finalCursor)); + } + return changes[0]; }; |