diff options
author | Jason Carey <jcarey@argv.me> | 2019-05-01 21:43:38 -0400 |
---|---|---|
committer | Jason Carey <jcarey@argv.me> | 2019-05-01 21:45:07 -0400 |
commit | d53850fddc3f57369cf73a007e94515367a93cbb (patch) | |
tree | e08b4a8c68216a7ab9828f366bcd2e271492df0e /src/mongo/s/async_requests_sender.h | |
parent | b92621e4f6dc631023f0be2cfe311da71cd0f63a (diff) | |
download | mongo-d53850fddc3f57369cf73a007e94515367a93cbb.tar.gz |
SERVER-39163 Parallel targetting in the ARS
Use the new opctx as executor functionality to perform parallel
targeting in the ARS
Diffstat (limited to 'src/mongo/s/async_requests_sender.h')
-rw-r--r-- | src/mongo/s/async_requests_sender.h | 205 |
1 files changed, 102 insertions, 103 deletions
diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h index bfd06834972..86387d01bbe 100644 --- a/src/mongo/s/async_requests_sender.h +++ b/src/mongo/s/async_requests_sender.h @@ -35,7 +35,9 @@ #include "mongo/base/status_with.h" #include "mongo/bson/bsonobj.h" #include "mongo/client/read_preference.h" +#include "mongo/db/baton.h" #include "mongo/executor/remote_command_response.h" +#include "mongo/executor/scoped_task_executor.h" #include "mongo/executor/task_executor.h" #include "mongo/s/client/shard.h" #include "mongo/s/shard_id.h" @@ -72,8 +74,7 @@ namespace mongo { * continue; * * // If partial results are not tolerable and you don't care about dispatched requests, - * // safe to destroy the ARS. It will automatically cancel pending I/O and wait for the - * // outstanding callbacks to complete on destruction. + * // safe to destroy the ARS. It will automatically cancel pending I/O. * } * } * @@ -101,16 +102,23 @@ public: * Defines a response for a request to a remote shard. */ struct Response { - // Constructor for a response that was successfully received. - Response(ShardId shardId, executor::RemoteCommandResponse response, HostAndPort hp); - - // Constructor that specifies the reason the response was not successfully received. - Response(ShardId shardId, Status status, boost::optional<HostAndPort> hp); - // The shard to which the request was sent. ShardId shardId; // The response or error from the remote. + // + // The mapping between the RemoteCommandResponse returned by the task executor and this + // field is fairly specific: + // + // Status is set when: + // * An error is returned when scheduling the task + // * A status is returned in the response.status field + // + // The value is set when: + // * There are no errors + // * Errors exist only remotely (I.e. by reading response.data for ok:0 or write errors + // + // I.e. if a value is set, swResponse.getValue().status.isOK() StatusWith<executor::RemoteCommandResponse> swResponse; // The exact host on which the remote command was run. Is unset if the shard could not be @@ -130,27 +138,21 @@ public: Shard::RetryPolicy retryPolicy); /** - * Ensures pending network I/O for any outstanding requests has been canceled and waits for - * outstanding callbacks to complete. - */ - ~AsyncRequestsSender(); - - /** * Returns true if responses for all requests have been returned via next(). */ - bool done(); + bool done() noexcept; /** * Returns the next available response or error. * * If the operation is interrupted, the status of some responses may be CallbackCanceled. * - * If neither cancelPendingRequests() nor stopRetrying() have been called, schedules retries for - * any remotes that have had a retriable error and have not exhausted their retries. + * If stopRetrying() has not been called, schedules retries for any remotes that have had a + * retriable error and have not exhausted their retries. * * Note: Must only be called from one thread at a time, and invalid to call if done() is true. */ - Response next(); + Response next() noexcept; /** * Stops the ARS from retrying requests. @@ -158,107 +160,96 @@ public: * Use this if you no longer care about getting success responses, but need to do cleanup based * on responses for requests that have already been dispatched. */ - void stopRetrying(); + void stopRetrying() noexcept; private: /** * We instantiate one of these per remote host. */ - struct RemoteData { + class RemoteData { + public: /** * Creates a new uninitialized remote state with a command to send. */ - RemoteData(ShardId shardId, BSONObj cmdObj); - - /** - * Given a read preference, selects a host on which the command should be run. - */ - Status resolveShardIdToHostAndPort(AsyncRequestsSender* ars, - const ReadPreferenceSetting& readPref); + RemoteData(AsyncRequestsSender* ars, ShardId shardId, BSONObj cmdObj); /** * Returns the Shard object associated with this remote. */ std::shared_ptr<Shard> getShard(); - // ShardId of the shard to which the command will be sent. - ShardId shardId; + /** + * Returns true if we've already queued a response from the remote. + */ + explicit operator bool() const { + return _done; + } - // The command object to send to the remote host. - BSONObj cmdObj; + /** + * Extracts a failed response from the remote, given an interruption status. + */ + Response makeFailedResponse(Status status) && { + return {std::move(_shardId), std::move(status), std::move(_shardHostAndPort)}; + } - // The response or error from the remote. Is unset until a response or error has been - // received. - boost::optional<StatusWith<executor::RemoteCommandResponse>> swResponse; + /** + * Executes the request for the given shard, this includes any necessary retries and ends + * with a Response getting written to the response queue. + * + * This is implemented by calling scheduleRequest, which handles retries internally in its + * future chain. + */ + void executeRequest(); - // The exact host on which the remote command was run. Is unset until a request has been - // sent. - boost::optional<HostAndPort> shardHostAndPort; + /** + * Executes a single attempt to: + * + * 1. resolveShardIdToHostAndPort + * 2. scheduleRemoteCommand + * 3. handlResponse + * + * for the given shard. + */ + SemiFuture<executor::RemoteCommandResponse> scheduleRequest(); - // The number of times we've retried sending the command to this remote. - int retryCount = 0; + /** + * Given a read preference, selects a host on which the command should be run. + */ + SemiFuture<HostAndPort> resolveShardIdToHostAndPort(const ReadPreferenceSetting& readPref); - // The callback handle to an outstanding request for this remote. - executor::TaskExecutor::CallbackHandle cbHandle; + /** + * Schedules the remote command on the ARS's TaskExecutor + */ + SemiFuture<executor::RemoteCommandResponse> scheduleRemoteCommand( + HostAndPort&& hostAndPort); - // Whether this remote's result has been returned. - bool done = false; - }; + /** + * Handles the remote response + */ + SemiFuture<executor::RemoteCommandResponse> handleResponse( + executor::RemoteCommandResponse&& rcr); - /** - * Job for _makeProgress. We use a producer consumer queue to coordinate with TaskExecutors - * off thread, and this wraps up the arguments for that call. - */ - struct Job { - executor::TaskExecutor::RemoteCommandCallbackArgs cbData; - size_t remoteIndex; - }; + private: + bool _done = false; - /** - * Cancels all outstanding requests on the TaskExecutor and sets the _stopRetrying flag. - */ - void _cancelPendingRequests(); + AsyncRequestsSender* const _ars; - /** - * If _stopRetrying is false, schedules retries for remotes that have had a retriable error. - * - * If any remote has successfully received a response, returns a Response for it. - * If any remote has an error response that can't be retried, returns a Response for it. - * Otherwise, returns boost::none. - */ - boost::optional<Response> _ready(); + // ShardId of the shard to which the command will be sent. + ShardId _shardId; - /** - * For each remote that had a response, checks if it had a retriable error, and clears its - * response if so. - * - * For each remote without a response or pending request, schedules the remote request. - * - * On failure to schedule a request, pushes a noop job to the response queue. - */ - void _scheduleRequests(); + // The command object to send to the remote host. + BSONObj _cmdObj; - /** - * Helper to schedule a command to a remote. - * - * The 'remoteIndex' gives the position of the remote node from which we are retrieving the - * batch in '_remotes'. - * - * Returns success if the command was scheduled successfully. - */ - Status _scheduleRequest(size_t remoteIndex); + // The exact host on which the remote command was run. Is unset until a request has been + // sent. + boost::optional<HostAndPort> _shardHostAndPort; - /** - * Waits for forward progress in gathering responses from a remote. - * - * Stores the response or error in the remote. - */ - void _makeProgress(); + // The number of times we've retried sending the command to this remote. + int _retryCount = 0; + }; OperationContext* _opCtx; - executor::TaskExecutor* _executor; - // The metadata obj to pass along with the command remote. Used to indicate that the command is // ok to run on secondaries. BSONObj _metadataObj; @@ -272,25 +263,33 @@ private: // The policy to use when deciding whether to retry on an error. Shard::RetryPolicy _retryPolicy; - // Is set to a non-OK status if the client operation is interrupted. - // When waiting for a remote to be ready, we only check for interrupt if the _interruptStatus - // has not already been set to an error (so we can wait for callbacks for (canceled) outstanding - // requests to complete after interrupt). - // When processing responses from remotes, if _interruptStatus is non-OK and the response status - // is CallbackCanceled, we promote the response status to the _interruptStatus. - Status _interruptStatus = Status::OK(); - // Data tracking the state of our communication with each of the remote nodes. std::vector<RemoteData> _remotes; - // Thread safe queue which collects responses from the task executor for execution in next() - // - // The queue supports unset jobs for a signal to wake up and check for failure - MultiProducerSingleConsumerQueue<boost::optional<Job>>::Pipe _responseQueue; + // Number of remotes we haven't returned final results from. + size_t _remotesLeft; + + // Queue of responses. We don't actually take advantage of the thread safety of the queue, but + // instead use it to collect results while waiting on a condvar (which allows us to use our + // underlying baton). + SingleProducerSingleConsumerQueue<Response> _responseQueue; // Used to determine if the ARS should attempt to retry any requests. Is set to true when - // stopRetrying() or cancelPendingRequests() is called. + // stopRetrying() is called. bool _stopRetrying = false; + + Status _interruptStatus = Status::OK(); + + // NOTE: it's important that these two members go last in this class. That ensures that we: + // 1. cancel/ensure no more callbacks run which touch the ARS + // 2. cancel any outstanding work in the task executor + + // Scoped task executor which handles clean up of any handles after the ARS goes out of scope + executor::ScopedTaskExecutor _subExecutor; + + // Scoped baton holder which ensures any callbacks which touch this ARS are called with a + // not-okay status (or not run, in the case of ExecutorFuture continuations). + Baton::SubBatonHolder _subBaton; }; } // namespace mongo |