summaryrefslogtreecommitdiff
path: root/src/mongo/executor
diff options
context:
space:
mode:
authorAmirsaman Memaripour <amirsaman.memaripour@mongodb.com>2023-03-04 15:28:01 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-03-04 16:55:12 +0000
commitea61d9d8ca4bed703033782c7b4066d87c8fd8b4 (patch)
tree9deb4f64768098790582de3199d68a2847ac313f /src/mongo/executor
parentfa66df57ed164c7ab87baee5367ef9d858b3487a (diff)
downloadmongo-ea61d9d8ca4bed703033782c7b4066d87c8fd8b4.tar.gz
SERVER-73016 Cancel hedged operations for `async_rpc` using `_killOperations`
Diffstat (limited to 'src/mongo/executor')
-rw-r--r--src/mongo/executor/SConscript1
-rw-r--r--src/mongo/executor/async_rpc_test_fixture.h4
-rw-r--r--src/mongo/executor/hedged_async_rpc.h137
-rw-r--r--src/mongo/executor/hedged_async_rpc_test.cpp159
4 files changed, 268 insertions, 33 deletions
diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript
index 68636313b57..d8b929cd37b 100644
--- a/src/mongo/executor/SConscript
+++ b/src/mongo/executor/SConscript
@@ -272,6 +272,7 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/commands/kill_common',
'$BUILD_DIR/mongo/idl/generic_args_with_types_idl',
'$BUILD_DIR/mongo/rpc/command_status',
'async_rpc_error_info',
diff --git a/src/mongo/executor/async_rpc_test_fixture.h b/src/mongo/executor/async_rpc_test_fixture.h
index 4b869a05ba3..5de2e5b303a 100644
--- a/src/mongo/executor/async_rpc_test_fixture.h
+++ b/src/mongo/executor/async_rpc_test_fixture.h
@@ -107,6 +107,10 @@ public:
void tearDown() override {
_networkTestEnv.reset();
+ // We must shutdown and join the executor to ensure there are no tasks running in the
+ // background as we proceed with tearing down the test environment.
+ _executor->shutdown();
+ _executor->join();
_executor.reset();
_net.reset();
ServiceContextTest::tearDown();
diff --git a/src/mongo/executor/hedged_async_rpc.h b/src/mongo/executor/hedged_async_rpc.h
index 417e609f3f8..f7862d6a0cf 100644
--- a/src/mongo/executor/hedged_async_rpc.h
+++ b/src/mongo/executor/hedged_async_rpc.h
@@ -33,6 +33,7 @@
#include "mongo/base/status.h"
#include "mongo/base/status_with.h"
#include "mongo/bson/bsonobj.h"
+#include "mongo/db/commands/kill_operations_gen.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/service_context.h"
@@ -43,6 +44,7 @@
#include "mongo/executor/hedging_metrics.h"
#include "mongo/executor/remote_command_response.h"
#include "mongo/executor/task_executor.h"
+#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/mongos_server_parameters_gen.h"
#include "mongo/util/assert_util.h"
@@ -54,6 +56,8 @@
#include <memory>
#include <vector>
+#define MONGO_LOGV2_DEFAULT_COMPONENT mongo::logv2::LogComponent::kExecutor
+
namespace mongo {
namespace async_rpc {
@@ -103,6 +107,33 @@ HedgingMetrics* getHedgingMetrics(ServiceContext* svcCtx) {
invariant(hm);
return hm;
}
+
+/**
+ * Schedules a remote `_killOperations` on `exec` (or `baton`) for all targets, aiming to kill any
+ * operations identified by `opKey`.
+ */
+void killOperations(ServiceContext* svcCtx,
+ std::vector<HostAndPort>& targets,
+ std::shared_ptr<executor::TaskExecutor> exec,
+ UUID opKey) {
+ KillOperationsRequest cmd({opKey});
+ auto options = std::make_shared<AsyncRPCOptions<KillOperationsRequest>>(
+ std::move(cmd),
+ std::move(exec),
+ CancellationToken::uncancelable(),
+ std::make_shared<NeverRetryPolicy>(),
+ GenericArgs());
+ for (const auto& target : targets) {
+ LOGV2_DEBUG(7301601,
+ 2,
+ "Sending killOperations to cancel the remote command",
+ "operationKey"_attr = opKey,
+ "target"_attr = target);
+ sendCommand(options, svcCtx, std::make_unique<FixedTargeter>(target))
+ .ignoreValue()
+ .getAsync([](Status) {});
+ }
+}
} // namespace hedging_rpc_details
/**
@@ -114,6 +145,8 @@ HedgingMetrics* getHedgingMetrics(ServiceContext* svcCtx) {
* must be enabled, and multiple hosts must be provided by the targeter. If any of those conditions
* is false, then the function will not hedge, and instead will just target the first host in the
* vector provided by resolve.
+ *
+ * Accepts an optional UUID to be used as `clientOperationKey` for all remote requests.
*/
template <typename CommandType>
SemiFuture<AsyncRPCResponse<typename CommandType::Reply>> sendHedgedCommand(
@@ -125,14 +158,42 @@ SemiFuture<AsyncRPCResponse<typename CommandType::Reply>> sendHedgedCommand(
std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>(),
ReadPreferenceSetting readPref = ReadPreferenceSetting(ReadPreference::PrimaryOnly),
GenericArgs genericArgs = GenericArgs(),
- BatonHandle baton = nullptr) {
+ BatonHandle baton = nullptr,
+ boost::optional<UUID> clientOperationKey = boost::none) {
using SingleResponse = AsyncRPCResponse<typename CommandType::Reply>;
+ invariant(opCtx);
+ auto svcCtx = opCtx->getServiceContext();
+
+ if (MONGO_unlikely(clientOperationKey && !genericArgs.stable.getClientOperationKey())) {
+ genericArgs.stable.setClientOperationKey(*clientOperationKey);
+ }
+
// Set up cancellation token to cancel remaining hedged operations.
CancellationSource hedgeCancellationToken{token};
auto targetsAttempted = std::make_shared<std::vector<HostAndPort>>();
auto proxyExec = std::make_shared<detail::ProxyingExecutor>(baton, exec);
- auto tryBody = [=, targeter = std::move(targeter)] {
+ auto tryBody = [=, targeter = std::move(targeter)]() mutable {
+ HedgeOptions opts = getHedgeOptions(CommandType::kCommandName, readPref);
+ auto operationKey = [&] {
+ // Check if the caller has provided an operation key, and hedging is not enabled. If so,
+ // we will attach the caller-provided key to all remote commands sent to resolved
+ // targets. Note that doing so may have side-effects if the operation is retried:
+ // cancelling the Nth attempt may impact the (N + 1)th attempt as they share `opKey`.
+ if (auto& opKey = genericArgs.stable.getClientOperationKey();
+ opKey && !opts.isHedgeEnabled) {
+ return *opKey;
+ }
+
+ // The caller has not provided an operation key or hedging is enabled, so we generate a
+ // new `clientOperationKey` for each attempt. The operationKey allows cancelling remote
+ // operations. A new one is generated here to ensure retry attempts are isolated:
+ // cancelling the Nth attempt does not impact the (N + 1)th attempt.
+ auto opKey = UUID::gen();
+ genericArgs.stable.setClientOperationKey(opKey);
+ return opKey;
+ }();
+
return targeter->resolve(token)
.thenRunOn(proxyExec)
.onError([](Status status) -> StatusWith<std::vector<HostAndPort>> {
@@ -145,7 +206,6 @@ SemiFuture<AsyncRPCResponse<typename CommandType::Reply>> sendHedgedCommand(
"Successful targeting implies there are hosts to target.");
*targetsAttempted = targets;
- HedgeOptions opts = getHedgeOptions(CommandType::kCommandName, readPref);
auto hm = hedging_rpc_details::getHedgingMetrics(getGlobalServiceContext());
if (opts.isHedgeEnabled) {
hm->incrementNumTotalOperations();
@@ -193,35 +253,56 @@ SemiFuture<AsyncRPCResponse<typename CommandType::Reply>> sendHedgedCommand(
* hedging or there is only 1 target provided.
*/
return hedging_rpc_details::whenAnyThat(
- std::move(requests), [&](StatusWith<SingleResponse> response, size_t index) {
- Status commandStatus = response.getStatus();
+ std::move(requests),
+ [&](StatusWith<SingleResponse> response, size_t index) {
+ Status commandStatus = response.getStatus();
- if (index == 0) {
- return true;
- }
+ if (index == 0) {
+ return true;
+ }
- if (commandStatus.code() == Status::OK()) {
- hedging_rpc_details::getHedgingMetrics(getGlobalServiceContext())
- ->incrementNumAdvantageouslyHedgedOperations();
- return true;
- }
+ if (commandStatus.code() == Status::OK()) {
+ hedging_rpc_details::getHedgingMetrics(getGlobalServiceContext())
+ ->incrementNumAdvantageouslyHedgedOperations();
+ return true;
+ }
- // TODO SERVER-69592 Account for interior executor shutdown
- invariant(commandStatus.code() == ErrorCodes::RemoteCommandExecutionError,
- commandStatus.toString());
- boost::optional<Status> remoteErr;
- auto extraInfo = commandStatus.extraInfo<AsyncRPCErrorInfo>();
- if (extraInfo->isRemote()) {
- remoteErr = extraInfo->asRemote().getRemoteCommandResult();
- }
+ // TODO SERVER-69592 Account for interior executor shutdown
+ invariant(commandStatus.code() ==
+ ErrorCodes::RemoteCommandExecutionError,
+ commandStatus.toString());
+ boost::optional<Status> remoteErr;
+ auto extraInfo = commandStatus.extraInfo<AsyncRPCErrorInfo>();
+ if (extraInfo->isRemote()) {
+ remoteErr = extraInfo->asRemote().getRemoteCommandResult();
+ }
- if (remoteErr && isIgnorableAsHedgeResult(*remoteErr)) {
- return false;
- }
+ if (remoteErr && isIgnorableAsHedgeResult(*remoteErr)) {
+ return false;
+ }
- hedging_rpc_details::getHedgingMetrics(getGlobalServiceContext())
- ->incrementNumAdvantageouslyHedgedOperations();
- return true;
+ hedging_rpc_details::getHedgingMetrics(getGlobalServiceContext())
+ ->incrementNumAdvantageouslyHedgedOperations();
+ return true;
+ })
+ .tap([=](const SingleResponse& response) {
+ // We received a successful response, so we should try to clean state on
+ // other hosts. Note that there is no guarantee the clean-up happens after
+ // the target receives the original command.
+ std::vector<HostAndPort> targets;
+ std::copy_if(targetsAttempted->begin(),
+ targetsAttempted->end(),
+ std::back_inserter(targets),
+ [&](HostAndPort& h) { return h != response.targetUsed; });
+ hedging_rpc_details::killOperations(svcCtx, targets, exec, operationKey);
+ })
+ .tapError([=](const Status&) {
+ // The hedged operation failed, so we should try to clean state on all
+ // targets. Note that this is just an attempt and does not guarantee no
+ // state is leaked, as the clean-up command may be received by a target
+ before the original operation.
+ hedging_rpc_details::killOperations(
+ svcCtx, *targetsAttempted, exec, operationKey);
});
});
};
@@ -266,3 +347,5 @@ SemiFuture<AsyncRPCResponse<typename CommandType::Reply>> sendHedgedCommand(
} // namespace async_rpc
} // namespace mongo
+
+#undef MONGO_LOGV2_DEFAULT_COMPONENT
diff --git a/src/mongo/executor/hedged_async_rpc_test.cpp b/src/mongo/executor/hedged_async_rpc_test.cpp
index 287dd153c6a..de36a80b67c 100644
--- a/src/mongo/executor/hedged_async_rpc_test.cpp
+++ b/src/mongo/executor/hedged_async_rpc_test.cpp
@@ -51,6 +51,7 @@
#include "mongo/executor/remote_command_request.h"
#include "mongo/executor/remote_command_response.h"
#include "mongo/s/mongos_server_parameters_gen.h"
+#include "mongo/unittest/assert_that.h"
#include "mongo/unittest/barrier.h"
#include "mongo/unittest/bson_test_util.h"
#include "mongo/unittest/unittest.h"
@@ -112,7 +113,8 @@ public:
std::vector<HostAndPort> hosts,
std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>(),
GenericArgs genericArgs = GenericArgs(),
- BatonHandle bh = nullptr) {
+ BatonHandle bh = nullptr,
+ boost::optional<UUID> opKey = boost::none) {
// Use a readPreference that's eligible for hedging.
ReadPreferenceSetting readPref(ReadPreference::Nearest);
readPref.hedgingMode = HedgingMode();
@@ -134,7 +136,15 @@ public:
retryPolicy,
readPref,
genericArgs,
- bh);
+ bh,
+ opKey);
+ }
+
+ void ignoreKillOperations() {
+ onCommand([](const auto& request) {
+ ASSERT(request.cmdObj["_killOperations"]);
+ return OkReply().toBSON();
+ });
}
const NamespaceString testNS =
@@ -231,8 +241,6 @@ TEST_F(HedgedAsyncRPCTest, HelloHedgeRequestWithGenericArgs) {
// to BSON.
GenericArgsAPIV1 genericArgsApiV1;
GenericArgsAPIV1Unstable genericArgsUnstable;
- const UUID clientOpKey = UUID::gen();
- genericArgsApiV1.setClientOperationKey(clientOpKey);
auto configTime = Timestamp(1, 1);
genericArgsUnstable.setDollarConfigTime(configTime);
@@ -245,7 +253,7 @@ TEST_F(HedgedAsyncRPCTest, HelloHedgeRequestWithGenericArgs) {
onCommand([&](const auto& request) {
ASSERT(request.cmdObj["hello"]);
// Confirm that the generic arguments are present in the BSON command object.
- ASSERT_EQ(UUID::fromCDR(request.cmdObj["clientOperationKey"].uuid()), clientOpKey);
+ ASSERT(request.cmdObj["clientOperationKey"]);
ASSERT_EQ(request.cmdObj["$configTime"].timestamp(), configTime);
return helloReply.toBSON();
});
@@ -331,10 +339,11 @@ TEST_F(HedgedAsyncRPCTest, HedgedAsyncRPCWithRetryPolicy) {
// condition for the runner to stop retrying.
for (auto i = 0; i <= maxNumRetries; i++) {
scheduleRequestAndAdvanceClockForRetry(testPolicy, onCommandFunc, retryDelay);
+ ignoreKillOperations();
}
auto counters = getNetworkInterfaceCounters();
- ASSERT_EQ(counters.succeeded, 4);
+ ASSERT_EQ(counters.succeeded, 8); // counting both `hello` and `killOperations`
ASSERT_EQ(counters.canceled, 0);
AsyncRPCResponse<HelloCommandReply> res = resultFuture.get();
@@ -903,6 +912,7 @@ TEST_F(HedgedAsyncRPCTest, DynamicDelayBetweenRetries) {
// Advance the clock appropriately based on each retry delay.
for (auto i = 0; i < maxNumRetries; i++) {
scheduleRequestAndAdvanceClockForRetry(testPolicy, onCommandFunc, retryDelays[i]);
+ ignoreKillOperations();
}
// Schedule a response to the final retry. No need to advance clock since no more
retries should be attempted after this third one.
@@ -1060,6 +1070,143 @@ TEST_F(HedgedAsyncRPCTest, BatonShutdownExecutorAlive) {
ASSERT_EQ(0, retryPolicy->getNumRetriesPerformed());
}
+
+auto extractUUID(const BSONElement& element) {
+ return UUID::fromCDR(element.uuid());
+}
+
+auto getOpKeyFromCommand(const BSONObj& cmdObj) {
+ return extractUUID(cmdObj["clientOperationKey"]);
+}
+
+TEST_F(HedgedAsyncRPCTest, OperationKeyIsSetByDefault) {
+ auto future = sendHedgedCommandWithHosts(testFindCmd, kTwoHosts);
+ ASSERT_DOES_NOT_THROW([&] {
+ onCommand([&](const auto& request) {
+ (void)getOpKeyFromCommand(request.cmdObj);
+ return CursorResponse(testNS, 0LL, {testFirstBatch})
+ .toBSON(CursorResponse::ResponseType::InitialResponse);
+ });
+ }());
+ auto network = getNetworkInterfaceMock();
+ network->enterNetwork();
+ network->runReadyNetworkOperations();
+ network->exitNetwork();
+
+ future.get();
+}
+
+TEST_F(HedgedAsyncRPCTest, UseOperationKeyWhenProvided) {
+ const auto opKey = UUID::gen();
+ auto future =
+ sendHedgedCommandWithHosts(write_ops::InsertCommandRequest(testNS, {BSON("id" << 1)}),
+ kTwoHosts,
+ std::make_shared<NeverRetryPolicy>(),
+ GenericArgs(),
+ nullptr,
+ opKey);
+ onCommand([&](const auto& request) {
+ ASSERT_EQ(getOpKeyFromCommand(request.cmdObj), opKey);
+ return write_ops::InsertCommandReply().toBSON();
+ });
+ future.get();
+}
+
+TEST_F(HedgedAsyncRPCTest, RewriteOperationKeyWhenHedging) {
+ const auto opKey = UUID::gen();
+ auto future = sendHedgedCommandWithHosts(testFindCmd,
+ kTwoHosts,
+ std::make_shared<NeverRetryPolicy>(),
+ GenericArgs(),
+ nullptr,
+ opKey);
+ onCommand([&](const auto& request) {
+ ASSERT_NE(getOpKeyFromCommand(request.cmdObj), opKey);
+ return CursorResponse(testNS, 0LL, {testFirstBatch})
+ .toBSON(CursorResponse::ResponseType::InitialResponse);
+ });
+ future.get();
+}
+
+TEST_F(HedgedAsyncRPCTest, KillOpsAfterSuccessfulHedgedOperation) {
+ // We expect all targets to receive a `killOperations`, except for the one used to fulfill the
+ // hedged operation.
+ auto future = sendHedgedCommandWithHosts(testFindCmd, kTwoHosts);
+
+ boost::optional<UUID> opKey;
+ HostAndPort success; // Host that successfully responded to the hedged request.
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["find"]);
+ success = request.target;
+ opKey = getOpKeyFromCommand(request.cmdObj);
+ return CursorResponse(testNS, 0LL, {testFirstBatch})
+ .toBSON(CursorResponse::ResponseType::InitialResponse);
+ });
+ ASSERT_TRUE(!!opKey);
+
+ ASSERT_DOES_NOT_THROW(std::move(future).ignoreValue().get(););
+
+ HostAndPort killed; // Host that receives a `killOperations`.
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["_killOperations"]);
+ killed = request.target;
+ ASSERT_EQ(extractUUID(request.cmdObj["operationKeys"].Array()[0]), *opKey);
+ return OkReply().toBSON();
+ });
+
+ // We expect only one `killOperations` to be sent, which should be targeted to the host that did
+ // not fulfill the hedged operation.
+ ASSERT_FALSE([&] {
+ auto network = getNetworkInterfaceMock();
+ NetworkInterfaceMock::InNetworkGuard guard(network);
+ return network->hasReadyRequests();
+ }());
+ ASSERT_NOT_EQUALS(success, killed);
+}
+
+TEST_F(HedgedAsyncRPCTest, KillOpsAfterFailedHedgedOperation) {
+ // We expect all targets to receive a `killOperations`.
+ auto future = sendHedgedCommandWithHosts(testFindCmd, kTwoHosts);
+ auto network = getNetworkInterfaceMock();
+
+ auto failCommandAndReturnOpKey = [&]() -> UUID {
+ auto it = network->getNextReadyRequest();
+ const auto& request = it->getRequestOnAny();
+ ASSERT(request.cmdObj["find"]);
+ auto opKey = getOpKeyFromCommand(request.cmdObj);
+ RemoteCommandResponse rcr(createErrorResponse(ignorableMaxTimeMSExpiredStatus),
+ Milliseconds(1)); // Arbitrary duration
+ network->scheduleResponse(it, network->now(), rcr);
+ return opKey;
+ };
+
+ // Fail both operations with an ignorable error response.
+ const auto opKey = [&] {
+ NetworkInterfaceMock::InNetworkGuard guard(network);
+ const auto opKey1 = failCommandAndReturnOpKey();
+ const auto opKey2 = failCommandAndReturnOpKey();
+ ASSERT_EQ(opKey1, opKey2);
+ network->runReadyNetworkOperations();
+ return opKey1;
+ }();
+
+ ASSERT_THROWS_CODE(future.get(), DBException, ErrorCodes::RemoteCommandExecutionError);
+
+ // Verify that all targets receive a `killOperations`.
+ std::vector<HostAndPort> attemptedTargets;
+ for (auto _ : kTwoHosts) {
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["_killOperations"]);
+ attemptedTargets.push_back(request.target);
+ ASSERT_EQ(extractUUID(request.cmdObj["operationKeys"].Array()[0]), opKey);
+ return OkReply().toBSON();
+ });
+ }
+
+ auto matcher = m::AnyOf(m::ElementsAre(m::Eq(kTwoHosts[0]), m::Eq(kTwoHosts[1])),
+ m::ElementsAre(m::Eq(kTwoHosts[1]), m::Eq(kTwoHosts[0])));
+ ASSERT_THAT(attemptedTargets, matcher);
+}
} // namespace
} // namespace async_rpc
} // namespace mongo