summaryrefslogtreecommitdiff
path: root/src/mongo/db/operation_context.cpp
diff options
context:
space:
mode:
authorAndy Schwerin <schwerin@mongodb.com>2016-05-31 14:49:47 -0400
committerAndy Schwerin <schwerin@mongodb.com>2016-07-13 17:37:02 -0400
commitd5985d3a661c45f1c952205f4b6d107c37fa034d (patch)
tree4281eaf54ebefc3359c1839dc7af04b1e9deebb9 /src/mongo/db/operation_context.cpp
parent3f8990345ec18fe2f0316859231c2424e4355b95 (diff)
downloadmongo-d5985d3a661c45f1c952205f4b6d107c37fa034d.tar.gz
SERVER-21004 Interruptible wait on condition variables with OperationContexts.
Diffstat (limited to 'src/mongo/db/operation_context.cpp')
-rw-r--r--src/mongo/db/operation_context.cpp172
1 files changed, 166 insertions, 6 deletions
diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp
index ae018c38d0e..58529dffedf 100644
--- a/src/mongo/db/operation_context.cpp
+++ b/src/mongo/db/operation_context.cpp
@@ -32,6 +32,7 @@
#include "mongo/db/operation_context.h"
+#include "mongo/bson/inline_decls.h"
#include "mongo/db/client.h"
#include "mongo/db/service_context.h"
#include "mongo/platform/random.h"
@@ -40,6 +41,7 @@
#include "mongo/util/clock_source.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
+#include "mongo/util/scopeguard.h"
#include "mongo/util/system_tick_source.h"
namespace mongo {
@@ -79,11 +81,6 @@ OperationContext::OperationContext(Client* client, unsigned int opId)
_elapsedTime(client ? client->getServiceContext()->getTickSource()
: SystemTickSource::get()) {}
-void OperationContext::markKilled(ErrorCodes::Error killCode) {
- invariant(killCode != ErrorCodes::OK);
- _killCode.compareAndSwap(ErrorCodes::OK, killCode);
-}
-
void OperationContext::setDeadlineAndMaxTime(Date_t when, Microseconds maxTime) {
invariant(!getClient()->isInDirectClient());
uassert(40120, "Illegal attempt to change operation deadline", !hasDeadline());
@@ -185,7 +182,7 @@ Status OperationContext::checkForInterruptNoAssert() {
}
if (hasDeadlineExpired()) {
- markKilled();
+ markKilled(ErrorCodes::ExceededTimeLimit);
return Status(ErrorCodes::ExceededTimeLimit, "operation exceeded time limit");
}
@@ -204,6 +201,169 @@ Status OperationContext::checkForInterruptNoAssert() {
return Status::OK();
}
+void OperationContext::waitForConditionOrInterrupt(stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m) {
+ uassertStatusOK(waitForConditionOrInterruptNoAssert(cv, m));
+}
+
+Status OperationContext::waitForConditionOrInterruptNoAssert(
+ stdx::condition_variable& cv, stdx::unique_lock<stdx::mutex>& m) noexcept {
+ auto status = waitForConditionOrInterruptNoAssertUntil(cv, m, Date_t::max());
+ if (!status.isOK()) {
+ return status.getStatus();
+ }
+ invariant(status.getValue() == stdx::cv_status::no_timeout);
+ return status.getStatus();
+}
+
+stdx::cv_status OperationContext::waitForConditionOrInterruptUntil(
+ stdx::condition_variable& cv, stdx::unique_lock<stdx::mutex>& m, Date_t deadline) {
+
+ return uassertStatusOK(waitForConditionOrInterruptNoAssertUntil(cv, m, deadline));
+}
+
+static NOINLINE_DECL stdx::cv_status cvWaitUntilWithClockSource(ClockSource* clockSource,
+ stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m,
+ Date_t deadline) {
+ if (deadline <= clockSource->now()) {
+ return stdx::cv_status::timeout;
+ }
+
+ struct AlarmInfo {
+ stdx::mutex controlMutex;
+ stdx::mutex* waitMutex;
+ stdx::condition_variable* waitCV;
+ stdx::cv_status cvWaitResult = stdx::cv_status::no_timeout;
+ };
+ auto alarmInfo = std::make_shared<AlarmInfo>();
+ alarmInfo->waitCV = &cv;
+ alarmInfo->waitMutex = m.mutex();
+ invariantOK(clockSource->setAlarm(deadline, [alarmInfo] {
+ stdx::lock_guard<stdx::mutex> controlLk(alarmInfo->controlMutex);
+ alarmInfo->cvWaitResult = stdx::cv_status::timeout;
+ if (!alarmInfo->waitMutex) {
+ return;
+ }
+ stdx::lock_guard<stdx::mutex> waitLk(*alarmInfo->waitMutex);
+ alarmInfo->waitCV->notify_all();
+ }));
+ cv.wait(m);
+ m.unlock();
+ stdx::lock_guard<stdx::mutex> controlLk(alarmInfo->controlMutex);
+ m.lock();
+ alarmInfo->waitMutex = nullptr;
+ alarmInfo->waitCV = nullptr;
+ return alarmInfo->cvWaitResult;
+}
+
+// Theory of operation for waitForConditionOrInterruptNoAssertUntil and markKilled:
+//
+// An operation indicates to potential killers that it is waiting on a condition variable by setting
+// _waitMutex and _waitCV, while holding the lock on its parent Client. It then unlocks its Client,
+// unblocking any killers, which are required to have locked the Client before calling markKilled.
+//
+// When _waitMutex and _waitCV are set, killers must lock _waitMutex before setting the _killCode,
+// and must signal _waitCV before releasing _waitMutex. Unfortunately, they must lock _waitMutex
+// without holding a lock on Client to avoid a deadlock with callers of
+// waitForConditionOrInterruptNoAssertUntil(). So, in the event that _waitMutex is set, the killer
+// increments _numKillers, drops the Client lock, acquires _waitMutex and then re-acquires the
+// Client lock. We know that the Client, its OperationContext and _waitMutex will remain valid
+// during this period because the caller of waitForConditionOrInterruptNoAssertUntil will not return
+// while _numKillers > 0 and will not return until it has itself reacquired _waitMutex. Instead,
+// that caller will keep waiting on _waitCV until _numKillers drops to 0.
+//
+// In essence, when _waitMutex is set, _killCode is guarded by _waitMutex and _waitCV, but when
+// _waitMutex is not set, it is guarded by the Client spinlock. Changing _waitMutex is itself
+// guarded by the Client spinlock and _numKillers.
+//
+// When _numKillers does drop to 0, the waiter will null out _waitMutex and _waitCV.
+//
+// This implementation adds a minimum of two spinlock acquire-release pairs to every condition
+// variable wait.
+StatusWith<stdx::cv_status> OperationContext::waitForConditionOrInterruptNoAssertUntil(
+ stdx::condition_variable& cv, stdx::unique_lock<stdx::mutex>& m, Date_t deadline) noexcept {
+ invariant(getClient());
+ {
+ stdx::lock_guard<Client> clientLock(*getClient());
+ invariant(!_waitMutex);
+ invariant(!_waitCV);
+ invariant(0 == _numKillers);
+
+ // This interrupt check must be done while holding the client lock, so as not to race with a
+ // concurrent caller of markKilled.
+ auto status = checkForInterruptNoAssert();
+ if (!status.isOK()) {
+ return status;
+ }
+ _waitMutex = m.mutex();
+ _waitCV = &cv;
+ }
+
+ if (hasDeadline()) {
+ deadline = std::min(deadline, getDeadline());
+ }
+
+ const auto waitStatus = [&] {
+ if (Date_t::max() == deadline) {
+ cv.wait(m);
+ return stdx::cv_status::no_timeout;
+ }
+ const auto clockSource = getServiceContext()->getPreciseClockSource();
+ if (clockSource->tracksSystemClock()) {
+ return cv.wait_until(m, deadline.toSystemTimePoint());
+ }
+
+ // The following cases only occur during testing, when the precise clock source is
+ // virtualized and does not track the system clock.
+ return cvWaitUntilWithClockSource(clockSource, cv, m, deadline);
+ }();
+
+ // Continue waiting on cv until no other thread is attempting to kill this one.
+ cv.wait(m, [this] {
+ stdx::lock_guard<Client> clientLock(*getClient());
+ if (0 == _numKillers) {
+ _waitMutex = nullptr;
+ _waitCV = nullptr;
+ return true;
+ }
+ return false;
+ });
+
+ auto status = checkForInterruptNoAssert();
+ if (!status.isOK()) {
+ return status;
+ }
+ if (hasDeadline() && waitStatus == stdx::cv_status::timeout && deadline == getDeadline()) {
+ // It's possible that the system clock used in stdx::condition_variable::wait_until
+ // is slightly ahead of the FastClock used in checkForInterrupt. In this case,
+ // we treat the operation as though it has exceeded its time limit, just as if the
+ // FastClock and system clock had agreed.
+ markKilled(ErrorCodes::ExceededTimeLimit);
+ return Status(ErrorCodes::ExceededTimeLimit, "operation exceeded time limit");
+ }
+ return waitStatus;
+}
+
+void OperationContext::markKilled(ErrorCodes::Error killCode) {
+ invariant(killCode != ErrorCodes::OK);
+ stdx::unique_lock<stdx::mutex> lkWaitMutex;
+ if (_waitMutex) {
+ invariant(++_numKillers > 0);
+ getClient()->unlock();
+ ON_BLOCK_EXIT([this]() noexcept {
+ getClient()->lock();
+ invariant(--_numKillers >= 0);
+ });
+ lkWaitMutex = stdx::unique_lock<stdx::mutex>{*_waitMutex};
+ }
+ _killCode.compareAndSwap(ErrorCodes::OK, killCode);
+ if (lkWaitMutex && _numKillers == 0) {
+ invariant(_waitCV);
+ _waitCV->notify_all();
+ }
+}
+
RecoveryUnit* OperationContext::releaseRecoveryUnit() {
return _recoveryUnit.release();
}