summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/concurrency/fsm_workloads/multi_statement_transaction_current_op.js27
-rw-r--r--jstests/noPassthrough/router_transaction_current_op.js162
-rw-r--r--src/mongo/s/transaction_router.cpp58
-rw-r--r--src/mongo/s/transaction_router.h4
-rw-r--r--src/mongo/s/transaction_router_test.cpp3
5 files changed, 210 insertions, 44 deletions
diff --git a/jstests/concurrency/fsm_workloads/multi_statement_transaction_current_op.js b/jstests/concurrency/fsm_workloads/multi_statement_transaction_current_op.js
index 2a7383b37ea..c8e69017336 100644
--- a/jstests/concurrency/fsm_workloads/multi_statement_transaction_current_op.js
+++ b/jstests/concurrency/fsm_workloads/multi_statement_transaction_current_op.js
@@ -77,20 +77,19 @@ var $config = extendWorkload($config, function($config, $super) {
$config.states.runCurrentOp = function runCurrentOp(db, collName) {
const admin = db.getSiblingDB("admin");
- const mongosSessionsWithTransactions =
- admin
- .aggregate([
- {
- $currentOp: {
- allUsers: true,
- idleSessions: true,
- idleConnections: true,
- localOps: true
- }
- },
- {$match: {$or: [{type: 'idleSession'}, {type: 'activeSession'}]}}
- ])
- .toArray();
+ const mongosSessionsWithTransactions = admin
+ .aggregate([
+ {
+ $currentOp: {
+ allUsers: true,
+ idleSessions: true,
+ idleConnections: true,
+ localOps: true
+ }
+ },
+ {$match: {transaction: {$exists: true}}}
+ ])
+ .toArray();
this.verifyMongosSessionsWithTxns(mongosSessionsWithTransactions);
};
diff --git a/jstests/noPassthrough/router_transaction_current_op.js b/jstests/noPassthrough/router_transaction_current_op.js
new file mode 100644
index 00000000000..dab93132455
--- /dev/null
+++ b/jstests/noPassthrough/router_transaction_current_op.js
@@ -0,0 +1,162 @@
+// Verifies currentOp returns the expected fields for idle and active transactions in basic cases.
+// More cases are covered in unit tests.
+// @tags: [uses_transactions]
+(function() {
+"use strict";
+
+load("jstests/libs/parallelTester.js"); // for Thread.
+
+function verifyCurrentOpFields(res, isActive) {
+ // Verify top level fields relevant to transactions. Note this does not include every field, so
+ // the number of fields in the response shouldn't be asserted on.
+
+ const expectedFields = [
+ "type",
+ "host",
+ "desc",
+ "connectionId",
+ "client",
+ "appName",
+ "clientMetadata",
+ "active",
+ "lsid",
+ "transaction",
+ ];
+
+ assert.hasFields(res, expectedFields, tojson(res));
+
+ if (isActive) {
+ assert.eq(res.type, "op", tojson(res));
+ } else {
+ assert.eq(res.type, "idleSession", tojson(res));
+ assert.eq(res.desc, "inactive transaction", tojson(res));
+ }
+
+ // Verify the transactions sub object.
+
+ const transaction = res.transaction;
+ const expectedTransactionsFields = [
+ "parameters",
+ "startWallClockTime",
+ "timeOpenMicros",
+ "timeActiveMicros",
+ "timeInactiveMicros",
+ "globalReadTimestamp",
+ "numParticipants",
+ "participants",
+ "numNonReadOnlyParticipants",
+ "numReadOnlyParticipants",
+ // Commit hasn't started so don't expect 'commitStartWallClockTime' or 'commitType'.
+ ];
+
+ assert.hasFields(transaction, expectedTransactionsFields, tojson(transaction));
+ assert.eq(
+ expectedTransactionsFields.length, Object.keys(transaction).length, tojson(transaction));
+
+ // Verify transaction parameters sub object.
+
+ const parameters = transaction.parameters;
+ const expectedParametersFields = [
+ "txnNumber",
+ "autocommit",
+ "readConcern",
+ ];
+
+ assert.hasFields(parameters, expectedParametersFields, tojson(parameters));
+ assert.eq(expectedParametersFields.length, Object.keys(parameters).length, tojson(parameters));
+
+ // Verify participants sub array.
+
+ const participants = transaction.participants;
+ const expectedParticipantFields = [
+ "name",
+ "coordinator",
+ // 'readOnly' will not be set until a response has been received from that participant, so
+ // it will not be present for the active transaction because of the failpoint and is handled
+ // specially.
+ ];
+
+ participants.forEach((participant) => {
+ assert.hasFields(participant, expectedParticipantFields, tojson(participant));
+ if (isActive) {
+ // 'readOnly' should not be set.
+ assert.eq(expectedParticipantFields.length,
+ Object.keys(participant).length,
+ tojson(participant));
+ } else {
+ // 'readOnly' should always be set for the inactive transaction.
+ assert.hasFields(participant, ["readOnly"], tojson(participant));
+ assert.eq(expectedParticipantFields.length + 1, // +1 for readOnly.
+ Object.keys(participant).length,
+ tojson(participant));
+ }
+ });
+}
+
+function getCurrentOpForFilter(st, matchFilter) {
+ const res = st.s.getDB("admin")
+ .aggregate([{$currentOp: {localOps: true}}, {$match: matchFilter}])
+ .toArray();
+ assert.eq(1, res.length, res);
+ return res[0];
+}
+
+const dbName = "test";
+const collName = "foo";
+const st = new ShardingTest({shards: 1, config: 1});
+
+const session = st.s.startSession();
+const sessionDB = session.getDatabase(dbName);
+
+// Insert a document to set up a collection.
+assert.commandWorked(sessionDB[collName].insert({x: 1}));
+
+jsTest.log("Inactive transaction.");
+(() => {
+ session.startTransaction({readConcern: {level: "snapshot"}});
+ assert.eq(1, sessionDB[collName].find({x: 1}).itcount());
+
+ const res = getCurrentOpForFilter(st, {"lsid.id": session.getSessionId().id});
+ verifyCurrentOpFields(res, false /* isActive */);
+
+ assert.commandWorked(session.abortTransaction_forTesting());
+})();
+
+jsTest.log("Active transaction.");
+(() => {
+ assert.commandWorked(st.rs0.getPrimary().adminCommand(
+ {configureFailPoint: "waitInFindBeforeMakingBatch", mode: "alwaysOn"}));
+
+ const txnThread = new Thread(function(host, dbName, collName) {
+ const mongosConn = new Mongo(host);
+ const threadSession = mongosConn.startSession();
+
+ threadSession.startTransaction({readConcern: {level: "snapshot"}});
+ assert.commandWorked(threadSession.getDatabase(dbName).runCommand(
+ {find: collName, filter: {}, comment: "active_txn_find"}));
+
+ assert.commandWorked(threadSession.abortTransaction_forTesting());
+ threadSession.endSession();
+ }, st.s.host, dbName, collName);
+ txnThread.start();
+
+ // Wait until we know the failpoint has been reached.
+ assert.soon(function() {
+ const filter = {"msg": "waitInFindBeforeMakingBatch"};
+ return assert.commandWorked(st.rs0.getPrimary().getDB("admin").currentOp(filter))
+ .inprog.length === 1;
+ });
+
+ // We don't know the id of the session started by the parallel thread, so use the find's comment
+ // to get its currentOp output.
+ const res = getCurrentOpForFilter(st, {"command.comment": "active_txn_find"});
+ verifyCurrentOpFields(res, true /* isActive */);
+
+ assert.commandWorked(st.rs0.getPrimary().adminCommand(
+ {configureFailPoint: "waitInFindBeforeMakingBatch", mode: "off"}));
+ txnThread.join();
+})();
+
+session.endSession();
+st.stop();
+}());
diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp
index 5149cd85809..8a57e3a05d7 100644
--- a/src/mongo/s/transaction_router.cpp
+++ b/src/mongo/s/transaction_router.cpp
@@ -303,29 +303,39 @@ void TransactionRouter::Observer::_reportState(OperationContext* opCtx,
return;
}
- // Append relevant client metadata.
+ // Append relevant client metadata for transactions with inactive sessions. For those with
+ // active sessions, these fields will already be in the output.
- builder->append("type", sessionIsActive ? "activeSession" : "idleSession");
- builder->append("host", getHostNameCachedAndPort());
- builder->append("desc", sessionIsActive ? "active transaction" : "inactive transaction");
+ if (!sessionIsActive) {
+ builder->append("type", "idleSession");
+ builder->append("host", getHostNameCachedAndPort());
+ builder->append("desc", "inactive transaction");
- const auto& lastClientInfo = o().lastClientInfo;
- builder->append("client", lastClientInfo.clientHostAndPort);
- builder->append("connectionId", lastClientInfo.connectionId);
- builder->append("appName", lastClientInfo.appName);
- builder->append("clientMetadata", lastClientInfo.clientMetadata);
+ const auto& lastClientInfo = o().lastClientInfo;
+ builder->append("client", lastClientInfo.clientHostAndPort);
+ builder->append("connectionId", lastClientInfo.connectionId);
+ builder->append("appName", lastClientInfo.appName);
+ builder->append("clientMetadata", lastClientInfo.clientMetadata);
- // Append session and transaction metadata.
+ {
+ BSONObjBuilder lsid(builder->subobjStart("lsid"));
+ _sessionId().serialize(&lsid);
+ }
- {
- BSONObjBuilder lsid(builder->subobjStart("lsid"));
- _sessionId().serialize(&lsid);
+ builder->append("active", sessionIsActive);
}
- BSONObjBuilder transactionBuilder(builder->subobjStart("transaction"));
+ // Append current transaction info.
+
+ BSONObjBuilder transactionBuilder;
+ _reportTransactionState(opCtx, &transactionBuilder);
+ builder->append("transaction", transactionBuilder.obj());
+}
+void TransactionRouter::Observer::_reportTransactionState(OperationContext* opCtx,
+ BSONObjBuilder* builder) const {
{
- BSONObjBuilder parametersBuilder(transactionBuilder.subobjStart("parameters"));
+ BSONObjBuilder parametersBuilder(builder->subobjStart("parameters"));
parametersBuilder.append("txnNumber", o().txnNumber);
parametersBuilder.append("autocommit", false);
if (!o().readConcernArgs.isEmpty()) {
@@ -333,8 +343,6 @@ void TransactionRouter::Observer::_reportState(OperationContext* opCtx,
}
}
- // Append current transaction info.
-
if (_atClusterTimeHasBeenSet()) {
builder->append("globalReadTimestamp", o().atClusterTime->getTime().asTimestamp());
}
@@ -380,21 +388,17 @@ void TransactionRouter::Observer::_reportState(OperationContext* opCtx,
participantsArrayBuilder.append(participantBuilder.obj());
}
- transactionBuilder.appendArray("participants", participantsArrayBuilder.obj());
+ builder->appendArray("participants", participantsArrayBuilder.obj());
}
if (o().commitType != CommitType::kNotInitiated) {
- transactionBuilder.append("commitStartWallClockTime",
- dateToISOStringLocal(timingStats.commitStartWallClockTime));
- transactionBuilder.append("commitType", commitTypeToString(o().commitType));
+ builder->append("commitStartWallClockTime",
+ dateToISOStringLocal(timingStats.commitStartWallClockTime));
+ builder->append("commitType", commitTypeToString(o().commitType));
}
- transactionBuilder.append("numReadOnlyParticipants", numReadOnlyParticipants);
- transactionBuilder.append("numNonReadOnlyParticipants", numNonReadOnlyParticipants);
-
- transactionBuilder.done();
-
- builder->append("active", sessionIsActive);
+ builder->append("numReadOnlyParticipants", numReadOnlyParticipants);
+ builder->append("numNonReadOnlyParticipants", numNonReadOnlyParticipants);
}
bool TransactionRouter::Observer::_atClusterTimeHasBeenSet() const {
diff --git a/src/mongo/s/transaction_router.h b/src/mongo/s/transaction_router.h
index 10879068894..46c7faa87ca 100644
--- a/src/mongo/s/transaction_router.h
+++ b/src/mongo/s/transaction_router.h
@@ -266,6 +266,10 @@ public:
BSONObjBuilder* builder,
bool sessionIsActive) const;
+ // Reports the 'transaction' state of this transaction for currentOp using the provided
+ // builder.
+ void _reportTransactionState(OperationContext* opCtx, BSONObjBuilder* builder) const;
+
// Returns true if the atClusterTime has been changed from the default uninitialized value.
bool _atClusterTimeHasBeenSet() const;
diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp
index 483865a1d1d..2d5b5a3769b 100644
--- a/src/mongo/s/transaction_router_test.cpp
+++ b/src/mongo/s/transaction_router_test.cpp
@@ -4358,8 +4358,6 @@ TEST_F(TransactionRouterMetricsTest, ReportResourcesWithParticipantList) {
auto transactionDocument = state.getObjectField("transaction");
auto parametersDocument = transactionDocument.getObjectField("parameters");
- ASSERT_EQ(state.getField("desc").valueStringData().toString(), "active transaction");
- ASSERT_EQ(state.getField("type").valueStringData().toString(), "activeSession");
ASSERT_GTE(transactionDocument.getField("readTimestamp").timestamp(), Timestamp(0, 0));
ASSERT_EQ(dateFromISOString(transactionDocument.getField("startWallClockTime").String()),
startTime);
@@ -4410,7 +4408,6 @@ TEST_F(TransactionRouterMetricsTest, ReportResourcesWithParticipantList) {
ASSERT_EQ(transactionDocument.getField("numNonReadOnlyParticipants").numberInt(), 1);
ASSERT_EQ(transactionDocument.getField("numReadOnlyParticipants").numberInt(), 1);
- ASSERT_EQ(state.getField("active").boolean(), true);
ASSERT_GTE(transactionDocument.getField("timeOpenMicros").numberLong(), 0);
ASSERT_GTE(transactionDocument.getField("timeActiveMicros").numberLong(), 0);
ASSERT_GTE(transactionDocument.getField("timeInactiveMicros").numberLong(), 0);