From fb91e1d6e719de42905d63d26f065a4ac835be1e Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Fri, 10 Jan 2020 08:57:29 -0500 Subject: SERVER-45665 Make JournalFlusher flush on command and waitForWriteConcern asynchronously call waitUntilDurable through the JournalFlusher. Making waitForWriteConcern asynchronously call waitUntilDurable is a performance gain. --- src/mongo/db/catalog/catalog_control_test.cpp | 1 + src/mongo/db/s/wait_for_majority_service_test.cpp | 2 +- src/mongo/db/storage/kv/kv_engine.h | 9 +++- src/mongo/db/storage/storage_engine.h | 7 +++ src/mongo/db/storage/storage_engine_impl.cpp | 4 ++ src/mongo/db/storage/storage_engine_impl.h | 2 + .../db/storage/wiredtiger/wiredtiger_kv_engine.cpp | 56 ++++++++++++++++++++-- .../db/storage/wiredtiger/wiredtiger_kv_engine.h | 2 + src/mongo/db/write_concern.cpp | 12 +---- 9 files changed, 79 insertions(+), 16 deletions(-) diff --git a/src/mongo/db/catalog/catalog_control_test.cpp b/src/mongo/db/catalog/catalog_control_test.cpp index 6b15acb840f..3ae3fc226ce 100644 --- a/src/mongo/db/catalog/catalog_control_test.cpp +++ b/src/mongo/db/catalog/catalog_control_test.cpp @@ -155,6 +155,7 @@ public: } void setCachePressureForTest(int pressure) final {} void triggerJournalFlush() const final {} + void waitForJournalFlush(OperationContext* opCtx) const final {} StatusWith<ReconcileResult> reconcileCatalogAndIdents( OperationContext* opCtx) final { return ReconcileResult{}; diff --git a/src/mongo/db/s/wait_for_majority_service_test.cpp b/src/mongo/db/s/wait_for_majority_service_test.cpp index 1d7c975d385..c9e2a8af3ef 100644 --- a/src/mongo/db/s/wait_for_majority_service_test.cpp +++ b/src/mongo/db/s/wait_for_majority_service_test.cpp @@ -38,7 +38,7 @@ namespace mongo { namespace { -class WaitForMajorityServiceTest : public ServiceContextTest { +class WaitForMajorityServiceTest : public ServiceContextMongoDTest { public: void setUp() override { auto service = getServiceContext(); diff --git a/src/mongo/db/storage/kv/kv_engine.h
b/src/mongo/db/storage/kv/kv_engine.h index 7c3316cb6a5..b806827c96a 100644 --- a/src/mongo/db/storage/kv/kv_engine.h +++ b/src/mongo/db/storage/kv/kv_engine.h @@ -462,7 +462,14 @@ public: /** * See `StorageEngine::triggerJournalFlush()` */ - virtual void triggerJournalFlush() const {}; + virtual void triggerJournalFlush() const {} + + /** + * See `StorageEngine::waitForJournalFlush()` + */ + virtual void waitForJournalFlush(OperationContext* opCtx) const { + opCtx->recoveryUnit()->waitUntilDurable(opCtx); + } /** * Methods to access the storage engine's timestamps. diff --git a/src/mongo/db/storage/storage_engine.h b/src/mongo/db/storage/storage_engine.h index 45e2c444228..01b23882a6d 100644 --- a/src/mongo/db/storage/storage_engine.h +++ b/src/mongo/db/storage/storage_engine.h @@ -542,6 +542,13 @@ public: */ virtual void triggerJournalFlush() const = 0; + /** + * Initiates if needed and waits for a complete round of journal flushing to execute. + * + * Can throw ShutdownInProgress if the storage engine is being closed. 
+ */ + virtual void waitForJournalFlush(OperationContext* opCtx) const = 0; + struct IndexIdentifier { const RecordId catalogId; const NamespaceString nss; diff --git a/src/mongo/db/storage/storage_engine_impl.cpp b/src/mongo/db/storage/storage_engine_impl.cpp index 71cdd7fd74b..93a59a29ab0 100644 --- a/src/mongo/db/storage/storage_engine_impl.cpp +++ b/src/mongo/db/storage/storage_engine_impl.cpp @@ -852,6 +852,10 @@ void StorageEngineImpl::triggerJournalFlush() const { return _engine->triggerJournalFlush(); } +void StorageEngineImpl::waitForJournalFlush(OperationContext* opCtx) const { + return _engine->waitForJournalFlush(opCtx); +} + Timestamp StorageEngineImpl::getAllDurableTimestamp() const { return _engine->getAllDurableTimestamp(); } diff --git a/src/mongo/db/storage/storage_engine_impl.h b/src/mongo/db/storage/storage_engine_impl.h index f34c5bbac29..192acfe1437 100644 --- a/src/mongo/db/storage/storage_engine_impl.h +++ b/src/mongo/db/storage/storage_engine_impl.h @@ -167,6 +167,8 @@ public: void triggerJournalFlush() const final; + void waitForJournalFlush(OperationContext* opCtx) const final; + SnapshotManager* getSnapshotManager() const final; void setJournalListener(JournalListener* jl) final; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp index c4cf26c0739..4c9ac8aec9a 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp @@ -250,10 +250,17 @@ public: while (true) { auto opCtx = tc->makeOperationContext(); try { - _sessionCache->waitUntilDurable( - opCtx.get(), /*forceCheckpoint*/ false, /*stableCheckpoint*/ false); + _sessionCache->waitUntilDurable(opCtx.get(), + /*forceCheckpoint*/ false, + /*stableCheckpoint*/ false); + + // Signal the waiters that a round completed. 
+ _currentSharedPromise->emplaceValue(); } catch (const AssertionException& e) { invariant(e.code() == ErrorCodes::ShutdownInProgress); + + // Signal the waiters that the fsync was interrupted. + _currentSharedPromise->setError(e.toStatus()); } // Wait until either journalCommitIntervalMs passes or an immediate journal flush is @@ -261,6 +268,7 @@ public: auto deadline = Date_t::now() + Milliseconds(storageGlobalParams.journalCommitIntervalMs.load()); + stdx::unique_lock lk(_stateMutex); MONGO_IDLE_THREAD_BLOCK; @@ -272,8 +280,14 @@ public: if (_shuttingDown) { LOG(1) << "stopping " << name() << " thread"; + _nextSharedPromise->setError( + Status(ErrorCodes::ShutdownInProgress, "The storage catalog is being closed.")); return; } + + // Take the next promise as current and reset the next promise. + _currentSharedPromise = + std::exchange(_nextSharedPromise, std::make_unique<SharedPromise<void>>()); } } @@ -290,7 +304,7 @@ public: } /** - * Signals an immediate journal flush. + * Signals an immediate journal flush and leaves. */ void triggerJournalFlush() { stdx::lock_guard lk(_stateMutex); @@ -300,6 +314,24 @@ public: } } + /** + * Signals an immediate journal flush and waits for it to complete before returning. + * + * Will throw ShutdownInProgress if the flusher thread is being stopped. + */ + void waitForJournalFlush() { + auto myFuture = [&]() { + stdx::unique_lock lk(_stateMutex); + if (!_flushJournalNow) { + _flushJournalNow = true; + _flushJournalNowCV.notify_one(); + } + return _nextSharedPromise->getFuture(); + }(); + // Throws on error if the catalog is closed. + myFuture.get(); + } + private: WiredTigerSessionCache* _sessionCache; @@ -307,11 +339,19 @@ private: mutable Mutex _stateMutex = MONGO_MAKE_LATCH("WiredTigerJournalFlusherStateMutex"); // Signaled to wake up the thread, if the thread is waiting. The thread will check whether - // _flushJournalNow or _shuttingDown is set. + // _flushJournalNow or _shuttingDown is set and flush or stop accordingly.
mutable stdx::condition_variable _flushJournalNowCV; bool _flushJournalNow = false; bool _shuttingDown = false; + + // New callers get a future from nextSharedPromise. The JournalFlusher thread will swap that to + // currentSharedPromise at the start of every round of flushing, and reset nextSharedPromise + // with a new shared promise. + std::unique_ptr<SharedPromise<void>> _currentSharedPromise = + std::make_unique<SharedPromise<void>>(); + std::unique_ptr<SharedPromise<void>> _nextSharedPromise = + std::make_unique<SharedPromise<void>>(); }; namespace { @@ -2171,6 +2211,14 @@ void WiredTigerKVEngine::triggerJournalFlush() const { } } +void WiredTigerKVEngine::waitForJournalFlush(OperationContext* opCtx) const { + if (_journalFlusher) { + _journalFlusher->waitForJournalFlush(); + } else { + opCtx->recoveryUnit()->waitUntilDurable(opCtx); + } +} + bool WiredTigerKVEngine::isCacheUnderPressure(OperationContext* opCtx) const { WiredTigerSession* session = WiredTigerRecoveryUnit::get(opCtx)->getSessionNoTxn(); invariant(session); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h index 40295cb4670..01f7d1c5a8d 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h @@ -255,6 +255,8 @@ public: void triggerJournalFlush() const override; + void waitForJournalFlush(OperationContext* opCtx) const override; + bool isCacheUnderPressure(OperationContext* opCtx) const override; bool supportsReadConcernMajority() const final; diff --git a/src/mongo/db/write_concern.cpp b/src/mongo/db/write_concern.cpp index fa3fa1527a1..018ca7b5d75 100644 --- a/src/mongo/db/write_concern.cpp +++ b/src/mongo/db/write_concern.cpp @@ -242,20 +242,12 @@ Status waitForWriteConcern(OperationContext* opCtx, result->fsyncFiles = storageEngine->flushAllFiles(opCtx, true); } else { // We only need to commit the journal if we're durable - opCtx->recoveryUnit()->waitUntilDurable(opCtx); +
getGlobalServiceContext()->getStorageEngine()->waitForJournalFlush(opCtx); } break; } case WriteConcernOptions::SyncMode::JOURNAL: - if (replCoord->getReplicationMode() != repl::ReplicationCoordinator::Mode::modeNone) { - // Wait for ops to become durable then update replication system's - // knowledge of this. - auto appliedOpTimeAndWallTime = replCoord->getMyLastAppliedOpTimeAndWallTime(); - opCtx->recoveryUnit()->waitUntilDurable(opCtx); - replCoord->setMyLastDurableOpTimeAndWallTimeForward(appliedOpTimeAndWallTime); - } else { - opCtx->recoveryUnit()->waitUntilDurable(opCtx); - } + getGlobalServiceContext()->getStorageEngine()->waitForJournalFlush(opCtx); break; } -- cgit v1.2.1