summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--  src/mongo/base/error_codes.yml               |  1
-rw-r--r--  src/mongo/db/client.cpp                      |  8
-rw-r--r--  src/mongo/db/client.h                        | 16
-rw-r--r--  src/mongo/db/operation_context.cpp           | 15
-rw-r--r--  src/mongo/util/periodic_runner_impl.cpp      | 88
-rw-r--r--  src/mongo/util/periodic_runner_impl.h        |  1
-rw-r--r--  src/mongo/util/periodic_runner_impl_test.cpp | 39
7 files changed, 131 insertions(+), 37 deletions(-)
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml
index 43ea3f365b8..2676666cdcc 100644
--- a/src/mongo/base/error_codes.yml
+++ b/src/mongo/base/error_codes.yml
@@ -376,4 +376,5 @@ error_codes:
- {code: 13436,name: NotMasterOrSecondary,categories: [NotMasterError,RetriableError]}
- {code: 14031,name: OutOfDiskSpace}
- {code: 17280,name: OBSOLETE_KeyTooLong}
+ - {code: 46841,name: ClientMarkedKilled,categories: [Interruption,CancelationError,RetriableError]}
diff --git a/src/mongo/db/client.cpp b/src/mongo/db/client.cpp
index ec0283e0468..77204405f3c 100644
--- a/src/mongo/db/client.cpp
+++ b/src/mongo/db/client.cpp
@@ -183,6 +183,14 @@ bool Client::hasAnyActiveCurrentOp() const {
return false;
}
+void Client::setKilled() noexcept {
+ stdx::lock_guard<Client> lk(*this);
+ _killed.store(true);
+ if (_opCtx) {
+ _serviceContext->killOperation(lk, _opCtx, ErrorCodes::ClientMarkedKilled);
+ }
+}
+
ThreadClient::ThreadClient(ServiceContext* serviceContext)
: ThreadClient(getThreadName(), serviceContext, nullptr) {}
diff --git a/src/mongo/db/client.h b/src/mongo/db/client.h
index f98d17d5252..3ffed27adc2 100644
--- a/src/mongo/db/client.h
+++ b/src/mongo/db/client.h
@@ -41,6 +41,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/service_context.h"
+#include "mongo/platform/atomic_word.h"
#include "mongo/platform/random.h"
#include "mongo/stdx/thread.h"
#include "mongo/transport/session.h"
@@ -246,6 +247,19 @@ public:
*/
bool hasAnyActiveCurrentOp() const;
+ /**
+ * Signal the client's OperationContext that it has been killed.
+ * Any future OperationContext on this client will also receive a kill signal.
+ */
+ void setKilled() noexcept;
+
+ /**
+ * Get the state for killing the client's OperationContext.
+ */
+ bool getKilled() const noexcept {
+ return _killed.loadRelaxed();
+ }
+
private:
friend class ServiceContext;
friend class ThreadClient;
@@ -275,6 +289,8 @@ private:
bool _systemOperationKillable = false;
PseudoRandom _prng;
+
+ AtomicWord<bool> _killed{false};
};
/**
diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp
index ee38faab01b..a0bee37b49b 100644
--- a/src/mongo/db/operation_context.cpp
+++ b/src/mongo/db/operation_context.cpp
@@ -203,10 +203,17 @@ bool opShouldFail(Client* client, const BSONObj& failPointInfo) {
} // namespace
Status OperationContext::checkForInterruptNoAssert() noexcept {
- // TODO: Remove the MONGO_likely(getClient()) once all operation contexts are constructed with
- // clients.
- if (MONGO_likely(getClient() && getServiceContext()) &&
- getServiceContext()->getKillAllOperations() && !_isExecutingShutdown) {
+ // TODO: Remove the MONGO_likely(hasClientAndServiceContext) once all operation contexts are
+ // constructed with clients.
+ const auto hasClientAndServiceContext = getClient() && getServiceContext();
+
+ if (MONGO_likely(hasClientAndServiceContext) && getClient()->getKilled() &&
+ !_isExecutingShutdown) {
+ return Status(ErrorCodes::ClientMarkedKilled, "client has been killed");
+ }
+
+ if (MONGO_likely(hasClientAndServiceContext) && getServiceContext()->getKillAllOperations() &&
+ !_isExecutingShutdown) {
return Status(ErrorCodes::InterruptedAtShutdown, "interrupted at shutdown");
}
diff --git a/src/mongo/util/periodic_runner_impl.cpp b/src/mongo/util/periodic_runner_impl.cpp
index 59a57470552..d39df21ac02 100644
--- a/src/mongo/util/periodic_runner_impl.cpp
+++ b/src/mongo/util/periodic_runner_impl.cpp
@@ -54,7 +54,7 @@ auto PeriodicRunnerImpl::makeJob(PeriodicJob job) -> JobAnchor {
PeriodicRunnerImpl::PeriodicJobImpl::PeriodicJobImpl(PeriodicJob job,
ClockSource* source,
ServiceContext* svc)
- : _job(std::move(job)), _clockSource(source), _serviceContext(svc) {}
+ : _job(std::move(job)), _clockSource(source), _serviceContext(svc), _client(nullptr) {}
void PeriodicRunnerImpl::PeriodicJobImpl::_run() {
auto [startPromise, startFuture] = makePromiseFuture<void>();
@@ -66,46 +66,54 @@ void PeriodicRunnerImpl::PeriodicJobImpl::_run() {
_thread = stdx::thread([this, startPromise = std::move(startPromise)]() mutable {
- auto guard = makeGuard([this] { _stopPromise.emplaceValue(); });
+ ON_BLOCK_EXIT([this] { _stopPromise.emplaceValue(); });
- Client::initThread(_job.name, _serviceContext, nullptr);
-
- // Let start() know we're running
+ ThreadClient client(_job.name, _serviceContext, nullptr);
{
- stdx::lock_guard lk(_mutex);
- _execStatus = PeriodicJobImpl::ExecutionStatus::RUNNING;
- }
- startPromise.emplaceValue();
-
- stdx::unique_lock<Latch> lk(_mutex);
- while (_execStatus != ExecutionStatus::CANCELED) {
- // Wait until it's unpaused or canceled
- _condvar.wait(lk, [&] { return _execStatus != ExecutionStatus::PAUSED; });
- if (_execStatus == ExecutionStatus::CANCELED) {
- return;
+ // This ensures client object is not destructed so long as others can access it.
+ ON_BLOCK_EXIT([this] {
+ stdx::lock_guard lk(_mutex);
+ _client = nullptr;
+ });
+
+ // Let start() know we're running
+ {
+ stdx::lock_guard lk(_mutex);
+ _client = client.get();
+ _execStatus = PeriodicJobImpl::ExecutionStatus::RUNNING;
}
+ startPromise.emplaceValue();
+
+ stdx::unique_lock<Latch> lk(_mutex);
+ while (_execStatus != ExecutionStatus::CANCELED) {
+ // Wait until it's unpaused or canceled
+ _condvar.wait(lk, [&] { return _execStatus != ExecutionStatus::PAUSED; });
+ if (_execStatus == ExecutionStatus::CANCELED) {
+ return;
+ }
- auto start = _clockSource->now();
+ auto start = _clockSource->now();
- // Unlock while job is running so we can pause/cancel concurrently
- lk.unlock();
- _job.job(Client::getCurrent());
- lk.lock();
+ // Unlock while job is running so we can pause/cancel concurrently
+ lk.unlock();
+ _job.job(client.get());
+ lk.lock();
- auto getDeadlineFromInterval = [&] { return start + _job.interval; };
+ auto getDeadlineFromInterval = [&] { return start + _job.interval; };
- do {
- auto deadline = getDeadlineFromInterval();
+ do {
+ auto deadline = getDeadlineFromInterval();
- if (_clockSource->waitForConditionUntil(_condvar, lk, deadline, [&] {
- return _execStatus == ExecutionStatus::CANCELED ||
- getDeadlineFromInterval() != deadline;
- })) {
- if (_execStatus == ExecutionStatus::CANCELED) {
- return;
+ if (_clockSource->waitForConditionUntil(_condvar, lk, deadline, [&] {
+ return _execStatus == ExecutionStatus::CANCELED ||
+ getDeadlineFromInterval() != deadline;
+ })) {
+ if (_execStatus == ExecutionStatus::CANCELED) {
+ return;
+ }
}
- }
- } while (_clockSource->now() < getDeadlineFromInterval());
+ } while (_clockSource->now() < getDeadlineFromInterval());
+ }
}
});
@@ -146,15 +154,29 @@ void PeriodicRunnerImpl::PeriodicJobImpl::stop() {
return;
}
+ auto stopFuture = _stopPromise.getFuture();
+
// Only join once
if (lastExecStatus != ExecutionStatus::CANCELED) {
LOGV2_DEBUG(23324, 2, "Stopping periodic job {job_name}", "job_name"_attr = _job.name);
_condvar.notify_one();
+
+ // Kill the client thread and its opCtx (if any) before joining.
+ // So long as `_job` returns upon receiving the kill signal, the following ensures liveness
+ // (i.e., this method will eventually return).
+ if (!stopFuture.isReady()) {
+ stdx::lock_guard<Latch> lk(_mutex);
+ // Check if the client thread has already returned.
+ if (_client) {
+ _client->setKilled();
+ }
+ }
+
_thread.join();
}
- _stopPromise.getFuture().get();
+ stopFuture.get();
}
Milliseconds PeriodicRunnerImpl::PeriodicJobImpl::getPeriod() {
diff --git a/src/mongo/util/periodic_runner_impl.h b/src/mongo/util/periodic_runner_impl.h
index 4a89b4b6a05..369fb19882c 100644
--- a/src/mongo/util/periodic_runner_impl.h
+++ b/src/mongo/util/periodic_runner_impl.h
@@ -79,6 +79,7 @@ private:
ClockSource* _clockSource;
ServiceContext* _serviceContext;
+ Client* _client;
stdx::thread _thread;
SharedPromise<void> _stopPromise;
diff --git a/src/mongo/util/periodic_runner_impl_test.cpp b/src/mongo/util/periodic_runner_impl_test.cpp
index 185d3d2706a..22bcbe3506b 100644
--- a/src/mongo/util/periodic_runner_impl_test.cpp
+++ b/src/mongo/util/periodic_runner_impl_test.cpp
@@ -27,8 +27,10 @@
* it in the license file.
*/
+#include "mongo/platform/atomic_word.h"
#include "mongo/platform/basic.h"
+#include "mongo/unittest/barrier.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/periodic_runner_impl.h"
@@ -437,5 +439,42 @@ TEST_F(PeriodicRunnerImplTest, ChangingIntervalWorks) {
tearDown();
}
+TEST_F(PeriodicRunnerImplTest, StopProperlyInterruptsOpCtx) {
+ Milliseconds interval{5};
+ unittest::Barrier barrier(2);
+ AtomicWord<bool> killed{false};
+
+ PeriodicRunner::PeriodicJob job(
+ "job",
+ [&barrier, &killed](Client* client) {
+ stdx::condition_variable cv;
+ auto mutex = MONGO_MAKE_LATCH();
+ barrier.countDownAndWait();
+
+ try {
+ auto opCtx = client->makeOperationContext();
+ stdx::unique_lock<Latch> lk(mutex);
+ opCtx->waitForConditionOrInterrupt(cv, lk, [] { return false; });
+ } catch (const ExceptionForCat<ErrorCategory::Interruption>& e) {
+ ASSERT_EQ(e.code(), ErrorCodes::ClientMarkedKilled);
+ killed.store(true);
+ return;
+ }
+
+ MONGO_UNREACHABLE;
+ },
+ interval);
+
+ auto jobAnchor = runner().makeJob(std::move(job));
+ jobAnchor.start();
+
+ barrier.countDownAndWait();
+
+ jobAnchor.stop();
+ ASSERT(killed.load());
+
+ tearDown();
+}
+
} // namespace
} // namespace mongo