summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Dianna Hohensee <dianna.hohensee@mongodb.com> 2020-01-10 08:57:29 -0500
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2020-02-05 18:42:37 +0000
commit: fb91e1d6e719de42905d63d26f065a4ac835be1e (patch)
tree: 0e81d0370d36561bba6a34b135b27ff8c2493dfa
parent: d8b5b7e7d954919155f9841ba181b26f56e436db (diff)
download: mongo-fb91e1d6e719de42905d63d26f065a4ac835be1e.tar.gz
SERVER-45665 Make JournalFlusher flush on command and waitForWriteConcern asynchronously call waitUntilDurable through the JournalFlusher.
Making waitForWriteConcern asynchronously call waitUntilDurable is a performance gain.
-rw-r--r-- src/mongo/db/catalog/catalog_control_test.cpp 1
-rw-r--r-- src/mongo/db/s/wait_for_majority_service_test.cpp 2
-rw-r--r-- src/mongo/db/storage/kv/kv_engine.h 9
-rw-r--r-- src/mongo/db/storage/storage_engine.h 7
-rw-r--r-- src/mongo/db/storage/storage_engine_impl.cpp 4
-rw-r--r-- src/mongo/db/storage/storage_engine_impl.h 2
-rw-r--r-- src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp 56
-rw-r--r-- src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h 2
-rw-r--r-- src/mongo/db/write_concern.cpp 12
9 files changed, 79 insertions, 16 deletions
diff --git a/src/mongo/db/catalog/catalog_control_test.cpp b/src/mongo/db/catalog/catalog_control_test.cpp
index 6b15acb840f..3ae3fc226ce 100644
--- a/src/mongo/db/catalog/catalog_control_test.cpp
+++ b/src/mongo/db/catalog/catalog_control_test.cpp
@@ -155,6 +155,7 @@ public:
}
void setCachePressureForTest(int pressure) final {}
void triggerJournalFlush() const final {}
+ void waitForJournalFlush(OperationContext* opCtx) const final {}
StatusWith<StorageEngine::ReconcileResult> reconcileCatalogAndIdents(
OperationContext* opCtx) final {
return ReconcileResult{};
diff --git a/src/mongo/db/s/wait_for_majority_service_test.cpp b/src/mongo/db/s/wait_for_majority_service_test.cpp
index 1d7c975d385..c9e2a8af3ef 100644
--- a/src/mongo/db/s/wait_for_majority_service_test.cpp
+++ b/src/mongo/db/s/wait_for_majority_service_test.cpp
@@ -38,7 +38,7 @@
namespace mongo {
namespace {
-class WaitForMajorityServiceTest : public ServiceContextTest {
+class WaitForMajorityServiceTest : public ServiceContextMongoDTest {
public:
void setUp() override {
auto service = getServiceContext();
diff --git a/src/mongo/db/storage/kv/kv_engine.h b/src/mongo/db/storage/kv/kv_engine.h
index 7c3316cb6a5..b806827c96a 100644
--- a/src/mongo/db/storage/kv/kv_engine.h
+++ b/src/mongo/db/storage/kv/kv_engine.h
@@ -462,7 +462,14 @@ public:
/**
* See `StorageEngine::triggerJournalFlush()`
*/
- virtual void triggerJournalFlush() const {};
+ virtual void triggerJournalFlush() const {}
+
+ /**
+ * See `StorageEngine::waitForJournalFlush()`. This default implementation waits for durability directly on the operation's recovery unit.
+ */
+ virtual void waitForJournalFlush(OperationContext* opCtx) const {
+ opCtx->recoveryUnit()->waitUntilDurable(opCtx);
+ }
/**
* Methods to access the storage engine's timestamps.
diff --git a/src/mongo/db/storage/storage_engine.h b/src/mongo/db/storage/storage_engine.h
index 45e2c444228..01b23882a6d 100644
--- a/src/mongo/db/storage/storage_engine.h
+++ b/src/mongo/db/storage/storage_engine.h
@@ -542,6 +542,13 @@ public:
*/
virtual void triggerJournalFlush() const = 0;
+ /**
+ * Initiates if needed and waits for a complete round of journal flushing to execute.
+ *
+ * Can throw ShutdownInProgress if the storage engine is being closed.
+ */
+ virtual void waitForJournalFlush(OperationContext* opCtx) const = 0;
+
struct IndexIdentifier {
const RecordId catalogId;
const NamespaceString nss;
diff --git a/src/mongo/db/storage/storage_engine_impl.cpp b/src/mongo/db/storage/storage_engine_impl.cpp
index 71cdd7fd74b..93a59a29ab0 100644
--- a/src/mongo/db/storage/storage_engine_impl.cpp
+++ b/src/mongo/db/storage/storage_engine_impl.cpp
@@ -852,6 +852,10 @@ void StorageEngineImpl::triggerJournalFlush() const {
return _engine->triggerJournalFlush();
}
+void StorageEngineImpl::waitForJournalFlush(OperationContext* opCtx) const {
+ return _engine->waitForJournalFlush(opCtx);  // Forwards the request, with the same opCtx, to _engine.
+}
+
Timestamp StorageEngineImpl::getAllDurableTimestamp() const {
return _engine->getAllDurableTimestamp();
}
diff --git a/src/mongo/db/storage/storage_engine_impl.h b/src/mongo/db/storage/storage_engine_impl.h
index f34c5bbac29..192acfe1437 100644
--- a/src/mongo/db/storage/storage_engine_impl.h
+++ b/src/mongo/db/storage/storage_engine_impl.h
@@ -167,6 +167,8 @@ public:
void triggerJournalFlush() const final;
+ void waitForJournalFlush(OperationContext* opCtx) const final;
+
SnapshotManager* getSnapshotManager() const final;
void setJournalListener(JournalListener* jl) final;
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index c4cf26c0739..4c9ac8aec9a 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -250,10 +250,17 @@ public:
while (true) {
auto opCtx = tc->makeOperationContext();
try {
- _sessionCache->waitUntilDurable(
- opCtx.get(), /*forceCheckpoint*/ false, /*stableCheckpoint*/ false);
+ _sessionCache->waitUntilDurable(opCtx.get(),
+ /*forceCheckpoint*/ false,
+ /*stableCheckpoint*/ false);
+
+ // Signal the waiters that a round completed.
+ _currentSharedPromise->emplaceValue();
} catch (const AssertionException& e) {
invariant(e.code() == ErrorCodes::ShutdownInProgress);
+
+ // Signal the waiters that the fsync was interrupted.
+ _currentSharedPromise->setError(e.toStatus());
}
// Wait until either journalCommitIntervalMs passes or an immediate journal flush is
@@ -261,6 +268,7 @@ public:
auto deadline =
Date_t::now() + Milliseconds(storageGlobalParams.journalCommitIntervalMs.load());
+
stdx::unique_lock<Latch> lk(_stateMutex);
MONGO_IDLE_THREAD_BLOCK;
@@ -272,8 +280,14 @@ public:
if (_shuttingDown) {
LOG(1) << "stopping " << name() << " thread";
+ _nextSharedPromise->setError(
+ Status(ErrorCodes::ShutdownInProgress, "The storage catalog is being closed."));
return;
}
+
+ // Take the next promise as current and reset the next promise.
+ _currentSharedPromise =
+ std::exchange(_nextSharedPromise, std::make_unique<SharedPromise<void>>());
}
}
@@ -290,7 +304,7 @@ public:
}
/**
- * Signals an immediate journal flush.
+ * Signals an immediate journal flush and leaves.
*/
void triggerJournalFlush() {
stdx::lock_guard<Latch> lk(_stateMutex);
@@ -300,6 +314,24 @@ public:
}
}
+ /**
+ * Requests an immediate journal flush (unless one is already requested) and blocks until the flusher thread reports the outcome.
+ * The future is obtained from _nextSharedPromise while holding _stateMutex; the wait itself happens after the mutex is released.
+ * Will throw ShutdownInProgress if the flusher thread is being stopped.
+ */
+ void waitForJournalFlush() {
+ auto myFuture = [&]() {
+ stdx::unique_lock<Latch> lk(_stateMutex);
+ if (!_flushJournalNow) {  // Skip the notify when a flush request is already pending.
+ _flushJournalNow = true;
+ _flushJournalNowCV.notify_one();
+ }
+ return _nextSharedPromise->getFuture();
+ }();
+ // Blocks until the promise is fulfilled; rethrows the error stored on it (e.g. ShutdownInProgress) if one was set.
+ myFuture.get();
+ }
+
private:
WiredTigerSessionCache* _sessionCache;
@@ -307,11 +339,19 @@ private:
mutable Mutex _stateMutex = MONGO_MAKE_LATCH("WiredTigerJournalFlusherStateMutex");
// Signaled to wake up the thread, if the thread is waiting. The thread will check whether
- // _flushJournalNow or _shuttingDown is set.
+ // _flushJournalNow or _shuttingDown is set and flush or stop accordingly.
mutable stdx::condition_variable _flushJournalNowCV;
bool _flushJournalNow = false;
bool _shuttingDown = false;
+
+ // New callers get a future from _nextSharedPromise. The JournalFlusher thread will move that
+ // into _currentSharedPromise at the start of every round of flushing, and reset
+ // _nextSharedPromise with a new shared promise.
+ std::unique_ptr<SharedPromise<void>> _currentSharedPromise =
+ std::make_unique<SharedPromise<void>>();
+ std::unique_ptr<SharedPromise<void>> _nextSharedPromise =
+ std::make_unique<SharedPromise<void>>();
};
namespace {
@@ -2171,6 +2211,14 @@ void WiredTigerKVEngine::triggerJournalFlush() const {
}
}
+void WiredTigerKVEngine::waitForJournalFlush(OperationContext* opCtx) const {
+ if (_journalFlusher) {  // A journal flusher exists: ask it to flush and wait for the result.
+ _journalFlusher->waitForJournalFlush();
+ } else {  // No journal flusher: wait for durability directly on this operation's recovery unit.
+ opCtx->recoveryUnit()->waitUntilDurable(opCtx);
+ }
+}
+
bool WiredTigerKVEngine::isCacheUnderPressure(OperationContext* opCtx) const {
WiredTigerSession* session = WiredTigerRecoveryUnit::get(opCtx)->getSessionNoTxn();
invariant(session);
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
index 40295cb4670..01f7d1c5a8d 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
@@ -255,6 +255,8 @@ public:
void triggerJournalFlush() const override;
+ void waitForJournalFlush(OperationContext* opCtx) const override;
+
bool isCacheUnderPressure(OperationContext* opCtx) const override;
bool supportsReadConcernMajority() const final;
diff --git a/src/mongo/db/write_concern.cpp b/src/mongo/db/write_concern.cpp
index fa3fa1527a1..018ca7b5d75 100644
--- a/src/mongo/db/write_concern.cpp
+++ b/src/mongo/db/write_concern.cpp
@@ -242,20 +242,12 @@ Status waitForWriteConcern(OperationContext* opCtx,
result->fsyncFiles = storageEngine->flushAllFiles(opCtx, true);
} else {
// We only need to commit the journal if we're durable
- opCtx->recoveryUnit()->waitUntilDurable(opCtx);
+ getGlobalServiceContext()->getStorageEngine()->waitForJournalFlush(opCtx);
}
break;
}
case WriteConcernOptions::SyncMode::JOURNAL:
- if (replCoord->getReplicationMode() != repl::ReplicationCoordinator::Mode::modeNone) {
- // Wait for ops to become durable then update replication system's
- // knowledge of this.
- auto appliedOpTimeAndWallTime = replCoord->getMyLastAppliedOpTimeAndWallTime();
- opCtx->recoveryUnit()->waitUntilDurable(opCtx);
- replCoord->setMyLastDurableOpTimeAndWallTimeForward(appliedOpTimeAndWallTime);
- } else {
- opCtx->recoveryUnit()->waitUntilDurable(opCtx);
- }
+ getGlobalServiceContext()->getStorageEngine()->waitForJournalFlush(opCtx);
break;
}