diff options
author | Jack Mulrow <jack.mulrow@mongodb.com> | 2023-05-12 21:17:29 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-05-12 23:57:53 +0000 |
commit | cc09debe1f3eacece6986f8bb5fd3199600e54c6 (patch) | |
tree | 9a15da1fe9530202e3b04e170a7fc09df54679f8 | |
parent | 4fc07a7a59bfc5c664b26c9fc307f52d5616dc51 (diff) | |
download | mongo-cc09debe1f3eacece6986f8bb5fd3199600e54c6.tar.gz |
SERVER-77053 Transaction API shouldn't throw top level errors in runCommand
-rw-r--r-- | jstests/noPassthrough/transaction_api_commit_errors.js | 115 | ||||
-rw-r--r-- | src/mongo/db/s/global_index/global_index_inserter.cpp | 76 | ||||
-rw-r--r-- | src/mongo/db/transaction/transaction_api.cpp | 27 | ||||
-rw-r--r-- | src/mongo/db/transaction/transaction_api.h | 15 | ||||
-rw-r--r-- | src/mongo/db/transaction/transaction_api_test.cpp | 165 |
5 files changed, 359 insertions, 39 deletions
diff --git a/jstests/noPassthrough/transaction_api_commit_errors.js b/jstests/noPassthrough/transaction_api_commit_errors.js new file mode 100644 index 00000000000..1613599a78b --- /dev/null +++ b/jstests/noPassthrough/transaction_api_commit_errors.js @@ -0,0 +1,115 @@ +/** + * Tests that the transaction API handles commit errors correctly. + */ +(function() { +"use strict"; + +load("jstests/libs/fail_point_util.js"); + +const kDbName = "testDb"; +const kCollName = "testColl"; + +function makeSingleInsertTxn(doc) { + return [{ + dbName: kDbName, + command: { + insert: kCollName, + documents: [doc], + } + }]; +} + +function runTxn(conn, commandInfos) { + return conn.adminCommand({testInternalTransactions: 1, commandInfos: commandInfos}); +} + +const st = new ShardingTest({config: 1, shards: 1}); +const shardPrimary = st.rs0.getPrimary(); + +// Set up the test collection. +assert.commandWorked(st.s.getDB(kDbName)[kCollName].insert([{_id: 0}])); + +// +// Error codes where the API should retry and eventually commit the transaction, either by retrying +// commit until it succeeds or retrying the entire transaction until it succeeds. Fail commands 10 +// times to exhaust internal retries at layers below the transaction API. +// + +// Retryable error. Note this error is not a NotPrimary error so it won't be rewritten by mongos. +let commitFailPoint = + configureFailPoint(shardPrimary, + "failCommand", + { + errorCode: ErrorCodes.ReadConcernMajorityNotAvailableYet, + failCommands: ["commitTransaction"], + failInternalCommands: true, + }, + {times: 10}); +let res = assert.commandWorked(runTxn(st.s, makeSingleInsertTxn({_id: 1}))); +commitFailPoint.off(); + +// No command error with a retryable write concern error. 
+commitFailPoint = configureFailPoint( + shardPrimary, + "failCommand", + { + writeConcernError: + {code: NumberInt(ErrorCodes.ReadConcernMajorityNotAvailableYet), errmsg: "foo"}, + failCommands: ["commitTransaction"], + failInternalCommands: true, + }, + {times: 10}); +res = assert.commandWorked(runTxn(st.s, makeSingleInsertTxn({_id: 2}))); +commitFailPoint.off(); + +// +// Error codes where the API should not retry. +// + +// Non-transient commit error with no write concern error. +commitFailPoint = configureFailPoint(shardPrimary, + "failCommand", + { + errorCode: ErrorCodes.InternalError, + failCommands: ["commitTransaction"], + failInternalCommands: true, + }, + {times: 10}); +res = assert.commandFailedWithCode(runTxn(st.s, makeSingleInsertTxn({_id: 3})), + ErrorCodes.InternalError); +commitFailPoint.off(); + +// No commit error with a non-retryable write concern error. +commitFailPoint = configureFailPoint( + shardPrimary, + "failCommand", + { + writeConcernError: {code: NumberInt(ErrorCodes.InternalError), errmsg: "foo"}, + failCommands: ["commitTransaction"], + failInternalCommands: true, + }, + {times: 10}); +// The internal transaction test command will rethrow a write concern error as a top-level error. +res = assert.commandFailedWithCode(runTxn(st.s, makeSingleInsertTxn({_id: 4})), + ErrorCodes.InternalError); +commitFailPoint.off(); + +// Non-transient commit error that is normally transient. Note NoSuchTransaction is not transient +// with a write concern error, which is what this is meant to simulate. Also note the fail command +// fail point can't take both a command error and a write concern error so we "cheat" and +// override the error labels. 
+commitFailPoint = configureFailPoint(shardPrimary, + "failCommand", + { + errorCode: ErrorCodes.NoSuchTransaction, + errorLabels: [], + failCommands: ["commitTransaction"], + failInternalCommands: true, + }, + {times: 10}); +res = assert.commandFailedWithCode(runTxn(st.s, makeSingleInsertTxn({_id: 5})), + ErrorCodes.NoSuchTransaction); +commitFailPoint.off(); + +st.stop(); +}()); diff --git a/src/mongo/db/s/global_index/global_index_inserter.cpp b/src/mongo/db/s/global_index/global_index_inserter.cpp index 673810360cd..93cbbb20049 100644 --- a/src/mongo/db/s/global_index/global_index_inserter.cpp +++ b/src/mongo/db/s/global_index/global_index_inserter.cpp @@ -64,43 +64,45 @@ NamespaceString GlobalIndexInserter::_skipIdNss() { void GlobalIndexInserter::processDoc(OperationContext* opCtx, const BSONObj& indexKeyValues, const BSONObj& documentKey) { - auto insertToGlobalIndexFn = - [this, service = opCtx->getServiceContext(), indexKeyValues, documentKey]( - const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { - FindCommandRequest skipIdQuery(_skipIdNss()); - skipIdQuery.setFilter(BSON("_id" << documentKey)); - skipIdQuery.setLimit(1); - - return txnClient.exhaustiveFind(skipIdQuery) - .thenRunOn(txnExec) - .then([this, service, indexKeyValues, documentKey, &txnClient, txnExec]( - const auto& skipIdDocResults) { - auto client = service->makeClient("globalIndexInserter"); - auto opCtx = service->makeOperationContext(client.get()); - globalIndexInserterPauseAfterReadingSkipCollection.pauseWhileSet(opCtx.get()); - - if (!skipIdDocResults.empty()) { - return SemiFuture<void>::makeReady(); - } - - InsertGlobalIndexKey globalIndexEntryInsert(_indexUUID); - // Note: dbName is unused by command but required by idl. 
- globalIndexEntryInsert.setDbName(DatabaseName::kAdmin); - globalIndexEntryInsert.setGlobalIndexKeyEntry( - GlobalIndexKeyEntry(indexKeyValues, documentKey)); - - return txnClient.runCommand(_nss.dbName(), globalIndexEntryInsert.toBSON({})) - .thenRunOn(txnExec) - .then([this, documentKey, &txnClient](const auto& commandResponse) { - write_ops::InsertCommandRequest skipIdInsert(_skipIdNss()); - - skipIdInsert.setDocuments({BSON("_id" << documentKey)}); - return txnClient.runCRUDOp({skipIdInsert}, {}).ignoreValue(); - }) - .semi(); - }) - .semi(); - }; + auto insertToGlobalIndexFn = [this, + service = opCtx->getServiceContext(), + indexKeyValues, + documentKey](const txn_api::TransactionClient& txnClient, + ExecutorPtr txnExec) { + FindCommandRequest skipIdQuery(_skipIdNss()); + skipIdQuery.setFilter(BSON("_id" << documentKey)); + skipIdQuery.setLimit(1); + + return txnClient.exhaustiveFind(skipIdQuery) + .thenRunOn(txnExec) + .then([this, service, indexKeyValues, documentKey, &txnClient, txnExec]( + const auto& skipIdDocResults) { + auto client = service->makeClient("globalIndexInserter"); + auto opCtx = service->makeOperationContext(client.get()); + globalIndexInserterPauseAfterReadingSkipCollection.pauseWhileSet(opCtx.get()); + + if (!skipIdDocResults.empty()) { + return SemiFuture<void>::makeReady(); + } + + InsertGlobalIndexKey globalIndexEntryInsert(_indexUUID); + // Note: dbName is unused by command but required by idl. 
+ globalIndexEntryInsert.setDbName(DatabaseName::kAdmin); + globalIndexEntryInsert.setGlobalIndexKeyEntry( + GlobalIndexKeyEntry(indexKeyValues, documentKey)); + + return txnClient.runCommandChecked(_nss.dbName(), globalIndexEntryInsert.toBSON({})) + .thenRunOn(txnExec) + .then([this, documentKey, &txnClient](const auto& commandResponse) { + write_ops::InsertCommandRequest skipIdInsert(_skipIdNss()); + + skipIdInsert.setDocuments({BSON("_id" << documentKey)}); + return txnClient.runCRUDOp({skipIdInsert}, {}).ignoreValue(); + }) + .semi(); + }) + .semi(); + }; auto inlineExecutor = std::make_shared<executor::InlineExecutor>(); txn_api::SyncTransactionWithRetries txn(opCtx, _executor, nullptr, inlineExecutor); diff --git a/src/mongo/db/transaction/transaction_api.cpp b/src/mongo/db/transaction/transaction_api.cpp index 17e50980c99..a2854b6dbf7 100644 --- a/src/mongo/db/transaction/transaction_api.cpp +++ b/src/mongo/db/transaction/transaction_api.cpp @@ -452,9 +452,10 @@ ExecutorFuture<BSONObj> SEPTransactionClient::_runCommand(const DatabaseName& db return _behaviors->handleRequest(cancellableOpCtx.get(), requestMessage) .thenRunOn(_executor) .then([this](DbResponse dbResponse) { + // NOTE: The API uses this method to run commit and abort, so be careful about adding + // new logic here to ensure it cannot interfere with error handling for either command. 
auto reply = rpc::makeReply(&dbResponse.response)->getCommandReply().getOwned(); _hooks->runReplyHook(reply); - uassertStatusOK(getStatusFromCommandResult(reply)); return reply; }); } @@ -475,6 +476,30 @@ SemiFuture<BSONObj> SEPTransactionClient::runCommand(const DatabaseName& dbName, return _runCommand(dbName, cmdObj).semi(); } +ExecutorFuture<BSONObj> SEPTransactionClient::_runCommandChecked(const DatabaseName& dbName, + BSONObj cmdObj) const { + return _runCommand(dbName, cmdObj).then([](BSONObj reply) { + uassertStatusOK(getStatusFromCommandResult(reply)); + return reply; + }); +} + +SemiFuture<BSONObj> SEPTransactionClient::runCommandChecked(const DatabaseName& dbName, + BSONObj cmdObj) const { + return _runCommandChecked(dbName, cmdObj).semi(); +} + +BSONObj SEPTransactionClient::runCommandCheckedSync(const DatabaseName& dbName, + BSONObj cmdObj) const { + Notification<void> mayReturn; + auto result = _runCommandChecked(dbName, cmdObj).unsafeToInlineFuture().tapAll([&](auto&&) { + mayReturn.set(); + }); + runFutureInline(_inlineExecutor.get(), mayReturn); + + return std::move(result).get(); +} + ExecutorFuture<BatchedCommandResponse> SEPTransactionClient::_runCRUDOp( const BatchedCommandRequest& cmd, std::vector<StmtId> stmtIds) const { invariant(!stmtIds.size() || (cmd.sizeWriteOps() == stmtIds.size()), diff --git a/src/mongo/db/transaction/transaction_api.h b/src/mongo/db/transaction/transaction_api.h index 323a4af98f9..28367668175 100644 --- a/src/mongo/db/transaction/transaction_api.h +++ b/src/mongo/db/transaction/transaction_api.h @@ -100,8 +100,15 @@ public: * Runs the given command as part of the transaction that owns this transaction client. */ virtual SemiFuture<BSONObj> runCommand(const DatabaseName& dbName, BSONObj cmd) const = 0; - virtual BSONObj runCommandSync(const DatabaseName& dbName, BSONObj cmd) const = 0; + + /** + * Same as runCommand but will assert the command status is ok. 
+ */ + virtual SemiFuture<BSONObj> runCommandChecked(const DatabaseName& dbName, + BSONObj cmd) const = 0; + virtual BSONObj runCommandCheckedSync(const DatabaseName& dbName, BSONObj cmd) const = 0; + /** * Helper method to run commands representable as a BatchedCommandRequest in the transaction * client's transaction. @@ -298,6 +305,10 @@ public: virtual SemiFuture<BSONObj> runCommand(const DatabaseName& dbName, BSONObj cmd) const override; virtual BSONObj runCommandSync(const DatabaseName& dbName, BSONObj cmd) const override; + virtual SemiFuture<BSONObj> runCommandChecked(const DatabaseName& dbName, + BSONObj cmd) const override; + virtual BSONObj runCommandCheckedSync(const DatabaseName& dbName, BSONObj cmd) const override; + virtual SemiFuture<BatchedCommandResponse> runCRUDOp( const BatchedCommandRequest& cmd, std::vector<StmtId> stmtIds) const override; virtual BatchedCommandResponse runCRUDOpSync(const BatchedCommandRequest& cmd, @@ -322,6 +333,8 @@ public: private: ExecutorFuture<BSONObj> _runCommand(const DatabaseName& dbName, BSONObj cmd) const; + ExecutorFuture<BSONObj> _runCommandChecked(const DatabaseName& dbName, BSONObj cmd) const; + ExecutorFuture<BatchedCommandResponse> _runCRUDOp(const BatchedCommandRequest& cmd, std::vector<StmtId> stmtIds) const; diff --git a/src/mongo/db/transaction/transaction_api_test.cpp b/src/mongo/db/transaction/transaction_api_test.cpp index b9290d0cd86..4bdbb16a141 100644 --- a/src/mongo/db/transaction/transaction_api_test.cpp +++ b/src/mongo/db/transaction/transaction_api_test.cpp @@ -80,6 +80,19 @@ const BSONObj kRetryableWriteConcernError = const BSONObj kResWithRetryableWriteConcernError = BSON("ok" << 1 << "writeConcernError" << kRetryableWriteConcernError); +const BSONObj kResWithTransientCommitErrorAndRetryableWriteConcernError = + BSON("ok" << 0 << "code" << ErrorCodes::LockTimeout << kErrorLabelsFieldName + << BSON_ARRAY(ErrorLabel::kTransientTransaction) << "writeConcernError" + << 
kRetryableWriteConcernError); + +const BSONObj kResWithNonTransientCommitErrorAndRetryableWriteConcernError = + BSON("ok" << 0 << "code" << ErrorCodes::NoSuchTransaction << "writeConcernError" + << kRetryableWriteConcernError); + +const BSONObj kResWithNonTransientCommitErrorAndNonRetryableWriteConcernError = + BSON("ok" << 0 << "code" << ErrorCodes::NoSuchTransaction << "writeConcernError" + << kWriteConcernError); + class MockResourceYielder : public ResourceYielder { public: void yield(OperationContext*) { @@ -194,6 +207,15 @@ public: MONGO_UNREACHABLE; } + virtual BSONObj runCommandCheckedSync(const DatabaseName& dbName, BSONObj cmd) const override { + MONGO_UNREACHABLE; + } + + virtual SemiFuture<BSONObj> runCommandChecked(const DatabaseName& dbName, + BSONObj cmd) const override { + MONGO_UNREACHABLE; + } + virtual SemiFuture<BatchedCommandResponse> runCRUDOp( const BatchedCommandRequest& cmd, std::vector<StmtId> stmtIds) const override { MONGO_UNREACHABLE; @@ -502,6 +524,15 @@ public: MONGO_UNREACHABLE; } + virtual BSONObj runCommandCheckedSync(const DatabaseName& dbName, BSONObj cmd) const override { + MONGO_UNREACHABLE; + } + + virtual SemiFuture<BSONObj> runCommandChecked(const DatabaseName& dbName, + BSONObj cmd) const override { + MONGO_UNREACHABLE; + } + virtual SemiFuture<BatchedCommandResponse> runCRUDOp( const BatchedCommandRequest& cmd, std::vector<StmtId> stmtIds) const override { MONGO_UNREACHABLE; @@ -1070,6 +1101,49 @@ TEST_F(TxnAPITest, OwnSession_CommitError) { ASSERT_EQ(lastRequest.firstElementFieldNameStringData(), "commitTransaction"_sd); } +TEST_F(TxnAPITest, DoesNotRetryOnNonTransientCommitErrorWithNonRetryableCommitWCError) { + auto swResult = txnWithRetries().runNoThrow( + opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { + mockClient()->setNextCommandResponse(kOKInsertResponse); + auto insertRes = + txnClient + .runCommand(DatabaseName::createDatabaseName_forTest(boost::none, "user"_sd), + BSON("insert" 
+ << "foo" + << "documents" << BSON_ARRAY(BSON("x" << 1)))) + .get(); + ASSERT_OK(getStatusFromWriteCommandReply(insertRes)); + + ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. + assertTxnMetadata( + mockClient()->getLastSentRequest(), 0 /* txnNumber */, true /* startTransaction */); + assertSessionIdMetadata(mockClient()->getLastSentRequest(), LsidAssertion::kStandalone); + + // The commit response. + mockClient()->setNextCommandResponse( + kResWithNonTransientCommitErrorAndNonRetryableWriteConcernError); + + return SemiFuture<void>::makeReady(); + }); + ASSERT(swResult.getStatus().isOK()); + ASSERT_EQ(swResult.getValue().cmdStatus, ErrorCodes::NoSuchTransaction); + ASSERT_EQ(swResult.getValue().wcError.toStatus(), ErrorCodes::WriteConcernFailed); + ASSERT_EQ(swResult.getValue().getEffectiveStatus(), ErrorCodes::NoSuchTransaction); + + ASSERT_EQ(1, InternalTransactionMetrics::get(opCtx())->getStarted()); + ASSERT_EQ(0, InternalTransactionMetrics::get(opCtx())->getRetriedTransactions()); + ASSERT_EQ(0, InternalTransactionMetrics::get(opCtx())->getRetriedCommits()); + ASSERT_EQ(0, InternalTransactionMetrics::get(opCtx())->getSucceeded()); + + auto lastRequest = mockClient()->getLastSentRequest(); + assertTxnMetadata(lastRequest, + 0 /* txnNumber */, + boost::none /* startTransaction */, + boost::none /* readConcern */, + WriteConcernOptions().toBSON() /* writeConcern */); + ASSERT_EQ(lastRequest.firstElementFieldNameStringData(), "commitTransaction"_sd); +} + TEST_F(TxnAPITest, OwnSession_TransientCommitError) { int attempt = -1; auto swResult = txnWithRetries().runNoThrow( @@ -1243,6 +1317,97 @@ TEST_F(TxnAPITest, OwnSession_RetryableCommitWCError) { ASSERT_EQ(lastRequest.firstElementFieldNameStringData(), "commitTransaction"_sd); } +TEST_F(TxnAPITest, RetriesOnNonTransientCommitWithErrorRetryableCommitWCError) { + auto swResult = txnWithRetries().runNoThrow( + opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr 
txnExec) { + mockClient()->setNextCommandResponse(kOKInsertResponse); + auto insertRes = + txnClient + .runCommand(DatabaseName::createDatabaseName_forTest(boost::none, "user"_sd), + BSON("insert" + << "foo" + << "documents" << BSON_ARRAY(BSON("x" << 1)))) + .get(); + ASSERT_OK(getStatusFromWriteCommandReply(insertRes)); + + ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. + assertTxnMetadata( + mockClient()->getLastSentRequest(), 0 /* txnNumber */, true /* startTransaction */); + assertSessionIdMetadata(mockClient()->getLastSentRequest(), LsidAssertion::kStandalone); + + // The commit responses. + mockClient()->setNextCommandResponse( + kResWithNonTransientCommitErrorAndRetryableWriteConcernError); + mockClient()->setNextCommandResponse(kOKCommandResponse); + return SemiFuture<void>::makeReady(); + }); + ASSERT(swResult.getStatus().isOK()); + ASSERT(swResult.getValue().getEffectiveStatus().isOK()); + + ASSERT_EQ(1, InternalTransactionMetrics::get(opCtx())->getStarted()); + ASSERT_EQ(0, InternalTransactionMetrics::get(opCtx())->getRetriedTransactions()); + ASSERT_EQ(1, InternalTransactionMetrics::get(opCtx())->getRetriedCommits()); + ASSERT_EQ(1, InternalTransactionMetrics::get(opCtx())->getSucceeded()); + + auto lastRequest = mockClient()->getLastSentRequest(); + assertTxnMetadata(lastRequest, + 0 /* txnNumber */, + boost::none /* startTransaction */, + boost::none /* readConcern */, + CommandHelpers::kMajorityWriteConcern.toBSON()); + assertSessionIdMetadata(mockClient()->getLastSentRequest(), LsidAssertion::kStandalone); + ASSERT_EQ(lastRequest.firstElementFieldNameStringData(), "commitTransaction"_sd); +} + +TEST_F(TxnAPITest, RetriesOnTransientCommitErrorWithRetryableWCError) { + int attempt = -1; + auto swResult = txnWithRetries().runNoThrow( + opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { + attempt += 1; + mockClient()->setNextCommandResponse(kOKInsertResponse); + auto insertRes = + txnClient 
+ .runCommand(DatabaseName::createDatabaseName_forTest(boost::none, "user"_sd), + BSON("insert" + << "foo" + << "documents" << BSON_ARRAY(BSON("x" << 1)))) + .get(); + ASSERT_OK(getStatusFromWriteCommandReply(insertRes)); + + ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. + assertTxnMetadata(mockClient()->getLastSentRequest(), + attempt /* txnNumber */, + true /* startTransaction */); + assertSessionIdMetadata(mockClient()->getLastSentRequest(), LsidAssertion::kStandalone); + + // Set commit response. Initial commit response is a transient txn error so the + // transaction retries. + if (attempt == 0) { + mockClient()->setNextCommandResponse( + kResWithTransientCommitErrorAndRetryableWriteConcernError); + } else { + mockClient()->setNextCommandResponse(kOKCommandResponse); + } + return SemiFuture<void>::makeReady(); + }); + ASSERT(swResult.getStatus().isOK()); + ASSERT(swResult.getValue().getEffectiveStatus().isOK()); + + ASSERT_EQ(1, InternalTransactionMetrics::get(opCtx())->getStarted()); + ASSERT_EQ(1, InternalTransactionMetrics::get(opCtx())->getRetriedTransactions()); + ASSERT_EQ(0, InternalTransactionMetrics::get(opCtx())->getRetriedCommits()); + ASSERT_EQ(1, InternalTransactionMetrics::get(opCtx())->getSucceeded()); + + auto lastRequest = mockClient()->getLastSentRequest(); + assertTxnMetadata(lastRequest, + attempt /* txnNumber */, + boost::none /* startTransaction */, + boost::none /* readConcern */, + WriteConcernOptions().toBSON() /* writeConcern */); + assertSessionIdMetadata(mockClient()->getLastSentRequest(), LsidAssertion::kStandalone); + ASSERT_EQ(lastRequest.firstElementFieldNameStringData(), "commitTransaction"_sd); +} + TEST_F(TxnAPITest, RunNoErrors) { txnWithRetries().run(opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { |