diff options
3 files changed, 130 insertions, 2 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml
index 409a6ef0dbf..1d9f2808321 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml
@@ -3,6 +3,12 @@ test_kind: js_test
- jstests/change_streams/**/*.js
+ exclude_files:
+ # This test creates a collection (and index) inside a transaction. Even though the collections are
+ # unsharded this suite enables sharding in the test database which makes transactions against
+ # it distributed. This causes the following tests to fail since creating a collection in a
+ # distributed transaction is not allowed.
+ - jstests/change_streams/ddl_create_index_txn.js
# The next tags correspond to the special errors thrown by the
diff --git a/jstests/change_streams/ddl_create_index_txn.js b/jstests/change_streams/ddl_create_index_txn.js
new file mode 100644
index 00000000000..303f9b079ce
--- /dev/null
+++ b/jstests/change_streams/ddl_create_index_txn.js
@@ -0,0 +1,118 @@
+ * Tests that a change stream will correctly unwind createIndexes operations from applyOps when
+ * createIndexes is done in a transaction.
+ *
+ * @tags: [
+ * uses_transactions,
+ * requires_majority_read_concern,
+ * requires_fcv_60
+ * # In order to run this test with sharding we would have to create a transaction that creates
+ * # the collection, shards it, and then creates the index. however sharding a collection in a
+ * # transaction is not allowed and creating an index in a transaction on a collection that was
+ * # not created in that transaction is also not allowed. so this test only works with unsharded
+ * # collections.
+ * assumes_unsharded_collection
+ * ]
+ */
+(function() {
+load("jstests/libs/auto_retry_transaction_in_sharding.js"); // For withTxnAndAutoRetryOnMongos.
+load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
+load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.isMongos.
+load('jstests/libs/collection_drop_recreate.js'); // 'assertDropCollection'.
+const dbName = "test";
+const collName = jsTestName();
+const otherCollName = jsTestName() + "_2";
+const coll = db[jsTestName()];
+const otherDBName = jsTestName() + "_db";
+const otherDB = db.getSiblingDB(otherDBName);
+const otherDBCollName = "someColl";
+const session = db.getMongo().startSession();
+const sessionDB = session.getDatabase(db.getName());
+const sessionOtherDB = session.getDatabase(otherDBName);
+const sessionColl = sessionDB[collName];
+const sessionOtherColl = sessionDB[otherCollName];
+const sessionOtherDBColl = sessionOtherDB[otherDBCollName];
+assertDropCollection(sessionDB, collName);
+assertDropCollection(sessionDB, otherCollName);
+assertDropCollection(sessionOtherDB, otherDBCollName);
+let csOptions = {showExpandedEvents: true};
+const pipeline = [{$changeStream: csOptions}, {$project: {"lsid.uid": 0}}];
+let cst = new ChangeStreamTest(db);
+let changeStream = cst.startWatchingChanges({pipeline, collection: coll});
+const testStartTime = changeStream.postBatchResumeToken;
+assert.neq(testStartTime, undefined);
+const txnOptions = {
+ readConcern: {level: "local"},
+ writeConcern: {w: "majority"}
+withTxnAndAutoRetryOnMongos(session, () => {
+ assert.commandWorked(sessionColl.createIndex({unused: 1}));
+ assert.commandWorked(sessionOtherColl.createIndex({unused: 1}));
+ assert.commandWorked(sessionOtherDBColl.createIndex({unused: 1}));
+}, txnOptions);
+const lsid = session.getSessionId();
+const txnNumber = session.getTxnNumber_forTesting();
+const expectedChanges = [
+ {operationType: "create", ns: {db: dbName, coll: collName}, lsid, txnNumber},
+ {
+ operationType: "createIndexes",
+ ns: {db: dbName, coll: collName},
+ "operationDescription": {"indexes": [{"v": 2, "key": {"unused": 1}, "name": "unused_1"}]},
+ lsid,
+ txnNumber
+ }
+// Test single coll changeStream.
+cst.assertNextChangesEqual({cursor: changeStream, expectedChanges});
+// Test whole db changeStream.
+const otherCollEvents = [
+ {operationType: "create", ns: {db: dbName, coll: otherCollName}},
+ {
+ operationType: "createIndexes",
+ ns: {db: dbName, coll: otherCollName},
+ "operationDescription": {"indexes": [{"v": 2, "key": {"unused": 1}, "name": "unused_1"}]},
+ lsid,
+ txnNumber
+ }
+csOptions.startAfter = testStartTime;
+changeStream = cst.startWatchingChanges({pipeline, collection: 1});
+cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges});
+cst = new ChangeStreamTest(db.getSiblingDB("admin"));
+// Test whole cluster changeStream.
+const otherDBEvents = [
+ {operationType: "create", ns: {db: otherDBName, coll: otherDBCollName}},
+ {
+ operationType: "createIndexes",
+ ns: {db: otherDBName, coll: otherDBCollName},
+ "operationDescription": {"indexes": [{"v": 2, "key": {"unused": 1}, "name": "unused_1"}]},
+ lsid,
+ txnNumber
+ }
+csOptions.allChangesForCluster = true;
+changeStream = cst.startWatchingChanges({pipeline, collection: 1});
+cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges});
diff --git a/src/mongo/db/pipeline/change_stream_filter_helpers.cpp b/src/mongo/db/pipeline/change_stream_filter_helpers.cpp
index 75c9e930808..b45af75c96d 100644
--- a/src/mongo/db/pipeline/change_stream_filter_helpers.cpp
+++ b/src/mongo/db/pipeline/change_stream_filter_helpers.cpp
@@ -256,8 +256,12 @@ std::unique_ptr<MatchExpression> buildTransactionFilter(
// Match relevant command events on the monitored namespaces.
"o.applyOps" << BSON(
- "$elemMatch" << BSON("ns" << BSONRegEx(cmdNsRegex)
- << OR(BSON("o.create" << BSONRegEx(collRegex)))))));
+ "$elemMatch" << BSON(
+ "ns" << BSONRegEx(cmdNsRegex)
+ << OR(BSON("o.create" << BSONRegEx(collRegex)),
+ // We don't need to consider 'o.commitIndexBuild' here because
+ // creating an index on a non-empty collection is not allowed.
+ BSON("o.createIndexes" << BSONRegEx(collRegex)))))));
// The default repl::OpTime is the value used to indicate a null "prevOpTime" link.