author    Amirsaman Memaripour <amirsaman.memaripour@mongodb.com>  2020-03-12 17:05:05 -0400
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-03-24 04:32:03 +0000
commit    ef75364ada70eaf4a096ed07adfeb3175abd719b (patch)
tree      51fdaba76229875086d6e1c32889da230f92b9e4 /src/mongo/util
parent    30bd61a92cd4c8d6dd5588b59b9bdf4a0cd3c7d8 (diff)
download  mongo-ef75364ada70eaf4a096ed07adfeb3175abd719b.tar.gz
SERVER-46841 Make PeriodicRunner interrupt blocked operations on stop
Diffstat (limited to 'src/mongo/util')
 -rw-r--r--  src/mongo/util/periodic_runner_impl.cpp      | 88
 -rw-r--r--  src/mongo/util/periodic_runner_impl.h        |  1
 -rw-r--r--  src/mongo/util/periodic_runner_impl_test.cpp | 39
 3 files changed, 95 insertions(+), 33 deletions(-)
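
The core of this change is a handshake between stop() and the job thread: stop()
marks the job's Client as killed, any interruptible wait inside the job then
throws ClientMarkedKilled, the thread unwinds, and join() returns. A minimal
standalone sketch of the same handshake in plain C++, without MongoDB's
Client/OperationContext machinery (every name below is illustrative):

    #include <atomic>
    #include <condition_variable>
    #include <mutex>
    #include <stdexcept>
    #include <thread>

    // Stand-in for an interruptible wait: blocks until kill() is called,
    // then throws, mirroring how waitForConditionOrInterrupt() surfaces
    // ClientMarkedKilled.
    struct KillableWaiter {
        std::mutex m;
        std::condition_variable cv;
        std::atomic<bool> killed{false};

        void waitForever() {
            std::unique_lock lk(m);
            cv.wait(lk, [&] { return killed.load(); });
            throw std::runtime_error("killed");
        }

        void kill() {
            killed.store(true);
            cv.notify_all();  // wake any blocked waiter so it observes the kill
        }
    };

    int main() {
        KillableWaiter waiter;
        std::thread job([&] {
            try {
                waiter.waitForever();  // blocks indefinitely until interrupted
            } catch (const std::runtime_error&) {
                // unwind cleanly, as the periodic job does on ClientMarkedKilled
            }
        });
        waiter.kill();  // what stop() now does via Client::setKilled()
        job.join();     // returns because the blocked wait was interrupted
    }
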
diff --git a/src/mongo/util/periodic_runner_impl.cpp b/src/mongo/util/periodic_runner_impl.cpp
index 59a57470552..d39df21ac02 100644
--- a/src/mongo/util/periodic_runner_impl.cpp
+++ b/src/mongo/util/periodic_runner_impl.cpp
@@ -54,7 +54,7 @@ auto PeriodicRunnerImpl::makeJob(PeriodicJob job) -> JobAnchor {
PeriodicRunnerImpl::PeriodicJobImpl::PeriodicJobImpl(PeriodicJob job,
ClockSource* source,
ServiceContext* svc)
- : _job(std::move(job)), _clockSource(source), _serviceContext(svc) {}
+ : _job(std::move(job)), _clockSource(source), _serviceContext(svc), _client(nullptr) {}
void PeriodicRunnerImpl::PeriodicJobImpl::_run() {
auto [startPromise, startFuture] = makePromiseFuture<void>();
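
The new `_client` member (declared in the header change below) is how stop()
reaches the job thread's Client. The thread publishes the pointer under `_mutex`
once its ThreadClient exists and, via the ON_BLOCK_EXIT in the next hunk, resets
it to nullptr before that ThreadClient is destroyed, so stop() can never
dereference a dead pointer. The shape of the pattern in plain C++ (illustrative
names; the real code uses a scope guard, so the unpublish also runs on early
return or exception):

    #include <mutex>

    struct Client {};  // stand-in for mongo::Client

    struct Job {
        std::mutex mutex;
        Client* client = nullptr;  // published for stop(), owned by the thread

        void workerBody() {
            Client threadClient;  // lives exactly as long as this function
            {
                std::lock_guard lk(mutex);
                client = &threadClient;  // publish: stop() may now kill us
            }
            // ... run the periodic loop ...
            {
                std::lock_guard lk(mutex);
                client = nullptr;  // unpublish before threadClient is destroyed
            }
        }
    };
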
@@ -66,46 +66,54 @@ void PeriodicRunnerImpl::PeriodicJobImpl::_run() {
_thread = stdx::thread([this, startPromise = std::move(startPromise)]() mutable {
- auto guard = makeGuard([this] { _stopPromise.emplaceValue(); });
+ ON_BLOCK_EXIT([this] { _stopPromise.emplaceValue(); });
- Client::initThread(_job.name, _serviceContext, nullptr);
-
- // Let start() know we're running
+ ThreadClient client(_job.name, _serviceContext, nullptr);
{
- stdx::lock_guard lk(_mutex);
- _execStatus = PeriodicJobImpl::ExecutionStatus::RUNNING;
- }
- startPromise.emplaceValue();
-
- stdx::unique_lock<Latch> lk(_mutex);
- while (_execStatus != ExecutionStatus::CANCELED) {
- // Wait until it's unpaused or canceled
- _condvar.wait(lk, [&] { return _execStatus != ExecutionStatus::PAUSED; });
- if (_execStatus == ExecutionStatus::CANCELED) {
- return;
+ // This ensures the client object is not destructed while others can still access it.
+ ON_BLOCK_EXIT([this] {
+ stdx::lock_guard lk(_mutex);
+ _client = nullptr;
+ });
+
+ // Let start() know we're running
+ {
+ stdx::lock_guard lk(_mutex);
+ _client = client.get();
+ _execStatus = PeriodicJobImpl::ExecutionStatus::RUNNING;
}
+ startPromise.emplaceValue();
+
+ stdx::unique_lock<Latch> lk(_mutex);
+ while (_execStatus != ExecutionStatus::CANCELED) {
+ // Wait until it's unpaused or canceled
+ _condvar.wait(lk, [&] { return _execStatus != ExecutionStatus::PAUSED; });
+ if (_execStatus == ExecutionStatus::CANCELED) {
+ return;
+ }
- auto start = _clockSource->now();
+ auto start = _clockSource->now();
- // Unlock while job is running so we can pause/cancel concurrently
- lk.unlock();
- _job.job(Client::getCurrent());
- lk.lock();
+ // Unlock while job is running so we can pause/cancel concurrently
+ lk.unlock();
+ _job.job(client.get());
+ lk.lock();
- auto getDeadlineFromInterval = [&] { return start + _job.interval; };
+ auto getDeadlineFromInterval = [&] { return start + _job.interval; };
- do {
- auto deadline = getDeadlineFromInterval();
+ do {
+ auto deadline = getDeadlineFromInterval();
- if (_clockSource->waitForConditionUntil(_condvar, lk, deadline, [&] {
- return _execStatus == ExecutionStatus::CANCELED ||
- getDeadlineFromInterval() != deadline;
- })) {
- if (_execStatus == ExecutionStatus::CANCELED) {
- return;
+ if (_clockSource->waitForConditionUntil(_condvar, lk, deadline, [&] {
+ return _execStatus == ExecutionStatus::CANCELED ||
+ getDeadlineFromInterval() != deadline;
+ })) {
+ if (_execStatus == ExecutionStatus::CANCELED) {
+ return;
+ }
}
- }
- } while (_clockSource->now() < getDeadlineFromInterval());
+ } while (_clockSource->now() < getDeadlineFromInterval());
+ }
}
});
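
Both the old makeGuard and its replacement ON_BLOCK_EXIT are scope guards: they
run a callable when the enclosing scope exits, whether by falling off the end,
an early return, or an exception. That is why `_stopPromise` is always
fulfilled and `_client` is always reset, no matter how the thread body exits. A
minimal standard-C++ equivalent of the idiom:

    #include <utility>

    // Minimal scope guard: runs `f` from its destructor, i.e. on scope exit.
    template <typename F>
    class ScopeGuard {
    public:
        explicit ScopeGuard(F f) : _f(std::move(f)) {}
        ~ScopeGuard() { _f(); }
        ScopeGuard(const ScopeGuard&) = delete;
        ScopeGuard& operator=(const ScopeGuard&) = delete;

    private:
        F _f;
    };

    // Usage mirroring the job thread above:
    //     ScopeGuard fulfillStop([&] { stopPromise.emplaceValue(); });
    //     ...  // even an early return or a throw still fulfills the promise
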
@@ -146,15 +154,29 @@ void PeriodicRunnerImpl::PeriodicJobImpl::stop() {
return;
}
+ auto stopFuture = _stopPromise.getFuture();
+
// Only join once
if (lastExecStatus != ExecutionStatus::CANCELED) {
LOGV2_DEBUG(23324, 2, "Stopping periodic job {job_name}", "job_name"_attr = _job.name);
_condvar.notify_one();
+
+ // Kill the client thread and its opCtx (if any) before joining.
+ // So long as `_job` returns upon receiving the kill signal, the following ensures liveness
+ // (i.e., this method will eventually return).
+ if (!stopFuture.isReady()) {
+ stdx::lock_guard<Latch> lk(_mutex);
+ // Check if the client thread has already returned.
+ if (_client) {
+ _client->setKilled();
+ }
+ }
+
_thread.join();
}
- _stopPromise.getFuture().get();
+ stopFuture.get();
}
Milliseconds PeriodicRunnerImpl::PeriodicJobImpl::getPeriod() {
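
The ordering inside stop() above is what buys liveness: grab the future first,
wake the loop so it can observe CANCELED, then, only if the thread has not
already finished, inspect `_client` under `_mutex` and kill it, and finally
join. A condensed, runnable model of that sequence in plain C++ (illustrative
names; the kill itself is elided):

    #include <chrono>
    #include <condition_variable>
    #include <future>
    #include <mutex>
    #include <thread>

    struct MiniJob {
        std::mutex mutex;
        std::condition_variable condvar;
        bool canceled = false;
        bool* clientAlive = nullptr;  // stands in for the published Client*
        std::promise<void> stopPromise;
        std::future<void> stopFuture = stopPromise.get_future();
        std::thread thread;

        void start() {
            thread = std::thread([this] {
                bool alive = true;
                std::unique_lock lk(mutex);
                clientAlive = &alive;  // publish, like _client = client.get()
                condvar.wait(lk, [&] { return canceled; });
                clientAlive = nullptr;  // unpublish before `alive` dies
                lk.unlock();
                stopPromise.set_value();  // like the ON_BLOCK_EXIT on _stopPromise
            });
        }

        void stop() {
            {
                std::lock_guard lk(mutex);
                canceled = true;  // the loop's wait predicate can now pass
            }
            condvar.notify_one();  // wake a sleeping loop
            using namespace std::chrono_literals;
            if (stopFuture.wait_for(0s) != std::future_status::ready) {
                std::lock_guard lk(mutex);
                if (clientAlive) {
                    // here the real code calls _client->setKilled() to
                    // interrupt a job blocked inside an operation context
                }
            }
            thread.join();     // liveness: the thread was woken (or killed)
            stopFuture.get();  // confirms the thread reached clean shutdown
        }
    };

    int main() {
        MiniJob job;
        job.start();
        job.stop();
    }
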
diff --git a/src/mongo/util/periodic_runner_impl.h b/src/mongo/util/periodic_runner_impl.h
index 4a89b4b6a05..369fb19882c 100644
--- a/src/mongo/util/periodic_runner_impl.h
+++ b/src/mongo/util/periodic_runner_impl.h
@@ -79,6 +79,7 @@ private:
ClockSource* _clockSource;
ServiceContext* _serviceContext;
+ Client* _client;
stdx::thread _thread;
SharedPromise<void> _stopPromise;
diff --git a/src/mongo/util/periodic_runner_impl_test.cpp b/src/mongo/util/periodic_runner_impl_test.cpp
index 185d3d2706a..22bcbe3506b 100644
--- a/src/mongo/util/periodic_runner_impl_test.cpp
+++ b/src/mongo/util/periodic_runner_impl_test.cpp
@@ -27,8 +27,10 @@
* it in the license file.
*/
+#include "mongo/platform/atomic_word.h"
#include "mongo/platform/basic.h"
+#include "mongo/unittest/barrier.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/periodic_runner_impl.h"
@@ -437,5 +439,42 @@ TEST_F(PeriodicRunnerImplTest, ChangingIntervalWorks) {
tearDown();
}
+TEST_F(PeriodicRunnerImplTest, StopProperlyInterruptsOpCtx) {
+ Milliseconds interval{5};
+ unittest::Barrier barrier(2);
+ AtomicWord<bool> killed{false};
+
+ PeriodicRunner::PeriodicJob job(
+ "job",
+ [&barrier, &killed](Client* client) {
+ stdx::condition_variable cv;
+ auto mutex = MONGO_MAKE_LATCH();
+ barrier.countDownAndWait();
+
+ try {
+ auto opCtx = client->makeOperationContext();
+ stdx::unique_lock<Latch> lk(mutex);
+ opCtx->waitForConditionOrInterrupt(cv, lk, [] { return false; });
+ } catch (const ExceptionForCat<ErrorCategory::Interruption>& e) {
+ ASSERT_EQ(e.code(), ErrorCodes::ClientMarkedKilled);
+ killed.store(true);
+ return;
+ }
+
+ MONGO_UNREACHABLE;
+ },
+ interval);
+
+ auto jobAnchor = runner().makeJob(std::move(job));
+ jobAnchor.start();
+
+ barrier.countDownAndWait();
+
+ jobAnchor.stop();
+ ASSERT(killed.load());
+
+ tearDown();
+}
+
} // namespace
} // namespace mongo
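
For orientation, the API surface the new test exercises, based only on the
calls visible above (a fragment, not a complete program; it assumes a
PeriodicRunner* runner obtained from the ServiceContext):

    // A job body receives the thread's Client*; any interruptible wait on an
    // operation context made from it now throws ClientMarkedKilled on stop().
    PeriodicRunner::PeriodicJob job(
        "exampleJob",
        [](Client* client) {
            // periodic work here
        },
        Milliseconds{5});

    auto anchor = runner->makeJob(std::move(job));
    anchor.start();  // spawns the job thread and waits until it is running
    // ...
    anchor.stop();   // kills the client if needed, joins the thread, returns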