diff options
author | Esha Maharishi <esha.maharishi@mongodb.com> | 2017-03-02 12:02:32 -0500 |
---|---|---|
committer | Esha Maharishi <esha.maharishi@mongodb.com> | 2017-03-02 12:02:49 -0500 |
commit | 8c173ff0776c2c4ab1698a26aee2d087f973a3de (patch) | |
tree | 5a850c19ba9c7743c99cad4f3d2e5c7aba248891 | |
parent | 6753fb211150d85eed71a84393bd13f2a07a8865 (diff) | |
download | mongo-8c173ff0776c2c4ab1698a26aee2d087f973a3de.tar.gz |
SERVER-28173 create a component that sends requests in parallel to multiple hosts over ASIO
-rw-r--r-- | src/mongo/s/SConscript | 14 | ||||
-rw-r--r-- | src/mongo/s/async_requests_sender.cpp | 336 | ||||
-rw-r--r-- | src/mongo/s/async_requests_sender.h | 283 |
3 files changed, 633 insertions, 0 deletions
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 3ee3d34939f..f802a27739d 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -47,6 +47,20 @@ env.Library( ) env.Library( + target="async_requests_sender", + source=[ + "async_requests_sender.cpp", + ], + LIBDEPS=[ + "$BUILD_DIR/mongo/db/query/command_request_response", + "$BUILD_DIR/mongo/executor/task_executor_interface", + "$BUILD_DIR/mongo/s/client/sharding_client", + "$BUILD_DIR/mongo/s/coreshard", + '$BUILD_DIR/mongo/s/client/shard_interface', + ], +) + +env.Library( target='common', source=[ 'catalog/mongo_version_range.cpp', diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp new file mode 100644 index 00000000000..a3163050208 --- /dev/null +++ b/src/mongo/s/async_requests_sender.cpp @@ -0,0 +1,336 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. 
If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery + +#include "mongo/platform/basic.h" + +#include "mongo/s/async_requests_sender.h" + +#include "mongo/client/remote_command_targeter.h" +#include "mongo/executor/remote_command_request.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/rpc/metadata/server_selection_metadata.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/grid.h" +#include "mongo/stdx/memory.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/log.h" +#include "mongo/util/scopeguard.h" + +namespace mongo { +namespace { + +// Maximum number of retries for network and replication notMaster errors (per host). +const int kMaxNumFailedHostRetryAttempts = 3; + +} // namespace + +AsyncRequestsSender::AsyncRequestsSender(OperationContext* txn, + executor::TaskExecutor* executor, + std::string db, + const std::vector<AsyncRequestsSender::Request>& requests, + const ReadPreferenceSetting& readPreference, + bool allowPartialResults) + : _executor(executor), + _db(std::move(db)), + _readPreference(readPreference), + _allowPartialResults(allowPartialResults) { + for (const auto& request : requests) { + _remotes.emplace_back(request.shardId, request.cmdObj); + } + + // Initialize command metadata to handle the read preference. + BSONObjBuilder metadataBuilder; + rpc::ServerSelectionMetadata metadata(_readPreference.pref != ReadPreference::PrimaryOnly, + boost::none); + uassertStatusOK(metadata.writeToMetadata(&metadataBuilder)); + _metadataObj = metadataBuilder.obj(); + + // Schedule the requests immediately. 
+ _scheduleRequestsIfNeeded(txn); +} + +AsyncRequestsSender::~AsyncRequestsSender() { + invariant(_done()); +} + +std::vector<AsyncRequestsSender::Response> AsyncRequestsSender::waitForResponses( + OperationContext* txn) { + // Until all remotes have received a response or error, keep scheduling retries and waiting on + // outstanding requests. + while (!_done()) { + _notification->get(); + + // Note: if we have been interrupt()'d or if some remote had a non-retriable error and + // allowPartialResults is false, no retries will be scheduled. + _scheduleRequestsIfNeeded(txn); + } + + // Construct the responses. + std::vector<Response> responses; + for (const auto& remote : _remotes) { + invariant(remote.swResponse); + if (remote.swResponse->isOK()) { + invariant(remote.shardHostAndPort); + responses.emplace_back(remote.swResponse->getValue(), *remote.shardHostAndPort); + } else { + responses.emplace_back(remote.swResponse->getStatus()); + } + } + return responses; +} + +void AsyncRequestsSender::interrupt() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _stopRetrying = true; +} + +void AsyncRequestsSender::kill() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _stopRetrying = true; + + // Cancel all outstanding requests so they return immediately. + for (auto& remote : _remotes) { + if (remote.cbHandle.isValid()) { + _executor->cancel(remote.cbHandle); + } + } +} + +bool AsyncRequestsSender::_done() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _done_inlock(); +} + +bool AsyncRequestsSender::_done_inlock() { + for (const auto& remote : _remotes) { + if (!remote.swResponse) { + return false; + } + } + return true; +} + +/* + * Note: If _scheduleRequestsIfNeeded() does retries, only the remotes with retriable errors will be + * rescheduled because: + * + * 1. Other pending remotes still have callback assigned to them. + * 2. Remotes that already successfully received a response will have a non-empty 'response'. + * 3. 
Remotes that have reached maximum retries will have an error status. + */ +void AsyncRequestsSender::_scheduleRequestsIfNeeded(OperationContext* txn) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + // We can't make a new notification if there was a previous one that has not been signaled. + invariant(!_notification || *_notification); + + if (_done_inlock()) { + return; + } + + _notification.emplace(); + + if (_stopRetrying) { + return; + } + + // Schedule remote work on hosts for which we have not sent a request or need to retry. + for (size_t i = 0; i < _remotes.size(); ++i) { + auto& remote = _remotes[i]; + + // If we have not yet received a response or error for this remote, and we do not have an + // outstanding request for this remote, schedule remote work to send the command. + if (!remote.swResponse && !remote.cbHandle.isValid()) { + auto scheduleStatus = _scheduleRequest_inlock(txn, i); + if (!scheduleStatus.isOK()) { + // Being unable to schedule a request to a remote is a non-retriable error. + remote.swResponse = std::move(scheduleStatus); + + // If partial results are not allowed, stop scheduling requests on other remotes and + // just wait for outstanding requests to come back. 
+ if (!_allowPartialResults) { + _stopRetrying = true; + break; + } + } + } + } +} + +Status AsyncRequestsSender::_scheduleRequest_inlock(OperationContext* txn, size_t remoteIndex) { + auto& remote = _remotes[remoteIndex]; + + invariant(!remote.cbHandle.isValid()); + invariant(!remote.swResponse); + + Status resolveStatus = remote.resolveShardIdToHostAndPort(_readPreference); + if (!resolveStatus.isOK()) { + return resolveStatus; + } + + executor::RemoteCommandRequest request( + remote.getTargetHost(), _db, remote.cmdObj, _metadataObj, txn); + + auto callbackStatus = _executor->scheduleRemoteCommand( + request, + stdx::bind( + &AsyncRequestsSender::_handleResponse, this, stdx::placeholders::_1, txn, remoteIndex)); + if (!callbackStatus.isOK()) { + return callbackStatus.getStatus(); + } + + remote.cbHandle = callbackStatus.getValue(); + return Status::OK(); +} + +void AsyncRequestsSender::_handleResponse( + const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, + OperationContext* txn, + size_t remoteIndex) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + auto& remote = _remotes[remoteIndex]; + invariant(!remote.swResponse); + + // Clear the callback handle. This indicates that we are no longer waiting on a response from + // 'remote'. + remote.cbHandle = executor::TaskExecutor::CallbackHandle(); + + // On early return from this point on, signal anyone waiting on the current notification if + // _done() is true, since this might be the last outstanding request. + ScopeGuard signaller = + MakeGuard(&AsyncRequestsSender::_signalCurrentNotificationIfDone_inlock, this); + + // We check both the response status and command status for a retriable error. + Status status = cbData.response.status; + if (status.isOK()) { + status = getStatusFromCommandResult(cbData.response.data); + if (status.isOK()) { + remote.swResponse = std::move(cbData.response); + return; + } + } + + // There was an error with either the response or the command. 
+ + auto shard = remote.getShard(); + if (!shard) { + remote.swResponse = + Status(ErrorCodes::ShardNotFound, + str::stream() << "Could not find shard " << remote.shardId << " containing host " + << remote.getTargetHost().toString()); + return; + } + shard->updateReplSetMonitor(remote.getTargetHost(), status); + + if (shard->isRetriableError(status.code(), Shard::RetryPolicy::kIdempotent) && !_stopRetrying && + remote.retryCount < kMaxNumFailedHostRetryAttempts) { + LOG(1) << "Command to remote " << remote.shardId << " at host " << *remote.shardHostAndPort + << " failed with retriable error and will be retried" << causedBy(redact(status)); + ++remote.retryCount; + + // Even if _done() is not true, signal the thread sleeping in waitForResponses() to make + // it schedule a retry for this remote without waiting for all outstanding requests to + // come back. + signaller.Dismiss(); + _signalCurrentNotification_inlock(); + } else { + // Non-retriable error, out of retries, or _stopRetrying is true. + + // Even though we examined the command status to check for retriable errors, we just return + // the response or response status here. It is up to the caller to parse the response as + // a command result. + if (cbData.response.status.isOK()) { + remote.swResponse = std::move(cbData.response); + } else { + remote.swResponse = std::move(cbData.response.status); + } + + // If the caller can't use partial results, there's no point continuing to retry on + // retriable errors for other remotes. + if (!_allowPartialResults) { + _stopRetrying = true; + } + } +} + +void AsyncRequestsSender::_signalCurrentNotification_inlock() { + // Only signal the notification if it has not already been signalled. 
+ if (!*_notification) { + _notification->set(); + } +} + +void AsyncRequestsSender::_signalCurrentNotificationIfDone_inlock() { + if (_done_inlock()) { + _signalCurrentNotification_inlock(); + } +} + +AsyncRequestsSender::Request::Request(ShardId shardId, BSONObj cmdObj) + : shardId(shardId), cmdObj(cmdObj) {} + +AsyncRequestsSender::Response::Response(executor::RemoteCommandResponse response, HostAndPort hp) + : swResponse(response), shardHostAndPort(hp) {} + +AsyncRequestsSender::Response::Response(Status status) : swResponse(status) {} + +AsyncRequestsSender::RemoteData::RemoteData(ShardId shardId, BSONObj cmdObj) + : shardId(std::move(shardId)), cmdObj(std::move(cmdObj)) {} + +const HostAndPort& AsyncRequestsSender::RemoteData::getTargetHost() const { + invariant(shardHostAndPort); + return *shardHostAndPort; +} + +Status AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPort( + const ReadPreferenceSetting& readPref) { + const auto shard = getShard(); + if (!shard) { + return Status(ErrorCodes::ShardNotFound, + str::stream() << "Could not find shard " << shardId); + } + + auto findHostStatus = shard->getTargeter()->findHostWithMaxWait(readPref, Seconds{20}); + if (!findHostStatus.isOK()) { + return findHostStatus.getStatus(); + } + + shardHostAndPort = std::move(findHostStatus.getValue()); + + return Status::OK(); +} + +std::shared_ptr<Shard> AsyncRequestsSender::RemoteData::getShard() { + // TODO: Pass down an OperationContext* to use here. + return grid.shardRegistry()->getShardNoReload(shardId); +} + +} // namespace mongo diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h new file mode 100644 index 00000000000..6bbf1f9d0f3 --- /dev/null +++ b/src/mongo/s/async_requests_sender.h @@ -0,0 +1,283 @@ +/** + * Copyright (C) 2017 MongoDB Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#pragma once + +#include <boost/optional.hpp> +#include <vector> + +#include "mongo/base/disallow_copying.h" +#include "mongo/base/status_with.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/client/read_preference.h" +#include "mongo/executor/remote_command_response.h" +#include "mongo/executor/task_executor.h" +#include "mongo/s/client/shard.h" +#include "mongo/s/shard_id.h" +#include "mongo/stdx/mutex.h" +#include "mongo/util/concurrency/notification.h" +#include "mongo/util/net/hostandport.h" +#include "mongo/util/time_support.h" + +namespace mongo { + +/** + * The AsyncRequestsSender allows for sending requests to a set of remote shards in parallel and + * automatically retrying on retriable errors according to a RetryPolicy. It can also allow for + * retrieving partial results by ignoring shards that return errors. + * + * Work on remote nodes is accomplished by scheduling remote work in a TaskExecutor's event loop. + * + * Typical usage is: + * + * AsyncRequestsSender ars(txn, executor, db, requests, readPrefSetting); // schedule the requests + * auto responses = ars.waitForResponses(txn); // wait for responses; retries on retriable errors + * + * Additionally, you can interrupt() (if you want waitForResponses() to wait for responses for + * outstanding requests but stop scheduling retries) or kill() (if you want to cancel outstanding + * requests) the ARS from another thread. + * + * Does not throw exceptions. + */ +class AsyncRequestsSender { + MONGO_DISALLOW_COPYING(AsyncRequestsSender); + +public: + /** + * Defines a request to a remote shard that can be run by the ARS. + */ + struct Request { + Request(ShardId shardId, BSONObj cmdObj); + + // ShardId of the shard to which the command will be sent. + const ShardId shardId; + + // The command object to send to the remote host. + const BSONObj cmdObj; + }; + + /** + * Defines a response for a request to a remote shard. 
+ */ + struct Response { + // Constructor for a response that was successfully received. + Response(executor::RemoteCommandResponse response, HostAndPort hp); + + // Constructor that specifies the reason the response was not successfully received. + Response(Status status); + + // The response or error from the remote. + StatusWith<executor::RemoteCommandResponse> swResponse; + + // The exact host on which the remote command was run. Is unset if swResponse has a non-OK + // status. + boost::optional<HostAndPort> shardHostAndPort; + }; + + /** + * Constructs a new AsyncRequestsSender. The TaskExecutor* must remain valid for the lifetime of + * the ARS. + */ + AsyncRequestsSender(OperationContext* txn, + executor::TaskExecutor* executor, + std::string db, + const std::vector<AsyncRequestsSender::Request>& requests, + const ReadPreferenceSetting& readPreference, + bool allowPartialResults = false); + + ~AsyncRequestsSender(); + + /** + * Returns a vector containing the responses or errors for each remote in the same order as the + * input vector that was passed in the constructor. + * + * If we were killed, returns immediately. + * If we were interrupted, returns when any outstanding requests have completed. + * Otherwise, returns when each remote has received a response or error. + */ + std::vector<Response> waitForResponses(OperationContext* txn); + + /** + * Stops the ARS from retrying requests. Causes waitForResponses() to wait until any outstanding + * requests have received a response or error. + * + * Use this if you no longer care about getting success responses, but need to do cleanup based + * on responses for requests that have already been dispatched. + */ + void interrupt(); + + /** + * Cancels all outstanding requests and makes waitForResponses() return immediately. + * + * Use this if you no longer care about getting success responses, and don't need to process + * responses for outstanding requests. 
+ */ + void kill(); + +private: + /** + * Returns true if each remote has received a response or error. (If kill() has been called, + * the error is the error assigned by the TaskExecutor when a callback is canceled). + */ + bool _done(); + + /** + * Executes the logic of _done(). + */ + bool _done_inlock(); + + /** + * Replaces _notification with a new notification. + * + * If _stopRetrying is false, for each remote that does not have a response or outstanding + * request, schedules work to send the command to the remote. + * + * Invalid to call if there is an existing Notification and it has not yet been signaled. + */ + void _scheduleRequestsIfNeeded(OperationContext* txn); + + /** + * Helper to schedule a command to a remote. + * + * The 'remoteIndex' gives the position in '_remotes' of the remote node to which the command + * is sent. + * + * Returns success if the command was scheduled successfully. + */ + Status _scheduleRequest_inlock(OperationContext* txn, size_t remoteIndex); + + /** + * The callback for a remote command. + * + * 'remoteIndex' is the position of the relevant remote node in '_remotes', and therefore + * indicates which node the response came from and where the response should be buffered. + * + * On a retriable error, unless _stopRetrying is true, signals the notification so that the + * request can be immediately retried. + * + * On a non-retriable error, if allowPartialResults is false, sets _stopRetrying to true. + */ + void _handleResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, + OperationContext* txn, + size_t remoteIndex); + + /** + * If the existing notification has not yet been signaled, signals it and marks it as signaled. + */ + void _signalCurrentNotification_inlock(); + + /** + * Wrapper around _signalCurrentNotification_inlock(); only signals the notification if _done() + * is true. 
+ */ + void _signalCurrentNotificationIfDone_inlock(); + + /** + * We instantiate one of these per remote host. + */ + struct RemoteData { + /** + * Creates a new uninitialized remote state with a command to send. + */ + RemoteData(ShardId shardId, BSONObj cmdObj); + + /** + * Returns the resolved host and port on which the remote command was or will be run. + */ + const HostAndPort& getTargetHost() const; + + /** + * Given a read preference, selects a host on which the command should be run. + */ + Status resolveShardIdToHostAndPort(const ReadPreferenceSetting& readPref); + + /** + * Returns the Shard object associated with this remote. + */ + std::shared_ptr<Shard> getShard(); + + // ShardId of the shard to which the command will be sent. + const ShardId shardId; + + // The command object to send to the remote host. + BSONObj cmdObj; + + // The response or error from the remote. Is unset until a response or error has been + // received. + boost::optional<StatusWith<executor::RemoteCommandResponse>> swResponse; + + // The exact host on which the remote command was run. Is unset until a request has been + // sent. + boost::optional<HostAndPort> shardHostAndPort; + + // The number of times we've retried sending the command to this remote. + int retryCount = 0; + + // The callback handle to an outstanding request for this remote. + executor::TaskExecutor::CallbackHandle cbHandle; + }; + + /** + * Used internally to determine if the ARS should attempt to retry any requests. Is set to true + * when: + * - interrupt() or kill() is called + * - allowPartialResults is false and some remote has a non-retriable error (or exhausts its + * retries for a retriable error). + */ + bool _stopRetrying = false; + + // Not owned here. + executor::TaskExecutor* _executor; + + // The metadata obj to pass along with the command remote. Used to indicate that the command is + // ok to run on secondaries. + BSONObj _metadataObj; + + // The database against which the commands are run. 
+ const std::string _db; + + // The readPreference to use for all requests. + ReadPreferenceSetting _readPreference; + + // If set to true, allows for skipping over hosts that have non-retriable errors or exhaust + // their retries. + bool _allowPartialResults = false; + + // Must be acquired before accessing any data members. + // Must also be held when calling any of the '_inlock()' helper functions. + stdx::mutex _mutex; + + // Data tracking the state of our communication with each of the remote nodes. + std::vector<RemoteData> _remotes; + + // A notification that gets signaled when a remote has a retriable error or the last outstanding + // response is received. + boost::optional<Notification<void>> _notification; +}; + +} // namespace mongo |