/**
* Copyright (C) 2017 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
#include "mongo/platform/basic.h"
#include "mongo/s/async_requests_sender.h"
#include "mongo/client/remote_command_targeter.h"
#include "mongo/executor/remote_command_request.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
namespace {

// How many times a single remote may be re-sent after failing with a retriable error (for example
// network errors or replication notMaster errors). The counter lives on each remote, so once a
// remote has used up its retries its error is reported to the caller instead of being retried.
constexpr int kMaxNumFailedHostRetryAttempts = 3;

}  // namespace
/**
 * Creates one RemoteData entry per request and immediately schedules every request on
 * 'executor'. The read preference is both used for host targeting and attached to each
 * command as metadata.
 */
AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx,
                                         executor::TaskExecutor* executor,
                                         StringData dbName,
                                         const std::vector<AsyncRequestsSender::Request>& requests,
                                         const ReadPreferenceSetting& readPreference,
                                         Shard::RetryPolicy retryPolicy)
    : _opCtx(opCtx),
      _executor(executor),
      _db(dbName.toString()),
      _readPreference(readPreference),
      _retryPolicy(retryPolicy) {
    for (const auto& req : requests) {
        _remotes.emplace_back(req.shardId, req.cmdObj);
    }

    // Every outgoing command carries the read preference as metadata.
    _metadataObj = readPreference.toContainingBSON();

    // The notification has to exist before anything is scheduled: both a scheduling failure and
    // a response callback may signal it.
    _notification.emplace();

    // Hold the mutex for the whole scheduling pass so that no callback can signal the
    // notification before we are finished (signaling it twice is illegal).
    stdx::lock_guard<stdx::mutex> schedulingLock(_mutex);
    _scheduleRequests(schedulingLock);
}
/**
 * Cancels any outstanding requests. It then keeps consuming responses until every remote is
 * done, so that the callbacks this object scheduled have delivered their results before it goes
 * away.
 */
AsyncRequestsSender::~AsyncRequestsSender() {
    _cancelPendingRequests();

    for (;;) {
        if (done()) {
            break;
        }
        next();
    }
}
/**
 * Returns the next remote's response: a success or a non-retriable error. If none is available
 * yet, blocks until one arrives. Must not be called once done() is true.
 *
 * If the operation is interrupted while waiting, the interruption status is recorded, the
 * outstanding requests are canceled, and subsequent waits no longer check for interruption.
 */
AsyncRequestsSender::Response AsyncRequestsSender::next() {
    invariant(!done());

    for (;;) {
        // _ready() re-arms the notification, reschedules retriable remotes (unless retrying was
        // stopped) and hands back any remote that already has an unreturned result.
        boost::optional<Response> readyResponse = _ready();
        if (readyResponse) {
            return *readyResponse;
        }

        if (!_interruptStatus.isOK()) {
            // Already interrupted: the pending requests were canceled, so just wait for their
            // callbacks to finish without checking for interrupts again.
            _notification->get();
            continue;
        }

        try {
            // Wait for some response, honoring interruption of the operation.
            _notification->get(_opCtx);
        } catch (const AssertionException& ex) {
            // Remember the interruption, cancel everything outstanding, and from now on wait
            // uninterruptibly for the (canceled) callbacks.
            _interruptStatus = ex.toStatus();
            _cancelPendingRequests();
        }
    }
}
void AsyncRequestsSender::stopRetrying() {
stdx::lock_guard lk(_mutex);
_stopRetrying = true;
}
bool AsyncRequestsSender::done() {
stdx::lock_guard lk(_mutex);
return std::all_of(
_remotes.begin(), _remotes.end(), [](const RemoteData& remote) { return remote.done; });
}
void AsyncRequestsSender::_cancelPendingRequests() {
stdx::lock_guard lk(_mutex);
_stopRetrying = true;
// Cancel all outstanding requests so they return immediately.
for (auto& remote : _remotes) {
if (remote.cbHandle.isValid()) {
_executor->cancel(remote.cbHandle);
}
}
}
/**
 * Re-arms the notification and, unless retrying was stopped, (re)schedules requests. It then
 * returns the first remote that has a result not yet handed out, marking that remote done.
 * Returns boost::none when no remote is ready.
 */
boost::optional<AsyncRequestsSender::Response> AsyncRequestsSender::_ready() {
    stdx::lock_guard<stdx::mutex> lock(_mutex);

    _notification.emplace();

    if (!_stopRetrying) {
        _scheduleRequests(lock);
    }

    invariant(!_remotes.empty());

    for (auto& remote : _remotes) {
        const bool hasUnreturnedResult = remote.swResponse && !remote.done;
        if (!hasUnreturnedResult) {
            continue;
        }

        remote.done = true;

        if (remote.swResponse->isOK()) {
            invariant(remote.shardHostAndPort);
            return Response(std::move(remote.shardId),
                            std::move(remote.swResponse->getValue()),
                            std::move(*remote.shardHostAndPort));
        }

        // When next() was interrupted it canceled the pending requests. Report the interruption
        // status rather than the resulting CallbackCanceled.
        const bool canceledByInterrupt = !_interruptStatus.isOK() &&
            remote.swResponse->getStatus().code() == ErrorCodes::CallbackCanceled;
        if (canceledByInterrupt) {
            remote.swResponse = _interruptStatus;
        }
        return Response(std::move(remote.shardId),
                        std::move(remote.swResponse->getStatus()),
                        std::move(remote.shardHostAndPort));
    }

    return boost::none;
}
/**
 * Must be called with _mutex held and while retrying has not been stopped. For each remote:
 *  - If it has a not-yet-returned error, either in the transport status or in the command
 *    result, and that error is retriable with retries left, its response is cleared so it will
 *    be re-sent.
 *  - If it has neither a response nor a request in flight, a request is scheduled. A scheduling
 *    failure is stored as the remote's response and the notification is signaled.
 */
void AsyncRequestsSender::_scheduleRequests(WithLock lk) {
    invariant(!_stopRetrying);

    // Schedule remote work on hosts for which we have not sent a request or need to retry.
    for (size_t i = 0; i < _remotes.size(); ++i) {
        auto& remote = _remotes[i];

        // First check if the remote had a retriable error, and if so, clear its response field so
        // it will be retried.
        if (remote.swResponse && !remote.done) {
            // We check both the response status and command status for a retriable error.
            Status status = remote.swResponse->getStatus();
            if (status.isOK()) {
                status = getStatusFromCommandResult(remote.swResponse->getValue().data);
            }

            if (!status.isOK()) {
                // There was an error with either the response or the command.
                auto shard = remote.getShard();
                if (!shard) {
                    remote.swResponse =
                        Status(ErrorCodes::ShardNotFound,
                               str::stream() << "Could not find shard " << remote.shardId);
                } else {
                    // shardHostAndPort may be unset. For example, host resolution can fail on the
                    // very first attempt, before a host was ever chosen. Check it before every use.
                    if (remote.shardHostAndPort) {
                        shard->updateReplSetMonitor(*remote.shardHostAndPort, status);
                    }
                    if (shard->isRetriableError(status.code(), _retryPolicy) &&
                        remote.retryCount < kMaxNumFailedHostRetryAttempts) {
                        // Only mention the host when one is known; dereferencing an unset
                        // optional here would be undefined behavior.
                        LOG(1) << "Command to remote " << remote.shardId
                               << (remote.shardHostAndPort
                                       ? " at host " + remote.shardHostAndPort->toString()
                                       : std::string())
                               << " failed with retriable error and will be retried "
                               << causedBy(redact(status));
                        ++remote.retryCount;
                        remote.swResponse.reset();
                    }
                }
            }
        }

        // If the remote does not have a response or pending request, schedule remote work for it.
        if (!remote.swResponse && !remote.cbHandle.isValid()) {
            auto scheduleStatus = _scheduleRequest(lk, i);
            if (!scheduleStatus.isOK()) {
                remote.swResponse = std::move(scheduleStatus);
                // Signal the notification indicating the remote had an error (we need to do this
                // because no request was scheduled, so no callback for this remote will run and
                // signal the notification).
                if (!*_notification) {
                    _notification->set();
                }
            }
        }
    }
}
/**
 * Resolves a host for the remote at 'remoteIndex' and schedules its command on the executor.
 * On success, the remote's callback handle is recorded. Otherwise the targeting or scheduling
 * error is returned. Requires _mutex to be held, and the remote must have neither a response nor
 * a request in flight.
 */
Status AsyncRequestsSender::_scheduleRequest(WithLock, size_t remoteIndex) {
    auto& remote = _remotes[remoteIndex];

    invariant(!remote.cbHandle.isValid());
    invariant(!remote.swResponse);

    auto targetStatus = remote.resolveShardIdToHostAndPort(_readPreference);
    if (!targetStatus.isOK()) {
        return targetStatus;
    }

    executor::RemoteCommandRequest request(
        *remote.shardHostAndPort, _db, remote.cmdObj, _metadataObj, _opCtx);

    auto swHandle = _executor->scheduleRemoteCommand(
        request,
        [this, remoteIndex](const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) {
            _handleResponse(cbData, remoteIndex);
        });
    if (!swHandle.isOK()) {
        return swHandle.getStatus();
    }

    remote.cbHandle = swHandle.getValue();
    return Status::OK();
}
/**
 * Executor callback for the request sent to the remote at 'remoteIndex'. It records the response,
 * or the transport error, clears the remote's callback handle, and signals the notification if it
 * has not been signaled yet.
 */
void AsyncRequestsSender::_handleResponse(
    const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, size_t remoteIndex) {
    stdx::lock_guard<stdx::mutex> lk(_mutex);

    auto& remote = _remotes[remoteIndex];
    invariant(!remote.swResponse);

    // Clear the callback handle. This indicates that we are no longer waiting on a response from
    // 'remote'.
    remote.cbHandle = executor::TaskExecutor::CallbackHandle();

    // Store the response or error. 'cbData' is a const reference, so these are copies;
    // std::move on a const object would only have disguised that.
    if (cbData.response.status.isOK()) {
        remote.swResponse = cbData.response;
    } else {
        remote.swResponse = cbData.response.status;
    }

    // Signal the notification indicating that a remote received a response.
    if (!*_notification) {
        _notification->set();
    }
}
/**
 * A command 'cmdObj' to be sent to shard 'shardId'. Both arguments are taken by value and moved
 * into place, so callers passing temporaries avoid a second copy.
 */
AsyncRequestsSender::Request::Request(ShardId shardId, BSONObj cmdObj)
    : shardId(std::move(shardId)), cmdObj(std::move(cmdObj)) {}
AsyncRequestsSender::Response::Response(ShardId shardId,
executor::RemoteCommandResponse response,
HostAndPort hp)
: shardId(std::move(shardId)),
swResponse(std::move(response)),
shardHostAndPort(std::move(hp)) {}
AsyncRequestsSender::Response::Response(ShardId shardId,
Status status,
boost::optional hp)
: shardId(std::move(shardId)), swResponse(std::move(status)), shardHostAndPort(std::move(hp)) {}
AsyncRequestsSender::RemoteData::RemoteData(ShardId shardId, BSONObj cmdObj)
: shardId(std::move(shardId)), cmdObj(std::move(cmdObj)) {}
/**
 * Looks up this remote's shard and asks its targeter for a host matching 'readPref'. On success
 * the chosen host is stored in shardHostAndPort. Otherwise ShardNotFound or the targeter's error
 * is returned, and shardHostAndPort is left untouched.
 */
Status AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPort(
    const ReadPreferenceSetting& readPref) {
    // Maximum wait passed to findHostWithMaxWait().
    const Seconds kFindHostMaxWait{20};

    const auto shard = getShard();
    if (!shard) {
        return Status(ErrorCodes::ShardNotFound,
                      str::stream() << "Could not find shard " << shardId);
    }

    auto swHost = shard->getTargeter()->findHostWithMaxWait(readPref, kFindHostMaxWait);
    if (!swHost.isOK()) {
        return swHost.getStatus();
    }

    shardHostAndPort = std::move(swHost.getValue());
    return Status::OK();
}
std::shared_ptr AsyncRequestsSender::RemoteData::getShard() {
// TODO: Pass down an OperationContext* to use here.
return grid.shardRegistry()->getShardNoReload(shardId);
}
} // namespace mongo