summaryrefslogtreecommitdiff
path: root/src/mongo/db/s
diff options
context:
space:
mode:
authorEsha Maharishi <esha.maharishi@mongodb.com>2019-05-29 11:44:12 -0400
committerEsha Maharishi <esha.maharishi@mongodb.com>2019-06-07 14:07:49 -0400
commit0eb92d645b6f132c0c9019502d3293631a4972ed (patch)
tree24bfe561d04236572a677affa9bd6d49755d2e70 /src/mongo/db/s
parent1dbfa74ff20b1f0421fc70740f3ef8a8f85be89b (diff)
downloadmongo-0eb92d645b6f132c0c9019502d3293631a4972ed.tar.gz
SERVER-41179 Observe the coordinator's state transitions and update the corresponding stats and metrics
(cherry picked from commit d14cfb93a7fd33edf679f7fcd68c46b8d2be0b6b)
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r--src/mongo/db/s/SConscript1
-rw-r--r--src/mongo/db/s/server_transaction_coordinators_metrics.cpp10
-rw-r--r--src/mongo/db/s/server_transaction_coordinators_metrics.h7
-rw-r--r--src/mongo/db/s/single_transaction_coordinator_stats.cpp73
-rw-r--r--src/mongo/db/s/single_transaction_coordinator_stats.h122
-rw-r--r--src/mongo/db/s/transaction_coordinator.cpp66
-rw-r--r--src/mongo/db/s/transaction_coordinator.h27
-rw-r--r--src/mongo/db/s/transaction_coordinator_metrics_observer.cpp184
-rw-r--r--src/mongo/db/s/transaction_coordinator_metrics_observer.h124
-rw-r--r--src/mongo/db/s/transaction_coordinator_test.cpp1020
-rw-r--r--src/mongo/db/s/transaction_coordinator_util.cpp33
11 files changed, 1636 insertions, 31 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 80233815359..be69b3fbcac 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -126,6 +126,7 @@ env.Library(
'transaction_coordinator_catalog.cpp',
'transaction_coordinator_factory_mongod.cpp',
'transaction_coordinator_futures_util.cpp',
+ 'transaction_coordinator_metrics_observer.cpp',
'transaction_coordinator_service.cpp',
'transaction_coordinator_structures.cpp',
'transaction_coordinator_util.cpp',
diff --git a/src/mongo/db/s/server_transaction_coordinators_metrics.cpp b/src/mongo/db/s/server_transaction_coordinators_metrics.cpp
index e18d581fbef..5bac2c5c211 100644
--- a/src/mongo/db/s/server_transaction_coordinators_metrics.cpp
+++ b/src/mongo/db/s/server_transaction_coordinators_metrics.cpp
@@ -104,4 +104,14 @@ void ServerTransactionCoordinatorsMetrics::decrementCurrentWaitingForDecisionAck
_totalWaitingForDecisionAcks.fetchAndSubtract(1);
}
+std::int64_t ServerTransactionCoordinatorsMetrics::getCurrentDeletingCoordinatorDoc() {
+ return _totalDeletingCoordinatorDoc.load();
+}
+void ServerTransactionCoordinatorsMetrics::incrementCurrentDeletingCoordinatorDoc() {
+ _totalDeletingCoordinatorDoc.fetchAndAdd(1);
+}
+void ServerTransactionCoordinatorsMetrics::decrementCurrentDeletingCoordinatorDoc() {
+ _totalDeletingCoordinatorDoc.fetchAndSubtract(1);
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/server_transaction_coordinators_metrics.h b/src/mongo/db/s/server_transaction_coordinators_metrics.h
index 4cc19915256..9218e5584b4 100644
--- a/src/mongo/db/s/server_transaction_coordinators_metrics.h
+++ b/src/mongo/db/s/server_transaction_coordinators_metrics.h
@@ -70,6 +70,10 @@ public:
void incrementCurrentWaitingForDecisionAcks();
void decrementCurrentWaitingForDecisionAcks();
+ std::int64_t getCurrentDeletingCoordinatorDoc();
+ void incrementCurrentDeletingCoordinatorDoc();
+ void decrementCurrentDeletingCoordinatorDoc();
+
private:
// The total number of transaction coordinators created on this process since the process's
// inception.
@@ -90,6 +94,9 @@ private:
// The number of transaction coordinators currently in the "waiting for decision acks" phase.
AtomicWord<std::int64_t> _totalWaitingForDecisionAcks{0};
+
+ // The number of transaction coordinators currently in the "deleting coordinator doc" phase.
+ AtomicWord<std::int64_t> _totalDeletingCoordinatorDoc{0};
};
} // namespace mongo
diff --git a/src/mongo/db/s/single_transaction_coordinator_stats.cpp b/src/mongo/db/s/single_transaction_coordinator_stats.cpp
index 7f84addcb00..2fc5db4e146 100644
--- a/src/mongo/db/s/single_transaction_coordinator_stats.cpp
+++ b/src/mongo/db/s/single_transaction_coordinator_stats.cpp
@@ -33,17 +33,17 @@
namespace mongo {
-void SingleTransactionCoordinatorStats::setStartTime(TickSource::Tick curTick,
- Date_t curWallClockTime) {
- invariant(!_startTime);
+void SingleTransactionCoordinatorStats::setCreateTime(TickSource::Tick curTick,
+ Date_t curWallClockTime) {
+ invariant(!_createTime);
- _startTime = curTick;
- _startWallClockTime = curWallClockTime;
+ _createTime = curTick;
+ _createWallClockTime = curWallClockTime;
}
void SingleTransactionCoordinatorStats::setEndTime(TickSource::Tick curTick,
Date_t curWallClockTime) {
- invariant(_startTime);
+ invariant(_createTime);
invariant(!_endTime);
_endTime = curTick;
@@ -52,7 +52,7 @@ void SingleTransactionCoordinatorStats::setEndTime(TickSource::Tick curTick,
void SingleTransactionCoordinatorStats::setWritingParticipantListStartTime(
TickSource::Tick curTick, Date_t curWallClockTime) {
- invariant(_startTime);
+ invariant(_createTime);
invariant(!_writingParticipantListStartTime);
_writingParticipantListStartTime = curTick;
@@ -86,14 +86,32 @@ void SingleTransactionCoordinatorStats::setWaitingForDecisionAcksStartTime(
_waitingForDecisionAcksStartWallClockTime = curWallClockTime;
}
-Microseconds SingleTransactionCoordinatorStats::getDuration(TickSource* tickSource,
- TickSource::Tick curTick) const {
- invariant(_startTime);
+void SingleTransactionCoordinatorStats::setDeletingCoordinatorDocStartTime(
+ TickSource::Tick curTick, Date_t curWallClockTime) {
+ invariant(!_deletingCoordinatorDocStartTime);
+
+ _deletingCoordinatorDocStartTime = curTick;
+ _deletingCoordinatorDocStartWallClockTime = curWallClockTime;
+}
+
+Microseconds SingleTransactionCoordinatorStats::getDurationSinceCreation(
+ TickSource* tickSource, TickSource::Tick curTick) const {
+ invariant(_createTime);
if (_endTime) {
- return tickSource->ticksTo<Microseconds>(_endTime - _startTime);
+ return tickSource->ticksTo<Microseconds>(_endTime - _createTime);
}
- return tickSource->ticksTo<Microseconds>(curTick - _startTime);
+ return tickSource->ticksTo<Microseconds>(curTick - _createTime);
+}
+
+Microseconds SingleTransactionCoordinatorStats::getTwoPhaseCommitDuration(
+ TickSource* tickSource, TickSource::Tick curTick) const {
+ invariant(_writingParticipantListStartTime);
+
+ if (_endTime) {
+ return tickSource->ticksTo<Microseconds>(_endTime - _writingParticipantListStartTime);
+ }
+ return tickSource->ticksTo<Microseconds>(curTick - _writingParticipantListStartTime);
}
Microseconds SingleTransactionCoordinatorStats::getWritingParticipantListDuration(
@@ -104,6 +122,11 @@ Microseconds SingleTransactionCoordinatorStats::getWritingParticipantListDuratio
return tickSource->ticksTo<Microseconds>(_waitingForVotesStartTime -
_writingParticipantListStartTime);
}
+
+ if (_endTime) {
+ return tickSource->ticksTo<Microseconds>(_endTime - _writingParticipantListStartTime);
+ }
+
return tickSource->ticksTo<Microseconds>(curTick - _writingParticipantListStartTime);
}
@@ -115,6 +138,11 @@ Microseconds SingleTransactionCoordinatorStats::getWaitingForVotesDuration(
return tickSource->ticksTo<Microseconds>(_writingDecisionStartTime -
_waitingForVotesStartTime);
}
+
+ if (_endTime) {
+ return tickSource->ticksTo<Microseconds>(_endTime - _waitingForVotesStartTime);
+ }
+
return tickSource->ticksTo<Microseconds>(curTick - _waitingForVotesStartTime);
}
@@ -126,6 +154,11 @@ Microseconds SingleTransactionCoordinatorStats::getWritingDecisionDuration(
return tickSource->ticksTo<Microseconds>(_waitingForDecisionAcksStartTime -
_writingDecisionStartTime);
}
+
+ if (_endTime) {
+ return tickSource->ticksTo<Microseconds>(_endTime - _writingDecisionStartTime);
+ }
+
return tickSource->ticksTo<Microseconds>(curTick - _writingDecisionStartTime);
}
@@ -133,10 +166,26 @@ Microseconds SingleTransactionCoordinatorStats::getWaitingForDecisionAcksDuratio
TickSource* tickSource, TickSource::Tick curTick) const {
invariant(_waitingForDecisionAcksStartTime);
+ if (_deletingCoordinatorDocStartTime) {
+ return tickSource->ticksTo<Microseconds>(_deletingCoordinatorDocStartTime -
+ _waitingForDecisionAcksStartTime);
+ }
+
if (_endTime) {
return tickSource->ticksTo<Microseconds>(_endTime - _waitingForDecisionAcksStartTime);
}
+
return tickSource->ticksTo<Microseconds>(curTick - _waitingForDecisionAcksStartTime);
}
+Microseconds SingleTransactionCoordinatorStats::getDeletingCoordinatorDocDuration(
+ TickSource* tickSource, TickSource::Tick curTick) const {
+ invariant(_deletingCoordinatorDocStartTime);
+
+ if (_endTime) {
+ return tickSource->ticksTo<Microseconds>(_endTime - _deletingCoordinatorDocStartTime);
+ }
+
+ return tickSource->ticksTo<Microseconds>(curTick - _deletingCoordinatorDocStartTime);
+}
} // namespace mongo
diff --git a/src/mongo/db/s/single_transaction_coordinator_stats.h b/src/mongo/db/s/single_transaction_coordinator_stats.h
index 030f64fea44..05c75d7c076 100644
--- a/src/mongo/db/s/single_transaction_coordinator_stats.h
+++ b/src/mongo/db/s/single_transaction_coordinator_stats.h
@@ -52,12 +52,12 @@ public:
*
* Can only be called once.
*/
- void setStartTime(TickSource::Tick curTick, Date_t curWallClockTime);
+ void setCreateTime(TickSource::Tick curTick, Date_t curWallClockTime);
/**
* Sets the time the transaction coordinator was destroyed.
*
- * Can only be called once, and must be called after setStartTime.
+ * Can only be called once, and must be called after setCreateTime.
*/
void setEndTime(TickSource::Tick curTick, Date_t curWallClockTime);
@@ -65,7 +65,7 @@ public:
* Sets the time the transaction coordinator wrote the participant list and started waiting for
* the participant list to become majority-committed.
*
- * Can only be called once, and must be called after setStartTime.
+ * Can only be called once, and must be called after setCreateTime.
*/
void setWritingParticipantListStartTime(TickSource::Tick curTick, Date_t curWallClockTime);
@@ -93,17 +93,99 @@ public:
*/
void setWaitingForDecisionAcksStartTime(TickSource::Tick curTick, Date_t curWallClockTime);
+ /**
+ * Sets the time the transaction coordinator started deleting its durable state.
+ *
+ * Can only be called once, and must be called after setWaitingForDecisionAcksStartTime.
+ */
+ void setDeletingCoordinatorDocStartTime(TickSource::Tick curTick, Date_t curWallClockTime);
+
//
// Getters
//
/**
- * If the end time has been set, returns the duration between the start time and end time, else
- * returns the duration between the start time and curTick.
+ * Returns the time the coordinator was created.
+ *
+ * Must be called after setCreateTime.
+ */
+ Date_t getCreateTime() const {
+ return _createWallClockTime;
+ }
+
+ /**
+ * Returns the time the coordinator was destroyed.
+ *
+ * Must be called after setEndTime.
+ */
+ Date_t getEndTime() const {
+ return _endWallClockTime;
+ }
+
+ /**
+ * Returns the time the coordinator started writing the participant list. Note, this is also the
+ * two-phase commit start time.
*
- * Must be called after setStartTime, but can be called any number of times.
+ * Must be called after setWritingParticipantListStartTime.
*/
- Microseconds getDuration(TickSource* tickSource, TickSource::Tick curTick) const;
+ Date_t getWritingParticipantListStartTime() const {
+ return _writingParticipantListStartWallClockTime;
+ }
+
+ /**
+ * Returns the time the coordinator started sending 'prepare' and collecting votes.
+ *
+ * Must be called after setWaitingForVotesStartTime.
+ */
+ Date_t getWaitingForVotesStartTime() const {
+ return _waitingForVotesStartWallClockTime;
+ }
+
+ /**
+ * Returns the time the coordinator started making the decision durable.
+ *
+ * Must be called after setWritingDecisionStartTime.
+ */
+ Date_t getWritingDecisionStartTime() const {
+ return _writingDecisionStartWallClockTime;
+ }
+
+ /**
+ * Returns the time the coordinator started sending the decision and waiting for
+ * acknowledgments.
+ *
+ * Must be called after setWaitingForDecisionAcksStartTime.
+ */
+ Date_t getWaitingForDecisionAcksStartTime() const {
+ return _waitingForDecisionAcksStartWallClockTime;
+ }
+
+ /**
+ * Returns the time the coordinator started deleting its durable state.
+ *
+ * Must be called after setDeletingCoordinatorDocStartTime.
+ */
+ Date_t getDeletingCoordinatorDocStartTime() const {
+ return _deletingCoordinatorDocStartWallClockTime;
+ }
+
+ /**
+ * If the end time has been set, returns the duration between the create time and end time, else
+ * returns the duration between the create time and curTick.
+ *
+ * Must be called after setCreateTime, but can be called any number of times.
+ */
+ Microseconds getDurationSinceCreation(TickSource* tickSource, TickSource::Tick curTick) const;
+
+ /**
+ * If the end time has been set, returns the duration between the writing participant list start
+ * time and end time, else returns the duration between the writing participant list start time
+ * and curTick.
+ *
+ * Must be called after setWritingParticipantListStartTime, but can be called any number of
+ * times.
+ */
+ Microseconds getTwoPhaseCommitDuration(TickSource* tickSource, TickSource::Tick curTick) const;
/**
* If the waiting for votes start time has been set, returns the duration between the writing
@@ -135,9 +217,9 @@ public:
Microseconds getWritingDecisionDuration(TickSource* tickSource, TickSource::Tick curTick) const;
/**
- * If the end time has been set, returns the duration between the waiting for decision acks
- * start time and the end time, else returns the duration between the waiting for decision acks
- * start time and curTick.
+ * Returns the duration between the waiting for decision acks start time and, in order of
+ * preference, the deleting coordinator doc start time if set, else the end time if set, else
+ * curTick.
*
* Must be called after setWaitingForDecisionAcksStartTime, but can be called any number of
* times.
@@ -145,10 +227,23 @@ public:
Microseconds getWaitingForDecisionAcksDuration(TickSource* tickSource,
TickSource::Tick curTick) const;
+ /**
+ * If the end time has been set, returns the duration between the deleting coordinator doc start
+ * and the end time, else returns the duration between the deleting coordinator doc start time
+ * and curTick.
+ *
+ * Must be called after setDeletingCoordinatorDocStartTime, but can be called any number of
+ * times.
+ */
+ Microseconds getDeletingCoordinatorDocDuration(TickSource* tickSource,
+ TickSource::Tick curTick) const;
+
private:
- Date_t _startWallClockTime;
- TickSource::Tick _startTime{0};
+ Date_t _createWallClockTime;
+ TickSource::Tick _createTime{0};
+ // The writing participant list start time doubles as the two-phase commit start time, since
+ // writing the participant list is the first step of the two-phase commit.
Date_t _writingParticipantListStartWallClockTime;
TickSource::Tick _writingParticipantListStartTime{0};
@@ -161,6 +256,9 @@ private:
Date_t _waitingForDecisionAcksStartWallClockTime;
TickSource::Tick _waitingForDecisionAcksStartTime{0};
+ Date_t _deletingCoordinatorDocStartWallClockTime;
+ TickSource::Tick _deletingCoordinatorDocStartTime{0};
+
Date_t _endWallClockTime;
TickSource::Tick _endTime{0};
};
diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp
index 3b85046b626..414bd3c058f 100644
--- a/src/mongo/db/s/transaction_coordinator.cpp
+++ b/src/mongo/db/s/transaction_coordinator.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/s/transaction_coordinator.h"
#include "mongo/db/logical_clock.h"
+#include "mongo/db/s/transaction_coordinator_metrics_observer.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -55,7 +56,9 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext,
_lsid(lsid),
_txnNumber(txnNumber),
_scheduler(std::move(scheduler)),
- _sendPrepareScheduler(_scheduler->makeChildScheduler()) {
+ _sendPrepareScheduler(_scheduler->makeChildScheduler()),
+ _transactionCoordinatorMetricsObserver(
+ stdx::make_unique<TransactionCoordinatorMetricsObserver>()) {
auto kickOffCommitPF = makePromiseFuture<void>();
_kickOffCommitPromise = std::move(kickOffCommitPF.promise);
@@ -80,6 +83,12 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext,
}
});
+ // TODO: The duration will be meaningless after failover.
+ _transactionCoordinatorMetricsObserver->onCreate(
+ ServerTransactionCoordinatorsMetrics::get(_serviceContext),
+ _serviceContext->getTickSource(),
+ _serviceContext->getPreciseClockSource()->now());
+
// Two-phase commit phases chain. Once this chain executes, the 2PC sequence has completed
// either with success or error and the scheduled deadline task above has been joined.
std::move(kickOffCommitPF.future)
@@ -92,6 +101,15 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext,
{
stdx::lock_guard<stdx::mutex> lg(_mutex);
invariant(_participants);
+
+ _step = Step::kWritingParticipantList;
+
+ // TODO: The duration will be meaningless after failover.
+ _transactionCoordinatorMetricsObserver->onStartWritingParticipantList(
+ ServerTransactionCoordinatorsMetrics::get(_serviceContext),
+ _serviceContext->getTickSource(),
+ _serviceContext->getPreciseClockSource()->now());
+
if (_participantsDurable)
return Future<void>::makeReady();
}
@@ -113,6 +131,15 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext,
{
stdx::lock_guard<stdx::mutex> lg(_mutex);
invariant(_participantsDurable);
+
+ _step = Step::kWaitingForVotes;
+
+ // TODO: The duration will be meaningless after failover.
+ _transactionCoordinatorMetricsObserver->onStartWaitingForVotes(
+ ServerTransactionCoordinatorsMetrics::get(_serviceContext),
+ _serviceContext->getTickSource(),
+ _serviceContext->getPreciseClockSource()->now());
+
if (_decision)
return Future<void>::makeReady();
}
@@ -146,6 +173,15 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext,
{
stdx::lock_guard<stdx::mutex> lg(_mutex);
invariant(_decision);
+
+ _step = Step::kWritingDecision;
+
+ // TODO: The duration will be meaningless after failover.
+ _transactionCoordinatorMetricsObserver->onStartWritingDecision(
+ ServerTransactionCoordinatorsMetrics::get(_serviceContext),
+ _serviceContext->getTickSource(),
+ _serviceContext->getPreciseClockSource()->now());
+
if (_decisionDurable)
return Future<void>::makeReady();
}
@@ -167,6 +203,14 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext,
{
stdx::lock_guard<stdx::mutex> lg(_mutex);
invariant(_decisionDurable);
+
+ _step = Step::kWaitingForDecisionAcks;
+
+ // TODO: The duration will be meaningless after failover.
+ _transactionCoordinatorMetricsObserver->onStartWaitingForDecisionAcks(
+ ServerTransactionCoordinatorsMetrics::get(_serviceContext),
+ _serviceContext->getTickSource(),
+ _serviceContext->getPreciseClockSource()->now());
}
_decisionPromise.emplaceValue(_decision->getDecision());
@@ -189,6 +233,17 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext,
.then([this] {
// Do a best-effort attempt (i.e., writeConcern w:1) to delete the coordinator's durable
// state.
+ {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+
+ _step = Step::kDeletingCoordinatorDoc;
+
+ _transactionCoordinatorMetricsObserver->onStartDeletingCoordinatorDoc(
+ ServerTransactionCoordinatorsMetrics::get(_serviceContext),
+ _serviceContext->getTickSource(),
+ _serviceContext->getPreciseClockSource()->now());
+ }
+
return txn::deleteCoordinatorDoc(*_scheduler, _lsid, _txnNumber);
})
.onCompletion([ this, deadlineFuture = std::move(deadlineFuture) ](Status s) mutable {
@@ -279,6 +334,15 @@ void TransactionCoordinator::_done(Status status) {
<< redact(status);
stdx::unique_lock<stdx::mutex> ul(_mutex);
+
+ const auto tickSource = _serviceContext->getTickSource();
+
+ _transactionCoordinatorMetricsObserver->onEnd(
+ ServerTransactionCoordinatorsMetrics::get(_serviceContext),
+ tickSource,
+ _serviceContext->getPreciseClockSource()->now(),
+ _step);
+
_completionPromisesFired = true;
if (!_decisionDurable) {
diff --git a/src/mongo/db/s/transaction_coordinator.h b/src/mongo/db/s/transaction_coordinator.h
index 9d9fbdaf8ca..f379f50348e 100644
--- a/src/mongo/db/s/transaction_coordinator.h
+++ b/src/mongo/db/s/transaction_coordinator.h
@@ -36,6 +36,8 @@
namespace mongo {
+class TransactionCoordinatorMetricsObserver;
+
/**
* State machine, which implements the two-phase commit protocol for a specific transaction,
* identified by lsid + txnNumber.
@@ -50,6 +52,18 @@ class TransactionCoordinator {
public:
/**
+ * The two-phase commit steps.
+ */
+ enum class Step {
+ kInactive,
+ kWritingParticipantList,
+ kWaitingForVotes,
+ kWritingDecision,
+ kWaitingForDecisionAcks,
+ kDeletingCoordinatorDoc,
+ };
+
+ /**
* Instantiates a new TransactionCoordinator for the specified lsid + txnNumber pair and gives
* it a 'scheduler' to use for any asynchronous tasks it spawns.
*
@@ -103,6 +117,13 @@ public:
*/
void cancelIfCommitNotYetStarted();
+ /**
+ * Returns the TransactionCoordinatorMetricsObserver for this TransactionCoordinator.
+ */
+ const TransactionCoordinatorMetricsObserver& getMetricsObserverForTest() {
+ return *_transactionCoordinatorMetricsObserver;
+ }
+
private:
bool _reserveKickOffCommitPromise();
@@ -129,6 +150,8 @@ private:
// Protects the state below
mutable stdx::mutex _mutex;
+ Step _step{Step::kInactive};
+
// Promise/future pair which will be signaled when the coordinator has completed
bool _kickOffCommitPromiseSet{false};
Promise<void> _kickOffCommitPromise;
@@ -158,6 +181,10 @@ private:
// TODO (SERVER-38346): Remove this when SharedSemiFuture supports continuations.
bool _completionPromisesFired{false};
std::vector<Promise<void>> _completionPromises;
+
+ // Store as unique_ptr to avoid a circular dependency between the TransactionCoordinator and the
+ // TransactionCoordinatorMetricsObserver.
+ std::unique_ptr<TransactionCoordinatorMetricsObserver> _transactionCoordinatorMetricsObserver;
};
} // namespace mongo
diff --git a/src/mongo/db/s/transaction_coordinator_metrics_observer.cpp b/src/mongo/db/s/transaction_coordinator_metrics_observer.cpp
new file mode 100644
index 00000000000..593e4a8dfb4
--- /dev/null
+++ b/src/mongo/db/s/transaction_coordinator_metrics_observer.cpp
@@ -0,0 +1,184 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/transaction_coordinator_metrics_observer.h"
+
+namespace mongo {
+
+void TransactionCoordinatorMetricsObserver::onCreate(
+ ServerTransactionCoordinatorsMetrics* serverTransactionCoordinatorsMetrics,
+ TickSource* tickSource,
+ Date_t curWallClockTime) {
+
+ //
+ // Per transaction coordinator stats.
+ //
+ _singleTransactionCoordinatorStats.setCreateTime(tickSource->getTicks(), curWallClockTime);
+
+ //
+ // Server wide transaction coordinators metrics.
+ //
+ serverTransactionCoordinatorsMetrics->incrementTotalCreated();
+}
+
+void TransactionCoordinatorMetricsObserver::onStartWritingParticipantList(
+ ServerTransactionCoordinatorsMetrics* serverTransactionCoordinatorsMetrics,
+ TickSource* tickSource,
+ Date_t curWallClockTime) {
+
+ //
+ // Per transaction coordinator stats.
+ //
+ _singleTransactionCoordinatorStats.setWritingParticipantListStartTime(tickSource->getTicks(),
+ curWallClockTime);
+
+ //
+ // Server wide transaction coordinators metrics.
+ //
+
+ serverTransactionCoordinatorsMetrics->incrementTotalStartedTwoPhaseCommit();
+ serverTransactionCoordinatorsMetrics->incrementCurrentWritingParticipantList();
+}
+
+void TransactionCoordinatorMetricsObserver::onStartWaitingForVotes(
+ ServerTransactionCoordinatorsMetrics* serverTransactionCoordinatorsMetrics,
+ TickSource* tickSource,
+ Date_t curWallClockTime) {
+
+ //
+ // Per transaction coordinator stats.
+ //
+ _singleTransactionCoordinatorStats.setWaitingForVotesStartTime(tickSource->getTicks(),
+ curWallClockTime);
+
+ //
+ // Server wide transaction coordinators metrics.
+ //
+ serverTransactionCoordinatorsMetrics->decrementCurrentWritingParticipantList();
+ serverTransactionCoordinatorsMetrics->incrementCurrentWaitingForVotes();
+}
+
+void TransactionCoordinatorMetricsObserver::onStartWritingDecision(
+ ServerTransactionCoordinatorsMetrics* serverTransactionCoordinatorsMetrics,
+ TickSource* tickSource,
+ Date_t curWallClockTime) {
+
+ //
+ // Per transaction coordinator stats.
+ //
+ _singleTransactionCoordinatorStats.setWritingDecisionStartTime(tickSource->getTicks(),
+ curWallClockTime);
+
+ //
+ // Server wide transaction coordinators metrics.
+ //
+ serverTransactionCoordinatorsMetrics->decrementCurrentWaitingForVotes();
+ serverTransactionCoordinatorsMetrics->incrementCurrentWritingDecision();
+}
+
+void TransactionCoordinatorMetricsObserver::onStartWaitingForDecisionAcks(
+ ServerTransactionCoordinatorsMetrics* serverTransactionCoordinatorsMetrics,
+ TickSource* tickSource,
+ Date_t curWallClockTime) {
+
+ //
+ // Per transaction coordinator stats.
+ //
+ _singleTransactionCoordinatorStats.setWaitingForDecisionAcksStartTime(tickSource->getTicks(),
+ curWallClockTime);
+
+ //
+ // Server wide transaction coordinators metrics.
+ //
+ serverTransactionCoordinatorsMetrics->decrementCurrentWritingDecision();
+ serverTransactionCoordinatorsMetrics->incrementCurrentWaitingForDecisionAcks();
+}
+
+void TransactionCoordinatorMetricsObserver::onStartDeletingCoordinatorDoc(
+ ServerTransactionCoordinatorsMetrics* serverTransactionCoordinatorsMetrics,
+ TickSource* tickSource,
+ Date_t curWallClockTime) {
+
+ //
+ // Per transaction coordinator stats.
+ //
+ _singleTransactionCoordinatorStats.setDeletingCoordinatorDocStartTime(tickSource->getTicks(),
+ curWallClockTime);
+
+ //
+ // Server wide transaction coordinators metrics.
+ //
+ serverTransactionCoordinatorsMetrics->decrementCurrentWaitingForDecisionAcks();
+ serverTransactionCoordinatorsMetrics->incrementCurrentDeletingCoordinatorDoc();
+}
+
+void TransactionCoordinatorMetricsObserver::onEnd(
+ ServerTransactionCoordinatorsMetrics* serverTransactionCoordinatorsMetrics,
+ TickSource* tickSource,
+ Date_t curWallClockTime,
+ TransactionCoordinator::Step step) {
+
+ //
+ // Per transaction coordinator stats.
+ //
+ _singleTransactionCoordinatorStats.setEndTime(tickSource->getTicks(), curWallClockTime);
+
+ //
+ // Server wide transaction coordinators metrics.
+ //
+ _decrementLastStep(serverTransactionCoordinatorsMetrics, step);
+}
+
+void TransactionCoordinatorMetricsObserver::_decrementLastStep(
+ ServerTransactionCoordinatorsMetrics* serverTransactionCoordinatorsMetrics,
+ TransactionCoordinator::Step step) {
+ switch (step) {
+ case TransactionCoordinator::Step::kInactive:
+ break;
+ case TransactionCoordinator::Step::kWritingParticipantList:
+ serverTransactionCoordinatorsMetrics->decrementCurrentWritingParticipantList();
+ break;
+ case TransactionCoordinator::Step::kWaitingForVotes:
+ serverTransactionCoordinatorsMetrics->decrementCurrentWaitingForVotes();
+ break;
+ case TransactionCoordinator::Step::kWritingDecision:
+ serverTransactionCoordinatorsMetrics->decrementCurrentWritingDecision();
+ break;
+ case TransactionCoordinator::Step::kWaitingForDecisionAcks:
+ serverTransactionCoordinatorsMetrics->decrementCurrentWaitingForDecisionAcks();
+ break;
+ case TransactionCoordinator::Step::kDeletingCoordinatorDoc:
+ serverTransactionCoordinatorsMetrics->decrementCurrentDeletingCoordinatorDoc();
+ break;
+ }
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/s/transaction_coordinator_metrics_observer.h b/src/mongo/db/s/transaction_coordinator_metrics_observer.h
new file mode 100644
index 00000000000..38001cedc62
--- /dev/null
+++ b/src/mongo/db/s/transaction_coordinator_metrics_observer.h
@@ -0,0 +1,124 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/s/server_transaction_coordinators_metrics.h"
+#include "mongo/db/s/single_transaction_coordinator_stats.h"
+#include "mongo/db/s/transaction_coordinator.h"
+
+namespace mongo {
+
+/**
+ * Updates transaction coordinator metrics (both the stats for a single two-phase commit and the
+ * server-wide two-phase commit metrics) upon the appropriate event.
+ */
+class TransactionCoordinatorMetricsObserver {
+
+public:
+ /**
+ * Updates relevant metrics when a transaction coordinator is created.
+ */
+ void onCreate(ServerTransactionCoordinatorsMetrics* serverTransactionCoordinatorMetrics,
+ TickSource* tickSource,
+ Date_t curWallClockTime);
+
+ /**
+ * Updates relevant metrics when a transaction coordinator is about to write the participant
+ * list.
+ */
+ void onStartWritingParticipantList(
+ ServerTransactionCoordinatorsMetrics* serverTransactionCoordinatorMetrics,
+ TickSource* tickSource,
+ Date_t curWallClockTime);
+
+ /**
+ * Updates relevant metrics when a transaction coordinator is about to send 'prepare' and start
+ * waiting for votes (i.e., 'prepare' responses).
+ */
+ void onStartWaitingForVotes(
+ ServerTransactionCoordinatorsMetrics* serverTransactionCoordinatorMetrics,
+ TickSource* tickSource,
+ Date_t curWallClockTime);
+
+ /**
+ * Updates relevant metrics when a transaction coordinator is about to write the decision.
+ */
+ void onStartWritingDecision(
+ ServerTransactionCoordinatorsMetrics* serverTransactionCoordinatorMetrics,
+ TickSource* tickSource,
+ Date_t curWallClockTime);
+
+ /**
+ * Updates relevant metrics when a transaction coordinator is about to send the decision to
+ * participants and start waiting for acknowledgements.
+ */
+ void onStartWaitingForDecisionAcks(
+ ServerTransactionCoordinatorsMetrics* serverTransactionCoordinatorMetrics,
+ TickSource* tickSource,
+ Date_t curWallClockTime);
+
+ /**
+ * Updates relevant metrics when a transaction coordinator is about to delete its durable state.
+ */
+ void onStartDeletingCoordinatorDoc(
+ ServerTransactionCoordinatorsMetrics* serverTransactionCoordinatorMetrics,
+ TickSource* tickSource,
+ Date_t curWallClockTime);
+
+ /**
+ * Updates relevant metrics when a transaction coordinator is destroyed.
+ *
+ * The 'lastStep' parameter is needed because, unlike for the other state transitions, the
+ * coordinator can transition to the end state from any other state, for example on stepdown.
+ */
+ void onEnd(ServerTransactionCoordinatorsMetrics* serverTransactionCoordinatorMetrics,
+ TickSource* tickSource,
+ Date_t curWallClockTime,
+ TransactionCoordinator::Step lastStep);
+
+ /**
+ * Returns a read-only reference to the SingleTransactionCoordinatorStats object stored in this
+ * TransactionCoordinatorMetricsObserver instance.
+ */
+ const SingleTransactionCoordinatorStats& getSingleTransactionCoordinatorStats() const {
+ return _singleTransactionCoordinatorStats;
+ }
+
+private:
+ /**
+ * Decrements the server-wide count of coordinators currently active in 'step'.
+ */
+ void _decrementLastStep(ServerTransactionCoordinatorsMetrics*, TransactionCoordinator::Step);
+
+ // Tracks metrics for a single commit coordination.
+ SingleTransactionCoordinatorStats _singleTransactionCoordinatorStats;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/s/transaction_coordinator_test.cpp b/src/mongo/db/s/transaction_coordinator_test.cpp
index 2ca40d1ed6b..77bd01d1173 100644
--- a/src/mongo/db/s/transaction_coordinator_test.cpp
+++ b/src/mongo/db/s/transaction_coordinator_test.cpp
@@ -34,9 +34,14 @@
#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/db/commands/txn_cmds_gen.h"
#include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h"
+#include "mongo/db/dbdirectclient.h"
#include "mongo/db/s/transaction_coordinator_document_gen.h"
+#include "mongo/db/s/transaction_coordinator_metrics_observer.h"
#include "mongo/db/s/transaction_coordinator_test_fixture.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/clock_source_mock.h"
#include "mongo/util/log.h"
+#include "mongo/util/tick_source_mock.h"
namespace mongo {
namespace {
@@ -44,6 +49,8 @@ namespace {
using PrepareResponse = txn::PrepareResponse;
using TransactionCoordinatorDocument = txn::TransactionCoordinatorDocument;
+// Upper bound passed to future.timed_get() when the tests below join the helper threads they
+// started with launchAsync().
+const Hours kLongFutureTimeout(8);
+
const StatusWith<BSONObj> kNoSuchTransaction =
BSON("ok" << 0 << "code" << ErrorCodes::NoSuchTransaction);
const StatusWith<BSONObj> kOk = BSON("ok" << 1);
@@ -97,6 +104,42 @@ protected:
ASSERT_FALSE(network()->hasReadyRequests());
}
+    /**
+     * Spins until a query against the transaction coordinators collection returns a non-empty
+     * document. Polls back-to-back, with no sleep or timeout.
+     */
+    void waitUntilCoordinatorDocIsPresent() {
+        DBDirectClient directClient(operationContext());
+        for (;;) {
+            const auto found = directClient.findOne(
+                NamespaceString::kTransactionCoordinatorsNamespace.ns(), Query());
+            if (!found.isEmpty()) {
+                return;
+            }
+        }
+    }
+
+    /**
+     * Spins until the coordinator document read from the transaction coordinators collection
+     * carries a decision.
+     *
+     * Precondition: A coordinator document exists with or without a decision.
+     */
+    void waitUntilCoordinatorDocHasDecision() {
+        DBDirectClient directClient(operationContext());
+        for (;;) {
+            const auto rawDoc = directClient.findOne(
+                NamespaceString::kTransactionCoordinatorsNamespace.ns(), Query());
+            const auto parsedDoc =
+                TransactionCoordinatorDocument::parse(IDLParserErrorContext("dummy"), rawDoc);
+            if (parsedDoc.getDecision()) {
+                return;
+            }
+        }
+    }
+
+    /**
+     * Spins until a query against the transaction coordinators collection comes back empty.
+     * Polls back-to-back, with no sleep or timeout.
+     */
+    void waitUntilNoCoordinatorDocIsPresent() {
+        DBDirectClient directClient(operationContext());
+        for (;;) {
+            const auto found = directClient.findOne(
+                NamespaceString::kTransactionCoordinatorsNamespace.ns(), Query());
+            if (found.isEmpty()) {
+                return;
+            }
+        }
+    }
+
+    /**
+     * Spins until the mock network reports at least one ready request. The network guard is
+     * taken and released on every poll.
+     */
+    void waitUntilMessageSent() {
+        bool requestIsReady = false;
+        while (!requestIsReady) {
+            executor::NetworkInterfaceMock::InNetworkGuard guard(network());
+            requestIsReady = network()->hasReadyRequests();
+        }
+    }
+
LogicalSessionId _lsid{makeLogicalSessionIdForTest()};
TxnNumber _txnNumber{1};
};
@@ -778,5 +821,982 @@ TEST_F(TransactionCoordinatorTest,
coordinator.onCompletion().get();
}
+class TransactionCoordinatorMetricsTest : public TransactionCoordinatorTestBase {
+public:
+    /**
+     * Runs the base fixture's setUp, then installs a mock precise clock source and a mock
+     * microsecond tick source (reset to 1) on the service context.
+     */
+    void setUp() override {
+        TransactionCoordinatorTestBase::setUp();
+
+        auto* const serviceContext = getServiceContext();
+        serviceContext->setPreciseClockSource(stdx::make_unique<ClockSourceMock>());
+
+        auto mockTicks = stdx::make_unique<TickSourceMock<Microseconds>>();
+        mockTicks->reset(1);
+        serviceContext->setTickSource(std::move(mockTicks));
+    }
+
+    /**
+     * Returns the server-wide transaction coordinator metrics attached to the service context.
+     */
+    ServerTransactionCoordinatorsMetrics* metrics() {
+        auto* const serviceContext = getServiceContext();
+        return ServerTransactionCoordinatorsMetrics::get(serviceContext);
+    }
+
+    /**
+     * Returns the service context's precise clock source downcast to ClockSourceMock (null if
+     * it is of a different type).
+     */
+    ClockSourceMock* clockSource() {
+        auto* const preciseClock = getServiceContext()->getPreciseClockSource();
+        return dynamic_cast<ClockSourceMock*>(preciseClock);
+    }
+
+    /**
+     * Returns the service context's tick source downcast to TickSourceMock<Microseconds> (null
+     * if it is of a different type).
+     */
+    TickSourceMock<Microseconds>* tickSource() {
+        auto* const ticks = getServiceContext()->getTickSource();
+        return dynamic_cast<TickSourceMock<Microseconds>*>(ticks);
+    }
+
+ // Expected per-coordinator stats for checkStats(). Every field is optional: checkStats() only
+ // verifies the fields that have been set.
+ struct Stats {
+ // Start times
+ boost::optional<Date_t> createTime;
+ boost::optional<Date_t> writingParticipantListStartTime;
+ boost::optional<Date_t> waitingForVotesStartTime;
+ boost::optional<Date_t> writingDecisionStartTime;
+ boost::optional<Date_t> waitingForDecisionAcksStartTime;
+ boost::optional<Date_t> deletingCoordinatorDocStartTime;
+ boost::optional<Date_t> endTime;
+
+ // Durations
+ boost::optional<Microseconds> totalDuration;
+ boost::optional<Microseconds> twoPhaseCommitDuration;
+ boost::optional<Microseconds> writingParticipantListDuration;
+ boost::optional<Microseconds> waitingForVotesDuration;
+ boost::optional<Microseconds> writingDecisionDuration;
+ boost::optional<Microseconds> waitingForDecisionAcksDuration;
+ boost::optional<Microseconds> deletingCoordinatorDocDuration;
+ };
+
+ // Asserts that 'stats' matches every field that is set in 'expected'; unset fields are not
+ // checked. Durations are read from 'stats' using the fixture's mock tick source and its
+ // current tick count.
+ // NOTE(review): start times other than createTime are compared with ASSERT(==) rather than
+ // ASSERT_EQ; the reason is not visible here.
+ void checkStats(const SingleTransactionCoordinatorStats& stats, const Stats& expected) {
+
+ // Start times
+
+ if (expected.createTime) {
+ ASSERT_EQ(*expected.createTime, stats.getCreateTime());
+ }
+
+ if (expected.writingParticipantListStartTime) {
+ ASSERT(*expected.writingParticipantListStartTime ==
+ stats.getWritingParticipantListStartTime());
+ }
+
+ if (expected.waitingForVotesStartTime) {
+ ASSERT(*expected.waitingForVotesStartTime == stats.getWaitingForVotesStartTime());
+ }
+
+ if (expected.writingDecisionStartTime) {
+ ASSERT(*expected.writingDecisionStartTime == stats.getWritingDecisionStartTime());
+ }
+
+ if (expected.waitingForDecisionAcksStartTime) {
+ ASSERT(*expected.waitingForDecisionAcksStartTime ==
+ stats.getWaitingForDecisionAcksStartTime());
+ }
+
+ if (expected.deletingCoordinatorDocStartTime) {
+ ASSERT(*expected.deletingCoordinatorDocStartTime ==
+ stats.getDeletingCoordinatorDocStartTime());
+ }
+
+ if (expected.endTime) {
+ ASSERT(*expected.endTime == stats.getEndTime());
+ }
+
+ // Durations
+
+ if (expected.totalDuration) {
+ ASSERT_EQ(*expected.totalDuration,
+ stats.getDurationSinceCreation(tickSource(), tickSource()->getTicks()));
+ }
+
+ if (expected.twoPhaseCommitDuration) {
+ ASSERT_EQ(*expected.twoPhaseCommitDuration,
+ stats.getTwoPhaseCommitDuration(tickSource(), tickSource()->getTicks()));
+ }
+
+ if (expected.writingParticipantListDuration) {
+ ASSERT_EQ(
+ *expected.writingParticipantListDuration,
+ stats.getWritingParticipantListDuration(tickSource(), tickSource()->getTicks()));
+ }
+
+ if (expected.waitingForVotesDuration) {
+ ASSERT_EQ(*expected.waitingForVotesDuration,
+ stats.getWaitingForVotesDuration(tickSource(), tickSource()->getTicks()));
+ }
+
+ if (expected.writingDecisionDuration) {
+ ASSERT_EQ(*expected.writingDecisionDuration,
+ stats.getWritingDecisionDuration(tickSource(), tickSource()->getTicks()));
+ }
+
+ if (expected.waitingForDecisionAcksDuration) {
+ ASSERT_EQ(
+ *expected.waitingForDecisionAcksDuration,
+ stats.getWaitingForDecisionAcksDuration(tickSource(), tickSource()->getTicks()));
+ }
+
+ if (expected.deletingCoordinatorDocDuration) {
+ ASSERT_EQ(
+ *expected.deletingCoordinatorDocDuration,
+ stats.getDeletingCoordinatorDocDuration(tickSource(), tickSource()->getTicks()));
+ }
+ }
+
+ // Expected server-wide coordinator metrics for checkMetrics(). All counters start at zero and,
+ // unlike Stats, every field is always checked.
+ struct Metrics {
+ // Totals
+ std::int64_t totalCreated{0};
+ std::int64_t totalStartedTwoPhaseCommit{0};
+
+ // Current in steps
+ std::int64_t currentWritingParticipantList{0};
+ std::int64_t currentWaitingForVotes{0};
+ std::int64_t currentWritingDecision{0};
+ std::int64_t currentWaitingForDecisionAcks{0};
+ std::int64_t currentDeletingCoordinatorDoc{0};
+ };
+
+ // Asserts that every server-wide counter returned by metrics() equals the corresponding field
+ // of 'expectedMetrics'.
+ void checkMetrics(const Metrics& expectedMetrics) {
+ // Totals
+ ASSERT_EQ(expectedMetrics.totalCreated, metrics()->getTotalCreated());
+ ASSERT_EQ(expectedMetrics.totalStartedTwoPhaseCommit,
+ metrics()->getTotalStartedTwoPhaseCommit());
+
+ // Current in steps
+ ASSERT_EQ(expectedMetrics.currentWritingParticipantList,
+ metrics()->getCurrentWritingParticipantList());
+ ASSERT_EQ(expectedMetrics.currentWaitingForVotes, metrics()->getCurrentWaitingForVotes());
+ ASSERT_EQ(expectedMetrics.currentWritingDecision, metrics()->getCurrentWritingDecision());
+ ASSERT_EQ(expectedMetrics.currentWaitingForDecisionAcks,
+ metrics()->getCurrentWaitingForDecisionAcks());
+ ASSERT_EQ(expectedMetrics.currentDeletingCoordinatorDoc,
+ metrics()->getCurrentDeletingCoordinatorDoc());
+ }
+
+    /**
+     * Moves the mock precise clock forward by one millisecond and returns the new mock time.
+     *
+     * The clock is advanced deterministically instead of being reset to the real wall clock.
+     * Date_t has millisecond resolution, so two back-to-back reads of the real clock can return
+     * the same value (or an earlier one if the system clock is adjusted). That would give
+     * several steps the same expected start time and let a start time recorded in the wrong
+     * field go unnoticed. Advancing the mock guarantees a distinct, strictly increasing
+     * timestamp per call.
+     */
+    Date_t advanceClockSourceAndReturnNewNow() {
+        auto* const mockClock = clockSource();
+        mockClock->advance(Milliseconds(1));
+        return mockClock->now();
+    }
+};
+
+// Drives a standalone metrics observer through every step of a two-phase commit and checks,
+// after each callback and after each simulated pause, that only the expected start time and
+// durations of the single-coordinator stats change.
+TEST_F(TransactionCoordinatorMetricsTest, SingleCoordinatorStatsSimpleTwoPhaseCommit) {
+    // Length of each simulated pause between observer callbacks.
+    const Microseconds kPause(100);
+
+    // Extends an expected duration that is already being tracked by one pause.
+    const auto extend = [&kPause](boost::optional<Microseconds>& duration) {
+        duration = *duration + kPause;
+    };
+
+    Stats expectedStats;
+    TransactionCoordinatorMetricsObserver coordinatorObserver;
+    const auto& stats = coordinatorObserver.getSingleTransactionCoordinatorStats();
+
+    checkStats(stats, expectedStats);
+
+    // onCreate: the create time is recorded and the total duration starts at zero.
+    expectedStats.createTime = advanceClockSourceAndReturnNewNow();
+    expectedStats.totalDuration = Microseconds(0);
+    coordinatorObserver.onCreate(metrics(), tickSource(), clockSource()->now());
+    checkStats(stats, expectedStats);
+
+    // While merely created, only the total duration grows with time.
+    tickSource()->advance(kPause);
+    extend(expectedStats.totalDuration);
+    checkStats(stats, expectedStats);
+
+    // onStartWritingParticipantList: two-phase commit and the first step both start.
+    expectedStats.writingParticipantListStartTime = advanceClockSourceAndReturnNewNow();
+    expectedStats.twoPhaseCommitDuration = Microseconds(0);
+    expectedStats.writingParticipantListDuration = Microseconds(0);
+    coordinatorObserver.onStartWritingParticipantList(
+        metrics(), tickSource(), clockSource()->now());
+    checkStats(stats, expectedStats);
+
+    // Total, two-phase commit and writing-participant-list durations grow.
+    tickSource()->advance(kPause);
+    extend(expectedStats.totalDuration);
+    extend(expectedStats.twoPhaseCommitDuration);
+    extend(expectedStats.writingParticipantListDuration);
+    checkStats(stats, expectedStats);
+
+    // onStartWaitingForVotes.
+    expectedStats.waitingForVotesStartTime = advanceClockSourceAndReturnNewNow();
+    expectedStats.waitingForVotesDuration = Microseconds(0);
+    coordinatorObserver.onStartWaitingForVotes(metrics(), tickSource(), clockSource()->now());
+    checkStats(stats, expectedStats);
+
+    // Only total, two-phase commit and waiting-for-votes durations grow.
+    tickSource()->advance(kPause);
+    extend(expectedStats.totalDuration);
+    extend(expectedStats.twoPhaseCommitDuration);
+    extend(expectedStats.waitingForVotesDuration);
+    checkStats(stats, expectedStats);
+
+    // onStartWritingDecision.
+    expectedStats.writingDecisionStartTime = advanceClockSourceAndReturnNewNow();
+    expectedStats.writingDecisionDuration = Microseconds(0);
+    coordinatorObserver.onStartWritingDecision(metrics(), tickSource(), clockSource()->now());
+    checkStats(stats, expectedStats);
+
+    // Only total, two-phase commit and writing-decision durations grow.
+    tickSource()->advance(kPause);
+    extend(expectedStats.totalDuration);
+    extend(expectedStats.twoPhaseCommitDuration);
+    extend(expectedStats.writingDecisionDuration);
+    checkStats(stats, expectedStats);
+
+    // onStartWaitingForDecisionAcks.
+    expectedStats.waitingForDecisionAcksStartTime = advanceClockSourceAndReturnNewNow();
+    expectedStats.waitingForDecisionAcksDuration = Microseconds(0);
+    coordinatorObserver.onStartWaitingForDecisionAcks(
+        metrics(), tickSource(), clockSource()->now());
+    checkStats(stats, expectedStats);
+
+    // Only total, two-phase commit and waiting-for-decision-acks durations grow.
+    tickSource()->advance(kPause);
+    extend(expectedStats.totalDuration);
+    extend(expectedStats.twoPhaseCommitDuration);
+    extend(expectedStats.waitingForDecisionAcksDuration);
+    checkStats(stats, expectedStats);
+
+    // onStartDeletingCoordinatorDoc.
+    expectedStats.deletingCoordinatorDocStartTime = advanceClockSourceAndReturnNewNow();
+    expectedStats.deletingCoordinatorDocDuration = Microseconds(0);
+    coordinatorObserver.onStartDeletingCoordinatorDoc(
+        metrics(), tickSource(), clockSource()->now());
+    checkStats(stats, expectedStats);
+
+    // Only total, two-phase commit and deleting-coordinator-doc durations grow.
+    tickSource()->advance(kPause);
+    extend(expectedStats.totalDuration);
+    extend(expectedStats.twoPhaseCommitDuration);
+    extend(expectedStats.deletingCoordinatorDocDuration);
+    checkStats(stats, expectedStats);
+
+    // onEnd: the end time is recorded.
+    expectedStats.endTime = advanceClockSourceAndReturnNewNow();
+    coordinatorObserver.onEnd(metrics(),
+                              tickSource(),
+                              clockSource()->now(),
+                              TransactionCoordinator::Step::kDeletingCoordinatorDoc);
+    checkStats(stats, expectedStats);
+
+    // After onEnd, the passage of time no longer changes any duration.
+    tickSource()->advance(kPause);
+    checkStats(stats, expectedStats);
+}
+
+// Drives a standalone metrics observer through every step of a two-phase commit and checks the
+// server-wide totals and per-step "current" counters after each callback.
+TEST_F(TransactionCoordinatorMetricsTest, ServerWideMetricsSimpleTwoPhaseCommit) {
+    TransactionCoordinatorMetricsObserver coordinatorObserver;
+    Metrics expectedMetrics;
+    checkMetrics(expectedMetrics);
+
+    // onCreate bumps the total number of coordinators created.
+    expectedMetrics.totalCreated += 1;
+    coordinatorObserver.onCreate(metrics(), tickSource(), clockSource()->now());
+    checkMetrics(expectedMetrics);
+
+    // onStartWritingParticipantList starts two-phase commit and enters the first step.
+    expectedMetrics.totalStartedTwoPhaseCommit += 1;
+    expectedMetrics.currentWritingParticipantList += 1;
+    coordinatorObserver.onStartWritingParticipantList(
+        metrics(), tickSource(), clockSource()->now());
+    checkMetrics(expectedMetrics);
+
+    // onStartWaitingForVotes leaves the previous step and enters waiting for votes.
+    expectedMetrics.currentWritingParticipantList -= 1;
+    expectedMetrics.currentWaitingForVotes += 1;
+    coordinatorObserver.onStartWaitingForVotes(metrics(), tickSource(), clockSource()->now());
+    checkMetrics(expectedMetrics);
+
+    // onStartWritingDecision leaves waiting for votes and enters writing the decision.
+    expectedMetrics.currentWaitingForVotes -= 1;
+    expectedMetrics.currentWritingDecision += 1;
+    coordinatorObserver.onStartWritingDecision(metrics(), tickSource(), clockSource()->now());
+    checkMetrics(expectedMetrics);
+
+    // onStartWaitingForDecisionAcks leaves writing the decision and enters waiting for acks.
+    expectedMetrics.currentWritingDecision -= 1;
+    expectedMetrics.currentWaitingForDecisionAcks += 1;
+    coordinatorObserver.onStartWaitingForDecisionAcks(
+        metrics(), tickSource(), clockSource()->now());
+    checkMetrics(expectedMetrics);
+
+    // onStartDeletingCoordinatorDoc leaves waiting for acks and enters deleting the doc.
+    expectedMetrics.currentWaitingForDecisionAcks -= 1;
+    expectedMetrics.currentDeletingCoordinatorDoc += 1;
+    coordinatorObserver.onStartDeletingCoordinatorDoc(
+        metrics(), tickSource(), clockSource()->now());
+    checkMetrics(expectedMetrics);
+
+    // onEnd leaves the last step the coordinator was in.
+    expectedMetrics.currentDeletingCoordinatorDoc -= 1;
+    coordinatorObserver.onEnd(metrics(),
+                              tickSource(),
+                              clockSource()->now(),
+                              TransactionCoordinator::Step::kDeletingCoordinatorDoc);
+    checkMetrics(expectedMetrics);
+}
+
+// Advances two standalone metrics observers through two-phase commit in lock step (observer 1
+// then observer 2 for each transition) and checks the server-wide metrics after every single
+// callback.
+TEST_F(TransactionCoordinatorMetricsTest, ServerWideMetricsSimpleTwoPhaseCommitTwoCoordinators) {
+    TransactionCoordinatorMetricsObserver coordinatorObserver1;
+    TransactionCoordinatorMetricsObserver coordinatorObserver2;
+    Metrics expectedMetrics;
+    checkMetrics(expectedMetrics);
+
+    // Visited in this order for every transition below.
+    TransactionCoordinatorMetricsObserver* const observers[] = {&coordinatorObserver1,
+                                                                &coordinatorObserver2};
+
+    for (auto* observer : observers) {
+        ++expectedMetrics.totalCreated;
+        observer->onCreate(metrics(), tickSource(), clockSource()->now());
+        checkMetrics(expectedMetrics);
+    }
+
+    for (auto* observer : observers) {
+        ++expectedMetrics.totalStartedTwoPhaseCommit;
+        ++expectedMetrics.currentWritingParticipantList;
+        observer->onStartWritingParticipantList(metrics(), tickSource(), clockSource()->now());
+        checkMetrics(expectedMetrics);
+    }
+
+    for (auto* observer : observers) {
+        --expectedMetrics.currentWritingParticipantList;
+        ++expectedMetrics.currentWaitingForVotes;
+        observer->onStartWaitingForVotes(metrics(), tickSource(), clockSource()->now());
+        checkMetrics(expectedMetrics);
+    }
+
+    for (auto* observer : observers) {
+        --expectedMetrics.currentWaitingForVotes;
+        ++expectedMetrics.currentWritingDecision;
+        observer->onStartWritingDecision(metrics(), tickSource(), clockSource()->now());
+        checkMetrics(expectedMetrics);
+    }
+
+    for (auto* observer : observers) {
+        --expectedMetrics.currentWritingDecision;
+        ++expectedMetrics.currentWaitingForDecisionAcks;
+        observer->onStartWaitingForDecisionAcks(metrics(), tickSource(), clockSource()->now());
+        checkMetrics(expectedMetrics);
+    }
+
+    for (auto* observer : observers) {
+        --expectedMetrics.currentWaitingForDecisionAcks;
+        ++expectedMetrics.currentDeletingCoordinatorDoc;
+        observer->onStartDeletingCoordinatorDoc(metrics(), tickSource(), clockSource()->now());
+        checkMetrics(expectedMetrics);
+    }
+
+    for (auto* observer : observers) {
+        --expectedMetrics.currentDeletingCoordinatorDoc;
+        observer->onEnd(metrics(),
+                        tickSource(),
+                        clockSource()->now(),
+                        TransactionCoordinator::Step::kDeletingCoordinatorDoc);
+        checkMetrics(expectedMetrics);
+    }
+}
+
+// Runs a real TransactionCoordinator through a complete two-phase commit across the shards in
+// kTwoShardIdList, using failpoints to hold it in each step, and checks both the coordinator's
+// own stats and the server-wide metrics after every transition.
+//
+// Each section follows the same pattern: first advance the mock clock and tick source and record
+// the expected values, then release or trigger the coordinator, then wait for an externally
+// visible sign that the step was reached (coordinator document state or an outgoing request)
+// before asserting.
+TEST_F(TransactionCoordinatorMetricsTest, SimpleTwoPhaseCommitRealCoordinator) {
+ Stats expectedStats;
+ Metrics expectedMetrics;
+
+ checkMetrics(expectedMetrics);
+
+ log() << "Create the coordinator.";
+
+ expectedStats.createTime = advanceClockSourceAndReturnNewNow();
+ expectedStats.totalDuration = Microseconds(0);
+ expectedMetrics.totalCreated++;
+
+ TransactionCoordinator coordinator(
+ getServiceContext(),
+ _lsid,
+ _txnNumber,
+ std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
+ Date_t::max());
+ const auto& stats =
+ coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats();
+
+ checkStats(stats, expectedStats);
+ checkMetrics(expectedMetrics);
+
+ log() << "Start two-phase commit (allow the coordinator to progress to writing the participant "
+ "list).";
+
+ expectedStats.writingParticipantListStartTime = advanceClockSourceAndReturnNewNow();
+ tickSource()->advance(Microseconds(100));
+ expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100);
+ expectedStats.twoPhaseCommitDuration = Microseconds(0);
+ expectedStats.writingParticipantListDuration = Microseconds(0);
+ expectedMetrics.totalStartedTwoPhaseCommit++;
+ expectedMetrics.currentWritingParticipantList++;
+
+ // Hold the coordinator in the writing-participant-list step until the failpoint is turned off.
+ setGlobalFailPoint("hangBeforeWaitingForParticipantListWriteConcern",
+ BSON("mode"
+ << "alwaysOn"
+ << "data"
+ << BSON("useUninterruptibleSleep" << 1)));
+ coordinator.runCommit(kTwoShardIdList);
+ waitUntilCoordinatorDocIsPresent();
+
+ checkStats(stats, expectedStats);
+ checkMetrics(expectedMetrics);
+
+ log() << "Allow the coordinator to progress to waiting for votes.";
+
+ expectedStats.waitingForVotesStartTime = advanceClockSourceAndReturnNewNow();
+ tickSource()->advance(Microseconds(100));
+ expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100);
+ expectedStats.twoPhaseCommitDuration =
+ *expectedStats.twoPhaseCommitDuration + Microseconds(100);
+ expectedStats.writingParticipantListDuration =
+ *expectedStats.writingParticipantListDuration + Microseconds(100);
+ expectedStats.waitingForVotesDuration = Microseconds(0);
+ expectedMetrics.currentWritingParticipantList--;
+ expectedMetrics.currentWaitingForVotes++;
+
+ setGlobalFailPoint("hangBeforeWaitingForParticipantListWriteConcern",
+ BSON("mode"
+ << "off"));
+ // An outgoing request on the mock network is the signal that the step has been reached.
+ waitUntilMessageSent();
+
+ checkStats(stats, expectedStats);
+ checkMetrics(expectedMetrics);
+
+ log() << "Allow the coordinator to progress to writing the decision.";
+
+ expectedStats.writingDecisionStartTime = advanceClockSourceAndReturnNewNow();
+ tickSource()->advance(Microseconds(100));
+ expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100);
+ expectedStats.twoPhaseCommitDuration =
+ *expectedStats.twoPhaseCommitDuration + Microseconds(100);
+ expectedStats.waitingForVotesDuration =
+ *expectedStats.waitingForVotesDuration + Microseconds(100);
+ expectedStats.writingDecisionDuration = Microseconds(0);
+ expectedMetrics.currentWaitingForVotes--;
+ expectedMetrics.currentWritingDecision++;
+
+ setGlobalFailPoint("hangBeforeWaitingForDecisionWriteConcern",
+ BSON("mode"
+ << "alwaysOn"
+ << "data"
+ << BSON("useUninterruptibleSleep" << 1)));
+ // Respond to the second prepare request in a separate thread, because the coordinator will
+ // hijack that thread to run its continuation.
+ assertPrepareSentAndRespondWithSuccess();
+ auto future = launchAsync([this] { assertPrepareSentAndRespondWithSuccess(); });
+ waitUntilCoordinatorDocHasDecision();
+
+ checkStats(stats, expectedStats);
+ checkMetrics(expectedMetrics);
+
+ log() << "Allow the coordinator to progress to waiting for acks.";
+
+ expectedStats.waitingForDecisionAcksStartTime = advanceClockSourceAndReturnNewNow();
+ tickSource()->advance(Microseconds(100));
+ expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100);
+ expectedStats.twoPhaseCommitDuration =
+ *expectedStats.twoPhaseCommitDuration + Microseconds(100);
+ expectedStats.writingDecisionDuration =
+ *expectedStats.writingDecisionDuration + Microseconds(100);
+ expectedStats.waitingForDecisionAcksDuration = Microseconds(0);
+ expectedMetrics.currentWritingDecision--;
+ expectedMetrics.currentWaitingForDecisionAcks++;
+
+ setGlobalFailPoint("hangBeforeWaitingForDecisionWriteConcern",
+ BSON("mode"
+ << "off"));
+ // The last thing the coordinator will do on the hijacked prepare response thread is schedule
+ // the commitTransaction network requests.
+ future.timed_get(kLongFutureTimeout);
+ waitUntilMessageSent();
+
+ checkStats(stats, expectedStats);
+ checkMetrics(expectedMetrics);
+
+ log() << "Allow the coordinator to progress to deleting the coordinator doc.";
+
+ expectedStats.deletingCoordinatorDocStartTime = advanceClockSourceAndReturnNewNow();
+ tickSource()->advance(Microseconds(100));
+ expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100);
+ expectedStats.twoPhaseCommitDuration =
+ *expectedStats.twoPhaseCommitDuration + Microseconds(100);
+ expectedStats.waitingForDecisionAcksDuration =
+ *expectedStats.waitingForDecisionAcksDuration + Microseconds(100);
+ expectedStats.deletingCoordinatorDocDuration = Microseconds(0);
+ expectedMetrics.currentWaitingForDecisionAcks--;
+ expectedMetrics.currentDeletingCoordinatorDoc++;
+
+ setGlobalFailPoint("hangAfterDeletingCoordinatorDoc",
+ BSON("mode"
+ << "alwaysOn"
+ << "data"
+ << BSON("useUninterruptibleSleep" << 1)));
+ // Respond to the second commit request in a separate thread, because the coordinator will
+ // hijack that thread to run its continuation.
+ assertCommitSentAndRespondWithSuccess();
+ future = launchAsync([this] { assertCommitSentAndRespondWithSuccess(); });
+ waitUntilNoCoordinatorDocIsPresent();
+
+ checkStats(stats, expectedStats);
+ checkMetrics(expectedMetrics);
+
+ log() << "Allow the coordinator to complete.";
+
+ expectedStats.endTime = advanceClockSourceAndReturnNewNow();
+ tickSource()->advance(Microseconds(100));
+ expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100);
+ expectedStats.twoPhaseCommitDuration =
+ *expectedStats.twoPhaseCommitDuration + Microseconds(100);
+ expectedStats.deletingCoordinatorDocDuration =
+ *expectedStats.deletingCoordinatorDocDuration + Microseconds(100);
+ expectedMetrics.currentDeletingCoordinatorDoc--;
+
+ setGlobalFailPoint("hangAfterDeletingCoordinatorDoc",
+ BSON("mode"
+ << "off"));
+ // The last thing the coordinator will do on the hijacked commit response thread is signal the
+ // coordinator's completion.
+ future.timed_get(kLongFutureTimeout);
+ coordinator.onCompletion().get();
+
+ checkStats(stats, expectedStats);
+ checkMetrics(expectedMetrics);
+}
+
+// Creates a real coordinator, cancels it before two-phase commit starts, and checks that the end
+// time and total duration are recorded while no per-step server-wide counter changes.
+TEST_F(TransactionCoordinatorMetricsTest, CoordinatorIsCanceledWhileInactive) {
+    Stats expectedStats;
+    Metrics expectedMetrics;
+
+    checkMetrics(expectedMetrics);
+
+    // Creation records the create time and counts the coordinator as created.
+    expectedStats.createTime = advanceClockSourceAndReturnNewNow();
+    expectedStats.totalDuration = Microseconds(0);
+    expectedMetrics.totalCreated++;
+
+    auto scheduler = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext());
+    TransactionCoordinator coordinator(
+        getServiceContext(), _lsid, _txnNumber, std::move(scheduler), Date_t::max());
+    const auto& observer = coordinator.getMetricsObserverForTest();
+    const auto& stats = observer.getSingleTransactionCoordinatorStats();
+
+    checkStats(stats, expectedStats);
+    checkMetrics(expectedMetrics);
+
+    // Cancellation ends the coordinator: the end time is set and the total duration covers the
+    // simulated pause.
+    expectedStats.endTime = advanceClockSourceAndReturnNewNow();
+    tickSource()->advance(Microseconds(100));
+    expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100);
+
+    coordinator.cancelIfCommitNotYetStarted();
+    coordinator.onCompletion().get();
+
+    checkStats(stats, expectedStats);
+    checkMetrics(expectedMetrics);
+}
+
+// Creates a real coordinator, shuts down its AsyncWorkScheduler before two-phase commit starts,
+// and checks that the end time and total duration are recorded while no per-step server-wide
+// counter changes.
+TEST_F(TransactionCoordinatorMetricsTest, CoordinatorsAWSIsShutDownWhileCoordinatorIsInactive) {
+    Stats expectedStats;
+    Metrics expectedMetrics;
+
+    checkMetrics(expectedMetrics);
+
+    // Creation records the create time and counts the coordinator as created.
+    expectedStats.createTime = advanceClockSourceAndReturnNewNow();
+    expectedStats.totalDuration = Microseconds(0);
+    expectedMetrics.totalCreated++;
+
+    // Keep a non-owning handle so the scheduler can be shut down after ownership has moved into
+    // the coordinator.
+    auto scheduler = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext());
+    auto schedulerHandle = scheduler.get();
+    TransactionCoordinator coordinator(
+        getServiceContext(), _lsid, _txnNumber, std::move(scheduler), Date_t::max());
+    const auto& observer = coordinator.getMetricsObserverForTest();
+    const auto& stats = observer.getSingleTransactionCoordinatorStats();
+
+    checkStats(stats, expectedStats);
+    checkMetrics(expectedMetrics);
+
+    // Shutting down the scheduler ends the coordinator.
+    expectedStats.endTime = advanceClockSourceAndReturnNewNow();
+    tickSource()->advance(Microseconds(100));
+    expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100);
+
+    schedulerHandle->shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "dummy"});
+    coordinator.onCompletion().get();
+
+    checkStats(stats, expectedStats);
+    checkMetrics(expectedMetrics);
+}
+
+// Shuts down a real coordinator's AsyncWorkScheduler while a failpoint holds the coordinator in
+// the writing-participant-list step, and checks that the step's duration stops and that the
+// server-wide "current writing participant list" count returns to zero.
+//
+// This test sets neither expectedStats.endTime nor expectedStats.twoPhaseCommitDuration, so
+// checkStats() does not verify those two values here.
+// NOTE(review): the failpoint is only switched off on the last line; if an earlier assertion
+// aborts the test it presumably stays enabled for later tests -- confirm.
+TEST_F(TransactionCoordinatorMetricsTest,
+ CoordinatorsAWSIsShutDownWhileCoordinatorIsWritingParticipantList) {
+ Stats expectedStats;
+ Metrics expectedMetrics;
+
+ checkMetrics(expectedMetrics);
+
+ // Create the coordinator.
+
+ expectedStats.createTime = advanceClockSourceAndReturnNewNow();
+ expectedStats.totalDuration = Microseconds(0);
+ expectedMetrics.totalCreated++;
+
+ auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext());
+ auto awsPtr = aws.get();
+ TransactionCoordinator coordinator(
+ getServiceContext(), _lsid, _txnNumber, std::move(aws), Date_t::max());
+ const auto& stats =
+ coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats();
+
+ checkStats(stats, expectedStats);
+ checkMetrics(expectedMetrics);
+
+ // Wait until the coordinator is writing the participant list.
+
+ expectedStats.writingParticipantListStartTime = advanceClockSourceAndReturnNewNow();
+ tickSource()->advance(Microseconds(100));
+ expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100);
+ expectedStats.writingParticipantListDuration = Microseconds(0);
+ expectedMetrics.totalStartedTwoPhaseCommit++;
+ expectedMetrics.currentWritingParticipantList++;
+
+ setGlobalFailPoint("hangBeforeWaitingForParticipantListWriteConcern",
+ BSON("mode"
+ << "alwaysOn"));
+ coordinator.runCommit(kTwoShardIdList);
+ waitUntilCoordinatorDocIsPresent();
+
+ checkStats(stats, expectedStats);
+ checkMetrics(expectedMetrics);
+
+ // Shut down the coordinator's AWS.
+
+ tickSource()->advance(Microseconds(100));
+ expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100);
+ expectedStats.writingParticipantListDuration =
+ *expectedStats.writingParticipantListDuration + Microseconds(100);
+ expectedMetrics.currentWritingParticipantList--;
+
+ awsPtr->shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "dummy"});
+ coordinator.onCompletion().get();
+
+ checkStats(stats, expectedStats);
+ checkMetrics(expectedMetrics);
+
+ // Clear the failpoint before the next test.
+ setGlobalFailPoint("hangBeforeWaitingForParticipantListWriteConcern",
+ BSON("mode"
+ << "off"));
+}
+
+// Shuts down a real coordinator's AsyncWorkScheduler while the coordinator is waiting for votes
+// (no failpoint is used; an outgoing request on the mock network marks the step), and checks
+// that the step's duration stops and that the server-wide "current waiting for votes" count
+// returns to zero.
+//
+// This test sets neither expectedStats.endTime nor expectedStats.twoPhaseCommitDuration, so
+// checkStats() does not verify those two values here.
+TEST_F(TransactionCoordinatorMetricsTest,
+ CoordinatorsAWSIsShutDownWhileCoordinatorIsWaitingForVotes) {
+ Stats expectedStats;
+ Metrics expectedMetrics;
+
+ checkMetrics(expectedMetrics);
+
+ // Create the coordinator.
+
+ expectedStats.createTime = advanceClockSourceAndReturnNewNow();
+ expectedStats.totalDuration = Microseconds(0);
+ expectedMetrics.totalCreated++;
+
+ auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext());
+ auto awsPtr = aws.get();
+ TransactionCoordinator coordinator(
+ getServiceContext(), _lsid, _txnNumber, std::move(aws), Date_t::max());
+ const auto& stats =
+ coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats();
+
+ checkStats(stats, expectedStats);
+ checkMetrics(expectedMetrics);
+
+ // Wait until the coordinator is waiting for votes.
+
+ expectedStats.waitingForVotesStartTime = advanceClockSourceAndReturnNewNow();
+ tickSource()->advance(Microseconds(100));
+ expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100);
+ expectedStats.waitingForVotesDuration = Microseconds(0);
+ expectedMetrics.totalStartedTwoPhaseCommit++;
+ expectedMetrics.currentWaitingForVotes++;
+
+ coordinator.runCommit(kTwoShardIdList);
+ waitUntilMessageSent();
+
+ checkStats(stats, expectedStats);
+ checkMetrics(expectedMetrics);
+
+ // Shut down the coordinator's AWS.
+
+ tickSource()->advance(Microseconds(100));
+ expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100);
+ expectedStats.waitingForVotesDuration =
+ *expectedStats.waitingForVotesDuration + Microseconds(100);
+ expectedMetrics.currentWaitingForVotes--;
+
+ awsPtr->shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "dummy"});
+ // NOTE(review): presumably needed so the mock network processes the outstanding requests
+ // after the shutdown and the coordinator can complete -- confirm.
+ network()->enterNetwork();
+ network()->runReadyNetworkOperations();
+ network()->exitNetwork();
+ coordinator.onCompletion().get();
+
+ checkStats(stats, expectedStats);
+ checkMetrics(expectedMetrics);
+}
+
+// Shuts down a real coordinator's AsyncWorkScheduler while a failpoint holds the coordinator in
+// the writing-decision step, and checks that the step's duration stops and that the server-wide
+// "current writing decision" count returns to zero.
+//
+// This test sets neither expectedStats.endTime nor expectedStats.twoPhaseCommitDuration, so
+// checkStats() does not verify those two values here.
+// NOTE(review): the failpoint is only switched off on the last line; if an earlier assertion
+// aborts the test it presumably stays enabled for later tests -- confirm.
+TEST_F(TransactionCoordinatorMetricsTest,
+ CoordinatorsAWSIsShutDownWhileCoordinatorIsWritingDecision) {
+ Stats expectedStats;
+ Metrics expectedMetrics;
+
+ checkMetrics(expectedMetrics);
+
+ // Create the coordinator.
+
+ expectedStats.createTime = advanceClockSourceAndReturnNewNow();
+ expectedStats.totalDuration = Microseconds(0);
+ expectedMetrics.totalCreated++;
+
+ auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext());
+ auto awsPtr = aws.get();
+ TransactionCoordinator coordinator(
+ getServiceContext(), _lsid, _txnNumber, std::move(aws), Date_t::max());
+ const auto& stats =
+ coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats();
+
+ checkStats(stats, expectedStats);
+ checkMetrics(expectedMetrics);
+
+ // Wait until the coordinator is writing the decision.
+
+ expectedStats.writingDecisionStartTime = advanceClockSourceAndReturnNewNow();
+ tickSource()->advance(Microseconds(100));
+ expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100);
+ expectedStats.writingDecisionDuration = Microseconds(0);
+ expectedMetrics.totalStartedTwoPhaseCommit++;
+ expectedMetrics.currentWritingDecision++;
+
+ setGlobalFailPoint("hangBeforeWaitingForDecisionWriteConcern",
+ BSON("mode"
+ << "alwaysOn"));
+ coordinator.runCommit(kTwoShardIdList);
+ // Respond to the second prepare request in a separate thread, because the coordinator will
+ // hijack that thread to run its continuation.
+ assertPrepareSentAndRespondWithSuccess();
+ // NOTE(review): unlike in SimpleTwoPhaseCommitRealCoordinator, 'future' is never explicitly
+ // waited on in this test.
+ auto future = launchAsync([this] { assertPrepareSentAndRespondWithSuccess(); });
+ waitUntilCoordinatorDocHasDecision();
+
+ checkStats(stats, expectedStats);
+ checkMetrics(expectedMetrics);
+
+ // Shut down the coordinator's AWS.
+
+ tickSource()->advance(Microseconds(100));
+ expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100);
+ expectedStats.writingDecisionDuration =
+ *expectedStats.writingDecisionDuration + Microseconds(100);
+ expectedMetrics.currentWritingDecision--;
+
+ awsPtr->shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "dummy"});
+ coordinator.onCompletion().get();
+
+ checkStats(stats, expectedStats);
+ checkMetrics(expectedMetrics);
+
+ // Clear the failpoint before the next test.
+ setGlobalFailPoint("hangBeforeWaitingForDecisionWriteConcern",
+ BSON("mode"
+ << "off"));
+}
+
+TEST_F(TransactionCoordinatorMetricsTest,
+ CoordinatorsAWSIsShutDownWhileCoordinatorIsWaitingForDecisionAcks) {
+ Stats expectedStats;
+ Metrics expectedMetrics;
+ // Scenario: the coordinator's AWS is shut down while the coordinator waits for decision acks.
+ checkMetrics(expectedMetrics);
+
+ // Create the coordinator; expect createTime to be set, totalDuration == 0 and totalCreated + 1.
+
+ expectedStats.createTime = advanceClockSourceAndReturnNewNow();
+ expectedStats.totalDuration = Microseconds(0);
+ expectedMetrics.totalCreated++;
+
+ auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext());
+ auto awsPtr = aws.get();  // Keep a raw handle: aws itself is moved into the coordinator below.
+ TransactionCoordinator coordinator(
+ getServiceContext(), _lsid, _txnNumber, std::move(aws), Date_t::max());
+ const auto& stats =
+ coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats();
+
+ checkStats(stats, expectedStats);
+ checkMetrics(expectedMetrics);
+
+ // Wait until the coordinator is waiting for decision acks (this test never answers the commits).
+
+ expectedStats.waitingForDecisionAcksStartTime = advanceClockSourceAndReturnNewNow();
+ tickSource()->advance(Microseconds(100));
+ expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100);
+ expectedStats.waitingForDecisionAcksDuration = Microseconds(0);
+ expectedMetrics.totalStartedTwoPhaseCommit++;
+ expectedMetrics.currentWaitingForDecisionAcks++;
+
+ coordinator.runCommit(kTwoShardIdList);
+ // Respond to the second prepare request in a separate thread, because the coordinator will
+ // hijack that thread to run its continuation.
+ assertPrepareSentAndRespondWithSuccess();
+ auto future = launchAsync([this] { assertPrepareSentAndRespondWithSuccess(); });
+ // The last thing the coordinator will do on the hijacked prepare response thread is schedule
+ // the commitTransaction network requests.
+ future.timed_get(kLongFutureTimeout);
+ waitUntilMessageSent();
+
+ checkStats(stats, expectedStats);
+ checkMetrics(expectedMetrics);
+
+ // Shut down the coordinator's AWS; the extra 100us is expected in the decision-acks duration.
+
+ tickSource()->advance(Microseconds(100));
+ expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100);
+ expectedStats.waitingForDecisionAcksDuration =
+ *expectedStats.waitingForDecisionAcksDuration + Microseconds(100);
+ expectedMetrics.currentWaitingForDecisionAcks--;
+
+ awsPtr->shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "dummy"});
+ network()->enterNetwork();  // NOTE(review): presumably lets the pending requests fail - confirm.
+ network()->runReadyNetworkOperations();
+ network()->exitNetwork();
+ coordinator.onCompletion().get();
+
+ checkStats(stats, expectedStats);
+ checkMetrics(expectedMetrics);
+}
+
+TEST_F(TransactionCoordinatorMetricsTest, CoordinatorsAWSIsShutDownWhileCoordinatorIsDeletingDoc) {
+ Stats expectedStats;
+ Metrics expectedMetrics;
+ // Scenario: the coordinator's AWS is shut down while the coordinator is deleting its document.
+ checkMetrics(expectedMetrics);
+
+ // Create the coordinator; expect createTime to be set, totalDuration == 0 and totalCreated + 1.
+
+ expectedStats.createTime = advanceClockSourceAndReturnNewNow();
+ expectedStats.totalDuration = Microseconds(0);
+ expectedMetrics.totalCreated++;
+
+ auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext());
+ auto awsPtr = aws.get();  // Keep a raw handle: aws itself is moved into the coordinator below.
+ TransactionCoordinator coordinator(
+ getServiceContext(), _lsid, _txnNumber, std::move(aws), Date_t::max());
+ const auto& stats =
+ coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats();
+
+ checkStats(stats, expectedStats);
+ checkMetrics(expectedMetrics);
+
+ // Wait until the coordinator is deleting the coordinator doc (held there by the failpoint below).
+
+ expectedStats.deletingCoordinatorDocStartTime = advanceClockSourceAndReturnNewNow();
+ tickSource()->advance(Microseconds(100));
+ expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100);
+ expectedStats.deletingCoordinatorDocDuration = Microseconds(0);
+ expectedMetrics.totalStartedTwoPhaseCommit++;
+ expectedMetrics.currentDeletingCoordinatorDoc++;
+
+ setGlobalFailPoint("hangAfterDeletingCoordinatorDoc",
+ BSON("mode"
+ << "alwaysOn"));
+ coordinator.runCommit(kTwoShardIdList);
+ // Respond to the second prepare request in a separate thread, because the coordinator will
+ // hijack that thread to run its continuation.
+ assertPrepareSentAndRespondWithSuccess();
+ auto future = launchAsync([this] { assertPrepareSentAndRespondWithSuccess(); });
+ // The last thing the coordinator will do on the hijacked prepare response thread is schedule
+ // the commitTransaction network requests.
+ future.timed_get(kLongFutureTimeout);
+ waitUntilMessageSent();
+ // Respond to the second commit request in a separate thread, because the coordinator will
+ // hijack that thread to run its continuation.
+ assertCommitSentAndRespondWithSuccess();
+ future = launchAsync([this] { assertCommitSentAndRespondWithSuccess(); });
+ waitUntilNoCoordinatorDocIsPresent();
+
+ checkStats(stats, expectedStats);
+ checkMetrics(expectedMetrics);
+
+ // Shut down the coordinator's AWS; the extra 100us is expected in the deleting-doc duration.
+
+ tickSource()->advance(Microseconds(100));
+ expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100);
+ expectedStats.deletingCoordinatorDocDuration =
+ *expectedStats.deletingCoordinatorDocDuration + Microseconds(100);
+ expectedMetrics.currentDeletingCoordinatorDoc--;
+
+ awsPtr->shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "dummy"});
+ // The last thing the coordinator will do on the hijacked commit response thread is signal the
+ // coordinator's completion.
+ future.timed_get(kLongFutureTimeout);
+ coordinator.onCompletion().get();
+
+ checkStats(stats, expectedStats);
+ checkMetrics(expectedMetrics);
+ // NOTE(review): the reset below is presumably skipped if a check above aborts the test - confirm.
+ // Clear the failpoint before the next test.
+ setGlobalFailPoint("hangAfterDeletingCoordinatorDoc",
+ BSON("mode"
+ << "off"));
+}
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/s/transaction_coordinator_util.cpp b/src/mongo/db/s/transaction_coordinator_util.cpp
index 27c21fc807d..05b9ff6437b 100644
--- a/src/mongo/db/s/transaction_coordinator_util.cpp
+++ b/src/mongo/db/s/transaction_coordinator_util.cpp
@@ -50,6 +50,7 @@ namespace {
MONGO_FAIL_POINT_DEFINE(hangBeforeWaitingForParticipantListWriteConcern);
MONGO_FAIL_POINT_DEFINE(hangBeforeWaitingForDecisionWriteConcern);
+MONGO_FAIL_POINT_DEFINE(hangAfterDeletingCoordinatorDoc);  // Hit after the coordinator doc delete.
MONGO_FAIL_POINT_DEFINE(hangBeforeWritingParticipantList);
MONGO_FAIL_POINT_DEFINE(hangBeforeWritingDecision);
@@ -172,10 +173,15 @@ void persistParticipantListBlocking(OperationContext* opCtx,
LOG(3) << "Wrote participant list for " << lsid.getId() << ':' << txnNumber;
- if (MONGO_FAIL_POINT(hangBeforeWaitingForParticipantListWriteConcern)) {
+ MONGO_FAIL_POINT_BLOCK(hangBeforeWaitingForParticipantListWriteConcern, fp) {
LOG(0) << "Hit hangBeforeWaitingForParticipantListWriteConcern failpoint";
- MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(
- opCtx, hangBeforeWaitingForParticipantListWriteConcern);
+ const BSONObj& data = fp.getData();
+ if (!data["useUninterruptibleSleep"].eoo()) {
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangBeforeWaitingForParticipantListWriteConcern);
+ } else {
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(
+ opCtx, hangBeforeWaitingForParticipantListWriteConcern);
+ }
}
WriteConcernResult unusedWCResult;
@@ -373,10 +379,15 @@ void persistDecisionBlocking(OperationContext* opCtx,
LOG(3) << "Wrote decision " << (commitTimestamp ? "commit" : "abort") << " for " << lsid.getId()
<< ':' << txnNumber;
- if (MONGO_FAIL_POINT(hangBeforeWaitingForDecisionWriteConcern)) {
+ MONGO_FAIL_POINT_BLOCK(hangBeforeWaitingForDecisionWriteConcern, fp) {
LOG(0) << "Hit hangBeforeWaitingForDecisionWriteConcern failpoint";
- MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(opCtx,
- hangBeforeWaitingForDecisionWriteConcern);
+ const BSONObj& data = fp.getData();
+ if (!data["useUninterruptibleSleep"].eoo()) {
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangBeforeWaitingForDecisionWriteConcern);
+ } else {
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(
+ opCtx, hangBeforeWaitingForDecisionWriteConcern);
+ }
}
WriteConcernResult unusedWCResult;
@@ -505,6 +516,16 @@ void deleteCoordinatorDocBlocking(OperationContext* opCtx,
}
LOG(3) << "Deleted coordinator doc for " << lsid.getId() << ':' << txnNumber;
+
+ MONGO_FAIL_POINT_BLOCK(hangAfterDeletingCoordinatorDoc, fp) {
+ LOG(0) << "Hit hangAfterDeletingCoordinatorDoc failpoint";
+ const BSONObj& data = fp.getData();
+ if (!data["useUninterruptibleSleep"].eoo()) {
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangAfterDeletingCoordinatorDoc);
+ } else {
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(opCtx, hangAfterDeletingCoordinatorDoc);
+ }
+ }
}
} // namespace