summary | refs | log | tree | commit | diff
path: root/src/mongo/s/async_requests_sender.h
diff options
context:
space:
mode:
author: Jason Carey <jcarey@argv.me> 2019-05-01 21:43:38 -0400
committer: Jason Carey <jcarey@argv.me> 2019-05-01 21:45:07 -0400
commit: d53850fddc3f57369cf73a007e94515367a93cbb (patch)
tree: e08b4a8c68216a7ab9828f366bcd2e271492df0e /src/mongo/s/async_requests_sender.h
parent: b92621e4f6dc631023f0be2cfe311da71cd0f63a (diff)
download: mongo-d53850fddc3f57369cf73a007e94515367a93cbb.tar.gz
SERVER-39163 Parallel targeting in the ARS
Use the new opctx as executor functionality to perform parallel targeting in the ARS
Diffstat (limited to 'src/mongo/s/async_requests_sender.h')
-rw-r--r-- src/mongo/s/async_requests_sender.h 205
1 files changed, 102 insertions, 103 deletions
diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h
index bfd06834972..86387d01bbe 100644
--- a/src/mongo/s/async_requests_sender.h
+++ b/src/mongo/s/async_requests_sender.h
@@ -35,7 +35,9 @@
#include "mongo/base/status_with.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/client/read_preference.h"
+#include "mongo/db/baton.h"
#include "mongo/executor/remote_command_response.h"
+#include "mongo/executor/scoped_task_executor.h"
#include "mongo/executor/task_executor.h"
#include "mongo/s/client/shard.h"
#include "mongo/s/shard_id.h"
@@ -72,8 +74,7 @@ namespace mongo {
* continue;
*
* // If partial results are not tolerable and you don't care about dispatched requests,
- * // safe to destroy the ARS. It will automatically cancel pending I/O and wait for the
- * // outstanding callbacks to complete on destruction.
+ * // safe to destroy the ARS. It will automatically cancel pending I/O.
* }
* }
*
@@ -101,16 +102,23 @@ public:
* Defines a response for a request to a remote shard.
*/
struct Response {
- // Constructor for a response that was successfully received.
- Response(ShardId shardId, executor::RemoteCommandResponse response, HostAndPort hp);
-
- // Constructor that specifies the reason the response was not successfully received.
- Response(ShardId shardId, Status status, boost::optional<HostAndPort> hp);
-
// The shard to which the request was sent.
ShardId shardId;
// The response or error from the remote.
+ //
+ // The mapping between the RemoteCommandResponse returned by the task executor and this
+ // field is fairly specific:
+ //
+ // Status is set when:
+ // * An error is returned when scheduling the task
+ // * A status is returned in the response.status field
+ //
+ // The value is set when:
+ // * There are no errors
+ // * Errors exist only remotely (i.e. by reading response.data for ok:0 or write errors)
+ //
+ // I.e. if a value is set, swResponse.getValue().status.isOK() is true.
StatusWith<executor::RemoteCommandResponse> swResponse;
// The exact host on which the remote command was run. Is unset if the shard could not be
@@ -130,27 +138,21 @@ public:
Shard::RetryPolicy retryPolicy);
/**
- * Ensures pending network I/O for any outstanding requests has been canceled and waits for
- * outstanding callbacks to complete.
- */
- ~AsyncRequestsSender();
-
- /**
* Returns true if responses for all requests have been returned via next().
*/
- bool done();
+ bool done() noexcept;
/**
* Returns the next available response or error.
*
* If the operation is interrupted, the status of some responses may be CallbackCanceled.
*
- * If neither cancelPendingRequests() nor stopRetrying() have been called, schedules retries for
- * any remotes that have had a retriable error and have not exhausted their retries.
+ * If stopRetrying() has not been called, schedules retries for any remotes that have had a
+ * retriable error and have not exhausted their retries.
*
* Note: Must only be called from one thread at a time, and invalid to call if done() is true.
*/
- Response next();
+ Response next() noexcept;
/**
* Stops the ARS from retrying requests.
@@ -158,107 +160,96 @@ public:
* Use this if you no longer care about getting success responses, but need to do cleanup based
* on responses for requests that have already been dispatched.
*/
- void stopRetrying();
+ void stopRetrying() noexcept;
private:
/**
* We instantiate one of these per remote host.
*/
- struct RemoteData {
+ class RemoteData {
+ public:
/**
* Creates a new uninitialized remote state with a command to send.
*/
- RemoteData(ShardId shardId, BSONObj cmdObj);
-
- /**
- * Given a read preference, selects a host on which the command should be run.
- */
- Status resolveShardIdToHostAndPort(AsyncRequestsSender* ars,
- const ReadPreferenceSetting& readPref);
+ RemoteData(AsyncRequestsSender* ars, ShardId shardId, BSONObj cmdObj);
/**
* Returns the Shard object associated with this remote.
*/
std::shared_ptr<Shard> getShard();
- // ShardId of the shard to which the command will be sent.
- ShardId shardId;
+ /**
+ * Returns true if we've already queued a response from the remote.
+ */
+ explicit operator bool() const {
+ return _done;
+ }
- // The command object to send to the remote host.
- BSONObj cmdObj;
+ /**
+ * Extracts a failed response from the remote, given an interruption status.
+ */
+ Response makeFailedResponse(Status status) && {
+ return {std::move(_shardId), std::move(status), std::move(_shardHostAndPort)};
+ }
- // The response or error from the remote. Is unset until a response or error has been
- // received.
- boost::optional<StatusWith<executor::RemoteCommandResponse>> swResponse;
+ /**
+ * Executes the request for the given shard. This includes any necessary retries and ends
+ * with a Response getting written to the response queue.
+ *
+ * This is implemented by calling scheduleRequest, which handles retries internally in its
+ * future chain.
+ */
+ void executeRequest();
- // The exact host on which the remote command was run. Is unset until a request has been
- // sent.
- boost::optional<HostAndPort> shardHostAndPort;
+ /**
+ * Executes a single attempt to:
+ *
+ * 1. resolveShardIdToHostAndPort
+ * 2. scheduleRemoteCommand
+ * 3. handleResponse
+ *
+ * for the given shard.
+ */
+ SemiFuture<executor::RemoteCommandResponse> scheduleRequest();
- // The number of times we've retried sending the command to this remote.
- int retryCount = 0;
+ /**
+ * Given a read preference, selects a host on which the command should be run.
+ */
+ SemiFuture<HostAndPort> resolveShardIdToHostAndPort(const ReadPreferenceSetting& readPref);
- // The callback handle to an outstanding request for this remote.
- executor::TaskExecutor::CallbackHandle cbHandle;
+ /**
+ * Schedules the remote command on the ARS's TaskExecutor
+ */
+ SemiFuture<executor::RemoteCommandResponse> scheduleRemoteCommand(
+ HostAndPort&& hostAndPort);
- // Whether this remote's result has been returned.
- bool done = false;
- };
+ /**
+ * Handles the remote response
+ */
+ SemiFuture<executor::RemoteCommandResponse> handleResponse(
+ executor::RemoteCommandResponse&& rcr);
- /**
- * Job for _makeProgress. We use a producer consumer queue to coordinate with TaskExecutors
- * off thread, and this wraps up the arguments for that call.
- */
- struct Job {
- executor::TaskExecutor::RemoteCommandCallbackArgs cbData;
- size_t remoteIndex;
- };
+ private:
+ bool _done = false;
- /**
- * Cancels all outstanding requests on the TaskExecutor and sets the _stopRetrying flag.
- */
- void _cancelPendingRequests();
+ AsyncRequestsSender* const _ars;
- /**
- * If _stopRetrying is false, schedules retries for remotes that have had a retriable error.
- *
- * If any remote has successfully received a response, returns a Response for it.
- * If any remote has an error response that can't be retried, returns a Response for it.
- * Otherwise, returns boost::none.
- */
- boost::optional<Response> _ready();
+ // ShardId of the shard to which the command will be sent.
+ ShardId _shardId;
- /**
- * For each remote that had a response, checks if it had a retriable error, and clears its
- * response if so.
- *
- * For each remote without a response or pending request, schedules the remote request.
- *
- * On failure to schedule a request, pushes a noop job to the response queue.
- */
- void _scheduleRequests();
+ // The command object to send to the remote host.
+ BSONObj _cmdObj;
- /**
- * Helper to schedule a command to a remote.
- *
- * The 'remoteIndex' gives the position of the remote node from which we are retrieving the
- * batch in '_remotes'.
- *
- * Returns success if the command was scheduled successfully.
- */
- Status _scheduleRequest(size_t remoteIndex);
+ // The exact host on which the remote command was run. Is unset until a request has been
+ // sent.
+ boost::optional<HostAndPort> _shardHostAndPort;
- /**
- * Waits for forward progress in gathering responses from a remote.
- *
- * Stores the response or error in the remote.
- */
- void _makeProgress();
+ // The number of times we've retried sending the command to this remote.
+ int _retryCount = 0;
+ };
OperationContext* _opCtx;
- executor::TaskExecutor* _executor;
-
// The metadata obj to pass along with the command remote. Used to indicate that the command is
// ok to run on secondaries.
BSONObj _metadataObj;
@@ -272,25 +263,33 @@ private:
// The policy to use when deciding whether to retry on an error.
Shard::RetryPolicy _retryPolicy;
- // Is set to a non-OK status if the client operation is interrupted.
- // When waiting for a remote to be ready, we only check for interrupt if the _interruptStatus
- // has not already been set to an error (so we can wait for callbacks for (canceled) outstanding
- // requests to complete after interrupt).
- // When processing responses from remotes, if _interruptStatus is non-OK and the response status
- // is CallbackCanceled, we promote the response status to the _interruptStatus.
- Status _interruptStatus = Status::OK();
-
// Data tracking the state of our communication with each of the remote nodes.
std::vector<RemoteData> _remotes;
- // Thread safe queue which collects responses from the task executor for execution in next()
- //
- // The queue supports unset jobs for a signal to wake up and check for failure
- MultiProducerSingleConsumerQueue<boost::optional<Job>>::Pipe _responseQueue;
+ // Number of remotes we haven't returned final results from.
+ size_t _remotesLeft;
+
+ // Queue of responses. We don't actually take advantage of the thread safety of the queue, but
+ // instead use it to collect results while waiting on a condvar (which allows us to use our
+ // underlying baton).
+ SingleProducerSingleConsumerQueue<Response> _responseQueue;
// Used to determine if the ARS should attempt to retry any requests. Is set to true when
- // stopRetrying() or cancelPendingRequests() is called.
+ // stopRetrying() is called.
bool _stopRetrying = false;
+
+ Status _interruptStatus = Status::OK();
+
+ // NOTE: it's important that these two members go last in this class. That ensures that we:
+ // 1. cancel/ensure no more callbacks run which touch the ARS
+ // 2. cancel any outstanding work in the task executor
+
+ // Scoped task executor which handles clean up of any handles after the ARS goes out of scope
+ executor::ScopedTaskExecutor _subExecutor;
+
+ // Scoped baton holder which ensures any callbacks which touch this ARS are called with a
+ // not-okay status (or not run, in the case of ExecutorFuture continuations).
+ Baton::SubBatonHolder _subBaton;
};
} // namespace mongo