diff options
Diffstat (limited to 'src')
20 files changed, 204 insertions, 163 deletions
diff --git a/src/mongo/db/commands/do_txn_cmd.cpp b/src/mongo/db/commands/do_txn_cmd.cpp index c4cc43f2dd7..801d55d4663 100644 --- a/src/mongo/db/commands/do_txn_cmd.cpp +++ b/src/mongo/db/commands/do_txn_cmd.cpp @@ -120,6 +120,14 @@ public: return true; } + bool supportsReadConcern(const std::string& dbName, + const BSONObj& cmdObj, + repl::ReadConcernLevel level) const override { + // Support the read concerns before and after upconversion. + return level == repl::ReadConcernLevel::kLocalReadConcern || + level == repl::ReadConcernLevel::kSnapshotReadConcern; + } + std::string help() const override { return "internal (sharding)\n{ doTxn : [ ] , preCondition : [ { ns : ... , q : ... , " "res : ... } ] }"; diff --git a/src/mongo/db/initialize_operation_session_info.cpp b/src/mongo/db/initialize_operation_session_info.cpp index 8842c5cb8bb..085e60db050 100644 --- a/src/mongo/db/initialize_operation_session_info.cpp +++ b/src/mongo/db/initialize_operation_session_info.cpp @@ -46,12 +46,11 @@ namespace mongo { // engine (see SERVER-34165). MONGO_EXPORT_SERVER_PARAMETER(enableInMemoryTransactions, bool, false); -boost::optional<OperationSessionInfoFromClient> initializeOperationSessionInfo( - OperationContext* opCtx, - const BSONObj& requestBody, - bool requiresAuth, - bool isReplSetMemberOrMongos, - bool supportsDocLocking) { +OperationSessionInfoFromClient initializeOperationSessionInfo(OperationContext* opCtx, + const BSONObj& requestBody, + bool requiresAuth, + bool isReplSetMemberOrMongos, + bool supportsDocLocking) { auto osi = OperationSessionInfoFromClient::parse("OperationSessionInfo"_sd, requestBody); if (opCtx->getClient()->isInDirectClient()) { @@ -74,14 +73,14 @@ boost::optional<OperationSessionInfoFromClient> initializeOperationSessionInfo( // logical sessions are disabled. A client may authenticate as the __sytem user, // or as an externally authorized user. if (authSession->isUsingLocalhostBypass() && !authSession->isAuthenticated()) { - return boost::none; + return {}; } // Do not initialize lsid when auth is enabled and no user is logged in since // there is no sensible uid that can be assigned to it. if (AuthorizationManager::get(opCtx->getServiceContext())->isAuthEnabled() && !authSession->isAuthenticated() && !requiresAuth) { - return boost::none; + return {}; } } @@ -92,7 +91,7 @@ boost::optional<OperationSessionInfoFromClient> initializeOperationSessionInfo( if (!lsc) { // Ignore session information if the logical session cache has not been set up, e.g. on // the embedded version of mongod. - return boost::none; + return {}; } opCtx->setLogicalSessionId(makeLogicalSessionId(osi.getSessionId().get(), opCtx)); @@ -151,6 +150,21 @@ boost::optional<OperationSessionInfoFromClient> initializeOperationSessionInfo( osi.getStartTransaction().value()); } + // Populate the session info for doTxn command. + if (requestBody.firstElementFieldName() == "doTxn"_sd) { + uassert(ErrorCodes::InvalidOptions, + "doTxn can only be run with a transaction number.", + osi.getTxnNumber()); + uassert(ErrorCodes::OperationNotSupportedInTransaction, + "doTxn can not be run in a transaction", + !osi.getAutocommit()); + // 'autocommit' and 'startTransaction' are populated for 'doTxn' to get the oplog + // entry generation behavior used for multi-document transactions. The 'doTxn' + // command still logically behaves as a commit. + osi.setAutocommit(false); + osi.setStartTransaction(true); + } + return osi; } diff --git a/src/mongo/db/initialize_operation_session_info.h b/src/mongo/db/initialize_operation_session_info.h index d7d99e95736..041d287ec21 100644 --- a/src/mongo/db/initialize_operation_session_info.h +++ b/src/mongo/db/initialize_operation_session_info.h @@ -51,11 +51,10 @@ namespace mongo { * On success, returns the parsed request information. Returning boost::none implies that the * proper command or session requirements were not met. */ -boost::optional<OperationSessionInfoFromClient> initializeOperationSessionInfo( - OperationContext* opCtx, - const BSONObj& requestBody, - bool requiresAuth, - bool isReplSetMemberOrMongos, - bool supportsDocLocking); +OperationSessionInfoFromClient initializeOperationSessionInfo(OperationContext* opCtx, + const BSONObj& requestBody, + bool requiresAuth, + bool isReplSetMemberOrMongos, + bool supportsDocLocking); } // namespace mongo diff --git a/src/mongo/db/logical_session_id_test.cpp b/src/mongo/db/logical_session_id_test.cpp index 18e755fe507..3d90d189d5b 100644 --- a/src/mongo/db/logical_session_id_test.cpp +++ b/src/mongo/db/logical_session_id_test.cpp @@ -335,13 +335,17 @@ TEST_F(LogicalSessionIdTest, InitializeOperationSessionInfo_IgnoresInfoIfNoCache LogicalSessionCache::set(_opCtx->getServiceContext(), nullptr); - ASSERT_FALSE(initializeOperationSessionInfo( + auto sessionInfo = initializeOperationSessionInfo( _opCtx.get(), BSON("TestCmd" << 1 << "lsid" << lsid.toBSON() << "txnNumber" << 100LL << "OtherField" << "TestField"), true, true, - true)); + true); + ASSERT(sessionInfo.getSessionId() == boost::none); + ASSERT(sessionInfo.getTxnNumber() == boost::none); + ASSERT(sessionInfo.getStartTransaction() == boost::none); + ASSERT(sessionInfo.getAutocommit() == boost::none); } TEST_F(LogicalSessionIdTest, InitializeOperationSessionInfo_SendingInfoFailsInDirectClient) { diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index bfecac520d1..eb4c3f11438 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -406,12 +406,14 @@ TEST_F(OpObserverLargeTransactionTest, TransactionTooLargeWhileCommitting) { const TxnNumber txnNum = 0; opCtx->setLogicalSessionId(sessionId); opCtx->setTxnNumber(txnNum); - OperationContextSession opSession(opCtx.get(), - true /* checkOutSession */, - false /* autocommit */, - true /* startTransaction */, - "testDB" /* dbName */, - "insert" /* cmdName */); + OperationContextSession opSession(opCtx.get(), true /* checkOutSession */); + OperationContextSession::get(opCtx.get()) + ->beginOrContinueTxn(opCtx.get(), + txnNum, + /* autocommit */ false, + /* startTransaction */ true, + "testDB", + "insert"); session->unstashTransactionResources(opCtx.get(), "insert"); @@ -550,12 +552,13 @@ TEST_F(OpObserverTransactionTest, TransactionalInsertTest) { auto uuid2 = CollectionUUID::gen(); const TxnNumber txnNum = 2; opCtx()->setTxnNumber(txnNum); - OperationContextSession opSession(opCtx(), - true /* checkOutSession */, - false /* autocommit */, - true /* startTransaction*/, - "testDB", - "insert"); + OperationContextSession opSession(opCtx(), true /* checkOutSession */); + OperationContextSession::get(opCtx())->beginOrContinueTxn(opCtx(), + txnNum, + /* autocommit */ false, + /* startTransaction */ true, + "testDB", + "insert"); session()->unstashTransactionResources(opCtx(), "insert"); @@ -628,12 +631,13 @@ TEST_F(OpObserverTransactionTest, TransactionalUpdateTest) { auto uuid2 = CollectionUUID::gen(); const TxnNumber txnNum = 3; opCtx()->setTxnNumber(txnNum); - OperationContextSession opSession(opCtx(), - true /* checkOutSession */, - false /* autocommit */, - true /* startTransaction*/, - "testDB", - "update"); + OperationContextSession opSession(opCtx(), true /* checkOutSession */); + OperationContextSession::get(opCtx())->beginOrContinueTxn(opCtx(), + txnNum, + /* autocommit */ false, + /* startTransaction */ true, + "testDB", + "insert"); session()->unstashTransactionResources(opCtx(), "update"); @@ -698,12 +702,13 @@ TEST_F(OpObserverTransactionTest, TransactionalDeleteTest) { auto uuid2 = CollectionUUID::gen(); const TxnNumber txnNum = 3; opCtx()->setTxnNumber(txnNum); - OperationContextSession opSession(opCtx(), - true /* checkOutSession */, - false /* autocommit */, - true /* startTransaction*/, - "testDB", - "delete"); + OperationContextSession opSession(opCtx(), true /* checkOutSession */); + OperationContextSession::get(opCtx())->beginOrContinueTxn(opCtx(), + txnNum, + /* autocommit */ false, + /* startTransaction */ true, + "testDB", + "insert"); session()->unstashTransactionResources(opCtx(), "delete"); diff --git a/src/mongo/db/read_concern.cpp b/src/mongo/db/read_concern.cpp index 0c859fdbf75..81ca9ecdd6f 100644 --- a/src/mongo/db/read_concern.cpp +++ b/src/mongo/db/read_concern.cpp @@ -209,9 +209,8 @@ Status waitForReadConcern(OperationContext* opCtx, // If we are in a direct client within a transaction, then we may be holding locks, so it is // illegal to wait for read concern. This is fine, since the outer operation should have handled // waiting for read concern. - auto session = OperationContextSession::get(opCtx); - if (opCtx->getClient()->isInDirectClient() && session && - session->inActiveOrKilledMultiDocumentTransaction()) { + if (opCtx->getClient()->isInDirectClient() && + readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) { return Status::OK(); } @@ -311,13 +310,6 @@ Status waitForReadConcern(OperationContext* opCtx, return {ErrorCodes::IncompatibleElectionProtocol, "Replica sets running protocol version 0 do not support readConcern: snapshot"}; } - if (speculative) { - session->setSpeculativeTransactionOpTime( - opCtx, - readConcernArgs.getOriginalLevel() == repl::ReadConcernLevel::kSnapshotReadConcern - ? SpeculativeTransactionOpTime::kAllCommitted - : SpeculativeTransactionOpTime::kLastApplied); - } } if (atClusterTime) { diff --git a/src/mongo/db/repl/do_txn.cpp b/src/mongo/db/repl/do_txn.cpp index f1fbcfdb82e..a8b5e0722ea 100644 --- a/src/mongo/db/repl/do_txn.cpp +++ b/src/mongo/db/repl/do_txn.cpp @@ -279,8 +279,6 @@ Status doTxn(OperationContext* opCtx, const std::string& dbName, const BSONObj& doTxnCmd, BSONObjBuilder* result) { - auto txnNumber = opCtx->getTxnNumber(); - uassert(ErrorCodes::InvalidOptions, "doTxn can only be run with a transaction ID.", txnNumber); auto* session = OperationContextSession::get(opCtx); uassert(ErrorCodes::InvalidOptions, "doTxn must be run within a session", session); invariant(session->inActiveOrKilledMultiDocumentTransaction()); diff --git a/src/mongo/db/repl/do_txn_test.cpp b/src/mongo/db/repl/do_txn_test.cpp index 33db97122ad..ebfd3db66af 100644 --- a/src/mongo/db/repl/do_txn_test.cpp +++ b/src/mongo/db/repl/do_txn_test.cpp @@ -147,13 +147,10 @@ void DoTxnTest::setUp() { // Set up the transaction and session. _opCtx->setLogicalSessionId(makeLogicalSessionIdForTest()); _opCtx->setTxnNumber(0); // TxnNumber can always be 0 because we have a new session. - _ocs.emplace(_opCtx.get(), - true /* checkOutSession */, - false /* autocommit */, - true /* startTransaction */, - "admin" /* dbName */, - "doTxn" /* cmdName */); - OperationContextSession::get(opCtx())->unstashTransactionResources(opCtx(), "doTxn"); + _ocs.emplace(_opCtx.get(), true /* checkOutSession */); + auto session = OperationContextSession::get(opCtx()); + session->beginOrContinueTxn(opCtx(), *opCtx()->getTxnNumber(), false, true, "admin", "doTxn"); + session->unstashTransactionResources(opCtx(), "doTxn"); } void DoTxnTest::tearDown() { diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index db54f99ebfb..88560376c45 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -1418,8 +1418,7 @@ Status ReplicationCoordinatorImpl::_waitUntilClusterTimeForRead(OperationContext invariant(!readConcern.getArgsOpTime()); // TODO SERVER-34620: Re-enable speculative behavior when "atClusterTime" is specified. - auto session = OperationContextSession::get(opCtx); - const bool speculative = session && session->inActiveOrKilledMultiDocumentTransaction() && + const bool speculative = readConcern.getLevel() == ReadConcernLevel::kSnapshotReadConcern && !readConcern.getArgsAtClusterTime(); const bool isMajorityCommittedRead = diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h index 59f95c1317a..7e5d8583645 100644 --- a/src/mongo/db/repl/storage_interface.h +++ b/src/mongo/db/repl/storage_interface.h @@ -430,6 +430,11 @@ public: */ virtual boost::optional<Timestamp> getLastStableCheckpointTimestamp( ServiceContext* serviceCtx) const = 0; + + /** + * Returns the read timestamp of the recovery unit of the given operation context. + */ + virtual Timestamp getPointInTimeReadTimestamp(OperationContext* opCtx) const = 0; }; } // namespace repl diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp index d3cd887a32c..0f0402be183 100644 --- a/src/mongo/db/repl/storage_interface_impl.cpp +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -1183,5 +1183,11 @@ Timestamp StorageInterfaceImpl::getOldestOpenReadTimestamp(ServiceContext* servi return serviceCtx->getStorageEngine()->getOldestOpenReadTimestamp(); } +Timestamp StorageInterfaceImpl::getPointInTimeReadTimestamp(OperationContext* opCtx) const { + auto readTimestamp = opCtx->recoveryUnit()->getPointInTimeReadTimestamp(); + invariant(readTimestamp); + return *readTimestamp; +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h index 9420b12d8c6..1a5854710e4 100644 --- a/src/mongo/db/repl/storage_interface_impl.h +++ b/src/mongo/db/repl/storage_interface_impl.h @@ -178,6 +178,8 @@ public: Timestamp getOldestOpenReadTimestamp(ServiceContext* serviceCtx) const override; + Timestamp getPointInTimeReadTimestamp(OperationContext* opCtx) const override; + /** * Checks that the "admin" database contains a supported version of the auth data schema. */ diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h index c6f29d79cd0..42f5ccdb0ea 100644 --- a/src/mongo/db/repl/storage_interface_mock.h +++ b/src/mongo/db/repl/storage_interface_mock.h @@ -338,6 +338,10 @@ public: return boost::none; } + Timestamp getPointInTimeReadTimestamp(OperationContext* opCtx) const override { + return {}; + } + // Testing functions. CreateCollectionForBulkFn createCollectionForBulkFn = [](const NamespaceString& nss, diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp index 49da1334126..3db19f80142 100644 --- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp @@ -245,8 +245,14 @@ public: // requests with txnNumbers aren't allowed. To get around this, we have to manually set // up the session state and perform the insert. initializeOperationSessionInfo(innerOpCtx.get(), insertBuilder.obj(), true, true, true); - OperationContextSession sessionTxnState( - innerOpCtx.get(), true, boost::none, boost::none, "testDB", "insert"); + OperationContextSession sessionTxnState(innerOpCtx.get(), true); + OperationContextSession::get(innerOpCtx.get()) + ->beginOrContinueTxn(innerOpCtx.get(), + *sessionInfo.getTxnNumber(), + boost::none, + boost::none, + "testDB", + "insert"); const auto reply = performInserts(innerOpCtx.get(), insertRequest); ASSERT(reply.results.size() == 1); diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index e81d757de07..c4180d864e6 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -219,11 +219,11 @@ void generateErrorResponse(OperationContext* opCtx, replyBuilder->setMetadata(replyMetadata); } -BSONObj getErrorLabels(const boost::optional<OperationSessionInfoFromClient>& sessionOptions, +BSONObj getErrorLabels(const OperationSessionInfoFromClient& sessionOptions, const std::string& commandName, ErrorCodes::Error code) { // By specifying "autocommit", the user indicates they want to run a transaction. - if (!sessionOptions || !sessionOptions->getAutocommit()) { + if (!sessionOptions.getAutocommit()) { return {}; } @@ -347,8 +347,9 @@ StatusWith<repl::ReadConcernArgs> _extractReadConcern(const CommandInvocation* i // We must be in a transaction if the readConcern level was upconverted to snapshot and the // command must support readConcern level snapshot in order to be supported in transactions. if (upconvertToSnapshot) { - return {ErrorCodes::OperationNotSupportedInTransaction, - str::stream() << "Command is not supported in a transaction"}; + return { + ErrorCodes::OperationNotSupportedInTransaction, + str::stream() << "Command is not supported as the first command in a transaction"}; } return {ErrorCodes::InvalidOptions, str::stream() << "Command does not support read concern " @@ -464,10 +465,20 @@ void appendClusterAndOperationTime(OperationContext* opCtx, void invokeInTransaction(OperationContext* opCtx, CommandInvocation* invocation, + const OpMsgRequest& request, + const OperationSessionInfoFromClient& sessionOptions, CommandReplyBuilder* replyBuilder) { auto session = OperationContextSession::get(opCtx); invariant(session); invariant(opCtx->getTxnNumber() || opCtx->getClient()->isInDirectClient()); + if (!opCtx->getClient()->isInDirectClient()) { + session->beginOrContinueTxn(opCtx, + *sessionOptions.getTxnNumber(), + sessionOptions.getAutocommit(), + sessionOptions.getStartTransaction(), + request.getDatabase(), + request.getCommandName()); + } session->unstashTransactionResources(opCtx, invocation->definition()->getName()); ScopeGuard guard = MakeGuard([session, opCtx]() { session->abortActiveTransaction(opCtx); }); @@ -493,7 +504,7 @@ bool runCommandImpl(OperationContext* opCtx, LogicalTime startOperationTime, const ServiceEntryPointCommon::Hooks& behaviors, BSONObjBuilder* extraFieldsBuilder, - const boost::optional<OperationSessionInfoFromClient>& sessionOptions) { + const OperationSessionInfoFromClient& sessionOptions) { const Command* command = invocation->definition(); auto bytesToReserve = command->reserveBytesForReply(); @@ -510,19 +521,21 @@ bool runCommandImpl(OperationContext* opCtx, if (!invocation->supportsWriteConcern()) { behaviors.uassertCommandDoesNotSpecifyWriteConcern(request.body); if (session) { - invokeInTransaction(opCtx, invocation, &crb); + invokeInTransaction(opCtx, invocation, request, sessionOptions, &crb); } else { invocation->run(opCtx, &crb); } } else { auto wcResult = uassertStatusOK(extractWriteConcern(opCtx, request.body)); - uassert(ErrorCodes::InvalidOptions, - "writeConcern is not allowed within a multi-statement transaction", - wcResult.usedDefault || !session || - !session->inActiveOrKilledMultiDocumentTransaction() || - invocation->definition()->getName() == "commitTransaction" || - invocation->definition()->getName() == "abortTransaction" || - invocation->definition()->getName() == "doTxn"); + if (sessionOptions.getAutocommit()) { + // If "autoCommit" is set, it must be "false". + uassert(ErrorCodes::InvalidOptions, + "writeConcern is not allowed within a multi-statement transaction", + wcResult.usedDefault || + invocation->definition()->getName() == "commitTransaction" || + invocation->definition()->getName() == "abortTransaction" || + invocation->definition()->getName() == "doTxn"); + } auto lastOpBeforeRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); @@ -546,7 +559,7 @@ bool runCommandImpl(OperationContext* opCtx, try { if (session) { - invokeInTransaction(opCtx, invocation, &crb); + invokeInTransaction(opCtx, invocation, request, sessionOptions, &crb); } else { invocation->run(opCtx, &crb); } @@ -638,7 +651,7 @@ void execCommandDatabase(OperationContext* opCtx, BSONObjBuilder extraFieldsBuilder; auto startOperationTime = getClientOperationTime(opCtx); auto invocation = command->parse(opCtx, request); - boost::optional<OperationSessionInfoFromClient> sessionOptions = boost::none; + OperationSessionInfoFromClient sessionOptions; try { { @@ -674,36 +687,23 @@ void execCommandDatabase(OperationContext* opCtx, const bool shouldCheckoutSession = static_cast<bool>(opCtx->getTxnNumber()) && sessionCheckoutWhitelist.find(command->getName()) != sessionCheckoutWhitelist.cend(); - // Parse the arguments specific to multi-statement transactions. - boost::optional<bool> startMultiDocTxn = boost::none; - boost::optional<bool> autocommitVal = boost::none; - if (sessionOptions) { - startMultiDocTxn = sessionOptions->getStartTransaction(); - autocommitVal = sessionOptions->getAutocommit(); - if (command->getName() == "doTxn") { - // Autocommit and 'startMultiDocTxn' are overridden for 'doTxn' to get the oplog - // entry generation behavior used for multi-document transactions. The 'doTxn' - // command still logically behaves as a commit. - autocommitVal = false; - startMultiDocTxn = true; - } - } + const auto shouldNotCheckOutSession = + !shouldCheckoutSession && !opCtx->getClient()->isInDirectClient(); // Reject commands with 'txnNumber' that do not check out the Session, since no retryable // writes or transaction machinery will be used to execute commands that do not check out // the Session. Do not check this if we are in DBDirectClient because the outer command is // responsible for checking out the Session. - if (!opCtx->getClient()->isInDirectClient()) { + if (shouldNotCheckOutSession) { uassert(ErrorCodes::OperationNotSupportedInTransaction, str::stream() << "It is illegal to run command " << command->getName() << " in a multi-document transaction.", - shouldCheckoutSession || !autocommitVal || command->getName() == "doTxn"); + !sessionOptions.getAutocommit()); uassert(50768, str::stream() << "It is illegal to provide a txnNumber for command " << command->getName(), - shouldCheckoutSession || !opCtx->getTxnNumber()); + !opCtx->getTxnNumber()); } - std::unique_ptr<MaintenanceModeSetter> mmSetter; BSONElement cmdOptionMaxTimeMSField; @@ -747,7 +747,7 @@ void execCommandDatabase(OperationContext* opCtx, if (!opCtx->getClient()->isInDirectClient() && !MONGO_FAIL_POINT(skipCheckingForNotMasterInCommandDispatch)) { - const bool inMultiDocumentTransaction = (autocommitVal == false); + auto inMultiDocumentTransaction = static_cast<bool>(sessionOptions.getAutocommit()); auto allowed = command->secondaryAllowed(opCtx->getServiceContext()); bool alwaysAllowed = allowed == Command::AllowedOnSecondary::kAlways; bool couldHaveOptedIn = @@ -812,23 +812,17 @@ void execCommandDatabase(OperationContext* opCtx, opCtx->setDeadlineAfterNowBy(Milliseconds{maxTimeMS}, ErrorCodes::MaxTimeMSExpired); } - // This constructor will check out the session and start a transaction, if necessary. It - // handles the appropriate state management for both multi-statement transactions and - // retryable writes. - OperationContextSession sessionTxnState(opCtx, - shouldCheckoutSession, - autocommitVal, - startMultiDocTxn, - dbname, - command->getName()); + // This constructor will check out the session, if necessary, for both multi-statement + // transactions and retryable writes. + OperationContextSession sessionTxnState(opCtx, shouldCheckoutSession); auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); - auto session = OperationContextSession::get(opCtx); - if (!opCtx->getClient()->isInDirectClient() || !session || - !session->inActiveOrKilledMultiDocumentTransaction()) { - const bool upconvertToSnapshot = session && - session->inActiveOrKilledMultiDocumentTransaction() && sessionOptions && - (sessionOptions->getStartTransaction() == boost::optional<bool>(true)); + // If the parent operation runs in snapshot isolation, we don't override the read concern. + auto skipReadConcern = opCtx->getClient()->isInDirectClient() && + readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern; + if (!skipReadConcern) { + // If "startTransaction" is present, it must be true due to the parsing above. + const bool upconvertToSnapshot(sessionOptions.getStartTransaction()); readConcernArgs = uassertStatusOK( _extractReadConcern(invocation.get(), request.body, upconvertToSnapshot)); } @@ -840,10 +834,9 @@ void execCommandDatabase(OperationContext* opCtx, } if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) { - auto session = OperationContextSession::get(opCtx); uassert(ErrorCodes::InvalidOptions, - "readConcern level snapshot is only valid in multi-statement transactions", - session && session->inActiveOrKilledMultiDocumentTransaction()); + "readConcern level snapshot is only valid for the first transaction operation", + opCtx->getClient()->isInDirectClient() || sessionOptions.getStartTransaction()); uassert(ErrorCodes::InvalidOptions, "readConcern level snapshot requires a session ID", opCtx->getLogicalSessionId()); diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp index 659ca1c3cc4..47727848aae 100644 --- a/src/mongo/db/session.cpp +++ b/src/mongo/db/session.cpp @@ -50,6 +50,7 @@ #include "mongo/db/query/get_executor.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/repl/storage_interface.h" #include "mongo/db/retryable_writes_stats.h" #include "mongo/db/server_options.h" #include "mongo/db/server_parameters.h" @@ -388,9 +389,9 @@ void Session::beginOrContinueTxnOnMigration(OperationContext* opCtx, TxnNumber t _beginOrContinueTxnOnMigration(lg, txnNumber); } -void Session::setSpeculativeTransactionOpTime(OperationContext* opCtx, - SpeculativeTransactionOpTime opTimeChoice) { - stdx::lock_guard<stdx::mutex> lg(_mutex); +void Session::_setSpeculativeTransactionOpTime(WithLock, + OperationContext* opCtx, + SpeculativeTransactionOpTime opTimeChoice) { repl::ReplicationCoordinator* replCoord = repl::ReplicationCoordinator::get(opCtx->getClient()->getServiceContext()); opCtx->recoveryUnit()->setTimestampReadSource( @@ -398,13 +399,12 @@ void Session::setSpeculativeTransactionOpTime(OperationContext* opCtx, ? RecoveryUnit::ReadSource::kAllCommittedSnapshot : RecoveryUnit::ReadSource::kLastAppliedSnapshot); opCtx->recoveryUnit()->preallocateSnapshot(); - auto readTimestamp = opCtx->recoveryUnit()->getPointInTimeReadTimestamp(); - invariant(readTimestamp); + auto readTimestamp = repl::StorageInterface::get(opCtx)->getPointInTimeReadTimestamp(opCtx); // Transactions do not survive term changes, so combining "getTerm" here with the // recovery unit timestamp does not cause races. - _speculativeTransactionReadOpTime = {*readTimestamp, replCoord->getTerm()}; + _speculativeTransactionReadOpTime = {readTimestamp, replCoord->getTerm()}; stdx::lock_guard<stdx::mutex> ls(_statsMutex); - _singleTransactionStats.setReadTimestamp(*readTimestamp); + _singleTransactionStats.setReadTimestamp(readTimestamp); } void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx, @@ -821,11 +821,27 @@ void Session::unstashTransactionResources(OperationContext* opCtx, const std::st return; } - // Stashed transaction resources do not exist for this transaction. If this is a - // multi-document transaction, set up the transaction resources on the opCtx. + // Stashed transaction resources do not exist for this transaction. if (_txnState != MultiDocumentTransactionState::kInProgress) { return; } + + // Set speculative execution. + const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); + const bool speculative = + readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern && + !readConcernArgs.getArgsAtClusterTime(); + // Only set speculative on primary. + if (opCtx->writesAreReplicated() && speculative) { + _setSpeculativeTransactionOpTime(lg, + opCtx, + readConcernArgs.getOriginalLevel() == + repl::ReadConcernLevel::kSnapshotReadConcern + ? SpeculativeTransactionOpTime::kAllCommitted + : SpeculativeTransactionOpTime::kLastApplied); + } + + // If this is a multi-document transaction, set up the transaction resources on the opCtx. opCtx->setWriteUnitOfWork(std::make_unique<WriteUnitOfWork>(opCtx)); // If maxTransactionLockRequestTimeoutMillis is set, then we will ensure no diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h index 3f2bf89df31..6070ddf109b 100644 --- a/src/mongo/db/session.h +++ b/src/mongo/db/session.h @@ -174,12 +174,6 @@ public: void beginOrContinueTxnOnMigration(OperationContext* opCtx, TxnNumber txnNumber); /** - * Called for speculative transactions to fix the optime of the snapshot to read from. - */ - void setSpeculativeTransactionOpTime(OperationContext* opCtx, - SpeculativeTransactionOpTime opTimeChoice); - - /** * Called after a write under the specified transaction completes while the node is a primary * and specifies the statement ids which were written. Must be called while the caller is still * in the write's WUOW. Updates the on-disk state of the session to match the specified @@ -455,6 +449,12 @@ private: std::vector<StmtId> stmtIdsWritten, const repl::OpTime& lastStmtIdWriteTs); + // Called for speculative transactions to fix the optime of the snapshot to read from. + void _setSpeculativeTransactionOpTime(WithLock, + OperationContext* opCtx, + SpeculativeTransactionOpTime opTimeChoice); + + // Releases stashed transaction resources to abort the transaction. void _abortTransaction(WithLock); diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp index b657c9ec5e4..0781349eae4 100644 --- a/src/mongo/db/session_catalog.cpp +++ b/src/mongo/db/session_catalog.cpp @@ -241,12 +241,7 @@ void SessionCatalog::_releaseSession(const LogicalSessionId& lsid) { sri->availableCondVar.notify_one(); } -OperationContextSession::OperationContextSession(OperationContext* opCtx, - bool checkOutSession, - boost::optional<bool> autocommit, - boost::optional<bool> startTransaction, - StringData dbName, - StringData cmdName) +OperationContextSession::OperationContextSession(OperationContext* opCtx, bool checkOutSession) : _opCtx(opCtx) { if (!opCtx->getLogicalSessionId()) { @@ -277,10 +272,6 @@ OperationContextSession::OperationContextSession(OperationContext* opCtx, checkedOutSession->get()->refreshFromStorageIfNeeded(opCtx); - if (opCtx->getTxnNumber()) { - checkedOutSession->get()->beginOrContinueTxn( - opCtx, *opCtx->getTxnNumber(), autocommit, startTransaction, dbName, cmdName); - } session->setCurrentOperation(opCtx); } diff --git a/src/mongo/db/session_catalog.h b/src/mongo/db/session_catalog.h index 50f5a16e24c..197a344e55a 100644 --- a/src/mongo/db/session_catalog.h +++ b/src/mongo/db/session_catalog.h @@ -245,12 +245,7 @@ class OperationContextSession { MONGO_DISALLOW_COPYING(OperationContextSession); public: - OperationContextSession(OperationContext* opCtx, - bool checkOutSession, - boost::optional<bool> autocommit, - boost::optional<bool> startTransaction, - StringData dbName, - StringData cmdName); + OperationContextSession(OperationContext* opCtx, bool checkOutSession); ~OperationContextSession(); diff --git a/src/mongo/db/session_catalog_test.cpp b/src/mongo/db/session_catalog_test.cpp index f31ce6b464f..5ad3280bc8b 100644 --- a/src/mongo/db/session_catalog_test.cpp +++ b/src/mongo/db/session_catalog_test.cpp @@ -92,7 +92,7 @@ TEST_F(SessionCatalogTest, OperationContextCheckedOutSession) { const TxnNumber txnNum = 20; opCtx()->setTxnNumber(txnNum); - OperationContextSession ocs(opCtx(), true, boost::none, boost::none, "testDB", "insert"); + OperationContextSession ocs(opCtx(), true); auto session = OperationContextSession::get(opCtx()); ASSERT(session); ASSERT_EQ(*opCtx()->getLogicalSessionId(), session->getSessionId()); @@ -101,7 +101,7 @@ TEST_F(SessionCatalogTest, OperationContextCheckedOutSession) { TEST_F(SessionCatalogTest, OperationContextNonCheckedOutSession) { opCtx()->setLogicalSessionId(makeLogicalSessionIdForTest()); - OperationContextSession ocs(opCtx(), false, boost::none, boost::none, "testDB", "insert"); + OperationContextSession ocs(opCtx(), false); auto session = OperationContextSession::get(opCtx()); ASSERT(!session); @@ -120,7 +120,8 @@ TEST_F(SessionCatalogTest, GetOrCreateSessionAfterCheckOutSession) { opCtx()->setLogicalSessionId(lsid); boost::optional<OperationContextSession> ocs; - ocs.emplace(opCtx(), true, boost::none, false, "testDB", "insert"); + ocs.emplace(opCtx(), true); + // We don't have to start the transaction for this test. stdx::async(stdx::launch::async, [&] { ON_BLOCK_EXIT([&] { Client::destroy(); }); @@ -151,13 +152,11 @@ TEST_F(SessionCatalogTest, NestedOperationContextSession) { opCtx()->setLogicalSessionId(makeLogicalSessionIdForTest()); { - OperationContextSession outerScopedSession( - opCtx(), true, boost::none, boost::none, "testDB", "insert"); + OperationContextSession outerScopedSession(opCtx(), true); { DirectClientSetter inDirectClient(opCtx()); - OperationContextSession innerScopedSession( - opCtx(), true, boost::none, boost::none, "testDB", "insert"); + OperationContextSession innerScopedSession(opCtx(), true); auto session = OperationContextSession::get(opCtx()); ASSERT(session); @@ -180,8 +179,13 @@ TEST_F(SessionCatalogTest, StashInNestedSessionIsANoop) { opCtx()->setTxnNumber(1); { - OperationContextSession outerScopedSession( - opCtx(), true, /* autocommit */ false, /* startTransaction */ true, "testDB", "find"); + OperationContextSession outerScopedSession(opCtx(), true); + OperationContextSession::get(opCtx())->beginOrContinueTxn(opCtx(), + *opCtx()->getTxnNumber(), + /* autocommit */ false, + /* startTransaction */ true, + "testDB", + "find"); Locker* originalLocker = opCtx()->lockState(); RecoveryUnit* originalRecoveryUnit = opCtx()->recoveryUnit(); @@ -206,9 +210,8 @@ TEST_F(SessionCatalogTest, StashInNestedSessionIsANoop) { { // Make it look like we're in a DBDirectClient running a nested operation. DirectClientSetter inDirectClient(opCtx()); - OperationContextSession innerScopedSession( - opCtx(), true, boost::none, boost::none, "testDB", "find"); - + OperationContextSession innerScopedSession(opCtx(), true); + // Don't start transaction again if we're nested in invokeInTransaction(). OperationContextSession::get(opCtx())->stashTransactionResources(opCtx()); // The stash was a noop, so the locker, RecoveryUnit, and WriteUnitOfWork on the @@ -225,8 +228,13 @@ TEST_F(SessionCatalogTest, UnstashInNestedSessionIsANoop) { opCtx()->setTxnNumber(1); { - OperationContextSession outerScopedSession( - opCtx(), true, /* autocommit */ false, /* startTransaction */ true, "testDB", "find"); + OperationContextSession outerScopedSession(opCtx(), true); + OperationContextSession::get(opCtx())->beginOrContinueTxn(opCtx(), + *opCtx()->getTxnNumber(), + /* autocommit */ false, + /* startTransaction */ true, + "testDB", + "find"); Locker* originalLocker = opCtx()->lockState(); RecoveryUnit* originalRecoveryUnit = opCtx()->recoveryUnit(); @@ -245,8 +253,7 @@ TEST_F(SessionCatalogTest, UnstashInNestedSessionIsANoop) { { // Make it look like we're in a DBDirectClient running a nested operation. DirectClientSetter inDirectClient(opCtx()); - OperationContextSession innerScopedSession( - opCtx(), true, boost::none, boost::none, "testDB", "find"); + OperationContextSession innerScopedSession(opCtx(), true); OperationContextSession::get(opCtx())->unstashTransactionResources(opCtx(), "find"); |