summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJason Carey <jcarey@argv.me>2018-07-05 15:50:40 -0400
committerJason Carey <jcarey@argv.me>2018-09-17 18:07:18 -0400
commit6e3c5ea176aadbd0475f8f87525b9f0fabd4bdc9 (patch)
tree9ca79672f893b98d4fc99a42cf36528c4a7b7488
parent1faa184e835a7a628631064af08389471d64ed0f (diff)
downloadmongo-6e3c5ea176aadbd0475f8f87525b9f0fabd4bdc9.tar.gz
SERVER-35679 General Interruption Facility
Add support for a generalized interruptibility facility in the server. This offers a generalized interruptibility facility, trialed in Future<T> and ProducerConsumerQueue<T>. It offers 3 major concepts: Notifyable: A type which can notified off-thread, causing a wake up from some kind of blocking wait Waitable: A type which is Notifyable, and also can perform work while in a ready-to-receive notification state. static methods offer support for running underneath condition_variable::wait's. The chief implementer is the transport layer baton type Interruptible: A type which can wait on condition variables, and offers: - deadlines. This means the type integrates some sort of clock source - interruptibility. This means the type offers a way of noticing that it should no longer run via status or exception Additionally, Interruptible's offer special scoped guards which offer - Exemption from interruption in a region defined by the lifetime of a guard object - Subsidiary deadlines which can trigger recursively, offering specialized timeout and status return support. The series of virtual types allows us to slice the interface between opCtx and future such that opctx can use future and future can use opctx. Additionally, cutting out more functionality allows us to flow a noop interruptibility type which unifies our waiting behind a common api.
-rw-r--r--src/mongo/SConscript1
-rw-r--r--src/mongo/db/catalog/SConscript1
-rw-r--r--src/mongo/db/operation_context.cpp70
-rw-r--r--src/mongo/db/operation_context.h166
-rw-r--r--src/mongo/db/operation_context_test.cpp229
-rw-r--r--src/mongo/executor/connection_pool_tl.cpp2
-rw-r--r--src/mongo/rpc/SConscript1
-rw-r--r--src/mongo/s/async_requests_sender.cpp61
-rw-r--r--src/mongo/s/async_requests_sender.h6
-rw-r--r--src/mongo/stdx/SConscript15
-rw-r--r--src/mongo/stdx/condition_variable.h138
-rw-r--r--src/mongo/stdx/condition_variable_bm.cpp87
-rw-r--r--src/mongo/transport/baton.h16
-rw-r--r--src/mongo/transport/baton_asio_linux.h111
-rw-r--r--src/mongo/transport/transport_layer.h5
-rw-r--r--src/mongo/transport/transport_layer_asio.cpp42
-rw-r--r--src/mongo/transport/transport_layer_asio_integration_test.cpp7
-rw-r--r--src/mongo/util/clock_source.cpp13
-rw-r--r--src/mongo/util/clock_source.h31
-rw-r--r--src/mongo/util/future.h96
-rw-r--r--src/mongo/util/future_test_future_int.cpp30
-rw-r--r--src/mongo/util/interruptible.h452
-rw-r--r--src/mongo/util/producer_consumer_queue.h173
-rw-r--r--src/mongo/util/producer_consumer_queue_test.cpp27
-rw-r--r--src/mongo/util/waitable.h141
25 files changed, 1416 insertions, 505 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript
index e262fe78f4a..dca2d87d1be 100644
--- a/src/mongo/SConscript
+++ b/src/mongo/SConscript
@@ -36,6 +36,7 @@ env.SConscript(
's',
'scripting',
'shell',
+ 'stdx',
'tools',
'transport',
'unittest',
diff --git a/src/mongo/db/catalog/SConscript b/src/mongo/db/catalog/SConscript
index 05ce3036ad4..8ab0dd4f364 100644
--- a/src/mongo/db/catalog/SConscript
+++ b/src/mongo/db/catalog/SConscript
@@ -202,6 +202,7 @@ env.Library(
'database_holder',
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/namespace_string',
+ '$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/db/storage/storage_options',
],
)
diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp
index 94a5ea312d2..873972a8c30 100644
--- a/src/mongo/db/operation_context.cpp
+++ b/src/mongo/db/operation_context.cpp
@@ -88,7 +88,9 @@ void OperationContext::setDeadlineAndMaxTime(Date_t when,
ErrorCodes::Error timeoutError) {
invariant(!getClient()->isInDirectClient());
invariant(ErrorCodes::isExceededTimeLimitError(timeoutError));
- uassert(40120, "Illegal attempt to change operation deadline", !hasDeadline());
+ uassert(40120,
+ "Illegal attempt to change operation deadline",
+ _hasArtificialDeadline || !hasDeadline());
_deadline = when;
_maxTime = maxTime;
_timeoutError = timeoutError;
@@ -165,10 +167,6 @@ Microseconds OperationContext::getRemainingMaxTimeMicros() const {
return _maxTime - getElapsedTime();
}
-void OperationContext::checkForInterrupt() {
- uassertStatusOK(checkForInterruptNoAssert());
-}
-
namespace {
// Helper function for checkForInterrupt fail point. Decides whether the operation currently
@@ -190,7 +188,7 @@ bool opShouldFail(Client* client, const BSONObj& failPointInfo) {
} // namespace
-Status OperationContext::checkForInterruptNoAssert() {
+Status OperationContext::checkForInterruptNoAssert() noexcept {
// TODO: Remove the MONGO_likely(getClient()) once all operation contexts are constructed with
// clients.
if (MONGO_likely(getClient() && getServiceContext()) &&
@@ -199,10 +197,16 @@ Status OperationContext::checkForInterruptNoAssert() {
}
if (hasDeadlineExpired()) {
- markKilled(_timeoutError);
+ if (!_hasArtificialDeadline) {
+ markKilled(_timeoutError);
+ }
return Status(_timeoutError, "operation exceeded time limit");
}
+ if (_ignoreInterrupts) {
+ return Status::OK();
+ }
+
MONGO_FAIL_POINT_BLOCK(checkForInterruptFail, scopedFailPoint) {
if (opShouldFail(getClient(), scopedFailPoint.getData())) {
log() << "set pending kill on op " << getOpID() << ", for checkForInterruptFail";
@@ -218,41 +222,6 @@ Status OperationContext::checkForInterruptNoAssert() {
return Status::OK();
}
-void OperationContext::sleepUntil(Date_t deadline) {
- stdx::mutex m;
- stdx::condition_variable cv;
- stdx::unique_lock<stdx::mutex> lk(m);
- invariant(!waitForConditionOrInterruptUntil(cv, lk, deadline, [] { return false; }));
-}
-
-void OperationContext::sleepFor(Milliseconds duration) {
- stdx::mutex m;
- stdx::condition_variable cv;
- stdx::unique_lock<stdx::mutex> lk(m);
- invariant(!waitForConditionOrInterruptFor(cv, lk, duration, [] { return false; }));
-}
-
-void OperationContext::waitForConditionOrInterrupt(stdx::condition_variable& cv,
- stdx::unique_lock<stdx::mutex>& m) {
- uassertStatusOK(waitForConditionOrInterruptNoAssert(cv, m));
-}
-
-Status OperationContext::waitForConditionOrInterruptNoAssert(
- stdx::condition_variable& cv, stdx::unique_lock<stdx::mutex>& m) noexcept {
- auto status = waitForConditionOrInterruptNoAssertUntil(cv, m, Date_t::max());
- if (!status.isOK()) {
- return status.getStatus();
- }
- invariant(status.getValue() == stdx::cv_status::no_timeout);
- return status.getStatus();
-}
-
-stdx::cv_status OperationContext::waitForConditionOrInterruptUntil(
- stdx::condition_variable& cv, stdx::unique_lock<stdx::mutex>& m, Date_t deadline) {
-
- return uassertStatusOK(waitForConditionOrInterruptNoAssertUntil(cv, m, deadline));
-}
-
// Theory of operation for waitForConditionOrInterruptNoAssertUntil and markKilled:
//
// An operation indicates to potential killers that it is waiting on a condition variable by setting
@@ -312,14 +281,15 @@ StatusWith<stdx::cv_status> OperationContext::waitForConditionOrInterruptNoAsser
const auto waitStatus = [&] {
if (Date_t::max() == deadline) {
- cv.wait(m);
+ Waitable::wait(_baton.get(), getServiceContext()->getPreciseClockSource(), cv, m);
return stdx::cv_status::no_timeout;
}
- return getServiceContext()->getPreciseClockSource()->waitForConditionUntil(cv, m, deadline);
+ return getServiceContext()->getPreciseClockSource()->waitForConditionUntil(
+ cv, m, deadline, _baton.get());
}();
// Continue waiting on cv until no other thread is attempting to kill this one.
- cv.wait(m, [this] {
+ Waitable::wait(_baton.get(), getServiceContext()->getPreciseClockSource(), cv, m, [this] {
stdx::lock_guard<Client> clientLock(*getClient());
if (0 == _numKillers) {
_waitMutex = nullptr;
@@ -338,9 +308,12 @@ StatusWith<stdx::cv_status> OperationContext::waitForConditionOrInterruptNoAsser
// is slightly ahead of the FastClock used in checkForInterrupt. In this case,
// we treat the operation as though it has exceeded its time limit, just as if the
// FastClock and system clock had agreed.
- markKilled(_timeoutError);
+ if (!_hasArtificialDeadline) {
+ markKilled(_timeoutError);
+ }
return Status(_timeoutError, "operation exceeded time limit");
}
+
return waitStatus;
}
@@ -361,11 +334,6 @@ void OperationContext::markKilled(ErrorCodes::Error killCode) {
invariant(_waitCV);
_waitCV->notify_all();
}
-
- // If we have a baton, we need to wake it up. The baton itself will check for interruption
- if (_baton) {
- _baton->schedule([] {});
- }
}
void OperationContext::setLogicalSessionId(LogicalSessionId lsid) {
diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h
index 712a0ab860c..9931a6e1931 100644
--- a/src/mongo/db/operation_context.h
+++ b/src/mongo/db/operation_context.h
@@ -44,6 +44,7 @@
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/decorable.h"
+#include "mongo/util/interruptible.h"
#include "mongo/util/time_support.h"
#include "mongo/util/timer.h"
@@ -70,7 +71,7 @@ class UnreplicatedWritesBlock;
* (RecoveryUnitState) to reduce complexity and duplication in the storage-engine specific
* RecoveryUnit and to allow better invariant checking.
*/
-class OperationContext : public Decorable<OperationContext> {
+class OperationContext : public Interruptible, public Decorable<OperationContext> {
MONGO_DISALLOW_COPYING(OperationContext);
public:
@@ -127,107 +128,9 @@ public:
std::unique_ptr<Locker> swapLockState(std::unique_ptr<Locker> locker);
/**
- * Raises a AssertionException if this operation is in a killed state.
- */
- void checkForInterrupt();
-
- /**
* Returns Status::OK() unless this operation is in a killed state.
*/
- Status checkForInterruptNoAssert();
-
- /**
- * Sleeps until "deadline"; throws an exception if the operation is interrupted before then.
- */
- void sleepUntil(Date_t deadline);
-
- /**
- * Sleeps for "duration" ms; throws an exception if the operation is interrupted before then.
- */
- void sleepFor(Milliseconds duration);
-
- /**
- * Waits for either the condition "cv" to be signaled, this operation to be interrupted, or the
- * deadline on this operation to expire. In the event of interruption or operation deadline
- * expiration, raises a AssertionException with an error code indicating the interruption type.
- */
- void waitForConditionOrInterrupt(stdx::condition_variable& cv,
- stdx::unique_lock<stdx::mutex>& m);
-
- /**
- * Waits on condition "cv" for "pred" until "pred" returns true, or this operation
- * is interrupted or its deadline expires. Throws a DBException for interruption and
- * deadline expiration.
- */
- template <typename Pred>
- void waitForConditionOrInterrupt(stdx::condition_variable& cv,
- stdx::unique_lock<stdx::mutex>& m,
- Pred pred) {
- while (!pred()) {
- waitForConditionOrInterrupt(cv, m);
- }
- }
-
- /**
- * Same as waitForConditionOrInterrupt, except returns a Status instead of throwing
- * a DBException to report interruption.
- */
- Status waitForConditionOrInterruptNoAssert(stdx::condition_variable& cv,
- stdx::unique_lock<stdx::mutex>& m) noexcept;
-
- /**
- * Waits for condition "cv" to be signaled, or for the given "deadline" to expire, or
- * for the operation to be interrupted, or for the operation's own deadline to expire.
- *
- * If the operation deadline expires or the operation is interrupted, throws a DBException. If
- * the given "deadline" expires, returns cv_status::timeout. Otherwise, returns
- * cv_status::no_timeout.
- */
- stdx::cv_status waitForConditionOrInterruptUntil(stdx::condition_variable& cv,
- stdx::unique_lock<stdx::mutex>& m,
- Date_t deadline);
-
- /**
- * Waits on condition "cv" for "pred" until "pred" returns true, or the given "deadline"
- * expires, or this operation is interrupted, or this operation's own deadline expires.
- *
- *
- * If the operation deadline expires or the operation is interrupted, throws a DBException. If
- * the given "deadline" expires, returns cv_status::timeout. Otherwise, returns
- * cv_status::no_timeout indicating that "pred" finally returned true.
- */
- template <typename Pred>
- bool waitForConditionOrInterruptUntil(stdx::condition_variable& cv,
- stdx::unique_lock<stdx::mutex>& m,
- Date_t deadline,
- Pred pred) {
- while (!pred()) {
- if (stdx::cv_status::timeout == waitForConditionOrInterruptUntil(cv, m, deadline)) {
- return pred();
- }
- }
- return true;
- }
-
- /**
- * Same as the predicate form of waitForConditionOrInterruptUntil, but takes a relative
- * amount of time to wait instead of an absolute time point.
- */
- template <typename Pred>
- bool waitForConditionOrInterruptFor(stdx::condition_variable& cv,
- stdx::unique_lock<stdx::mutex>& m,
- Milliseconds duration,
- Pred pred) {
- return waitForConditionOrInterruptUntil(
- cv, m, getExpirationDateForWaitForValue(duration), pred);
- }
-
- /**
- * Same as waitForConditionOrInterruptUntil, except returns StatusWith<stdx::cv_status> and
- * non-ok status indicates the error instead of a DBException.
- */
- StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil(
- stdx::condition_variable& cv, stdx::unique_lock<stdx::mutex>& m, Date_t deadline) noexcept;
+ Status checkForInterruptNoAssert() noexcept override;
/**
* Returns the service context under which this operation context runs, or nullptr if there is
@@ -353,6 +256,9 @@ public:
* without lock by the thread executing on behalf of this operation context.
*/
ErrorCodes::Error getKillStatus() const {
+ if (_ignoreInterrupts) {
+ return ErrorCodes::OK;
+ }
return _killCode.loadRelaxed();
}
@@ -397,16 +303,9 @@ public:
}
/**
- * Returns true if this operation has a deadline.
- */
- bool hasDeadline() const {
- return getDeadline() < Date_t::max();
- }
-
- /**
* Returns the deadline for this operation, or Date_t::max() if there is no deadline.
*/
- Date_t getDeadline() const {
+ Date_t getDeadline() const override {
return _deadline;
}
@@ -425,7 +324,54 @@ public:
*/
Microseconds getRemainingMaxTimeMicros() const;
+ StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil(
+ stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m,
+ Date_t deadline) noexcept override;
+
private:
+ IgnoreInterruptsState pushIgnoreInterrupts() override {
+ IgnoreInterruptsState iis{_ignoreInterrupts,
+ {_deadline, _timeoutError, _hasArtificialDeadline}};
+ _hasArtificialDeadline = true;
+ setDeadlineByDate(Date_t::max(), ErrorCodes::ExceededTimeLimit);
+ _ignoreInterrupts = true;
+
+ return iis;
+ }
+
+ void popIgnoreInterrupts(IgnoreInterruptsState iis) override {
+ _ignoreInterrupts = iis.ignoreInterrupts;
+
+ setDeadlineByDate(iis.deadline.deadline, iis.deadline.error);
+ _hasArtificialDeadline = iis.deadline.hasArtificialDeadline;
+
+ _markKilledIfDeadlineRequires();
+ }
+
+ DeadlineState pushArtificialDeadline(Date_t deadline, ErrorCodes::Error error) override {
+ DeadlineState ds{_deadline, _timeoutError, _hasArtificialDeadline};
+
+ _hasArtificialDeadline = true;
+ setDeadlineByDate(std::min(_deadline, deadline), error);
+
+ return ds;
+ }
+
+ void popArtificialDeadline(DeadlineState ds) override {
+ setDeadlineByDate(ds.deadline, ds.error);
+ _hasArtificialDeadline = ds.hasArtificialDeadline;
+
+ _markKilledIfDeadlineRequires();
+ }
+
+ void _markKilledIfDeadlineRequires() {
+ if (!_ignoreInterrupts && !_hasArtificialDeadline && hasDeadlineExpired() &&
+ !isKillPending()) {
+ markKilled(_timeoutError);
+ }
+ }
+
/**
* Returns true if this operation has a deadline and it has passed according to the fast clock
* on ServiceContext.
@@ -447,7 +393,7 @@ private:
* Returns the timepoint that is "waitFor" ms after now according to the
* ServiceContext's precise clock.
*/
- Date_t getExpirationDateForWaitForValue(Milliseconds waitFor);
+ Date_t getExpirationDateForWaitForValue(Milliseconds waitFor) override;
/**
* Set whether or not operations should generate oplog entries.
@@ -502,6 +448,8 @@ private:
Date_t::max(); // The timepoint at which this operation exceeds its time limit.
ErrorCodes::Error _timeoutError = ErrorCodes::ExceededTimeLimit;
+ bool _ignoreInterrupts = false;
+ bool _hasArtificialDeadline = false;
// Max operation time requested by the user or by the cursor in the case of a getMore with no
// user-specified maxTime. This is tracked with microsecond granularity for the purpose of
diff --git a/src/mongo/db/operation_context_test.cpp b/src/mongo/db/operation_context_test.cpp
index b779102b702..02891cfa151 100644
--- a/src/mongo/db/operation_context_test.cpp
+++ b/src/mongo/db/operation_context_test.cpp
@@ -200,6 +200,25 @@ TEST(OperationContextTest, OpCtxGroup) {
}
}
+TEST(OperationContextTest, IgnoreInterruptsWorks) {
+ auto serviceCtx = ServiceContext::make();
+ auto client = serviceCtx->makeClient("OperationContextTest");
+ auto opCtx = client->makeOperationContext();
+
+ opCtx->markKilled(ErrorCodes::BadValue);
+ ASSERT_THROWS_CODE(opCtx->checkForInterrupt(), DBException, ErrorCodes::BadValue);
+ ASSERT_EQUALS(opCtx->getKillStatus(), ErrorCodes::BadValue);
+
+ opCtx->runWithoutInterruption([&] {
+ ASSERT_OK(opCtx->checkForInterruptNoAssert());
+ ASSERT_OK(opCtx->getKillStatus());
+ });
+
+ ASSERT_THROWS_CODE(opCtx->checkForInterrupt(), DBException, ErrorCodes::BadValue);
+
+ ASSERT_EQUALS(opCtx->getKillStatus(), ErrorCodes::BadValue);
+}
+
class OperationDeadlineTests : public unittest::Test {
public:
void setUp() {
@@ -210,6 +229,13 @@ public:
client = service->makeClient("OperationDeadlineTest");
}
+ void checkForInterruptForTimeout(OperationContext* opCtx) {
+ stdx::mutex m;
+ stdx::condition_variable cv;
+ stdx::unique_lock<stdx::mutex> lk(m);
+ opCtx->waitForConditionOrInterrupt(cv, lk);
+ }
+
const std::shared_ptr<ClockSourceMock> mockClock = std::make_shared<ClockSourceMock>();
ServiceContext::UniqueServiceContext service;
ServiceContext::UniqueClient client;
@@ -302,6 +328,209 @@ TEST_F(OperationDeadlineTests, WaitForMaxTimeExpiredCVWithWaitUntilSet) {
.getStatus());
}
+TEST_F(OperationDeadlineTests, NestedTimeoutsTimeoutInOrder) {
+ auto opCtx = client->makeOperationContext();
+
+ opCtx->setDeadlineByDate(mockClock->now() + Milliseconds(500), ErrorCodes::MaxTimeMSExpired);
+
+ bool reachedA = false;
+ bool reachedB = false;
+ bool reachedC = false;
+
+ try {
+ opCtx->runWithDeadline(
+ mockClock->now() + Milliseconds(100), ErrorCodes::ExceededTimeLimit, [&] {
+ ASSERT_OK(opCtx->checkForInterruptNoAssert());
+
+ try {
+ opCtx->runWithDeadline(
+ mockClock->now() + Milliseconds(50), ErrorCodes::ExceededTimeLimit, [&] {
+ ASSERT_OK(opCtx->checkForInterruptNoAssert());
+ try {
+ opCtx->runWithDeadline(mockClock->now() + Milliseconds(10),
+ ErrorCodes::ExceededTimeLimit,
+ [&] {
+ ASSERT_OK(
+ opCtx->checkForInterruptNoAssert());
+ ASSERT_OK(opCtx->getKillStatus());
+ mockClock->advance(Milliseconds(20));
+ checkForInterruptForTimeout(opCtx.get());
+ ASSERT(false);
+ });
+ } catch (const ExceptionFor<ErrorCodes::ExceededTimeLimit>&) {
+ opCtx->checkForInterrupt();
+ ASSERT_OK(opCtx->getKillStatus());
+ mockClock->advance(Milliseconds(50));
+ reachedA = true;
+ }
+
+ opCtx->checkForInterrupt();
+ });
+ } catch (const ExceptionFor<ErrorCodes::ExceededTimeLimit>&) {
+ opCtx->checkForInterrupt();
+ ASSERT_OK(opCtx->getKillStatus());
+ mockClock->advance(Milliseconds(50));
+ reachedB = true;
+ }
+
+ opCtx->checkForInterrupt();
+ });
+ } catch (const ExceptionFor<ErrorCodes::ExceededTimeLimit>&) {
+ reachedC = true;
+ ASSERT_OK(opCtx->getKillStatus());
+ ASSERT_OK(opCtx->checkForInterruptNoAssert());
+ }
+
+ ASSERT(reachedA);
+ ASSERT(reachedB);
+ ASSERT(reachedC);
+
+ ASSERT_OK(opCtx->getKillStatus());
+
+ mockClock->advance(Seconds(1));
+
+ ASSERT_THROWS_CODE(opCtx->checkForInterrupt(), DBException, ErrorCodes::MaxTimeMSExpired);
+}
+
+TEST_F(OperationDeadlineTests, NestedTimeoutsThatViolateMaxTime) {
+ auto opCtx = client->makeOperationContext();
+
+ opCtx->setDeadlineByDate(mockClock->now() + Milliseconds(10), ErrorCodes::MaxTimeMSExpired);
+
+ bool reachedA = false;
+ bool reachedB = false;
+
+ try {
+ opCtx->runWithDeadline(
+ mockClock->now() + Milliseconds(100), ErrorCodes::ExceededTimeLimit, [&] {
+ ASSERT_OK(opCtx->checkForInterruptNoAssert());
+ try {
+ opCtx->runWithDeadline(
+ mockClock->now() + Milliseconds(100), ErrorCodes::ExceededTimeLimit, [&] {
+ ASSERT_OK(opCtx->checkForInterruptNoAssert());
+ ASSERT_OK(opCtx->getKillStatus());
+ mockClock->advance(Milliseconds(50));
+ opCtx->checkForInterrupt();
+ });
+ } catch (const ExceptionFor<ErrorCodes::ExceededTimeLimit>&) {
+ reachedA = true;
+ }
+
+ opCtx->checkForInterrupt();
+ });
+ } catch (const ExceptionFor<ErrorCodes::MaxTimeMSExpired>&) {
+ reachedB = true;
+ }
+
+ ASSERT(reachedA);
+ ASSERT(reachedB);
+}
+
+TEST_F(OperationDeadlineTests, NestedNonMaxTimeMSTimeoutsThatAreLargerAreIgnored) {
+ auto opCtx = client->makeOperationContext();
+
+ bool reachedA = false;
+ bool reachedB = false;
+
+ try {
+ opCtx->runWithDeadline(
+ mockClock->now() + Milliseconds(10), ErrorCodes::ExceededTimeLimit, [&] {
+ ASSERT_OK(opCtx->checkForInterruptNoAssert());
+ try {
+ opCtx->runWithDeadline(
+ mockClock->now() + Milliseconds(100), ErrorCodes::ExceededTimeLimit, [&] {
+ ASSERT_OK(opCtx->checkForInterruptNoAssert());
+ ASSERT_OK(opCtx->getKillStatus());
+ mockClock->advance(Milliseconds(50));
+ opCtx->checkForInterrupt();
+ });
+ } catch (const ExceptionFor<ErrorCodes::ExceededTimeLimit>&) {
+ reachedA = true;
+ }
+
+ opCtx->checkForInterrupt();
+ });
+ } catch (const ExceptionFor<ErrorCodes::ExceededTimeLimit>&) {
+ reachedB = true;
+ }
+
+ ASSERT(reachedA);
+ ASSERT(reachedB);
+}
+
+TEST_F(OperationDeadlineTests, DeadlineAfterIgnoreInterruptsReopens) {
+ auto opCtx = client->makeOperationContext();
+
+ bool reachedA = false;
+ bool reachedB = false;
+ bool reachedC = false;
+
+ try {
+ opCtx->runWithDeadline(
+ mockClock->now() + Milliseconds(500), ErrorCodes::ExceededTimeLimit, [&] {
+ ASSERT_OK(opCtx->checkForInterruptNoAssert());
+
+ opCtx->runWithoutInterruption([&] {
+ try {
+ opCtx->runWithDeadline(
+ mockClock->now() + Seconds(1), ErrorCodes::ExceededTimeLimit, [&] {
+ ASSERT_OK(opCtx->checkForInterruptNoAssert());
+ ASSERT_OK(opCtx->getKillStatus());
+ mockClock->advance(Milliseconds(750));
+ ASSERT_OK(opCtx->checkForInterruptNoAssert());
+ mockClock->advance(Milliseconds(500));
+ reachedA = true;
+ opCtx->checkForInterrupt();
+ });
+ } catch (const ExceptionFor<ErrorCodes::ExceededTimeLimit>&) {
+ opCtx->checkForInterrupt();
+ reachedB = true;
+ }
+ });
+
+ opCtx->checkForInterrupt();
+ });
+ } catch (const ExceptionFor<ErrorCodes::ExceededTimeLimit>& ex) {
+ reachedC = true;
+ }
+
+ ASSERT(reachedA);
+ ASSERT(reachedB);
+ ASSERT(reachedC);
+}
+
+TEST_F(OperationDeadlineTests, DeadlineAfterRunWithoutInterruptSeesViolatedMaxMS) {
+ auto opCtx = client->makeOperationContext();
+
+ opCtx->setDeadlineByDate(mockClock->now() + Milliseconds(100), ErrorCodes::MaxTimeMSExpired);
+
+ ASSERT_THROWS_CODE(opCtx->runWithoutInterruption([&] {
+ opCtx->runWithDeadline(
+ mockClock->now() + Milliseconds(200), ErrorCodes::ExceededTimeLimit, [&] {
+ mockClock->advance(Milliseconds(300));
+ opCtx->checkForInterrupt();
+ });
+ }),
+ DBException,
+ ErrorCodes::MaxTimeMSExpired);
+}
+
+TEST_F(OperationDeadlineTests, DeadlineAfterRunWithoutInterruptDoesntSeeUnviolatedMaxMS) {
+ auto opCtx = client->makeOperationContext();
+
+ opCtx->setDeadlineByDate(mockClock->now() + Milliseconds(200), ErrorCodes::MaxTimeMSExpired);
+
+ ASSERT_THROWS_CODE(opCtx->runWithoutInterruption([&] {
+ opCtx->runWithDeadline(
+ mockClock->now() + Milliseconds(100), ErrorCodes::ExceededTimeLimit, [&] {
+ mockClock->advance(Milliseconds(150));
+ opCtx->checkForInterrupt();
+ });
+ }),
+ DBException,
+ ErrorCodes::ExceededTimeLimit);
+}
+
TEST_F(OperationDeadlineTests, WaitForKilledOpCV) {
auto opCtx = client->makeOperationContext();
opCtx->markKilled();
diff --git a/src/mongo/executor/connection_pool_tl.cpp b/src/mongo/executor/connection_pool_tl.cpp
index 5e79798c6aa..b74e424fe1f 100644
--- a/src/mongo/executor/connection_pool_tl.cpp
+++ b/src/mongo/executor/connection_pool_tl.cpp
@@ -96,7 +96,7 @@ void TLTimer::setTimeout(Milliseconds timeoutVal, TimeoutCallback cb) {
return;
}
- _timer->waitFor(timeoutVal).getAsync([cb = std::move(cb)](Status status) {
+ _timer->waitUntil(_reactor->now() + timeoutVal).getAsync([cb = std::move(cb)](Status status) {
// If we get canceled, then we don't worry about the timeout anymore
if (status == ErrorCodes::CallbackCanceled) {
return;
diff --git a/src/mongo/rpc/SConscript b/src/mongo/rpc/SConscript
index f7883dbc9e2..f5892449616 100644
--- a/src/mongo/rpc/SConscript
+++ b/src/mongo/rpc/SConscript
@@ -174,6 +174,7 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/s/is_mongos',
'$BUILD_DIR/mongo/transport/transport_layer_common',
"$BUILD_DIR/mongo/util/concurrency/spin_lock",
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp
index aef8ac9d3b7..4ef49669505 100644
--- a/src/mongo/s/async_requests_sender.cpp
+++ b/src/mongo/s/async_requests_sender.cpp
@@ -99,7 +99,7 @@ AsyncRequestsSender::Response AsyncRequestsSender::next() {
// Otherwise, wait for some response to be received.
if (_interruptStatus.isOK()) {
try {
- _makeProgress(_opCtx);
+ _makeProgress();
} catch (const AssertionException& ex) {
// If the operation is interrupted, we cancel outstanding requests and switch to
// waiting for the (canceled) callbacks to finish without checking for interrupts.
@@ -108,7 +108,7 @@ AsyncRequestsSender::Response AsyncRequestsSender::next() {
continue;
}
} else {
- _makeProgress(nullptr);
+ _opCtx->runWithoutInterruption([&] { _makeProgress(); });
}
}
return *readyResponse;
@@ -139,11 +139,6 @@ boost::optional<AsyncRequestsSender::Response> AsyncRequestsSender::_ready() {
_scheduleRequests();
}
- // If we have baton requests, we want to process those before proceeding
- if (_batonRequests) {
- return boost::none;
- }
-
// Check if any remote is ready.
invariant(!_remotes.empty());
for (auto& remote : _remotes) {
@@ -215,11 +210,6 @@ void AsyncRequestsSender::_scheduleRequests() {
if (!scheduleStatus.isOK()) {
remote.swResponse = std::move(scheduleStatus);
- if (_baton) {
- _batonRequests++;
- _baton->schedule([this] { _batonRequests--; });
- }
-
// Push a noop response to the queue to indicate that a remote is ready for
// re-processing due to failure.
_responseQueue.push(boost::none);
@@ -245,11 +235,6 @@ Status AsyncRequestsSender::_scheduleRequest(size_t remoteIndex) {
auto callbackStatus = _executor->scheduleRemoteCommand(
request,
[remoteIndex, this](const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) {
- if (_baton) {
- _batonRequests++;
- _baton->schedule([this] { _batonRequests--; });
- }
-
_responseQueue.push(Job{cbData, remoteIndex});
},
_baton);
@@ -262,22 +247,8 @@ Status AsyncRequestsSender::_scheduleRequest(size_t remoteIndex) {
}
// Passing opCtx means you'd like to opt into opCtx interruption. During cleanup we actually don't.
-void AsyncRequestsSender::_makeProgress(OperationContext* opCtx) {
- invariant(!opCtx || opCtx == _opCtx);
-
- boost::optional<Job> job;
-
- if (_baton) {
- // If we're using a baton, we peek the queue, and block on the baton if it's empty
- if (boost::optional<boost::optional<Job>> tryJob = _responseQueue.tryPop()) {
- job = std::move(*tryJob);
- } else {
- _baton->run(opCtx, boost::none);
- }
- } else {
- // Otherwise we block on the queue
- job = opCtx ? _responseQueue.pop(opCtx) : _responseQueue.pop();
- }
+void AsyncRequestsSender::_makeProgress() {
+ auto job = _responseQueue.pop(_opCtx);
if (!job) {
return;
@@ -350,22 +321,22 @@ Status AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPort(
// progress on previous requests.
auto pf = makePromiseFuture<HostAndPort>();
- ars->_batonRequests++;
stdx::thread bgChecker([&] {
pf.promise.setWith(
[&] { return targeter->findHostWithMaxWait(readPref, deadline - clock->now()); });
-
- ars->_baton->schedule([ars] { ars->_batonRequests--; });
});
- const auto guard = MakeGuard([&] { bgChecker.join(); });
-
- while (!pf.future.isReady()) {
- if (!ars->_baton->run(nullptr, deadline)) {
- break;
- }
- }
-
- return pf.future.getNoThrow();
+ const auto threadGuard = MakeGuard([&] { bgChecker.join(); });
+
+ // We ignore interrupts here because we want to spin the baton for the full duration of the
+ // findHostWithMaxWait. We set the time limit (although the bg checker should fulfill our
+ // promise) to sync up with the findHostWithMaxWait.
+ //
+ // TODO clean this up after SERVER-35689 when we can do async targeting.
+ return ars->_opCtx->runWithoutInterruption([&] {
+ return ars->_opCtx->runWithDeadline(deadline, ErrorCodes::ExceededTimeLimit, [&] {
+ return pf.future.getNoThrow(ars->_opCtx);
+ });
+ });
}();
if (!findHostStatus.isOK()) {
diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h
index dfb053c977e..fe7d5e20cb7 100644
--- a/src/mongo/s/async_requests_sender.h
+++ b/src/mongo/s/async_requests_sender.h
@@ -39,6 +39,7 @@
#include "mongo/executor/task_executor.h"
#include "mongo/s/client/shard.h"
#include "mongo/s/shard_id.h"
+#include "mongo/util/interruptible.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/producer_consumer_queue.h"
#include "mongo/util/time_support.h"
@@ -281,17 +282,14 @@ private:
/**
* Waits for forward progress in gathering responses from a remote.
*
- * If the opCtx is non-null, use it while waiting on completion.
- *
* Stores the response or error in the remote.
*/
- void _makeProgress(OperationContext* opCtx);
+ void _makeProgress();
OperationContext* _opCtx;
executor::TaskExecutor* _executor;
BatonDetacher _baton;
- size_t _batonRequests = 0;
// The metadata obj to pass along with the command remote. Used to indicate that the command is
// ok to run on secondaries.
diff --git a/src/mongo/stdx/SConscript b/src/mongo/stdx/SConscript
new file mode 100644
index 00000000000..a51aa3423f4
--- /dev/null
+++ b/src/mongo/stdx/SConscript
@@ -0,0 +1,15 @@
+# -*- mode: python -*-
+
+Import("env")
+
+env = env.Clone()
+
+env.Benchmark(
+ target='condition_variable_bm',
+ source=[
+ 'condition_variable_bm.cpp',
+ ],
+ LIBDEPS=[
+ ],
+)
+
diff --git a/src/mongo/stdx/condition_variable.h b/src/mongo/stdx/condition_variable.h
index 40ca92ba8f6..6de5dbdb66d 100644
--- a/src/mongo/stdx/condition_variable.h
+++ b/src/mongo/stdx/condition_variable.h
@@ -28,15 +28,151 @@
#pragma once
+#include <atomic>
#include <condition_variable>
+#include <list>
+
+#include "mongo/platform/atomic_word.h"
+#include "mongo/stdx/mutex.h"
namespace mongo {
+
+/**
+ * Notifyable is a slim type meant to allow integration of special kinds of waiters for
+ * stdx::condition_variable. Specifially, the notify() on this type will be called directly from
+ * stdx::condition_varibale::notify_(one|all).
+ *
+ * See Waitable for the stdx::condition_variable integration.
+ */
+class Notifyable {
+public:
+ virtual void notify() noexcept = 0;
+
+protected:
+ ~Notifyable() = default;
+};
+
+class Waitable;
+
namespace stdx {
-using condition_variable = ::std::condition_variable; // NOLINT
using condition_variable_any = ::std::condition_variable_any; // NOLINT
using cv_status = ::std::cv_status; // NOLINT
using ::std::notify_all_at_thread_exit; // NOLINT
+/**
+ * We wrap std::condition_variable to allow us to register Notifyables which can "wait" on the
+ * condvar without actually waiting on the std::condition_variable. This allows us to possibly do
+ * productive work in those types, rather than sleeping in the os.
+ */
+class condition_variable : private std::condition_variable { // NOLINT
+public:
+ using std::condition_variable::condition_variable; // NOLINT
+
+ void notify_one() noexcept {
+ if (_notifyableCount.load()) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ if (_notifyNextNotifyable(lk)) {
+ return;
+ }
+ }
+
+ std::condition_variable::notify_one(); // NOLINT
+ }
+
+ void notify_all() noexcept {
+ if (_notifyableCount.load()) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ while (_notifyNextNotifyable(lk)) {
+ }
+ }
+
+ std::condition_variable::notify_all(); // NOLINT
+ }
+
+ using std::condition_variable::wait; // NOLINT
+ using std::condition_variable::wait_for; // NOLINT
+ using std::condition_variable::wait_until; // NOLINT
+ using std::condition_variable::native_handle; // NOLINT
+
+private:
+ friend class ::mongo::Waitable;
+
+ /**
+ * Runs the callback with the Notifyable registered on the condvar. This ensures that for the
+ * duration of the callback execution, a notification on the condvar will trigger a notify() to
+ * the Notifyable.
+ *
+ * The scheme here is that list entries are erased from the notification list when notified (so
+ * that they don't eat multiple notify_one's). We detect that condition by noting that our
+ * Notifyable* has been overwritten with null (in which case we should avoid a double erase).
+ *
+ * The method is private, and accessed via friendship in Waitable.
+ */
+ template <typename Callback>
+ void _runWithNotifyable(Notifyable& notifyable, Callback&& cb) noexcept {
+ static_assert(noexcept(std::forward<Callback>(cb)()),
+ "Only noexcept functions may be invoked with _runWithNotifyable");
+
+ // We use this local pad to receive notification that we were notified, rather than timing
+ // out organically.
+ //
+ // Note that n must be guarded by _mutex after its insertion in _notifyables (so that we can
+ // detect notification in a thread-safe manner).
+ Notifyable* n = &notifyable;
+
+ auto iter = [&] {
+ stdx::lock_guard<stdx::mutex> localMutex(_mutex);
+ _notifyableCount.addAndFetch(1);
+ return _notifyables.insert(_notifyables.end(), &n);
+ }();
+
+ std::forward<Callback>(cb)();
+
+ stdx::lock_guard<stdx::mutex> localMutex(_mutex);
+ // if n is null, we were notified, and erased in _notifyNextNotifyable
+ if (n) {
+ _notifyableCount.subtractAndFetch(1);
+ _notifyables.erase(iter);
+ }
+ }
+
+ /**
+ * Notifies the next notifyable.
+ *
+ * Returns true if there was a notifyable to be notified.
+ *
+ * Note that as part of notifying, we zero out pointers allocated on the stack by
+ * _runWithNotifyable callers. This is safe because we hold _mutex while we do so, and our
+ * erasure communicates that those waiters need not clear themselves from the notification list
+ * on wakeup.
+ */
+ bool _notifyNextNotifyable(const stdx::lock_guard<stdx::mutex>&) noexcept {
+ auto iter = _notifyables.begin();
+ if (iter == _notifyables.end()) {
+ return false;
+ }
+
+ _notifyableCount.subtractAndFetch(1);
+
+ (**iter)->notify();
+
+ // null out iter here, so that the notifyable won't remove itself from the list when it
+ // wakes up
+ **iter = nullptr;
+
+ _notifyables.erase(iter);
+
+ return true;
+ }
+
+ AtomicUInt64 _notifyableCount;
+
+ stdx::mutex _mutex;
+ std::list<Notifyable**> _notifyables;
+};
+
} // namespace stdx
} // namespace mongo
diff --git a/src/mongo/stdx/condition_variable_bm.cpp b/src/mongo/stdx/condition_variable_bm.cpp
new file mode 100644
index 00000000000..7e020d60c0e
--- /dev/null
+++ b/src/mongo/stdx/condition_variable_bm.cpp
@@ -0,0 +1,87 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <benchmark/benchmark.h>
+
+#include "mongo/bson/inline_decls.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/stdx/thread.h"
+
+namespace mongo {
+
+void BM_stdNotifyOne(benchmark::State& state) {
+ std::condition_variable cv; // NOLINT
+
+ for (auto _ : state) {
+ benchmark::ClobberMemory();
+ cv.notify_one();
+ }
+}
+
+void BM_stdxNotifyOneNoNotifyables(benchmark::State& state) {
+ stdx::condition_variable cv;
+
+ for (auto _ : state) {
+ benchmark::ClobberMemory();
+ cv.notify_one();
+ }
+}
+
+volatile bool alwaysTrue = true;
+
+void BM_stdWaitWithTruePredicate(benchmark::State& state) {
+ std::condition_variable cv; // NOLINT
+ stdx::mutex mutex;
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+
+ for (auto _ : state) {
+ benchmark::ClobberMemory();
+ cv.wait(lk, [&] { return alwaysTrue; });
+ }
+}
+
+void BM_stdxWaitWithTruePredicate(benchmark::State& state) {
+ stdx::condition_variable cv;
+ stdx::mutex mutex;
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+
+ for (auto _ : state) {
+ benchmark::ClobberMemory();
+ cv.wait(lk, [&] { return alwaysTrue; });
+ }
+}
+
+BENCHMARK(BM_stdNotifyOne);
+BENCHMARK(BM_stdWaitWithTruePredicate);
+BENCHMARK(BM_stdxNotifyOneNoNotifyables);
+BENCHMARK(BM_stdxWaitWithTruePredicate);
+
+} // namespace mongo
diff --git a/src/mongo/transport/baton.h b/src/mongo/transport/baton.h
index f854dfeeaf4..33409cd20b8 100644
--- a/src/mongo/transport/baton.h
+++ b/src/mongo/transport/baton.h
@@ -34,6 +34,7 @@
#include "mongo/util/functional.h"
#include "mongo/util/future.h"
#include "mongo/util/time_support.h"
+#include "mongo/util/waitable.h"
namespace mongo {
@@ -55,7 +56,7 @@ class ReactorTimer;
* context switches, as well as improving the readability of stack traces by grounding async
* execution on top of a regular client call stack.
*/
-class Baton {
+class Baton : public Waitable {
public:
virtual ~Baton() = default;
@@ -94,11 +95,6 @@ public:
virtual Future<void> addSession(Session& session, Type type) = 0;
/**
- * Adds a timer, returning a future which activates after a duration.
- */
- virtual Future<void> waitFor(const ReactorTimer& timer, Milliseconds timeout) = 0;
-
- /**
* Adds a timer, returning a future which activates after a deadline.
*/
virtual Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) = 0;
@@ -116,14 +112,6 @@ public:
* Returns true if the timer was in the baton to be cancelled.
*/
virtual bool cancelTimer(const ReactorTimer& timer) = 0;
-
- /**
- * Runs the baton. This blocks, waiting for networking events or timeouts, and fulfills
- * promises and executes scheduled work.
- *
- * Returns false if the optional deadline has passed
- */
- virtual bool run(OperationContext* opCtx, boost::optional<Date_t> deadline) = 0;
};
using BatonHandle = std::shared_ptr<Baton>;
diff --git a/src/mongo/transport/baton_asio_linux.h b/src/mongo/transport/baton_asio_linux.h
index 40829bc43e7..8f18707c12a 100644
--- a/src/mongo/transport/baton_asio_linux.h
+++ b/src/mongo/transport/baton_asio_linux.h
@@ -54,6 +54,23 @@ namespace transport {
* We implement our networking reactor on top of poll + eventfd for wakeups
*/
class TransportLayerASIO::BatonASIO : public Baton {
+ /**
+ * We use this internal reactor timer to exit run_until calls (by forcing an early timeout for
+ * ::poll).
+ *
+ * Its methods are all unreachable because we never actually use its timer-ness (we just need
+ * its address for baton book keeping).
+ */
+ class InternalReactorTimer : public ReactorTimer {
+ public:
+ void cancel(const BatonHandle& baton = nullptr) override {
+ MONGO_UNREACHABLE;
+ }
+
+ Future<void> waitUntil(Date_t timeout, const BatonHandle& baton = nullptr) override {
+ MONGO_UNREACHABLE;
+ }
+ };
/**
* RAII type that wraps up an eventfd and reading/writing to it. We don't actually need the
@@ -138,10 +155,6 @@ public:
return std::move(pf.future);
}
- Future<void> waitFor(const ReactorTimer& timer, Milliseconds timeout) override {
- return waitUntil(timer, Date_t::now() + timeout);
- }
-
Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) override {
auto pf = makePromiseFuture<void>();
_safeExecute([ timerPtr = &timer, expiration, sp = pf.promise.share(), this ] {
@@ -202,7 +215,33 @@ public:
}
}
- bool run(OperationContext* opCtx, boost::optional<Date_t> deadline) override {
+ void notify() noexcept override {
+ schedule([] {});
+ }
+
+ /**
+ * We synthesize a run_until by creating a synthetic timer which we use to exit run early (we
+ * create a regular waitUntil baton event off the timer, with the passed deadline).
+ */
+ Waitable::TimeoutState run_until(ClockSource* clkSource, Date_t deadline) noexcept override {
+ InternalReactorTimer irt;
+ auto future = waitUntil(irt, deadline);
+
+ run(clkSource);
+
+ // If the future is ready our timer has fired, in which case we timed out
+ if (future.isReady()) {
+ future.get();
+
+ return Waitable::TimeoutState::Timeout;
+ } else {
+ cancelTimer(irt);
+
+ return Waitable::TimeoutState::NoTimeout;
+ }
+ }
+
+ void run(ClockSource* clkSource) noexcept override {
std::vector<SharedPromise<void>> toFulfill;
// We'll fulfill promises and run jobs on the way out, ensuring we don't hold any locks
@@ -227,44 +266,18 @@ public:
}
});
- // Note that it's important to check for interrupt without the lock, because markKilled
- // calls schedule, which will deadlock if we're holding the lock when calling this.
- if (opCtx) {
- opCtx->checkForInterrupt();
- }
-
stdx::unique_lock<stdx::mutex> lk(_mutex);
- if (opCtx) {
- invariant(opCtx == _opCtx);
- }
-
- auto now = Date_t::now();
-
- // If our deadline has passed, return that we've already failed
- if (deadline && *deadline <= now) {
- return false;
- }
// If anything was scheduled, run it now. No need to poll
if (_scheduled.size()) {
- return true;
+ return;
}
- boost::optional<Milliseconds> timeout;
+ boost::optional<Date_t> deadline;
// If we have a timer, poll no longer than that
if (_timers.size()) {
- timeout = _timers.begin()->expiration - now;
- }
-
- if (deadline) {
- auto deadlineTimeout = *deadline - now;
-
- // If we didn't have a timer with a deadline, or our deadline is sooner than that
- // timer
- if (!timeout || (deadlineTimeout < *timeout)) {
- timeout = deadlineTimeout;
- }
+ deadline = _timers.begin()->expiration;
}
std::vector<decltype(_sessions)::iterator> sessions;
@@ -282,13 +295,22 @@ public:
sessions.push_back(iter);
}
+ auto now = clkSource->now();
+
int rval = 0;
// If we don't have a timeout, or we have a timeout that's unexpired, run poll.
- if (!timeout || (*timeout > Milliseconds(0))) {
+ if (!deadline || (*deadline > now)) {
+ if (deadline && !clkSource->tracksSystemClock()) {
+ invariant(clkSource->setAlarm(*deadline, [this] { notify(); }));
+
+ deadline.reset();
+ }
+
_inPoll = true;
lk.unlock();
- rval =
- ::poll(pollSet.data(), pollSet.size(), timeout.value_or(Milliseconds(-1)).count());
+ rval = ::poll(pollSet.data(),
+ pollSet.size(),
+ deadline ? Milliseconds(*deadline - now).count() : -1);
const auto pollGuard = MakeGuard([&] {
lk.lock();
@@ -300,20 +322,9 @@ public:
severe() << "error in poll: " << errnoWithDescription(errno);
fassertFailed(50834);
}
-
- // Note that it's important to check for interrupt without the lock, because markKilled
- // calls schedule, which will deadlock if we're holding the lock when calling this.
- if (opCtx) {
- opCtx->checkForInterrupt();
- }
}
- now = Date_t::now();
-
- // If our deadline passed while in poll, we've failed
- if (deadline && now > *deadline) {
- return false;
- }
+ now = clkSource->now();
// Fire expired timers
for (auto iter = _timers.begin(); iter != _timers.end() && iter->expiration < now;) {
@@ -348,7 +359,7 @@ public:
invariant(remaining == 0);
}
- return true;
+ return;
}
private:
diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h
index b3ee5aa92a7..a5a81005a73 100644
--- a/src/mongo/transport/transport_layer.h
+++ b/src/mongo/transport/transport_layer.h
@@ -134,12 +134,11 @@ public:
virtual void cancel(const BatonHandle& baton = nullptr) = 0;
/*
- * Returns a future that will be filled with Status::OK after the timeout has ellapsed.
+ * Returns a future that will be filled with Status::OK after the deadline has passed.
*
* Calling this implicitly calls cancel().
*/
- virtual Future<void> waitFor(Milliseconds timeout, const BatonHandle& baton = nullptr) = 0;
- virtual Future<void> waitUntil(Date_t timeout, const BatonHandle& baton = nullptr) = 0;
+ virtual Future<void> waitUntil(Date_t deadline, const BatonHandle& baton = nullptr) = 0;
};
class Reactor {
diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp
index ad87ea0ed38..b3522f537c1 100644
--- a/src/mongo/transport/transport_layer_asio.cpp
+++ b/src/mongo/transport/transport_layer_asio.cpp
@@ -86,13 +86,6 @@ public:
_timer->cancel();
}
- Future<void> waitFor(Milliseconds timeout, const BatonHandle& baton = nullptr) override {
- if (baton) {
- return _asyncWait([&] { return baton->waitFor(*this, timeout); }, baton);
- } else {
- return _asyncWait([&] { _timer->expires_after(timeout.toSystemDuration()); });
- }
- }
Future<void> waitUntil(Date_t expiration, const BatonHandle& baton = nullptr) override {
if (baton) {
@@ -538,25 +531,26 @@ Future<SessionHandle> TransportLayerASIO::asyncConnect(HostAndPort peer,
}
if (timeout > Milliseconds{0} && timeout < Milliseconds::max()) {
- connector->timeoutTimer.waitFor(timeout).getAsync([connector](Status status) {
- if (status == ErrorCodes::CallbackCanceled || connector->done.swap(true)) {
- return;
- }
+ connector->timeoutTimer.waitUntil(reactor->now() + timeout)
+ .getAsync([connector](Status status) {
+ if (status == ErrorCodes::CallbackCanceled || connector->done.swap(true)) {
+ return;
+ }
- connector->promise.setError(
- makeConnectError({ErrorCodes::NetworkTimeout, "Connecting timed out"},
- connector->peer,
- connector->resolvedEndpoint));
+ connector->promise.setError(
+ makeConnectError({ErrorCodes::NetworkTimeout, "Connecting timed out"},
+ connector->peer,
+ connector->resolvedEndpoint));
- std::error_code ec;
- stdx::lock_guard<stdx::mutex> lk(connector->mutex);
- connector->resolver.cancel();
- if (connector->session) {
- connector->session->end();
- } else {
- connector->socket.cancel(ec);
- }
- });
+ std::error_code ec;
+ stdx::lock_guard<stdx::mutex> lk(connector->mutex);
+ connector->resolver.cancel();
+ if (connector->session) {
+ connector->session->end();
+ } else {
+ connector->socket.cancel(ec);
+ }
+ });
}
connector->resolver.asyncResolve(connector->peer, _listenerOptions.enableIPv6)
diff --git a/src/mongo/transport/transport_layer_asio_integration_test.cpp b/src/mongo/transport/transport_layer_asio_integration_test.cpp
index 0e595449d54..d43885da603 100644
--- a/src/mongo/transport/transport_layer_asio_integration_test.cpp
+++ b/src/mongo/transport/transport_layer_asio_integration_test.cpp
@@ -33,6 +33,7 @@
#include "mongo/client/async_client.h"
#include "mongo/client/connection_string.h"
#include "mongo/db/client.h"
+#include "mongo/db/operation_context.h"
#include "mongo/db/service_context.h"
#include "mongo/stdx/thread.h"
#include "mongo/transport/session.h"
@@ -128,11 +129,7 @@ TEST(TransportLayerASIO, ShortReadsAndWritesWork) {
auto future = handle->runCommandRequest(ecr, baton);
const auto batonGuard = MakeGuard([&] { baton->detach(); });
- while (!future.isReady()) {
- baton->run(nullptr, boost::none);
- }
-
- assertOK(future.get());
+ future.get(opCtx.get());
}
}
diff --git a/src/mongo/util/clock_source.cpp b/src/mongo/util/clock_source.cpp
index d562089c27a..ec02c771294 100644
--- a/src/mongo/util/clock_source.cpp
+++ b/src/mongo/util/clock_source.cpp
@@ -30,13 +30,20 @@
#include "mongo/stdx/thread.h"
#include "mongo/util/clock_source.h"
+#include "mongo/util/waitable.h"
namespace mongo {
stdx::cv_status ClockSource::waitForConditionUntil(stdx::condition_variable& cv,
stdx::unique_lock<stdx::mutex>& m,
- Date_t deadline) {
+ Date_t deadline,
+ Waitable* waitable) {
if (_tracksSystemClock) {
- return cv.wait_until(m, deadline.toSystemTimePoint());
+ if (deadline == Date_t::max()) {
+ Waitable::wait(waitable, this, cv, m);
+ return stdx::cv_status::no_timeout;
+ }
+
+ return Waitable::wait_until(waitable, this, cv, m, deadline.toSystemTimePoint());
}
// The rest of this function only runs during testing, when the clock source is virtualized and
@@ -75,7 +82,7 @@ stdx::cv_status ClockSource::waitForConditionUntil(stdx::condition_variable& cv,
alarmInfo->waitCV->notify_all();
}));
if (!invokedAlarmInline) {
- cv.wait(m);
+ Waitable::wait(waitable, this, cv, m);
}
m.unlock();
stdx::lock_guard<stdx::mutex> controlLk(alarmInfo->controlMutex);
diff --git a/src/mongo/util/clock_source.h b/src/mongo/util/clock_source.h
index e38ce7c932f..12401533b79 100644
--- a/src/mongo/util/clock_source.h
+++ b/src/mongo/util/clock_source.h
@@ -28,6 +28,8 @@
#pragma once
+#include <type_traits>
+
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/functional.h"
#include "mongo/stdx/mutex.h"
@@ -41,6 +43,16 @@ class Date_t;
* An interface for getting the current wall clock time.
*/
class ClockSource {
+ // We need a type trait to differentiate waitable ptr args from predicates.
+ //
+ // This returns true for non-pointers and function pointers
+ template <typename Pred>
+ struct CouldBePredicate
+ : public std::integral_constant<bool,
+ !std::is_pointer<Pred>::value ||
+ std::is_function<std::remove_pointer_t<Pred>>::value> {
+ };
+
public:
virtual ~ClockSource() = default;
@@ -79,19 +91,21 @@ public:
*/
stdx::cv_status waitForConditionUntil(stdx::condition_variable& cv,
stdx::unique_lock<stdx::mutex>& m,
- Date_t deadline);
+ Date_t deadline,
+ Waitable* waitable = nullptr);
/**
* Like cv.wait_until(m, deadline, pred), but uses this ClockSource instead of
* stdx::chrono::system_clock to measure the passage of time.
*/
- template <typename Pred>
+ template <typename Pred, std::enable_if_t<CouldBePredicate<Pred>::value, int> = 0>
bool waitForConditionUntil(stdx::condition_variable& cv,
stdx::unique_lock<stdx::mutex>& m,
Date_t deadline,
- const Pred& pred) {
+ const Pred& pred,
+ Waitable* waitable = nullptr) {
while (!pred()) {
- if (waitForConditionUntil(cv, m, deadline) == stdx::cv_status::timeout) {
+ if (waitForConditionUntil(cv, m, deadline, waitable) == stdx::cv_status::timeout) {
return pred();
}
}
@@ -102,12 +116,15 @@ public:
* Like cv.wait_for(m, duration, pred), but uses this ClockSource instead of
* stdx::chrono::system_clock to measure the passage of time.
*/
- template <typename Duration, typename Pred>
+ template <typename Duration,
+ typename Pred,
+ std::enable_if_t<CouldBePredicate<Pred>::value, int> = 0>
bool waitForConditionFor(stdx::condition_variable& cv,
stdx::unique_lock<stdx::mutex>& m,
Duration duration,
- const Pred& pred) {
- return waitForConditionUntil(cv, m, now() + duration, pred);
+ const Pred& pred,
+ Waitable* waitable = nullptr) {
+ return waitForConditionUntil(cv, m, now() + duration, pred, waitable);
}
protected:
diff --git a/src/mongo/util/future.h b/src/mongo/util/future.h
index a400a881b0b..9a52ec4c1bf 100644
--- a/src/mongo/util/future.h
+++ b/src/mongo/util/future.h
@@ -44,6 +44,7 @@
#include "mongo/util/assert_util.h"
#include "mongo/util/debug_util.h"
#include "mongo/util/functional.h"
+#include "mongo/util/interruptible.h"
#include "mongo/util/intrusive_counter.h"
#include "mongo/util/scopeguard.h"
@@ -315,7 +316,7 @@ public:
virtual ~SharedStateBase() = default;
// Only called by future side.
- void wait() noexcept {
+ void wait(Interruptible* interruptible) {
if (state.load(std::memory_order_acquire) == SSBState::kFinished)
return;
@@ -330,7 +331,7 @@ public:
}
stdx::unique_lock<stdx::mutex> lk(mx);
- cv->wait(lk, [&] {
+ interruptible->waitForConditionOrInterrupt(*cv, lk, [&] {
// The mx locking above is insufficient to establish an acquire if state transitions to
// kFinished before we get here, but we aquire mx before the producer does.
return state.load(std::memory_order_acquire) == SSBState::kFinished;
@@ -753,37 +754,86 @@ public:
}
/**
+ * Returns when the future isReady().
+ *
+ * Throws if the interruptible passed is interrupted (explicitly or via deadline).
+ */
+ void wait(Interruptible* interruptible = Interruptible::notInterruptible()) const {
+ if (_immediate) {
+ return;
+ }
+
+ _shared->wait(interruptible);
+ }
+
+ /**
+ * Returns Status::OK() when the future isReady().
+ *
+ * Returns a non-okay status if the interruptible is interrupted.
+ */
+ Status waitNoThrow(Interruptible* interruptible = Interruptible::notInterruptible()) const
+ noexcept {
+ if (_immediate) {
+ return Status::OK();
+ }
+
+ try {
+ _shared->wait(interruptible);
+ } catch (const DBException& ex) {
+ return ex.toStatus();
+ }
+
+ return Status::OK();
+ }
+
+ /**
* Gets the value out of this Future, blocking until it is ready.
*
* get() methods throw on error, while getNoThrow() returns a !OK status.
*
* These methods can be called multiple times, except for the rvalue overloads.
+ *
+ * Note: It is impossible to differentiate interruptible interruption from an error propagating
+ * down the future chain with these methods. If you need to distinguish the two cases, call
+ * wait() first.
*/
- T get() && {
- return std::move(getImpl());
+ T get(Interruptible* interruptible = Interruptible::notInterruptible()) && {
+ return std::move(getImpl(interruptible));
}
- T& get() & {
- return getImpl();
+ T& get(Interruptible* interruptible = Interruptible::notInterruptible()) & {
+ return getImpl(interruptible);
}
- const T& get() const& {
- return const_cast<Future*>(this)->getImpl();
+ const T& get(Interruptible* interruptible = Interruptible::notInterruptible()) const& {
+ return const_cast<Future*>(this)->getImpl(interruptible);
}
- StatusWith<T> getNoThrow() && noexcept {
+ StatusWith<T> getNoThrow(Interruptible* interruptible = Interruptible::notInterruptible()) &&
+ noexcept {
if (_immediate) {
return std::move(*_immediate);
}
- _shared->wait();
+ try {
+ _shared->wait(interruptible);
+ } catch (const DBException& ex) {
+ return ex.toStatus();
+ }
+
if (!_shared->status.isOK())
return std::move(_shared->status);
return std::move(*_shared->data);
}
- StatusWith<T> getNoThrow() const& noexcept {
+ StatusWith<T> getNoThrow(
+ Interruptible* interruptible = Interruptible::notInterruptible()) const& noexcept {
if (_immediate) {
return *_immediate;
}
- _shared->wait();
+ try {
+ _shared->wait(interruptible);
+ } catch (const DBException& ex) {
+ return ex.toStatus();
+ }
+
if (!_shared->status.isOK())
return _shared->status;
return *_shared->data;
@@ -1092,12 +1142,12 @@ private:
friend class Future;
friend class Promise<T>;
- T& getImpl() {
+ T& getImpl(Interruptible* interruptible) {
if (_immediate) {
return *_immediate;
}
- _shared->wait();
+ _shared->wait(interruptible);
uassertStatusOK(_shared->status);
return *(_shared->data);
}
@@ -1244,12 +1294,22 @@ public:
return _inner.isReady();
}
- void get() const {
- _inner.get();
+ void wait(Interruptible* interruptible = Interruptible::notInterruptible()) const {
+ _inner.wait(interruptible);
+ }
+
+ Status waitNoThrow(Interruptible* interruptible = Interruptible::notInterruptible()) const
+ noexcept {
+ return _inner.waitNoThrow(interruptible);
+ }
+
+ void get(Interruptible* interruptible = Interruptible::notInterruptible()) const {
+ _inner.get(interruptible);
}
- Status getNoThrow() const noexcept {
- return _inner.getNoThrow().getStatus();
+ Status getNoThrow(Interruptible* interruptible = Interruptible::notInterruptible()) const
+ noexcept {
+ return _inner.getNoThrow(interruptible).getStatus();
}
template <typename Func> // Status -> void
diff --git a/src/mongo/util/future_test_future_int.cpp b/src/mongo/util/future_test_future_int.cpp
index 803b0992500..6707a310161 100644
--- a/src/mongo/util/future_test_future_int.cpp
+++ b/src/mongo/util/future_test_future_int.cpp
@@ -165,6 +165,36 @@ TEST(Future, Fail_isReady) {
});
}
+TEST(Future, Success_wait) {
+ FUTURE_SUCCESS_TEST([] { return 1; },
+ [](Future<int>&& fut) {
+ fut.wait();
+ ASSERT_EQ(fut.get(), 1);
+ });
+}
+
+TEST(Future, Success_waitNoThrow) {
+ FUTURE_SUCCESS_TEST([] { return 1; },
+ [](Future<int>&& fut) {
+ ASSERT_OK(fut.waitNoThrow());
+ ASSERT_EQ(fut.get(), 1);
+ });
+}
+
+TEST(Future, Fail_wait) {
+ FUTURE_FAIL_TEST<int>([](Future<int>&& fut) {
+ fut.wait();
+ ASSERT_THROWS_failStatus(fut.get());
+ });
+}
+
+TEST(Future, Fail_waitNoThrow) {
+ FUTURE_FAIL_TEST<int>([](Future<int>&& fut) {
+ ASSERT_OK(fut.waitNoThrow());
+ ASSERT_THROWS_failStatus(fut.get());
+ });
+}
+
TEST(Future, isReady_TSAN_OK) {
bool done = false;
auto fut = async([&] {
diff --git a/src/mongo/util/interruptible.h b/src/mongo/util/interruptible.h
new file mode 100644
index 00000000000..e47a34ba29c
--- /dev/null
+++ b/src/mongo/util/interruptible.h
@@ -0,0 +1,452 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/util/time_support.h"
+#include "mongo/util/waitable.h"
+
+namespace mongo {
+
+/**
+ * A type which can be used to wait on condition variables with a level triggered one-way interrupt.
+ * I.e. after the interrupt is triggered (via some non-public api call) subsequent calls to
+ * waitForConditionXXX will fail. Interrupts must unblock all callers of waitForConditionXXX.
+ */
+class Interruptible {
+protected:
+ struct DeadlineState {
+ Date_t deadline;
+ ErrorCodes::Error error;
+ bool hasArtificialDeadline;
+ };
+
+ struct IgnoreInterruptsState {
+ bool ignoreInterrupts;
+ DeadlineState deadline;
+ };
+
+ /**
+ * A deadline guard provides a subsidiary deadline to the parent.
+ */
+ class DeadlineGuard {
+ public:
+ DeadlineGuard(const DeadlineGuard&) = delete;
+ DeadlineGuard& operator=(const DeadlineGuard&) = delete;
+
+ DeadlineGuard(DeadlineGuard&& other)
+ : _interruptible(other._interruptible), _oldDeadline(other._oldDeadline) {
+ other._interruptible = nullptr;
+ }
+
+ DeadlineGuard& operator=(DeadlineGuard&& other) = delete;
+
+ ~DeadlineGuard() {
+ if (_interruptible) {
+ _interruptible->popArtificialDeadline(_oldDeadline);
+ }
+ }
+
+ private:
+ friend Interruptible;
+
+ explicit DeadlineGuard(Interruptible& interruptible,
+ Date_t newDeadline,
+ ErrorCodes::Error error)
+ : _interruptible(&interruptible),
+ _oldDeadline(_interruptible->pushArtificialDeadline(newDeadline, error)) {}
+
+ Interruptible* _interruptible;
+ DeadlineState _oldDeadline;
+ };
+
+ DeadlineGuard makeDeadlineGuard(Date_t deadline, ErrorCodes::Error error) {
+ return DeadlineGuard(*this, deadline, error);
+ }
+
+ /**
+ * An interruption guard provides a region where interruption is ignored.
+ *
+ * Note that this causes the deadline to be reset to Date_t::max(), but that it can also be
+ * subsequently reduced in size after the fact.
+ */
+ class IgnoreInterruptionsGuard {
+ public:
+ IgnoreInterruptionsGuard(const IgnoreInterruptionsGuard&) = delete;
+ IgnoreInterruptionsGuard& operator=(const IgnoreInterruptionsGuard&) = delete;
+
+ IgnoreInterruptionsGuard(IgnoreInterruptionsGuard&& other)
+ : _interruptible(other._interruptible), _oldState(other._oldState) {
+ other._interruptible = nullptr;
+ }
+
+ IgnoreInterruptionsGuard& operator=(IgnoreInterruptionsGuard&&) = delete;
+
+ ~IgnoreInterruptionsGuard() {
+ if (_interruptible) {
+ _interruptible->popIgnoreInterrupts(_oldState);
+ }
+ }
+
+ private:
+ friend Interruptible;
+
+ explicit IgnoreInterruptionsGuard(Interruptible& interruptible)
+ : _interruptible(&interruptible), _oldState(_interruptible->pushIgnoreInterrupts()) {}
+
+ Interruptible* _interruptible;
+ IgnoreInterruptsState _oldState;
+ };
+
+ IgnoreInterruptionsGuard makeIgnoreInterruptionsGuard() {
+ return IgnoreInterruptionsGuard(*this);
+ }
+
+public:
+ /**
+ * Returns a statically allocated instance that cannot be interrupted. Useful as a default
+ * argument to interruptible taking methods.
+ */
+ static Interruptible* notInterruptible();
+
+ /**
+ * Invokes the passed callback with a deadline guard active initialized with the passed
+ * deadline. Additionally handles the dance of try/catching the invocation and checking
+ * checkForInterrupt with the guard inactive (to allow a higher level timeout to override a
+ * lower level one)
+ */
+ template <typename Callback>
+ decltype(auto) runWithDeadline(Date_t deadline, ErrorCodes::Error error, Callback&& cb) {
+ invariant(ErrorCodes::isExceededTimeLimitError(error));
+
+ try {
+ const auto guard = makeDeadlineGuard(deadline, error);
+ return std::forward<Callback>(cb)();
+ } catch (const ExceptionForCat<ErrorCategory::ExceededTimeLimitError>&) {
+ // May throw replacement exception
+ checkForInterrupt();
+ throw;
+ }
+ }
+
+ bool hasDeadline() const {
+ return getDeadline() != Date_t::max();
+ }
+
+ /**
+ * Returns the deadline for this interruptible, or Date_t::max() if there is no deadline.
+ */
+ virtual Date_t getDeadline() const = 0;
+
+ /**
+ * Invokes the passed callback with an interruption guard active. Additionally handles the
+ * dance of try/catching the invocation and checking checkForInterrupt with the guard inactive
+ * (to allow a higher level timeout to override a lower level one, or for top level interruption
+ * to propagate)
+ */
+ template <typename Callback>
+ decltype(auto) runWithoutInterruption(Callback&& cb) {
+ try {
+ const auto guard = makeIgnoreInterruptionsGuard();
+ return std::forward<Callback>(cb)();
+ } catch (const ExceptionForCat<ErrorCategory::ExceededTimeLimitError>&) {
+ // May throw replacement exception
+ checkForInterrupt();
+ throw;
+ }
+ }
+
+ /**
+ * Raises a AssertionException if this operation is in a killed state.
+ */
+ void checkForInterrupt() {
+ uassertStatusOK(checkForInterruptNoAssert());
+ }
+
+ /**
+ * Returns Status::OK() unless this operation is in a killed state.
+ */
+ virtual Status checkForInterruptNoAssert() noexcept = 0;
+
+ /**
+ * Waits for either the condition "cv" to be signaled, this operation to be interrupted, or the
+ * deadline on this operation to expire. In the event of interruption or operation deadline
+ * expiration, raises a AssertionException with an error code indicating the interruption type.
+ */
+ void waitForConditionOrInterrupt(stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m) {
+ uassertStatusOK(waitForConditionOrInterruptNoAssert(cv, m));
+ }
+
+ /**
+ * Waits on condition "cv" for "pred" until "pred" returns true, or this operation
+ * is interrupted or its deadline expires. Throws a DBException for interruption and
+ * deadline expiration.
+ */
+ template <typename Pred>
+ void waitForConditionOrInterrupt(stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m,
+ Pred pred) {
+ while (!pred()) {
+ waitForConditionOrInterrupt(cv, m);
+ }
+ }
+
+ /**
+ * Same as waitForConditionOrInterrupt, except returns a Status instead of throwing
+ * a DBException to report interruption.
+ */
+ Status waitForConditionOrInterruptNoAssert(stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m) noexcept {
+ auto status = waitForConditionOrInterruptNoAssertUntil(cv, m, Date_t::max());
+ if (!status.isOK()) {
+ return status.getStatus();
+ }
+
+ invariant(status.getValue() == stdx::cv_status::no_timeout);
+ return Status::OK();
+ }
+
+ /**
+ * Same as the predicate form of waitForConditionOrInterrupt, except that it returns a not okay
+ * status instead of throwing on interruption.
+ */
+ template <typename Pred>
+ Status waitForConditionOrInterruptNoAssert(stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m,
+ Pred pred) noexcept {
+ while (!pred()) {
+ auto status = waitForConditionOrInterruptNoAssert(cv, m);
+
+ if (!status.isOK()) {
+ return status;
+ }
+ }
+
+ return Status::OK();
+ }
+
+ /**
+ * Waits for condition "cv" to be signaled, or for the given "deadline" to expire, or
+ * for the operation to be interrupted, or for the operation's own deadline to expire.
+ *
+ * If the operation deadline expires or the operation is interrupted, throws a DBException. If
+ * the given "deadline" expires, returns cv_status::timeout. Otherwise, returns
+ * cv_status::no_timeout.
+ */
+ stdx::cv_status waitForConditionOrInterruptUntil(stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m,
+ Date_t deadline) {
+ return uassertStatusOK(waitForConditionOrInterruptNoAssertUntil(cv, m, deadline));
+ }
+
+ /**
+ * Waits on condition "cv" for "pred" until "pred" returns true, or the given "deadline"
+ * expires, or this operation is interrupted, or this operation's own deadline expires.
+ *
+ *
+ * If the operation deadline expires or the operation is interrupted, throws a DBException. If
+ * the given "deadline" expires, returns cv_status::timeout. Otherwise, returns
+ * cv_status::no_timeout indicating that "pred" finally returned true.
+ */
+ template <typename Pred>
+ bool waitForConditionOrInterruptUntil(stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m,
+ Date_t deadline,
+ Pred pred) {
+ while (!pred()) {
+ if (stdx::cv_status::timeout == waitForConditionOrInterruptUntil(cv, m, deadline)) {
+ return pred();
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Same as the non-predicate form of waitForConditionOrInterruptUntil, but takes a relative
+ * amount of time to wait instead of an absolute time point.
+ */
+ stdx::cv_status waitForConditionOrInterruptFor(stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m,
+ Milliseconds ms) {
+ return uassertStatusOK(
+ waitForConditionOrInterruptNoAssertUntil(cv, m, getExpirationDateForWaitForValue(ms)));
+ }
+
+ /**
+ * Same as the predicate form of waitForConditionOrInterruptUntil, but takes a relative
+ * amount of time to wait instead of an absolute time point.
+ */
+ template <typename Pred>
+ bool waitForConditionOrInterruptFor(stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m,
+ Milliseconds ms,
+ Pred pred) {
+ while (!pred()) {
+ if (stdx::cv_status::timeout == waitForConditionOrInterruptFor(cv, m, ms)) {
+ return pred();
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Same as waitForConditionOrInterruptUntil, except returns StatusWith<stdx::cv_status> and
+ * non-ok status indicates the error instead of a DBException.
+ */
+ virtual StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil(
+ stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m,
+ Date_t deadline) noexcept = 0;
+
+ /**
+ * Sleeps until "deadline"; throws an exception if the interruptible is interrupted before then.
+ */
+ void sleepUntil(Date_t deadline) {
+ stdx::mutex m;
+ stdx::condition_variable cv;
+ stdx::unique_lock<stdx::mutex> lk(m);
+ invariant(!waitForConditionOrInterruptUntil(cv, lk, deadline, [] { return false; }));
+ }
+
+ /**
+ * Sleeps for "duration" ms; throws an exception if the interruptible is interrupted before
+ * then.
+ */
+ void sleepFor(Milliseconds duration) {
+ stdx::mutex m;
+ stdx::condition_variable cv;
+ stdx::unique_lock<stdx::mutex> lk(m);
+ invariant(!waitForConditionOrInterruptFor(cv, lk, duration, [] { return false; }));
+ }
+
+protected:
+ /**
+ * Pushes an ignore interruption critical section into the interruptible. Until an associated
+ * popIgnoreInterrupts is invoked, the interruptible should ignore interruptions related to
+ * explicit interruption or previously set deadlines.
+ *
+ * Note that new deadlines can be set after this is called, which will again introduce the
+ * possibility of interruption.
+ *
+ * Returns state needed to pop interruption.
+ */
+ virtual IgnoreInterruptsState pushIgnoreInterrupts() = 0;
+
+ /**
+ * Pops the ignored interruption critical section introduced by push.
+ */
+ virtual void popIgnoreInterrupts(IgnoreInterruptsState iis) = 0;
+
+ /**
+ * Pushes a subsidiary deadline into the interruptible. Until an associated
+ * popArtificialDeadline is
+ * invoked, the interruptible will fail checkForInterrupt and waitForConditionOrInterrupt calls
+ * with the passed error code if the deadline has passed.
+ *
+ * Note that deadline's higher than the current value are constrained (such that the passed
+ * error code will be returned/thrown, but after the min(oldDeadline, newDeadline) has passed).
+ *
+ * Returns state needed to pop the deadline.
+ */
+ virtual DeadlineState pushArtificialDeadline(Date_t deadline, ErrorCodes::Error error) = 0;
+
+ /**
+ * Pops the subsidiary deadline introduced by push.
+ */
+ virtual void popArtificialDeadline(DeadlineState) = 0;
+
+ /**
+ * Returns the equivalent of Date_t::now() + waitFor for the interruptible's clock
+ */
+ virtual Date_t getExpirationDateForWaitForValue(Milliseconds waitFor) = 0;
+
+ class NotInterruptible;
+};
+
+/**
+ * A not interruptible type which can be used as a lightweight default arg for interruptible taking
+ * functions.
+ */
+class Interruptible::NotInterruptible final : public Interruptible {
+ StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil(
+ stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& m,
+ Date_t deadline) noexcept override {
+
+ if (deadline == Date_t::max()) {
+ cv.wait(m);
+ return stdx::cv_status::no_timeout;
+ }
+
+ return cv.wait_until(m, deadline.toSystemTimePoint());
+ }
+
+ Date_t getDeadline() const override {
+ return Date_t::max();
+ }
+
+ Status checkForInterruptNoAssert() noexcept override {
+ return Status::OK();
+ }
+
+ // It's invalid to call the deadline or ignore interruption guards on a possibly noop
+ // interruptible.
+ //
+ // The noop interruptible should only be invoked as a default arg at the bottom of the call
+ // stack (with types that won't modify it's invocation)
+ IgnoreInterruptsState pushIgnoreInterrupts() override {
+ MONGO_UNREACHABLE;
+ }
+
+ void popIgnoreInterrupts(IgnoreInterruptsState) override {
+ MONGO_UNREACHABLE;
+ }
+
+ DeadlineState pushArtificialDeadline(Date_t deadline, ErrorCodes::Error error) override {
+ MONGO_UNREACHABLE;
+ }
+
+ void popArtificialDeadline(DeadlineState) override {
+ MONGO_UNREACHABLE;
+ }
+
+ Date_t getExpirationDateForWaitForValue(Milliseconds waitFor) override {
+ return Date_t::now() + waitFor;
+ }
+};
+
+inline Interruptible* Interruptible::notInterruptible() {
+ static NotInterruptible notInterruptible{};
+
+ return &notInterruptible;
+}
+
+} // namespace mongo
diff --git a/src/mongo/util/producer_consumer_queue.h b/src/mongo/util/producer_consumer_queue.h
index 8f2b17d2265..a2e1a09e9bb 100644
--- a/src/mongo/util/producer_consumer_queue.h
+++ b/src/mongo/util/producer_consumer_queue.h
@@ -38,6 +38,7 @@
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/concurrency/with_lock.h"
+#include "mongo/util/interruptible.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
@@ -56,43 +57,6 @@ struct DefaultCostFunction {
}
};
-// Various helpers to tighten down whether the args getting passed are valid interruption args.
-//
-// Whatever the caller passes in the interruption args, they need to be invocable on one of
-// these helpers. std::is_invocable would do the job in C++17
-constexpr std::false_type areInterruptionArgsHelper(...) {
- return {};
-}
-
-constexpr std::true_type areInterruptionArgsHelper(OperationContext*) {
- return {};
-}
-
-constexpr std::true_type areInterruptionArgsHelper(OperationContext*, Milliseconds) {
- return {};
-}
-
-constexpr std::true_type areInterruptionArgsHelper(OperationContext*, Date_t) {
- return {};
-}
-
-constexpr std::true_type areInterruptionArgsHelper(Milliseconds) {
- return {};
-}
-
-constexpr std::true_type areInterruptionArgsHelper(Date_t) {
- return {};
-}
-
-template <typename U, typename... InterruptionArgs>
-constexpr auto areInterruptionArgs(U&& u, InterruptionArgs&&... args) {
- return areInterruptionArgsHelper(std::forward<U>(u), std::forward<InterruptionArgs>(args)...);
-}
-
-constexpr std::true_type areInterruptionArgs() {
- return {};
-}
-
} // namespace producer_consumer_queue_detail
/**
@@ -109,20 +73,9 @@ constexpr std::true_type areInterruptionArgs() {
* multi-consumer - Any number of threads may pop work out of the queue
*
* Interruptibility:
- * All of the blocking methods on this type allow for 6 kinds of interruptibility. The matrix is
- * parameterized by (void|OperationContext*)|(void|Milliseconds|Date_t). These provide different
- * kinds of waiting based on whether the method should be interruptible via opCtx, and then
- * whether they should timeout via deadline or duration
+ * All of the blocking methods on this type take an interruptible.
*
- * A contrived example: pcq.pop(opCtx, Minutes(1)) would be warranted if:
- * - The caller is blocking on a client thread. (opCtx)
- * - The caller needs to act periodically on inactivity. (the duration)
- *
- * Exceptions include:
- * timeouts
- * ErrorCodes::ExceededTimeLimit exceptions
- * opCtx interrupts
- * ErrorCodes::Interrupted exceptions
+ * Exceptions outside the interruptible include:
* closure of queue endpoints
* ErrorCodes::ProducerConsumerQueueEndClosed
* pushes with batches that exceed the max queue size
@@ -165,11 +118,7 @@ public:
// Pushes the passed T into the queue
//
// Leaves T unchanged if an interrupt exception is thrown while waiting for space
- template <
- typename... InterruptionArgs,
- typename = std::enable_if_t<decltype(producer_consumer_queue_detail::areInterruptionArgs(
- std::declval<InterruptionArgs>()...))::value>>
- void push(T&& t, InterruptionArgs&&... interruptionArgs) {
+ void push(T&& t, Interruptible* interruptible = Interruptible::notInterruptible()) {
_pushRunner([&](stdx::unique_lock<stdx::mutex>& lk) {
auto cost = _invokeCostFunc(t, lk);
uassert(ErrorCodes::ProducerConsumerQueueBatchTooLarge,
@@ -179,7 +128,7 @@ public:
<< ")",
cost <= _max);
- _waitForSpace(lk, cost, std::forward<InterruptionArgs>(interruptionArgs)...);
+ _waitForSpace(lk, cost, interruptible);
_push(lk, std::move(t));
});
}
@@ -195,13 +144,10 @@ public:
//
// Lifecycle methods of T must not throw if you want to use this method, as there's no obvious
// mechanism to see what was and was not pushed if those do throw
- template <
- typename StartIterator,
- typename EndIterator,
- typename... InterruptionArgs,
- typename = std::enable_if_t<decltype(producer_consumer_queue_detail::areInterruptionArgs(
- std::declval<InterruptionArgs>()...))::value>>
- void pushMany(StartIterator start, EndIterator last, InterruptionArgs&&... interruptionArgs) {
+ template <typename StartIterator, typename EndIterator>
+ void pushMany(StartIterator start,
+ EndIterator last,
+ Interruptible* interruptible = Interruptible::notInterruptible()) {
return _pushRunner([&](stdx::unique_lock<stdx::mutex>& lk) {
size_t cost = 0;
for (auto iter = start; iter != last; ++iter) {
@@ -215,7 +161,7 @@ public:
<< ")",
cost <= _max);
- _waitForSpace(lk, cost, std::forward<InterruptionArgs>(interruptionArgs)...);
+ _waitForSpace(lk, cost, interruptible);
for (auto iter = start; iter != last; ++iter) {
_push(lk, std::move(*iter));
@@ -232,13 +178,9 @@ public:
}
// Pops one T out of the queue
- template <
- typename... InterruptionArgs,
- typename = std::enable_if_t<decltype(producer_consumer_queue_detail::areInterruptionArgs(
- std::declval<InterruptionArgs>()...))::value>>
- T pop(InterruptionArgs&&... interruptionArgs) {
+ T pop(Interruptible* interruptible = Interruptible::notInterruptible()) {
return _popRunner([&](stdx::unique_lock<stdx::mutex>& lk) {
- _waitForNonEmpty(lk, std::forward<InterruptionArgs>(interruptionArgs)...);
+ _waitForNonEmpty(lk, interruptible);
return _pop(lk);
});
}
@@ -250,14 +192,10 @@ public:
// TODO: add sfinae to check to enforce
//
// Returns the cost value of the items extracted, along with the updated output iterator
- template <
- typename OutputIterator,
- typename... InterruptionArgs,
- typename = std::enable_if_t<decltype(producer_consumer_queue_detail::areInterruptionArgs(
- std::declval<InterruptionArgs>()...))::value>>
- std::pair<size_t, OutputIterator> popMany(OutputIterator iterator,
- InterruptionArgs&&... interruptionArgs) {
- return popManyUpTo(_max, iterator, std::forward<InterruptionArgs>(interruptionArgs)...);
+ template <typename OutputIterator>
+ std::pair<size_t, OutputIterator> popMany(
+ OutputIterator iterator, Interruptible* interruptible = Interruptible::notInterruptible()) {
+ return popManyUpTo(_max, iterator, interruptible);
}
// Waits for at least one item in the queue, then pops items out of the queue until it would
@@ -267,18 +205,15 @@ public:
// TODO: add sfinae to check to enforce
//
// Returns the cost value of the items extracted, along with the updated output iterator
- template <
- typename OutputIterator,
- typename... InterruptionArgs,
- typename = std::enable_if_t<decltype(producer_consumer_queue_detail::areInterruptionArgs(
- std::declval<InterruptionArgs>()...))::value>>
- std::pair<size_t, OutputIterator> popManyUpTo(size_t budget,
- OutputIterator iterator,
- InterruptionArgs&&... interruptionArgs) {
+ template <typename OutputIterator>
+ std::pair<size_t, OutputIterator> popManyUpTo(
+ size_t budget,
+ OutputIterator iterator,
+ Interruptible* interruptible = Interruptible::notInterruptible()) {
return _popRunner([&](stdx::unique_lock<stdx::mutex>& lk) {
size_t cost = 0;
- _waitForNonEmpty(lk, std::forward<InterruptionArgs>(interruptionArgs)...);
+ _waitForNonEmpty(lk, interruptible);
while (auto out = _tryPop(lk)) {
cost += _invokeCostFunc(*out, lk);
@@ -449,10 +384,9 @@ private:
return t;
}
- template <typename... InterruptionArgs>
void _waitForSpace(stdx::unique_lock<stdx::mutex>& lk,
size_t cost,
- InterruptionArgs&&... interruptionArgs) {
+ Interruptible* interruptible) {
invariant(!_producerWants);
_producerWants = cost;
@@ -464,12 +398,10 @@ private:
_checkProducerClosed(lk);
return _current + cost <= _max;
},
- std::forward<InterruptionArgs>(interruptionArgs)...);
+ interruptible);
}
- template <typename... InterruptionArgs>
- void _waitForNonEmpty(stdx::unique_lock<stdx::mutex>& lk,
- InterruptionArgs&&... interruptionArgs) {
+ void _waitForNonEmpty(stdx::unique_lock<stdx::mutex>& lk, Interruptible* interruptible) {
_consumers++;
const auto guard = MakeGuard([&] { _consumers--; });
@@ -480,64 +412,15 @@ private:
_checkConsumerClosed(lk);
return _queue.size();
},
- std::forward<InterruptionArgs>(interruptionArgs)...);
- }
-
- template <typename Callback>
- void _waitFor(stdx::unique_lock<stdx::mutex>& lk,
- stdx::condition_variable& condvar,
- Callback&& pred,
- OperationContext* opCtx) {
- opCtx->waitForConditionOrInterrupt(condvar, lk, pred);
- }
-
- template <typename Callback>
- void _waitFor(stdx::unique_lock<stdx::mutex>& lk,
- stdx::condition_variable& condvar,
- Callback&& pred) {
- condvar.wait(lk, pred);
- }
-
- template <typename Callback>
- void _waitFor(stdx::unique_lock<stdx::mutex>& lk,
- stdx::condition_variable& condvar,
- Callback&& pred,
- OperationContext* opCtx,
- Date_t deadline) {
- uassert(ErrorCodes::ExceededTimeLimit,
- "exceeded timeout",
- opCtx->waitForConditionOrInterruptUntil(condvar, lk, deadline, pred));
- }
-
- template <typename Callback>
- void _waitFor(stdx::unique_lock<stdx::mutex>& lk,
- stdx::condition_variable& condvar,
- Callback&& pred,
- Date_t deadline) {
- uassert(ErrorCodes::ExceededTimeLimit,
- "exceeded timeout",
- condvar.wait_until(lk, deadline.toSystemTimePoint(), pred));
- }
-
- template <typename Callback>
- void _waitFor(stdx::unique_lock<stdx::mutex>& lk,
- stdx::condition_variable& condvar,
- Callback&& pred,
- OperationContext* opCtx,
- Milliseconds duration) {
- uassert(ErrorCodes::ExceededTimeLimit,
- "exceeded timeout",
- opCtx->waitForConditionOrInterruptFor(condvar, lk, duration, pred));
+ interruptible);
}
template <typename Callback>
void _waitFor(stdx::unique_lock<stdx::mutex>& lk,
stdx::condition_variable& condvar,
Callback&& pred,
- Milliseconds duration) {
- uassert(ErrorCodes::ExceededTimeLimit,
- "exceeded timeout",
- condvar.wait_for(lk, duration.toSystemDuration(), pred));
+ Interruptible* interruptible) {
+ interruptible->waitForConditionOrInterrupt(condvar, lk, pred);
}
mutable stdx::mutex _mutex;
diff --git a/src/mongo/util/producer_consumer_queue_test.cpp b/src/mongo/util/producer_consumer_queue_test.cpp
index fa7af5e04a9..44161c8e64f 100644
--- a/src/mongo/util/producer_consumer_queue_test.cpp
+++ b/src/mongo/util/producer_consumer_queue_test.cpp
@@ -76,7 +76,8 @@ public:
auto client = _serviceCtx->makeClient(name.toString());
auto opCtx = client->makeOperationContext();
- cb(opCtx.get(), _timeout);
+ opCtx->runWithDeadline(
+ _timeout, ErrorCodes::ExceededTimeLimit, [&] { cb(opCtx.get()); });
});
}
@@ -96,20 +97,6 @@ public:
}
};
-template <typename Timeout>
-class ProducerConsumerQueueTestHelper<Timeout> {
-public:
- ProducerConsumerQueueTestHelper(Timeout timeout) : _timeout(timeout) {}
-
- template <typename Callback>
- stdx::thread runThread(StringData name, Callback&& cb) {
- return stdx::thread([this, name, cb] { cb(_timeout); });
- }
-
-private:
- Timeout _timeout;
-};
-
class ProducerConsumerQueueTest : public unittest::Test {
public:
template <typename Callback>
@@ -127,27 +114,17 @@ public:
const Minutes duration(30);
callback(ProducerConsumerQueueTestHelper<OperationContext>(_serviceCtx.get()));
- callback(ProducerConsumerQueueTestHelper<OperationContext, Milliseconds>(_serviceCtx.get(),
- duration));
callback(ProducerConsumerQueueTestHelper<OperationContext, Date_t>(
_serviceCtx.get(), _serviceCtx->getPreciseClockSource()->now() + duration));
callback(ProducerConsumerQueueTestHelper<>());
- callback(ProducerConsumerQueueTestHelper<Milliseconds>(duration));
- callback(ProducerConsumerQueueTestHelper<Date_t>(
- _serviceCtx->getPreciseClockSource()->now() + duration));
}
template <typename Callback>
void runTimeoutPermutations(Callback&& callback) {
const Milliseconds duration(10);
- callback(ProducerConsumerQueueTestHelper<OperationContext, Milliseconds>(_serviceCtx.get(),
- duration));
callback(ProducerConsumerQueueTestHelper<OperationContext, Date_t>(
_serviceCtx.get(), _serviceCtx->getPreciseClockSource()->now() + duration));
- callback(ProducerConsumerQueueTestHelper<Milliseconds>(duration));
- callback(ProducerConsumerQueueTestHelper<Date_t>(
- _serviceCtx->getPreciseClockSource()->now() + duration));
}
private:
diff --git a/src/mongo/util/waitable.h b/src/mongo/util/waitable.h
new file mode 100644
index 00000000000..21a4ed3ea2a
--- /dev/null
+++ b/src/mongo/util/waitable.h
@@ -0,0 +1,141 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/util/clock_source.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+
+/**
+ * Waitable is a lightweight type that can be used with stdx::condition_variable and can do other
+ * work while the condvar 'waits'.
+ *
+ * It handles this dance by using a special hook that condvar provides to register itself (as a
+ * notifyable, which it inherits from) during calls to wait. Then, rather than actually waiting on
+ * the condvar, it invokes its run/run_until methods.
+ *
+ * The current implementer of Waitable is the transport layer baton type, which performs delayed IO
+ * when it would otherwise block.
+ */
+class Waitable : public Notifyable {
+public:
+ static void wait(Waitable* waitable,
+ ClockSource* clkSource,
+ stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& lk) {
+ if (waitable) {
+ cv._runWithNotifyable(*waitable, [&]() noexcept {
+ lk.unlock();
+ waitable->run(clkSource);
+ lk.lock();
+ });
+ } else {
+ cv.wait(lk);
+ }
+ }
+
+ template <typename Predicate>
+ static void wait(Waitable* waitable,
+ ClockSource* clkSource,
+ stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& lk,
+ Predicate pred) {
+ while (!pred()) {
+ wait(waitable, clkSource, cv, lk);
+ }
+ }
+
+ static stdx::cv_status wait_until(
+ Waitable* waitable,
+ ClockSource* clkSource,
+ stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& lk,
+ const stdx::chrono::time_point<stdx::chrono::system_clock>& timeout_time) {
+ if (waitable) {
+ auto rval = stdx::cv_status::no_timeout;
+
+ cv._runWithNotifyable(*waitable, [&]() noexcept {
+ lk.unlock();
+ if (waitable->run_until(clkSource, Date_t(timeout_time)) == TimeoutState::Timeout) {
+ rval = stdx::cv_status::timeout;
+ }
+ lk.lock();
+ });
+
+ return rval;
+ } else {
+ return cv.wait_until(lk, timeout_time);
+ }
+ }
+
+ template <typename Predicate>
+ static bool wait_until(Waitable* waitable,
+ ClockSource* clkSource,
+ stdx::condition_variable& cv,
+ stdx::unique_lock<stdx::mutex>& lk,
+ const stdx::chrono::time_point<stdx::chrono::system_clock>& timeout_time,
+ Predicate pred) {
+ while (!pred()) {
+ if (wait_until(waitable, clkSource, cv, lk, timeout_time) == stdx::cv_status::timeout) {
+ return pred();
+ }
+ }
+
+ return true;
+ }
+
+protected:
+ ~Waitable() noexcept {}
+
+ enum class TimeoutState {
+ NoTimeout,
+ Timeout,
+ };
+
+ /**
+ * Run some amount of work. The intention is that this function perform work until it's
+ * possible that the surrounding condvar clause could have finished.
+ *
+ * Note that like regular condvar.wait, this allows implementers the flexibility to possibly
+ * return early.
+ *
+ * We take a clock source here to allow for synthetic timeouts.
+ */
+ virtual void run(ClockSource* clkSource) noexcept = 0;
+
+ /**
+ * Like run, but only until the passed deadline has passed.
+ */
+ virtual TimeoutState run_until(ClockSource* clkSource, Date_t deadline) noexcept = 0;
+};
+
+} // namespace mongo