summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--buildscripts/resmokeconfig/suites/replica_sets_update_v1_oplog.yml1
-rw-r--r--etc/backports_required_for_multiversion_tests.yml4
-rw-r--r--jstests/replsets/rollback_with_coalesced_txn_table_updates_during_oplog_application.js183
-rw-r--r--jstests/replsets/rollback_with_coalesced_txn_table_updates_from_vectored_inserts.js115
-rw-r--r--src/mongo/db/repl/replication_recovery.cpp4
-rw-r--r--src/mongo/db/repl/rollback_impl.cpp83
-rw-r--r--src/mongo/db/repl/rollback_impl.h11
-rw-r--r--src/mongo/db/repl/rollback_impl_test.cpp330
8 files changed, 731 insertions, 0 deletions
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_update_v1_oplog.yml b/buildscripts/resmokeconfig/suites/replica_sets_update_v1_oplog.yml
index 7f569634ba0..b22e43d140f 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_update_v1_oplog.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_update_v1_oplog.yml
@@ -6,6 +6,7 @@ selector:
exclude_files:
# Expects oplog entries to be in $v:2 format.
- jstests/replsets/v2_delta_oplog_entries.js
+ - jstests/replsets/rollback_with_coalesced_txn_table_updates_during_oplog_application.js
executor:
config:
diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml
index 5782df5f14e..f1fb315f7d7 100644
--- a/etc/backports_required_for_multiversion_tests.yml
+++ b/etc/backports_required_for_multiversion_tests.yml
@@ -134,6 +134,10 @@ all:
test_file: jstests/replsets/reconfig_removes_node_in_rollback.js
- ticket: SERVER-55725
test_file: jstests/sharding/time_zone_info_mongos.js
+ - ticket: SERVER-55305
+ test_file: jstests/replsets/rollback_with_coalesced_txn_table_updates_during_oplog_application.js
+ - ticket: SERVER-55305
+ test_file: jstests/replsets/rollback_with_coalesced_txn_table_updates_from_vectored_inserts.js
# Tests that should only be excluded from particular suites should be listed under that suite.
suites:
diff --git a/jstests/replsets/rollback_with_coalesced_txn_table_updates_during_oplog_application.js b/jstests/replsets/rollback_with_coalesced_txn_table_updates_during_oplog_application.js
new file mode 100644
index 00000000000..39ceaa77713
--- /dev/null
+++ b/jstests/replsets/rollback_with_coalesced_txn_table_updates_during_oplog_application.js
@@ -0,0 +1,183 @@
+/**
+ * Tests that the rollback procedure will update the 'config.transactions' table to be consistent
+ * with the node data at the 'stableTimestamp', specifically in the case where multiple derived ops
+ * to the 'config.transactions' table were coalesced into a single operation during secondary oplog
+ * application.
+ * We also test that if a node crashes after oplog truncation during rollback, the update made to
+ * the 'config.transactions' table is persisted on startup.
+ *
+ * @tags: [requires_persistence]
+ */
+
+(function() {
+load("jstests/libs/fail_point_util.js");
+load("jstests/libs/write_concern_util.js");
+
+const oplogApplierBatchSize = 100;
+
+function runTest(crashAfterRollbackTruncation) {
+ const rst = new ReplSetTest({
+ nodes: {
+ n0: {},
+ // Set the 'syncdelay' to 1s to speed up checkpointing. Also explicitly set the batch
+ // size for oplog application to ensure the number of retryable write statements being
+ // made majority committed isn't a multiple of it.
+ n1: {syncdelay: 1, setParameter: {replBatchLimitOperations: oplogApplierBatchSize}},
+ // Set the bgSyncOplogFetcherBatchSize to 1 oplog entry to guarantee replication
+ // progress with the stopReplProducerOnDocument failpoint.
+ n2: {setParameter: {bgSyncOplogFetcherBatchSize: 1}},
+ n3: {setParameter: {bgSyncOplogFetcherBatchSize: 1}},
+ n4: {setParameter: {bgSyncOplogFetcherBatchSize: 1}},
+ },
+ // Force secondaries to sync from the primary to guarantee replication progress with the
+ // stopReplProducerOnDocument failpoint. Also disable primary catchup because some
+ // replicated retryable write statements are intentionally not being made majority
+ // committed.
+ settings: {chainingAllowed: false, catchUpTimeoutMillis: 0},
+ });
+ rst.startSet();
+ rst.initiate();
+
+ const primary = rst.getPrimary();
+ const ns = "test.retryable_write_partial_rollback";
+ assert.commandWorked(
+ primary.getCollection(ns).insert({_id: 0, counter: 0}, {writeConcern: {w: 5}}));
+
+ const [secondary1, secondary2, secondary3, secondary4] = rst.getSecondaries();
+
+ // Disable replication on all of the secondaries to manually control the replication progress.
+ const stopReplProducerFailpoints = [secondary1, secondary2, secondary3, secondary4].map(
+ conn => configureFailPoint(conn, 'stopReplProducer'));
+
+ // While replication is still entirely disabled, additionally disable replication partway into
+ // the retryable write on all but the first secondary. The idea is that while secondary1 will
+ // apply all of the oplog entries in a single batch, the other secondaries will only apply up to
+ // counterMajorityCommitted oplog entries.
+ const counterTotal = oplogApplierBatchSize;
+ const counterMajorityCommitted = counterTotal - 2;
+ const stopReplProducerOnDocumentFailpoints = [secondary2, secondary3, secondary4].map(
+ conn => configureFailPoint(conn,
+ 'stopReplProducerOnDocument',
+ {document: {"diff.u.counter": counterMajorityCommitted + 1}}));
+
+ const lsid = ({id: UUID()});
+
+ assert.commandWorked(primary.getCollection(ns).runCommand("update", {
+ updates: Array.from({length: counterTotal}, () => ({q: {_id: 0}, u: {$inc: {counter: 1}}})),
+ lsid,
+ txnNumber: NumberLong(1),
+ }));
+
+ const stmtMajorityCommitted = primary.getCollection("local.oplog.rs")
+ .findOne({ns, "o.diff.u.counter": counterMajorityCommitted});
+ assert.neq(null, stmtMajorityCommitted);
+
+ for (const fp of stopReplProducerFailpoints) {
+ fp.off();
+
+ // Wait for the secondary to have applied through the counterMajorityCommitted retryable
+ // write statement. We do this for each secondary individually, starting with secondary1, to
+ // guarantee that secondary1 will advance its stable_timestamp when learning of the other
+ // secondaries also having applied through counterMajorityCommitted.
+ assert.soon(() => {
+ const {optimes: {appliedOpTime, durableOpTime}} =
+ assert.commandWorked(fp.conn.adminCommand({replSetGetStatus: 1}));
+
+ print(`${fp.conn.host}: ${tojsononeline({
+ appliedOpTime,
+ durableOpTime,
+ stmtMajorityCommittedTimestamp: stmtMajorityCommitted.ts
+ })}`);
+
+ return bsonWoCompare(appliedOpTime.ts, stmtMajorityCommitted.ts) >= 0 &&
+ bsonWoCompare(durableOpTime.ts, stmtMajorityCommitted.ts) >= 0;
+ });
+ }
+
+ // Wait for secondary1 to have advanced its stable_timestamp.
+ assert.soon(() => {
+ const {lastStableRecoveryTimestamp} =
+ assert.commandWorked(secondary1.adminCommand({replSetGetStatus: 1}));
+
+ print(`${secondary1.host}: ${tojsononeline({
+ lastStableRecoveryTimestamp,
+ stmtMajorityCommittedTimestamp: stmtMajorityCommitted.ts
+ })}`);
+
+ return bsonWoCompare(lastStableRecoveryTimestamp, stmtMajorityCommitted.ts) >= 0;
+ });
+
+ // Step up one of the other secondaries and do a write which becomes majority committed to force
+ // secondary1 to go into rollback.
+ rst.freeze(secondary1);
+ assert.commandWorked(secondary2.adminCommand({replSetStepUp: 1}));
+ rst.freeze(primary);
+ rst.awaitNodesAgreeOnPrimary(undefined, undefined, secondary2);
+
+ let hangAfterTruncate;
+ if (crashAfterRollbackTruncation) {
+ hangAfterTruncate = configureFailPoint(secondary1, 'hangAfterOplogTruncationInRollback');
+ }
+
+ for (const fp of stopReplProducerOnDocumentFailpoints) {
+ fp.off();
+ }
+
+ // Wait for secondary2 to be a writable primary.
+ rst.getPrimary();
+
+ // Do a write which becomes majority committed and wait for secondary1 to complete its rollback.
+ assert.commandWorked(
+ secondary2.getCollection("test.dummy").insert({}, {writeConcern: {w: 'majority'}}));
+
+ if (crashAfterRollbackTruncation) {
+ // Entering rollback will close connections so we expect some network errors when waiting
+ // on the failpoint.
+ assert.soonNoExcept(() => {
+ hangAfterTruncate.wait();
+ return true;
+ }, `failed to wait for fail point ${hangAfterTruncate.failPointName}`);
+
+ // Crash the node after it performs oplog truncation.
+ rst.stop(secondary1, 9, {allowedExitCode: MongoRunner.EXIT_SIGKILL});
+ node = rst.restart(secondary1, {
+ "noReplSet": false,
+ setParameter: 'failpoint.stopReplProducer=' + tojson({mode: 'alwaysOn'})
+ });
+ rst.waitForState(secondary1, ReplSetTest.State.SECONDARY);
+ secondary1.setSecondaryOk();
+ // On startup, we expect to see the update persisted in the 'config.transactions' table.
+ let restoredDoc =
+ secondary1.getCollection('config.transactions').findOne({"_id.id": lsid.id});
+ assert.neq(null, restoredDoc);
+ secondary1.adminCommand({configureFailPoint: "stopReplProducer", mode: "off"});
+ }
+
+ // Reconnect to secondary1 after it completes its rollback and step it up to be the new primary.
+ rst.awaitNodesAgreeOnPrimary(undefined, undefined, secondary2);
+ assert.commandWorked(secondary1.adminCommand({replSetFreeze: 0}));
+ rst.stepUp(secondary1, {awaitWritablePrimary: false});
+
+ const docBeforeRetry = secondary1.getCollection(ns).findOne({_id: 0});
+ assert.eq(docBeforeRetry, {_id: 0, counter: counterMajorityCommitted});
+
+ assert.commandWorked(secondary1.getCollection(ns).runCommand("update", {
+ updates: Array.from({length: counterTotal}, () => ({q: {_id: 0}, u: {$inc: {counter: 1}}})),
+ lsid,
+ txnNumber: NumberLong(1),
+ writeConcern: {w: 5},
+ }));
+
+ const docAfterRetry = secondary1.getCollection(ns).findOne({_id: 0});
+ assert.eq(docAfterRetry, {_id: 0, counter: counterTotal});
+
+ rst.stopSet();
+}
+
+// Test the general scenario where we perform the appropriate update to the 'config.transactions'
+// table during rollback.
+runTest(false);
+// Extends the test to crash the secondary in the middle of rollback right after oplog truncation.
+// We assert that the update made to the 'config.transactions' table persisted on startup.
+runTest(true);
+})(); \ No newline at end of file
diff --git a/jstests/replsets/rollback_with_coalesced_txn_table_updates_from_vectored_inserts.js b/jstests/replsets/rollback_with_coalesced_txn_table_updates_from_vectored_inserts.js
new file mode 100644
index 00000000000..e6ca0056c31
--- /dev/null
+++ b/jstests/replsets/rollback_with_coalesced_txn_table_updates_from_vectored_inserts.js
@@ -0,0 +1,115 @@
+/**
+ * Tests that the rollback procedure will update the 'config.transactions' table to be consistent
+ * with the node data at the 'stableTimestamp', specifically in the case where multiple derived ops
+ * to the 'config.transactions' table were coalesced into a single operation when performing
+ * vectored inserts on the primary.
+ */
+
+(function() {
+load("jstests/libs/fail_point_util.js");
+load("jstests/libs/write_concern_util.js");
+
+const rst = new ReplSetTest({
+ nodes: {
+ // Set the syncdelay to 1s to speed up checkpointing.
+ n0: {syncdelay: 1},
+ // Set the bgSyncOplogFetcherBatchSize to 1 oplog entry to guarantee replication progress
+ // with the stopReplProducerOnDocument failpoint.
+ n1: {setParameter: {bgSyncOplogFetcherBatchSize: 1}},
+ n2: {setParameter: {bgSyncOplogFetcherBatchSize: 1}},
+ },
+ // Force secondaries to sync from the primary to guarantee replication progress with the
+ // stopReplProducerOnDocument failpoint. Also disable primary catchup because some replicated
+ // retryable write statements are intentionally not being made majority committed.
+ settings: {chainingAllowed: false, catchUpTimeoutMillis: 0},
+});
+rst.startSet();
+rst.initiate();
+
+const primary = rst.getPrimary();
+const ns = "test.retryable_write_coalesced_txn_updates";
+assert.commandWorked(primary.getCollection(ns).insert({_id: -1}, {writeConcern: {w: 3}}));
+
+const [secondary1, secondary2] = rst.getSecondaries();
+
+// Disable replication partway into the retryable write on all of the secondaries. The idea is that
+// while the primary will apply all of the writes in a single storage transaction, the secondaries
+// will only apply up to insertBatchMajorityCommitted oplog entries.
+const insertBatchTotal = 20;
+const insertBatchMajorityCommitted = insertBatchTotal - 2;
+const stopReplProducerOnDocumentFailpoints = [secondary1, secondary2].map(
+ conn => configureFailPoint(
+ conn, 'stopReplProducerOnDocument', {document: {"_id": insertBatchMajorityCommitted + 1}}));
+
+const lsid = ({id: UUID()});
+
+assert.commandWorked(primary.getCollection(ns).runCommand("insert", {
+ documents: Array.from({length: insertBatchTotal}, (_, i) => ({_id: i})),
+ lsid,
+ txnNumber: NumberLong(1),
+}));
+
+const stmtMajorityCommitted =
+ primary.getCollection("local.oplog.rs").findOne({ns, "o._id": insertBatchMajorityCommitted});
+assert.neq(null, stmtMajorityCommitted);
+
+// Wait for the primary to have advanced its stable_timestamp.
+assert.soon(() => {
+ const {lastStableRecoveryTimestamp} =
+ assert.commandWorked(primary.adminCommand({replSetGetStatus: 1}));
+
+ const wtStatus = assert.commandWorked(primary.adminCommand({serverStatus: 1})).wiredTiger;
+ const latestMajority =
+ wtStatus["snapshot-window-settings"]["latest majority snapshot timestamp available"];
+
+ print(`${primary.host}: ${tojsononeline({
+ lastStableRecoveryTimestamp,
+ stmtMajorityCommittedTimestamp: stmtMajorityCommitted.ts,
+ "latest majority snapshot timestamp available": latestMajority
+ })}`);
+
+ // Make sure 'secondary1' has a 'lastApplied' optime equal to 'stmtMajorityCommitted.ts'.
+ // Otherwise, it can fail to win the election later.
+ const {optimes: {appliedOpTime}} =
+ assert.commandWorked(secondary1.adminCommand({replSetGetStatus: 1}));
+ print(`${secondary1.host}: ${tojsononeline({appliedOpTime})}`);
+
+ return bsonWoCompare(lastStableRecoveryTimestamp, stmtMajorityCommitted.ts) >= 0 &&
+ bsonWoCompare(appliedOpTime.ts, stmtMajorityCommitted.ts) >= 0;
+});
+
+// Step up one of the secondaries and do a write which becomes majority committed to force the
+// current primary to go into rollback.
+assert.commandWorked(secondary1.adminCommand({replSetStepUp: 1}));
+rst.freeze(primary);
+rst.awaitNodesAgreeOnPrimary(undefined, undefined, secondary1);
+
+for (const fp of stopReplProducerOnDocumentFailpoints) {
+ fp.off();
+}
+
+rst.getPrimary(); // Wait for secondary1 to be a writable primary.
+
+// Do a write which becomes majority committed and wait for the old primary to have completed its
+// rollback.
+assert.commandWorked(secondary1.getCollection("test.dummy").insert({}, {writeConcern: {w: 3}}));
+
+// Reconnect to the primary after it completes its rollback and step it up to be the primary again.
+rst.awaitNodesAgreeOnPrimary(undefined, undefined, secondary1);
+assert.commandWorked(primary.adminCommand({replSetFreeze: 0}));
+rst.stepUp(primary);
+
+print(`${primary.host} session txn record: ${
+ tojson(primary.getCollection("config.transactions").findOne({"_id.id": lsid.id}))}`);
+
+// Make sure we don't re-execute operations that have already been inserted by making sure we
+// we don't get a 'DuplicateKeyError'.
+assert.commandWorked(primary.getCollection(ns).runCommand("insert", {
+ documents: Array.from({length: insertBatchTotal}, (_, i) => ({_id: i})),
+ lsid,
+ txnNumber: NumberLong(1),
+ writeConcern: {w: 3},
+}));
+
+rst.stopSet();
+})(); \ No newline at end of file
diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp
index c9d4aa3c381..2bc00bfefe2 100644
--- a/src/mongo/db/repl/replication_recovery.cpp
+++ b/src/mongo/db/repl/replication_recovery.cpp
@@ -60,6 +60,8 @@
namespace mongo {
namespace repl {
+MONGO_FAIL_POINT_DEFINE(hangAfterOplogTruncationInRollback);
+
namespace {
const auto kRecoveryBatchLogLevel = logv2::LogSeverity::Debug(2);
@@ -449,6 +451,8 @@ void ReplicationRecoveryImpl::recoverFromOplog(OperationContext* opCtx,
// This may take an IS lock on the oplog collection.
_truncateOplogIfNeededAndThenClearOplogTruncateAfterPoint(opCtx, &stableTimestamp);
+ hangAfterOplogTruncationInRollback.pauseWhileSet();
+
auto topOfOplogSW = _getTopOfOplog(opCtx);
if (topOfOplogSW.getStatus() == ErrorCodes::CollectionIsEmpty ||
topOfOplogSW.getStatus() == ErrorCodes::NamespaceNotFound) {
diff --git a/src/mongo/db/repl/rollback_impl.cpp b/src/mongo/db/repl/rollback_impl.cpp
index 1ce1bb85b8e..14be427c59c 100644
--- a/src/mongo/db/repl/rollback_impl.cpp
+++ b/src/mongo/db/repl/rollback_impl.cpp
@@ -43,7 +43,10 @@
#include "mongo/db/commands.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/replication_state_transition_lock_guard.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/dbhelpers.h"
#include "mongo/db/index_builds_coordinator.h"
#include "mongo/db/kill_sessions_local.h"
#include "mongo/db/logical_time_validator.h"
@@ -457,6 +460,80 @@ StatusWith<std::set<NamespaceString>> RollbackImpl::_namespacesForOp(const Oplog
return namespaces;
}
+void RollbackImpl::_restoreTxnsTableEntryFromRetryableWrites(OperationContext* opCtx,
+ Timestamp stableTimestamp) {
+ auto client = std::make_unique<DBDirectClient>(opCtx);
+ // Query for retryable writes oplog entries with a non-null 'prevWriteOpTime' value
+ // less than or equal to the 'stableTimestamp'. This query intends to include no-op
+ // retryable writes oplog entries that have been applied through a migration process.
+ const auto filter = BSON("op" << BSON("$in" << BSON_ARRAY("i"
+ << "u"
+ << "d")));
+ // We use the 'fromMigrate' field to differentiate migrated retryable writes entries from
+ // transactions entries.
+ const auto filterFromMigration = BSON("op"
+ << "n"
+ << "fromMigrate" << true);
+ auto cursor = client->query(
+ NamespaceString::kRsOplogNamespace,
+ QUERY("ts" << BSON("$gt" << stableTimestamp) << "txnNumber" << BSON("$exists" << true)
+ << "stmtId" << BSON("$exists" << true) << "prevOpTime.ts"
+ << BSON("$gte" << Timestamp(1, 0) << "$lte" << stableTimestamp) << "$or"
+ << BSON_ARRAY(filter << filterFromMigration)));
+ while (cursor->more()) {
+ auto doc = cursor->next();
+ auto swEntry = OplogEntry::parse(doc);
+ fassert(5530502, swEntry.isOK());
+ auto entry = swEntry.getValue();
+ auto prevWriteOpTime = *entry.getPrevWriteOpTimeInTransaction();
+ OperationSessionInfo opSessionInfo = entry.getOperationSessionInfo();
+ const auto sessionId = *opSessionInfo.getSessionId();
+ const auto txnNumber = *opSessionInfo.getTxnNumber();
+ const auto wallClockTime = entry.getWallClockTime();
+
+ invariant(!prevWriteOpTime.isNull() && prevWriteOpTime.getTimestamp() <= stableTimestamp);
+ // This is a retryable writes oplog entry with a non-null 'prevWriteOpTime' value that
+ // is less than or equal to the 'stableTimestamp'.
+ LOGV2(5530501,
+ "Restoring sessions entry to be consistent with 'stableTimestamp'",
+ "stableTimestamp"_attr = stableTimestamp,
+ "sessionId"_attr = sessionId,
+ "txnNumber"_attr = txnNumber,
+ "lastWriteOpTime"_attr = prevWriteOpTime);
+ SessionTxnRecord sessionTxnRecord;
+ sessionTxnRecord.setSessionId(sessionId);
+ sessionTxnRecord.setTxnNum(txnNumber);
+ try {
+ TransactionHistoryIterator iter(prevWriteOpTime);
+ auto nextOplogEntry = iter.next(opCtx);
+ sessionTxnRecord.setLastWriteOpTime(nextOplogEntry.getOpTime());
+ sessionTxnRecord.setLastWriteDate(nextOplogEntry.getWallClockTime());
+ } catch (ExceptionFor<ErrorCodes::IncompleteTransactionHistory>&) {
+ // It's possible that the next entry in the oplog chain has been truncated due to
+ // oplog cap maintenance.
+ sessionTxnRecord.setLastWriteOpTime(prevWriteOpTime);
+ sessionTxnRecord.setLastWriteDate(wallClockTime);
+ }
+ const auto nss = NamespaceString::kSessionTransactionsTableNamespace;
+ writeConflictRetry(opCtx, "updateSessionTransactionsTableInRollback", nss.ns(), [&] {
+ AutoGetCollection collection(opCtx, nss, MODE_IX);
+ auto filter = BSON(SessionTxnRecord::kSessionIdFieldName << sessionId.toBSON());
+ UnreplicatedWritesBlock uwb(opCtx);
+ // Perform an untimestamped write so that it will not be rolled back on recovering
+ // to the 'stableTimestamp' if we were to crash. This is safe because this update is
+ // meant to be consistent with the 'stableTimestamp' and not the common point.
+ Helpers::upsert(
+ opCtx, nss.ns(), filter, sessionTxnRecord.toBSON(), /*fromMigrate=*/false);
+ });
+ }
+ // Take a stable checkpoint so that writes to the 'config.transactions' table are
+ // persisted to disk before truncating the oplog. If we were to take an unstable checkpoint, we
+ // would have to update replication metadata like 'minValid.appliedThrough' to be consistent
+ // with the oplog.
+ opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable(opCtx,
+ /*stableCheckpoint=*/true);
+}
+
void RollbackImpl::_runPhaseFromAbortToReconstructPreparedTxns(
OperationContext* opCtx, RollBackLocalOperations::RollbackCommonPoint commonPoint) noexcept {
// Stop and wait for all background index builds to complete before starting the rollback
@@ -514,6 +591,12 @@ void RollbackImpl::_runPhaseFromAbortToReconstructPreparedTxns(
"update"_attr = _observerInfo.rollbackCommandCounts[kUpdateCmdName],
"delete"_attr = _observerInfo.rollbackCommandCounts[kDeleteCmdName]);
+ // Retryable writes create derived updates to the transactions table which can be coalesced into
+ // one operation, so certain session operations history may be lost after restoring to the
+ // 'stableTimestamp'. We must scan the oplog and restore the transactions table entries to
+ // detail the last executed writes.
+ _restoreTxnsTableEntryFromRetryableWrites(opCtx, stableTimestamp);
+
// During replication recovery, we truncate all oplog entries with timestamps greater than the
// oplog truncate after point. If we entered rollback, we are guaranteed to have at least one
// oplog entry after the common point.
diff --git a/src/mongo/db/repl/rollback_impl.h b/src/mongo/db/repl/rollback_impl.h
index 4d95e5e7e4e..9fe3206f4f7 100644
--- a/src/mongo/db/repl/rollback_impl.h
+++ b/src/mongo/db/repl/rollback_impl.h
@@ -367,6 +367,17 @@ private:
void _stopAndWaitForIndexBuilds(OperationContext* opCtx);
/**
+ * Performs a forward scan of the oplog starting at 'stableTimestamp', exclusive. For every
+ * retryable write oplog entry that has a 'prevOpTime' <= 'stableTimestamp', update the
+ * transactions table with the appropriate information to detail the last executed operation. We
+ * do this because derived updates to the transactions table can be coalesced into one
+ * operation, and so certain session entry updates may not exist when restoring to the
+ * 'stableTimestamp'.
+ */
+ void _restoreTxnsTableEntryFromRetryableWrites(OperationContext* opCtx,
+ Timestamp stableTimestamp);
+
+ /**
* Recovers to the stable timestamp while holding the global exclusive lock.
* Returns the stable timestamp that the storage engine recovered to.
*/
diff --git a/src/mongo/db/repl/rollback_impl_test.cpp b/src/mongo/db/repl/rollback_impl_test.cpp
index 767c0e81703..f03faaa390b 100644
--- a/src/mongo/db/repl/rollback_impl_test.cpp
+++ b/src/mongo/db/repl/rollback_impl_test.cpp
@@ -410,6 +410,60 @@ OplogInterfaceMock::Operation makeOpAndRecordId(int count) {
}
/**
+ * Helper to create a noop entry that represents a migrated retryable write or transaction oplog
+ * entry.
+ */
+BSONObj makeMigratedNoop(OpTime opTime,
+ boost::optional<BSONObj> o2,
+ LogicalSessionId lsid,
+ int txnNum,
+ OpTime prevOpTime,
+ boost::optional<int> stmtId,
+ int wallClockMillis,
+ bool isRetryableWrite) {
+ repl::MutableOplogEntry op;
+ op.setOpType(repl::OpTypeEnum::kNoop);
+ op.setNss(nss);
+ op.setObject(BSONObj());
+ op.setOpTime(opTime);
+ if (isRetryableWrite) {
+ op.setFromMigrate(true);
+ }
+ op.setObject2(o2);
+ if (stmtId) {
+ op.setStatementIds({*stmtId});
+ }
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(lsid);
+ sessionInfo.setTxnNumber(txnNum);
+ op.setOperationSessionInfo(sessionInfo);
+ op.setPrevWriteOpTimeInTransaction(prevOpTime);
+ op.setWallClockTime(Date_t::fromMillisSinceEpoch(wallClockMillis));
+ return op.toBSON();
+}
+
+/**
+ * Helper to create a transaction command oplog entry.
+ */
+BSONObj makeTransactionOplogEntry(OpTime opTime,
+ LogicalSessionId lsid,
+ int txnNum,
+ OpTime prevOpTime) {
+ repl::MutableOplogEntry op;
+ op.setOpType(repl::OpTypeEnum::kCommand);
+ op.setNss(nss);
+ op.setObject(BSON("applyOps" << BSONArray()));
+ op.setOpTime(opTime);
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(lsid);
+ sessionInfo.setTxnNumber(txnNum);
+ op.setOperationSessionInfo(sessionInfo);
+ op.setPrevWriteOpTimeInTransaction(prevOpTime);
+ op.setWallClockTime(Date_t());
+ return op.toBSON();
+}
+
+/**
* Asserts that the documents in the oplog have the given timestamps.
*/
void _assertDocsInOplog(OperationContext* opCtx, std::vector<int> timestamps) {
@@ -1524,6 +1578,282 @@ TEST_F(RollbackImplTest, RollbackFixesCountForUnpreparedTransactionApplyOpsChain
ASSERT_EQ(_storageInterface->getFinalCollectionCount(collId), 1);
}
+TEST_F(RollbackImplTest, RollbackRestoresTxnTableEntryToBeConsistentWithStableTimestamp) {
+ const auto collUuid = UUID::gen();
+ const auto nss = NamespaceString::kSessionTransactionsTableNamespace;
+ _initializeCollection(_opCtx.get(), collUuid, nss);
+ LogicalSessionFromClient fromClient{};
+ fromClient.setId(UUID::gen());
+ LogicalSessionId lsid = makeLogicalSessionId(fromClient, _opCtx.get());
+ LogicalSessionFromClient fromClient2{};
+ fromClient2.setId(UUID::gen());
+ LogicalSessionId lsid2 = makeLogicalSessionId(fromClient2, _opCtx.get());
+ LogicalSessionFromClient fromClient3{};
+ fromClient3.setId(UUID::gen());
+ LogicalSessionId lsid3 = makeLogicalSessionId(fromClient3, _opCtx.get());
+
+ auto commonOpTime = OpTime(Timestamp(4, 4), 4);
+ auto commonPoint = makeOpAndRecordId(commonOpTime);
+ _remoteOplog->setOperations({commonPoint});
+ _storageInterface->setStableTimestamp(nullptr, commonOpTime.getTimestamp());
+
+ auto insertObj1 = BSON("_id" << 1);
+ auto insertObj2 = BSON("_id" << 2);
+ const auto txnNumOne = 1LL;
+ // Create retryable write oplog entry before 'stableTimestamp'.
+ auto opBeforeStableTs = makeInsertOplogEntry(1, insertObj1, nss.ns(), collUuid);
+ const auto prevOpTime = OpTime(opBeforeStableTs["ts"].timestamp(), 1);
+ BSONObjBuilder opBeforeStableTsBuilder(opBeforeStableTs);
+ opBeforeStableTsBuilder.append("lsid", lsid.toBSON());
+ opBeforeStableTsBuilder.append("txnNumber", txnNumOne);
+ opBeforeStableTsBuilder.append("prevOpTime", OpTime().toBSON());
+ opBeforeStableTsBuilder.append("stmtId", 1);
+ BSONObj oplogEntryBeforeStableTs = opBeforeStableTsBuilder.done();
+
+ const auto txnNumTwo = 2LL;
+ // Create no-op retryable write entry before 'stableTimestamp'.
+ auto noopPrevOpTime = OpTime(Timestamp(2, 2), 2);
+ auto noopEntryBeforeStableTs = makeMigratedNoop(noopPrevOpTime,
+ oplogEntryBeforeStableTs,
+ lsid2,
+ txnNumTwo,
+ OpTime(),
+ 1 /* stmtId */,
+ 2 /* wallClockMillis */,
+ true /* isRetryableWrite */);
+
+ // Create transactions entry after 'stableTimestamp'. Transactions entries are of 'command' op
+ // type.
+ auto txnOpTime = OpTime(Timestamp(3, 3), 3);
+ auto txnEntryBeforeStableTs = makeTransactionOplogEntry(txnOpTime, lsid3, 3, OpTime());
+
+ // Create retryable write oplog entry after 'stableTimestamp'.
+ auto firstOpAfterStableTs = makeInsertOplogEntry(5, insertObj2, nss.ns(), collUuid);
+ BSONObjBuilder opAfterStableTsBuilder(firstOpAfterStableTs);
+ opAfterStableTsBuilder.append("lsid", lsid.toBSON());
+ opAfterStableTsBuilder.append("txnNumber", txnNumOne);
+ // 'prevOpTime' points to 'oplogEntryBeforeStableTs'.
+ opAfterStableTsBuilder.append("prevOpTime", prevOpTime.toBSON());
+ opAfterStableTsBuilder.append("stmtId", 2);
+ BSONObj firstOplogEntryAfterStableTs = opAfterStableTsBuilder.done();
+
+ // Create no-op retryable write entry after 'stableTimestamp'.
+ auto noopEntryAfterStableTs = makeMigratedNoop(OpTime(Timestamp(6, 6), 6),
+ firstOplogEntryAfterStableTs,
+ lsid2,
+ txnNumTwo,
+ noopPrevOpTime,
+ 2 /* stmtId */,
+ 5 /* wallClockMillis */,
+ true /* isRetryableWrite */);
+
+ // Create transactions entry after 'stableTimestamp'. Transactions entries are of 'command' op
+ // type.
+ auto txnEntryAfterStableTs =
+ makeTransactionOplogEntry(OpTime(Timestamp(7, 7), 7), lsid3, 3, txnOpTime);
+
+ ASSERT_OK(_insertOplogEntry(oplogEntryBeforeStableTs));
+ ASSERT_OK(_insertOplogEntry(noopEntryBeforeStableTs));
+ ASSERT_OK(_insertOplogEntry(txnEntryBeforeStableTs));
+ ASSERT_OK(_insertOplogEntry(commonPoint.first));
+ ASSERT_OK(_insertOplogEntry(firstOplogEntryAfterStableTs));
+ ASSERT_OK(_insertOplogEntry(noopEntryAfterStableTs));
+ ASSERT_OK(_insertOplogEntry(txnEntryAfterStableTs));
+
+ auto status = _storageInterface->findSingleton(_opCtx.get(), nss);
+ // The 'config.transactions' table is currently empty.
+ ASSERT_NOT_OK(status);
+
+ // Doing a rollback should upsert two entries into the 'config.transactions' table.
+ ASSERT_OK(_rollback->runRollback(_opCtx.get()));
+ auto swDoc = _storageInterface->findById(
+ _opCtx.get(), nss, firstOplogEntryAfterStableTs.getField("lsid"));
+ ASSERT_OK(swDoc);
+ auto sessionsEntryBson = swDoc.getValue();
+ // New sessions entry should match the session information retrieved from the retryable writes
+ // oplog entry from before the 'stableTimestamp'.
+ ASSERT_EQUALS(sessionsEntryBson["txnNum"].numberInt(),
+ oplogEntryBeforeStableTs["txnNumber"].numberInt());
+ ASSERT_EQUALS(sessionsEntryBson["lastWriteOpTime"].timestamp(),
+ oplogEntryBeforeStableTs["prevOpTime"].timestamp());
+ ASSERT_EQUALS(sessionsEntryBson["lastWriteDate"].date(),
+ oplogEntryBeforeStableTs["wall"].date());
+
+ swDoc =
+ _storageInterface->findById(_opCtx.get(), nss, noopEntryBeforeStableTs.getField("lsid"));
+ ASSERT_OK(swDoc);
+ sessionsEntryBson = swDoc.getValue();
+ // New sessions entry should match the session information retrieved from the noop retryable
+ // writes oplog entry from before the 'stableTimestamp'.
+ ASSERT_EQUALS(sessionsEntryBson["txnNum"].numberInt(),
+ noopEntryBeforeStableTs["txnNumber"].numberInt());
+ ASSERT_EQUALS(sessionsEntryBson["lastWriteOpTime"].timestamp(),
+ noopEntryBeforeStableTs["prevOpTime"].timestamp());
+ ASSERT_EQUALS(sessionsEntryBson["lastWriteDate"].date(),
+ noopEntryBeforeStableTs["wall"].date());
+
+ // 'lsid3' does not get restored because it is not a retryable writes entry.
+ status = _storageInterface->findById(_opCtx.get(), nss, txnEntryAfterStableTs.getField("lsid"));
+ ASSERT_NOT_OK(status);
+}
+
+TEST_F(
+ RollbackImplTest,
+ RollbackRestoresTxnTableEntryToBeConsistentWithStableTimestampWithMissingPrevWriteOplogEntry) {
+ const auto collUuid = UUID::gen();
+ const auto nss = NamespaceString::kSessionTransactionsTableNamespace;
+ _initializeCollection(_opCtx.get(), collUuid, nss);
+ LogicalSessionFromClient fromClient{};
+ fromClient.setId(UUID::gen());
+ LogicalSessionId lsid = makeLogicalSessionId(fromClient, _opCtx.get());
+ LogicalSessionFromClient fromClient2{};
+ fromClient2.setId(UUID::gen());
+ LogicalSessionId lsid2 = makeLogicalSessionId(fromClient2, _opCtx.get());
+
+ auto commonOpTime = OpTime(Timestamp(2, 2), 1);
+ auto commonPoint = makeOpAndRecordId(OpTime(Timestamp(2, 2), 1));
+ _remoteOplog->setOperations({commonPoint});
+ _storageInterface->setStableTimestamp(nullptr, commonOpTime.getTimestamp());
+
+ // Create oplog entry after 'stableTimestamp'.
+ auto insertObj = BSON("_id" << 1);
+ auto firstOpAfterStableTs = makeInsertOplogEntry(3, insertObj, nss.ns(), collUuid);
+ BSONObjBuilder builder(firstOpAfterStableTs);
+ builder.append("lsid", lsid.toBSON());
+ builder.append("txnNumber", 2LL);
+ // 'prevOpTime' points to an opTime not found in the oplog. This can happen in practice
+ // due to the 'OplogCapMaintainerThread' truncating entries from before the 'stableTimestamp'.
+ builder.append("prevOpTime", OpTime(Timestamp(1, 0), 1).toBSON());
+ builder.append("stmtId", 2);
+ BSONObj firstOplogEntryAfterStableTs = builder.done();
+
+ // Create no-op entry after 'stableTimestamp'.
+ // 'prevOpTime' point sto an opTime not found in the oplog.
+ auto noopEntryAfterStableTs = makeMigratedNoop(OpTime(Timestamp(5, 5), 5),
+ firstOplogEntryAfterStableTs,
+ lsid2,
+ 3 /* txnNum */,
+ OpTime(Timestamp(2, 1), 1),
+ 2 /* stmtId */,
+ 5 /* wallClockMillis */,
+ true /*isRetryableWrite */);
+
+ ASSERT_OK(_insertOplogEntry(commonPoint.first));
+ ASSERT_OK(_insertOplogEntry(firstOplogEntryAfterStableTs));
+ ASSERT_OK(_insertOplogEntry(noopEntryAfterStableTs));
+
+ auto status = _storageInterface->findSingleton(_opCtx.get(), nss);
+ // The 'config.transactions' table is currently empty.
+ ASSERT_NOT_OK(status);
+
+ // Doing a rollback should upsert two entries into the 'config.transactions' table.
+ ASSERT_OK(_rollback->runRollback(_opCtx.get()));
+ auto swDoc = _storageInterface->findById(
+ _opCtx.get(), nss, firstOplogEntryAfterStableTs.getField("lsid"));
+ ASSERT_OK(swDoc);
+ auto sessionsEntryBson = swDoc.getValue();
+ // New sessions entry should match the session information retrieved from the retryable writes
+ // oplog entry from after the 'stableTimestamp'.
+ ASSERT_EQUALS(sessionsEntryBson["txnNum"].numberInt(),
+ firstOplogEntryAfterStableTs["txnNumber"].numberInt());
+ ASSERT_EQUALS(sessionsEntryBson["lastWriteOpTime"].timestamp(),
+ firstOplogEntryAfterStableTs["prevOpTime"].timestamp());
+ ASSERT_EQUALS(sessionsEntryBson["lastWriteDate"].date(),
+ firstOplogEntryAfterStableTs["wall"].date());
+
+ swDoc = _storageInterface->findById(_opCtx.get(), nss, noopEntryAfterStableTs.getField("lsid"));
+ ASSERT_OK(swDoc);
+ sessionsEntryBson = swDoc.getValue();
+ // New sessions entry should match the session information retrieved from the retryable writes
+ // no-op oplog entry from after the 'stableTimestamp'.
+ ASSERT_EQUALS(sessionsEntryBson["txnNum"].numberInt(),
+ noopEntryAfterStableTs["txnNumber"].numberInt());
+ ASSERT_EQUALS(sessionsEntryBson["lastWriteOpTime"].timestamp(),
+ noopEntryAfterStableTs["prevOpTime"].timestamp());
+ ASSERT_EQUALS(sessionsEntryBson["lastWriteDate"].date(), noopEntryAfterStableTs["wall"].date());
+}
+
+TEST_F(RollbackImplTest, RollbackDoesNotRestoreTxnsTableWhenNoRetryableWritesEntriesAfterStableTs) {
+ const auto collUuid = UUID::gen();
+ const auto nss = NamespaceString::kSessionTransactionsTableNamespace;
+ _initializeCollection(_opCtx.get(), collUuid, nss);
+ LogicalSessionFromClient fromClient{};
+ fromClient.setId(UUID::gen());
+ LogicalSessionId lsid = makeLogicalSessionId(fromClient, _opCtx.get());
+ LogicalSessionFromClient fromClient2{};
+ fromClient2.setId(UUID::gen());
+ LogicalSessionId lsid2 = makeLogicalSessionId(fromClient2, _opCtx.get());
+
+ auto commonOpTime = OpTime(Timestamp(4, 4), 4);
+ auto commonPoint = makeOpAndRecordId(commonOpTime);
+ _remoteOplog->setOperations({commonPoint});
+ _storageInterface->setStableTimestamp(nullptr, commonOpTime.getTimestamp());
+
+ auto insertObj1 = BSON("_id" << 1);
+ auto insertObj2 = BSON("_id" << 2);
+ const auto txnNumOne = 1LL;
+ // Create oplog entry before 'stableTimestamp'.
+ auto opBeforeStableTs = makeInsertOplogEntry(1, insertObj1, nss.ns(), collUuid);
+ BSONObjBuilder opBeforeStableTsBuilder(opBeforeStableTs);
+ opBeforeStableTsBuilder.append("lsid", lsid.toBSON());
+ opBeforeStableTsBuilder.append("txnNumber", txnNumOne);
+ opBeforeStableTsBuilder.append("prevOpTime", OpTime().toBSON());
+ opBeforeStableTsBuilder.append("stmtId", 1);
+ BSONObj oplogEntryBeforeStableTs = opBeforeStableTsBuilder.done();
+
+ const auto txnNumTwo = 2LL;
+ // Create no-op entry before 'stableTimestamp'.
+ auto noopEntryBeforeStableTs = makeMigratedNoop(OpTime(Timestamp(2, 2), 2),
+ oplogEntryBeforeStableTs,
+ lsid2,
+ txnNumTwo,
+ OpTime(),
+ 1 /* stmtId*/,
+ 2 /* wallClockMillis */,
+ true /* isRetryableWrite */);
+
+ // Create migrated no-op transactions entry before 'stableTimestamp'.
+ auto txnOpTime = OpTime(Timestamp(3, 3), 3);
+ auto txnEntryBeforeStableTs = makeMigratedNoop(txnOpTime,
+ BSONObj(),
+ lsid,
+ txnNumTwo,
+ OpTime(),
+ boost::none /* stmtId */,
+ 4 /* wallClockMillis */,
+ false /* isRetryableWrite */);
+
+ // Create regular non-retryable write oplog entry after 'stableTimestamp'.
+ auto insertEntry = makeInsertOplogEntry(7, BSON("_id" << 3), nss.ns(), collUuid);
+
+ // Create migrated no-op transactions entry after 'stableTimestamp'.
+ auto txnEntryAfterStableTs = makeMigratedNoop(txnOpTime,
+ BSONObj(),
+ lsid,
+ txnNumTwo,
+ txnOpTime,
+ boost::none /* stmtId */,
+ 10 /* wallClockMillis */,
+ false /* isRetryableWrite */);
+
+ ASSERT_OK(_insertOplogEntry(oplogEntryBeforeStableTs));
+ ASSERT_OK(_insertOplogEntry(noopEntryBeforeStableTs));
+ ASSERT_OK(_insertOplogEntry(txnEntryBeforeStableTs));
+ ASSERT_OK(_insertOplogEntry(commonPoint.first));
+ ASSERT_OK(_insertOplogEntry(insertEntry));
+ ASSERT_OK(_insertOplogEntry(txnEntryAfterStableTs));
+
+ auto status = _storageInterface->findSingleton(_opCtx.get(), nss);
+ // The 'config.transactions' table is currently empty.
+ ASSERT_NOT_OK(status);
+
+ // Doing a rollback should not restore any entries into the 'config.transactions' table.
+ ASSERT_OK(_rollback->runRollback(_opCtx.get()));
+ status = _storageInterface->findSingleton(_opCtx.get(), nss);
+ // No upserts were made to the 'config.transactions' table.
+ ASSERT_NOT_OK(status);
+}
+
/**
* Fixture to help test that rollback records the correct information in its RollbackObserverInfo
* struct.