summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorLamont Nelson <lamont.nelson@mongodb.com>2019-09-05 18:32:19 +0000
committerevergreen <evergreen@mongodb.com>2019-09-05 18:32:19 +0000
commitb46eb782c4730c8df589a56550c29c3f3585904a (patch)
tree908a3dffce58c449a9d241f193c99e1e6ec28bee /src/mongo
parent174736c49e4a284d12c2d31a3f9c8bf341a35c65 (diff)
downloadmongo-b46eb782c4730c8df589a56550c29c3f3585904a.tar.gz
SERVER-42809 report metrics for the transaction coordinators in curop command
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/pipeline/mongo_process_common.cpp6
-rw-r--r--src/mongo/db/pipeline/mongo_process_common.h9
-rw-r--r--src/mongo/db/pipeline/mongos_process_interface.cpp3
-rw-r--r--src/mongo/db/pipeline/mongos_process_interface.h4
-rw-r--r--src/mongo/db/pipeline/process_interface_standalone.cpp8
-rw-r--r--src/mongo/db/pipeline/process_interface_standalone.h3
-rw-r--r--src/mongo/db/s/SConscript2
-rw-r--r--src/mongo/db/s/single_transaction_coordinator_stats.cpp56
-rw-r--r--src/mongo/db/s/single_transaction_coordinator_stats.h46
-rw-r--r--src/mongo/db/s/transaction_coordinator.cpp74
-rw-r--r--src/mongo/db/s/transaction_coordinator.h19
-rw-r--r--src/mongo/db/s/transaction_coordinator_catalog.cpp16
-rw-r--r--src/mongo/db/s/transaction_coordinator_catalog.h11
-rw-r--r--src/mongo/db/s/transaction_coordinator_catalog_test.cpp4
-rw-r--r--src/mongo/db/s/transaction_coordinator_curop.cpp33
-rw-r--r--src/mongo/db/s/transaction_coordinator_curop.h36
-rw-r--r--src/mongo/db/s/transaction_coordinator_curop_mongod.cpp38
-rw-r--r--src/mongo/db/s/transaction_coordinator_metrics_observer.cpp3
-rw-r--r--src/mongo/db/s/transaction_coordinator_metrics_observer.h6
-rw-r--r--src/mongo/db/s/transaction_coordinator_service.cpp54
-rw-r--r--src/mongo/db/s/transaction_coordinator_service.h6
-rw-r--r--src/mongo/db/s/transaction_coordinator_test.cpp146
-rw-r--r--src/mongo/db/s/transaction_coordinator_test_fixture.cpp18
-rw-r--r--src/mongo/db/s/transaction_coordinator_test_fixture.h7
-rw-r--r--src/mongo/embedded/SConscript1
-rw-r--r--src/mongo/embedded/transaction_coordinator_curop_embedded.cpp34
26 files changed, 579 insertions, 64 deletions
diff --git a/src/mongo/db/pipeline/mongo_process_common.cpp b/src/mongo/db/pipeline/mongo_process_common.cpp
index 5cbd1501280..1c3a9166cfa 100644
--- a/src/mongo/db/pipeline/mongo_process_common.cpp
+++ b/src/mongo/db/pipeline/mongo_process_common.cpp
@@ -111,6 +111,12 @@ std::vector<BSONObj> MongoProcessCommon::getCurrentOps(
_reportCurrentOpsForIdleSessions(opCtx, userMode, &ops);
}
+ if (!ctxAuth->getAuthorizationManager().isAuthEnabled() ||
+ userMode == CurrentOpUserMode::kIncludeAll) {
+ _reportCurrentOpsForTransactionCoordinators(
+ opCtx, sessionMode == MongoProcessInterface::CurrentOpSessionsMode::kIncludeIdle, &ops);
+ }
+
return ops;
}
diff --git a/src/mongo/db/pipeline/mongo_process_common.h b/src/mongo/db/pipeline/mongo_process_common.h
index 3ba69ba20a2..ff32827cba1 100644
--- a/src/mongo/db/pipeline/mongo_process_common.h
+++ b/src/mongo/db/pipeline/mongo_process_common.h
@@ -93,6 +93,15 @@ protected:
CurrentOpUserMode userMode,
std::vector<BSONObj>* ops) const = 0;
+
+ /**
+ * Report information about transaction coordinators by iterating through all
+ * TransactionCoordinators in the TransactionCoordinatorCatalog.
+ */
+ virtual void _reportCurrentOpsForTransactionCoordinators(OperationContext* opCtx,
+ bool includeIdle,
+ std::vector<BSONObj>* ops) const = 0;
+
/**
* Converts an array of field names into a set of FieldPath. Throws if 'fields' contains
* duplicate elements.
diff --git a/src/mongo/db/pipeline/mongos_process_interface.cpp b/src/mongo/db/pipeline/mongos_process_interface.cpp
index 4e121045c23..cec0279587e 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.cpp
+++ b/src/mongo/db/pipeline/mongos_process_interface.cpp
@@ -289,6 +289,9 @@ void MongoSInterface::_reportCurrentOpsForIdleSessions(OperationContext* opCtx,
});
}
+void MongoSInterface::_reportCurrentOpsForTransactionCoordinators(
+ OperationContext* opCtx, bool includeIdle, std::vector<BSONObj>* ops) const {};
+
std::vector<GenericCursor> MongoSInterface::getIdleCursors(
const intrusive_ptr<ExpressionContext>& expCtx, CurrentOpUserMode userMode) const {
invariant(hasGlobalServiceContext());
diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h
index 8fb8a38b838..33e133adff2 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/mongos_process_interface.h
@@ -255,6 +255,10 @@ protected:
void _reportCurrentOpsForIdleSessions(OperationContext* opCtx,
CurrentOpUserMode userMode,
std::vector<BSONObj>* ops) const final;
+
+ void _reportCurrentOpsForTransactionCoordinators(OperationContext* opCtx,
+ bool includeIdle,
+ std::vector<BSONObj>* ops) const final;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index f939c320bc8..63ecf0efced 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -51,7 +51,8 @@
#include "mongo/db/repl/speculative_majority_read_info.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
-#include "mongo/db/s/transaction_coordinator_worker_curop_info.h"
+#include "mongo/db/s/transaction_coordinator_curop.h"
+#include "mongo/db/s/transaction_coordinator_worker_curop_repository.h"
#include "mongo/db/session_catalog.h"
#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/stats/fill_locker_info.h"
@@ -616,6 +617,11 @@ BSONObj MongoInterfaceStandalone::_reportCurrentOpForClient(
return builder.obj();
}
+void MongoInterfaceStandalone::_reportCurrentOpsForTransactionCoordinators(
+ OperationContext* opCtx, bool includeIdle, std::vector<BSONObj>* ops) const {
+ reportCurrentOpsForTransactionCoordinators(opCtx, includeIdle, ops);
+}
+
void MongoInterfaceStandalone::_reportCurrentOpsForIdleSessions(OperationContext* opCtx,
CurrentOpUserMode userMode,
std::vector<BSONObj>* ops) const {
diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h
index 7e45dd0eaa0..ef502e8d3f2 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.h
+++ b/src/mongo/db/pipeline/process_interface_standalone.h
@@ -180,6 +180,9 @@ protected:
CurrentOpUserMode userMode,
std::vector<BSONObj>* ops) const final;
+ void _reportCurrentOpsForTransactionCoordinators(OperationContext* opCtx,
+ bool includeIdle,
+ std::vector<BSONObj>* ops) const final;
/**
* Builds an ordered insert op on namespace 'nss' and documents to be written 'objs'.
*/
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 9f370363f73..6275d0224c8 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -18,6 +18,7 @@ env.Library(
'sharded_connection_info.cpp',
'sharding_migration_critical_section.cpp',
'sharding_state.cpp',
+ 'transaction_coordinator_curop.cpp',
'transaction_coordinator_factory.cpp',
'transaction_coordinator_worker_curop_repository.cpp',
],
@@ -127,6 +128,7 @@ env.Library(
'server_transaction_coordinators_metrics.cpp',
'single_transaction_coordinator_stats.cpp',
'transaction_coordinator_catalog.cpp',
+ 'transaction_coordinator_curop_mongod.cpp',
'transaction_coordinator_factory_mongod.cpp',
'transaction_coordinator_futures_util.cpp',
'transaction_coordinator_metrics_observer.cpp',
diff --git a/src/mongo/db/s/single_transaction_coordinator_stats.cpp b/src/mongo/db/s/single_transaction_coordinator_stats.cpp
index 2fc5db4e146..1ef6209f3c9 100644
--- a/src/mongo/db/s/single_transaction_coordinator_stats.cpp
+++ b/src/mongo/db/s/single_transaction_coordinator_stats.cpp
@@ -29,7 +29,9 @@
#include "mongo/platform/basic.h"
+#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/s/single_transaction_coordinator_stats.h"
+#include "mongo/util/net/socket_utils.h"
namespace mongo {
@@ -188,4 +190,58 @@ Microseconds SingleTransactionCoordinatorStats::getDeletingCoordinatorDocDuratio
return tickSource->ticksTo<Microseconds>(curTick - _deletingCoordinatorDocStartTime);
}
+
+void SingleTransactionCoordinatorStats::reportMetrics(BSONObjBuilder& parent,
+ TickSource* tickSource,
+ TickSource::Tick curTick) const {
+ BSONObjBuilder stepDurationsBuilder;
+
+ invariant(_createTime);
+ parent.append("commitStartTime", _createWallClockTime);
+
+ if (_writingParticipantListStartTime) {
+ const auto statValue = getWritingParticipantListDuration(tickSource, curTick);
+ stepDurationsBuilder.append("writingParticipantListMicros",
+ durationCount<Microseconds>(statValue));
+
+ const auto statValue2 = getTwoPhaseCommitDuration(tickSource, curTick);
+ stepDurationsBuilder.append("totalCommitDurationMicros",
+ durationCount<Microseconds>(statValue2));
+ }
+
+ if (_waitingForVotesStartTime) {
+ const auto statValue = getWaitingForVotesDuration(tickSource, curTick);
+ stepDurationsBuilder.append("waitingForVotesMicros",
+ durationCount<Microseconds>(statValue));
+ }
+
+ if (_writingDecisionStartTime) {
+ const auto statValue = getWritingDecisionDuration(tickSource, curTick);
+ stepDurationsBuilder.append("writingDecisionMicros",
+ durationCount<Microseconds>(statValue));
+ }
+
+ if (_waitingForDecisionAcksStartTime) {
+ const auto statValue = getWaitingForDecisionAcksDuration(tickSource, curTick);
+ stepDurationsBuilder.append("waitingForDecisionAcksMicros",
+ durationCount<Microseconds>(statValue));
+ }
+
+ if (_deletingCoordinatorDocStartTime) {
+ const auto statValue = getDeletingCoordinatorDocDuration(tickSource, curTick);
+ stepDurationsBuilder.append("deletingCoordinatorDocMicros",
+ durationCount<Microseconds>(statValue));
+ }
+
+ parent.append("stepDurations", stepDurationsBuilder.obj());
+}
+
+void SingleTransactionCoordinatorStats::reportLastClient(BSONObjBuilder& parent) const {
+ parent.append("client", _lastClientInfo.clientHostAndPort);
+ parent.append("host", getHostNameCachedAndPort());
+ parent.append("connectionId", _lastClientInfo.connectionId);
+ parent.append("appName", _lastClientInfo.appName);
+ parent.append("clientMetadata", _lastClientInfo.clientMetadata);
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/single_transaction_coordinator_stats.h b/src/mongo/db/s/single_transaction_coordinator_stats.h
index 05c75d7c076..1856bf9b054 100644
--- a/src/mongo/db/s/single_transaction_coordinator_stats.h
+++ b/src/mongo/db/s/single_transaction_coordinator_stats.h
@@ -29,6 +29,8 @@
#pragma once
+#include "mongo/db/client.h"
+#include "mongo/rpc/metadata/client_metadata_ismaster.h"
#include "mongo/util/tick_source.h"
#include "mongo/util/time_support.h"
@@ -43,6 +45,25 @@ class SingleTransactionCoordinatorStats {
public:
SingleTransactionCoordinatorStats() = default;
+ struct LastClientInfo {
+ std::string clientHostAndPort;
+ long long connectionId;
+ BSONObj clientMetadata;
+ std::string appName;
+
+ void update(Client* client) {
+ if (client->hasRemote()) {
+ clientHostAndPort = client->getRemote().toString();
+ }
+ connectionId = client->getConnectionId();
+ if (const auto& metadata =
+ ClientMetadataIsMasterState::get(client).getClientMetadata()) {
+ clientMetadata = metadata.get().getDocument();
+ appName = metadata.get().getApplicationName().toString();
+ }
+ }
+ };
+
//
// Setters
//
@@ -238,6 +259,29 @@ public:
Microseconds getDeletingCoordinatorDocDuration(TickSource* tickSource,
TickSource::Tick curTick) const;
+ /**
+ * Reports the time duration for each step in the two-phase commit and stores them as a
+ * sub-document of the provided parent BSONObjBuilder. The metrics are stored under key
+ * "stepDurations" in the parent document.
+ */
+ void reportMetrics(BSONObjBuilder& parent,
+ TickSource* tickSource,
+ TickSource::Tick curTick) const;
+
+ /**
+ * Reports information about the last client to interact with this transaction.
+ */
+ void reportLastClient(BSONObjBuilder& parent) const;
+
+ /**
+ * Updates the LastClientInfo object stored in this SingleTransactionStats instance with the
+ * given Client's information.
+ */
+ void updateLastClientInfo(Client* client) {
+ invariant(client);
+ _lastClientInfo.update(client);
+ }
+
private:
Date_t _createWallClockTime;
TickSource::Tick _createTime{0};
@@ -261,6 +305,8 @@ private:
Date_t _endWallClockTime;
TickSource::Tick _endTime{0};
+
+ LastClientInfo _lastClientInfo;
};
} // namespace mongo
diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp
index 87371bb2394..427ea581a05 100644
--- a/src/mongo/db/s/transaction_coordinator.cpp
+++ b/src/mongo/db/s/transaction_coordinator.cpp
@@ -86,18 +86,19 @@ ExecutorFuture<void> waitForMajorityWithHangFailpoint(ServiceContext* service,
} // namespace
-TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext,
+TransactionCoordinator::TransactionCoordinator(OperationContext* operationContext,
const LogicalSessionId& lsid,
TxnNumber txnNumber,
std::unique_ptr<txn::AsyncWorkScheduler> scheduler,
Date_t deadline)
- : _serviceContext(serviceContext),
+ : _serviceContext(operationContext->getServiceContext()),
_lsid(lsid),
_txnNumber(txnNumber),
_scheduler(std::move(scheduler)),
_sendPrepareScheduler(_scheduler->makeChildScheduler()),
_transactionCoordinatorMetricsObserver(
- std::make_unique<TransactionCoordinatorMetricsObserver>()) {
+ std::make_unique<TransactionCoordinatorMetricsObserver>()),
+ _deadline(deadline) {
auto kickOffCommitPF = makePromiseFuture<void>();
_kickOffCommitPromise = std::move(kickOffCommitPF.promise);
@@ -123,6 +124,7 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext,
});
// TODO: The duration will be meaningless after failover.
+ _updateAssociatedClient(operationContext->getClient());
_transactionCoordinatorMetricsObserver->onCreate(
ServerTransactionCoordinatorsMetrics::get(_serviceContext),
_serviceContext->getTickSource(),
@@ -326,10 +328,11 @@ TransactionCoordinator::~TransactionCoordinator() {
invariant(_completionPromise.getFuture().isReady());
}
-void TransactionCoordinator::runCommit(std::vector<ShardId> participants) {
+void TransactionCoordinator::runCommit(OperationContext* opCtx, std::vector<ShardId> participants) {
if (!_reserveKickOffCommitPromise())
return;
-
+ invariant(opCtx != nullptr && opCtx->getClient() != nullptr);
+ _updateAssociatedClient(opCtx->getClient());
_participants = std::move(participants);
_kickOffCommitPromise.emplaceValue();
}
@@ -486,4 +489,65 @@ std::string TransactionCoordinator::_twoPhaseCommitInfoForLog(
return s.str();
}
+TransactionCoordinator::Step TransactionCoordinator::getStep() const {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _step;
+}
+
+void TransactionCoordinator::reportState(BSONObjBuilder& parent) const {
+ BSONObjBuilder doc;
+ TickSource* tickSource = _serviceContext->getTickSource();
+ TickSource::Tick currentTick = tickSource->getTicks();
+
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ BSONObjBuilder lsidBuilder(doc.subobjStart("lsid"));
+ _lsid.serialize(&lsidBuilder);
+ lsidBuilder.doneFast();
+ doc.append("txnNumber", _txnNumber);
+
+ if (_participants) {
+ doc.append("numParticipants", static_cast<long long>(_participants->size()));
+ }
+
+ doc.append("state", toString(_step));
+
+ const auto& singleStats =
+ _transactionCoordinatorMetricsObserver->getSingleTransactionCoordinatorStats();
+ singleStats.reportMetrics(doc, tickSource, currentTick);
+ singleStats.reportLastClient(parent);
+
+ if (_decision)
+ doc.append("decision", _decision->toBSON());
+
+ doc.append("deadline", _deadline);
+
+ parent.append("desc", "transaction coordinator");
+ parent.append("twoPhaseCommitCoordinator", doc.obj());
+}
+
+std::string TransactionCoordinator::toString(Step step) const {
+ switch (step) {
+ case Step::kInactive:
+ return "inactive";
+ case Step::kWritingParticipantList:
+ return "writingParticipantList";
+ case Step::kWaitingForVotes:
+ return "waitingForVotes";
+ case Step::kWritingDecision:
+ return "writingDecision";
+ case Step::kWaitingForDecisionAcks:
+ return "waitingForDecisionAck";
+ case Step::kDeletingCoordinatorDoc:
+ return "deletingCoordinatorDoc";
+ default:
+ MONGO_UNREACHABLE;
+ }
+}
+
+void TransactionCoordinator::_updateAssociatedClient(Client* client) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _transactionCoordinatorMetricsObserver->updateLastClientInfo(client);
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/transaction_coordinator.h b/src/mongo/db/s/transaction_coordinator.h
index 1aa811a038e..12005613f89 100644
--- a/src/mongo/db/s/transaction_coordinator.h
+++ b/src/mongo/db/s/transaction_coordinator.h
@@ -71,7 +71,7 @@ public:
* cause the coordinator to be put in a cancelled state, if runCommit is not eventually
* received.
*/
- TransactionCoordinator(ServiceContext* serviceContext,
+ TransactionCoordinator(OperationContext* operationContext,
const LogicalSessionId& lsid,
TxnNumber txnNumber,
std::unique_ptr<txn::AsyncWorkScheduler> scheduler,
@@ -85,7 +85,7 @@ public:
*
* Subsequent calls will not re-run the commit process.
*/
- void runCommit(std::vector<ShardId> participantShards);
+ void runCommit(OperationContext* opCtx, std::vector<ShardId> participantShards);
/**
* To be used to continue coordinating a transaction on step up.
@@ -124,7 +124,15 @@ public:
return *_transactionCoordinatorMetricsObserver;
}
+ void reportState(BSONObjBuilder& parent) const;
+ std::string toString(Step step) const;
+
+ Step getStep() const;
+
+
private:
+ void _updateAssociatedClient(Client* client);
+
bool _reserveKickOffCommitPromise();
/**
@@ -190,9 +198,12 @@ private:
// onCompletion.
SharedPromise<txn::CommitDecision> _completionPromise;
- // Store as unique_ptr to avoid a circular dependency between the TransactionCoordinator and the
- // TransactionCoordinatorMetricsObserver.
+ // Store as unique_ptr to avoid a circular dependency between the TransactionCoordinator and
+ // the TransactionCoordinatorMetricsObserver.
std::unique_ptr<TransactionCoordinatorMetricsObserver> _transactionCoordinatorMetricsObserver;
+
+ // The deadline for the TransactionCoordinator to reach a decision
+ Date_t _deadline;
};
} // namespace mongo
diff --git a/src/mongo/db/s/transaction_coordinator_catalog.cpp b/src/mongo/db/s/transaction_coordinator_catalog.cpp
index 3cb7308a8bc..5a5c029833b 100644
--- a/src/mongo/db/s/transaction_coordinator_catalog.cpp
+++ b/src/mongo/db/s/transaction_coordinator_catalog.cpp
@@ -218,4 +218,20 @@ std::string TransactionCoordinatorCatalog::_toString(WithLock wl) const {
return ss.str();
}
+void TransactionCoordinatorCatalog::filter(FilterPredicate predicate, FilterVisitor visitor) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ for (auto sessionIt = _coordinatorsBySession.begin(); sessionIt != _coordinatorsBySession.end();
+ ++sessionIt) {
+ auto& lsid = sessionIt->first;
+ auto& coordinatorsByTxnNumber = sessionIt->second;
+ for (auto txnIt = coordinatorsByTxnNumber.begin(); txnIt != coordinatorsByTxnNumber.end();
+ ++txnIt) {
+ auto txnNumber = txnIt->first;
+ auto& transactionCoordinator = txnIt->second;
+ if (predicate(lsid, txnNumber, transactionCoordinator)) {
+ visitor(lsid, txnNumber, transactionCoordinator);
+ }
+ }
+ }
+}
} // namespace mongo
diff --git a/src/mongo/db/s/transaction_coordinator_catalog.h b/src/mongo/db/s/transaction_coordinator_catalog.h
index 0ae1c59e00b..5768c69bb3c 100644
--- a/src/mongo/db/s/transaction_coordinator_catalog.h
+++ b/src/mongo/db/s/transaction_coordinator_catalog.h
@@ -104,6 +104,17 @@ public:
*/
std::string toString() const;
+ using FilterPredicate =
+ std::function<bool(const LogicalSessionId lsid,
+ const TxnNumber txnNumber,
+ const std::shared_ptr<TransactionCoordinator> transactionCoordinator)>;
+ using FilterVisitor =
+ std::function<void(const LogicalSessionId lsid,
+ const TxnNumber txnNumber,
+ const std::shared_ptr<TransactionCoordinator> transactionCoordinator)>;
+
+ void filter(FilterPredicate predicate, FilterVisitor visitor);
+
private:
// Map of transaction coordinators, ordered in decreasing transaction number with the most
// recent transaction at the front
diff --git a/src/mongo/db/s/transaction_coordinator_catalog_test.cpp b/src/mongo/db/s/transaction_coordinator_catalog_test.cpp
index 09e949ef9fe..891191ab220 100644
--- a/src/mongo/db/s/transaction_coordinator_catalog_test.cpp
+++ b/src/mongo/db/s/transaction_coordinator_catalog_test.cpp
@@ -57,7 +57,7 @@ protected:
LogicalSessionId lsid,
TxnNumber txnNumber) {
auto newCoordinator = std::make_shared<TransactionCoordinator>(
- getServiceContext(),
+ operationContext(),
lsid,
txnNumber,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
@@ -172,7 +172,7 @@ TEST_F(TransactionCoordinatorCatalogTest, StepDownBeforeCoordinatorInsertedIntoC
TransactionCoordinatorCatalog catalog;
catalog.exitStepUp(Status::OK());
- auto coordinator = std::make_shared<TransactionCoordinator>(getServiceContext(),
+ auto coordinator = std::make_shared<TransactionCoordinator>(operationContext(),
lsid,
txnNumber,
aws.makeChildScheduler(),
diff --git a/src/mongo/db/s/transaction_coordinator_curop.cpp b/src/mongo/db/s/transaction_coordinator_curop.cpp
new file mode 100644
index 00000000000..29df19804c0
--- /dev/null
+++ b/src/mongo/db/s/transaction_coordinator_curop.cpp
@@ -0,0 +1,33 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#include "mongo/db/s/transaction_coordinator_curop.h"
+
+namespace mongo {
+MONGO_DEFINE_SHIM(reportCurrentOpsForTransactionCoordinators);
+} // namespace mongo
diff --git a/src/mongo/db/s/transaction_coordinator_curop.h b/src/mongo/db/s/transaction_coordinator_curop.h
new file mode 100644
index 00000000000..835a969c757
--- /dev/null
+++ b/src/mongo/db/s/transaction_coordinator_curop.h
@@ -0,0 +1,36 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#pragma once
+
+#include "mongo/db/pipeline/mongos_process_interface.h"
+
+namespace mongo {
+extern MONGO_DECLARE_SHIM((OperationContext * opCtx, bool includeIdle, std::vector<BSONObj>* ops)
+ ->void) reportCurrentOpsForTransactionCoordinators;
+} // namespace mongo
diff --git a/src/mongo/db/s/transaction_coordinator_curop_mongod.cpp b/src/mongo/db/s/transaction_coordinator_curop_mongod.cpp
new file mode 100644
index 00000000000..47104250272
--- /dev/null
+++ b/src/mongo/db/s/transaction_coordinator_curop_mongod.cpp
@@ -0,0 +1,38 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#include "mongo/db/s/transaction_coordinator_curop.h"
+
+#include "mongo/db/s/transaction_coordinator_service.h"
+
+namespace mongo {
+MONGO_REGISTER_SHIM(reportCurrentOpsForTransactionCoordinators)
+(OperationContext* opCtx, bool includeIdle, std::vector<BSONObj>* ops)->void {
+ TransactionCoordinatorService::get(opCtx)->reportCoordinators(opCtx, includeIdle, ops);
+}
+} // namespace mongo
diff --git a/src/mongo/db/s/transaction_coordinator_metrics_observer.cpp b/src/mongo/db/s/transaction_coordinator_metrics_observer.cpp
index c1fafbca96c..42b82236430 100644
--- a/src/mongo/db/s/transaction_coordinator_metrics_observer.cpp
+++ b/src/mongo/db/s/transaction_coordinator_metrics_observer.cpp
@@ -195,4 +195,7 @@ void TransactionCoordinatorMetricsObserver::_decrementLastStep(
}
}
+void TransactionCoordinatorMetricsObserver::updateLastClientInfo(Client* client) {
+ _singleTransactionCoordinatorStats.updateLastClientInfo(client);
+}
} // namespace mongo
diff --git a/src/mongo/db/s/transaction_coordinator_metrics_observer.h b/src/mongo/db/s/transaction_coordinator_metrics_observer.h
index de32c9e5705..9d73e64f6c7 100644
--- a/src/mongo/db/s/transaction_coordinator_metrics_observer.h
+++ b/src/mongo/db/s/transaction_coordinator_metrics_observer.h
@@ -112,6 +112,12 @@ public:
return _singleTransactionCoordinatorStats;
}
+
+ /**
+ * Save information about the last client that interacted with this transaction.
+ */
+ void updateLastClientInfo(Client* client);
+
private:
/**
* Decrements the current active in 'step'.
diff --git a/src/mongo/db/s/transaction_coordinator_service.cpp b/src/mongo/db/s/transaction_coordinator_service.cpp
index 7109c0cadf6..a8e72285cd6 100644
--- a/src/mongo/db/s/transaction_coordinator_service.cpp
+++ b/src/mongo/db/s/transaction_coordinator_service.cpp
@@ -78,14 +78,47 @@ void TransactionCoordinatorService::createCoordinator(OperationContext* opCtx,
latestCoordinator->cancelIfCommitNotYetStarted();
}
- catalog.insert(opCtx,
- lsid,
- txnNumber,
- std::make_shared<TransactionCoordinator>(opCtx->getServiceContext(),
- lsid,
- txnNumber,
- scheduler.makeChildScheduler(),
- commitDeadline));
+ auto coordinator = std::make_shared<TransactionCoordinator>(
+ opCtx, lsid, txnNumber, scheduler.makeChildScheduler(), commitDeadline);
+
+ catalog.insert(opCtx, lsid, txnNumber, coordinator);
+}
+
+
+void TransactionCoordinatorService::reportCoordinators(OperationContext* opCtx,
+ bool includeIdle,
+ std::vector<BSONObj>* ops) {
+ std::shared_ptr<CatalogAndScheduler> cas;
+ try {
+ cas = _getCatalogAndScheduler(opCtx);
+ } catch (ExceptionFor<ErrorCodes::NotMaster>&) {
+ // If we are not master, don't include any output for transaction coordinators in
+ // the curOp command.
+ return;
+ }
+
+ auto& catalog = cas->catalog;
+
+ auto predicate =
+ [includeIdle](const LogicalSessionId lsid,
+ const TxnNumber txnNumber,
+ const std::shared_ptr<TransactionCoordinator> transactionCoordinator) {
+ TransactionCoordinator::Step step = transactionCoordinator->getStep();
+ if (includeIdle || step > TransactionCoordinator::Step::kInactive) {
+ return true;
+ }
+ return false;
+ };
+
+ auto reporter = [ops](const LogicalSessionId lsid,
+ const TxnNumber txnNumber,
+ const std::shared_ptr<TransactionCoordinator> transactionCoordinator) {
+ BSONObjBuilder doc;
+ transactionCoordinator->reportState(doc);
+ ops->push_back(doc.obj());
+ };
+
+ catalog.filter(predicate, reporter);
}
boost::optional<SharedSemiFuture<txn::CommitDecision>>
@@ -101,7 +134,8 @@ TransactionCoordinatorService::coordinateCommit(OperationContext* opCtx,
return boost::none;
}
- coordinator->runCommit(std::vector<ShardId>{participantList.begin(), participantList.end()});
+ coordinator->runCommit(opCtx,
+ std::vector<ShardId>{participantList.begin(), participantList.end()});
return coordinator->onCompletion();
@@ -180,7 +214,7 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx,
const auto txnNumber = *doc.getId().getTxnNumber();
auto coordinator = std::make_shared<TransactionCoordinator>(
- service,
+ opCtx,
lsid,
txnNumber,
scheduler.makeChildScheduler(),
diff --git a/src/mongo/db/s/transaction_coordinator_service.h b/src/mongo/db/s/transaction_coordinator_service.h
index a4e5fafa1d9..c200809744f 100644
--- a/src/mongo/db/s/transaction_coordinator_service.h
+++ b/src/mongo/db/s/transaction_coordinator_service.h
@@ -58,6 +58,12 @@ public:
Date_t commitDeadline);
/**
+ * Outputs a vector of BSON documents to the ops out-param containing information about active
+ * and idle coordinators in the system.
+ */
+ void reportCoordinators(OperationContext* opCtx, bool includeIdle, std::vector<BSONObj>* ops);
+
+ /**
* If a coordinator for the (lsid, txnNumber) exists, delivers the participant list to the
* coordinator, which will cause the coordinator to start coordinating the commit if the
* coordinator had not yet received a list, and returns a Future that will contain the decision
diff --git a/src/mongo/db/s/transaction_coordinator_test.cpp b/src/mongo/db/s/transaction_coordinator_test.cpp
index 86d830341cb..d85c0d7e15a 100644
--- a/src/mongo/db/s/transaction_coordinator_test.cpp
+++ b/src/mongo/db/s/transaction_coordinator_test.cpp
@@ -809,12 +809,12 @@ using TransactionCoordinatorTest = TransactionCoordinatorTestBase;
TEST_F(TransactionCoordinatorTest, RunCommitProducesCommitDecisionOnTwoCommitResponses) {
TransactionCoordinator coordinator(
- getServiceContext(),
+ operationContext(),
_lsid,
_txnNumber,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
Date_t::max());
- coordinator.runCommit(kTwoShardIdList);
+ coordinator.runCommit(operationContext(), kTwoShardIdList);
auto commitDecisionFuture = coordinator.getDecision();
assertPrepareSentAndRespondWithSuccess();
@@ -831,12 +831,12 @@ TEST_F(TransactionCoordinatorTest, RunCommitProducesCommitDecisionOnTwoCommitRes
TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnAbortAndCommitResponses) {
TransactionCoordinator coordinator(
- getServiceContext(),
+ operationContext(),
_lsid,
_txnNumber,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
Date_t::max());
- coordinator.runCommit(kTwoShardIdList);
+ coordinator.runCommit(operationContext(), kTwoShardIdList);
auto commitDecisionFuture = coordinator.getDecision();
onCommands({[&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; },
@@ -853,12 +853,12 @@ TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnAbortAndCommi
TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnCommitAndAbortResponses) {
TransactionCoordinator coordinator(
- getServiceContext(),
+ operationContext(),
_lsid,
_txnNumber,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
Date_t::max());
- coordinator.runCommit(kTwoShardIdList);
+ coordinator.runCommit(operationContext(), kTwoShardIdList);
auto commitDecisionFuture = coordinator.getDecision();
onCommands({[&](const executor::RemoteCommandRequest& request) { return kPrepareOk; },
@@ -875,12 +875,12 @@ TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnCommitAndAbor
TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnSingleAbortResponseOnly) {
TransactionCoordinator coordinator(
- getServiceContext(),
+ operationContext(),
_lsid,
_txnNumber,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
Date_t::max());
- coordinator.runCommit(kTwoShardIdList);
+ coordinator.runCommit(operationContext(), kTwoShardIdList);
auto commitDecisionFuture = coordinator.getDecision();
assertPrepareSentAndRespondWithNoSuchTransaction();
@@ -898,12 +898,12 @@ TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnSingleAbortRe
TEST_F(TransactionCoordinatorTest,
RunCommitProducesAbortDecisionOnOneCommitResponseAndOneAbortResponseAfterRetry) {
TransactionCoordinator coordinator(
- getServiceContext(),
+ operationContext(),
_lsid,
_txnNumber,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
Date_t::max());
- coordinator.runCommit(kTwoShardIdList);
+ coordinator.runCommit(operationContext(), kTwoShardIdList);
auto commitDecisionFuture = coordinator.getDecision();
// One participant votes commit and other encounters retryable error
@@ -926,12 +926,12 @@ TEST_F(TransactionCoordinatorTest,
TEST_F(TransactionCoordinatorTest,
RunCommitProducesAbortDecisionOnOneAbortResponseAndOneRetryableAbortResponse) {
TransactionCoordinator coordinator(
- getServiceContext(),
+ operationContext(),
_lsid,
_txnNumber,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
Date_t::max());
- coordinator.runCommit(kTwoShardIdList);
+ coordinator.runCommit(operationContext(), kTwoShardIdList);
auto commitDecisionFuture = coordinator.getDecision();
// One participant votes abort and other encounters retryable error
@@ -951,12 +951,12 @@ TEST_F(TransactionCoordinatorTest,
TEST_F(TransactionCoordinatorTest,
RunCommitProducesCommitDecisionOnCommitAfterMultipleNetworkRetries) {
TransactionCoordinator coordinator(
- getServiceContext(),
+ operationContext(),
_lsid,
_txnNumber,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
Date_t::max());
- coordinator.runCommit(kTwoShardIdList);
+ coordinator.runCommit(operationContext(), kTwoShardIdList);
auto commitDecisionFuture = coordinator.getDecision();
// One participant votes commit after retry.
@@ -983,12 +983,12 @@ TEST_F(TransactionCoordinatorTest,
TEST_F(TransactionCoordinatorTest,
RunCommitProducesReadConcernMajorityNotEnabledIfEitherShardReturnsIt) {
TransactionCoordinator coordinator(
- getServiceContext(),
+ operationContext(),
_lsid,
_txnNumber,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
Date_t::max());
- coordinator.runCommit(kTwoShardIdList);
+ coordinator.runCommit(operationContext(), kTwoShardIdList);
auto commitDecisionFuture = coordinator.getDecision();
// One participant votes commit and other encounters retryable error
@@ -1188,6 +1188,28 @@ public:
serverStatusSection.getObjectField("currentInSteps")["deletingCoordinatorDoc"].Long());
}
+ static void assertClientReportStateFields(BSONObj doc, std::string appName, int connectionId) {
+ ASSERT_EQ(StringData(doc.getStringField("appName")), appName);
+ ASSERT_EQ(doc.getIntField("connectionId"), connectionId);
+
+ auto expectedDriverName = std::string("DriverName").insert(0, appName);
+ auto expectedDriverVersion = std::string("DriverVersion").insert(0, appName);
+ auto expectedOsType = std::string("OsType").insert(0, appName);
+ auto expectedOsName = std::string("OsName").insert(0, appName);
+ auto expectedOsArch = std::string("OsArchitecture").insert(0, appName);
+ auto expectedOsVersion = std::string("OsVersion").insert(0, appName);
+
+ ASSERT_TRUE(doc.hasField("clientMetadata"));
+ auto driver = doc.getObjectField("clientMetadata").getObjectField("driver");
+ ASSERT_EQ(StringData(driver.getStringField("name")), expectedDriverName);
+ ASSERT_EQ(StringData(driver.getStringField("version")), expectedDriverVersion);
+ auto os = doc.getObjectField("clientMetadata").getObjectField("os");
+ ASSERT_EQ(StringData(os.getStringField("type")), expectedOsType);
+ ASSERT_EQ(StringData(os.getStringField("name")), expectedOsName);
+ ASSERT_EQ(StringData(os.getStringField("architecture")), expectedOsArch);
+ ASSERT_EQ(StringData(os.getStringField("version")), expectedOsVersion);
+ }
+
Date_t advanceClockSourceAndReturnNewNow() {
const auto newNow = Date_t::now();
clockSource()->reset(newNow);
@@ -1198,13 +1220,13 @@ public:
startCapturingLogMessages();
TransactionCoordinator coordinator(
- getServiceContext(),
+ operationContext(),
_lsid,
_txnNumber,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
Date_t::max());
- coordinator.runCommit(kTwoShardIdList);
+ coordinator.runCommit(operationContext(), kTwoShardIdList);
assertPrepareSentAndRespondWithSuccess();
assertPrepareSentAndRespondWithSuccess();
@@ -1498,7 +1520,7 @@ TEST_F(TransactionCoordinatorMetricsTest, SimpleTwoPhaseCommitRealCoordinator) {
expectedMetrics.totalCreated++;
TransactionCoordinator coordinator(
- getServiceContext(),
+ operationContext(),
_lsid,
_txnNumber,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
@@ -1524,7 +1546,7 @@ TEST_F(TransactionCoordinatorMetricsTest, SimpleTwoPhaseCommitRealCoordinator) {
BSON("mode"
<< "alwaysOn"
<< "data" << BSON("useUninterruptibleSleep" << 1)));
- coordinator.runCommit(kTwoShardIdList);
+ coordinator.runCommit(operationContext(), kTwoShardIdList);
waitUntilCoordinatorDocIsPresent();
checkStats(stats, expectedStats);
@@ -1672,7 +1694,7 @@ TEST_F(TransactionCoordinatorMetricsTest, CoordinatorIsCanceledWhileInactive) {
expectedMetrics.totalCreated++;
TransactionCoordinator coordinator(
- getServiceContext(),
+ operationContext(),
_lsid,
_txnNumber,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
@@ -1719,7 +1741,7 @@ TEST_F(TransactionCoordinatorMetricsTest, CoordinatorsAWSIsShutDownWhileCoordina
auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext());
auto awsPtr = aws.get();
TransactionCoordinator coordinator(
- getServiceContext(), _lsid, _txnNumber, std::move(aws), Date_t::max());
+ operationContext(), _lsid, _txnNumber, std::move(aws), Date_t::max());
const auto& stats =
coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats();
@@ -1762,7 +1784,7 @@ TEST_F(TransactionCoordinatorMetricsTest,
auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext());
TransactionCoordinator coordinator(
- getServiceContext(), _lsid, _txnNumber, std::move(aws), Date_t::max());
+ operationContext(), _lsid, _txnNumber, std::move(aws), Date_t::max());
const auto& stats =
coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats();
@@ -1779,7 +1801,7 @@ TEST_F(TransactionCoordinatorMetricsTest,
expectedMetrics.currentWritingParticipantList++;
FailPointEnableBlock fp("hangBeforeWaitingForParticipantListWriteConcern");
- coordinator.runCommit(kTwoShardIdList);
+ coordinator.runCommit(operationContext(), kTwoShardIdList);
waitUntilCoordinatorDocIsPresent();
checkStats(stats, expectedStats);
@@ -1824,7 +1846,7 @@ TEST_F(TransactionCoordinatorMetricsTest,
auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext());
auto awsPtr = aws.get();
TransactionCoordinator coordinator(
- getServiceContext(), _lsid, _txnNumber, std::move(aws), Date_t::max());
+ operationContext(), _lsid, _txnNumber, std::move(aws), Date_t::max());
const auto& stats =
coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats();
@@ -1840,7 +1862,7 @@ TEST_F(TransactionCoordinatorMetricsTest,
expectedMetrics.totalStartedTwoPhaseCommit++;
expectedMetrics.currentWaitingForVotes++;
- coordinator.runCommit(kTwoShardIdList);
+ coordinator.runCommit(operationContext(), kTwoShardIdList);
waitUntilMessageSent();
checkStats(stats, expectedStats);
@@ -1888,7 +1910,7 @@ TEST_F(TransactionCoordinatorMetricsTest,
auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext());
TransactionCoordinator coordinator(
- getServiceContext(), _lsid, _txnNumber, std::move(aws), Date_t::max());
+ operationContext(), _lsid, _txnNumber, std::move(aws), Date_t::max());
const auto& stats =
coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats();
@@ -1906,7 +1928,7 @@ TEST_F(TransactionCoordinatorMetricsTest,
FailPointEnableBlock fp("hangBeforeWaitingForDecisionWriteConcern");
- coordinator.runCommit(kTwoShardIdList);
+ coordinator.runCommit(operationContext(), kTwoShardIdList);
// Respond to the second prepare request in a separate thread, because the coordinator will
// hijack that thread to run its continuation.
assertPrepareSentAndRespondWithSuccess();
@@ -1956,7 +1978,7 @@ TEST_F(TransactionCoordinatorMetricsTest,
auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext());
auto awsPtr = aws.get();
TransactionCoordinator coordinator(
- getServiceContext(), _lsid, _txnNumber, std::move(aws), Date_t::max());
+ operationContext(), _lsid, _txnNumber, std::move(aws), Date_t::max());
const auto& stats =
coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats();
@@ -1972,7 +1994,7 @@ TEST_F(TransactionCoordinatorMetricsTest,
expectedMetrics.totalStartedTwoPhaseCommit++;
expectedMetrics.currentWaitingForDecisionAcks++;
- coordinator.runCommit(kTwoShardIdList);
+ coordinator.runCommit(operationContext(), kTwoShardIdList);
// Respond to the second prepare request in a separate thread, because the coordinator will
// hijack that thread to run its continuation.
assertPrepareSentAndRespondWithSuccess();
@@ -2026,7 +2048,7 @@ TEST_F(TransactionCoordinatorMetricsTest, CoordinatorsAWSIsShutDownWhileCoordina
auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext());
auto awsPtr = aws.get();
TransactionCoordinator coordinator(
- getServiceContext(), _lsid, _txnNumber, std::move(aws), Date_t::max());
+ operationContext(), _lsid, _txnNumber, std::move(aws), Date_t::max());
const auto& stats =
coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats();
@@ -2044,7 +2066,7 @@ TEST_F(TransactionCoordinatorMetricsTest, CoordinatorsAWSIsShutDownWhileCoordina
FailPointEnableBlock fp("hangAfterDeletingCoordinatorDoc");
- coordinator.runCommit(kTwoShardIdList);
+ coordinator.runCommit(operationContext(), kTwoShardIdList);
// Respond to the second prepare request in a separate thread, because the coordinator will
// hijack that thread to run its continuation.
assertPrepareSentAndRespondWithSuccess();
@@ -2109,13 +2131,13 @@ TEST_F(TransactionCoordinatorMetricsTest, DoesNotLogTransactionsUnderSlowMSThres
startCapturingLogMessages();
TransactionCoordinator coordinator(
- getServiceContext(),
+ operationContext(),
_lsid,
_txnNumber,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
Date_t::max());
- coordinator.runCommit(kTwoShardIdList);
+ coordinator.runCommit(operationContext(), kTwoShardIdList);
tickSource()->advance(Milliseconds(99));
@@ -2141,7 +2163,7 @@ TEST_F(
startCapturingLogMessages();
TransactionCoordinator coordinator(
- getServiceContext(),
+ operationContext(),
_lsid,
_txnNumber,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
@@ -2149,7 +2171,7 @@ TEST_F(
tickSource()->advance(Milliseconds(101));
- coordinator.runCommit(kTwoShardIdList);
+ coordinator.runCommit(operationContext(), kTwoShardIdList);
assertPrepareSentAndRespondWithSuccess();
assertPrepareSentAndRespondWithSuccess();
@@ -2171,13 +2193,13 @@ TEST_F(TransactionCoordinatorMetricsTest, LogsTransactionsOverSlowMSThreshold) {
startCapturingLogMessages();
TransactionCoordinator coordinator(
- getServiceContext(),
+ operationContext(),
_lsid,
_txnNumber,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
Date_t::max());
- coordinator.runCommit(kTwoShardIdList);
+ coordinator.runCommit(operationContext(), kTwoShardIdList);
tickSource()->advance(Milliseconds(101));
@@ -2213,13 +2235,13 @@ TEST_F(TransactionCoordinatorMetricsTest, SlowLogLineIncludesTerminationCauseFor
startCapturingLogMessages();
TransactionCoordinator coordinator(
- getServiceContext(),
+ operationContext(),
_lsid,
_txnNumber,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
Date_t::max());
- coordinator.runCommit(kTwoShardIdList);
+ coordinator.runCommit(operationContext(), kTwoShardIdList);
assertPrepareSentAndRespondWithSuccess();
assertPrepareSentAndRespondWithNoSuchTransaction();
@@ -2243,7 +2265,7 @@ TEST_F(TransactionCoordinatorMetricsTest, SlowLogLineIncludesStepDurationsAndTot
startCapturingLogMessages();
TransactionCoordinator coordinator(
- getServiceContext(),
+ operationContext(),
_lsid,
_txnNumber,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
@@ -2253,7 +2275,7 @@ TEST_F(TransactionCoordinatorMetricsTest, SlowLogLineIncludesStepDurationsAndTot
FailPointEnableBlock fp("hangBeforeWaitingForParticipantListWriteConcern",
BSON("useUninterruptibleSleep" << 1));
- coordinator.runCommit(kTwoShardIdList);
+ coordinator.runCommit(operationContext(), kTwoShardIdList);
waitUntilCoordinatorDocIsPresent();
// Increase the duration spent writing the participant list.
@@ -2358,5 +2380,45 @@ TEST_F(TransactionCoordinatorMetricsTest,
checkServerStatus();
}
+TEST_F(TransactionCoordinatorMetricsTest, ClientInformationIncludedInReportState) {
+ const auto expectedAppName = std::string("Foo");
+ associateClientMetadata(getClient(), expectedAppName);
+
+ TransactionCoordinator coordinator(
+ operationContext(),
+ _lsid,
+ _txnNumber,
+ std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
+ Date_t::max());
+
+ {
+ BSONObjBuilder builder;
+ coordinator.reportState(builder);
+ BSONObj reportDoc = builder.obj();
+ ASSERT_EQ(StringData(reportDoc.getStringField("desc")), "transaction coordinator");
+ assertClientReportStateFields(reportDoc, expectedAppName, getClient()->getConnectionId());
+ }
+
+ const auto expectedAppName2 = std::string("Bar");
+ associateClientMetadata(getClient(), expectedAppName2);
+
+ coordinator.runCommit(operationContext(), kTwoShardIdList);
+
+ {
+ BSONObjBuilder builder;
+ coordinator.reportState(builder);
+ BSONObj reportDoc = builder.obj();
+ ASSERT_EQ(StringData(reportDoc.getStringField("desc")), "transaction coordinator");
+ assertClientReportStateFields(reportDoc, expectedAppName2, getClient()->getConnectionId());
+ }
+
+ assertPrepareSentAndRespondWithSuccess();
+ assertPrepareSentAndRespondWithSuccess();
+
+ assertCommitSentAndRespondWithSuccess();
+ assertCommitSentAndRespondWithSuccess();
+
+ coordinator.onCompletion().get();
+}
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/s/transaction_coordinator_test_fixture.cpp b/src/mongo/db/s/transaction_coordinator_test_fixture.cpp
index 6554461dd97..2891f938e85 100644
--- a/src/mongo/db/s/transaction_coordinator_test_fixture.cpp
+++ b/src/mongo/db/s/transaction_coordinator_test_fixture.cpp
@@ -38,6 +38,8 @@
#include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/s/wait_for_majority_service.h"
+#include "mongo/rpc/metadata/client_metadata.h"
+#include "mongo/rpc/metadata/client_metadata_ismaster.h"
#include "mongo/s/catalog/sharding_catalog_client_mock.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/unittest/unittest.h"
@@ -123,4 +125,20 @@ void TransactionCoordinatorTestFixture::advanceClockAndExecuteScheduledTasks() {
network()->advanceTime(network()->now() + Seconds{1});
}
+void TransactionCoordinatorTestFixture::associateClientMetadata(Client* client,
+ std::string appName) {
+ BSONObjBuilder metadataBuilder;
+ ASSERT_OK(ClientMetadata::serializePrivate(std::string("DriverName").insert(0, appName),
+ std::string("DriverVersion").insert(0, appName),
+ std::string("OsType").insert(0, appName),
+ std::string("OsName").insert(0, appName),
+ std::string("OsArchitecture").insert(0, appName),
+ std::string("OsVersion").insert(0, appName),
+ appName,
+ &metadataBuilder));
+ auto clientMetadata = metadataBuilder.obj();
+ auto clientMetadataParse = ClientMetadata::parse(clientMetadata["client"]);
+ ClientMetadataIsMasterState::setClientMetadata(client,
+ std::move(clientMetadataParse.getValue()));
+}
} // namespace mongo
diff --git a/src/mongo/db/s/transaction_coordinator_test_fixture.h b/src/mongo/db/s/transaction_coordinator_test_fixture.h
index 5f1067a9320..03763d3e2a1 100644
--- a/src/mongo/db/s/transaction_coordinator_test_fixture.h
+++ b/src/mongo/db/s/transaction_coordinator_test_fixture.h
@@ -66,6 +66,13 @@ protected:
*/
void advanceClockAndExecuteScheduledTasks();
+
+ /**
+ * Associates metatadata with the provided client. Metadata fields have appName prepended to
+ * thier value.
+ */
+ static void associateClientMetadata(Client* client, std::string appName);
+
const std::vector<ShardId> kTwoShardIdList{{"s1"}, {"s2"}};
const std::set<ShardId> kTwoShardIdSet{{"s1"}, {"s2"}};
const std::vector<ShardId> kThreeShardIdList{{"s1"}, {"s2"}, {"s3"}};
diff --git a/src/mongo/embedded/SConscript b/src/mongo/embedded/SConscript
index 5e1c75c9a2e..41650b01a4e 100644
--- a/src/mongo/embedded/SConscript
+++ b/src/mongo/embedded/SConscript
@@ -79,6 +79,7 @@ env.Library(
'service_entry_point_embedded.cpp',
'transaction_coordinator_factory_embedded.cpp',
'transaction_coordinator_worker_curop_repository_embedded.cpp',
+ 'transaction_coordinator_curop_embedded.cpp',
env.Idlc('embedded_options.idl')[0],
],
LIBDEPS=[
diff --git a/src/mongo/embedded/transaction_coordinator_curop_embedded.cpp b/src/mongo/embedded/transaction_coordinator_curop_embedded.cpp
new file mode 100644
index 00000000000..ecec687a3e7
--- /dev/null
+++ b/src/mongo/embedded/transaction_coordinator_curop_embedded.cpp
@@ -0,0 +1,34 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#include "mongo/db/s/transaction_coordinator_curop.h"
+
+namespace mongo {
+MONGO_REGISTER_SHIM(reportCurrentOpsForTransactionCoordinators)
+(OperationContext* opCtx, bool includeIdle, std::vector<BSONObj>* ops)->void {}
+} // namespace mongo