diff options
author | Jack Mulrow <jack.mulrow@mongodb.com> | 2022-05-18 22:59:10 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-05-24 21:12:59 +0000 |
commit | 222a559ce8c742b4170985979291fe563d888d17 (patch) | |
tree | 217a6748442ed65aded3f5a211fb1af8e8c32f6a | |
parent | 27921f09fb1f2125a9c2da1b36bde75993c21100 (diff) | |
download | mongo-222a559ce8c742b4170985979291fe563d888d17.tar.gz |
SERVER-66565 Create config.transactions partial index in setFCV
20 files changed, 543 insertions, 74 deletions
diff --git a/jstests/concurrency/fsm_workloads/internal_transactions_setFCV.js b/jstests/concurrency/fsm_workloads/internal_transactions_setFCV.js index e145e9294ad..f5dcb21d859 100644 --- a/jstests/concurrency/fsm_workloads/internal_transactions_setFCV.js +++ b/jstests/concurrency/fsm_workloads/internal_transactions_setFCV.js @@ -131,6 +131,31 @@ var $config = extendWorkload($config, function($config, $super) { } }; + // Runs concurrent retryable writes to verify they always observe the partial index after + // upgrade. + $config.states.retryableWrite = function(db, collName) { + const retryableWriteSession = db.getMongo().startSession( + {causalConsistency: this.shouldUseCausalConsistency, retryWrites: true}); + const retryableWriteDB = retryableWriteSession.getDatabase(db.getName()); + + print("Starting retryable write state, session: " + + tojsononeline(retryableWriteSession.getSessionId())); + + const docId = UUID(); + for (let i = 0; i < 10; ++i) { + const res = assert.commandWorked( + retryableWriteDB[collName].update({_id: docId, forRetryableWriteState: true}, + {$inc: {counter: 1}, $set: {idx: i}}, + {upsert: true})); + if (i === 0) { + assert.eq(res.nUpserted, 1, tojson(res)); + } else { + assert.eq(res.nModified, 1, tojson(res)); + } + } + print("Finished retryable write state"); + }; + if ($config.passConnectionCache) { // If 'passConnectionCache' is true, every state function must accept 3 parameters: db, // collName and connCache. This workload does not set 'passConnectionCache' since it doesn't @@ -143,58 +168,74 @@ var $config = extendWorkload($config, function($config, $super) { $config.transitions = { init: { - setFCV: 0.2, - internalTransactionForInsert: 0.2, - internalTransactionForUpdate: 0.2, - internalTransactionForDelete: 0.2, - internalTransactionForFindAndModify: 0.2, + setFCV: 0.18, + internalTransactionForInsert: 0.18, + internalTransactionForUpdate: 0.18, + internalTransactionForDelete: 0.18, + internalTransactionForFindAndModify: 0.18, + retryableWrite: 0.1, }, setFCV: { - setFCV: 0.2, + setFCV: 0.15, internalTransactionForInsert: 0.15, internalTransactionForUpdate: 0.15, internalTransactionForDelete: 0.15, internalTransactionForFindAndModify: 0.15, - verifyDocuments: 0.2 + retryableWrite: 0.1, + verifyDocuments: 0.15 }, internalTransactionForInsert: { - setFCV: 0.4, + setFCV: 0.3, internalTransactionForInsert: 0.1, internalTransactionForUpdate: 0.1, internalTransactionForDelete: 0.1, internalTransactionForFindAndModify: 0.1, + retryableWrite: 0.1, verifyDocuments: 0.2 }, internalTransactionForUpdate: { - setFCV: 0.4, + setFCV: 0.3, internalTransactionForInsert: 0.1, internalTransactionForUpdate: 0.1, internalTransactionForDelete: 0.1, internalTransactionForFindAndModify: 0.1, + retryableWrite: 0.1, verifyDocuments: 0.2 }, internalTransactionForDelete: { - setFCV: 0.4, + setFCV: 0.3, internalTransactionForInsert: 0.1, internalTransactionForUpdate: 0.1, internalTransactionForDelete: 0.1, internalTransactionForFindAndModify: 0.1, + retryableWrite: 0.1, verifyDocuments: 0.2 }, internalTransactionForFindAndModify: { - setFCV: 0.4, + setFCV: 0.3, internalTransactionForInsert: 0.1, internalTransactionForUpdate: 0.1, internalTransactionForDelete: 0.1, internalTransactionForFindAndModify: 0.1, + retryableWrite: 0.1, verifyDocuments: 0.2 }, verifyDocuments: { - setFCV: 0.4, + setFCV: 0.3, + internalTransactionForInsert: 0.1, + internalTransactionForUpdate: 0.1, + internalTransactionForDelete: 0.1, + internalTransactionForFindAndModify: 0.1, + retryableWrite: 0.1, + verifyDocuments: 0.2 + }, + retryableWrite: { + setFCV: 0.3, internalTransactionForInsert: 0.1, internalTransactionForUpdate: 0.1, internalTransactionForDelete: 0.1, internalTransactionForFindAndModify: 0.1, + retryableWrite: 0.1, verifyDocuments: 0.2 } }; diff --git a/jstests/multiVersion/internal_transactions_index_setFCV.js b/jstests/multiVersion/internal_transactions_index_setFCV.js new file mode 100644 index 00000000000..783dcaace3f --- /dev/null +++ b/jstests/multiVersion/internal_transactions_index_setFCV.js @@ -0,0 +1,152 @@ +/* + * Tests the setFCV command creates/removes the partial config.transactions index for retryable + * transactions on upgrade/downgrade. + * + * @tags: [uses_transactions] + */ +(function() { +"use strict"; + +// Verifies both the _id index and partial parent_lsid index exists for config.transactions. +function assertPartialIndexExists(node) { + const configDB = node.getDB("config"); + const indexSpecs = assert.commandWorked(configDB.runCommand({"listIndexes": "transactions"})) + .cursor.firstBatch; + indexSpecs.sort((index0, index1) => index0.name > index1.name); + assert.eq(indexSpecs.length, 2); + const idIndexSpec = indexSpecs[0]; + assert.eq(idIndexSpec.key, {"_id": 1}); + const partialIndexSpec = indexSpecs[1]; + assert.eq(partialIndexSpec.name, "parent_lsid"); + assert.eq(partialIndexSpec.key, {"parentLsid": 1, "_id.txnNumber": 1, "_id": 1}); + assert.eq(partialIndexSpec.partialFilterExpression, {"parentLsid": {"$exists": true}}); +} + +// Verifies only the _id index exists for config.transactions. +function assertPartialIndexDoesNotExist(node) { + const configDB = node.getDB("config"); + const indexSpecs = assert.commandWorked(configDB.runCommand({"listIndexes": "transactions"})) + .cursor.firstBatch; + assert.eq(indexSpecs.length, 1); + const idIndexSpec = indexSpecs[0]; + assert.eq(idIndexSpec.key, {"_id": 1}); +} + +/** + * Verifies the partial index is dropped/created on FCV transitions and retryable writes work in all + * FCVs. + */ +function runTest(setFCVConn, modifyIndexConns, verifyIndexConns) { + // Start at latest FCV which should have the index. + assert.commandWorked(setFCVConn.adminCommand({setFeatureCompatibilityVersion: latestFCV})); + verifyIndexConns.forEach(conn => { + assertPartialIndexExists(conn); + }); + + // Downgrade to last LTS removes index. + assert.commandWorked(setFCVConn.adminCommand({setFeatureCompatibilityVersion: lastLTSFCV})); + verifyIndexConns.forEach(conn => { + assertPartialIndexDoesNotExist(conn); + }); + + assert.commandWorked(setFCVConn.getDB("foo").runCommand( + {insert: "bar", documents: [{x: 1}], lsid: {id: UUID()}, txnNumber: NumberLong(11)})); + + // Upgrade from last LTS to latest adds index. + assert.commandWorked(setFCVConn.adminCommand({setFeatureCompatibilityVersion: latestFCV})); + verifyIndexConns.forEach(conn => { + assertPartialIndexExists(conn); + }); + + assert.commandWorked(setFCVConn.getDB("foo").runCommand( + {insert: "bar", documents: [{x: 1}], lsid: {id: UUID()}, txnNumber: NumberLong(11)})); + + // Downgrade from latest to last continuous removes index. + assert.commandWorked( + setFCVConn.adminCommand({setFeatureCompatibilityVersion: lastContinuousFCV})); + verifyIndexConns.forEach(conn => { + assertPartialIndexDoesNotExist(conn); + }); + + assert.commandWorked(setFCVConn.getDB("foo").runCommand( + {insert: "bar", documents: [{x: 1}], lsid: {id: UUID()}, txnNumber: NumberLong(11)})); + + // Upgrade from last continuous to latest LTS adds index. + assert.commandWorked(setFCVConn.adminCommand({setFeatureCompatibilityVersion: latestFCV})); + verifyIndexConns.forEach(conn => { + assertPartialIndexExists(conn); + }); + + assert.commandWorked(setFCVConn.getDB("foo").runCommand( + {insert: "bar", documents: [{x: 1}], lsid: {id: UUID()}, txnNumber: NumberLong(11)})); + + // Verify downgrading ignores IndexNotFound. + modifyIndexConns.forEach(conn => { + assert.commandWorked(conn.getCollection("config.transactions").dropIndex("parent_lsid")); + }); + assert.commandWorked(setFCVConn.adminCommand({setFeatureCompatibilityVersion: lastLTSFCV})); + verifyIndexConns.forEach(conn => { + assertPartialIndexDoesNotExist(conn); + }); + + // Verify upgrading works if the index already exists. + modifyIndexConns.forEach(conn => { + assert.commandWorked(conn.getDB("config").runCommand({ + createIndexes: "transactions", + indexes: [{ + v: 2, + partialFilterExpression: {parentLsid: {$exists: true}}, + name: "parent_lsid", + key: {parentLsid: 1, "_id.txnNumber": 1, _id: 1} + }], + })); + }); + assert.commandWorked(setFCVConn.adminCommand({setFeatureCompatibilityVersion: latestFCV})); + verifyIndexConns.forEach(conn => { + assertPartialIndexExists(conn); + }); +} + +{ + const st = new ShardingTest({shards: 1, rs: {nodes: 2}}); + // Note setFCV always waits for majority write concern so in a two node cluster secondaries will + // always have replicated the setFCV writes. + runTest(st.s, [st.rs0.getPrimary(), st.configRS.getPrimary()], [ + st.rs0.getPrimary(), + st.rs0.getSecondary(), + st.configRS.getPrimary(), + st.configRS.getSecondary() + ]); + st.stop(); +} + +{ + const rst = new ReplSetTest({nodes: 2}); + rst.startSet(); + rst.initiate(); + // Note setFCV always waits for majority write concern so in a two node cluster secondaries will + // always have replicated the setFCV writes. + runTest(rst.getPrimary(), [rst.getPrimary()], [rst.getPrimary(), rst.getSecondary()]); + rst.stopSet(); +} + +{ + const conn = MongoRunner.runMongod(); + + const configTxnsCollection = conn.getCollection("config.transactions"); + assert(!configTxnsCollection.exists()); + + // Verify each upgrade/downgrade path can succeed and won't implicitly create + // config.transactions, which doesn't exist on standalone mongods. + assert.commandWorked(conn.adminCommand({setFeatureCompatibilityVersion: lastLTSFCV})); + assert(!configTxnsCollection.exists()); + + assert.commandWorked(conn.adminCommand({setFeatureCompatibilityVersion: latestFCV})); + assert(!configTxnsCollection.exists()); + + assert.commandWorked(conn.adminCommand({setFeatureCompatibilityVersion: lastContinuousFCV})); + assert(!configTxnsCollection.exists()); + + MongoRunner.stopMongod(conn); +} +})(); diff --git a/jstests/replsets/rollback_transaction_table.js b/jstests/replsets/rollback_transaction_table.js index 3e399517abf..7a13668b625 100644 --- a/jstests/replsets/rollback_transaction_table.js +++ b/jstests/replsets/rollback_transaction_table.js @@ -87,6 +87,18 @@ assert.commandWorked(downstream.getDB("config").transactions.renameCollection("f assert.commandWorked(downstream.getDB("config").foo.renameCollection("transactions")); assert(downstream.getDB("config").transactions.drop()); assert.commandWorked(downstream.getDB("config").createCollection("transactions")); +// Creating the index fails in FCVs lower than 6.0, but it isn't required in that configuration, so +// it's safe to ignore that error. +assert.commandWorkedOrFailedWithCode(downstream.getDB("config").runCommand({ + createIndexes: "transactions", + indexes: [{ + name: "parent_lsid", + key: {parentLsid: 1, "_id.txnNumber": 1, _id: 1}, + partialFilterExpression: {parentLsid: {$exists: true}}, + v: 2 + }] +}), + ErrorCodes.IllegalOperation); jsTestLog("Running a transaction on the 'downstream node' and waiting for it to replicate."); let firstLsid = {id: UUID()}; diff --git a/jstests/sharding/internal_txns/partial_index.js b/jstests/sharding/internal_txns/partial_index.js index ad4081dd086..032505660ab 100644 --- a/jstests/sharding/internal_txns/partial_index.js +++ b/jstests/sharding/internal_txns/partial_index.js @@ -115,5 +115,136 @@ st.rs0.nodes.forEach(node => { assertFindUsesCoveredQuery(node); }); +// +// Verify clients can create the index only if they provide the exact specification and that +// operations requiring the index fails if it does not exist. +// + +const indexConn = st.rs0.getPrimary(); +assert.commandWorked(indexConn.getCollection("config.transactions").dropIndex("parent_lsid")); + +// Normal writes don't involve config.transactions, so they succeed. +assert.commandWorked(indexConn.getDB(kDbName).runCommand( + {insert: kCollName, documents: [{x: 1}], lsid: {id: UUID()}})); + +// Retryable writes read from the partial index, so they fail. +let res = assert.commandFailedWithCode( + indexConn.getDB(kDbName).runCommand( + {insert: kCollName, documents: [{x: 1}], lsid: {id: UUID()}, txnNumber: NumberLong(11)}), + ErrorCodes.BadValue); +assert(res.errmsg.includes("Please create an index directly "), tojson(res)); + +// User transactions read from the partial index, so they fail. +assert.commandFailedWithCode(indexConn.getDB(kDbName).runCommand({ + insert: kCollName, + documents: [{x: 1}], + lsid: {id: UUID()}, + txnNumber: NumberLong(11), + startTransaction: true, + autocommit: false +}), + ErrorCodes.BadValue); + +// Non retryable internal transactions do not read from or update the partial index, so they can +// succeed without the index existing. +let nonRetryableTxnSession = {id: UUID(), txnUUID: UUID()}; +assert.commandWorked(indexConn.getDB(kDbName).runCommand({ + insert: kCollName, + documents: [{x: 1}], + lsid: nonRetryableTxnSession, + txnNumber: NumberLong(11), + stmtId: NumberInt(0), + startTransaction: true, + autocommit: false +})); +assert.commandWorked(indexConn.adminCommand({ + commitTransaction: 1, + lsid: nonRetryableTxnSession, + txnNumber: NumberLong(11), + autocommit: false +})); + +// Retryable transactions read from the partial index, so they fail. +assert.commandFailedWithCode(indexConn.getDB(kDbName).runCommand({ + insert: kCollName, + documents: [{x: 1}], + lsid: {id: UUID(), txnUUID: UUID(), txnNumber: NumberLong(2)}, + txnNumber: NumberLong(11), + stmtId: NumberInt(0), + startTransaction: true, + autocommit: false +}), + ErrorCodes.BadValue); + +// Recreating the partial index requires the exact options used internally, but in any order. +assert.commandFailedWithCode(indexConn.getDB("config").runCommand({ + createIndexes: "transactions", + indexes: [{v: 2, name: "parent_lsid", key: {parentLsid: 1, "_id.txnNumber": 1, _id: 1}}], +}), + ErrorCodes.IllegalOperation); +assert.commandWorked(indexConn.getDB("config").runCommand({ + createIndexes: "transactions", + indexes: [{ + name: "parent_lsid", + key: {parentLsid: 1, "_id.txnNumber": 1, _id: 1}, + partialFilterExpression: {parentLsid: {$exists: true}}, + v: 2, + }], +})); + +// Operations involving the index should succeed now. + +assert.commandWorked(indexConn.getDB(kDbName).runCommand( + {insert: kCollName, documents: [{x: 1}], lsid: {id: UUID()}})); + +assert.commandWorked(indexConn.getDB(kDbName).runCommand( + {insert: kCollName, documents: [{x: 1}], lsid: {id: UUID()}, txnNumber: NumberLong(11)})); + +let userSessionAfter = {id: UUID()}; +assert.commandWorked(indexConn.getDB(kDbName).runCommand({ + insert: kCollName, + documents: [{x: 1}], + lsid: userSessionAfter, + txnNumber: NumberLong(11), + startTransaction: true, + autocommit: false +})); +assert.commandWorked(indexConn.adminCommand( + {commitTransaction: 1, lsid: userSessionAfter, txnNumber: NumberLong(11), autocommit: false})); + +let nonRetryableTxnSessionAfter = {id: UUID(), txnUUID: UUID()}; +assert.commandWorked(indexConn.getDB(kDbName).runCommand({ + insert: kCollName, + documents: [{x: 1}], + lsid: nonRetryableTxnSessionAfter, + txnNumber: NumberLong(11), + stmtId: NumberInt(0), + startTransaction: true, + autocommit: false +})); +assert.commandWorked(indexConn.adminCommand({ + commitTransaction: 1, + lsid: nonRetryableTxnSessionAfter, + txnNumber: NumberLong(11), + autocommit: false +})); + +let retryableTxnSessionAfter = {id: UUID(), txnUUID: UUID(), txnNumber: NumberLong(2)}; +assert.commandWorked(indexConn.getDB(kDbName).runCommand({ + insert: kCollName, + documents: [{x: 1}], + lsid: retryableTxnSessionAfter, + txnNumber: NumberLong(11), + stmtId: NumberInt(0), + startTransaction: true, + autocommit: false +})); +assert.commandWorked(indexConn.adminCommand({ + commitTransaction: 1, + lsid: retryableTxnSessionAfter, + txnNumber: NumberLong(11), + autocommit: false +})); + st.stop(); })(); diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index 590b145470f..143c5eae3ef 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -563,6 +563,7 @@ env.Library( '$BUILD_DIR/mongo/db/exec/stagedebug_cmd', '$BUILD_DIR/mongo/db/fle_crud_mongod', '$BUILD_DIR/mongo/db/index_builds_coordinator_interface', + '$BUILD_DIR/mongo/db/internal_transactions_feature_flag', '$BUILD_DIR/mongo/db/multitenancy', '$BUILD_DIR/mongo/db/pipeline/pipeline', '$BUILD_DIR/mongo/db/pipeline/process_interface/mongo_process_interface', diff --git a/src/mongo/db/commands/create_indexes.cpp b/src/mongo/db/commands/create_indexes.cpp index c665a45a983..c743a405714 100644 --- a/src/mongo/db/commands/create_indexes.cpp +++ b/src/mongo/db/commands/create_indexes.cpp @@ -35,6 +35,7 @@ #include <vector> #include "mongo/base/string_data.h" +#include "mongo/bson/unordered_fields_bsonobj_comparator.h" #include "mongo/crypto/encryption_fields_util.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/catalog/clustered_collection_util.h" @@ -63,6 +64,7 @@ #include "mongo/db/s/database_sharding_state.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/sharding_state.h" +#include "mongo/db/session_catalog_mongod.h" #include "mongo/db/storage/two_phase_index_build_knobs_gen.h" #include "mongo/db/timeseries/catalog_helper.h" #include "mongo/db/timeseries/timeseries_commands_conversion_helper.h" @@ -444,16 +446,28 @@ CreateIndexesReply runCreateIndexesOnNewCollection( return reply; } +bool isCreatingInternalConfigTxnsPartialIndex(const CreateIndexesCommand& cmd) { + if (cmd.getIndexes().size() > 1) { + return false; + } + const auto& index = cmd.getIndexes()[0]; + + UnorderedFieldsBSONObjComparator comparator; + return comparator.compare(index, MongoDSessionCatalog::getConfigTxnPartialIndexSpec()) == 0; +} + CreateIndexesReply runCreateIndexesWithCoordinator(OperationContext* opCtx, const CreateIndexesCommand& cmd) { const auto ns = cmd.getNamespace(); uassertStatusOK(userAllowedWriteNS(opCtx, ns)); // Disallow users from creating new indexes on config.transactions since the sessions code - // was optimized to not update indexes + // was optimized to not update indexes. The only exception is the partial index used to support + // retryable transactions that the sessions code knows how to handle. uassert(ErrorCodes::IllegalOperation, str::stream() << "not allowed to create index on " << ns.ns(), - ns != NamespaceString::kSessionTransactionsTableNamespace); + ns != NamespaceString::kSessionTransactionsTableNamespace || + isCreatingInternalConfigTxnsPartialIndex(cmd)); uassert(ErrorCodes::OperationNotSupportedInTransaction, str::stream() << "Cannot write to system collection " << ns.toString() diff --git a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp index 3f6eb510fb4..d62877c1521 100644 --- a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp +++ b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp @@ -53,6 +53,7 @@ #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/index_builds_coordinator.h" +#include "mongo/db/internal_transactions_feature_flag_gen.h" #include "mongo/db/namespace_string.h" #include "mongo/db/ops/write_ops.h" #include "mongo/db/persistent_task_store.h" @@ -79,6 +80,7 @@ #include "mongo/db/server_feature_flags_gen.h" #include "mongo/db/server_options.h" #include "mongo/db/session_catalog.h" +#include "mongo/db/session_catalog_mongod.h" #include "mongo/db/session_txn_record_gen.h" #include "mongo/db/timeseries/timeseries_index_schema_conversion_functions.h" #include "mongo/db/vector_clock.h" @@ -117,6 +119,41 @@ MONGO_FAIL_POINT_DEFINE(hangBeforeUpdatingSessionDocs); */ Lock::ResourceMutex commandMutex("setFCVCommandMutex"); +void createPartialConfigTransactionsIndex(OperationContext* opCtx) { + { + AutoGetCollection coll(opCtx, NamespaceString::kSessionTransactionsTableNamespace, MODE_IS); + if (!coll) { + // Early return to avoid implicitly creating config.transactions. + return; + } + } + + DBDirectClient client(opCtx); + + auto indexSpec = MongoDSessionCatalog::getConfigTxnPartialIndexSpec(); + + // Throws on error and is a noop if the index already exists. + client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace.ns(), {indexSpec}); +} + +void dropPartialConfigTransactionsIndex(OperationContext* opCtx) { + DBDirectClient client(opCtx); + + BSONObjBuilder cmdBuilder; + cmdBuilder.append("dropIndexes", NamespaceString::kSessionTransactionsTableNamespace.coll()); + cmdBuilder.append("index", MongoDSessionCatalog::kConfigTxnsPartialIndexName); + auto dropIndexCmd = cmdBuilder.obj(); + + // Throws on error. + BSONObj result; + client.runCommand( + NamespaceString::kSessionTransactionsTableNamespace.db().toString(), dropIndexCmd, result); + const auto status = getStatusFromWriteCommandReply(result); + if (status != ErrorCodes::IndexNotFound && status != ErrorCodes::NamespaceNotFound) { + uassertStatusOK(status); + } +} + /** * Deletes the persisted default read/write concern document. */ @@ -622,6 +659,10 @@ private: opCtx, CommandHelpers::appendMajorityWriteConcern(requestPhase2.toBSON({})))); } + if (feature_flags::gFeatureFlagInternalTransactions.isEnabledOnVersion(requestedVersion)) { + createPartialConfigTransactionsIndex(opCtx); + } + // Create the pre-images collection if the feature flag is enabled on the requested version. // TODO SERVER-61770: Remove once FCV 6.0 becomes last-lts. if (feature_flags::gFeatureFlagChangeStreamPreAndPostImages.isEnabledOnVersion( @@ -916,6 +957,10 @@ private: } } + if (!feature_flags::gFeatureFlagInternalTransactions.isEnabledOnVersion(requestedVersion)) { + dropPartialConfigTransactionsIndex(opCtx); + } + // TODO SERVER-64720 Remove when 6.0 becomes last LTS if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { ShardingDDLCoordinatorService::getService(opCtx) diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index c2e10b83299..8b194d14e95 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -176,6 +176,7 @@ public: // Ensure that we are primary. auto replCoord = repl::ReplicationCoordinator::get(opCtx.get()); ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY)); + MongoDSessionCatalog::onStepUp(opCtx.get()); ReadWriteConcernDefaults::create(getServiceContext(), _lookupMock.getFetchDefaultsFn()); } @@ -717,7 +718,6 @@ public: OpObserverTest::setUp(); auto opCtx = cc().makeOperationContext(); - MongoDSessionCatalog::onStepUp(opCtx.get()); } /** @@ -838,7 +838,6 @@ public: OpObserverTest::setUp(); _opCtx = cc().makeOperationContext(); _opObserver.emplace(); - MongoDSessionCatalog::onStepUp(opCtx()); _times.emplace(opCtx()); } diff --git a/src/mongo/db/repl/tenant_oplog_applier_test.cpp b/src/mongo/db/repl/tenant_oplog_applier_test.cpp index 17860543896..714dddef4b5 100644 --- a/src/mongo/db/repl/tenant_oplog_applier_test.cpp +++ b/src/mongo/db/repl/tenant_oplog_applier_test.cpp @@ -35,6 +35,7 @@ #include <boost/optional/optional_io.hpp> #include <vector> +#include "mongo/db/dbdirectclient.h" #include "mongo/db/logical_session_id_helpers.h" #include "mongo/db/op_observer_noop.h" #include "mongo/db/op_observer_registry.h" @@ -50,6 +51,7 @@ #include "mongo/db/repl/tenant_oplog_batcher.h" #include "mongo/db/service_context_d_test_fixture.h" #include "mongo/db/service_context_test_fixture.h" +#include "mongo/db/session_catalog_mongod.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/logv2/log.h" #include "mongo/unittest/log_test.h" @@ -320,6 +322,11 @@ TEST_F(TenantOplogApplierTest, NoOpsForLargeTransaction) { TEST_F(TenantOplogApplierTest, CommitUnpreparedTransaction_DataPartiallyApplied) { createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace); + { + DBDirectClient client(_opCtx.get()); + client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace.ns(), + {MongoDSessionCatalog::getConfigTxnPartialIndexSpec()}); + } NamespaceString nss(dbName, "bar"); auto uuid = createCollectionWithUuid(_opCtx.get(), nss); auto lsid = makeLogicalSessionId(_opCtx.get()); diff --git a/src/mongo/db/s/config/sharding_catalog_manager_bump_collection_version_and_change_metadata_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_bump_collection_version_and_change_metadata_test.cpp index 82ddf586f76..a30e3fa1a05 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_bump_collection_version_and_change_metadata_test.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_bump_collection_version_and_change_metadata_test.cpp @@ -60,6 +60,8 @@ class ShardingCatalogManagerBumpCollectionVersionAndChangeMetadataTest auto opCtx = operationContext(); DBDirectClient client(opCtx); client.createCollection(NamespaceString::kSessionTransactionsTableNamespace.ns()); + client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace.ns(), + {MongoDSessionCatalog::getConfigTxnPartialIndexSpec()}); client.createCollection(CollectionType::ConfigNS.ns()); LogicalSessionCache::set(getServiceContext(), std::make_unique<LogicalSessionCacheNoop>()); diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp index 623524a545d..91e1b4a21bc 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp @@ -39,6 +39,7 @@ #include "mongo/db/s/migration_chunk_cloner_source_legacy.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/shard_server_test_fixture.h" +#include "mongo/db/session_catalog_mongod.h" #include "mongo/s/catalog/sharding_catalog_client_mock.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard_registry.h" @@ -75,6 +76,8 @@ protected: auto opCtx = operationContext(); DBDirectClient client(opCtx); client.createCollection(NamespaceString::kSessionTransactionsTableNamespace.ns()); + client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace.ns(), + {MongoDSessionCatalog::getConfigTxnPartialIndexSpec()}); // TODO: SERVER-26919 set the flag on the mock repl coordinator just for the window where it // actually needs to bypass the op observer. diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp index e3b59a8068f..6a5197b4c41 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp @@ -49,6 +49,7 @@ #include "mongo/db/s/resharding/resharding_service_test_helpers.h" #include "mongo/db/s/resharding/resharding_util.h" #include "mongo/db/s/transaction_coordinator_service.h" +#include "mongo/db/session_catalog_mongod.h" #include "mongo/idl/server_parameter_test_util.h" #include "mongo/logv2/log.h" #include "mongo/s/catalog/type_collection.h" @@ -149,6 +150,8 @@ public: auto opCtx = operationContext(); DBDirectClient client(opCtx); client.createCollection(NamespaceString::kSessionTransactionsTableNamespace.ns()); + client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace.ns(), + {MongoDSessionCatalog::getConfigTxnPartialIndexSpec()}); client.createCollection(NamespaceString::kConfigReshardingOperationsNamespace.ns()); client.createCollection(CollectionType::ConfigNS.ns()); diff --git a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp index 00788960871..a8fb4d83889 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp @@ -73,6 +73,8 @@ protected: auto opCtx = operationContext(); DBDirectClient client(opCtx); client.createCollection(NamespaceString::kSessionTransactionsTableNamespace.ns()); + client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace.ns(), + {MongoDSessionCatalog::getConfigTxnPartialIndexSpec()}); client.createCollection(NamespaceString::kConfigReshardingOperationsNamespace.ns()); client.createCollection(CollectionType::ConfigNS.ns()); client.createIndex(TagsType::ConfigNS.ns(), BSON("ns" << 1 << "min" << 1)); diff --git a/src/mongo/db/s/resharding/resharding_destined_recipient_test.cpp b/src/mongo/db/s/resharding/resharding_destined_recipient_test.cpp index fb3925ea159..e942dcd139f 100644 --- a/src/mongo/db/s/resharding/resharding_destined_recipient_test.cpp +++ b/src/mongo/db/s/resharding/resharding_destined_recipient_test.cpp @@ -184,6 +184,8 @@ protected: ReshardingEnv setupReshardingEnv(OperationContext* opCtx, bool refreshTempNss) { DBDirectClient client(opCtx); ASSERT(client.createCollection(NamespaceString::kSessionTransactionsTableNamespace.ns())); + client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace.ns(), + {MongoDSessionCatalog::getConfigTxnPartialIndexSpec()}); OperationShardingState::ScopedAllowImplicitCollectionCreate_UNSAFE unsafeCreateCollection( opCtx); diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp index dd709560ab5..02334d1f4bf 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp @@ -140,6 +140,9 @@ public: operationContext(), NamespaceString::kSessionTransactionsTableNamespace.db().toString(), BSON("create" << NamespaceString::kSessionTransactionsTableNamespace.coll()))); + DBDirectClient client(operationContext()); + client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace.ns(), + {MongoDSessionCatalog::getConfigTxnPartialIndexSpec()}); OperationShardingState::ScopedAllowImplicitCollectionCreate_UNSAFE unsafeCreateCollection( operationContext()); diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp index d36c42aa9b6..edf1049a847 100644 --- a/src/mongo/db/s/session_catalog_migration_source_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp @@ -45,6 +45,7 @@ #include "mongo/db/s/session_catalog_migration.h" #include "mongo/db/s/session_catalog_migration_source.h" #include "mongo/db/session.h" +#include "mongo/db/session_catalog_mongod.h" #include "mongo/db/session_txn_record_gen.h" #include "mongo/db/transaction_participant.h" #include "mongo/executor/remote_command_request.h" @@ -2554,6 +2555,8 @@ TEST_F(SessionCatalogMigrationSourceTest, UntransferredDataSizeWithCommittedWrit DBDirectClient client(opCtx()); client.createCollection(NamespaceString::kSessionTransactionsTableNamespace.ns()); + client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace.ns(), + {MongoDSessionCatalog::getConfigTxnPartialIndexSpec()}); // Enter an oplog entry before creating SessionCatalogMigrationSource to set config.transactions // average object size to the size of this entry. auto entry = makeOplogEntry( diff --git a/src/mongo/db/s/sharding_ddl_util_test.cpp b/src/mongo/db/s/sharding_ddl_util_test.cpp index 044e11ccda9..f1921cf38c4 100644 --- a/src/mongo/db/s/sharding_ddl_util_test.cpp +++ b/src/mongo/db/s/sharding_ddl_util_test.cpp @@ -37,6 +37,7 @@ #include "mongo/db/s/config/config_server_test_fixture.h" #include "mongo/db/s/sharding_ddl_util.h" #include "mongo/db/s/transaction_coordinator_service.h" +#include "mongo/db/session_catalog_mongod.h" #include "mongo/logv2/log.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/catalog/type_collection.h" @@ -59,6 +60,8 @@ protected: auto opCtx = operationContext(); DBDirectClient client(opCtx); client.createCollection(NamespaceString::kSessionTransactionsTableNamespace.ns()); + client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace.ns(), + {MongoDSessionCatalog::getConfigTxnPartialIndexSpec()}); client.createCollection(NamespaceString::kConfigReshardingOperationsNamespace.ns()); client.createCollection(CollectionType::ConfigNS.ns()); diff --git a/src/mongo/db/session_catalog_mongod.cpp b/src/mongo/db/session_catalog_mongod.cpp index 5cd0b76ad23..0e14c08a058 100644 --- a/src/mongo/db/session_catalog_mongod.cpp +++ b/src/mongo/db/session_catalog_mongod.cpp @@ -346,19 +346,11 @@ void createTransactionTable(OperationContext* opCtx) { str::stream() << "Failed to create the " << NamespaceString::kSessionTransactionsTableNamespace.ns() << " collection"); - NewIndexSpec index; - index.setV(int(IndexDescriptor::kLatestIndexVersion)); - index.setKey(BSON( - SessionTxnRecord::kParentSessionIdFieldName - << 1 - << (SessionTxnRecord::kSessionIdFieldName + "." + LogicalSessionId::kTxnNumberFieldName) - << 1 << SessionTxnRecord::kSessionIdFieldName << 1)); - index.setName("parent_lsid"); - index.setPartialFilterExpression(BSON("parentLsid" << BSON("$exists" << true))); + auto indexSpec = MongoDSessionCatalog::getConfigTxnPartialIndexSpec(); const auto createIndexStatus = repl::StorageInterface::get(opCtx)->createIndexesOnEmptyCollection( - opCtx, NamespaceString::kSessionTransactionsTableNamespace, {index.toBSON()}); + opCtx, NamespaceString::kSessionTransactionsTableNamespace, {indexSpec}); uassertStatusOKWithContext( createIndexStatus, str::stream() << "Failed to create partial index for the " @@ -411,6 +403,21 @@ void abortInProgressTransactions(OperationContext* opCtx) { } } // namespace +const std::string MongoDSessionCatalog::kConfigTxnsPartialIndexName = "parent_lsid"; + +BSONObj MongoDSessionCatalog::getConfigTxnPartialIndexSpec() { + NewIndexSpec index; + index.setV(int(IndexDescriptor::kLatestIndexVersion)); + index.setKey(BSON( + SessionTxnRecord::kParentSessionIdFieldName + << 1 + << (SessionTxnRecord::kSessionIdFieldName + "." + LogicalSessionId::kTxnNumberFieldName) + << 1 << SessionTxnRecord::kSessionIdFieldName << 1)); + index.setName(MongoDSessionCatalog::kConfigTxnsPartialIndexName); + index.setPartialFilterExpression(BSON("parentLsid" << BSON("$exists" << true))); + return index.toBSON(); +} + void MongoDSessionCatalog::onStepUp(OperationContext* opCtx) { // Invalidate sessions that could have a retryable write on it, so that we can refresh from disk // in case the in-memory state was out of sync. diff --git a/src/mongo/db/session_catalog_mongod.h b/src/mongo/db/session_catalog_mongod.h index a3179683746..eb8402aece4 100644 --- a/src/mongo/db/session_catalog_mongod.h +++ b/src/mongo/db/session_catalog_mongod.h @@ -37,6 +37,14 @@ class SessionsCollection; class MongoDSessionCatalog { public: + static const std::string kConfigTxnsPartialIndexName; + + /** + * Returns the specification for the partial index on config.transactions used to support + * retryable transactions. + */ + static BSONObj getConfigTxnPartialIndexSpec(); + /** * Invoked when the node enters the primary state. Ensures that the transactions collection is * created. Throws on severe exceptions due to which it is not safe to continue the step-up diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index a2ff68bb8a8..77817e23ed2 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -66,6 +66,7 @@ #include "mongo/db/s/sharding_write_router.h" #include "mongo/db/server_recovery.h" #include "mongo/db/server_transactions_metrics.h" +#include "mongo/db/session_catalog_mongod.h" #include "mongo/db/stats/fill_locker_info.h" #include "mongo/db/storage/flow_control.h" #include "mongo/db/transaction_history_iterator.h" @@ -180,6 +181,20 @@ auto performReadWithNoTimestampDBDirectClient(OperationContext* opCtx, Callable& return callable(&client); } +void rethrowPartialIndexQueryBadValueWithContext(const DBException& ex) { + if (ex.reason().find("hint provided does not correspond to an existing index")) { + uassertStatusOKWithContext( + ex.toStatus(), + str::stream() + << "Failed to find partial index for " + << NamespaceString::kSessionTransactionsTableNamespace.ns() + << ". Please create an index directly on this replica set with the specification: " + << MongoDSessionCatalog::getConfigTxnPartialIndexSpec() << " or drop the " + << NamespaceString::kSessionTransactionsTableNamespace.ns() + << " collection and step up a new primary."); + } +} + struct ActiveTransactionHistory { boost::optional<SessionTxnRecord> lastTxnRecord; TransactionParticipant::CommittedStatementTimestampMap committedStatements; @@ -326,29 +341,35 @@ TxnNumber fetchHighestTxnNumberWithInternalSessions(OperationContext* opCtx, highestTxnNumber = osession.getHighestTxnNumberWithChildSessions(); }); - performReadWithNoTimestampDBDirectClient(opCtx, [&](DBDirectClient* client) { - FindCommandRequest findRequest{NamespaceString::kSessionTransactionsTableNamespace}; - findRequest.setFilter(BSON( - SessionTxnRecord::kParentSessionIdFieldName - << parentLsid.toBSON() - << (SessionTxnRecord::kSessionIdFieldName + "." + LogicalSessionId::kTxnNumberFieldName) - << BSON("$gte" << highestTxnNumber))); - findRequest.setSort(BSON( - (SessionTxnRecord::kSessionIdFieldName + "." + LogicalSessionId::kTxnNumberFieldName) - << -1)); - findRequest.setProjection(BSON(SessionTxnRecord::kSessionIdFieldName << 1)); - findRequest.setLimit(1); - - auto cursor = client->find(findRequest); - - while (cursor->more()) { - const auto doc = cursor->next(); - const auto childLsid = LogicalSessionId::parse( - IDLParserErrorContext("LogicalSessionId"), doc.getObjectField("_id")); - highestTxnNumber = std::max(highestTxnNumber, *childLsid.getTxnNumber()); - invariant(!cursor->more()); - } - }); + try { + performReadWithNoTimestampDBDirectClient(opCtx, [&](DBDirectClient* client) { + FindCommandRequest findRequest{NamespaceString::kSessionTransactionsTableNamespace}; + findRequest.setFilter(BSON(SessionTxnRecord::kParentSessionIdFieldName + << parentLsid.toBSON() + << (SessionTxnRecord::kSessionIdFieldName + "." + + LogicalSessionId::kTxnNumberFieldName) + << BSON("$gte" << highestTxnNumber))); + findRequest.setSort(BSON((SessionTxnRecord::kSessionIdFieldName + "." + + LogicalSessionId::kTxnNumberFieldName) + << -1)); + findRequest.setProjection(BSON(SessionTxnRecord::kSessionIdFieldName << 1)); + findRequest.setLimit(1); + findRequest.setHint(BSON("$hint" << MongoDSessionCatalog::kConfigTxnsPartialIndexName)); + + auto cursor = client->find(findRequest); + + while (cursor->more()) { + const auto doc = cursor->next(); + const auto childLsid = LogicalSessionId::parse( + IDLParserErrorContext("LogicalSessionId"), doc.getObjectField("_id")); + highestTxnNumber = std::max(highestTxnNumber, *childLsid.getTxnNumber()); + invariant(!cursor->more()); + } + }); + } catch (const ExceptionFor<ErrorCodes::BadValue>& ex) { + rethrowPartialIndexQueryBadValueWithContext(ex); + throw; + } return highestTxnNumber; } @@ -2874,22 +2895,27 @@ void TransactionParticipant::Participant::_refreshActiveTransactionParticipantsF // Make sure that every child session has a corresponding // Session/TransactionParticipant. - performReadWithNoTimestampDBDirectClient(opCtx, [&](DBDirectClient* client) { - FindCommandRequest findRequest{NamespaceString::kSessionTransactionsTableNamespace}; - findRequest.setFilter(BSON(SessionTxnRecord::kParentSessionIdFieldName - << parentTxnParticipant._sessionId().toBSON() - << (SessionTxnRecord::kSessionIdFieldName + "." + - LogicalSessionId::kTxnNumberFieldName) - << BSON("$gte" << *activeRetryableWriteTxnNumber))); - findRequest.setProjection(BSON("_id" << 1)); - - auto cursor = client->find(findRequest); - - while (cursor->more()) { - const auto doc = cursor->next(); - const auto childLsid = LogicalSessionId::parse( - IDLParserErrorContext("LogicalSessionId"), doc.getObjectField("_id")); - uassert(6202001, + try { + performReadWithNoTimestampDBDirectClient(opCtx, [&](DBDirectClient* client) { + FindCommandRequest findRequest{ + NamespaceString::kSessionTransactionsTableNamespace}; + findRequest.setFilter(BSON(SessionTxnRecord::kParentSessionIdFieldName + << parentTxnParticipant._sessionId().toBSON() + << (SessionTxnRecord::kSessionIdFieldName + "." + + LogicalSessionId::kTxnNumberFieldName) + << BSON("$gte" << *activeRetryableWriteTxnNumber))); + findRequest.setProjection(BSON(SessionTxnRecord::kSessionIdFieldName << 1)); + findRequest.setHint( + BSON("$hint" << MongoDSessionCatalog::kConfigTxnsPartialIndexName)); + + auto cursor = client->find(findRequest); + + while (cursor->more()) { + const auto doc = cursor->next(); + const auto childLsid = LogicalSessionId::parse( + IDLParserErrorContext("LogicalSessionId"), doc.getObjectField("_id")); + uassert( + 6202001, str::stream() << "Refresh expected the highest transaction number in the session " << parentTxnParticipant._sessionId() << " to be " @@ -2898,15 +2924,20 @@ void TransactionParticipant::Participant::_refreshActiveTransactionParticipantsF << " entry for an internal transaction for retryable writes with " << "transaction number " << *childLsid.getTxnNumber(), *childLsid.getTxnNumber() == *activeRetryableWriteTxnNumber); - auto sessionCatalog = SessionCatalog::get(opCtx); - sessionCatalog->createSessionIfDoesNotExist(childLsid); - sessionCatalog->scanSession(childLsid, [&](const ObservableSession& osession) { - auto childTxnParticipant = - TransactionParticipant::get(opCtx, osession.get()); - childTxnParticipants.push_back(childTxnParticipant); - }); - } - }); + auto sessionCatalog = SessionCatalog::get(opCtx); + sessionCatalog->createSessionIfDoesNotExist(childLsid); + sessionCatalog->scanSession( + childLsid, [&](const ObservableSession& osession) { + auto childTxnParticipant = + TransactionParticipant::get(opCtx, osession.get()); + childTxnParticipants.push_back(childTxnParticipant); + }); + } + }); + } catch (const ExceptionFor<ErrorCodes::BadValue>& ex) { + rethrowPartialIndexQueryBadValueWithContext(ex); + throw; + } for (auto& childTxnParticipant : childTxnParticipants) { childTxnParticipant._refreshSelfFromStorageIfNeeded(opCtx, fetchOplogEntries); |