diff options
author | samantharitter <samantha.ritter@10gen.com> | 2017-05-19 11:40:26 -0400 |
---|---|---|
committer | samantharitter <samantha.ritter@10gen.com> | 2017-06-01 11:54:52 -0400 |
commit | aa6b880061964e9456cbc7f278a9bf1117d7c815 (patch) | |
tree | a8be0de989038888a0cba4f3d83e91af5efd060d | |
parent | cd2e5f07df13612b68deea94b82b001fca119373 (diff) | |
download | mongo-aa6b880061964e9456cbc7f278a9bf1117d7c815.tar.gz |
SERVER-29283 Fix circular shared_ptr dependency in the periodic runner
-rw-r--r-- | src/mongo/util/periodic_runner.h | 15 | ||||
-rw-r--r-- | src/mongo/util/periodic_runner_asio.cpp | 107 | ||||
-rw-r--r-- | src/mongo/util/periodic_runner_asio.h | 31 | ||||
-rw-r--r-- | src/mongo/util/periodic_runner_asio_test.cpp | 62 |
4 files changed, 153 insertions, 62 deletions
diff --git a/src/mongo/util/periodic_runner.h b/src/mongo/util/periodic_runner.h index c8b2d1e702a..eaf5170b5cd 100644 --- a/src/mongo/util/periodic_runner.h +++ b/src/mongo/util/periodic_runner.h @@ -63,16 +63,27 @@ public: /** * Schedules a job to be run at periodic intervals. + * + * If the runner is not running when a job is scheduled, that job should + * be saved so that it may run in the future once startup() is called. */ - virtual Status scheduleJob(PeriodicJob job) = 0; + virtual void scheduleJob(PeriodicJob job) = 0; /** * Starts up this periodic runner. + * + * This method may safely be called multiple times, either with or without + * calls to shutdown() in between, but implementations may choose whether to + * restart or error on subsequent calls to startup(). */ - virtual void startup() = 0; + virtual Status startup() = 0; /** * Shuts down this periodic runner. Stops all jobs from running. + * + * This method may safely be called multiple times, either with or without + * calls to startup() in between. Any jobs that have been scheduled on this + * runner should no longer execute once shutdown() is called. */ virtual void shutdown() = 0; }; diff --git a/src/mongo/util/periodic_runner_asio.cpp b/src/mongo/util/periodic_runner_asio.cpp index e5261a8a19f..1fd59ede31c 100644 --- a/src/mongo/util/periodic_runner_asio.cpp +++ b/src/mongo/util/periodic_runner_asio.cpp @@ -43,83 +43,100 @@ PeriodicRunnerASIO::PeriodicRunnerASIO( : _io_service(), _strand(_io_service), _timerFactory(std::move(timerFactory)), - _running(false) {} + _state(State::kReady) {} PeriodicRunnerASIO::~PeriodicRunnerASIO() { // We must call shutdown here to join our background thread. shutdown(); } -Status PeriodicRunnerASIO::scheduleJob(PeriodicJob job) { - { - stdx::unique_lock<stdx::mutex> lk(_runningMutex); - if (!_running) { - return {ErrorCodes::ShutdownInProgress, "The runner has been shut down."}; - } - } - +void PeriodicRunnerASIO::scheduleJob(PeriodicJob job) { // The interval we use here will get written over by _scheduleJob_inlock. auto uniqueTimer = _timerFactory->make(&_strand, Milliseconds{0}); std::shared_ptr<executor::AsyncTimerInterface> timer{std::move(uniqueTimer)}; - PeriodicJobASIO asioJob(std::move(job), _timerFactory->now()); - - _scheduleJob(std::move(asioJob), std::move(timer)); + auto asioJob = std::make_shared<PeriodicJobASIO>(std::move(job), _timerFactory->now(), timer); + _jobs.insert(_jobs.end(), asioJob); - return Status::OK(); + { + stdx::unique_lock<stdx::mutex> lk(_stateMutex); + if (_state == State::kRunning) { + _scheduleJob(asioJob); + } + } } -void PeriodicRunnerASIO::_scheduleJob(PeriodicJobASIO job, - std::shared_ptr<executor::AsyncTimerInterface> timer) { +void PeriodicRunnerASIO::_scheduleJob(std::weak_ptr<PeriodicJobASIO> job) { + auto lockedJob = job.lock(); + if (!lockedJob) { + return; + } + // Adjust the timer to expire at the correct time. - auto adjustedMS = job.start + job.interval - _timerFactory->now(); - timer->expireAfter(adjustedMS); - timer->asyncWait([ timer, this, job = std::move(job) ](std::error_code ec) mutable { + auto adjustedMS = lockedJob->start + lockedJob->interval - _timerFactory->now(); + lockedJob->timer->expireAfter(adjustedMS); + lockedJob->timer->asyncWait([this, job](std::error_code ec) mutable { if (ec) { severe() << "Encountered an error in PeriodicRunnerASIO: " << ec.message(); return; } - stdx::unique_lock<stdx::mutex> lk(_runningMutex); - if (!_running) { + stdx::unique_lock<stdx::mutex> lk(_stateMutex); + if (_state != State::kRunning) { return; } - job.start = _timerFactory->now(); - job.job(); + auto lockedJob = job.lock(); + if (!lockedJob) { + return; + } + + lockedJob->start = _timerFactory->now(); + lockedJob->job(); - _io_service.post([ timer, this, job = std::move(job) ]() mutable { - _scheduleJob(std::move(job), timer); - }); + _io_service.post([this, job]() mutable { _scheduleJob(job); }); }); } -void PeriodicRunnerASIO::startup() { - stdx::unique_lock<stdx::mutex> lk(_runningMutex); - if (!_running) { - _running = true; - _thread = stdx::thread([this]() { - try { - asio::io_service::work work(_io_service); - std::error_code ec; - _io_service.run(ec); - if (ec) { - severe() << "Failure in PeriodicRunnerASIO: " << ec.message(); - fassertFailed(40438); - } - } catch (...) { - severe() << "Uncaught exception in PeriodicRunnerASIO: " << exceptionToStatus(); - fassertFailed(40439); +Status PeriodicRunnerASIO::startup() { + stdx::unique_lock<stdx::mutex> lk(_stateMutex); + if (_state != State::kReady) { + return {ErrorCodes::ShutdownInProgress, "startup() already called"}; + } + + _state = State::kRunning; + _thread = stdx::thread([this]() { + try { + asio::io_service::work workItem(_io_service); + std::error_code ec; + _io_service.run(ec); + if (ec) { + severe() << "Failure in PeriodicRunnerASIO: " << ec.message(); + fassertFailed(40438); } - }); + } catch (...) { + severe() << "Uncaught exception in PeriodicRunnerASIO: " << exceptionToStatus(); + fassertFailed(40439); + } + }); + + // schedule any jobs that we have + for (auto& job : _jobs) { + job->start = _timerFactory->now(); + _scheduleJob(job); } + + return Status::OK(); } void PeriodicRunnerASIO::shutdown() { - stdx::unique_lock<stdx::mutex> lk(_runningMutex); - if (_running) { - _running = false; + stdx::unique_lock<stdx::mutex> lk(_stateMutex); + if (_state == State::kRunning) { + _state = State::kComplete; + _io_service.stop(); + + _jobs.clear(); _thread.join(); } } diff --git a/src/mongo/util/periodic_runner_asio.h b/src/mongo/util/periodic_runner_asio.h index bdb4d6b7e40..b35f29bd66f 100644 --- a/src/mongo/util/periodic_runner_asio.h +++ b/src/mongo/util/periodic_runner_asio.h @@ -57,28 +57,41 @@ public: /** * Schedule a job to be run at periodic intervals. */ - Status scheduleJob(PeriodicJob job) override; + void scheduleJob(PeriodicJob job) override; /** * Starts up this periodic runner. + * + * This periodic runner will only run once; if it is subsequently started up + * again, it will return an error. */ - void startup() override; + Status startup() override; /** - * Shut down this periodic runner. Stops all jobs from running. + * Shut down this periodic runner. Stops all jobs from running. This method + * may safely be called multiple times, but only the first call will have any effect. */ void shutdown() override; private: struct PeriodicJobASIO { - explicit PeriodicJobASIO(PeriodicJob callable, Date_t startTime) - : job(std::move(callable.job)), interval(callable.interval), start(startTime) {} + explicit PeriodicJobASIO(PeriodicJob callable, + Date_t startTime, + std::shared_ptr<executor::AsyncTimerInterface> sharedTimer) + : job(std::move(callable.job)), + interval(callable.interval), + start(startTime), + timer(sharedTimer) {} Job job; Milliseconds interval; Date_t start; + std::shared_ptr<executor::AsyncTimerInterface> timer; }; - void _scheduleJob(PeriodicJobASIO job, std::shared_ptr<executor::AsyncTimerInterface> timer); + // Internally, we will transition through these states + enum class State { kReady, kRunning, kComplete }; + + void _scheduleJob(std::weak_ptr<PeriodicJobASIO> job); asio::io_service _io_service; asio::io_service::strand _strand; @@ -87,8 +100,10 @@ private: std::unique_ptr<executor::AsyncTimerFactoryInterface> _timerFactory; - stdx::mutex _runningMutex; - bool _running; + stdx::mutex _stateMutex; + State _state; + + std::vector<std::shared_ptr<PeriodicJobASIO>> _jobs; }; } // namespace mongo diff --git a/src/mongo/util/periodic_runner_asio_test.cpp b/src/mongo/util/periodic_runner_asio_test.cpp index 1502c0a9f1a..4094ecb3dfe 100644 --- a/src/mongo/util/periodic_runner_asio_test.cpp +++ b/src/mongo/util/periodic_runner_asio_test.cpp @@ -40,13 +40,12 @@ namespace mongo { namespace { -class PeriodicRunnerASIOTest : public unittest::Test { +class PeriodicRunnerASIOTestNoSetup : public unittest::Test { public: void setUp() override { auto timerFactory = stdx::make_unique<executor::AsyncTimerFactoryMock>(); _timerFactory = timerFactory.get(); _runner = stdx::make_unique<PeriodicRunnerASIO>(std::move(timerFactory)); - _runner->startup(); } void tearDown() override { @@ -61,11 +60,22 @@ public: return _runner; } -private: +protected: executor::AsyncTimerFactoryMock* _timerFactory; std::unique_ptr<PeriodicRunnerASIO> _runner; }; +class PeriodicRunnerASIOTest : public PeriodicRunnerASIOTestNoSetup { +public: + void setUp() override { + auto timerFactory = stdx::make_unique<executor::AsyncTimerFactoryMock>(); + _timerFactory = timerFactory.get(); + _runner = stdx::make_unique<PeriodicRunnerASIO>(std::move(timerFactory)); + auto res = _runner->startup(); + ASSERT(res.isOK()); + } +}; + TEST_F(PeriodicRunnerASIOTest, OneJobTest) { int count = 0; Milliseconds interval{5}; @@ -84,7 +94,7 @@ TEST_F(PeriodicRunnerASIOTest, OneJobTest) { }, interval); - ASSERT_OK(runner()->scheduleJob(std::move(job))); + runner()->scheduleJob(std::move(job)); // Ensure nothing happens until we fastForward { @@ -102,13 +112,43 @@ TEST_F(PeriodicRunnerASIOTest, OneJobTest) { } } +TEST_F(PeriodicRunnerASIOTestNoSetup, ScheduleBeforeStartupTest) { + int count = 0; + Milliseconds interval{5}; + + stdx::mutex mutex; + stdx::condition_variable cv; + + // Schedule a job before startup + PeriodicRunner::PeriodicJob job( + [&count, &mutex, &cv] { + { + stdx::unique_lock<stdx::mutex> lk(mutex); + count++; + } + cv.notify_all(); + }, + interval); + + runner()->scheduleJob(std::move(job)); + + // Start the runner, job should still run + ASSERT(runner()->startup().isOK()); + + timerFactory().fastForward(interval); + + stdx::unique_lock<stdx::mutex> lk(mutex); + cv.wait(lk, [&count] { return count > 0; }); +} + TEST_F(PeriodicRunnerASIOTest, ScheduleAfterShutdownTest) { int count = 0; Milliseconds interval{5}; // Schedule a job before shutdown PeriodicRunner::PeriodicJob job([&count] { count++; }, interval); - ASSERT_OK(runner()->scheduleJob(std::move(job))); + + runner()->scheduleJob(std::move(job)); // Shut down before the job runs runner()->shutdown(); @@ -117,6 +157,14 @@ TEST_F(PeriodicRunnerASIOTest, ScheduleAfterShutdownTest) { timerFactory().fastForward(interval); sleepmillis(10); ASSERT_EQ(count, 0); + + // Start up the runner again, this should error and should not run + auto res = runner()->startup(); + ASSERT(!res.isOK()); + + timerFactory().fastForward(interval); + sleepmillis(10); + ASSERT_EQ(count, 0); } TEST_F(PeriodicRunnerASIOTest, TwoJobsTest) { @@ -149,8 +197,8 @@ TEST_F(PeriodicRunnerASIOTest, TwoJobsTest) { }, intervalB); - ASSERT_OK(runner()->scheduleJob(std::move(jobA))); - ASSERT_OK(runner()->scheduleJob(std::move(jobB))); + runner()->scheduleJob(std::move(jobA)); + runner()->scheduleJob(std::move(jobB)); // Fast forward and wait for both jobs to run the right number of times for (int i = 0; i <= 10; i++) { |