summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/commands/do_txn_cmd.cpp8
-rw-r--r--src/mongo/db/initialize_operation_session_info.cpp32
-rw-r--r--src/mongo/db/initialize_operation_session_info.h11
-rw-r--r--src/mongo/db/logical_session_id_test.cpp8
-rw-r--r--src/mongo/db/op_observer_impl_test.cpp53
-rw-r--r--src/mongo/db/read_concern.cpp12
-rw-r--r--src/mongo/db/repl/do_txn.cpp2
-rw-r--r--src/mongo/db/repl/do_txn_test.cpp11
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp3
-rw-r--r--src/mongo/db/repl/storage_interface.h5
-rw-r--r--src/mongo/db/repl/storage_interface_impl.cpp6
-rw-r--r--src/mongo/db/repl/storage_interface_impl.h2
-rw-r--r--src/mongo/db/repl/storage_interface_mock.h4
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination_test.cpp10
-rw-r--r--src/mongo/db/service_entry_point_common.cpp97
-rw-r--r--src/mongo/db/session.cpp34
-rw-r--r--src/mongo/db/session.h12
-rw-r--r--src/mongo/db/session_catalog.cpp11
-rw-r--r--src/mongo/db/session_catalog.h7
-rw-r--r--src/mongo/db/session_catalog_test.cpp39
20 files changed, 204 insertions, 163 deletions
diff --git a/src/mongo/db/commands/do_txn_cmd.cpp b/src/mongo/db/commands/do_txn_cmd.cpp
index c4cc43f2dd7..801d55d4663 100644
--- a/src/mongo/db/commands/do_txn_cmd.cpp
+++ b/src/mongo/db/commands/do_txn_cmd.cpp
@@ -120,6 +120,14 @@ public:
return true;
}
+ bool supportsReadConcern(const std::string& dbName,
+ const BSONObj& cmdObj,
+ repl::ReadConcernLevel level) const override {
+ // Support the read concerns before and after upconversion.
+ return level == repl::ReadConcernLevel::kLocalReadConcern ||
+ level == repl::ReadConcernLevel::kSnapshotReadConcern;
+ }
+
std::string help() const override {
return "internal (sharding)\n{ doTxn : [ ] , preCondition : [ { ns : ... , q : ... , "
"res : ... } ] }";
diff --git a/src/mongo/db/initialize_operation_session_info.cpp b/src/mongo/db/initialize_operation_session_info.cpp
index 8842c5cb8bb..085e60db050 100644
--- a/src/mongo/db/initialize_operation_session_info.cpp
+++ b/src/mongo/db/initialize_operation_session_info.cpp
@@ -46,12 +46,11 @@ namespace mongo {
// engine (see SERVER-34165).
MONGO_EXPORT_SERVER_PARAMETER(enableInMemoryTransactions, bool, false);
-boost::optional<OperationSessionInfoFromClient> initializeOperationSessionInfo(
- OperationContext* opCtx,
- const BSONObj& requestBody,
- bool requiresAuth,
- bool isReplSetMemberOrMongos,
- bool supportsDocLocking) {
+OperationSessionInfoFromClient initializeOperationSessionInfo(OperationContext* opCtx,
+ const BSONObj& requestBody,
+ bool requiresAuth,
+ bool isReplSetMemberOrMongos,
+ bool supportsDocLocking) {
auto osi = OperationSessionInfoFromClient::parse("OperationSessionInfo"_sd, requestBody);
if (opCtx->getClient()->isInDirectClient()) {
@@ -74,14 +73,14 @@ boost::optional<OperationSessionInfoFromClient> initializeOperationSessionInfo(
// logical sessions are disabled. A client may authenticate as the __sytem user,
// or as an externally authorized user.
if (authSession->isUsingLocalhostBypass() && !authSession->isAuthenticated()) {
- return boost::none;
+ return {};
}
// Do not initialize lsid when auth is enabled and no user is logged in since
// there is no sensible uid that can be assigned to it.
if (AuthorizationManager::get(opCtx->getServiceContext())->isAuthEnabled() &&
!authSession->isAuthenticated() && !requiresAuth) {
- return boost::none;
+ return {};
}
}
@@ -92,7 +91,7 @@ boost::optional<OperationSessionInfoFromClient> initializeOperationSessionInfo(
if (!lsc) {
// Ignore session information if the logical session cache has not been set up, e.g. on
// the embedded version of mongod.
- return boost::none;
+ return {};
}
opCtx->setLogicalSessionId(makeLogicalSessionId(osi.getSessionId().get(), opCtx));
@@ -151,6 +150,21 @@ boost::optional<OperationSessionInfoFromClient> initializeOperationSessionInfo(
osi.getStartTransaction().value());
}
+ // Populate the session info for doTxn command.
+ if (requestBody.firstElementFieldName() == "doTxn"_sd) {
+ uassert(ErrorCodes::InvalidOptions,
+ "doTxn can only be run with a transaction number.",
+ osi.getTxnNumber());
+ uassert(ErrorCodes::OperationNotSupportedInTransaction,
+ "doTxn can not be run in a transaction",
+ !osi.getAutocommit());
+ // 'autocommit' and 'startTransaction' are populated for 'doTxn' to get the oplog
+ // entry generation behavior used for multi-document transactions. The 'doTxn'
+ // command still logically behaves as a commit.
+ osi.setAutocommit(false);
+ osi.setStartTransaction(true);
+ }
+
return osi;
}
diff --git a/src/mongo/db/initialize_operation_session_info.h b/src/mongo/db/initialize_operation_session_info.h
index d7d99e95736..041d287ec21 100644
--- a/src/mongo/db/initialize_operation_session_info.h
+++ b/src/mongo/db/initialize_operation_session_info.h
@@ -51,11 +51,10 @@ namespace mongo {
* On success, returns the parsed request information. Returning boost::none implies that the
* proper command or session requirements were not met.
*/
-boost::optional<OperationSessionInfoFromClient> initializeOperationSessionInfo(
- OperationContext* opCtx,
- const BSONObj& requestBody,
- bool requiresAuth,
- bool isReplSetMemberOrMongos,
- bool supportsDocLocking);
+OperationSessionInfoFromClient initializeOperationSessionInfo(OperationContext* opCtx,
+ const BSONObj& requestBody,
+ bool requiresAuth,
+ bool isReplSetMemberOrMongos,
+ bool supportsDocLocking);
} // namespace mongo
diff --git a/src/mongo/db/logical_session_id_test.cpp b/src/mongo/db/logical_session_id_test.cpp
index 18e755fe507..3d90d189d5b 100644
--- a/src/mongo/db/logical_session_id_test.cpp
+++ b/src/mongo/db/logical_session_id_test.cpp
@@ -335,13 +335,17 @@ TEST_F(LogicalSessionIdTest, InitializeOperationSessionInfo_IgnoresInfoIfNoCache
LogicalSessionCache::set(_opCtx->getServiceContext(), nullptr);
- ASSERT_FALSE(initializeOperationSessionInfo(
+ auto sessionInfo = initializeOperationSessionInfo(
_opCtx.get(),
BSON("TestCmd" << 1 << "lsid" << lsid.toBSON() << "txnNumber" << 100LL << "OtherField"
<< "TestField"),
true,
true,
- true));
+ true);
+ ASSERT(sessionInfo.getSessionId() == boost::none);
+ ASSERT(sessionInfo.getTxnNumber() == boost::none);
+ ASSERT(sessionInfo.getStartTransaction() == boost::none);
+ ASSERT(sessionInfo.getAutocommit() == boost::none);
}
TEST_F(LogicalSessionIdTest, InitializeOperationSessionInfo_SendingInfoFailsInDirectClient) {
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index bfecac520d1..eb4c3f11438 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -406,12 +406,14 @@ TEST_F(OpObserverLargeTransactionTest, TransactionTooLargeWhileCommitting) {
const TxnNumber txnNum = 0;
opCtx->setLogicalSessionId(sessionId);
opCtx->setTxnNumber(txnNum);
- OperationContextSession opSession(opCtx.get(),
- true /* checkOutSession */,
- false /* autocommit */,
- true /* startTransaction */,
- "testDB" /* dbName */,
- "insert" /* cmdName */);
+ OperationContextSession opSession(opCtx.get(), true /* checkOutSession */);
+ OperationContextSession::get(opCtx.get())
+ ->beginOrContinueTxn(opCtx.get(),
+ txnNum,
+ /* autocommit */ false,
+ /* startTransaction */ true,
+ "testDB",
+ "insert");
session->unstashTransactionResources(opCtx.get(), "insert");
@@ -550,12 +552,13 @@ TEST_F(OpObserverTransactionTest, TransactionalInsertTest) {
auto uuid2 = CollectionUUID::gen();
const TxnNumber txnNum = 2;
opCtx()->setTxnNumber(txnNum);
- OperationContextSession opSession(opCtx(),
- true /* checkOutSession */,
- false /* autocommit */,
- true /* startTransaction*/,
- "testDB",
- "insert");
+ OperationContextSession opSession(opCtx(), true /* checkOutSession */);
+ OperationContextSession::get(opCtx())->beginOrContinueTxn(opCtx(),
+ txnNum,
+ /* autocommit */ false,
+ /* startTransaction */ true,
+ "testDB",
+ "insert");
session()->unstashTransactionResources(opCtx(), "insert");
@@ -628,12 +631,13 @@ TEST_F(OpObserverTransactionTest, TransactionalUpdateTest) {
auto uuid2 = CollectionUUID::gen();
const TxnNumber txnNum = 3;
opCtx()->setTxnNumber(txnNum);
- OperationContextSession opSession(opCtx(),
- true /* checkOutSession */,
- false /* autocommit */,
- true /* startTransaction*/,
- "testDB",
- "update");
+ OperationContextSession opSession(opCtx(), true /* checkOutSession */);
+ OperationContextSession::get(opCtx())->beginOrContinueTxn(opCtx(),
+ txnNum,
+ /* autocommit */ false,
+ /* startTransaction */ true,
+ "testDB",
+ "insert");
session()->unstashTransactionResources(opCtx(), "update");
@@ -698,12 +702,13 @@ TEST_F(OpObserverTransactionTest, TransactionalDeleteTest) {
auto uuid2 = CollectionUUID::gen();
const TxnNumber txnNum = 3;
opCtx()->setTxnNumber(txnNum);
- OperationContextSession opSession(opCtx(),
- true /* checkOutSession */,
- false /* autocommit */,
- true /* startTransaction*/,
- "testDB",
- "delete");
+ OperationContextSession opSession(opCtx(), true /* checkOutSession */);
+ OperationContextSession::get(opCtx())->beginOrContinueTxn(opCtx(),
+ txnNum,
+ /* autocommit */ false,
+ /* startTransaction */ true,
+ "testDB",
+ "insert");
session()->unstashTransactionResources(opCtx(), "delete");
diff --git a/src/mongo/db/read_concern.cpp b/src/mongo/db/read_concern.cpp
index 0c859fdbf75..81ca9ecdd6f 100644
--- a/src/mongo/db/read_concern.cpp
+++ b/src/mongo/db/read_concern.cpp
@@ -209,9 +209,8 @@ Status waitForReadConcern(OperationContext* opCtx,
// If we are in a direct client within a transaction, then we may be holding locks, so it is
// illegal to wait for read concern. This is fine, since the outer operation should have handled
// waiting for read concern.
- auto session = OperationContextSession::get(opCtx);
- if (opCtx->getClient()->isInDirectClient() && session &&
- session->inActiveOrKilledMultiDocumentTransaction()) {
+ if (opCtx->getClient()->isInDirectClient() &&
+ readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) {
return Status::OK();
}
@@ -311,13 +310,6 @@ Status waitForReadConcern(OperationContext* opCtx,
return {ErrorCodes::IncompatibleElectionProtocol,
"Replica sets running protocol version 0 do not support readConcern: snapshot"};
}
- if (speculative) {
- session->setSpeculativeTransactionOpTime(
- opCtx,
- readConcernArgs.getOriginalLevel() == repl::ReadConcernLevel::kSnapshotReadConcern
- ? SpeculativeTransactionOpTime::kAllCommitted
- : SpeculativeTransactionOpTime::kLastApplied);
- }
}
if (atClusterTime) {
diff --git a/src/mongo/db/repl/do_txn.cpp b/src/mongo/db/repl/do_txn.cpp
index f1fbcfdb82e..a8b5e0722ea 100644
--- a/src/mongo/db/repl/do_txn.cpp
+++ b/src/mongo/db/repl/do_txn.cpp
@@ -279,8 +279,6 @@ Status doTxn(OperationContext* opCtx,
const std::string& dbName,
const BSONObj& doTxnCmd,
BSONObjBuilder* result) {
- auto txnNumber = opCtx->getTxnNumber();
- uassert(ErrorCodes::InvalidOptions, "doTxn can only be run with a transaction ID.", txnNumber);
auto* session = OperationContextSession::get(opCtx);
uassert(ErrorCodes::InvalidOptions, "doTxn must be run within a session", session);
invariant(session->inActiveOrKilledMultiDocumentTransaction());
diff --git a/src/mongo/db/repl/do_txn_test.cpp b/src/mongo/db/repl/do_txn_test.cpp
index 33db97122ad..ebfd3db66af 100644
--- a/src/mongo/db/repl/do_txn_test.cpp
+++ b/src/mongo/db/repl/do_txn_test.cpp
@@ -147,13 +147,10 @@ void DoTxnTest::setUp() {
// Set up the transaction and session.
_opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
_opCtx->setTxnNumber(0); // TxnNumber can always be 0 because we have a new session.
- _ocs.emplace(_opCtx.get(),
- true /* checkOutSession */,
- false /* autocommit */,
- true /* startTransaction */,
- "admin" /* dbName */,
- "doTxn" /* cmdName */);
- OperationContextSession::get(opCtx())->unstashTransactionResources(opCtx(), "doTxn");
+ _ocs.emplace(_opCtx.get(), true /* checkOutSession */);
+ auto session = OperationContextSession::get(opCtx());
+ session->beginOrContinueTxn(opCtx(), *opCtx()->getTxnNumber(), false, true, "admin", "doTxn");
+ session->unstashTransactionResources(opCtx(), "doTxn");
}
void DoTxnTest::tearDown() {
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index db54f99ebfb..88560376c45 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -1418,8 +1418,7 @@ Status ReplicationCoordinatorImpl::_waitUntilClusterTimeForRead(OperationContext
invariant(!readConcern.getArgsOpTime());
// TODO SERVER-34620: Re-enable speculative behavior when "atClusterTime" is specified.
- auto session = OperationContextSession::get(opCtx);
- const bool speculative = session && session->inActiveOrKilledMultiDocumentTransaction() &&
+ const bool speculative = readConcern.getLevel() == ReadConcernLevel::kSnapshotReadConcern &&
!readConcern.getArgsAtClusterTime();
const bool isMajorityCommittedRead =
diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h
index 59f95c1317a..7e5d8583645 100644
--- a/src/mongo/db/repl/storage_interface.h
+++ b/src/mongo/db/repl/storage_interface.h
@@ -430,6 +430,11 @@ public:
*/
virtual boost::optional<Timestamp> getLastStableCheckpointTimestamp(
ServiceContext* serviceCtx) const = 0;
+
+ /**
+ * Returns the read timestamp of the recovery unit of the given operation context.
+ */
+ virtual Timestamp getPointInTimeReadTimestamp(OperationContext* opCtx) const = 0;
};
} // namespace repl
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index d3cd887a32c..0f0402be183 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -1183,5 +1183,11 @@ Timestamp StorageInterfaceImpl::getOldestOpenReadTimestamp(ServiceContext* servi
return serviceCtx->getStorageEngine()->getOldestOpenReadTimestamp();
}
+Timestamp StorageInterfaceImpl::getPointInTimeReadTimestamp(OperationContext* opCtx) const {
+ auto readTimestamp = opCtx->recoveryUnit()->getPointInTimeReadTimestamp();
+ invariant(readTimestamp);
+ return *readTimestamp;
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h
index 9420b12d8c6..1a5854710e4 100644
--- a/src/mongo/db/repl/storage_interface_impl.h
+++ b/src/mongo/db/repl/storage_interface_impl.h
@@ -178,6 +178,8 @@ public:
Timestamp getOldestOpenReadTimestamp(ServiceContext* serviceCtx) const override;
+ Timestamp getPointInTimeReadTimestamp(OperationContext* opCtx) const override;
+
/**
* Checks that the "admin" database contains a supported version of the auth data schema.
*/
diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h
index c6f29d79cd0..42f5ccdb0ea 100644
--- a/src/mongo/db/repl/storage_interface_mock.h
+++ b/src/mongo/db/repl/storage_interface_mock.h
@@ -338,6 +338,10 @@ public:
return boost::none;
}
+ Timestamp getPointInTimeReadTimestamp(OperationContext* opCtx) const override {
+ return {};
+ }
+
// Testing functions.
CreateCollectionForBulkFn createCollectionForBulkFn =
[](const NamespaceString& nss,
diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
index 49da1334126..3db19f80142 100644
--- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
@@ -245,8 +245,14 @@ public:
// requests with txnNumbers aren't allowed. To get around this, we have to manually set
// up the session state and perform the insert.
initializeOperationSessionInfo(innerOpCtx.get(), insertBuilder.obj(), true, true, true);
- OperationContextSession sessionTxnState(
- innerOpCtx.get(), true, boost::none, boost::none, "testDB", "insert");
+ OperationContextSession sessionTxnState(innerOpCtx.get(), true);
+ OperationContextSession::get(innerOpCtx.get())
+ ->beginOrContinueTxn(innerOpCtx.get(),
+ *sessionInfo.getTxnNumber(),
+ boost::none,
+ boost::none,
+ "testDB",
+ "insert");
const auto reply = performInserts(innerOpCtx.get(), insertRequest);
ASSERT(reply.results.size() == 1);
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index e81d757de07..c4180d864e6 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -219,11 +219,11 @@ void generateErrorResponse(OperationContext* opCtx,
replyBuilder->setMetadata(replyMetadata);
}
-BSONObj getErrorLabels(const boost::optional<OperationSessionInfoFromClient>& sessionOptions,
+BSONObj getErrorLabels(const OperationSessionInfoFromClient& sessionOptions,
const std::string& commandName,
ErrorCodes::Error code) {
// By specifying "autocommit", the user indicates they want to run a transaction.
- if (!sessionOptions || !sessionOptions->getAutocommit()) {
+ if (!sessionOptions.getAutocommit()) {
return {};
}
@@ -347,8 +347,9 @@ StatusWith<repl::ReadConcernArgs> _extractReadConcern(const CommandInvocation* i
// We must be in a transaction if the readConcern level was upconverted to snapshot and the
// command must support readConcern level snapshot in order to be supported in transactions.
if (upconvertToSnapshot) {
- return {ErrorCodes::OperationNotSupportedInTransaction,
- str::stream() << "Command is not supported in a transaction"};
+ return {
+ ErrorCodes::OperationNotSupportedInTransaction,
+ str::stream() << "Command is not supported as the first command in a transaction"};
}
return {ErrorCodes::InvalidOptions,
str::stream() << "Command does not support read concern "
@@ -464,10 +465,20 @@ void appendClusterAndOperationTime(OperationContext* opCtx,
void invokeInTransaction(OperationContext* opCtx,
CommandInvocation* invocation,
+ const OpMsgRequest& request,
+ const OperationSessionInfoFromClient& sessionOptions,
CommandReplyBuilder* replyBuilder) {
auto session = OperationContextSession::get(opCtx);
invariant(session);
invariant(opCtx->getTxnNumber() || opCtx->getClient()->isInDirectClient());
+ if (!opCtx->getClient()->isInDirectClient()) {
+ session->beginOrContinueTxn(opCtx,
+ *sessionOptions.getTxnNumber(),
+ sessionOptions.getAutocommit(),
+ sessionOptions.getStartTransaction(),
+ request.getDatabase(),
+ request.getCommandName());
+ }
session->unstashTransactionResources(opCtx, invocation->definition()->getName());
ScopeGuard guard = MakeGuard([session, opCtx]() { session->abortActiveTransaction(opCtx); });
@@ -493,7 +504,7 @@ bool runCommandImpl(OperationContext* opCtx,
LogicalTime startOperationTime,
const ServiceEntryPointCommon::Hooks& behaviors,
BSONObjBuilder* extraFieldsBuilder,
- const boost::optional<OperationSessionInfoFromClient>& sessionOptions) {
+ const OperationSessionInfoFromClient& sessionOptions) {
const Command* command = invocation->definition();
auto bytesToReserve = command->reserveBytesForReply();
@@ -510,19 +521,21 @@ bool runCommandImpl(OperationContext* opCtx,
if (!invocation->supportsWriteConcern()) {
behaviors.uassertCommandDoesNotSpecifyWriteConcern(request.body);
if (session) {
- invokeInTransaction(opCtx, invocation, &crb);
+ invokeInTransaction(opCtx, invocation, request, sessionOptions, &crb);
} else {
invocation->run(opCtx, &crb);
}
} else {
auto wcResult = uassertStatusOK(extractWriteConcern(opCtx, request.body));
- uassert(ErrorCodes::InvalidOptions,
- "writeConcern is not allowed within a multi-statement transaction",
- wcResult.usedDefault || !session ||
- !session->inActiveOrKilledMultiDocumentTransaction() ||
- invocation->definition()->getName() == "commitTransaction" ||
- invocation->definition()->getName() == "abortTransaction" ||
- invocation->definition()->getName() == "doTxn");
+ if (sessionOptions.getAutocommit()) {
+ // If "autoCommit" is set, it must be "false".
+ uassert(ErrorCodes::InvalidOptions,
+ "writeConcern is not allowed within a multi-statement transaction",
+ wcResult.usedDefault ||
+ invocation->definition()->getName() == "commitTransaction" ||
+ invocation->definition()->getName() == "abortTransaction" ||
+ invocation->definition()->getName() == "doTxn");
+ }
auto lastOpBeforeRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
@@ -546,7 +559,7 @@ bool runCommandImpl(OperationContext* opCtx,
try {
if (session) {
- invokeInTransaction(opCtx, invocation, &crb);
+ invokeInTransaction(opCtx, invocation, request, sessionOptions, &crb);
} else {
invocation->run(opCtx, &crb);
}
@@ -638,7 +651,7 @@ void execCommandDatabase(OperationContext* opCtx,
BSONObjBuilder extraFieldsBuilder;
auto startOperationTime = getClientOperationTime(opCtx);
auto invocation = command->parse(opCtx, request);
- boost::optional<OperationSessionInfoFromClient> sessionOptions = boost::none;
+ OperationSessionInfoFromClient sessionOptions;
try {
{
@@ -674,36 +687,23 @@ void execCommandDatabase(OperationContext* opCtx,
const bool shouldCheckoutSession = static_cast<bool>(opCtx->getTxnNumber()) &&
sessionCheckoutWhitelist.find(command->getName()) != sessionCheckoutWhitelist.cend();
- // Parse the arguments specific to multi-statement transactions.
- boost::optional<bool> startMultiDocTxn = boost::none;
- boost::optional<bool> autocommitVal = boost::none;
- if (sessionOptions) {
- startMultiDocTxn = sessionOptions->getStartTransaction();
- autocommitVal = sessionOptions->getAutocommit();
- if (command->getName() == "doTxn") {
- // Autocommit and 'startMultiDocTxn' are overridden for 'doTxn' to get the oplog
- // entry generation behavior used for multi-document transactions. The 'doTxn'
- // command still logically behaves as a commit.
- autocommitVal = false;
- startMultiDocTxn = true;
- }
- }
+ const auto shouldNotCheckOutSession =
+ !shouldCheckoutSession && !opCtx->getClient()->isInDirectClient();
// Reject commands with 'txnNumber' that do not check out the Session, since no retryable
// writes or transaction machinery will be used to execute commands that do not check out
// the Session. Do not check this if we are in DBDirectClient because the outer command is
// responsible for checking out the Session.
- if (!opCtx->getClient()->isInDirectClient()) {
+ if (shouldNotCheckOutSession) {
uassert(ErrorCodes::OperationNotSupportedInTransaction,
str::stream() << "It is illegal to run command " << command->getName()
<< " in a multi-document transaction.",
- shouldCheckoutSession || !autocommitVal || command->getName() == "doTxn");
+ !sessionOptions.getAutocommit());
uassert(50768,
str::stream() << "It is illegal to provide a txnNumber for command "
<< command->getName(),
- shouldCheckoutSession || !opCtx->getTxnNumber());
+ !opCtx->getTxnNumber());
}
-
std::unique_ptr<MaintenanceModeSetter> mmSetter;
BSONElement cmdOptionMaxTimeMSField;
@@ -747,7 +747,7 @@ void execCommandDatabase(OperationContext* opCtx,
if (!opCtx->getClient()->isInDirectClient() &&
!MONGO_FAIL_POINT(skipCheckingForNotMasterInCommandDispatch)) {
- const bool inMultiDocumentTransaction = (autocommitVal == false);
+ auto inMultiDocumentTransaction = static_cast<bool>(sessionOptions.getAutocommit());
auto allowed = command->secondaryAllowed(opCtx->getServiceContext());
bool alwaysAllowed = allowed == Command::AllowedOnSecondary::kAlways;
bool couldHaveOptedIn =
@@ -812,23 +812,17 @@ void execCommandDatabase(OperationContext* opCtx,
opCtx->setDeadlineAfterNowBy(Milliseconds{maxTimeMS}, ErrorCodes::MaxTimeMSExpired);
}
- // This constructor will check out the session and start a transaction, if necessary. It
- // handles the appropriate state management for both multi-statement transactions and
- // retryable writes.
- OperationContextSession sessionTxnState(opCtx,
- shouldCheckoutSession,
- autocommitVal,
- startMultiDocTxn,
- dbname,
- command->getName());
+ // This constructor will check out the session, if necessary, for both multi-statement
+ // transactions and retryable writes.
+ OperationContextSession sessionTxnState(opCtx, shouldCheckoutSession);
auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
- auto session = OperationContextSession::get(opCtx);
- if (!opCtx->getClient()->isInDirectClient() || !session ||
- !session->inActiveOrKilledMultiDocumentTransaction()) {
- const bool upconvertToSnapshot = session &&
- session->inActiveOrKilledMultiDocumentTransaction() && sessionOptions &&
- (sessionOptions->getStartTransaction() == boost::optional<bool>(true));
+ // If the parent operation runs in snapshot isolation, we don't override the read concern.
+ auto skipReadConcern = opCtx->getClient()->isInDirectClient() &&
+ readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern;
+ if (!skipReadConcern) {
+ // If "startTransaction" is present, it must be true due to the parsing above.
+ const bool upconvertToSnapshot(sessionOptions.getStartTransaction());
readConcernArgs = uassertStatusOK(
_extractReadConcern(invocation.get(), request.body, upconvertToSnapshot));
}
@@ -840,10 +834,9 @@ void execCommandDatabase(OperationContext* opCtx,
}
if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) {
- auto session = OperationContextSession::get(opCtx);
uassert(ErrorCodes::InvalidOptions,
- "readConcern level snapshot is only valid in multi-statement transactions",
- session && session->inActiveOrKilledMultiDocumentTransaction());
+ "readConcern level snapshot is only valid for the first transaction operation",
+ opCtx->getClient()->isInDirectClient() || sessionOptions.getStartTransaction());
uassert(ErrorCodes::InvalidOptions,
"readConcern level snapshot requires a session ID",
opCtx->getLogicalSessionId());
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index 659ca1c3cc4..47727848aae 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -50,6 +50,7 @@
#include "mongo/db/query/get_executor.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/retryable_writes_stats.h"
#include "mongo/db/server_options.h"
#include "mongo/db/server_parameters.h"
@@ -388,9 +389,9 @@ void Session::beginOrContinueTxnOnMigration(OperationContext* opCtx, TxnNumber t
_beginOrContinueTxnOnMigration(lg, txnNumber);
}
-void Session::setSpeculativeTransactionOpTime(OperationContext* opCtx,
- SpeculativeTransactionOpTime opTimeChoice) {
- stdx::lock_guard<stdx::mutex> lg(_mutex);
+void Session::_setSpeculativeTransactionOpTime(WithLock,
+ OperationContext* opCtx,
+ SpeculativeTransactionOpTime opTimeChoice) {
repl::ReplicationCoordinator* replCoord =
repl::ReplicationCoordinator::get(opCtx->getClient()->getServiceContext());
opCtx->recoveryUnit()->setTimestampReadSource(
@@ -398,13 +399,12 @@ void Session::setSpeculativeTransactionOpTime(OperationContext* opCtx,
? RecoveryUnit::ReadSource::kAllCommittedSnapshot
: RecoveryUnit::ReadSource::kLastAppliedSnapshot);
opCtx->recoveryUnit()->preallocateSnapshot();
- auto readTimestamp = opCtx->recoveryUnit()->getPointInTimeReadTimestamp();
- invariant(readTimestamp);
+ auto readTimestamp = repl::StorageInterface::get(opCtx)->getPointInTimeReadTimestamp(opCtx);
// Transactions do not survive term changes, so combining "getTerm" here with the
// recovery unit timestamp does not cause races.
- _speculativeTransactionReadOpTime = {*readTimestamp, replCoord->getTerm()};
+ _speculativeTransactionReadOpTime = {readTimestamp, replCoord->getTerm()};
stdx::lock_guard<stdx::mutex> ls(_statsMutex);
- _singleTransactionStats.setReadTimestamp(*readTimestamp);
+ _singleTransactionStats.setReadTimestamp(readTimestamp);
}
void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx,
@@ -821,11 +821,27 @@ void Session::unstashTransactionResources(OperationContext* opCtx, const std::st
return;
}
- // Stashed transaction resources do not exist for this transaction. If this is a
- // multi-document transaction, set up the transaction resources on the opCtx.
+ // Stashed transaction resources do not exist for this transaction.
if (_txnState != MultiDocumentTransactionState::kInProgress) {
return;
}
+
+ // Set speculative execution.
+ const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
+ const bool speculative =
+ readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern &&
+ !readConcernArgs.getArgsAtClusterTime();
+ // Only set speculative on primary.
+ if (opCtx->writesAreReplicated() && speculative) {
+ _setSpeculativeTransactionOpTime(lg,
+ opCtx,
+ readConcernArgs.getOriginalLevel() ==
+ repl::ReadConcernLevel::kSnapshotReadConcern
+ ? SpeculativeTransactionOpTime::kAllCommitted
+ : SpeculativeTransactionOpTime::kLastApplied);
+ }
+
+ // If this is a multi-document transaction, set up the transaction resources on the opCtx.
opCtx->setWriteUnitOfWork(std::make_unique<WriteUnitOfWork>(opCtx));
// If maxTransactionLockRequestTimeoutMillis is set, then we will ensure no
diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h
index 3f2bf89df31..6070ddf109b 100644
--- a/src/mongo/db/session.h
+++ b/src/mongo/db/session.h
@@ -174,12 +174,6 @@ public:
void beginOrContinueTxnOnMigration(OperationContext* opCtx, TxnNumber txnNumber);
/**
- * Called for speculative transactions to fix the optime of the snapshot to read from.
- */
- void setSpeculativeTransactionOpTime(OperationContext* opCtx,
- SpeculativeTransactionOpTime opTimeChoice);
-
- /**
* Called after a write under the specified transaction completes while the node is a primary
* and specifies the statement ids which were written. Must be called while the caller is still
* in the write's WUOW. Updates the on-disk state of the session to match the specified
@@ -455,6 +449,12 @@ private:
std::vector<StmtId> stmtIdsWritten,
const repl::OpTime& lastStmtIdWriteTs);
+ // Called for speculative transactions to fix the optime of the snapshot to read from.
+ void _setSpeculativeTransactionOpTime(WithLock,
+ OperationContext* opCtx,
+ SpeculativeTransactionOpTime opTimeChoice);
+
+
// Releases stashed transaction resources to abort the transaction.
void _abortTransaction(WithLock);
diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp
index b657c9ec5e4..0781349eae4 100644
--- a/src/mongo/db/session_catalog.cpp
+++ b/src/mongo/db/session_catalog.cpp
@@ -241,12 +241,7 @@ void SessionCatalog::_releaseSession(const LogicalSessionId& lsid) {
sri->availableCondVar.notify_one();
}
-OperationContextSession::OperationContextSession(OperationContext* opCtx,
- bool checkOutSession,
- boost::optional<bool> autocommit,
- boost::optional<bool> startTransaction,
- StringData dbName,
- StringData cmdName)
+OperationContextSession::OperationContextSession(OperationContext* opCtx, bool checkOutSession)
: _opCtx(opCtx) {
if (!opCtx->getLogicalSessionId()) {
@@ -277,10 +272,6 @@ OperationContextSession::OperationContextSession(OperationContext* opCtx,
checkedOutSession->get()->refreshFromStorageIfNeeded(opCtx);
- if (opCtx->getTxnNumber()) {
- checkedOutSession->get()->beginOrContinueTxn(
- opCtx, *opCtx->getTxnNumber(), autocommit, startTransaction, dbName, cmdName);
- }
session->setCurrentOperation(opCtx);
}
diff --git a/src/mongo/db/session_catalog.h b/src/mongo/db/session_catalog.h
index 50f5a16e24c..197a344e55a 100644
--- a/src/mongo/db/session_catalog.h
+++ b/src/mongo/db/session_catalog.h
@@ -245,12 +245,7 @@ class OperationContextSession {
MONGO_DISALLOW_COPYING(OperationContextSession);
public:
- OperationContextSession(OperationContext* opCtx,
- bool checkOutSession,
- boost::optional<bool> autocommit,
- boost::optional<bool> startTransaction,
- StringData dbName,
- StringData cmdName);
+ OperationContextSession(OperationContext* opCtx, bool checkOutSession);
~OperationContextSession();
diff --git a/src/mongo/db/session_catalog_test.cpp b/src/mongo/db/session_catalog_test.cpp
index f31ce6b464f..5ad3280bc8b 100644
--- a/src/mongo/db/session_catalog_test.cpp
+++ b/src/mongo/db/session_catalog_test.cpp
@@ -92,7 +92,7 @@ TEST_F(SessionCatalogTest, OperationContextCheckedOutSession) {
const TxnNumber txnNum = 20;
opCtx()->setTxnNumber(txnNum);
- OperationContextSession ocs(opCtx(), true, boost::none, boost::none, "testDB", "insert");
+ OperationContextSession ocs(opCtx(), true);
auto session = OperationContextSession::get(opCtx());
ASSERT(session);
ASSERT_EQ(*opCtx()->getLogicalSessionId(), session->getSessionId());
@@ -101,7 +101,7 @@ TEST_F(SessionCatalogTest, OperationContextCheckedOutSession) {
TEST_F(SessionCatalogTest, OperationContextNonCheckedOutSession) {
opCtx()->setLogicalSessionId(makeLogicalSessionIdForTest());
- OperationContextSession ocs(opCtx(), false, boost::none, boost::none, "testDB", "insert");
+ OperationContextSession ocs(opCtx(), false);
auto session = OperationContextSession::get(opCtx());
ASSERT(!session);
@@ -120,7 +120,8 @@ TEST_F(SessionCatalogTest, GetOrCreateSessionAfterCheckOutSession) {
opCtx()->setLogicalSessionId(lsid);
boost::optional<OperationContextSession> ocs;
- ocs.emplace(opCtx(), true, boost::none, false, "testDB", "insert");
+ ocs.emplace(opCtx(), true);
+ // We don't have to start the transaction for this test.
stdx::async(stdx::launch::async, [&] {
ON_BLOCK_EXIT([&] { Client::destroy(); });
@@ -151,13 +152,11 @@ TEST_F(SessionCatalogTest, NestedOperationContextSession) {
opCtx()->setLogicalSessionId(makeLogicalSessionIdForTest());
{
- OperationContextSession outerScopedSession(
- opCtx(), true, boost::none, boost::none, "testDB", "insert");
+ OperationContextSession outerScopedSession(opCtx(), true);
{
DirectClientSetter inDirectClient(opCtx());
- OperationContextSession innerScopedSession(
- opCtx(), true, boost::none, boost::none, "testDB", "insert");
+ OperationContextSession innerScopedSession(opCtx(), true);
auto session = OperationContextSession::get(opCtx());
ASSERT(session);
@@ -180,8 +179,13 @@ TEST_F(SessionCatalogTest, StashInNestedSessionIsANoop) {
opCtx()->setTxnNumber(1);
{
- OperationContextSession outerScopedSession(
- opCtx(), true, /* autocommit */ false, /* startTransaction */ true, "testDB", "find");
+ OperationContextSession outerScopedSession(opCtx(), true);
+ OperationContextSession::get(opCtx())->beginOrContinueTxn(opCtx(),
+ *opCtx()->getTxnNumber(),
+ /* autocommit */ false,
+ /* startTransaction */ true,
+ "testDB",
+ "find");
Locker* originalLocker = opCtx()->lockState();
RecoveryUnit* originalRecoveryUnit = opCtx()->recoveryUnit();
@@ -206,9 +210,8 @@ TEST_F(SessionCatalogTest, StashInNestedSessionIsANoop) {
{
// Make it look like we're in a DBDirectClient running a nested operation.
DirectClientSetter inDirectClient(opCtx());
- OperationContextSession innerScopedSession(
- opCtx(), true, boost::none, boost::none, "testDB", "find");
-
+ OperationContextSession innerScopedSession(opCtx(), true);
+ // Don't start transaction again if we're nested in invokeInTransaction().
OperationContextSession::get(opCtx())->stashTransactionResources(opCtx());
// The stash was a noop, so the locker, RecoveryUnit, and WriteUnitOfWork on the
@@ -225,8 +228,13 @@ TEST_F(SessionCatalogTest, UnstashInNestedSessionIsANoop) {
opCtx()->setTxnNumber(1);
{
- OperationContextSession outerScopedSession(
- opCtx(), true, /* autocommit */ false, /* startTransaction */ true, "testDB", "find");
+ OperationContextSession outerScopedSession(opCtx(), true);
+ OperationContextSession::get(opCtx())->beginOrContinueTxn(opCtx(),
+ *opCtx()->getTxnNumber(),
+ /* autocommit */ false,
+ /* startTransaction */ true,
+ "testDB",
+ "find");
Locker* originalLocker = opCtx()->lockState();
RecoveryUnit* originalRecoveryUnit = opCtx()->recoveryUnit();
@@ -245,8 +253,7 @@ TEST_F(SessionCatalogTest, UnstashInNestedSessionIsANoop) {
{
// Make it look like we're in a DBDirectClient running a nested operation.
DirectClientSetter inDirectClient(opCtx());
- OperationContextSession innerScopedSession(
- opCtx(), true, boost::none, boost::none, "testDB", "find");
+ OperationContextSession innerScopedSession(opCtx(), true);
OperationContextSession::get(opCtx())->unstashTransactionResources(opCtx(), "find");