diff options
Diffstat (limited to 'src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp')
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp | 56 |
1 files changed, 52 insertions, 4 deletions
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp index c4cf26c0739..4c9ac8aec9a 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp @@ -250,10 +250,17 @@ public: while (true) { auto opCtx = tc->makeOperationContext(); try { - _sessionCache->waitUntilDurable( - opCtx.get(), /*forceCheckpoint*/ false, /*stableCheckpoint*/ false); + _sessionCache->waitUntilDurable(opCtx.get(), + /*forceCheckpoint*/ false, + /*stableCheckpoint*/ false); + + // Signal the waiters that a round completed. + _currentSharedPromise->emplaceValue(); } catch (const AssertionException& e) { invariant(e.code() == ErrorCodes::ShutdownInProgress); + + // Signal the waiters that the fsync was interrupted. + _currentSharedPromise->setError(e.toStatus()); } // Wait until either journalCommitIntervalMs passes or an immediate journal flush is @@ -261,6 +268,7 @@ public: auto deadline = Date_t::now() + Milliseconds(storageGlobalParams.journalCommitIntervalMs.load()); + stdx::unique_lock<Latch> lk(_stateMutex); MONGO_IDLE_THREAD_BLOCK; @@ -272,8 +280,14 @@ public: if (_shuttingDown) { LOG(1) << "stopping " << name() << " thread"; + _nextSharedPromise->setError( + Status(ErrorCodes::ShutdownInProgress, "The storage catalog is being closed.")); return; } + + // Take the next promise as current and reset the next promise. + _currentSharedPromise = + std::exchange(_nextSharedPromise, std::make_unique<SharedPromise<void>>()); } } @@ -290,7 +304,7 @@ public: } /** - * Signals an immediate journal flush. + * Signals an immediate journal flush and leaves. */ void triggerJournalFlush() { stdx::lock_guard<Latch> lk(_stateMutex); @@ -300,6 +314,24 @@ public: } } + /** + * Signals an immediate journal flush and waits for it to complete before returning. + * + * Will throw ShutdownInProgress if the flusher thread is being stopped. + */ + void waitForJournalFlush() { + auto myFuture = [&]() { + stdx::unique_lock<Latch> lk(_stateMutex); + if (!_flushJournalNow) { + _flushJournalNow = true; + _flushJournalNowCV.notify_one(); + } + return _nextSharedPromise->getFuture(); + }(); + // Throws on error if the catalog is closed. + myFuture.get(); + } + private: WiredTigerSessionCache* _sessionCache; @@ -307,11 +339,19 @@ private: mutable Mutex _stateMutex = MONGO_MAKE_LATCH("WiredTigerJournalFlusherStateMutex"); // Signaled to wake up the thread, if the thread is waiting. The thread will check whether - // _flushJournalNow or _shuttingDown is set. + // _flushJournalNow or _shuttingDown is set and flush or stop accordingly. mutable stdx::condition_variable _flushJournalNowCV; bool _flushJournalNow = false; bool _shuttingDown = false; + + // New callers get a future from nextSharedPromise. The JournalFlusher thread will swap that to + // currentSharedPromise at the start of every round of flushing, and reset nextSharedPromise + // with a new shared promise. + std::unique_ptr<SharedPromise<void>> _currentSharedPromise = + std::make_unique<SharedPromise<void>>(); + std::unique_ptr<SharedPromise<void>> _nextSharedPromise = + std::make_unique<SharedPromise<void>>(); }; namespace { @@ -2171,6 +2211,14 @@ void WiredTigerKVEngine::triggerJournalFlush() const { } } +void WiredTigerKVEngine::waitForJournalFlush(OperationContext* opCtx) const { + if (_journalFlusher) { + _journalFlusher->waitForJournalFlush(); + } else { + opCtx->recoveryUnit()->waitUntilDurable(opCtx); + } +} + bool WiredTigerKVEngine::isCacheUnderPressure(OperationContext* opCtx) const { WiredTigerSession* session = WiredTigerRecoveryUnit::get(opCtx)->getSessionNoTxn(); invariant(session); |