diff options
author | Ben Caimano <ben.caimano@10gen.com> | 2021-02-01 19:06:26 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-02-02 15:12:01 +0000 |
commit | 2f38d5622410a49d2e7cf2a5a7d9d0b18893c34c (patch) | |
tree | d9b9d73f286f7aee0ae6e93dfc7414e8018c3442 | |
parent | d98db40b2c6aae986439be9181a32437282ccf6d (diff) | |
download | mongo-2f38d5622410a49d2e7cf2a5a7d9d0b18893c34c.tar.gz |
SERVER-46740 establishCursors() must always drain the AsyncRequestsSender::_baton
-rw-r--r-- | src/mongo/s/query/establish_cursors.cpp | 237 |
1 files changed, 152 insertions, 85 deletions
diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp index 829c38246d1..e807b92423e 100644 --- a/src/mongo/s/query/establish_cursors.cpp +++ b/src/mongo/s/query/establish_cursors.cpp @@ -49,105 +49,172 @@ namespace mongo { -std::vector<RemoteCursor> establishCursors(OperationContext* opCtx, - std::shared_ptr<executor::TaskExecutor> executor, - const NamespaceString& nss, - const ReadPreferenceSetting readPref, - const std::vector<std::pair<ShardId, BSONObj>>& remotes, - bool allowPartialResults, - Shard::RetryPolicy retryPolicy) { +namespace { + +/** + * This class wraps logic for establishing cursors using a MultiStatementTransactionRequestsSender. + */ +class CursorEstablisher { +public: + CursorEstablisher(OperationContext* opCtx, + std::shared_ptr<executor::TaskExecutor> executor, + const NamespaceString& nss, + bool allowPartialResults) + : _opCtx(opCtx), + _executor{std::move(executor)}, + _nss(nss), + _allowPartialResults(allowPartialResults) {} + + /** + * Make a RequestSender and thus send requests. + */ + void sendRequests(const ReadPreferenceSetting readPref, + const std::vector<std::pair<ShardId, BSONObj>>& remotes, + Shard::RetryPolicy retryPolicy); + + /** + * Wait for a single response via the RequestSender. + */ + void waitForResponse() noexcept; + + /** + * Wait for all responses via the RequestSender. + */ + void waitForResponses() noexcept { + while (!_ars->done()) { + waitForResponse(); + } + } + + /** + * If any request received a non-retriable error response and partial results are not allowed, + * cancel any requests that may have succeeded and throw the first such error encountered. + */ + void checkForFailedRequests(); + + /** + * Take all cursors currently tracked by the CursorEstablisher. 
+ */ + std::vector<RemoteCursor> takeCursors() { + return std::exchange(_remoteCursors, {}); + }; + +private: + void _handleFailure(const AsyncRequestsSender::Response& response, Status status) noexcept; + + OperationContext* const _opCtx; + const std::shared_ptr<executor::TaskExecutor> _executor; + const NamespaceString _nss; + const bool _allowPartialResults; + + boost::optional<MultiStatementTransactionRequestsSender> _ars; + + boost::optional<Status> _maybeFailure; + std::vector<RemoteCursor> _remoteCursors; +}; + +void CursorEstablisher::sendRequests(const ReadPreferenceSetting readPref, + const std::vector<std::pair<ShardId, BSONObj>>& remotes, + Shard::RetryPolicy retryPolicy) { // Construct the requests std::vector<AsyncRequestsSender::Request> requests; for (const auto& remote : remotes) { requests.emplace_back(remote.first, remote.second); } + LOG(3) << "Establishing cursors on remotes {" + << "opId: " << _opCtx->getOpID() << "," + << "numRemotes: " << remotes.size() << "}"; + // Send the requests - MultiStatementTransactionRequestsSender ars( - opCtx, executor, nss.db().toString(), std::move(requests), readPref, retryPolicy); + _ars.emplace( + _opCtx, _executor, _nss.db().toString(), std::move(requests), readPref, retryPolicy); +} + +void CursorEstablisher::waitForResponse() noexcept { + auto response = _ars->next(); - std::vector<RemoteCursor> remoteCursors; try { - // Get the responses - while (!ars.done()) { - try { - auto response = ars.next(); - // Note the shardHostAndPort may not be populated if there was an error, so be sure - // to do this after parsing the cursor response to ensure the response was ok. 
- // Additionally, be careful not to push into 'remoteCursors' until we are sure we - // have a valid cursor, since the error handling path will attempt to clean up - // anything in 'remoteCursors' - auto cursors = CursorResponse::parseFromBSONMany( - uassertStatusOK(std::move(response.swResponse)).data); - - for (auto& cursor : cursors) { - if (cursor.isOK()) { - RemoteCursor remoteCursor; - remoteCursor.setCursorResponse(std::move(cursor.getValue())); - remoteCursor.setShardId(std::move(response.shardId)); - remoteCursor.setHostAndPort(*response.shardHostAndPort); - remoteCursors.push_back(std::move(remoteCursor)); - } - } - - // Throw if there is any error and then the catch block below will do the cleanup. - for (auto& cursor : cursors) { - uassertStatusOK(cursor.getStatus()); - } - - } catch (const ExceptionForCat<ErrorCategory::RetriableError>&) { - // Retriable errors are swallowed if 'allowPartialResults' is true. - if (allowPartialResults) { - continue; - } - throw; // Fail this loop. - } catch (const ExceptionFor<ErrorCodes::FailedToSatisfyReadPreference>&) { - // The errors marked as retriable errors are meant to correspond to the driver's - // spec (see SERVER-42908), but targeting a replica set shard can fail with - // FailedToSatisfyReadPreference, which is not a retriable error in the driver's - // spec, so we swallow it separately here if allowPartialResults is true. - if (allowPartialResults) { - continue; - } - throw; // Fail this loop. - } - } - return remoteCursors; - } catch (const DBException&) { - // If one of the remotes had an error, we make a best effort to finish retrieving responses - // for other requests that were already sent, so that we can send killCursors to any cursors - // that we know were established. - try { - // Do not schedule any new requests. - ars.stopRetrying(); - - // Collect responses from all requests that were already sent. 
- while (!ars.done()) { - auto response = ars.next(); - - // Check if the response contains an established cursor, and if so, store it. - StatusWith<CursorResponse> swCursorResponse( - response.swResponse.isOK() - ? CursorResponse::parseFromBSON(response.swResponse.getValue().data) - : response.swResponse.getStatus()); - - if (swCursorResponse.isOK()) { - RemoteCursor cursor; - cursor.setShardId(std::move(response.shardId)); - cursor.setHostAndPort(*response.shardHostAndPort); - cursor.setCursorResponse(std::move(swCursorResponse.getValue())); - remoteCursors.push_back(std::move(cursor)); - } + // Note the shardHostAndPort may not be populated if there was an error, so be sure + // to do this after parsing the cursor response to ensure the response was ok. + // Additionally, be careful not to push into 'remoteCursors' until we are sure we + // have a valid cursor, since the error handling path will attempt to clean up + // anything in 'remoteCursors' + auto responseData = uassertStatusOK(std::move(response.swResponse)).data; + auto cursors = CursorResponse::parseFromBSONMany(std::move(responseData)); + + bool hadValidCursor = false; + for (auto& cursor : cursors) { + if (!cursor.isOK()) { + _handleFailure(response, cursor.getStatus()); + continue; } - // Schedule killCursors against all cursors that were established. - killRemoteCursors(opCtx, executor.get(), std::move(remoteCursors), nss); - } catch (const DBException&) { - // Ignore the new error and rethrow the original one. 
+ hadValidCursor = true; + + RemoteCursor remoteCursor; + remoteCursor.setCursorResponse(std::move(cursor.getValue())); + remoteCursor.setShardId(response.shardId); + remoteCursor.setHostAndPort(*response.shardHostAndPort); + _remoteCursors.emplace_back(std::move(remoteCursor)); } + } catch (const DBException& ex) { + _handleFailure(response, ex.toStatus()); + } +} + +void CursorEstablisher::checkForFailedRequests() { + if (!_maybeFailure) { + // If we saw no failures, there is nothing to do. + return; + } + + LOG(0) << "Unable to establish remote cursors - {" + << "error: " << *_maybeFailure << ", " + << "numActiveRemotes: " << _remoteCursors.size() << "}"; + + // Schedule killCursors against all cursors that were established. + killRemoteCursors(_opCtx, _executor.get(), std::move(_remoteCursors), _nss); + + // Throw our failure. + uassertStatusOK(*_maybeFailure); +} + +void CursorEstablisher::_handleFailure(const AsyncRequestsSender::Response& response, + Status status) noexcept { + LOG(3) << "Experienced a failure while establishing cursors - " << status; + if (_maybeFailure) { + // If we've already failed, just log and move on. + return; + } - throw; + // Retriable errors are swallowed if '_allowPartialResults' is true. Targeting shard replica + // sets can also throw FailedToSatisfyReadPreference, so we swallow it too. + bool isEligibleException = (ErrorCodes::isRetriableError(status.code()) || + status.code() == ErrorCodes::FailedToSatisfyReadPreference); + if (_allowPartialResults && isEligibleException) { + // This exception is eligible to be swallowed. + return; } + + // Do not schedule any new requests. 
+ _ars->stopRetrying(); + _maybeFailure = std::move(status); +} +} // namespace + +std::vector<RemoteCursor> establishCursors(OperationContext* opCtx, + std::shared_ptr<executor::TaskExecutor> executor, + const NamespaceString& nss, + const ReadPreferenceSetting readPref, + const std::vector<std::pair<ShardId, BSONObj>>& remotes, + bool allowPartialResults, + Shard::RetryPolicy retryPolicy) { + auto establisher = CursorEstablisher(opCtx, executor, nss, allowPartialResults); + establisher.sendRequests(readPref, remotes, retryPolicy); + establisher.waitForResponses(); + establisher.checkForFailedRequests(); + return establisher.takeCursors(); } void killRemoteCursors(OperationContext* opCtx, |