 src/mongo/SConscript                                               |   1
 src/mongo/db/SConscript                                            |   2
 src/mongo/db/db.cpp                                                |   2
 src/mongo/db/repl/SConscript                                       |   5
 src/mongo/db/repl/oplog_applier_impl.cpp                           |   4
 src/mongo/db/repl/replication_coordinator_external_state_impl.cpp  |   5
 src/mongo/db/repl/rollback_impl.cpp                                |  12
 src/mongo/db/repl/rollback_impl.h                                  |   2
 src/mongo/db/repl/rollback_impl_test.cpp                           |   2
 src/mongo/db/repl/rollback_test_fixture.h                          |  12
 src/mongo/db/repl/storage_interface.h                              |   4
 src/mongo/db/repl/storage_interface_impl.cpp                       |  14
 src/mongo/db/repl/storage_interface_impl.h                         |   2
 src/mongo/db/repl/storage_interface_mock.h                         |   4
 src/mongo/db/service_context_d_test_fixture.cpp                    |   2
 src/mongo/db/storage/SConscript                                    |  38
 src/mongo/db/storage/control/journal_flusher.cpp                   | 188
 src/mongo/db/storage/control/journal_flusher.h                     | 106
 src/mongo/db/storage/control/storage_control.cpp                   |  94
 src/mongo/db/storage/control/storage_control.h                     |  83
 src/mongo/db/storage/kv/kv_engine.h                                |  17
 src/mongo/db/storage/storage_engine.h                              |  20
 src/mongo/db/storage/storage_engine_impl.cpp                       |  12
 src/mongo/db/storage/storage_engine_impl.h                         |   6
 src/mongo/db/storage/storage_engine_init.cpp                       |   2
 src/mongo/db/storage/storage_engine_mock.h                         |   3
 src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp           | 210
 src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h             |  10
 src/mongo/db/write_concern.cpp                                     |   5
 src/mongo/dbtests/SConscript                                       |   1
 src/mongo/dbtests/framework.cpp                                    |   2
 src/mongo/embedded/SConscript                                      |   1
 src/mongo/embedded/embedded.cpp                                    |   2
 33 files changed, 565 insertions(+), 308 deletions(-)
diff --git a/src/mongo/SConscript b/src/mongo/SConscript
index 46bb3856235..61e30714695 100644
--- a/src/mongo/SConscript
+++ b/src/mongo/SConscript
@@ -443,6 +443,7 @@ mongod = env.Program(
'db/storage/ephemeral_for_test/storage_ephemeral_for_test',
'db/storage/flow_control',
'db/storage/flow_control_parameters',
+ 'db/storage/storage_control',
'db/storage/storage_engine_lock_file',
'db/storage/storage_engine_metadata',
'db/storage/storage_init_d',
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index d3293189dca..6af9a1f9705 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -954,6 +954,7 @@ env.Library(
"commands/server_status_core",
"s/sharding_api_d",
"shared_request_handling",
+ "$BUILD_DIR/mongo/db/storage/storage_control",
],
)
@@ -1697,6 +1698,7 @@ env.Library(
'index/index_access_methods',
'index_builds_coordinator_mongod',
'service_context_d',
+ 'storage/storage_control',
'storage/devnull/storage_devnull',
'storage/ephemeral_for_test/storage_ephemeral_for_test',
'storage/storage_options',
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index d81dbe5be51..0615dbf351a 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -140,6 +140,7 @@
#include "mongo/db/startup_warnings_mongod.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/storage/backup_cursor_hooks.h"
+#include "mongo/db/storage/control/storage_control.h"
#include "mongo/db/storage/encryption_hooks.h"
#include "mongo/db/storage/flow_control.h"
#include "mongo/db/storage/flow_control_parameters_gen.h"
@@ -352,6 +353,7 @@ ExitCode _initAndListen(ServiceContext* serviceContext, int listenPort) {
serviceContext, repl::ReplicationCoordinator::get(serviceContext)));
initializeStorageEngine(serviceContext, StorageEngineInitFlags::kNone);
+ StorageControl::startStorageControls(serviceContext);
#ifdef MONGO_CONFIG_WIREDTIGER_ENABLED
if (EncryptionHooks::get(serviceContext)->restartRequired()) {
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index f8511f94316..b3ff0c580c9 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -234,6 +234,7 @@ env.Library(
'repl_server_parameters',
'$BUILD_DIR/mongo/db/catalog/database_holder',
'$BUILD_DIR/mongo/db/storage/oplog_cap_maintainer_thread',
+ '$BUILD_DIR/mongo/db/storage/storage_control',
'$BUILD_DIR/mongo/db/logical_clock',
],
)
@@ -565,6 +566,7 @@ env.Library(
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/commands/mongod_fsync',
+ '$BUILD_DIR/mongo/db/storage/storage_control',
'repl_server_parameters',
'replication_auth',
],
@@ -1168,10 +1170,11 @@ env.Library(
'rollback_source_impl',
],
LIBDEPS_PRIVATE=[
- 'local_oplog_info',
'$BUILD_DIR/mongo/db/commands/mongod_fcv',
'$BUILD_DIR/mongo/db/commands/test_commands_enabled',
'$BUILD_DIR/mongo/db/index_builds_coordinator_interface',
+ '$BUILD_DIR/mongo/db/storage/storage_control',
+ 'local_oplog_info',
'repl_server_parameters',
],
)
diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp
index 2bf49db1f3f..6eba03227f8 100644
--- a/src/mongo/db/repl/oplog_applier_impl.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl.cpp
@@ -48,6 +48,7 @@
#include "mongo/db/repl/transaction_oplog_application.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/stats/timer_stats.h"
+#include "mongo/db/storage/control/storage_control.h"
#include "mongo/logv2/log.h"
#include "mongo/platform/basic.h"
#include "mongo/util/fail_point.h"
@@ -724,8 +725,7 @@ StatusWith<OpTime> OplogApplierImpl::_applyOplogBatch(OperationContext* opCtx,
// new writes with timestamps associated with those oplog entries will show up in the future. We
// want to flush the journal as soon as possible in order to free ops waiting with 'j' write
// concern.
- const auto storageEngine = opCtx->getServiceContext()->getStorageEngine();
- storageEngine->triggerJournalFlush();
+ StorageControl::triggerJournalFlush(opCtx->getServiceContext());
// Use this fail point to hold the PBWM lock and prevent the batch from completing.
if (MONGO_unlikely(pauseBatchApplicationBeforeCompletion.shouldFail())) {
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index dbab6e4ca84..7812031ee3b 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -89,6 +89,7 @@
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
#include "mongo/db/session_catalog_mongod.h"
+#include "mongo/db/storage/control/storage_control.h"
#include "mongo/db/storage/flow_control.h"
#include "mongo/db/storage/storage_engine.h"
#include "mongo/db/system_index.h"
@@ -787,13 +788,13 @@ void ReplicationCoordinatorExternalStateImpl::clearOplogVisibilityStateForStepDo
//
// This makes sure the JournalFlusher is not stuck waiting for a lock that stepdown might hold
// before doing an update write to the truncate point.
- _service->getStorageEngine()->interruptJournalFlusherForReplStateChange();
+ StorageControl::interruptJournalFlusherForReplStateChange(_service);
// Wait for another round of journal flushing. This will ensure that we wait for the current
// round to completely finish and have no chance of racing with unsetting the truncate point
// below. It is possible that the JournalFlusher will not check for the interrupt signaled
// above, if writing is imminent, so we must make sure that the code completes fully.
- _service->getStorageEngine()->waitForJournalFlush(opCtx);
+ StorageControl::waitForJournalFlush(opCtx);
// We can clear the oplogTruncateAfterPoint because we know there are no user writes during
// stepdown and therefore presently no oplog holes.
diff --git a/src/mongo/db/repl/rollback_impl.cpp b/src/mongo/db/repl/rollback_impl.cpp
index 9c7c0c92119..37c05f71430 100644
--- a/src/mongo/db/repl/rollback_impl.cpp
+++ b/src/mongo/db/repl/rollback_impl.cpp
@@ -507,11 +507,10 @@ void RollbackImpl::_runPhaseFromAbortToReconstructPreparedTxns(
}
// Recover to the stable timestamp.
- auto stableTimestampSW = _recoverToStableTimestamp(opCtx);
- fassert(31049, stableTimestampSW);
+ auto stableTimestamp = _recoverToStableTimestamp(opCtx);
- _rollbackStats.stableTimestamp = stableTimestampSW.getValue();
- _listener->onRecoverToStableTimestamp(stableTimestampSW.getValue());
+ _rollbackStats.stableTimestamp = stableTimestamp;
+ _listener->onRecoverToStableTimestamp(stableTimestamp);
// Log the total number of insert and update operations that have been rolled back as a
// result of recovering to the stable timestamp.
@@ -555,8 +554,7 @@ void RollbackImpl::_runPhaseFromAbortToReconstructPreparedTxns(
_resetDropPendingState(opCtx);
// Run the recovery process.
- _replicationProcess->getReplicationRecovery()->recoverFromOplog(opCtx,
- stableTimestampSW.getValue());
+ _replicationProcess->getReplicationRecovery()->recoverFromOplog(opCtx, stableTimestamp);
_listener->onRecoverFromOplog();
// Sets the correct post-rollback counts on any collections whose counts changed during the
@@ -1178,7 +1176,7 @@ void RollbackImpl::_writeRollbackFileForNamespace(OperationContext* opCtx,
_listener->onRollbackFileWrittenForNamespace(std::move(uuid), std::move(nss));
}
-StatusWith<Timestamp> RollbackImpl::_recoverToStableTimestamp(OperationContext* opCtx) {
+Timestamp RollbackImpl::_recoverToStableTimestamp(OperationContext* opCtx) {
// Recover to the stable timestamp while holding the global exclusive lock. This may throw,
// which the caller must handle.
Lock::GlobalWrite globalWrite(opCtx);
diff --git a/src/mongo/db/repl/rollback_impl.h b/src/mongo/db/repl/rollback_impl.h
index 450d799a3d5..07cd40098e3 100644
--- a/src/mongo/db/repl/rollback_impl.h
+++ b/src/mongo/db/repl/rollback_impl.h
@@ -370,7 +370,7 @@ private:
* Recovers to the stable timestamp while holding the global exclusive lock.
* Returns the stable timestamp that the storage engine recovered to.
*/
- StatusWith<Timestamp> _recoverToStableTimestamp(OperationContext* opCtx);
+ Timestamp _recoverToStableTimestamp(OperationContext* opCtx);
/**
* Process a single oplog entry that is getting rolled back and update the necessary rollback
diff --git a/src/mongo/db/repl/rollback_impl_test.cpp b/src/mongo/db/repl/rollback_impl_test.cpp
index 6a540287d8a..f35406c4b5e 100644
--- a/src/mongo/db/repl/rollback_impl_test.cpp
+++ b/src/mongo/db/repl/rollback_impl_test.cpp
@@ -665,7 +665,7 @@ TEST_F(RollbackImplTest, RollbackCallsRecoverToStableTimestamp) {
DEATH_TEST_REGEX_F(RollbackImplTest,
RollbackFassertsIfRecoverToStableTimestampFails,
- "Fatal assertion.*31049") {
+ "Fatal assertion.*45847000") {
auto op = makeOpAndRecordId(1);
_remoteOplog->setOperations({op});
ASSERT_OK(_insertOplogEntry(op.first));
diff --git a/src/mongo/db/repl/rollback_test_fixture.h b/src/mongo/db/repl/rollback_test_fixture.h
index bd675894c0c..445b8efaf0f 100644
--- a/src/mongo/db/repl/rollback_test_fixture.h
+++ b/src/mongo/db/repl/rollback_test_fixture.h
@@ -138,18 +138,18 @@ public:
}
/**
- * If '_recoverToTimestampStatus' is non-empty, returns it. If '_recoverToTimestampStatus' is
+ * If '_recoverToTimestampStatus' is non-empty, fasserts. If '_recoverToTimestampStatus' is
* empty, updates '_currTimestamp' to be equal to '_stableTimestamp' and returns the new value
* of '_currTimestamp'.
*/
- StatusWith<Timestamp> recoverToStableTimestamp(OperationContext* opCtx) override {
+ Timestamp recoverToStableTimestamp(OperationContext* opCtx) override {
stdx::lock_guard<Latch> lock(_mutex);
if (_recoverToTimestampStatus) {
- return _recoverToTimestampStatus.get();
- } else {
- _currTimestamp = _stableTimestamp;
- return _currTimestamp;
+ fassert(45847000, _recoverToTimestampStatus.get());
}
+
+ _currTimestamp = _stableTimestamp;
+ return _currTimestamp;
}
bool supportsRecoverToStableTimestamp(ServiceContext* serviceCtx) const override {
diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h
index b03d8e253f1..3f54e5abca1 100644
--- a/src/mongo/db/repl/storage_interface.h
+++ b/src/mongo/db/repl/storage_interface.h
@@ -371,8 +371,10 @@ public:
* "local.replset.minvalid" which should be reverted to the last stable timestamp.
*
* The 'stable' timestamp is set by calling StorageInterface::setStableTimestamp.
+ *
+ * Returns the stable timestamp to which it reverted the data.
*/
- virtual StatusWith<Timestamp> recoverToStableTimestamp(OperationContext* opCtx) = 0;
+ virtual Timestamp recoverToStableTimestamp(OperationContext* opCtx) = 0;
/**
* Returns whether the storage engine supports "recover to stable timestamp".
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index 8d4f4532e24..5bd6caa4526 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -74,6 +74,7 @@
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/rollback_gen.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/storage/control/storage_control.h"
#include "mongo/db/storage/durable_catalog.h"
#include "mongo/db/storage/oplog_cap_maintainer_thread.h"
#include "mongo/logv2/log.h"
@@ -1162,8 +1163,17 @@ void StorageInterfaceImpl::setInitialDataTimestamp(ServiceContext* serviceCtx,
serviceCtx->getStorageEngine()->setInitialDataTimestamp(snapshotName);
}
-StatusWith<Timestamp> StorageInterfaceImpl::recoverToStableTimestamp(OperationContext* opCtx) {
- return opCtx->getServiceContext()->getStorageEngine()->recoverToStableTimestamp(opCtx);
+Timestamp StorageInterfaceImpl::recoverToStableTimestamp(OperationContext* opCtx) {
+ auto serviceContext = opCtx->getServiceContext();
+
+ StorageControl::stopStorageControls(serviceContext);
+
+ auto swStableTimestamp = serviceContext->getStorageEngine()->recoverToStableTimestamp(opCtx);
+ fassert(31049, swStableTimestamp);
+
+ StorageControl::startStorageControls(serviceContext);
+
+ return swStableTimestamp.getValue();
}
bool StorageInterfaceImpl::supportsRecoverToStableTimestamp(ServiceContext* serviceCtx) const {
diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h
index f0e558ebf7d..ba7698f30a6 100644
--- a/src/mongo/db/repl/storage_interface_impl.h
+++ b/src/mongo/db/repl/storage_interface_impl.h
@@ -164,7 +164,7 @@ public:
void setInitialDataTimestamp(ServiceContext* serviceCtx, Timestamp snapshotName) override;
- StatusWith<Timestamp> recoverToStableTimestamp(OperationContext* opCtx) override;
+ Timestamp recoverToStableTimestamp(OperationContext* opCtx) override;
bool supportsRecoverToStableTimestamp(ServiceContext* serviceCtx) const override;
diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h
index 00f6e27a031..9f610ded093 100644
--- a/src/mongo/db/repl/storage_interface_mock.h
+++ b/src/mongo/db/repl/storage_interface_mock.h
@@ -300,8 +300,8 @@ public:
Timestamp getInitialDataTimestamp() const;
- StatusWith<Timestamp> recoverToStableTimestamp(OperationContext* opCtx) override {
- return Status{ErrorCodes::IllegalOperation, "recoverToStableTimestamp not implemented."};
+ Timestamp recoverToStableTimestamp(OperationContext* opCtx) override {
+ return Timestamp();
}
bool supportsRecoverToStableTimestamp(ServiceContext* serviceCtx) const override {
diff --git a/src/mongo/db/service_context_d_test_fixture.cpp b/src/mongo/db/service_context_d_test_fixture.cpp
index d48597710cc..91f597bd074 100644
--- a/src/mongo/db/service_context_d_test_fixture.cpp
+++ b/src/mongo/db/service_context_d_test_fixture.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/op_observer_registry.h"
#include "mongo/db/s/collection_sharding_state_factory_shard.h"
#include "mongo/db/service_entry_point_mongod.h"
+#include "mongo/db/storage/control/storage_control.h"
#include "mongo/db/storage/storage_engine_init.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/util/assert_util.h"
@@ -83,6 +84,7 @@ ServiceContextMongoDTest::ServiceContextMongoDTest(std::string engine, RepairAct
initializeStorageEngine(serviceContext,
StorageEngineInitFlags::kAllowNoLockFile |
StorageEngineInitFlags::kSkipMetadataFile);
+ StorageControl::startStorageControls(serviceContext);
DatabaseHolder::set(serviceContext, std::make_unique<DatabaseHolderImpl>());
IndexAccessMethodFactory::set(serviceContext, std::make_unique<IndexAccessMethodFactoryImpl>());
diff --git a/src/mongo/db/storage/SConscript b/src/mongo/db/storage/SConscript
index b14b2bb352f..00ab2d40076 100644
--- a/src/mongo/db/storage/SConscript
+++ b/src/mongo/db/storage/SConscript
@@ -79,11 +79,25 @@ env.Library(
target='oplog_hack',
source=[
'oplog_hack.cpp',
- ],
+ ],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
- ]
- )
+ ],
+)
+
+env.Library(
+ target='storage_control',
+ source=[
+ 'control/storage_control.cpp',
+ ],
+ LIBDEPS=[
+ 'journal_flusher',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/service_context',
+ ],
+)
env.Library(
target='oplog_cap_maintainer_thread',
@@ -92,6 +106,7 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/util/background_job',
+ '$BUILD_DIR/mongo/db/namespace_string',
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/base',
@@ -100,7 +115,21 @@ env.Library(
'$BUILD_DIR/mongo/db/catalog/collection_catalog',
'$BUILD_DIR/mongo/db/catalog/database_holder',
'$BUILD_DIR/mongo/db/concurrency/lock_manager',
- '$BUILD_DIR/mongo/db/namespace_string',
+ '$BUILD_DIR/mongo/db/service_context',
+ ],
+)
+
+env.Library(
+ target='journal_flusher',
+ source=[
+ 'control/journal_flusher.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/util/background_job',
+ ],
+ LIBDEPS_PRIVATE=[
+ 'storage_options',
+ '$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/service_context',
],
)
@@ -244,6 +273,7 @@ env.Library(
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/db/concurrency/lock_manager',
+ 'storage_control',
'storage_engine_lock_file',
'storage_repair_observer',
'storage_engine_metadata',
diff --git a/src/mongo/db/storage/control/journal_flusher.cpp b/src/mongo/db/storage/control/journal_flusher.cpp
new file mode 100644
index 00000000000..b7b3f9dfd1f
--- /dev/null
+++ b/src/mongo/db/storage/control/journal_flusher.cpp
@@ -0,0 +1,188 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/storage/control/journal_flusher.h"
+
+#include "mongo/db/client.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/storage/recovery_unit.h"
+#include "mongo/db/storage/storage_options.h"
+#include "mongo/logv2/log.h"
+#include "mongo/stdx/future.h"
+#include "mongo/util/concurrency/idle_thread_block.h"
+#include "mongo/util/fail_point.h"
+
+namespace mongo {
+
+namespace {
+
+const auto getJournalFlusher = ServiceContext::declareDecoration<std::unique_ptr<JournalFlusher>>();
+
+MONGO_FAIL_POINT_DEFINE(pauseJournalFlusherThread);
+
+} // namespace
+
+JournalFlusher* JournalFlusher::get(ServiceContext* serviceCtx) {
+ auto& journalFlusher = getJournalFlusher(serviceCtx);
+ invariant(journalFlusher);
+ return journalFlusher.get();
+}
+
+JournalFlusher* JournalFlusher::get(OperationContext* opCtx) {
+ return get(opCtx->getServiceContext());
+}
+
+void JournalFlusher::set(ServiceContext* serviceCtx, std::unique_ptr<JournalFlusher> flusher) {
+ auto& journalFlusher = getJournalFlusher(serviceCtx);
+ if (journalFlusher) {
+ invariant(!journalFlusher->running(),
+ "Tried to reset the JournalFlusher without shutting down the original instance.");
+ }
+
+ invariant(flusher);
+ journalFlusher = std::move(flusher);
+}
+
+void JournalFlusher::run() {
+ ThreadClient tc(name(), getGlobalServiceContext());
+ LOGV2_DEBUG(45847001, 1, "starting {name} thread", "name"_attr = name());
+
+ // Initialize the thread's opCtx.
+ _uniqueCtx.emplace(tc->makeOperationContext());
+
+ // Updates to a non-replicated collection, oplogTruncateAfterPoint, are made by this thread.
+ // Non-replicated writes will not contribute to replication lag and can be safely excluded
+ // from Flow Control.
+ _uniqueCtx->get()->setShouldParticipateInFlowControl(false);
+ while (true) {
+
+ pauseJournalFlusherThread.pauseWhileSet(_uniqueCtx->get());
+
+ try {
+ ON_BLOCK_EXIT([&] {
+ // We do not want to miss an interrupt for the next round. Therefore, the opCtx
+ // will be reset after a flushing round finishes.
+ //
+ // It is fine if the opCtx is signaled between finishing and resetting because
+ // state changes will be seen before the next round. We want to catch any
+ // interrupt signals that occur after state is checked at the start of a round:
+ // the time during or before the next flush.
+ stdx::lock_guard<Latch> lk(_opCtxMutex);
+ _uniqueCtx.reset();
+ _uniqueCtx.emplace(tc->makeOperationContext());
+ _uniqueCtx->get()->setShouldParticipateInFlowControl(false);
+ });
+
+ _uniqueCtx->get()->recoveryUnit()->waitUntilDurable(_uniqueCtx->get());
+
+ // Signal the waiters that a round completed.
+ _currentSharedPromise->emplaceValue();
+ } catch (const AssertionException& e) {
+ invariant(ErrorCodes::isShutdownError(e.code()) ||
+ e.code() == ErrorCodes::InterruptedDueToReplStateChange,
+ e.toString());
+
+ // Signal the waiters that the fsync was interrupted.
+ _currentSharedPromise->setError(e.toStatus());
+ }
+
+ // Wait until either journalCommitIntervalMs passes or an immediate journal flush is
+ // requested (or shutdown).
+
+ auto deadline =
+ Date_t::now() + Milliseconds(storageGlobalParams.journalCommitIntervalMs.load());
+
+ stdx::unique_lock<Latch> lk(_stateMutex);
+
+ MONGO_IDLE_THREAD_BLOCK;
+ _flushJournalNowCV.wait_until(
+ lk, deadline.toSystemTimePoint(), [&] { return _flushJournalNow || _shuttingDown; });
+
+ _flushJournalNow = false;
+
+ if (_shuttingDown) {
+ LOGV2_DEBUG(45847002, 1, "stopping {name} thread", "name"_attr = name());
+ _nextSharedPromise->setError(
+ Status(ErrorCodes::ShutdownInProgress, "The storage catalog is being closed."));
+ stdx::lock_guard<Latch> lk(_opCtxMutex);
+ _uniqueCtx.reset();
+ return;
+ }
+
+ // Take the next promise as current and reset the next promise.
+ _currentSharedPromise =
+ std::exchange(_nextSharedPromise, std::make_unique<SharedPromise<void>>());
+ }
+}
+
+void JournalFlusher::shutdown() {
+ LOGV2(22320, "Shutting down journal flusher thread");
+ {
+ stdx::lock_guard<Latch> lk(_stateMutex);
+ _shuttingDown = true;
+ _flushJournalNowCV.notify_one();
+ }
+ wait();
+ LOGV2(22321, "Finished shutting down journal flusher thread");
+}
+
+void JournalFlusher::triggerJournalFlush() {
+ stdx::lock_guard<Latch> lk(_stateMutex);
+ if (!_flushJournalNow) {
+ _flushJournalNow = true;
+ _flushJournalNowCV.notify_one();
+ }
+}
+
+void JournalFlusher::waitForJournalFlush() {
+ auto myFuture = [&]() {
+ stdx::unique_lock<Latch> lk(_stateMutex);
+ if (!_flushJournalNow) {
+ _flushJournalNow = true;
+ _flushJournalNowCV.notify_one();
+ }
+ return _nextSharedPromise->getFuture();
+ }();
+ // Throws on error if the catalog is closed or the flusher round is interrupted by stepdown.
+ myFuture.get();
+}
+
+void JournalFlusher::interruptJournalFlusherForReplStateChange() {
+ stdx::lock_guard<Latch> lk(_opCtxMutex);
+ if (_uniqueCtx) {
+ stdx::lock_guard<Client> lk(*_uniqueCtx->get()->getClient());
+ _uniqueCtx->get()->markKilled(ErrorCodes::InterruptedDueToReplStateChange);
+ }
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/storage/control/journal_flusher.h b/src/mongo/db/storage/control/journal_flusher.h
new file mode 100644
index 00000000000..7204a09104c
--- /dev/null
+++ b/src/mongo/db/storage/control/journal_flusher.h
@@ -0,0 +1,106 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/service_context.h"
+#include "mongo/platform/mutex.h"
+#include "mongo/util/background.h"
+#include "mongo/util/future.h"
+
+namespace mongo {
+
+class OperationContext;
+
+class JournalFlusher : public BackgroundJob {
+public:
+ explicit JournalFlusher() : BackgroundJob(/*deleteSelf*/ false) {}
+
+ static JournalFlusher* get(ServiceContext* serviceCtx);
+ static JournalFlusher* get(OperationContext* opCtx);
+ static void set(ServiceContext* serviceCtx, std::unique_ptr<JournalFlusher> journalFlusher);
+
+ std::string name() const {
+ return "JournalFlusher";
+ }
+
+ void run();
+
+ /**
+ * Signals the thread to quit and then waits until it does.
+ */
+ void shutdown();
+
+ /**
+ * Signals an immediate journal flush and leaves.
+ */
+ void triggerJournalFlush();
+
+ /**
+ * Signals an immediate journal flush and waits for it to complete before returning.
+ *
+ * Will throw ShutdownInProgress if the flusher thread is being stopped.
+ * Will throw InterruptedDueToReplStateChange if a flusher round is interrupted by stepdown.
+ */
+ void waitForJournalFlush();
+
+ /**
+ * Interrupts the journal flusher thread via its operation context with an
+ * InterruptedDueToReplStateChange error.
+ */
+ void interruptJournalFlusherForReplStateChange();
+
+private:
+ // Serializes setting/resetting _uniqueCtx and marking _uniqueCtx killed.
+ mutable Mutex _opCtxMutex = MONGO_MAKE_LATCH("JournalFlusherOpCtxMutex");
+
+ // Saves a reference to the flusher thread's operation context so it can be interrupted if the
+ // flusher is active.
+ boost::optional<ServiceContext::UniqueOperationContext> _uniqueCtx;
+
+ // Protects the state below.
+ mutable Mutex _stateMutex = MONGO_MAKE_LATCH("JournalFlusherStateMutex");
+
+ // Signaled to wake up the thread, if the thread is waiting. The thread will check whether
+ // _flushJournalNow or _shuttingDown is set and flush or stop accordingly.
+ mutable stdx::condition_variable _flushJournalNowCV;
+
+ bool _flushJournalNow = false;
+ bool _shuttingDown = false;
+
+ // New callers get a future from nextSharedPromise. The JournalFlusher thread will swap that to
+ // currentSharedPromise at the start of every round of flushing, and reset nextSharedPromise
+ // with a new shared promise.
+ std::unique_ptr<SharedPromise<void>> _currentSharedPromise =
+ std::make_unique<SharedPromise<void>>();
+ std::unique_ptr<SharedPromise<void>> _nextSharedPromise =
+ std::make_unique<SharedPromise<void>>();
+};
+
+} // namespace mongo
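
The header above is the whole surface of the new decoration-based JournalFlusher. As a reading aid, a minimal sketch (not part of the commit) of how a caller might drive it once startup code has constructed the flusher, called go(), and registered it via JournalFlusher::set(); the function name exampleJournalFlusherUsage is hypothetical:

#include "mongo/db/storage/control/journal_flusher.h"

namespace mongo {
// Sketch only: assumes a started JournalFlusher has already been registered on
// the ServiceContext (as StorageControl::startStorageControls does below).
void exampleJournalFlusherUsage(ServiceContext* serviceContext) {
    // Fire-and-forget: request an immediate flush round and return.
    JournalFlusher::get(serviceContext)->triggerJournalFlush();

    // Blocking: wait for a complete flush round. Throws ShutdownInProgress if
    // the flusher is stopping, or InterruptedDueToReplStateChange if the round
    // is interrupted by stepdown.
    JournalFlusher::get(serviceContext)->waitForJournalFlush();
}
}  // namespace mongo
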
diff --git a/src/mongo/db/storage/control/storage_control.cpp b/src/mongo/db/storage/control/storage_control.cpp
new file mode 100644
index 00000000000..db0678ae1ac
--- /dev/null
+++ b/src/mongo/db/storage/control/storage_control.cpp
@@ -0,0 +1,94 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/storage/control/storage_control.h"
+
+#include "mongo/db/operation_context.h"
+#include "mongo/db/service_context.h"
+#include "mongo/db/storage/control/journal_flusher.h"
+#include "mongo/logv2/log.h"
+#include "mongo/util/background.h"
+
+namespace mongo {
+
+namespace StorageControl {
+
+void startStorageControls(ServiceContext* serviceContext) {
+ auto storageEngine = serviceContext->getStorageEngine();
+
+ // Instantiate a thread to periodically, and upon request, flush writes to disk.
+ if (!storageEngine->isEphemeral() && storageEngine->isDurable()) {
+ std::unique_ptr<JournalFlusher> journalFlusher = std::make_unique<JournalFlusher>();
+ journalFlusher->go();
+ JournalFlusher::set(serviceContext, std::move(journalFlusher));
+ }
+}
+
+void stopStorageControls(ServiceContext* serviceContext) {
+ auto storageEngine = serviceContext->getStorageEngine();
+
+ if (!storageEngine->isEphemeral() && storageEngine->isDurable()) {
+ JournalFlusher::get(serviceContext)->shutdown();
+ }
+}
+
+void triggerJournalFlush(ServiceContext* serviceContext) {
+ auto storageEngine = serviceContext->getStorageEngine();
+
+ if (!storageEngine->isEphemeral() && storageEngine->isDurable()) {
+ JournalFlusher::get(serviceContext)->triggerJournalFlush();
+ }
+}
+
+void waitForJournalFlush(OperationContext* opCtx) {
+ auto serviceContext = opCtx->getServiceContext();
+ auto storageEngine = serviceContext->getStorageEngine();
+
+ if (!storageEngine->isEphemeral() && storageEngine->isDurable()) {
+ JournalFlusher::get(serviceContext)->waitForJournalFlush();
+ } else {
+ opCtx->recoveryUnit()->waitUntilDurable(opCtx);
+ }
+}
+
+void interruptJournalFlusherForReplStateChange(ServiceContext* serviceContext) {
+ auto storageEngine = serviceContext->getStorageEngine();
+
+ if (!storageEngine->isEphemeral() && storageEngine->isDurable()) {
+ JournalFlusher::get(serviceContext)->interruptJournalFlusherForReplStateChange();
+ }
+}
+
+} // namespace StorageControl
+
+} // namespace mongo
diff --git a/src/mongo/db/storage/control/storage_control.h b/src/mongo/db/storage/control/storage_control.h
new file mode 100644
index 00000000000..75a1289adbf
--- /dev/null
+++ b/src/mongo/db/storage/control/storage_control.h
@@ -0,0 +1,83 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+namespace mongo {
+
+class OperationContext;
+class ServiceContext;
+
+/**
+ * Helper functions to manipulate independent processes that perform actions against the storage
+ * engine.
+ */
+namespace StorageControl {
+
+/**
+ * Responsible for initializing independent processes for replication that interact with the
+ * storage layer.
+ *
+ * Instantiates the JournalFlusher to periodically, and upon request, flush writes to disk.
+ *
+ * Safe to call again after stopStorageControls has been called, to restart any processes that were
+ * stopped.
+ */
+void startStorageControls(ServiceContext* serviceContext);
+
+/**
+ * Stops the processes begun by startStorageControls.
+ *
+ * The OplogCapMaintainerThread is unowned and therefore ignored; the JournalFlusher is shut
+ * down.
+ *
+ * Safe to call multiple times.
+ */
+void stopStorageControls(ServiceContext* serviceContext);
+
+/**
+ * Prompts an immediate journal flush and returns without waiting for it.
+ */
+void triggerJournalFlush(ServiceContext* serviceContext);
+
+/**
+ * Initiates if needed and waits for a complete round of journal flushing to execute.
+ *
+ * Can throw ShutdownInProgress if the storage engine is being closed.
+ */
+void waitForJournalFlush(OperationContext* opCtx);
+
+/**
+ * Ensures interruption of the JournalFlusher if it is or will be acquiring a lock.
+ */
+void interruptJournalFlusherForReplStateChange(ServiceContext* serviceContext);
+
+} // namespace StorageControl
+
+} // namespace mongo
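
Taken together with the call sites changed elsewhere in this diff (db.cpp, embedded.cpp, storage_engine_init.cpp, and the test fixtures), the intended lifecycle is: initialize the storage engine, then start the storage controls, and stop them before the engine shuts down. A hedged sketch of that ordering, with init flags and error handling elided; the function name exampleStorageLifecycle is hypothetical:

#include "mongo/db/service_context.h"
#include "mongo/db/storage/control/storage_control.h"
#include "mongo/db/storage/storage_engine_init.h"

namespace mongo {
// Sketch of the ordering this commit establishes at the touched call sites.
void exampleStorageLifecycle(ServiceContext* serviceContext) {
    initializeStorageEngine(serviceContext, StorageEngineInitFlags::kNone);
    // Starts the JournalFlusher if the engine is durable and non-ephemeral.
    StorageControl::startStorageControls(serviceContext);

    // ... serve traffic ...

    // shutdownGlobalStorageEngineCleanly() now calls stopStorageControls()
    // before StorageEngine::cleanShutdown().
    shutdownGlobalStorageEngineCleanly(serviceContext);
}
}  // namespace mongo
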
diff --git a/src/mongo/db/storage/kv/kv_engine.h b/src/mongo/db/storage/kv/kv_engine.h
index 49ba215ba9a..3716211e99f 100644
--- a/src/mongo/db/storage/kv/kv_engine.h
+++ b/src/mongo/db/storage/kv/kv_engine.h
@@ -460,23 +460,6 @@ public:
}
/**
- * See `StorageEngine::triggerJournalFlush()`
- */
- virtual void triggerJournalFlush() const {}
-
- /**
- * See `StorageEngine::waitForJournalFlush()`
- */
- virtual void waitForJournalFlush(OperationContext* opCtx) const {
- opCtx->recoveryUnit()->waitUntilDurable(opCtx);
- }
-
- /**
- * See `StorageEngine::interruptJournalFlusherForReplStateChange()`
- */
- virtual void interruptJournalFlusherForReplStateChange() const {}
-
- /**
* Methods to access the storage engine's timestamps.
*/
virtual Timestamp getCheckpointTimestamp() const {
diff --git a/src/mongo/db/storage/storage_engine.h b/src/mongo/db/storage/storage_engine.h
index 1ea159ebccf..e3c28af59cf 100644
--- a/src/mongo/db/storage/storage_engine.h
+++ b/src/mongo/db/storage/storage_engine.h
@@ -550,26 +550,6 @@ public:
*/
virtual void setCachePressureForTest(int pressure) = 0;
- /**
- * Prompts an immediate journal flush.
- */
- virtual void triggerJournalFlush() const = 0;
-
- /**
- * Initiates if needed and waits for a complete round of journal flushing to execute.
- *
- * Can throw ShutdownInProgress if the storage engine is being closed.
- */
- virtual void waitForJournalFlush(OperationContext* opCtx) const = 0;
-
- /**
- * Ensures interruption of the JournalFlusher if it is or will be acquiring a lock.
- *
- * TODO: this function will be moved above the Storage Engine layer along with the
- * JournalFlusher in SERVER-45847.
- */
- virtual void interruptJournalFlusherForReplStateChange() const = 0;
-
struct IndexIdentifier {
const RecordId catalogId;
const NamespaceString nss;
diff --git a/src/mongo/db/storage/storage_engine_impl.cpp b/src/mongo/db/storage/storage_engine_impl.cpp
index d249b0a54ea..c90f55d67ac 100644
--- a/src/mongo/db/storage/storage_engine_impl.cpp
+++ b/src/mongo/db/storage/storage_engine_impl.cpp
@@ -875,18 +875,6 @@ bool StorageEngineImpl::supportsTwoPhaseIndexBuild() const {
return true;
}
-void StorageEngineImpl::triggerJournalFlush() const {
- return _engine->triggerJournalFlush();
-}
-
-void StorageEngineImpl::waitForJournalFlush(OperationContext* opCtx) const {
- return _engine->waitForJournalFlush(opCtx);
-}
-
-void StorageEngineImpl::interruptJournalFlusherForReplStateChange() const {
- return _engine->interruptJournalFlusherForReplStateChange();
-}
-
Timestamp StorageEngineImpl::getAllDurableTimestamp() const {
return _engine->getAllDurableTimestamp();
}
diff --git a/src/mongo/db/storage/storage_engine_impl.h b/src/mongo/db/storage/storage_engine_impl.h
index 688ecfaa034..dd5cd043142 100644
--- a/src/mongo/db/storage/storage_engine_impl.h
+++ b/src/mongo/db/storage/storage_engine_impl.h
@@ -165,12 +165,6 @@ public:
bool supportsTwoPhaseIndexBuild() const final;
- void triggerJournalFlush() const final;
-
- void waitForJournalFlush(OperationContext* opCtx) const final;
-
- void interruptJournalFlusherForReplStateChange() const final;
-
SnapshotManager* getSnapshotManager() const final;
void setJournalListener(JournalListener* jl) final;
diff --git a/src/mongo/db/storage/storage_engine_init.cpp b/src/mongo/db/storage/storage_engine_init.cpp
index c33cf056f36..c9040efc0ee 100644
--- a/src/mongo/db/storage/storage_engine_init.cpp
+++ b/src/mongo/db/storage/storage_engine_init.cpp
@@ -40,6 +40,7 @@
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/concurrency/lock_state.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/storage/control/storage_control.h"
#include "mongo/db/storage/storage_engine_lock_file.h"
#include "mongo/db/storage/storage_engine_metadata.h"
#include "mongo/db/storage/storage_options.h"
@@ -178,6 +179,7 @@ void initializeStorageEngine(ServiceContext* service, const StorageEngineInitFla
void shutdownGlobalStorageEngineCleanly(ServiceContext* service) {
invariant(service->getStorageEngine());
+ StorageControl::stopStorageControls(service);
service->getStorageEngine()->cleanShutdown();
auto& lockFile = StorageEngineLockFile::get(service);
if (lockFile) {
diff --git a/src/mongo/db/storage/storage_engine_mock.h b/src/mongo/db/storage/storage_engine_mock.h
index 1fe279ff909..1b7db400c5c 100644
--- a/src/mongo/db/storage/storage_engine_mock.h
+++ b/src/mongo/db/storage/storage_engine_mock.h
@@ -145,9 +145,6 @@ public:
return false;
}
void setCachePressureForTest(int pressure) final {}
- void triggerJournalFlush() const final {}
- void waitForJournalFlush(OperationContext* opCtx) const final {}
- void interruptJournalFlusherForReplStateChange() const final {}
StatusWith<StorageEngine::ReconcileResult> reconcileCatalogAndIdents(
OperationContext* opCtx) final {
return ReconcileResult{};
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index 8385c0b7e64..26ef526ae86 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -111,7 +111,6 @@ MONGO_FAIL_POINT_DEFINE(WTPreserveSnapshotHistoryIndefinitely);
MONGO_FAIL_POINT_DEFINE(WTSetOldestTSToStableTS);
MONGO_FAIL_POINT_DEFINE(pauseCheckpointThread);
-MONGO_FAIL_POINT_DEFINE(pauseJournalFlusherThread);
} // namespace
@@ -238,174 +237,6 @@ private:
stdx::condition_variable _condvar;
};
-class WiredTigerKVEngine::WiredTigerJournalFlusher : public BackgroundJob {
-public:
- explicit WiredTigerJournalFlusher(WiredTigerSessionCache* sessionCache)
- : BackgroundJob(false /* deleteSelf */), _sessionCache(sessionCache) {}
-
- virtual string name() const {
- return "WTJournalFlusher";
- }
-
- virtual void run() {
- ThreadClient tc(name(), getGlobalServiceContext());
- LOGV2_DEBUG(22305, 1, "starting {name} thread", "name"_attr = name());
-
- // Initialize the thread's opCtx.
- _uniqueCtx.emplace(tc->makeOperationContext());
-
- // Updates to a non-replicated collection, oplogTruncateAfterPoint, are made by this thread.
- // Non-replicated writes will not contribute to replication lag and can be safely excluded
- // from Flow Control.
- _uniqueCtx->get()->setShouldParticipateInFlowControl(false);
- while (true) {
-
- pauseJournalFlusherThread.pauseWhileSet(_uniqueCtx->get());
-
- try {
- ON_BLOCK_EXIT([&] {
- // We do not want to miss an interrupt for the next round. Therefore, the opCtx
- // will be reset after a flushing round finishes.
- //
- // It is fine if the opCtx is signaled between finishing and resetting because
- // state changes will be seen before the next round. We want to catch any
- // interrupt signals that occur after state is checked at the start of a round:
- // the time during or before the next flush.
- stdx::lock_guard<Latch> lk(_opCtxMutex);
- _uniqueCtx.reset();
- _uniqueCtx.emplace(tc->makeOperationContext());
- _uniqueCtx->get()->setShouldParticipateInFlowControl(false);
- });
-
- _sessionCache->waitUntilDurable(
- _uniqueCtx->get(),
- WiredTigerSessionCache::Fsync::kJournal,
- WiredTigerSessionCache::UseJournalListener::kUpdate);
-
- // Signal the waiters that a round completed.
- _currentSharedPromise->emplaceValue();
- } catch (const AssertionException& e) {
- invariant(ErrorCodes::isShutdownError(e.code()) ||
- e.code() == ErrorCodes::InterruptedDueToReplStateChange,
- e.toString());
-
- // Signal the waiters that the fsync was interrupted.
- _currentSharedPromise->setError(e.toStatus());
- }
-
- // Wait until either journalCommitIntervalMs passes or an immediate journal flush is
- // requested (or shutdown).
-
- auto deadline =
- Date_t::now() + Milliseconds(storageGlobalParams.journalCommitIntervalMs.load());
-
- stdx::unique_lock<Latch> lk(_stateMutex);
-
- MONGO_IDLE_THREAD_BLOCK;
- _flushJournalNowCV.wait_until(lk, deadline.toSystemTimePoint(), [&] {
- return _flushJournalNow || _shuttingDown;
- });
-
- _flushJournalNow = false;
-
- if (_shuttingDown) {
- LOGV2_DEBUG(22306, 1, "stopping {name} thread", "name"_attr = name());
- _nextSharedPromise->setError(
- Status(ErrorCodes::ShutdownInProgress, "The storage catalog is being closed."));
- stdx::lock_guard<Latch> lk(_opCtxMutex);
- _uniqueCtx.reset();
- return;
- }
-
- // Take the next promise as current and reset the next promise.
- _currentSharedPromise =
- std::exchange(_nextSharedPromise, std::make_unique<SharedPromise<void>>());
- }
- }
-
- /**
- * Signals the thread to quit and then waits until it does.
- */
- void shutdown() {
- {
- stdx::lock_guard<Latch> lk(_stateMutex);
- _shuttingDown = true;
- _flushJournalNowCV.notify_one();
- }
- wait();
- }
-
- /**
- * Signals an immediate journal flush and leaves.
- */
- void triggerJournalFlush() {
- stdx::lock_guard<Latch> lk(_stateMutex);
- if (!_flushJournalNow) {
- _flushJournalNow = true;
- _flushJournalNowCV.notify_one();
- }
- }
-
- /**
- * Signals an immediate journal flush and waits for it to complete before returning.
- *
- * Will throw ShutdownInProgress if the flusher thread is being stopped.
- * Will throw InterruptedDueToReplStateChange if a flusher round is interrupted by stepdown.
- */
- void waitForJournalFlush() {
- auto myFuture = [&]() {
- stdx::unique_lock<Latch> lk(_stateMutex);
- if (!_flushJournalNow) {
- _flushJournalNow = true;
- _flushJournalNowCV.notify_one();
- }
- return _nextSharedPromise->getFuture();
- }();
- // Throws on error if the catalog is closed or the flusher round is interrupted by stepdown.
- myFuture.get();
- }
-
- /**
- * Interrupts the journal flusher thread via its operation context with an
- * InterruptedDueToReplStateChange error.
- */
- void interruptJournalFlusherForReplStateChange() {
- stdx::lock_guard<Latch> lk(_opCtxMutex);
- if (_uniqueCtx) {
- stdx::lock_guard<Client> lk(*_uniqueCtx->get()->getClient());
- _uniqueCtx->get()->markKilled(ErrorCodes::InterruptedDueToReplStateChange);
- }
- }
-
-private:
- WiredTigerSessionCache* _sessionCache;
-
- // Serializes setting/resetting _uniqueCtx and marking _uniqueCtx killed.
- mutable Mutex _opCtxMutex = MONGO_MAKE_LATCH("WiredTigerJournalFlusherOpCtxMutex");
-
- // Saves a reference to the flusher thread's operation context so it can be interrupted if the
- // flusher is active.
- boost::optional<ServiceContext::UniqueOperationContext> _uniqueCtx;
-
- // Protects the state below.
- mutable Mutex _stateMutex = MONGO_MAKE_LATCH("WiredTigerJournalFlusherStateMutex");
-
- // Signaled to wake up the thread, if the thread is waiting. The thread will check whether
- // _flushJournalNow or _shuttingDown is set and flush or stop accordingly.
- mutable stdx::condition_variable _flushJournalNowCV;
-
- bool _flushJournalNow = false;
- bool _shuttingDown = false;
-
- // New callers get a future from nextSharedPromise. The JournalFlusher thread will swap that to
- // currentSharedPromise at the start of every round of flushing, and reset nextSharedPromise
- // with a new shared promise.
- std::unique_ptr<SharedPromise<void>> _currentSharedPromise =
- std::make_unique<SharedPromise<void>>();
- std::unique_ptr<SharedPromise<void>> _nextSharedPromise =
- std::make_unique<SharedPromise<void>>();
-};
-
namespace {
/**
@@ -987,10 +818,6 @@ WiredTigerKVEngine::~WiredTigerKVEngine() {
void WiredTigerKVEngine::startAsyncThreads() {
if (!_ephemeral) {
- if (_durable) {
- _journalFlusher = std::make_unique<WiredTigerJournalFlusher>(_sessionCache.get());
- _journalFlusher->go();
- }
if (!_readOnly) {
_checkpointThread =
std::make_unique<WiredTigerCheckpointThread>(this, _sessionCache.get());
@@ -1082,11 +909,6 @@ void WiredTigerKVEngine::cleanShutdown() {
_sessionSweeper->shutdown();
LOGV2(22319, "Finished shutting down session sweeper thread");
}
- if (_journalFlusher) {
- LOGV2(22320, "Shutting down journal flusher thread");
- _journalFlusher->shutdown();
- LOGV2(22321, "Finished shutting down journal flusher thread");
- }
if (_checkpointThread) {
LOGV2(22322, "Shutting down checkpoint thread");
_checkpointThread->shutdown();
@@ -2156,13 +1978,8 @@ StatusWith<Timestamp> WiredTigerKVEngine::recoverToStableTimestamp(OperationCont
if (!_ephemeral) {
LOGV2_FOR_ROLLBACK(
- 23990,
- 2,
- "WiredTiger::RecoverToStableTimestamp shutting down journal and checkpoint threads.");
+ 23990, 2, "WiredTiger::RecoverToStableTimestamp shutting down checkpoint thread.");
// Shutdown WiredTigerKVEngine owned accesses into the storage engine.
- if (_durable) {
- _journalFlusher->shutdown();
- }
_checkpointThread->shutdown();
}
@@ -2182,10 +1999,6 @@ StatusWith<Timestamp> WiredTigerKVEngine::recoverToStableTimestamp(OperationCont
}
if (!_ephemeral) {
- if (_durable) {
- _journalFlusher = std::make_unique<WiredTigerJournalFlusher>(_sessionCache.get());
- _journalFlusher->go();
- }
_checkpointThread = std::make_unique<WiredTigerCheckpointThread>(this, _sessionCache.get());
_checkpointThread->go();
}
@@ -2368,27 +2181,6 @@ void WiredTigerKVEngine::haltOplogManager() {
}
}
-void WiredTigerKVEngine::triggerJournalFlush() const {
- if (_journalFlusher) {
- _journalFlusher->triggerJournalFlush();
- }
-}
-
-void WiredTigerKVEngine::waitForJournalFlush(OperationContext* opCtx) const {
- if (_journalFlusher) {
- _journalFlusher->waitForJournalFlush();
- } else {
- opCtx->recoveryUnit()->waitUntilDurable(opCtx);
- }
-}
-
-
-void WiredTigerKVEngine::interruptJournalFlusherForReplStateChange() const {
- if (_journalFlusher) {
- _journalFlusher->interruptJournalFlusherForReplStateChange();
- }
-}
-
bool WiredTigerKVEngine::isCacheUnderPressure(OperationContext* opCtx) const {
WiredTigerSession* session = WiredTigerRecoveryUnit::get(opCtx)->getSessionNoTxn();
invariant(session);
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
index c3ec2ad6ad8..8f948cbc8b7 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
@@ -253,12 +253,6 @@ public:
bool supportsOplogStones() const final override;
- void triggerJournalFlush() const override;
-
- void waitForJournalFlush(OperationContext* opCtx) const override;
-
- void interruptJournalFlusherForReplStateChange() const override;
-
bool isCacheUnderPressure(OperationContext* opCtx) const override;
bool supportsReadConcernMajority() const final;
@@ -300,7 +294,7 @@ public:
* `waitForAllEarlierOplogWritesToBeVisible`, is advised to first see if the oplog manager is
* running with a call to `isRunning`.
*
- * A caller that simply wants to call `triggerJournalFlush` may do so without concern.
+ * A caller that simply wants to call `triggerOplogVisibilityUpdate` may do so without concern.
*/
WiredTigerOplogManager* getOplogManager() const {
return _oplogManager.get();
@@ -371,7 +365,6 @@ public:
private:
class WiredTigerSessionSweeper;
- class WiredTigerJournalFlusher;
class WiredTigerCheckpointThread;
/**
@@ -461,7 +454,6 @@ private:
const bool _keepDataHistory = true;
std::unique_ptr<WiredTigerSessionSweeper> _sessionSweeper;
- std::unique_ptr<WiredTigerJournalFlusher> _journalFlusher; // Depends on _sizeStorer
std::unique_ptr<WiredTigerCheckpointThread> _checkpointThread;
std::string _rsOptions;
diff --git a/src/mongo/db/write_concern.cpp b/src/mongo/db/write_concern.cpp
index d72aa98df12..d5749ac7e5c 100644
--- a/src/mongo/db/write_concern.cpp
+++ b/src/mongo/db/write_concern.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
#include "mongo/db/stats/timer_stats.h"
+#include "mongo/db/storage/control/storage_control.h"
#include "mongo/db/storage/storage_engine.h"
#include "mongo/db/transaction_validation.h"
#include "mongo/db/write_concern_options.h"
@@ -313,13 +314,13 @@ Status waitForWriteConcern(OperationContext* opCtx,
result->fsyncFiles = 1;
} else {
// We only need to commit the journal if we're durable
- storageEngine->waitForJournalFlush(opCtx);
+ StorageControl::waitForJournalFlush(opCtx);
}
break;
}
case WriteConcernOptions::SyncMode::JOURNAL:
waitForNoOplogHolesIfNeeded(opCtx);
- storageEngine->waitForJournalFlush(opCtx);
+ StorageControl::waitForJournalFlush(opCtx);
break;
}
} catch (const DBException& ex) {
diff --git a/src/mongo/dbtests/SConscript b/src/mongo/dbtests/SConscript
index 8c26c01d965..26111c36ca9 100644
--- a/src/mongo/dbtests/SConscript
+++ b/src/mongo/dbtests/SConscript
@@ -37,6 +37,7 @@ env.Library(
'$BUILD_DIR/mongo/db/op_observer',
'$BUILD_DIR/mongo/db/service_context_d',
'$BUILD_DIR/mongo/db/s/sharding_runtime_d',
+ '$BUILD_DIR/mongo/db/storage/storage_control',
'$BUILD_DIR/mongo/scripting/scripting_common',
'$BUILD_DIR/mongo/unittest/unittest',
'framework_options',
diff --git a/src/mongo/dbtests/framework.cpp b/src/mongo/dbtests/framework.cpp
index d16101b7fc0..4af0488ad20 100644
--- a/src/mongo/dbtests/framework.cpp
+++ b/src/mongo/dbtests/framework.cpp
@@ -49,6 +49,7 @@
#include "mongo/db/s/collection_sharding_state_factory_shard.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/storage/control/storage_control.h"
#include "mongo/db/storage/storage_engine_init.h"
#include "mongo/dbtests/dbtests.h"
#include "mongo/dbtests/framework_options.h"
@@ -105,6 +106,7 @@ int runDbTests(int argc, char** argv) {
globalServiceContext->setPeriodicRunner(std::move(runner));
initializeStorageEngine(globalServiceContext, StorageEngineInitFlags::kNone);
+ StorageControl::startStorageControls(globalServiceContext);
DatabaseHolder::set(globalServiceContext, std::make_unique<DatabaseHolderImpl>());
IndexAccessMethodFactory::set(globalServiceContext,
std::make_unique<IndexAccessMethodFactoryImpl>());
diff --git a/src/mongo/embedded/SConscript b/src/mongo/embedded/SConscript
index 63ccc64fa10..6938957ec7d 100644
--- a/src/mongo/embedded/SConscript
+++ b/src/mongo/embedded/SConscript
@@ -115,6 +115,7 @@ env.Library(
'$BUILD_DIR/mongo/db/service_liaison_mongod',
'$BUILD_DIR/mongo/db/sessions_collection_standalone',
'$BUILD_DIR/mongo/db/storage/wiredtiger/storage_wiredtiger' if wiredtiger else [],
+ '$BUILD_DIR/mongo/db/storage/storage_control',
'$BUILD_DIR/mongo/db/storage/storage_engine_common',
'$BUILD_DIR/mongo/db/storage/storage_engine_lock_file',
'$BUILD_DIR/mongo/db/storage/storage_engine_metadata',
diff --git a/src/mongo/embedded/embedded.cpp b/src/mongo/embedded/embedded.cpp
index cb5921fb0dc..ca065dd9469 100644
--- a/src/mongo/embedded/embedded.cpp
+++ b/src/mongo/embedded/embedded.cpp
@@ -57,6 +57,7 @@
#include "mongo/db/service_liaison_mongod.h"
#include "mongo/db/session_killer.h"
#include "mongo/db/sessions_collection_standalone.h"
+#include "mongo/db/storage/control/storage_control.h"
#include "mongo/db/storage/encryption_hooks.h"
#include "mongo/db/storage/storage_engine_init.h"
#include "mongo/db/ttl.h"
@@ -239,6 +240,7 @@ ServiceContext* initialize(const char* yaml_config) {
setUpCatalog(serviceContext);
initializeStorageEngine(serviceContext, StorageEngineInitFlags::kAllowNoLockFile);
+ StorageControl::startStorageControls(serviceContext);
// Warn if we detect configurations for multiple registered storage engines in the same
// configuration file/environment.