summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsamantharitter <samantha.ritter@10gen.com>2017-05-08 17:48:48 -0400
committersamantharitter <samantha.ritter@10gen.com>2017-05-15 13:36:13 -0400
commitc31686212e0011909bbe13f8740fe4f45b8117ef (patch)
tree9ca4ebb8ccc933a83c72615ee588da2d148b9ee8
parent71ddab2d041f94f1a5c04c2a218b68a448d08479 (diff)
downloadmongo-c31686212e0011909bbe13f8740fe4f45b8117ef.tar.gz
SERVER-29114 Fix PeriodicRunnerASIO and test
-rw-r--r--src/mongo/executor/async_timer_mock.cpp6
-rw-r--r--src/mongo/executor/async_timer_mock.h2
-rw-r--r--src/mongo/util/periodic_runner_asio.cpp26
-rw-r--r--src/mongo/util/periodic_runner_asio.h8
-rw-r--r--src/mongo/util/periodic_runner_asio_test.cpp74
5 files changed, 73 insertions, 43 deletions
diff --git a/src/mongo/executor/async_timer_mock.cpp b/src/mongo/executor/async_timer_mock.cpp
index 020991d8fe8..27b09fbb620 100644
--- a/src/mongo/executor/async_timer_mock.cpp
+++ b/src/mongo/executor/async_timer_mock.cpp
@@ -131,13 +131,13 @@ std::unique_ptr<AsyncTimerInterface> AsyncTimerFactoryMock::make(Milliseconds ex
std::unique_ptr<AsyncTimerInterface> AsyncTimerFactoryMock::make(asio::io_service::strand* strand,
Milliseconds expiration) {
- stdx::lock_guard<stdx::mutex> lk(_timersMutex);
+ stdx::lock_guard<stdx::recursive_mutex> lk(_timersMutex);
auto elem = _timers.emplace(std::make_shared<AsyncTimerMockImpl>(expiration));
return stdx::make_unique<AsyncTimerMock>(*elem.first);
}
void AsyncTimerFactoryMock::fastForward(Milliseconds time) {
- stdx::lock_guard<stdx::mutex> lk(_timersMutex);
+ stdx::lock_guard<stdx::recursive_mutex> lk(_timersMutex);
_curTime += time;
@@ -149,7 +149,7 @@ void AsyncTimerFactoryMock::fastForward(Milliseconds time) {
}
Date_t AsyncTimerFactoryMock::now() {
- stdx::lock_guard<stdx::mutex> lk(_timersMutex);
+ stdx::lock_guard<stdx::recursive_mutex> lk(_timersMutex);
return Date_t::fromDurationSinceEpoch(_curTime);
}
diff --git a/src/mongo/executor/async_timer_mock.h b/src/mongo/executor/async_timer_mock.h
index 862d9a93863..c0595b95364 100644
--- a/src/mongo/executor/async_timer_mock.h
+++ b/src/mongo/executor/async_timer_mock.h
@@ -138,7 +138,7 @@ public:
Date_t now() override;
private:
- stdx::mutex _timersMutex;
+ stdx::recursive_mutex _timersMutex;
stdx::unordered_set<std::shared_ptr<AsyncTimerMockImpl>> _timers;
Milliseconds _curTime;
};
diff --git a/src/mongo/util/periodic_runner_asio.cpp b/src/mongo/util/periodic_runner_asio.cpp
index 7af691e36d2..e5261a8a19f 100644
--- a/src/mongo/util/periodic_runner_asio.cpp
+++ b/src/mongo/util/periodic_runner_asio.cpp
@@ -51,27 +51,28 @@ PeriodicRunnerASIO::~PeriodicRunnerASIO() {
}
Status PeriodicRunnerASIO::scheduleJob(PeriodicJob job) {
- stdx::unique_lock<stdx::mutex> lk(_runningMutex);
- if (!_running) {
- return {ErrorCodes::ShutdownInProgress, "The runner has been shut down."};
+ {
+ stdx::unique_lock<stdx::mutex> lk(_runningMutex);
+ if (!_running) {
+ return {ErrorCodes::ShutdownInProgress, "The runner has been shut down."};
+ }
}
// The interval we use here will get written over by _scheduleJob_inlock.
auto uniqueTimer = _timerFactory->make(&_strand, Milliseconds{0});
std::shared_ptr<executor::AsyncTimerInterface> timer{std::move(uniqueTimer)};
- PeriodicJobASIO asioJob(std::move(job));
+ PeriodicJobASIO asioJob(std::move(job), _timerFactory->now());
- _scheduleJob_inlock(std::move(asioJob), std::move(timer), lk);
+ _scheduleJob(std::move(asioJob), std::move(timer));
return Status::OK();
}
-void PeriodicRunnerASIO::_scheduleJob_inlock(PeriodicJobASIO job,
- std::shared_ptr<executor::AsyncTimerInterface> timer,
- const stdx::unique_lock<stdx::mutex>& lk) {
+void PeriodicRunnerASIO::_scheduleJob(PeriodicJobASIO job,
+ std::shared_ptr<executor::AsyncTimerInterface> timer) {
// Adjust the timer to expire at the correct time.
- auto adjustedMS = job.start + job.interval - Date_t::now();
+ auto adjustedMS = job.start + job.interval - _timerFactory->now();
timer->expireAfter(adjustedMS);
timer->asyncWait([ timer, this, job = std::move(job) ](std::error_code ec) mutable {
if (ec) {
@@ -84,14 +85,11 @@ void PeriodicRunnerASIO::_scheduleJob_inlock(PeriodicJobASIO job,
return;
}
- job.start = Date_t::now();
+ job.start = _timerFactory->now();
job.job();
_io_service.post([ timer, this, job = std::move(job) ]() mutable {
- stdx::unique_lock<stdx::mutex> lk(_runningMutex);
- if (_running) {
- _scheduleJob_inlock(std::move(job), timer, lk);
- }
+ _scheduleJob(std::move(job), timer);
});
});
}
diff --git a/src/mongo/util/periodic_runner_asio.h b/src/mongo/util/periodic_runner_asio.h
index 1c41bd3182f..bdb4d6b7e40 100644
--- a/src/mongo/util/periodic_runner_asio.h
+++ b/src/mongo/util/periodic_runner_asio.h
@@ -71,16 +71,14 @@ public:
private:
struct PeriodicJobASIO {
- explicit PeriodicJobASIO(PeriodicJob callable)
- : job(std::move(callable.job)), interval(callable.interval), start(Date_t::now()) {}
+ explicit PeriodicJobASIO(PeriodicJob callable, Date_t startTime)
+ : job(std::move(callable.job)), interval(callable.interval), start(startTime) {}
Job job;
Milliseconds interval;
Date_t start;
};
- void _scheduleJob_inlock(PeriodicJobASIO job,
- std::shared_ptr<executor::AsyncTimerInterface> timer,
- const stdx::unique_lock<stdx::mutex>& lk);
+ void _scheduleJob(PeriodicJobASIO job, std::shared_ptr<executor::AsyncTimerInterface> timer);
asio::io_service _io_service;
asio::io_service::strand _strand;
diff --git a/src/mongo/util/periodic_runner_asio_test.cpp b/src/mongo/util/periodic_runner_asio_test.cpp
index 7f6a42fe425..1502c0a9f1a 100644
--- a/src/mongo/util/periodic_runner_asio_test.cpp
+++ b/src/mongo/util/periodic_runner_asio_test.cpp
@@ -30,7 +30,10 @@
#include "mongo/executor/async_timer_interface.h"
#include "mongo/executor/async_timer_mock.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/future.h"
#include "mongo/stdx/memory.h"
+#include "mongo/stdx/mutex.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/periodic_runner_asio.h"
@@ -67,26 +70,36 @@ TEST_F(PeriodicRunnerASIOTest, OneJobTest) {
int count = 0;
Milliseconds interval{5};
+ stdx::mutex mutex;
+ stdx::condition_variable cv;
+
// Add a job, ensure that it runs once
- PeriodicRunner::PeriodicJob job([&count] { count++; }, interval);
+ PeriodicRunner::PeriodicJob job(
+ [&count, &mutex, &cv] {
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ count++;
+ }
+ cv.notify_all();
+ },
+ interval);
+
ASSERT_OK(runner()->scheduleJob(std::move(job)));
// Ensure nothing happens until we fastForward
- ASSERT_EQ(count, 0);
-
- // Fast forward, we should run once, and only once
- timerFactory().fastForward(interval);
- sleepmillis(10);
- ASSERT_EQ(count, 1);
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ ASSERT_EQ(count, 0);
+ }
- // Fast forward again, we should run all ten times.
- for (int i = 0; i < 9; i++) {
+ // Fast forward ten times, we should run all ten times.
+ for (int i = 0; i < 10; i++) {
timerFactory().fastForward(interval);
- // Give asio time to execute
- sleepmillis(10);
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ cv.wait(lk, [&count, &i] { return count > i; });
+ }
}
-
- ASSERT_EQ(count, 10);
}
TEST_F(PeriodicRunnerASIOTest, ScheduleAfterShutdownTest) {
@@ -112,20 +125,41 @@ TEST_F(PeriodicRunnerASIOTest, TwoJobsTest) {
Milliseconds intervalA{5};
Milliseconds intervalB{10};
+ stdx::mutex mutex;
+ stdx::condition_variable cv;
+
// Add two jobs, ensure they both run the proper number of times
- PeriodicRunner::PeriodicJob jobA([&countA] { countA++; }, intervalA);
- PeriodicRunner::PeriodicJob jobB([&countB] { countB++; }, intervalB);
+ PeriodicRunner::PeriodicJob jobA(
+ [&countA, &mutex, &cv] {
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ countA++;
+ }
+ cv.notify_all();
+ },
+ intervalA);
+
+ PeriodicRunner::PeriodicJob jobB(
+ [&countB, &mutex, &cv] {
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ countB++;
+ }
+ cv.notify_all();
+ },
+ intervalB);
ASSERT_OK(runner()->scheduleJob(std::move(jobA)));
ASSERT_OK(runner()->scheduleJob(std::move(jobB)));
- for (int i = 0; i < 10; i++) {
+ // Fast forward and wait for both jobs to run the right number of times
+ for (int i = 0; i <= 10; i++) {
timerFactory().fastForward(intervalA);
- sleepmillis(10);
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ cv.wait(lk, [&countA, &countB, &i] { return (countA > i && countB >= i / 2); });
+ }
}
-
- ASSERT_EQ(countA, 10);
- ASSERT_EQ(countB, 5);
}
} // namespace