diff options
14 files changed, 322 insertions, 13 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_mongos_passthrough.yml new file mode 100644 index 00000000000..ff7c8170176 --- /dev/null +++ b/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_mongos_passthrough.yml @@ -0,0 +1,66 @@ +test_kind: js_test + +selector: + roots: + - jstests/change_streams/**/*.js + exclude_files: + # Parallel Shell - we do not signal the override to end a txn when a parallel shell closes. + - jstests/change_streams/only_wake_getmore_for_relevant_changes.js + exclude_with_any_tags: + # These tests would fail with "Cowardly refusing to override write concern of command: ..." + - assumes_write_concern_unchanged + # No need to use a passthrough to add transactions to a test that already has its own + # transactions. + - uses_transactions + # These tests make assumptions about change stream results that are no longer true once operations + # get bundled into transactions. + - change_stream_does_not_expect_txns + +executor: + archive: + hooks: + - CheckReplDBHash + - CheckReplOplogs + - ValidateCollections + config: + shell_options: + global_vars: + TestData: + networkErrorAndTxnOverrideConfig: + wrapCRUDinTransactions: true + # Enable the transactions passthrough. + eval: >- + var testingReplication = true; + load('jstests/libs/override_methods/enable_sessions.js'); + load('jstests/libs/override_methods/txn_passthrough_cmd_massage.js'); + load('jstests/libs/override_methods/network_error_and_txn_override.js'); + readMode: commands + hooks: + # The CheckReplDBHash hook waits until all operations have replicated to and have been applied + # on the secondaries, so we run the ValidateCollections hook after it to ensure we're + # validating the entire contents of the collection. 
+ - class: CheckReplOplogs + - class: CheckReplDBHash + - class: ValidateCollections + - class: CleanEveryN + n: 20 + fixture: + class: ShardedClusterFixture + # Use two shards to make sure we will only talk to the primary shard for the database and will + # not delay changes to wait for notifications or a clock advancement from other shards. + num_shards: 2 + mongos_options: + bind_ip_all: '' + set_parameters: + enableTestCommands: 1 + mongod_options: + bind_ip_all: '' + set_parameters: + enableTestCommands: 1 + periodicNoopIntervalSecs: 1 + writePeriodicNoops: true + num_rs_nodes_per_shard: 1 + # This test suite doesn't actually shard any collections, but enabling sharding will prevent + # read commands against non-existent databases from unconditionally returning a CursorId of 0. + enable_sharding: + - test diff --git a/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_passthrough.yml new file mode 100644 index 00000000000..15730882c82 --- /dev/null +++ b/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_passthrough.yml @@ -0,0 +1,53 @@ +test_kind: js_test + +selector: + roots: + - jstests/change_streams/**/*.js + exclude_files: + # Parallel Shell - we do not signal the override to end a txn when a parallel shell closes. + - jstests/change_streams/only_wake_getmore_for_relevant_changes.js + exclude_with_any_tags: + # These tests would fail with "Cowardly refusing to override write concern of command: ..." + - assumes_write_concern_unchanged + # No need to use a passthrough to add transactions to a test that already has its own + # transactions. + - uses_transactions + # These tests make assumptions about change stream results that are no longer true once operations + # get bundled into transactions. 
+ - change_stream_does_not_expect_txns + +executor: + archive: + hooks: + - CheckReplDBHash + - CheckReplOplogs + - ValidateCollections + config: + shell_options: + global_vars: + TestData: + networkErrorAndTxnOverrideConfig: + wrapCRUDinTransactions: true + # Enable the transactions passthrough. + eval: >- + var testingReplication = true; + load('jstests/libs/override_methods/enable_sessions.js'); + load('jstests/libs/override_methods/txn_passthrough_cmd_massage.js'); + load('jstests/libs/override_methods/network_error_and_txn_override.js'); + readMode: commands + hooks: + # The CheckReplDBHash hook waits until all operations have replicated to and have been applied + # on the secondaries, so we run the ValidateCollections hook after it to ensure we're + # validating the entire contents of the collection. + - class: CheckReplOplogs + - class: CheckReplDBHash + - class: ValidateCollections + - class: CleanEveryN + n: 20 + fixture: + class: ReplicaSetFixture + mongod_options: + bind_ip_all: '' + set_parameters: + enableTestCommands: 1 + num_nodes: 1 diff --git a/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_sharded_collections_passthrough.yml new file mode 100644 index 00000000000..db182de7dac --- /dev/null +++ b/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_sharded_collections_passthrough.yml @@ -0,0 +1,62 @@ +test_kind: js_test + +selector: + roots: + - jstests/change_streams/**/*.js + exclude_files: + # Parallel Shell - we do not signal the override to end a txn when a parallel shell closes. + - jstests/change_streams/only_wake_getmore_for_relevant_changes.js + exclude_with_any_tags: + # These tests would fail with "Cowardly refusing to override write concern of command: ..." + - assumes_write_concern_unchanged + # No need to use a passthrough to add transactions to a test that already has its own + # transactions. 
+ - uses_transactions + # These tests make assumptions about change stream results that are no longer true once operations + # get bundled into transactions. + - change_stream_does_not_expect_txns + +executor: + archive: + hooks: + - CheckReplDBHash + - CheckReplOplogs + - ValidateCollections + config: + shell_options: + global_vars: + TestData: + networkErrorAndTxnOverrideConfig: + wrapCRUDinTransactions: true + # Enable the transactions passthrough. + eval: >- + var testingReplication = true; + load('jstests/libs/override_methods/enable_sessions.js'); + load('jstests/libs/override_methods/txn_passthrough_cmd_massage.js'); + load('jstests/libs/override_methods/network_error_and_txn_override.js'); + readMode: commands + hooks: + # The CheckReplDBHash hook waits until all operations have replicated to and have been applied + # on the secondaries, so we run the ValidateCollections hook after it to ensure we're + # validating the entire contents of the collection. + - class: CheckReplOplogs + - class: CheckReplDBHash + - class: ValidateCollections + - class: CleanEveryN + n: 20 + fixture: + class: ShardedClusterFixture + mongos_options: + bind_ip_all: '' + set_parameters: + enableTestCommands: 1 + mongod_options: + bind_ip_all: '' + set_parameters: + enableTestCommands: 1 + writePeriodicNoops: 1 + periodicNoopIntervalSecs: 1 + num_rs_nodes_per_shard: 1 + num_shards: 2 + enable_sharding: + - test diff --git a/etc/evergreen.yml b/etc/evergreen.yml index 5d6a075df71..35798c9f162 100644 --- a/etc/evergreen.yml +++ b/etc/evergreen.yml @@ -5817,6 +5817,39 @@ tasks: resmoke_args: --suites=change_streams_whole_cluster_sharded_collections_passthrough --storageEngine=wiredTiger - <<: *task_template + name: change_streams_multi_stmt_txn_passthrough + tags: ["change_streams"] + depends_on: + - name: change_streams + commands: + - func: "do setup" + - func: "run tests" + vars: + resmoke_args: --suites=change_streams_multi_stmt_txn_passthrough --storageEngine=wiredTiger + +- <<: 
*task_template + name: change_streams_multi_stmt_txn_mongos_passthrough + tags: ["change_streams"] + depends_on: + - name: change_streams + commands: + - func: "do setup" + - func: "run tests" + vars: + resmoke_args: --suites=change_streams_multi_stmt_txn_mongos_passthrough --storageEngine=wiredTiger + +- <<: *task_template + name: change_streams_multi_stmt_txn_sharded_collections_passthrough + tags: ["change_streams"] + depends_on: + - name: change_streams + commands: + - func: "do setup" + - func: "run tests" + vars: + resmoke_args: --suites=change_streams_multi_stmt_txn_sharded_collections_passthrough --storageEngine=wiredTiger + +- <<: *task_template name: disk_mobile commands: - func: "do setup" diff --git a/jstests/change_streams/include_cluster_time.js b/jstests/change_streams/include_cluster_time.js index 405bf2d5b16..d035a92f517 100644 --- a/jstests/change_streams/include_cluster_time.js +++ b/jstests/change_streams/include_cluster_time.js @@ -1,4 +1,9 @@ // Tests that each change in the stream will include the cluster time at which it happened. +// +// This test expects each change stream result to have an operationTime based on the clusterTime in +// the oplog entry. When operations get bundled into a transaction, their operationTime is instead +// based on the commit oplog entry, which would cause this test to fail. +// @tags: [change_stream_does_not_expect_txns] (function() { "use strict"; diff --git a/jstests/change_streams/report_latest_observed_oplog_timestamp.js b/jstests/change_streams/report_latest_observed_oplog_timestamp.js index 45abd52ada9..56754613b47 100644 --- a/jstests/change_streams/report_latest_observed_oplog_timestamp.js +++ b/jstests/change_streams/report_latest_observed_oplog_timestamp.js @@ -1,6 +1,10 @@ // Tests that an aggregate with a $changeStream stage will report the latest optime read in // the oplog by its cursor. This is information is needed in order to correctly merge the results // from the various shards on mongos. 
+// +// This test expects operation timestamps from a change stream to strictly increase with each +// operation, which does not happen when the operations get grouped into a transaction. +// @tags: [change_stream_does_not_expect_txns] (function() { "use strict"; diff --git a/jstests/change_streams/required_as_first_stage.js b/jstests/change_streams/required_as_first_stage.js index 9eb15db9e9d..bdc0b43ba0c 100644 --- a/jstests/change_streams/required_as_first_stage.js +++ b/jstests/change_streams/required_as_first_stage.js @@ -1,4 +1,9 @@ // Tests that the $changeStream stage can only be present as the first stage in the pipeline. +// +// The passthrough logic that bundles operations into transactions needs to be able to identify +// change stream aggregations so as to avoid running them in a transaction, but that code would +// fail to recognize the intentionally malformed aggregations that we test here. +// @tags: [change_stream_does_not_expect_txns] (function() { "use strict"; diff --git a/jstests/change_streams/resume_from_high_water_mark_token.js b/jstests/change_streams/resume_from_high_water_mark_token.js index bc63fdb33d9..973fc32d9c9 100644 --- a/jstests/change_streams/resume_from_high_water_mark_token.js +++ b/jstests/change_streams/resume_from_high_water_mark_token.js @@ -160,18 +160,22 @@ const otherCollection = assertCreateCollection(db, otherCollName); const adminDB = db.getSiblingDB("admin"); - // Open a stream on the test collection. Write one document to the test collection and one to - // the unrelated collection, in order to push the postBatchResumeToken (PBRT) past the last - // related event. + // Open a stream on the test collection, and write a document to it. csCursor = testCollection.watch(); assert.commandWorked(testCollection.insert({_id: docId++})); - assert.commandWorked(otherCollection.insert({})); - // Consume all events. 
The PBRT of the batch should be greater than the last event, which - // guarantees that it is a synthetic high-water-mark token. + // Write an event to the unrelated collection in order to advance the PBRT, and then consume all + // events. When we see a PBRT that is greater than the timestamp of the last event (stored in + // 'relatedEvent'), we know it must be a synthetic high-water-mark token. + // + // Note that the first insert into the unrelated collection may not be enough to advance the + // PBRT; some passthroughs will group the unrelated write into a transaction with the related + // write, giving them the same timestamp. We put the unrelated insert into the assert.soon loop, + // so that it will eventually get its own transaction with a new timestamp. let relatedEvent = null; let hwmToken = null; assert.soon(() => { + assert.commandWorked(otherCollection.insert({})); if (csCursor.hasNext()) { relatedEvent = csCursor.next(); } diff --git a/jstests/change_streams/shell_helper.js b/jstests/change_streams/shell_helper.js index f63eba06df7..a044ba76e50 100644 --- a/jstests/change_streams/shell_helper.js +++ b/jstests/change_streams/shell_helper.js @@ -1,6 +1,11 @@ // Test change streams related shell helpers and options passed to them. Note that, while we only // call the DBCollection.watch helper in this file, it will be redirected to the DB.watch or // Mongo.watch equivalents in the whole_db and whole_cluster passthroughs. +// +// This test expects each change stream result to have an operationTime based on the clusterTime in +// the oplog entry. When operations get bundled into a transaction, their operationTime is instead +// based on the commit oplog entry, which would cause this test to fail. 
+// @tags: [change_stream_does_not_expect_txns] (function() { "use strict"; diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js index 685a94f8040..e1914ab3ce5 100644 --- a/jstests/libs/change_stream_util.js +++ b/jstests/libs/change_stream_util.js @@ -65,12 +65,21 @@ function assertInvalidateOp({cursor, opType}) { */ function assertChangeStreamEventEq(actualEvent, expectedEvent) { const testEvent = Object.assign({}, actualEvent); - if (expectedEvent._id == null) { + if (!expectedEvent.hasOwnProperty("_id")) { delete testEvent._id; // Remove the resume token, if present. } - if (expectedEvent.clusterTime == null) { + if (!expectedEvent.hasOwnProperty("clusterTime")) { delete testEvent.clusterTime; // Remove the cluster time, if present. } + + // The change stream transaction passthrough causes operations to have txnNumber and lsid + // values that the test doesn't expect, which can cause comparisons to fail. + if (!expectedEvent.hasOwnProperty("txnNumber")) { + delete testEvent.txnNumber; // Remove the txnNumber, if present. + } + if (!expectedEvent.hasOwnProperty("lsid")) { + delete testEvent.lsid; // Remove the lsid, if present. + } assert.docEq(testEvent, expectedEvent, "Change did not match expected change. Expected change: " + tojson(expectedEvent) + diff --git a/jstests/libs/override_methods/network_error_and_txn_override.js b/jstests/libs/override_methods/network_error_and_txn_override.js index 7b4d64a0353..440f10d3c50 100644 --- a/jstests/libs/override_methods/network_error_and_txn_override.js +++ b/jstests/libs/override_methods/network_error_and_txn_override.js @@ -250,6 +250,12 @@ ops = []; } + // The (initially empty) set of cursors belonging to aggregation operations that executed + // outside of a transaction. Any getMore operations on these cursors must also execute outside + // of a transaction. The set stores key/value pairs where the key is a cursor id and the value + // is the true boolean value. 
+ let nonTxnAggCursorSet = {}; + // Set the max number of operations to run in a transaction. Once we've hit this number of // operations, we will commit the transaction. This is to prevent having to retry an extremely // long running transaction. @@ -381,8 +387,11 @@ shouldForceWriteConcern = false; } } else if (cmdName === "aggregate") { - if (OverrideHelpers.isAggregationWithListLocalSessionsStage(cmdName, cmdObj)) { - // The $listLocalSessions stage can only be used with readConcern={level: "local"}. + if (OverrideHelpers.isAggregationWithListLocalSessionsStage(cmdName, cmdObj) || + OverrideHelpers.isAggregationWithChangeStreamStage(cmdName, cmdObj)) { + // The $listLocalSessions stage can only be used with readConcern={level: "local"}, + // and the $changeStream stage can only be used with + // readConcern={level: "majority"}. shouldForceReadConcern = false; } @@ -573,6 +582,12 @@ } } + // Returns true iff a command is a "getMore" on a cursor that is in the `nonTxnAggCursorSet` + // dictionary of cursors that were created outside of any transaction. + function isCommandNonTxnGetMore(cmdName, cmdObj) { + return cmdName === "getMore" && nonTxnAggCursorSet[cmdObj.getMore]; + } + function setupTransactionCommand(conn, dbName, cmdName, cmdObj, lsid) { // We want to overwrite whatever read and write concern is already set. delete cmdObj.readConcern; @@ -583,7 +598,8 @@ const driverSession = conn.getDB(dbName).getSession(); const commandSupportsTransaction = TransactionsUtil.commandSupportsTxn(dbName, cmdName, cmdObj); - if (commandSupportsTransaction && driverSession.getSessionId() !== null) { + if (commandSupportsTransaction && driverSession.getSessionId() !== null && + !isCommandNonTxnGetMore(cmdName, cmdObj)) { if (isNested()) { // Nested commands should never start a new transaction. 
} else if (ops.length === 0) { @@ -966,6 +982,11 @@ if (configuredForTxnOverride()) { logMsgFull("Override got response", `res: ${tojsononeline(res)}, cmd: ${tojsononeline(cmdObj)}`); + + if (!hasError(res) && + TransactionsUtil.commandIsNonTxnAggregation(cmdName, cmdObj)) { + nonTxnAggCursorSet[res.cursor.id] = true; + } } const logError = (msg) => logErrorFull(msg, cmdName, cmdObj, res); diff --git a/jstests/libs/override_methods/override_helpers.js b/jstests/libs/override_methods/override_helpers.js index bd49e0d7191..8afeb999332 100644 --- a/jstests/libs/override_methods/override_helpers.js +++ b/jstests/libs/override_methods/override_helpers.js @@ -101,6 +101,7 @@ var OverrideHelpers = (function() { isAggregationWithListLocalSessionsStage: makeIsAggregationWithFirstStage("$listLocalSessions"), isAggregationWithOutOrMergeStage: isAggregationWithOutOrMergeStage, + isAggregationWithChangeStreamStage: makeIsAggregationWithFirstStage("$changeStream"), isMapReduceWithInlineOutput: isMapReduceWithInlineOutput, prependOverrideInParallelShell: prependOverrideInParallelShell, overrideRunCommand: overrideRunCommand, diff --git a/jstests/libs/transactions_util.js b/jstests/libs/transactions_util.js index 48814e9cf0f..05d768d5e43 100644 --- a/jstests/libs/transactions_util.js +++ b/jstests/libs/transactions_util.js @@ -2,6 +2,8 @@ * Utilities for testing transactions. */ var TransactionsUtil = (function() { + load("jstests/libs/override_methods/override_helpers.js"); + const kCmdsSupportingTransactions = new Set([ 'aggregate', 'delete', @@ -21,12 +23,21 @@ var TransactionsUtil = (function() { 'delete', ]); + // Indicates an aggregation command with a pipeline that cannot run in a transaction but can + // still execute concurrently with other transactions. Pipelines with $changeStream, $out, or + // $merge cannot run within a transaction. 
+ function commandIsNonTxnAggregation(cmdName, cmdObj) { + return OverrideHelpers.isAggregationWithOutOrMergeStage(cmdName, cmdObj) || + OverrideHelpers.isAggregationWithChangeStreamStage(cmdName, cmdObj); + } + function commandSupportsTxn(dbName, cmdName, cmdObj) { if (cmdName === 'commitTransaction' || cmdName === 'abortTransaction') { return true; } - if (!kCmdsSupportingTransactions.has(cmdName)) { + if (!kCmdsSupportingTransactions.has(cmdName) || + commandIsNonTxnAggregation(cmdName, cmdObj)) { return false; } @@ -89,6 +100,10 @@ var TransactionsUtil = (function() { } return { - commandSupportsTxn, commandTypeCanSupportTxn, deepCopyObject, isTransientTransactionError, + commandIsNonTxnAggregation, + commandSupportsTxn, + commandTypeCanSupportTxn, + deepCopyObject, + isTransientTransactionError, }; })(); diff --git a/jstests/replsets/txn_override_unittests.js b/jstests/replsets/txn_override_unittests.js index 365b404cbc2..99bd77bf365 100644 --- a/jstests/replsets/txn_override_unittests.js +++ b/jstests/replsets/txn_override_unittests.js @@ -1348,6 +1348,32 @@ testBadDBName(session, 'local'); } }, + { + name: "getMore on change stream executes outside transaction", + test: function() { + assert.commandWorked(testDB.createCollection(collName1)); + + // Starting a $changeStream aggregation within a transaction would fail, so the + // override has to execute this as a standalone command. + const changeStream = testDB.collName1.watch(); + assert.commandWorked(testDB.collName1.insert({_id: 1})); + endCurrentTransactionIfOpen(); + + // Calling the `next` function on the change stream cursor will trigger a getmore, + // which the override must also run as a standalone command. + assert.eq(changeStream.next()["fullDocument"], {_id: 1}); + + // An aggregation without $changeStream runs within a transaction. 
+ let aggCursor = testDB.collName1.aggregate([], {cursor: {batchSize: 0}}); + assert.eq(aggCursor.next(), {_id: 1}); + + // Creating a non-$changeStream aggregation cursor and running its getMore in a + // different transaction will fail. + aggCursor = testDB.collName1.aggregate([], {cursor: {batchSize: 0}}); + endCurrentTransactionIfOpen(); + assert.throws(() => aggCursor.next()); + } + }, ]; // Failpoints, overrides, and post-command functions are set by default to only run once, so |