author     Esha Maharishi <esha.maharishi@mongodb.com>    2017-03-02 12:02:32 -0500
committer  Esha Maharishi <esha.maharishi@mongodb.com>    2017-03-02 12:02:49 -0500
commit     8c173ff0776c2c4ab1698a26aee2d087f973a3de (patch)
tree       5a850c19ba9c7743c99cad4f3d2e5c7aba248891
parent     6753fb211150d85eed71a84393bd13f2a07a8865 (diff)
download   mongo-8c173ff0776c2c4ab1698a26aee2d087f973a3de.tar.gz
SERVER-28173 create a component that sends requests in parallel to multiple hosts over ASIO
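
The AsyncRequestsSender is constructed with one request per target shard and a TaskExecutor; it
schedules all requests up front and blocks in waitForResponses() until every remote has either a
response or an error, retrying retriable errors per host. A rough usage sketch based on the header
added below (names such as 'executor', 'dbName', 'cmdObj', 'shardIds', and 'readPref' are
illustrative placeholders, not part of this patch):

    // Build one request per target shard.
    std::vector<AsyncRequestsSender::Request> requests;
    for (const auto& shardId : shardIds) {
        requests.emplace_back(shardId, cmdObj);
    }

    // Constructing the ARS schedules the requests on the executor immediately.
    AsyncRequestsSender ars(txn, executor, dbName, requests, readPref);

    // Blocks until each remote has a response or error; retriable errors are retried up to
    // kMaxNumFailedHostRetryAttempts times per host.
    std::vector<AsyncRequestsSender::Response> responses = ars.waitForResponses(txn);

    for (const auto& response : responses) {
        if (response.swResponse.isOK()) {
            // response.swResponse.getValue().data holds the command reply from the host
            // identified by response.shardHostAndPort.
        } else {
            // response.swResponse.getStatus() explains why no response was received.
        }
    }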
-rw-r--r--  src/mongo/s/SConscript                 |  14
-rw-r--r--  src/mongo/s/async_requests_sender.cpp  | 336
-rw-r--r--  src/mongo/s/async_requests_sender.h    | 283
3 files changed, 633 insertions, 0 deletions
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 3ee3d34939f..f802a27739d 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -47,6 +47,20 @@ env.Library(
)
env.Library(
+ target="async_requests_sender",
+ source=[
+ "async_requests_sender.cpp",
+ ],
+ LIBDEPS=[
+ "$BUILD_DIR/mongo/db/query/command_request_response",
+ "$BUILD_DIR/mongo/executor/task_executor_interface",
+ "$BUILD_DIR/mongo/s/client/sharding_client",
+ "$BUILD_DIR/mongo/s/coreshard",
+ '$BUILD_DIR/mongo/s/client/shard_interface',
+ ],
+)
+
+env.Library(
target='common',
source=[
'catalog/mongo_version_range.cpp',
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp
new file mode 100644
index 00000000000..a3163050208
--- /dev/null
+++ b/src/mongo/s/async_requests_sender.cpp
@@ -0,0 +1,336 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/async_requests_sender.h"
+
+#include "mongo/client/remote_command_targeter.h"
+#include "mongo/executor/remote_command_request.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/rpc/metadata/server_selection_metadata.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/grid.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/log.h"
+#include "mongo/util/scopeguard.h"
+
+namespace mongo {
+namespace {
+
+// Maximum number of retries for network and replication notMaster errors (per host).
+const int kMaxNumFailedHostRetryAttempts = 3;
+
+} // namespace
+
+AsyncRequestsSender::AsyncRequestsSender(OperationContext* txn,
+ executor::TaskExecutor* executor,
+ std::string db,
+ const std::vector<AsyncRequestsSender::Request>& requests,
+ const ReadPreferenceSetting& readPreference,
+ bool allowPartialResults)
+ : _executor(executor),
+ _db(std::move(db)),
+ _readPreference(readPreference),
+ _allowPartialResults(allowPartialResults) {
+ for (const auto& request : requests) {
+ _remotes.emplace_back(request.shardId, request.cmdObj);
+ }
+
+ // Initialize command metadata to handle the read preference.
+ BSONObjBuilder metadataBuilder;
+ rpc::ServerSelectionMetadata metadata(_readPreference.pref != ReadPreference::PrimaryOnly,
+ boost::none);
+ uassertStatusOK(metadata.writeToMetadata(&metadataBuilder));
+ _metadataObj = metadataBuilder.obj();
+
+ // Schedule the requests immediately.
+ _scheduleRequestsIfNeeded(txn);
+}
+
+AsyncRequestsSender::~AsyncRequestsSender() {
+ invariant(_done());
+}
+
+std::vector<AsyncRequestsSender::Response> AsyncRequestsSender::waitForResponses(
+ OperationContext* txn) {
+ // Until all remotes have received a response or error, keep scheduling retries and waiting on
+ // outstanding requests.
+ while (!_done()) {
+ _notification->get();
+
+ // Note: if we have been interrupt()'d or if some remote had a non-retriable error and
+ // allowPartialResults is false, no retries will be scheduled.
+ _scheduleRequestsIfNeeded(txn);
+ }
+
+ // Construct the responses.
+ std::vector<Response> responses;
+ for (const auto& remote : _remotes) {
+ invariant(remote.swResponse);
+ if (remote.swResponse->isOK()) {
+ invariant(remote.shardHostAndPort);
+ responses.emplace_back(remote.swResponse->getValue(), *remote.shardHostAndPort);
+ } else {
+ responses.emplace_back(remote.swResponse->getStatus());
+ }
+ }
+ return responses;
+}
+
+void AsyncRequestsSender::interrupt() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _stopRetrying = true;
+}
+
+void AsyncRequestsSender::kill() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _stopRetrying = true;
+
+ // Cancel all outstanding requests so they return immediately.
+ for (auto& remote : _remotes) {
+ if (remote.cbHandle.isValid()) {
+ _executor->cancel(remote.cbHandle);
+ }
+ }
+}
+
+bool AsyncRequestsSender::_done() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _done_inlock();
+}
+
+bool AsyncRequestsSender::_done_inlock() {
+ for (const auto& remote : _remotes) {
+ if (!remote.swResponse) {
+ return false;
+ }
+ }
+ return true;
+}
+
+/*
+ * Note: If _scheduleRequestsIfNeeded() does retries, only the remotes with retriable errors will be
+ * rescheduled because:
+ *
+ * 1. Other pending remotes still have a callback assigned to them.
+ * 2. Remotes that already successfully received a response will have a non-empty 'swResponse'.
+ * 3. Remotes that have reached maximum retries will have an error status.
+ */
+void AsyncRequestsSender::_scheduleRequestsIfNeeded(OperationContext* txn) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ // We can't make a new notification if there was a previous one that has not been signaled.
+ invariant(!_notification || *_notification);
+
+ if (_done_inlock()) {
+ return;
+ }
+
+ _notification.emplace();
+
+ if (_stopRetrying) {
+ return;
+ }
+
+ // Schedule remote work on hosts for which we have not sent a request or need to retry.
+ for (size_t i = 0; i < _remotes.size(); ++i) {
+ auto& remote = _remotes[i];
+
+ // If we have not yet received a response or error for this remote, and we do not have an
+ // outstanding request for this remote, schedule remote work to send the command.
+ if (!remote.swResponse && !remote.cbHandle.isValid()) {
+ auto scheduleStatus = _scheduleRequest_inlock(txn, i);
+ if (!scheduleStatus.isOK()) {
+ // Being unable to schedule a request to a remote is a non-retriable error.
+ remote.swResponse = std::move(scheduleStatus);
+
+ // If partial results are not allowed, stop scheduling requests on other remotes and
+ // just wait for outstanding requests to come back.
+ if (!_allowPartialResults) {
+ _stopRetrying = true;
+ break;
+ }
+ }
+ }
+ }
+}
+
+Status AsyncRequestsSender::_scheduleRequest_inlock(OperationContext* txn, size_t remoteIndex) {
+ auto& remote = _remotes[remoteIndex];
+
+ invariant(!remote.cbHandle.isValid());
+ invariant(!remote.swResponse);
+
+ Status resolveStatus = remote.resolveShardIdToHostAndPort(_readPreference);
+ if (!resolveStatus.isOK()) {
+ return resolveStatus;
+ }
+
+ executor::RemoteCommandRequest request(
+ remote.getTargetHost(), _db, remote.cmdObj, _metadataObj, txn);
+
+ auto callbackStatus = _executor->scheduleRemoteCommand(
+ request,
+ stdx::bind(
+ &AsyncRequestsSender::_handleResponse, this, stdx::placeholders::_1, txn, remoteIndex));
+ if (!callbackStatus.isOK()) {
+ return callbackStatus.getStatus();
+ }
+
+ remote.cbHandle = callbackStatus.getValue();
+ return Status::OK();
+}
+
+void AsyncRequestsSender::_handleResponse(
+ const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData,
+ OperationContext* txn,
+ size_t remoteIndex) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ auto& remote = _remotes[remoteIndex];
+ invariant(!remote.swResponse);
+
+ // Clear the callback handle. This indicates that we are no longer waiting on a response from
+ // 'remote'.
+ remote.cbHandle = executor::TaskExecutor::CallbackHandle();
+
+ // On early return from this point on, signal anyone waiting on the current notification if
+ // _done() is true, since this might be the last outstanding request.
+ ScopeGuard signaller =
+ MakeGuard(&AsyncRequestsSender::_signalCurrentNotificationIfDone_inlock, this);
+
+ // We check both the response status and command status for a retriable error.
+ Status status = cbData.response.status;
+ if (status.isOK()) {
+ status = getStatusFromCommandResult(cbData.response.data);
+ if (status.isOK()) {
+ remote.swResponse = std::move(cbData.response);
+ return;
+ }
+ }
+
+ // There was an error with either the response or the command.
+
+ auto shard = remote.getShard();
+ if (!shard) {
+ remote.swResponse =
+ Status(ErrorCodes::ShardNotFound,
+ str::stream() << "Could not find shard " << remote.shardId << " containing host "
+ << remote.getTargetHost().toString());
+ return;
+ }
+ shard->updateReplSetMonitor(remote.getTargetHost(), status);
+
+ if (shard->isRetriableError(status.code(), Shard::RetryPolicy::kIdempotent) && !_stopRetrying &&
+ remote.retryCount < kMaxNumFailedHostRetryAttempts) {
+ LOG(1) << "Command to remote " << remote.shardId << " at host " << *remote.shardHostAndPort
+ << " failed with retriable error and will be retried" << causedBy(redact(status));
+ ++remote.retryCount;
+
+ // Even if _done() is not true, signal the thread sleeping in waitForResponses() to make
+ // it schedule a retry for this remote without waiting for all outstanding requests to
+ // come back.
+ signaller.Dismiss();
+ _signalCurrentNotification_inlock();
+ } else {
+ // Non-retriable error, out of retries, or _stopRetrying is true.
+
+ // Even though we examined the command status to check for retriable errors, we just return
+ // the response or response status here. It is up to the caller to parse the response as
+ // a command result.
+ if (cbData.response.status.isOK()) {
+ remote.swResponse = std::move(cbData.response);
+ } else {
+ remote.swResponse = std::move(cbData.response.status);
+ }
+
+ // If the caller can't use partial results, there's no point continuing to retry on
+ // retriable errors for other remotes.
+ if (!_allowPartialResults) {
+ _stopRetrying = true;
+ }
+ }
+}
+
+void AsyncRequestsSender::_signalCurrentNotification_inlock() {
+ // Only signal the notification if it has not already been signalled.
+ if (!*_notification) {
+ _notification->set();
+ }
+}
+
+void AsyncRequestsSender::_signalCurrentNotificationIfDone_inlock() {
+ if (_done_inlock()) {
+ _signalCurrentNotification_inlock();
+ }
+}
+
+AsyncRequestsSender::Request::Request(ShardId shardId, BSONObj cmdObj)
+ : shardId(shardId), cmdObj(cmdObj) {}
+
+AsyncRequestsSender::Response::Response(executor::RemoteCommandResponse response, HostAndPort hp)
+ : swResponse(response), shardHostAndPort(hp) {}
+
+AsyncRequestsSender::Response::Response(Status status) : swResponse(status) {}
+
+AsyncRequestsSender::RemoteData::RemoteData(ShardId shardId, BSONObj cmdObj)
+ : shardId(std::move(shardId)), cmdObj(std::move(cmdObj)) {}
+
+const HostAndPort& AsyncRequestsSender::RemoteData::getTargetHost() const {
+ invariant(shardHostAndPort);
+ return *shardHostAndPort;
+}
+
+Status AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPort(
+ const ReadPreferenceSetting& readPref) {
+ const auto shard = getShard();
+ if (!shard) {
+ return Status(ErrorCodes::ShardNotFound,
+ str::stream() << "Could not find shard " << shardId);
+ }
+
+ auto findHostStatus = shard->getTargeter()->findHostWithMaxWait(readPref, Seconds{20});
+ if (!findHostStatus.isOK()) {
+ return findHostStatus.getStatus();
+ }
+
+ shardHostAndPort = std::move(findHostStatus.getValue());
+
+ return Status::OK();
+}
+
+std::shared_ptr<Shard> AsyncRequestsSender::RemoteData::getShard() {
+ // TODO: Pass down an OperationContext* to use here.
+ return grid.shardRegistry()->getShardNoReload(shardId);
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h
new file mode 100644
index 00000000000..6bbf1f9d0f3
--- /dev/null
+++ b/src/mongo/s/async_requests_sender.h
@@ -0,0 +1,283 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+#include <vector>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/base/status_with.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/client/read_preference.h"
+#include "mongo/executor/remote_command_response.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/s/client/shard.h"
+#include "mongo/s/shard_id.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/util/concurrency/notification.h"
+#include "mongo/util/net/hostandport.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+
+/**
+ * The AsyncRequestsSender allows for sending requests to a set of remote shards in parallel and
+ * automatically retrying on retriable errors according to a RetryPolicy. It can also allow for
+ * retrieving partial results by ignoring shards that return errors.
+ *
+ * Work on remote nodes is accomplished by scheduling remote work in a TaskExecutor's event loop.
+ *
+ * Typical usage is:
+ *
+ * AsyncRequestsSender ars(txn, executor, db, requests, readPrefSetting); // schedule the requests
+ * auto responses = ars.waitForResponses(txn); // wait for responses; retries on retriable errors
+ *
+ * Additionally, from another thread you can interrupt() the ARS (waitForResponses() still waits
+ * for responses to outstanding requests, but no further retries are scheduled) or kill() it
+ * (outstanding requests are canceled).
+ *
+ * Does not throw exceptions.
+ */
+class AsyncRequestsSender {
+ MONGO_DISALLOW_COPYING(AsyncRequestsSender);
+
+public:
+ /**
+ * Defines a request to a remote shard that can be run by the ARS.
+ */
+ struct Request {
+ Request(ShardId shardId, BSONObj cmdObj);
+
+ // ShardId of the shard to which the command will be sent.
+ const ShardId shardId;
+
+ // The command object to send to the remote host.
+ const BSONObj cmdObj;
+ };
+
+ /**
+ * Defines a response for a request to a remote shard.
+ */
+ struct Response {
+ // Constructor for a response that was successfully received.
+ Response(executor::RemoteCommandResponse response, HostAndPort hp);
+
+ // Constructor that specifies the reason the response was not successfully received.
+ Response(Status status);
+
+ // The response or error from the remote.
+ StatusWith<executor::RemoteCommandResponse> swResponse;
+
+ // The exact host on which the remote command was run. Is unset if swResponse has a non-OK
+ // status.
+ boost::optional<HostAndPort> shardHostAndPort;
+ };
+
+ /**
+ * Constructs a new AsyncRequestsSender. The TaskExecutor* must remain valid for the lifetime of
+ * the ARS.
+ */
+ AsyncRequestsSender(OperationContext* txn,
+ executor::TaskExecutor* executor,
+ std::string db,
+ const std::vector<AsyncRequestsSender::Request>& requests,
+ const ReadPreferenceSetting& readPreference,
+ bool allowPartialResults = false);
+
+ ~AsyncRequestsSender();
+
+ /**
+ * Returns a vector containing the response or error for each remote, in the same order as the
+ * request vector that was passed to the constructor.
+ *
+ * If we were killed, returns immediately.
+ * If we were interrupted, returns when any outstanding requests have completed.
+ * Otherwise, returns when each remote has received a response or error.
+ */
+ std::vector<Response> waitForResponses(OperationContext* txn);
+
+ /**
+ * Stops the ARS from retrying requests. Causes waitForResponses() to wait until any outstanding
+ * requests have received a response or error.
+ *
+ * Use this if you no longer care about getting success responses, but need to do cleanup based
+ * on responses for requests that have already been dispatched.
+ */
+ void interrupt();
+
+ /**
+ * Cancels all outstanding requests and makes waitForResponses() return immediately.
+ *
+ * Use this if you no longer care about getting success responses, and don't need to process
+ * responses for outstanding requests.
+ */
+ void kill();
+
+private:
+ /**
+ * Returns true if each remote has received a response or error. (If kill() has been called,
+ * the error is the error assigned by the TaskExecutor when a callback is canceled).
+ */
+ bool _done();
+
+ /**
+ * Executes the logic of _done().
+ */
+ bool _done_inlock();
+
+ /**
+ * Replaces _notification with a new notification.
+ *
+ * If _stopRetrying is false, for each remote that does not have a response or outstanding
+ * request, schedules work to send the command to the remote.
+ *
+ * Invalid to call if there is an existing Notification and it has not yet been signaled.
+ */
+ void _scheduleRequestsIfNeeded(OperationContext* txn);
+
+ /**
+ * Helper to schedule a command to a remote.
+ *
+ * The 'remoteIndex' gives the position in '_remotes' of the remote to which the command will be
+ * sent.
+ *
+ * Returns success if the command was scheduled successfully.
+ */
+ Status _scheduleRequest_inlock(OperationContext* txn, size_t remoteIndex);
+
+ /**
+ * The callback for a remote command.
+ *
+ * 'remoteIndex' is the position of the relevant remote node in '_remotes', and therefore
+ * indicates which node the response came from and where the response should be buffered.
+ *
+ * On a retriable error, unless _stopRetrying is true, signals the notification so that the
+ * request can be immediately retried.
+ *
+ * On a non-retriable error, if allowPartialResults is false, sets _stopRetrying to true.
+ */
+ void _handleResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData,
+ OperationContext* txn,
+ size_t remoteIndex);
+
+ /**
+ * If the existing notification has not yet been signaled, signals it and marks it as signaled.
+ */
+ void _signalCurrentNotification_inlock();
+
+ /**
+ * Wrapper around _signalCurrentNotification_inlock(); only signals the notification if _done()
+ * is true.
+ */
+ void _signalCurrentNotificationIfDone_inlock();
+
+ /**
+ * We instantiate one of these per remote host.
+ */
+ struct RemoteData {
+ /**
+ * Creates a new uninitialized remote state with a command to send.
+ */
+ RemoteData(ShardId shardId, BSONObj cmdObj);
+
+ /**
+ * Returns the resolved host and port on which the remote command was or will be run.
+ */
+ const HostAndPort& getTargetHost() const;
+
+ /**
+ * Given a read preference, selects a host on which the command should be run.
+ */
+ Status resolveShardIdToHostAndPort(const ReadPreferenceSetting& readPref);
+
+ /**
+ * Returns the Shard object associated with this remote.
+ */
+ std::shared_ptr<Shard> getShard();
+
+ // ShardId of the shard to which the command will be sent.
+ const ShardId shardId;
+
+ // The command object to send to the remote host.
+ BSONObj cmdObj;
+
+ // The response or error from the remote. Is unset until a response or error has been
+ // received.
+ boost::optional<StatusWith<executor::RemoteCommandResponse>> swResponse;
+
+ // The exact host on which the remote command was run. Is unset until a request has been
+ // sent.
+ boost::optional<HostAndPort> shardHostAndPort;
+
+ // The number of times we've retried sending the command to this remote.
+ int retryCount = 0;
+
+ // The callback handle to an outstanding request for this remote.
+ executor::TaskExecutor::CallbackHandle cbHandle;
+ };
+
+ /**
+ * Used internally to determine if the ARS should attempt to retry any requests. Is set to true
+ * when:
+ * - interrupt() or kill() is called
+ * - allowPartialResults is false and some remote has a non-retriable error (or exhausts its
+ * retries for a retriable error).
+ */
+ bool _stopRetrying = false;
+
+ // Not owned here.
+ executor::TaskExecutor* _executor;
+
+ // The metadata obj to pass along with the command to the remote. Used to indicate that the
+ // command is ok to run on secondaries.
+ BSONObj _metadataObj;
+
+ // The database against which the commands are run.
+ const std::string _db;
+
+ // The readPreference to use for all requests.
+ ReadPreferenceSetting _readPreference;
+
+ // If set to true, allows for skipping over hosts that have non-retriable errors or exhaust
+ // their retries.
+ bool _allowPartialResults = false;
+
+ // Must be acquired before accessing any data members.
+ // Must also be held when calling any of the '_inlock()' helper functions.
+ stdx::mutex _mutex;
+
+ // Data tracking the state of our communication with each of the remote nodes.
+ std::vector<RemoteData> _remotes;
+
+ // A notification that gets signaled when a remote has a retriable error or the last outstanding
+ // response is received.
+ boost::optional<Notification<void>> _notification;
+};
+
+} // namespace mongo