summaryrefslogtreecommitdiff
path: root/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp')
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp56
1 files changed, 52 insertions, 4 deletions
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index c4cf26c0739..4c9ac8aec9a 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -250,10 +250,17 @@ public:
while (true) {
auto opCtx = tc->makeOperationContext();
try {
- _sessionCache->waitUntilDurable(
- opCtx.get(), /*forceCheckpoint*/ false, /*stableCheckpoint*/ false);
+ _sessionCache->waitUntilDurable(opCtx.get(),
+ /*forceCheckpoint*/ false,
+ /*stableCheckpoint*/ false);
+
+ // Signal the waiters that a round completed.
+ _currentSharedPromise->emplaceValue();
} catch (const AssertionException& e) {
invariant(e.code() == ErrorCodes::ShutdownInProgress);
+
+ // Signal the waiters that the fsync was interrupted.
+ _currentSharedPromise->setError(e.toStatus());
}
// Wait until either journalCommitIntervalMs passes or an immediate journal flush is
@@ -261,6 +268,7 @@ public:
auto deadline =
Date_t::now() + Milliseconds(storageGlobalParams.journalCommitIntervalMs.load());
+
stdx::unique_lock<Latch> lk(_stateMutex);
MONGO_IDLE_THREAD_BLOCK;
@@ -272,8 +280,14 @@ public:
if (_shuttingDown) {
LOG(1) << "stopping " << name() << " thread";
+ _nextSharedPromise->setError(
+ Status(ErrorCodes::ShutdownInProgress, "The storage catalog is being closed."));
return;
}
+
+ // Take the next promise as current and reset the next promise.
+ _currentSharedPromise =
+ std::exchange(_nextSharedPromise, std::make_unique<SharedPromise<void>>());
}
}
@@ -290,7 +304,7 @@ public:
}
/**
- * Signals an immediate journal flush.
+ * Signals an immediate journal flush and leaves.
*/
void triggerJournalFlush() {
stdx::lock_guard<Latch> lk(_stateMutex);
@@ -300,6 +314,24 @@ public:
}
}
+ /**
+ * Signals an immediate journal flush and waits for it to complete before returning.
+ *
+ * Will throw ShutdownInProgress if the flusher thread is being stopped.
+ */
+ void waitForJournalFlush() {
+ auto myFuture = [&]() {
+ stdx::unique_lock<Latch> lk(_stateMutex);
+ if (!_flushJournalNow) {
+ _flushJournalNow = true;
+ _flushJournalNowCV.notify_one();
+ }
+ return _nextSharedPromise->getFuture();
+ }();
+ // Throws on error if the catalog is closed.
+ myFuture.get();
+ }
+
private:
WiredTigerSessionCache* _sessionCache;
@@ -307,11 +339,19 @@ private:
mutable Mutex _stateMutex = MONGO_MAKE_LATCH("WiredTigerJournalFlusherStateMutex");
// Signaled to wake up the thread, if the thread is waiting. The thread will check whether
- // _flushJournalNow or _shuttingDown is set.
+ // _flushJournalNow or _shuttingDown is set and flush or stop accordingly.
mutable stdx::condition_variable _flushJournalNowCV;
bool _flushJournalNow = false;
bool _shuttingDown = false;
+
+ // New callers get a future from nextSharedPromise. The JournalFlusher thread will swap that to
+ // currentSharedPromise at the start of every round of flushing, and reset nextSharedPromise
+ // with a new shared promise.
+ std::unique_ptr<SharedPromise<void>> _currentSharedPromise =
+ std::make_unique<SharedPromise<void>>();
+ std::unique_ptr<SharedPromise<void>> _nextSharedPromise =
+ std::make_unique<SharedPromise<void>>();
};
namespace {
@@ -2171,6 +2211,14 @@ void WiredTigerKVEngine::triggerJournalFlush() const {
}
}
+void WiredTigerKVEngine::waitForJournalFlush(OperationContext* opCtx) const {
+ if (_journalFlusher) {
+ _journalFlusher->waitForJournalFlush();
+ } else {
+ opCtx->recoveryUnit()->waitUntilDurable(opCtx);
+ }
+}
+
bool WiredTigerKVEngine::isCacheUnderPressure(OperationContext* opCtx) const {
WiredTigerSession* session = WiredTigerRecoveryUnit::get(opCtx)->getSessionNoTxn();
invariant(session);