summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDianna Hohensee <dianna.hohensee@mongodb.com>2020-01-29 16:49:39 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-02-06 14:35:37 +0000
commit9548fb8ea6d452ad8dd8dbfacd8188becb80f549 (patch)
treeeb48f062bf059bc2790b01d1efa44fe99272fe0a
parent58081d16653b6aa0befbf9eeef68efcce890f6cc (diff)
downloadmongo-9548fb8ea6d452ad8dd8dbfacd8188becb80f549.tar.gz
SERVER-45851 Make the JournalFlusher interruptible for repl state change if it is waiting for locks
-rw-r--r--src/mongo/db/catalog/catalog_control_test.cpp1
-rw-r--r--src/mongo/db/storage/kv/kv_engine.h5
-rw-r--r--src/mongo/db/storage/storage_engine.h8
-rw-r--r--src/mongo/db/storage/storage_engine_impl.cpp4
-rw-r--r--src/mongo/db/storage/storage_engine_impl.h2
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp50
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h2
7 files changed, 69 insertions, 3 deletions
diff --git a/src/mongo/db/catalog/catalog_control_test.cpp b/src/mongo/db/catalog/catalog_control_test.cpp
index 3ae3fc226ce..7c6f3bf6328 100644
--- a/src/mongo/db/catalog/catalog_control_test.cpp
+++ b/src/mongo/db/catalog/catalog_control_test.cpp
@@ -156,6 +156,7 @@ public:
void setCachePressureForTest(int pressure) final {}
void triggerJournalFlush() const final {}
void waitForJournalFlush(OperationContext* opCtx) const final {}
+ void interruptJournalFlusherForReplStateChange() const final {}
StatusWith<StorageEngine::ReconcileResult> reconcileCatalogAndIdents(
OperationContext* opCtx) final {
return ReconcileResult{};
diff --git a/src/mongo/db/storage/kv/kv_engine.h b/src/mongo/db/storage/kv/kv_engine.h
index b806827c96a..22b7d908b1d 100644
--- a/src/mongo/db/storage/kv/kv_engine.h
+++ b/src/mongo/db/storage/kv/kv_engine.h
@@ -472,6 +472,11 @@ public:
}
/**
+ * See `StorageEngine::interruptJournalFlusherForReplStateChange()`
+ */
+ virtual void interruptJournalFlusherForReplStateChange() const {}
+
+ /**
* Methods to access the storage engine's timestamps.
*/
virtual Timestamp getCheckpointTimestamp() const {
diff --git a/src/mongo/db/storage/storage_engine.h b/src/mongo/db/storage/storage_engine.h
index 01b23882a6d..ea9143d5df2 100644
--- a/src/mongo/db/storage/storage_engine.h
+++ b/src/mongo/db/storage/storage_engine.h
@@ -549,6 +549,14 @@ public:
*/
virtual void waitForJournalFlush(OperationContext* opCtx) const = 0;
+ /**
+ * Ensures interruption of the JournalFlusher if it is or will be acquiring a lock.
+ *
+ * TODO: this function will be moved above the Storage Engine layer along with the
+ * JournalFlusher in SERVER-45847.
+ */
+ virtual void interruptJournalFlusherForReplStateChange() const = 0;
+
struct IndexIdentifier {
const RecordId catalogId;
const NamespaceString nss;
diff --git a/src/mongo/db/storage/storage_engine_impl.cpp b/src/mongo/db/storage/storage_engine_impl.cpp
index 93a59a29ab0..e1dc25ba35b 100644
--- a/src/mongo/db/storage/storage_engine_impl.cpp
+++ b/src/mongo/db/storage/storage_engine_impl.cpp
@@ -856,6 +856,10 @@ void StorageEngineImpl::waitForJournalFlush(OperationContext* opCtx) const {
return _engine->waitForJournalFlush(opCtx);
}
+void StorageEngineImpl::interruptJournalFlusherForReplStateChange() const {
+ return _engine->interruptJournalFlusherForReplStateChange();
+}
+
Timestamp StorageEngineImpl::getAllDurableTimestamp() const {
return _engine->getAllDurableTimestamp();
}
diff --git a/src/mongo/db/storage/storage_engine_impl.h b/src/mongo/db/storage/storage_engine_impl.h
index 192acfe1437..dfa6c697ee4 100644
--- a/src/mongo/db/storage/storage_engine_impl.h
+++ b/src/mongo/db/storage/storage_engine_impl.h
@@ -169,6 +169,8 @@ public:
void waitForJournalFlush(OperationContext* opCtx) const final;
+ void interruptJournalFlusherForReplStateChange() const final;
+
SnapshotManager* getSnapshotManager() const final;
void setJournalListener(JournalListener* jl) final;
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index 4c9ac8aec9a..ad60e26840c 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -247,17 +247,33 @@ public:
ThreadClient tc(name(), getGlobalServiceContext());
LOG(1) << "starting " << name() << " thread";
+ // Initialize the thread's opCtx.
+ _uniqueCtx.emplace(tc->makeOperationContext());
while (true) {
- auto opCtx = tc->makeOperationContext();
try {
- _sessionCache->waitUntilDurable(opCtx.get(),
+ ON_BLOCK_EXIT([&] {
+ // We do not want to miss an interrupt for the next round. Therefore, the opCtx
+ // will be reset after a flushing round finishes.
+ //
+ // It is fine if the opCtx is signaled between finishing and resetting because
+ // state changes will be seen before the next round. We want to catch any
+ // interrupt signals that occur after state is checked at the start of a round:
+ // the time during or before the next flush.
+ stdx::lock_guard<Latch> lk(_opCtxMutex);
+ _uniqueCtx.reset();
+ _uniqueCtx.emplace(tc->makeOperationContext());
+ });
+
+ _sessionCache->waitUntilDurable(_uniqueCtx->get(),
/*forceCheckpoint*/ false,
/*stableCheckpoint*/ false);
// Signal the waiters that a round completed.
_currentSharedPromise->emplaceValue();
} catch (const AssertionException& e) {
- invariant(e.code() == ErrorCodes::ShutdownInProgress);
+ invariant(ErrorCodes::isShutdownError(e.code()) ||
+ e.code() == ErrorCodes::InterruptedDueToReplStateChange,
+ e.toString());
// Signal the waiters that the fsync was interrupted.
_currentSharedPromise->setError(e.toStatus());
@@ -282,6 +298,8 @@ public:
LOG(1) << "stopping " << name() << " thread";
_nextSharedPromise->setError(
Status(ErrorCodes::ShutdownInProgress, "The storage catalog is being closed."));
+ stdx::lock_guard<Latch> lk(_opCtxMutex);
+ _uniqueCtx.reset();
return;
}
@@ -332,9 +350,28 @@ public:
myFuture.get();
}
+ /**
+ * Interrupts the journal flusher thread via its operation context with an
+ * InterruptedDueToReplStateChange error.
+ */
+ void interruptJournalFlusherForReplStateChange() {
+ stdx::lock_guard<Latch> lk(_opCtxMutex);
+ if (_uniqueCtx) {
+ stdx::lock_guard<Client> lk(*_uniqueCtx->get()->getClient());
+ _uniqueCtx->get()->markKilled(ErrorCodes::InterruptedDueToReplStateChange);
+ }
+ }
+
private:
WiredTigerSessionCache* _sessionCache;
+ // Serializes setting/resetting _uniqueCtx and marking _uniqueCtx killed.
+ mutable Mutex _opCtxMutex = MONGO_MAKE_LATCH("WiredTigerJournalFlusherOpCtxMutex");
+
+ // Saves a reference to the flusher thread's operation context so it can be interrupted if the
+ // flusher is active.
+ boost::optional<ServiceContext::UniqueOperationContext> _uniqueCtx;
+
// Protects the state below.
mutable Mutex _stateMutex = MONGO_MAKE_LATCH("WiredTigerJournalFlusherStateMutex");
@@ -2219,6 +2256,13 @@ void WiredTigerKVEngine::waitForJournalFlush(OperationContext* opCtx) const {
}
}
+
+void WiredTigerKVEngine::interruptJournalFlusherForReplStateChange() const {
+ if (_journalFlusher) {
+ _journalFlusher->interruptJournalFlusherForReplStateChange();
+ }
+}
+
bool WiredTigerKVEngine::isCacheUnderPressure(OperationContext* opCtx) const {
WiredTigerSession* session = WiredTigerRecoveryUnit::get(opCtx)->getSessionNoTxn();
invariant(session);
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
index 01f7d1c5a8d..e0f33b88da0 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
@@ -257,6 +257,8 @@ public:
void waitForJournalFlush(OperationContext* opCtx) const override;
+ void interruptJournalFlusherForReplStateChange() const override;
+
bool isCacheUnderPressure(OperationContext* opCtx) const override;
bool supportsReadConcernMajority() const final;