diff options
author | Jason Carey <jcarey@argv.me> | 2018-04-19 18:15:20 -0400 |
---|---|---|
committer | Jason Carey <jcarey@argv.me> | 2018-04-24 12:02:16 -0400 |
commit | ab112a029bca9d575379d42450ea2a7e9254c6de (patch) | |
tree | c56b03e56bd6e99879c822a0a055badc510d6185 /src/mongo/s/async_requests_sender.h | |
parent | 5e118de9148bf95e7ca2de6169fcf20a071d59b4 (diff) | |
download | mongo-ab112a029bca9d575379d42450ea2a7e9254c6de.tar.gz |
SERVER-34582 Replace object level lock for ARS
ARS holds a lock during scheduling to prevent notification during
scheduling. As an unfortunate side effect, this also prevents callbacks
from resolving during scheduling, which can cause background executors
to block while executing a callback.
This replaces the mutex with a producer-consumer queue that handles
responses, and moves response handling into calls to next().
Diffstat (limited to 'src/mongo/s/async_requests_sender.h')
-rw-r--r-- | src/mongo/s/async_requests_sender.h | 39 |
1 files changed, 20 insertions, 19 deletions
diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h index bafeed53555..2371ee988bb 100644 --- a/src/mongo/s/async_requests_sender.h +++ b/src/mongo/s/async_requests_sender.h @@ -39,10 +39,8 @@ #include "mongo/executor/task_executor.h" #include "mongo/s/client/shard.h" #include "mongo/s/shard_id.h" -#include "mongo/stdx/mutex.h" -#include "mongo/util/concurrency/notification.h" -#include "mongo/util/concurrency/with_lock.h" #include "mongo/util/net/hostandport.h" +#include "mongo/util/producer_consumer_queue.h" #include "mongo/util/time_support.h" namespace mongo { @@ -205,13 +203,20 @@ private: }; /** + * Job for _handleResponse. We use a producer consumer queue to coordinate with TaskExecutors + * off thread, and this wraps up the arguments for that call. + */ + struct Job { + executor::TaskExecutor::RemoteCommandCallbackArgs cbData; + size_t remoteIndex; + }; + + /** * Cancels all outstanding requests on the TaskExecutor and sets the _stopRetrying flag. */ void _cancelPendingRequests(); /** - * Replaces _notification with a new notification. - * * If _stopRetrying is false, schedules retries for remotes that have had a retriable error. * * If any remote has successfully received a response, returns a Response for it. @@ -226,9 +231,9 @@ private: * * For each remote without a response or pending request, schedules the remote request. * - * On failure to schedule a request, signals the notification. + * On failure to schedule a request, pushes a noop job to the response queue. */ - void _scheduleRequests(WithLock); + void _scheduleRequests(); /** * Helper to schedule a command to a remote. @@ -238,18 +243,16 @@ private: * * Returns success if the command was scheduled successfully. */ - Status _scheduleRequest(WithLock, size_t remoteIndex); + Status _scheduleRequest(size_t remoteIndex); /** * The callback for a remote command. 
* - * 'remoteIndex' is the position of the relevant remote node in '_remotes', and therefore - * indicates which node the response came from and where the response should be buffered. + * If the job is not set, we've failed targeting and calling this function is a noop. * - * Stores the response or error in the remote and signals the notification. + * Stores the response or error in the remote. */ - void _handleResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, - size_t remoteIndex); + void _handleResponse(boost::optional<Job> job); OperationContext* _opCtx; @@ -276,15 +279,13 @@ private: // is CallbackCanceled, we promote the response status to the _interruptStatus. Status _interruptStatus = Status::OK(); - // Must be acquired before accessing the below data members. - stdx::mutex _mutex; - // Data tracking the state of our communication with each of the remote nodes. std::vector<RemoteData> _remotes; - // A notification that gets signaled when a remote is ready for processing (i.e., we failed to - // schedule a request to it or received a response from it). - boost::optional<Notification<void>> _notification; + // Thread safe queue which collects responses from the task executor for execution in next() + // + // The queue supports unset jobs for a signal to wake up and check for failure + ProducerConsumerQueue<boost::optional<Job>> _responseQueue; // Used to determine if the ARS should attempt to retry any requests. Is set to true when // stopRetrying() or cancelPendingRequests() is called. |