author     Daniel Gottlieb <daniel.gottlieb@mongodb.com>    2022-09-07 14:23:22 -0400
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>    2022-11-17 22:17:12 +0000
commit     c8a222bd7cddb738dac52b83bed55d2bddf9363b
tree       42dbdcab90818101e88fc378d2f89e8f15cc3800
parent     06d618d3d6dfa4423b574e029fed8b84779a45d1
download   mongo-c8a222bd7cddb738dac52b83bed55d2bddf9363b.tar.gz
SERVER-69001: Have initial sync persist its last oplog time into the minvalid document.
(cherry picked from commit ff2fffdf496ac1bc039cd8c84024cc6159cf80b6)
 buildscripts/resmokelib/testing/hooks/periodic_kill_secondaries.py | 100
 jstests/replsets/initial_sync1.js                                  |  27
 src/mongo/db/repl/initial_syncer.cpp                               |  44
 src/mongo/db/repl/replication_consistency_markers_impl.cpp         |   1
 4 files changed, 59 insertions(+), 113 deletions(-)
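For reference, the property this commit asserts can also be checked by hand from a mongo shell against a node whose initial sync state was preserved (for example with the skipClearInitialSyncState failpoint enabled, as in the jstest below). A minimal sketch, using only field names that appear in the test in this commit:

// Sketch only: compare the persisted minValid timestamp with the last oplog
// entry recorded by initial sync, which this commit keeps in agreement.
const minValidTs = db.getSiblingDB("local").replset.minvalid.findOne().ts;
const initialSyncOplogEnd =
    db.adminCommand({replSetGetStatus: 1}).initialSyncStatus.initialSyncOplogEnd;
assert.eq(initialSyncOplogEnd, minValidTs);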
diff --git a/buildscripts/resmokelib/testing/hooks/periodic_kill_secondaries.py b/buildscripts/resmokelib/testing/hooks/periodic_kill_secondaries.py
index f85f44a1c29..d896fe0e00d 100644
--- a/buildscripts/resmokelib/testing/hooks/periodic_kill_secondaries.py
+++ b/buildscripts/resmokelib/testing/hooks/periodic_kill_secondaries.py
@@ -266,16 +266,15 @@ class PeriodicKillSecondariesTestCase(interface.DynamicTestCase):
secondary.await_ready()
client = secondary.mongo_client()
- minvalid_doc = client.local["replset.minvalid"].find_one()
oplog_truncate_after_doc = client.local["replset.oplogTruncateAfterPoint"].find_one()
recovery_timestamp_res = client.admin.command("replSetTest",
getLastStableRecoveryTimestamp=True)
latest_oplog_doc = client.local["oplog.rs"].find_one(sort=[("$natural",
pymongo.DESCENDING)])
- self.logger.info("Checking invariants: minValid: {}, oplogTruncateAfterPoint: {},"
+ self.logger.info("Checking replication invariants. oplogTruncateAfterPoint: {},"
" stable recovery timestamp: {}, latest oplog doc: {}".format(
- minvalid_doc, oplog_truncate_after_doc, recovery_timestamp_res,
+ oplog_truncate_after_doc, recovery_timestamp_res,
latest_oplog_doc))
null_ts = bson.Timestamp(0, 0)
@@ -289,13 +288,6 @@ class PeriodicKillSecondariesTestCase(interface.DynamicTestCase):
raise errors.ServerFailure(
"Latest oplog entry had no 'ts' field: {}".format(latest_oplog_doc))
- # The "oplogTruncateAfterPoint" document may not exist at startup. If so, we default
- # it to null.
- oplog_truncate_after_ts = null_ts
- if oplog_truncate_after_doc is not None:
- oplog_truncate_after_ts = oplog_truncate_after_doc.get(
- "oplogTruncateAfterPoint", null_ts)
-
# The "lastStableRecoveryTimestamp" field is present if the storage engine supports
# "recover to a timestamp". If it's a null timestamp on a durable storage engine, that
# means we do not yet have a stable checkpoint timestamp and must be restarting at the
@@ -318,94 +310,6 @@ class PeriodicKillSecondariesTestCase(interface.DynamicTestCase):
recovery_timestamp, latest_oplog_entry_ts,
recovery_timestamp_res, latest_oplog_doc))
- if minvalid_doc is not None:
- applied_through_ts = minvalid_doc.get("begin", {}).get("ts", null_ts)
- minvalid_ts = minvalid_doc.get("ts", null_ts)
-
- # The "appliedThrough" value should always equal the "last stable recovery
- # timestamp", AKA the stable checkpoint for durable engines, on server restart.
- #
- # The written "appliedThrough" time is updated with the latest timestamp at the end
- # of each batch application, and batch boundaries are the only valid stable
- # timestamps on secondaries. Therefore, a non-null appliedThrough timestamp must
- # equal the checkpoint timestamp, because any stable timestamp that the checkpoint
- # could use includes an equal persisted appliedThrough timestamp.
- if (recovery_timestamp != null_ts and applied_through_ts != null_ts
- and (not recovery_timestamp == applied_through_ts)):
- raise errors.ServerFailure(
- "The condition last stable recovery timestamp ({}) == appliedThrough ({})"
- " doesn't hold: minValid document={},"
- " getLastStableRecoveryTimestamp result={}, last oplog entry={}".format(
- recovery_timestamp, applied_through_ts, minvalid_doc,
- recovery_timestamp_res, latest_oplog_doc))
-
- if applied_through_ts == null_ts:
- # We clear "appliedThrough" to represent having applied through the top of the
- # oplog in PRIMARY state or immediately after "rollback via refetch".
- # If we are using a storage engine that supports "recover to a timestamp,"
- # then we will have a "last stable recovery timestamp" and we should use that
- # as our "appliedThrough" (similarly to why we assert their equality above).
- # If both are null, then we are in PRIMARY state on a storage engine that does
- # not support "recover to a timestamp" or in RECOVERING immediately after
- # "rollback via refetch". Since we do not update "minValid" in PRIMARY state,
- # we leave "appliedThrough" as null so that the invariants below hold, rather
- # than substituting the latest oplog entry for the "appliedThrough" value.
- applied_through_ts = recovery_timestamp
-
- if minvalid_ts == null_ts:
- # The server treats the "ts" field in the minValid document as missing when its
- # value is the null timestamp.
- minvalid_ts = applied_through_ts
-
- if latest_oplog_entry_ts == null_ts:
- # If the oplog is empty, we treat the "minValid" as the latest oplog entry.
- latest_oplog_entry_ts = minvalid_ts
-
- if oplog_truncate_after_ts == null_ts:
- # The server treats the "oplogTruncateAfterPoint" field as missing when its
- # value is the null timestamp. When it is null, the oplog is complete and
- # should not be truncated, so it is effectively the top of the oplog.
- oplog_truncate_after_ts = latest_oplog_entry_ts
-
- # Check the ordering invariants before the secondary has reconciled the end of
- # its oplog.
- # The "oplogTruncateAfterPoint" is set to the first timestamp of each batch of
- # oplog entries before they are written to the oplog. Thus, it can be ahead
- # of the top of the oplog before any oplog entries are written, and behind it
- # after some are written. Thus, we cannot compare it to the top of the oplog.
-
- # appliedThrough <= minValid
- # appliedThrough represents the end of the previous batch, so it is always the
- # earliest.
- if applied_through_ts > minvalid_ts:
- raise errors.ServerFailure(
- "The condition appliedThrough <= minValid ({} <= {}) doesn't hold: minValid"
- " document={}, latest oplog entry={}".format(
- applied_through_ts, minvalid_ts, minvalid_doc, latest_oplog_doc))
-
- # minValid <= oplogTruncateAfterPoint
- # This is true because this hook is never run after a rollback. Thus, we only
- # move "minValid" to the end of each batch after the batch is written to the oplog.
- # We reset the "oplogTruncateAfterPoint" to null before we move "minValid" from
- # the end of the previous batch to the end of the current batch. Thus "minValid"
- # must be less than or equal to the "oplogTruncateAfterPoint".
- if minvalid_ts > oplog_truncate_after_ts:
- raise errors.ServerFailure(
- "The condition minValid <= oplogTruncateAfterPoint ({} <= {}) doesn't"
- " hold: minValid document={}, oplogTruncateAfterPoint document={},"
- " latest oplog entry={}".format(minvalid_ts, oplog_truncate_after_ts,
- minvalid_doc, oplog_truncate_after_doc,
- latest_oplog_doc))
-
- # minvalid <= latest oplog entry
- # "minValid" is set to the end of a batch after the batch is written to the oplog.
- # Thus it is always less than or equal to the top of the oplog.
- if minvalid_ts > latest_oplog_entry_ts:
- raise errors.ServerFailure(
- "The condition minValid <= top of oplog ({} <= {}) doesn't"
- " hold: minValid document={}, latest oplog entry={}".format(
- minvalid_ts, latest_oplog_entry_ts, minvalid_doc, latest_oplog_doc))
-
try:
secondary.teardown()
except errors.ServerFailure as exc:
diff --git a/jstests/replsets/initial_sync1.js b/jstests/replsets/initial_sync1.js
index 92ace5d0252..feeeb9b4bcc 100644
--- a/jstests/replsets/initial_sync1.js
+++ b/jstests/replsets/initial_sync1.js
@@ -51,7 +51,16 @@ admin_s1.runCommand({replSetFreeze: 999999});
print("6. Bring up #3");
var hostname = getHostName();
-var slave2 = MongoRunner.runMongod(Object.merge({replSet: basename, oplogSize: 2}, x509_options2));
+var slave2 = MongoRunner.runMongod(Object.merge({
+ replSet: basename,
+ oplogSize: 2,
+ // Preserve the initial sync state to validate an assertion.
+ setParameter: {
+ "failpoint.skipClearInitialSyncState": tojson({mode: 'alwaysOn'}),
+ "failpoint.initialSyncHangBeforeCompletingOplogFetching": tojson({mode: 'alwaysOn'})
+ },
+},
+ x509_options2));
var local_s2 = slave2.getDB("local");
var admin_s2 = slave2.getDB("admin");
@@ -77,6 +86,17 @@ wait(function() {
return config2.version == config.version && (config3 && config3.version == config.version);
});
+assert.commandWorked(slave2.adminCommand({
+ waitForFailPoint: "initialSyncHangBeforeCompletingOplogFetching",
+ timesEntered: 1,
+ maxTimeMS: 60 * 1000,
+}));
+
+// Fetch the minValid document at the end of initial sync.
+let syncingNodeMinvalid = slave2.getDB("local").replset.minvalid.findOne()["ts"];
+assert.commandWorked(slave2.adminCommand(
+ {configureFailPoint: "initialSyncHangBeforeCompletingOplogFetching", mode: "off"}));
+
replTest.waitForState(slave2, [ReplSetTest.State.SECONDARY, ReplSetTest.State.RECOVERING]);
print("7. Kill the secondary in the middle of syncing");
@@ -102,5 +122,10 @@ assert.writeOK(bulk.execute());
print("11. Everyone happy eventually");
replTest.awaitReplication();
+// SERVER-69001: Assert that the last oplog for initial sync was persisted in the minvalid document.
+let lastInitialSyncOp =
+ slave2.adminCommand("replSetGetStatus")["initialSyncStatus"]["initialSyncOplogEnd"];
+assert.eq(lastInitialSyncOp, syncingNodeMinvalid);
+
MongoRunner.stopMongod(slave2);
replTest.stopSet();
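The failpoint choreography in the test above follows a common pattern: pause the node at a named failpoint, inspect its state, then release it. A generic sketch of that pattern, where the connection variable "conn" is a placeholder and not part of the commit:

// Wait until the syncing node has actually entered the failpoint.
assert.commandWorked(conn.adminCommand({
    waitForFailPoint: "initialSyncHangBeforeCompletingOplogFetching",
    timesEntered: 1,
    maxTimeMS: 60 * 1000,
}));
// ... inspect state here (e.g. local.replset.minvalid) while the node is paused ...
assert.commandWorked(conn.adminCommand(
    {configureFailPoint: "initialSyncHangBeforeCompletingOplogFetching", mode: "off"}));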
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index 2ba982d5d12..76e3959d7ad 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -114,6 +114,9 @@ MONGO_FAIL_POINT_DEFINE(failInitialSyncBeforeApplyingBatch);
// Failpoint which fasserts if applying a batch fails.
MONGO_FAIL_POINT_DEFINE(initialSyncFassertIfApplyingBatchFails);
+// Failpoint which causes the initial sync function to hang before stopping the oplog fetcher.
+MONGO_FAIL_POINT_DEFINE(initialSyncHangBeforeCompletingOplogFetching);
+
// Failpoint which skips clearing _initialSyncState after a successful initial sync attempt.
MONGO_FAIL_POINT_DEFINE(skipClearInitialSyncState);
@@ -1185,22 +1188,30 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp(
std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
OpTimeAndWallTime resultOpTimeAndWallTime = {OpTime(), Date_t()};
{
- stdx::lock_guard<Latch> lock(_mutex);
- auto status = _checkForShutdownAndConvertStatus_inlock(
- result.getStatus(), "error fetching last oplog entry for stop timestamp");
- if (!status.isOK()) {
- onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
- return;
- }
+ {
+ stdx::lock_guard<Latch> lock(_mutex);
+ auto status = _checkForShutdownAndConvertStatus_inlock(
+ result.getStatus(), "error fetching last oplog entry for stop timestamp");
+ if (!status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
+ return;
+ }
- auto&& optimeStatus = parseOpTimeAndWallTime(result);
- if (!optimeStatus.isOK()) {
- onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock,
- optimeStatus.getStatus());
- return;
+ auto&& optimeStatus = parseOpTimeAndWallTime(result);
+ if (!optimeStatus.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock,
+ optimeStatus.getStatus());
+ return;
+ }
+ resultOpTimeAndWallTime = optimeStatus.getValue();
}
- resultOpTimeAndWallTime = optimeStatus.getValue();
+ // Release the _mutex to write to disk.
+ auto opCtx = makeOpCtx();
+ _replicationProcess->getConsistencyMarkers()->setMinValid(opCtx.get(),
+ resultOpTimeAndWallTime.opTime);
+
+ stdx::lock_guard<Latch> lock(_mutex);
_initialSyncState->stopTimestamp = resultOpTimeAndWallTime.opTime.getTimestamp();
// If the beginFetchingTimestamp is different from the stopTimestamp, it indicates that
@@ -1463,6 +1474,13 @@ void InitialSyncer::_rollbackCheckerCheckForRollbackCallback(
return;
}
+ if (MONGO_FAIL_POINT(initialSyncHangBeforeCompletingOplogFetching)) {
+ log() << "initial sync - initialSyncHangBeforeCompletingOplogFetching fail point "
+ "enabled. Blocking until fail point is disabled.";
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET(initialSyncHangBeforeCompletingOplogFetching);
+ }
+
+
// Update all unique indexes belonging to non-replicated collections on secondaries. See comment
// in ReplicationCoordinatorExternalStateImpl::initializeReplSetStorage() for the explanation of
// why we do this.
diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.cpp b/src/mongo/db/repl/replication_consistency_markers_impl.cpp
index a57b7e35ceb..113c5251e2e 100644
--- a/src/mongo/db/repl/replication_consistency_markers_impl.cpp
+++ b/src/mongo/db/repl/replication_consistency_markers_impl.cpp
@@ -198,7 +198,6 @@ void ReplicationConsistencyMarkersImpl::setMinValid(OperationContext* opCtx,
// This method is only used with storage engines that do not support recover to stable
// timestamp. As a result, their timestamps do not matter.
- invariant(!opCtx->getServiceContext()->getStorageEngine()->supportsRecoverToStableTimestamp());
update.timestamp = Timestamp();
_updateMinValidDocument(opCtx, update);