summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/multiVersion/config_transactions_set_fcv.js432
-rw-r--r--jstests/multiVersion/libs/sharded_txn_upgrade_downgrade_cluster_shared.js145
-rw-r--r--jstests/multiVersion/sharded_txn_downgrade_cluster.js96
-rw-r--r--jstests/multiVersion/sharded_txn_upgrade_cluster.js82
-rw-r--r--src/mongo/db/commands/set_feature_compatibility_version_command.cpp164
-rw-r--r--src/mongo/db/op_observer_impl.cpp11
-rw-r--r--src/mongo/db/repl/session_update_tracker.cpp32
-rw-r--r--src/mongo/db/repl/sync_tail.cpp6
-rw-r--r--src/mongo/db/transaction_participant.cpp22
9 files changed, 970 insertions, 20 deletions
diff --git a/jstests/multiVersion/config_transactions_set_fcv.js b/jstests/multiVersion/config_transactions_set_fcv.js
new file mode 100644
index 00000000000..ca8d4bb4dfa
--- /dev/null
+++ b/jstests/multiVersion/config_transactions_set_fcv.js
@@ -0,0 +1,432 @@
+/**
+ * Tests that config.transactions documents are correctly modified on FCV upgrade/downgrade and that
+ * retryability is preserved for transactions that don't use prepare after upgrade and always for
+ * retryable writes.
+ *
+ * @tags: [uses_transactions, uses_prepare_transaction]
+ */
+(function() {
+ "use strict";
+ load("jstests/libs/feature_compatibility_version.js");
+ load('jstests/sharding/libs/sharded_transactions_helpers.js');
+
+ const dbName = "test";
+ const collName = "config_transactions_set_fcv";
+
+ // Define autocommit as a variable so it can be used in object literals w/o an explicit value.
+ const autocommit = false;
+
+ // Start a replica set with an odd number of members to verify nodes outside the majority behave
+ // correctly around setFeatureCompatibilityVersion, which uses majority writes to update the FCV
+ // document. The primary isn't expected to change, so each secondary is given priority 0.
+ const rst =
+ new ReplSetTest({nodes: [{}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0}}]});
+ rst.startSet();
+ rst.initiate();
+
+ let testDB = rst.getPrimary().getDB(dbName);
+ let adminDB = rst.getPrimary().getDB("admin");
+
+ assert.commandWorked(testDB.runCommand({create: collName, writeConcern: {w: "majority"}}));
+
+ // Starts a dummy transaction, commits or aborts it with or without prepare, then returns the
+ // commit or abort response. Returns the response from prepare if it fails.
+ function runTxn({lsid, txnNumber}, {commit, prepare, leaveOpen}) {
+ const startTransactionRes = testDB.runCommand({
+ insert: collName,
+ documents: [{x: "dummy_txn"}],
+ txnNumber: NumberLong(txnNumber),
+ startTransaction: true, lsid, autocommit,
+ });
+ if (!startTransactionRes.ok || leaveOpen) {
+ return startTransactionRes;
+ }
+
+ if (prepare) {
+ const prepareRes = testDB.adminCommand(
+ {prepareTransaction: 1, txnNumber: NumberLong(txnNumber), lsid, autocommit});
+ if (!prepareRes.ok) {
+ return prepareRes;
+ }
+
+ if (commit) {
+ // Add 1 to the increment so that the commitTimestamp is after the prepareTimestamp.
+ const commitTimestamp = Timestamp(prepareRes.prepareTimestamp.getTime(),
+ prepareRes.prepareTimestamp.getInc() + 1);
+ return testDB.adminCommand({
+ commitTransaction: 1,
+ commitTimestamp,
+ txnNumber: NumberLong(txnNumber), lsid, autocommit
+ });
+ } else {
+ return testDB.adminCommand(
+ {abortTransaction: 1, txnNumber: NumberLong(txnNumber), lsid, autocommit});
+ }
+ }
+
+ if (commit) {
+ return testDB.adminCommand(
+ {commitTransaction: 1, txnNumber: NumberLong(txnNumber), lsid, autocommit});
+ } else {
+ return testDB.adminCommand(
+ {abortTransaction: 1, txnNumber: NumberLong(txnNumber), lsid, autocommit});
+ }
+ }
+
+ // Retries commitTransaction for the given txnId, returning the response.
+ function retryCommit({lsid, txnNumber}) {
+ return testDB.adminCommand(
+ {commitTransaction: 1, txnNumber: NumberLong(txnNumber), lsid, autocommit});
+ }
+
+ // Asserts aborting the given txnId returns NoSuchTransaction.
+ function assertTransactionAborted({lsid, txnNumber}) {
+ assert.commandFailedWithCode(
+ testDB.adminCommand(
+ {abortTransaction: 1, txnNumber: NumberLong(txnNumber), lsid, autocommit}),
+ ErrorCodes.NoSuchTransaction);
+ }
+
+ // Global counter for the number of retryable writes completed. Used to verify retried retryable
+ // writes aren't double applied.
+ let numRetryableWrites = 0;
+
+ // Runs a dummy retryable write and increments the retryable write counter.
+ function assertRetryableWriteWorked({lsid, txnNumber}) {
+ numRetryableWrites += 1;
+ assert.commandWorked(testDB.runCommand({
+ insert: collName,
+ documents: [{fromRetryableWrite: true}],
+ txnNumber: NumberLong(txnNumber), lsid
+ }));
+ }
+
+ // Verifies a txnId has already been used for a retryable write by running a dummy retryable
+ // write and asserting the write isn't applied.
+ function assertRetryableWriteCanBeRetried({lsid, txnNumber}) {
+ assert.commandWorked(testDB.runCommand({
+ insert: collName,
+ documents: [{fromRetryableWrite: true}],
+ txnNumber: NumberLong(txnNumber), lsid
+ }));
+ assert.eq(numRetryableWrites, testDB[collName].find({fromRetryableWrite: true}).itcount());
+ }
+
+ // Searches config.transactions for an entry for the given txnId on each node in the replica
+ // set, verifying the entry does / does not exist and has the expected state, if specified.
+ function checkConfigTransactionEntry(rst, {lsid, txnNumber}, {hasEntry, expectedState}) {
+ rst.awaitReplication();
+ rst.nodes.forEach((node) => {
+ // Search for id since we don't know the uid, which is generated by the server.
+ const entry = node.getDB("config").transactions.findOne({"_id.id": lsid.id});
+
+ if (!hasEntry) {
+ // There should be no entry for this session or it should be for an earlier
+ // operation.
+ if (entry) {
+ assert.gt(txnNumber,
+ entry.txnNum,
+ "expected entry to have lower txnNumber, entry: " + tojson(entry) +
+ ", node: " + tojson(node));
+ } else {
+ assert.isnull(entry,
+ "expected entry to be null, entry: " + tojson(entry) +
+ ", node: " + tojson(node));
+ }
+ return;
+ }
+
+ assert.eq(txnNumber,
+ entry.txnNum,
+ "expected entry to have the same txnNumber, entry: " + tojson(entry) +
+ ", node: " + tojson(node));
+
+ if (expectedState) {
+ assert.eq(expectedState,
+ entry.state,
+ "entry: " + tojson(entry) + ", node: " + tojson(node));
+ } else {
+ assert(!entry.hasOwnProperty("state"),
+ "expected entry to not have state, entry: " + tojson(entry) + ", node: " +
+ tojson(node));
+ }
+ });
+ }
+
+ function runTest({shouldRestart}) {
+ // The test waits for failpoints to log a message when hit, so clear the program output
+ // before starting so messages from previous iterations aren't in it.
+ clearRawMongoProgramOutput();
+
+ const txnIds = {
+ write: {lsid: {id: UUID()}, txnNumber: 0}, // Retryable write.
+ commit: {lsid: {id: UUID()}, txnNumber: 0}, // Committed transaction w/o prepare.
+ commitPrepare: {lsid: {id: UUID()}, txnNumber: 0}, // Committed transaction w/ prepare.
+ abort: {lsid: {id: UUID()}, txnNumber: 0}, // Aborted transaction w/o prepare.
+ abortPrepare: {lsid: {id: UUID()}, txnNumber: 0}, // Aborted transaction after prepare.
+ concurrentTxn: {lsid: {id: UUID()}, txnNumber: 0}, // Transaction concurrent w/ setFCV.
+ concurrentWrite:
+ {lsid: {id: UUID()}, txnNumber: 0}, // Retryable write concurrent w/ setFCV.
+ upgradingTxn:
+ {lsid: {id: UUID()}, txnNumber: 0}, // Transaction started during FCV upgrade.
+ };
+
+ //
+ // In the latest FCV, verify the expected updates are made to config.transactions for each
+ // case and the successful operations are retryable.
+ //
+ checkFCV(adminDB, latestFCV);
+
+ assertRetryableWriteWorked(txnIds.write);
+ assert.commandWorked(runTxn(txnIds.commit, {commit: true, prepare: false}));
+ assert.commandWorked(runTxn(txnIds.commitPrepare, {commit: true, prepare: true}));
+ assert.commandWorked(runTxn(txnIds.abort, {commit: false, prepare: false}));
+ assert.commandWorked(runTxn(txnIds.abortPrepare, {commit: false, prepare: true}));
+
+ checkConfigTransactionEntry(rst, txnIds.write, {hasEntry: true});
+ checkConfigTransactionEntry(
+ rst, txnIds.commit, {hasEntry: true, expectedState: "committed"});
+ checkConfigTransactionEntry(
+ rst, txnIds.commitPrepare, {hasEntry: true, expectedState: "committed"});
+ checkConfigTransactionEntry(rst, txnIds.abort, {hasEntry: false});
+ checkConfigTransactionEntry(
+ rst, txnIds.abortPrepare, {hasEntry: true, expectedState: "aborted"});
+
+ // The retryable write and the commit of both committed transactions should be retryable.
+ // The aborted transactions should still be aborted.
+ assertRetryableWriteCanBeRetried(txnIds.write);
+ assert.commandWorked(retryCommit(txnIds.commit));
+ assert.commandWorked(retryCommit(txnIds.commitPrepare));
+ assertTransactionAborted(txnIds.abort);
+ assertTransactionAborted(txnIds.abortPrepare);
+
+ //
+ // Downgrade to the last-stable FCV and verify config.transactions was updated as expected
+ // for previously completed operations and operations concurrent with the downgrade.
+ //
+
+ if (shouldRestart) {
+ // Restart to verify config.transactions entries for sessions not in-memory at the
+ // beginning of FCV downgrade are updated correctly.
+ jsTestLog("Restarting replica set before downgrading the featureCompatibilityVersion.");
+ for (let i = 0; i < rst.nodes.length; i++) {
+ rst.restart(i);
+ }
+ testDB = rst.getPrimary().getDB(dbName);
+ adminDB = rst.getPrimary().getDB("admin");
+ }
+
+ // Make setFCV pause in the downgrading state after getting the list of sessions to
+ // potentially modify.
+ assert.commandWorked(rst.getPrimary().adminCommand(
+ {configureFailPoint: "pauseBeforeDowngradingSessions", mode: "alwaysOn"}));
+
+ // Downgrade FCV in a parallel shell and wait until it blocks at the failpoint above.
+ const awaitDowngradeFCV = startParallelShell(() => {
+ load("jstests/libs/feature_compatibility_version.js");
+ jsTestLog("Downgrade the featureCompatibilityVersion in a parallel shell.");
+ assert.commandWorked(db.adminCommand({setFeatureCompatibilityVersion: lastStableFCV}));
+ }, rst.getPrimary().port);
+ waitForFailpoint("Hit pauseBeforeDowngradingSessions failpoint", 1 /*numTimes*/);
+
+ // Concurrent transactions that use prepare will fail.
+ assert.commandFailedWithCode(runTxn(txnIds.concurrentTxn, {commit: true, prepare: true}),
+ ErrorCodes.CommandNotSupported);
+ txnIds.concurrentTxn.txnNumber += 1;
+
+ // Concurrent transactions that do not use prepare and retryable writes succeed.
+ assert.commandWorked(runTxn(txnIds.concurrentTxn, {commit: true, prepare: false}));
+ assertRetryableWriteWorked(txnIds.concurrentWrite);
+
+ // Unset the failpoint and wait for the downgrade to finish.
+ assert.commandWorked(rst.getPrimary().adminCommand(
+ {configureFailPoint: "pauseBeforeDowngradingSessions", mode: "off"}));
+
+ awaitDowngradeFCV();
+ checkFCV(adminDB, lastStableFCV);
+
+ // The successful concurrent operations should have entries without state and be retryable.
+ checkConfigTransactionEntry(rst, txnIds.concurrentTxn, {hasEntry: true});
+ assert.commandWorked(retryCommit(txnIds.concurrentTxn));
+ checkConfigTransactionEntry(rst, txnIds.concurrentWrite, {hasEntry: true});
+ assertRetryableWriteCanBeRetried(txnIds.concurrentWrite);
+
+ // Only the retryable write entry should remain.
+ checkConfigTransactionEntry(rst, txnIds.write, {hasEntry: true});
+ checkConfigTransactionEntry(rst, txnIds.commit, {hasEntry: false});
+ checkConfigTransactionEntry(rst, txnIds.commitPrepare, {hasEntry: false});
+ checkConfigTransactionEntry(rst, txnIds.abort, {hasEntry: false});
+ if (!shouldRestart) {
+ // TODO SERVER-35872: Transactions aborted after prepare don't transition to the correct
+ // state after refreshing from storage.
+ checkConfigTransactionEntry(rst, txnIds.abortPrepare, {hasEntry: false});
+ }
+
+ // The retryable write can be retried.
+ assertRetryableWriteCanBeRetried(txnIds.write);
+
+ // Neither of the commits can be retried.
+ assert.commandFailedWithCode(retryCommit(txnIds.commit), ErrorCodes.NoSuchTransaction);
+ assert.commandFailedWithCode(retryCommit(txnIds.commitPrepare),
+ ErrorCodes.NoSuchTransaction);
+
+ //
+ // In the last-stable FCV, verify the expected updates are made to config.transactions for
+ // each case and the successful operations are retryable.
+ //
+
+ // Reset each txnId to test upgrade with a clean slate.
+ Object.keys(txnIds).forEach((txnIdKey) => {
+ txnIds[txnIdKey].lsid = {id: UUID()};
+ txnIds[txnIdKey].txnNumber = 0;
+ });
+
+ // Prepare can't be used in FCV 4.0, so only commit, abort, and retryable write should
+ // succeed.
+ assertRetryableWriteWorked(txnIds.write);
+ assert.commandWorked(runTxn(txnIds.commit, {commit: true, prepare: false}));
+ assert.commandFailedWithCode(runTxn(txnIds.commitPrepare, {commit: true, prepare: true}),
+ ErrorCodes.CommandNotSupported);
+ assert.commandWorked(runTxn(txnIds.abort, {commit: false, prepare: false}));
+ assert.commandFailedWithCode(runTxn(txnIds.abortPrepare, {commit: false, prepare: true}),
+ ErrorCodes.CommandNotSupported);
+
+ // Only the retryable write and transaction that committed without prepare should have an
+ // entry. Neither should have state.
+ checkConfigTransactionEntry(rst, txnIds.write, {hasEntry: true});
+ checkConfigTransactionEntry(rst, txnIds.commit, {hasEntry: true});
+ checkConfigTransactionEntry(rst, txnIds.commitPrepare, {hasEntry: false});
+ checkConfigTransactionEntry(rst, txnIds.abort, {hasEntry: false});
+ checkConfigTransactionEntry(rst, txnIds.abortPrepare, {hasEntry: false});
+
+ // The retryable write and successful commit can be retried.
+ assertRetryableWriteCanBeRetried(txnIds.write);
+ assert.commandWorked(retryCommit(txnIds.commit));
+
+ if (shouldRestart) {
+ // Restart to verify config.transactions entries for sessions not in-memory at the
+ // beginning of FCV upgrade are updated correctly.
+ jsTestLog("Restarting replica set before upgrading the featureCompatibilityVersion.");
+ for (let i = 0; i < rst.nodes.length; i++) {
+ rst.restart(i);
+ }
+ testDB = rst.getPrimary().getDB(dbName);
+ adminDB = rst.getPrimary().getDB("admin");
+ }
+
+ //
+ // Upgrade to the latest FCV and verify config.transactions was updated as expected for
+ // previously completed operations and operations concurrent with the upgrade.
+ //
+
+ // Run a retryable write on the session that will be used during upgrade so it has a
+ // transaction table entry and will be checked out by the upgrade.
+ assertRetryableWriteWorked(txnIds.upgradingTxn);
+ txnIds.upgradingTxn.txnNumber += 1;
+
+ // Make setFCV pause in the upgrading state after getting the list of sessions to
+ // potentially modify.
+ assert.commandWorked(rst.getPrimary().adminCommand(
+ {configureFailPoint: "pauseBeforeUpgradingSessions", mode: "alwaysOn"}));
+
+ // Upgrade FCV in a parallel shell and wait until it blocks at the failpoint above.
+ const awaitUpgradeFCV = startParallelShell(() => {
+ load("jstests/libs/feature_compatibility_version.js");
+ jsTestLog("Upgrade the featureCompatibilityVersion in a parallel shell.");
+ assert.commandWorked(db.adminCommand({setFeatureCompatibilityVersion: latestFCV}));
+ }, rst.getPrimary().port);
+ waitForFailpoint("Hit pauseBeforeUpgradingSessions failpoint", 1 /*numTimes*/);
+
+ // Concurrent transactions that use prepare will fail.
+ assert.commandFailedWithCode(runTxn(txnIds.concurrentTxn, {commit: true, prepare: true}),
+ ErrorCodes.CommandNotSupported);
+ txnIds.concurrentTxn.txnNumber += 1;
+
+ // Concurrent transactions that do not use prepare and retryable writes succeed.
+ assert.commandWorked(runTxn(txnIds.concurrentTxn, {commit: true, prepare: false}));
+ assertRetryableWriteWorked(txnIds.concurrentWrite);
+
+ // Start a transaction in the upgrading state and verify that it doesn't get aborted by the
+ // rest of the upgrade. Note that all sessions are killed and their transactions aborted for
+ // writes to the FCV document except when it is set to the fully upgraded state, so this
+ // can't be tested for downgrade.
+ assert.commandWorked(runTxn(txnIds.upgradingTxn, {leaveOpen: true}));
+
+ // Unset the failpoint and wait for the upgrade to finish.
+ assert.commandWorked(rst.getPrimary().adminCommand(
+ {configureFailPoint: "pauseBeforeUpgradingSessions", mode: "off"}));
+
+ awaitUpgradeFCV();
+ checkFCV(adminDB, latestFCV);
+
+ // The transaction started while upgrading shouldn't have been killed and can be committed.
+ assert.commandWorked(testDB.adminCommand({
+ commitTransaction: 1,
+ lsid: txnIds.upgradingTxn.lsid,
+ txnNumber: NumberLong(txnIds.upgradingTxn.txnNumber), autocommit
+ }));
+
+ // The successful concurrent transaction should have "committed" state and be retryable, and
+ // the concurrent retryable write should not have state and also be retryable.
+ checkConfigTransactionEntry(
+ rst, txnIds.concurrentTxn, {hasEntry: true, expectedState: "committed"});
+ assert.commandWorked(retryCommit(txnIds.concurrentTxn));
+ checkConfigTransactionEntry(rst, txnIds.concurrentWrite, {hasEntry: true});
+ assertRetryableWriteCanBeRetried(txnIds.concurrentWrite);
+
+ // There should still only be entries for the committed transaction and retryable write. The
+ // committed transaction should now have a "state" field.
+ checkConfigTransactionEntry(rst, txnIds.write, {hasEntry: true});
+ checkConfigTransactionEntry(
+ rst, txnIds.commit, {hasEntry: true, expectedState: "committed"});
+ checkConfigTransactionEntry(rst, txnIds.commitPrepare, {hasEntry: false});
+ checkConfigTransactionEntry(rst, txnIds.abort, {hasEntry: false});
+ checkConfigTransactionEntry(rst, txnIds.abortPrepare, {hasEntry: false});
+
+ // The retryable write and successful commit can be retried.
+ assertRetryableWriteCanBeRetried(txnIds.write);
+ assert.commandWorked(retryCommit(txnIds.commit));
+ }
+
+ runTest({shouldRestart: false});
+ runTest({shouldRestart: true});
+
+ //
+ // Verify setFCV is interruptible between modifying sessions.
+ //
+ clearRawMongoProgramOutput();
+ checkFCV(adminDB, latestFCV);
+
+ // Construct a config.transactions entry that would be modified by downgrade.
+ const txnIds = {interrupt: {lsid: {id: UUID()}, txnNumber: 0}};
+ assert.commandWorked(runTxn(txnIds.interrupt, {commit: true, prepare: true}));
+ checkConfigTransactionEntry(
+ rst, txnIds.interrupt, {hasEntry: true, expectedState: "committed"});
+
+ // Pause setFCV before it would modify the entry.
+ assert.commandWorked(rst.getPrimary().adminCommand(
+ {configureFailPoint: "pauseBeforeDowngradingSessions", mode: "alwaysOn"}));
+
+ TestData.setFCVLsid = {id: UUID()};
+ const awaitUpgradeFCV = startParallelShell(() => {
+ load("jstests/libs/feature_compatibility_version.js");
+ assert.commandFailedWithCode(
+ db.adminCommand(
+ {setFeatureCompatibilityVersion: lastStableFCV, lsid: TestData.setFCVLsid}),
+ ErrorCodes.Interrupted);
+ }, rst.getPrimary().port);
+ waitForFailpoint("Hit pauseBeforeDowngradingSessions failpoint", 1 /*numTimes*/);
+
+ // Kill the session running setFCV.
+ assert.commandWorked(rst.getPrimary().adminCommand({killSessions: [TestData.setFCVLsid]}));
+
+ // Unpause the failpoint and verify setFCV returns without modifying config.transactions.
+ assert.commandWorked(rst.getPrimary().adminCommand(
+ {configureFailPoint: "pauseBeforeDowngradingSessions", mode: "off"}));
+
+ awaitUpgradeFCV();
+ checkConfigTransactionEntry(
+ rst, txnIds.interrupt, {hasEntry: true, expectedState: "committed"});
+
+ rst.stopSet();
+}());
diff --git a/jstests/multiVersion/libs/sharded_txn_upgrade_downgrade_cluster_shared.js b/jstests/multiVersion/libs/sharded_txn_upgrade_downgrade_cluster_shared.js
new file mode 100644
index 00000000000..4180b52fa01
--- /dev/null
+++ b/jstests/multiVersion/libs/sharded_txn_upgrade_downgrade_cluster_shared.js
@@ -0,0 +1,145 @@
+/**
+ * Functions and variables shared between multiversion/sharded_txn_upgrade_cluster.js and
+ * multiversion/sharded_txn_downgrade_cluster.js.
+ */
+
+// Define autocommit as a variable so it can be used in object literals w/o an explicit value.
+const autocommit = false;
+
+// Sets up a cluster at the given binary version with two shards and a collection sharded by "skey"
+// with one chunk on each shard.
+function setUpTwoShardClusterWithBinVersion(dbName, collName, binVersion) {
+ const st = new ShardingTest({
+ shards: 2,
+ other: {
+ mongosOptions: {binVersion},
+ configOptions: {binVersion},
+ rsOptions: {binVersion},
+ },
+ rs: {nodes: 3} // Use 3 node replica sets to allow binary changes with no downtime.
+ });
+ checkFCV(st.configRS.getPrimary().getDB("admin"),
+ binVersion === "latest" ? latestFCV : lastStableFCV);
+
+ // Set up a sharded collection with two chunks, one on each shard.
+ const ns = dbName + "." + collName;
+ assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+ st.ensurePrimaryShard(dbName, st.shard0.shardName);
+ assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {skey: 1}}));
+ assert.commandWorked(st.s.adminCommand({split: ns, middle: {skey: 0}}));
+ assert.commandWorked(
+ st.s.adminCommand({moveChunk: ns, find: {skey: 1}, to: st.shard1.shardName}));
+
+ return st;
+}
+
+// Runs a transaction against the given database using the given txnId by running two inserts.
+// Depending on the multiShard parameter, will insert to one or two shards. Assumes testDB is a
+// sharded collection sharded by skey, with chunks: [minKey, 0), [0, maxKey).
+function runTxn(testDB, collName, {lsid, txnNumber}, {multiShard}) {
+ const docs = multiShard ? [{skey: -1}, {skey: 1}] : [{skey: 1}];
+ const startTransactionRes = testDB.runCommand({
+ insert: collName,
+ documents: docs,
+ txnNumber: NumberLong(txnNumber),
+ startTransaction: true, lsid, autocommit,
+ });
+ if (!startTransactionRes.ok) {
+ return startTransactionRes;
+ }
+
+ const secondStatementRes = testDB.runCommand({
+ insert: collName,
+ documents: docs,
+ txnNumber: NumberLong(txnNumber), lsid, autocommit,
+ });
+ if (!secondStatementRes.ok) {
+ return secondStatementRes;
+ }
+
+ return testDB.adminCommand(
+ {commitTransaction: 1, lsid, txnNumber: NumberLong(txnNumber), autocommit});
+}
+
+// Retries commitTransaction for the given txnId, returning the response.
+function retryCommit(testDB, {lsid, txnNumber}) {
+ return testDB.adminCommand(
+ {commitTransaction: 1, lsid, txnNumber: NumberLong(txnNumber), autocommit});
+}
+
+// Global counter for the number of multi shard retryable writes completed. Used to verify retried
+// retryable writes aren't double applied.
+let numMultiShardRetryableWrites = 0;
+
+// Runs a dummy retryable write against two shards and increments the retryable write counter.
+// Assumes testDB is a sharded collection sharded by skey, with chunks: [minKey, 0), [0, maxKey).
+function assertMultiShardRetryableWriteWorked(testDB, collName, {lsid, txnNumber}) {
+ numMultiShardRetryableWrites += 1;
+ assert.commandWorked(testDB.runCommand({
+ insert: collName,
+ documents: [{skey: -1, fromRetryableWrite: true}, {skey: 1, fromRetryableWrite: true}],
+ txnNumber: NumberLong(txnNumber), lsid
+ }));
+}
+
+// Verifies a txnId has already been used for a retryable write by running a dummy retryable write
+// and asserting the write isn't applied. Assumes testDB is a sharded collection sharded by skey,
+// with chunks: [minKey, 0), [0, maxKey).
+function assertMultiShardRetryableWriteCanBeRetried(testDB, collName, {lsid, txnNumber}) {
+ assert.commandWorked(testDB.runCommand({
+ insert: collName,
+ documents: [{skey: -1, fromRetryableWrite: true}, {skey: 1, fromRetryableWrite: true}],
+ txnNumber: NumberLong(txnNumber), lsid
+ }));
+ assert.eq(numMultiShardRetryableWrites * 2, // Each write inserts 2 documents.
+ testDB[collName].find({fromRetryableWrite: true}).itcount());
+}
+
+// Recreates unique indexes in the FCV 4.0 format to allow for a binary downgrade form 4.2 to 4.0.
+//
+// Taken from:
+// https://docs.mongodb.com/master/release-notes/4.2-downgrade-sharded-cluster/#remove-backwards-incompatible-persisted-features
+function downgradeUniqueIndexesScript(db) {
+ var unique_idx_v1 = [];
+ var unique_idx_v2 = [];
+ db.adminCommand("listDatabases").databases.forEach(function(d) {
+ let mdb = db.getSiblingDB(d.name);
+ mdb.getCollectionInfos().forEach(function(c) {
+ let currentCollection = mdb.getCollection(c.name);
+ currentCollection.getIndexes().forEach(function(i) {
+ if (i.unique) {
+ if (i.v === 1) {
+ unique_idx_v1.push(i);
+ } else {
+ unique_idx_v2.push(i);
+ }
+ return;
+ }
+ });
+ });
+ });
+
+ // Drop and recreate all v:1 indexes
+ for (let idx of unique_idx_v1) {
+ let [dbName, collName] = idx.ns.split(".");
+ let res = db.getSiblingDB(dbName).runCommand({dropIndexes: collName, index: idx.name});
+ assert.commandWorked(res);
+ res = db.getSiblingDB(dbName).runCommand({
+ createIndexes: collName,
+ indexes: [{"key": idx.key, "name": idx.name, "unique": true, "v": 1}]
+ });
+ assert.commandWorked(res);
+ }
+
+ // Drop and recreate all v:2 indexes
+ for (let idx of unique_idx_v2) {
+ let [dbName, collName] = idx.ns.split(".");
+ let res = db.getSiblingDB(dbName).runCommand({dropIndexes: collName, index: idx.name});
+ assert.commandWorked(res);
+ res = db.getSiblingDB(dbName).runCommand({
+ createIndexes: collName,
+ indexes: [{"key": idx.key, "name": idx.name, "unique": true, "v": 2}]
+ });
+ assert.commandWorked(res);
+ }
+}
diff --git a/jstests/multiVersion/sharded_txn_downgrade_cluster.js b/jstests/multiVersion/sharded_txn_downgrade_cluster.js
new file mode 100644
index 00000000000..394cb89903f
--- /dev/null
+++ b/jstests/multiVersion/sharded_txn_downgrade_cluster.js
@@ -0,0 +1,96 @@
+/**
+ * Tests downgrading a cluster from the current to last stable version succeeds, verifying the
+ * behavior of transactions and retryable writes throughout the process.
+ *
+ * @tags: [uses_transactions, uses_multi_shard_transaction]
+ */
+
+// Checking UUID consistency uses cached connections, which are not valid across restarts or
+// stepdowns.
+TestData.skipCheckingUUIDsConsistentAcrossCluster = true;
+
+(function() {
+ "use strict";
+
+ load("jstests/libs/feature_compatibility_version.js");
+ load("jstests/multiVersion/libs/multi_rs.js");
+ load("jstests/multiVersion/libs/multi_cluster.js");
+ load("jstests/multiVersion/libs/sharded_txn_upgrade_downgrade_cluster_shared.js");
+
+ const dbName = "test";
+ const collName = "sharded_txn_downgrade_cluster";
+
+ // Start a cluster with two shards at the latest version.
+ const st = setUpTwoShardClusterWithBinVersion(dbName, collName, "latest");
+
+ const txnIds = {
+ commit: {lsid: {id: UUID()}, txnNumber: 0},
+ commitMulti: {lsid: {id: UUID()}, txnNumber: 0},
+ write: {lsid: {id: UUID()}, txnNumber: 0},
+ };
+
+ let testDB = st.s.getDB(dbName);
+
+ // Retryable writes and transactions with and without prepare should work.
+ assert.commandWorked(runTxn(testDB, collName, txnIds.commit, {multiShard: false}));
+ assert.commandWorked(runTxn(testDB, collName, txnIds.commitMulti, {multiShard: true}));
+ assertMultiShardRetryableWriteWorked(testDB, collName, txnIds.write);
+
+ // commitTransaction for both transactions and the retryable write should be retryable.
+ assert.commandWorked(retryCommit(testDB, txnIds.commit));
+ assert.commandWorked(retryCommit(testDB, txnIds.commitMulti));
+ assertMultiShardRetryableWriteCanBeRetried(testDB, collName, txnIds.write);
+
+ // Downgrade featureCompatibilityVersion.
+ assert.commandWorked(st.s.adminCommand({setFeatureCompatibilityVersion: lastStableFCV}));
+ checkFCV(st.configRS.getPrimary().getDB("admin"), lastStableFCV);
+
+ // Only the retryable write can be retried. Can't retry the multi shard transaction because it
+ // uses coordinateCommit, which is not allowed in FCV 4.0.
+ assertMultiShardRetryableWriteCanBeRetried(testDB, collName, txnIds.write);
+ assert.commandFailedWithCode(retryCommit(testDB, txnIds.commit), ErrorCodes.NoSuchTransaction);
+ assert.commandFailedWithCode(retryCommit(testDB, txnIds.commitMulti),
+ ErrorCodes.CommandNotSupported);
+
+ downgradeUniqueIndexesScript(st.s.getDB("test"));
+
+ // Downgrade the mongos servers first.
+ jsTestLog("Downgrading mongos servers.");
+ st.upgradeCluster("last-stable",
+ {upgradeConfigs: false, upgradeMongos: true, upgradeShards: false});
+
+ // Then downgrade the shard servers.
+ jsTestLog("Downgrading shard servers.");
+ st.upgradeCluster("last-stable",
+ {upgradeConfigs: false, upgradeMongos: false, upgradeShards: true});
+
+ // Then downgrade the config servers.
+ jsTestLog("Downgrading config servers.");
+ st.upgradeCluster("last-stable",
+ {upgradeConfigs: true, upgradeMongos: false, upgradeShards: false});
+ checkFCV(st.configRS.getPrimary().getDB("admin"), lastStableFCV);
+
+ testDB = st.s.getDB(dbName);
+
+ // Can still retry the retryable write.
+ assertMultiShardRetryableWriteCanBeRetried(testDB, collName, txnIds.write);
+
+ // The txnIds used for the earlier commits should be re-usable because their history was
+ // removed.
+ assertMultiShardRetryableWriteWorked(testDB, collName, txnIds.commit);
+ assertMultiShardRetryableWriteCanBeRetried(testDB, collName, txnIds.commit);
+
+ assertMultiShardRetryableWriteWorked(testDB, collName, txnIds.commitMulti);
+ assertMultiShardRetryableWriteCanBeRetried(testDB, collName, txnIds.commitMulti);
+
+ // Can perform a new operation on each session.
+ Object.keys(txnIds).forEach((txnIdKey) => {
+ txnIds[txnIdKey].txnNumber += 1;
+ });
+
+ assertMultiShardRetryableWriteWorked(testDB, collName, txnIds.commit);
+ assertMultiShardRetryableWriteWorked(testDB, collName, txnIds.commitMulti);
+ assertMultiShardRetryableWriteWorked(testDB, collName, txnIds.write);
+
+ st.stop();
+})();
diff --git a/jstests/multiVersion/sharded_txn_upgrade_cluster.js b/jstests/multiVersion/sharded_txn_upgrade_cluster.js
new file mode 100644
index 00000000000..04c3bddfde7
--- /dev/null
+++ b/jstests/multiVersion/sharded_txn_upgrade_cluster.js
@@ -0,0 +1,82 @@
+/**
+ * Tests upgrading a cluster from last stable to current version, verifying the behavior of
+ * transactions and retryable writes throughout the process.
+ *
+ * @tags: [uses_transactions, uses_multi_shard_transaction]
+ */
+
+// Checking UUID consistency uses cached connections, which are not valid across restarts or
+// stepdowns.
+TestData.skipCheckingUUIDsConsistentAcrossCluster = true;
+
+(function() {
+ "use strict";
+
+ load("jstests/libs/feature_compatibility_version.js");
+ load("jstests/multiVersion/libs/multi_rs.js");
+ load("jstests/multiVersion/libs/multi_cluster.js");
+ load("jstests/multiVersion/libs/sharded_txn_upgrade_downgrade_cluster_shared.js");
+
+ const dbName = "test";
+ const collName = "sharded_txn_upgrade_cluster";
+
+ // Start a cluster with two shards at the last stable version.
+ const st = setUpTwoShardClusterWithBinVersion(dbName, collName, "last-stable");
+
+ const txnIds = {
+ commit: {lsid: {id: UUID()}, txnNumber: 0},
+ commitMulti: {lsid: {id: UUID()}, txnNumber: 0},
+ write: {lsid: {id: UUID()}, txnNumber: 0},
+ };
+
+ let testDB = st.s.getDB(dbName);
+
+ // Only retryable writes work and they are retryable.
+ assertMultiShardRetryableWriteWorked(testDB, collName, txnIds.write);
+ assertMultiShardRetryableWriteCanBeRetried(testDB, collName, txnIds.write);
+
+ // Upgrade the config servers.
+ jsTestLog("Upgrading config servers.");
+ st.upgradeCluster("latest", {upgradeConfigs: true, upgradeMongos: false, upgradeShards: false});
+
+ // Then upgrade the shard servers.
+ jsTestLog("Upgrading shard servers.");
+ st.upgradeCluster("latest", {upgradeConfigs: false, upgradeMongos: false, upgradeShards: true});
+
+ // Then upgrade mongos servers.
+ jsTestLog("Upgrading mongos servers.");
+ st.upgradeCluster("latest", {upgradeConfigs: false, upgradeMongos: true, upgradeShards: false});
+ checkFCV(st.configRS.getPrimary().getDB("admin"), lastStableFCV);
+
+ testDB = st.s.getDB(dbName);
+
+ // Can still retry the retryable write.
+ assertMultiShardRetryableWriteCanBeRetried(testDB, collName, txnIds.write);
+
+ // Transactions that don't use prepare are allowed in FCV 4.0 with a 4.2 binary mongos.
+ assert.commandWorked(runTxn(testDB, collName, txnIds.commit, {multiShard: false}));
+
+ // Multi shard transactions will fail because coordinateCommit is not allowed in FCV 4.0.
+ assert.commandFailedWithCode(runTxn(testDB, collName, txnIds.commitMulti, {multiShard: true}),
+ ErrorCodes.CommandNotSupported);
+
+ // Upgrade the cluster's feature compatibility version to the latest.
+ assert.commandWorked(
+ st.s.getDB("admin").runCommand({setFeatureCompatibilityVersion: latestFCV}));
+ checkFCV(st.configRS.getPrimary().getDB("admin"), latestFCV);
+
+ // Can still retry the retryable write and the committed transaction.
+ assertMultiShardRetryableWriteCanBeRetried(testDB, collName, txnIds.write);
+ assert.commandWorked(retryCommit(testDB, txnIds.commit));
+
+ // Can perform a new operation on each session.
+ Object.keys(txnIds).forEach((txnIdKey) => {
+ txnIds[txnIdKey].txnNumber += 1;
+ });
+
+ assert.commandWorked(runTxn(testDB, collName, txnIds.commit, {multiShard: false}));
+ assert.commandWorked(runTxn(testDB, collName, txnIds.commitMulti, {multiShard: true}));
+ assertMultiShardRetryableWriteWorked(testDB, collName, txnIds.write);
+
+ st.stop();
+})();
diff --git a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp
index 08b13786b9e..50196ec7aed 100644
--- a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp
+++ b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp
@@ -27,12 +27,15 @@
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
+
#include "mongo/platform/basic.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog/coll_mod.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_holder.h"
+#include "mongo/db/client.h"
#include "mongo/db/commands.h"
#include "mongo/db/commands/feature_compatibility_version.h"
#include "mongo/db/commands/feature_compatibility_version_command_parser.h"
@@ -40,15 +43,22 @@
#include "mongo/db/commands/feature_compatibility_version_parser.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/logical_session_id.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/ops/write_ops.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/s/config/sharding_catalog_manager.h"
#include "mongo/db/server_options.h"
+#include "mongo/db/session_catalog_mongod.h"
+#include "mongo/db/transaction_participant.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/database_version_helpers.h"
#include "mongo/s/grid.h"
#include "mongo/util/exit.h"
#include "mongo/util/fail_point_service.h"
+#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
@@ -57,6 +67,156 @@ namespace {
MONGO_FAIL_POINT_DEFINE(featureCompatibilityDowngrade);
MONGO_FAIL_POINT_DEFINE(featureCompatibilityUpgrade);
+MONGO_FAIL_POINT_DEFINE(pauseBeforeUpgradingSessions);
+MONGO_FAIL_POINT_DEFINE(pauseBeforeDowngradingSessions);
+
+/**
+ * Runs the given query against config.transactions and collects the logical session id of every
+ * matching document into a set, which is handed back to the caller.
+ */
+LogicalSessionIdSet getMatchingSessionIdsFromTransactionTable(OperationContext* opCtx,
+                                                              Query query) {
+    DBDirectClient directClient(opCtx);
+    LogicalSessionIdSet matchingIds;
+
+    // Every document is parsed as a SessionTxnRecord; the context string labels parse errors.
+    const IDLParserErrorContext parseCtx("setFCV-find-matching-sessions");
+    for (auto txnCursor =
+             directClient.query(NamespaceString::kSessionTransactionsTableNamespace, query);
+         txnCursor->more();) {
+        const auto record = SessionTxnRecord::parse(parseCtx, txnCursor->next());
+        matchingIds.insert(record.getSessionId());
+    }
+
+    return matchingIds;
+}
+
+/**
+ * Checks out each given session with a new operation context, verifies the session's transaction
+ * participant passes the validation function, runs the modification function with a direct
+ * client from another new operation context while the session is checked out, then invalidates the
+ * session.
+ *
+ * opCtx: the caller's operation context. Within this function it is only used to reach the
+ *     service context and to check for interruption before each session is processed.
+ * sessionIds: the logical session ids to visit, one checkout per id.
+ * verifyTransactionParticipantFn: invoked with the operation context that has the session checked
+ *     out. Returning false skips the session: it is neither modified nor invalidated.
+ * performModificationFn: invoked with a direct client built on a separate operation context,
+ *     together with the id of the session that is currently checked out.
+ *
+ * There is no error handling in this function, so anything thrown by either callback or by the
+ * interrupt check on the caller's operation context propagates to the caller and stops the
+ * iteration.
+ */
+void forEachSessionWithCheckout(
+ OperationContext* opCtx,
+ LogicalSessionIdSet sessionIds,
+ stdx::function<bool(OperationContext* opCtx)> verifyTransactionParticipantFn,
+ stdx::function<void(DBDirectClient* client, LogicalSessionId sessionId)>
+ performModificationFn) {
+ // Make a separate client; each session is checked out on an operation context created from it.
+ auto clientForCheckout =
+ opCtx->getServiceContext()->makeClient("setFCV-transaction-table-checkout");
+ AlternativeClientRegion acrForCheckout(clientForCheckout);
+ for (const auto& sessionId : sessionIds) {
+ // Check for interrupt on the parent opCtx because killing it won't be propagated to the
+ // opCtx checking out the session and performing the modification.
+ opCtx->checkForInterrupt();
+
+ const auto opCtxForCheckout = cc().makeOperationContext();
+ opCtxForCheckout->setLogicalSessionId(sessionId);
+ MongoDOperationContextSession ocs(opCtxForCheckout.get());
+
+ // Now that the session is checked out, verify it still needs to be modified using its
+ // transaction participant.
+ if (!verifyTransactionParticipantFn(opCtxForCheckout.get())) {
+ continue;
+ }
+
+ {
+ // Perform the modification on another operation context to bypass retryable writes and
+ // transactions machinery.
+ auto clientForModification =
+ opCtx->getServiceContext()->makeClient("setFCV-transaction-table-modification");
+ AlternativeClientRegion acrForModification(clientForModification);
+
+ const auto opCtxForModification = cc().makeOperationContext();
+ DBDirectClient directClient(opCtxForModification.get());
+ performModificationFn(&directClient, sessionId);
+ }
+
+ // Note that invalidating the session here is unnecessary if the modification function
+ // writes directly to config.transactions, which already invalidates the affected session.
+ auto txnParticipant = TransactionParticipant::get(opCtxForCheckout.get());
+ txnParticipant.invalidate(opCtxForCheckout.get());
+ }
+}
+
+/**
+ * Deletes the config.transactions documents that carry a "state" field and belong to a session
+ * whose transaction has committed or aborted, because such documents may point to oplog entries in
+ * a format a 4.0 mongod cannot process.
+ */
+void downgradeTransactionTable(OperationContext* opCtx) {
+    // The "state" field belongs to the 4.2 document format, so its presence is what identifies
+    // the entries to consider for removal.
+    Query hasStateQuery(BSON("state" << BSON("$exists" << true)));
+    LogicalSessionIdSet sessionIdsWithState =
+        getMatchingSessionIdsFromTransactionTable(opCtx, hasStateQuery);
+
+    if (MONGO_FAIL_POINT(pauseBeforeDowngradingSessions)) {
+        LOG(0) << "Hit pauseBeforeDowngradingSessions failpoint";
+        MONGO_FAIL_POINT_PAUSE_WHILE_SET(pauseBeforeDowngradingSessions);
+    }
+
+    // A session qualifies only if its transaction has committed or aborted. Note that
+    // transactions that abort before prepare have no entry.
+    const auto hasFinishedTransaction = [](OperationContext* checkoutOpCtx) {
+        auto participant = TransactionParticipant::get(checkoutOpCtx);
+        if (participant.transactionIsCommitted()) {
+            return true;
+        }
+        return participant.transactionIsAborted();
+    };
+
+    // Issues a single-document delete for the session's entry, keyed by its _id.
+    const auto removeSessionEntry = [](DBDirectClient* directClient, LogicalSessionId sessionId) {
+        write_ops::DeleteOpEntry deleteEntry;
+        deleteEntry.setQ(BSON("_id" << sessionId.toBSON()));
+        deleteEntry.setMulti(false);
+
+        write_ops::Delete deleteOp(NamespaceString::kSessionTransactionsTableNamespace);
+        deleteOp.setDeletes({deleteEntry});
+
+        const auto commandResponse = directClient->runCommand(deleteOp.serialize({}));
+        uassertStatusOK(getStatusFromWriteCommandReply(commandResponse->getCommandReply()));
+    };
+
+    forEachSessionWithCheckout(
+        opCtx, sessionIdsWithState, hasFinishedTransaction, removeSessionEntry);
+}
+
+/**
+ * Rewrites config.transactions into the 4.2 format by setting "state" to "committed" on the
+ * document of every session whose transaction participant reports a committed transaction.
+ */
+void upgradeTransactionTable(OperationContext* opCtx) {
+    // Retryable writes and committed transactions share one document format in FCV 4.0, so the
+    // query cannot tell them apart: fetch every session id and decide per session afterwards.
+    LogicalSessionIdSet allSessionIds = getMatchingSessionIdsFromTransactionTable(opCtx, Query());
+
+    if (MONGO_FAIL_POINT(pauseBeforeUpgradingSessions)) {
+        LOG(0) << "Hit pauseBeforeUpgradingSessions failpoint";
+        MONGO_FAIL_POINT_PAUSE_WHILE_SET(pauseBeforeUpgradingSessions);
+    }
+
+    // Only sessions that most recently committed a transaction receive the new field.
+    const auto hasCommittedTransaction = [](OperationContext* checkoutOpCtx) {
+        auto participant = TransactionParticipant::get(checkoutOpCtx);
+        return participant.transactionIsCommitted();
+    };
+
+    // Issues a single-document update that adds state=committed to the session's entry.
+    const auto markSessionCommitted = [](DBDirectClient* directClient,
+                                         LogicalSessionId sessionId) {
+        write_ops::UpdateOpEntry updateEntry;
+        updateEntry.setQ(BSON("_id" << sessionId.toBSON()));
+        updateEntry.setU(BSON("$set" << BSON("state"
+                                             << "committed")));
+        updateEntry.setMulti(false);
+
+        write_ops::Update updateOp(NamespaceString::kSessionTransactionsTableNamespace);
+        updateOp.setUpdates({updateEntry});
+
+        const auto commandResponse = directClient->runCommand(updateOp.serialize({}));
+        uassertStatusOK(getStatusFromWriteCommandReply(commandResponse->getCommandReply()));
+    };
+
+    forEachSessionWithCheckout(
+        opCtx, allSessionIds, hasCommittedTransaction, markSessionCommitted);
+}
+
/**
* Sets the minimum allowed version for the cluster. If it is 4.0, then the node should not use 4.2
* features.
@@ -171,6 +331,8 @@ public:
updateUniqueIndexesOnUpgrade(opCtx);
+ upgradeTransactionTable(opCtx);
+
// Upgrade shards before config finishes its upgrade.
if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
uassertStatusOK(
@@ -213,6 +375,8 @@ public:
Lock::GlobalLock lk(opCtx, MODE_S);
}
+ downgradeTransactionTable(opCtx);
+
// Downgrade shards before config finishes its downgrade.
if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
uassertStatusOK(
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 829905e322e..17db14347b9 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -1018,7 +1018,16 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx,
auto times = replLogApplyOps(
opCtx, cmdNss, applyOpCmd, sessionInfo, stmtId, oplogLink, prepare, prepareOplogSlot);
- auto txnState = prepare ? DurableTxnStateEnum::kPrepared : DurableTxnStateEnum::kCommitted;
+ auto txnState = [prepare]() -> boost::optional<DurableTxnStateEnum> {
+ if (serverGlobalParams.featureCompatibility.getVersion() <
+ ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42) {
+ invariant(!prepare);
+ return boost::none;
+ }
+
+ return prepare ? DurableTxnStateEnum::kPrepared : DurableTxnStateEnum::kCommitted;
+ }();
+
onWriteOpCompleted(
opCtx, cmdNss, {stmtId}, times.writeOpTime, times.wallClockTime, txnState);
return times;
diff --git a/src/mongo/db/repl/session_update_tracker.cpp b/src/mongo/db/repl/session_update_tracker.cpp
index 32cc3466879..958e77f1497 100644
--- a/src/mongo/db/repl/session_update_tracker.cpp
+++ b/src/mongo/db/repl/session_update_tracker.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/server_options.h"
#include "mongo/db/session.h"
#include "mongo/db/session_txn_record_gen.h"
#include "mongo/util/assert_util.h"
@@ -68,20 +69,25 @@ boost::optional<repl::OplogEntry> createMatchingTransactionTableUpdate(
newTxnRecord.setLastWriteOpTime(entry.getOpTime());
newTxnRecord.setLastWriteDate(*entry.getWallClockTime());
- switch (entry.getCommandType()) {
- case repl::OplogEntry::CommandType::kApplyOps:
- newTxnRecord.setState(entry.shouldPrepare() ? DurableTxnStateEnum::kPrepared
- : DurableTxnStateEnum::kCommitted);
- break;
- case repl::OplogEntry::CommandType::kCommitTransaction:
- newTxnRecord.setState(DurableTxnStateEnum::kCommitted);
- break;
- case repl::OplogEntry::CommandType::kAbortTransaction:
- newTxnRecord.setState(DurableTxnStateEnum::kAborted);
- break;
- default:
- break;
+ // "state" is a new field in 4.2.
+ if (serverGlobalParams.featureCompatibility.getVersion() >=
+ ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42) {
+ switch (entry.getCommandType()) {
+ case repl::OplogEntry::CommandType::kApplyOps:
+ newTxnRecord.setState(entry.shouldPrepare() ? DurableTxnStateEnum::kPrepared
+ : DurableTxnStateEnum::kCommitted);
+ break;
+ case repl::OplogEntry::CommandType::kCommitTransaction:
+ newTxnRecord.setState(DurableTxnStateEnum::kCommitted);
+ break;
+ case repl::OplogEntry::CommandType::kAbortTransaction:
+ newTxnRecord.setState(DurableTxnStateEnum::kAborted);
+ break;
+ default:
+ break;
+ }
}
+
return newTxnRecord.toBSON();
}();
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index c811ff0dd82..6ac27643f1e 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -907,12 +907,16 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* opCtx,
// Commands must be processed one at a time. The only exception to this is applyOps because
// applyOps oplog entries are effectively containers for CRUD operations. Therefore, it is safe
// to batch applyOps commands with CRUD operations when reading from the oplog buffer.
+ //
// Oplog entries on 'system.views' should also be processed one at a time. View catalog
// immediately reflects changes for each oplog entry so we can see inconsistent view catalog if
// multiple oplog entries on 'system.views' are being applied out of the original order.
+ //
+ // Process updates to 'admin.system.version' individually as well so the secondary's FCV when
+ // processing each operation matches the primary's when committing that operation.
if ((entry.isCommand() &&
(entry.getCommandType() != OplogEntry::CommandType::kApplyOps || entry.shouldPrepare())) ||
- entry.getNss().isSystemDotViews()) {
+ entry.getNss().isSystemDotViews() || entry.getNss().isServerConfigurationCollection()) {
if (ops->getCount() == 1) {
// apply commands one-at-a-time
_consume(opCtx, oplogBuffer);
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index dad487ccdb6..941fa6c5b06 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -155,6 +155,15 @@ ActiveTransactionHistory fetchActiveTransactionHistory(OperationContext* opCtx,
return result;
}
+ // State is a new field in FCV 4.2 that indicates if a transaction committed, so check it in FCV
+ // 4.2 and upgrading to 4.2. Check when downgrading as well so sessions refreshed at the start
+ // of downgrade enter the correct state.
+ if ((serverGlobalParams.featureCompatibility.getVersion() >=
+ ServerGlobalParams::FeatureCompatibility::Version::kDowngradingTo40) &&
+ result.lastTxnRecord->getState() == DurableTxnStateEnum::kCommitted) {
+ result.transactionCommitted = true;
+ }
+
auto it = TransactionHistoryIterator(result.lastTxnRecord->getLastWriteOpTime());
while (it.hasNext()) {
try {
@@ -181,11 +190,14 @@ ActiveTransactionHistory fetchActiveTransactionHistory(OperationContext* opCtx,
entry.getOpTime());
}
- // Either an applyOps oplog entry without a prepare flag or the state being kCommitted
- // marks the commit of a transaction.
- if ((entry.getCommandType() == repl::OplogEntry::CommandType::kApplyOps &&
- !entry.shouldPrepare()) ||
- (result.lastTxnRecord->getState() == DurableTxnStateEnum::kCommitted)) {
+ // State is a new field in FCV 4.2, so look for an applyOps oplog entry without a
+ // prepare flag to mark a committed transaction in FCV 4.0 or downgrading to 4.0. Check
+ // when upgrading as well so sessions refreshed at the beginning of upgrade enter the
+ // correct state.
+ if ((serverGlobalParams.featureCompatibility.getVersion() <=
+ ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42) &&
+ (entry.getCommandType() == repl::OplogEntry::CommandType::kApplyOps &&
+ !entry.shouldPrepare())) {
result.transactionCommitted = true;
}
} catch (const DBException& ex) {