diff options
-rw-r--r-- | jstests/concurrency/fsm_workloads/multi_statement_transaction_current_op.js | 27 | ||||
-rw-r--r-- | jstests/noPassthrough/router_transaction_current_op.js | 162 | ||||
-rw-r--r-- | src/mongo/s/transaction_router.cpp | 58 | ||||
-rw-r--r-- | src/mongo/s/transaction_router.h | 4 | ||||
-rw-r--r-- | src/mongo/s/transaction_router_test.cpp | 3 |
5 files changed, 210 insertions, 44 deletions
diff --git a/jstests/concurrency/fsm_workloads/multi_statement_transaction_current_op.js b/jstests/concurrency/fsm_workloads/multi_statement_transaction_current_op.js index 2a7383b37ea..c8e69017336 100644 --- a/jstests/concurrency/fsm_workloads/multi_statement_transaction_current_op.js +++ b/jstests/concurrency/fsm_workloads/multi_statement_transaction_current_op.js @@ -77,20 +77,19 @@ var $config = extendWorkload($config, function($config, $super) { $config.states.runCurrentOp = function runCurrentOp(db, collName) { const admin = db.getSiblingDB("admin"); - const mongosSessionsWithTransactions = - admin - .aggregate([ - { - $currentOp: { - allUsers: true, - idleSessions: true, - idleConnections: true, - localOps: true - } - }, - {$match: {$or: [{type: 'idleSession'}, {type: 'activeSession'}]}} - ]) - .toArray(); + const mongosSessionsWithTransactions = admin + .aggregate([ + { + $currentOp: { + allUsers: true, + idleSessions: true, + idleConnections: true, + localOps: true + } + }, + {$match: {transaction: {$exists: true}}} + ]) + .toArray(); this.verifyMongosSessionsWithTxns(mongosSessionsWithTransactions); }; diff --git a/jstests/noPassthrough/router_transaction_current_op.js b/jstests/noPassthrough/router_transaction_current_op.js new file mode 100644 index 00000000000..dab93132455 --- /dev/null +++ b/jstests/noPassthrough/router_transaction_current_op.js @@ -0,0 +1,162 @@ +// Verifies currentOp returns the expected fields for idle and active transactions in basic cases. +// More cases are covered in unit tests. +// @tags: [uses_transactions] +(function() { +"use strict"; + +load("jstests/libs/parallelTester.js"); // for Thread. + +function verifyCurrentOpFields(res, isActive) { + // Verify top level fields relevant to transactions. Note this does not include every field, so + // the number of fields in the response shouldn't be asserted on. + + const expectedFields = [ + "type", + "host", + "desc", + "connectionId", + "client", + "appName", + "clientMetadata", + "active", + "lsid", + "transaction", + ]; + + assert.hasFields(res, expectedFields, tojson(res)); + + if (isActive) { + assert.eq(res.type, "op", tojson(res)); + } else { + assert.eq(res.type, "idleSession", tojson(res)); + assert.eq(res.desc, "inactive transaction", tojson(res)); + } + + // Verify the transactions sub object. + + const transaction = res.transaction; + const expectedTransactionsFields = [ + "parameters", + "startWallClockTime", + "timeOpenMicros", + "timeActiveMicros", + "timeInactiveMicros", + "globalReadTimestamp", + "numParticipants", + "participants", + "numNonReadOnlyParticipants", + "numReadOnlyParticipants", + // Commit hasn't started so don't expect 'commitStartWallClockTime' or 'commitType'. + ]; + + assert.hasFields(transaction, expectedTransactionsFields, tojson(transaction)); + assert.eq( + expectedTransactionsFields.length, Object.keys(transaction).length, tojson(transaction)); + + // Verify transaction parameters sub object. + + const parameters = transaction.parameters; + const expectedParametersFields = [ + "txnNumber", + "autocommit", + "readConcern", + ]; + + assert.hasFields(parameters, expectedParametersFields, tojson(parameters)); + assert.eq(expectedParametersFields.length, Object.keys(parameters).length, tojson(parameters)); + + // Verify participants sub array. + + const participants = transaction.participants; + const expectedParticipantFields = [ + "name", + "coordinator", + // 'readOnly' will not be set until a response has been received from that participant, so + // it will not be present for the active transaction because of the failpoint and is handled + // specially. + ]; + + participants.forEach((participant) => { + assert.hasFields(participant, expectedParticipantFields, tojson(participant)); + if (isActive) { + // 'readOnly' should not be set. + assert.eq(expectedParticipantFields.length, + Object.keys(participant).length, + tojson(participant)); + } else { + // 'readOnly' should always be set for the inactive transaction. + assert.hasFields(participant, ["readOnly"], tojson(participant)); + assert.eq(expectedParticipantFields.length + 1, // +1 for readOnly. + Object.keys(participant).length, + tojson(participant)); + } + }); +} + +function getCurrentOpForFilter(st, matchFilter) { + const res = st.s.getDB("admin") + .aggregate([{$currentOp: {localOps: true}}, {$match: matchFilter}]) + .toArray(); + assert.eq(1, res.length, res); + return res[0]; +} + +const dbName = "test"; +const collName = "foo"; +const st = new ShardingTest({shards: 1, config: 1}); + +const session = st.s.startSession(); +const sessionDB = session.getDatabase(dbName); + +// Insert a document to set up a collection. +assert.commandWorked(sessionDB[collName].insert({x: 1})); + +jsTest.log("Inactive transaction."); +(() => { + session.startTransaction({readConcern: {level: "snapshot"}}); + assert.eq(1, sessionDB[collName].find({x: 1}).itcount()); + + const res = getCurrentOpForFilter(st, {"lsid.id": session.getSessionId().id}); + verifyCurrentOpFields(res, false /* isActive */); + + assert.commandWorked(session.abortTransaction_forTesting()); +})(); + +jsTest.log("Active transaction."); +(() => { + assert.commandWorked(st.rs0.getPrimary().adminCommand( + {configureFailPoint: "waitInFindBeforeMakingBatch", mode: "alwaysOn"})); + + const txnThread = new Thread(function(host, dbName, collName) { + const mongosConn = new Mongo(host); + const threadSession = mongosConn.startSession(); + + threadSession.startTransaction({readConcern: {level: "snapshot"}}); + assert.commandWorked(threadSession.getDatabase(dbName).runCommand( + {find: collName, filter: {}, comment: "active_txn_find"})); + + assert.commandWorked(threadSession.abortTransaction_forTesting()); + threadSession.endSession(); + }, st.s.host, dbName, collName); + txnThread.start(); + + // Wait until we know the failpoint has been reached. + assert.soon(function() { + const filter = {"msg": "waitInFindBeforeMakingBatch"}; + return assert.commandWorked(st.rs0.getPrimary().getDB("admin").currentOp(filter)) + .inprog.length === 1; + }); + + // We don't know the id of the session started by the parallel thread, so use the find's comment + // to get its currentOp output. + const res = getCurrentOpForFilter(st, {"command.comment": "active_txn_find"}); + verifyCurrentOpFields(res, true /* isActive */); + + assert.commandWorked(st.rs0.getPrimary().adminCommand( + {configureFailPoint: "waitInFindBeforeMakingBatch", mode: "off"})); + txnThread.join(); +})(); + +session.endSession(); +st.stop(); +}()); diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp index 5149cd85809..8a57e3a05d7 100644 --- a/src/mongo/s/transaction_router.cpp +++ b/src/mongo/s/transaction_router.cpp @@ -303,29 +303,39 @@ void TransactionRouter::Observer::_reportState(OperationContext* opCtx, return; } - // Append relevant client metadata. + // Append relevant client metadata for transactions with inactive sessions. For those with + // active sessions, these fields will already be in the output. - builder->append("type", sessionIsActive ? "activeSession" : "idleSession"); - builder->append("host", getHostNameCachedAndPort()); - builder->append("desc", sessionIsActive ? "active transaction" : "inactive transaction"); + if (!sessionIsActive) { + builder->append("type", "idleSession"); + builder->append("host", getHostNameCachedAndPort()); + builder->append("desc", "inactive transaction"); - const auto& lastClientInfo = o().lastClientInfo; - builder->append("client", lastClientInfo.clientHostAndPort); - builder->append("connectionId", lastClientInfo.connectionId); - builder->append("appName", lastClientInfo.appName); - builder->append("clientMetadata", lastClientInfo.clientMetadata); + const auto& lastClientInfo = o().lastClientInfo; + builder->append("client", lastClientInfo.clientHostAndPort); + builder->append("connectionId", lastClientInfo.connectionId); + builder->append("appName", lastClientInfo.appName); + builder->append("clientMetadata", lastClientInfo.clientMetadata); - // Append session and transaction metadata. + { + BSONObjBuilder lsid(builder->subobjStart("lsid")); + _sessionId().serialize(&lsid); + } - { - BSONObjBuilder lsid(builder->subobjStart("lsid")); - _sessionId().serialize(&lsid); + builder->append("active", sessionIsActive); } - BSONObjBuilder transactionBuilder(builder->subobjStart("transaction")); + // Append current transaction info. + + BSONObjBuilder transactionBuilder; + _reportTransactionState(opCtx, &transactionBuilder); + builder->append("transaction", transactionBuilder.obj()); +} +void TransactionRouter::Observer::_reportTransactionState(OperationContext* opCtx, + BSONObjBuilder* builder) const { { - BSONObjBuilder parametersBuilder(transactionBuilder.subobjStart("parameters")); + BSONObjBuilder parametersBuilder(builder->subobjStart("parameters")); parametersBuilder.append("txnNumber", o().txnNumber); parametersBuilder.append("autocommit", false); if (!o().readConcernArgs.isEmpty()) { @@ -333,8 +343,6 @@ void TransactionRouter::Observer::_reportState(OperationContext* opCtx, } } - // Append current transaction info. - if (_atClusterTimeHasBeenSet()) { builder->append("globalReadTimestamp", o().atClusterTime->getTime().asTimestamp()); } @@ -380,21 +388,17 @@ void TransactionRouter::Observer::_reportState(OperationContext* opCtx, participantsArrayBuilder.append(participantBuilder.obj()); } - transactionBuilder.appendArray("participants", participantsArrayBuilder.obj()); + builder->appendArray("participants", participantsArrayBuilder.obj()); } if (o().commitType != CommitType::kNotInitiated) { - transactionBuilder.append("commitStartWallClockTime", - dateToISOStringLocal(timingStats.commitStartWallClockTime)); - transactionBuilder.append("commitType", commitTypeToString(o().commitType)); + builder->append("commitStartWallClockTime", + dateToISOStringLocal(timingStats.commitStartWallClockTime)); + builder->append("commitType", commitTypeToString(o().commitType)); } - transactionBuilder.append("numReadOnlyParticipants", numReadOnlyParticipants); - transactionBuilder.append("numNonReadOnlyParticipants", numNonReadOnlyParticipants); - - transactionBuilder.done(); - - builder->append("active", sessionIsActive); + builder->append("numReadOnlyParticipants", numReadOnlyParticipants); + builder->append("numNonReadOnlyParticipants", numNonReadOnlyParticipants); } bool TransactionRouter::Observer::_atClusterTimeHasBeenSet() const { diff --git a/src/mongo/s/transaction_router.h b/src/mongo/s/transaction_router.h index 10879068894..46c7faa87ca 100644 --- a/src/mongo/s/transaction_router.h +++ b/src/mongo/s/transaction_router.h @@ -266,6 +266,10 @@ public: BSONObjBuilder* builder, bool sessionIsActive) const; + // Reports the 'transaction' state of this transaction for currentOp using the provided + // builder. + void _reportTransactionState(OperationContext* opCtx, BSONObjBuilder* builder) const; + // Returns true if the atClusterTime has been changed from the default uninitialized value. bool _atClusterTimeHasBeenSet() const; diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp index 483865a1d1d..2d5b5a3769b 100644 --- a/src/mongo/s/transaction_router_test.cpp +++ b/src/mongo/s/transaction_router_test.cpp @@ -4358,8 +4358,6 @@ TEST_F(TransactionRouterMetricsTest, ReportResourcesWithParticipantList) { auto transactionDocument = state.getObjectField("transaction"); auto parametersDocument = transactionDocument.getObjectField("parameters"); - ASSERT_EQ(state.getField("desc").valueStringData().toString(), "active transaction"); - ASSERT_EQ(state.getField("type").valueStringData().toString(), "activeSession"); ASSERT_GTE(transactionDocument.getField("readTimestamp").timestamp(), Timestamp(0, 0)); ASSERT_EQ(dateFromISOString(transactionDocument.getField("startWallClockTime").String()), startTime); @@ -4410,7 +4408,6 @@ TEST_F(TransactionRouterMetricsTest, ReportResourcesWithParticipantList) { ASSERT_EQ(transactionDocument.getField("numNonReadOnlyParticipants").numberInt(), 1); ASSERT_EQ(transactionDocument.getField("numReadOnlyParticipants").numberInt(), 1); - ASSERT_EQ(state.getField("active").boolean(), true); ASSERT_GTE(transactionDocument.getField("timeOpenMicros").numberLong(), 0); ASSERT_GTE(transactionDocument.getField("timeActiveMicros").numberLong(), 0); ASSERT_GTE(transactionDocument.getField("timeInactiveMicros").numberLong(), 0); |