summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJason Carey <jcarey@argv.me>2018-05-09 16:56:02 -0400
committerJason Carey <jcarey@argv.me>2018-05-10 19:49:37 -0400
commit714b97ba9c7dd8de3351eb811befce6c4b6efd63 (patch)
tree8d0412fe26082fcce784cf39daaa4974fb951b3d
parent9c001939d82c4522a8cb071b4285b1f1718b81cf (diff)
downloadmongo-714b97ba9c7dd8de3351eb811befce6c4b6efd63.tar.gz
SERVER-34923 PeriodicRunnerASIO -> Impl
Replace PeriodicRunnerASIO with PeriodicRunnerImpl, a simpler interface that runs jobs on dedicated threads.
-rw-r--r--src/mongo/db/SConscript2
-rw-r--r--src/mongo/db/db.cpp4
-rw-r--r--src/mongo/db/logical_session_cache_impl.cpp10
-rw-r--r--src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp3
-rw-r--r--src/mongo/db/service_liason_mock.cpp8
-rw-r--r--src/mongo/db/service_liason_mock.h6
-rw-r--r--src/mongo/s/server.cpp4
-rw-r--r--src/mongo/util/SConscript17
-rw-r--r--src/mongo/util/periodic_runner.h18
-rw-r--r--src/mongo/util/periodic_runner_asio.cpp163
-rw-r--r--src/mongo/util/periodic_runner_asio.h126
-rw-r--r--src/mongo/util/periodic_runner_asio_test.cpp270
-rw-r--r--src/mongo/util/periodic_runner_factory.cpp10
-rw-r--r--src/mongo/util/periodic_runner_factory.h4
-rw-r--r--src/mongo/util/periodic_runner_impl.cpp121
-rw-r--r--src/mongo/util/periodic_runner_impl.h81
-rw-r--r--src/mongo/util/periodic_runner_impl_test.cpp234
17 files changed, 483 insertions, 598 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 3821a34d592..921c8f01c9e 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1183,7 +1183,7 @@ envWithAsio.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/executor/async_timer_mock',
- '$BUILD_DIR/mongo/util/periodic_runner_asio',
+ '$BUILD_DIR/mongo/util/periodic_runner_factory',
'kill_sessions',
'service_liason',
],
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index f8d1a583d29..f9b47ed91de 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -594,8 +594,8 @@ ExitCode _initAndListen(int listenPort) {
PeriodicTask::startRunningPeriodicTasks();
// Set up the periodic runner for background job execution
- auto runner = makePeriodicRunner();
- runner->startup().transitional_ignore();
+ auto runner = makePeriodicRunner(serviceContext);
+ runner->startup();
serviceContext->setPeriodicRunner(std::move(runner));
SessionKiller::set(serviceContext,
diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp
index e0a5e8de314..f0f234d97af 100644
--- a/src/mongo/db/logical_session_cache_impl.cpp
+++ b/src/mongo/db/logical_session_cache_impl.cpp
@@ -65,10 +65,12 @@ LogicalSessionCacheImpl::LogicalSessionCacheImpl(
_sessionsColl(std::move(collection)),
_transactionReaper(std::move(transactionReaper)) {
if (!disableLogicalSessionCacheRefresh) {
- _service->scheduleJob(
- {[this](Client* client) { _periodicRefresh(client); }, _refreshInterval});
- _service->scheduleJob(
- {[this](Client* client) { _periodicReap(client); }, _refreshInterval});
+ _service->scheduleJob({"LogicalSessionCacheRefresh",
+ [this](Client* client) { _periodicRefresh(client); },
+ _refreshInterval});
+ _service->scheduleJob({"LogicalSessionCacheReap",
+ [this](Client* client) { _periodicReap(client); },
+ _refreshInterval});
}
_stats.setLastSessionsCollectionJobTimestamp(now());
_stats.setLastTransactionReaperJobTimestamp(now());
diff --git a/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp b/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp
index d6b394f6db1..ea79504ae31 100644
--- a/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp
+++ b/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp
@@ -59,6 +59,7 @@ void startPeriodicThreadToAbortExpiredTransactions(ServiceContext* serviceContex
// run until we reach transactionLifetimeLimitSeconds/2, at which point we run the code and
// reset 'seconds'. Etc.
PeriodicRunner::PeriodicJob job(
+ "startPeriodicThreadToAbortExpiredTransactions",
[](Client* client) {
try {
static int seconds = 0;
@@ -78,7 +79,7 @@ void startPeriodicThreadToAbortExpiredTransactions(ServiceContext* serviceContex
seconds = 0;
// The opCtx destructor handles unsetting itself from the Client.
- // (The PeriodicRunnerASIO's Client must be reset before returning.)
+ // (The PeriodicRunner's Client must be reset before returning.)
auto opCtx = client->makeOperationContext();
killAllExpiredTransactions(opCtx.get());
diff --git a/src/mongo/db/service_liason_mock.cpp b/src/mongo/db/service_liason_mock.cpp
index 57649dc704d..fbc09f88784 100644
--- a/src/mongo/db/service_liason_mock.cpp
+++ b/src/mongo/db/service_liason_mock.cpp
@@ -32,14 +32,14 @@
#include "mongo/db/service_liason_mock.h"
#include "mongo/stdx/memory.h"
+#include "mongo/util/periodic_runner_factory.h"
namespace mongo {
MockServiceLiasonImpl::MockServiceLiasonImpl() {
- auto timerFactory = stdx::make_unique<executor::AsyncTimerFactoryMock>();
- _timerFactory = timerFactory.get();
- _runner = stdx::make_unique<PeriodicRunnerASIO>(std::move(timerFactory));
- _runner->startup().transitional_ignore();
+ _timerFactory = stdx::make_unique<executor::AsyncTimerFactoryMock>();
+ _runner = makePeriodicRunner(getGlobalServiceContext());
+ _runner->startup();
}
LogicalSessionIdSet MockServiceLiasonImpl::getActiveOpSessions() const {
diff --git a/src/mongo/db/service_liason_mock.h b/src/mongo/db/service_liason_mock.h
index 19a7d8c321f..66eea52d09f 100644
--- a/src/mongo/db/service_liason_mock.h
+++ b/src/mongo/db/service_liason_mock.h
@@ -35,7 +35,7 @@
#include "mongo/platform/atomic_word.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
-#include "mongo/util/periodic_runner_asio.h"
+#include "mongo/util/periodic_runner.h"
#include "mongo/util/time_support.h"
namespace mongo {
@@ -82,8 +82,8 @@ public:
const SessionKiller::Matcher& matcher);
private:
- executor::AsyncTimerFactoryMock* _timerFactory;
- std::unique_ptr<PeriodicRunnerASIO> _runner;
+ std::unique_ptr<executor::AsyncTimerFactoryMock> _timerFactory;
+ std::unique_ptr<PeriodicRunner> _runner;
boost::optional<SessionKiller::Matcher> _matcher;
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index 8e0db8b8e21..3c68ef51462 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -434,8 +434,8 @@ ExitCode runMongosServer(ServiceContext* serviceContext) {
PeriodicTask::startRunningPeriodicTasks();
// Set up the periodic runner for background job execution
- auto runner = makePeriodicRunner();
- runner->startup().transitional_ignore();
+ auto runner = makePeriodicRunner(serviceContext);
+ runner->startup();
serviceContext->setPeriodicRunner(std::move(runner));
SessionKiller::set(serviceContext,
diff --git a/src/mongo/util/SConscript b/src/mongo/util/SConscript
index 43e58dd2dce..10363c867d0 100644
--- a/src/mongo/util/SConscript
+++ b/src/mongo/util/SConscript
@@ -257,28 +257,26 @@ env.Library(
)
env.Library(
- target="periodic_runner_asio",
+ target="periodic_runner_impl",
source=[
- "periodic_runner_asio.cpp",
+ "periodic_runner_impl.cpp",
],
LIBDEPS=[
"$BUILD_DIR/mongo/base",
"$BUILD_DIR/mongo/db/service_context",
- "$BUILD_DIR/third_party/shim_asio",
- "$BUILD_DIR/mongo/executor/async_timer_asio",
"periodic_runner",
],
)
env.CppUnitTest(
- target="periodic_runner_asio_test",
+ target="periodic_runner_impl_test",
source=[
- "periodic_runner_asio_test.cpp",
+ "periodic_runner_impl_test.cpp",
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/service_context_noop_init',
- "$BUILD_DIR/mongo/executor/async_timer_mock",
- "periodic_runner_asio",
+ "clock_source_mock",
+ "periodic_runner_impl",
],
)
@@ -288,8 +286,9 @@ env.Library(
'periodic_runner_factory.cpp',
],
LIBDEPS=[
+ "$BUILD_DIR/mongo/db/service_context",
'periodic_runner',
- 'periodic_runner_asio',
+ 'periodic_runner_impl',
],
)
diff --git a/src/mongo/util/periodic_runner.h b/src/mongo/util/periodic_runner.h
index 6b80226aa55..710b812bb73 100644
--- a/src/mongo/util/periodic_runner.h
+++ b/src/mongo/util/periodic_runner.h
@@ -28,6 +28,8 @@
#pragma once
+#include <string>
+
#include "mongo/base/disallow_copying.h"
#include "mongo/stdx/functional.h"
#include "mongo/util/time_support.h"
@@ -51,8 +53,13 @@ public:
using Job = stdx::function<void(Client* client)>;
struct PeriodicJob {
- PeriodicJob(Job callable, Milliseconds period)
- : job(std::move(callable)), interval(period) {}
+ PeriodicJob(std::string name, Job callable, Milliseconds period)
+ : name(std::move(name)), job(std::move(callable)), interval(period) {}
+
+ /**
+ * name of the job
+ */
+ std::string name;
/**
* A task to be run at regular intervals by the runner.
@@ -60,7 +67,7 @@ public:
Job job;
/**
- * An interval at which the job should be run. Defaults to 1 minute.
+ * An interval at which the job should be run.
*/
Milliseconds interval;
};
@@ -79,10 +86,9 @@ public:
* Starts up this periodic runner.
*
* This method may safely be called multiple times, either with or without
- * calls to shutdown() in between, but implementations may choose whether to
- * restart or error on subsequent calls to startup().
+ * calls to shutdown() in between.
*/
- virtual Status startup() = 0;
+ virtual void startup() = 0;
/**
* Shuts down this periodic runner. Stops all jobs from running.
diff --git a/src/mongo/util/periodic_runner_asio.cpp b/src/mongo/util/periodic_runner_asio.cpp
deleted file mode 100644
index 9776d50fecd..00000000000
--- a/src/mongo/util/periodic_runner_asio.cpp
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Copyright (C) 2017 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl
-
-#include "mongo/platform/basic.h"
-
-#include <algorithm>
-#include <memory>
-
-#include "mongo/util/periodic_runner_asio.h"
-
-#include "mongo/db/client.h"
-#include "mongo/util/log.h"
-
-namespace mongo {
-
-PeriodicRunnerASIO::PeriodicRunnerASIO(
- std::unique_ptr<executor::AsyncTimerFactoryInterface> timerFactory)
- : _io_service(),
- _strand(_io_service),
- _timerFactory(std::move(timerFactory)),
- _state(State::kReady) {}
-
-PeriodicRunnerASIO::~PeriodicRunnerASIO() {
- // We must call shutdown here to join our background thread.
- shutdown();
-}
-
-void PeriodicRunnerASIO::scheduleJob(PeriodicJob job) {
- // The interval we use here will get written over by _scheduleJob_inlock.
- auto uniqueTimer = _timerFactory->make(&_strand, Milliseconds{0});
- std::shared_ptr<executor::AsyncTimerInterface> timer{std::move(uniqueTimer)};
-
- auto asioJob = std::make_shared<PeriodicJobASIO>(std::move(job), _timerFactory->now(), timer);
-
- {
- stdx::unique_lock<stdx::mutex> lk(_stateMutex);
- _jobs.insert(_jobs.end(), asioJob);
- if (_state == State::kRunning) {
- _scheduleJob(asioJob, true);
- _spawnThreads(lk);
- }
- }
-}
-
-void PeriodicRunnerASIO::_scheduleJob(std::weak_ptr<PeriodicJobASIO> job, bool firstTime) {
- auto lockedJob = job.lock();
- if (!lockedJob) {
- return;
- }
-
- // Adjust the timer to expire at the correct time.
- auto adjustedMS =
- std::max(Milliseconds(0), lockedJob->start + lockedJob->interval - _timerFactory->now());
- lockedJob->timer->expireAfter(adjustedMS);
- lockedJob->timer->asyncWait([this, job, firstTime](std::error_code ec) mutable {
- if (!firstTime) {
- if (ec) {
- severe() << "Encountered an error in PeriodicRunnerASIO: " << ec.message();
- return;
- }
-
- auto lockedJob = job.lock();
- if (!lockedJob) {
- return;
- }
-
- lockedJob->start = _timerFactory->now();
-
- lockedJob->job(Client::getCurrent());
- }
-
- _io_service.post([this, job]() mutable { _scheduleJob(job, false); });
- });
-}
-
-void PeriodicRunnerASIO::_spawnThreads(WithLock) {
- while (_threads.size() < _jobs.size()) {
- _threads.emplace_back([this] {
- try {
- auto client = getGlobalServiceContext()->makeClient("PeriodicRunnerASIO");
- Client::setCurrent(std::move(client));
-
- asio::io_service::work workItem(_io_service);
- std::error_code ec;
- _io_service.run(ec);
-
- client = Client::releaseCurrent();
-
- if (ec) {
- severe() << "Failure in PeriodicRunnerASIO: " << ec.message();
- fassertFailed(40438);
- }
- } catch (...) {
- severe() << "Uncaught exception in PeriodicRunnerASIO: " << exceptionToStatus();
- fassertFailed(40439);
- }
- });
- }
-}
-
-Status PeriodicRunnerASIO::startup() {
- stdx::unique_lock<stdx::mutex> lk(_stateMutex);
- if (_state != State::kReady) {
- return {ErrorCodes::ShutdownInProgress, "startup() already called"};
- }
-
- _state = State::kRunning;
-
- // schedule any jobs that we have
- for (auto& job : _jobs) {
- job->start = _timerFactory->now();
- _scheduleJob(job, true);
- }
-
- _spawnThreads(lk);
-
- return Status::OK();
-}
-
-void PeriodicRunnerASIO::shutdown() {
- stdx::unique_lock<stdx::mutex> lk(_stateMutex);
- if (_state == State::kRunning) {
- _state = State::kComplete;
-
- _io_service.stop();
- _jobs.clear();
-
- lk.unlock();
-
- for (auto& thread : _threads) {
- thread.join();
- }
- }
-}
-
-} // namespace mongo
diff --git a/src/mongo/util/periodic_runner_asio.h b/src/mongo/util/periodic_runner_asio.h
deleted file mode 100644
index abb94dec984..00000000000
--- a/src/mongo/util/periodic_runner_asio.h
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Copyright (C) 2017 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#pragma once
-
-#include <asio.hpp>
-
-#include "mongo/executor/async_timer_interface.h"
-#include "mongo/platform/atomic_word.h"
-#include "mongo/stdx/thread.h"
-#include "mongo/util/concurrency/with_lock.h"
-#include "mongo/util/periodic_runner.h"
-
-namespace mongo {
-
-class Client;
-
-/**
- * A PeriodicRunner implementation that uses the ASIO library's eventing system
- * to schedule and run jobs at regular intervals.
- *
- * This class takes a timer factory so that it may be mocked out for testing.
- *
- * The runner will set up a background thread per job and allow asio to distribute jobs across those
- * threads. Thus, scheduled jobs cannot block each other from running (a long running job can only
- * block itself). Scheduled jobs that require an operation context should use
- * Client::getCurrent()->makeOperationContext() to create one for themselves, and MUST clear it
- * before they return.
- *
- * The threads running internally will use the thread name "PeriodicRunnerASIO" and
- * anything logged from within a scheduled background task will use this thread name.
- * Scheduled tasks may set the thread name to a custom value as they run. However,
- * if they do this, they MUST set the thread name back to its original value before
- * they return.
- */
-class PeriodicRunnerASIO : public PeriodicRunner {
-public:
- using PeriodicRunner::PeriodicJob;
-
- /**
- * Construct a new instance of this class using the provided timer factory.
- */
- explicit PeriodicRunnerASIO(std::unique_ptr<executor::AsyncTimerFactoryInterface> timerFactory);
-
- ~PeriodicRunnerASIO();
-
- /**
- * Schedule a job to be run at periodic intervals.
- */
- void scheduleJob(PeriodicJob job) override;
-
- /**
- * Starts up this periodic runner.
- *
- * This periodic runner will only run once; if it is subsequently started up
- * again, it will return an error.
- */
- Status startup() override;
-
- /**
- * Shut down this periodic runner. Stops all jobs from running. This method
- * may safely be called multiple times, but only the first call will have any effect.
- */
- void shutdown() override;
-
-private:
- struct PeriodicJobASIO {
- explicit PeriodicJobASIO(PeriodicJob callable,
- Date_t startTime,
- std::shared_ptr<executor::AsyncTimerInterface> sharedTimer)
- : job(std::move(callable.job)),
- interval(callable.interval),
- start(startTime),
- timer(sharedTimer) {}
- Job job;
- Milliseconds interval;
- Date_t start;
- std::shared_ptr<executor::AsyncTimerInterface> timer;
- };
-
- // Internally, we will transition through these states
- enum class State { kReady, kRunning, kComplete };
-
- void _scheduleJob(std::weak_ptr<PeriodicJobASIO> job, bool firstTime);
-
- void _spawnThreads(WithLock);
-
- asio::io_service _io_service;
- asio::io_service::strand _strand;
-
- std::vector<stdx::thread> _threads;
-
- std::unique_ptr<executor::AsyncTimerFactoryInterface> _timerFactory;
-
- stdx::mutex _stateMutex;
- State _state;
-
- std::vector<std::shared_ptr<PeriodicJobASIO>> _jobs;
-};
-
-} // namespace mongo
diff --git a/src/mongo/util/periodic_runner_asio_test.cpp b/src/mongo/util/periodic_runner_asio_test.cpp
deleted file mode 100644
index 88885dcd2a7..00000000000
--- a/src/mongo/util/periodic_runner_asio_test.cpp
+++ /dev/null
@@ -1,270 +0,0 @@
-/**
- * Copyright (C) 2017 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/executor/async_timer_interface.h"
-#include "mongo/executor/async_timer_mock.h"
-#include "mongo/stdx/condition_variable.h"
-#include "mongo/stdx/future.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/stdx/mutex.h"
-#include "mongo/unittest/unittest.h"
-#include "mongo/util/periodic_runner_asio.h"
-
-namespace mongo {
-
-class Client;
-
-namespace {
-
-class PeriodicRunnerASIOTestNoSetup : public unittest::Test {
-public:
- void setUp() override {
- auto timerFactory = stdx::make_unique<executor::AsyncTimerFactoryMock>();
- _timerFactory = timerFactory.get();
- _runner = stdx::make_unique<PeriodicRunnerASIO>(std::move(timerFactory));
- }
-
- void tearDown() override {
- _runner->shutdown();
- }
-
- executor::AsyncTimerFactoryMock& timerFactory() {
- return *_timerFactory;
- }
-
- std::unique_ptr<PeriodicRunnerASIO>& runner() {
- return _runner;
- }
-
- void sleepForReschedule(int jobs) {
- while (timerFactory().jobs() < jobs) {
- sleepmillis(2);
- }
- }
-
-protected:
- executor::AsyncTimerFactoryMock* _timerFactory;
- std::unique_ptr<PeriodicRunnerASIO> _runner;
-};
-
-class PeriodicRunnerASIOTest : public PeriodicRunnerASIOTestNoSetup {
-public:
- void setUp() override {
- auto timerFactory = stdx::make_unique<executor::AsyncTimerFactoryMock>();
- _timerFactory = timerFactory.get();
- _runner = stdx::make_unique<PeriodicRunnerASIO>(std::move(timerFactory));
- auto res = _runner->startup();
- ASSERT(res.isOK());
- }
-};
-
-TEST_F(PeriodicRunnerASIOTest, OneJobTest) {
- int count = 0;
- Milliseconds interval{5};
-
- stdx::mutex mutex;
- stdx::condition_variable cv;
-
- // Add a job, ensure that it runs once
- PeriodicRunner::PeriodicJob job(
- [&count, &mutex, &cv](Client*) {
- {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- count++;
- }
- cv.notify_all();
- },
- interval);
-
- runner()->scheduleJob(std::move(job));
-
- // Ensure nothing happens until we fastForward
- {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- ASSERT_EQ(count, 0);
- }
-
- // Fast forward ten times, we should run all ten times.
- for (int i = 0; i < 10; i++) {
- timerFactory().fastForward(interval);
- {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- cv.wait(lk, [&count, &i] { return count > i; });
- }
- sleepForReschedule(2);
- }
-}
-
-TEST_F(PeriodicRunnerASIOTestNoSetup, ScheduleBeforeStartupTest) {
- int count = 0;
- Milliseconds interval{5};
-
- stdx::mutex mutex;
- stdx::condition_variable cv;
-
- // Schedule a job before startup
- PeriodicRunner::PeriodicJob job(
- [&count, &mutex, &cv](Client*) {
- {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- count++;
- }
- cv.notify_all();
- },
- interval);
-
- runner()->scheduleJob(std::move(job));
-
- // Start the runner, job should still run
- ASSERT(runner()->startup().isOK());
-
- timerFactory().fastForward(interval);
-
- stdx::unique_lock<stdx::mutex> lk(mutex);
- cv.wait(lk, [&count] { return count > 0; });
-}
-
-TEST_F(PeriodicRunnerASIOTest, ScheduleAfterShutdownTest) {
- int count = 0;
- Milliseconds interval{5};
-
- // Schedule a job before shutdown
- PeriodicRunner::PeriodicJob job([&count](Client*) { count++; }, interval);
-
- runner()->scheduleJob(std::move(job));
-
- // Shut down before the job runs
- runner()->shutdown();
-
- // Even once we fast forward, job should not get run
- timerFactory().fastForward(interval);
- sleepmillis(10);
- ASSERT_EQ(count, 0);
-
- // Start up the runner again, this should error and should not run
- auto res = runner()->startup();
- ASSERT(!res.isOK());
-
- timerFactory().fastForward(interval);
- sleepmillis(10);
- ASSERT_EQ(count, 0);
-}
-
-TEST_F(PeriodicRunnerASIOTest, TwoJobsTest) {
- int countA = 0;
- int countB = 0;
- Milliseconds intervalA{5};
- Milliseconds intervalB{10};
-
- stdx::mutex mutex;
- stdx::condition_variable cv;
-
- // Add two jobs, ensure they both run the proper number of times
- PeriodicRunner::PeriodicJob jobA(
- [&countA, &mutex, &cv](Client*) {
- {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- countA++;
- }
- cv.notify_all();
- },
- intervalA);
-
- PeriodicRunner::PeriodicJob jobB(
- [&countB, &mutex, &cv](Client*) {
- {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- countB++;
- }
- cv.notify_all();
- },
- intervalB);
-
- runner()->scheduleJob(std::move(jobA));
- runner()->scheduleJob(std::move(jobB));
-
- // Fast forward and wait for both jobs to run the right number of times
- for (int i = 0; i <= 10; i++) {
- timerFactory().fastForward(intervalA);
- {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- cv.wait(lk, [&countA, &countB, &i] { return (countA > i && countB >= i / 2); });
- }
- sleepForReschedule(3);
- }
-}
-
-TEST_F(PeriodicRunnerASIOTest, TwoJobsDontDeadlock) {
- stdx::mutex mutex;
- stdx::condition_variable cv;
- stdx::condition_variable doneCv;
- bool a = false;
- bool b = false;
-
- PeriodicRunner::PeriodicJob jobA(
- [&](Client*) {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- a = true;
-
- cv.notify_one();
- cv.wait(lk, [&] { return b; });
- doneCv.notify_one();
- },
- Milliseconds(1));
-
- PeriodicRunner::PeriodicJob jobB(
- [&](Client*) {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- b = true;
-
- cv.notify_one();
- cv.wait(lk, [&] { return a; });
- doneCv.notify_one();
- },
- Milliseconds(1));
-
- runner()->scheduleJob(std::move(jobA));
- runner()->scheduleJob(std::move(jobB));
-
- timerFactory().fastForward(Milliseconds(1));
-
- {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- doneCv.wait(lk, [&] { return a && b; });
-
- ASSERT(a);
- ASSERT(b);
- }
-
- tearDown();
-}
-
-} // namespace
-} // namespace mongo
diff --git a/src/mongo/util/periodic_runner_factory.cpp b/src/mongo/util/periodic_runner_factory.cpp
index d859ca844fd..370d7156892 100644
--- a/src/mongo/util/periodic_runner_factory.cpp
+++ b/src/mongo/util/periodic_runner_factory.cpp
@@ -30,15 +30,13 @@
#include "mongo/util/periodic_runner_factory.h"
-#include "mongo/executor/async_timer_asio.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/util/periodic_runner_asio.h"
+#include "mongo/db/service_context.h"
+#include "mongo/util/periodic_runner_impl.h"
namespace mongo {
-std::unique_ptr<PeriodicRunner> makePeriodicRunner() {
- return stdx::make_unique<PeriodicRunnerASIO>(
- stdx::make_unique<executor::AsyncTimerFactoryASIO>());
+std::unique_ptr<PeriodicRunner> makePeriodicRunner(ServiceContext* svc) {
+ return std::make_unique<PeriodicRunnerImpl>(svc, svc->getPreciseClockSource());
}
} // namespace
diff --git a/src/mongo/util/periodic_runner_factory.h b/src/mongo/util/periodic_runner_factory.h
index ecbf30fc52a..babd797dd11 100644
--- a/src/mongo/util/periodic_runner_factory.h
+++ b/src/mongo/util/periodic_runner_factory.h
@@ -34,9 +34,11 @@
namespace mongo {
+class ServiceContext;
+
/**
* Returns a new PeriodicRunner.
*/
-std::unique_ptr<PeriodicRunner> makePeriodicRunner();
+std::unique_ptr<PeriodicRunner> makePeriodicRunner(ServiceContext* svc);
} // namespace mongo
diff --git a/src/mongo/util/periodic_runner_impl.cpp b/src/mongo/util/periodic_runner_impl.cpp
new file mode 100644
index 00000000000..03b0f67b40b
--- /dev/null
+++ b/src/mongo/util/periodic_runner_impl.cpp
@@ -0,0 +1,121 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/util/periodic_runner_impl.h"
+
+#include "mongo/db/client.h"
+#include "mongo/db/service_context.h"
+#include "mongo/util/clock_source.h"
+#include "mongo/util/scopeguard.h"
+
+namespace mongo {
+
+PeriodicRunnerImpl::PeriodicRunnerImpl(ServiceContext* svc, ClockSource* clockSource)
+ : _svc(svc), _clockSource(clockSource) {}
+
+PeriodicRunnerImpl::~PeriodicRunnerImpl() {
+ shutdown();
+}
+
+void PeriodicRunnerImpl::scheduleJob(PeriodicJob job) {
+ auto impl = std::make_shared<PeriodicJobImpl>(std::move(job), this);
+
+ {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _jobs.push_back(impl);
+ if (_running) {
+ impl->run();
+ }
+ }
+}
+
+void PeriodicRunnerImpl::startup() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ if (_running) {
+ return;
+ }
+
+ _running = true;
+
+ // schedule any jobs that we have
+ for (auto job : _jobs) {
+ job->run();
+ }
+}
+
+void PeriodicRunnerImpl::shutdown() {
+ std::vector<stdx::thread> threads;
+ const auto guard = MakeGuard([&] {
+ for (auto& thread : threads) {
+ thread.join();
+ }
+ });
+
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (_running) {
+ _running = false;
+
+ for (auto&& job : _jobs) {
+ threads.push_back(std::move(job->thread));
+ }
+
+ _jobs.clear();
+
+ _condvar.notify_all();
+ }
+ }
+}
+
+PeriodicRunnerImpl::PeriodicJobImpl::PeriodicJobImpl(PeriodicJob job, PeriodicRunnerImpl* parent)
+ : job(std::move(job)), parent(parent) {}
+
+void PeriodicRunnerImpl::PeriodicJobImpl::run() {
+ thread = stdx::thread([ this, anchor = shared_from_this() ] {
+ Client::initThread(job.name, parent->_svc, nullptr);
+
+ while (true) {
+ auto start = parent->_clockSource->now();
+
+ job.job(Client::getCurrent());
+
+ stdx::unique_lock<stdx::mutex> lk(parent->_mutex);
+ if (parent->_clockSource->waitForConditionUntil(
+ parent->_condvar, lk, start + job.interval, [&] {
+ return !parent->_running;
+ })) {
+ break;
+ }
+ }
+ });
+}
+
+} // namespace mongo
diff --git a/src/mongo/util/periodic_runner_impl.h b/src/mongo/util/periodic_runner_impl.h
new file mode 100644
index 00000000000..555f12f2b7f
--- /dev/null
+++ b/src/mongo/util/periodic_runner_impl.h
@@ -0,0 +1,81 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/util/clock_source.h"
+#include "mongo/util/periodic_runner.h"
+
+namespace mongo {
+
+class Client;
+class ServiceContext;
+
+/**
+ * An implementation of the PeriodicRunner which uses a thread per job and condvar waits on those
+ * threads to independently sleep.
+ */
+class PeriodicRunnerImpl : public PeriodicRunner {
+public:
+ PeriodicRunnerImpl(ServiceContext* svc, ClockSource* clockSource);
+ ~PeriodicRunnerImpl();
+
+ void scheduleJob(PeriodicJob job) override;
+
+ void startup() override;
+
+ void shutdown() override;
+
+private:
+ struct PeriodicJobImpl : public std::enable_shared_from_this<PeriodicJobImpl> {
+ PeriodicJobImpl(PeriodicJob job, PeriodicRunnerImpl* parent);
+
+ void run();
+
+ PeriodicJob job;
+ PeriodicRunnerImpl* parent;
+ stdx::thread thread;
+ };
+
+ ServiceContext* _svc;
+ ClockSource* _clockSource;
+
+ std::vector<std::shared_ptr<PeriodicJobImpl>> _jobs;
+
+ stdx::mutex _mutex;
+ stdx::condition_variable _condvar;
+ bool _running = false;
+};
+
+} // namespace mongo
diff --git a/src/mongo/util/periodic_runner_impl_test.cpp b/src/mongo/util/periodic_runner_impl_test.cpp
new file mode 100644
index 00000000000..210871ea4db
--- /dev/null
+++ b/src/mongo/util/periodic_runner_impl_test.cpp
@@ -0,0 +1,234 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/unittest/unittest.h"
+
+#include "mongo/util/periodic_runner_impl.h"
+
+#include "mongo/db/service_context_noop.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/util/clock_source_mock.h"
+
+namespace mongo {
+
+class Client;
+
+namespace {
+
+class PeriodicRunnerImplTestNoSetup : public unittest::Test {
+public:
+ void setUp() override {
+ _clockSource = std::make_unique<ClockSourceMock>();
+ _svc = stdx::make_unique<ServiceContextNoop>();
+ _runner = stdx::make_unique<PeriodicRunnerImpl>(_svc.get(), _clockSource.get());
+ }
+
+ void tearDown() override {
+ _runner->shutdown();
+ }
+
+ ClockSourceMock& clockSource() {
+ return *_clockSource;
+ }
+
+ PeriodicRunner& runner() {
+ return *_runner;
+ }
+
+private:
+ std::unique_ptr<ServiceContext> _svc;
+ std::unique_ptr<ClockSourceMock> _clockSource;
+ std::unique_ptr<PeriodicRunner> _runner;
+};
+
+class PeriodicRunnerImplTest : public PeriodicRunnerImplTestNoSetup {
+public:
+ void setUp() override {
+ PeriodicRunnerImplTestNoSetup::setUp();
+ runner().startup();
+ }
+};
+
+TEST_F(PeriodicRunnerImplTest, OneJobTest) {
+ int count = 0;
+ Milliseconds interval{5};
+
+ stdx::mutex mutex;
+ stdx::condition_variable cv;
+
+ // Add a job, ensure that it runs once
+ PeriodicRunner::PeriodicJob job("job",
+ [&count, &mutex, &cv](Client*) {
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ count++;
+ }
+ cv.notify_all();
+ },
+ interval);
+
+ runner().scheduleJob(std::move(job));
+
+ // Ensure nothing happens until we fastForward
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ ASSERT_EQ(count, 0);
+ }
+
+ // Fast forward ten times, we should run all ten times.
+ for (int i = 0; i < 10; i++) {
+ clockSource().advance(interval);
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ cv.wait(lk, [&count, &i] { return count > i; });
+ }
+ }
+}
+
+TEST_F(PeriodicRunnerImplTestNoSetup, ScheduleBeforeStartupTest) {
+ int count = 0;
+ Milliseconds interval{5};
+
+ stdx::mutex mutex;
+ stdx::condition_variable cv;
+
+ // Schedule a job before startup
+ PeriodicRunner::PeriodicJob job("job",
+ [&count, &mutex, &cv](Client*) {
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ count++;
+ }
+ cv.notify_all();
+ },
+ interval);
+
+ runner().scheduleJob(std::move(job));
+
+ // Start the runner, job should still run
+ runner().startup();
+
+ clockSource().advance(interval);
+
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ cv.wait(lk, [&count] { return count > 0; });
+}
+
+TEST_F(PeriodicRunnerImplTest, TwoJobsTest) {
+ int countA = 0;
+ int countB = 0;
+ Milliseconds intervalA{5};
+ Milliseconds intervalB{10};
+
+ stdx::mutex mutex;
+ stdx::condition_variable cv;
+
+ // Add two jobs, ensure they both run the proper number of times
+ PeriodicRunner::PeriodicJob jobA("job",
+ [&countA, &mutex, &cv](Client*) {
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ countA++;
+ }
+ cv.notify_all();
+ },
+ intervalA);
+
+ PeriodicRunner::PeriodicJob jobB("job",
+ [&countB, &mutex, &cv](Client*) {
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ countB++;
+ }
+ cv.notify_all();
+ },
+ intervalB);
+
+ runner().scheduleJob(std::move(jobA));
+ runner().scheduleJob(std::move(jobB));
+
+ // Fast forward and wait for both jobs to run the right number of times
+ for (int i = 0; i <= 10; i++) {
+ clockSource().advance(intervalA);
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ cv.wait(lk, [&countA, &countB, &i] { return (countA > i && countB >= i / 2); });
+ }
+ }
+}
+
+TEST_F(PeriodicRunnerImplTest, TwoJobsDontDeadlock) {
+ stdx::mutex mutex;
+ stdx::condition_variable cv;
+ stdx::condition_variable doneCv;
+ bool a = false;
+ bool b = false;
+
+ PeriodicRunner::PeriodicJob jobA("job",
+ [&](Client*) {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ a = true;
+
+ cv.notify_one();
+ cv.wait(lk, [&] { return b; });
+ doneCv.notify_one();
+ },
+ Milliseconds(1));
+
+ PeriodicRunner::PeriodicJob jobB("job",
+ [&](Client*) {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ b = true;
+
+ cv.notify_one();
+ cv.wait(lk, [&] { return a; });
+ doneCv.notify_one();
+ },
+ Milliseconds(1));
+
+ runner().scheduleJob(std::move(jobA));
+ runner().scheduleJob(std::move(jobB));
+
+ clockSource().advance(Milliseconds(1));
+
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ doneCv.wait(lk, [&] { return a && b; });
+
+ ASSERT(a);
+ ASSERT(b);
+ }
+
+ tearDown();
+}
+
+} // namespace
+} // namespace mongo