author     Dianna Hohensee <dianna.hohensee@10gen.com>    2019-08-13 12:48:25 -0400
committer  Dianna Hohensee <dianna.hohensee@10gen.com>    2019-08-23 14:30:06 -0400
commit     266e079792be38e67442ef5fb2dbd557a6ca694d (patch)
tree       9d47e3010299be13a7a37cc33fbaec7356c53b89
parent     f98b80b33347b52cc4cd7b11880ac3057a32359f (diff)
download   mongo-266e079792be38e67442ef5fb2dbd557a6ca694d.tar.gz
SERVER-42221 Add concurrency control to checkpoint requests and expose it to the validation cmd code layer
33 files changed, 230 insertions, 77 deletions
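In short, the patch threads an OperationContext* through every waitUntilDurable() / waitUntilUnjournaledWritesDurable() path and adds a StorageEngine::CheckpointLock RAII interface, backed in the WiredTiger engine by a Lock::ResourceMutex, so that background checkpoints, waitUntilDurable()-driven checkpoints, and checkpoint cursor opens all serialize on one resource. The following caller-side sketch is hypothetical, not code from this commit; it assumes only the StorageEngine::getCheckpointLock() and RecoveryUnit-style signatures introduced in the diff below, and shows the pattern the commit exposes to the validation command layer:

// Hypothetical sketch, not part of this commit. Holding the CheckpointLock
// blocks the background checkpoint thread and any checkpoint taken via
// waitUntilDurable(), so every checkpoint cursor opened in this scope is
// guaranteed to observe the same checkpoint.
void openCursorsAgainstOneCheckpoint(OperationContext* opCtx, StorageEngine* engine) {
    std::unique_ptr<StorageEngine::CheckpointLock> checkpointLock =
        engine->getCheckpointLock(opCtx);

    // ... open a checkpoint cursor on the collection and on each of its indexes ...

    // The checkpoint resource mutex is released when checkpointLock goes out of scope.
}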
diff --git a/src/mongo/db/catalog/catalog_control_test.cpp b/src/mongo/db/catalog/catalog_control_test.cpp
index 827afdaed4b..ac152d32f38 100644
--- a/src/mongo/db/catalog/catalog_control_test.cpp
+++ b/src/mongo/db/catalog/catalog_control_test.cpp
@@ -114,6 +114,9 @@ public:
     const DurableCatalog* getCatalog() const {
         return nullptr;
     }
+    std::unique_ptr<CheckpointLock> getCheckpointLock(OperationContext* opCtx) {
+        return nullptr;
+    }
 };
 
 /**
diff --git a/src/mongo/db/commands/fsync.cpp b/src/mongo/db/commands/fsync.cpp
index 3f3f2cc9374..66535156467 100644
--- a/src/mongo/db/commands/fsync.cpp
+++ b/src/mongo/db/commands/fsync.cpp
@@ -401,7 +401,7 @@ void FSyncLockThread::run() {
             if (_allowFsyncFailure) {
                 warning() << "Locking despite storage engine being unable to begin backup : "
                           << e.toString();
-                opCtx.recoveryUnit()->waitUntilDurable();
+                opCtx.recoveryUnit()->waitUntilDurable(&opCtx);
             } else {
                 error() << "storage engine unable to begin backup : " << e.toString();
                 fsyncCmd.threadStatus = e.toStatus();
diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.cpp b/src/mongo/db/repl/replication_consistency_markers_impl.cpp
index a57b7e35ceb..406ad96c9fd 100644
--- a/src/mongo/db/repl/replication_consistency_markers_impl.cpp
+++ b/src/mongo/db/repl/replication_consistency_markers_impl.cpp
@@ -139,7 +139,7 @@ void ReplicationConsistencyMarkersImpl::setInitialSyncFlag(OperationContext* opC
     update.timestamp = Timestamp();
 
     _updateMinValidDocument(opCtx, update);
-    opCtx->recoveryUnit()->waitUntilDurable();
+    opCtx->recoveryUnit()->waitUntilDurable(opCtx);
 }
 
 void ReplicationConsistencyMarkersImpl::clearInitialSyncFlag(OperationContext* opCtx) {
@@ -170,7 +170,7 @@ void ReplicationConsistencyMarkersImpl::clearInitialSyncFlag(OperationContext* o
     setOplogTruncateAfterPoint(opCtx, Timestamp());
 
     if (getGlobalServiceContext()->getStorageEngine()->isDurable()) {
-        opCtx->recoveryUnit()->waitUntilDurable();
+        opCtx->recoveryUnit()->waitUntilDurable(opCtx);
         replCoord->setMyLastDurableOpTimeAndWallTime(opTimeAndWallTime);
     }
 }
diff --git a/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp b/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp
index 04ebe4c6312..7fa36f8b983 100644
--- a/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp
+++ b/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp
@@ -136,13 +136,13 @@ private:
  */
 class RecoveryUnitWithDurabilityTracking : public RecoveryUnitNoop {
 public:
-    bool waitUntilDurable() override;
+    bool waitUntilDurable(OperationContext* opCtx) override;
     bool waitUntilDurableCalled = false;
 };
 
-bool RecoveryUnitWithDurabilityTracking::waitUntilDurable() {
+bool RecoveryUnitWithDurabilityTracking::waitUntilDurable(OperationContext* opCtx) {
     waitUntilDurableCalled = true;
-    return RecoveryUnitNoop::waitUntilDurable();
+    return RecoveryUnitNoop::waitUntilDurable(opCtx);
 }
 
 TEST_F(ReplicationConsistencyMarkersTest, InitialSyncFlag) {
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 69fd30eac81..171fe14585f 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -605,7 +605,7 @@ Status ReplicationCoordinatorExternalStateImpl::storeLocalLastVoteDocument(
             return status;
         }
 
-        opCtx->recoveryUnit()->waitUntilDurable();
+        opCtx->recoveryUnit()->waitUntilDurable(opCtx);
 
         return Status::OK();
     } catch (const DBException& ex) {
diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp
index dac500778f2..07621bf0ce8 100644
--- a/src/mongo/db/repl/replication_recovery.cpp
+++ b/src/mongo/db/repl/replication_recovery.cpp
@@ -328,7 +328,7 @@ void ReplicationRecoveryImpl::_recoverFromUnstableCheckpoint(OperationContext* o
     // timestamp to determine where to play oplog forward from. As this method shows, when a
     // recovery timestamp does not exist, the applied through is used to determine where to start
     // playing oplog entries from.
-    opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable();
+    opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable(opCtx);
 }
 
 void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx,
@@ -504,7 +504,7 @@ void ReplicationRecoveryImpl::_truncateOplogIfNeededAndThenClearOplogTruncateAft
     // Clear the oplogTruncateAfterPoint now that we have removed any holes that might exist in the
     // oplog -- and so that we do not truncate future entries erroneously.
     _consistencyMarkers->setOplogTruncateAfterPoint(opCtx, {});
-    opCtx->recoveryUnit()->waitUntilDurable();
+    opCtx->recoveryUnit()->waitUntilDurable(opCtx);
 }
 
 }  // namespace repl
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index b896b43d6e4..1b685116242 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -1503,7 +1503,7 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
         // are persisted to disk before truncating oplog.
         log() << "Waiting for an unstable checkpoint";
         const bool stableCheckpoint = false;
-        opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable(stableCheckpoint);
+        opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable(opCtx, stableCheckpoint);
     }
 
     log() << "Truncating the oplog at " << fixUpInfo.commonPoint.toString() << " ("
@@ -1541,7 +1541,7 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
         // Take an unstable checkpoint to ensure the appliedThrough write is persisted to disk.
         log() << "Waiting for an unstable checkpoint";
         const bool stableCheckpoint = false;
-        opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable(stableCheckpoint);
+        opCtx->recoveryUnit()->waitUntilUnjournaledWritesDurable(opCtx, stableCheckpoint);
 
         // Ensure that appliedThrough is unset in the next stable checkpoint.
         log() << "Clearing appliedThrough";
@@ -1676,7 +1676,7 @@ void rollback(OperationContext* opCtx,
     // so that if we wind up shutting down uncleanly in response to something we rolled back
     // we know that we won't wind up right back in the same situation when we start back up
     // because the rollback wasn't durable.
-    opCtx->recoveryUnit()->waitUntilDurable();
+    opCtx->recoveryUnit()->waitUntilDurable(opCtx);
 
     // If we detected that we rolled back the shardIdentity document as part of this rollback
     // then we must shut down to clear the in-memory ShardingState associated with the
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index 7145bb15560..88f4e2f36c7 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -177,7 +177,7 @@ StatusWith<int> StorageInterfaceImpl::incrementRollbackID(OperationContext* opCt
     // We wait until durable so that we are sure the Rollback ID is updated before rollback ends.
     if (status.isOK()) {
-        opCtx->recoveryUnit()->waitUntilDurable();
+        opCtx->recoveryUnit()->waitUntilDurable(opCtx);
         return newRBID;
     }
     return status;
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 64b9071dd99..2320f05ff51 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -206,7 +206,7 @@ void ApplyBatchFinalizerForJournal::_run() {
         }
 
         auto opCtx = cc().makeOperationContext();
-        opCtx->recoveryUnit()->waitUntilDurable();
+        opCtx->recoveryUnit()->waitUntilDurable(opCtx.get());
         _recordDurable(latestOpTimeAndWallTime);
     }
 }
diff --git a/src/mongo/db/storage/biggie/biggie_recovery_unit.cpp b/src/mongo/db/storage/biggie/biggie_recovery_unit.cpp
index 16910fbc69e..4786b7de7c9 100644
--- a/src/mongo/db/storage/biggie/biggie_recovery_unit.cpp
+++ b/src/mongo/db/storage/biggie/biggie_recovery_unit.cpp
@@ -91,7 +91,7 @@ void RecoveryUnit::abortUnitOfWork() {
     _abort();
 }
 
-bool RecoveryUnit::waitUntilDurable() {
+bool RecoveryUnit::waitUntilDurable(OperationContext* opCtx) {
     invariant(!_inUnitOfWork(), toString(_getState()));
     return true;  // This is an in-memory storage engine.
 }
diff --git a/src/mongo/db/storage/biggie/biggie_recovery_unit.h b/src/mongo/db/storage/biggie/biggie_recovery_unit.h
index e3bca95885d..13a61067955 100644
--- a/src/mongo/db/storage/biggie/biggie_recovery_unit.h
+++ b/src/mongo/db/storage/biggie/biggie_recovery_unit.h
@@ -53,7 +53,7 @@ public:
         return _inUnitOfWork();
     }
 
-    virtual bool waitUntilDurable() override;
+    virtual bool waitUntilDurable(OperationContext* opCtx) override;
 
    virtual void abandonSnapshot() override;
diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp
index 61abd09b8f8..c99194a4406 100644
--- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp
+++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp
@@ -63,7 +63,10 @@ void EphemeralForTestRecoveryUnit::commitUnitOfWork() {
     // This ensures that the journal listener gets called on each commit.
     // SERVER-22575: Remove this once we add a generic mechanism to periodically wait
     // for durability.
-    waitUntilDurable();
+    if (_waitUntilDurableCallback) {
+        _waitUntilDurableCallback();
+    }
+
     _setState(State::kInactive);
 }
 
@@ -86,7 +89,7 @@ void EphemeralForTestRecoveryUnit::abortUnitOfWork() {
     _setState(State::kInactive);
 }
 
-bool EphemeralForTestRecoveryUnit::waitUntilDurable() {
+bool EphemeralForTestRecoveryUnit::waitUntilDurable(OperationContext* opCtx) {
     if (_waitUntilDurableCallback) {
         _waitUntilDurableCallback();
     }
diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h
index d117a993090..ab368f45bf5 100644
--- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h
+++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h
@@ -50,7 +50,7 @@ public:
     void commitUnitOfWork() final;
     void abortUnitOfWork() final;
 
-    virtual bool waitUntilDurable();
+    virtual bool waitUntilDurable(OperationContext* opCtx);
 
     bool inActiveTxn() const;
diff --git a/src/mongo/db/storage/kv/kv_engine.h b/src/mongo/db/storage/kv/kv_engine.h
index 2c39ddbde93..2d8118c6f68 100644
--- a/src/mongo/db/storage/kv/kv_engine.h
+++ b/src/mongo/db/storage/kv/kv_engine.h
@@ -52,9 +52,19 @@ class SnapshotManager;
 
 class KVEngine {
 public:
-    virtual RecoveryUnit* newRecoveryUnit() = 0;
+    /**
+     * This function should only be called after the StorageEngine is set on the ServiceContext.
+     *
+     * Starts asynchronous threads for a storage engine's integration layer. Any such thread
+     * generating an OperationContext should be initialized here.
+     *
+     * In order for OperationContexts to be generated with real Locker objects, the generation must
+     * occur after the StorageEngine is instantiated and set on the ServiceContext. Otherwise,
+     * OperationContexts are created with LockerNoops.
+     */
+    virtual void startAsyncThreads() {}
 
-    // ---------
+    virtual RecoveryUnit* newRecoveryUnit() = 0;
 
     /**
      * Requesting multiple copies for the same ns/ident is a rules violation; Calling on a
@@ -230,6 +240,15 @@ public:
     }
 
     /**
+     * See StorageEngine::getCheckpointLock for details.
+     */
+    virtual std::unique_ptr<StorageEngine::CheckpointLock> getCheckpointLock(
+        OperationContext* opCtx) {
+        uasserted(ErrorCodes::CommandNotSupported,
+                  "The current storage engine does not support checkpoints");
+    }
+
+    /**
      * Returns whether the KVEngine supports checkpoints.
      */
     virtual bool supportsCheckpoints() const {
diff --git a/src/mongo/db/storage/mobile/mobile_recovery_unit.cpp b/src/mongo/db/storage/mobile/mobile_recovery_unit.cpp
index cbb59b061a9..05141f78c11 100644
--- a/src/mongo/db/storage/mobile/mobile_recovery_unit.cpp
+++ b/src/mongo/db/storage/mobile/mobile_recovery_unit.cpp
@@ -115,7 +115,7 @@ void MobileRecoveryUnit::abortUnitOfWork() {
     _abort();
 }
 
-bool MobileRecoveryUnit::waitUntilDurable() {
+bool MobileRecoveryUnit::waitUntilDurable(OperationContext* opCtx) {
     // This is going to be slow as we're taking a global X lock and doing a full checkpoint. This
     // should not be needed to do on Android or iOS if we are on WAL and synchronous=NORMAL which
     // are our default settings. The system will make sure any non-flushed writes will not be lost
@@ -123,7 +123,6 @@ bool MobileRecoveryUnit::waitUntilDurable() {
     // not call this (by disabling writeConcern j:true) but allow it when this is used inside
    // mongod.
     if (_sessionPool->getOptions().durabilityLevel < 2) {
-        OperationContext* opCtx = Client::getCurrent()->getOperationContext();
         _ensureSession(opCtx);
         RECOVERY_UNIT_TRACE() << "waitUntilDurable called, attempting to perform a checkpoint";
         int framesInWAL = 0;
diff --git a/src/mongo/db/storage/mobile/mobile_recovery_unit.h b/src/mongo/db/storage/mobile/mobile_recovery_unit.h
index bf8917dda3a..163d2354837 100644
--- a/src/mongo/db/storage/mobile/mobile_recovery_unit.h
+++ b/src/mongo/db/storage/mobile/mobile_recovery_unit.h
@@ -54,7 +54,7 @@ public:
     void beginUnitOfWork(OperationContext* opCtx) override;
     void commitUnitOfWork() override;
     void abortUnitOfWork() override;
-    bool waitUntilDurable() override;
+    bool waitUntilDurable(OperationContext* opCtx) override;
 
     void abandonSnapshot() override;
diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h
index a64ad387f12..b04caa40921 100644
--- a/src/mongo/db/storage/recovery_unit.h
+++ b/src/mongo/db/storage/recovery_unit.h
@@ -189,7 +189,7 @@ public:
      * isDurable() returned true. This cannot be called from inside a unit of work, and should
      * fail if it is.
      */
-    virtual bool waitUntilDurable() = 0;
+    virtual bool waitUntilDurable(OperationContext* opCtx) = 0;
 
     /**
      * Unlike `waitUntilDurable`, this method takes a stable checkpoint, making durable any writes
@@ -200,8 +200,9 @@ public:
      * This must not be called by a system taking user writes until after a stable timestamp is
      * passed to the storage engine.
      */
-    virtual bool waitUntilUnjournaledWritesDurable(bool stableCheckpoint = true) {
-        return waitUntilDurable();
+    virtual bool waitUntilUnjournaledWritesDurable(OperationContext* opCtx,
+                                                   bool stableCheckpoint = true) {
+        return waitUntilDurable(opCtx);
     }
 
     /**
diff --git a/src/mongo/db/storage/recovery_unit_noop.h b/src/mongo/db/storage/recovery_unit_noop.h
index eddc0b2bdc1..740cd6f8d1c 100644
--- a/src/mongo/db/storage/recovery_unit_noop.h
+++ b/src/mongo/db/storage/recovery_unit_noop.h
@@ -64,7 +64,7 @@ public:
 
     virtual void abandonSnapshot() {}
 
-    virtual bool waitUntilDurable() {
+    virtual bool waitUntilDurable(OperationContext* opCtx) {
         return true;
     }
diff --git a/src/mongo/db/storage/recovery_unit_test_harness.cpp b/src/mongo/db/storage/recovery_unit_test_harness.cpp
index 531eadd55b1..4a6e6df8981 100644
--- a/src/mongo/db/storage/recovery_unit_test_harness.cpp
+++ b/src/mongo/db/storage/recovery_unit_test_harness.cpp
@@ -151,7 +151,7 @@ DEATH_TEST_F(RecoveryUnitTestHarness, PrepareMustBeInUnitOfWork, "invariant") {
 
 DEATH_TEST_F(RecoveryUnitTestHarness, WaitUntilDurableMustBeOutOfUnitOfWork, "invariant") {
     opCtx->recoveryUnit()->beginUnitOfWork(opCtx.get());
-    opCtx->recoveryUnit()->waitUntilDurable();
+    opCtx->recoveryUnit()->waitUntilDurable(opCtx.get());
 }
 
 DEATH_TEST_F(RecoveryUnitTestHarness, AbandonSnapshotMustBeOutOfUnitOfWork, "invariant") {
diff --git a/src/mongo/db/storage/storage_engine.h b/src/mongo/db/storage/storage_engine.h
index cb99d4fc8f5..6f08d0765cd 100644
--- a/src/mongo/db/storage/storage_engine.h
+++ b/src/mongo/db/storage/storage_engine.h
@@ -151,6 +151,36 @@ public:
     };
 
     /**
+     * RAII-style class required for checkpoint activity. Instances should be obtained via
+     * getCheckpointLock() calls.
+     *
+     * Operations taking a checkpoint should create a CheckpointLock first. Also used when opening
+     * several checkpoint cursors to guarantee that all cursors are against the same checkpoint.
+     *
+     * This interface is placed in the StorageEngine in order to be accessible externally and
+     * internally to the storage layer. Each storage engine chooses how to implement it.
+     */
+    class CheckpointLock {
+        CheckpointLock(const CheckpointLock&) = delete;
+        CheckpointLock& operator=(const CheckpointLock&) = delete;
+
+        // We should never call the move constructor of the base class. We should not create base
+        // CheckpointLock instances, so any CheckpointLock type will actually be an instance of a
+        // derived class. At that point, moving the CheckpointLock type would call this constructor
+        // and skip the derived class' move constructor, likely leading to subtle bugs.
+        //
+        // Always using CheckpointLock pointer types will obviate needing a move constructor in
+        // either base or derived classes.
+        CheckpointLock(CheckpointLock&&) = delete;
+
+    public:
+        virtual ~CheckpointLock() = default;
+
+    protected:
+        CheckpointLock() = default;
+    };
+
+    /**
      * The destructor should only be called if we are tearing down but not exiting the process.
      */
     virtual ~StorageEngine() {}
@@ -558,6 +588,14 @@ public:
     virtual const KVEngine* getEngine() const = 0;
     virtual DurableCatalog* getCatalog() = 0;
     virtual const DurableCatalog* getCatalog() const = 0;
+
+    /**
+     * Returns a CheckpointLock RAII instance that holds the checkpoint resource mutex.
+     *
+     * All operations taking a checkpoint should use this CheckpointLock. Also applicable for
+     * opening several checkpoint cursors to ensure the same checkpoint is targeted.
+     */
+    virtual std::unique_ptr<CheckpointLock> getCheckpointLock(OperationContext* opCtx) = 0;
 };
 
 }  // namespace mongo
diff --git a/src/mongo/db/storage/storage_engine_impl.cpp b/src/mongo/db/storage/storage_engine_impl.cpp
index f96445b0f66..5df8a4e4df7 100644
--- a/src/mongo/db/storage/storage_engine_impl.cpp
+++ b/src/mongo/db/storage/storage_engine_impl.cpp
@@ -537,6 +537,11 @@ void StorageEngineImpl::cleanShutdown() {
 StorageEngineImpl::~StorageEngineImpl() {}
 
 void StorageEngineImpl::finishInit() {
+    // A storage engine may need to start threads that require OperationContexts with real Lockers,
+    // as opposed to LockerNoops. Placing the start logic here, after the StorageEngine has been
+    // instantiated, causes makeOperationContext() to create LockerImpls instead of LockerNoops.
+    _engine->startAsyncThreads();
+
     if (_engine->supportsRecoveryTimestamp()) {
         _timestampMonitor = std::make_unique<TimestampMonitor>(
             _engine.get(), getGlobalServiceContext()->getPeriodicRunner());
diff --git a/src/mongo/db/storage/storage_engine_impl.h b/src/mongo/db/storage/storage_engine_impl.h
index 13e704367d2..07f2cf6f42d 100644
--- a/src/mongo/db/storage/storage_engine_impl.h
+++ b/src/mongo/db/storage/storage_engine_impl.h
@@ -63,7 +63,7 @@ struct StorageEngineOptions {
 class StorageEngineImpl final : public StorageEngineInterface, public StorageEngine {
 public:
     /**
-     * @param engine - ownership passes to me
+     * @param engine - ownership passes to me.
      */
     StorageEngineImpl(KVEngine* engine, StorageEngineOptions options = StorageEngineOptions());
 
@@ -163,8 +163,6 @@ public:
 
     void setJournalListener(JournalListener* jl) final;
 
-    // ------ kv ------
-
     /**
     * A TimestampMonitor is used to listen for any changes in the timestamps implemented by the
     * storage engine and to notify any registered listeners upon changes to these timestamps.
@@ -309,6 +307,7 @@ public:
     KVEngine* getEngine() {
         return _engine.get();
     }
+
     const KVEngine* getEngine() const {
         return _engine.get();
     }
@@ -320,10 +319,15 @@ public:
     DurableCatalog* getCatalog() {
         return _catalog.get();
     }
+
     const DurableCatalog* getCatalog() const {
         return _catalog.get();
     }
 
+    std::unique_ptr<CheckpointLock> getCheckpointLock(OperationContext* opCtx) {
+        return _engine->getCheckpointLock(opCtx);
+    }
+
     /**
      * Drop abandoned idents. Returns a parallel list of index name, index spec pairs to rebuild.
      */
diff --git a/src/mongo/db/storage/storage_repair_observer.cpp b/src/mongo/db/storage/storage_repair_observer.cpp
index 3bd7c0de042..23320197d5d 100644
--- a/src/mongo/db/storage/storage_repair_observer.cpp
+++ b/src/mongo/db/storage/storage_repair_observer.cpp
@@ -148,7 +148,7 @@ void StorageRepairObserver::_invalidateReplConfigIfNeeded(OperationContext* opCt
     configBuilder.append(repl::ReplSetConfig::kRepairedFieldName, true);
     Helpers::putSingleton(opCtx, kConfigNss.ns().c_str(), configBuilder.obj());
 
-    opCtx->recoveryUnit()->waitUntilDurable();
+    opCtx->recoveryUnit()->waitUntilDurable(opCtx);
 }
 
 }  // namespace mongo
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index d49cdfe821b..cabbef66485 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -234,10 +234,11 @@ public:
         LOG(1) << "starting " << name() << " thread";
 
         while (!_shuttingDown.load()) {
+            auto opCtx = tc->makeOperationContext();
             try {
                 const bool forceCheckpoint = false;
                 const bool stableCheckpoint = false;
-                _sessionCache->waitUntilDurable(forceCheckpoint, stableCheckpoint);
+                _sessionCache->waitUntilDurable(opCtx.get(), forceCheckpoint, stableCheckpoint);
             } catch (const AssertionException& e) {
                 invariant(e.code() == ErrorCodes::ShutdownInProgress);
             }
@@ -260,6 +261,33 @@ private:
     AtomicWord<bool> _shuttingDown{false};
 };
 
+namespace {
+
+/**
+ * RAII class that holds an exclusive lock on the checkpoint resource mutex.
+ *
+ * Instances are created via getCheckpointLock(), which passes in the checkpoint resource mutex.
+ */
+class CheckpointLockImpl : public StorageEngine::CheckpointLock {
+    CheckpointLockImpl(const CheckpointLockImpl&) = delete;
+    CheckpointLockImpl& operator=(const CheckpointLockImpl&) = delete;
+    CheckpointLockImpl(CheckpointLockImpl&& other) = delete;
+
+public:
+    CheckpointLockImpl() = delete;
+    CheckpointLockImpl(OperationContext* opCtx, Lock::ResourceMutex mutex)
+        : _lk(opCtx->lockState(), mutex) {
+        invariant(_lk.isLocked());
+    }
+
+    ~CheckpointLockImpl() = default;
+
+private:
+    Lock::ExclusiveLock _lk;
+};
+
+}  // namespace
+
 std::string toString(const StorageEngine::OldestActiveTransactionTimestampResult& r) {
     if (r.isOK()) {
         if (r.getValue()) {
@@ -291,6 +319,8 @@ public:
         LOG(1) << "starting " << name() << " thread";
 
         while (!_shuttingDown.load()) {
+            auto opCtx = tc->makeOperationContext();
+
             {
                 stdx::unique_lock<stdx::mutex> lock(_mutex);
                 MONGO_IDLE_THREAD_BLOCK;
@@ -342,6 +372,7 @@ public:
                 if (initialDataTimestamp.asULL() <= 1) {
                     UniqueWiredTigerSession session = _sessionCache->getSession();
                     WT_SESSION* s = session->getSession();
+                    auto checkpointLock = _wiredTigerKVEngine->getCheckpointLock(opCtx.get());
                     invariantWTOK(s->checkpoint(s, "use_timestamp=false"));
                 } else if (stableTimestamp < initialDataTimestamp) {
                     LOG_FOR_RECOVERY(2)
@@ -358,6 +389,7 @@ public:
 
                     UniqueWiredTigerSession session = _sessionCache->getSession();
                     WT_SESSION* s = session->getSession();
+                    auto checkpointLock = _wiredTigerKVEngine->getCheckpointLock(opCtx.get());
                     invariantWTOK(s->checkpoint(s, "use_timestamp=true"));
 
                     if (oplogNeededForRollback.isOK()) {
@@ -671,11 +703,6 @@ WiredTigerKVEngine::WiredTigerKVEngine(const std::string& canonicalName,
         _sessionSweeper = std::make_unique<WiredTigerSessionSweeper>(_sessionCache.get());
         _sessionSweeper->go();
 
-        if (_durable && !_ephemeral) {
-            _journalFlusher = std::make_unique<WiredTigerJournalFlusher>(_sessionCache.get());
-            _journalFlusher->go();
-        }
-
         // Until the Replication layer installs a real callback, prevent truncating the oplog.
         setOldestActiveTransactionTimestampCallback(
             [](Timestamp) { return StatusWith(boost::make_optional(Timestamp::min())); });
@@ -686,9 +713,6 @@ WiredTigerKVEngine::WiredTigerKVEngine(const std::string& canonicalName,
             setOldestTimestamp(_recoveryTimestamp, false);
             setStableTimestamp(_recoveryTimestamp, false);
         }
-
-        _checkpointThread = std::make_unique<WiredTigerCheckpointThread>(this, _sessionCache.get());
-        _checkpointThread->go();
     }
 
     if (_ephemeral && !getTestCommandsEnabled()) {
@@ -713,7 +737,6 @@ WiredTigerKVEngine::WiredTigerKVEngine(const std::string& canonicalName,
     Locker::setGlobalThrottling(&openReadTransaction, &openWriteTransaction);
 }
 
-
 WiredTigerKVEngine::~WiredTigerKVEngine() {
     if (_conn) {
         cleanShutdown();
@@ -722,6 +745,20 @@ WiredTigerKVEngine::~WiredTigerKVEngine() {
     _sessionCache.reset(nullptr);
 }
 
+void WiredTigerKVEngine::startAsyncThreads() {
+    if (!_ephemeral) {
+        if (_durable) {
+            _journalFlusher = std::make_unique<WiredTigerJournalFlusher>(_sessionCache.get());
+            _journalFlusher->go();
+        }
+        if (!_readOnly) {
+            _checkpointThread =
+                std::make_unique<WiredTigerCheckpointThread>(this, _sessionCache.get());
+            _checkpointThread->go();
+        }
+    }
+}
+
 void WiredTigerKVEngine::appendGlobalStats(BSONObjBuilder& b) {
     BSONObjBuilder bb(b.subobjStart("concurrentTransactions"));
     {
@@ -990,7 +1027,7 @@ int WiredTigerKVEngine::flushAllFiles(OperationContext* opCtx, bool sync) {
     const bool forceCheckpoint = true;
     // If there's no journal, we must take a full checkpoint.
     const bool stableCheckpoint = _durable;
-    _sessionCache->waitUntilDurable(forceCheckpoint, stableCheckpoint);
+    _sessionCache->waitUntilDurable(opCtx, forceCheckpoint, stableCheckpoint);
 
     return 1;
 }
@@ -1899,6 +1936,11 @@ Timestamp WiredTigerKVEngine::getPinnedOplog() const {
     return Timestamp::min();
 }
 
+std::unique_ptr<StorageEngine::CheckpointLock> WiredTigerKVEngine::getCheckpointLock(
+    OperationContext* opCtx) {
+    return std::make_unique<CheckpointLockImpl>(opCtx, _checkpointMutex);
+}
+
 bool WiredTigerKVEngine::supportsReadConcernSnapshot() const {
     return true;
 }
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
index bb8c1e4c76c..7b98f9fd388 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
@@ -39,6 +39,7 @@
 
 #include "mongo/bson/ordering.h"
 #include "mongo/bson/timestamp.h"
+#include "mongo/db/concurrency/d_concurrency.h"
 #include "mongo/db/storage/kv/kv_engine.h"
 #include "mongo/db/storage/storage_engine.h"
 #include "mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h"
@@ -80,6 +81,8 @@ public:
 
     ~WiredTigerKVEngine();
 
+    void startAsyncThreads() override;
+
     void setRecordStoreExtraOptions(const std::string& options);
     void setSortedDataInterfaceExtraOptions(const std::string& options);
 
@@ -345,6 +348,12 @@ public:
         return _clockSource;
     }
 
+    /**
+     * Returns a CheckpointLockImpl RAII instance holding the _checkpointMutex.
+     */
+    std::unique_ptr<StorageEngine::CheckpointLock> getCheckpointLock(
+        OperationContext* opCtx) override;
+
 private:
     class WiredTigerSessionSweeper;
     class WiredTigerJournalFlusher;
@@ -461,5 +470,9 @@ private:
     // Timestamp of data at startup. Used internally to advise checkpointing and recovery to a
     // timestamp. Provided by replication layer because WT does not persist timestamps.
     AtomicWord<std::uint64_t> _initialDataTimestamp;
+
+    // Required for taking a checkpoint; and can be used to ensure multiple checkpoint cursors
+    // target the same checkpoint.
+    Lock::ResourceMutex _checkpointMutex = Lock::ResourceMutex("checkpointCursorMutex");
 };
 
 }  // namespace mongo
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp
index 8a1ca32e483..334cda5b748 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp
@@ -80,16 +80,19 @@ public:
 
 private:
     WiredTigerKVEngine* makeEngine() {
-        return new WiredTigerKVEngine(kWiredTigerEngineName,
-                                      _dbpath.path(),
-                                      _cs.get(),
-                                      "",
-                                      1,
-                                      0,
-                                      false,
-                                      false,
-                                      _forRepair,
-                                      false);
+        auto engine = new WiredTigerKVEngine(kWiredTigerEngineName,
+                                             _dbpath.path(),
+                                             _cs.get(),
+                                             "",
+                                             1,
+                                             0,
+                                             false,
+                                             false,
+                                             _forRepair,
+                                             false);
+        // There are unit tests expecting checkpoints to occur asynchronously.
+        engine->startAsyncThreads();
+        return engine;
     }
 
     const std::unique_ptr<ClockSource> _cs = std::make_unique<ClockSourceMock>();
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp
index e3ec7e9097d..2edf06fb8a2 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp
@@ -33,6 +33,7 @@
 
 #include <cstring>
 
+#include "mongo/db/concurrency/lock_state.h"
 #include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h"
 #include "mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h"
 #include "mongo/db/storage/wiredtiger/wiredtiger_util.h"
@@ -162,6 +163,17 @@ void WiredTigerOplogManager::_oplogJournalThreadLoop(WiredTigerSessionCache* ses
     // forward cursors. The timestamp is used to hide oplog entries that might be committed but
     // have uncommitted entries ahead of them.
     while (true) {
+        auto opCtx = cc().makeOperationContext();
+
+        // This thread is started before we finish creating the StorageEngine and consequently
+        // makeOperationContext() returns an OperationContext with a LockerNoop. Rather than trying
+        // to refactor the code to start this thread after the StorageEngine is fully instantiated,
+        // we will use this temporary hack to give the opCtx a real Locker.
+        //
+        // TODO (SERVER-41392): the Replicate Before Journaling project will be removing the
+        // waitUntilDurable() call requiring an opCtx parameter.
+        opCtx->swapLockState(std::make_unique<LockerImpl>());
+
         stdx::unique_lock<stdx::mutex> lk(_oplogVisibilityStateMutex);
         {
             MONGO_IDLE_THREAD_BLOCK;
@@ -219,7 +231,7 @@ void WiredTigerOplogManager::_oplogJournalThreadLoop(WiredTigerSessionCache* ses
 
         // In order to avoid oplog holes after an unclean shutdown, we must ensure this proposed
         // oplog read timestamp's documents are durable before publishing that timestamp.
-        sessionCache->waitUntilDurable(/*forceCheckpoint=*/false, false);
+        sessionCache->waitUntilDurable(opCtx.get(), /*forceCheckpoint=*/false, false);
 
         lk.lock();
         // Publish the new timestamp value. Avoid going backward.
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
index 3b785878db8..b0275a2c7ab 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
@@ -251,21 +251,22 @@ void WiredTigerRecoveryUnit::_ensureSession() {
     }
 }
 
-bool WiredTigerRecoveryUnit::waitUntilDurable() {
+bool WiredTigerRecoveryUnit::waitUntilDurable(OperationContext* opCtx) {
     invariant(!_inUnitOfWork(), toString(_getState()));
     const bool forceCheckpoint = false;
     const bool stableCheckpoint = false;
-    _sessionCache->waitUntilDurable(forceCheckpoint, stableCheckpoint);
+    _sessionCache->waitUntilDurable(opCtx, forceCheckpoint, stableCheckpoint);
     return true;
 }
 
-bool WiredTigerRecoveryUnit::waitUntilUnjournaledWritesDurable(bool stableCheckpoint) {
+bool WiredTigerRecoveryUnit::waitUntilUnjournaledWritesDurable(OperationContext* opCtx,
+                                                               bool stableCheckpoint) {
     invariant(!_inUnitOfWork(), toString(_getState()));
     const bool forceCheckpoint = true;
     // Calling `waitUntilDurable` with `forceCheckpoint` set to false only performs a log
     // (journal) flush, and thus has no effect on unjournaled writes. Setting `forceCheckpoint` to
     // true will lock in stable writes to unjournaled tables.
-    _sessionCache->waitUntilDurable(forceCheckpoint, stableCheckpoint);
+    _sessionCache->waitUntilDurable(opCtx, forceCheckpoint, stableCheckpoint);
     return true;
 }
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
index 81483c51cd7..c0dae116c73 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
@@ -107,9 +107,10 @@ public:
     void commitUnitOfWork() override;
     void abortUnitOfWork() override;
 
-    bool waitUntilDurable() override;
+    bool waitUntilDurable(OperationContext* opCtx) override;
 
-    bool waitUntilUnjournaledWritesDurable(bool stableCheckpoint = true) override;
+    bool waitUntilUnjournaledWritesDurable(OperationContext* opCtx,
+                                           bool stableCheckpoint = true) override;
 
     void abandonSnapshot() override;
     void preallocateSnapshot() override;
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp
index fbd3b398e2a..27ce7b946bd 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp
@@ -61,6 +61,13 @@ public:
           false,  // .repair
           false   // .readOnly
           ) {
+        // Deliberately not calling _engine->startAsyncThreads() because it starts an asynchronous
+        // checkpointing thread that can interfere with unit tests manipulating checkpoints
+        // manually.
+        //
+        // Alternatively, we would have to start using wiredTigerGlobalOptions.checkpointDelaySecs
+        // to set a high enough value such that the async thread never runs during testing.
+
         repl::ReplicationCoordinator::set(
             getGlobalServiceContext(),
             std::unique_ptr<repl::ReplicationCoordinator>(new repl::ReplicationCoordinatorMock(
@@ -599,7 +606,7 @@ TEST_F(WiredTigerRecoveryUnitTestFixture, CheckpointCursorsAreNotCached) {
 
     ru->abandonSnapshot();
 
-    // Force a checkpoint
+    // Force a checkpoint.
     ASSERT_EQUALS(1, engine->flushAllFiles(opCtx, true));
 
     // Test 2: Checkpoint cursors are not expected to be cached, they
@@ -690,7 +697,7 @@ TEST_F(WiredTigerRecoveryUnitTestFixture, CheckpointCursorNotChanged) {
     ASSERT_EQUALS(1, rs->numRecords(opCtx));
     ru->commitUnitOfWork();
 
-    // Force a checkpoint
+    // Force a checkpoint.
    ASSERT_EQUALS(1, engine->flushAllFiles(opCtx, true));
 
     // Test 1: Open a checkpoint cursor and ensure it has the first record.
@@ -709,16 +716,13 @@ TEST_F(WiredTigerRecoveryUnitTestFixture, CheckpointCursorNotChanged) {
     ASSERT(!originalCheckpointCursor->seekExact(s2.getValue()));
     ASSERT(originalCheckpointCursor->seekExact(s1.getValue()));
 
-    // Test 3: New record does not appear in new checkpoint cursor since no
-    // checkpoint was created.
-    // TODO: Implement in SERVER-42221
-    //
-    // ru->setTimestampReadSource(WiredTigerRecoveryUnit::ReadSource::kCheckpoint);
-    // auto newCheckpointCursor = rs->getCursor(opCtx, true);
-    // ASSERT(!newCheckpointCursor->seekExact(s2.getValue()));
-    //
+    // Test 3: New record does not appear in new checkpoint cursor since no new checkpoint was
+    // created.
+    ru->setTimestampReadSource(WiredTigerRecoveryUnit::ReadSource::kCheckpoint);
+    auto checkpointCursor = rs->getCursor(opCtx, true);
+    ASSERT(!checkpointCursor->seekExact(s2.getValue()));
 
-    // Force a checkpoint
+    // Force a checkpoint.
     ASSERT_EQUALS(1, engine->flushAllFiles(opCtx, true));
 
     // Test 4: Old and new record should appear in new checkpoint cursor. Only old record
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
index 325c6728ac9..382c753daf2 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
@@ -227,7 +227,9 @@ void WiredTigerSessionCache::shuttingDown() {
     closeAll();
 }
 
-void WiredTigerSessionCache::waitUntilDurable(bool forceCheckpoint, bool stableCheckpoint) {
+void WiredTigerSessionCache::waitUntilDurable(OperationContext* opCtx,
+                                              bool forceCheckpoint,
+                                              bool stableCheckpoint) {
     // For inMemory storage engines, the data is "as durable as it's going to get".
     // That is, a restart is equivalent to a complete node failure.
     if (isEphemeral()) {
@@ -259,6 +261,7 @@ void WiredTigerSessionCache::waitUntilDurable(bool forceCheckpoint, bool stableC
         stdx::unique_lock<stdx::mutex> lk(_journalListenerMutex);
         JournalListener::Token token = _journalListener->getToken();
         auto config = stableCheckpoint ? "use_timestamp=true" : "use_timestamp=false";
+        auto checkpointLock = _engine->getCheckpointLock(opCtx);
         invariantWTOK(s->checkpoint(s, config));
         _journalListener->onDurable(token);
     }
@@ -295,6 +298,7 @@ void WiredTigerSessionCache::waitUntilDurable(bool forceCheckpoint, bool stableC
         invariantWTOK(_waitUntilDurableSession->log_flush(_waitUntilDurableSession, "sync=on"));
         LOG(4) << "flushed journal";
     } else {
+        auto checkpointLock = _engine->getCheckpointLock(opCtx);
         invariantWTOK(_waitUntilDurableSession->checkpoint(_waitUntilDurableSession, nullptr));
         LOG(4) << "created checkpoint";
     }
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h
index d09220380a3..72b55e311ed 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h
@@ -258,12 +258,13 @@ public:
     void shuttingDown();
 
     bool isEphemeral();
+
     /**
      * Waits until all commits that happened before this call are durable, either by flushing
      * the log or forcing a checkpoint if forceCheckpoint is true or the journal is disabled.
      * Uses a temporary session. Safe to call without any locks, even during shutdown.
      */
-    void waitUntilDurable(bool forceCheckpoint, bool stableCheckpoint);
+    void waitUntilDurable(OperationContext* opCtx, bool forceCheckpoint, bool stableCheckpoint);
 
     /**
      * Waits until a prepared unit of work has ended (either been committed or aborted). This
diff --git a/src/mongo/db/write_concern.cpp b/src/mongo/db/write_concern.cpp
index ca87ea7d50c..b4e2b756f9f 100644
--- a/src/mongo/db/write_concern.cpp
+++ b/src/mongo/db/write_concern.cpp
@@ -191,7 +191,7 @@ Status waitForWriteConcern(OperationContext* opCtx,
                 result->fsyncFiles = storageEngine->flushAllFiles(opCtx, true);
             } else {
                 // We only need to commit the journal if we're durable
-                opCtx->recoveryUnit()->waitUntilDurable();
+                opCtx->recoveryUnit()->waitUntilDurable(opCtx);
             }
             break;
         }
@@ -200,10 +200,10 @@ Status waitForWriteConcern(OperationContext* opCtx,
                 // Wait for ops to become durable then update replication system's
                 // knowledge of this.
                 auto appliedOpTimeAndWallTime = replCoord->getMyLastAppliedOpTimeAndWallTime();
-                opCtx->recoveryUnit()->waitUntilDurable();
+                opCtx->recoveryUnit()->waitUntilDurable(opCtx);
                 replCoord->setMyLastDurableOpTimeAndWallTimeForward(appliedOpTimeAndWallTime);
             } else {
-                opCtx->recoveryUnit()->waitUntilDurable();
+                opCtx->recoveryUnit()->waitUntilDurable(opCtx);
             }
             break;
         }
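For reference, the RAII design in the patch above (a non-copyable, non-movable abstract CheckpointLock handed out only through std::unique_ptr, with the engine-specific subclass owning the real guard) reduces to a small standalone sketch. This is illustrative only: the names mirror the commit, but the std::mutex stands in for the Lock::ResourceMutex that the real CheckpointLockImpl acquires through the OperationContext's Locker.

#include <memory>
#include <mutex>

// Abstract handle: copying and moving are deleted so a derived guard can never
// be sliced down to the base by an accidental move. Callers hold it only
// through std::unique_ptr, mirroring StorageEngine::CheckpointLock.
class CheckpointLock {
    CheckpointLock(const CheckpointLock&) = delete;
    CheckpointLock& operator=(const CheckpointLock&) = delete;
    CheckpointLock(CheckpointLock&&) = delete;

public:
    virtual ~CheckpointLock() = default;

protected:
    CheckpointLock() = default;
};

// Concrete implementation owning the actual guard, analogous to the
// WiredTiger-internal CheckpointLockImpl.
class CheckpointLockImpl : public CheckpointLock {
public:
    explicit CheckpointLockImpl(std::mutex& mutex) : _lk(mutex) {}

private:
    std::lock_guard<std::mutex> _lk;
};

std::mutex checkpointMutex;  // stand-in for WiredTigerKVEngine::_checkpointMutex

// Analog of StorageEngine::getCheckpointLock(): the mutex is held for exactly
// the lifetime of the returned handle.
std::unique_ptr<CheckpointLock> getCheckpointLock() {
    return std::make_unique<CheckpointLockImpl>(checkpointMutex);
}

Returning the lock by unique_ptr rather than by value is what lets the base class delete its move constructor: since no CheckpointLock object is ever moved or copied, each storage engine can put arbitrary non-movable members (lock guards, sessions) in its derived type without risking the subtle slicing bugs the commit's comment warns about.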