diff options
author | Jack Mulrow <jack.mulrow@mongodb.com> | 2021-12-15 21:53:03 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-01-11 16:35:16 +0000 |
commit | 89bc7e2241fd1e2fcd34f7226a5d5f50b543d33f (patch) | |
tree | 50f0e8c2da55a8895a254b9b08cc98b340abf185 /src/mongo | |
parent | 8c6a0ab5795c16e7c236d96de17cbd6060657020 (diff) | |
download | mongo-89bc7e2241fd1e2fcd34f7226a5d5f50b543d33f.tar.gz |
SERVER-62516 Return commit result from internal transaction API
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/transaction_api.cpp | 167 | ||||
-rw-r--r-- | src/mongo/db/transaction_api.h | 68 | ||||
-rw-r--r-- | src/mongo/db/transaction_api_test.cpp | 507 | ||||
-rw-r--r-- | src/mongo/rpc/write_concern_error_detail.cpp | 29 | ||||
-rw-r--r-- | src/mongo/rpc/write_concern_error_detail.h | 10 | ||||
-rw-r--r-- | src/mongo/s/cluster_commands_helpers.cpp | 28 |
6 files changed, 484 insertions, 325 deletions
diff --git a/src/mongo/db/transaction_api.cpp b/src/mongo/db/transaction_api.cpp index 198cf5065e2..379dcb90eb6 100644 --- a/src/mongo/db/transaction_api.cpp +++ b/src/mongo/db/transaction_api.cpp @@ -53,48 +53,55 @@ namespace mongo::txn_api { -void TransactionWithRetries::runSync(OperationContext* opCtx, TxnCallback func) { +StatusWith<CommitResult> TransactionWithRetries::runSyncNoThrow(OperationContext* opCtx, + TxnCallback func) noexcept { // TODO SERVER-59566 Add a retry policy. while (true) { - try { - ExecutorFuture<void>(_executor) - .then([this, anchor = shared_from_this(), &func] { - return func(_internalTxn->getClient(), _executor); - }) - .get(opCtx); - } catch (const DBException& e) { - auto nextStep = _internalTxn->handleError(e.toStatus()); - switch (nextStep) { - case details::Transaction::ErrorHandlingStep::kDoNotRetry: - _bestEffortAbort(opCtx); - throw; - case details::Transaction::ErrorHandlingStep::kRetryTransaction: - continue; - case details::Transaction::ErrorHandlingStep::kRetryCommit: - MONGO_UNREACHABLE; - } - } - - while (true) { - try { - ExecutorFuture<void>(_executor) - .then([this, anchor = shared_from_this()] { return _internalTxn->commit(); }) - .get(opCtx); - return; - } catch (const DBException& e) { - auto nextStep = _internalTxn->handleError(e.toStatus()); + { + auto bodyStatus = ExecutorFuture<void>(_executor) + .then([this, anchor = shared_from_this(), &func] { + return func(_internalTxn->getClient(), _executor); + }) + .getNoThrow(opCtx); + + if (!bodyStatus.isOK()) { + auto nextStep = _internalTxn->handleError(bodyStatus); switch (nextStep) { case details::Transaction::ErrorHandlingStep::kDoNotRetry: _bestEffortAbort(opCtx); - throw; + return bodyStatus; case details::Transaction::ErrorHandlingStep::kRetryTransaction: - break; - case details::Transaction::ErrorHandlingStep::kRetryCommit: continue; + case details::Transaction::ErrorHandlingStep::kRetryCommit: + MONGO_UNREACHABLE; } } } + + while (true) { + auto swResult = + ExecutorFuture<void>(_executor) + .then([this, anchor = shared_from_this()] { return _internalTxn->commit(); }) + .getNoThrow(opCtx); + + if (swResult.isOK() && swResult.getValue().getEffectiveStatus().isOK()) { + // Commit succeeded so return to the caller. + return swResult; + } + + auto nextStep = _internalTxn->handleError(swResult); + switch (nextStep) { + case details::Transaction::ErrorHandlingStep::kDoNotRetry: + _bestEffortAbort(opCtx); + return swResult; + case details::Transaction::ErrorHandlingStep::kRetryTransaction: + break; + case details::Transaction::ErrorHandlingStep::kRetryCommit: + continue; + } + } } + MONGO_UNREACHABLE; } void TransactionWithRetries::_bestEffortAbort(OperationContext* opCtx) { @@ -178,17 +185,31 @@ SemiFuture<std::vector<BSONObj>> SEPTransactionClient::exhaustiveFind( .semi(); } -SemiFuture<void> Transaction::commit() { - return _commitOrAbort(NamespaceString::kAdminDb, CommitTransaction::kCommandName); +SemiFuture<CommitResult> Transaction::commit() { + return _commitOrAbort(NamespaceString::kAdminDb, CommitTransaction::kCommandName) + .thenRunOn(_executor) + .then([this](BSONObj res) { + auto wcErrorHolder = getWriteConcernErrorDetailFromBSONObj(res); + WriteConcernErrorDetail wcError; + if (wcErrorHolder) { + wcErrorHolder->cloneTo(&wcError); + } + return CommitResult{getStatusFromCommandResult(res), wcError}; + }) + .semi(); } SemiFuture<void> Transaction::abort() { - return _commitOrAbort(NamespaceString::kAdminDb, AbortTransaction::kCommandName); + return _commitOrAbort(NamespaceString::kAdminDb, AbortTransaction::kCommandName) + .thenRunOn(_executor) + .then([this](BSONObj res) { + uassertStatusOK(getStatusFromCommandResult(res)); + uassertStatusOK(getWriteConcernStatusFromCommandResult(res)); + }) + .semi(); } -SemiFuture<void> Transaction::_commitOrAbort(StringData dbName, StringData cmdName) { - uassert(5875904, "Internal transaction already completed", _state != TransactionState::kDone); - +SemiFuture<BSONObj> Transaction::_commitOrAbort(StringData dbName, StringData cmdName) { if (_state == TransactionState::kInit) { LOGV2_DEBUG(5875903, 0, // TODO SERVER-61781: Raise verbosity. @@ -196,7 +217,7 @@ SemiFuture<void> Transaction::_commitOrAbort(StringData dbName, StringData cmdNa "cmdName"_attr = cmdName, "sessionInfo"_attr = _sessionInfo, "execContext"_attr = _execContextToString(_execContext)); - return SemiFuture<void>::makeReady(); + return BSON("ok" << 1); } uassert(5875902, "Internal transaction not in progress", @@ -218,23 +239,17 @@ SemiFuture<void> Transaction::_commitOrAbort(StringData dbName, StringData cmdNa cmdBuilder.append(WriteConcernOptions::kWriteConcernField, _writeConcern.toBSON()); auto cmdObj = cmdBuilder.obj(); - return _txnClient->runCommand(dbName, cmdObj) - .thenRunOn(_executor) - .then([this](BSONObj res) { - uassertStatusOK(getStatusFromCommandResult(res)); - uassertStatusOK(getWriteConcernStatusFromCommandResult(res)); - _state = TransactionState::kDone; - }) - .semi(); + return _txnClient->runCommand(dbName, cmdObj).semi(); } -Transaction::ErrorHandlingStep Transaction::handleError(Status clientStatus) { +Transaction::ErrorHandlingStep Transaction::handleError(const StatusWith<CommitResult>& swResult) { LOGV2_DEBUG(5875905, 0, // TODO SERVER-61781: Raise verbosity. "Internal transaction handling error", - "clientStatus"_attr = clientStatus, - "latestResponseStatus"_attr = _latestResponseStatus, - "latestResponseWCStatus"_attr = _latestResponseWCStatus, + "error"_attr = swResult.isOK() ? swResult.getValue().getEffectiveStatus() + : swResult.getStatus(), + "hasTransientTransactionErrorLabel"_attr = + _latestResponseHasTransientTransactionErrorLabel, "txnInfo"_attr = reportStateForLog()); if (_execContext == ExecutionContext::kClientTransaction) { @@ -242,23 +257,42 @@ Transaction::ErrorHandlingStep Transaction::handleError(Status clientStatus) { return ErrorHandlingStep::kDoNotRetry; } - auto hasStartedCommit = _state == TransactionState::kStartedCommit; - auto clientReceivedNetworkError = ErrorCodes::isNetworkError(clientStatus); - if (_latestResponseHasTransientTransactionErrorLabel || - // A network error before commit is a transient transaction error. - (!hasStartedCommit && clientReceivedNetworkError)) { + // The transient transaction error label is always returned in command responses, even for + // internal clients, so we use it to decide when to retry the transaction instead of inspecting + // error codes. The only exception is when a network error was received before commit, handled + // below. + if (_latestResponseHasTransientTransactionErrorLabel) { _primeForTransactionRetry(); return ErrorHandlingStep::kRetryTransaction; } - bool latestResponseErrorWasRetryable = ErrorCodes::isRetriableError(_latestResponseStatus) || - ErrorCodes::isRetriableError(_latestResponseWCStatus); - if (hasStartedCommit && latestResponseErrorWasRetryable) { - // TODO SERVER-59566: Handle timeouts and max retry attempts. Note commit might be retried - // within the command itself, e.g. ClusterCommitTransaction uses an idempotent retry policy, - // so we may want a timeout policy instead of number of retries. - _primeForCommitRetry(); - return ErrorHandlingStep::kRetryCommit; + auto hasStartedCommit = _state == TransactionState::kStartedCommit; + + const auto& clientStatus = swResult.getStatus(); + if (!clientStatus.isOK()) { + // A network error before commit is a transient transaction error. + if (!hasStartedCommit && ErrorCodes::isNetworkError(clientStatus)) { + _primeForTransactionRetry(); + return ErrorHandlingStep::kRetryTransaction; + } + return ErrorHandlingStep::kDoNotRetry; + } + + if (hasStartedCommit) { + const auto& commitStatus = swResult.getValue().cmdStatus; + const auto& commitWCStatus = swResult.getValue().wcError.toStatus(); + + // The retryable write error label is not returned to internal clients, so we cannot rely on + // it and instead use error categories to decide when to retry commit, which is treated as a + // retryable write, per the drivers specification. + if (ErrorCodes::isRetriableError(commitStatus) || + ErrorCodes::isRetriableError(commitWCStatus)) { + // TODO SERVER-59566: Handle timeouts and max retry attempts. Note commit might be + // retried within the command itself, e.g. ClusterCommitTransaction uses an idempotent + // retry policy, so we may want a timeout policy instead of number of retries. + _primeForCommitRetry(); + return ErrorHandlingStep::kRetryCommit; + } } return ErrorHandlingStep::kDoNotRetry; @@ -275,15 +309,10 @@ void Transaction::prepareRequest(BSONObjBuilder* cmdBuilder) { cmdBuilder->append(_readConcern.toBSON().firstElement()); } - _latestResponseStatus = Status::OK(); - _latestResponseWCStatus = Status::OK(); _latestResponseHasTransientTransactionErrorLabel = false; } void Transaction::processResponse(const BSONObj& reply) { - _latestResponseStatus = getStatusFromCommandResult(reply); - _latestResponseWCStatus = getWriteConcernStatusFromCommandResult(reply); - if (auto errorLabels = reply[kErrorLabelsFieldName]) { for (const auto& label : errorLabels.Array()) { if (label.String() == ErrorLabel::kTransientTransaction) { @@ -302,6 +331,7 @@ void Transaction::_setSessionInfo(LogicalSessionId lsid, } void Transaction::_primeForTransactionRetry() { + _latestResponseHasTransientTransactionErrorLabel = false; switch (_execContext) { case ExecutionContext::kOwnSession: // Advance txnNumber. @@ -329,6 +359,7 @@ void Transaction::_primeForTransactionRetry() { void Transaction::_primeForCommitRetry() { invariant(_state == TransactionState::kStartedCommit); + _latestResponseHasTransientTransactionErrorLabel = false; _state = TransactionState::kStarted; } diff --git a/src/mongo/db/transaction_api.h b/src/mongo/db/transaction_api.h index 611f8f0e5ac..9db08057be9 100644 --- a/src/mongo/db/transaction_api.h +++ b/src/mongo/db/transaction_api.h @@ -33,6 +33,7 @@ #include "mongo/db/logical_session_id.h" #include "mongo/db/operation_context.h" #include "mongo/db/query/find_command_gen.h" +#include "mongo/rpc/write_concern_error_detail.h" #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/batched_command_response.h" #include "mongo/util/future.h" @@ -44,6 +45,29 @@ class Transaction; } // namespace details /** + * Encapsulates the command status and write concern error from a response to a commitTransaction + * command. + */ +struct CommitResult { + /** + * Returns an error status with additional context if any of the inner errors are non OK. + */ + Status getEffectiveStatus() const { + if (!cmdStatus.isOK()) { + return cmdStatus.withContext("Command error committing internal transaction"); + } + if (!wcError.toStatus().isOK()) { + return wcError.toStatus().withContext( + "Write concern error committing internal transaction"); + } + return Status::OK(); + } + + Status cmdStatus; + WriteConcernErrorDetail wcError; +}; + +/** * Interface for the “backend” of an internal transaction responsible for executing commands. * Intended to be overriden and customized for different use cases. */ @@ -108,12 +132,27 @@ public: std::make_unique<details::Transaction>(opCtx, executor, std::move(txnClient))) {} /** - * Runs the given transaction callback synchronously, throwing on errors. + * Runs the given transaction callback synchronously. + * + * Returns a bundle with the commit command status and write concern error, if any. Any error + * prior to receiving a response from commit (e.g. an interruption or a user assertion in the + * given callback) will result in a non-ok StatusWith. Note that abort errors are not returned + * because an abort will only happen implicitly when another error has occurred, and that + * original error is returned instead. * * TODO SERVER-61782: Make this async. * TODO SERVER-61782: Allow returning a SemiFuture with any type. */ - void runSync(OperationContext* opCtx, TxnCallback func); + StatusWith<CommitResult> runSyncNoThrow(OperationContext* opCtx, TxnCallback func) noexcept; + + /** + * Same as above except will throw if the commit result has a non-ok command status or a write + * concern error. + */ + void runSync(OperationContext* opCtx, TxnCallback func) { + auto result = uassertStatusOK(runSyncNoThrow(opCtx, std::move(func))); + uassertStatusOK(result.getEffectiveStatus()); + } private: /** @@ -212,18 +251,25 @@ public: } /** - * Used by the transaction runner to commit or abort the transaction. Returns an error if the - * command fails. + * Used by the transaction runner to commit the transaction. Returns a future with a non-OK + * status if the commit failed to send, otherwise returns a future with a bundle with the + * command and write concern statuses. + */ + SemiFuture<CommitResult> commit(); + + /** + * Used by the transaction runner to abort the transaction. Returns a future with a non-OK + * status if there was an error sending the command, a non-ok command result, or a write concern + * error. */ - SemiFuture<void> commit(); SemiFuture<void> abort(); /** - * Handles the given client error or the latest error encountered in the transaction based on - * where the transaction is in its lifecycle, e.g. by updating its txnNumber or txnRetryCounter, - * and returns the next step for the transaction runner. + * Handles the given transaction result based on where the transaction is in its lifecycle and + * its execution context, e.g. by updating its txnNumber or txnRetryCounter, and returns the + * next step for the transaction runner. */ - ErrorHandlingStep handleError(Status clientStatus); + ErrorHandlingStep handleError(const StatusWith<CommitResult>& swResult); /** * Returns an object with info about the internal transaction for diagnostics. @@ -270,7 +316,7 @@ private: TxnNumber txnNumber, boost::optional<TxnRetryCounter> txnRetryCounter); - SemiFuture<void> _commitOrAbort(StringData dbName, StringData cmdName); + SemiFuture<BSONObj> _commitOrAbort(StringData dbName, StringData cmdName); /** * Extracts session options from Operation Context and infers the internal transaction’s @@ -293,8 +339,6 @@ private: std::unique_ptr<TransactionClient> _txnClient; bool _latestResponseHasTransientTransactionErrorLabel{false}; - Status _latestResponseStatus = Status::OK(); - Status _latestResponseWCStatus = Status::OK(); OperationSessionInfo _sessionInfo; repl::ReadConcernArgs _readConcern; diff --git a/src/mongo/db/transaction_api_test.cpp b/src/mongo/db/transaction_api_test.cpp index c0d0e680c0c..2bd64fd84f1 100644 --- a/src/mongo/db/transaction_api_test.cpp +++ b/src/mongo/db/transaction_api_test.cpp @@ -197,7 +197,7 @@ void assertTxnMetadata(BSONObj obj, } TEST_F(TxnAPITest, OwnSession_AttachesTxnMetadata) { - txnWithRetries().runSync( + auto swResult = txnWithRetries().runSyncNoThrow( opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { mockClient()->setNextCommandResponse(kOKInsertResponse); auto insertRes = txnClient @@ -229,6 +229,9 @@ TEST_F(TxnAPITest, OwnSession_AttachesTxnMetadata) { mockClient()->setNextCommandResponse(kOKCommandResponse); return SemiFuture<void>::makeReady(); }); + ASSERT(swResult.getStatus().isOK()); + ASSERT(swResult.getValue().getEffectiveStatus().isOK()); + auto lastRequest = mockClient()->getLastSentRequest(); assertTxnMetadata(lastRequest, 0 /* txnNumber */, @@ -251,7 +254,7 @@ TEST_F(TxnAPITest, OwnSession_AttachesWriteConcernOnCommit) { resetTxnWithRetries(); int attempt = -1; - txnWithRetries().runSync( + auto swResult = txnWithRetries().runSyncNoThrow( opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { attempt += 1; @@ -289,6 +292,9 @@ TEST_F(TxnAPITest, OwnSession_AttachesWriteConcernOnCommit) { mockClient()->setNextCommandResponse(kOKCommandResponse); return SemiFuture<void>::makeReady(); }); + ASSERT(swResult.getStatus().isOK()); + ASSERT(swResult.getValue().getEffectiveStatus().isOK()); + auto lastRequest = mockClient()->getLastSentRequest(); assertTxnMetadata(lastRequest, attempt /* txnNumber */, @@ -310,27 +316,25 @@ TEST_F(TxnAPITest, OwnSession_AttachesWriteConcernOnAbort) { opCtx()->setWriteConcern(writeConcern); resetTxnWithRetries(); - try { - txnWithRetries().runSync( - opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { - mockClient()->setNextCommandResponse(kOKInsertResponse); - auto insertRes = - txnClient - .runCommand("user"_sd, - BSON("insert" - << "foo" - << "documents" << BSON_ARRAY(BSON("x" << 1)))) - .get(); - ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. - - uasserted(ErrorCodes::InternalError, "Mock error"); - // The abort response, the client should ignore this. - mockClient()->setNextCommandResponse(kResWithBadValueError); - return SemiFuture<void>::makeReady(); - }); - } catch (const DBException& e) { - ASSERT_EQ(e.code(), ErrorCodes::InternalError); - } + auto swResult = txnWithRetries().runSyncNoThrow( + opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { + mockClient()->setNextCommandResponse(kOKInsertResponse); + auto insertRes = txnClient + .runCommand("user"_sd, + BSON("insert" + << "foo" + << "documents" << BSON_ARRAY(BSON("x" << 1)))) + .get(); + ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. + + mockClient()->setSecondCommandResponse( + kOKCommandResponse); // Best effort abort response. + + uasserted(ErrorCodes::InternalError, "Mock error"); + return SemiFuture<void>::makeReady(); + }); + ASSERT_EQ(swResult.getStatus(), ErrorCodes::InternalError); + auto lastRequest = mockClient()->getLastSentRequest(); assertTxnMetadata(lastRequest, 0 /* txnNumber */, @@ -355,7 +359,7 @@ TEST_F(TxnAPITest, OwnSession_AttachesReadConcernOnStartTransaction) { resetTxnWithRetries(); int attempt = -1; - txnWithRetries().runSync( + auto swResult = txnWithRetries().runSyncNoThrow( opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { attempt += 1; mockClient()->setNextCommandResponse(kOKInsertResponse); @@ -393,6 +397,9 @@ TEST_F(TxnAPITest, OwnSession_AttachesReadConcernOnStartTransaction) { mockClient()->setNextCommandResponse(kOKCommandResponse); return SemiFuture<void>::makeReady(); }); + ASSERT(swResult.getStatus().isOK()); + ASSERT(swResult.getValue().getEffectiveStatus().isOK()); + auto lastRequest = mockClient()->getLastSentRequest(); assertTxnMetadata(lastRequest, attempt /* txnNumber */, @@ -405,26 +412,25 @@ TEST_F(TxnAPITest, OwnSession_AttachesReadConcernOnStartTransaction) { } TEST_F(TxnAPITest, OwnSession_AbortsOnError) { - try { - txnWithRetries().runSync( - opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { - mockClient()->setNextCommandResponse(kOKInsertResponse); - auto insertRes = txnClient - .runCommand("user"_sd, - BSON("insert" - << "foo" - << "documents" << BSON_ARRAY(BSON("x" << 1)))) - .get(); - ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. + auto swResult = txnWithRetries().runSyncNoThrow( + opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { + mockClient()->setNextCommandResponse(kOKInsertResponse); + auto insertRes = txnClient + .runCommand("user"_sd, + BSON("insert" + << "foo" + << "documents" << BSON_ARRAY(BSON("x" << 1)))) + .get(); + ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. + + // The best effort abort response, the client should ignore this. + mockClient()->setNextCommandResponse(kResWithBadValueError); + + uasserted(ErrorCodes::InternalError, "Mock error"); + return SemiFuture<void>::makeReady(); + }); + ASSERT_EQ(swResult.getStatus(), ErrorCodes::InternalError); - uasserted(ErrorCodes::InternalError, "Mock error"); - // The abort response, the client should ignore this. - mockClient()->setNextCommandResponse(kResWithBadValueError); - return SemiFuture<void>::makeReady(); - }); - } catch (const DBException& e) { - ASSERT_EQ(e.code(), ErrorCodes::InternalError); - } auto lastRequest = mockClient()->getLastSentRequest(); assertTxnMetadata(lastRequest, 0 /* txnNumber */, @@ -436,67 +442,67 @@ TEST_F(TxnAPITest, OwnSession_AbortsOnError) { } TEST_F(TxnAPITest, OwnSession_SkipsCommitIfNoCommandsWereRun) { + auto swResult = txnWithRetries().runSyncNoThrow( + opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { + // The commit response, the client should not receive this. + mockClient()->setNextCommandResponse(kResWithBadValueError); + + uasserted(ErrorCodes::InternalError, "Mock error"); + return SemiFuture<void>::makeReady(); + }); + ASSERT_EQ(swResult.getStatus(), ErrorCodes::InternalError); - try { - txnWithRetries().runSync( - opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { - uasserted(ErrorCodes::InternalError, "Mock error"); - // The commit response, the client should not receive this. - mockClient()->setNextCommandResponse(kResWithBadValueError); - return SemiFuture<void>::makeReady(); - }); - } catch (const DBException& e) { - ASSERT_EQ(e.code(), ErrorCodes::InternalError); - } auto lastRequest = mockClient()->getLastSentRequest(); ASSERT(lastRequest.isEmpty()); } TEST_F(TxnAPITest, OwnSession_SkipsAbortIfNoCommandsWereRun) { - try { - txnWithRetries().runSync( - opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { - uasserted(ErrorCodes::InternalError, "Mock error"); - // The abort response, the client should not receive this. - mockClient()->setNextCommandResponse(kResWithBadValueError); - return SemiFuture<void>::makeReady(); - }); - } catch (const DBException& e) { - ASSERT_EQ(e.code(), ErrorCodes::InternalError); - } + auto swResult = txnWithRetries().runSyncNoThrow( + opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { + // The best effort abort response, the client should not receive this. + mockClient()->setNextCommandResponse(kResWithBadValueError); + + uasserted(ErrorCodes::InternalError, "Mock error"); + return SemiFuture<void>::makeReady(); + }); + ASSERT_EQ(swResult.getStatus(), ErrorCodes::InternalError); + auto lastRequest = mockClient()->getLastSentRequest(); ASSERT(lastRequest.isEmpty()); } TEST_F(TxnAPITest, OwnSession_RetriesOnTransientError) { int attempt = -1; - try { - txnWithRetries().runSync( - opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { - attempt += 1; - mockClient()->setNextCommandResponse(attempt == 0 ? kNoSuchTransactionResponse - : kOKInsertResponse); - auto insertRes = txnClient - .runCommand("user"_sd, - BSON("insert" - << "foo" - << "documents" << BSON_ARRAY(BSON("x" << 1)))) - .get(); - uassertStatusOK(getWriteConcernStatusFromCommandResult(insertRes)); + auto swResult = txnWithRetries().runSyncNoThrow( + opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { + attempt += 1; + mockClient()->setNextCommandResponse(attempt == 0 ? kNoSuchTransactionResponse + : kOKInsertResponse); + auto insertRes = txnClient + .runCommand("user"_sd, + BSON("insert" + << "foo" + << "documents" << BSON_ARRAY(BSON("x" << 1)))) + .get(); + if (attempt == 0) { + uassertStatusOK(getStatusFromWriteCommandReply(insertRes)); + } else { + ASSERT_OK(getStatusFromWriteCommandReply(insertRes)); + } - ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. - assertTxnMetadata(mockClient()->getLastSentRequest(), - attempt /* txnNumber */, - 0 /* txnRetryCounter */, - true /* startTransaction */); + ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. + assertTxnMetadata(mockClient()->getLastSentRequest(), + attempt /* txnNumber */, + 0 /* txnRetryCounter */, + true /* startTransaction */); + + // The commit response. + mockClient()->setNextCommandResponse(kOKCommandResponse); + return SemiFuture<void>::makeReady(); + }); + ASSERT(swResult.getStatus().isOK()); + ASSERT(swResult.getValue().getEffectiveStatus().isOK()); - // The commit response. - mockClient()->setNextCommandResponse(kOKCommandResponse); - return SemiFuture<void>::makeReady(); - }); - } catch (const DBException& e) { - ASSERT_EQ(e.code(), ErrorCodes::InternalError); - } auto lastRequest = mockClient()->getLastSentRequest(); assertTxnMetadata(lastRequest, attempt /* txnNumber */, @@ -509,33 +515,32 @@ TEST_F(TxnAPITest, OwnSession_RetriesOnTransientError) { TEST_F(TxnAPITest, OwnSession_RetriesOnTransientClientError) { int attempt = -1; - try { - txnWithRetries().runSync( - opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { - attempt += 1; - mockClient()->setNextCommandResponse(kOKInsertResponse); - auto insertRes = txnClient - .runCommand("user"_sd, - BSON("insert" - << "foo" - << "documents" << BSON_ARRAY(BSON("x" << 1)))) - .get(); - uassertStatusOK(getWriteConcernStatusFromCommandResult(insertRes)); + auto swResult = txnWithRetries().runSyncNoThrow( + opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { + attempt += 1; + mockClient()->setNextCommandResponse(kOKInsertResponse); + auto insertRes = txnClient + .runCommand("user"_sd, + BSON("insert" + << "foo" + << "documents" << BSON_ARRAY(BSON("x" << 1)))) + .get(); + ASSERT_OK(getStatusFromWriteCommandReply(insertRes)); - ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. - assertTxnMetadata(mockClient()->getLastSentRequest(), - attempt /* txnNumber */, - 0 /* txnRetryCounter */, - true /* startTransaction */); - uassert(ErrorCodes::HostUnreachable, "Mock network error", attempt != 0); + ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. + assertTxnMetadata(mockClient()->getLastSentRequest(), + attempt /* txnNumber */, + 0 /* txnRetryCounter */, + true /* startTransaction */); + uassert(ErrorCodes::HostUnreachable, "Mock network error", attempt != 0); + + // The commit response. + mockClient()->setNextCommandResponse(kOKCommandResponse); + return SemiFuture<void>::makeReady(); + }); + ASSERT(swResult.getStatus().isOK()); + ASSERT(swResult.getValue().getEffectiveStatus().isOK()); - // The commit response. - mockClient()->setNextCommandResponse(kOKCommandResponse); - return SemiFuture<void>::makeReady(); - }); - } catch (const DBException& e) { - ASSERT_EQ(e.code(), ErrorCodes::InternalError); - } auto lastRequest = mockClient()->getLastSentRequest(); assertTxnMetadata(lastRequest, attempt /* txnNumber */, @@ -547,34 +552,36 @@ TEST_F(TxnAPITest, OwnSession_RetriesOnTransientClientError) { } TEST_F(TxnAPITest, OwnSession_CommitError) { - try { - txnWithRetries().runSync( - opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { - mockClient()->setNextCommandResponse(kOKInsertResponse); - auto insertRes = txnClient - .runCommand("user"_sd, - BSON("insert" - << "foo" - << "documents" << BSON_ARRAY(BSON("x" << 1)))) - .get(); - uassertStatusOK(getWriteConcernStatusFromCommandResult(insertRes)); + auto swResult = txnWithRetries().runSyncNoThrow( + opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { + mockClient()->setNextCommandResponse(kOKInsertResponse); + auto insertRes = txnClient + .runCommand("user"_sd, + BSON("insert" + << "foo" + << "documents" << BSON_ARRAY(BSON("x" << 1)))) + .get(); + ASSERT_OK(getStatusFromWriteCommandReply(insertRes)); - ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. - assertTxnMetadata(mockClient()->getLastSentRequest(), - 0 /* txnNumber */, - 0 /* txnRetryCounter */, - true /* startTransaction */); + ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. + assertTxnMetadata(mockClient()->getLastSentRequest(), + 0 /* txnNumber */, + 0 /* txnRetryCounter */, + true /* startTransaction */); + + // The commit response. + mockClient()->setNextCommandResponse( + BSON("ok" << 0 << "code" << ErrorCodes::InternalError)); + + // The best effort abort response, the client should ignore this. + mockClient()->setSecondCommandResponse(kResWithBadValueError); + return SemiFuture<void>::makeReady(); + }); + ASSERT(swResult.getStatus().isOK()); + ASSERT_EQ(swResult.getValue().cmdStatus, ErrorCodes::InternalError); + ASSERT(swResult.getValue().wcError.toStatus().isOK()); + ASSERT_EQ(swResult.getValue().getEffectiveStatus(), ErrorCodes::InternalError); - // The commit response. - mockClient()->setNextCommandResponse( - BSON("ok" << 0 << "code" << ErrorCodes::InternalError)); - mockClient()->setSecondCommandResponse( - kOKCommandResponse); // Best effort abort response. - return SemiFuture<void>::makeReady(); - }); - } catch (const DBException& e) { - ASSERT_EQ(e.code(), ErrorCodes::InternalError); - } auto lastRequest = mockClient()->getLastSentRequest(); assertTxnMetadata(lastRequest, 0 /* txnNumber */, @@ -588,33 +595,32 @@ TEST_F(TxnAPITest, OwnSession_CommitError) { TEST_F(TxnAPITest, OwnSession_TransientCommitError) { int attempt = -1; - try { - txnWithRetries().runSync( - opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { - attempt += 1; - mockClient()->setNextCommandResponse(kOKInsertResponse); - auto insertRes = txnClient - .runCommand("user"_sd, - BSON("insert" - << "foo" - << "documents" << BSON_ARRAY(BSON("x" << 1)))) - .get(); - uassertStatusOK(getWriteConcernStatusFromCommandResult(insertRes)); + auto swResult = txnWithRetries().runSyncNoThrow( + opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { + attempt += 1; + mockClient()->setNextCommandResponse(kOKInsertResponse); + auto insertRes = txnClient + .runCommand("user"_sd, + BSON("insert" + << "foo" + << "documents" << BSON_ARRAY(BSON("x" << 1)))) + .get(); + ASSERT_OK(getStatusFromWriteCommandReply(insertRes)); - ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. - assertTxnMetadata(mockClient()->getLastSentRequest(), - attempt /* txnNumber */, - 0 /* txnRetryCounter */, - true /* startTransaction */); + ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. + assertTxnMetadata(mockClient()->getLastSentRequest(), + attempt /* txnNumber */, + 0 /* txnRetryCounter */, + true /* startTransaction */); + + // The commit response. + mockClient()->setNextCommandResponse(attempt == 0 ? kNoSuchTransactionResponse + : kOKCommandResponse); + return SemiFuture<void>::makeReady(); + }); + ASSERT(swResult.getStatus().isOK()); + ASSERT(swResult.getValue().getEffectiveStatus().isOK()); - // The commit response. - mockClient()->setNextCommandResponse(attempt == 0 ? kNoSuchTransactionResponse - : kOKCommandResponse); - return SemiFuture<void>::makeReady(); - }); - } catch (const DBException& e) { - ASSERT_EQ(e.code(), ErrorCodes::InternalError); - } auto lastRequest = mockClient()->getLastSentRequest(); assertTxnMetadata(lastRequest, attempt /* txnNumber */, @@ -626,33 +632,32 @@ TEST_F(TxnAPITest, OwnSession_TransientCommitError) { } TEST_F(TxnAPITest, OwnSession_RetryableCommitError) { - try { - txnWithRetries().runSync( - opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { - mockClient()->setNextCommandResponse(kOKInsertResponse); - auto insertRes = txnClient - .runCommand("user"_sd, - BSON("insert" - << "foo" - << "documents" << BSON_ARRAY(BSON("x" << 1)))) - .get(); - uassertStatusOK(getWriteConcernStatusFromCommandResult(insertRes)); + auto swResult = txnWithRetries().runSyncNoThrow( + opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { + mockClient()->setNextCommandResponse(kOKInsertResponse); + auto insertRes = txnClient + .runCommand("user"_sd, + BSON("insert" + << "foo" + << "documents" << BSON_ARRAY(BSON("x" << 1)))) + .get(); + ASSERT_OK(getStatusFromWriteCommandReply(insertRes)); - ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. - assertTxnMetadata(mockClient()->getLastSentRequest(), - 0 /* txnNumber */, - 0 /* txnRetryCounter */, - true /* startTransaction */); + ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. + assertTxnMetadata(mockClient()->getLastSentRequest(), + 0 /* txnNumber */, + 0 /* txnRetryCounter */, + true /* startTransaction */); + + // The commit response. + mockClient()->setNextCommandResponse( + BSON("ok" << 0 << "code" << ErrorCodes::InterruptedDueToReplStateChange)); + mockClient()->setSecondCommandResponse(kOKCommandResponse); + return SemiFuture<void>::makeReady(); + }); + ASSERT(swResult.getStatus().isOK()); + ASSERT(swResult.getValue().getEffectiveStatus().isOK()); - // The commit response. - mockClient()->setNextCommandResponse( - BSON("ok" << 0 << "code" << ErrorCodes::InterruptedDueToReplStateChange)); - mockClient()->setSecondCommandResponse(kOKCommandResponse); - return SemiFuture<void>::makeReady(); - }); - } catch (const DBException& e) { - ASSERT_EQ(e.code(), ErrorCodes::InternalError); - } auto lastRequest = mockClient()->getLastSentRequest(); assertTxnMetadata(lastRequest, 0 /* txnNumber */, @@ -664,31 +669,32 @@ TEST_F(TxnAPITest, OwnSession_RetryableCommitError) { } TEST_F(TxnAPITest, OwnSession_NonRetryableCommitWCError) { - try { - txnWithRetries().runSync( - opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { - mockClient()->setNextCommandResponse(kOKInsertResponse); - auto insertRes = txnClient - .runCommand("user"_sd, - BSON("insert" - << "foo" - << "documents" << BSON_ARRAY(BSON("x" << 1)))) - .get(); - ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. - assertTxnMetadata(mockClient()->getLastSentRequest(), - 0 /* txnNumber */, - 0 /* txnRetryCounter */, - true /* startTransaction */); + auto swResult = txnWithRetries().runSyncNoThrow( + opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { + mockClient()->setNextCommandResponse(kOKInsertResponse); + auto insertRes = txnClient + .runCommand("user"_sd, + BSON("insert" + << "foo" + << "documents" << BSON_ARRAY(BSON("x" << 1)))) + .get(); + ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. + assertTxnMetadata(mockClient()->getLastSentRequest(), + 0 /* txnNumber */, + 0 /* txnRetryCounter */, + true /* startTransaction */); + + // The commit response. + mockClient()->setNextCommandResponse(kResWithWriteConcernError); + mockClient()->setSecondCommandResponse( + kOKCommandResponse); // Best effort abort response. + return SemiFuture<void>::makeReady(); + }); + ASSERT(swResult.getStatus().isOK()); + ASSERT(swResult.getValue().cmdStatus.isOK()); + ASSERT_EQ(swResult.getValue().wcError.toStatus(), ErrorCodes::WriteConcernFailed); + ASSERT_EQ(swResult.getValue().getEffectiveStatus(), ErrorCodes::WriteConcernFailed); - // The commit response. - mockClient()->setNextCommandResponse(kResWithWriteConcernError); - mockClient()->setSecondCommandResponse( - kOKCommandResponse); // Best effort abort response. - return SemiFuture<void>::makeReady(); - }); - } catch (const DBException& e) { - ASSERT_EQ(e.code(), ErrorCodes::WriteConcernFailed); - } auto lastRequest = mockClient()->getLastSentRequest(); assertTxnMetadata(lastRequest, 0 /* txnNumber */, @@ -700,7 +706,7 @@ TEST_F(TxnAPITest, OwnSession_NonRetryableCommitWCError) { } TEST_F(TxnAPITest, OwnSession_RetryableCommitWCError) { - txnWithRetries().runSync( + auto swResult = txnWithRetries().runSyncNoThrow( opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { mockClient()->setNextCommandResponse(kOKInsertResponse); auto insertRes = txnClient @@ -709,7 +715,7 @@ TEST_F(TxnAPITest, OwnSession_RetryableCommitWCError) { << "foo" << "documents" << BSON_ARRAY(BSON("x" << 1)))) .get(); - uassertStatusOK(getWriteConcernStatusFromCommandResult(insertRes)); + ASSERT_OK(getStatusFromWriteCommandReply(insertRes)); ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. assertTxnMetadata(mockClient()->getLastSentRequest(), @@ -722,6 +728,8 @@ TEST_F(TxnAPITest, OwnSession_RetryableCommitWCError) { mockClient()->setSecondCommandResponse(kOKCommandResponse); return SemiFuture<void>::makeReady(); }); + ASSERT(swResult.getStatus().isOK()); + ASSERT(swResult.getValue().getEffectiveStatus().isOK()); auto lastRequest = mockClient()->getLastSentRequest(); assertTxnMetadata(lastRequest, @@ -733,5 +741,70 @@ TEST_F(TxnAPITest, OwnSession_RetryableCommitWCError) { ASSERT_EQ(lastRequest.firstElementFieldNameStringData(), "commitTransaction"_sd); } +TEST_F(TxnAPITest, RunSyncNoErrors) { + txnWithRetries().runSync(opCtx(), + [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { + return SemiFuture<void>::makeReady(); + }); +} + +TEST_F(TxnAPITest, RunSyncThrowsOnBodyError) { + ASSERT_THROWS_CODE(txnWithRetries().runSync( + opCtx(), + [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { + uasserted(ErrorCodes::InternalError, "Mock error"); + return SemiFuture<void>::makeReady(); + }), + DBException, + ErrorCodes::InternalError); +} + +TEST_F(TxnAPITest, RunSyncThrowsOnCommitCmdError) { + ASSERT_THROWS_CODE(txnWithRetries().runSync( + opCtx(), + [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { + mockClient()->setNextCommandResponse(kOKInsertResponse); + auto insertRes = txnClient + .runCommand("user"_sd, + BSON("insert" + << "foo" + << "documents" + << BSON_ARRAY(BSON("x" << 1)))) + .get(); + + // The commit response. + mockClient()->setNextCommandResponse( + BSON("ok" << 0 << "code" << ErrorCodes::InternalError)); + mockClient()->setSecondCommandResponse( + kOKCommandResponse); // Best effort abort response. + return SemiFuture<void>::makeReady(); + }), + DBException, + ErrorCodes::InternalError); +} + +TEST_F(TxnAPITest, RunSyncThrowsOnCommitWCError) { + ASSERT_THROWS_CODE(txnWithRetries().runSync( + opCtx(), + [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { + mockClient()->setNextCommandResponse(kOKInsertResponse); + auto insertRes = txnClient + .runCommand("user"_sd, + BSON("insert" + << "foo" + << "documents" + << BSON_ARRAY(BSON("x" << 1)))) + .get(); + + // The commit response. + mockClient()->setNextCommandResponse(kResWithWriteConcernError); + mockClient()->setSecondCommandResponse( + kOKCommandResponse); // Best effort abort response. + return SemiFuture<void>::makeReady(); + }), + DBException, + ErrorCodes::WriteConcernFailed); +} + } // namespace } // namespace mongo diff --git a/src/mongo/rpc/write_concern_error_detail.cpp b/src/mongo/rpc/write_concern_error_detail.cpp index 770fa06737a..aa274010b67 100644 --- a/src/mongo/rpc/write_concern_error_detail.cpp +++ b/src/mongo/rpc/write_concern_error_detail.cpp @@ -32,6 +32,7 @@ #include "mongo/rpc/write_concern_error_detail.h" #include "mongo/rpc/write_concern_error_gen.h" +#include "mongo/bson/util/bson_extract.h" #include "mongo/db/field_parser.h" #include "mongo/util/str.h" @@ -138,4 +139,32 @@ const BSONObj& WriteConcernErrorDetail::getErrInfo() const { return _errInfo; } +WriteConcernErrorDetail getWriteConcernErrorDetail(const BSONElement& wcErrorElem) { + WriteConcernErrorDetail wcError; + std::string errMsg; + auto wcErrorObj = wcErrorElem.Obj(); + if (!wcError.parseBSON(wcErrorObj, &errMsg)) { + wcError.clear(); + wcError.setStatus({ErrorCodes::FailedToParse, + "Failed to parse writeConcernError: " + wcErrorObj.toString() + + ", Received error: " + errMsg}); + } + + return wcError; +} + +std::unique_ptr<WriteConcernErrorDetail> getWriteConcernErrorDetailFromBSONObj(const BSONObj& obj) { + BSONElement wcErrorElem; + Status status = bsonExtractTypedField(obj, "writeConcernError", Object, &wcErrorElem); + if (!status.isOK()) { + if (status == ErrorCodes::NoSuchKey) { + return nullptr; + } else { + uassertStatusOK(status); + } + } + + return std::make_unique<WriteConcernErrorDetail>(getWriteConcernErrorDetail(wcErrorElem)); +} + } // namespace mongo diff --git a/src/mongo/rpc/write_concern_error_detail.h b/src/mongo/rpc/write_concern_error_detail.h index 8bd984c9f1e..090b77c3061 100644 --- a/src/mongo/rpc/write_concern_error_detail.h +++ b/src/mongo/rpc/write_concern_error_detail.h @@ -83,4 +83,14 @@ private: bool _isErrInfoSet; }; +/** + * Creates and returns a WriteConcernErrorDetail object from a BSONObj. + */ +std::unique_ptr<WriteConcernErrorDetail> getWriteConcernErrorDetailFromBSONObj(const BSONObj& obj); + +/** + * Constructs a WriteConcernErrorDetail by parsing the given BSONElement. + */ +WriteConcernErrorDetail getWriteConcernErrorDetail(const BSONElement& wcErrorElem); + } // namespace mongo diff --git a/src/mongo/s/cluster_commands_helpers.cpp b/src/mongo/s/cluster_commands_helpers.cpp index 44241865243..6f822ea416e 100644 --- a/src/mongo/s/cluster_commands_helpers.cpp +++ b/src/mongo/s/cluster_commands_helpers.cpp @@ -62,20 +62,6 @@ namespace mongo { -WriteConcernErrorDetail getWriteConcernErrorDetail(const BSONElement& wcErrorElem) { - WriteConcernErrorDetail wcError; - std::string errMsg; - auto wcErrorObj = wcErrorElem.Obj(); - if (!wcError.parseBSON(wcErrorObj, &errMsg)) { - wcError.clear(); - wcError.setStatus({ErrorCodes::FailedToParse, - "Failed to parse writeConcernError: " + wcErrorObj.toString() + - ", Received error: " + errMsg}); - } - - return wcError; -} - void appendWriteConcernErrorToCmdResponse(const ShardId& shardId, const BSONElement& wcErrorElem, BSONObjBuilder& responseBuilder) { @@ -88,20 +74,6 @@ void appendWriteConcernErrorToCmdResponse(const ShardId& shardId, responseBuilder.append("writeConcernError", wcError.toBSON()); } -std::unique_ptr<WriteConcernErrorDetail> getWriteConcernErrorDetailFromBSONObj(const BSONObj& obj) { - BSONElement wcErrorElem; - Status status = bsonExtractTypedField(obj, "writeConcernError", Object, &wcErrorElem); - if (!status.isOK()) { - if (status == ErrorCodes::NoSuchKey) { - return nullptr; - } else { - uassertStatusOK(status); - } - } - - return std::make_unique<WriteConcernErrorDetail>(getWriteConcernErrorDetail(wcErrorElem)); -} - boost::intrusive_ptr<ExpressionContext> makeExpressionContextWithDefaultsForTargeter( OperationContext* opCtx, const NamespaceString& nss, |