author    Dianna Hohensee <dianna.hohensee@10gen.com>  2019-08-13 12:48:25 -0400
committer Dianna Hohensee <dianna.hohensee@10gen.com>  2019-08-23 14:30:06 -0400
commit    266e079792be38e67442ef5fb2dbd557a6ca694d (patch)
tree      9d47e3010299be13a7a37cc33fbaec7356c53b89
parent    f98b80b33347b52cc4cd7b11880ac3057a32359f (diff)
download  mongo-266e079792be38e67442ef5fb2dbd557a6ca694d.tar.gz
SERVER-42221 Add concurrency control to checkpoint requests and expose it to the validation cmd code layer
-rw-r--r--  src/mongo/db/catalog/catalog_control_test.cpp | 3
-rw-r--r--  src/mongo/db/commands/fsync.cpp | 2
-rw-r--r--  src/mongo/db/repl/replication_consistency_markers_impl.cpp | 4
-rw-r--r--  src/mongo/db/repl/replication_consistency_markers_impl_test.cpp | 6
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 2
-rw-r--r--  src/mongo/db/repl/replication_recovery.cpp | 4
-rw-r--r--  src/mongo/db/repl/rs_rollback.cpp | 6
-rw-r--r--  src/mongo/db/repl/storage_interface_impl.cpp | 2
-rw-r--r--  src/mongo/db/repl/sync_tail.cpp | 2
-rw-r--r--  src/mongo/db/storage/biggie/biggie_recovery_unit.cpp | 2
-rw-r--r--  src/mongo/db/storage/biggie/biggie_recovery_unit.h | 2
-rw-r--r--  src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp | 7
-rw-r--r--  src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h | 2
-rw-r--r--  src/mongo/db/storage/kv/kv_engine.h | 23
-rw-r--r--  src/mongo/db/storage/mobile/mobile_recovery_unit.cpp | 3
-rw-r--r--  src/mongo/db/storage/mobile/mobile_recovery_unit.h | 2
-rw-r--r--  src/mongo/db/storage/recovery_unit.h | 7
-rw-r--r--  src/mongo/db/storage/recovery_unit_noop.h | 2
-rw-r--r--  src/mongo/db/storage/recovery_unit_test_harness.cpp | 2
-rw-r--r--  src/mongo/db/storage/storage_engine.h | 38
-rw-r--r--  src/mongo/db/storage/storage_engine_impl.cpp | 5
-rw-r--r--  src/mongo/db/storage/storage_engine_impl.h | 10
-rw-r--r--  src/mongo/db/storage/storage_repair_observer.cpp | 2
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp | 64
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h | 13
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp | 23
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp | 14
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp | 9
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h | 5
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp | 26
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp | 6
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h | 3
-rw-r--r--  src/mongo/db/write_concern.cpp | 6
33 files changed, 230 insertions(+), 77 deletions(-)
diff --git a/src/mongo/db/catalog/catalog_control_test.cpp b/src/mongo/db/catalog/catalog_control_test.cpp
index 827afdaed4b..ac152d32f38 100644
--- a/src/mongo/db/catalog/catalog_control_test.cpp
+++ b/src/mongo/db/catalog/catalog_control_test.cpp
@@ -114,6 +114,9 @@ public:
const DurableCatalog* getCatalog() const {
return nullptr;
}
+ std::unique_ptr<CheckpointLock> getCheckpointLock(OperationContext* opCtx) {
+ return nullptr;
+ }
};
/**
diff --git a/src/mongo/db/commands/fsync.cpp b/src/mongo/db/commands/fsync.cpp
index 3f3f2cc9374..66535156467 100644
--- a/src/mongo/db/commands/fsync.cpp
+++ b/src/mongo/db/commands/fsync.cpp
@@ -401,7 +401,7 @@ void FSyncLockThread::run() {
if (_allowFsyncFailure) {
warning() << "Locking despite storage engine being unable to begin backup : "
<< e.toString();
- opCtx.recoveryUnit()->waitUntilDurable();
+ opCtx.recoveryUnit()->waitUntilDurable(&opCtx);
} else {
error() << "storage engine unable to begin backup : " << e.toString();
fsyncCmd.threadStatus = e.toStatus();
diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.cpp b/src/mongo/db/repl/replication_consistency_markers_impl.cpp
index a57b7e35ceb..406ad96c9fd 100644
--- a/src/mongo/db/repl/replication_consistency_markers_impl.cpp
+++ b/src/mongo/db/repl/replication_consistency_markers_impl.cpp
@@ -139,7 +139,7 @@ void ReplicationConsistencyMarkersImpl::setInitialSyncFlag(OperationContext* opC
update.timestamp = Timestamp();
_updateMinValidDocument(opCtx, update);
- opCtx->recoveryUnit()->waitUntilDurable();
+ opCtx->recoveryUnit()->waitUntilDurable(opCtx);
}
void ReplicationConsistencyMarkersImpl::clearInitialSyncFlag(OperationContext* opCtx) {
@@ -170,7 +170,7 @@ void ReplicationConsistencyMarkersImpl::clearInitialSyncFlag(OperationContext* o
setOplogTruncateAfterPoint(opCtx, Timestamp());
if (getGlobalServiceContext()->getStorageEngine()->isDurable()) {
- opCtx->recoveryUnit()->waitUntilDurable();
+ opCtx->recoveryUnit()->waitUntilDurable(opCtx);
replCoord->setMyLastDurableOpTimeAndWallTime(opTimeAndWallTime);
}
}
diff --git a/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp b/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp
index 04ebe4c6312..7fa36f8b983 100644
--- a/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp
+++ b/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp
@@ -136,13 +136,13 @@ private:
*/
class RecoveryUnitWithDurabilityTracking : public RecoveryUnitNoop {
public:
- bool waitUntilDurable() override;
+ bool waitUntilDurable(OperationContext* opCtx) override;
bool waitUntilDurableCalled = false;
};
-bool RecoveryUnitWithDurabilityTracking::waitUntilDurable() {
+bool RecoveryUnitWithDurabilityTracking::waitUntilDurable(OperationContext* opCtx) {
waitUntilDurableCalled = true;
- return RecoveryUnitNoop::waitUntilDurable();
+ return RecoveryUnitNoop::waitUntilDurable(opCtx);
}
TEST_F(ReplicationConsistencyMarkersTest, InitialSyncFlag) {
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 69fd30eac81..171fe14585f 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -605,7 +605,7 @@ Status ReplicationCoordinatorExternalStateImpl::storeLocalLastVoteDocument(
return status;
}
- opCtx->recoveryUnit()->waitUntilDurable();
+ opCtx->recoveryUnit()->waitUntilDurable(opCtx);
return Status::OK();
} catch (const DBException& ex) {
diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp
index dac500778f2..07621bf0ce8 100644
--- a/src/mongo/db/repl/replication_recovery.cpp
+++ b/src/mongo/db/repl/replication_recovery.cpp
@@ -328,7 +328,7 @@ void ReplicationRecoveryImpl::_recoverFromUnstableCheckpoint(OperationContext* o
// timestamp to determine where to play oplog forward from. As this method shows, when a
// recovery timestamp does not exist, the applied through is used to determine where to start
// playing oplog entries from.
- opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable();
+ opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable(opCtx);
}
void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx,
@@ -504,7 +504,7 @@ void ReplicationRecoveryImpl::_truncateOplogIfNeededAndThenClearOplogTruncateAft
// Clear the oplogTruncateAfterPoint now that we have removed any holes that might exist in the
// oplog -- and so that we do not truncate future entries erroneously.
_consistencyMarkers->setOplogTruncateAfterPoint(opCtx, {});
- opCtx->recoveryUnit()->waitUntilDurable();
+ opCtx->recoveryUnit()->waitUntilDurable(opCtx);
}
} // namespace repl
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index b896b43d6e4..1b685116242 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -1503,7 +1503,7 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
// are persisted to disk before truncating oplog.
log() << "Waiting for an unstable checkpoint";
const bool stableCheckpoint = false;
- opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable(stableCheckpoint);
+ opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable(opCtx, stableCheckpoint);
}
log() << "Truncating the oplog at " << fixUpInfo.commonPoint.toString() << " ("
@@ -1541,7 +1541,7 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
// Take an unstable checkpoint to ensure the appliedThrough write is persisted to disk.
log() << "Waiting for an unstable checkpoint";
const bool stableCheckpoint = false;
- opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable(stableCheckpoint);
+ opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable(opCtx, stableCheckpoint);
// Ensure that appliedThrough is unset in the next stable checkpoint.
log() << "Clearing appliedThrough";
@@ -1676,7 +1676,7 @@ void rollback(OperationContext* opCtx,
// so that if we wind up shutting down uncleanly in response to something we rolled back
// we know that we won't wind up right back in the same situation when we start back up
// because the rollback wasn't durable.
- opCtx->recoveryUnit()->waitUntilDurable();
+ opCtx->recoveryUnit()->waitUntilDurable(opCtx);
// If we detected that we rolled back the shardIdentity document as part of this rollback
// then we must shut down to clear the in-memory ShardingState associated with the
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index 7145bb15560..88f4e2f36c7 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -177,7 +177,7 @@ StatusWith<int> StorageInterfaceImpl::incrementRollbackID(OperationContext* opCt
// We wait until durable so that we are sure the Rollback ID is updated before rollback ends.
if (status.isOK()) {
- opCtx->recoveryUnit()->waitUntilDurable();
+ opCtx->recoveryUnit()->waitUntilDurable(opCtx);
return newRBID;
}
return status;
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 64b9071dd99..2320f05ff51 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -206,7 +206,7 @@ void ApplyBatchFinalizerForJournal::_run() {
}
auto opCtx = cc().makeOperationContext();
- opCtx->recoveryUnit()->waitUntilDurable();
+ opCtx->recoveryUnit()->waitUntilDurable(opCtx.get());
_recordDurable(latestOpTimeAndWallTime);
}
}
diff --git a/src/mongo/db/storage/biggie/biggie_recovery_unit.cpp b/src/mongo/db/storage/biggie/biggie_recovery_unit.cpp
index 16910fbc69e..4786b7de7c9 100644
--- a/src/mongo/db/storage/biggie/biggie_recovery_unit.cpp
+++ b/src/mongo/db/storage/biggie/biggie_recovery_unit.cpp
@@ -91,7 +91,7 @@ void RecoveryUnit::abortUnitOfWork() {
_abort();
}
-bool RecoveryUnit::waitUntilDurable() {
+bool RecoveryUnit::waitUntilDurable(OperationContext* opCtx) {
invariant(!_inUnitOfWork(), toString(_getState()));
return true; // This is an in-memory storage engine.
}
diff --git a/src/mongo/db/storage/biggie/biggie_recovery_unit.h b/src/mongo/db/storage/biggie/biggie_recovery_unit.h
index e3bca95885d..13a61067955 100644
--- a/src/mongo/db/storage/biggie/biggie_recovery_unit.h
+++ b/src/mongo/db/storage/biggie/biggie_recovery_unit.h
@@ -53,7 +53,7 @@ public:
return _inUnitOfWork();
}
- virtual bool waitUntilDurable() override;
+ virtual bool waitUntilDurable(OperationContext* opCtx) override;
virtual void abandonSnapshot() override;
diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp
index 61abd09b8f8..c99194a4406 100644
--- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp
+++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp
@@ -63,7 +63,10 @@ void EphemeralForTestRecoveryUnit::commitUnitOfWork() {
// This ensures that the journal listener gets called on each commit.
// SERVER-22575: Remove this once we add a generic mechanism to periodically wait
// for durability.
- waitUntilDurable();
+ if (_waitUntilDurableCallback) {
+ _waitUntilDurableCallback();
+ }
+
_setState(State::kInactive);
}
@@ -86,7 +89,7 @@ void EphemeralForTestRecoveryUnit::abortUnitOfWork() {
_setState(State::kInactive);
}
-bool EphemeralForTestRecoveryUnit::waitUntilDurable() {
+bool EphemeralForTestRecoveryUnit::waitUntilDurable(OperationContext* opCtx) {
if (_waitUntilDurableCallback) {
_waitUntilDurableCallback();
}
diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h
index d117a993090..ab368f45bf5 100644
--- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h
+++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h
@@ -50,7 +50,7 @@ public:
void commitUnitOfWork() final;
void abortUnitOfWork() final;
- virtual bool waitUntilDurable();
+ virtual bool waitUntilDurable(OperationContext* opCtx);
bool inActiveTxn() const;
diff --git a/src/mongo/db/storage/kv/kv_engine.h b/src/mongo/db/storage/kv/kv_engine.h
index 2c39ddbde93..2d8118c6f68 100644
--- a/src/mongo/db/storage/kv/kv_engine.h
+++ b/src/mongo/db/storage/kv/kv_engine.h
@@ -52,9 +52,19 @@ class SnapshotManager;
class KVEngine {
public:
- virtual RecoveryUnit* newRecoveryUnit() = 0;
+ /**
+ * This function should only be called after the StorageEngine is set on the ServiceContext.
+ *
+ * Starts asynchronous threads for a storage engine's integration layer. Any such thread
+ * generating an OperationContext should be initialized here.
+ *
+ * In order for OperationContexts to be generated with real Locker objects, the generation must
+ * occur after the StorageEngine is instantiated and set on the ServiceContext. Otherwise,
+ * OperationContexts are created with LockerNoops.
+ */
+ virtual void startAsyncThreads() {}
- // ---------
+ virtual RecoveryUnit* newRecoveryUnit() = 0;
/**
* Requesting multiple copies for the same ns/ident is a rules violation; Calling on a
@@ -230,6 +240,15 @@ public:
}
/**
+ * See StorageEngine::getCheckpointLock for details.
+ */
+ virtual std::unique_ptr<StorageEngine::CheckpointLock> getCheckpointLock(
+ OperationContext* opCtx) {
+ uasserted(ErrorCodes::CommandNotSupported,
+ "The current storage engine does not support checkpoints");
+ }
+
+ /**
* Returns whether the KVEngine supports checkpoints.
*/
virtual bool supportsCheckpoints() const {
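Illustration only (not part of this diff): engines that do not override getCheckpointLock() keep the uasserting default above, so checkpoint-dependent callers must tolerate a CommandNotSupported error. A minimal, hypothetical call-site sketch, where 'engine' is any KVEngine* and 'opCtx' an OperationContext*:

    // Hypothetical caller sketch -- names here are assumptions, not code from this commit.
    try {
        auto checkpointLock = engine->getCheckpointLock(opCtx);
        // ... work that must not race with a checkpoint ...
    } catch (const ExceptionFor<ErrorCodes::CommandNotSupported>&) {
        // This storage engine has no checkpoints; fall back to a checkpoint-free path.
    }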
diff --git a/src/mongo/db/storage/mobile/mobile_recovery_unit.cpp b/src/mongo/db/storage/mobile/mobile_recovery_unit.cpp
index cbb59b061a9..05141f78c11 100644
--- a/src/mongo/db/storage/mobile/mobile_recovery_unit.cpp
+++ b/src/mongo/db/storage/mobile/mobile_recovery_unit.cpp
@@ -115,7 +115,7 @@ void MobileRecoveryUnit::abortUnitOfWork() {
_abort();
}
-bool MobileRecoveryUnit::waitUntilDurable() {
+bool MobileRecoveryUnit::waitUntilDurable(OperationContext* opCtx) {
// This is going to be slow as we're taking a global X lock and doing a full checkpoint. This
// should not be needed to do on Android or iOS if we are on WAL and synchronous=NORMAL which
// are our default settings. The system will make sure any non-flushed writes will not be lost
@@ -123,7 +123,6 @@ bool MobileRecoveryUnit::waitUntilDurable() {
// not call this (by disabling writeConcern j:true) but allow it when this is used inside
// mongod.
if (_sessionPool->getOptions().durabilityLevel < 2) {
- OperationContext* opCtx = Client::getCurrent()->getOperationContext();
_ensureSession(opCtx);
RECOVERY_UNIT_TRACE() << "waitUntilDurable called, attempting to perform a checkpoint";
int framesInWAL = 0;
diff --git a/src/mongo/db/storage/mobile/mobile_recovery_unit.h b/src/mongo/db/storage/mobile/mobile_recovery_unit.h
index bf8917dda3a..163d2354837 100644
--- a/src/mongo/db/storage/mobile/mobile_recovery_unit.h
+++ b/src/mongo/db/storage/mobile/mobile_recovery_unit.h
@@ -54,7 +54,7 @@ public:
void beginUnitOfWork(OperationContext* opCtx) override;
void commitUnitOfWork() override;
void abortUnitOfWork() override;
- bool waitUntilDurable() override;
+ bool waitUntilDurable(OperationContext* opCtx) override;
void abandonSnapshot() override;
diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h
index a64ad387f12..b04caa40921 100644
--- a/src/mongo/db/storage/recovery_unit.h
+++ b/src/mongo/db/storage/recovery_unit.h
@@ -189,7 +189,7 @@ public:
* isDurable() returned true. This cannot be called from inside a unit of work, and should
* fail if it is.
*/
- virtual bool waitUntilDurable() = 0;
+ virtual bool waitUntilDurable(OperationContext* opCtx) = 0;
/**
* Unlike `waitUntilDurable`, this method takes a stable checkpoint, making durable any writes
@@ -200,8 +200,9 @@ public:
* This must not be called by a system taking user writes until after a stable timestamp is
* passed to the storage engine.
*/
- virtual bool waitUntilUnjournaledWritesDurable(bool stableCheckpoint = true) {
- return waitUntilDurable();
+ virtual bool waitUntilUnjournaledWritesDurable(OperationContext* opCtx,
+ bool stableCheckpoint = true) {
+ return waitUntilDurable(opCtx);
}
/**
diff --git a/src/mongo/db/storage/recovery_unit_noop.h b/src/mongo/db/storage/recovery_unit_noop.h
index eddc0b2bdc1..740cd6f8d1c 100644
--- a/src/mongo/db/storage/recovery_unit_noop.h
+++ b/src/mongo/db/storage/recovery_unit_noop.h
@@ -64,7 +64,7 @@ public:
virtual void abandonSnapshot() {}
- virtual bool waitUntilDurable() {
+ virtual bool waitUntilDurable(OperationContext* opCtx) {
return true;
}
diff --git a/src/mongo/db/storage/recovery_unit_test_harness.cpp b/src/mongo/db/storage/recovery_unit_test_harness.cpp
index 531eadd55b1..4a6e6df8981 100644
--- a/src/mongo/db/storage/recovery_unit_test_harness.cpp
+++ b/src/mongo/db/storage/recovery_unit_test_harness.cpp
@@ -151,7 +151,7 @@ DEATH_TEST_F(RecoveryUnitTestHarness, PrepareMustBeInUnitOfWork, "invariant") {
DEATH_TEST_F(RecoveryUnitTestHarness, WaitUntilDurableMustBeOutOfUnitOfWork, "invariant") {
opCtx->recoveryUnit()->beginUnitOfWork(opCtx.get());
- opCtx->recoveryUnit()->waitUntilDurable();
+ opCtx->recoveryUnit()->waitUntilDurable(opCtx.get());
}
DEATH_TEST_F(RecoveryUnitTestHarness, AbandonSnapshotMustBeOutOfUnitOfWork, "invariant") {
diff --git a/src/mongo/db/storage/storage_engine.h b/src/mongo/db/storage/storage_engine.h
index cb99d4fc8f5..6f08d0765cd 100644
--- a/src/mongo/db/storage/storage_engine.h
+++ b/src/mongo/db/storage/storage_engine.h
@@ -151,6 +151,36 @@ public:
};
/**
+ * RAII-style class required for checkpoint activity. Instances should be obtained via
+ * getCheckpointLock() calls.
+ *
+ * Operations taking a checkpoint should create a CheckpointLock first. Also used when opening
+ * several checkpoint cursors to guarantee that all cursors are against the same checkpoint.
+ *
+ * This interface is placed in the StorageEngine in order to be accessible externally and
+ * internally to the storage layer. Each storage engine chooses how to implement it.
+ */
+ class CheckpointLock {
+ CheckpointLock(const CheckpointLock&) = delete;
+ CheckpointLock& operator=(const CheckpointLock&) = delete;
+
+ // We should never call the move constructor of the base class. We should not create base
+ // CheckpointLock instances, so any CheckpointLock type will actually be an instance of a
+ // derived class. At that point, moving the CheckpointLock type would call this constructor
+ // and skip the derived class' move constructor, likely leading to subtle bugs.
+ //
+ // Always using CheckpointLock pointer types will obviate needing a move constructor in
+ // either base or derived classes.
+ CheckpointLock(CheckpointLock&&) = delete;
+
+ public:
+ virtual ~CheckpointLock() = default;
+
+ protected:
+ CheckpointLock() = default;
+ };
+
+ /**
* The destructor should only be called if we are tearing down but not exiting the process.
*/
virtual ~StorageEngine() {}
@@ -558,6 +588,14 @@ public:
virtual const KVEngine* getEngine() const = 0;
virtual DurableCatalog* getCatalog() = 0;
virtual const DurableCatalog* getCatalog() const = 0;
+
+ /**
+ * Returns a CheckpointLock RAII instance that holds the checkpoint resource mutex.
+ *
+ * All operations taking a checkpoint should use this CheckpointLock. Also applicable for
+ * opening several checkpoint cursors to ensure the same checkpoint is targeted.
+ */
+ virtual std::unique_ptr<CheckpointLock> getCheckpointLock(OperationContext* opCtx) = 0;
};
} // namespace mongo
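For illustration only (not introduced by this commit): a sketch of how a caller such as the validation command layer might use the new interface to serialize with checkpoints. 'storageEngine' and 'opCtx' are assumed to be available at the call site:

    // Sketch: hold the checkpoint lock while inspecting checkpointed data.
    {
        std::unique_ptr<StorageEngine::CheckpointLock> checkpointLock =
            storageEngine->getCheckpointLock(opCtx);
        // No checkpoint can start or complete while 'checkpointLock' is held, so any
        // checkpoint cursors opened in this scope all observe the same checkpoint.
    }  // Lock released here; checkpointing may resume.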
diff --git a/src/mongo/db/storage/storage_engine_impl.cpp b/src/mongo/db/storage/storage_engine_impl.cpp
index f96445b0f66..5df8a4e4df7 100644
--- a/src/mongo/db/storage/storage_engine_impl.cpp
+++ b/src/mongo/db/storage/storage_engine_impl.cpp
@@ -537,6 +537,11 @@ void StorageEngineImpl::cleanShutdown() {
StorageEngineImpl::~StorageEngineImpl() {}
void StorageEngineImpl::finishInit() {
+ // A storage engine may need to start threads that require OperationContexts with real Lockers,
+ // as opposed to LockerNoops. Placing the start logic here, after the StorageEngine has been
+ // instantiated, causes makeOperationContext() to create LockerImpls instead of LockerNoops.
+ _engine->startAsyncThreads();
+
if (_engine->supportsRecoveryTimestamp()) {
_timestampMonitor = std::make_unique<TimestampMonitor>(
_engine.get(), getGlobalServiceContext()->getPeriodicRunner());
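As a hedged illustration of why this ordering matters, a thread started from startAsyncThreads() might look roughly like the sketch below; the thread name and body are hypothetical, not code from this commit:

    // Sketch of a hypothetical background thread body launched by startAsyncThreads().
    // Because startAsyncThreads() runs only after the StorageEngine is set on the
    // ServiceContext, makeOperationContext() here yields an opCtx with a real LockerImpl.
    stdx::thread exampleThread([] {
        Client::initThread("example-checkpoint-thread");
        auto opCtx = cc().makeOperationContext();
        opCtx->recoveryUnit()->waitUntilDurable(opCtx.get());
    });
    exampleThread.join();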
diff --git a/src/mongo/db/storage/storage_engine_impl.h b/src/mongo/db/storage/storage_engine_impl.h
index 13e704367d2..07f2cf6f42d 100644
--- a/src/mongo/db/storage/storage_engine_impl.h
+++ b/src/mongo/db/storage/storage_engine_impl.h
@@ -63,7 +63,7 @@ struct StorageEngineOptions {
class StorageEngineImpl final : public StorageEngineInterface, public StorageEngine {
public:
/**
- * @param engine - ownership passes to me
+ * @param engine - ownership passes to me.
*/
StorageEngineImpl(KVEngine* engine, StorageEngineOptions options = StorageEngineOptions());
@@ -163,8 +163,6 @@ public:
void setJournalListener(JournalListener* jl) final;
- // ------ kv ------
-
/**
* A TimestampMonitor is used to listen for any changes in the timestamps implemented by the
* storage engine and to notify any registered listeners upon changes to these timestamps.
@@ -309,6 +307,7 @@ public:
KVEngine* getEngine() {
return _engine.get();
}
+
const KVEngine* getEngine() const {
return _engine.get();
}
@@ -320,10 +319,15 @@ public:
DurableCatalog* getCatalog() {
return _catalog.get();
}
+
const DurableCatalog* getCatalog() const {
return _catalog.get();
}
+ std::unique_ptr<CheckpointLock> getCheckpointLock(OperationContext* opCtx) {
+ return _engine->getCheckpointLock(opCtx);
+ }
+
/**
* Drop abandoned idents. Returns a parallel list of index name, index spec pairs to rebuild.
*/
diff --git a/src/mongo/db/storage/storage_repair_observer.cpp b/src/mongo/db/storage/storage_repair_observer.cpp
index 3bd7c0de042..23320197d5d 100644
--- a/src/mongo/db/storage/storage_repair_observer.cpp
+++ b/src/mongo/db/storage/storage_repair_observer.cpp
@@ -148,7 +148,7 @@ void StorageRepairObserver::_invalidateReplConfigIfNeeded(OperationContext* opCt
configBuilder.append(repl::ReplSetConfig::kRepairedFieldName, true);
Helpers::putSingleton(opCtx, kConfigNss.ns().c_str(), configBuilder.obj());
- opCtx->recoveryUnit()->waitUntilDurable();
+ opCtx->recoveryUnit()->waitUntilDurable(opCtx);
}
} // namespace mongo
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index d49cdfe821b..cabbef66485 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -234,10 +234,11 @@ public:
LOG(1) << "starting " << name() << " thread";
while (!_shuttingDown.load()) {
+ auto opCtx = tc->makeOperationContext();
try {
const bool forceCheckpoint = false;
const bool stableCheckpoint = false;
- _sessionCache->waitUntilDurable(forceCheckpoint, stableCheckpoint);
+ _sessionCache->waitUntilDurable(opCtx.get(), forceCheckpoint, stableCheckpoint);
} catch (const AssertionException& e) {
invariant(e.code() == ErrorCodes::ShutdownInProgress);
}
@@ -260,6 +261,33 @@ private:
AtomicWord<bool> _shuttingDown{false};
};
+namespace {
+
+/**
+ * RAII class that holds an exclusive lock on the checkpoint resource mutex.
+ *
+ * Instances are created via getCheckpointLock(), which passes in the checkpoint resource mutex.
+ */
+class CheckpointLockImpl : public StorageEngine::CheckpointLock {
+ CheckpointLockImpl(const CheckpointLockImpl&) = delete;
+ CheckpointLockImpl& operator=(const CheckpointLockImpl&) = delete;
+ CheckpointLockImpl(CheckpointLockImpl&& other) = delete;
+
+public:
+ CheckpointLockImpl() = delete;
+ CheckpointLockImpl(OperationContext* opCtx, Lock::ResourceMutex mutex)
+ : _lk(opCtx->lockState(), mutex) {
+ invariant(_lk.isLocked());
+ }
+
+ ~CheckpointLockImpl() = default;
+
+private:
+ Lock::ExclusiveLock _lk;
+};
+
+} // namespace
+
std::string toString(const StorageEngine::OldestActiveTransactionTimestampResult& r) {
if (r.isOK()) {
if (r.getValue()) {
@@ -291,6 +319,8 @@ public:
LOG(1) << "starting " << name() << " thread";
while (!_shuttingDown.load()) {
+ auto opCtx = tc->makeOperationContext();
+
{
stdx::unique_lock<stdx::mutex> lock(_mutex);
MONGO_IDLE_THREAD_BLOCK;
@@ -342,6 +372,7 @@ public:
if (initialDataTimestamp.asULL() <= 1) {
UniqueWiredTigerSession session = _sessionCache->getSession();
WT_SESSION* s = session->getSession();
+ auto checkpointLock = _wiredTigerKVEngine->getCheckpointLock(opCtx.get());
invariantWTOK(s->checkpoint(s, "use_timestamp=false"));
} else if (stableTimestamp < initialDataTimestamp) {
LOG_FOR_RECOVERY(2)
@@ -358,6 +389,7 @@ public:
UniqueWiredTigerSession session = _sessionCache->getSession();
WT_SESSION* s = session->getSession();
+ auto checkpointLock = _wiredTigerKVEngine->getCheckpointLock(opCtx.get());
invariantWTOK(s->checkpoint(s, "use_timestamp=true"));
if (oplogNeededForRollback.isOK()) {
@@ -671,11 +703,6 @@ WiredTigerKVEngine::WiredTigerKVEngine(const std::string& canonicalName,
_sessionSweeper = std::make_unique<WiredTigerSessionSweeper>(_sessionCache.get());
_sessionSweeper->go();
- if (_durable && !_ephemeral) {
- _journalFlusher = std::make_unique<WiredTigerJournalFlusher>(_sessionCache.get());
- _journalFlusher->go();
- }
-
// Until the Replication layer installs a real callback, prevent truncating the oplog.
setOldestActiveTransactionTimestampCallback(
[](Timestamp) { return StatusWith(boost::make_optional(Timestamp::min())); });
@@ -686,9 +713,6 @@ WiredTigerKVEngine::WiredTigerKVEngine(const std::string& canonicalName,
setOldestTimestamp(_recoveryTimestamp, false);
setStableTimestamp(_recoveryTimestamp, false);
}
-
- _checkpointThread = std::make_unique<WiredTigerCheckpointThread>(this, _sessionCache.get());
- _checkpointThread->go();
}
if (_ephemeral && !getTestCommandsEnabled()) {
@@ -713,7 +737,6 @@ WiredTigerKVEngine::WiredTigerKVEngine(const std::string& canonicalName,
Locker::setGlobalThrottling(&openReadTransaction, &openWriteTransaction);
}
-
WiredTigerKVEngine::~WiredTigerKVEngine() {
if (_conn) {
cleanShutdown();
@@ -722,6 +745,20 @@ WiredTigerKVEngine::~WiredTigerKVEngine() {
_sessionCache.reset(nullptr);
}
+void WiredTigerKVEngine::startAsyncThreads() {
+ if (!_ephemeral) {
+ if (_durable) {
+ _journalFlusher = std::make_unique<WiredTigerJournalFlusher>(_sessionCache.get());
+ _journalFlusher->go();
+ }
+ if (!_readOnly) {
+ _checkpointThread =
+ std::make_unique<WiredTigerCheckpointThread>(this, _sessionCache.get());
+ _checkpointThread->go();
+ }
+ }
+}
+
void WiredTigerKVEngine::appendGlobalStats(BSONObjBuilder& b) {
BSONObjBuilder bb(b.subobjStart("concurrentTransactions"));
{
@@ -990,7 +1027,7 @@ int WiredTigerKVEngine::flushAllFiles(OperationContext* opCtx, bool sync) {
const bool forceCheckpoint = true;
// If there's no journal, we must take a full checkpoint.
const bool stableCheckpoint = _durable;
- _sessionCache->waitUntilDurable(forceCheckpoint, stableCheckpoint);
+ _sessionCache->waitUntilDurable(opCtx, forceCheckpoint, stableCheckpoint);
return 1;
}
@@ -1899,6 +1936,11 @@ Timestamp WiredTigerKVEngine::getPinnedOplog() const {
return Timestamp::min();
}
+std::unique_ptr<StorageEngine::CheckpointLock> WiredTigerKVEngine::getCheckpointLock(
+ OperationContext* opCtx) {
+ return std::make_unique<CheckpointLockImpl>(opCtx, _checkpointMutex);
+}
+
bool WiredTigerKVEngine::supportsReadConcernSnapshot() const {
return true;
}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
index bb8c1e4c76c..7b98f9fd388 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
@@ -39,6 +39,7 @@
#include "mongo/bson/ordering.h"
#include "mongo/bson/timestamp.h"
+#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/storage/kv/kv_engine.h"
#include "mongo/db/storage/storage_engine.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h"
@@ -80,6 +81,8 @@ public:
~WiredTigerKVEngine();
+ void startAsyncThreads() override;
+
void setRecordStoreExtraOptions(const std::string& options);
void setSortedDataInterfaceExtraOptions(const std::string& options);
@@ -345,6 +348,12 @@ public:
return _clockSource;
}
+ /**
+ * Returns a CheckpointLockImpl RAII instance holding the _checkpointMutex.
+ */
+ std::unique_ptr<StorageEngine::CheckpointLock> getCheckpointLock(
+ OperationContext* opCtx) override;
+
private:
class WiredTigerSessionSweeper;
class WiredTigerJournalFlusher;
@@ -461,5 +470,9 @@ private:
// Timestamp of data at startup. Used internally to advise checkpointing and recovery to a
// timestamp. Provided by replication layer because WT does not persist timestamps.
AtomicWord<std::uint64_t> _initialDataTimestamp;
+
+ // Required for taking a checkpoint, and can be used to ensure multiple checkpoint cursors
+ // target the same checkpoint.
+ Lock::ResourceMutex _checkpointMutex = Lock::ResourceMutex("checkpointCursorMutex");
};
} // namespace mongo
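Illustration (hypothetical call site, not in this diff) of the "several checkpoint cursors against the same checkpoint" use case that _checkpointMutex enables; 'wtKVEngine', 'ru' (the WiredTigerRecoveryUnit), 'recordStoreA', 'recordStoreB', and 'opCtx' are assumed to exist at the call site:

    // Hold one CheckpointLock across both cursor opens so no checkpoint can complete
    // in between; both cursors then read from the same checkpoint.
    auto checkpointLock = wtKVEngine->getCheckpointLock(opCtx);
    ru->setTimestampReadSource(WiredTigerRecoveryUnit::ReadSource::kCheckpoint);
    auto cursorA = recordStoreA->getCursor(opCtx, true);
    auto cursorB = recordStoreB->getCursor(opCtx, true);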
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp
index 8a1ca32e483..334cda5b748 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp
@@ -80,16 +80,19 @@ public:
private:
WiredTigerKVEngine* makeEngine() {
- return new WiredTigerKVEngine(kWiredTigerEngineName,
- _dbpath.path(),
- _cs.get(),
- "",
- 1,
- 0,
- false,
- false,
- _forRepair,
- false);
+ auto engine = new WiredTigerKVEngine(kWiredTigerEngineName,
+ _dbpath.path(),
+ _cs.get(),
+ "",
+ 1,
+ 0,
+ false,
+ false,
+ _forRepair,
+ false);
+ // There are unit tests expecting checkpoints to occur asynchronously.
+ engine->startAsyncThreads();
+ return engine;
}
const std::unique_ptr<ClockSource> _cs = std::make_unique<ClockSourceMock>();
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp
index e3ec7e9097d..2edf06fb8a2 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp
@@ -33,6 +33,7 @@
#include <cstring>
+#include "mongo/db/concurrency/lock_state.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_util.h"
@@ -162,6 +163,17 @@ void WiredTigerOplogManager::_oplogJournalThreadLoop(WiredTigerSessionCache* ses
// forward cursors. The timestamp is used to hide oplog entries that might be committed but
// have uncommitted entries ahead of them.
while (true) {
+ auto opCtx = cc().makeOperationContext();
+
+ // This thread is started before we finish creating the StorageEngine and consequently
+ // makeOperationContext() returns an OperationContext with a LockerNoop. Rather than trying
+ // to refactor the code to start this thread after the StorageEngine is fully instantiated,
+ // we will use this temporary hack to give the opCtx a real Locker.
+ //
+ // TODO (SERVER-41392): the Replicate Before Journaling project will be removing the
+ // waitUntilDurable() call requiring an opCtx parameter.
+ opCtx->swapLockState(std::make_unique<LockerImpl>());
+
stdx::unique_lock<stdx::mutex> lk(_oplogVisibilityStateMutex);
{
MONGO_IDLE_THREAD_BLOCK;
@@ -219,7 +231,7 @@ void WiredTigerOplogManager::_oplogJournalThreadLoop(WiredTigerSessionCache* ses
// In order to avoid oplog holes after an unclean shutdown, we must ensure this proposed
// oplog read timestamp's documents are durable before publishing that timestamp.
- sessionCache->waitUntilDurable(/*forceCheckpoint=*/false, false);
+ sessionCache->waitUntilDurable(opCtx.get(), /*forceCheckpoint=*/false, false);
lk.lock();
// Publish the new timestamp value. Avoid going backward.
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
index 3b785878db8..b0275a2c7ab 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
@@ -251,21 +251,22 @@ void WiredTigerRecoveryUnit::_ensureSession() {
}
}
-bool WiredTigerRecoveryUnit::waitUntilDurable() {
+bool WiredTigerRecoveryUnit::waitUntilDurable(OperationContext* opCtx) {
invariant(!_inUnitOfWork(), toString(_getState()));
const bool forceCheckpoint = false;
const bool stableCheckpoint = false;
- _sessionCache->waitUntilDurable(forceCheckpoint, stableCheckpoint);
+ _sessionCache->waitUntilDurable(opCtx, forceCheckpoint, stableCheckpoint);
return true;
}
-bool WiredTigerRecoveryUnit::waitUntilUnjournaledWritesDurable(bool stableCheckpoint) {
+bool WiredTigerRecoveryUnit::waitUntilUnjournaledWritesDurable(OperationContext* opCtx,
+ bool stableCheckpoint) {
invariant(!_inUnitOfWork(), toString(_getState()));
const bool forceCheckpoint = true;
// Calling `waitUntilDurable` with `forceCheckpoint` set to false only performs a log
// (journal) flush, and thus has no effect on unjournaled writes. Setting `forceCheckpoint` to
// true will lock in stable writes to unjournaled tables.
- _sessionCache->waitUntilDurable(forceCheckpoint, stableCheckpoint);
+ _sessionCache->waitUntilDurable(opCtx, forceCheckpoint, stableCheckpoint);
return true;
}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
index 81483c51cd7..c0dae116c73 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
@@ -107,9 +107,10 @@ public:
void commitUnitOfWork() override;
void abortUnitOfWork() override;
- bool waitUntilDurable() override;
+ bool waitUntilDurable(OperationContext* opCtx) override;
- bool waitUntilUnjournaledWritesDurable(bool stableCheckpoint = true) override;
+ bool waitUntilUnjournaledWritesDurable(OperationContext* opCtx,
+ bool stableCheckpoint = true) override;
void abandonSnapshot() override;
void preallocateSnapshot() override;
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp
index fbd3b398e2a..27ce7b946bd 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp
@@ -61,6 +61,13 @@ public:
false, // .repair
false // .readOnly
) {
+ // Deliberately not calling _engine->startAsyncThreads() because it starts an asynchronous
+ // checkpointing thread that can interfere with unit tests manipulating checkpoints
+ // manually.
+ //
+ // Alternatively, we would have to start using wiredTigerGlobalOptions.checkpointDelaySecs
+ // to set a high enough value such that the async thread never runs during testing.
+
repl::ReplicationCoordinator::set(
getGlobalServiceContext(),
std::unique_ptr<repl::ReplicationCoordinator>(new repl::ReplicationCoordinatorMock(
@@ -599,7 +606,7 @@ TEST_F(WiredTigerRecoveryUnitTestFixture, CheckpointCursorsAreNotCached) {
ru->abandonSnapshot();
- // Force a checkpoint
+ // Force a checkpoint.
ASSERT_EQUALS(1, engine->flushAllFiles(opCtx, true));
// Test 2: Checkpoint cursors are not expected to be cached, they
@@ -690,7 +697,7 @@ TEST_F(WiredTigerRecoveryUnitTestFixture, CheckpointCursorNotChanged) {
ASSERT_EQUALS(1, rs->numRecords(opCtx));
ru->commitUnitOfWork();
- // Force a checkpoint
+ // Force a checkpoint.
ASSERT_EQUALS(1, engine->flushAllFiles(opCtx, true));
// Test 1: Open a checkpoint cursor and ensure it has the first record.
@@ -709,16 +716,13 @@ TEST_F(WiredTigerRecoveryUnitTestFixture, CheckpointCursorNotChanged) {
ASSERT(!originalCheckpointCursor->seekExact(s2.getValue()));
ASSERT(originalCheckpointCursor->seekExact(s1.getValue()));
- // Test 3: New record does not appear in new checkpoint cursor since no
- // checkpoint was created.
- // TODO: Implement in SERVER-42221
- //
- // ru->setTimestampReadSource(WiredTigerRecoveryUnit::ReadSource::kCheckpoint);
- // auto newCheckpointCursor = rs->getCursor(opCtx, true);
- // ASSERT(!newCheckpointCursor->seekExact(s2.getValue()));
- //
+ // Test 3: New record does not appear in new checkpoint cursor since no new checkpoint was
+ // created.
+ ru->setTimestampReadSource(WiredTigerRecoveryUnit::ReadSource::kCheckpoint);
+ auto checkpointCursor = rs->getCursor(opCtx, true);
+ ASSERT(!checkpointCursor->seekExact(s2.getValue()));
- // Force a checkpoint
+ // Force a checkpoint.
ASSERT_EQUALS(1, engine->flushAllFiles(opCtx, true));
// Test 4: Old and new record should appear in new checkpoint cursor. Only old record
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
index 325c6728ac9..382c753daf2 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
@@ -227,7 +227,9 @@ void WiredTigerSessionCache::shuttingDown() {
closeAll();
}
-void WiredTigerSessionCache::waitUntilDurable(bool forceCheckpoint, bool stableCheckpoint) {
+void WiredTigerSessionCache::waitUntilDurable(OperationContext* opCtx,
+ bool forceCheckpoint,
+ bool stableCheckpoint) {
// For inMemory storage engines, the data is "as durable as it's going to get".
// That is, a restart is equivalent to a complete node failure.
if (isEphemeral()) {
@@ -259,6 +261,7 @@ void WiredTigerSessionCache::waitUntilDurable(bool forceCheckpoint, bool stableC
stdx::unique_lock<stdx::mutex> lk(_journalListenerMutex);
JournalListener::Token token = _journalListener->getToken();
auto config = stableCheckpoint ? "use_timestamp=true" : "use_timestamp=false";
+ auto checkpointLock = _engine->getCheckpointLock(opCtx);
invariantWTOK(s->checkpoint(s, config));
_journalListener->onDurable(token);
}
@@ -295,6 +298,7 @@ void WiredTigerSessionCache::waitUntilDurable(bool forceCheckpoint, bool stableC
invariantWTOK(_waitUntilDurableSession->log_flush(_waitUntilDurableSession, "sync=on"));
LOG(4) << "flushed journal";
} else {
+ auto checkpointLock = _engine->getCheckpointLock(opCtx);
invariantWTOK(_waitUntilDurableSession->checkpoint(_waitUntilDurableSession, nullptr));
LOG(4) << "created checkpoint";
}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h
index d09220380a3..72b55e311ed 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h
@@ -258,12 +258,13 @@ public:
void shuttingDown();
bool isEphemeral();
+
/**
* Waits until all commits that happened before this call are durable, either by flushing
* the log or forcing a checkpoint if forceCheckpoint is true or the journal is disabled.
* Uses a temporary session. Safe to call without any locks, even during shutdown.
*/
- void waitUntilDurable(bool forceCheckpoint, bool stableCheckpoint);
+ void waitUntilDurable(OperationContext* opCtx, bool forceCheckpoint, bool stableCheckpoint);
/**
* Waits until a prepared unit of work has ended (either been commited or aborted). This
diff --git a/src/mongo/db/write_concern.cpp b/src/mongo/db/write_concern.cpp
index ca87ea7d50c..b4e2b756f9f 100644
--- a/src/mongo/db/write_concern.cpp
+++ b/src/mongo/db/write_concern.cpp
@@ -191,7 +191,7 @@ Status waitForWriteConcern(OperationContext* opCtx,
result->fsyncFiles = storageEngine->flushAllFiles(opCtx, true);
} else {
// We only need to commit the journal if we're durable
- opCtx->recoveryUnit()->waitUntilDurable();
+ opCtx->recoveryUnit()->waitUntilDurable(opCtx);
}
break;
}
@@ -200,10 +200,10 @@ Status waitForWriteConcern(OperationContext* opCtx,
// Wait for ops to become durable then update replication system's
// knowledge of this.
auto appliedOpTimeAndWallTime = replCoord->getMyLastAppliedOpTimeAndWallTime();
- opCtx->recoveryUnit()->waitUntilDurable();
+ opCtx->recoveryUnit()->waitUntilDurable(opCtx);
replCoord->setMyLastDurableOpTimeAndWallTimeForward(appliedOpTimeAndWallTime);
} else {
- opCtx->recoveryUnit()->waitUntilDurable();
+ opCtx->recoveryUnit()->waitUntilDurable(opCtx);
}
break;
}