diff options
31 files changed, 174 insertions, 151 deletions
diff --git a/jstests/replsets/recover_prepared_txn_with_multikey_write.js b/jstests/replsets/recover_prepared_txn_with_multikey_write.js index b898fce07ba..56b30995b0f 100644 --- a/jstests/replsets/recover_prepared_txn_with_multikey_write.js +++ b/jstests/replsets/recover_prepared_txn_with_multikey_write.js @@ -2,9 +2,12 @@ * Test that replication recovery can reconstruct a prepared transaction that includes a write that * sets the multikey flag. * - * @tags: [uses_transactions, uses_prepare_transaction] + * @tags: [uses_transactions, uses_prepare_transaction, requires_persistence] */ (function() { +"use strict"; +load("jstests/core/txns/libs/prepare_helpers.js"); + const rst = new ReplSetTest({ nodes: [ {}, @@ -21,7 +24,7 @@ const rst = new ReplSetTest({ rst.startSet(); rst.initiate(); -const primary = rst.getPrimary(); +let primary = rst.getPrimary(); const session = primary.getDB("test").getMongo().startSession(); const sessionDB = session.getDatabase("test"); @@ -32,12 +35,30 @@ sessionColl.createIndex({x: 1}); session.startTransaction(); // Make the index multikey. +jsTestLog("Making the index multikey."); sessionColl.insert({x: [1, 2, 3]}); -assert.commandWorked(sessionDB.adminCommand({prepareTransaction: 1})); +// Make sure { w: "majority" } is always used, otherwise the prepare may not get journaled before +// the shutdown below. +PrepareHelpers.prepareTransaction(session); // Do an unclean shutdown so we don't force a checkpoint, and then restart. +jsTestLog("Killing the primary."); rst.stop(0, 9, {allowedExitCode: MongoRunner.EXIT_SIGKILL}); rst.restart(0); +jsTestLog("Waiting for the node to get elected again."); +primary = rst.getPrimary(); + +jsTestLog("Making sure no prepare conflicts are generated on the catalog."); +assert.commandWorked(primary.adminCommand({listDatabases: 1})); + +jsTestLog("Aborting the prepared transaction."); +assert.commandWorked(primary.adminCommand({ + abortTransaction: 1, + lsid: session.getSessionId(), + txnNumber: session.getTxnNumber_forTesting(), + autocommit: false +})); + rst.stopSet(); }()); diff --git a/src/mongo/db/catalog/index_catalog_entry_impl.cpp b/src/mongo/db/catalog/index_catalog_entry_impl.cpp index aa711498a71..234240b0ecb 100644 --- a/src/mongo/db/catalog/index_catalog_entry_impl.cpp +++ b/src/mongo/db/catalog/index_catalog_entry_impl.cpp @@ -121,12 +121,11 @@ void IndexCatalogEntryImpl::init(std::unique_ptr<IndexAccessMethod> accessMethod } bool IndexCatalogEntryImpl::isReady(OperationContext* opCtx) const { - auto txnParticipant = TransactionParticipant::get(opCtx); // For multi-document transactions, we can open a snapshot prior to checking the // minimumSnapshotVersion on a collection. This means we are unprotected from reading // out-of-sync index catalog entries. To fix this, we uassert if we detect that the // in-memory catalog is out-of-sync with the on-disk catalog. - if (txnParticipant && txnParticipant.inMultiDocumentTransaction()) { + if (opCtx->inMultiDocumentTransaction()) { if (!_catalogIsPresent(opCtx) || _catalogIsReady(opCtx) != _isReady) { uasserted(ErrorCodes::SnapshotUnavailable, str::stream() << "Unable to read from a snapshot due to pending collection" @@ -152,10 +151,12 @@ bool IndexCatalogEntryImpl::isMultikey(OperationContext* opCtx) const { // and the read-path will query this state before determining there is no interesting multikey // state. Note, it's always legal, though potentially wasteful, to return `true`. auto txnParticipant = TransactionParticipant::get(opCtx); - if (!txnParticipant || !txnParticipant.inMultiDocumentTransaction()) { + if (!txnParticipant || !txnParticipant.transactionIsOpen()) { return false; } + invariant(txnParticipant); + for (const MultikeyPathInfo& path : txnParticipant.getUncommittedMultikeyPathInfos()) { if (path.nss == NamespaceString(_ns) && path.indexName == _descriptor->indexName()) { return true; @@ -169,10 +170,12 @@ MultikeyPaths IndexCatalogEntryImpl::getMultikeyPaths(OperationContext* opCtx) c stdx::lock_guard<stdx::mutex> lk(_indexMultikeyPathsMutex); auto txnParticipant = TransactionParticipant::get(opCtx); - if (!txnParticipant || !txnParticipant.inMultiDocumentTransaction()) { + if (!txnParticipant || !txnParticipant.transactionIsOpen()) { return _indexMultikeyPaths; } + invariant(txnParticipant); + MultikeyPaths ret = _indexMultikeyPaths; for (const MultikeyPathInfo& path : txnParticipant.getUncommittedMultikeyPathInfos()) { if (path.nss == NamespaceString(_ns) && path.indexName == _descriptor->indexName()) { @@ -292,7 +295,7 @@ void IndexCatalogEntryImpl::setMultikey(OperationContext* opCtx, // multikey flag write and the parent transaction. We can do this write separately and commit it // before the parent transaction commits. auto txnParticipant = TransactionParticipant::get(opCtx); - if (txnParticipant && txnParticipant.inMultiDocumentTransaction()) { + if (opCtx->inMultiDocumentTransaction()) { TransactionParticipant::SideTransactionBlock sideTxn(opCtx); writeConflictRetry(opCtx, "set index multikey", _ns.ns(), [&] { WriteUnitOfWork wuow(opCtx); @@ -340,7 +343,8 @@ void IndexCatalogEntryImpl::setMultikey(OperationContext* opCtx, // multikey flag until after the transaction commits, we track extra information here to let // subsequent readers within the same transaction know if this index was set as multikey by a // previous write in the transaction. - if (txnParticipant && txnParticipant.inMultiDocumentTransaction()) { + if (opCtx->inMultiDocumentTransaction()) { + invariant(txnParticipant); txnParticipant.addUncommittedMultikeyPathInfo( MultikeyPathInfo{_ns, _descriptor->indexName(), std::move(paths)}); } diff --git a/src/mongo/db/catalog_raii.cpp b/src/mongo/db/catalog_raii.cpp index fee42dfd059..3ac870ff8b5 100644 --- a/src/mongo/db/catalog_raii.cpp +++ b/src/mongo/db/catalog_raii.cpp @@ -110,11 +110,12 @@ AutoGetCollection::AutoGetCollection(OperationContext* opCtx, << nsOrUUID.toString()); if (_coll) { - // Unlike read concern majority, read concern snapshot cannot yield and wait when there are - // pending catalog changes. Instead, we must return an error in such situations. We ignore - // this restriction for the oplog, since it never has pending catalog changes. - auto readConcernLevel = repl::ReadConcernArgs::get(opCtx).getLevel(); - if (readConcernLevel == repl::ReadConcernLevel::kSnapshotReadConcern && + // If we are in a transaction and have a read timestamp, we cannot yield and wait when there + // are pending catalog changes. Instead, we must return an error in such situations. We + // ignore this restriction for the oplog, since it never has pending catalog changes. + if (opCtx->inMultiDocumentTransaction() && + opCtx->recoveryUnit()->getTimestampReadSource() != + RecoveryUnit::ReadSource::kNoTimestamp && _resolvedNss != NamespaceString::kRsOplogNamespace) { auto mySnapshot = opCtx->recoveryUnit()->getPointInTimeReadTimestamp(); if (mySnapshot) { diff --git a/src/mongo/db/commands/distinct.cpp b/src/mongo/db/commands/distinct.cpp index d520b86450c..507894ffb4a 100644 --- a/src/mongo/db/commands/distinct.cpp +++ b/src/mongo/db/commands/distinct.cpp @@ -54,7 +54,6 @@ #include "mongo/db/query/query_planner_common.h" #include "mongo/db/query/view_response_formatter.h" #include "mongo/db/s/collection_sharding_state.h" -#include "mongo/db/transaction_participant.h" #include "mongo/db/views/resolved_view.h" #include "mongo/stdx/memory.h" #include "mongo/util/log.h" @@ -184,12 +183,11 @@ public: // Distinct doesn't filter orphan documents so it is not allowed to run on sharded // collections in multi-document transactions. - auto txnParticipant = TransactionParticipant::get(opCtx); uassert(ErrorCodes::OperationNotSupportedInTransaction, "Cannot run 'distinct' on a sharded collection in a multi-document transaction. " "Please see http://dochub.mongodb.org/core/transaction-distinct for a recommended " "alternative.", - !txnParticipant || !txnParticipant.inMultiDocumentTransaction() || + !opCtx->inMultiDocumentTransaction() || !CollectionShardingState::get(opCtx, nss)->getCurrentMetadata()->isSharded()); const ExtensionsCallbackReal extensionsCallback(opCtx, &nss); diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp index ad526ba0d15..8f5ce4827b6 100644 --- a/src/mongo/db/commands/find_and_modify.cpp +++ b/src/mongo/db/commands/find_and_modify.cpp @@ -127,11 +127,8 @@ void makeUpdateRequest(OperationContext* opCtx, requestOut->setMulti(false); requestOut->setExplain(explain); - const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); - requestOut->setYieldPolicy(readConcernArgs.getLevel() == - repl::ReadConcernLevel::kSnapshotReadConcern - ? PlanExecutor::INTERRUPT_ONLY - : PlanExecutor::YIELD_AUTO); + requestOut->setYieldPolicy(opCtx->inMultiDocumentTransaction() ? PlanExecutor::INTERRUPT_ONLY + : PlanExecutor::YIELD_AUTO); } void makeDeleteRequest(OperationContext* opCtx, @@ -148,11 +145,8 @@ void makeDeleteRequest(OperationContext* opCtx, requestOut->setReturnDeleted(true); // Always return the old value. requestOut->setExplain(explain); - const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); - requestOut->setYieldPolicy(readConcernArgs.getLevel() == - repl::ReadConcernLevel::kSnapshotReadConcern - ? PlanExecutor::INTERRUPT_ONLY - : PlanExecutor::YIELD_AUTO); + requestOut->setYieldPolicy(opCtx->inMultiDocumentTransaction() ? PlanExecutor::INTERRUPT_ONLY + : PlanExecutor::YIELD_AUTO); } void appendCommandResponse(const PlanExecutor* exec, @@ -329,8 +323,7 @@ public: maybeDisableValidation.emplace(opCtx); } - const auto txnParticipant = TransactionParticipant::get(opCtx); - const auto inTransaction = txnParticipant && txnParticipant.inMultiDocumentTransaction(); + const auto inTransaction = opCtx->inMultiDocumentTransaction(); uassert(50781, str::stream() << "Cannot write to system collection " << nsString.ns() << " within a transaction.", diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index 69a144d89c6..e7abdd8a240 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -271,13 +271,11 @@ public: const auto txnParticipant = TransactionParticipant::get(opCtx); uassert(ErrorCodes::InvalidOptions, "It is illegal to open a tailable cursor in a transaction", - !txnParticipant || - !(txnParticipant.inMultiDocumentTransaction() && qr->isTailable())); + !(opCtx->inMultiDocumentTransaction() && qr->isTailable())); uassert(ErrorCodes::OperationNotSupportedInTransaction, "The 'readOnce' option is not supported within a transaction.", - !txnParticipant || !txnParticipant.inMultiDocumentTransaction() || - !qr->isReadOnce()); + !txnParticipant || !opCtx->inMultiDocumentTransaction() || !qr->isReadOnce()); uassert(ErrorCodes::InvalidOptions, "The '$_internalReadAtClusterTime' option is only supported when testing" @@ -287,7 +285,7 @@ public: uassert( ErrorCodes::OperationNotSupportedInTransaction, "The '$_internalReadAtClusterTime' option is not supported within a transaction.", - !txnParticipant || !txnParticipant.inMultiDocumentTransaction() || + !txnParticipant || !opCtx->inMultiDocumentTransaction() || !qr->getReadAtClusterTime()); uassert(ErrorCodes::InvalidOptions, diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index bfd84ede705..dcf1fbd5448 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -68,7 +68,6 @@ #include "mongo/db/s/sharding_state.h" #include "mongo/db/service_context.h" #include "mongo/db/storage/storage_options.h" -#include "mongo/db/transaction_participant.h" #include "mongo/db/views/view.h" #include "mongo/db/views/view_catalog.h" #include "mongo/stdx/memory.h" @@ -391,9 +390,7 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext( uassertStatusOK(resolveInvolvedNamespaces(opCtx, request)), uuid); expCtx->tempDir = storageGlobalParams.dbpath + "/_tmp"; - auto txnParticipant = TransactionParticipant::get(opCtx); - expCtx->inMultiDocumentTransaction = - txnParticipant && txnParticipant.inMultiDocumentTransaction(); + expCtx->inMultiDocumentTransaction = opCtx->inMultiDocumentTransaction(); return expCtx; } @@ -528,10 +525,9 @@ Status runAggregate(OperationContext* opCtx, liteParsedPipeline.assertSupportsReadConcern( opCtx, request.getExplain(), serverGlobalParams.enableMajorityReadConcern); } catch (const DBException& ex) { - auto txnParticipant = TransactionParticipant::get(opCtx); // If we are in a multi-document transaction, we intercept the 'readConcern' // assertion in order to provide a more descriptive error message and code. - if (txnParticipant && txnParticipant.inMultiDocumentTransaction()) { + if (opCtx->inMultiDocumentTransaction()) { return {ErrorCodes::OperationNotSupportedInTransaction, ex.toStatus("Operation not permitted in transaction").reason()}; } diff --git a/src/mongo/db/commands/txn_cmds.cpp b/src/mongo/db/commands/txn_cmds.cpp index d18a85a4064..ecb756b67d6 100644 --- a/src/mongo/db/commands/txn_cmds.cpp +++ b/src/mongo/db/commands/txn_cmds.cpp @@ -114,7 +114,7 @@ public: uassert(ErrorCodes::NoSuchTransaction, "Transaction isn't in progress", - txnParticipant.inMultiDocumentTransaction()); + txnParticipant.transactionIsOpen()); CurOpFailpointHelpers::waitWhileFailPointEnabled( &hangBeforeCommitingTxn, opCtx, "hangBeforeCommitingTxn"); @@ -184,7 +184,7 @@ public: uassert(ErrorCodes::NoSuchTransaction, "Transaction isn't in progress", - txnParticipant.inMultiDocumentTransaction()); + txnParticipant.transactionIsOpen()); CurOpFailpointHelpers::waitWhileFailPointEnabled( &hangBeforeAbortingTxn, opCtx, "hangBeforeAbortingTxn"); diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp index c427ff42598..422b19f6be0 100644 --- a/src/mongo/db/commands/write_commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands/write_commands.cpp @@ -51,7 +51,6 @@ #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/stats/counters.h" #include "mongo/db/storage/duplicate_key_error_info.h" -#include "mongo/db/transaction_participant.h" #include "mongo/db/write_concern.h" #include "mongo/s/stale_exception.h" @@ -261,8 +260,7 @@ private: } void _transactionChecks(OperationContext* opCtx) const { - auto txnParticipant = TransactionParticipant::get(opCtx); - if (!txnParticipant || !txnParticipant.inMultiDocumentTransaction()) + if (!opCtx->inMultiDocumentTransaction()) return; uassert(50791, str::stream() << "Cannot write to system collection " << ns().toString() diff --git a/src/mongo/db/db_raii.cpp b/src/mongo/db/db_raii.cpp index c851fcfcc9c..fd04dac348d 100644 --- a/src/mongo/db/db_raii.cpp +++ b/src/mongo/db/db_raii.cpp @@ -377,7 +377,7 @@ LockMode getLockModeForQuery(OperationContext* opCtx, const boost::optional<Name invariant(opCtx); // Use IX locks for multi-statement transactions; otherwise, use IS locks. - if (opCtx->getWriteUnitOfWork()) { + if (opCtx->inMultiDocumentTransaction()) { uassert(51071, "Cannot query system.views within a transaction", !nss || !nss->isSystemDotViews()); diff --git a/src/mongo/db/initialize_operation_session_info.cpp b/src/mongo/db/initialize_operation_session_info.cpp index 0257878e145..2059dae0623 100644 --- a/src/mongo/db/initialize_operation_session_info.cpp +++ b/src/mongo/db/initialize_operation_session_info.cpp @@ -130,6 +130,7 @@ OperationSessionInfoFromClient initializeOperationSessionInfo(OperationContext* uassert(ErrorCodes::InvalidOptions, "Specifying autocommit=true is not allowed.", !osi.getAutocommit().value()); + opCtx->setInMultiDocumentTransaction(); } else { uassert(ErrorCodes::InvalidOptions, "'startTransaction' field requires 'autocommit' field to also be specified", diff --git a/src/mongo/db/kill_sessions_local.cpp b/src/mongo/db/kill_sessions_local.cpp index 8c217cb7e25..c44234c7568 100644 --- a/src/mongo/db/kill_sessions_local.cpp +++ b/src/mongo/db/kill_sessions_local.cpp @@ -91,7 +91,7 @@ void killSessionsAbortUnpreparedTransactions(OperationContext* opCtx, matcher, [](const ObservableSession& session) { auto participant = TransactionParticipant::get(session); - return participant.inMultiDocumentTransaction() && !participant.transactionIsPrepared(); + return participant.transactionIsOpen() && !participant.transactionIsPrepared(); }, [](OperationContext* opCtx, const SessionToKill& session) { TransactionParticipant::get(session).abortTransactionIfNotPrepared(opCtx); @@ -138,7 +138,7 @@ void killSessionsLocalShutdownAllTransactions(OperationContext* opCtx) { killSessionsAction(opCtx, matcherAllSessions, [](const ObservableSession& session) { - return TransactionParticipant::get(session).inMultiDocumentTransaction(); + return TransactionParticipant::get(session).transactionIsOpen(); }, [](OperationContext* opCtx, const SessionToKill& session) { TransactionParticipant::get(session).shutdown(opCtx); diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 6dd6e52f66e..2fe31d10d98 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -462,8 +462,8 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, std::vector<InsertStatement>::const_iterator last, bool fromMigrate) { auto txnParticipant = TransactionParticipant::get(opCtx); - const bool inMultiDocumentTransaction = txnParticipant && opCtx->writesAreReplicated() && - txnParticipant.inMultiDocumentTransaction(); + const bool inMultiDocumentTransaction = + txnParticipant && opCtx->writesAreReplicated() && txnParticipant.transactionIsOpen(); Date_t lastWriteDate; @@ -472,11 +472,13 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, if (inMultiDocumentTransaction) { // Do not add writes to the profile collection to the list of transaction operations, since - // these are done outside the transaction. + // these are done outside the transaction. There is no top-level WriteUnitOfWork when we are + // in a SideTransactionBlock. if (!opCtx->getWriteUnitOfWork()) { invariant(nss.isSystemDotProfile()); return; } + for (auto iter = first; iter != last; iter++) { auto operation = OplogEntry::makeInsertOperation(nss, uuid, iter->doc); txnParticipant.addTransactionOperation(opCtx, operation); @@ -545,8 +547,8 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg } auto txnParticipant = TransactionParticipant::get(opCtx); - const bool inMultiDocumentTransaction = txnParticipant && opCtx->writesAreReplicated() && - txnParticipant.inMultiDocumentTransaction(); + const bool inMultiDocumentTransaction = + txnParticipant && opCtx->writesAreReplicated() && txnParticipant.transactionIsOpen(); OpTimeBundle opTime; if (inMultiDocumentTransaction) { @@ -606,8 +608,8 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, invariant(!documentKey.isEmpty()); auto txnParticipant = TransactionParticipant::get(opCtx); - const bool inMultiDocumentTransaction = txnParticipant && opCtx->writesAreReplicated() && - txnParticipant.inMultiDocumentTransaction(); + const bool inMultiDocumentTransaction = + txnParticipant && opCtx->writesAreReplicated() && txnParticipant.transactionIsOpen(); OpTimeBundle opTime; if (inMultiDocumentTransaction) { @@ -1216,7 +1218,6 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx, // There should not be a parent WUOW outside of this one. This guarantees the safety of the // write conflict retry loop. - invariant(!opCtx->getWriteUnitOfWork()); invariant(!opCtx->lockState()->inAWriteUnitOfWork()); // We must not have a maximum lock timeout, since writing the commit or abort oplog entry for a diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index 92f55a4ba40..8184a4c9c03 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -560,6 +560,7 @@ public: opCtx()->setLogicalSessionId(makeLogicalSessionIdForTest()); opCtx()->setTxnNumber(txnNum()); + opCtx()->setInMultiDocumentTransaction(); _sessionCheckout = std::make_unique<MongoDOperationContextSession>(opCtx()); auto txnParticipant = TransactionParticipant::get(opCtx()); diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h index 5723bc1dac7..26dc7a7723c 100644 --- a/src/mongo/db/operation_context.h +++ b/src/mongo/db/operation_context.h @@ -358,6 +358,22 @@ public: bool isIgnoringInterrupts() const; + /** + * Returns whether this operation is part of a multi-document transaction. Specifically, it + * indicates whether the user asked for a multi-document transaction. + */ + bool inMultiDocumentTransaction() const { + return _inMultiDocumentTransaction; + } + + /** + * Sets that this operation is part of a multi-document transaction. Once this is set, it cannot + * be unset. + */ + void setInMultiDocumentTransaction() { + _inMultiDocumentTransaction = true; + } + private: IgnoreInterruptsState pushIgnoreInterrupts() override { IgnoreInterruptsState iis{_ignoreInterrupts, @@ -482,6 +498,7 @@ private: bool _writesAreReplicated = true; bool _shouldParticipateInFlowControl = true; + bool _inMultiDocumentTransaction = false; }; namespace repl { diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index dedb2683898..2695ef439ce 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -201,8 +201,7 @@ void assertCanWrite_inlock(OperationContext* opCtx, const NamespaceString& ns) { } void makeCollection(OperationContext* opCtx, const NamespaceString& ns) { - auto txnParticipant = TransactionParticipant::get(opCtx); - auto inTransaction = txnParticipant && txnParticipant.inMultiDocumentTransaction(); + auto inTransaction = opCtx->inMultiDocumentTransaction(); uassert(ErrorCodes::OperationNotSupportedInTransaction, str::stream() << "Cannot create namespace " << ns.ns() << " in multi-document transaction.", @@ -246,7 +245,7 @@ bool handleError(OperationContext* opCtx, } auto txnParticipant = TransactionParticipant::get(opCtx); - if (txnParticipant && txnParticipant.inMultiDocumentTransaction()) { + if (txnParticipant && opCtx->inMultiDocumentTransaction()) { if (isTransientTransactionError( ex.code(), false /* hasWriteConcernError */, false /* isCommitTransaction */)) { // Tell the client to try the whole txn again, by returning ok: 0 with errorLabels. @@ -304,8 +303,7 @@ void insertDocuments(OperationContext* opCtx, auto batchSize = std::distance(begin, end); if (supportsDocLocking()) { auto replCoord = repl::ReplicationCoordinator::get(opCtx); - auto txnParticipant = TransactionParticipant::get(opCtx); - auto inTransaction = txnParticipant && txnParticipant.inMultiDocumentTransaction(); + auto inTransaction = opCtx->inMultiDocumentTransaction(); if (!inTransaction && !replCoord->isOplogDisabledFor(opCtx, collection->ns())) { // Populate 'slots' with new optimes for each insert. @@ -335,8 +333,7 @@ void insertDocuments(OperationContext* opCtx, * collection lock, which we cannot hold in transactions. */ Status checkIfTransactionOnCappedColl(OperationContext* opCtx, Collection* collection) { - auto txnParticipant = TransactionParticipant::get(opCtx); - if (txnParticipant && txnParticipant.inMultiDocumentTransaction() && collection->isCapped()) { + if (opCtx->inMultiDocumentTransaction() && collection->isCapped()) { return {ErrorCodes::OperationNotSupportedInTransaction, str::stream() << "Collection '" << collection->ns() << "' is a capped collection. Writes in transactions are not allowed " @@ -400,7 +397,7 @@ bool insertBatchAndHandleErrors(OperationContext* opCtx, try { acquireCollection(); auto txnParticipant = TransactionParticipant::get(opCtx); - auto inTxn = txnParticipant && txnParticipant.inMultiDocumentTransaction(); + auto inTxn = txnParticipant && opCtx->inMultiDocumentTransaction(); if (!collection->getCollection()->isCapped() && !inTxn && batch.size() > 1) { // First try doing it all together. If all goes well, this is all we need to do. // See Collection::_insertDocuments for why we do all capped inserts one-at-a-time. @@ -491,7 +488,7 @@ WriteResult performInserts(OperationContext* opCtx, // transaction. auto txnParticipant = TransactionParticipant::get(opCtx); invariant(!opCtx->lockState()->inAWriteUnitOfWork() || - (txnParticipant && txnParticipant.inMultiDocumentTransaction())); + (txnParticipant && opCtx->inMultiDocumentTransaction())); auto& curOp = *CurOp::get(opCtx); ON_BLOCK_EXIT([&] { // This is the only part of finishCurOp we need to do for inserts because they reuse the @@ -544,7 +541,7 @@ WriteResult performInserts(OperationContext* opCtx, } else { const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++); if (opCtx->getTxnNumber()) { - if (!txnParticipant.inMultiDocumentTransaction() && + if (!opCtx->inMultiDocumentTransaction() && txnParticipant.checkStatementExecutedNoOplogEntryFetch(stmtId)) { containsRetry = true; RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount(); @@ -697,11 +694,9 @@ static SingleWriteResult performSingleUpdateOpWithDupKeyRetry(OperationContext* curOp.ensureStarted(); } - auto txnParticipant = TransactionParticipant::get(opCtx); uassert(ErrorCodes::InvalidOptions, "Cannot use (or request) retryable writes with multi=true", - (txnParticipant && txnParticipant.inMultiDocumentTransaction()) || - !opCtx->getTxnNumber() || !op.getMulti()); + opCtx->inMultiDocumentTransaction() || !opCtx->getTxnNumber() || !op.getMulti()); UpdateRequest request(ns); request.setQuery(op.getQ()); @@ -715,11 +710,8 @@ static SingleWriteResult performSingleUpdateOpWithDupKeyRetry(OperationContext* request.setUpsert(op.getUpsert()); request.setHint(op.getHint()); - auto readConcernArgs = repl::ReadConcernArgs::get(opCtx); - request.setYieldPolicy(readConcernArgs.getLevel() == - repl::ReadConcernLevel::kSnapshotReadConcern - ? PlanExecutor::INTERRUPT_ONLY - : PlanExecutor::YIELD_AUTO); + request.setYieldPolicy(opCtx->inMultiDocumentTransaction() ? PlanExecutor::INTERRUPT_ONLY + : PlanExecutor::YIELD_AUTO); size_t numAttempts = 0; while (true) { @@ -758,7 +750,7 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who // transaction. auto txnParticipant = TransactionParticipant::get(opCtx); invariant(!opCtx->lockState()->inAWriteUnitOfWork() || - (txnParticipant && txnParticipant.inMultiDocumentTransaction())); + (txnParticipant && opCtx->inMultiDocumentTransaction())); uassertStatusOK(userAllowedWriteNS(wholeOp.getNamespace())); DisableDocumentValidationIfTrue docValidationDisabler( @@ -780,7 +772,7 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who for (auto&& singleOp : wholeOp.getUpdates()) { const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++); if (opCtx->getTxnNumber()) { - if (!txnParticipant.inMultiDocumentTransaction()) { + if (!opCtx->inMultiDocumentTransaction()) { if (auto entry = txnParticipant.checkStatementExecuted(opCtx, stmtId)) { containsRetry = true; RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount(); @@ -820,11 +812,9 @@ static SingleWriteResult performSingleDeleteOp(OperationContext* opCtx, const NamespaceString& ns, StmtId stmtId, const write_ops::DeleteOpEntry& op) { - auto txnParticipant = TransactionParticipant::get(opCtx); uassert(ErrorCodes::InvalidOptions, "Cannot use (or request) retryable writes with limit=0", - (txnParticipant && txnParticipant.inMultiDocumentTransaction()) || - !opCtx->getTxnNumber() || !op.getMulti()); + opCtx->inMultiDocumentTransaction() || !opCtx->getTxnNumber() || !op.getMulti()); globalOpCounters.gotDelete(); ServerWriteConcernMetrics::get(opCtx)->recordWriteConcernForDelete(opCtx->getWriteConcern()); @@ -842,11 +832,8 @@ static SingleWriteResult performSingleDeleteOp(OperationContext* opCtx, request.setQuery(op.getQ()); request.setCollation(write_ops::collationOf(op)); request.setMulti(op.getMulti()); - auto readConcernArgs = repl::ReadConcernArgs::get(opCtx); - request.setYieldPolicy(readConcernArgs.getLevel() == - repl::ReadConcernLevel::kSnapshotReadConcern - ? PlanExecutor::INTERRUPT_ONLY - : PlanExecutor::YIELD_AUTO); + request.setYieldPolicy(opCtx->inMultiDocumentTransaction() ? PlanExecutor::INTERRUPT_ONLY + : PlanExecutor::YIELD_AUTO); request.setStmtId(stmtId); ParsedDelete parsedDelete(opCtx, &request); @@ -915,7 +902,7 @@ WriteResult performDeletes(OperationContext* opCtx, const write_ops::Delete& who // transaction. auto txnParticipant = TransactionParticipant::get(opCtx); invariant(!opCtx->lockState()->inAWriteUnitOfWork() || - (txnParticipant && txnParticipant.inMultiDocumentTransaction())); + (txnParticipant && opCtx->inMultiDocumentTransaction())); uassertStatusOK(userAllowedWriteNS(wholeOp.getNamespace())); DisableDocumentValidationIfTrue docValidationDisabler( @@ -932,7 +919,7 @@ WriteResult performDeletes(OperationContext* opCtx, const write_ops::Delete& who for (auto&& singleOp : wholeOp.getDeletes()) { const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++); if (opCtx->getTxnNumber()) { - if (!txnParticipant.inMultiDocumentTransaction() && + if (!opCtx->inMultiDocumentTransaction() && txnParticipant.checkStatementExecutedNoOplogEntryFetch(stmtId)) { containsRetry = true; RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount(); diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.cpp b/src/mongo/db/pipeline/process_interface_shardsvr.cpp index 74508c4a10e..18a857d305b 100644 --- a/src/mongo/db/pipeline/process_interface_shardsvr.cpp +++ b/src/mongo/db/pipeline/process_interface_shardsvr.cpp @@ -49,7 +49,6 @@ #include "mongo/db/query/query_knobs_gen.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/sharding_state.h" -#include "mongo/db/transaction_participant.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/grid.h" @@ -167,8 +166,7 @@ unique_ptr<Pipeline, PipelineDeleter> MongoInterfaceShardServer::attachCursorSou // a transaction, the foreign collection is unsharded. Otherwise, we may access the catalog // cache, and attempt to do a network request while holding locks. // TODO: SERVER-39162 allow $lookup in sharded transactions. - auto txnParticipant = TransactionParticipant::get(expCtx->opCtx); - const bool inTxn = txnParticipant && txnParticipant.inMultiDocumentTransaction(); + const bool inTxn = expCtx->opCtx->inMultiDocumentTransaction(); const bool isSharded = [&]() { if (inTxn || !ShardingState::get(expCtx->opCtx)->enabled()) { diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index 6b3273792c0..ba1ad4a1e04 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -764,10 +764,7 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind( unique_ptr<CanonicalQuery> canonicalQuery, bool permitYield, size_t plannerOptions) { - const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); - auto yieldPolicy = - (permitYield && - (readConcernArgs.getLevel() != repl::ReadConcernLevel::kSnapshotReadConcern)) + auto yieldPolicy = (permitYield && !opCtx->inMultiDocumentTransaction()) ? PlanExecutor::YIELD_AUTO : PlanExecutor::INTERRUPT_ONLY; return _getExecutorFind( @@ -1259,11 +1256,8 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorCount( } unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue()); - const auto readConcernArgs = repl::ReadConcernArgs::get(opCtx); - const auto yieldPolicy = - readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern - ? PlanExecutor::INTERRUPT_ONLY - : PlanExecutor::YIELD_AUTO; + const auto yieldPolicy = opCtx->inMultiDocumentTransaction() ? PlanExecutor::INTERRUPT_ONLY + : PlanExecutor::YIELD_AUTO; const auto skip = request.getSkip().value_or(0); const auto limit = request.getLimit().value_or(0); @@ -1716,11 +1710,8 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorDistinct( Collection* collection, size_t plannerOptions, ParsedDistinct* parsedDistinct) { - const auto readConcernArgs = repl::ReadConcernArgs::get(opCtx); - const auto yieldPolicy = - readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern - ? PlanExecutor::INTERRUPT_ONLY - : PlanExecutor::YIELD_AUTO; + const auto yieldPolicy = opCtx->inMultiDocumentTransaction() ? PlanExecutor::INTERRUPT_ONLY + : PlanExecutor::YIELD_AUTO; if (!collection) { // Treat collections that do not exist as empty collections. diff --git a/src/mongo/db/query/get_executor.h b/src/mongo/db/query/get_executor.h index 6f4117e31cb..087a20cf3af 100644 --- a/src/mongo/db/query/get_executor.h +++ b/src/mongo/db/query/get_executor.h @@ -122,8 +122,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutor( /** * Get a plan executor for a .find() operation. The executor will have a 'YIELD_AUTO' yield policy - * unless a false value for 'permitYield' or a snapshot read concern (according to the - * OperationContext) forces it to have a 'NO_INTERRUPT' yield policy. + * unless a false value for 'permitYield' or being part of a multi-document transaction forces it to + * have a 'NO_INTERRUPT' yield policy. * * If the query is valid and an executor could be created, returns a StatusWith with the * PlanExecutor. diff --git a/src/mongo/db/read_concern_mongod.cpp b/src/mongo/db/read_concern_mongod.cpp index ea270fef283..b5ce3b2fdb9 100644 --- a/src/mongo/db/read_concern_mongod.cpp +++ b/src/mongo/db/read_concern_mongod.cpp @@ -237,10 +237,9 @@ MONGO_REGISTER_SHIM(waitForReadConcern) ->Status { // If we are in a direct client within a transaction, then we may be holding locks, so it is // illegal to wait for read concern. This is fine, since the outer operation should have handled - // waiting for read concern. We don't want to ignore prepare conflicts because snapshot reads - // should block on prepared transactions. - if (opCtx->getClient()->isInDirectClient() && - readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) { + // waiting for read concern. We don't want to ignore prepare conflicts because reads in + // transactions should block on prepared transactions. + if (opCtx->getClient()->isInDirectClient() && opCtx->inMultiDocumentTransaction()) { return Status::OK(); } diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 49aa46c6ef0..0d6e43a2cab 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -1542,8 +1542,7 @@ Status applyOperation_inlock(OperationContext* opCtx, // [2] This upsert behavior exists to support idempotency guarantees outside // steady-state replication and existing users of applyOps. - const auto txnParticipant = TransactionParticipant::get(opCtx); - const bool inTxn = txnParticipant && txnParticipant.inMultiDocumentTransaction(); + const bool inTxn = opCtx->inMultiDocumentTransaction(); bool needToDoUpsert = haveWrappingWriteUnitOfWork && !inTxn; Timestamp timestamp; diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 5f2165e72eb..7f433b3e837 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -77,7 +77,6 @@ #include "mongo/db/repl/update_position_args.h" #include "mongo/db/repl/vote_requester.h" #include "mongo/db/server_options.h" -#include "mongo/db/transaction_participant.h" #include "mongo/db/write_concern.h" #include "mongo/db/write_concern_options.h" #include "mongo/executor/connection_pool_stats.h" @@ -1528,8 +1527,7 @@ Status ReplicationCoordinatorImpl::_waitUntilClusterTimeForRead(OperationContext Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated( OperationContext* opCtx, const ReadConcernArgs& readConcern) { const bool isMajorityCommittedRead = - readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern || - readConcern.getLevel() == ReadConcernLevel::kSnapshotReadConcern; + readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern; const auto targetOpTime = readConcern.getArgsOpTime().value_or(OpTime()); return _waitUntilOpTime(opCtx, isMajorityCommittedRead, targetOpTime); @@ -2290,8 +2288,7 @@ Status ReplicationCoordinatorImpl::checkCanServeReadsFor_UNSAFE(OperationContext return Status::OK(); } - auto txnParticipant = TransactionParticipant::get(opCtx); - if (txnParticipant && txnParticipant.inMultiDocumentTransaction()) { + if (opCtx->inMultiDocumentTransaction()) { if (!_readWriteAbility->canAcceptNonLocalWrites_UNSAFE()) { return Status(ErrorCodes::NotMaster, "Multi-document transactions are only allowed on replica set primaries."); diff --git a/src/mongo/db/repl/transaction_oplog_application.cpp b/src/mongo/db/repl/transaction_oplog_application.cpp index 3994ad86972..8f40abd535b 100644 --- a/src/mongo/db/repl/transaction_oplog_application.cpp +++ b/src/mongo/db/repl/transaction_oplog_application.cpp @@ -176,6 +176,7 @@ Status applyCommitTransaction(OperationContext* opCtx, invariant(entry.getTxnNumber()); opCtx->setLogicalSessionId(*entry.getSessionId()); opCtx->setTxnNumber(*entry.getTxnNumber()); + opCtx->setInMultiDocumentTransaction(); // The write on transaction table may be applied concurrently, so refreshing state // from disk may read that write, causing starting a new transaction on an existing @@ -212,6 +213,8 @@ Status applyAbortTransaction(OperationContext* opCtx, invariant(entry.getTxnNumber()); opCtx->setLogicalSessionId(*entry.getSessionId()); opCtx->setTxnNumber(*entry.getTxnNumber()); + opCtx->setInMultiDocumentTransaction(); + // The write on transaction table may be applied concurrently, so refreshing state // from disk may read that write, causing starting a new transaction on an existing // txnNumber. Thus, we start a new transaction without refreshing state from disk. @@ -331,6 +334,8 @@ Status _applyPrepareTransaction(OperationContext* opCtx, invariant(entry.getTxnNumber()); opCtx->setLogicalSessionId(*entry.getSessionId()); opCtx->setTxnNumber(*entry.getTxnNumber()); + opCtx->setInMultiDocumentTransaction(); + // The write on transaction table may be applied concurrently, so refreshing state // from disk may read that write, causing starting a new transaction on an existing // txnNumber. Thus, we start a new transaction without refreshing state from disk. diff --git a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp index 33cb39e854d..342ead52a0a 100644 --- a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp +++ b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp @@ -111,7 +111,7 @@ public: uassert(ErrorCodes::NoSuchTransaction, "Transaction isn't in progress", - txnParticipant.inMultiDocumentTransaction()); + txnParticipant.transactionIsOpen()); if (txnParticipant.transactionIsPrepared()) { auto& replClient = repl::ReplClientInfo::forClient(opCtx->getClient()); @@ -326,7 +326,7 @@ public: false /* autocommit */, boost::none /* startTransaction */); - invariant(!txnParticipant.inMultiDocumentTransaction(), + invariant(!txnParticipant.transactionIsOpen(), "The participant should not be in progress after we waited for the " "participant to complete"); uassert(ErrorCodes::NoSuchTransaction, diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index a17b1cabe71..be12c34a4f4 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -395,7 +395,7 @@ void invokeWithSessionCheckedOut(OperationContext* opCtx, if (sessionOptions.getCoordinator() == boost::optional<bool>(true)) { createTransactionCoordinator(opCtx, *sessionOptions.getTxnNumber()); } - } else if (txnParticipant.inMultiDocumentTransaction()) { + } else if (txnParticipant.transactionIsOpen()) { const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); uassert(ErrorCodes::InvalidOptions, "Only the first command in a transaction may specify a readConcern", @@ -780,9 +780,10 @@ void execCommandDatabase(OperationContext* opCtx, } auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); - // If the parent operation runs in snapshot isolation, we don't override the read concern. - auto skipReadConcern = opCtx->getClient()->isInDirectClient() && - readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern; + + // If the parent operation runs in a transaction, we don't override the read concern. + auto skipReadConcern = + opCtx->getClient()->isInDirectClient() && opCtx->inMultiDocumentTransaction(); if (!skipReadConcern) { // If "startTransaction" is present, it must be true due to the parsing above. const bool upconvertToSnapshot(sessionOptions.getStartTransaction()); @@ -1259,10 +1260,9 @@ DbResponse ServiceEntryPointCommon::handleRequest(OperationContext* opCtx, Client& c = *opCtx->getClient(); if (c.isInDirectClient()) { - if (!opCtx->getLogicalSessionId() || !opCtx->getTxnNumber() || - repl::ReadConcernArgs::get(opCtx).getLevel() != - repl::ReadConcernLevel::kSnapshotReadConcern) { - invariant(!opCtx->lockState()->inAWriteUnitOfWork()); + if (!opCtx->getLogicalSessionId() || !opCtx->getTxnNumber()) { + invariant(!opCtx->inMultiDocumentTransaction() && + !opCtx->lockState()->inAWriteUnitOfWork()); } } else { LastError::get(c).startRequest(); diff --git a/src/mongo/db/session_catalog_mongod.cpp b/src/mongo/db/session_catalog_mongod.cpp index 2f6145f0287..8a7a1164d33 100644 --- a/src/mongo/db/session_catalog_mongod.cpp +++ b/src/mongo/db/session_catalog_mongod.cpp @@ -203,6 +203,7 @@ void abortInProgressTransactions(OperationContext* opCtx) { IDLParserErrorContext("abort-in-progress-transactions"), cursor->next()); opCtx->setLogicalSessionId(txnRecord.getSessionId()); opCtx->setTxnNumber(txnRecord.getTxnNum()); + opCtx->setInMultiDocumentTransaction(); MongoDOperationContextSessionWithoutRefresh ocs(opCtx); auto txnParticipant = TransactionParticipant::get(opCtx); LOG(3) << "Aborting transaction sessionId: " << txnRecord.getSessionId().toBSON() @@ -229,7 +230,7 @@ void MongoDSessionCatalog::onStepUp(OperationContext* opCtx) { KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)}); catalog->scanSessions(matcher, [&](const ObservableSession& session) { const auto txnParticipant = TransactionParticipant::get(session); - if (!txnParticipant.inMultiDocumentTransaction()) { + if (!txnParticipant.transactionIsOpen()) { sessionKillTokens.emplace_back(session.kill()); } @@ -351,7 +352,7 @@ int MongoDSessionCatalog::reapSessionsOlderThan(OperationContext* opCtx, for (const auto& lsid : expiredSessionIds) { catalog->scanSession(lsid, [](ObservableSession& session) { const auto participant = TransactionParticipant::get(session); - if (!participant.inMultiDocumentTransaction()) { + if (!participant.transactionIsOpen()) { session.markForReap(); } }); @@ -437,8 +438,7 @@ MongoDOperationContextSessionWithoutRefresh::~MongoDOperationContextSessionWitho const auto txnParticipant = TransactionParticipant::get(_opCtx); // A session on secondaries should never be checked back in with a TransactionParticipant that // isn't prepared, aborted, or committed. - invariant(!txnParticipant.inMultiDocumentTransaction() || - txnParticipant.transactionIsPrepared()); + invariant(!txnParticipant.transactionIsInProgress()); } } // namespace mongo diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 2ca4657a75e..85e09b8dfc9 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -517,6 +517,7 @@ void TransactionParticipant::Participant::beginOrContinue(OperationContext* opCt // autocommit be given as an argument on the request, and currently it can only be false, which // is verified earlier when parsing the request. invariant(*autocommit == false); + invariant(opCtx->inMultiDocumentTransaction()); if (!startTransaction) { _continueMultiDocumentTransaction(opCtx, txnNumber); @@ -558,6 +559,7 @@ void TransactionParticipant::Participant::beginOrContinue(OperationContext* opCt void TransactionParticipant::Participant::beginOrContinueTransactionUnconditionally( OperationContext* opCtx, TxnNumber txnNumber) { + invariant(opCtx->inMultiDocumentTransaction()); // We don't check or fetch any on-disk state, so treat the transaction as 'valid' for the // purposes of this method and continue the transaction unconditionally @@ -782,6 +784,8 @@ void TransactionParticipant::TxnResources::release(OperationContext* opCtx) { TransactionParticipant::SideTransactionBlock::SideTransactionBlock(OperationContext* opCtx) : _opCtx(opCtx) { + // Do nothing if we are already in a SideTransactionBlock. We can tell we are already in a + // SideTransactionBlock because there is no top level write unit of work. if (!_opCtx->getWriteUnitOfWork()) { return; } @@ -847,7 +851,7 @@ void TransactionParticipant::Participant::stashTransactionResources(OperationCon } invariant(opCtx->getTxnNumber()); - if (o().txnState.inMultiDocumentTransaction()) { + if (o().txnState.isOpen()) { _stashActiveTransaction(opCtx); } } diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index cd210a47f03..1b5b41d0d75 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -114,7 +114,7 @@ class TransactionParticipant { StateFlag newState, TransitionValidation shouldValidate = TransitionValidation::kValidateTransition); - bool inMultiDocumentTransaction() const { + bool isOpen() const { return _state == kInProgress || _state == kPrepared; } @@ -288,13 +288,13 @@ public: bool expiredAsOf(Date_t when) const; /** - * Returns whether we are in a multi-document transaction, which means we have an active - * transaction which has autocommit:false and has not been committed or aborted. It is - * possible that the current transaction is stashed onto the stack via a + * Returns whether we are in an open multi-document transaction, which means we have an + * active transaction which has autocommit:false and has not been committed or aborted. It + * is possible that the current transaction is stashed onto the stack via a * `SideTransactionBlock`. */ - bool inMultiDocumentTransaction() const { - return o().txnState.inMultiDocumentTransaction(); + bool transactionIsOpen() const { + return o().txnState.isOpen(); }; bool transactionIsCommitted() const { @@ -309,6 +309,10 @@ public: return o().txnState.isPrepared(); } + bool transactionIsInProgress() const { + return o().txnState.isInProgress(); + } + /** * If this session is holding stashed locks in txnResourceStash, reports the current state * of the session using the provided builder. @@ -549,16 +553,16 @@ public: * multi-key path info to the set of path infos to be updated at commit time. */ void addUncommittedMultikeyPathInfo(MultikeyPathInfo info) { - invariant(inMultiDocumentTransaction()); + invariant(transactionIsOpen()); p().multikeyPathInfo.emplace_back(std::move(info)); } /** - * May only be called while a mutil-document transaction is not committed and returns the + * May only be called while a multi-document transaction is not committed and returns the * path infos which have been added so far. */ const std::vector<MultikeyPathInfo>& getUncommittedMultikeyPathInfos() const { - invariant(inMultiDocumentTransaction()); + invariant(transactionIsOpen()); return p().multikeyPathInfo; } diff --git a/src/mongo/db/transaction_participant_retryable_writes_test.cpp b/src/mongo/db/transaction_participant_retryable_writes_test.cpp index 155935ddd47..27c2587eeb9 100644 --- a/src/mongo/db/transaction_participant_retryable_writes_test.cpp +++ b/src/mongo/db/transaction_participant_retryable_writes_test.cpp @@ -699,6 +699,7 @@ TEST_F(ShardTxnParticipantRetryableWritesTest, const auto& sessionId = *opCtx()->getLogicalSessionId(); const TxnNumber txnNum = 20; + opCtx()->setTxnNumber(txnNum); const auto uuid = UUID::gen(); txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none); @@ -719,6 +720,7 @@ TEST_F(ShardTxnParticipantRetryableWritesTest, auto autocommit = false; auto startTransaction = true; + opCtx()->setInMultiDocumentTransaction(); ASSERT_THROWS_CODE( txnParticipant.beginOrContinue(opCtx(), txnNum, autocommit, startTransaction), diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp index 025092ccde7..fb109b38bbc 100644 --- a/src/mongo/db/transaction_participant_test.cpp +++ b/src/mongo/db/transaction_participant_test.cpp @@ -263,6 +263,7 @@ protected: opCtx()->setLogicalSessionId(_sessionId); opCtx()->setTxnNumber(_txnNumber); + opCtx()->setInMultiDocumentTransaction(); // Normally, committing a transaction is supposed to usassert if the corresponding prepare // has not been majority committed. We excempt our unit tests from this expectation. @@ -299,6 +300,7 @@ protected: std::unique_ptr<MongoDOperationContextSession> checkOutSession( boost::optional<bool> startNewTxn = true) { opCtx()->lockState()->setShouldConflictWithSecondaryBatchApplication(false); + opCtx()->setInMultiDocumentTransaction(); auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx()); auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), false, startNewTxn); @@ -345,6 +347,7 @@ TEST_F(TxnParticipantTest, TransactionThrowsLockTimeoutIfLockIsUnavailable) { auto newOpCtx = newClient->makeOperationContext(); newOpCtx.get()->setLogicalSessionId(newSessionId); newOpCtx.get()->setTxnNumber(newTxnNum); + newOpCtx.get()->setInMultiDocumentTransaction(); MongoDOperationContextSession newOpCtxSession(newOpCtx.get()); auto newTxnParticipant = TransactionParticipant::get(newOpCtx.get()); @@ -415,7 +418,7 @@ TEST_F(TxnParticipantTest, CannotSpecifyStartTransactionOnInProgressTxn) { // Must specify startTransaction=true and autocommit=false to start a transaction. auto sessionCheckout = checkOutSession(); auto txnParticipant = TransactionParticipant::get(opCtx()); - ASSERT_TRUE(txnParticipant.inMultiDocumentTransaction()); + ASSERT_TRUE(txnParticipant.transactionIsOpen()); // Cannot try to start a transaction that already started. ASSERT_THROWS_CODE( @@ -779,6 +782,7 @@ TEST_F(TxnParticipantTest, KillOpBeforeCommittingPreparedTransaction) { auto commitPreparedFunc = [&](OperationContext* opCtx) { opCtx->setLogicalSessionId(_sessionId); opCtx->setTxnNumber(_txnNumber); + opCtx->setInMultiDocumentTransaction(); // Check out the session and continue the transaction. auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx); @@ -819,6 +823,7 @@ TEST_F(TxnParticipantTest, KillOpBeforeAbortingPreparedTransaction) { auto commitPreparedFunc = [&](OperationContext* opCtx) { opCtx->setLogicalSessionId(_sessionId); opCtx->setTxnNumber(_txnNumber); + opCtx->setInMultiDocumentTransaction(); // Check out the session and continue the transaction. auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx); @@ -1196,7 +1201,7 @@ TEST_F(TxnParticipantTest, CannotContinueTransactionIfNotPrimary) { // Will start the transaction. auto sessionCheckout = checkOutSession(); auto txnParticipant = TransactionParticipant::get(opCtx()); - ASSERT_TRUE(txnParticipant.inMultiDocumentTransaction()); + ASSERT_TRUE(txnParticipant.transactionIsOpen()); ASSERT_OK(repl::ReplicationCoordinator::get(opCtx())->setFollowerMode( repl::MemberState::RS_SECONDARY)); @@ -1239,6 +1244,7 @@ TEST_F(TxnParticipantTest, CannotStartNewTransactionWhilePreparedTransactionInPr txnNumberToStart = *opCtx()->getTxnNumber() + 1](OperationContext* newOpCtx) { newOpCtx->setLogicalSessionId(lsid); newOpCtx->setTxnNumber(txnNumberToStart); + newOpCtx->setInMultiDocumentTransaction(); MongoDOperationContextSession ocs(newOpCtx); auto txnParticipant = TransactionParticipant::get(newOpCtx); @@ -1399,7 +1405,7 @@ protected: auto sessionCheckout = checkOutSession(); auto txnParticipant = TransactionParticipant::get(opCtx()); - ASSERT(txnParticipant.inMultiDocumentTransaction()); + ASSERT(txnParticipant.transactionIsOpen()); ASSERT_THROWS_CODE(txnParticipant.beginOrContinue( opCtx(), *opCtx()->getTxnNumber(), autocommit, startTransaction), @@ -1413,14 +1419,14 @@ protected: auto sessionCheckout = checkOutSession(); auto txnParticipant = TransactionParticipant::get(opCtx()); - ASSERT(txnParticipant.inMultiDocumentTransaction()); + ASSERT(txnParticipant.transactionIsOpen()); txnParticipant.abortActiveTransaction(opCtx()); ASSERT(txnParticipant.transactionIsAborted()); txnParticipant.beginOrContinue( opCtx(), *opCtx()->getTxnNumber(), autocommit, startTransaction); - ASSERT(txnParticipant.inMultiDocumentTransaction()); + ASSERT(txnParticipant.transactionIsOpen()); } void cannotSpecifyStartTransactionOnCommittedTxn() { @@ -1429,7 +1435,7 @@ protected: auto sessionCheckout = checkOutSession(); auto txnParticipant = TransactionParticipant::get(opCtx()); - ASSERT(txnParticipant.inMultiDocumentTransaction()); + ASSERT(txnParticipant.transactionIsOpen()); txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction"); txnParticipant.commitUnpreparedTransaction(opCtx()); @@ -1446,7 +1452,7 @@ protected: auto sessionCheckout = checkOutSession(); auto txnParticipant = TransactionParticipant::get(opCtx()); - ASSERT(txnParticipant.inMultiDocumentTransaction()); + ASSERT(txnParticipant.transactionIsOpen()); txnParticipant.unstashTransactionResources(opCtx(), "insert"); auto operation = repl::OplogEntry::makeInsertOperation(kNss, _uuid, BSON("TestValue" << 0)); @@ -1465,7 +1471,7 @@ protected: auto sessionCheckout = checkOutSession(); auto txnParticipant = TransactionParticipant::get(opCtx()); - ASSERT(txnParticipant.inMultiDocumentTransaction()); + ASSERT(txnParticipant.transactionIsOpen()); txnParticipant.unstashTransactionResources(opCtx(), "prepareTransaction"); txnParticipant.prepareTransaction(opCtx(), {}); @@ -1486,7 +1492,7 @@ protected: auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), boost::none, boost::none); - ASSERT_FALSE(txnParticipant.inMultiDocumentTransaction()); + ASSERT_FALSE(txnParticipant.transactionIsOpen()); auto autocommit = false; auto startTransaction = true; @@ -1494,7 +1500,7 @@ protected: txnParticipant.beginOrContinue( opCtx(), *opCtx()->getTxnNumber(), autocommit, startTransaction); - ASSERT(txnParticipant.inMultiDocumentTransaction()); + ASSERT(txnParticipant.transactionIsOpen()); } }; @@ -3805,7 +3811,7 @@ TEST_F(TxnParticipantTest, AbortTransactionOnSessionCheckoutWithoutRefresh) { MongoDOperationContextSessionWithoutRefresh sessionCheckout(opCtx()); auto txnParticipant = TransactionParticipant::get(opCtx()); - ASSERT(txnParticipant.inMultiDocumentTransaction()); + ASSERT(txnParticipant.transactionIsOpen()); ASSERT_EQ(txnParticipant.getActiveTxnNumber(), txnNumber); txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction"); diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp index 2ec0bc34e5c..1672d0ac2fe 100644 --- a/src/mongo/dbtests/storage_timestamp_tests.cpp +++ b/src/mongo/dbtests/storage_timestamp_tests.cpp @@ -1518,6 +1518,7 @@ public: const auto sessionId = makeLogicalSessionIdForTest(); _opCtx->setLogicalSessionId(sessionId); _opCtx->setTxnNumber(1); + _opCtx->setInMultiDocumentTransaction(); // Check out the session. MongoDOperationContextSession ocs(_opCtx); @@ -2683,6 +2684,7 @@ public: const auto sessionId = makeLogicalSessionIdForTest(); _opCtx->setLogicalSessionId(sessionId); _opCtx->setTxnNumber(26); + _opCtx->setInMultiDocumentTransaction(); ocs.emplace(_opCtx); |