summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJason Zhang <jason.zhang@mongodb.com>2022-04-13 02:50:43 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-04-13 06:02:52 +0000
commitd43fcc40638718961c8e01ffb2eb5ce2c525c742 (patch)
tree9955b74d219c6bae3f9ad4be9100b474377c9eab
parentb3cded481b3b70c766338a729014e64bf71a97e6 (diff)
downloadmongo-d43fcc40638718961c8e01ffb2eb5ce2c525c742.tar.gz
SERVER-59566 Retry and timeout policy for internal transactions
-rw-r--r--src/mongo/db/transaction_api.cpp44
-rw-r--r--src/mongo/db/transaction_api.h8
-rw-r--r--src/mongo/db/transaction_api_test.cpp129
3 files changed, 169 insertions, 12 deletions
diff --git a/src/mongo/db/transaction_api.cpp b/src/mongo/db/transaction_api.cpp
index 2801efde063..9453b851677 100644
--- a/src/mongo/db/transaction_api.cpp
+++ b/src/mongo/db/transaction_api.cpp
@@ -58,6 +58,10 @@
#include "mongo/stdx/future.h"
#include "mongo/transport/service_entry_point.h"
+// TODO SERVER-65395: Remove failpoint when fle2 tests can reliably support internal transaction
+// retry limit.
+MONGO_FAIL_POINT_DEFINE(skipTransactionApiRetryCheckInHandleError);
+
namespace mongo::txn_api {
namespace {
@@ -168,15 +172,16 @@ StatusWith<CommitResult> TransactionWithRetries::runSyncNoThrow(OperationContext
OperationTimeTracker::get(opCtx)->updateOperationTime(_internalTxn->getOperationTime());
});
- // TODO SERVER-59566 Add a retry policy.
_internalTxn->setCallback(std::move(callback));
+ int bodyAttempts = 0;
while (true) {
+ bodyAttempts++;
{
auto bodyStatus =
getWithYields(opCtx, [&] { return _internalTxn->runCallback(); }, _resourceYielder);
if (!bodyStatus.isOK()) {
- auto nextStep = _internalTxn->handleError(bodyStatus);
+ auto nextStep = _internalTxn->handleError(bodyStatus, bodyAttempts);
logNextStep(nextStep, _internalTxn->reportStateForLog());
if (nextStep == details::Transaction::ErrorHandlingStep::kDoNotRetry) {
@@ -195,7 +200,9 @@ StatusWith<CommitResult> TransactionWithRetries::runSyncNoThrow(OperationContext
}
}
+ int commitAttempts = 0;
while (true) {
+ commitAttempts++;
auto swResult =
getWithYields(opCtx, [&] { return _internalTxn->commit(); }, _resourceYielder);
@@ -204,7 +211,7 @@ StatusWith<CommitResult> TransactionWithRetries::runSyncNoThrow(OperationContext
return swResult;
}
- auto nextStep = _internalTxn->handleError(swResult);
+ auto nextStep = _internalTxn->handleError(swResult, commitAttempts);
logNextStep(nextStep, _internalTxn->reportStateForLog());
if (nextStep == details::Transaction::ErrorHandlingStep::kDoNotRetry) {
@@ -421,8 +428,8 @@ SemiFuture<void> Transaction::runCallback() {
.semi();
}
-Transaction::ErrorHandlingStep Transaction::handleError(
- const StatusWith<CommitResult>& swResult) const {
+Transaction::ErrorHandlingStep Transaction::handleError(const StatusWith<CommitResult>& swResult,
+ int attemptCounter) const {
stdx::lock_guard<Latch> lg(_mutex);
LOGV2_DEBUG(5875905,
@@ -432,13 +439,21 @@ Transaction::ErrorHandlingStep Transaction::handleError(
: swResult.getStatus(),
"hasTransientTransactionErrorLabel"_attr =
_latestResponseHasTransientTransactionErrorLabel,
- "txnInfo"_attr = _reportStateForLog(lg));
+ "txnInfo"_attr = _reportStateForLog(lg),
+ "retriesLeft"_attr =
+ kTxnRetryLimit - attemptCounter + 1 // To account for the initial execution.
+ );
if (_execContext == ExecutionContext::kClientTransaction) {
// If we're nested in another transaction, let the outer most client decide on errors.
return ErrorHandlingStep::kDoNotRetry;
}
+ if (!MONGO_unlikely(skipTransactionApiRetryCheckInHandleError.shouldFail()) &&
+ attemptCounter > kTxnRetryLimit) {
+ return ErrorHandlingStep::kAbortAndDoNotRetry;
+ }
+
// The transient transaction error label is always returned in command responses, even for
// internal clients, so we use it to decide when to retry the transaction instead of inspecting
// error codes. The only exception is when a network error was received before commit, handled
@@ -473,9 +488,6 @@ Transaction::ErrorHandlingStep Transaction::handleError(
// retryable write, per the drivers specification.
if (ErrorCodes::isRetriableError(commitStatus) ||
ErrorCodes::isRetriableError(commitWCStatus)) {
- // TODO SERVER-59566: Handle timeouts and max retry attempts. Note commit might be
- // retried within the command itself, e.g. ClusterCommitTransaction uses an idempotent
- // retry policy, so we may want a timeout policy instead of number of retries.
return ErrorHandlingStep::kRetryCommit;
}
}
@@ -512,6 +524,16 @@ void Transaction::prepareRequest(BSONObjBuilder* cmdBuilder) {
cmdBuilder->append(repl::ReadConcernArgs::kReadConcernFieldName, _readConcern);
}
+ // Append the new recalculated maxTimeMS
+ if (_opDeadline) {
+ uassert(5956600,
+ "Command object passed to the transaction api should not contain maxTimeMS field",
+ !cmdBuilder->hasField(kMaxTimeMSField));
+ auto timeLeftover =
+ std::max(Milliseconds(0), *_opDeadline - _service->getFastClockSource()->now());
+ cmdBuilder->append(kMaxTimeMSField, durationCount<Milliseconds>(timeLeftover));
+ }
+
// If the transaction API caller had API parameters, we should forward them in all requests.
if (_apiParameters.getParamsPassed()) {
_apiParameters.appendInfo(cmdBuilder);
@@ -644,6 +666,10 @@ void Transaction::_primeTransaction(OperationContext* opCtx) {
ReadWriteConcernProvenanceBase::kSourceFieldName);
_apiParameters = APIParameters::get(opCtx);
+ if (opCtx->hasDeadline()) {
+ _opDeadline = opCtx->getDeadline();
+ }
+
LOGV2_DEBUG(5875901,
3,
"Started internal transaction",
diff --git a/src/mongo/db/transaction_api.h b/src/mongo/db/transaction_api.h
index 48559d0b79e..8eacdbdc95e 100644
--- a/src/mongo/db/transaction_api.h
+++ b/src/mongo/db/transaction_api.h
@@ -46,6 +46,10 @@ class TxnMetadataHooks;
class Transaction;
} // namespace details
+// Max number of retries allowed for a transaction operation.
+static constexpr int kTxnRetryLimit = 10;
+static constexpr auto kMaxTimeMSField = "maxTimeMS";
+
/**
* Encapsulates the command status and write concern error from a response to a commitTransaction
* command.
@@ -367,7 +371,8 @@ public:
* its execution context, e.g. by updating its txnNumber, returning the next step for the
* transaction runner.
*/
- ErrorHandlingStep handleError(const StatusWith<CommitResult>& swResult) const;
+ ErrorHandlingStep handleError(const StatusWith<CommitResult>& swResult,
+ int attemptCounter) const;
/**
* Returns an object with info about the internal transaction for diagnostics.
@@ -424,6 +429,7 @@ private:
std::unique_ptr<TransactionClient> _txnClient;
Callback _callback;
+ boost::optional<Date_t> _opDeadline;
BSONObj _writeConcern;
BSONObj _readConcern;
APIParameters _apiParameters;
diff --git a/src/mongo/db/transaction_api_test.cpp b/src/mongo/db/transaction_api_test.cpp
index a010334ff02..02ee4b128b9 100644
--- a/src/mongo/db/transaction_api_test.cpp
+++ b/src/mongo/db/transaction_api_test.cpp
@@ -43,6 +43,7 @@
#include "mongo/unittest/barrier.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
+#include "mongo/util/clock_source_mock.h"
#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/executor_test_util.h"
#include "mongo/util/fail_point.h"
@@ -140,7 +141,7 @@ public:
}
virtual SemiFuture<BSONObj> runCommand(StringData dbName, BSONObj cmd) const override {
- auto cmdBob = BSONObjBuilder(cmd);
+ auto cmdBob = BSONObjBuilder(std::move(cmd));
invariant(_hooks);
_hooks->runRequestHook(&cmdBob);
_lastSentRequest = cmdBob.obj();
@@ -251,7 +252,8 @@ void assertTxnMetadata(BSONObj obj,
TxnNumber txnNumber,
boost::optional<bool> startTransaction,
boost::optional<BSONObj> readConcern = boost::none,
- boost::optional<BSONObj> writeConcern = boost::none) {
+ boost::optional<BSONObj> writeConcern = boost::none,
+ boost::optional<int> maxTimeMS = boost::none) {
ASSERT_EQ(obj["lsid"].type(), BSONType::Object);
ASSERT_EQ(obj["autocommit"].Bool(), false);
ASSERT_EQ(obj["txnNumber"].Long(), txnNumber);
@@ -277,6 +279,12 @@ void assertTxnMetadata(BSONObj obj,
} else {
ASSERT(obj["writeConcern"].eoo());
}
+
+ if (maxTimeMS) {
+ ASSERT_EQ(obj["maxTimeMS"].numberInt(), *maxTimeMS);
+ } else {
+ ASSERT(obj["maxTimeMS"].eoo());
+ }
}
class TxnAPITest : public ServiceContextTest {
@@ -1681,5 +1689,122 @@ TEST_F(TxnAPITest, TestExhaustiveFindErrorOnGetMore) {
ASSERT_EQ(lastRequest["batchSize"].Long(), 2);
ASSERT_EQ(lastRequest["collection"].String(), "bar");
}
+
+TEST_F(TxnAPITest, OwnSession_StartTransactionRetryLimitOnTransientErrors) {
+ int retryCount = 0;
+ auto swResult = txnWithRetries().runSyncNoThrow(
+ opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) {
+ retryCount++;
+
+ // Command response used for insert below and eventually abortTransaction.
+ mockClient()->setNextCommandResponse(kOKCommandResponse);
+ auto insertRes = txnClient
+ .runCommand("user"_sd,
+ BSON("insert"
+ << "foo"
+ << "documents" << BSON_ARRAY(BSON("x" << 1))))
+ .get();
+ uasserted(ErrorCodes::HostUnreachable, "Host unreachable error");
+ return SemiFuture<void>::makeReady();
+ });
+ // The transient error should have been propagated.
+ ASSERT_EQ(swResult.getStatus(), ErrorCodes::HostUnreachable);
+
+ // We get 11 due to the initial try and then 10 follow up retries.
+ ASSERT_EQ(retryCount, 11);
+
+ auto lastRequest = mockClient()->getLastSentRequest();
+ assertTxnMetadata(lastRequest,
+ 10 /* txnNumber */,
+ boost::none /* startTransaction */,
+ boost::none /* readConcern */,
+ WriteConcernOptions().toBSON() /* writeConcern */);
+ assertSessionIdMetadata(mockClient()->getLastSentRequest(), LsidAssertion::kStandalone);
+ ASSERT_EQ(lastRequest.firstElementFieldNameStringData(), "abortTransaction"_sd);
+}
+
+TEST_F(TxnAPITest, OwnSession_CommitTransactionRetryLimitOnTransientErrors) {
+ // If we are able to successfully finish this test, then we know that we have limited our
+ // retries.
+ auto swResult = txnWithRetries().runSyncNoThrow(
+ opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) {
+ mockClient()->setNextCommandResponse(kOKInsertResponse);
+ auto insertRes = txnClient
+ .runCommand("user"_sd,
+ BSON("insert"
+ << "foo"
+ << "documents" << BSON_ARRAY(BSON("x" << 1))))
+ .get();
+ ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
+ assertTxnMetadata(
+ mockClient()->getLastSentRequest(), 0 /* txnNumber */, true /* startTransaction */);
+ assertSessionIdMetadata(mockClient()->getLastSentRequest(), LsidAssertion::kStandalone);
+
+ // The commit response.
+ mockClient()->setNextCommandResponse(
+ BSON("ok" << 0 << "code" << ErrorCodes::PrimarySteppedDown));
+ return SemiFuture<void>::makeReady();
+ });
+
+ // The transient error should have been propagated.
+ ASSERT_EQ(swResult.getValue().getEffectiveStatus(), ErrorCodes::PrimarySteppedDown);
+ auto lastRequest = mockClient()->getLastSentRequest();
+ assertTxnMetadata(lastRequest,
+ 0 /* txnNumber */,
+ boost::none /* startTransaction */,
+ boost::none /* readConcern */,
+ WriteConcernOptions().toBSON() /* writeConcern */);
+ assertSessionIdMetadata(mockClient()->getLastSentRequest(), LsidAssertion::kStandalone);
+ ASSERT_EQ(lastRequest.firstElementFieldNameStringData(), "abortTransaction"_sd);
+}
+
+TEST_F(TxnAPITest, MaxTimeMSIsSetIfOperationContextHasDeadline) {
+ const std::shared_ptr<ClockSourceMock> mockClock = std::make_shared<ClockSourceMock>();
+ mockClock->reset(getServiceContext()->getFastClockSource()->now());
+ getServiceContext()->setFastClockSource(std::make_unique<SharedClockSourceAdapter>(mockClock));
+ int maxTimeMS = 2000;
+ opCtx()->setDeadlineByDate(mockClock->now() + Milliseconds(maxTimeMS),
+ ErrorCodes::MaxTimeMSExpired);
+
+ // txnNumber will be incremented upon the release of a session.
+ resetTxnWithRetries();
+
+ auto swResult = txnWithRetries().runSyncNoThrow(
+ opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) {
+ mockClient()->setNextCommandResponse(kOKInsertResponse);
+ auto insertRes = txnClient
+ .runCommand("user"_sd,
+ BSON("insert"
+ << "foo"
+ << "documents" << BSON_ARRAY(BSON("x" << 1))))
+ .get();
+ ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
+ assertTxnMetadata(mockClient()->getLastSentRequest(),
+ 1 /* txnNumber */,
+ true /* startTransaction */,
+ boost::none /* readConcern */,
+ boost::none /* writeConcern */,
+ maxTimeMS /* maxTimeMS */);
+ assertSessionIdMetadata(mockClient()->getLastSentRequest(), LsidAssertion::kStandalone);
+
+ mockClock->advance(Milliseconds(1000));
+
+ // The commit response.
+ mockClient()->setNextCommandResponse(kOKCommandResponse);
+ return SemiFuture<void>::makeReady();
+ });
+ ASSERT(swResult.getStatus().isOK());
+ ASSERT(swResult.getValue().getEffectiveStatus().isOK());
+
+ auto lastRequest = mockClient()->getLastSentRequest();
+ assertTxnMetadata(lastRequest,
+ 1 /* txnNumber */,
+ boost::none /* startTransaction */,
+ boost::none /* readConcern */,
+ WriteConcernOptions().toBSON() /* writeConcern */,
+ 1000 /* maxTimeMS */);
+ assertSessionIdMetadata(mockClient()->getLastSentRequest(), LsidAssertion::kStandalone);
+ ASSERT_EQ(lastRequest.firstElementFieldNameStringData(), "commitTransaction"_sd);
+}
} // namespace
} // namespace mongo