summaryrefslogtreecommitdiff
path: root/src/mongo/s/async_requests_sender.h
diff options
context:
space:
mode:
authorJason Carey <jcarey@argv.me>2018-04-19 18:15:20 -0400
committerJason Carey <jcarey@argv.me>2018-04-24 12:02:16 -0400
commitab112a029bca9d575379d42450ea2a7e9254c6de (patch)
treec56b03e56bd6e99879c822a0a055badc510d6185 /src/mongo/s/async_requests_sender.h
parent5e118de9148bf95e7ca2de6169fcf20a071d59b4 (diff)
downloadmongo-ab112a029bca9d575379d42450ea2a7e9254c6de.tar.gz
SERVER-34582 Replace object level lock for ARS
ARS holds a lock during scheduling to prevent notification during scheduling. As an unfortunate side effect, this prevents callbacks from resolving during scheduling (which can cause background executors to block while executing a callback). This replaces the mutex with a producer-consumer queue which handles responses, and moves response handling into calls to next().
Diffstat (limited to 'src/mongo/s/async_requests_sender.h')
-rw-r--r--src/mongo/s/async_requests_sender.h39
1 files changed, 20 insertions, 19 deletions
diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h
index bafeed53555..2371ee988bb 100644
--- a/src/mongo/s/async_requests_sender.h
+++ b/src/mongo/s/async_requests_sender.h
@@ -39,10 +39,8 @@
#include "mongo/executor/task_executor.h"
#include "mongo/s/client/shard.h"
#include "mongo/s/shard_id.h"
-#include "mongo/stdx/mutex.h"
-#include "mongo/util/concurrency/notification.h"
-#include "mongo/util/concurrency/with_lock.h"
#include "mongo/util/net/hostandport.h"
+#include "mongo/util/producer_consumer_queue.h"
#include "mongo/util/time_support.h"
namespace mongo {
@@ -205,13 +203,20 @@ private:
};
/**
+ * Job for _handleResponse. We use a producer consumer queue to coordinate with TaskExecutors
+ * off thread, and this wraps up the arguments for that call.
+ */
+ struct Job {
+ executor::TaskExecutor::RemoteCommandCallbackArgs cbData;  // callback args handed to _handleResponse
+ size_t remoteIndex;  // position of the relevant remote node in '_remotes'
+ };
+
+ /**
* Cancels all outstanding requests on the TaskExecutor and sets the _stopRetrying flag.
*/
void _cancelPendingRequests();
/**
- * Replaces _notification with a new notification.
- *
* If _stopRetrying is false, schedules retries for remotes that have had a retriable error.
*
* If any remote has successfully received a response, returns a Response for it.
@@ -226,9 +231,9 @@ private:
*
* For each remote without a response or pending request, schedules the remote request.
*
- * On failure to schedule a request, signals the notification.
+ * On failure to schedule a request, pushes a noop job to the response queue.
*/
- void _scheduleRequests(WithLock);
+ void _scheduleRequests();
/**
* Helper to schedule a command to a remote.
@@ -238,18 +243,16 @@ private:
*
* Returns success if the command was scheduled successfully.
*/
- Status _scheduleRequest(WithLock, size_t remoteIndex);
+ Status _scheduleRequest(size_t remoteIndex);
/**
* The callback for a remote command.
*
- * 'remoteIndex' is the position of the relevant remote node in '_remotes', and therefore
- * indicates which node the response came from and where the response should be buffered.
+ * If the job is not set, we've failed targeting and calling this function is a noop.
*
- * Stores the response or error in the remote and signals the notification.
+ * Stores the response or error in the remote.
*/
- void _handleResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData,
- size_t remoteIndex);
+ void _handleResponse(boost::optional<Job> job);
OperationContext* _opCtx;
@@ -276,15 +279,13 @@ private:
// is CallbackCanceled, we promote the response status to the _interruptStatus.
Status _interruptStatus = Status::OK();
- // Must be acquired before accessing the below data members.
- stdx::mutex _mutex;
-
// Data tracking the state of our communication with each of the remote nodes.
std::vector<RemoteData> _remotes;
- // A notification that gets signaled when a remote is ready for processing (i.e., we failed to
- // schedule a request to it or received a response from it).
- boost::optional<Notification<void>> _notification;
+ // Thread-safe queue that collects responses from the task executor for processing in next().
+ //
+ // An unset job in the queue serves as a signal to wake up and check for failure.
+ ProducerConsumerQueue<boost::optional<Job>> _responseQueue;
// Used to determine if the ARS should attempt to retry any requests. Is set to true when
// stopRetrying() or cancelPendingRequests() is called.