author     Ben Caimano <ben.caimano@10gen.com>                2021-02-01 19:06:26 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>   2021-02-02 14:45:42 +0000
commit     53ee01d154106c70f526651858a7667df93a6aa2
tree       20eaec668035e0ba1a491eb26213e30655f934d6
parent     fa69befc30de4f5368f82a6343019000f62a47c6
download   mongo-53ee01d154106c70f526651858a7667df93a6aa2.tar.gz
SERVER-46740 establishCursors() must always drain the AsyncRequestsSender::_baton
-rw-r--r--  src/mongo/s/query/establish_cursors.cpp       | 414
-rw-r--r--  src/mongo/s/query/establish_cursors_test.cpp  |  50
2 files changed, 247 insertions, 217 deletions
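
The heart of the change, visible in the diff below, is that establishCursors() no longer abandons its response loop on the first error: a new CursorEstablisher helper drains every outstanding response, remembers the first failure, and only afterwards schedules best-effort cleanup and rethrows. The sketch that follows is illustrative only (Response, Establisher, and the host strings are hypothetical stand-ins, not MongoDB types); it shows the drain-then-check shape of that control flow in self-contained C++.

// Illustrative sketch only (hypothetical types, not MongoDB's API): drain every
// pending response before acting on a failure, so no response is left unconsumed.
#include <iostream>
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

struct Response {
    std::string host;
    bool ok;
    std::string error;
};

class Establisher {
public:
    explicit Establisher(std::vector<Response> pending) : _pending(std::move(pending)) {}

    // Consume every response, even after a failure, remembering only the first error.
    void waitForResponses() {
        for (auto& response : _pending) {
            if (response.ok) {
                _cursorHosts.push_back(response.host);
            } else if (!_firstFailure) {
                _firstFailure = response.error;
            }
        }
    }

    // After draining, clean up whatever was established on a best-effort basis,
    // then surface the first error that was seen.
    void checkForFailedRequests() {
        if (!_firstFailure) {
            return;
        }
        for (const auto& host : _cursorHosts) {
            std::cout << "best-effort cleanup on " << host << "\n";
        }
        throw std::runtime_error(*_firstFailure);
    }

    std::vector<std::string> takeCursors() {
        return std::exchange(_cursorHosts, {});
    }

private:
    std::vector<Response> _pending;
    std::vector<std::string> _cursorHosts;  // Hosts that returned a usable cursor.
    std::optional<std::string> _firstFailure;
};

int main() {
    Establisher establisher({{"shardA:27017", true, ""}, {"shardB:27017", false, "HostUnreachable"}});
    establisher.waitForResponses();
    try {
        establisher.checkForFailedRequests();
    } catch (const std::exception& ex) {
        std::cout << "establishCursors-style failure: " << ex.what() << "\n";
    }
}

The point of the shape is that waitForResponses() never exits early, so every outstanding request is accounted for before checkForFailedRequests() decides what to clean up.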
diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp
index a4f219ed5c6..a49df8b5dbd 100644
--- a/src/mongo/s/query/establish_cursors.cpp
+++ b/src/mongo/s/query/establish_cursors.cpp
@@ -33,6 +33,8 @@
 #include "mongo/s/query/establish_cursors.h"
 
+#include <set>
+
 #include "mongo/client/connpool.h"
 #include "mongo/client/remote_command_retry_scheduler.h"
 #include "mongo/client/remote_command_targeter.h"
@@ -50,213 +52,259 @@ namespace mongo {
 namespace {
 
-void killOpOnShards(std::shared_ptr<executor::TaskExecutor> executor,
-                    const NamespaceString& nss,
-                    std::vector<HostAndPort> remotes,
-                    const ReadPreferenceSetting& readPref,
-                    UUID opKey) noexcept {
-    try {
-        ThreadClient tc("establishCursors cleanup", getGlobalServiceContext());
-        auto opCtx = tc->makeOperationContext();
-
-        for (auto&& host : remotes) {
-            executor::RemoteCommandRequest request(
-                host,
-                "admin",
-                BSON("_killOperations" << 1 << "operationKeys" << BSON_ARRAY(opKey)),
-                opCtx.get(),
-                executor::RemoteCommandRequestBase::kNoTimeout,
-                boost::none,
-                executor::RemoteCommandRequestBase::FireAndForgetMode::kOn);
-
-            // We do not process the response to the killOperations request (we make a good-faith
-            // attempt at cleaning up the cursors, but ignore any returned errors).
-            uassertStatusOK(executor->scheduleRemoteCommand(request, [host](auto const& args) {
-                if (!args.response.isOK()) {
-                    LOGV2_DEBUG(4625504,
-                                2,
-                                "killOperations for {remoteHost} failed with {error}",
-                                "killOperations failed",
-                                "remoteHost"_attr = host.toString(),
-                                "error"_attr = args.response);
-                    return;
-                }
-            }));
+/**
+ * This class wraps logic for establishing cursors using a MultiStatementTransactionRequestsSender.
+ */
+class CursorEstablisher {
+public:
+    CursorEstablisher(OperationContext* opCtx,
+                      std::shared_ptr<executor::TaskExecutor> executor,
+                      const NamespaceString& nss,
+                      bool allowPartialResults)
+        : _opCtx(opCtx),
+          _executor{std::move(executor)},
+          _nss(nss),
+          _allowPartialResults(allowPartialResults),
+          _opKey{UUID::gen()} {}
+
+    /**
+     * Make a RequestSender and thus send requests.
+     */
+    void sendRequests(const ReadPreferenceSetting readPref,
+                      const std::vector<std::pair<ShardId, BSONObj>>& remotes,
+                      Shard::RetryPolicy retryPolicy);
+
+    /**
+     * Wait for a single response via the RequestSender.
+     */
+    void waitForResponse() noexcept;
+
+    /**
+     * Wait for all responses via the RequestSender.
+     */
+    void waitForResponses() noexcept {
+        while (!_ars->done()) {
+            waitForResponse();
         }
-    } catch (const AssertionException& ex) {
-        LOGV2_DEBUG(4625503,
-                    2,
-                    "Failed to cleanup remote operations: {error}",
-                    "Failed to cleanup remote operations",
-                    "error"_attr = ex.toStatus());
     }
-}
-
-}  // namespace
 
-std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
-                                           std::shared_ptr<executor::TaskExecutor> executor,
-                                           const NamespaceString& nss,
-                                           const ReadPreferenceSetting readPref,
-                                           const std::vector<std::pair<ShardId, BSONObj>>& remotes,
-                                           bool allowPartialResults,
-                                           Shard::RetryPolicy retryPolicy) {
+    /**
+     * If any request received a non-retriable error response and partial results are not allowed,
+     * cancel any requests that may have succeeded and throw the first such error encountered.
+     */
+    void checkForFailedRequests();
+
+    /**
+     * Take all cursors currently tracked by the CursorEstablisher.
+     */
+    std::vector<RemoteCursor> takeCursors() {
+        return std::exchange(_remoteCursors, {});
+    };
+
+private:
+    void _handleFailure(const AsyncRequestsSender::Response& response, Status status) noexcept;
+    static void _killOpOnShards(ServiceContext* srvCtx,
+                                std::shared_ptr<executor::TaskExecutor> executor,
+                                OperationKey opKey,
+                                std::set<HostAndPort> remotes) noexcept;
+
+    OperationContext* const _opCtx;
+    const std::shared_ptr<executor::TaskExecutor> _executor;
+    const NamespaceString _nss;
+    const bool _allowPartialResults;
+
+    const OperationKey _opKey;
+
+    boost::optional<MultiStatementTransactionRequestsSender> _ars;
+
+    boost::optional<Status> _maybeFailure;
+    std::vector<RemoteCursor> _remoteCursors;
+    std::vector<HostAndPort> _remotesToClean;
+};
+
+void CursorEstablisher::sendRequests(const ReadPreferenceSetting readPref,
+                                     const std::vector<std::pair<ShardId, BSONObj>>& remotes,
+                                     Shard::RetryPolicy retryPolicy) {
     // Construct the requests
     std::vector<AsyncRequestsSender::Request> requests;
 
-    // Generate an OperationKey to attach to each remote request. This will allow us to kill any
-    // outstanding requests in case we're interrupted or one of the remotes returns an error. Note
-    // that although the opCtx may have an OperationKey set on it already, do not inherit it here
-    // because we may target ourselves which implies the same node receiving multiple operations
-    // with the same opKey.
+    // Attach our OperationKey to each remote request. This will allow us to kill any outstanding
+    // requests in case we're interrupted or one of the remotes returns an error. Note that although
+    // the opCtx may have an OperationKey set on it already, do not inherit it here because we may
+    // target ourselves which implies the same node receiving multiple operations with the same
+    // opKey.
     // TODO SERVER-47261 management of the opKey should move to the ARS.
-    auto opKey = UUID::gen();
     for (const auto& remote : remotes) {
         BSONObjBuilder requestWithOpKey(remote.second);
-        opKey.appendToBuilder(&requestWithOpKey, "clientOperationKey");
+        _opKey.appendToBuilder(&requestWithOpKey, "clientOperationKey");
         requests.emplace_back(remote.first, requestWithOpKey.obj());
     }
 
-    LOGV2_DEBUG(
-        4625502,
-        3,
-        "Establishing cursors on {opId} for {numRemotes} remotes with operation key {opKey}",
-        "Establishing cursors on remotes",
-        "opId"_attr = opCtx->getOpID(),
-        "numRemotes"_attr = remotes.size(),
-        "opKey"_attr = opKey);
+    LOGV2_DEBUG(4625502,
+                3,
+                "Establishing cursors on remotes",
+                "opId"_attr = _opCtx->getOpID(),
+                "numRemotes"_attr = remotes.size(),
+                "opKey"_attr = _opKey);
 
     // Send the requests
-    MultiStatementTransactionRequestsSender ars(
-        opCtx, executor, nss.db().toString(), std::move(requests), readPref, retryPolicy);
-
-    std::vector<RemoteCursor> remoteCursors;
+    _ars.emplace(
+        _opCtx, _executor, _nss.db().toString(), std::move(requests), readPref, retryPolicy);
+}
 
-    // Keep track of any remotes which may have an open cursor.
-    std::vector<HostAndPort> remotesToClean;
+void CursorEstablisher::waitForResponse() noexcept {
+    auto response = _ars->next();
+    if (response.shardHostAndPort)
+        _remotesToClean.push_back(*response.shardHostAndPort);
 
     try {
-        // Get the responses
-        while (!ars.done()) {
-            auto response = ars.next();
-
-            try {
-                if (response.shardHostAndPort)
-                    remotesToClean.push_back(*response.shardHostAndPort);
-
-                // Note the shardHostAndPort may not be populated if there was an error, so be sure
-                // to do this after parsing the cursor response to ensure the response was ok.
-                // Additionally, be careful not to push into 'remoteCursors' until we are sure we
-                // have a valid cursor.
-                auto cursors = CursorResponse::parseFromBSONMany(
-                    uassertStatusOK(std::move(response.swResponse)).data);
-
-                for (auto& cursor : cursors) {
-                    if (cursor.isOK()) {
-                        RemoteCursor remoteCursor;
-                        remoteCursor.setCursorResponse(std::move(cursor.getValue()));
-                        remoteCursor.setShardId(std::move(response.shardId));
-                        remoteCursor.setHostAndPort(*response.shardHostAndPort);
-                        remoteCursors.push_back(std::move(remoteCursor));
-                    } else {
-                        // Remote responded with a failure, do not attempt to clean up.
-                        remotesToClean.erase(std::remove(remotesToClean.begin(),
-                                                         remotesToClean.end(),
-                                                         *response.shardHostAndPort));
-                    }
-                }
-
-                // Throw if there is any error and then the catch block below will do the cleanup.
-                for (auto& cursor : cursors) {
-                    uassertStatusOK(cursor.getStatus());
-                }
-            } catch (const AssertionException& ex) {
-                // Retriable errors are swallowed if 'allowPartialResults' is true. Targeting shard
-                // replica sets can also throw FailedToSatisfyReadPreference, so we swallow it too.
-                bool isEligibleException = (isMongosRetriableError(ex.code()) ||
-                                            ex.code() == ErrorCodes::FailedToSatisfyReadPreference);
-
-                // Fail if the exception is something other than a retriable or read preference
-                // error, or if the 'allowPartialResults' query parameter was not enabled.
-                if (!allowPartialResults || !isEligibleException) {
-                    throw;
-                }
-                // This exception is eligible to be swallowed. Add an entry with a cursorID of 0, an
-                // empty HostAndPort, and which has the 'partialResultsReturned' flag set to true.
-                remoteCursors.push_back(
-                    {response.shardId.toString(), {}, {nss, CursorId{0}, {}, {}, {}, {}, true}});
+        // Note the shardHostAndPort may not be populated if there was an error, so be sure
+        // to do this after parsing the cursor response to ensure the response was ok.
+        // Additionally, be careful not to push into 'remoteCursors' until we are sure we
+        // have a valid cursor, since the error handling path will attempt to clean up
+        // anything in 'remoteCursors'
+        auto responseData = uassertStatusOK(std::move(response.swResponse)).data;
+        auto cursors = CursorResponse::parseFromBSONMany(std::move(responseData));
+
+        bool hadValidCursor = false;
+        for (auto& cursor : cursors) {
+            if (!cursor.isOK()) {
+                _handleFailure(response, cursor.getStatus());
+                continue;
             }
+
+            hadValidCursor = true;
+
+            RemoteCursor remoteCursor;
+            remoteCursor.setCursorResponse(std::move(cursor.getValue()));
+            remoteCursor.setShardId(response.shardId);
+            remoteCursor.setHostAndPort(*response.shardHostAndPort);
+            _remoteCursors.emplace_back(std::move(remoteCursor));
         }
-        return remoteCursors;
+
+        if (response.shardHostAndPort && !hadValidCursor) {
+            // If we never got a valid cursor, we do not need to clean the host.
+            _remotesToClean.pop_back();
+        }
+
     } catch (const DBException& ex) {
-        // If one of the remotes had an error, we make a best effort to finish retrieving responses
-        // for other requests that were already sent.
-        try {
-            // Do not schedule any new requests.
-            ars.stopRetrying();
-
-            // Collect responses from all requests that were already sent.
-            while (!ars.done()) {
-                auto response = ars.next();
-
-                if (response.shardHostAndPort)
-                    remotesToClean.push_back(*response.shardHostAndPort);
-
-                if (response.swResponse.isOK()) {
-                    // Check if the response contains an established cursor, and if so, store it.
-                    StatusWith<CursorResponse> swCursorResponse =
-                        CursorResponse::parseFromBSON(response.swResponse.getValue().data);
-
-                    if (swCursorResponse.isOK()) {
-                        RemoteCursor cursor;
-                        cursor.setShardId(std::move(response.shardId));
-                        cursor.setHostAndPort(*response.shardHostAndPort);
-                        cursor.setCursorResponse(std::move(swCursorResponse.getValue()));
-                        remoteCursors.push_back(std::move(cursor));
-                    } else {
-                        // Remote responded with a failure, do not attempt to clean up.
-                        remotesToClean.erase(std::remove(remotesToClean.begin(),
-                                                         remotesToClean.end(),
-                                                         *response.shardHostAndPort));
-                    }
-                }
-            }
+        _handleFailure(response, ex.toStatus());
+    }
+}
+
+void CursorEstablisher::checkForFailedRequests() {
+    if (!_maybeFailure) {
+        // If we saw no failures, there is nothing to do.
+        return;
+    }
+
+    LOGV2(4625501,
+          "Unable to establish remote cursors",
+          "error"_attr = *_maybeFailure,
+          "nRemotes"_attr = _remotesToClean.size());
+
+    if (_remotesToClean.empty()) {
+        // If we don't have any remotes to clean, throw early.
+        uassertStatusOK(*_maybeFailure);
+    }
 
-            LOGV2(4625501,
-                  "ARS failed with {error}, attempting to clean up {nRemotes} remote operations",
-                  "ARS failed. Attempting to clean up remote operations",
-                  "error"_attr = ex.toStatus(),
-                  "nRemotes"_attr = remotesToClean.size());
-
-            // Check whether we have any remote operations to kill.
-            if (remotesToClean.size() > 0) {
-                // Schedule killOperations against all cursors that were established. Make sure to
-                // capture arguments by value since the cleanup work may get scheduled after
-                // returning from this function.
-                MONGO_COMPILER_VARIABLE_UNUSED auto cbHandle = uassertStatusOK(
-                    executor->scheduleWork([executor,
-                                            nss,
-                                            readPref,
-                                            remotesToClean{std::move(remotesToClean)},
-                                            opKey{std::move(opKey)}](
-                                               const executor::TaskExecutor::CallbackArgs& args) {
-                        if (!args.status.isOK()) {
-                            LOGV2_WARNING(48038,
-                                          "Failed to schedule remote cursor cleanup: {error}",
-                                          "Failed to schedule remote cursor cleanup",
-                                          "error"_attr = args.status);
-                            return;
-                        }
-                        killOpOnShards(
-                            executor, nss, std::move(remotesToClean), readPref, std::move(opKey));
-                    }));
+    // Filter out duplicate hosts.
+    auto remotes = std::set<HostAndPort>(_remotesToClean.begin(), _remotesToClean.end());
+
+    // Schedule killOperations against all cursors that were established. Make sure to
+    // capture arguments by value since the cleanup work may get scheduled after
+    // returning from this function.
+    uassertStatusOK(_executor->scheduleWork(
+        [svcCtx = _opCtx->getServiceContext(),
+         executor = _executor,
+         opKey = _opKey,
+         remotes = std::move(remotes)](const executor::TaskExecutor::CallbackArgs& args) mutable {
+            if (!args.status.isOK()) {
+                LOGV2_WARNING(
+                    48038, "Failed to schedule remote cursor cleanup", "error"_attr = args.status);
+                return;
             }
-        } catch (const DBException&) {
-            // Ignore the new error and rethrow the original one.
-        }
+            _killOpOnShards(svcCtx, std::move(executor), std::move(opKey), std::move(remotes));
+        }));
+
+    // Throw our failure.
+    uassertStatusOK(*_maybeFailure);
+}
+
+void CursorEstablisher::_handleFailure(const AsyncRequestsSender::Response& response,
+                                       Status status) noexcept {
+    LOGV2_DEBUG(
+        4674000, 3, "Experienced a failure while establishing cursors", "error"_attr = status);
+    if (_maybeFailure) {
+        // If we've already failed, just log and move on.
+        return;
+    }
+
+    // Retriable errors are swallowed if '_allowPartialResults' is true. Targeting shard replica
+    // sets can also throw FailedToSatisfyReadPreference, so we swallow it too.
+    bool isEligibleException = (isMongosRetriableError(status.code()) ||
+                                status.code() == ErrorCodes::FailedToSatisfyReadPreference);
+    if (_allowPartialResults && isEligibleException) {
+        // This exception is eligible to be swallowed. Add an entry with a cursorID of 0, an
+        // empty HostAndPort, and which has the 'partialResultsReturned' flag set to true.
+        _remoteCursors.push_back(
+            {response.shardId.toString(), {}, {_nss, CursorId{0}, {}, {}, {}, {}, true}});
+        return;
+    }
+
+    // Do not schedule any new requests.
+    _ars->stopRetrying();
+    _maybeFailure = std::move(status);
+}
 
-        throw;
+void CursorEstablisher::_killOpOnShards(ServiceContext* srvCtx,
+                                        std::shared_ptr<executor::TaskExecutor> executor,
+                                        OperationKey opKey,
+                                        std::set<HostAndPort> remotes) noexcept try {
+    ThreadClient tc("establishCursors cleanup", srvCtx);
+    auto opCtx = tc->makeOperationContext();
+
+    for (auto&& host : remotes) {
+        executor::RemoteCommandRequest request(
+            host,
+            "admin",
+            BSON("_killOperations" << 1 << "operationKeys" << BSON_ARRAY(opKey)),
+            opCtx.get(),
+            executor::RemoteCommandRequestBase::kNoTimeout,
+            boost::none,
+            executor::RemoteCommandRequestBase::FireAndForgetMode::kOn);
+
+        // We do not process the response to the killOperations request (we make a good-faith
+        // attempt at cleaning up the cursors, but ignore any returned errors).
+        uassertStatusOK(executor->scheduleRemoteCommand(request, [host](auto const& args) {
+            if (!args.response.isOK()) {
+                LOGV2_DEBUG(4625504,
+                            2,
+                            "killOperations failed",
+                            "remoteHost"_attr = host.toString(),
+                            "error"_attr = args.response);
+                return;
+            }
+        }));
     }
+} catch (const AssertionException& ex) {
+    LOGV2_DEBUG(4625503, 2, "Failed to cleanup remote operations", "error"_attr = ex.toStatus());
+}
+
+}  // namespace
+
+std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
+                                           std::shared_ptr<executor::TaskExecutor> executor,
+                                           const NamespaceString& nss,
+                                           const ReadPreferenceSetting readPref,
+                                           const std::vector<std::pair<ShardId, BSONObj>>& remotes,
+                                           bool allowPartialResults,
+                                           Shard::RetryPolicy retryPolicy) {
+    auto establisher = CursorEstablisher(opCtx, executor, nss, allowPartialResults);
+    establisher.sendRequests(readPref, remotes, retryPolicy);
+    establisher.waitForResponses();
+    establisher.checkForFailedRequests();
+    return establisher.takeCursors();
 }
 
 void killRemoteCursor(OperationContext* opCtx,
diff --git a/src/mongo/s/query/establish_cursors_test.cpp b/src/mongo/s/query/establish_cursors_test.cpp
index 5091b44758b..99453cf1282 100644
--- a/src/mongo/s/query/establish_cursors_test.cpp
+++ b/src/mongo/s/query/establish_cursors_test.cpp
@@ -39,6 +39,7 @@
 #include "mongo/s/client/shard_registry.h"
 #include "mongo/s/query/establish_cursors.h"
 #include "mongo/s/sharding_router_test_fixture.h"
+#include "mongo/unittest/barrier.h"
 #include "mongo/unittest/unittest.h"
 
 namespace mongo {
@@ -169,12 +170,7 @@ TEST_F(EstablishCursorsTest, SingleRemoteInterruptedWhileCommandInFlight) {
         {kTestShardIds[0], cmdObj},
     };
 
-    // Hang before sending the command but after resolving the host to send it to.
-    auto fp = globalFailPointRegistry().find("hangBeforeSchedulingRemoteCommand");
-    invariant(fp);
-    auto startCount =
-        fp->setMode(FailPoint::alwaysOn, 0, BSON("hostAndPort" << kTestShardHosts[0].toString()));
-
+    auto barrier = std::make_shared<unittest::Barrier>(2);
     auto future = launchAsync([&] {
         ASSERT_THROWS(establishCursors(operationContext(),
                                        executor(),
@@ -183,43 +179,29 @@ TEST_F(EstablishCursorsTest, SingleRemoteInterruptedWhileCommandInFlight) {
                                        remotes,
                                        false),  // allowPartialResults
                       ExceptionFor<ErrorCodes::CursorKilled>);
+        barrier->countDownAndWait();
     });
 
-    // Verify that the failpoint is hit.
-    fp->waitForTimesEntered(startCount + 1);
-
-    // Now interrupt the opCtx which the cursor is running under.
-    {
-        stdx::lock_guard<Client> lk(*operationContext()->getClient());
-        operationContext()->getServiceContext()->killOperation(
-            lk, operationContext(), ErrorCodes::CursorKilled);
-    }
-
-    // Disable the failpoint to enable the ARS to continue. Once interrupted, it will then trigger a
-    // killOperations for the two remotes.
-    fp->setMode(FailPoint::off);
-
-    // The OperationContext was marked as killed before the request was scheduled, however the exact
-    // timing of when the interrupt condition is detected is not deterministic in this test.
-    // However, since the failpoint is in a position where the remote hostAndPort is resolved, we
-    // are guaranteed to get a killOperation for it but we may first see the original remote
-    // request.
-    auto killOpSeen = false;
     onCommand([&](const RemoteCommandRequest& request) {
-        if (request.dbname == "admin" && request.cmdObj.hasField("_killOperations")) {
-            killOpSeen = true;
-            return BSON("ok" << 1);
+        ASSERT_EQ(_nss.coll(), request.cmdObj.firstElement().valueStringData());
+
+        // Now that our "remote" has received the request, interrupt the opCtx which the cursor is
+        // running under.
+        {
+            stdx::lock_guard<Client> lk(*operationContext()->getClient());
+            operationContext()->getServiceContext()->killOperation(
+                lk, operationContext(), ErrorCodes::CursorKilled);
         }
 
-        // Otherwise expect the original request and mock the response.
-        ASSERT_EQ(_nss.coll(), request.cmdObj.firstElement().valueStringData());
+        // Wait for the kill to take since there is a race between response and kill.
+        barrier->countDownAndWait();
+
         CursorResponse cursorResponse(_nss, CursorId(123), {});
         return cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse);
     });
 
-    if (!killOpSeen) {
-        expectKillOperations(1);
-    }
+    // We were interrupted so establishCursors is forced to send a killOperations out of paranoia.
+    expectKillOperations(1);
 
     future.default_timed_get();
 }
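
The test change follows the same theme. Instead of parking the request behind the hangBeforeSchedulingRemoteCommand failpoint, the mocked remote now performs the interrupt itself and synchronizes with the test body through a unittest::Barrier, so the kill is guaranteed to land before the mocked response is consumed. A rough stand-in for that rendezvous, using C++20's std::barrier rather than MongoDB's test barrier (the names and roles here are illustrative, not the actual test fixture), might look like this:

// Illustrative, simplified stand-in for the barrier-based synchronization in the
// updated test: the mocked remote interrupts the operation, and the rendezvous
// guarantees the interrupt is observed before the remote's response is handed back.
// Requires -std=c++20 for <barrier>.
#include <atomic>
#include <barrier>
#include <iostream>
#include <thread>

int main() {
    std::atomic<bool> operationKilled{false};
    std::barrier rendezvous(2);

    // Plays the role of the launchAsync() task running establishCursors().
    std::thread client([&] {
        while (!operationKilled.load()) {
            std::this_thread::yield();  // Stand-in for being interrupted while waiting.
        }
        std::cout << "client: operation interrupted, unblocking the remote\n";
        rendezvous.arrive_and_wait();
    });

    // Plays the role of the onCommand() handler for the mocked remote.
    operationKilled.store(true);   // Stand-in for killOperation(..., CursorKilled).
    rendezvous.arrive_and_wait();  // Wait for the kill to take effect first.
    std::cout << "remote: now safe to return the mocked cursor response\n";

    client.join();
}

Because the interrupt is published before either side passes the rendezvous, the client is certain to observe it before the response is handed back, which is exactly the race the barrier removes in the test.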