author    Esha Maharishi <esha.maharishi@mongodb.com>  2017-03-06 18:17:34 -0500
committer Esha Maharishi <esha.maharishi@mongodb.com>  2017-03-13 15:09:46 -0400
commit    965dc76f4b4e27f7a9e3bc7810b608c53085d32f (patch)
tree      e5554cdfb59e4df76adc55d4851b2aaa54dabc12 /src/mongo/s/async_requests_sender.cpp
parent    90bd4ed6ba5d0f3353d1af42c667cd6a2c1a540e (diff)
SERVER-28164 make ClusterWrite::run path use ARS instead of DBClientMultiCommand
Diffstat (limited to 'src/mongo/s/async_requests_sender.cpp')
-rw-r--r--  src/mongo/s/async_requests_sender.cpp  283
1 file changed, 121 insertions, 162 deletions
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp
index 1ec65430f75..aa11a3c1216 100644
--- a/src/mongo/s/async_requests_sender.cpp
+++ b/src/mongo/s/async_requests_sender.cpp
@@ -55,12 +55,8 @@ AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx,
executor::TaskExecutor* executor,
StringData db,
const std::vector<AsyncRequestsSender::Request>& requests,
- const ReadPreferenceSetting& readPreference,
- bool allowPartialResults)
- : _executor(executor),
- _db(std::move(db)),
- _readPreference(readPreference),
- _allowPartialResults(allowPartialResults) {
+ const ReadPreferenceSetting& readPreference)
+ : _opCtx(opCtx), _executor(executor), _db(std::move(db)), _readPreference(readPreference) {
for (const auto& request : requests) {
_remotes.emplace_back(request.shardId, request.cmdObj);
}
@@ -73,44 +69,35 @@ AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx,
_metadataObj = metadataBuilder.obj();
// Schedule the requests immediately.
- _scheduleRequestsIfNeeded(opCtx);
+    // We must create the notification before scheduling any requests, because the notification is
+    // signaled both on an error in scheduling a request and in a request's callback. Similarly, we
+ // lock so that no callbacks signal the notification until after we are done scheduling
+ // requests, to prevent signaling the notification twice.
+ _notification.emplace();
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _scheduleRequests_inlock();
}
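
The comment above leans on the semantics of the notification type: it can be waited on, tested, and signaled at most once. A minimal standalone sketch of those semantics, built on std::condition_variable rather than mongo's real Notification (only the set()/get()/operator bool surface mirrors the calls in this diff; the rest is assumed):

    #include <condition_variable>
    #include <mutex>

    // Sketch only: emulates the signal-at-most-once notification this diff relies on.
    class NotificationSketch {
    public:
        void set() {  // signal; callers in the diff guard with `if (!*_notification)`
            std::lock_guard<std::mutex> lk(_m);
            _set = true;
            _cv.notify_all();
        }
        void get() {  // block until set() has been called
            std::unique_lock<std::mutex> lk(_m);
            _cv.wait(lk, [this] { return _set; });
        }
        explicit operator bool() const {  // true once set() has been called
            std::lock_guard<std::mutex> lk(_m);
            return _set;
        }
    private:
        mutable std::mutex _m;
        std::condition_variable _cv;
        bool _set = false;
    };
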
-
AsyncRequestsSender::~AsyncRequestsSender() {
- invariant(_done());
-}
-
-std::vector<AsyncRequestsSender::Response> AsyncRequestsSender::waitForResponses(
- OperationContext* opCtx) {
- invariant(!_remotes.empty());
-
- // Until all remotes have received a response or error, keep scheduling retries and waiting on
- // outstanding requests.
- while (!_done()) {
- _notification->get();
+ // Make sure any pending network I/O has been canceled.
+ kill();
- // Note: if we have been interrupt()'d or if some remote had a non-retriable error and
- // allowPartialResults is false, no retries will be scheduled.
- _scheduleRequestsIfNeeded(opCtx);
- }
-
- // Construct the responses.
- std::vector<Response> responses;
- for (const auto& remote : _remotes) {
- invariant(remote.swResponse);
- if (remote.swResponse->isOK()) {
- invariant(remote.shardHostAndPort);
- responses.emplace_back(std::move(remote.swResponse->getValue()),
- std::move(*remote.shardHostAndPort));
- } else {
- responses.emplace_back(std::move(remote.swResponse->getStatus()),
- std::move(remote.shardHostAndPort));
- }
+    // Wait for remaining callbacks to run.
+ while (!done()) {
+ next();
}
+}
- _remotes.clear();
+AsyncRequestsSender::Response AsyncRequestsSender::next() {
+ invariant(!done());
- return responses;
+    // If needed, schedule requests for all remotes that had retriable errors.
+    // If some remote had a success or a non-retriable error, return it.
+ boost::optional<Response> readyResponse;
+ while (!(readyResponse = _ready())) {
+ // Otherwise, wait for some response to be received.
+ _notification->get(_opCtx);
+ }
+ return *readyResponse;
}
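
With waitForResponses() removed, callers now drain the sender one response at a time. A hedged usage sketch of the new interface (the variables and the ping command are assumptions; done(), next(), and the Response fields come from this diff):

    // Hypothetical caller: consume responses as they become ready instead of
    // collecting the whole batch at the end.
    std::vector<AsyncRequestsSender::Request> requests;
    requests.emplace_back(ShardId("shard0000"), BSON("ping" << 1));  // assumed command

    AsyncRequestsSender ars(opCtx, executor, "admin", requests, readPreference);
    while (!ars.done()) {
        auto response = ars.next();  // blocks until some remote has a response or error
        if (response.swResponse.isOK()) {
            // Success: inspect response.swResponse.getValue().data, sent to
            // *response.shardHostAndPort.
        } else {
            // Per-shard failure: response.swResponse.getStatus().
        }
    }
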
void AsyncRequestsSender::interrupt() {
@@ -120,78 +107,112 @@ void AsyncRequestsSender::interrupt() {
void AsyncRequestsSender::kill() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- _stopRetrying = true;
+ if (_killed) {
+ return;
+ }
+ _stopRetrying = true;
// Cancel all outstanding requests so they return immediately.
for (auto& remote : _remotes) {
if (remote.cbHandle.isValid()) {
_executor->cancel(remote.cbHandle);
}
}
+ _killed = true;
}
-bool AsyncRequestsSender::_done() {
+bool AsyncRequestsSender::done() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _done_inlock();
+ return std::all_of(
+ _remotes.begin(), _remotes.end(), [](const RemoteData& remote) { return remote.done; });
}
-bool AsyncRequestsSender::_done_inlock() {
- for (const auto& remote : _remotes) {
- if (!remote.swResponse) {
- return false;
- }
- }
- return true;
-}
-
-/*
- * Note: If _scheduleRequestsIfNeeded() does retries, only the remotes with retriable errors will be
- * rescheduled because:
- *
- * 1. Other pending remotes still have callback assigned to them.
- * 2. Remotes that already successfully received a response will have a non-empty 'response'.
- * 3. Remotes that have reached maximum retries will have an error status.
- */
-void AsyncRequestsSender::_scheduleRequestsIfNeeded(OperationContext* opCtx) {
+boost::optional<AsyncRequestsSender::Response> AsyncRequestsSender::_ready() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- // We can't make a new notification if there was a previous one that has not been signaled.
- invariant(!_notification || *_notification);
+ _notification.emplace();
- if (_done_inlock()) {
- return;
+ if (!_stopRetrying) {
+ _scheduleRequests_inlock();
}
- _notification.emplace();
-
- if (_stopRetrying) {
- return;
+ // Check if any remote is ready.
+ invariant(!_remotes.empty());
+ for (auto& remote : _remotes) {
+ if (remote.swResponse && !remote.done) {
+ remote.done = true;
+ if (remote.swResponse->isOK()) {
+ invariant(remote.shardHostAndPort);
+ return Response(std::move(remote.shardId),
+ std::move(remote.swResponse->getValue()),
+ std::move(*remote.shardHostAndPort));
+ } else {
+ return Response(std::move(remote.shardId),
+ std::move(remote.swResponse->getStatus()),
+ std::move(remote.shardHostAndPort));
+ }
+ }
}
+ // No remotes were ready.
+ return boost::none;
+}
+void AsyncRequestsSender::_scheduleRequests_inlock() {
+ invariant(!_stopRetrying);
// Schedule remote work on hosts for which we have not sent a request or need to retry.
for (size_t i = 0; i < _remotes.size(); ++i) {
auto& remote = _remotes[i];
- // If we have not yet received a response or error for this remote, and we do not have an
- // outstanding request for this remote, schedule remote work to send the command.
+ // First check if the remote had a retriable error, and if so, clear its response field so
+ // it will be retried.
+ if (remote.swResponse && !remote.done) {
+ // We check both the response status and command status for a retriable error.
+ Status status = remote.swResponse->getStatus();
+ if (status.isOK()) {
+ status = getStatusFromCommandResult(remote.swResponse->getValue().data);
+ }
+
+ if (!status.isOK()) {
+ // There was an error with either the response or the command.
+ auto shard = remote.getShard();
+ if (!shard) {
+ remote.swResponse =
+ Status(ErrorCodes::ShardNotFound,
+ str::stream() << "Could not find shard " << remote.shardId);
+ } else {
+ if (remote.shardHostAndPort) {
+ shard->updateReplSetMonitor(*remote.shardHostAndPort, status);
+ }
+ if (shard->isRetriableError(status.code(), Shard::RetryPolicy::kIdempotent) &&
+ remote.retryCount < kMaxNumFailedHostRetryAttempts) {
+ LOG(1) << "Command to remote " << remote.shardId << " at host "
+ << *remote.shardHostAndPort
+ << " failed with retriable error and will be retried "
+ << causedBy(redact(status));
+ ++remote.retryCount;
+ remote.swResponse.reset();
+ }
+ }
+ }
+ }
+
+ // If the remote does not have a response or pending request, schedule remote work for it.
if (!remote.swResponse && !remote.cbHandle.isValid()) {
- auto scheduleStatus = _scheduleRequest_inlock(opCtx, i);
+ auto scheduleStatus = _scheduleRequest_inlock(i);
if (!scheduleStatus.isOK()) {
- // Being unable to schedule a request to a remote is a non-retriable error.
remote.swResponse = std::move(scheduleStatus);
-
- // If partial results are not allowed, stop scheduling requests on other remotes and
- // just wait for outstanding requests to come back.
- if (!_allowPartialResults) {
- _stopRetrying = true;
- break;
+ // Signal the notification indicating the remote had an error (we need to do this
+ // because no request was scheduled, so no callback for this remote will run and
+ // signal the notification).
+ if (!*_notification) {
+ _notification->set();
}
}
}
}
}
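
The retry handling above folds the decision into three conditions: the shard classifies the error as retriable, the retry policy is idempotent, and the remote has retry budget left. Isolated as a predicate for clarity (the free function is illustrative; isRetriableError, RetryPolicy::kIdempotent, retryCount, and kMaxNumFailedHostRetryAttempts all appear in this diff):

    // Sketch: a remote is rescheduled only when its error is retriable under an
    // idempotent policy and it still has attempts left.
    bool shouldRetrySketch(Shard& shard, const Status& status, int retryCount) {
        return shard.isRetriableError(status.code(), Shard::RetryPolicy::kIdempotent) &&
            retryCount < kMaxNumFailedHostRetryAttempts;
    }
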
-Status AsyncRequestsSender::_scheduleRequest_inlock(OperationContext* opCtx, size_t remoteIndex) {
+Status AsyncRequestsSender::_scheduleRequest_inlock(size_t remoteIndex) {
auto& remote = _remotes[remoteIndex];
invariant(!remote.cbHandle.isValid());
@@ -203,15 +224,12 @@ Status AsyncRequestsSender::_scheduleRequest_inlock(OperationContext* opCtx, size_t remoteIndex) {
}
executor::RemoteCommandRequest request(
- remote.getTargetHost(), _db.toString(), remote.cmdObj, _metadataObj, opCtx);
-
- auto callbackStatus =
- _executor->scheduleRemoteCommand(request,
- stdx::bind(&AsyncRequestsSender::_handleResponse,
- this,
- stdx::placeholders::_1,
- opCtx,
- remoteIndex));
+ *remote.shardHostAndPort, _db.toString(), remote.cmdObj, _metadataObj, _opCtx);
+
+ auto callbackStatus = _executor->scheduleRemoteCommand(
+ request,
+ stdx::bind(
+ &AsyncRequestsSender::_handleResponse, this, stdx::placeholders::_1, remoteIndex));
if (!callbackStatus.isOK()) {
return callbackStatus.getStatus();
}
@@ -221,9 +239,7 @@ Status AsyncRequestsSender::_scheduleRequest_inlock(OperationContext* opCtx, size_t remoteIndex) {
}
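
The stdx::bind registration above can equivalently be written as a lambda capture; a sketch of the same call (the diff itself keeps stdx::bind with placeholders):

    // Equivalent to the stdx::bind call in _scheduleRequest_inlock above.
    auto callbackStatus = _executor->scheduleRemoteCommand(
        request,
        [this, remoteIndex](const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) {
            _handleResponse(cbData, remoteIndex);
        });
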
void AsyncRequestsSender::_handleResponse(
- const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData,
- OperationContext* opCtx,
- size_t remoteIndex) {
+ const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, size_t remoteIndex) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
auto& remote = _remotes[remoteIndex];
@@ -233,94 +249,37 @@ void AsyncRequestsSender::_handleResponse(
// 'remote'.
remote.cbHandle = executor::TaskExecutor::CallbackHandle();
- // On early return from this point on, signal anyone waiting on the current notification if
- // _done() is true, since this might be the last outstanding request.
- ScopeGuard signaller =
- MakeGuard(&AsyncRequestsSender::_signalCurrentNotificationIfDone_inlock, this);
-
- // We check both the response status and command status for a retriable error.
- Status status = cbData.response.status;
- if (status.isOK()) {
- status = getStatusFromCommandResult(cbData.response.data);
- if (status.isOK()) {
- remote.swResponse = std::move(cbData.response);
- return;
- }
- }
-
- // There was an error with either the response or the command.
-
- auto shard = remote.getShard();
- if (!shard) {
- remote.swResponse =
- Status(ErrorCodes::ShardNotFound,
- str::stream() << "Could not find shard " << remote.shardId << " containing host "
- << remote.getTargetHost().toString());
- return;
- }
- shard->updateReplSetMonitor(remote.getTargetHost(), status);
-
- if (shard->isRetriableError(status.code(), Shard::RetryPolicy::kIdempotent) && !_stopRetrying &&
- remote.retryCount < kMaxNumFailedHostRetryAttempts) {
- LOG(1) << "Command to remote " << remote.shardId << " at host " << *remote.shardHostAndPort
- << " failed with retriable error and will be retried" << causedBy(redact(status));
- ++remote.retryCount;
-
- // Even if _done() is not true, signal the thread sleeping in waitForResponses() to make
- // it schedule a retry for this remote without waiting for all outstanding requests to
- // come back.
- signaller.Dismiss();
- _signalCurrentNotification_inlock();
+ // Store the response or error.
+ if (cbData.response.status.isOK()) {
+ remote.swResponse = std::move(cbData.response);
} else {
- // Non-retriable error, out of retries, or _stopRetrying is true.
-
- // Even though we examined the command status to check for retriable errors, we just return
- // the response or response status here. It is up to the caller to parse the response as
- // a command result.
- if (cbData.response.status.isOK()) {
- remote.swResponse = std::move(cbData.response);
- } else {
- remote.swResponse = std::move(cbData.response.status);
- }
-
- // If the caller can't use partial results, there's no point continuing to retry on
- // retriable errors for other remotes.
- if (!_allowPartialResults) {
- _stopRetrying = true;
- }
+ remote.swResponse = std::move(cbData.response.status);
}
-}
-void AsyncRequestsSender::_signalCurrentNotification_inlock() {
- // Only signal the notification if it has not already been signalled.
+ // Signal the notification indicating that the remote received a response.
if (!*_notification) {
_notification->set();
}
}
-void AsyncRequestsSender::_signalCurrentNotificationIfDone_inlock() {
- if (_done_inlock()) {
- _signalCurrentNotification_inlock();
- }
-}
-
AsyncRequestsSender::Request::Request(ShardId shardId, BSONObj cmdObj)
: shardId(shardId), cmdObj(cmdObj) {}
-AsyncRequestsSender::Response::Response(executor::RemoteCommandResponse response, HostAndPort hp)
- : swResponse(std::move(response)), shardHostAndPort(std::move(hp)) {}
+AsyncRequestsSender::Response::Response(ShardId shardId,
+ executor::RemoteCommandResponse response,
+ HostAndPort hp)
+ : shardId(std::move(shardId)),
+ swResponse(std::move(response)),
+ shardHostAndPort(std::move(hp)) {}
-AsyncRequestsSender::Response::Response(Status status, boost::optional<HostAndPort> hp)
- : swResponse(std::move(status)), shardHostAndPort(std::move(hp)) {}
+AsyncRequestsSender::Response::Response(ShardId shardId,
+ Status status,
+ boost::optional<HostAndPort> hp)
+ : shardId(std::move(shardId)), swResponse(std::move(status)), shardHostAndPort(std::move(hp)) {}
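
The two constructors above imply the shape of Response; a sketch of the inferred declaration (the real definition lives in async_requests_sender.h, so field order and exact types are assumptions):

    // Inferred from the constructors above, not copied from the header.
    struct Response {
        ShardId shardId;                                         // shard this response belongs to
        StatusWith<executor::RemoteCommandResponse> swResponse;  // the response, or an error status
        boost::optional<HostAndPort> shardHostAndPort;           // unset if no host was resolved
    };
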
AsyncRequestsSender::RemoteData::RemoteData(ShardId shardId, BSONObj cmdObj)
: shardId(std::move(shardId)), cmdObj(std::move(cmdObj)) {}
-const HostAndPort& AsyncRequestsSender::RemoteData::getTargetHost() const {
- invariant(shardHostAndPort);
- return *shardHostAndPort;
-}
-
Status AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPort(
const ReadPreferenceSetting& readPref) {
const auto shard = getShard();
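
The diff is truncated inside resolveShardIdToHostAndPort(). A hedged sketch of how the body plausibly continues, mirroring the ShardNotFound handling earlier in this diff (getTargeter(), findHostWithMaxWait(), and the timeout are assumptions, not shown in this commit):

    // Plausible continuation; not taken from this commit.
    if (!shard) {
        return Status(ErrorCodes::ShardNotFound,
                      str::stream() << "Could not find shard " << shardId);
    }
    auto findHostStatus = shard->getTargeter()->findHostWithMaxWait(readPref, Seconds{20});
    if (!findHostStatus.isOK()) {
        return findHostStatus.getStatus();
    }
    shardHostAndPort = std::move(findHostStatus.getValue());
    return Status::OK();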