author      Kaloian Manassiev <kaloian.manassiev@mongodb.com>    2019-02-12 16:56:00 -0500
committer   Kaloian Manassiev <kaloian.manassiev@mongodb.com>    2019-02-22 07:19:19 -0500
commit      63220b7df242b26f83c7083394b463c4e4b3a5d8 (patch)
tree        203129f84d8afad520169a95fd03138af7b4107b /src/mongo/db
parent      cdc5a7e8fe27e66b4a5f862cf9624130fd5e4eeb (diff)
download    mongo-63220b7df242b26f83c7083394b463c4e4b3a5d8.tar.gz
SERVER-38522/SERVER-38715 Make all TransactionCoordinator tasks interruptible
Diffstat (limited to 'src/mongo/db')
-rw-r--r--  src/mongo/db/s/transaction_coordinator.cpp                  | 166
-rw-r--r--  src/mongo/db/s/transaction_coordinator.h                    |  98
-rw-r--r--  src/mongo/db/s/transaction_coordinator_catalog.cpp          | 113
-rw-r--r--  src/mongo/db/s/transaction_coordinator_catalog.h            |  63
-rw-r--r--  src/mongo/db/s/transaction_coordinator_catalog_test.cpp     |   8
-rw-r--r--  src/mongo/db/s/transaction_coordinator_driver.cpp           |  68
-rw-r--r--  src/mongo/db/s/transaction_coordinator_driver.h             |  40
-rw-r--r--  src/mongo/db/s/transaction_coordinator_futures_util.cpp     |  20
-rw-r--r--  src/mongo/db/s/transaction_coordinator_futures_util.h       |  15
-rw-r--r--  src/mongo/db/s/transaction_coordinator_futures_util_test.cpp|  21
-rw-r--r--  src/mongo/db/s/transaction_coordinator_service.cpp          |  64
-rw-r--r--  src/mongo/db/s/transaction_coordinator_service.h            |   3
-rw-r--r--  src/mongo/db/s/transaction_coordinator_service_test.cpp     |  23
-rw-r--r--  src/mongo/db/s/transaction_coordinator_test.cpp             | 382
-rw-r--r--  src/mongo/db/s/transaction_coordinator_test_fixture.cpp     |   2
15 files changed, 650 insertions, 436 deletions
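
The core change is visible in the TransactionCoordinator constructor below: when a 'coordinateCommitDeadline' is supplied, a child scheduler runs cancelIfCommitNotYetStarted() at the deadline, and that task is withdrawn as soon as runCommit() arrives. The following is a minimal, standalone C++ sketch of that pattern; it is not part of the commit, and DeadlineTask and its members are hypothetical names rather than MongoDB APIs.

// Standalone sketch (not from the commit): schedule a cancellation at a deadline and
// allow it to be withdrawn once commit coordination actually starts.
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>

class DeadlineTask {
public:
    DeadlineTask(std::chrono::steady_clock::time_point deadline, std::function<void()> onExpire)
        : _worker([this, deadline, onExpire = std::move(onExpire)] {
              std::unique_lock<std::mutex> lk(_mutex);
              // Wait until either the deadline passes or cancel() is called.
              if (!_cv.wait_until(lk, deadline, [this] { return _cancelled; }))
                  onExpire();  // Deadline elapsed without the task being cancelled.
          }) {}

    // Analogous to _cancelTimeoutWaitForCommitTask(): called once commit has started (or the
    // coordinator is being torn down) and the deadline task is no longer needed.
    void cancel() {
        {
            std::lock_guard<std::mutex> lk(_mutex);
            _cancelled = true;
        }
        _cv.notify_all();
    }

    ~DeadlineTask() {
        cancel();
        _worker.join();
    }

private:
    std::mutex _mutex;
    std::condition_variable _cv;
    bool _cancelled{false};
    std::thread _worker;  // Declared last so the other members exist before the thread runs.
};

A coordinator-like object would construct such a task in its constructor and call cancel() from runCommit(), which is the shape the diff below gives to the real class.
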
diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp
index e8a0b4432c4..5a1d0dfd41c 100644
--- a/src/mongo/db/s/transaction_coordinator.cpp
+++ b/src/mongo/db/s/transaction_coordinator.cpp
@@ -71,13 +71,29 @@ CoordinatorCommitDecision makeDecisionFromPrepareVoteConsensus(
TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext,
const LogicalSessionId& lsid,
- TxnNumber txnNumber)
+ TxnNumber txnNumber,
+ std::unique_ptr<txn::AsyncWorkScheduler> scheduler,
+ boost::optional<Date_t> coordinateCommitDeadline)
: _serviceContext(serviceContext),
- _driver(serviceContext),
_lsid(lsid),
- _txnNumber(txnNumber) {}
+ _txnNumber(txnNumber),
+ _scheduler(std::move(scheduler)),
+ _driver(serviceContext) {
+ if (coordinateCommitDeadline) {
+ _deadlineScheduler = _scheduler->makeChildScheduler();
+ _deadlineScheduler
+ ->scheduleWorkAt(*coordinateCommitDeadline,
+ [this](OperationContext* opCtx) { cancelIfCommitNotYetStarted(); })
+ .getAsync([](const Status&) {});
+ }
+}
TransactionCoordinator::~TransactionCoordinator() {
+ _cancelTimeoutWaitForCommitTask();
+
+ if (_deadlineScheduler)
+ _deadlineScheduler.reset();
+
stdx::lock_guard<stdx::mutex> lk(_mutex);
invariant(_state == TransactionCoordinator::CoordinatorState::kDone);
@@ -86,61 +102,33 @@ TransactionCoordinator::~TransactionCoordinator() {
invariant(_completionPromises.empty());
}
-/**
- * Implements the high-level logic for two-phase commit.
- */
-SharedSemiFuture<txn::CommitDecision> TransactionCoordinator::runCommit(
- const std::vector<ShardId>& participantShards) {
+void TransactionCoordinator::runCommit(std::vector<ShardId> participantShards) {
{
- // If another thread has already begun the commit process, return early.
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_state != CoordinatorState::kInit) {
- return _finalDecisionPromise.getFuture();
- }
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ if (_state != CoordinatorState::kInit)
+ return;
_state = CoordinatorState::kPreparing;
}
+ _cancelTimeoutWaitForCommitTask();
+
_driver.persistParticipantList(_lsid, _txnNumber, participantShards)
.then([this, participantShards]() { return _runPhaseOne(participantShards); })
.then([this, participantShards](CoordinatorCommitDecision decision) {
return _runPhaseTwo(participantShards, decision);
})
.getAsync([this](Status s) { _handleCompletionStatus(s); });
-
- return _finalDecisionPromise.getFuture();
-}
-
-Future<CoordinatorCommitDecision> TransactionCoordinator::_runPhaseOne(
- const std::vector<ShardId>& participantShards) {
- return _driver.sendPrepare(participantShards, _lsid, _txnNumber)
- .then([this, participantShards](txn::PrepareVoteConsensus result) {
- invariant(_state == CoordinatorState::kPreparing);
-
- auto decision =
- makeDecisionFromPrepareVoteConsensus(_serviceContext, result, _lsid, _txnNumber);
-
- return _driver
- .persistDecision(_lsid, _txnNumber, participantShards, decision.commitTimestamp)
- .then([decision] { return decision; });
- });
-}
-
-Future<void> TransactionCoordinator::_runPhaseTwo(const std::vector<ShardId>& participantShards,
- const CoordinatorCommitDecision& decision) {
- return _sendDecisionToParticipants(participantShards, decision)
- .then([this] { return _driver.deleteCoordinatorDoc(_lsid, _txnNumber); })
- .then([this] {
- LOG(3) << "Two-phase commit completed for session " << _lsid.toBSON()
- << ", transaction number " << _txnNumber;
-
- stdx::unique_lock<stdx::mutex> ul(_mutex);
- _transitionToDone(std::move(ul));
- });
}
void TransactionCoordinator::continueCommit(const TransactionCoordinatorDocument& doc) {
- _state = CoordinatorState::kPreparing;
- const auto participantShards = doc.getParticipants();
+ {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ invariant(_state == CoordinatorState::kInit);
+ invariant(!_deadlineScheduler);
+ _state = CoordinatorState::kPreparing;
+ }
+
+ const auto& participantShards = doc.getParticipants();
// Helper lambda to get the decision either from the document passed in or from the participants
// (by performing 'phase one' of two-phase commit).
@@ -170,27 +158,67 @@ Future<void> TransactionCoordinator::onCompletion() {
auto completionPromiseFuture = makePromiseFuture<void>();
_completionPromises.push_back(std::move(completionPromiseFuture.promise));
- return std::move(completionPromiseFuture.future);
+
+ return std::move(completionPromiseFuture.future)
+ .onError<ErrorCodes::TransactionCoordinatorSteppingDown>(
+ [](const Status& s) { uasserted(ErrorCodes::InterruptedDueToStepDown, s.reason()); });
}
SharedSemiFuture<txn::CommitDecision> TransactionCoordinator::getDecision() {
stdx::lock_guard<stdx::mutex> lg(_mutex);
- return _finalDecisionPromise.getFuture();
+ return _decisionPromise.getFuture();
}
void TransactionCoordinator::cancelIfCommitNotYetStarted() {
+ _cancelTimeoutWaitForCommitTask();
+
stdx::unique_lock<stdx::mutex> lk(_mutex);
if (_state == CoordinatorState::kInit) {
- invariant(!_finalDecisionPromise.getFuture().isReady());
- _finalDecisionPromise.emplaceValue(txn::CommitDecision::kCanceled);
+ invariant(!_decisionPromise.getFuture().isReady());
+ _decisionPromise.emplaceValue(txn::CommitDecision::kCanceled);
_transitionToDone(std::move(lk));
}
}
+void TransactionCoordinator::_cancelTimeoutWaitForCommitTask() {
+ if (_deadlineScheduler) {
+ _deadlineScheduler->shutdown(
+ {ErrorCodes::CallbackCanceled, "Interrupting the commit received deadline task"});
+ }
+}
+
+Future<CoordinatorCommitDecision> TransactionCoordinator::_runPhaseOne(
+ const std::vector<ShardId>& participantShards) {
+ return _driver.sendPrepare(participantShards, _lsid, _txnNumber)
+ .then([this, participantShards](txn::PrepareVoteConsensus result) {
+ invariant(_state == CoordinatorState::kPreparing);
+
+ auto decision =
+ makeDecisionFromPrepareVoteConsensus(_serviceContext, result, _lsid, _txnNumber);
+
+ return _driver
+ .persistDecision(_lsid, _txnNumber, participantShards, decision.commitTimestamp)
+ .then([decision] { return decision; });
+ });
+}
+
+Future<void> TransactionCoordinator::_runPhaseTwo(const std::vector<ShardId>& participantShards,
+ const CoordinatorCommitDecision& decision) {
+ return _sendDecisionToParticipants(participantShards, decision)
+ .then([this] { return _driver.deleteCoordinatorDoc(_lsid, _txnNumber); })
+ .then([this] {
+ LOG(3) << "Two-phase commit completed for session " << _lsid.toBSON()
+ << ", transaction number " << _txnNumber;
+
+ stdx::unique_lock<stdx::mutex> ul(_mutex);
+ _transitionToDone(std::move(ul));
+ });
+}
+
Future<void> TransactionCoordinator::_sendDecisionToParticipants(
const std::vector<ShardId>& participantShards, CoordinatorCommitDecision decision) {
invariant(_state == CoordinatorState::kPreparing);
- _finalDecisionPromise.emplaceValue(decision.decision);
+ _decisionPromise.emplaceValue(decision.decision);
// Send the decision to all participants.
switch (decision.decision) {
@@ -214,14 +242,17 @@ void TransactionCoordinator::_handleCompletionStatus(Status s) {
}
stdx::unique_lock<stdx::mutex> lk(_mutex);
+
LOG(3) << "Two-phase commit failed with error in state " << _state << " for transaction "
<< _txnNumber << " on session " << _lsid.toBSON() << causedBy(s);
- // If an error occurred prior to making a decision, set an error on the decision
- // promise to propagate it to callers of runCommit.
- if (!_finalDecisionPromise.getFuture().isReady()) {
+ // If an error occurred prior to making a decision, set an error on the decision promise to
+ // propagate it to callers of runCommit
+ if (!_decisionPromise.getFuture().isReady()) {
invariant(_state == CoordinatorState::kPreparing);
- _finalDecisionPromise.setError(s);
+ _decisionPromise.setError(s == ErrorCodes::TransactionCoordinatorReachedAbortDecision
+ ? Status{ErrorCodes::InterruptedDueToStepDown, s.reason()}
+ : s);
}
_transitionToDone(std::move(lk));
@@ -292,4 +323,31 @@ BSONObj CoordinatorCommitDecision::toBSON() const {
return builder.obj();
}
+logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream,
+ const TransactionCoordinator::CoordinatorState& state) {
+ using State = TransactionCoordinator::CoordinatorState;
+ // clang-format off
+ switch (state) {
+ case State::kInit: stream.stream() << "kInit"; break;
+ case State::kPreparing: stream.stream() << "kPreparing"; break;
+ case State::kAborting: stream.stream() << "kAborting"; break;
+ case State::kCommitting: stream.stream() << "kCommitting"; break;
+ case State::kDone: stream.stream() << "kDone"; break;
+ };
+ // clang-format on
+ return stream;
+}
+
+logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream,
+ const txn::CommitDecision& decision) {
+ // clang-format off
+ switch (decision) {
+ case txn::CommitDecision::kCommit: stream.stream() << "kCommit"; break;
+ case txn::CommitDecision::kAbort: stream.stream() << "kAbort"; break;
+ case txn::CommitDecision::kCanceled: stream.stream() << "kCanceled"; break;
+ };
+ // clang-format on
+ return stream;
+}
+
} // namespace mongo
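
Note the interface change in this file: runCommit() no longer returns the decision future; callers observe the outcome through getDecision() and onCompletion(), and step-down interruptions surface as InterruptedDueToStepDown. Below is a standalone sketch of that "kick off once, observe separately" shape using plain std::promise/std::shared_future; ToyCoordinator is a hypothetical stand-in, not the mongo class.

// Standalone sketch (not the mongo class): the decision and the overall completion are
// exposed through futures that runCommit() fulfils, instead of runCommit() returning them.
#include <future>
#include <string>
#include <vector>

enum class CommitDecision { kCommit, kAbort, kCanceled };

class ToyCoordinator {
public:
    ToyCoordinator()
        : _decisionFuture(_decisionPromise.get_future().share()),
          _completionFuture(_completionPromise.get_future().share()) {}

    // Kicks off the protocol at most once; the outcome is observed via getDecision() and
    // onCompletion() rather than through a return value.
    void runCommit(std::vector<std::string> participants) {
        if (_started)
            return;  // Subsequent calls do not re-run the commit process.
        _started = true;
        // A real coordinator runs two-phase commit asynchronously; deciding inline keeps the
        // sketch self-contained.
        _decisionPromise.set_value(participants.empty() ? CommitDecision::kAbort
                                                        : CommitDecision::kCommit);
        _completionPromise.set_value();
    }

    std::shared_future<CommitDecision> getDecision() const { return _decisionFuture; }
    std::shared_future<void> onCompletion() const { return _completionFuture; }

private:
    bool _started{false};
    std::promise<CommitDecision> _decisionPromise;
    std::promise<void> _completionPromise;
    std::shared_future<CommitDecision> _decisionFuture;
    std::shared_future<void> _completionFuture;
};
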
diff --git a/src/mongo/db/s/transaction_coordinator.h b/src/mongo/db/s/transaction_coordinator.h
index a4981edffad..0921c9c4090 100644
--- a/src/mongo/db/s/transaction_coordinator.h
+++ b/src/mongo/db/s/transaction_coordinator.h
@@ -43,9 +43,20 @@ class TransactionCoordinator {
MONGO_DISALLOW_COPYING(TransactionCoordinator);
public:
+ /**
+ * Instantiates a new TransactionCoordinator for the specified lsid + txnNumber pair and gives
+ * it a 'scheduler' to use for any asynchronous tasks it spawns.
+ *
+ * If the 'coordinateCommitDeadline' parameter is specified, a timed task will be scheduled to
+ * move the coordinator into a cancelled state if runCommit is not invoked before the deadline
+ * elapses.
+ */
TransactionCoordinator(ServiceContext* serviceContext,
const LogicalSessionId& lsid,
- TxnNumber txnNumber);
+ TxnNumber txnNumber,
+ std::unique_ptr<txn::AsyncWorkScheduler> scheduler,
+ boost::optional<Date_t> coordinateCommitDeadline);
+
~TransactionCoordinator();
/**
@@ -94,14 +105,11 @@ public:
/**
* The first time this is called, it asynchronously begins the two-phase commit process for the
- * transaction that this coordinator is responsible for, and returns a future that will be
- * resolved when a commit or abort decision has been made and persisted.
+ * transaction that this coordinator is responsible for.
*
- * Subsequent calls will not re-run the commit process, but instead return a future that will be
- * resolved when the original commit process that was kicked off comes to a decision. If the
- * original commit process has completed, returns a ready future containing the final decision.
+ * Subsequent calls will not re-run the commit process.
*/
- SharedSemiFuture<txn::CommitDecision> runCommit(const std::vector<ShardId>& participantShards);
+ void runCommit(std::vector<ShardId> participantShards);
/**
* To be used to continue coordinating a transaction on step up.
@@ -109,6 +117,14 @@ public:
void continueCommit(const TransactionCoordinatorDocument& doc);
/**
+ * Gets a Future that will contain the decision that the coordinator reaches. Note that this
+ * will never be signaled unless runCommit has been called.
+ *
+ * TODO (SERVER-37364): Remove this when it is no longer needed by the coordinator service.
+ */
+ SharedSemiFuture<txn::CommitDecision> getDecision();
+
+ /**
* Returns a future that will be signaled when the transaction has completely finished
* committing or aborting (i.e. when commit/abort acknowledgements have been received from all
* participants, or the coordinator commit process is aborted locally for some reason).
@@ -118,24 +134,23 @@ public:
Future<void> onCompletion();
/**
- * Gets a Future that will contain the decision that the coordinator reaches. Note that this
- * will never be signaled unless runCommit has been called.
- *
- * TODO (SERVER-37364): Remove this when it is no longer needed by the coordinator service.
- */
- SharedSemiFuture<txn::CommitDecision> getDecision();
-
- /**
* If runCommit has not yet been called, this will transition this coordinator object to
* the 'done' state, effectively making it impossible for two-phase commit to be run for this
* coordinator.
*
- * Called when a transaction with a higher transaction number is received on this session.
+ * Called when a transaction with a higher transaction number is received on this session or
+ * when the transaction coordinator service is shutting down.
*/
void cancelIfCommitNotYetStarted();
private:
/**
+ * Called when the timeout task scheduled by the constructor is no longer necessary (i.e.
+ * coordinateCommit was received or the coordinator was cancelled intentionally).
+ */
+ void _cancelTimeoutWaitForCommitTask();
+
+ /**
* Expects the participant list to already be majority-committed.
*
* 1. Sends prepare and collect the votes (i.e., responses), retrying requests as needed.
@@ -181,14 +196,23 @@ private:
// Shortcut to the service context under which this coordinator runs
ServiceContext* const _serviceContext;
- // Context object used to perform and track the state of asynchronous operations on behalf of
- // this coordinator.
- TransactionCoordinatorDriver _driver;
-
// The lsid + transaction number that this coordinator is coordinating
const LogicalSessionId _lsid;
const TxnNumber _txnNumber;
+ // Scheduler to use for all asynchronous activity which needs to be performed by this
+ // coordinator
+ std::unique_ptr<txn::AsyncWorkScheduler> _scheduler;
+
+ // If not nullptr, references the scheduler containing a task which will trigger
+ // 'cancelIfCommitNotYetStarted' if runCommit has not been invoked. If nullptr, then this
+ // coordinator was not created with an expiration task.
+ std::unique_ptr<txn::AsyncWorkScheduler> _deadlineScheduler;
+
+ // Context object used to perform and track the state of asynchronous operations on behalf of
+ // this coordinator.
+ TransactionCoordinatorDriver _driver;
+
// Protects the state below
mutable stdx::mutex _mutex;
@@ -199,7 +223,7 @@ private:
// abort). This is only known once all responses to prepare have been received from all
// participants, and the collective decision has been majority persisted to
// config.transactionCommitDecisions.
- SharedPromise<txn::CommitDecision> _finalDecisionPromise;
+ SharedPromise<txn::CommitDecision> _decisionPromise;
// A list of all promises corresponding to futures that were returned to callers of
// onCompletion.
@@ -208,30 +232,10 @@ private:
std::vector<Promise<void>> _completionPromises;
};
-inline logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream,
- const TransactionCoordinator::CoordinatorState& state) {
- using State = TransactionCoordinator::CoordinatorState;
- // clang-format off
- switch (state) {
- case State::kInit: stream.stream() << "kInit"; break;
- case State::kPreparing: stream.stream() << "kPreparing"; break;
- case State::kAborting: stream.stream() << "kAborting"; break;
- case State::kCommitting: stream.stream() << "kCommitting"; break;
- case State::kDone: stream.stream() << "kDone"; break;
- };
- // clang-format on
- return stream;
-}
-
-inline logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream,
- const txn::CommitDecision& decision) {
- // clang-format off
- switch (decision) {
- case txn::CommitDecision::kCommit: stream.stream() << "kCommit"; break;
- case txn::CommitDecision::kAbort: stream.stream() << "kAbort"; break;
- case txn::CommitDecision::kCanceled: stream.stream() << "kCanceled"; break;
- };
- // clang-format on
- return stream;
-}
+logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream,
+ const TransactionCoordinator::CoordinatorState& state);
+
+logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream,
+ const txn::CommitDecision& decision);
+
} // namespace mongo
diff --git a/src/mongo/db/s/transaction_coordinator_catalog.cpp b/src/mongo/db/s/transaction_coordinator_catalog.cpp
index dc5dd0fb0e4..3fa862ddde3 100644
--- a/src/mongo/db/s/transaction_coordinator_catalog.cpp
+++ b/src/mongo/db/s/transaction_coordinator_catalog.cpp
@@ -48,15 +48,49 @@ TransactionCoordinatorCatalog::~TransactionCoordinatorCatalog() {
join();
}
+void TransactionCoordinatorCatalog::exitStepUp(Status status) {
+ if (status.isOK()) {
+ LOG(0) << "Incoming coordinateCommit requests are now enabled";
+ } else {
+ warning() << "Coordinator recovery failed and coordinateCommit requests will not be allowed"
+ << causedBy(status);
+ }
+
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(!_stepUpCompletionStatus);
+ _stepUpCompletionStatus = std::move(status);
+ _stepUpCompleteCV.notify_all();
+}
+
+void TransactionCoordinatorCatalog::onStepDown() {
+ stdx::unique_lock<stdx::mutex> ul(_mutex);
+
+ std::vector<std::shared_ptr<TransactionCoordinator>> coordinatorsToCancel;
+ for (auto && [ sessionId, coordinatorsForSession ] : _coordinatorsBySession) {
+ for (auto && [ txnNumber, coordinator ] : coordinatorsForSession) {
+ coordinatorsToCancel.emplace_back(coordinator);
+ }
+ }
+
+ ul.unlock();
+
+ for (auto&& coordinator : coordinatorsToCancel) {
+ coordinator->cancelIfCommitNotYetStarted();
+ }
+
+ ul.lock();
+ _cleanupCompletedCoordinators(ul);
+}
+
void TransactionCoordinatorCatalog::insert(OperationContext* opCtx,
- LogicalSessionId lsid,
+ const LogicalSessionId& lsid,
TxnNumber txnNumber,
std::shared_ptr<TransactionCoordinator> coordinator,
bool forStepUp) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
-
+ stdx::unique_lock<stdx::mutex> ul(_mutex);
+ auto cleanupCoordinatorsGuard = makeGuard([&] { _cleanupCompletedCoordinators(ul); });
if (!forStepUp) {
- _waitForStepUpToComplete(lk, opCtx);
+ _waitForStepUpToComplete(ul, opCtx);
}
auto& coordinatorsBySession = _coordinatorsBySession[lsid];
@@ -70,18 +104,19 @@ void TransactionCoordinatorCatalog::insert(OperationContext* opCtx,
// Schedule callback to remove coordinator from catalog when it either commits or aborts.
coordinator->onCompletion().getAsync(
- [this, lsid, txnNumber](Status) { remove(lsid, txnNumber); });
+ [this, lsid, txnNumber](Status) { _remove(lsid, txnNumber); });
LOG(3) << "Inserting coordinator for transaction " << txnNumber << " on session "
<< lsid.toBSON() << " into in-memory catalog";
+
coordinatorsBySession[txnNumber] = std::move(coordinator);
}
-std::shared_ptr<TransactionCoordinator> TransactionCoordinatorCatalog::get(OperationContext* opCtx,
- LogicalSessionId lsid,
- TxnNumber txnNumber) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- _waitForStepUpToComplete(lk, opCtx);
+std::shared_ptr<TransactionCoordinator> TransactionCoordinatorCatalog::get(
+ OperationContext* opCtx, const LogicalSessionId& lsid, TxnNumber txnNumber) {
+ stdx::unique_lock<stdx::mutex> ul(_mutex);
+ auto cleanupCoordinatorsGuard = makeGuard([&] { _cleanupCompletedCoordinators(ul); });
+ _waitForStepUpToComplete(ul, opCtx);
std::shared_ptr<TransactionCoordinator> coordinatorToReturn;
@@ -112,9 +147,11 @@ std::shared_ptr<TransactionCoordinator> TransactionCoordinatorCatalog::get(Opera
}
boost::optional<std::pair<TxnNumber, std::shared_ptr<TransactionCoordinator>>>
-TransactionCoordinatorCatalog::getLatestOnSession(OperationContext* opCtx, LogicalSessionId lsid) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- _waitForStepUpToComplete(lk, opCtx);
+TransactionCoordinatorCatalog::getLatestOnSession(OperationContext* opCtx,
+ const LogicalSessionId& lsid) {
+ stdx::unique_lock<stdx::mutex> ul(_mutex);
+ auto cleanupCoordinatorsGuard = makeGuard([&] { _cleanupCompletedCoordinators(ul); });
+ _waitForStepUpToComplete(ul, opCtx);
const auto& coordinatorsForSessionIter = _coordinatorsBySession.find(lsid);
@@ -132,7 +169,7 @@ TransactionCoordinatorCatalog::getLatestOnSession(OperationContext* opCtx, Logic
return std::make_pair(lastCoordinatorOnSession->first, lastCoordinatorOnSession->second);
}
-void TransactionCoordinatorCatalog::remove(LogicalSessionId lsid, TxnNumber txnNumber) {
+void TransactionCoordinatorCatalog::_remove(const LogicalSessionId& lsid, TxnNumber txnNumber) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
LOG(3) << "Removing coordinator for transaction " << txnNumber << " on session "
@@ -163,6 +200,11 @@ void TransactionCoordinatorCatalog::remove(LogicalSessionId lsid, TxnNumber txnN
}
}
+ // Since the '_remove' method executes on the AWS of the coordinator which is being
+ // removed, we cannot destroy it inline. Because of this, put it on a cleanup list so
+ // that subsequent catalog operations will perform the cleanup.
+ _coordinatorsToCleanup.emplace_back(coordinatorForTxnIter->second);
+
coordinatorsForSession.erase(coordinatorForTxnIter);
if (coordinatorsForSession.empty()) {
_coordinatorsBySession.erase(coordinatorsForSessionIter);
@@ -172,28 +214,14 @@ void TransactionCoordinatorCatalog::remove(LogicalSessionId lsid, TxnNumber txnN
if (_coordinatorsBySession.empty()) {
LOG(3) << "Signaling last active coordinator removed";
- _noActiveCoordinatorsCv.notify_all();
- }
-}
-
-void TransactionCoordinatorCatalog::exitStepUp(Status status) {
- if (status.isOK()) {
- LOG(0) << "Incoming coordinateCommit requests are now enabled";
- } else {
- warning() << "Coordinator recovery failed and coordinateCommit requests will not be allowed"
- << causedBy(status);
+ _noActiveCoordinatorsCV.notify_all();
}
-
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(!_stepUpCompletionStatus);
- _stepUpCompletionStatus = std::move(status);
- _stepUpCompleteCv.notify_all();
}
void TransactionCoordinatorCatalog::join() {
stdx::unique_lock<stdx::mutex> ul(_mutex);
- while (!_noActiveCoordinatorsCv.wait_for(
+ while (!_noActiveCoordinatorsCV.wait_for(
ul, stdx::chrono::seconds{5}, [this] { return _coordinatorsBySession.empty(); })) {
LOG(0) << "After 5 seconds of wait there are still " << _coordinatorsBySession.size()
<< " sessions left with active coordinators which have not yet completed";
@@ -210,23 +238,28 @@ void TransactionCoordinatorCatalog::_waitForStepUpToComplete(stdx::unique_lock<s
OperationContext* opCtx) {
invariant(lk.owns_lock());
opCtx->waitForConditionOrInterrupt(
- _stepUpCompleteCv, lk, [this]() { return bool(_stepUpCompletionStatus); });
+ _stepUpCompleteCV, lk, [this]() { return bool(_stepUpCompletionStatus); });
uassertStatusOK(*_stepUpCompletionStatus);
}
+void TransactionCoordinatorCatalog::_cleanupCompletedCoordinators(
+ stdx::unique_lock<stdx::mutex>& ul) {
+ invariant(ul.owns_lock());
+ auto coordinatorsToCleanup = std::move(_coordinatorsToCleanup);
+
+ // Ensure the destructors run outside of the lock in order to minimize the time this method
+ // spends in a critical section
+ ul.unlock();
+}
+
std::string TransactionCoordinatorCatalog::_toString(WithLock wl) const {
StringBuilder ss;
ss << "[";
- for (auto coordinatorsForSession = _coordinatorsBySession.begin();
- coordinatorsForSession != _coordinatorsBySession.end();
- ++coordinatorsForSession) {
- ss << "\n";
- ss << coordinatorsForSession->first.toBSON() << ": ";
- for (auto coordinatorForTxnNumber = coordinatorsForSession->second.begin();
- coordinatorForTxnNumber != coordinatorsForSession->second.end();
- ++coordinatorForTxnNumber) {
- ss << coordinatorForTxnNumber->first << " ";
+ for (const auto& coordinatorsForSession : _coordinatorsBySession) {
+ ss << "\n" << coordinatorsForSession.first.toBSON() << ": ";
+ for (const auto& coordinator : coordinatorsForSession.second) {
+ ss << coordinator.first << ",";
}
}
ss << "]";
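
The catalog change above introduces a '_coordinatorsToCleanup' list so that a coordinator removed from within its own completion callback is destroyed later, outside the mutex. Below is a standalone sketch of that deferred-destruction pattern; Registry and Entry are hypothetical names, not mongo types.

// Standalone sketch (hypothetical Registry/Entry types): entries retired from a callback are
// parked on a cleanup list and destroyed later, with the mutex released.
#include <list>
#include <memory>
#include <mutex>

struct Entry {
    ~Entry() { /* potentially expensive teardown; must not run under the registry mutex */ }
};

class Registry {
public:
    // Called from a context that must not destroy the entry inline (e.g. the entry's own
    // completion callback); the entry is only moved onto the cleanup list here.
    void retire(std::shared_ptr<Entry> entry) {
        std::lock_guard<std::mutex> lk(_mutex);
        _toCleanup.emplace_back(std::move(entry));
    }

    // Called at the start of subsequent registry operations; destroys retired entries
    // outside the critical section.
    void cleanupRetired() {
        std::list<std::shared_ptr<Entry>> local;
        {
            std::lock_guard<std::mutex> lk(_mutex);
            local.swap(_toCleanup);
        }
        // 'local' goes out of scope here, so the Entry destructors run without the lock held.
    }

private:
    std::mutex _mutex;
    std::list<std::shared_ptr<Entry>> _toCleanup;
};
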
diff --git a/src/mongo/db/s/transaction_coordinator_catalog.h b/src/mongo/db/s/transaction_coordinator_catalog.h
index 99ca47b246a..67f6daf77db 100644
--- a/src/mongo/db/s/transaction_coordinator_catalog.h
+++ b/src/mongo/db/s/transaction_coordinator_catalog.h
@@ -41,9 +41,7 @@ namespace mongo {
/**
* A container for TransactionCoordinator objects, indexed by logical session id and transaction
- * number. It allows holding several coordinator objects per session. It also knows how to recreate
- * itself from the config.txnCommitDecisions collection, which will be done on transition to
- * primary (whether from startup or ordinary step up).
+ * number. It allows holding several coordinator objects per session.
*/
class TransactionCoordinatorCatalog {
MONGO_DISALLOW_COPYING(TransactionCoordinatorCatalog);
@@ -53,6 +51,16 @@ public:
~TransactionCoordinatorCatalog();
/**
+ * Marks that recovery of the catalog has completed and that operations can be run on it.
+ */
+ void exitStepUp(Status status);
+
+ /**
+ * Cancels any outstanding idle transaction coordinators so that they will get unregistered.
+ */
+ void onStepDown();
+
+ /**
* Inserts a coordinator into the catalog.
*
* Note: Inserting a duplicate coordinator for the given session id and transaction number
@@ -60,7 +68,7 @@ public:
* does not take place.
*/
void insert(OperationContext* opCtx,
- LogicalSessionId lsid,
+ const LogicalSessionId& lsid,
TxnNumber txnNumber,
std::shared_ptr<TransactionCoordinator> coordinator,
bool forStepUp = false);
@@ -70,7 +78,7 @@ public:
* does not exist, return nullptr.
*/
std::shared_ptr<TransactionCoordinator> get(OperationContext* opCtx,
- LogicalSessionId lsid,
+ const LogicalSessionId& lsid,
TxnNumber txnNumber);
/**
@@ -78,21 +86,7 @@ public:
* exists. If it does not exist, return boost::none.
*/
boost::optional<std::pair<TxnNumber, std::shared_ptr<TransactionCoordinator>>>
- getLatestOnSession(OperationContext* opCtx, LogicalSessionId lsid);
-
- /**
- * Removes the coordinator with the given session id and transaction number from the catalog, if
- * one exists, and if this the last coordinator in the catalog, signals that there are no active
- * coordinators.
- *
- * Note: The coordinator must be in a state suitable for removal (i.e. committed or aborted).
- */
- void remove(LogicalSessionId lsid, TxnNumber txnNumber);
-
- /**
- * Marks no stepup in progress and signals that no stepup is in progress.
- */
- void exitStepUp(Status status);
+ getLatestOnSession(OperationContext* opCtx, const LogicalSessionId& lsid);
/**
* Blocking method, which waits for all coordinators registered on the catalog to complete
@@ -119,6 +113,22 @@ private:
void _waitForStepUpToComplete(stdx::unique_lock<stdx::mutex>& lk, OperationContext* opCtx);
/**
+ * Removes the coordinator with the given session id and transaction number from the catalog, if
+ * one exists, and if this is the last coordinator in the catalog, signals that there are no active
+ * coordinators.
+ *
+ * Note: The coordinator must be in a state suitable for removal (i.e. committed or aborted).
+ */
+ void _remove(const LogicalSessionId& lsid, TxnNumber txnNumber);
+
+ /**
+ * Goes through the '_coordinatorsToCleanup' list and deletes entries from it. As a side-effect
+ * also unlocks the passed-in lock, so no other synchronized members of the class should be
+ * accessed after it is called.
+ */
+ void _cleanupCompletedCoordinators(stdx::unique_lock<stdx::mutex>& ul);
+
+ /**
* Constructs a string representation of all the coordinators registered on the catalog.
*/
std::string _toString(WithLock wl) const;
@@ -135,23 +145,26 @@ private:
// commit coordination and would normally be expunged from memory.
LogicalSessionIdMap<TransactionCoordinatorMap> _coordinatorsBySessionDefunct;
+ // Set of coordinators which have completed, but have not yet been destroyed.
+ std::list<std::shared_ptr<TransactionCoordinator>> _coordinatorsToCleanup;
+
// Stores the result of the coordinator catalog's recovery attempt (the status passed to
// exitStepUp). This is what the values mean:
//
// stepUpCompletionStatus = none - brand new created object (exitStepUp has not been called
- // yet). All calls will block.
+ // yet). All calls will block.
// stepUpCompletionStatus = OK - recovery completed successfully, transactions can be
- // coordinated
+ // coordinated
// stepUpCompletionStatus = error - recovery completed with an error, transactions cannot be
- // coordinated (all methods will fail with this error)
+ // coordinated (all methods will fail with this error)
boost::optional<Status> _stepUpCompletionStatus;
// Signaled when recovery of the catalog completes (when _stepUpCompletionStatus transitions
// from none to either OK or error)
- stdx::condition_variable _stepUpCompleteCv;
+ stdx::condition_variable _stepUpCompleteCV;
// Notified when the last coordinator is removed from the catalog.
- stdx::condition_variable _noActiveCoordinatorsCv;
+ stdx::condition_variable _noActiveCoordinatorsCV;
};
} // namespace mongo
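
The step-up gating declared in this header (exitStepUp(), _stepUpCompletionStatus, _stepUpCompleteCV) means every catalog operation blocks until recovery has recorded a status and then fails if recovery failed. Below is a standalone sketch of that gate, assuming a plain bool outcome in place of mongo's Status; StepUpGate is a hypothetical name.

// Standalone sketch (hypothetical StepUpGate type): callers block until recovery records an
// outcome, then proceed or fail based on it.
#include <condition_variable>
#include <mutex>
#include <optional>
#include <stdexcept>

class StepUpGate {
public:
    // Analogous to exitStepUp(): records the recovery outcome and wakes all waiters.
    void exitStepUp(bool recoveredOk) {
        {
            std::lock_guard<std::mutex> lk(_mutex);
            _recoveredOk = recoveredOk;
        }
        _cv.notify_all();
    }

    // Every operation calls this first: it blocks until recovery finishes and throws if
    // recovery failed, so requests are rejected after a failed step-up.
    void waitForStepUp() {
        std::unique_lock<std::mutex> lk(_mutex);
        _cv.wait(lk, [this] { return _recoveredOk.has_value(); });
        if (!*_recoveredOk)
            throw std::runtime_error("coordinator recovery failed");
    }

private:
    std::mutex _mutex;
    std::condition_variable _cv;
    std::optional<bool> _recoveredOk;
};
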
diff --git a/src/mongo/db/s/transaction_coordinator_catalog_test.cpp b/src/mongo/db/s/transaction_coordinator_catalog_test.cpp
index 9a114eb203a..b73955b5c78 100644
--- a/src/mongo/db/s/transaction_coordinator_catalog_test.cpp
+++ b/src/mongo/db/s/transaction_coordinator_catalog_test.cpp
@@ -63,8 +63,12 @@ protected:
void createCoordinatorInCatalog(OperationContext* opCtx,
LogicalSessionId lsid,
TxnNumber txnNumber) {
- auto newCoordinator =
- std::make_shared<TransactionCoordinator>(getServiceContext(), lsid, txnNumber);
+ auto newCoordinator = std::make_shared<TransactionCoordinator>(
+ getServiceContext(),
+ lsid,
+ txnNumber,
+ std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
+ boost::none);
_coordinatorCatalog->insert(opCtx, lsid, txnNumber, newCoordinator);
_coordinatorsForTest.push_back(newCoordinator);
diff --git a/src/mongo/db/s/transaction_coordinator_driver.cpp b/src/mongo/db/s/transaction_coordinator_driver.cpp
index f44d5e7ebd7..56f74078594 100644
--- a/src/mongo/db/s/transaction_coordinator_driver.cpp
+++ b/src/mongo/db/s/transaction_coordinator_driver.cpp
@@ -140,6 +140,11 @@ bool shouldRetryCommandAgainstShard(const ShardId& shardId,
shouldRetry = false;
}
+ if (responseStatus == ErrorCodes::TransactionCoordinatorSteppingDown ||
+ responseStatus == ErrorCodes::TransactionCoordinatorReachedAbortDecision) {
+ shouldRetry = false;
+ }
+
LOG(3) << "Coordinator " << (shouldRetry ? "retrying " : "not retrying ") << commandObj
<< " against " << (isLocalShard ? "local " : "") << "shard " << shardId
<< " because got response status " << responseStatus;
@@ -279,8 +284,10 @@ Future<txn::PrepareVoteConsensus> TransactionCoordinatorDriver::sendPrepare(
// Send prepare to all participants asynchronously and collect their future responses in a
// vector of responses.
+ auto prepareScheduler = _scheduler->makeChildScheduler();
+
for (const auto& participant : participantShards) {
- responses.push_back(sendPrepareToShard(participant, prepareObj));
+ responses.push_back(sendPrepareToShard(*prepareScheduler, participant, prepareObj));
}
// Asynchronously aggregate all prepare responses to find the decision and max prepare timestamp
@@ -291,7 +298,8 @@ Future<txn::PrepareVoteConsensus> TransactionCoordinatorDriver::sendPrepare(
// Initial value
txn::PrepareVoteConsensus{boost::none, boost::none},
// Aggregates an incoming response (next) with the existing aggregate value (result)
- [this](txn::PrepareVoteConsensus& result, const PrepareResponse& next) {
+ [prepareScheduler = std::move(prepareScheduler)](txn::PrepareVoteConsensus & result,
+ const PrepareResponse& next) {
if (!next.vote) {
LOG(3) << "Transaction coordinator did not receive a response from shard "
<< next.participantShardId;
@@ -302,7 +310,9 @@ Future<txn::PrepareVoteConsensus> TransactionCoordinatorDriver::sendPrepare(
case PrepareVote::kAbort:
result.decision = CommitDecision::kAbort;
result.maxPrepareTimestamp = boost::none;
- cancel();
+ prepareScheduler->shutdown(
+ {ErrorCodes::TransactionCoordinatorReachedAbortDecision,
+ "Received at least one vote abort"});
return txn::ShouldStopIteration::kYes;
case PrepareVote::kCommit:
result.decision = CommitDecision::kCommit;
@@ -469,7 +479,7 @@ Future<void> TransactionCoordinatorDriver::sendCommit(const std::vector<ShardId>
std::vector<Future<void>> responses;
for (const auto& participant : participantShards) {
- responses.push_back(sendDecisionToParticipantShard(participant, commitObj));
+ responses.push_back(sendDecisionToParticipantShard(*_scheduler, participant, commitObj));
}
return txn::whenAll(responses);
}
@@ -486,7 +496,7 @@ Future<void> TransactionCoordinatorDriver::sendAbort(const std::vector<ShardId>&
std::vector<Future<void>> responses;
for (const auto& participant : participantShards) {
- responses.push_back(sendDecisionToParticipantShard(participant, abortObj));
+ responses.push_back(sendDecisionToParticipantShard(*_scheduler, participant, abortObj));
}
return txn::whenAll(responses);
}
@@ -595,20 +605,22 @@ std::vector<TransactionCoordinatorDocument> TransactionCoordinatorDriver::readAl
}
Future<PrepareResponse> TransactionCoordinatorDriver::sendPrepareToShard(
- const ShardId& shardId, const BSONObj& commandObj) {
+ txn::AsyncWorkScheduler& scheduler, const ShardId& shardId, const BSONObj& commandObj) {
const bool isLocalShard = checkIsLocalShard(_serviceContext, shardId);
- return txn::doWhile(
- *_scheduler,
+
+ auto f = txn::doWhile(
+ scheduler,
kExponentialBackoff,
- [ isLocalShard, shardId, commandObj = commandObj.getOwned() ](
+ [ shardId, isLocalShard, commandObj = commandObj.getOwned() ](
StatusWith<PrepareResponse> swPrepareResponse) {
return shouldRetryCommandAgainstShard(
shardId, isLocalShard, commandObj, swPrepareResponse.getStatus());
},
- [ this, shardId, isLocalShard, commandObj = commandObj.getOwned() ] {
+ [&scheduler, shardId, isLocalShard, commandObj = commandObj.getOwned() ] {
LOG(3) << "Coordinator going to send command " << commandObj << " to "
<< (isLocalShard ? " local " : "") << " shard " << shardId;
- return _scheduler->scheduleRemoteCommand(shardId, kPrimaryReadPreference, commandObj)
+
+ return scheduler.scheduleRemoteCommand(shardId, kPrimaryReadPreference, commandObj)
.then([ shardId, commandObj = commandObj.getOwned() ](ResponseStatus response) {
auto status = getStatusFromCommandResult(response.data);
auto wcStatus = getWriteConcernStatusFromCommandResult(response.data);
@@ -641,6 +653,7 @@ Future<PrepareResponse> TransactionCoordinatorDriver::sendPrepareToShard(
LOG(3) << "Coordinator shard received " << status << " from shard " << shardId
<< " for " << commandObj;
+
if (ErrorCodes::isVoteAbortError(status.code())) {
return PrepareResponse{shardId, PrepareVote::kAbort, boost::none};
}
@@ -648,7 +661,7 @@ Future<PrepareResponse> TransactionCoordinatorDriver::sendPrepareToShard(
uassertStatusOK(status);
MONGO_UNREACHABLE;
})
- .onError<ErrorCodes::ShardNotFound>([shardId, isLocalShard](const Status& s) {
+ .onError<ErrorCodes::ShardNotFound>([shardId, isLocalShard](const Status&) {
invariant(!isLocalShard);
// ShardNotFound may indicate that the participant shard has been removed (it
// could also mean the participant shard was recently added and this node
@@ -659,32 +672,31 @@ Future<PrepareResponse> TransactionCoordinatorDriver::sendPrepareToShard(
// must then send abort.
return Future<PrepareResponse>::makeReady(
{shardId, CommitDecision::kAbort, boost::none});
- })
- .onError([this, shardId](const Status& status) {
- if (_cancelled.loadRelaxed()) {
- LOG(3) << "Prepare stopped retrying due to retrying being cancelled";
- return PrepareResponse{shardId, boost::none, boost::none};
- }
-
- uassertStatusOK(status);
- MONGO_UNREACHABLE;
});
});
+
+ return std::move(f).onError<ErrorCodes::TransactionCoordinatorReachedAbortDecision>(
+ [shardId](const Status&) {
+ LOG(3) << "Prepare stopped retrying due to retrying being cancelled";
+ return PrepareResponse{shardId, boost::none, boost::none};
+ });
}
Future<void> TransactionCoordinatorDriver::sendDecisionToParticipantShard(
- const ShardId& shardId, const BSONObj& commandObj) {
+ txn::AsyncWorkScheduler& scheduler, const ShardId& shardId, const BSONObj& commandObj) {
const bool isLocalShard = checkIsLocalShard(_serviceContext, shardId);
+
return txn::doWhile(
- *_scheduler,
+ scheduler,
kExponentialBackoff,
- [ isLocalShard, shardId, commandObj = commandObj.getOwned() ](const Status& s) {
+ [ shardId, isLocalShard, commandObj = commandObj.getOwned() ](const Status& s) {
return shouldRetryCommandAgainstShard(shardId, isLocalShard, commandObj, s);
},
- [ this, isLocalShard, shardId, commandObj = commandObj.getOwned() ] {
+ [&scheduler, shardId, isLocalShard, commandObj = commandObj.getOwned() ] {
LOG(3) << "Coordinator going to send command " << commandObj << " to "
<< (isLocalShard ? " local " : "") << " shard " << shardId;
- return _scheduler->scheduleRemoteCommand(shardId, kPrimaryReadPreference, commandObj)
+
+ return scheduler.scheduleRemoteCommand(shardId, kPrimaryReadPreference, commandObj)
.then([ shardId, commandObj = commandObj.getOwned() ](ResponseStatus response) {
auto status = getStatusFromCommandResult(response.data);
auto wcStatus = getWriteConcernStatusFromCommandResult(response.data);
@@ -715,8 +727,4 @@ Future<void> TransactionCoordinatorDriver::sendDecisionToParticipantShard(
});
}
-void TransactionCoordinatorDriver::cancel() {
- _cancelled.store(true);
-}
-
} // namespace mongo
diff --git a/src/mongo/db/s/transaction_coordinator_driver.h b/src/mongo/db/s/transaction_coordinator_driver.h
index 5e935753eb6..99dce0b67e1 100644
--- a/src/mongo/db/s/transaction_coordinator_driver.h
+++ b/src/mongo/db/s/transaction_coordinator_driver.h
@@ -191,15 +191,6 @@ public:
Future<void> deleteCoordinatorDoc(const LogicalSessionId& lsid, TxnNumber txnNumber);
/**
- * Non-blocking method, which interrupts any executing asynchronous tasks and prevents new ones
- * from getting scheduled. Any futures returned by the driver must still be waited on before the
- * object is disposed of.
- *
- * TODO (SERVER-38522): Implement using real cancellation through the AsyncWorkScheduler
- */
- void cancel();
-
- /**
* Reads and returns all documents in config.transaction_coordinators.
*/
static std::vector<TransactionCoordinatorDocument> readAllCoordinatorDocs(
@@ -210,32 +201,37 @@ public:
//
/**
- * Sends prepare to a given shard, retrying until a response is received from the participant or
- * until the coordinator is no longer in a preparing state due to having already received a vote
- * to abort from another participant.
+ * Sends prepare to the given shard and returns a future, which will be set with the vote.
*
- * Returns a future containing the participant's response.
+ * This method will retry until it receives a non-retryable response from the remote node or
+ * until the scheduler under which it is running is shut down. Because of this it can return
+ * only the following error code(s):
+ * - TransactionCoordinatorSteppingDown
+ * - ShardNotFound
*/
- Future<txn::PrepareResponse> sendPrepareToShard(const ShardId& shardId,
+ Future<txn::PrepareResponse> sendPrepareToShard(txn::AsyncWorkScheduler& scheduler,
+ const ShardId& shardId,
const BSONObj& prepareCommandObj);
/**
- * Sends a command corresponding to a commit decision (i.e. commitTransaction or
- * abortTransaction) to the given shard ID and retries on any retryable error until the command
- * succeeds or receives a response that may be interpreted as a vote to abort (e.g.
- * NoSuchTransaction).
+ * Sends a command corresponding to a commit decision (i.e. commitTransaction or
+ * abortTransaction) to the given shard and returns a future, which will be set with the result.
*
* Used for sendCommit and sendAbort.
+ *
+ * This method will retry until it receives a response from the remote node which can be
+ * interpreted as a vote to abort (e.g. NoSuchTransaction), or until the scheduler under which it is
+ * running is shut down. Because of this it can return only the following error code(s):
+ * - TransactionCoordinatorSteppingDown
*/
- Future<void> sendDecisionToParticipantShard(const ShardId& shardId, const BSONObj& commandObj);
+ Future<void> sendDecisionToParticipantShard(txn::AsyncWorkScheduler& scheduler,
+ const ShardId& shardId,
+ const BSONObj& commandObj);
private:
ServiceContext* _serviceContext;
std::unique_ptr<txn::AsyncWorkScheduler> _scheduler;
-
- // TODO (SERVER-38522): Remove once AsyncWorkScheduler is used for cancellation
- AtomicWord<bool> _cancelled{false};
};
} // namespace mongo
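
The reworked driver methods above retry a remote command until they receive a terminal response or until the scheduler they were handed is shut down, replacing the old '_cancelled' flag. Below is a standalone sketch of that retry-until-shutdown loop; Response, SchedulerShutdown and sendWithRetry are hypothetical names, not mongo APIs.

// Standalone sketch (hypothetical names): keep retrying a remote call until a terminal
// response arrives or the scheduler shuts down, which surfaces as an exception.
#include <functional>
#include <stdexcept>

struct SchedulerShutdown : std::runtime_error {
    using std::runtime_error::runtime_error;
};

enum class Response { kOk, kRetryableError, kVoteAbort };

// 'sendOnce' performs a single attempt; a shut-down scheduler makes it throw
// SchedulerShutdown, which ends the loop instead of retrying forever.
Response sendWithRetry(const std::function<Response()>& sendOnce) {
    while (true) {
        Response r = sendOnce();
        if (r != Response::kRetryableError)
            return r;  // kOk or kVoteAbort is a terminal response.
    }
}
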
diff --git a/src/mongo/db/s/transaction_coordinator_futures_util.cpp b/src/mongo/db/s/transaction_coordinator_futures_util.cpp
index 55627f400ed..81d101c91e8 100644
--- a/src/mongo/db/s/transaction_coordinator_futures_util.cpp
+++ b/src/mongo/db/s/transaction_coordinator_futures_util.cpp
@@ -59,10 +59,10 @@ AsyncWorkScheduler::AsyncWorkScheduler(ServiceContext* serviceContext)
AsyncWorkScheduler::~AsyncWorkScheduler() {
{
- stdx::lock_guard<stdx::mutex> lg(_mutex);
- invariant(_activeOpContexts.empty());
- invariant(_activeHandles.empty());
- invariant(_childSchedulers.empty());
+ stdx::unique_lock<stdx::mutex> ul(_mutex);
+ _allListsEmptyCV.wait(ul, [&] {
+ return _activeOpContexts.empty() && _activeHandles.empty() && _childSchedulers.empty();
+ });
}
if (!_parent)
@@ -70,6 +70,7 @@ AsyncWorkScheduler::~AsyncWorkScheduler() {
stdx::lock_guard<stdx::mutex> lg(_parent->_mutex);
_parent->_childSchedulers.erase(_itToRemove);
+ _parent->_notifyAllTasksComplete(lg);
_parent = nullptr;
}
@@ -165,6 +166,7 @@ Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemot
[ this, it = std::move(it) ](StatusWith<ResponseStatus> s) {
stdx::lock_guard<stdx::mutex> lg(_mutex);
_activeHandles.erase(it);
+ _notifyAllTasksComplete(lg);
});
});
}
@@ -212,7 +214,7 @@ Future<HostAndPort> AsyncWorkScheduler::_targetHostAsync(const ShardId& shardId,
const auto shard = uassertStatusOK(shardRegistry->getShard(opCtx, shardId));
if (MONGO_FAIL_POINT(hangWhileTargetingRemoteHost)) {
- LOG(0) << "Hit hangWhileTargetingRemoteHost failpoint";
+ LOG(0) << "Hit hangWhileTargetingRemoteHost failpoint for shard " << shardId;
MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(opCtx, hangWhileTargetingRemoteHost);
}
@@ -221,6 +223,14 @@ Future<HostAndPort> AsyncWorkScheduler::_targetHostAsync(const ShardId& shardId,
});
}
+void AsyncWorkScheduler::_notifyAllTasksComplete(WithLock) {
+ if (_shutdownStatus.isOK())
+ return;
+
+ if (_activeOpContexts.empty() && _activeHandles.empty() && _childSchedulers.empty())
+ _allListsEmptyCV.notify_all();
+}
+
Future<void> whenAll(std::vector<Future<void>>& futures) {
std::vector<Future<int>> dummyFutures;
for (auto&& f : futures) {
diff --git a/src/mongo/db/s/transaction_coordinator_futures_util.h b/src/mongo/db/s/transaction_coordinator_futures_util.h
index d8ca57a4063..942ee7e2fbf 100644
--- a/src/mongo/db/s/transaction_coordinator_futures_util.h
+++ b/src/mongo/db/s/transaction_coordinator_futures_util.h
@@ -93,6 +93,9 @@ public:
auto scopedGuard = makeGuard([&] {
ul.lock();
_activeOpContexts.erase(uniqueOpCtxIter);
+ // There is no need to call _notifyAllTasksComplete here, because we
+ // will still have an outstanding _activeHandles entry, so the scheduler
+ // is never completed at this point
});
return task(uniqueOpCtxIter->get());
@@ -108,6 +111,7 @@ public:
[ this, it = std::move(it) ](StatusOrStatusWith<ReturnType> s) {
stdx::lock_guard<stdx::mutex> lg(_mutex);
_activeHandles.erase(it);
+ _notifyAllTasksComplete(lg);
});
} catch (const DBException& ex) {
taskCompletionPromise->setError(ex.toStatus());
@@ -151,6 +155,13 @@ private:
Future<HostAndPort> _targetHostAsync(const ShardId& shardId,
const ReadPreferenceSetting& readPref);
+ /**
+ * Invoked every time a registered op context, handle or child scheduler is getting
+ * unregistered. Used to notify the 'all lists empty' condition variable when there is no more
+ * activity on a scheduler which has been shut down.
+ */
+ void _notifyAllTasksComplete(WithLock);
+
// Service context under which this executor runs
ServiceContext* const _serviceContext;
@@ -177,6 +188,10 @@ private:
// Any outstanding child schedulers created though 'makeChildScheduler'
ChildIteratorsList _childSchedulers;
+
+ // Notified when the scheduler is shut down and the last active handle, operation context or
+ // child scheduler has been unregistered.
+ stdx::condition_variable _allListsEmptyCV;
};
enum class ShouldStopIteration { kYes, kNo };
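
With the changes above, ~AsyncWorkScheduler() blocks until every operation context, remote-command handle and child scheduler has unregistered, with _notifyAllTasksComplete() signalling the condition variable once the scheduler has been shut down and the lists drain. Below is a standalone sketch of the "destructor waits for outstanding work" idea; DrainOnDestroy is a hypothetical stand-in, not the mongo class.

// Standalone sketch (hypothetical DrainOnDestroy type): the destructor blocks until every
// outstanding piece of work has unregistered itself.
#include <condition_variable>
#include <mutex>

class DrainOnDestroy {
public:
    void taskStarted() {
        std::lock_guard<std::mutex> lk(_mutex);
        ++_outstanding;
    }

    void taskFinished() {
        std::lock_guard<std::mutex> lk(_mutex);
        if (--_outstanding == 0)
            _allDone.notify_all();  // Analogous to _notifyAllTasksComplete().
    }

    ~DrainOnDestroy() {
        std::unique_lock<std::mutex> lk(_mutex);
        _allDone.wait(lk, [this] { return _outstanding == 0; });
    }

private:
    std::mutex _mutex;
    std::condition_variable _allDone;
    int _outstanding{0};
};
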
diff --git a/src/mongo/db/s/transaction_coordinator_futures_util_test.cpp b/src/mongo/db/s/transaction_coordinator_futures_util_test.cpp
index cf7a2f75e5a..4948c57d260 100644
--- a/src/mongo/db/s/transaction_coordinator_futures_util_test.cpp
+++ b/src/mongo/db/s/transaction_coordinator_futures_util_test.cpp
@@ -597,6 +597,27 @@ TEST_F(AsyncWorkSchedulerTest, MakeChildSchedulerAfterShutdownParentScheduler) {
ASSERT_THROWS_CODE(childFuture2.get(), AssertionException, ErrorCodes::InternalError);
}
+TEST_F(AsyncWorkSchedulerTest, ShutdownAllowedFromScheduleWorkAtCallback) {
+ AsyncWorkScheduler async(getServiceContext());
+ auto future = async.scheduleWork([&](OperationContext* opCtx) {
+ async.shutdown({ErrorCodes::InternalError, "Test error"});
+ });
+
+ future.get();
+}
+
+TEST_F(AsyncWorkSchedulerTest, DestroyingSchedulerCapturedInFutureCallback) {
+ auto async = std::make_unique<AsyncWorkScheduler>(getServiceContext());
+
+ Barrier barrier(2);
+ auto future =
+ async->scheduleWork([&barrier](OperationContext* opCtx) { barrier.countDownAndWait(); })
+ .tapAll([ async = std::move(async), &barrier ](Status){});
+
+ barrier.countDownAndWait();
+ future.get();
+}
+
using DoWhileTest = AsyncWorkSchedulerTest;
diff --git a/src/mongo/db/s/transaction_coordinator_service.cpp b/src/mongo/db/s/transaction_coordinator_service.cpp
index 8f1301873e4..e5b5e15e300 100644
--- a/src/mongo/db/s/transaction_coordinator_service.cpp
+++ b/src/mongo/db/s/transaction_coordinator_service.cpp
@@ -36,8 +36,6 @@
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/s/transaction_coordinator_document_gen.h"
#include "mongo/db/write_concern.h"
-#include "mongo/executor/task_executor.h"
-#include "mongo/executor/task_executor_pool.h"
#include "mongo/s/grid.h"
#include "mongo/util/log.h"
@@ -69,6 +67,7 @@ void TransactionCoordinatorService::createCoordinator(OperationContext* opCtx,
Date_t commitDeadline) {
auto cas = _getCatalogAndScheduler(opCtx);
auto& catalog = cas->catalog;
+ auto& scheduler = cas->scheduler;
if (auto latestTxnNumAndCoordinator = catalog.getLatestOnSession(opCtx, lsid)) {
auto latestCoordinator = latestTxnNumAndCoordinator->second;
@@ -78,27 +77,14 @@ void TransactionCoordinatorService::createCoordinator(OperationContext* opCtx,
latestCoordinator->cancelIfCommitNotYetStarted();
}
- auto coordinator =
- std::make_shared<TransactionCoordinator>(opCtx->getServiceContext(), lsid, txnNumber);
-
- catalog.insert(opCtx, lsid, txnNumber, coordinator);
-
- // Schedule a task in the future to cancel the commit coordination on the coordinator, so that
- // the coordinator does not remain in memory forever (in case the particpant list is never
- // received).
- const auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
- auto cbHandle = uassertStatusOK(executor->scheduleWorkAt(
- commitDeadline,
- [coordinatorWeakPtr = std::weak_ptr<TransactionCoordinator>(coordinator)](
- const mongo::executor::TaskExecutor::CallbackArgs& cbArgs) mutable {
- auto coordinator = coordinatorWeakPtr.lock();
- if (coordinator) {
- coordinator->cancelIfCommitNotYetStarted();
- }
- }));
-
- // TODO (SERVER-38715): Store the callback handle in the coordinator, so that the coordinator
- // can cancel the cancel task on receiving the participant list.
+ catalog.insert(opCtx,
+ lsid,
+ txnNumber,
+ std::make_shared<TransactionCoordinator>(opCtx->getServiceContext(),
+ lsid,
+ txnNumber,
+ scheduler.makeChildScheduler(),
+ commitDeadline));
}
boost::optional<Future<txn::CommitDecision>> TransactionCoordinatorService::coordinateCommit(
@@ -114,8 +100,7 @@ boost::optional<Future<txn::CommitDecision>> TransactionCoordinatorService::coor
return boost::none;
}
- std::vector<ShardId> participants(participantList.begin(), participantList.end());
- auto decisionFuture = coordinator->runCommit(participants);
+ coordinator->runCommit(std::vector<ShardId>{participantList.begin(), participantList.end()});
return coordinator->onCompletion().then(
[coordinator] { return coordinator->getDecision().get(); });
@@ -123,7 +108,7 @@ boost::optional<Future<txn::CommitDecision>> TransactionCoordinatorService::coor
// TODO (SERVER-37364): Re-enable the coordinator returning the decision as soon as the decision
// is made durable. Currently the coordinator waits to hear acks because participants in prepare
// reject requests with a higher transaction number, causing tests to fail.
- // return coordinator->runCommit(participants);
+ // return coordinator->getDecision();
}
boost::optional<Future<txn::CommitDecision>> TransactionCoordinatorService::recoverCommit(
@@ -181,6 +166,7 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx,
<< " transactions";
auto& catalog = catalogAndScheduler->catalog;
+ auto& scheduler = catalogAndScheduler->scheduler;
for (const auto& doc : coordinatorDocs) {
LOG(3) << "Going to resume coordinating commit for " << doc.toBSON();
@@ -188,8 +174,13 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx,
const auto lsid = *doc.getId().getSessionId();
const auto txnNumber = *doc.getId().getTxnNumber();
- auto coordinator = std::make_shared<TransactionCoordinator>(
- opCtx->getServiceContext(), lsid, txnNumber);
+ auto coordinator =
+ std::make_shared<TransactionCoordinator>(opCtx->getServiceContext(),
+ lsid,
+ txnNumber,
+ scheduler.makeChildScheduler(),
+ boost::none /* No deadline */);
+
catalog.insert(opCtx, lsid, txnNumber, coordinator, true /* forStepUp */);
coordinator->continueCommit(doc);
}
@@ -214,8 +205,7 @@ void TransactionCoordinatorService::onStepDown() {
_catalogAndSchedulerToCleanup = std::move(_catalogAndScheduler);
}
- _catalogAndSchedulerToCleanup->scheduler.shutdown(
- {ErrorCodes::InterruptedDueToStepDown, "Transaction coordinator service stepping down"});
+ _catalogAndSchedulerToCleanup->onStepDown();
}
void TransactionCoordinatorService::onShardingInitialization(OperationContext* opCtx,
@@ -253,9 +243,19 @@ void TransactionCoordinatorService::_joinPreviousRound() {
// Block until all coordinators scheduled the previous time the service was primary to have
// drained. Because the scheduler was interrupted, it should be extremely rare for there to be
// any coordinators left, so if this actually causes blocking, it would most likely be a bug.
- _catalogAndSchedulerToCleanup->recoveryTaskCompleted->wait();
- _catalogAndSchedulerToCleanup->catalog.join();
+ _catalogAndSchedulerToCleanup->join();
_catalogAndSchedulerToCleanup.reset();
}
+void TransactionCoordinatorService::CatalogAndScheduler::onStepDown() {
+ scheduler.shutdown({ErrorCodes::TransactionCoordinatorSteppingDown,
+ "Transaction coordinator service stepping down"});
+ catalog.onStepDown();
+}
+
+void TransactionCoordinatorService::CatalogAndScheduler::join() {
+ recoveryTaskCompleted->wait();
+ catalog.join();
+}
+
} // namespace mongo
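
The new CatalogAndScheduler helpers above pin down the step-down ordering: interrupt the scheduler first so in-flight tasks fail fast, then cancel coordinators that never received a participant list, and only join once step-up recovery has finished. Below is a compact standalone sketch of that ordering; all Toy* types are hypothetical stand-ins, not mongo classes.

// Standalone sketch (all types hypothetical): shut the scheduler down before cancelling idle
// coordinators, and wait for recovery before joining the catalog.
struct ToyScheduler {
    void shutdown() { /* interrupt all scheduled and in-flight work */ }
};
struct ToyCatalog {
    void cancelIdle() { /* cancel coordinators that never received runCommit */ }
    void join() { /* block until every coordinator has drained */ }
};
struct ToyRecoveryFlag {
    void wait() { /* block until step-up recovery has finished */ }
};

struct ToyCatalogAndScheduler {
    ToyScheduler scheduler;
    ToyCatalog catalog;
    ToyRecoveryFlag recoveryTaskCompleted;

    void onStepDown() {
        scheduler.shutdown();  // First: make outstanding coordinator tasks fail fast.
        catalog.cancelIdle();  // Then: unregister coordinators still waiting for runCommit.
    }

    void join() {
        recoveryTaskCompleted.wait();  // Never tear down while recovery is still running.
        catalog.join();                // Finally wait for all coordinators to complete.
    }
};
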
diff --git a/src/mongo/db/s/transaction_coordinator_service.h b/src/mongo/db/s/transaction_coordinator_service.h
index d65751653c7..9cfdc68faab 100644
--- a/src/mongo/db/s/transaction_coordinator_service.h
+++ b/src/mongo/db/s/transaction_coordinator_service.h
@@ -106,6 +106,9 @@ private:
struct CatalogAndScheduler {
CatalogAndScheduler(ServiceContext* service) : scheduler(service) {}
+ void onStepDown();
+ void join();
+
txn::AsyncWorkScheduler scheduler;
TransactionCoordinatorCatalog catalog;
diff --git a/src/mongo/db/s/transaction_coordinator_service_test.cpp b/src/mongo/db/s/transaction_coordinator_service_test.cpp
index 6f4eb769e2a..5bc2eec8087 100644
--- a/src/mongo/db/s/transaction_coordinator_service_test.cpp
+++ b/src/mongo/db/s/transaction_coordinator_service_test.cpp
@@ -65,9 +65,7 @@ const StatusWith<BSONObj> kPrepareOkButWriteConcernError =
<< kDummyWriteConcernError);
class TransactionCoordinatorServiceTestFixture : public TransactionCoordinatorTestFixture {
-public:
- // Prepare responses
-
+protected:
void assertPrepareSentAndRespondWithSuccess() {
assertCommandSentAndRespondWith(PrepareTransaction::kCommandName,
kPrepareOk,
@@ -145,9 +143,6 @@ public:
ASSERT_FALSE(network()->hasReadyRequests());
}
- // TODO (SERVER-38382): Put all these helpers in one separate file and share with
- // transaction_coordinator_test.
-
/**
* Goes through the steps to commit a transaction through the coordinator service for a given
* lsid and txnNumber. Useful when not explicitly testing the commit protocol.
@@ -183,9 +178,8 @@ public:
auto commitDecisionFuture =
*coordinatorService.coordinateCommit(operationContext(), lsid, txnNumber, shardIdSet);
- for (size_t i = 0; i < shardIdSet.size(); ++i) {
- assertPrepareSentAndRespondWithNoSuchTransaction();
- }
+ // It is sufficient to abort just one of the participants
+ assertPrepareSentAndRespondWithNoSuchTransaction();
for (size_t i = 0; i < shardIdSet.size(); ++i) {
assertAbortSentAndRespondWithSuccess();
@@ -390,11 +384,11 @@ TEST_F(TransactionCoordinatorServiceTest,
operationContext(), _lsid, _txnNumber, kTwoShardIdSet);
assertPrepareSentAndRespondWithNoSuchTransaction();
+ advanceClockAndExecuteScheduledTasks(); // Make sure the cancellation callback is delivered
auto commitDecisionFuture2 = *coordinatorService->coordinateCommit(
operationContext(), _lsid, _txnNumber, kTwoShardIdSet);
- assertPrepareSentAndRespondWithSuccess();
assertAbortSentAndRespondWithSuccess();
assertAbortSentAndRespondWithSuccess();
@@ -433,11 +427,11 @@ TEST_F(TransactionCoordinatorServiceTest, RecoverCommitJoinsOngoingCoordinationT
operationContext(), _lsid, _txnNumber, kTwoShardIdSet);
assertPrepareSentAndRespondWithNoSuchTransaction();
+ advanceClockAndExecuteScheduledTasks(); // Make sure the cancellation callback is delivered
auto commitDecisionFuture2 =
*coordinatorService->recoverCommit(operationContext(), _lsid, _txnNumber);
- assertPrepareSentAndRespondWithSuccess();
assertAbortSentAndRespondWithSuccess();
assertAbortSentAndRespondWithSuccess();
@@ -479,7 +473,6 @@ TEST_F(TransactionCoordinatorServiceTest,
coordinatorService->createCoordinator(
operationContext(), _lsid, _txnNumber + 1, kCommitDeadline);
-
ASSERT_EQ(static_cast<int>(commitDecisionFuture.get()),
static_cast<int>(txn::CommitDecision::kCanceled));
}
@@ -661,8 +654,8 @@ TEST_F(TransactionCoordinatorServiceTestSingleTxn,
operationContext(), _lsid, _txnNumber, kTwoShardIdSet);
// Simulate a participant voting to abort.
- assertPrepareSentAndRespondWithNoSuchTransaction();
- assertPrepareSentAndRespondWithSuccess();
+ onCommands({[&](const executor::RemoteCommandRequest& request) { return kPrepareOk; },
+ [&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; }});
assertAbortSentAndRespondWithSuccess();
assertAbortSentAndRespondWithSuccess();
@@ -678,8 +671,6 @@ TEST_F(TransactionCoordinatorServiceTestSingleTxn,
operationContext(), _lsid, _txnNumber, kTwoShardIdSet);
ASSERT_FALSE(commitDecisionFuture.isReady());
- // To prevent invariant failure in TransactionCoordinator that all futures have been completed.
- abortTransaction(*coordinatorService(), _lsid, _txnNumber, kTwoShardIdSet, kTwoShardIdList[0]);
}
TEST_F(TransactionCoordinatorServiceTestSingleTxn,
diff --git a/src/mongo/db/s/transaction_coordinator_test.cpp b/src/mongo/db/s/transaction_coordinator_test.cpp
index 6132e8e97b9..2f3d918886e 100644
--- a/src/mongo/db/s/transaction_coordinator_test.cpp
+++ b/src/mongo/db/s/transaction_coordinator_test.cpp
@@ -41,8 +41,6 @@
namespace mongo {
namespace {
-const Timestamp kDummyTimestamp = Timestamp::min();
-const Date_t kCommitDeadline = Date_t::max();
const StatusWith<BSONObj> kNoSuchTransaction =
BSON("ok" << 0 << "code" << ErrorCodes::NoSuchTransaction);
const StatusWith<BSONObj> kOk = BSON("ok" << 1);
@@ -99,41 +97,6 @@ protected:
ASSERT_FALSE(network()->hasReadyRequests());
}
- /**
- * Goes through the steps to commit a transaction through the coordinator service for a given
- * lsid and txnNumber. Useful when not explictly testing the commit protocol.
- */
- void commitTransaction(const std::set<ShardId>& transactionParticipantShards) {
- for (size_t i = 0; i < transactionParticipantShards.size(); ++i) {
- assertPrepareSentAndRespondWithSuccess(kDummyPrepareTimestamp);
- }
-
- for (size_t i = 0; i < transactionParticipantShards.size(); ++i) {
- assertCommitSentAndRespondWithSuccess();
- }
- }
-
- /**
- * Goes through the steps to abort a transaction through the coordinator service for a given
- * lsid and txnNumber. Useful when not explictly testing the abort protocol.
- */
- void abortTransaction(const std::set<ShardId>& shardIdSet, const ShardId& abortingShard) {
- // auto commitDecisionFuture =
- // coordinatorService.coordinateCommit(operationContext(), lsid, txnNumber,
- // shardIdSet);
-
- for (size_t i = 0; i < shardIdSet.size(); ++i) {
- assertPrepareSentAndRespondWithNoSuchTransaction();
- }
-
- // Abort gets sent to the second participant as soon as the first participant
- // receives a not-okay response to prepare.
- assertAbortSentAndRespondWithSuccess();
-
- // Wait for abort to complete.
- // commitDecisionFuture.get();
- }
-
LogicalSessionId _lsid{makeLogicalSessionIdForTest()};
TxnNumber _txnNumber{1};
};
@@ -166,105 +129,134 @@ auto makeDummyPrepareCommand(const LogicalSessionId& lsid, const TxnNumber& txnN
}
TEST_F(TransactionCoordinatorDriverTest, SendDecisionToParticipantShardReturnsOnImmediateSuccess) {
+ txn::AsyncWorkScheduler aws(getServiceContext());
Future<void> future = _driver->sendDecisionToParticipantShard(
- kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+ aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+ ASSERT(!future.isReady());
assertPrepareSentAndRespondWithSuccess();
- future.get(operationContext());
+
+ future.get();
}
TEST_F(TransactionCoordinatorDriverTest,
SendDecisionToParticipantShardReturnsSuccessAfterOneFailureAndThenSuccess) {
+ txn::AsyncWorkScheduler aws(getServiceContext());
Future<void> future = _driver->sendDecisionToParticipantShard(
- kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+ aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
ASSERT(!future.isReady());
assertPrepareSentAndRespondWithRetryableError();
ASSERT(!future.isReady());
assertPrepareSentAndRespondWithSuccess();
- future.get(operationContext());
+
+ future.get();
}
TEST_F(TransactionCoordinatorDriverTest,
SendDecisionToParticipantShardReturnsSuccessAfterSeveralFailuresAndThenSuccess) {
+ txn::AsyncWorkScheduler aws(getServiceContext());
Future<void> future = _driver->sendDecisionToParticipantShard(
- kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
-
- std::move(future).getAsync([](Status s) { ASSERT(s.isOK()); });
+ aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
assertPrepareSentAndRespondWithRetryableError();
assertPrepareSentAndRespondWithRetryableError();
assertPrepareSentAndRespondWithRetryableError();
assertPrepareSentAndRespondWithSuccess();
+
+ future.get();
}
TEST_F(TransactionCoordinatorDriverTest,
SendDecisionToParticipantShardInterpretsVoteToAbortAsSuccess) {
+ txn::AsyncWorkScheduler aws(getServiceContext());
Future<void> future = _driver->sendDecisionToParticipantShard(
- kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
-
- std::move(future).getAsync([](Status s) { ASSERT(s.isOK()); });
+ aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
assertPrepareSentAndRespondWithNoSuchTransaction();
+
+ future.get();
}
-TEST_F(TransactionCoordinatorDriverTest, SendPrepareToShardReturnsCommitDecisionOnOkResponse) {
- Future<PrepareResponse> future =
- _driver->sendPrepareToShard(kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+TEST_F(TransactionCoordinatorDriverTest,
+ SendDecisionToParticipantShardCanBeInterruptedAndReturnsError) {
+ txn::AsyncWorkScheduler aws(getServiceContext());
+ Future<void> future = _driver->sendDecisionToParticipantShard(
+ aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+
+ assertPrepareSentAndRespondWithRetryableError();
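+    // Shutting down the scheduler while the retry is outstanding should fail the returned future with the interruption status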
+ aws.shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "Shutdown for test"});
+ advanceClockAndExecuteScheduledTasks();
+
+ ASSERT_THROWS_CODE(
+ future.get(), AssertionException, ErrorCodes::TransactionCoordinatorSteppingDown);
+}
- std::move(future).getAsync([](StatusWith<PrepareResponse> swResponse) {
- ASSERT_OK(swResponse.getStatus());
- auto response = swResponse.getValue();
- ASSERT(response.vote == txn::PrepareVote::kCommit);
- ASSERT(response.prepareTimestamp == kDummyPrepareTimestamp);
- });
+TEST_F(TransactionCoordinatorDriverTest, SendPrepareToShardReturnsCommitDecisionOnOkResponse) {
+ txn::AsyncWorkScheduler aws(getServiceContext());
+ Future<PrepareResponse> future = _driver->sendPrepareToShard(
+ aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+ ASSERT(!future.isReady());
- // Simulate a participant voting to commit.
assertPrepareSentAndRespondWithSuccess();
+
+ auto response = future.get();
+ ASSERT(response.vote == txn::PrepareVote::kCommit);
+ ASSERT(response.prepareTimestamp == kDummyPrepareTimestamp);
}
TEST_F(TransactionCoordinatorDriverTest,
SendPrepareToShardReturnsCommitDecisionOnRetryableErrorThenOkResponse) {
- Future<PrepareResponse> future =
- _driver->sendPrepareToShard(kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
-
- std::move(future).getAsync([](StatusWith<PrepareResponse> swResponse) {
- ASSERT_OK(swResponse.getStatus());
- auto response = swResponse.getValue();
- ASSERT(response.vote == txn::PrepareVote::kCommit);
- ASSERT(response.prepareTimestamp == kDummyPrepareTimestamp);
- });
+ txn::AsyncWorkScheduler aws(getServiceContext());
+ Future<PrepareResponse> future = _driver->sendPrepareToShard(
+ aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+ ASSERT(!future.isReady());
assertPrepareSentAndRespondWithRetryableError();
+ ASSERT(!future.isReady());
+
assertPrepareSentAndRespondWithSuccess();
+
+ auto response = future.get();
+ ASSERT(response.vote == txn::PrepareVote::kCommit);
+ ASSERT(response.prepareTimestamp == kDummyPrepareTimestamp);
}
-TEST_F(
- TransactionCoordinatorDriverTest,
- SendPrepareToShardStopsRetryingAfterRetryableErrorAndReturnsNoneIfCoordinatorStateIsNotPrepare) {
- Future<PrepareResponse> future =
- _driver->sendPrepareToShard(kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+TEST_F(TransactionCoordinatorDriverTest,
+ SendPrepareToShardCanBeInterruptedAndReturnsNoDecisionIfNotServiceShutdown) {
+ txn::AsyncWorkScheduler aws(getServiceContext());
+ Future<PrepareResponse> future = _driver->sendPrepareToShard(
+ aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+
+ assertPrepareSentAndRespondWithRetryableError();
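+    // Interrupting with an error other than stepping down should resolve the future with no vote rather than an exception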
+ aws.shutdown({ErrorCodes::TransactionCoordinatorReachedAbortDecision, "Retry interrupted"});
+ advanceClockAndExecuteScheduledTasks();
+
+ auto response = future.get();
+ ASSERT(response.vote == boost::none);
+ ASSERT(response.prepareTimestamp == boost::none);
+}
- auto resultFuture = std::move(future).then([](PrepareResponse response) {
- ASSERT(response.vote == boost::none);
- ASSERT(response.prepareTimestamp == boost::none);
- });
+TEST_F(TransactionCoordinatorDriverTest,
+ SendPrepareToShardCanBeInterruptedAndThrowsExceptionIfServiceShutdown) {
+ txn::AsyncWorkScheduler aws(getServiceContext());
+ Future<PrepareResponse> future = _driver->sendPrepareToShard(
+ aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
- _driver->cancel();
assertPrepareSentAndRespondWithRetryableError();
- resultFuture.get();
+ aws.shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "Service shutting down"});
+ advanceClockAndExecuteScheduledTasks();
+
+ ASSERT_THROWS_CODE(
+ future.get(), AssertionException, ErrorCodes::TransactionCoordinatorSteppingDown);
}
TEST_F(TransactionCoordinatorDriverTest,
SendPrepareToShardReturnsAbortDecisionOnVoteAbortResponse) {
- auto future =
- _driver->sendPrepareToShard(kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber))
- .then([&](PrepareResponse response) {
- ASSERT(response.vote == txn::PrepareVote::kAbort);
- ASSERT(response.prepareTimestamp == boost::none);
- return response;
- });
+ txn::AsyncWorkScheduler aws(getServiceContext());
+ auto future = _driver->sendPrepareToShard(
+ aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
assertPrepareSentAndRespondWithNoSuchTransaction();
@@ -275,55 +267,52 @@ TEST_F(TransactionCoordinatorDriverTest,
TEST_F(TransactionCoordinatorDriverTest,
SendPrepareToShardReturnsAbortDecisionOnRetryableErrorThenVoteAbortResponse) {
- auto future =
- _driver->sendPrepareToShard(kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber))
- .then([&](PrepareResponse response) {
- ASSERT(response.vote == txn::PrepareVote::kAbort);
- ASSERT(response.prepareTimestamp == boost::none);
- });
+ txn::AsyncWorkScheduler aws(getServiceContext());
+ auto future = _driver->sendPrepareToShard(
+ aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
assertPrepareSentAndRespondWithRetryableError();
assertPrepareSentAndRespondWithNoSuchTransaction();
- future.get();
+
+ auto response = future.get();
+ ASSERT(response.vote == txn::PrepareVote::kAbort);
+ ASSERT(response.prepareTimestamp == boost::none);
}
TEST_F(TransactionCoordinatorDriverTest,
SendPrepareReturnsAbortDecisionWhenFirstParticipantVotesAbortAndSecondVotesCommit) {
- auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber)
- .then([&](txn::PrepareVoteConsensus response) {
- ASSERT(response.decision == txn::CommitDecision::kAbort);
- ASSERT(response.maxPrepareTimestamp == boost::none);
- });
+ auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber);
- assertPrepareSentAndRespondWithNoSuchTransaction();
- assertPrepareSentAndRespondWithSuccess();
- future.get();
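+    // Deliver both prepare responses in the same network batch: the first participant votes to abort, the second votes to commit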
+ onCommands({[&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; },
+ [&](const executor::RemoteCommandRequest& request) { return kPrepareOk; }});
+
+ auto response = future.get();
+ ASSERT(response.decision == txn::CommitDecision::kAbort);
+ ASSERT(response.maxPrepareTimestamp == boost::none);
}
TEST_F(TransactionCoordinatorDriverTest,
SendPrepareReturnsAbortDecisionWhenFirstParticipantVotesCommitAndSecondVotesAbort) {
- auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber)
- .then([&](txn::PrepareVoteConsensus response) {
- ASSERT(response.decision == txn::CommitDecision::kAbort);
- ASSERT(response.maxPrepareTimestamp == boost::none);
- });
+ auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber);
assertPrepareSentAndRespondWithSuccess();
assertPrepareSentAndRespondWithNoSuchTransaction();
- future.get();
+
+ auto response = future.get();
+ ASSERT(response.decision == txn::CommitDecision::kAbort);
+ ASSERT(response.maxPrepareTimestamp == boost::none);
}
TEST_F(TransactionCoordinatorDriverTest,
SendPrepareReturnsAbortDecisionWhenBothParticipantsVoteAbort) {
- auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber)
- .then([&](txn::PrepareVoteConsensus response) {
- ASSERT(response.decision == txn::CommitDecision::kAbort);
- ASSERT(response.maxPrepareTimestamp == boost::none);
- });
+ auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber);
- assertPrepareSentAndRespondWithNoSuchTransaction();
- assertPrepareSentAndRespondWithNoSuchTransaction();
- future.get();
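+    // Both participants vote to abort; deliver both prepare responses in the same network batch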
+    onCommands({[&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; },
+                [&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; }});
+
+ auto response = future.get();
+ ASSERT(response.decision == txn::CommitDecision::kAbort);
+ ASSERT(response.maxPrepareTimestamp == boost::none);
}
TEST_F(TransactionCoordinatorDriverTest,
@@ -331,15 +320,14 @@ TEST_F(TransactionCoordinatorDriverTest,
const auto firstPrepareTimestamp = Timestamp(1, 1);
const auto maxPrepareTimestamp = Timestamp(2, 1);
- auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber)
- .then([&](txn::PrepareVoteConsensus response) {
- ASSERT(response.decision == txn::CommitDecision::kCommit);
- ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp);
- });
+ auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber);
assertPrepareSentAndRespondWithSuccess(firstPrepareTimestamp);
assertPrepareSentAndRespondWithSuccess(maxPrepareTimestamp);
- future.get(); // Should be able to return after the first participant responds.
+
+ auto response = future.get();
+ ASSERT(response.decision == txn::CommitDecision::kCommit);
+ ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp);
}
TEST_F(TransactionCoordinatorDriverTest,
@@ -347,15 +335,14 @@ TEST_F(TransactionCoordinatorDriverTest,
const auto firstPrepareTimestamp = Timestamp(1, 1);
const auto maxPrepareTimestamp = Timestamp(2, 1);
- auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber)
- .then([&](txn::PrepareVoteConsensus response) {
- ASSERT(response.decision == txn::CommitDecision::kCommit);
- ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp);
- });
+ auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber);
assertPrepareSentAndRespondWithSuccess(maxPrepareTimestamp);
assertPrepareSentAndRespondWithSuccess(firstPrepareTimestamp);
- future.get(); // Should be able to return after the first participant responds.
+
+ auto response = future.get();
+ ASSERT(response.decision == txn::CommitDecision::kCommit);
+ ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp);
}
TEST_F(TransactionCoordinatorDriverTest,
@@ -363,15 +350,14 @@ TEST_F(TransactionCoordinatorDriverTest,
const auto firstPrepareTimestamp = Timestamp(1, 1);
const auto maxPrepareTimestamp = Timestamp(2, 1);
- auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber)
- .then([&](txn::PrepareVoteConsensus response) {
- ASSERT(response.decision == txn::CommitDecision::kCommit);
- ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp);
- });
+ auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber);
assertPrepareSentAndRespondWithSuccess(firstPrepareTimestamp);
assertPrepareSentAndRespondWithSuccess(maxPrepareTimestamp);
- future.get(); // Should be able to return after the first participant responds.
+
+ auto response = future.get();
+ ASSERT(response.decision == txn::CommitDecision::kCommit);
+ ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp);
}
@@ -660,13 +646,40 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest,
using TransactionCoordinatorTest = TransactionCoordinatorTestBase;
-TEST_F(TransactionCoordinatorTest, RunCommitReturnsCorrectCommitDecisionOnAbort) {
- TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber);
- auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList);
+TEST_F(TransactionCoordinatorTest, RunCommitProducesCommitDecisionOnTwoCommitResponses) {
+ TransactionCoordinator coordinator(
+ getServiceContext(),
+ _lsid,
+ _txnNumber,
+ std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
+ boost::none);
+ coordinator.runCommit(kTwoShardIdList);
+ auto commitDecisionFuture = coordinator.getDecision();
- // Simulate a participant voting to abort.
- assertPrepareSentAndRespondWithNoSuchTransaction();
assertPrepareSentAndRespondWithSuccess();
+ assertPrepareSentAndRespondWithSuccess();
+
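+    // Both participants voted to commit, so the coordinator sends commitTransaction to each of them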
+ assertCommitSentAndRespondWithSuccess();
+ assertCommitSentAndRespondWithSuccess();
+
+ auto commitDecision = commitDecisionFuture.get();
+ ASSERT_EQ(static_cast<int>(commitDecision), static_cast<int>(txn::CommitDecision::kCommit));
+
+ coordinator.onCompletion().get();
+}
+
+TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnAbortAndCommitResponses) {
+ TransactionCoordinator coordinator(
+ getServiceContext(),
+ _lsid,
+ _txnNumber,
+ std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
+ boost::none);
+ coordinator.runCommit(kTwoShardIdList);
+ auto commitDecisionFuture = coordinator.getDecision();
+
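+    // Deliver both prepare responses in the same network batch: one participant votes to abort, the other votes to commit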
+ onCommands({[&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; },
+ [&](const executor::RemoteCommandRequest& request) { return kPrepareOk; }});
assertAbortSentAndRespondWithSuccess();
assertAbortSentAndRespondWithSuccess();
@@ -677,33 +690,40 @@ TEST_F(TransactionCoordinatorTest, RunCommitReturnsCorrectCommitDecisionOnAbort)
coordinator.onCompletion().get();
}
-TEST_F(TransactionCoordinatorTest, RunCommitReturnsCorrectCommitDecisionOnCommit) {
- TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber);
- auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList);
+TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnCommitAndAbortResponses) {
+ TransactionCoordinator coordinator(
+ getServiceContext(),
+ _lsid,
+ _txnNumber,
+ std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
+ boost::none);
+ coordinator.runCommit(kTwoShardIdList);
+ auto commitDecisionFuture = coordinator.getDecision();
- assertPrepareSentAndRespondWithSuccess();
- assertPrepareSentAndRespondWithSuccess();
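+    // Deliver both prepare responses in the same network batch: one participant votes to commit, the other votes to abort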
+ onCommands({[&](const executor::RemoteCommandRequest& request) { return kPrepareOk; },
+ [&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; }});
- assertCommitSentAndRespondWithSuccess();
- assertCommitSentAndRespondWithSuccess();
+ assertAbortSentAndRespondWithSuccess();
+ assertAbortSentAndRespondWithSuccess();
auto commitDecision = commitDecisionFuture.get();
- ASSERT_EQ(static_cast<int>(commitDecision), static_cast<int>(txn::CommitDecision::kCommit));
+ ASSERT_EQ(static_cast<int>(commitDecision), static_cast<int>(txn::CommitDecision::kAbort));
coordinator.onCompletion().get();
}
-TEST_F(TransactionCoordinatorTest,
- RunCommitReturnsCorrectCommitDecisionOnAbortAfterNetworkRetriesOneParticipantAborts) {
- TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber);
- auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList);
+TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnSingleAbortResponseOnly) {
+ TransactionCoordinator coordinator(
+ getServiceContext(),
+ _lsid,
+ _txnNumber,
+ std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
+ boost::none);
+ coordinator.runCommit(kTwoShardIdList);
+ auto commitDecisionFuture = coordinator.getDecision();
- // One participant votes abort after retry.
- assertPrepareSentAndRespondWithRetryableError();
assertPrepareSentAndRespondWithNoSuchTransaction();
-
- // One participant votes commit.
- assertPrepareSentAndRespondWithSuccess();
+ advanceClockAndExecuteScheduledTasks(); // Make sure the cancellation callback is delivered
assertAbortSentAndRespondWithSuccess();
assertAbortSentAndRespondWithSuccess();
@@ -715,16 +735,48 @@ TEST_F(TransactionCoordinatorTest,
}
TEST_F(TransactionCoordinatorTest,
- RunCommitReturnsCorrectCommitDecisionOnAbortAfterNetworkRetriesBothParticipantsAbort) {
- TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber);
- auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList);
+ RunCommitProducesAbortDecisionOnOneCommitResponseAndOneAbortResponseAfterRetry) {
+ TransactionCoordinator coordinator(
+ getServiceContext(),
+ _lsid,
+ _txnNumber,
+ std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
+ boost::none);
+ coordinator.runCommit(kTwoShardIdList);
+ auto commitDecisionFuture = coordinator.getDecision();
+
+    // One participant votes to commit and the other encounters a retryable error
+ onCommands({[&](const executor::RemoteCommandRequest& request) { return kPrepareOk; },
+ [&](const executor::RemoteCommandRequest& request) { return kRetryableError; }});
+ advanceClockAndExecuteScheduledTasks(); // Make sure the scheduled retry executes
// One participant votes abort after retry.
- assertPrepareSentAndRespondWithRetryableError();
assertPrepareSentAndRespondWithNoSuchTransaction();
- // One participant votes abort.
- assertPrepareSentAndRespondWithNoSuchTransaction();
+ assertAbortSentAndRespondWithSuccess();
+ assertAbortSentAndRespondWithSuccess();
+
+ auto commitDecision = commitDecisionFuture.get();
+ ASSERT_EQ(static_cast<int>(commitDecision), static_cast<int>(txn::CommitDecision::kAbort));
+
+ coordinator.onCompletion().get();
+}
+
+TEST_F(TransactionCoordinatorTest,
+ RunCommitProducesAbortDecisionOnOneAbortResponseAndOneRetryableAbortResponse) {
+ TransactionCoordinator coordinator(
+ getServiceContext(),
+ _lsid,
+ _txnNumber,
+ std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
+ boost::none);
+ coordinator.runCommit(kTwoShardIdList);
+ auto commitDecisionFuture = coordinator.getDecision();
+
+    // One participant votes to abort and the other encounters a retryable error
+ onCommands({[&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; },
+ [&](const executor::RemoteCommandRequest& request) { return kRetryableError; }});
+ advanceClockAndExecuteScheduledTasks(); // Make sure the cancellation callback is delivered
assertAbortSentAndRespondWithSuccess();
assertAbortSentAndRespondWithSuccess();
@@ -736,9 +788,15 @@ TEST_F(TransactionCoordinatorTest,
}
TEST_F(TransactionCoordinatorTest,
- RunCommitReturnsCorrectCommitDecisionOnCommitAfterNetworkRetries) {
- TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber);
- auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList);
+ RunCommitProducesCommitDecisionOnCommitAfterMultipleNetworkRetries) {
+ TransactionCoordinator coordinator(
+ getServiceContext(),
+ _lsid,
+ _txnNumber,
+ std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
+ boost::none);
+ coordinator.runCommit(kTwoShardIdList);
+ auto commitDecisionFuture = coordinator.getDecision();
// One participant votes commit after retry.
assertPrepareSentAndRespondWithRetryableError();
diff --git a/src/mongo/db/s/transaction_coordinator_test_fixture.cpp b/src/mongo/db/s/transaction_coordinator_test_fixture.cpp
index 6f6df8fe73e..d7384e906ae 100644
--- a/src/mongo/db/s/transaction_coordinator_test_fixture.cpp
+++ b/src/mongo/db/s/transaction_coordinator_test_fixture.cpp
@@ -101,7 +101,7 @@ void TransactionCoordinatorTestFixture::assertCommandSentAndRespondWith(
const StatusWith<BSONObj>& response,
boost::optional<BSONObj> expectedWriteConcern) {
onCommand([&](const executor::RemoteCommandRequest& request) {
- ASSERT_EQ(request.cmdObj.firstElement().fieldNameStringData(), commandName);
+ ASSERT_EQ(commandName, request.cmdObj.firstElement().fieldNameStringData());
if (expectedWriteConcern) {
ASSERT_BSONOBJ_EQ(
*expectedWriteConcern,