/**
* Copyright (C) 2017 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
#include "mongo/platform/basic.h"
#include "mongo/s/async_requests_sender.h"
#include "mongo/client/remote_command_targeter.h"
#include "mongo/db/server_parameters.h"
#include "mongo/executor/remote_command_request.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/stdx/memory.h"
#include "mongo/transport/baton.h"
#include "mongo/transport/transport_layer.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
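// Server parameter that gates whether the AsyncRequestsSender attaches a transport-layer baton to
// the operation (see BatonDetacher below). When disabled, responses are delivered solely through
// the task executor's callbacks and the blocking response queue.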
MONGO_EXPORT_SERVER_PARAMETER(AsyncRequestsSenderUseBaton, bool, true);
namespace {
// Maximum number of retries for network and replication notMaster errors (per host).
const int kMaxNumFailedHostRetryAttempts = 3;
} // namespace
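// Typical usage, sketched with placeholder names ('executor', 'requests' and 'readPref' stand in
// for values a real call site in the sharding layer would supply):
//
//     AsyncRequestsSender ars(opCtx, executor, "admin", requests, readPref,
//                             Shard::RetryPolicy::kIdempotent);
//     while (!ars.done()) {
//         auto response = ars.next();  // blocks until some remote succeeds or fails for good
//         uassertStatusOK(response.swResponse.getStatus());
//     }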
AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx,
executor::TaskExecutor* executor,
StringData dbName,
const std::vector<AsyncRequestsSender::Request>& requests,
const ReadPreferenceSetting& readPreference,
Shard::RetryPolicy retryPolicy)
: _opCtx(opCtx),
_executor(executor),
_baton(opCtx),
_db(dbName.toString()),
_readPreference(readPreference),
_retryPolicy(retryPolicy) {
for (const auto& request : requests) {
auto cmdObj = request.cmdObj;
_remotes.emplace_back(request.shardId, cmdObj);
}
// Initialize command metadata to handle the read preference.
_metadataObj = readPreference.toContainingBSON();
// Schedule the requests immediately.
_scheduleRequests();
}
AsyncRequestsSender::~AsyncRequestsSender() {
_cancelPendingRequests();
// Wait on remaining callbacks to run.
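// The remote-command callbacks capture 'this', so every outstanding callback must have run
// before this object's members are destroyed.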
while (!done()) {
next();
}
}
AsyncRequestsSender::Response AsyncRequestsSender::next() {
invariant(!done());
// If needed, schedule requests for all remotes which had retriable errors.
// If some remote had success or a non-retriable error, return it.
boost::optional<Response> readyResponse;
while (!(readyResponse = _ready())) {
// Otherwise, wait for some response to be received.
if (_interruptStatus.isOK()) {
try {
_makeProgress(_opCtx);
} catch (const AssertionException& ex) {
// If the operation is interrupted, we cancel outstanding requests and switch to
// waiting for the (canceled) callbacks to finish without checking for interrupts.
_interruptStatus = ex.toStatus();
_cancelPendingRequests();
continue;
}
} else {
_makeProgress(nullptr);
}
}
return *readyResponse;
}
void AsyncRequestsSender::stopRetrying() {
_stopRetrying = true;
}
bool AsyncRequestsSender::done() {
return std::all_of(
_remotes.begin(), _remotes.end(), [](const RemoteData& remote) { return remote.done; });
}
void AsyncRequestsSender::_cancelPendingRequests() {
_stopRetrying = true;
// Cancel all outstanding requests so they return immediately.
for (auto& remote : _remotes) {
if (remote.cbHandle.isValid()) {
_executor->cancel(remote.cbHandle);
}
}
}
boost::optional<AsyncRequestsSender::Response> AsyncRequestsSender::_ready() {
if (!_stopRetrying) {
_scheduleRequests();
}
// If we have baton requests, we want to process those before proceeding
if (_batonRequests) {
return boost::none;
}
// Check if any remote is ready.
invariant(!_remotes.empty());
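// Return the first remote that has a response (or error) stored and has not yet been reported
// to the caller.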
for (auto& remote : _remotes) {
if (remote.swResponse && !remote.done) {
remote.done = true;
if (remote.swResponse->isOK()) {
invariant(remote.shardHostAndPort);
return Response(std::move(remote.shardId),
std::move(remote.swResponse->getValue()),
std::move(*remote.shardHostAndPort));
} else {
// If _interruptStatus is set, promote CallbackCanceled errors to it.
if (!_interruptStatus.isOK() &&
ErrorCodes::CallbackCanceled == remote.swResponse->getStatus().code()) {
remote.swResponse = _interruptStatus;
}
return Response(std::move(remote.shardId),
std::move(remote.swResponse->getStatus()),
std::move(remote.shardHostAndPort));
}
}
}
// No remotes were ready.
return boost::none;
}
void AsyncRequestsSender::_scheduleRequests() {
invariant(!_stopRetrying);
// Schedule remote work on hosts for which we have not sent a request or need to retry.
for (size_t i = 0; i < _remotes.size(); ++i) {
auto& remote = _remotes[i];
// First check if the remote had a retriable error, and if so, clear its response field so
// it will be retried.
if (remote.swResponse && !remote.done) {
// We check both the response status and command status for a retriable error.
Status status = remote.swResponse->getStatus();
if (status.isOK()) {
status = getStatusFromCommandResult(remote.swResponse->getValue().data);
}
if (!status.isOK()) {
// There was an error with either the response or the command.
auto shard = remote.getShard();
if (!shard) {
remote.swResponse =
Status(ErrorCodes::ShardNotFound,
str::stream() << "Could not find shard " << remote.shardId);
} else {
if (remote.shardHostAndPort) {
shard->updateReplSetMonitor(*remote.shardHostAndPort, status);
}
if (shard->isRetriableError(status.code(), _retryPolicy) &&
remote.retryCount < kMaxNumFailedHostRetryAttempts) {
LOG(1) << "Command to remote " << remote.shardId << " at host "
<< *remote.shardHostAndPort
<< " failed with retriable error and will be retried "
<< causedBy(redact(status));
++remote.retryCount;
remote.swResponse.reset();
}
}
}
}
// If the remote does not have a response or pending request, schedule remote work for it.
if (!remote.swResponse && !remote.cbHandle.isValid()) {
auto scheduleStatus = _scheduleRequest(i);
if (!scheduleStatus.isOK()) {
remote.swResponse = std::move(scheduleStatus);
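// Bump _batonRequests and schedule the matching decrement on the baton, so that _ready()
// keeps deferring to _makeProgress() (which runs the baton) until the baton has run.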
if (_baton) {
_batonRequests++;
_baton->schedule([this] { _batonRequests--; });
}
// Push a noop response to the queue to indicate that a remote is ready for
// re-processing due to failure.
_responseQueue.push(boost::none);
}
}
}
}
Status AsyncRequestsSender::_scheduleRequest(size_t remoteIndex) {
auto& remote = _remotes[remoteIndex];
invariant(!remote.cbHandle.isValid());
invariant(!remote.swResponse);
Status resolveStatus = remote.resolveShardIdToHostAndPort(this, _readPreference);
if (!resolveStatus.isOK()) {
return resolveStatus;
}
executor::RemoteCommandRequest request(
*remote.shardHostAndPort, _db, remote.cmdObj, _metadataObj, _opCtx);
auto callbackStatus = _executor->scheduleRemoteCommand(
request,
[remoteIndex, this](const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) {
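// This callback only enqueues the result; the response itself is processed on the
// caller's thread in _makeProgress().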
if (_baton) {
_batonRequests++;
_baton->schedule([this] { _batonRequests--; });
}
_responseQueue.push(Job{cbData, remoteIndex});
},
_baton);
if (!callbackStatus.isOK()) {
return callbackStatus.getStatus();
}
remote.cbHandle = callbackStatus.getValue();
return Status::OK();
}
// Passing a non-null opCtx opts into opCtx interruption. During cleanup after an interruption we
// deliberately pass nullptr, so that waiting for the canceled callbacks cannot itself be
// interrupted.
void AsyncRequestsSender::_makeProgress(OperationContext* opCtx) {
invariant(!opCtx || opCtx == _opCtx);
boost::optional<Job> job;
if (_baton) {
// If we're using a baton, we peek the queue, and block on the baton if it's empty
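// (Blocking on the queue here would be unsafe: with a baton attached, the networking that
// eventually fills the queue is driven by running the baton below.)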
if (boost::optional<boost::optional<Job>> tryJob = _responseQueue.tryPop()) {
job = std::move(*tryJob);
} else {
_baton->run(opCtx, boost::none);
}
} else {
// Otherwise we block on the queue
job = opCtx ? _responseQueue.pop(opCtx) : _responseQueue.pop();
}
if (!job) {
return;
}
auto& remote = _remotes[job->remoteIndex];
invariant(!remote.swResponse);
// Clear the callback handle. This indicates that we are no longer waiting on a response from
// 'remote'.
remote.cbHandle = executor::TaskExecutor::CallbackHandle();
// Store the response or error.
if (job->cbData.response.status.isOK()) {
remote.swResponse = std::move(job->cbData.response);
} else {
// TODO: call participant.markAsCommandSent on "transaction already started" errors?
remote.swResponse = std::move(job->cbData.response.status);
}
}
AsyncRequestsSender::Request::Request(ShardId shardId, BSONObj cmdObj)
: shardId(shardId), cmdObj(cmdObj) {}
AsyncRequestsSender::Response::Response(ShardId shardId,
executor::RemoteCommandResponse response,
HostAndPort hp)
: shardId(std::move(shardId)),
swResponse(std::move(response)),
shardHostAndPort(std::move(hp)) {}
AsyncRequestsSender::Response::Response(ShardId shardId,
Status status,
boost::optional<HostAndPort> hp)
: shardId(std::move(shardId)), swResponse(std::move(status)), shardHostAndPort(std::move(hp)) {}
AsyncRequestsSender::RemoteData::RemoteData(ShardId shardId, BSONObj cmdObj)
: shardId(std::move(shardId)), cmdObj(std::move(cmdObj)) {}
Status AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPort(
AsyncRequestsSender* ars, const ReadPreferenceSetting& readPref) {
const auto shard = getShard();
if (!shard) {
return Status(ErrorCodes::ShardNotFound,
str::stream() << "Could not find shard " << shardId);
}
auto clock = ars->_opCtx->getServiceContext()->getFastClockSource();
auto deadline = clock->now() + Seconds(20);
auto targeter = shard->getTargeter();
auto findHostStatus = [&] {
// If we don't have a baton, just go ahead and block in targeting
if (!ars->_baton) {
return targeter->findHostWithMaxWait(readPref, Seconds{20});
}
// If we do have a baton, and we can target quickly, just do that
{
auto findHostStatus = targeter->findHostNoWait(readPref);
if (findHostStatus.isOK()) {
return findHostStatus;
}
}
// If it's going to take a while to target, we spin up a background thread to do our
// targeting, while running the baton on the calling thread. This allows us to make forward
// progress on previous requests.
auto pf = makePromiseFuture<HostAndPort>();
ars->_batonRequests++;
stdx::thread bgChecker([&] {
pf.promise.setWith(
[&] { return targeter->findHostWithMaxWait(readPref, deadline - clock->now()); });
ars->_baton->schedule([ars] { ars->_batonRequests--; });
});
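// Always join the background targeting thread, even if the baton loop below exits early.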
const auto guard = MakeGuard([&] { bgChecker.join(); });
while (!pf.future.isReady()) {
if (!ars->_baton->run(nullptr, deadline)) {
break;
}
}
return pf.future.getNoThrow();
}();
if (!findHostStatus.isOK()) {
return findHostStatus.getStatus();
}
shardHostAndPort = std::move(findHostStatus.getValue());
return Status::OK();
}
std::shared_ptr<Shard> AsyncRequestsSender::RemoteData::getShard() {
// TODO: Pass down an OperationContext* to use here.
return Grid::get(getGlobalServiceContext())->shardRegistry()->getShardNoReload(shardId);
}
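// BatonDetacher owns the operation's baton, if one is in use: it acquires a baton from the
// transport layer when the AsyncRequestsSenderUseBaton server parameter is set, and detaches it
// again on destruction.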
AsyncRequestsSender::BatonDetacher::BatonDetacher(OperationContext* opCtx)
: _baton(AsyncRequestsSenderUseBaton.load()
? (opCtx->getServiceContext()->getTransportLayer()
? opCtx->getServiceContext()->getTransportLayer()->makeBaton(opCtx)
: nullptr)
: nullptr) {}
AsyncRequestsSender::BatonDetacher::~BatonDetacher() {
if (_baton) {
_baton->detach();
}
}
} // namespace mongo