author:    Dianna Hohensee <dianna.hohensee@mongodb.com>  2020-02-21 15:43:53 -0500
committer: Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-02-27 06:27:50 +0000
commit:    ab2b0971d21bf8a208ee43312d91c4600b63ccc5 (patch)
tree:      b8633dd59fff755ac686bb9986ad59f82cda5a38 /src
parent:    20657a8a6e9d2ed4df9fd8fa3f8c54d12907761f (diff)
download:  mongo-ab2b0971d21bf8a208ee43312d91c4600b63ccc5.tar.gz
SERVER-44555 Replicate Before Journaling
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/repl/SConscript3
-rw-r--r--src/mongo/db/repl/replication_consistency_markers.h72
-rw-r--r--src/mongo/db/repl/replication_consistency_markers_impl.cpp148
-rw-r--r--src/mongo/db/repl/replication_consistency_markers_impl.h25
-rw-r--r--src/mongo/db/repl/replication_consistency_markers_mock.cpp17
-rw-r--r--src/mongo/db/repl/replication_consistency_markers_mock.h10
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp70
-rw-r--r--src/mongo/db/storage/storage_engine.h9
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp3
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp26
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h2
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp6
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h3
-rw-r--r--src/mongo/db/write_concern.cpp46
14 files changed, 345 insertions, 95 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 4a05bea3c75..66779823508 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -260,6 +260,9 @@ env.Library(
'repl_coordinator_interface',
'replication_consistency_markers_idl',
],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/catalog_raii',
+ ],
)
env.Library(
diff --git a/src/mongo/db/repl/replication_consistency_markers.h b/src/mongo/db/repl/replication_consistency_markers.h
index c59abdac678..e873d4977c9 100644
--- a/src/mongo/db/repl/replication_consistency_markers.h
+++ b/src/mongo/db/repl/replication_consistency_markers.h
@@ -152,22 +152,76 @@ public:
virtual void ensureFastCountOnOplogTruncateAfterPoint(OperationContext* opCtx) = 0;
/**
- * The oplog truncate after point is set to the beginning of a batch of oplog entries before
- * the oplog entries are written into the oplog, and reset before we begin applying the batch.
- * On startup all oplog entries with a value >= the oplog truncate after point should be
- * deleted. We write operations to the oplog in parallel so if we crash mid-batch there could
- * be holes in the oplog. Deleting them at startup keeps us consistent.
+ * On startup, all oplog entries with a 'ts' field >= the oplog truncate after point will be
+ * deleted. If the truncate point is null, no oplog entries are truncated. A null truncate
+ * point can be found on startup if the server was certain at the time of shutdown that no
+ * parallel writes were running.
*
- * If null, no documents should be deleted.
+ * Write operations are done in parallel, creating momentary oplog 'holes' where writes at an
+ * earlier timestamp are not yet committed. A secondary can read an oplog entry from its
+ * sync-source as soon as there are no holes behind that entry in memory, even before there
+ * are no holes behind it on disk. Therefore, after a crash, the oplog is truncated back to
+ * its on-disk no-holes point, which is guaranteed to be consistent with the rest of the
+ * replica set.
*
- * If we are in feature compatibility version 3.4 and there is no oplog truncate after point
- * document, we fall back on the old oplog delete from point field in the minValid
- * collection.
+ * A primary will update the oplog truncate after point before every journal flush to disk,
+ * using the storage engine's tracked in-memory no-holes point.
+ *
+ * For replication states other than PRIMARY, the oplog truncate after point is updated
+ * directly. For batch application, the oplog truncate after point is set to the current
+ * lastApplied timestamp before a batch of oplog entries is written into the oplog, and reset
+ * to null once the parallel oplog entry writes are complete.
+ *
+ * Concurrency control and serialization is the responsibility of the caller.
*/
virtual void setOplogTruncateAfterPoint(OperationContext* opCtx,
const Timestamp& timestamp) = 0;
virtual Timestamp getOplogTruncateAfterPoint(OperationContext* opCtx) const = 0;
+ /**
+ * Enables/disables updating the oplogTruncateAfterPoint in
+ * refreshOplogTruncateAfterPointIfPrimary.
+ *
+ * Any already running calls to refreshOplogTruncateAfterPointIfPrimary must be interrupted to
+ * ensure that the updates to the truncate point via that function have stopped.
+ */
+ virtual void startUsingOplogTruncateAfterPointForPrimary() = 0;
+ virtual void stopUsingOplogTruncateAfterPointForPrimary() = 0;
+
+ /**
+ * Indicates whether the oplog truncate after point is currently in use (being periodically
+ * refreshed), which is only done while in state PRIMARY.
+ *
+ * This class tracks the relevant replication state itself to avoid potential deadlocks from
+ * taking the replication coordinator's mutex to check. The value also remains false for
+ * standalones, which do not use timestamps.
+ */
+ virtual bool isOplogTruncateAfterPointBeingUsedForPrimary() const = 0;
+
+ /**
+ * Initializes the oplog truncate after point with the timestamp of the latest oplog entry.
+ *
+ * On stepup to primary, the truncate point must be initialized to protect the window of time
+ * between completion of stepup and the first periodic flush to disk that prompts a truncate
+ * point update. Otherwise, in-memory writes (with no holes) could replicate while the on-disk
+ * writes still have holes; if we then crashed, this node would be left with data holes that
+ * the other nodes do not have (they already replicated the data).
+ */
+ virtual void setOplogTruncateAfterPointToTopOfOplog(OperationContext* opCtx) = 0;
+
+ /**
+ * Updates the OplogTruncateAfterPoint with the latest no-holes oplog timestamp.
+ *
+ * If primary, returns the OpTime and WallTime of the oplog entry associated with the updated
+ * oplog truncate after point.
+ * Returns boost::none if isOplogTruncateAfterPointBeingUsedForPrimary returns false.
+ *
+ * stopUsingOplogTruncateAfterPointForPrimary() will cause new calls to this function to do
+ * nothing, but any callers already running will need to be interrupted to ensure the state
+ * change takes effect (so that an update does not racily go ahead).
+ */
+ virtual boost::optional<OpTimeAndWallTime> refreshOplogTruncateAfterPointIfPrimary(
+ OperationContext* opCtx) = 0;
+
// -------- Applied Through ----------
/**
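
The comments above describe a lifecycle that callers are expected to drive across replication state transitions. A rough sketch of that lifecycle, where 'markers' and 'opCtx' are hypothetical stand-ins for a ReplicationConsistencyMarkers instance and an OperationContext (this mirrors the coordinator changes later in this patch; it is not a literal excerpt):

    // On stepup: cover the window before the first periodic journal flush.
    markers->setOplogTruncateAfterPointToTopOfOplog(opCtx);
    markers->startUsingOplogTruncateAfterPointForPrimary();

    // While primary: the journal flusher periodically refreshes the truncate
    // point and advances repl's durable timestamp with the returned optime.
    if (auto opTimeAndWallTime = markers->refreshOplogTruncateAfterPointIfPrimary(opCtx)) {
        // Use *opTimeAndWallTime instead of lastApplied for the durable timestamp.
    }

    // On stepdown: stop refreshes, interrupt any in-flight refresh, then clear.
    markers->stopUsingOplogTruncateAfterPointForPrimary();
    markers->setOplogTruncateAfterPoint(opCtx, Timestamp());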
diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.cpp b/src/mongo/db/repl/replication_consistency_markers_impl.cpp
index ca9f654d934..44fbd42b0e6 100644
--- a/src/mongo/db/repl/replication_consistency_markers_impl.cpp
+++ b/src/mongo/db/repl/replication_consistency_markers_impl.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/bson/bson_helper.h"
#include "mongo/db/catalog/collection_options.h"
+#include "mongo/db/catalog_raii.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/repl/optime.h"
@@ -172,6 +173,8 @@ void ReplicationConsistencyMarkersImpl::clearInitialSyncFlag(OperationContext* o
// Make sure to clear the oplogTruncateAfterPoint in case it is stale. Otherwise, we risk the
// possibility of deleting oplog entries that we want to keep. It is safe to clear this
// here since we are consistent at the top of our oplog at this point.
+ invariant(!isOplogTruncateAfterPointBeingUsedForPrimary(),
+ "Clearing the truncate point while primary is unsafe: it is asynchronously updated.");
setOplogTruncateAfterPoint(opCtx, Timestamp());
if (getGlobalServiceContext()->getStorageEngine()->isDurable()) {
@@ -313,27 +316,6 @@ OpTime ReplicationConsistencyMarkersImpl::getAppliedThrough(OperationContext* op
return appliedThrough.get();
}
-boost::optional<OplogTruncateAfterPointDocument>
-ReplicationConsistencyMarkersImpl::_getOplogTruncateAfterPointDocument(
- OperationContext* opCtx) const {
- auto doc = _storageInterface->findById(
- opCtx, _oplogTruncateAfterPointNss, kOplogTruncateAfterPointId["_id"]);
-
- if (!doc.isOK()) {
- if (doc.getStatus() == ErrorCodes::NoSuchKey ||
- doc.getStatus() == ErrorCodes::NamespaceNotFound) {
- return boost::none;
- } else {
- // Fails if there is an error other than the collection being missing or being empty.
- fassertFailedWithStatus(40510, doc.getStatus());
- }
- }
-
- auto oplogTruncateAfterPoint = OplogTruncateAfterPointDocument::parse(
- IDLParserErrorContext("OplogTruncateAfterPointDocument"), doc.getValue());
- return oplogTruncateAfterPoint;
-}
-
void ReplicationConsistencyMarkersImpl::ensureFastCountOnOplogTruncateAfterPoint(
OperationContext* opCtx) {
LOGV2_DEBUG(21295,
@@ -390,19 +372,130 @@ void ReplicationConsistencyMarkersImpl::setOplogTruncateAfterPoint(OperationCont
<< timestamp)));
}
+boost::optional<OplogTruncateAfterPointDocument>
+ReplicationConsistencyMarkersImpl::_getOplogTruncateAfterPointDocument(
+ OperationContext* opCtx) const {
+ auto doc = _storageInterface->findById(
+ opCtx, _oplogTruncateAfterPointNss, kOplogTruncateAfterPointId["_id"]);
+
+ if (!doc.isOK()) {
+ if (doc.getStatus() == ErrorCodes::NoSuchKey ||
+ doc.getStatus() == ErrorCodes::NamespaceNotFound) {
+ return boost::none;
+ } else {
+ // Fails if there is an error other than the collection being missing or being empty.
+ fassertFailedWithStatus(40510, doc.getStatus());
+ }
+ }
+
+ auto oplogTruncateAfterPoint = OplogTruncateAfterPointDocument::parse(
+ IDLParserErrorContext("OplogTruncateAfterPointDocument"), doc.getValue());
+ return oplogTruncateAfterPoint;
+}
+
Timestamp ReplicationConsistencyMarkersImpl::getOplogTruncateAfterPoint(
OperationContext* opCtx) const {
- auto doc = _getOplogTruncateAfterPointDocument(opCtx);
- if (!doc) {
+ auto truncatePointDoc = _getOplogTruncateAfterPointDocument(opCtx);
+ if (!truncatePointDoc) {
LOGV2_DEBUG(
21297, 3, "Returning empty oplog truncate after point since document did not exist");
- return {};
+ return Timestamp();
}
+ Timestamp truncatePointTimestamp = truncatePointDoc->getOplogTruncateAfterPoint();
+
+ LOGV2_DEBUG(21298,
+ 3,
+ "Returning oplog truncate after point: {truncatePointTimestamp}",
+ "truncatePointTimestamp"_attr = truncatePointTimestamp);
+ return truncatePointTimestamp;
+}
- Timestamp out = doc->getOplogTruncateAfterPoint();
+void ReplicationConsistencyMarkersImpl::startUsingOplogTruncateAfterPointForPrimary() {
+ stdx::lock_guard<Latch> lk(_truncatePointIsPrimaryMutex);
+ // There is only one path to stepup and it is not called redundantly.
+ invariant(!_isPrimary);
+ _isPrimary = true;
+}
+
+void ReplicationConsistencyMarkersImpl::stopUsingOplogTruncateAfterPointForPrimary() {
+ stdx::lock_guard<Latch> lk(_truncatePointIsPrimaryMutex);
+ _isPrimary = false;
+}
- LOGV2_DEBUG(21298, 3, "returning oplog truncate after point: {out}", "out"_attr = out);
- return out;
+bool ReplicationConsistencyMarkersImpl::isOplogTruncateAfterPointBeingUsedForPrimary() const {
+ stdx::lock_guard<Latch> lk(_truncatePointIsPrimaryMutex);
+ return _isPrimary;
+}
+
+void ReplicationConsistencyMarkersImpl::setOplogTruncateAfterPointToTopOfOplog(
+ OperationContext* opCtx) {
+ auto timestamp = _storageInterface->getLatestOplogTimestamp(opCtx);
+ LOGV2_DEBUG(21551,
+ 3,
+ "Initializing oplog truncate after point: {timestamp}",
+ "timestamp"_attr = timestamp);
+ setOplogTruncateAfterPoint(opCtx, timestamp);
+}
+
+boost::optional<OpTimeAndWallTime>
+ReplicationConsistencyMarkersImpl::refreshOplogTruncateAfterPointIfPrimary(
+ OperationContext* opCtx) {
+
+ if (!isOplogTruncateAfterPointBeingUsedForPrimary()) {
+ // Stepdown clears the truncate point, after which the truncate point is set manually as
+ // needed, so nothing should be done here; otherwise we might truncate something we should
+ // not.
+ return boost::none;
+ }
+
+ // Temporarily allow writes if kIgnoreConflicts is set on the recovery unit so the truncate
+ // point can be updated. The kIgnoreConflicts setting only allows reads.
+ auto originalBehavior = opCtx->recoveryUnit()->getPrepareConflictBehavior();
+ if (originalBehavior == PrepareConflictBehavior::kIgnoreConflicts) {
+ opCtx->recoveryUnit()->setPrepareConflictBehavior(
+ PrepareConflictBehavior::kIgnoreConflictsAllowWrites);
+ }
+ ON_BLOCK_EXIT([&] { opCtx->recoveryUnit()->setPrepareConflictBehavior(originalBehavior); });
+
+ // The locks necessary to write to the oplog truncate after point's collection and read from the
+ // oplog collection must be taken up front so that the mutex can also be taken around both
+ // operations without causing deadlocks.
+ AutoGetCollection autoTruncateColl(opCtx, _oplogTruncateAfterPointNss, MODE_IX);
+ AutoGetCollection autoOplogColl(opCtx, NamespaceString::kRsOplogNamespace, MODE_IS);
+ stdx::lock_guard<Latch> lk(_refreshOplogTruncateAfterPointMutex);
+
+ // Update the oplogTruncateAfterPoint to the storage engine's reported oplog timestamp with
+ // no holes behind it in memory (in memory only, not on disk, despite the 'all_durable'
+ // name).
+ auto truncateTimestamp = _storageInterface->getAllDurableTimestamp(opCtx->getServiceContext());
+
+ if (truncateTimestamp != Timestamp(StorageEngine::kMinimumTimestamp)) {
+ setOplogTruncateAfterPoint(opCtx, truncateTimestamp);
+ } else {
+ // The all_durable timestamp has not yet been set: there have been no oplog writes since
+ // this server instance started up. In this case, we will return the current
+ // oplogTruncateAfterPoint without updating it, since there's nothing to update.
+ truncateTimestamp = getOplogTruncateAfterPoint(opCtx);
+
+ // A primary cannot have an unset oplogTruncateAfterPoint because it is initialized on
+ // step-up.
+ invariant(!truncateTimestamp.isNull());
+ }
+
+ // Reset the snapshot so that it is ensured to see the latest oplog entries.
+ opCtx->recoveryUnit()->abandonSnapshot();
+
+ // Fetch the oplog entry <= timestamp. all_durable may be set to a value between oplog entries.
+ // We need an oplog entry in order to return term and wallclock time for an OpTimeAndWallTime
+ // result.
+ auto truncateOplogEntryBSON = _storageInterface->findOplogEntryLessThanOrEqualToTimestamp(
+ opCtx, autoOplogColl.getCollection(), truncateTimestamp);
+
+ // The truncate point moves the Durable timestamp forward, so it should always exist in the
+ // oplog.
+ invariant(truncateOplogEntryBSON, "Found no oplog entry lte " + truncateTimestamp.toString());
+
+ return fassert(
+ 44555001,
+ OpTimeAndWallTime::parseOpTimeAndWallTimeFromOplogEntry(truncateOplogEntryBSON.get()));
}
Status ReplicationConsistencyMarkersImpl::createInternalCollections(OperationContext* opCtx) {
@@ -414,7 +507,6 @@ Status ReplicationConsistencyMarkersImpl::createInternalCollections(OperationCon
<< " Error: " << status.toString()};
}
}
-
return Status::OK();
}
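
The _refreshOplogTruncateAfterPointMutex taken in refreshOplogTruncateAfterPointIfPrimary exists to keep the persisted truncate point monotonic. A sketch of the race it prevents, with illustrative thread names (not code from this patch):

    // Without the mutex, two flusher rounds could interleave like this:
    //   Thread A: reads all_durable = T5
    //   Thread B: reads all_durable = T7
    //   Thread B: setOplogTruncateAfterPoint(opCtx, T7)
    //   Thread A: setOplogTruncateAfterPoint(opCtx, T5)  // truncate point moves backwards
    // Holding the mutex across the read-then-write makes each refresh atomic,
    // so the persisted truncate point can only advance.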
diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.h b/src/mongo/db/repl/replication_consistency_markers_impl.h
index 4cb924eeea0..552d03a372e 100644
--- a/src/mongo/db/repl/replication_consistency_markers_impl.h
+++ b/src/mongo/db/repl/replication_consistency_markers_impl.h
@@ -70,9 +70,19 @@ public:
void setMinValidToAtLeast(OperationContext* opCtx, const OpTime& minValid) override;
void ensureFastCountOnOplogTruncateAfterPoint(OperationContext* opCtx) override;
+
void setOplogTruncateAfterPoint(OperationContext* opCtx, const Timestamp& timestamp) override;
Timestamp getOplogTruncateAfterPoint(OperationContext* opCtx) const override;
+ void startUsingOplogTruncateAfterPointForPrimary() override;
+ void stopUsingOplogTruncateAfterPointForPrimary() override;
+ bool isOplogTruncateAfterPointBeingUsedForPrimary() const override;
+
+ void setOplogTruncateAfterPointToTopOfOplog(OperationContext* opCtx) override;
+
+ boost::optional<OpTimeAndWallTime> refreshOplogTruncateAfterPointIfPrimary(
+ OperationContext* opCtx) override;
+
void setAppliedThrough(OperationContext* opCtx,
const OpTime& optime,
bool setTimestamp = true) override;
@@ -114,6 +124,21 @@ private:
StorageInterface* _storageInterface;
const NamespaceString _minValidNss;
const NamespaceString _oplogTruncateAfterPointNss;
+
+ // Protects modifying and reading _isPrimary below.
+ mutable Mutex _truncatePointIsPrimaryMutex =
+ MONGO_MAKE_LATCH("ReplicationConsistencyMarkers::_truncatePointIsPrimaryMutex");
+
+ // Tracks whether or not the node is primary. Avoids potential deadlocks taking the replication
+ // coordinator's mutex to check replication state. Also remains false for standalones that do
+ // not use timestamps.
+ bool _isPrimary = false;
+
+ // Locks around fetching the 'all_durable' timestamp from the storage engine and updating the
+ // oplogTruncateAfterPoint. This prevents the oplogTruncateAfterPoint from going backwards in
+ // time in case of multiple callers to refreshOplogTruncateAfterPointIfPrimary.
+ mutable Mutex _refreshOplogTruncateAfterPointMutex =
+ MONGO_MAKE_LATCH("ReplicationConsistencyMarkers::_refreshOplogTruncateAfterPointMutex");
};
} // namespace repl
diff --git a/src/mongo/db/repl/replication_consistency_markers_mock.cpp b/src/mongo/db/repl/replication_consistency_markers_mock.cpp
index 92744af2032..28710b460eb 100644
--- a/src/mongo/db/repl/replication_consistency_markers_mock.cpp
+++ b/src/mongo/db/repl/replication_consistency_markers_mock.cpp
@@ -95,6 +95,23 @@ Timestamp ReplicationConsistencyMarkersMock::getOplogTruncateAfterPoint(
return _oplogTruncateAfterPoint;
}
+void ReplicationConsistencyMarkersMock::startUsingOplogTruncateAfterPointForPrimary() {}
+
+void ReplicationConsistencyMarkersMock::stopUsingOplogTruncateAfterPointForPrimary() {}
+
+bool ReplicationConsistencyMarkersMock::isOplogTruncateAfterPointBeingUsedForPrimary() const {
+ return true;
+}
+
+void ReplicationConsistencyMarkersMock::setOplogTruncateAfterPointToTopOfOplog(
+    OperationContext* opCtx) {}
+
+boost::optional<OpTimeAndWallTime>
+ReplicationConsistencyMarkersMock::refreshOplogTruncateAfterPointIfPrimary(
+ OperationContext* opCtx) {
+ return boost::none;
+}
+
void ReplicationConsistencyMarkersMock::setAppliedThrough(OperationContext* opCtx,
const OpTime& optime,
bool setTimestamp) {
diff --git a/src/mongo/db/repl/replication_consistency_markers_mock.h b/src/mongo/db/repl/replication_consistency_markers_mock.h
index eff8bf2961b..7afe39a6bd7 100644
--- a/src/mongo/db/repl/replication_consistency_markers_mock.h
+++ b/src/mongo/db/repl/replication_consistency_markers_mock.h
@@ -62,9 +62,19 @@ public:
void setMinValidToAtLeast(OperationContext* opCtx, const OpTime& minValid) override;
void ensureFastCountOnOplogTruncateAfterPoint(OperationContext* opCtx) override;
+
void setOplogTruncateAfterPoint(OperationContext* opCtx, const Timestamp& timestamp) override;
Timestamp getOplogTruncateAfterPoint(OperationContext* opCtx) const override;
+ void startUsingOplogTruncateAfterPointForPrimary() override;
+ void stopUsingOplogTruncateAfterPointForPrimary() override;
+ bool isOplogTruncateAfterPointBeingUsedForPrimary() const override;
+
+ void setOplogTruncateAfterPointToTopOfOplog(OperationContext* opCtx) override;
+
+ boost::optional<OpTimeAndWallTime> refreshOplogTruncateAfterPointIfPrimary(
+ OperationContext* opCtx) override;
+
void setAppliedThrough(OperationContext* opCtx,
const OpTime& optime,
bool setTimestamp = true) override;
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 0e5cf9f73cd..cd8ce9e6b3b 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -406,6 +406,17 @@ void ReplicationCoordinatorExternalStateImpl::shutdown(OperationContext* opCtx)
// _taskExecutor outside of _threadMutex because once _startedThreads is set to true, the
// _taskExecutor pointer never changes.
_taskExecutor->join();
+
+ // Clear the truncate point if we are still primary, so nothing gets truncated unnecessarily on
+ // startup. There are no oplog holes on clean primary shutdown. Stepdown is similarly safe and
+ // clears the truncate point. The other replication states do need truncation if the truncate
+ // point is set: e.g. an interruption mid-batch application can leave oplog holes.
+ if (!storageGlobalParams.readOnly &&
+ _replicationProcess->getConsistencyMarkers()
+ ->isOplogTruncateAfterPointBeingUsedForPrimary()) {
+ _replicationProcess->getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx,
+ Timestamp());
+ }
}
executor::TaskExecutor* ReplicationCoordinatorExternalStateImpl::getTaskExecutor() const {
@@ -466,13 +477,30 @@ OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationC
MongoDSessionCatalog::onStepUp(opCtx);
+ invariant(
+ _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx).isNull());
+
+ // A primary periodically updates the oplogTruncateAfterPoint to allow replication to proceed
+ // without danger of unidentifiable oplog holes on unclean shutdown due to parallel writes.
+ //
+ // Initialize the oplogTruncateAfterPoint so that user writes are safe on unclean shutdown
+ // between completion of transition to primary and the first async oplogTruncateAfterPoint
+ // update.
+ _replicationProcess->getConsistencyMarkers()->setOplogTruncateAfterPointToTopOfOplog(opCtx);
+
+ // Tell the system to start updating the oplogTruncateAfterPoint asynchronously and to use the
+ // truncate point, rather than last applied, to update the repl durable timestamp.
+ //
+ // The truncate point must be used while primary for repl's durable timestamp because
+ // otherwise, during startup recovery after an unclean shutdown, we could truncate writes
+ // that were previously majority confirmed to the user.
+ _replicationProcess->getConsistencyMarkers()->startUsingOplogTruncateAfterPointForPrimary();
+
// Clear the appliedThrough marker so on startup we'll use the top of the oplog. This must be
// done before we add anything to our oplog.
// We record this update at the 'lastAppliedOpTime'. If there are any outstanding
// checkpoints being taken, they should only reflect this write if they see all writes up
// to our 'lastAppliedOpTime'.
- invariant(
- _replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx).isNull());
auto lastAppliedOpTime = repl::ReplicationCoordinator::get(opCtx)->getMyLastAppliedOpTime();
_replicationProcess->getConsistencyMarkers()->clearAppliedThrough(
opCtx, lastAppliedOpTime.getTimestamp());
@@ -742,13 +770,29 @@ void ReplicationCoordinatorExternalStateImpl::clearOplogVisibilityStateForStepDo
ON_BLOCK_EXIT([&] { opCtx->setShouldParticipateInFlowControl(originalFlowControlSetting); });
opCtx->setShouldParticipateInFlowControl(false);
- // We can clear the oplogTruncateAfterPoint because we know there are no concurrent user writes
- // during stepdown and therefore presently no oplog holes.
+ // Tell the system to stop updating the oplogTruncateAfterPoint asynchronously and to go back to
+ // using last applied to update repl's durable timestamp instead of the truncate point.
+ _replicationProcess->getConsistencyMarkers()->stopUsingOplogTruncateAfterPointForPrimary();
+
+ // Interrupt the current JournalFlusher thread round, so it recognizes that it is no longer
+ // primary. Otherwise the asynchronously running thread could race with setting the truncate
+ // point to null below. This would leave the truncate point potentially stale in a non-PRIMARY
+ // state, where last applied would be used to update repl's durable timestamp and confirm
+ // majority writes. Startup recovery could truncate majority confirmed writes back to the stale
+ // truncate after point.
//
- // This value is updated periodically while in PRIMARY mode to protect against oplog holes on
- // unclean shutdown. The value must then be cleared on stepdown because stepup expects the value
- // to be unset. Batch application, in mode SECONDARY, also uses the value to protect against
- // unclean shutdown, and will handle both setting AND unsetting the value.
+ // This makes sure the JournalFlusher is not left stuck waiting on a lock that stepdown
+ // might hold before it can write its update to the truncate point.
+ _service->getStorageEngine()->interruptJournalFlusherForReplStateChange();
+
+ // Wait for another round of journal flushing. This ensures that the current round finishes
+ // completely and cannot race with unsetting the truncate point below. The JournalFlusher
+ // may not check for the interrupt signaled above if a write is imminent, so we must wait
+ // for the round to complete fully.
+ _service->getStorageEngine()->waitForJournalFlush(opCtx);
+
+ // We can clear the oplogTruncateAfterPoint because we know there are no user writes during
+ // stepdown and therefore presently no oplog holes.
_replicationProcess->getConsistencyMarkers()->setOplogTruncateAfterPoint(opCtx, Timestamp());
}
@@ -976,6 +1020,16 @@ std::size_t ReplicationCoordinatorExternalStateImpl::getOplogFetcherInitialSyncM
}
JournalListener::Token ReplicationCoordinatorExternalStateImpl::getToken(OperationContext* opCtx) {
+ // If in state PRIMARY, the oplogTruncateAfterPoint must be used for the Durable timestamp in
+ // order to avoid majority confirming any writes that could later be truncated.
+ auto truncatePoint = repl::ReplicationProcess::get(opCtx)
+ ->getConsistencyMarkers()
+ ->refreshOplogTruncateAfterPointIfPrimary(opCtx);
+ if (truncatePoint) {
+ return truncatePoint.get();
+ }
+
+ // All other repl states use the last applied.
return repl::ReplicationCoordinator::get(_service)->getMyLastAppliedOpTimeAndWallTime();
}
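
getToken above is the hook through which the truncate point reaches repl's durable timestamp. A simplified sketch of one journal-flush round using MongoDB's JournalListener interface (the flush call itself is a paraphrased placeholder, not a real function name):

    // One flush round (sketch):
    JournalListener::Token token = listener->getToken(opCtx);
    // token == truncate point if primary, lastApplied otherwise.
    flushJournalToDisk();        // hypothetical stand-in for the actual flush
    listener->onDurable(token);  // advance repl's durable timestamp to the token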
diff --git a/src/mongo/db/storage/storage_engine.h b/src/mongo/db/storage/storage_engine.h
index 1dedaf98eef..1ea159ebccf 100644
--- a/src/mongo/db/storage/storage_engine.h
+++ b/src/mongo/db/storage/storage_engine.h
@@ -61,6 +61,13 @@ struct StorageGlobalParams;
class StorageEngine {
public:
/**
+ * This is the minimum valid timestamp; it can be used for reads that need to see all
+ * untimestamped data but no timestamped data. We cannot use 0 here because 0 means see all
+ * timestamped data.
+ */
+ static const uint64_t kMinimumTimestamp = 1;
+
+ /**
* When the storage engine needs to know how much oplog to preserve for the sake of active
* transactions, it executes a callback that returns either the oldest active transaction
* timestamp, or boost::none if there is no active transaction, or an error if it fails.
@@ -598,6 +605,8 @@ public:
* commit_timestamp and prepared transactions that have been given a durable_timestamp.
* Previously, the deprecated all_committed timestamp would also include prepared transactions
* that were prepared but not committed which could make the stable timestamp briefly jump back.
+ *
+ * Returns kMinimumTimestamp if there have been no new writes since the storage engine started.
*/
virtual Timestamp getAllDurableTimestamp() const = 0;
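
A small sketch of how a caller is expected to interpret this sentinel, mirroring the refresh logic earlier in this patch:

    Timestamp allDurable = storageEngine->getAllDurableTimestamp();
    if (allDurable == Timestamp(StorageEngine::kMinimumTimestamp)) {
        // No timestamped writes since startup: nothing new to persist.
    } else {
        // No in-memory holes behind allDurable; safe to use as a truncate point.
    }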
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index dfd0fe4741a..4490e00306b 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -338,6 +338,7 @@ public:
* Signals an immediate journal flush and waits for it to complete before returning.
*
* Will throw ShutdownInProgress if the flusher thread is being stopped.
+ * Will throw InterruptedDueToReplStateChange if a flusher round is interrupted by stepdown.
*/
void waitForJournalFlush() {
auto myFuture = [&]() {
@@ -348,7 +349,7 @@ public:
}
return _nextSharedPromise->getFuture();
}();
- // Throws on error if the catalog is closed.
+ // Throws on error if the catalog is closed or the flusher round is interrupted by stepdown.
myFuture.get();
}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp
index cb20d2344f2..7a739b9a3c1 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp
@@ -43,11 +43,6 @@
#include "mongo/util/scopeguard.h"
namespace mongo {
-namespace {
-// This is the minimum valid timestamp; it can be used for reads that need to see all untimestamped
-// data but no timestamped data. We cannot use 0 here because 0 means see all timestamped data.
-const uint64_t kMinimumTimestamp = 1;
-} // namespace
MONGO_FAIL_POINT_DEFINE(WTPausePrimaryOplogDurabilityLoop);
@@ -72,7 +67,7 @@ void WiredTigerOplogManager::start(OperationContext* opCtx,
"oplogVisibility"_attr = oplogVisibility);
} else {
// Avoid setting oplog visibility to 0. That means "everything is visible".
- setOplogReadTimestamp(Timestamp(kMinimumTimestamp));
+ setOplogReadTimestamp(Timestamp(StorageEngine::kMinimumTimestamp));
}
// Need to obtain the mutex before starting the thread, as otherwise it may race ahead
@@ -177,17 +172,6 @@ void WiredTigerOplogManager::_oplogJournalThreadLoop(WiredTigerSessionCache* ses
// forward cursors. The timestamp is used to hide oplog entries that might be committed but
// have uncommitted entries ahead of them.
while (true) {
- auto opCtx = cc().makeOperationContext();
-
- // This thread is started before we finish creating the StorageEngine and consequently
- // makeOperationContext() returns an OperationContext with a LockerNoop. Rather than trying
- // to refactor the code to start this thread after the StorageEngine is fully instantiated,
- // we will use this temporary hack to give the opCtx a real Locker.
- //
- // TODO (SERVER-41392): the Replicate Before Journaling project will be removing the
- // waitUntilDurable() call requiring an opCtx parameter.
- cc().swapLockState(std::make_unique<LockerImpl>());
-
stdx::unique_lock<Latch> lk(_oplogVisibilityStateMutex);
{
MONGO_IDLE_THREAD_BLOCK;
@@ -246,12 +230,6 @@ void WiredTigerOplogManager::_oplogJournalThreadLoop(WiredTigerSessionCache* ses
continue;
}
- // In order to avoid oplog holes after an unclean shutdown, we must ensure this proposed
- // oplog read timestamp's documents are durable before publishing that timestamp.
- sessionCache->waitUntilDurable(opCtx.get(),
- WiredTigerSessionCache::Fsync::kJournal,
- WiredTigerSessionCache::UseJournalListener::kUpdate);
-
lk.lock();
// Publish the new timestamp value. Avoid going backward.
auto oldTimestamp = getOplogReadTimestamp();
@@ -291,7 +269,7 @@ uint64_t WiredTigerOplogManager::fetchAllDurableValue(WT_CONNECTION* conn) {
if (wtstatus == WT_NOTFOUND) {
// Treat this as lowest possible timestamp; we need to see all preexisting data but no new
// (timestamped) data.
- return kMinimumTimestamp;
+ return StorageEngine::kMinimumTimestamp;
} else {
invariantWTOK(wtstatus);
}
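
The hunks above remove the journal flush from the oplog-visibility thread, which is the heart of "Replicate Before Journaling": oplog visibility may now advance before durability. A condensed before/after sketch in comments (paraphrased, not the literal loop):

    // Before: publish the timestamp only once it is durable on disk.
    //   newTimestamp = fetchAllDurableValue(conn);
    //   sessionCache->waitUntilDurable(...);   // journal flush first
    //   setOplogReadTimestamp(Timestamp(newTimestamp));
    //
    // After: publish as soon as there are no in-memory holes; durability is
    // handled separately, and the oplogTruncateAfterPoint guards the gap on
    // unclean shutdown.
    //   newTimestamp = fetchAllDurableValue(conn);
    //   setOplogReadTimestamp(Timestamp(newTimestamp));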
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h b/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h
index 2c81da13c0a..a899861f262 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h
@@ -70,7 +70,7 @@ public:
std::uint64_t getOplogReadTimestamp() const;
void setOplogReadTimestamp(Timestamp ts);
- // Triggers the oplogJournal thread to update its oplog read timestamp, by flushing the journal.
+ // Triggers the oplogJournal thread to update the oplog read timestamp.
void triggerOplogVisibilityUpdate();
// Waits until all writes committed at this point become visible (that is, no holes exist in
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
index b56f31fc455..978694fc270 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
@@ -250,9 +250,9 @@ void WiredTigerSessionCache::waitUntilDurable(OperationContext* opCtx,
// Update the JournalListener before we return. As far as listeners are concerned, all
// writes are as 'durable' as they are ever going to get on an inMemory storage engine.
auto journalListener = [&]() -> JournalListener* {
- // The JournalListener may not be set immediately, so we must check under a mutex so
- // as not to access the variable while setting a JournalListener. A JournalListener
- // is only allowed to be set once, so using the pointer outside of a mutex is safe.
+ // The JournalListener may not be set immediately, so we must check under a mutex so as
+ // not to access the variable while setting a JournalListener. A JournalListener is only
+ // allowed to be set once, so using the pointer outside of a mutex is safe.
stdx::unique_lock<Latch> lk(_journalListenerMutex);
return _journalListener;
}();
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h
index e488b73cd65..d4ec5de2ad2 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h
@@ -309,7 +309,8 @@ public:
* Taking a checkpoint has the benefit of persisting unjournaled writes.
*
* 'useListener' controls whether or not the JournalListener is updated with the last durable
- * value of the timestamp that it tracks.
+ * value of the timestamp that it tracks. The JournalListener's token is fetched before writing
+ * out to disk and set afterwards to update the repl layer durable timestamp.
*
* Uses a temporary session. Safe to call without any locks, even during shutdown.
*/
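
For reference, the call shape with the 'useListener' knob, as it appeared in the code removed from the oplog manager above (UseJournalListener::kUpdate is the only enumerator shown in this patch; the non-updating variant is not shown here):

    sessionCache->waitUntilDurable(opCtx,
                                   WiredTigerSessionCache::Fsync::kJournal,
                                   WiredTigerSessionCache::UseJournalListener::kUpdate);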
diff --git a/src/mongo/db/write_concern.cpp b/src/mongo/db/write_concern.cpp
index 1a7f4ee868e..85262273fc6 100644
--- a/src/mongo/db/write_concern.cpp
+++ b/src/mongo/db/write_concern.cpp
@@ -267,28 +267,34 @@ Status waitForWriteConcern(OperationContext* opCtx,
WriteConcernOptions writeConcernWithPopulatedSyncMode =
replCoord->populateUnsetWriteConcernOptionsSyncMode(writeConcern);
- switch (writeConcernWithPopulatedSyncMode.syncMode) {
- case WriteConcernOptions::SyncMode::UNSET:
- LOGV2_FATAL(22550, "Attempting to wait on a WriteConcern with an unset sync option");
- fassertFailed(34410);
- case WriteConcernOptions::SyncMode::NONE:
- break;
- case WriteConcernOptions::SyncMode::FSYNC: {
- if (!storageEngine->isDurable()) {
- storageEngine->flushAllFiles(opCtx, /*callerHoldsReadLock*/ false);
-
- // This field has had a dummy value since MMAP went away. It is undocumented.
- // Maintaining it so as not to cause unnecessary user pain across upgrades.
- result->fsyncFiles = 1;
- } else {
- // We only need to commit the journal if we're durable
- storageEngine->waitForJournalFlush(opCtx);
+ // Waiting for durability (flushing the journal or all files to disk) can throw on interruption.
+ try {
+ switch (writeConcernWithPopulatedSyncMode.syncMode) {
+ case WriteConcernOptions::SyncMode::UNSET:
+ LOGV2_FATAL(22550,
+ "Attempting to wait on a WriteConcern with an unset sync option");
+ fassertFailed(34410);
+ case WriteConcernOptions::SyncMode::NONE:
+ break;
+ case WriteConcernOptions::SyncMode::FSYNC: {
+ if (!storageEngine->isDurable()) {
+ storageEngine->flushAllFiles(opCtx, /*callerHoldsReadLock*/ false);
+
+ // This field has had a dummy value since MMAP went away. It is undocumented.
+ // Maintaining it so as not to cause unnecessary user pain across upgrades.
+ result->fsyncFiles = 1;
+ } else {
+ // We only need to commit the journal if we're durable
+ storageEngine->waitForJournalFlush(opCtx);
+ }
+ break;
}
- break;
+ case WriteConcernOptions::SyncMode::JOURNAL:
+ storageEngine->waitForJournalFlush(opCtx);
+ break;
}
- case WriteConcernOptions::SyncMode::JOURNAL:
- storageEngine->waitForJournalFlush(opCtx);
- break;
+ } catch (const DBException& ex) {
+ return ex.toStatus();
}
result->syncMillis = syncTimer.millis();