summaryrefslogtreecommitdiff
path: root/src/mongo/s/async_requests_sender.h
diff options
context:
space:
mode:
authorEsha Maharishi <esha.maharishi@mongodb.com>2017-03-06 18:17:34 -0500
committerEsha Maharishi <esha.maharishi@mongodb.com>2017-03-13 15:09:46 -0400
commit965dc76f4b4e27f7a9e3bc7810b608c53085d32f (patch)
treee5554cdfb59e4df76adc55d4851b2aaa54dabc12 /src/mongo/s/async_requests_sender.h
parent90bd4ed6ba5d0f3353d1af42c667cd6a2c1a540e (diff)
downloadmongo-965dc76f4b4e27f7a9e3bc7810b608c53085d32f.tar.gz
SERVER-28164 make ClusterWrite::run path use ARS instead of DBClientMultiCommand
Diffstat (limited to 'src/mongo/s/async_requests_sender.h')
-rw-r--r--src/mongo/s/async_requests_sender.h201
1 files changed, 102 insertions, 99 deletions
diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h
index 9f8664f9c2a..ae90e8eaded 100644
--- a/src/mongo/s/async_requests_sender.h
+++ b/src/mongo/s/async_requests_sender.h
@@ -48,20 +48,35 @@ namespace mongo {
/**
* The AsyncRequestsSender allows for sending requests to a set of remote shards in parallel and
- * automatically retrying on retriable errors according to a RetryPolicy. It can also allow for
- * retrieving partial results by ignoring shards that return errors.
- *
- * Work on remote nodes is accomplished by scheduling remote work in a TaskExecutor's event loop.
+ * automatically retrying on retriable errors according to a RetryPolicy. Work on remote nodes is
+ * accomplished by scheduling remote work in a TaskExecutor's event loop.
*
* Typical usage is:
*
- * AsyncRequestsSender ars(opCtx, executor, db, requests, readPrefSetting); // schedule the
- * requests
- * auto responses = ars.waitForResponses(opCtx); // wait for responses; retries on retriable erors
+ * // Add some requests
+ * std::vector<AsyncRequestsSender::Request> requests;
+ *
+ * // Creating the ARS schedules the requests immediately
+ * AsyncRequestsSender ars(opCtx, executor, db, requests, readPrefSetting);
+ *
+ * while (!ars.done()) {
+ * // Schedule a round of retries if needed and wait for next response or error
+ * auto response = ars.next();
*
- * Additionally, you can interrupt() (if you want waitForResponses() to wait for responses for
- * outstanding requests but stop scheduling retries) or kill() (if you want to cancel outstanding
- * requests) the ARS from another thread.
+ * if (!response.swResponse.isOK()) {
+ * // If partial results are tolerable, process the error as needed and continue.
+ * continue;
+ *
+ * // If partial results are not tolerable but you need to retrieve responses for all
+ * // dispatched requests, use interrupt() and continue.
+ * ars.interrupt();
+ * continue;
+ *
+ * // If partial results are not tolerable and you don't care about dispatched requests,
+ * // safe to destroy the ARS. It will automatically cancel pending I/O and wait for the
+ * // outstanding callbacks to complete on destruction.
+ * }
+ * }
*
* Does not throw exceptions.
*/
@@ -87,10 +102,13 @@ public:
*/
struct Response {
// Constructor for a response that was successfully received.
- Response(executor::RemoteCommandResponse response, HostAndPort hp);
+ Response(ShardId shardId, executor::RemoteCommandResponse response, HostAndPort hp);
// Constructor that specifies the reason the response was not successfully received.
- Response(Status status, boost::optional<HostAndPort> hp);
+ Response(ShardId shardId, Status status, boost::optional<HostAndPort> hp);
+
+ // The shard to which the request was sent.
+ ShardId shardId;
// The response or error from the remote.
StatusWith<executor::RemoteCommandResponse> swResponse;
@@ -101,33 +119,38 @@ public:
};
/**
- * Constructs a new AsyncRequestsSender. The TaskExecutor* must remain valid for the lifetime of
- * the ARS.
+ * Constructs a new AsyncRequestsSender. The OperationContext* and TaskExecutor* must remain
+ * valid for the lifetime of the ARS.
*/
AsyncRequestsSender(OperationContext* opCtx,
executor::TaskExecutor* executor,
StringData db,
const std::vector<AsyncRequestsSender::Request>& requests,
- const ReadPreferenceSetting& readPreference,
- bool allowPartialResults = false);
+ const ReadPreferenceSetting& readPreference);
+ /**
+ * Ensures pending network I/O for any outstanding requests has been canceled and waits for
+ * outstanding requests to complete.
+ */
~AsyncRequestsSender();
/**
- * Returns a vector containing the responses or errors for each remote in the same order as the
- * input vector that was passed in the constructor.
+ * Returns true if responses for all requests have been returned via next().
+ */
+ bool done();
+
+ /**
+ * Returns the next available response or error.
*
- * If we were killed, returns immediately.
- * If we were interrupted, returns when any outstanding requests have completed.
- * Otherwise, returns when each remote has received a response or error.
+ * If neither kill() nor interrupt() has been called, schedules retries for any remotes that
+ * have had a retriable error and have not exhausted their retries.
*
- * Must only be called once.
+ * Invalid to call if done() is true.
*/
- std::vector<Response> waitForResponses(OperationContext* opCtx);
+ Response next();
/**
- * Stops the ARS from retrying requests. Causes waitForResponses() to wait until any outstanding
- * requests have received a response or error.
+ * Stops the ARS from retrying requests.
*
* Use this if you no longer care about getting success responses, but need to do cleanup based
* on responses for requests that have already been dispatched.
@@ -135,72 +158,15 @@ public:
void interrupt();
/**
- * Cancels all outstanding requests and makes waitForResponses() return immediately.
+ * Cancels all outstanding requests.
*
* Use this if you no longer care about getting success responses, and don't need to process
- * responses for outstanding requests.
+ * responses for requests that have already been dispatched.
*/
void kill();
private:
/**
- * Returns true if each remote has received a response or error. (If kill() has been called,
- * the error is the error assigned by the TaskExecutor when a callback is canceled).
- */
- bool _done();
-
- /**
- * Executes the logic of _done().
- */
- bool _done_inlock();
-
- /**
- * Replaces _notification with a new notification.
- *
- * If _stopRetrying is false, for each remote that does not have a response or outstanding
- * request, schedules work to send the command to the remote.
- *
- * Invalid to call if there is an existing Notification and it has not yet been signaled.
- */
- void _scheduleRequestsIfNeeded(OperationContext* opCtx);
-
- /**
- * Helper to schedule a command to a remote.
- *
- * The 'remoteIndex' gives the position of the remote node from which we are retrieving the
- * batch in '_remotes'.
- *
- * Returns success if the command to retrieve the next batch was scheduled successfully.
- */
- Status _scheduleRequest_inlock(OperationContext* opCtx, size_t remoteIndex);
-
- /**
- * The callback for a remote command.
- *
- * 'remoteIndex' is the position of the relevant remote node in '_remotes', and therefore
- * indicates which node the response came from and where the response should be buffered.
- *
- * On a retriable error, unless _stopRetrying is true, signals the notification so that the
- * request can be immediately retried.
- *
- * On a non-retriable error, if allowPartialResults is false, sets _stopRetrying to true.
- */
- void _handleResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData,
- OperationContext* opCtx,
- size_t remoteIndex);
-
- /**
- * If the existing notification has not yet been signaled, signals it and marks it as signaled.
- */
- void _signalCurrentNotification_inlock();
-
- /**
- * Wrapper around signalCurrentNotification_inlock(); only signals the notification if _done()
- * is true.
- */
- void _signalCurrentNotificationIfDone_inlock();
-
- /**
* We instantiate one of these per remote host.
*/
struct RemoteData {
@@ -210,11 +176,6 @@ private:
RemoteData(ShardId shardId, BSONObj cmdObj);
/**
- * Returns the resolved host and port on which the remote command was or will be run.
- */
- const HostAndPort& getTargetHost() const;
-
- /**
* Given a read preference, selects a host on which the command should be run.
*/
Status resolveShardIdToHostAndPort(const ReadPreferenceSetting& readPref);
@@ -225,7 +186,7 @@ private:
std::shared_ptr<Shard> getShard();
// ShardId of the shard to which the command will be sent.
- const ShardId shardId;
+ ShardId shardId;
// The command object to send to the remote host.
BSONObj cmdObj;
@@ -243,18 +204,61 @@ private:
// The callback handle to an outstanding request for this remote.
executor::TaskExecutor::CallbackHandle cbHandle;
+
+ // Whether this remote's result has been returned.
+ bool done = false;
};
/**
- * Used internally to determine if the ARS should attempt to retry any requests. Is set to true
- * when:
- * - interrupt() or kill() is called
- * - allowPartialResults is false and some remote has a non-retriable error (or exhausts its
- * retries for a retriable error).
+ * Replaces _notification with a new notification.
+ *
+ * If _stopRetrying is false, schedules retries for remotes that have had a retriable error.
+ *
+ * If any remote has successfully received a response, returns a Response for it.
+ * If any remote has an error response that can't be retried, returns a Response for it.
+ * Otherwise, returns boost::none.
+ */
+ boost::optional<Response> _ready();
+
+ /**
+ * For each remote that had a response, checks if it had a retriable error, and clears its
+ * response if so.
+ *
+ * For each remote without a response or pending request, schedules the remote request.
+ *
+ * On failure to schedule a request, signals the notification.
+ */
+ void _scheduleRequests_inlock();
+
+ /**
+ * Helper to schedule a command to a remote.
+ *
+ * 'remoteIndex' is the position in '_remotes' of the remote node to which the command will be
+ * sent.
+ *
+ * Returns success if the command was scheduled successfully.
*/
+ Status _scheduleRequest_inlock(size_t remoteIndex);
+
+ /**
+ * The callback for a remote command.
+ *
+ * 'remoteIndex' is the position of the relevant remote node in '_remotes', and therefore
+ * indicates which node the response came from and where the response should be buffered.
+ *
+ * Stores the response or error in the remote and signals the notification.
+ */
+ void _handleResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData,
+ size_t remoteIndex);
+
+ // Used internally to determine if the ARS should attempt to retry any requests. Is set to true
+ // when interrupt() or kill() is called.
bool _stopRetrying = false;
// Not owned here.
+ OperationContext* _opCtx;
+
+ // Not owned here.
executor::TaskExecutor* _executor;
// The metadata obj to pass along with the command remote. Used to indicate that the command is
@@ -267,10 +271,6 @@ private:
// The readPreference to use for all requests.
ReadPreferenceSetting _readPreference;
- // If set to true, allows for skipping over hosts that have non-retriable errors or exhaust
- // their retries.
- bool _allowPartialResults = false;
-
// Must be acquired before accessing any data members.
// Must also be held when calling any of the '_inlock()' helper functions.
stdx::mutex _mutex;
@@ -281,6 +281,9 @@ private:
// A notification that gets signaled when a remote has a retriable error or the last outstanding
// response is received.
boost::optional<Notification<void>> _notification;
+
+ // Set to true when kill() is called so that it is only executed once.
+ bool _killed = false;
};
} // namespace mongo