diff options
author | Lamont Nelson <lamont.nelson@mongodb.com> | 2019-08-22 17:05:21 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-08-22 17:05:21 +0000 |
commit | 65bf2734c67f8c2cc74df24df2c3b11cff71f9c0 (patch) | |
tree | cdbf0c046e98144137b21b41448728c3c074a2c8 /src/mongo | |
parent | 283e8966d1b24559963c460591b1cb845e4c6f9a (diff) | |
download | mongo-65bf2734c67f8c2cc74df24df2c3b11cff71f9c0.tar.gz |
SERVER-40987 Output transaction information in curOp for coordinator worker
threads.
Diffstat (limited to 'src/mongo')
15 files changed, 488 insertions, 32 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 25efa885ef3..c41118f6cf3 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -531,5 +531,5 @@ env.CppUnitTest( 'pipeline', 'process_interface_shardsvr', 'process_interface_standalone', - ], + ] ) diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp index ac6198dd79f..899cc6b81b2 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.cpp +++ b/src/mongo/db/pipeline/process_interface_standalone.cpp @@ -51,6 +51,7 @@ #include "mongo/db/repl/speculative_majority_read_info.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/sharding_state.h" +#include "mongo/db/s/transaction_coordinator_worker_curop_info.h" #include "mongo/db/session_catalog.h" #include "mongo/db/session_catalog_mongod.h" #include "mongo/db/stats/fill_locker_info.h" @@ -599,6 +600,10 @@ BSONObj MongoInterfaceStandalone::_reportCurrentOpForClient( fillLockerInfo(*lockerInfo, builder); } + if (auto tcWorkerRepo = getTransactionCoordinatorWorkerCurOpRepository()) { + tcWorkerRepo->reportState(clientOpCtx, &builder); + } + auto flowControlStats = clientOpCtx->lockState()->getFlowControlStats(); flowControlStats.writeToBuilder(builder); } diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 336afc36aa8..9f370363f73 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -19,6 +19,7 @@ env.Library( 'sharding_migration_critical_section.cpp', 'sharding_state.cpp', 'transaction_coordinator_factory.cpp', + 'transaction_coordinator_worker_curop_repository.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/base', @@ -132,6 +133,7 @@ env.Library( 'transaction_coordinator_service.cpp', 'transaction_coordinator_structures.cpp', 'transaction_coordinator_util.cpp', + 'transaction_coordinator_worker_curop_repository_mongod.cpp', 'transaction_coordinator.cpp', 'wait_for_majority_service.cpp', env.Idlc('transaction_coordinator_document.idl')[0], diff --git a/src/mongo/db/s/transaction_coordinator_futures_util.cpp b/src/mongo/db/s/transaction_coordinator_futures_util.cpp index 79128137b6b..59ae621bc59 100644 --- a/src/mongo/db/s/transaction_coordinator_futures_util.cpp +++ b/src/mongo/db/s/transaction_coordinator_futures_util.cpp @@ -74,7 +74,10 @@ AsyncWorkScheduler::~AsyncWorkScheduler() { } Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemoteCommand( - const ShardId& shardId, const ReadPreferenceSetting& readPref, const BSONObj& commandObj) { + const ShardId& shardId, + const ReadPreferenceSetting& readPref, + const BSONObj& commandObj, + OperationContextFn operationContextFn) { const bool isSelfShard = (shardId == getLocalShardId(_serviceContext)); @@ -83,8 +86,10 @@ Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemot // rather than going through the host targeting below. This ensures that the state changes // for the participant and coordinator occur sequentially on a single branch of replica set // history. See SERVER-38142 for details. - return scheduleWork([this, shardId, commandObj = commandObj.getOwned()]( + return scheduleWork([this, shardId, operationContextFn, commandObj = commandObj.getOwned()]( OperationContext* opCtx) { + operationContextFn(opCtx); + // Note: This internal authorization is tied to the lifetime of the client, which will // be destroyed by 'scheduleWork' immediately after this lambda ends AuthorizationSession::get(opCtx->getClient()) @@ -113,7 +118,7 @@ Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemot }); } - return _targetHostAsync(shardId, readPref) + return _targetHostAsync(shardId, readPref, operationContextFn) .then([this, shardId, commandObj = commandObj.getOwned(), readPref]( HostAndShard hostAndShard) mutable { executor::RemoteCommandRequest request(hostAndShard.hostTargeted, @@ -218,8 +223,11 @@ void AsyncWorkScheduler::join() { } Future<AsyncWorkScheduler::HostAndShard> AsyncWorkScheduler::_targetHostAsync( - const ShardId& shardId, const ReadPreferenceSetting& readPref) { - return scheduleWork([shardId, readPref](OperationContext* opCtx) { + const ShardId& shardId, + const ReadPreferenceSetting& readPref, + OperationContextFn operationContextFn) { + return scheduleWork([shardId, readPref, operationContextFn](OperationContext* opCtx) { + operationContextFn(opCtx); const auto shardRegistry = Grid::get(opCtx)->shardRegistry(); const auto shard = uassertStatusOK(shardRegistry->getShard(opCtx, shardId)); diff --git a/src/mongo/db/s/transaction_coordinator_futures_util.h b/src/mongo/db/s/transaction_coordinator_futures_util.h index 7aef1fc8e78..eb769319aad 100644 --- a/src/mongo/db/s/transaction_coordinator_futures_util.h +++ b/src/mongo/db/s/transaction_coordinator_futures_util.h @@ -42,6 +42,9 @@ #include "mongo/util/time_support.h" namespace mongo { + +using OperationContextFn = std::function<void(OperationContext*)>; + namespace txn { /** @@ -131,7 +134,10 @@ public: * completes (with error or not). */ Future<executor::TaskExecutor::ResponseStatus> scheduleRemoteCommand( - const ShardId& shardId, const ReadPreferenceSetting& readPref, const BSONObj& commandObj); + const ShardId& shardId, + const ReadPreferenceSetting& readPref, + const BSONObj& commandObj, + OperationContextFn operationContextFn = [](OperationContext*) {}); /** * Allows sub-tasks on this scheduler to be grouped together and works-around the fact that @@ -176,7 +182,9 @@ private: * Finds the host and port for a shard id, returning it and the shard object used for targeting. */ Future<HostAndShard> _targetHostAsync(const ShardId& shardId, - const ReadPreferenceSetting& readPref); + const ReadPreferenceSetting& readPref, + OperationContextFn operationContextFn = + [](OperationContext*) {}); /** * Returns true when all the registered child schedulers, op contexts and handles have joined. diff --git a/src/mongo/db/s/transaction_coordinator_util.cpp b/src/mongo/db/s/transaction_coordinator_util.cpp index dbffc60de1d..237ec82525e 100644 --- a/src/mongo/db/s/transaction_coordinator_util.cpp +++ b/src/mongo/db/s/transaction_coordinator_util.cpp @@ -36,9 +36,12 @@ #include "mongo/client/remote_command_retry_scheduler.h" #include "mongo/db/commands/txn_cmds_gen.h" #include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h" +#include "mongo/db/curop.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/ops/write_ops.h" #include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/s/transaction_coordinator_futures_util.h" +#include "mongo/db/s/transaction_coordinator_worker_curop_repository.h" #include "mongo/db/write_concern.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/util/fail_point_service.h" @@ -48,13 +51,16 @@ namespace mongo { namespace txn { namespace { -MONGO_FAIL_POINT_DEFINE(hangAfterDeletingCoordinatorDoc); - MONGO_FAIL_POINT_DEFINE(hangBeforeWritingParticipantList); +MONGO_FAIL_POINT_DEFINE(hangBeforeSendingPrepare); MONGO_FAIL_POINT_DEFINE(hangBeforeWritingDecision); +MONGO_FAIL_POINT_DEFINE(hangBeforeSendingCommit); +MONGO_FAIL_POINT_DEFINE(hangBeforeSendingAbort); MONGO_FAIL_POINT_DEFINE(hangBeforeDeletingCoordinatorDoc); +MONGO_FAIL_POINT_DEFINE(hangAfterDeletingCoordinatorDoc); using ResponseStatus = executor::TaskExecutor::ResponseStatus; +using CoordinatorAction = TransactionCoordinatorWorkerCurOpRepository::CoordinatorAction; const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max()); @@ -177,6 +183,8 @@ Future<repl::OpTime> persistParticipantsList(txn::AsyncWorkScheduler& scheduler, [](const StatusWith<repl::OpTime>& s) { return shouldRetryPersistingCoordinatorState(s); }, [&scheduler, lsid, txnNumber, participants] { return scheduler.scheduleWork([lsid, txnNumber, participants](OperationContext* opCtx) { + getTransactionCoordinatorWorkerCurOpRepository()->set( + opCtx, lsid, txnNumber, CoordinatorAction::kWritingParticipantList); return persistParticipantListBlocking(opCtx, lsid, txnNumber, participants); }); }); @@ -226,9 +234,20 @@ Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service, // vector of responses. auto prepareScheduler = scheduler.makeChildScheduler(); + OperationContextFn operationContextFn = [lsid, txnNumber](OperationContext* opCtx) { + invariant(opCtx); + getTransactionCoordinatorWorkerCurOpRepository()->set( + opCtx, lsid, txnNumber, CoordinatorAction::kSendingPrepare); + + if (MONGO_FAIL_POINT(hangBeforeSendingPrepare)) { + LOG(0) << "Hit hangBeforeSendingPrepare failpoint"; + MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(opCtx, hangBeforeSendingPrepare); + } + }; + for (const auto& participant : participants) { - responses.emplace_back( - sendPrepareToShard(service, *prepareScheduler, participant, prepareObj)); + responses.emplace_back(sendPrepareToShard( + service, *prepareScheduler, participant, prepareObj, operationContextFn)); } // Asynchronously aggregate all prepare responses to find the decision and max prepare timestamp @@ -352,6 +371,8 @@ Future<repl::OpTime> persistDecision(txn::AsyncWorkScheduler& scheduler, [&scheduler, lsid, txnNumber, participants, decision] { return scheduler.scheduleWork( [lsid, txnNumber, participants, decision](OperationContext* opCtx) { + getTransactionCoordinatorWorkerCurOpRepository()->set( + opCtx, lsid, txnNumber, CoordinatorAction::kWritingDecision); return persistDecisionBlocking(opCtx, lsid, txnNumber, participants, decision); }); }); @@ -370,9 +391,21 @@ Future<void> sendCommit(ServiceContext* service, BSON("lsid" << lsid.toBSON() << "txnNumber" << txnNumber << "autocommit" << false << WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority)); + OperationContextFn operationContextFn = [lsid, txnNumber](OperationContext* opCtx) { + invariant(opCtx); + getTransactionCoordinatorWorkerCurOpRepository()->set( + opCtx, lsid, txnNumber, CoordinatorAction::kSendingCommit); + + if (MONGO_FAIL_POINT(hangBeforeSendingCommit)) { + LOG(0) << "Hit hangBeforeSendingCommit failpoint"; + MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(opCtx, hangBeforeSendingCommit); + } + }; + std::vector<Future<void>> responses; for (const auto& participant : participants) { - responses.push_back(sendDecisionToShard(service, scheduler, participant, commitObj)); + responses.push_back( + sendDecisionToShard(service, scheduler, participant, commitObj, operationContextFn)); } return txn::whenAll(responses); } @@ -388,9 +421,21 @@ Future<void> sendAbort(ServiceContext* service, BSON("lsid" << lsid.toBSON() << "txnNumber" << txnNumber << "autocommit" << false << WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority)); + OperationContextFn operationContextFn = [lsid, txnNumber](OperationContext* opCtx) { + invariant(opCtx); + getTransactionCoordinatorWorkerCurOpRepository()->set( + opCtx, lsid, txnNumber, CoordinatorAction::kSendingAbort); + + if (MONGO_FAIL_POINT(hangBeforeSendingAbort)) { + LOG(0) << "Hit hangBeforeSendingAbort failpoint"; + MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(opCtx, hangBeforeSendingAbort); + } + }; + std::vector<Future<void>> responses; for (const auto& participant : participants) { - responses.push_back(sendDecisionToShard(service, scheduler, participant, abortObj)); + responses.push_back( + sendDecisionToShard(service, scheduler, participant, abortObj, operationContextFn)); } return txn::whenAll(responses); } @@ -471,15 +516,17 @@ void deleteCoordinatorDocBlocking(OperationContext* opCtx, Future<void> deleteCoordinatorDoc(txn::AsyncWorkScheduler& scheduler, const LogicalSessionId& lsid, TxnNumber txnNumber) { - return txn::doWhile(scheduler, - boost::none /* no need for a backoff */, - [](const Status& s) { return s == ErrorCodes::Interrupted; }, - [&scheduler, lsid, txnNumber] { - return scheduler.scheduleWork( - [lsid, txnNumber](OperationContext* opCtx) { - deleteCoordinatorDocBlocking(opCtx, lsid, txnNumber); - }); - }); + return txn::doWhile( + scheduler, + boost::none /* no need for a backoff */, + [](const Status& s) { return s == ErrorCodes::Interrupted; }, + [&scheduler, lsid, txnNumber] { + return scheduler.scheduleWork([lsid, txnNumber](OperationContext* opCtx) { + getTransactionCoordinatorWorkerCurOpRepository()->set( + opCtx, lsid, txnNumber, CoordinatorAction::kDeletingCoordinatorDoc); + deleteCoordinatorDocBlocking(opCtx, lsid, txnNumber); + }); + }); } std::vector<TransactionCoordinatorDocument> readAllCoordinatorDocs(OperationContext* opCtx) { @@ -503,9 +550,9 @@ std::vector<TransactionCoordinatorDocument> readAllCoordinatorDocs(OperationCont Future<PrepareResponse> sendPrepareToShard(ServiceContext* service, txn::AsyncWorkScheduler& scheduler, const ShardId& shardId, - const BSONObj& commandObj) { + const BSONObj& commandObj, + OperationContextFn operationContextFn) { const bool isLocalShard = (shardId == txn::getLocalShardId(service)); - auto f = txn::doWhile( scheduler, kExponentialBackoff, @@ -516,11 +563,17 @@ Future<PrepareResponse> sendPrepareToShard(ServiceContext* service, swPrepareResponse != ErrorCodes::TransactionCoordinatorSteppingDown && swPrepareResponse != ErrorCodes::TransactionCoordinatorReachedAbortDecision; }, - [&scheduler, shardId, isLocalShard, commandObj = commandObj.getOwned()] { + [&scheduler, + shardId, + isLocalShard, + commandObj = commandObj.getOwned(), + operationContextFn] { LOG(3) << "Coordinator going to send command " << commandObj << " to " << (isLocalShard ? " local " : "") << " shard " << shardId; - return scheduler.scheduleRemoteCommand(shardId, kPrimaryReadPreference, commandObj) + return scheduler + .scheduleRemoteCommand( + shardId, kPrimaryReadPreference, commandObj, operationContextFn) .then([shardId, commandObj = commandObj.getOwned()](ResponseStatus response) { auto status = getStatusFromCommandResult(response.data); auto wcStatus = getWriteConcernStatusFromCommandResult(response.data); @@ -597,7 +650,8 @@ Future<PrepareResponse> sendPrepareToShard(ServiceContext* service, Future<void> sendDecisionToShard(ServiceContext* service, txn::AsyncWorkScheduler& scheduler, const ShardId& shardId, - const BSONObj& commandObj) { + const BSONObj& commandObj, + OperationContextFn operationContextFn) { const bool isLocalShard = (shardId == txn::getLocalShardId(service)); return txn::doWhile( @@ -608,11 +662,17 @@ Future<void> sendDecisionToShard(ServiceContext* service, // coordinator-specific code. return !s.isOK() && s != ErrorCodes::TransactionCoordinatorSteppingDown; }, - [&scheduler, shardId, isLocalShard, commandObj = commandObj.getOwned()] { + [&scheduler, + shardId, + isLocalShard, + operationContextFn, + commandObj = commandObj.getOwned()] { LOG(3) << "Coordinator going to send command " << commandObj << " to " << (isLocalShard ? "local" : "") << " shard " << shardId; - return scheduler.scheduleRemoteCommand(shardId, kPrimaryReadPreference, commandObj) + return scheduler + .scheduleRemoteCommand( + shardId, kPrimaryReadPreference, commandObj, operationContextFn) .then([shardId, commandObj = commandObj.getOwned()](ResponseStatus response) { auto status = getStatusFromCommandResult(response.data); auto wcStatus = getWriteConcernStatusFromCommandResult(response.data); diff --git a/src/mongo/db/s/transaction_coordinator_util.h b/src/mongo/db/s/transaction_coordinator_util.h index b83afda06e1..0b020c3dddc 100644 --- a/src/mongo/db/s/transaction_coordinator_util.h +++ b/src/mongo/db/s/transaction_coordinator_util.h @@ -205,7 +205,9 @@ struct PrepareResponse { Future<PrepareResponse> sendPrepareToShard(ServiceContext* service, txn::AsyncWorkScheduler& scheduler, const ShardId& shardId, - const BSONObj& prepareCommandObj); + const BSONObj& prepareCommandObj, + OperationContextFn operationContextFn = + [](OperationContext*) {}); /** * Sends a command corresponding to a commit decision (i.e. commitTransaction or* @@ -221,7 +223,8 @@ Future<PrepareResponse> sendPrepareToShard(ServiceContext* service, Future<void> sendDecisionToShard(ServiceContext* service, txn::AsyncWorkScheduler& scheduler, const ShardId& shardId, - const BSONObj& commandObj); + const BSONObj& commandObj, + OperationContextFn operationContextFn = [](OperationContext*) {}); } // namespace txn } // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_worker_curop_info.cpp b/src/mongo/db/s/transaction_coordinator_worker_curop_info.cpp new file mode 100644 index 00000000000..ead0c5e48ac --- /dev/null +++ b/src/mongo/db/s/transaction_coordinator_worker_curop_info.cpp @@ -0,0 +1,38 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/s/transaction_coordinator_worker_curop_info.h" + +#include "mongo/base/shim.h" + +namespace mongo { + +MONGO_DEFINE_SHIM(getTransactionCoordinatorWorkerCurOpRepository); + +} // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_worker_curop_info.h b/src/mongo/db/s/transaction_coordinator_worker_curop_info.h new file mode 100644 index 00000000000..461fed02ec2 --- /dev/null +++ b/src/mongo/db/s/transaction_coordinator_worker_curop_info.h @@ -0,0 +1,57 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#pragma once + +#include "mongo/db/s/transaction_coordinator_worker_curop_repository.h" + +#include "mongo/base/shim.h" +#include "mongo/db/curop.h" + +namespace mongo { +class TransactionCoordinatorWorkerCurOpInfo { + using CoordinatorAction = TransactionCoordinatorWorkerCurOpRepository::CoordinatorAction; + TransactionCoordinatorWorkerCurOpInfo() = delete; + +public: + TransactionCoordinatorWorkerCurOpInfo(LogicalSessionId lsid, + TxnNumber txnNumber, + Date_t startTime, + CoordinatorAction action); + + void reportState(BSONObjBuilder* parent) const; + +private: + static const std::string toString(CoordinatorAction action); + + LogicalSessionId _lsid; + TxnNumber _txnNumber; + Date_t _startTime; + CoordinatorAction _action; +}; +} // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_worker_curop_repository.cpp b/src/mongo/db/s/transaction_coordinator_worker_curop_repository.cpp new file mode 100644 index 00000000000..4c9e0d103b8 --- /dev/null +++ b/src/mongo/db/s/transaction_coordinator_worker_curop_repository.cpp @@ -0,0 +1,38 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/transaction_coordinator_worker_curop_repository.h" + +namespace mongo { + +MONGO_DEFINE_SHIM(getTransactionCoordinatorWorkerCurOpRepository); + +} // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_worker_curop_repository.h b/src/mongo/db/s/transaction_coordinator_worker_curop_repository.h new file mode 100644 index 00000000000..a19f72fea12 --- /dev/null +++ b/src/mongo/db/s/transaction_coordinator_worker_curop_repository.h @@ -0,0 +1,70 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#pragma once + +#include "mongo/base/shim.h" +#include "mongo/db/curop.h" + +namespace mongo { +class TransactionCoordinatorWorkerCurOpRepository; + +class TransactionCoordinatorWorkerCurOpRepository { +public: + TransactionCoordinatorWorkerCurOpRepository() {} + virtual ~TransactionCoordinatorWorkerCurOpRepository() {} + + enum class CoordinatorAction { + kWritingParticipantList, + kSendingPrepare, + kWritingDecision, + kSendingCommit, + kSendingAbort, + kDeletingCoordinatorDoc + }; + + /** + * Associates the two phase commit coordinator transaction information with the + * OperationContext instance. + * + * This method takes the associated client lock from the OperationContext. + */ + virtual void set(OperationContext* opCtx, + const LogicalSessionId& lsid, + const TxnNumber txnNumber, + const CoordinatorAction action) = 0; + + /** + * Output the state into BSON previously associated with this OperationContext instance. + */ + virtual void reportState(OperationContext* opCtx, BSONObjBuilder* parent) const = 0; +}; + +extern MONGO_DECLARE_SHIM(()->std::shared_ptr<TransactionCoordinatorWorkerCurOpRepository>) + getTransactionCoordinatorWorkerCurOpRepository; +} // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_worker_curop_repository_mongod.cpp b/src/mongo/db/s/transaction_coordinator_worker_curop_repository_mongod.cpp new file mode 100644 index 00000000000..a805e6f69f8 --- /dev/null +++ b/src/mongo/db/s/transaction_coordinator_worker_curop_repository_mongod.cpp @@ -0,0 +1,105 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#include "mongo/db/s/transaction_coordinator_worker_curop_info.h" + +#include "mongo/base/shim.h" + +namespace mongo { +const auto getTransactionCoordinatorWorkerCurOpInfo = + OperationContext::declareDecoration<boost::optional<TransactionCoordinatorWorkerCurOpInfo>>(); + +class MongoDTransactionCoordinatorWorkerCurOpRepository final + : public TransactionCoordinatorWorkerCurOpRepository { +public: + MongoDTransactionCoordinatorWorkerCurOpRepository() {} + + void set(OperationContext* opCtx, + const LogicalSessionId& lsid, + const TxnNumber txnNumber, + const CoordinatorAction action) override { + auto startTime = opCtx->getServiceContext()->getPreciseClockSource()->now(); + stdx::lock_guard<Client> lk(*opCtx->getClient()); + getTransactionCoordinatorWorkerCurOpInfo(opCtx).emplace( + TransactionCoordinatorWorkerCurOpInfo(lsid, txnNumber, startTime, action)); + } + + /** + * Caller should hold the client lock associated with the OperationContext. + */ + void reportState(OperationContext* opCtx, BSONObjBuilder* parent) const override { + if (auto info = getTransactionCoordinatorWorkerCurOpInfo(opCtx)) { + info->reportState(parent); + } + } +}; + +const auto _transactionCoordinatorWorkerCurOpRepository = + std::make_shared<MongoDTransactionCoordinatorWorkerCurOpRepository>(); + +MONGO_REGISTER_SHIM(getTransactionCoordinatorWorkerCurOpRepository) +()->std::shared_ptr<TransactionCoordinatorWorkerCurOpRepository> { + return _transactionCoordinatorWorkerCurOpRepository; +} + +TransactionCoordinatorWorkerCurOpInfo::TransactionCoordinatorWorkerCurOpInfo( + LogicalSessionId lsid, TxnNumber txnNumber, Date_t startTime, CoordinatorAction action) + : _lsid(lsid), _txnNumber(txnNumber), _startTime(startTime), _action(action) {} + + +const std::string TransactionCoordinatorWorkerCurOpInfo::toString(CoordinatorAction action) { + switch (action) { + case CoordinatorAction::kSendingPrepare: + return "sendingPrepare"; + case CoordinatorAction::kSendingCommit: + return "sendingCommit"; + case CoordinatorAction::kSendingAbort: + return "sendingAbort"; + case CoordinatorAction::kWritingParticipantList: + return "writingParticipantList"; + case CoordinatorAction::kWritingDecision: + return "writingDecision"; + case CoordinatorAction::kDeletingCoordinatorDoc: + return "deletingCoordinatorDoc"; + default: + MONGO_UNREACHABLE + } +} + +void TransactionCoordinatorWorkerCurOpInfo::reportState(BSONObjBuilder* parent) const { + invariant(parent); + BSONObjBuilder twoPhaseCoordinatorBuilder; + BSONObjBuilder lsidBuilder(twoPhaseCoordinatorBuilder.subobjStart("lsid")); + _lsid.serialize(&lsidBuilder); + lsidBuilder.doneFast(); + twoPhaseCoordinatorBuilder.append("txnNumber", _txnNumber); + twoPhaseCoordinatorBuilder.append("action", toString(_action)); + twoPhaseCoordinatorBuilder.append("startTime", dateToISOStringUTC(_startTime)); + parent->append("twoPhaseCommitCoordinator", twoPhaseCoordinatorBuilder.obj()); +} +} // namespace mongo diff --git a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp index 0f52b7a00b9..639b44e6e13 100644 --- a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp +++ b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp @@ -45,6 +45,7 @@ namespace mongo { namespace { +MONGO_FAIL_POINT_DEFINE(hangAfterStartingCoordinateCommit); MONGO_FAIL_POINT_DEFINE(participantReturnNetworkErrorForPrepareAfterExecutingPrepareLogic); class PrepareTransactionCmd : public TypedCommand<PrepareTransactionCmd> { @@ -233,6 +234,12 @@ public: *opCtx->getTxnNumber(), validateParticipants(opCtx, cmd.getParticipants())); + if (MONGO_FAIL_POINT(hangAfterStartingCoordinateCommit)) { + LOG(0) << "Hit hangAfterStartingCoordinateCommit failpoint"; + MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(opCtx, + hangAfterStartingCoordinateCommit); + } + ON_BLOCK_EXIT([opCtx] { // A decision will most likely have been written from a different OperationContext // (in all cases except the one where this command aborts the local participant), so diff --git a/src/mongo/embedded/SConscript b/src/mongo/embedded/SConscript index fffa90c17f2..5e1c75c9a2e 100644 --- a/src/mongo/embedded/SConscript +++ b/src/mongo/embedded/SConscript @@ -78,6 +78,7 @@ env.Library( 'replication_coordinator_embedded.cpp', 'service_entry_point_embedded.cpp', 'transaction_coordinator_factory_embedded.cpp', + 'transaction_coordinator_worker_curop_repository_embedded.cpp', env.Idlc('embedded_options.idl')[0], ], LIBDEPS=[ diff --git a/src/mongo/embedded/transaction_coordinator_worker_curop_repository_embedded.cpp b/src/mongo/embedded/transaction_coordinator_worker_curop_repository_embedded.cpp new file mode 100644 index 00000000000..a330a42c2d0 --- /dev/null +++ b/src/mongo/embedded/transaction_coordinator_worker_curop_repository_embedded.cpp @@ -0,0 +1,54 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#include "mongo/base/shim.h" + +#include "mongo/db/s/transaction_coordinator_worker_curop_repository.h" + +namespace mongo { +class NoOpTransactionCoordinatorWorkerCurOpRepository final + : public TransactionCoordinatorWorkerCurOpRepository { +public: + NoOpTransactionCoordinatorWorkerCurOpRepository() {} + + void set(OperationContext* opCtx, + const LogicalSessionId& lsid, + const TxnNumber txnNumber, + const CoordinatorAction action) override {} + + void reportState(OperationContext* opCtx, BSONObjBuilder* parent) const override {} +}; + +const auto _transactionCoordinatorWorkerCurOpRepository = + std::make_shared<NoOpTransactionCoordinatorWorkerCurOpRepository>(); + +MONGO_REGISTER_SHIM(getTransactionCoordinatorWorkerCurOpRepository) +()->std::shared_ptr<TransactionCoordinatorWorkerCurOpRepository> { + return _transactionCoordinatorWorkerCurOpRepository; +} +} // namespace mongo |