From c8a222bd7cddb738dac52b83bed55d2bddf9363b Mon Sep 17 00:00:00 2001 From: Daniel Gottlieb Date: Wed, 7 Sep 2022 14:23:22 -0400 Subject: SERVER-69001: Have initial sync persist its last oplog time into the minvalid document. (cherry picked from commit ff2fffdf496ac1bc039cd8c84024cc6159cf80b6) --- .../testing/hooks/periodic_kill_secondaries.py | 100 +-------------------- jstests/replsets/initial_sync1.js | 27 +++++- src/mongo/db/repl/initial_syncer.cpp | 44 ++++++--- .../repl/replication_consistency_markers_impl.cpp | 1 - 4 files changed, 59 insertions(+), 113 deletions(-) diff --git a/buildscripts/resmokelib/testing/hooks/periodic_kill_secondaries.py b/buildscripts/resmokelib/testing/hooks/periodic_kill_secondaries.py index f85f44a1c29..d896fe0e00d 100644 --- a/buildscripts/resmokelib/testing/hooks/periodic_kill_secondaries.py +++ b/buildscripts/resmokelib/testing/hooks/periodic_kill_secondaries.py @@ -266,16 +266,15 @@ class PeriodicKillSecondariesTestCase(interface.DynamicTestCase): secondary.await_ready() client = secondary.mongo_client() - minvalid_doc = client.local["replset.minvalid"].find_one() oplog_truncate_after_doc = client.local["replset.oplogTruncateAfterPoint"].find_one() recovery_timestamp_res = client.admin.command("replSetTest", getLastStableRecoveryTimestamp=True) latest_oplog_doc = client.local["oplog.rs"].find_one(sort=[("$natural", pymongo.DESCENDING)]) - self.logger.info("Checking invariants: minValid: {}, oplogTruncateAfterPoint: {}," + self.logger.info("Checking replication invariants. oplogTruncateAfterPoint: {}," " stable recovery timestamp: {}, latest oplog doc: {}".format( - minvalid_doc, oplog_truncate_after_doc, recovery_timestamp_res, + oplog_truncate_after_doc, recovery_timestamp_res, latest_oplog_doc)) null_ts = bson.Timestamp(0, 0) @@ -289,13 +288,6 @@ class PeriodicKillSecondariesTestCase(interface.DynamicTestCase): raise errors.ServerFailure( "Latest oplog entry had no 'ts' field: {}".format(latest_oplog_doc)) - # The "oplogTruncateAfterPoint" document may not exist at startup. If so, we default - # it to null. - oplog_truncate_after_ts = null_ts - if oplog_truncate_after_doc is not None: - oplog_truncate_after_ts = oplog_truncate_after_doc.get( - "oplogTruncateAfterPoint", null_ts) - # The "lastStableRecoveryTimestamp" field is present if the storage engine supports # "recover to a timestamp". If it's a null timestamp on a durable storage engine, that # means we do not yet have a stable checkpoint timestamp and must be restarting at the @@ -318,94 +310,6 @@ class PeriodicKillSecondariesTestCase(interface.DynamicTestCase): recovery_timestamp, latest_oplog_entry_ts, recovery_timestamp_res, latest_oplog_doc)) - if minvalid_doc is not None: - applied_through_ts = minvalid_doc.get("begin", {}).get("ts", null_ts) - minvalid_ts = minvalid_doc.get("ts", null_ts) - - # The "appliedThrough" value should always equal the "last stable recovery - # timestamp", AKA the stable checkpoint for durable engines, on server restart. - # - # The written "appliedThrough" time is updated with the latest timestamp at the end - # of each batch application, and batch boundaries are the only valid stable - # timestamps on secondaries. Therefore, a non-null appliedThrough timestamp must - # equal the checkpoint timestamp, because any stable timestamp that the checkpoint - # could use includes an equal persisted appliedThrough timestamp. 
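A minimal standalone sketch of the invariant described in the comment above, assuming pymongo's bson module. The helper name check_applied_through is hypothetical and not part of the hook; the removed check just below enforces the same relationship inline:

    import bson

    null_ts = bson.Timestamp(0, 0)  # the hook's sentinel for "missing timestamp"

    def check_applied_through(recovery_timestamp, applied_through_ts):
        # A non-null appliedThrough must equal the last stable recovery timestamp,
        # because appliedThrough is only persisted at batch boundaries and batch
        # boundaries are the only valid stable timestamps on secondaries.
        if (recovery_timestamp != null_ts and applied_through_ts != null_ts
                and recovery_timestamp != applied_through_ts):
            raise AssertionError(
                "appliedThrough {} does not match last stable recovery timestamp {}".format(
                    applied_through_ts, recovery_timestamp))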
- if (recovery_timestamp != null_ts and applied_through_ts != null_ts - and (not recovery_timestamp == applied_through_ts)): - raise errors.ServerFailure( - "The condition last stable recovery timestamp ({}) == appliedThrough ({})" - " doesn't hold: minValid document={}," - " getLastStableRecoveryTimestamp result={}, last oplog entry={}".format( - recovery_timestamp, applied_through_ts, minvalid_doc, - recovery_timestamp_res, latest_oplog_doc)) - - if applied_through_ts == null_ts: - # We clear "appliedThrough" to represent having applied through the top of the - # oplog in PRIMARY state or immediately after "rollback via refetch". - # If we are using a storage engine that supports "recover to a timestamp," - # then we will have a "last stable recovery timestamp" and we should use that - # as our "appliedThrough" (similarly to why we assert their equality above). - # If both are null, then we are in PRIMARY state on a storage engine that does - # not support "recover to a timestamp" or in RECOVERING immediately after - # "rollback via refetch". Since we do not update "minValid" in PRIMARY state, - # we leave "appliedThrough" as null so that the invariants below hold, rather - # than substituting the latest oplog entry for the "appliedThrough" value. - applied_through_ts = recovery_timestamp - - if minvalid_ts == null_ts: - # The server treats the "ts" field in the minValid document as missing when its - # value is the null timestamp. - minvalid_ts = applied_through_ts - - if latest_oplog_entry_ts == null_ts: - # If the oplog is empty, we treat the "minValid" as the latest oplog entry. - latest_oplog_entry_ts = minvalid_ts - - if oplog_truncate_after_ts == null_ts: - # The server treats the "oplogTruncateAfterPoint" field as missing when its - # value is the null timestamp. When it is null, the oplog is complete and - # should not be truncated, so it is effectively the top of the oplog. - oplog_truncate_after_ts = latest_oplog_entry_ts - - # Check the ordering invariants before the secondary has reconciled the end of - # its oplog. - # The "oplogTruncateAfterPoint" is set to the first timestamp of each batch of - # oplog entries before they are written to the oplog. Thus, it can be ahead - # of the top of the oplog before any oplog entries are written, and behind it - # after some are written. Thus, we cannot compare it to the top of the oplog. - - # appliedThrough <= minValid - # appliedThrough represents the end of the previous batch, so it is always the - # earliest. - if applied_through_ts > minvalid_ts: - raise errors.ServerFailure( - "The condition appliedThrough <= minValid ({} <= {}) doesn't hold: minValid" - " document={}, latest oplog entry={}".format( - applied_through_ts, minvalid_ts, minvalid_doc, latest_oplog_doc)) - - # minValid <= oplogTruncateAfterPoint - # This is true because this hook is never run after a rollback. Thus, we only - # move "minValid" to the end of each batch after the batch is written to the oplog. - # We reset the "oplogTruncateAfterPoint" to null before we move "minValid" from - # the end of the previous batch to the end of the current batch. Thus "minValid" - # must be less than or equal to the "oplogTruncateAfterPoint". 
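A minimal sketch that collapses the ordering invariants discussed above and below into one hypothetical helper (check_ordering is not part of the hook). It assumes the null-timestamp substitutions described in the comments have already been applied, so every argument is a concrete bson.Timestamp:

    def check_ordering(applied_through_ts, minvalid_ts, oplog_truncate_after_ts,
                       latest_oplog_entry_ts):
        # appliedThrough marks the end of the previously applied batch, so it trails minValid.
        assert applied_through_ts <= minvalid_ts, "appliedThrough must not exceed minValid"
        # The truncate-after point is reset to null before minValid advances, so a
        # non-null value is never behind minValid.
        assert minvalid_ts <= oplog_truncate_after_ts, \
            "minValid must not exceed oplogTruncateAfterPoint"
        # minValid is only advanced after its batch is written to the oplog.
        assert minvalid_ts <= latest_oplog_entry_ts, "minValid must not exceed the top of the oplog"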
- if minvalid_ts > oplog_truncate_after_ts: - raise errors.ServerFailure( - "The condition minValid <= oplogTruncateAfterPoint ({} <= {}) doesn't" - " hold: minValid document={}, oplogTruncateAfterPoint document={}," - " latest oplog entry={}".format(minvalid_ts, oplog_truncate_after_ts, - minvalid_doc, oplog_truncate_after_doc, - latest_oplog_doc)) - - # minvalid <= latest oplog entry - # "minValid" is set to the end of a batch after the batch is written to the oplog. - # Thus it is always less than or equal to the top of the oplog. - if minvalid_ts > latest_oplog_entry_ts: - raise errors.ServerFailure( - "The condition minValid <= top of oplog ({} <= {}) doesn't" - " hold: minValid document={}, latest oplog entry={}".format( - minvalid_ts, latest_oplog_entry_ts, minvalid_doc, latest_oplog_doc)) - try: secondary.teardown() except errors.ServerFailure as exc: diff --git a/jstests/replsets/initial_sync1.js b/jstests/replsets/initial_sync1.js index 92ace5d0252..feeeb9b4bcc 100644 --- a/jstests/replsets/initial_sync1.js +++ b/jstests/replsets/initial_sync1.js @@ -51,7 +51,16 @@ admin_s1.runCommand({replSetFreeze: 999999}); print("6. Bring up #3"); var hostname = getHostName(); -var slave2 = MongoRunner.runMongod(Object.merge({replSet: basename, oplogSize: 2}, x509_options2)); +var slave2 = MongoRunner.runMongod(Object.merge({ + replSet: basename, + oplogSize: 2, + // Preserve the initial sync state to validate an assertion. + setParameter: { + "failpoint.skipClearInitialSyncState": tojson({mode: 'alwaysOn'}), + "failpoint.initialSyncHangBeforeCompletingOplogFetching": tojson({mode: 'alwaysOn'}) + }, +}, + x509_options2)); var local_s2 = slave2.getDB("local"); var admin_s2 = slave2.getDB("admin"); @@ -77,6 +86,17 @@ wait(function() { return config2.version == config.version && (config3 && config3.version == config.version); }); +assert.commandWorked(slave2.adminCommand({ + waitForFailPoint: "initialSyncHangBeforeCompletingOplogFetching", + timesEntered: 1, + maxTimeMS: 60 * 1000, +})); + +// Fetch the minValid document at the end of initial sync. +let syncingNodeMinvalid = slave2.getDB("local").replset.minvalid.findOne()["ts"]; +assert.commandWorked(slave2.adminCommand( + {configureFailPoint: "initialSyncHangBeforeCompletingOplogFetching", mode: "off"})); + replTest.waitForState(slave2, [ReplSetTest.State.SECONDARY, ReplSetTest.State.RECOVERING]); print("7. Kill the secondary in the middle of syncing"); @@ -102,5 +122,10 @@ assert.writeOK(bulk.execute()); print("11. Everyone happy eventually"); replTest.awaitReplication(); +// SERVER-69001: Assert that the last oplog for initial sync was persisted in the minvalid document. +let lastInitialSyncOp = + slave2.adminCommand("replSetGetStatus")["initialSyncStatus"]["initialSyncOplogEnd"]; +assert.eq(lastInitialSyncOp, syncingNodeMinvalid); + MongoRunner.stopMongod(slave2); replTest.stopSet(); diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp index 2ba982d5d12..76e3959d7ad 100644 --- a/src/mongo/db/repl/initial_syncer.cpp +++ b/src/mongo/db/repl/initial_syncer.cpp @@ -114,6 +114,9 @@ MONGO_FAIL_POINT_DEFINE(failInitialSyncBeforeApplyingBatch); // Failpoint which fasserts if applying a batch fails. MONGO_FAIL_POINT_DEFINE(initialSyncFassertIfApplyingBatchFails); +// Failpoint which causes the initial sync function to hang before stopping the oplog fetcher. 
+MONGO_FAIL_POINT_DEFINE(initialSyncHangBeforeCompletingOplogFetching); + // Failpoint which skips clearing _initialSyncState after a successful initial sync attempt. MONGO_FAIL_POINT_DEFINE(skipClearInitialSyncState); @@ -1185,22 +1188,30 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp( std::shared_ptr onCompletionGuard) { OpTimeAndWallTime resultOpTimeAndWallTime = {OpTime(), Date_t()}; { - stdx::lock_guard lock(_mutex); - auto status = _checkForShutdownAndConvertStatus_inlock( - result.getStatus(), "error fetching last oplog entry for stop timestamp"); - if (!status.isOK()) { - onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); - return; - } + { + stdx::lock_guard lock(_mutex); + auto status = _checkForShutdownAndConvertStatus_inlock( + result.getStatus(), "error fetching last oplog entry for stop timestamp"); + if (!status.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); + return; + } - auto&& optimeStatus = parseOpTimeAndWallTime(result); - if (!optimeStatus.isOK()) { - onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, - optimeStatus.getStatus()); - return; + auto&& optimeStatus = parseOpTimeAndWallTime(result); + if (!optimeStatus.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, + optimeStatus.getStatus()); + return; + } + resultOpTimeAndWallTime = optimeStatus.getValue(); } - resultOpTimeAndWallTime = optimeStatus.getValue(); + // Release the _mutex to write to disk. + auto opCtx = makeOpCtx(); + _replicationProcess->getConsistencyMarkers()->setMinValid(opCtx.get(), + resultOpTimeAndWallTime.opTime); + + stdx::lock_guard lock(_mutex); _initialSyncState->stopTimestamp = resultOpTimeAndWallTime.opTime.getTimestamp(); // If the beginFetchingTimestamp is different from the stopTimestamp, it indicates that @@ -1463,6 +1474,13 @@ void InitialSyncer::_rollbackCheckerCheckForRollbackCallback( return; } + if (MONGO_FAIL_POINT(initialSyncHangBeforeCompletingOplogFetching)) { + log() << "initial sync - initialSyncHangBeforeCompletingOplogFetching fail point " + "enabled. Blocking until fail point is disabled."; + MONGO_FAIL_POINT_PAUSE_WHILE_SET(initialSyncHangBeforeCompletingOplogFetching); + } + + // Update all unique indexes belonging to non-replicated collections on secondaries. See comment // in ReplicationCoordinatorExternalStateImpl::initializeReplSetStorage() for the explanation of // why we do this. diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.cpp b/src/mongo/db/repl/replication_consistency_markers_impl.cpp index a57b7e35ceb..113c5251e2e 100644 --- a/src/mongo/db/repl/replication_consistency_markers_impl.cpp +++ b/src/mongo/db/repl/replication_consistency_markers_impl.cpp @@ -198,7 +198,6 @@ void ReplicationConsistencyMarkersImpl::setMinValid(OperationContext* opCtx, // This method is only used with storage engines that do not support recover to stable // timestamp. As a result, their timestamps do not matter. - invariant(!opCtx->getServiceContext()->getStorageEngine()->supportsRecoverToStableTimestamp()); update.timestamp = Timestamp(); _updateMinValidDocument(opCtx, update); -- cgit v1.2.1
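For reference, the assertion added to initial_sync1.js above can also be expressed directly with pymongo. This is a sketch under assumptions: the host and port are hypothetical, and the node must have completed initial sync with the skipClearInitialSyncState failpoint enabled so that replSetGetStatus still reports initialSyncStatus:

    import pymongo

    # Connect directly to the hypothetical freshly synced secondary.
    client = pymongo.MongoClient("localhost", 27018, readPreference="secondaryPreferred")

    # SERVER-69001: the minValid timestamp persisted during initial sync should match
    # the last oplog entry fetched by initial sync.
    minvalid_ts = client.local["replset.minvalid"].find_one()["ts"]
    status = client.admin.command("replSetGetStatus")
    assert minvalid_ts == status["initialSyncStatus"]["initialSyncOplogEnd"]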