summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLamont Nelson <lamont.nelson@mongodb.com>2019-08-22 17:05:21 +0000
committerevergreen <evergreen@mongodb.com>2019-08-22 17:05:21 +0000
commit65bf2734c67f8c2cc74df24df2c3b11cff71f9c0 (patch)
treecdbf0c046e98144137b21b41448728c3c074a2c8
parent283e8966d1b24559963c460591b1cb845e4c6f9a (diff)
downloadmongo-65bf2734c67f8c2cc74df24df2c3b11cff71f9c0.tar.gz
SERVER-40987 Output transaction information in curOp for coordinator worker
threads.
-rw-r--r--jstests/noPassthrough/transaction_coordinator_curop_info.js177
-rw-r--r--src/mongo/db/pipeline/SConscript2
-rw-r--r--src/mongo/db/pipeline/process_interface_standalone.cpp5
-rw-r--r--src/mongo/db/s/SConscript2
-rw-r--r--src/mongo/db/s/transaction_coordinator_futures_util.cpp18
-rw-r--r--src/mongo/db/s/transaction_coordinator_futures_util.h12
-rw-r--r--src/mongo/db/s/transaction_coordinator_util.cpp104
-rw-r--r--src/mongo/db/s/transaction_coordinator_util.h7
-rw-r--r--src/mongo/db/s/transaction_coordinator_worker_curop_info.cpp38
-rw-r--r--src/mongo/db/s/transaction_coordinator_worker_curop_info.h57
-rw-r--r--src/mongo/db/s/transaction_coordinator_worker_curop_repository.cpp38
-rw-r--r--src/mongo/db/s/transaction_coordinator_worker_curop_repository.h70
-rw-r--r--src/mongo/db/s/transaction_coordinator_worker_curop_repository_mongod.cpp105
-rw-r--r--src/mongo/db/s/txn_two_phase_commit_cmds.cpp7
-rw-r--r--src/mongo/embedded/SConscript1
-rw-r--r--src/mongo/embedded/transaction_coordinator_worker_curop_repository_embedded.cpp54
16 files changed, 665 insertions, 32 deletions
diff --git a/jstests/noPassthrough/transaction_coordinator_curop_info.js b/jstests/noPassthrough/transaction_coordinator_curop_info.js
new file mode 100644
index 00000000000..7341009e8a7
--- /dev/null
+++ b/jstests/noPassthrough/transaction_coordinator_curop_info.js
@@ -0,0 +1,177 @@
+/**
+ * Tests that the transaction items in the 'twoPhaseCommitCoordinator' object in currentOp() are
+ * being tracked correctly.
+ * @tags: [uses_transactions, uses_prepare_transaction]
+ */
+
+(function() {
+'use strict';
+load('jstests/sharding/libs/sharded_transactions_helpers.js');
+
+/**
+ * Runs commitTransaction for the given lsid/txnNumber from a parallel shell connected to the
+ * mongos of 'st' (st.s.port), and returns whatever startParallelShell() returns. Callers in this
+ * file invoke that return value as a function to join the shell.
+ *
+ * The command is built as a string of shell source because that is what is handed to
+ * startParallelShell() here. If 'expectedError' is truthy the command is wrapped in
+ * assert.commandFailedWithCode() with that code; otherwise it is wrapped in
+ * assert.commandWorked().
+ */
+function commitTxn(st, lsid, txnNumber, expectedError = null) {
+ let cmd = "db.adminCommand({" +
+ "commitTransaction: 1," +
+ "lsid: " + tojson(lsid) + "," +
+ "txnNumber: NumberLong(" + txnNumber + ")," +
+ "stmtId: NumberInt(0)," +
+ "autocommit: false," +
+ "})";
+ if (expectedError) {
+ cmd = "assert.commandFailedWithCode(" + cmd + "," + String(expectedError) + ");";
+ } else {
+ cmd = "assert.commandWorked(" + cmd + ");";
+ }
+ return startParallelShell(cmd, st.s.port);
+}
+
+/**
+ * Waits until the log line "Hit <fpName> failpoint" has been seen 'fpCount' time(s), runs
+ * $currentOp through 'adminDB' restricted by the $match 'filter', turns the failpoint off on the
+ * coordinator shard, and returns the array of matching currentOp entries.
+ *
+ * Relies on 'adminDB' and 'coordinator', which are declared further down in the enclosing
+ * function; this is fine because the helper is only called after they have been initialized.
+ * waitForFailpoint() is not defined in this file (presumably it comes from the load()ed
+ * sharded_transactions_helpers.js -- confirm).
+ */
+function curOpAfterFailpoint(fpName, filter, fpCount = 1) {
+ // Must match the message the server logs when it reaches the failpoint.
+ const expectedLog = "Hit " + fpName + " failpoint";
+ jsTest.log(`waiting for failpoint '${fpName}' to appear in the log ${fpCount} time(s).`);
+ waitForFailpoint(expectedLog, fpCount);
+
+ jsTest.log(`Running curOp operation after '${fpName}' failpoint.`);
+ let result = adminDB.aggregate([{$currentOp: {}}, {$match: filter}]).toArray();
+
+ jsTest.log(`${result.length} matching curOp entries after '${fpName}':\n${tojson(result)}`);
+
+ // Turn the failpoint off so the coordinator can move on to its next step.
+ jsTest.log(`disable '${fpName}' failpoint.`);
+ assert.commandWorked(coordinator.adminCommand({
+ configureFailPoint: fpName,
+ mode: "off",
+ }));
+
+ return result;
+}
+
+/**
+ * Builds a $match filter for currentOp entries that carry a 'twoPhaseCommitCoordinator'
+ * sub-document for the given session and transaction number, are performing 'action' (a string
+ * such as "sendingPrepare"), are active, and have a 'startTime' field.
+ */
+function makeWorkerFilterWithAction(session, action, txnNumber) {
+ return {
+ active: true,
+ 'twoPhaseCommitCoordinator.lsid.id': session.getSessionId().id,
+ 'twoPhaseCommitCoordinator.txnNumber': NumberLong(txnNumber),
+ 'twoPhaseCommitCoordinator.action': action,
+ 'twoPhaseCommitCoordinator.startTime': {$exists: true}
+ };
+}
+
+/**
+ * Sets every failpoint named in the 'failPoints' array to mode "alwaysOn" on 'shard', asserting
+ * that each configureFailPoint command succeeds.
+ */
+function enableFailPoints(shard, failPoints) {
+ jsTest.log(`enabling the following failpoints: ${tojson(failPoints)}`);
+ failPoints.forEach(function(fpName) {
+ assert.commandWorked(shard.adminCommand({
+ configureFailPoint: fpName,
+ mode: "alwaysOn",
+ }));
+ });
+}
+
+/**
+ * Starts a transaction on 'session' and inserts two documents into 'collectionName' of the 'test'
+ * database: one with _id = -insertValue and one with _id = insertValue.
+ *
+ * NOTE(review): the local 'dbName' below holds the database handle returned by
+ * session.getDatabase(), not a name, and it shadows the string constant 'dbName' declared later
+ * in the enclosing function. A name such as 'sessionDB' would be less surprising.
+ */
+function startTransaction(session, collectionName, insertValue) {
+ const dbName = session.getDatabase('test');
+ jsTest.log(`Starting a new transaction on ${dbName}.${collectionName}`);
+ session.startTransaction();
+ // Insert on both sides of the {_id: 0} split point created during test setup, so that the
+ // transaction writes to both shards.
+ assert.commandWorked(dbName[collectionName].insert({_id: -1 * insertValue}));
+ assert.commandWorked(dbName[collectionName].insert({_id: insertValue}));
+}
+
+// Test setup: a two-shard cluster in which shard0 is referred to as the coordinator and shard1 as
+// the participant.
+const numShards = 2;
+const st = new ShardingTest({shards: numShards, config: 1});
+const dbName = "test";
+const collectionName = 'currentop_two_phase';
+const ns = dbName + "." + collectionName;
+const adminDB = st.s.getDB('admin');
+const coordinator = st.shard0;
+const participant = st.shard1;
+// Failpoints that are switched on for the coordinator shard below. Each one is switched off again
+// by curOpAfterFailpoint() once the test has inspected currentOp at that point.
+const failPoints = [
+ 'hangAfterStartingCoordinateCommit',
+ 'hangBeforeWritingParticipantList',
+ 'hangBeforeSendingPrepare',
+ 'hangBeforeWritingDecision',
+ 'hangBeforeSendingCommit',
+ 'hangBeforeDeletingCoordinatorDoc',
+ 'hangBeforeSendingAbort'
+];
+
+// Make the coordinator the primary shard for the database, shard the collection on _id, split it
+// at {_id: 0} and move the chunk containing {_id: 0} to the participant. Both shards then refresh
+// their routing information for the namespace.
+assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+assert.commandWorked(st.s.adminCommand({movePrimary: dbName, to: coordinator.shardName}));
+assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {_id: 1}}));
+assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 0}}));
+assert.commandWorked(st.s.adminCommand({moveChunk: ns, find: {_id: 0}, to: participant.shardName}));
+assert.commandWorked(coordinator.adminCommand({_flushRoutingTableCacheUpdates: ns}));
+assert.commandWorked(participant.adminCommand({_flushRoutingTableCacheUpdates: ns}));
+
+enableFailPoints(coordinator, failPoints);
+
+// Scenario 1: commit decision. The commit runs in a parallel shell while this thread walks through
+// the failpoints one at a time, checking currentOp at each stop and then releasing the failpoint.
+jsTest.log("Testing that coordinator threads show up in currentOp for a commit decision");
+{
+ let session = adminDB.getMongo().startSession();
+ startTransaction(session, collectionName, 1);
+ let txnNumber = session.getTxnNumber_forTesting();
+ let lsid = session.getSessionId();
+ let commitJoin = commitTxn(st, lsid, txnNumber);
+
+ // This filter matches the coordinateCommitTransaction command operation itself (fields under
+ // 'command'), not a 'twoPhaseCommitCoordinator' worker entry.
+ const coordinateCommitFilter = {
+ active: true,
+ 'command.coordinateCommitTransaction': 1,
+ 'command.lsid.id': session.getSessionId().id,
+ 'command.txnNumber': NumberLong(txnNumber),
+ 'command.coordinator': true,
+ 'command.autocommit': false
+ };
+ let createCoordinateCommitTxnOp =
+ curOpAfterFailpoint("hangAfterStartingCoordinateCommit", coordinateCommitFilter);
+ assert.eq(1, createCoordinateCommitTxnOp.length);
+
+ const writeParticipantFilter =
+ makeWorkerFilterWithAction(session, "writingParticipantList", txnNumber);
+ let writeParticipantOp =
+ curOpAfterFailpoint('hangBeforeWritingParticipantList', writeParticipantFilter);
+ assert.eq(1, writeParticipantOp.length);
+
+ // The failpoint is expected to be hit once per shard, with one matching currentOp entry each.
+ const sendPrepareFilter = makeWorkerFilterWithAction(session, "sendingPrepare", txnNumber);
+ let sendPrepareOp =
+ curOpAfterFailpoint('hangBeforeSendingPrepare', sendPrepareFilter, numShards);
+ assert.eq(numShards, sendPrepareOp.length);
+
+ const writingDecisionFilter = makeWorkerFilterWithAction(session, "writingDecision", txnNumber);
+ let writeDecisionOp = curOpAfterFailpoint('hangBeforeWritingDecision', writingDecisionFilter);
+ assert.eq(1, writeDecisionOp.length);
+
+ // As with prepare, one entry per shard is expected.
+ const sendCommitFilter = makeWorkerFilterWithAction(session, "sendingCommit", txnNumber);
+ let sendCommitOp = curOpAfterFailpoint('hangBeforeSendingCommit', sendCommitFilter, numShards);
+ assert.eq(numShards, sendCommitOp.length);
+
+ const deletingCoordinatorFilter =
+ makeWorkerFilterWithAction(session, "deletingCoordinatorDoc", txnNumber);
+ let deletingCoordinatorDocOp =
+ curOpAfterFailpoint('hangBeforeDeletingCoordinatorDoc', deletingCoordinatorFilter);
+ assert.eq(1, deletingCoordinatorDocOp.length);
+
+ // Wait for the parallel shell running the commit to finish.
+ commitJoin();
+}
+
+// Scenario 2: abort decision. Of the failpoints enabled during setup, only
+// 'hangBeforeSendingAbort' is still on at this point; the other six were switched off by
+// curOpAfterFailpoint() during the commit scenario.
+jsTest.log("Testing that coordinator threads show up in currentOp for an abort decision.");
+{
+ let session = adminDB.getMongo().startSession();
+ startTransaction(session, collectionName, 2);
+ let txnNumber = session.getTxnNumber_forTesting();
+ let lsid = session.getSessionId();
+ // Manually abort the transaction on one of the participants, so that the participant fails to
+ // prepare and the 'hangBeforeSendingAbort' failpoint is triggered on the coordinator.
+ assert.commandWorked(participant.adminCommand({
+ abortTransaction: 1,
+ lsid: lsid,
+ txnNumber: NumberLong(txnNumber),
+ stmtId: NumberInt(0),
+ autocommit: false,
+ }));
+ // The commit issued through mongos is expected to fail with NoSuchTransaction.
+ let commitJoin = commitTxn(st, lsid, txnNumber, ErrorCodes.NoSuchTransaction);
+
+ const sendAbortFilter = makeWorkerFilterWithAction(session, "sendingAbort", txnNumber);
+ let sendingAbortOp = curOpAfterFailpoint('hangBeforeSendingAbort', sendAbortFilter, numShards);
+ assert.eq(numShards, sendingAbortOp.length);
+
+ // Wait for the parallel shell running the commit to finish.
+ commitJoin();
+}
+
+st.stop();
+})();
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 25efa885ef3..c41118f6cf3 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -531,5 +531,5 @@ env.CppUnitTest(
'pipeline',
'process_interface_shardsvr',
'process_interface_standalone',
- ],
+ ]
)
diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index ac6198dd79f..899cc6b81b2 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -51,6 +51,7 @@
#include "mongo/db/repl/speculative_majority_read_info.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/s/transaction_coordinator_worker_curop_info.h"
#include "mongo/db/session_catalog.h"
#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/stats/fill_locker_info.h"
@@ -599,6 +600,10 @@ BSONObj MongoInterfaceStandalone::_reportCurrentOpForClient(
fillLockerInfo(*lockerInfo, builder);
}
+ if (auto tcWorkerRepo = getTransactionCoordinatorWorkerCurOpRepository()) {
+ tcWorkerRepo->reportState(clientOpCtx, &builder);
+ }
+
auto flowControlStats = clientOpCtx->lockState()->getFlowControlStats();
flowControlStats.writeToBuilder(builder);
}
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 336afc36aa8..9f370363f73 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -19,6 +19,7 @@ env.Library(
'sharding_migration_critical_section.cpp',
'sharding_state.cpp',
'transaction_coordinator_factory.cpp',
+ 'transaction_coordinator_worker_curop_repository.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
@@ -132,6 +133,7 @@ env.Library(
'transaction_coordinator_service.cpp',
'transaction_coordinator_structures.cpp',
'transaction_coordinator_util.cpp',
+ 'transaction_coordinator_worker_curop_repository_mongod.cpp',
'transaction_coordinator.cpp',
'wait_for_majority_service.cpp',
env.Idlc('transaction_coordinator_document.idl')[0],
diff --git a/src/mongo/db/s/transaction_coordinator_futures_util.cpp b/src/mongo/db/s/transaction_coordinator_futures_util.cpp
index 79128137b6b..59ae621bc59 100644
--- a/src/mongo/db/s/transaction_coordinator_futures_util.cpp
+++ b/src/mongo/db/s/transaction_coordinator_futures_util.cpp
@@ -74,7 +74,10 @@ AsyncWorkScheduler::~AsyncWorkScheduler() {
}
Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemoteCommand(
- const ShardId& shardId, const ReadPreferenceSetting& readPref, const BSONObj& commandObj) {
+ const ShardId& shardId,
+ const ReadPreferenceSetting& readPref,
+ const BSONObj& commandObj,
+ OperationContextFn operationContextFn) {
const bool isSelfShard = (shardId == getLocalShardId(_serviceContext));
@@ -83,8 +86,10 @@ Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemot
// rather than going through the host targeting below. This ensures that the state changes
// for the participant and coordinator occur sequentially on a single branch of replica set
// history. See SERVER-38142 for details.
- return scheduleWork([this, shardId, commandObj = commandObj.getOwned()](
+ return scheduleWork([this, shardId, operationContextFn, commandObj = commandObj.getOwned()](
OperationContext* opCtx) {
+ operationContextFn(opCtx);
+
// Note: This internal authorization is tied to the lifetime of the client, which will
// be destroyed by 'scheduleWork' immediately after this lambda ends
AuthorizationSession::get(opCtx->getClient())
@@ -113,7 +118,7 @@ Future<executor::TaskExecutor::ResponseStatus> AsyncWorkScheduler::scheduleRemot
});
}
- return _targetHostAsync(shardId, readPref)
+ return _targetHostAsync(shardId, readPref, operationContextFn)
.then([this, shardId, commandObj = commandObj.getOwned(), readPref](
HostAndShard hostAndShard) mutable {
executor::RemoteCommandRequest request(hostAndShard.hostTargeted,
@@ -218,8 +223,11 @@ void AsyncWorkScheduler::join() {
}
Future<AsyncWorkScheduler::HostAndShard> AsyncWorkScheduler::_targetHostAsync(
- const ShardId& shardId, const ReadPreferenceSetting& readPref) {
- return scheduleWork([shardId, readPref](OperationContext* opCtx) {
+ const ShardId& shardId,
+ const ReadPreferenceSetting& readPref,
+ OperationContextFn operationContextFn) {
+ return scheduleWork([shardId, readPref, operationContextFn](OperationContext* opCtx) {
+ operationContextFn(opCtx);
const auto shardRegistry = Grid::get(opCtx)->shardRegistry();
const auto shard = uassertStatusOK(shardRegistry->getShard(opCtx, shardId));
diff --git a/src/mongo/db/s/transaction_coordinator_futures_util.h b/src/mongo/db/s/transaction_coordinator_futures_util.h
index 7aef1fc8e78..eb769319aad 100644
--- a/src/mongo/db/s/transaction_coordinator_futures_util.h
+++ b/src/mongo/db/s/transaction_coordinator_futures_util.h
@@ -42,6 +42,9 @@
#include "mongo/util/time_support.h"
namespace mongo {
+
+using OperationContextFn = std::function<void(OperationContext*)>;
+
namespace txn {
/**
@@ -131,7 +134,10 @@ public:
* completes (with error or not).
*/
Future<executor::TaskExecutor::ResponseStatus> scheduleRemoteCommand(
- const ShardId& shardId, const ReadPreferenceSetting& readPref, const BSONObj& commandObj);
+ const ShardId& shardId,
+ const ReadPreferenceSetting& readPref,
+ const BSONObj& commandObj,
+ OperationContextFn operationContextFn = [](OperationContext*) {});
/**
* Allows sub-tasks on this scheduler to be grouped together and works-around the fact that
@@ -176,7 +182,9 @@ private:
* Finds the host and port for a shard id, returning it and the shard object used for targeting.
*/
Future<HostAndShard> _targetHostAsync(const ShardId& shardId,
- const ReadPreferenceSetting& readPref);
+ const ReadPreferenceSetting& readPref,
+ OperationContextFn operationContextFn =
+ [](OperationContext*) {});
/**
* Returns true when all the registered child schedulers, op contexts and handles have joined.
diff --git a/src/mongo/db/s/transaction_coordinator_util.cpp b/src/mongo/db/s/transaction_coordinator_util.cpp
index dbffc60de1d..237ec82525e 100644
--- a/src/mongo/db/s/transaction_coordinator_util.cpp
+++ b/src/mongo/db/s/transaction_coordinator_util.cpp
@@ -36,9 +36,12 @@
#include "mongo/client/remote_command_retry_scheduler.h"
#include "mongo/db/commands/txn_cmds_gen.h"
#include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h"
+#include "mongo/db/curop.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/s/transaction_coordinator_futures_util.h"
+#include "mongo/db/s/transaction_coordinator_worker_curop_repository.h"
#include "mongo/db/write_concern.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/util/fail_point_service.h"
@@ -48,13 +51,16 @@ namespace mongo {
namespace txn {
namespace {
-MONGO_FAIL_POINT_DEFINE(hangAfterDeletingCoordinatorDoc);
-
MONGO_FAIL_POINT_DEFINE(hangBeforeWritingParticipantList);
+MONGO_FAIL_POINT_DEFINE(hangBeforeSendingPrepare);
MONGO_FAIL_POINT_DEFINE(hangBeforeWritingDecision);
+MONGO_FAIL_POINT_DEFINE(hangBeforeSendingCommit);
+MONGO_FAIL_POINT_DEFINE(hangBeforeSendingAbort);
MONGO_FAIL_POINT_DEFINE(hangBeforeDeletingCoordinatorDoc);
+MONGO_FAIL_POINT_DEFINE(hangAfterDeletingCoordinatorDoc);
using ResponseStatus = executor::TaskExecutor::ResponseStatus;
+using CoordinatorAction = TransactionCoordinatorWorkerCurOpRepository::CoordinatorAction;
const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max());
@@ -177,6 +183,8 @@ Future<repl::OpTime> persistParticipantsList(txn::AsyncWorkScheduler& scheduler,
[](const StatusWith<repl::OpTime>& s) { return shouldRetryPersistingCoordinatorState(s); },
[&scheduler, lsid, txnNumber, participants] {
return scheduler.scheduleWork([lsid, txnNumber, participants](OperationContext* opCtx) {
+ getTransactionCoordinatorWorkerCurOpRepository()->set(
+ opCtx, lsid, txnNumber, CoordinatorAction::kWritingParticipantList);
return persistParticipantListBlocking(opCtx, lsid, txnNumber, participants);
});
});
@@ -226,9 +234,20 @@ Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service,
// vector of responses.
auto prepareScheduler = scheduler.makeChildScheduler();
+ OperationContextFn operationContextFn = [lsid, txnNumber](OperationContext* opCtx) {
+ invariant(opCtx);
+ getTransactionCoordinatorWorkerCurOpRepository()->set(
+ opCtx, lsid, txnNumber, CoordinatorAction::kSendingPrepare);
+
+ if (MONGO_FAIL_POINT(hangBeforeSendingPrepare)) {
+ LOG(0) << "Hit hangBeforeSendingPrepare failpoint";
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(opCtx, hangBeforeSendingPrepare);
+ }
+ };
+
for (const auto& participant : participants) {
- responses.emplace_back(
- sendPrepareToShard(service, *prepareScheduler, participant, prepareObj));
+ responses.emplace_back(sendPrepareToShard(
+ service, *prepareScheduler, participant, prepareObj, operationContextFn));
}
// Asynchronously aggregate all prepare responses to find the decision and max prepare timestamp
@@ -352,6 +371,8 @@ Future<repl::OpTime> persistDecision(txn::AsyncWorkScheduler& scheduler,
[&scheduler, lsid, txnNumber, participants, decision] {
return scheduler.scheduleWork(
[lsid, txnNumber, participants, decision](OperationContext* opCtx) {
+ getTransactionCoordinatorWorkerCurOpRepository()->set(
+ opCtx, lsid, txnNumber, CoordinatorAction::kWritingDecision);
return persistDecisionBlocking(opCtx, lsid, txnNumber, participants, decision);
});
});
@@ -370,9 +391,21 @@ Future<void> sendCommit(ServiceContext* service,
BSON("lsid" << lsid.toBSON() << "txnNumber" << txnNumber << "autocommit" << false
<< WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority));
+ OperationContextFn operationContextFn = [lsid, txnNumber](OperationContext* opCtx) {
+ invariant(opCtx);
+ getTransactionCoordinatorWorkerCurOpRepository()->set(
+ opCtx, lsid, txnNumber, CoordinatorAction::kSendingCommit);
+
+ if (MONGO_FAIL_POINT(hangBeforeSendingCommit)) {
+ LOG(0) << "Hit hangBeforeSendingCommit failpoint";
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(opCtx, hangBeforeSendingCommit);
+ }
+ };
+
std::vector<Future<void>> responses;
for (const auto& participant : participants) {
- responses.push_back(sendDecisionToShard(service, scheduler, participant, commitObj));
+ responses.push_back(
+ sendDecisionToShard(service, scheduler, participant, commitObj, operationContextFn));
}
return txn::whenAll(responses);
}
@@ -388,9 +421,21 @@ Future<void> sendAbort(ServiceContext* service,
BSON("lsid" << lsid.toBSON() << "txnNumber" << txnNumber << "autocommit" << false
<< WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority));
+ OperationContextFn operationContextFn = [lsid, txnNumber](OperationContext* opCtx) {
+ invariant(opCtx);
+ getTransactionCoordinatorWorkerCurOpRepository()->set(
+ opCtx, lsid, txnNumber, CoordinatorAction::kSendingAbort);
+
+ if (MONGO_FAIL_POINT(hangBeforeSendingAbort)) {
+ LOG(0) << "Hit hangBeforeSendingAbort failpoint";
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(opCtx, hangBeforeSendingAbort);
+ }
+ };
+
std::vector<Future<void>> responses;
for (const auto& participant : participants) {
- responses.push_back(sendDecisionToShard(service, scheduler, participant, abortObj));
+ responses.push_back(
+ sendDecisionToShard(service, scheduler, participant, abortObj, operationContextFn));
}
return txn::whenAll(responses);
}
@@ -471,15 +516,17 @@ void deleteCoordinatorDocBlocking(OperationContext* opCtx,
 Future<void> deleteCoordinatorDoc(txn::AsyncWorkScheduler& scheduler,
 const LogicalSessionId& lsid,
 TxnNumber txnNumber) {
- return txn::doWhile(scheduler,
- boost::none /* no need for a backoff */,
- [](const Status& s) { return s == ErrorCodes::Interrupted; },
- [&scheduler, lsid, txnNumber] {
- return scheduler.scheduleWork(
- [lsid, txnNumber](OperationContext* opCtx) {
- deleteCoordinatorDocBlocking(opCtx, lsid, txnNumber);
- });
- });
+ // The scheduled work is repeated for as long as it completes with ErrorCodes::Interrupted.
+ return txn::doWhile(
+ scheduler,
+ boost::none /* no need for a backoff */,
+ [](const Status& s) { return s == ErrorCodes::Interrupted; },
+ [&scheduler, lsid, txnNumber] {
+ return scheduler.scheduleWork([lsid, txnNumber](OperationContext* opCtx) {
+ // Record lsid/txnNumber/action for this worker's opCtx before doing the blocking
+ // delete, so that the step can be reported through the curOp repository.
+ // NOTE(review): the shim result is dereferenced without a null check here, while
+ // MongoInterfaceStandalone::_reportCurrentOpForClient (same change) guards it with
+ // 'if (auto tcWorkerRepo = ...)'. Confirm it can never be null on this path.
+ getTransactionCoordinatorWorkerCurOpRepository()->set(
+ opCtx, lsid, txnNumber, CoordinatorAction::kDeletingCoordinatorDoc);
+ deleteCoordinatorDocBlocking(opCtx, lsid, txnNumber);
+ });
+ });
 }
std::vector<TransactionCoordinatorDocument> readAllCoordinatorDocs(OperationContext* opCtx) {
@@ -503,9 +550,9 @@ std::vector<TransactionCoordinatorDocument> readAllCoordinatorDocs(OperationCont
Future<PrepareResponse> sendPrepareToShard(ServiceContext* service,
txn::AsyncWorkScheduler& scheduler,
const ShardId& shardId,
- const BSONObj& commandObj) {
+ const BSONObj& commandObj,
+ OperationContextFn operationContextFn) {
const bool isLocalShard = (shardId == txn::getLocalShardId(service));
-
auto f = txn::doWhile(
scheduler,
kExponentialBackoff,
@@ -516,11 +563,17 @@ Future<PrepareResponse> sendPrepareToShard(ServiceContext* service,
swPrepareResponse != ErrorCodes::TransactionCoordinatorSteppingDown &&
swPrepareResponse != ErrorCodes::TransactionCoordinatorReachedAbortDecision;
},
- [&scheduler, shardId, isLocalShard, commandObj = commandObj.getOwned()] {
+ [&scheduler,
+ shardId,
+ isLocalShard,
+ commandObj = commandObj.getOwned(),
+ operationContextFn] {
LOG(3) << "Coordinator going to send command " << commandObj << " to "
<< (isLocalShard ? " local " : "") << " shard " << shardId;
- return scheduler.scheduleRemoteCommand(shardId, kPrimaryReadPreference, commandObj)
+ return scheduler
+ .scheduleRemoteCommand(
+ shardId, kPrimaryReadPreference, commandObj, operationContextFn)
.then([shardId, commandObj = commandObj.getOwned()](ResponseStatus response) {
auto status = getStatusFromCommandResult(response.data);
auto wcStatus = getWriteConcernStatusFromCommandResult(response.data);
@@ -597,7 +650,8 @@ Future<PrepareResponse> sendPrepareToShard(ServiceContext* service,
Future<void> sendDecisionToShard(ServiceContext* service,
txn::AsyncWorkScheduler& scheduler,
const ShardId& shardId,
- const BSONObj& commandObj) {
+ const BSONObj& commandObj,
+ OperationContextFn operationContextFn) {
const bool isLocalShard = (shardId == txn::getLocalShardId(service));
return txn::doWhile(
@@ -608,11 +662,17 @@ Future<void> sendDecisionToShard(ServiceContext* service,
// coordinator-specific code.
return !s.isOK() && s != ErrorCodes::TransactionCoordinatorSteppingDown;
},
- [&scheduler, shardId, isLocalShard, commandObj = commandObj.getOwned()] {
+ [&scheduler,
+ shardId,
+ isLocalShard,
+ operationContextFn,
+ commandObj = commandObj.getOwned()] {
LOG(3) << "Coordinator going to send command " << commandObj << " to "
<< (isLocalShard ? "local" : "") << " shard " << shardId;
- return scheduler.scheduleRemoteCommand(shardId, kPrimaryReadPreference, commandObj)
+ return scheduler
+ .scheduleRemoteCommand(
+ shardId, kPrimaryReadPreference, commandObj, operationContextFn)
.then([shardId, commandObj = commandObj.getOwned()](ResponseStatus response) {
auto status = getStatusFromCommandResult(response.data);
auto wcStatus = getWriteConcernStatusFromCommandResult(response.data);
diff --git a/src/mongo/db/s/transaction_coordinator_util.h b/src/mongo/db/s/transaction_coordinator_util.h
index b83afda06e1..0b020c3dddc 100644
--- a/src/mongo/db/s/transaction_coordinator_util.h
+++ b/src/mongo/db/s/transaction_coordinator_util.h
@@ -205,7 +205,9 @@ struct PrepareResponse {
Future<PrepareResponse> sendPrepareToShard(ServiceContext* service,
txn::AsyncWorkScheduler& scheduler,
const ShardId& shardId,
- const BSONObj& prepareCommandObj);
+ const BSONObj& prepareCommandObj,
+ OperationContextFn operationContextFn =
+ [](OperationContext*) {});
/**
 * Sends a command corresponding to a commit decision (i.e. commitTransaction or
@@ -221,7 +223,8 @@ Future<PrepareResponse> sendPrepareToShard(ServiceContext* service,
Future<void> sendDecisionToShard(ServiceContext* service,
txn::AsyncWorkScheduler& scheduler,
const ShardId& shardId,
- const BSONObj& commandObj);
+ const BSONObj& commandObj,
+ OperationContextFn operationContextFn = [](OperationContext*) {});
} // namespace txn
} // namespace mongo
diff --git a/src/mongo/db/s/transaction_coordinator_worker_curop_info.cpp b/src/mongo/db/s/transaction_coordinator_worker_curop_info.cpp
new file mode 100644
index 00000000000..ead0c5e48ac
--- /dev/null
+++ b/src/mongo/db/s/transaction_coordinator_worker_curop_info.cpp
@@ -0,0 +1,38 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/s/transaction_coordinator_worker_curop_info.h"
+
+#include "mongo/base/shim.h"
+
+namespace mongo {
+
+// Defines the getTransactionCoordinatorWorkerCurOpRepository shim.
+// NOTE(review): transaction_coordinator_worker_curop_repository.cpp, added in this same change,
+// contains an identical MONGO_DEFINE_SHIM for the same name, and the SConscript hunks in this
+// change add that file but not this one. Confirm whether this translation unit is meant to be
+// built at all; if both were linked into one binary the shim would be defined twice.
+MONGO_DEFINE_SHIM(getTransactionCoordinatorWorkerCurOpRepository);
+
+} // namespace mongo
diff --git a/src/mongo/db/s/transaction_coordinator_worker_curop_info.h b/src/mongo/db/s/transaction_coordinator_worker_curop_info.h
new file mode 100644
index 00000000000..461fed02ec2
--- /dev/null
+++ b/src/mongo/db/s/transaction_coordinator_worker_curop_info.h
@@ -0,0 +1,57 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#pragma once
+
+#include "mongo/db/s/transaction_coordinator_worker_curop_repository.h"
+
+#include "mongo/base/shim.h"
+#include "mongo/db/curop.h"
+
+namespace mongo {
+// Value object describing what a transaction coordinator worker is doing: the logical session id
+// and transaction number it is working on, a start time, and the CoordinatorAction in progress.
+// NOTE(review): the member functions are defined elsewhere; presumably reportState() emits the
+// 'twoPhaseCommitCoordinator' fields (lsid, txnNumber, action, startTime) that the jstest added in
+// this change filters on -- confirm against the implementation.
+class TransactionCoordinatorWorkerCurOpInfo {
+ using CoordinatorAction = TransactionCoordinatorWorkerCurOpRepository::CoordinatorAction;
+ // NOTE(review): redundant -- declaring the four-argument constructor below already suppresses
+ // the implicit default constructor.
+ TransactionCoordinatorWorkerCurOpInfo() = delete;
+
+public:
+ // All four values are taken by value and stored in the members declared below.
+ TransactionCoordinatorWorkerCurOpInfo(LogicalSessionId lsid,
+ TxnNumber txnNumber,
+ Date_t startTime,
+ CoordinatorAction action);
+
+ // Reports this object's state through 'parent'; const, so it does not modify the object.
+ void reportState(BSONObjBuilder* parent) const;
+
+private:
+ // Converts a CoordinatorAction to a string.
+ // NOTE(review): returning 'const std::string' by value prevents the result from being moved
+ // from; plain 'std::string' is the usual choice. Changing it requires updating the definition
+ // as well.
+ static const std::string toString(CoordinatorAction action);
+
+ LogicalSessionId _lsid;
+ TxnNumber _txnNumber;
+ Date_t _startTime;
+ CoordinatorAction _action;
+};
+} // namespace mongo
diff --git a/src/mongo/db/s/transaction_coordinator_worker_curop_repository.cpp b/src/mongo/db/s/transaction_coordinator_worker_curop_repository.cpp
new file mode 100644
index 00000000000..4c9e0d103b8
--- /dev/null
+++ b/src/mongo/db/s/transaction_coordinator_worker_curop_repository.cpp
@@ -0,0 +1,38 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/transaction_coordinator_worker_curop_repository.h"
+
+namespace mongo {
+
+// Defines the getTransactionCoordinatorWorkerCurOpRepository shim. This change also adds
+// transaction_coordinator_worker_curop_repository_mongod.cpp and
+// transaction_coordinator_worker_curop_repository_embedded.cpp, which presumably supply the
+// mongod and embedded implementations behind it (their contents are not shown here).
+MONGO_DEFINE_SHIM(getTransactionCoordinatorWorkerCurOpRepository);
+
+} // namespace mongo
diff --git a/src/mongo/db/s/transaction_coordinator_worker_curop_repository.h b/src/mongo/db/s/transaction_coordinator_worker_curop_repository.h
new file mode 100644
index 00000000000..a19f72fea12
--- /dev/null
+++ b/src/mongo/db/s/transaction_coordinator_worker_curop_repository.h
@@ -0,0 +1,70 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#pragma once
+
+#include "mongo/base/shim.h"
+#include "mongo/db/curop.h"
+
+namespace mongo {
+/**
+ * Interface for recording which step of two-phase commit a transaction coordinator worker is
+ * performing on behalf of an OperationContext, and for reporting that information as BSON.
+ */
+class TransactionCoordinatorWorkerCurOpRepository {
+public:
+    TransactionCoordinatorWorkerCurOpRepository() = default;
+    virtual ~TransactionCoordinatorWorkerCurOpRepository() = default;
+
+    /**
+     * The coordinator steps that can be recorded through set().
+     */
+    enum class CoordinatorAction {
+        kWritingParticipantList,
+        kSendingPrepare,
+        kWritingDecision,
+        kSendingCommit,
+        kSendingAbort,
+        kDeletingCoordinatorDoc
+    };
+
+    /**
+     * Associates the two phase commit coordinator transaction information ('lsid', 'txnNumber'
+     * and the step currently being performed) with the OperationContext instance.
+     *
+     * This method takes the associated client lock from the OperationContext.
+     */
+    virtual void set(OperationContext* opCtx,
+                     const LogicalSessionId& lsid,
+                     const TxnNumber txnNumber,
+                     const CoordinatorAction action) = 0;
+
+    /**
+     * Appends to 'parent' the state that was previously associated with this OperationContext
+     * instance through set().
+     */
+    virtual void reportState(OperationContext* opCtx, BSONObjBuilder* parent) const = 0;
+};
+
+extern MONGO_DECLARE_SHIM(()->std::shared_ptr<TransactionCoordinatorWorkerCurOpRepository>)  // Shim returning a shared pointer to the repository registered via MONGO_REGISTER_SHIM.
+ getTransactionCoordinatorWorkerCurOpRepository;  // Call this to obtain the repository instead of naming a concrete implementation.
+} // namespace mongo
diff --git a/src/mongo/db/s/transaction_coordinator_worker_curop_repository_mongod.cpp b/src/mongo/db/s/transaction_coordinator_worker_curop_repository_mongod.cpp
new file mode 100644
index 00000000000..a805e6f69f8
--- /dev/null
+++ b/src/mongo/db/s/transaction_coordinator_worker_curop_repository_mongod.cpp
@@ -0,0 +1,105 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#include "mongo/db/s/transaction_coordinator_worker_curop_info.h"
+
+#include "mongo/base/shim.h"
+
+namespace mongo {
+const auto getTransactionCoordinatorWorkerCurOpInfo =  // OperationContext decoration holding the coordinator info, if any has been recorded.
+ OperationContext::declareDecoration<boost::optional<TransactionCoordinatorWorkerCurOpInfo>>();  // Filled by set() and read by reportState() below.
+
+/**
+ * Repository implementation that stores the coordinator worker's transaction information in an
+ * OperationContext decoration so that it can later be reported as BSON.
+ */
+class MongoDTransactionCoordinatorWorkerCurOpRepository final
+    : public TransactionCoordinatorWorkerCurOpRepository {
+public:
+    MongoDTransactionCoordinatorWorkerCurOpRepository() = default;
+
+    /**
+     * Records 'lsid', 'txnNumber' and 'action' on 'opCtx', together with the current time read
+     * from the service context's precise clock source. Information recorded by an earlier call
+     * is replaced.
+     *
+     * Takes the client lock associated with 'opCtx' while the decoration is updated.
+     */
+    void set(OperationContext* opCtx,
+             const LogicalSessionId& lsid,
+             const TxnNumber txnNumber,
+             const CoordinatorAction action) override {
+        // The clock is read before locking so that the lock only covers the update itself.
+        const auto startTime = opCtx->getServiceContext()->getPreciseClockSource()->now();
+        stdx::lock_guard<Client> lk(*opCtx->getClient());
+        // Construct the info directly inside the optional instead of building a temporary and
+        // moving it in.
+        getTransactionCoordinatorWorkerCurOpInfo(opCtx).emplace(
+            lsid, txnNumber, startTime, action);
+    }
+
+    /**
+     * Appends the recorded information to 'parent'; appends nothing when set() has not been
+     * called for 'opCtx'.
+     *
+     * Caller should hold the client lock associated with the OperationContext.
+     */
+    void reportState(OperationContext* opCtx, BSONObjBuilder* parent) const override {
+        // Bind by reference: copying the optional would duplicate the stored info on each report.
+        if (const auto& info = getTransactionCoordinatorWorkerCurOpInfo(opCtx)) {
+            info->reportState(parent);
+        }
+    }
+};
+
+const auto _transactionCoordinatorWorkerCurOpRepository =  // The single repository instance handed out by the shim below.
+ std::make_shared<MongoDTransactionCoordinatorWorkerCurOpRepository>();
+
+MONGO_REGISTER_SHIM(getTransactionCoordinatorWorkerCurOpRepository)  // Supplies the shim's implementation with the decoration-backed repository.
+()->std::shared_ptr<TransactionCoordinatorWorkerCurOpRepository> {
+ return _transactionCoordinatorWorkerCurOpRepository;  // Every caller shares ownership of the same instance.
+}
+
+/**
+ * Captures the session id, transaction number, start time and coordinator action exactly as
+ * given; no validation is performed.
+ */
+TransactionCoordinatorWorkerCurOpInfo::TransactionCoordinatorWorkerCurOpInfo(
+    LogicalSessionId lsid, TxnNumber txnNumber, Date_t startTime, CoordinatorAction action)
+    : _lsid(lsid),
+      _txnNumber(txnNumber),
+      _startTime(startTime),
+      _action(action) {}
+
+
+/**
+ * Returns the name that reportState() emits in the "action" field for 'action'.
+ *
+ * The switch deliberately has no 'default' label, so the compiler's switch-exhaustiveness
+ * warning flags any CoordinatorAction that is added without a name here. A value that matches
+ * no case falls out of the switch and trips MONGO_UNREACHABLE.
+ */
+const std::string TransactionCoordinatorWorkerCurOpInfo::toString(CoordinatorAction action) {
+    switch (action) {
+        case CoordinatorAction::kWritingParticipantList:
+            return "writingParticipantList";
+        case CoordinatorAction::kSendingPrepare:
+            return "sendingPrepare";
+        case CoordinatorAction::kWritingDecision:
+            return "writingDecision";
+        case CoordinatorAction::kSendingCommit:
+            return "sendingCommit";
+        case CoordinatorAction::kSendingAbort:
+            return "sendingAbort";
+        case CoordinatorAction::kDeletingCoordinatorDoc:
+            return "deletingCoordinatorDoc";
+    }
+    MONGO_UNREACHABLE;
+}
+
+/**
+ * Appends a "twoPhaseCommitCoordinator" sub-document to 'parent' with the fields "lsid",
+ * "txnNumber", "action" and "startTime" (the start time rendered as an ISO-8601 UTC string).
+ * 'parent' must not be null.
+ */
+void TransactionCoordinatorWorkerCurOpInfo::reportState(BSONObjBuilder* parent) const {
+    invariant(parent);
+
+    BSONObjBuilder coordinatorInfo;
+
+    // The session id is written as a nested object; finish it before adding further fields.
+    {
+        BSONObjBuilder sessionIdInfo(coordinatorInfo.subobjStart("lsid"));
+        _lsid.serialize(&sessionIdInfo);
+        sessionIdInfo.doneFast();
+    }
+
+    coordinatorInfo.append("txnNumber", _txnNumber);
+
+    const auto actionName = toString(_action);
+    coordinatorInfo.append("action", actionName);
+
+    const auto startTimeText = dateToISOStringUTC(_startTime);
+    coordinatorInfo.append("startTime", startTimeText);
+
+    parent->append("twoPhaseCommitCoordinator", coordinatorInfo.obj());
+}
+} // namespace mongo
diff --git a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
index 0f52b7a00b9..639b44e6e13 100644
--- a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
+++ b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
@@ -45,6 +45,7 @@
namespace mongo {
namespace {
+MONGO_FAIL_POINT_DEFINE(hangAfterStartingCoordinateCommit);
MONGO_FAIL_POINT_DEFINE(participantReturnNetworkErrorForPrepareAfterExecutingPrepareLogic);
class PrepareTransactionCmd : public TypedCommand<PrepareTransactionCmd> {
@@ -233,6 +234,12 @@ public:
*opCtx->getTxnNumber(),
validateParticipants(opCtx, cmd.getParticipants()));
+ if (MONGO_FAIL_POINT(hangAfterStartingCoordinateCommit)) {
+ LOG(0) << "Hit hangAfterStartingCoordinateCommit failpoint";
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(opCtx,
+ hangAfterStartingCoordinateCommit);
+ }
+
ON_BLOCK_EXIT([opCtx] {
// A decision will most likely have been written from a different OperationContext
// (in all cases except the one where this command aborts the local participant), so
diff --git a/src/mongo/embedded/SConscript b/src/mongo/embedded/SConscript
index fffa90c17f2..5e1c75c9a2e 100644
--- a/src/mongo/embedded/SConscript
+++ b/src/mongo/embedded/SConscript
@@ -78,6 +78,7 @@ env.Library(
'replication_coordinator_embedded.cpp',
'service_entry_point_embedded.cpp',
'transaction_coordinator_factory_embedded.cpp',
+ 'transaction_coordinator_worker_curop_repository_embedded.cpp',
env.Idlc('embedded_options.idl')[0],
],
LIBDEPS=[
diff --git a/src/mongo/embedded/transaction_coordinator_worker_curop_repository_embedded.cpp b/src/mongo/embedded/transaction_coordinator_worker_curop_repository_embedded.cpp
new file mode 100644
index 00000000000..a330a42c2d0
--- /dev/null
+++ b/src/mongo/embedded/transaction_coordinator_worker_curop_repository_embedded.cpp
@@ -0,0 +1,54 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#include "mongo/base/shim.h"
+
+#include "mongo/db/s/transaction_coordinator_worker_curop_repository.h"
+
+namespace mongo {
+/**
+ * Repository implementation that neither records nor reports anything: set() stores no state
+ * and reportState() leaves the builder untouched.
+ */
+class NoOpTransactionCoordinatorWorkerCurOpRepository final
+    : public TransactionCoordinatorWorkerCurOpRepository {
+public:
+    NoOpTransactionCoordinatorWorkerCurOpRepository() = default;
+
+    /** Ignores all arguments. */
+    void set(OperationContext* opCtx,
+             const LogicalSessionId& lsid,
+             const TxnNumber txnNumber,
+             const CoordinatorAction action) override {
+        // Intentionally empty.
+    }
+
+    /** Appends nothing to 'parent'. */
+    void reportState(OperationContext* opCtx, BSONObjBuilder* parent) const override {
+        // Intentionally empty.
+    }
+};
+
+const auto _transactionCoordinatorWorkerCurOpRepository =  // The single no-op repository instance handed out by the shim below.
+ std::make_shared<NoOpTransactionCoordinatorWorkerCurOpRepository>();
+
+MONGO_REGISTER_SHIM(getTransactionCoordinatorWorkerCurOpRepository)  // Supplies the shim's implementation with the no-op repository.
+()->std::shared_ptr<TransactionCoordinatorWorkerCurOpRepository> {
+ return _transactionCoordinatorWorkerCurOpRepository;  // Every caller shares ownership of the same instance.
+}
+} // namespace mongo