author     Daniel Gottlieb <daniel.gottlieb@mongodb.com>       2022-09-07 14:23:22 -0400
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>    2022-11-05 02:07:55 +0000
commit     c66d973949a04a0b80e8bfa7f40e9d2ccf731932
tree       e7c2b4eda0695e776491ebb919a63fdf8fda94d6
parent     a80813750a255a19690ef3335438091415f8b525
download   mongo-c66d973949a04a0b80e8bfa7f40e9d2ccf731932.tar.gz
SERVER-69001: Have initial sync persist its last oplog time into the minvalid document.
(cherry picked from commit ff2fffdf496ac1bc039cd8c84024cc6159cf80b6)
(cherry picked from commit 14b1ea6d58cf2a2169b2a07268fa2266419703b4)
-rw-r--r--  buildscripts/resmokelib/testing/hooks/periodic_kill_secondaries.py | 100
-rw-r--r--  jstests/replsets/initial_sync1.js                                  |  28
-rw-r--r--  src/mongo/db/repl/initial_syncer.cpp                               |  96
-rw-r--r--  src/mongo/db/repl/replication_consistency_markers_impl.cpp         |   1
4 files changed, 82 insertions, 143 deletions
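
The patch has the initial syncer persist the optime of the last oplog entry it fetches into the
local.replset.minvalid document (via ReplicationConsistencyMarkers::setMinValid), and trims the
resmoke hook and the jstest accordingly. The mongo-shell sketch below (not part of the patch) is
one way to observe the new behaviour on a test node; it assumes test commands are enabled and that
the node was started with the skipClearInitialSyncState failpoint, as in the updated
initial_sync1.js further down, so that replSetGetStatus still reports initialSyncStatus.

    // Sketch only: compare the persisted minValid timestamp with the optime that initial
    // sync reports as its last fetched oplog entry.
    const minvalidTs = db.getSiblingDB("local").replset.minvalid.findOne().ts;
    const initialSyncOplogEnd =
        db.adminCommand({replSetGetStatus: 1}).initialSyncStatus.initialSyncOplogEnd;
    assert.eq(minvalidTs, initialSyncOplogEnd);  // Holds after SERVER-69001.

Both values are BSON timestamps, so they can be compared directly with assert.eq, which is what
the updated jstest does.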
diff --git a/buildscripts/resmokelib/testing/hooks/periodic_kill_secondaries.py b/buildscripts/resmokelib/testing/hooks/periodic_kill_secondaries.py
index 33912e323e4..e7f6a6a9aec 100644
--- a/buildscripts/resmokelib/testing/hooks/periodic_kill_secondaries.py
+++ b/buildscripts/resmokelib/testing/hooks/periodic_kill_secondaries.py
@@ -266,16 +266,15 @@ class PeriodicKillSecondariesTestCase(interface.DynamicTestCase):
         secondary.await_ready()
         client = secondary.mongo_client()
 
-        minvalid_doc = client.local["replset.minvalid"].find_one()
         oplog_truncate_after_doc = client.local["replset.oplogTruncateAfterPoint"].find_one()
         recovery_timestamp_res = client.admin.command("replSetTest",
                                                       getLastStableRecoveryTimestamp=True)
         latest_oplog_doc = client.local["oplog.rs"].find_one(sort=[("$natural",
                                                                     pymongo.DESCENDING)])
 
-        self.logger.info("Checking invariants: minValid: {}, oplogTruncateAfterPoint: {},"
+        self.logger.info("Checking replication invariants. oplogTruncateAfterPoint: {},"
                          " stable recovery timestamp: {}, latest oplog doc: {}".format(
-                             minvalid_doc, oplog_truncate_after_doc, recovery_timestamp_res,
+                             oplog_truncate_after_doc, recovery_timestamp_res,
                              latest_oplog_doc))
 
         null_ts = bson.Timestamp(0, 0)
@@ -289,13 +288,6 @@ class PeriodicKillSecondariesTestCase(interface.DynamicTestCase):
             raise errors.ServerFailure(
                 "Latest oplog entry had no 'ts' field: {}".format(latest_oplog_doc))
 
-        # The "oplogTruncateAfterPoint" document may not exist at startup. If so, we default
-        # it to null.
-        oplog_truncate_after_ts = null_ts
-        if oplog_truncate_after_doc is not None:
-            oplog_truncate_after_ts = oplog_truncate_after_doc.get(
-                "oplogTruncateAfterPoint", null_ts)
-
         # The "lastStableRecoveryTimestamp" field is present if the storage engine supports
         # "recover to a timestamp". If it's a null timestamp on a durable storage engine, that
         # means we do not yet have a stable checkpoint timestamp and must be restarting at the
@@ -318,94 +310,6 @@ class PeriodicKillSecondariesTestCase(interface.DynamicTestCase):
                         recovery_timestamp, latest_oplog_entry_ts, recovery_timestamp_res,
                         latest_oplog_doc))
 
-        if minvalid_doc is not None:
-            applied_through_ts = minvalid_doc.get("begin", {}).get("ts", null_ts)
-            minvalid_ts = minvalid_doc.get("ts", null_ts)
-
-            # The "appliedThrough" value should always equal the "last stable recovery
-            # timestamp", AKA the stable checkpoint for durable engines, on server restart.
-            #
-            # The written "appliedThrough" time is updated with the latest timestamp at the end
-            # of each batch application, and batch boundaries are the only valid stable
-            # timestamps on secondaries. Therefore, a non-null appliedThrough timestamp must
-            # equal the checkpoint timestamp, because any stable timestamp that the checkpoint
-            # could use includes an equal persisted appliedThrough timestamp.
-            if (recovery_timestamp != null_ts and applied_through_ts != null_ts
-                    and (not recovery_timestamp == applied_through_ts)):
-                raise errors.ServerFailure(
-                    "The condition last stable recovery timestamp ({}) == appliedThrough ({})"
-                    " doesn't hold: minValid document={},"
-                    " getLastStableRecoveryTimestamp result={}, last oplog entry={}".format(
-                        recovery_timestamp, applied_through_ts, minvalid_doc,
-                        recovery_timestamp_res, latest_oplog_doc))
-
-            if applied_through_ts == null_ts:
-                # We clear "appliedThrough" to represent having applied through the top of the
-                # oplog in PRIMARY state or immediately after "rollback via refetch".
-                # If we are using a storage engine that supports "recover to a timestamp,"
-                # then we will have a "last stable recovery timestamp" and we should use that
-                # as our "appliedThrough" (similarly to why we assert their equality above).
-                # If both are null, then we are in PRIMARY state on a storage engine that does
-                # not support "recover to a timestamp" or in RECOVERING immediately after
-                # "rollback via refetch". Since we do not update "minValid" in PRIMARY state,
-                # we leave "appliedThrough" as null so that the invariants below hold, rather
-                # than substituting the latest oplog entry for the "appliedThrough" value.
-                applied_through_ts = recovery_timestamp
-
-            if minvalid_ts == null_ts:
-                # The server treats the "ts" field in the minValid document as missing when its
-                # value is the null timestamp.
-                minvalid_ts = applied_through_ts
-
-            if latest_oplog_entry_ts == null_ts:
-                # If the oplog is empty, we treat the "minValid" as the latest oplog entry.
-                latest_oplog_entry_ts = minvalid_ts
-
-            if oplog_truncate_after_ts == null_ts:
-                # The server treats the "oplogTruncateAfterPoint" field as missing when its
-                # value is the null timestamp. When it is null, the oplog is complete and
-                # should not be truncated, so it is effectively the top of the oplog.
-                oplog_truncate_after_ts = latest_oplog_entry_ts
-
-            # Check the ordering invariants before the secondary has reconciled the end of
-            # its oplog.
-            # The "oplogTruncateAfterPoint" is set to the first timestamp of each batch of
-            # oplog entries before they are written to the oplog. Thus, it can be ahead
-            # of the top of the oplog before any oplog entries are written, and behind it
-            # after some are written. Thus, we cannot compare it to the top of the oplog.
-
-            # appliedThrough <= minValid
-            # appliedThrough represents the end of the previous batch, so it is always the
-            # earliest.
-            if applied_through_ts > minvalid_ts:
-                raise errors.ServerFailure(
-                    "The condition appliedThrough <= minValid ({} <= {}) doesn't hold: minValid"
-                    " document={}, latest oplog entry={}".format(
-                        applied_through_ts, minvalid_ts, minvalid_doc, latest_oplog_doc))
-
-            # minValid <= oplogTruncateAfterPoint
-            # This is true because this hook is never run after a rollback. Thus, we only
-            # move "minValid" to the end of each batch after the batch is written to the oplog.
-            # We reset the "oplogTruncateAfterPoint" to null before we move "minValid" from
-            # the end of the previous batch to the end of the current batch. Thus "minValid"
-            # must be less than or equal to the "oplogTruncateAfterPoint".
-            if minvalid_ts > oplog_truncate_after_ts:
-                raise errors.ServerFailure(
-                    "The condition minValid <= oplogTruncateAfterPoint ({} <= {}) doesn't"
-                    " hold: minValid document={}, oplogTruncateAfterPoint document={},"
-                    " latest oplog entry={}".format(minvalid_ts, oplog_truncate_after_ts,
-                                                    minvalid_doc, oplog_truncate_after_doc,
-                                                    latest_oplog_doc))
-
-            # minvalid <= latest oplog entry
-            # "minValid" is set to the end of a batch after the batch is written to the oplog.
-            # Thus it is always less than or equal to the top of the oplog.
-            if minvalid_ts > latest_oplog_entry_ts:
-                raise errors.ServerFailure(
-                    "The condition minValid <= top of oplog ({} <= {}) doesn't"
-                    " hold: minValid document={}, latest oplog entry={}".format(
-                        minvalid_ts, latest_oplog_entry_ts, minvalid_doc, latest_oplog_doc))
-
         try:
             secondary.teardown()
         except errors.ServerFailure as exc:
diff --git a/jstests/replsets/initial_sync1.js b/jstests/replsets/initial_sync1.js
index 0a536b4d601..9fd1403e20f 100644
--- a/jstests/replsets/initial_sync1.js
+++ b/jstests/replsets/initial_sync1.js
@@ -51,7 +51,16 @@ admin_s1.runCommand({replSetFreeze: 999999});
 
 print("6. Bring up #3");
 var hostname = getHostName();
-var slave2 = MongoRunner.runMongod(Object.merge({replSet: basename, oplogSize: 2}, x509_options2));
+var slave2 = MongoRunner.runMongod(Object.merge({
+    replSet: basename,
+    oplogSize: 2,
+    // Preserve the initial sync state to validate an assertion.
+    setParameter: {
+        "failpoint.skipClearInitialSyncState": tojson({mode: 'alwaysOn'}),
+        "failpoint.initialSyncHangBeforeCompletingOplogFetching": tojson({mode: 'alwaysOn'}),
+    },
+},
+                                                x509_options2));
 var local_s2 = slave2.getDB("local");
 var admin_s2 = slave2.getDB("admin");
 
@@ -77,6 +86,17 @@ wait(function() {
     return config2.version == config.version && (config3 && config3.version == config.version);
 });
 
+assert.commandWorked(slave2.adminCommand({
+    waitForFailPoint: "initialSyncHangBeforeCompletingOplogFetching",
+    timesEntered: 1,
+    maxTimeMS: 60 * 1000,
+}));
+
+// Fetch the minValid document at the end of initial sync.
+let syncingNodeMinvalid = slave2.getDB("local").replset.minvalid.findOne()["ts"];
+assert.commandWorked(slave2.adminCommand(
+    {configureFailPoint: "initialSyncHangBeforeCompletingOplogFetching", mode: "off"}));
+
 replTest.waitForState(slave2, [ReplSetTest.State.SECONDARY, ReplSetTest.State.RECOVERING]);
 
 print("7. Kill the secondary in the middle of syncing");
@@ -102,5 +122,11 @@ assert.commandWorked(bulk.execute());
 print("11. Everyone happy eventually");
 replTest.awaitReplication();
 
+// SERVER-69001: Assert that the last oplog for initial sync was persisted in the minvalid document.
+let lastInitialSyncOp =
+    slave2.adminCommand("replSetGetStatus")["initialSyncStatus"]["initialSyncOplogEnd"];
+assert.eq(lastInitialSyncOp, syncingNodeMinvalid);
+
 MongoRunner.stopMongod(slave2);
+
 replTest.stopSet();
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index e6c47a25317..928cb7cbeba 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -1399,53 +1399,63 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp(
     std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
     OpTimeAndWallTime resultOpTimeAndWallTime = {OpTime(), Date_t()};
     {
-        stdx::lock_guard<Latch> lock(_mutex);
-        auto status = _checkForShutdownAndConvertStatus_inlock(
-            result.getStatus(), "error fetching last oplog entry for stop timestamp");
-        if (_shouldRetryError(lock, status)) {
-            auto scheduleStatus =
-                (*_attemptExec)
-                    ->scheduleWork([this,
-                                    onCompletionGuard](executor::TaskExecutor::CallbackArgs args) {
-                        // It is not valid to schedule the retry from within this callback,
-                        // hence we schedule a lambda to schedule the retry.
-                        stdx::lock_guard<Latch> lock(_mutex);
-                        // Since the stopTimestamp is retrieved after we have done all the work of
-                        // retrieving collection data, we handle retries within this class by
-                        // retrying for 'initialSyncTransientErrorRetryPeriodSeconds' (default 24
-                        // hours). This is the same retry strategy used when retrieving collection
-                        // data, and avoids retrieving all the data and then throwing it away due to
-                        // a transient network outage.
-                        auto status = _scheduleLastOplogEntryFetcher_inlock(
-                            [=](const StatusWith<mongo::Fetcher::QueryResponse>& status,
-                                mongo::Fetcher::NextAction*,
-                                mongo::BSONObjBuilder*) {
-                                _lastOplogEntryFetcherCallbackForStopTimestamp(status,
-                                                                               onCompletionGuard);
-                            },
-                            kInitialSyncerHandlesRetries);
-                        if (!status.isOK()) {
-                            onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
-                        }
-                    });
-            if (scheduleStatus.isOK())
+        {
+            stdx::lock_guard<Latch> lock(_mutex);
+            auto status = _checkForShutdownAndConvertStatus_inlock(
+                result.getStatus(), "error fetching last oplog entry for stop timestamp");
+            if (_shouldRetryError(lock, status)) {
+                auto scheduleStatus =
+                    (*_attemptExec)
+                        ->scheduleWork(
+                            [this, onCompletionGuard](executor::TaskExecutor::CallbackArgs args) {
+                                // It is not valid to schedule the retry from within this callback,
+                                // hence we schedule a lambda to schedule the retry.
+                                stdx::lock_guard<Latch> lock(_mutex);
+                                // Since the stopTimestamp is retrieved after we have done all the
+                                // work of retrieving collection data, we handle retries within this
+                                // class by retrying for
+                                // 'initialSyncTransientErrorRetryPeriodSeconds' (default 24 hours).
+                                // This is the same retry strategy used when retrieving collection
+                                // data, and avoids retrieving all the data and then throwing it
+                                // away due to a transient network outage.
+                                auto status = _scheduleLastOplogEntryFetcher_inlock(
+                                    [=](const StatusWith<mongo::Fetcher::QueryResponse>& status,
+                                        mongo::Fetcher::NextAction*,
+                                        mongo::BSONObjBuilder*) {
+                                        _lastOplogEntryFetcherCallbackForStopTimestamp(
+                                            status, onCompletionGuard);
+                                    },
+                                    kInitialSyncerHandlesRetries);
+                                if (!status.isOK()) {
+                                    onCompletionGuard->setResultAndCancelRemainingWork_inlock(
+                                        lock, status);
+                                }
+                            });
+                if (scheduleStatus.isOK())
+                    return;
+                // If scheduling failed, we're shutting down and cannot retry.
+                // So just continue with the original failed status.
+            }
+            if (!status.isOK()) {
+                onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
                 return;
-            // If scheduling failed, we're shutting down and cannot retry.
-            // So just continue with the original failed status.
-        }
-        if (!status.isOK()) {
-            onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
-            return;
-        }
+            }
 
-        auto&& optimeStatus = parseOpTimeAndWallTime(result);
-        if (!optimeStatus.isOK()) {
-            onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock,
-                                                                      optimeStatus.getStatus());
-            return;
+            auto&& optimeStatus = parseOpTimeAndWallTime(result);
+            if (!optimeStatus.isOK()) {
+                onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock,
+                                                                          optimeStatus.getStatus());
+                return;
+            }
+            resultOpTimeAndWallTime = optimeStatus.getValue();
         }
-        resultOpTimeAndWallTime = optimeStatus.getValue();
+        // Release the _mutex to write to disk.
+        auto opCtx = makeOpCtx();
+        _replicationProcess->getConsistencyMarkers()->setMinValid(opCtx.get(),
+                                                                  resultOpTimeAndWallTime.opTime);
+
+        stdx::lock_guard<Latch> lock(_mutex);
         _initialSyncState->stopTimestamp = resultOpTimeAndWallTime.opTime.getTimestamp();
 
         // If the beginFetchingTimestamp is different from the stopTimestamp, it indicates that
diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.cpp b/src/mongo/db/repl/replication_consistency_markers_impl.cpp
index e7151875651..1f9b0be9518 100644
--- a/src/mongo/db/repl/replication_consistency_markers_impl.cpp
+++ b/src/mongo/db/repl/replication_consistency_markers_impl.cpp
@@ -225,7 +225,6 @@ void ReplicationConsistencyMarkersImpl::setMinValid(OperationContext* opCtx,
 
     // This method is only used with storage engines that do not support recover to stable
     // timestamp. As a result, their timestamps do not matter.
-    invariant(!opCtx->getServiceContext()->getStorageEngine()->supportsRecoverToStableTimestamp());
     update.timestamp = Timestamp();
 
     _updateMinValidDocument(opCtx, update);
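
Because setMinValid is now called from InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp
regardless of the storage engine, the invariant restricting it to engines without "recover to a
timestamp" support no longer holds and is removed in the last hunk. With replset.minvalid no longer
read by the periodic_kill_secondaries hook, the remaining restart checks are driven by the
oplogTruncateAfterPoint document, the last stable recovery timestamp, and the top of the oplog. The
shell sketch below shows roughly equivalent queries for manual inspection (illustrative only; the
hook issues them through pymongo, and the internal replSetTest command requires enableTestCommands).

    // Illustrative queries mirroring what the trimmed-down hook still inspects.
    const local = db.getSiblingDB("local");
    const truncatePointDoc = local.getCollection("replset.oplogTruncateAfterPoint").findOne();
    const recoveryRes = db.adminCommand({replSetTest: 1, getLastStableRecoveryTimestamp: true});
    const latestOplogDoc = local.getCollection("oplog.rs").find().sort({$natural: -1}).limit(1).next();
    printjson({
        oplogTruncateAfterPoint: truncatePointDoc,
        lastStableRecoveryTimestamp: recoveryRes.lastStableRecoveryTimestamp,
        latestOplogEntry: latestOplogDoc
    });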