/**
* Copyright (C) 2018 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#include "mongo/platform/basic.h"
#include "mongo/util/periodic_runner_impl.h"
#include "mongo/db/client.h"
#include "mongo/db/service_context.h"
#include "mongo/util/clock_source.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
/**
 * Stores the service context and clock source. Both are later passed to every
 * PeriodicJobImpl built by createAndAddJob().
 */
PeriodicRunnerImpl::PeriodicRunnerImpl(ServiceContext* svc, ClockSource* clockSource)
    : _svc(svc),
      _clockSource(clockSource) {}
/**
 * Runs shutdown() on destruction. The call is written with the class-qualified name, so it
 * binds to PeriodicRunnerImpl::shutdown directly rather than going through virtual dispatch
 * (if shutdown() is virtual).
 */
PeriodicRunnerImpl::~PeriodicRunnerImpl() {
    this->PeriodicRunnerImpl::shutdown();
}
std::shared_ptr PeriodicRunnerImpl::createAndAddJob(
PeriodicJob job) {
auto impl = std::make_shared(std::move(job), this->_clockSource, this->_svc);
stdx::lock_guard lk(_mutex);
_jobs.push_back(impl);
return impl;
}
std::unique_ptr PeriodicRunnerImpl::makeJob(PeriodicJob job) {
auto handle = std::make_unique(createAndAddJob(job));
return std::move(handle);
}
/**
 * Registers `job` with the runner. If the runner is already running the job is started right
 * away; otherwise it stays registered and is started by a later startup().
 *
 * Registration and the `_running` check happen under a single acquisition of `_mutex`. If the
 * job were published to `_jobs` first and the lock re-acquired afterwards, a concurrent
 * startup() could start the job in between, and the start() below would then trip the
 * NOT_SCHEDULED invariant in PeriodicJobImpl::start().
 */
void PeriodicRunnerImpl::scheduleJob(PeriodicJob job) {
    // `job` is a by-value sink parameter, so move it into the implementation object.
    auto impl = std::make_shared<PeriodicJobImpl>(std::move(job), _clockSource, _svc);

    stdx::lock_guard<stdx::mutex> lk(_mutex);
    _jobs.push_back(impl);
    if (_running) {
        impl->start();
    }
}
void PeriodicRunnerImpl::startup() {
stdx::lock_guard lk(_mutex);
if (_running) {
return;
}
_running = true;
// schedule any jobs that we have
for (auto& job : _jobs) {
job->start();
}
}
void PeriodicRunnerImpl::shutdown() {
stdx::lock_guard lk(_mutex);
if (_running) {
_running = false;
for (auto& job : _jobs) {
if (job->isAlive()) {
job->stop();
}
}
_jobs.clear();
}
}
/**
 * Takes over `job` by move and records the clock source and service context that the job's
 * thread uses in _run().
 */
PeriodicRunnerImpl::PeriodicJobImpl::PeriodicJobImpl(PeriodicJob job,
                                                     ClockSource* source,
                                                     ServiceContext* svc)
    : _job(std::move(job)),
      _clockSource(source),
      _serviceContext(svc) {}
void PeriodicRunnerImpl::PeriodicJobImpl::_run() {
_thread = stdx::thread([this] {
Client::initThread(_job.name, _serviceContext, nullptr);
while (true) {
auto start = _clockSource->now();
stdx::unique_lock lk(_mutex);
// Wait until it's unpaused or canceled
_condvar.wait(lk, [&] { return _execStatus != ExecutionStatus::PAUSED; });
if (_execStatus == ExecutionStatus::CANCELED) {
break;
}
// Unlock while job is running so we can pause/cancel concurrently
lk.unlock();
_job.job(Client::getCurrent());
lk.lock();
if (_clockSource->waitForConditionUntil(_condvar, lk, start + _job.interval, [&] {
return _execStatus == ExecutionStatus::CANCELED;
})) {
break;
}
}
});
}
void PeriodicRunnerImpl::PeriodicJobImpl::start() {
{
stdx::lock_guard lk(_mutex);
invariant(_execStatus == PeriodicJobImpl::ExecutionStatus::NOT_SCHEDULED);
_execStatus = PeriodicJobImpl::ExecutionStatus::RUNNING;
}
_run();
}
void PeriodicRunnerImpl::PeriodicJobImpl::pause() {
stdx::lock_guard lk(_mutex);
invariant(_execStatus == PeriodicJobImpl::ExecutionStatus::RUNNING);
_execStatus = PeriodicJobImpl::ExecutionStatus::PAUSED;
}
void PeriodicRunnerImpl::PeriodicJobImpl::resume() {
{
stdx::lock_guard lk(_mutex);
invariant(_execStatus == PeriodicJobImpl::ExecutionStatus::PAUSED);
_execStatus = PeriodicJobImpl::ExecutionStatus::RUNNING;
}
_condvar.notify_one();
}
void PeriodicRunnerImpl::PeriodicJobImpl::stop() {
{
stdx::lock_guard lk(_mutex);
invariant(isAlive());
invariant(_thread.joinable());
_execStatus = PeriodicJobImpl::ExecutionStatus::CANCELED;
}
invariant(_thread.joinable());
_condvar.notify_one();
_thread.join();
}
/**
 * Returns true when the job's status is RUNNING or PAUSED.
 *
 * This function does not take `_mutex` itself; stop() calls it with the mutex already held.
 * NOTE(review): PeriodicRunnerImpl::shutdown() calls it without the job's mutex - confirm that
 * unsynchronized read is acceptable.
 */
bool PeriodicRunnerImpl::PeriodicJobImpl::isAlive() {
    const auto status = _execStatus;
    return status == ExecutionStatus::RUNNING || status == ExecutionStatus::PAUSED;
}
namespace {
template
std::shared_ptr lockAndAssertExists(std::weak_ptr ptr, StringData errMsg) {
if (auto p = ptr.lock()) {
return p;
} else {
uasserted(ErrorCodes::InternalError, errMsg);
}
}
constexpr auto kPeriodicJobHandleLifetimeErrMsg =
"The PeriodicRunner job for this handle no longer exists"_sd;
} // namespace
/**
 * Starts the job behind this handle. Fails through lockAndAssertExists()
 * (ErrorCodes::InternalError) if the job no longer exists.
 */
void PeriodicRunnerImpl::PeriodicJobHandleImpl::start() {
    // The temporary shared_ptr keeps the job alive for the duration of the call.
    lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg)->start();
}
/**
 * Stops the job behind this handle. Fails through lockAndAssertExists()
 * (ErrorCodes::InternalError) if the job no longer exists.
 */
void PeriodicRunnerImpl::PeriodicJobHandleImpl::stop() {
    // The temporary shared_ptr keeps the job alive for the duration of the call.
    lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg)->stop();
}
/**
 * Pauses the job behind this handle. Fails through lockAndAssertExists()
 * (ErrorCodes::InternalError) if the job no longer exists.
 */
void PeriodicRunnerImpl::PeriodicJobHandleImpl::pause() {
    // The temporary shared_ptr keeps the job alive for the duration of the call.
    lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg)->pause();
}
/**
 * Resumes the job behind this handle. Fails through lockAndAssertExists()
 * (ErrorCodes::InternalError) if the job no longer exists.
 */
void PeriodicRunnerImpl::PeriodicJobHandleImpl::resume() {
    // The temporary shared_ptr keeps the job alive for the duration of the call.
    lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg)->resume();
}
} // namespace mongo