diff options
author | George Wangensteen <george.wangensteen@mongodb.com> | 2023-03-13 16:00:53 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-03-13 18:43:50 +0000 |
commit | 38d850fc76000fe64264a1c034df02f4c6a0affe (patch) | |
tree | e24ffbfffe7b74d0c921f3169dde269a75beec09 | |
parent | fbf9372bbf3eb705b38108964a39ab78c3439c4e (diff) | |
download | mongo-38d850fc76000fe64264a1c034df02f4c6a0affe.tar.gz |
SERVER-72082 Process transaction metadata via the TxnRouter when using AsyncRPC::sendTxnCommand
-rw-r--r-- | src/mongo/db/commands/txn_cmds.idl | 4 | ||||
-rw-r--r-- | src/mongo/executor/async_rpc_test.cpp | 135 | ||||
-rw-r--r-- | src/mongo/executor/async_transaction_rpc.h | 17 | ||||
-rw-r--r-- | src/mongo/idl/generic_args_with_types.idl | 4 | ||||
-rw-r--r-- | src/mongo/idl/generic_argument.idl | 3 | ||||
-rw-r--r-- | src/mongo/s/transaction_router.cpp | 14 |
6 files changed, 163 insertions, 14 deletions
diff --git a/src/mongo/db/commands/txn_cmds.idl b/src/mongo/db/commands/txn_cmds.idl index 3363e88e690..a771d59ee9c 100644 --- a/src/mongo/db/commands/txn_cmds.idl +++ b/src/mongo/db/commands/txn_cmds.idl @@ -42,6 +42,10 @@ structs: description: "True if the shard has the transaction in progress but has not done a write for it" type: bool + additionalParticipants: + description: "Additional participants in the transaction" + optional: true + type: array<object> TxnRecoveryToken: description: "Contains info for retrying the commit of a sharded transaction" diff --git a/src/mongo/executor/async_rpc_test.cpp b/src/mongo/executor/async_rpc_test.cpp index b38b749112e..6f20e410871 100644 --- a/src/mongo/executor/async_rpc_test.cpp +++ b/src/mongo/executor/async_rpc_test.cpp @@ -824,7 +824,8 @@ TEST_F(AsyncRPCTxnTestFixture, MultipleSendTxnCommand) { // in ownership of the underlying data, so it will participate in // owning the data in the cursor response. return CursorResponse(nss, 0LL, {BSON("x" << 1)}) - .toBSON(CursorResponse::ResponseType::InitialResponse); + .toBSON(CursorResponse::ResponseType::InitialResponse) + .addFields(BSON("readOnly" << true)); }); CursorInitialReply res = std::move(resultFuture).get().response; @@ -846,13 +847,140 @@ TEST_F(AsyncRPCTxnTestFixture, MultipleSendTxnCommand) { ASSERT(!request.cmdObj["autocommit"].Bool()); ASSERT_EQUALS(request.cmdObj["txnNumber"].numberLong(), 3LL); return CursorResponse(nss, 0LL, {BSON("x" << 2)}) - .toBSON(CursorResponse::ResponseType::InitialResponse); + .toBSON(CursorResponse::ResponseType::InitialResponse) + .addFields(BSON("readOnly" << false)); + }); + + CursorInitialReply secondRes = std::move(secondResultFuture).get().response; + ASSERT_BSONOBJ_EQ(secondRes.getCursor()->getFirstBatch()[0], BSON("x" << 2)); +} + +// We test side-effects of calling `processParticipantResponse` with different values for `readOnly` +// in the response to ensure it is being invoked correctly by the sendTxnCommand wrapper. +TEST_F(AsyncRPCTxnTestFixture, EnsureProcessParticipantCalledCorrectlyOnSuccess) { + ShardId shardId("shard"); + ReadPreferenceSetting readPref; + std::vector<HostAndPort> testHost = {kTestShardHosts[0]}; + // Use a mock ShardIdTargeter to avoid calling into the ShardRegistry to get a target shard. + auto targeter = std::make_unique<ShardIdTargeterForTest>( + shardId, getOpCtx(), readPref, getExecutorPtr(), testHost); + DatabaseName testDbName = DatabaseName("testdb", boost::none); + NamespaceString nss = NamespaceString::createNamespaceString_forTest(testDbName); + + // Set up the transaction metadata. + TxnNumber txnNum{3}; + getOpCtx()->setTxnNumber(txnNum); + auto txnRouter = TransactionRouter::get(getOpCtx()); + txnRouter.beginOrContinueTxn(getOpCtx(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.setDefaultAtClusterTime(getOpCtx()); + + FindCommandRequest findCmd(nss); + auto options = std::make_shared<AsyncRPCOptions<FindCommandRequest>>( + findCmd, getExecutorPtr(), _cancellationToken); + + // There should be no recovery shard to start with. + ASSERT(!txnRouter.getRecoveryShardId()); + auto resultFuture = sendTxnCommand(options, getOpCtx(), std::move(targeter)); + + // Set "readOnly: true" in the reply. + onCommand([&](const auto& request) { + return CursorResponse(nss, 0LL, {BSON("x" << 1)}) + .toBSON(CursorResponse::ResponseType::InitialResponse) + .addFields(BSON("readOnly" << true)); + }); + + CursorInitialReply res = std::move(resultFuture).get().response; + ASSERT_BSONOBJ_EQ(res.getCursor()->getFirstBatch()[0], BSON("x" << 1)); + // First statement was read-only. If processed correctly by the router, we shouldn't have set a + // recovery shard. + ASSERT_FALSE(txnRouter.getRecoveryShardId()); + + // // Issue a follow-up find command in the same transaction. + FindCommandRequest secondFindCmd(nss); + auto secondCmdOptions = std::make_shared<AsyncRPCOptions<FindCommandRequest>>( + secondFindCmd, getExecutorPtr(), _cancellationToken); + auto secondTargeter = std::make_unique<ShardIdTargeterForTest>( + shardId, getOpCtx(), readPref, getExecutorPtr(), testHost); + auto secondResultFuture = + sendTxnCommand(secondCmdOptions, getOpCtx(), std::move(secondTargeter)); + + // Set "readOnly: false" in this response. If processed correctly by the router, we _will_ set a + // recovery shard. + onCommand([&](const auto& request) { + return CursorResponse(nss, 0LL, {BSON("x" << 2)}) + .toBSON(CursorResponse::ResponseType::InitialResponse) + .addFields(BSON("readOnly" << false)); }); CursorInitialReply secondRes = std::move(secondResultFuture).get().response; ASSERT_BSONOBJ_EQ(secondRes.getCursor()->getFirstBatch()[0], BSON("x" << 2)); + + // We should have set a recovery shard, if `TxnRouter::processParticipantResponse` was invoked + // correctly. + ASSERT(txnRouter.getRecoveryShardId()); + ASSERT_EQ(*txnRouter.getRecoveryShardId(), shardId); } +TEST_F(AsyncRPCTxnTestFixture, EnsureProcessParticipantCalledCorrectlyOnRemoteError) { + ShardId shardId("shard"); + ReadPreferenceSetting readPref; + std::vector<HostAndPort> testHost = {kTestShardHosts[0]}; + // Use a mock ShardIdTargeter to avoid calling into the ShardRegistry to get a target shard. + auto targeter = std::make_unique<ShardIdTargeterForTest>( + shardId, getOpCtx(), readPref, getExecutorPtr(), testHost); + DatabaseName testDbName = DatabaseName("testdb", boost::none); + NamespaceString nss = NamespaceString::createNamespaceString_forTest(testDbName); + // Set up the transaction metadata. + TxnNumber txnNum{3}; + getOpCtx()->setTxnNumber(txnNum); + auto txnRouter = TransactionRouter::get(getOpCtx()); + txnRouter.beginOrContinueTxn(getOpCtx(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.setDefaultAtClusterTime(getOpCtx()); + + FindCommandRequest findCmd(nss); + auto options = std::make_shared<AsyncRPCOptions<FindCommandRequest>>( + findCmd, getExecutorPtr(), _cancellationToken); + + // There should be no recovery shard to start with. + ASSERT(!txnRouter.getRecoveryShardId()); + auto resultFuture = sendTxnCommand(options, getOpCtx(), std::move(targeter)); + + // Set "readOnly: false" in the reply. + onCommand([&](const auto& request) { + return createErrorResponse({ErrorCodes::BadValue, "test"}) + .addFields(BSON("readOnly" << false)); + }); + // Because the router ignores error-responses that aren't "ErrorCodes::WouldChangeOwningShard", + // expect no change to the TransactionRouter state. + std::move(resultFuture).getNoThrow().getStatus().ignore(); + ASSERT_FALSE(txnRouter.getRecoveryShardId()); + + // // Issue a follow-up find command in the same transaction. + FindCommandRequest secondFindCmd(nss); + auto secondCmdOptions = std::make_shared<AsyncRPCOptions<FindCommandRequest>>( + secondFindCmd, getExecutorPtr(), _cancellationToken); + auto secondTargeter = std::make_unique<ShardIdTargeterForTest>( + shardId, getOpCtx(), readPref, getExecutorPtr(), testHost); + auto secondResultFuture = + sendTxnCommand(secondCmdOptions, getOpCtx(), std::move(secondTargeter)); + + // Use WouldChangeOwningShard error this time. + onCommand([&](const auto& request) -> BSONObj { + auto code = ErrorCodes::WouldChangeOwningShard; + auto err = BSON("ok" << false << "code" << code << "codeName" + << ErrorCodes::errorString(code) << "errmsg" + << "test" + << "preImage" << BSON("x" << 1) << "postImage" << BSON("x" << 2) + << "shouldUpsert" << true); + return err.addFields(BSON("readOnly" << false)); + }); + + std::move(secondResultFuture).getNoThrow().getStatus().ignore(); + // We should have set a recovery shard, if `TxnRouter::processParticipantResponse` was invoked + // correctly. + ASSERT(txnRouter.getRecoveryShardId()); + ASSERT_EQ(*txnRouter.getRecoveryShardId(), shardId); +} TEST_F(AsyncRPCTxnTestFixture, SendTxnCommandWithGenericArgs) { ShardId shardId("shard"); @@ -912,7 +1040,8 @@ TEST_F(AsyncRPCTxnTestFixture, SendTxnCommandWithGenericArgs) { // in ownership of the underlying data, so it will participate in // owning the data in the cursor response. return CursorResponse(nss, 0LL, {BSON("x" << 1)}) - .toBSON(CursorResponse::ResponseType::InitialResponse); + .toBSON(CursorResponse::ResponseType::InitialResponse) + .addFields(BSON("readOnly" << false)); }); CursorInitialReply res = std::move(resultFuture).get().response; diff --git a/src/mongo/executor/async_transaction_rpc.h b/src/mongo/executor/async_transaction_rpc.h index 56f438e3f3b..5ec4cc85370 100644 --- a/src/mongo/executor/async_transaction_rpc.h +++ b/src/mongo/executor/async_transaction_rpc.h @@ -69,10 +69,19 @@ ExecutorFuture<AsyncRPCResponse<typename CommandType::Reply>> sendTxnCommand( return swResponse; } if (swResponse.isOK()) { - // TODO (SERVER-72082): Make sure TxnResponseMetadata is appended to the BSON - // that we are passing into 'processParticipantResponse'. - txnRouter.processParticipantResponse( - opCtx, shardId, swResponse.getValue().response.toBSON()); + ReplyType reply = swResponse.getValue(); + GenericReplyFields gens = reply.genericReplyFields; + // The TransactionRouter expects a raw-BSON command-response + // in its API for processing transaction metadata. The async_rpc API + // doesn't expose the raw-BSON of the response in the case of command-success, + // so we construct a fake one for now to appease the TxnRouter API. + auto fakeResponseObj = [&] { + BSONObjBuilder bob; + gens.stable.serialize(&bob); + gens.unstable.serialize(&bob); + return bob.obj(); + }(); + txnRouter.processParticipantResponse(opCtx, shardId, fakeResponseObj); } else { auto extraInfo = swResponse.getStatus().template extraInfo<AsyncRPCErrorInfo>(); if (extraInfo->isRemote()) { diff --git a/src/mongo/idl/generic_args_with_types.idl b/src/mongo/idl/generic_args_with_types.idl index 90b270650c3..57e659434a2 100644 --- a/src/mongo/idl/generic_args_with_types.idl +++ b/src/mongo/idl/generic_args_with_types.idl @@ -258,6 +258,10 @@ structs: type: bool forward_from_shards: false optional: true + additionalParticipants: + type: array<object> + forward_from_shards: false + optional: true $configTime: type: timestamp cpp_name: "dollarConfigTime" diff --git a/src/mongo/idl/generic_argument.idl b/src/mongo/idl/generic_argument.idl index aae807ef346..ba12e0b972a 100644 --- a/src/mongo/idl/generic_argument.idl +++ b/src/mongo/idl/generic_argument.idl @@ -195,6 +195,9 @@ structs: readOnly: type: IDLAnyType forward_from_shards: false + additionalParticipants: + type: IDLAnyType + forward_from_shards: false $configTime: type: IDLAnyType cpp_name: "dollarConfigTime" diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp index 12099dba7ea..b5c3b5a75ab 100644 --- a/src/mongo/s/transaction_router.cpp +++ b/src/mongo/s/transaction_router.cpp @@ -565,14 +565,14 @@ void TransactionRouter::Router::processParticipantResponse(OperationContext* opC } } - const std::string extraParticipants = "additionalParticipants"; - if (responseObj.hasField(extraParticipants)) { - BSONForEach(e, responseObj.getField(extraParticipants).Array()) { - mongo::ShardId addingparticipant = ShardId( - std::string(e.Obj().getField(StringData{"shardId"}).checkAndGetStringData())); - auto txnPart = _createParticipant(opCtx, addingparticipant); + if (txnResponseMetadata.getAdditionalParticipants()) { + auto additionalParticipants = *txnResponseMetadata.getAdditionalParticipants(); + for (auto&& participantElem : additionalParticipants) { + mongo::ShardId addingParticipant = ShardId(std::string( + participantElem.getField(StringData{"shardId"}).checkAndGetStringData())); + _createParticipant(opCtx, addingParticipant); _setReadOnlyForParticipant( - opCtx, addingparticipant, Participant::ReadOnly::kNotReadOnly); + opCtx, addingParticipant, Participant::ReadOnly::kNotReadOnly); if (!p().isRecoveringCommit) { // Don't update participant stats during recovery since the participant list isn't |