diff options
author | Andy Schwerin <schwerin@mongodb.com> | 2016-05-31 14:49:47 -0400 |
---|---|---|
committer | Andy Schwerin <schwerin@mongodb.com> | 2016-07-13 17:37:02 -0400 |
commit | d5985d3a661c45f1c952205f4b6d107c37fa034d (patch) | |
tree | 4281eaf54ebefc3359c1839dc7af04b1e9deebb9 /src/mongo/db | |
parent | 3f8990345ec18fe2f0316859231c2424e4355b95 (diff) | |
download | mongo-d5985d3a661c45f1c952205f4b6d107c37fa034d.tar.gz |
SERVER-21004 Interruptible wait on condition variables with OperationContexts.
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/client.h | 2 | ||||
-rw-r--r-- | src/mongo/db/operation_context.cpp | 172 | ||||
-rw-r--r-- | src/mongo/db/operation_context.h | 104 | ||||
-rw-r--r-- | src/mongo/db/operation_context_test.cpp | 270 |
5 files changed, 528 insertions, 21 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index e76cb918b17..86575fed86e 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -174,6 +174,7 @@ env.CppUnitTest( 'service_context', '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', '$BUILD_DIR/mongo/db/service_context_noop_init', + '$BUILD_DIR/mongo/unittest/concurrency', '$BUILD_DIR/mongo/util/clock_source_mock', ], ) diff --git a/src/mongo/db/client.h b/src/mongo/db/client.h index a7523981e47..ca9a159d579 100644 --- a/src/mongo/db/client.h +++ b/src/mongo/db/client.h @@ -176,7 +176,7 @@ private: const ConnectionId _connectionId; // Protects the contents of the Client (such as changing the OperationContext, etc) - mutable SpinLock _lock; + SpinLock _lock; // Whether this client is running as DBDirectClient bool _inDirectClient = false; diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp index ae018c38d0e..58529dffedf 100644 --- a/src/mongo/db/operation_context.cpp +++ b/src/mongo/db/operation_context.cpp @@ -32,6 +32,7 @@ #include "mongo/db/operation_context.h" +#include "mongo/bson/inline_decls.h" #include "mongo/db/client.h" #include "mongo/db/service_context.h" #include "mongo/platform/random.h" @@ -40,6 +41,7 @@ #include "mongo/util/clock_source.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" +#include "mongo/util/scopeguard.h" #include "mongo/util/system_tick_source.h" namespace mongo { @@ -79,11 +81,6 @@ OperationContext::OperationContext(Client* client, unsigned int opId) _elapsedTime(client ? client->getServiceContext()->getTickSource() : SystemTickSource::get()) {} -void OperationContext::markKilled(ErrorCodes::Error killCode) { - invariant(killCode != ErrorCodes::OK); - _killCode.compareAndSwap(ErrorCodes::OK, killCode); -} - void OperationContext::setDeadlineAndMaxTime(Date_t when, Microseconds maxTime) { invariant(!getClient()->isInDirectClient()); uassert(40120, "Illegal attempt to change operation deadline", !hasDeadline()); @@ -185,7 +182,7 @@ Status OperationContext::checkForInterruptNoAssert() { } if (hasDeadlineExpired()) { - markKilled(); + markKilled(ErrorCodes::ExceededTimeLimit); return Status(ErrorCodes::ExceededTimeLimit, "operation exceeded time limit"); } @@ -204,6 +201,169 @@ Status OperationContext::checkForInterruptNoAssert() { return Status::OK(); } +void OperationContext::waitForConditionOrInterrupt(stdx::condition_variable& cv, + stdx::unique_lock<stdx::mutex>& m) { + uassertStatusOK(waitForConditionOrInterruptNoAssert(cv, m)); +} + +Status OperationContext::waitForConditionOrInterruptNoAssert( + stdx::condition_variable& cv, stdx::unique_lock<stdx::mutex>& m) noexcept { + auto status = waitForConditionOrInterruptNoAssertUntil(cv, m, Date_t::max()); + if (!status.isOK()) { + return status.getStatus(); + } + invariant(status.getValue() == stdx::cv_status::no_timeout); + return status.getStatus(); +} + +stdx::cv_status OperationContext::waitForConditionOrInterruptUntil( + stdx::condition_variable& cv, stdx::unique_lock<stdx::mutex>& m, Date_t deadline) { + + return uassertStatusOK(waitForConditionOrInterruptNoAssertUntil(cv, m, deadline)); +} + +static NOINLINE_DECL stdx::cv_status cvWaitUntilWithClockSource(ClockSource* clockSource, + stdx::condition_variable& cv, + stdx::unique_lock<stdx::mutex>& m, + Date_t deadline) { + if (deadline <= clockSource->now()) { + return stdx::cv_status::timeout; + } + + struct AlarmInfo { + stdx::mutex controlMutex; + stdx::mutex* waitMutex; + stdx::condition_variable* waitCV; + stdx::cv_status cvWaitResult = stdx::cv_status::no_timeout; + }; + auto alarmInfo = std::make_shared<AlarmInfo>(); + alarmInfo->waitCV = &cv; + alarmInfo->waitMutex = m.mutex(); + invariantOK(clockSource->setAlarm(deadline, [alarmInfo] { + stdx::lock_guard<stdx::mutex> controlLk(alarmInfo->controlMutex); + alarmInfo->cvWaitResult = stdx::cv_status::timeout; + if (!alarmInfo->waitMutex) { + return; + } + stdx::lock_guard<stdx::mutex> waitLk(*alarmInfo->waitMutex); + alarmInfo->waitCV->notify_all(); + })); + cv.wait(m); + m.unlock(); + stdx::lock_guard<stdx::mutex> controlLk(alarmInfo->controlMutex); + m.lock(); + alarmInfo->waitMutex = nullptr; + alarmInfo->waitCV = nullptr; + return alarmInfo->cvWaitResult; +} + +// Theory of operation for waitForConditionOrInterruptNoAssertUntil and markKilled: +// +// An operation indicates to potential killers that it is waiting on a condition variable by setting +// _waitMutex and _waitCV, while holding the lock on its parent Client. It then unlocks its Client, +// unblocking any killers, which are required to have locked the Client before calling markKilled. +// +// When _waitMutex and _waitCV are set, killers must lock _waitMutex before setting the _killCode, +// and must signal _waitCV before releasing _waitMutex. Unfortunately, they must lock _waitMutex +// without holding a lock on Client to avoid a deadlock with callers of +// waitForConditionOrInterruptNoAssertUntil(). So, in the event that _waitMutex is set, the killer +// increments _numKillers, drops the Client lock, acquires _waitMutex and then re-acquires the +// Client lock. We know that the Client, its OperationContext and _waitMutex will remain valid +// during this period because the caller of waitForConditionOrInterruptNoAssertUntil will not return +// while _numKillers > 0 and will not return until it has itself reacquired _waitMutex. Instead, +// that caller will keep waiting on _waitCV until _numKillers drops to 0. +// +// In essence, when _waitMutex is set, _killCode is guarded by _waitMutex and _waitCV, but when +// _waitMutex is not set, it is guarded by the Client spinlock. Changing _waitMutex is itself +// guarded by the Client spinlock and _numKillers. +// +// When _numKillers does drop to 0, the waiter will null out _waitMutex and _waitCV. +// +// This implementation adds a minimum of two spinlock acquire-release pairs to every condition +// variable wait. +StatusWith<stdx::cv_status> OperationContext::waitForConditionOrInterruptNoAssertUntil( + stdx::condition_variable& cv, stdx::unique_lock<stdx::mutex>& m, Date_t deadline) noexcept { + invariant(getClient()); + { + stdx::lock_guard<Client> clientLock(*getClient()); + invariant(!_waitMutex); + invariant(!_waitCV); + invariant(0 == _numKillers); + + // This interrupt check must be done while holding the client lock, so as not to race with a + // concurrent caller of markKilled. + auto status = checkForInterruptNoAssert(); + if (!status.isOK()) { + return status; + } + _waitMutex = m.mutex(); + _waitCV = &cv; + } + + if (hasDeadline()) { + deadline = std::min(deadline, getDeadline()); + } + + const auto waitStatus = [&] { + if (Date_t::max() == deadline) { + cv.wait(m); + return stdx::cv_status::no_timeout; + } + const auto clockSource = getServiceContext()->getPreciseClockSource(); + if (clockSource->tracksSystemClock()) { + return cv.wait_until(m, deadline.toSystemTimePoint()); + } + + // The following cases only occur during testing, when the precise clock source is + // virtualized and does not track the system clock. + return cvWaitUntilWithClockSource(clockSource, cv, m, deadline); + }(); + + // Continue waiting on cv until no other thread is attempting to kill this one. + cv.wait(m, [this] { + stdx::lock_guard<Client> clientLock(*getClient()); + if (0 == _numKillers) { + _waitMutex = nullptr; + _waitCV = nullptr; + return true; + } + return false; + }); + + auto status = checkForInterruptNoAssert(); + if (!status.isOK()) { + return status; + } + if (hasDeadline() && waitStatus == stdx::cv_status::timeout && deadline == getDeadline()) { + // It's possible that the system clock used in stdx::condition_variable::wait_until + // is slightly ahead of the FastClock used in checkForInterrupt. In this case, + // we treat the operation as though it has exceeded its time limit, just as if the + // FastClock and system clock had agreed. + markKilled(ErrorCodes::ExceededTimeLimit); + return Status(ErrorCodes::ExceededTimeLimit, "operation exceeded time limit"); + } + return waitStatus; +} + +void OperationContext::markKilled(ErrorCodes::Error killCode) { + invariant(killCode != ErrorCodes::OK); + stdx::unique_lock<stdx::mutex> lkWaitMutex; + if (_waitMutex) { + invariant(++_numKillers > 0); + getClient()->unlock(); + ON_BLOCK_EXIT([this]() noexcept { + getClient()->lock(); + invariant(--_numKillers >= 0); + }); + lkWaitMutex = stdx::unique_lock<stdx::mutex>{*_waitMutex}; + } + _killCode.compareAndSwap(ErrorCodes::OK, killCode); + if (lkWaitMutex && _numKillers == 0) { + invariant(_waitCV); + _waitCV->notify_all(); + } +} + RecoveryUnit* OperationContext::releaseRecoveryUnit() { return _recoveryUnit.release(); } diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h index 0035bfa70d4..aadb6c9f981 100644 --- a/src/mongo/db/operation_context.h +++ b/src/mongo/db/operation_context.h @@ -38,6 +38,8 @@ #include "mongo/db/storage/storage_options.h" #include "mongo/db/write_concern_options.h" #include "mongo/platform/atomic_word.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/mutex.h" #include "mongo/util/decorable.h" #include "mongo/util/time_support.h" #include "mongo/util/timer.h" @@ -126,10 +128,8 @@ public: */ std::unique_ptr<Locker> releaseLockState(); - // --- operation level info? --- - /** - * Raises a UserAssertion if this operation is in a killed state. + * Raises a UserException if this operation is in a killed state. */ void checkForInterrupt(); @@ -139,6 +139,76 @@ public: Status checkForInterruptNoAssert(); /** + * Waits for either the condition "cv" to be signaled, this operation to be interrupted, or the + * deadline on this operation to expire. In the event of interruption or operation deadline + * expiration, raises a UserException with an error code indicating the interruption type. + */ + void waitForConditionOrInterrupt(stdx::condition_variable& cv, + stdx::unique_lock<stdx::mutex>& m); + + /** + * Waits on condition "cv" for "pred" until "pred" returns true, or this operation + * is interrupted or its deadline expires. Throws a DBException for interruption and + * deadline expiration. + */ + template <typename Pred> + void waitForConditionOrInterrupt(stdx::condition_variable& cv, + stdx::unique_lock<stdx::mutex>& m, + Pred pred) { + while (!pred()) { + waitForConditionOrInterrupt(cv, m); + } + } + + /** + * Same as waitForConditionOrInterrupt, except returns a Status instead of throwing + * a DBException to report interruption. + */ + Status waitForConditionOrInterruptNoAssert(stdx::condition_variable& cv, + stdx::unique_lock<stdx::mutex>& m) noexcept; + + /** + * Waits for condition "cv" to be signaled, or for the given "deadline" to expire, or + * for the operation to be interrupted, or for the operation's own deadline to expire. + * + * If the operation deadline expires or the operation is interrupted, throws a DBException. If + * the given "deadline" expires, returns cv_status::timeout. Otherwise, returns + * cv_status::no_timeout. + */ + stdx::cv_status waitForConditionOrInterruptUntil(stdx::condition_variable& cv, + stdx::unique_lock<stdx::mutex>& m, + Date_t deadline); + + /** + * Waits on condition "cv" for "pred" until "pred" returns true, or the given "deadline" + * expires, or this operation is interrupted, or this operation's own deadline expires. + * + * + * If the operation deadline expires or the operation is interrupted, throws a DBException. If + * the given "deadline" expires, returns cv_status::timeout. Otherwise, returns + * cv_status::no_timeout indicating that "pred" finally returned true. + */ + template <typename Pred> + stdx::cv_status waitForConditionOrInterruptUntil(stdx::condition_variable& cv, + stdx::unique_lock<stdx::mutex>& m, + Date_t deadline, + Pred pred) { + while (!pred()) { + if (stdx::cv_status::timeout == waitForConditionOrInterruptUntil(cv, m, deadline)) { + return stdx::cv_status::timeout; + } + } + return stdx::cv_status::no_timeout; + } + + /** + * Same as waitForConditionOrInterruptUntil, except returns StatusWith<stdx::cv_status> and + * non-ok status indicates the error instead of a DBException. + */ + StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil( + stdx::condition_variable& cv, stdx::unique_lock<stdx::mutex>& m, Date_t deadline) noexcept; + + /** * Delegates to CurOp, but is included here to break dependencies. * Caller does not own the pointer. * @@ -200,10 +270,11 @@ public: * checkForInterruptNoAssert by the thread executing the operation will start returning the * specified error code. * - * If multiple threads kill the same operation with different codes, only the first code will - * be preserved. + * If multiple threads kill the same operation with different codes, only the first code + * will be preserved. * - * May be called by any thread that has locked the Client owning this operation context. + * May be called by any thread that has locked the Client owning this operation context, or + * by the thread executing this on behalf of this OperationContext. */ void markKilled(ErrorCodes::Error killCode = ErrorCodes::Interrupted); @@ -272,13 +343,12 @@ public: return _deadline; } - // - // Legacy "max time" methods for controlling operation deadlines. - // - /** * Returns the number of microseconds remaining for this operation's time limit, or the * special value Microseconds::max() if the operation has no time limit. + * + * This is a legacy "max time" method for controlling operation deadlines. Prefer not to use it + * in new code. */ Microseconds getRemainingMaxTimeMicros() const; @@ -312,6 +382,20 @@ private: // once from OK to some kill code. AtomicWord<ErrorCodes::Error> _killCode{ErrorCodes::OK}; + + // If non-null, _waitMutex and _waitCV are the (mutex, condition variable) pair that the + // operation is currently waiting on inside a call to waitForConditionOrInterrupt...(). + // All access guarded by the Client's lock. + stdx::mutex* _waitMutex = nullptr; + stdx::condition_variable* _waitCV = nullptr; + + // If _waitMutex and _waitCV are non-null, this is the number of threads in a call to markKilled + // actively attempting to kill the operation. If this value is non-zero, the operation is inside + // waitForConditionOrInterrupt...() and must stay there until _numKillers reaches 0. + // + // All access guarded by the Client's lock. + int _numKillers = 0; + WriteConcernOptions _writeConcern; Date_t _deadline = diff --git a/src/mongo/db/operation_context_test.cpp b/src/mongo/db/operation_context_test.cpp index cae366b845f..a592c98b03f 100644 --- a/src/mongo/db/operation_context_test.cpp +++ b/src/mongo/db/operation_context_test.cpp @@ -32,7 +32,10 @@ #include "mongo/db/operation_context.h" #include "mongo/db/service_context.h" #include "mongo/db/service_context_noop.h" +#include "mongo/stdx/future.h" #include "mongo/stdx/memory.h" +#include "mongo/stdx/thread.h" +#include "mongo/unittest/barrier.h" #include "mongo/unittest/unittest.h" #include "mongo/util/clock_source_mock.h" #include "mongo/util/tick_source_mock.h" @@ -42,18 +45,41 @@ namespace mongo { namespace { +std::ostream& operator<<(std::ostream& os, stdx::cv_status cvStatus) { + switch (cvStatus) { + case stdx::cv_status::timeout: + return os << "timeout"; + case stdx::cv_status::no_timeout: + return os << "no_timeout"; + default: + MONGO_UNREACHABLE; + } +} + +std::ostream& operator<<(std::ostream& os, stdx::future_status futureStatus) { + switch (futureStatus) { + case stdx::future_status::ready: + return os << "ready"; + case stdx::future_status::deferred: + return os << "deferred"; + case stdx::future_status::timeout: + return os << "timeout"; + default: + MONGO_UNREACHABLE; + } +} + class OperationDeadlineTests : public unittest::Test { public: void setUp() { - auto uniqueMockClock = stdx::make_unique<ClockSourceMock>(); - mockClock = uniqueMockClock.get(); service = stdx::make_unique<ServiceContextNoop>(); - service->setFastClockSource(std::move(uniqueMockClock)); + service->setFastClockSource(stdx::make_unique<SharedClockSourceAdapter>(mockClock)); + service->setPreciseClockSource(stdx::make_unique<SharedClockSourceAdapter>(mockClock)); service->setTickSource(stdx::make_unique<TickSourceMock>()); client = service->makeClient("OperationDeadlineTest"); } - ClockSourceMock* mockClock; + const std::shared_ptr<ClockSourceMock> mockClock = std::make_shared<ClockSourceMock>(); std::unique_ptr<ServiceContext> service; ServiceContext::UniqueClient client; }; @@ -124,6 +150,242 @@ TEST_F(OperationDeadlineTests, VeryLargeRelativeDeadlinesNanoseconds) { txn->getDeadline()); } +TEST_F(OperationDeadlineTests, WaitForMaxTimeExpiredCV) { + auto txn = client->makeOperationContext(); + txn->setDeadlineByDate(mockClock->now()); + stdx::mutex m; + stdx::condition_variable cv; + stdx::unique_lock<stdx::mutex> lk(m); + ASSERT_EQ(ErrorCodes::ExceededTimeLimit, txn->waitForConditionOrInterruptNoAssert(cv, lk)); +} + +TEST_F(OperationDeadlineTests, WaitForMaxTimeExpiredCVWithWaitUntilSet) { + auto txn = client->makeOperationContext(); + txn->setDeadlineByDate(mockClock->now()); + stdx::mutex m; + stdx::condition_variable cv; + stdx::unique_lock<stdx::mutex> lk(m); + ASSERT_EQ( + ErrorCodes::ExceededTimeLimit, + txn->waitForConditionOrInterruptNoAssertUntil(cv, lk, mockClock->now() + Seconds{10})); +} + +TEST_F(OperationDeadlineTests, WaitForKilledOpCV) { + auto txn = client->makeOperationContext(); + txn->markKilled(); + stdx::mutex m; + stdx::condition_variable cv; + stdx::unique_lock<stdx::mutex> lk(m); + ASSERT_EQ(ErrorCodes::Interrupted, txn->waitForConditionOrInterruptNoAssert(cv, lk)); +} + +TEST_F(OperationDeadlineTests, WaitForUntilExpiredCV) { + auto txn = client->makeOperationContext(); + stdx::mutex m; + stdx::condition_variable cv; + stdx::unique_lock<stdx::mutex> lk(m); + ASSERT_EQ(stdx::cv_status::timeout, + unittest::assertGet( + txn->waitForConditionOrInterruptNoAssertUntil(cv, lk, mockClock->now()))); +} + +TEST_F(OperationDeadlineTests, WaitForUntilExpiredCVWithMaxTimeSet) { + auto txn = client->makeOperationContext(); + txn->setDeadlineByDate(mockClock->now() + Seconds{10}); + stdx::mutex m; + stdx::condition_variable cv; + stdx::unique_lock<stdx::mutex> lk(m); + ASSERT_EQ(stdx::cv_status::timeout, + unittest::assertGet( + txn->waitForConditionOrInterruptNoAssertUntil(cv, lk, mockClock->now()))); +} + +TEST_F(OperationDeadlineTests, DuringWaitMaxTimeExpirationDominatesUntilExpiration) { + auto txn = client->makeOperationContext(); + txn->setDeadlineByDate(mockClock->now()); + stdx::mutex m; + stdx::condition_variable cv; + stdx::unique_lock<stdx::mutex> lk(m); + ASSERT_EQ(ErrorCodes::ExceededTimeLimit, + txn->waitForConditionOrInterruptNoAssertUntil(cv, lk, mockClock->now())); +} + +class ThreadedOperationDeadlineTests : public OperationDeadlineTests { +public: + struct WaitTestState { + void signal() { + stdx::lock_guard<stdx::mutex> lk(mutex); + invariant(!isSignaled); + isSignaled = true; + cv.notify_all(); + } + + stdx::mutex mutex; + stdx::condition_variable cv; + bool isSignaled = false; + }; + + stdx::future<stdx::cv_status> startWaiterWithUntilAndMaxTime(OperationContext* txn, + WaitTestState* state, + Date_t until, + Date_t maxTime) { + + auto barrier = std::make_shared<unittest::Barrier>(2); + auto task = stdx::packaged_task<stdx::cv_status()>([=] { + if (maxTime < Date_t::max()) { + txn->setDeadlineByDate(maxTime); + } + auto predicate = [state] { return state->isSignaled; }; + stdx::unique_lock<stdx::mutex> lk(state->mutex); + barrier->countDownAndWait(); + if (until < Date_t::max()) { + return txn->waitForConditionOrInterruptUntil(state->cv, lk, until, predicate); + } else { + txn->waitForConditionOrInterrupt(state->cv, lk, predicate); + return stdx::cv_status::no_timeout; + } + }); + auto result = task.get_future(); + stdx::thread(std::move(task)).detach(); + barrier->countDownAndWait(); + + // Now we know that the waiter task must own the mutex, because it does not signal the + // barrier until it does. + stdx::lock_guard<stdx::mutex> lk(state->mutex); + + // Assuming that txn has not already been interrupted and that maxTime and until are + // unexpired, we know that the waiter must be blocked in the condition variable, because it + // held the mutex before we tried to acquire it, and only releases it on condition variable + // wait. + return result; + } + + stdx::future<stdx::cv_status> startWaiter(OperationContext* txn, WaitTestState* state) { + return startWaiterWithUntilAndMaxTime(txn, state, Date_t::max(), Date_t::max()); + } +}; + +TEST_F(ThreadedOperationDeadlineTests, KillArrivesWhileWaiting) { + auto txn = client->makeOperationContext(); + WaitTestState state; + auto waiterResult = startWaiter(txn.get(), &state); + ASSERT_NE(stdx::future_status::ready, + waiterResult.wait_for(Milliseconds::zero().toSystemDuration())); + { + stdx::lock_guard<Client> clientLock(*txn->getClient()); + txn->markKilled(); + } + ASSERT_THROWS_CODE(waiterResult.get(), DBException, ErrorCodes::Interrupted); +} + +TEST_F(ThreadedOperationDeadlineTests, MaxTimeExpiresWhileWaiting) { + auto txn = client->makeOperationContext(); + WaitTestState state; + const auto startDate = mockClock->now(); + auto waiterResult = startWaiterWithUntilAndMaxTime(txn.get(), + &state, + startDate + Seconds{60}, // until + startDate + Seconds{10}); // maxTime + ASSERT_NE(stdx::future_status::ready, + waiterResult.wait_for(Milliseconds::zero().toSystemDuration())) + << waiterResult.get(); + mockClock->advance(Seconds{9}); + ASSERT_NE(stdx::future_status::ready, + waiterResult.wait_for(Milliseconds::zero().toSystemDuration())); + mockClock->advance(Seconds{2}); + ASSERT_THROWS_CODE(waiterResult.get(), DBException, ErrorCodes::ExceededTimeLimit); +} + +TEST_F(ThreadedOperationDeadlineTests, UntilExpiresWhileWaiting) { + auto txn = client->makeOperationContext(); + WaitTestState state; + const auto startDate = mockClock->now(); + auto waiterResult = startWaiterWithUntilAndMaxTime(txn.get(), + &state, + startDate + Seconds{10}, // until + startDate + Seconds{60}); // maxTime + ASSERT_NE(stdx::future_status::ready, + waiterResult.wait_for(Milliseconds::zero().toSystemDuration())) + << waiterResult.get(); + mockClock->advance(Seconds{9}); + ASSERT_NE(stdx::future_status::ready, + waiterResult.wait_for(Milliseconds::zero().toSystemDuration())); + mockClock->advance(Seconds{2}); + ASSERT_EQ(stdx::cv_status::timeout, waiterResult.get()); +} + +TEST_F(ThreadedOperationDeadlineTests, SignalOne) { + auto txn = client->makeOperationContext(); + WaitTestState state; + auto waiterResult = startWaiter(txn.get(), &state); + + ASSERT_NE(stdx::future_status::ready, + waiterResult.wait_for(Milliseconds::zero().toSystemDuration())) + << waiterResult.get(); + state.signal(); + ASSERT_EQ(stdx::cv_status::no_timeout, waiterResult.get()); +} + +TEST_F(ThreadedOperationDeadlineTests, KillOneSignalAnother) { + auto client1 = service->makeClient("client1"); + auto client2 = service->makeClient("client2"); + auto txn1 = client1->makeOperationContext(); + auto txn2 = client2->makeOperationContext(); + WaitTestState state1; + WaitTestState state2; + auto waiterResult1 = startWaiter(txn1.get(), &state1); + auto waiterResult2 = startWaiter(txn2.get(), &state2); + ASSERT_NE(stdx::future_status::ready, + waiterResult1.wait_for(Milliseconds::zero().toSystemDuration())); + ASSERT_NE(stdx::future_status::ready, + waiterResult2.wait_for(Milliseconds::zero().toSystemDuration())); + { + stdx::lock_guard<Client> clientLock(*txn1->getClient()); + txn1->markKilled(); + } + ASSERT_THROWS_CODE(waiterResult1.get(), DBException, ErrorCodes::Interrupted); + ASSERT_NE(stdx::future_status::ready, + waiterResult2.wait_for(Milliseconds::zero().toSystemDuration())); + state2.signal(); + ASSERT_EQ(stdx::cv_status::no_timeout, waiterResult2.get()); +} + +TEST_F(ThreadedOperationDeadlineTests, SignalBeforeUntilExpires) { + auto txn = client->makeOperationContext(); + WaitTestState state; + const auto startDate = mockClock->now(); + auto waiterResult = startWaiterWithUntilAndMaxTime(txn.get(), + &state, + startDate + Seconds{10}, // until + startDate + Seconds{60}); // maxTime + ASSERT_NE(stdx::future_status::ready, + waiterResult.wait_for(Milliseconds::zero().toSystemDuration())) + << waiterResult.get(); + mockClock->advance(Seconds{9}); + ASSERT_NE(stdx::future_status::ready, + waiterResult.wait_for(Milliseconds::zero().toSystemDuration())); + state.signal(); + ASSERT_EQ(stdx::cv_status::no_timeout, waiterResult.get()); +} + +TEST_F(ThreadedOperationDeadlineTests, SignalBeforeMaxTimeExpires) { + auto txn = client->makeOperationContext(); + WaitTestState state; + const auto startDate = mockClock->now(); + auto waiterResult = startWaiterWithUntilAndMaxTime(txn.get(), + &state, + startDate + Seconds{60}, // until + startDate + Seconds{10}); // maxTime + ASSERT_NE(stdx::future_status::ready, + waiterResult.wait_for(Milliseconds::zero().toSystemDuration())) + << waiterResult.get(); + mockClock->advance(Seconds{9}); + ASSERT_NE(stdx::future_status::ready, + waiterResult.wait_for(Milliseconds::zero().toSystemDuration())); + state.signal(); + ASSERT_EQ(stdx::cv_status::no_timeout, waiterResult.get()); +} + } // namespace } // namespace mongo |