diff options
author | Amirsaman Memaripour <amirsaman.memaripour@mongodb.com> | 2023-03-04 15:28:01 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-03-04 16:55:12 +0000 |
commit | ea61d9d8ca4bed703033782c7b4066d87c8fd8b4 (patch) | |
tree | 9deb4f64768098790582de3199d68a2847ac313f /src/mongo/executor | |
parent | fa66df57ed164c7ab87baee5367ef9d858b3487a (diff) | |
download | mongo-ea61d9d8ca4bed703033782c7b4066d87c8fd8b4.tar.gz |
SERVER-73016 Cancel hedged operations for `async_rpc` using `_killOperations`
Diffstat (limited to 'src/mongo/executor')
-rw-r--r-- | src/mongo/executor/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/executor/async_rpc_test_fixture.h | 4 | ||||
-rw-r--r-- | src/mongo/executor/hedged_async_rpc.h | 137 | ||||
-rw-r--r-- | src/mongo/executor/hedged_async_rpc_test.cpp | 159 |
4 files changed, 268 insertions, 33 deletions
diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript index 68636313b57..d8b929cd37b 100644 --- a/src/mongo/executor/SConscript +++ b/src/mongo/executor/SConscript @@ -272,6 +272,7 @@ env.Library( ], LIBDEPS=[ '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/commands/kill_common', '$BUILD_DIR/mongo/idl/generic_args_with_types_idl', '$BUILD_DIR/mongo/rpc/command_status', 'async_rpc_error_info', diff --git a/src/mongo/executor/async_rpc_test_fixture.h b/src/mongo/executor/async_rpc_test_fixture.h index 4b869a05ba3..5de2e5b303a 100644 --- a/src/mongo/executor/async_rpc_test_fixture.h +++ b/src/mongo/executor/async_rpc_test_fixture.h @@ -107,6 +107,10 @@ public: void tearDown() override { _networkTestEnv.reset(); + // We must shutdown and join the executor to ensure there are no tasks running in the + // background as we proceed with tearing down the test environment. + _executor->shutdown(); + _executor->join(); _executor.reset(); _net.reset(); ServiceContextTest::tearDown(); diff --git a/src/mongo/executor/hedged_async_rpc.h b/src/mongo/executor/hedged_async_rpc.h index 417e609f3f8..f7862d6a0cf 100644 --- a/src/mongo/executor/hedged_async_rpc.h +++ b/src/mongo/executor/hedged_async_rpc.h @@ -33,6 +33,7 @@ #include "mongo/base/status.h" #include "mongo/base/status_with.h" #include "mongo/bson/bsonobj.h" +#include "mongo/db/commands/kill_operations_gen.h" #include "mongo/db/operation_context.h" #include "mongo/db/query/cursor_response.h" #include "mongo/db/service_context.h" @@ -43,6 +44,7 @@ #include "mongo/executor/hedging_metrics.h" #include "mongo/executor/remote_command_response.h" #include "mongo/executor/task_executor.h" +#include "mongo/logv2/log.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/mongos_server_parameters_gen.h" #include "mongo/util/assert_util.h" @@ -54,6 +56,8 @@ #include <memory> #include <vector> +#define MONGO_LOGV2_DEFAULT_COMPONENT mongo::logv2::LogComponent::kExecutor + namespace mongo { 
namespace async_rpc { @@ -103,6 +107,33 @@ HedgingMetrics* getHedgingMetrics(ServiceContext* svcCtx) { invariant(hm); return hm; } + +/** + * Schedules a remote `_killOperations` on `exec` (or `baton`) for all targets, aiming to kill any + * operations identified by `opKey`. + */ +void killOperations(ServiceContext* svcCtx, + std::vector<HostAndPort>& targets, + std::shared_ptr<executor::TaskExecutor> exec, + UUID opKey) { + KillOperationsRequest cmd({opKey}); + auto options = std::make_shared<AsyncRPCOptions<KillOperationsRequest>>( + std::move(cmd), + std::move(exec), + CancellationToken::uncancelable(), + std::make_shared<NeverRetryPolicy>(), + GenericArgs()); + for (const auto& target : targets) { + LOGV2_DEBUG(7301601, + 2, + "Sending killOperations to cancel the remote command", + "operationKey"_attr = opKey, + "target"_attr = target); + sendCommand(options, svcCtx, std::make_unique<FixedTargeter>(target)) + .ignoreValue() + .getAsync([](Status) {}); + } +} } // namespace hedging_rpc_details /** @@ -114,6 +145,8 @@ HedgingMetrics* getHedgingMetrics(ServiceContext* svcCtx) { * must be enabled, and multiple hosts must be provided by the targeter. If any of those conditions * is false, then the function will not hedge, and instead will just target the first host in the * vector provided by resolve. + * + * Accepts an optional UUID to be used as `clientOperationKey` for all remote requests. 
*/ template <typename CommandType> SemiFuture<AsyncRPCResponse<typename CommandType::Reply>> sendHedgedCommand( @@ -125,14 +158,42 @@ SemiFuture<AsyncRPCResponse<typename CommandType::Reply>> sendHedgedCommand( std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>(), ReadPreferenceSetting readPref = ReadPreferenceSetting(ReadPreference::PrimaryOnly), GenericArgs genericArgs = GenericArgs(), - BatonHandle baton = nullptr) { + BatonHandle baton = nullptr, + boost::optional<UUID> clientOperationKey = boost::none) { using SingleResponse = AsyncRPCResponse<typename CommandType::Reply>; + invariant(opCtx); + auto svcCtx = opCtx->getServiceContext(); + + if (MONGO_unlikely(clientOperationKey && !genericArgs.stable.getClientOperationKey())) { + genericArgs.stable.setClientOperationKey(*clientOperationKey); + } + // Set up cancellation token to cancel remaining hedged operations. CancellationSource hedgeCancellationToken{token}; auto targetsAttempted = std::make_shared<std::vector<HostAndPort>>(); auto proxyExec = std::make_shared<detail::ProxyingExecutor>(baton, exec); - auto tryBody = [=, targeter = std::move(targeter)] { + auto tryBody = [=, targeter = std::move(targeter)]() mutable { + HedgeOptions opts = getHedgeOptions(CommandType::kCommandName, readPref); + auto operationKey = [&] { + // Check if the caller has provided an operation key, and hedging is not enabled. If so, + // we will attach the caller-provided key to all remote commands sent to resolved + // targets. Note that doing so may have side-effects if the operation is retried: + // cancelling the Nth attempt may impact the (N + 1)th attempt as they share `opKey`. + if (auto& opKey = genericArgs.stable.getClientOperationKey(); + opKey && !opts.isHedgeEnabled) { + return *opKey; + } + + // The caller has not provided an operation key or hedging is enabled, so we generate a + // new `clientOperationKey` for each attempt. The operationKey allows cancelling remote + // operations. 
A new one is generated here to ensure retry attempts are isolated: + // cancelling the Nth attempt does not impact the (N + 1)th attempt. + auto opKey = UUID::gen(); + genericArgs.stable.setClientOperationKey(opKey); + return opKey; + }(); + return targeter->resolve(token) .thenRunOn(proxyExec) .onError([](Status status) -> StatusWith<std::vector<HostAndPort>> { @@ -145,7 +206,6 @@ SemiFuture<AsyncRPCResponse<typename CommandType::Reply>> sendHedgedCommand( "Successful targeting implies there are hosts to target."); *targetsAttempted = targets; - HedgeOptions opts = getHedgeOptions(CommandType::kCommandName, readPref); auto hm = hedging_rpc_details::getHedgingMetrics(getGlobalServiceContext()); if (opts.isHedgeEnabled) { hm->incrementNumTotalOperations(); @@ -193,35 +253,56 @@ SemiFuture<AsyncRPCResponse<typename CommandType::Reply>> sendHedgedCommand( * hedging or there is only 1 target provided. */ return hedging_rpc_details::whenAnyThat( - std::move(requests), [&](StatusWith<SingleResponse> response, size_t index) { - Status commandStatus = response.getStatus(); + std::move(requests), + [&](StatusWith<SingleResponse> response, size_t index) { + Status commandStatus = response.getStatus(); - if (index == 0) { - return true; - } + if (index == 0) { + return true; + } - if (commandStatus.code() == Status::OK()) { - hedging_rpc_details::getHedgingMetrics(getGlobalServiceContext()) - ->incrementNumAdvantageouslyHedgedOperations(); - return true; - } + if (commandStatus.code() == Status::OK()) { + hedging_rpc_details::getHedgingMetrics(getGlobalServiceContext()) + ->incrementNumAdvantageouslyHedgedOperations(); + return true; + } - // TODO SERVER-69592 Account for interior executor shutdown - invariant(commandStatus.code() == ErrorCodes::RemoteCommandExecutionError, - commandStatus.toString()); - boost::optional<Status> remoteErr; - auto extraInfo = commandStatus.extraInfo<AsyncRPCErrorInfo>(); - if (extraInfo->isRemote()) { - remoteErr = 
extraInfo->asRemote().getRemoteCommandResult(); - } + // TODO SERVER-69592 Account for interior executor shutdown + invariant(commandStatus.code() == + ErrorCodes::RemoteCommandExecutionError, + commandStatus.toString()); + boost::optional<Status> remoteErr; + auto extraInfo = commandStatus.extraInfo<AsyncRPCErrorInfo>(); + if (extraInfo->isRemote()) { + remoteErr = extraInfo->asRemote().getRemoteCommandResult(); + } - if (remoteErr && isIgnorableAsHedgeResult(*remoteErr)) { - return false; - } + if (remoteErr && isIgnorableAsHedgeResult(*remoteErr)) { + return false; + } - hedging_rpc_details::getHedgingMetrics(getGlobalServiceContext()) - ->incrementNumAdvantageouslyHedgedOperations(); - return true; + hedging_rpc_details::getHedgingMetrics(getGlobalServiceContext()) + ->incrementNumAdvantageouslyHedgedOperations(); + return true; + }) + .tap([=](const SingleResponse& response) { + // We received a successful response, so we should try to clean state on + // other hosts. Note that there is no guarantee the clean-up happens after + // the target receives the original command. + std::vector<HostAndPort> targets; + std::copy_if(targetsAttempted->begin(), + targetsAttempted->end(), + std::back_inserter(targets), + [&](HostAndPort& h) { return h != response.targetUsed; }); + hedging_rpc_details::killOperations(svcCtx, targets, exec, operationKey); + }) + .tapError([=](const Status&) { + // The hedged operation failed, so we should try to clean state on all + // targets. Note that this is just an attempt and does not guarantee no + // state is leaked, as the clean-up command may be received by a target + // before the original operation. 
+ hedging_rpc_details::killOperations( + svcCtx, *targetsAttempted, exec, operationKey); }); }); }; @@ -266,3 +347,5 @@ SemiFuture<AsyncRPCResponse<typename CommandType::Reply>> sendHedgedCommand( } // namespace async_rpc } // namespace mongo + +#undef MONGO_LOGV2_DEFAULT_COMPONENT diff --git a/src/mongo/executor/hedged_async_rpc_test.cpp b/src/mongo/executor/hedged_async_rpc_test.cpp index 287dd153c6a..de36a80b67c 100644 --- a/src/mongo/executor/hedged_async_rpc_test.cpp +++ b/src/mongo/executor/hedged_async_rpc_test.cpp @@ -51,6 +51,7 @@ #include "mongo/executor/remote_command_request.h" #include "mongo/executor/remote_command_response.h" #include "mongo/s/mongos_server_parameters_gen.h" +#include "mongo/unittest/assert_that.h" #include "mongo/unittest/barrier.h" #include "mongo/unittest/bson_test_util.h" #include "mongo/unittest/unittest.h" @@ -112,7 +113,8 @@ public: std::vector<HostAndPort> hosts, std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>(), GenericArgs genericArgs = GenericArgs(), - BatonHandle bh = nullptr) { + BatonHandle bh = nullptr, + boost::optional<UUID> opKey = boost::none) { // Use a readPreference that's eligible for hedging. ReadPreferenceSetting readPref(ReadPreference::Nearest); readPref.hedgingMode = HedgingMode(); @@ -134,7 +136,15 @@ public: retryPolicy, readPref, genericArgs, - bh); + bh, + opKey); + } + + void ignoreKillOperations() { + onCommand([](const auto& request) { + ASSERT(request.cmdObj["_killOperations"]); + return OkReply().toBSON(); + }); + } const NamespaceString testNS = @@ -231,8 +241,6 @@ TEST_F(HedgedAsyncRPCTest, HelloHedgeRequestWithGenericArgs) { // to BSON. 
GenericArgsAPIV1 genericArgsApiV1; GenericArgsAPIV1Unstable genericArgsUnstable; - const UUID clientOpKey = UUID::gen(); - genericArgsApiV1.setClientOperationKey(clientOpKey); auto configTime = Timestamp(1, 1); genericArgsUnstable.setDollarConfigTime(configTime); @@ -245,7 +253,7 @@ onCommand([&](const auto& request) { ASSERT(request.cmdObj["hello"]); // Confirm that the generic arguments are present in the BSON command object. - ASSERT_EQ(UUID::fromCDR(request.cmdObj["clientOperationKey"].uuid()), clientOpKey); + ASSERT(request.cmdObj["clientOperationKey"]); ASSERT_EQ(request.cmdObj["$configTime"].timestamp(), configTime); return helloReply.toBSON(); }); @@ -331,10 +339,11 @@ TEST_F(HedgedAsyncRPCTest, HedgedAsyncRPCWithRetryPolicy) { // condition for the runner to stop retrying. for (auto i = 0; i <= maxNumRetries; i++) { scheduleRequestAndAdvanceClockForRetry(testPolicy, onCommandFunc, retryDelay); + ignoreKillOperations(); } auto counters = getNetworkInterfaceCounters(); - ASSERT_EQ(counters.succeeded, 4); + ASSERT_EQ(counters.succeeded, 8); // counting both `hello` and `killOperations` ASSERT_EQ(counters.canceled, 0); AsyncRPCResponse<HelloCommandReply> res = resultFuture.get(); @@ -903,6 +912,7 @@ TEST_F(HedgedAsyncRPCTest, DynamicDelayBetweenRetries) { // Advance the clock appropriately based on each retry delay. for (auto i = 0; i < maxNumRetries; i++) { scheduleRequestAndAdvanceClockForRetry(testPolicy, onCommandFunc, retryDelays[i]); + ignoreKillOperations(); } // Schedule a response to the final retry. No need to advance clock since no more // retries should be attempted after this third one. 
@@ -1060,6 +1070,143 @@ TEST_F(HedgedAsyncRPCTest, BatonShutdownExecutorAlive) { ASSERT_EQ(0, retryPolicy->getNumRetriesPerformed()); } + +auto extractUUID(const BSONElement& element) { + return UUID::fromCDR(element.uuid()); +} + +auto getOpKeyFromCommand(const BSONObj& cmdObj) { + return extractUUID(cmdObj["clientOperationKey"]); +} + +TEST_F(HedgedAsyncRPCTest, OperationKeyIsSetByDefault) { + auto future = sendHedgedCommandWithHosts(testFindCmd, kTwoHosts); + ASSERT_DOES_NOT_THROW([&] { + onCommand([&](const auto& request) { + (void)getOpKeyFromCommand(request.cmdObj); + return CursorResponse(testNS, 0LL, {testFirstBatch}) + .toBSON(CursorResponse::ResponseType::InitialResponse); + }); + }()); + auto network = getNetworkInterfaceMock(); + network->enterNetwork(); + network->runReadyNetworkOperations(); + network->exitNetwork(); + + future.get(); +} + +TEST_F(HedgedAsyncRPCTest, UseOperationKeyWhenProvided) { + const auto opKey = UUID::gen(); + auto future = + sendHedgedCommandWithHosts(write_ops::InsertCommandRequest(testNS, {BSON("id" << 1)}), + kTwoHosts, + std::make_shared<NeverRetryPolicy>(), + GenericArgs(), + nullptr, + opKey); + onCommand([&](const auto& request) { + ASSERT_EQ(getOpKeyFromCommand(request.cmdObj), opKey); + return write_ops::InsertCommandReply().toBSON(); + }); + future.get(); +} + +TEST_F(HedgedAsyncRPCTest, RewriteOperationKeyWhenHedging) { + const auto opKey = UUID::gen(); + auto future = sendHedgedCommandWithHosts(testFindCmd, + kTwoHosts, + std::make_shared<NeverRetryPolicy>(), + GenericArgs(), + nullptr, + opKey); + onCommand([&](const auto& request) { + ASSERT_NE(getOpKeyFromCommand(request.cmdObj), opKey); + return CursorResponse(testNS, 0LL, {testFirstBatch}) + .toBSON(CursorResponse::ResponseType::InitialResponse); + }); + future.get(); +} + +TEST_F(HedgedAsyncRPCTest, KillOpsAfterSuccessfulHedgedOperation) { + // We expect all targets to receive a `killOperations`, except for the one used to fulfill the + // hedged operation. 
+ auto future = sendHedgedCommandWithHosts(testFindCmd, kTwoHosts); + + boost::optional<UUID> opKey; + HostAndPort success; // Host that successfully responded to the hedged request. + onCommand([&](const auto& request) { + ASSERT(request.cmdObj["find"]); + success = request.target; + opKey = getOpKeyFromCommand(request.cmdObj); + return CursorResponse(testNS, 0LL, {testFirstBatch}) + .toBSON(CursorResponse::ResponseType::InitialResponse); + }); + ASSERT_TRUE(!!opKey); + + ASSERT_DOES_NOT_THROW(std::move(future).ignoreValue().get();); + + HostAndPort killed; // Host that receives a `killOperations`. + onCommand([&](const auto& request) { + ASSERT(request.cmdObj["_killOperations"]); + killed = request.target; + ASSERT_EQ(extractUUID(request.cmdObj["operationKeys"].Array()[0]), *opKey); + return OkReply().toBSON(); + }); + + // We expect only one `killOperations` to be sent, which should be targeted to the host that did + // not fulfill the hedged operation. + ASSERT_FALSE([&] { + auto network = getNetworkInterfaceMock(); + NetworkInterfaceMock::InNetworkGuard guard(network); + return network->hasReadyRequests(); + }()); + ASSERT_NOT_EQUALS(success, killed); +} + +TEST_F(HedgedAsyncRPCTest, KillOpsAfterFailedHedgedOperation) { + // We expect all targets to receive a `killOperations`. + auto future = sendHedgedCommandWithHosts(testFindCmd, kTwoHosts); + auto network = getNetworkInterfaceMock(); + + auto failCommandAndReturnOpKey = [&]() -> UUID { + auto it = network->getNextReadyRequest(); + const auto& request = it->getRequestOnAny(); + ASSERT(request.cmdObj["find"]); + auto opKey = getOpKeyFromCommand(request.cmdObj); + RemoteCommandResponse rcr(createErrorResponse(ignorableMaxTimeMSExpiredStatus), + Milliseconds(1)); // Arbitrary duration + network->scheduleResponse(it, network->now(), rcr); + return opKey; + }; + + // Fail both operations with an ignorable error response. 
+ const auto opKey = [&] { + NetworkInterfaceMock::InNetworkGuard guard(network); + const auto opKey1 = failCommandAndReturnOpKey(); + const auto opKey2 = failCommandAndReturnOpKey(); + ASSERT_EQ(opKey1, opKey2); + network->runReadyNetworkOperations(); + return opKey1; + }(); + + ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::RemoteCommandExecutionError); + + // Verify that all targets receive a `killOperations`. + std::vector<HostAndPort> attemptedTargets; + for (auto _ : kTwoHosts) { + onCommand([&](const auto& request) { + ASSERT(request.cmdObj["_killOperations"]); + attemptedTargets.push_back(request.target); + ASSERT_EQ(extractUUID(request.cmdObj["operationKeys"].Array()[0]), opKey); + return OkReply().toBSON(); + }); + } + + auto matcher = m::AnyOf(m::ElementsAre(m::Eq(kTwoHosts[0]), m::Eq(kTwoHosts[1])), + m::ElementsAre(m::Eq(kTwoHosts[1]), m::Eq(kTwoHosts[0]))); + ASSERT_THAT(attemptedTargets, matcher); +} } // namespace } // namespace async_rpc } // namespace mongo |