summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Caimano <ben.caimano@10gen.com>2021-02-01 19:06:26 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-02-02 15:12:01 +0000
commit2f38d5622410a49d2e7cf2a5a7d9d0b18893c34c (patch)
treed9b9d73f286f7aee0ae6e93dfc7414e8018c3442
parentd98db40b2c6aae986439be9181a32437282ccf6d (diff)
downloadmongo-2f38d5622410a49d2e7cf2a5a7d9d0b18893c34c.tar.gz
SERVER-46740 establishCursors() must always drain the AsyncRequestsSender::_baton
-rw-r--r--src/mongo/s/query/establish_cursors.cpp237
1 files changed, 152 insertions, 85 deletions
diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp
index 829c38246d1..e807b92423e 100644
--- a/src/mongo/s/query/establish_cursors.cpp
+++ b/src/mongo/s/query/establish_cursors.cpp
@@ -49,105 +49,172 @@
namespace mongo {
-std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
- std::shared_ptr<executor::TaskExecutor> executor,
- const NamespaceString& nss,
- const ReadPreferenceSetting readPref,
- const std::vector<std::pair<ShardId, BSONObj>>& remotes,
- bool allowPartialResults,
- Shard::RetryPolicy retryPolicy) {
+namespace {
+
+/**
+ * This class wraps logic for establishing cursors using a MultiStatementTransactionRequestsSender.
+ */
+class CursorEstablisher {
+public:
+ CursorEstablisher(OperationContext* opCtx,
+ std::shared_ptr<executor::TaskExecutor> executor,
+ const NamespaceString& nss,
+ bool allowPartialResults)
+ : _opCtx(opCtx),
+ _executor{std::move(executor)},
+ _nss(nss),
+ _allowPartialResults(allowPartialResults) {}
+
+ /**
+ * Make a RequestSender and thus send requests.
+ */
+ void sendRequests(const ReadPreferenceSetting readPref,
+ const std::vector<std::pair<ShardId, BSONObj>>& remotes,
+ Shard::RetryPolicy retryPolicy);
+
+ /**
+ * Wait for a single response via the RequestSender.
+ */
+ void waitForResponse() noexcept;
+
+ /**
+ * Wait for all responses via the RequestSender.
+ */
+ void waitForResponses() noexcept {
+ while (!_ars->done()) {
+ waitForResponse();
+ }
+ }
+
+ /**
+ * If any request recieved a non-retriable error response and partial results are not allowed,
+ * cancel any requests that may have succeeded and throw the first such error encountered.
+ */
+ void checkForFailedRequests();
+
+ /**
+ * Take all cursors currently tracked by the CursorEstablsher.
+ */
+ std::vector<RemoteCursor> takeCursors() {
+ return std::exchange(_remoteCursors, {});
+ };
+
+private:
+ void _handleFailure(const AsyncRequestsSender::Response& response, Status status) noexcept;
+
+ OperationContext* const _opCtx;
+ const std::shared_ptr<executor::TaskExecutor> _executor;
+ const NamespaceString _nss;
+ const bool _allowPartialResults;
+
+ boost::optional<MultiStatementTransactionRequestsSender> _ars;
+
+ boost::optional<Status> _maybeFailure;
+ std::vector<RemoteCursor> _remoteCursors;
+};
+
+void CursorEstablisher::sendRequests(const ReadPreferenceSetting readPref,
+ const std::vector<std::pair<ShardId, BSONObj>>& remotes,
+ Shard::RetryPolicy retryPolicy) {
// Construct the requests
std::vector<AsyncRequestsSender::Request> requests;
for (const auto& remote : remotes) {
requests.emplace_back(remote.first, remote.second);
}
+ LOG(3) << "Establishing cursors on remotes {"
+ << "opId: " << _opCtx->getOpID() << ","
+ << "numRemotes: " << remotes.size() << "}";
+
// Send the requests
- MultiStatementTransactionRequestsSender ars(
- opCtx, executor, nss.db().toString(), std::move(requests), readPref, retryPolicy);
+ _ars.emplace(
+ _opCtx, _executor, _nss.db().toString(), std::move(requests), readPref, retryPolicy);
+}
+
+void CursorEstablisher::waitForResponse() noexcept {
+ auto response = _ars->next();
- std::vector<RemoteCursor> remoteCursors;
try {
- // Get the responses
- while (!ars.done()) {
- try {
- auto response = ars.next();
- // Note the shardHostAndPort may not be populated if there was an error, so be sure
- // to do this after parsing the cursor response to ensure the response was ok.
- // Additionally, be careful not to push into 'remoteCursors' until we are sure we
- // have a valid cursor, since the error handling path will attempt to clean up
- // anything in 'remoteCursors'
- auto cursors = CursorResponse::parseFromBSONMany(
- uassertStatusOK(std::move(response.swResponse)).data);
-
- for (auto& cursor : cursors) {
- if (cursor.isOK()) {
- RemoteCursor remoteCursor;
- remoteCursor.setCursorResponse(std::move(cursor.getValue()));
- remoteCursor.setShardId(std::move(response.shardId));
- remoteCursor.setHostAndPort(*response.shardHostAndPort);
- remoteCursors.push_back(std::move(remoteCursor));
- }
- }
-
- // Throw if there is any error and then the catch block below will do the cleanup.
- for (auto& cursor : cursors) {
- uassertStatusOK(cursor.getStatus());
- }
-
- } catch (const ExceptionForCat<ErrorCategory::RetriableError>&) {
- // Retriable errors are swallowed if 'allowPartialResults' is true.
- if (allowPartialResults) {
- continue;
- }
- throw; // Fail this loop.
- } catch (const ExceptionFor<ErrorCodes::FailedToSatisfyReadPreference>&) {
- // The errors marked as retriable errors are meant to correspond to the driver's
- // spec (see SERVER-42908), but targeting a replica set shard can fail with
- // FailedToSatisfyReadPreference, which is not a retriable error in the driver's
- // spec, so we swallow it separately here if allowPartialResults is true.
- if (allowPartialResults) {
- continue;
- }
- throw; // Fail this loop.
- }
- }
- return remoteCursors;
- } catch (const DBException&) {
- // If one of the remotes had an error, we make a best effort to finish retrieving responses
- // for other requests that were already sent, so that we can send killCursors to any cursors
- // that we know were established.
- try {
- // Do not schedule any new requests.
- ars.stopRetrying();
-
- // Collect responses from all requests that were already sent.
- while (!ars.done()) {
- auto response = ars.next();
-
- // Check if the response contains an established cursor, and if so, store it.
- StatusWith<CursorResponse> swCursorResponse(
- response.swResponse.isOK()
- ? CursorResponse::parseFromBSON(response.swResponse.getValue().data)
- : response.swResponse.getStatus());
-
- if (swCursorResponse.isOK()) {
- RemoteCursor cursor;
- cursor.setShardId(std::move(response.shardId));
- cursor.setHostAndPort(*response.shardHostAndPort);
- cursor.setCursorResponse(std::move(swCursorResponse.getValue()));
- remoteCursors.push_back(std::move(cursor));
- }
+ // Note the shardHostAndPort may not be populated if there was an error, so be sure
+ // to do this after parsing the cursor response to ensure the response was ok.
+ // Additionally, be careful not to push into 'remoteCursors' until we are sure we
+ // have a valid cursor, since the error handling path will attempt to clean up
+ // anything in 'remoteCursors'
+ auto responseData = uassertStatusOK(std::move(response.swResponse)).data;
+ auto cursors = CursorResponse::parseFromBSONMany(std::move(responseData));
+
+ bool hadValidCursor = false;
+ for (auto& cursor : cursors) {
+ if (!cursor.isOK()) {
+ _handleFailure(response, cursor.getStatus());
+ continue;
}
- // Schedule killCursors against all cursors that were established.
- killRemoteCursors(opCtx, executor.get(), std::move(remoteCursors), nss);
- } catch (const DBException&) {
- // Ignore the new error and rethrow the original one.
+ hadValidCursor = true;
+
+ RemoteCursor remoteCursor;
+ remoteCursor.setCursorResponse(std::move(cursor.getValue()));
+ remoteCursor.setShardId(response.shardId);
+ remoteCursor.setHostAndPort(*response.shardHostAndPort);
+ _remoteCursors.emplace_back(std::move(remoteCursor));
}
+ } catch (const DBException& ex) {
+ _handleFailure(response, ex.toStatus());
+ }
+}
+
+void CursorEstablisher::checkForFailedRequests() {
+ if (!_maybeFailure) {
+ // If we saw no failures, there is nothing to do.
+ return;
+ }
+
+ LOG(0) << "Unable to establish remote cursors - {"
+ << "error: " << *_maybeFailure << ", "
+ << "numActiveRemotes: " << _remoteCursors.size() << "}";
+
+ // Schedule killCursors against all cursors that were established.
+ killRemoteCursors(_opCtx, _executor.get(), std::move(_remoteCursors), _nss);
+
+ // Throw our failure.
+ uassertStatusOK(*_maybeFailure);
+}
+
+void CursorEstablisher::_handleFailure(const AsyncRequestsSender::Response& response,
+ Status status) noexcept {
+ LOG(3) << "Experienced a failure while establishing cursors - " << status;
+ if (_maybeFailure) {
+ // If we've already failed, just log and move on.
+ return;
+ }
- throw;
+ // Retriable errors are swallowed if '_allowPartialResults' is true. Targeting shard replica
+ // sets can also throw FailedToSatisfyReadPreference, so we swallow it too.
+ bool isEligibleException = (ErrorCodes::isRetriableError(status.code()) ||
+ status.code() == ErrorCodes::FailedToSatisfyReadPreference);
+ if (_allowPartialResults && isEligibleException) {
+ // This exception is eligible to be swallowed.
+ return;
}
+
+ // Do not schedule any new requests.
+ _ars->stopRetrying();
+ _maybeFailure = std::move(status);
+}
+} // namespace
+
+std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
+ std::shared_ptr<executor::TaskExecutor> executor,
+ const NamespaceString& nss,
+ const ReadPreferenceSetting readPref,
+ const std::vector<std::pair<ShardId, BSONObj>>& remotes,
+ bool allowPartialResults,
+ Shard::RetryPolicy retryPolicy) {
+ auto establisher = CursorEstablisher(opCtx, executor, nss, allowPartialResults);
+ establisher.sendRequests(readPref, remotes, retryPolicy);
+ establisher.waitForResponses();
+ establisher.checkForFailedRequests();
+ return establisher.takeCursors();
}
void killRemoteCursors(OperationContext* opCtx,