summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/SConscript1
-rw-r--r--src/mongo/db/catalog/SConscript1
-rw-r--r--src/mongo/db/operation_context.cpp70
-rw-r--r--src/mongo/db/operation_context.h166
-rw-r--r--src/mongo/db/operation_context_test.cpp229
-rw-r--r--src/mongo/executor/connection_pool_tl.cpp2
-rw-r--r--src/mongo/rpc/SConscript1
-rw-r--r--src/mongo/s/async_requests_sender.cpp61
-rw-r--r--src/mongo/s/async_requests_sender.h6
-rw-r--r--src/mongo/stdx/SConscript15
-rw-r--r--src/mongo/stdx/condition_variable.h138
-rw-r--r--src/mongo/stdx/condition_variable_bm.cpp87
-rw-r--r--src/mongo/transport/baton.h16
-rw-r--r--src/mongo/transport/baton_asio_linux.h111
-rw-r--r--src/mongo/transport/transport_layer.h5
-rw-r--r--src/mongo/transport/transport_layer_asio.cpp42
-rw-r--r--src/mongo/transport/transport_layer_asio_integration_test.cpp7
-rw-r--r--src/mongo/util/clock_source.cpp13
-rw-r--r--src/mongo/util/clock_source.h31
-rw-r--r--src/mongo/util/future.h96
-rw-r--r--src/mongo/util/future_test_future_int.cpp30
-rw-r--r--src/mongo/util/interruptible.h452
-rw-r--r--src/mongo/util/producer_consumer_queue.h173
-rw-r--r--src/mongo/util/producer_consumer_queue_test.cpp27
-rw-r--r--src/mongo/util/waitable.h141
25 files changed, 1416 insertions, 505 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript
index e262fe78f4a..dca2d87d1be 100644
--- a/src/mongo/SConscript
+++ b/src/mongo/SConscript
@@ -36,6 +36,7 @@ env.SConscript(
's',
'scripting',
'shell',
+ 'stdx',
'tools',
'transport',
'unittest',
diff --git a/src/mongo/db/catalog/SConscript b/src/mongo/db/catalog/SConscript
index 05ce3036ad4..8ab0dd4f364 100644
--- a/src/mongo/db/catalog/SConscript
+++ b/src/mongo/db/catalog/SConscript
@@ -202,6 +202,7 @@ env.Library(
'database_holder',
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/namespace_string',
+ '$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/db/storage/storage_options',
],
)
diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp
index 94a5ea312d2..873972a8c30 100644
--- a/src/mongo/db/operation_context.cpp
+++ b/src/mongo/db/operation_context.cpp
@@ -88,7 +88,9 @@ void OperationContext::setDeadlineAndMaxTime(Date_t when,
ErrorCodes::Error timeoutError) {
invariant(!getClient()->isInDirectClient());
invariant(ErrorCodes::isExceededTimeLimitError(timeoutError));
- uassert(40120, "Illegal attempt to change operation deadline", !hasDeadline());
+ uassert(40120,
+ "Illegal attempt to change operation deadline",
+ _hasArtificialDeadline || !hasDeadline());
_deadline = when;
_maxTime = maxTime;
_timeoutError = timeoutError;
@@ -165,10 +167,6 @@ Microseconds OperationContext::getRemainingMaxTimeMicros() const {
return _maxTime - getElapsedTime();
}
-void OperationContext::checkForInterrupt() {
- uassertStatusOK(checkForInterruptNoAssert());
-}
-
namespace {
// Helper function for checkForInterrupt fail point. Decides whether the operation currently
@@ -190,7 +188,7 @@ bool opShouldFail(Client* client, const BSONObj& failPointInfo) {
} // namespace
-Status OperationContext::checkForInterruptNoAssert() {
+Status OperationContext::checkForInterruptNoAssert() noexcept {
// TODO: Remove the MONGO_likely(getClient()) once all operation contexts are constructed with
// clients.
if (MONGO_likely(getClient() && getServiceContext()) &&
@@ -199,10 +197,16 @@ Status OperationContext::checkForInterruptNoAssert() {
}
if (hasDeadlineExpired()) {
- markKilled(_timeoutError);
+ if (!_hasArtificialDeadline) {
+ markKilled(_timeoutError);
+ }
return Status(_timeoutError, "operation exceeded time limit");
}
+ if (_ignoreInterrupts) {
+ return Status::OK();
+ }
+
MONGO_FAIL_POINT_BLOCK(checkForInterruptFail, scopedFailPoint) {
if (opShouldFail(getClient(), scopedFailPoint.getData())) {
log() << "set pending kill on op " << getOpID() << ", for checkForInterruptFail";
@@ -218,41 +222,6 @@ Status OperationContext::checkForInterruptNoAssert() {
return Status::OK();
}
-void OperationContext::sleepUntil(Date_t deadline) {
- stdx::mutex m;
- stdx::condition_variable cv;
- stdx::unique_lock<stdx::mutex> lk(m);
- invariant(!waitForConditionOrInterruptUntil(cv, lk, deadline, [] { return false; }));
-}
-
-void OperationContext::sleepFor(Milliseconds duration) {
- stdx::mutex m;
- stdx::condition_variable cv;
- stdx::unique_lock<stdx::mutex> lk(m);
- invariant(!waitForConditionOrInterruptFor(cv, lk, duration, [] { return false; }));
-}
-
-void OperationContext::waitForConditionOrInterrupt(stdx::condition_variable& cv,
- stdx::unique_lock<stdx::mutex>& m) {
- uassertStatusOK(waitForConditionOrInterruptNoAssert(cv, m));
-}
-
-Status OperationContext::waitForConditionOrInterruptNoAssert(
- stdx::condition_variable& cv, stdx::unique_lock<stdx::mutex>& m) noexcept {
- auto status = waitForConditionOrInterruptNoAssertUntil(cv, m, Date_t::max());
- if (!status.isOK()) {
- return status.getStatus();
- }
- invariant(status.getValue() == stdx::cv_status::no_timeout);
- return status.getStatus();
-}
-
-stdx::cv_status OperationContext::waitForConditionOrInterruptUntil(
- stdx::condition_variable& cv, stdx::unique_lock<stdx::mutex>& m, Date_t deadline) {
-
- return uassertStatusOK(waitForConditionOrInterruptNoAssertUntil(cv, m, deadline));
-}
-
// Theory of operation for waitForConditionOrInterruptNoAssertUntil and markKilled:
//
// An operation indicates to potential killers that it is waiting on a condition variable by setting
@@ -312,14 +281,15 @@ StatusWith<stdx::cv_status> OperationContext::waitForConditionOrInterruptNoAsser
const auto waitStatus = [&] {
if (Date_t::max() == deadline) {
- cv.wait(m);
+ Waitable::wait(_baton.get(), getServiceContext()->getPreciseClockSource(), cv, m);
return stdx::cv_status::no_timeout;
}
- return getServiceContext()->getPreciseClockSource()->waitForConditionUntil(cv, m, deadline);
+ return getServiceContext()->getPreciseClockSource()->waitForConditionUntil(
+ cv, m, deadline, _baton.get());
}();
// Continue waiting on cv until no other thread is attempting to kill this one.
- cv.wait(m, [this] {
+ Waitable::wait(_baton.get(), getServiceContext()->getPreciseClockSource(), cv, m, [this] {
stdx::lock_guard<Client> clientLock(*getClient());
if (0 == _numKillers) {
_waitMutex = nullptr;
@@ -338,9 +308,12 @@ StatusWith<stdx::cv_status> OperationContext::waitForConditionOrInterruptNoAsser
// is slightly ahead of the FastClock used in checkForInterrupt. In this case,
// we treat the operation as though it has exceeded its time limit, just as if the
// FastClock and system clock had agreed.
- markKilled(_timeoutError);
+ if (!_hasArtificialDeadline) {
+ markKilled(_timeoutError);
+ }
return Status(_timeoutError, "operation exceeded time limit");
}
+
return waitStatus;
}
@@ -361,11 +334,6 @@ void OperationContext::markKilled(ErrorCodes::Error killCode) {
invariant(_waitCV);
_waitCV->notify_all();
}
-
- // If we have a baton, we need to wake it up. The baton itself will check for interruption
- if (_baton) {
- _baton->schedule([] {});
- }
}
void OperationContext::setLogicalSessionId(LogicalSessionId lsid) {
diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h
index 712a0ab860c..9931a6e1931 100644
--- a/src/mongo/db/operation_context.h
+++ b/src/mongo/db/operation_context.h
@@ -44,6 +44,7 @@
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/decorable.h"
+#include "mongo/util/interruptible.h"
#include "mongo/util/time_support.h"
#include "mongo/util/timer.h"
@@ -70,7 +71,7 @@ class UnreplicatedWritesBlock;
* (RecoveryUnitState) to reduce complexity and duplication in the storage-engine specific
* RecoveryUnit and to allow better invariant checking.
*/
-class OperationContext : public Decorable<OperationContext> {
+class OperationContext : public Interruptible, public Decorable<OperationContext> {
MONGO_DISALLOW_COPYING(OperationContext);
public:
@@ -127,107 +128,9 @@ public:
std::unique_ptr<Locker> swapLockState(std::unique_ptr<Locker> locker);
/**
- * Raises a AssertionException if this operation is in a killed state.
- */
- void checkForInterrupt();
-
- /**
* Returns Status::OK() unless this operation is in a killed state.
*/
- Status checkForInterruptNoAssert();
-
- /**
- * Sleeps until "deadline"; throws an exception if the operation is interrupted before then.
- */
- void sleepUntil(Date_t deadline);
-
- /**
- * Sleeps for "duration" ms; throws an exception if the operation is interrupted before then.
- */
- void sleepFor(Milliseconds duration);
-
- /**
- * Waits for either the condition "cv" to be signaled, this operation to be interrupted, or the
- * deadline on this operation to expire. In the event of interruption or operation deadline
- * expiration, raises a AssertionException with an error code indicating the interruption type.
- */
- void waitForConditionOrInterrupt(stdx::condition_variable& cv,
- stdx::unique_lock<stdx::mutex>& m);
-
- /**
- * Waits on condition "cv" for "pred" until "pred" returns true, or this operation
- * is interrupted or its deadline expires. Throws a DBException for interruption and
- * deadline expiration.
- */
- template <typename Pred>
- void waitForConditionOrInterrupt(stdx::condition_variable& cv,
- stdx::unique_lock<stdx::mutex>& m,
- Pred pred) {
- while (!pred()) {
- waitForConditionOrInterrupt(cv, m);
- }
- }
-
- /**
- * Same as waitForConditionOrInterrupt, except returns a Status instead of throwing
- * a DBException to report interruption.
- */
- Status waitForConditionOrInterruptNoAssert(stdx::condition_variable& cv,
- stdx::unique_lock<stdx::mutex>& m) noexcept;
-
- /**
- * Waits for condition "cv" to be signaled, or for the given "deadline" to expire, or
- * for the operation to be interrupted, or for the operation's own deadline to expire.
- *
- * If the operation deadline expires or the operation is interrupted, throws a DBException. If
- * the given "deadline" expires, returns cv_status::timeout. Otherwise, returns
- * cv_status::no_timeout.
- */
- stdx::cv_status waitForConditionOrInterruptUntil(stdx::condition_variable& cv,
- stdx::unique_lock<stdx::mutex>& m,
- Date_t deadline);
-
- /**
- * Waits on condition "cv" for "pred" until "pred" returns true, or the given "deadline"
- * expires, or this operation is interrupted, or this operation's own deadline expires.
- *
- *
- * If the operation deadline expires or the operation is interrupted, throws a DBException. If
- * the given "deadline" expires, returns cv_status::timeout. Otherwise, returns
- * cv_status::no_timeout indicating that "pred" finally returned true.
- */
- template <typename Pred>
- bool waitForConditionOrInterruptUntil(stdx::condition_variable& cv,
- stdx::unique_lock<stdx::mutex>& m,
- Date_t deadline,
- Pred pred) {
- while (!pred()) {
- if (stdx::cv_status::timeout == waitForConditionOrInterruptUntil(cv, m, deadline)) {
- return pred();
- }
- }
- return true;
- }
-
- /**
- * Same as the predicate form of waitForConditionOrInterruptUntil, but takes a relative
- * amount of time to wait instead of an absolute time point.
- */
- template <typename Pred>
- bool waitForConditionOrInterruptFor(stdx::condition_variable& cv,
- stdx::unique_lock<stdx::mutex>& m,
- Milliseconds duration,
- Pred pred) {
- return waitForConditionOrInterruptUntil(
- cv, m, getExpirationDateForWaitForValue(duration), pred);
- }
-
- /**
- * Same as waitForConditionOrInterruptUntil, except returns StatusWith<stdx::cv_status> and
- * non-ok status indicates the error instead of a DBException.
- */
- StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil(
- stdx::condition_variable& cv, stdx::unique_lock<stdx::mutex>& m, Date_t deadline) noexcept;
+ Status checkForInterruptNoAssert() noexcept override;
/**
* Returns the service context under which this operation context runs, or nullptr if there is
@@ -353,6 +256,9 @@ public:
* without lock by the thread executing on behalf of this operation context.
*/
ErrorCodes::Error getKillStatus() const {
+ if (_ignoreInterrupts) {
+ return ErrorCodes::OK;
+ }
return _killCode.loadRelaxed();
}
@@ -397,16 +303,9 @@ public:
}
/**
- * Returns true if this operation has a deadline.
- */
- bool hasDeadline() const {
- return getDeadline() < Date_t::max();
- }
-
- /**
* Returns the deadline for this operation, or Date_t::max() if there is no deadline.
*/
- Date_t getDeadline() const {
+ Date_t getDeadline() const override {
return _deadline;
}
@@ -425,7 +324,54 @@ public:
*/
Microseconds getRemainingMaxTimeMicros() const;
+ StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil(
+ stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m,
+ Date_t deadline) noexcept override;
+
private:
+ IgnoreInterruptsState pushIgnoreInterrupts() override {
+ IgnoreInterruptsState iis{_ignoreInterrupts,
+ {_deadline, _timeoutError, _hasArtificialDeadline}};
+ _hasArtificialDeadline = true;
+ setDeadlineByDate(Date_t::max(), ErrorCodes::ExceededTimeLimit);
+ _ignoreInterrupts = true;
+
+ return iis;
+ }
+
+ void popIgnoreInterrupts(IgnoreInterruptsState iis) override {
+ _ignoreInterrupts = iis.ignoreInterrupts;
+
+ setDeadlineByDate(iis.deadline.deadline, iis.deadline.error);
+ _hasArtificialDeadline = iis.deadline.hasArtificialDeadline;
+
+ _markKilledIfDeadlineRequires();
+ }
+
+ DeadlineState pushArtificialDeadline(Date_t deadline, ErrorCodes::Error error) override {
+ DeadlineState ds{_deadline, _timeoutError, _hasArtificialDeadline};
+
+ _hasArtificialDeadline = true;
+ setDeadlineByDate(std::min(_deadline, deadline), error);
+
+ return ds;
+ }
+
+ void popArtificialDeadline(DeadlineState ds) override {
+ setDeadlineByDate(ds.deadline, ds.error);
+ _hasArtificialDeadline = ds.hasArtificialDeadline;
+
+ _markKilledIfDeadlineRequires();
+ }
+
+ void _markKilledIfDeadlineRequires() {
+ if (!_ignoreInterrupts && !_hasArtificialDeadline && hasDeadlineExpired() &&
+ !isKillPending()) {
+ markKilled(_timeoutError);
+ }
+ }
+
/**
* Returns true if this operation has a deadline and it has passed according to the fast clock
* on ServiceContext.
@@ -447,7 +393,7 @@ private:
* Returns the timepoint that is "waitFor" ms after now according to the
* ServiceContext's precise clock.
*/
- Date_t getExpirationDateForWaitForValue(Milliseconds waitFor);
+ Date_t getExpirationDateForWaitForValue(Milliseconds waitFor) override;
/**
* Set whether or not operations should generate oplog entries.
@@ -502,6 +448,8 @@ private:
Date_t::max(); // The timepoint at which this operation exceeds its time limit.
ErrorCodes::Error _timeoutError = ErrorCodes::ExceededTimeLimit;
+ bool _ignoreInterrupts = false;
+ bool _hasArtificialDeadline = false;
// Max operation time requested by the user or by the cursor in the case of a getMore with no
// user-specified maxTime. This is tracked with microsecond granularity for the purpose of
diff --git a/src/mongo/db/operation_context_test.cpp b/src/mongo/db/operation_context_test.cpp
index b779102b702..02891cfa151 100644
--- a/src/mongo/db/operation_context_test.cpp
+++ b/src/mongo/db/operation_context_test.cpp
@@ -200,6 +200,25 @@ TEST(OperationContextTest, OpCtxGroup) {
}
}
+TEST(OperationContextTest, IgnoreInterruptsWorks) {
+ auto serviceCtx = ServiceContext::make();
+ auto client = serviceCtx->makeClient("OperationContextTest");
+ auto opCtx = client->makeOperationContext();
+
+ opCtx->markKilled(ErrorCodes::BadValue);
+ ASSERT_THROWS_CODE(opCtx->checkForInterrupt(), DBException, ErrorCodes::BadValue);
+ ASSERT_EQUALS(opCtx->getKillStatus(), ErrorCodes::BadValue);
+
+ opCtx->runWithoutInterruption([&] {
+ ASSERT_OK(opCtx->checkForInterruptNoAssert());
+ ASSERT_OK(opCtx->getKillStatus());
+ });
+
+ ASSERT_THROWS_CODE(opCtx->checkForInterrupt(), DBException, ErrorCodes::BadValue);
+
+ ASSERT_EQUALS(opCtx->getKillStatus(), ErrorCodes::BadValue);
+}
+
class OperationDeadlineTests : public unittest::Test {
public:
void setUp() {
@@ -210,6 +229,13 @@ public:
client = service->makeClient("OperationDeadlineTest");
}
+ void checkForInterruptForTimeout(OperationContext* opCtx) {
+ stdx::mutex m;
+ stdx::condition_variable cv;
+ stdx::unique_lock<stdx::mutex> lk(m);
+ opCtx->waitForConditionOrInterrupt(cv, lk);
+ }
+
const std::shared_ptr<ClockSourceMock> mockClock = std::make_shared<ClockSourceMock>();
ServiceContext::UniqueServiceContext service;
ServiceContext::UniqueClient client;
@@ -302,6 +328,209 @@ TEST_F(OperationDeadlineTests, WaitForMaxTimeExpiredCVWithWaitUntilSet) {
.getStatus());
}
+TEST_F(OperationDeadlineTests, NestedTimeoutsTimeoutInOrder) {
+ auto opCtx = client->makeOperationContext();
+
+ opCtx->setDeadlineByDate(mockClock->now() + Milliseconds(500), ErrorCodes::MaxTimeMSExpired);
+
+ bool reachedA = false;
+ bool reachedB = false;
+ bool reachedC = false;
+
+ try {
+ opCtx->runWithDeadline(
+ mockClock->now() + Milliseconds(100), ErrorCodes::ExceededTimeLimit, [&] {
+ ASSERT_OK(opCtx->checkForInterruptNoAssert());
+
+ try {
+ opCtx->runWithDeadline(
+ mockClock->now() + Milliseconds(50), ErrorCodes::ExceededTimeLimit, [&] {
+ ASSERT_OK(opCtx->checkForInterruptNoAssert());
+ try {
+ opCtx->runWithDeadline(mockClock->now() + Milliseconds(10),
+ ErrorCodes::ExceededTimeLimit,
+ [&] {
+ ASSERT_OK(
+ opCtx->checkForInterruptNoAssert());
+ ASSERT_OK(opCtx->getKillStatus());
+ mockClock->advance(Milliseconds(20));
+ checkForInterruptForTimeout(opCtx.get());
+ ASSERT(false);
+ });
+ } catch (const ExceptionFor<ErrorCodes::ExceededTimeLimit>&) {
+ opCtx->checkForInterrupt();
+ ASSERT_OK(opCtx->getKillStatus());
+ mockClock->advance(Milliseconds(50));
+ reachedA = true;
+ }
+
+ opCtx->checkForInterrupt();
+ });
+ } catch (const ExceptionFor<ErrorCodes::ExceededTimeLimit>&) {
+ opCtx->checkForInterrupt();
+ ASSERT_OK(opCtx->getKillStatus());
+ mockClock->advance(Milliseconds(50));
+ reachedB = true;
+ }
+
+ opCtx->checkForInterrupt();
+ });
+ } catch (const ExceptionFor<ErrorCodes::ExceededTimeLimit>&) {
+ reachedC = true;
+ ASSERT_OK(opCtx->getKillStatus());
+ ASSERT_OK(opCtx->checkForInterruptNoAssert());
+ }
+
+ ASSERT(reachedA);
+ ASSERT(reachedB);
+ ASSERT(reachedC);
+
+ ASSERT_OK(opCtx->getKillStatus());
+
+ mockClock->advance(Seconds(1));
+
+ ASSERT_THROWS_CODE(opCtx->checkForInterrupt(), DBException, ErrorCodes::MaxTimeMSExpired);
+}
+
+TEST_F(OperationDeadlineTests, NestedTimeoutsThatViolateMaxTime) {
+ auto opCtx = client->makeOperationContext();
+
+ opCtx->setDeadlineByDate(mockClock->now() + Milliseconds(10), ErrorCodes::MaxTimeMSExpired);
+
+ bool reachedA = false;
+ bool reachedB = false;
+
+ try {
+ opCtx->runWithDeadline(
+ mockClock->now() + Milliseconds(100), ErrorCodes::ExceededTimeLimit, [&] {
+ ASSERT_OK(opCtx->checkForInterruptNoAssert());
+ try {
+ opCtx->runWithDeadline(
+ mockClock->now() + Milliseconds(100), ErrorCodes::ExceededTimeLimit, [&] {
+ ASSERT_OK(opCtx->checkForInterruptNoAssert());
+ ASSERT_OK(opCtx->getKillStatus());
+ mockClock->advance(Milliseconds(50));
+ opCtx->checkForInterrupt();
+ });
+ } catch (const ExceptionFor<ErrorCodes::ExceededTimeLimit>&) {
+ reachedA = true;
+ }
+
+ opCtx->checkForInterrupt();
+ });
+ } catch (const ExceptionFor<ErrorCodes::MaxTimeMSExpired>&) {
+ reachedB = true;
+ }
+
+ ASSERT(reachedA);
+ ASSERT(reachedB);
+}
+
+TEST_F(OperationDeadlineTests, NestedNonMaxTimeMSTimeoutsThatAreLargerAreIgnored) {
+ auto opCtx = client->makeOperationContext();
+
+ bool reachedA = false;
+ bool reachedB = false;
+
+ try {
+ opCtx->runWithDeadline(
+ mockClock->now() + Milliseconds(10), ErrorCodes::ExceededTimeLimit, [&] {
+ ASSERT_OK(opCtx->checkForInterruptNoAssert());
+ try {
+ opCtx->runWithDeadline(
+ mockClock->now() + Milliseconds(100), ErrorCodes::ExceededTimeLimit, [&] {
+ ASSERT_OK(opCtx->checkForInterruptNoAssert());
+ ASSERT_OK(opCtx->getKillStatus());
+ mockClock->advance(Milliseconds(50));
+ opCtx->checkForInterrupt();
+ });
+ } catch (const ExceptionFor<ErrorCodes::ExceededTimeLimit>&) {
+ reachedA = true;
+ }
+
+ opCtx->checkForInterrupt();
+ });
+ } catch (const ExceptionFor<ErrorCodes::ExceededTimeLimit>&) {
+ reachedB = true;
+ }
+
+ ASSERT(reachedA);
+ ASSERT(reachedB);
+}
+
+TEST_F(OperationDeadlineTests, DeadlineAfterIgnoreInterruptsReopens) {
+ auto opCtx = client->makeOperationContext();
+
+ bool reachedA = false;
+ bool reachedB = false;
+ bool reachedC = false;
+
+ try {
+ opCtx->runWithDeadline(
+ mockClock->now() + Milliseconds(500), ErrorCodes::ExceededTimeLimit, [&] {
+ ASSERT_OK(opCtx->checkForInterruptNoAssert());
+
+ opCtx->runWithoutInterruption([&] {
+ try {
+ opCtx->runWithDeadline(
+ mockClock->now() + Seconds(1), ErrorCodes::ExceededTimeLimit, [&] {
+ ASSERT_OK(opCtx->checkForInterruptNoAssert());
+ ASSERT_OK(opCtx->getKillStatus());
+ mockClock->advance(Milliseconds(750));
+ ASSERT_OK(opCtx->checkForInterruptNoAssert());
+ mockClock->advance(Milliseconds(500));
+ reachedA = true;
+ opCtx->checkForInterrupt();
+ });
+ } catch (const ExceptionFor<ErrorCodes::ExceededTimeLimit>&) {
+ opCtx->checkForInterrupt();
+ reachedB = true;
+ }
+ });
+
+ opCtx->checkForInterrupt();
+ });
+ } catch (const ExceptionFor<ErrorCodes::ExceededTimeLimit>& ex) {
+ reachedC = true;
+ }
+
+ ASSERT(reachedA);
+ ASSERT(reachedB);
+ ASSERT(reachedC);
+}
+
+TEST_F(OperationDeadlineTests, DeadlineAfterRunWithoutInterruptSeesViolatedMaxMS) {
+ auto opCtx = client->makeOperationContext();
+
+ opCtx->setDeadlineByDate(mockClock->now() + Milliseconds(100), ErrorCodes::MaxTimeMSExpired);
+
+ ASSERT_THROWS_CODE(opCtx->runWithoutInterruption([&] {
+ opCtx->runWithDeadline(
+ mockClock->now() + Milliseconds(200), ErrorCodes::ExceededTimeLimit, [&] {
+ mockClock->advance(Milliseconds(300));
+ opCtx->checkForInterrupt();
+ });
+ }),
+ DBException,
+ ErrorCodes::MaxTimeMSExpired);
+}
+
+TEST_F(OperationDeadlineTests, DeadlineAfterRunWithoutInterruptDoesntSeeUnviolatedMaxMS) {
+ auto opCtx = client->makeOperationContext();
+
+ opCtx->setDeadlineByDate(mockClock->now() + Milliseconds(200), ErrorCodes::MaxTimeMSExpired);
+
+ ASSERT_THROWS_CODE(opCtx->runWithoutInterruption([&] {
+ opCtx->runWithDeadline(
+ mockClock->now() + Milliseconds(100), ErrorCodes::ExceededTimeLimit, [&] {
+ mockClock->advance(Milliseconds(150));
+ opCtx->checkForInterrupt();
+ });
+ }),
+ DBException,
+ ErrorCodes::ExceededTimeLimit);
+}
+
TEST_F(OperationDeadlineTests, WaitForKilledOpCV) {
auto opCtx = client->makeOperationContext();
opCtx->markKilled();
diff --git a/src/mongo/executor/connection_pool_tl.cpp b/src/mongo/executor/connection_pool_tl.cpp
index 5e79798c6aa..b74e424fe1f 100644
--- a/src/mongo/executor/connection_pool_tl.cpp
+++ b/src/mongo/executor/connection_pool_tl.cpp
@@ -96,7 +96,7 @@ void TLTimer::setTimeout(Milliseconds timeoutVal, TimeoutCallback cb) {
return;
}
- _timer->waitFor(timeoutVal).getAsync([cb = std::move(cb)](Status status) {
+ _timer->waitUntil(_reactor->now() + timeoutVal).getAsync([cb = std::move(cb)](Status status) {
// If we get canceled, then we don't worry about the timeout anymore
if (status == ErrorCodes::CallbackCanceled) {
return;
diff --git a/src/mongo/rpc/SConscript b/src/mongo/rpc/SConscript
index f7883dbc9e2..f5892449616 100644
--- a/src/mongo/rpc/SConscript
+++ b/src/mongo/rpc/SConscript
@@ -174,6 +174,7 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/s/is_mongos',
'$BUILD_DIR/mongo/transport/transport_layer_common',
"$BUILD_DIR/mongo/util/concurrency/spin_lock",
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp
index aef8ac9d3b7..4ef49669505 100644
--- a/src/mongo/s/async_requests_sender.cpp
+++ b/src/mongo/s/async_requests_sender.cpp
@@ -99,7 +99,7 @@ AsyncRequestsSender::Response AsyncRequestsSender::next() {
// Otherwise, wait for some response to be received.
if (_interruptStatus.isOK()) {
try {
- _makeProgress(_opCtx);
+ _makeProgress();
} catch (const AssertionException& ex) {
// If the operation is interrupted, we cancel outstanding requests and switch to
// waiting for the (canceled) callbacks to finish without checking for interrupts.
@@ -108,7 +108,7 @@ AsyncRequestsSender::Response AsyncRequestsSender::next() {
continue;
}
} else {
- _makeProgress(nullptr);
+ _opCtx->runWithoutInterruption([&] { _makeProgress(); });
}
}
return *readyResponse;
@@ -139,11 +139,6 @@ boost::optional<AsyncRequestsSender::Response> AsyncRequestsSender::_ready() {
_scheduleRequests();
}
- // If we have baton requests, we want to process those before proceeding
- if (_batonRequests) {
- return boost::none;
- }
-
// Check if any remote is ready.
invariant(!_remotes.empty());
for (auto& remote : _remotes) {
@@ -215,11 +210,6 @@ void AsyncRequestsSender::_scheduleRequests() {
if (!scheduleStatus.isOK()) {
remote.swResponse = std::move(scheduleStatus);
- if (_baton) {
- _batonRequests++;
- _baton->schedule([this] { _batonRequests--; });
- }
-
// Push a noop response to the queue to indicate that a remote is ready for
// re-processing due to failure.
_responseQueue.push(boost::none);
@@ -245,11 +235,6 @@ Status AsyncRequestsSender::_scheduleRequest(size_t remoteIndex) {
auto callbackStatus = _executor->scheduleRemoteCommand(
request,
[remoteIndex, this](const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) {
- if (_baton) {
- _batonRequests++;
- _baton->schedule([this] { _batonRequests--; });
- }
-
_responseQueue.push(Job{cbData, remoteIndex});
},
_baton);
@@ -262,22 +247,8 @@ Status AsyncRequestsSender::_scheduleRequest(size_t remoteIndex) {
}
// Passing opCtx means you'd like to opt into opCtx interruption. During cleanup we actually don't.
-void AsyncRequestsSender::_makeProgress(OperationContext* opCtx) {
- invariant(!opCtx || opCtx == _opCtx);
-
- boost::optional<Job> job;
-
- if (_baton) {
- // If we're using a baton, we peek the queue, and block on the baton if it's empty
- if (boost::optional<boost::optional<Job>> tryJob = _responseQueue.tryPop()) {
- job = std::move(*tryJob);
- } else {
- _baton->run(opCtx, boost::none);
- }
- } else {
- // Otherwise we block on the queue
- job = opCtx ? _responseQueue.pop(opCtx) : _responseQueue.pop();
- }
+void AsyncRequestsSender::_makeProgress() {
+ auto job = _responseQueue.pop(_opCtx);
if (!job) {
return;
@@ -350,22 +321,22 @@ Status AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPort(
// progress on previous requests.
auto pf = makePromiseFuture<HostAndPort>();
- ars->_batonRequests++;
stdx::thread bgChecker([&] {
pf.promise.setWith(
[&] { return targeter->findHostWithMaxWait(readPref, deadline - clock->now()); });
-
- ars->_baton->schedule([ars] { ars->_batonRequests--; });
});
- const auto guard = MakeGuard([&] { bgChecker.join(); });
-
- while (!pf.future.isReady()) {
- if (!ars->_baton->run(nullptr, deadline)) {
- break;
- }
- }
-
- return pf.future.getNoThrow();
+ const auto threadGuard = MakeGuard([&] { bgChecker.join(); });
+
+ // We ignore interrupts here because we want to spin the baton for the full duration of the
+ // findHostWithMaxWait. We set the time limit (although the bg checker should fulfill our
+ // promise) to sync up with the findHostWithMaxWait.
+ //
+ // TODO clean this up after SERVER-35689 when we can do async targeting.
+ return ars->_opCtx->runWithoutInterruption([&] {
+ return ars->_opCtx->runWithDeadline(deadline, ErrorCodes::ExceededTimeLimit, [&] {
+ return pf.future.getNoThrow(ars->_opCtx);
+ });
+ });
}();
if (!findHostStatus.isOK()) {
diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h
index dfb053c977e..fe7d5e20cb7 100644
--- a/src/mongo/s/async_requests_sender.h
+++ b/src/mongo/s/async_requests_sender.h
@@ -39,6 +39,7 @@
#include "mongo/executor/task_executor.h"
#include "mongo/s/client/shard.h"
#include "mongo/s/shard_id.h"
+#include "mongo/util/interruptible.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/producer_consumer_queue.h"
#include "mongo/util/time_support.h"
@@ -281,17 +282,14 @@ private:
/**
* Waits for forward progress in gathering responses from a remote.
*
- * If the opCtx is non-null, use it while waiting on completion.
- *
* Stores the response or error in the remote.
*/
- void _makeProgress(OperationContext* opCtx);
+ void _makeProgress();
OperationContext* _opCtx;
executor::TaskExecutor* _executor;
BatonDetacher _baton;
- size_t _batonRequests = 0;
// The metadata obj to pass along with the command remote. Used to indicate that the command is
// ok to run on secondaries.
diff --git a/src/mongo/stdx/SConscript b/src/mongo/stdx/SConscript
new file mode 100644
index 00000000000..a51aa3423f4
--- /dev/null
+++ b/src/mongo/stdx/SConscript
@@ -0,0 +1,15 @@
+# -*- mode: python -*-
+
+Import("env")
+
+env = env.Clone()
+
+env.Benchmark(
+ target='condition_variable_bm',
+ source=[
+ 'condition_variable_bm.cpp',
+ ],
+ LIBDEPS=[
+ ],
+)
+
diff --git a/src/mongo/stdx/condition_variable.h b/src/mongo/stdx/condition_variable.h
index 40ca92ba8f6..6de5dbdb66d 100644
--- a/src/mongo/stdx/condition_variable.h
+++ b/src/mongo/stdx/condition_variable.h
@@ -28,15 +28,151 @@
#pragma once
+#include <atomic>
#include <condition_variable>
+#include <list>
+
+#include "mongo/platform/atomic_word.h"
+#include "mongo/stdx/mutex.h"
namespace mongo {
+
+/**
+ * Notifyable is a slim type meant to allow integration of special kinds of waiters for
+ * stdx::condition_variable. Specifially, the notify() on this type will be called directly from
+ * stdx::condition_varibale::notify_(one|all).
+ *
+ * See Waitable for the stdx::condition_variable integration.
+ */
+class Notifyable {
+public:
+ virtual void notify() noexcept = 0;
+
+protected:
+ ~Notifyable() = default;
+};
+
+class Waitable;
+
namespace stdx {
-using condition_variable = ::std::condition_variable; // NOLINT
using condition_variable_any = ::std::condition_variable_any; // NOLINT
using cv_status = ::std::cv_status; // NOLINT
using ::std::notify_all_at_thread_exit; // NOLINT
+/**
+ * We wrap std::condition_variable to allow us to register Notifyables which can "wait" on the
+ * condvar without actually waiting on the std::condition_variable. This allows us to possibly do
+ * productive work in those types, rather than sleeping in the os.
+ */
+class condition_variable : private std::condition_variable { // NOLINT
+public:
+ using std::condition_variable::condition_variable; // NOLINT
+
+ void notify_one() noexcept {
+ if (_notifyableCount.load()) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ if (_notifyNextNotifyable(lk)) {
+ return;
+ }
+ }
+
+ std::condition_variable::notify_one(); // NOLINT
+ }
+
+ void notify_all() noexcept {
+ if (_notifyableCount.load()) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ while (_notifyNextNotifyable(lk)) {
+ }
+ }
+
+ std::condition_variable::notify_all(); // NOLINT
+ }
+
+ using std::condition_variable::wait; // NOLINT
+ using std::condition_variable::wait_for; // NOLINT
+ using std::condition_variable::wait_until; // NOLINT
+ using std::condition_variable::native_handle; // NOLINT
+
+private:
+ friend class ::mongo::Waitable;
+
+ /**
+ * Runs the callback with the Notifyable registered on the condvar. This ensures that for the
+ * duration of the callback execution, a notification on the condvar will trigger a notify() to
+ * the Notifyable.
+ *
+ * The scheme here is that list entries are erased from the notification list when notified (so
+ * that they don't eat multiple notify_one's). We detect that condition by noting that our
+ * Notifyable* has been overwritten with null (in which case we should avoid a double erase).
+ *
+ * The method is private, and accessed via friendship in Waitable.
+ */
+ template <typename Callback>
+ void _runWithNotifyable(Notifyable& notifyable, Callback&& cb) noexcept {
+ static_assert(noexcept(std::forward<Callback>(cb)()),
+ "Only noexcept functions may be invoked with _runWithNotifyable");
+
+ // We use this local pad to receive notification that we were notified, rather than timing
+ // out organically.
+ //
+ // Note that n must be guarded by _mutex after its insertion in _notifyables (so that we can
+ // detect notification in a thread-safe manner).
+ Notifyable* n = &notifyable;
+
+ auto iter = [&] {
+ stdx::lock_guard<stdx::mutex> localMutex(_mutex);
+ _notifyableCount.addAndFetch(1);
+ return _notifyables.insert(_notifyables.end(), &n);
+ }();
+
+ std::forward<Callback>(cb)();
+
+ stdx::lock_guard<stdx::mutex> localMutex(_mutex);
+ // if n is null, we were notified, and erased in _notifyNextNotifyable
+ if (n) {
+ _notifyableCount.subtractAndFetch(1);
+ _notifyables.erase(iter);
+ }
+ }
+
+ /**
+ * Notifies the next notifyable.
+ *
+ * Returns true if there was a notifyable to be notified.
+ *
+ * Note that as part of notifying, we zero out pointers allocated on the stack by
+ * _runWithNotifyable callers. This is safe because we hold _mutex while we do so, and our
+ * erasure communicates that those waiters need not clear themselves from the notification list
+ * on wakeup.
+ */
+ bool _notifyNextNotifyable(const stdx::lock_guard<stdx::mutex>&) noexcept {
+ auto iter = _notifyables.begin();
+ if (iter == _notifyables.end()) {
+ return false;
+ }
+
+ _notifyableCount.subtractAndFetch(1);
+
+ (**iter)->notify();
+
+ // null out iter here, so that the notifyable won't remove itself from the list when it
+ // wakes up
+ **iter = nullptr;
+
+ _notifyables.erase(iter);
+
+ return true;
+ }
+
+ AtomicUInt64 _notifyableCount;
+
+ stdx::mutex _mutex;
+ std::list<Notifyable**> _notifyables;
+};
+
} // namespace stdx
} // namespace mongo
diff --git a/src/mongo/stdx/condition_variable_bm.cpp b/src/mongo/stdx/condition_variable_bm.cpp
new file mode 100644
index 00000000000..7e020d60c0e
--- /dev/null
+++ b/src/mongo/stdx/condition_variable_bm.cpp
@@ -0,0 +1,87 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <benchmark/benchmark.h>
+
+#include "mongo/bson/inline_decls.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/stdx/thread.h"
+
+namespace mongo {
+
+void BM_stdNotifyOne(benchmark::State& state) {
+ std::condition_variable cv; // NOLINT
+
+ for (auto _ : state) {
+ benchmark::ClobberMemory();
+ cv.notify_one();
+ }
+}
+
+void BM_stdxNotifyOneNoNotifyables(benchmark::State& state) {
+ stdx::condition_variable cv;
+
+ for (auto _ : state) {
+ benchmark::ClobberMemory();
+ cv.notify_one();
+ }
+}
+
+volatile bool alwaysTrue = true;
+
+void BM_stdWaitWithTruePredicate(benchmark::State& state) {
+ std::condition_variable cv; // NOLINT
+ stdx::mutex mutex;
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+
+ for (auto _ : state) {
+ benchmark::ClobberMemory();
+ cv.wait(lk, [&] { return alwaysTrue; });
+ }
+}
+
+void BM_stdxWaitWithTruePredicate(benchmark::State& state) {
+ stdx::condition_variable cv;
+ stdx::mutex mutex;
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+
+ for (auto _ : state) {
+ benchmark::ClobberMemory();
+ cv.wait(lk, [&] { return alwaysTrue; });
+ }
+}
+
+BENCHMARK(BM_stdNotifyOne);
+BENCHMARK(BM_stdWaitWithTruePredicate);
+BENCHMARK(BM_stdxNotifyOneNoNotifyables);
+BENCHMARK(BM_stdxWaitWithTruePredicate);
+
+} // namespace mongo
diff --git a/src/mongo/transport/baton.h b/src/mongo/transport/baton.h
index f854dfeeaf4..33409cd20b8 100644
--- a/src/mongo/transport/baton.h
+++ b/src/mongo/transport/baton.h
@@ -34,6 +34,7 @@
#include "mongo/util/functional.h"
#include "mongo/util/future.h"
#include "mongo/util/time_support.h"
+#include "mongo/util/waitable.h"
namespace mongo {
@@ -55,7 +56,7 @@ class ReactorTimer;
* context switches, as well as improving the readability of stack traces by grounding async
* execution on top of a regular client call stack.
*/
-class Baton {
+class Baton : public Waitable {
public:
virtual ~Baton() = default;
@@ -94,11 +95,6 @@ public:
virtual Future<void> addSession(Session& session, Type type) = 0;
/**
- * Adds a timer, returning a future which activates after a duration.
- */
- virtual Future<void> waitFor(const ReactorTimer& timer, Milliseconds timeout) = 0;
-
- /**
* Adds a timer, returning a future which activates after a deadline.
*/
virtual Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) = 0;
@@ -116,14 +112,6 @@ public:
* Returns true if the timer was in the baton to be cancelled.
*/
virtual bool cancelTimer(const ReactorTimer& timer) = 0;
-
- /**
- * Runs the baton. This blocks, waiting for networking events or timeouts, and fulfills
- * promises and executes scheduled work.
- *
- * Returns false if the optional deadline has passed
- */
- virtual bool run(OperationContext* opCtx, boost::optional<Date_t> deadline) = 0;
};
using BatonHandle = std::shared_ptr<Baton>;
diff --git a/src/mongo/transport/baton_asio_linux.h b/src/mongo/transport/baton_asio_linux.h
index 40829bc43e7..8f18707c12a 100644
--- a/src/mongo/transport/baton_asio_linux.h
+++ b/src/mongo/transport/baton_asio_linux.h
@@ -54,6 +54,23 @@ namespace transport {
* We implement our networking reactor on top of poll + eventfd for wakeups
*/
class TransportLayerASIO::BatonASIO : public Baton {
+ /**
+ * We use this internal reactor timer to exit run_until calls (by forcing an early timeout for
+ * ::poll).
+ *
+ * Its methods are all unreachable because we never actually use its timer-ness (we just need
+ * its address for baton book keeping).
+ */
+ class InternalReactorTimer : public ReactorTimer {
+ public:
+ void cancel(const BatonHandle& baton = nullptr) override {
+ MONGO_UNREACHABLE;
+ }
+
+ Future<void> waitUntil(Date_t timeout, const BatonHandle& baton = nullptr) override {
+ MONGO_UNREACHABLE;
+ }
+ };
/**
* RAII type that wraps up an eventfd and reading/writing to it. We don't actually need the
@@ -138,10 +155,6 @@ public:
return std::move(pf.future);
}
- Future<void> waitFor(const ReactorTimer& timer, Milliseconds timeout) override {
- return waitUntil(timer, Date_t::now() + timeout);
- }
-
Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) override {
auto pf = makePromiseFuture<void>();
_safeExecute([ timerPtr = &timer, expiration, sp = pf.promise.share(), this ] {
@@ -202,7 +215,33 @@ public:
}
}
- bool run(OperationContext* opCtx, boost::optional<Date_t> deadline) override {
+ void notify() noexcept override {
+ schedule([] {});
+ }
+
+ /**
+ * We synthesize a run_until by creating a synthetic timer which we use to exit run early (we
+ * create a regular waitUntil baton event off the timer, with the passed deadline).
+ */
+ Waitable::TimeoutState run_until(ClockSource* clkSource, Date_t deadline) noexcept override {
+ InternalReactorTimer irt;
+ auto future = waitUntil(irt, deadline);
+
+ run(clkSource);
+
+ // If the future is ready our timer has fired, in which case we timed out
+ if (future.isReady()) {
+ future.get();
+
+ return Waitable::TimeoutState::Timeout;
+ } else {
+ cancelTimer(irt);
+
+ return Waitable::TimeoutState::NoTimeout;
+ }
+ }
+
+ void run(ClockSource* clkSource) noexcept override {
std::vector<SharedPromise<void>> toFulfill;
// We'll fulfill promises and run jobs on the way out, ensuring we don't hold any locks
@@ -227,44 +266,18 @@ public:
}
});
- // Note that it's important to check for interrupt without the lock, because markKilled
- // calls schedule, which will deadlock if we're holding the lock when calling this.
- if (opCtx) {
- opCtx->checkForInterrupt();
- }
-
stdx::unique_lock<stdx::mutex> lk(_mutex);
- if (opCtx) {
- invariant(opCtx == _opCtx);
- }
-
- auto now = Date_t::now();
-
- // If our deadline has passed, return that we've already failed
- if (deadline && *deadline <= now) {
- return false;
- }
// If anything was scheduled, run it now. No need to poll
if (_scheduled.size()) {
- return true;
+ return;
}
- boost::optional<Milliseconds> timeout;
+ boost::optional<Date_t> deadline;
// If we have a timer, poll no longer than that
if (_timers.size()) {
- timeout = _timers.begin()->expiration - now;
- }
-
- if (deadline) {
- auto deadlineTimeout = *deadline - now;
-
- // If we didn't have a timer with a deadline, or our deadline is sooner than that
- // timer
- if (!timeout || (deadlineTimeout < *timeout)) {
- timeout = deadlineTimeout;
- }
+ deadline = _timers.begin()->expiration;
}
std::vector<decltype(_sessions)::iterator> sessions;
@@ -282,13 +295,22 @@ public:
sessions.push_back(iter);
}
+ auto now = clkSource->now();
+
int rval = 0;
// If we don't have a timeout, or we have a timeout that's unexpired, run poll.
- if (!timeout || (*timeout > Milliseconds(0))) {
+ if (!deadline || (*deadline > now)) {
+ if (deadline && !clkSource->tracksSystemClock()) {
+ invariant(clkSource->setAlarm(*deadline, [this] { notify(); }));
+
+ deadline.reset();
+ }
+
_inPoll = true;
lk.unlock();
- rval =
- ::poll(pollSet.data(), pollSet.size(), timeout.value_or(Milliseconds(-1)).count());
+ rval = ::poll(pollSet.data(),
+ pollSet.size(),
+ deadline ? Milliseconds(*deadline - now).count() : -1);
const auto pollGuard = MakeGuard([&] {
lk.lock();
@@ -300,20 +322,9 @@ public:
severe() << "error in poll: " << errnoWithDescription(errno);
fassertFailed(50834);
}
-
- // Note that it's important to check for interrupt without the lock, because markKilled
- // calls schedule, which will deadlock if we're holding the lock when calling this.
- if (opCtx) {
- opCtx->checkForInterrupt();
- }
}
- now = Date_t::now();
-
- // If our deadline passed while in poll, we've failed
- if (deadline && now > *deadline) {
- return false;
- }
+ now = clkSource->now();
// Fire expired timers
for (auto iter = _timers.begin(); iter != _timers.end() && iter->expiration < now;) {
@@ -348,7 +359,7 @@ public:
invariant(remaining == 0);
}
- return true;
+ return;
}
private:
diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h
index b3ee5aa92a7..a5a81005a73 100644
--- a/src/mongo/transport/transport_layer.h
+++ b/src/mongo/transport/transport_layer.h
@@ -134,12 +134,11 @@ public:
virtual void cancel(const BatonHandle& baton = nullptr) = 0;
/*
- * Returns a future that will be filled with Status::OK after the timeout has ellapsed.
+ * Returns a future that will be filled with Status::OK after the deadline has passed.
*
* Calling this implicitly calls cancel().
*/
- virtual Future<void> waitFor(Milliseconds timeout, const BatonHandle& baton = nullptr) = 0;
- virtual Future<void> waitUntil(Date_t timeout, const BatonHandle& baton = nullptr) = 0;
+ virtual Future<void> waitUntil(Date_t deadline, const BatonHandle& baton = nullptr) = 0;
};
class Reactor {
diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp
index ad87ea0ed38..b3522f537c1 100644
--- a/src/mongo/transport/transport_layer_asio.cpp
+++ b/src/mongo/transport/transport_layer_asio.cpp
@@ -86,13 +86,6 @@ public:
_timer->cancel();
}
- Future<void> waitFor(Milliseconds timeout, const BatonHandle& baton = nullptr) override {
- if (baton) {
- return _asyncWait([&] { return baton->waitFor(*this, timeout); }, baton);
- } else {
- return _asyncWait([&] { _timer->expires_after(timeout.toSystemDuration()); });
- }
- }
Future<void> waitUntil(Date_t expiration, const BatonHandle& baton = nullptr) override {
if (baton) {
@@ -538,25 +531,26 @@ Future<SessionHandle> TransportLayerASIO::asyncConnect(HostAndPort peer,
}
if (timeout > Milliseconds{0} && timeout < Milliseconds::max()) {
- connector->timeoutTimer.waitFor(timeout).getAsync([connector](Status status) {
- if (status == ErrorCodes::CallbackCanceled || connector->done.swap(true)) {
- return;
- }
+ connector->timeoutTimer.waitUntil(reactor->now() + timeout)
+ .getAsync([connector](Status status) {
+ if (status == ErrorCodes::CallbackCanceled || connector->done.swap(true)) {
+ return;
+ }
- connector->promise.setError(
- makeConnectError({ErrorCodes::NetworkTimeout, "Connecting timed out"},
- connector->peer,
- connector->resolvedEndpoint));
+ connector->promise.setError(
+ makeConnectError({ErrorCodes::NetworkTimeout, "Connecting timed out"},
+ connector->peer,
+ connector->resolvedEndpoint));
- std::error_code ec;
- stdx::lock_guard<stdx::mutex> lk(connector->mutex);
- connector->resolver.cancel();
- if (connector->session) {
- connector->session->end();
- } else {
- connector->socket.cancel(ec);
- }
- });
+ std::error_code ec;
+ stdx::lock_guard<stdx::mutex> lk(connector->mutex);
+ connector->resolver.cancel();
+ if (connector->session) {
+ connector->session->end();
+ } else {
+ connector->socket.cancel(ec);
+ }
+ });
}
connector->resolver.asyncResolve(connector->peer, _listenerOptions.enableIPv6)
diff --git a/src/mongo/transport/transport_layer_asio_integration_test.cpp b/src/mongo/transport/transport_layer_asio_integration_test.cpp
index 0e595449d54..d43885da603 100644
--- a/src/mongo/transport/transport_layer_asio_integration_test.cpp
+++ b/src/mongo/transport/transport_layer_asio_integration_test.cpp
@@ -33,6 +33,7 @@
#include "mongo/client/async_client.h"
#include "mongo/client/connection_string.h"
#include "mongo/db/client.h"
+#include "mongo/db/operation_context.h"
#include "mongo/db/service_context.h"
#include "mongo/stdx/thread.h"
#include "mongo/transport/session.h"
@@ -128,11 +129,7 @@ TEST(TransportLayerASIO, ShortReadsAndWritesWork) {
auto future = handle->runCommandRequest(ecr, baton);
const auto batonGuard = MakeGuard([&] { baton->detach(); });
- while (!future.isReady()) {
- baton->run(nullptr, boost::none);
- }
-
- assertOK(future.get());
+ future.get(opCtx.get());
}
}
diff --git a/src/mongo/util/clock_source.cpp b/src/mongo/util/clock_source.cpp
index d562089c27a..ec02c771294 100644
--- a/src/mongo/util/clock_source.cpp
+++ b/src/mongo/util/clock_source.cpp
@@ -30,13 +30,20 @@
#include "mongo/stdx/thread.h"
#include "mongo/util/clock_source.h"
+#include "mongo/util/waitable.h"
namespace mongo {
stdx::cv_status ClockSource::waitForConditionUntil(stdx::condition_variable& cv,
stdx::unique_lock<stdx::mutex>& m,
- Date_t deadline) {
+ Date_t deadline,
+ Waitable* waitable) {
if (_tracksSystemClock) {
- return cv.wait_until(m, deadline.toSystemTimePoint());
+ if (deadline == Date_t::max()) {
+ Waitable::wait(waitable, this, cv, m);
+ return stdx::cv_status::no_timeout;
+ }
+
+ return Waitable::wait_until(waitable, this, cv, m, deadline.toSystemTimePoint());
}
// The rest of this function only runs during testing, when the clock source is virtualized and
@@ -75,7 +82,7 @@ stdx::cv_status ClockSource::waitForConditionUntil(stdx::condition_variable& cv,
alarmInfo->waitCV->notify_all();
}));
if (!invokedAlarmInline) {
- cv.wait(m);
+ Waitable::wait(waitable, this, cv, m);
}
m.unlock();
stdx::lock_guard<stdx::mutex> controlLk(alarmInfo->controlMutex);
diff --git a/src/mongo/util/clock_source.h b/src/mongo/util/clock_source.h
index e38ce7c932f..12401533b79 100644
--- a/src/mongo/util/clock_source.h
+++ b/src/mongo/util/clock_source.h
@@ -28,6 +28,8 @@
#pragma once
+#include <type_traits>
+
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/functional.h"
#include "mongo/stdx/mutex.h"
@@ -41,6 +43,16 @@ class Date_t;
* An interface for getting the current wall clock time.
*/
class ClockSource {
+ // We need a type trait to differentiate waitable ptr args from predicates.
+ //
+ // This returns true for non-pointers and function pointers
+ template <typename Pred>
+ struct CouldBePredicate
+ : public std::integral_constant<bool,
+ !std::is_pointer<Pred>::value ||
+ std::is_function<std::remove_pointer_t<Pred>>::value> {
+ };
+
public:
virtual ~ClockSource() = default;
@@ -79,19 +91,21 @@ public:
*/
stdx::cv_status waitForConditionUntil(stdx::condition_variable& cv,
stdx::unique_lock<stdx::mutex>& m,
- Date_t deadline);
+ Date_t deadline,
+ Waitable* waitable = nullptr);
/**
* Like cv.wait_until(m, deadline, pred), but uses this ClockSource instead of
* stdx::chrono::system_clock to measure the passage of time.
*/
- template <typename Pred>
+ template <typename Pred, std::enable_if_t<CouldBePredicate<Pred>::value, int> = 0>
bool waitForConditionUntil(stdx::condition_variable& cv,
stdx::unique_lock<stdx::mutex>& m,
Date_t deadline,
- const Pred& pred) {
+ const Pred& pred,
+ Waitable* waitable = nullptr) {
while (!pred()) {
- if (waitForConditionUntil(cv, m, deadline) == stdx::cv_status::timeout) {
+ if (waitForConditionUntil(cv, m, deadline, waitable) == stdx::cv_status::timeout) {
return pred();
}
}
@@ -102,12 +116,15 @@ public:
* Like cv.wait_for(m, duration, pred), but uses this ClockSource instead of
* stdx::chrono::system_clock to measure the passage of time.
*/
- template <typename Duration, typename Pred>
+ template <typename Duration,
+ typename Pred,
+ std::enable_if_t<CouldBePredicate<Pred>::value, int> = 0>
bool waitForConditionFor(stdx::condition_variable& cv,
stdx::unique_lock<stdx::mutex>& m,
Duration duration,
- const Pred& pred) {
- return waitForConditionUntil(cv, m, now() + duration, pred);
+ const Pred& pred,
+ Waitable* waitable = nullptr) {
+ return waitForConditionUntil(cv, m, now() + duration, pred, waitable);
}
protected:
diff --git a/src/mongo/util/future.h b/src/mongo/util/future.h
index a400a881b0b..9a52ec4c1bf 100644
--- a/src/mongo/util/future.h
+++ b/src/mongo/util/future.h
@@ -44,6 +44,7 @@
#include "mongo/util/assert_util.h"
#include "mongo/util/debug_util.h"
#include "mongo/util/functional.h"
+#include "mongo/util/interruptible.h"
#include "mongo/util/intrusive_counter.h"
#include "mongo/util/scopeguard.h"
@@ -315,7 +316,7 @@ public:
virtual ~SharedStateBase() = default;
// Only called by future side.
- void wait() noexcept {
+ void wait(Interruptible* interruptible) {
if (state.load(std::memory_order_acquire) == SSBState::kFinished)
return;
@@ -330,7 +331,7 @@ public:
}
stdx::unique_lock<stdx::mutex> lk(mx);
- cv->wait(lk, [&] {
+ interruptible->waitForConditionOrInterrupt(*cv, lk, [&] {
// The mx locking above is insufficient to establish an acquire if state transitions to
// kFinished before we get here, but we aquire mx before the producer does.
return state.load(std::memory_order_acquire) == SSBState::kFinished;
@@ -753,37 +754,86 @@ public:
}
/**
+ * Returns when the future isReady().
+ *
+ * Throws if the interruptible passed is interrupted (explicitly or via deadline).
+ */
+ void wait(Interruptible* interruptible = Interruptible::notInterruptible()) const {
+ if (_immediate) {
+ return;
+ }
+
+ _shared->wait(interruptible);
+ }
+
+ /**
+ * Returns Status::OK() when the future isReady().
+ *
+ * Returns a non-okay status if the interruptible is interrupted.
+ */
+ Status waitNoThrow(Interruptible* interruptible = Interruptible::notInterruptible()) const
+ noexcept {
+ if (_immediate) {
+ return Status::OK();
+ }
+
+ try {
+ _shared->wait(interruptible);
+ } catch (const DBException& ex) {
+ return ex.toStatus();
+ }
+
+ return Status::OK();
+ }
+
+ /**
* Gets the value out of this Future, blocking until it is ready.
*
* get() methods throw on error, while getNoThrow() returns a !OK status.
*
* These methods can be called multiple times, except for the rvalue overloads.
+ *
+ * Note: It is impossible to differentiate interruptible interruption from an error propagating
+ * down the future chain with these methods. If you need to distinguish the two cases, call
+ * wait() first.
*/
- T get() && {
- return std::move(getImpl());
+ T get(Interruptible* interruptible = Interruptible::notInterruptible()) && {
+ return std::move(getImpl(interruptible));
}
- T& get() & {
- return getImpl();
+ T& get(Interruptible* interruptible = Interruptible::notInterruptible()) & {
+ return getImpl(interruptible);
}
- const T& get() const& {
- return const_cast<Future*>(this)->getImpl();
+ const T& get(Interruptible* interruptible = Interruptible::notInterruptible()) const& {
+ return const_cast<Future*>(this)->getImpl(interruptible);
}
- StatusWith<T> getNoThrow() && noexcept {
+ StatusWith<T> getNoThrow(Interruptible* interruptible = Interruptible::notInterruptible()) &&
+ noexcept {
if (_immediate) {
return std::move(*_immediate);
}
- _shared->wait();
+ try {
+ _shared->wait(interruptible);
+ } catch (const DBException& ex) {
+ return ex.toStatus();
+ }
+
if (!_shared->status.isOK())
return std::move(_shared->status);
return std::move(*_shared->data);
}
- StatusWith<T> getNoThrow() const& noexcept {
+ StatusWith<T> getNoThrow(
+ Interruptible* interruptible = Interruptible::notInterruptible()) const& noexcept {
if (_immediate) {
return *_immediate;
}
- _shared->wait();
+ try {
+ _shared->wait(interruptible);
+ } catch (const DBException& ex) {
+ return ex.toStatus();
+ }
+
if (!_shared->status.isOK())
return _shared->status;
return *_shared->data;
@@ -1092,12 +1142,12 @@ private:
friend class Future;
friend class Promise<T>;
- T& getImpl() {
+ T& getImpl(Interruptible* interruptible) {
if (_immediate) {
return *_immediate;
}
- _shared->wait();
+ _shared->wait(interruptible);
uassertStatusOK(_shared->status);
return *(_shared->data);
}
@@ -1244,12 +1294,22 @@ public:
return _inner.isReady();
}
- void get() const {
- _inner.get();
+ void wait(Interruptible* interruptible = Interruptible::notInterruptible()) const {
+ _inner.wait(interruptible);
+ }
+
+ Status waitNoThrow(Interruptible* interruptible = Interruptible::notInterruptible()) const
+ noexcept {
+ return _inner.waitNoThrow(interruptible);
+ }
+
+ void get(Interruptible* interruptible = Interruptible::notInterruptible()) const {
+ _inner.get(interruptible);
}
- Status getNoThrow() const noexcept {
- return _inner.getNoThrow().getStatus();
+ Status getNoThrow(Interruptible* interruptible = Interruptible::notInterruptible()) const
+ noexcept {
+ return _inner.getNoThrow(interruptible).getStatus();
}
template <typename Func> // Status -> void
diff --git a/src/mongo/util/future_test_future_int.cpp b/src/mongo/util/future_test_future_int.cpp
index 803b0992500..6707a310161 100644
--- a/src/mongo/util/future_test_future_int.cpp
+++ b/src/mongo/util/future_test_future_int.cpp
@@ -165,6 +165,36 @@ TEST(Future, Fail_isReady) {
});
}
+TEST(Future, Success_wait) {
+ FUTURE_SUCCESS_TEST([] { return 1; },
+ [](Future<int>&& fut) {
+ fut.wait();
+ ASSERT_EQ(fut.get(), 1);
+ });
+}
+
+TEST(Future, Success_waitNoThrow) {
+ FUTURE_SUCCESS_TEST([] { return 1; },
+ [](Future<int>&& fut) {
+ ASSERT_OK(fut.waitNoThrow());
+ ASSERT_EQ(fut.get(), 1);
+ });
+}
+
+TEST(Future, Fail_wait) {
+ FUTURE_FAIL_TEST<int>([](Future<int>&& fut) {
+ fut.wait();
+ ASSERT_THROWS_failStatus(fut.get());
+ });
+}
+
+TEST(Future, Fail_waitNoThrow) {
+ FUTURE_FAIL_TEST<int>([](Future<int>&& fut) {
+ ASSERT_OK(fut.waitNoThrow());
+ ASSERT_THROWS_failStatus(fut.get());
+ });
+}
+
TEST(Future, isReady_TSAN_OK) {
bool done = false;
auto fut = async([&] {
diff --git a/src/mongo/util/interruptible.h b/src/mongo/util/interruptible.h
new file mode 100644
index 00000000000..e47a34ba29c
--- /dev/null
+++ b/src/mongo/util/interruptible.h
@@ -0,0 +1,452 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/util/time_support.h"
+#include "mongo/util/waitable.h"
+
+namespace mongo {
+
+/**
+ * A type which can be used to wait on condition variables with a level triggered one-way interrupt.
+ * I.e. after the interrupt is triggered (via some non-public api call) subsequent calls to
+ * waitForConditionXXX will fail. Interrupts must unblock all callers of waitForConditionXXX.
+ */
+class Interruptible {
+protected:
+ struct DeadlineState {
+ Date_t deadline;
+ ErrorCodes::Error error;
+ bool hasArtificialDeadline;
+ };
+
+ struct IgnoreInterruptsState {
+ bool ignoreInterrupts;
+ DeadlineState deadline;
+ };
+
+ /**
+ * A deadline guard provides a subsidiary deadline to the parent.
+ */
+ class DeadlineGuard {
+ public:
+ DeadlineGuard(const DeadlineGuard&) = delete;
+ DeadlineGuard& operator=(const DeadlineGuard&) = delete;
+
+ DeadlineGuard(DeadlineGuard&& other)
+ : _interruptible(other._interruptible), _oldDeadline(other._oldDeadline) {
+ other._interruptible = nullptr;
+ }
+
+ DeadlineGuard& operator=(DeadlineGuard&& other) = delete;
+
+ ~DeadlineGuard() {
+ if (_interruptible) {
+ _interruptible->popArtificialDeadline(_oldDeadline);
+ }
+ }
+
+ private:
+ friend Interruptible;
+
+ explicit DeadlineGuard(Interruptible& interruptible,
+ Date_t newDeadline,
+ ErrorCodes::Error error)
+ : _interruptible(&interruptible),
+ _oldDeadline(_interruptible->pushArtificialDeadline(newDeadline, error)) {}
+
+ Interruptible* _interruptible;
+ DeadlineState _oldDeadline;
+ };
+
+ DeadlineGuard makeDeadlineGuard(Date_t deadline, ErrorCodes::Error error) {
+ return DeadlineGuard(*this, deadline, error);
+ }
+
+ /**
+ * An interruption guard provides a region where interruption is ignored.
+ *
+ * Note that this causes the deadline to be reset to Date_t::max(), but that it can also be
+ * subsequently reduced in size after the fact.
+ */
+ class IgnoreInterruptionsGuard {
+ public:
+ IgnoreInterruptionsGuard(const IgnoreInterruptionsGuard&) = delete;
+ IgnoreInterruptionsGuard& operator=(const IgnoreInterruptionsGuard&) = delete;
+
+ IgnoreInterruptionsGuard(IgnoreInterruptionsGuard&& other)
+ : _interruptible(other._interruptible), _oldState(other._oldState) {
+ other._interruptible = nullptr;
+ }
+
+ IgnoreInterruptionsGuard& operator=(IgnoreInterruptionsGuard&&) = delete;
+
+ ~IgnoreInterruptionsGuard() {
+ if (_interruptible) {
+ _interruptible->popIgnoreInterrupts(_oldState);
+ }
+ }
+
+ private:
+ friend Interruptible;
+
+ explicit IgnoreInterruptionsGuard(Interruptible& interruptible)
+ : _interruptible(&interruptible), _oldState(_interruptible->pushIgnoreInterrupts()) {}
+
+ Interruptible* _interruptible;
+ IgnoreInterruptsState _oldState;
+ };
+
+ IgnoreInterruptionsGuard makeIgnoreInterruptionsGuard() {
+ return IgnoreInterruptionsGuard(*this);
+ }
+
+public:
+ /**
+ * Returns a statically allocated instance that cannot be interrupted. Useful as a default
+ * argument to interruptible taking methods.
+ */
+ static Interruptible* notInterruptible();
+
+ /**
+ * Invokes the passed callback with a deadline guard active initialized with the passed
+ * deadline. Additionally handles the dance of try/catching the invocation and checking
+ * checkForInterrupt with the guard inactive (to allow a higher level timeout to override a
+ * lower level one)
+ */
+ template <typename Callback>
+ decltype(auto) runWithDeadline(Date_t deadline, ErrorCodes::Error error, Callback&& cb) {
+ invariant(ErrorCodes::isExceededTimeLimitError(error));
+
+ try {
+ const auto guard = makeDeadlineGuard(deadline, error);
+ return std::forward<Callback>(cb)();
+ } catch (const ExceptionForCat<ErrorCategory::ExceededTimeLimitError>&) {
+ // May throw replacement exception
+ checkForInterrupt();
+ throw;
+ }
+ }
+
+ bool hasDeadline() const {
+ return getDeadline() != Date_t::max();
+ }
+
+ /**
+ * Returns the deadline for this interruptible, or Date_t::max() if there is no deadline.
+ */
+ virtual Date_t getDeadline() const = 0;
+
+ /**
+ * Invokes the passed callback with an interruption guard active. Additionally handles the
+ * dance of try/catching the invocation and checking checkForInterrupt with the guard inactive
+ * (to allow a higher level timeout to override a lower level one, or for top level interruption
+ * to propagate)
+ */
+ template <typename Callback>
+ decltype(auto) runWithoutInterruption(Callback&& cb) {
+ try {
+ const auto guard = makeIgnoreInterruptionsGuard();
+ return std::forward<Callback>(cb)();
+ } catch (const ExceptionForCat<ErrorCategory::ExceededTimeLimitError>&) {
+ // May throw replacement exception
+ checkForInterrupt();
+ throw;
+ }
+ }
+
+ /**
+ * Raises a AssertionException if this operation is in a killed state.
+ */
+ void checkForInterrupt() {
+ uassertStatusOK(checkForInterruptNoAssert());
+ }
+
+ /**
+ * Returns Status::OK() unless this operation is in a killed state.
+ */
+ virtual Status checkForInterruptNoAssert() noexcept = 0;
+
+ /**
+ * Waits for either the condition "cv" to be signaled, this operation to be interrupted, or the
+ * deadline on this operation to expire. In the event of interruption or operation deadline
+ * expiration, raises a AssertionException with an error code indicating the interruption type.
+ */
+ void waitForConditionOrInterrupt(stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m) {
+ uassertStatusOK(waitForConditionOrInterruptNoAssert(cv, m));
+ }
+
+ /**
+ * Waits on condition "cv" for "pred" until "pred" returns true, or this operation
+ * is interrupted or its deadline expires. Throws a DBException for interruption and
+ * deadline expiration.
+ */
+ template <typename Pred>
+ void waitForConditionOrInterrupt(stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m,
+ Pred pred) {
+ while (!pred()) {
+ waitForConditionOrInterrupt(cv, m);
+ }
+ }
+
+ /**
+ * Same as waitForConditionOrInterrupt, except returns a Status instead of throwing
+ * a DBException to report interruption.
+ */
+ Status waitForConditionOrInterruptNoAssert(stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m) noexcept {
+ auto status = waitForConditionOrInterruptNoAssertUntil(cv, m, Date_t::max());
+ if (!status.isOK()) {
+ return status.getStatus();
+ }
+
+ invariant(status.getValue() == stdx::cv_status::no_timeout);
+ return Status::OK();
+ }
+
+ /**
+ * Same as the predicate form of waitForConditionOrInterrupt, except that it returns a not okay
+ * status instead of throwing on interruption.
+ */
+ template <typename Pred>
+ Status waitForConditionOrInterruptNoAssert(stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m,
+ Pred pred) noexcept {
+ while (!pred()) {
+ auto status = waitForConditionOrInterruptNoAssert(cv, m);
+
+ if (!status.isOK()) {
+ return status;
+ }
+ }
+
+ return Status::OK();
+ }
+
+ /**
+ * Waits for condition "cv" to be signaled, or for the given "deadline" to expire, or
+ * for the operation to be interrupted, or for the operation's own deadline to expire.
+ *
+ * If the operation deadline expires or the operation is interrupted, throws a DBException. If
+ * the given "deadline" expires, returns cv_status::timeout. Otherwise, returns
+ * cv_status::no_timeout.
+ */
+ stdx::cv_status waitForConditionOrInterruptUntil(stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m,
+ Date_t deadline) {
+ return uassertStatusOK(waitForConditionOrInterruptNoAssertUntil(cv, m, deadline));
+ }
+
+ /**
+ * Waits on condition "cv" for "pred" until "pred" returns true, or the given "deadline"
+ * expires, or this operation is interrupted, or this operation's own deadline expires.
+ *
+ *
+ * If the operation deadline expires or the operation is interrupted, throws a DBException. If
+ * the given "deadline" expires, returns cv_status::timeout. Otherwise, returns
+ * cv_status::no_timeout indicating that "pred" finally returned true.
+ */
+ template <typename Pred>
+ bool waitForConditionOrInterruptUntil(stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m,
+ Date_t deadline,
+ Pred pred) {
+ while (!pred()) {
+ if (stdx::cv_status::timeout == waitForConditionOrInterruptUntil(cv, m, deadline)) {
+ return pred();
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Same as the non-predicate form of waitForConditionOrInterruptUntil, but takes a relative
+ * amount of time to wait instead of an absolute time point.
+ */
+ stdx::cv_status waitForConditionOrInterruptFor(stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m,
+ Milliseconds ms) {
+ return uassertStatusOK(
+ waitForConditionOrInterruptNoAssertUntil(cv, m, getExpirationDateForWaitForValue(ms)));
+ }
+
+ /**
+ * Same as the predicate form of waitForConditionOrInterruptUntil, but takes a relative
+ * amount of time to wait instead of an absolute time point.
+ */
+ template <typename Pred>
+ bool waitForConditionOrInterruptFor(stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m,
+ Milliseconds ms,
+ Pred pred) {
+ while (!pred()) {
+ if (stdx::cv_status::timeout == waitForConditionOrInterruptFor(cv, m, ms)) {
+ return pred();
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Same as waitForConditionOrInterruptUntil, except returns StatusWith<stdx::cv_status> and
+ * non-ok status indicates the error instead of a DBException.
+ */
+ virtual StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil(
+ stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m,
+ Date_t deadline) noexcept = 0;
+
+ /**
+ * Sleeps until "deadline"; throws an exception if the interruptible is interrupted before then.
+ */
+ void sleepUntil(Date_t deadline) {
+ stdx::mutex m;
+ stdx::condition_variable cv;
+ stdx::unique_lock<stdx::mutex> lk(m);
+ invariant(!waitForConditionOrInterruptUntil(cv, lk, deadline, [] { return false; }));
+ }
+
+ /**
+ * Sleeps for "duration" ms; throws an exception if the interruptible is interrupted before
+ * then.
+ */
+ void sleepFor(Milliseconds duration) {
+ stdx::mutex m;
+ stdx::condition_variable cv;
+ stdx::unique_lock<stdx::mutex> lk(m);
+ invariant(!waitForConditionOrInterruptFor(cv, lk, duration, [] { return false; }));
+ }
+
+protected:
+ /**
+ * Pushes an ignore interruption critical section into the interruptible. Until an associated
+ * popIgnoreInterrupts is invoked, the interruptible should ignore interruptions related to
+ * explicit interruption or previously set deadlines.
+ *
+ * Note that new deadlines can be set after this is called, which will again introduce the
+ * possibility of interruption.
+ *
+ * Returns state needed to pop interruption.
+ */
+ virtual IgnoreInterruptsState pushIgnoreInterrupts() = 0;
+
+ /**
+ * Pops the ignored interruption critical section introduced by push.
+ */
+ virtual void popIgnoreInterrupts(IgnoreInterruptsState iis) = 0;
+
+ /**
+ * Pushes a subsidiary deadline into the interruptible. Until an associated
+ * popArtificialDeadline is
+ * invoked, the interruptible will fail checkForInterrupt and waitForConditionOrInterrupt calls
+ * with the passed error code if the deadline has passed.
+ *
+ * Note that deadline's higher than the current value are constrained (such that the passed
+ * error code will be returned/thrown, but after the min(oldDeadline, newDeadline) has passed).
+ *
+ * Returns state needed to pop the deadline.
+ */
+ virtual DeadlineState pushArtificialDeadline(Date_t deadline, ErrorCodes::Error error) = 0;
+
+ /**
+ * Pops the subsidiary deadline introduced by push.
+ */
+ virtual void popArtificialDeadline(DeadlineState) = 0;
+
+ /**
+ * Returns the equivalent of Date_t::now() + waitFor for the interruptible's clock
+ */
+ virtual Date_t getExpirationDateForWaitForValue(Milliseconds waitFor) = 0;
+
+ class NotInterruptible;
+};
+
+/**
+ * A not interruptible type which can be used as a lightweight default arg for interruptible taking
+ * functions.
+ */
+class Interruptible::NotInterruptible final : public Interruptible {
+ StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil(
+ stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m,
+ Date_t deadline) noexcept override {
+
+ if (deadline == Date_t::max()) {
+ cv.wait(m);
+ return stdx::cv_status::no_timeout;
+ }
+
+ return cv.wait_until(m, deadline.toSystemTimePoint());
+ }
+
+ Date_t getDeadline() const override {
+ return Date_t::max();
+ }
+
+ Status checkForInterruptNoAssert() noexcept override {
+ return Status::OK();
+ }
+
+ // It's invalid to call the deadline or ignore interruption guards on a possibly noop
+ // interruptible.
+ //
+ // The noop interruptible should only be invoked as a default arg at the bottom of the call
+ // stack (with types that won't modify it's invocation)
+ IgnoreInterruptsState pushIgnoreInterrupts() override {
+ MONGO_UNREACHABLE;
+ }
+
+ void popIgnoreInterrupts(IgnoreInterruptsState) override {
+ MONGO_UNREACHABLE;
+ }
+
+ DeadlineState pushArtificialDeadline(Date_t deadline, ErrorCodes::Error error) override {
+ MONGO_UNREACHABLE;
+ }
+
+ void popArtificialDeadline(DeadlineState) override {
+ MONGO_UNREACHABLE;
+ }
+
+ Date_t getExpirationDateForWaitForValue(Milliseconds waitFor) override {
+ return Date_t::now() + waitFor;
+ }
+};
+
+inline Interruptible* Interruptible::notInterruptible() {
+ static NotInterruptible notInterruptible{};
+
+ return &notInterruptible;
+}
+
+} // namespace mongo
diff --git a/src/mongo/util/producer_consumer_queue.h b/src/mongo/util/producer_consumer_queue.h
index 8f2b17d2265..a2e1a09e9bb 100644
--- a/src/mongo/util/producer_consumer_queue.h
+++ b/src/mongo/util/producer_consumer_queue.h
@@ -38,6 +38,7 @@
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/concurrency/with_lock.h"
+#include "mongo/util/interruptible.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
@@ -56,43 +57,6 @@ struct DefaultCostFunction {
}
};
-// Various helpers to tighten down whether the args getting passed are valid interruption args.
-//
-// Whatever the caller passes in the interruption args, they need to be invocable on one of
-// these helpers. std::is_invocable would do the job in C++17
-constexpr std::false_type areInterruptionArgsHelper(...) {
- return {};
-}
-
-constexpr std::true_type areInterruptionArgsHelper(OperationContext*) {
- return {};
-}
-
-constexpr std::true_type areInterruptionArgsHelper(OperationContext*, Milliseconds) {
- return {};
-}
-
-constexpr std::true_type areInterruptionArgsHelper(OperationContext*, Date_t) {
- return {};
-}
-
-constexpr std::true_type areInterruptionArgsHelper(Milliseconds) {
- return {};
-}
-
-constexpr std::true_type areInterruptionArgsHelper(Date_t) {
- return {};
-}
-
-template <typename U, typename... InterruptionArgs>
-constexpr auto areInterruptionArgs(U&& u, InterruptionArgs&&... args) {
- return areInterruptionArgsHelper(std::forward<U>(u), std::forward<InterruptionArgs>(args)...);
-}
-
-constexpr std::true_type areInterruptionArgs() {
- return {};
-}
-
} // namespace producer_consumer_queue_detail
/**
@@ -109,20 +73,9 @@ constexpr std::true_type areInterruptionArgs() {
* multi-consumer - Any number of threads may pop work out of the queue
*
* Interruptibility:
- * All of the blocking methods on this type allow for 6 kinds of interruptibility. The matrix is
- * parameterized by (void|OperationContext*)|(void|Milliseconds|Date_t). These provide different
- * kinds of waiting based on whether the method should be interruptible via opCtx, and then
- * whether they should timeout via deadline or duration
+ * All of the blocking methods on this type take an interruptible.
*
- * A contrived example: pcq.pop(opCtx, Minutes(1)) would be warranted if:
- * - The caller is blocking on a client thread. (opCtx)
- * - The caller needs to act periodically on inactivity. (the duration)
- *
- * Exceptions include:
- * timeouts
- * ErrorCodes::ExceededTimeLimit exceptions
- * opCtx interrupts
- * ErrorCodes::Interrupted exceptions
+ * Exceptions outside the interruptible include:
* closure of queue endpoints
* ErrorCodes::ProducerConsumerQueueEndClosed
* pushes with batches that exceed the max queue size
@@ -165,11 +118,7 @@ public:
// Pushes the passed T into the queue
//
// Leaves T unchanged if an interrupt exception is thrown while waiting for space
- template <
- typename... InterruptionArgs,
- typename = std::enable_if_t<decltype(producer_consumer_queue_detail::areInterruptionArgs(
- std::declval<InterruptionArgs>()...))::value>>
- void push(T&& t, InterruptionArgs&&... interruptionArgs) {
+ void push(T&& t, Interruptible* interruptible = Interruptible::notInterruptible()) {
_pushRunner([&](stdx::unique_lock<stdx::mutex>& lk) {
auto cost = _invokeCostFunc(t, lk);
uassert(ErrorCodes::ProducerConsumerQueueBatchTooLarge,
@@ -179,7 +128,7 @@ public:
<< ")",
cost <= _max);
- _waitForSpace(lk, cost, std::forward<InterruptionArgs>(interruptionArgs)...);
+ _waitForSpace(lk, cost, interruptible);
_push(lk, std::move(t));
});
}
@@ -195,13 +144,10 @@ public:
//
// Lifecycle methods of T must not throw if you want to use this method, as there's no obvious
// mechanism to see what was and was not pushed if those do throw
- template <
- typename StartIterator,
- typename EndIterator,
- typename... InterruptionArgs,
- typename = std::enable_if_t<decltype(producer_consumer_queue_detail::areInterruptionArgs(
- std::declval<InterruptionArgs>()...))::value>>
- void pushMany(StartIterator start, EndIterator last, InterruptionArgs&&... interruptionArgs) {
+ template <typename StartIterator, typename EndIterator>
+ void pushMany(StartIterator start,
+ EndIterator last,
+ Interruptible* interruptible = Interruptible::notInterruptible()) {
return _pushRunner([&](stdx::unique_lock<stdx::mutex>& lk) {
size_t cost = 0;
for (auto iter = start; iter != last; ++iter) {
@@ -215,7 +161,7 @@ public:
<< ")",
cost <= _max);
- _waitForSpace(lk, cost, std::forward<InterruptionArgs>(interruptionArgs)...);
+ _waitForSpace(lk, cost, interruptible);
for (auto iter = start; iter != last; ++iter) {
_push(lk, std::move(*iter));
@@ -232,13 +178,9 @@ public:
}
// Pops one T out of the queue
- template <
- typename... InterruptionArgs,
- typename = std::enable_if_t<decltype(producer_consumer_queue_detail::areInterruptionArgs(
- std::declval<InterruptionArgs>()...))::value>>
- T pop(InterruptionArgs&&... interruptionArgs) {
+ T pop(Interruptible* interruptible = Interruptible::notInterruptible()) {
return _popRunner([&](stdx::unique_lock<stdx::mutex>& lk) {
- _waitForNonEmpty(lk, std::forward<InterruptionArgs>(interruptionArgs)...);
+ _waitForNonEmpty(lk, interruptible);
return _pop(lk);
});
}
@@ -250,14 +192,10 @@ public:
// TODO: add sfinae to check to enforce
//
// Returns the cost value of the items extracted, along with the updated output iterator
- template <
- typename OutputIterator,
- typename... InterruptionArgs,
- typename = std::enable_if_t<decltype(producer_consumer_queue_detail::areInterruptionArgs(
- std::declval<InterruptionArgs>()...))::value>>
- std::pair<size_t, OutputIterator> popMany(OutputIterator iterator,
- InterruptionArgs&&... interruptionArgs) {
- return popManyUpTo(_max, iterator, std::forward<InterruptionArgs>(interruptionArgs)...);
+ template <typename OutputIterator>
+ std::pair<size_t, OutputIterator> popMany(
+ OutputIterator iterator, Interruptible* interruptible = Interruptible::notInterruptible()) {
+ return popManyUpTo(_max, iterator, interruptible);
}
// Waits for at least one item in the queue, then pops items out of the queue until it would
@@ -267,18 +205,15 @@ public:
// TODO: add sfinae to check to enforce
//
// Returns the cost value of the items extracted, along with the updated output iterator
- template <
- typename OutputIterator,
- typename... InterruptionArgs,
- typename = std::enable_if_t<decltype(producer_consumer_queue_detail::areInterruptionArgs(
- std::declval<InterruptionArgs>()...))::value>>
- std::pair<size_t, OutputIterator> popManyUpTo(size_t budget,
- OutputIterator iterator,
- InterruptionArgs&&... interruptionArgs) {
+ template <typename OutputIterator>
+ std::pair<size_t, OutputIterator> popManyUpTo(
+ size_t budget,
+ OutputIterator iterator,
+ Interruptible* interruptible = Interruptible::notInterruptible()) {
return _popRunner([&](stdx::unique_lock<stdx::mutex>& lk) {
size_t cost = 0;
- _waitForNonEmpty(lk, std::forward<InterruptionArgs>(interruptionArgs)...);
+ _waitForNonEmpty(lk, interruptible);
while (auto out = _tryPop(lk)) {
cost += _invokeCostFunc(*out, lk);
@@ -449,10 +384,9 @@ private:
return t;
}
- template <typename... InterruptionArgs>
void _waitForSpace(stdx::unique_lock<stdx::mutex>& lk,
size_t cost,
- InterruptionArgs&&... interruptionArgs) {
+ Interruptible* interruptible) {
invariant(!_producerWants);
_producerWants = cost;
@@ -464,12 +398,10 @@ private:
_checkProducerClosed(lk);
return _current + cost <= _max;
},
- std::forward<InterruptionArgs>(interruptionArgs)...);
+ interruptible);
}
- template <typename... InterruptionArgs>
- void _waitForNonEmpty(stdx::unique_lock<stdx::mutex>& lk,
- InterruptionArgs&&... interruptionArgs) {
+ void _waitForNonEmpty(stdx::unique_lock<stdx::mutex>& lk, Interruptible* interruptible) {
_consumers++;
const auto guard = MakeGuard([&] { _consumers--; });
@@ -480,64 +412,15 @@ private:
_checkConsumerClosed(lk);
return _queue.size();
},
- std::forward<InterruptionArgs>(interruptionArgs)...);
- }
-
- template <typename Callback>
- void _waitFor(stdx::unique_lock<stdx::mutex>& lk,
- stdx::condition_variable& condvar,
- Callback&& pred,
- OperationContext* opCtx) {
- opCtx->waitForConditionOrInterrupt(condvar, lk, pred);
- }
-
- template <typename Callback>
- void _waitFor(stdx::unique_lock<stdx::mutex>& lk,
- stdx::condition_variable& condvar,
- Callback&& pred) {
- condvar.wait(lk, pred);
- }
-
- template <typename Callback>
- void _waitFor(stdx::unique_lock<stdx::mutex>& lk,
- stdx::condition_variable& condvar,
- Callback&& pred,
- OperationContext* opCtx,
- Date_t deadline) {
- uassert(ErrorCodes::ExceededTimeLimit,
- "exceeded timeout",
- opCtx->waitForConditionOrInterruptUntil(condvar, lk, deadline, pred));
- }
-
- template <typename Callback>
- void _waitFor(stdx::unique_lock<stdx::mutex>& lk,
- stdx::condition_variable& condvar,
- Callback&& pred,
- Date_t deadline) {
- uassert(ErrorCodes::ExceededTimeLimit,
- "exceeded timeout",
- condvar.wait_until(lk, deadline.toSystemTimePoint(), pred));
- }
-
- template <typename Callback>
- void _waitFor(stdx::unique_lock<stdx::mutex>& lk,
- stdx::condition_variable& condvar,
- Callback&& pred,
- OperationContext* opCtx,
- Milliseconds duration) {
- uassert(ErrorCodes::ExceededTimeLimit,
- "exceeded timeout",
- opCtx->waitForConditionOrInterruptFor(condvar, lk, duration, pred));
+ interruptible);
}
template <typename Callback>
void _waitFor(stdx::unique_lock<stdx::mutex>& lk,
stdx::condition_variable& condvar,
Callback&& pred,
- Milliseconds duration) {
- uassert(ErrorCodes::ExceededTimeLimit,
- "exceeded timeout",
- condvar.wait_for(lk, duration.toSystemDuration(), pred));
+ Interruptible* interruptible) {
+ interruptible->waitForConditionOrInterrupt(condvar, lk, pred);
}
mutable stdx::mutex _mutex;
diff --git a/src/mongo/util/producer_consumer_queue_test.cpp b/src/mongo/util/producer_consumer_queue_test.cpp
index fa7af5e04a9..44161c8e64f 100644
--- a/src/mongo/util/producer_consumer_queue_test.cpp
+++ b/src/mongo/util/producer_consumer_queue_test.cpp
@@ -76,7 +76,8 @@ public:
auto client = _serviceCtx->makeClient(name.toString());
auto opCtx = client->makeOperationContext();
- cb(opCtx.get(), _timeout);
+ opCtx->runWithDeadline(
+ _timeout, ErrorCodes::ExceededTimeLimit, [&] { cb(opCtx.get()); });
});
}
@@ -96,20 +97,6 @@ public:
}
};
-template <typename Timeout>
-class ProducerConsumerQueueTestHelper<Timeout> {
-public:
- ProducerConsumerQueueTestHelper(Timeout timeout) : _timeout(timeout) {}
-
- template <typename Callback>
- stdx::thread runThread(StringData name, Callback&& cb) {
- return stdx::thread([this, name, cb] { cb(_timeout); });
- }
-
-private:
- Timeout _timeout;
-};
-
class ProducerConsumerQueueTest : public unittest::Test {
public:
template <typename Callback>
@@ -127,27 +114,17 @@ public:
const Minutes duration(30);
callback(ProducerConsumerQueueTestHelper<OperationContext>(_serviceCtx.get()));
- callback(ProducerConsumerQueueTestHelper<OperationContext, Milliseconds>(_serviceCtx.get(),
- duration));
callback(ProducerConsumerQueueTestHelper<OperationContext, Date_t>(
_serviceCtx.get(), _serviceCtx->getPreciseClockSource()->now() + duration));
callback(ProducerConsumerQueueTestHelper<>());
- callback(ProducerConsumerQueueTestHelper<Milliseconds>(duration));
- callback(ProducerConsumerQueueTestHelper<Date_t>(
- _serviceCtx->getPreciseClockSource()->now() + duration));
}
template <typename Callback>
void runTimeoutPermutations(Callback&& callback) {
const Milliseconds duration(10);
- callback(ProducerConsumerQueueTestHelper<OperationContext, Milliseconds>(_serviceCtx.get(),
- duration));
callback(ProducerConsumerQueueTestHelper<OperationContext, Date_t>(
_serviceCtx.get(), _serviceCtx->getPreciseClockSource()->now() + duration));
- callback(ProducerConsumerQueueTestHelper<Milliseconds>(duration));
- callback(ProducerConsumerQueueTestHelper<Date_t>(
- _serviceCtx->getPreciseClockSource()->now() + duration));
}
private:
diff --git a/src/mongo/util/waitable.h b/src/mongo/util/waitable.h
new file mode 100644
index 00000000000..21a4ed3ea2a
--- /dev/null
+++ b/src/mongo/util/waitable.h
@@ -0,0 +1,141 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/util/clock_source.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+
+/**
+ * Waitable is a lightweight type that can be used with stdx::condition_variable and can do other
+ * work while the condvar 'waits'.
+ *
+ * It handles this dance by using a special hook that condvar provides to register itself (as a
+ * notifyable, which it inherits from) during calls to wait. Then, rather than actually waiting on
+ * the condvar, it invokes its run/run_until methods.
+ *
+ * The current implementer of Waitable is the transport layer baton type, which performs delayed IO
+ * when it would otherwise block.
+ */
+class Waitable : public Notifyable {
+public:
+ static void wait(Waitable* waitable,
+ ClockSource* clkSource,
+ stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& lk) {
+ if (waitable) {
+ cv._runWithNotifyable(*waitable, [&]() noexcept {
+ lk.unlock();
+ waitable->run(clkSource);
+ lk.lock();
+ });
+ } else {
+ cv.wait(lk);
+ }
+ }
+
+ template <typename Predicate>
+ static void wait(Waitable* waitable,
+ ClockSource* clkSource,
+ stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& lk,
+ Predicate pred) {
+ while (!pred()) {
+ wait(waitable, clkSource, cv, lk);
+ }
+ }
+
+ static stdx::cv_status wait_until(
+ Waitable* waitable,
+ ClockSource* clkSource,
+ stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& lk,
+ const stdx::chrono::time_point<stdx::chrono::system_clock>& timeout_time) {
+ if (waitable) {
+ auto rval = stdx::cv_status::no_timeout;
+
+ cv._runWithNotifyable(*waitable, [&]() noexcept {
+ lk.unlock();
+ if (waitable->run_until(clkSource, Date_t(timeout_time)) == TimeoutState::Timeout) {
+ rval = stdx::cv_status::timeout;
+ }
+ lk.lock();
+ });
+
+ return rval;
+ } else {
+ return cv.wait_until(lk, timeout_time);
+ }
+ }
+
+ template <typename Predicate>
+ static bool wait_until(Waitable* waitable,
+ ClockSource* clkSource,
+ stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& lk,
+ const stdx::chrono::time_point<stdx::chrono::system_clock>& timeout_time,
+ Predicate pred) {
+ while (!pred()) {
+ if (wait_until(waitable, clkSource, cv, lk, timeout_time) == stdx::cv_status::timeout) {
+ return pred();
+ }
+ }
+
+ return true;
+ }
+
+protected:
+ ~Waitable() noexcept {}
+
+ enum class TimeoutState {
+ NoTimeout,
+ Timeout,
+ };
+
+ /**
+ * Run some amount of work. The intention is that this function perform work until it's
+ * possible that the surrounding condvar clause could have finished.
+ *
+ * Note that like regular condvar.wait, this allows implementers the flexibility to possibly
+ * return early.
+ *
+ * We take a clock source here to allow for synthetic timeouts.
+ */
+ virtual void run(ClockSource* clkSource) noexcept = 0;
+
+ /**
+ * Like run, but only until the passed deadline has passed.
+ */
+ virtual TimeoutState run_until(ClockSource* clkSource, Date_t deadline) noexcept = 0;
+};
+
+} // namespace mongo