summaryrefslogtreecommitdiff
path: root/jstests/noPassthrough/transaction_coordinator_curop_info.js
diff options
context:
space:
mode:
Diffstat (limited to 'jstests/noPassthrough/transaction_coordinator_curop_info.js')
-rw-r--r--jstests/noPassthrough/transaction_coordinator_curop_info.js177
1 files changed, 177 insertions, 0 deletions
diff --git a/jstests/noPassthrough/transaction_coordinator_curop_info.js b/jstests/noPassthrough/transaction_coordinator_curop_info.js
new file mode 100644
index 00000000000..7341009e8a7
--- /dev/null
+++ b/jstests/noPassthrough/transaction_coordinator_curop_info.js
@@ -0,0 +1,177 @@
+/**
+ * Tests that the transaction items in the 'twoPhaseCommitCoordinator' object in currentOp() are
+ * being tracked correctly.
+ * @tags: [uses_transactions, uses_prepare_transaction]
+ */
+
+(function() {
+'use strict';
+load('jstests/sharding/libs/sharded_transactions_helpers.js');
+
+function commitTxn(st, lsid, txnNumber, expectedError = null) {
+ let cmd = "db.adminCommand({" +
+ "commitTransaction: 1," +
+ "lsid: " + tojson(lsid) + "," +
+ "txnNumber: NumberLong(" + txnNumber + ")," +
+ "stmtId: NumberInt(0)," +
+ "autocommit: false," +
+ "})";
+ if (expectedError) {
+ cmd = "assert.commandFailedWithCode(" + cmd + "," + String(expectedError) + ");";
+ } else {
+ cmd = "assert.commandWorked(" + cmd + ");";
+ }
+ return startParallelShell(cmd, st.s.port);
+}
+
+function curOpAfterFailpoint(fpName, filter, fpCount = 1) {
+ const expectedLog = "Hit " + fpName + " failpoint";
+ jsTest.log(`waiting for failpoint '${fpName}' to appear in the log ${fpCount} time(s).`);
+ waitForFailpoint(expectedLog, fpCount);
+
+ jsTest.log(`Running curOp operation after '${fpName}' failpoint.`);
+ let result = adminDB.aggregate([{$currentOp: {}}, {$match: filter}]).toArray();
+
+ jsTest.log(`${result.length} matching curOp entries after '${fpName}':\n${tojson(result)}`);
+
+ jsTest.log(`disable '${fpName}' failpoint.`);
+ assert.commandWorked(coordinator.adminCommand({
+ configureFailPoint: fpName,
+ mode: "off",
+ }));
+
+ return result;
+}
+
+function makeWorkerFilterWithAction(session, action, txnNumber) {
+ return {
+ active: true,
+ 'twoPhaseCommitCoordinator.lsid.id': session.getSessionId().id,
+ 'twoPhaseCommitCoordinator.txnNumber': NumberLong(txnNumber),
+ 'twoPhaseCommitCoordinator.action': action,
+ 'twoPhaseCommitCoordinator.startTime': {$exists: true}
+ };
+}
+
+function enableFailPoints(shard, failPoints) {
+ jsTest.log(`enabling the following failpoints: ${tojson(failPoints)}`);
+ failPoints.forEach(function(fpName) {
+ assert.commandWorked(shard.adminCommand({
+ configureFailPoint: fpName,
+ mode: "alwaysOn",
+ }));
+ });
+}
+
+function startTransaction(session, collectionName, insertValue) {
+ const dbName = session.getDatabase('test');
+ jsTest.log(`Starting a new transaction on ${dbName}.${collectionName}`);
+ session.startTransaction();
+ // insert into both shards
+ assert.commandWorked(dbName[collectionName].insert({_id: -1 * insertValue}));
+ assert.commandWorked(dbName[collectionName].insert({_id: insertValue}));
+}
+
+// Setup test
+const numShards = 2;
+const st = new ShardingTest({shards: numShards, config: 1});
+const dbName = "test";
+const collectionName = 'currentop_two_phase';
+const ns = dbName + "." + collectionName;
+const adminDB = st.s.getDB('admin');
+const coordinator = st.shard0;
+const participant = st.shard1;
+const failPoints = [
+ 'hangAfterStartingCoordinateCommit',
+ 'hangBeforeWritingParticipantList',
+ 'hangBeforeSendingPrepare',
+ 'hangBeforeWritingDecision',
+ 'hangBeforeSendingCommit',
+ 'hangBeforeDeletingCoordinatorDoc',
+ 'hangBeforeSendingAbort'
+];
+
+assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+assert.commandWorked(st.s.adminCommand({movePrimary: dbName, to: coordinator.shardName}));
+assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {_id: 1}}));
+assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 0}}));
+assert.commandWorked(st.s.adminCommand({moveChunk: ns, find: {_id: 0}, to: participant.shardName}));
+assert.commandWorked(coordinator.adminCommand({_flushRoutingTableCacheUpdates: ns}));
+assert.commandWorked(participant.adminCommand({_flushRoutingTableCacheUpdates: ns}));
+
+enableFailPoints(coordinator, failPoints);
+
+jsTest.log("Testing that coordinator threads show up in currentOp for a commit decision");
+{
+ let session = adminDB.getMongo().startSession();
+ startTransaction(session, collectionName, 1);
+ let txnNumber = session.getTxnNumber_forTesting();
+ let lsid = session.getSessionId();
+ let commitJoin = commitTxn(st, lsid, txnNumber);
+
+ const coordinateCommitFilter = {
+ active: true,
+ 'command.coordinateCommitTransaction': 1,
+ 'command.lsid.id': session.getSessionId().id,
+ 'command.txnNumber': NumberLong(txnNumber),
+ 'command.coordinator': true,
+ 'command.autocommit': false
+ };
+ let createCoordinateCommitTxnOp =
+ curOpAfterFailpoint("hangAfterStartingCoordinateCommit", coordinateCommitFilter);
+ assert.eq(1, createCoordinateCommitTxnOp.length);
+
+ const writeParticipantFilter =
+ makeWorkerFilterWithAction(session, "writingParticipantList", txnNumber);
+ let writeParticipantOp =
+ curOpAfterFailpoint('hangBeforeWritingParticipantList', writeParticipantFilter);
+ assert.eq(1, writeParticipantOp.length);
+
+ const sendPrepareFilter = makeWorkerFilterWithAction(session, "sendingPrepare", txnNumber);
+ let sendPrepareOp =
+ curOpAfterFailpoint('hangBeforeSendingPrepare', sendPrepareFilter, numShards);
+ assert.eq(numShards, sendPrepareOp.length);
+
+ const writingDecisionFilter = makeWorkerFilterWithAction(session, "writingDecision", txnNumber);
+ let writeDecisionOp = curOpAfterFailpoint('hangBeforeWritingDecision', writingDecisionFilter);
+ assert.eq(1, writeDecisionOp.length);
+
+ const sendCommitFilter = makeWorkerFilterWithAction(session, "sendingCommit", txnNumber);
+ let sendCommitOp = curOpAfterFailpoint('hangBeforeSendingCommit', sendCommitFilter, numShards);
+ assert.eq(numShards, sendCommitOp.length);
+
+ const deletingCoordinatorFilter =
+ makeWorkerFilterWithAction(session, "deletingCoordinatorDoc", txnNumber);
+ let deletingCoordinatorDocOp =
+ curOpAfterFailpoint('hangBeforeDeletingCoordinatorDoc', deletingCoordinatorFilter);
+ assert.eq(1, deletingCoordinatorDocOp.length);
+
+ commitJoin();
+}
+
+jsTest.log("Testing that coordinator threads show up in currentOp for an abort decision.");
+{
+ let session = adminDB.getMongo().startSession();
+ startTransaction(session, collectionName, 2);
+ let txnNumber = session.getTxnNumber_forTesting();
+ let lsid = session.getSessionId();
+ // Manually abort the transaction on one of the participants, so that the participant fails to
+ // prepare and failpoint is triggered on the coordinator.
+ assert.commandWorked(participant.adminCommand({
+ abortTransaction: 1,
+ lsid: lsid,
+ txnNumber: NumberLong(txnNumber),
+ stmtId: NumberInt(0),
+ autocommit: false,
+ }));
+ let commitJoin = commitTxn(st, lsid, txnNumber, ErrorCodes.NoSuchTransaction);
+
+ const sendAbortFilter = makeWorkerFilterWithAction(session, "sendingAbort", txnNumber);
+ let sendingAbortOp = curOpAfterFailpoint('hangBeforeSendingAbort', sendAbortFilter, numShards);
+ assert.eq(numShards, sendingAbortOp.length);
+
+ commitJoin();
+}
+
+st.stop();
+})();