diff options
author | Spencer T Brody <spencer@mongodb.com> | 2018-08-15 18:10:03 -0400 |
---|---|---|
committer | Spencer T Brody <spencer@mongodb.com> | 2018-08-22 18:29:06 -0400 |
commit | b313d4dc086c039af423cdf4bdf25b9091455ce5 (patch) | |
tree | 26a5b477ad7c129cf9f6fb0d683983682d703bd5 | |
parent | d8810253bff426c58ee9040689302f8964053c9c (diff) | |
download | mongo-b313d4dc086c039af423cdf4bdf25b9091455ce5.tar.gz |
SERVER-36331 Kill running op when a transaction expires
(cherry picked from commit b83c5312dc5f437480de10487f945933a96a7ccd)
-rw-r--r-- | buildscripts/resmokeconfig/suites/replica_sets_jscore_passthrough.yml | 1 | ||||
-rw-r--r-- | jstests/core/txns/kill_op_on_txn_expiry.js | 95 | ||||
-rw-r--r-- | src/mongo/db/session.cpp | 38 | ||||
-rw-r--r-- | src/mongo/db/session.h | 16 | ||||
-rw-r--r-- | src/mongo/db/session_catalog.cpp | 4 |
5 files changed, 145 insertions, 9 deletions
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_jscore_passthrough.yml index d7222328cbe..09e93a50555 100644 --- a/buildscripts/resmokeconfig/suites/replica_sets_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/replica_sets_jscore_passthrough.yml @@ -17,6 +17,7 @@ selector: # the CheckReplDBHashInBackground hook doesn't want transactions to be reaped while it is running. - jstests/core/txns/abort_expired_transaction.js - jstests/core/txns/abort_transaction_thread_does_not_block_on_locks.js + - jstests/core/txns/kill_op_on_txn_expiry.js # The set_param1.js test attempts to compare the response from running the {getParameter: "*"} # command multiple times, which may observe the change to the "transactionLifetimeLimitSeconds" # server parameter. diff --git a/jstests/core/txns/kill_op_on_txn_expiry.js b/jstests/core/txns/kill_op_on_txn_expiry.js new file mode 100644 index 00000000000..298b5d0926e --- /dev/null +++ b/jstests/core/txns/kill_op_on_txn_expiry.js @@ -0,0 +1,95 @@ +// Test that ongoing operations in a transaction are interrupted when the transaction expires. +// @tags: [uses_transactions] +(function() { + "use strict"; + + load('jstests/libs/parallelTester.js'); + load("jstests/libs/check_log.js"); + + const dbName = "test"; + const collName = "kill_op_on_txn_expiry"; + const testDB = db.getSiblingDB(dbName); + const testColl = testDB[collName]; + + testDB.runCommand({drop: collName, writeConcern: {w: "majority"}}); + assert.commandWorked( + testDB.createCollection(testColl.getName(), {writeConcern: {w: "majority"}})); + + const sessionOptions = {causalConsistency: false}; + const session = db.getMongo().startSession(sessionOptions); + const sessionDb = session.getDatabase(dbName); + const sessionColl = sessionDb[collName]; + + // Need the original 'transactionLifetimeLimitSeconds' value so that we can reset it back at the + // end of the test. + const res = assert.commandWorked( + db.adminCommand({getParameter: 1, transactionLifetimeLimitSeconds: 1})); + const originalTransactionLifetimeLimitSeconds = res.transactionLifetimeLimitSeconds; + + // Decrease transactionLifetimeLimitSeconds so it expires faster + jsTest.log("Decrease transactionLifetimeLimitSeconds from " + + originalTransactionLifetimeLimitSeconds + " to 30 seconds."); + assert.commandWorked(db.adminCommand({setParameter: 1, transactionLifetimeLimitSeconds: 30})); + + try { + jsTestLog("Starting transaction"); + + let txnNumber = 0; + assert.commandWorked(testColl.runCommand({ + insert: collName, + documents: [{_id: 0}], + txnNumber: NumberLong(txnNumber), + startTransaction: true, + autocommit: false, + lsid: session.getSessionId(), + })); + + jsTestLog("Enabling fail point to block batch inserts"); + assert.commandWorked( + testDB.adminCommand({configureFailPoint: "hangDuringBatchInsert", mode: "alwaysOn"})); + // Clear ramlog so checkLog can't find log messages from previous times this fail point was + // enabled. + assert.commandWorked(testDB.adminCommand({clearLog: 'global'})); + + jsTestLog("Starting insert operation in parallel thread"); + let workerThread = new ScopedThread((sessionId, txnNumber, dbName, collName) => { + // Deserialize the session ID from its string representation. + sessionId = eval("(" + sessionId + ")"); + + let coll = db.getSiblingDB(dbName).getCollection(collName); + assert.commandFailedWithCode(coll.runCommand({ + insert: collName, + documents: [{_id: 1}], + txnNumber: NumberLong(txnNumber), + autocommit: false, + lsid: sessionId + }), + ErrorCodes.ExceededTimeLimit); + + }, tojson(session.getSessionId()), txnNumber, dbName, collName); + workerThread.start(); + + jsTestLog("Wait for insert to be blocked"); + checkLog.contains(db.getMongo(), "hangDuringBatchInsert fail point enabled"); + + jsTestLog("Wait for the transaction to expire"); + checkLog.contains(db.getMongo(), "Aborting transaction with txnNumber " + txnNumber); + + jsTestLog("Disabling fail point to enable insert to proceed and detect that the session " + + "has been killed"); + assert.commandWorked( + testDB.adminCommand({configureFailPoint: "hangDuringBatchInsert", mode: "off"})); + + workerThread.join(); + assert(!workerThread.hasFailed()); + } finally { + // Must ensure that the transactionLifetimeLimitSeconds is reset so that it does not impact + // other tests in the suite. + assert.commandWorked(db.adminCommand({ + setParameter: 1, + transactionLifetimeLimitSeconds: originalTransactionLifetimeLimitSeconds + })); + } + + session.endSession(); +}()); diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp index 83139595f53..8537513bd28 100644 --- a/src/mongo/db/session.cpp +++ b/src/mongo/db/session.cpp @@ -292,6 +292,18 @@ const BSONObj Session::kDeadEndSentinel(BSON("$incompleteOplogHistory" << 1)); Session::Session(LogicalSessionId sessionId) : _sessionId(std::move(sessionId)) {} +void Session::setCurrentOperation(OperationContext* currentOperation) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(!_currentOperation); + _currentOperation = currentOperation; +} + +void Session::clearCurrentOperation() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(_currentOperation); + _currentOperation = nullptr; +} + void Session::refreshFromStorageIfNeeded(OperationContext* opCtx) { if (opCtx->getClient()->isInDirectClient()) { return; @@ -838,22 +850,34 @@ void Session::unstashTransactionResources(OperationContext* opCtx, const std::st void Session::abortArbitraryTransaction() { stdx::lock_guard<stdx::mutex> lock(_mutex); - _abortArbitraryTransaction(lock); + + if (_txnState != MultiDocumentTransactionState::kInProgress) { + return; + } + + _abortTransaction(lock); } void Session::abortArbitraryTransactionIfExpired() { stdx::lock_guard<stdx::mutex> lock(_mutex); - if (!_transactionExpireDate || _transactionExpireDate >= Date_t::now()) { + if (_txnState != MultiDocumentTransactionState::kInProgress || !_transactionExpireDate || + _transactionExpireDate >= Date_t::now()) { return; } - _abortArbitraryTransaction(lock); -} -void Session::_abortArbitraryTransaction(WithLock lock) { - if (_txnState != MultiDocumentTransactionState::kInProgress) { - return; + if (_currentOperation) { + // If an operation is still running for this transaction when it expires, kill the currently + // running operation. + stdx::lock_guard<Client> clientLock(*_currentOperation->getClient()); + getGlobalServiceContext()->killOperation(_currentOperation, ErrorCodes::ExceededTimeLimit); } + // Log after killing the current operation because jstests may wait to see this log message to + // imply that the operation has been killed. + log() << "Aborting transaction with txnNumber " << _activeTxnNumber << " on session with lsid " + << _sessionId.getId() + << " because it has been running for longer than 'transactionLifetimeLimitSeconds'"; + _abortTransaction(lock); } diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h index 7c10b3b8825..4b0ac53b270 100644 --- a/src/mongo/db/session.h +++ b/src/mongo/db/session.h @@ -378,6 +378,16 @@ public: } /** + * Sets the current operation running on this Session. + */ + void setCurrentOperation(OperationContext* currentOperation); + + /** + * Clears the current operation running on this Session. + */ + void clearCurrentOperation(); + + /** * Returns a new oplog entry if the given entry has transaction state embedded within in. * The new oplog entry will contain the operation needed to replicate the transaction * table. @@ -430,8 +440,6 @@ private: std::vector<StmtId> stmtIdsWritten, const repl::OpTime& lastStmtIdWriteTs); - void _abortArbitraryTransaction(WithLock); - // Releases stashed transaction resources to abort the transaction. void _abortTransaction(WithLock); @@ -457,6 +465,10 @@ private: // Condition variable notified when we finish an attempt to commit the global WUOW. stdx::condition_variable _commitcv; + // A pointer back to the currently running operation on this Session, or nullptr if there + // is no operation currently running for the Session. + OperationContext* _currentOperation{nullptr}; + // Specifies whether the session information needs to be refreshed from storage bool _isValid{false}; diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp index 24fcea6b37e..0b444f2d2e4 100644 --- a/src/mongo/db/session_catalog.cpp +++ b/src/mongo/db/session_catalog.cpp @@ -279,6 +279,7 @@ OperationContextSession::OperationContextSession(OperationContext* opCtx, checkedOutSession->get()->beginOrContinueTxn( opCtx, *opCtx->getTxnNumber(), autocommit, startTransaction, dbName, cmdName); } + session->setCurrentOperation(opCtx); } OperationContextSession::~OperationContextSession() { @@ -289,6 +290,9 @@ OperationContextSession::~OperationContextSession() { } auto& checkedOutSession = operationSessionDecoration(_opCtx); + if (checkedOutSession) { + checkedOutSession->get()->clearCurrentOperation(); + } // We acquire a Client lock here to guard the destruction of this session so that references to // this session are safe to use while the lock is held. stdx::lock_guard<Client> lk(*_opCtx->getClient()); |