summaryrefslogtreecommitdiff
path: root/src/mongo/s/async_requests_sender.cpp
diff options
context:
space:
mode:
authorJason Carey <jcarey@argv.me>2018-07-05 15:50:40 -0400
committerJason Carey <jcarey@argv.me>2018-09-17 18:07:18 -0400
commit6e3c5ea176aadbd0475f8f87525b9f0fabd4bdc9 (patch)
tree9ca79672f893b98d4fc99a42cf36528c4a7b7488 /src/mongo/s/async_requests_sender.cpp
parent1faa184e835a7a628631064af08389471d64ed0f (diff)
downloadmongo-6e3c5ea176aadbd0475f8f87525b9f0fabd4bdc9.tar.gz
SERVER-35679 General Interruption Facility
Add support for a generalized interruptibility facility in the server. This offers a generalized interruptibility facility, trialed in Future<T> and ProducerConsumerQueue<T>. It offers 3 major concepts: Notifyable: A type which can notified off-thread, causing a wake up from some kind of blocking wait Waitable: A type which is Notifyable, and also can perform work while in a ready-to-receive notification state. static methods offer support for running underneath condition_variable::wait's. The chief implementer is the transport layer baton type Interruptible: A type which can wait on condition variables, and offers: - deadlines. This means the type integrates some sort of clock source - interruptibility. This means the type offers a way of noticing that it should no longer run via status or exception Additionally, Interruptible's offer special scoped guards which offer - Exemption from interruption in a region defined by the lifetime of a guard object - Subsidiary deadlines which can trigger recursively, offering specialized timeout and status return support. The series of virtual types allows us to slice the interface between opCtx and future such that opctx can use future and future can use opctx. Additionally, cutting out more functionality allows us to flow a noop interruptibility type which unifies our waiting behind a common api.
Diffstat (limited to 'src/mongo/s/async_requests_sender.cpp')
-rw-r--r--src/mongo/s/async_requests_sender.cpp61
1 files changed, 16 insertions, 45 deletions
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp
index aef8ac9d3b7..4ef49669505 100644
--- a/src/mongo/s/async_requests_sender.cpp
+++ b/src/mongo/s/async_requests_sender.cpp
@@ -99,7 +99,7 @@ AsyncRequestsSender::Response AsyncRequestsSender::next() {
// Otherwise, wait for some response to be received.
if (_interruptStatus.isOK()) {
try {
- _makeProgress(_opCtx);
+ _makeProgress();
} catch (const AssertionException& ex) {
// If the operation is interrupted, we cancel outstanding requests and switch to
// waiting for the (canceled) callbacks to finish without checking for interrupts.
@@ -108,7 +108,7 @@ AsyncRequestsSender::Response AsyncRequestsSender::next() {
continue;
}
} else {
- _makeProgress(nullptr);
+ _opCtx->runWithoutInterruption([&] { _makeProgress(); });
}
}
return *readyResponse;
@@ -139,11 +139,6 @@ boost::optional<AsyncRequestsSender::Response> AsyncRequestsSender::_ready() {
_scheduleRequests();
}
- // If we have baton requests, we want to process those before proceeding
- if (_batonRequests) {
- return boost::none;
- }
-
// Check if any remote is ready.
invariant(!_remotes.empty());
for (auto& remote : _remotes) {
@@ -215,11 +210,6 @@ void AsyncRequestsSender::_scheduleRequests() {
if (!scheduleStatus.isOK()) {
remote.swResponse = std::move(scheduleStatus);
- if (_baton) {
- _batonRequests++;
- _baton->schedule([this] { _batonRequests--; });
- }
-
// Push a noop response to the queue to indicate that a remote is ready for
// re-processing due to failure.
_responseQueue.push(boost::none);
@@ -245,11 +235,6 @@ Status AsyncRequestsSender::_scheduleRequest(size_t remoteIndex) {
auto callbackStatus = _executor->scheduleRemoteCommand(
request,
[remoteIndex, this](const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) {
- if (_baton) {
- _batonRequests++;
- _baton->schedule([this] { _batonRequests--; });
- }
-
_responseQueue.push(Job{cbData, remoteIndex});
},
_baton);
@@ -262,22 +247,8 @@ Status AsyncRequestsSender::_scheduleRequest(size_t remoteIndex) {
}
// Passing opCtx means you'd like to opt into opCtx interruption. During cleanup we actually don't.
-void AsyncRequestsSender::_makeProgress(OperationContext* opCtx) {
- invariant(!opCtx || opCtx == _opCtx);
-
- boost::optional<Job> job;
-
- if (_baton) {
- // If we're using a baton, we peek the queue, and block on the baton if it's empty
- if (boost::optional<boost::optional<Job>> tryJob = _responseQueue.tryPop()) {
- job = std::move(*tryJob);
- } else {
- _baton->run(opCtx, boost::none);
- }
- } else {
- // Otherwise we block on the queue
- job = opCtx ? _responseQueue.pop(opCtx) : _responseQueue.pop();
- }
+void AsyncRequestsSender::_makeProgress() {
+ auto job = _responseQueue.pop(_opCtx);
if (!job) {
return;
@@ -350,22 +321,22 @@ Status AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPort(
// progress on previous requests.
auto pf = makePromiseFuture<HostAndPort>();
- ars->_batonRequests++;
stdx::thread bgChecker([&] {
pf.promise.setWith(
[&] { return targeter->findHostWithMaxWait(readPref, deadline - clock->now()); });
-
- ars->_baton->schedule([ars] { ars->_batonRequests--; });
});
- const auto guard = MakeGuard([&] { bgChecker.join(); });
-
- while (!pf.future.isReady()) {
- if (!ars->_baton->run(nullptr, deadline)) {
- break;
- }
- }
-
- return pf.future.getNoThrow();
+ const auto threadGuard = MakeGuard([&] { bgChecker.join(); });
+
+ // We ignore interrupts here because we want to spin the baton for the full duration of the
+ // findHostWithMaxWait. We set the time limit (although the bg checker should fulfill our
+ // promise) to sync up with the findHostWithMaxWait.
+ //
+ // TODO clean this up after SERVER-35689 when we can do async targeting.
+ return ars->_opCtx->runWithoutInterruption([&] {
+ return ars->_opCtx->runWithDeadline(deadline, ErrorCodes::ExceededTimeLimit, [&] {
+ return pf.future.getNoThrow(ars->_opCtx);
+ });
+ });
}();
if (!findHostStatus.isOK()) {