author | Matthew Saltz <matthew.saltz@mongodb.com> | 2018-06-08 16:18:03 -0400
committer | Matthew Saltz <matthew.saltz@mongodb.com> | 2018-07-03 14:21:00 -0400
commit | 738ee996e0a1f5b0c92f27f2746d94d14092c1bd (patch)
tree | 4d06b3a2a24eb906b75c625d44da1a14fe299e48 /src
parent | b2cd49a55e98894bdcef5b646d3b7f752addf944 (diff)
download | mongo-738ee996e0a1f5b0c92f27f2746d94d14092c1bd.tar.gz
SERVER-35585 Make PeriodicRunner jobs be pausable/resumable
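For orientation before reading the diff: this change adds a PeriodicJobHandle, returned by PeriodicRunner::makeJob(), so a caller can create a job without scheduling it and then drive it explicitly instead of fire-and-forgetting it via scheduleJob(). The minimal sketch below shows how a caller might use the new API; the exampleUsage function and the way the PeriodicRunner* is obtained (e.g. from the ServiceContext) are assumptions for illustration, not part of this commit.

    // Sketch only: illustrates the handle API added by this commit.
    // 'exampleUsage' and the acquisition of 'runner' are assumed, not commit code.
    void exampleUsage(PeriodicRunner* runner) {
        PeriodicRunner::PeriodicJob job("exampleJob",
                                        [](Client* client) {
                                            // periodic work goes here
                                        },
                                        Milliseconds(5));

        auto handle = runner->makeJob(std::move(job));  // registered, but not yet running
        handle->start();   // begin periodic execution
        handle->pause();   // job no longer fires until resumed
        handle->resume();  // fires again each interval
    }

If a handle outlives its job (for example after the runner's shutdown() clears its job list), these calls fail with ErrorCodes::InternalError via lockAndAssertExists() rather than dereferencing a destroyed job, as the implementation diff below shows.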
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/util/periodic_runner.h | 28
-rw-r--r-- | src/mongo/util/periodic_runner_impl.cpp | 161
-rw-r--r-- | src/mongo/util/periodic_runner_impl.h | 49
-rw-r--r-- | src/mongo/util/periodic_runner_impl_test.cpp | 143

4 files changed, 334 insertions, 47 deletions
diff --git a/src/mongo/util/periodic_runner.h b/src/mongo/util/periodic_runner.h
index 710b812bb73..c6d7fb01ac4 100644
--- a/src/mongo/util/periodic_runner.h
+++ b/src/mongo/util/periodic_runner.h
@@ -72,8 +72,36 @@ public:
         Milliseconds interval;
     };
 
+    class PeriodicJobHandle {
+    public:
+        virtual ~PeriodicJobHandle() = default;
+
+        /**
+         * Starts running the job
+         */
+        virtual void start() = 0;
+        /**
+         * Pauses the job temporarily so that it does not execute until
+         * unpaused
+         */
+        virtual void pause() = 0;
+        /**
+         * Resumes a paused job so that it continues executing each interval
+         */
+        virtual void resume() = 0;
+    };
+
     virtual ~PeriodicRunner();
+
+    /**
+     * Creates a new job and adds it to the runner, but does not schedule it.
+     * The caller is responsible for calling 'start' on the resulting handle in
+     * order to begin the job running. This API should be used when the caller
+     * is interested in observing and controlling the job execution state.
+     */
+    virtual std::unique_ptr<PeriodicJobHandle> makeJob(PeriodicJob job) = 0;
+
 
     /**
      * Schedules a job to be run at periodic intervals.
      *
diff --git a/src/mongo/util/periodic_runner_impl.cpp b/src/mongo/util/periodic_runner_impl.cpp
index 03b0f67b40b..0be6fba57a7 100644
--- a/src/mongo/util/periodic_runner_impl.cpp
+++ b/src/mongo/util/periodic_runner_impl.cpp
@@ -41,18 +41,28 @@ PeriodicRunnerImpl::PeriodicRunnerImpl(ServiceContext* svc, ClockSource* clockSo
     : _svc(svc), _clockSource(clockSource) {}
 
 PeriodicRunnerImpl::~PeriodicRunnerImpl() {
-    shutdown();
+    PeriodicRunnerImpl::shutdown();
 }
 
-void PeriodicRunnerImpl::scheduleJob(PeriodicJob job) {
-    auto impl = std::make_shared<PeriodicJobImpl>(std::move(job), this);
+std::shared_ptr<PeriodicRunnerImpl::PeriodicJobImpl> PeriodicRunnerImpl::createAndAddJob(
+    PeriodicJob job) {
+    auto impl = std::make_shared<PeriodicJobImpl>(std::move(job), this->_clockSource, this->_svc);
 
-    {
-        stdx::unique_lock<stdx::mutex> lk(_mutex);
-        _jobs.push_back(impl);
-        if (_running) {
-            impl->run();
-        }
+    stdx::lock_guard<stdx::mutex> lk(_mutex);
+    _jobs.push_back(impl);
+    return impl;
+}
+
+std::unique_ptr<PeriodicRunner::PeriodicJobHandle> PeriodicRunnerImpl::makeJob(PeriodicJob job) {
+    auto handle = std::make_unique<PeriodicJobHandleImpl>(createAndAddJob(job));
+    return std::move(handle);
+}
+
+void PeriodicRunnerImpl::scheduleJob(PeriodicJob job) {
+    auto impl = createAndAddJob(job);
+    stdx::lock_guard<stdx::mutex> lk(_mutex);
+    if (_running) {
+        impl->start();
     }
 }
 
@@ -66,56 +76,127 @@ void PeriodicRunnerImpl::startup() {
     _running = true;
 
     // schedule any jobs that we have
-    for (auto job : _jobs) {
-        job->run();
+    for (auto& job : _jobs) {
+        job->start();
     }
 }
 
 void PeriodicRunnerImpl::shutdown() {
-    std::vector<stdx::thread> threads;
-    const auto guard = MakeGuard([&] {
-        for (auto& thread : threads) {
-            thread.join();
+    stdx::lock_guard<stdx::mutex> lk(_mutex);
+    if (_running) {
+        _running = false;
+
+        for (auto& job : _jobs) {
+            if (job->isAlive()) {
+                job->stop();
+            }
+        }
+        _jobs.clear();
+    }
+}
+
+PeriodicRunnerImpl::PeriodicJobImpl::PeriodicJobImpl(PeriodicJob job,
+                                                     ClockSource* source,
+                                                     ServiceContext* svc)
+    : _job(std::move(job)), _clockSource(source), _serviceContext(svc) {}
+
+void PeriodicRunnerImpl::PeriodicJobImpl::_run() {
+    _thread = stdx::thread([this] {
+        Client::initThread(_job.name, _serviceContext, nullptr);
+        while (true) {
+            auto start = _clockSource->now();
+
+            stdx::unique_lock<stdx::mutex> lk(_mutex);
+            // Wait until it's unpaused or canceled
+            _condvar.wait(lk, [&] { return _execStatus != ExecutionStatus::PAUSED; });
+            if (_execStatus == ExecutionStatus::CANCELED) {
+                break;
+            }
+
+            // Unlock while job is running so we can pause/cancel concurrently
+            lk.unlock();
+            _job.job(Client::getCurrent());
+            lk.lock();
+
+            if (_clockSource->waitForConditionUntil(_condvar, lk, start + _job.interval, [&] {
+                    return _execStatus == ExecutionStatus::CANCELED;
+                })) {
+                break;
+            }
         }
     });
+}
 
+void PeriodicRunnerImpl::PeriodicJobImpl::start() {
     {
         stdx::lock_guard<stdx::mutex> lk(_mutex);
-        if (_running) {
-            _running = false;
+        invariant(_execStatus == PeriodicJobImpl::ExecutionStatus::NOT_SCHEDULED);
+        _execStatus = PeriodicJobImpl::ExecutionStatus::RUNNING;
+    }
+    _run();
+}
 
-            for (auto&& job : _jobs) {
-                threads.push_back(std::move(job->thread));
-            }
+void PeriodicRunnerImpl::PeriodicJobImpl::pause() {
+    stdx::lock_guard<stdx::mutex> lk(_mutex);
+    invariant(_execStatus == PeriodicJobImpl::ExecutionStatus::RUNNING);
+    _execStatus = PeriodicJobImpl::ExecutionStatus::PAUSED;
+}
+
+void PeriodicRunnerImpl::PeriodicJobImpl::resume() {
+    {
+        stdx::lock_guard<stdx::mutex> lk(_mutex);
+        invariant(_execStatus == PeriodicJobImpl::ExecutionStatus::PAUSED);
+        _execStatus = PeriodicJobImpl::ExecutionStatus::RUNNING;
+    }
+    _condvar.notify_one();
+}
 
-            _jobs.clear();
+void PeriodicRunnerImpl::PeriodicJobImpl::stop() {
+    {
+        stdx::lock_guard<stdx::mutex> lk(_mutex);
+        invariant(isAlive());
+        invariant(_thread.joinable());
 
-            _condvar.notify_all();
-        }
+        _execStatus = PeriodicJobImpl::ExecutionStatus::CANCELED;
     }
+    _condvar.notify_one();
+    _thread.join();
 }
 
-PeriodicRunnerImpl::PeriodicJobImpl::PeriodicJobImpl(PeriodicJob job, PeriodicRunnerImpl* parent)
-    : job(std::move(job)), parent(parent) {}
+bool PeriodicRunnerImpl::PeriodicJobImpl::isAlive() {
+    return _execStatus == ExecutionStatus::RUNNING || _execStatus == ExecutionStatus::PAUSED;
+}
 
-void PeriodicRunnerImpl::PeriodicJobImpl::run() {
-    thread = stdx::thread([ this, anchor = shared_from_this() ] {
-        Client::initThread(job.name, parent->_svc, nullptr);
-        while (true) {
-            auto start = parent->_clockSource->now();
+namespace {
 
-            job.job(Client::getCurrent());
+template <typename T>
+std::shared_ptr<T> lockAndAssertExists(std::weak_ptr<T> ptr, StringData errMsg) {
+    if (auto p = ptr.lock()) {
+        return p;
+    } else {
+        uasserted(ErrorCodes::InternalError, errMsg);
+    }
+}
 
-            stdx::unique_lock<stdx::mutex> lk(parent->_mutex);
-            if (parent->_clockSource->waitForConditionUntil(
-                    parent->_condvar, lk, start + job.interval, [&] {
-                        return !parent->_running;
-                    })) {
-                break;
-            }
-        }
-    });
+constexpr auto kPeriodicJobHandleLifetimeErrMsg =
+    "The PeriodicRunner job for this handle no longer exists"_sd;
+
+}  // namespace
+
+void PeriodicRunnerImpl::PeriodicJobHandleImpl::start() {
+    auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg);
+    job->start();
+}
+
+void PeriodicRunnerImpl::PeriodicJobHandleImpl::pause() {
+    auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg);
+    job->pause();
+}
+
+void PeriodicRunnerImpl::PeriodicJobHandleImpl::resume() {
+    auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg);
+    job->resume();
 }
 
 }  // namespace mongo
diff --git a/src/mongo/util/periodic_runner_impl.h b/src/mongo/util/periodic_runner_impl.h
index 555f12f2b7f..49f7040e361 100644
--- a/src/mongo/util/periodic_runner_impl.h
+++ b/src/mongo/util/periodic_runner_impl.h
@@ -51,6 +51,7 @@ public:
     PeriodicRunnerImpl(ServiceContext* svc, ClockSource* clockSource);
     ~PeriodicRunnerImpl();
 
+    std::unique_ptr<PeriodicRunner::PeriodicJobHandle> makeJob(PeriodicJob job) override;
     void scheduleJob(PeriodicJob job) override;
 
     void startup() override;
@@ -58,14 +59,49 @@ public:
     void shutdown() override;
 
 private:
-    struct PeriodicJobImpl : public std::enable_shared_from_this<PeriodicJobImpl> {
-        PeriodicJobImpl(PeriodicJob job, PeriodicRunnerImpl* parent);
+    class PeriodicJobImpl {
+        MONGO_DISALLOW_COPYING(PeriodicJobImpl);
 
-        void run();
+    public:
+        PeriodicJobImpl(PeriodicJob job, ClockSource* source, ServiceContext* svc);
 
-        PeriodicJob job;
-        PeriodicRunnerImpl* parent;
-        stdx::thread thread;
+        void start();
+        void pause();
+        void resume();
+        void stop();
+
+        bool isAlive();
+
+        enum class ExecutionStatus { NOT_SCHEDULED, RUNNING, PAUSED, CANCELED };
+
+    private:
+        void _run();
+
+        PeriodicJob _job;
+        ClockSource* _clockSource;
+        ServiceContext* _serviceContext;
+        stdx::thread _thread;
+
+        stdx::mutex _mutex;
+        stdx::condition_variable _condvar;
+        /**
+         * The current execution status of the job.
+         */
+        ExecutionStatus _execStatus{ExecutionStatus::NOT_SCHEDULED};
+    };
+
+    std::shared_ptr<PeriodicRunnerImpl::PeriodicJobImpl> createAndAddJob(PeriodicJob job);
+
+    class PeriodicJobHandleImpl : public PeriodicJobHandle {
+    public:
+        explicit PeriodicJobHandleImpl(std::weak_ptr<PeriodicJobImpl> jobImpl)
+            : _jobWeak(jobImpl) {}
+        void start() override;
+        void pause() override;
+        void resume() override;
+
+    private:
+        std::weak_ptr<PeriodicJobImpl> _jobWeak;
     };
 
     ServiceContext* _svc;
@@ -74,7 +110,6 @@ private:
     std::vector<std::shared_ptr<PeriodicJobImpl>> _jobs;
 
     stdx::mutex _mutex;
-    stdx::condition_variable _condvar;
 
     bool _running = false;
 };
diff --git a/src/mongo/util/periodic_runner_impl_test.cpp b/src/mongo/util/periodic_runner_impl_test.cpp
index 1ffda65bfaa..583825d21e4 100644
--- a/src/mongo/util/periodic_runner_impl_test.cpp
+++ b/src/mongo/util/periodic_runner_impl_test.cpp
@@ -105,6 +105,148 @@ TEST_F(PeriodicRunnerImplTest, OneJobTest) {
     }
 }
 
+TEST_F(PeriodicRunnerImplTest, OnePausableJobDoesNotRunWithoutStart) {
+    int count = 0;
+    Milliseconds interval{5};
+
+    stdx::mutex mutex;
+    stdx::condition_variable cv;
+
+    // Add a job, ensure that it runs once
+    PeriodicRunner::PeriodicJob job("job",
+                                    [&count, &mutex, &cv](Client*) {
+                                        {
+                                            stdx::unique_lock<stdx::mutex> lk(mutex);
+                                            count++;
+                                        }
+                                        cv.notify_all();
+                                    },
+                                    interval);
+
+    auto handle = runner().makeJob(std::move(job));
+    clockSource().advance(interval);
+    ASSERT_EQ(count, 0);
+}
+
+TEST_F(PeriodicRunnerImplTest, OnePausableJobRunsCorrectlyWithStart) {
+    int count = 0;
+    Milliseconds interval{5};
+
+    stdx::mutex mutex;
+    stdx::condition_variable cv;
+
+    // Add a job, ensure that it runs once
+    PeriodicRunner::PeriodicJob job("job",
+                                    [&count, &mutex, &cv](Client*) {
+                                        {
+                                            stdx::unique_lock<stdx::mutex> lk(mutex);
+                                            count++;
+                                        }
+                                        cv.notify_all();
+                                    },
+                                    interval);
+
+    auto handle = runner().makeJob(std::move(job));
+    handle->start();
+    // Fast forward ten times, we should run all ten times.
+    for (int i = 0; i < 10; i++) {
+        clockSource().advance(interval);
+        {
+            stdx::unique_lock<stdx::mutex> lk(mutex);
+            cv.wait(lk, [&count, &i] { return count > i; });
+        }
+    }
+}
+
+TEST_F(PeriodicRunnerImplTest, OnePausableJobPausesCorrectly) {
+    int count = 0;
+    Milliseconds interval{5};
+
+    stdx::mutex mutex;
+    stdx::condition_variable cv;
+
+    // Add a job, ensure that it runs once
+    PeriodicRunner::PeriodicJob job("job",
+                                    [&count, &mutex, &cv](Client*) {
+                                        {
+                                            stdx::unique_lock<stdx::mutex> lk(mutex);
+                                            count++;
+                                        }
+                                        cv.notify_all();
+                                    },
+                                    interval);
+
+    auto handle = runner().makeJob(std::move(job));
+    handle->start();
+    // Fast forward ten times, we should run all ten times.
+    for (int i = 0; i < 10; i++) {
+        clockSource().advance(interval);
+        {
+            stdx::unique_lock<stdx::mutex> lk(mutex);
+            cv.wait(lk, [&count, &i] { return count > i; });
+        }
+    }
+    auto numExecutionsBeforePause = count;
+    handle->pause();
+    // Fast forward ten times, we shouldn't run anymore
+    for (int i = 0; i < 10; i++) {
+        clockSource().advance(interval);
+    }
+    ASSERT_TRUE(count == numExecutionsBeforePause || count == numExecutionsBeforePause + 1);
+}
+
+TEST_F(PeriodicRunnerImplTest, OnePausableJobResumesCorrectly) {
+    int count = 0;
+    int numFastForwardsForIterationWhileActive = 10;
+    Milliseconds interval{5};
+
+    stdx::mutex mutex;
+    stdx::condition_variable cv;
+
+    // Add a job, ensure that it runs once
+    PeriodicRunner::PeriodicJob job("job",
+                                    [&count, &mutex, &cv](Client*) {
+                                        {
+                                            stdx::unique_lock<stdx::mutex> lk(mutex);
+                                            count++;
+                                        }
+                                        cv.notify_all();
+                                    },
+                                    interval);
+
+    auto handle = runner().makeJob(std::move(job));
+    handle->start();
+    // Fast forward ten times, we should run all ten times.
+    for (int i = 0; i < numFastForwardsForIterationWhileActive; i++) {
+        clockSource().advance(interval);
+        {
+            stdx::unique_lock<stdx::mutex> lk(mutex);
+            cv.wait(lk, [&count, &i] { return count > i; });
+        }
+    }
+    auto countBeforePause = count;
+    ASSERT_TRUE(countBeforePause == numFastForwardsForIterationWhileActive ||
+                countBeforePause == numFastForwardsForIterationWhileActive + 1);
+    handle->pause();
+    // Fast forward ten times, we shouldn't run anymore
+    for (int i = 0; i < 10; i++) {
+        clockSource().advance(interval);
+    }
+    handle->resume();
+    // Fast forward ten times, we should run all ten times.
+    for (int i = 0; i < numFastForwardsForIterationWhileActive; i++) {
+        clockSource().advance(interval);
+        {
+            stdx::unique_lock<stdx::mutex> lk(mutex);
+            cv.wait(lk, [&count, &countBeforePause, &i] { return count > countBeforePause + i; });
+        }
+    }
+
+    // This is slightly racy so once in a while count will be one extra
+    ASSERT_TRUE(count == numFastForwardsForIterationWhileActive * 2 ||
+                count == numFastForwardsForIterationWhileActive * 2 + 1);
+}
+
 TEST_F(PeriodicRunnerImplTestNoSetup, ScheduleBeforeStartupTest) {
     int count = 0;
     Milliseconds interval{5};
@@ -175,6 +317,7 @@ TEST_F(PeriodicRunnerImplTest, TwoJobsTest) {
             cv.wait(lk, [&countA, &countB, &i] { return (countA > i && countB >= i / 2); });
         }
     }
+    tearDown();
 }
 
 TEST_F(PeriodicRunnerImplTest, TwoJobsDontDeadlock) {
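The core of the change is the per-job state machine in PeriodicJobImpl::_run(): each job owns a thread that blocks on a condition variable while _execStatus is PAUSED, exits when it becomes CANCELED, and otherwise runs the callback with the mutex released so pause() and stop() can be called while the job body is executing. The standalone sketch below restates that pattern with std:: primitives in place of mongo's stdx wrappers and ClockSource-driven waits; the class and member names are illustrative assumptions, not the commit's API.

    // Standalone sketch (not commit code): the pause/resume/cancel pattern used by
    // PeriodicJobImpl, written with std:: primitives instead of mongo's stdx wrappers
    // and ClockSource-based waiting. Names here are illustrative assumptions.
    #include <chrono>
    #include <condition_variable>
    #include <functional>
    #include <iostream>
    #include <mutex>
    #include <thread>

    class PausablePeriodicJob {
    public:
        enum class State { kNotScheduled, kRunning, kPaused, kCanceled };

        PausablePeriodicJob(std::function<void()> work, std::chrono::milliseconds interval)
            : _work(std::move(work)), _interval(interval) {}

        void start() {
            {
                std::lock_guard<std::mutex> lk(_mutex);
                _state = State::kRunning;
            }
            _thread = std::thread([this] { _run(); });
        }

        void pause() {
            std::lock_guard<std::mutex> lk(_mutex);
            _state = State::kPaused;  // worker blocks at the top of its loop until resumed
        }

        void resume() {
            {
                std::lock_guard<std::mutex> lk(_mutex);
                _state = State::kRunning;
            }
            _condvar.notify_one();
        }

        void stop() {
            {
                std::lock_guard<std::mutex> lk(_mutex);
                _state = State::kCanceled;
            }
            _condvar.notify_one();
            _thread.join();
        }

    private:
        void _run() {
            std::unique_lock<std::mutex> lk(_mutex);
            while (true) {
                auto start = std::chrono::steady_clock::now();

                // Wait here while paused; wake on resume() or stop().
                _condvar.wait(lk, [&] { return _state != State::kPaused; });
                if (_state == State::kCanceled) {
                    return;
                }

                // Run the callback without holding the lock so pause()/stop() can
                // be invoked concurrently with a long-running job body.
                lk.unlock();
                _work();
                lk.lock();

                // Sleep out the remainder of the interval, waking early on cancel.
                if (_condvar.wait_until(
                        lk, start + _interval, [&] { return _state == State::kCanceled; })) {
                    return;
                }
            }
        }

        std::function<void()> _work;
        std::chrono::milliseconds _interval;
        std::thread _thread;
        std::mutex _mutex;
        std::condition_variable _condvar;
        State _state{State::kNotScheduled};
    };

    int main() {
        PausablePeriodicJob job([] { std::cout << "tick\n"; }, std::chrono::milliseconds(50));
        job.start();
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
        job.pause();   // no ticks while paused
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
        job.resume();  // ticking again
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
        job.stop();    // cancels and joins the worker thread
        return 0;
    }

A related design point visible in periodic_runner_impl.cpp is that handles hold only a std::weak_ptr to the job, so PeriodicRunnerImpl::shutdown() can stop and drop every job while stale handles fail cleanly through lockAndAssertExists() instead of touching a destroyed object.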