diff options
author | Amirsaman Memaripour <amirsaman.memaripour@mongodb.com> | 2020-03-12 17:05:05 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-03-24 04:32:03 +0000 |
commit | ef75364ada70eaf4a096ed07adfeb3175abd719b (patch) | |
tree | 51fdaba76229875086d6e1c32889da230f92b9e4 /src/mongo/util | |
parent | 30bd61a92cd4c8d6dd5588b59b9bdf4a0cd3c7d8 (diff) | |
download | mongo-ef75364ada70eaf4a096ed07adfeb3175abd719b.tar.gz |
SERVER-46841 Make PeriodicRunner interrupt blocked operations on stop
Diffstat (limited to 'src/mongo/util')
-rw-r--r-- | src/mongo/util/periodic_runner_impl.cpp | 88 | ||||
-rw-r--r-- | src/mongo/util/periodic_runner_impl.h | 1 | ||||
-rw-r--r-- | src/mongo/util/periodic_runner_impl_test.cpp | 39 |
3 files changed, 95 insertions, 33 deletions
diff --git a/src/mongo/util/periodic_runner_impl.cpp b/src/mongo/util/periodic_runner_impl.cpp index 59a57470552..d39df21ac02 100644 --- a/src/mongo/util/periodic_runner_impl.cpp +++ b/src/mongo/util/periodic_runner_impl.cpp @@ -54,7 +54,7 @@ auto PeriodicRunnerImpl::makeJob(PeriodicJob job) -> JobAnchor { PeriodicRunnerImpl::PeriodicJobImpl::PeriodicJobImpl(PeriodicJob job, ClockSource* source, ServiceContext* svc) - : _job(std::move(job)), _clockSource(source), _serviceContext(svc) {} + : _job(std::move(job)), _clockSource(source), _serviceContext(svc), _client(nullptr) {} void PeriodicRunnerImpl::PeriodicJobImpl::_run() { auto [startPromise, startFuture] = makePromiseFuture<void>(); @@ -66,46 +66,54 @@ void PeriodicRunnerImpl::PeriodicJobImpl::_run() { _thread = stdx::thread([this, startPromise = std::move(startPromise)]() mutable { - auto guard = makeGuard([this] { _stopPromise.emplaceValue(); }); + ON_BLOCK_EXIT([this] { _stopPromise.emplaceValue(); }); - Client::initThread(_job.name, _serviceContext, nullptr); - - // Let start() know we're running + ThreadClient client(_job.name, _serviceContext, nullptr); { - stdx::lock_guard lk(_mutex); - _execStatus = PeriodicJobImpl::ExecutionStatus::RUNNING; - } - startPromise.emplaceValue(); - - stdx::unique_lock<Latch> lk(_mutex); - while (_execStatus != ExecutionStatus::CANCELED) { - // Wait until it's unpaused or canceled - _condvar.wait(lk, [&] { return _execStatus != ExecutionStatus::PAUSED; }); - if (_execStatus == ExecutionStatus::CANCELED) { - return; + // This ensures client object is not destructed so long as others can access it. + ON_BLOCK_EXIT([this] { + stdx::lock_guard lk(_mutex); + _client = nullptr; + }); + + // Let start() know we're running + { + stdx::lock_guard lk(_mutex); + _client = client.get(); + _execStatus = PeriodicJobImpl::ExecutionStatus::RUNNING; } + startPromise.emplaceValue(); + + stdx::unique_lock<Latch> lk(_mutex); + while (_execStatus != ExecutionStatus::CANCELED) { + // Wait until it's unpaused or canceled + _condvar.wait(lk, [&] { return _execStatus != ExecutionStatus::PAUSED; }); + if (_execStatus == ExecutionStatus::CANCELED) { + return; + } - auto start = _clockSource->now(); + auto start = _clockSource->now(); - // Unlock while job is running so we can pause/cancel concurrently - lk.unlock(); - _job.job(Client::getCurrent()); - lk.lock(); + // Unlock while job is running so we can pause/cancel concurrently + lk.unlock(); + _job.job(client.get()); + lk.lock(); - auto getDeadlineFromInterval = [&] { return start + _job.interval; }; + auto getDeadlineFromInterval = [&] { return start + _job.interval; }; - do { - auto deadline = getDeadlineFromInterval(); + do { + auto deadline = getDeadlineFromInterval(); - if (_clockSource->waitForConditionUntil(_condvar, lk, deadline, [&] { - return _execStatus == ExecutionStatus::CANCELED || - getDeadlineFromInterval() != deadline; - })) { - if (_execStatus == ExecutionStatus::CANCELED) { - return; + if (_clockSource->waitForConditionUntil(_condvar, lk, deadline, [&] { + return _execStatus == ExecutionStatus::CANCELED || + getDeadlineFromInterval() != deadline; + })) { + if (_execStatus == ExecutionStatus::CANCELED) { + return; + } } - } - } while (_clockSource->now() < getDeadlineFromInterval()); + } while (_clockSource->now() < getDeadlineFromInterval()); + } } }); @@ -146,15 +154,29 @@ void PeriodicRunnerImpl::PeriodicJobImpl::stop() { return; } + auto stopFuture = _stopPromise.getFuture(); + // Only join once if (lastExecStatus != ExecutionStatus::CANCELED) { LOGV2_DEBUG(23324, 2, "Stopping periodic job {job_name}", "job_name"_attr = _job.name); _condvar.notify_one(); + + // Kill the client thread and its opCtx (if any) before joining. + // So long as `_job` returns upon receiving the kill signal, the following ensures liveness + // (i.e., this method will eventually return). + if (!stopFuture.isReady()) { + stdx::lock_guard<Latch> lk(_mutex); + // Check if the client thread has already returned. + if (_client) { + _client->setKilled(); + } + } + _thread.join(); } - _stopPromise.getFuture().get(); + stopFuture.get(); } Milliseconds PeriodicRunnerImpl::PeriodicJobImpl::getPeriod() { diff --git a/src/mongo/util/periodic_runner_impl.h b/src/mongo/util/periodic_runner_impl.h index 4a89b4b6a05..369fb19882c 100644 --- a/src/mongo/util/periodic_runner_impl.h +++ b/src/mongo/util/periodic_runner_impl.h @@ -79,6 +79,7 @@ private: ClockSource* _clockSource; ServiceContext* _serviceContext; + Client* _client; stdx::thread _thread; SharedPromise<void> _stopPromise; diff --git a/src/mongo/util/periodic_runner_impl_test.cpp b/src/mongo/util/periodic_runner_impl_test.cpp index 185d3d2706a..22bcbe3506b 100644 --- a/src/mongo/util/periodic_runner_impl_test.cpp +++ b/src/mongo/util/periodic_runner_impl_test.cpp @@ -27,8 +27,10 @@ * it in the license file. */ +#include "mongo/platform/atomic_word.h" #include "mongo/platform/basic.h" +#include "mongo/unittest/barrier.h" #include "mongo/unittest/unittest.h" #include "mongo/util/periodic_runner_impl.h" @@ -437,5 +439,42 @@ TEST_F(PeriodicRunnerImplTest, ChangingIntervalWorks) { tearDown(); } +TEST_F(PeriodicRunnerImplTest, StopProperlyInterruptsOpCtx) { + Milliseconds interval{5}; + unittest::Barrier barrier(2); + AtomicWord<bool> killed{false}; + + PeriodicRunner::PeriodicJob job( + "job", + [&barrier, &killed](Client* client) { + stdx::condition_variable cv; + auto mutex = MONGO_MAKE_LATCH(); + barrier.countDownAndWait(); + + try { + auto opCtx = client->makeOperationContext(); + stdx::unique_lock<Latch> lk(mutex); + opCtx->waitForConditionOrInterrupt(cv, lk, [] { return false; }); + } catch (const ExceptionForCat<ErrorCategory::Interruption>& e) { + ASSERT_EQ(e.code(), ErrorCodes::ClientMarkedKilled); + killed.store(true); + return; + } + + MONGO_UNREACHABLE; + }, + interval); + + auto jobAnchor = runner().makeJob(std::move(job)); + jobAnchor.start(); + + barrier.countDownAndWait(); + + jobAnchor.stop(); + ASSERT(killed.load()); + + tearDown(); +} + } // namespace } // namespace mongo |