diff options
author | Jason Chan <jason.chan@mongodb.com> | 2022-12-14 16:28:08 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-12-14 18:10:23 +0000 |
commit | dd2e04b11c90385c4c06fe71ed87164e0a9cd712 (patch) | |
tree | 8e55da464bd9d15fc623fc3c6182ba639aa52991 | |
parent | a4b793f1118e775dd442c33d1bf5d5acc4437ee9 (diff) | |
download | mongo-dd2e04b11c90385c4c06fe71ed87164e0a9cd712.tar.gz |
SERVER-70193 Add sendTxnCommand to async rpc API
-rw-r--r-- | src/mongo/db/repl/tenant_migration_donor_service.cpp | 13 | ||||
-rw-r--r-- | src/mongo/executor/SConscript | 4 | ||||
-rw-r--r-- | src/mongo/executor/async_rpc.h | 118 | ||||
-rw-r--r-- | src/mongo/executor/async_rpc_test.cpp | 273 | ||||
-rw-r--r-- | src/mongo/executor/async_rpc_test_fixture.h | 49 | ||||
-rw-r--r-- | src/mongo/executor/hedged_async_rpc.h | 5 | ||||
-rw-r--r-- | src/mongo/executor/mock_async_rpc_test.cpp | 9 | ||||
-rw-r--r-- | src/mongo/executor/task_executor_cursor_test.cpp | 4 | ||||
-rw-r--r-- | src/mongo/executor/task_executor_test_common.cpp | 2 | ||||
-rw-r--r-- | src/mongo/s/async_rpc_shard_targeter.h | 6 |
10 files changed, 387 insertions, 96 deletions
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp index 3a4505280a3..15e1cb081e3 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp @@ -94,6 +94,9 @@ const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOn const int kMaxRecipientKeyDocsFindAttempts = 10; +using RecipientForgetMigrationRPCOptions = async_rpc::AsyncRPCOptions<RecipientForgetMigration>; +using RecipientSyncDataRPCOptions = async_rpc::AsyncRPCOptions<RecipientSyncData>; + /** * Encapsulates the retry logic for sending the ForgetMigration command. */ @@ -804,8 +807,9 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDa kPrimaryOnlyReadPreference, recipientTargeterRS); auto retryPolicy = std::make_shared<RecipientSyncDataRetryPolicy>(getProtocol(), kExponentialBackoff); - auto cmdRes = async_rpc::sendCommand( - request, _serviceContext, std::move(asyncTargeter), **exec, token, retryPolicy); + auto options = + std::make_shared<RecipientSyncDataRPCOptions>(request, **exec, token, retryPolicy); + auto cmdRes = async_rpc::sendCommand(options, _serviceContext, std::move(asyncTargeter)); return std::move(cmdRes).ignoreValue().onError([](Status status) { return async_rpc::unpackRPCStatusIgnoringWriteConcernAndWriteErrors(status).addContext( "Tenant migration recipient command failed"); @@ -836,8 +840,9 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientForget auto asyncTargeter = std::make_unique<async_rpc::AsyncRemoteCommandTargeterAdapter>( kPrimaryOnlyReadPreference, recipientTargeterRS); auto retryPolicy = std::make_shared<RecipientForgetMigrationRetryPolicy>(kExponentialBackoff); - auto cmdRes = async_rpc::sendCommand( - request, _serviceContext, std::move(asyncTargeter), **exec, token, retryPolicy); + auto options = + std::make_shared<RecipientForgetMigrationRPCOptions>(request, **exec, token, retryPolicy); + auto cmdRes = async_rpc::sendCommand(options, _serviceContext, std::move(asyncTargeter)); return std::move(cmdRes).ignoreValue().onError([](Status status) { return async_rpc::unpackRPCStatusIgnoringWriteConcernAndWriteErrors(status).addContext( "Tenant migration recipient command failed"); diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript index 7140c8c8c5b..79451b9df25 100644 --- a/src/mongo/executor/SConscript +++ b/src/mongo/executor/SConscript @@ -349,10 +349,14 @@ env.CppUnitTest( LIBDEPS=[ '$BUILD_DIR/mongo/client/remote_command_targeter', '$BUILD_DIR/mongo/client/remote_command_targeter_mock', + '$BUILD_DIR/mongo/db/auth/authmocks', + '$BUILD_DIR/mongo/db/auth/authorization_manager_global', + '$BUILD_DIR/mongo/db/commands/standalone', '$BUILD_DIR/mongo/db/query/command_request_response', '$BUILD_DIR/mongo/db/repl/hello_command', '$BUILD_DIR/mongo/db/service_context_test_fixture', '$BUILD_DIR/mongo/s/mongos_server_parameters', + "$BUILD_DIR/mongo/s/sharding_router_test_fixture", 'async_rpc', 'connection_pool_executor', 'egress_tag_closer_manager', diff --git a/src/mongo/executor/async_rpc.h b/src/mongo/executor/async_rpc.h index 86388866c1b..3c4c6cf05ec 100644 --- a/src/mongo/executor/async_rpc.h +++ b/src/mongo/executor/async_rpc.h @@ -40,6 +40,8 @@ #include "mongo/executor/task_executor.h" #include "mongo/logv2/log.h" #include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/s/async_rpc_shard_targeter.h" +#include "mongo/s/transaction_router.h" #include "mongo/util/assert_util.h" #include "mongo/util/cancellation.h" #include "mongo/util/future.h" @@ -79,6 +81,19 @@ struct AsyncRPCResponse<void> { HostAndPort targetUsed; }; +template <typename CommandType> +struct AsyncRPCOptions { + AsyncRPCOptions(CommandType cmd, + std::shared_ptr<executor::TaskExecutor> exec, + CancellationToken token, + std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>()) + : cmd{cmd}, exec{exec}, token{token}, retryPolicy{retryPolicy} {} + CommandType cmd; + std::shared_ptr<executor::TaskExecutor> exec; + CancellationToken token; + std::shared_ptr<RetryPolicy> retryPolicy; +}; + /** * Details used internally by the API. Users of the API can skip the code in this namespace * and proceed to the `sendCommand(...)` functions below. @@ -141,36 +156,36 @@ struct RetryDelayAsBackoff { template <typename CommandType> ExecutorFuture<AsyncRPCResponse<typename CommandType::Reply>> sendCommandWithRunner( - CommandType cmd, + BSONObj cmdBSON, + std::shared_ptr<AsyncRPCOptions<CommandType>> options, detail::AsyncRPCRunner* runner, OperationContext* opCtx, - std::unique_ptr<Targeter> targeter, - std::shared_ptr<executor::TaskExecutor> exec, - CancellationToken token, - std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>()) { + std::unique_ptr<Targeter> targeter) { using ReplyType = AsyncRPCResponse<typename CommandType::Reply>; auto tryBody = [=, targeter = std::move(targeter)] { // Execute the command after extracting the db name and bson from the CommandType. // Wrapping this function allows us to separate the CommandType parsing logic from the // implementation details of executing the remote command asynchronously. - return runner->_sendCommand( - cmd.getDbName().db(), cmd.toBSON({}), targeter.get(), opCtx, exec, token); + return runner->_sendCommand(options->cmd.getDbName().db(), + cmdBSON, + targeter.get(), + opCtx, + options->exec, + options->token); }; - auto resFuture = AsyncTry<decltype(tryBody)>(std::move(tryBody)) - .until([token, retryPolicy, cmd]( - StatusWith<detail::AsyncRPCInternalResponse> swResponse) { - bool shouldStopRetry = token.isCanceled() || - !retryPolicy->recordAndEvaluateRetry(swResponse.getStatus()); - return shouldStopRetry; - }) - .withBackoffBetweenIterations(RetryDelayAsBackoff(retryPolicy.get())) - .on(exec, CancellationToken::uncancelable()); - + auto resFuture = + AsyncTry<decltype(tryBody)>(std::move(tryBody)) + .until([options](StatusWith<detail::AsyncRPCInternalResponse> swResponse) { + bool shouldStopRetry = options->token.isCanceled() || + !options->retryPolicy->recordAndEvaluateRetry(swResponse.getStatus()); + return shouldStopRetry; + }) + .withBackoffBetweenIterations(RetryDelayAsBackoff(options->retryPolicy.get())) + .on(options->exec, CancellationToken::uncancelable()); return std::move(resFuture) .then([](detail::AsyncRPCInternalResponse r) -> ReplyType { auto res = CommandType::Reply::parseSharingOwnership(IDLParserContext("AsyncRPCRunner"), r.response); - return {res, r.targetUsed}; }) .unsafeToInlineFuture() @@ -192,7 +207,7 @@ ExecutorFuture<AsyncRPCResponse<typename CommandType::Reply>> sendCommandWithRun return Status{AsyncRPCErrorInfo(s), "Remote command execution failed due to executor shutdown"}; }) - .thenRunOn(exec); + .thenRunOn(options->exec); } } // namespace detail @@ -223,15 +238,12 @@ ExecutorFuture<AsyncRPCResponse<typename CommandType::Reply>> sendCommandWithRun */ template <typename CommandType> ExecutorFuture<AsyncRPCResponse<typename CommandType::Reply>> sendCommand( - CommandType cmd, + std::shared_ptr<AsyncRPCOptions<CommandType>> options, OperationContext* opCtx, - std::unique_ptr<Targeter> targeter, - std::shared_ptr<executor::TaskExecutor> exec, - CancellationToken token, - std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>()) { + std::unique_ptr<Targeter> targeter) { auto runner = detail::AsyncRPCRunner::get(opCtx->getServiceContext()); - return detail::sendCommandWithRunner( - cmd, runner, opCtx, std::move(targeter), exec, token, retryPolicy); + auto cmdBSON = options->cmd.toBSON({}); + return detail::sendCommandWithRunner(cmdBSON, options, runner, opCtx, std::move(targeter)); } /** @@ -241,18 +253,58 @@ ExecutorFuture<AsyncRPCResponse<typename CommandType::Reply>> sendCommand( */ template <typename CommandType> ExecutorFuture<AsyncRPCResponse<typename CommandType::Reply>> sendCommand( - CommandType cmd, + std::shared_ptr<AsyncRPCOptions<CommandType>> options, ServiceContext* const svcCtx, - std::unique_ptr<Targeter> targeter, - std::shared_ptr<executor::TaskExecutor> exec, - CancellationToken token, - std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>()) { + std::unique_ptr<Targeter> targeter) { // Execute the command after extracting the db name and bson from the CommandType. // Wrapping this function allows us to separate the CommandType parsing logic from the // implementation details of executing the remote command asynchronously. auto runner = detail::AsyncRPCRunner::get(svcCtx); - return detail::sendCommandWithRunner( - cmd, runner, nullptr, std::move(targeter), exec, token, retryPolicy); + auto cmdBSON = options->cmd.toBSON({}); + return detail::sendCommandWithRunner(cmdBSON, options, runner, nullptr, std::move(targeter)); +} + +/** + * This function operates the same to `sendCommand` above, but will attach transaction metadata + * from the opCtx to the command BSONObject metadata before sending to the targeted shardId. + */ +template <typename CommandType> +ExecutorFuture<AsyncRPCResponse<typename CommandType::Reply>> sendTxnCommand( + std::shared_ptr<AsyncRPCOptions<CommandType>> options, + OperationContext* opCtx, + std::unique_ptr<ShardIdTargeter> targeter) { + using ReplyType = AsyncRPCResponse<typename CommandType::Reply>; + // Execute the command after extracting the db name and bson from the CommandType. + // Wrapping this function allows us to separate the CommandType parsing logic from the + // implementation details of executing the remote command asynchronously. + auto runner = detail::AsyncRPCRunner::get(opCtx->getServiceContext()); + auto cmdBSON = options->cmd.toBSON({}); + const auto shardId = targeter->getShardId(); + if (auto txnRouter = TransactionRouter::get(opCtx); txnRouter) { + cmdBSON = txnRouter.attachTxnFieldsIfNeeded(opCtx, targeter->getShardId(), cmdBSON); + } + return detail::sendCommandWithRunner(cmdBSON, options, runner, opCtx, std::move(targeter)) + .onCompletion([opCtx, shardId](StatusWith<ReplyType> swResponse) -> StatusWith<ReplyType> { + auto txnRouter = TransactionRouter::get(opCtx); + if (!txnRouter) { + return swResponse; + } + if (swResponse.isOK()) { + // TODO (SERVER-72082): Make sure TxnResponseMetadata is appended to the BSON + // that we are passing into 'processParticipantResponse'. + txnRouter.processParticipantResponse( + opCtx, shardId, swResponse.getValue().response.toBSON()); + } else { + auto extraInfo = swResponse.getStatus().template extraInfo<AsyncRPCErrorInfo>(); + if (extraInfo->isRemote()) { + auto remoteError = extraInfo->asRemote(); + txnRouter.processParticipantResponse( + opCtx, shardId, remoteError.getResponseObj()); + } + } + return swResponse; + }) + .thenRunOn(options->exec); } } // namespace mongo::async_rpc #undef MONGO_LOGV2_DEFAULT_COMPONENT diff --git a/src/mongo/executor/async_rpc_test.cpp b/src/mongo/executor/async_rpc_test.cpp index 42dc468c278..62824a15cae 100644 --- a/src/mongo/executor/async_rpc_test.cpp +++ b/src/mongo/executor/async_rpc_test.cpp @@ -50,7 +50,16 @@ #include "mongo/util/net/hostandport.h" #include <memory> +#include "mongo/logv2/log.h" +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kNetwork + namespace mongo { +const HostAndPort kTestConfigShardHost = HostAndPort("FakeConfigHost", 12345); +const std::vector<ShardId> kTestShardIds = { + ShardId("FakeShard1"), ShardId("FakeShard2"), ShardId("FakeShard3")}; +const std::vector<HostAndPort> kTestShardHosts = {HostAndPort("FakeShard1Host", 12345), + HostAndPort("FakeShard2Host", 12345), + HostAndPort("FakeShard3Host", 12345)}; namespace async_rpc { namespace { /* @@ -63,8 +72,10 @@ TEST_F(AsyncRPCTestFixture, SuccessfulHello) { initializeCommand(helloCmd); auto opCtxHolder = makeOperationContext(); - ExecutorFuture<AsyncRPCResponse<HelloCommandReply>> resultFuture = sendCommand( - helloCmd, opCtxHolder.get(), std::move(targeter), getExecutorPtr(), _cancellationToken); + auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>( + helloCmd, getExecutorPtr(), _cancellationToken); + ExecutorFuture<AsyncRPCResponse<HelloCommandReply>> resultFuture = + sendCommand(options, opCtxHolder.get(), std::move(targeter)); onCommand([&](const auto& request) { ASSERT(request.cmdObj["hello"]); @@ -97,13 +108,10 @@ TEST_F(AsyncRPCTestFixture, RetryOnSuccessfulHelloAdditionalAttempts) { testPolicy->pushRetryDelay(retryDelay); auto opCtxHolder = makeOperationContext(); + auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>( + helloCmd, getExecutorPtr(), _cancellationToken, testPolicy); ExecutorFuture<AsyncRPCResponse<HelloCommandReply>> resultFuture = - sendCommand(helloCmd, - opCtxHolder.get(), - std::move(targeter), - getExecutorPtr(), - _cancellationToken, - testPolicy); + sendCommand(options, opCtxHolder.get(), std::move(targeter)); const auto onCommandFunc = [&](const auto& request) { ASSERT(request.cmdObj["hello"]); @@ -142,13 +150,10 @@ TEST_F(AsyncRPCTestFixture, DynamicDelayBetweenRetries) { testPolicy->pushRetryDelay(retryDelays[2]); auto opCtxHolder = makeOperationContext(); + auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>( + helloCmd, getExecutorPtr(), _cancellationToken, testPolicy); ExecutorFuture<AsyncRPCResponse<HelloCommandReply>> resultFuture = - sendCommand(helloCmd, - opCtxHolder.get(), - std::move(targeter), - getExecutorPtr(), - _cancellationToken, - testPolicy); + sendCommand(options, opCtxHolder.get(), std::move(targeter)); const auto onCommandFunc = [&](const auto& request) { ASSERT(request.cmdObj["hello"]); @@ -185,13 +190,10 @@ TEST_F(AsyncRPCTestFixture, DoNotRetryOnErrorAccordingToPolicy) { testPolicy->setMaxNumRetries(zeroRetries); auto opCtxHolder = makeOperationContext(); + auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>( + helloCmd, getExecutorPtr(), _cancellationToken, testPolicy); ExecutorFuture<AsyncRPCResponse<HelloCommandReply>> resultFuture = - sendCommand(helloCmd, - opCtxHolder.get(), - std::move(targeter), - getExecutorPtr(), - _cancellationToken, - testPolicy); + sendCommand(options, opCtxHolder.get(), std::move(targeter)); onCommand([&](const auto& request) { ASSERT(request.cmdObj["hello"]); @@ -215,8 +217,9 @@ TEST_F(AsyncRPCTestFixture, LocalError) { initializeCommand(helloCmd); auto opCtxHolder = makeOperationContext(); - auto resultFuture = sendCommand( - helloCmd, opCtxHolder.get(), std::move(targeter), getExecutorPtr(), _cancellationToken); + auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>( + helloCmd, getExecutorPtr(), _cancellationToken); + auto resultFuture = sendCommand(options, opCtxHolder.get(), std::move(targeter)); onCommand([&](const auto& request) { ASSERT(request.cmdObj["hello"]); @@ -246,8 +249,9 @@ TEST_F(AsyncRPCTestFixture, RemoteError) { initializeCommand(helloCmd); auto opCtxHolder = makeOperationContext(); - auto resultFuture = sendCommand( - helloCmd, opCtxHolder.get(), std::move(targeter), getExecutorPtr(), _cancellationToken); + auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>( + helloCmd, getExecutorPtr(), _cancellationToken); + auto resultFuture = sendCommand(options, opCtxHolder.get(), std::move(targeter)); onCommand([&](const auto& request) { ASSERT(request.cmdObj["hello"]); @@ -277,11 +281,16 @@ TEST_F(AsyncRPCTestFixture, SuccessfulFind) { NamespaceString nss(testDbName); FindCommandRequest findCmd(nss); - auto resultFuture = sendCommand( - findCmd, opCtxHolder.get(), std::move(targeter), getExecutorPtr(), _cancellationToken); + auto options = std::make_shared<AsyncRPCOptions<FindCommandRequest>>( + findCmd, getExecutorPtr(), _cancellationToken); + auto resultFuture = sendCommand(options, opCtxHolder.get(), std::move(targeter)); onCommand([&](const auto& request) { ASSERT(request.cmdObj["find"]); + ASSERT(!request.cmdObj["startTransaction"]); + ASSERT(!request.cmdObj["coordinator"]); + ASSERT(!request.cmdObj["autocommit"]); + ASSERT(!request.cmdObj["txnNumber"]); // The BSON documents in this cursor response are created here. // When async_rpc::sendCommand parses the response, it participates // in ownership of the underlying data, so it will participate in @@ -309,8 +318,10 @@ TEST_F(AsyncRPCTestFixture, WriteConcernError) { BSON("ok" << 1 << "writeConcernError" << writeConcernError); auto opCtxHolder = makeOperationContext(); - ExecutorFuture<AsyncRPCResponse<HelloCommandReply>> resultFuture = sendCommand( - helloCmd, opCtxHolder.get(), std::move(targeter), getExecutorPtr(), _cancellationToken); + auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>( + helloCmd, getExecutorPtr(), _cancellationToken); + ExecutorFuture<AsyncRPCResponse<HelloCommandReply>> resultFuture = + sendCommand(options, opCtxHolder.get(), std::move(targeter)); onCommand([&](const auto& request) { ASSERT(request.cmdObj["hello"]); @@ -348,8 +359,10 @@ TEST_F(AsyncRPCTestFixture, WriteError) { << "Document failed validation"); const BSONObj resWithWriteError = BSON("ok" << 1 << "writeErrors" << BSON_ARRAY(writeError)); auto opCtxHolder = makeOperationContext(); - ExecutorFuture<AsyncRPCResponse<HelloCommandReply>> resultFuture = sendCommand( - helloCmd, opCtxHolder.get(), std::move(targeter), getExecutorPtr(), _cancellationToken); + auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>( + helloCmd, getExecutorPtr(), _cancellationToken); + ExecutorFuture<AsyncRPCResponse<HelloCommandReply>> resultFuture = + sendCommand(options, opCtxHolder.get(), std::move(targeter)); onCommand([&](const auto& request) { ASSERT(request.cmdObj["hello"]); @@ -383,8 +396,9 @@ TEST_F(AsyncRPCTestFixture, ExecutorShutdown) { HelloCommand helloCmd; initializeCommand(helloCmd); auto opCtxHolder = makeOperationContext(); - auto resultFuture = sendCommand( - helloCmd, opCtxHolder.get(), std::move(targeter), getExecutorPtr(), _cancellationToken); + auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>( + helloCmd, getExecutorPtr(), _cancellationToken); + auto resultFuture = sendCommand(options, opCtxHolder.get(), std::move(targeter)); getExecutorPtr()->shutdown(); auto error = resultFuture.getNoThrow().getStatus(); // The error returned by our API should always be RemoteCommandExecutionError @@ -439,8 +453,9 @@ TEST_F(AsyncRPCTestFixture, ParseAndSeralizeNoop) { initializeCommand(helloCmd); auto opCtxHolder = makeOperationContext(); - auto resultFuture = sendCommand( - helloCmd, opCtxHolder.get(), std::move(targeter), getExecutorPtr(), _cancellationToken); + auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>( + helloCmd, getExecutorPtr(), _cancellationToken); + auto resultFuture = sendCommand(options, opCtxHolder.get(), std::move(targeter)); onCommand([&](const auto& request) { ASSERT(request.cmdObj["hello"]); @@ -477,9 +492,9 @@ TEST_F(AsyncRPCTestFixture, FailedTargeting) { HelloCommand helloCmd; initializeCommand(helloCmd); auto opCtxHolder = makeOperationContext(); - - auto resultFuture = sendCommand( - helloCmd, opCtxHolder.get(), std::move(targeter), getExecutorPtr(), _cancellationToken); + auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>( + helloCmd, getExecutorPtr(), _cancellationToken); + auto resultFuture = sendCommand(options, opCtxHolder.get(), std::move(targeter)); auto error = resultFuture.getNoThrow().getStatus(); // The error returned by our API should always be RemoteCommandExecutionError @@ -493,6 +508,175 @@ TEST_F(AsyncRPCTestFixture, FailedTargeting) { ASSERT_EQ(extraInfo->asLocal(), targeterFailStatus); } +TEST_F(AsyncRPCTestFixture, SendTxnCommandWithoutTxnRouterAppendsNoTxnFields) { + ShardId shardId("shard"); + ReadPreferenceSetting readPref; + std::vector<HostAndPort> testHost = {kTestShardHosts[0]}; + // Use a mock ShardIdTargeter to avoid calling into the ShardRegistry to get a target shard. + auto opCtxHolder = makeOperationContext(); + auto targeter = std::make_unique<ShardIdTargeterForTest>( + shardId, opCtxHolder.get(), readPref, getExecutorPtr(), testHost); + DatabaseName testDbName = DatabaseName("testdb", boost::none); + NamespaceString nss(testDbName); + + FindCommandRequest findCmd(nss); + auto options = std::make_shared<AsyncRPCOptions<FindCommandRequest>>( + findCmd, getExecutorPtr(), _cancellationToken); + auto resultFuture = sendTxnCommand(options, opCtxHolder.get(), std::move(targeter)); + + onCommand([&](const auto& request) { + ASSERT(request.cmdObj["find"]); + ASSERT(!request.cmdObj["startTransaction"]); + ASSERT(!request.cmdObj["coordinator"]); + ASSERT(!request.cmdObj["autocommit"]); + ASSERT(!request.cmdObj["txnNumber"]); + // The BSON documents in this cursor response are created here. + // When async_rpc::sendCommand parses the response, it participates + // in ownership of the underlying data, so it will participate in + // owning the data in the cursor response. + return CursorResponse(nss, 0LL, {BSON("x" << 1)}) + .toBSON(CursorResponse::ResponseType::InitialResponse); + }); + + CursorInitialReply res = std::move(resultFuture).get().response; + ASSERT_BSONOBJ_EQ(res.getCursor()->getFirstBatch()[0], BSON("x" << 1)); +} + +TEST_F(AsyncRPCTxnTestFixture, MultipleSendTxnCommand) { + ShardId shardId("shard"); + ReadPreferenceSetting readPref; + std::vector<HostAndPort> testHost = {kTestShardHosts[0]}; + // Use a mock ShardIdTargeter to avoid calling into the ShardRegistry to get a target shard. + auto targeter = std::make_unique<ShardIdTargeterForTest>( + shardId, getOpCtx(), readPref, getExecutorPtr(), testHost); + DatabaseName testDbName = DatabaseName("testdb", boost::none); + NamespaceString nss(testDbName); + + // Set up the transaction metadata. + TxnNumber txnNum{3}; + getOpCtx()->setTxnNumber(txnNum); + auto txnRouter = TransactionRouter::get(getOpCtx()); + txnRouter.beginOrContinueTxn(getOpCtx(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.setDefaultAtClusterTime(getOpCtx()); + + FindCommandRequest findCmd(nss); + auto options = std::make_shared<AsyncRPCOptions<FindCommandRequest>>( + findCmd, getExecutorPtr(), _cancellationToken); + auto resultFuture = sendTxnCommand(options, getOpCtx(), std::move(targeter)); + + onCommand([&](const auto& request) { + ASSERT(request.cmdObj["find"]); + ASSERT(request.cmdObj["startTransaction"].Bool()); + ASSERT(request.cmdObj["coordinator"].Bool()); + ASSERT(!request.cmdObj["autocommit"].Bool()); + ASSERT_EQUALS(request.cmdObj["txnNumber"].numberLong(), 3LL); + // The BSON documents in this cursor response are created here. + // When async_rpc::sendCommand parses the response, it participates + // in ownership of the underlying data, so it will participate in + // owning the data in the cursor response. + return CursorResponse(nss, 0LL, {BSON("x" << 1)}) + .toBSON(CursorResponse::ResponseType::InitialResponse); + }); + + CursorInitialReply res = std::move(resultFuture).get().response; + ASSERT_BSONOBJ_EQ(res.getCursor()->getFirstBatch()[0], BSON("x" << 1)); + + // // Issue a follow-up find command in the same transaction. + FindCommandRequest secondFindCmd(nss); + auto secondCmdOptions = std::make_shared<AsyncRPCOptions<FindCommandRequest>>( + secondFindCmd, getExecutorPtr(), _cancellationToken); + auto secondTargeter = std::make_unique<ShardIdTargeterForTest>( + shardId, getOpCtx(), readPref, getExecutorPtr(), testHost); + auto secondResultFuture = + sendTxnCommand(secondCmdOptions, getOpCtx(), std::move(secondTargeter)); + + onCommand([&](const auto& request) { + ASSERT(request.cmdObj["find"]); + ASSERT(!request.cmdObj["startTransaction"]); + ASSERT(request.cmdObj["coordinator"].Bool()); + ASSERT(!request.cmdObj["autocommit"].Bool()); + ASSERT_EQUALS(request.cmdObj["txnNumber"].numberLong(), 3LL); + return CursorResponse(nss, 0LL, {BSON("x" << 2)}) + .toBSON(CursorResponse::ResponseType::InitialResponse); + }); + + CursorInitialReply secondRes = std::move(secondResultFuture).get().response; + ASSERT_BSONOBJ_EQ(secondRes.getCursor()->getFirstBatch()[0], BSON("x" << 2)); +} + +TEST_F(AsyncRPCTxnTestFixture, SendTxnCommandReturnsRemoteError) { + ShardId shardId("shard"); + ReadPreferenceSetting readPref; + std::vector<HostAndPort> testHost = {kTestShardHosts[0]}; + // Use a mock ShardIdTargeter to avoid calling into the ShardRegistry to get a target shard. + auto targeter = std::make_unique<ShardIdTargeterForTest>( + shardId, getOpCtx(), readPref, getExecutorPtr(), testHost); + DatabaseName testDbName = DatabaseName("testdb", boost::none); + NamespaceString nss(testDbName); + + // Set up the transaction metadata. + TxnNumber txnNum{3}; + getOpCtx()->setTxnNumber(txnNum); + auto txnRouter = TransactionRouter::get(getOpCtx()); + txnRouter.beginOrContinueTxn(getOpCtx(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.setDefaultAtClusterTime(getOpCtx()); + + FindCommandRequest findCmd(nss); + auto options = std::make_shared<AsyncRPCOptions<FindCommandRequest>>( + findCmd, getExecutorPtr(), _cancellationToken); + auto resultFuture = sendTxnCommand(options, getOpCtx(), std::move(targeter)); + onCommand([&](const auto& request) { + ASSERT(request.cmdObj["find"]); + ASSERT(request.cmdObj["startTransaction"].Bool()); + ASSERT(request.cmdObj["coordinator"].Bool()); + ASSERT(!request.cmdObj["autocommit"].Bool()); + ASSERT_EQUALS(request.cmdObj["txnNumber"].numberLong(), 3LL); + return createErrorResponse(Status(ErrorCodes::BadValue, "mock")); + }); + + auto error = resultFuture.getNoThrow().getStatus(); + ASSERT_EQ(error.code(), ErrorCodes::RemoteCommandExecutionError); + auto extraInfo = error.extraInfo<AsyncRPCErrorInfo>(); + ASSERT(extraInfo); + ASSERT(extraInfo->isRemote()); +} + +TEST_F(AsyncRPCTxnTestFixture, SendTxnCommandReturnsLocalError) { + ShardId shardId("shard"); + ReadPreferenceSetting readPref; + std::vector<HostAndPort> testHost = {kTestShardHosts[0]}; + // Use a mock ShardIdTargeter to avoid calling into the ShardRegistry to get a target shard. + auto targeter = std::make_unique<ShardIdTargeterForTest>( + shardId, getOpCtx(), readPref, getExecutorPtr(), testHost); + DatabaseName testDbName = DatabaseName("testdb", boost::none); + NamespaceString nss(testDbName); + + // Set up the transaction metadata. + TxnNumber txnNum{3}; + getOpCtx()->setTxnNumber(txnNum); + auto txnRouter = TransactionRouter::get(getOpCtx()); + txnRouter.beginOrContinueTxn(getOpCtx(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.setDefaultAtClusterTime(getOpCtx()); + + FindCommandRequest findCmd(nss); + auto options = std::make_shared<AsyncRPCOptions<FindCommandRequest>>( + findCmd, getExecutorPtr(), _cancellationToken); + auto resultFuture = sendTxnCommand(options, getOpCtx(), std::move(targeter)); + onCommand([&](const auto& request) { + ASSERT(request.cmdObj["find"]); + ASSERT(request.cmdObj["startTransaction"].Bool()); + ASSERT(request.cmdObj["coordinator"].Bool()); + ASSERT(!request.cmdObj["autocommit"].Bool()); + ASSERT_EQUALS(request.cmdObj["txnNumber"].numberLong(), 3LL); + return Status(ErrorCodes::NetworkTimeout, "mock"); + }); + + auto error = resultFuture.getNoThrow().getStatus(); + ASSERT_EQ(error.code(), ErrorCodes::RemoteCommandExecutionError); + auto extraInfo = error.extraInfo<AsyncRPCErrorInfo>(); + ASSERT(extraInfo); + ASSERT(extraInfo->isLocal()); +} TEST_F(AsyncRPCTestFixture, AttemptedTargetCorrectlyPropogatedWithLocalError) { HelloCommand helloCmd; @@ -501,8 +685,9 @@ TEST_F(AsyncRPCTestFixture, AttemptedTargetCorrectlyPropogatedWithLocalError) { auto targeter = std::make_unique<FixedTargeter>(target); auto opCtxHolder = makeOperationContext(); - auto resultFuture = sendCommand( - helloCmd, opCtxHolder.get(), std::move(targeter), getExecutorPtr(), _cancellationToken); + auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>( + helloCmd, getExecutorPtr(), _cancellationToken); + auto resultFuture = sendCommand(options, opCtxHolder.get(), std::move(targeter)); onCommand([&](const auto& request) { ASSERT(request.cmdObj["hello"]); @@ -528,8 +713,9 @@ TEST_F(AsyncRPCTestFixture, NoAttemptedTargetIfTargetingFails) { auto opCtxHolder = makeOperationContext(); - auto resultFuture = sendCommand( - helloCmd, opCtxHolder.get(), std::move(targeter), getExecutorPtr(), _cancellationToken); + auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>( + helloCmd, getExecutorPtr(), _cancellationToken); + auto resultFuture = sendCommand(options, opCtxHolder.get(), std::move(targeter)); auto error = resultFuture.getNoThrow().getStatus(); ASSERT_EQ(error.code(), ErrorCodes::RemoteCommandExecutionError); @@ -549,8 +735,9 @@ TEST_F(AsyncRPCTestFixture, RemoteErrorAttemptedTargetMatchesActual) { auto opCtxHolder = makeOperationContext(); - auto resultFuture = sendCommand( - helloCmd, opCtxHolder.get(), std::move(targeter), getExecutorPtr(), _cancellationToken); + auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>( + helloCmd, getExecutorPtr(), _cancellationToken); + auto resultFuture = sendCommand(options, opCtxHolder.get(), std::move(targeter)); onCommand([&](const auto& request) { ASSERT(request.cmdObj["hello"]); diff --git a/src/mongo/executor/async_rpc_test_fixture.h b/src/mongo/executor/async_rpc_test_fixture.h index 58e79f11a92..f1398bcf466 100644 --- a/src/mongo/executor/async_rpc_test_fixture.h +++ b/src/mongo/executor/async_rpc_test_fixture.h @@ -41,6 +41,7 @@ #include "mongo/executor/thread_pool_task_executor.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/rpc/topology_version_gen.h" +#include "mongo/s/session_catalog_router.h" #include "mongo/unittest/bson_test_util.h" #include "mongo/unittest/unittest.h" #include "mongo/util/duration.h" @@ -91,7 +92,7 @@ private: std::deque<Milliseconds> _retryDelays; }; -class AsyncRPCTestFixture : public LockerNoopServiceContextTest { +class AsyncRPCTestFixture : public ServiceContextTest { public: void setUp() override { ServiceContextTest::setUp(); @@ -188,4 +189,50 @@ public: Status _status; }; + +class ShardIdTargeterForTest : public ShardIdTargeter { +public: + ShardIdTargeterForTest(ShardId shardId, + OperationContext* opCtx, + ReadPreferenceSetting readPref, + ExecutorPtr executor, + std::vector<HostAndPort> resolvedHosts) + : ShardIdTargeter(shardId, opCtx, readPref, executor) { + _resolvedHosts = resolvedHosts; + }; + + SemiFuture<std::vector<HostAndPort>> resolve(CancellationToken t) override final { + return SemiFuture<std::vector<HostAndPort>>::makeReady(_resolvedHosts); + } + +private: + std::vector<HostAndPort> _resolvedHosts; +}; + +class AsyncRPCTxnTestFixture : public AsyncRPCTestFixture { +public: + void setUp() override { + AsyncRPCTestFixture::setUp(); + _opCtxHolder = makeOperationContext(); + getOpCtx()->setLogicalSessionId(makeLogicalSessionIdForTest()); + _routerOpCtxSession.emplace(getOpCtx()); + } + + OperationContext* getOpCtx() { + return _opCtxHolder.get(); + } + + void tearDown() override { + AsyncRPCTestFixture::tearDown(); + } + + const LogicalSessionId& getSessionId() { + return *getOpCtx()->getLogicalSessionId(); + } + +private: + ServiceContext::UniqueOperationContext _opCtxHolder; + boost::optional<mongo::RouterOperationContextSession> _routerOpCtxSession; +}; + } // namespace mongo::async_rpc diff --git a/src/mongo/executor/hedged_async_rpc.h b/src/mongo/executor/hedged_async_rpc.h index 655c8cd874c..511b53a56e7 100644 --- a/src/mongo/executor/hedged_async_rpc.h +++ b/src/mongo/executor/hedged_async_rpc.h @@ -144,9 +144,10 @@ SemiFuture<AsyncRPCResponse<typename CommandType::Reply>> sendHedgedCommand( for (size_t i = 0; i < hostsToTarget; i++) { std::unique_ptr<Targeter> t = std::make_unique<FixedTargeter>(targets[i]); + auto options = std::make_shared<AsyncRPCOptions<CommandType>>( + cmd, exec, hedgeCancellationToken.token()); requests.emplace_back( - sendCommand(cmd, opCtx, std::move(t), exec, hedgeCancellationToken.token()) - .thenRunOn(exec)); + sendCommand(options, opCtx, std::move(t)).thenRunOn(exec)); } /** diff --git a/src/mongo/executor/mock_async_rpc_test.cpp b/src/mongo/executor/mock_async_rpc_test.cpp index ff422301e01..1ef18fbd357 100644 --- a/src/mongo/executor/mock_async_rpc_test.cpp +++ b/src/mongo/executor/mock_async_rpc_test.cpp @@ -72,12 +72,9 @@ public: std::shared_ptr<RetryPolicy> retryPolicy = std::make_shared<NeverRetryPolicy>()) { HelloCommand hello; initializeCommand(hello); - return sendCommand(hello, - _opCtx.get(), - std::make_unique<FixedTargeter>(target), - getExecutorPtr(), - _cancellationToken, - retryPolicy); + auto options = std::make_shared<AsyncRPCOptions<HelloCommand>>( + hello, getExecutorPtr(), _cancellationToken, retryPolicy); + return sendCommand(options, _opCtx.get(), std::make_unique<FixedTargeter>(target)); } ExecutorFuture<AsyncRPCResponse<HelloCommandReply>> sendHelloCommandToLocalHost() { diff --git a/src/mongo/executor/task_executor_cursor_test.cpp b/src/mongo/executor/task_executor_cursor_test.cpp index 966603a517f..982aa527fe4 100644 --- a/src/mongo/executor/task_executor_cursor_test.cpp +++ b/src/mongo/executor/task_executor_cursor_test.cpp @@ -49,10 +49,6 @@ namespace { */ class TaskExecutorCursorFixture : public ThreadPoolExecutorTest { public: - TaskExecutorCursorFixture() { - serviceCtx->registerClientObserver(std::make_unique<LockerNoopClientObserver>()); - } - void setUp() override { ThreadPoolExecutorTest::setUp(); diff --git a/src/mongo/executor/task_executor_test_common.cpp b/src/mongo/executor/task_executor_test_common.cpp index 98a68a837fe..ccd20ecabbc 100644 --- a/src/mongo/executor/task_executor_test_common.cpp +++ b/src/mongo/executor/task_executor_test_common.cpp @@ -364,7 +364,6 @@ COMMON_EXECUTOR_TEST(EventWaitingWithTimeoutTest) { auto serviceContext = ServiceContext::make(); - serviceContext->registerClientObserver(std::make_unique<LockerNoopClientObserver>()); serviceContext->setFastClockSource(std::make_unique<ClockSourceMock>()); auto mockClock = static_cast<ClockSourceMock*>(serviceContext->getFastClockSource()); @@ -387,7 +386,6 @@ COMMON_EXECUTOR_TEST(EventSignalWithTimeoutTest) { auto serviceContext = ServiceContext::make(); - serviceContext->registerClientObserver(std::make_unique<LockerNoopClientObserver>()); serviceContext->setFastClockSource(std::make_unique<ClockSourceMock>()); auto mockClock = static_cast<ClockSourceMock*>(serviceContext->getFastClockSource()); diff --git a/src/mongo/s/async_rpc_shard_targeter.h b/src/mongo/s/async_rpc_shard_targeter.h index 1dfe6cac767..5fa5a133f2d 100644 --- a/src/mongo/s/async_rpc_shard_targeter.h +++ b/src/mongo/s/async_rpc_shard_targeter.h @@ -57,7 +57,7 @@ public: ExecutorPtr executor) : _shardId(shardId), _opCtx(opCtx), _readPref(readPref), _executor(executor){}; - SemiFuture<std::vector<HostAndPort>> resolve(CancellationToken t) override final { + SemiFuture<std::vector<HostAndPort>> resolve(CancellationToken t) override { return getShard() .thenRunOn(_executor) .then([this, t](std::shared_ptr<Shard> shard) { @@ -82,6 +82,10 @@ public: return Grid::get(_opCtx)->shardRegistry()->getShard(_executor, _shardId); } + ShardId getShardId() { + return _shardId; + } + private: ShardId _shardId; OperationContext* _opCtx; |