summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Saltz <matthew.saltz@mongodb.com>2018-06-08 16:18:03 -0400
committerMatthew Saltz <matthew.saltz@mongodb.com>2018-07-03 14:21:00 -0400
commit738ee996e0a1f5b0c92f27f2746d94d14092c1bd (patch)
tree4d06b3a2a24eb906b75c625d44da1a14fe299e48 /src
parentb2cd49a55e98894bdcef5b646d3b7f752addf944 (diff)
downloadmongo-738ee996e0a1f5b0c92f27f2746d94d14092c1bd.tar.gz
SERVER-35585 Make PeriodicRunner jobs be pausable/resumable
Diffstat (limited to 'src')
-rw-r--r--src/mongo/util/periodic_runner.h28
-rw-r--r--src/mongo/util/periodic_runner_impl.cpp161
-rw-r--r--src/mongo/util/periodic_runner_impl.h49
-rw-r--r--src/mongo/util/periodic_runner_impl_test.cpp143
4 files changed, 334 insertions, 47 deletions
diff --git a/src/mongo/util/periodic_runner.h b/src/mongo/util/periodic_runner.h
index 710b812bb73..c6d7fb01ac4 100644
--- a/src/mongo/util/periodic_runner.h
+++ b/src/mongo/util/periodic_runner.h
@@ -72,8 +72,36 @@ public:
Milliseconds interval;
};
+ class PeriodicJobHandle {
+ public:
+ virtual ~PeriodicJobHandle() = default;
+
+ /**
+ * Starts running the job
+ */
+ virtual void start() = 0;
+ /**
+ * Pauses the job temporarily so that it does not execute until
+ * unpaused
+ */
+ virtual void pause() = 0;
+ /**
+ * Resumes a paused job so that it continues executing each interval
+ */
+ virtual void resume() = 0;
+ };
+
virtual ~PeriodicRunner();
+
+ /**
+ * Creates a new job and adds it to the runner, but does not schedule it.
+ * The caller is responsible for calling 'start' on the resulting handle in
+ * order to begin the job running. This API should be used when the caller
+ * is interested in observing and controlling the job execution state.
+ */
+ virtual std::unique_ptr<PeriodicJobHandle> makeJob(PeriodicJob job) = 0;
+
/**
* Schedules a job to be run at periodic intervals.
*
diff --git a/src/mongo/util/periodic_runner_impl.cpp b/src/mongo/util/periodic_runner_impl.cpp
index 03b0f67b40b..0be6fba57a7 100644
--- a/src/mongo/util/periodic_runner_impl.cpp
+++ b/src/mongo/util/periodic_runner_impl.cpp
@@ -41,18 +41,28 @@ PeriodicRunnerImpl::PeriodicRunnerImpl(ServiceContext* svc, ClockSource* clockSo
: _svc(svc), _clockSource(clockSource) {}
PeriodicRunnerImpl::~PeriodicRunnerImpl() {
- shutdown();
+ PeriodicRunnerImpl::shutdown();
}
-void PeriodicRunnerImpl::scheduleJob(PeriodicJob job) {
- auto impl = std::make_shared<PeriodicJobImpl>(std::move(job), this);
+std::shared_ptr<PeriodicRunnerImpl::PeriodicJobImpl> PeriodicRunnerImpl::createAndAddJob(
+ PeriodicJob job) {
+ auto impl = std::make_shared<PeriodicJobImpl>(std::move(job), this->_clockSource, this->_svc);
- {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- _jobs.push_back(impl);
- if (_running) {
- impl->run();
- }
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _jobs.push_back(impl);
+ return impl;
+}
+
+std::unique_ptr<PeriodicRunner::PeriodicJobHandle> PeriodicRunnerImpl::makeJob(PeriodicJob job) {
+ auto handle = std::make_unique<PeriodicJobHandleImpl>(createAndAddJob(job));
+ return std::move(handle);
+}
+
+void PeriodicRunnerImpl::scheduleJob(PeriodicJob job) {
+ auto impl = createAndAddJob(job);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (_running) {
+ impl->start();
}
}
@@ -66,56 +76,127 @@ void PeriodicRunnerImpl::startup() {
_running = true;
// schedule any jobs that we have
- for (auto job : _jobs) {
- job->run();
+ for (auto& job : _jobs) {
+ job->start();
}
}
void PeriodicRunnerImpl::shutdown() {
- std::vector<stdx::thread> threads;
- const auto guard = MakeGuard([&] {
- for (auto& thread : threads) {
- thread.join();
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (_running) {
+ _running = false;
+
+ for (auto& job : _jobs) {
+ if (job->isAlive()) {
+ job->stop();
+ }
+ }
+ _jobs.clear();
+ }
+}
+
+PeriodicRunnerImpl::PeriodicJobImpl::PeriodicJobImpl(PeriodicJob job,
+ ClockSource* source,
+ ServiceContext* svc)
+ : _job(std::move(job)), _clockSource(source), _serviceContext(svc) {}
+
+void PeriodicRunnerImpl::PeriodicJobImpl::_run() {
+ _thread = stdx::thread([this] {
+ Client::initThread(_job.name, _serviceContext, nullptr);
+ while (true) {
+ auto start = _clockSource->now();
+
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ // Wait until it's unpaused or canceled
+ _condvar.wait(lk, [&] { return _execStatus != ExecutionStatus::PAUSED; });
+ if (_execStatus == ExecutionStatus::CANCELED) {
+ break;
+ }
+
+ // Unlock while job is running so we can pause/cancel concurrently
+ lk.unlock();
+ _job.job(Client::getCurrent());
+ lk.lock();
+
+ if (_clockSource->waitForConditionUntil(_condvar, lk, start + _job.interval, [&] {
+ return _execStatus == ExecutionStatus::CANCELED;
+ })) {
+ break;
+ }
}
});
+}
+void PeriodicRunnerImpl::PeriodicJobImpl::start() {
{
stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_running) {
- _running = false;
+ invariant(_execStatus == PeriodicJobImpl::ExecutionStatus::NOT_SCHEDULED);
+ _execStatus = PeriodicJobImpl::ExecutionStatus::RUNNING;
+ }
+ _run();
+}
- for (auto&& job : _jobs) {
- threads.push_back(std::move(job->thread));
- }
+void PeriodicRunnerImpl::PeriodicJobImpl::pause() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(_execStatus == PeriodicJobImpl::ExecutionStatus::RUNNING);
+ _execStatus = PeriodicJobImpl::ExecutionStatus::PAUSED;
+}
+
+void PeriodicRunnerImpl::PeriodicJobImpl::resume() {
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(_execStatus == PeriodicJobImpl::ExecutionStatus::PAUSED);
+ _execStatus = PeriodicJobImpl::ExecutionStatus::RUNNING;
+ }
+ _condvar.notify_one();
+}
- _jobs.clear();
+void PeriodicRunnerImpl::PeriodicJobImpl::stop() {
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(isAlive());
+ invariant(_thread.joinable());
- _condvar.notify_all();
- }
+ _execStatus = PeriodicJobImpl::ExecutionStatus::CANCELED;
}
+ _condvar.notify_one();
+ _thread.join();
}
-PeriodicRunnerImpl::PeriodicJobImpl::PeriodicJobImpl(PeriodicJob job, PeriodicRunnerImpl* parent)
- : job(std::move(job)), parent(parent) {}
+bool PeriodicRunnerImpl::PeriodicJobImpl::isAlive() {
+ return _execStatus == ExecutionStatus::RUNNING || _execStatus == ExecutionStatus::PAUSED;
+}
-void PeriodicRunnerImpl::PeriodicJobImpl::run() {
- thread = stdx::thread([ this, anchor = shared_from_this() ] {
- Client::initThread(job.name, parent->_svc, nullptr);
- while (true) {
- auto start = parent->_clockSource->now();
+namespace {
- job.job(Client::getCurrent());
+template <typename T>
+std::shared_ptr<T> lockAndAssertExists(std::weak_ptr<T> ptr, StringData errMsg) {
+ if (auto p = ptr.lock()) {
+ return p;
+ } else {
+ uasserted(ErrorCodes::InternalError, errMsg);
+ }
+}
- stdx::unique_lock<stdx::mutex> lk(parent->_mutex);
- if (parent->_clockSource->waitForConditionUntil(
- parent->_condvar, lk, start + job.interval, [&] {
- return !parent->_running;
- })) {
- break;
- }
- }
- });
+constexpr auto kPeriodicJobHandleLifetimeErrMsg =
+ "The PeriodicRunner job for this handle no longer exists"_sd;
+
+} // namespace
+
+void PeriodicRunnerImpl::PeriodicJobHandleImpl::start() {
+ auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg);
+ job->start();
+}
+
+void PeriodicRunnerImpl::PeriodicJobHandleImpl::pause() {
+ auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg);
+ job->pause();
+}
+
+void PeriodicRunnerImpl::PeriodicJobHandleImpl::resume() {
+ auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg);
+ job->resume();
}
} // namespace mongo
diff --git a/src/mongo/util/periodic_runner_impl.h b/src/mongo/util/periodic_runner_impl.h
index 555f12f2b7f..49f7040e361 100644
--- a/src/mongo/util/periodic_runner_impl.h
+++ b/src/mongo/util/periodic_runner_impl.h
@@ -51,6 +51,7 @@ public:
PeriodicRunnerImpl(ServiceContext* svc, ClockSource* clockSource);
~PeriodicRunnerImpl();
+ std::unique_ptr<PeriodicRunner::PeriodicJobHandle> makeJob(PeriodicJob job) override;
void scheduleJob(PeriodicJob job) override;
void startup() override;
@@ -58,14 +59,49 @@ public:
void shutdown() override;
private:
- struct PeriodicJobImpl : public std::enable_shared_from_this<PeriodicJobImpl> {
- PeriodicJobImpl(PeriodicJob job, PeriodicRunnerImpl* parent);
+ class PeriodicJobImpl {
+ MONGO_DISALLOW_COPYING(PeriodicJobImpl);
- void run();
+ public:
+ PeriodicJobImpl(PeriodicJob job, ClockSource* source, ServiceContext* svc);
- PeriodicJob job;
- PeriodicRunnerImpl* parent;
- stdx::thread thread;
+ void start();
+ void pause();
+ void resume();
+ void stop();
+
+ bool isAlive();
+
+ enum class ExecutionStatus { NOT_SCHEDULED, RUNNING, PAUSED, CANCELED };
+
+ private:
+ void _run();
+
+ PeriodicJob _job;
+ ClockSource* _clockSource;
+ ServiceContext* _serviceContext;
+ stdx::thread _thread;
+
+ stdx::mutex _mutex;
+ stdx::condition_variable _condvar;
+ /**
+ * The current execution status of the job.
+ */
+ ExecutionStatus _execStatus{ExecutionStatus::NOT_SCHEDULED};
+ };
+
+ std::shared_ptr<PeriodicRunnerImpl::PeriodicJobImpl> createAndAddJob(PeriodicJob job);
+
+ class PeriodicJobHandleImpl : public PeriodicJobHandle {
+ public:
+ explicit PeriodicJobHandleImpl(std::weak_ptr<PeriodicJobImpl> jobImpl)
+ : _jobWeak(jobImpl) {}
+ void start() override;
+ void pause() override;
+ void resume() override;
+
+ private:
+ std::weak_ptr<PeriodicJobImpl> _jobWeak;
};
ServiceContext* _svc;
@@ -74,7 +110,6 @@ private:
std::vector<std::shared_ptr<PeriodicJobImpl>> _jobs;
stdx::mutex _mutex;
- stdx::condition_variable _condvar;
bool _running = false;
};
diff --git a/src/mongo/util/periodic_runner_impl_test.cpp b/src/mongo/util/periodic_runner_impl_test.cpp
index 1ffda65bfaa..583825d21e4 100644
--- a/src/mongo/util/periodic_runner_impl_test.cpp
+++ b/src/mongo/util/periodic_runner_impl_test.cpp
@@ -105,6 +105,148 @@ TEST_F(PeriodicRunnerImplTest, OneJobTest) {
}
}
+TEST_F(PeriodicRunnerImplTest, OnePausableJobDoesNotRunWithoutStart) {
+ int count = 0;
+ Milliseconds interval{5};
+
+ stdx::mutex mutex;
+ stdx::condition_variable cv;
+
+ // Make a job but do not start it; it should never execute
+ PeriodicRunner::PeriodicJob job("job",
+ [&count, &mutex, &cv](Client*) {
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ count++;
+ }
+ cv.notify_all();
+ },
+ interval);
+
+ auto handle = runner().makeJob(std::move(job));
+ clockSource().advance(interval);
+ ASSERT_EQ(count, 0);
+}
+
+TEST_F(PeriodicRunnerImplTest, OnePausableJobRunsCorrectlyWithStart) {
+ int count = 0;
+ Milliseconds interval{5};
+
+ stdx::mutex mutex;
+ stdx::condition_variable cv;
+
+ // Make a job and start it; it should run every interval
+ PeriodicRunner::PeriodicJob job("job",
+ [&count, &mutex, &cv](Client*) {
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ count++;
+ }
+ cv.notify_all();
+ },
+ interval);
+
+ auto handle = runner().makeJob(std::move(job));
+ handle->start();
+ // Fast forward ten times, we should run all ten times.
+ for (int i = 0; i < 10; i++) {
+ clockSource().advance(interval);
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ cv.wait(lk, [&count, &i] { return count > i; });
+ }
+ }
+}
+
+TEST_F(PeriodicRunnerImplTest, OnePausableJobPausesCorrectly) {
+ int count = 0;
+ Milliseconds interval{5};
+
+ stdx::mutex mutex;
+ stdx::condition_variable cv;
+
+ // Make a job and start it; once paused it should stop executing
+ PeriodicRunner::PeriodicJob job("job",
+ [&count, &mutex, &cv](Client*) {
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ count++;
+ }
+ cv.notify_all();
+ },
+ interval);
+
+ auto handle = runner().makeJob(std::move(job));
+ handle->start();
+ // Fast forward ten times, we should run all ten times.
+ for (int i = 0; i < 10; i++) {
+ clockSource().advance(interval);
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ cv.wait(lk, [&count, &i] { return count > i; });
+ }
+ }
+ auto numExecutionsBeforePause = count;
+ handle->pause();
+ // Fast forward ten times, we shouldn't run anymore
+ for (int i = 0; i < 10; i++) {
+ clockSource().advance(interval);
+ }
+ ASSERT_TRUE(count == numExecutionsBeforePause || count == numExecutionsBeforePause + 1);
+}
+
+TEST_F(PeriodicRunnerImplTest, OnePausableJobResumesCorrectly) {
+ int count = 0;
+ int numFastForwardsForIterationWhileActive = 10;
+ Milliseconds interval{5};
+
+ stdx::mutex mutex;
+ stdx::condition_variable cv;
+
+ // Make a job and start it; after pause/resume it should keep executing each interval
+ PeriodicRunner::PeriodicJob job("job",
+ [&count, &mutex, &cv](Client*) {
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ count++;
+ }
+ cv.notify_all();
+ },
+ interval);
+
+ auto handle = runner().makeJob(std::move(job));
+ handle->start();
+ // Fast forward ten times, we should run all ten times.
+ for (int i = 0; i < numFastForwardsForIterationWhileActive; i++) {
+ clockSource().advance(interval);
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ cv.wait(lk, [&count, &i] { return count > i; });
+ }
+ }
+ auto countBeforePause = count;
+ ASSERT_TRUE(countBeforePause == numFastForwardsForIterationWhileActive ||
+ countBeforePause == numFastForwardsForIterationWhileActive + 1);
+ handle->pause();
+ // Fast forward ten times, we shouldn't run anymore
+ for (int i = 0; i < 10; i++) {
+ clockSource().advance(interval);
+ }
+ handle->resume();
+ // Fast forward ten times, we should run all ten times.
+ for (int i = 0; i < numFastForwardsForIterationWhileActive; i++) {
+ clockSource().advance(interval);
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ cv.wait(lk, [&count, &countBeforePause, &i] { return count > countBeforePause + i; });
+ }
+ }
+
+ // This is slightly racy so once in a while count will be one extra
+ ASSERT_TRUE(count == numFastForwardsForIterationWhileActive * 2 ||
+ count == numFastForwardsForIterationWhileActive * 2 + 1);
+}
+
TEST_F(PeriodicRunnerImplTestNoSetup, ScheduleBeforeStartupTest) {
int count = 0;
Milliseconds interval{5};
@@ -175,6 +317,7 @@ TEST_F(PeriodicRunnerImplTest, TwoJobsTest) {
cv.wait(lk, [&countA, &countB, &i] { return (countA > i && countB >= i / 2); });
}
}
+ tearDown();
}
TEST_F(PeriodicRunnerImplTest, TwoJobsDontDeadlock) {