summary refs log tree commit diff
diff options
context:
space:
mode:
author Amirsaman Memaripour <amirsaman.memaripour@mongodb.com> 2020-03-12 17:05:05 -0400
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2020-04-28 19:25:54 +0000
commit 19a8df607f40a3d31d145bef9255ed4e019d23c1 (patch)
tree e7167ba1e24721c5885a1eb48d8b89fa7deadd02
parent 988222a9ccdd2772b44c1791d308645a8f5209eb (diff)
download mongo-19a8df607f40a3d31d145bef9255ed4e019d23c1.tar.gz
SERVER-46842 Make PeriodicRunner interrupt blocked operations on stop
(cherry picked from commit ef75364ada70eaf4a096ed07adfeb3175abd719b)
-rw-r--r-- src/mongo/base/error_codes.err | 6
-rw-r--r-- src/mongo/db/client.cpp | 8
-rw-r--r-- src/mongo/db/client.h | 16
-rw-r--r-- src/mongo/db/operation_context.cpp | 15
-rw-r--r-- src/mongo/util/periodic_runner_impl.cpp | 88
-rw-r--r-- src/mongo/util/periodic_runner_impl.h | 1
-rw-r--r-- src/mongo/util/periodic_runner_impl_test.cpp | 39
7 files changed, 134 insertions, 39 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index fddae0b50e7..1c61be94d93 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -319,6 +319,7 @@ error_code("StaleConfig", 13388, extra="StaleConfigInfo");
error_code("DatabaseDifferCase", 13297);
error_code("OBSOLETE_PrepareConfigsFailed", 13104);
error_code("MergeStageNoMatchingDocument", 13113);
+error_code("ClientMarkedKilled", 46841);
# Group related errors into classes, can be checked against ErrorCodes::isXXXClassName methods.
error_class("NetworkError", ["HostUnreachable", "HostNotFound", "NetworkTimeout", "SocketException"])
@@ -329,7 +330,8 @@ error_class("Interruption", ["Interrupted",
"MaxTimeMSExpired",
"CursorKilled",
"LockTimeout",
- "ClientDisconnect"])
+ "ClientDisconnect",
+ "ClientMarkedKilled"])
# isNotMasterError() includes all codes that indicate that the node that received the request was
# not master at some point during command processing, regardless of whether some write may have
@@ -353,7 +355,7 @@ error_class("ShutdownError", ["ShutdownInProgress", "InterruptedAtShutdown"])
# isCancelationError() includes all codes that, when passed to a function as its parameter,
# indicates that it cannot be executed as normal and must abort its intended work.
-error_class("CancelationError", ["ShutdownInProgress", "InterruptedAtShutdown", "CallbackCanceled", "PeriodicJobIsStopped"])
+error_class("CancelationError", ["ShutdownInProgress", "InterruptedAtShutdown", "CallbackCanceled", "PeriodicJobIsStopped", "ClientMarkedKilled"])
error_class("ConnectionFatalMessageParseError", ["IllegalOpMsgFlag",
"TooManyDocumentSequences",
diff --git a/src/mongo/db/client.cpp b/src/mongo/db/client.cpp
index 8ed19bb98c9..41049f15405 100644
--- a/src/mongo/db/client.cpp
+++ b/src/mongo/db/client.cpp
@@ -164,6 +164,14 @@ void Client::setCurrent(ServiceContext::UniqueClient client) {
currentClient = std::move(client);
}
+void Client::setKilled() noexcept {  // Marks this client killed and kills its current operation, if any.
+ stdx::lock_guard<Client> lk(*this);  // The client lock is held for the flag update and the kill below.
+ _killed.store(true);  // Read back by getKilled(), so later interrupt checks on this client also fail.
+ if (_opCtx) {  // No operation is attached right now; the flag alone covers future ones.
+ _serviceContext->killOperation(lk, _opCtx, ErrorCodes::ClientMarkedKilled);
+ }
+}
+
ThreadClient::ThreadClient(ServiceContext* serviceContext)
: ThreadClient(getThreadName(), serviceContext, nullptr) {}
diff --git a/src/mongo/db/client.h b/src/mongo/db/client.h
index 75e828c63c3..2c06e0e1453 100644
--- a/src/mongo/db/client.h
+++ b/src/mongo/db/client.h
@@ -41,6 +41,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/service_context.h"
+#include "mongo/platform/atomic_word.h"
#include "mongo/platform/random.h"
#include "mongo/stdx/thread.h"
#include "mongo/transport/session.h"
@@ -229,6 +230,19 @@ public:
return _prng;
}
+ /**
+ * Marks this client as killed; a current OperationContext, if any, is killed with
+ * ClientMarkedKilled, and later ones see the flag via checkForInterruptNoAssert().
+ */
+ void setKilled() noexcept;
+
+ /**
+ * Returns the killed flag that setKilled() sets, read with a relaxed atomic load.
+ */
+ bool getKilled() const noexcept {
+ return _killed.loadRelaxed();
+ }
+
private:
friend class ServiceContext;
friend class ThreadClient;
@@ -258,6 +272,8 @@ private:
bool _systemOperationKillable = false;
PseudoRandom _prng;
+
+ AtomicWord<bool> _killed{false};
};
/**
diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp
index a88078acb5f..d5b8e11f8da 100644
--- a/src/mongo/db/operation_context.cpp
+++ b/src/mongo/db/operation_context.cpp
@@ -199,10 +199,17 @@ bool opShouldFail(Client* client, const BSONObj& failPointInfo) {
} // namespace
Status OperationContext::checkForInterruptNoAssert() noexcept {
- // TODO: Remove the MONGO_likely(getClient()) once all operation contexts are constructed with
- // clients.
- if (MONGO_likely(getClient() && getServiceContext()) &&
- getServiceContext()->getKillAllOperations() && !_isExecutingShutdown) {
+ // TODO: Remove the MONGO_likely(hasClientAndServiceContext) once all operation contexts are
+ // constructed with clients.
+ const auto hasClientAndServiceContext = getClient() && getServiceContext();
+
+ if (MONGO_likely(hasClientAndServiceContext) && getClient()->getKilled() &&
+ !_isExecutingShutdown) {
+ return Status(ErrorCodes::ClientMarkedKilled, "client has been killed");
+ }
+
+ if (MONGO_likely(hasClientAndServiceContext) && getServiceContext()->getKillAllOperations() &&
+ !_isExecutingShutdown) {
return Status(ErrorCodes::InterruptedAtShutdown, "interrupted at shutdown");
}
diff --git a/src/mongo/util/periodic_runner_impl.cpp b/src/mongo/util/periodic_runner_impl.cpp
index 88e83782957..06b1c63de2e 100644
--- a/src/mongo/util/periodic_runner_impl.cpp
+++ b/src/mongo/util/periodic_runner_impl.cpp
@@ -56,7 +56,7 @@ auto PeriodicRunnerImpl::makeJob(PeriodicJob job) -> JobAnchor {
PeriodicRunnerImpl::PeriodicJobImpl::PeriodicJobImpl(PeriodicJob job,
ClockSource* source,
ServiceContext* svc)
- : _job(std::move(job)), _clockSource(source), _serviceContext(svc) {}
+ : _job(std::move(job)), _clockSource(source), _serviceContext(svc), _client(nullptr) {}  // _client stays null until the job thread publishes its Client in _run().
void PeriodicRunnerImpl::PeriodicJobImpl::_run() {
{
@@ -70,46 +70,54 @@ void PeriodicRunnerImpl::PeriodicJobImpl::_run() {
auto [startPromise, startFuture] = makePromiseFuture<void>();
_thread = stdx::thread([this, startPromise = std::move(startPromise)]() mutable {
- auto guard = makeGuard([this] { _stopPromise.emplaceValue(); });
+ ON_BLOCK_EXIT([this] { _stopPromise.emplaceValue(); });
- Client::initThread(_job.name, _serviceContext, nullptr);
-
- // Let start() know we're running
+ ThreadClient client(_job.name, _serviceContext, nullptr);
{
- stdx::lock_guard lk(_mutex);
- _execStatus = PeriodicJobImpl::ExecutionStatus::RUNNING;
- }
- startPromise.emplaceValue();
-
- stdx::unique_lock<Latch> lk(_mutex);
- while (_execStatus != ExecutionStatus::CANCELED) {
- // Wait until it's unpaused or canceled
- _condvar.wait(lk, [&] { return _execStatus != ExecutionStatus::PAUSED; });
- if (_execStatus == ExecutionStatus::CANCELED) {
- return;
+ // This ensures client object is not destructed so long as others can access it.
+ ON_BLOCK_EXIT([this] {
+ stdx::lock_guard lk(_mutex);
+ _client = nullptr;
+ });
+
+ // Let start() know we're running
+ {
+ stdx::lock_guard lk(_mutex);
+ _client = client.get();
+ _execStatus = PeriodicJobImpl::ExecutionStatus::RUNNING;
}
+ startPromise.emplaceValue();
+
+ stdx::unique_lock<Latch> lk(_mutex);
+ while (_execStatus != ExecutionStatus::CANCELED) {
+ // Wait until it's unpaused or canceled
+ _condvar.wait(lk, [&] { return _execStatus != ExecutionStatus::PAUSED; });
+ if (_execStatus == ExecutionStatus::CANCELED) {
+ return;
+ }
- auto start = _clockSource->now();
+ auto start = _clockSource->now();
- // Unlock while job is running so we can pause/cancel concurrently
- lk.unlock();
- _job.job(Client::getCurrent());
- lk.lock();
+ // Unlock while job is running so we can pause/cancel concurrently
+ lk.unlock();
+ _job.job(client.get());
+ lk.lock();
- auto getDeadlineFromInterval = [&] { return start + _job.interval; };
+ auto getDeadlineFromInterval = [&] { return start + _job.interval; };
- do {
- auto deadline = getDeadlineFromInterval();
+ do {
+ auto deadline = getDeadlineFromInterval();
- if (_clockSource->waitForConditionUntil(_condvar, lk, deadline, [&] {
- return _execStatus == ExecutionStatus::CANCELED ||
- getDeadlineFromInterval() != deadline;
- })) {
- if (_execStatus == ExecutionStatus::CANCELED) {
- return;
+ if (_clockSource->waitForConditionUntil(_condvar, lk, deadline, [&] {
+ return _execStatus == ExecutionStatus::CANCELED ||
+ getDeadlineFromInterval() != deadline;
+ })) {
+ if (_execStatus == ExecutionStatus::CANCELED) {
+ return;
+ }
}
- }
- } while (_clockSource->now() < getDeadlineFromInterval());
+ } while (_clockSource->now() < getDeadlineFromInterval());
+ }
}
});
@@ -155,15 +163,29 @@ void PeriodicRunnerImpl::PeriodicJobImpl::stop() {
return;
}
+ auto stopFuture = _stopPromise.getFuture();
+
// Only join once
if (lastExecStatus != ExecutionStatus::CANCELED) {
LOG(2) << "Stopping periodic job " << _job.name;
_condvar.notify_one();
+
+ // Kill the client thread and its opCtx (if any) before joining.
+ // So long as `_job` returns upon receiving the kill signal, the following ensures liveness
+ // (i.e., this method will eventually return).
+ if (!stopFuture.isReady()) {
+ stdx::lock_guard<Latch> lk(_mutex);
+ // Check if the client thread has already returned.
+ if (_client) {
+ _client->setKilled();
+ }
+ }
+
_thread.join();
}
- _stopPromise.getFuture().get();
+ stopFuture.get();
}
Milliseconds PeriodicRunnerImpl::PeriodicJobImpl::getPeriod() {
diff --git a/src/mongo/util/periodic_runner_impl.h b/src/mongo/util/periodic_runner_impl.h
index 4a89b4b6a05..369fb19882c 100644
--- a/src/mongo/util/periodic_runner_impl.h
+++ b/src/mongo/util/periodic_runner_impl.h
@@ -79,6 +79,7 @@ private:
ClockSource* _clockSource;
ServiceContext* _serviceContext;
+ Client* _client;
stdx::thread _thread;
SharedPromise<void> _stopPromise;
diff --git a/src/mongo/util/periodic_runner_impl_test.cpp b/src/mongo/util/periodic_runner_impl_test.cpp
index 4f3f8ee44b6..ec193afffc6 100644
--- a/src/mongo/util/periodic_runner_impl_test.cpp
+++ b/src/mongo/util/periodic_runner_impl_test.cpp
@@ -30,8 +30,10 @@
#include <boost/optional.hpp>
#include "mongo/base/error_codes.h"
+#include "mongo/platform/atomic_word.h"
#include "mongo/platform/basic.h"
+#include "mongo/unittest/barrier.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/periodic_runner_impl.h"
@@ -448,6 +450,43 @@ TEST_F(PeriodicRunnerImplTest, ChangingIntervalWorks) {
tearDown();
}
+TEST_F(PeriodicRunnerImplTest, StopProperlyInterruptsOpCtx) {  // Checks that stop() interrupts a job blocked on its OperationContext.
+ Milliseconds interval{5};
+ unittest::Barrier barrier(2);  // Two parties: the job thread and this test thread.
+ AtomicWord<bool> killed{false};  // Set by the job once it observes the expected interruption.
+
+ PeriodicRunner::PeriodicJob job(
+ "job",
+ [&barrier, &killed](Client* client) {
+ stdx::condition_variable cv;
+ auto mutex = MONGO_MAKE_LATCH();
+ barrier.countDownAndWait();  // Rendezvous so the test calls stop() only after the job has started.
+
+ try {
+ auto opCtx = client->makeOperationContext();
+ stdx::unique_lock<Latch> lk(mutex);
+ opCtx->waitForConditionOrInterrupt(cv, lk, [] { return false; });  // The predicate is never true, so only an interruption ends this wait.
+ } catch (const ExceptionForCat<ErrorCategory::Interruption>& e) {
+ ASSERT_EQ(e.code(), ErrorCodes::ClientMarkedKilled);  // The interruption must come from Client::setKilled().
+ killed.store(true);
+ return;
+ }
+
+ MONGO_UNREACHABLE;  // Not expected to be reached: the wait predicate above is always false.
+ },
+ interval);
+
+ auto jobAnchor = runner().makeJob(std::move(job));
+ jobAnchor.start();
+
+ barrier.countDownAndWait();  // Wait until the job body is running before stopping it.
+
+ jobAnchor.stop();
+ ASSERT(killed.load());  // The job's catch block must have run by the time stop() returns.
+
+ tearDown();
+}
+
TEST_F(PeriodicRunnerImplTest, ThrowsErrorOnceStopped) {
auto jobAnchor = makeStoppedJob();
ASSERT_THROWS_CODE_AND_WHAT(jobAnchor.start(),