author    Ben Caimano <ben.caimano@10gen.com>    2021-02-01 19:06:26 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>    2021-02-02 14:45:42 +0000
commit    53ee01d154106c70f526651858a7667df93a6aa2 (patch)
tree      20eaec668035e0ba1a491eb26213e30655f934d6
parent    fa69befc30de4f5368f82a6343019000f62a47c6 (diff)
download  mongo-53ee01d154106c70f526651858a7667df93a6aa2.tar.gz
SERVER-46740 establishCursors() must always drain the AsyncRequestsSender::_baton
-rw-r--r--  src/mongo/s/query/establish_cursors.cpp      | 414
-rw-r--r--  src/mongo/s/query/establish_cursors_test.cpp |  50
2 files changed, 247 insertions, 217 deletions
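For orientation before the diff: the patch replaces the inline response loop in establishCursors() with a CursorEstablisher helper that always drains every outstanding response before any failure is surfaced, so the AsyncRequestsSender's baton is never abandoned with pending work. The sketch below is distilled from the new code in this diff; the names match the patch, while the setup of opCtx, executor, nss, remotes, readPref, and retryPolicy is assumed to come from the caller, as in the rewritten establishCursors() near the end of the first file.

// Sketch of the new control flow introduced by this patch (see the rewritten
// establishCursors() at the bottom of establish_cursors.cpp).
CursorEstablisher establisher(opCtx, executor, nss, allowPartialResults);
establisher.sendRequests(readPref, remotes, retryPolicy);  // schedule one request per remote
establisher.waitForResponses();        // drain every response, even after a failure
establisher.checkForFailedRequests();  // schedule _killOperations cleanup and rethrow if needed
auto cursors = establisher.takeCursors();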
diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp
index a4f219ed5c6..a49df8b5dbd 100644
--- a/src/mongo/s/query/establish_cursors.cpp
+++ b/src/mongo/s/query/establish_cursors.cpp
@@ -33,6 +33,8 @@
#include "mongo/s/query/establish_cursors.h"
+#include <set>
+
#include "mongo/client/connpool.h"
#include "mongo/client/remote_command_retry_scheduler.h"
#include "mongo/client/remote_command_targeter.h"
@@ -50,213 +52,259 @@ namespace mongo {
namespace {
-void killOpOnShards(std::shared_ptr<executor::TaskExecutor> executor,
- const NamespaceString& nss,
- std::vector<HostAndPort> remotes,
- const ReadPreferenceSetting& readPref,
- UUID opKey) noexcept {
- try {
- ThreadClient tc("establishCursors cleanup", getGlobalServiceContext());
- auto opCtx = tc->makeOperationContext();
-
- for (auto&& host : remotes) {
- executor::RemoteCommandRequest request(
- host,
- "admin",
- BSON("_killOperations" << 1 << "operationKeys" << BSON_ARRAY(opKey)),
- opCtx.get(),
- executor::RemoteCommandRequestBase::kNoTimeout,
- boost::none,
- executor::RemoteCommandRequestBase::FireAndForgetMode::kOn);
-
- // We do not process the response to the killOperations request (we make a good-faith
- // attempt at cleaning up the cursors, but ignore any returned errors).
- uassertStatusOK(executor->scheduleRemoteCommand(request, [host](auto const& args) {
- if (!args.response.isOK()) {
- LOGV2_DEBUG(4625504,
- 2,
- "killOperations for {remoteHost} failed with {error}",
- "killOperations failed",
- "remoteHost"_attr = host.toString(),
- "error"_attr = args.response);
- return;
- }
- }));
+/**
+ * This class wraps logic for establishing cursors using a MultiStatementTransactionRequestsSender.
+ */
+class CursorEstablisher {
+public:
+ CursorEstablisher(OperationContext* opCtx,
+ std::shared_ptr<executor::TaskExecutor> executor,
+ const NamespaceString& nss,
+ bool allowPartialResults)
+ : _opCtx(opCtx),
+ _executor{std::move(executor)},
+ _nss(nss),
+ _allowPartialResults(allowPartialResults),
+ _opKey{UUID::gen()} {}
+
+ /**
+ * Construct a MultiStatementTransactionRequestsSender and dispatch the requests.
+ */
+ void sendRequests(const ReadPreferenceSetting readPref,
+ const std::vector<std::pair<ShardId, BSONObj>>& remotes,
+ Shard::RetryPolicy retryPolicy);
+
+ /**
+ * Wait for a single response via the RequestSender.
+ */
+ void waitForResponse() noexcept;
+
+ /**
+ * Wait for all responses via the RequestSender.
+ */
+ void waitForResponses() noexcept {
+ while (!_ars->done()) {
+ waitForResponse();
}
- } catch (const AssertionException& ex) {
- LOGV2_DEBUG(4625503,
- 2,
- "Failed to cleanup remote operations: {error}",
- "Failed to cleanup remote operations",
- "error"_attr = ex.toStatus());
}
-}
-
-} // namespace
-std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
- std::shared_ptr<executor::TaskExecutor> executor,
- const NamespaceString& nss,
- const ReadPreferenceSetting readPref,
- const std::vector<std::pair<ShardId, BSONObj>>& remotes,
- bool allowPartialResults,
- Shard::RetryPolicy retryPolicy) {
+ /**
+ * If any request received a non-retriable error response and partial results are not allowed,
+ * cancel any requests that may have succeeded and throw the first such error encountered.
+ */
+ void checkForFailedRequests();
+
+ /**
+ * Take all cursors currently tracked by the CursorEstablisher.
+ */
+ std::vector<RemoteCursor> takeCursors() {
+ return std::exchange(_remoteCursors, {});
+ };
+
+private:
+ void _handleFailure(const AsyncRequestsSender::Response& response, Status status) noexcept;
+ static void _killOpOnShards(ServiceContext* srvCtx,
+ std::shared_ptr<executor::TaskExecutor> executor,
+ OperationKey opKey,
+ std::set<HostAndPort> remotes) noexcept;
+
+ OperationContext* const _opCtx;
+ const std::shared_ptr<executor::TaskExecutor> _executor;
+ const NamespaceString _nss;
+ const bool _allowPartialResults;
+
+ const OperationKey _opKey;
+
+ boost::optional<MultiStatementTransactionRequestsSender> _ars;
+
+ boost::optional<Status> _maybeFailure;
+ std::vector<RemoteCursor> _remoteCursors;
+ std::vector<HostAndPort> _remotesToClean;
+};
+
+void CursorEstablisher::sendRequests(const ReadPreferenceSetting readPref,
+ const std::vector<std::pair<ShardId, BSONObj>>& remotes,
+ Shard::RetryPolicy retryPolicy) {
// Construct the requests
std::vector<AsyncRequestsSender::Request> requests;
- // Generate an OperationKey to attach to each remote request. This will allow us to kill any
- // outstanding requests in case we're interrupted or one of the remotes returns an error. Note
- // that although the opCtx may have an OperationKey set on it already, do not inherit it here
- // because we may target ourselves which implies the same node receiving multiple operations
- // with the same opKey.
+ // Attach our OperationKey to each remote request. This will allow us to kill any outstanding
+ // requests in case we're interrupted or one of the remotes returns an error. Note that although
+ // the opCtx may have an OperationKey set on it already, do not inherit it here because we may
+ // target ourselves which implies the same node receiving multiple operations with the same
+ // opKey.
// TODO SERVER-47261 management of the opKey should move to the ARS.
- auto opKey = UUID::gen();
for (const auto& remote : remotes) {
BSONObjBuilder requestWithOpKey(remote.second);
- opKey.appendToBuilder(&requestWithOpKey, "clientOperationKey");
+ _opKey.appendToBuilder(&requestWithOpKey, "clientOperationKey");
requests.emplace_back(remote.first, requestWithOpKey.obj());
}
- LOGV2_DEBUG(
- 4625502,
- 3,
- "Establishing cursors on {opId} for {numRemotes} remotes with operation key {opKey}",
- "Establishing cursors on remotes",
- "opId"_attr = opCtx->getOpID(),
- "numRemotes"_attr = remotes.size(),
- "opKey"_attr = opKey);
+ LOGV2_DEBUG(4625502,
+ 3,
+ "Establishing cursors on remotes",
+ "opId"_attr = _opCtx->getOpID(),
+ "numRemotes"_attr = remotes.size(),
+ "opKey"_attr = _opKey);
// Send the requests
- MultiStatementTransactionRequestsSender ars(
- opCtx, executor, nss.db().toString(), std::move(requests), readPref, retryPolicy);
-
- std::vector<RemoteCursor> remoteCursors;
+ _ars.emplace(
+ _opCtx, _executor, _nss.db().toString(), std::move(requests), readPref, retryPolicy);
+}
- // Keep track of any remotes which may have an open cursor.
- std::vector<HostAndPort> remotesToClean;
+void CursorEstablisher::waitForResponse() noexcept {
+ auto response = _ars->next();
+ if (response.shardHostAndPort)
+ _remotesToClean.push_back(*response.shardHostAndPort);
try {
- // Get the responses
- while (!ars.done()) {
- auto response = ars.next();
-
- try {
- if (response.shardHostAndPort)
- remotesToClean.push_back(*response.shardHostAndPort);
-
- // Note the shardHostAndPort may not be populated if there was an error, so be sure
- // to do this after parsing the cursor response to ensure the response was ok.
- // Additionally, be careful not to push into 'remoteCursors' until we are sure we
- // have a valid cursor.
- auto cursors = CursorResponse::parseFromBSONMany(
- uassertStatusOK(std::move(response.swResponse)).data);
-
- for (auto& cursor : cursors) {
- if (cursor.isOK()) {
- RemoteCursor remoteCursor;
- remoteCursor.setCursorResponse(std::move(cursor.getValue()));
- remoteCursor.setShardId(std::move(response.shardId));
- remoteCursor.setHostAndPort(*response.shardHostAndPort);
- remoteCursors.push_back(std::move(remoteCursor));
- } else {
- // Remote responded with a failure, do not attempt to clean up.
- remotesToClean.erase(std::remove(remotesToClean.begin(),
- remotesToClean.end(),
- *response.shardHostAndPort));
- }
- }
-
- // Throw if there is any error and then the catch block below will do the cleanup.
- for (auto& cursor : cursors) {
- uassertStatusOK(cursor.getStatus());
- }
- } catch (const AssertionException& ex) {
- // Retriable errors are swallowed if 'allowPartialResults' is true. Targeting shard
- // replica sets can also throw FailedToSatisfyReadPreference, so we swallow it too.
- bool isEligibleException = (isMongosRetriableError(ex.code()) ||
- ex.code() == ErrorCodes::FailedToSatisfyReadPreference);
-
- // Fail if the exception is something other than a retriable or read preference
- // error, or if the 'allowPartialResults' query parameter was not enabled.
- if (!allowPartialResults || !isEligibleException) {
- throw;
- }
- // This exception is eligible to be swallowed. Add an entry with a cursorID of 0, an
- // empty HostAndPort, and which has the 'partialResultsReturned' flag set to true.
- remoteCursors.push_back(
- {response.shardId.toString(), {}, {nss, CursorId{0}, {}, {}, {}, {}, true}});
+ // Note the shardHostAndPort may not be populated if there was an error, so be sure
+ // to do this after parsing the cursor response to ensure the response was ok.
+ // Additionally, be careful not to push into 'remoteCursors' until we are sure we
+ // have a valid cursor, since the error handling path will attempt to clean up
+ // anything in 'remoteCursors'
+ auto responseData = uassertStatusOK(std::move(response.swResponse)).data;
+ auto cursors = CursorResponse::parseFromBSONMany(std::move(responseData));
+
+ bool hadValidCursor = false;
+ for (auto& cursor : cursors) {
+ if (!cursor.isOK()) {
+ _handleFailure(response, cursor.getStatus());
+ continue;
}
+
+ hadValidCursor = true;
+
+ RemoteCursor remoteCursor;
+ remoteCursor.setCursorResponse(std::move(cursor.getValue()));
+ remoteCursor.setShardId(response.shardId);
+ remoteCursor.setHostAndPort(*response.shardHostAndPort);
+ _remoteCursors.emplace_back(std::move(remoteCursor));
}
- return remoteCursors;
+
+ if (response.shardHostAndPort && !hadValidCursor) {
+ // If we never got a valid cursor, we do not need to clean the host.
+ _remotesToClean.pop_back();
+ }
+
} catch (const DBException& ex) {
- // If one of the remotes had an error, we make a best effort to finish retrieving responses
- // for other requests that were already sent.
- try {
- // Do not schedule any new requests.
- ars.stopRetrying();
-
- // Collect responses from all requests that were already sent.
- while (!ars.done()) {
- auto response = ars.next();
-
- if (response.shardHostAndPort)
- remotesToClean.push_back(*response.shardHostAndPort);
-
- if (response.swResponse.isOK()) {
- // Check if the response contains an established cursor, and if so, store it.
- StatusWith<CursorResponse> swCursorResponse =
- CursorResponse::parseFromBSON(response.swResponse.getValue().data);
-
- if (swCursorResponse.isOK()) {
- RemoteCursor cursor;
- cursor.setShardId(std::move(response.shardId));
- cursor.setHostAndPort(*response.shardHostAndPort);
- cursor.setCursorResponse(std::move(swCursorResponse.getValue()));
- remoteCursors.push_back(std::move(cursor));
- } else {
- // Remote responded with a failure, do not attempt to clean up.
- remotesToClean.erase(std::remove(remotesToClean.begin(),
- remotesToClean.end(),
- *response.shardHostAndPort));
- }
- }
- }
+ _handleFailure(response, ex.toStatus());
+ }
+}
+
+void CursorEstablisher::checkForFailedRequests() {
+ if (!_maybeFailure) {
+ // If we saw no failures, there is nothing to do.
+ return;
+ }
+
+ LOGV2(4625501,
+ "Unable to establish remote cursors",
+ "error"_attr = *_maybeFailure,
+ "nRemotes"_attr = _remotesToClean.size());
+
+ if (_remotesToClean.empty()) {
+ // If we don't have any remotes to clean, throw early.
+ uassertStatusOK(*_maybeFailure);
+ }
- LOGV2(4625501,
- "ARS failed with {error}, attempting to clean up {nRemotes} remote operations",
- "ARS failed. Attempting to clean up remote operations",
- "error"_attr = ex.toStatus(),
- "nRemotes"_attr = remotesToClean.size());
-
- // Check whether we have any remote operations to kill.
- if (remotesToClean.size() > 0) {
- // Schedule killOperations against all cursors that were established. Make sure to
- // capture arguments by value since the cleanup work may get scheduled after
- // returning from this function.
- MONGO_COMPILER_VARIABLE_UNUSED auto cbHandle = uassertStatusOK(
- executor->scheduleWork([executor,
- nss,
- readPref,
- remotesToClean{std::move(remotesToClean)},
- opKey{std::move(opKey)}](
- const executor::TaskExecutor::CallbackArgs& args) {
- if (!args.status.isOK()) {
- LOGV2_WARNING(48038,
- "Failed to schedule remote cursor cleanup: {error}",
- "Failed to schedule remote cursor cleanup",
- "error"_attr = args.status);
- return;
- }
- killOpOnShards(
- executor, nss, std::move(remotesToClean), readPref, std::move(opKey));
- }));
+ // Filter out duplicate hosts.
+ auto remotes = std::set<HostAndPort>(_remotesToClean.begin(), _remotesToClean.end());
+
+ // Schedule killOperations against all cursors that were established. Make sure to
+ // capture arguments by value since the cleanup work may get scheduled after
+ // returning from this function.
+ uassertStatusOK(_executor->scheduleWork(
+ [svcCtx = _opCtx->getServiceContext(),
+ executor = _executor,
+ opKey = _opKey,
+ remotes = std::move(remotes)](const executor::TaskExecutor::CallbackArgs& args) mutable {
+ if (!args.status.isOK()) {
+ LOGV2_WARNING(
+ 48038, "Failed to schedule remote cursor cleanup", "error"_attr = args.status);
+ return;
}
- } catch (const DBException&) {
- // Ignore the new error and rethrow the original one.
- }
+ _killOpOnShards(svcCtx, std::move(executor), std::move(opKey), std::move(remotes));
+ }));
+
+ // Throw our failure.
+ uassertStatusOK(*_maybeFailure);
+}
+
+void CursorEstablisher::_handleFailure(const AsyncRequestsSender::Response& response,
+ Status status) noexcept {
+ LOGV2_DEBUG(
+ 4674000, 3, "Experienced a failure while establishing cursors", "error"_attr = status);
+ if (_maybeFailure) {
+ // If we've already failed, just log and move on.
+ return;
+ }
+
+ // Retriable errors are swallowed if '_allowPartialResults' is true. Targeting shard replica
+ // sets can also throw FailedToSatisfyReadPreference, so we swallow it too.
+ bool isEligibleException = (isMongosRetriableError(status.code()) ||
+ status.code() == ErrorCodes::FailedToSatisfyReadPreference);
+ if (_allowPartialResults && isEligibleException) {
+ // This exception is eligible to be swallowed. Add an entry with a cursorID of 0, an
+ // empty HostAndPort, and which has the 'partialResultsReturned' flag set to true.
+ _remoteCursors.push_back(
+ {response.shardId.toString(), {}, {_nss, CursorId{0}, {}, {}, {}, {}, true}});
+ return;
+ }
+
+ // Do not schedule any new requests.
+ _ars->stopRetrying();
+ _maybeFailure = std::move(status);
+}
- throw;
+void CursorEstablisher::_killOpOnShards(ServiceContext* srvCtx,
+ std::shared_ptr<executor::TaskExecutor> executor,
+ OperationKey opKey,
+ std::set<HostAndPort> remotes) noexcept try {
+ ThreadClient tc("establishCursors cleanup", srvCtx);
+ auto opCtx = tc->makeOperationContext();
+
+ for (auto&& host : remotes) {
+ executor::RemoteCommandRequest request(
+ host,
+ "admin",
+ BSON("_killOperations" << 1 << "operationKeys" << BSON_ARRAY(opKey)),
+ opCtx.get(),
+ executor::RemoteCommandRequestBase::kNoTimeout,
+ boost::none,
+ executor::RemoteCommandRequestBase::FireAndForgetMode::kOn);
+
+ // We do not process the response to the killOperations request (we make a good-faith
+ // attempt at cleaning up the cursors, but ignore any returned errors).
+ uassertStatusOK(executor->scheduleRemoteCommand(request, [host](auto const& args) {
+ if (!args.response.isOK()) {
+ LOGV2_DEBUG(4625504,
+ 2,
+ "killOperations failed",
+ "remoteHost"_attr = host.toString(),
+ "error"_attr = args.response);
+ return;
+ }
+ }));
}
+} catch (const AssertionException& ex) {
+ LOGV2_DEBUG(4625503, 2, "Failed to cleanup remote operations", "error"_attr = ex.toStatus());
+}
+
+} // namespace
+
+std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
+ std::shared_ptr<executor::TaskExecutor> executor,
+ const NamespaceString& nss,
+ const ReadPreferenceSetting readPref,
+ const std::vector<std::pair<ShardId, BSONObj>>& remotes,
+ bool allowPartialResults,
+ Shard::RetryPolicy retryPolicy) {
+ auto establisher = CursorEstablisher(opCtx, executor, nss, allowPartialResults);
+ establisher.sendRequests(readPref, remotes, retryPolicy);
+ establisher.waitForResponses();
+ establisher.checkForFailedRequests();
+ return establisher.takeCursors();
}
void killRemoteCursor(OperationContext* opCtx,
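Before moving on to the test changes, a short paraphrase of the failure handling above (not part of the patch): _handleFailure() either swallows an error or records the first one and stops retrying. Using only symbols that appear in the diff, the swallow decision reduces to the predicate sketched below; the helper name shouldSwallow is hypothetical.

// Hypothetical restatement of the decision inside CursorEstablisher::_handleFailure().
// A failed remote is tolerated only when the caller allowed partial results AND the
// error is a mongos-retriable error or a failure to satisfy the read preference.
bool shouldSwallow(const Status& status, bool allowPartialResults) {
    const bool isEligibleException = isMongosRetriableError(status.code()) ||
        status.code() == ErrorCodes::FailedToSatisfyReadPreference;
    return allowPartialResults && isEligibleException;
}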
diff --git a/src/mongo/s/query/establish_cursors_test.cpp b/src/mongo/s/query/establish_cursors_test.cpp
index 5091b44758b..99453cf1282 100644
--- a/src/mongo/s/query/establish_cursors_test.cpp
+++ b/src/mongo/s/query/establish_cursors_test.cpp
@@ -39,6 +39,7 @@
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/query/establish_cursors.h"
#include "mongo/s/sharding_router_test_fixture.h"
+#include "mongo/unittest/barrier.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -169,12 +170,7 @@ TEST_F(EstablishCursorsTest, SingleRemoteInterruptedWhileCommandInFlight) {
{kTestShardIds[0], cmdObj},
};
- // Hang before sending the command but after resolving the host to send it to.
- auto fp = globalFailPointRegistry().find("hangBeforeSchedulingRemoteCommand");
- invariant(fp);
- auto startCount =
- fp->setMode(FailPoint::alwaysOn, 0, BSON("hostAndPort" << kTestShardHosts[0].toString()));
-
+ auto barrier = std::make_shared<unittest::Barrier>(2);
auto future = launchAsync([&] {
ASSERT_THROWS(establishCursors(operationContext(),
executor(),
@@ -183,43 +179,29 @@ TEST_F(EstablishCursorsTest, SingleRemoteInterruptedWhileCommandInFlight) {
remotes,
false), // allowPartialResults
ExceptionFor<ErrorCodes::CursorKilled>);
+ barrier->countDownAndWait();
});
- // Verify that the failpoint is hit.
- fp->waitForTimesEntered(startCount + 1);
-
- // Now interrupt the opCtx which the cursor is running under.
- {
- stdx::lock_guard<Client> lk(*operationContext()->getClient());
- operationContext()->getServiceContext()->killOperation(
- lk, operationContext(), ErrorCodes::CursorKilled);
- }
-
- // Disable the failpoint to enable the ARS to continue. Once interrupted, it will then trigger a
- // killOperations for the two remotes.
- fp->setMode(FailPoint::off);
-
- // The OperationContext was marked as killed before the request was scheduled, however the exact
- // timing of when the interrupt condition is detected is not deterministic in this test.
- // However, since the failpoint is in a position where the remote hostAndPort is resolved, we
- // are guaranteed to get a killOperation for it but we may first see the original remote
- // request.
- auto killOpSeen = false;
onCommand([&](const RemoteCommandRequest& request) {
- if (request.dbname == "admin" && request.cmdObj.hasField("_killOperations")) {
- killOpSeen = true;
- return BSON("ok" << 1);
+ ASSERT_EQ(_nss.coll(), request.cmdObj.firstElement().valueStringData());
+
+ // Now that our "remote" has received the request, interrupt the opCtx which the cursor is
+ // running under.
+ {
+ stdx::lock_guard<Client> lk(*operationContext()->getClient());
+ operationContext()->getServiceContext()->killOperation(
+ lk, operationContext(), ErrorCodes::CursorKilled);
}
- // Otherwise expect the original request and mock the response.
- ASSERT_EQ(_nss.coll(), request.cmdObj.firstElement().valueStringData());
+ // Wait for the kill to take effect, since there is a race between the response and the kill.
+ barrier->countDownAndWait();
+
CursorResponse cursorResponse(_nss, CursorId(123), {});
return cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse);
});
- if (!killOpSeen) {
- expectKillOperations(1);
- }
+ // We were interrupted, so establishCursors is forced to send a killOperations out of paranoia.
+ expectKillOperations(1);
future.default_timed_get();
}
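A closing note on the test rework (an outline, not part of the patch): the failpoint choreography is replaced with a two-party unittest::Barrier, which the diff above uses roughly as follows.

// Rendezvous pattern from the revised test, in outline.
auto barrier = std::make_shared<unittest::Barrier>(2);
// Party 1: the launchAsync() thread reaches the barrier only after
//          establishCursors() has already thrown CursorKilled.
// Party 2: the mocked remote in onCommand() kills the operation and then parks at
//          the barrier, so its mocked response is not delivered until the interrupt
//          has been observed; the resulting killOperations is then expected explicitly.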