diff options
author | Ben Caimano <ben.caimano@10gen.com> | 2019-04-26 15:03:46 -0400 |
---|---|---|
committer | Ben Caimano <ben.caimano@10gen.com> | 2019-06-21 17:02:30 -0400 |
commit | 1eff33bd1a8d48eb607675f87faf1836ba325006 (patch) | |
tree | 80149c8ff06158f7fdaaa0014500c06d080575a4 /src | |
parent | 8c4dfc2ba0568bd128a27f6481994758ce5f1c10 (diff) | |
download | mongo-1eff33bd1a8d48eb607675f87faf1836ba325006.tar.gz |
SERVER-39936 Use PeriodicRunner handles to simplify shutdown ordering
Diffstat (limited to 'src')
34 files changed, 404 insertions, 473 deletions
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index a35ab54a2f1..bd9e2a9b75f 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -326,7 +326,6 @@ ExitCode _initAndListen(int listenPort) { // Set up the periodic runner for background job execution. This is required to be running // before the storage engine is initialized. auto runner = makePeriodicRunner(serviceContext); - runner->startup(); serviceContext->setPeriodicRunner(std::move(runner)); FlowControl::set(serviceContext, std::make_unique<FlowControl>( @@ -603,8 +602,8 @@ ExitCode _initAndListen(int listenPort) { // Only do this on storage engines supporting snapshot reads, which hold resources we wish to // release periodically in order to avoid storage cache pressure build up. if (storageEngine->supportsReadConcernSnapshot()) { - startPeriodicThreadToAbortExpiredTransactions(serviceContext); - startPeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded(serviceContext); + PeriodicThreadToAbortExpiredTransactions::get(serviceContext)->start(); + PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded::get(serviceContext)->start(); } // Set up the logical session cache @@ -919,13 +918,12 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) { flowControlTicketholder->setInShutdown(); } - // Shut down the background periodic task runner. This must be done before shutting down the - // storage engine. - if (auto runner = serviceContext->getPeriodicRunner()) { - runner->shutdown(); - } + if (auto storageEngine = serviceContext->getStorageEngine()) { + if (storageEngine->supportsReadConcernSnapshot()) { + PeriodicThreadToAbortExpiredTransactions::get(serviceContext)->stop(); + PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded::get(serviceContext)->stop(); + } - if (serviceContext->getStorageEngine()) { ServiceContext::UniqueOperationContext uniqueOpCtx; OperationContext* opCtx = client->getOperationContext(); if (!opCtx) { diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp index 2617d9060c4..8afd9f6889f 100644 --- a/src/mongo/db/logical_session_cache_impl.cpp +++ b/src/mongo/db/logical_session_cache_impl.cpp @@ -41,7 +41,6 @@ #include "mongo/platform/atomic_word.h" #include "mongo/util/duration.h" #include "mongo/util/log.h" -#include "mongo/util/periodic_runner.h" #include "mongo/util/scopeguard.h" namespace mongo { diff --git a/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp b/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp index f3fa239daac..0f55d053fb3 100644 --- a/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp +++ b/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp @@ -58,13 +58,32 @@ Milliseconds getPeriod(const Argument& transactionLifetimeLimitSeconds) { return period; } + } // namespace -void startPeriodicThreadToAbortExpiredTransactions(ServiceContext* serviceContext) { - // Enforce calling this function once, and only once. - static bool firstCall = true; - invariant(firstCall); - firstCall = false; +auto PeriodicThreadToAbortExpiredTransactions::get(ServiceContext* serviceContext) + -> PeriodicThreadToAbortExpiredTransactions& { + auto& jobContainer = _serviceDecoration(serviceContext); + jobContainer._init(serviceContext); + + return jobContainer; +} + +auto PeriodicThreadToAbortExpiredTransactions::operator*() const noexcept -> PeriodicJobAnchor& { + stdx::lock_guard lk(_mutex); + return *_anchor; +} + +auto PeriodicThreadToAbortExpiredTransactions::operator-> () const noexcept -> PeriodicJobAnchor* { + stdx::lock_guard lk(_mutex); + return _anchor.get(); +} + +void PeriodicThreadToAbortExpiredTransactions::_init(ServiceContext* serviceContext) { + stdx::lock_guard lk(_mutex); + if (_anchor) { + return; + } auto periodicRunner = serviceContext->getPeriodicRunner(); invariant(periodicRunner); @@ -87,18 +106,17 @@ void startPeriodicThreadToAbortExpiredTransactions(ServiceContext* serviceContex }, getPeriod(gTransactionLifetimeLimitSeconds.load())); - auto handle = periodicRunner->makeJob(std::move(job)); - handle->start(); - - TransactionParticipant::observeTransactionLifetimeLimitSeconds - .addObserver([handle = std::move(handle)](const Argument& secs) { - try { - handle->setPeriod(getPeriod(secs)); - } catch (const DBException& ex) { - log() << "Failed to update period of thread which aborts expired transactions " - << ex.toStatus(); - } - }); + _anchor = std::make_shared<PeriodicJobAnchor>(periodicRunner->makeJob(std::move(job))); + + TransactionParticipant::observeTransactionLifetimeLimitSeconds.addObserver([anchor = _anchor]( + const Argument& secs) { + try { + anchor->setPeriod(getPeriod(secs)); + } catch (const DBException& ex) { + log() << "Failed to update period of thread which aborts expired transactions " + << ex.toStatus(); + } + }); } } // namespace mongo diff --git a/src/mongo/db/periodic_runner_job_abort_expired_transactions.h b/src/mongo/db/periodic_runner_job_abort_expired_transactions.h index 6f07e37c247..88bf08d7ee5 100644 --- a/src/mongo/db/periodic_runner_job_abort_expired_transactions.h +++ b/src/mongo/db/periodic_runner_job_abort_expired_transactions.h @@ -29,18 +29,34 @@ #pragma once -namespace mongo { +#include <memory> + +#include "mongo/db/service_context.h" +#include "mongo/stdx/mutex.h" +#include "mongo/util/periodic_runner.h" -class ServiceContext; +namespace mongo { /** - * Defines and starts a periodic background job to check for and abort expired transactions. + * Defines a periodic background job to check for and abort expired transactions. * The job will run every (transactionLifetimeLimitSeconds/2) seconds, or at most once per second * and at least once per minute. - * - * This function should only ever be called once, during mongod server startup (db.cpp). - * The PeriodicRunner will handle shutting down the job on shutdown, no extra handling necessary. */ -void startPeriodicThreadToAbortExpiredTransactions(ServiceContext* serviceContext); +class PeriodicThreadToAbortExpiredTransactions { +public: + static PeriodicThreadToAbortExpiredTransactions& get(ServiceContext* serviceContext); + + PeriodicJobAnchor& operator*() const noexcept; + PeriodicJobAnchor* operator->() const noexcept; + +private: + void _init(ServiceContext* serviceContext); + + inline static const auto _serviceDecoration = + ServiceContext::declareDecoration<PeriodicThreadToAbortExpiredTransactions>(); + + mutable stdx::mutex _mutex; + std::shared_ptr<PeriodicJobAnchor> _anchor; +}; } // namespace mongo diff --git a/src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.cpp b/src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.cpp index 8e4f6594bab..c27418d948f 100644 --- a/src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.cpp +++ b/src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.cpp @@ -43,11 +43,30 @@ namespace mongo { -void startPeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded(ServiceContext* serviceContext) { - // Enforce calling this function once, and only once. - static bool firstCall = true; - invariant(firstCall); - firstCall = false; +auto PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded::get(ServiceContext* serviceContext) + -> PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded& { + auto& jobContainer = _serviceDecoration(serviceContext); + jobContainer._init(serviceContext); + return jobContainer; +} + +auto PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded::operator-> () const noexcept + -> PeriodicJobAnchor* { + stdx::lock_guard lk(_mutex); + return _anchor.get(); +} + +auto PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded::operator*() const noexcept + -> PeriodicJobAnchor& { + stdx::lock_guard lk(_mutex); + return *_anchor; +} + +void PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded::_init(ServiceContext* serviceContext) { + stdx::lock_guard lk(_mutex); + if (_anchor) { + return; + } auto periodicRunner = serviceContext->getPeriodicRunner(); invariant(periodicRunner); @@ -71,19 +90,19 @@ void startPeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded(ServiceContext* ser }, Seconds(snapshotWindowParams.decreaseHistoryIfNotNeededPeriodSeconds.load())); - auto handle = periodicRunner->makeJob(std::move(job)); - handle->start(); + _anchor = std::make_shared<PeriodicJobAnchor>(periodicRunner->makeJob(std::move(job))); - SnapshotWindowParams::observeDecreaseHistoryIfNotNeededPeriodSeconds - .addObserver([handle = std::move(handle)](const auto& secs) { - try { - handle->setPeriod(Seconds(secs)); - } catch (const DBException& ex) { - log() << "Failed to update the period of the thread which decreases data history " - "target window size if there have been no new SnapshotTooOld errors." - << ex.toStatus(); - } - }); + SnapshotWindowParams::observeDecreaseHistoryIfNotNeededPeriodSeconds.addObserver([anchor = + _anchor]( + const auto& secs) { + try { + anchor->setPeriod(Seconds(secs)); + } catch (const DBException& ex) { + log() << "Failed to update the period of the thread which decreases data history " + "target window size if there have been no new SnapshotTooOld errors." + << ex.toStatus(); + } + }); } } // namespace mongo diff --git a/src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.h b/src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.h index 1b6c2e65b18..f6e23949a24 100644 --- a/src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.h +++ b/src/mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.h @@ -29,9 +29,13 @@ #pragma once -namespace mongo { +#include <memory> + +#include "mongo/db/service_context.h" +#include "mongo/stdx/mutex.h" +#include "mongo/util/periodic_runner.h" -class ServiceContext; +namespace mongo { /** * Periodically checks whether there has been any storage engine cache pressure and SnapshotTooOld @@ -43,6 +47,21 @@ class ServiceContext; * This function should only ever be called once, during mongod server startup (db.cpp). * The PeriodicRunner will handle shutting down the job on shutdown, no extra handling necessary. */ -void startPeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded(ServiceContext* serviceContext); +class PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded { +public: + static PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded& get(ServiceContext* serviceContext); + + PeriodicJobAnchor* operator->() const noexcept; + PeriodicJobAnchor& operator*() const noexcept; + +private: + void _init(ServiceContext* serviceContext); + + inline static const auto _serviceDecoration = + ServiceContext::declareDecoration<PeriodicThreadToDecreaseSnapshotHistoryIfNotNeeded>(); + + mutable stdx::mutex _mutex; + std::shared_ptr<PeriodicJobAnchor> _anchor; +}; } // namespace mongo diff --git a/src/mongo/db/s/periodic_balancer_config_refresher.cpp b/src/mongo/db/s/periodic_balancer_config_refresher.cpp index 87b8ad55a4d..f9cab569d89 100644 --- a/src/mongo/db/s/periodic_balancer_config_refresher.cpp +++ b/src/mongo/db/s/periodic_balancer_config_refresher.cpp @@ -46,8 +46,7 @@ namespace { const auto getPeriodicBalancerConfigRefresher = ServiceContext::declareDecoration<PeriodicBalancerConfigRefresher>(); -std::unique_ptr<PeriodicRunner::PeriodicJobHandle> launchBalancerConfigRefresher( - ServiceContext* serviceContext) { +PeriodicJobAnchor launchBalancerConfigRefresher(ServiceContext* serviceContext) { auto periodicRunner = serviceContext->getPeriodicRunner(); invariant(periodicRunner); @@ -66,7 +65,7 @@ std::unique_ptr<PeriodicRunner::PeriodicJobHandle> launchBalancerConfigRefresher }, Seconds(30)); auto balancerConfigRefresher = periodicRunner->makeJob(std::move(job)); - balancerConfigRefresher->start(); + balancerConfigRefresher.start(); return balancerConfigRefresher; } @@ -86,7 +85,7 @@ void PeriodicBalancerConfigRefresher::onShardingInitialization(ServiceContext* s _isPrimary = isPrimary; // This function is called on sharding state initialization, so go ahead // and start up the balancer config refresher task if we're a primary. - if (isPrimary && !_balancerConfigRefresher) { + if (isPrimary && !_balancerConfigRefresher.isValid()) { _balancerConfigRefresher = launchBalancerConfigRefresher(serviceContext); } } @@ -95,12 +94,12 @@ void PeriodicBalancerConfigRefresher::onStepUp(ServiceContext* serviceContext) { _isPrimary = true; // If this is the first time we're stepping up, start a thread to periodically refresh the // balancer configuration. - if (!_balancerConfigRefresher) { + if (!_balancerConfigRefresher.isValid()) { _balancerConfigRefresher = launchBalancerConfigRefresher(serviceContext); } else { // If we're stepping up again after having stepped down, just resume // the existing task. - _balancerConfigRefresher->resume(); + _balancerConfigRefresher.resume(); } } } @@ -108,9 +107,9 @@ void PeriodicBalancerConfigRefresher::onStepUp(ServiceContext* serviceContext) { void PeriodicBalancerConfigRefresher::onStepDown() { if (_isPrimary) { _isPrimary = false; - invariant(_balancerConfigRefresher); + invariant(_balancerConfigRefresher.isValid()); // We don't need to be refreshing the balancer configuration unless we're primary. - _balancerConfigRefresher->pause(); + _balancerConfigRefresher.pause(); } } diff --git a/src/mongo/db/s/periodic_balancer_config_refresher.h b/src/mongo/db/s/periodic_balancer_config_refresher.h index 96a36202025..a331f9cfab5 100644 --- a/src/mongo/db/s/periodic_balancer_config_refresher.h +++ b/src/mongo/db/s/periodic_balancer_config_refresher.h @@ -80,6 +80,6 @@ private: bool _isPrimary{false}; // Periodic job for refreshing the balancer configuration - std::unique_ptr<PeriodicRunner::PeriodicJobHandle> _balancerConfigRefresher; + PeriodicJobAnchor _balancerConfigRefresher; }; } // namespace mongo diff --git a/src/mongo/db/service_context.h b/src/mongo/db/service_context.h index d122a32e5e3..0aa04389245 100644 --- a/src/mongo/db/service_context.h +++ b/src/mongo/db/service_context.h @@ -533,11 +533,6 @@ private: stdx::mutex _mutex; /** - * The storage engine, if any. - */ - std::unique_ptr<StorageEngine> _storageEngine; - - /** * The periodic runner. */ std::unique_ptr<PeriodicRunner> _runner; @@ -558,6 +553,11 @@ private: std::unique_ptr<transport::ServiceExecutor> _serviceExecutor; /** + * The storage engine, if any. + */ + std::unique_ptr<StorageEngine> _storageEngine; + + /** * Vector of registered observers. */ std::vector<ClientObserverHolder> _clientObservers; diff --git a/src/mongo/db/service_context_d_test_fixture.cpp b/src/mongo/db/service_context_d_test_fixture.cpp index 873e558082e..992e8a86967 100644 --- a/src/mongo/db/service_context_d_test_fixture.cpp +++ b/src/mongo/db/service_context_d_test_fixture.cpp @@ -73,7 +73,6 @@ ServiceContextMongoDTest::ServiceContextMongoDTest(std::string engine, RepairAct // Set up the periodic runner to allow background job execution for tests that require it. auto runner = makePeriodicRunner(getServiceContext()); - runner->startup(); getServiceContext()->setPeriodicRunner(std::move(runner)); storageGlobalParams.dbpath = _tempDir.path(); diff --git a/src/mongo/db/service_liaison_mock.cpp b/src/mongo/db/service_liaison_mock.cpp index fd95a7ebc6d..ab4397f1980 100644 --- a/src/mongo/db/service_liaison_mock.cpp +++ b/src/mongo/db/service_liaison_mock.cpp @@ -40,7 +40,6 @@ namespace mongo { MockServiceLiaisonImpl::MockServiceLiaisonImpl() { _timerFactory = std::make_unique<executor::AsyncTimerFactoryMock>(); _runner = makePeriodicRunner(getGlobalServiceContext()); - _runner->startup(); } LogicalSessionIdSet MockServiceLiaisonImpl::getActiveOpSessions() const { @@ -53,9 +52,7 @@ LogicalSessionIdSet MockServiceLiaisonImpl::getOpenCursorSessions(OperationConte return _cursorSessions; } -void MockServiceLiaisonImpl::join() { - _runner->shutdown(); -} +void MockServiceLiaisonImpl::join() {} Date_t MockServiceLiaisonImpl::now() const { return _timerFactory->now(); diff --git a/src/mongo/db/service_liaison_mongod.cpp b/src/mongo/db/service_liaison_mongod.cpp index 1008267bf19..94e1fbd9217 100644 --- a/src/mongo/db/service_liaison_mongod.cpp +++ b/src/mongo/db/service_liaison_mongod.cpp @@ -27,6 +27,8 @@ * it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl + #include "mongo/platform/basic.h" #include "mongo/db/service_liaison_mongod.h" @@ -37,6 +39,7 @@ #include "mongo/db/service_context.h" #include "mongo/stdx/mutex.h" #include "mongo/util/clock_source.h" +#include "mongo/util/log.h" namespace mongo { @@ -73,17 +76,20 @@ LogicalSessionIdSet ServiceLiaisonMongod::getOpenCursorSessions(OperationContext void ServiceLiaisonMongod::scheduleJob(PeriodicRunner::PeriodicJob job) { invariant(hasGlobalServiceContext()); - auto jobHandle = getGlobalServiceContext()->getPeriodicRunner()->makeJob(std::move(job)); - jobHandle->start(); - _jobs.push_back(std::move(jobHandle)); + auto jobAnchor = getGlobalServiceContext()->getPeriodicRunner()->makeJob(std::move(job)); + jobAnchor.start(); + + { + stdx::lock_guard lk(_mutex); + _jobs.push_back(std::move(jobAnchor)); + } } void ServiceLiaisonMongod::join() { - invariant(hasGlobalServiceContext()); - for (auto&& jobHandle : _jobs) { - jobHandle->stop(); - } - _jobs.clear(); + auto jobs = [&] { + stdx::lock_guard lk(_mutex); + return std::exchange(_jobs, {}); + }(); } Date_t ServiceLiaisonMongod::now() const { diff --git a/src/mongo/db/service_liaison_mongod.h b/src/mongo/db/service_liaison_mongod.h index c334eb5f36c..b1060425f6f 100644 --- a/src/mongo/db/service_liaison_mongod.h +++ b/src/mongo/db/service_liaison_mongod.h @@ -68,7 +68,9 @@ protected: * Returns the service context. */ ServiceContext* _context() override; - std::vector<std::unique_ptr<PeriodicRunner::PeriodicJobHandle>> _jobs; + + stdx::mutex _mutex; + std::vector<PeriodicJobAnchor> _jobs; }; } // namespace mongo diff --git a/src/mongo/db/service_liaison_mongos.cpp b/src/mongo/db/service_liaison_mongos.cpp index 7da5e508109..666ca06ea68 100644 --- a/src/mongo/db/service_liaison_mongos.cpp +++ b/src/mongo/db/service_liaison_mongos.cpp @@ -27,6 +27,8 @@ * it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl + #include "mongo/platform/basic.h" #include "mongo/db/service_liaison_mongos.h" @@ -36,7 +38,7 @@ #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/stdx/mutex.h" #include "mongo/util/clock_source.h" -#include "mongo/util/periodic_runner.h" +#include "mongo/util/log.h" namespace mongo { @@ -79,12 +81,20 @@ LogicalSessionIdSet ServiceLiaisonMongos::getOpenCursorSessions(OperationContext void ServiceLiaisonMongos::scheduleJob(PeriodicRunner::PeriodicJob job) { invariant(hasGlobalServiceContext()); - getGlobalServiceContext()->getPeriodicRunner()->scheduleJob(std::move(job)); + auto jobAnchor = getGlobalServiceContext()->getPeriodicRunner()->makeJob(std::move(job)); + jobAnchor.start(); + + { + stdx::lock_guard lk(_mutex); + _jobs.push_back(std::move(jobAnchor)); + } } void ServiceLiaisonMongos::join() { - invariant(hasGlobalServiceContext()); - getGlobalServiceContext()->getPeriodicRunner()->shutdown(); + auto jobs = [&] { + stdx::lock_guard lk(_mutex); + return std::exchange(_jobs, {}); + }(); } Date_t ServiceLiaisonMongos::now() const { diff --git a/src/mongo/db/service_liaison_mongos.h b/src/mongo/db/service_liaison_mongos.h index 580dc3cf00e..ab40801557d 100644 --- a/src/mongo/db/service_liaison_mongos.h +++ b/src/mongo/db/service_liaison_mongos.h @@ -68,6 +68,9 @@ protected: * Returns the service context. */ ServiceContext* _context() override; + + stdx::mutex _mutex; + std::vector<PeriodicJobAnchor> _jobs; }; } // namespace mongo diff --git a/src/mongo/db/storage/flow_control.cpp b/src/mongo/db/storage/flow_control.cpp index 3f1d2e0fec3..df076e2d82c 100644 --- a/src/mongo/db/storage/flow_control.cpp +++ b/src/mongo/db/storage/flow_control.cpp @@ -134,12 +134,13 @@ FlowControl::FlowControl(ServiceContext* service, repl::ReplicationCoordinator* // cause a slow start on start up. FlowControlTicketholder::set(service, std::make_unique<FlowControlTicketholder>(_kMaxTickets)); - service->getPeriodicRunner()->scheduleJob( + _jobAnchor = service->getPeriodicRunner()->makeJob( {"FlowControlRefresher", [this](Client* client) { FlowControlTicketholder::get(client->getServiceContext())->refreshTo(getNumTickets()); }, Seconds(1)}); + _jobAnchor.start(); } FlowControl* FlowControl::get(ServiceContext* service) { diff --git a/src/mongo/db/storage/flow_control.h b/src/mongo/db/storage/flow_control.h index e3905a3728b..64f0d0b1d00 100644 --- a/src/mongo/db/storage/flow_control.h +++ b/src/mongo/db/storage/flow_control.h @@ -141,6 +141,8 @@ private: // This value is used for calculating server status metrics. std::uint64_t _startWaitTime = 0; + + PeriodicJobAnchor _jobAnchor; }; } // namespace mongo diff --git a/src/mongo/db/storage/kv/storage_engine_impl.cpp b/src/mongo/db/storage/kv/storage_engine_impl.cpp index 3aa39b1c123..f2734ced75f 100644 --- a/src/mongo/db/storage/kv/storage_engine_impl.cpp +++ b/src/mongo/db/storage/kv/storage_engine_impl.cpp @@ -877,7 +877,7 @@ void StorageEngineImpl::TimestampMonitor::startup() { // Take a global lock in MODE_IS while fetching timestamps to guarantee that // rollback-to-stable isn't running concurrently. - { + try { auto opCtx = client->getOperationContext(); mongo::ServiceContext::UniqueOperationContext uOpCtx; if (!opCtx) { @@ -894,6 +894,9 @@ void StorageEngineImpl::TimestampMonitor::startup() { checkpoint = _engine->getCheckpointTimestamp(); oldest = _engine->getOldestTimestamp(); stable = _engine->getStableTimestamp(); + } catch (const ExceptionFor<ErrorCodes::InterruptedAtShutdown>&) { + // If we're interrupted at shutdown, it's fine to give up on future notifications + return; } Timestamp minOfCheckpointAndOldest = @@ -922,7 +925,8 @@ void StorageEngineImpl::TimestampMonitor::startup() { }, Seconds(1)); - _periodicRunner->scheduleJob(std::move(job)); + _job = _periodicRunner->makeJob(std::move(job)); + _job.start(); _running = true; } diff --git a/src/mongo/db/storage/kv/storage_engine_impl.h b/src/mongo/db/storage/kv/storage_engine_impl.h index 4d060cdcd4b..6c6dd3fdf2c 100644 --- a/src/mongo/db/storage/kv/storage_engine_impl.h +++ b/src/mongo/db/storage/kv/storage_engine_impl.h @@ -280,6 +280,7 @@ public: KVEngine* _engine; bool _running; + PeriodicJobAnchor _job; // The set of timestamps that were last reported to the listeners by the monitor. MonitoredTimestamps _currentTimestamps; diff --git a/src/mongo/db/storage/kv/storage_engine_test.cpp b/src/mongo/db/storage/kv/storage_engine_test.cpp index d424f64f3c1..40eddbfcef3 100644 --- a/src/mongo/db/storage/kv/storage_engine_test.cpp +++ b/src/mongo/db/storage/kv/storage_engine_test.cpp @@ -391,10 +391,6 @@ public: } ~TimestampKVEngineTest() { - // Shut down the background periodic task runner, before the storage engine. - auto runner = getServiceContext()->getPeriodicRunner(); - runner->shutdown(); - _storageEngine->cleanShutdown(); _storageEngine.reset(); } diff --git a/src/mongo/db/storage/mobile/mobile_kv_engine.cpp b/src/mongo/db/storage/mobile/mobile_kv_engine.cpp index 4c8b2401683..c6ff81b12f7 100644 --- a/src/mongo/db/storage/mobile/mobile_kv_engine.cpp +++ b/src/mongo/db/storage/mobile/mobile_kv_engine.cpp @@ -100,7 +100,7 @@ MobileKVEngine::MobileKVEngine(const std::string& path, maybeVacuum(client, Date_t::max()); }, Minutes(options.vacuumCheckIntervalMinutes))); - _vacuumJob->start(); + _vacuumJob.start(); } } diff --git a/src/mongo/db/storage/mobile/mobile_kv_engine.h b/src/mongo/db/storage/mobile/mobile_kv_engine.h index 10da2d2e1a0..094bf0674d9 100644 --- a/src/mongo/db/storage/mobile/mobile_kv_engine.h +++ b/src/mongo/db/storage/mobile/mobile_kv_engine.h @@ -154,7 +154,7 @@ private: std::string _path; embedded::MobileOptions _options; - std::unique_ptr<PeriodicRunner::PeriodicJobHandle> _vacuumJob; + PeriodicJobAnchor _vacuumJob; }; } // namespace mongo diff --git a/src/mongo/dbtests/framework.cpp b/src/mongo/dbtests/framework.cpp index c4947ebba57..c0f09bbf48b 100644 --- a/src/mongo/dbtests/framework.cpp +++ b/src/mongo/dbtests/framework.cpp @@ -75,11 +75,6 @@ int runDbTests(int argc, char** argv) { // the memory and makes leak sanitizer happy. ScriptEngine::dropScopeCache(); - // Shut down the background periodic task runner, before the storage engine. - if (auto runner = getGlobalServiceContext()->getPeriodicRunner()) { - runner->shutdown(); - } - // We may be shut down before we have a global storage // engine. if (!getGlobalServiceContext()->getStorageEngine()) @@ -103,7 +98,6 @@ int runDbTests(int argc, char** argv) { // Set up the periodic runner for background job execution, which is required by the storage // engine to be running beforehand. auto runner = makePeriodicRunner(globalServiceContext); - runner->startup(); globalServiceContext->setPeriodicRunner(std::move(runner)); initializeStorageEngine(globalServiceContext, StorageEngineInitFlags::kNone); diff --git a/src/mongo/embedded/embedded.cpp b/src/mongo/embedded/embedded.cpp index d63202c95b4..54c95abe186 100644 --- a/src/mongo/embedded/embedded.cpp +++ b/src/mongo/embedded/embedded.cpp @@ -169,11 +169,6 @@ void shutdown(ServiceContext* srvContext) { LogicalSessionCache::set(serviceContext, nullptr); - // Shut down the background periodic task runner, before the storage engine. - if (auto runner = serviceContext->getPeriodicRunner()) { - runner->shutdown(); - } - repl::ReplicationCoordinator::get(serviceContext)->shutdown(shutdownOpCtx.get()); IndexBuildsCoordinator::get(serviceContext)->shutdown(); @@ -236,7 +231,6 @@ ServiceContext* initialize(const char* yaml_config) { // The periodic runner is required by the storage engine to be running beforehand. auto periodicRunner = std::make_unique<PeriodicRunnerEmbedded>( serviceContext, serviceContext->getPreciseClockSource()); - periodicRunner->startup(); serviceContext->setPeriodicRunner(std::move(periodicRunner)); setUpCatalog(serviceContext); diff --git a/src/mongo/embedded/periodic_runner_embedded.cpp b/src/mongo/embedded/periodic_runner_embedded.cpp index bb0c17c40a3..41ae8a49e93 100644 --- a/src/mongo/embedded/periodic_runner_embedded.cpp +++ b/src/mongo/embedded/periodic_runner_embedded.cpp @@ -37,21 +37,6 @@ #include "mongo/util/scopeguard.h" namespace mongo { -namespace { - -template <typename T> -std::shared_ptr<T> lockAndAssertExists(std::weak_ptr<T> ptr, StringData errMsg) { - if (auto p = ptr.lock()) { - return p; - } else { - uasserted(ErrorCodes::InternalError, errMsg); - } -} - -constexpr auto kPeriodicJobHandleLifetimeErrMsg = - "The PeriodicRunner job for this handle no longer exists"_sd; - -} // namespace struct PeriodicRunnerEmbedded::PeriodicJobSorter { bool operator()(std::shared_ptr<PeriodicJobImpl> const& lhs, @@ -64,59 +49,13 @@ struct PeriodicRunnerEmbedded::PeriodicJobSorter { PeriodicRunnerEmbedded::PeriodicRunnerEmbedded(ServiceContext* svc, ClockSource* clockSource) : _svc(svc), _clockSource(clockSource) {} -PeriodicRunnerEmbedded::~PeriodicRunnerEmbedded() { - PeriodicRunnerEmbedded::shutdown(); -} - -std::shared_ptr<PeriodicRunnerEmbedded::PeriodicJobImpl> PeriodicRunnerEmbedded::createAndAddJob( - PeriodicJob job, bool shouldStart) { +auto PeriodicRunnerEmbedded::makeJob(PeriodicJob job) -> JobAnchor { auto impl = std::make_shared<PeriodicJobImpl>(std::move(job), this->_clockSource, this); stdx::lock_guard<stdx::mutex> lk(_mutex); _jobs.push_back(impl); std::push_heap(_jobs.begin(), _jobs.end(), PeriodicJobSorter()); - if (shouldStart && _running) - impl->start(); - return impl; -} - -std::unique_ptr<PeriodicRunner::PeriodicJobHandle> PeriodicRunnerEmbedded::makeJob( - PeriodicJob job) { - return std::make_unique<PeriodicJobHandleImpl>(createAndAddJob(std::move(job), false)); -} - -void PeriodicRunnerEmbedded::scheduleJob(PeriodicJob job) { - createAndAddJob(std::move(job), true); -} - -void PeriodicRunnerEmbedded::startup() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - - if (_running) { - return; - } - - _running = true; - - // schedule any jobs that we have - for (auto& job : _jobs) { - job->start(); - } -} - -void PeriodicRunnerEmbedded::shutdown() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_running) { - _running = false; - - for (auto& job : _jobs) { - stdx::lock_guard<stdx::mutex> jobLock(job->_mutex); - if (job->isAlive(jobLock)) { - job->_stopWithMasterAndJobLock(lk, jobLock); - } - } - _jobs.clear(); - } + return JobAnchor(impl); } bool PeriodicRunnerEmbedded::tryPump() { @@ -225,7 +164,9 @@ void PeriodicRunnerEmbedded::PeriodicJobImpl::stop() { // sure the user can invalidate it after this call. stdx::lock_guard<stdx::mutex> masterLock(_periodicRunner->_mutex); stdx::lock_guard<stdx::mutex> lk(_mutex); - _stopWithMasterAndJobLock(masterLock, lk); + if (isAlive(lk)) { + _stopWithMasterAndJobLock(masterLock, lk); + } } Milliseconds PeriodicRunnerEmbedded::PeriodicJobImpl::getPeriod() { @@ -255,34 +196,4 @@ bool PeriodicRunnerEmbedded::PeriodicJobImpl::isAlive(WithLock lk) { return _execStatus == ExecutionStatus::kRunning || _execStatus == ExecutionStatus::kPaused; } -void PeriodicRunnerEmbedded::PeriodicJobHandleImpl::start() { - auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg); - job->start(); -} - -void PeriodicRunnerEmbedded::PeriodicJobHandleImpl::stop() { - auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg); - job->stop(); -} - -void PeriodicRunnerEmbedded::PeriodicJobHandleImpl::pause() { - auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg); - job->pause(); -} - -void PeriodicRunnerEmbedded::PeriodicJobHandleImpl::resume() { - auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg); - job->resume(); -} - -Milliseconds PeriodicRunnerEmbedded::PeriodicJobHandleImpl::getPeriod() { - auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg); - return job->getPeriod(); -} - -void PeriodicRunnerEmbedded::PeriodicJobHandleImpl::setPeriod(Milliseconds ms) { - auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg); - job->setPeriod(ms); -} - } // namespace mongo diff --git a/src/mongo/embedded/periodic_runner_embedded.h b/src/mongo/embedded/periodic_runner_embedded.h index a1e56079208..6d82c50db44 100644 --- a/src/mongo/embedded/periodic_runner_embedded.h +++ b/src/mongo/embedded/periodic_runner_embedded.h @@ -47,21 +47,15 @@ namespace mongo { class PeriodicRunnerEmbedded : public PeriodicRunner { public: PeriodicRunnerEmbedded(ServiceContext* svc, ClockSource* clockSource); - ~PeriodicRunnerEmbedded(); - std::unique_ptr<PeriodicRunner::PeriodicJobHandle> makeJob(PeriodicJob job) override; - void scheduleJob(PeriodicJob job) override; - - void startup() override; - - void shutdown() override; + JobAnchor makeJob(PeriodicJob job) override; // Safe to call from multiple threads but will only execute on one thread at a time. // Returns true if it attempted to run any jobs. bool tryPump(); private: - class PeriodicJobImpl { + class PeriodicJobImpl : public ControllableJob { PeriodicJobImpl(const PeriodicJobImpl&) = delete; PeriodicJobImpl& operator=(const PeriodicJobImpl&) = delete; @@ -69,12 +63,12 @@ private: friend class PeriodicRunnerEmbedded; PeriodicJobImpl(PeriodicJob job, ClockSource* source, PeriodicRunnerEmbedded* runner); - void start(); - void pause(); - void resume(); - void stop(); - Milliseconds getPeriod(); - void setPeriod(Milliseconds ms); + void start() override; + void pause() override; + void resume() override; + void stop() override; + Milliseconds getPeriod() override; + void setPeriod(Milliseconds ms) override; bool isAlive(WithLock lk); @@ -101,24 +95,6 @@ private: }; struct PeriodicJobSorter; - std::shared_ptr<PeriodicRunnerEmbedded::PeriodicJobImpl> createAndAddJob(PeriodicJob job, - bool shouldStart); - - class PeriodicJobHandleImpl : public PeriodicJobHandle { - public: - explicit PeriodicJobHandleImpl(std::weak_ptr<PeriodicJobImpl> jobImpl) - : _jobWeak(jobImpl) {} - void start() override; - void stop() override; - void pause() override; - void resume() override; - Milliseconds getPeriod() override; - void setPeriod(Milliseconds ms) override; - - private: - std::weak_ptr<PeriodicJobImpl> _jobWeak; - }; - ServiceContext* _svc; ClockSource* _clockSource; @@ -127,7 +103,6 @@ private: std::vector<std::shared_ptr<PeriodicJobImpl>> _Pausedjobs; stdx::mutex _mutex; - bool _running = false; }; } // namespace mongo diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index 0a4a7fb2cbf..b31b90bde7d 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -212,12 +212,6 @@ void cleanupTask(ServiceContext* serviceContext) { if (serviceContext) { serviceContext->setKillAllOperations(); - - // Shut down the background periodic task runner. - auto runner = serviceContext->getPeriodicRunner(); - if (runner) { - runner->shutdown(); - } } // Perform all shutdown operations after setKillAllOperations is called in order to ensure @@ -493,7 +487,6 @@ ExitCode runMongosServer(ServiceContext* serviceContext) { // Set up the periodic runner for background job execution { auto runner = makePeriodicRunner(serviceContext); - runner->startup(); serviceContext->setPeriodicRunner(std::move(runner)); } diff --git a/src/mongo/util/SConscript b/src/mongo/util/SConscript index 2c9dd3bfbc7..11e885ec0ae 100644 --- a/src/mongo/util/SConscript +++ b/src/mongo/util/SConscript @@ -162,6 +162,7 @@ env.Library( "periodic_runner.cpp", ], LIBDEPS=[ + "$BUILD_DIR/mongo/base", ], ) diff --git a/src/mongo/util/mock_periodic_runner_impl.h b/src/mongo/util/mock_periodic_runner_impl.h index 207fefe904d..361de33a4cd 100644 --- a/src/mongo/util/mock_periodic_runner_impl.h +++ b/src/mongo/util/mock_periodic_runner_impl.h @@ -40,25 +40,23 @@ namespace mongo { */ class MockPeriodicRunnerImpl final : public PeriodicRunner { public: - class MockPeriodicJobHandleImpl final : public PeriodicRunner::PeriodicJobHandle { + class Job final : public ControllableJob { public: - ~MockPeriodicJobHandleImpl() = default; - - void start() override{}; - void stop() override{}; - void pause() override{}; - void resume() override{}; + ~Job() = default; + + void start() final{}; + void stop() final{}; + void pause() final{}; + void resume() final{}; + Milliseconds getPeriod() final{}; + void setPeriod(Milliseconds ms) final{}; }; ~MockPeriodicRunnerImpl() = default; - std::unique_ptr<PeriodicRunner::PeriodicJobHandle> makeJob(PeriodicRunner::PeriodicJob job) { - return std::make_unique<MockPeriodicJobHandleImpl>(); + PeriodicAnchor makeJob(PeriodicJob job) final { + return std::weak_ptr<Job>{}; } - - void scheduleJob(PeriodicRunner::PeriodicJob job) {} - void startup() {} - void shutdown() {} }; } // namespace mongo diff --git a/src/mongo/util/periodic_runner.cpp b/src/mongo/util/periodic_runner.cpp index dc54356142c..ed7069d4183 100644 --- a/src/mongo/util/periodic_runner.cpp +++ b/src/mongo/util/periodic_runner.cpp @@ -31,8 +31,58 @@ #include "mongo/util/periodic_runner.h" +#include "mongo/util/assert_util.h" + namespace mongo { PeriodicRunner::~PeriodicRunner() = default; +PeriodicJobAnchor::PeriodicJobAnchor(std::shared_ptr<Job> handle) : _handle{std::move(handle)} {} + +PeriodicJobAnchor::~PeriodicJobAnchor() { + if (!_handle) { + return; + } + + _handle->stop(); +} + +void PeriodicJobAnchor::start() { + invariant(_handle); + _handle->start(); +} + +void PeriodicJobAnchor::pause() { + invariant(_handle); + _handle->pause(); +} + +void PeriodicJobAnchor::resume() { + invariant(_handle); + _handle->resume(); +} + +void PeriodicJobAnchor::stop() { + invariant(_handle); + _handle->stop(); +} + +void PeriodicJobAnchor::setPeriod(Milliseconds ms) { + invariant(_handle); + _handle->setPeriod(ms); +} + +Milliseconds PeriodicJobAnchor::getPeriod() { + invariant(_handle); + return _handle->getPeriod(); +} + +void PeriodicJobAnchor::detach() { + _handle.reset(); +} + +bool PeriodicJobAnchor::isValid() const noexcept { + return static_cast<bool>(_handle); +} + } // namespace mongo diff --git a/src/mongo/util/periodic_runner.h b/src/mongo/util/periodic_runner.h index 1508078a87f..93a03498357 100644 --- a/src/mongo/util/periodic_runner.h +++ b/src/mongo/util/periodic_runner.h @@ -30,13 +30,18 @@ #pragma once #include <functional> +#include <memory> #include <string> +#include <boost/optional.hpp> + +#include "mongo/stdx/mutex.h" #include "mongo/util/time_support.h" namespace mongo { class Client; +class PeriodicJobAnchor; /** * An interface for objects that run work items at specified intervals. Each individually scheduled @@ -52,6 +57,7 @@ class Client; class PeriodicRunner { public: using Job = std::function<void(Client* client)>; + using JobAnchor = PeriodicJobAnchor; struct PeriodicJob { PeriodicJob(std::string name, Job callable, Milliseconds period) @@ -73,9 +79,12 @@ public: Milliseconds interval; }; - class PeriodicJobHandle { + /** + * A ControllableJob allows a user to reschedule the execution of a Job + */ + class ControllableJob { public: - virtual ~PeriodicJobHandle() = default; + virtual ~ControllableJob() = default; /** * Starts running the job @@ -115,39 +124,70 @@ public: virtual ~PeriodicRunner(); - /** * Creates a new job and adds it to the runner, but does not schedule it. * The caller is responsible for calling 'start' on the resulting handle in * order to begin the job running. This API should be used when the caller * is interested in observing and controlling the job execution state. */ - virtual std::unique_ptr<PeriodicJobHandle> makeJob(PeriodicJob job) = 0; + virtual JobAnchor makeJob(PeriodicJob job) = 0; +}; - /** - * Schedules a job to be run at periodic intervals. - * - * If the runner is not running when a job is scheduled, that job should - * be saved so that it may run in the future once startup() is called. - */ - virtual void scheduleJob(PeriodicJob job) = 0; +/** + * A PeriodicJobAnchor allows the holder to control the scheduling of a job for the lifetime of the + * anchor. When an anchor is destructed, it stops its underlying job. + * + * The underlying weak_ptr for this class is not synchronized. In essence, treat use of this class + * as if it were a raw pointer to a ControllableJob. + * + * Each wrapped PeriodicRunner::ControllableJob function on this object throws + * if the underlying job is gone (e.g. in shutdown). + */ +class[[nodiscard]] PeriodicJobAnchor { +public: + using Job = PeriodicRunner::ControllableJob; + +public: + // Note that this constructor is only intended for use with PeriodicRunner::makeJob() + explicit PeriodicJobAnchor(std::shared_ptr<Job> handle); + + PeriodicJobAnchor() = default; + PeriodicJobAnchor(PeriodicJobAnchor &&) = default; + PeriodicJobAnchor& operator=(PeriodicJobAnchor&&) = default; + + PeriodicJobAnchor(const PeriodicJobAnchor&) = delete; + PeriodicJobAnchor& operator=(const PeriodicJobAnchor&) = delete; + + ~PeriodicJobAnchor(); + + void start(); + void pause(); + void resume(); + void stop(); + void setPeriod(Milliseconds ms); + Milliseconds getPeriod(); /** - * Starts up this periodic runner. + * Abandon responsibility for scheduling the execution of this job * - * This method may safely be called multiple times, either with or without - * calls to shutdown() in between. + * This effectively invalidates the anchor. */ - virtual void startup() = 0; + void detach(); /** - * Shuts down this periodic runner. Stops all jobs from running. + * Returns if this PeriodicJobAnchor is associated with a PeriodicRunner::ControllableJob * - * This method may safely be called multiple times, either with or without - * calls to startup() in between. Any jobs that have been scheduled on this - * runner should no longer execute once shutdown() is called. + * This function is useful to see if a PeriodicJobAnchor is initialized. It does not necessarily + * inform whether a PeriodicJobAnchor will throw from a control function above. */ - virtual void shutdown() = 0; + bool isValid() const noexcept; + + explicit operator bool() const noexcept { + return isValid(); + } + +private: + std::shared_ptr<Job> _handle; }; } // namespace mongo diff --git a/src/mongo/util/periodic_runner_impl.cpp b/src/mongo/util/periodic_runner_impl.cpp index 8ffc8839bb8..fc21a7184a4 100644 --- a/src/mongo/util/periodic_runner_impl.cpp +++ b/src/mongo/util/periodic_runner_impl.cpp @@ -27,6 +27,8 @@ * it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault + #include "mongo/platform/basic.h" #include "mongo/util/periodic_runner_impl.h" @@ -34,6 +36,7 @@ #include "mongo/db/client.h" #include "mongo/db/service_context.h" #include "mongo/util/clock_source.h" +#include "mongo/util/log.h" #include "mongo/util/scopeguard.h" namespace mongo { @@ -41,71 +44,41 @@ namespace mongo { PeriodicRunnerImpl::PeriodicRunnerImpl(ServiceContext* svc, ClockSource* clockSource) : _svc(svc), _clockSource(clockSource) {} -PeriodicRunnerImpl::~PeriodicRunnerImpl() { - PeriodicRunnerImpl::shutdown(); -} - -std::shared_ptr<PeriodicRunnerImpl::PeriodicJobImpl> PeriodicRunnerImpl::createAndAddJob( - PeriodicJob job) { +auto PeriodicRunnerImpl::makeJob(PeriodicJob job) -> JobAnchor { auto impl = std::make_shared<PeriodicJobImpl>(std::move(job), this->_clockSource, this->_svc); - stdx::lock_guard<stdx::mutex> lk(_mutex); - _jobs.push_back(impl); - return impl; -} - -std::unique_ptr<PeriodicRunner::PeriodicJobHandle> PeriodicRunnerImpl::makeJob(PeriodicJob job) { - auto handle = std::make_unique<PeriodicJobHandleImpl>(createAndAddJob(job)); - return std::move(handle); + JobAnchor anchor(std::move(impl)); + return anchor; } -void PeriodicRunnerImpl::scheduleJob(PeriodicJob job) { - auto impl = createAndAddJob(job); - stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_running) { - impl->start(); - } -} +PeriodicRunnerImpl::PeriodicJobImpl::PeriodicJobImpl(PeriodicJob job, + ClockSource* source, + ServiceContext* svc) + : _job(std::move(job)), _clockSource(source), _serviceContext(svc) {} -void PeriodicRunnerImpl::startup() { - stdx::lock_guard<stdx::mutex> lk(_mutex); +void PeriodicRunnerImpl::PeriodicJobImpl::_run() { + auto[startPromise, startFuture] = makePromiseFuture<void>(); - if (_running) { - return; + { + stdx::lock_guard lk(_mutex); + invariant(_execStatus == ExecutionStatus::NOT_SCHEDULED); } - _running = true; - // schedule any jobs that we have - for (auto& job : _jobs) { - job->start(); - } -} + _thread = stdx::thread([ this, startPromise = std::move(startPromise) ]() mutable { + auto guard = makeGuard([this] { _stopPromise.emplaceValue(); }); -void PeriodicRunnerImpl::shutdown() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_running) { - _running = false; + Client::initThread(_job.name, _serviceContext, nullptr); - for (auto& job : _jobs) { - job->stop(); + // Let start() know we're running + { + stdx::lock_guard lk(_mutex); + _execStatus = PeriodicJobImpl::ExecutionStatus::RUNNING; } - _jobs.clear(); - } -} + startPromise.emplaceValue(); -PeriodicRunnerImpl::PeriodicJobImpl::PeriodicJobImpl(PeriodicJob job, - ClockSource* source, - ServiceContext* svc) - : _job(std::move(job)), _clockSource(source), _serviceContext(svc) {} - -void PeriodicRunnerImpl::PeriodicJobImpl::_run() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - invariant(_execStatus == PeriodicJobImpl::ExecutionStatus::NOT_SCHEDULED); - _thread = stdx::thread([this] { - Client::initThread(_job.name, _serviceContext, nullptr); - while (true) { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock lk(_mutex); + while (_execStatus != ExecutionStatus::CANCELED) { // Wait until it's unpaused or canceled _condvar.wait(lk, [&] { return _execStatus != ExecutionStatus::PAUSED; }); if (_execStatus == ExecutionStatus::CANCELED) { @@ -135,10 +108,14 @@ void PeriodicRunnerImpl::PeriodicJobImpl::_run() { } while (_clockSource->now() < getDeadlineFromInterval()); } }); - _execStatus = PeriodicJobImpl::ExecutionStatus::RUNNING; + + // Wait for the thread to actually start + startFuture.get(); } void PeriodicRunnerImpl::PeriodicJobImpl::start() { + LOG(2) << "Starting periodic job " << _job.name; + _run(); } @@ -158,17 +135,26 @@ void PeriodicRunnerImpl::PeriodicJobImpl::resume() { } void PeriodicRunnerImpl::PeriodicJobImpl::stop() { - { + auto lastExecStatus = [&] { stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_execStatus != ExecutionStatus::RUNNING && _execStatus != ExecutionStatus::PAUSED) - return; - invariant(_thread.joinable()); + return std::exchange(_execStatus, ExecutionStatus::CANCELED); + }(); - _execStatus = PeriodicJobImpl::ExecutionStatus::CANCELED; + // If we never started, then nobody should wait + if (lastExecStatus == ExecutionStatus::NOT_SCHEDULED) { + return; } - _condvar.notify_one(); - _thread.join(); + + // Only join once + if (lastExecStatus != ExecutionStatus::CANCELED) { + LOG(2) << "Stopping periodic job " << _job.name; + + _condvar.notify_one(); + _thread.join(); + } + + _stopPromise.getFuture().get(); } Milliseconds PeriodicRunnerImpl::PeriodicJobImpl::getPeriod() { @@ -185,50 +171,4 @@ void PeriodicRunnerImpl::PeriodicJobImpl::setPeriod(Milliseconds ms) { } } -namespace { - -template <typename T> -std::shared_ptr<T> lockAndAssertExists(std::weak_ptr<T> ptr, StringData errMsg) { - if (auto p = ptr.lock()) { - return p; - } else { - uasserted(ErrorCodes::InternalError, errMsg); - } -} - -constexpr auto kPeriodicJobHandleLifetimeErrMsg = - "The PeriodicRunner job for this handle no longer exists"_sd; - -} // namespace - -void PeriodicRunnerImpl::PeriodicJobHandleImpl::start() { - auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg); - job->start(); -} - -void PeriodicRunnerImpl::PeriodicJobHandleImpl::stop() { - auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg); - job->stop(); -} - -void PeriodicRunnerImpl::PeriodicJobHandleImpl::pause() { - auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg); - job->pause(); -} - -void PeriodicRunnerImpl::PeriodicJobHandleImpl::resume() { - auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg); - job->resume(); -} - -Milliseconds PeriodicRunnerImpl::PeriodicJobHandleImpl::getPeriod() { - auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg); - return job->getPeriod(); -} - -void PeriodicRunnerImpl::PeriodicJobHandleImpl::setPeriod(Milliseconds ms) { - auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg); - job->setPeriod(ms); -} - } // namespace mongo diff --git a/src/mongo/util/periodic_runner_impl.h b/src/mongo/util/periodic_runner_impl.h index eb6663d708b..a921a66c59f 100644 --- a/src/mongo/util/periodic_runner_impl.h +++ b/src/mongo/util/periodic_runner_impl.h @@ -36,6 +36,7 @@ #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" #include "mongo/util/clock_source.h" +#include "mongo/util/future.h" #include "mongo/util/periodic_runner.h" namespace mongo { @@ -50,17 +51,11 @@ class ServiceContext; class PeriodicRunnerImpl : public PeriodicRunner { public: PeriodicRunnerImpl(ServiceContext* svc, ClockSource* clockSource); - ~PeriodicRunnerImpl(); - std::unique_ptr<PeriodicRunner::PeriodicJobHandle> makeJob(PeriodicJob job) override; - void scheduleJob(PeriodicJob job) override; - - void startup() override; - - void shutdown() override; + JobAnchor makeJob(PeriodicJob job) override; private: - class PeriodicJobImpl { + class PeriodicJobImpl : public ControllableJob { PeriodicJobImpl(const PeriodicJobImpl&) = delete; PeriodicJobImpl& operator=(const PeriodicJobImpl&) = delete; @@ -68,12 +63,12 @@ private: friend class PeriodicRunnerImpl; PeriodicJobImpl(PeriodicJob job, ClockSource* source, ServiceContext* svc); - void start(); - void pause(); - void resume(); - void stop(); - Milliseconds getPeriod(); - void setPeriod(Milliseconds ms); + void start() override; + void pause() override; + void resume() override; + void stop() override; + Milliseconds getPeriod() override; + void setPeriod(Milliseconds ms) override; enum class ExecutionStatus { NOT_SCHEDULED, RUNNING, PAUSED, CANCELED }; @@ -83,7 +78,9 @@ private: PeriodicJob _job; ClockSource* _clockSource; ServiceContext* _serviceContext; + stdx::thread _thread; + SharedPromise<void> _stopPromise; stdx::mutex _mutex; stdx::condition_variable _condvar; @@ -95,28 +92,8 @@ private: std::shared_ptr<PeriodicRunnerImpl::PeriodicJobImpl> createAndAddJob(PeriodicJob job); - class PeriodicJobHandleImpl : public PeriodicJobHandle { - public: - explicit PeriodicJobHandleImpl(std::weak_ptr<PeriodicJobImpl> jobImpl) - : _jobWeak(jobImpl) {} - void start() override; - void stop() override; - void pause() override; - void resume() override; - Milliseconds getPeriod() override; - void setPeriod(Milliseconds ms) override; - - private: - std::weak_ptr<PeriodicJobImpl> _jobWeak; - }; - ServiceContext* _svc; ClockSource* _clockSource; - - std::vector<std::shared_ptr<PeriodicJobImpl>> _jobs; - - stdx::mutex _mutex; - bool _running = false; }; } // namespace mongo diff --git a/src/mongo/util/periodic_runner_impl_test.cpp b/src/mongo/util/periodic_runner_impl_test.cpp index fde21f4dd86..86f2f1a96d6 100644 --- a/src/mongo/util/periodic_runner_impl_test.cpp +++ b/src/mongo/util/periodic_runner_impl_test.cpp @@ -51,10 +51,6 @@ public: _runner = std::make_unique<PeriodicRunnerImpl>(getServiceContext(), _clockSource.get()); } - void tearDown() override { - _runner->shutdown(); - } - ClockSourceMock& clockSource() { return *_clockSource; } @@ -72,7 +68,6 @@ class PeriodicRunnerImplTest : public PeriodicRunnerImplTestNoSetup { public: void setUp() override { PeriodicRunnerImplTestNoSetup::setUp(); - runner().startup(); } }; @@ -94,7 +89,8 @@ TEST_F(PeriodicRunnerImplTest, OneJobTest) { }, interval); - runner().scheduleJob(std::move(job)); + auto jobAnchor = runner().makeJob(std::move(job)); + jobAnchor.start(); // Fast forward ten times, we should run all ten times. for (int i = 0; i < 10; i++) { @@ -126,7 +122,7 @@ TEST_F(PeriodicRunnerImplTest, OnePausableJobDoesNotRunWithoutStart) { }, interval); - auto handle = runner().makeJob(std::move(job)); + auto jobAnchor = runner().makeJob(std::move(job)); clockSource().advance(interval); ASSERT_EQ(count, 0); @@ -151,8 +147,8 @@ TEST_F(PeriodicRunnerImplTest, OnePausableJobRunsCorrectlyWithStart) { }, interval); - auto handle = runner().makeJob(std::move(job)); - handle->start(); + auto jobAnchor = runner().makeJob(std::move(job)); + jobAnchor.start(); // Fast forward ten times, we should run all ten times. for (int i = 0; i < 10; i++) { { @@ -186,8 +182,8 @@ TEST_F(PeriodicRunnerImplTest, OnePausableJobPausesCorrectly) { }, interval); - auto handle = runner().makeJob(std::move(job)); - handle->start(); + auto jobAnchor = runner().makeJob(std::move(job)); + jobAnchor.start(); // Wait for the first execution. { stdx::unique_lock<stdx::mutex> lk(mutex); @@ -197,7 +193,7 @@ TEST_F(PeriodicRunnerImplTest, OnePausableJobPausesCorrectly) { { stdx::unique_lock<stdx::mutex> lk(mutex); isPaused = true; - handle->pause(); + jobAnchor.pause(); } // Fast forward ten times, we shouldn't run anymore. If we do, the assert inside the job will @@ -228,8 +224,8 @@ TEST_F(PeriodicRunnerImplTest, OnePausableJobResumesCorrectly) { }, interval); - auto handle = runner().makeJob(std::move(job)); - handle->start(); + auto jobAnchor = runner().makeJob(std::move(job)); + jobAnchor.start(); // Wait for the first execution. { stdx::unique_lock<stdx::mutex> lk(mutex); @@ -252,7 +248,7 @@ TEST_F(PeriodicRunnerImplTest, OnePausableJobResumesCorrectly) { } } - handle->pause(); + jobAnchor.pause(); // Fast forward ten times, we shouldn't run anymore. for (int i = 0; i < 10; i++) { @@ -262,7 +258,7 @@ TEST_F(PeriodicRunnerImplTest, OnePausableJobResumesCorrectly) { // Make sure we didn't run anymore while paused. ASSERT_EQ(count, numIterationsBeforePause); - handle->resume(); + jobAnchor.resume(); // Fast forward, we should run at least once. clockSource().advance(interval); @@ -275,39 +271,6 @@ TEST_F(PeriodicRunnerImplTest, OnePausableJobResumesCorrectly) { tearDown(); } -TEST_F(PeriodicRunnerImplTestNoSetup, ScheduleBeforeStartupTest) { - int count = 0; - Milliseconds interval{5}; - - stdx::mutex mutex; - stdx::condition_variable cv; - - // Schedule a job before startup - PeriodicRunner::PeriodicJob job("job", - [&count, &mutex, &cv](Client*) { - { - stdx::unique_lock<stdx::mutex> lk(mutex); - count++; - } - cv.notify_all(); - }, - interval); - - runner().scheduleJob(std::move(job)); - - // Start the runner, job should still run - runner().startup(); - - clockSource().advance(interval); - - { - stdx::unique_lock<stdx::mutex> lk(mutex); - cv.wait(lk, [&count] { return count > 0; }); - } - - tearDown(); -} - TEST_F(PeriodicRunnerImplTest, TwoJobsTest) { int countA = 0; int countB = 0; @@ -338,8 +301,11 @@ TEST_F(PeriodicRunnerImplTest, TwoJobsTest) { }, intervalB); - runner().scheduleJob(std::move(jobA)); - runner().scheduleJob(std::move(jobB)); + auto jobAnchorA = runner().makeJob(std::move(jobA)); + auto jobAnchorB = runner().makeJob(std::move(jobB)); + + jobAnchorA.start(); + jobAnchorB.start(); // Fast forward and wait for both jobs to run the right number of times for (int i = 0; i <= 10; i++) { @@ -382,8 +348,11 @@ TEST_F(PeriodicRunnerImplTest, TwoJobsDontDeadlock) { }, Milliseconds(1)); - runner().scheduleJob(std::move(jobA)); - runner().scheduleJob(std::move(jobB)); + auto jobAnchorA = runner().makeJob(std::move(jobA)); + auto jobAnchorB = runner().makeJob(std::move(jobB)); + + jobAnchorA.start(); + jobAnchorB.start(); clockSource().advance(Milliseconds(1)); @@ -415,16 +384,16 @@ TEST_F(PeriodicRunnerImplTest, ChangingIntervalWorks) { }, Milliseconds(5)); - auto handle = runner().makeJob(std::move(job)); - handle->start(); + auto jobAnchor = runner().makeJob(std::move(job)); + jobAnchor.start(); // Wait for the first execution. { stdx::unique_lock<stdx::mutex> lk(mutex); cv.wait(lk, [&] { return timesCalled; }); } - handle->setPeriod(Milliseconds(10)); - ASSERT_EQ(handle->getPeriod(), Milliseconds(10)); + jobAnchor.setPeriod(Milliseconds(10)); + ASSERT_EQ(jobAnchor.getPeriod(), Milliseconds(10)); // if we change the period to a longer duration, that doesn't trigger a run { @@ -456,8 +425,8 @@ TEST_F(PeriodicRunnerImplTest, ChangingIntervalWorks) { ASSERT_EQ(timesCalled, 2ul); } - handle->setPeriod(Milliseconds(4)); - ASSERT_EQ(handle->getPeriod(), Milliseconds(4)); + jobAnchor.setPeriod(Milliseconds(4)); + ASSERT_EQ(jobAnchor.getPeriod(), Milliseconds(4)); // shortening triggers the period { |