summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJason Carey <jcarey@argv.me>2018-04-19 18:15:20 -0400
committerJason Carey <jcarey@argv.me>2018-05-08 10:32:15 -0400
commitb9be9ba0418cb94430cc5df3524580df6fdc7903 (patch)
tree4a60a4758c3c0ee54cc0c6c95fe2f38dabcb5f80
parentd0a80a3d209817aa810e6e35429e729c30be7b54 (diff)
downloadmongo-b9be9ba0418cb94430cc5df3524580df6fdc7903.tar.gz
SERVER-34582 Replace object level lock for ARS
ARS holds a lock during scheduling, to prevent notification during scheduling. As an unfortunate side effect, this prevents callbacks from resolving during scheduling. (which can cause background executors to block in executing a callback). This replaces the mutex with a producer consumer queue which handles responses, and moves response handling into calls to next(). (cherry picked from commit ab112a029bca9d575379d42450ea2a7e9254c6de)
-rw-r--r--src/mongo/s/async_requests_sender.cpp63
-rw-r--r--src/mongo/s/async_requests_sender.h39
2 files changed, 41 insertions, 61 deletions
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp
index 0dc955fe119..3d76ca34dba 100644
--- a/src/mongo/s/async_requests_sender.cpp
+++ b/src/mongo/s/async_requests_sender.cpp
@@ -69,15 +69,7 @@ AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx,
_metadataObj = readPreference.toContainingBSON();
// Schedule the requests immediately.
-
- // We must create the notification before scheduling any requests, because the notification is
- // signaled both on an error in scheduling the request and a request's callback.
- _notification.emplace();
-
- // We lock so that no callbacks signal the notification until after we are done scheduling
- // requests, to prevent signaling the notification twice, which is illegal.
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _scheduleRequests(lk);
+ _scheduleRequests();
}
AsyncRequestsSender::~AsyncRequestsSender() {
_cancelPendingRequests();
@@ -98,7 +90,7 @@ AsyncRequestsSender::Response AsyncRequestsSender::next() {
// Otherwise, wait for some response to be received.
if (_interruptStatus.isOK()) {
try {
- _notification->get(_opCtx);
+ _handleResponse(_responseQueue.pop(_opCtx));
} catch (const AssertionException& ex) {
// If the operation is interrupted, we cancel outstanding requests and switch to
// waiting for the (canceled) callbacks to finish without checking for interrupts.
@@ -107,25 +99,22 @@ AsyncRequestsSender::Response AsyncRequestsSender::next() {
continue;
}
} else {
- _notification->get();
+ _handleResponse(_responseQueue.pop());
}
}
return *readyResponse;
}
void AsyncRequestsSender::stopRetrying() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
_stopRetrying = true;
}
bool AsyncRequestsSender::done() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
return std::all_of(
_remotes.begin(), _remotes.end(), [](const RemoteData& remote) { return remote.done; });
}
void AsyncRequestsSender::_cancelPendingRequests() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
_stopRetrying = true;
// Cancel all outstanding requests so they return immediately.
@@ -137,12 +126,8 @@ void AsyncRequestsSender::_cancelPendingRequests() {
}
boost::optional<AsyncRequestsSender::Response> AsyncRequestsSender::_ready() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
-
- _notification.emplace();
-
if (!_stopRetrying) {
- _scheduleRequests(lk);
+ _scheduleRequests();
}
// Check if any remote is ready.
@@ -171,7 +156,7 @@ boost::optional<AsyncRequestsSender::Response> AsyncRequestsSender::_ready() {
return boost::none;
}
-void AsyncRequestsSender::_scheduleRequests(WithLock lk) {
+void AsyncRequestsSender::_scheduleRequests() {
invariant(!_stopRetrying);
// Schedule remote work on hosts for which we have not sent a request or need to retry.
for (size_t i = 0; i < _remotes.size(); ++i) {
@@ -212,21 +197,18 @@ void AsyncRequestsSender::_scheduleRequests(WithLock lk) {
// If the remote does not have a response or pending request, schedule remote work for it.
if (!remote.swResponse && !remote.cbHandle.isValid()) {
- auto scheduleStatus = _scheduleRequest(lk, i);
+ auto scheduleStatus = _scheduleRequest(i);
if (!scheduleStatus.isOK()) {
remote.swResponse = std::move(scheduleStatus);
- // Signal the notification indicating the remote had an error (we need to do this
- // because no request was scheduled, so no callback for this remote will run and
- // signal the notification).
- if (!*_notification) {
- _notification->set();
- }
+ // Push a noop response to the queue to indicate that a remote is ready for
+ // re-processing due to failure.
+ _responseQueue.push(boost::none);
}
}
}
}
-Status AsyncRequestsSender::_scheduleRequest(WithLock, size_t remoteIndex) {
+Status AsyncRequestsSender::_scheduleRequest(size_t remoteIndex) {
auto& remote = _remotes[remoteIndex];
invariant(!remote.cbHandle.isValid());
@@ -242,8 +224,9 @@ Status AsyncRequestsSender::_scheduleRequest(WithLock, size_t remoteIndex) {
auto callbackStatus = _executor->scheduleRemoteCommand(
request,
- stdx::bind(
- &AsyncRequestsSender::_handleResponse, this, stdx::placeholders::_1, remoteIndex));
+ [remoteIndex, this](const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) {
+ _responseQueue.push(Job{cbData, remoteIndex});
+ });
if (!callbackStatus.isOK()) {
return callbackStatus.getStatus();
}
@@ -252,11 +235,12 @@ Status AsyncRequestsSender::_scheduleRequest(WithLock, size_t remoteIndex) {
return Status::OK();
}
-void AsyncRequestsSender::_handleResponse(
- const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, size_t remoteIndex) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+void AsyncRequestsSender::_handleResponse(boost::optional<Job> job) {
+ if (!job) {
+ return;
+ }
- auto& remote = _remotes[remoteIndex];
+ auto& remote = _remotes[job->remoteIndex];
invariant(!remote.swResponse);
// Clear the callback handle. This indicates that we are no longer waiting on a response from
@@ -264,15 +248,10 @@ void AsyncRequestsSender::_handleResponse(
remote.cbHandle = executor::TaskExecutor::CallbackHandle();
// Store the response or error.
- if (cbData.response.status.isOK()) {
- remote.swResponse = std::move(cbData.response);
+ if (job->cbData.response.status.isOK()) {
+ remote.swResponse = std::move(job->cbData.response);
} else {
- remote.swResponse = std::move(cbData.response.status);
- }
-
- // Signal the notification indicating that a remote received a response.
- if (!*_notification) {
- _notification->set();
+ remote.swResponse = std::move(job->cbData.response.status);
}
}
diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h
index 70d574a8ef7..061eb885f29 100644
--- a/src/mongo/s/async_requests_sender.h
+++ b/src/mongo/s/async_requests_sender.h
@@ -39,10 +39,8 @@
#include "mongo/executor/task_executor.h"
#include "mongo/s/client/shard.h"
#include "mongo/s/shard_id.h"
-#include "mongo/stdx/mutex.h"
-#include "mongo/util/concurrency/notification.h"
-#include "mongo/util/concurrency/with_lock.h"
#include "mongo/util/net/hostandport.h"
+#include "mongo/util/producer_consumer_queue.h"
#include "mongo/util/time_support.h"
namespace mongo {
@@ -205,13 +203,20 @@ private:
};
/**
+ * Job for _handleResponse. We use a producer consumer queue to coordinate with TaskExecutors
+ * off thread, and this wraps up the arguments for that call.
+ */
+ struct Job {
+ executor::TaskExecutor::RemoteCommandCallbackArgs cbData;
+ size_t remoteIndex;
+ };
+
+ /**
* Cancels all outstanding requests on the TaskExecutor and sets the _stopRetrying flag.
*/
void _cancelPendingRequests();
/**
- * Replaces _notification with a new notification.
- *
* If _stopRetrying is false, schedules retries for remotes that have had a retriable error.
*
* If any remote has successfully received a response, returns a Response for it.
@@ -226,9 +231,9 @@ private:
*
* For each remote without a response or pending request, schedules the remote request.
*
- * On failure to schedule a request, signals the notification.
+ * On failure to schedule a request, pushes a noop job to the response queue.
*/
- void _scheduleRequests(WithLock);
+ void _scheduleRequests();
/**
* Helper to schedule a command to a remote.
@@ -238,18 +243,16 @@ private:
*
* Returns success if the command was scheduled successfully.
*/
- Status _scheduleRequest(WithLock, size_t remoteIndex);
+ Status _scheduleRequest(size_t remoteIndex);
/**
* The callback for a remote command.
*
- * 'remoteIndex' is the position of the relevant remote node in '_remotes', and therefore
- * indicates which node the response came from and where the response should be buffered.
+ * If the job is not set, we've failed targeting and calling this function is a noop.
*
- * Stores the response or error in the remote and signals the notification.
+ * Stores the response or error in the remote.
*/
- void _handleResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData,
- size_t remoteIndex);
+ void _handleResponse(boost::optional<Job> job);
OperationContext* _opCtx;
@@ -276,15 +279,13 @@ private:
// is CallbackCanceled, we promote the response status to the _interruptStatus.
Status _interruptStatus = Status::OK();
- // Must be acquired before accessing the below data members.
- stdx::mutex _mutex;
-
// Data tracking the state of our communication with each of the remote nodes.
std::vector<RemoteData> _remotes;
- // A notification that gets signaled when a remote is ready for processing (i.e., we failed to
- // schedule a request to it or received a response from it).
- boost::optional<Notification<void>> _notification;
+ // Thread safe queue which collects responses from the task executor for execution in next()
+ //
+ // The queue supports unset jobs for a signal to wake up and check for failure
+ ProducerConsumerQueue<boost::optional<Job>> _responseQueue;
// Used to determine if the ARS should attempt to retry any requests. Is set to true when
// stopRetrying() or cancelPendingRequests() is called.