diff options
author | Andy Schwerin <schwerin@mongodb.com> | 2016-05-31 14:49:47 -0400 |
---|---|---|
committer | Andy Schwerin <schwerin@mongodb.com> | 2016-07-13 17:37:02 -0400 |
commit | d5985d3a661c45f1c952205f4b6d107c37fa034d (patch) | |
tree | 4281eaf54ebefc3359c1839dc7af04b1e9deebb9 /src/mongo/db/operation_context.cpp | |
parent | 3f8990345ec18fe2f0316859231c2424e4355b95 (diff) | |
download | mongo-d5985d3a661c45f1c952205f4b6d107c37fa034d.tar.gz |
SERVER-21004 Interruptible wait on condition variables with OperationContexts.
Diffstat (limited to 'src/mongo/db/operation_context.cpp')
-rw-r--r-- | src/mongo/db/operation_context.cpp | 172 |
1 files changed, 166 insertions, 6 deletions
diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp index ae018c38d0e..58529dffedf 100644 --- a/src/mongo/db/operation_context.cpp +++ b/src/mongo/db/operation_context.cpp @@ -32,6 +32,7 @@ #include "mongo/db/operation_context.h" +#include "mongo/bson/inline_decls.h" #include "mongo/db/client.h" #include "mongo/db/service_context.h" #include "mongo/platform/random.h" @@ -40,6 +41,7 @@ #include "mongo/util/clock_source.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" +#include "mongo/util/scopeguard.h" #include "mongo/util/system_tick_source.h" namespace mongo { @@ -79,11 +81,6 @@ OperationContext::OperationContext(Client* client, unsigned int opId) _elapsedTime(client ? client->getServiceContext()->getTickSource() : SystemTickSource::get()) {} -void OperationContext::markKilled(ErrorCodes::Error killCode) { - invariant(killCode != ErrorCodes::OK); - _killCode.compareAndSwap(ErrorCodes::OK, killCode); -} - void OperationContext::setDeadlineAndMaxTime(Date_t when, Microseconds maxTime) { invariant(!getClient()->isInDirectClient()); uassert(40120, "Illegal attempt to change operation deadline", !hasDeadline()); @@ -185,7 +182,7 @@ Status OperationContext::checkForInterruptNoAssert() { } if (hasDeadlineExpired()) { - markKilled(); + markKilled(ErrorCodes::ExceededTimeLimit); return Status(ErrorCodes::ExceededTimeLimit, "operation exceeded time limit"); } @@ -204,6 +201,169 @@ Status OperationContext::checkForInterruptNoAssert() { return Status::OK(); } +void OperationContext::waitForConditionOrInterrupt(stdx::condition_variable& cv, + stdx::unique_lock<stdx::mutex>& m) { + uassertStatusOK(waitForConditionOrInterruptNoAssert(cv, m)); +} + +Status OperationContext::waitForConditionOrInterruptNoAssert( + stdx::condition_variable& cv, stdx::unique_lock<stdx::mutex>& m) noexcept { + auto status = waitForConditionOrInterruptNoAssertUntil(cv, m, Date_t::max()); + if (!status.isOK()) { + return status.getStatus(); + } + invariant(status.getValue() == stdx::cv_status::no_timeout); + return status.getStatus(); +} + +stdx::cv_status OperationContext::waitForConditionOrInterruptUntil( + stdx::condition_variable& cv, stdx::unique_lock<stdx::mutex>& m, Date_t deadline) { + + return uassertStatusOK(waitForConditionOrInterruptNoAssertUntil(cv, m, deadline)); +} + +static NOINLINE_DECL stdx::cv_status cvWaitUntilWithClockSource(ClockSource* clockSource, + stdx::condition_variable& cv, + stdx::unique_lock<stdx::mutex>& m, + Date_t deadline) { + if (deadline <= clockSource->now()) { + return stdx::cv_status::timeout; + } + + struct AlarmInfo { + stdx::mutex controlMutex; + stdx::mutex* waitMutex; + stdx::condition_variable* waitCV; + stdx::cv_status cvWaitResult = stdx::cv_status::no_timeout; + }; + auto alarmInfo = std::make_shared<AlarmInfo>(); + alarmInfo->waitCV = &cv; + alarmInfo->waitMutex = m.mutex(); + invariantOK(clockSource->setAlarm(deadline, [alarmInfo] { + stdx::lock_guard<stdx::mutex> controlLk(alarmInfo->controlMutex); + alarmInfo->cvWaitResult = stdx::cv_status::timeout; + if (!alarmInfo->waitMutex) { + return; + } + stdx::lock_guard<stdx::mutex> waitLk(*alarmInfo->waitMutex); + alarmInfo->waitCV->notify_all(); + })); + cv.wait(m); + m.unlock(); + stdx::lock_guard<stdx::mutex> controlLk(alarmInfo->controlMutex); + m.lock(); + alarmInfo->waitMutex = nullptr; + alarmInfo->waitCV = nullptr; + return alarmInfo->cvWaitResult; +} + +// Theory of operation for waitForConditionOrInterruptNoAssertUntil and markKilled: +// +// An operation indicates to potential killers that it is waiting on a condition variable by setting +// _waitMutex and _waitCV, while holding the lock on its parent Client. It then unlocks its Client, +// unblocking any killers, which are required to have locked the Client before calling markKilled. +// +// When _waitMutex and _waitCV are set, killers must lock _waitMutex before setting the _killCode, +// and must signal _waitCV before releasing _waitMutex. Unfortunately, they must lock _waitMutex +// without holding a lock on Client to avoid a deadlock with callers of +// waitForConditionOrInterruptNoAssertUntil(). So, in the event that _waitMutex is set, the killer +// increments _numKillers, drops the Client lock, acquires _waitMutex and then re-acquires the +// Client lock. We know that the Client, its OperationContext and _waitMutex will remain valid +// during this period because the caller of waitForConditionOrInterruptNoAssertUntil will not return +// while _numKillers > 0 and will not return until it has itself reacquired _waitMutex. Instead, +// that caller will keep waiting on _waitCV until _numKillers drops to 0. +// +// In essence, when _waitMutex is set, _killCode is guarded by _waitMutex and _waitCV, but when +// _waitMutex is not set, it is guarded by the Client spinlock. Changing _waitMutex is itself +// guarded by the Client spinlock and _numKillers. +// +// When _numKillers does drop to 0, the waiter will null out _waitMutex and _waitCV. +// +// This implementation adds a minimum of two spinlock acquire-release pairs to every condition +// variable wait. +StatusWith<stdx::cv_status> OperationContext::waitForConditionOrInterruptNoAssertUntil( + stdx::condition_variable& cv, stdx::unique_lock<stdx::mutex>& m, Date_t deadline) noexcept { + invariant(getClient()); + { + stdx::lock_guard<Client> clientLock(*getClient()); + invariant(!_waitMutex); + invariant(!_waitCV); + invariant(0 == _numKillers); + + // This interrupt check must be done while holding the client lock, so as not to race with a + // concurrent caller of markKilled. + auto status = checkForInterruptNoAssert(); + if (!status.isOK()) { + return status; + } + _waitMutex = m.mutex(); + _waitCV = &cv; + } + + if (hasDeadline()) { + deadline = std::min(deadline, getDeadline()); + } + + const auto waitStatus = [&] { + if (Date_t::max() == deadline) { + cv.wait(m); + return stdx::cv_status::no_timeout; + } + const auto clockSource = getServiceContext()->getPreciseClockSource(); + if (clockSource->tracksSystemClock()) { + return cv.wait_until(m, deadline.toSystemTimePoint()); + } + + // The following cases only occur during testing, when the precise clock source is + // virtualized and does not track the system clock. + return cvWaitUntilWithClockSource(clockSource, cv, m, deadline); + }(); + + // Continue waiting on cv until no other thread is attempting to kill this one. + cv.wait(m, [this] { + stdx::lock_guard<Client> clientLock(*getClient()); + if (0 == _numKillers) { + _waitMutex = nullptr; + _waitCV = nullptr; + return true; + } + return false; + }); + + auto status = checkForInterruptNoAssert(); + if (!status.isOK()) { + return status; + } + if (hasDeadline() && waitStatus == stdx::cv_status::timeout && deadline == getDeadline()) { + // It's possible that the system clock used in stdx::condition_variable::wait_until + // is slightly ahead of the FastClock used in checkForInterrupt. In this case, + // we treat the operation as though it has exceeded its time limit, just as if the + // FastClock and system clock had agreed. + markKilled(ErrorCodes::ExceededTimeLimit); + return Status(ErrorCodes::ExceededTimeLimit, "operation exceeded time limit"); + } + return waitStatus; +} + +void OperationContext::markKilled(ErrorCodes::Error killCode) { + invariant(killCode != ErrorCodes::OK); + stdx::unique_lock<stdx::mutex> lkWaitMutex; + if (_waitMutex) { + invariant(++_numKillers > 0); + getClient()->unlock(); + ON_BLOCK_EXIT([this]() noexcept { + getClient()->lock(); + invariant(--_numKillers >= 0); + }); + lkWaitMutex = stdx::unique_lock<stdx::mutex>{*_waitMutex}; + } + _killCode.compareAndSwap(ErrorCodes::OK, killCode); + if (lkWaitMutex && _numKillers == 0) { + invariant(_waitCV); + _waitCV->notify_all(); + } +} + RecoveryUnit* OperationContext::releaseRecoveryUnit() { return _recoveryUnit.release(); } |