-rw-r--r--  jstests/replsets/startup_recovery_for_restore.js                  160
-rw-r--r--  jstests/replsets/startup_recovery_for_restore_needs_rollback.js   135
-rw-r--r--  jstests/replsets/startup_recovery_for_restore_restarts.js         180
-rw-r--r--  src/mongo/db/mongod_main.cpp                                         4
-rw-r--r--  src/mongo/db/repl/SConscript                                         1
-rw-r--r--  src/mongo/db/repl/repl_server_parameters.idl                        11
-rw-r--r--  src/mongo/db/repl/replication_recovery.cpp                         104
-rw-r--r--  src/mongo/db/repl/replication_recovery.h                            17
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp            6
9 files changed, 599 insertions, 19 deletions
diff --git a/jstests/replsets/startup_recovery_for_restore.js b/jstests/replsets/startup_recovery_for_restore.js
new file mode 100644
index 00000000000..bd5b5a98db7
--- /dev/null
+++ b/jstests/replsets/startup_recovery_for_restore.js
@@ -0,0 +1,160 @@
+/*
+ * Tests that we can recover from a node with a lagged stable timestamp using the special
+ * "for restore" mode, but not read from older points-in-time on the recovered node.
+ *
+ * This test only makes sense for storage engines that support recover to stable timestamp.
+ * @tags: [requires_wiredtiger, requires_persistence, requires_journaling, requires_replication,
+ * requires_majority_read_concern, uses_transactions, uses_prepare_transaction,
+ * # We don't expect to do this while upgrading.
+ * multiversion_incompatible]
+ */
+
+(function() {
+"use strict";
+load("jstests/libs/fail_point_util.js");
+
+const dbName = TestData.testName;
+
+const logLevel = tojson({storage: {recovery: 2}});
+
+const rst = new ReplSetTest({
+ nodes: [{}, {}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0}}],
+ settings: {chainingAllowed: false}
+});
+
+const startParams = {
+ logComponentVerbosity: logLevel,
+ replBatchLimitOperations: 100
+};
+const nodes = rst.startSet({setParameter: startParams});
+let restoreNode = nodes[1];
+rst.initiateWithHighElectionTimeout();
+const primary = rst.getPrimary();
+const db = primary.getDB(dbName);
+const collName = "testcoll";
+const sentinelCollName = "sentinelcoll";
+const coll = db[collName];
+const paddingStr = "XXXXXXXXX";
+
+// Pre-load some documents.
+const nPreDocs = 2;
+coll.insert([{_id: "pre1"}, {_id: "pre2"}]);
+rst.awaitReplication();
+
+const holdOpTime = assert.commandWorked(db.runCommand({find: collName, limit: 1})).operationTime;
+
+// Keep the stable timestamp from moving on the node we're going to restart in restore mode.
+assert.commandWorked(restoreNode.adminCommand({
+ configureFailPoint: 'holdStableTimestampAtSpecificTimestamp',
+ mode: 'alwaysOn',
+ data: {"timestamp": holdOpTime}
+}));
+
+// Insert a bunch of documents.
+let bulk = coll.initializeUnorderedBulkOp();
+const nDocs = 1000;
+jsTestLog("Inserting " + nDocs + " documents with snapshotting disabled on one node.");
+for (let id = 1; id <= nDocs; id++) {
+ bulk.insert({_id: id, paddingStr: paddingStr});
+}
+bulk.execute();
+rst.awaitReplication();
+
+jsTestLog("Stopping replication on secondaries to hold back majority commit point.");
+let stopReplProducer2 = configureFailPoint(nodes[2], 'stopReplProducer');
+let stopReplProducer3 = configureFailPoint(nodes[3], 'stopReplProducer');
+
+jsTestLog("Writing first sentinel document.");
+const sentinel1Timestamp =
+ assert.commandWorked(db.runCommand({insert: sentinelCollName, documents: [{_id: "s1"}]}))
+ .operationTime;
+
+const nExtraDocs = 50;
+jsTestLog("Inserting " + nExtraDocs + " documents with majority point held back.");
+bulk = coll.initializeUnorderedBulkOp();
+for (let id = 1; id <= nExtraDocs; id++) {
+ bulk.insert({_id: (id + nDocs), paddingStr: paddingStr});
+}
+bulk.execute();
+const lastId = nDocs + nExtraDocs;
+
+const penultimateOpTime =
+ assert.commandWorked(db.runCommand({find: collName, limit: 1})).operationTime;
+
+const sentinel2Timestamp =
+ assert.commandWorked(db.runCommand({insert: sentinelCollName, documents: [{_id: "s2"}]}))
+ .operationTime;
+
+rst.awaitReplication(undefined, undefined, [restoreNode]);
+
+jsTestLog("Restarting restore node with the --startupRecoveryForRestore flag");
+restoreNode = rst.restart(restoreNode, {
+ noReplSet: true,
+ setParameter: Object.merge(startParams, {
+ startupRecoveryForRestore: true,
+ recoverFromOplogAsStandalone: true,
+ takeUnstableCheckpointOnShutdown: true
+ })
+});
+// Make sure we can read something after standalone recovery.
+assert.eq(2, restoreNode.getDB(dbName)[sentinelCollName].find({}).itcount());
+
+jsTestLog("Restarting restore node again, in repl set mode");
+restoreNode = rst.restart(restoreNode, {noReplSet: false, setParameter: startParams});
+
+rst.awaitSecondaryNodes(undefined, [restoreNode]);
+jsTestLog("Finished restarting restore node");
+
+const restoreDb = restoreNode.getDB(dbName);
+
+jsTestLog("Checking restore node untimestamped read.");
+// Basic test: should see all docs with untimestamped read.
+assert.eq(nPreDocs + nDocs + nExtraDocs, coll.find().itcount());
+assert.eq(nPreDocs + nDocs + nExtraDocs, restoreDb[collName].find().itcount());
+
+// For the remaining checks we step up the restored node so we can do atClusterTime reads on it.
+// They are necessarily speculative because we are preventing majority optimes from advancing.
+
+jsTestLog("Stepping up restore node");
+rst.stepUp(restoreNode, {awaitReplicationBeforeStepUp: false});
+
+// Should also be able to read at the final sentinel optime on the restore node.
+const restoreNodeSession = restoreNode.startSession({causalConsistency: false});
+restoreNodeSession.startTransaction(
+ {readConcern: {level: "snapshot", atClusterTime: sentinel2Timestamp}});
+const restoreNodeSessionDb = restoreNodeSession.getDatabase(dbName);
+jsTestLog("Checking top-of-oplog read works on restored node.");
+
+let res = assert.commandWorked(
+ restoreNodeSessionDb.runCommand({find: collName, filter: {"_id": lastId}}));
+assert.eq(1, res.cursor.firstBatch.length);
+assert.docEq({_id: lastId, paddingStr: paddingStr}, res.cursor.firstBatch[0]);
+
+// Must abort because majority is not advancing.
+restoreNodeSession.abortTransaction();
+
+// Should NOT be able to read at the first sentinel optime on the restore node.
+restoreNodeSession.startTransaction(
+ {readConcern: {level: "snapshot", atClusterTime: sentinel1Timestamp}});
+jsTestLog(
+ "Checking restore node majority optime read, which should fail, because the restore node does not have that history.");
+res = assert.commandFailedWithCode(
+ restoreNodeSessionDb.runCommand({find: collName, filter: {"_id": {"$gte": nDocs}}}),
+ ErrorCodes.SnapshotTooOld);
+restoreNodeSession.abortTransaction();
+
+// Should NOT be able to read at the penultimate optime on the restore node either.
+jsTestLog(
+ "Checking restore node top-of-oplog minus 1 read, which should fail, because the restore node does not have that history.");
+restoreNodeSession.startTransaction(
+ {readConcern: {level: "snapshot", atClusterTime: penultimateOpTime}});
+res = assert.commandFailedWithCode(
+ restoreNodeSessionDb.runCommand({find: collName, filter: {"_id": lastId}}),
+ ErrorCodes.SnapshotTooOld);
+restoreNodeSession.abortTransaction();
+
+// Allow set to become current and shut down with ordinary dbHash verification.
+stopReplProducer2.off();
+stopReplProducer3.off();
+rst.stopSet();
+})();
diff --git a/jstests/replsets/startup_recovery_for_restore_needs_rollback.js b/jstests/replsets/startup_recovery_for_restore_needs_rollback.js
new file mode 100644
index 00000000000..88ae489d625
--- /dev/null
+++ b/jstests/replsets/startup_recovery_for_restore_needs_rollback.js
@@ -0,0 +1,135 @@
+/*
+ * Tests that if we recover from a node with a lagged stable timestamp using the special
+ * "for restore" mode, and there was a rollback within the recovered oplog, that we crash rather
+ * than attempt to use the node.
+ *
+ * This test only makes sense for storage engines that support recover to stable timestamp.
+ * @tags: [requires_wiredtiger, requires_persistence, requires_journaling, requires_replication,
+ * requires_majority_read_concern, uses_transactions, uses_prepare_transaction,
+ * # We don't expect to do this while upgrading.
+ * multiversion_incompatible]
+ */
+
+(function() {
+"use strict";
+load("jstests/libs/fail_point_util.js");
+
+const dbName = TestData.testName;
+const logLevel = tojson({storage: {recovery: 2}});
+
+// The restore node is made non-voting so the majority is 2.
+// Disable primary catch up since we want to force a rollback.
+const rst = new ReplSetTest({
+ nodes: [{}, {rsConfig: {votes: 0, priority: 0}}, {}, {}],
+ settings: {catchUpTimeoutMillis: 0, chainingAllowed: false}
+});
+
+const startParams = {
+ logComponentVerbosity: logLevel,
+ replBatchLimitOperations: 100
+};
+const nodes = rst.startSet({setParameter: startParams});
+let restoreNode = nodes[1];
+rst.initiateWithHighElectionTimeout();
+const primary = rst.getPrimary();
+const db = primary.getDB(dbName);
+const collName = "testcoll";
+const sentinelCollName = "sentinelcoll";
+const coll = db[collName];
+const sentinelColl = db[sentinelCollName];
+const paddingStr = "XXXXXXXXX";
+
+// Pre-load some documents.
+coll.insert([{_id: "pre1"}, {_id: "pre2"}]);
+rst.awaitReplication();
+
+const holdOpTime = assert.commandWorked(db.runCommand({find: collName, limit: 1})).operationTime;
+
+// Keep the stable timestamp from moving on the node we're going to restart in restore mode.
+assert.commandWorked(restoreNode.adminCommand({
+ configureFailPoint: 'holdStableTimestampAtSpecificTimestamp',
+ mode: 'alwaysOn',
+ data: {"timestamp": holdOpTime}
+}));
+
+// Insert a bunch of documents.
+let bulk = coll.initializeUnorderedBulkOp();
+const nDocs = 1000;
+jsTestLog("Inserting " + nDocs + " documents with snapshotting disabled on one node.");
+for (let id = 1; id <= nDocs; id++) {
+ bulk.insert({_id: id, paddingStr: paddingStr});
+}
+bulk.execute();
+rst.awaitReplication();
+
+jsTestLog("Stopping replication on secondaries to hold back majority commit point.");
+let stopReplProducer2 = configureFailPoint(nodes[2], 'stopReplProducer');
+let stopReplProducer3 = configureFailPoint(nodes[3], 'stopReplProducer');
+
+const nExtraDocs = 50;
+jsTestLog("Inserting " + nExtraDocs + " documents with majority point held back.");
+bulk = coll.initializeUnorderedBulkOp();
+const lastId = nDocs + nExtraDocs;
+for (let id = 1; id <= nExtraDocs; id++) {
+ bulk.insert({_id: (id + nDocs), paddingStr: paddingStr});
+}
+bulk.execute();
+rst.awaitReplication(undefined, undefined, [restoreNode]);
+
+// Stop some nodes so we can force a rollback
+rst.stop(primary);
+rst.stop(restoreNode, undefined, undefined, {forRestart: true});
+
+// Restart replication and step up a new primary.
+stopReplProducer2.off();
+stopReplProducer3.off();
+
+const newPrimary = nodes[2];
+// Must send stepUp command without using ReplSetTest helper, as ReplSetTest helper expects all
+// nodes to be alive.
+assert.commandWorked(newPrimary.adminCommand({replSetStepUp: 1}));
+rst.awaitNodesAgreeOnPrimary(undefined, [nodes[2], nodes[3]]);
+assert.soon(() => (rst.getPrimary() == newPrimary));
+
+// Write some stuff to force a rollback
+assert.commandWorked(newPrimary.getDB(dbName)[collName].insert({_id: "ForceRollback"}));
+rst.awaitReplication(undefined, undefined, [nodes[3]]);
+
+// Bring the restore node up in startupRecoveryForRestore mode. Since it is started as a
+// standalone and cannot see the rest of the set, it will not detect the rollback, so this
+// should succeed.
+
+jsTestLog("Restarting restore node with the --startupRecoveryForRestore flag");
+clearRawMongoProgramOutput();
+restoreNode = rst.start(
+ restoreNode,
+ {
+ noReplSet: true,
+ setParameter: Object.merge(startParams, {
+ startupRecoveryForRestore: true,
+ recoverFromOplogAsStandalone: true,
+ takeUnstableCheckpointOnShutdown: true,
+ 'failpoint.hangAfterCollectionInserts':
+ tojson({mode: 'alwaysOn', data: {collectionNS: sentinelColl.getFullName()}}),
+
+ })
+ },
+ true /* restart */);
+// Make sure we can read the last doc after standalone recovery.
+assert.docEq({_id: lastId, paddingStr: paddingStr},
+ restoreNode.getDB(dbName)[collName].findOne({_id: lastId}));
+
+clearRawMongoProgramOutput();
+jsTestLog("Restarting restore node again, in repl set mode");
+restoreNode = rst.restart(restoreNode, {noReplSet: false, setParameter: startParams});
+
+// This node should not come back up, because it has no stable timestamp to recover to.
+assert.soon(() => (rawMongoProgramOutput().search("UnrecoverableRollbackError") >= 0));
+// Hide the exit code from stopSet.
+waitMongoProgram(parseInt(restoreNode.port));
+
+// Remove the nodes which are down.
+rst.remove(primary);
+rst.remove(restoreNode);
+// Shut down the set.
+rst.stopSet();
+})();
diff --git a/jstests/replsets/startup_recovery_for_restore_restarts.js b/jstests/replsets/startup_recovery_for_restore_restarts.js
new file mode 100644
index 00000000000..be2e3515812
--- /dev/null
+++ b/jstests/replsets/startup_recovery_for_restore_restarts.js
@@ -0,0 +1,180 @@
+/*
+ * Tests that we can recover from a node with a lagged stable timestamp using the special
+ * "for restore" mode, but not read from older points-in-time on the recovered node, and that
+ * we can do so even after we crash in the middle of an attempt to restore.
+ *
+ * This test only makes sense for storage engines that support recover to stable timestamp.
+ * @tags: [requires_wiredtiger, requires_persistence, requires_journaling, requires_replication,
+ * requires_majority_read_concern, uses_transactions, uses_prepare_transaction,
+ * # We don't expect to do this while upgrading.
+ * multiversion_incompatible]
+ */
+
+(function() {
+"use strict";
+load("jstests/libs/fail_point_util.js");
+const SIGKILL = 9;
+
+const dbName = TestData.testName;
+const logLevel = tojson({storage: {recovery: 2}});
+
+const rst = new ReplSetTest({
+ nodes: [{}, {}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0}}],
+ settings: {chainingAllowed: false}
+});
+
+const startParams = {
+ logComponentVerbosity: logLevel,
+ replBatchLimitOperations: 100
+};
+const nodes = rst.startSet({setParameter: startParams});
+let restoreNode = nodes[1];
+rst.initiateWithHighElectionTimeout();
+const primary = rst.getPrimary();
+const db = primary.getDB(dbName);
+const collName = "testcoll";
+const sentinelCollName = "sentinelcoll";
+const coll = db[collName];
+const sentinelColl = db[sentinelCollName];
+const paddingStr = "XXXXXXXXX";
+
+// Pre-load some documents.
+
+const nDocs = 100;
+let bulk = coll.initializeUnorderedBulkOp();
+for (let id = 1; id <= nDocs; id++) {
+ bulk.insert({_id: id, paddingStr: paddingStr});
+}
+bulk.execute();
+rst.awaitReplication();
+
+const holdOpTime = assert.commandWorked(db.runCommand({find: collName, limit: 1})).operationTime;
+
+// Keep the stable timestamp from moving on the node we're going to restart in restore mode.
+assert.commandWorked(restoreNode.adminCommand({
+ configureFailPoint: 'holdStableTimestampAtSpecificTimestamp',
+ mode: 'alwaysOn',
+ data: {"timestamp": holdOpTime}
+}));
+
+// Do a bunch of updates, which are chosen so that if one is missed, we'll know.
+bulk = coll.initializeUnorderedBulkOp();
+const nUpdates = 1000;
+const writeSentinelsAfter = 650; // This should be set to get us to the middle of a batch.
+jsTestLog("Making " + nUpdates + " updates with snapshotting disabled on one node.");
+for (let updateNo = 1; updateNo <= nUpdates; updateNo++) {
+ let id = (updateNo - 1) % nDocs + 1;
+ let updateField = "u" + updateNo;
+ let setDoc = {};
+ setDoc[updateField] = updateNo;
+ bulk.find({_id: id}).updateOne({"$set": setDoc});
+ if (updateNo == writeSentinelsAfter) {
+ // Write a bunch of inserts to the sentinel collection, which will be used to hang
+ // oplog application mid-batch.
+ bulk.execute();
+ bulk = sentinelColl.initializeUnorderedBulkOp();
+ for (let j = 1; j <= 100; j++) {
+ bulk.insert({_id: j});
+ }
+ bulk.execute();
+ bulk = coll.initializeUnorderedBulkOp();
+ }
+}
+bulk.execute();
+rst.awaitReplication();
+
+jsTestLog("Stopping replication on secondaries to hold back majority commit point.");
+let stopReplProducer2 = configureFailPoint(nodes[2], 'stopReplProducer');
+let stopReplProducer3 = configureFailPoint(nodes[3], 'stopReplProducer');
+
+const nExtraDocs = 50;
+jsTestLog("Inserting " + nExtraDocs + " documents with majority point held back.");
+bulk = coll.initializeUnorderedBulkOp();
+const lastId = nDocs + nExtraDocs;
+for (let id = 1; id <= nExtraDocs; id++) {
+ bulk.insert({_id: (id + nDocs), paddingStr: paddingStr});
+}
+bulk.execute();
+
+const penultimateOpTime =
+ assert.commandWorked(db.runCommand({find: collName, limit: 1})).operationTime;
+
+const sentinel2Timestamp =
+ assert.commandWorked(db.runCommand({insert: sentinelCollName, documents: [{_id: "s2"}]}))
+ .operationTime;
+
+rst.awaitReplication(undefined, undefined, [restoreNode]);
+
+jsTestLog("Restarting restore node with the --startupRecoveryForRestore flag");
+// Must use stop/start with waitForConnect: false. See SERVER-56446
+rst.stop(restoreNode, undefined, undefined, {forRestart: true});
+clearRawMongoProgramOutput();
+rst.start(restoreNode,
+ {
+ noReplSet: true,
+ waitForConnect: false,
+ syncdelay: 1, // Take a lot of unstable checkpoints.
+ setParameter: Object.merge(startParams, {
+ startupRecoveryForRestore: true,
+ recoverFromOplogAsStandalone: true,
+ takeUnstableCheckpointOnShutdown: true,
+ 'failpoint.hangAfterCollectionInserts':
+ tojson({mode: 'alwaysOn', data: {collectionNS: sentinelColl.getFullName()}}),
+
+ })
+ },
+ true /* restart */);
+assert.soon(() => { // Can't use checkLog because we can't connect to the mongod during startup.
+ return rawMongoProgramOutput().search("hangAfterCollectionInserts fail point enabled") !== -1;
+});
+assert.soon(() => {
+ return rawMongoProgramOutput().search("Completed unstable checkpoint.") !== -1;
+});
+
+jsTestLog("Restarting restore node uncleanly");
+rst.stop(restoreNode, SIGKILL, {allowedExitCode: MongoRunner.EXIT_SIGKILL}, {forRestart: true});
+restoreNode = rst.start(restoreNode,
+ {
+ noReplSet: true,
+ setParameter: Object.merge(startParams, {
+ startupRecoveryForRestore: true,
+ recoverFromOplogAsStandalone: true,
+ takeUnstableCheckpointOnShutdown: true
+ })
+ },
+ true /* restart */);
+// Make sure we can read something after standalone recovery.
+assert.eq(1, restoreNode.getDB(dbName)[sentinelCollName].find({}).limit(1).itcount());
+
+jsTestLog("Restarting restore node again, in repl set mode");
+restoreNode = rst.restart(restoreNode, {noReplSet: false, setParameter: startParams});
+
+rst.awaitSecondaryNodes(undefined, [restoreNode]);
+jsTestLog("Finished restarting restore node");
+
+// For the timestamp check we step up the restored node so we can do atClusterTime reads on it.
+// They are necessarily speculative because we are preventing majority optimes from advancing.
+
+jsTestLog("Stepping up restore node");
+rst.stepUp(restoreNode, {awaitReplicationBeforeStepUp: false});
+
+// Should NOT be able to read at the penultimate optime on the restore node.
+const restoreNodeSession = restoreNode.startSession({causalConsistency: false});
+const restoreNodeSessionDb = restoreNodeSession.getDatabase(dbName);
+jsTestLog(
+ "Checking restore node top-of-oplog minus 1 read, which should fail, because the restore node does not have that history.");
+restoreNodeSession.startTransaction(
+ {readConcern: {level: "snapshot", atClusterTime: penultimateOpTime}});
+assert.commandFailedWithCode(
+ restoreNodeSessionDb.runCommand({find: collName, filter: {"_id": lastId}}),
+ ErrorCodes.SnapshotTooOld);
+restoreNodeSession.abortTransaction();
+
+// Should see that very last document.
+assert.docEq({_id: "s2"}, restoreNode.getDB(dbName)[sentinelCollName].findOne({_id: "s2"}));
+
+// Allow set to become current and shut down with ordinary dbHash verification.
+stopReplProducer2.off();
+stopReplProducer3.off();
+rst.stopSet();
+})();
diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp
index ee8b253fd70..198d49d6df1 100644
--- a/src/mongo/db/mongod_main.cpp
+++ b/src/mongo/db/mongod_main.cpp
@@ -634,6 +634,10 @@ ExitCode _initAndListen(ServiceContext* serviceContext, int listenPort) {
str::stream()
<< "Cannot take an unstable checkpoint on shutdown while using queryableBackupMode",
!gTakeUnstableCheckpointOnShutdown);
+ uassert(5576603,
+ str::stream() << "Cannot specify both queryableBackupMode and "
+ << "startupRecoveryForRestore at the same time",
+ !repl::startupRecoveryForRestore);
auto replCoord = repl::ReplicationCoordinator::get(startupOpCtx.get());
invariant(replCoord);
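
Editorial note (not part of this commit): a hypothetical jstest-style sketch of how the new incompatibility check above could be exercised. The `existingDbPath` name, the assumption that the dbpath already holds valid data files (queryableBackupMode opens storage read-only), and the idiom of `MongoRunner.runMongod` returning null on failed startup are assumptions for illustration only.

// Hypothetical sketch; not included in this commit.
const existingDbPath = MongoRunner.dataPath + "restore_backup_copy";  // assumption: pre-populated data dir
const conn = MongoRunner.runMongod({
    dbpath: existingDbPath,
    noCleanData: true,
    queryableBackupMode: "",  // assumption: empty string emits the bare --queryableBackupMode flag
    setParameter: {startupRecoveryForRestore: true}
});
assert.eq(null, conn, "mongod should refuse to start with both options set");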
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index c7c8e29894a..a7990f383d7 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -295,6 +295,7 @@ env.Library(
'oplog',
'oplog_application',
'oplog_interface_local',
+ 'repl_server_parameters',
],
)
diff --git a/src/mongo/db/repl/repl_server_parameters.idl b/src/mongo/db/repl/repl_server_parameters.idl
index 4fd7cd85015..050c2ed6e2c 100644
--- a/src/mongo/db/repl/repl_server_parameters.idl
+++ b/src/mongo/db/repl/repl_server_parameters.idl
@@ -492,6 +492,17 @@ server_parameters:
cpp_varname: enableDefaultWriteConcernUpdatesForInitiate
default: false
+ startupRecoveryForRestore:
+ description: >-
+ When set, do startup recovery in such a way that the history of the recovered
+ operations is not preserved. At the end of startup recovery, snapshot reads before
+ the recovered top of oplog will not be possible. Reduces cache pressure when
+ recovering many oplog entries, as when restoring from backup in some scenarios.
+ set_at: startup
+ cpp_vartype: bool
+ cpp_varname: startupRecoveryForRestore
+ default: false
+
feature_flags:
featureFlagTenantMigrations:
description: >-
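
Usage note (editorial, not part of this commit): the jstests added above exercise this parameter by restarting a data-bearing node as a standalone with startupRecoveryForRestore, recoverFromOplogAsStandalone, and takeUnstableCheckpointOnShutdown set, then restarting it as an ordinary replica set member. A minimal sketch in jstest style, assuming an existing ReplSetTest `rst` and member `restoreNode` as in jstests/replsets/startup_recovery_for_restore.js:

// Sketch only; `rst` and `restoreNode` come from the surrounding test.
restoreNode = rst.restart(restoreNode, {
    noReplSet: true,  // run the recovery pass as a standalone
    setParameter: {
        startupRecoveryForRestore: true,
        recoverFromOplogAsStandalone: true,
        takeUnstableCheckpointOnShutdown: true
    }
});
// After recovery reaches the top of the oplog, rejoin the set normally.
restoreNode = rst.restart(restoreNode, {noReplSet: false});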
diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp
index 675800c3cad..c9d4aa3c381 100644
--- a/src/mongo/db/repl/replication_recovery.cpp
+++ b/src/mongo/db/repl/replication_recovery.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/repl/oplog_applier_impl.h"
#include "mongo/db/repl/oplog_buffer.h"
#include "mongo/db/repl/oplog_interface_local.h"
+#include "mongo/db/repl/repl_server_parameters_gen.h"
#include "mongo/db/repl/replication_consistency_markers_impl.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/transaction_oplog_application.h"
@@ -160,8 +161,8 @@ public:
40293, "Couldn't find any entries in the oplog, which should be impossible", attrs);
}
- auto firstTimestampFound =
- fassert(40291, OpTime::parseFromOplogEntry(_cursor->nextSafe())).getTimestamp();
+ _opTimeAtStartPoint = fassert(40291, OpTime::parseFromOplogEntry(_cursor->nextSafe()));
+ const auto firstTimestampFound = _opTimeAtStartPoint.getTimestamp();
if (firstTimestampFound != _oplogApplicationStartPoint) {
LOGV2_FATAL_NOTRACE(
40292,
@@ -215,6 +216,10 @@ public:
MONGO_UNREACHABLE;
}
+ OpTime getOpTimeAtStartPoint() {
+ return _opTimeAtStartPoint;
+ }
+
private:
enum class Mode { kPeek, kPop };
bool _peekOrPop(Value* value, Mode mode) {
@@ -228,6 +233,7 @@ private:
const Timestamp _oplogApplicationStartPoint;
const boost::optional<Timestamp> _oplogApplicationEndPoint;
+ OpTime _opTimeAtStartPoint;
std::unique_ptr<DBDirectClient> _client;
std::unique_ptr<DBClientCursor> _cursor;
};
@@ -315,7 +321,14 @@ void ReplicationRecoveryImpl::recoverFromOplogAsStandalone(OperationContext* opC
// Initialize the cached pointer to the oplog collection.
acquireOplogCollectionForLogging(opCtx);
- if (recoveryTS) {
+ if (recoveryTS || startupRecoveryForRestore) {
+ if (startupRecoveryForRestore && !recoveryTS) {
+ LOGV2_WARNING(5576601,
+ "Replication startup parameter 'startupRecoveryForRestore' is set and "
+ "recovering from an unstable checkpoint. Assuming this is a resume of "
+ "an earlier attempt to recover for restore.");
+ }
+
// We pass in "none" for the stable timestamp so that recoverFromOplog asks storage
// for the recoveryTimestamp just like on replica set recovery.
const auto stableTimestamp = boost::none;
@@ -387,7 +400,8 @@ void ReplicationRecoveryImpl::recoverFromOplogUpTo(OperationContext* opCtx, Time
<< endPoint.toString() << "' in the oplog.");
}
- Timestamp appliedUpTo = _applyOplogOperations(opCtx, *startPoint, endPoint);
+ Timestamp appliedUpTo = _applyOplogOperations(
+ opCtx, *startPoint, endPoint, RecoveryMode::kStartupFromStableTimestamp);
if (appliedUpTo.isNull()) {
LOGV2(21541,
"No stored oplog entries to apply for recovery between {startPoint} (inclusive) and "
@@ -425,6 +439,7 @@ void ReplicationRecoveryImpl::recoverFromOplog(OperationContext* opCtx,
// recovery timestamp. If the storage engine returns a timestamp, we recover from that point.
// However, if the storage engine returns "none", the storage engine does not have a stable
// checkpoint and we must recover from an unstable checkpoint instead.
+ bool isRollbackRecovery = stableTimestamp != boost::none;
const bool supportsRecoveryTimestamp =
_storageInterface->supportsRecoveryTimestamp(opCtx->getServiceContext());
if (!stableTimestamp && supportsRecoveryTimestamp) {
@@ -447,7 +462,9 @@ void ReplicationRecoveryImpl::recoverFromOplog(OperationContext* opCtx,
if (stableTimestamp) {
invariant(supportsRecoveryTimestamp);
- _recoverFromStableTimestamp(opCtx, *stableTimestamp, topOfOplog);
+ const auto recoveryMode = isRollbackRecovery ? RecoveryMode::kRollbackFromStableTimestamp
+ : RecoveryMode::kStartupFromStableTimestamp;
+ _recoverFromStableTimestamp(opCtx, *stableTimestamp, topOfOplog, recoveryMode);
} else {
_recoverFromUnstableCheckpoint(
opCtx, _consistencyMarkers->getAppliedThrough(opCtx), topOfOplog);
@@ -462,7 +479,8 @@ void ReplicationRecoveryImpl::recoverFromOplog(OperationContext* opCtx,
void ReplicationRecoveryImpl::_recoverFromStableTimestamp(OperationContext* opCtx,
Timestamp stableTimestamp,
- OpTime topOfOplog) {
+ OpTime topOfOplog,
+ RecoveryMode recoveryMode) {
invariant(!stableTimestamp.isNull());
invariant(!topOfOplog.isNull());
@@ -478,7 +496,22 @@ void ReplicationRecoveryImpl::_recoverFromStableTimestamp(OperationContext* opCt
"Starting recovery oplog application at the stable timestamp: {stableTimestamp}",
"Starting recovery oplog application at the stable timestamp",
"stableTimestamp"_attr = stableTimestamp);
- _applyToEndOfOplog(opCtx, stableTimestamp, topOfOplog.getTimestamp());
+
+ if (recoveryMode == RecoveryMode::kStartupFromStableTimestamp && startupRecoveryForRestore) {
+ LOGV2_WARNING(5576600,
+ "Replication startup parameter 'startupRecoveryForRestore' is set, "
+ "recovering without preserving history before top of oplog.");
+ // Take only unstable checkpoints during the recovery process.
+ _storageInterface->setInitialDataTimestamp(opCtx->getServiceContext(),
+ Timestamp::kAllowUnstableCheckpointsSentinel);
+ // Allow "oldest" timestamp to move forward freely.
+ _storageInterface->setStableTimestamp(opCtx->getServiceContext(), Timestamp::min());
+ }
+ _applyToEndOfOplog(opCtx, stableTimestamp, topOfOplog.getTimestamp(), recoveryMode);
+ if (recoveryMode == RecoveryMode::kStartupFromStableTimestamp && startupRecoveryForRestore) {
+ _storageInterface->setInitialDataTimestamp(opCtx->getServiceContext(),
+ topOfOplog.getTimestamp());
+ }
}
void ReplicationRecoveryImpl::_recoverFromUnstableCheckpoint(OperationContext* opCtx,
@@ -520,7 +553,17 @@ void ReplicationRecoveryImpl::_recoverFromUnstableCheckpoint(OperationContext* o
opCtx->getServiceContext()->getStorageEngine()->setOldestTimestamp(
appliedThrough.getTimestamp());
- _applyToEndOfOplog(opCtx, appliedThrough.getTimestamp(), topOfOplog.getTimestamp());
+ if (startupRecoveryForRestore) {
+ // When we're recovering for a restore, we may be recovering a large number of oplog
+ // entries, so we want to take unstable checkpoints to reduce cache pressure and allow
+ // resumption in case of a crash.
+ _storageInterface->setInitialDataTimestamp(
+ opCtx->getServiceContext(), Timestamp::kAllowUnstableCheckpointsSentinel);
+ }
+ _applyToEndOfOplog(opCtx,
+ appliedThrough.getTimestamp(),
+ topOfOplog.getTimestamp(),
+ RecoveryMode::kStartupFromUnstableCheckpoint);
}
// `_recoverFromUnstableCheckpoint` is only expected to be called on startup.
@@ -548,7 +591,8 @@ void ReplicationRecoveryImpl::_recoverFromUnstableCheckpoint(OperationContext* o
void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx,
const Timestamp& oplogApplicationStartPoint,
- const Timestamp& topOfOplog) {
+ const Timestamp& topOfOplog,
+ const RecoveryMode recoveryMode) {
invariant(!oplogApplicationStartPoint.isNull());
invariant(!topOfOplog.isNull());
@@ -567,7 +611,8 @@ void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx,
"topOfOplog"_attr = topOfOplog.toBSON());
}
- Timestamp appliedUpTo = _applyOplogOperations(opCtx, oplogApplicationStartPoint, topOfOplog);
+ Timestamp appliedUpTo =
+ _applyOplogOperations(opCtx, oplogApplicationStartPoint, topOfOplog, recoveryMode);
invariant(!appliedUpTo.isNull());
invariant(appliedUpTo == topOfOplog,
str::stream() << "Did not apply to top of oplog. Applied through: "
@@ -577,7 +622,8 @@ void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx,
Timestamp ReplicationRecoveryImpl::_applyOplogOperations(OperationContext* opCtx,
const Timestamp& startPoint,
- const Timestamp& endPoint) {
+ const Timestamp& endPoint,
+ RecoveryMode recoveryMode) {
// The oplog buffer will fetch all entries >= the startPoint timestamp, but it skips the first
// op on startup, which is why the startPoint is described as "exclusive".
LOGV2(21550,
@@ -592,10 +638,11 @@ Timestamp ReplicationRecoveryImpl::_applyOplogOperations(OperationContext* opCtx
RecoveryOplogApplierStats stats;
auto writerPool = makeReplWriterPool();
+ auto* replCoord = ReplicationCoordinator::get(opCtx);
OplogApplierImpl oplogApplier(nullptr,
&oplogBuffer,
&stats,
- ReplicationCoordinator::get(opCtx),
+ replCoord,
_consistencyMarkers,
_storageInterface,
OplogApplier::Options(OplogApplication::Mode::kRecovering),
@@ -605,11 +652,36 @@ Timestamp ReplicationRecoveryImpl::_applyOplogOperations(OperationContext* opCtx
batchLimits.bytes = getBatchLimitOplogBytes(opCtx, _storageInterface);
batchLimits.ops = getBatchLimitOplogEntries();
+ // If we're doing unstable checkpoints during the recovery process (as we do during the special
+ // startupRecoveryForRestore mode), we need to advance the consistency marker for each batch so
+ // the next time we recover we won't start all the way over. Further, we can advance the oldest
+ // timestamp to avoid keeping too much history.
+ //
+ // If we're recovering from a stable checkpoint (except the special startupRecoveryForRestore
+ // mode, which discards history before the top of oplog), we aren't doing new checkpoints during
+ // recovery, so there is no point in advancing the consistency marker, and we cannot advance
+ // "oldest" because it would be later than "stable".
+ const bool advanceTimestampsEachBatch = startupRecoveryForRestore &&
+ (recoveryMode == RecoveryMode::kStartupFromStableTimestamp ||
+ recoveryMode == RecoveryMode::kStartupFromUnstableCheckpoint);
+
OpTime applyThroughOpTime;
std::vector<OplogEntry> batch;
while (
!(batch = fassert(50763, oplogApplier.getNextApplierBatch(opCtx, batchLimits))).empty()) {
+ if (advanceTimestampsEachBatch && applyThroughOpTime.isNull()) {
+ // We must set appliedThrough before applying anything at all, so we know
+ // any unstable checkpoints we take are "dirty". A null appliedThrough indicates
+ // a clean shutdown, which would no longer be the case once we have started applying a batch.
+ _consistencyMarkers->setAppliedThrough(opCtx, oplogBuffer.getOpTimeAtStartPoint());
+ }
applyThroughOpTime = uassertStatusOK(oplogApplier.applyOplogBatch(opCtx, std::move(batch)));
+ if (advanceTimestampsEachBatch) {
+ invariant(!applyThroughOpTime.isNull());
+ _consistencyMarkers->setAppliedThrough(opCtx, applyThroughOpTime);
+ replCoord->getServiceContext()->getStorageEngine()->setOldestTimestamp(
+ applyThroughOpTime.getTimestamp());
+ }
}
stats.complete(applyThroughOpTime);
invariant(oplogBuffer.isEmpty(),
@@ -627,9 +699,11 @@ Timestamp ReplicationRecoveryImpl::_applyOplogOperations(OperationContext* opCtx
// to that checkpoint at a replication consistent point, and applying the oplog is safe.
// If we don't have a stable checkpoint, then we must be in startup recovery, and not rollback
// recovery, because we only roll back to a stable timestamp when we have a stable checkpoint.
- // Startup recovery from an unstable checkpoint only ever applies a single batch and it is safe
- // to replay the batch from any point.
- _consistencyMarkers->setAppliedThrough(opCtx, applyThroughOpTime);
+ // It is safe to do startup recovery from an unstable checkpoint provided we recover to the
+ // end of the oplog and discard history before it, as _recoverFromUnstableCheckpoint does.
+ if (!advanceTimestampsEachBatch) {
+ _consistencyMarkers->setAppliedThrough(opCtx, applyThroughOpTime);
+ }
return applyThroughOpTime.getTimestamp();
}
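
Editorial note: advancing appliedThrough and the oldest timestamp after every batch, together with the kAllowUnstableCheckpointsSentinel initial data timestamp set above, is what lets an interrupted restore resume instead of starting over. startup_recovery_for_restore_restarts.js drives this path roughly as sketched below, reusing names from that test.

// Sketch drawn from jstests/replsets/startup_recovery_for_restore_restarts.js above.
// Kill the node mid-recovery, then restart it with the same restore parameters;
// recovery resumes from the last unstable checkpoint rather than the beginning.
rst.stop(restoreNode, 9 /* SIGKILL */, {allowedExitCode: MongoRunner.EXIT_SIGKILL}, {forRestart: true});
restoreNode = rst.start(restoreNode,
                        {
                            noReplSet: true,
                            setParameter: {
                                startupRecoveryForRestore: true,
                                recoverFromOplogAsStandalone: true,
                                takeUnstableCheckpointOnShutdown: true
                            }
                        },
                        true /* restart */);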
diff --git a/src/mongo/db/repl/replication_recovery.h b/src/mongo/db/repl/replication_recovery.h
index 320076e24a8..9439655e512 100644
--- a/src/mongo/db/repl/replication_recovery.h
+++ b/src/mongo/db/repl/replication_recovery.h
@@ -85,6 +85,14 @@ public:
void recoverFromOplogUpTo(OperationContext* opCtx, Timestamp endPoint) override;
private:
+ enum class RecoveryMode {
+ kStartupFromStableTimestamp,
+ kStartupFromUnstableCheckpoint,
+ kRollbackFromStableTimestamp,
+ // There is no RecoveryMode::kRollbackFromUnstableCheckpoint; rollback can only recover from
+ // a stable timestamp.
+ };
+
/**
* Confirms that the data and oplog all indicate that the node has an unstable checkpoint
* that is fully up to date.
@@ -97,7 +105,8 @@ private:
*/
void _recoverFromStableTimestamp(OperationContext* opCtx,
Timestamp stableTimestamp,
- OpTime topOfOplog);
+ OpTime topOfOplog,
+ RecoveryMode recoveryMode);
/**
* After truncating the oplog, completes recovery if we're recovering from an unstable
@@ -113,7 +122,8 @@ private:
*/
void _applyToEndOfOplog(OperationContext* opCtx,
const Timestamp& oplogApplicationStartPoint,
- const Timestamp& topOfOplog);
+ const Timestamp& topOfOplog,
+ RecoveryMode recoveryMode);
/**
* Applies all oplog entries from startPoint (exclusive) to endPoint (inclusive). Returns the
@@ -121,7 +131,8 @@ private:
*/
Timestamp _applyOplogOperations(OperationContext* opCtx,
const Timestamp& startPoint,
- const Timestamp& endPoint);
+ const Timestamp& endPoint,
+ RecoveryMode recoveryMode);
/**
* Gets the last applied OpTime from the end of the oplog. Returns CollectionIsEmpty if there is
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index 8371d6e312a..cfc6bbccb50 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -1816,7 +1816,7 @@ void WiredTigerKVEngine::checkpoint() {
// Three cases:
//
// First, initialDataTimestamp is Timestamp(0, 1) -> Take full checkpoint. This is when
- // there is no consistent view of the data (i.e: during initial sync).
+ // there is no consistent view of the data (e.g., during initial sync).
//
// Second, stableTimestamp < initialDataTimestamp: Skip checkpoints. The data on disk is
// prone to being rolled back. Hold off on checkpoints. Hope that the stable timestamp
@@ -1828,6 +1828,10 @@ void WiredTigerKVEngine::checkpoint() {
UniqueWiredTigerSession session = _sessionCache->getSession();
WT_SESSION* s = session->getSession();
invariantWTOK(s->checkpoint(s, "use_timestamp=false"));
+ LOGV2_FOR_RECOVERY(5576602,
+ 2,
+ "Completed unstable checkpoint.",
+ "initialDataTimestamp"_attr = initialDataTimestamp.toString());
} else if (stableTimestamp < initialDataTimestamp) {
LOGV2_FOR_RECOVERY(
23985,