summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSamyukta Lanka <samy.lanka@mongodb.com>2019-09-24 18:54:51 +0000
committerevergreen <evergreen@mongodb.com>2019-09-24 18:54:51 +0000
commit1781933c126d2aff697cdb42a8f84b8212db23ad (patch)
treeb903149750710b82b3f8429c52b61cd4c4d02a74
parent4fc3e8d5910547e22d3c4afb22454ffaab343aa2 (diff)
downloadmongo-1781933c126d2aff697cdb42a8f84b8212db23ad.tar.gz
SERVER-40466 Unify checks for being in a multi-document transaction
(cherry picked from commit 77967c90b1a521108c052af235ce7de9742aa95e) SERVER-42755 recover_prepared_txn_with_multikey_write.js should verify absence of prepare conflicts on catalog after restart. (cherry picked from commit eaf81228328738b685532c7eb10f833362434061) SERVER-42750 Set opCtx->inMultiDocumentTransaction() in transaction recovery. (cherry picked from commit 16c17677704a4f979381d593106a0c356088fd30)
-rw-r--r--jstests/replsets/recover_prepared_txn_with_multikey_write.js27
-rw-r--r--src/mongo/db/catalog/index_catalog_entry_impl.cpp16
-rw-r--r--src/mongo/db/catalog_raii.cpp11
-rw-r--r--src/mongo/db/commands/distinct.cpp4
-rw-r--r--src/mongo/db/commands/find_and_modify.cpp17
-rw-r--r--src/mongo/db/commands/find_cmd.cpp8
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp8
-rw-r--r--src/mongo/db/commands/txn_cmds.cpp4
-rw-r--r--src/mongo/db/commands/write_commands/write_commands.cpp4
-rw-r--r--src/mongo/db/db_raii.cpp2
-rw-r--r--src/mongo/db/initialize_operation_session_info.cpp1
-rw-r--r--src/mongo/db/kill_sessions_local.cpp4
-rw-r--r--src/mongo/db/op_observer_impl.cpp17
-rw-r--r--src/mongo/db/op_observer_impl_test.cpp1
-rw-r--r--src/mongo/db/operation_context.h17
-rw-r--r--src/mongo/db/ops/write_ops_exec.cpp47
-rw-r--r--src/mongo/db/pipeline/process_interface_shardsvr.cpp4
-rw-r--r--src/mongo/db/query/get_executor.cpp19
-rw-r--r--src/mongo/db/query/get_executor.h4
-rw-r--r--src/mongo/db/read_concern_mongod.cpp7
-rw-r--r--src/mongo/db/repl/oplog.cpp3
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp7
-rw-r--r--src/mongo/db/repl/transaction_oplog_application.cpp5
-rw-r--r--src/mongo/db/s/txn_two_phase_commit_cmds.cpp4
-rw-r--r--src/mongo/db/service_entry_point_common.cpp16
-rw-r--r--src/mongo/db/session_catalog_mongod.cpp8
-rw-r--r--src/mongo/db/transaction_participant.cpp6
-rw-r--r--src/mongo/db/transaction_participant.h22
-rw-r--r--src/mongo/db/transaction_participant_retryable_writes_test.cpp2
-rw-r--r--src/mongo/db/transaction_participant_test.cpp28
-rw-r--r--src/mongo/dbtests/storage_timestamp_tests.cpp2
31 files changed, 174 insertions, 151 deletions
diff --git a/jstests/replsets/recover_prepared_txn_with_multikey_write.js b/jstests/replsets/recover_prepared_txn_with_multikey_write.js
index b898fce07ba..56b30995b0f 100644
--- a/jstests/replsets/recover_prepared_txn_with_multikey_write.js
+++ b/jstests/replsets/recover_prepared_txn_with_multikey_write.js
@@ -2,9 +2,12 @@
* Test that replication recovery can reconstruct a prepared transaction that includes a write that
* sets the multikey flag.
*
- * @tags: [uses_transactions, uses_prepare_transaction]
+ * @tags: [uses_transactions, uses_prepare_transaction, requires_persistence]
*/
(function() {
+"use strict";
+load("jstests/core/txns/libs/prepare_helpers.js");
+
const rst = new ReplSetTest({
nodes: [
{},
@@ -21,7 +24,7 @@ const rst = new ReplSetTest({
rst.startSet();
rst.initiate();
-const primary = rst.getPrimary();
+let primary = rst.getPrimary();
const session = primary.getDB("test").getMongo().startSession();
const sessionDB = session.getDatabase("test");
@@ -32,12 +35,30 @@ sessionColl.createIndex({x: 1});
session.startTransaction();
// Make the index multikey.
+jsTestLog("Making the index multikey.");
sessionColl.insert({x: [1, 2, 3]});
-assert.commandWorked(sessionDB.adminCommand({prepareTransaction: 1}));
+// Make sure { w: "majority" } is always used, otherwise the prepare may not get journaled before
+// the shutdown below.
+PrepareHelpers.prepareTransaction(session);
// Do an unclean shutdown so we don't force a checkpoint, and then restart.
+jsTestLog("Killing the primary.");
rst.stop(0, 9, {allowedExitCode: MongoRunner.EXIT_SIGKILL});
rst.restart(0);
+jsTestLog("Waiting for the node to get elected again.");
+primary = rst.getPrimary();
+
+jsTestLog("Making sure no prepare conflicts are generated on the catalog.");
+assert.commandWorked(primary.adminCommand({listDatabases: 1}));
+
+jsTestLog("Aborting the prepared transaction.");
+assert.commandWorked(primary.adminCommand({
+ abortTransaction: 1,
+ lsid: session.getSessionId(),
+ txnNumber: session.getTxnNumber_forTesting(),
+ autocommit: false
+}));
+
rst.stopSet();
}());
diff --git a/src/mongo/db/catalog/index_catalog_entry_impl.cpp b/src/mongo/db/catalog/index_catalog_entry_impl.cpp
index aa711498a71..234240b0ecb 100644
--- a/src/mongo/db/catalog/index_catalog_entry_impl.cpp
+++ b/src/mongo/db/catalog/index_catalog_entry_impl.cpp
@@ -121,12 +121,11 @@ void IndexCatalogEntryImpl::init(std::unique_ptr<IndexAccessMethod> accessMethod
}
bool IndexCatalogEntryImpl::isReady(OperationContext* opCtx) const {
- auto txnParticipant = TransactionParticipant::get(opCtx);
// For multi-document transactions, we can open a snapshot prior to checking the
// minimumSnapshotVersion on a collection. This means we are unprotected from reading
// out-of-sync index catalog entries. To fix this, we uassert if we detect that the
// in-memory catalog is out-of-sync with the on-disk catalog.
- if (txnParticipant && txnParticipant.inMultiDocumentTransaction()) {
+ if (opCtx->inMultiDocumentTransaction()) {
if (!_catalogIsPresent(opCtx) || _catalogIsReady(opCtx) != _isReady) {
uasserted(ErrorCodes::SnapshotUnavailable,
str::stream() << "Unable to read from a snapshot due to pending collection"
@@ -152,10 +151,12 @@ bool IndexCatalogEntryImpl::isMultikey(OperationContext* opCtx) const {
// and the read-path will query this state before determining there is no interesting multikey
// state. Note, it's always legal, though potentially wasteful, to return `true`.
auto txnParticipant = TransactionParticipant::get(opCtx);
- if (!txnParticipant || !txnParticipant.inMultiDocumentTransaction()) {
+ if (!txnParticipant || !txnParticipant.transactionIsOpen()) {
return false;
}
+ invariant(txnParticipant);
+
for (const MultikeyPathInfo& path : txnParticipant.getUncommittedMultikeyPathInfos()) {
if (path.nss == NamespaceString(_ns) && path.indexName == _descriptor->indexName()) {
return true;
@@ -169,10 +170,12 @@ MultikeyPaths IndexCatalogEntryImpl::getMultikeyPaths(OperationContext* opCtx) c
stdx::lock_guard<stdx::mutex> lk(_indexMultikeyPathsMutex);
auto txnParticipant = TransactionParticipant::get(opCtx);
- if (!txnParticipant || !txnParticipant.inMultiDocumentTransaction()) {
+ if (!txnParticipant || !txnParticipant.transactionIsOpen()) {
return _indexMultikeyPaths;
}
+ invariant(txnParticipant);
+
MultikeyPaths ret = _indexMultikeyPaths;
for (const MultikeyPathInfo& path : txnParticipant.getUncommittedMultikeyPathInfos()) {
if (path.nss == NamespaceString(_ns) && path.indexName == _descriptor->indexName()) {
@@ -292,7 +295,7 @@ void IndexCatalogEntryImpl::setMultikey(OperationContext* opCtx,
// multikey flag write and the parent transaction. We can do this write separately and commit it
// before the parent transaction commits.
auto txnParticipant = TransactionParticipant::get(opCtx);
- if (txnParticipant && txnParticipant.inMultiDocumentTransaction()) {
+ if (opCtx->inMultiDocumentTransaction()) {
TransactionParticipant::SideTransactionBlock sideTxn(opCtx);
writeConflictRetry(opCtx, "set index multikey", _ns.ns(), [&] {
WriteUnitOfWork wuow(opCtx);
@@ -340,7 +343,8 @@ void IndexCatalogEntryImpl::setMultikey(OperationContext* opCtx,
// multikey flag until after the transaction commits, we track extra information here to let
// subsequent readers within the same transaction know if this index was set as multikey by a
// previous write in the transaction.
- if (txnParticipant && txnParticipant.inMultiDocumentTransaction()) {
+ if (opCtx->inMultiDocumentTransaction()) {
+ invariant(txnParticipant);
txnParticipant.addUncommittedMultikeyPathInfo(
MultikeyPathInfo{_ns, _descriptor->indexName(), std::move(paths)});
}
diff --git a/src/mongo/db/catalog_raii.cpp b/src/mongo/db/catalog_raii.cpp
index fee42dfd059..3ac870ff8b5 100644
--- a/src/mongo/db/catalog_raii.cpp
+++ b/src/mongo/db/catalog_raii.cpp
@@ -110,11 +110,12 @@ AutoGetCollection::AutoGetCollection(OperationContext* opCtx,
<< nsOrUUID.toString());
if (_coll) {
- // Unlike read concern majority, read concern snapshot cannot yield and wait when there are
- // pending catalog changes. Instead, we must return an error in such situations. We ignore
- // this restriction for the oplog, since it never has pending catalog changes.
- auto readConcernLevel = repl::ReadConcernArgs::get(opCtx).getLevel();
- if (readConcernLevel == repl::ReadConcernLevel::kSnapshotReadConcern &&
+ // If we are in a transaction and have a read timestamp, we cannot yield and wait when there
+ // are pending catalog changes. Instead, we must return an error in such situations. We
+ // ignore this restriction for the oplog, since it never has pending catalog changes.
+ if (opCtx->inMultiDocumentTransaction() &&
+ opCtx->recoveryUnit()->getTimestampReadSource() !=
+ RecoveryUnit::ReadSource::kNoTimestamp &&
_resolvedNss != NamespaceString::kRsOplogNamespace) {
auto mySnapshot = opCtx->recoveryUnit()->getPointInTimeReadTimestamp();
if (mySnapshot) {
diff --git a/src/mongo/db/commands/distinct.cpp b/src/mongo/db/commands/distinct.cpp
index d520b86450c..507894ffb4a 100644
--- a/src/mongo/db/commands/distinct.cpp
+++ b/src/mongo/db/commands/distinct.cpp
@@ -54,7 +54,6 @@
#include "mongo/db/query/query_planner_common.h"
#include "mongo/db/query/view_response_formatter.h"
#include "mongo/db/s/collection_sharding_state.h"
-#include "mongo/db/transaction_participant.h"
#include "mongo/db/views/resolved_view.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
@@ -184,12 +183,11 @@ public:
// Distinct doesn't filter orphan documents so it is not allowed to run on sharded
// collections in multi-document transactions.
- auto txnParticipant = TransactionParticipant::get(opCtx);
uassert(ErrorCodes::OperationNotSupportedInTransaction,
"Cannot run 'distinct' on a sharded collection in a multi-document transaction. "
"Please see http://dochub.mongodb.org/core/transaction-distinct for a recommended "
"alternative.",
- !txnParticipant || !txnParticipant.inMultiDocumentTransaction() ||
+ !opCtx->inMultiDocumentTransaction() ||
!CollectionShardingState::get(opCtx, nss)->getCurrentMetadata()->isSharded());
const ExtensionsCallbackReal extensionsCallback(opCtx, &nss);
diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp
index ad526ba0d15..8f5ce4827b6 100644
--- a/src/mongo/db/commands/find_and_modify.cpp
+++ b/src/mongo/db/commands/find_and_modify.cpp
@@ -127,11 +127,8 @@ void makeUpdateRequest(OperationContext* opCtx,
requestOut->setMulti(false);
requestOut->setExplain(explain);
- const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
- requestOut->setYieldPolicy(readConcernArgs.getLevel() ==
- repl::ReadConcernLevel::kSnapshotReadConcern
- ? PlanExecutor::INTERRUPT_ONLY
- : PlanExecutor::YIELD_AUTO);
+ requestOut->setYieldPolicy(opCtx->inMultiDocumentTransaction() ? PlanExecutor::INTERRUPT_ONLY
+ : PlanExecutor::YIELD_AUTO);
}
void makeDeleteRequest(OperationContext* opCtx,
@@ -148,11 +145,8 @@ void makeDeleteRequest(OperationContext* opCtx,
requestOut->setReturnDeleted(true); // Always return the old value.
requestOut->setExplain(explain);
- const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
- requestOut->setYieldPolicy(readConcernArgs.getLevel() ==
- repl::ReadConcernLevel::kSnapshotReadConcern
- ? PlanExecutor::INTERRUPT_ONLY
- : PlanExecutor::YIELD_AUTO);
+ requestOut->setYieldPolicy(opCtx->inMultiDocumentTransaction() ? PlanExecutor::INTERRUPT_ONLY
+ : PlanExecutor::YIELD_AUTO);
}
void appendCommandResponse(const PlanExecutor* exec,
@@ -329,8 +323,7 @@ public:
maybeDisableValidation.emplace(opCtx);
}
- const auto txnParticipant = TransactionParticipant::get(opCtx);
- const auto inTransaction = txnParticipant && txnParticipant.inMultiDocumentTransaction();
+ const auto inTransaction = opCtx->inMultiDocumentTransaction();
uassert(50781,
str::stream() << "Cannot write to system collection " << nsString.ns()
<< " within a transaction.",
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index 69a144d89c6..e7abdd8a240 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -271,13 +271,11 @@ public:
const auto txnParticipant = TransactionParticipant::get(opCtx);
uassert(ErrorCodes::InvalidOptions,
"It is illegal to open a tailable cursor in a transaction",
- !txnParticipant ||
- !(txnParticipant.inMultiDocumentTransaction() && qr->isTailable()));
+ !(opCtx->inMultiDocumentTransaction() && qr->isTailable()));
uassert(ErrorCodes::OperationNotSupportedInTransaction,
"The 'readOnce' option is not supported within a transaction.",
- !txnParticipant || !txnParticipant.inMultiDocumentTransaction() ||
- !qr->isReadOnce());
+ !txnParticipant || !opCtx->inMultiDocumentTransaction() || !qr->isReadOnce());
uassert(ErrorCodes::InvalidOptions,
"The '$_internalReadAtClusterTime' option is only supported when testing"
@@ -287,7 +285,7 @@ public:
uassert(
ErrorCodes::OperationNotSupportedInTransaction,
"The '$_internalReadAtClusterTime' option is not supported within a transaction.",
- !txnParticipant || !txnParticipant.inMultiDocumentTransaction() ||
+ !txnParticipant || !opCtx->inMultiDocumentTransaction() ||
!qr->getReadAtClusterTime());
uassert(ErrorCodes::InvalidOptions,
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index bfd84ede705..dcf1fbd5448 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -68,7 +68,6 @@
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/service_context.h"
#include "mongo/db/storage/storage_options.h"
-#include "mongo/db/transaction_participant.h"
#include "mongo/db/views/view.h"
#include "mongo/db/views/view_catalog.h"
#include "mongo/stdx/memory.h"
@@ -391,9 +390,7 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext(
uassertStatusOK(resolveInvolvedNamespaces(opCtx, request)),
uuid);
expCtx->tempDir = storageGlobalParams.dbpath + "/_tmp";
- auto txnParticipant = TransactionParticipant::get(opCtx);
- expCtx->inMultiDocumentTransaction =
- txnParticipant && txnParticipant.inMultiDocumentTransaction();
+ expCtx->inMultiDocumentTransaction = opCtx->inMultiDocumentTransaction();
return expCtx;
}
@@ -528,10 +525,9 @@ Status runAggregate(OperationContext* opCtx,
liteParsedPipeline.assertSupportsReadConcern(
opCtx, request.getExplain(), serverGlobalParams.enableMajorityReadConcern);
} catch (const DBException& ex) {
- auto txnParticipant = TransactionParticipant::get(opCtx);
// If we are in a multi-document transaction, we intercept the 'readConcern'
// assertion in order to provide a more descriptive error message and code.
- if (txnParticipant && txnParticipant.inMultiDocumentTransaction()) {
+ if (opCtx->inMultiDocumentTransaction()) {
return {ErrorCodes::OperationNotSupportedInTransaction,
ex.toStatus("Operation not permitted in transaction").reason()};
}
diff --git a/src/mongo/db/commands/txn_cmds.cpp b/src/mongo/db/commands/txn_cmds.cpp
index d18a85a4064..ecb756b67d6 100644
--- a/src/mongo/db/commands/txn_cmds.cpp
+++ b/src/mongo/db/commands/txn_cmds.cpp
@@ -114,7 +114,7 @@ public:
uassert(ErrorCodes::NoSuchTransaction,
"Transaction isn't in progress",
- txnParticipant.inMultiDocumentTransaction());
+ txnParticipant.transactionIsOpen());
CurOpFailpointHelpers::waitWhileFailPointEnabled(
&hangBeforeCommitingTxn, opCtx, "hangBeforeCommitingTxn");
@@ -184,7 +184,7 @@ public:
uassert(ErrorCodes::NoSuchTransaction,
"Transaction isn't in progress",
- txnParticipant.inMultiDocumentTransaction());
+ txnParticipant.transactionIsOpen());
CurOpFailpointHelpers::waitWhileFailPointEnabled(
&hangBeforeAbortingTxn, opCtx, "hangBeforeAbortingTxn");
diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp
index c427ff42598..422b19f6be0 100644
--- a/src/mongo/db/commands/write_commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands/write_commands.cpp
@@ -51,7 +51,6 @@
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/storage/duplicate_key_error_info.h"
-#include "mongo/db/transaction_participant.h"
#include "mongo/db/write_concern.h"
#include "mongo/s/stale_exception.h"
@@ -261,8 +260,7 @@ private:
}
void _transactionChecks(OperationContext* opCtx) const {
- auto txnParticipant = TransactionParticipant::get(opCtx);
- if (!txnParticipant || !txnParticipant.inMultiDocumentTransaction())
+ if (!opCtx->inMultiDocumentTransaction())
return;
uassert(50791,
str::stream() << "Cannot write to system collection " << ns().toString()
diff --git a/src/mongo/db/db_raii.cpp b/src/mongo/db/db_raii.cpp
index c851fcfcc9c..fd04dac348d 100644
--- a/src/mongo/db/db_raii.cpp
+++ b/src/mongo/db/db_raii.cpp
@@ -377,7 +377,7 @@ LockMode getLockModeForQuery(OperationContext* opCtx, const boost::optional<Name
invariant(opCtx);
// Use IX locks for multi-statement transactions; otherwise, use IS locks.
- if (opCtx->getWriteUnitOfWork()) {
+ if (opCtx->inMultiDocumentTransaction()) {
uassert(51071,
"Cannot query system.views within a transaction",
!nss || !nss->isSystemDotViews());
diff --git a/src/mongo/db/initialize_operation_session_info.cpp b/src/mongo/db/initialize_operation_session_info.cpp
index 0257878e145..2059dae0623 100644
--- a/src/mongo/db/initialize_operation_session_info.cpp
+++ b/src/mongo/db/initialize_operation_session_info.cpp
@@ -130,6 +130,7 @@ OperationSessionInfoFromClient initializeOperationSessionInfo(OperationContext*
uassert(ErrorCodes::InvalidOptions,
"Specifying autocommit=true is not allowed.",
!osi.getAutocommit().value());
+ opCtx->setInMultiDocumentTransaction();
} else {
uassert(ErrorCodes::InvalidOptions,
"'startTransaction' field requires 'autocommit' field to also be specified",
diff --git a/src/mongo/db/kill_sessions_local.cpp b/src/mongo/db/kill_sessions_local.cpp
index 8c217cb7e25..c44234c7568 100644
--- a/src/mongo/db/kill_sessions_local.cpp
+++ b/src/mongo/db/kill_sessions_local.cpp
@@ -91,7 +91,7 @@ void killSessionsAbortUnpreparedTransactions(OperationContext* opCtx,
matcher,
[](const ObservableSession& session) {
auto participant = TransactionParticipant::get(session);
- return participant.inMultiDocumentTransaction() && !participant.transactionIsPrepared();
+ return participant.transactionIsOpen() && !participant.transactionIsPrepared();
},
[](OperationContext* opCtx, const SessionToKill& session) {
TransactionParticipant::get(session).abortTransactionIfNotPrepared(opCtx);
@@ -138,7 +138,7 @@ void killSessionsLocalShutdownAllTransactions(OperationContext* opCtx) {
killSessionsAction(opCtx,
matcherAllSessions,
[](const ObservableSession& session) {
- return TransactionParticipant::get(session).inMultiDocumentTransaction();
+ return TransactionParticipant::get(session).transactionIsOpen();
},
[](OperationContext* opCtx, const SessionToKill& session) {
TransactionParticipant::get(session).shutdown(opCtx);
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 6dd6e52f66e..2fe31d10d98 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -462,8 +462,8 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
std::vector<InsertStatement>::const_iterator last,
bool fromMigrate) {
auto txnParticipant = TransactionParticipant::get(opCtx);
- const bool inMultiDocumentTransaction = txnParticipant && opCtx->writesAreReplicated() &&
- txnParticipant.inMultiDocumentTransaction();
+ const bool inMultiDocumentTransaction =
+ txnParticipant && opCtx->writesAreReplicated() && txnParticipant.transactionIsOpen();
Date_t lastWriteDate;
@@ -472,11 +472,13 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
if (inMultiDocumentTransaction) {
// Do not add writes to the profile collection to the list of transaction operations, since
- // these are done outside the transaction.
+ // these are done outside the transaction. There is no top-level WriteUnitOfWork when we are
+ // in a SideTransactionBlock.
if (!opCtx->getWriteUnitOfWork()) {
invariant(nss.isSystemDotProfile());
return;
}
+
for (auto iter = first; iter != last; iter++) {
auto operation = OplogEntry::makeInsertOperation(nss, uuid, iter->doc);
txnParticipant.addTransactionOperation(opCtx, operation);
@@ -545,8 +547,8 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
}
auto txnParticipant = TransactionParticipant::get(opCtx);
- const bool inMultiDocumentTransaction = txnParticipant && opCtx->writesAreReplicated() &&
- txnParticipant.inMultiDocumentTransaction();
+ const bool inMultiDocumentTransaction =
+ txnParticipant && opCtx->writesAreReplicated() && txnParticipant.transactionIsOpen();
OpTimeBundle opTime;
if (inMultiDocumentTransaction) {
@@ -606,8 +608,8 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
invariant(!documentKey.isEmpty());
auto txnParticipant = TransactionParticipant::get(opCtx);
- const bool inMultiDocumentTransaction = txnParticipant && opCtx->writesAreReplicated() &&
- txnParticipant.inMultiDocumentTransaction();
+ const bool inMultiDocumentTransaction =
+ txnParticipant && opCtx->writesAreReplicated() && txnParticipant.transactionIsOpen();
OpTimeBundle opTime;
if (inMultiDocumentTransaction) {
@@ -1216,7 +1218,6 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx,
// There should not be a parent WUOW outside of this one. This guarantees the safety of the
// write conflict retry loop.
- invariant(!opCtx->getWriteUnitOfWork());
invariant(!opCtx->lockState()->inAWriteUnitOfWork());
// We must not have a maximum lock timeout, since writing the commit or abort oplog entry for a
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index 92f55a4ba40..8184a4c9c03 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -560,6 +560,7 @@ public:
opCtx()->setLogicalSessionId(makeLogicalSessionIdForTest());
opCtx()->setTxnNumber(txnNum());
+ opCtx()->setInMultiDocumentTransaction();
_sessionCheckout = std::make_unique<MongoDOperationContextSession>(opCtx());
auto txnParticipant = TransactionParticipant::get(opCtx());
diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h
index 5723bc1dac7..26dc7a7723c 100644
--- a/src/mongo/db/operation_context.h
+++ b/src/mongo/db/operation_context.h
@@ -358,6 +358,22 @@ public:
bool isIgnoringInterrupts() const;
+ /**
+ * Returns whether this operation is part of a multi-document transaction. Specifically, it
+ * indicates whether the user asked for a multi-document transaction.
+ */
+ bool inMultiDocumentTransaction() const {
+ return _inMultiDocumentTransaction;
+ }
+
+ /**
+ * Sets that this operation is part of a multi-document transaction. Once this is set, it cannot
+ * be unset.
+ */
+ void setInMultiDocumentTransaction() {
+ _inMultiDocumentTransaction = true;
+ }
+
private:
IgnoreInterruptsState pushIgnoreInterrupts() override {
IgnoreInterruptsState iis{_ignoreInterrupts,
@@ -482,6 +498,7 @@ private:
bool _writesAreReplicated = true;
bool _shouldParticipateInFlowControl = true;
+ bool _inMultiDocumentTransaction = false;
};
namespace repl {
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index dedb2683898..2695ef439ce 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -201,8 +201,7 @@ void assertCanWrite_inlock(OperationContext* opCtx, const NamespaceString& ns) {
}
void makeCollection(OperationContext* opCtx, const NamespaceString& ns) {
- auto txnParticipant = TransactionParticipant::get(opCtx);
- auto inTransaction = txnParticipant && txnParticipant.inMultiDocumentTransaction();
+ auto inTransaction = opCtx->inMultiDocumentTransaction();
uassert(ErrorCodes::OperationNotSupportedInTransaction,
str::stream() << "Cannot create namespace " << ns.ns()
<< " in multi-document transaction.",
@@ -246,7 +245,7 @@ bool handleError(OperationContext* opCtx,
}
auto txnParticipant = TransactionParticipant::get(opCtx);
- if (txnParticipant && txnParticipant.inMultiDocumentTransaction()) {
+ if (txnParticipant && opCtx->inMultiDocumentTransaction()) {
if (isTransientTransactionError(
ex.code(), false /* hasWriteConcernError */, false /* isCommitTransaction */)) {
// Tell the client to try the whole txn again, by returning ok: 0 with errorLabels.
@@ -304,8 +303,7 @@ void insertDocuments(OperationContext* opCtx,
auto batchSize = std::distance(begin, end);
if (supportsDocLocking()) {
auto replCoord = repl::ReplicationCoordinator::get(opCtx);
- auto txnParticipant = TransactionParticipant::get(opCtx);
- auto inTransaction = txnParticipant && txnParticipant.inMultiDocumentTransaction();
+ auto inTransaction = opCtx->inMultiDocumentTransaction();
if (!inTransaction && !replCoord->isOplogDisabledFor(opCtx, collection->ns())) {
// Populate 'slots' with new optimes for each insert.
@@ -335,8 +333,7 @@ void insertDocuments(OperationContext* opCtx,
* collection lock, which we cannot hold in transactions.
*/
Status checkIfTransactionOnCappedColl(OperationContext* opCtx, Collection* collection) {
- auto txnParticipant = TransactionParticipant::get(opCtx);
- if (txnParticipant && txnParticipant.inMultiDocumentTransaction() && collection->isCapped()) {
+ if (opCtx->inMultiDocumentTransaction() && collection->isCapped()) {
return {ErrorCodes::OperationNotSupportedInTransaction,
str::stream() << "Collection '" << collection->ns()
<< "' is a capped collection. Writes in transactions are not allowed "
@@ -400,7 +397,7 @@ bool insertBatchAndHandleErrors(OperationContext* opCtx,
try {
acquireCollection();
auto txnParticipant = TransactionParticipant::get(opCtx);
- auto inTxn = txnParticipant && txnParticipant.inMultiDocumentTransaction();
+ auto inTxn = txnParticipant && opCtx->inMultiDocumentTransaction();
if (!collection->getCollection()->isCapped() && !inTxn && batch.size() > 1) {
// First try doing it all together. If all goes well, this is all we need to do.
// See Collection::_insertDocuments for why we do all capped inserts one-at-a-time.
@@ -491,7 +488,7 @@ WriteResult performInserts(OperationContext* opCtx,
// transaction.
auto txnParticipant = TransactionParticipant::get(opCtx);
invariant(!opCtx->lockState()->inAWriteUnitOfWork() ||
- (txnParticipant && txnParticipant.inMultiDocumentTransaction()));
+ (txnParticipant && opCtx->inMultiDocumentTransaction()));
auto& curOp = *CurOp::get(opCtx);
ON_BLOCK_EXIT([&] {
// This is the only part of finishCurOp we need to do for inserts because they reuse the
@@ -544,7 +541,7 @@ WriteResult performInserts(OperationContext* opCtx,
} else {
const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++);
if (opCtx->getTxnNumber()) {
- if (!txnParticipant.inMultiDocumentTransaction() &&
+ if (!opCtx->inMultiDocumentTransaction() &&
txnParticipant.checkStatementExecutedNoOplogEntryFetch(stmtId)) {
containsRetry = true;
RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount();
@@ -697,11 +694,9 @@ static SingleWriteResult performSingleUpdateOpWithDupKeyRetry(OperationContext*
curOp.ensureStarted();
}
- auto txnParticipant = TransactionParticipant::get(opCtx);
uassert(ErrorCodes::InvalidOptions,
"Cannot use (or request) retryable writes with multi=true",
- (txnParticipant && txnParticipant.inMultiDocumentTransaction()) ||
- !opCtx->getTxnNumber() || !op.getMulti());
+ opCtx->inMultiDocumentTransaction() || !opCtx->getTxnNumber() || !op.getMulti());
UpdateRequest request(ns);
request.setQuery(op.getQ());
@@ -715,11 +710,8 @@ static SingleWriteResult performSingleUpdateOpWithDupKeyRetry(OperationContext*
request.setUpsert(op.getUpsert());
request.setHint(op.getHint());
- auto readConcernArgs = repl::ReadConcernArgs::get(opCtx);
- request.setYieldPolicy(readConcernArgs.getLevel() ==
- repl::ReadConcernLevel::kSnapshotReadConcern
- ? PlanExecutor::INTERRUPT_ONLY
- : PlanExecutor::YIELD_AUTO);
+ request.setYieldPolicy(opCtx->inMultiDocumentTransaction() ? PlanExecutor::INTERRUPT_ONLY
+ : PlanExecutor::YIELD_AUTO);
size_t numAttempts = 0;
while (true) {
@@ -758,7 +750,7 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who
// transaction.
auto txnParticipant = TransactionParticipant::get(opCtx);
invariant(!opCtx->lockState()->inAWriteUnitOfWork() ||
- (txnParticipant && txnParticipant.inMultiDocumentTransaction()));
+ (txnParticipant && opCtx->inMultiDocumentTransaction()));
uassertStatusOK(userAllowedWriteNS(wholeOp.getNamespace()));
DisableDocumentValidationIfTrue docValidationDisabler(
@@ -780,7 +772,7 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who
for (auto&& singleOp : wholeOp.getUpdates()) {
const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++);
if (opCtx->getTxnNumber()) {
- if (!txnParticipant.inMultiDocumentTransaction()) {
+ if (!opCtx->inMultiDocumentTransaction()) {
if (auto entry = txnParticipant.checkStatementExecuted(opCtx, stmtId)) {
containsRetry = true;
RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount();
@@ -820,11 +812,9 @@ static SingleWriteResult performSingleDeleteOp(OperationContext* opCtx,
const NamespaceString& ns,
StmtId stmtId,
const write_ops::DeleteOpEntry& op) {
- auto txnParticipant = TransactionParticipant::get(opCtx);
uassert(ErrorCodes::InvalidOptions,
"Cannot use (or request) retryable writes with limit=0",
- (txnParticipant && txnParticipant.inMultiDocumentTransaction()) ||
- !opCtx->getTxnNumber() || !op.getMulti());
+ opCtx->inMultiDocumentTransaction() || !opCtx->getTxnNumber() || !op.getMulti());
globalOpCounters.gotDelete();
ServerWriteConcernMetrics::get(opCtx)->recordWriteConcernForDelete(opCtx->getWriteConcern());
@@ -842,11 +832,8 @@ static SingleWriteResult performSingleDeleteOp(OperationContext* opCtx,
request.setQuery(op.getQ());
request.setCollation(write_ops::collationOf(op));
request.setMulti(op.getMulti());
- auto readConcernArgs = repl::ReadConcernArgs::get(opCtx);
- request.setYieldPolicy(readConcernArgs.getLevel() ==
- repl::ReadConcernLevel::kSnapshotReadConcern
- ? PlanExecutor::INTERRUPT_ONLY
- : PlanExecutor::YIELD_AUTO);
+ request.setYieldPolicy(opCtx->inMultiDocumentTransaction() ? PlanExecutor::INTERRUPT_ONLY
+ : PlanExecutor::YIELD_AUTO);
request.setStmtId(stmtId);
ParsedDelete parsedDelete(opCtx, &request);
@@ -915,7 +902,7 @@ WriteResult performDeletes(OperationContext* opCtx, const write_ops::Delete& who
// transaction.
auto txnParticipant = TransactionParticipant::get(opCtx);
invariant(!opCtx->lockState()->inAWriteUnitOfWork() ||
- (txnParticipant && txnParticipant.inMultiDocumentTransaction()));
+ (txnParticipant && opCtx->inMultiDocumentTransaction()));
uassertStatusOK(userAllowedWriteNS(wholeOp.getNamespace()));
DisableDocumentValidationIfTrue docValidationDisabler(
@@ -932,7 +919,7 @@ WriteResult performDeletes(OperationContext* opCtx, const write_ops::Delete& who
for (auto&& singleOp : wholeOp.getDeletes()) {
const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++);
if (opCtx->getTxnNumber()) {
- if (!txnParticipant.inMultiDocumentTransaction() &&
+ if (!opCtx->inMultiDocumentTransaction() &&
txnParticipant.checkStatementExecutedNoOplogEntryFetch(stmtId)) {
containsRetry = true;
RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount();
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.cpp b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
index 74508c4a10e..18a857d305b 100644
--- a/src/mongo/db/pipeline/process_interface_shardsvr.cpp
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
@@ -49,7 +49,6 @@
#include "mongo/db/query/query_knobs_gen.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
-#include "mongo/db/transaction_participant.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/cluster_commands_helpers.h"
#include "mongo/s/grid.h"
@@ -167,8 +166,7 @@ unique_ptr<Pipeline, PipelineDeleter> MongoInterfaceShardServer::attachCursorSou
// a transaction, the foreign collection is unsharded. Otherwise, we may access the catalog
// cache, and attempt to do a network request while holding locks.
// TODO: SERVER-39162 allow $lookup in sharded transactions.
- auto txnParticipant = TransactionParticipant::get(expCtx->opCtx);
- const bool inTxn = txnParticipant && txnParticipant.inMultiDocumentTransaction();
+ const bool inTxn = expCtx->opCtx->inMultiDocumentTransaction();
const bool isSharded = [&]() {
if (inTxn || !ShardingState::get(expCtx->opCtx)->enabled()) {
diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp
index 6b3273792c0..ba1ad4a1e04 100644
--- a/src/mongo/db/query/get_executor.cpp
+++ b/src/mongo/db/query/get_executor.cpp
@@ -764,10 +764,7 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind(
unique_ptr<CanonicalQuery> canonicalQuery,
bool permitYield,
size_t plannerOptions) {
- const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
- auto yieldPolicy =
- (permitYield &&
- (readConcernArgs.getLevel() != repl::ReadConcernLevel::kSnapshotReadConcern))
+ auto yieldPolicy = (permitYield && !opCtx->inMultiDocumentTransaction())
? PlanExecutor::YIELD_AUTO
: PlanExecutor::INTERRUPT_ONLY;
return _getExecutorFind(
@@ -1259,11 +1256,8 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorCount(
}
unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue());
- const auto readConcernArgs = repl::ReadConcernArgs::get(opCtx);
- const auto yieldPolicy =
- readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern
- ? PlanExecutor::INTERRUPT_ONLY
- : PlanExecutor::YIELD_AUTO;
+ const auto yieldPolicy = opCtx->inMultiDocumentTransaction() ? PlanExecutor::INTERRUPT_ONLY
+ : PlanExecutor::YIELD_AUTO;
const auto skip = request.getSkip().value_or(0);
const auto limit = request.getLimit().value_or(0);
@@ -1716,11 +1710,8 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorDistinct(
Collection* collection,
size_t plannerOptions,
ParsedDistinct* parsedDistinct) {
- const auto readConcernArgs = repl::ReadConcernArgs::get(opCtx);
- const auto yieldPolicy =
- readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern
- ? PlanExecutor::INTERRUPT_ONLY
- : PlanExecutor::YIELD_AUTO;
+ const auto yieldPolicy = opCtx->inMultiDocumentTransaction() ? PlanExecutor::INTERRUPT_ONLY
+ : PlanExecutor::YIELD_AUTO;
if (!collection) {
// Treat collections that do not exist as empty collections.
diff --git a/src/mongo/db/query/get_executor.h b/src/mongo/db/query/get_executor.h
index 6f4117e31cb..087a20cf3af 100644
--- a/src/mongo/db/query/get_executor.h
+++ b/src/mongo/db/query/get_executor.h
@@ -122,8 +122,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutor(
/**
* Get a plan executor for a .find() operation. The executor will have a 'YIELD_AUTO' yield policy
- * unless a false value for 'permitYield' or a snapshot read concern (according to the
- * OperationContext) forces it to have a 'NO_INTERRUPT' yield policy.
+ * unless a false value for 'permitYield' or being part of a multi-document transaction forces it to
+ * have a 'NO_INTERRUPT' yield policy.
*
* If the query is valid and an executor could be created, returns a StatusWith with the
* PlanExecutor.
diff --git a/src/mongo/db/read_concern_mongod.cpp b/src/mongo/db/read_concern_mongod.cpp
index ea270fef283..b5ce3b2fdb9 100644
--- a/src/mongo/db/read_concern_mongod.cpp
+++ b/src/mongo/db/read_concern_mongod.cpp
@@ -237,10 +237,9 @@ MONGO_REGISTER_SHIM(waitForReadConcern)
->Status {
// If we are in a direct client within a transaction, then we may be holding locks, so it is
// illegal to wait for read concern. This is fine, since the outer operation should have handled
- // waiting for read concern. We don't want to ignore prepare conflicts because snapshot reads
- // should block on prepared transactions.
- if (opCtx->getClient()->isInDirectClient() &&
- readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) {
+ // waiting for read concern. We don't want to ignore prepare conflicts because reads in
+ // transactions should block on prepared transactions.
+ if (opCtx->getClient()->isInDirectClient() && opCtx->inMultiDocumentTransaction()) {
return Status::OK();
}
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 49aa46c6ef0..0d6e43a2cab 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -1542,8 +1542,7 @@ Status applyOperation_inlock(OperationContext* opCtx,
// [2] This upsert behavior exists to support idempotency guarantees outside
// steady-state replication and existing users of applyOps.
- const auto txnParticipant = TransactionParticipant::get(opCtx);
- const bool inTxn = txnParticipant && txnParticipant.inMultiDocumentTransaction();
+ const bool inTxn = opCtx->inMultiDocumentTransaction();
bool needToDoUpsert = haveWrappingWriteUnitOfWork && !inTxn;
Timestamp timestamp;
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 5f2165e72eb..7f433b3e837 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -77,7 +77,6 @@
#include "mongo/db/repl/update_position_args.h"
#include "mongo/db/repl/vote_requester.h"
#include "mongo/db/server_options.h"
-#include "mongo/db/transaction_participant.h"
#include "mongo/db/write_concern.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/executor/connection_pool_stats.h"
@@ -1528,8 +1527,7 @@ Status ReplicationCoordinatorImpl::_waitUntilClusterTimeForRead(OperationContext
Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated(
OperationContext* opCtx, const ReadConcernArgs& readConcern) {
const bool isMajorityCommittedRead =
- readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern ||
- readConcern.getLevel() == ReadConcernLevel::kSnapshotReadConcern;
+ readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern;
const auto targetOpTime = readConcern.getArgsOpTime().value_or(OpTime());
return _waitUntilOpTime(opCtx, isMajorityCommittedRead, targetOpTime);
@@ -2290,8 +2288,7 @@ Status ReplicationCoordinatorImpl::checkCanServeReadsFor_UNSAFE(OperationContext
return Status::OK();
}
- auto txnParticipant = TransactionParticipant::get(opCtx);
- if (txnParticipant && txnParticipant.inMultiDocumentTransaction()) {
+ if (opCtx->inMultiDocumentTransaction()) {
if (!_readWriteAbility->canAcceptNonLocalWrites_UNSAFE()) {
return Status(ErrorCodes::NotMaster,
"Multi-document transactions are only allowed on replica set primaries.");
diff --git a/src/mongo/db/repl/transaction_oplog_application.cpp b/src/mongo/db/repl/transaction_oplog_application.cpp
index 3994ad86972..8f40abd535b 100644
--- a/src/mongo/db/repl/transaction_oplog_application.cpp
+++ b/src/mongo/db/repl/transaction_oplog_application.cpp
@@ -176,6 +176,7 @@ Status applyCommitTransaction(OperationContext* opCtx,
invariant(entry.getTxnNumber());
opCtx->setLogicalSessionId(*entry.getSessionId());
opCtx->setTxnNumber(*entry.getTxnNumber());
+ opCtx->setInMultiDocumentTransaction();
// The write on transaction table may be applied concurrently, so refreshing state
// from disk may read that write, causing starting a new transaction on an existing
@@ -212,6 +213,8 @@ Status applyAbortTransaction(OperationContext* opCtx,
invariant(entry.getTxnNumber());
opCtx->setLogicalSessionId(*entry.getSessionId());
opCtx->setTxnNumber(*entry.getTxnNumber());
+ opCtx->setInMultiDocumentTransaction();
+
// The write on transaction table may be applied concurrently, so refreshing state
// from disk may read that write, causing starting a new transaction on an existing
// txnNumber. Thus, we start a new transaction without refreshing state from disk.
@@ -331,6 +334,8 @@ Status _applyPrepareTransaction(OperationContext* opCtx,
invariant(entry.getTxnNumber());
opCtx->setLogicalSessionId(*entry.getSessionId());
opCtx->setTxnNumber(*entry.getTxnNumber());
+ opCtx->setInMultiDocumentTransaction();
+
// The write on transaction table may be applied concurrently, so refreshing state
// from disk may read that write, causing starting a new transaction on an existing
// txnNumber. Thus, we start a new transaction without refreshing state from disk.
diff --git a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
index 33cb39e854d..342ead52a0a 100644
--- a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
+++ b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
@@ -111,7 +111,7 @@ public:
uassert(ErrorCodes::NoSuchTransaction,
"Transaction isn't in progress",
- txnParticipant.inMultiDocumentTransaction());
+ txnParticipant.transactionIsOpen());
if (txnParticipant.transactionIsPrepared()) {
auto& replClient = repl::ReplClientInfo::forClient(opCtx->getClient());
@@ -326,7 +326,7 @@ public:
false /* autocommit */,
boost::none /* startTransaction */);
- invariant(!txnParticipant.inMultiDocumentTransaction(),
+ invariant(!txnParticipant.transactionIsOpen(),
"The participant should not be in progress after we waited for the "
"participant to complete");
uassert(ErrorCodes::NoSuchTransaction,
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index a17b1cabe71..be12c34a4f4 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -395,7 +395,7 @@ void invokeWithSessionCheckedOut(OperationContext* opCtx,
if (sessionOptions.getCoordinator() == boost::optional<bool>(true)) {
createTransactionCoordinator(opCtx, *sessionOptions.getTxnNumber());
}
- } else if (txnParticipant.inMultiDocumentTransaction()) {
+ } else if (txnParticipant.transactionIsOpen()) {
const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
uassert(ErrorCodes::InvalidOptions,
"Only the first command in a transaction may specify a readConcern",
@@ -780,9 +780,10 @@ void execCommandDatabase(OperationContext* opCtx,
}
auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
- // If the parent operation runs in snapshot isolation, we don't override the read concern.
- auto skipReadConcern = opCtx->getClient()->isInDirectClient() &&
- readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern;
+
+ // If the parent operation runs in a transaction, we don't override the read concern.
+ auto skipReadConcern =
+ opCtx->getClient()->isInDirectClient() && opCtx->inMultiDocumentTransaction();
if (!skipReadConcern) {
// If "startTransaction" is present, it must be true due to the parsing above.
const bool upconvertToSnapshot(sessionOptions.getStartTransaction());
@@ -1259,10 +1260,9 @@ DbResponse ServiceEntryPointCommon::handleRequest(OperationContext* opCtx,
Client& c = *opCtx->getClient();
if (c.isInDirectClient()) {
- if (!opCtx->getLogicalSessionId() || !opCtx->getTxnNumber() ||
- repl::ReadConcernArgs::get(opCtx).getLevel() !=
- repl::ReadConcernLevel::kSnapshotReadConcern) {
- invariant(!opCtx->lockState()->inAWriteUnitOfWork());
+ if (!opCtx->getLogicalSessionId() || !opCtx->getTxnNumber()) {
+ invariant(!opCtx->inMultiDocumentTransaction() &&
+ !opCtx->lockState()->inAWriteUnitOfWork());
}
} else {
LastError::get(c).startRequest();
diff --git a/src/mongo/db/session_catalog_mongod.cpp b/src/mongo/db/session_catalog_mongod.cpp
index 2f6145f0287..8a7a1164d33 100644
--- a/src/mongo/db/session_catalog_mongod.cpp
+++ b/src/mongo/db/session_catalog_mongod.cpp
@@ -203,6 +203,7 @@ void abortInProgressTransactions(OperationContext* opCtx) {
IDLParserErrorContext("abort-in-progress-transactions"), cursor->next());
opCtx->setLogicalSessionId(txnRecord.getSessionId());
opCtx->setTxnNumber(txnRecord.getTxnNum());
+ opCtx->setInMultiDocumentTransaction();
MongoDOperationContextSessionWithoutRefresh ocs(opCtx);
auto txnParticipant = TransactionParticipant::get(opCtx);
LOG(3) << "Aborting transaction sessionId: " << txnRecord.getSessionId().toBSON()
@@ -229,7 +230,7 @@ void MongoDSessionCatalog::onStepUp(OperationContext* opCtx) {
KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)});
catalog->scanSessions(matcher, [&](const ObservableSession& session) {
const auto txnParticipant = TransactionParticipant::get(session);
- if (!txnParticipant.inMultiDocumentTransaction()) {
+ if (!txnParticipant.transactionIsOpen()) {
sessionKillTokens.emplace_back(session.kill());
}
@@ -351,7 +352,7 @@ int MongoDSessionCatalog::reapSessionsOlderThan(OperationContext* opCtx,
for (const auto& lsid : expiredSessionIds) {
catalog->scanSession(lsid, [](ObservableSession& session) {
const auto participant = TransactionParticipant::get(session);
- if (!participant.inMultiDocumentTransaction()) {
+ if (!participant.transactionIsOpen()) {
session.markForReap();
}
});
@@ -437,8 +438,7 @@ MongoDOperationContextSessionWithoutRefresh::~MongoDOperationContextSessionWitho
const auto txnParticipant = TransactionParticipant::get(_opCtx);
// A session on secondaries should never be checked back in with a TransactionParticipant that
// isn't prepared, aborted, or committed.
- invariant(!txnParticipant.inMultiDocumentTransaction() ||
- txnParticipant.transactionIsPrepared());
+ invariant(!txnParticipant.transactionIsInProgress());
}
} // namespace mongo
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 2ca4657a75e..85e09b8dfc9 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -517,6 +517,7 @@ void TransactionParticipant::Participant::beginOrContinue(OperationContext* opCt
// autocommit be given as an argument on the request, and currently it can only be false, which
// is verified earlier when parsing the request.
invariant(*autocommit == false);
+ invariant(opCtx->inMultiDocumentTransaction());
if (!startTransaction) {
_continueMultiDocumentTransaction(opCtx, txnNumber);
@@ -558,6 +559,7 @@ void TransactionParticipant::Participant::beginOrContinue(OperationContext* opCt
void TransactionParticipant::Participant::beginOrContinueTransactionUnconditionally(
OperationContext* opCtx, TxnNumber txnNumber) {
+ invariant(opCtx->inMultiDocumentTransaction());
// We don't check or fetch any on-disk state, so treat the transaction as 'valid' for the
// purposes of this method and continue the transaction unconditionally
@@ -782,6 +784,8 @@ void TransactionParticipant::TxnResources::release(OperationContext* opCtx) {
TransactionParticipant::SideTransactionBlock::SideTransactionBlock(OperationContext* opCtx)
: _opCtx(opCtx) {
+ // Do nothing if we are already in a SideTransactionBlock. We can tell we are already in a
+ // SideTransactionBlock because there is no top level write unit of work.
if (!_opCtx->getWriteUnitOfWork()) {
return;
}
@@ -847,7 +851,7 @@ void TransactionParticipant::Participant::stashTransactionResources(OperationCon
}
invariant(opCtx->getTxnNumber());
- if (o().txnState.inMultiDocumentTransaction()) {
+ if (o().txnState.isOpen()) {
_stashActiveTransaction(opCtx);
}
}
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index cd210a47f03..1b5b41d0d75 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -114,7 +114,7 @@ class TransactionParticipant {
StateFlag newState,
TransitionValidation shouldValidate = TransitionValidation::kValidateTransition);
- bool inMultiDocumentTransaction() const {
+ bool isOpen() const {
return _state == kInProgress || _state == kPrepared;
}
@@ -288,13 +288,13 @@ public:
bool expiredAsOf(Date_t when) const;
/**
- * Returns whether we are in a multi-document transaction, which means we have an active
- * transaction which has autocommit:false and has not been committed or aborted. It is
- * possible that the current transaction is stashed onto the stack via a
+ * Returns whether we are in an open multi-document transaction, which means we have an
+ * active transaction which has autocommit:false and has not been committed or aborted. It
+ * is possible that the current transaction is stashed onto the stack via a
* `SideTransactionBlock`.
*/
- bool inMultiDocumentTransaction() const {
- return o().txnState.inMultiDocumentTransaction();
+ bool transactionIsOpen() const {
+ return o().txnState.isOpen();
};
bool transactionIsCommitted() const {
@@ -309,6 +309,10 @@ public:
return o().txnState.isPrepared();
}
+ bool transactionIsInProgress() const {
+ return o().txnState.isInProgress();
+ }
+
/**
* If this session is holding stashed locks in txnResourceStash, reports the current state
* of the session using the provided builder.
@@ -549,16 +553,16 @@ public:
* multi-key path info to the set of path infos to be updated at commit time.
*/
void addUncommittedMultikeyPathInfo(MultikeyPathInfo info) {
- invariant(inMultiDocumentTransaction());
+ invariant(transactionIsOpen());
p().multikeyPathInfo.emplace_back(std::move(info));
}
/**
- * May only be called while a mutil-document transaction is not committed and returns the
+ * May only be called while a multi-document transaction is not committed and returns the
* path infos which have been added so far.
*/
const std::vector<MultikeyPathInfo>& getUncommittedMultikeyPathInfos() const {
- invariant(inMultiDocumentTransaction());
+ invariant(transactionIsOpen());
return p().multikeyPathInfo;
}
diff --git a/src/mongo/db/transaction_participant_retryable_writes_test.cpp b/src/mongo/db/transaction_participant_retryable_writes_test.cpp
index 155935ddd47..27c2587eeb9 100644
--- a/src/mongo/db/transaction_participant_retryable_writes_test.cpp
+++ b/src/mongo/db/transaction_participant_retryable_writes_test.cpp
@@ -699,6 +699,7 @@ TEST_F(ShardTxnParticipantRetryableWritesTest,
const auto& sessionId = *opCtx()->getLogicalSessionId();
const TxnNumber txnNum = 20;
+ opCtx()->setTxnNumber(txnNum);
const auto uuid = UUID::gen();
txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none);
@@ -719,6 +720,7 @@ TEST_F(ShardTxnParticipantRetryableWritesTest,
auto autocommit = false;
auto startTransaction = true;
+ opCtx()->setInMultiDocumentTransaction();
ASSERT_THROWS_CODE(
txnParticipant.beginOrContinue(opCtx(), txnNum, autocommit, startTransaction),
diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp
index 025092ccde7..fb109b38bbc 100644
--- a/src/mongo/db/transaction_participant_test.cpp
+++ b/src/mongo/db/transaction_participant_test.cpp
@@ -263,6 +263,7 @@ protected:
opCtx()->setLogicalSessionId(_sessionId);
opCtx()->setTxnNumber(_txnNumber);
+ opCtx()->setInMultiDocumentTransaction();
// Normally, committing a transaction is supposed to usassert if the corresponding prepare
// has not been majority committed. We excempt our unit tests from this expectation.
@@ -299,6 +300,7 @@ protected:
std::unique_ptr<MongoDOperationContextSession> checkOutSession(
boost::optional<bool> startNewTxn = true) {
opCtx()->lockState()->setShouldConflictWithSecondaryBatchApplication(false);
+ opCtx()->setInMultiDocumentTransaction();
auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx());
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), false, startNewTxn);
@@ -345,6 +347,7 @@ TEST_F(TxnParticipantTest, TransactionThrowsLockTimeoutIfLockIsUnavailable) {
auto newOpCtx = newClient->makeOperationContext();
newOpCtx.get()->setLogicalSessionId(newSessionId);
newOpCtx.get()->setTxnNumber(newTxnNum);
+ newOpCtx.get()->setInMultiDocumentTransaction();
MongoDOperationContextSession newOpCtxSession(newOpCtx.get());
auto newTxnParticipant = TransactionParticipant::get(newOpCtx.get());
@@ -415,7 +418,7 @@ TEST_F(TxnParticipantTest, CannotSpecifyStartTransactionOnInProgressTxn) {
// Must specify startTransaction=true and autocommit=false to start a transaction.
auto sessionCheckout = checkOutSession();
auto txnParticipant = TransactionParticipant::get(opCtx());
- ASSERT_TRUE(txnParticipant.inMultiDocumentTransaction());
+ ASSERT_TRUE(txnParticipant.transactionIsOpen());
// Cannot try to start a transaction that already started.
ASSERT_THROWS_CODE(
@@ -779,6 +782,7 @@ TEST_F(TxnParticipantTest, KillOpBeforeCommittingPreparedTransaction) {
auto commitPreparedFunc = [&](OperationContext* opCtx) {
opCtx->setLogicalSessionId(_sessionId);
opCtx->setTxnNumber(_txnNumber);
+ opCtx->setInMultiDocumentTransaction();
// Check out the session and continue the transaction.
auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx);
@@ -819,6 +823,7 @@ TEST_F(TxnParticipantTest, KillOpBeforeAbortingPreparedTransaction) {
auto commitPreparedFunc = [&](OperationContext* opCtx) {
opCtx->setLogicalSessionId(_sessionId);
opCtx->setTxnNumber(_txnNumber);
+ opCtx->setInMultiDocumentTransaction();
// Check out the session and continue the transaction.
auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx);
@@ -1196,7 +1201,7 @@ TEST_F(TxnParticipantTest, CannotContinueTransactionIfNotPrimary) {
// Will start the transaction.
auto sessionCheckout = checkOutSession();
auto txnParticipant = TransactionParticipant::get(opCtx());
- ASSERT_TRUE(txnParticipant.inMultiDocumentTransaction());
+ ASSERT_TRUE(txnParticipant.transactionIsOpen());
ASSERT_OK(repl::ReplicationCoordinator::get(opCtx())->setFollowerMode(
repl::MemberState::RS_SECONDARY));
@@ -1239,6 +1244,7 @@ TEST_F(TxnParticipantTest, CannotStartNewTransactionWhilePreparedTransactionInPr
txnNumberToStart = *opCtx()->getTxnNumber() + 1](OperationContext* newOpCtx) {
newOpCtx->setLogicalSessionId(lsid);
newOpCtx->setTxnNumber(txnNumberToStart);
+ newOpCtx->setInMultiDocumentTransaction();
MongoDOperationContextSession ocs(newOpCtx);
auto txnParticipant = TransactionParticipant::get(newOpCtx);
@@ -1399,7 +1405,7 @@ protected:
auto sessionCheckout = checkOutSession();
auto txnParticipant = TransactionParticipant::get(opCtx());
- ASSERT(txnParticipant.inMultiDocumentTransaction());
+ ASSERT(txnParticipant.transactionIsOpen());
ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(
opCtx(), *opCtx()->getTxnNumber(), autocommit, startTransaction),
@@ -1413,14 +1419,14 @@ protected:
auto sessionCheckout = checkOutSession();
auto txnParticipant = TransactionParticipant::get(opCtx());
- ASSERT(txnParticipant.inMultiDocumentTransaction());
+ ASSERT(txnParticipant.transactionIsOpen());
txnParticipant.abortActiveTransaction(opCtx());
ASSERT(txnParticipant.transactionIsAborted());
txnParticipant.beginOrContinue(
opCtx(), *opCtx()->getTxnNumber(), autocommit, startTransaction);
- ASSERT(txnParticipant.inMultiDocumentTransaction());
+ ASSERT(txnParticipant.transactionIsOpen());
}
void cannotSpecifyStartTransactionOnCommittedTxn() {
@@ -1429,7 +1435,7 @@ protected:
auto sessionCheckout = checkOutSession();
auto txnParticipant = TransactionParticipant::get(opCtx());
- ASSERT(txnParticipant.inMultiDocumentTransaction());
+ ASSERT(txnParticipant.transactionIsOpen());
txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction");
txnParticipant.commitUnpreparedTransaction(opCtx());
@@ -1446,7 +1452,7 @@ protected:
auto sessionCheckout = checkOutSession();
auto txnParticipant = TransactionParticipant::get(opCtx());
- ASSERT(txnParticipant.inMultiDocumentTransaction());
+ ASSERT(txnParticipant.transactionIsOpen());
txnParticipant.unstashTransactionResources(opCtx(), "insert");
auto operation = repl::OplogEntry::makeInsertOperation(kNss, _uuid, BSON("TestValue" << 0));
@@ -1465,7 +1471,7 @@ protected:
auto sessionCheckout = checkOutSession();
auto txnParticipant = TransactionParticipant::get(opCtx());
- ASSERT(txnParticipant.inMultiDocumentTransaction());
+ ASSERT(txnParticipant.transactionIsOpen());
txnParticipant.unstashTransactionResources(opCtx(), "prepareTransaction");
txnParticipant.prepareTransaction(opCtx(), {});
@@ -1486,7 +1492,7 @@ protected:
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), boost::none, boost::none);
- ASSERT_FALSE(txnParticipant.inMultiDocumentTransaction());
+ ASSERT_FALSE(txnParticipant.transactionIsOpen());
auto autocommit = false;
auto startTransaction = true;
@@ -1494,7 +1500,7 @@ protected:
txnParticipant.beginOrContinue(
opCtx(), *opCtx()->getTxnNumber(), autocommit, startTransaction);
- ASSERT(txnParticipant.inMultiDocumentTransaction());
+ ASSERT(txnParticipant.transactionIsOpen());
}
};
@@ -3805,7 +3811,7 @@ TEST_F(TxnParticipantTest, AbortTransactionOnSessionCheckoutWithoutRefresh) {
MongoDOperationContextSessionWithoutRefresh sessionCheckout(opCtx());
auto txnParticipant = TransactionParticipant::get(opCtx());
- ASSERT(txnParticipant.inMultiDocumentTransaction());
+ ASSERT(txnParticipant.transactionIsOpen());
ASSERT_EQ(txnParticipant.getActiveTxnNumber(), txnNumber);
txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction");
diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp
index 2ec0bc34e5c..1672d0ac2fe 100644
--- a/src/mongo/dbtests/storage_timestamp_tests.cpp
+++ b/src/mongo/dbtests/storage_timestamp_tests.cpp
@@ -1518,6 +1518,7 @@ public:
const auto sessionId = makeLogicalSessionIdForTest();
_opCtx->setLogicalSessionId(sessionId);
_opCtx->setTxnNumber(1);
+ _opCtx->setInMultiDocumentTransaction();
// Check out the session.
MongoDOperationContextSession ocs(_opCtx);
@@ -2683,6 +2684,7 @@ public:
const auto sessionId = makeLogicalSessionIdForTest();
_opCtx->setLogicalSessionId(sessionId);
_opCtx->setTxnNumber(26);
+ _opCtx->setInMultiDocumentTransaction();
ocs.emplace(_opCtx);