summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Caimano <ben.caimano@10gen.com>2019-04-26 15:03:46 -0400
committerBen Caimano <ben.caimano@10gen.com>2019-07-25 11:22:10 -0400
commita48fd4e357551bb332003a4a31190724eff29959 (patch)
tree0e55bf6dcf654d7483c1328a65ebd5c629a3c491
parentd5ca9fc4512841881dd24dc111ebf5342c1b4713 (diff)
downloadmongo-a48fd4e357551bb332003a4a31190724eff29959.tar.gz
SERVER-39936 Use PeriodicRunner handles to simplify shutdown ordering
-rw-r--r--src/mongo/db/db.cpp18
-rw-r--r--src/mongo/db/logical_session_cache_impl.cpp1
-rw-r--r--src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp52
-rw-r--r--src/mongo/db/periodic_runner_job_abort_expired_transactions.h30
-rw-r--r--src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.cpp52
-rw-r--r--src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.h25
-rw-r--r--src/mongo/db/s/periodic_balancer_config_refresher.cpp15
-rw-r--r--src/mongo/db/s/periodic_balancer_config_refresher.h2
-rw-r--r--src/mongo/db/service_context.h10
-rw-r--r--src/mongo/db/service_context_d_test_fixture.cpp1
-rw-r--r--src/mongo/db/service_liaison_mock.cpp5
-rw-r--r--src/mongo/db/service_liaison_mongod.cpp22
-rw-r--r--src/mongo/db/service_liaison_mongod.h4
-rw-r--r--src/mongo/db/service_liaison_mongos.cpp18
-rw-r--r--src/mongo/db/service_liaison_mongos.h3
-rw-r--r--src/mongo/db/storage/flow_control.cpp3
-rw-r--r--src/mongo/db/storage/flow_control.h2
-rw-r--r--src/mongo/db/storage/kv/storage_engine_test.cpp4
-rw-r--r--src/mongo/db/storage/mobile/mobile_kv_engine.cpp2
-rw-r--r--src/mongo/db/storage/mobile/mobile_kv_engine.h2
-rw-r--r--src/mongo/db/storage/storage_engine_impl.cpp8
-rw-r--r--src/mongo/db/storage/storage_engine_impl.h1
-rw-r--r--src/mongo/dbtests/framework.cpp6
-rw-r--r--src/mongo/embedded/embedded.cpp6
-rw-r--r--src/mongo/embedded/periodic_runner_embedded.cpp99
-rw-r--r--src/mongo/embedded/periodic_runner_embedded.h41
-rw-r--r--src/mongo/s/server.cpp7
-rw-r--r--src/mongo/util/SConscript1
-rw-r--r--src/mongo/util/mock_periodic_runner_impl.h24
-rw-r--r--src/mongo/util/periodic_runner.cpp50
-rw-r--r--src/mongo/util/periodic_runner.h84
-rw-r--r--src/mongo/util/periodic_runner_impl.cpp152
-rw-r--r--src/mongo/util/periodic_runner_impl.h45
-rw-r--r--src/mongo/util/periodic_runner_impl_test.cpp87
34 files changed, 406 insertions, 476 deletions
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index 755bd5997d7..55c15c26593 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -329,10 +329,9 @@ ExitCode _initAndListen(int listenPort) {
// Set up the periodic runner for background job execution. This is required to be running
// before the storage engine is initialized.
auto runner = makePeriodicRunner(serviceContext);
- runner->startup();
serviceContext->setPeriodicRunner(std::move(runner));
FlowControl::set(serviceContext,
- stdx::make_unique<FlowControl>(
+ std::make_unique<FlowControl>(
serviceContext, repl::ReplicationCoordinator::get(serviceContext)));
initializeStorageEngine(serviceContext, StorageEngineInitFlags::kNone);
@@ -617,11 +616,11 @@ ExitCode _initAndListen(int listenPort) {
// Only do this on storage engines supporting snapshot reads, which hold resources we wish to
// release periodically in order to avoid storage cache pressure build up.
if (storageEngine->supportsReadConcernSnapshot()) {
- startPeriodicThreadToAbortExpiredTransactions(serviceContext);
+ PeriodicThreadToAbortExpiredTransactions::get(serviceContext)->start();
// The inMemory engine is not yet used for replica or sharded transactions in production so
// it does not currently maintain snapshot history. It is live in testing, however.
if (!storageEngine->isEphemeral() || getTestCommandsEnabled()) {
- startPeriodicThreadToDecreaseSnapshotHistoryCachePressure(serviceContext);
+ PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded::get(serviceContext)->start();
}
}
@@ -939,13 +938,12 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) {
flowControlTicketholder->setInShutdown();
}
- // Shut down the background periodic task runner. This must be done before shutting down the
- // storage engine.
- if (auto runner = serviceContext->getPeriodicRunner()) {
- runner->shutdown();
- }
+ if (auto storageEngine = serviceContext->getStorageEngine()) {
+ if (storageEngine->supportsReadConcernSnapshot()) {
+ PeriodicThreadToAbortExpiredTransactions::get(serviceContext)->stop();
+ PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded::get(serviceContext)->stop();
+ }
- if (serviceContext->getStorageEngine()) {
ServiceContext::UniqueOperationContext uniqueOpCtx;
OperationContext* opCtx = client->getOperationContext();
if (!opCtx) {
diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp
index 2617d9060c4..8afd9f6889f 100644
--- a/src/mongo/db/logical_session_cache_impl.cpp
+++ b/src/mongo/db/logical_session_cache_impl.cpp
@@ -41,7 +41,6 @@
#include "mongo/platform/atomic_word.h"
#include "mongo/util/duration.h"
#include "mongo/util/log.h"
-#include "mongo/util/periodic_runner.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
diff --git a/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp b/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp
index f3fa239daac..0f55d053fb3 100644
--- a/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp
+++ b/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp
@@ -58,13 +58,32 @@ Milliseconds getPeriod(const Argument& transactionLifetimeLimitSeconds) {
return period;
}
+
} // namespace
-void startPeriodicThreadToAbortExpiredTransactions(ServiceContext* serviceContext) {
- // Enforce calling this function once, and only once.
- static bool firstCall = true;
- invariant(firstCall);
- firstCall = false;
+auto PeriodicThreadToAbortExpiredTransactions::get(ServiceContext* serviceContext)
+ -> PeriodicThreadToAbortExpiredTransactions& {
+ auto& jobContainer = _serviceDecoration(serviceContext);
+ jobContainer._init(serviceContext);
+
+ return jobContainer;
+}
+
+auto PeriodicThreadToAbortExpiredTransactions::operator*() const noexcept -> PeriodicJobAnchor& {
+ stdx::lock_guard lk(_mutex);
+ return *_anchor;
+}
+
+auto PeriodicThreadToAbortExpiredTransactions::operator-> () const noexcept -> PeriodicJobAnchor* {
+ stdx::lock_guard lk(_mutex);
+ return _anchor.get();
+}
+
+void PeriodicThreadToAbortExpiredTransactions::_init(ServiceContext* serviceContext) {
+ stdx::lock_guard lk(_mutex);
+ if (_anchor) {
+ return;
+ }
auto periodicRunner = serviceContext->getPeriodicRunner();
invariant(periodicRunner);
@@ -87,18 +106,17 @@ void startPeriodicThreadToAbortExpiredTransactions(ServiceContext* serviceContex
},
getPeriod(gTransactionLifetimeLimitSeconds.load()));
- auto handle = periodicRunner->makeJob(std::move(job));
- handle->start();
-
- TransactionParticipant::observeTransactionLifetimeLimitSeconds
- .addObserver([handle = std::move(handle)](const Argument& secs) {
- try {
- handle->setPeriod(getPeriod(secs));
- } catch (const DBException& ex) {
- log() << "Failed to update period of thread which aborts expired transactions "
- << ex.toStatus();
- }
- });
+ _anchor = std::make_shared<PeriodicJobAnchor>(periodicRunner->makeJob(std::move(job)));
+
+ TransactionParticipant::observeTransactionLifetimeLimitSeconds.addObserver([anchor = _anchor](
+ const Argument& secs) {
+ try {
+ anchor->setPeriod(getPeriod(secs));
+ } catch (const DBException& ex) {
+ log() << "Failed to update period of thread which aborts expired transactions "
+ << ex.toStatus();
+ }
+ });
}
} // namespace mongo
diff --git a/src/mongo/db/periodic_runner_job_abort_expired_transactions.h b/src/mongo/db/periodic_runner_job_abort_expired_transactions.h
index 6f07e37c247..88bf08d7ee5 100644
--- a/src/mongo/db/periodic_runner_job_abort_expired_transactions.h
+++ b/src/mongo/db/periodic_runner_job_abort_expired_transactions.h
@@ -29,18 +29,34 @@
#pragma once
-namespace mongo {
+#include <memory>
+
+#include "mongo/db/service_context.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/util/periodic_runner.h"
-class ServiceContext;
+namespace mongo {
/**
- * Defines and starts a periodic background job to check for and abort expired transactions.
+ * Defines a periodic background job to check for and abort expired transactions.
* The job will run every (transactionLifetimeLimitSeconds/2) seconds, or at most once per second
* and at least once per minute.
- *
- * This function should only ever be called once, during mongod server startup (db.cpp).
- * The PeriodicRunner will handle shutting down the job on shutdown, no extra handling necessary.
*/
-void startPeriodicThreadToAbortExpiredTransactions(ServiceContext* serviceContext);
+class PeriodicThreadToAbortExpiredTransactions {
+public:
+ static PeriodicThreadToAbortExpiredTransactions& get(ServiceContext* serviceContext);
+
+ PeriodicJobAnchor& operator*() const noexcept;
+ PeriodicJobAnchor* operator->() const noexcept;
+
+private:
+ void _init(ServiceContext* serviceContext);
+
+ inline static const auto _serviceDecoration =
+ ServiceContext::declareDecoration<PeriodicThreadToAbortExpiredTransactions>();
+
+ mutable stdx::mutex _mutex;
+ std::shared_ptr<PeriodicJobAnchor> _anchor;
+};
} // namespace mongo
diff --git a/src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.cpp b/src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.cpp
index 3a5a41b5ee3..a550f9a3624 100644
--- a/src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.cpp
+++ b/src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.cpp
@@ -43,11 +43,30 @@
namespace mongo {
-void startPeriodicThreadToDecreaseSnapshotHistoryCachePressure(ServiceContext* serviceContext) {
- // Enforce calling this function once, and only once.
- static bool firstCall = true;
- invariant(firstCall);
- firstCall = false;
+auto PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded::get(ServiceContext* serviceContext)
+ -> PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded& {
+ auto& jobContainer = _serviceDecoration(serviceContext);
+ jobContainer._init(serviceContext);
+ return jobContainer;
+}
+
+auto PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded::operator-> () const noexcept
+ -> PeriodicJobAnchor* {
+ stdx::lock_guard lk(_mutex);
+ return _anchor.get();
+}
+
+auto PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded::operator*() const noexcept
+ -> PeriodicJobAnchor& {
+ stdx::lock_guard lk(_mutex);
+ return *_anchor;
+}
+
+void PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded::_init(ServiceContext* serviceContext) {
+ stdx::lock_guard lk(_mutex);
+ if (_anchor) {
+ return;
+ }
auto periodicRunner = serviceContext->getPeriodicRunner();
invariant(periodicRunner);
@@ -71,19 +90,18 @@ void startPeriodicThreadToDecreaseSnapshotHistoryCachePressure(ServiceContext* s
},
Seconds(snapshotWindowParams.checkCachePressurePeriodSeconds.load()));
- auto handle = periodicRunner->makeJob(std::move(job));
- handle->start();
+ _anchor = std::make_shared<PeriodicJobAnchor>(periodicRunner->makeJob(std::move(job)));
- SnapshotWindowParams::observeCheckCachePressurePeriodSeconds
- .addObserver([handle = std::move(handle)](const auto& secs) {
- try {
- handle->setPeriod(Seconds(secs));
- } catch (const DBException& ex) {
- log() << "Failed to update the period of the thread which decreases data history "
- "target window size if there is cache pressure."
- << ex.toStatus();
- }
- });
+ SnapshotWindowParams::observeCheckCachePressurePeriodSeconds.addObserver([anchor = _anchor](
+ const auto& secs) {
+ try {
+ anchor->setPeriod(Seconds(secs));
+ } catch (const DBException& ex) {
+ log() << "Failed to update the period of the thread which decreases data history "
+ "target window size if there have been no new SnapshotTooOld errors."
+ << ex.toStatus();
+ }
+ });
}
} // namespace mongo
diff --git a/src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.h b/src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.h
index fca8b832fbf..c6a2e830a86 100644
--- a/src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.h
+++ b/src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.h
@@ -29,9 +29,13 @@
#pragma once
-namespace mongo {
+#include <memory>
+
+#include "mongo/db/service_context.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/util/periodic_runner.h"
-class ServiceContext;
+namespace mongo {
/**
* Periodically checks for storage engine cache pressure to determine whether the maintained
@@ -41,6 +45,21 @@ class ServiceContext;
* This function should only ever be called once, during mongod server startup (db.cpp).
* The PeriodicRunner will handle shutting down the job on shutdown, no extra handling necessary.
*/
-void startPeriodicThreadToDecreaseSnapshotHistoryCachePressure(ServiceContext* serviceContext);
+class PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded {
+public:
+ static PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded& get(ServiceContext* serviceContext);
+
+ PeriodicJobAnchor* operator->() const noexcept;
+ PeriodicJobAnchor& operator*() const noexcept;
+
+private:
+ void _init(ServiceContext* serviceContext);
+
+ inline static const auto _serviceDecoration =
+ ServiceContext::declareDecoration<PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded>();
+
+ mutable stdx::mutex _mutex;
+ std::shared_ptr<PeriodicJobAnchor> _anchor;
+};
} // namespace mongo
diff --git a/src/mongo/db/s/periodic_balancer_config_refresher.cpp b/src/mongo/db/s/periodic_balancer_config_refresher.cpp
index 87b8ad55a4d..f9cab569d89 100644
--- a/src/mongo/db/s/periodic_balancer_config_refresher.cpp
+++ b/src/mongo/db/s/periodic_balancer_config_refresher.cpp
@@ -46,8 +46,7 @@ namespace {
const auto getPeriodicBalancerConfigRefresher =
ServiceContext::declareDecoration<PeriodicBalancerConfigRefresher>();
-std::unique_ptr<PeriodicRunner::PeriodicJobHandle> launchBalancerConfigRefresher(
- ServiceContext* serviceContext) {
+PeriodicJobAnchor launchBalancerConfigRefresher(ServiceContext* serviceContext) {
auto periodicRunner = serviceContext->getPeriodicRunner();
invariant(periodicRunner);
@@ -66,7 +65,7 @@ std::unique_ptr<PeriodicRunner::PeriodicJobHandle> launchBalancerConfigRefresher
},
Seconds(30));
auto balancerConfigRefresher = periodicRunner->makeJob(std::move(job));
- balancerConfigRefresher->start();
+ balancerConfigRefresher.start();
return balancerConfigRefresher;
}
@@ -86,7 +85,7 @@ void PeriodicBalancerConfigRefresher::onShardingInitialization(ServiceContext* s
_isPrimary = isPrimary;
// This function is called on sharding state initialization, so go ahead
// and start up the balancer config refresher task if we're a primary.
- if (isPrimary && !_balancerConfigRefresher) {
+ if (isPrimary && !_balancerConfigRefresher.isValid()) {
_balancerConfigRefresher = launchBalancerConfigRefresher(serviceContext);
}
}
@@ -95,12 +94,12 @@ void PeriodicBalancerConfigRefresher::onStepUp(ServiceContext* serviceContext) {
_isPrimary = true;
// If this is the first time we're stepping up, start a thread to periodically refresh the
// balancer configuration.
- if (!_balancerConfigRefresher) {
+ if (!_balancerConfigRefresher.isValid()) {
_balancerConfigRefresher = launchBalancerConfigRefresher(serviceContext);
} else {
// If we're stepping up again after having stepped down, just resume
// the existing task.
- _balancerConfigRefresher->resume();
+ _balancerConfigRefresher.resume();
}
}
}
@@ -108,9 +107,9 @@ void PeriodicBalancerConfigRefresher::onStepUp(ServiceContext* serviceContext) {
void PeriodicBalancerConfigRefresher::onStepDown() {
if (_isPrimary) {
_isPrimary = false;
- invariant(_balancerConfigRefresher);
+ invariant(_balancerConfigRefresher.isValid());
// We don't need to be refreshing the balancer configuration unless we're primary.
- _balancerConfigRefresher->pause();
+ _balancerConfigRefresher.pause();
}
}
diff --git a/src/mongo/db/s/periodic_balancer_config_refresher.h b/src/mongo/db/s/periodic_balancer_config_refresher.h
index 96a36202025..a331f9cfab5 100644
--- a/src/mongo/db/s/periodic_balancer_config_refresher.h
+++ b/src/mongo/db/s/periodic_balancer_config_refresher.h
@@ -80,6 +80,6 @@ private:
bool _isPrimary{false};
// Periodic job for refreshing the balancer configuration
- std::unique_ptr<PeriodicRunner::PeriodicJobHandle> _balancerConfigRefresher;
+ PeriodicJobAnchor _balancerConfigRefresher;
};
} // namespace mongo
diff --git a/src/mongo/db/service_context.h b/src/mongo/db/service_context.h
index 8a9741bcc42..ff2a6342903 100644
--- a/src/mongo/db/service_context.h
+++ b/src/mongo/db/service_context.h
@@ -533,11 +533,6 @@ private:
stdx::mutex _mutex;
/**
- * The storage engine, if any.
- */
- std::unique_ptr<StorageEngine> _storageEngine;
-
- /**
* The periodic runner.
*/
std::unique_ptr<PeriodicRunner> _runner;
@@ -558,6 +553,11 @@ private:
std::unique_ptr<transport::ServiceExecutor> _serviceExecutor;
/**
+ * The storage engine, if any.
+ */
+ std::unique_ptr<StorageEngine> _storageEngine;
+
+ /**
* Vector of registered observers.
*/
std::vector<ClientObserverHolder> _clientObservers;
diff --git a/src/mongo/db/service_context_d_test_fixture.cpp b/src/mongo/db/service_context_d_test_fixture.cpp
index 873e558082e..992e8a86967 100644
--- a/src/mongo/db/service_context_d_test_fixture.cpp
+++ b/src/mongo/db/service_context_d_test_fixture.cpp
@@ -73,7 +73,6 @@ ServiceContextMongoDTest::ServiceContextMongoDTest(std::string engine, RepairAct
// Set up the periodic runner to allow background job execution for tests that require it.
auto runner = makePeriodicRunner(getServiceContext());
- runner->startup();
getServiceContext()->setPeriodicRunner(std::move(runner));
storageGlobalParams.dbpath = _tempDir.path();
diff --git a/src/mongo/db/service_liaison_mock.cpp b/src/mongo/db/service_liaison_mock.cpp
index ef8f5d49e2e..10fd0322d8e 100644
--- a/src/mongo/db/service_liaison_mock.cpp
+++ b/src/mongo/db/service_liaison_mock.cpp
@@ -40,7 +40,6 @@ namespace mongo {
MockServiceLiaisonImpl::MockServiceLiaisonImpl() {
_timerFactory = stdx::make_unique<executor::AsyncTimerFactoryMock>();
_runner = makePeriodicRunner(getGlobalServiceContext());
- _runner->startup();
}
LogicalSessionIdSet MockServiceLiaisonImpl::getActiveOpSessions() const {
@@ -53,9 +52,7 @@ LogicalSessionIdSet MockServiceLiaisonImpl::getOpenCursorSessions(OperationConte
return _cursorSessions;
}
-void MockServiceLiaisonImpl::join() {
- _runner->shutdown();
-}
+void MockServiceLiaisonImpl::join() {}
Date_t MockServiceLiaisonImpl::now() const {
return _timerFactory->now();
diff --git a/src/mongo/db/service_liaison_mongod.cpp b/src/mongo/db/service_liaison_mongod.cpp
index 1008267bf19..94e1fbd9217 100644
--- a/src/mongo/db/service_liaison_mongod.cpp
+++ b/src/mongo/db/service_liaison_mongod.cpp
@@ -27,6 +27,8 @@
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl
+
#include "mongo/platform/basic.h"
#include "mongo/db/service_liaison_mongod.h"
@@ -37,6 +39,7 @@
#include "mongo/db/service_context.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/clock_source.h"
+#include "mongo/util/log.h"
namespace mongo {
@@ -73,17 +76,20 @@ LogicalSessionIdSet ServiceLiaisonMongod::getOpenCursorSessions(OperationContext
void ServiceLiaisonMongod::scheduleJob(PeriodicRunner::PeriodicJob job) {
invariant(hasGlobalServiceContext());
- auto jobHandle = getGlobalServiceContext()->getPeriodicRunner()->makeJob(std::move(job));
- jobHandle->start();
- _jobs.push_back(std::move(jobHandle));
+ auto jobAnchor = getGlobalServiceContext()->getPeriodicRunner()->makeJob(std::move(job));
+ jobAnchor.start();
+
+ {
+ stdx::lock_guard lk(_mutex);
+ _jobs.push_back(std::move(jobAnchor));
+ }
}
void ServiceLiaisonMongod::join() {
- invariant(hasGlobalServiceContext());
- for (auto&& jobHandle : _jobs) {
- jobHandle->stop();
- }
- _jobs.clear();
+ auto jobs = [&] {
+ stdx::lock_guard lk(_mutex);
+ return std::exchange(_jobs, {});
+ }();
}
Date_t ServiceLiaisonMongod::now() const {
diff --git a/src/mongo/db/service_liaison_mongod.h b/src/mongo/db/service_liaison_mongod.h
index c334eb5f36c..b1060425f6f 100644
--- a/src/mongo/db/service_liaison_mongod.h
+++ b/src/mongo/db/service_liaison_mongod.h
@@ -68,7 +68,9 @@ protected:
* Returns the service context.
*/
ServiceContext* _context() override;
- std::vector<std::unique_ptr<PeriodicRunner::PeriodicJobHandle>> _jobs;
+
+ stdx::mutex _mutex;
+ std::vector<PeriodicJobAnchor> _jobs;
};
} // namespace mongo
diff --git a/src/mongo/db/service_liaison_mongos.cpp b/src/mongo/db/service_liaison_mongos.cpp
index 7da5e508109..666ca06ea68 100644
--- a/src/mongo/db/service_liaison_mongos.cpp
+++ b/src/mongo/db/service_liaison_mongos.cpp
@@ -27,6 +27,8 @@
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl
+
#include "mongo/platform/basic.h"
#include "mongo/db/service_liaison_mongos.h"
@@ -36,7 +38,7 @@
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/clock_source.h"
-#include "mongo/util/periodic_runner.h"
+#include "mongo/util/log.h"
namespace mongo {
@@ -79,12 +81,20 @@ LogicalSessionIdSet ServiceLiaisonMongos::getOpenCursorSessions(OperationContext
void ServiceLiaisonMongos::scheduleJob(PeriodicRunner::PeriodicJob job) {
invariant(hasGlobalServiceContext());
- getGlobalServiceContext()->getPeriodicRunner()->scheduleJob(std::move(job));
+ auto jobAnchor = getGlobalServiceContext()->getPeriodicRunner()->makeJob(std::move(job));
+ jobAnchor.start();
+
+ {
+ stdx::lock_guard lk(_mutex);
+ _jobs.push_back(std::move(jobAnchor));
+ }
}
void ServiceLiaisonMongos::join() {
- invariant(hasGlobalServiceContext());
- getGlobalServiceContext()->getPeriodicRunner()->shutdown();
+ auto jobs = [&] {
+ stdx::lock_guard lk(_mutex);
+ return std::exchange(_jobs, {});
+ }();
}
Date_t ServiceLiaisonMongos::now() const {
diff --git a/src/mongo/db/service_liaison_mongos.h b/src/mongo/db/service_liaison_mongos.h
index 580dc3cf00e..ab40801557d 100644
--- a/src/mongo/db/service_liaison_mongos.h
+++ b/src/mongo/db/service_liaison_mongos.h
@@ -68,6 +68,9 @@ protected:
* Returns the service context.
*/
ServiceContext* _context() override;
+
+ stdx::mutex _mutex;
+ std::vector<PeriodicJobAnchor> _jobs;
};
} // namespace mongo
diff --git a/src/mongo/db/storage/flow_control.cpp b/src/mongo/db/storage/flow_control.cpp
index e4ba6ad4b03..edf1d10ee1b 100644
--- a/src/mongo/db/storage/flow_control.cpp
+++ b/src/mongo/db/storage/flow_control.cpp
@@ -134,12 +134,13 @@ FlowControl::FlowControl(ServiceContext* service, repl::ReplicationCoordinator*
// cause a slow start on start up.
FlowControlTicketholder::set(service, stdx::make_unique<FlowControlTicketholder>(_kMaxTickets));
- service->getPeriodicRunner()->scheduleJob(
+ _jobAnchor = service->getPeriodicRunner()->makeJob(
{"FlowControlRefresher",
[this](Client* client) {
FlowControlTicketholder::get(client->getServiceContext())->refreshTo(getNumTickets());
},
Seconds(1)});
+ _jobAnchor.start();
}
FlowControl* FlowControl::get(ServiceContext* service) {
diff --git a/src/mongo/db/storage/flow_control.h b/src/mongo/db/storage/flow_control.h
index e3905a3728b..64f0d0b1d00 100644
--- a/src/mongo/db/storage/flow_control.h
+++ b/src/mongo/db/storage/flow_control.h
@@ -141,6 +141,8 @@ private:
// This value is used for calculating server status metrics.
std::uint64_t _startWaitTime = 0;
+
+ PeriodicJobAnchor _jobAnchor;
};
} // namespace mongo
diff --git a/src/mongo/db/storage/kv/storage_engine_test.cpp b/src/mongo/db/storage/kv/storage_engine_test.cpp
index 6890f89f0a2..25f4fd1d106 100644
--- a/src/mongo/db/storage/kv/storage_engine_test.cpp
+++ b/src/mongo/db/storage/kv/storage_engine_test.cpp
@@ -390,10 +390,6 @@ public:
}
~TimestampKVEngineTest() {
- // Shut down the background periodic task runner, before the storage engine.
- auto runner = getServiceContext()->getPeriodicRunner();
- runner->shutdown();
-
_storageEngine->cleanShutdown();
_storageEngine.reset();
}
diff --git a/src/mongo/db/storage/mobile/mobile_kv_engine.cpp b/src/mongo/db/storage/mobile/mobile_kv_engine.cpp
index 7169ab230d2..89eaad0568b 100644
--- a/src/mongo/db/storage/mobile/mobile_kv_engine.cpp
+++ b/src/mongo/db/storage/mobile/mobile_kv_engine.cpp
@@ -101,7 +101,7 @@ MobileKVEngine::MobileKVEngine(const std::string& path,
maybeVacuum(client, Date_t::max());
},
Minutes(options.vacuumCheckIntervalMinutes)));
- _vacuumJob->start();
+ _vacuumJob.start();
}
}
diff --git a/src/mongo/db/storage/mobile/mobile_kv_engine.h b/src/mongo/db/storage/mobile/mobile_kv_engine.h
index aa084674322..025c96841c7 100644
--- a/src/mongo/db/storage/mobile/mobile_kv_engine.h
+++ b/src/mongo/db/storage/mobile/mobile_kv_engine.h
@@ -156,7 +156,7 @@ private:
std::string _path;
embedded::MobileOptions _options;
- std::unique_ptr<PeriodicRunner::PeriodicJobHandle> _vacuumJob;
+ PeriodicJobAnchor _vacuumJob;
};
} // namespace mongo
diff --git a/src/mongo/db/storage/storage_engine_impl.cpp b/src/mongo/db/storage/storage_engine_impl.cpp
index 812981c7b56..35131d7be1c 100644
--- a/src/mongo/db/storage/storage_engine_impl.cpp
+++ b/src/mongo/db/storage/storage_engine_impl.cpp
@@ -891,7 +891,7 @@ void StorageEngineImpl::TimestampMonitor::startup() {
// Take a global lock in MODE_IS while fetching timestamps to guarantee that
// rollback-to-stable isn't running concurrently.
- {
+ try {
auto opCtx = client->getOperationContext();
mongo::ServiceContext::UniqueOperationContext uOpCtx;
if (!opCtx) {
@@ -908,6 +908,9 @@ void StorageEngineImpl::TimestampMonitor::startup() {
checkpoint = _engine->getCheckpointTimestamp();
oldest = _engine->getOldestTimestamp();
stable = _engine->getStableTimestamp();
+ } catch (const ExceptionFor<ErrorCodes::InterruptedAtShutdown>&) {
+ // If we're interrupted at shutdown, it's fine to give up on future notifications
+ return;
}
Timestamp minOfCheckpointAndOldest =
@@ -936,7 +939,8 @@ void StorageEngineImpl::TimestampMonitor::startup() {
},
Seconds(1));
- _periodicRunner->scheduleJob(std::move(job));
+ _job = _periodicRunner->makeJob(std::move(job));
+ _job.start();
_running = true;
}
diff --git a/src/mongo/db/storage/storage_engine_impl.h b/src/mongo/db/storage/storage_engine_impl.h
index 2236e8444f2..52a137c1225 100644
--- a/src/mongo/db/storage/storage_engine_impl.h
+++ b/src/mongo/db/storage/storage_engine_impl.h
@@ -280,6 +280,7 @@ public:
KVEngine* _engine;
bool _running;
+ PeriodicJobAnchor _job;
// The set of timestamps that were last reported to the listeners by the monitor.
MonitoredTimestamps _currentTimestamps;
diff --git a/src/mongo/dbtests/framework.cpp b/src/mongo/dbtests/framework.cpp
index 69760dd428e..a426df6a2a9 100644
--- a/src/mongo/dbtests/framework.cpp
+++ b/src/mongo/dbtests/framework.cpp
@@ -75,11 +75,6 @@ int runDbTests(int argc, char** argv) {
// the memory and makes leak sanitizer happy.
ScriptEngine::dropScopeCache();
- // Shut down the background periodic task runner, before the storage engine.
- if (auto runner = getGlobalServiceContext()->getPeriodicRunner()) {
- runner->shutdown();
- }
-
// We may be shut down before we have a global storage
// engine.
if (!getGlobalServiceContext()->getStorageEngine())
@@ -103,7 +98,6 @@ int runDbTests(int argc, char** argv) {
// Set up the periodic runner for background job execution, which is required by the storage
// engine to be running beforehand.
auto runner = makePeriodicRunner(globalServiceContext);
- runner->startup();
globalServiceContext->setPeriodicRunner(std::move(runner));
initializeStorageEngine(globalServiceContext, StorageEngineInitFlags::kNone);
diff --git a/src/mongo/embedded/embedded.cpp b/src/mongo/embedded/embedded.cpp
index db941660254..a405cbc2185 100644
--- a/src/mongo/embedded/embedded.cpp
+++ b/src/mongo/embedded/embedded.cpp
@@ -169,11 +169,6 @@ void shutdown(ServiceContext* srvContext) {
LogicalSessionCache::set(serviceContext, nullptr);
- // Shut down the background periodic task runner, before the storage engine.
- if (auto runner = serviceContext->getPeriodicRunner()) {
- runner->shutdown();
- }
-
repl::ReplicationCoordinator::get(serviceContext)->shutdown(shutdownOpCtx.get());
IndexBuildsCoordinator::get(serviceContext)->shutdown();
@@ -236,7 +231,6 @@ ServiceContext* initialize(const char* yaml_config) {
// The periodic runner is required by the storage engine to be running beforehand.
auto periodicRunner = std::make_unique<PeriodicRunnerEmbedded>(
serviceContext, serviceContext->getPreciseClockSource());
- periodicRunner->startup();
serviceContext->setPeriodicRunner(std::move(periodicRunner));
setUpCatalog(serviceContext);
diff --git a/src/mongo/embedded/periodic_runner_embedded.cpp b/src/mongo/embedded/periodic_runner_embedded.cpp
index bb0c17c40a3..41ae8a49e93 100644
--- a/src/mongo/embedded/periodic_runner_embedded.cpp
+++ b/src/mongo/embedded/periodic_runner_embedded.cpp
@@ -37,21 +37,6 @@
#include "mongo/util/scopeguard.h"
namespace mongo {
-namespace {
-
-template <typename T>
-std::shared_ptr<T> lockAndAssertExists(std::weak_ptr<T> ptr, StringData errMsg) {
- if (auto p = ptr.lock()) {
- return p;
- } else {
- uasserted(ErrorCodes::InternalError, errMsg);
- }
-}
-
-constexpr auto kPeriodicJobHandleLifetimeErrMsg =
- "The PeriodicRunner job for this handle no longer exists"_sd;
-
-} // namespace
struct PeriodicRunnerEmbedded::PeriodicJobSorter {
bool operator()(std::shared_ptr<PeriodicJobImpl> const& lhs,
@@ -64,59 +49,13 @@ struct PeriodicRunnerEmbedded::PeriodicJobSorter {
PeriodicRunnerEmbedded::PeriodicRunnerEmbedded(ServiceContext* svc, ClockSource* clockSource)
: _svc(svc), _clockSource(clockSource) {}
-PeriodicRunnerEmbedded::~PeriodicRunnerEmbedded() {
- PeriodicRunnerEmbedded::shutdown();
-}
-
-std::shared_ptr<PeriodicRunnerEmbedded::PeriodicJobImpl> PeriodicRunnerEmbedded::createAndAddJob(
- PeriodicJob job, bool shouldStart) {
+auto PeriodicRunnerEmbedded::makeJob(PeriodicJob job) -> JobAnchor {
auto impl = std::make_shared<PeriodicJobImpl>(std::move(job), this->_clockSource, this);
stdx::lock_guard<stdx::mutex> lk(_mutex);
_jobs.push_back(impl);
std::push_heap(_jobs.begin(), _jobs.end(), PeriodicJobSorter());
- if (shouldStart && _running)
- impl->start();
- return impl;
-}
-
-std::unique_ptr<PeriodicRunner::PeriodicJobHandle> PeriodicRunnerEmbedded::makeJob(
- PeriodicJob job) {
- return std::make_unique<PeriodicJobHandleImpl>(createAndAddJob(std::move(job), false));
-}
-
-void PeriodicRunnerEmbedded::scheduleJob(PeriodicJob job) {
- createAndAddJob(std::move(job), true);
-}
-
-void PeriodicRunnerEmbedded::startup() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
-
- if (_running) {
- return;
- }
-
- _running = true;
-
- // schedule any jobs that we have
- for (auto& job : _jobs) {
- job->start();
- }
-}
-
-void PeriodicRunnerEmbedded::shutdown() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_running) {
- _running = false;
-
- for (auto& job : _jobs) {
- stdx::lock_guard<stdx::mutex> jobLock(job->_mutex);
- if (job->isAlive(jobLock)) {
- job->_stopWithMasterAndJobLock(lk, jobLock);
- }
- }
- _jobs.clear();
- }
+ return JobAnchor(impl);
}
bool PeriodicRunnerEmbedded::tryPump() {
@@ -225,7 +164,9 @@ void PeriodicRunnerEmbedded::PeriodicJobImpl::stop() {
// sure the user can invalidate it after this call.
stdx::lock_guard<stdx::mutex> masterLock(_periodicRunner->_mutex);
stdx::lock_guard<stdx::mutex> lk(_mutex);
- _stopWithMasterAndJobLock(masterLock, lk);
+ if (isAlive(lk)) {
+ _stopWithMasterAndJobLock(masterLock, lk);
+ }
}
Milliseconds PeriodicRunnerEmbedded::PeriodicJobImpl::getPeriod() {
@@ -255,34 +196,4 @@ bool PeriodicRunnerEmbedded::PeriodicJobImpl::isAlive(WithLock lk) {
return _execStatus == ExecutionStatus::kRunning || _execStatus == ExecutionStatus::kPaused;
}
-void PeriodicRunnerEmbedded::PeriodicJobHandleImpl::start() {
- auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg);
- job->start();
-}
-
-void PeriodicRunnerEmbedded::PeriodicJobHandleImpl::stop() {
- auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg);
- job->stop();
-}
-
-void PeriodicRunnerEmbedded::PeriodicJobHandleImpl::pause() {
- auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg);
- job->pause();
-}
-
-void PeriodicRunnerEmbedded::PeriodicJobHandleImpl::resume() {
- auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg);
- job->resume();
-}
-
-Milliseconds PeriodicRunnerEmbedded::PeriodicJobHandleImpl::getPeriod() {
- auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg);
- return job->getPeriod();
-}
-
-void PeriodicRunnerEmbedded::PeriodicJobHandleImpl::setPeriod(Milliseconds ms) {
- auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg);
- job->setPeriod(ms);
-}
-
} // namespace mongo
diff --git a/src/mongo/embedded/periodic_runner_embedded.h b/src/mongo/embedded/periodic_runner_embedded.h
index a1e56079208..6d82c50db44 100644
--- a/src/mongo/embedded/periodic_runner_embedded.h
+++ b/src/mongo/embedded/periodic_runner_embedded.h
@@ -47,21 +47,15 @@ namespace mongo {
class PeriodicRunnerEmbedded : public PeriodicRunner {
public:
PeriodicRunnerEmbedded(ServiceContext* svc, ClockSource* clockSource);
- ~PeriodicRunnerEmbedded();
- std::unique_ptr<PeriodicRunner::PeriodicJobHandle> makeJob(PeriodicJob job) override;
- void scheduleJob(PeriodicJob job) override;
-
- void startup() override;
-
- void shutdown() override;
+ JobAnchor makeJob(PeriodicJob job) override;
// Safe to call from multiple threads but will only execute on one thread at a time.
// Returns true if it attempted to run any jobs.
bool tryPump();
private:
- class PeriodicJobImpl {
+ class PeriodicJobImpl : public ControllableJob {
PeriodicJobImpl(const PeriodicJobImpl&) = delete;
PeriodicJobImpl& operator=(const PeriodicJobImpl&) = delete;
@@ -69,12 +63,12 @@ private:
friend class PeriodicRunnerEmbedded;
PeriodicJobImpl(PeriodicJob job, ClockSource* source, PeriodicRunnerEmbedded* runner);
- void start();
- void pause();
- void resume();
- void stop();
- Milliseconds getPeriod();
- void setPeriod(Milliseconds ms);
+ void start() override;
+ void pause() override;
+ void resume() override;
+ void stop() override;
+ Milliseconds getPeriod() override;
+ void setPeriod(Milliseconds ms) override;
bool isAlive(WithLock lk);
@@ -101,24 +95,6 @@ private:
};
struct PeriodicJobSorter;
- std::shared_ptr<PeriodicRunnerEmbedded::PeriodicJobImpl> createAndAddJob(PeriodicJob job,
- bool shouldStart);
-
- class PeriodicJobHandleImpl : public PeriodicJobHandle {
- public:
- explicit PeriodicJobHandleImpl(std::weak_ptr<PeriodicJobImpl> jobImpl)
- : _jobWeak(jobImpl) {}
- void start() override;
- void stop() override;
- void pause() override;
- void resume() override;
- Milliseconds getPeriod() override;
- void setPeriod(Milliseconds ms) override;
-
- private:
- std::weak_ptr<PeriodicJobImpl> _jobWeak;
- };
-
ServiceContext* _svc;
ClockSource* _clockSource;
@@ -127,7 +103,6 @@ private:
std::vector<std::shared_ptr<PeriodicJobImpl>> _Pausedjobs;
stdx::mutex _mutex;
- bool _running = false;
};
} // namespace mongo
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index df41a09ec8d..782620dc454 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -280,12 +280,6 @@ void cleanupTask(ServiceContext* serviceContext) {
if (serviceContext) {
serviceContext->setKillAllOperations();
-
- // Shut down the background periodic task runner.
- auto runner = serviceContext->getPeriodicRunner();
- if (runner) {
- runner->shutdown();
- }
}
// Perform all shutdown operations after setKillAllOperations is called in order to ensure
@@ -564,7 +558,6 @@ ExitCode runMongosServer(ServiceContext* serviceContext) {
// Set up the periodic runner for background job execution
{
auto runner = makePeriodicRunner(serviceContext);
- runner->startup();
serviceContext->setPeriodicRunner(std::move(runner));
}
diff --git a/src/mongo/util/SConscript b/src/mongo/util/SConscript
index 59007e2c9f4..757dec8da59 100644
--- a/src/mongo/util/SConscript
+++ b/src/mongo/util/SConscript
@@ -324,6 +324,7 @@ env.Library(
"periodic_runner.cpp",
],
LIBDEPS=[
+ "$BUILD_DIR/mongo/base",
],
)
diff --git a/src/mongo/util/mock_periodic_runner_impl.h b/src/mongo/util/mock_periodic_runner_impl.h
index 207fefe904d..361de33a4cd 100644
--- a/src/mongo/util/mock_periodic_runner_impl.h
+++ b/src/mongo/util/mock_periodic_runner_impl.h
@@ -40,25 +40,23 @@ namespace mongo {
*/
class MockPeriodicRunnerImpl final : public PeriodicRunner {
public:
- class MockPeriodicJobHandleImpl final : public PeriodicRunner::PeriodicJobHandle {
+ class Job final : public ControllableJob {
public:
- ~MockPeriodicJobHandleImpl() = default;
-
- void start() override{};
- void stop() override{};
- void pause() override{};
- void resume() override{};
+ ~Job() = default;
+
+ void start() final{};
+ void stop() final{};
+ void pause() final{};
+ void resume() final{};
+ Milliseconds getPeriod() final{};
+ void setPeriod(Milliseconds ms) final{};
};
~MockPeriodicRunnerImpl() = default;
- std::unique_ptr<PeriodicRunner::PeriodicJobHandle> makeJob(PeriodicRunner::PeriodicJob job) {
- return std::make_unique<MockPeriodicJobHandleImpl>();
+ PeriodicAnchor makeJob(PeriodicJob job) final {
+ return std::weak_ptr<Job>{};
}
-
- void scheduleJob(PeriodicRunner::PeriodicJob job) {}
- void startup() {}
- void shutdown() {}
};
} // namespace mongo
diff --git a/src/mongo/util/periodic_runner.cpp b/src/mongo/util/periodic_runner.cpp
index dc54356142c..ed7069d4183 100644
--- a/src/mongo/util/periodic_runner.cpp
+++ b/src/mongo/util/periodic_runner.cpp
@@ -31,8 +31,58 @@
#include "mongo/util/periodic_runner.h"
+#include "mongo/util/assert_util.h"
+
namespace mongo {
PeriodicRunner::~PeriodicRunner() = default;
+PeriodicJobAnchor::PeriodicJobAnchor(std::shared_ptr<Job> handle) : _handle{std::move(handle)} {}
+
+PeriodicJobAnchor::~PeriodicJobAnchor() {
+ if (!_handle) {
+ return;
+ }
+
+ _handle->stop();
+}
+
+void PeriodicJobAnchor::start() {
+ invariant(_handle);
+ _handle->start();
+}
+
+void PeriodicJobAnchor::pause() {
+ invariant(_handle);
+ _handle->pause();
+}
+
+void PeriodicJobAnchor::resume() {
+ invariant(_handle);
+ _handle->resume();
+}
+
+void PeriodicJobAnchor::stop() {
+ invariant(_handle);
+ _handle->stop();
+}
+
+void PeriodicJobAnchor::setPeriod(Milliseconds ms) {
+ invariant(_handle);
+ _handle->setPeriod(ms);
+}
+
+Milliseconds PeriodicJobAnchor::getPeriod() {
+ invariant(_handle);
+ return _handle->getPeriod();
+}
+
+void PeriodicJobAnchor::detach() {
+ _handle.reset();
+}
+
+bool PeriodicJobAnchor::isValid() const noexcept {
+ return static_cast<bool>(_handle);
+}
+
} // namespace mongo
diff --git a/src/mongo/util/periodic_runner.h b/src/mongo/util/periodic_runner.h
index a474fb0d6ad..93a03498357 100644
--- a/src/mongo/util/periodic_runner.h
+++ b/src/mongo/util/periodic_runner.h
@@ -29,14 +29,19 @@
#pragma once
+#include <functional>
+#include <memory>
#include <string>
-#include "mongo/stdx/functional.h"
+#include <boost/optional.hpp>
+
+#include "mongo/stdx/mutex.h"
#include "mongo/util/time_support.h"
namespace mongo {
class Client;
+class PeriodicJobAnchor;
/**
* An interface for objects that run work items at specified intervals. Each individually scheduled
@@ -51,7 +56,8 @@ class Client;
*/
class PeriodicRunner {
public:
- using Job = stdx::function<void(Client* client)>;
+ using Job = std::function<void(Client* client)>;
+ using JobAnchor = PeriodicJobAnchor;
struct PeriodicJob {
PeriodicJob(std::string name, Job callable, Milliseconds period)
@@ -73,9 +79,12 @@ public:
Milliseconds interval;
};
- class PeriodicJobHandle {
+ /**
+ * A ControllableJob allows a user to reschedule the execution of a Job
+ */
+ class ControllableJob {
public:
- virtual ~PeriodicJobHandle() = default;
+ virtual ~ControllableJob() = default;
/**
* Starts running the job
@@ -115,39 +124,70 @@ public:
virtual ~PeriodicRunner();
-
/**
* Creates a new job and adds it to the runner, but does not schedule it.
* The caller is responsible for calling 'start' on the resulting handle in
* order to begin the job running. This API should be used when the caller
* is interested in observing and controlling the job execution state.
*/
- virtual std::unique_ptr<PeriodicJobHandle> makeJob(PeriodicJob job) = 0;
+ virtual JobAnchor makeJob(PeriodicJob job) = 0;
+};
- /**
- * Schedules a job to be run at periodic intervals.
- *
- * If the runner is not running when a job is scheduled, that job should
- * be saved so that it may run in the future once startup() is called.
- */
- virtual void scheduleJob(PeriodicJob job) = 0;
+/**
+ * A PeriodicJobAnchor allows the holder to control the scheduling of a job for the lifetime of the
+ * anchor. When an anchor is destructed, it stops its underlying job.
+ *
+ * The underlying weak_ptr for this class is not synchronized. In essence, treat use of this class
+ * as if it were a raw pointer to a ControllableJob.
+ *
+ * Each wrapped PeriodicRunner::ControllableJob function on this object throws
+ * if the underlying job is gone (e.g. in shutdown).
+ */
+class[[nodiscard]] PeriodicJobAnchor {
+public:
+ using Job = PeriodicRunner::ControllableJob;
+
+public:
+ // Note that this constructor is only intended for use with PeriodicRunner::makeJob()
+ explicit PeriodicJobAnchor(std::shared_ptr<Job> handle);
+
+ PeriodicJobAnchor() = default;
+ PeriodicJobAnchor(PeriodicJobAnchor &&) = default;
+ PeriodicJobAnchor& operator=(PeriodicJobAnchor&&) = default;
+
+ PeriodicJobAnchor(const PeriodicJobAnchor&) = delete;
+ PeriodicJobAnchor& operator=(const PeriodicJobAnchor&) = delete;
+
+ ~PeriodicJobAnchor();
+
+ void start();
+ void pause();
+ void resume();
+ void stop();
+ void setPeriod(Milliseconds ms);
+ Milliseconds getPeriod();
/**
- * Starts up this periodic runner.
+ * Abandon responsibility for scheduling the execution of this job
*
- * This method may safely be called multiple times, either with or without
- * calls to shutdown() in between.
+ * This effectively invalidates the anchor.
*/
- virtual void startup() = 0;
+ void detach();
/**
- * Shuts down this periodic runner. Stops all jobs from running.
+ * Returns if this PeriodicJobAnchor is associated with a PeriodicRunner::ControllableJob
*
- * This method may safely be called multiple times, either with or without
- * calls to startup() in between. Any jobs that have been scheduled on this
- * runner should no longer execute once shutdown() is called.
+ * This function is useful to see if a PeriodicJobAnchor is initialized. It does not necessarily
+ * inform whether a PeriodicJobAnchor will throw from a control function above.
*/
- virtual void shutdown() = 0;
+ bool isValid() const noexcept;
+
+ explicit operator bool() const noexcept {
+ return isValid();
+ }
+
+private:
+ std::shared_ptr<Job> _handle;
};
} // namespace mongo
diff --git a/src/mongo/util/periodic_runner_impl.cpp b/src/mongo/util/periodic_runner_impl.cpp
index 8ffc8839bb8..fc21a7184a4 100644
--- a/src/mongo/util/periodic_runner_impl.cpp
+++ b/src/mongo/util/periodic_runner_impl.cpp
@@ -27,6 +27,8 @@
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
+
#include "mongo/platform/basic.h"
#include "mongo/util/periodic_runner_impl.h"
@@ -34,6 +36,7 @@
#include "mongo/db/client.h"
#include "mongo/db/service_context.h"
#include "mongo/util/clock_source.h"
+#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
@@ -41,71 +44,41 @@ namespace mongo {
PeriodicRunnerImpl::PeriodicRunnerImpl(ServiceContext* svc, ClockSource* clockSource)
: _svc(svc), _clockSource(clockSource) {}
-PeriodicRunnerImpl::~PeriodicRunnerImpl() {
- PeriodicRunnerImpl::shutdown();
-}
-
-std::shared_ptr<PeriodicRunnerImpl::PeriodicJobImpl> PeriodicRunnerImpl::createAndAddJob(
- PeriodicJob job) {
+auto PeriodicRunnerImpl::makeJob(PeriodicJob job) -> JobAnchor {
auto impl = std::make_shared<PeriodicJobImpl>(std::move(job), this->_clockSource, this->_svc);
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _jobs.push_back(impl);
- return impl;
-}
-
-std::unique_ptr<PeriodicRunner::PeriodicJobHandle> PeriodicRunnerImpl::makeJob(PeriodicJob job) {
- auto handle = std::make_unique<PeriodicJobHandleImpl>(createAndAddJob(job));
- return std::move(handle);
+ JobAnchor anchor(std::move(impl));
+ return anchor;
}
-void PeriodicRunnerImpl::scheduleJob(PeriodicJob job) {
- auto impl = createAndAddJob(job);
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_running) {
- impl->start();
- }
-}
+PeriodicRunnerImpl::PeriodicJobImpl::PeriodicJobImpl(PeriodicJob job,
+ ClockSource* source,
+ ServiceContext* svc)
+ : _job(std::move(job)), _clockSource(source), _serviceContext(svc) {}
-void PeriodicRunnerImpl::startup() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+void PeriodicRunnerImpl::PeriodicJobImpl::_run() {
+ auto[startPromise, startFuture] = makePromiseFuture<void>();
- if (_running) {
- return;
+ {
+ stdx::lock_guard lk(_mutex);
+ invariant(_execStatus == ExecutionStatus::NOT_SCHEDULED);
}
- _running = true;
- // schedule any jobs that we have
- for (auto& job : _jobs) {
- job->start();
- }
-}
+ _thread = stdx::thread([ this, startPromise = std::move(startPromise) ]() mutable {
+ auto guard = makeGuard([this] { _stopPromise.emplaceValue(); });
-void PeriodicRunnerImpl::shutdown() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_running) {
- _running = false;
+ Client::initThread(_job.name, _serviceContext, nullptr);
- for (auto& job : _jobs) {
- job->stop();
+ // Let start() know we're running
+ {
+ stdx::lock_guard lk(_mutex);
+ _execStatus = PeriodicJobImpl::ExecutionStatus::RUNNING;
}
- _jobs.clear();
- }
-}
+ startPromise.emplaceValue();
-PeriodicRunnerImpl::PeriodicJobImpl::PeriodicJobImpl(PeriodicJob job,
- ClockSource* source,
- ServiceContext* svc)
- : _job(std::move(job)), _clockSource(source), _serviceContext(svc) {}
-
-void PeriodicRunnerImpl::PeriodicJobImpl::_run() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(_execStatus == PeriodicJobImpl::ExecutionStatus::NOT_SCHEDULED);
- _thread = stdx::thread([this] {
- Client::initThread(_job.name, _serviceContext, nullptr);
- while (true) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock lk(_mutex);
+ while (_execStatus != ExecutionStatus::CANCELED) {
// Wait until it's unpaused or canceled
_condvar.wait(lk, [&] { return _execStatus != ExecutionStatus::PAUSED; });
if (_execStatus == ExecutionStatus::CANCELED) {
@@ -135,10 +108,14 @@ void PeriodicRunnerImpl::PeriodicJobImpl::_run() {
} while (_clockSource->now() < getDeadlineFromInterval());
}
});
- _execStatus = PeriodicJobImpl::ExecutionStatus::RUNNING;
+
+ // Wait for the thread to actually start
+ startFuture.get();
}
void PeriodicRunnerImpl::PeriodicJobImpl::start() {
+ LOG(2) << "Starting periodic job " << _job.name;
+
_run();
}
@@ -158,17 +135,26 @@ void PeriodicRunnerImpl::PeriodicJobImpl::resume() {
}
void PeriodicRunnerImpl::PeriodicJobImpl::stop() {
- {
+ auto lastExecStatus = [&] {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_execStatus != ExecutionStatus::RUNNING && _execStatus != ExecutionStatus::PAUSED)
- return;
- invariant(_thread.joinable());
+ return std::exchange(_execStatus, ExecutionStatus::CANCELED);
+ }();
- _execStatus = PeriodicJobImpl::ExecutionStatus::CANCELED;
+ // If we never started, then nobody should wait
+ if (lastExecStatus == ExecutionStatus::NOT_SCHEDULED) {
+ return;
}
- _condvar.notify_one();
- _thread.join();
+
+ // Only join once
+ if (lastExecStatus != ExecutionStatus::CANCELED) {
+ LOG(2) << "Stopping periodic job " << _job.name;
+
+ _condvar.notify_one();
+ _thread.join();
+ }
+
+ _stopPromise.getFuture().get();
}
Milliseconds PeriodicRunnerImpl::PeriodicJobImpl::getPeriod() {
@@ -185,50 +171,4 @@ void PeriodicRunnerImpl::PeriodicJobImpl::setPeriod(Milliseconds ms) {
}
}
-namespace {
-
-template <typename T>
-std::shared_ptr<T> lockAndAssertExists(std::weak_ptr<T> ptr, StringData errMsg) {
- if (auto p = ptr.lock()) {
- return p;
- } else {
- uasserted(ErrorCodes::InternalError, errMsg);
- }
-}
-
-constexpr auto kPeriodicJobHandleLifetimeErrMsg =
- "The PeriodicRunner job for this handle no longer exists"_sd;
-
-} // namespace
-
-void PeriodicRunnerImpl::PeriodicJobHandleImpl::start() {
- auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg);
- job->start();
-}
-
-void PeriodicRunnerImpl::PeriodicJobHandleImpl::stop() {
- auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg);
- job->stop();
-}
-
-void PeriodicRunnerImpl::PeriodicJobHandleImpl::pause() {
- auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg);
- job->pause();
-}
-
-void PeriodicRunnerImpl::PeriodicJobHandleImpl::resume() {
- auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg);
- job->resume();
-}
-
-Milliseconds PeriodicRunnerImpl::PeriodicJobHandleImpl::getPeriod() {
- auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg);
- return job->getPeriod();
-}
-
-void PeriodicRunnerImpl::PeriodicJobHandleImpl::setPeriod(Milliseconds ms) {
- auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg);
- job->setPeriod(ms);
-}
-
} // namespace mongo
diff --git a/src/mongo/util/periodic_runner_impl.h b/src/mongo/util/periodic_runner_impl.h
index eb6663d708b..a921a66c59f 100644
--- a/src/mongo/util/periodic_runner_impl.h
+++ b/src/mongo/util/periodic_runner_impl.h
@@ -36,6 +36,7 @@
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
#include "mongo/util/clock_source.h"
+#include "mongo/util/future.h"
#include "mongo/util/periodic_runner.h"
namespace mongo {
@@ -50,17 +51,11 @@ class ServiceContext;
class PeriodicRunnerImpl : public PeriodicRunner {
public:
PeriodicRunnerImpl(ServiceContext* svc, ClockSource* clockSource);
- ~PeriodicRunnerImpl();
- std::unique_ptr<PeriodicRunner::PeriodicJobHandle> makeJob(PeriodicJob job) override;
- void scheduleJob(PeriodicJob job) override;
-
- void startup() override;
-
- void shutdown() override;
+ JobAnchor makeJob(PeriodicJob job) override;
private:
- class PeriodicJobImpl {
+ class PeriodicJobImpl : public ControllableJob {
PeriodicJobImpl(const PeriodicJobImpl&) = delete;
PeriodicJobImpl& operator=(const PeriodicJobImpl&) = delete;
@@ -68,12 +63,12 @@ private:
friend class PeriodicRunnerImpl;
PeriodicJobImpl(PeriodicJob job, ClockSource* source, ServiceContext* svc);
- void start();
- void pause();
- void resume();
- void stop();
- Milliseconds getPeriod();
- void setPeriod(Milliseconds ms);
+ void start() override;
+ void pause() override;
+ void resume() override;
+ void stop() override;
+ Milliseconds getPeriod() override;
+ void setPeriod(Milliseconds ms) override;
enum class ExecutionStatus { NOT_SCHEDULED, RUNNING, PAUSED, CANCELED };
@@ -83,7 +78,9 @@ private:
PeriodicJob _job;
ClockSource* _clockSource;
ServiceContext* _serviceContext;
+
stdx::thread _thread;
+ SharedPromise<void> _stopPromise;
stdx::mutex _mutex;
stdx::condition_variable _condvar;
@@ -95,28 +92,8 @@ private:
std::shared_ptr<PeriodicRunnerImpl::PeriodicJobImpl> createAndAddJob(PeriodicJob job);
- class PeriodicJobHandleImpl : public PeriodicJobHandle {
- public:
- explicit PeriodicJobHandleImpl(std::weak_ptr<PeriodicJobImpl> jobImpl)
- : _jobWeak(jobImpl) {}
- void start() override;
- void stop() override;
- void pause() override;
- void resume() override;
- Milliseconds getPeriod() override;
- void setPeriod(Milliseconds ms) override;
-
- private:
- std::weak_ptr<PeriodicJobImpl> _jobWeak;
- };
-
ServiceContext* _svc;
ClockSource* _clockSource;
-
- std::vector<std::shared_ptr<PeriodicJobImpl>> _jobs;
-
- stdx::mutex _mutex;
- bool _running = false;
};
} // namespace mongo
diff --git a/src/mongo/util/periodic_runner_impl_test.cpp b/src/mongo/util/periodic_runner_impl_test.cpp
index 58c4b568adc..8d74b1f4fe1 100644
--- a/src/mongo/util/periodic_runner_impl_test.cpp
+++ b/src/mongo/util/periodic_runner_impl_test.cpp
@@ -51,10 +51,6 @@ public:
_runner = stdx::make_unique<PeriodicRunnerImpl>(getServiceContext(), _clockSource.get());
}
- void tearDown() override {
- _runner->shutdown();
- }
-
ClockSourceMock& clockSource() {
return *_clockSource;
}
@@ -72,7 +68,6 @@ class PeriodicRunnerImplTest : public PeriodicRunnerImplTestNoSetup {
public:
void setUp() override {
PeriodicRunnerImplTestNoSetup::setUp();
- runner().startup();
}
};
@@ -94,7 +89,8 @@ TEST_F(PeriodicRunnerImplTest, OneJobTest) {
},
interval);
- runner().scheduleJob(std::move(job));
+ auto jobAnchor = runner().makeJob(std::move(job));
+ jobAnchor.start();
// Fast forward ten times, we should run all ten times.
for (int i = 0; i < 10; i++) {
@@ -126,7 +122,7 @@ TEST_F(PeriodicRunnerImplTest, OnePausableJobDoesNotRunWithoutStart) {
},
interval);
- auto handle = runner().makeJob(std::move(job));
+ auto jobAnchor = runner().makeJob(std::move(job));
clockSource().advance(interval);
ASSERT_EQ(count, 0);
@@ -151,8 +147,8 @@ TEST_F(PeriodicRunnerImplTest, OnePausableJobRunsCorrectlyWithStart) {
},
interval);
- auto handle = runner().makeJob(std::move(job));
- handle->start();
+ auto jobAnchor = runner().makeJob(std::move(job));
+ jobAnchor.start();
// Fast forward ten times, we should run all ten times.
for (int i = 0; i < 10; i++) {
{
@@ -186,8 +182,8 @@ TEST_F(PeriodicRunnerImplTest, OnePausableJobPausesCorrectly) {
},
interval);
- auto handle = runner().makeJob(std::move(job));
- handle->start();
+ auto jobAnchor = runner().makeJob(std::move(job));
+ jobAnchor.start();
// Wait for the first execution.
{
stdx::unique_lock<stdx::mutex> lk(mutex);
@@ -197,7 +193,7 @@ TEST_F(PeriodicRunnerImplTest, OnePausableJobPausesCorrectly) {
{
stdx::unique_lock<stdx::mutex> lk(mutex);
isPaused = true;
- handle->pause();
+ jobAnchor.pause();
}
// Fast forward ten times, we shouldn't run anymore. If we do, the assert inside the job will
@@ -228,8 +224,8 @@ TEST_F(PeriodicRunnerImplTest, OnePausableJobResumesCorrectly) {
},
interval);
- auto handle = runner().makeJob(std::move(job));
- handle->start();
+ auto jobAnchor = runner().makeJob(std::move(job));
+ jobAnchor.start();
// Wait for the first execution.
{
stdx::unique_lock<stdx::mutex> lk(mutex);
@@ -252,7 +248,7 @@ TEST_F(PeriodicRunnerImplTest, OnePausableJobResumesCorrectly) {
}
}
- handle->pause();
+ jobAnchor.pause();
// Fast forward ten times, we shouldn't run anymore.
for (int i = 0; i < 10; i++) {
@@ -262,7 +258,7 @@ TEST_F(PeriodicRunnerImplTest, OnePausableJobResumesCorrectly) {
// Make sure we didn't run anymore while paused.
ASSERT_EQ(count, numIterationsBeforePause);
- handle->resume();
+ jobAnchor.resume();
// Fast forward, we should run at least once.
clockSource().advance(interval);
@@ -275,39 +271,6 @@ TEST_F(PeriodicRunnerImplTest, OnePausableJobResumesCorrectly) {
tearDown();
}
-TEST_F(PeriodicRunnerImplTestNoSetup, ScheduleBeforeStartupTest) {
- int count = 0;
- Milliseconds interval{5};
-
- stdx::mutex mutex;
- stdx::condition_variable cv;
-
- // Schedule a job before startup
- PeriodicRunner::PeriodicJob job("job",
- [&count, &mutex, &cv](Client*) {
- {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- count++;
- }
- cv.notify_all();
- },
- interval);
-
- runner().scheduleJob(std::move(job));
-
- // Start the runner, job should still run
- runner().startup();
-
- clockSource().advance(interval);
-
- {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- cv.wait(lk, [&count] { return count > 0; });
- }
-
- tearDown();
-}
-
TEST_F(PeriodicRunnerImplTest, TwoJobsTest) {
int countA = 0;
int countB = 0;
@@ -338,8 +301,11 @@ TEST_F(PeriodicRunnerImplTest, TwoJobsTest) {
},
intervalB);
- runner().scheduleJob(std::move(jobA));
- runner().scheduleJob(std::move(jobB));
+ auto jobAnchorA = runner().makeJob(std::move(jobA));
+ auto jobAnchorB = runner().makeJob(std::move(jobB));
+
+ jobAnchorA.start();
+ jobAnchorB.start();
// Fast forward and wait for both jobs to run the right number of times
for (int i = 0; i <= 10; i++) {
@@ -382,8 +348,11 @@ TEST_F(PeriodicRunnerImplTest, TwoJobsDontDeadlock) {
},
Milliseconds(1));
- runner().scheduleJob(std::move(jobA));
- runner().scheduleJob(std::move(jobB));
+ auto jobAnchorA = runner().makeJob(std::move(jobA));
+ auto jobAnchorB = runner().makeJob(std::move(jobB));
+
+ jobAnchorA.start();
+ jobAnchorB.start();
clockSource().advance(Milliseconds(1));
@@ -415,16 +384,16 @@ TEST_F(PeriodicRunnerImplTest, ChangingIntervalWorks) {
},
Milliseconds(5));
- auto handle = runner().makeJob(std::move(job));
- handle->start();
+ auto jobAnchor = runner().makeJob(std::move(job));
+ jobAnchor.start();
// Wait for the first execution.
{
stdx::unique_lock<stdx::mutex> lk(mutex);
cv.wait(lk, [&] { return timesCalled; });
}
- handle->setPeriod(Milliseconds(10));
- ASSERT_EQ(handle->getPeriod(), Milliseconds(10));
+ jobAnchor.setPeriod(Milliseconds(10));
+ ASSERT_EQ(jobAnchor.getPeriod(), Milliseconds(10));
// if we change the period to a longer duration, that doesn't trigger a run
{
@@ -456,8 +425,8 @@ TEST_F(PeriodicRunnerImplTest, ChangingIntervalWorks) {
ASSERT_EQ(timesCalled, 2ul);
}
- handle->setPeriod(Milliseconds(4));
- ASSERT_EQ(handle->getPeriod(), Milliseconds(4));
+ jobAnchor.setPeriod(Milliseconds(4));
+ ASSERT_EQ(jobAnchor.getPeriod(), Milliseconds(4));
// shortening triggers the period
{