diff options
Diffstat (limited to 'src/mongo/executor')
-rw-r--r-- | src/mongo/executor/async_rpc.cpp | 17 | ||||
-rw-r--r-- | src/mongo/executor/async_rpc.h | 52 | ||||
-rw-r--r-- | src/mongo/executor/async_rpc_test.cpp | 75 | ||||
-rw-r--r-- | src/mongo/executor/hedged_async_rpc.h | 16 | ||||
-rw-r--r-- | src/mongo/executor/hedged_async_rpc_test.cpp | 92 | ||||
-rw-r--r-- | src/mongo/executor/mock_async_rpc.h | 6 |
6 files changed, 230 insertions, 28 deletions
diff --git a/src/mongo/executor/async_rpc.cpp b/src/mongo/executor/async_rpc.cpp index 32eada9c970..d3fe08f49ca 100644 --- a/src/mongo/executor/async_rpc.cpp +++ b/src/mongo/executor/async_rpc.cpp @@ -53,18 +53,25 @@ public: Targeter* targeter, OperationContext* opCtx, std::shared_ptr<TaskExecutor> exec, - CancellationToken token) final { + CancellationToken token, + BatonHandle baton) final { + auto proxyExec = std::make_shared<ProxyingExecutor>(baton, exec); auto targetsUsed = std::make_shared<std::vector<HostAndPort>>(); return targeter->resolve(token) - .thenRunOn(exec) - .then([dbName, cmdBSON, opCtx, exec = std::move(exec), token, targetsUsed]( - std::vector<HostAndPort> targets) { + .thenRunOn(proxyExec) + .then([dbName, + cmdBSON, + opCtx, + exec = std::move(exec), + token, + baton = std::move(baton), + targetsUsed](std::vector<HostAndPort> targets) { invariant(targets.size(), "Successful targeting implies there are hosts to target."); *targetsUsed = targets; executor::RemoteCommandRequestOnAny executorRequest( targets, dbName.toString(), cmdBSON, rpc::makeEmptyMetadata(), opCtx); - return exec->scheduleRemoteCommandOnAny(executorRequest, token); + return exec->scheduleRemoteCommandOnAny(executorRequest, token, std::move(baton)); }) .onError([targetsUsed](Status s) -> StatusWith<TaskExecutor::ResponseOnAnyStatus> { // If there was a scheduling error or other local error before the diff --git a/src/mongo/executor/async_rpc.h b/src/mongo/executor/async_rpc.h index 3c4c6cf05ec..102cf7e0bb0 100644 --- a/src/mongo/executor/async_rpc.h +++ b/src/mongo/executor/async_rpc.h @@ -86,12 +86,14 @@ struct AsyncRPCOptions { AsyncRPCOptions(CommandType cmd, std::shared_ptr<executor::TaskExecutor> exec, CancellationToken token, - std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>()) - : cmd{cmd}, exec{exec}, token{token}, retryPolicy{retryPolicy} {} + std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>(), + BatonHandle 
baton = nullptr) + : cmd{cmd}, exec{exec}, token{token}, retryPolicy{retryPolicy}, baton{std::move(baton)} {} CommandType cmd; std::shared_ptr<executor::TaskExecutor> exec; CancellationToken token; std::shared_ptr<RetryPolicy> retryPolicy; + BatonHandle baton; }; /** @@ -124,7 +126,22 @@ public: Targeter* targeter, OperationContext* opCtx, std::shared_ptr<TaskExecutor> exec, - CancellationToken token) = 0; + CancellationToken token, + BatonHandle baton) = 0; + ExecutorFuture<AsyncRPCInternalResponse> _sendCommand(StringData dbName, + BSONObj cmdBSON, + Targeter* targeter, + OperationContext* opCtx, + std::shared_ptr<TaskExecutor> exec, + CancellationToken token) { + return _sendCommand(std::move(dbName), + std::move(cmdBSON), + std::move(targeter), + std::move(opCtx), + std::move(exec), + std::move(token), + nullptr); + } static AsyncRPCRunner* get(ServiceContext* serviceContext); static void set(ServiceContext* serviceContext, std::unique_ptr<AsyncRPCRunner> theRunner); }; @@ -154,6 +171,31 @@ struct RetryDelayAsBackoff { RetryPolicy* _policy; }; +class ProxyingExecutor : public OutOfLineExecutor, + public std::enable_shared_from_this<ProxyingExecutor> { +public: + ProxyingExecutor(BatonHandle baton, std::shared_ptr<TaskExecutor> executor) + : _baton{std::move(baton)}, _executor{std::move(executor)} {} + + void schedule(Task func) override { + if (_baton) + return _baton->schedule(std::move(func)); + return _executor->schedule(std::move(func)); + } + + ExecutorFuture<void> sleepFor(Milliseconds duration, const CancellationToken& token) { + auto deadline = Date_t::now() + duration; + if (auto netBaton = _baton ? 
_baton->networking() : nullptr; netBaton) { + return netBaton->waitUntil(deadline, token).thenRunOn(shared_from_this()); + } + return _executor->sleepFor(duration, token); + } + +private: + BatonHandle _baton; + std::shared_ptr<TaskExecutor> _executor; +}; + template <typename CommandType> ExecutorFuture<AsyncRPCResponse<typename CommandType::Reply>> sendCommandWithRunner( BSONObj cmdBSON, @@ -162,6 +204,7 @@ ExecutorFuture<AsyncRPCResponse<typename CommandType::Reply>> sendCommandWithRun OperationContext* opCtx, std::unique_ptr<Targeter> targeter) { using ReplyType = AsyncRPCResponse<typename CommandType::Reply>; + auto proxyExec = std::make_shared<ProxyingExecutor>(options->baton, options->exec); auto tryBody = [=, targeter = std::move(targeter)] { // Execute the command after extracting the db name and bson from the CommandType. // Wrapping this function allows us to separate the CommandType parsing logic from the @@ -181,7 +224,8 @@ ExecutorFuture<AsyncRPCResponse<typename CommandType::Reply>> sendCommandWithRun return shouldStopRetry; }) .withBackoffBetweenIterations(RetryDelayAsBackoff(options->retryPolicy.get())) - .on(options->exec, CancellationToken::uncancelable()); + .on(proxyExec, CancellationToken::uncancelable()); + return std::move(resFuture) .then([](detail::AsyncRPCInternalResponse r) -> ReplyType { auto res = CommandType::Reply::parseSharingOwnership(IDLParserContext("AsyncRPCRunner"), diff --git a/src/mongo/executor/async_rpc_test.cpp b/src/mongo/executor/async_rpc_test.cpp index 62824a15cae..0fdec25994e 100644 --- a/src/mongo/executor/async_rpc_test.cpp +++ b/src/mongo/executor/async_rpc_test.cpp @@ -44,6 +44,7 @@ #include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/rpc/topology_version_gen.h" #include "mongo/unittest/bson_test_util.h" +#include "mongo/unittest/thread_assertion_monitor.h" #include "mongo/unittest/unittest.h" #include "mongo/util/duration.h" #include "mongo/util/future.h" @@ -412,6 +413,44 @@ 
TEST_F(AsyncRPCTestFixture, ExecutorShutdown) { ASSERT(ErrorCodes::isA<ErrorCategory::CancellationError>(extraInfo->asLocal())); } +TEST_F(AsyncRPCTestFixture, BatonTest) { + std::unique_ptr<Targeter> targeter = std::make_unique<LocalHostTargeter>(); + auto retryPolicy = std::make_shared<NeverRetryPolicy>(); + HelloCommand helloCmd; + HelloCommandReply helloReply = HelloCommandReply(TopologyVersion(OID::gen(), 0)); + initializeCommand(helloCmd); + auto opCtxHolder = makeOperationContext(); + auto baton = opCtxHolder->getBaton(); + auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>( + helloCmd, getExecutorPtr(), _cancellationToken); + options->baton = baton; + auto resultFuture = sendCommand(options, opCtxHolder.get(), std::move(targeter)); + + Notification<void> seenNetworkRequest; + unittest::ThreadAssertionMonitor monitor; + // This thread will respond to the request we sent via sendCommand above. + auto networkResponder = monitor.spawn([&] { + onCommand([&](const auto& request) { + ASSERT(request.cmdObj["hello"]); + seenNetworkRequest.set(); + monitor.notifyDone(); + return helloReply.toBSON(); + }); + }); + // Wait on the opCtx until networkResponder has observed the network request. + // While we block on the opCtx, the current thread should run jobs scheduled + // on the baton, including enqueuing the network request via `sendCommand` above. + seenNetworkRequest.get(opCtxHolder.get()); + + networkResponder.join(); + // Wait on the opCtx again to allow the current thread, via the baton, to propagate + // the network response up into the resultFuture. + AsyncRPCResponse res = resultFuture.get(opCtxHolder.get()); + + ASSERT_BSONOBJ_EQ(res.response.toBSON(), helloReply.toBSON()); + ASSERT_EQ(HostAndPort("localhost", serverGlobalParams.port), res.targetUsed); +} + /* * Basic Targeter that returns the host that invoked it. 
*/ @@ -507,6 +546,42 @@ TEST_F(AsyncRPCTestFixture, FailedTargeting) { ASSERT(extraInfo->isLocal()); ASSERT_EQ(extraInfo->asLocal(), targeterFailStatus); } +TEST_F(AsyncRPCTestFixture, BatonShutdownExecutorAlive) { + std::unique_ptr<Targeter> targeter = std::make_unique<LocalHostTargeter>(); + auto retryPolicy = std::make_shared<TestRetryPolicy>(); + const auto maxNumRetries = 5; + const auto retryDelay = Milliseconds(10); + retryPolicy->setMaxNumRetries(maxNumRetries); + for (int i = 0; i < maxNumRetries; ++i) + retryPolicy->pushRetryDelay(retryDelay); + HelloCommand helloCmd; + HelloCommandReply helloReply = HelloCommandReply(TopologyVersion(OID::gen(), 0)); + initializeCommand(helloCmd); + auto opCtxHolder = makeOperationContext(); + auto subBaton = opCtxHolder->getBaton()->makeSubBaton(); + auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>( + helloCmd, getExecutorPtr(), _cancellationToken); + options->baton = *subBaton; + auto resultFuture = sendCommand(options, opCtxHolder.get(), std::move(targeter)); + + subBaton.shutdown(); + + auto error = resultFuture.getNoThrow().getStatus(); + auto expectedDetachError = Status(ErrorCodes::ShutdownInProgress, "Baton detached"); + auto expectedOuterReason = "Remote command execution failed due to executor shutdown"; + + ASSERT_EQ(error.code(), ErrorCodes::RemoteCommandExecutionError); + ASSERT_EQ(error.reason(), expectedOuterReason); + + auto extraInfo = error.extraInfo<AsyncRPCErrorInfo>(); + ASSERT(extraInfo); + + ASSERT(extraInfo->isLocal()); + auto localError = extraInfo->asLocal(); + ASSERT_EQ(localError, expectedDetachError); + + ASSERT_EQ(0, retryPolicy->getNumRetriesPerformed()); +} TEST_F(AsyncRPCTestFixture, SendTxnCommandWithoutTxnRouterAppendsNoTxnFields) { ShardId shardId("shard"); diff --git a/src/mongo/executor/hedged_async_rpc.h b/src/mongo/executor/hedged_async_rpc.h index 511b53a56e7..93e072b6ec2 100644 --- a/src/mongo/executor/hedged_async_rpc.h +++ b/src/mongo/executor/hedged_async_rpc.h @@ 
-115,22 +115,23 @@ SemiFuture<AsyncRPCResponse<typename CommandType::Reply>> sendHedgedCommand( std::shared_ptr<executor::TaskExecutor> exec, CancellationToken token, std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>(), - ReadPreferenceSetting readPref = ReadPreferenceSetting(ReadPreference::PrimaryOnly)) { + ReadPreferenceSetting readPref = ReadPreferenceSetting(ReadPreference::PrimaryOnly), + BatonHandle baton = nullptr) { using SingleResponse = AsyncRPCResponse<typename CommandType::Reply>; // Set up cancellation token to cancel remaining hedged operations. CancellationSource hedgeCancellationToken{token}; auto targetsAttempted = std::make_shared<std::vector<HostAndPort>>(); + auto proxyExec = std::make_shared<detail::ProxyingExecutor>(baton, exec); auto tryBody = [=, targeter = std::move(targeter)] { return targeter->resolve(token) - .thenRunOn(exec) + .thenRunOn(proxyExec) .onError([](Status status) -> StatusWith<std::vector<HostAndPort>> { // Targeting error; rewrite it to a RemoteCommandExecutionError and skip // command execution body. We'll retry if the policy indicates to. 
return Status{AsyncRPCErrorInfo(status), status.reason()}; }) - .then([cmd, opCtx, exec, token, hedgeCancellationToken, readPref, targetsAttempted]( - std::vector<HostAndPort> targets) { + .then([=](std::vector<HostAndPort> targets) { invariant(targets.size(), "Successful targeting implies there are hosts to target."); *targetsAttempted = targets; @@ -146,8 +147,9 @@ SemiFuture<AsyncRPCResponse<typename CommandType::Reply>> sendHedgedCommand( std::unique_ptr<Targeter> t = std::make_unique<FixedTargeter>(targets[i]); auto options = std::make_shared<AsyncRPCOptions<CommandType>>( cmd, exec, hedgeCancellationToken.token()); - requests.emplace_back( - sendCommand(options, opCtx, std::move(t)).thenRunOn(exec)); + options->baton = baton; + requests.push_back( + sendCommand(options, opCtx, std::move(t)).thenRunOn(proxyExec)); } /** @@ -189,7 +191,7 @@ SemiFuture<AsyncRPCResponse<typename CommandType::Reply>> sendHedgedCommand( !retryPolicy->recordAndEvaluateRetry(swResponse.getStatus()); }) .withBackoffBetweenIterations(detail::RetryDelayAsBackoff(retryPolicy.get())) - .on(exec, CancellationToken::uncancelable()) + .on(proxyExec, CancellationToken::uncancelable()) // We go inline here to intercept executor-shutdown errors and re-write them // so that the API always returns RemoteCommandExecutionError. Additionally, // we need to make sure we cancel outstanding requests. 
diff --git a/src/mongo/executor/hedged_async_rpc_test.cpp b/src/mongo/executor/hedged_async_rpc_test.cpp index 95140e0259f..64839b1a7f5 100644 --- a/src/mongo/executor/hedged_async_rpc_test.cpp +++ b/src/mongo/executor/hedged_async_rpc_test.cpp @@ -103,7 +103,8 @@ public: SemiFuture<AsyncRPCResponse<typename CommandType::Reply>> sendHedgedCommandWithHosts( CommandType cmd, std::vector<HostAndPort> hosts, - std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>()) { + std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>(), + BatonHandle bh = nullptr) { // Use a readPreference that's elgible for hedging. ReadPreferenceSetting readPref(ReadPreference::Nearest); readPref.hedgingMode = HedgingMode(); @@ -117,14 +118,14 @@ public: std::unique_ptr<Targeter> targeter = std::make_unique<AsyncRemoteCommandTargeterAdapter>(readPref, t); - _opCtx = makeOperationContext(); return sendHedgedCommand(cmd, _opCtx.get(), std::move(targeter), getExecutorPtr(), CancellationToken::uncancelable(), retryPolicy, - readPref); + readPref, + bh); } const NamespaceString testNS = NamespaceString("testdb", "testcoll"); @@ -146,10 +147,13 @@ public: const RemoteCommandResponse testAlternateIgnorableErrorResponse{ createErrorResponse(ignorableNetworkTimeoutStatus), Milliseconds(1)}; + auto getOpCtx() { + return _opCtx.get(); + } + private: - // This OpCtx is used by sendHedgedCommandWithHosts and is initialized when the function is - // first invoked and destroyed during fixture destruction. - ServiceContext::UniqueOperationContext _opCtx; + // This OpCtx is used by sendHedgedCommandWithHosts and is destroyed during fixture destruction. 
+ ServiceContext::UniqueOperationContext _opCtx{makeOperationContext()}; }; /** @@ -246,12 +250,11 @@ TEST_F(HedgedAsyncRPCTest, HedgedAsyncRPCWithRetryPolicy) { TEST_F(HedgedAsyncRPCTest, FailedTargeting) { HelloCommand helloCmd; initializeCommand(helloCmd); - auto opCtx = makeOperationContext(); auto targeterFailStatus = Status{ErrorCodes::InternalError, "Fake targeter failure"}; auto targeter = std::make_unique<FailingTargeter>(targeterFailStatus); auto resultFuture = sendHedgedCommand(helloCmd, - opCtx.get(), + getOpCtx(), std::move(targeter), getExecutorPtr(), CancellationToken::uncancelable()); @@ -819,9 +822,8 @@ TEST_F(HedgedAsyncRPCTest, NoAttemptedTargetIfTargetingFails) { auto targeter = std::make_unique<FailingTargeter>(resolveErr); - auto opCtxHolder = makeOperationContext(); auto resultFuture = sendHedgedCommand( - helloCmd, opCtxHolder.get(), std::move(targeter), getExecutorPtr(), _cancellationToken); + helloCmd, getOpCtx(), std::move(targeter), getExecutorPtr(), _cancellationToken); auto error = resultFuture.getNoThrow().getStatus(); ASSERT_EQ(error.code(), ErrorCodes::RemoteCommandExecutionError); @@ -868,6 +870,76 @@ TEST_F(HedgedAsyncRPCTest, RemoteErrorAttemptedTargetsContainActual) { ASSERT_THAT(remoteErr.getTargetUsed(), eitherHostMatcher); } +TEST_F(HedgedAsyncRPCTest, BatonTest) { + std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>(); + auto resultFuture = + sendHedgedCommandWithHosts(testFindCmd, kTwoHosts, retryPolicy, getOpCtx()->getBaton()); + + Notification<void> seenNetworkRequests; + // This thread will respond to the requests we sent via sendHedgedCommandWithHosts above. 
+ stdx::thread networkResponder([&] { + auto network = getNetworkInterfaceMock(); + network->enterNetwork(); + auto now = network->now(); + performAuthoritativeHedgeBehavior( + network, + [&](NetworkInterfaceMock::NetworkOperationIterator authoritative, + NetworkInterfaceMock::NetworkOperationIterator hedged) { + network->scheduleResponse(hedged, now, testIgnorableErrorResponse); + network->scheduleSuccessfulResponse(authoritative, now, testSuccessResponse); + seenNetworkRequests.set(); + }); + network->runReadyNetworkOperations(); + network->exitNetwork(); + }); + // Wait on the opCtx until networkResponder has observed the network requests. + // While we block on the opCtx, the current thread should run jobs scheduled + // on the baton, including enqueuing the network requests via `sendHedgedCommand` above. + seenNetworkRequests.get(getOpCtx()); + + networkResponder.join(); + // Wait on the opCtx again to allow the current thread, via the baton, to propagate + // the network responses up into the resultFuture. 
+ AsyncRPCResponse res = resultFuture.get(getOpCtx()); + + ASSERT_EQ(res.response.getCursor()->getNs(), testNS); + ASSERT_BSONOBJ_EQ(res.response.getCursor()->getFirstBatch()[0], testFirstBatch); + namespace m = unittest::match; + ASSERT_THAT(res.targetUsed, m::AnyOf(m::Eq(kTwoHosts[0]), m::Eq(kTwoHosts[1]))); +} +TEST_F(HedgedAsyncRPCTest, BatonShutdownExecutorAlive) { + auto retryPolicy = std::make_shared<TestRetryPolicy>(); + const auto maxNumRetries = 5; + const auto retryDelay = Milliseconds(10); + retryPolicy->setMaxNumRetries(maxNumRetries); + for (int i = 0; i < maxNumRetries; ++i) + retryPolicy->pushRetryDelay(retryDelay); + auto subBaton = getOpCtx()->getBaton()->makeSubBaton(); + auto resultFuture = sendHedgedCommandWithHosts(testFindCmd, kTwoHosts, retryPolicy, *subBaton); + + subBaton.shutdown(); + auto net = getNetworkInterfaceMock(); + for (auto i = 0; i <= maxNumRetries; i++) { + executor::NetworkInterfaceMock::InNetworkGuard guard(net); + net->advanceTime(net->now() + retryDelay); + } + + auto error = resultFuture.getNoThrow().getStatus(); + auto expectedDetachError = Status(ErrorCodes::ShutdownInProgress, "Baton detached"); + auto expectedOuterReason = "Remote command execution failed due to executor shutdown"; + + ASSERT_EQ(error.code(), ErrorCodes::RemoteCommandExecutionError); + ASSERT_EQ(error.reason(), expectedOuterReason); + + auto extraInfo = error.extraInfo<AsyncRPCErrorInfo>(); + ASSERT(extraInfo); + + ASSERT(extraInfo->isLocal()); + auto localError = extraInfo->asLocal(); + ASSERT_EQ(localError, expectedDetachError); + + ASSERT_EQ(0, retryPolicy->getNumRetriesPerformed()); +} } // namespace } // namespace async_rpc } // namespace mongo diff --git a/src/mongo/executor/mock_async_rpc.h b/src/mongo/executor/mock_async_rpc.h index eaa4dba2fb9..1f6b34e3078 100644 --- a/src/mongo/executor/mock_async_rpc.h +++ b/src/mongo/executor/mock_async_rpc.h @@ -100,7 +100,8 @@ public: Targeter* targeter, OperationContext* opCtx, 
std::shared_ptr<TaskExecutor> exec, - CancellationToken token) final { + CancellationToken token, + BatonHandle) final { auto [p, f] = makePromiseFuture<BSONObj>(); auto targetsAttempted = std::make_shared<std::vector<HostAndPort>>(); return targeter->resolve(token) @@ -212,7 +213,8 @@ public: Targeter* targeter, OperationContext* opCtx, std::shared_ptr<TaskExecutor> exec, - CancellationToken token) final { + CancellationToken token, + BatonHandle) final { auto [p, f] = makePromiseFuture<BSONObj>(); auto targetsAttempted = std::make_shared<std::vector<HostAndPort>>(); return targeter->resolve(token).thenRunOn(exec).then([this, |