diff options
author | Jason Carey <jcarey@argv.me> | 2018-05-09 16:56:02 -0400 |
---|---|---|
committer | Jason Carey <jcarey@argv.me> | 2018-05-10 19:49:37 -0400 |
commit | 714b97ba9c7dd8de3351eb811befce6c4b6efd63 (patch) | |
tree | 8d0412fe26082fcce784cf39daaa4974fb951b3d | |
parent | 9c001939d82c4522a8cb071b4285b1f1718b81cf (diff) | |
download | mongo-714b97ba9c7dd8de3351eb811befce6c4b6efd63.tar.gz |
SERVER-34923 PeriodicRunnerASIO -> Impl
Replace PeriodicRunnerASIO with PeriodicRunnerImpl, a simpler interface
that runs jobs on dedicated threads.
-rw-r--r-- | src/mongo/db/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/db.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/logical_session_cache_impl.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/service_liason_mock.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/service_liason_mock.h | 6 | ||||
-rw-r--r-- | src/mongo/s/server.cpp | 4 | ||||
-rw-r--r-- | src/mongo/util/SConscript | 17 | ||||
-rw-r--r-- | src/mongo/util/periodic_runner.h | 18 | ||||
-rw-r--r-- | src/mongo/util/periodic_runner_asio.cpp | 163 | ||||
-rw-r--r-- | src/mongo/util/periodic_runner_asio.h | 126 | ||||
-rw-r--r-- | src/mongo/util/periodic_runner_asio_test.cpp | 270 | ||||
-rw-r--r-- | src/mongo/util/periodic_runner_factory.cpp | 10 | ||||
-rw-r--r-- | src/mongo/util/periodic_runner_factory.h | 4 | ||||
-rw-r--r-- | src/mongo/util/periodic_runner_impl.cpp | 121 | ||||
-rw-r--r-- | src/mongo/util/periodic_runner_impl.h | 81 | ||||
-rw-r--r-- | src/mongo/util/periodic_runner_impl_test.cpp | 234 |
17 files changed, 483 insertions, 598 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 3821a34d592..921c8f01c9e 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1183,7 +1183,7 @@ envWithAsio.Library( ], LIBDEPS=[ '$BUILD_DIR/mongo/executor/async_timer_mock', - '$BUILD_DIR/mongo/util/periodic_runner_asio', + '$BUILD_DIR/mongo/util/periodic_runner_factory', 'kill_sessions', 'service_liason', ], diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index f8d1a583d29..f9b47ed91de 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -594,8 +594,8 @@ ExitCode _initAndListen(int listenPort) { PeriodicTask::startRunningPeriodicTasks(); // Set up the periodic runner for background job execution - auto runner = makePeriodicRunner(); - runner->startup().transitional_ignore(); + auto runner = makePeriodicRunner(serviceContext); + runner->startup(); serviceContext->setPeriodicRunner(std::move(runner)); SessionKiller::set(serviceContext, diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp index e0a5e8de314..f0f234d97af 100644 --- a/src/mongo/db/logical_session_cache_impl.cpp +++ b/src/mongo/db/logical_session_cache_impl.cpp @@ -65,10 +65,12 @@ LogicalSessionCacheImpl::LogicalSessionCacheImpl( _sessionsColl(std::move(collection)), _transactionReaper(std::move(transactionReaper)) { if (!disableLogicalSessionCacheRefresh) { - _service->scheduleJob( - {[this](Client* client) { _periodicRefresh(client); }, _refreshInterval}); - _service->scheduleJob( - {[this](Client* client) { _periodicReap(client); }, _refreshInterval}); + _service->scheduleJob({"LogicalSessionCacheRefresh", + [this](Client* client) { _periodicRefresh(client); }, + _refreshInterval}); + _service->scheduleJob({"LogicalSessionCacheReap", + [this](Client* client) { _periodicReap(client); }, + _refreshInterval}); } _stats.setLastSessionsCollectionJobTimestamp(now()); _stats.setLastTransactionReaperJobTimestamp(now()); diff --git a/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp b/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp index d6b394f6db1..ea79504ae31 100644 --- a/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp +++ b/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp @@ -59,6 +59,7 @@ void startPeriodicThreadToAbortExpiredTransactions(ServiceContext* serviceContex // run until we reach transactionLifetimeLimitSeconds/2, at which point we run the code and // reset 'seconds'. Etc. PeriodicRunner::PeriodicJob job( + "startPeriodicThreadToAbortExpiredTransactions", [](Client* client) { try { static int seconds = 0; @@ -78,7 +79,7 @@ void startPeriodicThreadToAbortExpiredTransactions(ServiceContext* serviceContex seconds = 0; // The opCtx destructor handles unsetting itself from the Client. - // (The PeriodicRunnerASIO's Client must be reset before returning.) + // (The PeriodicRunner's Client must be reset before returning.) auto opCtx = client->makeOperationContext(); killAllExpiredTransactions(opCtx.get()); diff --git a/src/mongo/db/service_liason_mock.cpp b/src/mongo/db/service_liason_mock.cpp index 57649dc704d..fbc09f88784 100644 --- a/src/mongo/db/service_liason_mock.cpp +++ b/src/mongo/db/service_liason_mock.cpp @@ -32,14 +32,14 @@ #include "mongo/db/service_liason_mock.h" #include "mongo/stdx/memory.h" +#include "mongo/util/periodic_runner_factory.h" namespace mongo { MockServiceLiasonImpl::MockServiceLiasonImpl() { - auto timerFactory = stdx::make_unique<executor::AsyncTimerFactoryMock>(); - _timerFactory = timerFactory.get(); - _runner = stdx::make_unique<PeriodicRunnerASIO>(std::move(timerFactory)); - _runner->startup().transitional_ignore(); + _timerFactory = stdx::make_unique<executor::AsyncTimerFactoryMock>(); + _runner = makePeriodicRunner(getGlobalServiceContext()); + _runner->startup(); } LogicalSessionIdSet MockServiceLiasonImpl::getActiveOpSessions() const { diff --git a/src/mongo/db/service_liason_mock.h b/src/mongo/db/service_liason_mock.h index 19a7d8c321f..66eea52d09f 100644 --- a/src/mongo/db/service_liason_mock.h +++ b/src/mongo/db/service_liason_mock.h @@ -35,7 +35,7 @@ #include "mongo/platform/atomic_word.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" -#include "mongo/util/periodic_runner_asio.h" +#include "mongo/util/periodic_runner.h" #include "mongo/util/time_support.h" namespace mongo { @@ -82,8 +82,8 @@ public: const SessionKiller::Matcher& matcher); private: - executor::AsyncTimerFactoryMock* _timerFactory; - std::unique_ptr<PeriodicRunnerASIO> _runner; + std::unique_ptr<executor::AsyncTimerFactoryMock> _timerFactory; + std::unique_ptr<PeriodicRunner> _runner; boost::optional<SessionKiller::Matcher> _matcher; diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index 8e0db8b8e21..3c68ef51462 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -434,8 +434,8 @@ ExitCode runMongosServer(ServiceContext* serviceContext) { PeriodicTask::startRunningPeriodicTasks(); // Set up the periodic runner for background job execution - auto runner = makePeriodicRunner(); - runner->startup().transitional_ignore(); + auto runner = makePeriodicRunner(serviceContext); + runner->startup(); serviceContext->setPeriodicRunner(std::move(runner)); SessionKiller::set(serviceContext, diff --git a/src/mongo/util/SConscript b/src/mongo/util/SConscript index 43e58dd2dce..10363c867d0 100644 --- a/src/mongo/util/SConscript +++ b/src/mongo/util/SConscript @@ -257,28 +257,26 @@ env.Library( ) env.Library( - target="periodic_runner_asio", + target="periodic_runner_impl", source=[ - "periodic_runner_asio.cpp", + "periodic_runner_impl.cpp", ], LIBDEPS=[ "$BUILD_DIR/mongo/base", "$BUILD_DIR/mongo/db/service_context", - "$BUILD_DIR/third_party/shim_asio", - "$BUILD_DIR/mongo/executor/async_timer_asio", "periodic_runner", ], ) env.CppUnitTest( - target="periodic_runner_asio_test", + target="periodic_runner_impl_test", source=[ - "periodic_runner_asio_test.cpp", + "periodic_runner_impl_test.cpp", ], LIBDEPS=[ '$BUILD_DIR/mongo/db/service_context_noop_init', - "$BUILD_DIR/mongo/executor/async_timer_mock", - "periodic_runner_asio", + "clock_source_mock", + "periodic_runner_impl", ], ) @@ -288,8 +286,9 @@ env.Library( 'periodic_runner_factory.cpp', ], LIBDEPS=[ + "$BUILD_DIR/mongo/db/service_context", 'periodic_runner', - 'periodic_runner_asio', + 'periodic_runner_impl', ], ) diff --git a/src/mongo/util/periodic_runner.h b/src/mongo/util/periodic_runner.h index 6b80226aa55..710b812bb73 100644 --- a/src/mongo/util/periodic_runner.h +++ b/src/mongo/util/periodic_runner.h @@ -28,6 +28,8 @@ #pragma once +#include <string> + #include "mongo/base/disallow_copying.h" #include "mongo/stdx/functional.h" #include "mongo/util/time_support.h" @@ -51,8 +53,13 @@ public: using Job = stdx::function<void(Client* client)>; struct PeriodicJob { - PeriodicJob(Job callable, Milliseconds period) - : job(std::move(callable)), interval(period) {} + PeriodicJob(std::string name, Job callable, Milliseconds period) + : name(std::move(name)), job(std::move(callable)), interval(period) {} + + /** + * name of the job + */ + std::string name; /** * A task to be run at regular intervals by the runner. @@ -60,7 +67,7 @@ public: Job job; /** - * An interval at which the job should be run. Defaults to 1 minute. + * An interval at which the job should be run. */ Milliseconds interval; }; @@ -79,10 +86,9 @@ public: * Starts up this periodic runner. * * This method may safely be called multiple times, either with or without - * calls to shutdown() in between, but implementations may choose whether to - * restart or error on subsequent calls to startup(). + * calls to shutdown() in between. */ - virtual Status startup() = 0; + virtual void startup() = 0; /** * Shuts down this periodic runner. Stops all jobs from running. diff --git a/src/mongo/util/periodic_runner_asio.cpp b/src/mongo/util/periodic_runner_asio.cpp deleted file mode 100644 index 9776d50fecd..00000000000 --- a/src/mongo/util/periodic_runner_asio.cpp +++ /dev/null @@ -1,163 +0,0 @@ -/** - * Copyright (C) 2017 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl - -#include "mongo/platform/basic.h" - -#include <algorithm> -#include <memory> - -#include "mongo/util/periodic_runner_asio.h" - -#include "mongo/db/client.h" -#include "mongo/util/log.h" - -namespace mongo { - -PeriodicRunnerASIO::PeriodicRunnerASIO( - std::unique_ptr<executor::AsyncTimerFactoryInterface> timerFactory) - : _io_service(), - _strand(_io_service), - _timerFactory(std::move(timerFactory)), - _state(State::kReady) {} - -PeriodicRunnerASIO::~PeriodicRunnerASIO() { - // We must call shutdown here to join our background thread. - shutdown(); -} - -void PeriodicRunnerASIO::scheduleJob(PeriodicJob job) { - // The interval we use here will get written over by _scheduleJob_inlock. - auto uniqueTimer = _timerFactory->make(&_strand, Milliseconds{0}); - std::shared_ptr<executor::AsyncTimerInterface> timer{std::move(uniqueTimer)}; - - auto asioJob = std::make_shared<PeriodicJobASIO>(std::move(job), _timerFactory->now(), timer); - - { - stdx::unique_lock<stdx::mutex> lk(_stateMutex); - _jobs.insert(_jobs.end(), asioJob); - if (_state == State::kRunning) { - _scheduleJob(asioJob, true); - _spawnThreads(lk); - } - } -} - -void PeriodicRunnerASIO::_scheduleJob(std::weak_ptr<PeriodicJobASIO> job, bool firstTime) { - auto lockedJob = job.lock(); - if (!lockedJob) { - return; - } - - // Adjust the timer to expire at the correct time. - auto adjustedMS = - std::max(Milliseconds(0), lockedJob->start + lockedJob->interval - _timerFactory->now()); - lockedJob->timer->expireAfter(adjustedMS); - lockedJob->timer->asyncWait([this, job, firstTime](std::error_code ec) mutable { - if (!firstTime) { - if (ec) { - severe() << "Encountered an error in PeriodicRunnerASIO: " << ec.message(); - return; - } - - auto lockedJob = job.lock(); - if (!lockedJob) { - return; - } - - lockedJob->start = _timerFactory->now(); - - lockedJob->job(Client::getCurrent()); - } - - _io_service.post([this, job]() mutable { _scheduleJob(job, false); }); - }); -} - -void PeriodicRunnerASIO::_spawnThreads(WithLock) { - while (_threads.size() < _jobs.size()) { - _threads.emplace_back([this] { - try { - auto client = getGlobalServiceContext()->makeClient("PeriodicRunnerASIO"); - Client::setCurrent(std::move(client)); - - asio::io_service::work workItem(_io_service); - std::error_code ec; - _io_service.run(ec); - - client = Client::releaseCurrent(); - - if (ec) { - severe() << "Failure in PeriodicRunnerASIO: " << ec.message(); - fassertFailed(40438); - } - } catch (...) { - severe() << "Uncaught exception in PeriodicRunnerASIO: " << exceptionToStatus(); - fassertFailed(40439); - } - }); - } -} - -Status PeriodicRunnerASIO::startup() { - stdx::unique_lock<stdx::mutex> lk(_stateMutex); - if (_state != State::kReady) { - return {ErrorCodes::ShutdownInProgress, "startup() already called"}; - } - - _state = State::kRunning; - - // schedule any jobs that we have - for (auto& job : _jobs) { - job->start = _timerFactory->now(); - _scheduleJob(job, true); - } - - _spawnThreads(lk); - - return Status::OK(); -} - -void PeriodicRunnerASIO::shutdown() { - stdx::unique_lock<stdx::mutex> lk(_stateMutex); - if (_state == State::kRunning) { - _state = State::kComplete; - - _io_service.stop(); - _jobs.clear(); - - lk.unlock(); - - for (auto& thread : _threads) { - thread.join(); - } - } -} - -} // namespace mongo diff --git a/src/mongo/util/periodic_runner_asio.h b/src/mongo/util/periodic_runner_asio.h deleted file mode 100644 index abb94dec984..00000000000 --- a/src/mongo/util/periodic_runner_asio.h +++ /dev/null @@ -1,126 +0,0 @@ -/** - * Copyright (C) 2017 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#pragma once - -#include <asio.hpp> - -#include "mongo/executor/async_timer_interface.h" -#include "mongo/platform/atomic_word.h" -#include "mongo/stdx/thread.h" -#include "mongo/util/concurrency/with_lock.h" -#include "mongo/util/periodic_runner.h" - -namespace mongo { - -class Client; - -/** - * A PeriodicRunner implementation that uses the ASIO library's eventing system - * to schedule and run jobs at regular intervals. - * - * This class takes a timer factory so that it may be mocked out for testing. - * - * The runner will set up a background thread per job and allow asio to distribute jobs across those - * threads. Thus, scheduled jobs cannot block each other from running (a long running job can only - * block itself). Scheduled jobs that require an operation context should use - * Client::getCurrent()->makeOperationContext() to create one for themselves, and MUST clear it - * before they return. - * - * The threads running internally will use the thread name "PeriodicRunnerASIO" and - * anything logged from within a scheduled background task will use this thread name. - * Scheduled tasks may set the thread name to a custom value as they run. However, - * if they do this, they MUST set the thread name back to its original value before - * they return. - */ -class PeriodicRunnerASIO : public PeriodicRunner { -public: - using PeriodicRunner::PeriodicJob; - - /** - * Construct a new instance of this class using the provided timer factory. - */ - explicit PeriodicRunnerASIO(std::unique_ptr<executor::AsyncTimerFactoryInterface> timerFactory); - - ~PeriodicRunnerASIO(); - - /** - * Schedule a job to be run at periodic intervals. - */ - void scheduleJob(PeriodicJob job) override; - - /** - * Starts up this periodic runner. - * - * This periodic runner will only run once; if it is subsequently started up - * again, it will return an error. - */ - Status startup() override; - - /** - * Shut down this periodic runner. Stops all jobs from running. This method - * may safely be called multiple times, but only the first call will have any effect. - */ - void shutdown() override; - -private: - struct PeriodicJobASIO { - explicit PeriodicJobASIO(PeriodicJob callable, - Date_t startTime, - std::shared_ptr<executor::AsyncTimerInterface> sharedTimer) - : job(std::move(callable.job)), - interval(callable.interval), - start(startTime), - timer(sharedTimer) {} - Job job; - Milliseconds interval; - Date_t start; - std::shared_ptr<executor::AsyncTimerInterface> timer; - }; - - // Internally, we will transition through these states - enum class State { kReady, kRunning, kComplete }; - - void _scheduleJob(std::weak_ptr<PeriodicJobASIO> job, bool firstTime); - - void _spawnThreads(WithLock); - - asio::io_service _io_service; - asio::io_service::strand _strand; - - std::vector<stdx::thread> _threads; - - std::unique_ptr<executor::AsyncTimerFactoryInterface> _timerFactory; - - stdx::mutex _stateMutex; - State _state; - - std::vector<std::shared_ptr<PeriodicJobASIO>> _jobs; -}; - -} // namespace mongo diff --git a/src/mongo/util/periodic_runner_asio_test.cpp b/src/mongo/util/periodic_runner_asio_test.cpp deleted file mode 100644 index 88885dcd2a7..00000000000 --- a/src/mongo/util/periodic_runner_asio_test.cpp +++ /dev/null @@ -1,270 +0,0 @@ -/** - * Copyright (C) 2017 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/executor/async_timer_interface.h" -#include "mongo/executor/async_timer_mock.h" -#include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/future.h" -#include "mongo/stdx/memory.h" -#include "mongo/stdx/mutex.h" -#include "mongo/unittest/unittest.h" -#include "mongo/util/periodic_runner_asio.h" - -namespace mongo { - -class Client; - -namespace { - -class PeriodicRunnerASIOTestNoSetup : public unittest::Test { -public: - void setUp() override { - auto timerFactory = stdx::make_unique<executor::AsyncTimerFactoryMock>(); - _timerFactory = timerFactory.get(); - _runner = stdx::make_unique<PeriodicRunnerASIO>(std::move(timerFactory)); - } - - void tearDown() override { - _runner->shutdown(); - } - - executor::AsyncTimerFactoryMock& timerFactory() { - return *_timerFactory; - } - - std::unique_ptr<PeriodicRunnerASIO>& runner() { - return _runner; - } - - void sleepForReschedule(int jobs) { - while (timerFactory().jobs() < jobs) { - sleepmillis(2); - } - } - -protected: - executor::AsyncTimerFactoryMock* _timerFactory; - std::unique_ptr<PeriodicRunnerASIO> _runner; -}; - -class PeriodicRunnerASIOTest : public PeriodicRunnerASIOTestNoSetup { -public: - void setUp() override { - auto timerFactory = stdx::make_unique<executor::AsyncTimerFactoryMock>(); - _timerFactory = timerFactory.get(); - _runner = stdx::make_unique<PeriodicRunnerASIO>(std::move(timerFactory)); - auto res = _runner->startup(); - ASSERT(res.isOK()); - } -}; - -TEST_F(PeriodicRunnerASIOTest, OneJobTest) { - int count = 0; - Milliseconds interval{5}; - - stdx::mutex mutex; - stdx::condition_variable cv; - - // Add a job, ensure that it runs once - PeriodicRunner::PeriodicJob job( - [&count, &mutex, &cv](Client*) { - { - stdx::unique_lock<stdx::mutex> lk(mutex); - count++; - } - cv.notify_all(); - }, - interval); - - runner()->scheduleJob(std::move(job)); - - // Ensure nothing happens until we fastForward - { - stdx::unique_lock<stdx::mutex> lk(mutex); - ASSERT_EQ(count, 0); - } - - // Fast forward ten times, we should run all ten times. - for (int i = 0; i < 10; i++) { - timerFactory().fastForward(interval); - { - stdx::unique_lock<stdx::mutex> lk(mutex); - cv.wait(lk, [&count, &i] { return count > i; }); - } - sleepForReschedule(2); - } -} - -TEST_F(PeriodicRunnerASIOTestNoSetup, ScheduleBeforeStartupTest) { - int count = 0; - Milliseconds interval{5}; - - stdx::mutex mutex; - stdx::condition_variable cv; - - // Schedule a job before startup - PeriodicRunner::PeriodicJob job( - [&count, &mutex, &cv](Client*) { - { - stdx::unique_lock<stdx::mutex> lk(mutex); - count++; - } - cv.notify_all(); - }, - interval); - - runner()->scheduleJob(std::move(job)); - - // Start the runner, job should still run - ASSERT(runner()->startup().isOK()); - - timerFactory().fastForward(interval); - - stdx::unique_lock<stdx::mutex> lk(mutex); - cv.wait(lk, [&count] { return count > 0; }); -} - -TEST_F(PeriodicRunnerASIOTest, ScheduleAfterShutdownTest) { - int count = 0; - Milliseconds interval{5}; - - // Schedule a job before shutdown - PeriodicRunner::PeriodicJob job([&count](Client*) { count++; }, interval); - - runner()->scheduleJob(std::move(job)); - - // Shut down before the job runs - runner()->shutdown(); - - // Even once we fast forward, job should not get run - timerFactory().fastForward(interval); - sleepmillis(10); - ASSERT_EQ(count, 0); - - // Start up the runner again, this should error and should not run - auto res = runner()->startup(); - ASSERT(!res.isOK()); - - timerFactory().fastForward(interval); - sleepmillis(10); - ASSERT_EQ(count, 0); -} - -TEST_F(PeriodicRunnerASIOTest, TwoJobsTest) { - int countA = 0; - int countB = 0; - Milliseconds intervalA{5}; - Milliseconds intervalB{10}; - - stdx::mutex mutex; - stdx::condition_variable cv; - - // Add two jobs, ensure they both run the proper number of times - PeriodicRunner::PeriodicJob jobA( - [&countA, &mutex, &cv](Client*) { - { - stdx::unique_lock<stdx::mutex> lk(mutex); - countA++; - } - cv.notify_all(); - }, - intervalA); - - PeriodicRunner::PeriodicJob jobB( - [&countB, &mutex, &cv](Client*) { - { - stdx::unique_lock<stdx::mutex> lk(mutex); - countB++; - } - cv.notify_all(); - }, - intervalB); - - runner()->scheduleJob(std::move(jobA)); - runner()->scheduleJob(std::move(jobB)); - - // Fast forward and wait for both jobs to run the right number of times - for (int i = 0; i <= 10; i++) { - timerFactory().fastForward(intervalA); - { - stdx::unique_lock<stdx::mutex> lk(mutex); - cv.wait(lk, [&countA, &countB, &i] { return (countA > i && countB >= i / 2); }); - } - sleepForReschedule(3); - } -} - -TEST_F(PeriodicRunnerASIOTest, TwoJobsDontDeadlock) { - stdx::mutex mutex; - stdx::condition_variable cv; - stdx::condition_variable doneCv; - bool a = false; - bool b = false; - - PeriodicRunner::PeriodicJob jobA( - [&](Client*) { - stdx::unique_lock<stdx::mutex> lk(mutex); - a = true; - - cv.notify_one(); - cv.wait(lk, [&] { return b; }); - doneCv.notify_one(); - }, - Milliseconds(1)); - - PeriodicRunner::PeriodicJob jobB( - [&](Client*) { - stdx::unique_lock<stdx::mutex> lk(mutex); - b = true; - - cv.notify_one(); - cv.wait(lk, [&] { return a; }); - doneCv.notify_one(); - }, - Milliseconds(1)); - - runner()->scheduleJob(std::move(jobA)); - runner()->scheduleJob(std::move(jobB)); - - timerFactory().fastForward(Milliseconds(1)); - - { - stdx::unique_lock<stdx::mutex> lk(mutex); - doneCv.wait(lk, [&] { return a && b; }); - - ASSERT(a); - ASSERT(b); - } - - tearDown(); -} - -} // namespace -} // namespace mongo diff --git a/src/mongo/util/periodic_runner_factory.cpp b/src/mongo/util/periodic_runner_factory.cpp index d859ca844fd..370d7156892 100644 --- a/src/mongo/util/periodic_runner_factory.cpp +++ b/src/mongo/util/periodic_runner_factory.cpp @@ -30,15 +30,13 @@ #include "mongo/util/periodic_runner_factory.h" -#include "mongo/executor/async_timer_asio.h" -#include "mongo/stdx/memory.h" -#include "mongo/util/periodic_runner_asio.h" +#include "mongo/db/service_context.h" +#include "mongo/util/periodic_runner_impl.h" namespace mongo { -std::unique_ptr<PeriodicRunner> makePeriodicRunner() { - return stdx::make_unique<PeriodicRunnerASIO>( - stdx::make_unique<executor::AsyncTimerFactoryASIO>()); +std::unique_ptr<PeriodicRunner> makePeriodicRunner(ServiceContext* svc) { + return std::make_unique<PeriodicRunnerImpl>(svc, svc->getPreciseClockSource()); } } // namespace diff --git a/src/mongo/util/periodic_runner_factory.h b/src/mongo/util/periodic_runner_factory.h index ecbf30fc52a..babd797dd11 100644 --- a/src/mongo/util/periodic_runner_factory.h +++ b/src/mongo/util/periodic_runner_factory.h @@ -34,9 +34,11 @@ namespace mongo { +class ServiceContext; + /** * Returns a new PeriodicRunner. */ -std::unique_ptr<PeriodicRunner> makePeriodicRunner(); +std::unique_ptr<PeriodicRunner> makePeriodicRunner(ServiceContext* svc); } // namespace mongo diff --git a/src/mongo/util/periodic_runner_impl.cpp b/src/mongo/util/periodic_runner_impl.cpp new file mode 100644 index 00000000000..03b0f67b40b --- /dev/null +++ b/src/mongo/util/periodic_runner_impl.cpp @@ -0,0 +1,121 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/util/periodic_runner_impl.h" + +#include "mongo/db/client.h" +#include "mongo/db/service_context.h" +#include "mongo/util/clock_source.h" +#include "mongo/util/scopeguard.h" + +namespace mongo { + +PeriodicRunnerImpl::PeriodicRunnerImpl(ServiceContext* svc, ClockSource* clockSource) + : _svc(svc), _clockSource(clockSource) {} + +PeriodicRunnerImpl::~PeriodicRunnerImpl() { + shutdown(); +} + +void PeriodicRunnerImpl::scheduleJob(PeriodicJob job) { + auto impl = std::make_shared<PeriodicJobImpl>(std::move(job), this); + + { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _jobs.push_back(impl); + if (_running) { + impl->run(); + } + } +} + +void PeriodicRunnerImpl::startup() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + if (_running) { + return; + } + + _running = true; + + // schedule any jobs that we have + for (auto job : _jobs) { + job->run(); + } +} + +void PeriodicRunnerImpl::shutdown() { + std::vector<stdx::thread> threads; + const auto guard = MakeGuard([&] { + for (auto& thread : threads) { + thread.join(); + } + }); + + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (_running) { + _running = false; + + for (auto&& job : _jobs) { + threads.push_back(std::move(job->thread)); + } + + _jobs.clear(); + + _condvar.notify_all(); + } + } +} + +PeriodicRunnerImpl::PeriodicJobImpl::PeriodicJobImpl(PeriodicJob job, PeriodicRunnerImpl* parent) + : job(std::move(job)), parent(parent) {} + +void PeriodicRunnerImpl::PeriodicJobImpl::run() { + thread = stdx::thread([ this, anchor = shared_from_this() ] { + Client::initThread(job.name, parent->_svc, nullptr); + + while (true) { + auto start = parent->_clockSource->now(); + + job.job(Client::getCurrent()); + + stdx::unique_lock<stdx::mutex> lk(parent->_mutex); + if (parent->_clockSource->waitForConditionUntil( + parent->_condvar, lk, start + job.interval, [&] { + return !parent->_running; + })) { + break; + } + } + }); +} + +} // namespace mongo diff --git a/src/mongo/util/periodic_runner_impl.h b/src/mongo/util/periodic_runner_impl.h new file mode 100644 index 00000000000..555f12f2b7f --- /dev/null +++ b/src/mongo/util/periodic_runner_impl.h @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include <memory> +#include <vector> + +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/mutex.h" +#include "mongo/stdx/thread.h" +#include "mongo/util/clock_source.h" +#include "mongo/util/periodic_runner.h" + +namespace mongo { + +class Client; +class ServiceContext; + +/** + * An implementation of the PeriodicRunner which uses a thread per job and condvar waits on those + * threads to independently sleep. + */ +class PeriodicRunnerImpl : public PeriodicRunner { +public: + PeriodicRunnerImpl(ServiceContext* svc, ClockSource* clockSource); + ~PeriodicRunnerImpl(); + + void scheduleJob(PeriodicJob job) override; + + void startup() override; + + void shutdown() override; + +private: + struct PeriodicJobImpl : public std::enable_shared_from_this<PeriodicJobImpl> { + PeriodicJobImpl(PeriodicJob job, PeriodicRunnerImpl* parent); + + void run(); + + PeriodicJob job; + PeriodicRunnerImpl* parent; + stdx::thread thread; + }; + + ServiceContext* _svc; + ClockSource* _clockSource; + + std::vector<std::shared_ptr<PeriodicJobImpl>> _jobs; + + stdx::mutex _mutex; + stdx::condition_variable _condvar; + bool _running = false; +}; + +} // namespace mongo diff --git a/src/mongo/util/periodic_runner_impl_test.cpp b/src/mongo/util/periodic_runner_impl_test.cpp new file mode 100644 index 00000000000..210871ea4db --- /dev/null +++ b/src/mongo/util/periodic_runner_impl_test.cpp @@ -0,0 +1,234 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/unittest/unittest.h" + +#include "mongo/util/periodic_runner_impl.h" + +#include "mongo/db/service_context_noop.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/mutex.h" +#include "mongo/util/clock_source_mock.h" + +namespace mongo { + +class Client; + +namespace { + +class PeriodicRunnerImplTestNoSetup : public unittest::Test { +public: + void setUp() override { + _clockSource = std::make_unique<ClockSourceMock>(); + _svc = stdx::make_unique<ServiceContextNoop>(); + _runner = stdx::make_unique<PeriodicRunnerImpl>(_svc.get(), _clockSource.get()); + } + + void tearDown() override { + _runner->shutdown(); + } + + ClockSourceMock& clockSource() { + return *_clockSource; + } + + PeriodicRunner& runner() { + return *_runner; + } + +private: + std::unique_ptr<ServiceContext> _svc; + std::unique_ptr<ClockSourceMock> _clockSource; + std::unique_ptr<PeriodicRunner> _runner; +}; + +class PeriodicRunnerImplTest : public PeriodicRunnerImplTestNoSetup { +public: + void setUp() override { + PeriodicRunnerImplTestNoSetup::setUp(); + runner().startup(); + } +}; + +TEST_F(PeriodicRunnerImplTest, OneJobTest) { + int count = 0; + Milliseconds interval{5}; + + stdx::mutex mutex; + stdx::condition_variable cv; + + // Add a job, ensure that it runs once + PeriodicRunner::PeriodicJob job("job", + [&count, &mutex, &cv](Client*) { + { + stdx::unique_lock<stdx::mutex> lk(mutex); + count++; + } + cv.notify_all(); + }, + interval); + + runner().scheduleJob(std::move(job)); + + // Ensure nothing happens until we fastForward + { + stdx::unique_lock<stdx::mutex> lk(mutex); + ASSERT_EQ(count, 0); + } + + // Fast forward ten times, we should run all ten times. + for (int i = 0; i < 10; i++) { + clockSource().advance(interval); + { + stdx::unique_lock<stdx::mutex> lk(mutex); + cv.wait(lk, [&count, &i] { return count > i; }); + } + } +} + +TEST_F(PeriodicRunnerImplTestNoSetup, ScheduleBeforeStartupTest) { + int count = 0; + Milliseconds interval{5}; + + stdx::mutex mutex; + stdx::condition_variable cv; + + // Schedule a job before startup + PeriodicRunner::PeriodicJob job("job", + [&count, &mutex, &cv](Client*) { + { + stdx::unique_lock<stdx::mutex> lk(mutex); + count++; + } + cv.notify_all(); + }, + interval); + + runner().scheduleJob(std::move(job)); + + // Start the runner, job should still run + runner().startup(); + + clockSource().advance(interval); + + stdx::unique_lock<stdx::mutex> lk(mutex); + cv.wait(lk, [&count] { return count > 0; }); +} + +TEST_F(PeriodicRunnerImplTest, TwoJobsTest) { + int countA = 0; + int countB = 0; + Milliseconds intervalA{5}; + Milliseconds intervalB{10}; + + stdx::mutex mutex; + stdx::condition_variable cv; + + // Add two jobs, ensure they both run the proper number of times + PeriodicRunner::PeriodicJob jobA("job", + [&countA, &mutex, &cv](Client*) { + { + stdx::unique_lock<stdx::mutex> lk(mutex); + countA++; + } + cv.notify_all(); + }, + intervalA); + + PeriodicRunner::PeriodicJob jobB("job", + [&countB, &mutex, &cv](Client*) { + { + stdx::unique_lock<stdx::mutex> lk(mutex); + countB++; + } + cv.notify_all(); + }, + intervalB); + + runner().scheduleJob(std::move(jobA)); + runner().scheduleJob(std::move(jobB)); + + // Fast forward and wait for both jobs to run the right number of times + for (int i = 0; i <= 10; i++) { + clockSource().advance(intervalA); + { + stdx::unique_lock<stdx::mutex> lk(mutex); + cv.wait(lk, [&countA, &countB, &i] { return (countA > i && countB >= i / 2); }); + } + } +} + +TEST_F(PeriodicRunnerImplTest, TwoJobsDontDeadlock) { + stdx::mutex mutex; + stdx::condition_variable cv; + stdx::condition_variable doneCv; + bool a = false; + bool b = false; + + PeriodicRunner::PeriodicJob jobA("job", + [&](Client*) { + stdx::unique_lock<stdx::mutex> lk(mutex); + a = true; + + cv.notify_one(); + cv.wait(lk, [&] { return b; }); + doneCv.notify_one(); + }, + Milliseconds(1)); + + PeriodicRunner::PeriodicJob jobB("job", + [&](Client*) { + stdx::unique_lock<stdx::mutex> lk(mutex); + b = true; + + cv.notify_one(); + cv.wait(lk, [&] { return a; }); + doneCv.notify_one(); + }, + Milliseconds(1)); + + runner().scheduleJob(std::move(jobA)); + runner().scheduleJob(std::move(jobB)); + + clockSource().advance(Milliseconds(1)); + + { + stdx::unique_lock<stdx::mutex> lk(mutex); + doneCv.wait(lk, [&] { return a && b; }); + + ASSERT(a); + ASSERT(b); + } + + tearDown(); +} + +} // namespace +} // namespace mongo |