diff options
author | Esha Maharishi <esha.maharishi@mongodb.com> | 2017-03-06 18:17:34 -0500 |
---|---|---|
committer | Esha Maharishi <esha.maharishi@mongodb.com> | 2017-03-13 15:09:46 -0400 |
commit | 965dc76f4b4e27f7a9e3bc7810b608c53085d32f (patch) | |
tree | e5554cdfb59e4df76adc55d4851b2aaa54dabc12 /src/mongo/s/async_requests_sender.h | |
parent | 90bd4ed6ba5d0f3353d1af42c667cd6a2c1a540e (diff) | |
download | mongo-965dc76f4b4e27f7a9e3bc7810b608c53085d32f.tar.gz |
SERVER-28164 make ClusterWrite::run path use ARS instead of DBClientMultiCommand
Diffstat (limited to 'src/mongo/s/async_requests_sender.h')
-rw-r--r-- | src/mongo/s/async_requests_sender.h | 201 |
1 files changed, 102 insertions, 99 deletions
diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h index 9f8664f9c2a..ae90e8eaded 100644 --- a/src/mongo/s/async_requests_sender.h +++ b/src/mongo/s/async_requests_sender.h @@ -48,20 +48,35 @@ namespace mongo { /** * The AsyncRequestsSender allows for sending requests to a set of remote shards in parallel and - * automatically retrying on retriable errors according to a RetryPolicy. It can also allow for - * retrieving partial results by ignoring shards that return errors. - * - * Work on remote nodes is accomplished by scheduling remote work in a TaskExecutor's event loop. + * automatically retrying on retriable errors according to a RetryPolicy. Work on remote nodes is + * accomplished by scheduling remote work in a TaskExecutor's event loop. * * Typical usage is: * - * AsyncRequestsSender ars(opCtx, executor, db, requests, readPrefSetting); // schedule the - * requests - * auto responses = ars.waitForResponses(opCtx); // wait for responses; retries on retriable erors + * // Add some requests + * std::vector<AsyncRequestSender::Request> requests; + * + * // Creating the ARS schedules the requests immediately + * AsyncRequestsSender ars(opCtx, executor, db, requests, readPrefSetting); + * + * while (!ars.done()) { + * // Schedule a round of retries if needed and wait for next response or error + * auto response = ars.next(opCtx); * - * Additionally, you can interrupt() (if you want waitForResponses() to wait for responses for - * outstanding requests but stop scheduling retries) or kill() (if you want to cancel outstanding - * requests) the ARS from another thread. + * if (!response.swResponse.isOK()) { + * // If partial results are tolerable, process the error as needed and continue. + * continue; + * + * // If partial results are not tolerable but you need to retrieve responses for all + * // dispatched requests, use stopRetrying() and continue. + * ars.stopRetrying(); + * continue; + * + * // If partial results are not tolerable and you don't care about dispatched requests, + * // safe to destroy the ARS. It will automatically cancel pending I/O and wait for the + * // outstanding callbacks to complete on destruction. + * } + * } * * Does not throw exceptions. */ @@ -87,10 +102,13 @@ public: */ struct Response { // Constructor for a response that was successfully received. - Response(executor::RemoteCommandResponse response, HostAndPort hp); + Response(ShardId shardId, executor::RemoteCommandResponse response, HostAndPort hp); // Constructor that specifies the reason the response was not successfully received. - Response(Status status, boost::optional<HostAndPort> hp); + Response(ShardId shardId, Status status, boost::optional<HostAndPort> hp); + + // The shard to which the request was sent. + ShardId shardId; // The response or error from the remote. StatusWith<executor::RemoteCommandResponse> swResponse; @@ -101,33 +119,38 @@ public: }; /** - * Constructs a new AsyncRequestsSender. The TaskExecutor* must remain valid for the lifetime of - * the ARS. + * Constructs a new AsyncRequestsSender. The OperationContext* and TaskExecutor* must remain + * valid for the lifetime of the ARS. */ AsyncRequestsSender(OperationContext* opCtx, executor::TaskExecutor* executor, StringData db, const std::vector<AsyncRequestsSender::Request>& requests, - const ReadPreferenceSetting& readPreference, - bool allowPartialResults = false); + const ReadPreferenceSetting& readPreference); + /** + * Ensures pending network I/O for any outstanding requests has been canceled and waits for + * outstanding requests to complete. + */ ~AsyncRequestsSender(); /** - * Returns a vector containing the responses or errors for each remote in the same order as the - * input vector that was passed in the constructor. + * Returns true if responses for all requests have been returned via next(). + */ + bool done(); + + /** + * Returns the next available response or error. * - * If we were killed, returns immediately. - * If we were interrupted, returns when any outstanding requests have completed. - * Otherwise, returns when each remote has received a response or error. + * If neither kill() nor stopRetrying() have been called, schedules retries for any remotes that + * have had a retriable error and have not exhausted their retries. * - * Must only be called once. + * Invalid to call if done() is true. */ - std::vector<Response> waitForResponses(OperationContext* opCtx); + Response next(); /** - * Stops the ARS from retrying requests. Causes waitForResponses() to wait until any outstanding - * requests have received a response or error. + * Stops the ARS from retrying requests. * * Use this if you no longer care about getting success responses, but need to do cleanup based * on responses for requests that have already been dispatched. @@ -135,72 +158,15 @@ public: void interrupt(); /** - * Cancels all outstanding requests and makes waitForResponses() return immediately. + * Cancels all outstanding requests. * * Use this if you no longer care about getting success responses, and don't need to process - * responses for outstanding requests. + * responses for requests that have already been dispatched. */ void kill(); private: /** - * Returns true if each remote has received a response or error. (If kill() has been called, - * the error is the error assigned by the TaskExecutor when a callback is canceled). - */ - bool _done(); - - /** - * Executes the logic of _done(). - */ - bool _done_inlock(); - - /** - * Replaces _notification with a new notification. - * - * If _stopRetrying is false, for each remote that does not have a response or outstanding - * request, schedules work to send the command to the remote. - * - * Invalid to call if there is an existing Notification and it has not yet been signaled. - */ - void _scheduleRequestsIfNeeded(OperationContext* opCtx); - - /** - * Helper to schedule a command to a remote. - * - * The 'remoteIndex' gives the position of the remote node from which we are retrieving the - * batch in '_remotes'. - * - * Returns success if the command to retrieve the next batch was scheduled successfully. - */ - Status _scheduleRequest_inlock(OperationContext* opCtx, size_t remoteIndex); - - /** - * The callback for a remote command. - * - * 'remoteIndex' is the position of the relevant remote node in '_remotes', and therefore - * indicates which node the response came from and where the response should be buffered. - * - * On a retriable error, unless _stopRetrying is true, signals the notification so that the - * request can be immediately retried. - * - * On a non-retriable error, if allowPartialResults is false, sets _stopRetrying to true. - */ - void _handleResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, - OperationContext* opCtx, - size_t remoteIndex); - - /** - * If the existing notification has not yet been signaled, signals it and marks it as signaled. - */ - void _signalCurrentNotification_inlock(); - - /** - * Wrapper around signalCurrentNotification_inlock(); only signals the notification if _done() - * is true. - */ - void _signalCurrentNotificationIfDone_inlock(); - - /** * We instantiate one of these per remote host. */ struct RemoteData { @@ -210,11 +176,6 @@ private: RemoteData(ShardId shardId, BSONObj cmdObj); /** - * Returns the resolved host and port on which the remote command was or will be run. - */ - const HostAndPort& getTargetHost() const; - - /** * Given a read preference, selects a host on which the command should be run. */ Status resolveShardIdToHostAndPort(const ReadPreferenceSetting& readPref); @@ -225,7 +186,7 @@ private: std::shared_ptr<Shard> getShard(); // ShardId of the shard to which the command will be sent. - const ShardId shardId; + ShardId shardId; // The command object to send to the remote host. BSONObj cmdObj; @@ -243,18 +204,61 @@ private: // The callback handle to an outstanding request for this remote. executor::TaskExecutor::CallbackHandle cbHandle; + + // Whether this remote's result has been returned. + bool done = false; }; /** - * Used internally to determine if the ARS should attempt to retry any requests. Is set to true - * when: - * - interrupt() or kill() is called - * - allowPartialResults is false and some remote has a non-retriable error (or exhausts its - * retries for a retriable error). + * Replaces _notification with a new notification. + * + * If _stopRetrying is false, schedules retries for remotes that have had a retriable error. + * + * If any remote has successfully received a response, returns a Response for it. + * If any remote has an error response that can't be retried, returns a Response for it. + * Otherwise, returns boost::none. + */ + boost::optional<Response> _ready(); + + /** + * For each remote that had a response, checks if it had a retriable error, and clears its + * response if so. + * + * For each remote without a response or pending request, schedules the remote request. + * + * On failure to schedule a request, signals the notification. + */ + void _scheduleRequests_inlock(); + + /** + * Helper to schedule a command to a remote. + * + * The 'remoteIndex' gives the position of the remote node from which we are retrieving the + * batch in '_remotes'. + * + * Returns success if the command was scheduled successfully. */ + Status _scheduleRequest_inlock(size_t remoteIndex); + + /** + * The callback for a remote command. + * + * 'remoteIndex' is the position of the relevant remote node in '_remotes', and therefore + * indicates which node the response came from and where the response should be buffered. + * + * Stores the response or error in the remote and signals the notification. + */ + void _handleResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, + size_t remoteIndex); + + // Used internally to determine if the ARS should attempt to retry any requests. Is set to true + // when stopRetrying() or kill() is called. bool _stopRetrying = false; // Not owned here. + OperationContext* _opCtx; + + // Not owned here. executor::TaskExecutor* _executor; // The metadata obj to pass along with the command remote. Used to indicate that the command is @@ -267,10 +271,6 @@ private: // The readPreference to use for all requests. ReadPreferenceSetting _readPreference; - // If set to true, allows for skipping over hosts that have non-retriable errors or exhaust - // their retries. - bool _allowPartialResults = false; - // Must be acquired before accessing any data members. // Must also be held when calling any of the '_inlock()' helper functions. stdx::mutex _mutex; @@ -281,6 +281,9 @@ private: // A notification that gets signaled when a remote has a retriable error or the last outstanding // response is received. boost::optional<Notification<void>> _notification; + + // Set to true when kill() is called so that it is only executed once. + bool _killed = false; }; } // namespace mongo |