diff options
author | Esha Maharishi <esha.maharishi@mongodb.com> | 2017-03-21 16:10:11 -0400 |
---|---|---|
committer | Esha Maharishi <esha.maharishi@mongodb.com> | 2017-03-23 16:49:57 -0400 |
commit | cb4b8b622f30e4ffdaec101c70b2ab4438d3567a (patch) | |
tree | 0f51a38fb4a823a16b671a9a1a45416ca0c89650 | |
parent | 42d945d2446e0ca0aef570ec090e4d93ede00618 (diff) | |
download | mongo-cb4b8b622f30e4ffdaec101c70b2ab4438d3567a.tar.gz |
SERVER-28353 make ARS actually interruptible from the deadline on the OperationContext
-rw-r--r-- | src/mongo/s/async_requests_sender.cpp | 48 | ||||
-rw-r--r-- | src/mongo/s/async_requests_sender.h | 54 |
2 files changed, 56 insertions, 46 deletions
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp index aa11a3c1216..bf1cf1bd144 100644 --- a/src/mongo/s/async_requests_sender.cpp +++ b/src/mongo/s/async_requests_sender.cpp @@ -69,17 +69,18 @@ AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx, _metadataObj = metadataBuilder.obj(); // Schedule the requests immediately. + // We must create the notification before scheduling any requests, because the notification is - // signaled both on an error in scheduling the request and a request's callback. Similarly, we - // lock so that no callbacks signal the notification until after we are done scheduling - // requests, to prevent signaling the notification twice. + // signaled both on an error in scheduling the request and a request's callback. _notification.emplace(); + + // We lock so that no callbacks signal the notification until after we are done scheduling + // requests, to prevent signaling the notification twice, which is illegal. stdx::lock_guard<stdx::mutex> lk(_mutex); _scheduleRequests_inlock(); } AsyncRequestsSender::~AsyncRequestsSender() { - // Make sure any pending network I/O has been canceled. - kill(); + _cancelPendingRequests(); // Wait on remaining callbacks to run. while (!done()) { @@ -95,36 +96,45 @@ AsyncRequestsSender::Response AsyncRequestsSender::next() { boost::optional<Response> readyResponse; while (!(readyResponse = _ready())) { // Otherwise, wait for some response to be received. - _notification->get(_opCtx); + if (_checkForInterrupt) { + try { + _notification->get(_opCtx); + } catch (const UserException& ex) { + // If the operation is interrupted, we cancel outstanding requests and switch to + // waiting for the (canceled) callbacks to finish without checking for interrupts. + invariant(!_opCtx->checkForInterruptNoAssert().isOK()); + _cancelPendingRequests(); + _checkForInterrupt = false; + continue; + } + } else { + _notification->get(); + } } return *readyResponse; } -void AsyncRequestsSender::interrupt() { +void AsyncRequestsSender::stopRetrying() { stdx::lock_guard<stdx::mutex> lk(_mutex); _stopRetrying = true; } -void AsyncRequestsSender::kill() { +bool AsyncRequestsSender::done() { stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_killed) { - return; - } + return std::all_of( + _remotes.begin(), _remotes.end(), [](const RemoteData& remote) { return remote.done; }); +} +void AsyncRequestsSender::_cancelPendingRequests() { + stdx::lock_guard<stdx::mutex> lk(_mutex); _stopRetrying = true; + // Cancel all outstanding requests so they return immediately. for (auto& remote : _remotes) { if (remote.cbHandle.isValid()) { _executor->cancel(remote.cbHandle); } } - _killed = true; -} - -bool AsyncRequestsSender::done() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return std::all_of( - _remotes.begin(), _remotes.end(), [](const RemoteData& remote) { return remote.done; }); } boost::optional<AsyncRequestsSender::Response> AsyncRequestsSender::_ready() { @@ -256,7 +266,7 @@ void AsyncRequestsSender::_handleResponse( remote.swResponse = std::move(cbData.response.status); } - // Signal the notification indicating that the remote received a response. + // Signal the notification indicating that a remote received a response. if (!*_notification) { _notification->set(); } diff --git a/src/mongo/s/async_requests_sender.h b/src/mongo/s/async_requests_sender.h index ae90e8eaded..0501ad55c8e 100644 --- a/src/mongo/s/async_requests_sender.h +++ b/src/mongo/s/async_requests_sender.h @@ -47,9 +47,8 @@ namespace mongo { /** - * The AsyncRequestsSender allows for sending requests to a set of remote shards in parallel and - * automatically retrying on retriable errors according to a RetryPolicy. Work on remote nodes is - * accomplished by scheduling remote work in a TaskExecutor's event loop. + * The AsyncRequestsSender allows for sending requests to a set of remote shards in parallel. + * Work on remote nodes is accomplished by scheduling remote work in a TaskExecutor's event loop. * * Typical usage is: * @@ -60,8 +59,8 @@ namespace mongo { * AsyncRequestsSender ars(opCtx, executor, db, requests, readPrefSetting); * * while (!ars.done()) { - * // Schedule a round of retries if needed and wait for next response or error - * auto response = ars.next(opCtx); + * // Schedule a round of retries if needed and wait for next response or error. + * auto response = ars.next(); * * if (!response.swResponse.isOK()) { * // If partial results are tolerable, process the error as needed and continue. @@ -130,7 +129,7 @@ public: /** * Ensures pending network I/O for any outstanding requests has been canceled and waits for - * outstanding requests to complete. + * outstanding callbacks to complete. */ ~AsyncRequestsSender(); @@ -142,10 +141,12 @@ public: /** * Returns the next available response or error. * - * If neither kill() nor stopRetrying() have been called, schedules retries for any remotes that - * have had a retriable error and have not exhausted their retries. + * If the operation is interrupted, the status of some responses may be CallbackCanceled. * - * Invalid to call if done() is true. + * If neither cancelPendingRequests() nor stopRetrying() have been called, schedules retries for + * any remotes that have had a retriable error and have not exhausted their retries. + * + * Note: Must only be called from one thread at a time, and invalid to call if done() is true. */ Response next(); @@ -155,15 +156,7 @@ public: * Use this if you no longer care about getting success responses, but need to do cleanup based * on responses for requests that have already been dispatched. */ - void interrupt(); - - /** - * Cancels all outstanding requests. - * - * Use this if you no longer care about getting success responses, and don't need to process - * responses for requests that have already been dispatched. - */ - void kill(); + void stopRetrying(); private: /** @@ -210,6 +203,11 @@ private: }; /** + * Cancels all outstanding requests on the TaskExecutor and sets the _stopRetrying flag. + */ + void _cancelPendingRequests(); + + /** * Replaces _notification with a new notification. * * If _stopRetrying is false, schedules retries for remotes that have had a retriable error. @@ -251,10 +249,6 @@ private: void _handleResponse(const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData, size_t remoteIndex); - // Used internally to determine if the ARS should attempt to retry any requests. Is set to true - // when stopRetrying() or kill() is called. - bool _stopRetrying = false; - // Not owned here. OperationContext* _opCtx; @@ -271,19 +265,25 @@ private: // The readPreference to use for all requests. ReadPreferenceSetting _readPreference; - // Must be acquired before accessing any data members. + // Used to determine whether to check for interrupt when waiting for a remote to be ready. + // Set to false if we are interrupted, so that we can still wait for callbacks to complete. + // This is only accessed by the thread in next(). + bool _checkForInterrupt = true; + + // Must be acquired before accessing the below data members. // Must also be held when calling any of the '_inlock()' helper functions. stdx::mutex _mutex; // Data tracking the state of our communication with each of the remote nodes. std::vector<RemoteData> _remotes; - // A notification that gets signaled when a remote has a retriable error or the last outstanding - // response is received. + // A notification that gets signaled when a remote is ready for processing (i.e., we failed to + // schedule a request to it or received a response from it). boost::optional<Notification<void>> _notification; - // Set to true when kill() is called so that it is only executed once. - bool _killed = false; + // Used to determine if the ARS should attempt to retry any requests. Is set to true when + // stopRetrying() or cancelPendingRequests() is called. + bool _stopRetrying = false; }; } // namespace mongo |