summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_mongos_passthrough.yml66
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_passthrough.yml53
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_sharded_collections_passthrough.yml62
-rw-r--r--etc/evergreen.yml33
-rw-r--r--jstests/change_streams/include_cluster_time.js5
-rw-r--r--jstests/change_streams/report_latest_observed_oplog_timestamp.js4
-rw-r--r--jstests/change_streams/required_as_first_stage.js5
-rw-r--r--jstests/change_streams/resume_from_high_water_mark_token.js16
-rw-r--r--jstests/change_streams/shell_helper.js5
-rw-r--r--jstests/libs/change_stream_util.js13
-rw-r--r--jstests/libs/override_methods/network_error_and_txn_override.js27
-rw-r--r--jstests/libs/override_methods/override_helpers.js1
-rw-r--r--jstests/libs/transactions_util.js19
-rw-r--r--jstests/replsets/txn_override_unittests.js26
14 files changed, 322 insertions, 13 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_mongos_passthrough.yml
new file mode 100644
index 00000000000..ff7c8170176
--- /dev/null
+++ b/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_mongos_passthrough.yml
@@ -0,0 +1,66 @@
+test_kind: js_test
+
+selector:
+ roots:
+ - jstests/change_streams/**/*.js
+ exclude_files:
+ # Parallel Shell - we do not signal the override to end a txn when a parallel shell closes.
+ - jstests/change_streams/only_wake_getmore_for_relevant_changes.js
+ exclude_with_any_tags:
+ # These tests would fail with "Cowardly refusing to override write concern of command: ..."
+ - assumes_write_concern_unchanged
+ # No need to use a passthrough to add transactions to a test that already has its own
+ # transactions.
+ - uses_transactions
+ # These tests make assumptions about change stream results that are no longer true once operations
+ # get bundled into transactions.
+ - change_stream_does_not_expect_txns
+
+executor:
+ archive:
+ hooks:
+ - CheckReplDBHash
+ - CheckReplOplogs
+ - ValidateCollections
+ config:
+ shell_options:
+ global_vars:
+ TestData:
+ networkErrorAndTxnOverrideConfig:
+ wrapCRUDinTransactions: true
+ # Enable the transactions passthrough.
+ eval: >-
+ var testingReplication = true;
+ load('jstests/libs/override_methods/enable_sessions.js');
+ load('jstests/libs/override_methods/txn_passthrough_cmd_massage.js');
+ load('jstests/libs/override_methods/network_error_and_txn_override.js');
+ readMode: commands
+ hooks:
+ # The CheckReplDBHash hook waits until all operations have replicated to and have been applied
+ # on the secondaries, so we run the ValidateCollections hook after it to ensure we're
+ # validating the entire contents of the collection.
+ - class: CheckReplOplogs
+ - class: CheckReplDBHash
+ - class: ValidateCollections
+ - class: CleanEveryN
+ n: 20
+ fixture:
+ class: ShardedClusterFixture
+ # Use two shards to make sure we will only talk to the primary shard for the database and will
+ # not delay changes to wait for notifications or a clock advancement from other shards.
+ num_shards: 2
+ mongos_options:
+ bind_ip_all: ''
+ set_parameters:
+ enableTestCommands: 1
+ mongod_options:
+ bind_ip_all: ''
+ set_parameters:
+ enableTestCommands: 1
+ periodicNoopIntervalSecs: 1
+ writePeriodicNoops: true
+ num_rs_nodes_per_shard: 1
+ # This test suite doesn't actually shard any collections, but enabling sharding will prevent
+ # read commands against non-existent databases from unconditionally returning a CursorId of 0.
+ enable_sharding:
+ - test
diff --git a/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_passthrough.yml
new file mode 100644
index 00000000000..15730882c82
--- /dev/null
+++ b/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_passthrough.yml
@@ -0,0 +1,53 @@
+test_kind: js_test
+
+selector:
+ roots:
+ - jstests/change_streams/**/*.js
+ exclude_files:
+ # Parallel Shell - we do not signal the override to end a txn when a parallel shell closes.
+ - jstests/change_streams/only_wake_getmore_for_relevant_changes.js
+ exclude_with_any_tags:
+ # These tests would fail with "Cowardly refusing to override write concern of command: ..."
+ - assumes_write_concern_unchanged
+ # No need to use a passthrough to add transactions to a test that already has its own
+ # transactions.
+ - uses_transactions
+ # These tests make assumptions about change stream results that are no longer true once operations
+ # get bundled into transactions.
+ - change_stream_does_not_expect_txns
+
+executor:
+ archive:
+ hooks:
+ - CheckReplDBHash
+ - CheckReplOplogs
+ - ValidateCollections
+ config:
+ shell_options:
+ global_vars:
+ TestData:
+ networkErrorAndTxnOverrideConfig:
+ wrapCRUDinTransactions: true
+ # Enable the transactions passthrough.
+ eval: >-
+ var testingReplication = true;
+ load('jstests/libs/override_methods/enable_sessions.js');
+ load('jstests/libs/override_methods/txn_passthrough_cmd_massage.js');
+ load('jstests/libs/override_methods/network_error_and_txn_override.js');
+ readMode: commands
+ hooks:
+ # The CheckReplDBHash hook waits until all operations have replicated to and have been applied
+ # on the secondaries, so we run the ValidateCollections hook after it to ensure we're
+ # validating the entire contents of the collection.
+ - class: CheckReplOplogs
+ - class: CheckReplDBHash
+ - class: ValidateCollections
+ - class: CleanEveryN
+ n: 20
+ fixture:
+ class: ReplicaSetFixture
+ mongod_options:
+ bind_ip_all: ''
+ set_parameters:
+ enableTestCommands: 1
+ num_nodes: 1
diff --git a/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_sharded_collections_passthrough.yml
new file mode 100644
index 00000000000..db182de7dac
--- /dev/null
+++ b/buildscripts/resmokeconfig/suites/change_streams_multi_stmt_txn_sharded_collections_passthrough.yml
@@ -0,0 +1,62 @@
+test_kind: js_test
+
+selector:
+ roots:
+ - jstests/change_streams/**/*.js
+ exclude_files:
+ # Parallel Shell - we do not signal the override to end a txn when a parallel shell closes.
+ - jstests/change_streams/only_wake_getmore_for_relevant_changes.js
+ exclude_with_any_tags:
+ # These tests would fail with "Cowardly refusing to override write concern of command: ..."
+ - assumes_write_concern_unchanged
+ # No need to use a passthrough to add transactions to a test that already has its own
+ # transactions.
+ - uses_transactions
+ # These tests make assumptions about change stream results that are no longer true once operations
+ # get bundled into transactions.
+ - change_stream_does_not_expect_txns
+
+executor:
+ archive:
+ hooks:
+ - CheckReplDBHash
+ - CheckReplOplogs
+ - ValidateCollections
+ config:
+ shell_options:
+ global_vars:
+ TestData:
+ networkErrorAndTxnOverrideConfig:
+ wrapCRUDinTransactions: true
+ # Enable the transactions passthrough.
+ eval: >-
+ var testingReplication = true;
+ load('jstests/libs/override_methods/enable_sessions.js');
+ load('jstests/libs/override_methods/txn_passthrough_cmd_massage.js');
+ load('jstests/libs/override_methods/network_error_and_txn_override.js');
+ readMode: commands
+ hooks:
+ # The CheckReplDBHash hook waits until all operations have replicated to and have been applied
+ # on the secondaries, so we run the ValidateCollections hook after it to ensure we're
+ # validating the entire contents of the collection.
+ - class: CheckReplOplogs
+ - class: CheckReplDBHash
+ - class: ValidateCollections
+ - class: CleanEveryN
+ n: 20
+ fixture:
+ class: ShardedClusterFixture
+ mongos_options:
+ bind_ip_all: ''
+ set_parameters:
+ enableTestCommands: 1
+ mongod_options:
+ bind_ip_all: ''
+ set_parameters:
+ enableTestCommands: 1
+ writePeriodicNoops: 1
+ periodicNoopIntervalSecs: 1
+ num_rs_nodes_per_shard: 1
+ num_shards: 2
+ enable_sharding:
+ - test
diff --git a/etc/evergreen.yml b/etc/evergreen.yml
index 5d6a075df71..35798c9f162 100644
--- a/etc/evergreen.yml
+++ b/etc/evergreen.yml
@@ -5817,6 +5817,39 @@ tasks:
resmoke_args: --suites=change_streams_whole_cluster_sharded_collections_passthrough --storageEngine=wiredTiger
- <<: *task_template
+ name: change_streams_multi_stmt_txn_passthrough
+ tags: ["change_streams"]
+ depends_on:
+ - name: change_streams
+ commands:
+ - func: "do setup"
+ - func: "run tests"
+ vars:
+ resmoke_args: --suites=change_streams_multi_stmt_txn_passthrough --storageEngine=wiredTiger
+
+- <<: *task_template
+ name: change_streams_multi_stmt_txn_mongos_passthrough
+ tags: ["change_streams"]
+ depends_on:
+ - name: change_streams
+ commands:
+ - func: "do setup"
+ - func: "run tests"
+ vars:
+ resmoke_args: --suites=change_streams_multi_stmt_txn_mongos_passthrough --storageEngine=wiredTiger
+
+- <<: *task_template
+ name: change_streams_multi_stmt_txn_sharded_collections_passthrough
+ tags: ["change_streams"]
+ depends_on:
+ - name: change_streams
+ commands:
+ - func: "do setup"
+ - func: "run tests"
+ vars:
+ resmoke_args: --suites=change_streams_multi_stmt_txn_sharded_collections_passthrough --storageEngine=wiredTiger
+
+- <<: *task_template
name: disk_mobile
commands:
- func: "do setup"
diff --git a/jstests/change_streams/include_cluster_time.js b/jstests/change_streams/include_cluster_time.js
index 405bf2d5b16..d035a92f517 100644
--- a/jstests/change_streams/include_cluster_time.js
+++ b/jstests/change_streams/include_cluster_time.js
@@ -1,4 +1,9 @@
// Tests that each change in the stream will include the cluster time at which it happened.
+//
+// This test expects each change stream result to have an operationTime based on the clusterTime in
+// the oplog entry. When operations get bundled into a transaction, their operationTime is instead
+// based on the commit oplog entry, which would cause this test to fail.
+// @tags: [change_stream_does_not_expect_txns]
(function() {
"use strict";
diff --git a/jstests/change_streams/report_latest_observed_oplog_timestamp.js b/jstests/change_streams/report_latest_observed_oplog_timestamp.js
index 45abd52ada9..56754613b47 100644
--- a/jstests/change_streams/report_latest_observed_oplog_timestamp.js
+++ b/jstests/change_streams/report_latest_observed_oplog_timestamp.js
@@ -1,6 +1,10 @@
// Tests that an aggregate with a $changeStream stage will report the latest optime read in
// the oplog by its cursor. This is information is needed in order to correctly merge the results
// from the various shards on mongos.
+//
+// This test expects operations timestamps from a change stream to strictly increase with each
+// operation, which does not happen when the operations get grouped into a transaction.
+// @tags: [change_stream_does_not_expect_txns]
(function() {
"use strict";
diff --git a/jstests/change_streams/required_as_first_stage.js b/jstests/change_streams/required_as_first_stage.js
index 9eb15db9e9d..bdc0b43ba0c 100644
--- a/jstests/change_streams/required_as_first_stage.js
+++ b/jstests/change_streams/required_as_first_stage.js
@@ -1,4 +1,9 @@
// Tests that the $changeStream stage can only be present as the first stage in the pipeline.
+//
+// The passthrough logic that bundles operations into transactions needs to be able identify change
+// stream aggregations so as to avoid running them in a transaction, but that code would fail to
+// recognize the intentionally malformed aggergations that we test here.
+// @tags: [change_stream_does_not_expect_txns]
(function() {
"use strict";
diff --git a/jstests/change_streams/resume_from_high_water_mark_token.js b/jstests/change_streams/resume_from_high_water_mark_token.js
index bc63fdb33d9..973fc32d9c9 100644
--- a/jstests/change_streams/resume_from_high_water_mark_token.js
+++ b/jstests/change_streams/resume_from_high_water_mark_token.js
@@ -160,18 +160,22 @@
const otherCollection = assertCreateCollection(db, otherCollName);
const adminDB = db.getSiblingDB("admin");
- // Open a stream on the test collection. Write one document to the test collection and one to
- // the unrelated collection, in order to push the postBatchResumeToken (PBRT) past the last
- // related event.
+ // Open a stream on the test collection, and write a document to it.
csCursor = testCollection.watch();
assert.commandWorked(testCollection.insert({_id: docId++}));
- assert.commandWorked(otherCollection.insert({}));
- // Consume all events. The PBRT of the batch should be greater than the last event, which
- // guarantees that it is a synthetic high-water-mark token.
+ // Write an event to the unrelated collection in order to advance the PBRT, and then consume all
+ // events. When we see a PBRT that is greater than the timestamp of the last event (stored in
+ // 'relatedEvent'), we know it must be a synthetic high-water-mark token.
+ //
+ // Note that the first insert into the unrelated collection may not be enough to advance the
+ // PBRT; some passthroughs will group the unrelated write into a transaction with the related
+ // write, giving them the same timestamp. We put the unrelated insert into the assert.soon loop,
+ // so that it will eventually get its own transaction with a new timestamp.
let relatedEvent = null;
let hwmToken = null;
assert.soon(() => {
+ assert.commandWorked(otherCollection.insert({}));
if (csCursor.hasNext()) {
relatedEvent = csCursor.next();
}
diff --git a/jstests/change_streams/shell_helper.js b/jstests/change_streams/shell_helper.js
index f63eba06df7..a044ba76e50 100644
--- a/jstests/change_streams/shell_helper.js
+++ b/jstests/change_streams/shell_helper.js
@@ -1,6 +1,11 @@
// Test change streams related shell helpers and options passed to them. Note that, while we only
// call the DBCollection.watch helper in this file, it will be redirected to the DB.watch or
// Mongo.watch equivalents in the whole_db and whole_cluster passthroughs.
+//
+// This test expects each change stream result to have an operationTime based on the clusterTime in
+// the oplog entry. When operations get bundled into a transaction, their operationTime is instead
+// based on the commit oplog entry, which would cause this test to fail.
+// @tags: [change_stream_does_not_expect_txns]
(function() {
"use strict";
diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js
index 685a94f8040..e1914ab3ce5 100644
--- a/jstests/libs/change_stream_util.js
+++ b/jstests/libs/change_stream_util.js
@@ -65,12 +65,21 @@ function assertInvalidateOp({cursor, opType}) {
*/
function assertChangeStreamEventEq(actualEvent, expectedEvent) {
const testEvent = Object.assign({}, actualEvent);
- if (expectedEvent._id == null) {
+ if (!expectedEvent.hasOwnProperty("_id")) {
delete testEvent._id; // Remove the resume token, if present.
}
- if (expectedEvent.clusterTime == null) {
+ if (!expectedEvent.hasOwnProperty("clusterTime")) {
delete testEvent.clusterTime; // Remove the cluster time, if present.
}
+
+ // The change stream transaction passthrough causes operations to have txnNumber and lsid
+ // values that the test doesn't expect, which can cause comparisons to fail.
+ if (!expectedEvent.hasOwnProperty("txnNumber")) {
+ delete testEvent.txnNumber; // Remove the txnNumber, if present.
+ }
+ if (!expectedEvent.hasOwnProperty("lsid")) {
+ delete testEvent.lsid; // Remove the lsid, if present.
+ }
assert.docEq(testEvent,
expectedEvent,
"Change did not match expected change. Expected change: " + tojson(expectedEvent) +
diff --git a/jstests/libs/override_methods/network_error_and_txn_override.js b/jstests/libs/override_methods/network_error_and_txn_override.js
index 7b4d64a0353..440f10d3c50 100644
--- a/jstests/libs/override_methods/network_error_and_txn_override.js
+++ b/jstests/libs/override_methods/network_error_and_txn_override.js
@@ -250,6 +250,12 @@
ops = [];
}
+ // The (initially empty) set of cursors belonging to aggregation operations that executed
+ // outside of a transaction. Any getMore operations on these cursors must also execute outside
+ // of a transaction. The set stores key/value pairs where the key is a cursor id and the value
+ // is the true boolean value.
+ let nonTxnAggCursorSet = {};
+
// Set the max number of operations to run in a transaction. Once we've hit this number of
// operations, we will commit the transaction. This is to prevent having to retry an extremely
// long running transaction.
@@ -381,8 +387,11 @@
shouldForceWriteConcern = false;
}
} else if (cmdName === "aggregate") {
- if (OverrideHelpers.isAggregationWithListLocalSessionsStage(cmdName, cmdObj)) {
- // The $listLocalSessions stage can only be used with readConcern={level: "local"}.
+ if (OverrideHelpers.isAggregationWithListLocalSessionsStage(cmdName, cmdObj) ||
+ OverrideHelpers.isAggregationWithChangeStreamStage(cmdName, cmdObj)) {
+ // The $listLocalSessions stage can only be used with readConcern={level: "local"},
+ // and the $changeStream stage can only be used with
+ // readConcern={level: "majority"}.
shouldForceReadConcern = false;
}
@@ -573,6 +582,12 @@
}
}
+ // Returns true iff a command is a "getMore" on a cursor that is in the `nonTxnAggCursorSet`
+ // dictionary of cursors that were created outside of any transaction.
+ function isCommandNonTxnGetMore(cmdName, cmdObj) {
+ return cmdName === "getMore" && nonTxnAggCursorSet[cmdObj.getMore];
+ }
+
function setupTransactionCommand(conn, dbName, cmdName, cmdObj, lsid) {
// We want to overwrite whatever read and write concern is already set.
delete cmdObj.readConcern;
@@ -583,7 +598,8 @@
const driverSession = conn.getDB(dbName).getSession();
const commandSupportsTransaction =
TransactionsUtil.commandSupportsTxn(dbName, cmdName, cmdObj);
- if (commandSupportsTransaction && driverSession.getSessionId() !== null) {
+ if (commandSupportsTransaction && driverSession.getSessionId() !== null &&
+ !isCommandNonTxnGetMore(cmdName, cmdObj)) {
if (isNested()) {
// Nested commands should never start a new transaction.
} else if (ops.length === 0) {
@@ -966,6 +982,11 @@
if (configuredForTxnOverride()) {
logMsgFull("Override got response",
`res: ${tojsononeline(res)}, cmd: ${tojsononeline(cmdObj)}`);
+
+ if (!hasError(res) &&
+ TransactionsUtil.commandIsNonTxnAggregation(cmdName, cmdObj)) {
+ nonTxnAggCursorSet[res.cursor.id] = true;
+ }
}
const logError = (msg) => logErrorFull(msg, cmdName, cmdObj, res);
diff --git a/jstests/libs/override_methods/override_helpers.js b/jstests/libs/override_methods/override_helpers.js
index bd49e0d7191..8afeb999332 100644
--- a/jstests/libs/override_methods/override_helpers.js
+++ b/jstests/libs/override_methods/override_helpers.js
@@ -101,6 +101,7 @@ var OverrideHelpers = (function() {
isAggregationWithListLocalSessionsStage:
makeIsAggregationWithFirstStage("$listLocalSessions"),
isAggregationWithOutOrMergeStage: isAggregationWithOutOrMergeStage,
+ isAggregationWithChangeStreamStage: makeIsAggregationWithFirstStage("$changeStream"),
isMapReduceWithInlineOutput: isMapReduceWithInlineOutput,
prependOverrideInParallelShell: prependOverrideInParallelShell,
overrideRunCommand: overrideRunCommand,
diff --git a/jstests/libs/transactions_util.js b/jstests/libs/transactions_util.js
index 48814e9cf0f..05d768d5e43 100644
--- a/jstests/libs/transactions_util.js
+++ b/jstests/libs/transactions_util.js
@@ -2,6 +2,8 @@
* Utilities for testing transactions.
*/
var TransactionsUtil = (function() {
+ load("jstests/libs/override_methods/override_helpers.js");
+
const kCmdsSupportingTransactions = new Set([
'aggregate',
'delete',
@@ -21,12 +23,21 @@ var TransactionsUtil = (function() {
'delete',
]);
+ // Indicates an aggregation command with a pipeline that cannot run in a transaction but can
+ // still execute concurrently with other transactions. Pipelines with $changeStream or $out
+ // cannot run within a transaction.
+ function commandIsNonTxnAggregation(cmdName, cmdObj) {
+ return OverrideHelpers.isAggregationWithOutOrMergeStage(cmdName, cmdObj) ||
+ OverrideHelpers.isAggregationWithChangeStreamStage(cmdName, cmdObj);
+ }
+
function commandSupportsTxn(dbName, cmdName, cmdObj) {
if (cmdName === 'commitTransaction' || cmdName === 'abortTransaction') {
return true;
}
- if (!kCmdsSupportingTransactions.has(cmdName)) {
+ if (!kCmdsSupportingTransactions.has(cmdName) ||
+ commandIsNonTxnAggregation(cmdName, cmdObj)) {
return false;
}
@@ -89,6 +100,10 @@ var TransactionsUtil = (function() {
}
return {
- commandSupportsTxn, commandTypeCanSupportTxn, deepCopyObject, isTransientTransactionError,
+ commandIsNonTxnAggregation,
+ commandSupportsTxn,
+ commandTypeCanSupportTxn,
+ deepCopyObject,
+ isTransientTransactionError,
};
})();
diff --git a/jstests/replsets/txn_override_unittests.js b/jstests/replsets/txn_override_unittests.js
index 365b404cbc2..99bd77bf365 100644
--- a/jstests/replsets/txn_override_unittests.js
+++ b/jstests/replsets/txn_override_unittests.js
@@ -1348,6 +1348,32 @@
testBadDBName(session, 'local');
}
},
+ {
+ name: "getMore on change stream executes outside transaction",
+ test: function() {
+ assert.commandWorked(testDB.createCollection(collName1));
+
+ // Starting a $changeStream aggregation within a transaction would fail, so the
+ // override has to execute this as a standalone command.
+ const changeStream = testDB.collName1.watch();
+ assert.commandWorked(testDB.collName1.insert({_id: 1}));
+ endCurrentTransactionIfOpen();
+
+ // Calling the `next` function on the change stream cursor will trigger a getmore,
+ // which the override must also run as a standalone command.
+ assert.eq(changeStream.next()["fullDocument"], {_id: 1});
+
+ // An aggregation without $changeStream runs within a transaction.
+ let aggCursor = testDB.collName1.aggregate([], {cursor: {batchSize: 0}});
+ assert.eq(aggCursor.next(), {_id: 1});
+
+ // Creating a non-$changeStream aggregation cursor and running its getMore in a
+ // different transaction will fail.
+ aggCursor = testDB.collName1.aggregate([], {cursor: {batchSize: 0}});
+ endCurrentTransactionIfOpen();
+ assert.throws(() => aggCursor.next());
+ }
+ },
];
// Failpoints, overrides, and post-command functions are set by default to only run once, so