diff options
25 files changed, 1416 insertions, 505 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript index e262fe78f4a..dca2d87d1be 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -36,6 +36,7 @@ env.SConscript( 's', 'scripting', 'shell', + 'stdx', 'tools', 'transport', 'unittest', diff --git a/src/mongo/db/catalog/SConscript b/src/mongo/db/catalog/SConscript index 05ce3036ad4..8ab0dd4f364 100644 --- a/src/mongo/db/catalog/SConscript +++ b/src/mongo/db/catalog/SConscript @@ -202,6 +202,7 @@ env.Library( 'database_holder', '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/namespace_string', + '$BUILD_DIR/mongo/db/service_context', '$BUILD_DIR/mongo/db/storage/storage_options', ], ) diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp index 94a5ea312d2..873972a8c30 100644 --- a/src/mongo/db/operation_context.cpp +++ b/src/mongo/db/operation_context.cpp @@ -88,7 +88,9 @@ void OperationContext::setDeadlineAndMaxTime(Date_t when, ErrorCodes::Error timeoutError) { invariant(!getClient()->isInDirectClient()); invariant(ErrorCodes::isExceededTimeLimitError(timeoutError)); - uassert(40120, "Illegal attempt to change operation deadline", !hasDeadline()); + uassert(40120, + "Illegal attempt to change operation deadline", + _hasArtificialDeadline || !hasDeadline()); _deadline = when; _maxTime = maxTime; _timeoutError = timeoutError; @@ -165,10 +167,6 @@ Microseconds OperationContext::getRemainingMaxTimeMicros() const { return _maxTime - getElapsedTime(); } -void OperationContext::checkForInterrupt() { - uassertStatusOK(checkForInterruptNoAssert()); -} - namespace { // Helper function for checkForInterrupt fail point. Decides whether the operation currently @@ -190,7 +188,7 @@ bool opShouldFail(Client* client, const BSONObj& failPointInfo) { } // namespace -Status OperationContext::checkForInterruptNoAssert() { +Status OperationContext::checkForInterruptNoAssert() noexcept { // TODO: Remove the MONGO_likely(getClient()) once all operation contexts are constructed with // clients. if (MONGO_likely(getClient() && getServiceContext()) && @@ -199,10 +197,16 @@ Status OperationContext::checkForInterruptNoAssert() { } if (hasDeadlineExpired()) { - markKilled(_timeoutError); + if (!_hasArtificialDeadline) { + markKilled(_timeoutError); + } return Status(_timeoutError, "operation exceeded time limit"); } + if (_ignoreInterrupts) { + return Status::OK(); + } + MONGO_FAIL_POINT_BLOCK(checkForInterruptFail, scopedFailPoint) { if (opShouldFail(getClient(), scopedFailPoint.getData())) { log() << "set pending kill on op " << getOpID() << ", for checkForInterruptFail"; @@ -218,41 +222,6 @@ Status OperationContext::checkForInterruptNoAssert() { return Status::OK(); } -void OperationContext::sleepUntil(Date_t deadline) { - stdx::mutex m; - stdx::condition_variable cv; - stdx::unique_lock<stdx::mutex> lk(m); - invariant(!waitForConditionOrInterruptUntil(cv, lk, deadline, [] { return false; })); -} - -void OperationContext::sleepFor(Milliseconds duration) { - stdx::mutex m; - stdx::condition_variable cv; - stdx::unique_lock<stdx::mutex> lk(m); - invariant(!waitForConditionOrInterruptFor(cv, lk, duration, [] { return false; })); -} - -void OperationContext::waitForConditionOrInterrupt(stdx::condition_variable& cv, - stdx::unique_lock<stdx::mutex>& m) { - uassertStatusOK(waitForConditionOrInterruptNoAssert(cv, m)); -} - -Status OperationContext::waitForConditionOrInterruptNoAssert( - stdx::condition_variable& cv, stdx::unique_lock<stdx::mutex>& m) noexcept { - auto status = waitForConditionOrInterruptNoAssertUntil(cv, m, Date_t::max()); - if (!status.isOK()) { - return status.getStatus(); - } - invariant(status.getValue() == stdx::cv_status::no_timeout); - return status.getStatus(); -} - -stdx::cv_status OperationContext::waitForConditionOrInterruptUntil( - stdx::condition_variable& cv, stdx::unique_lock<stdx::mutex>& m, Date_t deadline) { - - return uassertStatusOK(waitForConditionOrInterruptNoAssertUntil(cv, m, deadline)); -} - // Theory of operation for waitForConditionOrInterruptNoAssertUntil and markKilled: // // An operation indicates to potential killers that it is waiting on a condition variable by setting @@ -312,14 +281,15 @@ StatusWith<stdx::cv_status> OperationContext::waitForConditionOrInterruptNoAsser const auto waitStatus = [&] { if (Date_t::max() == deadline) { - cv.wait(m); + Waitable::wait(_baton.get(), getServiceContext()->getPreciseClockSource(), cv, m); return stdx::cv_status::no_timeout; } - return getServiceContext()->getPreciseClockSource()->waitForConditionUntil(cv, m, deadline); + return getServiceContext()->getPreciseClockSource()->waitForConditionUntil( + cv, m, deadline, _baton.get()); }(); // Continue waiting on cv until no other thread is attempting to kill this one. - cv.wait(m, [this] { + Waitable::wait(_baton.get(), getServiceContext()->getPreciseClockSource(), cv, m, [this] { stdx::lock_guard<Client> clientLock(*getClient()); if (0 == _numKillers) { _waitMutex = nullptr; @@ -338,9 +308,12 @@ StatusWith<stdx::cv_status> OperationContext::waitForConditionOrInterruptNoAsser // is slightly ahead of the FastClock used in checkForInterrupt. In this case, // we treat the operation as though it has exceeded its time limit, just as if the // FastClock and system clock had agreed. - markKilled(_timeoutError); + if (!_hasArtificialDeadline) { + markKilled(_timeoutError); + } return Status(_timeoutError, "operation exceeded time limit"); } + return waitStatus; } @@ -361,11 +334,6 @@ void OperationContext::markKilled(ErrorCodes::Error killCode) { invariant(_waitCV); _waitCV->notify_all(); } - - // If we have a baton, we need to wake it up. The baton itself will check for interruption - if (_baton) { - _baton->schedule([] {}); - } } void OperationContext::setLogicalSessionId(LogicalSessionId lsid) { diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h index 712a0ab860c..9931a6e1931 100644 --- a/src/mongo/db/operation_context.h +++ b/src/mongo/db/operation_context.h @@ -44,6 +44,7 @@ #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" #include "mongo/util/decorable.h" +#include "mongo/util/interruptible.h" #include "mongo/util/time_support.h" #include "mongo/util/timer.h" @@ -70,7 +71,7 @@ class UnreplicatedWritesBlock; * (RecoveryUnitState) to reduce complexity and duplication in the storage-engine specific * RecoveryUnit and to allow better invariant checking. */ -class OperationContext : public Decorable<OperationContext> { +class OperationContext : public Interruptible, public Decorable<OperationContext> { MONGO_DISALLOW_COPYING(OperationContext); public: @@ -127,107 +128,9 @@ public: std::unique_ptr<Locker> swapLockState(std::unique_ptr<Locker> locker); /** - * Raises a AssertionException if this operation is in a killed state. - */ - void checkForInterrupt(); - - /** * Returns Status::OK() unless this operation is in a killed state. */ - Status checkForInterruptNoAssert(); - - /** - * Sleeps until "deadline"; throws an exception if the operation is interrupted before then. - */ - void sleepUntil(Date_t deadline); - - /** - * Sleeps for "duration" ms; throws an exception if the operation is interrupted before then. - */ - void sleepFor(Milliseconds duration); - - /** - * Waits for either the condition "cv" to be signaled, this operation to be interrupted, or the - * deadline on this operation to expire. In the event of interruption or operation deadline - * expiration, raises a AssertionException with an error code indicating the interruption type. - */ - void waitForConditionOrInterrupt(stdx::condition_variable& cv, - stdx::unique_lock<stdx::mutex>& m); - - /** - * Waits on condition "cv" for "pred" until "pred" returns true, or this operation - * is interrupted or its deadline expires. Throws a DBException for interruption and - * deadline expiration. - */ - template <typename Pred> - void waitForConditionOrInterrupt(stdx::condition_variable& cv, - stdx::unique_lock<stdx::mutex>& m, - Pred pred) { - while (!pred()) { - waitForConditionOrInterrupt(cv, m); - } - } - - /** - * Same as waitForConditionOrInterrupt, except returns a Status instead of throwing - * a DBException to report interruption. - */ - Status waitForConditionOrInterruptNoAssert(stdx::condition_variable& cv, - stdx::unique_lock<stdx::mutex>& m) noexcept; - - /** - * Waits for condition "cv" to be signaled, or for the given "deadline" to expire, or - * for the operation to be interrupted, or for the operation's own deadline to expire. - * - * If the operation deadline expires or the operation is interrupted, throws a DBException. If - * the given "deadline" expires, returns cv_status::timeout. Otherwise, returns - * cv_status::no_timeout. - */ - stdx::cv_status waitForConditionOrInterruptUntil(stdx::condition_variable& cv, - stdx::unique_lock<stdx::mutex>& m, - Date_t deadline); - - /** - * Waits on condition "cv" for "pred" until "pred" returns true, or the given "deadline" - * expires, or this operation is interrupted, or this operation's own deadline expires. - * - * - * If the operation deadline expires or the operation is interrupted, throws a DBException. If - * the given "deadline" expires, returns cv_status::timeout. Otherwise, returns - * cv_status::no_timeout indicating that "pred" finally returned true. - */ - template <typename Pred> - bool waitForConditionOrInterruptUntil(stdx::condition_variable& cv, - stdx::unique_lock<stdx::mutex>& m, - Date_t deadline, - Pred pred) { - while (!pred()) { - if (stdx::cv_status::timeout == waitForConditionOrInterruptUntil(cv, m, deadline)) { - return pred(); - } - } - return true; - } - - /** - * Same as the predicate form of waitForConditionOrInterruptUntil, but takes a relative - * amount of time to wait instead of an absolute time point. - */ - template <typename Pred> - bool waitForConditionOrInterruptFor(stdx::condition_variable& cv, - stdx::unique_lock<stdx::mutex>& m, - Milliseconds duration, - Pred pred) { - return waitForConditionOrInterruptUntil( - cv, m, getExpirationDateForWaitForValue(duration), pred); - } - - /** - * Same as waitForConditionOrInterruptUntil, except returns StatusWith<stdx::cv_status> and - * non-ok status indicates the error instead of a DBException. - */ - StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil( - stdx::condition_variable& cv, stdx::unique_lock<stdx::mutex>& m, Date_t deadline) noexcept; + Status checkForInterruptNoAssert() noexcept override; /** * Returns the service context under which this operation context runs, or nullptr if there is @@ -353,6 +256,9 @@ public: * without lock by the thread executing on behalf of this operation context. */ ErrorCodes::Error getKillStatus() const { + if (_ignoreInterrupts) { + return ErrorCodes::OK; + } return _killCode.loadRelaxed(); } @@ -397,16 +303,9 @@ public: } /** - * Returns true if this operation has a deadline. - */ - bool hasDeadline() const { - return getDeadline() < Date_t::max(); - } - - /** * Returns the deadline for this operation, or Date_t::max() if there is no deadline. */ - Date_t getDeadline() const { + Date_t getDeadline() const override { return _deadline; } @@ -425,7 +324,54 @@ public: */ Microseconds getRemainingMaxTimeMicros() const; + StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil( + stdx::condition_variable& cv, + stdx::unique_lock<stdx::mutex>& m, + Date_t deadline) noexcept override; + private: + IgnoreInterruptsState pushIgnoreInterrupts() override { + IgnoreInterruptsState iis{_ignoreInterrupts, + {_deadline, _timeoutError, _hasArtificialDeadline}}; + _hasArtificialDeadline = true; + setDeadlineByDate(Date_t::max(), ErrorCodes::ExceededTimeLimit); + _ignoreInterrupts = true; + + return iis; + } + + void popIgnoreInterrupts(IgnoreInterruptsState iis) override { + _ignoreInterrupts = iis.ignoreInterrupts; + + setDeadlineByDate(iis.deadline.deadline, iis.deadline.error); + _hasArtificialDeadline = iis.deadline.hasArtificialDeadline; + + _markKilledIfDeadlineRequires(); + } + + DeadlineState pushArtificialDeadline(Date_t deadline, ErrorCodes::Error error) override { + DeadlineState ds{_deadline, _timeoutError, _hasArtificialDeadline}; + + _hasArtificialDeadline = true; + setDeadlineByDate(std::min(_deadline, deadline), error); + + return ds; + } + + void popArtificialDeadline(DeadlineState ds) override { + setDeadlineByDate(ds.deadline, ds.error); + _hasArtificialDeadline = ds.hasArtificialDeadline; + + _markKilledIfDeadlineRequires(); + } + + void _markKilledIfDeadlineRequires() { + if (!_ignoreInterrupts && !_hasArtificialDeadline && hasDeadlineExpired() && + !isKillPending()) { + markKilled(_timeoutError); + } + } + /** * Returns true if this operation has a deadline and it has passed according to the fast clock * on ServiceContext. @@ -447,7 +393,7 @@ private: * Returns the timepoint that is "waitFor" ms after now according to the * ServiceContext's precise clock. */ - Date_t getExpirationDateForWaitForValue(Milliseconds waitFor); + Date_t getExpirationDateForWaitForValue(Milliseconds waitFor) override; /** * Set whether or not operations should generate oplog entries. @@ -502,6 +448,8 @@ private: Date_t::max(); // The timepoint at which this operation exceeds its time limit. ErrorCodes::Error _timeoutError = ErrorCodes::ExceededTimeLimit; + bool _ignoreInterrupts = false; + bool _hasArtificialDeadline = false; // Max operation time requested by the user or by the cursor in the case of a getMore with no // user-specified maxTime. This is tracked with microsecond granularity for the purpose of diff --git a/src/mongo/db/operation_context_test.cpp b/src/mongo/db/operation_context_test.cpp index b779102b702..02891cfa151 100644 --- a/src/mongo/db/operation_context_test.cpp +++ b/src/mongo/db/operation_context_test.cpp @@ -200,6 +200,25 @@ TEST(OperationContextTest, OpCtxGroup) { } } +TEST(OperationContextTest, IgnoreInterruptsWorks) { + auto serviceCtx = ServiceContext::make(); + auto client = serviceCtx->makeClient("OperationContextTest"); + auto opCtx = client->makeOperationContext(); + + opCtx->markKilled(ErrorCodes::BadValue); + ASSERT_THROWS_CODE(opCtx->checkForInterrupt(), DBException, ErrorCodes::BadValue); + ASSERT_EQUALS(opCtx->getKillStatus(), ErrorCodes::BadValue); + + opCtx->runWithoutInterruption([&] { + ASSERT_OK(opCtx->checkForInterruptNoAssert()); + ASSERT_OK(opCtx->getKillStatus()); + }); + + ASSERT_THROWS_CODE(opCtx->checkForInterrupt(), DBException, ErrorCodes::BadValue); + + ASSERT_EQUALS(opCtx->getKillStatus(), ErrorCodes::BadValue); +} + class OperationDeadlineTests : public unittest::Test { public: void setUp() { @@ -210,6 +229,13 @@ public: client = service->makeClient("OperationDeadlineTest"); } + void checkForInterruptForTimeout(OperationContext* opCtx) { + stdx::mutex m; + stdx::condition_variable cv; + stdx::unique_lock<stdx::mutex> lk(m); + opCtx->waitForConditionOrInterrupt(cv, lk); + } + const std::shared_ptr<ClockSourceMock> mockClock = std::make_shared<ClockSourceMock>(); ServiceContext::UniqueServiceContext service; ServiceContext::UniqueClient client; @@ -302,6 +328,209 @@ TEST_F(OperationDeadlineTests, WaitForMaxTimeExpiredCVWithWaitUntilSet) { .getStatus()); } +TEST_F(OperationDeadlineTests, NestedTimeoutsTimeoutInOrder) { + auto opCtx = client->makeOperationContext(); + + opCtx->setDeadlineByDate(mockClock->now() + Milliseconds(500), ErrorCodes::MaxTimeMSExpired); + + bool reachedA = false; + bool reachedB = false; + bool reachedC = false; + + try { + opCtx->runWithDeadline( + mockClock->now() + Milliseconds(100), ErrorCodes::ExceededTimeLimit, [&] { + ASSERT_OK(opCtx->checkForInterruptNoAssert()); + + try { + opCtx->runWithDeadline( + mockClock->now() + Milliseconds(50), ErrorCodes::ExceededTimeLimit, [&] { + ASSERT_OK(opCtx->checkForInterruptNoAssert()); + try { + opCtx->runWithDeadline(mockClock->now() + Milliseconds(10), + ErrorCodes::ExceededTimeLimit, + [&] { + ASSERT_OK( + opCtx->checkForInterruptNoAssert()); + ASSERT_OK(opCtx->getKillStatus()); + mockClock->advance(Milliseconds(20)); + checkForInterruptForTimeout(opCtx.get()); + ASSERT(false); + }); + } catch (const ExceptionFor<ErrorCodes::ExceededTimeLimit>&) { + opCtx->checkForInterrupt(); + ASSERT_OK(opCtx->getKillStatus()); + mockClock->advance(Milliseconds(50)); + reachedA = true; + } + + opCtx->checkForInterrupt(); + }); + } catch (const ExceptionFor<ErrorCodes::ExceededTimeLimit>&) { + opCtx->checkForInterrupt(); + ASSERT_OK(opCtx->getKillStatus()); + mockClock->advance(Milliseconds(50)); + reachedB = true; + } + + opCtx->checkForInterrupt(); + }); + } catch (const ExceptionFor<ErrorCodes::ExceededTimeLimit>&) { + reachedC = true; + ASSERT_OK(opCtx->getKillStatus()); + ASSERT_OK(opCtx->checkForInterruptNoAssert()); + } + + ASSERT(reachedA); + ASSERT(reachedB); + ASSERT(reachedC); + + ASSERT_OK(opCtx->getKillStatus()); + + mockClock->advance(Seconds(1)); + + ASSERT_THROWS_CODE(opCtx->checkForInterrupt(), DBException, ErrorCodes::MaxTimeMSExpired); +} + +TEST_F(OperationDeadlineTests, NestedTimeoutsThatViolateMaxTime) { + auto opCtx = client->makeOperationContext(); + + opCtx->setDeadlineByDate(mockClock->now() + Milliseconds(10), ErrorCodes::MaxTimeMSExpired); + + bool reachedA = false; + bool reachedB = false; + + try { + opCtx->runWithDeadline( + mockClock->now() + Milliseconds(100), ErrorCodes::ExceededTimeLimit, [&] { + ASSERT_OK(opCtx->checkForInterruptNoAssert()); + try { + opCtx->runWithDeadline( + mockClock->now() + Milliseconds(100), ErrorCodes::ExceededTimeLimit, [&] { + ASSERT_OK(opCtx->checkForInterruptNoAssert()); + ASSERT_OK(opCtx->getKillStatus()); + mockClock->advance(Milliseconds(50)); + opCtx->checkForInterrupt(); + }); + } catch (const ExceptionFor<ErrorCodes::ExceededTimeLimit>&) { + reachedA = true; + } + + opCtx->checkForInterrupt(); + }); + } catch (const ExceptionFor<ErrorCodes::MaxTimeMSExpired>&) { + reachedB = true; + } + + ASSERT(reachedA); + ASSERT(reachedB); +} + +TEST_F(OperationDeadlineTests, NestedNonMaxTimeMSTimeoutsThatAreLargerAreIgnored) { + auto opCtx = client->makeOperationContext(); + + bool reachedA = false; + bool reachedB = false; + + try { + opCtx->runWithDeadline( + mockClock->now() + Milliseconds(10), ErrorCodes::ExceededTimeLimit, [&] { + ASSERT_OK(opCtx->checkForInterruptNoAssert()); + try { + opCtx->runWithDeadline( + mockClock->now() + Milliseconds(100), ErrorCodes::ExceededTimeLimit, [&] { + ASSERT_OK(opCtx->checkForInterruptNoAssert()); + ASSERT_OK(opCtx->getKillStatus()); + mockClock->advance(Milliseconds(50)); + opCtx->checkForInterrupt(); + }); + } catch (const ExceptionFor<ErrorCodes::ExceededTimeLimit>&) { + reachedA = true; + } + + opCtx->checkForInterrupt(); + }); + } catch (const ExceptionFor<ErrorCodes::ExceededTimeLimit>&) { + reachedB = true; + } + + ASSERT(reachedA); + ASSERT(reachedB); +} + +TEST_F(OperationDeadlineTests, DeadlineAfterIgnoreInterruptsReopens) { + auto opCtx = client->makeOperationContext(); + + bool reachedA = false; + bool reachedB = false; + bool reachedC = false; + + try { + opCtx->runWithDeadline( + mockClock->now() + Milliseconds(500), ErrorCodes::ExceededTimeLimit, [&] { + ASSERT_OK(opCtx->checkForInterruptNoAssert()); + + opCtx->runWithoutInterruption([&] { + try { + opCtx->runWithDeadline( + mockClock->now() + Seconds(1), ErrorCodes::ExceededTimeLimit, [&] { + ASSERT_OK(opCtx->checkForInterruptNoAssert()); + ASSERT_OK(opCtx->getKillStatus()); + mockClock->advance(Milliseconds(750)); + ASSERT_OK(opCtx->checkForInterruptNoAssert()); + mockClock->advance(Milliseconds(500)); + reachedA = true; + opCtx->checkForInterrupt(); + }); + } catch (const ExceptionFor<ErrorCodes::ExceededTimeLimit>&) { + opCtx->checkForInterrupt(); + reachedB = true; + } + }); + + opCtx->checkForInterrupt(); + }); + } catch (const ExceptionFor<ErrorCodes::ExceededTimeLimit>& ex) { + reachedC = true; + } + + ASSERT(reachedA); + ASSERT(reachedB); + ASSERT(reachedC); +} + +TEST_F(OperationDeadlineTests, DeadlineAfterRunWithoutInterruptSeesViolatedMaxMS) { + auto opCtx = client->makeOperationContext(); + + opCtx->setDeadlineByDate(mockClock->now() + Milliseconds(100), ErrorCodes::MaxTimeMSExpired); + + ASSERT_THROWS_CODE(opCtx->runWithoutInterruption([&] { + opCtx->runWithDeadline( + mockClock->now() + Milliseconds(200), ErrorCodes::ExceededTimeLimit, [&] { + mockClock->advance(Milliseconds(300)); + opCtx->checkForInterrupt(); + }); + }), + DBException, + ErrorCodes::MaxTimeMSExpired); +} + +TEST_F(OperationDeadlineTests, DeadlineAfterRunWithoutInterruptDoesntSeeUnviolatedMaxMS) { + auto opCtx = client->makeOperationContext(); + + opCtx->setDeadlineByDate(mockClock->now() + Milliseconds(200), ErrorCodes::MaxTimeMSExpired); + + ASSERT_THROWS_CODE(opCtx->runWithoutInterruption([&] { + opCtx->runWithDeadline( + mockClock->now() + Milliseconds(100), ErrorCodes::ExceededTimeLimit, [&] { + mockClock->advance(Milliseconds(150)); + opCtx->checkForInterrupt(); + }); + }), + DBException, + ErrorCodes::ExceededTimeLimit); +} + TEST_F(OperationDeadlineTests, WaitForKilledOpCV) { auto opCtx = client->makeOperationContext(); opCtx->markKilled(); diff --git a/src/mongo/executor/connection_pool_tl.cpp b/src/mongo/executor/connection_pool_tl.cpp index 5e79798c6aa..b74e424fe1f 100644 --- a/src/mongo/executor/connection_pool_tl.cpp +++ b/src/mongo/executor/connection_pool_tl.cpp @@ -96,7 +96,7 @@ void TLTimer::setTimeout(Milliseconds timeoutVal, TimeoutCallback cb) { return; } - _timer->waitFor(timeoutVal).getAsync([cb = std::move(cb)](Status status) { + _timer->waitUntil(_reactor->now() + timeoutVal).getAsync([cb = std::move(cb)](Status status) { // If we get canceled, then we don't worry about the timeout anymore if (status == ErrorCodes::CallbackCanceled) { return; diff --git a/src/mongo/rpc/SConscript b/src/mongo/rpc/SConscript index f7883dbc9e2..f5892449616 100644 --- a/src/mongo/rpc/SConscript +++ b/src/mongo/rpc/SConscript @@ -174,6 +174,7 @@ env.Library( ], LIBDEPS=[ '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/service_context', '$BUILD_DIR/mongo/s/is_mongos', '$BUILD_DIR/mongo/transport/transport_layer_common', "$BUILD_DIR/mongo/util/concurrency/spin_lock", diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp index aef8ac9d3b7..4ef49669505 100644 --- a/src/mongo/s/async_requests_sender.cpp +++ b/src/mongo/s/async_requests_sender.cpp @@ -99,7 +99,7 @@ AsyncRequestsSender::Response AsyncRequestsSender::next() { // Otherwise, wait for some response to be received. if (_interruptStatus.isOK()) { try { - _makeProgress(_opCtx); + _makeProgress(); } catch (const AssertionException& ex) { // If the operation is interrupted, we cancel outstanding requests and switch to // waiting for the (canceled) callbacks to finish without checking for interrupts. @@ -108,7 +108,7 @@ AsyncRequestsSender::Response AsyncRequestsSender::next() { continue; } } else { - _makeProgress(nullptr); + _opCtx->runWithoutInterruption([&] { _makeProgress(); }); } } return *readyResponse; @@ -139,11 +139,6 @@ boost::optional<AsyncRequestsSender::Response> AsyncRequestsSender::_ready() { _scheduleRequests(); } - // If we have baton requests, we want to process those before proceeding - if (_batonRequests) { - return boost::none; - } - // Check if any remote is ready. invariant(!_remotes.empty()); for (auto& remote : _remotes) { @@ -215,11 +210,6 @@ void AsyncRequestsSender::_scheduleRequests() { if (!scheduleStatus.isOK()) { remote.swResponse = std::move(scheduleStatus); - if (_baton) { - _batonRequests++; - _baton->schedule([this] { _batonRequests--; }); - } - // Push a noop response to the queue to indicate that a remote is ready for // re-processing due to failure. _responseQueue.push(boost::none); @@ -245,11 +235,6 @@ Status AsyncRequestsSender::_scheduleRequest(size_t remoteIndex) { auto callbackStatus = _executor->scheduleRemoteCommand( request, [remoteIndex, this](const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) { - if (_baton) { - _batonRequests++; - _baton->schedule([this] { _batonRequests--; }); - } - _responseQueue.push(Job{cbData, remoteIndex}); }, _baton); @@ -262,22 +247,8 @@ Status AsyncRequestsSender::_scheduleRequest(size_t remoteIndex) { } // Passing opCtx means you'd like to opt into opCtx interruption. During cleanup we actually don't. -void AsyncRequestsSender::_makeProgress(OperationContext* opCtx) { - invariant(!opCtx || opCtx == _opCtx); - - boost::optional<Job> job; - - if (_baton) { - // If we're using a baton, we peek the queue, and block on the baton if it's empty - if (boost::optional<boost::optional<Job>> tryJob = _responseQueue.tryPop()) { - job = std::move(*tryJob); - } else { - _baton->run(opCtx, boost::none); - } - } else { - // Otherwise we block on the queue - job = opCtx ? _responseQueue.pop(opCtx) : _responseQueue.pop(); - } +void AsyncRequestsSender::_makeProgress() { + auto job = _responseQueue.pop(_opCtx); if (!job) { return; @@ -350,22 +321,22 @@ Status AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPort( // progress on previous requests. auto pf = makePromiseFuture<HostAndPort>(); - ars->_batonRequests++; stdx::thread bgChecker([&] { pf.promise.setWith( [&] { return targeter->findHostWithMaxWait(readPref, deadline - clock->now()); }); - - ars->_baton->schedule([ars] { ars->_batonRequests--; }); }); - const auto guard = MakeGuard([&] { bgChecker.join(); }); - - while (!pf.future.isReady()) { - if (!ars->_baton->run(nullptr, deadline)) { - break; - } - } - - return pf.future.getNoThrow(); + const auto threadGuard = MakeGuard([&] { bgChecker.join(); }); + + // We ignore interrupts here because we want to spin the baton for the full duration of the + // findHostWithMaxWait. We set the time limit (although the bg checker should fulfill our + // promise) to sync up with the findHostWithMaxWait. + // + // TODO clean this up after SERVER-35689 when we can do async targeting. + return ars->_opCtx->runWithoutInterruption([&] { + return ars->_opCtx->runWithDeadline(deadline, ErrorCodes::ExceededTimeLimit, [&] { + return pf.future.getNoThrow(ars->_opCtx); + }); + }); }(); if (!findHostStatus.isOK()) { diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h index dfb053c977e..fe7d5e20cb7 100644 --- a/src/mongo/s/async_requests_sender.h +++ b/src/mongo/s/async_requests_sender.h @@ -39,6 +39,7 @@ #include "mongo/executor/task_executor.h" #include "mongo/s/client/shard.h" #include "mongo/s/shard_id.h" +#include "mongo/util/interruptible.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/producer_consumer_queue.h" #include "mongo/util/time_support.h" @@ -281,17 +282,14 @@ private: /** * Waits for forward progress in gathering responses from a remote. * - * If the opCtx is non-null, use it while waiting on completion. - * * Stores the response or error in the remote. */ - void _makeProgress(OperationContext* opCtx); + void _makeProgress(); OperationContext* _opCtx; executor::TaskExecutor* _executor; BatonDetacher _baton; - size_t _batonRequests = 0; // The metadata obj to pass along with the command remote. Used to indicate that the command is // ok to run on secondaries. diff --git a/src/mongo/stdx/SConscript b/src/mongo/stdx/SConscript new file mode 100644 index 00000000000..a51aa3423f4 --- /dev/null +++ b/src/mongo/stdx/SConscript @@ -0,0 +1,15 @@ +# -*- mode: python -*- + +Import("env") + +env = env.Clone() + +env.Benchmark( + target='condition_variable_bm', + source=[ + 'condition_variable_bm.cpp', + ], + LIBDEPS=[ + ], +) + diff --git a/src/mongo/stdx/condition_variable.h b/src/mongo/stdx/condition_variable.h index 40ca92ba8f6..6de5dbdb66d 100644 --- a/src/mongo/stdx/condition_variable.h +++ b/src/mongo/stdx/condition_variable.h @@ -28,15 +28,151 @@ #pragma once +#include <atomic> #include <condition_variable> +#include <list> + +#include "mongo/platform/atomic_word.h" +#include "mongo/stdx/mutex.h" namespace mongo { + +/** + * Notifyable is a slim type meant to allow integration of special kinds of waiters for + * stdx::condition_variable. Specifially, the notify() on this type will be called directly from + * stdx::condition_varibale::notify_(one|all). + * + * See Waitable for the stdx::condition_variable integration. + */ +class Notifyable { +public: + virtual void notify() noexcept = 0; + +protected: + ~Notifyable() = default; +}; + +class Waitable; + namespace stdx { -using condition_variable = ::std::condition_variable; // NOLINT using condition_variable_any = ::std::condition_variable_any; // NOLINT using cv_status = ::std::cv_status; // NOLINT using ::std::notify_all_at_thread_exit; // NOLINT +/** + * We wrap std::condition_variable to allow us to register Notifyables which can "wait" on the + * condvar without actually waiting on the std::condition_variable. This allows us to possibly do + * productive work in those types, rather than sleeping in the os. + */ +class condition_variable : private std::condition_variable { // NOLINT +public: + using std::condition_variable::condition_variable; // NOLINT + + void notify_one() noexcept { + if (_notifyableCount.load()) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + if (_notifyNextNotifyable(lk)) { + return; + } + } + + std::condition_variable::notify_one(); // NOLINT + } + + void notify_all() noexcept { + if (_notifyableCount.load()) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + while (_notifyNextNotifyable(lk)) { + } + } + + std::condition_variable::notify_all(); // NOLINT + } + + using std::condition_variable::wait; // NOLINT + using std::condition_variable::wait_for; // NOLINT + using std::condition_variable::wait_until; // NOLINT + using std::condition_variable::native_handle; // NOLINT + +private: + friend class ::mongo::Waitable; + + /** + * Runs the callback with the Notifyable registered on the condvar. This ensures that for the + * duration of the callback execution, a notification on the condvar will trigger a notify() to + * the Notifyable. + * + * The scheme here is that list entries are erased from the notification list when notified (so + * that they don't eat multiple notify_one's). We detect that condition by noting that our + * Notifyable* has been overwritten with null (in which case we should avoid a double erase). + * + * The method is private, and accessed via friendship in Waitable. + */ + template <typename Callback> + void _runWithNotifyable(Notifyable& notifyable, Callback&& cb) noexcept { + static_assert(noexcept(std::forward<Callback>(cb)()), + "Only noexcept functions may be invoked with _runWithNotifyable"); + + // We use this local pad to receive notification that we were notified, rather than timing + // out organically. + // + // Note that n must be guarded by _mutex after its insertion in _notifyables (so that we can + // detect notification in a thread-safe manner). + Notifyable* n = ¬ifyable; + + auto iter = [&] { + stdx::lock_guard<stdx::mutex> localMutex(_mutex); + _notifyableCount.addAndFetch(1); + return _notifyables.insert(_notifyables.end(), &n); + }(); + + std::forward<Callback>(cb)(); + + stdx::lock_guard<stdx::mutex> localMutex(_mutex); + // if n is null, we were notified, and erased in _notifyNextNotifyable + if (n) { + _notifyableCount.subtractAndFetch(1); + _notifyables.erase(iter); + } + } + + /** + * Notifies the next notifyable. + * + * Returns true if there was a notifyable to be notified. + * + * Note that as part of notifying, we zero out pointers allocated on the stack by + * _runWithNotifyable callers. This is safe because we hold _mutex while we do so, and our + * erasure communicates that those waiters need not clear themselves from the notification list + * on wakeup. + */ + bool _notifyNextNotifyable(const stdx::lock_guard<stdx::mutex>&) noexcept { + auto iter = _notifyables.begin(); + if (iter == _notifyables.end()) { + return false; + } + + _notifyableCount.subtractAndFetch(1); + + (**iter)->notify(); + + // null out iter here, so that the notifyable won't remove itself from the list when it + // wakes up + **iter = nullptr; + + _notifyables.erase(iter); + + return true; + } + + AtomicUInt64 _notifyableCount; + + stdx::mutex _mutex; + std::list<Notifyable**> _notifyables; +}; + } // namespace stdx } // namespace mongo diff --git a/src/mongo/stdx/condition_variable_bm.cpp b/src/mongo/stdx/condition_variable_bm.cpp new file mode 100644 index 00000000000..7e020d60c0e --- /dev/null +++ b/src/mongo/stdx/condition_variable_bm.cpp @@ -0,0 +1,87 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include <benchmark/benchmark.h> + +#include "mongo/bson/inline_decls.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/mutex.h" +#include "mongo/stdx/thread.h" + +namespace mongo { + +void BM_stdNotifyOne(benchmark::State& state) { + std::condition_variable cv; // NOLINT + + for (auto _ : state) { + benchmark::ClobberMemory(); + cv.notify_one(); + } +} + +void BM_stdxNotifyOneNoNotifyables(benchmark::State& state) { + stdx::condition_variable cv; + + for (auto _ : state) { + benchmark::ClobberMemory(); + cv.notify_one(); + } +} + +volatile bool alwaysTrue = true; + +void BM_stdWaitWithTruePredicate(benchmark::State& state) { + std::condition_variable cv; // NOLINT + stdx::mutex mutex; + stdx::unique_lock<stdx::mutex> lk(mutex); + + for (auto _ : state) { + benchmark::ClobberMemory(); + cv.wait(lk, [&] { return alwaysTrue; }); + } +} + +void BM_stdxWaitWithTruePredicate(benchmark::State& state) { + stdx::condition_variable cv; + stdx::mutex mutex; + stdx::unique_lock<stdx::mutex> lk(mutex); + + for (auto _ : state) { + benchmark::ClobberMemory(); + cv.wait(lk, [&] { return alwaysTrue; }); + } +} + +BENCHMARK(BM_stdNotifyOne); +BENCHMARK(BM_stdWaitWithTruePredicate); +BENCHMARK(BM_stdxNotifyOneNoNotifyables); +BENCHMARK(BM_stdxWaitWithTruePredicate); + +} // namespace mongo diff --git a/src/mongo/transport/baton.h b/src/mongo/transport/baton.h index f854dfeeaf4..33409cd20b8 100644 --- a/src/mongo/transport/baton.h +++ b/src/mongo/transport/baton.h @@ -34,6 +34,7 @@ #include "mongo/util/functional.h" #include "mongo/util/future.h" #include "mongo/util/time_support.h" +#include "mongo/util/waitable.h" namespace mongo { @@ -55,7 +56,7 @@ class ReactorTimer; * context switches, as well as improving the readability of stack traces by grounding async * execution on top of a regular client call stack. */ -class Baton { +class Baton : public Waitable { public: virtual ~Baton() = default; @@ -94,11 +95,6 @@ public: virtual Future<void> addSession(Session& session, Type type) = 0; /** - * Adds a timer, returning a future which activates after a duration. - */ - virtual Future<void> waitFor(const ReactorTimer& timer, Milliseconds timeout) = 0; - - /** * Adds a timer, returning a future which activates after a deadline. */ virtual Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) = 0; @@ -116,14 +112,6 @@ public: * Returns true if the timer was in the baton to be cancelled. */ virtual bool cancelTimer(const ReactorTimer& timer) = 0; - - /** - * Runs the baton. This blocks, waiting for networking events or timeouts, and fulfills - * promises and executes scheduled work. - * - * Returns false if the optional deadline has passed - */ - virtual bool run(OperationContext* opCtx, boost::optional<Date_t> deadline) = 0; }; using BatonHandle = std::shared_ptr<Baton>; diff --git a/src/mongo/transport/baton_asio_linux.h b/src/mongo/transport/baton_asio_linux.h index 40829bc43e7..8f18707c12a 100644 --- a/src/mongo/transport/baton_asio_linux.h +++ b/src/mongo/transport/baton_asio_linux.h @@ -54,6 +54,23 @@ namespace transport { * We implement our networking reactor on top of poll + eventfd for wakeups */ class TransportLayerASIO::BatonASIO : public Baton { + /** + * We use this internal reactor timer to exit run_until calls (by forcing an early timeout for + * ::poll). + * + * Its methods are all unreachable because we never actually use its timer-ness (we just need + * its address for baton book keeping). + */ + class InternalReactorTimer : public ReactorTimer { + public: + void cancel(const BatonHandle& baton = nullptr) override { + MONGO_UNREACHABLE; + } + + Future<void> waitUntil(Date_t timeout, const BatonHandle& baton = nullptr) override { + MONGO_UNREACHABLE; + } + }; /** * RAII type that wraps up an eventfd and reading/writing to it. We don't actually need the @@ -138,10 +155,6 @@ public: return std::move(pf.future); } - Future<void> waitFor(const ReactorTimer& timer, Milliseconds timeout) override { - return waitUntil(timer, Date_t::now() + timeout); - } - Future<void> waitUntil(const ReactorTimer& timer, Date_t expiration) override { auto pf = makePromiseFuture<void>(); _safeExecute([ timerPtr = &timer, expiration, sp = pf.promise.share(), this ] { @@ -202,7 +215,33 @@ public: } } - bool run(OperationContext* opCtx, boost::optional<Date_t> deadline) override { + void notify() noexcept override { + schedule([] {}); + } + + /** + * We synthesize a run_until by creating a synthetic timer which we use to exit run early (we + * create a regular waitUntil baton event off the timer, with the passed deadline). + */ + Waitable::TimeoutState run_until(ClockSource* clkSource, Date_t deadline) noexcept override { + InternalReactorTimer irt; + auto future = waitUntil(irt, deadline); + + run(clkSource); + + // If the future is ready our timer has fired, in which case we timed out + if (future.isReady()) { + future.get(); + + return Waitable::TimeoutState::Timeout; + } else { + cancelTimer(irt); + + return Waitable::TimeoutState::NoTimeout; + } + } + + void run(ClockSource* clkSource) noexcept override { std::vector<SharedPromise<void>> toFulfill; // We'll fulfill promises and run jobs on the way out, ensuring we don't hold any locks @@ -227,44 +266,18 @@ public: } }); - // Note that it's important to check for interrupt without the lock, because markKilled - // calls schedule, which will deadlock if we're holding the lock when calling this. - if (opCtx) { - opCtx->checkForInterrupt(); - } - stdx::unique_lock<stdx::mutex> lk(_mutex); - if (opCtx) { - invariant(opCtx == _opCtx); - } - - auto now = Date_t::now(); - - // If our deadline has passed, return that we've already failed - if (deadline && *deadline <= now) { - return false; - } // If anything was scheduled, run it now. No need to poll if (_scheduled.size()) { - return true; + return; } - boost::optional<Milliseconds> timeout; + boost::optional<Date_t> deadline; // If we have a timer, poll no longer than that if (_timers.size()) { - timeout = _timers.begin()->expiration - now; - } - - if (deadline) { - auto deadlineTimeout = *deadline - now; - - // If we didn't have a timer with a deadline, or our deadline is sooner than that - // timer - if (!timeout || (deadlineTimeout < *timeout)) { - timeout = deadlineTimeout; - } + deadline = _timers.begin()->expiration; } std::vector<decltype(_sessions)::iterator> sessions; @@ -282,13 +295,22 @@ public: sessions.push_back(iter); } + auto now = clkSource->now(); + int rval = 0; // If we don't have a timeout, or we have a timeout that's unexpired, run poll. - if (!timeout || (*timeout > Milliseconds(0))) { + if (!deadline || (*deadline > now)) { + if (deadline && !clkSource->tracksSystemClock()) { + invariant(clkSource->setAlarm(*deadline, [this] { notify(); })); + + deadline.reset(); + } + _inPoll = true; lk.unlock(); - rval = - ::poll(pollSet.data(), pollSet.size(), timeout.value_or(Milliseconds(-1)).count()); + rval = ::poll(pollSet.data(), + pollSet.size(), + deadline ? Milliseconds(*deadline - now).count() : -1); const auto pollGuard = MakeGuard([&] { lk.lock(); @@ -300,20 +322,9 @@ public: severe() << "error in poll: " << errnoWithDescription(errno); fassertFailed(50834); } - - // Note that it's important to check for interrupt without the lock, because markKilled - // calls schedule, which will deadlock if we're holding the lock when calling this. - if (opCtx) { - opCtx->checkForInterrupt(); - } } - now = Date_t::now(); - - // If our deadline passed while in poll, we've failed - if (deadline && now > *deadline) { - return false; - } + now = clkSource->now(); // Fire expired timers for (auto iter = _timers.begin(); iter != _timers.end() && iter->expiration < now;) { @@ -348,7 +359,7 @@ public: invariant(remaining == 0); } - return true; + return; } private: diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h index b3ee5aa92a7..a5a81005a73 100644 --- a/src/mongo/transport/transport_layer.h +++ b/src/mongo/transport/transport_layer.h @@ -134,12 +134,11 @@ public: virtual void cancel(const BatonHandle& baton = nullptr) = 0; /* - * Returns a future that will be filled with Status::OK after the timeout has ellapsed. + * Returns a future that will be filled with Status::OK after the deadline has passed. * * Calling this implicitly calls cancel(). */ - virtual Future<void> waitFor(Milliseconds timeout, const BatonHandle& baton = nullptr) = 0; - virtual Future<void> waitUntil(Date_t timeout, const BatonHandle& baton = nullptr) = 0; + virtual Future<void> waitUntil(Date_t deadline, const BatonHandle& baton = nullptr) = 0; }; class Reactor { diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp index ad87ea0ed38..b3522f537c1 100644 --- a/src/mongo/transport/transport_layer_asio.cpp +++ b/src/mongo/transport/transport_layer_asio.cpp @@ -86,13 +86,6 @@ public: _timer->cancel(); } - Future<void> waitFor(Milliseconds timeout, const BatonHandle& baton = nullptr) override { - if (baton) { - return _asyncWait([&] { return baton->waitFor(*this, timeout); }, baton); - } else { - return _asyncWait([&] { _timer->expires_after(timeout.toSystemDuration()); }); - } - } Future<void> waitUntil(Date_t expiration, const BatonHandle& baton = nullptr) override { if (baton) { @@ -538,25 +531,26 @@ Future<SessionHandle> TransportLayerASIO::asyncConnect(HostAndPort peer, } if (timeout > Milliseconds{0} && timeout < Milliseconds::max()) { - connector->timeoutTimer.waitFor(timeout).getAsync([connector](Status status) { - if (status == ErrorCodes::CallbackCanceled || connector->done.swap(true)) { - return; - } + connector->timeoutTimer.waitUntil(reactor->now() + timeout) + .getAsync([connector](Status status) { + if (status == ErrorCodes::CallbackCanceled || connector->done.swap(true)) { + return; + } - connector->promise.setError( - makeConnectError({ErrorCodes::NetworkTimeout, "Connecting timed out"}, - connector->peer, - connector->resolvedEndpoint)); + connector->promise.setError( + makeConnectError({ErrorCodes::NetworkTimeout, "Connecting timed out"}, + connector->peer, + connector->resolvedEndpoint)); - std::error_code ec; - stdx::lock_guard<stdx::mutex> lk(connector->mutex); - connector->resolver.cancel(); - if (connector->session) { - connector->session->end(); - } else { - connector->socket.cancel(ec); - } - }); + std::error_code ec; + stdx::lock_guard<stdx::mutex> lk(connector->mutex); + connector->resolver.cancel(); + if (connector->session) { + connector->session->end(); + } else { + connector->socket.cancel(ec); + } + }); } connector->resolver.asyncResolve(connector->peer, _listenerOptions.enableIPv6) diff --git a/src/mongo/transport/transport_layer_asio_integration_test.cpp b/src/mongo/transport/transport_layer_asio_integration_test.cpp index 0e595449d54..d43885da603 100644 --- a/src/mongo/transport/transport_layer_asio_integration_test.cpp +++ b/src/mongo/transport/transport_layer_asio_integration_test.cpp @@ -33,6 +33,7 @@ #include "mongo/client/async_client.h" #include "mongo/client/connection_string.h" #include "mongo/db/client.h" +#include "mongo/db/operation_context.h" #include "mongo/db/service_context.h" #include "mongo/stdx/thread.h" #include "mongo/transport/session.h" @@ -128,11 +129,7 @@ TEST(TransportLayerASIO, ShortReadsAndWritesWork) { auto future = handle->runCommandRequest(ecr, baton); const auto batonGuard = MakeGuard([&] { baton->detach(); }); - while (!future.isReady()) { - baton->run(nullptr, boost::none); - } - - assertOK(future.get()); + future.get(opCtx.get()); } } diff --git a/src/mongo/util/clock_source.cpp b/src/mongo/util/clock_source.cpp index d562089c27a..ec02c771294 100644 --- a/src/mongo/util/clock_source.cpp +++ b/src/mongo/util/clock_source.cpp @@ -30,13 +30,20 @@ #include "mongo/stdx/thread.h" #include "mongo/util/clock_source.h" +#include "mongo/util/waitable.h" namespace mongo { stdx::cv_status ClockSource::waitForConditionUntil(stdx::condition_variable& cv, stdx::unique_lock<stdx::mutex>& m, - Date_t deadline) { + Date_t deadline, + Waitable* waitable) { if (_tracksSystemClock) { - return cv.wait_until(m, deadline.toSystemTimePoint()); + if (deadline == Date_t::max()) { + Waitable::wait(waitable, this, cv, m); + return stdx::cv_status::no_timeout; + } + + return Waitable::wait_until(waitable, this, cv, m, deadline.toSystemTimePoint()); } // The rest of this function only runs during testing, when the clock source is virtualized and @@ -75,7 +82,7 @@ stdx::cv_status ClockSource::waitForConditionUntil(stdx::condition_variable& cv, alarmInfo->waitCV->notify_all(); })); if (!invokedAlarmInline) { - cv.wait(m); + Waitable::wait(waitable, this, cv, m); } m.unlock(); stdx::lock_guard<stdx::mutex> controlLk(alarmInfo->controlMutex); diff --git a/src/mongo/util/clock_source.h b/src/mongo/util/clock_source.h index e38ce7c932f..12401533b79 100644 --- a/src/mongo/util/clock_source.h +++ b/src/mongo/util/clock_source.h @@ -28,6 +28,8 @@ #pragma once +#include <type_traits> + #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/mutex.h" @@ -41,6 +43,16 @@ class Date_t; * An interface for getting the current wall clock time. */ class ClockSource { + // We need a type trait to differentiate waitable ptr args from predicates. + // + // This returns true for non-pointers and function pointers + template <typename Pred> + struct CouldBePredicate + : public std::integral_constant<bool, + !std::is_pointer<Pred>::value || + std::is_function<std::remove_pointer_t<Pred>>::value> { + }; + public: virtual ~ClockSource() = default; @@ -79,19 +91,21 @@ public: */ stdx::cv_status waitForConditionUntil(stdx::condition_variable& cv, stdx::unique_lock<stdx::mutex>& m, - Date_t deadline); + Date_t deadline, + Waitable* waitable = nullptr); /** * Like cv.wait_until(m, deadline, pred), but uses this ClockSource instead of * stdx::chrono::system_clock to measure the passage of time. */ - template <typename Pred> + template <typename Pred, std::enable_if_t<CouldBePredicate<Pred>::value, int> = 0> bool waitForConditionUntil(stdx::condition_variable& cv, stdx::unique_lock<stdx::mutex>& m, Date_t deadline, - const Pred& pred) { + const Pred& pred, + Waitable* waitable = nullptr) { while (!pred()) { - if (waitForConditionUntil(cv, m, deadline) == stdx::cv_status::timeout) { + if (waitForConditionUntil(cv, m, deadline, waitable) == stdx::cv_status::timeout) { return pred(); } } @@ -102,12 +116,15 @@ public: * Like cv.wait_for(m, duration, pred), but uses this ClockSource instead of * stdx::chrono::system_clock to measure the passage of time. */ - template <typename Duration, typename Pred> + template <typename Duration, + typename Pred, + std::enable_if_t<CouldBePredicate<Pred>::value, int> = 0> bool waitForConditionFor(stdx::condition_variable& cv, stdx::unique_lock<stdx::mutex>& m, Duration duration, - const Pred& pred) { - return waitForConditionUntil(cv, m, now() + duration, pred); + const Pred& pred, + Waitable* waitable = nullptr) { + return waitForConditionUntil(cv, m, now() + duration, pred, waitable); } protected: diff --git a/src/mongo/util/future.h b/src/mongo/util/future.h index a400a881b0b..9a52ec4c1bf 100644 --- a/src/mongo/util/future.h +++ b/src/mongo/util/future.h @@ -44,6 +44,7 @@ #include "mongo/util/assert_util.h" #include "mongo/util/debug_util.h" #include "mongo/util/functional.h" +#include "mongo/util/interruptible.h" #include "mongo/util/intrusive_counter.h" #include "mongo/util/scopeguard.h" @@ -315,7 +316,7 @@ public: virtual ~SharedStateBase() = default; // Only called by future side. - void wait() noexcept { + void wait(Interruptible* interruptible) { if (state.load(std::memory_order_acquire) == SSBState::kFinished) return; @@ -330,7 +331,7 @@ public: } stdx::unique_lock<stdx::mutex> lk(mx); - cv->wait(lk, [&] { + interruptible->waitForConditionOrInterrupt(*cv, lk, [&] { // The mx locking above is insufficient to establish an acquire if state transitions to // kFinished before we get here, but we aquire mx before the producer does. return state.load(std::memory_order_acquire) == SSBState::kFinished; @@ -753,37 +754,86 @@ public: } /** + * Returns when the future isReady(). + * + * Throws if the interruptible passed is interrupted (explicitly or via deadline). + */ + void wait(Interruptible* interruptible = Interruptible::notInterruptible()) const { + if (_immediate) { + return; + } + + _shared->wait(interruptible); + } + + /** + * Returns Status::OK() when the future isReady(). + * + * Returns a non-okay status if the interruptible is interrupted. + */ + Status waitNoThrow(Interruptible* interruptible = Interruptible::notInterruptible()) const + noexcept { + if (_immediate) { + return Status::OK(); + } + + try { + _shared->wait(interruptible); + } catch (const DBException& ex) { + return ex.toStatus(); + } + + return Status::OK(); + } + + /** * Gets the value out of this Future, blocking until it is ready. * * get() methods throw on error, while getNoThrow() returns a !OK status. * * These methods can be called multiple times, except for the rvalue overloads. + * + * Note: It is impossible to differentiate interruptible interruption from an error propagating + * down the future chain with these methods. If you need to distinguish the two cases, call + * wait() first. */ - T get() && { - return std::move(getImpl()); + T get(Interruptible* interruptible = Interruptible::notInterruptible()) && { + return std::move(getImpl(interruptible)); } - T& get() & { - return getImpl(); + T& get(Interruptible* interruptible = Interruptible::notInterruptible()) & { + return getImpl(interruptible); } - const T& get() const& { - return const_cast<Future*>(this)->getImpl(); + const T& get(Interruptible* interruptible = Interruptible::notInterruptible()) const& { + return const_cast<Future*>(this)->getImpl(interruptible); } - StatusWith<T> getNoThrow() && noexcept { + StatusWith<T> getNoThrow(Interruptible* interruptible = Interruptible::notInterruptible()) && + noexcept { if (_immediate) { return std::move(*_immediate); } - _shared->wait(); + try { + _shared->wait(interruptible); + } catch (const DBException& ex) { + return ex.toStatus(); + } + if (!_shared->status.isOK()) return std::move(_shared->status); return std::move(*_shared->data); } - StatusWith<T> getNoThrow() const& noexcept { + StatusWith<T> getNoThrow( + Interruptible* interruptible = Interruptible::notInterruptible()) const& noexcept { if (_immediate) { return *_immediate; } - _shared->wait(); + try { + _shared->wait(interruptible); + } catch (const DBException& ex) { + return ex.toStatus(); + } + if (!_shared->status.isOK()) return _shared->status; return *_shared->data; @@ -1092,12 +1142,12 @@ private: friend class Future; friend class Promise<T>; - T& getImpl() { + T& getImpl(Interruptible* interruptible) { if (_immediate) { return *_immediate; } - _shared->wait(); + _shared->wait(interruptible); uassertStatusOK(_shared->status); return *(_shared->data); } @@ -1244,12 +1294,22 @@ public: return _inner.isReady(); } - void get() const { - _inner.get(); + void wait(Interruptible* interruptible = Interruptible::notInterruptible()) const { + _inner.wait(interruptible); + } + + Status waitNoThrow(Interruptible* interruptible = Interruptible::notInterruptible()) const + noexcept { + return _inner.waitNoThrow(interruptible); + } + + void get(Interruptible* interruptible = Interruptible::notInterruptible()) const { + _inner.get(interruptible); } - Status getNoThrow() const noexcept { - return _inner.getNoThrow().getStatus(); + Status getNoThrow(Interruptible* interruptible = Interruptible::notInterruptible()) const + noexcept { + return _inner.getNoThrow(interruptible).getStatus(); } template <typename Func> // Status -> void diff --git a/src/mongo/util/future_test_future_int.cpp b/src/mongo/util/future_test_future_int.cpp index 803b0992500..6707a310161 100644 --- a/src/mongo/util/future_test_future_int.cpp +++ b/src/mongo/util/future_test_future_int.cpp @@ -165,6 +165,36 @@ TEST(Future, Fail_isReady) { }); } +TEST(Future, Success_wait) { + FUTURE_SUCCESS_TEST([] { return 1; }, + [](Future<int>&& fut) { + fut.wait(); + ASSERT_EQ(fut.get(), 1); + }); +} + +TEST(Future, Success_waitNoThrow) { + FUTURE_SUCCESS_TEST([] { return 1; }, + [](Future<int>&& fut) { + ASSERT_OK(fut.waitNoThrow()); + ASSERT_EQ(fut.get(), 1); + }); +} + +TEST(Future, Fail_wait) { + FUTURE_FAIL_TEST<int>([](Future<int>&& fut) { + fut.wait(); + ASSERT_THROWS_failStatus(fut.get()); + }); +} + +TEST(Future, Fail_waitNoThrow) { + FUTURE_FAIL_TEST<int>([](Future<int>&& fut) { + ASSERT_OK(fut.waitNoThrow()); + ASSERT_THROWS_failStatus(fut.get()); + }); +} + TEST(Future, isReady_TSAN_OK) { bool done = false; auto fut = async([&] { diff --git a/src/mongo/util/interruptible.h b/src/mongo/util/interruptible.h new file mode 100644 index 00000000000..e47a34ba29c --- /dev/null +++ b/src/mongo/util/interruptible.h @@ -0,0 +1,452 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/mutex.h" +#include "mongo/util/time_support.h" +#include "mongo/util/waitable.h" + +namespace mongo { + +/** + * A type which can be used to wait on condition variables with a level triggered one-way interrupt. + * I.e. after the interrupt is triggered (via some non-public api call) subsequent calls to + * waitForConditionXXX will fail. Interrupts must unblock all callers of waitForConditionXXX. + */ +class Interruptible { +protected: + struct DeadlineState { + Date_t deadline; + ErrorCodes::Error error; + bool hasArtificialDeadline; + }; + + struct IgnoreInterruptsState { + bool ignoreInterrupts; + DeadlineState deadline; + }; + + /** + * A deadline guard provides a subsidiary deadline to the parent. + */ + class DeadlineGuard { + public: + DeadlineGuard(const DeadlineGuard&) = delete; + DeadlineGuard& operator=(const DeadlineGuard&) = delete; + + DeadlineGuard(DeadlineGuard&& other) + : _interruptible(other._interruptible), _oldDeadline(other._oldDeadline) { + other._interruptible = nullptr; + } + + DeadlineGuard& operator=(DeadlineGuard&& other) = delete; + + ~DeadlineGuard() { + if (_interruptible) { + _interruptible->popArtificialDeadline(_oldDeadline); + } + } + + private: + friend Interruptible; + + explicit DeadlineGuard(Interruptible& interruptible, + Date_t newDeadline, + ErrorCodes::Error error) + : _interruptible(&interruptible), + _oldDeadline(_interruptible->pushArtificialDeadline(newDeadline, error)) {} + + Interruptible* _interruptible; + DeadlineState _oldDeadline; + }; + + DeadlineGuard makeDeadlineGuard(Date_t deadline, ErrorCodes::Error error) { + return DeadlineGuard(*this, deadline, error); + } + + /** + * An interruption guard provides a region where interruption is ignored. + * + * Note that this causes the deadline to be reset to Date_t::max(), but that it can also be + * subsequently reduced in size after the fact. + */ + class IgnoreInterruptionsGuard { + public: + IgnoreInterruptionsGuard(const IgnoreInterruptionsGuard&) = delete; + IgnoreInterruptionsGuard& operator=(const IgnoreInterruptionsGuard&) = delete; + + IgnoreInterruptionsGuard(IgnoreInterruptionsGuard&& other) + : _interruptible(other._interruptible), _oldState(other._oldState) { + other._interruptible = nullptr; + } + + IgnoreInterruptionsGuard& operator=(IgnoreInterruptionsGuard&&) = delete; + + ~IgnoreInterruptionsGuard() { + if (_interruptible) { + _interruptible->popIgnoreInterrupts(_oldState); + } + } + + private: + friend Interruptible; + + explicit IgnoreInterruptionsGuard(Interruptible& interruptible) + : _interruptible(&interruptible), _oldState(_interruptible->pushIgnoreInterrupts()) {} + + Interruptible* _interruptible; + IgnoreInterruptsState _oldState; + }; + + IgnoreInterruptionsGuard makeIgnoreInterruptionsGuard() { + return IgnoreInterruptionsGuard(*this); + } + +public: + /** + * Returns a statically allocated instance that cannot be interrupted. Useful as a default + * argument to interruptible taking methods. + */ + static Interruptible* notInterruptible(); + + /** + * Invokes the passed callback with a deadline guard active initialized with the passed + * deadline. Additionally handles the dance of try/catching the invocation and checking + * checkForInterrupt with the guard inactive (to allow a higher level timeout to override a + * lower level one) + */ + template <typename Callback> + decltype(auto) runWithDeadline(Date_t deadline, ErrorCodes::Error error, Callback&& cb) { + invariant(ErrorCodes::isExceededTimeLimitError(error)); + + try { + const auto guard = makeDeadlineGuard(deadline, error); + return std::forward<Callback>(cb)(); + } catch (const ExceptionForCat<ErrorCategory::ExceededTimeLimitError>&) { + // May throw replacement exception + checkForInterrupt(); + throw; + } + } + + bool hasDeadline() const { + return getDeadline() != Date_t::max(); + } + + /** + * Returns the deadline for this interruptible, or Date_t::max() if there is no deadline. + */ + virtual Date_t getDeadline() const = 0; + + /** + * Invokes the passed callback with an interruption guard active. Additionally handles the + * dance of try/catching the invocation and checking checkForInterrupt with the guard inactive + * (to allow a higher level timeout to override a lower level one, or for top level interruption + * to propagate) + */ + template <typename Callback> + decltype(auto) runWithoutInterruption(Callback&& cb) { + try { + const auto guard = makeIgnoreInterruptionsGuard(); + return std::forward<Callback>(cb)(); + } catch (const ExceptionForCat<ErrorCategory::ExceededTimeLimitError>&) { + // May throw replacement exception + checkForInterrupt(); + throw; + } + } + + /** + * Raises a AssertionException if this operation is in a killed state. + */ + void checkForInterrupt() { + uassertStatusOK(checkForInterruptNoAssert()); + } + + /** + * Returns Status::OK() unless this operation is in a killed state. + */ + virtual Status checkForInterruptNoAssert() noexcept = 0; + + /** + * Waits for either the condition "cv" to be signaled, this operation to be interrupted, or the + * deadline on this operation to expire. In the event of interruption or operation deadline + * expiration, raises a AssertionException with an error code indicating the interruption type. + */ + void waitForConditionOrInterrupt(stdx::condition_variable& cv, + stdx::unique_lock<stdx::mutex>& m) { + uassertStatusOK(waitForConditionOrInterruptNoAssert(cv, m)); + } + + /** + * Waits on condition "cv" for "pred" until "pred" returns true, or this operation + * is interrupted or its deadline expires. Throws a DBException for interruption and + * deadline expiration. + */ + template <typename Pred> + void waitForConditionOrInterrupt(stdx::condition_variable& cv, + stdx::unique_lock<stdx::mutex>& m, + Pred pred) { + while (!pred()) { + waitForConditionOrInterrupt(cv, m); + } + } + + /** + * Same as waitForConditionOrInterrupt, except returns a Status instead of throwing + * a DBException to report interruption. + */ + Status waitForConditionOrInterruptNoAssert(stdx::condition_variable& cv, + stdx::unique_lock<stdx::mutex>& m) noexcept { + auto status = waitForConditionOrInterruptNoAssertUntil(cv, m, Date_t::max()); + if (!status.isOK()) { + return status.getStatus(); + } + + invariant(status.getValue() == stdx::cv_status::no_timeout); + return Status::OK(); + } + + /** + * Same as the predicate form of waitForConditionOrInterrupt, except that it returns a not okay + * status instead of throwing on interruption. + */ + template <typename Pred> + Status waitForConditionOrInterruptNoAssert(stdx::condition_variable& cv, + stdx::unique_lock<stdx::mutex>& m, + Pred pred) noexcept { + while (!pred()) { + auto status = waitForConditionOrInterruptNoAssert(cv, m); + + if (!status.isOK()) { + return status; + } + } + + return Status::OK(); + } + + /** + * Waits for condition "cv" to be signaled, or for the given "deadline" to expire, or + * for the operation to be interrupted, or for the operation's own deadline to expire. + * + * If the operation deadline expires or the operation is interrupted, throws a DBException. If + * the given "deadline" expires, returns cv_status::timeout. Otherwise, returns + * cv_status::no_timeout. + */ + stdx::cv_status waitForConditionOrInterruptUntil(stdx::condition_variable& cv, + stdx::unique_lock<stdx::mutex>& m, + Date_t deadline) { + return uassertStatusOK(waitForConditionOrInterruptNoAssertUntil(cv, m, deadline)); + } + + /** + * Waits on condition "cv" for "pred" until "pred" returns true, or the given "deadline" + * expires, or this operation is interrupted, or this operation's own deadline expires. + * + * + * If the operation deadline expires or the operation is interrupted, throws a DBException. If + * the given "deadline" expires, returns cv_status::timeout. Otherwise, returns + * cv_status::no_timeout indicating that "pred" finally returned true. + */ + template <typename Pred> + bool waitForConditionOrInterruptUntil(stdx::condition_variable& cv, + stdx::unique_lock<stdx::mutex>& m, + Date_t deadline, + Pred pred) { + while (!pred()) { + if (stdx::cv_status::timeout == waitForConditionOrInterruptUntil(cv, m, deadline)) { + return pred(); + } + } + return true; + } + + /** + * Same as the non-predicate form of waitForConditionOrInterruptUntil, but takes a relative + * amount of time to wait instead of an absolute time point. + */ + stdx::cv_status waitForConditionOrInterruptFor(stdx::condition_variable& cv, + stdx::unique_lock<stdx::mutex>& m, + Milliseconds ms) { + return uassertStatusOK( + waitForConditionOrInterruptNoAssertUntil(cv, m, getExpirationDateForWaitForValue(ms))); + } + + /** + * Same as the predicate form of waitForConditionOrInterruptUntil, but takes a relative + * amount of time to wait instead of an absolute time point. + */ + template <typename Pred> + bool waitForConditionOrInterruptFor(stdx::condition_variable& cv, + stdx::unique_lock<stdx::mutex>& m, + Milliseconds ms, + Pred pred) { + while (!pred()) { + if (stdx::cv_status::timeout == waitForConditionOrInterruptFor(cv, m, ms)) { + return pred(); + } + } + return true; + } + + /** + * Same as waitForConditionOrInterruptUntil, except returns StatusWith<stdx::cv_status> and + * non-ok status indicates the error instead of a DBException. + */ + virtual StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil( + stdx::condition_variable& cv, + stdx::unique_lock<stdx::mutex>& m, + Date_t deadline) noexcept = 0; + + /** + * Sleeps until "deadline"; throws an exception if the interruptible is interrupted before then. + */ + void sleepUntil(Date_t deadline) { + stdx::mutex m; + stdx::condition_variable cv; + stdx::unique_lock<stdx::mutex> lk(m); + invariant(!waitForConditionOrInterruptUntil(cv, lk, deadline, [] { return false; })); + } + + /** + * Sleeps for "duration" ms; throws an exception if the interruptible is interrupted before + * then. + */ + void sleepFor(Milliseconds duration) { + stdx::mutex m; + stdx::condition_variable cv; + stdx::unique_lock<stdx::mutex> lk(m); + invariant(!waitForConditionOrInterruptFor(cv, lk, duration, [] { return false; })); + } + +protected: + /** + * Pushes an ignore interruption critical section into the interruptible. Until an associated + * popIgnoreInterrupts is invoked, the interruptible should ignore interruptions related to + * explicit interruption or previously set deadlines. + * + * Note that new deadlines can be set after this is called, which will again introduce the + * possibility of interruption. + * + * Returns state needed to pop interruption. + */ + virtual IgnoreInterruptsState pushIgnoreInterrupts() = 0; + + /** + * Pops the ignored interruption critical section introduced by push. + */ + virtual void popIgnoreInterrupts(IgnoreInterruptsState iis) = 0; + + /** + * Pushes a subsidiary deadline into the interruptible. Until an associated + * popArtificialDeadline is + * invoked, the interruptible will fail checkForInterrupt and waitForConditionOrInterrupt calls + * with the passed error code if the deadline has passed. + * + * Note that deadline's higher than the current value are constrained (such that the passed + * error code will be returned/thrown, but after the min(oldDeadline, newDeadline) has passed). + * + * Returns state needed to pop the deadline. + */ + virtual DeadlineState pushArtificialDeadline(Date_t deadline, ErrorCodes::Error error) = 0; + + /** + * Pops the subsidiary deadline introduced by push. + */ + virtual void popArtificialDeadline(DeadlineState) = 0; + + /** + * Returns the equivalent of Date_t::now() + waitFor for the interruptible's clock + */ + virtual Date_t getExpirationDateForWaitForValue(Milliseconds waitFor) = 0; + + class NotInterruptible; +}; + +/** + * A not interruptible type which can be used as a lightweight default arg for interruptible taking + * functions. + */ +class Interruptible::NotInterruptible final : public Interruptible { + StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil( + stdx::condition_variable& cv, + stdx::unique_lock<stdx::mutex>& m, + Date_t deadline) noexcept override { + + if (deadline == Date_t::max()) { + cv.wait(m); + return stdx::cv_status::no_timeout; + } + + return cv.wait_until(m, deadline.toSystemTimePoint()); + } + + Date_t getDeadline() const override { + return Date_t::max(); + } + + Status checkForInterruptNoAssert() noexcept override { + return Status::OK(); + } + + // It's invalid to call the deadline or ignore interruption guards on a possibly noop + // interruptible. + // + // The noop interruptible should only be invoked as a default arg at the bottom of the call + // stack (with types that won't modify it's invocation) + IgnoreInterruptsState pushIgnoreInterrupts() override { + MONGO_UNREACHABLE; + } + + void popIgnoreInterrupts(IgnoreInterruptsState) override { + MONGO_UNREACHABLE; + } + + DeadlineState pushArtificialDeadline(Date_t deadline, ErrorCodes::Error error) override { + MONGO_UNREACHABLE; + } + + void popArtificialDeadline(DeadlineState) override { + MONGO_UNREACHABLE; + } + + Date_t getExpirationDateForWaitForValue(Milliseconds waitFor) override { + return Date_t::now() + waitFor; + } +}; + +inline Interruptible* Interruptible::notInterruptible() { + static NotInterruptible notInterruptible{}; + + return ¬Interruptible; +} + +} // namespace mongo diff --git a/src/mongo/util/producer_consumer_queue.h b/src/mongo/util/producer_consumer_queue.h index 8f2b17d2265..a2e1a09e9bb 100644 --- a/src/mongo/util/producer_consumer_queue.h +++ b/src/mongo/util/producer_consumer_queue.h @@ -38,6 +38,7 @@ #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" #include "mongo/util/concurrency/with_lock.h" +#include "mongo/util/interruptible.h" #include "mongo/util/scopeguard.h" namespace mongo { @@ -56,43 +57,6 @@ struct DefaultCostFunction { } }; -// Various helpers to tighten down whether the args getting passed are valid interruption args. -// -// Whatever the caller passes in the interruption args, they need to be invocable on one of -// these helpers. std::is_invocable would do the job in C++17 -constexpr std::false_type areInterruptionArgsHelper(...) { - return {}; -} - -constexpr std::true_type areInterruptionArgsHelper(OperationContext*) { - return {}; -} - -constexpr std::true_type areInterruptionArgsHelper(OperationContext*, Milliseconds) { - return {}; -} - -constexpr std::true_type areInterruptionArgsHelper(OperationContext*, Date_t) { - return {}; -} - -constexpr std::true_type areInterruptionArgsHelper(Milliseconds) { - return {}; -} - -constexpr std::true_type areInterruptionArgsHelper(Date_t) { - return {}; -} - -template <typename U, typename... InterruptionArgs> -constexpr auto areInterruptionArgs(U&& u, InterruptionArgs&&... args) { - return areInterruptionArgsHelper(std::forward<U>(u), std::forward<InterruptionArgs>(args)...); -} - -constexpr std::true_type areInterruptionArgs() { - return {}; -} - } // namespace producer_consumer_queue_detail /** @@ -109,20 +73,9 @@ constexpr std::true_type areInterruptionArgs() { * multi-consumer - Any number of threads may pop work out of the queue * * Interruptibility: - * All of the blocking methods on this type allow for 6 kinds of interruptibility. The matrix is - * parameterized by (void|OperationContext*)|(void|Milliseconds|Date_t). These provide different - * kinds of waiting based on whether the method should be interruptible via opCtx, and then - * whether they should timeout via deadline or duration + * All of the blocking methods on this type take an interruptible. * - * A contrived example: pcq.pop(opCtx, Minutes(1)) would be warranted if: - * - The caller is blocking on a client thread. (opCtx) - * - The caller needs to act periodically on inactivity. (the duration) - * - * Exceptions include: - * timeouts - * ErrorCodes::ExceededTimeLimit exceptions - * opCtx interrupts - * ErrorCodes::Interrupted exceptions + * Exceptions outside the interruptible include: * closure of queue endpoints * ErrorCodes::ProducerConsumerQueueEndClosed * pushes with batches that exceed the max queue size @@ -165,11 +118,7 @@ public: // Pushes the passed T into the queue // // Leaves T unchanged if an interrupt exception is thrown while waiting for space - template < - typename... InterruptionArgs, - typename = std::enable_if_t<decltype(producer_consumer_queue_detail::areInterruptionArgs( - std::declval<InterruptionArgs>()...))::value>> - void push(T&& t, InterruptionArgs&&... interruptionArgs) { + void push(T&& t, Interruptible* interruptible = Interruptible::notInterruptible()) { _pushRunner([&](stdx::unique_lock<stdx::mutex>& lk) { auto cost = _invokeCostFunc(t, lk); uassert(ErrorCodes::ProducerConsumerQueueBatchTooLarge, @@ -179,7 +128,7 @@ public: << ")", cost <= _max); - _waitForSpace(lk, cost, std::forward<InterruptionArgs>(interruptionArgs)...); + _waitForSpace(lk, cost, interruptible); _push(lk, std::move(t)); }); } @@ -195,13 +144,10 @@ public: // // Lifecycle methods of T must not throw if you want to use this method, as there's no obvious // mechanism to see what was and was not pushed if those do throw - template < - typename StartIterator, - typename EndIterator, - typename... InterruptionArgs, - typename = std::enable_if_t<decltype(producer_consumer_queue_detail::areInterruptionArgs( - std::declval<InterruptionArgs>()...))::value>> - void pushMany(StartIterator start, EndIterator last, InterruptionArgs&&... interruptionArgs) { + template <typename StartIterator, typename EndIterator> + void pushMany(StartIterator start, + EndIterator last, + Interruptible* interruptible = Interruptible::notInterruptible()) { return _pushRunner([&](stdx::unique_lock<stdx::mutex>& lk) { size_t cost = 0; for (auto iter = start; iter != last; ++iter) { @@ -215,7 +161,7 @@ public: << ")", cost <= _max); - _waitForSpace(lk, cost, std::forward<InterruptionArgs>(interruptionArgs)...); + _waitForSpace(lk, cost, interruptible); for (auto iter = start; iter != last; ++iter) { _push(lk, std::move(*iter)); @@ -232,13 +178,9 @@ public: } // Pops one T out of the queue - template < - typename... InterruptionArgs, - typename = std::enable_if_t<decltype(producer_consumer_queue_detail::areInterruptionArgs( - std::declval<InterruptionArgs>()...))::value>> - T pop(InterruptionArgs&&... interruptionArgs) { + T pop(Interruptible* interruptible = Interruptible::notInterruptible()) { return _popRunner([&](stdx::unique_lock<stdx::mutex>& lk) { - _waitForNonEmpty(lk, std::forward<InterruptionArgs>(interruptionArgs)...); + _waitForNonEmpty(lk, interruptible); return _pop(lk); }); } @@ -250,14 +192,10 @@ public: // TODO: add sfinae to check to enforce // // Returns the cost value of the items extracted, along with the updated output iterator - template < - typename OutputIterator, - typename... InterruptionArgs, - typename = std::enable_if_t<decltype(producer_consumer_queue_detail::areInterruptionArgs( - std::declval<InterruptionArgs>()...))::value>> - std::pair<size_t, OutputIterator> popMany(OutputIterator iterator, - InterruptionArgs&&... interruptionArgs) { - return popManyUpTo(_max, iterator, std::forward<InterruptionArgs>(interruptionArgs)...); + template <typename OutputIterator> + std::pair<size_t, OutputIterator> popMany( + OutputIterator iterator, Interruptible* interruptible = Interruptible::notInterruptible()) { + return popManyUpTo(_max, iterator, interruptible); } // Waits for at least one item in the queue, then pops items out of the queue until it would @@ -267,18 +205,15 @@ public: // TODO: add sfinae to check to enforce // // Returns the cost value of the items extracted, along with the updated output iterator - template < - typename OutputIterator, - typename... InterruptionArgs, - typename = std::enable_if_t<decltype(producer_consumer_queue_detail::areInterruptionArgs( - std::declval<InterruptionArgs>()...))::value>> - std::pair<size_t, OutputIterator> popManyUpTo(size_t budget, - OutputIterator iterator, - InterruptionArgs&&... interruptionArgs) { + template <typename OutputIterator> + std::pair<size_t, OutputIterator> popManyUpTo( + size_t budget, + OutputIterator iterator, + Interruptible* interruptible = Interruptible::notInterruptible()) { return _popRunner([&](stdx::unique_lock<stdx::mutex>& lk) { size_t cost = 0; - _waitForNonEmpty(lk, std::forward<InterruptionArgs>(interruptionArgs)...); + _waitForNonEmpty(lk, interruptible); while (auto out = _tryPop(lk)) { cost += _invokeCostFunc(*out, lk); @@ -449,10 +384,9 @@ private: return t; } - template <typename... InterruptionArgs> void _waitForSpace(stdx::unique_lock<stdx::mutex>& lk, size_t cost, - InterruptionArgs&&... interruptionArgs) { + Interruptible* interruptible) { invariant(!_producerWants); _producerWants = cost; @@ -464,12 +398,10 @@ private: _checkProducerClosed(lk); return _current + cost <= _max; }, - std::forward<InterruptionArgs>(interruptionArgs)...); + interruptible); } - template <typename... InterruptionArgs> - void _waitForNonEmpty(stdx::unique_lock<stdx::mutex>& lk, - InterruptionArgs&&... interruptionArgs) { + void _waitForNonEmpty(stdx::unique_lock<stdx::mutex>& lk, Interruptible* interruptible) { _consumers++; const auto guard = MakeGuard([&] { _consumers--; }); @@ -480,64 +412,15 @@ private: _checkConsumerClosed(lk); return _queue.size(); }, - std::forward<InterruptionArgs>(interruptionArgs)...); - } - - template <typename Callback> - void _waitFor(stdx::unique_lock<stdx::mutex>& lk, - stdx::condition_variable& condvar, - Callback&& pred, - OperationContext* opCtx) { - opCtx->waitForConditionOrInterrupt(condvar, lk, pred); - } - - template <typename Callback> - void _waitFor(stdx::unique_lock<stdx::mutex>& lk, - stdx::condition_variable& condvar, - Callback&& pred) { - condvar.wait(lk, pred); - } - - template <typename Callback> - void _waitFor(stdx::unique_lock<stdx::mutex>& lk, - stdx::condition_variable& condvar, - Callback&& pred, - OperationContext* opCtx, - Date_t deadline) { - uassert(ErrorCodes::ExceededTimeLimit, - "exceeded timeout", - opCtx->waitForConditionOrInterruptUntil(condvar, lk, deadline, pred)); - } - - template <typename Callback> - void _waitFor(stdx::unique_lock<stdx::mutex>& lk, - stdx::condition_variable& condvar, - Callback&& pred, - Date_t deadline) { - uassert(ErrorCodes::ExceededTimeLimit, - "exceeded timeout", - condvar.wait_until(lk, deadline.toSystemTimePoint(), pred)); - } - - template <typename Callback> - void _waitFor(stdx::unique_lock<stdx::mutex>& lk, - stdx::condition_variable& condvar, - Callback&& pred, - OperationContext* opCtx, - Milliseconds duration) { - uassert(ErrorCodes::ExceededTimeLimit, - "exceeded timeout", - opCtx->waitForConditionOrInterruptFor(condvar, lk, duration, pred)); + interruptible); } template <typename Callback> void _waitFor(stdx::unique_lock<stdx::mutex>& lk, stdx::condition_variable& condvar, Callback&& pred, - Milliseconds duration) { - uassert(ErrorCodes::ExceededTimeLimit, - "exceeded timeout", - condvar.wait_for(lk, duration.toSystemDuration(), pred)); + Interruptible* interruptible) { + interruptible->waitForConditionOrInterrupt(condvar, lk, pred); } mutable stdx::mutex _mutex; diff --git a/src/mongo/util/producer_consumer_queue_test.cpp b/src/mongo/util/producer_consumer_queue_test.cpp index fa7af5e04a9..44161c8e64f 100644 --- a/src/mongo/util/producer_consumer_queue_test.cpp +++ b/src/mongo/util/producer_consumer_queue_test.cpp @@ -76,7 +76,8 @@ public: auto client = _serviceCtx->makeClient(name.toString()); auto opCtx = client->makeOperationContext(); - cb(opCtx.get(), _timeout); + opCtx->runWithDeadline( + _timeout, ErrorCodes::ExceededTimeLimit, [&] { cb(opCtx.get()); }); }); } @@ -96,20 +97,6 @@ public: } }; -template <typename Timeout> -class ProducerConsumerQueueTestHelper<Timeout> { -public: - ProducerConsumerQueueTestHelper(Timeout timeout) : _timeout(timeout) {} - - template <typename Callback> - stdx::thread runThread(StringData name, Callback&& cb) { - return stdx::thread([this, name, cb] { cb(_timeout); }); - } - -private: - Timeout _timeout; -}; - class ProducerConsumerQueueTest : public unittest::Test { public: template <typename Callback> @@ -127,27 +114,17 @@ public: const Minutes duration(30); callback(ProducerConsumerQueueTestHelper<OperationContext>(_serviceCtx.get())); - callback(ProducerConsumerQueueTestHelper<OperationContext, Milliseconds>(_serviceCtx.get(), - duration)); callback(ProducerConsumerQueueTestHelper<OperationContext, Date_t>( _serviceCtx.get(), _serviceCtx->getPreciseClockSource()->now() + duration)); callback(ProducerConsumerQueueTestHelper<>()); - callback(ProducerConsumerQueueTestHelper<Milliseconds>(duration)); - callback(ProducerConsumerQueueTestHelper<Date_t>( - _serviceCtx->getPreciseClockSource()->now() + duration)); } template <typename Callback> void runTimeoutPermutations(Callback&& callback) { const Milliseconds duration(10); - callback(ProducerConsumerQueueTestHelper<OperationContext, Milliseconds>(_serviceCtx.get(), - duration)); callback(ProducerConsumerQueueTestHelper<OperationContext, Date_t>( _serviceCtx.get(), _serviceCtx->getPreciseClockSource()->now() + duration)); - callback(ProducerConsumerQueueTestHelper<Milliseconds>(duration)); - callback(ProducerConsumerQueueTestHelper<Date_t>( - _serviceCtx->getPreciseClockSource()->now() + duration)); } private: diff --git a/src/mongo/util/waitable.h b/src/mongo/util/waitable.h new file mode 100644 index 00000000000..21a4ed3ea2a --- /dev/null +++ b/src/mongo/util/waitable.h @@ -0,0 +1,141 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/mutex.h" +#include "mongo/util/clock_source.h" +#include "mongo/util/time_support.h" + +namespace mongo { + +/** + * Waitable is a lightweight type that can be used with stdx::condition_variable and can do other + * work while the condvar 'waits'. + * + * It handles this dance by using a special hook that condvar provides to register itself (as a + * notifyable, which it inherits from) during calls to wait. Then, rather than actually waiting on + * the condvar, it invokes its run/run_until methods. + * + * The current implementer of Waitable is the transport layer baton type, which performs delayed IO + * when it would otherwise block. + */ +class Waitable : public Notifyable { +public: + static void wait(Waitable* waitable, + ClockSource* clkSource, + stdx::condition_variable& cv, + stdx::unique_lock<stdx::mutex>& lk) { + if (waitable) { + cv._runWithNotifyable(*waitable, [&]() noexcept { + lk.unlock(); + waitable->run(clkSource); + lk.lock(); + }); + } else { + cv.wait(lk); + } + } + + template <typename Predicate> + static void wait(Waitable* waitable, + ClockSource* clkSource, + stdx::condition_variable& cv, + stdx::unique_lock<stdx::mutex>& lk, + Predicate pred) { + while (!pred()) { + wait(waitable, clkSource, cv, lk); + } + } + + static stdx::cv_status wait_until( + Waitable* waitable, + ClockSource* clkSource, + stdx::condition_variable& cv, + stdx::unique_lock<stdx::mutex>& lk, + const stdx::chrono::time_point<stdx::chrono::system_clock>& timeout_time) { + if (waitable) { + auto rval = stdx::cv_status::no_timeout; + + cv._runWithNotifyable(*waitable, [&]() noexcept { + lk.unlock(); + if (waitable->run_until(clkSource, Date_t(timeout_time)) == TimeoutState::Timeout) { + rval = stdx::cv_status::timeout; + } + lk.lock(); + }); + + return rval; + } else { + return cv.wait_until(lk, timeout_time); + } + } + + template <typename Predicate> + static bool wait_until(Waitable* waitable, + ClockSource* clkSource, + stdx::condition_variable& cv, + stdx::unique_lock<stdx::mutex>& lk, + const stdx::chrono::time_point<stdx::chrono::system_clock>& timeout_time, + Predicate pred) { + while (!pred()) { + if (wait_until(waitable, clkSource, cv, lk, timeout_time) == stdx::cv_status::timeout) { + return pred(); + } + } + + return true; + } + +protected: + ~Waitable() noexcept {} + + enum class TimeoutState { + NoTimeout, + Timeout, + }; + + /** + * Run some amount of work. The intention is that this function perform work until it's + * possible that the surrounding condvar clause could have finished. + * + * Note that like regular condvar.wait, this allows implementers the flexibility to possibly + * return early. + * + * We take a clock source here to allow for synthetic timeouts. + */ + virtual void run(ClockSource* clkSource) noexcept = 0; + + /** + * Like run, but only until the passed deadline has passed. + */ + virtual TimeoutState run_until(ClockSource* clkSource, Date_t deadline) noexcept = 0; +}; + +} // namespace mongo |