diff options
-rw-r--r-- | jstests/replsets/startup_recovery_for_restore.js | 160 | ||||
-rw-r--r-- | jstests/replsets/startup_recovery_for_restore_needs_rollback.js | 135 | ||||
-rw-r--r-- | jstests/replsets/startup_recovery_for_restore_restarts.js | 180 | ||||
-rw-r--r-- | src/mongo/db/mongod_main.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_server_parameters.idl | 11 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_recovery.cpp | 104 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_recovery.h | 17 | ||||
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp | 6 |
9 files changed, 599 insertions, 19 deletions
diff --git a/jstests/replsets/startup_recovery_for_restore.js b/jstests/replsets/startup_recovery_for_restore.js new file mode 100644 index 00000000000..bd5b5a98db7 --- /dev/null +++ b/jstests/replsets/startup_recovery_for_restore.js @@ -0,0 +1,160 @@ +/* + * Tests that we can recover from a node with a lagged stable timestamp using the special + * "for restore" mode, but not read from older points-in-time on the recovered node. + * + * This test only makes sense for storage engines that support recover to stable timestamp. + * @tags: [requires_wiredtiger, requires_persistence, requires_journaling, requires_replication, + * requires_majority_read_concern, uses_transactions, uses_prepare_transaction, + * # We don't expect to do this while upgrading. + * multiversion_incompatible] + */ + +(function() { +"use strict"; +load("jstests/libs/fail_point_util.js"); + +const dbName = TestData.testName; + +const logLevel = tojson({storage: {recovery: 2}}); + +const rst = new ReplSetTest({ + nodes: [{}, {}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0}}], + settings: {chainingAllowed: false} +}); + +const startParams = { + logComponentVerbosity: logLevel, + replBatchLimitOperations: 100 +}; +const nodes = rst.startSet({setParameter: startParams}); +let restoreNode = nodes[1]; +rst.initiateWithHighElectionTimeout(); +const primary = rst.getPrimary(); +const db = primary.getDB(dbName); +const collName = "testcoll"; +const sentinelCollName = "sentinelcoll"; +const coll = db[collName]; +const paddingStr = "XXXXXXXXX"; + +// Pre-load some documents. +const nPreDocs = 2; +coll.insert([{_id: "pre1"}, {_id: "pre2"}]); +rst.awaitReplication(); + +const holdOpTime = assert.commandWorked(db.runCommand({find: collName, limit: 1})).operationTime; + +// Keep the stable timestamp from moving on the node we're going to restart in restore mode. +assert.commandWorked(restoreNode.adminCommand({ + configureFailPoint: 'holdStableTimestampAtSpecificTimestamp', + mode: 'alwaysOn', + data: {"timestamp": holdOpTime} +})); + +// Insert a bunch of documents. +let bulk = coll.initializeUnorderedBulkOp(); +const nDocs = 1000; +jsTestLog("Inserting " + nDocs + " documents with snapshotting disabled on one node."); +for (let id = 1; id <= nDocs; id++) { + bulk.insert({_id: id, paddingStr: paddingStr}); +} +bulk.execute(); +rst.awaitReplication(); + +jsTestLog("Stopping replication on secondaries to hold back majority commit point."); +let stopReplProducer2 = configureFailPoint(nodes[2], 'stopReplProducer'); +let stopReplProducer3 = configureFailPoint(nodes[3], 'stopReplProducer'); + +jsTestLog("Writing first sentinel document."); +const sentinel1Timestamp = + assert.commandWorked(db.runCommand({insert: sentinelCollName, documents: [{_id: "s1"}]})) + .operationTime; + +const nExtraDocs = 50; +jsTestLog("Inserting " + nExtraDocs + " documents with majority point held back."); +bulk = coll.initializeUnorderedBulkOp(); +for (let id = 1; id <= nExtraDocs; id++) { + bulk.insert({_id: (id + nDocs), paddingStr: paddingStr}); +} +bulk.execute(); +const lastId = nDocs + nExtraDocs; + +const penultimateOpTime = + assert.commandWorked(db.runCommand({find: collName, limit: 1})).operationTime; + +const sentinel2Timestamp = + assert.commandWorked(db.runCommand({insert: sentinelCollName, documents: [{_id: "s2"}]})) + .operationTime; + +rst.awaitReplication(undefined, undefined, [restoreNode]); + +jsTestLog("Restarting restore node with the --startupRecoveryForRestore flag"); +restoreNode = rst.restart(restoreNode, { + noReplSet: true, + setParameter: Object.merge(startParams, { + startupRecoveryForRestore: true, + recoverFromOplogAsStandalone: true, + takeUnstableCheckpointOnShutdown: true + }) +}); +// Make sure we can read something after standalone recovery. +assert.eq(2, restoreNode.getDB(dbName)[sentinelCollName].find({}).itcount()); + +jsTestLog("Restarting restore node again, in repl set mode"); +restoreNode = rst.restart(restoreNode, {noReplSet: false, setParameter: startParams}); + +rst.awaitSecondaryNodes(undefined, [restoreNode]); +jsTestLog("Finished restarting restore node"); + +const restoreDb = restoreNode.getDB(dbName); + +jsTestLog("Checking restore node untimestamped read."); +// Basic test: should see all docs with untimestamped read. +assert.eq(nPreDocs + nDocs + nExtraDocs, coll.find().itcount()); +assert.eq(nPreDocs + nDocs + nExtraDocs, restoreDb[collName].find().itcount()); + +// For the remaining checks we step up the restored node so we can do atClusterTime reads on it. +// They are necessarily speculative because we are preventing majority optimes from advancing. + +jsTestLog("Stepping up restore node"); +rst.stepUp(restoreNode, {awaitReplicationBeforeStepUp: false}); + +// Should also be able to read at the final sentinel optime on restore node. +const restoreNodeSession = restoreNode.startSession({causalConsistency: false}); +restoreNodeSession.startTransaction( + {readConcern: {level: "snapshot", atClusterTime: sentinel2Timestamp}}); +const restoreNodeSessionDb = restoreNodeSession.getDatabase(dbName); +jsTestLog("Checking top-of-oplog read works on restored node."); + +let res = assert.commandWorked( + restoreNodeSessionDb.runCommand({find: collName, filter: {"_id": lastId}})); +assert.eq(1, res.cursor.firstBatch.length); +assert.docEq({_id: lastId, paddingStr: paddingStr}, res.cursor.firstBatch[0]); + +// Must abort because majority is not advancing. +restoreNodeSession.abortTransaction(); + +// Should NOT able to read at the first sentinel optime on the restore node. +restoreNodeSession.startTransaction( + {readConcern: {level: "snapshot", atClusterTime: sentinel1Timestamp}}); +jsTestLog( + "Checking restore node majority optime read, which should fail, because the restore node does not have that history."); +res = assert.commandFailedWithCode( + restoreNodeSessionDb.runCommand({find: collName, filter: {"_id": {"$gte": nDocs}}}), + ErrorCodes.SnapshotTooOld); +restoreNodeSession.abortTransaction(); + +// Should NOT able to read at the penultimate optime on the restore node either. +jsTestLog( + "Checking restore node top-of-oplog minus 1 read, which should fail, because the restore node does not have that history."); +restoreNodeSession.startTransaction( + {readConcern: {level: "snapshot", atClusterTime: penultimateOpTime}}); +res = assert.commandFailedWithCode( + restoreNodeSessionDb.runCommand({find: collName, filter: {"_id": lastId}}), + ErrorCodes.SnapshotTooOld); +restoreNodeSession.abortTransaction(); + +// Allow set to become current and shut down with ordinary dbHash verification. +stopReplProducer2.off(); +stopReplProducer3.off(); +rst.stopSet(); +})(); diff --git a/jstests/replsets/startup_recovery_for_restore_needs_rollback.js b/jstests/replsets/startup_recovery_for_restore_needs_rollback.js new file mode 100644 index 00000000000..88ae489d625 --- /dev/null +++ b/jstests/replsets/startup_recovery_for_restore_needs_rollback.js @@ -0,0 +1,135 @@ +/* + * Tests that if we recover from a node with a lagged stable timestamp using the special + * "for restore" mode, and there was a rollback within the recovered oplog, that we crash rather + * than attempt to use the node. + * + * This test only makes sense for storage engines that support recover to stable timestamp. + * @tags: [requires_wiredtiger, requires_persistence, requires_journaling, requires_replication, + * requires_majority_read_concern, uses_transactions, uses_prepare_transaction, + * # We don't expect to do this while upgrading. + * multiversion_incompatible] + */ + +(function() { +"use strict"; +load("jstests/libs/fail_point_util.js"); + +const dbName = TestData.testName; +const logLevel = tojson({storage: {recovery: 2}}); + +// The restore node is made non-voting so the majority is 2. +// Disable primary catch up since we want to force a rollback. +const rst = new ReplSetTest({ + nodes: [{}, {rsConfig: {votes: 0, priority: 0}}, {}, {}], + settings: {catchUpTimeoutMillis: 0, chainingAllowed: false} +}); + +const startParams = { + logComponentVerbosity: logLevel, + replBatchLimitOperations: 100 +}; +const nodes = rst.startSet({setParameter: startParams}); +let restoreNode = nodes[1]; +rst.initiateWithHighElectionTimeout(); +const primary = rst.getPrimary(); +const db = primary.getDB(dbName); +const collName = "testcoll"; +const sentinelCollName = "sentinelcoll"; +const coll = db[collName]; +const sentinelColl = db[sentinelCollName]; +const paddingStr = "XXXXXXXXX"; + +// Pre-load some documents. +coll.insert([{_id: "pre1"}, {_id: "pre2"}]); +rst.awaitReplication(); + +const holdOpTime = assert.commandWorked(db.runCommand({find: collName, limit: 1})).operationTime; + +// Keep the stable timestamp from moving on the node we're going to restart in restore mode. +assert.commandWorked(restoreNode.adminCommand({ + configureFailPoint: 'holdStableTimestampAtSpecificTimestamp', + mode: 'alwaysOn', + data: {"timestamp": holdOpTime} +})); + +// Insert a bunch of documents. +let bulk = coll.initializeUnorderedBulkOp(); +const nDocs = 1000; +jsTestLog("Inserting " + nDocs + " documents with snapshotting disabled on one node."); +for (let id = 1; id <= nDocs; id++) { + bulk.insert({_id: id, paddingStr: paddingStr}); +} +bulk.execute(); +rst.awaitReplication(); + +jsTestLog("Stopping replication on secondaries to hold back majority commit point."); +let stopReplProducer2 = configureFailPoint(nodes[2], 'stopReplProducer'); +let stopReplProducer3 = configureFailPoint(nodes[3], 'stopReplProducer'); + +const nExtraDocs = 50; +jsTestLog("Inserting " + nExtraDocs + " documents with majority point held back."); +bulk = coll.initializeUnorderedBulkOp(); +const lastId = nDocs + nExtraDocs; +for (let id = 1; id <= nExtraDocs; id++) { + bulk.insert({_id: (id + nDocs), paddingStr: paddingStr}); +} +bulk.execute(); +rst.awaitReplication(undefined, undefined, [restoreNode]); + +// Stop some nodes so we can force a rollback +rst.stop(primary); +rst.stop(restoreNode, undefined, undefined, {forRestart: true}); + +// Restart replication and step up a new primary. +stopReplProducer2.off(); +stopReplProducer3.off(); + +const newPrimary = nodes[2]; +// Must send stepUp command without using ReplSetTest helper, as ReplSetTest helper expects all +// nodes to be alive. +assert.commandWorked(newPrimary.adminCommand({replSetStepUp: 1})); +rst.awaitNodesAgreeOnPrimary(undefined, [nodes[2], nodes[3]]); +assert.soon(() => (rst.getPrimary() == newPrimary)); + +// Write some stuff to force a rollback +assert.commandWorked(newPrimary.getDB(dbName)[collName].insert({_id: "ForceRollback"})); +rst.awaitReplication(undefined, undefined, [nodes[3]]); + +// Bring the new node up in startupRecoveryForRestore mode. Since it can't see the set, this +// should succeed. + +jsTestLog("Restarting restore node with the --startupRecoveryForRestore flag"); +clearRawMongoProgramOutput(); +restoreNode = rst.start( + restoreNode, + { + noReplSet: true, + setParameter: Object.merge(startParams, { + startupRecoveryForRestore: true, + recoverFromOplogAsStandalone: true, + takeUnstableCheckpointOnShutdown: true, + 'failpoint.hangAfterCollectionInserts': + tojson({mode: 'alwaysOn', data: {collectionNS: sentinelColl.getFullName()}}), + + }) + }, + true /* restart */); +// Make sure we can read the last doc after standalone recovery. +assert.docEq({_id: lastId, paddingStr: paddingStr}, + restoreNode.getDB(dbName)[collName].findOne({_id: lastId})); + +clearRawMongoProgramOutput(); +jsTestLog("Restarting restore node again, in repl set mode"); +restoreNode = rst.restart(restoreNode, {noReplSet: false, setParameter: startParams}); + +// This node should not come back up, because it has no stable timestamp to recover to. +assert.soon(() => (rawMongoProgramOutput().search("UnrecoverableRollbackError") >= 0)); +// Hide the exit code from stopSet. +waitMongoProgram(parseInt(restoreNode.port)); + +// Remove the nodes which are down. +rst.remove(primary); +rst.remove(restoreNode); +// Shut down the set. +rst.stopSet(); +})(); diff --git a/jstests/replsets/startup_recovery_for_restore_restarts.js b/jstests/replsets/startup_recovery_for_restore_restarts.js new file mode 100644 index 00000000000..be2e3515812 --- /dev/null +++ b/jstests/replsets/startup_recovery_for_restore_restarts.js @@ -0,0 +1,180 @@ +/* + * Tests that we can recover from a node with a lagged stable timestamp using the special + * "for restore" mode, but not read from older points-in-time on the recovered node, and that + * we can do so even after we crash in the middle of an attempt to restore. + * + * This test only makes sense for storage engines that support recover to stable timestamp. + * @tags: [requires_wiredtiger, requires_persistence, requires_journaling, requires_replication, + * requires_majority_read_concern, uses_transactions, uses_prepare_transaction, + * # We don't expect to do this while upgrading. + * multiversion_incompatible] + */ + +(function() { +"use strict"; +load("jstests/libs/fail_point_util.js"); +const SIGKILL = 9; + +const dbName = TestData.testName; +const logLevel = tojson({storage: {recovery: 2}}); + +const rst = new ReplSetTest({ + nodes: [{}, {}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0}}], + settings: {chainingAllowed: false} +}); + +const startParams = { + logComponentVerbosity: logLevel, + replBatchLimitOperations: 100 +}; +const nodes = rst.startSet({setParameter: startParams}); +let restoreNode = nodes[1]; +rst.initiateWithHighElectionTimeout(); +const primary = rst.getPrimary(); +const db = primary.getDB(dbName); +const collName = "testcoll"; +const sentinelCollName = "sentinelcoll"; +const coll = db[collName]; +const sentinelColl = db[sentinelCollName]; +const paddingStr = "XXXXXXXXX"; + +// Pre-load some documents. + +const nDocs = 100; +let bulk = coll.initializeUnorderedBulkOp(); +for (let id = 1; id <= nDocs; id++) { + bulk.insert({_id: id, paddingStr: paddingStr}); +} +bulk.execute(); +rst.awaitReplication(); + +const holdOpTime = assert.commandWorked(db.runCommand({find: collName, limit: 1})).operationTime; + +// Keep the stable timestamp from moving on the node we're going to restart in restore mode. +assert.commandWorked(restoreNode.adminCommand({ + configureFailPoint: 'holdStableTimestampAtSpecificTimestamp', + mode: 'alwaysOn', + data: {"timestamp": holdOpTime} +})); + +// Do a bunch of updates, which are chosen so if one is missed, we'll know. +bulk = coll.initializeUnorderedBulkOp(); +const nUpdates = 1000; +const writeSentinelsAfter = 650; // This should be set to get us to the middle of a batch. +jsTestLog("Making " + nUpdates + " updates with snapshotting disabled on one node."); +for (let updateNo = 1; updateNo <= nUpdates; updateNo++) { + let id = (updateNo - 1) % nDocs + 1; + let updateField = "u" + updateNo; + let setDoc = {}; + setDoc[updateField] = updateNo; + bulk.find({_id: id}).updateOne({"$set": setDoc}); + if (updateNo == writeSentinelsAfter) { + // Write a bunch of inserts to the sentinel collection, which will be used to hang + // oplog application mid-batch. + bulk.execute(); + bulk = sentinelColl.initializeUnorderedBulkOp(); + for (let j = 1; j <= 100; j++) { + bulk.insert({_id: j}); + } + bulk.execute(); + bulk = coll.initializeUnorderedBulkOp(); + } +} +bulk.execute(); +rst.awaitReplication(); + +jsTestLog("Stopping replication on secondaries to hold back majority commit point."); +let stopReplProducer2 = configureFailPoint(nodes[2], 'stopReplProducer'); +let stopReplProducer3 = configureFailPoint(nodes[3], 'stopReplProducer'); + +const nExtraDocs = 50; +jsTestLog("Inserting " + nExtraDocs + " documents with majority point held back."); +bulk = coll.initializeUnorderedBulkOp(); +const lastId = nDocs + nExtraDocs; +for (let id = 1; id <= nExtraDocs; id++) { + bulk.insert({_id: (id + nDocs), paddingStr: paddingStr}); +} +bulk.execute(); + +const penultimateOpTime = + assert.commandWorked(db.runCommand({find: collName, limit: 1})).operationTime; + +const sentinel2Timestamp = + assert.commandWorked(db.runCommand({insert: sentinelCollName, documents: [{_id: "s2"}]})) + .operationTime; + +rst.awaitReplication(undefined, undefined, [restoreNode]); + +jsTestLog("Restarting restore node with the --startupRecoveryForRestore flag"); +// Must use stop/start with waitForConnect: false. See SERVER-56446 +rst.stop(restoreNode, undefined, undefined, {forRestart: true}); +clearRawMongoProgramOutput(); +rst.start(restoreNode, + { + noReplSet: true, + waitForConnect: false, + syncdelay: 1, // Take a lot of unstable checkpoints. + setParameter: Object.merge(startParams, { + startupRecoveryForRestore: true, + recoverFromOplogAsStandalone: true, + takeUnstableCheckpointOnShutdown: true, + 'failpoint.hangAfterCollectionInserts': + tojson({mode: 'alwaysOn', data: {collectionNS: sentinelColl.getFullName()}}), + + }) + }, + true /* restart */); +assert.soon(() => { // Can't use checklog because we can't connect to the mongo in startup mode. + return rawMongoProgramOutput().search("hangAfterCollectionInserts fail point enabled") !== -1; +}); +assert.soon(() => { + return rawMongoProgramOutput().search("Completed unstable checkpoint.") !== -1; +}); + +jsTestLog("Restarting restore node uncleanly"); +rst.stop(restoreNode, SIGKILL, {allowedExitCode: MongoRunner.EXIT_SIGKILL}, {forRestart: true}); +restoreNode = rst.start(restoreNode, + { + noReplSet: true, + setParameter: Object.merge(startParams, { + startupRecoveryForRestore: true, + recoverFromOplogAsStandalone: true, + takeUnstableCheckpointOnShutdown: true + }) + }, + true /* restart */); +// Make sure we can read something after standalone recovery. +assert.eq(1, restoreNode.getDB(dbName)[sentinelCollName].find({}).limit(1).itcount()); + +jsTestLog("Restarting restore node again, in repl set mode"); +restoreNode = rst.restart(restoreNode, {noReplSet: false, setParameter: startParams}); + +rst.awaitSecondaryNodes(undefined, [restoreNode]); +jsTestLog("Finished restarting restore node"); + +// For the timestamp check we step up the restored node so we can do atClusterTime reads on it. +// They are necessarily speculative because we are preventing majority optimes from advancing. + +jsTestLog("Stepping up restore node"); +rst.stepUp(restoreNode, {awaitReplicationBeforeStepUp: false}); + +// Should NOT able to read at the penultimate optime on the restore node. +const restoreNodeSession = restoreNode.startSession({causalConsistency: false}); +const restoreNodeSessionDb = restoreNodeSession.getDatabase(dbName); +jsTestLog( + "Checking restore node top-of-oplog minus 1 read, which should fail, because the restore node does not have that history."); +restoreNodeSession.startTransaction( + {readConcern: {level: "snapshot", atClusterTime: penultimateOpTime}}); +assert.commandFailedWithCode( + restoreNodeSessionDb.runCommand({find: collName, filter: {"_id": lastId}}), + ErrorCodes.SnapshotTooOld); +restoreNodeSession.abortTransaction(); + +// Should see that very last document. +assert.docEq({_id: "s2"}, restoreNode.getDB(dbName)[sentinelCollName].findOne({_id: "s2"})); + +// Allow set to become current and shut down with ordinary dbHash verification. +stopReplProducer2.off(); +stopReplProducer3.off(); +rst.stopSet(); +})(); diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp index ee8b253fd70..198d49d6df1 100644 --- a/src/mongo/db/mongod_main.cpp +++ b/src/mongo/db/mongod_main.cpp @@ -634,6 +634,10 @@ ExitCode _initAndListen(ServiceContext* serviceContext, int listenPort) { str::stream() << "Cannot take an unstable checkpoint on shutdown while using queryableBackupMode", !gTakeUnstableCheckpointOnShutdown); + uassert(5576603, + str::stream() << "Cannot specify both queryableBackupMode and " + << "startupRecoveryForRestore at the same time", + !repl::startupRecoveryForRestore); auto replCoord = repl::ReplicationCoordinator::get(startupOpCtx.get()); invariant(replCoord); diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index c7c8e29894a..a7990f383d7 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -295,6 +295,7 @@ env.Library( 'oplog', 'oplog_application', 'oplog_interface_local', + 'repl_server_parameters', ], ) diff --git a/src/mongo/db/repl/repl_server_parameters.idl b/src/mongo/db/repl/repl_server_parameters.idl index 4fd7cd85015..050c2ed6e2c 100644 --- a/src/mongo/db/repl/repl_server_parameters.idl +++ b/src/mongo/db/repl/repl_server_parameters.idl @@ -492,6 +492,17 @@ server_parameters: cpp_varname: enableDefaultWriteConcernUpdatesForInitiate default: false + startupRecoveryForRestore: + description: >- + When set, do startup recovery in such a way that the history of the recovered + operations is not preserved. At the end of startup recovery, snapshot reads before + the recovered top of oplog will not be possible. Reduces cache pressure when + recovering many oplog entries, as when restoring from backup in some scenarios. + set_at: startup + cpp_vartype: bool + cpp_varname: startupRecoveryForRestore + default: false + feature_flags: featureFlagTenantMigrations: description: >- diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp index 675800c3cad..c9d4aa3c381 100644 --- a/src/mongo/db/repl/replication_recovery.cpp +++ b/src/mongo/db/repl/replication_recovery.cpp @@ -44,6 +44,7 @@ #include "mongo/db/repl/oplog_applier_impl.h" #include "mongo/db/repl/oplog_buffer.h" #include "mongo/db/repl/oplog_interface_local.h" +#include "mongo/db/repl/repl_server_parameters_gen.h" #include "mongo/db/repl/replication_consistency_markers_impl.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/transaction_oplog_application.h" @@ -160,8 +161,8 @@ public: 40293, "Couldn't find any entries in the oplog, which should be impossible", attrs); } - auto firstTimestampFound = - fassert(40291, OpTime::parseFromOplogEntry(_cursor->nextSafe())).getTimestamp(); + _opTimeAtStartPoint = fassert(40291, OpTime::parseFromOplogEntry(_cursor->nextSafe())); + const auto firstTimestampFound = _opTimeAtStartPoint.getTimestamp(); if (firstTimestampFound != _oplogApplicationStartPoint) { LOGV2_FATAL_NOTRACE( 40292, @@ -215,6 +216,10 @@ public: MONGO_UNREACHABLE; } + OpTime getOpTimeAtStartPoint() { + return _opTimeAtStartPoint; + } + private: enum class Mode { kPeek, kPop }; bool _peekOrPop(Value* value, Mode mode) { @@ -228,6 +233,7 @@ private: const Timestamp _oplogApplicationStartPoint; const boost::optional<Timestamp> _oplogApplicationEndPoint; + OpTime _opTimeAtStartPoint; std::unique_ptr<DBDirectClient> _client; std::unique_ptr<DBClientCursor> _cursor; }; @@ -315,7 +321,14 @@ void ReplicationRecoveryImpl::recoverFromOplogAsStandalone(OperationContext* opC // Initialize the cached pointer to the oplog collection. acquireOplogCollectionForLogging(opCtx); - if (recoveryTS) { + if (recoveryTS || startupRecoveryForRestore) { + if (startupRecoveryForRestore && !recoveryTS) { + LOGV2_WARNING(5576601, + "Replication startup parameter 'startupRecoveryForRestore' is set and " + "recovering from an unstable checkpoint. Assuming this is a resume of " + "an earlier attempt to recover for restore."); + } + // We pass in "none" for the stable timestamp so that recoverFromOplog asks storage // for the recoveryTimestamp just like on replica set recovery. const auto stableTimestamp = boost::none; @@ -387,7 +400,8 @@ void ReplicationRecoveryImpl::recoverFromOplogUpTo(OperationContext* opCtx, Time << endPoint.toString() << "' in the oplog."); } - Timestamp appliedUpTo = _applyOplogOperations(opCtx, *startPoint, endPoint); + Timestamp appliedUpTo = _applyOplogOperations( + opCtx, *startPoint, endPoint, RecoveryMode::kStartupFromStableTimestamp); if (appliedUpTo.isNull()) { LOGV2(21541, "No stored oplog entries to apply for recovery between {startPoint} (inclusive) and " @@ -425,6 +439,7 @@ void ReplicationRecoveryImpl::recoverFromOplog(OperationContext* opCtx, // recovery timestamp. If the storage engine returns a timestamp, we recover from that point. // However, if the storage engine returns "none", the storage engine does not have a stable // checkpoint and we must recover from an unstable checkpoint instead. + bool isRollbackRecovery = stableTimestamp != boost::none; const bool supportsRecoveryTimestamp = _storageInterface->supportsRecoveryTimestamp(opCtx->getServiceContext()); if (!stableTimestamp && supportsRecoveryTimestamp) { @@ -447,7 +462,9 @@ void ReplicationRecoveryImpl::recoverFromOplog(OperationContext* opCtx, if (stableTimestamp) { invariant(supportsRecoveryTimestamp); - _recoverFromStableTimestamp(opCtx, *stableTimestamp, topOfOplog); + const auto recoveryMode = isRollbackRecovery ? RecoveryMode::kRollbackFromStableTimestamp + : RecoveryMode::kStartupFromStableTimestamp; + _recoverFromStableTimestamp(opCtx, *stableTimestamp, topOfOplog, recoveryMode); } else { _recoverFromUnstableCheckpoint( opCtx, _consistencyMarkers->getAppliedThrough(opCtx), topOfOplog); @@ -462,7 +479,8 @@ void ReplicationRecoveryImpl::recoverFromOplog(OperationContext* opCtx, void ReplicationRecoveryImpl::_recoverFromStableTimestamp(OperationContext* opCtx, Timestamp stableTimestamp, - OpTime topOfOplog) { + OpTime topOfOplog, + RecoveryMode recoveryMode) { invariant(!stableTimestamp.isNull()); invariant(!topOfOplog.isNull()); @@ -478,7 +496,22 @@ void ReplicationRecoveryImpl::_recoverFromStableTimestamp(OperationContext* opCt "Starting recovery oplog application at the stable timestamp: {stableTimestamp}", "Starting recovery oplog application at the stable timestamp", "stableTimestamp"_attr = stableTimestamp); - _applyToEndOfOplog(opCtx, stableTimestamp, topOfOplog.getTimestamp()); + + if (recoveryMode == RecoveryMode::kStartupFromStableTimestamp && startupRecoveryForRestore) { + LOGV2_WARNING(5576600, + "Replication startup parameter 'startupRecoveryForRestore' is set, " + "recovering without preserving history before top of oplog."); + // Take only unstable checkpoints during the recovery process. + _storageInterface->setInitialDataTimestamp(opCtx->getServiceContext(), + Timestamp::kAllowUnstableCheckpointsSentinel); + // Allow "oldest" timestamp to move forward freely. + _storageInterface->setStableTimestamp(opCtx->getServiceContext(), Timestamp::min()); + } + _applyToEndOfOplog(opCtx, stableTimestamp, topOfOplog.getTimestamp(), recoveryMode); + if (recoveryMode == RecoveryMode::kStartupFromStableTimestamp && startupRecoveryForRestore) { + _storageInterface->setInitialDataTimestamp(opCtx->getServiceContext(), + topOfOplog.getTimestamp()); + } } void ReplicationRecoveryImpl::_recoverFromUnstableCheckpoint(OperationContext* opCtx, @@ -520,7 +553,17 @@ void ReplicationRecoveryImpl::_recoverFromUnstableCheckpoint(OperationContext* o opCtx->getServiceContext()->getStorageEngine()->setOldestTimestamp( appliedThrough.getTimestamp()); - _applyToEndOfOplog(opCtx, appliedThrough.getTimestamp(), topOfOplog.getTimestamp()); + if (startupRecoveryForRestore) { + // When we're recovering for a restore, we may be recovering a large number of oplog + // entries, so we want to take unstable checkpoints to reduce cache pressure and allow + // resumption in case of a crash. + _storageInterface->setInitialDataTimestamp( + opCtx->getServiceContext(), Timestamp::kAllowUnstableCheckpointsSentinel); + } + _applyToEndOfOplog(opCtx, + appliedThrough.getTimestamp(), + topOfOplog.getTimestamp(), + RecoveryMode::kStartupFromUnstableCheckpoint); } // `_recoverFromUnstableCheckpoint` is only expected to be called on startup. @@ -548,7 +591,8 @@ void ReplicationRecoveryImpl::_recoverFromUnstableCheckpoint(OperationContext* o void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx, const Timestamp& oplogApplicationStartPoint, - const Timestamp& topOfOplog) { + const Timestamp& topOfOplog, + const RecoveryMode recoveryMode) { invariant(!oplogApplicationStartPoint.isNull()); invariant(!topOfOplog.isNull()); @@ -567,7 +611,8 @@ void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx, "topOfOplog"_attr = topOfOplog.toBSON()); } - Timestamp appliedUpTo = _applyOplogOperations(opCtx, oplogApplicationStartPoint, topOfOplog); + Timestamp appliedUpTo = + _applyOplogOperations(opCtx, oplogApplicationStartPoint, topOfOplog, recoveryMode); invariant(!appliedUpTo.isNull()); invariant(appliedUpTo == topOfOplog, str::stream() << "Did not apply to top of oplog. Applied through: " @@ -577,7 +622,8 @@ void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx, Timestamp ReplicationRecoveryImpl::_applyOplogOperations(OperationContext* opCtx, const Timestamp& startPoint, - const Timestamp& endPoint) { + const Timestamp& endPoint, + RecoveryMode recoveryMode) { // The oplog buffer will fetch all entries >= the startPoint timestamp, but it skips the first // op on startup, which is why the startPoint is described as "exclusive". LOGV2(21550, @@ -592,10 +638,11 @@ Timestamp ReplicationRecoveryImpl::_applyOplogOperations(OperationContext* opCtx RecoveryOplogApplierStats stats; auto writerPool = makeReplWriterPool(); + auto* replCoord = ReplicationCoordinator::get(opCtx); OplogApplierImpl oplogApplier(nullptr, &oplogBuffer, &stats, - ReplicationCoordinator::get(opCtx), + replCoord, _consistencyMarkers, _storageInterface, OplogApplier::Options(OplogApplication::Mode::kRecovering), @@ -605,11 +652,36 @@ Timestamp ReplicationRecoveryImpl::_applyOplogOperations(OperationContext* opCtx batchLimits.bytes = getBatchLimitOplogBytes(opCtx, _storageInterface); batchLimits.ops = getBatchLimitOplogEntries(); + // If we're doing unstable checkpoints during the recovery process (as we do during the special + // startupRecoveryForRestore mode), we need to advance the consistency marker for each batch so + // the next time we recover we won't start all the way over. Further, we can advance the oldest + // timestamp to avoid keeping too much history. + // + // If we're recovering from a stable checkpoint (except the special startupRecoveryForRestore + // mode, which discards history before the top of oplog), we aren't doing new checkpoints during + // recovery so there is no point in advancing the consistency marker and we cannot advance + // "oldest" becaue it would be later than "stable". + const bool advanceTimestampsEachBatch = startupRecoveryForRestore && + (recoveryMode == RecoveryMode::kStartupFromStableTimestamp || + recoveryMode == RecoveryMode::kStartupFromUnstableCheckpoint); + OpTime applyThroughOpTime; std::vector<OplogEntry> batch; while ( !(batch = fassert(50763, oplogApplier.getNextApplierBatch(opCtx, batchLimits))).empty()) { + if (advanceTimestampsEachBatch && applyThroughOpTime.isNull()) { + // We must set appliedThrough before applying anything at all, so we know + // any unstable checkpoints we take are "dirty". A null appliedThrough indicates + // a clean shutdown which may not be the case if we had started applying a batch. + _consistencyMarkers->setAppliedThrough(opCtx, oplogBuffer.getOpTimeAtStartPoint()); + } applyThroughOpTime = uassertStatusOK(oplogApplier.applyOplogBatch(opCtx, std::move(batch))); + if (advanceTimestampsEachBatch) { + invariant(!applyThroughOpTime.isNull()); + _consistencyMarkers->setAppliedThrough(opCtx, applyThroughOpTime); + replCoord->getServiceContext()->getStorageEngine()->setOldestTimestamp( + applyThroughOpTime.getTimestamp()); + } } stats.complete(applyThroughOpTime); invariant(oplogBuffer.isEmpty(), @@ -627,9 +699,11 @@ Timestamp ReplicationRecoveryImpl::_applyOplogOperations(OperationContext* opCtx // to that checkpoint at a replication consistent point, and applying the oplog is safe. // If we don't have a stable checkpoint, then we must be in startup recovery, and not rollback // recovery, because we only roll back to a stable timestamp when we have a stable checkpoint. - // Startup recovery from an unstable checkpoint only ever applies a single batch and it is safe - // to replay the batch from any point. - _consistencyMarkers->setAppliedThrough(opCtx, applyThroughOpTime); + // It is safe to do startup recovery from an unstable checkpoint provided we recover to the + // end of the oplog and discard history before it, as _recoverFromUnstableCheckpoint does. + if (!advanceTimestampsEachBatch) { + _consistencyMarkers->setAppliedThrough(opCtx, applyThroughOpTime); + } return applyThroughOpTime.getTimestamp(); } diff --git a/src/mongo/db/repl/replication_recovery.h b/src/mongo/db/repl/replication_recovery.h index 320076e24a8..9439655e512 100644 --- a/src/mongo/db/repl/replication_recovery.h +++ b/src/mongo/db/repl/replication_recovery.h @@ -85,6 +85,14 @@ public: void recoverFromOplogUpTo(OperationContext* opCtx, Timestamp endPoint) override; private: + enum class RecoveryMode { + kStartupFromStableTimestamp, + kStartupFromUnstableCheckpoint, + kRollbackFromStableTimestamp, + // There is no RecoveryMode::kRollbackFromUnstableCheckpoint, rollback can only recover from + // a stable timestamp. + }; + /** * Confirms that the data and oplog all indicate that the nodes has an unstable checkpoint * that is fully up to date. @@ -97,7 +105,8 @@ private: */ void _recoverFromStableTimestamp(OperationContext* opCtx, Timestamp stableTimestamp, - OpTime topOfOplog); + OpTime topOfOplog, + RecoveryMode recoveryMode); /** * After truncating the oplog, completes recovery if we're recovering from an unstable @@ -113,7 +122,8 @@ private: */ void _applyToEndOfOplog(OperationContext* opCtx, const Timestamp& oplogApplicationStartPoint, - const Timestamp& topOfOplog); + const Timestamp& topOfOplog, + RecoveryMode recoveryMode); /** * Applies all oplog entries from startPoint (exclusive) to endPoint (inclusive). Returns the @@ -121,7 +131,8 @@ private: */ Timestamp _applyOplogOperations(OperationContext* opCtx, const Timestamp& startPoint, - const Timestamp& endPoint); + const Timestamp& endPoint, + RecoveryMode recoveryMode); /** * Gets the last applied OpTime from the end of the oplog. Returns CollectionIsEmpty if there is diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp index 8371d6e312a..cfc6bbccb50 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp @@ -1816,7 +1816,7 @@ void WiredTigerKVEngine::checkpoint() { // Three cases: // // First, initialDataTimestamp is Timestamp(0, 1) -> Take full checkpoint. This is when - // there is no consistent view of the data (i.e: during initial sync). + // there is no consistent view of the data (e.g: during initial sync). // // Second, stableTimestamp < initialDataTimestamp: Skip checkpoints. The data on disk is // prone to being rolled back. Hold off on checkpoints. Hope that the stable timestamp @@ -1828,6 +1828,10 @@ void WiredTigerKVEngine::checkpoint() { UniqueWiredTigerSession session = _sessionCache->getSession(); WT_SESSION* s = session->getSession(); invariantWTOK(s->checkpoint(s, "use_timestamp=false")); + LOGV2_FOR_RECOVERY(5576602, + 2, + "Completed unstable checkpoint.", + "initialDataTimestamp"_attr = initialDataTimestamp.toString()); } else if (stableTimestamp < initialDataTimestamp) { LOGV2_FOR_RECOVERY( 23985, |