summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorAndy Schwerin <schwerin@mongodb.com>2016-05-31 14:49:47 -0400
committerAndy Schwerin <schwerin@mongodb.com>2016-07-13 17:37:02 -0400
commitd5985d3a661c45f1c952205f4b6d107c37fa034d (patch)
tree4281eaf54ebefc3359c1839dc7af04b1e9deebb9 /src/mongo/db
parent3f8990345ec18fe2f0316859231c2424e4355b95 (diff)
downloadmongo-d5985d3a661c45f1c952205f4b6d107c37fa034d.tar.gz
SERVER-21004 Interruptible wait on condition variables with OperationContexts.
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/client.h2
-rw-r--r--src/mongo/db/operation_context.cpp172
-rw-r--r--src/mongo/db/operation_context.h104
-rw-r--r--src/mongo/db/operation_context_test.cpp270
5 files changed, 528 insertions, 21 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index e76cb918b17..86575fed86e 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -174,6 +174,7 @@ env.CppUnitTest(
'service_context',
'$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
'$BUILD_DIR/mongo/db/service_context_noop_init',
+ '$BUILD_DIR/mongo/unittest/concurrency',
'$BUILD_DIR/mongo/util/clock_source_mock',
],
)
diff --git a/src/mongo/db/client.h b/src/mongo/db/client.h
index a7523981e47..ca9a159d579 100644
--- a/src/mongo/db/client.h
+++ b/src/mongo/db/client.h
@@ -176,7 +176,7 @@ private:
const ConnectionId _connectionId;
// Protects the contents of the Client (such as changing the OperationContext, etc)
- mutable SpinLock _lock;
+ SpinLock _lock;
// Whether this client is running as DBDirectClient
bool _inDirectClient = false;
diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp
index ae018c38d0e..58529dffedf 100644
--- a/src/mongo/db/operation_context.cpp
+++ b/src/mongo/db/operation_context.cpp
@@ -32,6 +32,7 @@
#include "mongo/db/operation_context.h"
+#include "mongo/bson/inline_decls.h"
#include "mongo/db/client.h"
#include "mongo/db/service_context.h"
#include "mongo/platform/random.h"
@@ -40,6 +41,7 @@
#include "mongo/util/clock_source.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
+#include "mongo/util/scopeguard.h"
#include "mongo/util/system_tick_source.h"
namespace mongo {
@@ -79,11 +81,6 @@ OperationContext::OperationContext(Client* client, unsigned int opId)
_elapsedTime(client ? client->getServiceContext()->getTickSource()
: SystemTickSource::get()) {}
-void OperationContext::markKilled(ErrorCodes::Error killCode) {
- invariant(killCode != ErrorCodes::OK);
- _killCode.compareAndSwap(ErrorCodes::OK, killCode);
-}
-
void OperationContext::setDeadlineAndMaxTime(Date_t when, Microseconds maxTime) {
invariant(!getClient()->isInDirectClient());
uassert(40120, "Illegal attempt to change operation deadline", !hasDeadline());
@@ -185,7 +182,7 @@ Status OperationContext::checkForInterruptNoAssert() {
}
if (hasDeadlineExpired()) {
- markKilled();
+ markKilled(ErrorCodes::ExceededTimeLimit);
return Status(ErrorCodes::ExceededTimeLimit, "operation exceeded time limit");
}
@@ -204,6 +201,169 @@ Status OperationContext::checkForInterruptNoAssert() {
return Status::OK();
}
+void OperationContext::waitForConditionOrInterrupt(stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m) {
+ uassertStatusOK(waitForConditionOrInterruptNoAssert(cv, m));
+}
+
+Status OperationContext::waitForConditionOrInterruptNoAssert(
+ stdx::condition_variable& cv, stdx::unique_lock<stdx::mutex>& m) noexcept {
+ auto status = waitForConditionOrInterruptNoAssertUntil(cv, m, Date_t::max());
+ if (!status.isOK()) {
+ return status.getStatus();
+ }
+ invariant(status.getValue() == stdx::cv_status::no_timeout);
+ return status.getStatus();
+}
+
+stdx::cv_status OperationContext::waitForConditionOrInterruptUntil(
+ stdx::condition_variable& cv, stdx::unique_lock<stdx::mutex>& m, Date_t deadline) {
+
+ return uassertStatusOK(waitForConditionOrInterruptNoAssertUntil(cv, m, deadline));
+}
+
+static NOINLINE_DECL stdx::cv_status cvWaitUntilWithClockSource(ClockSource* clockSource,
+ stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m,
+ Date_t deadline) {
+ if (deadline <= clockSource->now()) {
+ return stdx::cv_status::timeout;
+ }
+
+ struct AlarmInfo {
+ stdx::mutex controlMutex;
+ stdx::mutex* waitMutex;
+ stdx::condition_variable* waitCV;
+ stdx::cv_status cvWaitResult = stdx::cv_status::no_timeout;
+ };
+ auto alarmInfo = std::make_shared<AlarmInfo>();
+ alarmInfo->waitCV = &cv;
+ alarmInfo->waitMutex = m.mutex();
+ invariantOK(clockSource->setAlarm(deadline, [alarmInfo] {
+ stdx::lock_guard<stdx::mutex> controlLk(alarmInfo->controlMutex);
+ alarmInfo->cvWaitResult = stdx::cv_status::timeout;
+ if (!alarmInfo->waitMutex) {
+ return;
+ }
+ stdx::lock_guard<stdx::mutex> waitLk(*alarmInfo->waitMutex);
+ alarmInfo->waitCV->notify_all();
+ }));
+ cv.wait(m);
+ m.unlock();
+ stdx::lock_guard<stdx::mutex> controlLk(alarmInfo->controlMutex);
+ m.lock();
+ alarmInfo->waitMutex = nullptr;
+ alarmInfo->waitCV = nullptr;
+ return alarmInfo->cvWaitResult;
+}
+
+// Theory of operation for waitForConditionOrInterruptNoAssertUntil and markKilled:
+//
+// An operation indicates to potential killers that it is waiting on a condition variable by setting
+// _waitMutex and _waitCV, while holding the lock on its parent Client. It then unlocks its Client,
+// unblocking any killers, which are required to have locked the Client before calling markKilled.
+//
+// When _waitMutex and _waitCV are set, killers must lock _waitMutex before setting the _killCode,
+// and must signal _waitCV before releasing _waitMutex. Unfortunately, they must lock _waitMutex
+// without holding a lock on Client to avoid a deadlock with callers of
+// waitForConditionOrInterruptNoAssertUntil(). So, in the event that _waitMutex is set, the killer
+// increments _numKillers, drops the Client lock, acquires _waitMutex and then re-acquires the
+// Client lock. We know that the Client, its OperationContext and _waitMutex will remain valid
+// during this period because the caller of waitForConditionOrInterruptNoAssertUntil will not return
+// while _numKillers > 0 and will not return until it has itself reacquired _waitMutex. Instead,
+// that caller will keep waiting on _waitCV until _numKillers drops to 0.
+//
+// In essence, when _waitMutex is set, _killCode is guarded by _waitMutex and _waitCV, but when
+// _waitMutex is not set, it is guarded by the Client spinlock. Changing _waitMutex is itself
+// guarded by the Client spinlock and _numKillers.
+//
+// When _numKillers does drop to 0, the waiter will null out _waitMutex and _waitCV.
+//
+// This implementation adds a minimum of two spinlock acquire-release pairs to every condition
+// variable wait.
+StatusWith<stdx::cv_status> OperationContext::waitForConditionOrInterruptNoAssertUntil(
+ stdx::condition_variable& cv, stdx::unique_lock<stdx::mutex>& m, Date_t deadline) noexcept {
+ invariant(getClient());
+ {
+ stdx::lock_guard<Client> clientLock(*getClient());
+ invariant(!_waitMutex);
+ invariant(!_waitCV);
+ invariant(0 == _numKillers);
+
+ // This interrupt check must be done while holding the client lock, so as not to race with a
+ // concurrent caller of markKilled.
+ auto status = checkForInterruptNoAssert();
+ if (!status.isOK()) {
+ return status;
+ }
+ _waitMutex = m.mutex();
+ _waitCV = &cv;
+ }
+
+ if (hasDeadline()) {
+ deadline = std::min(deadline, getDeadline());
+ }
+
+ const auto waitStatus = [&] {
+ if (Date_t::max() == deadline) {
+ cv.wait(m);
+ return stdx::cv_status::no_timeout;
+ }
+ const auto clockSource = getServiceContext()->getPreciseClockSource();
+ if (clockSource->tracksSystemClock()) {
+ return cv.wait_until(m, deadline.toSystemTimePoint());
+ }
+
+ // The following cases only occur during testing, when the precise clock source is
+ // virtualized and does not track the system clock.
+ return cvWaitUntilWithClockSource(clockSource, cv, m, deadline);
+ }();
+
+ // Continue waiting on cv until no other thread is attempting to kill this one.
+ cv.wait(m, [this] {
+ stdx::lock_guard<Client> clientLock(*getClient());
+ if (0 == _numKillers) {
+ _waitMutex = nullptr;
+ _waitCV = nullptr;
+ return true;
+ }
+ return false;
+ });
+
+ auto status = checkForInterruptNoAssert();
+ if (!status.isOK()) {
+ return status;
+ }
+ if (hasDeadline() && waitStatus == stdx::cv_status::timeout && deadline == getDeadline()) {
+ // It's possible that the system clock used in stdx::condition_variable::wait_until
+ // is slightly ahead of the FastClock used in checkForInterrupt. In this case,
+ // we treat the operation as though it has exceeded its time limit, just as if the
+ // FastClock and system clock had agreed.
+ markKilled(ErrorCodes::ExceededTimeLimit);
+ return Status(ErrorCodes::ExceededTimeLimit, "operation exceeded time limit");
+ }
+ return waitStatus;
+}
+
+void OperationContext::markKilled(ErrorCodes::Error killCode) {
+ invariant(killCode != ErrorCodes::OK);
+ stdx::unique_lock<stdx::mutex> lkWaitMutex;
+ if (_waitMutex) {
+ invariant(++_numKillers > 0);
+ getClient()->unlock();
+ ON_BLOCK_EXIT([this]() noexcept {
+ getClient()->lock();
+ invariant(--_numKillers >= 0);
+ });
+ lkWaitMutex = stdx::unique_lock<stdx::mutex>{*_waitMutex};
+ }
+ _killCode.compareAndSwap(ErrorCodes::OK, killCode);
+ if (lkWaitMutex && _numKillers == 0) {
+ invariant(_waitCV);
+ _waitCV->notify_all();
+ }
+}
+
RecoveryUnit* OperationContext::releaseRecoveryUnit() {
return _recoveryUnit.release();
}
diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h
index 0035bfa70d4..aadb6c9f981 100644
--- a/src/mongo/db/operation_context.h
+++ b/src/mongo/db/operation_context.h
@@ -38,6 +38,8 @@
#include "mongo/db/storage/storage_options.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/platform/atomic_word.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
#include "mongo/util/decorable.h"
#include "mongo/util/time_support.h"
#include "mongo/util/timer.h"
@@ -126,10 +128,8 @@ public:
*/
std::unique_ptr<Locker> releaseLockState();
- // --- operation level info? ---
-
/**
- * Raises a UserAssertion if this operation is in a killed state.
+ * Raises a UserException if this operation is in a killed state.
*/
void checkForInterrupt();
@@ -139,6 +139,76 @@ public:
Status checkForInterruptNoAssert();
/**
+ * Waits for either the condition "cv" to be signaled, this operation to be interrupted, or the
+ * deadline on this operation to expire. In the event of interruption or operation deadline
+ * expiration, raises a UserException with an error code indicating the interruption type.
+ */
+ void waitForConditionOrInterrupt(stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m);
+
+ /**
+ * Waits on condition "cv" for "pred" until "pred" returns true, or this operation
+ * is interrupted or its deadline expires. Throws a DBException for interruption and
+ * deadline expiration.
+ */
+ template <typename Pred>
+ void waitForConditionOrInterrupt(stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m,
+ Pred pred) {
+ while (!pred()) {
+ waitForConditionOrInterrupt(cv, m);
+ }
+ }
+
+ /**
+ * Same as waitForConditionOrInterrupt, except returns a Status instead of throwing
+ * a DBException to report interruption.
+ */
+ Status waitForConditionOrInterruptNoAssert(stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m) noexcept;
+
+ /**
+ * Waits for condition "cv" to be signaled, or for the given "deadline" to expire, or
+ * for the operation to be interrupted, or for the operation's own deadline to expire.
+ *
+ * If the operation deadline expires or the operation is interrupted, throws a DBException. If
+ * the given "deadline" expires, returns cv_status::timeout. Otherwise, returns
+ * cv_status::no_timeout.
+ */
+ stdx::cv_status waitForConditionOrInterruptUntil(stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m,
+ Date_t deadline);
+
+ /**
+ * Waits on condition "cv" for "pred" until "pred" returns true, or the given "deadline"
+ * expires, or this operation is interrupted, or this operation's own deadline expires.
+ *
+ *
+ * If the operation deadline expires or the operation is interrupted, throws a DBException. If
+ * the given "deadline" expires, returns cv_status::timeout. Otherwise, returns
+ * cv_status::no_timeout indicating that "pred" finally returned true.
+ */
+ template <typename Pred>
+ stdx::cv_status waitForConditionOrInterruptUntil(stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m,
+ Date_t deadline,
+ Pred pred) {
+ while (!pred()) {
+ if (stdx::cv_status::timeout == waitForConditionOrInterruptUntil(cv, m, deadline)) {
+ return stdx::cv_status::timeout;
+ }
+ }
+ return stdx::cv_status::no_timeout;
+ }
+
+ /**
+ * Same as waitForConditionOrInterruptUntil, except returns StatusWith<stdx::cv_status> and
+ * non-ok status indicates the error instead of a DBException.
+ */
+ StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil(
+ stdx::condition_variable& cv, stdx::unique_lock<stdx::mutex>& m, Date_t deadline) noexcept;
+
+ /**
* Delegates to CurOp, but is included here to break dependencies.
* Caller does not own the pointer.
*
@@ -200,10 +270,11 @@ public:
* checkForInterruptNoAssert by the thread executing the operation will start returning the
* specified error code.
*
- * If multiple threads kill the same operation with different codes, only the first code will
- * be preserved.
+ * If multiple threads kill the same operation with different codes, only the first code
+ * will be preserved.
*
- * May be called by any thread that has locked the Client owning this operation context.
+ * May be called by any thread that has locked the Client owning this operation context, or
+ * by the thread executing this on behalf of this OperationContext.
*/
void markKilled(ErrorCodes::Error killCode = ErrorCodes::Interrupted);
@@ -272,13 +343,12 @@ public:
return _deadline;
}
- //
- // Legacy "max time" methods for controlling operation deadlines.
- //
-
/**
* Returns the number of microseconds remaining for this operation's time limit, or the
* special value Microseconds::max() if the operation has no time limit.
+ *
+ * This is a legacy "max time" method for controlling operation deadlines. Prefer not to use it
+ * in new code.
*/
Microseconds getRemainingMaxTimeMicros() const;
@@ -312,6 +382,20 @@ private:
// once from OK to some kill code.
AtomicWord<ErrorCodes::Error> _killCode{ErrorCodes::OK};
+
+ // If non-null, _waitMutex and _waitCV are the (mutex, condition variable) pair that the
+ // operation is currently waiting on inside a call to waitForConditionOrInterrupt...().
+ // All access guarded by the Client's lock.
+ stdx::mutex* _waitMutex = nullptr;
+ stdx::condition_variable* _waitCV = nullptr;
+
+ // If _waitMutex and _waitCV are non-null, this is the number of threads in a call to markKilled
+ // actively attempting to kill the operation. If this value is non-zero, the operation is inside
+ // waitForConditionOrInterrupt...() and must stay there until _numKillers reaches 0.
+ //
+ // All access guarded by the Client's lock.
+ int _numKillers = 0;
+
WriteConcernOptions _writeConcern;
Date_t _deadline =
diff --git a/src/mongo/db/operation_context_test.cpp b/src/mongo/db/operation_context_test.cpp
index cae366b845f..a592c98b03f 100644
--- a/src/mongo/db/operation_context_test.cpp
+++ b/src/mongo/db/operation_context_test.cpp
@@ -32,7 +32,10 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/service_context.h"
#include "mongo/db/service_context_noop.h"
+#include "mongo/stdx/future.h"
#include "mongo/stdx/memory.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/unittest/barrier.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/clock_source_mock.h"
#include "mongo/util/tick_source_mock.h"
@@ -42,18 +45,41 @@ namespace mongo {
namespace {
+std::ostream& operator<<(std::ostream& os, stdx::cv_status cvStatus) {
+ switch (cvStatus) {
+ case stdx::cv_status::timeout:
+ return os << "timeout";
+ case stdx::cv_status::no_timeout:
+ return os << "no_timeout";
+ default:
+ MONGO_UNREACHABLE;
+ }
+}
+
+std::ostream& operator<<(std::ostream& os, stdx::future_status futureStatus) {
+ switch (futureStatus) {
+ case stdx::future_status::ready:
+ return os << "ready";
+ case stdx::future_status::deferred:
+ return os << "deferred";
+ case stdx::future_status::timeout:
+ return os << "timeout";
+ default:
+ MONGO_UNREACHABLE;
+ }
+}
+
class OperationDeadlineTests : public unittest::Test {
public:
void setUp() {
- auto uniqueMockClock = stdx::make_unique<ClockSourceMock>();
- mockClock = uniqueMockClock.get();
service = stdx::make_unique<ServiceContextNoop>();
- service->setFastClockSource(std::move(uniqueMockClock));
+ service->setFastClockSource(stdx::make_unique<SharedClockSourceAdapter>(mockClock));
+ service->setPreciseClockSource(stdx::make_unique<SharedClockSourceAdapter>(mockClock));
service->setTickSource(stdx::make_unique<TickSourceMock>());
client = service->makeClient("OperationDeadlineTest");
}
- ClockSourceMock* mockClock;
+ const std::shared_ptr<ClockSourceMock> mockClock = std::make_shared<ClockSourceMock>();
std::unique_ptr<ServiceContext> service;
ServiceContext::UniqueClient client;
};
@@ -124,6 +150,242 @@ TEST_F(OperationDeadlineTests, VeryLargeRelativeDeadlinesNanoseconds) {
txn->getDeadline());
}
+TEST_F(OperationDeadlineTests, WaitForMaxTimeExpiredCV) {
+ auto txn = client->makeOperationContext();
+ txn->setDeadlineByDate(mockClock->now());
+ stdx::mutex m;
+ stdx::condition_variable cv;
+ stdx::unique_lock<stdx::mutex> lk(m);
+ ASSERT_EQ(ErrorCodes::ExceededTimeLimit, txn->waitForConditionOrInterruptNoAssert(cv, lk));
+}
+
+TEST_F(OperationDeadlineTests, WaitForMaxTimeExpiredCVWithWaitUntilSet) {
+ auto txn = client->makeOperationContext();
+ txn->setDeadlineByDate(mockClock->now());
+ stdx::mutex m;
+ stdx::condition_variable cv;
+ stdx::unique_lock<stdx::mutex> lk(m);
+ ASSERT_EQ(
+ ErrorCodes::ExceededTimeLimit,
+ txn->waitForConditionOrInterruptNoAssertUntil(cv, lk, mockClock->now() + Seconds{10}));
+}
+
+TEST_F(OperationDeadlineTests, WaitForKilledOpCV) {
+ auto txn = client->makeOperationContext();
+ txn->markKilled();
+ stdx::mutex m;
+ stdx::condition_variable cv;
+ stdx::unique_lock<stdx::mutex> lk(m);
+ ASSERT_EQ(ErrorCodes::Interrupted, txn->waitForConditionOrInterruptNoAssert(cv, lk));
+}
+
+TEST_F(OperationDeadlineTests, WaitForUntilExpiredCV) {
+ auto txn = client->makeOperationContext();
+ stdx::mutex m;
+ stdx::condition_variable cv;
+ stdx::unique_lock<stdx::mutex> lk(m);
+ ASSERT_EQ(stdx::cv_status::timeout,
+ unittest::assertGet(
+ txn->waitForConditionOrInterruptNoAssertUntil(cv, lk, mockClock->now())));
+}
+
+TEST_F(OperationDeadlineTests, WaitForUntilExpiredCVWithMaxTimeSet) {
+ auto txn = client->makeOperationContext();
+ txn->setDeadlineByDate(mockClock->now() + Seconds{10});
+ stdx::mutex m;
+ stdx::condition_variable cv;
+ stdx::unique_lock<stdx::mutex> lk(m);
+ ASSERT_EQ(stdx::cv_status::timeout,
+ unittest::assertGet(
+ txn->waitForConditionOrInterruptNoAssertUntil(cv, lk, mockClock->now())));
+}
+
+TEST_F(OperationDeadlineTests, DuringWaitMaxTimeExpirationDominatesUntilExpiration) {
+ auto txn = client->makeOperationContext();
+ txn->setDeadlineByDate(mockClock->now());
+ stdx::mutex m;
+ stdx::condition_variable cv;
+ stdx::unique_lock<stdx::mutex> lk(m);
+ ASSERT_EQ(ErrorCodes::ExceededTimeLimit,
+ txn->waitForConditionOrInterruptNoAssertUntil(cv, lk, mockClock->now()));
+}
+
+class ThreadedOperationDeadlineTests : public OperationDeadlineTests {
+public:
+ struct WaitTestState {
+ void signal() {
+ stdx::lock_guard<stdx::mutex> lk(mutex);
+ invariant(!isSignaled);
+ isSignaled = true;
+ cv.notify_all();
+ }
+
+ stdx::mutex mutex;
+ stdx::condition_variable cv;
+ bool isSignaled = false;
+ };
+
+ stdx::future<stdx::cv_status> startWaiterWithUntilAndMaxTime(OperationContext* txn,
+ WaitTestState* state,
+ Date_t until,
+ Date_t maxTime) {
+
+ auto barrier = std::make_shared<unittest::Barrier>(2);
+ auto task = stdx::packaged_task<stdx::cv_status()>([=] {
+ if (maxTime < Date_t::max()) {
+ txn->setDeadlineByDate(maxTime);
+ }
+ auto predicate = [state] { return state->isSignaled; };
+ stdx::unique_lock<stdx::mutex> lk(state->mutex);
+ barrier->countDownAndWait();
+ if (until < Date_t::max()) {
+ return txn->waitForConditionOrInterruptUntil(state->cv, lk, until, predicate);
+ } else {
+ txn->waitForConditionOrInterrupt(state->cv, lk, predicate);
+ return stdx::cv_status::no_timeout;
+ }
+ });
+ auto result = task.get_future();
+ stdx::thread(std::move(task)).detach();
+ barrier->countDownAndWait();
+
+ // Now we know that the waiter task must own the mutex, because it does not signal the
+ // barrier until it does.
+ stdx::lock_guard<stdx::mutex> lk(state->mutex);
+
+ // Assuming that txn has not already been interrupted and that maxTime and until are
+ // unexpired, we know that the waiter must be blocked in the condition variable, because it
+ // held the mutex before we tried to acquire it, and only releases it on condition variable
+ // wait.
+ return result;
+ }
+
+ stdx::future<stdx::cv_status> startWaiter(OperationContext* txn, WaitTestState* state) {
+ return startWaiterWithUntilAndMaxTime(txn, state, Date_t::max(), Date_t::max());
+ }
+};
+
+TEST_F(ThreadedOperationDeadlineTests, KillArrivesWhileWaiting) {
+ auto txn = client->makeOperationContext();
+ WaitTestState state;
+ auto waiterResult = startWaiter(txn.get(), &state);
+ ASSERT_NE(stdx::future_status::ready,
+ waiterResult.wait_for(Milliseconds::zero().toSystemDuration()));
+ {
+ stdx::lock_guard<Client> clientLock(*txn->getClient());
+ txn->markKilled();
+ }
+ ASSERT_THROWS_CODE(waiterResult.get(), DBException, ErrorCodes::Interrupted);
+}
+
+TEST_F(ThreadedOperationDeadlineTests, MaxTimeExpiresWhileWaiting) {
+ auto txn = client->makeOperationContext();
+ WaitTestState state;
+ const auto startDate = mockClock->now();
+ auto waiterResult = startWaiterWithUntilAndMaxTime(txn.get(),
+ &state,
+ startDate + Seconds{60}, // until
+ startDate + Seconds{10}); // maxTime
+ ASSERT_NE(stdx::future_status::ready,
+ waiterResult.wait_for(Milliseconds::zero().toSystemDuration()))
+ << waiterResult.get();
+ mockClock->advance(Seconds{9});
+ ASSERT_NE(stdx::future_status::ready,
+ waiterResult.wait_for(Milliseconds::zero().toSystemDuration()));
+ mockClock->advance(Seconds{2});
+ ASSERT_THROWS_CODE(waiterResult.get(), DBException, ErrorCodes::ExceededTimeLimit);
+}
+
+TEST_F(ThreadedOperationDeadlineTests, UntilExpiresWhileWaiting) {
+ auto txn = client->makeOperationContext();
+ WaitTestState state;
+ const auto startDate = mockClock->now();
+ auto waiterResult = startWaiterWithUntilAndMaxTime(txn.get(),
+ &state,
+ startDate + Seconds{10}, // until
+ startDate + Seconds{60}); // maxTime
+ ASSERT_NE(stdx::future_status::ready,
+ waiterResult.wait_for(Milliseconds::zero().toSystemDuration()))
+ << waiterResult.get();
+ mockClock->advance(Seconds{9});
+ ASSERT_NE(stdx::future_status::ready,
+ waiterResult.wait_for(Milliseconds::zero().toSystemDuration()));
+ mockClock->advance(Seconds{2});
+ ASSERT_EQ(stdx::cv_status::timeout, waiterResult.get());
+}
+
+TEST_F(ThreadedOperationDeadlineTests, SignalOne) {
+ auto txn = client->makeOperationContext();
+ WaitTestState state;
+ auto waiterResult = startWaiter(txn.get(), &state);
+
+ ASSERT_NE(stdx::future_status::ready,
+ waiterResult.wait_for(Milliseconds::zero().toSystemDuration()))
+ << waiterResult.get();
+ state.signal();
+ ASSERT_EQ(stdx::cv_status::no_timeout, waiterResult.get());
+}
+
+TEST_F(ThreadedOperationDeadlineTests, KillOneSignalAnother) {
+ auto client1 = service->makeClient("client1");
+ auto client2 = service->makeClient("client2");
+ auto txn1 = client1->makeOperationContext();
+ auto txn2 = client2->makeOperationContext();
+ WaitTestState state1;
+ WaitTestState state2;
+ auto waiterResult1 = startWaiter(txn1.get(), &state1);
+ auto waiterResult2 = startWaiter(txn2.get(), &state2);
+ ASSERT_NE(stdx::future_status::ready,
+ waiterResult1.wait_for(Milliseconds::zero().toSystemDuration()));
+ ASSERT_NE(stdx::future_status::ready,
+ waiterResult2.wait_for(Milliseconds::zero().toSystemDuration()));
+ {
+ stdx::lock_guard<Client> clientLock(*txn1->getClient());
+ txn1->markKilled();
+ }
+ ASSERT_THROWS_CODE(waiterResult1.get(), DBException, ErrorCodes::Interrupted);
+ ASSERT_NE(stdx::future_status::ready,
+ waiterResult2.wait_for(Milliseconds::zero().toSystemDuration()));
+ state2.signal();
+ ASSERT_EQ(stdx::cv_status::no_timeout, waiterResult2.get());
+}
+
+TEST_F(ThreadedOperationDeadlineTests, SignalBeforeUntilExpires) {
+ auto txn = client->makeOperationContext();
+ WaitTestState state;
+ const auto startDate = mockClock->now();
+ auto waiterResult = startWaiterWithUntilAndMaxTime(txn.get(),
+ &state,
+ startDate + Seconds{10}, // until
+ startDate + Seconds{60}); // maxTime
+ ASSERT_NE(stdx::future_status::ready,
+ waiterResult.wait_for(Milliseconds::zero().toSystemDuration()))
+ << waiterResult.get();
+ mockClock->advance(Seconds{9});
+ ASSERT_NE(stdx::future_status::ready,
+ waiterResult.wait_for(Milliseconds::zero().toSystemDuration()));
+ state.signal();
+ ASSERT_EQ(stdx::cv_status::no_timeout, waiterResult.get());
+}
+
+TEST_F(ThreadedOperationDeadlineTests, SignalBeforeMaxTimeExpires) {
+ auto txn = client->makeOperationContext();
+ WaitTestState state;
+ const auto startDate = mockClock->now();
+ auto waiterResult = startWaiterWithUntilAndMaxTime(txn.get(),
+ &state,
+ startDate + Seconds{60}, // until
+ startDate + Seconds{10}); // maxTime
+ ASSERT_NE(stdx::future_status::ready,
+ waiterResult.wait_for(Milliseconds::zero().toSystemDuration()))
+ << waiterResult.get();
+ mockClock->advance(Seconds{9});
+ ASSERT_NE(stdx::future_status::ready,
+ waiterResult.wait_for(Milliseconds::zero().toSystemDuration()));
+ state.signal();
+ ASSERT_EQ(stdx::cv_status::no_timeout, waiterResult.get());
+}
+
} // namespace
} // namespace mongo