author     Jason Chan <jason.chan@mongodb.com>  2022-12-14 16:28:08 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-12-14 18:10:23 +0000
commit     dd2e04b11c90385c4c06fe71ed87164e0a9cd712 (patch)
tree       8e55da464bd9d15fc623fc3c6182ba639aa52991
parent     a4b793f1118e775dd442c33d1bf5d5acc4437ee9 (diff)
download   mongo-dd2e04b11c90385c4c06fe71ed87164e0a9cd712.tar.gz
SERVER-70193 Add sendTxnCommand to async rpc API
-rw-r--r--  src/mongo/db/repl/tenant_migration_donor_service.cpp  13
-rw-r--r--  src/mongo/executor/SConscript  4
-rw-r--r--  src/mongo/executor/async_rpc.h  118
-rw-r--r--  src/mongo/executor/async_rpc_test.cpp  273
-rw-r--r--  src/mongo/executor/async_rpc_test_fixture.h  49
-rw-r--r--  src/mongo/executor/hedged_async_rpc.h  5
-rw-r--r--  src/mongo/executor/mock_async_rpc_test.cpp  9
-rw-r--r--  src/mongo/executor/task_executor_cursor_test.cpp  4
-rw-r--r--  src/mongo/executor/task_executor_test_common.cpp  2
-rw-r--r--  src/mongo/s/async_rpc_shard_targeter.h  6
10 files changed, 387 insertions, 96 deletions
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp
index 3a4505280a3..15e1cb081e3 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp
@@ -94,6 +94,9 @@ const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOn
const int kMaxRecipientKeyDocsFindAttempts = 10;
+using RecipientForgetMigrationRPCOptions = async_rpc::AsyncRPCOptions<RecipientForgetMigration>;
+using RecipientSyncDataRPCOptions = async_rpc::AsyncRPCOptions<RecipientSyncData>;
+
/**
* Encapsulates the retry logic for sending the ForgetMigration command.
*/
@@ -804,8 +807,9 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDa
kPrimaryOnlyReadPreference, recipientTargeterRS);
auto retryPolicy =
std::make_shared<RecipientSyncDataRetryPolicy>(getProtocol(), kExponentialBackoff);
- auto cmdRes = async_rpc::sendCommand(
- request, _serviceContext, std::move(asyncTargeter), **exec, token, retryPolicy);
+ auto options =
+ std::make_shared<RecipientSyncDataRPCOptions>(request, **exec, token, retryPolicy);
+ auto cmdRes = async_rpc::sendCommand(options, _serviceContext, std::move(asyncTargeter));
return std::move(cmdRes).ignoreValue().onError([](Status status) {
return async_rpc::unpackRPCStatusIgnoringWriteConcernAndWriteErrors(status).addContext(
"Tenant migration recipient command failed");
@@ -836,8 +840,9 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientForget
auto asyncTargeter = std::make_unique<async_rpc::AsyncRemoteCommandTargeterAdapter>(
kPrimaryOnlyReadPreference, recipientTargeterRS);
auto retryPolicy = std::make_shared<RecipientForgetMigrationRetryPolicy>(kExponentialBackoff);
- auto cmdRes = async_rpc::sendCommand(
- request, _serviceContext, std::move(asyncTargeter), **exec, token, retryPolicy);
+ auto options =
+ std::make_shared<RecipientForgetMigrationRPCOptions>(request, **exec, token, retryPolicy);
+ auto cmdRes = async_rpc::sendCommand(options, _serviceContext, std::move(asyncTargeter));
return std::move(cmdRes).ignoreValue().onError([](Status status) {
return async_rpc::unpackRPCStatusIgnoringWriteConcernAndWriteErrors(status).addContext(
"Tenant migration recipient command failed");
diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript
index 7140c8c8c5b..79451b9df25 100644
--- a/src/mongo/executor/SConscript
+++ b/src/mongo/executor/SConscript
@@ -349,10 +349,14 @@ env.CppUnitTest(
LIBDEPS=[
'$BUILD_DIR/mongo/client/remote_command_targeter',
'$BUILD_DIR/mongo/client/remote_command_targeter_mock',
+ '$BUILD_DIR/mongo/db/auth/authmocks',
+ '$BUILD_DIR/mongo/db/auth/authorization_manager_global',
+ '$BUILD_DIR/mongo/db/commands/standalone',
'$BUILD_DIR/mongo/db/query/command_request_response',
'$BUILD_DIR/mongo/db/repl/hello_command',
'$BUILD_DIR/mongo/db/service_context_test_fixture',
'$BUILD_DIR/mongo/s/mongos_server_parameters',
+ "$BUILD_DIR/mongo/s/sharding_router_test_fixture",
'async_rpc',
'connection_pool_executor',
'egress_tag_closer_manager',
diff --git a/src/mongo/executor/async_rpc.h b/src/mongo/executor/async_rpc.h
index 86388866c1b..3c4c6cf05ec 100644
--- a/src/mongo/executor/async_rpc.h
+++ b/src/mongo/executor/async_rpc.h
@@ -40,6 +40,8 @@
#include "mongo/executor/task_executor.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/async_rpc_shard_targeter.h"
+#include "mongo/s/transaction_router.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/cancellation.h"
#include "mongo/util/future.h"
@@ -79,6 +81,19 @@ struct AsyncRPCResponse<void> {
HostAndPort targetUsed;
};
+template <typename CommandType>
+struct AsyncRPCOptions {
+ AsyncRPCOptions(CommandType cmd,
+ std::shared_ptr<executor::TaskExecutor> exec,
+ CancellationToken token,
+ std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>())
+ : cmd{cmd}, exec{exec}, token{token}, retryPolicy{retryPolicy} {}
+ CommandType cmd;
+ std::shared_ptr<executor::TaskExecutor> exec;
+ CancellationToken token;
+ std::shared_ptr<RetryPolicy> retryPolicy;
+};
+
/**
* Details used internally by the API. Users of the API can skip the code in this namespace
* and proceed to the `sendCommand(...)` functions below.
@@ -141,36 +156,36 @@ struct RetryDelayAsBackoff {
template <typename CommandType>
ExecutorFuture<AsyncRPCResponse<typename CommandType::Reply>> sendCommandWithRunner(
- CommandType cmd,
+ BSONObj cmdBSON,
+ std::shared_ptr<AsyncRPCOptions<CommandType>> options,
detail::AsyncRPCRunner* runner,
OperationContext* opCtx,
- std::unique_ptr<Targeter> targeter,
- std::shared_ptr<executor::TaskExecutor> exec,
- CancellationToken token,
- std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>()) {
+ std::unique_ptr<Targeter> targeter) {
using ReplyType = AsyncRPCResponse<typename CommandType::Reply>;
auto tryBody = [=, targeter = std::move(targeter)] {
// Execute the command after extracting the db name and bson from the CommandType.
// Wrapping this function allows us to separate the CommandType parsing logic from the
// implementation details of executing the remote command asynchronously.
- return runner->_sendCommand(
- cmd.getDbName().db(), cmd.toBSON({}), targeter.get(), opCtx, exec, token);
+ return runner->_sendCommand(options->cmd.getDbName().db(),
+ cmdBSON,
+ targeter.get(),
+ opCtx,
+ options->exec,
+ options->token);
};
- auto resFuture = AsyncTry<decltype(tryBody)>(std::move(tryBody))
- .until([token, retryPolicy, cmd](
- StatusWith<detail::AsyncRPCInternalResponse> swResponse) {
- bool shouldStopRetry = token.isCanceled() ||
- !retryPolicy->recordAndEvaluateRetry(swResponse.getStatus());
- return shouldStopRetry;
- })
- .withBackoffBetweenIterations(RetryDelayAsBackoff(retryPolicy.get()))
- .on(exec, CancellationToken::uncancelable());
-
+ auto resFuture =
+ AsyncTry<decltype(tryBody)>(std::move(tryBody))
+ .until([options](StatusWith<detail::AsyncRPCInternalResponse> swResponse) {
+ bool shouldStopRetry = options->token.isCanceled() ||
+ !options->retryPolicy->recordAndEvaluateRetry(swResponse.getStatus());
+ return shouldStopRetry;
+ })
+ .withBackoffBetweenIterations(RetryDelayAsBackoff(options->retryPolicy.get()))
+ .on(options->exec, CancellationToken::uncancelable());
return std::move(resFuture)
.then([](detail::AsyncRPCInternalResponse r) -> ReplyType {
auto res = CommandType::Reply::parseSharingOwnership(IDLParserContext("AsyncRPCRunner"),
r.response);
-
return {res, r.targetUsed};
})
.unsafeToInlineFuture()
@@ -192,7 +207,7 @@ ExecutorFuture<AsyncRPCResponse<typename CommandType::Reply>> sendCommandWithRun
return Status{AsyncRPCErrorInfo(s),
"Remote command execution failed due to executor shutdown"};
})
- .thenRunOn(exec);
+ .thenRunOn(options->exec);
}
} // namespace detail
@@ -223,15 +238,12 @@ ExecutorFuture<AsyncRPCResponse<typename CommandType::Reply>> sendCommandWithRun
*/
template <typename CommandType>
ExecutorFuture<AsyncRPCResponse<typename CommandType::Reply>> sendCommand(
- CommandType cmd,
+ std::shared_ptr<AsyncRPCOptions<CommandType>> options,
OperationContext* opCtx,
- std::unique_ptr<Targeter> targeter,
- std::shared_ptr<executor::TaskExecutor> exec,
- CancellationToken token,
- std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>()) {
+ std::unique_ptr<Targeter> targeter) {
auto runner = detail::AsyncRPCRunner::get(opCtx->getServiceContext());
- return detail::sendCommandWithRunner(
- cmd, runner, opCtx, std::move(targeter), exec, token, retryPolicy);
+ auto cmdBSON = options->cmd.toBSON({});
+ return detail::sendCommandWithRunner(cmdBSON, options, runner, opCtx, std::move(targeter));
}
/**
@@ -241,18 +253,58 @@ ExecutorFuture<AsyncRPCResponse<typename CommandType::Reply>> sendCommand(
*/
template <typename CommandType>
ExecutorFuture<AsyncRPCResponse<typename CommandType::Reply>> sendCommand(
- CommandType cmd,
+ std::shared_ptr<AsyncRPCOptions<CommandType>> options,
ServiceContext* const svcCtx,
- std::unique_ptr<Targeter> targeter,
- std::shared_ptr<executor::TaskExecutor> exec,
- CancellationToken token,
- std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>()) {
+ std::unique_ptr<Targeter> targeter) {
// Execute the command after extracting the db name and bson from the CommandType.
// Wrapping this function allows us to separate the CommandType parsing logic from the
// implementation details of executing the remote command asynchronously.
auto runner = detail::AsyncRPCRunner::get(svcCtx);
- return detail::sendCommandWithRunner(
- cmd, runner, nullptr, std::move(targeter), exec, token, retryPolicy);
+ auto cmdBSON = options->cmd.toBSON({});
+ return detail::sendCommandWithRunner(cmdBSON, options, runner, nullptr, std::move(targeter));
+}
+
+/**
+ * This function operates the same as `sendCommand` above, but attaches transaction metadata
+ * from the opCtx to the command BSON before sending it to the targeted shardId.
+ */
+template <typename CommandType>
+ExecutorFuture<AsyncRPCResponse<typename CommandType::Reply>> sendTxnCommand(
+ std::shared_ptr<AsyncRPCOptions<CommandType>> options,
+ OperationContext* opCtx,
+ std::unique_ptr<ShardIdTargeter> targeter) {
+ using ReplyType = AsyncRPCResponse<typename CommandType::Reply>;
+ // Execute the command after extracting the db name and bson from the CommandType.
+ // Wrapping this function allows us to separate the CommandType parsing logic from the
+ // implementation details of executing the remote command asynchronously.
+ auto runner = detail::AsyncRPCRunner::get(opCtx->getServiceContext());
+ auto cmdBSON = options->cmd.toBSON({});
+ const auto shardId = targeter->getShardId();
+ if (auto txnRouter = TransactionRouter::get(opCtx); txnRouter) {
+ cmdBSON = txnRouter.attachTxnFieldsIfNeeded(opCtx, targeter->getShardId(), cmdBSON);
+ }
+ return detail::sendCommandWithRunner(cmdBSON, options, runner, opCtx, std::move(targeter))
+ .onCompletion([opCtx, shardId](StatusWith<ReplyType> swResponse) -> StatusWith<ReplyType> {
+ auto txnRouter = TransactionRouter::get(opCtx);
+ if (!txnRouter) {
+ return swResponse;
+ }
+ if (swResponse.isOK()) {
+ // TODO (SERVER-72082): Make sure TxnResponseMetadata is appended to the BSON
+ // that we are passing into 'processParticipantResponse'.
+ txnRouter.processParticipantResponse(
+ opCtx, shardId, swResponse.getValue().response.toBSON());
+ } else {
+ auto extraInfo = swResponse.getStatus().template extraInfo<AsyncRPCErrorInfo>();
+ if (extraInfo->isRemote()) {
+ auto remoteError = extraInfo->asRemote();
+ txnRouter.processParticipantResponse(
+ opCtx, shardId, remoteError.getResponseObj());
+ }
+ }
+ return swResponse;
+ })
+ .thenRunOn(options->exec);
}
} // namespace mongo::async_rpc
#undef MONGO_LOGV2_DEFAULT_COMPONENT
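A usage sketch for the new `sendTxnCommand` entry point, assuming an opCtx checked into a router session with an active `TransactionRouter` (as the tests below arrange) and a caller-built `FindCommandRequest`; the `ShardIdTargeter` constructor arguments match the class in src/mongo/s/async_rpc_shard_targeter.h:

    // Sketch only: transaction fields (startTransaction, txnNumber, autocommit,
    // coordinator) are attached from the opCtx's TransactionRouter before
    // dispatch, and the participant's reply (or remote error body) is fed back
    // to the router via processParticipantResponse on completion.
    auto targeter = std::make_unique<async_rpc::ShardIdTargeter>(
        shardId, opCtx, readPref, exec);
    auto options = std::make_shared<async_rpc::AsyncRPCOptions<FindCommandRequest>>(
        findCmd, exec, token);
    auto future = async_rpc::sendTxnCommand(options, opCtx, std::move(targeter));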
diff --git a/src/mongo/executor/async_rpc_test.cpp b/src/mongo/executor/async_rpc_test.cpp
index 42dc468c278..62824a15cae 100644
--- a/src/mongo/executor/async_rpc_test.cpp
+++ b/src/mongo/executor/async_rpc_test.cpp
@@ -50,7 +50,16 @@
#include "mongo/util/net/hostandport.h"
#include <memory>
+#include "mongo/logv2/log.h"
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kNetwork
+
namespace mongo {
+const HostAndPort kTestConfigShardHost = HostAndPort("FakeConfigHost", 12345);
+const std::vector<ShardId> kTestShardIds = {
+ ShardId("FakeShard1"), ShardId("FakeShard2"), ShardId("FakeShard3")};
+const std::vector<HostAndPort> kTestShardHosts = {HostAndPort("FakeShard1Host", 12345),
+ HostAndPort("FakeShard2Host", 12345),
+ HostAndPort("FakeShard3Host", 12345)};
namespace async_rpc {
namespace {
/*
@@ -63,8 +72,10 @@ TEST_F(AsyncRPCTestFixture, SuccessfulHello) {
initializeCommand(helloCmd);
auto opCtxHolder = makeOperationContext();
- ExecutorFuture<AsyncRPCResponse<HelloCommandReply>> resultFuture = sendCommand(
- helloCmd, opCtxHolder.get(), std::move(targeter), getExecutorPtr(), _cancellationToken);
+ auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>(
+ helloCmd, getExecutorPtr(), _cancellationToken);
+ ExecutorFuture<AsyncRPCResponse<HelloCommandReply>> resultFuture =
+ sendCommand(options, opCtxHolder.get(), std::move(targeter));
onCommand([&](const auto& request) {
ASSERT(request.cmdObj["hello"]);
@@ -97,13 +108,10 @@ TEST_F(AsyncRPCTestFixture, RetryOnSuccessfulHelloAdditionalAttempts) {
testPolicy->pushRetryDelay(retryDelay);
auto opCtxHolder = makeOperationContext();
+ auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>(
+ helloCmd, getExecutorPtr(), _cancellationToken, testPolicy);
ExecutorFuture<AsyncRPCResponse<HelloCommandReply>> resultFuture =
- sendCommand(helloCmd,
- opCtxHolder.get(),
- std::move(targeter),
- getExecutorPtr(),
- _cancellationToken,
- testPolicy);
+ sendCommand(options, opCtxHolder.get(), std::move(targeter));
const auto onCommandFunc = [&](const auto& request) {
ASSERT(request.cmdObj["hello"]);
@@ -142,13 +150,10 @@ TEST_F(AsyncRPCTestFixture, DynamicDelayBetweenRetries) {
testPolicy->pushRetryDelay(retryDelays[2]);
auto opCtxHolder = makeOperationContext();
+ auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>(
+ helloCmd, getExecutorPtr(), _cancellationToken, testPolicy);
ExecutorFuture<AsyncRPCResponse<HelloCommandReply>> resultFuture =
- sendCommand(helloCmd,
- opCtxHolder.get(),
- std::move(targeter),
- getExecutorPtr(),
- _cancellationToken,
- testPolicy);
+ sendCommand(options, opCtxHolder.get(), std::move(targeter));
const auto onCommandFunc = [&](const auto& request) {
ASSERT(request.cmdObj["hello"]);
@@ -185,13 +190,10 @@ TEST_F(AsyncRPCTestFixture, DoNotRetryOnErrorAccordingToPolicy) {
testPolicy->setMaxNumRetries(zeroRetries);
auto opCtxHolder = makeOperationContext();
+ auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>(
+ helloCmd, getExecutorPtr(), _cancellationToken, testPolicy);
ExecutorFuture<AsyncRPCResponse<HelloCommandReply>> resultFuture =
- sendCommand(helloCmd,
- opCtxHolder.get(),
- std::move(targeter),
- getExecutorPtr(),
- _cancellationToken,
- testPolicy);
+ sendCommand(options, opCtxHolder.get(), std::move(targeter));
onCommand([&](const auto& request) {
ASSERT(request.cmdObj["hello"]);
@@ -215,8 +217,9 @@ TEST_F(AsyncRPCTestFixture, LocalError) {
initializeCommand(helloCmd);
auto opCtxHolder = makeOperationContext();
- auto resultFuture = sendCommand(
- helloCmd, opCtxHolder.get(), std::move(targeter), getExecutorPtr(), _cancellationToken);
+ auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>(
+ helloCmd, getExecutorPtr(), _cancellationToken);
+ auto resultFuture = sendCommand(options, opCtxHolder.get(), std::move(targeter));
onCommand([&](const auto& request) {
ASSERT(request.cmdObj["hello"]);
@@ -246,8 +249,9 @@ TEST_F(AsyncRPCTestFixture, RemoteError) {
initializeCommand(helloCmd);
auto opCtxHolder = makeOperationContext();
- auto resultFuture = sendCommand(
- helloCmd, opCtxHolder.get(), std::move(targeter), getExecutorPtr(), _cancellationToken);
+ auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>(
+ helloCmd, getExecutorPtr(), _cancellationToken);
+ auto resultFuture = sendCommand(options, opCtxHolder.get(), std::move(targeter));
onCommand([&](const auto& request) {
ASSERT(request.cmdObj["hello"]);
@@ -277,11 +281,16 @@ TEST_F(AsyncRPCTestFixture, SuccessfulFind) {
NamespaceString nss(testDbName);
FindCommandRequest findCmd(nss);
- auto resultFuture = sendCommand(
- findCmd, opCtxHolder.get(), std::move(targeter), getExecutorPtr(), _cancellationToken);
+ auto options = std::make_shared<AsyncRPCOptions<FindCommandRequest>>(
+ findCmd, getExecutorPtr(), _cancellationToken);
+ auto resultFuture = sendCommand(options, opCtxHolder.get(), std::move(targeter));
onCommand([&](const auto& request) {
ASSERT(request.cmdObj["find"]);
+ ASSERT(!request.cmdObj["startTransaction"]);
+ ASSERT(!request.cmdObj["coordinator"]);
+ ASSERT(!request.cmdObj["autocommit"]);
+ ASSERT(!request.cmdObj["txnNumber"]);
// The BSON documents in this cursor response are created here.
// When async_rpc::sendCommand parses the response, it participates
// in ownership of the underlying data, so it will participate in
@@ -309,8 +318,10 @@ TEST_F(AsyncRPCTestFixture, WriteConcernError) {
BSON("ok" << 1 << "writeConcernError" << writeConcernError);
auto opCtxHolder = makeOperationContext();
- ExecutorFuture<AsyncRPCResponse<HelloCommandReply>> resultFuture = sendCommand(
- helloCmd, opCtxHolder.get(), std::move(targeter), getExecutorPtr(), _cancellationToken);
+ auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>(
+ helloCmd, getExecutorPtr(), _cancellationToken);
+ ExecutorFuture<AsyncRPCResponse<HelloCommandReply>> resultFuture =
+ sendCommand(options, opCtxHolder.get(), std::move(targeter));
onCommand([&](const auto& request) {
ASSERT(request.cmdObj["hello"]);
@@ -348,8 +359,10 @@ TEST_F(AsyncRPCTestFixture, WriteError) {
<< "Document failed validation");
const BSONObj resWithWriteError = BSON("ok" << 1 << "writeErrors" << BSON_ARRAY(writeError));
auto opCtxHolder = makeOperationContext();
- ExecutorFuture<AsyncRPCResponse<HelloCommandReply>> resultFuture = sendCommand(
- helloCmd, opCtxHolder.get(), std::move(targeter), getExecutorPtr(), _cancellationToken);
+ auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>(
+ helloCmd, getExecutorPtr(), _cancellationToken);
+ ExecutorFuture<AsyncRPCResponse<HelloCommandReply>> resultFuture =
+ sendCommand(options, opCtxHolder.get(), std::move(targeter));
onCommand([&](const auto& request) {
ASSERT(request.cmdObj["hello"]);
@@ -383,8 +396,9 @@ TEST_F(AsyncRPCTestFixture, ExecutorShutdown) {
HelloCommand helloCmd;
initializeCommand(helloCmd);
auto opCtxHolder = makeOperationContext();
- auto resultFuture = sendCommand(
- helloCmd, opCtxHolder.get(), std::move(targeter), getExecutorPtr(), _cancellationToken);
+ auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>(
+ helloCmd, getExecutorPtr(), _cancellationToken);
+ auto resultFuture = sendCommand(options, opCtxHolder.get(), std::move(targeter));
getExecutorPtr()->shutdown();
auto error = resultFuture.getNoThrow().getStatus();
// The error returned by our API should always be RemoteCommandExecutionError
@@ -439,8 +453,9 @@ TEST_F(AsyncRPCTestFixture, ParseAndSeralizeNoop) {
initializeCommand(helloCmd);
auto opCtxHolder = makeOperationContext();
- auto resultFuture = sendCommand(
- helloCmd, opCtxHolder.get(), std::move(targeter), getExecutorPtr(), _cancellationToken);
+ auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>(
+ helloCmd, getExecutorPtr(), _cancellationToken);
+ auto resultFuture = sendCommand(options, opCtxHolder.get(), std::move(targeter));
onCommand([&](const auto& request) {
ASSERT(request.cmdObj["hello"]);
@@ -477,9 +492,9 @@ TEST_F(AsyncRPCTestFixture, FailedTargeting) {
HelloCommand helloCmd;
initializeCommand(helloCmd);
auto opCtxHolder = makeOperationContext();
-
- auto resultFuture = sendCommand(
- helloCmd, opCtxHolder.get(), std::move(targeter), getExecutorPtr(), _cancellationToken);
+ auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>(
+ helloCmd, getExecutorPtr(), _cancellationToken);
+ auto resultFuture = sendCommand(options, opCtxHolder.get(), std::move(targeter));
auto error = resultFuture.getNoThrow().getStatus();
// The error returned by our API should always be RemoteCommandExecutionError
@@ -493,6 +508,175 @@ TEST_F(AsyncRPCTestFixture, FailedTargeting) {
ASSERT_EQ(extraInfo->asLocal(), targeterFailStatus);
}
+TEST_F(AsyncRPCTestFixture, SendTxnCommandWithoutTxnRouterAppendsNoTxnFields) {
+ ShardId shardId("shard");
+ ReadPreferenceSetting readPref;
+ std::vector<HostAndPort> testHost = {kTestShardHosts[0]};
+ // Use a mock ShardIdTargeter to avoid calling into the ShardRegistry to get a target shard.
+ auto opCtxHolder = makeOperationContext();
+ auto targeter = std::make_unique<ShardIdTargeterForTest>(
+ shardId, opCtxHolder.get(), readPref, getExecutorPtr(), testHost);
+ DatabaseName testDbName = DatabaseName("testdb", boost::none);
+ NamespaceString nss(testDbName);
+
+ FindCommandRequest findCmd(nss);
+ auto options = std::make_shared<AsyncRPCOptions<FindCommandRequest>>(
+ findCmd, getExecutorPtr(), _cancellationToken);
+ auto resultFuture = sendTxnCommand(options, opCtxHolder.get(), std::move(targeter));
+
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["find"]);
+ ASSERT(!request.cmdObj["startTransaction"]);
+ ASSERT(!request.cmdObj["coordinator"]);
+ ASSERT(!request.cmdObj["autocommit"]);
+ ASSERT(!request.cmdObj["txnNumber"]);
+ // The BSON documents in this cursor response are created here.
+ // When async_rpc::sendCommand parses the response, it participates
+ // in ownership of the underlying data, so it will participate in
+ // owning the data in the cursor response.
+ return CursorResponse(nss, 0LL, {BSON("x" << 1)})
+ .toBSON(CursorResponse::ResponseType::InitialResponse);
+ });
+
+ CursorInitialReply res = std::move(resultFuture).get().response;
+ ASSERT_BSONOBJ_EQ(res.getCursor()->getFirstBatch()[0], BSON("x" << 1));
+}
+
+TEST_F(AsyncRPCTxnTestFixture, MultipleSendTxnCommand) {
+ ShardId shardId("shard");
+ ReadPreferenceSetting readPref;
+ std::vector<HostAndPort> testHost = {kTestShardHosts[0]};
+ // Use a mock ShardIdTargeter to avoid calling into the ShardRegistry to get a target shard.
+ auto targeter = std::make_unique<ShardIdTargeterForTest>(
+ shardId, getOpCtx(), readPref, getExecutorPtr(), testHost);
+ DatabaseName testDbName = DatabaseName("testdb", boost::none);
+ NamespaceString nss(testDbName);
+
+ // Set up the transaction metadata.
+ TxnNumber txnNum{3};
+ getOpCtx()->setTxnNumber(txnNum);
+ auto txnRouter = TransactionRouter::get(getOpCtx());
+ txnRouter.beginOrContinueTxn(getOpCtx(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.setDefaultAtClusterTime(getOpCtx());
+
+ FindCommandRequest findCmd(nss);
+ auto options = std::make_shared<AsyncRPCOptions<FindCommandRequest>>(
+ findCmd, getExecutorPtr(), _cancellationToken);
+ auto resultFuture = sendTxnCommand(options, getOpCtx(), std::move(targeter));
+
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["find"]);
+ ASSERT(request.cmdObj["startTransaction"].Bool());
+ ASSERT(request.cmdObj["coordinator"].Bool());
+ ASSERT(!request.cmdObj["autocommit"].Bool());
+ ASSERT_EQUALS(request.cmdObj["txnNumber"].numberLong(), 3LL);
+ // The BSON documents in this cursor response are created here.
+ // When async_rpc::sendCommand parses the response, it participates
+ // in ownership of the underlying data, so it will participate in
+ // owning the data in the cursor response.
+ return CursorResponse(nss, 0LL, {BSON("x" << 1)})
+ .toBSON(CursorResponse::ResponseType::InitialResponse);
+ });
+
+ CursorInitialReply res = std::move(resultFuture).get().response;
+ ASSERT_BSONOBJ_EQ(res.getCursor()->getFirstBatch()[0], BSON("x" << 1));
+
+    // Issue a follow-up find command in the same transaction.
+ FindCommandRequest secondFindCmd(nss);
+ auto secondCmdOptions = std::make_shared<AsyncRPCOptions<FindCommandRequest>>(
+ secondFindCmd, getExecutorPtr(), _cancellationToken);
+ auto secondTargeter = std::make_unique<ShardIdTargeterForTest>(
+ shardId, getOpCtx(), readPref, getExecutorPtr(), testHost);
+ auto secondResultFuture =
+ sendTxnCommand(secondCmdOptions, getOpCtx(), std::move(secondTargeter));
+
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["find"]);
+ ASSERT(!request.cmdObj["startTransaction"]);
+ ASSERT(request.cmdObj["coordinator"].Bool());
+ ASSERT(!request.cmdObj["autocommit"].Bool());
+ ASSERT_EQUALS(request.cmdObj["txnNumber"].numberLong(), 3LL);
+ return CursorResponse(nss, 0LL, {BSON("x" << 2)})
+ .toBSON(CursorResponse::ResponseType::InitialResponse);
+ });
+
+ CursorInitialReply secondRes = std::move(secondResultFuture).get().response;
+ ASSERT_BSONOBJ_EQ(secondRes.getCursor()->getFirstBatch()[0], BSON("x" << 2));
+}
+
+TEST_F(AsyncRPCTxnTestFixture, SendTxnCommandReturnsRemoteError) {
+ ShardId shardId("shard");
+ ReadPreferenceSetting readPref;
+ std::vector<HostAndPort> testHost = {kTestShardHosts[0]};
+ // Use a mock ShardIdTargeter to avoid calling into the ShardRegistry to get a target shard.
+ auto targeter = std::make_unique<ShardIdTargeterForTest>(
+ shardId, getOpCtx(), readPref, getExecutorPtr(), testHost);
+ DatabaseName testDbName = DatabaseName("testdb", boost::none);
+ NamespaceString nss(testDbName);
+
+ // Set up the transaction metadata.
+ TxnNumber txnNum{3};
+ getOpCtx()->setTxnNumber(txnNum);
+ auto txnRouter = TransactionRouter::get(getOpCtx());
+ txnRouter.beginOrContinueTxn(getOpCtx(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.setDefaultAtClusterTime(getOpCtx());
+
+ FindCommandRequest findCmd(nss);
+ auto options = std::make_shared<AsyncRPCOptions<FindCommandRequest>>(
+ findCmd, getExecutorPtr(), _cancellationToken);
+ auto resultFuture = sendTxnCommand(options, getOpCtx(), std::move(targeter));
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["find"]);
+ ASSERT(request.cmdObj["startTransaction"].Bool());
+ ASSERT(request.cmdObj["coordinator"].Bool());
+ ASSERT(!request.cmdObj["autocommit"].Bool());
+ ASSERT_EQUALS(request.cmdObj["txnNumber"].numberLong(), 3LL);
+ return createErrorResponse(Status(ErrorCodes::BadValue, "mock"));
+ });
+
+ auto error = resultFuture.getNoThrow().getStatus();
+ ASSERT_EQ(error.code(), ErrorCodes::RemoteCommandExecutionError);
+ auto extraInfo = error.extraInfo<AsyncRPCErrorInfo>();
+ ASSERT(extraInfo);
+ ASSERT(extraInfo->isRemote());
+}
+
+TEST_F(AsyncRPCTxnTestFixture, SendTxnCommandReturnsLocalError) {
+ ShardId shardId("shard");
+ ReadPreferenceSetting readPref;
+ std::vector<HostAndPort> testHost = {kTestShardHosts[0]};
+ // Use a mock ShardIdTargeter to avoid calling into the ShardRegistry to get a target shard.
+ auto targeter = std::make_unique<ShardIdTargeterForTest>(
+ shardId, getOpCtx(), readPref, getExecutorPtr(), testHost);
+ DatabaseName testDbName = DatabaseName("testdb", boost::none);
+ NamespaceString nss(testDbName);
+
+ // Set up the transaction metadata.
+ TxnNumber txnNum{3};
+ getOpCtx()->setTxnNumber(txnNum);
+ auto txnRouter = TransactionRouter::get(getOpCtx());
+ txnRouter.beginOrContinueTxn(getOpCtx(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.setDefaultAtClusterTime(getOpCtx());
+
+ FindCommandRequest findCmd(nss);
+ auto options = std::make_shared<AsyncRPCOptions<FindCommandRequest>>(
+ findCmd, getExecutorPtr(), _cancellationToken);
+ auto resultFuture = sendTxnCommand(options, getOpCtx(), std::move(targeter));
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["find"]);
+ ASSERT(request.cmdObj["startTransaction"].Bool());
+ ASSERT(request.cmdObj["coordinator"].Bool());
+ ASSERT(!request.cmdObj["autocommit"].Bool());
+ ASSERT_EQUALS(request.cmdObj["txnNumber"].numberLong(), 3LL);
+ return Status(ErrorCodes::NetworkTimeout, "mock");
+ });
+
+ auto error = resultFuture.getNoThrow().getStatus();
+ ASSERT_EQ(error.code(), ErrorCodes::RemoteCommandExecutionError);
+ auto extraInfo = error.extraInfo<AsyncRPCErrorInfo>();
+ ASSERT(extraInfo);
+ ASSERT(extraInfo->isLocal());
+}
TEST_F(AsyncRPCTestFixture, AttemptedTargetCorrectlyPropogatedWithLocalError) {
HelloCommand helloCmd;
@@ -501,8 +685,9 @@ TEST_F(AsyncRPCTestFixture, AttemptedTargetCorrectlyPropogatedWithLocalError) {
auto targeter = std::make_unique<FixedTargeter>(target);
auto opCtxHolder = makeOperationContext();
- auto resultFuture = sendCommand(
- helloCmd, opCtxHolder.get(), std::move(targeter), getExecutorPtr(), _cancellationToken);
+ auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>(
+ helloCmd, getExecutorPtr(), _cancellationToken);
+ auto resultFuture = sendCommand(options, opCtxHolder.get(), std::move(targeter));
onCommand([&](const auto& request) {
ASSERT(request.cmdObj["hello"]);
@@ -528,8 +713,9 @@ TEST_F(AsyncRPCTestFixture, NoAttemptedTargetIfTargetingFails) {
auto opCtxHolder = makeOperationContext();
- auto resultFuture = sendCommand(
- helloCmd, opCtxHolder.get(), std::move(targeter), getExecutorPtr(), _cancellationToken);
+ auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>(
+ helloCmd, getExecutorPtr(), _cancellationToken);
+ auto resultFuture = sendCommand(options, opCtxHolder.get(), std::move(targeter));
auto error = resultFuture.getNoThrow().getStatus();
ASSERT_EQ(error.code(), ErrorCodes::RemoteCommandExecutionError);
@@ -549,8 +735,9 @@ TEST_F(AsyncRPCTestFixture, RemoteErrorAttemptedTargetMatchesActual) {
auto opCtxHolder = makeOperationContext();
- auto resultFuture = sendCommand(
- helloCmd, opCtxHolder.get(), std::move(targeter), getExecutorPtr(), _cancellationToken);
+ auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>(
+ helloCmd, getExecutorPtr(), _cancellationToken);
+ auto resultFuture = sendCommand(options, opCtxHolder.get(), std::move(targeter));
onCommand([&](const auto& request) {
ASSERT(request.cmdObj["hello"]);
diff --git a/src/mongo/executor/async_rpc_test_fixture.h b/src/mongo/executor/async_rpc_test_fixture.h
index 58e79f11a92..f1398bcf466 100644
--- a/src/mongo/executor/async_rpc_test_fixture.h
+++ b/src/mongo/executor/async_rpc_test_fixture.h
@@ -41,6 +41,7 @@
#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/rpc/topology_version_gen.h"
+#include "mongo/s/session_catalog_router.h"
#include "mongo/unittest/bson_test_util.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/duration.h"
@@ -91,7 +92,7 @@ private:
std::deque<Milliseconds> _retryDelays;
};
-class AsyncRPCTestFixture : public LockerNoopServiceContextTest {
+class AsyncRPCTestFixture : public ServiceContextTest {
public:
void setUp() override {
ServiceContextTest::setUp();
@@ -188,4 +189,50 @@ public:
Status _status;
};
+
+class ShardIdTargeterForTest : public ShardIdTargeter {
+public:
+ ShardIdTargeterForTest(ShardId shardId,
+ OperationContext* opCtx,
+ ReadPreferenceSetting readPref,
+ ExecutorPtr executor,
+ std::vector<HostAndPort> resolvedHosts)
+ : ShardIdTargeter(shardId, opCtx, readPref, executor) {
+ _resolvedHosts = resolvedHosts;
+ };
+
+ SemiFuture<std::vector<HostAndPort>> resolve(CancellationToken t) override final {
+ return SemiFuture<std::vector<HostAndPort>>::makeReady(_resolvedHosts);
+ }
+
+private:
+ std::vector<HostAndPort> _resolvedHosts;
+};
+
+class AsyncRPCTxnTestFixture : public AsyncRPCTestFixture {
+public:
+ void setUp() override {
+ AsyncRPCTestFixture::setUp();
+ _opCtxHolder = makeOperationContext();
+ getOpCtx()->setLogicalSessionId(makeLogicalSessionIdForTest());
+ _routerOpCtxSession.emplace(getOpCtx());
+ }
+
+ OperationContext* getOpCtx() {
+ return _opCtxHolder.get();
+ }
+
+ void tearDown() override {
+ AsyncRPCTestFixture::tearDown();
+ }
+
+ const LogicalSessionId& getSessionId() {
+ return *getOpCtx()->getLogicalSessionId();
+ }
+
+private:
+ ServiceContext::UniqueOperationContext _opCtxHolder;
+ boost::optional<mongo::RouterOperationContextSession> _routerOpCtxSession;
+};
+
} // namespace mongo::async_rpc
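The two new helpers compose as follows: `AsyncRPCTxnTestFixture` owns an opCtx checked into a router session, and `ShardIdTargeterForTest` overrides `resolve()` to return canned hosts so tests never touch the `ShardRegistry`. A skeleton (names and values illustrative only) of a transaction test built on them, mirroring the setup in async_rpc_test.cpp:

    // Sketch only: the txn-begin boilerplate shared by the new tests.
    TEST_F(AsyncRPCTxnTestFixture, ExampleSkeleton) {
        TxnNumber txnNum{1};
        getOpCtx()->setTxnNumber(txnNum);
        auto txnRouter = TransactionRouter::get(getOpCtx());
        txnRouter.beginOrContinueTxn(
            getOpCtx(), txnNum, TransactionRouter::TransactionActions::kStart);
        txnRouter.setDefaultAtClusterTime(getOpCtx());
        auto targeter = std::make_unique<ShardIdTargeterForTest>(
            ShardId("shard"), getOpCtx(), ReadPreferenceSetting{}, getExecutorPtr(),
            std::vector<HostAndPort>{HostAndPort("FakeShard1Host", 12345)});
        // ...build a command and AsyncRPCOptions, call sendTxnCommand, and
        // schedule a mock response with onCommand as in the tests above.
    }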
diff --git a/src/mongo/executor/hedged_async_rpc.h b/src/mongo/executor/hedged_async_rpc.h
index 655c8cd874c..511b53a56e7 100644
--- a/src/mongo/executor/hedged_async_rpc.h
+++ b/src/mongo/executor/hedged_async_rpc.h
@@ -144,9 +144,10 @@ SemiFuture<AsyncRPCResponse<typename CommandType::Reply>> sendHedgedCommand(
for (size_t i = 0; i < hostsToTarget; i++) {
std::unique_ptr<Targeter> t = std::make_unique<FixedTargeter>(targets[i]);
+ auto options = std::make_shared<AsyncRPCOptions<CommandType>>(
+ cmd, exec, hedgeCancellationToken.token());
requests.emplace_back(
- sendCommand(cmd, opCtx, std::move(t), exec, hedgeCancellationToken.token())
- .thenRunOn(exec));
+ sendCommand(options, opCtx, std::move(t)).thenRunOn(exec));
}
/**
diff --git a/src/mongo/executor/mock_async_rpc_test.cpp b/src/mongo/executor/mock_async_rpc_test.cpp
index ff422301e01..1ef18fbd357 100644
--- a/src/mongo/executor/mock_async_rpc_test.cpp
+++ b/src/mongo/executor/mock_async_rpc_test.cpp
@@ -72,12 +72,9 @@ public:
std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>()) {
HelloCommand hello;
initializeCommand(hello);
- return sendCommand(hello,
- _opCtx.get(),
- std::make_unique<FixedTargeter>(target),
- getExecutorPtr(),
- _cancellationToken,
- retryPolicy);
+ auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>(
+ hello, getExecutorPtr(), _cancellationToken, retryPolicy);
+ return sendCommand(options, _opCtx.get(), std::make_unique<FixedTargeter>(target));
}
ExecutorFuture<AsyncRPCResponse<HelloCommandReply>> sendHelloCommandToLocalHost() {
diff --git a/src/mongo/executor/task_executor_cursor_test.cpp b/src/mongo/executor/task_executor_cursor_test.cpp
index 966603a517f..982aa527fe4 100644
--- a/src/mongo/executor/task_executor_cursor_test.cpp
+++ b/src/mongo/executor/task_executor_cursor_test.cpp
@@ -49,10 +49,6 @@ namespace {
*/
class TaskExecutorCursorFixture : public ThreadPoolExecutorTest {
public:
- TaskExecutorCursorFixture() {
- serviceCtx->registerClientObserver(std::make_unique<LockerNoopClientObserver>());
- }
-
void setUp() override {
ThreadPoolExecutorTest::setUp();
diff --git a/src/mongo/executor/task_executor_test_common.cpp b/src/mongo/executor/task_executor_test_common.cpp
index 98a68a837fe..ccd20ecabbc 100644
--- a/src/mongo/executor/task_executor_test_common.cpp
+++ b/src/mongo/executor/task_executor_test_common.cpp
@@ -364,7 +364,6 @@ COMMON_EXECUTOR_TEST(EventWaitingWithTimeoutTest) {
auto serviceContext = ServiceContext::make();
- serviceContext->registerClientObserver(std::make_unique<LockerNoopClientObserver>());
serviceContext->setFastClockSource(std::make_unique<ClockSourceMock>());
auto mockClock = static_cast<ClockSourceMock*>(serviceContext->getFastClockSource());
@@ -387,7 +386,6 @@ COMMON_EXECUTOR_TEST(EventSignalWithTimeoutTest) {
auto serviceContext = ServiceContext::make();
- serviceContext->registerClientObserver(std::make_unique<LockerNoopClientObserver>());
serviceContext->setFastClockSource(std::make_unique<ClockSourceMock>());
auto mockClock = static_cast<ClockSourceMock*>(serviceContext->getFastClockSource());
diff --git a/src/mongo/s/async_rpc_shard_targeter.h b/src/mongo/s/async_rpc_shard_targeter.h
index 1dfe6cac767..5fa5a133f2d 100644
--- a/src/mongo/s/async_rpc_shard_targeter.h
+++ b/src/mongo/s/async_rpc_shard_targeter.h
@@ -57,7 +57,7 @@ public:
ExecutorPtr executor)
: _shardId(shardId), _opCtx(opCtx), _readPref(readPref), _executor(executor){};
- SemiFuture<std::vector<HostAndPort>> resolve(CancellationToken t) override final {
+ SemiFuture<std::vector<HostAndPort>> resolve(CancellationToken t) override {
return getShard()
.thenRunOn(_executor)
.then([this, t](std::shared_ptr<Shard> shard) {
@@ -82,6 +82,10 @@ public:
return Grid::get(_opCtx)->shardRegistry()->getShard(_executor, _shardId);
}
+ ShardId getShardId() {
+ return _shardId;
+ }
+
private:
ShardId _shardId;
OperationContext* _opCtx;