From 19a8df607f40a3d31d145bef9255ed4e019d23c1 Mon Sep 17 00:00:00 2001 From: Amirsaman Memaripour Date: Thu, 12 Mar 2020 17:05:05 -0400 Subject: SERVER-46842 Make PeriodicRunner interrupt blocked operations on stop (cherry picked from commit ef75364ada70eaf4a096ed07adfeb3175abd719b) --- src/mongo/base/error_codes.err | 6 +- src/mongo/db/client.cpp | 8 +++ src/mongo/db/client.h | 16 +++++ src/mongo/db/operation_context.cpp | 15 +++-- src/mongo/util/periodic_runner_impl.cpp | 88 +++++++++++++++++----------- src/mongo/util/periodic_runner_impl.h | 1 + src/mongo/util/periodic_runner_impl_test.cpp | 39 ++++++++++++ 7 files changed, 134 insertions(+), 39 deletions(-) diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index fddae0b50e7..1c61be94d93 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -319,6 +319,7 @@ error_code("StaleConfig", 13388, extra="StaleConfigInfo"); error_code("DatabaseDifferCase", 13297); error_code("OBSOLETE_PrepareConfigsFailed", 13104); error_code("MergeStageNoMatchingDocument", 13113); +error_code("ClientMarkedKilled", 46841); # Group related errors into classes, can be checked against ErrorCodes::isXXXClassName methods. error_class("NetworkError", ["HostUnreachable", "HostNotFound", "NetworkTimeout", "SocketException"]) @@ -329,7 +330,8 @@ error_class("Interruption", ["Interrupted", "MaxTimeMSExpired", "CursorKilled", "LockTimeout", - "ClientDisconnect"]) + "ClientDisconnect", + "ClientMarkedKilled"]) # isNotMasterError() includes all codes that indicate that the node that received the request was # not master at some point during command processing, regardless of whether some write may have @@ -353,7 +355,7 @@ error_class("ShutdownError", ["ShutdownInProgress", "InterruptedAtShutdown"]) # isCancelationError() includes all codes that, when passed to a function as its parameter, # indicates that it cannot be executed as normal and must abort its intended work. 
-error_class("CancelationError", ["ShutdownInProgress", "InterruptedAtShutdown", "CallbackCanceled", "PeriodicJobIsStopped"]) +error_class("CancelationError", ["ShutdownInProgress", "InterruptedAtShutdown", "CallbackCanceled", "PeriodicJobIsStopped", "ClientMarkedKilled"]) error_class("ConnectionFatalMessageParseError", ["IllegalOpMsgFlag", "TooManyDocumentSequences", diff --git a/src/mongo/db/client.cpp b/src/mongo/db/client.cpp index 8ed19bb98c9..41049f15405 100644 --- a/src/mongo/db/client.cpp +++ b/src/mongo/db/client.cpp @@ -164,6 +164,14 @@ void Client::setCurrent(ServiceContext::UniqueClient client) { currentClient = std::move(client); } +void Client::setKilled() noexcept { + stdx::lock_guard lk(*this); + _killed.store(true); + if (_opCtx) { + _serviceContext->killOperation(lk, _opCtx, ErrorCodes::ClientMarkedKilled); + } +} + ThreadClient::ThreadClient(ServiceContext* serviceContext) : ThreadClient(getThreadName(), serviceContext, nullptr) {} diff --git a/src/mongo/db/client.h b/src/mongo/db/client.h index 75e828c63c3..2c06e0e1453 100644 --- a/src/mongo/db/client.h +++ b/src/mongo/db/client.h @@ -41,6 +41,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/service_context.h" +#include "mongo/platform/atomic_word.h" #include "mongo/platform/random.h" #include "mongo/stdx/thread.h" #include "mongo/transport/session.h" @@ -229,6 +230,19 @@ public: return _prng; } + /** + * Signal the client's OperationContext that it has been killed. + * Any future OperationContext on this client will also receive a kill signal. + */ + void setKilled() noexcept; + + /** + * Get the state for killing the client's OperationContext. 
+ */ + bool getKilled() const noexcept { + return _killed.loadRelaxed(); + } + private: friend class ServiceContext; friend class ThreadClient; @@ -258,6 +272,8 @@ private: bool _systemOperationKillable = false; PseudoRandom _prng; + + AtomicWord<bool> _killed{false}; }; /** diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp index a88078acb5f..d5b8e11f8da 100644 --- a/src/mongo/db/operation_context.cpp +++ b/src/mongo/db/operation_context.cpp @@ -199,10 +199,17 @@ bool opShouldFail(Client* client, const BSONObj& failPointInfo) { } // namespace Status OperationContext::checkForInterruptNoAssert() noexcept { - // TODO: Remove the MONGO_likely(getClient()) once all operation contexts are constructed with - // clients. - if (MONGO_likely(getClient() && getServiceContext()) && - getServiceContext()->getKillAllOperations() && !_isExecutingShutdown) { + // TODO: Remove the MONGO_likely(hasClientAndServiceContext) once all operation contexts are + // constructed with clients. 
+ const auto hasClientAndServiceContext = getClient() && getServiceContext(); + + if (MONGO_likely(hasClientAndServiceContext) && getClient()->getKilled() && + !_isExecutingShutdown) { + return Status(ErrorCodes::ClientMarkedKilled, "client has been killed"); + } + + if (MONGO_likely(hasClientAndServiceContext) && getServiceContext()->getKillAllOperations() && + !_isExecutingShutdown) { return Status(ErrorCodes::InterruptedAtShutdown, "interrupted at shutdown"); } diff --git a/src/mongo/util/periodic_runner_impl.cpp b/src/mongo/util/periodic_runner_impl.cpp index 88e83782957..06b1c63de2e 100644 --- a/src/mongo/util/periodic_runner_impl.cpp +++ b/src/mongo/util/periodic_runner_impl.cpp @@ -56,7 +56,7 @@ auto PeriodicRunnerImpl::makeJob(PeriodicJob job) -> JobAnchor { PeriodicRunnerImpl::PeriodicJobImpl::PeriodicJobImpl(PeriodicJob job, ClockSource* source, ServiceContext* svc) - : _job(std::move(job)), _clockSource(source), _serviceContext(svc) {} + : _job(std::move(job)), _clockSource(source), _serviceContext(svc), _client(nullptr) {} void PeriodicRunnerImpl::PeriodicJobImpl::_run() { { @@ -70,46 +70,54 @@ void PeriodicRunnerImpl::PeriodicJobImpl::_run() { auto [startPromise, startFuture] = makePromiseFuture<void>(); _thread = stdx::thread([this, startPromise = std::move(startPromise)]() mutable { - auto guard = makeGuard([this] { _stopPromise.emplaceValue(); }); + ON_BLOCK_EXIT([this] { _stopPromise.emplaceValue(); }); - Client::initThread(_job.name, _serviceContext, nullptr); - - // Let start() know we're running + ThreadClient client(_job.name, _serviceContext, nullptr); { - stdx::lock_guard lk(_mutex); - _execStatus = PeriodicJobImpl::ExecutionStatus::RUNNING; - } - startPromise.emplaceValue(); - - stdx::unique_lock lk(_mutex); - while (_execStatus != ExecutionStatus::CANCELED) { - // Wait until it's unpaused or canceled - _condvar.wait(lk, [&] { return _execStatus != ExecutionStatus::PAUSED; }); - if (_execStatus == ExecutionStatus::CANCELED) { - return; + // This 
ensures client object is not destructed so long as others can access it. + ON_BLOCK_EXIT([this] { + stdx::lock_guard lk(_mutex); + _client = nullptr; + }); + + // Let start() know we're running + { + stdx::lock_guard lk(_mutex); + _client = client.get(); + _execStatus = PeriodicJobImpl::ExecutionStatus::RUNNING; } + startPromise.emplaceValue(); + + stdx::unique_lock lk(_mutex); + while (_execStatus != ExecutionStatus::CANCELED) { + // Wait until it's unpaused or canceled + _condvar.wait(lk, [&] { return _execStatus != ExecutionStatus::PAUSED; }); + if (_execStatus == ExecutionStatus::CANCELED) { + return; + } - auto start = _clockSource->now(); + auto start = _clockSource->now(); - // Unlock while job is running so we can pause/cancel concurrently - lk.unlock(); - _job.job(Client::getCurrent()); - lk.lock(); + // Unlock while job is running so we can pause/cancel concurrently + lk.unlock(); + _job.job(client.get()); + lk.lock(); - auto getDeadlineFromInterval = [&] { return start + _job.interval; }; + auto getDeadlineFromInterval = [&] { return start + _job.interval; }; - do { - auto deadline = getDeadlineFromInterval(); + do { + auto deadline = getDeadlineFromInterval(); - if (_clockSource->waitForConditionUntil(_condvar, lk, deadline, [&] { - return _execStatus == ExecutionStatus::CANCELED || - getDeadlineFromInterval() != deadline; - })) { - if (_execStatus == ExecutionStatus::CANCELED) { - return; + if (_clockSource->waitForConditionUntil(_condvar, lk, deadline, [&] { + return _execStatus == ExecutionStatus::CANCELED || + getDeadlineFromInterval() != deadline; + })) { + if (_execStatus == ExecutionStatus::CANCELED) { + return; + } } - } - } while (_clockSource->now() < getDeadlineFromInterval()); + } while (_clockSource->now() < getDeadlineFromInterval()); + } } }); @@ -155,15 +163,29 @@ void PeriodicRunnerImpl::PeriodicJobImpl::stop() { return; } + auto stopFuture = _stopPromise.getFuture(); + // Only join once if (lastExecStatus != ExecutionStatus::CANCELED) 
{ LOG(2) << "Stopping periodic job " << _job.name; _condvar.notify_one(); + + // Kill the client thread and its opCtx (if any) before joining. + // So long as `_job` returns upon receiving the kill signal, the following ensures liveness + // (i.e., this method will eventually return). + if (!stopFuture.isReady()) { + stdx::lock_guard lk(_mutex); + // Check if the client thread has already returned. + if (_client) { + _client->setKilled(); + } + } + _thread.join(); } - _stopPromise.getFuture().get(); + stopFuture.get(); } Milliseconds PeriodicRunnerImpl::PeriodicJobImpl::getPeriod() { diff --git a/src/mongo/util/periodic_runner_impl.h b/src/mongo/util/periodic_runner_impl.h index 4a89b4b6a05..369fb19882c 100644 --- a/src/mongo/util/periodic_runner_impl.h +++ b/src/mongo/util/periodic_runner_impl.h @@ -79,6 +79,7 @@ private: ClockSource* _clockSource; ServiceContext* _serviceContext; + Client* _client; stdx::thread _thread; SharedPromise<void> _stopPromise; diff --git a/src/mongo/util/periodic_runner_impl_test.cpp b/src/mongo/util/periodic_runner_impl_test.cpp index 4f3f8ee44b6..ec193afffc6 100644 --- a/src/mongo/util/periodic_runner_impl_test.cpp +++ b/src/mongo/util/periodic_runner_impl_test.cpp @@ -30,8 +30,10 @@ #include #include "mongo/base/error_codes.h" +#include "mongo/platform/atomic_word.h" #include "mongo/platform/basic.h" +#include "mongo/unittest/barrier.h" #include "mongo/unittest/unittest.h" #include "mongo/util/periodic_runner_impl.h" @@ -448,6 +450,43 @@ TEST_F(PeriodicRunnerImplTest, ChangingIntervalWorks) { tearDown(); } +TEST_F(PeriodicRunnerImplTest, StopProperlyInterruptsOpCtx) { + Milliseconds interval{5}; + unittest::Barrier barrier(2); + AtomicWord<bool> killed{false}; + + PeriodicRunner::PeriodicJob job( + "job", + [&barrier, &killed](Client* client) { + stdx::condition_variable cv; + auto mutex = MONGO_MAKE_LATCH(); + barrier.countDownAndWait(); + + try { + auto opCtx = client->makeOperationContext(); + stdx::unique_lock lk(mutex); + 
opCtx->waitForConditionOrInterrupt(cv, lk, [] { return false; }); + } catch (const ExceptionForCat<ErrorCategory::Interruption>& e) { + ASSERT_EQ(e.code(), ErrorCodes::ClientMarkedKilled); + killed.store(true); + return; + } + + MONGO_UNREACHABLE; + }, + interval); + + auto jobAnchor = runner().makeJob(std::move(job)); + jobAnchor.start(); + + barrier.countDownAndWait(); + + jobAnchor.stop(); + ASSERT(killed.load()); + + tearDown(); +} + TEST_F(PeriodicRunnerImplTest, ThrowsErrorOnceStopped) { auto jobAnchor = makeStoppedJob(); ASSERT_THROWS_CODE_AND_WHAT(jobAnchor.start(), -- cgit v1.2.1