diff options
Diffstat (limited to 'src')
14 files changed, 345 insertions, 95 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 4a05bea3c75..66779823508 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -260,6 +260,9 @@ env.Library( 'repl_coordinator_interface', 'replication_consistency_markers_idl', ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/catalog_raii', + ], ) env.Library( diff --git a/src/mongo/db/repl/replication_consistency_markers.h b/src/mongo/db/repl/replication_consistency_markers.h index c59abdac678..e873d4977c9 100644 --- a/src/mongo/db/repl/replication_consistency_markers.h +++ b/src/mongo/db/repl/replication_consistency_markers.h @@ -152,22 +152,76 @@ public: virtual void ensureFastCountOnOplogTruncateAfterPoint(OperationContext* opCtx) = 0; /** - * The oplog truncate after point is set to the beginning of a batch of oplog entries before - * the oplog entries are written into the oplog, and reset before we begin applying the batch. - * On startup all oplog entries with a value >= the oplog truncate after point should be - * deleted. We write operations to the oplog in parallel so if we crash mid-batch there could - * be holes in the oplog. Deleting them at startup keeps us consistent. + * On startup all oplog entries with a ts field >= the oplog truncate after point will be + * deleted. If the truncate point is null, no oplog entries are truncated. A null truncate point + * can be found on startup if the server was certain at the time of shutdown that there were no + * parallel writes running. * - * If null, no documents should be deleted. + * Write operations are done in parallel, creating momentary oplog 'holes' where writes at an + * earlier timestamp are not yet committed. Secondaries can read an oplog entry from a + * sync-source as soon as there are no holes behind the oplog entry in-memory, but before there + * are no holes behind the oplog entry on disk. 
Therefore, after a crash, the oplog is truncated + * back to its on-disk no holes point that is guaranteed to be consistent with the rest of the + * replica set. * - * If we are in feature compatibility version 3.4 and there is no oplog truncate after point - * document, we fall back on the old oplog delete from point field in the minValid - * collection. + * A primary will update the oplog truncate after point before every journal flush to disk with + * the storage engine tracked in-memory no holes point. + * + * For other replication states than PRIMARY, the oplog truncate after point is updated + * directly. For batch application, the oplog truncate after point is set to the current + * lastApplied timestamp prior to writing a batch of oplog entries into the oplog, and reset to + * null once the parallel oplog entry writes are complete. + * + * Concurrency control and serialization is the responsibility of the caller. */ virtual void setOplogTruncateAfterPoint(OperationContext* opCtx, const Timestamp& timestamp) = 0; virtual Timestamp getOplogTruncateAfterPoint(OperationContext* opCtx) const = 0; + /** + * Turns updating the OplogTruncateAfterPoint in refreshOplogTruncateAfterPointIfPrimary on/off. + * + * Any already running calls to refreshOplogTruncateAfterPointIfPrimary must be interrupted to + * ensure that the updates to the truncate point via that function have stopped. + */ + virtual void startUsingOplogTruncateAfterPointForPrimary() = 0; + virtual void stopUsingOplogTruncateAfterPointForPrimary() = 0; + + /** + * Indicates whether the oplog truncate after point is currently in use (being periodically + * refreshed), which is only done while in state PRIMARY. + * + * This class stores its own relevant replication state knowledge to avoid potential deadlocks + * in accessing the replication coordinator's mutex to check; and will remain false for + * standalones that do not use timestamps. 
+ */ + virtual bool isOplogTruncateAfterPointBeingUsedForPrimary() const = 0; + + /** + * Initializes the oplog truncate after point with the timestamp of the latest oplog entry. + * + * On stepup to primary, the truncate point must be initialized to protect the window of time + * between completion of stepup and the first periodic flush to disk that prompts a truncate + * point update. Otherwise, in-memory writes (with no holes) can replicate while the on-disk + * writes still have holes, at which point we could crash, leaving this node with unknown data + * holes that other nodes do not have (they have the data). + */ + virtual void setOplogTruncateAfterPointToTopOfOplog(OperationContext* opCtx) = 0; + + /** + * Updates the OplogTruncateAfterPoint with the latest no-holes oplog timestamp. + * + * If primary, returns the OpTime and WallTime of the oplog entry associated with the updated + * oplog truncate after point. + * Returns boost::none if isOplogTruncateAfterPointBeingUsedForPrimary returns false. + * + * stopUsingOplogTruncateAfterPointForPrimary() will cause new calls to this function to do + * nothing, but any already running callers of this function will need to be interrupted to + * ensure the state change is in effect (that an update will not racily go ahead). 
+ */ + virtual boost::optional<OpTimeAndWallTime> refreshOplogTruncateAfterPointIfPrimary( + OperationContext* opCtx) = 0; + // -------- Applied Through ---------- /** diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.cpp b/src/mongo/db/repl/replication_consistency_markers_impl.cpp index ca9f654d934..44fbd42b0e6 100644 --- a/src/mongo/db/repl/replication_consistency_markers_impl.cpp +++ b/src/mongo/db/repl/replication_consistency_markers_impl.cpp @@ -34,6 +34,7 @@ #include "mongo/db/bson/bson_helper.h" #include "mongo/db/catalog/collection_options.h" +#include "mongo/db/catalog_raii.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/repl/optime.h" @@ -172,6 +173,8 @@ void ReplicationConsistencyMarkersImpl::clearInitialSyncFlag(OperationContext* o // Make sure to clear the oplogTruncateAfterPoint in case it is stale. Otherwise, we risk the // possibility of deleting oplog entries that we want to keep. It is safe to clear this // here since we are consistent at the top of our oplog at this point. 
+ invariant(!isOplogTruncateAfterPointBeingUsedForPrimary(), + "Clearing the truncate point while primary is unsafe: it is asynchronously updated."); setOplogTruncateAfterPoint(opCtx, Timestamp()); if (getGlobalServiceContext()->getStorageEngine()->isDurable()) { @@ -313,27 +316,6 @@ OpTime ReplicationConsistencyMarkersImpl::getAppliedThrough(OperationContext* op return appliedThrough.get(); } -boost::optional<OplogTruncateAfterPointDocument> -ReplicationConsistencyMarkersImpl::_getOplogTruncateAfterPointDocument( - OperationContext* opCtx) const { - auto doc = _storageInterface->findById( - opCtx, _oplogTruncateAfterPointNss, kOplogTruncateAfterPointId["_id"]); - - if (!doc.isOK()) { - if (doc.getStatus() == ErrorCodes::NoSuchKey || - doc.getStatus() == ErrorCodes::NamespaceNotFound) { - return boost::none; - } else { - // Fails if there is an error other than the collection being missing or being empty. - fassertFailedWithStatus(40510, doc.getStatus()); - } - } - - auto oplogTruncateAfterPoint = OplogTruncateAfterPointDocument::parse( - IDLParserErrorContext("OplogTruncateAfterPointDocument"), doc.getValue()); - return oplogTruncateAfterPoint; -} - void ReplicationConsistencyMarkersImpl::ensureFastCountOnOplogTruncateAfterPoint( OperationContext* opCtx) { LOGV2_DEBUG(21295, @@ -390,19 +372,130 @@ void ReplicationConsistencyMarkersImpl::setOplogTruncateAfterPoint(OperationCont << timestamp))); } +boost::optional<OplogTruncateAfterPointDocument> +ReplicationConsistencyMarkersImpl::_getOplogTruncateAfterPointDocument( + OperationContext* opCtx) const { + auto doc = _storageInterface->findById( + opCtx, _oplogTruncateAfterPointNss, kOplogTruncateAfterPointId["_id"]); + + if (!doc.isOK()) { + if (doc.getStatus() == ErrorCodes::NoSuchKey || + doc.getStatus() == ErrorCodes::NamespaceNotFound) { + return boost::none; + } else { + // Fails if there is an error other than the collection being missing or being empty. 
+ fassertFailedWithStatus(40510, doc.getStatus()); + } + } + + auto oplogTruncateAfterPoint = OplogTruncateAfterPointDocument::parse( + IDLParserErrorContext("OplogTruncateAfterPointDocument"), doc.getValue()); + return oplogTruncateAfterPoint; +} + Timestamp ReplicationConsistencyMarkersImpl::getOplogTruncateAfterPoint( OperationContext* opCtx) const { - auto doc = _getOplogTruncateAfterPointDocument(opCtx); - if (!doc) { + auto truncatePointDoc = _getOplogTruncateAfterPointDocument(opCtx); + if (!truncatePointDoc) { LOGV2_DEBUG( 21297, 3, "Returning empty oplog truncate after point since document did not exist"); - return {}; + return Timestamp(); } + Timestamp truncatePointTimestamp = truncatePointDoc->getOplogTruncateAfterPoint(); + + LOGV2_DEBUG(21298, + 3, + "Returning oplog truncate after point: {truncatePointTimestamp}", + "truncatePointTimestamp"_attr = truncatePointTimestamp); + return truncatePointTimestamp; +} - Timestamp out = doc->getOplogTruncateAfterPoint(); +void ReplicationConsistencyMarkersImpl::startUsingOplogTruncateAfterPointForPrimary() { + stdx::lock_guard<Latch> lk(_truncatePointIsPrimaryMutex); + // There is only one path to stepup and it is not called redundantly. 
+ invariant(!_isPrimary); + _isPrimary = true; +} + +void ReplicationConsistencyMarkersImpl::stopUsingOplogTruncateAfterPointForPrimary() { + stdx::lock_guard<Latch> lk(_truncatePointIsPrimaryMutex); + _isPrimary = false; +} - LOGV2_DEBUG(21298, 3, "returning oplog truncate after point: {out}", "out"_attr = out); - return out; +bool ReplicationConsistencyMarkersImpl::isOplogTruncateAfterPointBeingUsedForPrimary() const { + stdx::lock_guard<Latch> lk(_truncatePointIsPrimaryMutex); + return _isPrimary; +} + +void ReplicationConsistencyMarkersImpl::setOplogTruncateAfterPointToTopOfOplog( + OperationContext* opCtx) { + auto timestamp = _storageInterface->getLatestOplogTimestamp(opCtx); + LOGV2_DEBUG(21551, + 3, + "Initializing oplog truncate after point: {timestamp}", + "timestamp"_attr = timestamp); + setOplogTruncateAfterPoint(opCtx, timestamp); +} + +boost::optional<OpTimeAndWallTime> +ReplicationConsistencyMarkersImpl::refreshOplogTruncateAfterPointIfPrimary( + OperationContext* opCtx) { + + if (!isOplogTruncateAfterPointBeingUsedForPrimary()) { + // Stepdown clears the truncate point, after which the truncate point is set manually as + // needed, so nothing should be done here -- else we might truncate something we should not. + return boost::none; + } + + // Temporarily allow writes if kIgnoreConflicts is set on the recovery unit so the truncate + // point can be updated. The kIgnoreConflicts setting only allows reads. 
+ auto originalBehavior = opCtx->recoveryUnit()->getPrepareConflictBehavior(); + if (originalBehavior == PrepareConflictBehavior::kIgnoreConflicts) { + opCtx->recoveryUnit()->setPrepareConflictBehavior( + PrepareConflictBehavior::kIgnoreConflictsAllowWrites); + } + ON_BLOCK_EXIT([&] { opCtx->recoveryUnit()->setPrepareConflictBehavior(originalBehavior); }); + + // The locks necessary to write to the oplog truncate after point's collection and read from the + // oplog collection must be taken up front so that the mutex can also be taken around both + // operations without causing deadlocks. + AutoGetCollection autoTruncateColl(opCtx, _oplogTruncateAfterPointNss, MODE_IX); + AutoGetCollection autoOplogColl(opCtx, NamespaceString::kRsOplogNamespace, MODE_IS); + stdx::lock_guard<Latch> lk(_refreshOplogTruncateAfterPointMutex); + + // Update the oplogTruncateAfterPoint to the storage engine's reported oplog timestamp with no + // holes behind it in-memory (only, not on disk, despite the name). + auto truncateTimestamp = _storageInterface->getAllDurableTimestamp(opCtx->getServiceContext()); + + if (truncateTimestamp != Timestamp(StorageEngine::kMinimumTimestamp)) { + setOplogTruncateAfterPoint(opCtx, truncateTimestamp); + } else { + // The all_durable timestamp has not yet been set: there have been no oplog writes since + // this server instance started up. In this case, we will return the current + // oplogTruncateAfterPoint without updating it, since there's nothing to update. + truncateTimestamp = getOplogTruncateAfterPoint(opCtx); + + // A primary cannot have an unset oplogTruncateAfterPoint because it is initialized on + // step-up. + invariant(!truncateTimestamp.isNull()); + } + + // Reset the snapshot so that it is ensured to see the latest oplog entries. + opCtx->recoveryUnit()->abandonSnapshot(); + + // Fetch the oplog entry <= timestamp. all_durable may be set to a value between oplog entries. 
+ // We need an oplog entry in order to return term and wallclock time for an OpTimeAndWallTime + // result. + auto truncateOplogEntryBSON = _storageInterface->findOplogEntryLessThanOrEqualToTimestamp( + opCtx, autoOplogColl.getCollection(), truncateTimestamp); + + // The truncate point moves the Durable timestamp forward, so it should always exist in the + // oplog. + invariant(truncateOplogEntryBSON, "Found no oplog entry lte " + truncateTimestamp.toString()); + + return fassert( + 44555001, + OpTimeAndWallTime::parseOpTimeAndWallTimeFromOplogEntry(truncateOplogEntryBSON.get())); } Status ReplicationConsistencyMarkersImpl::createInternalCollections(OperationContext* opCtx) { @@ -414,7 +507,6 @@ Status ReplicationConsistencyMarkersImpl::createInternalCollections(OperationCon << " Error: " << status.toString()}; } } - return Status::OK(); } diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.h b/src/mongo/db/repl/replication_consistency_markers_impl.h index 4cb924eeea0..552d03a372e 100644 --- a/src/mongo/db/repl/replication_consistency_markers_impl.h +++ b/src/mongo/db/repl/replication_consistency_markers_impl.h @@ -70,9 +70,19 @@ public: void setMinValidToAtLeast(OperationContext* opCtx, const OpTime& minValid) override; void ensureFastCountOnOplogTruncateAfterPoint(OperationContext* opCtx) override; + void setOplogTruncateAfterPoint(OperationContext* opCtx, const Timestamp& timestamp) override; Timestamp getOplogTruncateAfterPoint(OperationContext* opCtx) const override; + void startUsingOplogTruncateAfterPointForPrimary() override; + void stopUsingOplogTruncateAfterPointForPrimary() override; + bool isOplogTruncateAfterPointBeingUsedForPrimary() const override; + + void setOplogTruncateAfterPointToTopOfOplog(OperationContext* opCtx) override; + + boost::optional<OpTimeAndWallTime> refreshOplogTruncateAfterPointIfPrimary( + OperationContext* opCtx) override; + void setAppliedThrough(OperationContext* opCtx, const OpTime& optime, bool setTimestamp 
= true) override; @@ -114,6 +124,21 @@ private: StorageInterface* _storageInterface; const NamespaceString _minValidNss; const NamespaceString _oplogTruncateAfterPointNss; + + // Protects modifying and reading _isPrimary below. + mutable Mutex _truncatePointIsPrimaryMutex = + MONGO_MAKE_LATCH("ReplicationConsistencyMarkers::_truncatePointIsPrimaryMutex"); + + // Tracks whether or not the node is primary. Avoids potential deadlocks taking the replication + // coordinator's mutex to check replication state. Also remains false for standalones that do + // not use timestamps. + bool _isPrimary = false; + + // Locks around fetching the 'all_durable' timestamp from the storage engine and updating the + // oplogTruncateAfterPoint. This prevents the oplogTruncateAfterPoint from going backwards in + // time in case of multiple callers to refreshOplogTruncateAfterPointIfPrimary. + mutable Mutex _refreshOplogTruncateAfterPointMutex = + MONGO_MAKE_LATCH("ReplicationConsistencyMarkers::_refreshOplogTruncateAfterPointMutex"); }; } // namespace repl diff --git a/src/mongo/db/repl/replication_consistency_markers_mock.cpp b/src/mongo/db/repl/replication_consistency_markers_mock.cpp index 92744af2032..28710b460eb 100644 --- a/src/mongo/db/repl/replication_consistency_markers_mock.cpp +++ b/src/mongo/db/repl/replication_consistency_markers_mock.cpp @@ -95,6 +95,23 @@ Timestamp ReplicationConsistencyMarkersMock::getOplogTruncateAfterPoint( return _oplogTruncateAfterPoint; } +void ReplicationConsistencyMarkersMock::startUsingOplogTruncateAfterPointForPrimary() {} + +void ReplicationConsistencyMarkersMock::stopUsingOplogTruncateAfterPointForPrimary() {} + +bool ReplicationConsistencyMarkersMock::isOplogTruncateAfterPointBeingUsedForPrimary() const { + return true; +} + +void ReplicationConsistencyMarkersMock::setOplogTruncateAfterPointToTopOfOplog( + OperationContext* opCtx){}; + +boost::optional<OpTimeAndWallTime> 
+ReplicationConsistencyMarkersMock::refreshOplogTruncateAfterPointIfPrimary( + OperationContext* opCtx) { + return boost::none; +} + void ReplicationConsistencyMarkersMock::setAppliedThrough(OperationContext* opCtx, const OpTime& optime, bool setTimestamp) { diff --git a/src/mongo/db/repl/replication_consistency_markers_mock.h b/src/mongo/db/repl/replication_consistency_markers_mock.h index eff8bf2961b..7afe39a6bd7 100644 --- a/src/mongo/db/repl/replication_consistency_markers_mock.h +++ b/src/mongo/db/repl/replication_consistency_markers_mock.h @@ -62,9 +62,19 @@ public: void setMinValidToAtLeast(OperationContext* opCtx, const OpTime& minValid) override; void ensureFastCountOnOplogTruncateAfterPoint(OperationContext* opCtx) override; + void setOplogTruncateAfterPoint(OperationContext* opCtx, const Timestamp& timestamp) override; Timestamp getOplogTruncateAfterPoint(OperationContext* opCtx) const override; + void startUsingOplogTruncateAfterPointForPrimary() override; + void stopUsingOplogTruncateAfterPointForPrimary() override; + bool isOplogTruncateAfterPointBeingUsedForPrimary() const override; + + void setOplogTruncateAfterPointToTopOfOplog(OperationContext* opCtx) override; + + boost::optional<OpTimeAndWallTime> refreshOplogTruncateAfterPointIfPrimary( + OperationContext* opCtx) override; + void setAppliedThrough(OperationContext* opCtx, const OpTime& optime, bool setTimestamp = true) override; diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 0e5cf9f73cd..cd8ce9e6b3b 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -406,6 +406,17 @@ void ReplicationCoordinatorExternalStateImpl::shutdown(OperationContext* opCtx) // _taskExecutor outside of _threadMutex because once _startedThreads is set to true, the // _taskExecutor pointer never changes. 
_taskExecutor->join(); + + // Clear the truncate point if we are still primary, so nothing gets truncated unnecessarily on + // startup. There are no oplog holes on clean primary shutdown. Stepdown is similarly safe and + // clears the truncate point. The other replication states do need truncation if the truncate + // point is set: e.g. interruption mid batch application can leave oplog holes. + if (!storageGlobalParams.readOnly && + _replicationProcess->getConsistencyMarkers() + ->isOplogTruncateAfterPointBeingUsedForPrimary()) { + _replicationProcess->getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, + Timestamp()); + } } executor::TaskExecutor* ReplicationCoordinatorExternalStateImpl::getTaskExecutor() const { @@ -466,13 +477,30 @@ OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationC MongoDSessionCatalog::onStepUp(opCtx); + invariant( + _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx).isNull()); + + // A primary periodically updates the oplogTruncateAfterPoint to allow replication to proceed + // without danger of unidentifiable oplog holes on unclean shutdown due to parallel writes. + // + // Initialize the oplogTruncateAfterPoint so that user writes are safe on unclean shutdown + // between completion of transition to primary and the first async oplogTruncateAfterPoint + // update. + _replicationProcess->getConsistencyMarkers()->setOplogTruncateAfterPointToTopOfOplog(opCtx); + + // Tell the system to start updating the oplogTruncateAfterPoint asynchronously and to use the + // truncate point, rather than last applied, to update the repl durable timestamp. + // + // The truncate point must be used while primary for repl's durable timestamp because otherwise + // we could truncate last applied writes on startup recovery after an unclean shutdown that were + // previously majority confirmed to the user. 
+ _replicationProcess->getConsistencyMarkers()->startUsingOplogTruncateAfterPointForPrimary(); + // Clear the appliedThrough marker so on startup we'll use the top of the oplog. This must be // done before we add anything to our oplog. // We record this update at the 'lastAppliedOpTime'. If there are any outstanding // checkpoints being taken, they should only reflect this write if they see all writes up // to our 'lastAppliedOpTime'. - invariant( - _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx).isNull()); auto lastAppliedOpTime = repl::ReplicationCoordinator::get(opCtx)->getMyLastAppliedOpTime(); _replicationProcess->getConsistencyMarkers()->clearAppliedThrough( opCtx, lastAppliedOpTime.getTimestamp()); @@ -742,13 +770,29 @@ void ReplicationCoordinatorExternalStateImpl::clearOplogVisibilityStateForStepDo ON_BLOCK_EXIT([&] { opCtx->setShouldParticipateInFlowControl(originalFlowControlSetting); }); opCtx->setShouldParticipateInFlowControl(false); - // We can clear the oplogTruncateAfterPoint because we know there are no concurrent user writes - // during stepdown and therefore presently no oplog holes. + // Tell the system to stop updating the oplogTruncateAfterPoint asynchronously and to go back to + // using last applied to update repl's durable timestamp instead of the truncate point. + _replicationProcess->getConsistencyMarkers()->stopUsingOplogTruncateAfterPointForPrimary(); + + // Interrupt the current JournalFlusher thread round, so it recognizes that it is no longer + // primary. Otherwise the asynchronously running thread could race with setting the truncate + // point to null below. This would leave the truncate point potentially stale in a non-PRIMARY + // state, where last applied would be used to update repl's durable timestamp and confirm + // majority writes. Startup recovery could truncate majority confirmed writes back to the stale + // truncate after point. 
// - // This value is updated periodically while in PRIMARY mode to protect against oplog holes on - // unclean shutdown. The value must then be cleared on stepdown because stepup expects the value - // to be unset. Batch application, in mode SECONDARY, also uses the value to protect against - // unclean shutdown, and will handle both setting AND unsetting the value. + // This makes sure the JournalFlusher is not stuck waiting for a lock that stepdown might hold + // before doing an update write to the truncate point. + _service->getStorageEngine()->interruptJournalFlusherForReplStateChange(); + + // Wait for another round of journal flushing. This will ensure that we wait for the current + // round to completely finish and have no chance of racing with unsetting the truncate point + // below. It is possible that the JournalFlusher will not check for the interrupt signaled + // above, if writing is imminent, so we must make sure that the code completes fully. + _service->getStorageEngine()->waitForJournalFlush(opCtx); + + // We can clear the oplogTruncateAfterPoint because we know there are no user writes during + // stepdown and therefore presently no oplog holes. _replicationProcess->getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, Timestamp()); } @@ -976,6 +1020,16 @@ std::size_t ReplicationCoordinatorExternalStateImpl::getOplogFetcherInitialSyncM } JournalListener::Token ReplicationCoordinatorExternalStateImpl::getToken(OperationContext* opCtx) { + // If in state PRIMARY, the oplogTruncateAfterPoint must be used for the Durable timestamp in + // order to avoid majority confirming any writes that could later be truncated. + auto truncatePoint = repl::ReplicationProcess::get(opCtx) + ->getConsistencyMarkers() + ->refreshOplogTruncateAfterPointIfPrimary(opCtx); + if (truncatePoint) { + return truncatePoint.get(); + } + + // All other repl states use the last applied. 
return repl::ReplicationCoordinator::get(_service)->getMyLastAppliedOpTimeAndWallTime(); } diff --git a/src/mongo/db/storage/storage_engine.h b/src/mongo/db/storage/storage_engine.h index 1dedaf98eef..1ea159ebccf 100644 --- a/src/mongo/db/storage/storage_engine.h +++ b/src/mongo/db/storage/storage_engine.h @@ -61,6 +61,13 @@ struct StorageGlobalParams; class StorageEngine { public: /** + * This is the minimum valid timestamp; it can be used for reads that need to see all + * untimestamped data but no timestamped data. We cannot use 0 here because 0 means see all + * timestamped data. + */ + static const uint64_t kMinimumTimestamp = 1; + + /** * When the storage engine needs to know how much oplog to preserve for the sake of active * transactions, it executes a callback that returns either the oldest active transaction * timestamp, or boost::none if there is no active transaction, or an error if it fails. @@ -598,6 +605,8 @@ public: * commit_timestamp and prepared transactions that have been given a durable_timestamp. * Previously, the deprecated all_committed timestamp would also include prepared transactions * that were prepared but not committed which could make the stable timestamp briefly jump back. + * + * Returns kMinimumTimestamp if there have been no new writes since the storage engine started. */ virtual Timestamp getAllDurableTimestamp() const = 0; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp index dfd0fe4741a..4490e00306b 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp @@ -338,6 +338,7 @@ public: * Signals an immediate journal flush and waits for it to complete before returning. * * Will throw ShutdownInProgress if the flusher thread is being stopped. + * Will throw InterruptedDueToReplStateChange if a flusher round is interrupted by stepdown. 
*/ void waitForJournalFlush() { auto myFuture = [&]() { @@ -348,7 +349,7 @@ public: } return _nextSharedPromise->getFuture(); }(); - // Throws on error if the catalog is closed. + // Throws on error if the catalog is closed or the flusher round is interrupted by stepdown. myFuture.get(); } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp index cb20d2344f2..7a739b9a3c1 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp @@ -43,11 +43,6 @@ #include "mongo/util/scopeguard.h" namespace mongo { -namespace { -// This is the minimum valid timestamp; it can be used for reads that need to see all untimestamped -// data but no timestamped data. We cannot use 0 here because 0 means see all timestamped data. -const uint64_t kMinimumTimestamp = 1; -} // namespace MONGO_FAIL_POINT_DEFINE(WTPausePrimaryOplogDurabilityLoop); @@ -72,7 +67,7 @@ void WiredTigerOplogManager::start(OperationContext* opCtx, "oplogVisibility"_attr = oplogVisibility); } else { // Avoid setting oplog visibility to 0. That means "everything is visible". - setOplogReadTimestamp(Timestamp(kMinimumTimestamp)); + setOplogReadTimestamp(Timestamp(StorageEngine::kMinimumTimestamp)); } // Need to obtain the mutex before starting the thread, as otherwise it may race ahead @@ -177,17 +172,6 @@ void WiredTigerOplogManager::_oplogJournalThreadLoop(WiredTigerSessionCache* ses // forward cursors. The timestamp is used to hide oplog entries that might be committed but // have uncommitted entries ahead of them. while (true) { - auto opCtx = cc().makeOperationContext(); - - // This thread is started before we finish creating the StorageEngine and consequently - // makeOperationContext() returns an OperationContext with a LockerNoop. 
Rather than trying - // to refactor the code to start this thread after the StorageEngine is fully instantiated, - // we will use this temporary hack to give the opCtx a real Locker. - // - // TODO (SERVER-41392): the Replicate Before Journaling project will be removing the - // waitUntilDurable() call requiring an opCtx parameter. - cc().swapLockState(std::make_unique<LockerImpl>()); - stdx::unique_lock<Latch> lk(_oplogVisibilityStateMutex); { MONGO_IDLE_THREAD_BLOCK; @@ -246,12 +230,6 @@ void WiredTigerOplogManager::_oplogJournalThreadLoop(WiredTigerSessionCache* ses continue; } - // In order to avoid oplog holes after an unclean shutdown, we must ensure this proposed - // oplog read timestamp's documents are durable before publishing that timestamp. - sessionCache->waitUntilDurable(opCtx.get(), - WiredTigerSessionCache::Fsync::kJournal, - WiredTigerSessionCache::UseJournalListener::kUpdate); - lk.lock(); // Publish the new timestamp value. Avoid going backward. auto oldTimestamp = getOplogReadTimestamp(); @@ -291,7 +269,7 @@ uint64_t WiredTigerOplogManager::fetchAllDurableValue(WT_CONNECTION* conn) { if (wtstatus == WT_NOTFOUND) { // Treat this as lowest possible timestamp; we need to see all preexisting data but no new // (timestamped) data. - return kMinimumTimestamp; + return StorageEngine::kMinimumTimestamp; } else { invariantWTOK(wtstatus); } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h b/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h index 2c81da13c0a..a899861f262 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h @@ -70,7 +70,7 @@ public: std::uint64_t getOplogReadTimestamp() const; void setOplogReadTimestamp(Timestamp ts); - // Triggers the oplogJournal thread to update its oplog read timestamp, by flushing the journal. + // Triggers the oplogJournal thread to update the oplog read timestamp. 
void triggerOplogVisibilityUpdate(); // Waits for all committed writes at this point to become visible (that is, no holes exist in diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp index b56f31fc455..978694fc270 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp @@ -250,9 +250,9 @@ void WiredTigerSessionCache::waitUntilDurable(OperationContext* opCtx, // Update the JournalListener before we return. As far as listeners are concerned, all // writes are as 'durable' as they are ever going to get on an inMemory storage engine. auto journalListener = [&]() -> JournalListener* { - // The JournalListener may not be set immediately, so we must check under a mutex so - // as not to access the variable while setting a JournalListener. A JournalListener - // is only allowed to be set once, so using the pointer outside of a mutex is safe. + // The JournalListener may not be set immediately, so we must check under a mutex so as + // not to access the variable while setting a JournalListener. A JournalListener is only + // allowed to be set once, so using the pointer outside of a mutex is safe. stdx::unique_lock<Latch> lk(_journalListenerMutex); return _journalListener; }(); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h index e488b73cd65..d4ec5de2ad2 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h @@ -309,7 +309,8 @@ public: * Taking a checkpoint has the benefit of persisting unjournaled writes. * * 'useListener' controls whether or not the JournalListener is updated with the last durable - * value of the timestamp that it tracks. + * value of the timestamp that it tracks. 
The JournalListener's token is fetched before writing + * out to disk and set afterwards to update the repl layer durable timestamp. * * Uses a temporary session. Safe to call without any locks, even during shutdown. */ diff --git a/src/mongo/db/write_concern.cpp b/src/mongo/db/write_concern.cpp index 1a7f4ee868e..85262273fc6 100644 --- a/src/mongo/db/write_concern.cpp +++ b/src/mongo/db/write_concern.cpp @@ -267,28 +267,34 @@ Status waitForWriteConcern(OperationContext* opCtx, WriteConcernOptions writeConcernWithPopulatedSyncMode = replCoord->populateUnsetWriteConcernOptionsSyncMode(writeConcern); - switch (writeConcernWithPopulatedSyncMode.syncMode) { - case WriteConcernOptions::SyncMode::UNSET: - LOGV2_FATAL(22550, "Attempting to wait on a WriteConcern with an unset sync option"); - fassertFailed(34410); - case WriteConcernOptions::SyncMode::NONE: - break; - case WriteConcernOptions::SyncMode::FSYNC: { - if (!storageEngine->isDurable()) { - storageEngine->flushAllFiles(opCtx, /*callerHoldsReadLock*/ false); - - // This field has had a dummy value since MMAP went away. It is undocumented. - // Maintaining it so as not to cause unnecessary user pain across upgrades. - result->fsyncFiles = 1; - } else { - // We only need to commit the journal if we're durable - storageEngine->waitForJournalFlush(opCtx); + // Waiting for durability (flushing the journal or all files to disk) can throw on interruption. + try { + switch (writeConcernWithPopulatedSyncMode.syncMode) { + case WriteConcernOptions::SyncMode::UNSET: + LOGV2_FATAL(22550, + "Attempting to wait on a WriteConcern with an unset sync option"); + fassertFailed(34410); + case WriteConcernOptions::SyncMode::NONE: + break; + case WriteConcernOptions::SyncMode::FSYNC: { + if (!storageEngine->isDurable()) { + storageEngine->flushAllFiles(opCtx, /*callerHoldsReadLock*/ false); + + // This field has had a dummy value since MMAP went away. It is undocumented. 
+ // Maintaining it so as not to cause unnecessary user pain across upgrades. + result->fsyncFiles = 1; + } else { + // We only need to commit the journal if we're durable + storageEngine->waitForJournalFlush(opCtx); + } + break; } - break; + case WriteConcernOptions::SyncMode::JOURNAL: + storageEngine->waitForJournalFlush(opCtx); + break; } - case WriteConcernOptions::SyncMode::JOURNAL: - storageEngine->waitForJournalFlush(opCtx); - break; + } catch (const DBException& ex) { + return ex.toStatus(); } result->syncMillis = syncTimer.millis(); |