summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGeorge Wangensteen <george.wangensteen@mongodb.com>2022-11-14 22:46:55 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-11-14 23:26:24 +0000
commitc8c87a09e3b07543e7422172cacdb95257c497b3 (patch)
tree2b51cde4c2bbaacb603a41c72b22d6dc5c73f4d9
parent682d0882c0c8b570f5c8cf247af2b806bd1a7299 (diff)
downloadmongo-c8c87a09e3b07543e7422172cacdb95257c497b3.tar.gz
SERVER-71192 Rewrite targeting and executor shutdown errors for hedged async rpc
-rw-r--r--src/mongo/executor/async_rpc.cpp4
-rw-r--r--src/mongo/executor/async_rpc_targeter.h4
-rw-r--r--src/mongo/executor/async_rpc_test.cpp25
-rw-r--r--src/mongo/executor/async_rpc_test_fixture.h22
-rw-r--r--src/mongo/executor/hedged_async_rpc.h38
-rw-r--r--src/mongo/executor/hedged_async_rpc_test.cpp44
6 files changed, 123 insertions, 14 deletions
diff --git a/src/mongo/executor/async_rpc.cpp b/src/mongo/executor/async_rpc.cpp
index f2c591e30d6..3e0a4e81591 100644
--- a/src/mongo/executor/async_rpc.cpp
+++ b/src/mongo/executor/async_rpc.cpp
@@ -58,8 +58,8 @@ public:
.thenRunOn(exec)
.then([dbName, cmdBSON, opCtx, exec = std::move(exec), token](
std::vector<HostAndPort> targets) {
- uassert(ErrorCodes::HostNotFound, "No hosts availables", targets.size() != 0);
-
+ invariant(targets.size(),
+ "Successful targeting implies there are hosts to target.");
executor::RemoteCommandRequestOnAny executorRequest(
targets, dbName.toString(), cmdBSON, rpc::makeEmptyMetadata(), opCtx);
return exec->scheduleRemoteCommandOnAny(executorRequest, token);
diff --git a/src/mongo/executor/async_rpc_targeter.h b/src/mongo/executor/async_rpc_targeter.h
index 8d56ddf1fec..eadac59d168 100644
--- a/src/mongo/executor/async_rpc_targeter.h
+++ b/src/mongo/executor/async_rpc_targeter.h
@@ -51,7 +51,9 @@ public:
/*
* Returns a collection of possible Hosts on which the command may run based on the specific
- * settings (ReadPreference, etc.) of the targeter.
+ * settings (ReadPreference, etc.) of the targeter. Note that if no targets can be found on
+ * which to run the command, the returned future should be set with an error - an empty vector
+ * should never be returned and is treated as a programmer error.
*/
virtual SemiFuture<std::vector<HostAndPort>> resolve(CancellationToken t) = 0;
diff --git a/src/mongo/executor/async_rpc_test.cpp b/src/mongo/executor/async_rpc_test.cpp
index 219612587ed..d325f9dd75e 100644
--- a/src/mongo/executor/async_rpc_test.cpp
+++ b/src/mongo/executor/async_rpc_test.cpp
@@ -468,6 +468,31 @@ TEST_F(AsyncRPCTestFixture, ParseAndSeralizeNoop) {
"RemoteCommandExectionError illegally parsed from bson");
}
+/**
+ * When the targeter returns an error, ensure we rewrite it correctly.
+ */
+TEST_F(AsyncRPCTestFixture, FailedTargeting) {
+ auto targeterFailStatus = Status{ErrorCodes::InternalError, "Fake targeter failure"};
+ auto targeter = std::make_unique<FailingTargeter>(targeterFailStatus);
+ HelloCommand helloCmd;
+ initializeCommand(helloCmd);
+ auto opCtxHolder = makeOperationContext();
+
+ auto resultFuture = sendCommand(
+ helloCmd, opCtxHolder.get(), std::move(targeter), getExecutorPtr(), _cancellationToken);
+
+ auto error = resultFuture.getNoThrow().getStatus();
+ // The error returned by our API should always be RemoteCommandExecutionError
+ ASSERT_EQ(error.code(), ErrorCodes::RemoteCommandExecutionError);
+ // Make sure we can extract the extra error info
+ auto extraInfo = error.extraInfo<AsyncRPCErrorInfo>();
+ ASSERT(extraInfo);
+ // Make sure the extra info indicates the error was local, and that the
+ // local error (which is just a Status) has the correct code.
+ ASSERT(extraInfo->isLocal());
+ ASSERT_EQ(extraInfo->asLocal(), targeterFailStatus);
+}
+
} // namespace
} // namespace async_rpc
} // namespace mongo
diff --git a/src/mongo/executor/async_rpc_test_fixture.h b/src/mongo/executor/async_rpc_test_fixture.h
index 764f954fd5a..58e79f11a92 100644
--- a/src/mongo/executor/async_rpc_test_fixture.h
+++ b/src/mongo/executor/async_rpc_test_fixture.h
@@ -166,4 +166,26 @@ private:
std::shared_ptr<TaskExecutor> _executor;
std::shared_ptr<NetworkInterfaceMock> _net;
};
+
+/**
+ * Targeter for use in tests. Returns a user-configurable error when asked to resolve
+ * targets; the error returned can be set when constructing this type.
+ */
+class FailingTargeter : public Targeter {
+public:
+ FailingTargeter(Status errorToFailWith) : _status{errorToFailWith} {}
+ SemiFuture<std::vector<HostAndPort>> resolve(CancellationToken t) override final {
+ return _status;
+ }
+
+ SemiFuture<void> onRemoteCommandError(HostAndPort h, Status s) override final {
+ return SemiFuture<void>::makeReady();
+ }
+
+ Status getErrorStatus() {
+ return _status;
+ }
+
+ Status _status;
+};
} // namespace mongo::async_rpc
diff --git a/src/mongo/executor/hedged_async_rpc.h b/src/mongo/executor/hedged_async_rpc.h
index e1e45b99cb2..7226d96d352 100644
--- a/src/mongo/executor/hedged_async_rpc.h
+++ b/src/mongo/executor/hedged_async_rpc.h
@@ -121,10 +121,17 @@ SemiFuture<AsyncRPCResponse<typename CommandType::Reply>> sendHedgedCommand(
// Set up cancellation token to cancel remaining hedged operations.
CancellationSource hedgeCancellationToken{token};
auto tryBody = [=, targeter = std::move(targeter)] {
- return targeter->resolve(token).thenRunOn(exec).then(
- [cmd, opCtx, exec, token, hedgeCancellationToken, readPref](
- std::vector<HostAndPort> targets) {
- uassert(ErrorCodes::HostNotFound, "No hosts available.", targets.size() != 0);
+ return targeter->resolve(token)
+ .thenRunOn(exec)
+ .onError([](Status status) -> StatusWith<std::vector<HostAndPort>> {
+ // Targeting error; rewrite it to a RemoteCommandExecutionError and skip
+ // command execution body. We'll retry if the policy indicates to.
+ return Status{AsyncRPCErrorInfo(status), status.reason()};
+ })
+ .then([cmd, opCtx, exec, token, hedgeCancellationToken, readPref](
+ std::vector<HostAndPort> targets) {
+ invariant(targets.size(),
+ "Successful targeting implies there are hosts to target.");
HedgeOptions opts = getHedgeOptions(CommandType::kCommandName, readPref);
@@ -180,8 +187,29 @@ SemiFuture<AsyncRPCResponse<typename CommandType::Reply>> sendHedgedCommand(
})
.withBackoffBetweenIterations(detail::RetryDelayAsBackoff(retryPolicy.get()))
.on(exec, CancellationToken::uncancelable())
- .onCompletion([hedgeCancellationToken](StatusWith<SingleResponse> result) mutable {
+ // We go inline here to intercept executor-shutdown errors and re-write them
+ // so that the API always returns RemoteCommandExecutionError. Additionally,
+ // we need to make sure we cancel outstanding requests.
+ .unsafeToInlineFuture()
+ .onCompletion([hedgeCancellationToken](
+ StatusWith<SingleResponse> result) mutable -> StatusWith<SingleResponse> {
hedgeCancellationToken.cancel();
+ if (!result.isOK()) {
+ auto status = result.getStatus();
+ if (status.code() == ErrorCodes::RemoteCommandExecutionError) {
+ return status;
+ }
+ // The API implementation guarantees that all errors are provided as
+ // RemoteCommandExecutionError, so if we've reached this code, it means that the API
+ // internals were unable to run due to executor shutdown. Today, the only guarantee
+ // we can make about an executor-shutdown error is that it is in the cancellation
+ // category. We dassert that this is the case to make it easy to find errors in the
+ // API implementation's error-handling while still ensuring that we always return
+ // the correct error code in production.
+ dassert(ErrorCodes::isA<ErrorCategory::CancellationError>(status.code()));
+ return Status{AsyncRPCErrorInfo(status),
+ "Remote command execution failed due to executor shutdown"};
+ }
return result;
})
.semi();
diff --git a/src/mongo/executor/hedged_async_rpc_test.cpp b/src/mongo/executor/hedged_async_rpc_test.cpp
index cc39f7830d3..88362165127 100644
--- a/src/mongo/executor/hedged_async_rpc_test.cpp
+++ b/src/mongo/executor/hedged_async_rpc_test.cpp
@@ -65,7 +65,6 @@ using executor::RemoteCommandResponse;
class HedgedAsyncRPCTest : public AsyncRPCTestFixture {
public:
- const std::vector<HostAndPort> kEmptyHosts{};
const std::vector<HostAndPort> kTwoHosts{HostAndPort("FakeHost1", 12345),
HostAndPort("FakeHost2", 12345)};
using TwoHostCallback =
@@ -242,15 +241,48 @@ TEST_F(HedgedAsyncRPCTest, HedgedAsyncRPCWithRetryPolicy) {
}
/**
- * When the targeter returns no hosts, we get a HostNotFound error.
+ * When the targeter returns an error, ensure we rewrite it correctly.
*/
-TEST_F(HedgedAsyncRPCTest, NoShardsFound) {
- HelloCommandReply helloReply = HelloCommandReply(TopologyVersion(OID::gen(), 0));
+TEST_F(HedgedAsyncRPCTest, FailedTargeting) {
HelloCommand helloCmd;
initializeCommand(helloCmd);
- auto resultFuture = sendHedgedCommandWithHosts(testFindCmd, kEmptyHosts);
+ auto opCtx = makeOperationContext();
+ auto targeterFailStatus = Status{ErrorCodes::InternalError, "Fake targeter failure"};
+ auto targeter = std::make_unique<FailingTargeter>(targeterFailStatus);
+
+ auto resultFuture = sendHedgedCommand(helloCmd,
+ opCtx.get(),
+ std::move(targeter),
+ getExecutorPtr(),
+ CancellationToken::uncancelable());
+
+ auto error = resultFuture.getNoThrow().getStatus();
+ ASSERT_EQ(error.code(), ErrorCodes::RemoteCommandExecutionError);
+
+ auto extraInfo = error.extraInfo<AsyncRPCErrorInfo>();
+ ASSERT(extraInfo);
+
+ ASSERT(extraInfo->isLocal());
+ auto localError = extraInfo->asLocal();
+ ASSERT_EQ(localError, targeterFailStatus);
+}
- ASSERT_THROWS_CODE(resultFuture.get(), DBException, ErrorCodes::HostNotFound);
+// Ensure that the sendHedgedCommand correctly returns RemoteCommandExecutionError when the executor
+// is shutdown mid-remote-invocation, and that the executor shutdown error is contained
+// in the error's ExtraInfo.
+TEST_F(HedgedAsyncRPCTest, ExecutorShutdown) {
+ auto resultFuture = sendHedgedCommandWithHosts(testFindCmd, kTwoHosts);
+ getExecutorPtr()->shutdown();
+ auto error = resultFuture.getNoThrow().getStatus();
+ // The error returned by our API should always be RemoteCommandExecutionError
+ ASSERT_EQ(error.code(), ErrorCodes::RemoteCommandExecutionError);
+ // Make sure we can extract the extra error info
+ auto extraInfo = error.extraInfo<AsyncRPCErrorInfo>();
+ ASSERT(extraInfo);
+ // Make sure the extra info indicates the error was local, and that the
+ // local error (which is just a Status) has the correct code.
+ ASSERT(extraInfo->isLocal());
+ ASSERT(ErrorCodes::isA<ErrorCategory::CancellationError>(extraInfo->asLocal()));
}
/**