Diffstat (limited to 'src/mongo/executor')
-rw-r--r--  src/mongo/executor/async_rpc.cpp              17
-rw-r--r--  src/mongo/executor/async_rpc.h                52
-rw-r--r--  src/mongo/executor/async_rpc_test.cpp         75
-rw-r--r--  src/mongo/executor/hedged_async_rpc.h         16
-rw-r--r--  src/mongo/executor/hedged_async_rpc_test.cpp  92
-rw-r--r--  src/mongo/executor/mock_async_rpc.h            6
6 files changed, 230 insertions, 28 deletions
diff --git a/src/mongo/executor/async_rpc.cpp b/src/mongo/executor/async_rpc.cpp
index 32eada9c970..d3fe08f49ca 100644
--- a/src/mongo/executor/async_rpc.cpp
+++ b/src/mongo/executor/async_rpc.cpp
@@ -53,18 +53,25 @@ public:
Targeter* targeter,
OperationContext* opCtx,
std::shared_ptr<TaskExecutor> exec,
- CancellationToken token) final {
+ CancellationToken token,
+ BatonHandle baton) final {
+ auto proxyExec = std::make_shared<ProxyingExecutor>(baton, exec);
auto targetsUsed = std::make_shared<std::vector<HostAndPort>>();
return targeter->resolve(token)
- .thenRunOn(exec)
- .then([dbName, cmdBSON, opCtx, exec = std::move(exec), token, targetsUsed](
- std::vector<HostAndPort> targets) {
+ .thenRunOn(proxyExec)
+ .then([dbName,
+ cmdBSON,
+ opCtx,
+ exec = std::move(exec),
+ token,
+ baton = std::move(baton),
+ targetsUsed](std::vector<HostAndPort> targets) {
invariant(targets.size(),
"Successful targeting implies there are hosts to target.");
*targetsUsed = targets;
executor::RemoteCommandRequestOnAny executorRequest(
targets, dbName.toString(), cmdBSON, rpc::makeEmptyMetadata(), opCtx);
- return exec->scheduleRemoteCommandOnAny(executorRequest, token);
+ return exec->scheduleRemoteCommandOnAny(executorRequest, token, std::move(baton));
})
.onError([targetsUsed](Status s) -> StatusWith<TaskExecutor::ResponseOnAnyStatus> {
// If there was a scheduling error or other local error before the
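The heart of this hunk is that the caller-supplied baton now travels all the way to the networking layer. A minimal sketch of that forwarding in isolation, assuming the resolved targets, command BSON, executor, token, and baton are already in scope as they are inside _sendCommand:

    // Sketch of the continuation body above; targets, dbName, cmdBSON, opCtx,
    // exec, token, and baton are all assumed to be in scope.
    executor::RemoteCommandRequestOnAny executorRequest(
        targets, dbName.toString(), cmdBSON, rpc::makeEmptyMetadata(), opCtx);
    // The behavioral change: the (possibly null) baton rides along so the response
    // callback can run on the caller's baton rather than on an executor thread.
    auto responseFuture =
        exec->scheduleRemoteCommandOnAny(executorRequest, token, std::move(baton));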
diff --git a/src/mongo/executor/async_rpc.h b/src/mongo/executor/async_rpc.h
index 3c4c6cf05ec..102cf7e0bb0 100644
--- a/src/mongo/executor/async_rpc.h
+++ b/src/mongo/executor/async_rpc.h
@@ -86,12 +86,14 @@ struct AsyncRPCOptions {
AsyncRPCOptions(CommandType cmd,
std::shared_ptr<executor::TaskExecutor> exec,
CancellationToken token,
- std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>())
- : cmd{cmd}, exec{exec}, token{token}, retryPolicy{retryPolicy} {}
+ std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>(),
+ BatonHandle baton = nullptr)
+ : cmd{cmd}, exec{exec}, token{token}, retryPolicy{retryPolicy}, baton{std::move(baton)} {}
CommandType cmd;
std::shared_ptr<executor::TaskExecutor> exec;
CancellationToken token;
std::shared_ptr<RetryPolicy> retryPolicy;
+ BatonHandle baton;
};
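For callers the new field is opt-in. A minimal usage sketch, assuming a HelloCommand, executor, cancellation token, Targeter, and OperationContext* (placeholder names below) are already available:

    auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>(
        helloCmd, exec, cancellationToken);
    options->baton = opCtx->getBaton();  // leave as nullptr for the old behavior
    auto responseFuture = sendCommand(options, opCtx, std::move(targeter));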
/**
@@ -124,7 +126,22 @@ public:
Targeter* targeter,
OperationContext* opCtx,
std::shared_ptr<TaskExecutor> exec,
- CancellationToken token) = 0;
+ CancellationToken token,
+ BatonHandle baton) = 0;
+ ExecutorFuture<AsyncRPCInternalResponse> _sendCommand(StringData dbName,
+ BSONObj cmdBSON,
+ Targeter* targeter,
+ OperationContext* opCtx,
+ std::shared_ptr<TaskExecutor> exec,
+ CancellationToken token) {
+ return _sendCommand(std::move(dbName),
+ std::move(cmdBSON),
+ std::move(targeter),
+ std::move(opCtx),
+ std::move(exec),
+ std::move(token),
+ nullptr);
+ }
static AsyncRPCRunner* get(ServiceContext* serviceContext);
static void set(ServiceContext* serviceContext, std::unique_ptr<AsyncRPCRunner> theRunner);
};
@@ -154,6 +171,31 @@ struct RetryDelayAsBackoff {
RetryPolicy* _policy;
};
+class ProxyingExecutor : public OutOfLineExecutor,
+ public std::enable_shared_from_this<ProxyingExecutor> {
+public:
+ ProxyingExecutor(BatonHandle baton, std::shared_ptr<TaskExecutor> executor)
+ : _baton{std::move(baton)}, _executor{std::move(executor)} {}
+
+ void schedule(Task func) override {
+ if (_baton)
+ return _baton->schedule(std::move(func));
+ return _executor->schedule(std::move(func));
+ }
+
+ ExecutorFuture<void> sleepFor(Milliseconds duration, const CancellationToken& token) {
+ auto deadline = Date_t::now() + duration;
+ if (auto netBaton = _baton ? _baton->networking() : nullptr; netBaton) {
+ return netBaton->waitUntil(deadline, token).thenRunOn(shared_from_this());
+ }
+ return _executor->sleepFor(duration, token);
+ }
+
+private:
+ BatonHandle _baton;
+ std::shared_ptr<TaskExecutor> _executor;
+};
+
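ProxyingExecutor is an internal detail, but its contract is small: prefer the baton when one was provided, otherwise fall back to the wrapped executor. A sketch of how the surrounding code uses it, assuming `baton` and `exec` are in scope:

    auto proxy = std::make_shared<ProxyingExecutor>(baton, exec);
    // Runs on the baton if one was supplied, else on the TaskExecutor.
    proxy->schedule([](Status) { /* continuation work */ });
    // Uses the networking baton's waitUntil() when available, else falls back to
    // TaskExecutor::sleepFor(); this is what retry backoff runs on below.
    auto slept = proxy->sleepFor(Milliseconds(10), CancellationToken::uncancelable());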
template <typename CommandType>
ExecutorFuture<AsyncRPCResponse<typename CommandType::Reply>> sendCommandWithRunner(
BSONObj cmdBSON,
@@ -162,6 +204,7 @@ ExecutorFuture<AsyncRPCResponse<typename CommandType::Reply>> sendCommandWithRun
OperationContext* opCtx,
std::unique_ptr<Targeter> targeter) {
using ReplyType = AsyncRPCResponse<typename CommandType::Reply>;
+ auto proxyExec = std::make_shared<ProxyingExecutor>(options->baton, options->exec);
auto tryBody = [=, targeter = std::move(targeter)] {
// Execute the command after extracting the db name and bson from the CommandType.
// Wrapping this function allows us to separate the CommandType parsing logic from the
@@ -181,7 +224,8 @@ ExecutorFuture<AsyncRPCResponse<typename CommandType::Reply>> sendCommandWithRun
return shouldStopRetry;
})
.withBackoffBetweenIterations(RetryDelayAsBackoff(options->retryPolicy.get()))
- .on(options->exec, CancellationToken::uncancelable());
+ .on(proxyExec, CancellationToken::uncancelable());
+
return std::move(resFuture)
.then([](detail::AsyncRPCInternalResponse r) -> ReplyType {
auto res = CommandType::Reply::parseSharingOwnership(IDLParserContext("AsyncRPCRunner"),
diff --git a/src/mongo/executor/async_rpc_test.cpp b/src/mongo/executor/async_rpc_test.cpp
index 62824a15cae..0fdec25994e 100644
--- a/src/mongo/executor/async_rpc_test.cpp
+++ b/src/mongo/executor/async_rpc_test.cpp
@@ -44,6 +44,7 @@
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/rpc/topology_version_gen.h"
#include "mongo/unittest/bson_test_util.h"
+#include "mongo/unittest/thread_assertion_monitor.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/duration.h"
#include "mongo/util/future.h"
@@ -412,6 +413,44 @@ TEST_F(AsyncRPCTestFixture, ExecutorShutdown) {
ASSERT(ErrorCodes::isA<ErrorCategory::CancellationError>(extraInfo->asLocal()));
}
+TEST_F(AsyncRPCTestFixture, BatonTest) {
+ std::unique_ptr<Targeter> targeter = std::make_unique<LocalHostTargeter>();
+ auto retryPolicy = std::make_shared<NeverRetryPolicy>();
+ HelloCommand helloCmd;
+ HelloCommandReply helloReply = HelloCommandReply(TopologyVersion(OID::gen(), 0));
+ initializeCommand(helloCmd);
+ auto opCtxHolder = makeOperationContext();
+ auto baton = opCtxHolder->getBaton();
+ auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>(
+ helloCmd, getExecutorPtr(), _cancellationToken);
+ options->baton = baton;
+ auto resultFuture = sendCommand(options, opCtxHolder.get(), std::move(targeter));
+
+ Notification<void> seenNetworkRequest;
+ unittest::ThreadAssertionMonitor monitor;
+ // This thread will respond to the request we sent via sendCommand above.
+ auto networkResponder = monitor.spawn([&] {
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["hello"]);
+ seenNetworkRequest.set();
+ monitor.notifyDone();
+ return helloReply.toBSON();
+ });
+ });
+ // Wait on the opCtx until networkResponder has observed the network request.
+ // While we block on the opCtx, the current thread should run jobs scheduled
+ // on the baton, including enqueuing the network request via `sendCommand` above.
+ seenNetworkRequest.get(opCtxHolder.get());
+
+ networkResponder.join();
+    // Wait on the opCtx again to allow the current thread, via the baton, to propagate
+ // the network response up into the resultFuture.
+ AsyncRPCResponse res = resultFuture.get(opCtxHolder.get());
+
+ ASSERT_BSONOBJ_EQ(res.response.toBSON(), helloReply.toBSON());
+ ASSERT_EQ(HostAndPort("localhost", serverGlobalParams.port), res.targetUsed);
+}
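The waiting pattern in this test is the point of the feature: a thread that blocks on the OperationContext drives its baton, so baton-scheduled RPC work runs inline on that thread. In isolation, assuming `opCtx` is the owning OperationContext*:

    Notification<void> done;
    // Some other thread eventually calls done.set(). While we block here, jobs
    // scheduled on opCtx->getBaton() (e.g. enqueuing the RPC and later
    // propagating its response) run on this thread.
    done.get(opCtx);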
+
/*
* Basic Targeter that returns the host that invoked it.
*/
@@ -507,6 +546,42 @@ TEST_F(AsyncRPCTestFixture, FailedTargeting) {
ASSERT(extraInfo->isLocal());
ASSERT_EQ(extraInfo->asLocal(), targeterFailStatus);
}
+TEST_F(AsyncRPCTestFixture, BatonShutdownExecutorAlive) {
+ std::unique_ptr<Targeter> targeter = std::make_unique<LocalHostTargeter>();
+ auto retryPolicy = std::make_shared<TestRetryPolicy>();
+ const auto maxNumRetries = 5;
+ const auto retryDelay = Milliseconds(10);
+ retryPolicy->setMaxNumRetries(maxNumRetries);
+ for (int i = 0; i < maxNumRetries; ++i)
+ retryPolicy->pushRetryDelay(retryDelay);
+ HelloCommand helloCmd;
+ HelloCommandReply helloReply = HelloCommandReply(TopologyVersion(OID::gen(), 0));
+ initializeCommand(helloCmd);
+ auto opCtxHolder = makeOperationContext();
+ auto subBaton = opCtxHolder->getBaton()->makeSubBaton();
+ auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>(
+ helloCmd, getExecutorPtr(), _cancellationToken);
+ options->baton = *subBaton;
+ auto resultFuture = sendCommand(options, opCtxHolder.get(), std::move(targeter));
+
+ subBaton.shutdown();
+
+ auto error = resultFuture.getNoThrow().getStatus();
+ auto expectedDetachError = Status(ErrorCodes::ShutdownInProgress, "Baton detached");
+ auto expectedOuterReason = "Remote command execution failed due to executor shutdown";
+
+ ASSERT_EQ(error.code(), ErrorCodes::RemoteCommandExecutionError);
+ ASSERT_EQ(error.reason(), expectedOuterReason);
+
+ auto extraInfo = error.extraInfo<AsyncRPCErrorInfo>();
+ ASSERT(extraInfo);
+
+ ASSERT(extraInfo->isLocal());
+ auto localError = extraInfo->asLocal();
+ ASSERT_EQ(localError, expectedDetachError);
+
+ ASSERT_EQ(0, retryPolicy->getNumRetriesPerformed());
+}
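The error shape asserted here is what production callers should expect when a baton detaches while the executor is still alive. A hedged handling sketch:

    auto status = resultFuture.getNoThrow().getStatus();
    if (status == ErrorCodes::RemoteCommandExecutionError) {
        auto info = status.extraInfo<AsyncRPCErrorInfo>();
        if (info && info->isLocal() && info->asLocal() == ErrorCodes::ShutdownInProgress) {
            // "Baton detached" (or executor shutdown): no retry was attempted;
            // resubmit on a live baton/executor or propagate, as appropriate.
        }
    }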
TEST_F(AsyncRPCTestFixture, SendTxnCommandWithoutTxnRouterAppendsNoTxnFields) {
ShardId shardId("shard");
diff --git a/src/mongo/executor/hedged_async_rpc.h b/src/mongo/executor/hedged_async_rpc.h
index 511b53a56e7..93e072b6ec2 100644
--- a/src/mongo/executor/hedged_async_rpc.h
+++ b/src/mongo/executor/hedged_async_rpc.h
@@ -115,22 +115,23 @@ SemiFuture<AsyncRPCResponse<typename CommandType::Reply>> sendHedgedCommand(
std::shared_ptr<executor::TaskExecutor> exec,
CancellationToken token,
std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>(),
- ReadPreferenceSetting readPref = ReadPreferenceSetting(ReadPreference::PrimaryOnly)) {
+ ReadPreferenceSetting readPref = ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ BatonHandle baton = nullptr) {
using SingleResponse = AsyncRPCResponse<typename CommandType::Reply>;
// Set up cancellation token to cancel remaining hedged operations.
CancellationSource hedgeCancellationToken{token};
auto targetsAttempted = std::make_shared<std::vector<HostAndPort>>();
+ auto proxyExec = std::make_shared<detail::ProxyingExecutor>(baton, exec);
auto tryBody = [=, targeter = std::move(targeter)] {
return targeter->resolve(token)
- .thenRunOn(exec)
+ .thenRunOn(proxyExec)
.onError([](Status status) -> StatusWith<std::vector<HostAndPort>> {
// Targeting error; rewrite it to a RemoteCommandExecutionError and skip
// command execution body. We'll retry if the policy indicates to.
return Status{AsyncRPCErrorInfo(status), status.reason()};
})
- .then([cmd, opCtx, exec, token, hedgeCancellationToken, readPref, targetsAttempted](
- std::vector<HostAndPort> targets) {
+ .then([=](std::vector<HostAndPort> targets) {
invariant(targets.size(),
"Successful targeting implies there are hosts to target.");
*targetsAttempted = targets;
@@ -146,8 +147,9 @@ SemiFuture<AsyncRPCResponse<typename CommandType::Reply>> sendHedgedCommand(
std::unique_ptr<Targeter> t = std::make_unique<FixedTargeter>(targets[i]);
auto options = std::make_shared<AsyncRPCOptions<CommandType>>(
cmd, exec, hedgeCancellationToken.token());
- requests.emplace_back(
- sendCommand(options, opCtx, std::move(t)).thenRunOn(exec));
+ options->baton = baton;
+ requests.push_back(
+ sendCommand(options, opCtx, std::move(t)).thenRunOn(proxyExec));
}
/**
@@ -189,7 +191,7 @@ SemiFuture<AsyncRPCResponse<typename CommandType::Reply>> sendHedgedCommand(
!retryPolicy->recordAndEvaluateRetry(swResponse.getStatus());
})
.withBackoffBetweenIterations(detail::RetryDelayAsBackoff(retryPolicy.get()))
- .on(exec, CancellationToken::uncancelable())
+ .on(proxyExec, CancellationToken::uncancelable())
// We go inline here to intercept executor-shutdown errors and re-write them
// so that the API always returns RemoteCommandExecutionError. Additionally,
// we need to make sure we cancel outstanding requests.
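Caller-side, the hedged entry point mirrors the non-hedged one: the baton is an optional trailing argument. A sketch, assuming cmd, opCtx, targeter, exec, token, and retryPolicy are already set up:

    ReadPreferenceSetting readPref(ReadPreference::Nearest);
    readPref.hedgingMode = HedgingMode();
    auto responseFuture = sendHedgedCommand(cmd,
                                            opCtx,
                                            std::move(targeter),
                                            exec,
                                            token,
                                            retryPolicy,
                                            readPref,
                                            opCtx->getBaton());  // new trailing argument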
diff --git a/src/mongo/executor/hedged_async_rpc_test.cpp b/src/mongo/executor/hedged_async_rpc_test.cpp
index 95140e0259f..64839b1a7f5 100644
--- a/src/mongo/executor/hedged_async_rpc_test.cpp
+++ b/src/mongo/executor/hedged_async_rpc_test.cpp
@@ -103,7 +103,8 @@ public:
SemiFuture<AsyncRPCResponse<typename CommandType::Reply>> sendHedgedCommandWithHosts(
CommandType cmd,
std::vector<HostAndPort> hosts,
- std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>()) {
+ std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>(),
+ BatonHandle bh = nullptr) {
// Use a readPreference that's eligible for hedging.
ReadPreferenceSetting readPref(ReadPreference::Nearest);
readPref.hedgingMode = HedgingMode();
@@ -117,14 +118,14 @@ public:
std::unique_ptr<Targeter> targeter =
std::make_unique<AsyncRemoteCommandTargeterAdapter>(readPref, t);
- _opCtx = makeOperationContext();
return sendHedgedCommand(cmd,
_opCtx.get(),
std::move(targeter),
getExecutorPtr(),
CancellationToken::uncancelable(),
retryPolicy,
- readPref);
+ readPref,
+ bh);
}
const NamespaceString testNS = NamespaceString("testdb", "testcoll");
@@ -146,10 +147,13 @@ public:
const RemoteCommandResponse testAlternateIgnorableErrorResponse{
createErrorResponse(ignorableNetworkTimeoutStatus), Milliseconds(1)};
+ auto getOpCtx() {
+ return _opCtx.get();
+ }
+
private:
- // This OpCtx is used by sendHedgedCommandWithHosts and is initialized when the function is
- // first invoked and destroyed during fixture destruction.
- ServiceContext::UniqueOperationContext _opCtx;
+ // This OpCtx is used by sendHedgedCommandWithHosts and is destroyed during fixture destruction.
+ ServiceContext::UniqueOperationContext _opCtx{makeOperationContext()};
};
/**
@@ -246,12 +250,11 @@ TEST_F(HedgedAsyncRPCTest, HedgedAsyncRPCWithRetryPolicy) {
TEST_F(HedgedAsyncRPCTest, FailedTargeting) {
HelloCommand helloCmd;
initializeCommand(helloCmd);
- auto opCtx = makeOperationContext();
auto targeterFailStatus = Status{ErrorCodes::InternalError, "Fake targeter failure"};
auto targeter = std::make_unique<FailingTargeter>(targeterFailStatus);
auto resultFuture = sendHedgedCommand(helloCmd,
- opCtx.get(),
+ getOpCtx(),
std::move(targeter),
getExecutorPtr(),
CancellationToken::uncancelable());
@@ -819,9 +822,8 @@ TEST_F(HedgedAsyncRPCTest, NoAttemptedTargetIfTargetingFails) {
auto targeter = std::make_unique<FailingTargeter>(resolveErr);
- auto opCtxHolder = makeOperationContext();
auto resultFuture = sendHedgedCommand(
- helloCmd, opCtxHolder.get(), std::move(targeter), getExecutorPtr(), _cancellationToken);
+ helloCmd, getOpCtx(), std::move(targeter), getExecutorPtr(), _cancellationToken);
auto error = resultFuture.getNoThrow().getStatus();
ASSERT_EQ(error.code(), ErrorCodes::RemoteCommandExecutionError);
@@ -868,6 +870,76 @@ TEST_F(HedgedAsyncRPCTest, RemoteErrorAttemptedTargetsContainActual) {
ASSERT_THAT(remoteErr.getTargetUsed(), eitherHostMatcher);
}
+TEST_F(HedgedAsyncRPCTest, BatonTest) {
+ std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>();
+ auto resultFuture =
+ sendHedgedCommandWithHosts(testFindCmd, kTwoHosts, retryPolicy, getOpCtx()->getBaton());
+
+ Notification<void> seenNetworkRequests;
+ // This thread will respond to the requests we sent via sendHedgedCommandWithHosts above.
+ stdx::thread networkResponder([&] {
+ auto network = getNetworkInterfaceMock();
+ network->enterNetwork();
+ auto now = network->now();
+ performAuthoritativeHedgeBehavior(
+ network,
+ [&](NetworkInterfaceMock::NetworkOperationIterator authoritative,
+ NetworkInterfaceMock::NetworkOperationIterator hedged) {
+ network->scheduleResponse(hedged, now, testIgnorableErrorResponse);
+ network->scheduleSuccessfulResponse(authoritative, now, testSuccessResponse);
+ seenNetworkRequests.set();
+ });
+ network->runReadyNetworkOperations();
+ network->exitNetwork();
+ });
+ // Wait on the opCtx until networkResponder has observed the network requests.
+ // While we block on the opCtx, the current thread should run jobs scheduled
+ // on the baton, including enqueuing the network requests via `sendHedgedCommand` above.
+ seenNetworkRequests.get(getOpCtx());
+
+ networkResponder.join();
+    // Wait on the opCtx again to allow the current thread, via the baton, to propagate
+ // the network responses up into the resultFuture.
+ AsyncRPCResponse res = resultFuture.get(getOpCtx());
+
+ ASSERT_EQ(res.response.getCursor()->getNs(), testNS);
+ ASSERT_BSONOBJ_EQ(res.response.getCursor()->getFirstBatch()[0], testFirstBatch);
+ namespace m = unittest::match;
+ ASSERT_THAT(res.targetUsed, m::AnyOf(m::Eq(kTwoHosts[0]), m::Eq(kTwoHosts[1])));
+}
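The responder thread follows the usual NetworkInterfaceMock choreography (the next test uses the RAII InNetworkGuard form of the same thing); the only novelty here is that the main thread drives the baton by blocking on the opCtx. The mock side, reduced to a skeleton:

    auto network = getNetworkInterfaceMock();
    network->enterNetwork();
    // ... schedule responses for the authoritative and hedged requests ...
    network->runReadyNetworkOperations();
    network->exitNetwork();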
+TEST_F(HedgedAsyncRPCTest, BatonShutdownExecutorAlive) {
+ auto retryPolicy = std::make_shared<TestRetryPolicy>();
+ const auto maxNumRetries = 5;
+ const auto retryDelay = Milliseconds(10);
+ retryPolicy->setMaxNumRetries(maxNumRetries);
+ for (int i = 0; i < maxNumRetries; ++i)
+ retryPolicy->pushRetryDelay(retryDelay);
+ auto subBaton = getOpCtx()->getBaton()->makeSubBaton();
+ auto resultFuture = sendHedgedCommandWithHosts(testFindCmd, kTwoHosts, retryPolicy, *subBaton);
+
+ subBaton.shutdown();
+ auto net = getNetworkInterfaceMock();
+ for (auto i = 0; i <= maxNumRetries; i++) {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+ net->advanceTime(net->now() + retryDelay);
+ }
+
+ auto error = resultFuture.getNoThrow().getStatus();
+ auto expectedDetachError = Status(ErrorCodes::ShutdownInProgress, "Baton detached");
+ auto expectedOuterReason = "Remote command execution failed due to executor shutdown";
+
+ ASSERT_EQ(error.code(), ErrorCodes::RemoteCommandExecutionError);
+ ASSERT_EQ(error.reason(), expectedOuterReason);
+
+ auto extraInfo = error.extraInfo<AsyncRPCErrorInfo>();
+ ASSERT(extraInfo);
+
+ ASSERT(extraInfo->isLocal());
+ auto localError = extraInfo->asLocal();
+ ASSERT_EQ(localError, expectedDetachError);
+
+ ASSERT_EQ(0, retryPolicy->getNumRetriesPerformed());
+}
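As in the non-hedged test, the sub-baton gives the caller an independent shutdown handle. A condensed sketch of that lifecycle, with placeholder names:

    auto subBaton = opCtx->getBaton()->makeSubBaton();
    // Hand *subBaton to the RPC (via AsyncRPCOptions::baton or the trailing
    // sendHedgedCommand argument), then shut it down independently:
    subBaton.shutdown();
    // Outstanding work fails locally with ShutdownInProgress ("Baton detached"),
    // surfaced to the caller as RemoteCommandExecutionError; no retries run.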
} // namespace
} // namespace async_rpc
} // namespace mongo
diff --git a/src/mongo/executor/mock_async_rpc.h b/src/mongo/executor/mock_async_rpc.h
index eaa4dba2fb9..1f6b34e3078 100644
--- a/src/mongo/executor/mock_async_rpc.h
+++ b/src/mongo/executor/mock_async_rpc.h
@@ -100,7 +100,8 @@ public:
Targeter* targeter,
OperationContext* opCtx,
std::shared_ptr<TaskExecutor> exec,
- CancellationToken token) final {
+ CancellationToken token,
+ BatonHandle) final {
auto [p, f] = makePromiseFuture<BSONObj>();
auto targetsAttempted = std::make_shared<std::vector<HostAndPort>>();
return targeter->resolve(token)
@@ -212,7 +213,8 @@ public:
Targeter* targeter,
OperationContext* opCtx,
std::shared_ptr<TaskExecutor> exec,
- CancellationToken token) final {
+ CancellationToken token,
+ BatonHandle) final {
auto [p, f] = makePromiseFuture<BSONObj>();
auto targetsAttempted = std::make_shared<std::vector<HostAndPort>>();
return targeter->resolve(token).thenRunOn(exec).then([this,