diff options
8 files changed, 731 insertions, 0 deletions
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_update_v1_oplog.yml b/buildscripts/resmokeconfig/suites/replica_sets_update_v1_oplog.yml index 7f569634ba0..b22e43d140f 100644 --- a/buildscripts/resmokeconfig/suites/replica_sets_update_v1_oplog.yml +++ b/buildscripts/resmokeconfig/suites/replica_sets_update_v1_oplog.yml @@ -6,6 +6,7 @@ selector: exclude_files: # Expects oplog entries to be in $v:2 format. - jstests/replsets/v2_delta_oplog_entries.js + - jstests/replsets/rollback_with_coalesced_txn_table_updates_during_oplog_application.js executor: config: diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml index 5782df5f14e..f1fb315f7d7 100644 --- a/etc/backports_required_for_multiversion_tests.yml +++ b/etc/backports_required_for_multiversion_tests.yml @@ -134,6 +134,10 @@ all: test_file: jstests/replsets/reconfig_removes_node_in_rollback.js - ticket: SERVER-55725 test_file: jstests/sharding/time_zone_info_mongos.js + - ticket: SERVER-55305 + test_file: jstests/replsets/rollback_with_coalesced_txn_table_updates_during_oplog_application.js + - ticket: SERVER-55305 + test_file: jstests/replsets/rollback_with_coalesced_txn_table_updates_from_vectored_inserts.js # Tests that should only be excluded from particular suites should be listed under that suite. 
suites: diff --git a/jstests/replsets/rollback_with_coalesced_txn_table_updates_during_oplog_application.js b/jstests/replsets/rollback_with_coalesced_txn_table_updates_during_oplog_application.js new file mode 100644 index 00000000000..39ceaa77713 --- /dev/null +++ b/jstests/replsets/rollback_with_coalesced_txn_table_updates_during_oplog_application.js @@ -0,0 +1,183 @@ +/** + * Tests that the rollback procedure will update the 'config.transactions' table to be consistent + * with the node data at the 'stableTimestamp', specifically in the case where multiple derived ops + * to the 'config.transactions' table were coalesced into a single operation during secondary oplog + * application. + * We also test that if a node crashes after oplog truncation during rollback, the update made to + * the 'config.transactions' table is persisted on startup. + * + * @tags: [requires_persistence] + */ + +(function() { +load("jstests/libs/fail_point_util.js"); +load("jstests/libs/write_concern_util.js"); + +const oplogApplierBatchSize = 100; + +function runTest(crashAfterRollbackTruncation) { + const rst = new ReplSetTest({ + nodes: { + n0: {}, + // Set the 'syncdelay' to 1s to speed up checkpointing. Also explicitly set the batch + // size for oplog application to ensure the number of retryable write statements being + // made majority committed isn't a multiple of it. + n1: {syncdelay: 1, setParameter: {replBatchLimitOperations: oplogApplierBatchSize}}, + // Set the bgSyncOplogFetcherBatchSize to 1 oplog entry to guarantee replication + // progress with the stopReplProducerOnDocument failpoint. + n2: {setParameter: {bgSyncOplogFetcherBatchSize: 1}}, + n3: {setParameter: {bgSyncOplogFetcherBatchSize: 1}}, + n4: {setParameter: {bgSyncOplogFetcherBatchSize: 1}}, + }, + // Force secondaries to sync from the primary to guarantee replication progress with the + // stopReplProducerOnDocument failpoint. 
Also disable primary catchup because some + // replicated retryable write statements are intentionally not being made majority + // committed. + settings: {chainingAllowed: false, catchUpTimeoutMillis: 0}, + }); + rst.startSet(); + rst.initiate(); + + const primary = rst.getPrimary(); + const ns = "test.retryable_write_partial_rollback"; + assert.commandWorked( + primary.getCollection(ns).insert({_id: 0, counter: 0}, {writeConcern: {w: 5}})); + + const [secondary1, secondary2, secondary3, secondary4] = rst.getSecondaries(); + + // Disable replication on all of the secondaries to manually control the replication progress. + const stopReplProducerFailpoints = [secondary1, secondary2, secondary3, secondary4].map( + conn => configureFailPoint(conn, 'stopReplProducer')); + + // While replication is still entirely disabled, additionally disable replication partway into + // the retryable write on all but the first secondary. The idea is that while secondary1 will + // apply all of the oplog entries in a single batch, the other secondaries will only apply up to + // counterMajorityCommitted oplog entries. 
+ const counterTotal = oplogApplierBatchSize; + const counterMajorityCommitted = counterTotal - 2; + const stopReplProducerOnDocumentFailpoints = [secondary2, secondary3, secondary4].map( + conn => configureFailPoint(conn, + 'stopReplProducerOnDocument', + {document: {"diff.u.counter": counterMajorityCommitted + 1}})); + + const lsid = ({id: UUID()}); + + assert.commandWorked(primary.getCollection(ns).runCommand("update", { + updates: Array.from({length: counterTotal}, () => ({q: {_id: 0}, u: {$inc: {counter: 1}}})), + lsid, + txnNumber: NumberLong(1), + })); + + const stmtMajorityCommitted = primary.getCollection("local.oplog.rs") + .findOne({ns, "o.diff.u.counter": counterMajorityCommitted}); + assert.neq(null, stmtMajorityCommitted); + + for (const fp of stopReplProducerFailpoints) { + fp.off(); + + // Wait for the secondary to have applied through the counterMajorityCommitted retryable + // write statement. We do this for each secondary individually, starting with secondary1, to + // guarantee that secondary1 will advance its stable_timestamp when learning of the other + // secondaries also having applied through counterMajorityCommitted. + assert.soon(() => { + const {optimes: {appliedOpTime, durableOpTime}} = + assert.commandWorked(fp.conn.adminCommand({replSetGetStatus: 1})); + + print(`${fp.conn.host}: ${tojsononeline({ + appliedOpTime, + durableOpTime, + stmtMajorityCommittedTimestamp: stmtMajorityCommitted.ts + })}`); + + return bsonWoCompare(appliedOpTime.ts, stmtMajorityCommitted.ts) >= 0 && + bsonWoCompare(durableOpTime.ts, stmtMajorityCommitted.ts) >= 0; + }); + } + + // Wait for secondary1 to have advanced its stable_timestamp. 
+ assert.soon(() => { + const {lastStableRecoveryTimestamp} = + assert.commandWorked(secondary1.adminCommand({replSetGetStatus: 1})); + + print(`${secondary1.host}: ${tojsononeline({ + lastStableRecoveryTimestamp, + stmtMajorityCommittedTimestamp: stmtMajorityCommitted.ts + })}`); + + return bsonWoCompare(lastStableRecoveryTimestamp, stmtMajorityCommitted.ts) >= 0; + }); + + // Step up one of the other secondaries and do a write which becomes majority committed to force + // secondary1 to go into rollback. + rst.freeze(secondary1); + assert.commandWorked(secondary2.adminCommand({replSetStepUp: 1})); + rst.freeze(primary); + rst.awaitNodesAgreeOnPrimary(undefined, undefined, secondary2); + + let hangAfterTruncate; + if (crashAfterRollbackTruncation) { + hangAfterTruncate = configureFailPoint(secondary1, 'hangAfterOplogTruncationInRollback'); + } + + for (const fp of stopReplProducerOnDocumentFailpoints) { + fp.off(); + } + + // Wait for secondary2 to be a writable primary. + rst.getPrimary(); + + // Do a write which becomes majority committed and wait for secondary1 to complete its rollback. + assert.commandWorked( + secondary2.getCollection("test.dummy").insert({}, {writeConcern: {w: 'majority'}})); + + if (crashAfterRollbackTruncation) { + // Entering rollback will close connections so we expect some network errors when waiting + // on the failpoint. + assert.soonNoExcept(() => { + hangAfterTruncate.wait(); + return true; + }, `failed to wait for fail point ${hangAfterTruncate.failPointName}`); + + // Crash the node after it performs oplog truncation. + rst.stop(secondary1, 9, {allowedExitCode: MongoRunner.EXIT_SIGKILL}); + rst.restart(secondary1, { + "noReplSet": false, + setParameter: 'failpoint.stopReplProducer=' + tojson({mode: 'alwaysOn'}) + }); + rst.waitForState(secondary1, ReplSetTest.State.SECONDARY); + secondary1.setSecondaryOk(); + // On startup, we expect to see the update persisted in the 'config.transactions' table. 
+ const restoredDoc = + secondary1.getCollection('config.transactions').findOne({"_id.id": lsid.id}); + assert.neq(null, restoredDoc); + secondary1.adminCommand({configureFailPoint: "stopReplProducer", mode: "off"}); + } + + // Reconnect to secondary1 after it completes its rollback and step it up to be the new primary. + rst.awaitNodesAgreeOnPrimary(undefined, undefined, secondary2); + assert.commandWorked(secondary1.adminCommand({replSetFreeze: 0})); + rst.stepUp(secondary1, {awaitWritablePrimary: false}); + + const docBeforeRetry = secondary1.getCollection(ns).findOne({_id: 0}); + assert.eq(docBeforeRetry, {_id: 0, counter: counterMajorityCommitted}); + + assert.commandWorked(secondary1.getCollection(ns).runCommand("update", { + updates: Array.from({length: counterTotal}, () => ({q: {_id: 0}, u: {$inc: {counter: 1}}})), + lsid, + txnNumber: NumberLong(1), + writeConcern: {w: 5}, + })); + + const docAfterRetry = secondary1.getCollection(ns).findOne({_id: 0}); + assert.eq(docAfterRetry, {_id: 0, counter: counterTotal}); + + rst.stopSet(); +} + +// Test the general scenario where we perform the appropriate update to the 'config.transactions' +// table during rollback. +runTest(false); +// Extends the test to crash the secondary in the middle of rollback right after oplog truncation. +// We assert that the update made to the 'config.transactions' table persisted on startup. +runTest(true); +})();
\ No newline at end of file diff --git a/jstests/replsets/rollback_with_coalesced_txn_table_updates_from_vectored_inserts.js b/jstests/replsets/rollback_with_coalesced_txn_table_updates_from_vectored_inserts.js new file mode 100644 index 00000000000..e6ca0056c31 --- /dev/null +++ b/jstests/replsets/rollback_with_coalesced_txn_table_updates_from_vectored_inserts.js @@ -0,0 +1,115 @@ +/** + * Tests that the rollback procedure will update the 'config.transactions' table to be consistent + * with the node data at the 'stableTimestamp', specifically in the case where multiple derived ops + * to the 'config.transactions' table were coalesced into a single operation when performing + * vectored inserts on the primary. + */ + +(function() { +load("jstests/libs/fail_point_util.js"); +load("jstests/libs/write_concern_util.js"); + +const rst = new ReplSetTest({ + nodes: { + // Set the syncdelay to 1s to speed up checkpointing. + n0: {syncdelay: 1}, + // Set the bgSyncOplogFetcherBatchSize to 1 oplog entry to guarantee replication progress + // with the stopReplProducerOnDocument failpoint. + n1: {setParameter: {bgSyncOplogFetcherBatchSize: 1}}, + n2: {setParameter: {bgSyncOplogFetcherBatchSize: 1}}, + }, + // Force secondaries to sync from the primary to guarantee replication progress with the + // stopReplProducerOnDocument failpoint. Also disable primary catchup because some replicated + // retryable write statements are intentionally not being made majority committed. + settings: {chainingAllowed: false, catchUpTimeoutMillis: 0}, +}); +rst.startSet(); +rst.initiate(); + +const primary = rst.getPrimary(); +const ns = "test.retryable_write_coalesced_txn_updates"; +assert.commandWorked(primary.getCollection(ns).insert({_id: -1}, {writeConcern: {w: 3}})); + +const [secondary1, secondary2] = rst.getSecondaries(); + +// Disable replication partway into the retryable write on all of the secondaries. 
The idea is that +// while the primary will apply all of the writes in a single storage transaction, the secondaries +// will only apply up to insertBatchMajorityCommitted oplog entries. +const insertBatchTotal = 20; +const insertBatchMajorityCommitted = insertBatchTotal - 2; +const stopReplProducerOnDocumentFailpoints = [secondary1, secondary2].map( + conn => configureFailPoint( + conn, 'stopReplProducerOnDocument', {document: {"_id": insertBatchMajorityCommitted + 1}})); + +const lsid = ({id: UUID()}); + +assert.commandWorked(primary.getCollection(ns).runCommand("insert", { + documents: Array.from({length: insertBatchTotal}, (_, i) => ({_id: i})), + lsid, + txnNumber: NumberLong(1), +})); + +const stmtMajorityCommitted = + primary.getCollection("local.oplog.rs").findOne({ns, "o._id": insertBatchMajorityCommitted}); +assert.neq(null, stmtMajorityCommitted); + +// Wait for the primary to have advanced its stable_timestamp. +assert.soon(() => { + const {lastStableRecoveryTimestamp} = + assert.commandWorked(primary.adminCommand({replSetGetStatus: 1})); + + const wtStatus = assert.commandWorked(primary.adminCommand({serverStatus: 1})).wiredTiger; + const latestMajority = + wtStatus["snapshot-window-settings"]["latest majority snapshot timestamp available"]; + + print(`${primary.host}: ${tojsononeline({ + lastStableRecoveryTimestamp, + stmtMajorityCommittedTimestamp: stmtMajorityCommitted.ts, + "latest majority snapshot timestamp available": latestMajority + })}`); + + // Make sure 'secondary1' has a 'lastApplied' optime equal to 'stmtMajorityCommitted.ts'. + // Otherwise, it can fail to win the election later. 
+ const {optimes: {appliedOpTime}} = + assert.commandWorked(secondary1.adminCommand({replSetGetStatus: 1})); + print(`${secondary1.host}: ${tojsononeline({appliedOpTime})}`); + + return bsonWoCompare(lastStableRecoveryTimestamp, stmtMajorityCommitted.ts) >= 0 && + bsonWoCompare(appliedOpTime.ts, stmtMajorityCommitted.ts) >= 0; +}); + +// Step up one of the secondaries and do a write which becomes majority committed to force the +// current primary to go into rollback. +assert.commandWorked(secondary1.adminCommand({replSetStepUp: 1})); +rst.freeze(primary); +rst.awaitNodesAgreeOnPrimary(undefined, undefined, secondary1); + +for (const fp of stopReplProducerOnDocumentFailpoints) { + fp.off(); +} + +rst.getPrimary(); // Wait for secondary1 to be a writable primary. + +// Do a write which becomes majority committed and wait for the old primary to have completed its +// rollback. +assert.commandWorked(secondary1.getCollection("test.dummy").insert({}, {writeConcern: {w: 3}})); + +// Reconnect to the primary after it completes its rollback and step it up to be the primary again. +rst.awaitNodesAgreeOnPrimary(undefined, undefined, secondary1); +assert.commandWorked(primary.adminCommand({replSetFreeze: 0})); +rst.stepUp(primary); + +print(`${primary.host} session txn record: ${ + tojson(primary.getCollection("config.transactions").findOne({"_id.id": lsid.id}))}`); + +// Make sure we don't re-execute operations that have already been inserted by making sure we +// don't get a 'DuplicateKeyError'. +assert.commandWorked(primary.getCollection(ns).runCommand("insert", { + documents: Array.from({length: insertBatchTotal}, (_, i) => ({_id: i})), + lsid, + txnNumber: NumberLong(1), + writeConcern: {w: 3}, +})); + +rst.stopSet(); +})();
\ No newline at end of file diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp index c9d4aa3c381..2bc00bfefe2 100644 --- a/src/mongo/db/repl/replication_recovery.cpp +++ b/src/mongo/db/repl/replication_recovery.cpp @@ -60,6 +60,8 @@ namespace mongo { namespace repl { +MONGO_FAIL_POINT_DEFINE(hangAfterOplogTruncationInRollback); + namespace { const auto kRecoveryBatchLogLevel = logv2::LogSeverity::Debug(2); @@ -449,6 +451,8 @@ void ReplicationRecoveryImpl::recoverFromOplog(OperationContext* opCtx, // This may take an IS lock on the oplog collection. _truncateOplogIfNeededAndThenClearOplogTruncateAfterPoint(opCtx, &stableTimestamp); + hangAfterOplogTruncationInRollback.pauseWhileSet(); + auto topOfOplogSW = _getTopOfOplog(opCtx); if (topOfOplogSW.getStatus() == ErrorCodes::CollectionIsEmpty || topOfOplogSW.getStatus() == ErrorCodes::NamespaceNotFound) { diff --git a/src/mongo/db/repl/rollback_impl.cpp b/src/mongo/db/repl/rollback_impl.cpp index 1ce1bb85b8e..14be427c59c 100644 --- a/src/mongo/db/repl/rollback_impl.cpp +++ b/src/mongo/db/repl/rollback_impl.cpp @@ -43,7 +43,10 @@ #include "mongo/db/commands.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/replication_state_transition_lock_guard.h" +#include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/db_raii.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/dbhelpers.h" #include "mongo/db/index_builds_coordinator.h" #include "mongo/db/kill_sessions_local.h" #include "mongo/db/logical_time_validator.h" @@ -457,6 +460,80 @@ StatusWith<std::set<NamespaceString>> RollbackImpl::_namespacesForOp(const Oplog return namespaces; } +void RollbackImpl::_restoreTxnsTableEntryFromRetryableWrites(OperationContext* opCtx, + Timestamp stableTimestamp) { + auto client = std::make_unique<DBDirectClient>(opCtx); + // Query for retryable writes oplog entries with a non-null 'prevWriteOpTime' value + // less than or equal 
to the 'stableTimestamp'. This query intends to include no-op + // retryable writes oplog entries that have been applied through a migration process. + const auto filter = BSON("op" << BSON("$in" << BSON_ARRAY("i" + << "u" + << "d"))); + // We use the 'fromMigrate' field to differentiate migrated retryable writes entries from + // transactions entries. + const auto filterFromMigration = BSON("op" + << "n" + << "fromMigrate" << true); + auto cursor = client->query( + NamespaceString::kRsOplogNamespace, + QUERY("ts" << BSON("$gt" << stableTimestamp) << "txnNumber" << BSON("$exists" << true) + << "stmtId" << BSON("$exists" << true) << "prevOpTime.ts" + << BSON("$gte" << Timestamp(1, 0) << "$lte" << stableTimestamp) << "$or" + << BSON_ARRAY(filter << filterFromMigration))); + while (cursor->more()) { + auto doc = cursor->next(); + auto swEntry = OplogEntry::parse(doc); + fassert(5530502, swEntry.isOK()); + auto entry = swEntry.getValue(); + auto prevWriteOpTime = *entry.getPrevWriteOpTimeInTransaction(); + OperationSessionInfo opSessionInfo = entry.getOperationSessionInfo(); + const auto sessionId = *opSessionInfo.getSessionId(); + const auto txnNumber = *opSessionInfo.getTxnNumber(); + const auto wallClockTime = entry.getWallClockTime(); + + invariant(!prevWriteOpTime.isNull() && prevWriteOpTime.getTimestamp() <= stableTimestamp); + // This is a retryable writes oplog entry with a non-null 'prevWriteOpTime' value that + // is less than or equal to the 'stableTimestamp'. 
+ LOGV2(5530501, + "Restoring sessions entry to be consistent with 'stableTimestamp'", + "stableTimestamp"_attr = stableTimestamp, + "sessionId"_attr = sessionId, + "txnNumber"_attr = txnNumber, + "lastWriteOpTime"_attr = prevWriteOpTime); + SessionTxnRecord sessionTxnRecord; + sessionTxnRecord.setSessionId(sessionId); + sessionTxnRecord.setTxnNum(txnNumber); + try { + TransactionHistoryIterator iter(prevWriteOpTime); + auto nextOplogEntry = iter.next(opCtx); + sessionTxnRecord.setLastWriteOpTime(nextOplogEntry.getOpTime()); + sessionTxnRecord.setLastWriteDate(nextOplogEntry.getWallClockTime()); + } catch (ExceptionFor<ErrorCodes::IncompleteTransactionHistory>&) { + // It's possible that the next entry in the oplog chain has been truncated due to + // oplog cap maintenance. + sessionTxnRecord.setLastWriteOpTime(prevWriteOpTime); + sessionTxnRecord.setLastWriteDate(wallClockTime); + } + const auto nss = NamespaceString::kSessionTransactionsTableNamespace; + writeConflictRetry(opCtx, "updateSessionTransactionsTableInRollback", nss.ns(), [&] { + AutoGetCollection collection(opCtx, nss, MODE_IX); + auto filter = BSON(SessionTxnRecord::kSessionIdFieldName << sessionId.toBSON()); + UnreplicatedWritesBlock uwb(opCtx); + // Perform an untimestamped write so that it will not be rolled back on recovering + // to the 'stableTimestamp' if we were to crash. This is safe because this update is + // meant to be consistent with the 'stableTimestamp' and not the common point. + Helpers::upsert( + opCtx, nss.ns(), filter, sessionTxnRecord.toBSON(), /*fromMigrate=*/false); + }); + } + // Take a stable checkpoint so that writes to the 'config.transactions' table are + // persisted to disk before truncating the oplog. If we were to take an unstable checkpoint, we + // would have to update replication metadata like 'minValid.appliedThrough' to be consistent + // with the oplog. 
+ opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable(opCtx, + /*stableCheckpoint=*/true); +} + void RollbackImpl::_runPhaseFromAbortToReconstructPreparedTxns( OperationContext* opCtx, RollBackLocalOperations::RollbackCommonPoint commonPoint) noexcept { // Stop and wait for all background index builds to complete before starting the rollback @@ -514,6 +591,12 @@ void RollbackImpl::_runPhaseFromAbortToReconstructPreparedTxns( "update"_attr = _observerInfo.rollbackCommandCounts[kUpdateCmdName], "delete"_attr = _observerInfo.rollbackCommandCounts[kDeleteCmdName]); + // Retryable writes create derived updates to the transactions table which can be coalesced into + // one operation, so certain session operations history may be lost after restoring to the + // 'stableTimestamp'. We must scan the oplog and restore the transactions table entries to + // detail the last executed writes. + _restoreTxnsTableEntryFromRetryableWrites(opCtx, stableTimestamp); + // During replication recovery, we truncate all oplog entries with timestamps greater than the // oplog truncate after point. If we entered rollback, we are guaranteed to have at least one // oplog entry after the common point. diff --git a/src/mongo/db/repl/rollback_impl.h b/src/mongo/db/repl/rollback_impl.h index 4d95e5e7e4e..9fe3206f4f7 100644 --- a/src/mongo/db/repl/rollback_impl.h +++ b/src/mongo/db/repl/rollback_impl.h @@ -367,6 +367,17 @@ private: void _stopAndWaitForIndexBuilds(OperationContext* opCtx); /** + * Performs a forward scan of the oplog starting at 'stableTimestamp', exclusive. For every + * retryable write oplog entry that has a 'prevOpTime' <= 'stableTimestamp', update the + * transactions table with the appropriate information to detail the last executed operation. We + * do this because derived updates to the transactions table can be coalesced into one + * operation, and so certain session entry updates may not exist when restoring to the + * 'stableTimestamp'. 
+ */ + void _restoreTxnsTableEntryFromRetryableWrites(OperationContext* opCtx, + Timestamp stableTimestamp); + + /** * Recovers to the stable timestamp while holding the global exclusive lock. * Returns the stable timestamp that the storage engine recovered to. */ diff --git a/src/mongo/db/repl/rollback_impl_test.cpp b/src/mongo/db/repl/rollback_impl_test.cpp index 767c0e81703..f03faaa390b 100644 --- a/src/mongo/db/repl/rollback_impl_test.cpp +++ b/src/mongo/db/repl/rollback_impl_test.cpp @@ -410,6 +410,60 @@ OplogInterfaceMock::Operation makeOpAndRecordId(int count) { } /** + * Helper to create a noop entry that represents a migrated retryable write or transaction oplog + * entry. + */ +BSONObj makeMigratedNoop(OpTime opTime, + boost::optional<BSONObj> o2, + LogicalSessionId lsid, + int txnNum, + OpTime prevOpTime, + boost::optional<int> stmtId, + int wallClockMillis, + bool isRetryableWrite) { + repl::MutableOplogEntry op; + op.setOpType(repl::OpTypeEnum::kNoop); + op.setNss(nss); + op.setObject(BSONObj()); + op.setOpTime(opTime); + if (isRetryableWrite) { + op.setFromMigrate(true); + } + op.setObject2(o2); + if (stmtId) { + op.setStatementIds({*stmtId}); + } + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(lsid); + sessionInfo.setTxnNumber(txnNum); + op.setOperationSessionInfo(sessionInfo); + op.setPrevWriteOpTimeInTransaction(prevOpTime); + op.setWallClockTime(Date_t::fromMillisSinceEpoch(wallClockMillis)); + return op.toBSON(); +} + +/** + * Helper to create a transaction command oplog entry. 
+ */ +BSONObj makeTransactionOplogEntry(OpTime opTime, + LogicalSessionId lsid, + int txnNum, + OpTime prevOpTime) { + repl::MutableOplogEntry op; + op.setOpType(repl::OpTypeEnum::kCommand); + op.setNss(nss); + op.setObject(BSON("applyOps" << BSONArray())); + op.setOpTime(opTime); + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(lsid); + sessionInfo.setTxnNumber(txnNum); + op.setOperationSessionInfo(sessionInfo); + op.setPrevWriteOpTimeInTransaction(prevOpTime); + op.setWallClockTime(Date_t()); + return op.toBSON(); +} + +/** * Asserts that the documents in the oplog have the given timestamps. */ void _assertDocsInOplog(OperationContext* opCtx, std::vector<int> timestamps) { @@ -1524,6 +1578,282 @@ TEST_F(RollbackImplTest, RollbackFixesCountForUnpreparedTransactionApplyOpsChain ASSERT_EQ(_storageInterface->getFinalCollectionCount(collId), 1); } +TEST_F(RollbackImplTest, RollbackRestoresTxnTableEntryToBeConsistentWithStableTimestamp) { + const auto collUuid = UUID::gen(); + const auto nss = NamespaceString::kSessionTransactionsTableNamespace; + _initializeCollection(_opCtx.get(), collUuid, nss); + LogicalSessionFromClient fromClient{}; + fromClient.setId(UUID::gen()); + LogicalSessionId lsid = makeLogicalSessionId(fromClient, _opCtx.get()); + LogicalSessionFromClient fromClient2{}; + fromClient2.setId(UUID::gen()); + LogicalSessionId lsid2 = makeLogicalSessionId(fromClient2, _opCtx.get()); + LogicalSessionFromClient fromClient3{}; + fromClient3.setId(UUID::gen()); + LogicalSessionId lsid3 = makeLogicalSessionId(fromClient3, _opCtx.get()); + + auto commonOpTime = OpTime(Timestamp(4, 4), 4); + auto commonPoint = makeOpAndRecordId(commonOpTime); + _remoteOplog->setOperations({commonPoint}); + _storageInterface->setStableTimestamp(nullptr, commonOpTime.getTimestamp()); + + auto insertObj1 = BSON("_id" << 1); + auto insertObj2 = BSON("_id" << 2); + const auto txnNumOne = 1LL; + // Create retryable write oplog entry before 'stableTimestamp'. 
+ auto opBeforeStableTs = makeInsertOplogEntry(1, insertObj1, nss.ns(), collUuid); + const auto prevOpTime = OpTime(opBeforeStableTs["ts"].timestamp(), 1); + BSONObjBuilder opBeforeStableTsBuilder(opBeforeStableTs); + opBeforeStableTsBuilder.append("lsid", lsid.toBSON()); + opBeforeStableTsBuilder.append("txnNumber", txnNumOne); + opBeforeStableTsBuilder.append("prevOpTime", OpTime().toBSON()); + opBeforeStableTsBuilder.append("stmtId", 1); + BSONObj oplogEntryBeforeStableTs = opBeforeStableTsBuilder.done(); + + const auto txnNumTwo = 2LL; + // Create no-op retryable write entry before 'stableTimestamp'. + auto noopPrevOpTime = OpTime(Timestamp(2, 2), 2); + auto noopEntryBeforeStableTs = makeMigratedNoop(noopPrevOpTime, + oplogEntryBeforeStableTs, + lsid2, + txnNumTwo, + OpTime(), + 1 /* stmtId */, + 2 /* wallClockMillis */, + true /* isRetryableWrite */); + + // Create transactions entry before 'stableTimestamp'. Transactions entries are of 'command' op + // type. + auto txnOpTime = OpTime(Timestamp(3, 3), 3); + auto txnEntryBeforeStableTs = makeTransactionOplogEntry(txnOpTime, lsid3, 3, OpTime()); + + // Create retryable write oplog entry after 'stableTimestamp'. + auto firstOpAfterStableTs = makeInsertOplogEntry(5, insertObj2, nss.ns(), collUuid); + BSONObjBuilder opAfterStableTsBuilder(firstOpAfterStableTs); + opAfterStableTsBuilder.append("lsid", lsid.toBSON()); + opAfterStableTsBuilder.append("txnNumber", txnNumOne); + // 'prevOpTime' points to 'oplogEntryBeforeStableTs'. + opAfterStableTsBuilder.append("prevOpTime", prevOpTime.toBSON()); + opAfterStableTsBuilder.append("stmtId", 2); + BSONObj firstOplogEntryAfterStableTs = opAfterStableTsBuilder.done(); + + // Create no-op retryable write entry after 'stableTimestamp'. 
+ auto noopEntryAfterStableTs = makeMigratedNoop(OpTime(Timestamp(6, 6), 6), + firstOplogEntryAfterStableTs, + lsid2, + txnNumTwo, + noopPrevOpTime, + 2 /* stmtId */, + 5 /* wallClockMillis */, + true /* isRetryableWrite */); + + // Create transactions entry after 'stableTimestamp'. Transactions entries are of 'command' op + // type. + auto txnEntryAfterStableTs = + makeTransactionOplogEntry(OpTime(Timestamp(7, 7), 7), lsid3, 3, txnOpTime); + + ASSERT_OK(_insertOplogEntry(oplogEntryBeforeStableTs)); + ASSERT_OK(_insertOplogEntry(noopEntryBeforeStableTs)); + ASSERT_OK(_insertOplogEntry(txnEntryBeforeStableTs)); + ASSERT_OK(_insertOplogEntry(commonPoint.first)); + ASSERT_OK(_insertOplogEntry(firstOplogEntryAfterStableTs)); + ASSERT_OK(_insertOplogEntry(noopEntryAfterStableTs)); + ASSERT_OK(_insertOplogEntry(txnEntryAfterStableTs)); + + auto status = _storageInterface->findSingleton(_opCtx.get(), nss); + // The 'config.transactions' table is currently empty. + ASSERT_NOT_OK(status); + + // Doing a rollback should upsert two entries into the 'config.transactions' table. + ASSERT_OK(_rollback->runRollback(_opCtx.get())); + auto swDoc = _storageInterface->findById( + _opCtx.get(), nss, firstOplogEntryAfterStableTs.getField("lsid")); + ASSERT_OK(swDoc); + auto sessionsEntryBson = swDoc.getValue(); + // New sessions entry should match the session information retrieved from the retryable writes + // oplog entry from before the 'stableTimestamp'. 
+ ASSERT_EQUALS(sessionsEntryBson["txnNum"].numberInt(), + oplogEntryBeforeStableTs["txnNumber"].numberInt()); + ASSERT_EQUALS(sessionsEntryBson["lastWriteOpTime"].timestamp(), + oplogEntryBeforeStableTs["prevOpTime"].timestamp()); + ASSERT_EQUALS(sessionsEntryBson["lastWriteDate"].date(), + oplogEntryBeforeStableTs["wall"].date()); + + swDoc = + _storageInterface->findById(_opCtx.get(), nss, noopEntryBeforeStableTs.getField("lsid")); + ASSERT_OK(swDoc); + sessionsEntryBson = swDoc.getValue(); + // New sessions entry should match the session information retrieved from the noop retryable + // writes oplog entry from before the 'stableTimestamp'. + ASSERT_EQUALS(sessionsEntryBson["txnNum"].numberInt(), + noopEntryBeforeStableTs["txnNumber"].numberInt()); + ASSERT_EQUALS(sessionsEntryBson["lastWriteOpTime"].timestamp(), + noopEntryBeforeStableTs["prevOpTime"].timestamp()); + ASSERT_EQUALS(sessionsEntryBson["lastWriteDate"].date(), + noopEntryBeforeStableTs["wall"].date()); + + // 'lsid3' does not get restored because it is not a retryable writes entry. 
+ status = _storageInterface->findById(_opCtx.get(), nss, txnEntryAfterStableTs.getField("lsid")); + ASSERT_NOT_OK(status); +} + +TEST_F( + RollbackImplTest, + RollbackRestoresTxnTableEntryToBeConsistentWithStableTimestampWithMissingPrevWriteOplogEntry) { + const auto collUuid = UUID::gen(); + const auto nss = NamespaceString::kSessionTransactionsTableNamespace; + _initializeCollection(_opCtx.get(), collUuid, nss); + LogicalSessionFromClient fromClient{}; + fromClient.setId(UUID::gen()); + LogicalSessionId lsid = makeLogicalSessionId(fromClient, _opCtx.get()); + LogicalSessionFromClient fromClient2{}; + fromClient2.setId(UUID::gen()); + LogicalSessionId lsid2 = makeLogicalSessionId(fromClient2, _opCtx.get()); + + auto commonOpTime = OpTime(Timestamp(2, 2), 1); + auto commonPoint = makeOpAndRecordId(OpTime(Timestamp(2, 2), 1)); + _remoteOplog->setOperations({commonPoint}); + _storageInterface->setStableTimestamp(nullptr, commonOpTime.getTimestamp()); + + // Create oplog entry after 'stableTimestamp'. + auto insertObj = BSON("_id" << 1); + auto firstOpAfterStableTs = makeInsertOplogEntry(3, insertObj, nss.ns(), collUuid); + BSONObjBuilder builder(firstOpAfterStableTs); + builder.append("lsid", lsid.toBSON()); + builder.append("txnNumber", 2LL); + // 'prevOpTime' points to an opTime not found in the oplog. This can happen in practice + // due to the 'OplogCapMaintainerThread' truncating entries from before the 'stableTimestamp'. + builder.append("prevOpTime", OpTime(Timestamp(1, 0), 1).toBSON()); + builder.append("stmtId", 2); + BSONObj firstOplogEntryAfterStableTs = builder.done(); + + // Create no-op entry after 'stableTimestamp'. + // 'prevOpTime' points to an opTime not found in the oplog. 
+ auto noopEntryAfterStableTs = makeMigratedNoop(OpTime(Timestamp(5, 5), 5), + firstOplogEntryAfterStableTs, + lsid2, + 3 /* txnNum */, + OpTime(Timestamp(2, 1), 1), + 2 /* stmtId */, + 5 /* wallClockMillis */, + true /*isRetryableWrite */); + + ASSERT_OK(_insertOplogEntry(commonPoint.first)); + ASSERT_OK(_insertOplogEntry(firstOplogEntryAfterStableTs)); + ASSERT_OK(_insertOplogEntry(noopEntryAfterStableTs)); + + auto status = _storageInterface->findSingleton(_opCtx.get(), nss); + // The 'config.transactions' table is currently empty. + ASSERT_NOT_OK(status); + + // Doing a rollback should upsert two entries into the 'config.transactions' table. + ASSERT_OK(_rollback->runRollback(_opCtx.get())); + auto swDoc = _storageInterface->findById( + _opCtx.get(), nss, firstOplogEntryAfterStableTs.getField("lsid")); + ASSERT_OK(swDoc); + auto sessionsEntryBson = swDoc.getValue(); + // New sessions entry should match the session information retrieved from the retryable writes + // oplog entry from after the 'stableTimestamp'. + ASSERT_EQUALS(sessionsEntryBson["txnNum"].numberInt(), + firstOplogEntryAfterStableTs["txnNumber"].numberInt()); + ASSERT_EQUALS(sessionsEntryBson["lastWriteOpTime"].timestamp(), + firstOplogEntryAfterStableTs["prevOpTime"].timestamp()); + ASSERT_EQUALS(sessionsEntryBson["lastWriteDate"].date(), + firstOplogEntryAfterStableTs["wall"].date()); + + swDoc = _storageInterface->findById(_opCtx.get(), nss, noopEntryAfterStableTs.getField("lsid")); + ASSERT_OK(swDoc); + sessionsEntryBson = swDoc.getValue(); + // New sessions entry should match the session information retrieved from the retryable writes + // no-op oplog entry from after the 'stableTimestamp'. 
+ ASSERT_EQUALS(sessionsEntryBson["txnNum"].numberInt(), + noopEntryAfterStableTs["txnNumber"].numberInt()); + ASSERT_EQUALS(sessionsEntryBson["lastWriteOpTime"].timestamp(), + noopEntryAfterStableTs["prevOpTime"].timestamp()); + ASSERT_EQUALS(sessionsEntryBson["lastWriteDate"].date(), noopEntryAfterStableTs["wall"].date()); +} + +TEST_F(RollbackImplTest, RollbackDoesNotRestoreTxnsTableWhenNoRetryableWritesEntriesAfterStableTs) { + const auto collUuid = UUID::gen(); + const auto nss = NamespaceString::kSessionTransactionsTableNamespace; + _initializeCollection(_opCtx.get(), collUuid, nss); + LogicalSessionFromClient fromClient{}; + fromClient.setId(UUID::gen()); + LogicalSessionId lsid = makeLogicalSessionId(fromClient, _opCtx.get()); + LogicalSessionFromClient fromClient2{}; + fromClient2.setId(UUID::gen()); + LogicalSessionId lsid2 = makeLogicalSessionId(fromClient2, _opCtx.get()); + + auto commonOpTime = OpTime(Timestamp(4, 4), 4); + auto commonPoint = makeOpAndRecordId(commonOpTime); + _remoteOplog->setOperations({commonPoint}); + _storageInterface->setStableTimestamp(nullptr, commonOpTime.getTimestamp()); + + auto insertObj1 = BSON("_id" << 1); + auto insertObj2 = BSON("_id" << 2); + const auto txnNumOne = 1LL; + // Create oplog entry before 'stableTimestamp'. + auto opBeforeStableTs = makeInsertOplogEntry(1, insertObj1, nss.ns(), collUuid); + BSONObjBuilder opBeforeStableTsBuilder(opBeforeStableTs); + opBeforeStableTsBuilder.append("lsid", lsid.toBSON()); + opBeforeStableTsBuilder.append("txnNumber", txnNumOne); + opBeforeStableTsBuilder.append("prevOpTime", OpTime().toBSON()); + opBeforeStableTsBuilder.append("stmtId", 1); + BSONObj oplogEntryBeforeStableTs = opBeforeStableTsBuilder.done(); + + const auto txnNumTwo = 2LL; + // Create no-op entry before 'stableTimestamp'. 
+ auto noopEntryBeforeStableTs = makeMigratedNoop(OpTime(Timestamp(2, 2), 2), + oplogEntryBeforeStableTs, + lsid2, + txnNumTwo, + OpTime(), + 1 /* stmtId*/, + 2 /* wallClockMillis */, + true /* isRetryableWrite */); + + // Create migrated no-op transactions entry before 'stableTimestamp'. + auto txnOpTime = OpTime(Timestamp(3, 3), 3); + auto txnEntryBeforeStableTs = makeMigratedNoop(txnOpTime, + BSONObj(), + lsid, + txnNumTwo, + OpTime(), + boost::none /* stmtId */, + 4 /* wallClockMillis */, + false /* isRetryableWrite */); + + // Create regular non-retryable write oplog entry after 'stableTimestamp'. + auto insertEntry = makeInsertOplogEntry(7, BSON("_id" << 3), nss.ns(), collUuid); + + // Create migrated no-op transactions entry after 'stableTimestamp'. + auto txnEntryAfterStableTs = makeMigratedNoop(txnOpTime, + BSONObj(), + lsid, + txnNumTwo, + txnOpTime, + boost::none /* stmtId */, + 10 /* wallClockMillis */, + false /* isRetryableWrite */); + + ASSERT_OK(_insertOplogEntry(oplogEntryBeforeStableTs)); + ASSERT_OK(_insertOplogEntry(noopEntryBeforeStableTs)); + ASSERT_OK(_insertOplogEntry(txnEntryBeforeStableTs)); + ASSERT_OK(_insertOplogEntry(commonPoint.first)); + ASSERT_OK(_insertOplogEntry(insertEntry)); + ASSERT_OK(_insertOplogEntry(txnEntryAfterStableTs)); + + auto status = _storageInterface->findSingleton(_opCtx.get(), nss); + // The 'config.transactions' table is currently empty. + ASSERT_NOT_OK(status); + + // Doing a rollback should not restore any entries into the 'config.transactions' table. + ASSERT_OK(_rollback->runRollback(_opCtx.get())); + status = _storageInterface->findSingleton(_opCtx.get(), nss); + // No upserts were made to the 'config.transactions' table. + ASSERT_NOT_OK(status); +} + /** * Fixture to help test that rollback records the correct information in its RollbackObserverInfo * struct. |