summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorJack Mulrow <jack.mulrow@mongodb.com>2022-02-07 15:46:37 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-02-07 19:21:55 +0000
commitcebfa751cb5db526fcba92dc2c1a69ed507a075b (patch)
tree9d5a8f9cd6ea0e0c4b21adf20737fe6e7cd01551 /src/mongo
parent7a2d86c376488cc756c0325e6edaf3406a86ec5d (diff)
downloadmongo-cebfa751cb5db526fcba92dc2c1a69ed507a075b.tar.gz
SERVER-63364 Use transaction API to handle WCOS errors for findAndModify without retryability
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/transaction_api.cpp49
-rw-r--r--src/mongo/db/transaction_api.h12
-rw-r--r--src/mongo/db/transaction_api_test.cpp78
-rw-r--r--src/mongo/s/cluster_commands_helpers.cpp15
-rw-r--r--src/mongo/s/cluster_commands_helpers.h7
-rw-r--r--src/mongo/s/commands/SConscript2
-rw-r--r--src/mongo/s/commands/cluster_find_and_modify_cmd.cpp146
8 files changed, 167 insertions, 143 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index f9454dfe6d5..e6abcb42581 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -839,7 +839,6 @@ env.Library(
'repl/repl_coordinator_interface',
'service_context',
'shared_request_handling',
- 'write_ops',
],
)
diff --git a/src/mongo/db/transaction_api.cpp b/src/mongo/db/transaction_api.cpp
index 0d6a7637541..8c4fe8c114c 100644
--- a/src/mongo/db/transaction_api.cpp
+++ b/src/mongo/db/transaction_api.cpp
@@ -48,6 +48,7 @@
#include "mongo/rpc/factory.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/reply_interface.h"
+#include "mongo/s/is_mongos.h"
#include "mongo/stdx/future.h"
#include "mongo/transport/service_entry_point.h"
@@ -297,7 +298,7 @@ SemiFuture<BSONObj> Transaction::_commitOrAbort(StringData dbName, StringData cm
BSONObjBuilder cmdBuilder;
cmdBuilder.append(cmdName, 1);
- cmdBuilder.append(WriteConcernOptions::kWriteConcernField, _writeConcern.toBSON());
+ cmdBuilder.append(WriteConcernOptions::kWriteConcernField, _writeConcern);
auto cmdObj = cmdBuilder.obj();
return _txnClient->runCommand(dbName, cmdObj).semi();
@@ -363,7 +364,7 @@ void Transaction::prepareRequest(BSONObjBuilder* cmdBuilder) {
if (_state == TransactionState::kInit) {
_state = TransactionState::kStarted;
_sessionInfo.setStartTransaction(boost::none);
- cmdBuilder->append(_readConcern.toBSON().firstElement());
+ cmdBuilder->append(repl::ReadConcernArgs::kReadConcernFieldName, _readConcern);
}
_latestResponseHasTransientTransactionErrorLabel = false;
@@ -379,32 +380,19 @@ void Transaction::processResponse(const BSONObj& reply) {
}
}
-void Transaction::_setSessionInfo(LogicalSessionId lsid,
- TxnNumber txnNumber,
- boost::optional<TxnRetryCounter> txnRetryCounter) {
+void Transaction::_setSessionInfo(LogicalSessionId lsid, TxnNumber txnNumber) {
_sessionInfo.setSessionId(lsid);
_sessionInfo.setTxnNumber(txnNumber);
- _sessionInfo.setTxnRetryCounter(txnRetryCounter ? *txnRetryCounter : 0);
}
void Transaction::primeForTransactionRetry() {
_latestResponseHasTransientTransactionErrorLabel = false;
switch (_execContext) {
case ExecutionContext::kOwnSession:
- // Advance txnNumber.
- _sessionInfo.setTxnNumber(*_sessionInfo.getTxnNumber() + 1);
- _sessionInfo.setStartTransaction(true);
- _state = TransactionState::kInit;
- return;
case ExecutionContext::kClientSession:
- // Advance txnRetryCounter.
- _sessionInfo.setTxnRetryCounter(*_sessionInfo.getTxnRetryCounter() + 1);
- _sessionInfo.setStartTransaction(true);
- _state = TransactionState::kInit;
- return;
case ExecutionContext::kClientRetryableWrite:
- // Advance txnRetryCounter.
- _sessionInfo.setTxnRetryCounter(*_sessionInfo.getTxnRetryCounter() + 1);
+ // Advance txnNumber.
+ _sessionInfo.setTxnNumber(*_sessionInfo.getTxnNumber() + 1);
_sessionInfo.setStartTransaction(true);
_state = TransactionState::kInit;
return;
@@ -431,27 +419,27 @@ void Transaction::_primeTransaction(OperationContext* opCtx) {
auto clientSession = opCtx->getLogicalSessionId();
auto clientTxnNumber = opCtx->getTxnNumber();
auto clientInMultiDocumentTransaction = opCtx->inMultiDocumentTransaction();
- auto clientTxnRetryCounter = opCtx->getTxnRetryCounter();
if (!clientSession) {
// TODO SERVER-61783: Integrate session pool.
- _setSessionInfo(makeLogicalSessionId(opCtx), 0, 0);
+ _setSessionInfo(makeLogicalSessionId(opCtx), 0 /* txnNumber */);
_execContext = ExecutionContext::kOwnSession;
} else if (!clientTxnNumber) {
- _setSessionInfo(makeLogicalSessionIdWithTxnUUID(*clientSession), 0, 0);
+ _setSessionInfo(makeLogicalSessionIdWithTxnUUID(*clientSession), 0 /* txnNumber */);
_execContext = ExecutionContext::kClientSession;
// TODO SERVER-59186: Handle client session case.
MONGO_UNREACHABLE;
} else if (!clientInMultiDocumentTransaction) {
- _setSessionInfo(
- makeLogicalSessionIdWithTxnNumberAndUUID(*clientSession, *clientTxnNumber), 0, 0);
+ _setSessionInfo(makeLogicalSessionIdWithTxnNumberAndUUID(*clientSession, *clientTxnNumber),
+ 0 /* txnNumber */);
_execContext = ExecutionContext::kClientRetryableWrite;
- // TODO SERVER-59186: Handle client retryable write case.
- MONGO_UNREACHABLE;
+ // TODO SERVER-59186: Handle client retryable write case on mongod. This is different from
+ // mongos because only mongod checks out a transaction session for retryable writes.
+ invariant(isMongos(), "This case is not yet supported on a mongod");
} else {
- _setSessionInfo(*clientSession, *clientTxnNumber, clientTxnRetryCounter);
+ _setSessionInfo(*clientSession, *clientTxnNumber);
_execContext = ExecutionContext::kClientTransaction;
// TODO SERVER-59186: Handle client transaction case.
@@ -460,9 +448,12 @@ void Transaction::_primeTransaction(OperationContext* opCtx) {
_sessionInfo.setStartTransaction(true);
_sessionInfo.setAutocommit(false);
- // Extract non-session options.
- _readConcern = repl::ReadConcernArgs::get(opCtx);
- _writeConcern = opCtx->getWriteConcern();
+ // Extract non-session options. Strip provenance so it can be correctly inferred for the
+ // generated commands as if it came from an external client.
+ _readConcern = repl::ReadConcernArgs::get(opCtx).toBSONInner().removeField(
+ ReadWriteConcernProvenanceBase::kSourceFieldName);
+ _writeConcern = opCtx->getWriteConcern().toBSON().removeField(
+ ReadWriteConcernProvenanceBase::kSourceFieldName);
LOGV2_DEBUG(5875901,
0, // TODO SERVER-61781: Raise verbosity.
diff --git a/src/mongo/db/transaction_api.h b/src/mongo/db/transaction_api.h
index d09d83fc214..7c0684bef39 100644
--- a/src/mongo/db/transaction_api.h
+++ b/src/mongo/db/transaction_api.h
@@ -282,8 +282,8 @@ public:
/**
* Handles the given transaction result based on where the transaction is in its lifecycle and
- * its execution context, e.g. by updating its txnNumber or txnRetryCounter, and returns the
- * next step for the transaction runner.
+ * its execution context, e.g. by updating its txnNumber, and returns the next step for the
+ * transaction runner.
*/
ErrorHandlingStep handleError(const StatusWith<CommitResult>& swResult) const;
@@ -318,9 +318,7 @@ private:
return std::make_unique<TxnMetadataHooks>(*this);
}
- void _setSessionInfo(LogicalSessionId lsid,
- TxnNumber txnNumber,
- boost::optional<TxnRetryCounter> txnRetryCounter);
+ void _setSessionInfo(LogicalSessionId lsid, TxnNumber txnNumber);
SemiFuture<BSONObj> _commitOrAbort(StringData dbName, StringData cmdName);
@@ -337,8 +335,8 @@ private:
bool _latestResponseHasTransientTransactionErrorLabel{false};
OperationSessionInfo _sessionInfo;
- repl::ReadConcernArgs _readConcern;
- WriteConcernOptions _writeConcern;
+ BSONObj _writeConcern;
+ BSONObj _readConcern;
ExecutionContext _execContext;
TransactionState _state{TransactionState::kInit};
};
diff --git a/src/mongo/db/transaction_api_test.cpp b/src/mongo/db/transaction_api_test.cpp
index 9a6d2f8d25f..3b5e19d276b 100644
--- a/src/mongo/db/transaction_api_test.cpp
+++ b/src/mongo/db/transaction_api_test.cpp
@@ -121,7 +121,6 @@ namespace {
void assertTxnMetadata(BSONObj obj,
TxnNumber txnNumber,
- boost::optional<TxnRetryCounter> txnRetryCounter,
boost::optional<bool> startTransaction,
boost::optional<BSONObj> readConcern = boost::none,
boost::optional<BSONObj> writeConcern = boost::none) {
@@ -129,12 +128,6 @@ void assertTxnMetadata(BSONObj obj,
ASSERT_EQ(obj["autocommit"].Bool(), false);
ASSERT_EQ(obj["txnNumber"].Long(), txnNumber);
- if (txnRetryCounter) {
- ASSERT_EQ(obj["txnRetryCounter"].Int(), *txnRetryCounter);
- } else {
- ASSERT(obj["txnRetryCounter"].eoo());
- }
-
if (startTransaction) {
ASSERT_EQ(obj["startTransaction"].Bool(), *startTransaction);
} else {
@@ -190,13 +183,10 @@ protected:
opCtx(), InlineQueuedCountingExecutor::make(), std::move(mockClient));
}
- void expectSentAbort(TxnNumber txnNumber,
- TxnRetryCounter txnRetryCounter,
- BSONObj writeConcern) {
+ void expectSentAbort(TxnNumber txnNumber, BSONObj writeConcern) {
auto lastRequest = mockClient()->getLastSentRequest();
assertTxnMetadata(lastRequest,
txnNumber,
- txnRetryCounter,
boost::none /* startTransaction */,
boost::none /* readConcern */,
writeConcern);
@@ -220,10 +210,8 @@ TEST_F(TxnAPITest, OwnSession_AttachesTxnMetadata) {
<< "documents" << BSON_ARRAY(BSON("x" << 1))))
.get();
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
- assertTxnMetadata(mockClient()->getLastSentRequest(),
- 0 /* txnNumber */,
- 0 /* txnRetryCounter */,
- true /* startTransaction */);
+ assertTxnMetadata(
+ mockClient()->getLastSentRequest(), 0 /* txnNumber */, true /* startTransaction */);
mockClient()->setNextCommandResponse(kOKInsertResponse);
insertRes = txnClient
@@ -235,7 +223,6 @@ TEST_F(TxnAPITest, OwnSession_AttachesTxnMetadata) {
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
assertTxnMetadata(mockClient()->getLastSentRequest(),
0 /* txnNumber */,
- 0 /* txnRetryCounter */,
boost::none /* startTransaction */);
// The commit response.
@@ -248,7 +235,6 @@ TEST_F(TxnAPITest, OwnSession_AttachesTxnMetadata) {
auto lastRequest = mockClient()->getLastSentRequest();
assertTxnMetadata(lastRequest,
0 /* txnNumber */,
- 0 /* txnRetryCounter */,
boost::none /* startTransaction */,
boost::none /* readConcern */,
WriteConcernOptions().toBSON() /* writeConcern */);
@@ -284,7 +270,6 @@ TEST_F(TxnAPITest, OwnSession_AttachesWriteConcernOnCommit) {
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
assertTxnMetadata(mockClient()->getLastSentRequest(),
attempt /* txnNumber */,
- 0 /* txnRetryCounter */,
true /* startTransaction */);
mockClient()->setNextCommandResponse(kOKInsertResponse);
@@ -297,7 +282,6 @@ TEST_F(TxnAPITest, OwnSession_AttachesWriteConcernOnCommit) {
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
assertTxnMetadata(mockClient()->getLastSentRequest(),
attempt /* txnNumber */,
- 0 /* txnRetryCounter */,
boost::none /* startTransaction */);
// Throw a transient error to verify the retries behavior.
@@ -313,7 +297,6 @@ TEST_F(TxnAPITest, OwnSession_AttachesWriteConcernOnCommit) {
auto lastRequest = mockClient()->getLastSentRequest();
assertTxnMetadata(lastRequest,
attempt /* txnNumber */,
- 0 /* txnRetryCounter */,
boost::none /* startTransaction */,
boost::none /* readConcern */,
writeConcern.toBSON());
@@ -352,7 +335,7 @@ TEST_F(TxnAPITest, OwnSession_AttachesWriteConcernOnAbort) {
});
ASSERT_EQ(swResult.getStatus(), ErrorCodes::InternalError);
- expectSentAbort(0 /* txnNumber */, 0 /* txnRetryCounter */, writeConcern.toBSON());
+ expectSentAbort(0 /* txnNumber */, writeConcern.toBSON());
}
}
@@ -382,7 +365,6 @@ TEST_F(TxnAPITest, OwnSession_AttachesReadConcernOnStartTransaction) {
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
assertTxnMetadata(mockClient()->getLastSentRequest(),
attempt /* txnNumber */,
- 0 /* txnRetryCounter */,
true /* startTransaction */,
readConcern.toBSONInner());
@@ -397,7 +379,6 @@ TEST_F(TxnAPITest, OwnSession_AttachesReadConcernOnStartTransaction) {
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
assertTxnMetadata(mockClient()->getLastSentRequest(),
attempt /* txnNumber */,
- 0 /* txnRetryCounter */,
boost::none /* startTransaction */);
// Throw a transient error to verify the retry will still use the read concern.
@@ -413,7 +394,6 @@ TEST_F(TxnAPITest, OwnSession_AttachesReadConcernOnStartTransaction) {
auto lastRequest = mockClient()->getLastSentRequest();
assertTxnMetadata(lastRequest,
attempt /* txnNumber */,
- 0 /* txnRetryCounter */,
boost::none /* startTransaction */,
boost::none /* readConcern */,
WriteConcernOptions().toBSON() /* writeConcern */);
@@ -441,7 +421,7 @@ TEST_F(TxnAPITest, OwnSession_AbortsOnError) {
});
ASSERT_EQ(swResult.getStatus(), ErrorCodes::InternalError);
- expectSentAbort(0 /* txnNumber */, 0 /* txnRetryCounter */, WriteConcernOptions().toBSON());
+ expectSentAbort(0 /* txnNumber */, WriteConcernOptions().toBSON());
}
TEST_F(TxnAPITest, OwnSession_SkipsCommitIfNoCommandsWereRun) {
@@ -466,9 +446,7 @@ TEST_F(TxnAPITest, OwnSession_RetriesOnTransientError) {
attempt += 1;
if (attempt > 0) {
// Verify an abort was sent in between retries.
- expectSentAbort(attempt - 1 /* txnNumber */,
- 0 /* txnRetryCounter */,
- WriteConcernOptions().toBSON());
+ expectSentAbort(attempt - 1 /* txnNumber */, WriteConcernOptions().toBSON());
}
mockClient()->setNextCommandResponse(attempt == 0 ? kNoSuchTransactionResponse
@@ -492,7 +470,6 @@ TEST_F(TxnAPITest, OwnSession_RetriesOnTransientError) {
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
assertTxnMetadata(mockClient()->getLastSentRequest(),
attempt /* txnNumber */,
- 0 /* txnRetryCounter */,
true /* startTransaction */);
return SemiFuture<void>::makeReady();
@@ -503,7 +480,6 @@ TEST_F(TxnAPITest, OwnSession_RetriesOnTransientError) {
auto lastRequest = mockClient()->getLastSentRequest();
assertTxnMetadata(lastRequest,
attempt /* txnNumber */,
- 0 /* txnRetryCounter */,
boost::none /* startTransaction */,
boost::none /* readConcern */,
WriteConcernOptions().toBSON() /* writeConcern */);
@@ -517,9 +493,7 @@ TEST_F(TxnAPITest, OwnSession_RetriesOnTransientClientError) {
attempt += 1;
if (attempt > 0) {
// Verify an abort was sent in between retries.
- expectSentAbort(attempt - 1 /* txnNumber */,
- 0 /* txnRetryCounter */,
- WriteConcernOptions().toBSON());
+ expectSentAbort(attempt - 1 /* txnNumber */, WriteConcernOptions().toBSON());
}
mockClient()->setNextCommandResponse(kOKInsertResponse);
@@ -534,7 +508,6 @@ TEST_F(TxnAPITest, OwnSession_RetriesOnTransientClientError) {
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
assertTxnMetadata(mockClient()->getLastSentRequest(),
attempt /* txnNumber */,
- 0 /* txnRetryCounter */,
true /* startTransaction */);
// The commit or implicit abort response.
@@ -549,7 +522,6 @@ TEST_F(TxnAPITest, OwnSession_RetriesOnTransientClientError) {
auto lastRequest = mockClient()->getLastSentRequest();
assertTxnMetadata(lastRequest,
attempt /* txnNumber */,
- 0 /* txnRetryCounter */,
boost::none /* startTransaction */,
boost::none /* readConcern */,
WriteConcernOptions().toBSON() /* writeConcern */);
@@ -569,10 +541,8 @@ TEST_F(TxnAPITest, OwnSession_CommitError) {
ASSERT_OK(getStatusFromWriteCommandReply(insertRes));
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
- assertTxnMetadata(mockClient()->getLastSentRequest(),
- 0 /* txnNumber */,
- 0 /* txnRetryCounter */,
- true /* startTransaction */);
+ assertTxnMetadata(
+ mockClient()->getLastSentRequest(), 0 /* txnNumber */, true /* startTransaction */);
// The commit response.
mockClient()->setNextCommandResponse(
@@ -587,7 +557,7 @@ TEST_F(TxnAPITest, OwnSession_CommitError) {
ASSERT(swResult.getValue().wcError.toStatus().isOK());
ASSERT_EQ(swResult.getValue().getEffectiveStatus(), ErrorCodes::InternalError);
- expectSentAbort(0 /* txnNumber */, 0 /* txnRetryCounter */, WriteConcernOptions().toBSON());
+ expectSentAbort(0 /* txnNumber */, WriteConcernOptions().toBSON());
}
TEST_F(TxnAPITest, OwnSession_TransientCommitError) {
@@ -597,9 +567,7 @@ TEST_F(TxnAPITest, OwnSession_TransientCommitError) {
attempt += 1;
if (attempt > 0) {
// Verify an abort was sent in between retries.
- expectSentAbort(attempt - 1 /* txnNumber */,
- 0 /* txnRetryCounter */,
- WriteConcernOptions().toBSON());
+ expectSentAbort(attempt - 1 /* txnNumber */, WriteConcernOptions().toBSON());
}
mockClient()->setNextCommandResponse(kOKInsertResponse);
@@ -614,7 +582,6 @@ TEST_F(TxnAPITest, OwnSession_TransientCommitError) {
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
assertTxnMetadata(mockClient()->getLastSentRequest(),
attempt /* txnNumber */,
- 0 /* txnRetryCounter */,
true /* startTransaction */);
// Set commit and best effort abort response, if necessary.
@@ -632,7 +599,6 @@ TEST_F(TxnAPITest, OwnSession_TransientCommitError) {
auto lastRequest = mockClient()->getLastSentRequest();
assertTxnMetadata(lastRequest,
attempt /* txnNumber */,
- 0 /* txnRetryCounter */,
boost::none /* startTransaction */,
boost::none /* readConcern */,
WriteConcernOptions().toBSON() /* writeConcern */);
@@ -652,10 +618,8 @@ TEST_F(TxnAPITest, OwnSession_RetryableCommitError) {
ASSERT_OK(getStatusFromWriteCommandReply(insertRes));
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
- assertTxnMetadata(mockClient()->getLastSentRequest(),
- 0 /* txnNumber */,
- 0 /* txnRetryCounter */,
- true /* startTransaction */);
+ assertTxnMetadata(
+ mockClient()->getLastSentRequest(), 0 /* txnNumber */, true /* startTransaction */);
// The commit response.
mockClient()->setNextCommandResponse(
@@ -669,7 +633,6 @@ TEST_F(TxnAPITest, OwnSession_RetryableCommitError) {
auto lastRequest = mockClient()->getLastSentRequest();
assertTxnMetadata(lastRequest,
0 /* txnNumber */,
- 0 /* txnRetryCounter */,
boost::none /* startTransaction */,
boost::none /* readConcern */,
WriteConcernOptions().toBSON() /* writeConcern */);
@@ -687,10 +650,8 @@ TEST_F(TxnAPITest, OwnSession_NonRetryableCommitWCError) {
<< "documents" << BSON_ARRAY(BSON("x" << 1))))
.get();
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
- assertTxnMetadata(mockClient()->getLastSentRequest(),
- 0 /* txnNumber */,
- 0 /* txnRetryCounter */,
- true /* startTransaction */);
+ assertTxnMetadata(
+ mockClient()->getLastSentRequest(), 0 /* txnNumber */, true /* startTransaction */);
// The commit response.
mockClient()->setNextCommandResponse(kResWithWriteConcernError);
@@ -703,7 +664,7 @@ TEST_F(TxnAPITest, OwnSession_NonRetryableCommitWCError) {
ASSERT_EQ(swResult.getValue().wcError.toStatus(), ErrorCodes::WriteConcernFailed);
ASSERT_EQ(swResult.getValue().getEffectiveStatus(), ErrorCodes::WriteConcernFailed);
- expectSentAbort(0 /* txnNumber */, 0 /* txnRetryCounter */, WriteConcernOptions().toBSON());
+ expectSentAbort(0 /* txnNumber */, WriteConcernOptions().toBSON());
}
TEST_F(TxnAPITest, OwnSession_RetryableCommitWCError) {
@@ -719,10 +680,8 @@ TEST_F(TxnAPITest, OwnSession_RetryableCommitWCError) {
ASSERT_OK(getStatusFromWriteCommandReply(insertRes));
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
- assertTxnMetadata(mockClient()->getLastSentRequest(),
- 0 /* txnNumber */,
- 0 /* txnRetryCounter */,
- true /* startTransaction */);
+ assertTxnMetadata(
+ mockClient()->getLastSentRequest(), 0 /* txnNumber */, true /* startTransaction */);
// The commit responses.
mockClient()->setNextCommandResponse(kResWithRetryableWriteConcernError);
@@ -735,7 +694,6 @@ TEST_F(TxnAPITest, OwnSession_RetryableCommitWCError) {
auto lastRequest = mockClient()->getLastSentRequest();
assertTxnMetadata(lastRequest,
0 /* txnNumber */,
- 0 /* txnRetryCounter */,
boost::none /* startTransaction */,
boost::none /* readConcern */,
WriteConcernOptions().toBSON() /* writeConcern */);
diff --git a/src/mongo/s/cluster_commands_helpers.cpp b/src/mongo/s/cluster_commands_helpers.cpp
index c5e73bfe707..a4d47c09825 100644
--- a/src/mongo/s/cluster_commands_helpers.cpp
+++ b/src/mongo/s/cluster_commands_helpers.cpp
@@ -62,11 +62,9 @@
namespace mongo {
-void appendWriteConcernErrorToCmdResponse(const ShardId& shardId,
- const BSONElement& wcErrorElem,
- BSONObjBuilder& responseBuilder) {
- WriteConcernErrorDetail wcError = getWriteConcernErrorDetail(wcErrorElem);
-
+void appendWriteConcernErrorDetailToCmdResponse(const ShardId& shardId,
+ WriteConcernErrorDetail wcError,
+ BSONObjBuilder& responseBuilder) {
auto status = wcError.toStatus();
wcError.setStatus(
status.withReason(str::stream() << status.reason() << " at " << shardId.toString()));
@@ -74,6 +72,13 @@ void appendWriteConcernErrorToCmdResponse(const ShardId& shardId,
responseBuilder.append("writeConcernError", wcError.toBSON());
}
+void appendWriteConcernErrorToCmdResponse(const ShardId& shardId,
+ const BSONElement& wcErrorElem,
+ BSONObjBuilder& responseBuilder) {
+ WriteConcernErrorDetail wcError = getWriteConcernErrorDetail(wcErrorElem);
+ appendWriteConcernErrorDetailToCmdResponse(shardId, wcError, responseBuilder);
+}
+
boost::intrusive_ptr<ExpressionContext> makeExpressionContextWithDefaultsForTargeter(
OperationContext* opCtx,
const NamespaceString& nss,
diff --git a/src/mongo/s/cluster_commands_helpers.h b/src/mongo/s/cluster_commands_helpers.h
index d6443df0407..b5872e4b933 100644
--- a/src/mongo/s/cluster_commands_helpers.h
+++ b/src/mongo/s/cluster_commands_helpers.h
@@ -53,6 +53,13 @@ struct RawResponsesResult {
};
/**
+ * This function appends the provided WriteConcernErrorDetail to the sharded response.
+ */
+void appendWriteConcernErrorDetailToCmdResponse(const ShardId& shardId,
+ WriteConcernErrorDetail wcError,
+ BSONObjBuilder& responseBuilder);
+
+/**
* This function appends the provided writeConcernError BSONElement to the sharded response.
*/
void appendWriteConcernErrorToCmdResponse(const ShardId& shardID,
diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript
index cc9c85d7601..90025abf11c 100644
--- a/src/mongo/s/commands/SConscript
+++ b/src/mongo/s/commands/SConscript
@@ -119,6 +119,7 @@ env.Library(
'$BUILD_DIR/mongo/db/ftdc/ftdc_server',
'$BUILD_DIR/mongo/db/index_commands_idl',
'$BUILD_DIR/mongo/db/initialize_api_parameters',
+ '$BUILD_DIR/mongo/db/internal_transactions_feature_flag',
'$BUILD_DIR/mongo/db/query/command_request_response',
'$BUILD_DIR/mongo/db/query/cursor_response_idl',
'$BUILD_DIR/mongo/db/query/map_reduce_output_format',
@@ -131,6 +132,7 @@ env.Library(
'$BUILD_DIR/mongo/db/stats/counters',
'$BUILD_DIR/mongo/db/timeseries/timeseries_conversion_util',
'$BUILD_DIR/mongo/db/timeseries/timeseries_options',
+ '$BUILD_DIR/mongo/db/transaction_api',
'$BUILD_DIR/mongo/db/views/views',
'$BUILD_DIR/mongo/executor/async_multicaster',
'$BUILD_DIR/mongo/executor/async_request_executor',
diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
index 23672e2f418..f62bbc90e53 100644
--- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
+++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
@@ -27,6 +27,8 @@
* it in the license file.
*/
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
+
#include "mongo/platform/basic.h"
#include "mongo/base/status_with.h"
@@ -38,9 +40,13 @@
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/commands.h"
#include "mongo/db/commands/update_metrics.h"
+#include "mongo/db/internal_transactions_feature_flag_gen.h"
+#include "mongo/db/ops/write_ops_gen.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/storage/duplicate_key_error_info.h"
+#include "mongo/db/transaction_api.h"
#include "mongo/executor/task_executor_pool.h"
+#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/balancer_configuration.h"
#include "mongo/s/catalog_cache.h"
@@ -122,6 +128,41 @@ BSONObj getShardKey(OperationContext* opCtx,
return shardKey;
}
+void handleWouldChangeOwningShardErrorRetryableWrite(
+ OperationContext* opCtx,
+ const ShardId& shardId,
+ const NamespaceString& nss,
+ const write_ops::FindAndModifyCommandRequest& request,
+ BSONObjBuilder* result) {
+ auto txn = std::make_shared<txn_api::TransactionWithRetries>(
+ opCtx, Grid::get(opCtx)->getExecutorPool()->getFixedExecutor());
+
+ auto swResult = txn->runSyncNoThrow(
+ opCtx,
+ [cmdObj = request.toBSON({}), nss, result](const txn_api::TransactionClient& txnClient,
+ ExecutorPtr txnExec) {
+ auto res = txnClient.runCommand(nss.db(), cmdObj).get();
+ uassertStatusOK(getStatusFromCommandResult(res));
+
+ result->appendElementsUnique(CommandHelpers::filterCommandReplyForPassthrough(res));
+
+ return SemiFuture<void>::makeReady();
+ });
+
+ auto cmdStatus = swResult.getStatus();
+ if (cmdStatus != ErrorCodes::DuplicateKey ||
+ (cmdStatus == ErrorCodes::DuplicateKey &&
+ !cmdStatus.extraInfo<DuplicateKeyErrorInfo>()->getKeyPattern().hasField("_id"))) {
+ cmdStatus.addContext(documentShardKeyUpdateUtil::kNonDuplicateKeyErrorContext);
+ };
+ uassertStatusOK(cmdStatus);
+
+ const auto& wcError = swResult.getValue().wcError;
+ if (!wcError.toStatus().isOK()) {
+ appendWriteConcernErrorDetailToCmdResponse(shardId, wcError, *result);
+ }
+}
+
void updateShardKeyValueOnWouldChangeOwningShardError(OperationContext* opCtx,
const NamespaceString nss,
Status responseStatus,
@@ -402,47 +443,23 @@ private:
if (responseStatus.code() == ErrorCodes::WouldChangeOwningShard) {
if (isRetryableWrite) {
- RouterOperationContextSession routerSession(opCtx);
- try {
- auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
- readConcernArgs =
- repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
-
- // Re-run the findAndModify command that will change the shard key value in a
- // transaction. We call _runCommand recursively, and this second time through
- // since it will be run as a transaction it will take the other code path to
- // updateShardKeyValueOnWouldChangeOwningShardError. We ensure the retried
- // operation does not include WC inside the transaction by stripping it from the
- // cmdObj. The transaction commit will still use the WC, because it uses the WC
- // from the opCtx (which has been set previously in Strategy).
- documentShardKeyUpdateUtil::startTransactionForShardKeyUpdate(opCtx);
- _runCommand(opCtx,
- shardId,
- shardVersion,
- dbVersion,
- nss,
- stripWriteConcern(cmdObj),
- result);
- uassertStatusOK(getStatusFromCommandResult(result->asTempObj()));
- auto commitResponse =
- documentShardKeyUpdateUtil::commitShardKeyUpdateTransaction(opCtx);
-
- uassertStatusOK(getStatusFromCommandResult(commitResponse));
- if (auto wcErrorElem = commitResponse["writeConcernError"]) {
- appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *result);
- }
- } catch (DBException& e) {
- if (e.code() != ErrorCodes::DuplicateKey ||
- (e.code() == ErrorCodes::DuplicateKey &&
- !e.extraInfo<DuplicateKeyErrorInfo>()->getKeyPattern().hasField("_id"))) {
- e.addContext(documentShardKeyUpdateUtil::kNonDuplicateKeyErrorContext);
- };
-
- auto txnRouterForAbort = TransactionRouter::get(opCtx);
- if (txnRouterForAbort)
- txnRouterForAbort.implicitlyAbortTransaction(opCtx, e.toStatus());
-
- throw;
+ if (feature_flags::gFeatureFlagInternalTransactions.isEnabled(
+ serverGlobalParams.featureCompatibility)) {
+ auto parsedRequest = write_ops::FindAndModifyCommandRequest::parse(
+ IDLParserErrorContext("ClusterFindAndModify"), cmdObj);
+ // Strip write concern because this command will be sent as part of a
+ // transaction and the write concern has already been loaded onto the opCtx and
+ // will be picked up by the transaction API.
+ //
+ // Strip runtime constants because they will be added again when this command is
+ // recursively sent through the service entry point.
+ parsedRequest.setWriteConcern(boost::none);
+ parsedRequest.setLegacyRuntimeConstants(boost::none);
+ handleWouldChangeOwningShardErrorRetryableWrite(
+ opCtx, shardId, nss, parsedRequest, result);
+ } else {
+ _handleWouldChangeOwningShardErrorRetryableWriteLegacy(
+ opCtx, shardId, shardVersion, dbVersion, nss, cmdObj, result);
}
} else {
updateShardKeyValueOnWouldChangeOwningShardError(
@@ -462,6 +479,53 @@ private:
CommandHelpers::filterCommandReplyForPassthrough(response.data));
}
+ // TODO SERVER-62375: Remove after 6.0 is released.
+ static void _handleWouldChangeOwningShardErrorRetryableWriteLegacy(
+ OperationContext* opCtx,
+ const ShardId& shardId,
+ const boost::optional<ChunkVersion>& shardVersion,
+ const boost::optional<DatabaseVersion>& dbVersion,
+ const NamespaceString& nss,
+ const BSONObj& cmdObj,
+ BSONObjBuilder* result) {
+ RouterOperationContextSession routerSession(opCtx);
+ try {
+ auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
+ readConcernArgs = repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
+
+ // Re-run the findAndModify command that will change the shard key value in a
+ // transaction. We call _runCommand recursively, and this second time through
+ // since it will be run as a transaction it will take the other code path to
+ // updateShardKeyValueOnWouldChangeOwningShardError. We ensure the retried
+ // operation does not include WC inside the transaction by stripping it from the
+ // cmdObj. The transaction commit will still use the WC, because it uses the WC
+ // from the opCtx (which has been set previously in Strategy).
+ documentShardKeyUpdateUtil::startTransactionForShardKeyUpdate(opCtx);
+ _runCommand(
+ opCtx, shardId, shardVersion, dbVersion, nss, stripWriteConcern(cmdObj), result);
+ uassertStatusOK(getStatusFromCommandResult(result->asTempObj()));
+ auto commitResponse =
+ documentShardKeyUpdateUtil::commitShardKeyUpdateTransaction(opCtx);
+
+ uassertStatusOK(getStatusFromCommandResult(commitResponse));
+ if (auto wcErrorElem = commitResponse["writeConcernError"]) {
+ appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *result);
+ }
+ } catch (DBException& e) {
+ if (e.code() != ErrorCodes::DuplicateKey ||
+ (e.code() == ErrorCodes::DuplicateKey &&
+ !e.extraInfo<DuplicateKeyErrorInfo>()->getKeyPattern().hasField("_id"))) {
+ e.addContext(documentShardKeyUpdateUtil::kNonDuplicateKeyErrorContext);
+ };
+
+ auto txnRouterForAbort = TransactionRouter::get(opCtx);
+ if (txnRouterForAbort)
+ txnRouterForAbort.implicitlyAbortTransaction(opCtx, e.toStatus());
+
+ throw;
+ }
+ }
+
// Update related command execution metrics.
UpdateMetrics _updateMetrics;
} findAndModifyCmd;