diff options
author | Jason Zhang <jason.zhang@mongodb.com> | 2022-04-13 02:50:43 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-04-13 06:02:52 +0000 |
commit | d43fcc40638718961c8e01ffb2eb5ce2c525c742 (patch) | |
tree | 9955b74d219c6bae3f9ad4be9100b474377c9eab | |
parent | b3cded481b3b70c766338a729014e64bf71a97e6 (diff) | |
download | mongo-d43fcc40638718961c8e01ffb2eb5ce2c525c742.tar.gz |
SERVER-59566 Retry and timeout policy for internal transactions
-rw-r--r-- | src/mongo/db/transaction_api.cpp | 44 | ||||
-rw-r--r-- | src/mongo/db/transaction_api.h | 8 | ||||
-rw-r--r-- | src/mongo/db/transaction_api_test.cpp | 129 |
3 files changed, 169 insertions, 12 deletions
diff --git a/src/mongo/db/transaction_api.cpp b/src/mongo/db/transaction_api.cpp index 2801efde063..9453b851677 100644 --- a/src/mongo/db/transaction_api.cpp +++ b/src/mongo/db/transaction_api.cpp @@ -58,6 +58,10 @@ #include "mongo/stdx/future.h" #include "mongo/transport/service_entry_point.h" +// TODO SERVER-65395: Remove failpoint when fle2 tests can reliably support internal transaction +// retry limit. +MONGO_FAIL_POINT_DEFINE(skipTransactionApiRetryCheckInHandleError); + namespace mongo::txn_api { namespace { @@ -168,15 +172,16 @@ StatusWith<CommitResult> TransactionWithRetries::runSyncNoThrow(OperationContext OperationTimeTracker::get(opCtx)->updateOperationTime(_internalTxn->getOperationTime()); }); - // TODO SERVER-59566 Add a retry policy. _internalTxn->setCallback(std::move(callback)); + int bodyAttempts = 0; while (true) { + bodyAttempts++; { auto bodyStatus = getWithYields(opCtx, [&] { return _internalTxn->runCallback(); }, _resourceYielder); if (!bodyStatus.isOK()) { - auto nextStep = _internalTxn->handleError(bodyStatus); + auto nextStep = _internalTxn->handleError(bodyStatus, bodyAttempts); logNextStep(nextStep, _internalTxn->reportStateForLog()); if (nextStep == details::Transaction::ErrorHandlingStep::kDoNotRetry) { @@ -195,7 +200,9 @@ StatusWith<CommitResult> TransactionWithRetries::runSyncNoThrow(OperationContext } } + int commitAttempts = 0; while (true) { + commitAttempts++; auto swResult = getWithYields(opCtx, [&] { return _internalTxn->commit(); }, _resourceYielder); @@ -204,7 +211,7 @@ StatusWith<CommitResult> TransactionWithRetries::runSyncNoThrow(OperationContext return swResult; } - auto nextStep = _internalTxn->handleError(swResult); + auto nextStep = _internalTxn->handleError(swResult, commitAttempts); logNextStep(nextStep, _internalTxn->reportStateForLog()); if (nextStep == details::Transaction::ErrorHandlingStep::kDoNotRetry) { @@ -421,8 +428,8 @@ SemiFuture<void> Transaction::runCallback() { .semi(); } -Transaction::ErrorHandlingStep Transaction::handleError( - const StatusWith<CommitResult>& swResult) const { +Transaction::ErrorHandlingStep Transaction::handleError(const StatusWith<CommitResult>& swResult, + int attemptCounter) const { stdx::lock_guard<Latch> lg(_mutex); LOGV2_DEBUG(5875905, @@ -432,13 +439,21 @@ Transaction::ErrorHandlingStep Transaction::handleError( : swResult.getStatus(), "hasTransientTransactionErrorLabel"_attr = _latestResponseHasTransientTransactionErrorLabel, - "txnInfo"_attr = _reportStateForLog(lg)); + "txnInfo"_attr = _reportStateForLog(lg), + "retriesLeft"_attr = + kTxnRetryLimit - attemptCounter + 1 // To account for the initial execution. + ); if (_execContext == ExecutionContext::kClientTransaction) { // If we're nested in another transaction, let the outer most client decide on errors. return ErrorHandlingStep::kDoNotRetry; } + if (!MONGO_unlikely(skipTransactionApiRetryCheckInHandleError.shouldFail()) && + attemptCounter > kTxnRetryLimit) { + return ErrorHandlingStep::kAbortAndDoNotRetry; + } + // The transient transaction error label is always returned in command responses, even for // internal clients, so we use it to decide when to retry the transaction instead of inspecting // error codes. The only exception is when a network error was received before commit, handled @@ -473,9 +488,6 @@ Transaction::ErrorHandlingStep Transaction::handleError( // retryable write, per the drivers specification. if (ErrorCodes::isRetriableError(commitStatus) || ErrorCodes::isRetriableError(commitWCStatus)) { - // TODO SERVER-59566: Handle timeouts and max retry attempts. Note commit might be - // retried within the command itself, e.g. ClusterCommitTransaction uses an idempotent - // retry policy, so we may want a timeout policy instead of number of retries. return ErrorHandlingStep::kRetryCommit; } } @@ -512,6 +524,16 @@ void Transaction::prepareRequest(BSONObjBuilder* cmdBuilder) { cmdBuilder->append(repl::ReadConcernArgs::kReadConcernFieldName, _readConcern); } + // Append the new recalculated maxTimeMS + if (_opDeadline) { + uassert(5956600, + "Command object passed to the transaction api should not contain maxTimeMS field", + !cmdBuilder->hasField(kMaxTimeMSField)); + auto timeLeftover = + std::max(Milliseconds(0), *_opDeadline - _service->getFastClockSource()->now()); + cmdBuilder->append(kMaxTimeMSField, durationCount<Milliseconds>(timeLeftover)); + } + // If the transaction API caller had API parameters, we should forward them in all requests. if (_apiParameters.getParamsPassed()) { _apiParameters.appendInfo(cmdBuilder); @@ -644,6 +666,10 @@ void Transaction::_primeTransaction(OperationContext* opCtx) { ReadWriteConcernProvenanceBase::kSourceFieldName); _apiParameters = APIParameters::get(opCtx); + if (opCtx->hasDeadline()) { + _opDeadline = opCtx->getDeadline(); + } + LOGV2_DEBUG(5875901, 3, "Started internal transaction", diff --git a/src/mongo/db/transaction_api.h b/src/mongo/db/transaction_api.h index 48559d0b79e..8eacdbdc95e 100644 --- a/src/mongo/db/transaction_api.h +++ b/src/mongo/db/transaction_api.h @@ -46,6 +46,10 @@ class TxnMetadataHooks; class Transaction; } // namespace details +// Max number of retries allowed for a transaction operation. +static constexpr int kTxnRetryLimit = 10; +static constexpr auto kMaxTimeMSField = "maxTimeMS"; + /** * Encapsulates the command status and write concern error from a response to a commitTransaction * command. @@ -367,7 +371,8 @@ public: * its execution context, e.g. by updating its txnNumber, returning the next step for the * transaction runner. */ - ErrorHandlingStep handleError(const StatusWith<CommitResult>& swResult) const; + ErrorHandlingStep handleError(const StatusWith<CommitResult>& swResult, + int attemptCounter) const; /** * Returns an object with info about the internal transaction for diagnostics. @@ -424,6 +429,7 @@ private: std::unique_ptr<TransactionClient> _txnClient; Callback _callback; + boost::optional<Date_t> _opDeadline; BSONObj _writeConcern; BSONObj _readConcern; APIParameters _apiParameters; diff --git a/src/mongo/db/transaction_api_test.cpp b/src/mongo/db/transaction_api_test.cpp index a010334ff02..02ee4b128b9 100644 --- a/src/mongo/db/transaction_api_test.cpp +++ b/src/mongo/db/transaction_api_test.cpp @@ -43,6 +43,7 @@ #include "mongo/unittest/barrier.h" #include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" +#include "mongo/util/clock_source_mock.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/executor_test_util.h" #include "mongo/util/fail_point.h" @@ -140,7 +141,7 @@ public: } virtual SemiFuture<BSONObj> runCommand(StringData dbName, BSONObj cmd) const override { - auto cmdBob = BSONObjBuilder(cmd); + auto cmdBob = BSONObjBuilder(std::move(cmd)); invariant(_hooks); _hooks->runRequestHook(&cmdBob); _lastSentRequest = cmdBob.obj(); @@ -251,7 +252,8 @@ void assertTxnMetadata(BSONObj obj, TxnNumber txnNumber, boost::optional<bool> startTransaction, boost::optional<BSONObj> readConcern = boost::none, - boost::optional<BSONObj> writeConcern = boost::none) { + boost::optional<BSONObj> writeConcern = boost::none, + boost::optional<int> maxTimeMS = boost::none) { ASSERT_EQ(obj["lsid"].type(), BSONType::Object); ASSERT_EQ(obj["autocommit"].Bool(), false); ASSERT_EQ(obj["txnNumber"].Long(), txnNumber); @@ -277,6 +279,12 @@ void assertTxnMetadata(BSONObj obj, } else { ASSERT(obj["writeConcern"].eoo()); } + + if (maxTimeMS) { + ASSERT_EQ(obj["maxTimeMS"].numberInt(), *maxTimeMS); + } else { + ASSERT(obj["maxTimeMS"].eoo()); + } } class TxnAPITest : public ServiceContextTest { @@ -1681,5 +1689,122 @@ TEST_F(TxnAPITest, TestExhaustiveFindErrorOnGetMore) { ASSERT_EQ(lastRequest["batchSize"].Long(), 2); ASSERT_EQ(lastRequest["collection"].String(), "bar"); } + +TEST_F(TxnAPITest, OwnSession_StartTransactionRetryLimitOnTransientErrors) { + int retryCount = 0; + auto swResult = txnWithRetries().runSyncNoThrow( + opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { + retryCount++; + + // Command response used for insert below and eventually abortTransaction. + mockClient()->setNextCommandResponse(kOKCommandResponse); + auto insertRes = txnClient + .runCommand("user"_sd, + BSON("insert" + << "foo" + << "documents" << BSON_ARRAY(BSON("x" << 1)))) + .get(); + uasserted(ErrorCodes::HostUnreachable, "Host unreachable error"); + return SemiFuture<void>::makeReady(); + }); + // The transient error should have been propagated. + ASSERT_EQ(swResult.getStatus(), ErrorCodes::HostUnreachable); + + // We get 11 due to the initial try and then 10 follow up retries. + ASSERT_EQ(retryCount, 11); + + auto lastRequest = mockClient()->getLastSentRequest(); + assertTxnMetadata(lastRequest, + 10 /* txnNumber */, + boost::none /* startTransaction */, + boost::none /* readConcern */, + WriteConcernOptions().toBSON() /* writeConcern */); + assertSessionIdMetadata(mockClient()->getLastSentRequest(), LsidAssertion::kStandalone); + ASSERT_EQ(lastRequest.firstElementFieldNameStringData(), "abortTransaction"_sd); +} + +TEST_F(TxnAPITest, OwnSession_CommitTransactionRetryLimitOnTransientErrors) { + // If we are able to successfully finish this test, then we know that we have limited our + // retries. + auto swResult = txnWithRetries().runSyncNoThrow( + opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { + mockClient()->setNextCommandResponse(kOKInsertResponse); + auto insertRes = txnClient + .runCommand("user"_sd, + BSON("insert" + << "foo" + << "documents" << BSON_ARRAY(BSON("x" << 1)))) + .get(); + ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. + assertTxnMetadata( + mockClient()->getLastSentRequest(), 0 /* txnNumber */, true /* startTransaction */); + assertSessionIdMetadata(mockClient()->getLastSentRequest(), LsidAssertion::kStandalone); + + // The commit response. + mockClient()->setNextCommandResponse( + BSON("ok" << 0 << "code" << ErrorCodes::PrimarySteppedDown)); + return SemiFuture<void>::makeReady(); + }); + + // The transient error should have been propagated. + ASSERT_EQ(swResult.getValue().getEffectiveStatus(), ErrorCodes::PrimarySteppedDown); + auto lastRequest = mockClient()->getLastSentRequest(); + assertTxnMetadata(lastRequest, + 0 /* txnNumber */, + boost::none /* startTransaction */, + boost::none /* readConcern */, + WriteConcernOptions().toBSON() /* writeConcern */); + assertSessionIdMetadata(mockClient()->getLastSentRequest(), LsidAssertion::kStandalone); + ASSERT_EQ(lastRequest.firstElementFieldNameStringData(), "abortTransaction"_sd); +} + +TEST_F(TxnAPITest, MaxTimeMSIsSetIfOperationContextHasDeadline) { + const std::shared_ptr<ClockSourceMock> mockClock = std::make_shared<ClockSourceMock>(); + mockClock->reset(getServiceContext()->getFastClockSource()->now()); + getServiceContext()->setFastClockSource(std::make_unique<SharedClockSourceAdapter>(mockClock)); + int maxTimeMS = 2000; + opCtx()->setDeadlineByDate(mockClock->now() + Milliseconds(maxTimeMS), + ErrorCodes::MaxTimeMSExpired); + + // txnNumber will be incremented upon the release of a session. + resetTxnWithRetries(); + + auto swResult = txnWithRetries().runSyncNoThrow( + opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { + mockClient()->setNextCommandResponse(kOKInsertResponse); + auto insertRes = txnClient + .runCommand("user"_sd, + BSON("insert" + << "foo" + << "documents" << BSON_ARRAY(BSON("x" << 1)))) + .get(); + ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. + assertTxnMetadata(mockClient()->getLastSentRequest(), + 1 /* txnNumber */, + true /* startTransaction */, + boost::none /* readConcern */, + boost::none /* writeConcern */, + maxTimeMS /* maxTimeMS */); + assertSessionIdMetadata(mockClient()->getLastSentRequest(), LsidAssertion::kStandalone); + + mockClock->advance(Milliseconds(1000)); + + // The commit response. + mockClient()->setNextCommandResponse(kOKCommandResponse); + return SemiFuture<void>::makeReady(); + }); + ASSERT(swResult.getStatus().isOK()); + ASSERT(swResult.getValue().getEffectiveStatus().isOK()); + + auto lastRequest = mockClient()->getLastSentRequest(); + assertTxnMetadata(lastRequest, + 1 /* txnNumber */, + boost::none /* startTransaction */, + boost::none /* readConcern */, + WriteConcernOptions().toBSON() /* writeConcern */, + 1000 /* maxTimeMS */); + assertSessionIdMetadata(mockClient()->getLastSentRequest(), LsidAssertion::kStandalone); + ASSERT_EQ(lastRequest.firstElementFieldNameStringData(), "commitTransaction"_sd); +} } // namespace } // namespace mongo |