diff options
author | Jack Mulrow <jack.mulrow@mongodb.com> | 2022-02-07 15:46:37 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-02-07 19:21:55 +0000 |
commit | cebfa751cb5db526fcba92dc2c1a69ed507a075b (patch) | |
tree | 9d5a8f9cd6ea0e0c4b21adf20737fe6e7cd01551 /src/mongo | |
parent | 7a2d86c376488cc756c0325e6edaf3406a86ec5d (diff) | |
download | mongo-cebfa751cb5db526fcba92dc2c1a69ed507a075b.tar.gz |
SERVER-63364 Use transaction API to handle WCOS errors for findAndModify without retryability
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/transaction_api.cpp | 49 | ||||
-rw-r--r-- | src/mongo/db/transaction_api.h | 12 | ||||
-rw-r--r-- | src/mongo/db/transaction_api_test.cpp | 78 | ||||
-rw-r--r-- | src/mongo/s/cluster_commands_helpers.cpp | 15 | ||||
-rw-r--r-- | src/mongo/s/cluster_commands_helpers.h | 7 | ||||
-rw-r--r-- | src/mongo/s/commands/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_find_and_modify_cmd.cpp | 146 |
8 files changed, 167 insertions, 143 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index f9454dfe6d5..e6abcb42581 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -839,7 +839,6 @@ env.Library( 'repl/repl_coordinator_interface', 'service_context', 'shared_request_handling', - 'write_ops', ], ) diff --git a/src/mongo/db/transaction_api.cpp b/src/mongo/db/transaction_api.cpp index 0d6a7637541..8c4fe8c114c 100644 --- a/src/mongo/db/transaction_api.cpp +++ b/src/mongo/db/transaction_api.cpp @@ -48,6 +48,7 @@ #include "mongo/rpc/factory.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/reply_interface.h" +#include "mongo/s/is_mongos.h" #include "mongo/stdx/future.h" #include "mongo/transport/service_entry_point.h" @@ -297,7 +298,7 @@ SemiFuture<BSONObj> Transaction::_commitOrAbort(StringData dbName, StringData cm BSONObjBuilder cmdBuilder; cmdBuilder.append(cmdName, 1); - cmdBuilder.append(WriteConcernOptions::kWriteConcernField, _writeConcern.toBSON()); + cmdBuilder.append(WriteConcernOptions::kWriteConcernField, _writeConcern); auto cmdObj = cmdBuilder.obj(); return _txnClient->runCommand(dbName, cmdObj).semi(); @@ -363,7 +364,7 @@ void Transaction::prepareRequest(BSONObjBuilder* cmdBuilder) { if (_state == TransactionState::kInit) { _state = TransactionState::kStarted; _sessionInfo.setStartTransaction(boost::none); - cmdBuilder->append(_readConcern.toBSON().firstElement()); + cmdBuilder->append(repl::ReadConcernArgs::kReadConcernFieldName, _readConcern); } _latestResponseHasTransientTransactionErrorLabel = false; @@ -379,32 +380,19 @@ void Transaction::processResponse(const BSONObj& reply) { } } -void Transaction::_setSessionInfo(LogicalSessionId lsid, - TxnNumber txnNumber, - boost::optional<TxnRetryCounter> txnRetryCounter) { +void Transaction::_setSessionInfo(LogicalSessionId lsid, TxnNumber txnNumber) { _sessionInfo.setSessionId(lsid); _sessionInfo.setTxnNumber(txnNumber); - _sessionInfo.setTxnRetryCounter(txnRetryCounter ? *txnRetryCounter : 0); } void Transaction::primeForTransactionRetry() { _latestResponseHasTransientTransactionErrorLabel = false; switch (_execContext) { case ExecutionContext::kOwnSession: - // Advance txnNumber. - _sessionInfo.setTxnNumber(*_sessionInfo.getTxnNumber() + 1); - _sessionInfo.setStartTransaction(true); - _state = TransactionState::kInit; - return; case ExecutionContext::kClientSession: - // Advance txnRetryCounter. - _sessionInfo.setTxnRetryCounter(*_sessionInfo.getTxnRetryCounter() + 1); - _sessionInfo.setStartTransaction(true); - _state = TransactionState::kInit; - return; case ExecutionContext::kClientRetryableWrite: - // Advance txnRetryCounter. - _sessionInfo.setTxnRetryCounter(*_sessionInfo.getTxnRetryCounter() + 1); + // Advance txnNumber. + _sessionInfo.setTxnNumber(*_sessionInfo.getTxnNumber() + 1); _sessionInfo.setStartTransaction(true); _state = TransactionState::kInit; return; @@ -431,27 +419,27 @@ void Transaction::_primeTransaction(OperationContext* opCtx) { auto clientSession = opCtx->getLogicalSessionId(); auto clientTxnNumber = opCtx->getTxnNumber(); auto clientInMultiDocumentTransaction = opCtx->inMultiDocumentTransaction(); - auto clientTxnRetryCounter = opCtx->getTxnRetryCounter(); if (!clientSession) { // TODO SERVER-61783: Integrate session pool. - _setSessionInfo(makeLogicalSessionId(opCtx), 0, 0); + _setSessionInfo(makeLogicalSessionId(opCtx), 0 /* txnNumber */); _execContext = ExecutionContext::kOwnSession; } else if (!clientTxnNumber) { - _setSessionInfo(makeLogicalSessionIdWithTxnUUID(*clientSession), 0, 0); + _setSessionInfo(makeLogicalSessionIdWithTxnUUID(*clientSession), 0 /* txnNumber */); _execContext = ExecutionContext::kClientSession; // TODO SERVER-59186: Handle client session case. MONGO_UNREACHABLE; } else if (!clientInMultiDocumentTransaction) { - _setSessionInfo( - makeLogicalSessionIdWithTxnNumberAndUUID(*clientSession, *clientTxnNumber), 0, 0); + _setSessionInfo(makeLogicalSessionIdWithTxnNumberAndUUID(*clientSession, *clientTxnNumber), + 0 /* txnNumber */); _execContext = ExecutionContext::kClientRetryableWrite; - // TODO SERVER-59186: Handle client retryable write case. - MONGO_UNREACHABLE; + // TODO SERVER-59186: Handle client retryable write case on mongod. This is different from + // mongos because only mongod checks out a transaction session for retryable writes. + invariant(isMongos(), "This case is not yet supported on a mongod"); } else { - _setSessionInfo(*clientSession, *clientTxnNumber, clientTxnRetryCounter); + _setSessionInfo(*clientSession, *clientTxnNumber); _execContext = ExecutionContext::kClientTransaction; // TODO SERVER-59186: Handle client transaction case. @@ -460,9 +448,12 @@ void Transaction::_primeTransaction(OperationContext* opCtx) { _sessionInfo.setStartTransaction(true); _sessionInfo.setAutocommit(false); - // Extract non-session options. - _readConcern = repl::ReadConcernArgs::get(opCtx); - _writeConcern = opCtx->getWriteConcern(); + // Extract non-session options. Strip provenance so it can be correctly inferred for the + // generated commands as if it came from an external client. + _readConcern = repl::ReadConcernArgs::get(opCtx).toBSONInner().removeField( + ReadWriteConcernProvenanceBase::kSourceFieldName); + _writeConcern = opCtx->getWriteConcern().toBSON().removeField( + ReadWriteConcernProvenanceBase::kSourceFieldName); LOGV2_DEBUG(5875901, 0, // TODO SERVER-61781: Raise verbosity. diff --git a/src/mongo/db/transaction_api.h b/src/mongo/db/transaction_api.h index d09d83fc214..7c0684bef39 100644 --- a/src/mongo/db/transaction_api.h +++ b/src/mongo/db/transaction_api.h @@ -282,8 +282,8 @@ public: /** * Handles the given transaction result based on where the transaction is in its lifecycle and - * its execution context, e.g. by updating its txnNumber or txnRetryCounter, and returns the - * next step for the transaction runner. + * its execution context, e.g. by updating its txnNumber, and returns the next step for the + * transaction runner. */ ErrorHandlingStep handleError(const StatusWith<CommitResult>& swResult) const; @@ -318,9 +318,7 @@ private: return std::make_unique<TxnMetadataHooks>(*this); } - void _setSessionInfo(LogicalSessionId lsid, - TxnNumber txnNumber, - boost::optional<TxnRetryCounter> txnRetryCounter); + void _setSessionInfo(LogicalSessionId lsid, TxnNumber txnNumber); SemiFuture<BSONObj> _commitOrAbort(StringData dbName, StringData cmdName); @@ -337,8 +335,8 @@ private: bool _latestResponseHasTransientTransactionErrorLabel{false}; OperationSessionInfo _sessionInfo; - repl::ReadConcernArgs _readConcern; - WriteConcernOptions _writeConcern; + BSONObj _writeConcern; + BSONObj _readConcern; ExecutionContext _execContext; TransactionState _state{TransactionState::kInit}; }; diff --git a/src/mongo/db/transaction_api_test.cpp b/src/mongo/db/transaction_api_test.cpp index 9a6d2f8d25f..3b5e19d276b 100644 --- a/src/mongo/db/transaction_api_test.cpp +++ b/src/mongo/db/transaction_api_test.cpp @@ -121,7 +121,6 @@ namespace { void assertTxnMetadata(BSONObj obj, TxnNumber txnNumber, - boost::optional<TxnRetryCounter> txnRetryCounter, boost::optional<bool> startTransaction, boost::optional<BSONObj> readConcern = boost::none, boost::optional<BSONObj> writeConcern = boost::none) { @@ -129,12 +128,6 @@ void assertTxnMetadata(BSONObj obj, ASSERT_EQ(obj["autocommit"].Bool(), false); ASSERT_EQ(obj["txnNumber"].Long(), txnNumber); - if (txnRetryCounter) { - ASSERT_EQ(obj["txnRetryCounter"].Int(), *txnRetryCounter); - } else { - ASSERT(obj["txnRetryCounter"].eoo()); - } - if (startTransaction) { ASSERT_EQ(obj["startTransaction"].Bool(), *startTransaction); } else { @@ -190,13 +183,10 @@ protected: opCtx(), InlineQueuedCountingExecutor::make(), std::move(mockClient)); } - void expectSentAbort(TxnNumber txnNumber, - TxnRetryCounter txnRetryCounter, - BSONObj writeConcern) { + void expectSentAbort(TxnNumber txnNumber, BSONObj writeConcern) { auto lastRequest = mockClient()->getLastSentRequest(); assertTxnMetadata(lastRequest, txnNumber, - txnRetryCounter, boost::none /* startTransaction */, boost::none /* readConcern */, writeConcern); @@ -220,10 +210,8 @@ TEST_F(TxnAPITest, OwnSession_AttachesTxnMetadata) { << "documents" << BSON_ARRAY(BSON("x" << 1)))) .get(); ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. - assertTxnMetadata(mockClient()->getLastSentRequest(), - 0 /* txnNumber */, - 0 /* txnRetryCounter */, - true /* startTransaction */); + assertTxnMetadata( + mockClient()->getLastSentRequest(), 0 /* txnNumber */, true /* startTransaction */); mockClient()->setNextCommandResponse(kOKInsertResponse); insertRes = txnClient @@ -235,7 +223,6 @@ TEST_F(TxnAPITest, OwnSession_AttachesTxnMetadata) { ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. assertTxnMetadata(mockClient()->getLastSentRequest(), 0 /* txnNumber */, - 0 /* txnRetryCounter */, boost::none /* startTransaction */); // The commit response. @@ -248,7 +235,6 @@ TEST_F(TxnAPITest, OwnSession_AttachesTxnMetadata) { auto lastRequest = mockClient()->getLastSentRequest(); assertTxnMetadata(lastRequest, 0 /* txnNumber */, - 0 /* txnRetryCounter */, boost::none /* startTransaction */, boost::none /* readConcern */, WriteConcernOptions().toBSON() /* writeConcern */); @@ -284,7 +270,6 @@ TEST_F(TxnAPITest, OwnSession_AttachesWriteConcernOnCommit) { ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. assertTxnMetadata(mockClient()->getLastSentRequest(), attempt /* txnNumber */, - 0 /* txnRetryCounter */, true /* startTransaction */); mockClient()->setNextCommandResponse(kOKInsertResponse); @@ -297,7 +282,6 @@ TEST_F(TxnAPITest, OwnSession_AttachesWriteConcernOnCommit) { ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. assertTxnMetadata(mockClient()->getLastSentRequest(), attempt /* txnNumber */, - 0 /* txnRetryCounter */, boost::none /* startTransaction */); // Throw a transient error to verify the retries behavior. @@ -313,7 +297,6 @@ TEST_F(TxnAPITest, OwnSession_AttachesWriteConcernOnCommit) { auto lastRequest = mockClient()->getLastSentRequest(); assertTxnMetadata(lastRequest, attempt /* txnNumber */, - 0 /* txnRetryCounter */, boost::none /* startTransaction */, boost::none /* readConcern */, writeConcern.toBSON()); @@ -352,7 +335,7 @@ TEST_F(TxnAPITest, OwnSession_AttachesWriteConcernOnAbort) { }); ASSERT_EQ(swResult.getStatus(), ErrorCodes::InternalError); - expectSentAbort(0 /* txnNumber */, 0 /* txnRetryCounter */, writeConcern.toBSON()); + expectSentAbort(0 /* txnNumber */, writeConcern.toBSON()); } } @@ -382,7 +365,6 @@ TEST_F(TxnAPITest, OwnSession_AttachesReadConcernOnStartTransaction) { ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. assertTxnMetadata(mockClient()->getLastSentRequest(), attempt /* txnNumber */, - 0 /* txnRetryCounter */, true /* startTransaction */, readConcern.toBSONInner()); @@ -397,7 +379,6 @@ TEST_F(TxnAPITest, OwnSession_AttachesReadConcernOnStartTransaction) { ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. assertTxnMetadata(mockClient()->getLastSentRequest(), attempt /* txnNumber */, - 0 /* txnRetryCounter */, boost::none /* startTransaction */); // Throw a transient error to verify the retry will still use the read concern. @@ -413,7 +394,6 @@ TEST_F(TxnAPITest, OwnSession_AttachesReadConcernOnStartTransaction) { auto lastRequest = mockClient()->getLastSentRequest(); assertTxnMetadata(lastRequest, attempt /* txnNumber */, - 0 /* txnRetryCounter */, boost::none /* startTransaction */, boost::none /* readConcern */, WriteConcernOptions().toBSON() /* writeConcern */); @@ -441,7 +421,7 @@ TEST_F(TxnAPITest, OwnSession_AbortsOnError) { }); ASSERT_EQ(swResult.getStatus(), ErrorCodes::InternalError); - expectSentAbort(0 /* txnNumber */, 0 /* txnRetryCounter */, WriteConcernOptions().toBSON()); + expectSentAbort(0 /* txnNumber */, WriteConcernOptions().toBSON()); } TEST_F(TxnAPITest, OwnSession_SkipsCommitIfNoCommandsWereRun) { @@ -466,9 +446,7 @@ TEST_F(TxnAPITest, OwnSession_RetriesOnTransientError) { attempt += 1; if (attempt > 0) { // Verify an abort was sent in between retries. - expectSentAbort(attempt - 1 /* txnNumber */, - 0 /* txnRetryCounter */, - WriteConcernOptions().toBSON()); + expectSentAbort(attempt - 1 /* txnNumber */, WriteConcernOptions().toBSON()); } mockClient()->setNextCommandResponse(attempt == 0 ? kNoSuchTransactionResponse @@ -492,7 +470,6 @@ TEST_F(TxnAPITest, OwnSession_RetriesOnTransientError) { ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. assertTxnMetadata(mockClient()->getLastSentRequest(), attempt /* txnNumber */, - 0 /* txnRetryCounter */, true /* startTransaction */); return SemiFuture<void>::makeReady(); @@ -503,7 +480,6 @@ TEST_F(TxnAPITest, OwnSession_RetriesOnTransientError) { auto lastRequest = mockClient()->getLastSentRequest(); assertTxnMetadata(lastRequest, attempt /* txnNumber */, - 0 /* txnRetryCounter */, boost::none /* startTransaction */, boost::none /* readConcern */, WriteConcernOptions().toBSON() /* writeConcern */); @@ -517,9 +493,7 @@ TEST_F(TxnAPITest, OwnSession_RetriesOnTransientClientError) { attempt += 1; if (attempt > 0) { // Verify an abort was sent in between retries. - expectSentAbort(attempt - 1 /* txnNumber */, - 0 /* txnRetryCounter */, - WriteConcernOptions().toBSON()); + expectSentAbort(attempt - 1 /* txnNumber */, WriteConcernOptions().toBSON()); } mockClient()->setNextCommandResponse(kOKInsertResponse); @@ -534,7 +508,6 @@ TEST_F(TxnAPITest, OwnSession_RetriesOnTransientClientError) { ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. assertTxnMetadata(mockClient()->getLastSentRequest(), attempt /* txnNumber */, - 0 /* txnRetryCounter */, true /* startTransaction */); // The commit or implicit abort response. @@ -549,7 +522,6 @@ TEST_F(TxnAPITest, OwnSession_RetriesOnTransientClientError) { auto lastRequest = mockClient()->getLastSentRequest(); assertTxnMetadata(lastRequest, attempt /* txnNumber */, - 0 /* txnRetryCounter */, boost::none /* startTransaction */, boost::none /* readConcern */, WriteConcernOptions().toBSON() /* writeConcern */); @@ -569,10 +541,8 @@ TEST_F(TxnAPITest, OwnSession_CommitError) { ASSERT_OK(getStatusFromWriteCommandReply(insertRes)); ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. - assertTxnMetadata(mockClient()->getLastSentRequest(), - 0 /* txnNumber */, - 0 /* txnRetryCounter */, - true /* startTransaction */); + assertTxnMetadata( + mockClient()->getLastSentRequest(), 0 /* txnNumber */, true /* startTransaction */); // The commit response. mockClient()->setNextCommandResponse( @@ -587,7 +557,7 @@ TEST_F(TxnAPITest, OwnSession_CommitError) { ASSERT(swResult.getValue().wcError.toStatus().isOK()); ASSERT_EQ(swResult.getValue().getEffectiveStatus(), ErrorCodes::InternalError); - expectSentAbort(0 /* txnNumber */, 0 /* txnRetryCounter */, WriteConcernOptions().toBSON()); + expectSentAbort(0 /* txnNumber */, WriteConcernOptions().toBSON()); } TEST_F(TxnAPITest, OwnSession_TransientCommitError) { @@ -597,9 +567,7 @@ TEST_F(TxnAPITest, OwnSession_TransientCommitError) { attempt += 1; if (attempt > 0) { // Verify an abort was sent in between retries. - expectSentAbort(attempt - 1 /* txnNumber */, - 0 /* txnRetryCounter */, - WriteConcernOptions().toBSON()); + expectSentAbort(attempt - 1 /* txnNumber */, WriteConcernOptions().toBSON()); } mockClient()->setNextCommandResponse(kOKInsertResponse); @@ -614,7 +582,6 @@ TEST_F(TxnAPITest, OwnSession_TransientCommitError) { ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. assertTxnMetadata(mockClient()->getLastSentRequest(), attempt /* txnNumber */, - 0 /* txnRetryCounter */, true /* startTransaction */); // Set commit and best effort abort response, if necessary. @@ -632,7 +599,6 @@ TEST_F(TxnAPITest, OwnSession_TransientCommitError) { auto lastRequest = mockClient()->getLastSentRequest(); assertTxnMetadata(lastRequest, attempt /* txnNumber */, - 0 /* txnRetryCounter */, boost::none /* startTransaction */, boost::none /* readConcern */, WriteConcernOptions().toBSON() /* writeConcern */); @@ -652,10 +618,8 @@ TEST_F(TxnAPITest, OwnSession_RetryableCommitError) { ASSERT_OK(getStatusFromWriteCommandReply(insertRes)); ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. - assertTxnMetadata(mockClient()->getLastSentRequest(), - 0 /* txnNumber */, - 0 /* txnRetryCounter */, - true /* startTransaction */); + assertTxnMetadata( + mockClient()->getLastSentRequest(), 0 /* txnNumber */, true /* startTransaction */); // The commit response. mockClient()->setNextCommandResponse( @@ -669,7 +633,6 @@ TEST_F(TxnAPITest, OwnSession_RetryableCommitError) { auto lastRequest = mockClient()->getLastSentRequest(); assertTxnMetadata(lastRequest, 0 /* txnNumber */, - 0 /* txnRetryCounter */, boost::none /* startTransaction */, boost::none /* readConcern */, WriteConcernOptions().toBSON() /* writeConcern */); @@ -687,10 +650,8 @@ TEST_F(TxnAPITest, OwnSession_NonRetryableCommitWCError) { << "documents" << BSON_ARRAY(BSON("x" << 1)))) .get(); ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. - assertTxnMetadata(mockClient()->getLastSentRequest(), - 0 /* txnNumber */, - 0 /* txnRetryCounter */, - true /* startTransaction */); + assertTxnMetadata( + mockClient()->getLastSentRequest(), 0 /* txnNumber */, true /* startTransaction */); // The commit response. mockClient()->setNextCommandResponse(kResWithWriteConcernError); @@ -703,7 +664,7 @@ TEST_F(TxnAPITest, OwnSession_NonRetryableCommitWCError) { ASSERT_EQ(swResult.getValue().wcError.toStatus(), ErrorCodes::WriteConcernFailed); ASSERT_EQ(swResult.getValue().getEffectiveStatus(), ErrorCodes::WriteConcernFailed); - expectSentAbort(0 /* txnNumber */, 0 /* txnRetryCounter */, WriteConcernOptions().toBSON()); + expectSentAbort(0 /* txnNumber */, WriteConcernOptions().toBSON()); } TEST_F(TxnAPITest, OwnSession_RetryableCommitWCError) { @@ -719,10 +680,8 @@ TEST_F(TxnAPITest, OwnSession_RetryableCommitWCError) { ASSERT_OK(getStatusFromWriteCommandReply(insertRes)); ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. - assertTxnMetadata(mockClient()->getLastSentRequest(), - 0 /* txnNumber */, - 0 /* txnRetryCounter */, - true /* startTransaction */); + assertTxnMetadata( + mockClient()->getLastSentRequest(), 0 /* txnNumber */, true /* startTransaction */); // The commit responses. mockClient()->setNextCommandResponse(kResWithRetryableWriteConcernError); @@ -735,7 +694,6 @@ TEST_F(TxnAPITest, OwnSession_RetryableCommitWCError) { auto lastRequest = mockClient()->getLastSentRequest(); assertTxnMetadata(lastRequest, 0 /* txnNumber */, - 0 /* txnRetryCounter */, boost::none /* startTransaction */, boost::none /* readConcern */, WriteConcernOptions().toBSON() /* writeConcern */); diff --git a/src/mongo/s/cluster_commands_helpers.cpp b/src/mongo/s/cluster_commands_helpers.cpp index c5e73bfe707..a4d47c09825 100644 --- a/src/mongo/s/cluster_commands_helpers.cpp +++ b/src/mongo/s/cluster_commands_helpers.cpp @@ -62,11 +62,9 @@ namespace mongo { -void appendWriteConcernErrorToCmdResponse(const ShardId& shardId, - const BSONElement& wcErrorElem, - BSONObjBuilder& responseBuilder) { - WriteConcernErrorDetail wcError = getWriteConcernErrorDetail(wcErrorElem); - +void appendWriteConcernErrorDetailToCmdResponse(const ShardId& shardId, + WriteConcernErrorDetail wcError, + BSONObjBuilder& responseBuilder) { auto status = wcError.toStatus(); wcError.setStatus( status.withReason(str::stream() << status.reason() << " at " << shardId.toString())); @@ -74,6 +72,13 @@ void appendWriteConcernErrorToCmdResponse(const ShardId& shardId, responseBuilder.append("writeConcernError", wcError.toBSON()); } +void appendWriteConcernErrorToCmdResponse(const ShardId& shardId, + const BSONElement& wcErrorElem, + BSONObjBuilder& responseBuilder) { + WriteConcernErrorDetail wcError = getWriteConcernErrorDetail(wcErrorElem); + appendWriteConcernErrorDetailToCmdResponse(shardId, wcError, responseBuilder); +} + boost::intrusive_ptr<ExpressionContext> makeExpressionContextWithDefaultsForTargeter( OperationContext* opCtx, const NamespaceString& nss, diff --git a/src/mongo/s/cluster_commands_helpers.h b/src/mongo/s/cluster_commands_helpers.h index d6443df0407..b5872e4b933 100644 --- a/src/mongo/s/cluster_commands_helpers.h +++ b/src/mongo/s/cluster_commands_helpers.h @@ -53,6 +53,13 @@ struct RawResponsesResult { }; /** + * This function appends the provided WriteConcernErrorDetail to the sharded response. + */ +void appendWriteConcernErrorDetailToCmdResponse(const ShardId& shardId, + WriteConcernErrorDetail wcError, + BSONObjBuilder& responseBuilder); + +/** * This function appends the provided writeConcernError BSONElement to the sharded response. */ void appendWriteConcernErrorToCmdResponse(const ShardId& shardID, diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript index cc9c85d7601..90025abf11c 100644 --- a/src/mongo/s/commands/SConscript +++ b/src/mongo/s/commands/SConscript @@ -119,6 +119,7 @@ env.Library( '$BUILD_DIR/mongo/db/ftdc/ftdc_server', '$BUILD_DIR/mongo/db/index_commands_idl', '$BUILD_DIR/mongo/db/initialize_api_parameters', + '$BUILD_DIR/mongo/db/internal_transactions_feature_flag', '$BUILD_DIR/mongo/db/query/command_request_response', '$BUILD_DIR/mongo/db/query/cursor_response_idl', '$BUILD_DIR/mongo/db/query/map_reduce_output_format', @@ -131,6 +132,7 @@ env.Library( '$BUILD_DIR/mongo/db/stats/counters', '$BUILD_DIR/mongo/db/timeseries/timeseries_conversion_util', '$BUILD_DIR/mongo/db/timeseries/timeseries_options', + '$BUILD_DIR/mongo/db/transaction_api', '$BUILD_DIR/mongo/db/views/views', '$BUILD_DIR/mongo/executor/async_multicaster', '$BUILD_DIR/mongo/executor/async_request_executor', diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp index 23672e2f418..f62bbc90e53 100644 --- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp @@ -27,6 +27,8 @@ * it in the license file. */ +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding + #include "mongo/platform/basic.h" #include "mongo/base/status_with.h" @@ -38,9 +40,13 @@ #include "mongo/db/catalog/document_validation.h" #include "mongo/db/commands.h" #include "mongo/db/commands/update_metrics.h" +#include "mongo/db/internal_transactions_feature_flag_gen.h" +#include "mongo/db/ops/write_ops_gen.h" #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/storage/duplicate_key_error_info.h" +#include "mongo/db/transaction_api.h" #include "mongo/executor/task_executor_pool.h" +#include "mongo/logv2/log.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/balancer_configuration.h" #include "mongo/s/catalog_cache.h" @@ -122,6 +128,41 @@ BSONObj getShardKey(OperationContext* opCtx, return shardKey; } +void handleWouldChangeOwningShardErrorRetryableWrite( + OperationContext* opCtx, + const ShardId& shardId, + const NamespaceString& nss, + const write_ops::FindAndModifyCommandRequest& request, + BSONObjBuilder* result) { + auto txn = std::make_shared<txn_api::TransactionWithRetries>( + opCtx, Grid::get(opCtx)->getExecutorPool()->getFixedExecutor()); + + auto swResult = txn->runSyncNoThrow( + opCtx, + [cmdObj = request.toBSON({}), nss, result](const txn_api::TransactionClient& txnClient, + ExecutorPtr txnExec) { + auto res = txnClient.runCommand(nss.db(), cmdObj).get(); + uassertStatusOK(getStatusFromCommandResult(res)); + + result->appendElementsUnique(CommandHelpers::filterCommandReplyForPassthrough(res)); + + return SemiFuture<void>::makeReady(); + }); + + auto cmdStatus = swResult.getStatus(); + if (cmdStatus != ErrorCodes::DuplicateKey || + (cmdStatus == ErrorCodes::DuplicateKey && + !cmdStatus.extraInfo<DuplicateKeyErrorInfo>()->getKeyPattern().hasField("_id"))) { + cmdStatus.addContext(documentShardKeyUpdateUtil::kNonDuplicateKeyErrorContext); + }; + uassertStatusOK(cmdStatus); + + const auto& wcError = swResult.getValue().wcError; + if (!wcError.toStatus().isOK()) { + appendWriteConcernErrorDetailToCmdResponse(shardId, wcError, *result); + } +} + void updateShardKeyValueOnWouldChangeOwningShardError(OperationContext* opCtx, const NamespaceString nss, Status responseStatus, @@ -402,47 +443,23 @@ private: if (responseStatus.code() == ErrorCodes::WouldChangeOwningShard) { if (isRetryableWrite) { - RouterOperationContextSession routerSession(opCtx); - try { - auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); - readConcernArgs = - repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern); - - // Re-run the findAndModify command that will change the shard key value in a - // transaction. We call _runCommand recursively, and this second time through - // since it will be run as a transaction it will take the other code path to - // updateShardKeyValueOnWouldChangeOwningShardError. We ensure the retried - // operation does not include WC inside the transaction by stripping it from the - // cmdObj. The transaction commit will still use the WC, because it uses the WC - // from the opCtx (which has been set previously in Strategy). - documentShardKeyUpdateUtil::startTransactionForShardKeyUpdate(opCtx); - _runCommand(opCtx, - shardId, - shardVersion, - dbVersion, - nss, - stripWriteConcern(cmdObj), - result); - uassertStatusOK(getStatusFromCommandResult(result->asTempObj())); - auto commitResponse = - documentShardKeyUpdateUtil::commitShardKeyUpdateTransaction(opCtx); - - uassertStatusOK(getStatusFromCommandResult(commitResponse)); - if (auto wcErrorElem = commitResponse["writeConcernError"]) { - appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *result); - } - } catch (DBException& e) { - if (e.code() != ErrorCodes::DuplicateKey || - (e.code() == ErrorCodes::DuplicateKey && - !e.extraInfo<DuplicateKeyErrorInfo>()->getKeyPattern().hasField("_id"))) { - e.addContext(documentShardKeyUpdateUtil::kNonDuplicateKeyErrorContext); - }; - - auto txnRouterForAbort = TransactionRouter::get(opCtx); - if (txnRouterForAbort) - txnRouterForAbort.implicitlyAbortTransaction(opCtx, e.toStatus()); - - throw; + if (feature_flags::gFeatureFlagInternalTransactions.isEnabled( + serverGlobalParams.featureCompatibility)) { + auto parsedRequest = write_ops::FindAndModifyCommandRequest::parse( + IDLParserErrorContext("ClusterFindAndModify"), cmdObj); + // Strip write concern because this command will be sent as part of a + // transaction and the write concern has already been loaded onto the opCtx and + // will be picked up by the transaction API. + // + // Strip runtime constants because they will be added again when this command is + // recursively sent through the service entry point. + parsedRequest.setWriteConcern(boost::none); + parsedRequest.setLegacyRuntimeConstants(boost::none); + handleWouldChangeOwningShardErrorRetryableWrite( + opCtx, shardId, nss, parsedRequest, result); + } else { + _handleWouldChangeOwningShardErrorRetryableWriteLegacy( + opCtx, shardId, shardVersion, dbVersion, nss, cmdObj, result); } } else { updateShardKeyValueOnWouldChangeOwningShardError( @@ -462,6 +479,53 @@ private: CommandHelpers::filterCommandReplyForPassthrough(response.data)); } + // TODO SERVER-62375: Remove after 6.0 is released. + static void _handleWouldChangeOwningShardErrorRetryableWriteLegacy( + OperationContext* opCtx, + const ShardId& shardId, + const boost::optional<ChunkVersion>& shardVersion, + const boost::optional<DatabaseVersion>& dbVersion, + const NamespaceString& nss, + const BSONObj& cmdObj, + BSONObjBuilder* result) { + RouterOperationContextSession routerSession(opCtx); + try { + auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); + readConcernArgs = repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern); + + // Re-run the findAndModify command that will change the shard key value in a + // transaction. We call _runCommand recursively, and this second time through + // since it will be run as a transaction it will take the other code path to + // updateShardKeyValueOnWouldChangeOwningShardError. We ensure the retried + // operation does not include WC inside the transaction by stripping it from the + // cmdObj. The transaction commit will still use the WC, because it uses the WC + // from the opCtx (which has been set previously in Strategy). + documentShardKeyUpdateUtil::startTransactionForShardKeyUpdate(opCtx); + _runCommand( + opCtx, shardId, shardVersion, dbVersion, nss, stripWriteConcern(cmdObj), result); + uassertStatusOK(getStatusFromCommandResult(result->asTempObj())); + auto commitResponse = + documentShardKeyUpdateUtil::commitShardKeyUpdateTransaction(opCtx); + + uassertStatusOK(getStatusFromCommandResult(commitResponse)); + if (auto wcErrorElem = commitResponse["writeConcernError"]) { + appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *result); + } + } catch (DBException& e) { + if (e.code() != ErrorCodes::DuplicateKey || + (e.code() == ErrorCodes::DuplicateKey && + !e.extraInfo<DuplicateKeyErrorInfo>()->getKeyPattern().hasField("_id"))) { + e.addContext(documentShardKeyUpdateUtil::kNonDuplicateKeyErrorContext); + }; + + auto txnRouterForAbort = TransactionRouter::get(opCtx); + if (txnRouterForAbort) + txnRouterForAbort.implicitlyAbortTransaction(opCtx, e.toStatus()); + + throw; + } + } + // Update related command execution metrics. UpdateMetrics _updateMetrics; } findAndModifyCmd; |