summary refs log tree commit diff
diff options
context:
space:
mode:
author Esha Maharishi <esha.maharishi@mongodb.com> 2017-03-21 16:10:11 -0400
committer Esha Maharishi <esha.maharishi@mongodb.com> 2017-03-23 16:49:57 -0400
commit cb4b8b622f30e4ffdaec101c70b2ab4438d3567a (patch)
tree 0f51a38fb4a823a16b671a9a1a45416ca0c89650
parent 42d945d2446e0ca0aef570ec090e4d93ede00618 (diff)
download mongo-cb4b8b622f30e4ffdaec101c70b2ab4438d3567a.tar.gz
SERVER-28353 make ARS actually interruptible from the deadline on the OperationContext
-rw-r--r-- src/mongo/s/async_requests_sender.cpp 48
-rw-r--r-- src/mongo/s/async_requests_sender.h 54
2 files changed, 56 insertions, 46 deletions
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp
index aa11a3c1216..bf1cf1bd144 100644
--- a/src/mongo/s/async_requests_sender.cpp
+++ b/src/mongo/s/async_requests_sender.cpp
@@ -69,17 +69,18 @@ AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx,
_metadataObj = metadataBuilder.obj();
// Schedule the requests immediately.
+
// We must create the notification before scheduling any requests, because the notification is
- // signaled both on an error in scheduling the request and a request's callback. Similarly, we
- // lock so that no callbacks signal the notification until after we are done scheduling
- // requests, to prevent signaling the notification twice.
+ // signaled both on an error in scheduling the request and a request's callback.
_notification.emplace();
+
+ // We lock so that no callbacks signal the notification until after we are done scheduling
+ // requests, to prevent signaling the notification twice, which is illegal.
stdx::lock_guard<stdx::mutex> lk(_mutex);
_scheduleRequests_inlock();
}
AsyncRequestsSender::~AsyncRequestsSender() {
- // Make sure any pending network I/O has been canceled.
- kill();
+ _cancelPendingRequests();
// Wait on remaining callbacks to run.
while (!done()) {
@@ -95,36 +96,45 @@ AsyncRequestsSender::Response AsyncRequestsSender::next() {
boost::optional<Response> readyResponse;
while (!(readyResponse = _ready())) {
// Otherwise, wait for some response to be received.
- _notification->get(_opCtx);
+ if (_checkForInterrupt) {
+ try {
+ _notification->get(_opCtx);
+ } catch (const UserException& ex) {
+ // If the operation is interrupted, we cancel outstanding requests and switch to
+ // waiting for the (canceled) callbacks to finish without checking for interrupts.
+ invariant(!_opCtx->checkForInterruptNoAssert().isOK());
+ _cancelPendingRequests();
+ _checkForInterrupt = false;
+ continue;
+ }
+ } else {
+ _notification->get();
+ }
}
return *readyResponse;
}
-void AsyncRequestsSender::interrupt() {
+void AsyncRequestsSender::stopRetrying() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
_stopRetrying = true;
}
-void AsyncRequestsSender::kill() {
+bool AsyncRequestsSender::done() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_killed) {
- return;
- }
+ return std::all_of(
+ _remotes.begin(), _remotes.end(), [](const RemoteData& remote) { return remote.done; });
+}
+void AsyncRequestsSender::_cancelPendingRequests() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
_stopRetrying = true;
+
// Cancel all outstanding requests so they return immediately.
for (auto& remote : _remotes) {
if (remote.cbHandle.isValid()) {
_executor->cancel(remote.cbHandle);
}
}
- _killed = true;
-}
-
-bool AsyncRequestsSender::done() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- return std::all_of(
- _remotes.begin(), _remotes.end(), [](const RemoteData& remote) { return remote.done; });
}
boost::optional<AsyncRequestsSender::Response> AsyncRequestsSender::_ready() {
@@ -256,7 +266,7 @@ void AsyncRequestsSender::_handleResponse(
remote.swResponse = std::move(cbData.response.status);
}
- // Signal the notification indicating that the remote received a response.
+ // Signal the notification indicating that a remote received a response.
if (!*_notification) {
_notification->set();
}
diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h
index ae90e8eaded..0501ad55c8e 100644
--- a/src/mongo/s/async_requests_sender.h
+++ b/src/mongo/s/async_requests_sender.h
@@ -47,9 +47,8 @@
namespace mongo {
/**
- * The AsyncRequestsSender allows for sending requests to a set of remote shards in parallel and
- * automatically retrying on retriable errors according to a RetryPolicy. Work on remote nodes is
- * accomplished by scheduling remote work in a TaskExecutor's event loop.
+ * The AsyncRequestsSender allows for sending requests to a set of remote shards in parallel.
+ * Work on remote nodes is accomplished by scheduling remote work in a TaskExecutor's event loop.
*
* Typical usage is:
*
@@ -60,8 +59,8 @@ namespace mongo {
* AsyncRequestsSender ars(opCtx, executor, db, requests, readPrefSetting);
*
* while (!ars.done()) {
- * // Schedule a round of retries if needed and wait for next response or error
- * auto response = ars.next(opCtx);
+ * // Schedule a round of retries if needed and wait for next response or error.
+ * auto response = ars.next();
*
* if (!response.swResponse.isOK()) {
* // If partial results are tolerable, process the error as needed and continue.
@@ -130,7 +129,7 @@ public:
/**
* Ensures pending network I/O for any outstanding requests has been canceled and waits for
- * outstanding requests to complete.
+ * outstanding callbacks to complete.
*/
~AsyncRequestsSender();
@@ -142,10 +141,12 @@ public:
/**
* Returns the next available response or error.
*
- * If neither kill() nor stopRetrying() have been called, schedules retries for any remotes that
- * have had a retriable error and have not exhausted their retries.
+ * If the operation is interrupted, the status of some responses may be CallbackCanceled.
*
- * Invalid to call if done() is true.
+ * If neither _cancelPendingRequests() nor stopRetrying() has been called, schedules retries for
+ * any remotes that have had a retriable error and have not exhausted their retries.
+ *
+ * Note: Must only be called from one thread at a time, and invalid to call if done() is true.
*/
Response next();
@@ -155,15 +156,7 @@ public:
* Use this if you no longer care about getting success responses, but need to do cleanup based
* on responses for requests that have already been dispatched.
*/
- void interrupt();
-
- /**
- * Cancels all outstanding requests.
- *
- * Use this if you no longer care about getting success responses, and don't need to process
- * responses for requests that have already been dispatched.
- */
- void kill();
+ void stopRetrying();
private:
/**
@@ -210,6 +203,11 @@ private:
};
/**
+ * Cancels all outstanding requests on the TaskExecutor and sets the _stopRetrying flag.
+ */
+ void _cancelPendingRequests();
+
+ /**
* Replaces _notification with a new notification.
*
* If _stopRetrying is false, schedules retries for remotes that have had a retriable error.
@@ -251,10 +249,6 @@ private:
void _handleResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData,
size_t remoteIndex);
- // Used internally to determine if the ARS should attempt to retry any requests. Is set to true
- // when stopRetrying() or kill() is called.
- bool _stopRetrying = false;
-
// Not owned here.
OperationContext* _opCtx;
@@ -271,19 +265,25 @@ private:
// The readPreference to use for all requests.
ReadPreferenceSetting _readPreference;
- // Must be acquired before accessing any data members.
+ // Used to determine whether to check for interrupt when waiting for a remote to be ready.
+ // Set to false if we are interrupted, so that we can still wait for callbacks to complete.
+ // This is only accessed by the thread in next().
+ bool _checkForInterrupt = true;
+
+ // Must be acquired before accessing the below data members.
// Must also be held when calling any of the '_inlock()' helper functions.
stdx::mutex _mutex;
// Data tracking the state of our communication with each of the remote nodes.
std::vector<RemoteData> _remotes;
- // A notification that gets signaled when a remote has a retriable error or the last outstanding
- // response is received.
+ // A notification that gets signaled when a remote is ready for processing (i.e., we failed to
+ // schedule a request to it or received a response from it).
boost::optional<Notification<void>> _notification;
- // Set to true when kill() is called so that it is only executed once.
- bool _killed = false;
+ // Used to determine if the ARS should attempt to retry any requests. Is set to true when
+ // stopRetrying() or _cancelPendingRequests() is called.
+ bool _stopRetrying = false;
};
} // namespace mongo