summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSiyuan Zhou <siyuan.zhou@mongodb.com>2019-02-27 19:13:12 -0500
committerSiyuan Zhou <siyuan.zhou@mongodb.com>2019-03-04 17:02:58 -0500
commit5366d3c6ea014f1bd19eee1a149f46f3b1227a2b (patch)
tree6169e2afd8fdc26d4c86b4e81990733e837b65d8
parent01af550078c1c89201cbf861808f5a2c1fff7294 (diff)
downloadmongo-5366d3c6ea014f1bd19eee1a149f46f3b1227a2b.tar.gz
SERVER-37179 Pull out starting transaction from session checkout and push it down to before command execution.
This patch redid 248601a647 and 4fb38d9c10 from master on v4.0 branch. Transaction will begin or continue after waiting for read concern. If an error is thrown on starting transaction, it'll be able to wait for write concern if a write concern is specified.
-rw-r--r--jstests/concurrency/fsm_workload_helpers/snapshot_read_utils.js8
-rw-r--r--jstests/core/txns/commands_not_allowed_in_txn.js2
-rw-r--r--jstests/core/txns/do_txn_basic.js15
-rw-r--r--jstests/core/txns/multi_statement_transaction_command_args.js2
-rw-r--r--src/mongo/db/commands/do_txn_cmd.cpp8
-rw-r--r--src/mongo/db/initialize_operation_session_info.cpp32
-rw-r--r--src/mongo/db/initialize_operation_session_info.h11
-rw-r--r--src/mongo/db/logical_session_id_test.cpp8
-rw-r--r--src/mongo/db/op_observer_impl_test.cpp53
-rw-r--r--src/mongo/db/read_concern.cpp12
-rw-r--r--src/mongo/db/repl/do_txn.cpp2
-rw-r--r--src/mongo/db/repl/do_txn_test.cpp11
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp3
-rw-r--r--src/mongo/db/repl/storage_interface.h5
-rw-r--r--src/mongo/db/repl/storage_interface_impl.cpp6
-rw-r--r--src/mongo/db/repl/storage_interface_impl.h2
-rw-r--r--src/mongo/db/repl/storage_interface_mock.h4
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination_test.cpp10
-rw-r--r--src/mongo/db/service_entry_point_common.cpp97
-rw-r--r--src/mongo/db/session.cpp34
-rw-r--r--src/mongo/db/session.h12
-rw-r--r--src/mongo/db/session_catalog.cpp11
-rw-r--r--src/mongo/db/session_catalog.h7
-rw-r--r--src/mongo/db/session_catalog_test.cpp39
24 files changed, 223 insertions, 171 deletions
diff --git a/jstests/concurrency/fsm_workload_helpers/snapshot_read_utils.js b/jstests/concurrency/fsm_workload_helpers/snapshot_read_utils.js
index f78f7d17b8e..233f44ca68b 100644
--- a/jstests/concurrency/fsm_workload_helpers/snapshot_read_utils.js
+++ b/jstests/concurrency/fsm_workload_helpers/snapshot_read_utils.js
@@ -61,7 +61,13 @@ function doSnapshotFind(sortByAscending, collName, data, findErrorCodes) {
autocommit: false
};
res = data.sessionDb.adminCommand(abortCmd);
- assertWorkedOrFailed(abortCmd, res, [ErrorCodes.NoSuchTransaction]);
+ const abortErrorCodes = [
+ ErrorCodes.NoSuchTransaction,
+ ErrorCodes.TransactionCommitted,
+ ErrorCodes.TransactionTooOld,
+ ErrorCodes.Interrupted
+ ];
+ assertWorkedOrFailed(abortCmd, res, abortErrorCodes);
data.cursorId = 0;
} else {
assert(cursor.hasOwnProperty("firstBatch"), tojson(res));
diff --git a/jstests/core/txns/commands_not_allowed_in_txn.js b/jstests/core/txns/commands_not_allowed_in_txn.js
index 88d456b115e..5b39cdfa5c5 100644
--- a/jstests/core/txns/commands_not_allowed_in_txn.js
+++ b/jstests/core/txns/commands_not_allowed_in_txn.js
@@ -151,7 +151,7 @@
stmtId: NumberInt(1),
autocommit: false
}),
- ErrorCodes.ConflictingOperationInProgress);
+ ErrorCodes.OperationNotSupportedInTransaction);
// It is still possible to commit the transaction. The rejected command does not abort the
// transaction.
diff --git a/jstests/core/txns/do_txn_basic.js b/jstests/core/txns/do_txn_basic.js
index 7e9bd4b98e1..de753f787dc 100644
--- a/jstests/core/txns/do_txn_basic.js
+++ b/jstests/core/txns/do_txn_basic.js
@@ -86,7 +86,10 @@
jsTestLog("Valid 'ns' field value in unknown operation type 'x'.");
assert.commandFailedWithCode(
- db.adminCommand({doTxn: [{op: 'x', ns: t.getFullName(), o: {_id: 0}}]}),
+ db.adminCommand({
+ doTxn: [{op: 'x', ns: t.getFullName(), o: {_id: 0}}],
+ txnNumber: NumberLong(txnNumber++)
+ }),
ErrorCodes.FailedToParse,
'doTxn should fail on unknown operation type "x" with valid "ns" value');
@@ -159,10 +162,12 @@
'doTxn should fail when inner transaction contains statement id.');
jsTestLog("Malformed operation with unexpected field 'x'.");
- assert.commandFailedWithCode(
- db.adminCommand({doTxn: [{op: 'i', ns: t.getFullName(), o: {_id: 0}, x: 1}]}),
- ErrorCodes.FailedToParse,
- 'doTxn should fail on malformed operations.');
+ assert.commandFailedWithCode(db.adminCommand({
+ doTxn: [{op: 'i', ns: t.getFullName(), o: {_id: 0}, x: 1}],
+ txnNumber: NumberLong(txnNumber++)
+ }),
+ ErrorCodes.FailedToParse,
+ 'doTxn should fail on malformed operations.');
assert.eq(0, t.find().count(), "Non-zero amount of documents in collection to start");
diff --git a/jstests/core/txns/multi_statement_transaction_command_args.js b/jstests/core/txns/multi_statement_transaction_command_args.js
index 8246d1030a5..d5866a57b52 100644
--- a/jstests/core/txns/multi_statement_transaction_command_args.js
+++ b/jstests/core/txns/multi_statement_transaction_command_args.js
@@ -145,7 +145,7 @@
txnNumber: NumberLong(txnNumber),
autocommit: false
}),
- ErrorCodes.NoSuchTransaction);
+ ErrorCodes.InvalidOptions);
jsTestLog("Try to begin a transaction with startTransaction=false and autocommit=false");
txnNumber++;
diff --git a/src/mongo/db/commands/do_txn_cmd.cpp b/src/mongo/db/commands/do_txn_cmd.cpp
index c4cc43f2dd7..801d55d4663 100644
--- a/src/mongo/db/commands/do_txn_cmd.cpp
+++ b/src/mongo/db/commands/do_txn_cmd.cpp
@@ -120,6 +120,14 @@ public:
return true;
}
+ bool supportsReadConcern(const std::string& dbName,
+ const BSONObj& cmdObj,
+ repl::ReadConcernLevel level) const override {
+ // Support the read concerns before and after upconversion.
+ return level == repl::ReadConcernLevel::kLocalReadConcern ||
+ level == repl::ReadConcernLevel::kSnapshotReadConcern;
+ }
+
std::string help() const override {
return "internal (sharding)\n{ doTxn : [ ] , preCondition : [ { ns : ... , q : ... , "
"res : ... } ] }";
diff --git a/src/mongo/db/initialize_operation_session_info.cpp b/src/mongo/db/initialize_operation_session_info.cpp
index 8842c5cb8bb..085e60db050 100644
--- a/src/mongo/db/initialize_operation_session_info.cpp
+++ b/src/mongo/db/initialize_operation_session_info.cpp
@@ -46,12 +46,11 @@ namespace mongo {
// engine (see SERVER-34165).
MONGO_EXPORT_SERVER_PARAMETER(enableInMemoryTransactions, bool, false);
-boost::optional<OperationSessionInfoFromClient> initializeOperationSessionInfo(
- OperationContext* opCtx,
- const BSONObj& requestBody,
- bool requiresAuth,
- bool isReplSetMemberOrMongos,
- bool supportsDocLocking) {
+OperationSessionInfoFromClient initializeOperationSessionInfo(OperationContext* opCtx,
+ const BSONObj& requestBody,
+ bool requiresAuth,
+ bool isReplSetMemberOrMongos,
+ bool supportsDocLocking) {
auto osi = OperationSessionInfoFromClient::parse("OperationSessionInfo"_sd, requestBody);
if (opCtx->getClient()->isInDirectClient()) {
@@ -74,14 +73,14 @@ boost::optional<OperationSessionInfoFromClient> initializeOperationSessionInfo(
// logical sessions are disabled. A client may authenticate as the __sytem user,
// or as an externally authorized user.
if (authSession->isUsingLocalhostBypass() && !authSession->isAuthenticated()) {
- return boost::none;
+ return {};
}
// Do not initialize lsid when auth is enabled and no user is logged in since
// there is no sensible uid that can be assigned to it.
if (AuthorizationManager::get(opCtx->getServiceContext())->isAuthEnabled() &&
!authSession->isAuthenticated() && !requiresAuth) {
- return boost::none;
+ return {};
}
}
@@ -92,7 +91,7 @@ boost::optional<OperationSessionInfoFromClient> initializeOperationSessionInfo(
if (!lsc) {
// Ignore session information if the logical session cache has not been set up, e.g. on
// the embedded version of mongod.
- return boost::none;
+ return {};
}
opCtx->setLogicalSessionId(makeLogicalSessionId(osi.getSessionId().get(), opCtx));
@@ -151,6 +150,21 @@ boost::optional<OperationSessionInfoFromClient> initializeOperationSessionInfo(
osi.getStartTransaction().value());
}
+ // Populate the session info for doTxn command.
+ if (requestBody.firstElementFieldName() == "doTxn"_sd) {
+ uassert(ErrorCodes::InvalidOptions,
+ "doTxn can only be run with a transaction number.",
+ osi.getTxnNumber());
+ uassert(ErrorCodes::OperationNotSupportedInTransaction,
+ "doTxn can not be run in a transaction",
+ !osi.getAutocommit());
+ // 'autocommit' and 'startTransaction' are populated for 'doTxn' to get the oplog
+ // entry generation behavior used for multi-document transactions. The 'doTxn'
+ // command still logically behaves as a commit.
+ osi.setAutocommit(false);
+ osi.setStartTransaction(true);
+ }
+
return osi;
}
diff --git a/src/mongo/db/initialize_operation_session_info.h b/src/mongo/db/initialize_operation_session_info.h
index d7d99e95736..041d287ec21 100644
--- a/src/mongo/db/initialize_operation_session_info.h
+++ b/src/mongo/db/initialize_operation_session_info.h
@@ -51,11 +51,10 @@ namespace mongo {
* On success, returns the parsed request information. Returning boost::none implies that the
* proper command or session requirements were not met.
*/
-boost::optional<OperationSessionInfoFromClient> initializeOperationSessionInfo(
- OperationContext* opCtx,
- const BSONObj& requestBody,
- bool requiresAuth,
- bool isReplSetMemberOrMongos,
- bool supportsDocLocking);
+OperationSessionInfoFromClient initializeOperationSessionInfo(OperationContext* opCtx,
+ const BSONObj& requestBody,
+ bool requiresAuth,
+ bool isReplSetMemberOrMongos,
+ bool supportsDocLocking);
} // namespace mongo
diff --git a/src/mongo/db/logical_session_id_test.cpp b/src/mongo/db/logical_session_id_test.cpp
index 18e755fe507..3d90d189d5b 100644
--- a/src/mongo/db/logical_session_id_test.cpp
+++ b/src/mongo/db/logical_session_id_test.cpp
@@ -335,13 +335,17 @@ TEST_F(LogicalSessionIdTest, InitializeOperationSessionInfo_IgnoresInfoIfNoCache
LogicalSessionCache::set(_opCtx->getServiceContext(), nullptr);
- ASSERT_FALSE(initializeOperationSessionInfo(
+ auto sessionInfo = initializeOperationSessionInfo(
_opCtx.get(),
BSON("TestCmd" << 1 << "lsid" << lsid.toBSON() << "txnNumber" << 100LL << "OtherField"
<< "TestField"),
true,
true,
- true));
+ true);
+ ASSERT(sessionInfo.getSessionId() == boost::none);
+ ASSERT(sessionInfo.getTxnNumber() == boost::none);
+ ASSERT(sessionInfo.getStartTransaction() == boost::none);
+ ASSERT(sessionInfo.getAutocommit() == boost::none);
}
TEST_F(LogicalSessionIdTest, InitializeOperationSessionInfo_SendingInfoFailsInDirectClient) {
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index bfecac520d1..eb4c3f11438 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -406,12 +406,14 @@ TEST_F(OpObserverLargeTransactionTest, TransactionTooLargeWhileCommitting) {
const TxnNumber txnNum = 0;
opCtx->setLogicalSessionId(sessionId);
opCtx->setTxnNumber(txnNum);
- OperationContextSession opSession(opCtx.get(),
- true /* checkOutSession */,
- false /* autocommit */,
- true /* startTransaction */,
- "testDB" /* dbName */,
- "insert" /* cmdName */);
+ OperationContextSession opSession(opCtx.get(), true /* checkOutSession */);
+ OperationContextSession::get(opCtx.get())
+ ->beginOrContinueTxn(opCtx.get(),
+ txnNum,
+ /* autocommit */ false,
+ /* startTransaction */ true,
+ "testDB",
+ "insert");
session->unstashTransactionResources(opCtx.get(), "insert");
@@ -550,12 +552,13 @@ TEST_F(OpObserverTransactionTest, TransactionalInsertTest) {
auto uuid2 = CollectionUUID::gen();
const TxnNumber txnNum = 2;
opCtx()->setTxnNumber(txnNum);
- OperationContextSession opSession(opCtx(),
- true /* checkOutSession */,
- false /* autocommit */,
- true /* startTransaction*/,
- "testDB",
- "insert");
+ OperationContextSession opSession(opCtx(), true /* checkOutSession */);
+ OperationContextSession::get(opCtx())->beginOrContinueTxn(opCtx(),
+ txnNum,
+ /* autocommit */ false,
+ /* startTransaction */ true,
+ "testDB",
+ "insert");
session()->unstashTransactionResources(opCtx(), "insert");
@@ -628,12 +631,13 @@ TEST_F(OpObserverTransactionTest, TransactionalUpdateTest) {
auto uuid2 = CollectionUUID::gen();
const TxnNumber txnNum = 3;
opCtx()->setTxnNumber(txnNum);
- OperationContextSession opSession(opCtx(),
- true /* checkOutSession */,
- false /* autocommit */,
- true /* startTransaction*/,
- "testDB",
- "update");
+ OperationContextSession opSession(opCtx(), true /* checkOutSession */);
+ OperationContextSession::get(opCtx())->beginOrContinueTxn(opCtx(),
+ txnNum,
+ /* autocommit */ false,
+ /* startTransaction */ true,
+ "testDB",
+ "insert");
session()->unstashTransactionResources(opCtx(), "update");
@@ -698,12 +702,13 @@ TEST_F(OpObserverTransactionTest, TransactionalDeleteTest) {
auto uuid2 = CollectionUUID::gen();
const TxnNumber txnNum = 3;
opCtx()->setTxnNumber(txnNum);
- OperationContextSession opSession(opCtx(),
- true /* checkOutSession */,
- false /* autocommit */,
- true /* startTransaction*/,
- "testDB",
- "delete");
+ OperationContextSession opSession(opCtx(), true /* checkOutSession */);
+ OperationContextSession::get(opCtx())->beginOrContinueTxn(opCtx(),
+ txnNum,
+ /* autocommit */ false,
+ /* startTransaction */ true,
+ "testDB",
+ "insert");
session()->unstashTransactionResources(opCtx(), "delete");
diff --git a/src/mongo/db/read_concern.cpp b/src/mongo/db/read_concern.cpp
index 0c859fdbf75..81ca9ecdd6f 100644
--- a/src/mongo/db/read_concern.cpp
+++ b/src/mongo/db/read_concern.cpp
@@ -209,9 +209,8 @@ Status waitForReadConcern(OperationContext* opCtx,
// If we are in a direct client within a transaction, then we may be holding locks, so it is
// illegal to wait for read concern. This is fine, since the outer operation should have handled
// waiting for read concern.
- auto session = OperationContextSession::get(opCtx);
- if (opCtx->getClient()->isInDirectClient() && session &&
- session->inActiveOrKilledMultiDocumentTransaction()) {
+ if (opCtx->getClient()->isInDirectClient() &&
+ readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) {
return Status::OK();
}
@@ -311,13 +310,6 @@ Status waitForReadConcern(OperationContext* opCtx,
return {ErrorCodes::IncompatibleElectionProtocol,
"Replica sets running protocol version 0 do not support readConcern: snapshot"};
}
- if (speculative) {
- session->setSpeculativeTransactionOpTime(
- opCtx,
- readConcernArgs.getOriginalLevel() == repl::ReadConcernLevel::kSnapshotReadConcern
- ? SpeculativeTransactionOpTime::kAllCommitted
- : SpeculativeTransactionOpTime::kLastApplied);
- }
}
if (atClusterTime) {
diff --git a/src/mongo/db/repl/do_txn.cpp b/src/mongo/db/repl/do_txn.cpp
index f1fbcfdb82e..a8b5e0722ea 100644
--- a/src/mongo/db/repl/do_txn.cpp
+++ b/src/mongo/db/repl/do_txn.cpp
@@ -279,8 +279,6 @@ Status doTxn(OperationContext* opCtx,
const std::string& dbName,
const BSONObj& doTxnCmd,
BSONObjBuilder* result) {
- auto txnNumber = opCtx->getTxnNumber();
- uassert(ErrorCodes::InvalidOptions, "doTxn can only be run with a transaction ID.", txnNumber);
auto* session = OperationContextSession::get(opCtx);
uassert(ErrorCodes::InvalidOptions, "doTxn must be run within a session", session);
invariant(session->inActiveOrKilledMultiDocumentTransaction());
diff --git a/src/mongo/db/repl/do_txn_test.cpp b/src/mongo/db/repl/do_txn_test.cpp
index 33db97122ad..ebfd3db66af 100644
--- a/src/mongo/db/repl/do_txn_test.cpp
+++ b/src/mongo/db/repl/do_txn_test.cpp
@@ -147,13 +147,10 @@ void DoTxnTest::setUp() {
// Set up the transaction and session.
_opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
_opCtx->setTxnNumber(0); // TxnNumber can always be 0 because we have a new session.
- _ocs.emplace(_opCtx.get(),
- true /* checkOutSession */,
- false /* autocommit */,
- true /* startTransaction */,
- "admin" /* dbName */,
- "doTxn" /* cmdName */);
- OperationContextSession::get(opCtx())->unstashTransactionResources(opCtx(), "doTxn");
+ _ocs.emplace(_opCtx.get(), true /* checkOutSession */);
+ auto session = OperationContextSession::get(opCtx());
+ session->beginOrContinueTxn(opCtx(), *opCtx()->getTxnNumber(), false, true, "admin", "doTxn");
+ session->unstashTransactionResources(opCtx(), "doTxn");
}
void DoTxnTest::tearDown() {
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index db54f99ebfb..88560376c45 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -1418,8 +1418,7 @@ Status ReplicationCoordinatorImpl::_waitUntilClusterTimeForRead(OperationContext
invariant(!readConcern.getArgsOpTime());
// TODO SERVER-34620: Re-enable speculative behavior when "atClusterTime" is specified.
- auto session = OperationContextSession::get(opCtx);
- const bool speculative = session && session->inActiveOrKilledMultiDocumentTransaction() &&
+ const bool speculative = readConcern.getLevel() == ReadConcernLevel::kSnapshotReadConcern &&
!readConcern.getArgsAtClusterTime();
const bool isMajorityCommittedRead =
diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h
index 59f95c1317a..7e5d8583645 100644
--- a/src/mongo/db/repl/storage_interface.h
+++ b/src/mongo/db/repl/storage_interface.h
@@ -430,6 +430,11 @@ public:
*/
virtual boost::optional<Timestamp> getLastStableCheckpointTimestamp(
ServiceContext* serviceCtx) const = 0;
+
+ /**
+ * Returns the read timestamp of the recovery unit of the given operation context.
+ */
+ virtual Timestamp getPointInTimeReadTimestamp(OperationContext* opCtx) const = 0;
};
} // namespace repl
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index d3cd887a32c..0f0402be183 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -1183,5 +1183,11 @@ Timestamp StorageInterfaceImpl::getOldestOpenReadTimestamp(ServiceContext* servi
return serviceCtx->getStorageEngine()->getOldestOpenReadTimestamp();
}
+Timestamp StorageInterfaceImpl::getPointInTimeReadTimestamp(OperationContext* opCtx) const {
+ auto readTimestamp = opCtx->recoveryUnit()->getPointInTimeReadTimestamp();
+ invariant(readTimestamp);
+ return *readTimestamp;
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h
index 9420b12d8c6..1a5854710e4 100644
--- a/src/mongo/db/repl/storage_interface_impl.h
+++ b/src/mongo/db/repl/storage_interface_impl.h
@@ -178,6 +178,8 @@ public:
Timestamp getOldestOpenReadTimestamp(ServiceContext* serviceCtx) const override;
+ Timestamp getPointInTimeReadTimestamp(OperationContext* opCtx) const override;
+
/**
* Checks that the "admin" database contains a supported version of the auth data schema.
*/
diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h
index c6f29d79cd0..42f5ccdb0ea 100644
--- a/src/mongo/db/repl/storage_interface_mock.h
+++ b/src/mongo/db/repl/storage_interface_mock.h
@@ -338,6 +338,10 @@ public:
return boost::none;
}
+ Timestamp getPointInTimeReadTimestamp(OperationContext* opCtx) const override {
+ return {};
+ }
+
// Testing functions.
CreateCollectionForBulkFn createCollectionForBulkFn =
[](const NamespaceString& nss,
diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
index 49da1334126..3db19f80142 100644
--- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
@@ -245,8 +245,14 @@ public:
// requests with txnNumbers aren't allowed. To get around this, we have to manually set
// up the session state and perform the insert.
initializeOperationSessionInfo(innerOpCtx.get(), insertBuilder.obj(), true, true, true);
- OperationContextSession sessionTxnState(
- innerOpCtx.get(), true, boost::none, boost::none, "testDB", "insert");
+ OperationContextSession sessionTxnState(innerOpCtx.get(), true);
+ OperationContextSession::get(innerOpCtx.get())
+ ->beginOrContinueTxn(innerOpCtx.get(),
+ *sessionInfo.getTxnNumber(),
+ boost::none,
+ boost::none,
+ "testDB",
+ "insert");
const auto reply = performInserts(innerOpCtx.get(), insertRequest);
ASSERT(reply.results.size() == 1);
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index e81d757de07..c4180d864e6 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -219,11 +219,11 @@ void generateErrorResponse(OperationContext* opCtx,
replyBuilder->setMetadata(replyMetadata);
}
-BSONObj getErrorLabels(const boost::optional<OperationSessionInfoFromClient>& sessionOptions,
+BSONObj getErrorLabels(const OperationSessionInfoFromClient& sessionOptions,
const std::string& commandName,
ErrorCodes::Error code) {
// By specifying "autocommit", the user indicates they want to run a transaction.
- if (!sessionOptions || !sessionOptions->getAutocommit()) {
+ if (!sessionOptions.getAutocommit()) {
return {};
}
@@ -347,8 +347,9 @@ StatusWith<repl::ReadConcernArgs> _extractReadConcern(const CommandInvocation* i
// We must be in a transaction if the readConcern level was upconverted to snapshot and the
// command must support readConcern level snapshot in order to be supported in transactions.
if (upconvertToSnapshot) {
- return {ErrorCodes::OperationNotSupportedInTransaction,
- str::stream() << "Command is not supported in a transaction"};
+ return {
+ ErrorCodes::OperationNotSupportedInTransaction,
+ str::stream() << "Command is not supported as the first command in a transaction"};
}
return {ErrorCodes::InvalidOptions,
str::stream() << "Command does not support read concern "
@@ -464,10 +465,20 @@ void appendClusterAndOperationTime(OperationContext* opCtx,
void invokeInTransaction(OperationContext* opCtx,
CommandInvocation* invocation,
+ const OpMsgRequest& request,
+ const OperationSessionInfoFromClient& sessionOptions,
CommandReplyBuilder* replyBuilder) {
auto session = OperationContextSession::get(opCtx);
invariant(session);
invariant(opCtx->getTxnNumber() || opCtx->getClient()->isInDirectClient());
+ if (!opCtx->getClient()->isInDirectClient()) {
+ session->beginOrContinueTxn(opCtx,
+ *sessionOptions.getTxnNumber(),
+ sessionOptions.getAutocommit(),
+ sessionOptions.getStartTransaction(),
+ request.getDatabase(),
+ request.getCommandName());
+ }
session->unstashTransactionResources(opCtx, invocation->definition()->getName());
ScopeGuard guard = MakeGuard([session, opCtx]() { session->abortActiveTransaction(opCtx); });
@@ -493,7 +504,7 @@ bool runCommandImpl(OperationContext* opCtx,
LogicalTime startOperationTime,
const ServiceEntryPointCommon::Hooks& behaviors,
BSONObjBuilder* extraFieldsBuilder,
- const boost::optional<OperationSessionInfoFromClient>& sessionOptions) {
+ const OperationSessionInfoFromClient& sessionOptions) {
const Command* command = invocation->definition();
auto bytesToReserve = command->reserveBytesForReply();
@@ -510,19 +521,21 @@ bool runCommandImpl(OperationContext* opCtx,
if (!invocation->supportsWriteConcern()) {
behaviors.uassertCommandDoesNotSpecifyWriteConcern(request.body);
if (session) {
- invokeInTransaction(opCtx, invocation, &crb);
+ invokeInTransaction(opCtx, invocation, request, sessionOptions, &crb);
} else {
invocation->run(opCtx, &crb);
}
} else {
auto wcResult = uassertStatusOK(extractWriteConcern(opCtx, request.body));
- uassert(ErrorCodes::InvalidOptions,
- "writeConcern is not allowed within a multi-statement transaction",
- wcResult.usedDefault || !session ||
- !session->inActiveOrKilledMultiDocumentTransaction() ||
- invocation->definition()->getName() == "commitTransaction" ||
- invocation->definition()->getName() == "abortTransaction" ||
- invocation->definition()->getName() == "doTxn");
+ if (sessionOptions.getAutocommit()) {
+ // If "autoCommit" is set, it must be "false".
+ uassert(ErrorCodes::InvalidOptions,
+ "writeConcern is not allowed within a multi-statement transaction",
+ wcResult.usedDefault ||
+ invocation->definition()->getName() == "commitTransaction" ||
+ invocation->definition()->getName() == "abortTransaction" ||
+ invocation->definition()->getName() == "doTxn");
+ }
auto lastOpBeforeRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
@@ -546,7 +559,7 @@ bool runCommandImpl(OperationContext* opCtx,
try {
if (session) {
- invokeInTransaction(opCtx, invocation, &crb);
+ invokeInTransaction(opCtx, invocation, request, sessionOptions, &crb);
} else {
invocation->run(opCtx, &crb);
}
@@ -638,7 +651,7 @@ void execCommandDatabase(OperationContext* opCtx,
BSONObjBuilder extraFieldsBuilder;
auto startOperationTime = getClientOperationTime(opCtx);
auto invocation = command->parse(opCtx, request);
- boost::optional<OperationSessionInfoFromClient> sessionOptions = boost::none;
+ OperationSessionInfoFromClient sessionOptions;
try {
{
@@ -674,36 +687,23 @@ void execCommandDatabase(OperationContext* opCtx,
const bool shouldCheckoutSession = static_cast<bool>(opCtx->getTxnNumber()) &&
sessionCheckoutWhitelist.find(command->getName()) != sessionCheckoutWhitelist.cend();
- // Parse the arguments specific to multi-statement transactions.
- boost::optional<bool> startMultiDocTxn = boost::none;
- boost::optional<bool> autocommitVal = boost::none;
- if (sessionOptions) {
- startMultiDocTxn = sessionOptions->getStartTransaction();
- autocommitVal = sessionOptions->getAutocommit();
- if (command->getName() == "doTxn") {
- // Autocommit and 'startMultiDocTxn' are overridden for 'doTxn' to get the oplog
- // entry generation behavior used for multi-document transactions. The 'doTxn'
- // command still logically behaves as a commit.
- autocommitVal = false;
- startMultiDocTxn = true;
- }
- }
+ const auto shouldNotCheckOutSession =
+ !shouldCheckoutSession && !opCtx->getClient()->isInDirectClient();
// Reject commands with 'txnNumber' that do not check out the Session, since no retryable
// writes or transaction machinery will be used to execute commands that do not check out
// the Session. Do not check this if we are in DBDirectClient because the outer command is
// responsible for checking out the Session.
- if (!opCtx->getClient()->isInDirectClient()) {
+ if (shouldNotCheckOutSession) {
uassert(ErrorCodes::OperationNotSupportedInTransaction,
str::stream() << "It is illegal to run command " << command->getName()
<< " in a multi-document transaction.",
- shouldCheckoutSession || !autocommitVal || command->getName() == "doTxn");
+ !sessionOptions.getAutocommit());
uassert(50768,
str::stream() << "It is illegal to provide a txnNumber for command "
<< command->getName(),
- shouldCheckoutSession || !opCtx->getTxnNumber());
+ !opCtx->getTxnNumber());
}
-
std::unique_ptr<MaintenanceModeSetter> mmSetter;
BSONElement cmdOptionMaxTimeMSField;
@@ -747,7 +747,7 @@ void execCommandDatabase(OperationContext* opCtx,
if (!opCtx->getClient()->isInDirectClient() &&
!MONGO_FAIL_POINT(skipCheckingForNotMasterInCommandDispatch)) {
- const bool inMultiDocumentTransaction = (autocommitVal == false);
+ auto inMultiDocumentTransaction = static_cast<bool>(sessionOptions.getAutocommit());
auto allowed = command->secondaryAllowed(opCtx->getServiceContext());
bool alwaysAllowed = allowed == Command::AllowedOnSecondary::kAlways;
bool couldHaveOptedIn =
@@ -812,23 +812,17 @@ void execCommandDatabase(OperationContext* opCtx,
opCtx->setDeadlineAfterNowBy(Milliseconds{maxTimeMS}, ErrorCodes::MaxTimeMSExpired);
}
- // This constructor will check out the session and start a transaction, if necessary. It
- // handles the appropriate state management for both multi-statement transactions and
- // retryable writes.
- OperationContextSession sessionTxnState(opCtx,
- shouldCheckoutSession,
- autocommitVal,
- startMultiDocTxn,
- dbname,
- command->getName());
+ // This constructor will check out the session, if necessary, for both multi-statement
+ // transactions and retryable writes.
+ OperationContextSession sessionTxnState(opCtx, shouldCheckoutSession);
auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
- auto session = OperationContextSession::get(opCtx);
- if (!opCtx->getClient()->isInDirectClient() || !session ||
- !session->inActiveOrKilledMultiDocumentTransaction()) {
- const bool upconvertToSnapshot = session &&
- session->inActiveOrKilledMultiDocumentTransaction() && sessionOptions &&
- (sessionOptions->getStartTransaction() == boost::optional<bool>(true));
+ // If the parent operation runs in snapshot isolation, we don't override the read concern.
+ auto skipReadConcern = opCtx->getClient()->isInDirectClient() &&
+ readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern;
+ if (!skipReadConcern) {
+ // If "startTransaction" is present, it must be true due to the parsing above.
+ const bool upconvertToSnapshot(sessionOptions.getStartTransaction());
readConcernArgs = uassertStatusOK(
_extractReadConcern(invocation.get(), request.body, upconvertToSnapshot));
}
@@ -840,10 +834,9 @@ void execCommandDatabase(OperationContext* opCtx,
}
if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) {
- auto session = OperationContextSession::get(opCtx);
uassert(ErrorCodes::InvalidOptions,
- "readConcern level snapshot is only valid in multi-statement transactions",
- session && session->inActiveOrKilledMultiDocumentTransaction());
+ "readConcern level snapshot is only valid for the first transaction operation",
+ opCtx->getClient()->isInDirectClient() || sessionOptions.getStartTransaction());
uassert(ErrorCodes::InvalidOptions,
"readConcern level snapshot requires a session ID",
opCtx->getLogicalSessionId());
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index 659ca1c3cc4..47727848aae 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -50,6 +50,7 @@
#include "mongo/db/query/get_executor.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/retryable_writes_stats.h"
#include "mongo/db/server_options.h"
#include "mongo/db/server_parameters.h"
@@ -388,9 +389,9 @@ void Session::beginOrContinueTxnOnMigration(OperationContext* opCtx, TxnNumber t
_beginOrContinueTxnOnMigration(lg, txnNumber);
}
-void Session::setSpeculativeTransactionOpTime(OperationContext* opCtx,
- SpeculativeTransactionOpTime opTimeChoice) {
- stdx::lock_guard<stdx::mutex> lg(_mutex);
+void Session::_setSpeculativeTransactionOpTime(WithLock,
+ OperationContext* opCtx,
+ SpeculativeTransactionOpTime opTimeChoice) {
repl::ReplicationCoordinator* replCoord =
repl::ReplicationCoordinator::get(opCtx->getClient()->getServiceContext());
opCtx->recoveryUnit()->setTimestampReadSource(
@@ -398,13 +399,12 @@ void Session::setSpeculativeTransactionOpTime(OperationContext* opCtx,
? RecoveryUnit::ReadSource::kAllCommittedSnapshot
: RecoveryUnit::ReadSource::kLastAppliedSnapshot);
opCtx->recoveryUnit()->preallocateSnapshot();
- auto readTimestamp = opCtx->recoveryUnit()->getPointInTimeReadTimestamp();
- invariant(readTimestamp);
+ auto readTimestamp = repl::StorageInterface::get(opCtx)->getPointInTimeReadTimestamp(opCtx);
// Transactions do not survive term changes, so combining "getTerm" here with the
// recovery unit timestamp does not cause races.
- _speculativeTransactionReadOpTime = {*readTimestamp, replCoord->getTerm()};
+ _speculativeTransactionReadOpTime = {readTimestamp, replCoord->getTerm()};
stdx::lock_guard<stdx::mutex> ls(_statsMutex);
- _singleTransactionStats.setReadTimestamp(*readTimestamp);
+ _singleTransactionStats.setReadTimestamp(readTimestamp);
}
void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx,
@@ -821,11 +821,27 @@ void Session::unstashTransactionResources(OperationContext* opCtx, const std::st
return;
}
- // Stashed transaction resources do not exist for this transaction. If this is a
- // multi-document transaction, set up the transaction resources on the opCtx.
+ // Stashed transaction resources do not exist for this transaction.
if (_txnState != MultiDocumentTransactionState::kInProgress) {
return;
}
+
+ // Set speculative execution.
+ const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
+ const bool speculative =
+ readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern &&
+ !readConcernArgs.getArgsAtClusterTime();
+ // Only set speculative on primary.
+ if (opCtx->writesAreReplicated() && speculative) {
+ _setSpeculativeTransactionOpTime(lg,
+ opCtx,
+ readConcernArgs.getOriginalLevel() ==
+ repl::ReadConcernLevel::kSnapshotReadConcern
+ ? SpeculativeTransactionOpTime::kAllCommitted
+ : SpeculativeTransactionOpTime::kLastApplied);
+ }
+
+ // If this is a multi-document transaction, set up the transaction resources on the opCtx.
opCtx->setWriteUnitOfWork(std::make_unique<WriteUnitOfWork>(opCtx));
// If maxTransactionLockRequestTimeoutMillis is set, then we will ensure no
diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h
index 3f2bf89df31..6070ddf109b 100644
--- a/src/mongo/db/session.h
+++ b/src/mongo/db/session.h
@@ -174,12 +174,6 @@ public:
void beginOrContinueTxnOnMigration(OperationContext* opCtx, TxnNumber txnNumber);
/**
- * Called for speculative transactions to fix the optime of the snapshot to read from.
- */
- void setSpeculativeTransactionOpTime(OperationContext* opCtx,
- SpeculativeTransactionOpTime opTimeChoice);
-
- /**
* Called after a write under the specified transaction completes while the node is a primary
* and specifies the statement ids which were written. Must be called while the caller is still
* in the write's WUOW. Updates the on-disk state of the session to match the specified
@@ -455,6 +449,12 @@ private:
std::vector<StmtId> stmtIdsWritten,
const repl::OpTime& lastStmtIdWriteTs);
+ // Called for speculative transactions to fix the optime of the snapshot to read from.
+ void _setSpeculativeTransactionOpTime(WithLock,
+ OperationContext* opCtx,
+ SpeculativeTransactionOpTime opTimeChoice);
+
+
// Releases stashed transaction resources to abort the transaction.
void _abortTransaction(WithLock);
diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp
index b657c9ec5e4..0781349eae4 100644
--- a/src/mongo/db/session_catalog.cpp
+++ b/src/mongo/db/session_catalog.cpp
@@ -241,12 +241,7 @@ void SessionCatalog::_releaseSession(const LogicalSessionId& lsid) {
sri->availableCondVar.notify_one();
}
-OperationContextSession::OperationContextSession(OperationContext* opCtx,
- bool checkOutSession,
- boost::optional<bool> autocommit,
- boost::optional<bool> startTransaction,
- StringData dbName,
- StringData cmdName)
+OperationContextSession::OperationContextSession(OperationContext* opCtx, bool checkOutSession)
: _opCtx(opCtx) {
if (!opCtx->getLogicalSessionId()) {
@@ -277,10 +272,6 @@ OperationContextSession::OperationContextSession(OperationContext* opCtx,
checkedOutSession->get()->refreshFromStorageIfNeeded(opCtx);
- if (opCtx->getTxnNumber()) {
- checkedOutSession->get()->beginOrContinueTxn(
- opCtx, *opCtx->getTxnNumber(), autocommit, startTransaction, dbName, cmdName);
- }
session->setCurrentOperation(opCtx);
}
diff --git a/src/mongo/db/session_catalog.h b/src/mongo/db/session_catalog.h
index 50f5a16e24c..197a344e55a 100644
--- a/src/mongo/db/session_catalog.h
+++ b/src/mongo/db/session_catalog.h
@@ -245,12 +245,7 @@ class OperationContextSession {
MONGO_DISALLOW_COPYING(OperationContextSession);
public:
- OperationContextSession(OperationContext* opCtx,
- bool checkOutSession,
- boost::optional<bool> autocommit,
- boost::optional<bool> startTransaction,
- StringData dbName,
- StringData cmdName);
+ OperationContextSession(OperationContext* opCtx, bool checkOutSession);
~OperationContextSession();
diff --git a/src/mongo/db/session_catalog_test.cpp b/src/mongo/db/session_catalog_test.cpp
index f31ce6b464f..5ad3280bc8b 100644
--- a/src/mongo/db/session_catalog_test.cpp
+++ b/src/mongo/db/session_catalog_test.cpp
@@ -92,7 +92,7 @@ TEST_F(SessionCatalogTest, OperationContextCheckedOutSession) {
const TxnNumber txnNum = 20;
opCtx()->setTxnNumber(txnNum);
- OperationContextSession ocs(opCtx(), true, boost::none, boost::none, "testDB", "insert");
+ OperationContextSession ocs(opCtx(), true);
auto session = OperationContextSession::get(opCtx());
ASSERT(session);
ASSERT_EQ(*opCtx()->getLogicalSessionId(), session->getSessionId());
@@ -101,7 +101,7 @@ TEST_F(SessionCatalogTest, OperationContextCheckedOutSession) {
TEST_F(SessionCatalogTest, OperationContextNonCheckedOutSession) {
opCtx()->setLogicalSessionId(makeLogicalSessionIdForTest());
- OperationContextSession ocs(opCtx(), false, boost::none, boost::none, "testDB", "insert");
+ OperationContextSession ocs(opCtx(), false);
auto session = OperationContextSession::get(opCtx());
ASSERT(!session);
@@ -120,7 +120,8 @@ TEST_F(SessionCatalogTest, GetOrCreateSessionAfterCheckOutSession) {
opCtx()->setLogicalSessionId(lsid);
boost::optional<OperationContextSession> ocs;
- ocs.emplace(opCtx(), true, boost::none, false, "testDB", "insert");
+ ocs.emplace(opCtx(), true);
+ // We don't have to start the transaction for this test.
stdx::async(stdx::launch::async, [&] {
ON_BLOCK_EXIT([&] { Client::destroy(); });
@@ -151,13 +152,11 @@ TEST_F(SessionCatalogTest, NestedOperationContextSession) {
opCtx()->setLogicalSessionId(makeLogicalSessionIdForTest());
{
- OperationContextSession outerScopedSession(
- opCtx(), true, boost::none, boost::none, "testDB", "insert");
+ OperationContextSession outerScopedSession(opCtx(), true);
{
DirectClientSetter inDirectClient(opCtx());
- OperationContextSession innerScopedSession(
- opCtx(), true, boost::none, boost::none, "testDB", "insert");
+ OperationContextSession innerScopedSession(opCtx(), true);
auto session = OperationContextSession::get(opCtx());
ASSERT(session);
@@ -180,8 +179,13 @@ TEST_F(SessionCatalogTest, StashInNestedSessionIsANoop) {
opCtx()->setTxnNumber(1);
{
- OperationContextSession outerScopedSession(
- opCtx(), true, /* autocommit */ false, /* startTransaction */ true, "testDB", "find");
+ OperationContextSession outerScopedSession(opCtx(), true);
+ OperationContextSession::get(opCtx())->beginOrContinueTxn(opCtx(),
+ *opCtx()->getTxnNumber(),
+ /* autocommit */ false,
+ /* startTransaction */ true,
+ "testDB",
+ "find");
Locker* originalLocker = opCtx()->lockState();
RecoveryUnit* originalRecoveryUnit = opCtx()->recoveryUnit();
@@ -206,9 +210,8 @@ TEST_F(SessionCatalogTest, StashInNestedSessionIsANoop) {
{
// Make it look like we're in a DBDirectClient running a nested operation.
DirectClientSetter inDirectClient(opCtx());
- OperationContextSession innerScopedSession(
- opCtx(), true, boost::none, boost::none, "testDB", "find");
-
+ OperationContextSession innerScopedSession(opCtx(), true);
+ // Don't start transaction again if we're nested in invokeInTransaction().
OperationContextSession::get(opCtx())->stashTransactionResources(opCtx());
// The stash was a noop, so the locker, RecoveryUnit, and WriteUnitOfWork on the
@@ -225,8 +228,13 @@ TEST_F(SessionCatalogTest, UnstashInNestedSessionIsANoop) {
opCtx()->setTxnNumber(1);
{
- OperationContextSession outerScopedSession(
- opCtx(), true, /* autocommit */ false, /* startTransaction */ true, "testDB", "find");
+ OperationContextSession outerScopedSession(opCtx(), true);
+ OperationContextSession::get(opCtx())->beginOrContinueTxn(opCtx(),
+ *opCtx()->getTxnNumber(),
+ /* autocommit */ false,
+ /* startTransaction */ true,
+ "testDB",
+ "find");
Locker* originalLocker = opCtx()->lockState();
RecoveryUnit* originalRecoveryUnit = opCtx()->recoveryUnit();
@@ -245,8 +253,7 @@ TEST_F(SessionCatalogTest, UnstashInNestedSessionIsANoop) {
{
// Make it look like we're in a DBDirectClient running a nested operation.
DirectClientSetter inDirectClient(opCtx());
- OperationContextSession innerScopedSession(
- opCtx(), true, boost::none, boost::none, "testDB", "find");
+ OperationContextSession innerScopedSession(opCtx(), true);
OperationContextSession::get(opCtx())->unstashTransactionResources(opCtx(), "find");