diff options
author | Dianna Hohensee <dianna.hohensee@mongodb.com> | 2020-01-29 16:49:39 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-02-06 14:35:37 +0000 |
commit | 9548fb8ea6d452ad8dd8dbfacd8188becb80f549 (patch) | |
tree | eb48f062bf059bc2790b01d1efa44fe99272fe0a | |
parent | 58081d16653b6aa0befbf9eeef68efcce890f6cc (diff) | |
download | mongo-9548fb8ea6d452ad8dd8dbfacd8188becb80f549.tar.gz |
SERVER-45851 Make the JournalFlusher interruptible for repl state change if it is waiting for locks
-rw-r--r-- | src/mongo/db/catalog/catalog_control_test.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/storage/kv/kv_engine.h | 5 | ||||
-rw-r--r-- | src/mongo/db/storage/storage_engine.h | 8 | ||||
-rw-r--r-- | src/mongo/db/storage/storage_engine_impl.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/storage/storage_engine_impl.h | 2 | ||||
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp | 50 | ||||
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h | 2 |
7 files changed, 69 insertions, 3 deletions
diff --git a/src/mongo/db/catalog/catalog_control_test.cpp b/src/mongo/db/catalog/catalog_control_test.cpp index 3ae3fc226ce..7c6f3bf6328 100644 --- a/src/mongo/db/catalog/catalog_control_test.cpp +++ b/src/mongo/db/catalog/catalog_control_test.cpp @@ -156,6 +156,7 @@ public: void setCachePressureForTest(int pressure) final {} void triggerJournalFlush() const final {} void waitForJournalFlush(OperationContext* opCtx) const final {} + void interruptJournalFlusherForReplStateChange() const final {} StatusWith<StorageEngine::ReconcileResult> reconcileCatalogAndIdents( OperationContext* opCtx) final { return ReconcileResult{}; diff --git a/src/mongo/db/storage/kv/kv_engine.h b/src/mongo/db/storage/kv/kv_engine.h index b806827c96a..22b7d908b1d 100644 --- a/src/mongo/db/storage/kv/kv_engine.h +++ b/src/mongo/db/storage/kv/kv_engine.h @@ -472,6 +472,11 @@ public: } /** + * See `StorageEngine::interruptJournalFlusherForReplStateChange()` + */ + virtual void interruptJournalFlusherForReplStateChange() const {} + + /** * Methods to access the storage engine's timestamps. */ virtual Timestamp getCheckpointTimestamp() const { diff --git a/src/mongo/db/storage/storage_engine.h b/src/mongo/db/storage/storage_engine.h index 01b23882a6d..ea9143d5df2 100644 --- a/src/mongo/db/storage/storage_engine.h +++ b/src/mongo/db/storage/storage_engine.h @@ -549,6 +549,14 @@ public: */ virtual void waitForJournalFlush(OperationContext* opCtx) const = 0; + /** + * Ensures interruption of the JournalFlusher if it is or will be acquiring a lock. + * + * TODO: this function will be moved above the Storage Engine layer along with the + * JournalFlusher in SERVER-45847. + */ + virtual void interruptJournalFlusherForReplStateChange() const = 0; + struct IndexIdentifier { const RecordId catalogId; const NamespaceString nss; diff --git a/src/mongo/db/storage/storage_engine_impl.cpp b/src/mongo/db/storage/storage_engine_impl.cpp index 93a59a29ab0..e1dc25ba35b 100644 --- a/src/mongo/db/storage/storage_engine_impl.cpp +++ b/src/mongo/db/storage/storage_engine_impl.cpp @@ -856,6 +856,10 @@ void StorageEngineImpl::waitForJournalFlush(OperationContext* opCtx) const { return _engine->waitForJournalFlush(opCtx); } +void StorageEngineImpl::interruptJournalFlusherForReplStateChange() const { + return _engine->interruptJournalFlusherForReplStateChange(); +} + Timestamp StorageEngineImpl::getAllDurableTimestamp() const { return _engine->getAllDurableTimestamp(); } diff --git a/src/mongo/db/storage/storage_engine_impl.h b/src/mongo/db/storage/storage_engine_impl.h index 192acfe1437..dfa6c697ee4 100644 --- a/src/mongo/db/storage/storage_engine_impl.h +++ b/src/mongo/db/storage/storage_engine_impl.h @@ -169,6 +169,8 @@ public: void waitForJournalFlush(OperationContext* opCtx) const final; + void interruptJournalFlusherForReplStateChange() const final; + SnapshotManager* getSnapshotManager() const final; void setJournalListener(JournalListener* jl) final; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp index 4c9ac8aec9a..ad60e26840c 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp @@ -247,17 +247,33 @@ public: ThreadClient tc(name(), getGlobalServiceContext()); LOG(1) << "starting " << name() << " thread"; + // Initialize the thread's opCtx. + _uniqueCtx.emplace(tc->makeOperationContext()); while (true) { - auto opCtx = tc->makeOperationContext(); try { - _sessionCache->waitUntilDurable(opCtx.get(), + ON_BLOCK_EXIT([&] { + // We do not want to miss an interrupt for the next round. Therefore, the opCtx + // will be reset after a flushing round finishes. + // + // It is fine if the opCtx is signaled between finishing and resetting because + // state changes will be seen before the next round. We want to catch any + // interrupt signals that occur after state is checked at the start of a round: + // the time during or before the next flush. + stdx::lock_guard<Latch> lk(_opCtxMutex); + _uniqueCtx.reset(); + _uniqueCtx.emplace(tc->makeOperationContext()); + }); + + _sessionCache->waitUntilDurable(_uniqueCtx->get(), /*forceCheckpoint*/ false, /*stableCheckpoint*/ false); // Signal the waiters that a round completed. _currentSharedPromise->emplaceValue(); } catch (const AssertionException& e) { - invariant(e.code() == ErrorCodes::ShutdownInProgress); + invariant(ErrorCodes::isShutdownError(e.code()) || + e.code() == ErrorCodes::InterruptedDueToReplStateChange, + e.toString()); // Signal the waiters that the fsync was interrupted. _currentSharedPromise->setError(e.toStatus()); @@ -282,6 +298,8 @@ public: LOG(1) << "stopping " << name() << " thread"; _nextSharedPromise->setError( Status(ErrorCodes::ShutdownInProgress, "The storage catalog is being closed.")); + stdx::lock_guard<Latch> lk(_opCtxMutex); + _uniqueCtx.reset(); return; } @@ -332,9 +350,28 @@ public: myFuture.get(); } + /** + * Interrupts the journal flusher thread via its operation context with an + * InterruptedDueToReplStateChange error. + */ + void interruptJournalFlusherForReplStateChange() { + stdx::lock_guard<Latch> lk(_opCtxMutex); + if (_uniqueCtx) { + stdx::lock_guard<Client> lk(*_uniqueCtx->get()->getClient()); + _uniqueCtx->get()->markKilled(ErrorCodes::InterruptedDueToReplStateChange); + } + } + private: WiredTigerSessionCache* _sessionCache; + // Serializes setting/resetting _uniqueCtx and marking _uniqueCtx killed. + mutable Mutex _opCtxMutex = MONGO_MAKE_LATCH("WiredTigerJournalFlusherOpCtxMutex"); + + // Saves a reference to the flusher thread's operation context so it can be interrupted if the + // flusher is active. + boost::optional<ServiceContext::UniqueOperationContext> _uniqueCtx; + // Protects the state below. mutable Mutex _stateMutex = MONGO_MAKE_LATCH("WiredTigerJournalFlusherStateMutex"); @@ -2219,6 +2256,13 @@ void WiredTigerKVEngine::waitForJournalFlush(OperationContext* opCtx) const { } } + +void WiredTigerKVEngine::interruptJournalFlusherForReplStateChange() const { + if (_journalFlusher) { + _journalFlusher->interruptJournalFlusherForReplStateChange(); + } +} + bool WiredTigerKVEngine::isCacheUnderPressure(OperationContext* opCtx) const { WiredTigerSession* session = WiredTigerRecoveryUnit::get(opCtx)->getSessionNoTxn(); invariant(session); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h index 01f7d1c5a8d..e0f33b88da0 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h @@ -257,6 +257,8 @@ public: void waitForJournalFlush(OperationContext* opCtx) const override; + void interruptJournalFlusherForReplStateChange() const override; + bool isCacheUnderPressure(OperationContext* opCtx) const override; bool supportsReadConcernMajority() const final; |