diff options
author | Esha Maharishi <esha.maharishi@mongodb.com> | 2019-05-29 11:44:12 -0400 |
---|---|---|
committer | Esha Maharishi <esha.maharishi@mongodb.com> | 2019-06-07 14:07:49 -0400 |
commit | 0eb92d645b6f132c0c9019502d3293631a4972ed (patch) | |
tree | 24bfe561d04236572a677affa9bd6d49755d2e70 | |
parent | 1dbfa74ff20b1f0421fc70740f3ef8a8f85be89b (diff) | |
download | mongo-0eb92d645b6f132c0c9019502d3293631a4972ed.tar.gz |
SERVER-41179 Observe the coordinator's state transitions and update the corresponding stats and metrics
(cherry picked from commit d14cfb93a7fd33edf679f7fcd68c46b8d2be0b6b)
-rw-r--r-- | src/mongo/db/s/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/s/server_transaction_coordinators_metrics.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/s/server_transaction_coordinators_metrics.h | 7 | ||||
-rw-r--r-- | src/mongo/db/s/single_transaction_coordinator_stats.cpp | 73 | ||||
-rw-r--r-- | src/mongo/db/s/single_transaction_coordinator_stats.h | 122 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator.cpp | 66 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator.h | 27 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_metrics_observer.cpp | 184 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_metrics_observer.h | 124 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_test.cpp | 1020 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_util.cpp | 33 |
11 files changed, 1636 insertions, 31 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 80233815359..be69b3fbcac 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -126,6 +126,7 @@ env.Library( 'transaction_coordinator_catalog.cpp', 'transaction_coordinator_factory_mongod.cpp', 'transaction_coordinator_futures_util.cpp', + 'transaction_coordinator_metrics_observer.cpp', 'transaction_coordinator_service.cpp', 'transaction_coordinator_structures.cpp', 'transaction_coordinator_util.cpp', diff --git a/src/mongo/db/s/server_transaction_coordinators_metrics.cpp b/src/mongo/db/s/server_transaction_coordinators_metrics.cpp index e18d581fbef..5bac2c5c211 100644 --- a/src/mongo/db/s/server_transaction_coordinators_metrics.cpp +++ b/src/mongo/db/s/server_transaction_coordinators_metrics.cpp @@ -104,4 +104,14 @@ void ServerTransactionCoordinatorsMetrics::decrementCurrentWaitingForDecisionAck _totalWaitingForDecisionAcks.fetchAndSubtract(1); } +std::int64_t ServerTransactionCoordinatorsMetrics::getCurrentDeletingCoordinatorDoc() { + return _totalDeletingCoordinatorDoc.load(); +} +void ServerTransactionCoordinatorsMetrics::incrementCurrentDeletingCoordinatorDoc() { + _totalDeletingCoordinatorDoc.fetchAndAdd(1); +} +void ServerTransactionCoordinatorsMetrics::decrementCurrentDeletingCoordinatorDoc() { + _totalDeletingCoordinatorDoc.fetchAndSubtract(1); +} + } // namespace mongo diff --git a/src/mongo/db/s/server_transaction_coordinators_metrics.h b/src/mongo/db/s/server_transaction_coordinators_metrics.h index 4cc19915256..9218e5584b4 100644 --- a/src/mongo/db/s/server_transaction_coordinators_metrics.h +++ b/src/mongo/db/s/server_transaction_coordinators_metrics.h @@ -70,6 +70,10 @@ public: void incrementCurrentWaitingForDecisionAcks(); void decrementCurrentWaitingForDecisionAcks(); + std::int64_t getCurrentDeletingCoordinatorDoc(); + void incrementCurrentDeletingCoordinatorDoc(); + void decrementCurrentDeletingCoordinatorDoc(); + private: // The total number of 
transaction coordinators created on this process since the process's // inception. @@ -90,6 +94,9 @@ private: // The number of transaction coordinators currently in the "waiting for decision acks" phase. AtomicWord<std::int64_t> _totalWaitingForDecisionAcks{0}; + + // The number of transaction coordinators currently in the "deleting coordinator doc" phase. + AtomicWord<std::int64_t> _totalDeletingCoordinatorDoc{0}; }; } // namespace mongo diff --git a/src/mongo/db/s/single_transaction_coordinator_stats.cpp b/src/mongo/db/s/single_transaction_coordinator_stats.cpp index 7f84addcb00..2fc5db4e146 100644 --- a/src/mongo/db/s/single_transaction_coordinator_stats.cpp +++ b/src/mongo/db/s/single_transaction_coordinator_stats.cpp @@ -33,17 +33,17 @@ namespace mongo { -void SingleTransactionCoordinatorStats::setStartTime(TickSource::Tick curTick, - Date_t curWallClockTime) { - invariant(!_startTime); +void SingleTransactionCoordinatorStats::setCreateTime(TickSource::Tick curTick, + Date_t curWallClockTime) { + invariant(!_createTime); - _startTime = curTick; - _startWallClockTime = curWallClockTime; + _createTime = curTick; + _createWallClockTime = curWallClockTime; } void SingleTransactionCoordinatorStats::setEndTime(TickSource::Tick curTick, Date_t curWallClockTime) { - invariant(_startTime); + invariant(_createTime); invariant(!_endTime); _endTime = curTick; @@ -52,7 +52,7 @@ void SingleTransactionCoordinatorStats::setEndTime(TickSource::Tick curTick, void SingleTransactionCoordinatorStats::setWritingParticipantListStartTime( TickSource::Tick curTick, Date_t curWallClockTime) { - invariant(_startTime); + invariant(_createTime); invariant(!_writingParticipantListStartTime); _writingParticipantListStartTime = curTick; @@ -86,14 +86,32 @@ void SingleTransactionCoordinatorStats::setWaitingForDecisionAcksStartTime( _waitingForDecisionAcksStartWallClockTime = curWallClockTime; } -Microseconds SingleTransactionCoordinatorStats::getDuration(TickSource* tickSource, - 
TickSource::Tick curTick) const { - invariant(_startTime); +void SingleTransactionCoordinatorStats::setDeletingCoordinatorDocStartTime( + TickSource::Tick curTick, Date_t curWallClockTime) { + invariant(!_deletingCoordinatorDocStartTime); + + _deletingCoordinatorDocStartTime = curTick; + _deletingCoordinatorDocStartWallClockTime = curWallClockTime; +} + +Microseconds SingleTransactionCoordinatorStats::getDurationSinceCreation( + TickSource* tickSource, TickSource::Tick curTick) const { + invariant(_createTime); if (_endTime) { - return tickSource->ticksTo<Microseconds>(_endTime - _startTime); + return tickSource->ticksTo<Microseconds>(_endTime - _createTime); } - return tickSource->ticksTo<Microseconds>(curTick - _startTime); + return tickSource->ticksTo<Microseconds>(curTick - _createTime); +} + +Microseconds SingleTransactionCoordinatorStats::getTwoPhaseCommitDuration( + TickSource* tickSource, TickSource::Tick curTick) const { + invariant(_writingParticipantListStartTime); + + if (_endTime) { + return tickSource->ticksTo<Microseconds>(_endTime - _writingParticipantListStartTime); + } + return tickSource->ticksTo<Microseconds>(curTick - _writingParticipantListStartTime); } Microseconds SingleTransactionCoordinatorStats::getWritingParticipantListDuration( @@ -104,6 +122,11 @@ Microseconds SingleTransactionCoordinatorStats::getWritingParticipantListDuratio return tickSource->ticksTo<Microseconds>(_waitingForVotesStartTime - _writingParticipantListStartTime); } + + if (_endTime) { + return tickSource->ticksTo<Microseconds>(_endTime - _writingParticipantListStartTime); + } + return tickSource->ticksTo<Microseconds>(curTick - _writingParticipantListStartTime); } @@ -115,6 +138,11 @@ Microseconds SingleTransactionCoordinatorStats::getWaitingForVotesDuration( return tickSource->ticksTo<Microseconds>(_writingDecisionStartTime - _waitingForVotesStartTime); } + + if (_endTime) { + return tickSource->ticksTo<Microseconds>(_endTime - _waitingForVotesStartTime); + } + return 
tickSource->ticksTo<Microseconds>(curTick - _waitingForVotesStartTime); } @@ -126,6 +154,11 @@ Microseconds SingleTransactionCoordinatorStats::getWritingDecisionDuration( return tickSource->ticksTo<Microseconds>(_waitingForDecisionAcksStartTime - _writingDecisionStartTime); } + + if (_endTime) { + return tickSource->ticksTo<Microseconds>(_endTime - _writingDecisionStartTime); + } + return tickSource->ticksTo<Microseconds>(curTick - _writingDecisionStartTime); } @@ -133,10 +166,26 @@ Microseconds SingleTransactionCoordinatorStats::getWaitingForDecisionAcksDuratio TickSource* tickSource, TickSource::Tick curTick) const { invariant(_waitingForDecisionAcksStartTime); + if (_deletingCoordinatorDocStartTime) { + return tickSource->ticksTo<Microseconds>(_deletingCoordinatorDocStartTime - + _waitingForDecisionAcksStartTime); + } + if (_endTime) { return tickSource->ticksTo<Microseconds>(_endTime - _waitingForDecisionAcksStartTime); } + return tickSource->ticksTo<Microseconds>(curTick - _waitingForDecisionAcksStartTime); } +Microseconds SingleTransactionCoordinatorStats::getDeletingCoordinatorDocDuration( + TickSource* tickSource, TickSource::Tick curTick) const { + invariant(_deletingCoordinatorDocStartTime); + + if (_endTime) { + return tickSource->ticksTo<Microseconds>(_endTime - _deletingCoordinatorDocStartTime); + } + + return tickSource->ticksTo<Microseconds>(curTick - _deletingCoordinatorDocStartTime); +} } // namespace mongo diff --git a/src/mongo/db/s/single_transaction_coordinator_stats.h b/src/mongo/db/s/single_transaction_coordinator_stats.h index 030f64fea44..05c75d7c076 100644 --- a/src/mongo/db/s/single_transaction_coordinator_stats.h +++ b/src/mongo/db/s/single_transaction_coordinator_stats.h @@ -52,12 +52,12 @@ public: * * Can only be called once. */ - void setStartTime(TickSource::Tick curTick, Date_t curWallClockTime); + void setCreateTime(TickSource::Tick curTick, Date_t curWallClockTime); /** * Sets the time the transaction coordinator was destroyed. 
* - * Can only be called once, and must be called after setStartTime. + * Can only be called once, and must be called after setCreateTime. */ void setEndTime(TickSource::Tick curTick, Date_t curWallClockTime); @@ -65,7 +65,7 @@ public: * Sets the time the transaction coordinator wrote the participant list and started waiting for * the participant list to become majority-committed. * - * Can only be called once, and must be called after setStartTime. + * Can only be called once, and must be called after setCreateTime. */ void setWritingParticipantListStartTime(TickSource::Tick curTick, Date_t curWallClockTime); @@ -93,17 +93,99 @@ public: */ void setWaitingForDecisionAcksStartTime(TickSource::Tick curTick, Date_t curWallClockTime); + /** + * Sets the time the transaction coordinator deleted its durable state. + * + * Can only be called once, and must be called after setWaitingForDecisionAcksStartTime. + */ + void setDeletingCoordinatorDocStartTime(TickSource::Tick curTick, Date_t curWallClockTime); + // // Getters // /** - * If the end time has been set, returns the duration between the start time and end time, else - * returns the duration between the start time and curTick. + * Returns the time the coordinator was created. + * + * Must be called after setCreateTime. + */ + Date_t getCreateTime() const { + return _createWallClockTime; + } + + /** + * Returns the time the coordinator was destroyed. + * + * Must be called after setEndTime. + */ + Date_t getEndTime() const { + return _endWallClockTime; + } + + /** + * Returns the time the coordinator started writing the participant list. Note, this is also the + * two-phase commit start time. * - * Must be called after setStartTime, but can be called any number of times. + * Must be called after setWritingParticipantListStartTime. 
*/ - Microseconds getDuration(TickSource* tickSource, TickSource::Tick curTick) const; + Date_t getWritingParticipantListStartTime() const { + return _writingParticipantListStartWallClockTime; + } + + /** + * Returns the time the coordinator started sending 'prepare' and collecting votes. + * + * Must be called after setWaitingForVotesStartTime. + */ + Date_t getWaitingForVotesStartTime() const { + return _waitingForVotesStartWallClockTime; + } + + /** + * Returns the time the coordinator started making the decision durable. + * + * Must be called after setWritingDecisionStartTime. + */ + Date_t getWritingDecisionStartTime() const { + return _writingDecisionStartWallClockTime; + } + + /** + * Returns the time the coordinator started sending the decision and waiting for + * acknowledgments. + * + * Must be called after setWaitingForDecisionAcksStartTime. + */ + Date_t getWaitingForDecisionAcksStartTime() const { + return _waitingForDecisionAcksStartWallClockTime; + } + + /** + * Returns the time the coordinator started deleting its durable state. + * + * Must be called after setDeletingCoordinatorDocStartTime. + */ + Date_t getDeletingCoordinatorDocStartTime() const { + return _deletingCoordinatorDocStartWallClockTime; + } + + /** + * If the end time has been set, returns the duration between the create time and end time, else + * returns the duration between the create time and curTick. + * + * Must be called after setCreateTime, but can be called any number of times. + */ + Microseconds getDurationSinceCreation(TickSource* tickSource, TickSource::Tick curTick) const; + + /** + * If the end time has been set, returns the duration between the writing participant list start + * time and end time, else returns the duration between the writing participant list start time + * and curTick. + * + * Must be called after setWritingParticipantListStartTime, but can be called any number of + * times. 
+ */ + Microseconds getTwoPhaseCommitDuration(TickSource* tickSource, TickSource::Tick curTick) const; /** * If the waiting for votes start time has been set, returns the duration between the writing @@ -135,9 +217,9 @@ public: Microseconds getWritingDecisionDuration(TickSource* tickSource, TickSource::Tick curTick) const; /** - * If the end time has been set, returns the duration between the waiting for decision acks - * start time and the end time, else returns the duration between the waiting for decision acks - * start time and curTick. + * If the deleting coordinator doc start time has been set, returns the duration between the + * waiting for decision acks start time and the deleting coordinator doc start time, else + * returns the duration between the waiting for decision acks start time and curTick. * * Must be called after setWaitingForDecisionAcksStartTime, but can be called any number of * times. @@ -145,10 +227,23 @@ public: Microseconds getWaitingForDecisionAcksDuration(TickSource* tickSource, TickSource::Tick curTick) const; + /** + * If the end time has been set, returns the duration between the deleting coordinator doc start + * and the end time, else returns the duration between the deleting coordinator doc start time + * and curTick. + * + * Must be called after setDeletingCoordinatorDocStartTime, but can be called any number of + * times. + */ + Microseconds getDeletingCoordinatorDocDuration(TickSource* tickSource, + TickSource::Tick curTick) const; + private: - Date_t _startWallClockTime; - TickSource::Tick _startTime{0}; + Date_t _createWallClockTime; + TickSource::Tick _createTime{0}; + // The writing participant list start time doubles as the two-phase commit start time, since + // writing the participant list is the first step of the two-phase commit. 
Date_t _writingParticipantListStartWallClockTime; TickSource::Tick _writingParticipantListStartTime{0}; @@ -161,6 +256,9 @@ private: Date_t _waitingForDecisionAcksStartWallClockTime; TickSource::Tick _waitingForDecisionAcksStartTime{0}; + Date_t _deletingCoordinatorDocStartWallClockTime; + TickSource::Tick _deletingCoordinatorDocStartTime{0}; + Date_t _endWallClockTime; TickSource::Tick _endTime{0}; }; diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp index 3b85046b626..414bd3c058f 100644 --- a/src/mongo/db/s/transaction_coordinator.cpp +++ b/src/mongo/db/s/transaction_coordinator.cpp @@ -34,6 +34,7 @@ #include "mongo/db/s/transaction_coordinator.h" #include "mongo/db/logical_clock.h" +#include "mongo/db/s/transaction_coordinator_metrics_observer.h" #include "mongo/util/log.h" namespace mongo { @@ -55,7 +56,9 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext, _lsid(lsid), _txnNumber(txnNumber), _scheduler(std::move(scheduler)), - _sendPrepareScheduler(_scheduler->makeChildScheduler()) { + _sendPrepareScheduler(_scheduler->makeChildScheduler()), + _transactionCoordinatorMetricsObserver( + stdx::make_unique<TransactionCoordinatorMetricsObserver>()) { auto kickOffCommitPF = makePromiseFuture<void>(); _kickOffCommitPromise = std::move(kickOffCommitPF.promise); @@ -80,6 +83,12 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext, } }); + // TODO: The duration will be meaningless after failover. + _transactionCoordinatorMetricsObserver->onCreate( + ServerTransactionCoordinatorsMetrics::get(_serviceContext), + _serviceContext->getTickSource(), + _serviceContext->getPreciseClockSource()->now()); + // Two-phase commit phases chain. Once this chain executes, the 2PC sequence has completed // either with success or error and the scheduled deadline task above has been joined. 
std::move(kickOffCommitPF.future) @@ -92,6 +101,15 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext, { stdx::lock_guard<stdx::mutex> lg(_mutex); invariant(_participants); + + _step = Step::kWritingParticipantList; + + // TODO: The duration will be meaningless after failover. + _transactionCoordinatorMetricsObserver->onStartWritingParticipantList( + ServerTransactionCoordinatorsMetrics::get(_serviceContext), + _serviceContext->getTickSource(), + _serviceContext->getPreciseClockSource()->now()); + if (_participantsDurable) return Future<void>::makeReady(); } @@ -113,6 +131,15 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext, { stdx::lock_guard<stdx::mutex> lg(_mutex); invariant(_participantsDurable); + + _step = Step::kWaitingForVotes; + + // TODO: The duration will be meaningless after failover. + _transactionCoordinatorMetricsObserver->onStartWaitingForVotes( + ServerTransactionCoordinatorsMetrics::get(_serviceContext), + _serviceContext->getTickSource(), + _serviceContext->getPreciseClockSource()->now()); + if (_decision) return Future<void>::makeReady(); } @@ -146,6 +173,15 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext, { stdx::lock_guard<stdx::mutex> lg(_mutex); invariant(_decision); + + _step = Step::kWritingDecision; + + // TODO: The duration will be meaningless after failover. + _transactionCoordinatorMetricsObserver->onStartWritingDecision( + ServerTransactionCoordinatorsMetrics::get(_serviceContext), + _serviceContext->getTickSource(), + _serviceContext->getPreciseClockSource()->now()); + if (_decisionDurable) return Future<void>::makeReady(); } @@ -167,6 +203,14 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext, { stdx::lock_guard<stdx::mutex> lg(_mutex); invariant(_decisionDurable); + + _step = Step::kWaitingForDecisionAcks; + + // TODO: The duration will be meaningless after failover. 
+ _transactionCoordinatorMetricsObserver->onStartWaitingForDecisionAcks( + ServerTransactionCoordinatorsMetrics::get(_serviceContext), + _serviceContext->getTickSource(), + _serviceContext->getPreciseClockSource()->now()); } _decisionPromise.emplaceValue(_decision->getDecision()); @@ -189,6 +233,17 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext, .then([this] { // Do a best-effort attempt (i.e., writeConcern w:1) to delete the coordinator's durable // state. + { + stdx::lock_guard<stdx::mutex> lg(_mutex); + + _step = Step::kDeletingCoordinatorDoc; + + _transactionCoordinatorMetricsObserver->onStartDeletingCoordinatorDoc( + ServerTransactionCoordinatorsMetrics::get(_serviceContext), + _serviceContext->getTickSource(), + _serviceContext->getPreciseClockSource()->now()); + } + return txn::deleteCoordinatorDoc(*_scheduler, _lsid, _txnNumber); }) .onCompletion([ this, deadlineFuture = std::move(deadlineFuture) ](Status s) mutable { @@ -279,6 +334,15 @@ void TransactionCoordinator::_done(Status status) { << redact(status); stdx::unique_lock<stdx::mutex> ul(_mutex); + + const auto tickSource = _serviceContext->getTickSource(); + + _transactionCoordinatorMetricsObserver->onEnd( + ServerTransactionCoordinatorsMetrics::get(_serviceContext), + tickSource, + _serviceContext->getPreciseClockSource()->now(), + _step); + _completionPromisesFired = true; if (!_decisionDurable) { diff --git a/src/mongo/db/s/transaction_coordinator.h b/src/mongo/db/s/transaction_coordinator.h index 9d9fbdaf8ca..f379f50348e 100644 --- a/src/mongo/db/s/transaction_coordinator.h +++ b/src/mongo/db/s/transaction_coordinator.h @@ -36,6 +36,8 @@ namespace mongo { +class TransactionCoordinatorMetricsObserver; + /** * State machine, which implements the two-phase commit protocol for a specific transaction, * identified by lsid + txnNumber. @@ -50,6 +52,18 @@ class TransactionCoordinator { public: /** + * The two-phase commit steps. 
+ */ + enum class Step { + kInactive, + kWritingParticipantList, + kWaitingForVotes, + kWritingDecision, + kWaitingForDecisionAcks, + kDeletingCoordinatorDoc, + }; + + /** * Instantiates a new TransactionCoordinator for the specified lsid + txnNumber pair and gives * it a 'scheduler' to use for any asynchronous tasks it spawns. * @@ -103,6 +117,13 @@ public: */ void cancelIfCommitNotYetStarted(); + /** + * Returns the TransactionCoordinatorMetricsObserver for this TransactionCoordinator. + */ + const TransactionCoordinatorMetricsObserver& getMetricsObserverForTest() { + return *_transactionCoordinatorMetricsObserver; + } + private: bool _reserveKickOffCommitPromise(); @@ -129,6 +150,8 @@ private: // Protects the state below mutable stdx::mutex _mutex; + Step _step{Step::kInactive}; + // Promise/future pair which will be signaled when the coordinator has completed bool _kickOffCommitPromiseSet{false}; Promise<void> _kickOffCommitPromise; @@ -158,6 +181,10 @@ private: // TODO (SERVER-38346): Remove this when SharedSemiFuture supports continuations. bool _completionPromisesFired{false}; std::vector<Promise<void>> _completionPromises; + + // Store as unique_ptr to avoid a circular dependency between the TransactionCoordinator and the + // TransactionCoordinatorMetricsObserver. + std::unique_ptr<TransactionCoordinatorMetricsObserver> _transactionCoordinatorMetricsObserver; }; } // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_metrics_observer.cpp b/src/mongo/db/s/transaction_coordinator_metrics_observer.cpp new file mode 100644 index 00000000000..593e4a8dfb4 --- /dev/null +++ b/src/mongo/db/s/transaction_coordinator_metrics_observer.cpp @@ -0,0 +1,184 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/transaction_coordinator_metrics_observer.h" + +namespace mongo { + +void TransactionCoordinatorMetricsObserver::onCreate( + ServerTransactionCoordinatorsMetrics* serverTransactionCoordinatorsMetrics, + TickSource* tickSource, + Date_t curWallClockTime) { + + // + // Per transaction coordinator stats. + // + _singleTransactionCoordinatorStats.setCreateTime(tickSource->getTicks(), curWallClockTime); + + // + // Server wide transaction coordinators metrics. 
+ // + serverTransactionCoordinatorsMetrics->incrementTotalCreated(); +} + +void TransactionCoordinatorMetricsObserver::onStartWritingParticipantList( + ServerTransactionCoordinatorsMetrics* serverTransactionCoordinatorsMetrics, + TickSource* tickSource, + Date_t curWallClockTime) { + + // + // Per transaction coordinator stats. + // + _singleTransactionCoordinatorStats.setWritingParticipantListStartTime(tickSource->getTicks(), + curWallClockTime); + + // + // Server wide transaction coordinators metrics. + // + + serverTransactionCoordinatorsMetrics->incrementTotalStartedTwoPhaseCommit(); + serverTransactionCoordinatorsMetrics->incrementCurrentWritingParticipantList(); +} + +void TransactionCoordinatorMetricsObserver::onStartWaitingForVotes( + ServerTransactionCoordinatorsMetrics* serverTransactionCoordinatorsMetrics, + TickSource* tickSource, + Date_t curWallClockTime) { + + // + // Per transaction coordinator stats. + // + _singleTransactionCoordinatorStats.setWaitingForVotesStartTime(tickSource->getTicks(), + curWallClockTime); + + // + // Server wide transaction coordinators metrics. + // + serverTransactionCoordinatorsMetrics->decrementCurrentWritingParticipantList(); + serverTransactionCoordinatorsMetrics->incrementCurrentWaitingForVotes(); +} + +void TransactionCoordinatorMetricsObserver::onStartWritingDecision( + ServerTransactionCoordinatorsMetrics* serverTransactionCoordinatorsMetrics, + TickSource* tickSource, + Date_t curWallClockTime) { + + // + // Per transaction coordinator stats. + // + _singleTransactionCoordinatorStats.setWritingDecisionStartTime(tickSource->getTicks(), + curWallClockTime); + + // + // Server wide transaction coordinators metrics. 
+ // + serverTransactionCoordinatorsMetrics->decrementCurrentWaitingForVotes(); + serverTransactionCoordinatorsMetrics->incrementCurrentWritingDecision(); +} + +void TransactionCoordinatorMetricsObserver::onStartWaitingForDecisionAcks( + ServerTransactionCoordinatorsMetrics* serverTransactionCoordinatorsMetrics, + TickSource* tickSource, + Date_t curWallClockTime) { + + // + // Per transaction coordinator stats. + // + _singleTransactionCoordinatorStats.setWaitingForDecisionAcksStartTime(tickSource->getTicks(), + curWallClockTime); + + // + // Server wide transaction coordinators metrics. + // + serverTransactionCoordinatorsMetrics->decrementCurrentWritingDecision(); + serverTransactionCoordinatorsMetrics->incrementCurrentWaitingForDecisionAcks(); +} + +void TransactionCoordinatorMetricsObserver::onStartDeletingCoordinatorDoc( + ServerTransactionCoordinatorsMetrics* serverTransactionCoordinatorsMetrics, + TickSource* tickSource, + Date_t curWallClockTime) { + + // + // Per transaction coordinator stats. + // + _singleTransactionCoordinatorStats.setDeletingCoordinatorDocStartTime(tickSource->getTicks(), + curWallClockTime); + + // + // Server wide transaction coordinators metrics. + // + serverTransactionCoordinatorsMetrics->decrementCurrentWaitingForDecisionAcks(); + serverTransactionCoordinatorsMetrics->incrementCurrentDeletingCoordinatorDoc(); +} + +void TransactionCoordinatorMetricsObserver::onEnd( + ServerTransactionCoordinatorsMetrics* serverTransactionCoordinatorsMetrics, + TickSource* tickSource, + Date_t curWallClockTime, + TransactionCoordinator::Step step) { + + // + // Per transaction coordinator stats. + // + _singleTransactionCoordinatorStats.setEndTime(tickSource->getTicks(), curWallClockTime); + + // + // Server wide transaction coordinators metrics. 
+ // + _decrementLastStep(serverTransactionCoordinatorsMetrics, step); +} + +void TransactionCoordinatorMetricsObserver::_decrementLastStep( + ServerTransactionCoordinatorsMetrics* serverTransactionCoordinatorsMetrics, + TransactionCoordinator::Step step) { + switch (step) { + case TransactionCoordinator::Step::kInactive: + break; + case TransactionCoordinator::Step::kWritingParticipantList: + serverTransactionCoordinatorsMetrics->decrementCurrentWritingParticipantList(); + break; + case TransactionCoordinator::Step::kWaitingForVotes: + serverTransactionCoordinatorsMetrics->decrementCurrentWaitingForVotes(); + break; + case TransactionCoordinator::Step::kWritingDecision: + serverTransactionCoordinatorsMetrics->decrementCurrentWritingDecision(); + break; + case TransactionCoordinator::Step::kWaitingForDecisionAcks: + serverTransactionCoordinatorsMetrics->decrementCurrentWaitingForDecisionAcks(); + break; + case TransactionCoordinator::Step::kDeletingCoordinatorDoc: + serverTransactionCoordinatorsMetrics->decrementCurrentDeletingCoordinatorDoc(); + break; + } +} + +} // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_metrics_observer.h b/src/mongo/db/s/transaction_coordinator_metrics_observer.h new file mode 100644 index 00000000000..38001cedc62 --- /dev/null +++ b/src/mongo/db/s/transaction_coordinator_metrics_observer.h @@ -0,0 +1,124 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. 
If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/s/server_transaction_coordinators_metrics.h" +#include "mongo/db/s/single_transaction_coordinator_stats.h" +#include "mongo/db/s/transaction_coordinator.h" + +namespace mongo { + +/** + * Updates transaction coordinator metrics (per- two-phase commit metrics and server-wide two-phase + * commit metrics) upon the appropriate event. + */ +class TransactionCoordinatorMetricsObserver { + +public: + /** + * Updates relevant metrics when a transaction coordinator is created. + */ + void onCreate(ServerTransactionCoordinatorsMetrics* serverTransactionCoordinatorMetrics, + TickSource* tickSource, + Date_t curWallClockTime); + + /** + * Updates relevant metrics when a transaction coordinator is about to write the participant + * list. + */ + void onStartWritingParticipantList( + ServerTransactionCoordinatorsMetrics* serverTransactionCoordinatorMetrics, + TickSource* tickSource, + Date_t curWallClockTime); + + /** + * Updates relevant metrics when a transaction coordinator is about to send 'prepare' and start + * waiting for votes (i.e., 'prepare' responses). 
+ */ + void onStartWaitingForVotes( + ServerTransactionCoordinatorsMetrics* serverTransactionCoordinatorMetrics, + TickSource* tickSource, + Date_t curWallClockTime); + + /** + * Updates relevant metrics when a transaction coordinator is about to write the decision. + */ + void onStartWritingDecision( + ServerTransactionCoordinatorsMetrics* serverTransactionCoordinatorMetrics, + TickSource* tickSource, + Date_t curWallClockTime); + + /** + * Updates relevant metrics when a transaction coordinator is about to send the decision to + * participants and start waiting for acknowledgements. + */ + void onStartWaitingForDecisionAcks( + ServerTransactionCoordinatorsMetrics* serverTransactionCoordinatorMetrics, + TickSource* tickSource, + Date_t curWallClockTime); + + /** + * Updates relevant metrics when a transaction coordinator is about to delete its durable state. + */ + void onStartDeletingCoordinatorDoc( + ServerTransactionCoordinatorsMetrics* serverTransactionCoordinatorMetrics, + TickSource* tickSource, + Date_t curWallClockTime); + + /** + * Updates relevant metrics when a transaction coordinator is destroyed. + * + * The 'lastStep' parameter is needed because, unlike for the other state transitions, the + * coordinator can transition to the end state from any other state, for example on stepdown. + */ + void onEnd(ServerTransactionCoordinatorsMetrics* serverTransactionCoordinatorMetrics, + TickSource* tickSource, + Date_t curWallClockTime, + TransactionCoordinator::Step lastStep); + + /** + * Returns a read-only reference to the SingleTransactionCoordinatorStats object stored in this + * TransactionCoordinatorMetricsObserver instance. + */ + const SingleTransactionCoordinatorStats& getSingleTransactionCoordinatorStats() const { + return _singleTransactionCoordinatorStats; + } + +private: + /** + * Decrements the current active in 'step'. 
+ */ + void _decrementLastStep(ServerTransactionCoordinatorsMetrics*, TransactionCoordinator::Step); + + // Tracks metrics for a single commit coordination. + SingleTransactionCoordinatorStats _singleTransactionCoordinatorStats; +}; + +} // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_test.cpp b/src/mongo/db/s/transaction_coordinator_test.cpp index 2ca40d1ed6b..77bd01d1173 100644 --- a/src/mongo/db/s/transaction_coordinator_test.cpp +++ b/src/mongo/db/s/transaction_coordinator_test.cpp @@ -34,9 +34,14 @@ #include "mongo/client/remote_command_targeter_mock.h" #include "mongo/db/commands/txn_cmds_gen.h" #include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h" +#include "mongo/db/dbdirectclient.h" #include "mongo/db/s/transaction_coordinator_document_gen.h" +#include "mongo/db/s/transaction_coordinator_metrics_observer.h" #include "mongo/db/s/transaction_coordinator_test_fixture.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/clock_source_mock.h" #include "mongo/util/log.h" +#include "mongo/util/tick_source_mock.h" namespace mongo { namespace { @@ -44,6 +49,8 @@ namespace { using PrepareResponse = txn::PrepareResponse; using TransactionCoordinatorDocument = txn::TransactionCoordinatorDocument; +const Hours kLongFutureTimeout(8); + const StatusWith<BSONObj> kNoSuchTransaction = BSON("ok" << 0 << "code" << ErrorCodes::NoSuchTransaction); const StatusWith<BSONObj> kOk = BSON("ok" << 1); @@ -97,6 +104,42 @@ protected: ASSERT_FALSE(network()->hasReadyRequests()); } + void waitUntilCoordinatorDocIsPresent() { + DBDirectClient dbClient(operationContext()); + while (dbClient.findOne(NamespaceString::kTransactionCoordinatorsNamespace.ns(), Query()) + .isEmpty()) + ; + } + + /** + * Precondition: A coordinator document exists with or without a decision. 
+ */ + void waitUntilCoordinatorDocHasDecision() { + DBDirectClient dbClient(operationContext()); + TransactionCoordinatorDocument doc; + do { + doc = TransactionCoordinatorDocument::parse( + IDLParserErrorContext("dummy"), + dbClient.findOne(NamespaceString::kTransactionCoordinatorsNamespace.ns(), Query())); + } while (!doc.getDecision()); + } + + void waitUntilNoCoordinatorDocIsPresent() { + DBDirectClient dbClient(operationContext()); + while (!dbClient.findOne(NamespaceString::kTransactionCoordinatorsNamespace.ns(), Query()) + .isEmpty()) + ; + } + + void waitUntilMessageSent() { + while (true) { + executor::NetworkInterfaceMock::InNetworkGuard networkGuard(network()); + if (network()->hasReadyRequests()) { + return; + } + } + } + LogicalSessionId _lsid{makeLogicalSessionIdForTest()}; TxnNumber _txnNumber{1}; }; @@ -778,5 +821,982 @@ TEST_F(TransactionCoordinatorTest, coordinator.onCompletion().get(); } +class TransactionCoordinatorMetricsTest : public TransactionCoordinatorTestBase { +public: + void setUp() override { + TransactionCoordinatorTestBase::setUp(); + + getServiceContext()->setPreciseClockSource(stdx::make_unique<ClockSourceMock>()); + + auto tickSource = stdx::make_unique<TickSourceMock<Microseconds>>(); + tickSource->reset(1); + getServiceContext()->setTickSource(std::move(tickSource)); + } + + ServerTransactionCoordinatorsMetrics* metrics() { + return ServerTransactionCoordinatorsMetrics::get(getServiceContext()); + } + + ClockSourceMock* clockSource() { + return dynamic_cast<ClockSourceMock*>(getServiceContext()->getPreciseClockSource()); + } + + TickSourceMock<Microseconds>* tickSource() { + return dynamic_cast<TickSourceMock<Microseconds>*>(getServiceContext()->getTickSource()); + } + + struct Stats { + // Start times + boost::optional<Date_t> createTime; + boost::optional<Date_t> writingParticipantListStartTime; + boost::optional<Date_t> waitingForVotesStartTime; + boost::optional<Date_t> writingDecisionStartTime; + boost::optional<Date_t> 
waitingForDecisionAcksStartTime; + boost::optional<Date_t> deletingCoordinatorDocStartTime; + boost::optional<Date_t> endTime; + + // Durations + boost::optional<Microseconds> totalDuration; + boost::optional<Microseconds> twoPhaseCommitDuration; + boost::optional<Microseconds> writingParticipantListDuration; + boost::optional<Microseconds> waitingForVotesDuration; + boost::optional<Microseconds> writingDecisionDuration; + boost::optional<Microseconds> waitingForDecisionAcksDuration; + boost::optional<Microseconds> deletingCoordinatorDocDuration; + }; + + void checkStats(const SingleTransactionCoordinatorStats& stats, const Stats& expected) { + + // Start times + + if (expected.createTime) { + ASSERT_EQ(*expected.createTime, stats.getCreateTime()); + } + + if (expected.writingParticipantListStartTime) { + ASSERT(*expected.writingParticipantListStartTime == + stats.getWritingParticipantListStartTime()); + } + + if (expected.waitingForVotesStartTime) { + ASSERT(*expected.waitingForVotesStartTime == stats.getWaitingForVotesStartTime()); + } + + if (expected.writingDecisionStartTime) { + ASSERT(*expected.writingDecisionStartTime == stats.getWritingDecisionStartTime()); + } + + if (expected.waitingForDecisionAcksStartTime) { + ASSERT(*expected.waitingForDecisionAcksStartTime == + stats.getWaitingForDecisionAcksStartTime()); + } + + if (expected.deletingCoordinatorDocStartTime) { + ASSERT(*expected.deletingCoordinatorDocStartTime == + stats.getDeletingCoordinatorDocStartTime()); + } + + if (expected.endTime) { + ASSERT(*expected.endTime == stats.getEndTime()); + } + + // Durations + + if (expected.totalDuration) { + ASSERT_EQ(*expected.totalDuration, + stats.getDurationSinceCreation(tickSource(), tickSource()->getTicks())); + } + + if (expected.twoPhaseCommitDuration) { + ASSERT_EQ(*expected.twoPhaseCommitDuration, + stats.getTwoPhaseCommitDuration(tickSource(), tickSource()->getTicks())); + } + + if (expected.writingParticipantListDuration) { + ASSERT_EQ( + 
*expected.writingParticipantListDuration, + stats.getWritingParticipantListDuration(tickSource(), tickSource()->getTicks())); + } + + if (expected.waitingForVotesDuration) { + ASSERT_EQ(*expected.waitingForVotesDuration, + stats.getWaitingForVotesDuration(tickSource(), tickSource()->getTicks())); + } + + if (expected.writingDecisionDuration) { + ASSERT_EQ(*expected.writingDecisionDuration, + stats.getWritingDecisionDuration(tickSource(), tickSource()->getTicks())); + } + + if (expected.waitingForDecisionAcksDuration) { + ASSERT_EQ( + *expected.waitingForDecisionAcksDuration, + stats.getWaitingForDecisionAcksDuration(tickSource(), tickSource()->getTicks())); + } + + if (expected.deletingCoordinatorDocDuration) { + ASSERT_EQ( + *expected.deletingCoordinatorDocDuration, + stats.getDeletingCoordinatorDocDuration(tickSource(), tickSource()->getTicks())); + } + } + + struct Metrics { + // Totals + std::int64_t totalCreated{0}; + std::int64_t totalStartedTwoPhaseCommit{0}; + + // Current in steps + std::int64_t currentWritingParticipantList{0}; + std::int64_t currentWaitingForVotes{0}; + std::int64_t currentWritingDecision{0}; + std::int64_t currentWaitingForDecisionAcks{0}; + std::int64_t currentDeletingCoordinatorDoc{0}; + }; + + void checkMetrics(const Metrics& expectedMetrics) { + // Totals + ASSERT_EQ(expectedMetrics.totalCreated, metrics()->getTotalCreated()); + ASSERT_EQ(expectedMetrics.totalStartedTwoPhaseCommit, + metrics()->getTotalStartedTwoPhaseCommit()); + + // Current in steps + ASSERT_EQ(expectedMetrics.currentWritingParticipantList, + metrics()->getCurrentWritingParticipantList()); + ASSERT_EQ(expectedMetrics.currentWaitingForVotes, metrics()->getCurrentWaitingForVotes()); + ASSERT_EQ(expectedMetrics.currentWritingDecision, metrics()->getCurrentWritingDecision()); + ASSERT_EQ(expectedMetrics.currentWaitingForDecisionAcks, + metrics()->getCurrentWaitingForDecisionAcks()); + ASSERT_EQ(expectedMetrics.currentDeletingCoordinatorDoc, + 
metrics()->getCurrentDeletingCoordinatorDoc()); + } + + Date_t advanceClockSourceAndReturnNewNow() { + const auto newNow = Date_t::now(); + clockSource()->reset(newNow); + return newNow; + } +}; + +TEST_F(TransactionCoordinatorMetricsTest, SingleCoordinatorStatsSimpleTwoPhaseCommit) { + Stats expectedStats; + TransactionCoordinatorMetricsObserver coordinatorObserver; + const auto& stats = coordinatorObserver.getSingleTransactionCoordinatorStats(); + + checkStats(stats, expectedStats); + + // Stats are updated on onCreate. + + expectedStats.createTime = advanceClockSourceAndReturnNewNow(); + expectedStats.totalDuration = Microseconds(0); + coordinatorObserver.onCreate(metrics(), tickSource(), clockSource()->now()); + checkStats(stats, expectedStats); + + // Advancing the time causes the total duration to increase. + tickSource()->advance(Microseconds(100)); + expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100); + checkStats(stats, expectedStats); + + // Stats are updated on onStartWritingParticipantList. + + expectedStats.writingParticipantListStartTime = advanceClockSourceAndReturnNewNow(); + expectedStats.twoPhaseCommitDuration = Microseconds(0); + expectedStats.writingParticipantListDuration = Microseconds(0); + coordinatorObserver.onStartWritingParticipantList( + metrics(), tickSource(), clockSource()->now()); + checkStats(stats, expectedStats); + + // Advancing the time causes the total duration, two-phase commit duration, and duration writing + // participant list to increase. + tickSource()->advance(Microseconds(100)); + expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100); + expectedStats.twoPhaseCommitDuration = + *expectedStats.twoPhaseCommitDuration + Microseconds(100); + expectedStats.writingParticipantListDuration = + *expectedStats.writingParticipantListDuration + Microseconds(100); + checkStats(stats, expectedStats); + + // Stats are updated on onStartWaitingForVotes. 
+ + expectedStats.waitingForVotesStartTime = advanceClockSourceAndReturnNewNow(); + expectedStats.waitingForVotesDuration = Microseconds(0); + coordinatorObserver.onStartWaitingForVotes(metrics(), tickSource(), clockSource()->now()); + checkStats(stats, expectedStats); + + // Advancing the time causes only the total duration, two-phase commit duration, and duration + // waiting for votes to increase. + tickSource()->advance(Microseconds(100)); + expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100); + expectedStats.twoPhaseCommitDuration = + *expectedStats.twoPhaseCommitDuration + Microseconds(100); + expectedStats.waitingForVotesDuration = + *expectedStats.waitingForVotesDuration + Microseconds(100); + checkStats(stats, expectedStats); + + // Stats are updated on onStartWritingDecision. + + expectedStats.writingDecisionStartTime = advanceClockSourceAndReturnNewNow(); + expectedStats.writingDecisionDuration = Microseconds(0); + coordinatorObserver.onStartWritingDecision(metrics(), tickSource(), clockSource()->now()); + checkStats(stats, expectedStats); + + // Advancing the time causes only the total duration, two-phase commit duration, and duration + // writing decision to increase. + tickSource()->advance(Microseconds(100)); + expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100); + expectedStats.twoPhaseCommitDuration = + *expectedStats.twoPhaseCommitDuration + Microseconds(100); + expectedStats.writingDecisionDuration = + *expectedStats.writingDecisionDuration + Microseconds(100); + checkStats(stats, expectedStats); + + // Stats are updated on onStartWaitingForDecisionAcks. 
+ + expectedStats.waitingForDecisionAcksStartTime = advanceClockSourceAndReturnNewNow(); + expectedStats.waitingForDecisionAcksDuration = Microseconds(0); + coordinatorObserver.onStartWaitingForDecisionAcks( + metrics(), tickSource(), clockSource()->now()); + checkStats(stats, expectedStats); + + // Advancing the time causes only the total duration, two-phase commit duration, and duration + // waiting for decision acks to increase. + tickSource()->advance(Microseconds(100)); + expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100); + expectedStats.twoPhaseCommitDuration = + *expectedStats.twoPhaseCommitDuration + Microseconds(100); + expectedStats.waitingForDecisionAcksDuration = + *expectedStats.waitingForDecisionAcksDuration + Microseconds(100); + checkStats(stats, expectedStats); + + // Stats are updated on onStartDeletingCoordinatorDoc. + + expectedStats.deletingCoordinatorDocStartTime = advanceClockSourceAndReturnNewNow(); + expectedStats.deletingCoordinatorDocDuration = Microseconds(0); + coordinatorObserver.onStartDeletingCoordinatorDoc( + metrics(), tickSource(), clockSource()->now()); + checkStats(stats, expectedStats); + + // Advancing the time causes only the total duration, two-phase commit duration, and duration + // deleting the coordinator doc to increase. + tickSource()->advance(Microseconds(100)); + expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100); + expectedStats.twoPhaseCommitDuration = + *expectedStats.twoPhaseCommitDuration + Microseconds(100); + expectedStats.deletingCoordinatorDocDuration = + *expectedStats.deletingCoordinatorDocDuration + Microseconds(100); + checkStats(stats, expectedStats); + + // Stats are updated on onEnd. 
+ + expectedStats.endTime = advanceClockSourceAndReturnNewNow(); + coordinatorObserver.onEnd(metrics(), + tickSource(), + clockSource()->now(), + TransactionCoordinator::Step::kDeletingCoordinatorDoc); + checkStats(stats, expectedStats); + + // Once onEnd has been called, advancing the time does not cause any duration to increase. + tickSource()->advance(Microseconds(100)); + checkStats(stats, expectedStats); +} + +TEST_F(TransactionCoordinatorMetricsTest, ServerWideMetricsSimpleTwoPhaseCommit) { + TransactionCoordinatorMetricsObserver coordinatorObserver; + Metrics expectedMetrics; + checkMetrics(expectedMetrics); + + // Metrics are updated on onCreate. + expectedMetrics.totalCreated++; + coordinatorObserver.onCreate(metrics(), tickSource(), clockSource()->now()); + checkMetrics(expectedMetrics); + + // Metrics are updated on onStartWritingParticipantList. + expectedMetrics.totalStartedTwoPhaseCommit++; + expectedMetrics.currentWritingParticipantList++; + coordinatorObserver.onStartWritingParticipantList( + metrics(), tickSource(), clockSource()->now()); + checkMetrics(expectedMetrics); + + // Metrics are updated on onStartWaitingForVotes. + expectedMetrics.currentWritingParticipantList--; + expectedMetrics.currentWaitingForVotes++; + coordinatorObserver.onStartWaitingForVotes(metrics(), tickSource(), clockSource()->now()); + checkMetrics(expectedMetrics); + + // Metrics are updated on onStartWritingDecision. + expectedMetrics.currentWaitingForVotes--; + expectedMetrics.currentWritingDecision++; + coordinatorObserver.onStartWritingDecision(metrics(), tickSource(), clockSource()->now()); + checkMetrics(expectedMetrics); + + // Metrics are updated on onStartWaitingForDecisionAcks. 
+ expectedMetrics.currentWritingDecision--; + expectedMetrics.currentWaitingForDecisionAcks++; + coordinatorObserver.onStartWaitingForDecisionAcks( + metrics(), tickSource(), clockSource()->now()); + checkMetrics(expectedMetrics); + + // Metrics are updated on onStartDeletingCoordinatorDoc. + expectedMetrics.currentWaitingForDecisionAcks--; + expectedMetrics.currentDeletingCoordinatorDoc++; + coordinatorObserver.onStartDeletingCoordinatorDoc( + metrics(), tickSource(), clockSource()->now()); + checkMetrics(expectedMetrics); + + // Metrics are updated on onEnd. + expectedMetrics.currentDeletingCoordinatorDoc--; + coordinatorObserver.onEnd(metrics(), + tickSource(), + clockSource()->now(), + TransactionCoordinator::Step::kDeletingCoordinatorDoc); + checkMetrics(expectedMetrics); +} + +TEST_F(TransactionCoordinatorMetricsTest, ServerWideMetricsSimpleTwoPhaseCommitTwoCoordinators) { + TransactionCoordinatorMetricsObserver coordinatorObserver1; + TransactionCoordinatorMetricsObserver coordinatorObserver2; + Metrics expectedMetrics; + checkMetrics(expectedMetrics); + + // Increment each coordinator one step at a time. 
+ + expectedMetrics.totalCreated++; + coordinatorObserver1.onCreate(metrics(), tickSource(), clockSource()->now()); + checkMetrics(expectedMetrics); + + expectedMetrics.totalCreated++; + coordinatorObserver2.onCreate(metrics(), tickSource(), clockSource()->now()); + checkMetrics(expectedMetrics); + + expectedMetrics.totalStartedTwoPhaseCommit++; + expectedMetrics.currentWritingParticipantList++; + coordinatorObserver1.onStartWritingParticipantList( + metrics(), tickSource(), clockSource()->now()); + checkMetrics(expectedMetrics); + + expectedMetrics.totalStartedTwoPhaseCommit++; + expectedMetrics.currentWritingParticipantList++; + coordinatorObserver2.onStartWritingParticipantList( + metrics(), tickSource(), clockSource()->now()); + checkMetrics(expectedMetrics); + + expectedMetrics.currentWritingParticipantList--; + expectedMetrics.currentWaitingForVotes++; + coordinatorObserver1.onStartWaitingForVotes(metrics(), tickSource(), clockSource()->now()); + checkMetrics(expectedMetrics); + + expectedMetrics.currentWritingParticipantList--; + expectedMetrics.currentWaitingForVotes++; + coordinatorObserver2.onStartWaitingForVotes(metrics(), tickSource(), clockSource()->now()); + checkMetrics(expectedMetrics); + + expectedMetrics.currentWaitingForVotes--; + expectedMetrics.currentWritingDecision++; + coordinatorObserver1.onStartWritingDecision(metrics(), tickSource(), clockSource()->now()); + checkMetrics(expectedMetrics); + + expectedMetrics.currentWaitingForVotes--; + expectedMetrics.currentWritingDecision++; + coordinatorObserver2.onStartWritingDecision(metrics(), tickSource(), clockSource()->now()); + checkMetrics(expectedMetrics); + + expectedMetrics.currentWritingDecision--; + expectedMetrics.currentWaitingForDecisionAcks++; + coordinatorObserver1.onStartWaitingForDecisionAcks( + metrics(), tickSource(), clockSource()->now()); + checkMetrics(expectedMetrics); + + expectedMetrics.currentWritingDecision--; + expectedMetrics.currentWaitingForDecisionAcks++; + 
coordinatorObserver2.onStartWaitingForDecisionAcks( + metrics(), tickSource(), clockSource()->now()); + checkMetrics(expectedMetrics); + + expectedMetrics.currentWaitingForDecisionAcks--; + expectedMetrics.currentDeletingCoordinatorDoc++; + coordinatorObserver1.onStartDeletingCoordinatorDoc( + metrics(), tickSource(), clockSource()->now()); + checkMetrics(expectedMetrics); + + expectedMetrics.currentWaitingForDecisionAcks--; + expectedMetrics.currentDeletingCoordinatorDoc++; + coordinatorObserver2.onStartDeletingCoordinatorDoc( + metrics(), tickSource(), clockSource()->now()); + checkMetrics(expectedMetrics); + + expectedMetrics.currentDeletingCoordinatorDoc--; + coordinatorObserver1.onEnd(metrics(), + tickSource(), + clockSource()->now(), + TransactionCoordinator::Step::kDeletingCoordinatorDoc); + checkMetrics(expectedMetrics); + + expectedMetrics.currentDeletingCoordinatorDoc--; + coordinatorObserver2.onEnd(metrics(), + tickSource(), + clockSource()->now(), + TransactionCoordinator::Step::kDeletingCoordinatorDoc); + checkMetrics(expectedMetrics); +} + +TEST_F(TransactionCoordinatorMetricsTest, SimpleTwoPhaseCommitRealCoordinator) { + Stats expectedStats; + Metrics expectedMetrics; + + checkMetrics(expectedMetrics); + + log() << "Create the coordinator."; + + expectedStats.createTime = advanceClockSourceAndReturnNewNow(); + expectedStats.totalDuration = Microseconds(0); + expectedMetrics.totalCreated++; + + TransactionCoordinator coordinator( + getServiceContext(), + _lsid, + _txnNumber, + std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), + Date_t::max()); + const auto& stats = + coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats(); + + checkStats(stats, expectedStats); + checkMetrics(expectedMetrics); + + log() << "Start two-phase commit (allow the coordinator to progress to writing the participant " + "list)."; + + expectedStats.writingParticipantListStartTime = advanceClockSourceAndReturnNewNow(); + 
tickSource()->advance(Microseconds(100)); + expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100); + expectedStats.twoPhaseCommitDuration = Microseconds(0); + expectedStats.writingParticipantListDuration = Microseconds(0); + expectedMetrics.totalStartedTwoPhaseCommit++; + expectedMetrics.currentWritingParticipantList++; + + setGlobalFailPoint("hangBeforeWaitingForParticipantListWriteConcern", + BSON("mode" + << "alwaysOn" + << "data" + << BSON("useUninterruptibleSleep" << 1))); + coordinator.runCommit(kTwoShardIdList); + waitUntilCoordinatorDocIsPresent(); + + checkStats(stats, expectedStats); + checkMetrics(expectedMetrics); + + log() << "Allow the coordinator to progress to waiting for votes."; + + expectedStats.waitingForVotesStartTime = advanceClockSourceAndReturnNewNow(); + tickSource()->advance(Microseconds(100)); + expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100); + expectedStats.twoPhaseCommitDuration = + *expectedStats.twoPhaseCommitDuration + Microseconds(100); + expectedStats.writingParticipantListDuration = + *expectedStats.writingParticipantListDuration + Microseconds(100); + expectedStats.waitingForVotesDuration = Microseconds(0); + expectedMetrics.currentWritingParticipantList--; + expectedMetrics.currentWaitingForVotes++; + + setGlobalFailPoint("hangBeforeWaitingForParticipantListWriteConcern", + BSON("mode" + << "off")); + waitUntilMessageSent(); + + checkStats(stats, expectedStats); + checkMetrics(expectedMetrics); + + log() << "Allow the coordinator to progress to writing the decision."; + + expectedStats.writingDecisionStartTime = advanceClockSourceAndReturnNewNow(); + tickSource()->advance(Microseconds(100)); + expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100); + expectedStats.twoPhaseCommitDuration = + *expectedStats.twoPhaseCommitDuration + Microseconds(100); + expectedStats.waitingForVotesDuration = + *expectedStats.waitingForVotesDuration + 
Microseconds(100); + expectedStats.writingDecisionDuration = Microseconds(0); + expectedMetrics.currentWaitingForVotes--; + expectedMetrics.currentWritingDecision++; + + setGlobalFailPoint("hangBeforeWaitingForDecisionWriteConcern", + BSON("mode" + << "alwaysOn" + << "data" + << BSON("useUninterruptibleSleep" << 1))); + // Respond to the second prepare request in a separate thread, because the coordinator will + // hijack that thread to run its continuation. + assertPrepareSentAndRespondWithSuccess(); + auto future = launchAsync([this] { assertPrepareSentAndRespondWithSuccess(); }); + waitUntilCoordinatorDocHasDecision(); + + checkStats(stats, expectedStats); + checkMetrics(expectedMetrics); + + log() << "Allow the coordinator to progress to waiting for acks."; + + expectedStats.waitingForDecisionAcksStartTime = advanceClockSourceAndReturnNewNow(); + tickSource()->advance(Microseconds(100)); + expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100); + expectedStats.twoPhaseCommitDuration = + *expectedStats.twoPhaseCommitDuration + Microseconds(100); + expectedStats.writingDecisionDuration = + *expectedStats.writingDecisionDuration + Microseconds(100); + expectedStats.waitingForDecisionAcksDuration = Microseconds(0); + expectedMetrics.currentWritingDecision--; + expectedMetrics.currentWaitingForDecisionAcks++; + + setGlobalFailPoint("hangBeforeWaitingForDecisionWriteConcern", + BSON("mode" + << "off")); + // The last thing the coordinator will do on the hijacked prepare response thread is schedule + // the commitTransaction network requests. 
+ future.timed_get(kLongFutureTimeout); + waitUntilMessageSent(); + + checkStats(stats, expectedStats); + checkMetrics(expectedMetrics); + + log() << "Allow the coordinator to progress to deleting the coordinator doc."; + + expectedStats.deletingCoordinatorDocStartTime = advanceClockSourceAndReturnNewNow(); + tickSource()->advance(Microseconds(100)); + expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100); + expectedStats.twoPhaseCommitDuration = + *expectedStats.twoPhaseCommitDuration + Microseconds(100); + expectedStats.waitingForDecisionAcksDuration = + *expectedStats.waitingForDecisionAcksDuration + Microseconds(100); + expectedStats.deletingCoordinatorDocDuration = Microseconds(0); + expectedMetrics.currentWaitingForDecisionAcks--; + expectedMetrics.currentDeletingCoordinatorDoc++; + + setGlobalFailPoint("hangAfterDeletingCoordinatorDoc", + BSON("mode" + << "alwaysOn" + << "data" + << BSON("useUninterruptibleSleep" << 1))); + // Respond to the second commit request in a separate thread, because the coordinator will + // hijack that thread to run its continuation. 
+ assertCommitSentAndRespondWithSuccess(); + future = launchAsync([this] { assertCommitSentAndRespondWithSuccess(); }); + waitUntilNoCoordinatorDocIsPresent(); + + checkStats(stats, expectedStats); + checkMetrics(expectedMetrics); + + log() << "Allow the coordinator to complete."; + + expectedStats.endTime = advanceClockSourceAndReturnNewNow(); + tickSource()->advance(Microseconds(100)); + expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100); + expectedStats.twoPhaseCommitDuration = + *expectedStats.twoPhaseCommitDuration + Microseconds(100); + expectedStats.deletingCoordinatorDocDuration = + *expectedStats.deletingCoordinatorDocDuration + Microseconds(100); + expectedMetrics.currentDeletingCoordinatorDoc--; + + setGlobalFailPoint("hangAfterDeletingCoordinatorDoc", + BSON("mode" + << "off")); + // The last thing the coordinator will do on the hijacked commit response thread is signal the + // coordinator's completion. + future.timed_get(kLongFutureTimeout); + coordinator.onCompletion().get(); + + checkStats(stats, expectedStats); + checkMetrics(expectedMetrics); +} + +TEST_F(TransactionCoordinatorMetricsTest, CoordinatorIsCanceledWhileInactive) { + Stats expectedStats; + Metrics expectedMetrics; + + checkMetrics(expectedMetrics); + + // Create the coordinator. + + expectedStats.createTime = advanceClockSourceAndReturnNewNow(); + expectedStats.totalDuration = Microseconds(0); + expectedMetrics.totalCreated++; + + TransactionCoordinator coordinator( + getServiceContext(), + _lsid, + _txnNumber, + std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), + Date_t::max()); + const auto& stats = + coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats(); + + checkStats(stats, expectedStats); + checkMetrics(expectedMetrics); + + // Cancel the coordinator. 
+ + expectedStats.endTime = advanceClockSourceAndReturnNewNow(); + tickSource()->advance(Microseconds(100)); + expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100); + + coordinator.cancelIfCommitNotYetStarted(); + coordinator.onCompletion().get(); + + checkStats(stats, expectedStats); + checkMetrics(expectedMetrics); +} + +TEST_F(TransactionCoordinatorMetricsTest, CoordinatorsAWSIsShutDownWhileCoordinatorIsInactive) { + Stats expectedStats; + Metrics expectedMetrics; + + checkMetrics(expectedMetrics); + + // Create the coordinator. + + expectedStats.createTime = advanceClockSourceAndReturnNewNow(); + expectedStats.totalDuration = Microseconds(0); + expectedMetrics.totalCreated++; + + auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()); + auto awsPtr = aws.get(); + TransactionCoordinator coordinator( + getServiceContext(), _lsid, _txnNumber, std::move(aws), Date_t::max()); + const auto& stats = + coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats(); + + checkStats(stats, expectedStats); + checkMetrics(expectedMetrics); + + // Shut down the coordinator's AWS. + + expectedStats.endTime = advanceClockSourceAndReturnNewNow(); + tickSource()->advance(Microseconds(100)); + expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100); + + awsPtr->shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "dummy"}); + coordinator.onCompletion().get(); + + checkStats(stats, expectedStats); + checkMetrics(expectedMetrics); +} + +TEST_F(TransactionCoordinatorMetricsTest, + CoordinatorsAWSIsShutDownWhileCoordinatorIsWritingParticipantList) { + Stats expectedStats; + Metrics expectedMetrics; + + checkMetrics(expectedMetrics); + + // Create the coordinator. 
+ + expectedStats.createTime = advanceClockSourceAndReturnNewNow(); + expectedStats.totalDuration = Microseconds(0); + expectedMetrics.totalCreated++; + + auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()); + auto awsPtr = aws.get(); + TransactionCoordinator coordinator( + getServiceContext(), _lsid, _txnNumber, std::move(aws), Date_t::max()); + const auto& stats = + coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats(); + + checkStats(stats, expectedStats); + checkMetrics(expectedMetrics); + + // Wait until the coordinator is writing the participant list. + + expectedStats.writingParticipantListStartTime = advanceClockSourceAndReturnNewNow(); + tickSource()->advance(Microseconds(100)); + expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100); + expectedStats.writingParticipantListDuration = Microseconds(0); + expectedMetrics.totalStartedTwoPhaseCommit++; + expectedMetrics.currentWritingParticipantList++; + + setGlobalFailPoint("hangBeforeWaitingForParticipantListWriteConcern", + BSON("mode" + << "alwaysOn")); + coordinator.runCommit(kTwoShardIdList); + waitUntilCoordinatorDocIsPresent(); + + checkStats(stats, expectedStats); + checkMetrics(expectedMetrics); + + // Shut down the coordinator's AWS. + + tickSource()->advance(Microseconds(100)); + expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100); + expectedStats.writingParticipantListDuration = + *expectedStats.writingParticipantListDuration + Microseconds(100); + expectedMetrics.currentWritingParticipantList--; + + awsPtr->shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "dummy"}); + coordinator.onCompletion().get(); + + checkStats(stats, expectedStats); + checkMetrics(expectedMetrics); + + // Clear the failpoint before the next test. 
+ setGlobalFailPoint("hangBeforeWaitingForParticipantListWriteConcern", + BSON("mode" + << "off")); +} + +TEST_F(TransactionCoordinatorMetricsTest, + CoordinatorsAWSIsShutDownWhileCoordinatorIsWaitingForVotes) { + Stats expectedStats; + Metrics expectedMetrics; + + checkMetrics(expectedMetrics); + + // Create the coordinator. + + expectedStats.createTime = advanceClockSourceAndReturnNewNow(); + expectedStats.totalDuration = Microseconds(0); + expectedMetrics.totalCreated++; + + auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()); + auto awsPtr = aws.get(); + TransactionCoordinator coordinator( + getServiceContext(), _lsid, _txnNumber, std::move(aws), Date_t::max()); + const auto& stats = + coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats(); + + checkStats(stats, expectedStats); + checkMetrics(expectedMetrics); + + // Wait until the coordinator is waiting for votes. + + expectedStats.waitingForVotesStartTime = advanceClockSourceAndReturnNewNow(); + tickSource()->advance(Microseconds(100)); + expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100); + expectedStats.waitingForVotesDuration = Microseconds(0); + expectedMetrics.totalStartedTwoPhaseCommit++; + expectedMetrics.currentWaitingForVotes++; + + coordinator.runCommit(kTwoShardIdList); + waitUntilMessageSent(); + + checkStats(stats, expectedStats); + checkMetrics(expectedMetrics); + + // Shut down the coordinator's AWS. 
+ + tickSource()->advance(Microseconds(100)); + expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100); + expectedStats.waitingForVotesDuration = + *expectedStats.waitingForVotesDuration + Microseconds(100); + expectedMetrics.currentWaitingForVotes--; + + awsPtr->shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "dummy"}); + network()->enterNetwork(); + network()->runReadyNetworkOperations(); + network()->exitNetwork(); + coordinator.onCompletion().get(); + + checkStats(stats, expectedStats); + checkMetrics(expectedMetrics); +} + +TEST_F(TransactionCoordinatorMetricsTest, + CoordinatorsAWSIsShutDownWhileCoordinatorIsWritingDecision) { + Stats expectedStats; + Metrics expectedMetrics; + + checkMetrics(expectedMetrics); + + // Create the coordinator. + + expectedStats.createTime = advanceClockSourceAndReturnNewNow(); + expectedStats.totalDuration = Microseconds(0); + expectedMetrics.totalCreated++; + + auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()); + auto awsPtr = aws.get(); + TransactionCoordinator coordinator( + getServiceContext(), _lsid, _txnNumber, std::move(aws), Date_t::max()); + const auto& stats = + coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats(); + + checkStats(stats, expectedStats); + checkMetrics(expectedMetrics); + + // Wait until the coordinator is writing the decision. 
+ + expectedStats.writingDecisionStartTime = advanceClockSourceAndReturnNewNow(); + tickSource()->advance(Microseconds(100)); + expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100); + expectedStats.writingDecisionDuration = Microseconds(0); + expectedMetrics.totalStartedTwoPhaseCommit++; + expectedMetrics.currentWritingDecision++; + + setGlobalFailPoint("hangBeforeWaitingForDecisionWriteConcern", + BSON("mode" + << "alwaysOn")); + coordinator.runCommit(kTwoShardIdList); + // Respond to the second prepare request in a separate thread, because the coordinator will + // hijack that thread to run its continuation. + assertPrepareSentAndRespondWithSuccess(); + auto future = launchAsync([this] { assertPrepareSentAndRespondWithSuccess(); }); + waitUntilCoordinatorDocHasDecision(); + + checkStats(stats, expectedStats); + checkMetrics(expectedMetrics); + + // Shut down the coordinator's AWS. + + tickSource()->advance(Microseconds(100)); + expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100); + expectedStats.writingDecisionDuration = + *expectedStats.writingDecisionDuration + Microseconds(100); + expectedMetrics.currentWritingDecision--; + + awsPtr->shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "dummy"}); + coordinator.onCompletion().get(); + + checkStats(stats, expectedStats); + checkMetrics(expectedMetrics); + + // Clear the failpoint before the next test. + setGlobalFailPoint("hangBeforeWaitingForDecisionWriteConcern", + BSON("mode" + << "off")); +} + +TEST_F(TransactionCoordinatorMetricsTest, + CoordinatorsAWSIsShutDownWhileCoordinatorIsWaitingForDecisionAcks) { + Stats expectedStats; + Metrics expectedMetrics; + + checkMetrics(expectedMetrics); + + // Create the coordinator. 
+ + expectedStats.createTime = advanceClockSourceAndReturnNewNow(); + expectedStats.totalDuration = Microseconds(0); + expectedMetrics.totalCreated++; + + auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()); + auto awsPtr = aws.get(); + TransactionCoordinator coordinator( + getServiceContext(), _lsid, _txnNumber, std::move(aws), Date_t::max()); + const auto& stats = + coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats(); + + checkStats(stats, expectedStats); + checkMetrics(expectedMetrics); + + // Wait until the coordinator is waiting for decision acks. + + expectedStats.waitingForDecisionAcksStartTime = advanceClockSourceAndReturnNewNow(); + tickSource()->advance(Microseconds(100)); + expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100); + expectedStats.waitingForDecisionAcksDuration = Microseconds(0); + expectedMetrics.totalStartedTwoPhaseCommit++; + expectedMetrics.currentWaitingForDecisionAcks++; + + coordinator.runCommit(kTwoShardIdList); + // Respond to the second prepare request in a separate thread, because the coordinator will + // hijack that thread to run its continuation. + assertPrepareSentAndRespondWithSuccess(); + auto future = launchAsync([this] { assertPrepareSentAndRespondWithSuccess(); }); + // The last thing the coordinator will do on the hijacked prepare response thread is schedule + // the commitTransaction network requests. + future.timed_get(kLongFutureTimeout); + waitUntilMessageSent(); + + checkStats(stats, expectedStats); + checkMetrics(expectedMetrics); + + // Shut down the coordinator's AWS. 
+ + tickSource()->advance(Microseconds(100)); + expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100); + expectedStats.waitingForDecisionAcksDuration = + *expectedStats.waitingForDecisionAcksDuration + Microseconds(100); + expectedMetrics.currentWaitingForDecisionAcks--; + + awsPtr->shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "dummy"}); + network()->enterNetwork(); + network()->runReadyNetworkOperations(); + network()->exitNetwork(); + coordinator.onCompletion().get(); + + checkStats(stats, expectedStats); + checkMetrics(expectedMetrics); +} + +TEST_F(TransactionCoordinatorMetricsTest, CoordinatorsAWSIsShutDownWhileCoordinatorIsDeletingDoc) { + Stats expectedStats; + Metrics expectedMetrics; + + checkMetrics(expectedMetrics); + + // Create the coordinator. + + expectedStats.createTime = advanceClockSourceAndReturnNewNow(); + expectedStats.totalDuration = Microseconds(0); + expectedMetrics.totalCreated++; + + auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()); + auto awsPtr = aws.get(); + TransactionCoordinator coordinator( + getServiceContext(), _lsid, _txnNumber, std::move(aws), Date_t::max()); + const auto& stats = + coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats(); + + checkStats(stats, expectedStats); + checkMetrics(expectedMetrics); + + // Wait until the coordinator is deleting the coordinator doc. 
+ + expectedStats.deletingCoordinatorDocStartTime = advanceClockSourceAndReturnNewNow(); + tickSource()->advance(Microseconds(100)); + expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100); + expectedStats.deletingCoordinatorDocDuration = Microseconds(0); + expectedMetrics.totalStartedTwoPhaseCommit++; + expectedMetrics.currentDeletingCoordinatorDoc++; + + setGlobalFailPoint("hangAfterDeletingCoordinatorDoc", + BSON("mode" + << "alwaysOn")); + coordinator.runCommit(kTwoShardIdList); + // Respond to the second prepare request in a separate thread, because the coordinator will + // hijack that thread to run its continuation. + assertPrepareSentAndRespondWithSuccess(); + auto future = launchAsync([this] { assertPrepareSentAndRespondWithSuccess(); }); + // The last thing the coordinator will do on the hijacked prepare response thread is schedule + // the commitTransaction network requests. + future.timed_get(kLongFutureTimeout); + waitUntilMessageSent(); + // Respond to the second commit request in a separate thread, because the coordinator will + // hijack that thread to run its continuation. + assertCommitSentAndRespondWithSuccess(); + future = launchAsync([this] { assertCommitSentAndRespondWithSuccess(); }); + waitUntilNoCoordinatorDocIsPresent(); + + checkStats(stats, expectedStats); + checkMetrics(expectedMetrics); + + // Shut down the coordinator's AWS. + + tickSource()->advance(Microseconds(100)); + expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100); + expectedStats.deletingCoordinatorDocDuration = + *expectedStats.deletingCoordinatorDocDuration + Microseconds(100); + expectedMetrics.currentDeletingCoordinatorDoc--; + + awsPtr->shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "dummy"}); + // The last thing the coordinator will do on the hijacked commit response thread is signal the + // coordinator's completion. 
+ future.timed_get(kLongFutureTimeout); + coordinator.onCompletion().get(); + + checkStats(stats, expectedStats); + checkMetrics(expectedMetrics); + + // Clear the failpoint before the next test. + setGlobalFailPoint("hangAfterDeletingCoordinatorDoc", + BSON("mode" + << "off")); +} } // namespace } // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_util.cpp b/src/mongo/db/s/transaction_coordinator_util.cpp index 27c21fc807d..05b9ff6437b 100644 --- a/src/mongo/db/s/transaction_coordinator_util.cpp +++ b/src/mongo/db/s/transaction_coordinator_util.cpp @@ -50,6 +50,7 @@ namespace { MONGO_FAIL_POINT_DEFINE(hangBeforeWaitingForParticipantListWriteConcern); MONGO_FAIL_POINT_DEFINE(hangBeforeWaitingForDecisionWriteConcern); +MONGO_FAIL_POINT_DEFINE(hangAfterDeletingCoordinatorDoc); MONGO_FAIL_POINT_DEFINE(hangBeforeWritingParticipantList); MONGO_FAIL_POINT_DEFINE(hangBeforeWritingDecision); @@ -172,10 +173,15 @@ void persistParticipantListBlocking(OperationContext* opCtx, LOG(3) << "Wrote participant list for " << lsid.getId() << ':' << txnNumber; - if (MONGO_FAIL_POINT(hangBeforeWaitingForParticipantListWriteConcern)) { + MONGO_FAIL_POINT_BLOCK(hangBeforeWaitingForParticipantListWriteConcern, fp) { LOG(0) << "Hit hangBeforeWaitingForParticipantListWriteConcern failpoint"; - MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED( - opCtx, hangBeforeWaitingForParticipantListWriteConcern); + const BSONObj& data = fp.getData(); + if (!data["useUninterruptibleSleep"].eoo()) { + MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangBeforeWaitingForParticipantListWriteConcern); + } else { + MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED( + opCtx, hangBeforeWaitingForParticipantListWriteConcern); + } } WriteConcernResult unusedWCResult; @@ -373,10 +379,15 @@ void persistDecisionBlocking(OperationContext* opCtx, LOG(3) << "Wrote decision " << (commitTimestamp ? 
"commit" : "abort") << " for " << lsid.getId() << ':' << txnNumber; - if (MONGO_FAIL_POINT(hangBeforeWaitingForDecisionWriteConcern)) { + MONGO_FAIL_POINT_BLOCK(hangBeforeWaitingForDecisionWriteConcern, fp) { LOG(0) << "Hit hangBeforeWaitingForDecisionWriteConcern failpoint"; - MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(opCtx, - hangBeforeWaitingForDecisionWriteConcern); + const BSONObj& data = fp.getData(); + if (!data["useUninterruptibleSleep"].eoo()) { + MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangBeforeWaitingForDecisionWriteConcern); + } else { + MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED( + opCtx, hangBeforeWaitingForDecisionWriteConcern); + } } WriteConcernResult unusedWCResult; @@ -505,6 +516,16 @@ void deleteCoordinatorDocBlocking(OperationContext* opCtx, } LOG(3) << "Deleted coordinator doc for " << lsid.getId() << ':' << txnNumber; + + MONGO_FAIL_POINT_BLOCK(hangAfterDeletingCoordinatorDoc, fp) { + LOG(0) << "Hit hangAfterDeletingCoordinatorDoc failpoint"; + const BSONObj& data = fp.getData(); + if (!data["useUninterruptibleSleep"].eoo()) { + MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangAfterDeletingCoordinatorDoc); + } else { + MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(opCtx, hangAfterDeletingCoordinatorDoc); + } + } } } // namespace |