diff options
author | Lamont Nelson <lamont.nelson@mongodb.com> | 2019-09-05 18:32:19 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-09-05 18:32:19 +0000 |
commit | b46eb782c4730c8df589a56550c29c3f3585904a (patch) | |
tree | 908a3dffce58c449a9d241f193c99e1e6ec28bee /src/mongo | |
parent | 174736c49e4a284d12c2d31a3f9c8bf341a35c65 (diff) | |
download | mongo-b46eb782c4730c8df589a56550c29c3f3585904a.tar.gz |
SERVER-42809 report metrics for the transaction coordinators in curop command
Diffstat (limited to 'src/mongo')
26 files changed, 579 insertions, 64 deletions
diff --git a/src/mongo/db/pipeline/mongo_process_common.cpp b/src/mongo/db/pipeline/mongo_process_common.cpp index 5cbd1501280..1c3a9166cfa 100644 --- a/src/mongo/db/pipeline/mongo_process_common.cpp +++ b/src/mongo/db/pipeline/mongo_process_common.cpp @@ -111,6 +111,12 @@ std::vector<BSONObj> MongoProcessCommon::getCurrentOps( _reportCurrentOpsForIdleSessions(opCtx, userMode, &ops); } + if (!ctxAuth->getAuthorizationManager().isAuthEnabled() || + userMode == CurrentOpUserMode::kIncludeAll) { + _reportCurrentOpsForTransactionCoordinators( + opCtx, sessionMode == MongoProcessInterface::CurrentOpSessionsMode::kIncludeIdle, &ops); + } + return ops; } diff --git a/src/mongo/db/pipeline/mongo_process_common.h b/src/mongo/db/pipeline/mongo_process_common.h index 3ba69ba20a2..ff32827cba1 100644 --- a/src/mongo/db/pipeline/mongo_process_common.h +++ b/src/mongo/db/pipeline/mongo_process_common.h @@ -93,6 +93,15 @@ protected: CurrentOpUserMode userMode, std::vector<BSONObj>* ops) const = 0; + + /** + * Report information about transaction coordinators by iterating through all + * TransactionCoordinators in the TransactionCoordinatorCatalog. + */ + virtual void _reportCurrentOpsForTransactionCoordinators(OperationContext* opCtx, + bool includeIdle, + std::vector<BSONObj>* ops) const = 0; + /** * Converts an array of field names into a set of FieldPath. Throws if 'fields' contains * duplicate elements. diff --git a/src/mongo/db/pipeline/mongos_process_interface.cpp b/src/mongo/db/pipeline/mongos_process_interface.cpp index 4e121045c23..cec0279587e 100644 --- a/src/mongo/db/pipeline/mongos_process_interface.cpp +++ b/src/mongo/db/pipeline/mongos_process_interface.cpp @@ -289,6 +289,9 @@ void MongoSInterface::_reportCurrentOpsForIdleSessions(OperationContext* opCtx, }); } +void MongoSInterface::_reportCurrentOpsForTransactionCoordinators( + OperationContext* opCtx, bool includeIdle, std::vector<BSONObj>* ops) const {}; + std::vector<GenericCursor> MongoSInterface::getIdleCursors( const intrusive_ptr<ExpressionContext>& expCtx, CurrentOpUserMode userMode) const { invariant(hasGlobalServiceContext()); diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h index 8fb8a38b838..33e133adff2 100644 --- a/src/mongo/db/pipeline/mongos_process_interface.h +++ b/src/mongo/db/pipeline/mongos_process_interface.h @@ -255,6 +255,10 @@ protected: void _reportCurrentOpsForIdleSessions(OperationContext* opCtx, CurrentOpUserMode userMode, std::vector<BSONObj>* ops) const final; + + void _reportCurrentOpsForTransactionCoordinators(OperationContext* opCtx, + bool includeIdle, + std::vector<BSONObj>* ops) const final; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp index f939c320bc8..63ecf0efced 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.cpp +++ b/src/mongo/db/pipeline/process_interface_standalone.cpp @@ -51,7 +51,8 @@ #include "mongo/db/repl/speculative_majority_read_info.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/sharding_state.h" -#include "mongo/db/s/transaction_coordinator_worker_curop_info.h" +#include "mongo/db/s/transaction_coordinator_curop.h" +#include "mongo/db/s/transaction_coordinator_worker_curop_repository.h" #include "mongo/db/session_catalog.h" #include "mongo/db/session_catalog_mongod.h" #include "mongo/db/stats/fill_locker_info.h" @@ -616,6 +617,11 @@ BSONObj MongoInterfaceStandalone::_reportCurrentOpForClient( return builder.obj(); } +void MongoInterfaceStandalone::_reportCurrentOpsForTransactionCoordinators( + OperationContext* opCtx, bool includeIdle, std::vector<BSONObj>* ops) const { + reportCurrentOpsForTransactionCoordinators(opCtx, includeIdle, ops); +} + void MongoInterfaceStandalone::_reportCurrentOpsForIdleSessions(OperationContext* opCtx, CurrentOpUserMode userMode, std::vector<BSONObj>* ops) const { diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h index 7e45dd0eaa0..ef502e8d3f2 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.h +++ b/src/mongo/db/pipeline/process_interface_standalone.h @@ -180,6 +180,9 @@ protected: CurrentOpUserMode userMode, std::vector<BSONObj>* ops) const final; + void _reportCurrentOpsForTransactionCoordinators(OperationContext* opCtx, + bool includeIdle, + std::vector<BSONObj>* ops) const final; /** * Builds an ordered insert op on namespace 'nss' and documents to be written 'objs'. */ diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 9f370363f73..6275d0224c8 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -18,6 +18,7 @@ env.Library( 'sharded_connection_info.cpp', 'sharding_migration_critical_section.cpp', 'sharding_state.cpp', + 'transaction_coordinator_curop.cpp', 'transaction_coordinator_factory.cpp', 'transaction_coordinator_worker_curop_repository.cpp', ], @@ -127,6 +128,7 @@ env.Library( 'server_transaction_coordinators_metrics.cpp', 'single_transaction_coordinator_stats.cpp', 'transaction_coordinator_catalog.cpp', + 'transaction_coordinator_curop_mongod.cpp', 'transaction_coordinator_factory_mongod.cpp', 'transaction_coordinator_futures_util.cpp', 'transaction_coordinator_metrics_observer.cpp', diff --git a/src/mongo/db/s/single_transaction_coordinator_stats.cpp b/src/mongo/db/s/single_transaction_coordinator_stats.cpp index 2fc5db4e146..1ef6209f3c9 100644 --- a/src/mongo/db/s/single_transaction_coordinator_stats.cpp +++ b/src/mongo/db/s/single_transaction_coordinator_stats.cpp @@ -29,7 +29,9 @@ #include "mongo/platform/basic.h" +#include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/s/single_transaction_coordinator_stats.h" +#include "mongo/util/net/socket_utils.h" namespace mongo { @@ -188,4 +190,58 @@ Microseconds SingleTransactionCoordinatorStats::getDeletingCoordinatorDocDuratio return tickSource->ticksTo<Microseconds>(curTick - _deletingCoordinatorDocStartTime); } + +void SingleTransactionCoordinatorStats::reportMetrics(BSONObjBuilder& parent, + TickSource* tickSource, + TickSource::Tick curTick) const { + BSONObjBuilder stepDurationsBuilder; + + invariant(_createTime); + parent.append("commitStartTime", _createWallClockTime); + + if (_writingParticipantListStartTime) { + const auto statValue = getWritingParticipantListDuration(tickSource, curTick); + stepDurationsBuilder.append("writingParticipantListMicros", + durationCount<Microseconds>(statValue)); + + const auto statValue2 = getTwoPhaseCommitDuration(tickSource, curTick); + stepDurationsBuilder.append("totalCommitDurationMicros", + durationCount<Microseconds>(statValue2)); + } + + if (_waitingForVotesStartTime) { + const auto statValue = getWaitingForVotesDuration(tickSource, curTick); + stepDurationsBuilder.append("waitingForVotesMicros", + durationCount<Microseconds>(statValue)); + } + + if (_writingDecisionStartTime) { + const auto statValue = getWritingDecisionDuration(tickSource, curTick); + stepDurationsBuilder.append("writingDecisionMicros", + durationCount<Microseconds>(statValue)); + } + + if (_waitingForDecisionAcksStartTime) { + const auto statValue = getWaitingForDecisionAcksDuration(tickSource, curTick); + stepDurationsBuilder.append("waitingForDecisionAcksMicros", + durationCount<Microseconds>(statValue)); + } + + if (_deletingCoordinatorDocStartTime) { + const auto statValue = getDeletingCoordinatorDocDuration(tickSource, curTick); + stepDurationsBuilder.append("deletingCoordinatorDocMicros", + durationCount<Microseconds>(statValue)); + } + + parent.append("stepDurations", stepDurationsBuilder.obj()); +} + +void SingleTransactionCoordinatorStats::reportLastClient(BSONObjBuilder& parent) const { + parent.append("client", _lastClientInfo.clientHostAndPort); + parent.append("host", getHostNameCachedAndPort()); + parent.append("connectionId", _lastClientInfo.connectionId); + parent.append("appName", _lastClientInfo.appName); + parent.append("clientMetadata", _lastClientInfo.clientMetadata); +} + } // namespace mongo diff --git a/src/mongo/db/s/single_transaction_coordinator_stats.h b/src/mongo/db/s/single_transaction_coordinator_stats.h index 05c75d7c076..1856bf9b054 100644 --- a/src/mongo/db/s/single_transaction_coordinator_stats.h +++ b/src/mongo/db/s/single_transaction_coordinator_stats.h @@ -29,6 +29,8 @@ #pragma once +#include "mongo/db/client.h" +#include "mongo/rpc/metadata/client_metadata_ismaster.h" #include "mongo/util/tick_source.h" #include "mongo/util/time_support.h" @@ -43,6 +45,25 @@ class SingleTransactionCoordinatorStats { public: SingleTransactionCoordinatorStats() = default; + struct LastClientInfo { + std::string clientHostAndPort; + long long connectionId; + BSONObj clientMetadata; + std::string appName; + + void update(Client* client) { + if (client->hasRemote()) { + clientHostAndPort = client->getRemote().toString(); + } + connectionId = client->getConnectionId(); + if (const auto& metadata = + ClientMetadataIsMasterState::get(client).getClientMetadata()) { + clientMetadata = metadata.get().getDocument(); + appName = metadata.get().getApplicationName().toString(); + } + } + }; + // // Setters // @@ -238,6 +259,29 @@ public: Microseconds getDeletingCoordinatorDocDuration(TickSource* tickSource, TickSource::Tick curTick) const; + /** + * Reports the time duration for each step in the two-phase commit and stores them as a + * sub-document of the provided parent BSONObjBuilder. The metrics are stored under key + * "stepDurations" in the parent document. + */ + void reportMetrics(BSONObjBuilder& parent, + TickSource* tickSource, + TickSource::Tick curTick) const; + + /** + * Reports information about the last client to interact with this transaction. + */ + void reportLastClient(BSONObjBuilder& parent) const; + + /** + * Updates the LastClientInfo object stored in this SingleTransactionStats instance with the + * given Client's information. + */ + void updateLastClientInfo(Client* client) { + invariant(client); + _lastClientInfo.update(client); + } + private: Date_t _createWallClockTime; TickSource::Tick _createTime{0}; @@ -261,6 +305,8 @@ private: Date_t _endWallClockTime; TickSource::Tick _endTime{0}; + + LastClientInfo _lastClientInfo; }; } // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp index 87371bb2394..427ea581a05 100644 --- a/src/mongo/db/s/transaction_coordinator.cpp +++ b/src/mongo/db/s/transaction_coordinator.cpp @@ -86,18 +86,19 @@ ExecutorFuture<void> waitForMajorityWithHangFailpoint(ServiceContext* service, } // namespace -TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext, +TransactionCoordinator::TransactionCoordinator(OperationContext* operationContext, const LogicalSessionId& lsid, TxnNumber txnNumber, std::unique_ptr<txn::AsyncWorkScheduler> scheduler, Date_t deadline) - : _serviceContext(serviceContext), + : _serviceContext(operationContext->getServiceContext()), _lsid(lsid), _txnNumber(txnNumber), _scheduler(std::move(scheduler)), _sendPrepareScheduler(_scheduler->makeChildScheduler()), _transactionCoordinatorMetricsObserver( - std::make_unique<TransactionCoordinatorMetricsObserver>()) { + std::make_unique<TransactionCoordinatorMetricsObserver>()), + _deadline(deadline) { auto kickOffCommitPF = makePromiseFuture<void>(); _kickOffCommitPromise = std::move(kickOffCommitPF.promise); @@ -123,6 +124,7 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext, }); // TODO: The duration will be meaningless after failover. + _updateAssociatedClient(operationContext->getClient()); _transactionCoordinatorMetricsObserver->onCreate( ServerTransactionCoordinatorsMetrics::get(_serviceContext), _serviceContext->getTickSource(), @@ -326,10 +328,11 @@ TransactionCoordinator::~TransactionCoordinator() { invariant(_completionPromise.getFuture().isReady()); } -void TransactionCoordinator::runCommit(std::vector<ShardId> participants) { +void TransactionCoordinator::runCommit(OperationContext* opCtx, std::vector<ShardId> participants) { if (!_reserveKickOffCommitPromise()) return; - + invariant(opCtx != nullptr && opCtx->getClient() != nullptr); + _updateAssociatedClient(opCtx->getClient()); _participants = std::move(participants); _kickOffCommitPromise.emplaceValue(); } @@ -486,4 +489,65 @@ std::string TransactionCoordinator::_twoPhaseCommitInfoForLog( return s.str(); } +TransactionCoordinator::Step TransactionCoordinator::getStep() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _step; +} + +void TransactionCoordinator::reportState(BSONObjBuilder& parent) const { + BSONObjBuilder doc; + TickSource* tickSource = _serviceContext->getTickSource(); + TickSource::Tick currentTick = tickSource->getTicks(); + + stdx::lock_guard<stdx::mutex> lk(_mutex); + + BSONObjBuilder lsidBuilder(doc.subobjStart("lsid")); + _lsid.serialize(&lsidBuilder); + lsidBuilder.doneFast(); + doc.append("txnNumber", _txnNumber); + + if (_participants) { + doc.append("numParticipants", static_cast<long long>(_participants->size())); + } + + doc.append("state", toString(_step)); + + const auto& singleStats = + _transactionCoordinatorMetricsObserver->getSingleTransactionCoordinatorStats(); + singleStats.reportMetrics(doc, tickSource, currentTick); + singleStats.reportLastClient(parent); + + if (_decision) + doc.append("decision", _decision->toBSON()); + + doc.append("deadline", _deadline); + + parent.append("desc", "transaction coordinator"); + parent.append("twoPhaseCommitCoordinator", doc.obj()); +} + +std::string TransactionCoordinator::toString(Step step) const { + switch (step) { + case Step::kInactive: + return "inactive"; + case Step::kWritingParticipantList: + return "writingParticipantList"; + case Step::kWaitingForVotes: + return "waitingForVotes"; + case Step::kWritingDecision: + return "writingDecision"; + case Step::kWaitingForDecisionAcks: + return "waitingForDecisionAck"; + case Step::kDeletingCoordinatorDoc: + return "deletingCoordinatorDoc"; + default: + MONGO_UNREACHABLE; + } +} + +void TransactionCoordinator::_updateAssociatedClient(Client* client) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _transactionCoordinatorMetricsObserver->updateLastClientInfo(client); +} + } // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator.h b/src/mongo/db/s/transaction_coordinator.h index 1aa811a038e..12005613f89 100644 --- a/src/mongo/db/s/transaction_coordinator.h +++ b/src/mongo/db/s/transaction_coordinator.h @@ -71,7 +71,7 @@ public: * cause the coordinator to be put in a cancelled state, if runCommit is not eventually * received. */ - TransactionCoordinator(ServiceContext* serviceContext, + TransactionCoordinator(OperationContext* operationContext, const LogicalSessionId& lsid, TxnNumber txnNumber, std::unique_ptr<txn::AsyncWorkScheduler> scheduler, @@ -85,7 +85,7 @@ public: * * Subsequent calls will not re-run the commit process. */ - void runCommit(std::vector<ShardId> participantShards); + void runCommit(OperationContext* opCtx, std::vector<ShardId> participantShards); /** * To be used to continue coordinating a transaction on step up. @@ -124,7 +124,15 @@ public: return *_transactionCoordinatorMetricsObserver; } + void reportState(BSONObjBuilder& parent) const; + std::string toString(Step step) const; + + Step getStep() const; + + private: + void _updateAssociatedClient(Client* client); + bool _reserveKickOffCommitPromise(); /** @@ -190,9 +198,12 @@ private: // onCompletion. SharedPromise<txn::CommitDecision> _completionPromise; - // Store as unique_ptr to avoid a circular dependency between the TransactionCoordinator and the - // TransactionCoordinatorMetricsObserver. + // Store as unique_ptr to avoid a circular dependency between the TransactionCoordinator and + // the TransactionCoordinatorMetricsObserver. std::unique_ptr<TransactionCoordinatorMetricsObserver> _transactionCoordinatorMetricsObserver; + + // The deadline for the TransactionCoordinator to reach a decision + Date_t _deadline; }; } // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_catalog.cpp b/src/mongo/db/s/transaction_coordinator_catalog.cpp index 3cb7308a8bc..5a5c029833b 100644 --- a/src/mongo/db/s/transaction_coordinator_catalog.cpp +++ b/src/mongo/db/s/transaction_coordinator_catalog.cpp @@ -218,4 +218,20 @@ std::string TransactionCoordinatorCatalog::_toString(WithLock wl) const { return ss.str(); } +void TransactionCoordinatorCatalog::filter(FilterPredicate predicate, FilterVisitor visitor) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + for (auto sessionIt = _coordinatorsBySession.begin(); sessionIt != _coordinatorsBySession.end(); + ++sessionIt) { + auto& lsid = sessionIt->first; + auto& coordinatorsByTxnNumber = sessionIt->second; + for (auto txnIt = coordinatorsByTxnNumber.begin(); txnIt != coordinatorsByTxnNumber.end(); + ++txnIt) { + auto txnNumber = txnIt->first; + auto& transactionCoordinator = txnIt->second; + if (predicate(lsid, txnNumber, transactionCoordinator)) { + visitor(lsid, txnNumber, transactionCoordinator); + } + } + } +} } // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_catalog.h b/src/mongo/db/s/transaction_coordinator_catalog.h index 0ae1c59e00b..5768c69bb3c 100644 --- a/src/mongo/db/s/transaction_coordinator_catalog.h +++ b/src/mongo/db/s/transaction_coordinator_catalog.h @@ -104,6 +104,17 @@ public: */ std::string toString() const; + using FilterPredicate = + std::function<bool(const LogicalSessionId lsid, + const TxnNumber txnNumber, + const std::shared_ptr<TransactionCoordinator> transactionCoordinator)>; + using FilterVisitor = + std::function<void(const LogicalSessionId lsid, + const TxnNumber txnNumber, + const std::shared_ptr<TransactionCoordinator> transactionCoordinator)>; + + void filter(FilterPredicate predicate, FilterVisitor visitor); + private: // Map of transaction coordinators, ordered in decreasing transaction number with the most // recent transaction at the front diff --git a/src/mongo/db/s/transaction_coordinator_catalog_test.cpp b/src/mongo/db/s/transaction_coordinator_catalog_test.cpp index 09e949ef9fe..891191ab220 100644 --- a/src/mongo/db/s/transaction_coordinator_catalog_test.cpp +++ b/src/mongo/db/s/transaction_coordinator_catalog_test.cpp @@ -57,7 +57,7 @@ protected: LogicalSessionId lsid, TxnNumber txnNumber) { auto newCoordinator = std::make_shared<TransactionCoordinator>( - getServiceContext(), + operationContext(), lsid, txnNumber, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), @@ -172,7 +172,7 @@ TEST_F(TransactionCoordinatorCatalogTest, StepDownBeforeCoordinatorInsertedIntoC TransactionCoordinatorCatalog catalog; catalog.exitStepUp(Status::OK()); - auto coordinator = std::make_shared<TransactionCoordinator>(getServiceContext(), + auto coordinator = std::make_shared<TransactionCoordinator>(operationContext(), lsid, txnNumber, aws.makeChildScheduler(), diff --git a/src/mongo/db/s/transaction_coordinator_curop.cpp b/src/mongo/db/s/transaction_coordinator_curop.cpp new file mode 100644 index 00000000000..29df19804c0 --- /dev/null +++ b/src/mongo/db/s/transaction_coordinator_curop.cpp @@ -0,0 +1,33 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#include "mongo/db/s/transaction_coordinator_curop.h" + +namespace mongo { +MONGO_DEFINE_SHIM(reportCurrentOpsForTransactionCoordinators); +} // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_curop.h b/src/mongo/db/s/transaction_coordinator_curop.h new file mode 100644 index 00000000000..835a969c757 --- /dev/null +++ b/src/mongo/db/s/transaction_coordinator_curop.h @@ -0,0 +1,36 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#pragma once + +#include "mongo/db/pipeline/mongos_process_interface.h" + +namespace mongo { +extern MONGO_DECLARE_SHIM((OperationContext * opCtx, bool includeIdle, std::vector<BSONObj>* ops) + ->void) reportCurrentOpsForTransactionCoordinators; +} // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_curop_mongod.cpp b/src/mongo/db/s/transaction_coordinator_curop_mongod.cpp new file mode 100644 index 00000000000..47104250272 --- /dev/null +++ b/src/mongo/db/s/transaction_coordinator_curop_mongod.cpp @@ -0,0 +1,38 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#include "mongo/db/s/transaction_coordinator_curop.h" + +#include "mongo/db/s/transaction_coordinator_service.h" + +namespace mongo { +MONGO_REGISTER_SHIM(reportCurrentOpsForTransactionCoordinators) +(OperationContext* opCtx, bool includeIdle, std::vector<BSONObj>* ops)->void { + TransactionCoordinatorService::get(opCtx)->reportCoordinators(opCtx, includeIdle, ops); +} +} // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_metrics_observer.cpp b/src/mongo/db/s/transaction_coordinator_metrics_observer.cpp index c1fafbca96c..42b82236430 100644 --- a/src/mongo/db/s/transaction_coordinator_metrics_observer.cpp +++ b/src/mongo/db/s/transaction_coordinator_metrics_observer.cpp @@ -195,4 +195,7 @@ void TransactionCoordinatorMetricsObserver::_decrementLastStep( } } +void TransactionCoordinatorMetricsObserver::updateLastClientInfo(Client* client) { + _singleTransactionCoordinatorStats.updateLastClientInfo(client); +} } // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_metrics_observer.h b/src/mongo/db/s/transaction_coordinator_metrics_observer.h index de32c9e5705..9d73e64f6c7 100644 --- a/src/mongo/db/s/transaction_coordinator_metrics_observer.h +++ b/src/mongo/db/s/transaction_coordinator_metrics_observer.h @@ -112,6 +112,12 @@ public: return _singleTransactionCoordinatorStats; } + + /** + * Save information about the last client that interacted with this transaction. + */ + void updateLastClientInfo(Client* client); + private: /** * Decrements the current active in 'step'. diff --git a/src/mongo/db/s/transaction_coordinator_service.cpp b/src/mongo/db/s/transaction_coordinator_service.cpp index 7109c0cadf6..a8e72285cd6 100644 --- a/src/mongo/db/s/transaction_coordinator_service.cpp +++ b/src/mongo/db/s/transaction_coordinator_service.cpp @@ -78,14 +78,47 @@ void TransactionCoordinatorService::createCoordinator(OperationContext* opCtx, latestCoordinator->cancelIfCommitNotYetStarted(); } - catalog.insert(opCtx, - lsid, - txnNumber, - std::make_shared<TransactionCoordinator>(opCtx->getServiceContext(), - lsid, - txnNumber, - scheduler.makeChildScheduler(), - commitDeadline)); + auto coordinator = std::make_shared<TransactionCoordinator>( + opCtx, lsid, txnNumber, scheduler.makeChildScheduler(), commitDeadline); + + catalog.insert(opCtx, lsid, txnNumber, coordinator); +} + + +void TransactionCoordinatorService::reportCoordinators(OperationContext* opCtx, + bool includeIdle, + std::vector<BSONObj>* ops) { + std::shared_ptr<CatalogAndScheduler> cas; + try { + cas = _getCatalogAndScheduler(opCtx); + } catch (ExceptionFor<ErrorCodes::NotMaster>&) { + // If we are not master, don't include any output for transaction coordinators in + // the curOp command. + return; + } + + auto& catalog = cas->catalog; + + auto predicate = + [includeIdle](const LogicalSessionId lsid, + const TxnNumber txnNumber, + const std::shared_ptr<TransactionCoordinator> transactionCoordinator) { + TransactionCoordinator::Step step = transactionCoordinator->getStep(); + if (includeIdle || step > TransactionCoordinator::Step::kInactive) { + return true; + } + return false; + }; + + auto reporter = [ops](const LogicalSessionId lsid, + const TxnNumber txnNumber, + const std::shared_ptr<TransactionCoordinator> transactionCoordinator) { + BSONObjBuilder doc; + transactionCoordinator->reportState(doc); + ops->push_back(doc.obj()); + }; + + catalog.filter(predicate, reporter); } boost::optional<SharedSemiFuture<txn::CommitDecision>> @@ -101,7 +134,8 @@ TransactionCoordinatorService::coordinateCommit(OperationContext* opCtx, return boost::none; } - coordinator->runCommit(std::vector<ShardId>{participantList.begin(), participantList.end()}); + coordinator->runCommit(opCtx, + std::vector<ShardId>{participantList.begin(), participantList.end()}); return coordinator->onCompletion(); @@ -180,7 +214,7 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx, const auto txnNumber = *doc.getId().getTxnNumber(); auto coordinator = std::make_shared<TransactionCoordinator>( - service, + opCtx, lsid, txnNumber, scheduler.makeChildScheduler(), diff --git a/src/mongo/db/s/transaction_coordinator_service.h b/src/mongo/db/s/transaction_coordinator_service.h index a4e5fafa1d9..c200809744f 100644 --- a/src/mongo/db/s/transaction_coordinator_service.h +++ b/src/mongo/db/s/transaction_coordinator_service.h @@ -58,6 +58,12 @@ public: Date_t commitDeadline); /** + * Outputs a vector of BSON documents to the ops out-param containing information about active + * and idle coordinators in the system. + */ + void reportCoordinators(OperationContext* opCtx, bool includeIdle, std::vector<BSONObj>* ops); + + /** * If a coordinator for the (lsid, txnNumber) exists, delivers the participant list to the * coordinator, which will cause the coordinator to start coordinating the commit if the * coordinator had not yet received a list, and returns a Future that will contain the decision diff --git a/src/mongo/db/s/transaction_coordinator_test.cpp b/src/mongo/db/s/transaction_coordinator_test.cpp index 86d830341cb..d85c0d7e15a 100644 --- a/src/mongo/db/s/transaction_coordinator_test.cpp +++ b/src/mongo/db/s/transaction_coordinator_test.cpp @@ -809,12 +809,12 @@ using TransactionCoordinatorTest = TransactionCoordinatorTestBase; TEST_F(TransactionCoordinatorTest, RunCommitProducesCommitDecisionOnTwoCommitResponses) { TransactionCoordinator coordinator( - getServiceContext(), + operationContext(), _lsid, _txnNumber, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), Date_t::max()); - coordinator.runCommit(kTwoShardIdList); + coordinator.runCommit(operationContext(), kTwoShardIdList); auto commitDecisionFuture = coordinator.getDecision(); assertPrepareSentAndRespondWithSuccess(); @@ -831,12 +831,12 @@ TEST_F(TransactionCoordinatorTest, RunCommitProducesCommitDecisionOnTwoCommitRes TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnAbortAndCommitResponses) { TransactionCoordinator coordinator( - getServiceContext(), + operationContext(), _lsid, _txnNumber, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), Date_t::max()); - coordinator.runCommit(kTwoShardIdList); + coordinator.runCommit(operationContext(), kTwoShardIdList); auto commitDecisionFuture = coordinator.getDecision(); onCommands({[&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; }, @@ -853,12 +853,12 @@ TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnAbortAndCommi TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnCommitAndAbortResponses) { TransactionCoordinator coordinator( - getServiceContext(), + operationContext(), _lsid, _txnNumber, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), Date_t::max()); - coordinator.runCommit(kTwoShardIdList); + coordinator.runCommit(operationContext(), kTwoShardIdList); auto commitDecisionFuture = coordinator.getDecision(); onCommands({[&](const executor::RemoteCommandRequest& request) { return kPrepareOk; }, @@ -875,12 +875,12 @@ TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnCommitAndAbor TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnSingleAbortResponseOnly) { TransactionCoordinator coordinator( - getServiceContext(), + operationContext(), _lsid, _txnNumber, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), Date_t::max()); - coordinator.runCommit(kTwoShardIdList); + coordinator.runCommit(operationContext(), kTwoShardIdList); auto commitDecisionFuture = coordinator.getDecision(); assertPrepareSentAndRespondWithNoSuchTransaction(); @@ -898,12 +898,12 @@ TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnSingleAbortRe TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnOneCommitResponseAndOneAbortResponseAfterRetry) { TransactionCoordinator coordinator( - getServiceContext(), + operationContext(), _lsid, _txnNumber, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), Date_t::max()); - coordinator.runCommit(kTwoShardIdList); + coordinator.runCommit(operationContext(), kTwoShardIdList); auto commitDecisionFuture = coordinator.getDecision(); // One participant votes commit and other encounters retryable error @@ -926,12 +926,12 @@ TEST_F(TransactionCoordinatorTest, TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnOneAbortResponseAndOneRetryableAbortResponse) { TransactionCoordinator coordinator( - getServiceContext(), + operationContext(), _lsid, _txnNumber, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), Date_t::max()); - coordinator.runCommit(kTwoShardIdList); + coordinator.runCommit(operationContext(), kTwoShardIdList); auto commitDecisionFuture = coordinator.getDecision(); // One participant votes abort and other encounters retryable error @@ -951,12 +951,12 @@ TEST_F(TransactionCoordinatorTest, TEST_F(TransactionCoordinatorTest, RunCommitProducesCommitDecisionOnCommitAfterMultipleNetworkRetries) { TransactionCoordinator coordinator( - getServiceContext(), + operationContext(), _lsid, _txnNumber, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), Date_t::max()); - coordinator.runCommit(kTwoShardIdList); + coordinator.runCommit(operationContext(), kTwoShardIdList); auto commitDecisionFuture = coordinator.getDecision(); // One participant votes commit after retry. @@ -983,12 +983,12 @@ TEST_F(TransactionCoordinatorTest, TEST_F(TransactionCoordinatorTest, RunCommitProducesReadConcernMajorityNotEnabledIfEitherShardReturnsIt) { TransactionCoordinator coordinator( - getServiceContext(), + operationContext(), _lsid, _txnNumber, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), Date_t::max()); - coordinator.runCommit(kTwoShardIdList); + coordinator.runCommit(operationContext(), kTwoShardIdList); auto commitDecisionFuture = coordinator.getDecision(); // One participant votes commit and other encounters retryable error @@ -1188,6 +1188,28 @@ public: serverStatusSection.getObjectField("currentInSteps")["deletingCoordinatorDoc"].Long()); } + static void assertClientReportStateFields(BSONObj doc, std::string appName, int connectionId) { + ASSERT_EQ(StringData(doc.getStringField("appName")), appName); + ASSERT_EQ(doc.getIntField("connectionId"), connectionId); + + auto expectedDriverName = std::string("DriverName").insert(0, appName); + auto expectedDriverVersion = std::string("DriverVersion").insert(0, appName); + auto expectedOsType = std::string("OsType").insert(0, appName); + auto expectedOsName = std::string("OsName").insert(0, appName); + auto expectedOsArch = std::string("OsArchitecture").insert(0, appName); + auto expectedOsVersion = std::string("OsVersion").insert(0, appName); + + ASSERT_TRUE(doc.hasField("clientMetadata")); + auto driver = doc.getObjectField("clientMetadata").getObjectField("driver"); + ASSERT_EQ(StringData(driver.getStringField("name")), expectedDriverName); + ASSERT_EQ(StringData(driver.getStringField("version")), expectedDriverVersion); + auto os = doc.getObjectField("clientMetadata").getObjectField("os"); + ASSERT_EQ(StringData(os.getStringField("type")), expectedOsType); + ASSERT_EQ(StringData(os.getStringField("name")), expectedOsName); + ASSERT_EQ(StringData(os.getStringField("architecture")), expectedOsArch); + ASSERT_EQ(StringData(os.getStringField("version")), expectedOsVersion); + } + Date_t advanceClockSourceAndReturnNewNow() { const auto newNow = Date_t::now(); clockSource()->reset(newNow); @@ -1198,13 +1220,13 @@ public: startCapturingLogMessages(); TransactionCoordinator coordinator( - getServiceContext(), + operationContext(), _lsid, _txnNumber, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), Date_t::max()); - coordinator.runCommit(kTwoShardIdList); + coordinator.runCommit(operationContext(), kTwoShardIdList); assertPrepareSentAndRespondWithSuccess(); assertPrepareSentAndRespondWithSuccess(); @@ -1498,7 +1520,7 @@ TEST_F(TransactionCoordinatorMetricsTest, SimpleTwoPhaseCommitRealCoordinator) { expectedMetrics.totalCreated++; TransactionCoordinator coordinator( - getServiceContext(), + operationContext(), _lsid, _txnNumber, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), @@ -1524,7 +1546,7 @@ TEST_F(TransactionCoordinatorMetricsTest, SimpleTwoPhaseCommitRealCoordinator) { BSON("mode" << "alwaysOn" << "data" << BSON("useUninterruptibleSleep" << 1))); - coordinator.runCommit(kTwoShardIdList); + coordinator.runCommit(operationContext(), kTwoShardIdList); waitUntilCoordinatorDocIsPresent(); checkStats(stats, expectedStats); @@ -1672,7 +1694,7 @@ TEST_F(TransactionCoordinatorMetricsTest, CoordinatorIsCanceledWhileInactive) { expectedMetrics.totalCreated++; TransactionCoordinator coordinator( - getServiceContext(), + operationContext(), _lsid, _txnNumber, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), @@ -1719,7 +1741,7 @@ TEST_F(TransactionCoordinatorMetricsTest, CoordinatorsAWSIsShutDownWhileCoordina auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()); auto awsPtr = aws.get(); TransactionCoordinator coordinator( - getServiceContext(), _lsid, _txnNumber, std::move(aws), Date_t::max()); + operationContext(), _lsid, _txnNumber, std::move(aws), Date_t::max()); const auto& stats = coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats(); @@ -1762,7 +1784,7 @@ TEST_F(TransactionCoordinatorMetricsTest, auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()); TransactionCoordinator coordinator( - getServiceContext(), _lsid, _txnNumber, std::move(aws), Date_t::max()); + operationContext(), _lsid, _txnNumber, std::move(aws), Date_t::max()); const auto& stats = coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats(); @@ -1779,7 +1801,7 @@ TEST_F(TransactionCoordinatorMetricsTest, expectedMetrics.currentWritingParticipantList++; FailPointEnableBlock fp("hangBeforeWaitingForParticipantListWriteConcern"); - coordinator.runCommit(kTwoShardIdList); + coordinator.runCommit(operationContext(), kTwoShardIdList); waitUntilCoordinatorDocIsPresent(); checkStats(stats, expectedStats); @@ -1824,7 +1846,7 @@ TEST_F(TransactionCoordinatorMetricsTest, auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()); auto awsPtr = aws.get(); TransactionCoordinator coordinator( - getServiceContext(), _lsid, _txnNumber, std::move(aws), Date_t::max()); + operationContext(), _lsid, _txnNumber, std::move(aws), Date_t::max()); const auto& stats = coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats(); @@ -1840,7 +1862,7 @@ TEST_F(TransactionCoordinatorMetricsTest, expectedMetrics.totalStartedTwoPhaseCommit++; expectedMetrics.currentWaitingForVotes++; - coordinator.runCommit(kTwoShardIdList); + coordinator.runCommit(operationContext(), kTwoShardIdList); waitUntilMessageSent(); checkStats(stats, expectedStats); @@ -1888,7 +1910,7 @@ TEST_F(TransactionCoordinatorMetricsTest, auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()); TransactionCoordinator coordinator( - getServiceContext(), _lsid, _txnNumber, std::move(aws), Date_t::max()); + operationContext(), _lsid, _txnNumber, std::move(aws), Date_t::max()); const auto& stats = coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats(); @@ -1906,7 +1928,7 @@ TEST_F(TransactionCoordinatorMetricsTest, FailPointEnableBlock fp("hangBeforeWaitingForDecisionWriteConcern"); - coordinator.runCommit(kTwoShardIdList); + coordinator.runCommit(operationContext(), kTwoShardIdList); // Respond to the second prepare request in a separate thread, because the coordinator will // hijack that thread to run its continuation. assertPrepareSentAndRespondWithSuccess(); @@ -1956,7 +1978,7 @@ TEST_F(TransactionCoordinatorMetricsTest, auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()); auto awsPtr = aws.get(); TransactionCoordinator coordinator( - getServiceContext(), _lsid, _txnNumber, std::move(aws), Date_t::max()); + operationContext(), _lsid, _txnNumber, std::move(aws), Date_t::max()); const auto& stats = coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats(); @@ -1972,7 +1994,7 @@ TEST_F(TransactionCoordinatorMetricsTest, expectedMetrics.totalStartedTwoPhaseCommit++; expectedMetrics.currentWaitingForDecisionAcks++; - coordinator.runCommit(kTwoShardIdList); + coordinator.runCommit(operationContext(), kTwoShardIdList); // Respond to the second prepare request in a separate thread, because the coordinator will // hijack that thread to run its continuation. assertPrepareSentAndRespondWithSuccess(); @@ -2026,7 +2048,7 @@ TEST_F(TransactionCoordinatorMetricsTest, CoordinatorsAWSIsShutDownWhileCoordina auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()); auto awsPtr = aws.get(); TransactionCoordinator coordinator( - getServiceContext(), _lsid, _txnNumber, std::move(aws), Date_t::max()); + operationContext(), _lsid, _txnNumber, std::move(aws), Date_t::max()); const auto& stats = coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats(); @@ -2044,7 +2066,7 @@ TEST_F(TransactionCoordinatorMetricsTest, CoordinatorsAWSIsShutDownWhileCoordina FailPointEnableBlock fp("hangAfterDeletingCoordinatorDoc"); - coordinator.runCommit(kTwoShardIdList); + coordinator.runCommit(operationContext(), kTwoShardIdList); // Respond to the second prepare request in a separate thread, because the coordinator will // hijack that thread to run its continuation. assertPrepareSentAndRespondWithSuccess(); @@ -2109,13 +2131,13 @@ TEST_F(TransactionCoordinatorMetricsTest, DoesNotLogTransactionsUnderSlowMSThres startCapturingLogMessages(); TransactionCoordinator coordinator( - getServiceContext(), + operationContext(), _lsid, _txnNumber, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), Date_t::max()); - coordinator.runCommit(kTwoShardIdList); + coordinator.runCommit(operationContext(), kTwoShardIdList); tickSource()->advance(Milliseconds(99)); @@ -2141,7 +2163,7 @@ TEST_F( startCapturingLogMessages(); TransactionCoordinator coordinator( - getServiceContext(), + operationContext(), _lsid, _txnNumber, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), @@ -2149,7 +2171,7 @@ TEST_F( tickSource()->advance(Milliseconds(101)); - coordinator.runCommit(kTwoShardIdList); + coordinator.runCommit(operationContext(), kTwoShardIdList); assertPrepareSentAndRespondWithSuccess(); assertPrepareSentAndRespondWithSuccess(); @@ -2171,13 +2193,13 @@ TEST_F(TransactionCoordinatorMetricsTest, LogsTransactionsOverSlowMSThreshold) { startCapturingLogMessages(); TransactionCoordinator coordinator( - getServiceContext(), + operationContext(), _lsid, _txnNumber, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), Date_t::max()); - coordinator.runCommit(kTwoShardIdList); + coordinator.runCommit(operationContext(), kTwoShardIdList); tickSource()->advance(Milliseconds(101)); @@ -2213,13 +2235,13 @@ TEST_F(TransactionCoordinatorMetricsTest, SlowLogLineIncludesTerminationCauseFor startCapturingLogMessages(); TransactionCoordinator coordinator( - getServiceContext(), + operationContext(), _lsid, _txnNumber, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), Date_t::max()); - coordinator.runCommit(kTwoShardIdList); + coordinator.runCommit(operationContext(), kTwoShardIdList); assertPrepareSentAndRespondWithSuccess(); assertPrepareSentAndRespondWithNoSuchTransaction(); @@ -2243,7 +2265,7 @@ TEST_F(TransactionCoordinatorMetricsTest, SlowLogLineIncludesStepDurationsAndTot startCapturingLogMessages(); TransactionCoordinator coordinator( - getServiceContext(), + operationContext(), _lsid, _txnNumber, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), @@ -2253,7 +2275,7 @@ TEST_F(TransactionCoordinatorMetricsTest, SlowLogLineIncludesStepDurationsAndTot FailPointEnableBlock fp("hangBeforeWaitingForParticipantListWriteConcern", BSON("useUninterruptibleSleep" << 1)); - coordinator.runCommit(kTwoShardIdList); + coordinator.runCommit(operationContext(), kTwoShardIdList); waitUntilCoordinatorDocIsPresent(); // Increase the duration spent writing the participant list. @@ -2358,5 +2380,45 @@ TEST_F(TransactionCoordinatorMetricsTest, checkServerStatus(); } +TEST_F(TransactionCoordinatorMetricsTest, ClientInformationIncludedInReportState) { + const auto expectedAppName = std::string("Foo"); + associateClientMetadata(getClient(), expectedAppName); + + TransactionCoordinator coordinator( + operationContext(), + _lsid, + _txnNumber, + std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), + Date_t::max()); + + { + BSONObjBuilder builder; + coordinator.reportState(builder); + BSONObj reportDoc = builder.obj(); + ASSERT_EQ(StringData(reportDoc.getStringField("desc")), "transaction coordinator"); + assertClientReportStateFields(reportDoc, expectedAppName, getClient()->getConnectionId()); + } + + const auto expectedAppName2 = std::string("Bar"); + associateClientMetadata(getClient(), expectedAppName2); + + coordinator.runCommit(operationContext(), kTwoShardIdList); + + { + BSONObjBuilder builder; + coordinator.reportState(builder); + BSONObj reportDoc = builder.obj(); + ASSERT_EQ(StringData(reportDoc.getStringField("desc")), "transaction coordinator"); + assertClientReportStateFields(reportDoc, expectedAppName2, getClient()->getConnectionId()); + } + + assertPrepareSentAndRespondWithSuccess(); + assertPrepareSentAndRespondWithSuccess(); + + assertCommitSentAndRespondWithSuccess(); + assertCommitSentAndRespondWithSuccess(); + + coordinator.onCompletion().get(); +} } // namespace } // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_test_fixture.cpp b/src/mongo/db/s/transaction_coordinator_test_fixture.cpp index 6554461dd97..2891f938e85 100644 --- a/src/mongo/db/s/transaction_coordinator_test_fixture.cpp +++ b/src/mongo/db/s/transaction_coordinator_test_fixture.cpp @@ -38,6 +38,8 @@ #include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h" #include "mongo/db/operation_context.h" #include "mongo/db/s/wait_for_majority_service.h" +#include "mongo/rpc/metadata/client_metadata.h" +#include "mongo/rpc/metadata/client_metadata_ismaster.h" #include "mongo/s/catalog/sharding_catalog_client_mock.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/unittest/unittest.h" @@ -123,4 +125,20 @@ void TransactionCoordinatorTestFixture::advanceClockAndExecuteScheduledTasks() { network()->advanceTime(network()->now() + Seconds{1}); } +void TransactionCoordinatorTestFixture::associateClientMetadata(Client* client, + std::string appName) { + BSONObjBuilder metadataBuilder; + ASSERT_OK(ClientMetadata::serializePrivate(std::string("DriverName").insert(0, appName), + std::string("DriverVersion").insert(0, appName), + std::string("OsType").insert(0, appName), + std::string("OsName").insert(0, appName), + std::string("OsArchitecture").insert(0, appName), + std::string("OsVersion").insert(0, appName), + appName, + &metadataBuilder)); + auto clientMetadata = metadataBuilder.obj(); + auto clientMetadataParse = ClientMetadata::parse(clientMetadata["client"]); + ClientMetadataIsMasterState::setClientMetadata(client, + std::move(clientMetadataParse.getValue())); +} } // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_test_fixture.h b/src/mongo/db/s/transaction_coordinator_test_fixture.h index 5f1067a9320..03763d3e2a1 100644 --- a/src/mongo/db/s/transaction_coordinator_test_fixture.h +++ b/src/mongo/db/s/transaction_coordinator_test_fixture.h @@ -66,6 +66,13 @@ protected: */ void advanceClockAndExecuteScheduledTasks(); + + /** + * Associates metatadata with the provided client. Metadata fields have appName prepended to + * thier value. + */ + static void associateClientMetadata(Client* client, std::string appName); + const std::vector<ShardId> kTwoShardIdList{{"s1"}, {"s2"}}; const std::set<ShardId> kTwoShardIdSet{{"s1"}, {"s2"}}; const std::vector<ShardId> kThreeShardIdList{{"s1"}, {"s2"}, {"s3"}}; diff --git a/src/mongo/embedded/SConscript b/src/mongo/embedded/SConscript index 5e1c75c9a2e..41650b01a4e 100644 --- a/src/mongo/embedded/SConscript +++ b/src/mongo/embedded/SConscript @@ -79,6 +79,7 @@ env.Library( 'service_entry_point_embedded.cpp', 'transaction_coordinator_factory_embedded.cpp', 'transaction_coordinator_worker_curop_repository_embedded.cpp', + 'transaction_coordinator_curop_embedded.cpp', env.Idlc('embedded_options.idl')[0], ], LIBDEPS=[ diff --git a/src/mongo/embedded/transaction_coordinator_curop_embedded.cpp b/src/mongo/embedded/transaction_coordinator_curop_embedded.cpp new file mode 100644 index 00000000000..ecec687a3e7 --- /dev/null +++ b/src/mongo/embedded/transaction_coordinator_curop_embedded.cpp @@ -0,0 +1,34 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#include "mongo/db/s/transaction_coordinator_curop.h" + +namespace mongo { +MONGO_REGISTER_SHIM(reportCurrentOpsForTransactionCoordinators) +(OperationContext* opCtx, bool includeIdle, std::vector<BSONObj>* ops)->void {} +} // namespace mongo |