author     Cheahuychou Mao <mao.cheahuychou@gmail.com>    2021-10-11 18:54:09 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>    2021-10-11 19:38:20 +0000
commit     dccba0661624787d8f398058928b7a29a31a2a86
tree       24b66cefce9a3db8bd94441b291235ad83bc789c /src
parent     13f0ae71f634409f2e219616ac489b45057d56bb
SERVER-60323 Make TransactionCoordinator support txnRetryCounter
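
This change threads a new TxnNumberAndRetryCounter value type through the
TransactionCoordinator, its persistence helpers, and the prepare/commit/abort
commands it sends to participants, so that a coordinator is identified by
lsid + txnNumber + txnRetryCounter. The retry counter is written into the
coordinator document's _id and attached to outgoing commands only when
featureFlagInternalTransactions is enabled; the existing service-layer call
sites pin the counter to 0.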
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/logical_session_id.h                         30
-rw-r--r--  src/mongo/db/s/SConscript                                  1
-rw-r--r--  src/mongo/db/s/transaction_coordinator.cpp                73
-rw-r--r--  src/mongo/db/s/transaction_coordinator.h                  12
-rw-r--r--  src/mongo/db/s/transaction_coordinator_catalog_test.cpp   13
-rw-r--r--  src/mongo/db/s/transaction_coordinator_service.cpp        10
-rw-r--r--  src/mongo/db/s/transaction_coordinator_test.cpp          568
-rw-r--r--  src/mongo/db/s/transaction_coordinator_test_fixture.h      1
-rw-r--r--  src/mongo/db/s/transaction_coordinator_util.cpp          290
-rw-r--r--  src/mongo/db/s/transaction_coordinator_util.h             44
10 files changed, 709 insertions(+), 333 deletions(-)
diff --git a/src/mongo/db/logical_session_id.h b/src/mongo/db/logical_session_id.h
index 3270eece38e..8e04e774a40 100644
--- a/src/mongo/db/logical_session_id.h
+++ b/src/mongo/db/logical_session_id.h
@@ -132,4 +132,34 @@ using LogicalSessionRecordSet = stdx::unordered_set<LogicalSessionRecord, Logica
template <typename T>
using LogicalSessionIdMap = stdx::unordered_map<LogicalSessionId, T, LogicalSessionIdHash>;
+class TxnNumberAndRetryCounter {
+public:
+ TxnNumberAndRetryCounter(TxnNumber txnNumber, TxnRetryCounter txnRetryCounter)
+ : _txnNumber(txnNumber), _txnRetryCounter(txnRetryCounter) {}
+
+ TxnNumberAndRetryCounter(TxnNumber txnNumber)
+ : _txnNumber(txnNumber), _txnRetryCounter(boost::none) {}
+
+ BSONObj toBSON() const {
+ BSONObjBuilder bob;
+ bob.append(OperationSessionInfo::kTxnNumberFieldName, _txnNumber);
+ if (_txnRetryCounter) {
+ bob.append(OperationSessionInfo::kTxnRetryCounterFieldName, *_txnRetryCounter);
+ }
+ return bob.obj();
+ }
+
+ const TxnNumber getTxnNumber() const {
+ return _txnNumber;
+ }
+
+ const boost::optional<TxnRetryCounter> getTxnRetryCounter() const {
+ return _txnRetryCounter;
+ }
+
+private:
+ const TxnNumber _txnNumber;
+ const boost::optional<TxnRetryCounter> _txnRetryCounter;
+};
+
} // namespace mongo
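
A minimal sketch of how the new type behaves, based solely on the class added
above (BSONObj, invariant, and the enclosing mongo namespace are assumed to be
available):

    // Illustrative only: exercises the accessors and toBSON() defined above.
    TxnNumberAndRetryCounter id{5 /* txnNumber */, 0 /* txnRetryCounter */};
    invariant(id.getTxnNumber() == 5);
    invariant(id.getTxnRetryCounter() && *id.getTxnRetryCounter() == 0);
    // toBSON() yields {txnNumber: 5, txnRetryCounter: 0}; with the
    // single-argument constructor the counter is boost::none and the
    // txnRetryCounter field is omitted from the serialized object.
    BSONObj serialized = id.toBSON();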
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 39d418721d5..3a51bcbf981 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -178,6 +178,7 @@ env.Library(
'$BUILD_DIR/mongo/db/commands/server_status',
'$BUILD_DIR/mongo/db/commands/txn_cmd_request',
'$BUILD_DIR/mongo/db/dbdirectclient',
+ '$BUILD_DIR/mongo/db/internal_transactions_feature_flag',
'$BUILD_DIR/mongo/db/not_primary_error_tracker',
'$BUILD_DIR/mongo/db/repl/wait_for_majority_service',
'$BUILD_DIR/mongo/db/rw_concern_d',
diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp
index 35d7f5f00f8..beaeadac455 100644
--- a/src/mongo/db/s/transaction_coordinator.cpp
+++ b/src/mongo/db/s/transaction_coordinator.cpp
@@ -88,19 +88,21 @@ ExecutorFuture<void> waitForMajorityWithHangFailpoint(ServiceContext* service,
} // namespace
-TransactionCoordinator::TransactionCoordinator(OperationContext* operationContext,
- const LogicalSessionId& lsid,
- TxnNumber txnNumber,
- std::unique_ptr<txn::AsyncWorkScheduler> scheduler,
- Date_t deadline)
+TransactionCoordinator::TransactionCoordinator(
+ OperationContext* operationContext,
+ const LogicalSessionId& lsid,
+ const TxnNumberAndRetryCounter& txnNumberAndRetryCounter,
+ std::unique_ptr<txn::AsyncWorkScheduler> scheduler,
+ Date_t deadline)
: _serviceContext(operationContext->getServiceContext()),
_lsid(lsid),
- _txnNumber(txnNumber),
+ _txnNumberAndRetryCounter(txnNumberAndRetryCounter),
_scheduler(std::move(scheduler)),
_sendPrepareScheduler(_scheduler->makeChildScheduler()),
_transactionCoordinatorMetricsObserver(
std::make_unique<TransactionCoordinatorMetricsObserver>()),
_deadline(deadline) {
+ invariant(_txnNumberAndRetryCounter.getTxnRetryCounter());
auto apiParams = APIParameters::get(operationContext);
auto kickOffCommitPF = makePromiseFuture<void>();
@@ -116,7 +118,8 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex
1,
"TransactionCoordinator deadline reached",
"sessionId"_attr = _lsid.getId(),
- "txnNumber"_attr = _txnNumber);
+ "txnNumberAndRetryCounter"_attr =
+ _txnNumberAndRetryCounter);
cancelIfCommitNotYetStarted();
// See the comments for sendPrepare about the purpose of this
@@ -167,7 +170,7 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex
}
return txn::persistParticipantsList(
- *_sendPrepareScheduler, _lsid, _txnNumber, *_participants);
+ *_sendPrepareScheduler, _lsid, _txnNumberAndRetryCounter, *_participants);
})
.then([this](repl::OpTime opTime) {
return waitForMajorityWithHangFailpoint(
@@ -207,7 +210,7 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex
return txn::sendPrepare(_serviceContext,
*_sendPrepareScheduler,
_lsid,
- _txnNumber,
+ _txnNumberAndRetryCounter,
apiParams,
*_participants)
.then([this](PrepareVoteConsensus consensus) mutable {
@@ -217,14 +220,15 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex
}
if (_decision->getDecision() == CommitDecision::kCommit) {
- LOGV2_DEBUG(22446,
- 3,
- "{sessionId}:{txnNumber} Advancing cluster time to "
- "the commit timestamp {commitTimestamp}",
- "Advancing cluster time to the commit timestamp",
- "sessionId"_attr = _lsid.getId(),
- "txnNumber"_attr = _txnNumber,
- "commitTimestamp"_attr = *_decision->getCommitTimestamp());
+ LOGV2_DEBUG(
+ 22446,
+ 3,
+ "{sessionId}:{_txnNumberAndRetryCounter} Advancing cluster time to "
+ "the commit timestamp {commitTimestamp}",
+ "Advancing cluster time to the commit timestamp",
+ "sessionId"_attr = _lsid.getId(),
+ "txnNumberAndRetryCounter"_attr = _txnNumberAndRetryCounter,
+ "commitTimestamp"_attr = *_decision->getCommitTimestamp());
VectorClockMutable::get(_serviceContext)
->tickClusterTimeTo(LogicalTime(*_decision->getCommitTimestamp()));
@@ -232,13 +236,13 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex
});
})
.onError<ErrorCodes::TransactionCoordinatorReachedAbortDecision>(
- [this, lsid, txnNumber](const Status& status) {
+ [this, lsid, txnNumberAndRetryCounter](const Status& status) {
// Timeout happened, propagate the decision to abort the transaction to replicas
// and convert the internal error code to the public one.
LOGV2(5047001,
"Transaction coordinator made abort decision",
"sessionId"_attr = lsid.getId(),
- "txnNumber"_attr = txnNumber,
+ "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter,
"status"_attr = redact(status));
stdx::lock_guard<Latch> lg(_mutex);
_decision = txn::PrepareVote::kAbort;
@@ -266,7 +270,8 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex
return Future<repl::OpTime>::makeReady(repl::OpTime());
}
- return txn::persistDecision(*_scheduler, _lsid, _txnNumber, *_participants, *_decision);
+ return txn::persistDecision(
+ *_scheduler, _lsid, _txnNumberAndRetryCounter, *_participants, *_decision);
})
.then([this](repl::OpTime opTime) {
switch (_decision->getDecision()) {
@@ -313,14 +318,18 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex
return txn::sendCommit(_serviceContext,
*_scheduler,
_lsid,
- _txnNumber,
+ _txnNumberAndRetryCounter,
apiParams,
*_participants,
*_decision->getCommitTimestamp());
}
case CommitDecision::kAbort: {
- return txn::sendAbort(
- _serviceContext, *_scheduler, _lsid, _txnNumber, apiParams, *_participants);
+ return txn::sendAbort(_serviceContext,
+ *_scheduler,
+ _lsid,
+ _txnNumberAndRetryCounter,
+ apiParams,
+ *_participants);
}
default:
MONGO_UNREACHABLE;
@@ -340,7 +349,7 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex
_serviceContext->getPreciseClockSource()->now());
}
- return txn::deleteCoordinatorDoc(*_scheduler, _lsid, _txnNumber);
+ return txn::deleteCoordinatorDoc(*_scheduler, _lsid, _txnNumberAndRetryCounter);
})
.getAsync([this, deadlineFuture = std::move(deadlineFuture)](Status s) mutable {
// Interrupt this coordinator's scheduler hierarchy and join the deadline task's future
@@ -418,15 +427,16 @@ void TransactionCoordinator::_done(Status status) {
// *receiving* node was stepping down.
if (status == ErrorCodes::TransactionCoordinatorSteppingDown)
status = Status(ErrorCodes::InterruptedDueToReplStateChange,
- str::stream() << "Coordinator " << _lsid.getId() << ':' << _txnNumber
+ str::stream() << "Coordinator " << _lsid.getId() << ':'
+ << _txnNumberAndRetryCounter.toBSON()
<< " stopped due to: " << status.reason());
LOGV2_DEBUG(22447,
3,
- "{sessionId}:{txnNumber} Two-phase commit completed with {status}",
+ "{sessionId}:{_txnNumberAndRetryCounter} Two-phase commit completed with {status}",
"Two-phase commit completed",
"sessionId"_attr = _lsid.getId(),
- "txnNumber"_attr = _txnNumber,
+ "txnNumberAndRetryCounter"_attr = _txnNumberAndRetryCounter,
"status"_attr = redact(status));
stdx::unique_lock<Latch> ul(_mutex);
@@ -472,7 +482,8 @@ void TransactionCoordinator::_logSlowTwoPhaseCommit(
_lsid.serialize(&lsidBuilder);
lsidBuilder.doneFast();
- parametersBuilder.append("txnNumber", _txnNumber);
+ parametersBuilder.append("txnNumber", _txnNumberAndRetryCounter.getTxnNumber());
+ parametersBuilder.append("txnRetryCounter", *_txnNumberAndRetryCounter.getTxnRetryCounter());
attrs.add("parameters", parametersBuilder.obj());
@@ -543,7 +554,8 @@ std::string TransactionCoordinator::_twoPhaseCommitInfoForLog(
_lsid.serialize(&lsidBuilder);
lsidBuilder.doneFast();
- parametersBuilder.append("txnNumber", _txnNumber);
+ parametersBuilder.append("txnNumber", _txnNumberAndRetryCounter.getTxnNumber());
+ parametersBuilder.append("txnRetryCounter", *_txnNumberAndRetryCounter.getTxnRetryCounter());
s << " parameters:" << parametersBuilder.obj().toString();
@@ -616,7 +628,8 @@ void TransactionCoordinator::reportState(BSONObjBuilder& parent) const {
BSONObjBuilder lsidBuilder(doc.subobjStart("lsid"));
_lsid.serialize(&lsidBuilder);
lsidBuilder.doneFast();
- doc.append("txnNumber", _txnNumber);
+ doc.append("txnNumber", _txnNumberAndRetryCounter.getTxnNumber());
+ doc.append("txnRetryCounter", *_txnNumberAndRetryCounter.getTxnRetryCounter());
if (_participants) {
doc.append("numParticipants", static_cast<long long>(_participants->size()));
diff --git a/src/mongo/db/s/transaction_coordinator.h b/src/mongo/db/s/transaction_coordinator.h
index cdc76424cbb..8adea56aab2 100644
--- a/src/mongo/db/s/transaction_coordinator.h
+++ b/src/mongo/db/s/transaction_coordinator.h
@@ -40,7 +40,7 @@ class TransactionCoordinatorMetricsObserver;
/**
* State machine, which implements the two-phase commit protocol for a specific transaction,
- * identified by lsid + txnNumber.
+ * identified by lsid + txnNumber + txnRetryCounter.
*
* The lifetime of a coordinator starts with a construction and ends with the `onCompletion()`
* future getting signaled. It is illegal to destroy a coordinator without waiting for
@@ -64,8 +64,8 @@ public:
};
/**
- * Instantiates a new TransactioncCoordinator for the specified lsid + txnNumber pair and gives
- * it a 'scheduler' to use for any asynchronous tasks it spawns.
+ * Instantiates a new TransactionCoordinator for the specified lsid + txnNumber +
+ * txnRetryCounter and gives it a 'scheduler' to use for any asynchronous tasks it spawns.
*
* If the 'coordinateCommitDeadline' parameter is specified, a timed task will be scheduled to
* cause the coordinator to be put in a cancelled state, if runCommit is not eventually
@@ -73,7 +73,7 @@ public:
*/
TransactionCoordinator(OperationContext* operationContext,
const LogicalSessionId& lsid,
- TxnNumber txnNumber,
+ const TxnNumberAndRetryCounter& txnNumberAndRetryCounter,
std::unique_ptr<txn::AsyncWorkScheduler> scheduler,
Date_t deadline);
@@ -151,9 +151,9 @@ private:
// Shortcut to the service context under which this coordinator runs
ServiceContext* const _serviceContext;
- // The lsid + transaction number that this coordinator is coordinating
+ // The lsid + txnNumber + txnRetryCounter that this coordinator is coordinating.
const LogicalSessionId _lsid;
- const TxnNumber _txnNumber;
+ const TxnNumberAndRetryCounter _txnNumberAndRetryCounter;
// Scheduler and context wrapping all asynchronous work dispatched by this coordinator
std::unique_ptr<txn::AsyncWorkScheduler> _scheduler;
diff --git a/src/mongo/db/s/transaction_coordinator_catalog_test.cpp b/src/mongo/db/s/transaction_coordinator_catalog_test.cpp
index 891191ab220..1044412c91b 100644
--- a/src/mongo/db/s/transaction_coordinator_catalog_test.cpp
+++ b/src/mongo/db/s/transaction_coordinator_catalog_test.cpp
@@ -59,7 +59,7 @@ protected:
auto newCoordinator = std::make_shared<TransactionCoordinator>(
operationContext(),
lsid,
- txnNumber,
+ TxnNumberAndRetryCounter{txnNumber, 0},
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
Date_t::max());
@@ -172,11 +172,12 @@ TEST_F(TransactionCoordinatorCatalogTest, StepDownBeforeCoordinatorInsertedIntoC
TransactionCoordinatorCatalog catalog;
catalog.exitStepUp(Status::OK());
- auto coordinator = std::make_shared<TransactionCoordinator>(operationContext(),
- lsid,
- txnNumber,
- aws.makeChildScheduler(),
- network()->now() + Seconds{5});
+ auto coordinator =
+ std::make_shared<TransactionCoordinator>(operationContext(),
+ lsid,
+ TxnNumberAndRetryCounter{txnNumber, 0},
+ aws.makeChildScheduler(),
+ network()->now() + Seconds{5});
aws.shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "Test step down"});
catalog.onStepDown();
diff --git a/src/mongo/db/s/transaction_coordinator_service.cpp b/src/mongo/db/s/transaction_coordinator_service.cpp
index 349db0ac079..cbefeb5eb25 100644
--- a/src/mongo/db/s/transaction_coordinator_service.cpp
+++ b/src/mongo/db/s/transaction_coordinator_service.cpp
@@ -80,8 +80,12 @@ void TransactionCoordinatorService::createCoordinator(OperationContext* opCtx,
latestCoordinator->cancelIfCommitNotYetStarted();
}
- auto coordinator = std::make_shared<TransactionCoordinator>(
- opCtx, lsid, txnNumber, scheduler.makeChildScheduler(), commitDeadline);
+ auto coordinator =
+ std::make_shared<TransactionCoordinator>(opCtx,
+ lsid,
+ TxnNumberAndRetryCounter{txnNumber, 0},
+ scheduler.makeChildScheduler(),
+ commitDeadline);
try {
catalog.insert(opCtx, lsid, txnNumber, coordinator);
@@ -240,7 +244,7 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx,
auto coordinator = std::make_shared<TransactionCoordinator>(
opCtx,
lsid,
- txnNumber,
+ TxnNumberAndRetryCounter{txnNumber, 0},
scheduler.makeChildScheduler(),
clockSource->now() + Seconds(gTransactionLifetimeLimitSeconds.load()));
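
Note that both service-layer call sites above pin the retry counter to 0 rather
than passing boost::none; the coordinator constructor invariants that a counter
is always present, so the two-argument TxnNumberAndRetryCounter constructor is
the one exercised on these paths. The catalog, meanwhile, is still keyed by
lsid and txnNumber alone (catalog.insert above is unchanged).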
diff --git a/src/mongo/db/s/transaction_coordinator_test.cpp b/src/mongo/db/s/transaction_coordinator_test.cpp
index 602165fceb2..43fe5c3e48a 100644
--- a/src/mongo/db/s/transaction_coordinator_test.cpp
+++ b/src/mongo/db/s/transaction_coordinator_test.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/s/transaction_coordinator_document_gen.h"
#include "mongo/db/s/transaction_coordinator_metrics_observer.h"
#include "mongo/db/s/transaction_coordinator_test_fixture.h"
+#include "mongo/idl/server_parameter_test_util.h"
#include "mongo/logv2/log.h"
#include "mongo/unittest/log_test.h"
#include "mongo/unittest/unittest.h"
@@ -177,7 +178,8 @@ protected:
}
LogicalSessionId _lsid{makeLogicalSessionIdForTest()};
- TxnNumber _txnNumber{1};
+ TxnNumberAndRetryCounter _txnNumberAndRetryCounter{1, 1};
+ RAIIServerParameterControllerForTest _controller{"featureFlagInternalTransactions", true};
};
class TransactionCoordinatorDriverTest : public TransactionCoordinatorTestBase {
@@ -194,25 +196,28 @@ protected:
boost::optional<txn::AsyncWorkScheduler> _aws;
};
-auto makeDummyPrepareCommand(const LogicalSessionId& lsid, const TxnNumber& txnNumber) {
+auto makeDummyPrepareCommand(const LogicalSessionId& lsid,
+ const TxnNumberAndRetryCounter& txnNumberAndRetryCounter) {
PrepareTransaction prepareCmd;
prepareCmd.setDbName(NamespaceString::kAdminDb);
auto prepareObj = prepareCmd.toBSON(
- BSON("lsid" << lsid.toBSON() << "txnNumber" << txnNumber << "autocommit" << false
- << WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority));
-
+ BSON("lsid" << lsid.toBSON() << "txnNumber" << txnNumberAndRetryCounter.getTxnNumber()
+ << "txnRetryCounter" << *txnNumberAndRetryCounter.getTxnRetryCounter()
+ << "autocommit" << false << WriteConcernOptions::kWriteConcernField
+ << WriteConcernOptions::Majority));
return prepareObj;
}
TEST_F(TransactionCoordinatorDriverTest, SendDecisionToParticipantShardReturnsOnImmediateSuccess) {
txn::AsyncWorkScheduler aws(getServiceContext());
- Future<void> future = txn::sendDecisionToShard(getServiceContext(),
- aws,
- _lsid,
- _txnNumber,
- kTwoShardIdList[0],
- makeDummyPrepareCommand(_lsid, _txnNumber));
+ Future<void> future =
+ txn::sendDecisionToShard(getServiceContext(),
+ aws,
+ _lsid,
+ _txnNumberAndRetryCounter,
+ kTwoShardIdList[0],
+ makeDummyPrepareCommand(_lsid, _txnNumberAndRetryCounter));
ASSERT(!future.isReady());
assertPrepareSentAndRespondWithSuccess();
@@ -223,12 +228,13 @@ TEST_F(TransactionCoordinatorDriverTest, SendDecisionToParticipantShardReturnsOn
TEST_F(TransactionCoordinatorDriverTest,
SendDecisionToParticipantShardReturnsSuccessAfterOneFailureAndThenSuccess) {
txn::AsyncWorkScheduler aws(getServiceContext());
- Future<void> future = txn::sendDecisionToShard(getServiceContext(),
- aws,
- _lsid,
- _txnNumber,
- kTwoShardIdList[0],
- makeDummyPrepareCommand(_lsid, _txnNumber));
+ Future<void> future =
+ txn::sendDecisionToShard(getServiceContext(),
+ aws,
+ _lsid,
+ _txnNumberAndRetryCounter,
+ kTwoShardIdList[0],
+ makeDummyPrepareCommand(_lsid, _txnNumberAndRetryCounter));
ASSERT(!future.isReady());
assertPrepareSentAndRespondWithRetryableError();
@@ -242,12 +248,13 @@ TEST_F(TransactionCoordinatorDriverTest,
TEST_F(TransactionCoordinatorDriverTest,
SendDecisionToParticipantShardReturnsSuccessAfterSeveralFailuresAndThenSuccess) {
txn::AsyncWorkScheduler aws(getServiceContext());
- Future<void> future = txn::sendDecisionToShard(getServiceContext(),
- aws,
- _lsid,
- _txnNumber,
- kTwoShardIdList[0],
- makeDummyPrepareCommand(_lsid, _txnNumber));
+ Future<void> future =
+ txn::sendDecisionToShard(getServiceContext(),
+ aws,
+ _lsid,
+ _txnNumberAndRetryCounter,
+ kTwoShardIdList[0],
+ makeDummyPrepareCommand(_lsid, _txnNumberAndRetryCounter));
assertPrepareSentAndRespondWithRetryableError();
assertPrepareSentAndRespondWithRetryableError();
@@ -260,12 +267,13 @@ TEST_F(TransactionCoordinatorDriverTest,
TEST_F(TransactionCoordinatorDriverTest,
SendDecisionToParticipantShardInterpretsVoteToAbortAsSuccess) {
txn::AsyncWorkScheduler aws(getServiceContext());
- Future<void> future = txn::sendDecisionToShard(getServiceContext(),
- aws,
- _lsid,
- _txnNumber,
- kTwoShardIdList[0],
- makeDummyPrepareCommand(_lsid, _txnNumber));
+ Future<void> future =
+ txn::sendDecisionToShard(getServiceContext(),
+ aws,
+ _lsid,
+ _txnNumberAndRetryCounter,
+ kTwoShardIdList[0],
+ makeDummyPrepareCommand(_lsid, _txnNumberAndRetryCounter));
assertPrepareSentAndRespondWithNoSuchTransaction();
@@ -275,12 +283,13 @@ TEST_F(TransactionCoordinatorDriverTest,
TEST_F(TransactionCoordinatorDriverTest,
SendDecisionToParticipantShardCanBeInterruptedAndReturnsError) {
txn::AsyncWorkScheduler aws(getServiceContext());
- Future<void> future = txn::sendDecisionToShard(getServiceContext(),
- aws,
- _lsid,
- _txnNumber,
- kTwoShardIdList[0],
- makeDummyPrepareCommand(_lsid, _txnNumber));
+ Future<void> future =
+ txn::sendDecisionToShard(getServiceContext(),
+ aws,
+ _lsid,
+ _txnNumberAndRetryCounter,
+ kTwoShardIdList[0],
+ makeDummyPrepareCommand(_lsid, _txnNumberAndRetryCounter));
assertPrepareSentAndRespondWithRetryableError();
aws.shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "Shutdown for test"});
@@ -296,9 +305,9 @@ TEST_F(TransactionCoordinatorDriverTest, SendPrepareToShardReturnsCommitDecision
txn::sendPrepareToShard(getServiceContext(),
aws,
_lsid,
- _txnNumber,
+ _txnNumberAndRetryCounter,
kTwoShardIdList[0],
- makeDummyPrepareCommand(_lsid, _txnNumber));
+ makeDummyPrepareCommand(_lsid, _txnNumberAndRetryCounter));
ASSERT(!future.isReady());
assertPrepareSentAndRespondWithSuccess();
@@ -315,9 +324,9 @@ TEST_F(TransactionCoordinatorDriverTest,
txn::sendPrepareToShard(getServiceContext(),
aws,
_lsid,
- _txnNumber,
+ _txnNumberAndRetryCounter,
kTwoShardIdList[0],
- makeDummyPrepareCommand(_lsid, _txnNumber));
+ makeDummyPrepareCommand(_lsid, _txnNumberAndRetryCounter));
ASSERT(!future.isReady());
assertPrepareSentAndRespondWithRetryableError();
@@ -337,9 +346,9 @@ TEST_F(TransactionCoordinatorDriverTest,
txn::sendPrepareToShard(getServiceContext(),
aws,
_lsid,
- _txnNumber,
+ _txnNumberAndRetryCounter,
kTwoShardIdList[0],
- makeDummyPrepareCommand(_lsid, _txnNumber));
+ makeDummyPrepareCommand(_lsid, _txnNumberAndRetryCounter));
assertPrepareSentAndRespondWithRetryableError();
const auto shutdownStatus =
@@ -360,9 +369,9 @@ TEST_F(TransactionCoordinatorDriverTest,
txn::sendPrepareToShard(getServiceContext(),
aws,
_lsid,
- _txnNumber,
+ _txnNumberAndRetryCounter,
kTwoShardIdList[0],
- makeDummyPrepareCommand(_lsid, _txnNumber));
+ makeDummyPrepareCommand(_lsid, _txnNumberAndRetryCounter));
assertPrepareSentAndRespondWithRetryableError();
aws.shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "Service shutting down"});
@@ -375,12 +384,13 @@ TEST_F(TransactionCoordinatorDriverTest,
TEST_F(TransactionCoordinatorDriverTest,
SendPrepareToShardReturnsAbortDecisionOnVoteAbortResponse) {
txn::AsyncWorkScheduler aws(getServiceContext());
- auto future = txn::sendPrepareToShard(getServiceContext(),
- aws,
- _lsid,
- _txnNumber,
- kTwoShardIdList[0],
- makeDummyPrepareCommand(_lsid, _txnNumber));
+ auto future =
+ txn::sendPrepareToShard(getServiceContext(),
+ aws,
+ _lsid,
+ _txnNumberAndRetryCounter,
+ kTwoShardIdList[0],
+ makeDummyPrepareCommand(_lsid, _txnNumberAndRetryCounter));
assertPrepareSentAndRespondWithNoSuchTransaction();
@@ -394,12 +404,13 @@ TEST_F(TransactionCoordinatorDriverTest,
TEST_F(TransactionCoordinatorDriverTest,
SendPrepareToShardReturnsAbortDecisionOnRetryableErrorThenVoteAbortResponse) {
txn::AsyncWorkScheduler aws(getServiceContext());
- auto future = txn::sendPrepareToShard(getServiceContext(),
- aws,
- _lsid,
- _txnNumber,
- kTwoShardIdList[0],
- makeDummyPrepareCommand(_lsid, _txnNumber));
+ auto future =
+ txn::sendPrepareToShard(getServiceContext(),
+ aws,
+ _lsid,
+ _txnNumberAndRetryCounter,
+ kTwoShardIdList[0],
+ makeDummyPrepareCommand(_lsid, _txnNumberAndRetryCounter));
assertPrepareSentAndRespondWithRetryableError();
assertPrepareSentAndRespondWithNoSuchTransaction();
@@ -414,8 +425,12 @@ TEST_F(TransactionCoordinatorDriverTest,
TEST_F(TransactionCoordinatorDriverTest,
SendPrepareReturnsAbortDecisionWhenFirstParticipantVotesAbortAndSecondVotesCommit) {
txn::AsyncWorkScheduler aws(getServiceContext());
- auto future = txn::sendPrepare(
- getServiceContext(), aws, _lsid, _txnNumber, APIParameters(), kTwoShardIdList);
+ auto future = txn::sendPrepare(getServiceContext(),
+ aws,
+ _lsid,
+ _txnNumberAndRetryCounter,
+ APIParameters(),
+ kTwoShardIdList);
onCommands({[&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; },
[&](const executor::RemoteCommandRequest& request) { return kPrepareOk; }});
@@ -430,8 +445,12 @@ TEST_F(TransactionCoordinatorDriverTest,
TEST_F(TransactionCoordinatorDriverTest,
SendPrepareReturnsAbortDecisionWhenFirstParticipantVotesCommitAndSecondVotesAbort) {
txn::AsyncWorkScheduler aws(getServiceContext());
- auto future = txn::sendPrepare(
- getServiceContext(), aws, _lsid, _txnNumber, APIParameters(), kTwoShardIdList);
+ auto future = txn::sendPrepare(getServiceContext(),
+ aws,
+ _lsid,
+ _txnNumberAndRetryCounter,
+ APIParameters(),
+ kTwoShardIdList);
assertPrepareSentAndRespondWithSuccess();
assertPrepareSentAndRespondWithNoSuchTransaction();
@@ -445,8 +464,12 @@ TEST_F(TransactionCoordinatorDriverTest,
TEST_F(TransactionCoordinatorDriverTest,
SendPrepareReturnsAbortDecisionWhenBothParticipantsVoteAbort) {
txn::AsyncWorkScheduler aws(getServiceContext());
- auto future = txn::sendPrepare(
- getServiceContext(), aws, _lsid, _txnNumber, APIParameters(), kTwoShardIdList);
+ auto future = txn::sendPrepare(getServiceContext(),
+ aws,
+ _lsid,
+ _txnNumberAndRetryCounter,
+ APIParameters(),
+ kTwoShardIdList);
onCommands({[&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; },
[&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; }});
@@ -463,8 +486,12 @@ TEST_F(TransactionCoordinatorDriverTest,
const auto maxPrepareTimestamp = Timestamp(2, 1);
txn::AsyncWorkScheduler aws(getServiceContext());
- auto future = txn::sendPrepare(
- getServiceContext(), aws, _lsid, _txnNumber, APIParameters(), kTwoShardIdList);
+ auto future = txn::sendPrepare(getServiceContext(),
+ aws,
+ _lsid,
+ _txnNumberAndRetryCounter,
+ APIParameters(),
+ kTwoShardIdList);
assertPrepareSentAndRespondWithSuccess(firstPrepareTimestamp);
assertPrepareSentAndRespondWithSuccess(maxPrepareTimestamp);
@@ -481,8 +508,12 @@ TEST_F(TransactionCoordinatorDriverTest,
const auto maxPrepareTimestamp = Timestamp(2, 1);
txn::AsyncWorkScheduler aws(getServiceContext());
- auto future = txn::sendPrepare(
- getServiceContext(), aws, _lsid, _txnNumber, APIParameters(), kTwoShardIdList);
+ auto future = txn::sendPrepare(getServiceContext(),
+ aws,
+ _lsid,
+ _txnNumberAndRetryCounter,
+ APIParameters(),
+ kTwoShardIdList);
assertPrepareSentAndRespondWithSuccess(maxPrepareTimestamp);
assertPrepareSentAndRespondWithSuccess(firstPrepareTimestamp);
@@ -499,8 +530,12 @@ TEST_F(TransactionCoordinatorDriverTest,
const auto maxPrepareTimestamp = Timestamp(2, 1);
txn::AsyncWorkScheduler aws(getServiceContext());
- auto future = txn::sendPrepare(
- getServiceContext(), aws, _lsid, _txnNumber, APIParameters(), kTwoShardIdList);
+ auto future = txn::sendPrepare(getServiceContext(),
+ aws,
+ _lsid,
+ _txnNumberAndRetryCounter,
+ APIParameters(),
+ kTwoShardIdList);
assertPrepareSentAndRespondWithSuccess(firstPrepareTimestamp);
assertPrepareSentAndRespondWithSuccess(maxPrepareTimestamp);
@@ -516,8 +551,12 @@ TEST_F(TransactionCoordinatorDriverTest,
const auto timestamp = Timestamp(1, 1);
txn::AsyncWorkScheduler aws(getServiceContext());
- auto future = txn::sendPrepare(
- getServiceContext(), aws, _lsid, _txnNumber, APIParameters(), kTwoShardIdList);
+ auto future = txn::sendPrepare(getServiceContext(),
+ aws,
+ _lsid,
+ _txnNumberAndRetryCounter,
+ APIParameters(),
+ kTwoShardIdList);
assertPrepareSentAndRespondWithSuccess(timestamp);
assertCommandSentAndRespondWith(
@@ -533,8 +572,12 @@ TEST_F(TransactionCoordinatorDriverTest,
TEST_F(TransactionCoordinatorDriverTest,
SendPrepareReturnsErrorWhenOneShardReturnsReadConcernMajorityNotEnabled) {
txn::AsyncWorkScheduler aws(getServiceContext());
- auto future = txn::sendPrepare(
- getServiceContext(), aws, _lsid, _txnNumber, APIParameters(), kTwoShardIdList);
+ auto future = txn::sendPrepare(getServiceContext(),
+ aws,
+ _lsid,
+ _txnNumberAndRetryCounter,
+ APIParameters(),
+ kTwoShardIdList);
assertPrepareSentAndRespondWithSuccess(Timestamp(100, 1));
assertCommandSentAndRespondWith(
@@ -550,6 +593,95 @@ TEST_F(TransactionCoordinatorDriverTest,
ASSERT_EQ(ErrorCodes::ReadConcernMajorityNotEnabled, decision.getAbortStatus()->code());
}
+TEST_F(TransactionCoordinatorDriverTest,
+ SendPrepareAndDecisionAttachTxnRetryCounterIfFeatureFlagIsEnabled) {
+ txn::AsyncWorkScheduler aws(getServiceContext());
+ auto prepareFuture = txn::sendPrepare(getServiceContext(),
+ aws,
+ _lsid,
+ _txnNumberAndRetryCounter,
+ APIParameters(),
+ kOneShardIdList);
+ onCommands({[&](const executor::RemoteCommandRequest& request) {
+ ASSERT_TRUE(request.cmdObj.hasField("txnRetryCounter"));
+ ASSERT_EQUALS(request.cmdObj.getIntField("txnRetryCounter"),
+ *_txnNumberAndRetryCounter.getTxnRetryCounter());
+ return kNoSuchTransaction;
+ }});
+ prepareFuture.get();
+
+ auto commitFuture = txn::sendCommit(getServiceContext(),
+ aws,
+ _lsid,
+ _txnNumberAndRetryCounter,
+ APIParameters(),
+ kOneShardIdList,
+ {});
+ onCommands({[&](const executor::RemoteCommandRequest& request) {
+ ASSERT_TRUE(request.cmdObj.hasField("txnRetryCounter"));
+ ASSERT_EQUALS(request.cmdObj.getIntField("txnRetryCounter"),
+ *_txnNumberAndRetryCounter.getTxnRetryCounter());
+ return kNoSuchTransaction;
+ }});
+ commitFuture.get();
+
+ auto abortFuture = txn::sendAbort(getServiceContext(),
+ aws,
+ _lsid,
+ _txnNumberAndRetryCounter,
+ APIParameters(),
+ kOneShardIdList);
+ onCommands({[&](const executor::RemoteCommandRequest& request) {
+ ASSERT_TRUE(request.cmdObj.hasField("txnRetryCounter"));
+ ASSERT_EQUALS(request.cmdObj.getIntField("txnRetryCounter"),
+ *_txnNumberAndRetryCounter.getTxnRetryCounter());
+ return kNoSuchTransaction;
+ }});
+ abortFuture.get();
+}
+
+TEST_F(TransactionCoordinatorDriverTest,
+ SendPrepareAndDecisionDoesNotAttachTxnRetryCounterIfFeatureFlagIsNotEnabled) {
+ RAIIServerParameterControllerForTest controller{"featureFlagInternalTransactions", false};
+ txn::AsyncWorkScheduler aws(getServiceContext());
+ auto prepareFuture = txn::sendPrepare(getServiceContext(),
+ aws,
+ _lsid,
+ _txnNumberAndRetryCounter,
+ APIParameters(),
+ kOneShardIdList);
+ onCommands({[&](const executor::RemoteCommandRequest& request) {
+ ASSERT_FALSE(request.cmdObj.hasField("txnRetryCounter"));
+ return kNoSuchTransaction;
+ }});
+ prepareFuture.get();
+
+ auto commitFuture = txn::sendCommit(getServiceContext(),
+ aws,
+ _lsid,
+ _txnNumberAndRetryCounter,
+ APIParameters(),
+ kOneShardIdList,
+ {});
+ onCommands({[&](const executor::RemoteCommandRequest& request) {
+ ASSERT_FALSE(request.cmdObj.hasField("txnRetryCounter"));
+ return kNoSuchTransaction;
+ }});
+ commitFuture.get();
+
+ auto abortFuture = txn::sendAbort(getServiceContext(),
+ aws,
+ _lsid,
+ _txnNumberAndRetryCounter,
+ APIParameters(),
+ kOneShardIdList);
+ onCommands({[&](const executor::RemoteCommandRequest& request) {
+ ASSERT_FALSE(request.cmdObj.hasField("txnRetryCounter"));
+ return kNoSuchTransaction;
+ }});
+ abortFuture.get();
+}
+
class TransactionCoordinatorDriverPersistenceTest : public TransactionCoordinatorDriverTest {
protected:
void setUp() override {
@@ -565,14 +697,17 @@ protected:
static void assertDocumentMatches(
TransactionCoordinatorDocument doc,
LogicalSessionId expectedLsid,
- TxnNumber expectedTxnNum,
+ TxnNumberAndRetryCounter expectedTxnNumberAndRetryCounter,
std::vector<ShardId> expectedParticipants,
boost::optional<txn::CommitDecision> expectedDecision = boost::none,
boost::optional<Timestamp> expectedCommitTimestamp = boost::none) {
ASSERT(doc.getId().getSessionId());
ASSERT_EQUALS(*doc.getId().getSessionId(), expectedLsid);
ASSERT(doc.getId().getTxnNumber());
- ASSERT_EQUALS(*doc.getId().getTxnNumber(), expectedTxnNum);
+ ASSERT_EQUALS(*doc.getId().getTxnNumber(), expectedTxnNumberAndRetryCounter.getTxnNumber());
+ ASSERT(doc.getId().getTxnRetryCounter());
+ ASSERT_EQUALS(*doc.getId().getTxnRetryCounter(),
+ *expectedTxnNumberAndRetryCounter.getTxnRetryCounter());
ASSERT(doc.getParticipants() == expectedParticipants);
@@ -593,23 +728,23 @@ protected:
void persistParticipantListExpectSuccess(OperationContext* opCtx,
LogicalSessionId lsid,
- TxnNumber txnNumber,
+ TxnNumberAndRetryCounter txnNumberAndRetryCounter,
const std::vector<ShardId>& participants) {
- txn::persistParticipantsList(*_aws, lsid, txnNumber, participants).get();
+ txn::persistParticipantsList(*_aws, lsid, txnNumberAndRetryCounter, participants).get();
auto allCoordinatorDocs = txn::readAllCoordinatorDocs(opCtx);
ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(1));
- assertDocumentMatches(allCoordinatorDocs[0], lsid, txnNumber, participants);
+ assertDocumentMatches(allCoordinatorDocs[0], lsid, txnNumberAndRetryCounter, participants);
}
void persistDecisionExpectSuccess(OperationContext* opCtx,
LogicalSessionId lsid,
- TxnNumber txnNumber,
+ TxnNumberAndRetryCounter txnNumberAndRetryCounter,
const std::vector<ShardId>& participants,
const boost::optional<Timestamp>& commitTimestamp) {
txn::persistDecision(*_aws,
lsid,
- txnNumber,
+ txnNumberAndRetryCounter,
participants,
[&] {
txn::CoordinatorCommitDecision decision;
@@ -630,20 +765,23 @@ protected:
if (commitTimestamp) {
assertDocumentMatches(allCoordinatorDocs[0],
lsid,
- txnNumber,
+ txnNumberAndRetryCounter,
participants,
txn::CommitDecision::kCommit,
*commitTimestamp);
} else {
- assertDocumentMatches(
- allCoordinatorDocs[0], lsid, txnNumber, participants, txn::CommitDecision::kAbort);
+ assertDocumentMatches(allCoordinatorDocs[0],
+ lsid,
+ txnNumberAndRetryCounter,
+ participants,
+ txn::CommitDecision::kAbort);
}
}
void deleteCoordinatorDocExpectSuccess(OperationContext* opCtx,
LogicalSessionId lsid,
- TxnNumber txnNumber) {
- txn::deleteCoordinatorDoc(*_aws, lsid, txnNumber).get();
+ TxnNumberAndRetryCounter txnNumberAndRetryCounter) {
+ txn::deleteCoordinatorDoc(*_aws, lsid, txnNumberAndRetryCounter).get();
auto allCoordinatorDocs = txn::readAllCoordinatorDocs(opCtx);
ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(0));
@@ -659,13 +797,16 @@ protected:
TEST_F(TransactionCoordinatorDriverPersistenceTest,
PersistParticipantListWhenNoDocumentForTransactionExistsSucceeds) {
- persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants);
+ persistParticipantListExpectSuccess(
+ operationContext(), _lsid, _txnNumberAndRetryCounter, _participants);
}
TEST_F(TransactionCoordinatorDriverPersistenceTest,
PersistParticipantListWhenMatchingDocumentForTransactionExistsSucceeds) {
- persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants);
- persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants);
+ persistParticipantListExpectSuccess(
+ operationContext(), _lsid, _txnNumberAndRetryCounter, _participants);
+ persistParticipantListExpectSuccess(
+ operationContext(), _lsid, _txnNumberAndRetryCounter, _participants);
}
TEST_F(TransactionCoordinatorDriverPersistenceTest,
@@ -673,12 +814,13 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest,
auto opCtx = operationContext();
std::vector<ShardId> participants{
ShardId("shard0001"), ShardId("shard0002"), ShardId("shard0003")};
- persistParticipantListExpectSuccess(opCtx, _lsid, _txnNumber, participants);
+ persistParticipantListExpectSuccess(opCtx, _lsid, _txnNumberAndRetryCounter, participants);
// We should retry until shutdown. The original participants should be persisted.
std::vector<ShardId> smallerParticipantList{ShardId("shard0001"), ShardId("shard0002")};
- auto future = txn::persistParticipantsList(*_aws, _lsid, _txnNumber, smallerParticipantList);
+ auto future = txn::persistParticipantsList(
+ *_aws, _lsid, _txnNumberAndRetryCounter, smallerParticipantList);
_aws->shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "Shutdown for test"});
advanceClockAndExecuteScheduledTasks();
@@ -687,24 +829,37 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest,
auto allCoordinatorDocs = txn::readAllCoordinatorDocs(opCtx);
ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(1));
- assertDocumentMatches(allCoordinatorDocs[0], _lsid, _txnNumber, participants);
+ assertDocumentMatches(allCoordinatorDocs[0], _lsid, _txnNumberAndRetryCounter, participants);
}
TEST_F(TransactionCoordinatorDriverPersistenceTest,
PersistParticipantListForMultipleTransactionsOnSameSession) {
for (int i = 1; i <= 3; i++) {
- auto txnNumber = TxnNumber{i};
- txn::persistParticipantsList(*_aws, _lsid, txnNumber, _participants).get();
+ txn::persistParticipantsList(
+ *_aws, _lsid, {i, *_txnNumberAndRetryCounter.getTxnRetryCounter()}, _participants)
+ .get();
auto allCoordinatorDocs = txn::readAllCoordinatorDocs(operationContext());
ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(i));
}
}
+TEST_F(TransactionCoordinatorDriverPersistenceTest,
+ PersistParticipantListForOneTransactionMultipleTxnRetryCountersOnSameSession) {
+ const auto numRetries = 3;
+ for (int i = 1; i <= numRetries; i++) {
+ txn::persistParticipantsList(
+ *_aws, _lsid, {_txnNumberAndRetryCounter.getTxnNumber(), i}, _participants)
+ .get();
+ auto allCoordinatorDocs = txn::readAllCoordinatorDocs(operationContext());
+ ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(i));
+ }
+}
+
TEST_F(TransactionCoordinatorDriverPersistenceTest, PersistParticipantListForMultipleSessions) {
for (int i = 1; i <= 3; i++) {
auto lsid = makeLogicalSessionIdForTest();
- txn::persistParticipantsList(*_aws, lsid, _txnNumber, _participants).get();
+ txn::persistParticipantsList(*_aws, lsid, _txnNumberAndRetryCounter, _participants).get();
auto allCoordinatorDocs = txn::readAllCoordinatorDocs(operationContext());
ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(i));
@@ -713,19 +868,29 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest, PersistParticipantListForMul
TEST_F(TransactionCoordinatorDriverPersistenceTest,
PersistAbortDecisionWhenDocumentExistsWithoutDecisionSucceeds) {
- persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants);
- persistDecisionExpectSuccess(
- operationContext(), _lsid, _txnNumber, _participants, boost::none /* abort */);
+ persistParticipantListExpectSuccess(
+ operationContext(), _lsid, _txnNumberAndRetryCounter, _participants);
+ persistDecisionExpectSuccess(operationContext(),
+ _lsid,
+ _txnNumberAndRetryCounter,
+ _participants,
+ boost::none /* abort */);
}
TEST_F(TransactionCoordinatorDriverPersistenceTest,
PersistAbortDecisionWhenDocumentExistsWithSameDecisionSucceeds) {
-
- persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants);
- persistDecisionExpectSuccess(
- operationContext(), _lsid, _txnNumber, _participants, boost::none /* abort */);
- persistDecisionExpectSuccess(
- operationContext(), _lsid, _txnNumber, _participants, boost::none /* abort */);
+ persistParticipantListExpectSuccess(
+ operationContext(), _lsid, _txnNumberAndRetryCounter, _participants);
+ persistDecisionExpectSuccess(operationContext(),
+ _lsid,
+ _txnNumberAndRetryCounter,
+ _participants,
+ boost::none /* abort */);
+ persistDecisionExpectSuccess(operationContext(),
+ _lsid,
+ _txnNumberAndRetryCounter,
+ _participants,
+ boost::none /* abort */);
}
TEST_F(TransactionCoordinatorDriverPersistenceTest,
@@ -734,7 +899,7 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest,
{
FailPointEnableBlock failpoint("hangBeforeWritingDecision");
- future = txn::persistDecision(*_aws, _lsid, _txnNumber, _participants, [&] {
+ future = txn::persistDecision(*_aws, _lsid, _txnNumberAndRetryCounter, _participants, [&] {
txn::CoordinatorCommitDecision decision(txn::CommitDecision::kCommit);
decision.setCommitTimestamp(_commitTimestamp);
return decision;
@@ -749,56 +914,108 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest,
TEST_F(TransactionCoordinatorDriverPersistenceTest,
PersistCommitDecisionWhenDocumentExistsWithoutDecisionSucceeds) {
- persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants);
- persistDecisionExpectSuccess(
- operationContext(), _lsid, _txnNumber, _participants, _commitTimestamp /* commit */);
+ persistParticipantListExpectSuccess(
+ operationContext(), _lsid, _txnNumberAndRetryCounter, _participants);
+ persistDecisionExpectSuccess(operationContext(),
+ _lsid,
+ _txnNumberAndRetryCounter,
+ _participants,
+ _commitTimestamp /* commit */);
}
TEST_F(TransactionCoordinatorDriverPersistenceTest,
PersistCommitDecisionWhenDocumentExistsWithSameDecisionSucceeds) {
- persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants);
- persistDecisionExpectSuccess(
- operationContext(), _lsid, _txnNumber, _participants, _commitTimestamp /* commit */);
- persistDecisionExpectSuccess(
- operationContext(), _lsid, _txnNumber, _participants, _commitTimestamp /* commit */);
+ persistParticipantListExpectSuccess(
+ operationContext(), _lsid, _txnNumberAndRetryCounter, _participants);
+ persistDecisionExpectSuccess(operationContext(),
+ _lsid,
+ _txnNumberAndRetryCounter,
+ _participants,
+ _commitTimestamp /* commit */);
+ persistDecisionExpectSuccess(operationContext(),
+ _lsid,
+ _txnNumberAndRetryCounter,
+ _participants,
+ _commitTimestamp /* commit */);
}
TEST_F(TransactionCoordinatorDriverPersistenceTest, DeleteCoordinatorDocWhenNoDocumentExistsFails) {
+ ASSERT_THROWS_CODE(txn::deleteCoordinatorDoc(*_aws, _lsid, _txnNumberAndRetryCounter).get(),
+ AssertionException,
+ 51027);
+}
+
+TEST_F(TransactionCoordinatorDriverPersistenceTest,
+ DeleteCoordinatorDocWhenDocumentExistsWithDifferentTxnNumberFails) {
+ persistParticipantListExpectSuccess(
+ operationContext(), _lsid, _txnNumberAndRetryCounter, _participants);
+ ASSERT_THROWS_CODE(txn::deleteCoordinatorDoc(*_aws,
+ _lsid,
+ {_txnNumberAndRetryCounter.getTxnNumber() + 1,
+ *_txnNumberAndRetryCounter.getTxnRetryCounter()})
+ .get(),
+ AssertionException,
+ 51027);
+}
+
+TEST_F(TransactionCoordinatorDriverPersistenceTest,
+ DeleteCoordinatorDocWhenDocumentExistsWithDifferentTxnRetryCounterFails) {
+ persistParticipantListExpectSuccess(
+ operationContext(), _lsid, _txnNumberAndRetryCounter, _participants);
ASSERT_THROWS_CODE(
- txn::deleteCoordinatorDoc(*_aws, _lsid, _txnNumber).get(), AssertionException, 51027);
+ txn::deleteCoordinatorDoc(*_aws,
+ _lsid,
+ {_txnNumberAndRetryCounter.getTxnNumber(),
+ *_txnNumberAndRetryCounter.getTxnRetryCounter() + 1})
+ .get(),
+ AssertionException,
+ 51027);
}
TEST_F(TransactionCoordinatorDriverPersistenceTest,
DeleteCoordinatorDocWhenDocumentExistsWithoutDecisionFails) {
- persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants);
- ASSERT_THROWS_CODE(
- txn::deleteCoordinatorDoc(*_aws, _lsid, _txnNumber).get(), AssertionException, 51027);
+ persistParticipantListExpectSuccess(
+ operationContext(), _lsid, _txnNumberAndRetryCounter, _participants);
+ ASSERT_THROWS_CODE(txn::deleteCoordinatorDoc(*_aws, _lsid, _txnNumberAndRetryCounter).get(),
+ AssertionException,
+ 51027);
}
TEST_F(TransactionCoordinatorDriverPersistenceTest,
DeleteCoordinatorDocWhenDocumentExistsWithAbortDecisionSucceeds) {
- persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants);
- persistDecisionExpectSuccess(
- operationContext(), _lsid, _txnNumber, _participants, boost::none /* abort */);
- deleteCoordinatorDocExpectSuccess(operationContext(), _lsid, _txnNumber);
+ persistParticipantListExpectSuccess(
+ operationContext(), _lsid, _txnNumberAndRetryCounter, _participants);
+ persistDecisionExpectSuccess(operationContext(),
+ _lsid,
+ _txnNumberAndRetryCounter,
+ _participants,
+ boost::none /* abort */);
+ deleteCoordinatorDocExpectSuccess(operationContext(), _lsid, _txnNumberAndRetryCounter);
}
TEST_F(TransactionCoordinatorDriverPersistenceTest,
DeleteCoordinatorDocWhenDocumentExistsWithCommitDecisionSucceeds) {
- persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants);
- persistDecisionExpectSuccess(
- operationContext(), _lsid, _txnNumber, _participants, _commitTimestamp /* commit */);
- deleteCoordinatorDocExpectSuccess(operationContext(), _lsid, _txnNumber);
+ persistParticipantListExpectSuccess(
+ operationContext(), _lsid, _txnNumberAndRetryCounter, _participants);
+ persistDecisionExpectSuccess(operationContext(),
+ _lsid,
+ _txnNumberAndRetryCounter,
+ _participants,
+ _commitTimestamp /* commit */);
+ deleteCoordinatorDocExpectSuccess(operationContext(), _lsid, _txnNumberAndRetryCounter);
}
TEST_F(TransactionCoordinatorDriverPersistenceTest,
- MultipleCommitDecisionsPersistedAndDeleteOneSuccessfullyRemovesCorrectDecision) {
- const auto txnNumber1 = TxnNumber{4};
- const auto txnNumber2 = TxnNumber{5};
+ MultipleTxnNumbersCommitDecisionsPersistedAndDeleteOneSuccessfullyRemovesCorrectDecision) {
+ const TxnNumberAndRetryCounter txnNumberAndRetryCounter1{
+ _txnNumberAndRetryCounter.getTxnNumber(), *_txnNumberAndRetryCounter.getTxnRetryCounter()};
+ const TxnNumberAndRetryCounter txnNumberAndRetryCounter2{
+ _txnNumberAndRetryCounter.getTxnNumber() + 1,
+ *_txnNumberAndRetryCounter.getTxnRetryCounter()};
// Insert coordinator documents for two transactions.
- txn::persistParticipantsList(*_aws, _lsid, txnNumber1, _participants).get();
- txn::persistParticipantsList(*_aws, _lsid, txnNumber2, _participants).get();
+ txn::persistParticipantsList(*_aws, _lsid, txnNumberAndRetryCounter1, _participants).get();
+ txn::persistParticipantsList(*_aws, _lsid, txnNumberAndRetryCounter2, _participants).get();
auto allCoordinatorDocs = txn::readAllCoordinatorDocs(operationContext());
ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(2));
@@ -807,7 +1024,7 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest,
// document still exists.
txn::persistDecision(*_aws,
_lsid,
- txnNumber1,
+ txnNumberAndRetryCounter1,
_participants,
[&] {
txn::CoordinatorCommitDecision decision(txn::CommitDecision::kAbort);
@@ -816,13 +1033,48 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest,
return decision;
}())
.get();
- txn::deleteCoordinatorDoc(*_aws, _lsid, txnNumber1).get();
+ txn::deleteCoordinatorDoc(*_aws, _lsid, txnNumberAndRetryCounter1).get();
allCoordinatorDocs = txn::readAllCoordinatorDocs(operationContext());
ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(1));
- assertDocumentMatches(allCoordinatorDocs[0], _lsid, txnNumber2, _participants);
+ assertDocumentMatches(allCoordinatorDocs[0], _lsid, txnNumberAndRetryCounter2, _participants);
}
+TEST_F(
+ TransactionCoordinatorDriverPersistenceTest,
+ MultipleTxnRetryCountersCommitDecisionsPersistedAndDeleteOneSuccessfullyRemovesCorrectDecision) {
+ const TxnNumberAndRetryCounter txnNumberAndRetryCounter1{
+ _txnNumberAndRetryCounter.getTxnNumber(), *_txnNumberAndRetryCounter.getTxnRetryCounter()};
+ const TxnNumberAndRetryCounter txnNumberAndRetryCounter2{
+ _txnNumberAndRetryCounter.getTxnNumber(),
+ *_txnNumberAndRetryCounter.getTxnRetryCounter() + 1};
+
+    // Insert coordinator documents for two retries of the same transaction.
+ txn::persistParticipantsList(*_aws, _lsid, txnNumberAndRetryCounter1, _participants).get();
+ txn::persistParticipantsList(*_aws, _lsid, txnNumberAndRetryCounter2, _participants).get();
+
+ auto allCoordinatorDocs = txn::readAllCoordinatorDocs(operationContext());
+ ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(2));
+
+    // Delete the document for the first retry and check that only the second
+    // retry's document still exists.
+ txn::persistDecision(*_aws,
+ _lsid,
+ txnNumberAndRetryCounter1,
+ _participants,
+ [&] {
+ txn::CoordinatorCommitDecision decision(txn::CommitDecision::kAbort);
+ decision.setAbortStatus(
+ Status(ErrorCodes::NoSuchTransaction, "Test abort error"));
+ return decision;
+ }())
+ .get();
+ txn::deleteCoordinatorDoc(*_aws, _lsid, txnNumberAndRetryCounter1).get();
+
+ allCoordinatorDocs = txn::readAllCoordinatorDocs(operationContext());
+ ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(1));
+ assertDocumentMatches(allCoordinatorDocs[0], _lsid, txnNumberAndRetryCounter2, _participants);
+}
using TransactionCoordinatorTest = TransactionCoordinatorTestBase;
@@ -830,7 +1082,7 @@ TEST_F(TransactionCoordinatorTest, RunCommitProducesCommitDecisionOnTwoCommitRes
TransactionCoordinator coordinator(
operationContext(),
_lsid,
- _txnNumber,
+ _txnNumberAndRetryCounter,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
Date_t::max());
coordinator.runCommit(operationContext(), kTwoShardIdList);
@@ -852,7 +1104,7 @@ TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnAbortAndCommi
TransactionCoordinator coordinator(
operationContext(),
_lsid,
- _txnNumber,
+ _txnNumberAndRetryCounter,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
Date_t::max());
coordinator.runCommit(operationContext(), kTwoShardIdList);
@@ -875,7 +1127,7 @@ TEST_F(TransactionCoordinatorTest,
TransactionCoordinator coordinator(
operationContext(),
_lsid,
- _txnNumber,
+ _txnNumberAndRetryCounter,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
Date_t::max());
coordinator.runCommit(operationContext(), kTwoShardIdList);
@@ -898,7 +1150,7 @@ TEST_F(TransactionCoordinatorTest,
TransactionCoordinator coordinator(
operationContext(),
_lsid,
- _txnNumber,
+ _txnNumberAndRetryCounter,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
Date_t::max());
coordinator.runCommit(operationContext(), kTwoShardIdList);
@@ -921,7 +1173,7 @@ TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnSingleAbortRe
TransactionCoordinator coordinator(
operationContext(),
_lsid,
- _txnNumber,
+ _txnNumberAndRetryCounter,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
Date_t::max());
coordinator.runCommit(operationContext(), kTwoShardIdList);
@@ -944,7 +1196,7 @@ TEST_F(TransactionCoordinatorTest,
TransactionCoordinator coordinator(
operationContext(),
_lsid,
- _txnNumber,
+ _txnNumberAndRetryCounter,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
Date_t::max());
coordinator.runCommit(operationContext(), kTwoShardIdList);
@@ -972,7 +1224,7 @@ TEST_F(TransactionCoordinatorTest,
TransactionCoordinator coordinator(
operationContext(),
_lsid,
- _txnNumber,
+ _txnNumberAndRetryCounter,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
Date_t::max());
coordinator.runCommit(operationContext(), kTwoShardIdList);
@@ -997,7 +1249,7 @@ TEST_F(TransactionCoordinatorTest,
TransactionCoordinator coordinator(
operationContext(),
_lsid,
- _txnNumber,
+ _txnNumberAndRetryCounter,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
Date_t::max());
coordinator.runCommit(operationContext(), kTwoShardIdList);
@@ -1029,7 +1281,7 @@ TEST_F(TransactionCoordinatorTest,
TransactionCoordinator coordinator(
operationContext(),
_lsid,
- _txnNumber,
+ _txnNumberAndRetryCounter,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
Date_t::max());
coordinator.runCommit(operationContext(), kTwoShardIdList);
@@ -1053,7 +1305,6 @@ TEST_F(TransactionCoordinatorTest,
ErrorCodes::ReadConcernMajorityNotEnabled);
}
-
class TransactionCoordinatorMetricsTest : public TransactionCoordinatorTestBase {
public:
void setUp() override {
@@ -1265,7 +1516,7 @@ public:
TransactionCoordinator coordinator(
operationContext(),
_lsid,
- _txnNumber,
+ _txnNumberAndRetryCounter,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
Date_t::max());
@@ -1565,7 +1816,7 @@ TEST_F(TransactionCoordinatorMetricsTest, SimpleTwoPhaseCommitRealCoordinator) {
TransactionCoordinator coordinator(
operationContext(),
_lsid,
- _txnNumber,
+ _txnNumberAndRetryCounter,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
Date_t::max());
const auto& stats =
@@ -1740,7 +1991,7 @@ TEST_F(TransactionCoordinatorMetricsTest, CoordinatorIsCanceledWhileInactive) {
TransactionCoordinator coordinator(
operationContext(),
_lsid,
- _txnNumber,
+ _txnNumberAndRetryCounter,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
Date_t::max());
const auto& stats =
@@ -1785,7 +2036,7 @@ TEST_F(TransactionCoordinatorMetricsTest, CoordinatorsAWSIsShutDownWhileCoordina
auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext());
auto awsPtr = aws.get();
TransactionCoordinator coordinator(
- operationContext(), _lsid, _txnNumber, std::move(aws), Date_t::max());
+ operationContext(), _lsid, _txnNumberAndRetryCounter, std::move(aws), Date_t::max());
const auto& stats =
coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats();
@@ -1828,7 +2079,7 @@ TEST_F(TransactionCoordinatorMetricsTest,
auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext());
TransactionCoordinator coordinator(
- operationContext(), _lsid, _txnNumber, std::move(aws), Date_t::max());
+ operationContext(), _lsid, _txnNumberAndRetryCounter, std::move(aws), Date_t::max());
const auto& stats =
coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats();
@@ -1890,7 +2141,7 @@ TEST_F(TransactionCoordinatorMetricsTest,
auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext());
auto awsPtr = aws.get();
TransactionCoordinator coordinator(
- operationContext(), _lsid, _txnNumber, std::move(aws), Date_t::max());
+ operationContext(), _lsid, _txnNumberAndRetryCounter, std::move(aws), Date_t::max());
const auto& stats =
coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats();
@@ -1954,7 +2205,7 @@ TEST_F(TransactionCoordinatorMetricsTest,
auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext());
TransactionCoordinator coordinator(
- operationContext(), _lsid, _txnNumber, std::move(aws), Date_t::max());
+ operationContext(), _lsid, _txnNumberAndRetryCounter, std::move(aws), Date_t::max());
const auto& stats =
coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats();
@@ -2022,7 +2273,7 @@ TEST_F(TransactionCoordinatorMetricsTest,
auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext());
auto awsPtr = aws.get();
TransactionCoordinator coordinator(
- operationContext(), _lsid, _txnNumber, std::move(aws), Date_t::max());
+ operationContext(), _lsid, _txnNumberAndRetryCounter, std::move(aws), Date_t::max());
const auto& stats =
coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats();
@@ -2093,7 +2344,7 @@ TEST_F(TransactionCoordinatorMetricsTest, CoordinatorsAWSIsShutDownWhileCoordina
auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext());
auto awsPtr = aws.get();
TransactionCoordinator coordinator(
- operationContext(), _lsid, _txnNumber, std::move(aws), Date_t::max());
+ operationContext(), _lsid, _txnNumberAndRetryCounter, std::move(aws), Date_t::max());
const auto& stats =
coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats();
@@ -2179,7 +2430,7 @@ TEST_F(TransactionCoordinatorMetricsTest, DoesNotLogTransactionsUnderSlowMSThres
TransactionCoordinator coordinator(
operationContext(),
_lsid,
- _txnNumber,
+ _txnNumberAndRetryCounter,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
Date_t::max());
@@ -2211,7 +2462,7 @@ TEST_F(
TransactionCoordinator coordinator(
operationContext(),
_lsid,
- _txnNumber,
+ _txnNumberAndRetryCounter,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
Date_t::max());
@@ -2241,7 +2492,7 @@ TEST_F(TransactionCoordinatorMetricsTest, LogsTransactionsOverSlowMSThreshold) {
TransactionCoordinator coordinator(
operationContext(),
_lsid,
- _txnNumber,
+ _txnNumberAndRetryCounter,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
Date_t::max());
@@ -2267,8 +2518,9 @@ TEST_F(TransactionCoordinatorMetricsTest, SlowLogLineIncludesTransactionParamete
_lsid.serialize(&lsidBob);
ASSERT_EQUALS(1,
countBSONFormatLogLinesIsSubset(BSON(
- "attr" << BSON("parameters" << BSON("lsid" << lsidBob.obj() << "txnNumber"
- << _txnNumber)))));
+ "attr" << BSON("parameters" << BSON(
+ "lsid" << lsidBob.obj() << "txnNumber"
+ << _txnNumberAndRetryCounter.getTxnNumber())))));
}
TEST_F(TransactionCoordinatorMetricsTest,
@@ -2287,7 +2539,7 @@ TEST_F(TransactionCoordinatorMetricsTest, SlowLogLineIncludesTerminationCauseFor
TransactionCoordinator coordinator(
operationContext(),
_lsid,
- _txnNumber,
+ _txnNumberAndRetryCounter,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
Date_t::max());
@@ -2338,7 +2590,7 @@ TEST_F(TransactionCoordinatorMetricsTest, SlowLogLineIncludesStepDurationsAndTot
TransactionCoordinator coordinator(
operationContext(),
_lsid,
- _txnNumber,
+ _txnNumberAndRetryCounter,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
Date_t::max());
@@ -2460,7 +2712,7 @@ TEST_F(TransactionCoordinatorMetricsTest, RecoveryFromFailureIndicatedInReportSt
TransactionCoordinator coordinator(
operationContext(),
_lsid,
- _txnNumber,
+ _txnNumberAndRetryCounter,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
Date_t::max());
@@ -2496,7 +2748,7 @@ TEST_F(TransactionCoordinatorMetricsTest, ClientInformationIncludedInReportState
TransactionCoordinator coordinator(
operationContext(),
_lsid,
- _txnNumber,
+ _txnNumberAndRetryCounter,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
Date_t::max());
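
The new tests gate behavior on featureFlagInternalTransactions; a sketch of the
toggling pattern they use (the RAII controller flips the server parameter for
the scope of one test body):

    // Fixture default: flag on (see _controller in the test base above).
    // Per-test override, as in the DoesNotAttachTxnRetryCounter test:
    RAIIServerParameterControllerForTest controller{"featureFlagInternalTransactions", false};
    // While 'controller' is in scope, sendPrepare/sendCommit/sendAbort omit the
    // txnRetryCounter field from the commands they dispatch.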
diff --git a/src/mongo/db/s/transaction_coordinator_test_fixture.h b/src/mongo/db/s/transaction_coordinator_test_fixture.h
index 6a60538a9fa..d7d12eb058a 100644
--- a/src/mongo/db/s/transaction_coordinator_test_fixture.h
+++ b/src/mongo/db/s/transaction_coordinator_test_fixture.h
@@ -72,6 +72,7 @@ protected:
*/
static void associateClientMetadata(Client* client, std::string appName);
+ const std::vector<ShardId> kOneShardIdList{{"s1"}};
const std::vector<ShardId> kTwoShardIdList{{"s1"}, {"s2"}};
const std::set<ShardId> kTwoShardIdSet{{"s1"}, {"s2"}};
const std::vector<ShardId> kThreeShardIdList{{"s1"}, {"s2"}, {"s3"}};
diff --git a/src/mongo/db/s/transaction_coordinator_util.cpp b/src/mongo/db/s/transaction_coordinator_util.cpp
index 9e07378295d..bb6832314b7 100644
--- a/src/mongo/db/s/transaction_coordinator_util.cpp
+++ b/src/mongo/db/s/transaction_coordinator_util.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h"
#include "mongo/db/curop.h"
#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/internal_transactions_feature_flag_gen.h"
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/s/transaction_coordinator_futures_util.h"
@@ -102,16 +103,17 @@ bool shouldRetryPersistingCoordinatorState(const StatusWith<T>& responseStatus)
} // namespace
namespace {
-repl::OpTime persistParticipantListBlocking(OperationContext* opCtx,
- const LogicalSessionId& lsid,
- TxnNumber txnNumber,
- const std::vector<ShardId>& participantList) {
+repl::OpTime persistParticipantListBlocking(
+ OperationContext* opCtx,
+ const LogicalSessionId& lsid,
+ const TxnNumberAndRetryCounter& txnNumberAndRetryCounter,
+ const std::vector<ShardId>& participantList) {
LOGV2_DEBUG(22463,
3,
- "{sessionId}:{txnNumber} Going to write participant list",
+ "{sessionId}:{txnNumberAndRetryCounter} Going to write participant list",
"Going to write participant list",
"sessionId"_attr = lsid.getId(),
- "txnNumber"_attr = txnNumber);
+ "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter);
if (MONGO_unlikely(hangBeforeWritingParticipantList.shouldFail())) {
LOGV2(22464, "Hit hangBeforeWritingParticipantList failpoint");
@@ -120,7 +122,11 @@ repl::OpTime persistParticipantListBlocking(OperationContext* opCtx,
OperationSessionInfo sessionInfo;
sessionInfo.setSessionId(lsid);
- sessionInfo.setTxnNumber(txnNumber);
+ sessionInfo.setTxnNumber(txnNumberAndRetryCounter.getTxnNumber());
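+    // The retry counter is recorded on the session info only when the internal
+    // transactions feature flag is enabled for the current FCV.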
+ if (feature_flags::gFeatureFlagInternalTransactions.isEnabled(
+ serverGlobalParams.featureCompatibility)) {
+ sessionInfo.setTxnRetryCounter(*txnNumberAndRetryCounter.getTxnRetryCounter());
+ }
DBDirectClient client(opCtx);
@@ -167,7 +173,7 @@ repl::OpTime persistParticipantListBlocking(OperationContext* opCtx,
uasserted(51025,
str::stream() << "While attempting to write participant list "
<< buildParticipantListString(participantList) << " for "
- << lsid.getId() << ':' << txnNumber
+ << lsid.getId() << ':' << txnNumberAndRetryCounter.toBSON()
<< ", found document with a different participant list: " << doc);
}
@@ -176,29 +182,35 @@ repl::OpTime persistParticipantListBlocking(OperationContext* opCtx,
LOGV2_DEBUG(22465,
3,
- "{sessionId}:{txnNumber} Wrote participant list",
+ "{sessionId}:{txnNumberAndRetryCounter} Wrote participant list",
"sessionId"_attr = lsid.getId(),
- "txnNumber"_attr = txnNumber);
+ "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter);
return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
}
} // namespace
-Future<repl::OpTime> persistParticipantsList(txn::AsyncWorkScheduler& scheduler,
- const LogicalSessionId& lsid,
- TxnNumber txnNumber,
- const txn::ParticipantsList& participants) {
+Future<repl::OpTime> persistParticipantsList(
+ txn::AsyncWorkScheduler& scheduler,
+ const LogicalSessionId& lsid,
+ const TxnNumberAndRetryCounter& txnNumberAndRetryCounter,
+ const txn::ParticipantsList& participants) {
return txn::doWhile(
scheduler,
boost::none /* no need for a backoff */,
[](const StatusWith<repl::OpTime>& s) { return shouldRetryPersistingCoordinatorState(s); },
- [&scheduler, lsid, txnNumber, participants] {
- return scheduler.scheduleWork([lsid, txnNumber, participants](OperationContext* opCtx) {
- FlowControl::Bypass flowControlBypass(opCtx);
- getTransactionCoordinatorWorkerCurOpRepository()->set(
- opCtx, lsid, txnNumber, CoordinatorAction::kWritingParticipantList);
- return persistParticipantListBlocking(opCtx, lsid, txnNumber, participants);
- });
+ [&scheduler, lsid, txnNumberAndRetryCounter, participants] {
+ return scheduler.scheduleWork(
+ [lsid, txnNumberAndRetryCounter, participants](OperationContext* opCtx) {
+ FlowControl::Bypass flowControlBypass(opCtx);
+ getTransactionCoordinatorWorkerCurOpRepository()->set(
+ opCtx,
+ lsid,
+ txnNumberAndRetryCounter.getTxnNumber(),
+ CoordinatorAction::kWritingParticipantList);
+ return persistParticipantListBlocking(
+ opCtx, lsid, txnNumberAndRetryCounter, participants);
+ });
});
}
@@ -232,14 +244,20 @@ CoordinatorCommitDecision PrepareVoteConsensus::decision() const {
Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service,
txn::AsyncWorkScheduler& scheduler,
const LogicalSessionId& lsid,
- TxnNumber txnNumber,
+ const TxnNumberAndRetryCounter& txnNumberAndRetryCounter,
const APIParameters& apiParams,
const txn::ParticipantsList& participants) {
PrepareTransaction prepareTransaction;
prepareTransaction.setDbName(NamespaceString::kAdminDb);
- BSONObjBuilder bob(BSON("lsid" << lsid.toBSON() << "txnNumber" << txnNumber << "autocommit"
+ BSONObjBuilder bob(BSON("lsid" << lsid.toBSON() << "txnNumber"
+ << txnNumberAndRetryCounter.getTxnNumber() << "autocommit"
<< false << WriteConcernOptions::kWriteConcernField
<< WriteConcernOptions::Majority));
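+    // Attach txnRetryCounter to the outbound prepareTransaction command only when
+    // the internal transactions feature flag is enabled.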
+ if (feature_flags::gFeatureFlagInternalTransactions.isEnabled(
+ serverGlobalParams.featureCompatibility)) {
+ bob.append(OperationSessionInfo::kTxnRetryCounterFieldName,
+ *txnNumberAndRetryCounter.getTxnRetryCounter());
+ }
apiParams.appendInfo(&bob);
auto prepareObj = prepareTransaction.toBSON(bob.obj());
@@ -249,10 +267,14 @@ Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service,
// vector of responses.
auto prepareScheduler = scheduler.makeChildScheduler();
- OperationContextFn operationContextFn = [lsid, txnNumber](OperationContext* opCtx) {
+ OperationContextFn operationContextFn = [lsid,
+ txnNumberAndRetryCounter](OperationContext* opCtx) {
invariant(opCtx);
getTransactionCoordinatorWorkerCurOpRepository()->set(
- opCtx, lsid, txnNumber, CoordinatorAction::kSendingPrepare);
+ opCtx,
+ lsid,
+ txnNumberAndRetryCounter.getTxnNumber(),
+ CoordinatorAction::kSendingPrepare);
if (MONGO_unlikely(hangBeforeSendingPrepare.shouldFail())) {
LOGV2(22466, "Hit hangBeforeSendingPrepare failpoint");
@@ -264,7 +286,7 @@ Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service,
responses.emplace_back(sendPrepareToShard(service,
*prepareScheduler,
lsid,
- txnNumber,
+ txnNumberAndRetryCounter,
participant,
prepareObj,
operationContextFn));
@@ -278,8 +300,8 @@ Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service,
// Initial value
PrepareVoteConsensus{int(participants.size())},
// Aggregates an incoming response (next) with the existing aggregate value (result)
- [&prepareScheduler = *prepareScheduler, txnNumber](PrepareVoteConsensus& result,
- const PrepareResponse& next) {
+ [&prepareScheduler = *prepareScheduler, txnNumberAndRetryCounter](
+ PrepareVoteConsensus& result, const PrepareResponse& next) {
result.registerVote(next);
if (next.vote == PrepareVote::kAbort) {
@@ -287,7 +309,7 @@ Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service,
1,
"Received abort prepare vote from node",
"shardId"_attr = next.shardId,
- "txnNumber"_attr = txnNumber,
+ "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter,
"error"_attr = (next.abortReason.has_value()
? next.abortReason.value().reason()
: ""));
@@ -308,15 +330,15 @@ Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service,
namespace {
repl::OpTime persistDecisionBlocking(OperationContext* opCtx,
const LogicalSessionId& lsid,
- TxnNumber txnNumber,
+ const TxnNumberAndRetryCounter& txnNumberAndRetryCounter,
const std::vector<ShardId>& participantList,
const txn::CoordinatorCommitDecision& decision) {
const bool isCommit = decision.getDecision() == txn::CommitDecision::kCommit;
LOGV2_DEBUG(22467,
3,
- "{sessionId}:{txnNumber} Going to write decision {decision}",
+ "{sessionId}:{txnNumberAndRetryCounter} Going to write decision {decision}",
"sessionId"_attr = lsid.getId(),
- "txnNumber"_attr = txnNumber,
+ "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter,
"decision"_attr = (isCommit ? "commit" : "abort"));
if (MONGO_unlikely(hangBeforeWritingDecision.shouldFail())) {
@@ -326,7 +348,11 @@ repl::OpTime persistDecisionBlocking(OperationContext* opCtx,
OperationSessionInfo sessionInfo;
sessionInfo.setSessionId(lsid);
- sessionInfo.setTxnNumber(txnNumber);
+ sessionInfo.setTxnNumber(txnNumberAndRetryCounter.getTxnNumber());
+ if (feature_flags::gFeatureFlagInternalTransactions.isEnabled(
+ serverGlobalParams.featureCompatibility)) {
+ sessionInfo.setTxnRetryCounter(*txnNumberAndRetryCounter.getTxnRetryCounter());
+ }
DBDirectClient client(opCtx);
@@ -379,7 +405,7 @@ repl::OpTime persistDecisionBlocking(OperationContext* opCtx,
uasserted(51026,
str::stream() << "While attempting to write decision "
-                                << (isCommit ? "'commit'" : "'abort'") << " for" << lsid.getId()
+                                << (isCommit ? "'commit'" : "'abort'") << " for " << lsid.getId()
- << ':' << txnNumber
+ << ':' << txnNumberAndRetryCounter.toBSON()
<< ", either failed to find document for this lsid:txnNumber or "
"document existed with a different participant list, decision "
"or commitTimestamp: "
@@ -388,10 +414,10 @@ repl::OpTime persistDecisionBlocking(OperationContext* opCtx,
LOGV2_DEBUG(22469,
3,
- "{sessionId}:{txnNumber} Wrote decision {decision}",
+ "{sessionId}:{txnNumberAndRetryCounter} Wrote decision {decision}",
"Wrote decision",
"sessionId"_attr = lsid.getId(),
- "txnNumber"_attr = txnNumber,
+ "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter,
"decision"_attr = (isCommit ? "commit" : "abort"));
return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
@@ -400,20 +426,24 @@ repl::OpTime persistDecisionBlocking(OperationContext* opCtx,
Future<repl::OpTime> persistDecision(txn::AsyncWorkScheduler& scheduler,
const LogicalSessionId& lsid,
- TxnNumber txnNumber,
+ const TxnNumberAndRetryCounter& txnNumberAndRetryCounter,
const txn::ParticipantsList& participants,
const txn::CoordinatorCommitDecision& decision) {
return txn::doWhile(
scheduler,
boost::none /* no need for a backoff */,
[](const StatusWith<repl::OpTime>& s) { return shouldRetryPersistingCoordinatorState(s); },
- [&scheduler, lsid, txnNumber, participants, decision] {
+ [&scheduler, lsid, txnNumberAndRetryCounter, participants, decision] {
return scheduler.scheduleWork(
- [lsid, txnNumber, participants, decision](OperationContext* opCtx) {
+ [lsid, txnNumberAndRetryCounter, participants, decision](OperationContext* opCtx) {
FlowControl::Bypass flowControlBypass(opCtx);
getTransactionCoordinatorWorkerCurOpRepository()->set(
- opCtx, lsid, txnNumber, CoordinatorAction::kWritingDecision);
- return persistDecisionBlocking(opCtx, lsid, txnNumber, participants, decision);
+ opCtx,
+ lsid,
+ txnNumberAndRetryCounter.getTxnNumber(),
+ CoordinatorAction::kWritingDecision);
+ return persistDecisionBlocking(
+ opCtx, lsid, txnNumberAndRetryCounter, participants, decision);
});
});
}
@@ -421,23 +451,33 @@ Future<repl::OpTime> persistDecision(txn::AsyncWorkScheduler& scheduler,
Future<void> sendCommit(ServiceContext* service,
txn::AsyncWorkScheduler& scheduler,
const LogicalSessionId& lsid,
- TxnNumber txnNumber,
+ const TxnNumberAndRetryCounter& txnNumberAndRetryCounter,
const APIParameters& apiParams,
const txn::ParticipantsList& participants,
Timestamp commitTimestamp) {
CommitTransaction commitTransaction;
commitTransaction.setDbName(NamespaceString::kAdminDb);
commitTransaction.setCommitTimestamp(commitTimestamp);
- BSONObjBuilder bob(BSON("lsid" << lsid.toBSON() << "txnNumber" << txnNumber << "autocommit"
+ BSONObjBuilder bob(BSON("lsid" << lsid.toBSON() << "txnNumber"
+ << txnNumberAndRetryCounter.getTxnNumber() << "autocommit"
<< false << WriteConcernOptions::kWriteConcernField
<< WriteConcernOptions::Majority));
+ if (feature_flags::gFeatureFlagInternalTransactions.isEnabled(
+ serverGlobalParams.featureCompatibility)) {
+ bob.append(OperationSessionInfo::kTxnRetryCounterFieldName,
+ *txnNumberAndRetryCounter.getTxnRetryCounter());
+ }
apiParams.appendInfo(&bob);
auto commitObj = commitTransaction.toBSON(bob.obj());
- OperationContextFn operationContextFn = [lsid, txnNumber](OperationContext* opCtx) {
+ OperationContextFn operationContextFn = [lsid,
+ txnNumberAndRetryCounter](OperationContext* opCtx) {
invariant(opCtx);
getTransactionCoordinatorWorkerCurOpRepository()->set(
- opCtx, lsid, txnNumber, CoordinatorAction::kSendingCommit);
+ opCtx,
+ lsid,
+ txnNumberAndRetryCounter.getTxnNumber(),
+ CoordinatorAction::kSendingCommit);
if (MONGO_unlikely(hangBeforeSendingCommit.shouldFail())) {
LOGV2(22470, "Hit hangBeforeSendingCommit failpoint");
@@ -447,8 +487,13 @@ Future<void> sendCommit(ServiceContext* service,
std::vector<Future<void>> responses;
for (const auto& participant : participants) {
- responses.push_back(sendDecisionToShard(
- service, scheduler, lsid, txnNumber, participant, commitObj, operationContextFn));
+ responses.push_back(sendDecisionToShard(service,
+ scheduler,
+ lsid,
+ txnNumberAndRetryCounter,
+ participant,
+ commitObj,
+ operationContextFn));
}
return txn::whenAll(responses);
}
@@ -456,21 +501,28 @@ Future<void> sendCommit(ServiceContext* service,
Future<void> sendAbort(ServiceContext* service,
txn::AsyncWorkScheduler& scheduler,
const LogicalSessionId& lsid,
- TxnNumber txnNumber,
+ const TxnNumberAndRetryCounter& txnNumberAndRetryCounter,
const APIParameters& apiParams,
const txn::ParticipantsList& participants) {
AbortTransaction abortTransaction;
abortTransaction.setDbName(NamespaceString::kAdminDb);
- BSONObjBuilder bob(BSON("lsid" << lsid.toBSON() << "txnNumber" << txnNumber << "autocommit"
+ BSONObjBuilder bob(BSON("lsid" << lsid.toBSON() << "txnNumber"
+ << txnNumberAndRetryCounter.getTxnNumber() << "autocommit"
<< false << WriteConcernOptions::kWriteConcernField
<< WriteConcernOptions::Majority));
+ if (feature_flags::gFeatureFlagInternalTransactions.isEnabled(
+ serverGlobalParams.featureCompatibility)) {
+ bob.append(OperationSessionInfo::kTxnRetryCounterFieldName,
+ *txnNumberAndRetryCounter.getTxnRetryCounter());
+ }
apiParams.appendInfo(&bob);
auto abortObj = abortTransaction.toBSON(bob.obj());
- OperationContextFn operationContextFn = [lsid, txnNumber](OperationContext* opCtx) {
+ OperationContextFn operationContextFn = [lsid,
+ txnNumberAndRetryCounter](OperationContext* opCtx) {
invariant(opCtx);
getTransactionCoordinatorWorkerCurOpRepository()->set(
- opCtx, lsid, txnNumber, CoordinatorAction::kSendingAbort);
+ opCtx, lsid, txnNumberAndRetryCounter.getTxnNumber(), CoordinatorAction::kSendingAbort);
if (MONGO_unlikely(hangBeforeSendingAbort.shouldFail())) {
LOGV2(22471, "Hit hangBeforeSendingAbort failpoint");
@@ -480,8 +532,13 @@ Future<void> sendAbort(ServiceContext* service,
std::vector<Future<void>> responses;
for (const auto& participant : participants) {
- responses.push_back(sendDecisionToShard(
- service, scheduler, lsid, txnNumber, participant, abortObj, operationContextFn));
+ responses.push_back(sendDecisionToShard(service,
+ scheduler,
+ lsid,
+ txnNumberAndRetryCounter,
+ participant,
+ abortObj,
+ operationContextFn));
}
return txn::whenAll(responses);
}
@@ -489,13 +546,13 @@ Future<void> sendAbort(ServiceContext* service,
namespace {
void deleteCoordinatorDocBlocking(OperationContext* opCtx,
const LogicalSessionId& lsid,
- TxnNumber txnNumber) {
+ const TxnNumberAndRetryCounter& txnNumberAndRetryCounter) {
LOGV2_DEBUG(22472,
3,
- "{sessionId}:{txnNumber} Going to delete coordinator doc",
+ "{sessionId}:{txnNumberAndRetryCounter} Going to delete coordinator doc",
"Going to delete coordinator doc",
"sessionId"_attr = lsid.getId(),
- "txnNumber"_attr = txnNumber);
+ "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter);
if (MONGO_unlikely(hangBeforeDeletingCoordinatorDoc.shouldFail())) {
LOGV2(22473, "Hit hangBeforeDeletingCoordinatorDoc failpoint");
@@ -504,7 +561,11 @@ void deleteCoordinatorDocBlocking(OperationContext* opCtx,
OperationSessionInfo sessionInfo;
sessionInfo.setSessionId(lsid);
- sessionInfo.setTxnNumber(txnNumber);
+ sessionInfo.setTxnNumber(txnNumberAndRetryCounter.getTxnNumber());
+ if (feature_flags::gFeatureFlagInternalTransactions.isEnabled(
+ serverGlobalParams.featureCompatibility)) {
+ sessionInfo.setTxnRetryCounter(*txnNumberAndRetryCounter.getTxnRetryCounter());
+ }
DBDirectClient client(opCtx);
@@ -545,7 +606,7 @@ void deleteCoordinatorDocBlocking(OperationContext* opCtx,
BSON(TransactionCoordinatorDocument::kIdFieldName << sessionInfo.toBSON()));
uasserted(51027,
str::stream() << "While attempting to delete document for " << lsid.getId() << ':'
- << txnNumber
+ << txnNumberAndRetryCounter.toBSON()
<< ", either failed to find document for this lsid:txnNumber or "
"document existed without a decision: "
<< doc);
@@ -553,10 +614,10 @@ void deleteCoordinatorDocBlocking(OperationContext* opCtx,
LOGV2_DEBUG(22474,
3,
- "{sessionId}:{txnNumber} Deleted coordinator doc",
+ "{sessionId}:{txnNumberAndRetryCounter} Deleted coordinator doc",
"Deleted coordinator doc",
"sessionId"_attr = lsid.getId(),
- "txnNumber"_attr = txnNumber);
+ "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter);
hangAfterDeletingCoordinatorDoc.execute([&](const BSONObj& data) {
LOGV2(22475, "Hit hangAfterDeletingCoordinatorDoc failpoint");
@@ -571,19 +632,22 @@ void deleteCoordinatorDocBlocking(OperationContext* opCtx,
Future<void> deleteCoordinatorDoc(txn::AsyncWorkScheduler& scheduler,
const LogicalSessionId& lsid,
- TxnNumber txnNumber) {
- return txn::doWhile(
- scheduler,
- boost::none /* no need for a backoff */,
- [](const Status& s) { return s == ErrorCodes::Interrupted; },
- [&scheduler, lsid, txnNumber] {
- return scheduler.scheduleWork([lsid, txnNumber](OperationContext* opCtx) {
- FlowControl::Bypass flowControlBypass(opCtx);
- getTransactionCoordinatorWorkerCurOpRepository()->set(
- opCtx, lsid, txnNumber, CoordinatorAction::kDeletingCoordinatorDoc);
- deleteCoordinatorDocBlocking(opCtx, lsid, txnNumber);
- });
- });
+ const TxnNumberAndRetryCounter& txnNumberAndRetryCounter) {
+ return txn::doWhile(scheduler,
+ boost::none /* no need for a backoff */,
+ [](const Status& s) { return s == ErrorCodes::Interrupted; },
+ [&scheduler, lsid, txnNumberAndRetryCounter] {
+ return scheduler.scheduleWork([lsid, txnNumberAndRetryCounter](
+ OperationContext* opCtx) {
+ FlowControl::Bypass flowControlBypass(opCtx);
+ getTransactionCoordinatorWorkerCurOpRepository()->set(
+ opCtx,
+ lsid,
+ txnNumberAndRetryCounter.getTxnNumber(),
+ CoordinatorAction::kDeletingCoordinatorDoc);
+ deleteCoordinatorDocBlocking(opCtx, lsid, txnNumberAndRetryCounter);
+ });
+ });
}
std::vector<TransactionCoordinatorDocument> readAllCoordinatorDocs(OperationContext* opCtx) {
@@ -607,7 +671,7 @@ std::vector<TransactionCoordinatorDocument> readAllCoordinatorDocs(OperationCont
Future<PrepareResponse> sendPrepareToShard(ServiceContext* service,
txn::AsyncWorkScheduler& scheduler,
const LogicalSessionId& lsid,
- TxnNumber txnNumber,
+ const TxnNumberAndRetryCounter& txnNumberAndRetryCounter,
const ShardId& shardId,
const BSONObj& commandObj,
OperationContextFn operationContextFn) {
@@ -624,18 +688,18 @@ Future<PrepareResponse> sendPrepareToShard(ServiceContext* service,
},
[&scheduler,
lsid,
- txnNumber,
+ txnNumberAndRetryCounter,
shardId,
isLocalShard,
commandObj = commandObj.getOwned(),
operationContextFn] {
LOGV2_DEBUG(22476,
3,
- "{sessionId}:{txnNumber} Coordinator going to send command "
+ "{sessionId}:{txnNumberAndRetryCounter} Coordinator going to send command "
"{command} to {localOrRemote} shard {shardId}",
"Coordinator going to send command to shard",
"sessionId"_attr = lsid.getId(),
- "txnNumber"_attr = txnNumber,
+ "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter,
"command"_attr = commandObj,
"localOrRemote"_attr = (isLocalShard ? "local" : "remote"),
"shardId"_attr = shardId);
@@ -643,7 +707,7 @@ Future<PrepareResponse> sendPrepareToShard(ServiceContext* service,
return scheduler
.scheduleRemoteCommand(
shardId, kPrimaryReadPreference, commandObj, operationContextFn)
- .then([lsid, txnNumber, shardId, commandObj = commandObj.getOwned()](
+ .then([lsid, txnNumberAndRetryCounter, shardId, commandObj = commandObj.getOwned()](
ResponseStatus response) {
auto status = getStatusFromCommandResult(response.data);
auto wcStatus = getWriteConcernStatusFromCommandResult(response.data);
@@ -667,10 +731,10 @@ Future<PrepareResponse> sendPrepareToShard(ServiceContext* service,
<< ", which is not an expected behavior. "
"Interpreting the response as vote to abort");
LOGV2(22477,
- "{sessionId}:{txnNumber} {error}",
+ "{sessionId}:{txnNumberAndRetryCounter} {error}",
"Coordinator received error from transaction participant",
"sessionId"_attr = lsid.getId(),
- "txnNumber"_attr = txnNumber,
+ "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter,
"error"_attr = redact(abortStatus));
return PrepareResponse{
@@ -680,12 +744,12 @@ Future<PrepareResponse> sendPrepareToShard(ServiceContext* service,
LOGV2_DEBUG(
22478,
3,
- "{sessionId}:{txnNumber} Coordinator shard received a "
+ "{sessionId}:{txnNumberAndRetryCounter} Coordinator shard received a "
"vote to commit from shard {shardId} with prepareTimestamp: "
"{prepareTimestamp}",
"Coordinator shard received a vote to commit from participant shard",
"sessionId"_attr = lsid.getId(),
- "txnNumber"_attr = txnNumber,
+ "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter,
"shardId"_attr = shardId,
"prepareTimestampField"_attr = prepareTimestampField.timestamp());
@@ -697,11 +761,11 @@ Future<PrepareResponse> sendPrepareToShard(ServiceContext* service,
LOGV2_DEBUG(22479,
3,
- "{sessionId}:{txnNumber} Coordinator shard received "
+ "{sessionId}:{txnNumberAndRetryCounter} Coordinator shard received "
"{error} from shard {shardId} for {command}",
"Coordinator shard received response from shard",
"sessionId"_attr = lsid.getId(),
- "txnNumber"_attr = txnNumber,
+ "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter,
"error"_attr = status,
"shardId"_attr = shardId,
"command"_attr = commandObj);
@@ -732,14 +796,15 @@ Future<PrepareResponse> sendPrepareToShard(ServiceContext* service,
});
return std::move(f).onError<ErrorCodes::TransactionCoordinatorReachedAbortDecision>(
- [lsid, txnNumber, shardId](const Status& status) {
- LOGV2_DEBUG(22480,
- 3,
- "{sessionId}:{txnNumber} Prepare stopped retrying due to retrying "
- "being cancelled",
- "Prepare stopped retrying due to retrying being cancelled",
- "sessionId"_attr = lsid.getId(),
- "txnNumber"_attr = txnNumber);
+ [lsid, txnNumberAndRetryCounter, shardId](const Status& status) {
+ LOGV2_DEBUG(
+ 22480,
+ 3,
+ "{sessionId}:{txnNumberAndRetryCounter} Prepare stopped retrying due to retrying "
+ "being cancelled",
+ "Prepare stopped retrying due to retrying being cancelled",
+ "sessionId"_attr = lsid.getId(),
+ "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter);
return PrepareResponse{shardId,
boost::none,
boost::none,
@@ -750,7 +815,7 @@ Future<PrepareResponse> sendPrepareToShard(ServiceContext* service,
Future<void> sendDecisionToShard(ServiceContext* service,
txn::AsyncWorkScheduler& scheduler,
const LogicalSessionId& lsid,
- TxnNumber txnNumber,
+ const TxnNumberAndRetryCounter& txnNumberAndRetryCounter,
const ShardId& shardId,
const BSONObj& commandObj,
OperationContextFn operationContextFn) {
@@ -766,18 +831,18 @@ Future<void> sendDecisionToShard(ServiceContext* service,
},
[&scheduler,
lsid,
- txnNumber,
+ txnNumberAndRetryCounter,
shardId,
isLocalShard,
operationContextFn,
commandObj = commandObj.getOwned()] {
LOGV2_DEBUG(22481,
3,
- "{sessionId}:{txnNumber} Coordinator going to send command "
+ "{sessionId}:{txnNumberAndRetryCounter} Coordinator going to send command "
"{command} to {localOrRemote} shard {shardId}",
"Coordinator going to send command to shard",
"sessionId"_attr = lsid.getId(),
- "txnNumber"_attr = txnNumber,
+ "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter,
"command"_attr = commandObj,
"localOrRemote"_attr = (isLocalShard ? "local" : "remote"),
"shardId"_attr = shardId);
@@ -785,7 +850,7 @@ Future<void> sendDecisionToShard(ServiceContext* service,
return scheduler
.scheduleRemoteCommand(
shardId, kPrimaryReadPreference, commandObj, operationContextFn)
- .then([lsid, txnNumber, shardId, commandObj = commandObj.getOwned()](
+ .then([lsid, txnNumberAndRetryCounter, shardId, commandObj = commandObj.getOwned()](
ResponseStatus response) {
auto status = getStatusFromCommandResult(response.data);
auto wcStatus = getWriteConcernStatusFromCommandResult(response.data);
@@ -796,16 +861,17 @@ Future<void> sendDecisionToShard(ServiceContext* service,
status = wcStatus;
}
- LOGV2_DEBUG(22482,
- 3,
- "{sessionId}:{txnNumber} Coordinator shard received "
- "{status} in response to {command} from shard {shardId}",
- "Coordinator shard received response from shard",
- "sessionId"_attr = lsid.getId(),
- "txnNumber"_attr = txnNumber,
- "status"_attr = status,
- "command"_attr = commandObj,
- "shardId"_attr = shardId);
+ LOGV2_DEBUG(
+ 22482,
+ 3,
+ "{sessionId}:{txnNumberAndRetryCounter} Coordinator shard received "
+ "{status} in response to {command} from shard {shardId}",
+ "Coordinator shard received response from shard",
+ "sessionId"_attr = lsid.getId(),
+ "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter,
+ "status"_attr = status,
+ "command"_attr = commandObj,
+ "shardId"_attr = shardId);
if (ErrorCodes::isVoteAbortError(status.code())) {
// Interpret voteAbort errors as an ack.
@@ -824,8 +890,14 @@ Future<void> sendDecisionToShard(ServiceContext* service,
});
}
-std::string txnIdToString(const LogicalSessionId& lsid, TxnNumber txnNumber) {
- return str::stream() << lsid.getId() << ':' << txnNumber;
+std::string txnIdToString(const LogicalSessionId& lsid,
+ const TxnNumberAndRetryCounter& txnNumberAndRetryCounter) {
+ str::stream ss;
+ ss << lsid.getId() << ':' << txnNumberAndRetryCounter.getTxnNumber();
+ if (auto retryCounter = txnNumberAndRetryCounter.getTxnRetryCounter()) {
+ ss << ':' << *retryCounter;
+ }
+ return ss;
}
} // namespace txn
diff --git a/src/mongo/db/s/transaction_coordinator_util.h b/src/mongo/db/s/transaction_coordinator_util.h
index 36a2b2fcf3d..ba125b287cb 100644
--- a/src/mongo/db/s/transaction_coordinator_util.h
+++ b/src/mongo/db/s/transaction_coordinator_util.h
@@ -42,7 +42,7 @@ namespace txn {
* Upserts a document of the form:
*
* {
- * _id: {lsid: <lsid>, txnNumber: <txnNumber>}
+ * _id: {lsid: <lsid>, txnNumber: <txnNumber>, txnRetryCounter: <txnRetryCounter>}
* participants: ["shard0000", "shard0001"]
* }
*
@@ -51,12 +51,13 @@ namespace txn {
* Throws if the upsert fails or waiting for writeConcern fails.
*
* If the upsert returns a DuplicateKey error, converts it to an anonymous error, because it means a
- * document for the (lsid, txnNumber) exists with a different participant list.
+ * document for the (lsid, txnNumber, txnRetryCounter) exists with a different participant list.
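+ *
+ * Illustrative example (hypothetical values): with txnNumber 5 and txnRetryCounter 0,
+ * the upserted document's _id is {lsid: <lsid>, txnNumber: 5, txnRetryCounter: 0}.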
*/
-Future<repl::OpTime> persistParticipantsList(txn::AsyncWorkScheduler& scheduler,
- const LogicalSessionId& lsid,
- TxnNumber txnNumber,
- const txn::ParticipantsList& participants);
+Future<repl::OpTime> persistParticipantsList(
+ txn::AsyncWorkScheduler& scheduler,
+ const LogicalSessionId& lsid,
+ const TxnNumberAndRetryCounter& txnNumberAndRetryCounter,
+ const txn::ParticipantsList& participants);
struct PrepareResponse;
class PrepareVoteConsensus {
@@ -97,7 +98,7 @@ private:
Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service,
txn::AsyncWorkScheduler& scheduler,
const LogicalSessionId& lsid,
- TxnNumber txnNumber,
+ const TxnNumberAndRetryCounter& txnNumberAndRetryCounter,
const APIParameters& apiParams,
const txn::ParticipantsList& participants);
@@ -105,10 +106,10 @@ Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service,
* If 'commitTimestamp' is boost::none, updates the document in config.transaction_coordinators
* for
*
- * (lsid, txnNumber) to be:
+ * (lsid, txnNumber, txnRetryCounter) to be:
*
* {
- * _id: {lsid: <lsid>, txnNumber: <txnNumber>}
+ * _id: {lsid: <lsid>, txnNumber: <txnNumber>, txnRetryCounter: <txnRetryCounter>}
* participants: ["shard0000", "shard0001"]
* decision: "abort"
* }
@@ -116,7 +117,7 @@ Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service,
* else updates the document to be:
*
* {
- * _id: {lsid: <lsid>, txnNumber: <txnNumber>}
+ * _id: {lsid: <lsid>, txnNumber: <txnNumber>, txnRetryCounter: <txnRetryCounter>}
* participants: ["shard0000", "shard0001"]
* decision: "commit"
* commitTimestamp: Timestamp(xxxxxxxx, x),
@@ -127,12 +128,12 @@ Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service,
* Throws if the update fails or waiting for writeConcern fails.
*
* If the update succeeds but did not update any document, throws an anonymous error, because it
- * means either no document for (lsid, txnNumber) exists, or a document exists but has a different
- * participant list, different decision, or different commit Timestamp.
+ * means either no document for (lsid, txnNumber, txnRetryCounter) exists, or a document exists but
+ * has a different participant list, different decision, or different commit Timestamp.
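+ *
+ * Hypothetical caller sketch (illustrative only; the coordinator persists the decision,
+ * waits for it to majority-replicate, and only then broadcasts it):
+ *
+ *   persistDecision(scheduler, lsid, txnNumberAndRetryCounter, participants, decision)
+ *       .then([&](repl::OpTime opTime) { ... wait for majority, then sendCommit or sendAbort ... });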
*/
Future<repl::OpTime> persistDecision(txn::AsyncWorkScheduler& scheduler,
const LogicalSessionId& lsid,
- TxnNumber txnNumber,
+ const TxnNumberAndRetryCounter& txnNumberAndRetryCounter,
const txn::ParticipantsList& participants,
const txn::CoordinatorCommitDecision& decision);
@@ -143,7 +144,7 @@ Future<repl::OpTime> persistDecision(txn::AsyncWorkScheduler& scheduler,
Future<void> sendCommit(ServiceContext* service,
txn::AsyncWorkScheduler& scheduler,
const LogicalSessionId& lsid,
- TxnNumber txnNumber,
+ const TxnNumberAndRetryCounter& txnNumberAndRetryCounter,
const APIParameters& apiParams,
const txn::ParticipantsList& participants,
Timestamp commitTimestamp);
@@ -155,7 +156,7 @@ Future<void> sendCommit(ServiceContext* service,
Future<void> sendAbort(ServiceContext* service,
txn::AsyncWorkScheduler& scheduler,
const LogicalSessionId& lsid,
- TxnNumber txnNumber,
+ const TxnNumberAndRetryCounter& txnNumberAndRetryCounter,
const APIParameters& apiParams,
const txn::ParticipantsList& participants);
@@ -167,12 +168,12 @@ Future<void> sendAbort(ServiceContext* service,
- * Throws if the update fails.
+ * Throws if the delete fails.
*
* If the delete succeeds but did not delete any document, throws an anonymous error, because it
- * means either no document for (lsid, txnNumber) exists, or a document exists but without a
- * decision.
+ * means either no document for (lsid, txnNumber, txnRetryCounter) exists, or a document exists but
+ * without a decision.
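+ *
+ * Note: per the implementation, the delete is retried without backoff for as long as
+ * the write keeps failing with ErrorCodes::Interrupted.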
*/
Future<void> deleteCoordinatorDoc(txn::AsyncWorkScheduler& scheduler,
const LogicalSessionId& lsid,
- TxnNumber txnNumber);
+ const TxnNumberAndRetryCounter& txnNumberAndRetryCounter);
/**
* Reads and returns all documents in config.transaction_coordinators.
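+ * (Illustratively, the same documents one would see from
+ * db.getSiblingDB("config").transaction_coordinators.find() in the shell.)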
@@ -208,7 +209,7 @@ struct PrepareResponse {
Future<PrepareResponse> sendPrepareToShard(ServiceContext* service,
txn::AsyncWorkScheduler& scheduler,
const LogicalSessionId& lsid,
- TxnNumber txnNumber,
+ const TxnNumberAndRetryCounter& txnNumberAndRetryCounter,
const ShardId& shardId,
const BSONObj& prepareCommandObj,
OperationContextFn operationContextFn =
@@ -228,7 +229,7 @@ Future<PrepareResponse> sendPrepareToShard(ServiceContext* service,
Future<void> sendDecisionToShard(ServiceContext* service,
txn::AsyncWorkScheduler& scheduler,
const LogicalSessionId& lsid,
- TxnNumber txnNumber,
+ const TxnNumberAndRetryCounter& txnNumberAndRetryCounter,
const ShardId& shardId,
const BSONObj& commandObj,
OperationContextFn operationContextFn = [](OperationContext*) {});
@@ -237,7 +238,8 @@ Future<void> sendDecisionToShard(ServiceContext* service,
- * Returns a string representation of the transaction id represented by the given session id and
- * transaction number.
+ * Returns a string representation of the transaction id represented by the given session id,
+ * transaction number and, when set, transaction retry counter.
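+ *
+ * For example (illustrative values): "<lsid>:5:0" when the retry counter is set,
+ * "<lsid>:5" otherwise.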
*/
-std::string txnIdToString(const LogicalSessionId& lsid, TxnNumber txnNumber);
+std::string txnIdToString(const LogicalSessionId& lsid,
+ const TxnNumberAndRetryCounter& txnNumberAndRetryCounter);
} // namespace txn
} // namespace mongo