diff options
-rw-r--r-- | jstests/core/txns/majority_read_concern.js | 33 | ||||
-rw-r--r-- | jstests/core/txns/read_concerns.js | 53 | ||||
-rw-r--r-- | jstests/sharding/transactions_read_concerns.js | 4 | ||||
-rw-r--r-- | src/mongo/s/transaction_router.cpp | 11 | ||||
-rw-r--r-- | src/mongo/s/transaction_router_test.cpp | 186 |
5 files changed, 154 insertions, 133 deletions
diff --git a/jstests/core/txns/majority_read_concern.js b/jstests/core/txns/majority_read_concern.js deleted file mode 100644 index e21f8b8570e..00000000000 --- a/jstests/core/txns/majority_read_concern.js +++ /dev/null @@ -1,33 +0,0 @@ -// Verifies transactions can run with majority read concern, with or without afterClusterTime. -// -// @tags: [uses_transactions] -(function() { - "use strict"; - - const dbName = "test"; - const collName = "majority_read_concern"; - - function runTest(sessionOptions) { - jsTestLog("Testing transactions with majority read concern and sessionOptions: " + - tojson(sessionOptions)); - - db.getSiblingDB(dbName).runCommand({drop: collName, writeConcern: {w: "majority"}}); - - const session = db.getMongo().startSession(sessionOptions); - const sessionColl = session.getDatabase(dbName)[collName]; - - // Set up the collection. - assert.writeOK(sessionColl.insert({_id: 0}, {writeConcern: {w: "majority"}})); - - // Assert a transaction can run with the given session options and read concern level - // majority. - session.startTransaction({readConcern: {level: "majority"}}); - assert.eq(1, sessionColl.find().itcount()); - assert.commandWorked(session.commitTransaction_forTesting()); - - session.endSession(); - } - - runTest({causalConsistency: false}); - runTest({causalConsistency: true}); -}()); diff --git a/jstests/core/txns/read_concerns.js b/jstests/core/txns/read_concerns.js new file mode 100644 index 00000000000..0256e49eca9 --- /dev/null +++ b/jstests/core/txns/read_concerns.js @@ -0,0 +1,53 @@ +// Verifies which read concern levels transactions support, with and without afterClusterTime. +// +// @tags: [uses_transactions] +(function() { + "use strict"; + + const dbName = "test"; + const collName = "supported_read_concern_levels"; + + function runTest(level, sessionOptions, supported) { + jsTestLog("Testing transactions with read concern level: " + level + + " and sessionOptions: " + tojson(sessionOptions)); + + db.getSiblingDB(dbName).runCommand({drop: collName, writeConcern: {w: "majority"}}); + + const session = db.getMongo().startSession(sessionOptions); + const sessionDB = session.getDatabase(dbName); + const sessionColl = sessionDB[collName]; + + // Set up the collection. + assert.writeOK(sessionColl.insert({_id: 0}, {writeConcern: {w: "majority"}})); + + session.startTransaction({readConcern: {level: level}}); + const res = sessionDB.runCommand({find: collName}); + if (supported) { + assert.commandWorked(res, + "expected success, read concern level: " + level + + ", sessionOptions: " + tojson(sessionOptions)); + assert.commandWorked(session.commitTransaction_forTesting()); + } else { + assert.commandFailedWithCode(res, + ErrorCodes.InvalidOptions, + "expected failure, read concern level: " + level + + ", sessionOptions: " + tojson(sessionOptions)); + assert.commandFailedWithCode(session.abortTransaction_forTesting(), + ErrorCodes.NoSuchTransaction); + } + + session.endSession(); + } + + const kSupportedLevels = ["local", "majority", "snapshot"]; + for (let level of kSupportedLevels) { + runTest(level, {causalConsistency: false}, true /*supported*/); + runTest(level, {causalConsistency: true}, true /*supported*/); + } + + const kUnsupportedLevels = ["available", "linearizable"]; + for (let level of kUnsupportedLevels) { + runTest(level, {causalConsistency: false}, false /*supported*/); + runTest(level, {causalConsistency: true}, false /*supported*/); + } +}()); diff --git a/jstests/sharding/transactions_read_concerns.js b/jstests/sharding/transactions_read_concerns.js index b733899f086..07b754706c0 100644 --- a/jstests/sharding/transactions_read_concerns.js +++ b/jstests/sharding/transactions_read_concerns.js @@ -1,4 +1,4 @@ -// Verifies basic sharded transaction behavior with read concern level majority and snapshot. +// Verifies basic sharded transaction behavior with the supported read concern levels. // // @tags: [ // requires_find_command, @@ -65,7 +65,7 @@ assert.writeOK(sessionDB[collName].remove({_id: 5})); } - const kAllowedReadConcernLevels = ["majority", "snapshot"]; + const kAllowedReadConcernLevels = ["local", "majority", "snapshot"]; for (let readConcernLevel of kAllowedReadConcernLevels) { runTest(st, {level: readConcernLevel}, {causalConsistency: false}); runTest(st, {level: readConcernLevel}, {causalConsistency: true}); diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp index 3d935768490..01e4b18abca 100644 --- a/src/mongo/s/transaction_router.cpp +++ b/src/mongo/s/transaction_router.cpp @@ -146,6 +146,12 @@ BSONObjBuilder appendFieldsForStartTransaction(BSONObj cmd, const StringMap<int> alwaysRetryableCmds = { {"aggregate", 1}, {"distinct", 1}, {"find", 1}, {"getMore", 1}, {"killCursors", 1}}; +bool isReadConcernLevelAllowedInTransaction(repl::ReadConcernLevel readConcernLevel) { + return readConcernLevel == repl::ReadConcernLevel::kSnapshotReadConcern || + readConcernLevel == repl::ReadConcernLevel::kMajorityReadConcern || + readConcernLevel == repl::ReadConcernLevel::kLocalReadConcern; +} + } // unnamed namespace TransactionRouter::Participant::Participant(bool isCoordinator, @@ -477,9 +483,8 @@ void TransactionRouter::beginOrContinueTxn(OperationContext* opCtx, } else { uassert(ErrorCodes::InvalidOptions, "The first command in a transaction cannot specify a readConcern level other " - "than snapshot or majority", - readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern || - readConcernArgs.getLevel() == repl::ReadConcernLevel::kMajorityReadConcern); + "than local, majority, or snapshot", + isReadConcernLevelAllowedInTransaction(readConcernArgs.getLevel())); } _readConcernArgs = readConcernArgs; } else { diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp index 8f845aaae59..5037c4f9769 100644 --- a/src/mongo/s/transaction_router_test.cpp +++ b/src/mongo/s/transaction_router_test.cpp @@ -47,7 +47,6 @@ using executor::RemoteCommandRequest; class TransactionRouterTest : public ShardingTestFixture { protected: const LogicalTime kInMemoryLogicalTime = LogicalTime(Timestamp(3, 1)); - const TxnNumber kTxnNumber = 10; const HostAndPort kTestConfigShardHost = HostAndPort("FakeConfigHost", 12345); @@ -59,6 +58,14 @@ protected: const ShardId shard3 = ShardId("shard3"); + const StringMap<repl::ReadConcernLevel> supportedNonSnapshotRCLevels = { + {"local", repl::ReadConcernLevel::kLocalReadConcern}, + {"majority", repl::ReadConcernLevel::kMajorityReadConcern}}; + + const std::vector<repl::ReadConcernLevel> unsupportedRCLevels = { + repl::ReadConcernLevel::kAvailableReadConcern, + repl::ReadConcernLevel::kLinearizableReadConcern}; + void setUp() override { ShardingTestFixture::setUp(); configTargeter()->setFindHostReturnValue(kTestConfigShardHost); @@ -534,13 +541,8 @@ TEST_F(TransactionRouterTestWithDefaultSession, ASSERT_BSONOBJ_EQ(expectedNewObj, newCmd); } -TEST_F(TransactionRouterTestWithDefaultSession, - CannotUpconvertIfLevelOtherThanSnapshotOrMajorityWasGiven) { - auto readConcernLevels = {repl::ReadConcernLevel::kLocalReadConcern, - repl::ReadConcernLevel::kLinearizableReadConcern, - repl::ReadConcernLevel::kAvailableReadConcern}; - - for (auto readConcernLevel : readConcernLevels) { +TEST_F(TransactionRouterTestWithDefaultSession, RejectUnsupportedReadConcernLevels) { + for (auto readConcernLevel : unsupportedRCLevels) { repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(readConcernLevel); TxnNumber txnNum{3}; @@ -552,13 +554,8 @@ TEST_F(TransactionRouterTestWithDefaultSession, } } -TEST_F(TransactionRouterTestWithDefaultSession, - CannotUpconvertIfLevelOtherThanSnapshotOrMajorityWasGivenWithAfterClusterTime) { - auto readConcernLevels = {repl::ReadConcernLevel::kLocalReadConcern, - repl::ReadConcernLevel::kLinearizableReadConcern, - repl::ReadConcernLevel::kAvailableReadConcern}; - - for (auto readConcernLevel : readConcernLevels) { +TEST_F(TransactionRouterTestWithDefaultSession, RejectUnsupportedLevelsWithAfterClusterTime) { + for (auto readConcernLevel : unsupportedRCLevels) { repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(LogicalTime(Timestamp(10, 1)), readConcernLevel); @@ -571,12 +568,8 @@ TEST_F(TransactionRouterTestWithDefaultSession, } } -TEST_F(TransactionRouterTestWithDefaultSession, CannotUpconvertWithAfterOpTime) { - auto readConcernLevels = {repl::ReadConcernLevel::kLocalReadConcern, - repl::ReadConcernLevel::kLinearizableReadConcern, - repl::ReadConcernLevel::kAvailableReadConcern}; - - for (auto readConcernLevel : readConcernLevels) { +TEST_F(TransactionRouterTestWithDefaultSession, RejectUnsupportedLevelsWithAfterOpTime) { + for (auto readConcernLevel : unsupportedRCLevels) { repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(repl::OpTime(Timestamp(10, 1), 2), readConcernLevel); @@ -1401,108 +1394,111 @@ TEST_F(TransactionRouterTestWithDefaultSession, ASSERT_BSONOBJ_EQ(expectedReadConcern, newCmd["readConcern"].Obj()); } -TEST_F(TransactionRouterTestWithDefaultSession, MajorityReadConcernHasNoAtClusterTime) { - repl::ReadConcernArgs::get(operationContext()) = - repl::ReadConcernArgs(repl::ReadConcernLevel::kMajorityReadConcern); +TEST_F(TransactionRouterTestWithDefaultSession, NonSnapshotReadConcernHasNoAtClusterTime) { + TxnNumber txnNum{3}; + for (auto rcIt : supportedNonSnapshotRCLevels) { + repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(rcIt.second); - auto& txnRouter(*TransactionRouter::get(operationContext())); - txnRouter.beginOrContinueTxn(operationContext(), kTxnNumber, true); + auto& txnRouter(*TransactionRouter::get(operationContext())); + txnRouter.beginOrContinueTxn(operationContext(), txnNum++, true); - // No atClusterTime is placed on the router by default. - ASSERT_FALSE(txnRouter.getAtClusterTime()); + // No atClusterTime is placed on the router by default. + ASSERT_FALSE(txnRouter.getAtClusterTime()); - // Can't compute and set an atClusterTime. - txnRouter.setDefaultAtClusterTime(operationContext()); - ASSERT_FALSE(txnRouter.getAtClusterTime()); + // Can't compute and set an atClusterTime. + txnRouter.setDefaultAtClusterTime(operationContext()); + ASSERT_FALSE(txnRouter.getAtClusterTime()); - txnRouter.computeAndSetAtClusterTime( - operationContext(), true, {shard1}, NamespaceString("test.coll"), BSONObj(), BSONObj()); - ASSERT_FALSE(txnRouter.getAtClusterTime()); + txnRouter.computeAndSetAtClusterTime( + operationContext(), true, {shard1}, NamespaceString("test.coll"), BSONObj(), BSONObj()); + ASSERT_FALSE(txnRouter.getAtClusterTime()); - txnRouter.computeAndSetAtClusterTimeForUnsharded(operationContext(), shard1); - ASSERT_FALSE(txnRouter.getAtClusterTime()); + txnRouter.computeAndSetAtClusterTimeForUnsharded(operationContext(), shard1); + ASSERT_FALSE(txnRouter.getAtClusterTime()); - // Can't continue on snapshot errors. - ASSERT_THROWS_CODE( - txnRouter.onSnapshotError(), AssertionException, ErrorCodes::NoSuchTransaction); + // Can't continue on snapshot errors. + ASSERT_THROWS_CODE( + txnRouter.onSnapshotError(), AssertionException, ErrorCodes::NoSuchTransaction); + } } -TEST_F(TransactionRouterTestWithDefaultSession, AttachesMajorityReadConcernToNewParticipants) { - repl::ReadConcernArgs::get(operationContext()) = - repl::ReadConcernArgs(repl::ReadConcernLevel::kMajorityReadConcern); - - auto& txnRouter(*TransactionRouter::get(operationContext())); - txnRouter.beginOrContinueTxn(operationContext(), kTxnNumber, true); - - const BSONObj rcMajority = BSON("level" - << "majority"); - - auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1, - BSON("insert" - << "test")); - ASSERT_BSONOBJ_EQ(rcMajority, newCmd["readConcern"].Obj()); - +TEST_F(TransactionRouterTestWithDefaultSession, + SupportedNonSnapshotReadConcernLevelsArePassedThrough) { + TxnNumber txnNum{3}; + for (auto rcIt : supportedNonSnapshotRCLevels) { + repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(rcIt.second); - // Only attached on first command to a participant. - newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1, - BSON("insert" - << "test")); - ASSERT(newCmd["readConcern"].eoo()); + auto& txnRouter(*TransactionRouter::get(operationContext())); + txnRouter.beginOrContinueTxn(operationContext(), txnNum++, true /* startTransaction */); + txnRouter.setDefaultAtClusterTime(operationContext()); - // Attached for new participants after the first one. - newCmd = txnRouter.attachTxnFieldsIfNeeded(shard2, - BSON("insert" - << "test")); - ASSERT_BSONOBJ_EQ(rcMajority, newCmd["readConcern"].Obj()); + const BSONObj expectedRC = BSON("level" << rcIt.first); + auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1, + BSON("insert" + << "test")); + ASSERT_BSONOBJ_EQ(expectedRC, newCmd["readConcern"].Obj()); + + // Only attached on first command to a participant. + newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1, + BSON("insert" + << "test")); + ASSERT(newCmd["readConcern"].eoo()); + + // Attached for new participants after the first one. + newCmd = txnRouter.attachTxnFieldsIfNeeded(shard2, + BSON("insert" + << "test")); + ASSERT_BSONOBJ_EQ(expectedRC, newCmd["readConcern"].Obj()); + } } + TEST_F(TransactionRouterTestWithDefaultSession, - AttachingMajorityReadConcernPreservesAfterClusterTime) { + NonSnapshotReadConcernLevelsPreserveAfterClusterTime) { const auto clusterTime = LogicalTime(Timestamp(10, 1)); - repl::ReadConcernArgs::get(operationContext()) = - repl::ReadConcernArgs(clusterTime, repl::ReadConcernLevel::kMajorityReadConcern); - - auto& txnRouter(*TransactionRouter::get(operationContext())); - txnRouter.beginOrContinueTxn(operationContext(), kTxnNumber, true); + TxnNumber txnNum{3}; + for (auto rcIt : supportedNonSnapshotRCLevels) { + repl::ReadConcernArgs::get(operationContext()) = + repl::ReadConcernArgs(clusterTime, rcIt.second); - // Call setDefaultAtClusterTime to simulate real command execution. - txnRouter.setDefaultAtClusterTime(operationContext()); + auto& txnRouter(*TransactionRouter::get(operationContext())); + txnRouter.beginOrContinueTxn(operationContext(), txnNum++, true /* startTransaction */); + txnRouter.setDefaultAtClusterTime(operationContext()); - auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1, - BSON("insert" - << "test")); - ASSERT_BSONOBJ_EQ(BSON("level" - << "majority" - << "afterClusterTime" - << clusterTime.asTimestamp()), - newCmd["readConcern"].Obj()); + auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1, + BSON("insert" + << "test")); + ASSERT_BSONOBJ_EQ( + BSON("level" << rcIt.first << "afterClusterTime" << clusterTime.asTimestamp()), + newCmd["readConcern"].Obj()); + } } -TEST_F(TransactionRouterTestWithDefaultSession, AttachingMajorityReadConcernPreservesAfterOpTime) { +TEST_F(TransactionRouterTestWithDefaultSession, NonSnapshotReadConcernLevelsPreserveAfterOpTime) { const auto opTime = repl::OpTime(Timestamp(10, 1), 2); - repl::ReadConcernArgs::get(operationContext()) = - repl::ReadConcernArgs(opTime, repl::ReadConcernLevel::kMajorityReadConcern); + TxnNumber txnNum{3}; + for (auto rcIt : supportedNonSnapshotRCLevels) { + repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(opTime, rcIt.second); - auto& txnRouter(*TransactionRouter::get(operationContext())); - txnRouter.beginOrContinueTxn(operationContext(), kTxnNumber, true); + auto& txnRouter(*TransactionRouter::get(operationContext())); + txnRouter.beginOrContinueTxn(operationContext(), txnNum++, true /* startTransaction */); - // Call setDefaultAtClusterTime to simulate real command execution. - txnRouter.setDefaultAtClusterTime(operationContext()); + // Call setDefaultAtClusterTime to simulate real command execution. + txnRouter.setDefaultAtClusterTime(operationContext()); - auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1, - BSON("insert" - << "test")); - ASSERT_BSONOBJ_EQ(BSON("level" - << "majority" - << "afterOpTime" - << opTime), - newCmd["readConcern"].Obj()); + auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1, + BSON("insert" + << "test")); + ASSERT_BSONOBJ_EQ(BSON("level" << rcIt.first << "afterOpTime" << opTime), + newCmd["readConcern"].Obj()); + } } // Begins a transaction with snapshot level read concern and sets a default cluster time. class TransactionRouterTestWithDefaultSessionAndStartedSnapshot : public TransactionRouterTestWithDefaultSession { protected: + const TxnNumber kTxnNumber = 10; const BSONObj rcLatestInMemoryAtClusterTime = BSON("level" << "snapshot" << "atClusterTime" |