diff options
17 files changed, 682 insertions, 122 deletions
diff --git a/buildscripts/resmokeconfig/suites/concurrency_sharded_with_stepdowns.yml b/buildscripts/resmokeconfig/suites/concurrency_sharded_with_stepdowns.yml index 63f8d191d52..149f3865ffc 100644 --- a/buildscripts/resmokeconfig/suites/concurrency_sharded_with_stepdowns.yml +++ b/buildscripts/resmokeconfig/suites/concurrency_sharded_with_stepdowns.yml @@ -181,6 +181,8 @@ selector: exclude_with_any_tags: - requires_replication - requires_non_retryable_writes + # Curop requires readConcern local. + - uses_curop_agg_stage executor: archive: diff --git a/buildscripts/resmokeconfig/suites/concurrency_sharded_with_stepdowns_and_balancer.yml b/buildscripts/resmokeconfig/suites/concurrency_sharded_with_stepdowns_and_balancer.yml index 89d0553cd7d..c7b716ed938 100644 --- a/buildscripts/resmokeconfig/suites/concurrency_sharded_with_stepdowns_and_balancer.yml +++ b/buildscripts/resmokeconfig/suites/concurrency_sharded_with_stepdowns_and_balancer.yml @@ -183,6 +183,8 @@ selector: exclude_with_any_tags: - requires_replication - requires_non_retryable_writes + # Curop requires readConcern local. + - uses_curop_agg_stage executor: archive: diff --git a/jstests/concurrency/fsm_workloads/multi_statement_transaction_current_op.js b/jstests/concurrency/fsm_workloads/multi_statement_transaction_current_op.js new file mode 100644 index 00000000000..cdeaaa9375b --- /dev/null +++ b/jstests/concurrency/fsm_workloads/multi_statement_transaction_current_op.js @@ -0,0 +1,147 @@ +'use strict'; + +/** + * Runs update, findAndModify, delete, find, and getMore in a transaction with all threads using the + * same session. 
+ * + * @tags: [ + * assumes_snapshot_transactions, + * requires_sharding, + * state_functions_share_transaction, + * uses_curop_agg_stage, + * uses_transactions + * ] + */ + +load('jstests/concurrency/fsm_libs/extend_workload.js'); // for extendWorkload +load('jstests/concurrency/fsm_workloads/multi_statement_transaction_all_commands.js'); // for + // $config + +var $config = extendWorkload($config, function($config, $super) { + $config.data.verifyMongosSessionsWithTxns = function verifyMongosSessionsWithTxns(sessions) { + const acceptableReadConcernLevels = ['snapshot', 'local']; + sessions.forEach((session) => { + jsTestLog("xxx here is session: " + tojson(session)); + const transactionDocument = session.transaction; + + assertAlways.gte(transactionDocument.parameters.txnNumber, 0); + assertAlways.eq(transactionDocument.parameters.autocommit, false); + if (transactionDocument.parameters.readConcern !== undefined) { + assertAlways(acceptableReadConcernLevels.includes( + transactionDocument.parameters.readConcern.level)); + } + if (transactionDocument.globalReadTimestamp !== undefined) { + assertAlways.gt(transactionDocument.globalReadTimestamp, Timestamp(0, 0)); + } + assertAlways.gt(ISODate(transactionDocument.startWallClockTime), + ISODate("1970-01-01T00:00:00.000Z")); + + if (transactionDocument.numParticipants > 0) { + const participants = transactionDocument.participants; + assertAlways.eq(transactionDocument.numParticipants, participants.length); + + let hasCoordinator = false; + let numNonReadOnly = 0; + let numReadOnly = 0; + participants.forEach((participant) => { + if (participant.coordinator) { + assertAlways.eq(hasCoordinator, false); + hasCoordinator = true; + } + + if (participant.hasOwnProperty('readOnly')) { + if (participant.readOnly) { + ++numReadOnly; + } else { + ++numNonReadOnly; + } + } + }); + + assertAlways.eq(hasCoordinator, true); + assertAlways.eq(transactionDocument.numNonReadOnlyParticipants, numNonReadOnly); + 
assertAlways.eq(transactionDocument.numReadOnlyParticipants, numReadOnly); + } + }); + }; + + $config.states.runCurrentOp = function runCurrentOp(db, collName) { + const admin = db.getSiblingDB("admin"); + const mongosSessionsWithTransactions = + admin + .aggregate([ + { + $currentOp: { + allUsers: true, + idleSessions: true, + idleConnections: true, + localOps: true + } + }, + {$match: {$or: [{type: 'idleSession'}, {type: 'activeSession'}]}} + ]) + .toArray(); + + this.verifyMongosSessionsWithTxns(mongosSessionsWithTransactions); + }; + + $config.transitions = { + init: { + runCurrentOp: .2, + runFindAndModify: .16, + runUpdate: .16, + runDelete: .16, + runFindAndGetMore: .16, + commitTxn: .16 + }, + runCurrentOp: { + runCurrentOp: .2, + runFindAndModify: .16, + runUpdate: .16, + runDelete: .16, + runFindAndGetMore: .16, + commitTxn: .16 + }, + runFindAndModify: { + runCurrentOp: .2, + runFindAndModify: .16, + runUpdate: .16, + runDelete: .16, + runFindAndGetMore: .16, + commitTxn: .16 + }, + runUpdate: { + runCurrentOp: .2, + runFindAndModify: .16, + runUpdate: .16, + runDelete: .16, + runFindAndGetMore: .16, + commitTxn: .16 + }, + runDelete: { + runCurrentOp: .2, + runFindAndModify: .16, + runUpdate: .16, + runDelete: .16, + runFindAndGetMore: .16, + commitTxn: .16 + }, + runFindAndGetMore: { + runCurrentOp: .2, + runFindAndModify: .16, + runUpdate: .16, + runDelete: .16, + runFindAndGetMore: .16, + commitTxn: .16 + }, + commitTxn: { + runCurrentOp: .1, + runFindAndModify: .225, + runUpdate: .225, + runDelete: .225, + runFindAndGetMore: .225 + }, + }; + + return $config; +}); diff --git a/jstests/sharding/aggregation_currentop.js b/jstests/sharding/aggregation_currentop.js index a3a9d9c32e2..4e4479ff4fe 100644 --- a/jstests/sharding/aggregation_currentop.js +++ b/jstests/sharding/aggregation_currentop.js @@ -592,14 +592,6 @@ function runLocalOpsTests(conn) { runLocalOpsTests(mongosConn); runLocalOpsTests(shardConn); -// -// Stashed transactions tests. 
-// - -// Test that $currentOp will display stashed transaction locks if 'idleSessions' is true, and -// will only permit a user to view other users' sessions if the caller possesses the 'inprog' -// privilege and 'allUsers' is true. -const userNames = ["user_inprog", "admin", "user_no_inprog"]; let sessionDBs = []; let sessions = []; @@ -615,83 +607,117 @@ function sessionFilter() { }; } -for (let i in userNames) { - shardAdminDB.logout(); - assert(shardAdminDB.auth(userNames[i], "pwd")); - - // Create a session for this user. - const session = shardAdminDB.getMongo().startSession(); - - // For each session, start but do not complete a transaction. - const sessionDB = session.getDatabase(shardTestDB.getName()); - assert.commandWorked(sessionDB.runCommand({ - insert: "test", - documents: [{_id: `txn-insert-${userNames[i]}-${i}`}], - readConcern: {level: "snapshot"}, - txnNumber: NumberLong(i), - startTransaction: true, - autocommit: false - })); - sessionDBs.push(sessionDB); - sessions.push(session); +// +// Idle sessions tests +// - // Use $currentOp to confirm that the incomplete transactions have stashed their locks while - // inactive, and that each user can only view their own sessions with 'allUsers:false'. - assert.eq( - shardAdminDB - .aggregate( - [{$currentOp: {allUsers: false, idleSessions: true}}, {$match: sessionFilter()}]) - .itcount(), - 1); -} +// Runs a suite of tests to verify idle session behavior with transactions. +// 1. For the mongos connection, verifies that idle transactions are only shown with +// 'idleSessions' and 'localOps' set to true. +// 2. For the shard connection, verifies that stashed transaction locks are displayed only if +// 'idleSessions' is set to true. 
+function runIdleSessionsTests(conn, adminDB, txnDB, useLocalOps) { + // Test that $currentOp will display idle transactions if 'idleSessions' is true, and will + // only permit a user to view other users' sessions if the caller possesses the 'inprog' + // privilege and 'allUsers' is true. + const userNames = ["user_inprog", "admin", "user_no_inprog"]; + + sessionDBs = []; + sessions = []; + + for (let i in userNames) { + adminDB.logout(); + assert(adminDB.auth(userNames[i], "pwd")); + + // Create a session for this user. + const session = adminDB.getMongo().startSession(); + + // For each session, start but do not complete a transaction. + const sessionDB = session.getDatabase(txnDB.getName()); + assert.commandWorked(sessionDB.runCommand({ + insert: "test", + documents: [{_id: `txn-insert-${conn}-${userNames[i]}-${i}`}], + readConcern: {level: "snapshot"}, + txnNumber: NumberLong(i), + startTransaction: true, + autocommit: false + })); + sessionDBs.push(sessionDB); + sessions.push(session); + + // Use $currentOp to confirm that each user can only view their own sessions with + // 'allUsers:false'. + assert.eq( + adminDB + .aggregate([ + {$currentOp: {allUsers: false, idleSessions: true, localOps: useLocalOps}}, + {$match: sessionFilter()} + ]) + .itcount(), + 1); + } + + // Log in as 'user_no_inprog' to verify that the user cannot view other users' sessions via + // 'allUsers:true'. + adminDB.logout(); + assert(adminDB.auth("user_no_inprog", "pwd")); -// Log in as 'user_no_inprog' to verify that the user cannot view other users' sessions via -// 'allUsers:true'. 
-shardAdminDB.logout(); -assert(shardAdminDB.auth("user_no_inprog", "pwd")); + assert.commandFailedWithCode(adminDB.runCommand({ + aggregate: 1, + cursor: {}, + pipeline: [ + {$currentOp: {allUsers: true, idleSessions: true, localOps: useLocalOps}}, + {$match: sessionFilter()} + ] + }), + ErrorCodes.Unauthorized); -assert.commandFailedWithCode(shardAdminDB.runCommand({ - aggregate: 1, - cursor: {}, - pipeline: [{$currentOp: {allUsers: true, idleSessions: true}}, {$match: sessionFilter()}] -}), - ErrorCodes.Unauthorized); + // Log in as 'user_inprog' to confirm that a user with the 'inprog' privilege can see all + // three idle/stashed transactions with 'allUsers:true'. + adminDB.logout(); + assert(adminDB.auth("user_inprog", "pwd")); -// Log in as 'user_inprog' to confirm that a user with the 'inprog' privilege can see all three -// stashed transactions with 'allUsers:true'. -shardAdminDB.logout(); -assert(shardAdminDB.auth("user_inprog", "pwd")); + assert.eq(adminDB + .aggregate([ + {$currentOp: {allUsers: true, idleSessions: true, localOps: useLocalOps}}, + {$match: sessionFilter()} + ]) + .itcount(), + 3); -assert.eq( - shardAdminDB - .aggregate([{$currentOp: {allUsers: true, idleSessions: true}}, {$match: sessionFilter()}]) - .itcount(), - 3); + // Confirm that the 'idleSessions' parameter defaults to true. + assert.eq( + adminDB + .aggregate( + [{$currentOp: {allUsers: true, localOps: useLocalOps}}, {$match: sessionFilter()}]) + .itcount(), + 3); -// Confirm that the 'idleSessions' parameter defaults to true. -assert.eq( - shardAdminDB.aggregate([{$currentOp: {allUsers: true}}, {$match: sessionFilter()}]).itcount(), - 3); + // Confirm that idleSessions:false omits the idle/stashed transactions from the report. + assert.eq(adminDB + .aggregate([ + {$currentOp: {allUsers: true, idleSessions: false, localOps: useLocalOps}}, + {$match: sessionFilter()} + ]) + .itcount(), + 0); -// Confirm that idleSessions:false omits the stashed locks from the report. 
-assert.eq( - shardAdminDB - .aggregate([{$currentOp: {allUsers: true, idleSessions: false}}, {$match: sessionFilter()}]) - .itcount(), - 0); - -// Allow all transactions to complete and close the associated sessions. -for (let i in userNames) { - assert(shardAdminDB.auth(userNames[i], "pwd")); - assert.commandWorked(sessionDBs[i].adminCommand({ - commitTransaction: 1, - txnNumber: NumberLong(i), - autocommit: false, - writeConcern: {w: 'majority'} - })); - sessions[i].endSession(); + // Commit all transactions and close the associated sessions. + for (let i in userNames) { + assert(adminDB.auth(userNames[i], "pwd")); + assert.commandWorked(sessionDBs[i].adminCommand({ + commitTransaction: 1, + txnNumber: NumberLong(i), + autocommit: false, + writeConcern: {w: 'majority'} + })); + sessions[i].endSession(); + } } +runIdleSessionsTests(mongosConn, clusterAdminDB, clusterTestDB, true); +runIdleSessionsTests(shardConn, shardAdminDB, shardTestDB, false); + // // No-auth tests. // diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index c3d8fbb223d..321b0085518 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -531,7 +531,6 @@ env.Library( 'retryable_writes_stats.cpp', 'server_transactions_metrics.cpp', 'session_catalog_mongod.cpp', - 'single_transaction_stats.cpp', 'transaction_history_iterator.cpp', 'transaction_metrics_observer.cpp', 'transaction_participant.cpp', @@ -559,6 +558,7 @@ env.Library( 'query_exec', 'stats/fill_locker_info', 'stats/top', + 'stats/transaction_stats', 'update/update_driver', ] ) @@ -1818,6 +1818,7 @@ envWithAsio.CppUnitTest( 'snapshot_window_util', 'startup_warnings_mongod', 'stats/fill_locker_info', + 'stats/transaction_stats', 'time_proof_service', 'transaction', 'update_index_data', diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 41963051c7a..b34cd5987d7 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -271,7 +271,10 @@
env.Library( '$BUILD_DIR/mongo/s/query/cluster_query', 'mongo_process_common', 'sharded_agg_helpers', - ] + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/session_catalog', + ], ) env.Library( diff --git a/src/mongo/db/pipeline/mongos_process_interface.cpp b/src/mongo/db/pipeline/mongos_process_interface.cpp index 05395e2c7ca..4e121045c23 100644 --- a/src/mongo/db/pipeline/mongos_process_interface.cpp +++ b/src/mongo/db/pipeline/mongos_process_interface.cpp @@ -48,6 +48,7 @@ #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/query/establish_cursors.h" #include "mongo/s/query/router_exec_stage.h" +#include "mongo/s/transaction_router.h" #include "mongo/util/fail_point.h" namespace mongo { @@ -250,9 +251,44 @@ BSONObj MongoSInterface::_reportCurrentOpForClient(OperationContext* opCtx, (backtraceMode == CurrentOpBacktraceMode::kIncludeBacktrace), &builder); + OperationContext* clientOpCtx = client->getOperationContext(); + + if (clientOpCtx) { + if (auto txnRouter = TransactionRouter::get(clientOpCtx)) { + txnRouter.reportState(clientOpCtx, &builder, true /* sessionIsActive */); + } + } + return builder.obj(); } +void MongoSInterface::_reportCurrentOpsForIdleSessions(OperationContext* opCtx, + CurrentOpUserMode userMode, + std::vector<BSONObj>* ops) const { + auto sessionCatalog = SessionCatalog::get(opCtx); + + const bool authEnabled = + AuthorizationSession::get(opCtx->getClient())->getAuthorizationManager().isAuthEnabled(); + + // If the user is listing only their own ops, we use makeSessionFilterForAuthenticatedUsers to + // create a pattern that will match against all authenticated usernames for the current client. + // If the user is listing ops for all users, we create an empty pattern; constructing an + // instance of SessionKiller::Matcher with this empty pattern will return all sessions. + auto sessionFilter = (authEnabled && userMode == CurrentOpUserMode::kExcludeOthers + ? 
makeSessionFilterForAuthenticatedUsers(opCtx) + : KillAllSessionsByPatternSet{{}}); + + sessionCatalog->scanSessions({std::move(sessionFilter)}, [&](const ObservableSession& session) { + if (!session.currentOperation()) { + auto op = + TransactionRouter::get(session).reportState(opCtx, false /* sessionIsActive */); + if (!op.isEmpty()) { + ops->emplace_back(op); + } + } + }); +} + std::vector<GenericCursor> MongoSInterface::getIdleCursors( const intrusive_ptr<ExpressionContext>& expCtx, CurrentOpUserMode userMode) const { invariant(hasGlobalServiceContext()); diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h index 50d1149b3b9..163743d591c 100644 --- a/src/mongo/db/pipeline/mongos_process_interface.h +++ b/src/mongo/db/pipeline/mongos_process_interface.h @@ -254,10 +254,7 @@ protected: void _reportCurrentOpsForIdleSessions(OperationContext* opCtx, CurrentOpUserMode userMode, - std::vector<BSONObj>* ops) const final { - // This implementation is a no-op, since mongoS does not maintain a SessionCatalog or - // hold stashed locks for idle sessions. 
- } + std::vector<BSONObj>* ops) const final; }; } // namespace mongo diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp index 55b0c331ad4..f3954651690 100644 --- a/src/mongo/db/session_catalog.cpp +++ b/src/mongo/db/session_catalog.cpp @@ -159,6 +159,7 @@ void SessionCatalog::scanSessions(const SessionKiller::Matcher& matcher, if (matcher.match(it->first)) { auto& sri = it->second; ObservableSession osession(lg, sri->session); + workerFn(osession); if (osession._markedForReap && !osession._killed() && diff --git a/src/mongo/db/stats/SConscript b/src/mongo/db/stats/SConscript index 88372bb685e..594233298e2 100644 --- a/src/mongo/db/stats/SConscript +++ b/src/mongo/db/stats/SConscript @@ -58,6 +58,17 @@ env.Library( ) env.Library( + target="transaction_stats", + source=[ + "single_transaction_stats.cpp" + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/repl/read_concern_args', + ], +) + +env.Library( target='fill_locker_info', source=[ 'fill_locker_info.cpp', diff --git a/src/mongo/db/single_transaction_stats.cpp b/src/mongo/db/stats/single_transaction_stats.cpp index 66b8539553c..386a2568d45 100644 --- a/src/mongo/db/single_transaction_stats.cpp +++ b/src/mongo/db/stats/single_transaction_stats.cpp @@ -29,7 +29,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/single_transaction_stats.h" +#include "mongo/db/stats/single_transaction_stats.h" namespace mongo { diff --git a/src/mongo/db/single_transaction_stats.h b/src/mongo/db/stats/single_transaction_stats.h index 0afdb7f3d24..0afdb7f3d24 100644 --- a/src/mongo/db/single_transaction_stats.h +++ b/src/mongo/db/stats/single_transaction_stats.h diff --git a/src/mongo/db/transaction_metrics_observer.h b/src/mongo/db/transaction_metrics_observer.h index f4cafd1091c..634792198c0 100644 --- a/src/mongo/db/transaction_metrics_observer.h +++ b/src/mongo/db/transaction_metrics_observer.h @@ -31,7 +31,7 @@ #include "mongo/db/curop.h" #include 
"mongo/db/server_transactions_metrics.h" -#include "mongo/db/single_transaction_stats.h" +#include "mongo/db/stats/single_transaction_stats.h" #include "mongo/db/stats/top.h" namespace mongo { diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index 29f4f0e31c4..897637b9f32 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -46,7 +46,7 @@ #include "mongo/db/session.h" #include "mongo/db/session_catalog.h" #include "mongo/db/session_txn_record_gen.h" -#include "mongo/db/single_transaction_stats.h" +#include "mongo/db/stats/single_transaction_stats.h" #include "mongo/db/storage/recovery_unit.h" #include "mongo/db/storage/storage_engine.h" #include "mongo/db/transaction_metrics_observer.h" diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp index 5d7d08fbafb..9de327e0d45 100644 --- a/src/mongo/s/transaction_router.cpp +++ b/src/mongo/s/transaction_router.cpp @@ -52,6 +52,7 @@ #include "mongo/util/assert_util.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" +#include "mongo/util/net/socket_utils.h" namespace mongo { namespace { @@ -281,6 +282,126 @@ TransactionRouter::Participant::Participant(bool inIsCoordinator, sharedOptions(std::move(inSharedOptions)), stmtIdCreatedAt(inStmtIdCreatedAt) {} +BSONObj TransactionRouter::Observer::reportState(OperationContext* opCtx, + bool sessionIsActive) const { + BSONObjBuilder builder; + reportState(opCtx, &builder, sessionIsActive); + return builder.obj(); +} + +void TransactionRouter::Observer::reportState(OperationContext* opCtx, + BSONObjBuilder* builder, + bool sessionIsActive) const { + _reportState(opCtx, builder, sessionIsActive); +} + +void TransactionRouter::Observer::_reportState(OperationContext* opCtx, + BSONObjBuilder* builder, + bool sessionIsActive) const { + if (o().txnNumber == kUninitializedTxnNumber) { + // This transaction router is not yet initialized. 
+ return; + } + + // Append relevant client metadata. + + builder->append("type", sessionIsActive ? "activeSession" : "idleSession"); + builder->append("host", getHostNameCachedAndPort()); + builder->append("desc", sessionIsActive ? "active transaction" : "inactive transaction"); + + const auto& lastClientInfo = o().lastClientInfo; + builder->append("client", lastClientInfo.clientHostAndPort); + builder->append("connectionId", lastClientInfo.connectionId); + builder->append("appName", lastClientInfo.appName); + builder->append("clientMetadata", lastClientInfo.clientMetadata); + + // Append session and transaction metadata. + + { + BSONObjBuilder lsid(builder->subobjStart("lsid")); + _sessionId().serialize(&lsid); + } + + BSONObjBuilder transactionBuilder(builder->subobjStart("transaction")); + + { + BSONObjBuilder parametersBuilder(transactionBuilder.subobjStart("parameters")); + parametersBuilder.append("txnNumber", o().txnNumber); + parametersBuilder.append("autocommit", false); + if (!o().readConcernArgs.isEmpty()) { + o().readConcernArgs.appendInfo(&parametersBuilder); + } + } + + // Append current transaction info. + + if (_atClusterTimeHasBeenSet()) { + builder->append("globalReadTimestamp", o().atClusterTime->getTime().asTimestamp()); + } + + const auto& timingStats = o().timingStats; + + builder->append("startWallClockTime", dateToISOStringLocal(timingStats.startWallClockTime)); + + auto tickSource = opCtx->getServiceContext()->getTickSource(); + auto curTicks = tickSource->getTicks(); + + builder->append("timeOpenMicros", + durationCount<Microseconds>(timingStats.getDuration(tickSource, curTicks))); + + // TODO SERVER-41376: Log timeActiveMicros + + // TODO SERVER-41376: Log timeInactiveMicros + + int numReadOnlyParticipants = 0; + int numNonReadOnlyParticipants = 0; + + // We don't know the participants if we're recovering the commit.
+ if (o().commitType != CommitType::kRecoverWithToken) { + builder->append("numParticipants", static_cast<int>(o().participants.size())); + + BSONArrayBuilder participantsArrayBuilder; + for (auto const& participantPair : o().participants) { + BSONObjBuilder participantBuilder; + participantBuilder.append("name", participantPair.first); + participantBuilder.append("coordinator", participantPair.second.isCoordinator); + + if (participantPair.second.readOnly == Participant::ReadOnly::kReadOnly) { + participantBuilder.append("readOnly", true); + ++numReadOnlyParticipants; + } else if (participantPair.second.readOnly == Participant::ReadOnly::kNotReadOnly) { + participantBuilder.append("readOnly", false); + ++numNonReadOnlyParticipants; + } + participantsArrayBuilder.append(participantBuilder.obj()); + } + + transactionBuilder.appendArray("participants", participantsArrayBuilder.obj()); + } + + if (o().commitType != CommitType::kNotInitiated) { + transactionBuilder.append("commitStartWallClockTime", + dateToISOStringLocal(timingStats.commitStartWallClockTime)); + transactionBuilder.append("commitType", commitTypeToString(o().commitType)); + } + + transactionBuilder.append("numReadOnlyParticipants", numReadOnlyParticipants); + transactionBuilder.append("numNonReadOnlyParticipants", numNonReadOnlyParticipants); + + transactionBuilder.done(); + + builder->append("active", sessionIsActive); +} + +bool TransactionRouter::Observer::_atClusterTimeHasBeenSet() const { + return o().atClusterTime.is_initialized() && o().atClusterTime->timeHasBeenSet(); +} + +const LogicalSessionId& TransactionRouter::Observer::_sessionId() const { + const auto* owningSession = getTransactionRouter.owner(_tr); + return owningSession->getSessionId(); +} + BSONObj TransactionRouter::Participant::attachTxnFieldsIfNeeded( BSONObj cmd, bool isFirstStatementInThisParticipant) const { bool hasStartTxn = false; @@ -503,6 +624,8 @@ TransactionRouter::Participant& 
TransactionRouter::Router::_createParticipant( void TransactionRouter::Router::_setReadOnlyForParticipant(OperationContext* opCtx, const ShardId& shard, const Participant::ReadOnly readOnly) { + invariant(readOnly != Participant::ReadOnly::kUnset); + const auto iter = o().participants.find(shard.toString()); invariant(iter != o().participants.end()); const auto currentParticipant = iter->second; @@ -686,10 +809,6 @@ void TransactionRouter::Router::setDefaultAtClusterTime(OperationContext* opCtx) opCtx, repl::ReadConcernArgs::get(opCtx).getArgsAfterClusterTime(), defaultTime); } -bool TransactionRouter::Router::_atClusterTimeHasBeenSet() const { - return o().atClusterTime.is_initialized() && o().atClusterTime->timeHasBeenSet(); -} - void TransactionRouter::Router::_setAtClusterTime( OperationContext* opCtx, const boost::optional<LogicalTime>& afterClusterTime, @@ -730,12 +849,13 @@ void TransactionRouter::Router::beginOrContinueTxn(OperationContext* opCtx, repl::ReadConcernArgs::get(opCtx).isEmpty()); repl::ReadConcernArgs::get(opCtx) = o().readConcernArgs; + ++p().latestStmtId; - return; + break; } case TransactionActions::kCommit: ++p().latestStmtId; - return; + break; } } else if (txnNumber > o().txnNumber) { // This is a newer transaction. 
@@ -764,7 +884,7 @@ void TransactionRouter::Router::beginOrContinueTxn(OperationContext* opCtx, _onNewTransaction(opCtx); LOG(3) << txnIdToString() << " New transaction started"; - return; + break; } case TransactionActions::kContinue: { uasserted(ErrorCodes::NoSuchTransaction, @@ -780,16 +900,12 @@ void TransactionRouter::Router::beginOrContinueTxn(OperationContext* opCtx, _onBeginRecoveringDecision(opCtx); LOG(3) << txnIdToString() << " Commit recovery started"; - return; + break; } }; } - MONGO_UNREACHABLE; -} -const LogicalSessionId& TransactionRouter::Router::_sessionId() const { - const auto* owningSession = getTransactionRouter.owner(_tr); - return owningSession->getSessionId(); + _updateLastClientInfo(opCtx->getClient()); } BSONObj TransactionRouter::Router::_handOffCommitToCoordinator(OperationContext* opCtx) { @@ -919,6 +1035,7 @@ BSONObj TransactionRouter::Router::_commitTransaction( _onStartCommit(lk, opCtx); } + return sendCommitDirectlyToShards(opCtx, {shardId}); } @@ -1090,6 +1207,11 @@ void TransactionRouter::Router::_resetRouterState(OperationContext* opCtx, o(lk).timingStats = TimingStats(); p().terminationInitiated = false; + auto tickSource = opCtx->getServiceContext()->getTickSource(); + o(lk).timingStats.startTime = tickSource->getTicks(); + o(lk).timingStats.startWallClockTime = + opCtx->getServiceContext()->getPreciseClockSource()->now(); + // TODO SERVER-37115: Parse statement ids from the client and remember the statement id // of the command that started the transaction, if one was included. 
p().latestStmtId = kDefaultFirstStmtId; @@ -1144,7 +1266,10 @@ std::string TransactionRouter::Router::_transactionInfoForLog( parametersBuilder.append("txnNumber", o().txnNumber); parametersBuilder.append("autocommit", false); - o().readConcernArgs.appendInfo(&parametersBuilder); + + if (!o().readConcernArgs.isEmpty()) { + o().readConcernArgs.appendInfo(&parametersBuilder); + } sb << "parameters:" << parametersBuilder.obj().toString() << ","; @@ -1187,9 +1312,9 @@ std::string TransactionRouter::Router::_transactionInfoForLog( << ","; } - // TODO SERVER-40985: Log timeActiveMicros + // TODO SERVER-41376: Log timeActiveMicros - // TODO SERVER-40985: Log timeInactiveMicros + // TODO SERVER-41376: Log timeInactiveMicros // Total duration of the transaction. Logged at the end of the line for consistency with slow // command logging. @@ -1199,23 +1324,11 @@ std::string TransactionRouter::Router::_transactionInfoForLog( } void TransactionRouter::Router::_onNewTransaction(OperationContext* opCtx) { - auto tickSource = opCtx->getServiceContext()->getTickSource(); - { - stdx::lock_guard<Client> lk(*opCtx->getClient()); - o(lk).timingStats.startTime = tickSource->getTicks(); - } - auto routerTxnMetrics = RouterTransactionsMetrics::get(opCtx); routerTxnMetrics->incrementTotalStarted(); } void TransactionRouter::Router::_onBeginRecoveringDecision(OperationContext* opCtx) { - auto tickSource = opCtx->getServiceContext()->getTickSource(); - { - stdx::lock_guard<Client> lk(*opCtx->getClient()); - o(lk).timingStats.startTime = tickSource->getTicks(); - } - auto routerTxnMetrics = RouterTransactionsMetrics::get(opCtx); routerTxnMetrics->incrementTotalStarted(); } @@ -1260,7 +1373,11 @@ void TransactionRouter::Router::_onStartCommit(WithLock wl, OperationContext* op } auto tickSource = opCtx->getServiceContext()->getTickSource(); - o(wl).timingStats.commitStartTime = tickSource->getTicks(); + { + o(wl).timingStats.commitStartTime = tickSource->getTicks(); +
o(wl).timingStats.commitStartWallClockTime = + opCtx->getServiceContext()->getPreciseClockSource()->now(); + } auto routerTxnMetrics = RouterTransactionsMetrics::get(opCtx); routerTxnMetrics->incrementCommitInitiated(o().commitType); @@ -1323,6 +1440,11 @@ void TransactionRouter::Router::_endTransactionTrackingIfNecessary( } } +void TransactionRouter::Router::_updateLastClientInfo(Client* client) { + stdx::lock_guard<Client> lk(*client); + o(lk).lastClientInfo.update(client); +} + Microseconds TransactionRouter::TimingStats::getDuration(TickSource* tickSource, TickSource::Tick curTicks) const { invariant(startTime > 0); diff --git a/src/mongo/s/transaction_router.h b/src/mongo/s/transaction_router.h index cf225ef0f9c..3f1e45d5fa3 100644 --- a/src/mongo/s/transaction_router.h +++ b/src/mongo/s/transaction_router.h @@ -36,6 +36,7 @@ #include "mongo/db/operation_context.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/session_catalog.h" +#include "mongo/db/stats/single_transaction_stats.h" #include "mongo/s/async_requests_sender.h" #include "mongo/s/client/shard.h" #include "mongo/s/shard_id.h" @@ -133,10 +134,18 @@ public: */ Microseconds getCommitDuration(TickSource* tickSource, TickSource::Tick curTicks) const; + // The start time of the transaction in millisecond resolution. Used only for diagnostics + // reporting. + Date_t startWallClockTime; + // The start time of the transaction. Note that tick values should only ever be used to // measure distance from other tick values, not for reporting absolute wall clock time. TickSource::Tick startTime{0}; + // The start time of the transaction commit in millisecond resolution. Used only for + // diagnostics reporting. + Date_t commitStartWallClockTime; + // When commit was started. TickSource::Tick commitStartTime{0}; @@ -197,6 +206,18 @@ public: public: explicit Observer(const ObservableSession& session); + /** + * Report the current state of a session.
The sessionIsActive boolean indicates whether + * the session and transaction are currently active. + * + * The Client lock for the given OperationContext must be held when calling this method in + * the case where sessionIsActive is true. + */ + BSONObj reportState(OperationContext* opCtx, bool sessionIsActive) const; + void reportState(OperationContext* opCtx, + BSONObjBuilder* builder, + bool sessionIsActive) const; + protected: explicit Observer(TransactionRouter* tr) : _tr(tr) {} @@ -204,6 +225,17 @@ public: return _tr->_o; } + // Reports the current state of the session using the provided builder. + void _reportState(OperationContext* opCtx, + BSONObjBuilder* builder, + bool sessionIsActive) const; + + // Returns true if the atClusterTime has been changed from the default uninitialized value. + bool _atClusterTimeHasBeenSet() const; + + // Shortcut to obtain the id of the session under which this transaction router runs + const LogicalSessionId& _sessionId() const; + TransactionRouter* _tr; }; // class Observer @@ -408,11 +440,6 @@ public: BSONObj _handOffCommitToCoordinator(OperationContext* opCtx); /** - * Returns true if the atClusterTime has been changed from the default uninitialized value. - */ - bool _atClusterTimeHasBeenSet() const; - - /** * Sets the given logical time as the atClusterTime for the transaction to be the greater of * the given time and the user's afterClusterTime, if one was provided. */ @@ -515,8 +542,15 @@ public: std::string _transactionInfoForLog(OperationContext* opCtx, TerminationCause terminationCause) const; - // Shortcut to obtain the id of the session under which this transaction router runs - const LogicalSessionId& _sessionId() const; + /** + * Returns the LastClientInfo object. + */ + const SingleTransactionStats::LastClientInfo& _getLastClientInfo() const; + + /** + * Updates the LastClientInfo object with the given Client's information. 
+ */ + void _updateLastClientInfo(Client* client); TransactionRouter::PrivateState& p() { return _tr->_p; @@ -577,6 +611,10 @@ private: // Stats used for calculating durations for the active transaction. TimingStats timingStats; + + // Information about the last client to run a transaction operation on this transaction + // router. + SingleTransactionStats::LastClientInfo lastClientInfo; } _o; /** diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp index 36d4e3cb996..db099d84789 100644 --- a/src/mongo/s/transaction_router_test.cpp +++ b/src/mongo/s/transaction_router_test.cpp @@ -27,6 +27,8 @@ * it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kTransaction + #include "mongo/platform/basic.h" #include <map> @@ -46,7 +48,10 @@ #include "mongo/s/transaction_router.h" #include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" +#include "mongo/util/clock_source_mock.h" #include "mongo/util/fail_point_service.h" +#include "mongo/util/log.h" +#include "mongo/util/net/socket_utils.h" #include "mongo/util/tick_source_mock.h" namespace mongo { @@ -2798,6 +2803,14 @@ protected: return dynamic_cast<TickSourceMock<Microseconds>*>(getServiceContext()->getTickSource()); } + /** + * Set up and return a mock clock source. 
+ */ + ClockSourceMock* preciseClockSource() { + getServiceContext()->setPreciseClockSource(std::make_unique<ClockSourceMock>()); + return dynamic_cast<ClockSourceMock*>(getServiceContext()->getPreciseClockSource()); + } + TransactionRouter::Router txnRouter() { return TransactionRouter::get(operationContext()); } @@ -3130,7 +3143,7 @@ TEST_F(TransactionRouterMetricsTest, SlowLoggingReadConcern_None) { beginSlowTxnWithDefaultTxnNumber(); runCommit(kDummyOkRes); - ASSERT_EQUALS(1, countLogLinesContaining(readConcern.toBSON()["readConcern"])); + ASSERT_EQUALS(0, countLogLinesContaining(readConcern.toBSON()["readConcern"])); ASSERT_EQUALS(0, countLogLinesContaining("globalReadTimestamp:")); } @@ -3961,5 +3974,166 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsCommitTypeStatsSuccessfulDurat .successfulDurationMicros.load()); } +TEST_F(TransactionRouterMetricsTest, ReportResources) { + // Create client and read concern metadata. + BSONObjBuilder builder; + ASSERT_OK(ClientMetadata::serializePrivate("driverName", + "driverVersion", + "osType", + "osName", + "osArchitecture", + "osVersion", + "appName", + &builder)); + + auto obj = builder.obj(); + auto clientMetadata = ClientMetadata::parse(obj["client"]); + auto& clientMetadataIsMasterState = + ClientMetadataIsMasterState::get(operationContext()->getClient()); + clientMetadataIsMasterState.setClientMetadata(operationContext()->getClient(), + std::move(clientMetadata.getValue())); + + repl::ReadConcernArgs readConcernArgs; + ASSERT_OK( + readConcernArgs.initialize(BSON("find" + << "test" << repl::ReadConcernArgs::kReadConcernFieldName + << BSON(repl::ReadConcernArgs::kLevelFieldName + << "snapshot")))); + repl::ReadConcernArgs::get(operationContext()) = readConcernArgs; + + auto clockSource = preciseClockSource(); + auto startTime = Date_t::now(); + clockSource->reset(startTime); + + beginTxnWithDefaultTxnNumber(); + + // Verify reported parameters match expectations. 
+ auto state = txnRouter().reportState(operationContext(), false /* sessionIsActive */); + auto transactionDocument = state.getObjectField("transaction"); + + auto parametersDocument = transactionDocument.getObjectField("parameters"); + ASSERT_EQ(parametersDocument.getField("txnNumber").numberLong(), kTxnNumber); + ASSERT_EQ(parametersDocument.getField("autocommit").boolean(), false); + ASSERT_BSONELT_EQ(parametersDocument.getField("readConcern"), + readConcernArgs.toBSON().getField("readConcern")); + + ASSERT_GTE(transactionDocument.getField("readTimestamp").timestamp(), Timestamp(0, 0)); + ASSERT_EQ( + dateFromISOString(transactionDocument.getField("startWallClockTime").valueStringData()) + .getValue(), + startTime); + ASSERT_GTE(transactionDocument.getField("timeOpenMicros").numberLong(), 0); + ASSERT_EQ(transactionDocument.getField("numNonReadOnlyParticipants").numberInt(), 0); + ASSERT_EQ(transactionDocument.getField("numReadOnlyParticipants").numberInt(), 0); + + + ASSERT_EQ(state.getField("host").valueStringData().toString(), getHostNameCachedAndPort()); + ASSERT_EQ(state.getField("desc").valueStringData().toString(), "inactive transaction"); + ASSERT_BSONOBJ_EQ(state.getField("lsid").Obj(), getSessionId().toBSON()); + ASSERT_EQ(state.getField("client").valueStringData().toString(), ""); + ASSERT_EQ(state.getField("connectionId").numberLong(), 0); + ASSERT_EQ(state.getField("appName").valueStringData().toString(), "appName"); + ASSERT_BSONOBJ_EQ(state.getField("clientMetadata").Obj(), obj.getField("client").Obj()); + ASSERT_EQ(state.getField("active").boolean(), false); +} + +TEST_F(TransactionRouterMetricsTest, ReportResourcesWithParticipantList) { + auto clockSource = preciseClockSource(); + auto startTime = Date_t::now(); + clockSource->reset(startTime); + + beginTxnWithDefaultTxnNumber(); + txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard1, {}); + txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard2, {}); + + auto state = 
txnRouter().reportState(operationContext(), true /* sessionIsActive */); + auto transactionDocument = state.getObjectField("transaction"); + auto parametersDocument = transactionDocument.getObjectField("parameters"); + + ASSERT_EQ(state.getField("desc").valueStringData().toString(), "active transaction"); + ASSERT_EQ(state.getField("type").valueStringData().toString(), "activeSession"); + ASSERT_GTE(transactionDocument.getField("readTimestamp").timestamp(), Timestamp(0, 0)); + ASSERT_EQ(dateFromISOString(transactionDocument.getField("startWallClockTime").String()), + startTime); + + // Verify participants array matches expected values. + + auto participantComp = [](const BSONElement& a, const BSONElement& b) { + return a.Obj().getField("name").String() < b.Obj().getField("name").String(); + }; + + auto participantArray = transactionDocument.getField("participants").Array(); + ASSERT_EQ(participantArray.size(), 2U); + std::sort(participantArray.begin(), participantArray.end(), participantComp); + + auto participant1 = participantArray[0].Obj(); + ASSERT_EQ(participant1.getField("name").String(), "shard1"); + ASSERT_EQ(participant1.getField("coordinator").boolean(), true); + + auto participant2 = participantArray[1].Obj(); + ASSERT_EQ(participant2.getField("name").String(), "shard2"); + ASSERT_EQ(participant2.getField("coordinator").boolean(), false); + + txnRouter().processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse); + txnRouter().processParticipantResponse(operationContext(), shard2, kOkReadOnlyTrueResponse); + + txnRouter().beginOrContinueTxn( + operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kContinue); + + // Verify participants array has been updated with proper ReadOnly responses. 
+ + state = txnRouter().reportState(operationContext(), true /* sessionIsActive */); + transactionDocument = state.getObjectField("transaction"); + participantArray = transactionDocument.getField("participants").Array(); + + ASSERT_EQ(participantArray.size(), 2U); + std::sort(participantArray.begin(), participantArray.end(), participantComp); + + participant1 = participantArray[0].Obj(); + ASSERT_EQ(participant1.getField("name").String(), "shard1"); + ASSERT_EQ(participant1.getField("coordinator").boolean(), true); + ASSERT_EQ(participant1.getField("readOnly").boolean(), false); + + participant2 = participantArray[1].Obj(); + ASSERT_EQ(participant2.getField("name").String(), "shard2"); + ASSERT_EQ(participant2.getField("coordinator").boolean(), false); + ASSERT_EQ(participant2.getField("readOnly").boolean(), true); + + ASSERT_EQ(transactionDocument.getField("numNonReadOnlyParticipants").numberInt(), 1); + ASSERT_EQ(transactionDocument.getField("numReadOnlyParticipants").numberInt(), 1); + + ASSERT_EQ(state.getField("active").boolean(), true); + ASSERT_GTE(transactionDocument.getField("timeOpenMicros").numberLong(), 0); +} + +TEST_F(TransactionRouterMetricsTest, ReportResourcesCommit) { + beginTxnWithDefaultTxnNumber(); + + auto clockSource = preciseClockSource(); + auto commitTime = Date_t::now(); + clockSource->reset(commitTime); + + runTwoPhaseCommit(); + + // Verify commit is reported as expected. 
+ + auto state = txnRouter().reportState(operationContext(), true /* sessionIsActive */); + auto transactionDocument = state.getObjectField("transaction"); + ASSERT_EQ(dateFromISOString(transactionDocument.getField("commitStartWallClockTime").String()), + commitTime); + ASSERT_EQ(transactionDocument.getField("commitType").String(), "twoPhaseCommit"); +} + +TEST_F(TransactionRouterMetricsTest, ReportResourcesRecoveryCommit) { + beginSlowRecoverCommitWithDefaultTxnNumber(); + runRecoverWithTokenCommit(boost::none); + + // Verify that the participant list does not exist if the commit type is recovery. + + auto state = txnRouter().reportState(operationContext(), true /* sessionIsActive */); + auto transactionDocument = state.getObjectField("transaction"); + ASSERT_EQ(transactionDocument.hasField("participants"), false); +} + } // unnamed namespace } // namespace mongo |