diff options
author | Jason Carey <jcarey@argv.me> | 2018-07-05 15:50:40 -0400 |
---|---|---|
committer | Jason Carey <jcarey@argv.me> | 2018-09-17 18:07:18 -0400 |
commit | 6e3c5ea176aadbd0475f8f87525b9f0fabd4bdc9 (patch) | |
tree | 9ca79672f893b98d4fc99a42cf36528c4a7b7488 /src/mongo/s/async_requests_sender.cpp | |
parent | 1faa184e835a7a628631064af08389471d64ed0f (diff) | |
download | mongo-6e3c5ea176aadbd0475f8f87525b9f0fabd4bdc9.tar.gz |
SERVER-35679 General Interruption Facility
Add support for a generalized interruptibility facility in the server.
This offers a generalized interruptibility facility, trialed in
Future<T> and ProducerConsumerQueue<T>.
It offers 3 major concepts:
Notifyable: A type which can be notified off-thread, causing a wake up
from some kind of blocking wait
Waitable: A type which is Notifyable, and also can perform work while in
a ready-to-receive notification state. Static methods offer support for
running underneath condition_variable::wait calls. The chief implementer is
the transport layer baton type
Interruptible: A type which can wait on condition variables, and offers:
- deadlines. This means the type integrates some sort of clock source
- interruptibility. This means the type offers a way of noticing
that it should no longer run via status or exception
Additionally, Interruptibles offer special scoped guards which
provide
- Exemption from interruption in a region defined by the lifetime
of a guard object
- Subsidiary deadlines which can trigger recursively, offering
specialized timeout and status return support.
The series of virtual types allows us to slice the interface between
opCtx and Future such that opCtx can use Future and Future can use
opCtx. Additionally, cutting out more functionality allows us to flow a
noop interruptibility type which unifies our waiting behind a common
API.
Diffstat (limited to 'src/mongo/s/async_requests_sender.cpp')
-rw-r--r-- | src/mongo/s/async_requests_sender.cpp | 61 |
1 files changed, 16 insertions, 45 deletions
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp index aef8ac9d3b7..4ef49669505 100644 --- a/src/mongo/s/async_requests_sender.cpp +++ b/src/mongo/s/async_requests_sender.cpp @@ -99,7 +99,7 @@ AsyncRequestsSender::Response AsyncRequestsSender::next() { // Otherwise, wait for some response to be received. if (_interruptStatus.isOK()) { try { - _makeProgress(_opCtx); + _makeProgress(); } catch (const AssertionException& ex) { // If the operation is interrupted, we cancel outstanding requests and switch to // waiting for the (canceled) callbacks to finish without checking for interrupts. @@ -108,7 +108,7 @@ AsyncRequestsSender::Response AsyncRequestsSender::next() { continue; } } else { - _makeProgress(nullptr); + _opCtx->runWithoutInterruption([&] { _makeProgress(); }); } } return *readyResponse; @@ -139,11 +139,6 @@ boost::optional<AsyncRequestsSender::Response> AsyncRequestsSender::_ready() { _scheduleRequests(); } - // If we have baton requests, we want to process those before proceeding - if (_batonRequests) { - return boost::none; - } - // Check if any remote is ready. invariant(!_remotes.empty()); for (auto& remote : _remotes) { @@ -215,11 +210,6 @@ void AsyncRequestsSender::_scheduleRequests() { if (!scheduleStatus.isOK()) { remote.swResponse = std::move(scheduleStatus); - if (_baton) { - _batonRequests++; - _baton->schedule([this] { _batonRequests--; }); - } - // Push a noop response to the queue to indicate that a remote is ready for // re-processing due to failure. 
_responseQueue.push(boost::none); @@ -245,11 +235,6 @@ Status AsyncRequestsSender::_scheduleRequest(size_t remoteIndex) { auto callbackStatus = _executor->scheduleRemoteCommand( request, [remoteIndex, this](const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) { - if (_baton) { - _batonRequests++; - _baton->schedule([this] { _batonRequests--; }); - } - _responseQueue.push(Job{cbData, remoteIndex}); }, _baton); @@ -262,22 +247,8 @@ Status AsyncRequestsSender::_scheduleRequest(size_t remoteIndex) { } // Passing opCtx means you'd like to opt into opCtx interruption. During cleanup we actually don't. -void AsyncRequestsSender::_makeProgress(OperationContext* opCtx) { - invariant(!opCtx || opCtx == _opCtx); - - boost::optional<Job> job; - - if (_baton) { - // If we're using a baton, we peek the queue, and block on the baton if it's empty - if (boost::optional<boost::optional<Job>> tryJob = _responseQueue.tryPop()) { - job = std::move(*tryJob); - } else { - _baton->run(opCtx, boost::none); - } - } else { - // Otherwise we block on the queue - job = opCtx ? _responseQueue.pop(opCtx) : _responseQueue.pop(); - } +void AsyncRequestsSender::_makeProgress() { + auto job = _responseQueue.pop(_opCtx); if (!job) { return; @@ -350,22 +321,22 @@ Status AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPort( // progress on previous requests. 
auto pf = makePromiseFuture<HostAndPort>(); - ars->_batonRequests++; stdx::thread bgChecker([&] { pf.promise.setWith( [&] { return targeter->findHostWithMaxWait(readPref, deadline - clock->now()); }); - - ars->_baton->schedule([ars] { ars->_batonRequests--; }); }); - const auto guard = MakeGuard([&] { bgChecker.join(); }); - - while (!pf.future.isReady()) { - if (!ars->_baton->run(nullptr, deadline)) { - break; - } - } - - return pf.future.getNoThrow(); + const auto threadGuard = MakeGuard([&] { bgChecker.join(); }); + + // We ignore interrupts here because we want to spin the baton for the full duration of the + // findHostWithMaxWait. We set the time limit (although the bg checker should fulfill our + // promise) to sync up with the findHostWithMaxWait. + // + // TODO clean this up after SERVER-35689 when we can do async targeting. + return ars->_opCtx->runWithoutInterruption([&] { + return ars->_opCtx->runWithDeadline(deadline, ErrorCodes::ExceededTimeLimit, [&] { + return pf.future.getNoThrow(ars->_opCtx); + }); + }); }(); if (!findHostStatus.isOK()) { |