diff options
author | Matthew Russotto <matthew.russotto@mongodb.com> | 2021-05-10 11:56:04 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-05-12 15:12:20 +0000 |
commit | 2a99e03b813f33342ffe83ccc5df9b8d2c33bf08 (patch) | |
tree | ed1abc5a03f131bf76d48988b2f796b51d069ae0 | |
parent | 0c3335b6a4c2281d1b1650508c3075752cc12c2d (diff) | |
download | mongo-2a99e03b813f33342ffe83ccc5df9b8d2c33bf08.tar.gz |
SERVER-55766 Introduce an optimized "for restore" startup replication recovery mechanism
(cherry picked from commit 40b7635321caadb7219f7d990a049a93d9776490)
-rw-r--r-- | jstests/replsets/startup_recovery_for_restore.js | 160 | ||||
-rw-r--r-- | jstests/replsets/startup_recovery_for_restore_needs_rollback.js | 133 | ||||
-rw-r--r-- | jstests/replsets/startup_recovery_for_restore_restarts.js | 181 | ||||
-rw-r--r-- | src/mongo/db/db.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_server_parameters.idl | 11 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_recovery.cpp | 101 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_recovery.h | 17 | ||||
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp | 7 |
9 files changed, 597 insertions, 19 deletions
diff --git a/jstests/replsets/startup_recovery_for_restore.js b/jstests/replsets/startup_recovery_for_restore.js new file mode 100644 index 00000000000..bd5b5a98db7 --- /dev/null +++ b/jstests/replsets/startup_recovery_for_restore.js @@ -0,0 +1,160 @@ +/* + * Tests that we can recover from a node with a lagged stable timestamp using the special + * "for restore" mode, but not read from older points-in-time on the recovered node. + * + * This test only makes sense for storage engines that support recover to stable timestamp. + * @tags: [requires_wiredtiger, requires_persistence, requires_journaling, requires_replication, + * requires_majority_read_concern, uses_transactions, uses_prepare_transaction, + * # We don't expect to do this while upgrading. + * multiversion_incompatible] + */ + +(function() { +"use strict"; +load("jstests/libs/fail_point_util.js"); + +const dbName = TestData.testName; + +const logLevel = tojson({storage: {recovery: 2}}); + +const rst = new ReplSetTest({ + nodes: [{}, {}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0}}], + settings: {chainingAllowed: false} +}); + +const startParams = { + logComponentVerbosity: logLevel, + replBatchLimitOperations: 100 +}; +const nodes = rst.startSet({setParameter: startParams}); +let restoreNode = nodes[1]; +rst.initiateWithHighElectionTimeout(); +const primary = rst.getPrimary(); +const db = primary.getDB(dbName); +const collName = "testcoll"; +const sentinelCollName = "sentinelcoll"; +const coll = db[collName]; +const paddingStr = "XXXXXXXXX"; + +// Pre-load some documents. +const nPreDocs = 2; +coll.insert([{_id: "pre1"}, {_id: "pre2"}]); +rst.awaitReplication(); + +const holdOpTime = assert.commandWorked(db.runCommand({find: collName, limit: 1})).operationTime; + +// Keep the stable timestamp from moving on the node we're going to restart in restore mode. 
+assert.commandWorked(restoreNode.adminCommand({ + configureFailPoint: 'holdStableTimestampAtSpecificTimestamp', + mode: 'alwaysOn', + data: {"timestamp": holdOpTime} +})); + +// Insert a bunch of documents. +let bulk = coll.initializeUnorderedBulkOp(); +const nDocs = 1000; +jsTestLog("Inserting " + nDocs + " documents with snapshotting disabled on one node."); +for (let id = 1; id <= nDocs; id++) { + bulk.insert({_id: id, paddingStr: paddingStr}); +} +bulk.execute(); +rst.awaitReplication(); + +jsTestLog("Stopping replication on secondaries to hold back majority commit point."); +let stopReplProducer2 = configureFailPoint(nodes[2], 'stopReplProducer'); +let stopReplProducer3 = configureFailPoint(nodes[3], 'stopReplProducer'); + +jsTestLog("Writing first sentinel document."); +const sentinel1Timestamp = + assert.commandWorked(db.runCommand({insert: sentinelCollName, documents: [{_id: "s1"}]})) + .operationTime; + +const nExtraDocs = 50; +jsTestLog("Inserting " + nExtraDocs + " documents with majority point held back."); +bulk = coll.initializeUnorderedBulkOp(); +for (let id = 1; id <= nExtraDocs; id++) { + bulk.insert({_id: (id + nDocs), paddingStr: paddingStr}); +} +bulk.execute(); +const lastId = nDocs + nExtraDocs; + +const penultimateOpTime = + assert.commandWorked(db.runCommand({find: collName, limit: 1})).operationTime; + +const sentinel2Timestamp = + assert.commandWorked(db.runCommand({insert: sentinelCollName, documents: [{_id: "s2"}]})) + .operationTime; + +rst.awaitReplication(undefined, undefined, [restoreNode]); + +jsTestLog("Restarting restore node with the --startupRecoveryForRestore flag"); +restoreNode = rst.restart(restoreNode, { + noReplSet: true, + setParameter: Object.merge(startParams, { + startupRecoveryForRestore: true, + recoverFromOplogAsStandalone: true, + takeUnstableCheckpointOnShutdown: true + }) +}); +// Make sure we can read something after standalone recovery. 
+assert.eq(2, restoreNode.getDB(dbName)[sentinelCollName].find({}).itcount()); + +jsTestLog("Restarting restore node again, in repl set mode"); +restoreNode = rst.restart(restoreNode, {noReplSet: false, setParameter: startParams}); + +rst.awaitSecondaryNodes(undefined, [restoreNode]); +jsTestLog("Finished restarting restore node"); + +const restoreDb = restoreNode.getDB(dbName); + +jsTestLog("Checking restore node untimestamped read."); +// Basic test: should see all docs with untimestamped read. +assert.eq(nPreDocs + nDocs + nExtraDocs, coll.find().itcount()); +assert.eq(nPreDocs + nDocs + nExtraDocs, restoreDb[collName].find().itcount()); + +// For the remaining checks we step up the restored node so we can do atClusterTime reads on it. +// They are necessarily speculative because we are preventing majority optimes from advancing. + +jsTestLog("Stepping up restore node"); +rst.stepUp(restoreNode, {awaitReplicationBeforeStepUp: false}); + +// Should also be able to read at the final sentinel optime on restore node. +const restoreNodeSession = restoreNode.startSession({causalConsistency: false}); +restoreNodeSession.startTransaction( + {readConcern: {level: "snapshot", atClusterTime: sentinel2Timestamp}}); +const restoreNodeSessionDb = restoreNodeSession.getDatabase(dbName); +jsTestLog("Checking top-of-oplog read works on restored node."); + +let res = assert.commandWorked( + restoreNodeSessionDb.runCommand({find: collName, filter: {"_id": lastId}})); +assert.eq(1, res.cursor.firstBatch.length); +assert.docEq({_id: lastId, paddingStr: paddingStr}, res.cursor.firstBatch[0]); + +// Must abort because majority is not advancing. +restoreNodeSession.abortTransaction(); + +// Should NOT able to read at the first sentinel optime on the restore node. 
+restoreNodeSession.startTransaction( + {readConcern: {level: "snapshot", atClusterTime: sentinel1Timestamp}}); +jsTestLog( + "Checking restore node majority optime read, which should fail, because the restore node does not have that history."); +res = assert.commandFailedWithCode( + restoreNodeSessionDb.runCommand({find: collName, filter: {"_id": {"$gte": nDocs}}}), + ErrorCodes.SnapshotTooOld); +restoreNodeSession.abortTransaction(); + +// Should NOT able to read at the penultimate optime on the restore node either. +jsTestLog( + "Checking restore node top-of-oplog minus 1 read, which should fail, because the restore node does not have that history."); +restoreNodeSession.startTransaction( + {readConcern: {level: "snapshot", atClusterTime: penultimateOpTime}}); +res = assert.commandFailedWithCode( + restoreNodeSessionDb.runCommand({find: collName, filter: {"_id": lastId}}), + ErrorCodes.SnapshotTooOld); +restoreNodeSession.abortTransaction(); + +// Allow set to become current and shut down with ordinary dbHash verification. +stopReplProducer2.off(); +stopReplProducer3.off(); +rst.stopSet(); +})(); diff --git a/jstests/replsets/startup_recovery_for_restore_needs_rollback.js b/jstests/replsets/startup_recovery_for_restore_needs_rollback.js new file mode 100644 index 00000000000..a3f74368f4b --- /dev/null +++ b/jstests/replsets/startup_recovery_for_restore_needs_rollback.js @@ -0,0 +1,133 @@ +/* + * Tests that if we recover from a node with a lagged stable timestamp using the special + * "for restore" mode, and there was a rollback within the recovered oplog, that we crash rather + * than attempt to use the node. + * + * This test only makes sense for storage engines that support recover to stable timestamp. + * @tags: [requires_wiredtiger, requires_persistence, requires_journaling, requires_replication, + * requires_majority_read_concern, uses_transactions, uses_prepare_transaction, + * # We don't expect to do this while upgrading. 
+ * multiversion_incompatible] + */ + +(function() { +"use strict"; +load("jstests/libs/fail_point_util.js"); + +const dbName = TestData.testName; +const logLevel = tojson({storage: {recovery: 2}}); + +// The restore node is made non-voting so the majority is 2. +// Disable primary catch up since we want to force a rollback. +const rst = new ReplSetTest({ + nodes: [{}, {rsConfig: {votes: 0, priority: 0}}, {}, {}], + settings: {catchUpTimeoutMillis: 0, chainingAllowed: false} +}); + +const startParams = { + logComponentVerbosity: logLevel, + replBatchLimitOperations: 100 +}; +const nodes = rst.startSet({setParameter: startParams}); +let restoreNode = nodes[1]; +rst.initiateWithHighElectionTimeout(); +const primary = rst.getPrimary(); +const db = primary.getDB(dbName); +const collName = "testcoll"; +const sentinelCollName = "sentinelcoll"; +const coll = db[collName]; +const sentinelColl = db[sentinelCollName]; +const paddingStr = "XXXXXXXXX"; + +// Pre-load some documents. +coll.insert([{_id: "pre1"}, {_id: "pre2"}]); +rst.awaitReplication(); + +const holdOpTime = assert.commandWorked(db.runCommand({find: collName, limit: 1})).operationTime; + +// Keep the stable timestamp from moving on the node we're going to restart in restore mode. +assert.commandWorked(restoreNode.adminCommand({ + configureFailPoint: 'holdStableTimestampAtSpecificTimestamp', + mode: 'alwaysOn', + data: {"timestamp": holdOpTime} +})); + +// Insert a bunch of documents. 
+let bulk = coll.initializeUnorderedBulkOp(); +const nDocs = 1000; +jsTestLog("Inserting " + nDocs + " documents with snapshotting disabled on one node."); +for (let id = 1; id <= nDocs; id++) { + bulk.insert({_id: id, paddingStr: paddingStr}); +} +bulk.execute(); +rst.awaitReplication(); + +jsTestLog("Stopping replication on secondaries to hold back majority commit point."); +let stopReplProducer2 = configureFailPoint(nodes[2], 'stopReplProducer'); +let stopReplProducer3 = configureFailPoint(nodes[3], 'stopReplProducer'); + +const nExtraDocs = 50; +jsTestLog("Inserting " + nExtraDocs + " documents with majority point held back."); +bulk = coll.initializeUnorderedBulkOp(); +const lastId = nDocs + nExtraDocs; +for (let id = 1; id <= nExtraDocs; id++) { + bulk.insert({_id: (id + nDocs), paddingStr: paddingStr}); +} +bulk.execute(); +rst.awaitReplication(undefined, undefined, [restoreNode]); + +// Stop some nodes so we can force a rollback +rst.stop(primary); +rst.stop(restoreNode, undefined, undefined, {forRestart: true}); + +// Restart replication and step up a new primary. +stopReplProducer2.off(); +stopReplProducer3.off(); + +const newPrimary = nodes[2]; +// Must send stepUp command without using ReplSetTest helper, as ReplSetTest helper expects all +// nodes to be alive. +assert.commandWorked(newPrimary.adminCommand({replSetStepUp: 1})); +rst.awaitNodesAgreeOnPrimary(undefined, [nodes[2], nodes[3]]); +assert.soon(() => (rst.getPrimary() == newPrimary)); + +// Write some stuff to force a rollback +assert.commandWorked(newPrimary.getDB(dbName)[collName].insert({_id: "ForceRollback"})); +rst.awaitReplication(undefined, undefined, [nodes[3]]); + +// Bring the new node up in startupRecoveryForRestore mode. Since it can't see the set, this +// should succeed. 
+ +jsTestLog("Restarting restore node with the --startupRecoveryForRestore flag"); +clearRawMongoProgramOutput(); +restoreNode = rst.start( + restoreNode, + { + noReplSet: true, + setParameter: Object.merge(startParams, { + startupRecoveryForRestore: true, + recoverFromOplogAsStandalone: true, + takeUnstableCheckpointOnShutdown: true, + 'failpoint.hangAfterCollectionInserts': + tojson({mode: 'alwaysOn', data: {collectionNS: sentinelColl.getFullName()}}), + + }) + }, + true /* restart */); +// Make sure we can read the last doc after standalone recovery. +assert.docEq({_id: lastId, paddingStr: paddingStr}, + restoreNode.getDB(dbName)[collName].findOne({_id: lastId})); + +clearRawMongoProgramOutput(); +jsTestLog("Restarting restore node again, in repl set mode"); +restoreNode = rst.restart(restoreNode, {noReplSet: false, setParameter: startParams}); + +// This node should not come back up, because it has no stable timestamp to recover to. +assert.soon(() => (rawMongoProgramOutput().search("UnrecoverableRollbackError") >= 0)); + +// Remove the nodes which are down. +rst.remove(primary); +rst.remove(restoreNode); +// Shut down the set. +rst.stopSet(); +})(); diff --git a/jstests/replsets/startup_recovery_for_restore_restarts.js b/jstests/replsets/startup_recovery_for_restore_restarts.js new file mode 100644 index 00000000000..7b52bf47227 --- /dev/null +++ b/jstests/replsets/startup_recovery_for_restore_restarts.js @@ -0,0 +1,181 @@ +/* + * Tests that we can recover from a node with a lagged stable timestamp using the special + * "for restore" mode, but not read from older points-in-time on the recovered node, and that + * we can do so even after we crash in the middle of an attempt to restore. + * + * This test only makes sense for storage engines that support recover to stable timestamp. 
+ * @tags: [requires_wiredtiger, requires_persistence, requires_journaling, requires_replication, + * requires_majority_read_concern, uses_transactions, uses_prepare_transaction, + * # We don't expect to do this while upgrading. + * multiversion_incompatible] + */ + +(function() { +"use strict"; +load("jstests/libs/fail_point_util.js"); +const SIGKILL = 9; + +const dbName = TestData.testName; +const logLevel = tojson({storage: {recovery: 2}}); + +const rst = new ReplSetTest({ + nodes: [{}, {}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0}}], + settings: {chainingAllowed: false} +}); + +const startParams = { + logComponentVerbosity: logLevel, + replBatchLimitOperations: 100 +}; +const nodes = rst.startSet({setParameter: startParams}); +let restoreNode = nodes[1]; +rst.initiateWithHighElectionTimeout(); +const primary = rst.getPrimary(); +const db = primary.getDB(dbName); +const collName = "testcoll"; +const sentinelCollName = "sentinelcoll"; +const coll = db[collName]; +const sentinelColl = db[sentinelCollName]; +const paddingStr = "XXXXXXXXX"; + +// Pre-load some documents. + +const nDocs = 100; +let bulk = coll.initializeUnorderedBulkOp(); +for (let id = 1; id <= nDocs; id++) { + bulk.insert({_id: id, paddingStr: paddingStr}); +} +bulk.execute(); +rst.awaitReplication(); + +const holdOpTime = assert.commandWorked(db.runCommand({find: collName, limit: 1})).operationTime; + +// Keep the stable timestamp from moving on the node we're going to restart in restore mode. +assert.commandWorked(restoreNode.adminCommand({ + configureFailPoint: 'holdStableTimestampAtSpecificTimestamp', + mode: 'alwaysOn', + data: {"timestamp": holdOpTime} +})); + +// Do a bunch of updates, which are chosen so if one is missed, we'll know. +bulk = coll.initializeUnorderedBulkOp(); +const nUpdates = 1000; +const writeSentinelsAfter = 650; // This should be set to get us to the middle of a batch. 
+jsTestLog("Making " + nUpdates + " updates with snapshotting disabled on one node."); +for (let updateNo = 1; updateNo <= nUpdates; updateNo++) { + let id = (updateNo - 1) % nDocs + 1; + let updateField = "u" + updateNo; + let setDoc = {}; + setDoc[updateField] = updateNo; + bulk.find({_id: id}).updateOne({"$set": setDoc}); + if (updateNo == writeSentinelsAfter) { + // Write a bunch of inserts to the sentinel collection, which will be used to hang + // oplog application mid-batch. + bulk.execute(); + bulk = sentinelColl.initializeUnorderedBulkOp(); + for (let j = 1; j <= 100; j++) { + bulk.insert({_id: j}); + } + bulk.execute(); + bulk = coll.initializeUnorderedBulkOp(); + } +} +bulk.execute(); +rst.awaitReplication(); + +jsTestLog("Stopping replication on secondaries to hold back majority commit point."); +let stopReplProducer2 = configureFailPoint(nodes[2], 'stopReplProducer'); +let stopReplProducer3 = configureFailPoint(nodes[3], 'stopReplProducer'); + +const nExtraDocs = 50; +jsTestLog("Inserting " + nExtraDocs + " documents with majority point held back."); +bulk = coll.initializeUnorderedBulkOp(); +const lastId = nDocs + nExtraDocs; +for (let id = 1; id <= nExtraDocs; id++) { + bulk.insert({_id: (id + nDocs), paddingStr: paddingStr}); +} +bulk.execute(); + +const penultimateOpTime = + assert.commandWorked(db.runCommand({find: collName, limit: 1})).operationTime; + +const sentinel2Timestamp = + assert.commandWorked(db.runCommand({insert: sentinelCollName, documents: [{_id: "s2"}]})) + .operationTime; + +rst.awaitReplication(undefined, undefined, [restoreNode]); + +jsTestLog("Restarting restore node with the --startupRecoveryForRestore flag"); +// Must use stop/start with waitForConnect: false. See SERVER-56446 +rst.stop(restoreNode, undefined, undefined, {forRestart: true}); +clearRawMongoProgramOutput(); +rst.start(restoreNode, + { + noReplSet: true, + waitForConnect: false, + syncdelay: 1, // Take a lot of unstable checkpoints. 
+ setParameter: Object.merge(startParams, { + startupRecoveryForRestore: true, + recoverFromOplogAsStandalone: true, + takeUnstableCheckpointOnShutdown: true, + 'failpoint.hangAfterCollectionInserts': + tojson({mode: 'alwaysOn', data: {collectionNS: sentinelColl.getFullName()}}), + + }) + }, + true /* restart */); +assert.soon(() => { // Can't use checklog because we can't connect to the mongo in startup mode. + return rawMongoProgramOutput().search("hangAfterCollectionInserts fail point enabled") !== -1; +}); +assert.soon(() => { + return rawMongoProgramOutput().search("Completed unstable checkpoint.") !== -1; +}); + +jsTestLog("Restarting restore node uncleanly"); +rst.stop(restoreNode, SIGKILL, {allowedExitCode: MongoRunner.EXIT_SIGKILL}, {forRestart: true}); +restoreNode = rst.start(restoreNode, + { + waitForConnect: true, + noReplSet: true, + setParameter: Object.merge(startParams, { + startupRecoveryForRestore: true, + recoverFromOplogAsStandalone: true, + takeUnstableCheckpointOnShutdown: true + }) + }, + true /* restart */); +// Make sure we can read something after standalone recovery. +assert.eq(1, restoreNode.getDB(dbName)[sentinelCollName].find({}).limit(1).itcount()); + +jsTestLog("Restarting restore node again, in repl set mode"); +restoreNode = rst.restart(restoreNode, {noReplSet: false, setParameter: startParams}); + +rst.awaitSecondaryNodes(undefined, [restoreNode]); +jsTestLog("Finished restarting restore node"); + +// For the timestamp check we step up the restored node so we can do atClusterTime reads on it. +// They are necessarily speculative because we are preventing majority optimes from advancing. + +jsTestLog("Stepping up restore node"); +rst.stepUp(restoreNode, {awaitReplicationBeforeStepUp: false}); + +// Should NOT able to read at the penultimate optime on the restore node. 
+const restoreNodeSession = restoreNode.startSession({causalConsistency: false}); +const restoreNodeSessionDb = restoreNodeSession.getDatabase(dbName); +jsTestLog( + "Checking restore node top-of-oplog minus 1 read, which should fail, because the restore node does not have that history."); +restoreNodeSession.startTransaction( + {readConcern: {level: "snapshot", atClusterTime: penultimateOpTime}}); +assert.commandFailedWithCode( + restoreNodeSessionDb.runCommand({find: collName, filter: {"_id": lastId}}), + ErrorCodes.SnapshotTooOld); +restoreNodeSession.abortTransaction(); + +// Should see that very last document. +assert.docEq({_id: "s2"}, restoreNode.getDB(dbName)[sentinelCollName].findOne({_id: "s2"})); + +// Allow set to become current and shut down with ordinary dbHash verification. +stopReplProducer2.off(); +stopReplProducer3.off(); +rst.stopSet(); +})(); diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index d80a165cef3..352ffa62672 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -105,6 +105,7 @@ #include "mongo/db/repair_database_and_check_version.h" #include "mongo/db/repl/drop_pending_collection_reaper.h" #include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/repl_server_parameters_gen.h" #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/replication_consistency_markers_impl.h" #include "mongo/db/repl/replication_coordinator.h" @@ -546,6 +547,10 @@ ExitCode _initAndListen(int listenPort) { str::stream() << "Cannot take an unstable checkpoint on shutdown while using queryableBackupMode", !gTakeUnstableCheckpointOnShutdown); + uassert(5576603, + str::stream() << "Cannot specify both queryableBackupMode and " + << "startupRecoveryForRestore at the same time", + !repl::startupRecoveryForRestore); auto replCoord = repl::ReplicationCoordinator::get(startupOpCtx.get()); invariant(replCoord); diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 5a6386819f8..58964522e5b 100644 --- 
a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -340,6 +340,7 @@ env.Library( LIBDEPS_PRIVATE=[ 'oplog', 'oplog_application', + 'repl_server_parameters', '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/storage/storage_options', ], diff --git a/src/mongo/db/repl/repl_server_parameters.idl b/src/mongo/db/repl/repl_server_parameters.idl index 9e8b771b201..a52a0f1a692 100644 --- a/src/mongo/db/repl/repl_server_parameters.idl +++ b/src/mongo/db/repl/repl_server_parameters.idl @@ -266,3 +266,14 @@ server_parameters: # and if it is not, readPreference is 'nearest'. default: "" validator: { callback: 'validateReadPreferenceMode' } + + startupRecoveryForRestore: + description: >- + When set, do startup recovery in such a way that the history of the recovered + operations is not preserved. At the end of startup recovery, snapshot reads before + the recovered top of oplog will not be possible. Reduces cache pressure when + recovering many oplog entries, as when restoring from backup in some scenarios. 
+ set_at: startup + cpp_vartype: bool + cpp_varname: startupRecoveryForRestore + default: false diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp index d6b3127ae09..ac5c00d9aed 100644 --- a/src/mongo/db/repl/replication_recovery.cpp +++ b/src/mongo/db/repl/replication_recovery.cpp @@ -42,6 +42,7 @@ #include "mongo/db/repl/apply_ops.h" #include "mongo/db/repl/oplog_applier_impl.h" #include "mongo/db/repl/oplog_buffer.h" +#include "mongo/db/repl/repl_server_parameters_gen.h" #include "mongo/db/repl/replication_consistency_markers_impl.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/transaction_oplog_application.h" @@ -141,8 +142,8 @@ public: fassertFailedNoTrace(40293); } - auto firstTimestampFound = - fassert(40291, OpTime::parseFromOplogEntry(_cursor->nextSafe())).getTimestamp(); + _opTimeAtStartPoint = fassert(40291, OpTime::parseFromOplogEntry(_cursor->nextSafe())); + const auto firstTimestampFound = _opTimeAtStartPoint.getTimestamp(); if (firstTimestampFound != _oplogApplicationStartPoint) { severe() << "Oplog entry at " << _oplogApplicationStartPoint.toBSON() << " is missing; actual entry found is " << firstTimestampFound.toBSON(); @@ -198,6 +199,10 @@ public: MONGO_UNREACHABLE; } + OpTime getOpTimeAtStartPoint() { + return _opTimeAtStartPoint; + } + private: enum class Mode { kPeek, kPop }; bool _peekOrPop(Value* value, Mode mode) { @@ -211,6 +216,7 @@ private: const Timestamp _oplogApplicationStartPoint; const boost::optional<Timestamp> _oplogApplicationEndPoint; + OpTime _opTimeAtStartPoint; std::unique_ptr<DBDirectClient> _client; std::unique_ptr<DBClientCursor> _cursor; }; @@ -286,7 +292,13 @@ void ReplicationRecoveryImpl::recoverFromOplogAsStandalone(OperationContext* opC // Initialize the cached pointer to the oplog collection. 
acquireOplogCollectionForLogging(opCtx); - if (recoveryTS) { + if (recoveryTS || startupRecoveryForRestore) { + if (startupRecoveryForRestore && !recoveryTS) { + warning() << "Replication startup parameter 'startupRecoveryForRestore' is set and " + "recovering from an unstable checkpoint. Assuming this is a resume of " + "an earlier attempt to recover for restore."; + } + // We pass in "none" for the stable timestamp so that recoverFromOplog asks storage // for the recoveryTimestamp just like on replica set recovery. const auto stableTimestamp = boost::none; @@ -344,7 +356,8 @@ void ReplicationRecoveryImpl::recoverFromOplogUpTo(OperationContext* opCtx, Time << endPoint.toString() << "' in the oplog."); } - Timestamp appliedUpTo = _applyOplogOperations(opCtx, *startPoint, endPoint); + Timestamp appliedUpTo = _applyOplogOperations( + opCtx, *startPoint, endPoint, RecoveryMode::kStartupFromStableTimestamp); if (appliedUpTo.isNull()) { log() << "No stored oplog entries to apply for recovery between " << startPoint->toString() << " (inclusive) and " << endPoint.toString() << " (inclusive)."; @@ -399,6 +412,7 @@ void ReplicationRecoveryImpl::recoverFromOplog(OperationContext* opCtx, // recovery timestamp. If the storage engine returns a timestamp, we recover from that point. // However, if the storage engine returns "none", the storage engine does not have a stable // checkpoint and we must recover from an unstable checkpoint instead. + bool isRollbackRecovery = stableTimestamp != boost::none; const bool supportsRecoveryTimestamp = _storageInterface->supportsRecoveryTimestamp(opCtx->getServiceContext()); if (!stableTimestamp && supportsRecoveryTimestamp) { @@ -414,7 +428,10 @@ void ReplicationRecoveryImpl::recoverFromOplog(OperationContext* opCtx, if (stableTimestamp) { invariant(supportsRecoveryTimestamp); - _recoverFromStableTimestamp(opCtx, *stableTimestamp, appliedThrough, topOfOplog); + const auto recoveryMode = isRollbackRecovery ? 
RecoveryMode::kRollbackFromStableTimestamp + : RecoveryMode::kStartupFromStableTimestamp; + _recoverFromStableTimestamp( + opCtx, *stableTimestamp, appliedThrough, topOfOplog, recoveryMode); } else { _recoverFromUnstableCheckpoint(opCtx, appliedThrough, topOfOplog); } @@ -426,7 +443,8 @@ void ReplicationRecoveryImpl::recoverFromOplog(OperationContext* opCtx, void ReplicationRecoveryImpl::_recoverFromStableTimestamp(OperationContext* opCtx, Timestamp stableTimestamp, OpTime appliedThrough, - OpTime topOfOplog) { + OpTime topOfOplog, + RecoveryMode recoveryMode) { invariant(!stableTimestamp.isNull()); invariant(!topOfOplog.isNull()); const auto truncateAfterPoint = _consistencyMarkers->getOplogTruncateAfterPoint(opCtx); @@ -435,7 +453,21 @@ void ReplicationRecoveryImpl::_recoverFromStableTimestamp(OperationContext* opCt << ", TruncateAfter: " << truncateAfterPoint << ")"; log() << "Starting recovery oplog application at the stable timestamp: " << stableTimestamp; - _applyToEndOfOplog(opCtx, stableTimestamp, topOfOplog.getTimestamp()); + + if (recoveryMode == RecoveryMode::kStartupFromStableTimestamp && startupRecoveryForRestore) { + warning() << "Replication startup parameter 'startupRecoveryForRestore' is set, " + "recovering without preserving history before top of oplog."; + // Take only unstable checkpoints during the recovery process. + _storageInterface->setInitialDataTimestamp(opCtx->getServiceContext(), + Timestamp::kAllowUnstableCheckpointsSentinel); + // Allow "oldest" timestamp to move forward freely. 
+ _storageInterface->setStableTimestamp(opCtx->getServiceContext(), Timestamp::min()); + } + _applyToEndOfOplog(opCtx, stableTimestamp, topOfOplog.getTimestamp(), recoveryMode); + if (recoveryMode == RecoveryMode::kStartupFromStableTimestamp && startupRecoveryForRestore) { + _storageInterface->setInitialDataTimestamp(opCtx->getServiceContext(), + topOfOplog.getTimestamp()); + } } void ReplicationRecoveryImpl::_recoverFromUnstableCheckpoint(OperationContext* opCtx, @@ -468,7 +500,17 @@ void ReplicationRecoveryImpl::_recoverFromUnstableCheckpoint(OperationContext* o opCtx->getServiceContext()->getStorageEngine()->setOldestTimestamp( appliedThrough.getTimestamp()); - _applyToEndOfOplog(opCtx, appliedThrough.getTimestamp(), topOfOplog.getTimestamp()); + if (startupRecoveryForRestore) { + // When we're recovering for a restore, we may be recovering a large number of oplog + // entries, so we want to take unstable checkpoints to reduce cache pressure and allow + // resumption in case of a crash. + _storageInterface->setInitialDataTimestamp( + opCtx->getServiceContext(), Timestamp::kAllowUnstableCheckpointsSentinel); + } + _applyToEndOfOplog(opCtx, + appliedThrough.getTimestamp(), + topOfOplog.getTimestamp(), + RecoveryMode::kStartupFromUnstableCheckpoint); } // `_recoverFromUnstableCheckpoint` is only expected to be called on startup. 
@@ -496,7 +538,8 @@ void ReplicationRecoveryImpl::_recoverFromUnstableCheckpoint(OperationContext* o void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx, const Timestamp& oplogApplicationStartPoint, - const Timestamp& topOfOplog) { + const Timestamp& topOfOplog, + const RecoveryMode recoveryMode) { invariant(!oplogApplicationStartPoint.isNull()); invariant(!topOfOplog.isNull()); @@ -511,7 +554,8 @@ void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx, fassertFailedNoTrace(40313); } - Timestamp appliedUpTo = _applyOplogOperations(opCtx, oplogApplicationStartPoint, topOfOplog); + Timestamp appliedUpTo = + _applyOplogOperations(opCtx, oplogApplicationStartPoint, topOfOplog, recoveryMode); invariant(!appliedUpTo.isNull()); invariant(appliedUpTo == topOfOplog, str::stream() << "Did not apply to top of oplog. Applied through: " @@ -521,7 +565,8 @@ void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx, Timestamp ReplicationRecoveryImpl::_applyOplogOperations(OperationContext* opCtx, const Timestamp& startPoint, - const Timestamp& endPoint) { + const Timestamp& endPoint, + RecoveryMode recoveryMode) { log() << "Replaying stored operations from " << startPoint << " (inclusive) to " << endPoint << " (inclusive)."; @@ -556,11 +601,37 @@ Timestamp ReplicationRecoveryImpl::_applyOplogOperations(OperationContext* opCtx batchLimits.bytes = OplogApplier::calculateBatchLimitBytes(opCtx, _storageInterface); batchLimits.ops = OplogApplier::getBatchLimitOperations(); + // If we're doing unstable checkpoints during the recovery process (as we do during the special + // startupRecoveryForRestore mode), we need to advance the consistency marker for each batch so + // the next time we recover we won't start all the way over. Further, we can advance the oldest + // timestamp to avoid keeping too much history. 
+ //
+ // If we're recovering from a stable checkpoint (except the special startupRecoveryForRestore
+ // mode, which discards history before the top of oplog), we aren't doing new checkpoints during
+ // recovery so there is no point in advancing the consistency marker and we cannot advance
+ // "oldest" because it would be later than "stable".
+ const bool advanceTimestampsEachBatch = startupRecoveryForRestore &&
+ (recoveryMode == RecoveryMode::kStartupFromStableTimestamp ||
+ recoveryMode == RecoveryMode::kStartupFromUnstableCheckpoint);
+ OpTime applyThroughOpTime; OplogApplier::Operations batch; + auto* replCoord = ReplicationCoordinator::get(opCtx); while ( !(batch = fassert(50763, oplogApplier.getNextApplierBatch(opCtx, batchLimits))).empty()) { + if (advanceTimestampsEachBatch && applyThroughOpTime.isNull()) { + // We must set appliedThrough before applying anything at all, so we know + // any unstable checkpoints we take are "dirty". A null appliedThrough indicates + // a clean shutdown which may not be the case if we had started applying a batch. + _consistencyMarkers->setAppliedThrough(opCtx, oplogBuffer.getOpTimeAtStartPoint()); + } applyThroughOpTime = uassertStatusOK(oplogApplier.multiApply(opCtx, std::move(batch))); + if (advanceTimestampsEachBatch) { + invariant(!applyThroughOpTime.isNull()); + _consistencyMarkers->setAppliedThrough(opCtx, applyThroughOpTime); + replCoord->getServiceContext()->getStorageEngine()->setOldestTimestamp( + applyThroughOpTime.getTimestamp()); + } } stats.complete(applyThroughOpTime); invariant(oplogBuffer.isEmpty(), @@ -578,9 +649,11 @@ Timestamp ReplicationRecoveryImpl::_applyOplogOperations(OperationContext* opCtx // to that checkpoint at a replication consistent point, and applying the oplog is safe. // If we don't have a stable checkpoint, then we must be in startup recovery, and not rollback // recovery, because we only roll back to a stable timestamp when we have a stable checkpoint. 
- // Startup recovery from an unstable checkpoint only ever applies a single batch and it is safe - // to replay the batch from any point. - _consistencyMarkers->setAppliedThrough(opCtx, applyThroughOpTime); + // It is safe to do startup recovery from an unstable checkpoint provided we recover to the + // end of the oplog and discard history before it, as _recoverFromUnstableCheckpoint does. + if (!advanceTimestampsEachBatch) { + _consistencyMarkers->setAppliedThrough(opCtx, applyThroughOpTime); + } return applyThroughOpTime.getTimestamp(); } diff --git a/src/mongo/db/repl/replication_recovery.h b/src/mongo/db/repl/replication_recovery.h index a881b0a81fa..3bdc9979be7 100644 --- a/src/mongo/db/repl/replication_recovery.h +++ b/src/mongo/db/repl/replication_recovery.h @@ -85,6 +85,14 @@ public: void recoverFromOplogUpTo(OperationContext* opCtx, Timestamp endPoint) override; private: + enum class RecoveryMode { + kStartupFromStableTimestamp, + kStartupFromUnstableCheckpoint, + kRollbackFromStableTimestamp, + // There is no RecoveryMode::kRollbackFromUnstableCheckpoint, rollback can only recover from + // a stable timestamp. + }; + /** * Confirms that the data and oplog all indicate that the nodes has an unstable checkpoint * that is fully up to date. @@ -98,7 +106,8 @@ private: void _recoverFromStableTimestamp(OperationContext* opCtx, Timestamp stableTimestamp, OpTime appliedThrough, - OpTime topOfOplog); + OpTime topOfOplog, + RecoveryMode recoveryMode); /** * After truncating the oplog, completes recovery if we're recovering from an unstable @@ -114,7 +123,8 @@ private: */ void _applyToEndOfOplog(OperationContext* opCtx, const Timestamp& oplogApplicationStartPoint, - const Timestamp& topOfOplog); + const Timestamp& topOfOplog, + RecoveryMode recoveryMode); /** * Applies all oplog entries from startPoint (inclusive) to endPoint (inclusive). 
Returns the @@ -122,7 +132,8 @@ private: */ Timestamp _applyOplogOperations(OperationContext* opCtx, const Timestamp& startPoint, - const Timestamp& endPoint); + const Timestamp& endPoint, + RecoveryMode recoveryMode); /** * Gets the last applied OpTime from the end of the oplog. Returns CollectionIsEmpty if there is diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp index 33b3bc05b40..2e3f0768003 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp @@ -344,7 +344,7 @@ public: // Three cases: // // First, initialDataTimestamp is Timestamp(0, 1) -> Take full checkpoint. This is - // when there is no consistent view of the data (i.e: during initial sync). + // when there is no consistent view of the data (e.g: during initial sync). // // Second, stableTimestamp < initialDataTimestamp: Skip checkpoints. The data on // disk is prone to being rolled back. Hold off on checkpoints. Hope that the @@ -357,6 +357,9 @@ public: UniqueWiredTigerSession session = _sessionCache->getSession(); WT_SESSION* s = session->getSession(); invariantWTOK(s->checkpoint(s, "use_timestamp=false")); + LOG_FOR_RECOVERY(2) + << "Completed unstable checkpoint." + << " InitialDataTimestamp: " << initialDataTimestamp.toString(); } else if (stableTimestamp < initialDataTimestamp) { LOG_FOR_RECOVERY(2) << "Stable timestamp is behind the initial data timestamp, skipping " @@ -1005,7 +1008,7 @@ void WiredTigerKVEngine::cleanShutdown() { const Timestamp stableTimestamp = getStableTimestamp(); const Timestamp initialDataTimestamp = getInitialDataTimestamp(); - if (stableTimestamp >= initialDataTimestamp) { + if (gTakeUnstableCheckpointOnShutdown || stableTimestamp >= initialDataTimestamp) { invariantWTOK(_conn->close(_conn, closeConfig.c_str())); } else { log() << "Skipping checkpoint during clean shutdown because stableTimestamp (" |