From 7ceeed142005460b81efc2f1d534f8fbcf8a1f65 Mon Sep 17 00:00:00 2001 From: "Mickey. J Winters" Date: Wed, 30 Mar 2022 21:31:10 +0000 Subject: SERVER-64429 Add txn oplog filtering for createIndexes event --- .../change_streams_mongos_sessions_passthrough.yml | 6 ++ jstests/change_streams/ddl_create_index_txn.js | 118 +++++++++++++++++++++ .../db/pipeline/change_stream_filter_helpers.cpp | 8 +- 3 files changed, 130 insertions(+), 2 deletions(-) create mode 100644 jstests/change_streams/ddl_create_index_txn.js diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml index 409a6ef0dbf..1d9f2808321 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml @@ -3,6 +3,12 @@ test_kind: js_test selector: roots: - jstests/change_streams/**/*.js + exclude_files: + # This test creates a collection (and index) inside a transaction. Even though the collections are + # unsharded this suite enables sharding in the test database which makes transactions against + # it distributed. This causes the following tests to fail since creating a collection in a + # distributed transaction is not allowed. + - jstests/change_streams/ddl_create_index_txn.js exclude_with_any_tags: ## # The next tags correspond to the special errors thrown by the diff --git a/jstests/change_streams/ddl_create_index_txn.js b/jstests/change_streams/ddl_create_index_txn.js new file mode 100644 index 00000000000..303f9b079ce --- /dev/null +++ b/jstests/change_streams/ddl_create_index_txn.js @@ -0,0 +1,118 @@ +/** + * Tests that a change stream will correctly unwind createIndexes operations from applyOps when + * createIndexes is done in a transaction. + * + * @tags: [ + * uses_transactions, + * requires_majority_read_concern, + * requires_fcv_60 + * # In order to run this test with sharding we would have to create a transaction that creates + * # the collection, shards it, and then creates the index. however sharding a collection in a + * # transaction is not allowed and creating an index in a transaction on a collection that was + * # not created in that transaction is also not allowed. so this test only works with unsharded + * # collections. + * assumes_unsharded_collection + * ] + */ +(function() { + +load("jstests/libs/auto_retry_transaction_in_sharding.js"); // For withTxnAndAutoRetryOnMongos. +load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. +load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.isMongos. +load('jstests/libs/collection_drop_recreate.js'); // 'assertDropCollection'. + +const dbName = "test"; +const collName = jsTestName(); +const otherCollName = jsTestName() + "_2"; +const coll = db[jsTestName()]; + +const otherDBName = jsTestName() + "_db"; +const otherDB = db.getSiblingDB(otherDBName); +const otherDBCollName = "someColl"; + +const session = db.getMongo().startSession(); + +const sessionDB = session.getDatabase(db.getName()); +const sessionOtherDB = session.getDatabase(otherDBName); +const sessionColl = sessionDB[collName]; +const sessionOtherColl = sessionDB[otherCollName]; +const sessionOtherDBColl = sessionOtherDB[otherDBCollName]; + +assertDropCollection(sessionDB, collName); +assertDropCollection(sessionDB, otherCollName); +assertDropCollection(sessionOtherDB, otherDBCollName); + +let csOptions = {showExpandedEvents: true}; +const pipeline = [{$changeStream: csOptions}, {$project: {"lsid.uid": 0}}]; + +let cst = new ChangeStreamTest(db); +let changeStream = cst.startWatchingChanges({pipeline, collection: coll}); + +const testStartTime = changeStream.postBatchResumeToken; +assert.neq(testStartTime, undefined); + +const txnOptions = { + readConcern: {level: "local"}, + writeConcern: {w: "majority"} +}; + +withTxnAndAutoRetryOnMongos(session, () => { + assert.commandWorked(sessionColl.createIndex({unused: 1})); + assert.commandWorked(sessionOtherColl.createIndex({unused: 1})); + assert.commandWorked(sessionOtherDBColl.createIndex({unused: 1})); +}, txnOptions); + +const lsid = session.getSessionId(); +const txnNumber = session.getTxnNumber_forTesting(); + +const expectedChanges = [ + {operationType: "create", ns: {db: dbName, coll: collName}, lsid, txnNumber}, + { + operationType: "createIndexes", + ns: {db: dbName, coll: collName}, + "operationDescription": {"indexes": [{"v": 2, "key": {"unused": 1}, "name": "unused_1"}]}, + lsid, + txnNumber + } +]; + +// Test single coll changeStream. +cst.assertNextChangesEqual({cursor: changeStream, expectedChanges}); + +// Test whole db changeStream. +const otherCollEvents = [ + {operationType: "create", ns: {db: dbName, coll: otherCollName}}, + { + operationType: "createIndexes", + ns: {db: dbName, coll: otherCollName}, + "operationDescription": {"indexes": [{"v": 2, "key": {"unused": 1}, "name": "unused_1"}]}, + lsid, + txnNumber + } +]; +expectedChanges.push(...otherCollEvents); +csOptions.startAfter = testStartTime; +changeStream = cst.startWatchingChanges({pipeline, collection: 1}); +cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges}); + +cst.cleanUp(); +cst = new ChangeStreamTest(db.getSiblingDB("admin")); + +// Test whole cluster changeStream. +const otherDBEvents = [ + {operationType: "create", ns: {db: otherDBName, coll: otherDBCollName}}, + { + operationType: "createIndexes", + ns: {db: otherDBName, coll: otherDBCollName}, + "operationDescription": {"indexes": [{"v": 2, "key": {"unused": 1}, "name": "unused_1"}]}, + lsid, + txnNumber + } +]; +expectedChanges.push(...otherDBEvents); +csOptions.allChangesForCluster = true; +changeStream = cst.startWatchingChanges({pipeline, collection: 1}); +cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges}); + +cst.cleanUp(); +})(); diff --git a/src/mongo/db/pipeline/change_stream_filter_helpers.cpp b/src/mongo/db/pipeline/change_stream_filter_helpers.cpp index 75c9e930808..b45af75c96d 100644 --- a/src/mongo/db/pipeline/change_stream_filter_helpers.cpp +++ b/src/mongo/db/pipeline/change_stream_filter_helpers.cpp @@ -256,8 +256,12 @@ std::unique_ptr buildTransactionFilter( // Match relevant command events on the monitored namespaces. orBuilder.append(BSON( "o.applyOps" << BSON( - "$elemMatch" << BSON("ns" << BSONRegEx(cmdNsRegex) - << OR(BSON("o.create" << BSONRegEx(collRegex))))))); + "$elemMatch" << BSON( + "ns" << BSONRegEx(cmdNsRegex) + << OR(BSON("o.create" << BSONRegEx(collRegex)), + // We don't need to consider 'o.commitIndexBuild' here because + // creating an index on a non-empty collection is not allowed. + BSON("o.createIndexes" << BSONRegEx(collRegex))))))); // The default repl::OpTime is the value used to indicate a null "prevOpTime" link. orBuilder.append(BSON(repl::OplogEntry::kPrevWriteOpTimeInTransactionFieldName -- cgit v1.2.1