diff options
author | samantharitter <samantha.ritter@10gen.com> | 2017-05-08 17:48:48 -0400 |
---|---|---|
committer | samantharitter <samantha.ritter@10gen.com> | 2017-05-15 13:36:13 -0400 |
commit | c31686212e0011909bbe13f8740fe4f45b8117ef (patch) | |
tree | 9ca4ebb8ccc933a83c72615ee588da2d148b9ee8 | |
parent | 71ddab2d041f94f1a5c04c2a218b68a448d08479 (diff) | |
download | mongo-c31686212e0011909bbe13f8740fe4f45b8117ef.tar.gz |
SERVER-29114 Fix PeriodicRunnerASIO and test
-rw-r--r-- | src/mongo/executor/async_timer_mock.cpp | 6 | ||||
-rw-r--r-- | src/mongo/executor/async_timer_mock.h | 2 | ||||
-rw-r--r-- | src/mongo/util/periodic_runner_asio.cpp | 26 | ||||
-rw-r--r-- | src/mongo/util/periodic_runner_asio.h | 8 | ||||
-rw-r--r-- | src/mongo/util/periodic_runner_asio_test.cpp | 74 |
5 files changed, 73 insertions, 43 deletions
diff --git a/src/mongo/executor/async_timer_mock.cpp b/src/mongo/executor/async_timer_mock.cpp index 020991d8fe8..27b09fbb620 100644 --- a/src/mongo/executor/async_timer_mock.cpp +++ b/src/mongo/executor/async_timer_mock.cpp @@ -131,13 +131,13 @@ std::unique_ptr<AsyncTimerInterface> AsyncTimerFactoryMock::make(Milliseconds ex std::unique_ptr<AsyncTimerInterface> AsyncTimerFactoryMock::make(asio::io_service::strand* strand, Milliseconds expiration) { - stdx::lock_guard<stdx::mutex> lk(_timersMutex); + stdx::lock_guard<stdx::recursive_mutex> lk(_timersMutex); auto elem = _timers.emplace(std::make_shared<AsyncTimerMockImpl>(expiration)); return stdx::make_unique<AsyncTimerMock>(*elem.first); } void AsyncTimerFactoryMock::fastForward(Milliseconds time) { - stdx::lock_guard<stdx::mutex> lk(_timersMutex); + stdx::lock_guard<stdx::recursive_mutex> lk(_timersMutex); _curTime += time; @@ -149,7 +149,7 @@ void AsyncTimerFactoryMock::fastForward(Milliseconds time) { } Date_t AsyncTimerFactoryMock::now() { - stdx::lock_guard<stdx::mutex> lk(_timersMutex); + stdx::lock_guard<stdx::recursive_mutex> lk(_timersMutex); return Date_t::fromDurationSinceEpoch(_curTime); } diff --git a/src/mongo/executor/async_timer_mock.h b/src/mongo/executor/async_timer_mock.h index 862d9a93863..c0595b95364 100644 --- a/src/mongo/executor/async_timer_mock.h +++ b/src/mongo/executor/async_timer_mock.h @@ -138,7 +138,7 @@ public: Date_t now() override; private: - stdx::mutex _timersMutex; + stdx::recursive_mutex _timersMutex; stdx::unordered_set<std::shared_ptr<AsyncTimerMockImpl>> _timers; Milliseconds _curTime; }; diff --git a/src/mongo/util/periodic_runner_asio.cpp b/src/mongo/util/periodic_runner_asio.cpp index 7af691e36d2..e5261a8a19f 100644 --- a/src/mongo/util/periodic_runner_asio.cpp +++ b/src/mongo/util/periodic_runner_asio.cpp @@ -51,27 +51,28 @@ PeriodicRunnerASIO::~PeriodicRunnerASIO() { } Status PeriodicRunnerASIO::scheduleJob(PeriodicJob job) { - stdx::unique_lock<stdx::mutex> lk(_runningMutex); - if (!_running) { - return {ErrorCodes::ShutdownInProgress, "The runner has been shut down."}; + { + stdx::unique_lock<stdx::mutex> lk(_runningMutex); + if (!_running) { + return {ErrorCodes::ShutdownInProgress, "The runner has been shut down."}; + } } // The interval we use here will get written over by _scheduleJob_inlock. auto uniqueTimer = _timerFactory->make(&_strand, Milliseconds{0}); std::shared_ptr<executor::AsyncTimerInterface> timer{std::move(uniqueTimer)}; - PeriodicJobASIO asioJob(std::move(job)); + PeriodicJobASIO asioJob(std::move(job), _timerFactory->now()); - _scheduleJob_inlock(std::move(asioJob), std::move(timer), lk); + _scheduleJob(std::move(asioJob), std::move(timer)); return Status::OK(); } -void PeriodicRunnerASIO::_scheduleJob_inlock(PeriodicJobASIO job, - std::shared_ptr<executor::AsyncTimerInterface> timer, - const stdx::unique_lock<stdx::mutex>& lk) { +void PeriodicRunnerASIO::_scheduleJob(PeriodicJobASIO job, + std::shared_ptr<executor::AsyncTimerInterface> timer) { // Adjust the timer to expire at the correct time. - auto adjustedMS = job.start + job.interval - Date_t::now(); + auto adjustedMS = job.start + job.interval - _timerFactory->now(); timer->expireAfter(adjustedMS); timer->asyncWait([ timer, this, job = std::move(job) ](std::error_code ec) mutable { if (ec) { @@ -84,14 +85,11 @@ void PeriodicRunnerASIO::_scheduleJob_inlock(PeriodicJobASIO job, return; } - job.start = Date_t::now(); + job.start = _timerFactory->now(); job.job(); _io_service.post([ timer, this, job = std::move(job) ]() mutable { - stdx::unique_lock<stdx::mutex> lk(_runningMutex); - if (_running) { - _scheduleJob_inlock(std::move(job), timer, lk); - } + _scheduleJob(std::move(job), timer); }); }); } diff --git a/src/mongo/util/periodic_runner_asio.h b/src/mongo/util/periodic_runner_asio.h index 1c41bd3182f..bdb4d6b7e40 100644 --- a/src/mongo/util/periodic_runner_asio.h +++ b/src/mongo/util/periodic_runner_asio.h @@ -71,16 +71,14 @@ public: private: struct PeriodicJobASIO { - explicit PeriodicJobASIO(PeriodicJob callable) - : job(std::move(callable.job)), interval(callable.interval), start(Date_t::now()) {} + explicit PeriodicJobASIO(PeriodicJob callable, Date_t startTime) + : job(std::move(callable.job)), interval(callable.interval), start(startTime) {} Job job; Milliseconds interval; Date_t start; }; - void _scheduleJob_inlock(PeriodicJobASIO job, - std::shared_ptr<executor::AsyncTimerInterface> timer, - const stdx::unique_lock<stdx::mutex>& lk); + void _scheduleJob(PeriodicJobASIO job, std::shared_ptr<executor::AsyncTimerInterface> timer); asio::io_service _io_service; asio::io_service::strand _strand; diff --git a/src/mongo/util/periodic_runner_asio_test.cpp b/src/mongo/util/periodic_runner_asio_test.cpp index 7f6a42fe425..1502c0a9f1a 100644 --- a/src/mongo/util/periodic_runner_asio_test.cpp +++ b/src/mongo/util/periodic_runner_asio_test.cpp @@ -30,7 +30,10 @@ #include "mongo/executor/async_timer_interface.h" #include "mongo/executor/async_timer_mock.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/future.h" #include "mongo/stdx/memory.h" +#include "mongo/stdx/mutex.h" #include "mongo/unittest/unittest.h" #include "mongo/util/periodic_runner_asio.h" @@ -67,26 +70,36 @@ TEST_F(PeriodicRunnerASIOTest, OneJobTest) { int count = 0; Milliseconds interval{5}; + stdx::mutex mutex; + stdx::condition_variable cv; + // Add a job, ensure that it runs once - PeriodicRunner::PeriodicJob job([&count] { count++; }, interval); + PeriodicRunner::PeriodicJob job( + [&count, &mutex, &cv] { + { + stdx::unique_lock<stdx::mutex> lk(mutex); + count++; + } + cv.notify_all(); + }, + interval); + ASSERT_OK(runner()->scheduleJob(std::move(job))); // Ensure nothing happens until we fastForward - ASSERT_EQ(count, 0); - - // Fast forward, we should run once, and only once - timerFactory().fastForward(interval); - sleepmillis(10); - ASSERT_EQ(count, 1); + { + stdx::unique_lock<stdx::mutex> lk(mutex); + ASSERT_EQ(count, 0); + } - // Fast forward again, we should run all ten times. - for (int i = 0; i < 9; i++) { + // Fast forward ten times, we should run all ten times. + for (int i = 0; i < 10; i++) { timerFactory().fastForward(interval); - // Give asio time to execute - sleepmillis(10); + { + stdx::unique_lock<stdx::mutex> lk(mutex); + cv.wait(lk, [&count, &i] { return count > i; }); + } } - - ASSERT_EQ(count, 10); } TEST_F(PeriodicRunnerASIOTest, ScheduleAfterShutdownTest) { @@ -112,20 +125,41 @@ TEST_F(PeriodicRunnerASIOTest, TwoJobsTest) { Milliseconds intervalA{5}; Milliseconds intervalB{10}; + stdx::mutex mutex; + stdx::condition_variable cv; + // Add two jobs, ensure they both run the proper number of times - PeriodicRunner::PeriodicJob jobA([&countA] { countA++; }, intervalA); - PeriodicRunner::PeriodicJob jobB([&countB] { countB++; }, intervalB); + PeriodicRunner::PeriodicJob jobA( + [&countA, &mutex, &cv] { + { + stdx::unique_lock<stdx::mutex> lk(mutex); + countA++; + } + cv.notify_all(); + }, + intervalA); + + PeriodicRunner::PeriodicJob jobB( + [&countB, &mutex, &cv] { + { + stdx::unique_lock<stdx::mutex> lk(mutex); + countB++; + } + cv.notify_all(); + }, + intervalB); ASSERT_OK(runner()->scheduleJob(std::move(jobA))); ASSERT_OK(runner()->scheduleJob(std::move(jobB))); - for (int i = 0; i < 10; i++) { + // Fast forward and wait for both jobs to run the right number of times + for (int i = 0; i <= 10; i++) { timerFactory().fastForward(intervalA); - sleepmillis(10); + { + stdx::unique_lock<stdx::mutex> lk(mutex); + cv.wait(lk, [&countA, &countB, &i] { return (countA > i && countB >= i / 2); }); + } } - - ASSERT_EQ(countA, 10); - ASSERT_EQ(countB, 5); } } // namespace |