summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSpencer T Brody <spencer@mongodb.com>2018-08-15 18:10:03 -0400
committerSpencer T Brody <spencer@mongodb.com>2018-08-22 18:29:06 -0400
commitb313d4dc086c039af423cdf4bdf25b9091455ce5 (patch)
tree26a5b477ad7c129cf9f6fb0d683983682d703bd5
parentd8810253bff426c58ee9040689302f8964053c9c (diff)
downloadmongo-b313d4dc086c039af423cdf4bdf25b9091455ce5.tar.gz
SERVER-36331 Kill running op when a transaction expires
(cherry picked from commit b83c5312dc5f437480de10487f945933a96a7ccd)
-rw-r--r--buildscripts/resmokeconfig/suites/replica_sets_jscore_passthrough.yml1
-rw-r--r--jstests/core/txns/kill_op_on_txn_expiry.js95
-rw-r--r--src/mongo/db/session.cpp38
-rw-r--r--src/mongo/db/session.h16
-rw-r--r--src/mongo/db/session_catalog.cpp4
5 files changed, 145 insertions, 9 deletions
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_jscore_passthrough.yml
index d7222328cbe..09e93a50555 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_jscore_passthrough.yml
@@ -17,6 +17,7 @@ selector:
# the CheckReplDBHashInBackground hook doesn't want transactions to be reaped while it is running.
- jstests/core/txns/abort_expired_transaction.js
- jstests/core/txns/abort_transaction_thread_does_not_block_on_locks.js
+ - jstests/core/txns/kill_op_on_txn_expiry.js
# The set_param1.js test attempts to compare the response from running the {getParameter: "*"}
# command multiple times, which may observe the change to the "transactionLifetimeLimitSeconds"
# server parameter.
diff --git a/jstests/core/txns/kill_op_on_txn_expiry.js b/jstests/core/txns/kill_op_on_txn_expiry.js
new file mode 100644
index 00000000000..298b5d0926e
--- /dev/null
+++ b/jstests/core/txns/kill_op_on_txn_expiry.js
@@ -0,0 +1,95 @@
+// Test that ongoing operations in a transaction are interrupted when the transaction expires.
+// @tags: [uses_transactions]
+(function() {
+ "use strict";
+
+ load('jstests/libs/parallelTester.js');
+ load("jstests/libs/check_log.js");
+
+ const dbName = "test";
+ const collName = "kill_op_on_txn_expiry";
+ const testDB = db.getSiblingDB(dbName);
+ const testColl = testDB[collName];
+
+ testDB.runCommand({drop: collName, writeConcern: {w: "majority"}});
+ assert.commandWorked(
+ testDB.createCollection(testColl.getName(), {writeConcern: {w: "majority"}}));
+
+ const sessionOptions = {causalConsistency: false};
+ const session = db.getMongo().startSession(sessionOptions);
+ const sessionDb = session.getDatabase(dbName);
+ const sessionColl = sessionDb[collName];
+
+ // Need the original 'transactionLifetimeLimitSeconds' value so that we can reset it back at the
+ // end of the test.
+ const res = assert.commandWorked(
+ db.adminCommand({getParameter: 1, transactionLifetimeLimitSeconds: 1}));
+ const originalTransactionLifetimeLimitSeconds = res.transactionLifetimeLimitSeconds;
+
+ // Decrease transactionLifetimeLimitSeconds so it expires faster
+ jsTest.log("Decrease transactionLifetimeLimitSeconds from " +
+ originalTransactionLifetimeLimitSeconds + " to 30 seconds.");
+ assert.commandWorked(db.adminCommand({setParameter: 1, transactionLifetimeLimitSeconds: 30}));
+
+ try {
+ jsTestLog("Starting transaction");
+
+ let txnNumber = 0;
+ assert.commandWorked(testColl.runCommand({
+ insert: collName,
+ documents: [{_id: 0}],
+ txnNumber: NumberLong(txnNumber),
+ startTransaction: true,
+ autocommit: false,
+ lsid: session.getSessionId(),
+ }));
+
+ jsTestLog("Enabling fail point to block batch inserts");
+ assert.commandWorked(
+ testDB.adminCommand({configureFailPoint: "hangDuringBatchInsert", mode: "alwaysOn"}));
+ // Clear ramlog so checkLog can't find log messages from previous times this fail point was
+ // enabled.
+ assert.commandWorked(testDB.adminCommand({clearLog: 'global'}));
+
+ jsTestLog("Starting insert operation in parallel thread");
+ let workerThread = new ScopedThread((sessionId, txnNumber, dbName, collName) => {
+ // Deserialize the session ID from its string representation.
+ sessionId = eval("(" + sessionId + ")");
+
+ let coll = db.getSiblingDB(dbName).getCollection(collName);
+ assert.commandFailedWithCode(coll.runCommand({
+ insert: collName,
+ documents: [{_id: 1}],
+ txnNumber: NumberLong(txnNumber),
+ autocommit: false,
+ lsid: sessionId
+ }),
+ ErrorCodes.ExceededTimeLimit);
+
+ }, tojson(session.getSessionId()), txnNumber, dbName, collName);
+ workerThread.start();
+
+ jsTestLog("Wait for insert to be blocked");
+ checkLog.contains(db.getMongo(), "hangDuringBatchInsert fail point enabled");
+
+ jsTestLog("Wait for the transaction to expire");
+ checkLog.contains(db.getMongo(), "Aborting transaction with txnNumber " + txnNumber);
+
+ jsTestLog("Disabling fail point to enable insert to proceed and detect that the session " +
+ "has been killed");
+ assert.commandWorked(
+ testDB.adminCommand({configureFailPoint: "hangDuringBatchInsert", mode: "off"}));
+
+ workerThread.join();
+ assert(!workerThread.hasFailed());
+ } finally {
+ // Must ensure that the transactionLifetimeLimitSeconds is reset so that it does not impact
+ // other tests in the suite.
+ assert.commandWorked(db.adminCommand({
+ setParameter: 1,
+ transactionLifetimeLimitSeconds: originalTransactionLifetimeLimitSeconds
+ }));
+ }
+
+ session.endSession();
+}());
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index 83139595f53..8537513bd28 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -292,6 +292,18 @@ const BSONObj Session::kDeadEndSentinel(BSON("$incompleteOplogHistory" << 1));
Session::Session(LogicalSessionId sessionId) : _sessionId(std::move(sessionId)) {}
+void Session::setCurrentOperation(OperationContext* currentOperation) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(!_currentOperation);
+ _currentOperation = currentOperation;
+}
+
+void Session::clearCurrentOperation() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(_currentOperation);
+ _currentOperation = nullptr;
+}
+
void Session::refreshFromStorageIfNeeded(OperationContext* opCtx) {
if (opCtx->getClient()->isInDirectClient()) {
return;
@@ -838,22 +850,34 @@ void Session::unstashTransactionResources(OperationContext* opCtx, const std::st
void Session::abortArbitraryTransaction() {
stdx::lock_guard<stdx::mutex> lock(_mutex);
- _abortArbitraryTransaction(lock);
+
+ if (_txnState != MultiDocumentTransactionState::kInProgress) {
+ return;
+ }
+
+ _abortTransaction(lock);
}
void Session::abortArbitraryTransactionIfExpired() {
stdx::lock_guard<stdx::mutex> lock(_mutex);
- if (!_transactionExpireDate || _transactionExpireDate >= Date_t::now()) {
+ if (_txnState != MultiDocumentTransactionState::kInProgress || !_transactionExpireDate ||
+ _transactionExpireDate >= Date_t::now()) {
return;
}
- _abortArbitraryTransaction(lock);
-}
-void Session::_abortArbitraryTransaction(WithLock lock) {
- if (_txnState != MultiDocumentTransactionState::kInProgress) {
- return;
+ if (_currentOperation) {
+ // If an operation is still running for this transaction when it expires, kill the currently
+ // running operation.
+ stdx::lock_guard<Client> clientLock(*_currentOperation->getClient());
+ getGlobalServiceContext()->killOperation(_currentOperation, ErrorCodes::ExceededTimeLimit);
}
+ // Log after killing the current operation because jstests may wait to see this log message to
+ // imply that the operation has been killed.
+ log() << "Aborting transaction with txnNumber " << _activeTxnNumber << " on session with lsid "
+ << _sessionId.getId()
+ << " because it has been running for longer than 'transactionLifetimeLimitSeconds'";
+
_abortTransaction(lock);
}
diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h
index 7c10b3b8825..4b0ac53b270 100644
--- a/src/mongo/db/session.h
+++ b/src/mongo/db/session.h
@@ -378,6 +378,16 @@ public:
}
/**
+ * Sets the current operation running on this Session.
+ */
+ void setCurrentOperation(OperationContext* currentOperation);
+
+ /**
+ * Clears the current operation running on this Session.
+ */
+ void clearCurrentOperation();
+
+ /**
* Returns a new oplog entry if the given entry has transaction state embedded within in.
* The new oplog entry will contain the operation needed to replicate the transaction
* table.
@@ -430,8 +440,6 @@ private:
std::vector<StmtId> stmtIdsWritten,
const repl::OpTime& lastStmtIdWriteTs);
- void _abortArbitraryTransaction(WithLock);
-
// Releases stashed transaction resources to abort the transaction.
void _abortTransaction(WithLock);
@@ -457,6 +465,10 @@ private:
// Condition variable notified when we finish an attempt to commit the global WUOW.
stdx::condition_variable _commitcv;
+ // A pointer back to the currently running operation on this Session, or nullptr if there
+ // is no operation currently running for the Session.
+ OperationContext* _currentOperation{nullptr};
+
// Specifies whether the session information needs to be refreshed from storage
bool _isValid{false};
diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp
index 24fcea6b37e..0b444f2d2e4 100644
--- a/src/mongo/db/session_catalog.cpp
+++ b/src/mongo/db/session_catalog.cpp
@@ -279,6 +279,7 @@ OperationContextSession::OperationContextSession(OperationContext* opCtx,
checkedOutSession->get()->beginOrContinueTxn(
opCtx, *opCtx->getTxnNumber(), autocommit, startTransaction, dbName, cmdName);
}
+ session->setCurrentOperation(opCtx);
}
OperationContextSession::~OperationContextSession() {
@@ -289,6 +290,9 @@ OperationContextSession::~OperationContextSession() {
}
auto& checkedOutSession = operationSessionDecoration(_opCtx);
+ if (checkedOutSession) {
+ checkedOutSession->get()->clearCurrentOperation();
+ }
// We acquire a Client lock here to guard the destruction of this session so that references to
// this session are safe to use while the lock is held.
stdx::lock_guard<Client> lk(*_opCtx->getClient());