/** * Copyright (C) 2018 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see <http://www.gnu.org/licenses/>. * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects * for all of the code used other than as permitted herein. If you modify * file(s) with this exception, you may extend this exception to your * version of the file(s), but you are not obligated to do so. If you do not * wish to do so, delete this exception statement from your version. If you * delete this exception statement from all source files in the program, * then also delete it in the license file. 
*/ #include "mongo/platform/basic.h" #include "mongo/util/periodic_runner_impl.h" #include "mongo/db/client.h" #include "mongo/db/service_context.h" #include "mongo/util/clock_source.h" #include "mongo/util/scopeguard.h" namespace mongo { PeriodicRunnerImpl::PeriodicRunnerImpl(ServiceContext* svc, ClockSource* clockSource) : _svc(svc), _clockSource(clockSource) {} PeriodicRunnerImpl::~PeriodicRunnerImpl() { PeriodicRunnerImpl::shutdown(); } std::shared_ptr PeriodicRunnerImpl::createAndAddJob( PeriodicJob job) { auto impl = std::make_shared(std::move(job), this->_clockSource, this->_svc); stdx::lock_guard lk(_mutex); _jobs.push_back(impl); return impl; } std::unique_ptr PeriodicRunnerImpl::makeJob(PeriodicJob job) { auto handle = std::make_unique(createAndAddJob(job)); return std::move(handle); } void PeriodicRunnerImpl::scheduleJob(PeriodicJob job) { auto impl = createAndAddJob(job); stdx::lock_guard lk(_mutex); if (_running) { impl->start(); } } void PeriodicRunnerImpl::startup() { stdx::lock_guard lk(_mutex); if (_running) { return; } _running = true; // schedule any jobs that we have for (auto& job : _jobs) { job->start(); } } void PeriodicRunnerImpl::shutdown() { stdx::lock_guard lk(_mutex); if (_running) { _running = false; for (auto& job : _jobs) { if (job->isAlive()) { job->stop(); } } _jobs.clear(); } } PeriodicRunnerImpl::PeriodicJobImpl::PeriodicJobImpl(PeriodicJob job, ClockSource* source, ServiceContext* svc) : _job(std::move(job)), _clockSource(source), _serviceContext(svc) {} void PeriodicRunnerImpl::PeriodicJobImpl::_run() { _thread = stdx::thread([this] { Client::initThread(_job.name, _serviceContext, nullptr); while (true) { auto start = _clockSource->now(); stdx::unique_lock lk(_mutex); // Wait until it's unpaused or canceled _condvar.wait(lk, [&] { return _execStatus != ExecutionStatus::PAUSED; }); if (_execStatus == ExecutionStatus::CANCELED) { break; } // Unlock while job is running so we can pause/cancel concurrently lk.unlock(); 
_job.job(Client::getCurrent()); lk.lock(); if (_clockSource->waitForConditionUntil(_condvar, lk, start + _job.interval, [&] { return _execStatus == ExecutionStatus::CANCELED; })) { break; } } }); } void PeriodicRunnerImpl::PeriodicJobImpl::start() { { stdx::lock_guard lk(_mutex); invariant(_execStatus == PeriodicJobImpl::ExecutionStatus::NOT_SCHEDULED); _execStatus = PeriodicJobImpl::ExecutionStatus::RUNNING; } _run(); } void PeriodicRunnerImpl::PeriodicJobImpl::pause() { stdx::lock_guard lk(_mutex); invariant(_execStatus == PeriodicJobImpl::ExecutionStatus::RUNNING); _execStatus = PeriodicJobImpl::ExecutionStatus::PAUSED; } void PeriodicRunnerImpl::PeriodicJobImpl::resume() { { stdx::lock_guard lk(_mutex); invariant(_execStatus == PeriodicJobImpl::ExecutionStatus::PAUSED); _execStatus = PeriodicJobImpl::ExecutionStatus::RUNNING; } _condvar.notify_one(); } void PeriodicRunnerImpl::PeriodicJobImpl::stop() { { stdx::lock_guard lk(_mutex); invariant(isAlive()); invariant(_thread.joinable()); _execStatus = PeriodicJobImpl::ExecutionStatus::CANCELED; } invariant(_thread.joinable()); _condvar.notify_one(); _thread.join(); } bool PeriodicRunnerImpl::PeriodicJobImpl::isAlive() { return _execStatus == ExecutionStatus::RUNNING || _execStatus == ExecutionStatus::PAUSED; } namespace { template std::shared_ptr lockAndAssertExists(std::weak_ptr ptr, StringData errMsg) { if (auto p = ptr.lock()) { return p; } else { uasserted(ErrorCodes::InternalError, errMsg); } } constexpr auto kPeriodicJobHandleLifetimeErrMsg = "The PeriodicRunner job for this handle no longer exists"_sd; } // namespace void PeriodicRunnerImpl::PeriodicJobHandleImpl::start() { auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg); job->start(); } void PeriodicRunnerImpl::PeriodicJobHandleImpl::stop() { auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg); job->stop(); } void PeriodicRunnerImpl::PeriodicJobHandleImpl::pause() { auto job = lockAndAssertExists(_jobWeak, 
kPeriodicJobHandleLifetimeErrMsg); job->pause(); } void PeriodicRunnerImpl::PeriodicJobHandleImpl::resume() { auto job = lockAndAssertExists(_jobWeak, kPeriodicJobHandleLifetimeErrMsg); job->resume(); } } // namespace mongo