summaryrefslogtreecommitdiff
path: root/src/mongo/s/async_requests_sender.cpp
diff options
context:
space:
mode:
author Jason Carey <jcarey@argv.me> 2018-04-19 18:15:20 -0400
committer Jason Carey <jcarey@argv.me> 2018-04-24 12:02:16 -0400
commit ab112a029bca9d575379d42450ea2a7e9254c6de (patch)
tree c56b03e56bd6e99879c822a0a055badc510d6185 /src/mongo/s/async_requests_sender.cpp
parent 5e118de9148bf95e7ca2de6169fcf20a071d59b4 (diff)
download mongo-ab112a029bca9d575379d42450ea2a7e9254c6de.tar.gz
SERVER-34582 Replace object level lock for ARS
ARS holds a lock during scheduling to prevent the notification from being signaled while requests are still being scheduled. As an unfortunate side effect, this also prevents callbacks from resolving during scheduling, which can cause background executors to block while executing a callback. This change replaces the mutex with a producer-consumer queue that carries responses, and moves response handling into calls to next().
Diffstat (limited to 'src/mongo/s/async_requests_sender.cpp')
-rw-r--r-- src/mongo/s/async_requests_sender.cpp 63
1 files changed, 21 insertions, 42 deletions
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp
index 69f61bf989b..5e70f3630ff 100644
--- a/src/mongo/s/async_requests_sender.cpp
+++ b/src/mongo/s/async_requests_sender.cpp
@@ -69,15 +69,7 @@ AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx,
_metadataObj = readPreference.toContainingBSON();
// Schedule the requests immediately.
-
- // We must create the notification before scheduling any requests, because the notification is
- // signaled both on an error in scheduling the request and a request's callback.
- _notification.emplace();
-
- // We lock so that no callbacks signal the notification until after we are done scheduling
- // requests, to prevent signaling the notification twice, which is illegal.
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _scheduleRequests(lk);
+ _scheduleRequests();
}
AsyncRequestsSender::~AsyncRequestsSender() {
_cancelPendingRequests();
@@ -98,7 +90,7 @@ AsyncRequestsSender::Response AsyncRequestsSender::next() {
// Otherwise, wait for some response to be received.
if (_interruptStatus.isOK()) {
try {
- _notification->get(_opCtx);
+ _handleResponse(_responseQueue.pop(_opCtx));
} catch (const AssertionException& ex) {
// If the operation is interrupted, we cancel outstanding requests and switch to
// waiting for the (canceled) callbacks to finish without checking for interrupts.
@@ -107,25 +99,22 @@ AsyncRequestsSender::Response AsyncRequestsSender::next() {
continue;
}
} else {
- _notification->get();
+ _handleResponse(_responseQueue.pop());
}
}
return *readyResponse;
}
void AsyncRequestsSender::stopRetrying() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
_stopRetrying = true;
}
bool AsyncRequestsSender::done() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
return std::all_of(
_remotes.begin(), _remotes.end(), [](const RemoteData& remote) { return remote.done; });
}
void AsyncRequestsSender::_cancelPendingRequests() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
_stopRetrying = true;
// Cancel all outstanding requests so they return immediately.
@@ -137,12 +126,8 @@ void AsyncRequestsSender::_cancelPendingRequests() {
}
boost::optional<AsyncRequestsSender::Response> AsyncRequestsSender::_ready() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
-
- _notification.emplace();
-
if (!_stopRetrying) {
- _scheduleRequests(lk);
+ _scheduleRequests();
}
// Check if any remote is ready.
@@ -171,7 +156,7 @@ boost::optional<AsyncRequestsSender::Response> AsyncRequestsSender::_ready() {
return boost::none;
}
-void AsyncRequestsSender::_scheduleRequests(WithLock lk) {
+void AsyncRequestsSender::_scheduleRequests() {
invariant(!_stopRetrying);
// Schedule remote work on hosts for which we have not sent a request or need to retry.
for (size_t i = 0; i < _remotes.size(); ++i) {
@@ -212,21 +197,18 @@ void AsyncRequestsSender::_scheduleRequests(WithLock lk) {
// If the remote does not have a response or pending request, schedule remote work for it.
if (!remote.swResponse && !remote.cbHandle.isValid()) {
- auto scheduleStatus = _scheduleRequest(lk, i);
+ auto scheduleStatus = _scheduleRequest(i);
if (!scheduleStatus.isOK()) {
remote.swResponse = std::move(scheduleStatus);
- // Signal the notification indicating the remote had an error (we need to do this
- // because no request was scheduled, so no callback for this remote will run and
- // signal the notification).
- if (!*_notification) {
- _notification->set();
- }
+ // Push a noop response to the queue to indicate that a remote is ready for
+ // re-processing due to failure.
+ _responseQueue.push(boost::none);
}
}
}
}
-Status AsyncRequestsSender::_scheduleRequest(WithLock, size_t remoteIndex) {
+Status AsyncRequestsSender::_scheduleRequest(size_t remoteIndex) {
auto& remote = _remotes[remoteIndex];
invariant(!remote.cbHandle.isValid());
@@ -241,8 +223,9 @@ Status AsyncRequestsSender::_scheduleRequest(WithLock, size_t remoteIndex) {
*remote.shardHostAndPort, _db, remote.cmdObj, _metadataObj, _opCtx);
auto callbackStatus = _executor->scheduleRemoteCommand(
- request, [=](const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) {
- _handleResponse(cbData, remoteIndex);
+ request,
+ [remoteIndex, this](const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) {
+ _responseQueue.push(Job{cbData, remoteIndex});
});
if (!callbackStatus.isOK()) {
return callbackStatus.getStatus();
@@ -252,11 +235,12 @@ Status AsyncRequestsSender::_scheduleRequest(WithLock, size_t remoteIndex) {
return Status::OK();
}
-void AsyncRequestsSender::_handleResponse(
- const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, size_t remoteIndex) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+void AsyncRequestsSender::_handleResponse(boost::optional<Job> job) {
+ if (!job) {
+ return;
+ }
- auto& remote = _remotes[remoteIndex];
+ auto& remote = _remotes[job->remoteIndex];
invariant(!remote.swResponse);
// Clear the callback handle. This indicates that we are no longer waiting on a response from
@@ -264,15 +248,10 @@ void AsyncRequestsSender::_handleResponse(
remote.cbHandle = executor::TaskExecutor::CallbackHandle();
// Store the response or error.
- if (cbData.response.status.isOK()) {
- remote.swResponse = std::move(cbData.response);
+ if (job->cbData.response.status.isOK()) {
+ remote.swResponse = std::move(job->cbData.response);
} else {
- remote.swResponse = std::move(cbData.response.status);
- }
-
- // Signal the notification indicating that a remote received a response.
- if (!*_notification) {
- _notification->set();
+ remote.swResponse = std::move(job->cbData.response.status);
}
}