summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Gottlieb <daniel.gottlieb@mongodb.com>2022-09-07 14:23:22 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-09-15 19:50:40 +0000
commitff2fffdf496ac1bc039cd8c84024cc6159cf80b6 (patch)
treeba11c4a785f01c2c8692ba234e4745347fa25373
parent84f7412daaf4c44c4fd325230076101e0d95c05f (diff)
downloadmongo-ff2fffdf496ac1bc039cd8c84024cc6159cf80b6.tar.gz
SERVER-69001: Have initial sync persist its last oplog time into the minvalid document.
-rw-r--r--buildscripts/resmokelib/testing/hooks/periodic_kill_secondaries.py100
-rw-r--r--jstests/replsets/initial_sync1.js15
-rw-r--r--src/mongo/db/repl/initial_syncer.cpp96
3 files changed, 68 insertions, 143 deletions
diff --git a/buildscripts/resmokelib/testing/hooks/periodic_kill_secondaries.py b/buildscripts/resmokelib/testing/hooks/periodic_kill_secondaries.py
index 125649a0b2c..4cce5617645 100644
--- a/buildscripts/resmokelib/testing/hooks/periodic_kill_secondaries.py
+++ b/buildscripts/resmokelib/testing/hooks/periodic_kill_secondaries.py
@@ -274,16 +274,15 @@ class PeriodicKillSecondariesTestCase(interface.DynamicTestCase):
secondary.await_ready()
client = secondary.mongo_client()
- minvalid_doc = client.local["replset.minvalid"].find_one()
oplog_truncate_after_doc = client.local["replset.oplogTruncateAfterPoint"].find_one()
recovery_timestamp_res = client.admin.command("replSetTest",
getLastStableRecoveryTimestamp=True)
latest_oplog_doc = client.local["oplog.rs"].find_one(sort=[("$natural",
pymongo.DESCENDING)])
- self.logger.info("Checking invariants: minValid: {}, oplogTruncateAfterPoint: {},"
+ self.logger.info("Checking replication invariants. oplogTruncateAfterPoint: {},"
" stable recovery timestamp: {}, latest oplog doc: {}".format(
- minvalid_doc, oplog_truncate_after_doc, recovery_timestamp_res,
+ oplog_truncate_after_doc, recovery_timestamp_res,
latest_oplog_doc))
null_ts = bson.Timestamp(0, 0)
@@ -297,13 +296,6 @@ class PeriodicKillSecondariesTestCase(interface.DynamicTestCase):
raise errors.ServerFailure(
"Latest oplog entry had no 'ts' field: {}".format(latest_oplog_doc))
- # The "oplogTruncateAfterPoint" document may not exist at startup. If so, we default
- # it to null.
- oplog_truncate_after_ts = null_ts
- if oplog_truncate_after_doc is not None:
- oplog_truncate_after_ts = oplog_truncate_after_doc.get(
- "oplogTruncateAfterPoint", null_ts)
-
# The "lastStableRecoveryTimestamp" field is present if the storage engine supports
# "recover to a timestamp". If it's a null timestamp on a durable storage engine, that
# means we do not yet have a stable checkpoint timestamp and must be restarting at the
@@ -326,94 +318,6 @@ class PeriodicKillSecondariesTestCase(interface.DynamicTestCase):
recovery_timestamp, latest_oplog_entry_ts,
recovery_timestamp_res, latest_oplog_doc))
- if minvalid_doc is not None:
- applied_through_ts = minvalid_doc.get("begin", {}).get("ts", null_ts)
- minvalid_ts = minvalid_doc.get("ts", null_ts)
-
- # The "appliedThrough" value should always equal the "last stable recovery
- # timestamp", AKA the stable checkpoint for durable engines, on server restart.
- #
- # The written "appliedThrough" time is updated with the latest timestamp at the end
- # of each batch application, and batch boundaries are the only valid stable
- # timestamps on secondaries. Therefore, a non-null appliedThrough timestamp must
- # equal the checkpoint timestamp, because any stable timestamp that the checkpoint
- # could use includes an equal persisted appliedThrough timestamp.
- if (recovery_timestamp != null_ts and applied_through_ts != null_ts
- and (not recovery_timestamp == applied_through_ts)):
- raise errors.ServerFailure(
- "The condition last stable recovery timestamp ({}) == appliedThrough ({})"
- " doesn't hold: minValid document={},"
- " getLastStableRecoveryTimestamp result={}, last oplog entry={}".format(
- recovery_timestamp, applied_through_ts, minvalid_doc,
- recovery_timestamp_res, latest_oplog_doc))
-
- if applied_through_ts == null_ts:
- # We clear "appliedThrough" to represent having applied through the top of the
- # oplog in PRIMARY state or immediately after "rollback via refetch".
- # If we are using a storage engine that supports "recover to a timestamp,"
- # then we will have a "last stable recovery timestamp" and we should use that
- # as our "appliedThrough" (similarly to why we assert their equality above).
- # If both are null, then we are in PRIMARY state on a storage engine that does
- # not support "recover to a timestamp" or in RECOVERING immediately after
- # "rollback via refetch". Since we do not update "minValid" in PRIMARY state,
- # we leave "appliedThrough" as null so that the invariants below hold, rather
- # than substituting the latest oplog entry for the "appliedThrough" value.
- applied_through_ts = recovery_timestamp
-
- if minvalid_ts == null_ts:
- # The server treats the "ts" field in the minValid document as missing when its
- # value is the null timestamp.
- minvalid_ts = applied_through_ts
-
- if latest_oplog_entry_ts == null_ts:
- # If the oplog is empty, we treat the "minValid" as the latest oplog entry.
- latest_oplog_entry_ts = minvalid_ts
-
- if oplog_truncate_after_ts == null_ts:
- # The server treats the "oplogTruncateAfterPoint" field as missing when its
- # value is the null timestamp. When it is null, the oplog is complete and
- # should not be truncated, so it is effectively the top of the oplog.
- oplog_truncate_after_ts = latest_oplog_entry_ts
-
- # Check the ordering invariants before the secondary has reconciled the end of
- # its oplog.
- # The "oplogTruncateAfterPoint" is set to the first timestamp of each batch of
- # oplog entries before they are written to the oplog. Thus, it can be ahead
- # of the top of the oplog before any oplog entries are written, and behind it
- # after some are written. Thus, we cannot compare it to the top of the oplog.
-
- # appliedThrough <= minValid
- # appliedThrough represents the end of the previous batch, so it is always the
- # earliest.
- if applied_through_ts > minvalid_ts:
- raise errors.ServerFailure(
- "The condition appliedThrough <= minValid ({} <= {}) doesn't hold: minValid"
- " document={}, latest oplog entry={}".format(
- applied_through_ts, minvalid_ts, minvalid_doc, latest_oplog_doc))
-
- # minValid <= oplogTruncateAfterPoint
- # This is true because this hook is never run after a rollback. Thus, we only
- # move "minValid" to the end of each batch after the batch is written to the oplog.
- # We reset the "oplogTruncateAfterPoint" to null before we move "minValid" from
- # the end of the previous batch to the end of the current batch. Thus "minValid"
- # must be less than or equal to the "oplogTruncateAfterPoint".
- if minvalid_ts > oplog_truncate_after_ts:
- raise errors.ServerFailure(
- "The condition minValid <= oplogTruncateAfterPoint ({} <= {}) doesn't"
- " hold: minValid document={}, oplogTruncateAfterPoint document={},"
- " latest oplog entry={}".format(minvalid_ts, oplog_truncate_after_ts,
- minvalid_doc, oplog_truncate_after_doc,
- latest_oplog_doc))
-
- # minvalid <= latest oplog entry
- # "minValid" is set to the end of a batch after the batch is written to the oplog.
- # Thus it is always less than or equal to the top of the oplog.
- if minvalid_ts > latest_oplog_entry_ts:
- raise errors.ServerFailure(
- "The condition minValid <= top of oplog ({} <= {}) doesn't"
- " hold: minValid document={}, latest oplog entry={}".format(
- minvalid_ts, latest_oplog_entry_ts, minvalid_doc, latest_oplog_doc))
-
try:
secondary.teardown()
except errors.ServerFailure:
diff --git a/jstests/replsets/initial_sync1.js b/jstests/replsets/initial_sync1.js
index d66f960e132..98cc5d0e1e1 100644
--- a/jstests/replsets/initial_sync1.js
+++ b/jstests/replsets/initial_sync1.js
@@ -53,8 +53,13 @@ admin_s1.runCommand({replSetFreeze: 999999});
print("6. Bring up #3");
var hostname = getHostName();
-var secondary2 =
- MongoRunner.runMongod(Object.merge({replSet: basename, oplogSize: 2}, x509_options2));
+var secondary2 = MongoRunner.runMongod(Object.merge({
+ replSet: basename,
+ oplogSize: 2,
+ // Preserve the initial sync state to validate an assertion.
+ setParameter: {"failpoint.skipClearInitialSyncState": tojson({mode: 'alwaysOn'})}
+},
+ x509_options2));
var local_s2 = secondary2.getDB("local");
var admin_s2 = secondary2.getDB("admin");
@@ -108,5 +113,11 @@ assert.commandWorked(bulk.execute());
print("11. Everyone happy eventually");
replTest.awaitReplication();
+// SERVER-69001: Assert that the last oplog for initial sync was persisted in the minvalid document.
+let syncingNodeMinvalid = secondary2.getDB("local").replset.minvalid.findOne()["ts"];
+let lastInitialSyncOp =
+ secondary2.adminCommand("replSetGetStatus")["initialSyncStatus"]["initialSyncOplogEnd"];
+assert.eq(lastInitialSyncOp, syncingNodeMinvalid);
+
MongoRunner.stopMongod(secondary2);
replTest.stopSet();
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index 4c28e28fd56..fae8ee042e0 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -1400,53 +1400,63 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp(
std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
OpTimeAndWallTime resultOpTimeAndWallTime = {OpTime(), Date_t()};
{
- stdx::lock_guard<Latch> lock(_mutex);
- auto status = _checkForShutdownAndConvertStatus_inlock(
- result.getStatus(), "error fetching last oplog entry for stop timestamp");
- if (_shouldRetryError(lock, status)) {
- auto scheduleStatus =
- (*_attemptExec)
- ->scheduleWork([this,
- onCompletionGuard](executor::TaskExecutor::CallbackArgs args) {
- // It is not valid to schedule the retry from within this callback,
- // hence we schedule a lambda to schedule the retry.
- stdx::lock_guard<Latch> lock(_mutex);
- // Since the stopTimestamp is retrieved after we have done all the work of
- // retrieving collection data, we handle retries within this class by
- // retrying for 'initialSyncTransientErrorRetryPeriodSeconds' (default 24
- // hours). This is the same retry strategy used when retrieving collection
- // data, and avoids retrieving all the data and then throwing it away due to
- // a transient network outage.
- auto status = _scheduleLastOplogEntryFetcher_inlock(
- [=](const StatusWith<mongo::Fetcher::QueryResponse>& status,
- mongo::Fetcher::NextAction*,
- mongo::BSONObjBuilder*) {
- _lastOplogEntryFetcherCallbackForStopTimestamp(status,
- onCompletionGuard);
- },
- kInitialSyncerHandlesRetries);
- if (!status.isOK()) {
- onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
- }
- });
- if (scheduleStatus.isOK())
+ {
+ stdx::lock_guard<Latch> lock(_mutex);
+ auto status = _checkForShutdownAndConvertStatus_inlock(
+ result.getStatus(), "error fetching last oplog entry for stop timestamp");
+ if (_shouldRetryError(lock, status)) {
+ auto scheduleStatus =
+ (*_attemptExec)
+ ->scheduleWork(
+ [this, onCompletionGuard](executor::TaskExecutor::CallbackArgs args) {
+ // It is not valid to schedule the retry from within this callback,
+ // hence we schedule a lambda to schedule the retry.
+ stdx::lock_guard<Latch> lock(_mutex);
+ // Since the stopTimestamp is retrieved after we have done all the
+ // work of retrieving collection data, we handle retries within this
+ // class by retrying for
+ // 'initialSyncTransientErrorRetryPeriodSeconds' (default 24 hours).
+ // This is the same retry strategy used when retrieving collection
+ // data, and avoids retrieving all the data and then throwing it
+ // away due to a transient network outage.
+ auto status = _scheduleLastOplogEntryFetcher_inlock(
+ [=](const StatusWith<mongo::Fetcher::QueryResponse>& status,
+ mongo::Fetcher::NextAction*,
+ mongo::BSONObjBuilder*) {
+ _lastOplogEntryFetcherCallbackForStopTimestamp(
+ status, onCompletionGuard);
+ },
+ kInitialSyncerHandlesRetries);
+ if (!status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(
+ lock, status);
+ }
+ });
+ if (scheduleStatus.isOK())
+ return;
+ // If scheduling failed, we're shutting down and cannot retry.
+ // So just continue with the original failed status.
+ }
+ if (!status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
return;
- // If scheduling failed, we're shutting down and cannot retry.
- // So just continue with the original failed status.
- }
- if (!status.isOK()) {
- onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
- return;
- }
+ }
- auto&& optimeStatus = parseOpTimeAndWallTime(result);
- if (!optimeStatus.isOK()) {
- onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock,
- optimeStatus.getStatus());
- return;
+ auto&& optimeStatus = parseOpTimeAndWallTime(result);
+ if (!optimeStatus.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock,
+ optimeStatus.getStatus());
+ return;
+ }
+ resultOpTimeAndWallTime = optimeStatus.getValue();
}
- resultOpTimeAndWallTime = optimeStatus.getValue();
+ // Release the _mutex to write to disk.
+ auto opCtx = makeOpCtx();
+ _replicationProcess->getConsistencyMarkers()->setMinValid(
+ opCtx.get(), resultOpTimeAndWallTime.opTime, true);
+
+ stdx::lock_guard<Latch> lock(_mutex);
_initialSyncState->stopTimestamp = resultOpTimeAndWallTime.opTime.getTimestamp();
// If the beginFetchingTimestamp is different from the stopTimestamp, it indicates that