diff options
author | George Wangensteen <george.wangensteen@mongodb.com> | 2022-11-14 22:46:55 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-11-14 23:26:24 +0000 |
commit | c8c87a09e3b07543e7422172cacdb95257c497b3 (patch) | |
tree | 2b51cde4c2bbaacb603a41c72b22d6dc5c73f4d9 | |
parent | 682d0882c0c8b570f5c8cf247af2b806bd1a7299 (diff) | |
download | mongo-c8c87a09e3b07543e7422172cacdb95257c497b3.tar.gz |
SERVER-71192 Rewrite targeting and executor shutdown errors for hedged async rpc
-rw-r--r-- | src/mongo/executor/async_rpc.cpp | 4 | ||||
-rw-r--r-- | src/mongo/executor/async_rpc_targeter.h | 4 | ||||
-rw-r--r-- | src/mongo/executor/async_rpc_test.cpp | 25 | ||||
-rw-r--r-- | src/mongo/executor/async_rpc_test_fixture.h | 22 | ||||
-rw-r--r-- | src/mongo/executor/hedged_async_rpc.h | 38 | ||||
-rw-r--r-- | src/mongo/executor/hedged_async_rpc_test.cpp | 44 |
6 files changed, 123 insertions, 14 deletions
diff --git a/src/mongo/executor/async_rpc.cpp b/src/mongo/executor/async_rpc.cpp index f2c591e30d6..3e0a4e81591 100644 --- a/src/mongo/executor/async_rpc.cpp +++ b/src/mongo/executor/async_rpc.cpp @@ -58,8 +58,8 @@ public: .thenRunOn(exec) .then([dbName, cmdBSON, opCtx, exec = std::move(exec), token]( std::vector<HostAndPort> targets) { - uassert(ErrorCodes::HostNotFound, "No hosts availables", targets.size() != 0); - + invariant(targets.size(), + "Successful targeting implies there are hosts to target."); executor::RemoteCommandRequestOnAny executorRequest( targets, dbName.toString(), cmdBSON, rpc::makeEmptyMetadata(), opCtx); return exec->scheduleRemoteCommandOnAny(executorRequest, token); diff --git a/src/mongo/executor/async_rpc_targeter.h b/src/mongo/executor/async_rpc_targeter.h index 8d56ddf1fec..eadac59d168 100644 --- a/src/mongo/executor/async_rpc_targeter.h +++ b/src/mongo/executor/async_rpc_targeter.h @@ -51,7 +51,9 @@ public: /* * Returns a collection of possible Hosts on which the command may run based on the specific - * settings (ReadPreference, etc.) of the targeter. + * settings (ReadPreference, etc.) of the targeter. Note that if no targets can be found on + * which to run the command, the returned future should be set with an error - an empty vector + * should never be returned and is treated as a programmer error. */ virtual SemiFuture<std::vector<HostAndPort>> resolve(CancellationToken t) = 0; diff --git a/src/mongo/executor/async_rpc_test.cpp b/src/mongo/executor/async_rpc_test.cpp index 219612587ed..d325f9dd75e 100644 --- a/src/mongo/executor/async_rpc_test.cpp +++ b/src/mongo/executor/async_rpc_test.cpp @@ -468,6 +468,31 @@ TEST_F(AsyncRPCTestFixture, ParseAndSeralizeNoop) { "RemoteCommandExectionError illegally parsed from bson"); } +/** + * When the targeter returns an error, ensure we rewrite it correctly. + */ +TEST_F(AsyncRPCTestFixture, FailedTargeting) { + auto targeterFailStatus = Status{ErrorCodes::InternalError, "Fake targeter failure"}; + auto targeter = std::make_unique<FailingTargeter>(targeterFailStatus); + HelloCommand helloCmd; + initializeCommand(helloCmd); + auto opCtxHolder = makeOperationContext(); + + auto resultFuture = sendCommand( + helloCmd, opCtxHolder.get(), std::move(targeter), getExecutorPtr(), _cancellationToken); + + auto error = resultFuture.getNoThrow().getStatus(); + // The error returned by our API should always be RemoteCommandExecutionError + ASSERT_EQ(error.code(), ErrorCodes::RemoteCommandExecutionError); + // Make sure we can extract the extra error info + auto extraInfo = error.extraInfo<AsyncRPCErrorInfo>(); + ASSERT(extraInfo); + // Make sure the extra info indicates the error was local, and that the + // local error (which is just a Status) has the correct code. + ASSERT(extraInfo->isLocal()); + ASSERT_EQ(extraInfo->asLocal(), targeterFailStatus); +} + } // namespace } // namespace async_rpc } // namespace mongo diff --git a/src/mongo/executor/async_rpc_test_fixture.h b/src/mongo/executor/async_rpc_test_fixture.h index 764f954fd5a..58e79f11a92 100644 --- a/src/mongo/executor/async_rpc_test_fixture.h +++ b/src/mongo/executor/async_rpc_test_fixture.h @@ -166,4 +166,26 @@ private: std::shared_ptr<TaskExecutor> _executor; std::shared_ptr<NetworkInterfaceMock> _net; }; + +/** + * Targeter for use in tests. Returns a user-configurable error when asked to resolve + * targets; the error returned can be set when constructing this type. + */ +class FailingTargeter : public Targeter { +public: + FailingTargeter(Status errorToFailWith) : _status{errorToFailWith} {} + SemiFuture<std::vector<HostAndPort>> resolve(CancellationToken t) override final { + return _status; + } + + SemiFuture<void> onRemoteCommandError(HostAndPort h, Status s) override final { + return SemiFuture<void>::makeReady(); + } + + Status getErrorStatus() { + return _status; + } + + Status _status; +}; } // namespace mongo::async_rpc diff --git a/src/mongo/executor/hedged_async_rpc.h b/src/mongo/executor/hedged_async_rpc.h index e1e45b99cb2..7226d96d352 100644 --- a/src/mongo/executor/hedged_async_rpc.h +++ b/src/mongo/executor/hedged_async_rpc.h @@ -121,10 +121,17 @@ SemiFuture<AsyncRPCResponse<typename CommandType::Reply>> sendHedgedCommand( // Set up cancellation token to cancel remaining hedged operations. CancellationSource hedgeCancellationToken{token}; auto tryBody = [=, targeter = std::move(targeter)] { - return targeter->resolve(token).thenRunOn(exec).then( - [cmd, opCtx, exec, token, hedgeCancellationToken, readPref]( - std::vector<HostAndPort> targets) { - uassert(ErrorCodes::HostNotFound, "No hosts available.", targets.size() != 0); + return targeter->resolve(token) + .thenRunOn(exec) + .onError([](Status status) -> StatusWith<std::vector<HostAndPort>> { + // Targeting error; rewrite it to a RemoteCommandExecutionError and skip + // command execution body. We'll retry if the policy indicates to. + return Status{AsyncRPCErrorInfo(status), status.reason()}; + }) + .then([cmd, opCtx, exec, token, hedgeCancellationToken, readPref]( + std::vector<HostAndPort> targets) { + invariant(targets.size(), + "Successful targeting implies there are hosts to target."); HedgeOptions opts = getHedgeOptions(CommandType::kCommandName, readPref); @@ -180,8 +187,29 @@ SemiFuture<AsyncRPCResponse<typename CommandType::Reply>> sendHedgedCommand( }) .withBackoffBetweenIterations(detail::RetryDelayAsBackoff(retryPolicy.get())) .on(exec, CancellationToken::uncancelable()) - .onCompletion([hedgeCancellationToken](StatusWith<SingleResponse> result) mutable { + // We go inline here to intercept executor-shutdown errors and re-write them + // so that the API always returns RemoteCommandExecutionError. Additionally, + // we need to make sure we cancel outstanding requests. + .unsafeToInlineFuture() + .onCompletion([hedgeCancellationToken]( + StatusWith<SingleResponse> result) mutable -> StatusWith<SingleResponse> { hedgeCancellationToken.cancel(); + if (!result.isOK()) { + auto status = result.getStatus(); + if (status.code() == ErrorCodes::RemoteCommandExecutionError) { + return status; + } + // The API implementation guarantees that all errors are provided as + // RemoteCommandExecutionError, so if we've reached this code, it means that the API + // internals were unable to run due to executor shutdown. Today, the only guarantee + // we can make about an executor-shutdown error is that it is in the cancellation + // category. We dassert that this is the case to make it easy to find errors in the + // API implementation's error-handling while still ensuring that we always return + // the correct error code in production. + dassert(ErrorCodes::isA<ErrorCategory::CancellationError>(status.code())); + return Status{AsyncRPCErrorInfo(status), + "Remote command execution failed due to executor shutdown"}; + } return result; }) .semi(); diff --git a/src/mongo/executor/hedged_async_rpc_test.cpp b/src/mongo/executor/hedged_async_rpc_test.cpp index cc39f7830d3..88362165127 100644 --- a/src/mongo/executor/hedged_async_rpc_test.cpp +++ b/src/mongo/executor/hedged_async_rpc_test.cpp @@ -65,7 +65,6 @@ using executor::RemoteCommandResponse; class HedgedAsyncRPCTest : public AsyncRPCTestFixture { public: - const std::vector<HostAndPort> kEmptyHosts{}; const std::vector<HostAndPort> kTwoHosts{HostAndPort("FakeHost1", 12345), HostAndPort("FakeHost2", 12345)}; using TwoHostCallback = @@ -242,15 +241,48 @@ TEST_F(HedgedAsyncRPCTest, HedgedAsyncRPCWithRetryPolicy) { } /** - * When the targeter returns no hosts, we get a HostNotFound error. + * When the targeter returns an error, ensure we rewrite it correctly. */ -TEST_F(HedgedAsyncRPCTest, NoShardsFound) { - HelloCommandReply helloReply = HelloCommandReply(TopologyVersion(OID::gen(), 0)); +TEST_F(HedgedAsyncRPCTest, FailedTargeting) { HelloCommand helloCmd; initializeCommand(helloCmd); - auto resultFuture = sendHedgedCommandWithHosts(testFindCmd, kEmptyHosts); + auto opCtx = makeOperationContext(); + auto targeterFailStatus = Status{ErrorCodes::InternalError, "Fake targeter failure"}; + auto targeter = std::make_unique<FailingTargeter>(targeterFailStatus); + + auto resultFuture = sendHedgedCommand(helloCmd, + opCtx.get(), + std::move(targeter), + getExecutorPtr(), + CancellationToken::uncancelable()); + + auto error = resultFuture.getNoThrow().getStatus(); + ASSERT_EQ(error.code(), ErrorCodes::RemoteCommandExecutionError); + + auto extraInfo = error.extraInfo<AsyncRPCErrorInfo>(); + ASSERT(extraInfo); + + ASSERT(extraInfo->isLocal()); + auto localError = extraInfo->asLocal(); + ASSERT_EQ(localError, targeterFailStatus); +} - ASSERT_THROWS_CODE(resultFuture.get(), DBException, ErrorCodes::HostNotFound); +// Ensure that the sendHedgedCommand correctly returns RemoteCommandExecutionError when the executor +// is shutdown mid-remote-invocation, and that the executor shutdown error is contained +// in the error's ExtraInfo. +TEST_F(HedgedAsyncRPCTest, ExecutorShutdown) { + auto resultFuture = sendHedgedCommandWithHosts(testFindCmd, kTwoHosts); + getExecutorPtr()->shutdown(); + auto error = resultFuture.getNoThrow().getStatus(); + // The error returned by our API should always be RemoteCommandExecutionError + ASSERT_EQ(error.code(), ErrorCodes::RemoteCommandExecutionError); + // Make sure we can extract the extra error info + auto extraInfo = error.extraInfo<AsyncRPCErrorInfo>(); + ASSERT(extraInfo); + // Make sure the extra info indicates the error was local, and that the + // local error (which is just a Status) has the correct code. + ASSERT(extraInfo->isLocal()); + ASSERT(ErrorCodes::isA<ErrorCategory::CancellationError>(extraInfo->asLocal())); } /** |