summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsamantharitter <samantha.ritter@10gen.com>2017-05-19 11:40:26 -0400
committersamantharitter <samantha.ritter@10gen.com>2017-06-01 11:54:52 -0400
commitaa6b880061964e9456cbc7f278a9bf1117d7c815 (patch)
treea8be0de989038888a0cba4f3d83e91af5efd060d
parentcd2e5f07df13612b68deea94b82b001fca119373 (diff)
downloadmongo-aa6b880061964e9456cbc7f278a9bf1117d7c815.tar.gz
SERVER-29283 Fix circular shared_ptr dependency in the periodic runner
-rw-r--r--src/mongo/util/periodic_runner.h15
-rw-r--r--src/mongo/util/periodic_runner_asio.cpp107
-rw-r--r--src/mongo/util/periodic_runner_asio.h31
-rw-r--r--src/mongo/util/periodic_runner_asio_test.cpp62
4 files changed, 153 insertions, 62 deletions
diff --git a/src/mongo/util/periodic_runner.h b/src/mongo/util/periodic_runner.h
index c8b2d1e702a..eaf5170b5cd 100644
--- a/src/mongo/util/periodic_runner.h
+++ b/src/mongo/util/periodic_runner.h
@@ -63,16 +63,27 @@ public:
/**
* Schedules a job to be run at periodic intervals.
+ *
+ * If the runner is not running when a job is scheduled, that job should
+ * be saved so that it may run in the future once startup() is called.
*/
- virtual Status scheduleJob(PeriodicJob job) = 0;
+ virtual void scheduleJob(PeriodicJob job) = 0;
/**
* Starts up this periodic runner.
+ *
+ * This method may safely be called multiple times, either with or without
+ * calls to shutdown() in between, but implementations may choose whether to
+ * restart or error on subsequent calls to startup().
*/
- virtual void startup() = 0;
+ virtual Status startup() = 0;
/**
* Shuts down this periodic runner. Stops all jobs from running.
+ *
+ * This method may safely be called multiple times, either with or without
+ * calls to startup() in between. Any jobs that have been scheduled on this
+ * runner should no longer execute once shutdown() is called.
*/
virtual void shutdown() = 0;
};
diff --git a/src/mongo/util/periodic_runner_asio.cpp b/src/mongo/util/periodic_runner_asio.cpp
index e5261a8a19f..1fd59ede31c 100644
--- a/src/mongo/util/periodic_runner_asio.cpp
+++ b/src/mongo/util/periodic_runner_asio.cpp
@@ -43,83 +43,100 @@ PeriodicRunnerASIO::PeriodicRunnerASIO(
: _io_service(),
_strand(_io_service),
_timerFactory(std::move(timerFactory)),
- _running(false) {}
+ _state(State::kReady) {}
PeriodicRunnerASIO::~PeriodicRunnerASIO() {
// We must call shutdown here to join our background thread.
shutdown();
}
-Status PeriodicRunnerASIO::scheduleJob(PeriodicJob job) {
- {
- stdx::unique_lock<stdx::mutex> lk(_runningMutex);
- if (!_running) {
- return {ErrorCodes::ShutdownInProgress, "The runner has been shut down."};
- }
- }
-
+void PeriodicRunnerASIO::scheduleJob(PeriodicJob job) {
// The interval we use here will get written over by _scheduleJob_inlock.
auto uniqueTimer = _timerFactory->make(&_strand, Milliseconds{0});
std::shared_ptr<executor::AsyncTimerInterface> timer{std::move(uniqueTimer)};
- PeriodicJobASIO asioJob(std::move(job), _timerFactory->now());
-
- _scheduleJob(std::move(asioJob), std::move(timer));
+ auto asioJob = std::make_shared<PeriodicJobASIO>(std::move(job), _timerFactory->now(), timer);
+ _jobs.insert(_jobs.end(), asioJob);
- return Status::OK();
+ {
+ stdx::unique_lock<stdx::mutex> lk(_stateMutex);
+ if (_state == State::kRunning) {
+ _scheduleJob(asioJob);
+ }
+ }
}
-void PeriodicRunnerASIO::_scheduleJob(PeriodicJobASIO job,
- std::shared_ptr<executor::AsyncTimerInterface> timer) {
+void PeriodicRunnerASIO::_scheduleJob(std::weak_ptr<PeriodicJobASIO> job) {
+ auto lockedJob = job.lock();
+ if (!lockedJob) {
+ return;
+ }
+
// Adjust the timer to expire at the correct time.
- auto adjustedMS = job.start + job.interval - _timerFactory->now();
- timer->expireAfter(adjustedMS);
- timer->asyncWait([ timer, this, job = std::move(job) ](std::error_code ec) mutable {
+ auto adjustedMS = lockedJob->start + lockedJob->interval - _timerFactory->now();
+ lockedJob->timer->expireAfter(adjustedMS);
+ lockedJob->timer->asyncWait([this, job](std::error_code ec) mutable {
if (ec) {
severe() << "Encountered an error in PeriodicRunnerASIO: " << ec.message();
return;
}
- stdx::unique_lock<stdx::mutex> lk(_runningMutex);
- if (!_running) {
+ stdx::unique_lock<stdx::mutex> lk(_stateMutex);
+ if (_state != State::kRunning) {
return;
}
- job.start = _timerFactory->now();
- job.job();
+ auto lockedJob = job.lock();
+ if (!lockedJob) {
+ return;
+ }
+
+ lockedJob->start = _timerFactory->now();
+ lockedJob->job();
- _io_service.post([ timer, this, job = std::move(job) ]() mutable {
- _scheduleJob(std::move(job), timer);
- });
+ _io_service.post([this, job]() mutable { _scheduleJob(job); });
});
}
-void PeriodicRunnerASIO::startup() {
- stdx::unique_lock<stdx::mutex> lk(_runningMutex);
- if (!_running) {
- _running = true;
- _thread = stdx::thread([this]() {
- try {
- asio::io_service::work work(_io_service);
- std::error_code ec;
- _io_service.run(ec);
- if (ec) {
- severe() << "Failure in PeriodicRunnerASIO: " << ec.message();
- fassertFailed(40438);
- }
- } catch (...) {
- severe() << "Uncaught exception in PeriodicRunnerASIO: " << exceptionToStatus();
- fassertFailed(40439);
+Status PeriodicRunnerASIO::startup() {
+ stdx::unique_lock<stdx::mutex> lk(_stateMutex);
+ if (_state != State::kReady) {
+ return {ErrorCodes::ShutdownInProgress, "startup() already called"};
+ }
+
+ _state = State::kRunning;
+ _thread = stdx::thread([this]() {
+ try {
+ asio::io_service::work workItem(_io_service);
+ std::error_code ec;
+ _io_service.run(ec);
+ if (ec) {
+ severe() << "Failure in PeriodicRunnerASIO: " << ec.message();
+ fassertFailed(40438);
}
- });
+ } catch (...) {
+ severe() << "Uncaught exception in PeriodicRunnerASIO: " << exceptionToStatus();
+ fassertFailed(40439);
+ }
+ });
+
+ // Schedule any jobs that were registered before startup.
+ for (auto& job : _jobs) {
+ job->start = _timerFactory->now();
+ _scheduleJob(job);
}
+
+ return Status::OK();
}
void PeriodicRunnerASIO::shutdown() {
- stdx::unique_lock<stdx::mutex> lk(_runningMutex);
- if (_running) {
- _running = false;
+ stdx::unique_lock<stdx::mutex> lk(_stateMutex);
+ if (_state == State::kRunning) {
+ _state = State::kComplete;
+
_io_service.stop();
+
+ _jobs.clear();
_thread.join();
}
}
diff --git a/src/mongo/util/periodic_runner_asio.h b/src/mongo/util/periodic_runner_asio.h
index bdb4d6b7e40..b35f29bd66f 100644
--- a/src/mongo/util/periodic_runner_asio.h
+++ b/src/mongo/util/periodic_runner_asio.h
@@ -57,28 +57,41 @@ public:
/**
* Schedule a job to be run at periodic intervals.
*/
- Status scheduleJob(PeriodicJob job) override;
+ void scheduleJob(PeriodicJob job) override;
/**
* Starts up this periodic runner.
+ *
+ * This periodic runner will only run once; if it is subsequently started up
+ * again, it will return an error.
*/
- void startup() override;
+ Status startup() override;
/**
- * Shut down this periodic runner. Stops all jobs from running.
+ * Shut down this periodic runner. Stops all jobs from running. This method
+ * may safely be called multiple times, but only the first call will have any effect.
*/
void shutdown() override;
private:
struct PeriodicJobASIO {
- explicit PeriodicJobASIO(PeriodicJob callable, Date_t startTime)
- : job(std::move(callable.job)), interval(callable.interval), start(startTime) {}
+ explicit PeriodicJobASIO(PeriodicJob callable,
+ Date_t startTime,
+ std::shared_ptr<executor::AsyncTimerInterface> sharedTimer)
+ : job(std::move(callable.job)),
+ interval(callable.interval),
+ start(startTime),
+ timer(sharedTimer) {}
Job job;
Milliseconds interval;
Date_t start;
+ std::shared_ptr<executor::AsyncTimerInterface> timer;
};
- void _scheduleJob(PeriodicJobASIO job, std::shared_ptr<executor::AsyncTimerInterface> timer);
+ // Internally, we will transition through these states
+ enum class State { kReady, kRunning, kComplete };
+
+ void _scheduleJob(std::weak_ptr<PeriodicJobASIO> job);
asio::io_service _io_service;
asio::io_service::strand _strand;
@@ -87,8 +100,10 @@ private:
std::unique_ptr<executor::AsyncTimerFactoryInterface> _timerFactory;
- stdx::mutex _runningMutex;
- bool _running;
+ stdx::mutex _stateMutex;
+ State _state;
+
+ std::vector<std::shared_ptr<PeriodicJobASIO>> _jobs;
};
} // namespace mongo
diff --git a/src/mongo/util/periodic_runner_asio_test.cpp b/src/mongo/util/periodic_runner_asio_test.cpp
index 1502c0a9f1a..4094ecb3dfe 100644
--- a/src/mongo/util/periodic_runner_asio_test.cpp
+++ b/src/mongo/util/periodic_runner_asio_test.cpp
@@ -40,13 +40,12 @@
namespace mongo {
namespace {
-class PeriodicRunnerASIOTest : public unittest::Test {
+class PeriodicRunnerASIOTestNoSetup : public unittest::Test {
public:
void setUp() override {
auto timerFactory = stdx::make_unique<executor::AsyncTimerFactoryMock>();
_timerFactory = timerFactory.get();
_runner = stdx::make_unique<PeriodicRunnerASIO>(std::move(timerFactory));
- _runner->startup();
}
void tearDown() override {
@@ -61,11 +60,22 @@ public:
return _runner;
}
-private:
+protected:
executor::AsyncTimerFactoryMock* _timerFactory;
std::unique_ptr<PeriodicRunnerASIO> _runner;
};
+class PeriodicRunnerASIOTest : public PeriodicRunnerASIOTestNoSetup {
+public:
+ void setUp() override {
+ auto timerFactory = stdx::make_unique<executor::AsyncTimerFactoryMock>();
+ _timerFactory = timerFactory.get();
+ _runner = stdx::make_unique<PeriodicRunnerASIO>(std::move(timerFactory));
+ auto res = _runner->startup();
+ ASSERT(res.isOK());
+ }
+};
+
TEST_F(PeriodicRunnerASIOTest, OneJobTest) {
int count = 0;
Milliseconds interval{5};
@@ -84,7 +94,7 @@ TEST_F(PeriodicRunnerASIOTest, OneJobTest) {
},
interval);
- ASSERT_OK(runner()->scheduleJob(std::move(job)));
+ runner()->scheduleJob(std::move(job));
// Ensure nothing happens until we fastForward
{
@@ -102,13 +112,43 @@ TEST_F(PeriodicRunnerASIOTest, OneJobTest) {
}
}
+TEST_F(PeriodicRunnerASIOTestNoSetup, ScheduleBeforeStartupTest) {
+ int count = 0;
+ Milliseconds interval{5};
+
+ stdx::mutex mutex;
+ stdx::condition_variable cv;
+
+ // Schedule a job before startup
+ PeriodicRunner::PeriodicJob job(
+ [&count, &mutex, &cv] {
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ count++;
+ }
+ cv.notify_all();
+ },
+ interval);
+
+ runner()->scheduleJob(std::move(job));
+
+ // Start the runner, job should still run
+ ASSERT(runner()->startup().isOK());
+
+ timerFactory().fastForward(interval);
+
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ cv.wait(lk, [&count] { return count > 0; });
+}
+
TEST_F(PeriodicRunnerASIOTest, ScheduleAfterShutdownTest) {
int count = 0;
Milliseconds interval{5};
// Schedule a job before shutdown
PeriodicRunner::PeriodicJob job([&count] { count++; }, interval);
- ASSERT_OK(runner()->scheduleJob(std::move(job)));
+
+ runner()->scheduleJob(std::move(job));
// Shut down before the job runs
runner()->shutdown();
@@ -117,6 +157,14 @@ TEST_F(PeriodicRunnerASIOTest, ScheduleAfterShutdownTest) {
timerFactory().fastForward(interval);
sleepmillis(10);
ASSERT_EQ(count, 0);
+
+ // Start up the runner again, this should error and should not run
+ auto res = runner()->startup();
+ ASSERT(!res.isOK());
+
+ timerFactory().fastForward(interval);
+ sleepmillis(10);
+ ASSERT_EQ(count, 0);
}
TEST_F(PeriodicRunnerASIOTest, TwoJobsTest) {
@@ -149,8 +197,8 @@ TEST_F(PeriodicRunnerASIOTest, TwoJobsTest) {
},
intervalB);
- ASSERT_OK(runner()->scheduleJob(std::move(jobA)));
- ASSERT_OK(runner()->scheduleJob(std::move(jobB)));
+ runner()->scheduleJob(std::move(jobA));
+ runner()->scheduleJob(std::move(jobB));
// Fast forward and wait for both jobs to run the right number of times
for (int i = 0; i <= 10; i++) {