summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGeorge Wangensteen <george.wangensteen@mongodb.com>2023-03-13 16:00:53 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-03-13 18:43:50 +0000
commit38d850fc76000fe64264a1c034df02f4c6a0affe (patch)
treee24ffbfffe7b74d0c921f3169dde269a75beec09
parentfbf9372bbf3eb705b38108964a39ab78c3439c4e (diff)
downloadmongo-38d850fc76000fe64264a1c034df02f4c6a0affe.tar.gz
SERVER-72082 Process transaction metadata via the TxnRouter when using AsyncRPC::sendTxnCommand
-rw-r--r--src/mongo/db/commands/txn_cmds.idl4
-rw-r--r--src/mongo/executor/async_rpc_test.cpp135
-rw-r--r--src/mongo/executor/async_transaction_rpc.h17
-rw-r--r--src/mongo/idl/generic_args_with_types.idl4
-rw-r--r--src/mongo/idl/generic_argument.idl3
-rw-r--r--src/mongo/s/transaction_router.cpp14
6 files changed, 163 insertions, 14 deletions
diff --git a/src/mongo/db/commands/txn_cmds.idl b/src/mongo/db/commands/txn_cmds.idl
index 3363e88e690..a771d59ee9c 100644
--- a/src/mongo/db/commands/txn_cmds.idl
+++ b/src/mongo/db/commands/txn_cmds.idl
@@ -42,6 +42,10 @@ structs:
description: "True if the shard has the transaction in progress but has not done a
write for it"
type: bool
+ additionalParticipants:
+ description: "Additional participants in the transaction"
+ optional: true
+ type: array<object>
TxnRecoveryToken:
description: "Contains info for retrying the commit of a sharded transaction"
diff --git a/src/mongo/executor/async_rpc_test.cpp b/src/mongo/executor/async_rpc_test.cpp
index b38b749112e..6f20e410871 100644
--- a/src/mongo/executor/async_rpc_test.cpp
+++ b/src/mongo/executor/async_rpc_test.cpp
@@ -824,7 +824,8 @@ TEST_F(AsyncRPCTxnTestFixture, MultipleSendTxnCommand) {
// in ownership of the underlying data, so it will participate in
// owning the data in the cursor response.
return CursorResponse(nss, 0LL, {BSON("x" << 1)})
- .toBSON(CursorResponse::ResponseType::InitialResponse);
+ .toBSON(CursorResponse::ResponseType::InitialResponse)
+ .addFields(BSON("readOnly" << true));
});
CursorInitialReply res = std::move(resultFuture).get().response;
@@ -846,13 +847,140 @@ TEST_F(AsyncRPCTxnTestFixture, MultipleSendTxnCommand) {
ASSERT(!request.cmdObj["autocommit"].Bool());
ASSERT_EQUALS(request.cmdObj["txnNumber"].numberLong(), 3LL);
return CursorResponse(nss, 0LL, {BSON("x" << 2)})
- .toBSON(CursorResponse::ResponseType::InitialResponse);
+ .toBSON(CursorResponse::ResponseType::InitialResponse)
+ .addFields(BSON("readOnly" << false));
+ });
+
+ CursorInitialReply secondRes = std::move(secondResultFuture).get().response;
+ ASSERT_BSONOBJ_EQ(secondRes.getCursor()->getFirstBatch()[0], BSON("x" << 2));
+}
+
+// We test side-effects of calling `processParticipantResponse` with different values for `readOnly`
+// in the response to ensure it is being invoked correctly by the sendTxnCommand wrapper.
+TEST_F(AsyncRPCTxnTestFixture, EnsureProcessParticipantCalledCorrectlyOnSuccess) {
+ ShardId shardId("shard");
+ ReadPreferenceSetting readPref;
+ std::vector<HostAndPort> testHost = {kTestShardHosts[0]};
+ // Use a mock ShardIdTargeter to avoid calling into the ShardRegistry to get a target shard.
+ auto targeter = std::make_unique<ShardIdTargeterForTest>(
+ shardId, getOpCtx(), readPref, getExecutorPtr(), testHost);
+ DatabaseName testDbName = DatabaseName("testdb", boost::none);
+ NamespaceString nss = NamespaceString::createNamespaceString_forTest(testDbName);
+
+ // Set up the transaction metadata.
+ TxnNumber txnNum{3};
+ getOpCtx()->setTxnNumber(txnNum);
+ auto txnRouter = TransactionRouter::get(getOpCtx());
+ txnRouter.beginOrContinueTxn(getOpCtx(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.setDefaultAtClusterTime(getOpCtx());
+
+ FindCommandRequest findCmd(nss);
+ auto options = std::make_shared<AsyncRPCOptions<FindCommandRequest>>(
+ findCmd, getExecutorPtr(), _cancellationToken);
+
+ // There should be no recovery shard to start with.
+ ASSERT(!txnRouter.getRecoveryShardId());
+ auto resultFuture = sendTxnCommand(options, getOpCtx(), std::move(targeter));
+
+ // Set "readOnly: true" in the reply.
+ onCommand([&](const auto& request) {
+ return CursorResponse(nss, 0LL, {BSON("x" << 1)})
+ .toBSON(CursorResponse::ResponseType::InitialResponse)
+ .addFields(BSON("readOnly" << true));
+ });
+
+ CursorInitialReply res = std::move(resultFuture).get().response;
+ ASSERT_BSONOBJ_EQ(res.getCursor()->getFirstBatch()[0], BSON("x" << 1));
+ // First statement was read-only. If processed correctly by the router, we shouldn't have set a
+ // recovery shard.
+ ASSERT_FALSE(txnRouter.getRecoveryShardId());
+
+ // // Issue a follow-up find command in the same transaction.
+ FindCommandRequest secondFindCmd(nss);
+ auto secondCmdOptions = std::make_shared<AsyncRPCOptions<FindCommandRequest>>(
+ secondFindCmd, getExecutorPtr(), _cancellationToken);
+ auto secondTargeter = std::make_unique<ShardIdTargeterForTest>(
+ shardId, getOpCtx(), readPref, getExecutorPtr(), testHost);
+ auto secondResultFuture =
+ sendTxnCommand(secondCmdOptions, getOpCtx(), std::move(secondTargeter));
+
+ // Set "readOnly: false" in this response. If processed correctly by the router, we _will_ set a
+ // recovery shard.
+ onCommand([&](const auto& request) {
+ return CursorResponse(nss, 0LL, {BSON("x" << 2)})
+ .toBSON(CursorResponse::ResponseType::InitialResponse)
+ .addFields(BSON("readOnly" << false));
});
CursorInitialReply secondRes = std::move(secondResultFuture).get().response;
ASSERT_BSONOBJ_EQ(secondRes.getCursor()->getFirstBatch()[0], BSON("x" << 2));
+
+ // We should have set a recovery shard, if `TxnRouter::processParticipantResponse` was invoked
+ // correctly.
+ ASSERT(txnRouter.getRecoveryShardId());
+ ASSERT_EQ(*txnRouter.getRecoveryShardId(), shardId);
}
+TEST_F(AsyncRPCTxnTestFixture, EnsureProcessParticipantCalledCorrectlyOnRemoteError) {
+ ShardId shardId("shard");
+ ReadPreferenceSetting readPref;
+ std::vector<HostAndPort> testHost = {kTestShardHosts[0]};
+ // Use a mock ShardIdTargeter to avoid calling into the ShardRegistry to get a target shard.
+ auto targeter = std::make_unique<ShardIdTargeterForTest>(
+ shardId, getOpCtx(), readPref, getExecutorPtr(), testHost);
+ DatabaseName testDbName = DatabaseName("testdb", boost::none);
+ NamespaceString nss = NamespaceString::createNamespaceString_forTest(testDbName);
+ // Set up the transaction metadata.
+ TxnNumber txnNum{3};
+ getOpCtx()->setTxnNumber(txnNum);
+ auto txnRouter = TransactionRouter::get(getOpCtx());
+ txnRouter.beginOrContinueTxn(getOpCtx(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.setDefaultAtClusterTime(getOpCtx());
+
+ FindCommandRequest findCmd(nss);
+ auto options = std::make_shared<AsyncRPCOptions<FindCommandRequest>>(
+ findCmd, getExecutorPtr(), _cancellationToken);
+
+ // There should be no recovery shard to start with.
+ ASSERT(!txnRouter.getRecoveryShardId());
+ auto resultFuture = sendTxnCommand(options, getOpCtx(), std::move(targeter));
+
+ // Set "readOnly: false" in the reply.
+ onCommand([&](const auto& request) {
+ return createErrorResponse({ErrorCodes::BadValue, "test"})
+ .addFields(BSON("readOnly" << false));
+ });
+ // Because the router ignores error-responses that aren't "ErrorCodes::WouldChangeOwningShard",
+ // expect no change to the TransactionRouter state.
+ std::move(resultFuture).getNoThrow().getStatus().ignore();
+ ASSERT_FALSE(txnRouter.getRecoveryShardId());
+
+ // // Issue a follow-up find command in the same transaction.
+ FindCommandRequest secondFindCmd(nss);
+ auto secondCmdOptions = std::make_shared<AsyncRPCOptions<FindCommandRequest>>(
+ secondFindCmd, getExecutorPtr(), _cancellationToken);
+ auto secondTargeter = std::make_unique<ShardIdTargeterForTest>(
+ shardId, getOpCtx(), readPref, getExecutorPtr(), testHost);
+ auto secondResultFuture =
+ sendTxnCommand(secondCmdOptions, getOpCtx(), std::move(secondTargeter));
+
+ // Use WouldChangeOwningShard error this time.
+ onCommand([&](const auto& request) -> BSONObj {
+ auto code = ErrorCodes::WouldChangeOwningShard;
+ auto err = BSON("ok" << false << "code" << code << "codeName"
+ << ErrorCodes::errorString(code) << "errmsg"
+ << "test"
+ << "preImage" << BSON("x" << 1) << "postImage" << BSON("x" << 2)
+ << "shouldUpsert" << true);
+ return err.addFields(BSON("readOnly" << false));
+ });
+
+ std::move(secondResultFuture).getNoThrow().getStatus().ignore();
+ // We should have set a recovery shard, if `TxnRouter::processParticipantResponse` was invoked
+ // correctly.
+ ASSERT(txnRouter.getRecoveryShardId());
+ ASSERT_EQ(*txnRouter.getRecoveryShardId(), shardId);
+}
TEST_F(AsyncRPCTxnTestFixture, SendTxnCommandWithGenericArgs) {
ShardId shardId("shard");
@@ -912,7 +1040,8 @@ TEST_F(AsyncRPCTxnTestFixture, SendTxnCommandWithGenericArgs) {
// in ownership of the underlying data, so it will participate in
// owning the data in the cursor response.
return CursorResponse(nss, 0LL, {BSON("x" << 1)})
- .toBSON(CursorResponse::ResponseType::InitialResponse);
+ .toBSON(CursorResponse::ResponseType::InitialResponse)
+ .addFields(BSON("readOnly" << false));
});
CursorInitialReply res = std::move(resultFuture).get().response;
diff --git a/src/mongo/executor/async_transaction_rpc.h b/src/mongo/executor/async_transaction_rpc.h
index 56f438e3f3b..5ec4cc85370 100644
--- a/src/mongo/executor/async_transaction_rpc.h
+++ b/src/mongo/executor/async_transaction_rpc.h
@@ -69,10 +69,19 @@ ExecutorFuture<AsyncRPCResponse<typename CommandType::Reply>> sendTxnCommand(
return swResponse;
}
if (swResponse.isOK()) {
- // TODO (SERVER-72082): Make sure TxnResponseMetadata is appended to the BSON
- // that we are passing into 'processParticipantResponse'.
- txnRouter.processParticipantResponse(
- opCtx, shardId, swResponse.getValue().response.toBSON());
+ ReplyType reply = swResponse.getValue();
+ GenericReplyFields gens = reply.genericReplyFields;
+ // The TransactionRouter expects a raw-BSON command-response
+ // in its API for processing transaction metadata. The async_rpc API
+ // doesn't expose the raw-BSON of the response in the case of command-success,
+ // so we construct a fake one for now to appease the TxnRouter API.
+ auto fakeResponseObj = [&] {
+ BSONObjBuilder bob;
+ gens.stable.serialize(&bob);
+ gens.unstable.serialize(&bob);
+ return bob.obj();
+ }();
+ txnRouter.processParticipantResponse(opCtx, shardId, fakeResponseObj);
} else {
auto extraInfo = swResponse.getStatus().template extraInfo<AsyncRPCErrorInfo>();
if (extraInfo->isRemote()) {
diff --git a/src/mongo/idl/generic_args_with_types.idl b/src/mongo/idl/generic_args_with_types.idl
index 90b270650c3..57e659434a2 100644
--- a/src/mongo/idl/generic_args_with_types.idl
+++ b/src/mongo/idl/generic_args_with_types.idl
@@ -258,6 +258,10 @@ structs:
type: bool
forward_from_shards: false
optional: true
+ additionalParticipants:
+ type: array<object>
+ forward_from_shards: false
+ optional: true
$configTime:
type: timestamp
cpp_name: "dollarConfigTime"
diff --git a/src/mongo/idl/generic_argument.idl b/src/mongo/idl/generic_argument.idl
index aae807ef346..ba12e0b972a 100644
--- a/src/mongo/idl/generic_argument.idl
+++ b/src/mongo/idl/generic_argument.idl
@@ -195,6 +195,9 @@ structs:
readOnly:
type: IDLAnyType
forward_from_shards: false
+ additionalParticipants:
+ type: IDLAnyType
+ forward_from_shards: false
$configTime:
type: IDLAnyType
cpp_name: "dollarConfigTime"
diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp
index 12099dba7ea..b5c3b5a75ab 100644
--- a/src/mongo/s/transaction_router.cpp
+++ b/src/mongo/s/transaction_router.cpp
@@ -565,14 +565,14 @@ void TransactionRouter::Router::processParticipantResponse(OperationContext* opC
}
}
- const std::string extraParticipants = "additionalParticipants";
- if (responseObj.hasField(extraParticipants)) {
- BSONForEach(e, responseObj.getField(extraParticipants).Array()) {
- mongo::ShardId addingparticipant = ShardId(
- std::string(e.Obj().getField(StringData{"shardId"}).checkAndGetStringData()));
- auto txnPart = _createParticipant(opCtx, addingparticipant);
+ if (txnResponseMetadata.getAdditionalParticipants()) {
+ auto additionalParticipants = *txnResponseMetadata.getAdditionalParticipants();
+ for (auto&& participantElem : additionalParticipants) {
+ mongo::ShardId addingParticipant = ShardId(std::string(
+ participantElem.getField(StringData{"shardId"}).checkAndGetStringData()));
+ _createParticipant(opCtx, addingParticipant);
_setReadOnlyForParticipant(
- opCtx, addingparticipant, Participant::ReadOnly::kNotReadOnly);
+ opCtx, addingParticipant, Participant::ReadOnly::kNotReadOnly);
if (!p().isRecoveringCommit) {
// Don't update participant stats during recovery since the participant list isn't