summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMickey. J Winters <mickey.winters@mongodb.com>2022-03-30 21:31:10 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-03-30 21:59:05 +0000
commit7ceeed142005460b81efc2f1d534f8fbcf8a1f65 (patch)
tree34d4ad13ff3638be4e29c01e8b5bbfc57634c56f
parent6a06d60a41605738cfb882aa5a5aaedeb30e0512 (diff)
downloadmongo-7ceeed142005460b81efc2f1d534f8fbcf8a1f65.tar.gz
SERVER-64429 Add txn oplog filtering for createIndexes event
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml6
-rw-r--r--jstests/change_streams/ddl_create_index_txn.js118
-rw-r--r--src/mongo/db/pipeline/change_stream_filter_helpers.cpp8
3 files changed, 130 insertions, 2 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml
index 409a6ef0dbf..1d9f2808321 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml
@@ -3,6 +3,12 @@ test_kind: js_test
selector:
roots:
- jstests/change_streams/**/*.js
+ exclude_files:
+ # This test creates a collection (and index) inside a transaction. Even though the collections are
+ # unsharded this suite enables sharding in the test database which makes transactions against
+ # it distributed. This causes the following tests to fail since creating a collection in a
+ # distributed transaction is not allowed.
+ - jstests/change_streams/ddl_create_index_txn.js
exclude_with_any_tags:
##
# The next tags correspond to the special errors thrown by the
diff --git a/jstests/change_streams/ddl_create_index_txn.js b/jstests/change_streams/ddl_create_index_txn.js
new file mode 100644
index 00000000000..303f9b079ce
--- /dev/null
+++ b/jstests/change_streams/ddl_create_index_txn.js
@@ -0,0 +1,118 @@
+/**
+ * Tests that a change stream will correctly unwind createIndexes operations from applyOps when
+ * createIndexes is done in a transaction.
+ *
+ * @tags: [
+ * uses_transactions,
+ * requires_majority_read_concern,
+ * requires_fcv_60
+ * # In order to run this test with sharding we would have to create a transaction that creates
+ * # the collection, shards it, and then creates the index. however sharding a collection in a
+ * # transaction is not allowed and creating an index in a transaction on a collection that was
+ * # not created in that transaction is also not allowed. so this test only works with unsharded
+ * # collections.
+ * assumes_unsharded_collection
+ * ]
+ */
+(function() {
+
+load("jstests/libs/auto_retry_transaction_in_sharding.js"); // For withTxnAndAutoRetryOnMongos.
+load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
+load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.isMongos.
+load('jstests/libs/collection_drop_recreate.js'); // 'assertDropCollection'.
+
+const dbName = "test";
+const collName = jsTestName();
+const otherCollName = jsTestName() + "_2";
+const coll = db[jsTestName()];
+
+const otherDBName = jsTestName() + "_db";
+const otherDB = db.getSiblingDB(otherDBName);
+const otherDBCollName = "someColl";
+
+const session = db.getMongo().startSession();
+
+const sessionDB = session.getDatabase(db.getName());
+const sessionOtherDB = session.getDatabase(otherDBName);
+const sessionColl = sessionDB[collName];
+const sessionOtherColl = sessionDB[otherCollName];
+const sessionOtherDBColl = sessionOtherDB[otherDBCollName];
+
+assertDropCollection(sessionDB, collName);
+assertDropCollection(sessionDB, otherCollName);
+assertDropCollection(sessionOtherDB, otherDBCollName);
+
+let csOptions = {showExpandedEvents: true};
+const pipeline = [{$changeStream: csOptions}, {$project: {"lsid.uid": 0}}];
+
+let cst = new ChangeStreamTest(db);
+let changeStream = cst.startWatchingChanges({pipeline, collection: coll});
+
+const testStartTime = changeStream.postBatchResumeToken;
+assert.neq(testStartTime, undefined);
+
+const txnOptions = {
+ readConcern: {level: "local"},
+ writeConcern: {w: "majority"}
+};
+
+withTxnAndAutoRetryOnMongos(session, () => {
+ assert.commandWorked(sessionColl.createIndex({unused: 1}));
+ assert.commandWorked(sessionOtherColl.createIndex({unused: 1}));
+ assert.commandWorked(sessionOtherDBColl.createIndex({unused: 1}));
+}, txnOptions);
+
+const lsid = session.getSessionId();
+const txnNumber = session.getTxnNumber_forTesting();
+
+const expectedChanges = [
+ {operationType: "create", ns: {db: dbName, coll: collName}, lsid, txnNumber},
+ {
+ operationType: "createIndexes",
+ ns: {db: dbName, coll: collName},
+ "operationDescription": {"indexes": [{"v": 2, "key": {"unused": 1}, "name": "unused_1"}]},
+ lsid,
+ txnNumber
+ }
+];
+
+// Test single coll changeStream.
+cst.assertNextChangesEqual({cursor: changeStream, expectedChanges});
+
+// Test whole db changeStream.
+const otherCollEvents = [
+ {operationType: "create", ns: {db: dbName, coll: otherCollName}},
+ {
+ operationType: "createIndexes",
+ ns: {db: dbName, coll: otherCollName},
+ "operationDescription": {"indexes": [{"v": 2, "key": {"unused": 1}, "name": "unused_1"}]},
+ lsid,
+ txnNumber
+ }
+];
+expectedChanges.push(...otherCollEvents);
+csOptions.startAfter = testStartTime;
+changeStream = cst.startWatchingChanges({pipeline, collection: 1});
+cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges});
+
+cst.cleanUp();
+cst = new ChangeStreamTest(db.getSiblingDB("admin"));
+
+// Test whole cluster changeStream.
+const otherDBEvents = [
+ {operationType: "create", ns: {db: otherDBName, coll: otherDBCollName}},
+ {
+ operationType: "createIndexes",
+ ns: {db: otherDBName, coll: otherDBCollName},
+ "operationDescription": {"indexes": [{"v": 2, "key": {"unused": 1}, "name": "unused_1"}]},
+ lsid,
+ txnNumber
+ }
+];
+expectedChanges.push(...otherDBEvents);
+csOptions.allChangesForCluster = true;
+changeStream = cst.startWatchingChanges({pipeline, collection: 1});
+cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges});
+
+cst.cleanUp();
+})();
diff --git a/src/mongo/db/pipeline/change_stream_filter_helpers.cpp b/src/mongo/db/pipeline/change_stream_filter_helpers.cpp
index 75c9e930808..b45af75c96d 100644
--- a/src/mongo/db/pipeline/change_stream_filter_helpers.cpp
+++ b/src/mongo/db/pipeline/change_stream_filter_helpers.cpp
@@ -256,8 +256,12 @@ std::unique_ptr<MatchExpression> buildTransactionFilter(
// Match relevant command events on the monitored namespaces.
orBuilder.append(BSON(
"o.applyOps" << BSON(
- "$elemMatch" << BSON("ns" << BSONRegEx(cmdNsRegex)
- << OR(BSON("o.create" << BSONRegEx(collRegex)))))));
+ "$elemMatch" << BSON(
+ "ns" << BSONRegEx(cmdNsRegex)
+ << OR(BSON("o.create" << BSONRegEx(collRegex)),
+ // We don't need to consider 'o.commitIndexBuild' here because
+ // creating an index on a non-empty collection is not allowed.
+ BSON("o.createIndexes" << BSONRegEx(collRegex)))))));
// The default repl::OpTime is the value used to indicate a null "prevOpTime" link.
orBuilder.append(BSON(repl::OplogEntry::kPrevWriteOpTimeInTransactionFieldName