author     Jason Carey <jcarey@argv.me>    2018-04-19 18:15:20 -0400
committer  Jason Carey <jcarey@argv.me>    2018-05-08 10:32:15 -0400
commit     b9be9ba0418cb94430cc5df3524580df6fdc7903 (patch)
tree       4a60a4758c3c0ee54cc0c6c95fe2f38dabcb5f80
parent     d0a80a3d209817aa810e6e35429e729c30be7b54 (diff)
SERVER-34582 Replace object level lock for ARS
The ARS holds an object-level lock during scheduling so that no callback can signal the
notification while requests are still being scheduled. As an unfortunate side effect, this
prevents callbacks from resolving during scheduling, which can cause background executors
to block while executing a callback.
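To make the failure mode concrete, here is a minimal standalone sketch (illustrative only,
not the actual ARS code): one thread holds an object-level lock for the whole scheduling
pass while an "executor" thread tries to deliver a response through the same lock, so the
executor thread stalls until scheduling finishes.

#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>

std::mutex objectLock;  // stand-in for the old object-level ARS mutex

// Runs on the "executor" thread; it must take the object lock to record a response.
void handleResponse(int remoteIndex) {
    std::lock_guard<std::mutex> lk(objectLock);
    std::cout << "recorded response from remote " << remoteIndex << "\n";
}

int main() {
    std::thread scheduler([] {
        // The lock is held across the entire (slow) scheduling pass.
        std::lock_guard<std::mutex> lk(objectLock);
        std::cout << "scheduling requests...\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
        std::cout << "done scheduling\n";
    });

    std::thread executor([] {
        // A response arrives almost immediately, but handleResponse() cannot
        // run until the scheduler releases the object lock, so this thread
        // (and anything else queued behind it on the executor) blocks meanwhile.
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        handleResponse(0);
    });

    scheduler.join();
    executor.join();
    return 0;
}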
This replaces the mutex with a producer-consumer queue that collects responses, and moves
response handling into calls to next().
(cherry picked from commit ab112a029bca9d575379d42450ea2a7e9254c6de)
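For contrast, below is a minimal standalone sketch of the queue-based shape this patch
adopts (the Job and JobQueue types here are simplified stand-ins; MongoDB's real
ProducerConsumerQueue used in the diff below has a different API). Executor threads act
only as producers that push a small Job onto a thread-safe queue, and the consuming thread
pops jobs and applies them to per-remote state, so no object-level lock is held while
requests are being scheduled. An unset (noop) element, like the boost::none pushed in the
diff, merely wakes the consumer so it can re-check state after a scheduling failure.

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <optional>
#include <queue>
#include <string>
#include <thread>
#include <vector>

// Wraps up the arguments a completion callback would otherwise apply directly.
struct Job {
    size_t remoteIndex;
    std::string response;  // stand-in for the real RemoteCommandCallbackArgs
};

// Tiny blocking queue; producers only ever take the queue's internal lock.
class JobQueue {
public:
    void push(std::optional<Job> job) {
        {
            std::lock_guard<std::mutex> lk(_mutex);
            _items.push(std::move(job));
        }
        _cv.notify_one();
    }

    std::optional<Job> pop() {
        std::unique_lock<std::mutex> lk(_mutex);
        _cv.wait(lk, [&] { return !_items.empty(); });
        auto job = std::move(_items.front());
        _items.pop();
        return job;
    }

private:
    std::mutex _mutex;
    std::condition_variable _cv;
    std::queue<std::optional<Job>> _items;
};

int main() {
    JobQueue responseQueue;
    std::vector<std::string> remoteResponses(2);

    // "Executor" threads are pure producers: they never touch remoteResponses.
    std::thread exec1([&] { responseQueue.push(Job{0, "ok from shard0"}); });
    std::thread exec2([&] { responseQueue.push(Job{1, "ok from shard1"}); });

    // Consumer loop, analogous to next()/_handleResponse(): pop a job and
    // record it in per-remote state on this thread, with no object-level lock.
    for (int received = 0; received < 2;) {
        auto job = responseQueue.pop();
        if (!job) {
            continue;  // an unset job just wakes the consumer to re-check state
        }
        remoteResponses[job->remoteIndex] = job->response;
        ++received;
    }

    exec1.join();
    exec2.join();
    std::cout << remoteResponses[0] << ", " << remoteResponses[1] << "\n";
    return 0;
}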
-rw-r--r--   src/mongo/s/async_requests_sender.cpp | 63
-rw-r--r--   src/mongo/s/async_requests_sender.h   | 39
2 files changed, 41 insertions(+), 61 deletions(-)
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp
index 0dc955fe119..3d76ca34dba 100644
--- a/src/mongo/s/async_requests_sender.cpp
+++ b/src/mongo/s/async_requests_sender.cpp
@@ -69,15 +69,7 @@ AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx,
     _metadataObj = readPreference.toContainingBSON();
     // Schedule the requests immediately.
-
-    // We must create the notification before scheduling any requests, because the notification is
-    // signaled both on an error in scheduling the request and a request's callback.
-    _notification.emplace();
-
-    // We lock so that no callbacks signal the notification until after we are done scheduling
-    // requests, to prevent signaling the notification twice, which is illegal.
-    stdx::lock_guard<stdx::mutex> lk(_mutex);
-    _scheduleRequests(lk);
+    _scheduleRequests();
 }
 
 AsyncRequestsSender::~AsyncRequestsSender() {
     _cancelPendingRequests();
@@ -98,7 +90,7 @@ AsyncRequestsSender::Response AsyncRequestsSender::next() {
         // Otherwise, wait for some response to be received.
         if (_interruptStatus.isOK()) {
             try {
-                _notification->get(_opCtx);
+                _handleResponse(_responseQueue.pop(_opCtx));
             } catch (const AssertionException& ex) {
                 // If the operation is interrupted, we cancel outstanding requests and switch to
                 // waiting for the (canceled) callbacks to finish without checking for interrupts.
@@ -107,25 +99,22 @@ AsyncRequestsSender::Response AsyncRequestsSender::next() {
                 continue;
             }
         } else {
-            _notification->get();
+            _handleResponse(_responseQueue.pop());
         }
     }
     return *readyResponse;
 }
 
 void AsyncRequestsSender::stopRetrying() {
-    stdx::lock_guard<stdx::mutex> lk(_mutex);
     _stopRetrying = true;
 }
 
 bool AsyncRequestsSender::done() {
-    stdx::lock_guard<stdx::mutex> lk(_mutex);
     return std::all_of(
         _remotes.begin(), _remotes.end(), [](const RemoteData& remote) { return remote.done; });
 }
 
 void AsyncRequestsSender::_cancelPendingRequests() {
-    stdx::lock_guard<stdx::mutex> lk(_mutex);
     _stopRetrying = true;
 
     // Cancel all outstanding requests so they return immediately.
@@ -137,12 +126,8 @@ void AsyncRequestsSender::_cancelPendingRequests() {
 }
 
 boost::optional<AsyncRequestsSender::Response> AsyncRequestsSender::_ready() {
-    stdx::lock_guard<stdx::mutex> lk(_mutex);
-
-    _notification.emplace();
-
     if (!_stopRetrying) {
-        _scheduleRequests(lk);
+        _scheduleRequests();
     }
 
     // Check if any remote is ready.
@@ -171,7 +156,7 @@ boost::optional<AsyncRequestsSender::Response> AsyncRequestsSender::_ready() {
     return boost::none;
 }
 
-void AsyncRequestsSender::_scheduleRequests(WithLock lk) {
+void AsyncRequestsSender::_scheduleRequests() {
     invariant(!_stopRetrying);
     // Schedule remote work on hosts for which we have not sent a request or need to retry.
     for (size_t i = 0; i < _remotes.size(); ++i) {
@@ -212,21 +197,18 @@ void AsyncRequestsSender::_scheduleRequests(WithLock lk) {
 
         // If the remote does not have a response or pending request, schedule remote work for it.
         if (!remote.swResponse && !remote.cbHandle.isValid()) {
-            auto scheduleStatus = _scheduleRequest(lk, i);
+            auto scheduleStatus = _scheduleRequest(i);
             if (!scheduleStatus.isOK()) {
                 remote.swResponse = std::move(scheduleStatus);
 
-                // Signal the notification indicating the remote had an error (we need to do this
-                // because no request was scheduled, so no callback for this remote will run and
-                // signal the notification).
-                if (!*_notification) {
-                    _notification->set();
-                }
+                // Push a noop response to the queue to indicate that a remote is ready for
+                // re-processing due to failure.
+                _responseQueue.push(boost::none);
             }
         }
     }
 }
 
-Status AsyncRequestsSender::_scheduleRequest(WithLock, size_t remoteIndex) {
+Status AsyncRequestsSender::_scheduleRequest(size_t remoteIndex) {
     auto& remote = _remotes[remoteIndex];
     invariant(!remote.cbHandle.isValid());
@@ -242,8 +224,9 @@ Status AsyncRequestsSender::_scheduleRequest(WithLock, size_t remoteIndex) {
 
     auto callbackStatus = _executor->scheduleRemoteCommand(
         request,
-        stdx::bind(
-            &AsyncRequestsSender::_handleResponse, this, stdx::placeholders::_1, remoteIndex));
+        [remoteIndex, this](const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) {
+            _responseQueue.push(Job{cbData, remoteIndex});
+        });
     if (!callbackStatus.isOK()) {
         return callbackStatus.getStatus();
     }
@@ -252,11 +235,12 @@ Status AsyncRequestsSender::_scheduleRequest(WithLock, size_t remoteIndex) {
     return Status::OK();
 }
 
-void AsyncRequestsSender::_handleResponse(
-    const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, size_t remoteIndex) {
-    stdx::lock_guard<stdx::mutex> lk(_mutex);
+void AsyncRequestsSender::_handleResponse(boost::optional<Job> job) {
+    if (!job) {
+        return;
+    }
 
-    auto& remote = _remotes[remoteIndex];
+    auto& remote = _remotes[job->remoteIndex];
     invariant(!remote.swResponse);
 
     // Clear the callback handle. This indicates that we are no longer waiting on a response from
@@ -264,15 +248,10 @@ void AsyncRequestsSender::_handleResponse(
     remote.cbHandle = executor::TaskExecutor::CallbackHandle();
 
     // Store the response or error.
-    if (cbData.response.status.isOK()) {
-        remote.swResponse = std::move(cbData.response);
+    if (job->cbData.response.status.isOK()) {
+        remote.swResponse = std::move(job->cbData.response);
     } else {
-        remote.swResponse = std::move(cbData.response.status);
-    }
-
-    // Signal the notification indicating that a remote received a response.
-    if (!*_notification) {
-        _notification->set();
+        remote.swResponse = std::move(job->cbData.response.status);
     }
 }
 
diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h
index 70d574a8ef7..061eb885f29 100644
--- a/src/mongo/s/async_requests_sender.h
+++ b/src/mongo/s/async_requests_sender.h
@@ -39,10 +39,8 @@
 #include "mongo/executor/task_executor.h"
 #include "mongo/s/client/shard.h"
 #include "mongo/s/shard_id.h"
-#include "mongo/stdx/mutex.h"
-#include "mongo/util/concurrency/notification.h"
-#include "mongo/util/concurrency/with_lock.h"
 #include "mongo/util/net/hostandport.h"
+#include "mongo/util/producer_consumer_queue.h"
 #include "mongo/util/time_support.h"
 
 namespace mongo {
@@ -205,13 +203,20 @@ private:
     };
 
     /**
+     * Job for _handleResponse. We use a producer consumer queue to coordinate with TaskExecutors
+     * off thread, and this wraps up the arguments for that call.
+     */
+    struct Job {
+        executor::TaskExecutor::RemoteCommandCallbackArgs cbData;
+        size_t remoteIndex;
+    };
+
+    /**
      * Cancels all outstanding requests on the TaskExecutor and sets the _stopRetrying flag.
      */
     void _cancelPendingRequests();
 
     /**
-     * Replaces _notification with a new notification.
-     *
     * If _stopRetrying is false, schedules retries for remotes that have had a retriable error.
     *
     * If any remote has successfully received a response, returns a Response for it.
@@ -226,9 +231,9 @@ private:
     *
     * For each remote without a response or pending request, schedules the remote request.
     *
-     * On failure to schedule a request, signals the notification.
+     * On failure to schedule a request, pushes a noop job to the response queue.
     */
-    void _scheduleRequests(WithLock);
+    void _scheduleRequests();
 
     /**
     * Helper to schedule a command to a remote.
@@ -238,18 +243,16 @@ private:
     *
     * Returns success if the command was scheduled successfully.
     */
-    Status _scheduleRequest(WithLock, size_t remoteIndex);
+    Status _scheduleRequest(size_t remoteIndex);
 
     /**
     * The callback for a remote command.
    *
-     * 'remoteIndex' is the position of the relevant remote node in '_remotes', and therefore
-     * indicates which node the response came from and where the response should be buffered.
+     * If the job is not set, we've failed targeting and calling this function is a noop.
     *
-     * Stores the response or error in the remote and signals the notification.
+     * Stores the response or error in the remote.
     */
-    void _handleResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData,
-                         size_t remoteIndex);
+    void _handleResponse(boost::optional<Job> job);
 
     OperationContext* _opCtx;
 
@@ -276,15 +279,13 @@ private:
     // is CallbackCanceled, we promote the response status to the _interruptStatus.
     Status _interruptStatus = Status::OK();
 
-    // Must be acquired before accessing the below data members.
-    stdx::mutex _mutex;
-
     // Data tracking the state of our communication with each of the remote nodes.
     std::vector<RemoteData> _remotes;
 
-    // A notification that gets signaled when a remote is ready for processing (i.e., we failed to
-    // schedule a request to it or received a response from it).
-    boost::optional<Notification<void>> _notification;
+    // Thread safe queue which collects responses from the task executor for execution in next()
+    //
+    // The queue supports unset jobs for a signal to wake up and check for failure
+    ProducerConsumerQueue<boost::optional<Job>> _responseQueue;
 
     // Used to determine if the ARS should attempt to retry any requests. Is set to true when
     // stopRetrying() or cancelPendingRequests() is called.