diff options
author | Esha Maharishi <esha.maharishi@mongodb.com> | 2017-03-06 18:17:34 -0500 |
---|---|---|
committer | Esha Maharishi <esha.maharishi@mongodb.com> | 2017-03-13 15:09:46 -0400 |
commit | 965dc76f4b4e27f7a9e3bc7810b608c53085d32f (patch) | |
tree | e5554cdfb59e4df76adc55d4851b2aaa54dabc12 /src/mongo/s/async_requests_sender.cpp | |
parent | 90bd4ed6ba5d0f3353d1af42c667cd6a2c1a540e (diff) | |
download | mongo-965dc76f4b4e27f7a9e3bc7810b608c53085d32f.tar.gz |
SERVER-28164 make ClusterWrite::run path use ARS instead of DBClientMultiCommand
Diffstat (limited to 'src/mongo/s/async_requests_sender.cpp')
-rw-r--r-- | src/mongo/s/async_requests_sender.cpp | 283 |
1 files changed, 121 insertions, 162 deletions
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp index 1ec65430f75..aa11a3c1216 100644 --- a/src/mongo/s/async_requests_sender.cpp +++ b/src/mongo/s/async_requests_sender.cpp @@ -55,12 +55,8 @@ AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx, executor::TaskExecutor* executor, StringData db, const std::vector<AsyncRequestsSender::Request>& requests, - const ReadPreferenceSetting& readPreference, - bool allowPartialResults) - : _executor(executor), - _db(std::move(db)), - _readPreference(readPreference), - _allowPartialResults(allowPartialResults) { + const ReadPreferenceSetting& readPreference) + : _opCtx(opCtx), _executor(executor), _db(std::move(db)), _readPreference(readPreference) { for (const auto& request : requests) { _remotes.emplace_back(request.shardId, request.cmdObj); } @@ -73,44 +69,35 @@ AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx, _metadataObj = metadataBuilder.obj(); // Schedule the requests immediately. - _scheduleRequestsIfNeeded(opCtx); + // We must create the notification before scheduling any requests, because the notification is + // signaled both on an error in scheduling the request and a request's callback. Similarly, we + // lock so that no callbacks signal the notification until after we are done scheduling + // requests, to prevent signaling the notification twice. + _notification.emplace(); + stdx::lock_guard<stdx::mutex> lk(_mutex); + _scheduleRequests_inlock(); } - AsyncRequestsSender::~AsyncRequestsSender() { - invariant(_done()); -} - -std::vector<AsyncRequestsSender::Response> AsyncRequestsSender::waitForResponses( - OperationContext* opCtx) { - invariant(!_remotes.empty()); - - // Until all remotes have received a response or error, keep scheduling retries and waiting on - // outstanding requests. - while (!_done()) { - _notification->get(); + // Make sure any pending network I/O has been canceled. + kill(); - // Note: if we have been interrupt()'d or if some remote had a non-retriable error and - // allowPartialResults is false, no retries will be scheduled. - _scheduleRequestsIfNeeded(opCtx); - } - - // Construct the responses. - std::vector<Response> responses; - for (const auto& remote : _remotes) { - invariant(remote.swResponse); - if (remote.swResponse->isOK()) { - invariant(remote.shardHostAndPort); - responses.emplace_back(std::move(remote.swResponse->getValue()), - std::move(*remote.shardHostAndPort)); - } else { - responses.emplace_back(std::move(remote.swResponse->getStatus()), - std::move(remote.shardHostAndPort)); - } + // Wait on remaining callbacks to run. + while (!done()) { + next(); } +} - _remotes.clear(); +AsyncRequestsSender::Response AsyncRequestsSender::next() { + invariant(!done()); - return responses; + // If needed, schedule requests for all remotes which had retriable errors. + // If some remote had success or a non-retriable error, return it. + boost::optional<Response> readyResponse; + while (!(readyResponse = _ready())) { + // Otherwise, wait for some response to be received. + _notification->get(_opCtx); + } + return *readyResponse; } void AsyncRequestsSender::interrupt() { @@ -120,78 +107,112 @@ void AsyncRequestsSender::interrupt() { void AsyncRequestsSender::kill() { stdx::lock_guard<stdx::mutex> lk(_mutex); - _stopRetrying = true; + if (_killed) { + return; + } + _stopRetrying = true; // Cancel all outstanding requests so they return immediately. for (auto& remote : _remotes) { if (remote.cbHandle.isValid()) { _executor->cancel(remote.cbHandle); } } + _killed = true; } -bool AsyncRequestsSender::_done() { +bool AsyncRequestsSender::done() { stdx::lock_guard<stdx::mutex> lk(_mutex); - return _done_inlock(); + return std::all_of( + _remotes.begin(), _remotes.end(), [](const RemoteData& remote) { return remote.done; }); } -bool AsyncRequestsSender::_done_inlock() { - for (const auto& remote : _remotes) { - if (!remote.swResponse) { - return false; - } - } - return true; -} - -/* - * Note: If _scheduleRequestsIfNeeded() does retries, only the remotes with retriable errors will be - * rescheduled because: - * - * 1. Other pending remotes still have callback assigned to them. - * 2. Remotes that already successfully received a response will have a non-empty 'response'. - * 3. Remotes that have reached maximum retries will have an error status. - */ -void AsyncRequestsSender::_scheduleRequestsIfNeeded(OperationContext* opCtx) { +boost::optional<AsyncRequestsSender::Response> AsyncRequestsSender::_ready() { stdx::lock_guard<stdx::mutex> lk(_mutex); - // We can't make a new notification if there was a previous one that has not been signaled. - invariant(!_notification || *_notification); + _notification.emplace(); - if (_done_inlock()) { - return; + if (!_stopRetrying) { + _scheduleRequests_inlock(); } - _notification.emplace(); - - if (_stopRetrying) { - return; + // Check if any remote is ready. + invariant(!_remotes.empty()); + for (auto& remote : _remotes) { + if (remote.swResponse && !remote.done) { + remote.done = true; + if (remote.swResponse->isOK()) { + invariant(remote.shardHostAndPort); + return Response(std::move(remote.shardId), + std::move(remote.swResponse->getValue()), + std::move(*remote.shardHostAndPort)); + } else { + return Response(std::move(remote.shardId), + std::move(remote.swResponse->getStatus()), + std::move(remote.shardHostAndPort)); + } + } } + // No remotes were ready. + return boost::none; +} +void AsyncRequestsSender::_scheduleRequests_inlock() { + invariant(!_stopRetrying); // Schedule remote work on hosts for which we have not sent a request or need to retry. for (size_t i = 0; i < _remotes.size(); ++i) { auto& remote = _remotes[i]; - // If we have not yet received a response or error for this remote, and we do not have an - // outstanding request for this remote, schedule remote work to send the command. + // First check if the remote had a retriable error, and if so, clear its response field so + // it will be retried. + if (remote.swResponse && !remote.done) { + // We check both the response status and command status for a retriable error. + Status status = remote.swResponse->getStatus(); + if (status.isOK()) { + status = getStatusFromCommandResult(remote.swResponse->getValue().data); + } + + if (!status.isOK()) { + // There was an error with either the response or the command. + auto shard = remote.getShard(); + if (!shard) { + remote.swResponse = + Status(ErrorCodes::ShardNotFound, + str::stream() << "Could not find shard " << remote.shardId); + } else { + if (remote.shardHostAndPort) { + shard->updateReplSetMonitor(*remote.shardHostAndPort, status); + } + if (shard->isRetriableError(status.code(), Shard::RetryPolicy::kIdempotent) && + remote.retryCount < kMaxNumFailedHostRetryAttempts) { + LOG(1) << "Command to remote " << remote.shardId << " at host " + << *remote.shardHostAndPort + << " failed with retriable error and will be retried " + << causedBy(redact(status)); + ++remote.retryCount; + remote.swResponse.reset(); + } + } + } + } + + // If the remote does not have a response or pending request, schedule remote work for it. if (!remote.swResponse && !remote.cbHandle.isValid()) { - auto scheduleStatus = _scheduleRequest_inlock(opCtx, i); + auto scheduleStatus = _scheduleRequest_inlock(i); if (!scheduleStatus.isOK()) { - // Being unable to schedule a request to a remote is a non-retriable error. remote.swResponse = std::move(scheduleStatus); - - // If partial results are not allowed, stop scheduling requests on other remotes and - // just wait for outstanding requests to come back. - if (!_allowPartialResults) { - _stopRetrying = true; - break; + // Signal the notification indicating the remote had an error (we need to do this + // because no request was scheduled, so no callback for this remote will run and + // signal the notification). + if (!*_notification) { + _notification->set(); } } } } } -Status AsyncRequestsSender::_scheduleRequest_inlock(OperationContext* opCtx, size_t remoteIndex) { +Status AsyncRequestsSender::_scheduleRequest_inlock(size_t remoteIndex) { auto& remote = _remotes[remoteIndex]; invariant(!remote.cbHandle.isValid()); @@ -203,15 +224,12 @@ Status AsyncRequestsSender::_scheduleRequest_inlock(OperationContext* opCtx, siz } executor::RemoteCommandRequest request( - remote.getTargetHost(), _db.toString(), remote.cmdObj, _metadataObj, opCtx); - - auto callbackStatus = - _executor->scheduleRemoteCommand(request, - stdx::bind(&AsyncRequestsSender::_handleResponse, - this, - stdx::placeholders::_1, - opCtx, - remoteIndex)); + *remote.shardHostAndPort, _db.toString(), remote.cmdObj, _metadataObj, _opCtx); + + auto callbackStatus = _executor->scheduleRemoteCommand( + request, + stdx::bind( + &AsyncRequestsSender::_handleResponse, this, stdx::placeholders::_1, remoteIndex)); if (!callbackStatus.isOK()) { return callbackStatus.getStatus(); } @@ -221,9 +239,7 @@ Status AsyncRequestsSender::_scheduleRequest_inlock(OperationContext* opCtx, siz } void AsyncRequestsSender::_handleResponse( - const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, - OperationContext* opCtx, - size_t remoteIndex) { + const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, size_t remoteIndex) { stdx::lock_guard<stdx::mutex> lk(_mutex); auto& remote = _remotes[remoteIndex]; @@ -233,94 +249,37 @@ void AsyncRequestsSender::_handleResponse( // 'remote'. remote.cbHandle = executor::TaskExecutor::CallbackHandle(); - // On early return from this point on, signal anyone waiting on the current notification if - // _done() is true, since this might be the last outstanding request. - ScopeGuard signaller = - MakeGuard(&AsyncRequestsSender::_signalCurrentNotificationIfDone_inlock, this); - - // We check both the response status and command status for a retriable error. - Status status = cbData.response.status; - if (status.isOK()) { - status = getStatusFromCommandResult(cbData.response.data); - if (status.isOK()) { - remote.swResponse = std::move(cbData.response); - return; - } - } - - // There was an error with either the response or the command. - - auto shard = remote.getShard(); - if (!shard) { - remote.swResponse = - Status(ErrorCodes::ShardNotFound, - str::stream() << "Could not find shard " << remote.shardId << " containing host " - << remote.getTargetHost().toString()); - return; - } - shard->updateReplSetMonitor(remote.getTargetHost(), status); - - if (shard->isRetriableError(status.code(), Shard::RetryPolicy::kIdempotent) && !_stopRetrying && - remote.retryCount < kMaxNumFailedHostRetryAttempts) { - LOG(1) << "Command to remote " << remote.shardId << " at host " << *remote.shardHostAndPort - << " failed with retriable error and will be retried" << causedBy(redact(status)); - ++remote.retryCount; - - // Even if _done() is not true, signal the thread sleeping in waitForResponses() to make - // it schedule a retry for this remote without waiting for all outstanding requests to - // come back. - signaller.Dismiss(); - _signalCurrentNotification_inlock(); + // Store the response or error. + if (cbData.response.status.isOK()) { + remote.swResponse = std::move(cbData.response); } else { - // Non-retriable error, out of retries, or _stopRetrying is true. - - // Even though we examined the command status to check for retriable errors, we just return - // the response or response status here. It is up to the caller to parse the response as - // a command result. - if (cbData.response.status.isOK()) { - remote.swResponse = std::move(cbData.response); - } else { - remote.swResponse = std::move(cbData.response.status); - } - - // If the caller can't use partial results, there's no point continuing to retry on - // retriable errors for other remotes. - if (!_allowPartialResults) { - _stopRetrying = true; - } + remote.swResponse = std::move(cbData.response.status); } -} -void AsyncRequestsSender::_signalCurrentNotification_inlock() { - // Only signal the notification if it has not already been signalled. + // Signal the notification indicating that the remote received a response. if (!*_notification) { _notification->set(); } } -void AsyncRequestsSender::_signalCurrentNotificationIfDone_inlock() { - if (_done_inlock()) { - _signalCurrentNotification_inlock(); - } -} - AsyncRequestsSender::Request::Request(ShardId shardId, BSONObj cmdObj) : shardId(shardId), cmdObj(cmdObj) {} -AsyncRequestsSender::Response::Response(executor::RemoteCommandResponse response, HostAndPort hp) - : swResponse(std::move(response)), shardHostAndPort(std::move(hp)) {} +AsyncRequestsSender::Response::Response(ShardId shardId, + executor::RemoteCommandResponse response, + HostAndPort hp) + : shardId(std::move(shardId)), + swResponse(std::move(response)), + shardHostAndPort(std::move(hp)) {} -AsyncRequestsSender::Response::Response(Status status, boost::optional<HostAndPort> hp) - : swResponse(std::move(status)), shardHostAndPort(std::move(hp)) {} +AsyncRequestsSender::Response::Response(ShardId shardId, + Status status, + boost::optional<HostAndPort> hp) + : shardId(std::move(shardId)), swResponse(std::move(status)), shardHostAndPort(std::move(hp)) {} AsyncRequestsSender::RemoteData::RemoteData(ShardId shardId, BSONObj cmdObj) : shardId(std::move(shardId)), cmdObj(std::move(cmdObj)) {} -const HostAndPort& AsyncRequestsSender::RemoteData::getTargetHost() const { - invariant(shardHostAndPort); - return *shardHostAndPort; -} - Status AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPort( const ReadPreferenceSetting& readPref) { const auto shard = getShard(); |