diff options
author | Esha Maharishi <esha.maharishi@mongodb.com> | 2019-03-12 17:00:09 -0400 |
---|---|---|
committer | Esha Maharishi <esha.maharishi@mongodb.com> | 2019-03-13 19:10:21 -0400 |
commit | 88462aac153791dc07d129e510110295335663ac (patch) | |
tree | 6ee832e68413a6a1bdeed8c717b051af0cadacba | |
parent | 41eac7940607f4750f078d89962240bc88fa1359 (diff) | |
download | mongo-88462aac153791dc07d129e510110295335663ac.tar.gz |
SERVER-39877 Make router send commitTransaction directly to participants for read-only multi-shard transactions
-rw-r--r-- | buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml | 1 | ||||
-rw-r--r-- | jstests/sharding/txn_read_only_transactions.js | 321 | ||||
-rw-r--r-- | src/mongo/s/transaction_router.cpp | 68 | ||||
-rw-r--r-- | src/mongo/s/transaction_router.h | 6 | ||||
-rw-r--r-- | src/mongo/s/transaction_router_test.cpp | 124 |
5 files changed, 520 insertions, 0 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml index e02ff3d704b..6b57a4c6940 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml @@ -90,6 +90,7 @@ selector: - jstests/sharding/txn_two_phase_commit_killop.js - jstests/sharding/txn_two_phase_commit_recover_decision.js - jstests/sharding/txn_two_phase_commit_wait_for_majority_commit_after_stepup.js + - jstests/sharding/txn_read_only_transactions.js - jstests/sharding/txn_being_applied_to_secondary_cannot_be_killed.js - jstests/sharding/txn_writes_during_movechunk.js - jstests/sharding/update_sharded.js diff --git a/jstests/sharding/txn_read_only_transactions.js b/jstests/sharding/txn_read_only_transactions.js new file mode 100644 index 00000000000..f3235e06c02 --- /dev/null +++ b/jstests/sharding/txn_read_only_transactions.js @@ -0,0 +1,321 @@ +/** + * Tests that the appropriate commit path (single-shard, read-only, multi-shard) is taken for a + * variety of transaction types. + * + * Checks that the response formats are correct across each type for several scenarios, including + * no failures, a participant having failed over, a participant being unable to satisfy the client's + * writeConcern, and an invalid client writeConcern. + * + * @tags: [uses_transactions, uses_multi_shard_transaction] + */ + +(function() { + 'use strict'; + + load("jstests/libs/write_concern_util.js"); + load("jstests/sharding/libs/sharded_transactions_helpers.js"); + + const addTxnFields = function(command, lsid, txnNumber) { + const txnFields = { + lsid: lsid, + txnNumber: NumberLong(txnNumber), + stmtId: NumberInt(0), + autocommit: false, + }; + return Object.assign({}, command, txnFields); + }; + + const defaultCommitCommand = { + commitTransaction: 1, + writeConcern: {w: "majority", wtimeout: 3000} + }; + const noop = () => {}; + + const dbName = "test"; + const collName = "foo"; + const ns = dbName + "." + collName; + + let lsid = {id: UUID()}; + let txnNumber = 0; + + let st = new ShardingTest({ + rs0: {nodes: [{}, {rsConfig: {priority: 0}}]}, + rs1: {nodes: [{}, {rsConfig: {priority: 0}}]}, + config: 1, + other: {mongosOptions: {verbose: 3}}, + }); + + assert.commandWorked(st.s.adminCommand({enableSharding: dbName})); + assert.commandWorked(st.s.adminCommand({movePrimary: dbName, to: st.shard1.shardName})); + + // Create a "dummy" collection for doing noop writes to advance shard's last applied OpTimes. + assert.commandWorked(st.s.getDB(dbName).getCollection("dummy").insert({dummy: 1})); + + // Create a sharded collection with a chunk on each shard: + // shard0: [-inf, 0) + // shard1: [0, 10) + assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {_id: 1}})); + assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 0}})); + assert.commandWorked( + st.s.adminCommand({moveChunk: ns, find: {_id: -1}, to: st.shard0.shardName})); + + flushRoutersAndRefreshShardMetadata(st, {ns}); + + // For each transaction type, contains the list of statements for that type. + const transactionTypes = { + readOnlySingleShardSingleStatement: txnNumber => { + return [{ + find: collName, + filter: {_id: txnNumber}, + startTransaction: true, + }]; + }, + readOnlySingleShardMultiStatement: txnNumber => { + return [ + { + find: collName, + filter: {_id: txnNumber}, + startTransaction: true, + }, + {distinct: collName, key: "_id", query: {_id: txnNumber}}, + ]; + }, + readOnlyMultiShardSingleStatement: txnNumber => { + return [{find: collName, startTransaction: true}]; + }, + readOnlyMultiShardMultiStatement: txnNumber => { + return [ + { + find: collName, + filter: {_id: txnNumber}, + startTransaction: true, + }, + {distinct: collName, key: "_id", query: {_id: (txnNumber * -1)}}, + ]; + }, + writeOnlySingleShardSingleStatement: txnNumber => { + return [{ + insert: collName, + documents: [{_id: txnNumber}], + startTransaction: true, + }]; + }, + writeOnlySingleShardMultiStatement: txnNumber => { + return [ + { + insert: collName, + documents: [{_id: txnNumber}], + startTransaction: true, + }, + { + update: collName, + updates: [{q: {_id: txnNumber}, u: {$set: {updated: 1}}}], + } + ]; + }, + writeOnlyMultiShardSingleStatement: txnNumber => { + return [{ + insert: collName, + documents: [{_id: (txnNumber * -1)}, {_id: txnNumber}], + startTransaction: true, + }]; + }, + writeOnlyMultiShardMultiStatement: txnNumber => { + return [ + { + insert: collName, + documents: [{_id: txnNumber}], + startTransaction: true, + }, + { + insert: collName, + documents: [{_id: (txnNumber * -1)}], + } + ]; + }, + readWriteSingleShard: txnNumber => { + return [ + { + find: collName, + filter: {_id: txnNumber}, + startTransaction: true, + }, + { + insert: collName, + documents: [{_id: txnNumber}], + } + ]; + }, + readWriteMultiShard: txnNumber => { + return [ + { + find: collName, + filter: {_id: txnNumber}, + startTransaction: true, + }, + { + insert: collName, + documents: [{_id: (txnNumber * -1)}], + } + ]; + }, + }; + + const failureModes = { + noFailures: { + beforeStatements: noop, + beforeCommit: noop, + getCommitCommand: (lsid, txnNumber) => { + return addTxnFields(defaultCommitCommand, lsid, txnNumber); + }, + checkCommitResult: (res) => { + // Commit should return ok without writeConcern error + assert.commandWorked(res); + assert.eq(null, res.errorLabels); + }, + cleanUp: noop, + }, + participantStepsDownBeforeClientSendsCommit: { + beforeStatements: noop, + beforeCommit: () => { + // Participant primary steps down. + assert.commandWorked( + st.shard1.adminCommand({replSetStepDown: 1 /* stepDownSecs */, force: true})); + }, + getCommitCommand: (lsid, txnNumber) => { + return addTxnFields(defaultCommitCommand, lsid, txnNumber); + }, + checkCommitResult: (res) => { + // Commit should return NoSuchTransaction. + assert.commandFailedWithCode(res, ErrorCodes.NoSuchTransaction); + assert.eq(["TransientTransactionError"], res.errorLabels); + }, + cleanUp: () => { + st.rs1.awaitNodesAgreeOnPrimary(); + }, + }, + participantCannotMajorityCommitWritesClientSendsWriteConcernMajority: { + beforeStatements: () => { + // Participant cannot majority commit writes. + stopServerReplication(st.rs1.getSecondaries()); + + // Do a write on rs1 through the router outside the transaction to ensure the + // transaction will choose a read time that has not been majority committed. + assert.commandWorked(st.s.getDB(dbName).getCollection("dummy").insert({dummy: 1})); + }, + beforeCommit: noop, + getCommitCommand: (lsid, txnNumber) => { + return addTxnFields(defaultCommitCommand, lsid, txnNumber); + }, + checkCommitResult: (res) => { + // Commit should return ok with a writeConcernError with wtimeout. + assert.commandWorkedIgnoringWriteConcernErrors(res); + checkWriteConcernTimedOut(res); + assert.eq(null, res.errorLabels); + }, + cleanUp: () => { + restartServerReplication(st.rs1.getSecondaries()); + }, + }, + participantCannotMajorityCommitWritesClientSendsWriteConcern1: { + beforeStatements: () => { + // Participant cannot majority commit writes. + stopServerReplication(st.rs1.getSecondaries()); + + // Do a write on rs1 through the router outside the transaction to ensure the + // transaction will choose a read time that has not been majority committed. + assert.commandWorked(st.s.getDB(dbName).getCollection("dummy").insert({dummy: 1})); + }, + beforeCommit: noop, + getCommitCommand: (lsid, txnNumber) => { + return addTxnFields({commitTransaction: 1, writeConcern: {w: 1}}, lsid, txnNumber); + }, + checkCommitResult: (res) => { + // Commit should return ok without writeConcern error + assert.commandWorked(res); + assert.eq(null, res.errorLabels); + }, + cleanUp: () => { + restartServerReplication(st.rs1.getSecondaries()); + }, + }, + clientSendsInvalidWriteConcernOnCommit: { + beforeStatements: noop, + beforeCommit: noop, + getCommitCommand: (lsid, txnNumber) => { + // Client sends invalid writeConcern on commit. + return addTxnFields( + {commitTransaction: 1, writeConcern: {w: "invalid"}}, lsid, txnNumber); + }, + checkCommitResult: (res) => { + // Commit should return ok with writeConcernError without wtimeout. + assert.commandWorkedIgnoringWriteConcernErrors(res); + assertWriteConcernError(res); + assert.eq(ErrorCodes.UnknownReplWriteConcern, res.writeConcernError.code); + assert.eq(null, res.writeConcernError.errInfo); // errInfo only set for wtimeout + assert.eq(null, res.errorLabels); + }, + cleanUp: noop, + }, + }; + + for (const failureModeName in failureModes) { + for (const type in transactionTypes) { + // TODO (SERVER-37881): Unblacklist these test cases once the coordinator times out + // waiting for votes. + if (failureModeName.includes("participantCannotMajorityCommitWrites") && + (type === "writeOnlyMultiShardSingleStatement" || + type === "writeOnlyMultiShardMultiStatement" || type === "readWriteMultiShard")) { + jsTest.log( + `Testing ${failureModeName} with ${type} is skipped until SERVER-37881 is implemented`); + continue; + } + + txnNumber++; + jsTest.log(`Testing ${failureModeName} with ${type} at txnNumber ${txnNumber}`); + + const failureMode = failureModes[failureModeName]; + + // Run the statements. + failureMode.beforeStatements(); + transactionTypes[type](txnNumber).forEach(command => { + assert.commandWorked( + st.s.getDB(dbName).runCommand(addTxnFields(command, lsid, txnNumber))); + }); + + // Run commit. + failureMode.beforeCommit(); + const res = st.s.adminCommand(failureMode.getCommitCommand(lsid, txnNumber)); + print(`Response for ${failureModeName} for ${type}: ` + tojson(res)); + + // Check the commit response. + failureMode.checkCommitResult(res); + + if (type.includes("SingleShard")) { + assert.eq(1, + rawMongoProgramOutput() + .match(new RegExp('Committing single-shard transaction')) + .length); + } else if (type.includes("readOnly")) { + assert.eq(1, + rawMongoProgramOutput() + .match(new RegExp('Committing read-only transaction')) + .length); + } else if (type.includes("MultiShard")) { + assert.eq(1, + rawMongoProgramOutput() + .match(new RegExp('Committing multi-shard transaction')) + .length); + } else { + assert(false, `Unknown transaction type: ${type}`); + } + + clearRawMongoProgramOutput(); + + failureMode.cleanUp(); + } + } + + st.stop(); + +})(); diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp index 754dc91094f..b3bba1ac2db 100644 --- a/src/mongo/s/transaction_router.cpp +++ b/src/mongo/s/transaction_router.cpp @@ -42,9 +42,12 @@ #include "mongo/db/logical_session_id.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/session_catalog.h" +#include "mongo/executor/task_executor_pool.h" #include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/s/async_requests_sender.h" #include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/grid.h" +#include "mongo/s/multi_statement_transaction_requests_sender.h" #include "mongo/util/assert_util.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" @@ -635,6 +638,55 @@ BSONObj TransactionRouter::_commitSingleShardTransaction(OperationContext* opCtx .response; } +BSONObj TransactionRouter::_commitReadOnlyTransaction(OperationContext* opCtx) { + // Assemble requests. + std::vector<AsyncRequestsSender::Request> requests; + for (const auto& participant : _participants) { + CommitTransaction commitCmd; + commitCmd.setDbName(NamespaceString::kAdminDb); + const auto commitCmdObj = commitCmd.toBSON( + BSON(WriteConcernOptions::kWriteConcernField << opCtx->getWriteConcern().toBSON())); + requests.emplace_back(participant.first, commitCmdObj); + } + + LOG(0) << txnIdToString() << " Committing read-only transaction on " << requests.size() + << " shards"; + + // Send the requests. + MultiStatementTransactionRequestsSender ars( + opCtx, + Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(), + NamespaceString::kAdminDb, + requests, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + Shard::RetryPolicy::kIdempotent); + + // Receive the responses. + while (!ars.done()) { + auto response = ars.next(); + + uassertStatusOK(response.swResponse); + const auto result = response.swResponse.getValue().data; + + // If any shard returned an error, return the error immediately. + const auto commandStatus = getStatusFromCommandResult(result); + if (!commandStatus.isOK()) { + return result; + } + + // If any participant had a writeConcern error, return the participant's writeConcern + // error immediately. + const auto writeConcernStatus = getWriteConcernStatusFromCommandResult(result); + if (!writeConcernStatus.isOK()) { + return result; + } + } + + // If all the responses were ok, return empty BSON, which the commitTransaction command will + // interpret as success. + return BSONObj(); +} + BSONObj TransactionRouter::_commitMultiShardTransaction(OperationContext* opCtx) { invariant(_coordinatorId); auto coordinatorIter = _participants.find(*_coordinatorId); @@ -731,10 +783,26 @@ BSONObj TransactionRouter::commitTransaction( return BSON("ok" << 1); } + bool allParticipantsReadOnly = true; + for (const auto& participant : _participants) { + uassert(ErrorCodes::NoSuchTransaction, + "Can't send commit unless all previous statements were successful", + participant.second.readOnly != Participant::ReadOnly::kUnset); + if (participant.second.readOnly == Participant::ReadOnly::kNotReadOnly) { + allParticipantsReadOnly = false; + } + } + + // Make the single-shard commit path take precedence. The read-only optimization is only to skip + // two-phase commit for a read-only multi-shard transaction. if (_participants.size() == 1) { return _commitSingleShardTransaction(opCtx); } + if (allParticipantsReadOnly) { + return _commitReadOnlyTransaction(opCtx); + } + return _commitMultiShardTransaction(opCtx); } diff --git a/src/mongo/s/transaction_router.h b/src/mongo/s/transaction_router.h index ce349fe40e2..e885bf77139 100644 --- a/src/mongo/s/transaction_router.h +++ b/src/mongo/s/transaction_router.h @@ -283,6 +283,12 @@ private: */ BSONObj _commitSingleShardTransaction(OperationContext* opCtx); + /** + * Skips explicit commit and instead waits for participants' read Timestamps to reach the level + * of durability specified by the writeConcern on 'opCtx'. + */ + BSONObj _commitReadOnlyTransaction(OperationContext* opCtx); + BSONObj _commitWithRecoveryToken(OperationContext* opCtx, const TxnRecoveryToken& recoveryToken); diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp index 0c9764180cf..53d04a4e68d 100644 --- a/src/mongo/s/transaction_router_test.cpp +++ b/src/mongo/s/transaction_router_test.cpp @@ -703,6 +703,38 @@ void checkWriteConcern(const BSONObj& cmdObj, const WriteConcernOptions& expecte } TEST_F(TransactionRouterTestWithDefaultSession, + SendCommitDirectlyForSingleParticipantThatIsReadOnly) { + TxnNumber txnNum{3}; + + auto& txnRouter(*TransactionRouter::get(operationContext())); + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.setDefaultAtClusterTime(operationContext()); + + txnRouter.attachTxnFieldsIfNeeded(shard1, {}); + txnRouter.processParticipantResponse(shard1, kOkReadOnlyTrueResponse); + + TxnRecoveryToken recoveryToken; + recoveryToken.setShardId(shard1); + auto future = + launchAsync([&] { txnRouter.commitTransaction(operationContext(), recoveryToken); }); + + onCommand([&](const RemoteCommandRequest& request) { + ASSERT_EQ(hostAndPort1, request.target); + ASSERT_EQ("admin", request.dbname); + + auto cmdName = request.cmdObj.firstElement().fieldNameStringData(); + ASSERT_EQ(cmdName, "commitTransaction"); + + checkSessionDetails(request.cmdObj, getSessionId(), txnNum, true); + + return BSON("ok" << 1); + }); + + future.timed_get(kFutureTimeout); +} + +TEST_F(TransactionRouterTestWithDefaultSession, SendCommitDirectlyForSingleParticipantThatDidAWrite) { TxnNumber txnNum{3}; @@ -734,6 +766,98 @@ TEST_F(TransactionRouterTestWithDefaultSession, } TEST_F(TransactionRouterTestWithDefaultSession, + SendCommitDirectlyForMultipleParticipantsThatAreAllReadOnly) { + TxnNumber txnNum{3}; + + auto& txnRouter(*TransactionRouter::get(operationContext())); + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.setDefaultAtClusterTime(operationContext()); + + txnRouter.attachTxnFieldsIfNeeded(shard1, {}); + txnRouter.attachTxnFieldsIfNeeded(shard2, {}); + txnRouter.processParticipantResponse(shard1, kOkReadOnlyTrueResponse); + txnRouter.processParticipantResponse(shard2, kOkReadOnlyTrueResponse); + + TxnRecoveryToken recoveryToken; + recoveryToken.setShardId(shard1); + auto future = + launchAsync([&] { txnRouter.commitTransaction(operationContext(), recoveryToken); }); + + // The requests are scheduled in a nondeterministic order, since they are scheduled by iterating + // over the participant list, which is stored as a hash map. So, just check that all expected + // hosts and ports were targeted at the end. + std::set<HostAndPort> expectedHostAndPorts{hostAndPort1, hostAndPort2}; + std::set<HostAndPort> seenHostAndPorts; + for (int i = 0; i < 2; i++) { + onCommand([&](const RemoteCommandRequest& request) { + seenHostAndPorts.insert(request.target); + + ASSERT_EQ("admin", request.dbname); + + auto cmdName = request.cmdObj.firstElement().fieldNameStringData(); + ASSERT_EQ(cmdName, "commitTransaction"); + + // The shard with hostAndPort1 is expected to be the coordinator. + checkSessionDetails( + request.cmdObj, getSessionId(), txnNum, (request.target == hostAndPort1)); + + return kOkReadOnlyTrueResponse; + }); + } + + future.timed_get(kFutureTimeout); + ASSERT(expectedHostAndPorts == seenHostAndPorts); +} + +TEST_F(TransactionRouterTestWithDefaultSession, + SendCoordinateCommitForMultipleParticipantsOnlyOneDidAWrite) { + TxnNumber txnNum{3}; + + auto& txnRouter(*TransactionRouter::get(operationContext())); + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.setDefaultAtClusterTime(operationContext()); + + txnRouter.attachTxnFieldsIfNeeded(shard1, {}); + txnRouter.attachTxnFieldsIfNeeded(shard2, {}); + txnRouter.processParticipantResponse(shard1, kOkReadOnlyTrueResponse); + txnRouter.processParticipantResponse(shard2, kOkReadOnlyFalseResponse); + + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit); + + TxnRecoveryToken recoveryToken; + recoveryToken.setShardId(shard1); + auto future = + launchAsync([&] { txnRouter.commitTransaction(operationContext(), recoveryToken); }); + + onCommand([&](const RemoteCommandRequest& request) { + ASSERT_EQ(hostAndPort1, request.target); + ASSERT_EQ("admin", request.dbname); + + auto cmdName = request.cmdObj.firstElement().fieldNameStringData(); + ASSERT_EQ(cmdName, "coordinateCommitTransaction"); + + std::set<std::string> expectedParticipants = {shard1.toString(), shard2.toString()}; + auto participantElements = request.cmdObj["participants"].Array(); + ASSERT_EQ(expectedParticipants.size(), participantElements.size()); + + for (const auto& element : participantElements) { + auto shardId = element["shardId"].valuestr(); + ASSERT_EQ(1ull, expectedParticipants.count(shardId)); + expectedParticipants.erase(shardId); + } + + checkSessionDetails(request.cmdObj, getSessionId(), txnNum, true); + + return BSON("ok" << 1); + }); + + future.timed_get(kFutureTimeout); +} + +TEST_F(TransactionRouterTestWithDefaultSession, SendCoordinateCommitForMultipleParticipantsAllDidWrites) { TxnNumber txnNum{3}; |