diff options
-rw-r--r-- | jstests/multiVersion/config_transactions_set_fcv.js | 432 | ||||
-rw-r--r-- | jstests/multiVersion/libs/sharded_txn_upgrade_downgrade_cluster_shared.js | 145 | ||||
-rw-r--r-- | jstests/multiVersion/sharded_txn_downgrade_cluster.js | 96 | ||||
-rw-r--r-- | jstests/multiVersion/sharded_txn_upgrade_cluster.js | 82 | ||||
-rw-r--r-- | src/mongo/db/commands/set_feature_compatibility_version_command.cpp | 164 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/repl/session_update_tracker.cpp | 32 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.cpp | 22 |
9 files changed, 970 insertions, 20 deletions
diff --git a/jstests/multiVersion/config_transactions_set_fcv.js b/jstests/multiVersion/config_transactions_set_fcv.js new file mode 100644 index 00000000000..ca8d4bb4dfa --- /dev/null +++ b/jstests/multiVersion/config_transactions_set_fcv.js @@ -0,0 +1,432 @@ +/** + * Tests that config.transactions documents are correctly modified on FCV upgrade/downgrade and that + * retryability is preserved for transactions that don't use prepare after upgrade and always for + * retryable writes. + * + * @tags: [uses_transactions, uses_prepare_transaction] + */ +(function() { + "use strict"; + load("jstests/libs/feature_compatibility_version.js"); + load('jstests/sharding/libs/sharded_transactions_helpers.js'); + + const dbName = "test"; + const collName = "config_transactions_set_fcv"; + + // Define autocommit as a variable so it can be used in object literals w/o an explicit value. + const autocommit = false; + + // Start a replica set with an odd number of members to verify nodes outside the majority behave + // correctly around setFeatureCompatibilityVersion, which uses majority writes to update the FCV + // document. The primary isn't expected to change, so each secondary is given priority 0. + const rst = + new ReplSetTest({nodes: [{}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0}}]}); + rst.startSet(); + rst.initiate(); + + let testDB = rst.getPrimary().getDB(dbName); + let adminDB = rst.getPrimary().getDB("admin"); + + assert.commandWorked(testDB.runCommand({create: collName, writeConcern: {w: "majority"}})); + + // Starts a dummy transaction, commits or aborts it with or without prepare, then returns the + // commit or abort response. Returns the response from prepare if it fails. 
+ function runTxn({lsid, txnNumber}, {commit, prepare, leaveOpen}) { + const startTransactionRes = testDB.runCommand({ + insert: collName, + documents: [{x: "dummy_txn"}], + txnNumber: NumberLong(txnNumber), + startTransaction: true, lsid, autocommit, + }); + if (!startTransactionRes.ok || leaveOpen) { + return startTransactionRes; + } + + if (prepare) { + const prepareRes = testDB.adminCommand( + {prepareTransaction: 1, txnNumber: NumberLong(txnNumber), lsid, autocommit}); + if (!prepareRes.ok) { + return prepareRes; + } + + if (commit) { + // Add 1 to the increment so that the commitTimestamp is after the prepareTimestamp. + const commitTimestamp = Timestamp(prepareRes.prepareTimestamp.getTime(), + prepareRes.prepareTimestamp.getInc() + 1); + return testDB.adminCommand({ + commitTransaction: 1, + commitTimestamp, + txnNumber: NumberLong(txnNumber), lsid, autocommit + }); + } else { + return testDB.adminCommand( + {abortTransaction: 1, txnNumber: NumberLong(txnNumber), lsid, autocommit}); + } + } + + if (commit) { + return testDB.adminCommand( + {commitTransaction: 1, txnNumber: NumberLong(txnNumber), lsid, autocommit}); + } else { + return testDB.adminCommand( + {abortTransaction: 1, txnNumber: NumberLong(txnNumber), lsid, autocommit}); + } + } + + // Retries commitTransaction for the given txnId, returning the response. + function retryCommit({lsid, txnNumber}) { + return testDB.adminCommand( + {commitTransaction: 1, txnNumber: NumberLong(txnNumber), lsid, autocommit}); + } + + // Asserts aborting the given txnId returns NoSuchTransaction. + function assertTransactionAborted({lsid, txnNumber}) { + assert.commandFailedWithCode( + testDB.adminCommand( + {abortTransaction: 1, txnNumber: NumberLong(txnNumber), lsid, autocommit}), + ErrorCodes.NoSuchTransaction); + } + + // Global counter for the number of retryable writes completed. Used to verify retried retryable + // writes aren't double applied. 
+ let numRetryableWrites = 0; + + // Runs a dummy retryable write and increments the retryable write counter. + function assertRetryableWriteWorked({lsid, txnNumber}) { + numRetryableWrites += 1; + assert.commandWorked(testDB.runCommand({ + insert: collName, + documents: [{fromRetryableWrite: true}], + txnNumber: NumberLong(txnNumber), lsid + })); + } + + // Verifies a txnId has already been used for a retryable write by running a dummy retryable + // write and asserting the write isn't applied. + function assertRetryableWriteCanBeRetried({lsid, txnNumber}) { + assert.commandWorked(testDB.runCommand({ + insert: collName, + documents: [{fromRetryableWrite: true}], + txnNumber: NumberLong(txnNumber), lsid + })); + assert.eq(numRetryableWrites, testDB[collName].find({fromRetryableWrite: true}).itcount()); + } + + // Searches config.transactions for an entry for the given txnId on each node in the replica + // set, verifying the entry does / does not exist and has the expected state, if specified. + function checkConfigTransactionEntry(rst, {lsid, txnNumber}, {hasEntry, expectedState}) { + rst.awaitReplication(); + rst.nodes.forEach((node) => { + // Search for id since we don't know the uid, which is generated by the server. + const entry = node.getDB("config").transactions.findOne({"_id.id": lsid.id}); + + if (!hasEntry) { + // There should be no entry for this session or it should be for an earlier + // operation. 
+ if (entry) { + assert.gt(txnNumber, + entry.txnNum, + "expected entry to have lower txnNumber, entry: " + tojson(entry) + + ", node: " + tojson(node)); + } else { + assert.isnull(entry, + "expected entry to be null, entry: " + tojson(entry) + + ", node: " + tojson(node)); + } + return; + } + + assert.eq(txnNumber, + entry.txnNum, + "expected entry to have the same txnNumber, entry: " + tojson(entry) + + ", node: " + tojson(node)); + + if (expectedState) { + assert.eq(expectedState, + entry.state, + "entry: " + tojson(entry) + ", node: " + tojson(node)); + } else { + assert(!entry.hasOwnProperty("state"), + "expected entry to not have state, entry: " + tojson(entry) + ", node: " + + tojson(node)); + } + }); + } + + function runTest({shouldRestart}) { + // The test waits for failpoints to log a message when hit, so clear the program output + // before starting so messages from previous iterations aren't in it. + clearRawMongoProgramOutput(); + + const txnIds = { + write: {lsid: {id: UUID()}, txnNumber: 0}, // Retryable write. + commit: {lsid: {id: UUID()}, txnNumber: 0}, // Committed transaction w/o prepare. + commitPrepare: {lsid: {id: UUID()}, txnNumber: 0}, // Committed transaction w/ prepare. + abort: {lsid: {id: UUID()}, txnNumber: 0}, // Aborted transaction w/o prepare. + abortPrepare: {lsid: {id: UUID()}, txnNumber: 0}, // Aborted transaction after prepare. + concurrentTxn: {lsid: {id: UUID()}, txnNumber: 0}, // Transaction concurrent w/ setFCV. + concurrentWrite: + {lsid: {id: UUID()}, txnNumber: 0}, // Retryable write concurrent w/ setFCV. + upgradingTxn: + {lsid: {id: UUID()}, txnNumber: 0}, // Transaction started during FCV upgrade. + }; + + // + // In the latest FCV, verify the expected updates are made to config.transactions for each + // case and the successful operations are retryable. 
+ // + checkFCV(adminDB, latestFCV); + + assertRetryableWriteWorked(txnIds.write); + assert.commandWorked(runTxn(txnIds.commit, {commit: true, prepare: false})); + assert.commandWorked(runTxn(txnIds.commitPrepare, {commit: true, prepare: true})); + assert.commandWorked(runTxn(txnIds.abort, {commit: false, prepare: false})); + assert.commandWorked(runTxn(txnIds.abortPrepare, {commit: false, prepare: true})); + + checkConfigTransactionEntry(rst, txnIds.write, {hasEntry: true}); + checkConfigTransactionEntry( + rst, txnIds.commit, {hasEntry: true, expectedState: "committed"}); + checkConfigTransactionEntry( + rst, txnIds.commitPrepare, {hasEntry: true, expectedState: "committed"}); + checkConfigTransactionEntry(rst, txnIds.abort, {hasEntry: false}); + checkConfigTransactionEntry( + rst, txnIds.abortPrepare, {hasEntry: true, expectedState: "aborted"}); + + // The retryable write and the commit of both committed transactions should be retryable. + // The aborted transactions should still be aborted. + assertRetryableWriteCanBeRetried(txnIds.write); + assert.commandWorked(retryCommit(txnIds.commit)); + assert.commandWorked(retryCommit(txnIds.commitPrepare)); + assertTransactionAborted(txnIds.abort); + assertTransactionAborted(txnIds.abortPrepare); + + // + // Downgrade to the last-stable FCV and verify config.transactions was updated as expected + // for previously completed operations and operations concurrent with the downgrade. + // + + if (shouldRestart) { + // Restart to verify config.transactions entries for sessions not in-memory at the + // beginning of FCV downgrade are updated correctly. + jsTestLog("Restarting replica set before downgrading the featureCompatibilityVersion."); + for (let i = 0; i < rst.nodes.length; i++) { + rst.restart(i); + } + testDB = rst.getPrimary().getDB(dbName); + adminDB = rst.getPrimary().getDB("admin"); + } + + // Make setFCV pause in the downgrading state after getting the list of sessions to + // potentially modify. 
+ assert.commandWorked(rst.getPrimary().adminCommand( + {configureFailPoint: "pauseBeforeDowngradingSessions", mode: "alwaysOn"})); + + // Downgrade FCV in a parallel shell and wait until it blocks at the failpoint above. + const awaitDowngradeFCV = startParallelShell(() => { + load("jstests/libs/feature_compatibility_version.js"); + jsTestLog("Downgrade the featureCompatibilityVersion in a parallel shell."); + assert.commandWorked(db.adminCommand({setFeatureCompatibilityVersion: lastStableFCV})); + }, rst.getPrimary().port); + waitForFailpoint("Hit pauseBeforeDowngradingSessions failpoint", 1 /*numTimes*/); + + // Concurrent transactions that use prepare will fail. + assert.commandFailedWithCode(runTxn(txnIds.concurrentTxn, {commit: true, prepare: true}), + ErrorCodes.CommandNotSupported); + txnIds.concurrentTxn.txnNumber += 1; + + // Concurrent transactions that do not use prepare and retryable writes succeed. + assert.commandWorked(runTxn(txnIds.concurrentTxn, {commit: true, prepare: false})); + assertRetryableWriteWorked(txnIds.concurrentWrite); + + // Unset the failpoint and wait for the downgrade to finish. + assert.commandWorked(rst.getPrimary().adminCommand( + {configureFailPoint: "pauseBeforeDowngradingSessions", mode: "off"})); + + awaitDowngradeFCV(); + checkFCV(adminDB, lastStableFCV); + + // The successful concurrent operations should have entries without state and be retryable. + checkConfigTransactionEntry(rst, txnIds.concurrentTxn, {hasEntry: true}); + assert.commandWorked(retryCommit(txnIds.concurrentTxn)); + checkConfigTransactionEntry(rst, txnIds.concurrentWrite, {hasEntry: true}); + assertRetryableWriteCanBeRetried(txnIds.concurrentWrite); + + // Only the retryable write entry should remain. 
+ checkConfigTransactionEntry(rst, txnIds.write, {hasEntry: true}); + checkConfigTransactionEntry(rst, txnIds.commit, {hasEntry: false}); + checkConfigTransactionEntry(rst, txnIds.commitPrepare, {hasEntry: false}); + checkConfigTransactionEntry(rst, txnIds.abort, {hasEntry: false}); + if (!shouldRestart) { + // TODO SERVER-35872: Transactions aborted after prepare don't transition to the correct + // state after refreshing from storage. + checkConfigTransactionEntry(rst, txnIds.abortPrepare, {hasEntry: false}); + } + + // The retryable write can be retried. + assertRetryableWriteCanBeRetried(txnIds.write); + + // Neither of the commits can be retried. + assert.commandFailedWithCode(retryCommit(txnIds.commit), ErrorCodes.NoSuchTransaction); + assert.commandFailedWithCode(retryCommit(txnIds.commitPrepare), + ErrorCodes.NoSuchTransaction); + + // + // In the last-stable FCV, verify the expected updates are made to config.transactions for + // each case and the successful operations are retryable. + // + + // Reset each txnId to test upgrade with a clean slate. + Object.keys(txnIds).forEach((txnIdKey) => { + txnIds[txnIdKey].lsid = {id: UUID()}; + txnIds[txnIdKey].txnNumber = 0; + }); + + // Prepare can't be used in FCV 4.0, so only commit, abort, and retryable write should + // succeed. + assertRetryableWriteWorked(txnIds.write); + assert.commandWorked(runTxn(txnIds.commit, {commit: true, prepare: false})); + assert.commandFailedWithCode(runTxn(txnIds.commitPrepare, {commit: true, prepare: true}), + ErrorCodes.CommandNotSupported); + assert.commandWorked(runTxn(txnIds.abort, {commit: false, prepare: false})); + assert.commandFailedWithCode(runTxn(txnIds.abortPrepare, {commit: false, prepare: true}), + ErrorCodes.CommandNotSupported); + + // Only the retryable write and transaction that committed without prepare should have an + // entry. Neither should have state. 
+ checkConfigTransactionEntry(rst, txnIds.write, {hasEntry: true}); + checkConfigTransactionEntry(rst, txnIds.commit, {hasEntry: true}); + checkConfigTransactionEntry(rst, txnIds.commitPrepare, {hasEntry: false}); + checkConfigTransactionEntry(rst, txnIds.abort, {hasEntry: false}); + checkConfigTransactionEntry(rst, txnIds.abortPrepare, {hasEntry: false}); + + // The retryable write and successful commit can be retried. + assertRetryableWriteCanBeRetried(txnIds.write); + assert.commandWorked(retryCommit(txnIds.commit)); + + if (shouldRestart) { + // Restart to verify config.transactions entries for sessions not in-memory at the + // beginning of FCV upgrade are updated correctly. + jsTestLog("Restarting replica set before upgrading the featureCompatibilityVersion."); + for (let i = 0; i < rst.nodes.length; i++) { + rst.restart(i); + } + testDB = rst.getPrimary().getDB(dbName); + adminDB = rst.getPrimary().getDB("admin"); + } + + // + // Upgrade to the latest FCV and verify config.transactions was updated as expected for + // previously completed operations and operations concurrent with the upgrade. + // + + // Run a retryable write on the session that will be used during upgrade so it has a + // transaction table entry and will be checked out by the upgrade. + assertRetryableWriteWorked(txnIds.upgradingTxn); + txnIds.upgradingTxn.txnNumber += 1; + + // Make setFCV pause in the upgrading state after getting the list of sessions to + // potentially modify. + assert.commandWorked(rst.getPrimary().adminCommand( + {configureFailPoint: "pauseBeforeUpgradingSessions", mode: "alwaysOn"})); + + // Upgrade FCV in a parallel shell and wait until it blocks at the failpoint above. 
+ const awaitUpgradeFCV = startParallelShell(() => { + load("jstests/libs/feature_compatibility_version.js"); + jsTestLog("Upgrade the featureCompatibilityVersion in a parallel shell."); + assert.commandWorked(db.adminCommand({setFeatureCompatibilityVersion: latestFCV})); + }, rst.getPrimary().port); + waitForFailpoint("Hit pauseBeforeUpgradingSessions failpoint", 1 /*numTimes*/); + + // Concurrent transactions that use prepare will fail. + assert.commandFailedWithCode(runTxn(txnIds.concurrentTxn, {commit: true, prepare: true}), + ErrorCodes.CommandNotSupported); + txnIds.concurrentTxn.txnNumber += 1; + + // Concurrent transactions that do not use prepare and retryable writes succeed. + assert.commandWorked(runTxn(txnIds.concurrentTxn, {commit: true, prepare: false})); + assertRetryableWriteWorked(txnIds.concurrentWrite); + + // Start a transaction in the upgrading state and verify that it doesn't get aborted by the + // rest of the upgrade. Note that all sessions are killed and their transactions aborted for + // writes to the FCV document except when it is set to the fully upgraded state, so this + // can't be tested for downgrade. + assert.commandWorked(runTxn(txnIds.upgradingTxn, {leaveOpen: true})); + + // Unset the failpoint and wait for the upgrade to finish. + assert.commandWorked(rst.getPrimary().adminCommand( + {configureFailPoint: "pauseBeforeUpgradingSessions", mode: "off"})); + + awaitUpgradeFCV(); + checkFCV(adminDB, latestFCV); + + // The transaction started while upgrading shouldn't have been killed and can be committed. + assert.commandWorked(testDB.adminCommand({ + commitTransaction: 1, + lsid: txnIds.upgradingTxn.lsid, + txnNumber: NumberLong(txnIds.upgradingTxn.txnNumber), autocommit + })); + + // The successful concurrent transaction should have "committed" state and be retryable, and + // the concurrent retryable write should not have state and also be retryable. 
+ checkConfigTransactionEntry( + rst, txnIds.concurrentTxn, {hasEntry: true, expectedState: "committed"}); + assert.commandWorked(retryCommit(txnIds.concurrentTxn)); + checkConfigTransactionEntry(rst, txnIds.concurrentWrite, {hasEntry: true}); + assertRetryableWriteCanBeRetried(txnIds.concurrentWrite); + + // There should still only be entries for the committed transaction and retryable write. The + // committed transaction should now have a "state" field. + checkConfigTransactionEntry(rst, txnIds.write, {hasEntry: true}); + checkConfigTransactionEntry( + rst, txnIds.commit, {hasEntry: true, expectedState: "committed"}); + checkConfigTransactionEntry(rst, txnIds.commitPrepare, {hasEntry: false}); + checkConfigTransactionEntry(rst, txnIds.abort, {hasEntry: false}); + checkConfigTransactionEntry(rst, txnIds.abortPrepare, {hasEntry: false}); + + // The retryable write and successful commit can be retried. + assertRetryableWriteCanBeRetried(txnIds.write); + assert.commandWorked(retryCommit(txnIds.commit)); + } + + runTest({shouldRestart: false}); + runTest({shouldRestart: true}); + + // + // Verify setFCV is interruptible between modifying sessions. + // + clearRawMongoProgramOutput(); + checkFCV(adminDB, latestFCV); + + // Construct a config.transactions entry that would be modified by downgrade. + const txnIds = {interrupt: {lsid: {id: UUID()}, txnNumber: 0}}; + assert.commandWorked(runTxn(txnIds.interrupt, {commit: true, prepare: true})); + checkConfigTransactionEntry( + rst, txnIds.interrupt, {hasEntry: true, expectedState: "committed"}); + + // Pause setFCV before it would modify the entry. 
+ assert.commandWorked(rst.getPrimary().adminCommand( + {configureFailPoint: "pauseBeforeDowngradingSessions", mode: "alwaysOn"})); + + TestData.setFCVLsid = {id: UUID()}; + const awaitUpgradeFCV = startParallelShell(() => { + load("jstests/libs/feature_compatibility_version.js"); + assert.commandFailedWithCode( + db.adminCommand( + {setFeatureCompatibilityVersion: lastStableFCV, lsid: TestData.setFCVLsid}), + ErrorCodes.Interrupted); + }, rst.getPrimary().port); + waitForFailpoint("Hit pauseBeforeDowngradingSessions failpoint", 1 /*numTimes*/); + + // Kill the session running setFCV. + assert.commandWorked(rst.getPrimary().adminCommand({killSessions: [TestData.setFCVLsid]})); + + // Unpause the failpoint and verify setFCV returns without modifying config.transactions. + assert.commandWorked(rst.getPrimary().adminCommand( + {configureFailPoint: "pauseBeforeDowngradingSessions", mode: "off"})); + + awaitUpgradeFCV(); + checkConfigTransactionEntry( + rst, txnIds.interrupt, {hasEntry: true, expectedState: "committed"}); + + rst.stopSet(); +}()); diff --git a/jstests/multiVersion/libs/sharded_txn_upgrade_downgrade_cluster_shared.js b/jstests/multiVersion/libs/sharded_txn_upgrade_downgrade_cluster_shared.js new file mode 100644 index 00000000000..4180b52fa01 --- /dev/null +++ b/jstests/multiVersion/libs/sharded_txn_upgrade_downgrade_cluster_shared.js @@ -0,0 +1,145 @@ +/** + * Functions and variables shared between multiversion/sharded_txn_upgrade_cluster.js and + * multiversion/sharded_txn_downgrade_cluster.js. + */ + +// Define autocommit as a variable so it can be used in object literals w/o an explicit value. +const autocommit = false; + +// Sets up a cluster at the given binary version with two shards and a collection sharded by "skey" +// with one chunk on each shard. 
+function setUpTwoShardClusterWithBinVersion(dbName, collName, binVersion) { + const st = new ShardingTest({ + shards: 2, + other: { + mongosOptions: {binVersion}, + configOptions: {binVersion}, + rsOptions: {binVersion}, + }, + rs: {nodes: 3} // Use 3 node replica sets to allow binary changes with no downtime. + }); + checkFCV(st.configRS.getPrimary().getDB("admin"), + binVersion === "latest" ? latestFCV : lastStableFCV); + + // Set up a sharded collection with two chunks, one on each shard. + const ns = dbName + "." + collName; + assert.commandWorked(st.s.adminCommand({enableSharding: dbName})); + st.ensurePrimaryShard(dbName, st.shard0.shardName); + assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {skey: 1}})); + assert.commandWorked(st.s.adminCommand({split: ns, middle: {skey: 0}})); + assert.commandWorked( + st.s.adminCommand({moveChunk: ns, find: {skey: 1}, to: st.shard1.shardName})); + + return st; +} + +// Runs a transaction against the given database using the given txnId by running two inserts. +// Depending on the multiShard parameter, will insert to one or two shards. Assumes collName is a +// collection in testDB sharded by skey, with chunks: [minKey, 0), [0, maxKey). +function runTxn(testDB, collName, {lsid, txnNumber}, {multiShard}) { + const docs = multiShard ? [{skey: -1}, {skey: 1}] : [{skey: 1}]; + const startTransactionRes = testDB.runCommand({ + insert: collName, + documents: docs, + txnNumber: NumberLong(txnNumber), + startTransaction: true, lsid, autocommit, + }); + if (!startTransactionRes.ok) { + return startTransactionRes; + } + + const secondStatementRes = testDB.runCommand({ + insert: collName, + documents: docs, + txnNumber: NumberLong(txnNumber), lsid, autocommit, + }); + if (!secondStatementRes.ok) { + return secondStatementRes; + } + + return testDB.adminCommand( + {commitTransaction: 1, lsid, txnNumber: NumberLong(txnNumber), autocommit}); +} + +// Retries commitTransaction for the given txnId, returning the response. 
+function retryCommit(testDB, {lsid, txnNumber}) { + return testDB.adminCommand( + {commitTransaction: 1, lsid, txnNumber: NumberLong(txnNumber), autocommit}); +} + +// Global counter for the number of multi shard retryable writes completed. Used to verify retried +// retryable writes aren't double applied. +let numMultiShardRetryableWrites = 0; + +// Runs a dummy retryable write against two shards and increments the retryable write counter. +// Assumes collName is a collection in testDB sharded by skey, with chunks: [minKey, 0), [0, maxKey). +function assertMultiShardRetryableWriteWorked(testDB, collName, {lsid, txnNumber}) { + numMultiShardRetryableWrites += 1; + assert.commandWorked(testDB.runCommand({ + insert: collName, + documents: [{skey: -1, fromRetryableWrite: true}, {skey: 1, fromRetryableWrite: true}], + txnNumber: NumberLong(txnNumber), lsid + })); +} + +// Verifies a txnId has already been used for a retryable write by running a dummy retryable write +// and asserting the write isn't applied. Assumes collName is a collection in testDB sharded by skey, +// with chunks: [minKey, 0), [0, maxKey). +function assertMultiShardRetryableWriteCanBeRetried(testDB, collName, {lsid, txnNumber}) { + assert.commandWorked(testDB.runCommand({ + insert: collName, + documents: [{skey: -1, fromRetryableWrite: true}, {skey: 1, fromRetryableWrite: true}], + txnNumber: NumberLong(txnNumber), lsid + })); + assert.eq(numMultiShardRetryableWrites * 2, // Each write inserts 2 documents. + testDB[collName].find({fromRetryableWrite: true}).itcount()); +} + +// Recreates unique indexes in the FCV 4.0 format to allow for a binary downgrade from 4.2 to 4.0. 
+// +// Taken from: +// https://docs.mongodb.com/master/release-notes/4.2-downgrade-sharded-cluster/#remove-backwards-incompatible-persisted-features +function downgradeUniqueIndexesScript(db) { + var unique_idx_v1 = []; + var unique_idx_v2 = []; + db.adminCommand("listDatabases").databases.forEach(function(d) { + let mdb = db.getSiblingDB(d.name); + mdb.getCollectionInfos().forEach(function(c) { + let currentCollection = mdb.getCollection(c.name); + currentCollection.getIndexes().forEach(function(i) { + if (i.unique) { + if (i.v === 1) { + unique_idx_v1.push(i); + } else { + unique_idx_v2.push(i); + } + return; + } + }); + }); + }); + + // Drop and recreate all v:1 indexes + for (let idx of unique_idx_v1) { + let [dbName, collName] = idx.ns.split("."); + let res = db.getSiblingDB(dbName).runCommand({dropIndexes: collName, index: idx.name}); + assert.commandWorked(res); + res = db.getSiblingDB(dbName).runCommand({ + createIndexes: collName, + indexes: [{"key": idx.key, "name": idx.name, "unique": true, "v": 1}] + }); + assert.commandWorked(res); + } + + // Drop and recreate all v:2 indexes + for (let idx of unique_idx_v2) { + let [dbName, collName] = idx.ns.split("."); + let res = db.getSiblingDB(dbName).runCommand({dropIndexes: collName, index: idx.name}); + assert.commandWorked(res); + res = db.getSiblingDB(dbName).runCommand({ + createIndexes: collName, + indexes: [{"key": idx.key, "name": idx.name, "unique": true, "v": 2}] + }); + assert.commandWorked(res); + } +} diff --git a/jstests/multiVersion/sharded_txn_downgrade_cluster.js b/jstests/multiVersion/sharded_txn_downgrade_cluster.js new file mode 100644 index 00000000000..394cb89903f --- /dev/null +++ b/jstests/multiVersion/sharded_txn_downgrade_cluster.js @@ -0,0 +1,96 @@ +/** + * Tests downgrading a cluster from the current to last stable version succeeds, verifying the + * behavior of transactions and retryable writes throughout the process. 
+ * + * @tags: [uses_transactions, uses_multi_shard_transaction] + */ + +// Checking UUID consistency uses cached connections, which are not valid across restarts or +// stepdowns. +TestData.skipCheckingUUIDsConsistentAcrossCluster = true; + +(function() { + "use strict"; + + load("jstests/libs/feature_compatibility_version.js"); + load("jstests/multiVersion/libs/multi_rs.js"); + load("jstests/multiVersion/libs/multi_cluster.js"); + load("jstests/multiVersion/libs/sharded_txn_upgrade_downgrade_cluster_shared.js"); + + const dbName = "test"; + const collName = "sharded_txn_downgrade_cluster"; + + // Start a cluster with two shards at the latest version. + const st = setUpTwoShardClusterWithBinVersion(dbName, collName, "latest"); + + const txnIds = { + commit: {lsid: {id: UUID()}, txnNumber: 0}, + commitMulti: {lsid: {id: UUID()}, txnNumber: 0}, + write: {lsid: {id: UUID()}, txnNumber: 0}, + }; + + let testDB = st.s.getDB(dbName); + + // Retryable writes and transactions with and without prepare should work. + assert.commandWorked(runTxn(testDB, collName, txnIds.commit, {multiShard: false})); + assert.commandWorked(runTxn(testDB, collName, txnIds.commitMulti, {multiShard: true})); + assertMultiShardRetryableWriteWorked(testDB, collName, txnIds.write); + + // commitTransaction for both transactions and the retryable write should be retryable. + assert.commandWorked(retryCommit(testDB, txnIds.commit)); + assert.commandWorked(retryCommit(testDB, txnIds.commitMulti)); + assertMultiShardRetryableWriteCanBeRetried(testDB, collName, txnIds.write); + + // Downgrade featureCompatibilityVersion. + assert.commandWorked(st.s.adminCommand({setFeatureCompatibilityVersion: lastStableFCV})); + checkFCV(st.configRS.getPrimary().getDB("admin"), lastStableFCV); + + // Only the retryable write can be retried. Can't retry the multi shard transaction because it + // uses coordinateCommit, which is not allowed in FCV 4.0. 
+ assertMultiShardRetryableWriteCanBeRetried(testDB, collName, txnIds.write); + assert.commandFailedWithCode(retryCommit(testDB, txnIds.commit), ErrorCodes.NoSuchTransaction); + assert.commandFailedWithCode(retryCommit(testDB, txnIds.commitMulti), + ErrorCodes.CommandNotSupported); + + downgradeUniqueIndexesScript(st.s.getDB("test")); + + // Downgrade the mongos servers first. + jsTestLog("Downgrading mongos servers."); + st.upgradeCluster("last-stable", + {upgradeConfigs: false, upgradeMongos: true, upgradeShards: false}); + + // Then downgrade the shard servers. + jsTestLog("Downgrading shard servers."); + st.upgradeCluster("last-stable", + {upgradeConfigs: false, upgradeMongos: false, upgradeShards: true}); + + // Then downgrade the config servers. + jsTestLog("Downgrading config servers."); + st.upgradeCluster("last-stable", + {upgradeConfigs: true, upgradeMongos: false, upgradeShards: false}); + checkFCV(st.configRS.getPrimary().getDB("admin"), lastStableFCV); + + testDB = st.s.getDB(dbName); + + // Can still retry the retryable write. + assertMultiShardRetryableWriteCanBeRetried(testDB, collName, txnIds.write); + + // The txnIds used for the earlier commits should be re-usable because their history was + // removed. + assertMultiShardRetryableWriteWorked(testDB, collName, txnIds.commit); + assertMultiShardRetryableWriteCanBeRetried(testDB, collName, txnIds.commit); + + assertMultiShardRetryableWriteWorked(testDB, collName, txnIds.commitMulti); + assertMultiShardRetryableWriteCanBeRetried(testDB, collName, txnIds.commitMulti); + + // Can perform a new operation on each session. 
+ Object.keys(txnIds).forEach((txnIdKey) => { + txnIds[txnIdKey].txnNumber += 1; + }); + + assertMultiShardRetryableWriteWorked(testDB, collName, txnIds.commit); + assertMultiShardRetryableWriteWorked(testDB, collName, txnIds.commitMulti); + assertMultiShardRetryableWriteWorked(testDB, collName, txnIds.write); + + st.stop(); +})(); diff --git a/jstests/multiVersion/sharded_txn_upgrade_cluster.js b/jstests/multiVersion/sharded_txn_upgrade_cluster.js new file mode 100644 index 00000000000..04c3bddfde7 --- /dev/null +++ b/jstests/multiVersion/sharded_txn_upgrade_cluster.js @@ -0,0 +1,82 @@ +/** + * Tests upgrading a cluster from last stable to current version, verifying the behavior of + * transactions and retryable writes throughout the process. + * + * @tags: [uses_transactions, uses_multi_shard_transaction] + */ + +// Checking UUID consistency uses cached connections, which are not valid across restarts or +// stepdowns. +TestData.skipCheckingUUIDsConsistentAcrossCluster = true; + +(function() { + "use strict"; + + load("jstests/libs/feature_compatibility_version.js"); + load("jstests/multiVersion/libs/multi_rs.js"); + load("jstests/multiVersion/libs/multi_cluster.js"); + load("jstests/multiVersion/libs/sharded_txn_upgrade_downgrade_cluster_shared.js"); + + const dbName = "test"; + const collName = "sharded_txn_upgrade_cluster"; + + // Start a cluster with two shards at the last stable version. + const st = setUpTwoShardClusterWithBinVersion(dbName, collName, "last-stable"); + + const txnIds = { + commit: {lsid: {id: UUID()}, txnNumber: 0}, + commitMulti: {lsid: {id: UUID()}, txnNumber: 0}, + write: {lsid: {id: UUID()}, txnNumber: 0}, + }; + + let testDB = st.s.getDB(dbName); + + // Only retryable writes work and they are retryable. + assertMultiShardRetryableWriteWorked(testDB, collName, txnIds.write); + assertMultiShardRetryableWriteCanBeRetried(testDB, collName, txnIds.write); + + // Upgrade the config servers. 
+ jsTestLog("Upgrading config servers."); + st.upgradeCluster("latest", {upgradeConfigs: true, upgradeMongos: false, upgradeShards: false}); + + // Then upgrade the shard servers. + jsTestLog("Upgrading shard servers."); + st.upgradeCluster("latest", {upgradeConfigs: false, upgradeMongos: false, upgradeShards: true}); + + // Then upgrade mongos servers. + jsTestLog("Upgrading mongos servers."); + st.upgradeCluster("latest", {upgradeConfigs: false, upgradeMongos: true, upgradeShards: false}); + checkFCV(st.configRS.getPrimary().getDB("admin"), lastStableFCV); + + testDB = st.s.getDB(dbName); + + // Can still retry the retryable write. + assertMultiShardRetryableWriteCanBeRetried(testDB, collName, txnIds.write); + + // Transactions that don't use prepare are allowed in FCV 4.0 with a 4.2 binary mongos. + assert.commandWorked(runTxn(testDB, collName, txnIds.commit, {multiShard: false})); + + // Multi shard transactions will fail because coordinateCommit is not allowed in FCV 4.0. + assert.commandFailedWithCode(runTxn(testDB, collName, txnIds.commitMulti, {multiShard: true}), + ErrorCodes.CommandNotSupported); + + // Upgrade the cluster's feature compatibility version to the latest. + assert.commandWorked( + st.s.getDB("admin").runCommand({setFeatureCompatibilityVersion: latestFCV})); + checkFCV(st.configRS.getPrimary().getDB("admin"), latestFCV); + + // Can still retry the retryable write and the committed transaction. + assertMultiShardRetryableWriteCanBeRetried(testDB, collName, txnIds.write); + assert.commandWorked(retryCommit(testDB, txnIds.commit)); + + // Can perform a new operation on each session. 
+ Object.keys(txnIds).forEach((txnIdKey) => { + txnIds[txnIdKey].txnNumber += 1; + }); + + assert.commandWorked(runTxn(testDB, collName, txnIds.commit, {multiShard: false})); + assert.commandWorked(runTxn(testDB, collName, txnIds.commitMulti, {multiShard: true})); + assertMultiShardRetryableWriteWorked(testDB, collName, txnIds.write); + + st.stop(); +})(); diff --git a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp index 08b13786b9e..50196ec7aed 100644 --- a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp +++ b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp @@ -27,12 +27,15 @@ * it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault + #include "mongo/platform/basic.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/catalog/coll_mod.h" #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" +#include "mongo/db/client.h" #include "mongo/db/commands.h" #include "mongo/db/commands/feature_compatibility_version.h" #include "mongo/db/commands/feature_compatibility_version_command_parser.h" @@ -40,15 +43,22 @@ #include "mongo/db/commands/feature_compatibility_version_parser.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/db_raii.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/logical_session_id.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/ops/write_ops.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/s/config/sharding_catalog_manager.h" #include "mongo/db/server_options.h" +#include "mongo/db/session_catalog_mongod.h" +#include "mongo/db/transaction_participant.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/catalog/type_collection.h" #include "mongo/s/database_version_helpers.h" #include "mongo/s/grid.h" #include "mongo/util/exit.h" #include 
"mongo/util/fail_point_service.h" +#include "mongo/util/log.h" #include "mongo/util/scopeguard.h" namespace mongo { @@ -57,6 +67,156 @@ namespace { MONGO_FAIL_POINT_DEFINE(featureCompatibilityDowngrade); MONGO_FAIL_POINT_DEFINE(featureCompatibilityUpgrade); +MONGO_FAIL_POINT_DEFINE(pauseBeforeUpgradingSessions); +MONGO_FAIL_POINT_DEFINE(pauseBeforeDowngradingSessions); + +/** + * Returns a set of the logical session ids of each entry in config.transactions that matches the + * given query. + */ +LogicalSessionIdSet getMatchingSessionIdsFromTransactionTable(OperationContext* opCtx, + Query query) { + LogicalSessionIdSet sessionIds = {}; + + DBDirectClient client(opCtx); + auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace, query); + while (cursor->more()) { + auto txnRecord = SessionTxnRecord::parse( + IDLParserErrorContext("setFCV-find-matching-sessions"), cursor->next()); + sessionIds.insert(txnRecord.getSessionId()); + } + return sessionIds; +} + +/** + * Checks out each given session with a new operation context, verifies the session's transaction + * participant passes the validation function, runs the modification function with a direct + * client from another new operation context while the session is checked out, then invalidates the + * session. + */ +void forEachSessionWithCheckout( + OperationContext* opCtx, + LogicalSessionIdSet sessionIds, + stdx::function<bool(OperationContext* opCtx)> verifyTransactionParticipantFn, + stdx::function<void(DBDirectClient* client, LogicalSessionId sessionId)> + performModificationFn) { + // Construct a new operation context to check out the session with. 
+ auto clientForCheckout = + opCtx->getServiceContext()->makeClient("setFCV-transaction-table-checkout"); + AlternativeClientRegion acrForCheckout(clientForCheckout); + for (const auto& sessionId : sessionIds) { + // Check for interrupt on the parent opCtx because killing it won't be propagated to the + // opCtx checking out the session and performing the modification. + opCtx->checkForInterrupt(); + + const auto opCtxForCheckout = cc().makeOperationContext(); + opCtxForCheckout->setLogicalSessionId(sessionId); + MongoDOperationContextSession ocs(opCtxForCheckout.get()); + + // Now that the session is checked out, verify it still needs to be modified using its + // transaction participant. + if (!verifyTransactionParticipantFn(opCtxForCheckout.get())) { + continue; + } + + { + // Perform the modification on another operation context to bypass retryable writes and + // transactions machinery. + auto clientForModification = + opCtx->getServiceContext()->makeClient("setFCV-transaction-table-modification"); + AlternativeClientRegion acrForModification(clientForModification); + + const auto opCtxForModification = cc().makeOperationContext(); + DBDirectClient directClient(opCtxForModification.get()); + performModificationFn(&directClient, sessionId); + } + + // Note that invalidating the session here is unnecessary if the modification function + // writes directly to config.transactions, which already invalidates the affected session. + auto txnParticipant = TransactionParticipant::get(opCtxForCheckout.get()); + txnParticipant.invalidate(opCtxForCheckout.get()); + } +} + +/** + * Removes all documents from config.transactions with a "state" field because they may point to + * oplog entries in a format a 4.0 mongod cannot process. + */ +void downgradeTransactionTable(OperationContext* opCtx) { + // In FCV 4.2, all transaction table entries associated with a transaction have a "state" field.
+ Query query(BSON("state" << BSON("$exists" << true))); + LogicalSessionIdSet sessionIdsWithState = + getMatchingSessionIdsFromTransactionTable(opCtx, query); + + if (MONGO_FAIL_POINT(pauseBeforeDowngradingSessions)) { + LOG(0) << "Hit pauseBeforeDowngradingSessions failpoint"; + MONGO_FAIL_POINT_PAUSE_WHILE_SET(pauseBeforeDowngradingSessions); + } + + // Remove all transaction table entries associated with a committed / aborted transaction. Note + // that transactions that abort before prepare have no entry. + forEachSessionWithCheckout( + opCtx, + sessionIdsWithState, + [](OperationContext* opCtx) { + auto txnParticipant = TransactionParticipant::get(opCtx); + return txnParticipant.transactionIsCommitted() || txnParticipant.transactionIsAborted(); + }, + [](DBDirectClient* directClient, LogicalSessionId sessionId) { + const auto commandResponse = directClient->runCommand([&] { + write_ops::Delete deleteOp(NamespaceString::kSessionTransactionsTableNamespace); + deleteOp.setDeletes({[&] { + write_ops::DeleteOpEntry entry; + entry.setQ(BSON("_id" << sessionId.toBSON())); + entry.setMulti(false); + return entry; + }()}); + return deleteOp.serialize({}); + }()); + uassertStatusOK(getStatusFromWriteCommandReply(commandResponse->getCommandReply())); + }); +} + +/** + * Adds a "state" field to all documents in config.transactions that represent committed + * transactions so they are in the 4.2 format. + */ +void upgradeTransactionTable(OperationContext* opCtx) { + // Retryable writes and committed transactions have the same format in FCV 4.0, so use an empty + // query to return all session ids in the transaction table. 
+ LogicalSessionIdSet allSessionIds = getMatchingSessionIdsFromTransactionTable(opCtx, Query()); + + if (MONGO_FAIL_POINT(pauseBeforeUpgradingSessions)) { + LOG(0) << "Hit pauseBeforeUpgradingSessions failpoint"; + MONGO_FAIL_POINT_PAUSE_WHILE_SET(pauseBeforeUpgradingSessions); + } + + // Add state=committed to the transaction table entry for each session that most recently + // committed a transaction. + forEachSessionWithCheckout( + opCtx, + allSessionIds, + [](OperationContext* opCtx) { + auto txnParticipant = TransactionParticipant::get(opCtx); + return txnParticipant.transactionIsCommitted(); + }, + [](DBDirectClient* directClient, LogicalSessionId sessionId) { + const auto commandResponse = directClient->runCommand([&] { + write_ops::Update updateOp(NamespaceString::kSessionTransactionsTableNamespace); + updateOp.setUpdates({[&] { + write_ops::UpdateOpEntry entry; + entry.setQ(BSON("_id" << sessionId.toBSON())); + entry.setU(BSON("$set" << BSON("state" + << "committed"))); + entry.setMulti(false); + return entry; + }()}); + return updateOp.serialize({}); + }()); + uassertStatusOK(getStatusFromWriteCommandReply(commandResponse->getCommandReply())); + }); +} + /** * Sets the minimum allowed version for the cluster. If it is 4.0, then the node should not use 4.2 * features. @@ -171,6 +331,8 @@ public: updateUniqueIndexesOnUpgrade(opCtx); + upgradeTransactionTable(opCtx); + // Upgrade shards before config finishes its upgrade. if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { uassertStatusOK( @@ -213,6 +375,8 @@ public: Lock::GlobalLock lk(opCtx, MODE_S); } + downgradeTransactionTable(opCtx); + // Downgrade shards before config finishes its downgrade. 
if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { uassertStatusOK( diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 829905e322e..17db14347b9 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -1018,7 +1018,16 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, auto times = replLogApplyOps( opCtx, cmdNss, applyOpCmd, sessionInfo, stmtId, oplogLink, prepare, prepareOplogSlot); - auto txnState = prepare ? DurableTxnStateEnum::kPrepared : DurableTxnStateEnum::kCommitted; + auto txnState = [prepare]() -> boost::optional<DurableTxnStateEnum> { + if (serverGlobalParams.featureCompatibility.getVersion() < + ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42) { + invariant(!prepare); + return boost::none; + } + + return prepare ? DurableTxnStateEnum::kPrepared : DurableTxnStateEnum::kCommitted; + }(); + onWriteOpCompleted( opCtx, cmdNss, {stmtId}, times.writeOpTime, times.wallClockTime, txnState); return times; diff --git a/src/mongo/db/repl/session_update_tracker.cpp b/src/mongo/db/repl/session_update_tracker.cpp index 32cc3466879..958e77f1497 100644 --- a/src/mongo/db/repl/session_update_tracker.cpp +++ b/src/mongo/db/repl/session_update_tracker.cpp @@ -35,6 +35,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/repl/oplog_entry.h" +#include "mongo/db/server_options.h" #include "mongo/db/session.h" #include "mongo/db/session_txn_record_gen.h" #include "mongo/util/assert_util.h" @@ -68,20 +69,25 @@ boost::optional<repl::OplogEntry> createMatchingTransactionTableUpdate( newTxnRecord.setLastWriteOpTime(entry.getOpTime()); newTxnRecord.setLastWriteDate(*entry.getWallClockTime()); - switch (entry.getCommandType()) { - case repl::OplogEntry::CommandType::kApplyOps: - newTxnRecord.setState(entry.shouldPrepare() ? 
DurableTxnStateEnum::kPrepared - : DurableTxnStateEnum::kCommitted); - break; - case repl::OplogEntry::CommandType::kCommitTransaction: - newTxnRecord.setState(DurableTxnStateEnum::kCommitted); - break; - case repl::OplogEntry::CommandType::kAbortTransaction: - newTxnRecord.setState(DurableTxnStateEnum::kAborted); - break; - default: - break; + // "state" is a new field in 4.2. + if (serverGlobalParams.featureCompatibility.getVersion() >= + ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42) { + switch (entry.getCommandType()) { + case repl::OplogEntry::CommandType::kApplyOps: + newTxnRecord.setState(entry.shouldPrepare() ? DurableTxnStateEnum::kPrepared + : DurableTxnStateEnum::kCommitted); + break; + case repl::OplogEntry::CommandType::kCommitTransaction: + newTxnRecord.setState(DurableTxnStateEnum::kCommitted); + break; + case repl::OplogEntry::CommandType::kAbortTransaction: + newTxnRecord.setState(DurableTxnStateEnum::kAborted); + break; + default: + break; + } } + return newTxnRecord.toBSON(); }(); diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index c811ff0dd82..6ac27643f1e 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -907,12 +907,16 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx, // Commands must be processed one at a time. The only exception to this is applyOps because // applyOps oplog entries are effectively containers for CRUD operations. Therefore, it is safe // to batch applyOps commands with CRUD operations when reading from the oplog buffer. + // // Oplog entries on 'system.views' should also be processed one at a time. View catalog // immediately reflects changes for each oplog entry so we can see inconsistent view catalog if // multiple oplog entries on 'system.views' are being applied out of the original order. 
+ // + // Process updates to 'admin.system.version' individually as well so the secondary's FCV when + // processing each operation matches the primary's when committing that operation. if ((entry.isCommand() && (entry.getCommandType() != OplogEntry::CommandType::kApplyOps || entry.shouldPrepare())) || - entry.getNss().isSystemDotViews()) { + entry.getNss().isSystemDotViews() || entry.getNss().isServerConfigurationCollection()) { if (ops->getCount() == 1) { // apply commands one-at-a-time _consume(opCtx, oplogBuffer); diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index dad487ccdb6..941fa6c5b06 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -155,6 +155,15 @@ ActiveTransactionHistory fetchActiveTransactionHistory(OperationContext* opCtx, return result; } + // State is a new field in FCV 4.2 that indicates if a transaction committed, so check it in FCV + // 4.2 and upgrading to 4.2. Check when downgrading as well so sessions refreshed at the start + // of downgrade enter the correct state. + if ((serverGlobalParams.featureCompatibility.getVersion() >= + ServerGlobalParams::FeatureCompatibility::Version::kDowngradingTo40) && + result.lastTxnRecord->getState() == DurableTxnStateEnum::kCommitted) { + result.transactionCommitted = true; + } + auto it = TransactionHistoryIterator(result.lastTxnRecord->getLastWriteOpTime()); while (it.hasNext()) { try { @@ -181,11 +190,14 @@ ActiveTransactionHistory fetchActiveTransactionHistory(OperationContext* opCtx, entry.getOpTime()); } - // Either an applyOps oplog entry without a prepare flag or the state being kCommitted - // marks the commit of a transaction. 
- if ((entry.getCommandType() == repl::OplogEntry::CommandType::kApplyOps && - !entry.shouldPrepare()) || - (result.lastTxnRecord->getState() == DurableTxnStateEnum::kCommitted)) { + // State is a new field in FCV 4.2, so look for an applyOps oplog entry without a + // prepare flag to mark a committed transaction in FCV 4.0 or downgrading to 4.0. Check + // when upgrading as well so sessions refreshed at the beginning of upgrade enter the + // correct state. + if ((serverGlobalParams.featureCompatibility.getVersion() <= + ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42) && + (entry.getCommandType() == repl::OplogEntry::CommandType::kApplyOps && + !entry.shouldPrepare())) { result.transactionCommitted = true; } } catch (const DBException& ex) { |