summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJack Mulrow <jack.mulrow@mongodb.com>2022-05-18 22:59:10 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-05-24 21:12:59 +0000
commit222a559ce8c742b4170985979291fe563d888d17 (patch)
tree217a6748442ed65aded3f5a211fb1af8e8c32f6a
parent27921f09fb1f2125a9c2da1b36bde75993c21100 (diff)
downloadmongo-222a559ce8c742b4170985979291fe563d888d17.tar.gz
SERVER-66565 Create config.transactions partial index in setFCV
-rw-r--r--jstests/concurrency/fsm_workloads/internal_transactions_setFCV.js65
-rw-r--r--jstests/multiVersion/internal_transactions_index_setFCV.js152
-rw-r--r--jstests/replsets/rollback_transaction_table.js12
-rw-r--r--jstests/sharding/internal_txns/partial_index.js131
-rw-r--r--src/mongo/db/commands/SConscript1
-rw-r--r--src/mongo/db/commands/create_indexes.cpp18
-rw-r--r--src/mongo/db/commands/set_feature_compatibility_version_command.cpp45
-rw-r--r--src/mongo/db/op_observer_impl_test.cpp3
-rw-r--r--src/mongo/db/repl/tenant_oplog_applier_test.cpp7
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_bump_collection_version_and_change_metadata_test.cpp2
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp3
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp3
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_test.cpp2
-rw-r--r--src/mongo/db/s/resharding/resharding_destined_recipient_test.cpp2
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp3
-rw-r--r--src/mongo/db/s/session_catalog_migration_source_test.cpp3
-rw-r--r--src/mongo/db/s/sharding_ddl_util_test.cpp3
-rw-r--r--src/mongo/db/session_catalog_mongod.cpp27
-rw-r--r--src/mongo/db/session_catalog_mongod.h8
-rw-r--r--src/mongo/db/transaction_participant.cpp127
20 files changed, 543 insertions, 74 deletions
diff --git a/jstests/concurrency/fsm_workloads/internal_transactions_setFCV.js b/jstests/concurrency/fsm_workloads/internal_transactions_setFCV.js
index e145e9294ad..f5dcb21d859 100644
--- a/jstests/concurrency/fsm_workloads/internal_transactions_setFCV.js
+++ b/jstests/concurrency/fsm_workloads/internal_transactions_setFCV.js
@@ -131,6 +131,31 @@ var $config = extendWorkload($config, function($config, $super) {
}
};
+ // Runs concurrent retryable writes to verify they always observe the partial index after
+ // upgrade.
+ $config.states.retryableWrite = function(db, collName) {
+ const retryableWriteSession = db.getMongo().startSession(
+ {causalConsistency: this.shouldUseCausalConsistency, retryWrites: true});
+ const retryableWriteDB = retryableWriteSession.getDatabase(db.getName());
+
+ print("Starting retryable write state, session: " +
+ tojsononeline(retryableWriteSession.getSessionId()));
+
+ const docId = UUID();
+ for (let i = 0; i < 10; ++i) {
+ const res = assert.commandWorked(
+ retryableWriteDB[collName].update({_id: docId, forRetryableWriteState: true},
+ {$inc: {counter: 1}, $set: {idx: i}},
+ {upsert: true}));
+ if (i === 0) {
+ assert.eq(res.nUpserted, 1, tojson(res));
+ } else {
+ assert.eq(res.nModified, 1, tojson(res));
+ }
+ }
+ print("Finished retryable write state");
+ };
+
if ($config.passConnectionCache) {
// If 'passConnectionCache' is true, every state function must accept 3 parameters: db,
// collName and connCache. This workload does not set 'passConnectionCache' since it doesn't
@@ -143,58 +168,74 @@ var $config = extendWorkload($config, function($config, $super) {
$config.transitions = {
init: {
- setFCV: 0.2,
- internalTransactionForInsert: 0.2,
- internalTransactionForUpdate: 0.2,
- internalTransactionForDelete: 0.2,
- internalTransactionForFindAndModify: 0.2,
+ setFCV: 0.18,
+ internalTransactionForInsert: 0.18,
+ internalTransactionForUpdate: 0.18,
+ internalTransactionForDelete: 0.18,
+ internalTransactionForFindAndModify: 0.18,
+ retryableWrite: 0.1,
},
setFCV: {
- setFCV: 0.2,
+ setFCV: 0.15,
internalTransactionForInsert: 0.15,
internalTransactionForUpdate: 0.15,
internalTransactionForDelete: 0.15,
internalTransactionForFindAndModify: 0.15,
- verifyDocuments: 0.2
+ retryableWrite: 0.1,
+ verifyDocuments: 0.15
},
internalTransactionForInsert: {
- setFCV: 0.4,
+ setFCV: 0.3,
internalTransactionForInsert: 0.1,
internalTransactionForUpdate: 0.1,
internalTransactionForDelete: 0.1,
internalTransactionForFindAndModify: 0.1,
+ retryableWrite: 0.1,
verifyDocuments: 0.2
},
internalTransactionForUpdate: {
- setFCV: 0.4,
+ setFCV: 0.3,
internalTransactionForInsert: 0.1,
internalTransactionForUpdate: 0.1,
internalTransactionForDelete: 0.1,
internalTransactionForFindAndModify: 0.1,
+ retryableWrite: 0.1,
verifyDocuments: 0.2
},
internalTransactionForDelete: {
- setFCV: 0.4,
+ setFCV: 0.3,
internalTransactionForInsert: 0.1,
internalTransactionForUpdate: 0.1,
internalTransactionForDelete: 0.1,
internalTransactionForFindAndModify: 0.1,
+ retryableWrite: 0.1,
verifyDocuments: 0.2
},
internalTransactionForFindAndModify: {
- setFCV: 0.4,
+ setFCV: 0.3,
internalTransactionForInsert: 0.1,
internalTransactionForUpdate: 0.1,
internalTransactionForDelete: 0.1,
internalTransactionForFindAndModify: 0.1,
+ retryableWrite: 0.1,
verifyDocuments: 0.2
},
verifyDocuments: {
- setFCV: 0.4,
+ setFCV: 0.3,
+ internalTransactionForInsert: 0.1,
+ internalTransactionForUpdate: 0.1,
+ internalTransactionForDelete: 0.1,
+ internalTransactionForFindAndModify: 0.1,
+ retryableWrite: 0.1,
+ verifyDocuments: 0.2
+ },
+ retryableWrite: {
+ setFCV: 0.3,
internalTransactionForInsert: 0.1,
internalTransactionForUpdate: 0.1,
internalTransactionForDelete: 0.1,
internalTransactionForFindAndModify: 0.1,
+ retryableWrite: 0.1,
verifyDocuments: 0.2
}
};
diff --git a/jstests/multiVersion/internal_transactions_index_setFCV.js b/jstests/multiVersion/internal_transactions_index_setFCV.js
new file mode 100644
index 00000000000..783dcaace3f
--- /dev/null
+++ b/jstests/multiVersion/internal_transactions_index_setFCV.js
@@ -0,0 +1,152 @@
+/*
+ * Tests the setFCV command creates/removes the partial config.transactions index for retryable
+ * transactions on upgrade/downgrade.
+ *
+ * @tags: [uses_transactions]
+ */
+(function() {
+"use strict";
+
+// Verifies both the _id index and partial parent_lsid index exists for config.transactions.
+function assertPartialIndexExists(node) {
+ const configDB = node.getDB("config");
+ const indexSpecs = assert.commandWorked(configDB.runCommand({"listIndexes": "transactions"}))
+ .cursor.firstBatch;
+ indexSpecs.sort((index0, index1) => index0.name > index1.name);
+ assert.eq(indexSpecs.length, 2);
+ const idIndexSpec = indexSpecs[0];
+ assert.eq(idIndexSpec.key, {"_id": 1});
+ const partialIndexSpec = indexSpecs[1];
+ assert.eq(partialIndexSpec.name, "parent_lsid");
+ assert.eq(partialIndexSpec.key, {"parentLsid": 1, "_id.txnNumber": 1, "_id": 1});
+ assert.eq(partialIndexSpec.partialFilterExpression, {"parentLsid": {"$exists": true}});
+}
+
+// Verifies only the _id index exists for config.transactions.
+function assertPartialIndexDoesNotExist(node) {
+ const configDB = node.getDB("config");
+ const indexSpecs = assert.commandWorked(configDB.runCommand({"listIndexes": "transactions"}))
+ .cursor.firstBatch;
+ assert.eq(indexSpecs.length, 1);
+ const idIndexSpec = indexSpecs[0];
+ assert.eq(idIndexSpec.key, {"_id": 1});
+}
+
+/**
+ * Verifies the partial index is dropped/created on FCV transitions and retryable writes work in all
+ * FCVs.
+ */
+function runTest(setFCVConn, modifyIndexConns, verifyIndexConns) {
+ // Start at latest FCV which should have the index.
+ assert.commandWorked(setFCVConn.adminCommand({setFeatureCompatibilityVersion: latestFCV}));
+ verifyIndexConns.forEach(conn => {
+ assertPartialIndexExists(conn);
+ });
+
+ // Downgrade to last LTS removes index.
+ assert.commandWorked(setFCVConn.adminCommand({setFeatureCompatibilityVersion: lastLTSFCV}));
+ verifyIndexConns.forEach(conn => {
+ assertPartialIndexDoesNotExist(conn);
+ });
+
+ assert.commandWorked(setFCVConn.getDB("foo").runCommand(
+ {insert: "bar", documents: [{x: 1}], lsid: {id: UUID()}, txnNumber: NumberLong(11)}));
+
+ // Upgrade from last LTS to latest adds index.
+ assert.commandWorked(setFCVConn.adminCommand({setFeatureCompatibilityVersion: latestFCV}));
+ verifyIndexConns.forEach(conn => {
+ assertPartialIndexExists(conn);
+ });
+
+ assert.commandWorked(setFCVConn.getDB("foo").runCommand(
+ {insert: "bar", documents: [{x: 1}], lsid: {id: UUID()}, txnNumber: NumberLong(11)}));
+
+ // Downgrade from latest to last continuous removes index.
+ assert.commandWorked(
+ setFCVConn.adminCommand({setFeatureCompatibilityVersion: lastContinuousFCV}));
+ verifyIndexConns.forEach(conn => {
+ assertPartialIndexDoesNotExist(conn);
+ });
+
+ assert.commandWorked(setFCVConn.getDB("foo").runCommand(
+ {insert: "bar", documents: [{x: 1}], lsid: {id: UUID()}, txnNumber: NumberLong(11)}));
+
+ // Upgrade from last continuous to latest LTS adds index.
+ assert.commandWorked(setFCVConn.adminCommand({setFeatureCompatibilityVersion: latestFCV}));
+ verifyIndexConns.forEach(conn => {
+ assertPartialIndexExists(conn);
+ });
+
+ assert.commandWorked(setFCVConn.getDB("foo").runCommand(
+ {insert: "bar", documents: [{x: 1}], lsid: {id: UUID()}, txnNumber: NumberLong(11)}));
+
+ // Verify downgrading ignores IndexNotFound.
+ modifyIndexConns.forEach(conn => {
+ assert.commandWorked(conn.getCollection("config.transactions").dropIndex("parent_lsid"));
+ });
+ assert.commandWorked(setFCVConn.adminCommand({setFeatureCompatibilityVersion: lastLTSFCV}));
+ verifyIndexConns.forEach(conn => {
+ assertPartialIndexDoesNotExist(conn);
+ });
+
+ // Verify upgrading works if the index already exists.
+ modifyIndexConns.forEach(conn => {
+ assert.commandWorked(conn.getDB("config").runCommand({
+ createIndexes: "transactions",
+ indexes: [{
+ v: 2,
+ partialFilterExpression: {parentLsid: {$exists: true}},
+ name: "parent_lsid",
+ key: {parentLsid: 1, "_id.txnNumber": 1, _id: 1}
+ }],
+ }));
+ });
+ assert.commandWorked(setFCVConn.adminCommand({setFeatureCompatibilityVersion: latestFCV}));
+ verifyIndexConns.forEach(conn => {
+ assertPartialIndexExists(conn);
+ });
+}
+
+{
+ const st = new ShardingTest({shards: 1, rs: {nodes: 2}});
+ // Note setFCV always waits for majority write concern so in a two node cluster secondaries will
+ // always have replicated the setFCV writes.
+ runTest(st.s, [st.rs0.getPrimary(), st.configRS.getPrimary()], [
+ st.rs0.getPrimary(),
+ st.rs0.getSecondary(),
+ st.configRS.getPrimary(),
+ st.configRS.getSecondary()
+ ]);
+ st.stop();
+}
+
+{
+ const rst = new ReplSetTest({nodes: 2});
+ rst.startSet();
+ rst.initiate();
+ // Note setFCV always waits for majority write concern so in a two node cluster secondaries will
+ // always have replicated the setFCV writes.
+ runTest(rst.getPrimary(), [rst.getPrimary()], [rst.getPrimary(), rst.getSecondary()]);
+ rst.stopSet();
+}
+
+{
+ const conn = MongoRunner.runMongod();
+
+ const configTxnsCollection = conn.getCollection("config.transactions");
+ assert(!configTxnsCollection.exists());
+
+ // Verify each upgrade/downgrade path can succeed and won't implicitly create
+ // config.transactions, which doesn't exist on standalone mongods.
+ assert.commandWorked(conn.adminCommand({setFeatureCompatibilityVersion: lastLTSFCV}));
+ assert(!configTxnsCollection.exists());
+
+ assert.commandWorked(conn.adminCommand({setFeatureCompatibilityVersion: latestFCV}));
+ assert(!configTxnsCollection.exists());
+
+ assert.commandWorked(conn.adminCommand({setFeatureCompatibilityVersion: lastContinuousFCV}));
+ assert(!configTxnsCollection.exists());
+
+ MongoRunner.stopMongod(conn);
+}
+})();
diff --git a/jstests/replsets/rollback_transaction_table.js b/jstests/replsets/rollback_transaction_table.js
index 3e399517abf..7a13668b625 100644
--- a/jstests/replsets/rollback_transaction_table.js
+++ b/jstests/replsets/rollback_transaction_table.js
@@ -87,6 +87,18 @@ assert.commandWorked(downstream.getDB("config").transactions.renameCollection("f
assert.commandWorked(downstream.getDB("config").foo.renameCollection("transactions"));
assert(downstream.getDB("config").transactions.drop());
assert.commandWorked(downstream.getDB("config").createCollection("transactions"));
+// Creating the index fails in FCVs lower than 6.0, but it isn't required in that configuration, so
+// it's safe to ignore that error.
+assert.commandWorkedOrFailedWithCode(downstream.getDB("config").runCommand({
+ createIndexes: "transactions",
+ indexes: [{
+ name: "parent_lsid",
+ key: {parentLsid: 1, "_id.txnNumber": 1, _id: 1},
+ partialFilterExpression: {parentLsid: {$exists: true}},
+ v: 2
+ }]
+}),
+ ErrorCodes.IllegalOperation);
jsTestLog("Running a transaction on the 'downstream node' and waiting for it to replicate.");
let firstLsid = {id: UUID()};
diff --git a/jstests/sharding/internal_txns/partial_index.js b/jstests/sharding/internal_txns/partial_index.js
index ad4081dd086..032505660ab 100644
--- a/jstests/sharding/internal_txns/partial_index.js
+++ b/jstests/sharding/internal_txns/partial_index.js
@@ -115,5 +115,136 @@ st.rs0.nodes.forEach(node => {
assertFindUsesCoveredQuery(node);
});
+//
+// Verify clients can create the index only if they provide the exact specification and that
+// operations requiring the index fails if it does not exist.
+//
+
+const indexConn = st.rs0.getPrimary();
+assert.commandWorked(indexConn.getCollection("config.transactions").dropIndex("parent_lsid"));
+
+// Normal writes don't involve config.transactions, so they succeed.
+assert.commandWorked(indexConn.getDB(kDbName).runCommand(
+ {insert: kCollName, documents: [{x: 1}], lsid: {id: UUID()}}));
+
+// Retryable writes read from the partial index, so they fail.
+let res = assert.commandFailedWithCode(
+ indexConn.getDB(kDbName).runCommand(
+ {insert: kCollName, documents: [{x: 1}], lsid: {id: UUID()}, txnNumber: NumberLong(11)}),
+ ErrorCodes.BadValue);
+assert(res.errmsg.includes("Please create an index directly "), tojson(res));
+
+// User transactions read from the partial index, so they fail.
+assert.commandFailedWithCode(indexConn.getDB(kDbName).runCommand({
+ insert: kCollName,
+ documents: [{x: 1}],
+ lsid: {id: UUID()},
+ txnNumber: NumberLong(11),
+ startTransaction: true,
+ autocommit: false
+}),
+ ErrorCodes.BadValue);
+
+// Non retryable internal transactions do not read from or update the partial index, so they can
+// succeed without the index existing.
+let nonRetryableTxnSession = {id: UUID(), txnUUID: UUID()};
+assert.commandWorked(indexConn.getDB(kDbName).runCommand({
+ insert: kCollName,
+ documents: [{x: 1}],
+ lsid: nonRetryableTxnSession,
+ txnNumber: NumberLong(11),
+ stmtId: NumberInt(0),
+ startTransaction: true,
+ autocommit: false
+}));
+assert.commandWorked(indexConn.adminCommand({
+ commitTransaction: 1,
+ lsid: nonRetryableTxnSession,
+ txnNumber: NumberLong(11),
+ autocommit: false
+}));
+
+// Retryable transactions read from the partial index, so they fail.
+assert.commandFailedWithCode(indexConn.getDB(kDbName).runCommand({
+ insert: kCollName,
+ documents: [{x: 1}],
+ lsid: {id: UUID(), txnUUID: UUID(), txnNumber: NumberLong(2)},
+ txnNumber: NumberLong(11),
+ stmtId: NumberInt(0),
+ startTransaction: true,
+ autocommit: false
+}),
+ ErrorCodes.BadValue);
+
+// Recreating the partial index requires the exact options used internally, but in any order.
+assert.commandFailedWithCode(indexConn.getDB("config").runCommand({
+ createIndexes: "transactions",
+ indexes: [{v: 2, name: "parent_lsid", key: {parentLsid: 1, "_id.txnNumber": 1, _id: 1}}],
+}),
+ ErrorCodes.IllegalOperation);
+assert.commandWorked(indexConn.getDB("config").runCommand({
+ createIndexes: "transactions",
+ indexes: [{
+ name: "parent_lsid",
+ key: {parentLsid: 1, "_id.txnNumber": 1, _id: 1},
+ partialFilterExpression: {parentLsid: {$exists: true}},
+ v: 2,
+ }],
+}));
+
+// Operations involving the index should succeed now.
+
+assert.commandWorked(indexConn.getDB(kDbName).runCommand(
+ {insert: kCollName, documents: [{x: 1}], lsid: {id: UUID()}}));
+
+assert.commandWorked(indexConn.getDB(kDbName).runCommand(
+ {insert: kCollName, documents: [{x: 1}], lsid: {id: UUID()}, txnNumber: NumberLong(11)}));
+
+let userSessionAfter = {id: UUID()};
+assert.commandWorked(indexConn.getDB(kDbName).runCommand({
+ insert: kCollName,
+ documents: [{x: 1}],
+ lsid: userSessionAfter,
+ txnNumber: NumberLong(11),
+ startTransaction: true,
+ autocommit: false
+}));
+assert.commandWorked(indexConn.adminCommand(
+ {commitTransaction: 1, lsid: userSessionAfter, txnNumber: NumberLong(11), autocommit: false}));
+
+let nonRetryableTxnSessionAfter = {id: UUID(), txnUUID: UUID()};
+assert.commandWorked(indexConn.getDB(kDbName).runCommand({
+ insert: kCollName,
+ documents: [{x: 1}],
+ lsid: nonRetryableTxnSessionAfter,
+ txnNumber: NumberLong(11),
+ stmtId: NumberInt(0),
+ startTransaction: true,
+ autocommit: false
+}));
+assert.commandWorked(indexConn.adminCommand({
+ commitTransaction: 1,
+ lsid: nonRetryableTxnSessionAfter,
+ txnNumber: NumberLong(11),
+ autocommit: false
+}));
+
+let retryableTxnSessionAfter = {id: UUID(), txnUUID: UUID(), txnNumber: NumberLong(2)};
+assert.commandWorked(indexConn.getDB(kDbName).runCommand({
+ insert: kCollName,
+ documents: [{x: 1}],
+ lsid: retryableTxnSessionAfter,
+ txnNumber: NumberLong(11),
+ stmtId: NumberInt(0),
+ startTransaction: true,
+ autocommit: false
+}));
+assert.commandWorked(indexConn.adminCommand({
+ commitTransaction: 1,
+ lsid: retryableTxnSessionAfter,
+ txnNumber: NumberLong(11),
+ autocommit: false
+}));
+
st.stop();
})();
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index 590b145470f..143c5eae3ef 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -563,6 +563,7 @@ env.Library(
'$BUILD_DIR/mongo/db/exec/stagedebug_cmd',
'$BUILD_DIR/mongo/db/fle_crud_mongod',
'$BUILD_DIR/mongo/db/index_builds_coordinator_interface',
+ '$BUILD_DIR/mongo/db/internal_transactions_feature_flag',
'$BUILD_DIR/mongo/db/multitenancy',
'$BUILD_DIR/mongo/db/pipeline/pipeline',
'$BUILD_DIR/mongo/db/pipeline/process_interface/mongo_process_interface',
diff --git a/src/mongo/db/commands/create_indexes.cpp b/src/mongo/db/commands/create_indexes.cpp
index c665a45a983..c743a405714 100644
--- a/src/mongo/db/commands/create_indexes.cpp
+++ b/src/mongo/db/commands/create_indexes.cpp
@@ -35,6 +35,7 @@
#include <vector>
#include "mongo/base/string_data.h"
+#include "mongo/bson/unordered_fields_bsonobj_comparator.h"
#include "mongo/crypto/encryption_fields_util.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog/clustered_collection_util.h"
@@ -63,6 +64,7 @@
#include "mongo/db/s/database_sharding_state.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/storage/two_phase_index_build_knobs_gen.h"
#include "mongo/db/timeseries/catalog_helper.h"
#include "mongo/db/timeseries/timeseries_commands_conversion_helper.h"
@@ -444,16 +446,28 @@ CreateIndexesReply runCreateIndexesOnNewCollection(
return reply;
}
+bool isCreatingInternalConfigTxnsPartialIndex(const CreateIndexesCommand& cmd) {
+ if (cmd.getIndexes().size() > 1) {
+ return false;
+ }
+ const auto& index = cmd.getIndexes()[0];
+
+ UnorderedFieldsBSONObjComparator comparator;
+ return comparator.compare(index, MongoDSessionCatalog::getConfigTxnPartialIndexSpec()) == 0;
+}
+
CreateIndexesReply runCreateIndexesWithCoordinator(OperationContext* opCtx,
const CreateIndexesCommand& cmd) {
const auto ns = cmd.getNamespace();
uassertStatusOK(userAllowedWriteNS(opCtx, ns));
// Disallow users from creating new indexes on config.transactions since the sessions code
- // was optimized to not update indexes
+ // was optimized to not update indexes. The only exception is the partial index used to support
+ // retryable transactions that the sessions code knows how to handle.
uassert(ErrorCodes::IllegalOperation,
str::stream() << "not allowed to create index on " << ns.ns(),
- ns != NamespaceString::kSessionTransactionsTableNamespace);
+ ns != NamespaceString::kSessionTransactionsTableNamespace ||
+ isCreatingInternalConfigTxnsPartialIndex(cmd));
uassert(ErrorCodes::OperationNotSupportedInTransaction,
str::stream() << "Cannot write to system collection " << ns.toString()
diff --git a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp
index 3f6eb510fb4..d62877c1521 100644
--- a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp
+++ b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp
@@ -53,6 +53,7 @@
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/index_builds_coordinator.h"
+#include "mongo/db/internal_transactions_feature_flag_gen.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/persistent_task_store.h"
@@ -79,6 +80,7 @@
#include "mongo/db/server_feature_flags_gen.h"
#include "mongo/db/server_options.h"
#include "mongo/db/session_catalog.h"
+#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/session_txn_record_gen.h"
#include "mongo/db/timeseries/timeseries_index_schema_conversion_functions.h"
#include "mongo/db/vector_clock.h"
@@ -117,6 +119,41 @@ MONGO_FAIL_POINT_DEFINE(hangBeforeUpdatingSessionDocs);
*/
Lock::ResourceMutex commandMutex("setFCVCommandMutex");
+void createPartialConfigTransactionsIndex(OperationContext* opCtx) {
+ {
+ AutoGetCollection coll(opCtx, NamespaceString::kSessionTransactionsTableNamespace, MODE_IS);
+ if (!coll) {
+ // Early return to avoid implicitly creating config.transactions.
+ return;
+ }
+ }
+
+ DBDirectClient client(opCtx);
+
+ auto indexSpec = MongoDSessionCatalog::getConfigTxnPartialIndexSpec();
+
+ // Throws on error and is a noop if the index already exists.
+ client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace.ns(), {indexSpec});
+}
+
+void dropPartialConfigTransactionsIndex(OperationContext* opCtx) {
+ DBDirectClient client(opCtx);
+
+ BSONObjBuilder cmdBuilder;
+ cmdBuilder.append("dropIndexes", NamespaceString::kSessionTransactionsTableNamespace.coll());
+ cmdBuilder.append("index", MongoDSessionCatalog::kConfigTxnsPartialIndexName);
+ auto dropIndexCmd = cmdBuilder.obj();
+
+ // Throws on error.
+ BSONObj result;
+ client.runCommand(
+ NamespaceString::kSessionTransactionsTableNamespace.db().toString(), dropIndexCmd, result);
+ const auto status = getStatusFromWriteCommandReply(result);
+ if (status != ErrorCodes::IndexNotFound && status != ErrorCodes::NamespaceNotFound) {
+ uassertStatusOK(status);
+ }
+}
+
/**
* Deletes the persisted default read/write concern document.
*/
@@ -622,6 +659,10 @@ private:
opCtx, CommandHelpers::appendMajorityWriteConcern(requestPhase2.toBSON({}))));
}
+ if (feature_flags::gFeatureFlagInternalTransactions.isEnabledOnVersion(requestedVersion)) {
+ createPartialConfigTransactionsIndex(opCtx);
+ }
+
// Create the pre-images collection if the feature flag is enabled on the requested version.
// TODO SERVER-61770: Remove once FCV 6.0 becomes last-lts.
if (feature_flags::gFeatureFlagChangeStreamPreAndPostImages.isEnabledOnVersion(
@@ -916,6 +957,10 @@ private:
}
}
+ if (!feature_flags::gFeatureFlagInternalTransactions.isEnabledOnVersion(requestedVersion)) {
+ dropPartialConfigTransactionsIndex(opCtx);
+ }
+
// TODO SERVER-64720 Remove when 6.0 becomes last LTS
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
ShardingDDLCoordinatorService::getService(opCtx)
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index c2e10b83299..8b194d14e95 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -176,6 +176,7 @@ public:
// Ensure that we are primary.
auto replCoord = repl::ReplicationCoordinator::get(opCtx.get());
ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY));
+ MongoDSessionCatalog::onStepUp(opCtx.get());
ReadWriteConcernDefaults::create(getServiceContext(), _lookupMock.getFetchDefaultsFn());
}
@@ -717,7 +718,6 @@ public:
OpObserverTest::setUp();
auto opCtx = cc().makeOperationContext();
- MongoDSessionCatalog::onStepUp(opCtx.get());
}
/**
@@ -838,7 +838,6 @@ public:
OpObserverTest::setUp();
_opCtx = cc().makeOperationContext();
_opObserver.emplace();
- MongoDSessionCatalog::onStepUp(opCtx());
_times.emplace(opCtx());
}
diff --git a/src/mongo/db/repl/tenant_oplog_applier_test.cpp b/src/mongo/db/repl/tenant_oplog_applier_test.cpp
index 17860543896..714dddef4b5 100644
--- a/src/mongo/db/repl/tenant_oplog_applier_test.cpp
+++ b/src/mongo/db/repl/tenant_oplog_applier_test.cpp
@@ -35,6 +35,7 @@
#include <boost/optional/optional_io.hpp>
#include <vector>
+#include "mongo/db/dbdirectclient.h"
#include "mongo/db/logical_session_id_helpers.h"
#include "mongo/db/op_observer_noop.h"
#include "mongo/db/op_observer_registry.h"
@@ -50,6 +51,7 @@
#include "mongo/db/repl/tenant_oplog_batcher.h"
#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/db/service_context_test_fixture.h"
+#include "mongo/db/session_catalog_mongod.h"
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/logv2/log.h"
#include "mongo/unittest/log_test.h"
@@ -320,6 +322,11 @@ TEST_F(TenantOplogApplierTest, NoOpsForLargeTransaction) {
TEST_F(TenantOplogApplierTest, CommitUnpreparedTransaction_DataPartiallyApplied) {
createCollectionWithUuid(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace);
+ {
+ DBDirectClient client(_opCtx.get());
+ client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ {MongoDSessionCatalog::getConfigTxnPartialIndexSpec()});
+ }
NamespaceString nss(dbName, "bar");
auto uuid = createCollectionWithUuid(_opCtx.get(), nss);
auto lsid = makeLogicalSessionId(_opCtx.get());
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_bump_collection_version_and_change_metadata_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_bump_collection_version_and_change_metadata_test.cpp
index 82ddf586f76..a30e3fa1a05 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_bump_collection_version_and_change_metadata_test.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_bump_collection_version_and_change_metadata_test.cpp
@@ -60,6 +60,8 @@ class ShardingCatalogManagerBumpCollectionVersionAndChangeMetadataTest
auto opCtx = operationContext();
DBDirectClient client(opCtx);
client.createCollection(NamespaceString::kSessionTransactionsTableNamespace.ns());
+ client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ {MongoDSessionCatalog::getConfigTxnPartialIndexSpec()});
client.createCollection(CollectionType::ConfigNS.ns());
LogicalSessionCache::set(getServiceContext(), std::make_unique<LogicalSessionCacheNoop>());
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
index 623524a545d..91e1b4a21bc 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/s/migration_chunk_cloner_source_legacy.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/shard_server_test_fixture.h"
+#include "mongo/db/session_catalog_mongod.h"
#include "mongo/s/catalog/sharding_catalog_client_mock.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/client/shard_registry.h"
@@ -75,6 +76,8 @@ protected:
auto opCtx = operationContext();
DBDirectClient client(opCtx);
client.createCollection(NamespaceString::kSessionTransactionsTableNamespace.ns());
+ client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ {MongoDSessionCatalog::getConfigTxnPartialIndexSpec()});
// TODO: SERVER-26919 set the flag on the mock repl coordinator just for the window where it
// actually needs to bypass the op observer.
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp
index e3b59a8068f..6a5197b4c41 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp
@@ -49,6 +49,7 @@
#include "mongo/db/s/resharding/resharding_service_test_helpers.h"
#include "mongo/db/s/resharding/resharding_util.h"
#include "mongo/db/s/transaction_coordinator_service.h"
+#include "mongo/db/session_catalog_mongod.h"
#include "mongo/idl/server_parameter_test_util.h"
#include "mongo/logv2/log.h"
#include "mongo/s/catalog/type_collection.h"
@@ -149,6 +150,8 @@ public:
auto opCtx = operationContext();
DBDirectClient client(opCtx);
client.createCollection(NamespaceString::kSessionTransactionsTableNamespace.ns());
+ client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ {MongoDSessionCatalog::getConfigTxnPartialIndexSpec()});
client.createCollection(NamespaceString::kConfigReshardingOperationsNamespace.ns());
client.createCollection(CollectionType::ConfigNS.ns());
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp
index 00788960871..a8fb4d83889 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp
@@ -73,6 +73,8 @@ protected:
auto opCtx = operationContext();
DBDirectClient client(opCtx);
client.createCollection(NamespaceString::kSessionTransactionsTableNamespace.ns());
+ client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ {MongoDSessionCatalog::getConfigTxnPartialIndexSpec()});
client.createCollection(NamespaceString::kConfigReshardingOperationsNamespace.ns());
client.createCollection(CollectionType::ConfigNS.ns());
client.createIndex(TagsType::ConfigNS.ns(), BSON("ns" << 1 << "min" << 1));
diff --git a/src/mongo/db/s/resharding/resharding_destined_recipient_test.cpp b/src/mongo/db/s/resharding/resharding_destined_recipient_test.cpp
index fb3925ea159..e942dcd139f 100644
--- a/src/mongo/db/s/resharding/resharding_destined_recipient_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_destined_recipient_test.cpp
@@ -184,6 +184,8 @@ protected:
ReshardingEnv setupReshardingEnv(OperationContext* opCtx, bool refreshTempNss) {
DBDirectClient client(opCtx);
ASSERT(client.createCollection(NamespaceString::kSessionTransactionsTableNamespace.ns()));
+ client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ {MongoDSessionCatalog::getConfigTxnPartialIndexSpec()});
OperationShardingState::ScopedAllowImplicitCollectionCreate_UNSAFE unsafeCreateCollection(
opCtx);
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
index dd709560ab5..02334d1f4bf 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
@@ -140,6 +140,9 @@ public:
operationContext(),
NamespaceString::kSessionTransactionsTableNamespace.db().toString(),
BSON("create" << NamespaceString::kSessionTransactionsTableNamespace.coll())));
+ DBDirectClient client(operationContext());
+ client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ {MongoDSessionCatalog::getConfigTxnPartialIndexSpec()});
OperationShardingState::ScopedAllowImplicitCollectionCreate_UNSAFE unsafeCreateCollection(
operationContext());
diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp
index d36c42aa9b6..edf1049a847 100644
--- a/src/mongo/db/s/session_catalog_migration_source_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/s/session_catalog_migration.h"
#include "mongo/db/s/session_catalog_migration_source.h"
#include "mongo/db/session.h"
+#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/session_txn_record_gen.h"
#include "mongo/db/transaction_participant.h"
#include "mongo/executor/remote_command_request.h"
@@ -2554,6 +2555,8 @@ TEST_F(SessionCatalogMigrationSourceTest, UntransferredDataSizeWithCommittedWrit
DBDirectClient client(opCtx());
client.createCollection(NamespaceString::kSessionTransactionsTableNamespace.ns());
+ client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ {MongoDSessionCatalog::getConfigTxnPartialIndexSpec()});
// Enter an oplog entry before creating SessionCatalogMigrationSource to set config.transactions
// average object size to the size of this entry.
auto entry = makeOplogEntry(
diff --git a/src/mongo/db/s/sharding_ddl_util_test.cpp b/src/mongo/db/s/sharding_ddl_util_test.cpp
index 044e11ccda9..f1921cf38c4 100644
--- a/src/mongo/db/s/sharding_ddl_util_test.cpp
+++ b/src/mongo/db/s/sharding_ddl_util_test.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/s/config/config_server_test_fixture.h"
#include "mongo/db/s/sharding_ddl_util.h"
#include "mongo/db/s/transaction_coordinator_service.h"
+#include "mongo/db/session_catalog_mongod.h"
#include "mongo/logv2/log.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog/type_collection.h"
@@ -59,6 +60,8 @@ protected:
auto opCtx = operationContext();
DBDirectClient client(opCtx);
client.createCollection(NamespaceString::kSessionTransactionsTableNamespace.ns());
+ client.createIndexes(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ {MongoDSessionCatalog::getConfigTxnPartialIndexSpec()});
client.createCollection(NamespaceString::kConfigReshardingOperationsNamespace.ns());
client.createCollection(CollectionType::ConfigNS.ns());
diff --git a/src/mongo/db/session_catalog_mongod.cpp b/src/mongo/db/session_catalog_mongod.cpp
index 5cd0b76ad23..0e14c08a058 100644
--- a/src/mongo/db/session_catalog_mongod.cpp
+++ b/src/mongo/db/session_catalog_mongod.cpp
@@ -346,19 +346,11 @@ void createTransactionTable(OperationContext* opCtx) {
str::stream() << "Failed to create the "
<< NamespaceString::kSessionTransactionsTableNamespace.ns() << " collection");
- NewIndexSpec index;
- index.setV(int(IndexDescriptor::kLatestIndexVersion));
- index.setKey(BSON(
- SessionTxnRecord::kParentSessionIdFieldName
- << 1
- << (SessionTxnRecord::kSessionIdFieldName + "." + LogicalSessionId::kTxnNumberFieldName)
- << 1 << SessionTxnRecord::kSessionIdFieldName << 1));
- index.setName("parent_lsid");
- index.setPartialFilterExpression(BSON("parentLsid" << BSON("$exists" << true)));
+ auto indexSpec = MongoDSessionCatalog::getConfigTxnPartialIndexSpec();
const auto createIndexStatus =
repl::StorageInterface::get(opCtx)->createIndexesOnEmptyCollection(
- opCtx, NamespaceString::kSessionTransactionsTableNamespace, {index.toBSON()});
+ opCtx, NamespaceString::kSessionTransactionsTableNamespace, {indexSpec});
uassertStatusOKWithContext(
createIndexStatus,
str::stream() << "Failed to create partial index for the "
@@ -411,6 +403,21 @@ void abortInProgressTransactions(OperationContext* opCtx) {
}
} // namespace
+const std::string MongoDSessionCatalog::kConfigTxnsPartialIndexName = "parent_lsid";
+
+BSONObj MongoDSessionCatalog::getConfigTxnPartialIndexSpec() {
+ NewIndexSpec index;
+ index.setV(int(IndexDescriptor::kLatestIndexVersion));
+ index.setKey(BSON(
+ SessionTxnRecord::kParentSessionIdFieldName
+ << 1
+ << (SessionTxnRecord::kSessionIdFieldName + "." + LogicalSessionId::kTxnNumberFieldName)
+ << 1 << SessionTxnRecord::kSessionIdFieldName << 1));
+ index.setName(MongoDSessionCatalog::kConfigTxnsPartialIndexName);
+ index.setPartialFilterExpression(BSON("parentLsid" << BSON("$exists" << true)));
+ return index.toBSON();
+}
+
void MongoDSessionCatalog::onStepUp(OperationContext* opCtx) {
// Invalidate sessions that could have a retryable write on it, so that we can refresh from disk
// in case the in-memory state was out of sync.
diff --git a/src/mongo/db/session_catalog_mongod.h b/src/mongo/db/session_catalog_mongod.h
index a3179683746..eb8402aece4 100644
--- a/src/mongo/db/session_catalog_mongod.h
+++ b/src/mongo/db/session_catalog_mongod.h
@@ -37,6 +37,14 @@ class SessionsCollection;
class MongoDSessionCatalog {
public:
+ static const std::string kConfigTxnsPartialIndexName;
+
+ /**
+ * Returns the specification for the partial index on config.transactions used to support
+ * retryable transactions.
+ */
+ static BSONObj getConfigTxnPartialIndexSpec();
+
/**
* Invoked when the node enters the primary state. Ensures that the transactions collection is
* created. Throws on severe exceptions due to which it is not safe to continue the step-up
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index a2ff68bb8a8..77817e23ed2 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -66,6 +66,7 @@
#include "mongo/db/s/sharding_write_router.h"
#include "mongo/db/server_recovery.h"
#include "mongo/db/server_transactions_metrics.h"
+#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/stats/fill_locker_info.h"
#include "mongo/db/storage/flow_control.h"
#include "mongo/db/transaction_history_iterator.h"
@@ -180,6 +181,20 @@ auto performReadWithNoTimestampDBDirectClient(OperationContext* opCtx, Callable&
return callable(&client);
}
+void rethrowPartialIndexQueryBadValueWithContext(const DBException& ex) {
+ // std::string::find returns the match position (0 is a valid, falsy hit) and
+ // npos on failure, so the result must be compared against npos explicitly.
+ if (ex.reason().find("hint provided does not correspond to an existing index") != std::string::npos) {
+ uassertStatusOKWithContext(
+ ex.toStatus(),
+ str::stream()
+ << "Failed to find partial index for "
+ << NamespaceString::kSessionTransactionsTableNamespace.ns()
+ << ". Please create an index directly on this replica set with the specification: "
+ << MongoDSessionCatalog::getConfigTxnPartialIndexSpec() << " or drop the "
+ << NamespaceString::kSessionTransactionsTableNamespace.ns()
+ << " collection and step up a new primary.");
+ }
+}
+
struct ActiveTransactionHistory {
boost::optional<SessionTxnRecord> lastTxnRecord;
TransactionParticipant::CommittedStatementTimestampMap committedStatements;
@@ -326,29 +341,35 @@ TxnNumber fetchHighestTxnNumberWithInternalSessions(OperationContext* opCtx,
highestTxnNumber = osession.getHighestTxnNumberWithChildSessions();
});
- performReadWithNoTimestampDBDirectClient(opCtx, [&](DBDirectClient* client) {
- FindCommandRequest findRequest{NamespaceString::kSessionTransactionsTableNamespace};
- findRequest.setFilter(BSON(
- SessionTxnRecord::kParentSessionIdFieldName
- << parentLsid.toBSON()
- << (SessionTxnRecord::kSessionIdFieldName + "." + LogicalSessionId::kTxnNumberFieldName)
- << BSON("$gte" << highestTxnNumber)));
- findRequest.setSort(BSON(
- (SessionTxnRecord::kSessionIdFieldName + "." + LogicalSessionId::kTxnNumberFieldName)
- << -1));
- findRequest.setProjection(BSON(SessionTxnRecord::kSessionIdFieldName << 1));
- findRequest.setLimit(1);
-
- auto cursor = client->find(findRequest);
-
- while (cursor->more()) {
- const auto doc = cursor->next();
- const auto childLsid = LogicalSessionId::parse(
- IDLParserErrorContext("LogicalSessionId"), doc.getObjectField("_id"));
- highestTxnNumber = std::max(highestTxnNumber, *childLsid.getTxnNumber());
- invariant(!cursor->more());
- }
- });
+ try {
+ performReadWithNoTimestampDBDirectClient(opCtx, [&](DBDirectClient* client) {
+ FindCommandRequest findRequest{NamespaceString::kSessionTransactionsTableNamespace};
+ findRequest.setFilter(BSON(SessionTxnRecord::kParentSessionIdFieldName
+ << parentLsid.toBSON()
+ << (SessionTxnRecord::kSessionIdFieldName + "." +
+ LogicalSessionId::kTxnNumberFieldName)
+ << BSON("$gte" << highestTxnNumber)));
+ findRequest.setSort(BSON((SessionTxnRecord::kSessionIdFieldName + "." +
+ LogicalSessionId::kTxnNumberFieldName)
+ << -1));
+ findRequest.setProjection(BSON(SessionTxnRecord::kSessionIdFieldName << 1));
+ findRequest.setLimit(1);
+ findRequest.setHint(BSON("$hint" << MongoDSessionCatalog::kConfigTxnsPartialIndexName));
+
+ auto cursor = client->find(findRequest);
+
+ while (cursor->more()) {
+ const auto doc = cursor->next();
+ const auto childLsid = LogicalSessionId::parse(
+ IDLParserErrorContext("LogicalSessionId"), doc.getObjectField("_id"));
+ highestTxnNumber = std::max(highestTxnNumber, *childLsid.getTxnNumber());
+ invariant(!cursor->more());
+ }
+ });
+ } catch (const ExceptionFor<ErrorCodes::BadValue>& ex) {
+ rethrowPartialIndexQueryBadValueWithContext(ex);
+ throw;
+ }
return highestTxnNumber;
}
@@ -2874,22 +2895,27 @@ void TransactionParticipant::Participant::_refreshActiveTransactionParticipantsF
// Make sure that every child session has a corresponding
// Session/TransactionParticipant.
- performReadWithNoTimestampDBDirectClient(opCtx, [&](DBDirectClient* client) {
- FindCommandRequest findRequest{NamespaceString::kSessionTransactionsTableNamespace};
- findRequest.setFilter(BSON(SessionTxnRecord::kParentSessionIdFieldName
- << parentTxnParticipant._sessionId().toBSON()
- << (SessionTxnRecord::kSessionIdFieldName + "." +
- LogicalSessionId::kTxnNumberFieldName)
- << BSON("$gte" << *activeRetryableWriteTxnNumber)));
- findRequest.setProjection(BSON("_id" << 1));
-
- auto cursor = client->find(findRequest);
-
- while (cursor->more()) {
- const auto doc = cursor->next();
- const auto childLsid = LogicalSessionId::parse(
- IDLParserErrorContext("LogicalSessionId"), doc.getObjectField("_id"));
- uassert(6202001,
+ try {
+ performReadWithNoTimestampDBDirectClient(opCtx, [&](DBDirectClient* client) {
+ FindCommandRequest findRequest{
+ NamespaceString::kSessionTransactionsTableNamespace};
+ findRequest.setFilter(BSON(SessionTxnRecord::kParentSessionIdFieldName
+ << parentTxnParticipant._sessionId().toBSON()
+ << (SessionTxnRecord::kSessionIdFieldName + "." +
+ LogicalSessionId::kTxnNumberFieldName)
+ << BSON("$gte" << *activeRetryableWriteTxnNumber)));
+ findRequest.setProjection(BSON(SessionTxnRecord::kSessionIdFieldName << 1));
+ findRequest.setHint(
+ BSON("$hint" << MongoDSessionCatalog::kConfigTxnsPartialIndexName));
+
+ auto cursor = client->find(findRequest);
+
+ while (cursor->more()) {
+ const auto doc = cursor->next();
+ const auto childLsid = LogicalSessionId::parse(
+ IDLParserErrorContext("LogicalSessionId"), doc.getObjectField("_id"));
+ uassert(
+ 6202001,
str::stream()
<< "Refresh expected the highest transaction number in the session "
<< parentTxnParticipant._sessionId() << " to be "
@@ -2898,15 +2924,20 @@ void TransactionParticipant::Participant::_refreshActiveTransactionParticipantsF
<< " entry for an internal transaction for retryable writes with "
<< "transaction number " << *childLsid.getTxnNumber(),
*childLsid.getTxnNumber() == *activeRetryableWriteTxnNumber);
- auto sessionCatalog = SessionCatalog::get(opCtx);
- sessionCatalog->createSessionIfDoesNotExist(childLsid);
- sessionCatalog->scanSession(childLsid, [&](const ObservableSession& osession) {
- auto childTxnParticipant =
- TransactionParticipant::get(opCtx, osession.get());
- childTxnParticipants.push_back(childTxnParticipant);
- });
- }
- });
+ auto sessionCatalog = SessionCatalog::get(opCtx);
+ sessionCatalog->createSessionIfDoesNotExist(childLsid);
+ sessionCatalog->scanSession(
+ childLsid, [&](const ObservableSession& osession) {
+ auto childTxnParticipant =
+ TransactionParticipant::get(opCtx, osession.get());
+ childTxnParticipants.push_back(childTxnParticipant);
+ });
+ }
+ });
+ } catch (const ExceptionFor<ErrorCodes::BadValue>& ex) {
+ rethrowPartialIndexQueryBadValueWithContext(ex);
+ throw;
+ }
for (auto& childTxnParticipant : childTxnParticipants) {
childTxnParticipant._refreshSelfFromStorageIfNeeded(opCtx, fetchOplogEntries);