summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWilliam Schultz <william.schultz@mongodb.com>2019-09-20 20:42:02 +0000
committerevergreen <evergreen@mongodb.com>2019-09-20 20:42:02 +0000
commite5cfae3ab4b3bb01ff73e8aa352a9597e99f35c3 (patch)
tree4fce57cabe8f7853e8b85a5111075efdcc152085
parentd3d645651c427e34122196b85e4f0ded41489223 (diff)
downloadmongo-e5cfae3ab4b3bb01ff73e8aa352a9597e99f35c3.tar.gz
SERVER-41956 Add integration and unit tests for killing commit and abort transaction commands on a prepared transaction
(cherry picked from commit 9927c83ac62350700247a30e99c2bda3cdc62d7d)
-rw-r--r--jstests/replsets/kill_prepared_transaction_commit_abort.js182
-rw-r--r--src/mongo/db/service_entry_point_common.cpp11
-rw-r--r--src/mongo/db/transaction_participant_test.cpp80
3 files changed, 273 insertions, 0 deletions
diff --git a/jstests/replsets/kill_prepared_transaction_commit_abort.js b/jstests/replsets/kill_prepared_transaction_commit_abort.js
new file mode 100644
index 00000000000..58a964a2bf8
--- /dev/null
+++ b/jstests/replsets/kill_prepared_transaction_commit_abort.js
@@ -0,0 +1,182 @@
+/**
+ * Test killing 'commitTransaction' and 'abortTransaction' operations on prepared transactions.
+ *
+ * @tags: [uses_transactions, uses_prepare_transaction]
+ */
+
+(function() {
+"use strict";
+// TODO (SERVER-42987) Re-enable this test. Until then, the unconditional return below exits the
+// enclosing function immediately, so none of the statements after it are executed.
+if (true) {
+ return;  // Test is currently disabled; see the TODO above.
+}
+load("jstests/core/txns/libs/prepare_helpers.js");  // presumably provides PrepareHelpers, used below.
+load("jstests/libs/parallelTester.js"); // for ScopedThread. NOTE(review): the code below constructs 'Thread', not 'ScopedThread' - confirm which name this load is needed for.
+
+const name = "kill_prepared_transaction_commit_abort";
+const rst = new ReplSetTest({
+ nodes: 1,
+});
+rst.startSet();
+rst.initiate();
+
+const TxnState = {  // Per-transaction status tracked by this test (not a server-side value).
+ InProgress: 1,  // Prepared, and neither a commit nor an abort has succeeded yet.
+ Committed: 2,  // A commitTransaction command returned ok: 1.
+ Aborted: 3,  // An abortTransaction command returned ok: 1.
+};
+
+const dbName = "test";
+const collName = name;  // The collection is named after the test itself.
+
+const primary = rst.getPrimary();
+const testDB = primary.getDB(dbName);
+
+// A latch that will act as a signal to shut down the killOp thread. It starts with a count of 1;
+// the killOp thread keeps looping while the count is greater than 0.
+let shutdownLatch = new CountDownLatch(1);
+assert.commandWorked(testDB.runCommand({create: collName}));
+
+/**
+ * A function that continuously kills any running 'commitTransaction' or 'abortTransaction' commands
+ * on the server, until it receives a shutdown signal via 'shutdownLatch'.
+ */
+function killOpThread(host, dbName, collName, shutdownLatch) {
+ const nodeDB = new Mongo(host).getDB(dbName);
+ jsTestLog("killOp thread starting.");
+ while (shutdownLatch.getCount() > 0) {
+ let filter = {
+ "$or": [
+ {"command.commitTransaction": 1, active: true},
+ {"command.abortTransaction": 1, active: true}
+ ]
+ };
+ let ops = nodeDB.currentOp(filter).inprog;
+ if (ops.length > 0) {
+ print("Going to run 'killOp' on " + ops.length + " ops.");
+ }
+ ops.forEach(op => {
+ if (op.opid) {
+ nodeDB.killOp(op.opid);
+ }
+ });
+ }
+ jsTestLog("killOp thread exiting.");
+}
+
+/**
+ * Creates 'num' sessions and starts and prepares a transaction on each. Returns an array of
+ * sessions included with the commit timestamp for each prepared transaction and the current state
+ * of that transaction.
+ */
+function createPreparedTransactions(num) {
+ let sessions = [];
+ for (let i = 0; i < num; i++) {
+ const priSession = primary.startSession();
+ const priSessionDB = priSession.getDatabase(dbName);
+ const priSessionColl = priSessionDB.getCollection(collName);
+
+ priSession.startTransaction();
+ assert.commandWorked(priSessionColl.insert({_id: i}));
+ const prepareTimestamp = PrepareHelpers.prepareTransaction(priSession);
+ sessions.push(
+ {session: priSession, commitTs: prepareTimestamp, state: TxnState.InProgress});
+ }
+ return sessions;
+}
+
+/**
+ * Commit or abort transactions on all the given sessions until all transactions are complete. We
+ * choose to randomly commit or abort a given transaction with equal probability.
+ */
+function commitOrAbortAllTransactions(sessions) {
+ // Until all transactions have definitively completed, try to abort/commit the open,
+ // prepared transactions.
+ while (sessions.find(s => (s.state === TxnState.InProgress)) !== undefined) {
+ for (let i = 0; i < sessions.length; i++) {
+ // We don't need to commit an already completed transaction.
+ if (sessions[i].state !== TxnState.InProgress) {
+ continue;
+ }
+
+ // Randomly choose to commit or abort the transaction.
+ let sess = sessions[i];
+ let terminalStates = [TxnState.Committed, TxnState.Aborted];
+ let terminalState = terminalStates[Math.round(Math.random())];
+ let cmd = (terminalState === TxnState.Committed)
+ ? {commitTransaction: 1, commitTimestamp: sess.commitTs}
+ : {abortTransaction: 1};
+ let res = sess.session.getDatabase("admin").adminCommand(cmd);
+
+ if (res.ok === 1) {
+ // Mark the transaction's terminal state.
+ sessions[i].state = terminalState;
+ }
+ if (res.ok === 0) {
+ // We assume that transaction commit/abort should not fail for any reason other than
+ // interruption in this test. If the commit/abort was interrupted, then the command
+ // should have failed, and the transaction state should be unaffected.
+ assert.commandFailedWithCode(res, ErrorCodes.Interrupted);
+ print("Transaction " + i + " was interrupted.");
+ }
+ }
+ }
+}
+
+// The number of sessions and transactions to create.
+const numTxns = 100;
+
+// Make the server sleep a bit right after commit/abort commands start to make it more likely that
+// the kill op thread will be able to find and kill them.
+assert.commandWorked(primary.adminCommand({
+ configureFailPoint: "sleepMillisAfterCommandExecutionBegins",
+ mode: "alwaysOn",
+ data: {millis: 50, commands: {"commitTransaction": 1, "abortTransaction": 1}}  // 50 ms pause, only for these two commands.
+}));
+
+jsTestLog("Creating sessions and preparing " + numTxns + " transactions.");
+let sessions = createPreparedTransactions(numTxns);
+
+jsTestLog("Starting the killOp thread.");
+let killThread = new Thread(killOpThread, primary.host, dbName, collName, shutdownLatch);  // Runs killOpThread with these arguments.
+killThread.start();
+
+jsTestLog("Committing/aborting all transactions.");
+commitOrAbortAllTransactions(sessions);  // Returns only once no session is left InProgress.
+
+// Make sure all transactions were completed.
+assert(sessions.every(s => (s.state === TxnState.Committed) || (s.state === TxnState.Aborted)));
+
+jsTestLog("Stopping the killOp thread.");
+shutdownLatch.countDown();  // The latch was created with a count of 1, so this ends the killOp loop.
+killThread.join();
+
+jsTestLog("Checking visibility of all transaction operations.");
+
+// If a transaction committed then its document should be visible. If it aborted then its document
+// should not be visible.
+let docs = testDB[collName].find().toArray();
+let committedTxnIds =  // Array indexes of the sessions whose transaction committed.
+ sessions.reduce((acc, s, i) => (s.state === TxnState.Committed ? acc.concat(i) : acc), []);
+let commitCount = committedTxnIds.length;
+let abortCount = (sessions.length - committedTxnIds.length);  // Every session is Committed or Aborted by now (asserted above).
+jsTestLog("Committed " + commitCount + " transactions, Aborted " + abortCount + " transactions.");
+
+// Verify that the correct documents exist. The transaction at index i inserted exactly {_id: i},
+// so the committed indexes map directly onto the expected documents.
+let expectedDocs = committedTxnIds.map((i) => ({_id: i}));
+assert.sameMembers(docs, expectedDocs);
+
+// Assert that no prepared transactions are open on any of the sessions we started, and then end
+// each session.
+for (let i = 0; i < sessions.length; i++) {
+ const ops = testDB
+ .currentOp({
+ "lsid.id": sessions[i].session.getSessionId().id,
+ "transaction.timePreparedMicros": {$exists: true}
+ })
+ .inprog;
+ assert.eq(ops.length, 0);
+ sessions[i].session.endSession();
+}
+
+rst.stopSet();
+})();
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index a0a3b7812a8..a17b1cabe71 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -100,6 +100,7 @@ MONGO_FAIL_POINT_DEFINE(rsStopGetMore);
MONGO_FAIL_POINT_DEFINE(respondWithNotPrimaryInCommandDispatch);
MONGO_FAIL_POINT_DEFINE(skipCheckingForNotMasterInCommandDispatch);
MONGO_FAIL_POINT_DEFINE(waitAfterReadCommandFinishesExecution);
+MONGO_FAIL_POINT_DEFINE(sleepMillisAfterCommandExecutionBegins);
// Tracks the number of times a legacy unacknowledged write failed due to
// not master error resulted in network disconnection.
@@ -636,6 +637,16 @@ void execCommandDatabase(OperationContext* opCtx,
CurOp::get(opCtx)->setCommand_inlock(command);
}
+ MONGO_FAIL_POINT_BLOCK(sleepMillisAfterCommandExecutionBegins, arg) {  // Body runs only while this fail point is enabled.
+ const BSONObj& data = arg.getData();  // Options supplied when the fail point was configured.
+ auto numMillis = data["millis"].numberInt();  // "millis": how long to sleep, in milliseconds.
+ auto commands = data["commands"].Obj().getFieldNames<std::set<std::string>>();  // Only the field names of "commands" are used; their values are not read.
+ // Only sleep for one of the specified commands, i.e. when the name of the command being
+ // executed appears as a field name in "commands".
+ if (commands.find(command->getName()) != commands.end()) {
+ mongo::sleepmillis(numMillis);
+ }
+ }
+
// TODO: move this back to runCommands when mongos supports OperationContext
// see SERVER-18515 for details.
rpc::readRequestMetadata(opCtx, request.body, command->requiresAuth());
diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp
index 047310244f5..025092ccde7 100644
--- a/src/mongo/db/transaction_participant_test.cpp
+++ b/src/mongo/db/transaction_participant_test.cpp
@@ -755,6 +755,86 @@ TEST_F(TxnParticipantTest, KillSessionsDuringPrepareDoesNotAbortTransaction) {
ASSERT_FALSE(txnParticipant.transactionIsAborted());
}
+// Verifies that committing a prepared transaction on an operation that has already been killed
+// fails with Interrupted, leaves the transaction in the prepared state, and that a later commit
+// issued from a fresh operation context still succeeds.
+TEST_F(TxnParticipantTest, KillOpBeforeCommittingPreparedTransaction) {
+    auto sessionCheckout = checkOutSession();
+    auto txnParticipant = TransactionParticipant::get(opCtx());
+
+    // Prepare the transaction.
+    txnParticipant.unstashTransactionResources(opCtx(), "prepareTransaction");
+    auto prepareTimestamp = txnParticipant.prepareTransaction(opCtx(), {});
+
+    // Kill the operation before the commit is attempted.
+    opCtx()->markKilled(ErrorCodes::Interrupted);
+    bool commitWasInterrupted = false;
+    try {
+        // The commit should throw, since the operation was killed.
+        txnParticipant.commitPreparedTransaction(opCtx(), prepareTimestamp, boost::none);
+    } catch (const DBException& ex) {
+        ASSERT_EQ(ErrorCodes::Interrupted, ex.code());
+        commitWasInterrupted = true;
+    }
+    // Fail right here if the commit returned normally, rather than relying on the state check
+    // further down to notice it indirectly.
+    ASSERT_TRUE(commitWasInterrupted);
+
+    // Check the session back in.
+    sessionCheckout->checkIn(opCtx());
+
+    // The transaction state should have been unaffected.
+    ASSERT_TRUE(txnParticipant.transactionIsPrepared());
+
+    auto commitPreparedFunc = [&](OperationContext* opCtx) {
+        opCtx->setLogicalSessionId(_sessionId);
+        opCtx->setTxnNumber(_txnNumber);
+
+        // Check out the session and continue the transaction.
+        auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx);
+        auto newTxnParticipant = TransactionParticipant::get(opCtx);
+        newTxnParticipant.beginOrContinue(
+            opCtx, *(opCtx->getTxnNumber()), false, boost::none /*startNewTxn*/);
+
+        newTxnParticipant.unstashTransactionResources(opCtx, "commitTransaction");
+        newTxnParticipant.commitPreparedTransaction(opCtx, prepareTimestamp, boost::none);
+    };
+
+    // Now try to commit the transaction again, with a fresh operation context.
+    runFunctionFromDifferentOpCtx(commitPreparedFunc);
+    ASSERT_TRUE(txnParticipant.transactionIsCommitted());
+}
+
+// Verifies that aborting a prepared transaction on an operation that has already been killed
+// fails with Interrupted, leaves the transaction in the prepared state, and that the transaction
+// can still be committed afterwards from a fresh operation context.
+TEST_F(TxnParticipantTest, KillOpBeforeAbortingPreparedTransaction) {
+    auto sessionCheckout = checkOutSession();
+    auto txnParticipant = TransactionParticipant::get(opCtx());
+
+    // Prepare the transaction.
+    txnParticipant.unstashTransactionResources(opCtx(), "prepareTransaction");
+    auto prepareTimestamp = txnParticipant.prepareTransaction(opCtx(), {});
+
+    // Kill the operation before the abort is attempted.
+    opCtx()->markKilled(ErrorCodes::Interrupted);
+    bool abortWasInterrupted = false;
+    try {
+        // The abort should throw, since the operation was killed.
+        txnParticipant.abortActiveTransaction(opCtx());
+    } catch (const DBException& ex) {
+        ASSERT_EQ(ErrorCodes::Interrupted, ex.code());
+        abortWasInterrupted = true;
+    }
+    // Fail right here if the abort returned normally, rather than relying on the state check
+    // further down to notice it indirectly.
+    ASSERT_TRUE(abortWasInterrupted);
+
+    // Check the session back in.
+    sessionCheckout->checkIn(opCtx());
+
+    // The transaction state should have been unaffected.
+    ASSERT_TRUE(txnParticipant.transactionIsPrepared());
+
+    auto commitPreparedFunc = [&](OperationContext* opCtx) {
+        opCtx->setLogicalSessionId(_sessionId);
+        opCtx->setTxnNumber(_txnNumber);
+
+        // Check out the session and continue the transaction.
+        auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx);
+        auto newTxnParticipant = TransactionParticipant::get(opCtx);
+        newTxnParticipant.beginOrContinue(
+            opCtx, *(opCtx->getTxnNumber()), false, boost::none /*startNewTxn*/);
+
+        newTxnParticipant.unstashTransactionResources(opCtx, "commitTransaction");
+        newTxnParticipant.commitPreparedTransaction(opCtx, prepareTimestamp, boost::none);
+    };
+
+    // The interrupted abort left the transaction prepared, so committing it with a fresh
+    // operation context must still succeed.
+    runFunctionFromDifferentOpCtx(commitPreparedFunc);
+    ASSERT_TRUE(txnParticipant.transactionIsCommitted());
+}
+
TEST_F(TxnParticipantTest, ThrowDuringOnTransactionPrepareAbortsTransaction) {
auto sessionCheckout = checkOutSession();
auto txnParticipant = TransactionParticipant::get(opCtx());