diff options
author | Ben Caimano <ben.caimano@10gen.com> | 2019-07-24 14:30:04 -0400 |
---|---|---|
committer | Ben Caimano <ben.caimano@10gen.com> | 2019-07-29 16:37:15 -0400 |
commit | cc04956b4da01e57abd70b4adc3f4f673451ee56 (patch) | |
tree | 6085995591c05ab42a4d739c5da09bc1f863e4e9 | |
parent | 295e292133b40837a57abe72d78cb2127ecef18a (diff) | |
download | mongo-cc04956b4da01e57abd70b4adc3f4f673451ee56.tar.gz |
SERVER-35114 Make it possible to adjust the period of active jobs in the PeriodicRunner
(cherry picked from commit bf4a3cff4dc5572f2e97cb5279fe63c8227187e0)
SERVER-39936 Use PeriodicRunner handles to simplify shutdown ordering
(cherry picked from commit 1eff33bd1a8d48eb607675f87faf1836ba325006)
This is a partial backport. Parts of the original commits listed do not
exist in v4.0
22 files changed, 458 insertions, 382 deletions
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index 2b6f05de989..22b76688f2b 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -336,6 +336,11 @@ ExitCode _initAndListen(int listenPort) { } serviceContext->setTransportLayer(std::move(tl)); } + + // Set up the periodic runner for background job execution. This is required to be running + // before the storage engine is initialized. + auto runner = makePeriodicRunner(serviceContext); + serviceContext->setPeriodicRunner(std::move(runner)); initializeStorageEngine(serviceContext, StorageEngineInitFlags::kNone); #ifdef MONGO_CONFIG_WIREDTIGER_ENABLED @@ -614,11 +619,6 @@ ExitCode _initAndListen(int listenPort) { PeriodicTask::startRunningPeriodicTasks(); - // Set up the periodic runner for background job execution - auto runner = makePeriodicRunner(serviceContext); - runner->startup(); - serviceContext->setPeriodicRunner(std::move(runner)); - SessionKiller::set(serviceContext, std::make_shared<SessionKiller>(serviceContext, killSessionsLocal)); @@ -626,7 +626,7 @@ ExitCode _initAndListen(int listenPort) { // Only do this on storage engines supporting snapshot reads, which hold resources we wish to // release periodically in order to avoid storage cache pressure build up. if (storageEngine->supportsReadConcernSnapshot()) { - startPeriodicThreadToAbortExpiredTransactions(serviceContext); + PeriodicThreadToAbortExpiredTransactions::get(serviceContext)->start(); } // Set up the logical session cache @@ -923,12 +923,11 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) { // Shut down the global dbclient pool so callers stop waiting for connections. globalConnPool.shutdown(); - // Shut down the background periodic task runner - if (auto runner = serviceContext->getPeriodicRunner()) { - runner->shutdown(); - } + if (auto storageEngine = serviceContext->getStorageEngine()) { + if (storageEngine->supportsReadConcernSnapshot()) { + PeriodicThreadToAbortExpiredTransactions::get(serviceContext)->stop(); + } - if (serviceContext->getStorageEngine()) { ServiceContext::UniqueOperationContext uniqueOpCtx; OperationContext* opCtx = client->getOperationContext(); if (!opCtx) { diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp index ce5a013f20f..9d9b1498776 100644 --- a/src/mongo/db/logical_session_cache_impl.cpp +++ b/src/mongo/db/logical_session_cache_impl.cpp @@ -43,7 +43,6 @@ #include "mongo/platform/atomic_word.h" #include "mongo/util/duration.h" #include "mongo/util/log.h" -#include "mongo/util/periodic_runner.h" #include "mongo/util/scopeguard.h" namespace mongo { diff --git a/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp b/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp index 16845bb2fd4..e14f93d4d24 100644 --- a/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp +++ b/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp @@ -43,11 +43,34 @@ namespace mongo { -void startPeriodicThreadToAbortExpiredTransactions(ServiceContext* serviceContext) { - // Enforce calling this function once, and only once. - static bool firstCall = true; - invariant(firstCall); - firstCall = false; +namespace { +const auto gServiceDecoration = + ServiceContext::declareDecoration<PeriodicThreadToAbortExpiredTransactions>(); +} // anonymous namespace + +auto PeriodicThreadToAbortExpiredTransactions::get(ServiceContext* serviceContext) + -> PeriodicThreadToAbortExpiredTransactions& { + auto& jobContainer = gServiceDecoration(serviceContext); + jobContainer._init(serviceContext); + + return jobContainer; +} + +auto PeriodicThreadToAbortExpiredTransactions::operator*() const noexcept -> PeriodicJobAnchor& { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return *_anchor; +} + +auto PeriodicThreadToAbortExpiredTransactions::operator-> () const noexcept -> PeriodicJobAnchor* { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _anchor.get(); +} + +void PeriodicThreadToAbortExpiredTransactions::_init(ServiceContext* serviceContext) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (_anchor) { + return; + } auto periodicRunner = serviceContext->getPeriodicRunner(); invariant(periodicRunner); @@ -94,7 +117,7 @@ void startPeriodicThreadToAbortExpiredTransactions(ServiceContext* serviceContex }, Seconds(1)); - periodicRunner->scheduleJob(std::move(job)); + _anchor = std::make_shared<PeriodicJobAnchor>(periodicRunner->makeJob(std::move(job))); } } // namespace mongo diff --git a/src/mongo/db/periodic_runner_job_abort_expired_transactions.h b/src/mongo/db/periodic_runner_job_abort_expired_transactions.h index 44e9640c9a9..777848dc6bc 100644 --- a/src/mongo/db/periodic_runner_job_abort_expired_transactions.h +++ b/src/mongo/db/periodic_runner_job_abort_expired_transactions.h @@ -30,18 +30,31 @@ #pragma once -namespace mongo { +#include <memory> + +#include "mongo/db/service_context.h" +#include "mongo/stdx/mutex.h" +#include "mongo/util/periodic_runner.h" -class ServiceContext; +namespace mongo { /** - * Defines and starts a periodic background job to check for and abort expired transactions. + * Defines a periodic background job to check for and abort expired transactions. * The job will run every (transactionLifetimeLimitSeconds/2) seconds, or at most once per second * and at least once per minute. - * - * This function should only ever be called once, during mongod server startup (db.cpp). - * The PeriodicRunner will handle shutting down the job on shutdown, no extra handling necessary. */ -void startPeriodicThreadToAbortExpiredTransactions(ServiceContext* serviceContext); +class PeriodicThreadToAbortExpiredTransactions { +public: + static PeriodicThreadToAbortExpiredTransactions& get(ServiceContext* serviceContext); + + PeriodicJobAnchor& operator*() const noexcept; + PeriodicJobAnchor* operator->() const noexcept; + +private: + void _init(ServiceContext* serviceContext); + + mutable stdx::mutex _mutex; + std::shared_ptr<PeriodicJobAnchor> _anchor; +}; } // namespace mongo diff --git a/src/mongo/db/service_context.h b/src/mongo/db/service_context.h index 55b0545171e..013cc87a2fa 100644 --- a/src/mongo/db/service_context.h +++ b/src/mongo/db/service_context.h @@ -533,11 +533,6 @@ private: stdx::mutex _mutex; /** - * The storage engine, if any. - */ - std::unique_ptr<StorageEngine> _storageEngine; - - /** * The periodic runner. */ std::unique_ptr<PeriodicRunner> _runner; @@ -558,6 +553,11 @@ private: std::unique_ptr<transport::ServiceExecutor> _serviceExecutor; /** + * The storage engine, if any. + */ + std::unique_ptr<StorageEngine> _storageEngine; + + /** * Vector of registered observers. */ std::vector<ClientObserverHolder> _clientObservers; diff --git a/src/mongo/db/service_context_d_test_fixture.cpp b/src/mongo/db/service_context_d_test_fixture.cpp index 91692972787..5df95627c9c 100644 --- a/src/mongo/db/service_context_d_test_fixture.cpp +++ b/src/mongo/db/service_context_d_test_fixture.cpp @@ -44,6 +44,7 @@ #include "mongo/db/storage/storage_engine_init.h" #include "mongo/db/storage/storage_options.h" #include "mongo/util/assert_util.h" +#include "mongo/util/periodic_runner_factory.h" #include "mongo/db/catalog/database_holder.h" @@ -69,6 +70,10 @@ ServiceContextMongoDTest::ServiceContextMongoDTest(std::string engine, RepairAct auto logicalClock = std::make_unique<LogicalClock>(serviceContext); LogicalClock::set(serviceContext, std::move(logicalClock)); + // Set up the periodic runner to allow background job execution for tests that require it. + auto runner = makePeriodicRunner(getServiceContext()); + getServiceContext()->setPeriodicRunner(std::move(runner)); + storageGlobalParams.dbpath = _tempDir.path(); initializeStorageEngine(serviceContext, StorageEngineInitFlags::kNone); diff --git a/src/mongo/db/service_liaison_mock.cpp b/src/mongo/db/service_liaison_mock.cpp index 8193943090e..ba56d789341 100644 --- a/src/mongo/db/service_liaison_mock.cpp +++ b/src/mongo/db/service_liaison_mock.cpp @@ -41,7 +41,6 @@ namespace mongo { MockServiceLiaisonImpl::MockServiceLiaisonImpl() { _timerFactory = stdx::make_unique<executor::AsyncTimerFactoryMock>(); _runner = makePeriodicRunner(getGlobalServiceContext()); - _runner->startup(); } LogicalSessionIdSet MockServiceLiaisonImpl::getActiveOpSessions() const { @@ -54,9 +53,7 @@ LogicalSessionIdSet MockServiceLiaisonImpl::getOpenCursorSessions() const { return _cursorSessions; } -void MockServiceLiaisonImpl::join() { - _runner->shutdown(); -} +void MockServiceLiaisonImpl::join() {} Date_t MockServiceLiaisonImpl::now() const { return _timerFactory->now(); diff --git a/src/mongo/db/service_liaison_mongod.cpp b/src/mongo/db/service_liaison_mongod.cpp index af214b60227..efbf1c222bc 100644 --- a/src/mongo/db/service_liaison_mongod.cpp +++ b/src/mongo/db/service_liaison_mongod.cpp @@ -28,6 +28,8 @@ * it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl + #include "mongo/platform/basic.h" #include "mongo/db/service_liaison_mongod.h" @@ -38,6 +40,7 @@ #include "mongo/db/service_context.h" #include "mongo/stdx/mutex.h" #include "mongo/util/clock_source.h" +#include "mongo/util/log.h" namespace mongo { @@ -87,17 +90,20 @@ LogicalSessionIdSet ServiceLiaisonMongod::getOpenCursorSessions() const { void ServiceLiaisonMongod::scheduleJob(PeriodicRunner::PeriodicJob job) { invariant(hasGlobalServiceContext()); - auto jobHandle = getGlobalServiceContext()->getPeriodicRunner()->makeJob(std::move(job)); - jobHandle->start(); - _jobs.push_back(std::move(jobHandle)); + auto jobAnchor = getGlobalServiceContext()->getPeriodicRunner()->makeJob(std::move(job)); + jobAnchor.start(); + + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _jobs.push_back(std::move(jobAnchor)); + } } void ServiceLiaisonMongod::join() { - invariant(hasGlobalServiceContext()); - for (auto&& jobHandle : _jobs) { - jobHandle->stop(); - } - _jobs.clear(); + auto jobs = [&] { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return std::exchange(_jobs, {}); + }(); } Date_t ServiceLiaisonMongod::now() const { diff --git a/src/mongo/db/service_liaison_mongod.h b/src/mongo/db/service_liaison_mongod.h index 5cd081f8823..541b67447ab 100644 --- a/src/mongo/db/service_liaison_mongod.h +++ b/src/mongo/db/service_liaison_mongod.h @@ -69,7 +69,9 @@ protected: * Returns the service context. */ ServiceContext* _context() override; - std::vector<std::unique_ptr<PeriodicRunner::PeriodicJobHandle>> _jobs; + + stdx::mutex _mutex; + std::vector<PeriodicJobAnchor> _jobs; }; } // namespace mongo diff --git a/src/mongo/db/service_liaison_mongos.cpp b/src/mongo/db/service_liaison_mongos.cpp index c52ec45fb6b..7f7c4a9c423 100644 --- a/src/mongo/db/service_liaison_mongos.cpp +++ b/src/mongo/db/service_liaison_mongos.cpp @@ -28,6 +28,8 @@ * it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl + #include "mongo/platform/basic.h" #include "mongo/db/service_liaison_mongos.h" @@ -37,7 +39,7 @@ #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/stdx/mutex.h" #include "mongo/util/clock_source.h" -#include "mongo/util/periodic_runner.h" +#include "mongo/util/log.h" namespace mongo { @@ -80,12 +82,20 @@ LogicalSessionIdSet ServiceLiaisonMongos::getOpenCursorSessions() const { void ServiceLiaisonMongos::scheduleJob(PeriodicRunner::PeriodicJob job) { invariant(hasGlobalServiceContext()); - getGlobalServiceContext()->getPeriodicRunner()->scheduleJob(std::move(job)); + auto jobAnchor = getGlobalServiceContext()->getPeriodicRunner()->makeJob(std::move(job)); + jobAnchor.start(); + + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _jobs.push_back(std::move(jobAnchor)); + } } void ServiceLiaisonMongos::join() { - invariant(hasGlobalServiceContext()); - getGlobalServiceContext()->getPeriodicRunner()->shutdown(); + auto jobs = [&] { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return std::exchange(_jobs, {}); + }(); } Date_t ServiceLiaisonMongos::now() const { diff --git a/src/mongo/db/service_liaison_mongos.h b/src/mongo/db/service_liaison_mongos.h index ecf08d5d822..57d06727bf0 100644 --- a/src/mongo/db/service_liaison_mongos.h +++ b/src/mongo/db/service_liaison_mongos.h @@ -69,6 +69,9 @@ protected: * Returns the service context. */ ServiceContext* _context() override; + + stdx::mutex _mutex; + std::vector<PeriodicJobAnchor> _jobs; }; } // namespace mongo diff --git a/src/mongo/dbtests/framework.cpp b/src/mongo/dbtests/framework.cpp index b53b78b380c..95624c6b6aa 100644 --- a/src/mongo/dbtests/framework.cpp +++ b/src/mongo/dbtests/framework.cpp @@ -54,6 +54,7 @@ #include "mongo/util/assert_util.h" #include "mongo/util/exit.h" #include "mongo/util/log.h" +#include "mongo/util/periodic_runner_factory.h" #include "mongo/util/scopeguard.h" #include "mongo/util/version.h" @@ -91,6 +92,11 @@ int runDbTests(int argc, char** argv) { srand((unsigned)frameworkGlobalParams.seed); + // Set up the periodic runner for background job execution, which is required by the storage + // engine to be running beforehand. + auto runner = makePeriodicRunner(globalServiceContext); + globalServiceContext->setPeriodicRunner(std::move(runner)); + initializeStorageEngine(globalServiceContext, StorageEngineInitFlags::kNone); auto registry = stdx::make_unique<OpObserverRegistry>(); registry->addObserver(stdx::make_unique<UUIDCatalogObserver>()); diff --git a/src/mongo/embedded/embedded.cpp b/src/mongo/embedded/embedded.cpp index 8736e53ac79..7f7ee332f0f 100644 --- a/src/mongo/embedded/embedded.cpp +++ b/src/mongo/embedded/embedded.cpp @@ -156,11 +156,6 @@ void shutdown(ServiceContext* srvContext) { LogicalSessionCache::set(serviceContext, nullptr); - // Shut down the background periodic task runner - if (auto runner = serviceContext->getPeriodicRunner()) { - runner->shutdown(); - } - // Global storage engine may not be started in all cases before we exit if (serviceContext->getStorageEngine()) { shutdownGlobalStorageEngineCleanly(serviceContext); @@ -219,6 +214,11 @@ ServiceContext* initialize(const char* yaml_config) { DEV log(LogComponent::kControl) << "DEBUG build (which is slower)" << endl; + // The periodic runner is required by the storage engine to be running beforehand. + auto periodicRunner = std::make_unique<PeriodicRunnerEmbedded>( + serviceContext, serviceContext->getPreciseClockSource()); + serviceContext->setPeriodicRunner(std::move(periodicRunner)); + initializeStorageEngine(serviceContext, StorageEngineInitFlags::kAllowNoLockFile); // Warn if we detect configurations for multiple registered storage engines in the same @@ -312,11 +312,6 @@ ServiceContext* initialize(const char* yaml_config) { restartInProgressIndexesFromLastShutdown(startupOpCtx.get()); } - auto periodicRunner = std::make_unique<PeriodicRunnerEmbedded>( - serviceContext, serviceContext->getPreciseClockSource()); - periodicRunner->startup(); - serviceContext->setPeriodicRunner(std::move(periodicRunner)); - // Set up the logical session cache auto sessionCache = makeLogicalSessionCacheEmbedded(); LogicalSessionCache::set(serviceContext, std::move(sessionCache)); diff --git a/src/mongo/embedded/periodic_runner_embedded.cpp b/src/mongo/embedded/periodic_runner_embedded.cpp index b20ea9cec2f..d21bdbfcd97 100644 --- a/src/mongo/embedded/periodic_runner_embedded.cpp +++ b/src/mongo/embedded/periodic_runner_embedded.cpp @@ -38,21 +38,6 @@ #include "mongo/util/scopeguard.h" namespace mongo { -namespace { - -template <typename T> -std::shared_ptr<T> lockAndAssertExists(std::weak_ptr<T> ptr, StringData errMsg) { - if (auto p = ptr.lock()) { - return p; - } else { - uasserted(ErrorCodes::InternalError, errMsg); - } -} - -constexpr auto kPeriodicJobHandleLifetimeErrMsg = - "The PeriodicRunner job for this handle no longer exists"_sd; - -} // namespace struct PeriodicRunnerEmbedded::PeriodicJobSorter { bool operator()(std::shared_ptr<PeriodicJobImpl> const& lhs, @@ -65,59 +50,13 @@ struct PeriodicRunnerEmbedded::PeriodicJobSorter { PeriodicRunnerEmbedded::PeriodicRunnerEmbedded(ServiceContext* svc, ClockSource* clockSource) : _svc(svc), _clockSource(clockSource) {} -PeriodicRunnerEmbedded::~PeriodicRunnerEmbedded() { - PeriodicRunnerEmbedded::shutdown(); -} - -std::shared_ptr<PeriodicRunnerEmbedded::PeriodicJobImpl> PeriodicRunnerEmbedded::createAndAddJob( - PeriodicJob job, bool shouldStart) { +auto PeriodicRunnerEmbedded::makeJob(PeriodicJob job) -> JobAnchor { auto impl = std::make_shared<PeriodicJobImpl>(std::move(job), this->_clockSource, this); stdx::lock_guard<stdx::mutex> lk(_mutex); _jobs.push_back(impl); std::push_heap(_jobs.begin(), _jobs.end(), PeriodicJobSorter()); - if (shouldStart && _running) - impl->start(); - return impl; -} - -std::unique_ptr<PeriodicRunner::PeriodicJobHandle> PeriodicRunnerEmbedded::makeJob( - PeriodicJob job) { - return std::make_unique<PeriodicJobHandleImpl>(createAndAddJob(std::move(job), false)); -} - -void PeriodicRunnerEmbedded::scheduleJob(PeriodicJob job) { - createAndAddJob(std::move(job), true); -} - -void PeriodicRunnerEmbedded::startup() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - - if (_running) { - return; - } - - _running = true; - - // schedule any jobs that we have - for (auto& job : _jobs) { - job->start(); - } -} - -void PeriodicRunnerEmbedded::shutdown() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_running) { - _running = false; - - for (auto& job : _jobs) { - stdx::lock_guard<stdx::mutex> jobLock(job->_mutex); - if (job->isAlive(jobLock)) { - job->_stopWithMasterAndJobLock(lk, jobLock); - } - } - _jobs.clear(); - } + return JobAnchor(impl); } bool PeriodicRunnerEmbedded::tryPump() { @@ -226,37 +165,36 @@ void PeriodicRunnerEmbedded::PeriodicJobImpl::stop() { // sure the user can invalidate it after this call. stdx::lock_guard<stdx::mutex> masterLock(_periodicRunner->_mutex); stdx::lock_guard<stdx::mutex> lk(_mutex); - _stopWithMasterAndJobLock(masterLock, lk); + if (isAlive(lk)) { + _stopWithMasterAndJobLock(masterLock, lk); + } } -void PeriodicRunnerEmbedded::PeriodicJobImpl::_stopWithMasterAndJobLock(WithLock masterLock, - WithLock jobLock) { - invariant(isAlive(jobLock)); - _execStatus = PeriodicJobImpl::ExecutionStatus::kCanceled; +Milliseconds PeriodicRunnerEmbedded::PeriodicJobImpl::getPeriod() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _job.interval; } -bool PeriodicRunnerEmbedded::PeriodicJobImpl::isAlive(WithLock lk) { - return _execStatus == ExecutionStatus::kRunning || _execStatus == ExecutionStatus::kPaused; -} +void PeriodicRunnerEmbedded::PeriodicJobImpl::setPeriod(Milliseconds ms) { + stdx::lock_guard<stdx::mutex> masterLk(_periodicRunner->_mutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); -void PeriodicRunnerEmbedded::PeriodicJobHandleImpl::start() { - auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg); - job->start(); -} + _job.interval = ms; -void PeriodicRunnerEmbedded::PeriodicJobHandleImpl::stop() { - auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg); - job->stop(); + if (_execStatus == PeriodicJobImpl::ExecutionStatus::kRunning) { + std::make_heap( + _periodicRunner->_jobs.begin(), _periodicRunner->_jobs.end(), PeriodicJobSorter()); + } } -void PeriodicRunnerEmbedded::PeriodicJobHandleImpl::pause() { - auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg); - job->pause(); +void PeriodicRunnerEmbedded::PeriodicJobImpl::_stopWithMasterAndJobLock(WithLock masterLock, + WithLock jobLock) { + invariant(isAlive(jobLock)); + _execStatus = PeriodicJobImpl::ExecutionStatus::kCanceled; } -void PeriodicRunnerEmbedded::PeriodicJobHandleImpl::resume() { - auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg); - job->resume(); +bool PeriodicRunnerEmbedded::PeriodicJobImpl::isAlive(WithLock lk) { + return _execStatus == ExecutionStatus::kRunning || _execStatus == ExecutionStatus::kPaused; } } // namespace mongo diff --git a/src/mongo/embedded/periodic_runner_embedded.h b/src/mongo/embedded/periodic_runner_embedded.h index 20dc7a410ba..98632bbce19 100644 --- a/src/mongo/embedded/periodic_runner_embedded.h +++ b/src/mongo/embedded/periodic_runner_embedded.h @@ -48,31 +48,28 @@ namespace mongo { class PeriodicRunnerEmbedded : public PeriodicRunner { public: PeriodicRunnerEmbedded(ServiceContext* svc, ClockSource* clockSource); - ~PeriodicRunnerEmbedded(); - std::unique_ptr<PeriodicRunner::PeriodicJobHandle> makeJob(PeriodicJob job) override; - void scheduleJob(PeriodicJob job) override; - - void startup() override; - - void shutdown() override; + JobAnchor makeJob(PeriodicJob job) override; // Safe to call from multiple threads but will only execute on one thread at a time. // Returns true if it attempted to run any jobs. bool tryPump(); private: - class PeriodicJobImpl { - MONGO_DISALLOW_COPYING(PeriodicJobImpl); + class PeriodicJobImpl : public ControllableJob { + PeriodicJobImpl(const PeriodicJobImpl&) = delete; + PeriodicJobImpl& operator=(const PeriodicJobImpl&) = delete; public: friend class PeriodicRunnerEmbedded; PeriodicJobImpl(PeriodicJob job, ClockSource* source, PeriodicRunnerEmbedded* runner); - void start(); - void pause(); - void resume(); - void stop(); + void start() override; + void pause() override; + void resume() override; + void stop() override; + Milliseconds getPeriod() override; + void setPeriod(Milliseconds ms) override; bool isAlive(WithLock lk); @@ -99,22 +96,6 @@ private: }; struct PeriodicJobSorter; - std::shared_ptr<PeriodicRunnerEmbedded::PeriodicJobImpl> createAndAddJob(PeriodicJob job, - bool shouldStart); - - class PeriodicJobHandleImpl : public PeriodicJobHandle { - public: - explicit PeriodicJobHandleImpl(std::weak_ptr<PeriodicJobImpl> jobImpl) - : _jobWeak(jobImpl) {} - void start() override; - void stop() override; - void pause() override; - void resume() override; - - private: - std::weak_ptr<PeriodicJobImpl> _jobWeak; - }; - ServiceContext* _svc; ClockSource* _clockSource; @@ -123,7 +104,6 @@ private: std::vector<std::shared_ptr<PeriodicJobImpl>> _Pausedjobs; stdx::mutex _mutex; - bool _running = false; }; } // namespace mongo diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index ad5817c67b9..ce517052716 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -191,12 +191,6 @@ void cleanupTask(ServiceContext* serviceContext) { if (serviceContext) { serviceContext->setKillAllOperations(); - - // Shut down the background periodic task runner. - auto runner = serviceContext->getPeriodicRunner(); - if (runner) { - runner->shutdown(); - } } // Perform all shutdown operations after setKillAllOperations is called in order to ensure @@ -432,9 +426,10 @@ ExitCode runMongosServer(ServiceContext* serviceContext) { PeriodicTask::startRunningPeriodicTasks(); // Set up the periodic runner for background job execution - auto runner = makePeriodicRunner(serviceContext); - runner->startup(); - serviceContext->setPeriodicRunner(std::move(runner)); + { + auto runner = makePeriodicRunner(serviceContext); + serviceContext->setPeriodicRunner(std::move(runner)); + } SessionKiller::set(serviceContext, std::make_shared<SessionKiller>(serviceContext, killSessionsRemote)); diff --git a/src/mongo/util/SConscript b/src/mongo/util/SConscript index cd9ac27b4fc..547f379eacc 100644 --- a/src/mongo/util/SConscript +++ b/src/mongo/util/SConscript @@ -276,6 +276,7 @@ env.Library( "periodic_runner.cpp", ], LIBDEPS=[ + "$BUILD_DIR/mongo/base", ], ) diff --git a/src/mongo/util/periodic_runner.cpp b/src/mongo/util/periodic_runner.cpp index 5e8cbc9b344..fecf2b9e9a5 100644 --- a/src/mongo/util/periodic_runner.cpp +++ b/src/mongo/util/periodic_runner.cpp @@ -32,8 +32,58 @@ #include "mongo/util/periodic_runner.h" +#include "mongo/util/assert_util.h" + namespace mongo { PeriodicRunner::~PeriodicRunner() = default; +PeriodicJobAnchor::PeriodicJobAnchor(std::shared_ptr<Job> handle) : _handle{std::move(handle)} {} + +PeriodicJobAnchor::~PeriodicJobAnchor() { + if (!_handle) { + return; + } + + _handle->stop(); +} + +void PeriodicJobAnchor::start() { + invariant(_handle); + _handle->start(); +} + +void PeriodicJobAnchor::pause() { + invariant(_handle); + _handle->pause(); +} + +void PeriodicJobAnchor::resume() { + invariant(_handle); + _handle->resume(); +} + +void PeriodicJobAnchor::stop() { + invariant(_handle); + _handle->stop(); +} + +void PeriodicJobAnchor::setPeriod(Milliseconds ms) { + invariant(_handle); + _handle->setPeriod(ms); +} + +Milliseconds PeriodicJobAnchor::getPeriod() { + invariant(_handle); + return _handle->getPeriod(); +} + +void PeriodicJobAnchor::detach() { + _handle.reset(); +} + +bool PeriodicJobAnchor::isValid() const noexcept { + return static_cast<bool>(_handle); +} + } // namespace mongo diff --git a/src/mongo/util/periodic_runner.h b/src/mongo/util/periodic_runner.h index 48354326e29..5fd16115701 100644 --- a/src/mongo/util/periodic_runner.h +++ b/src/mongo/util/periodic_runner.h @@ -30,15 +30,19 @@ #pragma once +#include <functional> +#include <memory> #include <string> -#include "mongo/base/disallow_copying.h" -#include "mongo/stdx/functional.h" +#include <boost/optional.hpp> + +#include "mongo/stdx/mutex.h" #include "mongo/util/time_support.h" namespace mongo { class Client; +class PeriodicJobAnchor; /** * An interface for objects that run work items at specified intervals. @@ -52,7 +56,8 @@ class Client; */ class PeriodicRunner { public: - using Job = stdx::function<void(Client* client)>; + using Job = std::function<void(Client* client)>; + using JobAnchor = PeriodicJobAnchor; struct PeriodicJob { PeriodicJob(std::string name, Job callable, Milliseconds period) @@ -74,65 +79,115 @@ public: Milliseconds interval; }; - class PeriodicJobHandle { + /** + * A ControllableJob allows a user to reschedule the execution of a Job + */ + class ControllableJob { public: - virtual ~PeriodicJobHandle() = default; + virtual ~ControllableJob() = default; /** * Starts running the job */ virtual void start() = 0; + /** * Pauses the job temporarily so that it does not execute until * unpaused */ virtual void pause() = 0; + /** * Resumes a paused job so that it continues executing each interval */ virtual void resume() = 0; + /** * Stops the job, this function blocks until the job is stopped * Safe to invalidate the job callable after calling this. */ virtual void stop() = 0; + + /** + * Returns the current period for the job + */ + virtual Milliseconds getPeriod() = 0; + + /** + * Updates the period of the job. This takes effect immediately by altering the current + * scheduling of the task. I.e. if more than ms have passed since the last execution of the + * job, it is run immediately. Otherwise the scheduling is adjusted forward or back by + * abs(new - old). + */ + virtual void setPeriod(Milliseconds ms) = 0; }; virtual ~PeriodicRunner(); - /** * Creates a new job and adds it to the runner, but does not schedule it. * The caller is responsible for calling 'start' on the resulting handle in * order to begin the job running. This API should be used when the caller * is interested in observing and controlling the job execution state. */ - virtual std::unique_ptr<PeriodicJobHandle> makeJob(PeriodicJob job) = 0; + virtual JobAnchor makeJob(PeriodicJob job) = 0; +}; - /** - * Schedules a job to be run at periodic intervals. - * - * If the runner is not running when a job is scheduled, that job should - * be saved so that it may run in the future once startup() is called. - */ - virtual void scheduleJob(PeriodicJob job) = 0; +/** + * A PeriodicJobAnchor allows the holder to control the scheduling of a job for the lifetime of the + * anchor. When an anchor is destructed, it stops its underlying job. + * + * The underlying weak_ptr for this class is not synchronized. In essence, treat use of this class + * as if it were a raw pointer to a ControllableJob. + * + * Each wrapped PeriodicRunner::ControllableJob function on this object throws + * if the underlying job is gone (e.g. in shutdown). + */ +class PeriodicJobAnchor { +public: + using Job = PeriodicRunner::ControllableJob; + +public: + // Note that this constructor is only intended for use with PeriodicRunner::makeJob() + explicit PeriodicJobAnchor(std::shared_ptr<Job> handle); + + PeriodicJobAnchor() = default; + PeriodicJobAnchor(PeriodicJobAnchor&&) = default; + PeriodicJobAnchor& operator=(PeriodicJobAnchor&&) = default; + + PeriodicJobAnchor(const PeriodicJobAnchor&) = delete; + PeriodicJobAnchor& operator=(const PeriodicJobAnchor&) = delete; + + ~PeriodicJobAnchor(); + + void start(); + void pause(); + void resume(); + void stop(); + void setPeriod(Milliseconds ms); + Milliseconds getPeriod(); /** - * Starts up this periodic runner. + * Abandon responsibility for scheduling the execution of this job * - * This method may safely be called multiple times, either with or without - * calls to shutdown() in between. + * This effectively invalidates the anchor. */ - virtual void startup() = 0; + void detach(); /** - * Shuts down this periodic runner. Stops all jobs from running. + * Returns if this PeriodicJobAnchor is associated with a PeriodicRunner::ControllableJob * - * This method may safely be called multiple times, either with or without - * calls to startup() in between. Any jobs that have been scheduled on this - * runner should no longer execute once shutdown() is called. + * This function is useful to see if a PeriodicJobAnchor is initialized. It does not necessarily + * inform whether a PeriodicJobAnchor will throw from a control function above. */ - virtual void shutdown() = 0; + bool isValid() const noexcept; + + explicit operator bool() const noexcept { + return isValid(); + } + +private: + std::shared_ptr<Job> _handle; }; } // namespace mongo diff --git a/src/mongo/util/periodic_runner_impl.cpp b/src/mongo/util/periodic_runner_impl.cpp index f5fb19a8a2b..874fda8900c 100644 --- a/src/mongo/util/periodic_runner_impl.cpp +++ b/src/mongo/util/periodic_runner_impl.cpp @@ -28,13 +28,17 @@ * it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault + #include "mongo/platform/basic.h" #include "mongo/util/periodic_runner_impl.h" #include "mongo/db/client.h" #include "mongo/db/service_context.h" +#include "mongo/stdx/functional.h" #include "mongo/util/clock_source.h" +#include "mongo/util/log.h" #include "mongo/util/scopeguard.h" namespace mongo { @@ -42,95 +46,81 @@ namespace mongo { PeriodicRunnerImpl::PeriodicRunnerImpl(ServiceContext* svc, ClockSource* clockSource) : _svc(svc), _clockSource(clockSource) {} -PeriodicRunnerImpl::~PeriodicRunnerImpl() { - PeriodicRunnerImpl::shutdown(); -} - -std::shared_ptr<PeriodicRunnerImpl::PeriodicJobImpl> PeriodicRunnerImpl::createAndAddJob( - PeriodicJob job) { +auto PeriodicRunnerImpl::makeJob(PeriodicJob job) -> JobAnchor { auto impl = std::make_shared<PeriodicJobImpl>(std::move(job), this->_clockSource, this->_svc); - stdx::lock_guard<stdx::mutex> lk(_mutex); - _jobs.push_back(impl); - return impl; + JobAnchor anchor(std::move(impl)); + return anchor; } -std::unique_ptr<PeriodicRunner::PeriodicJobHandle> PeriodicRunnerImpl::makeJob(PeriodicJob job) { - auto handle = std::make_unique<PeriodicJobHandleImpl>(createAndAddJob(job)); - return std::move(handle); -} - -void PeriodicRunnerImpl::scheduleJob(PeriodicJob job) { - auto impl = createAndAddJob(job); - stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_running) { - impl->start(); - } +PeriodicRunnerImpl::PeriodicJobImpl::PeriodicJobImpl(PeriodicJob job, + ClockSource* source, + ServiceContext* svc) + : _job(std::move(job)), _clockSource(source), _serviceContext(svc) { + auto stopPF = makePromiseFuture<void>(); + _stopPromise = stopPF.promise.share(); + _stopFuture = std::move(stopPF.future); } -void PeriodicRunnerImpl::startup() { - stdx::lock_guard<stdx::mutex> lk(_mutex); +void PeriodicRunnerImpl::PeriodicJobImpl::_run() { + auto startPF = makePromiseFuture<void>(); - if (_running) { - return; + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(_execStatus == ExecutionStatus::NOT_SCHEDULED); } - _running = true; - - // schedule any jobs that we have - for (auto& job : _jobs) { - job->start(); - } -} + _thread = stdx::thread([ this, startPromise = std::move(startPF.promise) ]() mutable { + ON_BLOCK_EXIT([this] { _stopPromise.emplaceValue(); }); -void PeriodicRunnerImpl::shutdown() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_running) { - _running = false; + Client::initThread(_job.name, _serviceContext, nullptr); - for (auto& job : _jobs) { - job->stop(); + // Let start() know we're running + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _execStatus = PeriodicJobImpl::ExecutionStatus::RUNNING; } - _jobs.clear(); - } -} - -PeriodicRunnerImpl::PeriodicJobImpl::PeriodicJobImpl(PeriodicJob job, - ClockSource* source, - ServiceContext* svc) - : _job(std::move(job)), _clockSource(source), _serviceContext(svc) {} - -void PeriodicRunnerImpl::PeriodicJobImpl::_run() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - invariant(_execStatus == PeriodicJobImpl::ExecutionStatus::NOT_SCHEDULED); - _thread = stdx::thread([this] { - Client::initThread(_job.name, _serviceContext, nullptr); - while (true) { - auto start = _clockSource->now(); + startPromise.emplaceValue(); - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<stdx::mutex> lk(_mutex); + while (_execStatus != ExecutionStatus::CANCELED) { // Wait until it's unpaused or canceled _condvar.wait(lk, [&] { return _execStatus != ExecutionStatus::PAUSED; }); if (_execStatus == ExecutionStatus::CANCELED) { - break; + return; } + auto start = _clockSource->now(); + // Unlock while job is running so we can pause/cancel concurrently lk.unlock(); _job.job(Client::getCurrent()); lk.lock(); - if (_clockSource->waitForConditionUntil(_condvar, lk, start + _job.interval, [&] { - return _execStatus == ExecutionStatus::CANCELED; - })) { - break; - } + auto getDeadlineFromInterval = [&] { return start + _job.interval; }; + + do { + auto deadline = getDeadlineFromInterval(); + + if (_clockSource->waitForConditionUntil(_condvar, lk, deadline, [&] { + return _execStatus == ExecutionStatus::CANCELED || + getDeadlineFromInterval() != deadline; + })) { + if (_execStatus == ExecutionStatus::CANCELED) { + return; + } + } + } while (_clockSource->now() < getDeadlineFromInterval()); } }); - _execStatus = PeriodicJobImpl::ExecutionStatus::RUNNING; + + // Wait for the thread to actually start + startPF.future.get(); } void PeriodicRunnerImpl::PeriodicJobImpl::start() { + LOG(2) << "Starting periodic job " << _job.name; + _run(); } @@ -150,53 +140,40 @@ void PeriodicRunnerImpl::PeriodicJobImpl::resume() { } void PeriodicRunnerImpl::PeriodicJobImpl::stop() { - { + auto lastExecStatus = [&] { stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_execStatus != ExecutionStatus::RUNNING && _execStatus != ExecutionStatus::PAUSED) - return; - invariant(_thread.joinable()); + return std::exchange(_execStatus, ExecutionStatus::CANCELED); + }(); - _execStatus = PeriodicJobImpl::ExecutionStatus::CANCELED; + // If we never started, then nobody should wait + if (lastExecStatus == ExecutionStatus::NOT_SCHEDULED) { + return; } - _condvar.notify_one(); - _thread.join(); -} -namespace { + // Only join once + if (lastExecStatus != ExecutionStatus::CANCELED) { + LOG(2) << "Stopping periodic job " << _job.name; -template <typename T> -std::shared_ptr<T> lockAndAssertExists(std::weak_ptr<T> ptr, StringData errMsg) { - if (auto p = ptr.lock()) { - return p; - } else { - uasserted(ErrorCodes::InternalError, errMsg); + _condvar.notify_one(); + _thread.join(); } -} -constexpr auto kPeriodicJobHandleLifetimeErrMsg = - "The PeriodicRunner job for this handle no longer exists"_sd; - -} // namespace - -void PeriodicRunnerImpl::PeriodicJobHandleImpl::start() { - auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg); - job->start(); + _stopFuture.get(); } -void PeriodicRunnerImpl::PeriodicJobHandleImpl::stop() { - auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg); - job->stop(); +Milliseconds PeriodicRunnerImpl::PeriodicJobImpl::getPeriod() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _job.interval; } -void PeriodicRunnerImpl::PeriodicJobHandleImpl::pause() { - auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg); - job->pause(); -} +void PeriodicRunnerImpl::PeriodicJobImpl::setPeriod(Milliseconds ms) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _job.interval = ms; -void PeriodicRunnerImpl::PeriodicJobHandleImpl::resume() { - auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg); - job->resume(); + if (_execStatus == PeriodicJobImpl::ExecutionStatus::RUNNING) { + _condvar.notify_one(); + } } } // namespace mongo diff --git a/src/mongo/util/periodic_runner_impl.h b/src/mongo/util/periodic_runner_impl.h index 8c59a4f3e84..6c9850c18f1 100644 --- a/src/mongo/util/periodic_runner_impl.h +++ b/src/mongo/util/periodic_runner_impl.h @@ -37,6 +37,7 @@ #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" #include "mongo/util/clock_source.h" +#include "mongo/util/future.h" #include "mongo/util/periodic_runner.h" namespace mongo { @@ -51,27 +52,24 @@ class ServiceContext; class PeriodicRunnerImpl : public PeriodicRunner { public: PeriodicRunnerImpl(ServiceContext* svc, ClockSource* clockSource); - ~PeriodicRunnerImpl(); - std::unique_ptr<PeriodicRunner::PeriodicJobHandle> makeJob(PeriodicJob job) override; - void scheduleJob(PeriodicJob job) override; - - void startup() override; - - void shutdown() override; + JobAnchor makeJob(PeriodicJob job) override; private: - class PeriodicJobImpl { - MONGO_DISALLOW_COPYING(PeriodicJobImpl); + class PeriodicJobImpl : public ControllableJob { + PeriodicJobImpl(const PeriodicJobImpl&) = delete; + PeriodicJobImpl& operator=(const PeriodicJobImpl&) = delete; public: friend class PeriodicRunnerImpl; PeriodicJobImpl(PeriodicJob job, ClockSource* source, ServiceContext* svc); - void start(); - void pause(); - void resume(); - void stop(); + void start() override; + void pause() override; + void resume() override; + void stop() override; + Milliseconds getPeriod() override; + void setPeriod(Milliseconds ms) override; enum class ExecutionStatus { NOT_SCHEDULED, RUNNING, PAUSED, CANCELED }; @@ -81,7 +79,10 @@ private: PeriodicJob _job; ClockSource* _clockSource; ServiceContext* _serviceContext; + stdx::thread _thread; + SharedPromise<void> _stopPromise; + Future<void> _stopFuture; stdx::mutex _mutex; stdx::condition_variable _condvar; @@ -93,26 +94,8 @@ private: std::shared_ptr<PeriodicRunnerImpl::PeriodicJobImpl> createAndAddJob(PeriodicJob job); - class PeriodicJobHandleImpl : public PeriodicJobHandle { - public: - explicit PeriodicJobHandleImpl(std::weak_ptr<PeriodicJobImpl> jobImpl) - : _jobWeak(jobImpl) {} - void start() override; - void stop() override; - void pause() override; - void resume() override; - - private: - std::weak_ptr<PeriodicJobImpl> _jobWeak; - }; - ServiceContext* _svc; ClockSource* _clockSource; - - std::vector<std::shared_ptr<PeriodicJobImpl>> _jobs; - - stdx::mutex _mutex; - bool _running = false; }; } // namespace mongo diff --git a/src/mongo/util/periodic_runner_impl_test.cpp b/src/mongo/util/periodic_runner_impl_test.cpp index ef3a6ad9782..f455470afa3 100644 --- a/src/mongo/util/periodic_runner_impl_test.cpp +++ b/src/mongo/util/periodic_runner_impl_test.cpp @@ -52,10 +52,6 @@ public: _runner = stdx::make_unique<PeriodicRunnerImpl>(getServiceContext(), _clockSource.get()); } - void tearDown() override { - _runner->shutdown(); - } - ClockSourceMock& clockSource() { return *_clockSource; } @@ -73,7 +69,6 @@ class PeriodicRunnerImplTest : public PeriodicRunnerImplTestNoSetup { public: void setUp() override { PeriodicRunnerImplTestNoSetup::setUp(); - runner().startup(); } }; @@ -95,7 +90,8 @@ TEST_F(PeriodicRunnerImplTest, OneJobTest) { }, interval); - runner().scheduleJob(std::move(job)); + auto jobAnchor = runner().makeJob(std::move(job)); + jobAnchor.start(); // Fast forward ten times, we should run all ten times. for (int i = 0; i < 10; i++) { @@ -127,7 +123,7 @@ TEST_F(PeriodicRunnerImplTest, OnePausableJobDoesNotRunWithoutStart) { }, interval); - auto handle = runner().makeJob(std::move(job)); + auto jobAnchor = runner().makeJob(std::move(job)); clockSource().advance(interval); ASSERT_EQ(count, 0); @@ -152,8 +148,8 @@ TEST_F(PeriodicRunnerImplTest, OnePausableJobRunsCorrectlyWithStart) { }, interval); - auto handle = runner().makeJob(std::move(job)); - handle->start(); + auto jobAnchor = runner().makeJob(std::move(job)); + jobAnchor.start(); // Fast forward ten times, we should run all ten times. for (int i = 0; i < 10; i++) { { @@ -187,8 +183,8 @@ TEST_F(PeriodicRunnerImplTest, OnePausableJobPausesCorrectly) { }, interval); - auto handle = runner().makeJob(std::move(job)); - handle->start(); + auto jobAnchor = runner().makeJob(std::move(job)); + jobAnchor.start(); // Wait for the first execution. { stdx::unique_lock<stdx::mutex> lk(mutex); @@ -198,7 +194,7 @@ TEST_F(PeriodicRunnerImplTest, OnePausableJobPausesCorrectly) { { stdx::unique_lock<stdx::mutex> lk(mutex); isPaused = true; - handle->pause(); + jobAnchor.pause(); } // Fast forward ten times, we shouldn't run anymore. If we do, the assert inside the job will @@ -229,8 +225,8 @@ TEST_F(PeriodicRunnerImplTest, OnePausableJobResumesCorrectly) { }, interval); - auto handle = runner().makeJob(std::move(job)); - handle->start(); + auto jobAnchor = runner().makeJob(std::move(job)); + jobAnchor.start(); // Wait for the first execution. { stdx::unique_lock<stdx::mutex> lk(mutex); @@ -253,7 +249,7 @@ TEST_F(PeriodicRunnerImplTest, OnePausableJobResumesCorrectly) { } } - handle->pause(); + jobAnchor.pause(); // Fast forward ten times, we shouldn't run anymore. for (int i = 0; i < 10; i++) { @@ -263,7 +259,7 @@ TEST_F(PeriodicRunnerImplTest, OnePausableJobResumesCorrectly) { // Make sure we didn't run anymore while paused. ASSERT_EQ(count, numIterationsBeforePause); - handle->resume(); + jobAnchor.resume(); // Fast forward, we should run at least once. clockSource().advance(interval); @@ -276,39 +272,6 @@ TEST_F(PeriodicRunnerImplTest, OnePausableJobResumesCorrectly) { tearDown(); } -TEST_F(PeriodicRunnerImplTestNoSetup, ScheduleBeforeStartupTest) { - int count = 0; - Milliseconds interval{5}; - - stdx::mutex mutex; - stdx::condition_variable cv; - - // Schedule a job before startup - PeriodicRunner::PeriodicJob job("job", - [&count, &mutex, &cv](Client*) { - { - stdx::unique_lock<stdx::mutex> lk(mutex); - count++; - } - cv.notify_all(); - }, - interval); - - runner().scheduleJob(std::move(job)); - - // Start the runner, job should still run - runner().startup(); - - clockSource().advance(interval); - - { - stdx::unique_lock<stdx::mutex> lk(mutex); - cv.wait(lk, [&count] { return count > 0; }); - } - - tearDown(); -} - TEST_F(PeriodicRunnerImplTest, TwoJobsTest) { int countA = 0; int countB = 0; @@ -339,8 +302,11 @@ TEST_F(PeriodicRunnerImplTest, TwoJobsTest) { }, intervalB); - runner().scheduleJob(std::move(jobA)); - runner().scheduleJob(std::move(jobB)); + auto jobAnchorA = runner().makeJob(std::move(jobA)); + auto jobAnchorB = runner().makeJob(std::move(jobB)); + + jobAnchorA.start(); + jobAnchorB.start(); // Fast forward and wait for both jobs to run the right number of times for (int i = 0; i <= 10; i++) { @@ -383,8 +349,11 @@ TEST_F(PeriodicRunnerImplTest, TwoJobsDontDeadlock) { }, Milliseconds(1)); - runner().scheduleJob(std::move(jobA)); - runner().scheduleJob(std::move(jobB)); + auto jobAnchorA = runner().makeJob(std::move(jobA)); + auto jobAnchorB = runner().makeJob(std::move(jobB)); + + jobAnchorA.start(); + jobAnchorB.start(); clockSource().advance(Milliseconds(1)); @@ -399,5 +368,75 @@ TEST_F(PeriodicRunnerImplTest, TwoJobsDontDeadlock) { tearDown(); } +TEST_F(PeriodicRunnerImplTest, ChangingIntervalWorks) { + size_t timesCalled = 0; + + stdx::mutex mutex; + stdx::condition_variable cv; + + // Add a job, ensure that it runs once + PeriodicRunner::PeriodicJob job("job", + [&](Client*) { + { + stdx::unique_lock<stdx::mutex> lk(mutex); + timesCalled++; + } + cv.notify_one(); + }, + Milliseconds(5)); + + auto jobAnchor = runner().makeJob(std::move(job)); + jobAnchor.start(); + // Wait for the first execution. + { + stdx::unique_lock<stdx::mutex> lk(mutex); + cv.wait(lk, [&] { return timesCalled; }); + } + + jobAnchor.setPeriod(Milliseconds(10)); + ASSERT_EQ(jobAnchor.getPeriod(), Milliseconds(10)); + + // if we change the period to a longer duration, that doesn't trigger a run + { + stdx::lock_guard<stdx::mutex> lk(mutex); + ASSERT_EQ(timesCalled, 1ul); + } + + clockSource().advance(Milliseconds(5)); + + // We actually changed the period + { + stdx::lock_guard<stdx::mutex> lk(mutex); + ASSERT_EQ(timesCalled, 1ul); + } + + clockSource().advance(Milliseconds(5)); + + // Now we hit the new cutoff + { + stdx::unique_lock<stdx::mutex> lk(mutex); + cv.wait(lk, [&] { return timesCalled == 2ul; }); + } + + clockSource().advance(Milliseconds(5)); + + // Haven't hit it + { + stdx::lock_guard<stdx::mutex> lk(mutex); + ASSERT_EQ(timesCalled, 2ul); + } + + jobAnchor.setPeriod(Milliseconds(4)); + ASSERT_EQ(jobAnchor.getPeriod(), Milliseconds(4)); + + // shortening triggers the period + { + stdx::unique_lock<stdx::mutex> lk(mutex); + cv.wait(lk, [&] { return timesCalled == 3ul; }); + } + + tearDown(); +} + } // namespace } // namespace mongo |