summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/core/txns/majority_read_concern.js33
-rw-r--r--jstests/core/txns/read_concerns.js53
-rw-r--r--jstests/sharding/transactions_read_concerns.js4
-rw-r--r--src/mongo/s/transaction_router.cpp11
-rw-r--r--src/mongo/s/transaction_router_test.cpp186
5 files changed, 154 insertions, 133 deletions
diff --git a/jstests/core/txns/majority_read_concern.js b/jstests/core/txns/majority_read_concern.js
deleted file mode 100644
index e21f8b8570e..00000000000
--- a/jstests/core/txns/majority_read_concern.js
+++ /dev/null
@@ -1,33 +0,0 @@
-// Verifies transactions can run with majority read concern, with or without afterClusterTime.
-//
-// @tags: [uses_transactions]
-(function() {
- "use strict";
-
- const dbName = "test";
- const collName = "majority_read_concern";
-
- function runTest(sessionOptions) {
- jsTestLog("Testing transactions with majority read concern and sessionOptions: " +
- tojson(sessionOptions));
-
- db.getSiblingDB(dbName).runCommand({drop: collName, writeConcern: {w: "majority"}});
-
- const session = db.getMongo().startSession(sessionOptions);
- const sessionColl = session.getDatabase(dbName)[collName];
-
- // Set up the collection.
- assert.writeOK(sessionColl.insert({_id: 0}, {writeConcern: {w: "majority"}}));
-
- // Assert a transaction can run with the given session options and read concern level
- // majority.
- session.startTransaction({readConcern: {level: "majority"}});
- assert.eq(1, sessionColl.find().itcount());
- assert.commandWorked(session.commitTransaction_forTesting());
-
- session.endSession();
- }
-
- runTest({causalConsistency: false});
- runTest({causalConsistency: true});
-}());
diff --git a/jstests/core/txns/read_concerns.js b/jstests/core/txns/read_concerns.js
new file mode 100644
index 00000000000..0256e49eca9
--- /dev/null
+++ b/jstests/core/txns/read_concerns.js
@@ -0,0 +1,53 @@
+// Verifies which read concern levels transactions support, with and without afterClusterTime.
+//
+// @tags: [uses_transactions]
+(function() {
+ "use strict";
+
+ const dbName = "test";
+ const collName = "supported_read_concern_levels";
+
+ function runTest(level, sessionOptions, supported) {
+ jsTestLog("Testing transactions with read concern level: " + level +
+ " and sessionOptions: " + tojson(sessionOptions));
+
+ db.getSiblingDB(dbName).runCommand({drop: collName, writeConcern: {w: "majority"}});
+
+ const session = db.getMongo().startSession(sessionOptions);
+ const sessionDB = session.getDatabase(dbName);
+ const sessionColl = sessionDB[collName];
+
+ // Set up the collection.
+ assert.writeOK(sessionColl.insert({_id: 0}, {writeConcern: {w: "majority"}}));
+
+ session.startTransaction({readConcern: {level: level}});
+ const res = sessionDB.runCommand({find: collName});
+ if (supported) {
+ assert.commandWorked(res,
+ "expected success, read concern level: " + level +
+ ", sessionOptions: " + tojson(sessionOptions));
+ assert.commandWorked(session.commitTransaction_forTesting());
+ } else {
+ assert.commandFailedWithCode(res,
+ ErrorCodes.InvalidOptions,
+ "expected failure, read concern level: " + level +
+ ", sessionOptions: " + tojson(sessionOptions));
+ assert.commandFailedWithCode(session.abortTransaction_forTesting(),
+ ErrorCodes.NoSuchTransaction);
+ }
+
+ session.endSession();
+ }
+
+ const kSupportedLevels = ["local", "majority", "snapshot"];
+ for (let level of kSupportedLevels) {
+ runTest(level, {causalConsistency: false}, true /*supported*/);
+ runTest(level, {causalConsistency: true}, true /*supported*/);
+ }
+
+ const kUnsupportedLevels = ["available", "linearizable"];
+ for (let level of kUnsupportedLevels) {
+ runTest(level, {causalConsistency: false}, false /*supported*/);
+ runTest(level, {causalConsistency: true}, false /*supported*/);
+ }
+}());
diff --git a/jstests/sharding/transactions_read_concerns.js b/jstests/sharding/transactions_read_concerns.js
index b733899f086..07b754706c0 100644
--- a/jstests/sharding/transactions_read_concerns.js
+++ b/jstests/sharding/transactions_read_concerns.js
@@ -1,4 +1,4 @@
-// Verifies basic sharded transaction behavior with read concern level majority and snapshot.
+// Verifies basic sharded transaction behavior with the supported read concern levels.
//
// @tags: [
// requires_find_command,
@@ -65,7 +65,7 @@
assert.writeOK(sessionDB[collName].remove({_id: 5}));
}
- const kAllowedReadConcernLevels = ["majority", "snapshot"];
+ const kAllowedReadConcernLevels = ["local", "majority", "snapshot"];
for (let readConcernLevel of kAllowedReadConcernLevels) {
runTest(st, {level: readConcernLevel}, {causalConsistency: false});
runTest(st, {level: readConcernLevel}, {causalConsistency: true});
diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp
index 3d935768490..01e4b18abca 100644
--- a/src/mongo/s/transaction_router.cpp
+++ b/src/mongo/s/transaction_router.cpp
@@ -146,6 +146,12 @@ BSONObjBuilder appendFieldsForStartTransaction(BSONObj cmd,
const StringMap<int> alwaysRetryableCmds = {
{"aggregate", 1}, {"distinct", 1}, {"find", 1}, {"getMore", 1}, {"killCursors", 1}};
+bool isReadConcernLevelAllowedInTransaction(repl::ReadConcernLevel readConcernLevel) {
+ return readConcernLevel == repl::ReadConcernLevel::kSnapshotReadConcern ||
+ readConcernLevel == repl::ReadConcernLevel::kMajorityReadConcern ||
+ readConcernLevel == repl::ReadConcernLevel::kLocalReadConcern;
+}
+
} // unnamed namespace
TransactionRouter::Participant::Participant(bool isCoordinator,
@@ -477,9 +483,8 @@ void TransactionRouter::beginOrContinueTxn(OperationContext* opCtx,
} else {
uassert(ErrorCodes::InvalidOptions,
"The first command in a transaction cannot specify a readConcern level other "
- "than snapshot or majority",
- readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern ||
- readConcernArgs.getLevel() == repl::ReadConcernLevel::kMajorityReadConcern);
+ "than local, majority, or snapshot",
+ isReadConcernLevelAllowedInTransaction(readConcernArgs.getLevel()));
}
_readConcernArgs = readConcernArgs;
} else {
diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp
index 8f845aaae59..5037c4f9769 100644
--- a/src/mongo/s/transaction_router_test.cpp
+++ b/src/mongo/s/transaction_router_test.cpp
@@ -47,7 +47,6 @@ using executor::RemoteCommandRequest;
class TransactionRouterTest : public ShardingTestFixture {
protected:
const LogicalTime kInMemoryLogicalTime = LogicalTime(Timestamp(3, 1));
- const TxnNumber kTxnNumber = 10;
const HostAndPort kTestConfigShardHost = HostAndPort("FakeConfigHost", 12345);
@@ -59,6 +58,14 @@ protected:
const ShardId shard3 = ShardId("shard3");
+ const StringMap<repl::ReadConcernLevel> supportedNonSnapshotRCLevels = {
+ {"local", repl::ReadConcernLevel::kLocalReadConcern},
+ {"majority", repl::ReadConcernLevel::kMajorityReadConcern}};
+
+ const std::vector<repl::ReadConcernLevel> unsupportedRCLevels = {
+ repl::ReadConcernLevel::kAvailableReadConcern,
+ repl::ReadConcernLevel::kLinearizableReadConcern};
+
void setUp() override {
ShardingTestFixture::setUp();
configTargeter()->setFindHostReturnValue(kTestConfigShardHost);
@@ -534,13 +541,8 @@ TEST_F(TransactionRouterTestWithDefaultSession,
ASSERT_BSONOBJ_EQ(expectedNewObj, newCmd);
}
-TEST_F(TransactionRouterTestWithDefaultSession,
- CannotUpconvertIfLevelOtherThanSnapshotOrMajorityWasGiven) {
- auto readConcernLevels = {repl::ReadConcernLevel::kLocalReadConcern,
- repl::ReadConcernLevel::kLinearizableReadConcern,
- repl::ReadConcernLevel::kAvailableReadConcern};
-
- for (auto readConcernLevel : readConcernLevels) {
+TEST_F(TransactionRouterTestWithDefaultSession, RejectUnsupportedReadConcernLevels) {
+ for (auto readConcernLevel : unsupportedRCLevels) {
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(readConcernLevel);
TxnNumber txnNum{3};
@@ -552,13 +554,8 @@ TEST_F(TransactionRouterTestWithDefaultSession,
}
}
-TEST_F(TransactionRouterTestWithDefaultSession,
- CannotUpconvertIfLevelOtherThanSnapshotOrMajorityWasGivenWithAfterClusterTime) {
- auto readConcernLevels = {repl::ReadConcernLevel::kLocalReadConcern,
- repl::ReadConcernLevel::kLinearizableReadConcern,
- repl::ReadConcernLevel::kAvailableReadConcern};
-
- for (auto readConcernLevel : readConcernLevels) {
+TEST_F(TransactionRouterTestWithDefaultSession, RejectUnsupportedLevelsWithAfterClusterTime) {
+ for (auto readConcernLevel : unsupportedRCLevels) {
repl::ReadConcernArgs::get(operationContext()) =
repl::ReadConcernArgs(LogicalTime(Timestamp(10, 1)), readConcernLevel);
@@ -571,12 +568,8 @@ TEST_F(TransactionRouterTestWithDefaultSession,
}
}
-TEST_F(TransactionRouterTestWithDefaultSession, CannotUpconvertWithAfterOpTime) {
- auto readConcernLevels = {repl::ReadConcernLevel::kLocalReadConcern,
- repl::ReadConcernLevel::kLinearizableReadConcern,
- repl::ReadConcernLevel::kAvailableReadConcern};
-
- for (auto readConcernLevel : readConcernLevels) {
+TEST_F(TransactionRouterTestWithDefaultSession, RejectUnsupportedLevelsWithAfterOpTime) {
+ for (auto readConcernLevel : unsupportedRCLevels) {
repl::ReadConcernArgs::get(operationContext()) =
repl::ReadConcernArgs(repl::OpTime(Timestamp(10, 1), 2), readConcernLevel);
@@ -1401,108 +1394,111 @@ TEST_F(TransactionRouterTestWithDefaultSession,
ASSERT_BSONOBJ_EQ(expectedReadConcern, newCmd["readConcern"].Obj());
}
-TEST_F(TransactionRouterTestWithDefaultSession, MajorityReadConcernHasNoAtClusterTime) {
- repl::ReadConcernArgs::get(operationContext()) =
- repl::ReadConcernArgs(repl::ReadConcernLevel::kMajorityReadConcern);
+TEST_F(TransactionRouterTestWithDefaultSession, NonSnapshotReadConcernHasNoAtClusterTime) {
+ TxnNumber txnNum{3};
+ for (auto rcIt : supportedNonSnapshotRCLevels) {
+ repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(rcIt.second);
- auto& txnRouter(*TransactionRouter::get(operationContext()));
- txnRouter.beginOrContinueTxn(operationContext(), kTxnNumber, true);
+ auto& txnRouter(*TransactionRouter::get(operationContext()));
+ txnRouter.beginOrContinueTxn(operationContext(), txnNum++, true);
- // No atClusterTime is placed on the router by default.
- ASSERT_FALSE(txnRouter.getAtClusterTime());
+ // No atClusterTime is placed on the router by default.
+ ASSERT_FALSE(txnRouter.getAtClusterTime());
- // Can't compute and set an atClusterTime.
- txnRouter.setDefaultAtClusterTime(operationContext());
- ASSERT_FALSE(txnRouter.getAtClusterTime());
+ // Can't compute and set an atClusterTime.
+ txnRouter.setDefaultAtClusterTime(operationContext());
+ ASSERT_FALSE(txnRouter.getAtClusterTime());
- txnRouter.computeAndSetAtClusterTime(
- operationContext(), true, {shard1}, NamespaceString("test.coll"), BSONObj(), BSONObj());
- ASSERT_FALSE(txnRouter.getAtClusterTime());
+ txnRouter.computeAndSetAtClusterTime(
+ operationContext(), true, {shard1}, NamespaceString("test.coll"), BSONObj(), BSONObj());
+ ASSERT_FALSE(txnRouter.getAtClusterTime());
- txnRouter.computeAndSetAtClusterTimeForUnsharded(operationContext(), shard1);
- ASSERT_FALSE(txnRouter.getAtClusterTime());
+ txnRouter.computeAndSetAtClusterTimeForUnsharded(operationContext(), shard1);
+ ASSERT_FALSE(txnRouter.getAtClusterTime());
- // Can't continue on snapshot errors.
- ASSERT_THROWS_CODE(
- txnRouter.onSnapshotError(), AssertionException, ErrorCodes::NoSuchTransaction);
+ // Can't continue on snapshot errors.
+ ASSERT_THROWS_CODE(
+ txnRouter.onSnapshotError(), AssertionException, ErrorCodes::NoSuchTransaction);
+ }
}
-TEST_F(TransactionRouterTestWithDefaultSession, AttachesMajorityReadConcernToNewParticipants) {
- repl::ReadConcernArgs::get(operationContext()) =
- repl::ReadConcernArgs(repl::ReadConcernLevel::kMajorityReadConcern);
-
- auto& txnRouter(*TransactionRouter::get(operationContext()));
- txnRouter.beginOrContinueTxn(operationContext(), kTxnNumber, true);
-
- const BSONObj rcMajority = BSON("level"
- << "majority");
-
- auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1,
- BSON("insert"
- << "test"));
- ASSERT_BSONOBJ_EQ(rcMajority, newCmd["readConcern"].Obj());
-
+TEST_F(TransactionRouterTestWithDefaultSession,
+ SupportedNonSnapshotReadConcernLevelsArePassedThrough) {
+ TxnNumber txnNum{3};
+ for (auto rcIt : supportedNonSnapshotRCLevels) {
+ repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(rcIt.second);
- // Only attached on first command to a participant.
- newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1,
- BSON("insert"
- << "test"));
- ASSERT(newCmd["readConcern"].eoo());
+ auto& txnRouter(*TransactionRouter::get(operationContext()));
+ txnRouter.beginOrContinueTxn(operationContext(), txnNum++, true /* startTransaction */);
+ txnRouter.setDefaultAtClusterTime(operationContext());
- // Attached for new participants after the first one.
- newCmd = txnRouter.attachTxnFieldsIfNeeded(shard2,
- BSON("insert"
- << "test"));
- ASSERT_BSONOBJ_EQ(rcMajority, newCmd["readConcern"].Obj());
+ const BSONObj expectedRC = BSON("level" << rcIt.first);
+ auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1,
+ BSON("insert"
+ << "test"));
+ ASSERT_BSONOBJ_EQ(expectedRC, newCmd["readConcern"].Obj());
+
+ // Only attached on first command to a participant.
+ newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1,
+ BSON("insert"
+ << "test"));
+ ASSERT(newCmd["readConcern"].eoo());
+
+ // Attached for new participants after the first one.
+ newCmd = txnRouter.attachTxnFieldsIfNeeded(shard2,
+ BSON("insert"
+ << "test"));
+ ASSERT_BSONOBJ_EQ(expectedRC, newCmd["readConcern"].Obj());
+ }
}
+
TEST_F(TransactionRouterTestWithDefaultSession,
- AttachingMajorityReadConcernPreservesAfterClusterTime) {
+ NonSnapshotReadConcernLevelsPreserveAfterClusterTime) {
const auto clusterTime = LogicalTime(Timestamp(10, 1));
- repl::ReadConcernArgs::get(operationContext()) =
- repl::ReadConcernArgs(clusterTime, repl::ReadConcernLevel::kMajorityReadConcern);
-
- auto& txnRouter(*TransactionRouter::get(operationContext()));
- txnRouter.beginOrContinueTxn(operationContext(), kTxnNumber, true);
+ TxnNumber txnNum{3};
+ for (auto rcIt : supportedNonSnapshotRCLevels) {
+ repl::ReadConcernArgs::get(operationContext()) =
+ repl::ReadConcernArgs(clusterTime, rcIt.second);
- // Call setDefaultAtClusterTime to simulate real command execution.
- txnRouter.setDefaultAtClusterTime(operationContext());
+ auto& txnRouter(*TransactionRouter::get(operationContext()));
+ txnRouter.beginOrContinueTxn(operationContext(), txnNum++, true /* startTransaction */);
+ txnRouter.setDefaultAtClusterTime(operationContext());
- auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1,
- BSON("insert"
- << "test"));
- ASSERT_BSONOBJ_EQ(BSON("level"
- << "majority"
- << "afterClusterTime"
- << clusterTime.asTimestamp()),
- newCmd["readConcern"].Obj());
+ auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1,
+ BSON("insert"
+ << "test"));
+ ASSERT_BSONOBJ_EQ(
+ BSON("level" << rcIt.first << "afterClusterTime" << clusterTime.asTimestamp()),
+ newCmd["readConcern"].Obj());
+ }
}
-TEST_F(TransactionRouterTestWithDefaultSession, AttachingMajorityReadConcernPreservesAfterOpTime) {
+TEST_F(TransactionRouterTestWithDefaultSession, NonSnapshotReadConcernLevelsPreserveAfterOpTime) {
const auto opTime = repl::OpTime(Timestamp(10, 1), 2);
- repl::ReadConcernArgs::get(operationContext()) =
- repl::ReadConcernArgs(opTime, repl::ReadConcernLevel::kMajorityReadConcern);
+ TxnNumber txnNum{3};
+ for (auto rcIt : supportedNonSnapshotRCLevels) {
+ repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(opTime, rcIt.second);
- auto& txnRouter(*TransactionRouter::get(operationContext()));
- txnRouter.beginOrContinueTxn(operationContext(), kTxnNumber, true);
+ auto& txnRouter(*TransactionRouter::get(operationContext()));
+ txnRouter.beginOrContinueTxn(operationContext(), txnNum++, true /* startTransaction */);
- // Call setDefaultAtClusterTime to simulate real command execution.
- txnRouter.setDefaultAtClusterTime(operationContext());
+ // Call setDefaultAtClusterTime to simulate real command execution.
+ txnRouter.setDefaultAtClusterTime(operationContext());
- auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1,
- BSON("insert"
- << "test"));
- ASSERT_BSONOBJ_EQ(BSON("level"
- << "majority"
- << "afterOpTime"
- << opTime),
- newCmd["readConcern"].Obj());
+ auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1,
+ BSON("insert"
+ << "test"));
+ ASSERT_BSONOBJ_EQ(BSON("level" << rcIt.first << "afterOpTime" << opTime),
+ newCmd["readConcern"].Obj());
+ }
}
// Begins a transaction with snapshot level read concern and sets a default cluster time.
class TransactionRouterTestWithDefaultSessionAndStartedSnapshot
: public TransactionRouterTestWithDefaultSession {
protected:
+ const TxnNumber kTxnNumber = 10;
const BSONObj rcLatestInMemoryAtClusterTime = BSON("level"
<< "snapshot"
<< "atClusterTime"