summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/db.cpp16
-rw-r--r--src/mongo/db/logical_session_cache_impl.cpp1
-rw-r--r--src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp52
-rw-r--r--src/mongo/db/periodic_runner_job_abort_expired_transactions.h30
-rw-r--r--src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.cpp53
-rw-r--r--src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.h25
-rw-r--r--src/mongo/db/s/periodic_balancer_config_refresher.cpp15
-rw-r--r--src/mongo/db/s/periodic_balancer_config_refresher.h2
-rw-r--r--src/mongo/db/service_context.h10
-rw-r--r--src/mongo/db/service_context_d_test_fixture.cpp1
-rw-r--r--src/mongo/db/service_liaison_mock.cpp5
-rw-r--r--src/mongo/db/service_liaison_mongod.cpp22
-rw-r--r--src/mongo/db/service_liaison_mongod.h4
-rw-r--r--src/mongo/db/service_liaison_mongos.cpp18
-rw-r--r--src/mongo/db/service_liaison_mongos.h3
-rw-r--r--src/mongo/db/storage/flow_control.cpp3
-rw-r--r--src/mongo/db/storage/flow_control.h2
-rw-r--r--src/mongo/db/storage/kv/storage_engine_impl.cpp8
-rw-r--r--src/mongo/db/storage/kv/storage_engine_impl.h1
-rw-r--r--src/mongo/db/storage/kv/storage_engine_test.cpp4
-rw-r--r--src/mongo/db/storage/mobile/mobile_kv_engine.cpp2
-rw-r--r--src/mongo/db/storage/mobile/mobile_kv_engine.h2
22 files changed, 184 insertions, 95 deletions
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index a35ab54a2f1..bd9e2a9b75f 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -326,7 +326,6 @@ ExitCode _initAndListen(int listenPort) {
// Set up the periodic runner for background job execution. This is required to be running
// before the storage engine is initialized.
auto runner = makePeriodicRunner(serviceContext);
- runner->startup();
serviceContext->setPeriodicRunner(std::move(runner));
FlowControl::set(serviceContext,
std::make_unique<FlowControl>(
@@ -603,8 +602,8 @@ ExitCode _initAndListen(int listenPort) {
// Only do this on storage engines supporting snapshot reads, which hold resources we wish to
// release periodically in order to avoid storage cache pressure build up.
if (storageEngine->supportsReadConcernSnapshot()) {
- startPeriodicThreadToAbortExpiredTransactions(serviceContext);
- startPeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded(serviceContext);
+ PeriodicThreadToAbortExpiredTransactions::get(serviceContext)->start();
+ PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded::get(serviceContext)->start();
}
// Set up the logical session cache
@@ -919,13 +918,12 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) {
flowControlTicketholder->setInShutdown();
}
- // Shut down the background periodic task runner. This must be done before shutting down the
- // storage engine.
- if (auto runner = serviceContext->getPeriodicRunner()) {
- runner->shutdown();
- }
+ if (auto storageEngine = serviceContext->getStorageEngine()) {
+ if (storageEngine->supportsReadConcernSnapshot()) {
+ PeriodicThreadToAbortExpiredTransactions::get(serviceContext)->stop();
+ PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded::get(serviceContext)->stop();
+ }
- if (serviceContext->getStorageEngine()) {
ServiceContext::UniqueOperationContext uniqueOpCtx;
OperationContext* opCtx = client->getOperationContext();
if (!opCtx) {
diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp
index 2617d9060c4..8afd9f6889f 100644
--- a/src/mongo/db/logical_session_cache_impl.cpp
+++ b/src/mongo/db/logical_session_cache_impl.cpp
@@ -41,7 +41,6 @@
#include "mongo/platform/atomic_word.h"
#include "mongo/util/duration.h"
#include "mongo/util/log.h"
-#include "mongo/util/periodic_runner.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
diff --git a/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp b/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp
index f3fa239daac..0f55d053fb3 100644
--- a/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp
+++ b/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp
@@ -58,13 +58,32 @@ Milliseconds getPeriod(const Argument& transactionLifetimeLimitSeconds) {
return period;
}
+
} // namespace
-void startPeriodicThreadToAbortExpiredTransactions(ServiceContext* serviceContext) {
- // Enforce calling this function once, and only once.
- static bool firstCall = true;
- invariant(firstCall);
- firstCall = false;
+auto PeriodicThreadToAbortExpiredTransactions::get(ServiceContext* serviceContext)
+ -> PeriodicThreadToAbortExpiredTransactions& {
+ auto& jobContainer = _serviceDecoration(serviceContext);
+ jobContainer._init(serviceContext);
+
+ return jobContainer;
+}
+
+auto PeriodicThreadToAbortExpiredTransactions::operator*() const noexcept -> PeriodicJobAnchor& {
+ stdx::lock_guard lk(_mutex);
+ return *_anchor;
+}
+
+auto PeriodicThreadToAbortExpiredTransactions::operator-> () const noexcept -> PeriodicJobAnchor* {
+ stdx::lock_guard lk(_mutex);
+ return _anchor.get();
+}
+
+void PeriodicThreadToAbortExpiredTransactions::_init(ServiceContext* serviceContext) {
+ stdx::lock_guard lk(_mutex);
+ if (_anchor) {
+ return;
+ }
auto periodicRunner = serviceContext->getPeriodicRunner();
invariant(periodicRunner);
@@ -87,18 +106,17 @@ void startPeriodicThreadToAbortExpiredTransactions(ServiceContext* serviceContex
},
getPeriod(gTransactionLifetimeLimitSeconds.load()));
- auto handle = periodicRunner->makeJob(std::move(job));
- handle->start();
-
- TransactionParticipant::observeTransactionLifetimeLimitSeconds
- .addObserver([handle = std::move(handle)](const Argument& secs) {
- try {
- handle->setPeriod(getPeriod(secs));
- } catch (const DBException& ex) {
- log() << "Failed to update period of thread which aborts expired transactions "
- << ex.toStatus();
- }
- });
+ _anchor = std::make_shared<PeriodicJobAnchor>(periodicRunner->makeJob(std::move(job)));
+
+ TransactionParticipant::observeTransactionLifetimeLimitSeconds.addObserver([anchor = _anchor](
+ const Argument& secs) {
+ try {
+ anchor->setPeriod(getPeriod(secs));
+ } catch (const DBException& ex) {
+ log() << "Failed to update period of thread which aborts expired transactions "
+ << ex.toStatus();
+ }
+ });
}
} // namespace mongo
diff --git a/src/mongo/db/periodic_runner_job_abort_expired_transactions.h b/src/mongo/db/periodic_runner_job_abort_expired_transactions.h
index 6f07e37c247..88bf08d7ee5 100644
--- a/src/mongo/db/periodic_runner_job_abort_expired_transactions.h
+++ b/src/mongo/db/periodic_runner_job_abort_expired_transactions.h
@@ -29,18 +29,34 @@
#pragma once
-namespace mongo {
+#include <memory>
+
+#include "mongo/db/service_context.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/util/periodic_runner.h"
-class ServiceContext;
+namespace mongo {
/**
- * Defines and starts a periodic background job to check for and abort expired transactions.
+ * Defines a periodic background job to check for and abort expired transactions.
* The job will run every (transactionLifetimeLimitSeconds/2) seconds, or at most once per second
* and at least once per minute.
- *
- * This function should only ever be called once, during mongod server startup (db.cpp).
- * The PeriodicRunner will handle shutting down the job on shutdown, no extra handling necessary.
*/
-void startPeriodicThreadToAbortExpiredTransactions(ServiceContext* serviceContext);
+class PeriodicThreadToAbortExpiredTransactions {
+public:
+ static PeriodicThreadToAbortExpiredTransactions& get(ServiceContext* serviceContext);
+
+ PeriodicJobAnchor& operator*() const noexcept;
+ PeriodicJobAnchor* operator->() const noexcept;
+
+private:
+ void _init(ServiceContext* serviceContext);
+
+ inline static const auto _serviceDecoration =
+ ServiceContext::declareDecoration<PeriodicThreadToAbortExpiredTransactions>();
+
+ mutable stdx::mutex _mutex;
+ std::shared_ptr<PeriodicJobAnchor> _anchor;
+};
} // namespace mongo
diff --git a/src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.cpp b/src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.cpp
index 8e4f6594bab..c27418d948f 100644
--- a/src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.cpp
+++ b/src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.cpp
@@ -43,11 +43,30 @@
namespace mongo {
-void startPeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded(ServiceContext* serviceContext) {
- // Enforce calling this function once, and only once.
- static bool firstCall = true;
- invariant(firstCall);
- firstCall = false;
+auto PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded::get(ServiceContext* serviceContext)
+ -> PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded& {
+ auto& jobContainer = _serviceDecoration(serviceContext);
+ jobContainer._init(serviceContext);
+ return jobContainer;
+}
+
+auto PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded::operator-> () const noexcept
+ -> PeriodicJobAnchor* {
+ stdx::lock_guard lk(_mutex);
+ return _anchor.get();
+}
+
+auto PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded::operator*() const noexcept
+ -> PeriodicJobAnchor& {
+ stdx::lock_guard lk(_mutex);
+ return *_anchor;
+}
+
+void PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded::_init(ServiceContext* serviceContext) {
+ stdx::lock_guard lk(_mutex);
+ if (_anchor) {
+ return;
+ }
auto periodicRunner = serviceContext->getPeriodicRunner();
invariant(periodicRunner);
@@ -71,19 +90,19 @@ void startPeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded(ServiceContext* ser
},
Seconds(snapshotWindowParams.decreaseHistoryIfNotNeededPeriodSeconds.load()));
- auto handle = periodicRunner->makeJob(std::move(job));
- handle->start();
+ _anchor = std::make_shared<PeriodicJobAnchor>(periodicRunner->makeJob(std::move(job)));
- SnapshotWindowParams::observeDecreaseHistoryIfNotNeededPeriodSeconds
- .addObserver([handle = std::move(handle)](const auto& secs) {
- try {
- handle->setPeriod(Seconds(secs));
- } catch (const DBException& ex) {
- log() << "Failed to update the period of the thread which decreases data history "
- "target window size if there have been no new SnapshotTooOld errors."
- << ex.toStatus();
- }
- });
+ SnapshotWindowParams::observeDecreaseHistoryIfNotNeededPeriodSeconds.addObserver([anchor =
+ _anchor](
+ const auto& secs) {
+ try {
+ anchor->setPeriod(Seconds(secs));
+ } catch (const DBException& ex) {
+ log() << "Failed to update the period of the thread which decreases data history "
+ "target window size if there have been no new SnapshotTooOld errors."
+ << ex.toStatus();
+ }
+ });
}
} // namespace mongo
diff --git a/src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.h b/src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.h
index 1b6c2e65b18..f6e23949a24 100644
--- a/src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.h
+++ b/src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.h
@@ -29,9 +29,13 @@
#pragma once
-namespace mongo {
+#include <memory>
+
+#include "mongo/db/service_context.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/util/periodic_runner.h"
-class ServiceContext;
+namespace mongo {
/**
* Periodically checks whether there has been any storage engine cache pressure and SnapshotTooOld
@@ -43,6 +47,21 @@ class ServiceContext;
* This function should only ever be called once, during mongod server startup (db.cpp).
* The PeriodicRunner will handle shutting down the job on shutdown, no extra handling necessary.
*/
-void startPeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded(ServiceContext* serviceContext);
+class PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded {
+public:
+ static PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded& get(ServiceContext* serviceContext);
+
+ PeriodicJobAnchor* operator->() const noexcept;
+ PeriodicJobAnchor& operator*() const noexcept;
+
+private:
+ void _init(ServiceContext* serviceContext);
+
+ inline static const auto _serviceDecoration =
+ ServiceContext::declareDecoration<PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded>();
+
+ mutable stdx::mutex _mutex;
+ std::shared_ptr<PeriodicJobAnchor> _anchor;
+};
} // namespace mongo
diff --git a/src/mongo/db/s/periodic_balancer_config_refresher.cpp b/src/mongo/db/s/periodic_balancer_config_refresher.cpp
index 87b8ad55a4d..f9cab569d89 100644
--- a/src/mongo/db/s/periodic_balancer_config_refresher.cpp
+++ b/src/mongo/db/s/periodic_balancer_config_refresher.cpp
@@ -46,8 +46,7 @@ namespace {
const auto getPeriodicBalancerConfigRefresher =
ServiceContext::declareDecoration<PeriodicBalancerConfigRefresher>();
-std::unique_ptr<PeriodicRunner::PeriodicJobHandle> launchBalancerConfigRefresher(
- ServiceContext* serviceContext) {
+PeriodicJobAnchor launchBalancerConfigRefresher(ServiceContext* serviceContext) {
auto periodicRunner = serviceContext->getPeriodicRunner();
invariant(periodicRunner);
@@ -66,7 +65,7 @@ std::unique_ptr<PeriodicRunner::PeriodicJobHandle> launchBalancerConfigRefresher
},
Seconds(30));
auto balancerConfigRefresher = periodicRunner->makeJob(std::move(job));
- balancerConfigRefresher->start();
+ balancerConfigRefresher.start();
return balancerConfigRefresher;
}
@@ -86,7 +85,7 @@ void PeriodicBalancerConfigRefresher::onShardingInitialization(ServiceContext* s
_isPrimary = isPrimary;
// This function is called on sharding state initialization, so go ahead
// and start up the balancer config refresher task if we're a primary.
- if (isPrimary && !_balancerConfigRefresher) {
+ if (isPrimary && !_balancerConfigRefresher.isValid()) {
_balancerConfigRefresher = launchBalancerConfigRefresher(serviceContext);
}
}
@@ -95,12 +94,12 @@ void PeriodicBalancerConfigRefresher::onStepUp(ServiceContext* serviceContext) {
_isPrimary = true;
// If this is the first time we're stepping up, start a thread to periodically refresh the
// balancer configuration.
- if (!_balancerConfigRefresher) {
+ if (!_balancerConfigRefresher.isValid()) {
_balancerConfigRefresher = launchBalancerConfigRefresher(serviceContext);
} else {
// If we're stepping up again after having stepped down, just resume
// the existing task.
- _balancerConfigRefresher->resume();
+ _balancerConfigRefresher.resume();
}
}
}
@@ -108,9 +107,9 @@ void PeriodicBalancerConfigRefresher::onStepUp(ServiceContext* serviceContext) {
void PeriodicBalancerConfigRefresher::onStepDown() {
if (_isPrimary) {
_isPrimary = false;
- invariant(_balancerConfigRefresher);
+ invariant(_balancerConfigRefresher.isValid());
// We don't need to be refreshing the balancer configuration unless we're primary.
- _balancerConfigRefresher->pause();
+ _balancerConfigRefresher.pause();
}
}
diff --git a/src/mongo/db/s/periodic_balancer_config_refresher.h b/src/mongo/db/s/periodic_balancer_config_refresher.h
index 96a36202025..a331f9cfab5 100644
--- a/src/mongo/db/s/periodic_balancer_config_refresher.h
+++ b/src/mongo/db/s/periodic_balancer_config_refresher.h
@@ -80,6 +80,6 @@ private:
bool _isPrimary{false};
// Periodic job for refreshing the balancer configuration
- std::unique_ptr<PeriodicRunner::PeriodicJobHandle> _balancerConfigRefresher;
+ PeriodicJobAnchor _balancerConfigRefresher;
};
} // namespace mongo
diff --git a/src/mongo/db/service_context.h b/src/mongo/db/service_context.h
index d122a32e5e3..0aa04389245 100644
--- a/src/mongo/db/service_context.h
+++ b/src/mongo/db/service_context.h
@@ -533,11 +533,6 @@ private:
stdx::mutex _mutex;
/**
- * The storage engine, if any.
- */
- std::unique_ptr<StorageEngine> _storageEngine;
-
- /**
* The periodic runner.
*/
std::unique_ptr<PeriodicRunner> _runner;
@@ -558,6 +553,11 @@ private:
std::unique_ptr<transport::ServiceExecutor> _serviceExecutor;
/**
+ * The storage engine, if any.
+ */
+ std::unique_ptr<StorageEngine> _storageEngine;
+
+ /**
* Vector of registered observers.
*/
std::vector<ClientObserverHolder> _clientObservers;
diff --git a/src/mongo/db/service_context_d_test_fixture.cpp b/src/mongo/db/service_context_d_test_fixture.cpp
index 873e558082e..992e8a86967 100644
--- a/src/mongo/db/service_context_d_test_fixture.cpp
+++ b/src/mongo/db/service_context_d_test_fixture.cpp
@@ -73,7 +73,6 @@ ServiceContextMongoDTest::ServiceContextMongoDTest(std::string engine, RepairAct
// Set up the periodic runner to allow background job execution for tests that require it.
auto runner = makePeriodicRunner(getServiceContext());
- runner->startup();
getServiceContext()->setPeriodicRunner(std::move(runner));
storageGlobalParams.dbpath = _tempDir.path();
diff --git a/src/mongo/db/service_liaison_mock.cpp b/src/mongo/db/service_liaison_mock.cpp
index fd95a7ebc6d..ab4397f1980 100644
--- a/src/mongo/db/service_liaison_mock.cpp
+++ b/src/mongo/db/service_liaison_mock.cpp
@@ -40,7 +40,6 @@ namespace mongo {
MockServiceLiaisonImpl::MockServiceLiaisonImpl() {
_timerFactory = std::make_unique<executor::AsyncTimerFactoryMock>();
_runner = makePeriodicRunner(getGlobalServiceContext());
- _runner->startup();
}
LogicalSessionIdSet MockServiceLiaisonImpl::getActiveOpSessions() const {
@@ -53,9 +52,7 @@ LogicalSessionIdSet MockServiceLiaisonImpl::getOpenCursorSessions(OperationConte
return _cursorSessions;
}
-void MockServiceLiaisonImpl::join() {
- _runner->shutdown();
-}
+void MockServiceLiaisonImpl::join() {}
Date_t MockServiceLiaisonImpl::now() const {
return _timerFactory->now();
diff --git a/src/mongo/db/service_liaison_mongod.cpp b/src/mongo/db/service_liaison_mongod.cpp
index 1008267bf19..94e1fbd9217 100644
--- a/src/mongo/db/service_liaison_mongod.cpp
+++ b/src/mongo/db/service_liaison_mongod.cpp
@@ -27,6 +27,8 @@
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl
+
#include "mongo/platform/basic.h"
#include "mongo/db/service_liaison_mongod.h"
@@ -37,6 +39,7 @@
#include "mongo/db/service_context.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/clock_source.h"
+#include "mongo/util/log.h"
namespace mongo {
@@ -73,17 +76,20 @@ LogicalSessionIdSet ServiceLiaisonMongod::getOpenCursorSessions(OperationContext
void ServiceLiaisonMongod::scheduleJob(PeriodicRunner::PeriodicJob job) {
invariant(hasGlobalServiceContext());
- auto jobHandle = getGlobalServiceContext()->getPeriodicRunner()->makeJob(std::move(job));
- jobHandle->start();
- _jobs.push_back(std::move(jobHandle));
+ auto jobAnchor = getGlobalServiceContext()->getPeriodicRunner()->makeJob(std::move(job));
+ jobAnchor.start();
+
+ {
+ stdx::lock_guard lk(_mutex);
+ _jobs.push_back(std::move(jobAnchor));
+ }
}
void ServiceLiaisonMongod::join() {
- invariant(hasGlobalServiceContext());
- for (auto&& jobHandle : _jobs) {
- jobHandle->stop();
- }
- _jobs.clear();
+ auto jobs = [&] {
+ stdx::lock_guard lk(_mutex);
+ return std::exchange(_jobs, {});
+ }();
}
Date_t ServiceLiaisonMongod::now() const {
diff --git a/src/mongo/db/service_liaison_mongod.h b/src/mongo/db/service_liaison_mongod.h
index c334eb5f36c..b1060425f6f 100644
--- a/src/mongo/db/service_liaison_mongod.h
+++ b/src/mongo/db/service_liaison_mongod.h
@@ -68,7 +68,9 @@ protected:
* Returns the service context.
*/
ServiceContext* _context() override;
- std::vector<std::unique_ptr<PeriodicRunner::PeriodicJobHandle>> _jobs;
+
+ stdx::mutex _mutex;
+ std::vector<PeriodicJobAnchor> _jobs;
};
} // namespace mongo
diff --git a/src/mongo/db/service_liaison_mongos.cpp b/src/mongo/db/service_liaison_mongos.cpp
index 7da5e508109..666ca06ea68 100644
--- a/src/mongo/db/service_liaison_mongos.cpp
+++ b/src/mongo/db/service_liaison_mongos.cpp
@@ -27,6 +27,8 @@
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl
+
#include "mongo/platform/basic.h"
#include "mongo/db/service_liaison_mongos.h"
@@ -36,7 +38,7 @@
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/clock_source.h"
-#include "mongo/util/periodic_runner.h"
+#include "mongo/util/log.h"
namespace mongo {
@@ -79,12 +81,20 @@ LogicalSessionIdSet ServiceLiaisonMongos::getOpenCursorSessions(OperationContext
void ServiceLiaisonMongos::scheduleJob(PeriodicRunner::PeriodicJob job) {
invariant(hasGlobalServiceContext());
- getGlobalServiceContext()->getPeriodicRunner()->scheduleJob(std::move(job));
+ auto jobAnchor = getGlobalServiceContext()->getPeriodicRunner()->makeJob(std::move(job));
+ jobAnchor.start();
+
+ {
+ stdx::lock_guard lk(_mutex);
+ _jobs.push_back(std::move(jobAnchor));
+ }
}
void ServiceLiaisonMongos::join() {
- invariant(hasGlobalServiceContext());
- getGlobalServiceContext()->getPeriodicRunner()->shutdown();
+ auto jobs = [&] {
+ stdx::lock_guard lk(_mutex);
+ return std::exchange(_jobs, {});
+ }();
}
Date_t ServiceLiaisonMongos::now() const {
diff --git a/src/mongo/db/service_liaison_mongos.h b/src/mongo/db/service_liaison_mongos.h
index 580dc3cf00e..ab40801557d 100644
--- a/src/mongo/db/service_liaison_mongos.h
+++ b/src/mongo/db/service_liaison_mongos.h
@@ -68,6 +68,9 @@ protected:
* Returns the service context.
*/
ServiceContext* _context() override;
+
+ stdx::mutex _mutex;
+ std::vector<PeriodicJobAnchor> _jobs;
};
} // namespace mongo
diff --git a/src/mongo/db/storage/flow_control.cpp b/src/mongo/db/storage/flow_control.cpp
index 3f1d2e0fec3..df076e2d82c 100644
--- a/src/mongo/db/storage/flow_control.cpp
+++ b/src/mongo/db/storage/flow_control.cpp
@@ -134,12 +134,13 @@ FlowControl::FlowControl(ServiceContext* service, repl::ReplicationCoordinator*
// cause a slow start on start up.
FlowControlTicketholder::set(service, std::make_unique<FlowControlTicketholder>(_kMaxTickets));
- service->getPeriodicRunner()->scheduleJob(
+ _jobAnchor = service->getPeriodicRunner()->makeJob(
{"FlowControlRefresher",
[this](Client* client) {
FlowControlTicketholder::get(client->getServiceContext())->refreshTo(getNumTickets());
},
Seconds(1)});
+ _jobAnchor.start();
}
FlowControl* FlowControl::get(ServiceContext* service) {
diff --git a/src/mongo/db/storage/flow_control.h b/src/mongo/db/storage/flow_control.h
index e3905a3728b..64f0d0b1d00 100644
--- a/src/mongo/db/storage/flow_control.h
+++ b/src/mongo/db/storage/flow_control.h
@@ -141,6 +141,8 @@ private:
// This value is used for calculating server status metrics.
std::uint64_t _startWaitTime = 0;
+
+ PeriodicJobAnchor _jobAnchor;
};
} // namespace mongo
diff --git a/src/mongo/db/storage/kv/storage_engine_impl.cpp b/src/mongo/db/storage/kv/storage_engine_impl.cpp
index 3aa39b1c123..f2734ced75f 100644
--- a/src/mongo/db/storage/kv/storage_engine_impl.cpp
+++ b/src/mongo/db/storage/kv/storage_engine_impl.cpp
@@ -877,7 +877,7 @@ void StorageEngineImpl::TimestampMonitor::startup() {
// Take a global lock in MODE_IS while fetching timestamps to guarantee that
// rollback-to-stable isn't running concurrently.
- {
+ try {
auto opCtx = client->getOperationContext();
mongo::ServiceContext::UniqueOperationContext uOpCtx;
if (!opCtx) {
@@ -894,6 +894,9 @@ void StorageEngineImpl::TimestampMonitor::startup() {
checkpoint = _engine->getCheckpointTimestamp();
oldest = _engine->getOldestTimestamp();
stable = _engine->getStableTimestamp();
+ } catch (const ExceptionFor<ErrorCodes::InterruptedAtShutdown>&) {
+ // If we're interrupted at shutdown, it's fine to give up on future notifications
+ return;
}
Timestamp minOfCheckpointAndOldest =
@@ -922,7 +925,8 @@ void StorageEngineImpl::TimestampMonitor::startup() {
},
Seconds(1));
- _periodicRunner->scheduleJob(std::move(job));
+ _job = _periodicRunner->makeJob(std::move(job));
+ _job.start();
_running = true;
}
diff --git a/src/mongo/db/storage/kv/storage_engine_impl.h b/src/mongo/db/storage/kv/storage_engine_impl.h
index 4d060cdcd4b..6c6dd3fdf2c 100644
--- a/src/mongo/db/storage/kv/storage_engine_impl.h
+++ b/src/mongo/db/storage/kv/storage_engine_impl.h
@@ -280,6 +280,7 @@ public:
KVEngine* _engine;
bool _running;
+ PeriodicJobAnchor _job;
// The set of timestamps that were last reported to the listeners by the monitor.
MonitoredTimestamps _currentTimestamps;
diff --git a/src/mongo/db/storage/kv/storage_engine_test.cpp b/src/mongo/db/storage/kv/storage_engine_test.cpp
index d424f64f3c1..40eddbfcef3 100644
--- a/src/mongo/db/storage/kv/storage_engine_test.cpp
+++ b/src/mongo/db/storage/kv/storage_engine_test.cpp
@@ -391,10 +391,6 @@ public:
}
~TimestampKVEngineTest() {
- // Shut down the background periodic task runner, before the storage engine.
- auto runner = getServiceContext()->getPeriodicRunner();
- runner->shutdown();
-
_storageEngine->cleanShutdown();
_storageEngine.reset();
}
diff --git a/src/mongo/db/storage/mobile/mobile_kv_engine.cpp b/src/mongo/db/storage/mobile/mobile_kv_engine.cpp
index 4c8b2401683..c6ff81b12f7 100644
--- a/src/mongo/db/storage/mobile/mobile_kv_engine.cpp
+++ b/src/mongo/db/storage/mobile/mobile_kv_engine.cpp
@@ -100,7 +100,7 @@ MobileKVEngine::MobileKVEngine(const std::string& path,
maybeVacuum(client, Date_t::max());
},
Minutes(options.vacuumCheckIntervalMinutes)));
- _vacuumJob->start();
+ _vacuumJob.start();
}
}
diff --git a/src/mongo/db/storage/mobile/mobile_kv_engine.h b/src/mongo/db/storage/mobile/mobile_kv_engine.h
index 10da2d2e1a0..094bf0674d9 100644
--- a/src/mongo/db/storage/mobile/mobile_kv_engine.h
+++ b/src/mongo/db/storage/mobile/mobile_kv_engine.h
@@ -154,7 +154,7 @@ private:
std::string _path;
embedded::MobileOptions _options;
- std::unique_ptr<PeriodicRunner::PeriodicJobHandle> _vacuumJob;
+ PeriodicJobAnchor _vacuumJob;
};
} // namespace mongo