diff options
author | Jack Mulrow <jack.mulrow@mongodb.com> | 2018-09-25 18:27:47 -0400 |
---|---|---|
committer | Jack Mulrow <jack.mulrow@mongodb.com> | 2018-10-19 17:43:02 -0400 |
commit | 9f363b489585124afa1e26412e19f6728763e1ad (patch) | |
tree | 58421f03eb2679d4d55174f8b32611731aed7e35 | |
parent | 10934947f4a0e9fd83c256ed297f3a0a73c4d971 (diff) | |
download | mongo-9f363b489585124afa1e26412e19f6728763e1ad.tar.gz |
SERVER-37349 Later statements in a transaction should target shards using the global read timestamp
22 files changed, 382 insertions, 181 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml index b832285d1a6..71ae75d7cef 100644 --- a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml +++ b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml @@ -177,6 +177,10 @@ selector: # - jstests/sharding/remove1.js # - jstests/sharding/remove2.js + # Moves a chunk before continuing a transaction, which can lead to snapshot errors if the + # CSRS failovers are sufficiently slow. + - jstests/sharding/transactions_target_at_point_in_time.js + executor: config: shell_options: diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml index 30dd78acf48..45d1718ce52 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml @@ -51,6 +51,7 @@ selector: - jstests/sharding/transactions_snapshot_errors_subsequent_statements.js - jstests/sharding/transactions_stale_database_version_errors.js - jstests/sharding/transactions_stale_shard_version_errors.js + - jstests/sharding/transactions_target_at_point_in_time.js - jstests/sharding/transactions_view_resolution.js - jstests/sharding/txn_agg.js - jstests/sharding/txn_basic_two_phase_commit.js diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml index 9bee9c8a4ed..a0ae78a83a8 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml @@ -366,6 +366,7 @@ selector: - jstests/sharding/transactions_snapshot_errors_subsequent_statements.js - jstests/sharding/transactions_stale_database_version_errors.js - jstests/sharding/transactions_stale_shard_version_errors.js + - jstests/sharding/transactions_target_at_point_in_time.js - jstests/sharding/transactions_view_resolution.js - jstests/sharding/txn_agg.js - jstests/sharding/txn_basic_two_phase_commit.js diff --git a/buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2 b/buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2 index aaedbeeb4f9..783ceda5d6c 100644 --- a/buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2 +++ b/buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2 @@ -61,6 +61,7 @@ selector: - jstests/sharding/transactions_snapshot_errors_subsequent_statements.js - jstests/sharding/transactions_stale_database_version_errors.js - jstests/sharding/transactions_stale_shard_version_errors.js + - jstests/sharding/transactions_target_at_point_in_time.js - jstests/sharding/transactions_view_resolution.js - jstests/sharding/txn_agg.js - jstests/sharding/txn_basic_two_phase_commit.js diff --git a/jstests/sharding/transactions_target_at_point_in_time.js b/jstests/sharding/transactions_target_at_point_in_time.js new file mode 100644 index 00000000000..8422c50160b --- /dev/null +++ b/jstests/sharding/transactions_target_at_point_in_time.js @@ -0,0 +1,118 @@ +// Verifies mongos uses a versioned routing table to target subsequent requests in transactions with +// snapshot level read concern. +// +// @tags: [ +// requires_find_command, +// requires_sharding, +// uses_multi_shard_transaction, +// uses_transactions, +// ] +(function() { + "use strict"; + + load("jstests/sharding/libs/sharded_transactions_helpers.js"); + + function expectChunks(st, ns, chunks) { + for (let i = 0; i < chunks.length; i++) { + assert.eq(chunks[i], + st.s.getDB("config").chunks.count({ns: ns, shard: st["shard" + i].shardName}), + "unexpected number of chunks on shard " + i); + } + } + + const dbName = "test"; + const collName = "foo"; + const ns = dbName + '.' + collName; + + const st = new ShardingTest({shards: 3, mongos: 1, config: 1}); + + // Set up one sharded collection with 2 chunks, both on the primary shard. + + assert.writeOK(st.s.getDB(dbName)[collName].insert({_id: -5}, {writeConcern: {w: "majority"}})); + assert.writeOK(st.s.getDB(dbName)[collName].insert({_id: 5}, {writeConcern: {w: "majority"}})); + + assert.commandWorked(st.s.adminCommand({enableSharding: dbName})); + st.ensurePrimaryShard(dbName, st.shard0.shardName); + + assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {_id: 1}})); + assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 0}})); + + expectChunks(st, ns, [2, 0, 0]); + + // Temporarily move a chunk to Shard2, to avoid picking a global read timestamp before the + // sharding metadata cache collections are created. + assert.commandWorked( + st.s.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard2.shardName})); + + assert.commandWorked( + st.s.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard1.shardName})); + expectChunks(st, ns, [1, 1, 0]); + + // First command targets the first chunk, the second command targets the second chunk. + const kCommandTestCases = [ + { + name: "aggregate", + commandFuncs: [ + (coll) => coll.aggregate({$match: {_id: -5}}).itcount(), + (coll) => coll.aggregate({$match: {_id: 5}}).itcount(), + ] + }, + { + name: "distinct", + commandFuncs: [ + (coll) => coll.distinct("_id", {_id: -5}).length, + (coll) => coll.distinct("_id", {_id: 5}).length, + ] + }, + { + name: "find", + commandFuncs: [ + (coll) => coll.find({_id: -5}).itcount(), + (coll) => coll.find({_id: 5}).itcount(), + ] + }, + // TODO SERVER-37350: Verify writes to chunks a shard no longer owns are rejected. + ]; + + function runTest(testCase) { + const cmdName = testCase.name; + const targetChunk1Func = testCase.commandFuncs[0]; + const targetChunk2Func = testCase.commandFuncs[1]; + + jsTestLog("Testing " + cmdName); + + expectChunks(st, ns, [1, 1, 0]); + + const session = st.s.startSession(); + const sessionDB = session.getDatabase(dbName); + const sessionColl = sessionDB[collName]; + + session.startTransaction({readConcern: {level: "snapshot"}}); + + // Start a transaction on Shard0 which will select and pin a global read timestamp. + assert.eq(targetChunk1Func(sessionColl), + 1, + "expected to find document in first chunk, cmd: " + cmdName); + + // Move a chunk from Shard1 to Shard2 outside of the transaction. This will happen at a + // later logical time than the transaction's read timestamp. + assert.commandWorked( + st.s.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard2.shardName})); + + // Target a document in the chunk that was moved. The router should get a stale shard + // version from Shard1 then retry on Shard1 and see the document. + assert.eq(targetChunk2Func(sessionColl), + 1, + "expected to find document in second chunk, cmd: " + cmdName); + + session.commitTransaction(); + + // Move the chunk back to Shard1 for the next iteration. + assert.commandWorked( + st.s.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard1.shardName})); + } + + kCommandTestCases.forEach(runTest); + + st.stop(); +})(); diff --git a/src/mongo/s/at_cluster_time_util.cpp b/src/mongo/s/at_cluster_time_util.cpp index 3a44e50923d..5e5e12dba13 100644 --- a/src/mongo/s/at_cluster_time_util.cpp +++ b/src/mongo/s/at_cluster_time_util.cpp @@ -95,13 +95,6 @@ boost::optional<LogicalTime> computeAtClusterTime(OperationContext* opCtx, const NamespaceString& nss, const BSONObj query, const BSONObj collation) { - - // TODO SERVER-36688: Move this check to TransactionRouter::computeAtClusterTime. - if (repl::ReadConcernArgs::get(opCtx).getLevel() != - repl::ReadConcernLevel::kSnapshotReadConcern) { - return boost::none; - } - auto atClusterTime = _computeAtClusterTime(opCtx, mustRunOnAll, shardIds, nss, query, collation); @@ -116,13 +109,6 @@ boost::optional<LogicalTime> computeAtClusterTime(OperationContext* opCtx, boost::optional<LogicalTime> computeAtClusterTimeForOneShard(OperationContext* opCtx, const ShardId& shardId) { - - // TODO SERVER-36688: Move this check to TransactionRouter::computeAtClusterTime. - if (repl::ReadConcernArgs::get(opCtx).getLevel() != - repl::ReadConcernLevel::kSnapshotReadConcern) { - return boost::none; - } - // TODO SERVER-36312: Re-enable algorithm using the cached opTimes of the targeted shard. // TODO SERVER-37549: Use the shard's cached lastApplied opTime instead of lastCommitted. auto atClusterTime = LogicalClock::get(opCtx)->getClusterTime(); diff --git a/src/mongo/s/cluster_commands_helpers.cpp b/src/mongo/s/cluster_commands_helpers.cpp index faecc0ab52a..2279429438d 100644 --- a/src/mongo/s/cluster_commands_helpers.cpp +++ b/src/mongo/s/cluster_commands_helpers.cpp @@ -564,4 +564,20 @@ std::set<ShardId> getTargetedShardsForQuery(OperationContext* opCtx, return {routingInfo.db().primaryId()}; } +StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfoForTxnCmd( + OperationContext* opCtx, const NamespaceString& nss) { + auto catalogCache = Grid::get(opCtx)->catalogCache(); + + // Return the latest routing table if not running in a transaction with snapshot level read + // concern. + auto txnRouter = TransactionRouter::get(opCtx); + if (!txnRouter || !txnRouter->getAtClusterTime()) { + return catalogCache->getCollectionRoutingInfo(opCtx, nss); + } + + auto atClusterTime = txnRouter->getAtClusterTime(); + return catalogCache->getCollectionRoutingInfoAt( + opCtx, nss, atClusterTime->getTime().asTimestamp()); +} + } // namespace mongo diff --git a/src/mongo/s/cluster_commands_helpers.h b/src/mongo/s/cluster_commands_helpers.h index bfe4e5abdce..31485508597 100644 --- a/src/mongo/s/cluster_commands_helpers.h +++ b/src/mongo/s/cluster_commands_helpers.h @@ -203,4 +203,15 @@ std::set<ShardId> getTargetedShardsForQuery(OperationContext* opCtx, const BSONObj& query, const BSONObj& collation); +/** + * If the command is running in a transaction, returns the proper routing table to use for targeting + * shards. If there is no active transaction or the transaction is not running with snapshot level + * read concern, the latest routing table is returned, otherwise a historical routing table is + * returned at the global read timestamp, which must have been selected by this point. + * + * Should be used by all router commands that can be run in a transaction when targeting shards. + */ +StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfoForTxnCmd( + OperationContext* opCtx, const NamespaceString& nss); + } // namespace mongo diff --git a/src/mongo/s/commands/cluster_distinct_cmd.cpp b/src/mongo/s/commands/cluster_distinct_cmd.cpp index f0ddf09533e..679c51cdf3c 100644 --- a/src/mongo/s/commands/cluster_distinct_cmd.cpp +++ b/src/mongo/s/commands/cluster_distinct_cmd.cpp @@ -170,8 +170,7 @@ public: CollatorFactoryInterface::get(opCtx->getServiceContext())->makeFromBSON(collation)); } - const auto routingInfo = - uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); + const auto routingInfo = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss)); std::vector<AsyncRequestsSender::Response> shardResponses; try { diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp index 2832874beee..d0a17135882 100644 --- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp @@ -167,8 +167,7 @@ public: // that the parsing be pulled into this function. uassertStatusOK(createShardDatabase(opCtx, nss.db())); - const auto routingInfo = - uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); + const auto routingInfo = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss)); if (!routingInfo.cm()) { _runCommand(opCtx, routingInfo.db().primaryId(), @@ -203,10 +202,6 @@ private: const NamespaceString& nss, const BSONObj& cmdObj, BSONObjBuilder* result) { - if (auto txnRouter = TransactionRouter::get(opCtx)) { - txnRouter->setAtClusterTimeToLatestTime(opCtx); - } - const auto response = [&] { std::vector<AsyncRequestsSender::Request> requests; requests.emplace_back( diff --git a/src/mongo/s/commands/cluster_killcursors_cmd.cpp b/src/mongo/s/commands/cluster_killcursors_cmd.cpp index aff708870c0..ec05bd23483 100644 --- a/src/mongo/s/commands/cluster_killcursors_cmd.cpp +++ b/src/mongo/s/commands/cluster_killcursors_cmd.cpp @@ -52,16 +52,6 @@ public: const std::string& dbname, const BSONObj& cmdObj, BSONObjBuilder& result) final { - // killCursors must choose a global read timestamp if it is the first command in a - // transaction with snapshot level read concern because any shards it may contact will not - // be able to change the snapshot of the local transactions they begin. - // - // TODO SERVER-37045: This can be removed once killCursors is not allowed to start a - // cross-shard transaction. - if (auto txnRouter = TransactionRouter::get(opCtx)) { - txnRouter->setAtClusterTimeToLatestTime(opCtx); - } - return runImpl(opCtx, dbname, cmdObj, result); } diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp index 92d8565ecf0..edd8e327f83 100644 --- a/src/mongo/s/commands/cluster_write_cmd.cpp +++ b/src/mongo/s/commands/cluster_write_cmd.cpp @@ -248,10 +248,6 @@ private: batchedRequest.setAllowImplicitCreate(false); } - if (auto txnRouter = TransactionRouter::get(opCtx)) { - txnRouter->setAtClusterTimeToLatestTime(opCtx); - } - BatchWriteExecStats stats; BatchedCommandResponse response; ClusterWriter::write(opCtx, batchedRequest, &stats, &response); diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index 09defb7c760..d0fdf1e671c 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -158,6 +158,9 @@ void invokeInTransactionRouter(OperationContext* opCtx, CommandInvocation* invocation, TransactionRouter* txnRouter, rpc::ReplyBuilderInterface* result) { + // No-op if the transaction is not running with snapshot read concern. + txnRouter->setDefaultAtClusterTime(opCtx); + try { invocation->run(opCtx, result); } catch (const DBException& e) { diff --git a/src/mongo/s/compute_at_cluster_time_test.cpp b/src/mongo/s/compute_at_cluster_time_test.cpp index be93abf9103..ffa1a8937f0 100644 --- a/src/mongo/s/compute_at_cluster_time_test.cpp +++ b/src/mongo/s/compute_at_cluster_time_test.cpp @@ -210,40 +210,6 @@ TEST_F(AtClusterTimeTargetingTest, ReturnsLatestTimeFromShard) { operationContext(), true, shards, kNss, query, collation)); } -// Verifies that a null logical time is returned for all requests without snapshot readConcern. -TEST_F(AtClusterTimeTargetingTest, NonSnapshotReadConcern) { - auto routingInfo = loadRoutingTableWithTwoChunksAndTwoShards(kNss); - auto query = BSON("find" << kNss.coll()); - auto collation = BSONObj(); - auto shards = getTargetedShardsForQuery(operationContext(), routingInfo, query, collation); - - // Uninitialized read concern. - ASSERT_FALSE(at_cluster_time_util::computeAtClusterTime( - operationContext(), true, shards, kNss, query, collation)); - - auto& readConcernArgs = repl::ReadConcernArgs::get(operationContext()); - - // Local readConcern. - readConcernArgs = repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern); - ASSERT_FALSE(at_cluster_time_util::computeAtClusterTime( - operationContext(), true, shards, kNss, query, collation)); - - // Majority readConcern. - readConcernArgs = repl::ReadConcernArgs(repl::ReadConcernLevel::kMajorityReadConcern); - ASSERT_FALSE(at_cluster_time_util::computeAtClusterTime( - operationContext(), true, shards, kNss, query, collation)); - - // Linearizable readConcern. - readConcernArgs = repl::ReadConcernArgs(repl::ReadConcernLevel::kLinearizableReadConcern); - ASSERT_FALSE(at_cluster_time_util::computeAtClusterTime( - operationContext(), true, shards, kNss, query, collation)); - - // Available readConcern. - readConcernArgs = repl::ReadConcernArgs(repl::ReadConcernLevel::kAvailableReadConcern); - ASSERT_FALSE(at_cluster_time_util::computeAtClusterTime( - operationContext(), true, shards, kNss, query, collation)); -} - // Verifies that if atClusterTime is specified in the request, atClusterTime is always greater than // or equal to it. TEST_F(AtClusterTimeTargetingTest, AfterClusterTime) { diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index 502b0e9f43b..dddb0192896 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -147,8 +147,9 @@ StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationConte return {ErrorCodes::ShardNotFound, "No shards are present in the cluster"}; } - // This call to getCollectionRoutingInfo will return !OK if the database does not exist. - return Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, execNss); + // This call to getCollectionRoutingInfoForTxnCmd will return !OK if the database does not + // exist. + return getCollectionRoutingInfoForTxnCmd(opCtx, execNss); } std::set<ShardId> getTargetedShards(OperationContext* opCtx, @@ -916,7 +917,7 @@ StringMap<ExpressionContext::ResolvedNamespace> resolveInvolvedNamespaces( StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces; for (auto&& nss : litePipe.getInvolvedNamespaces()) { const auto resolvedNsRoutingInfo = - uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); + uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss)); uassert(28769, str::stream() << nss.ns() << " cannot be sharded", !resolvedNsRoutingInfo.cm() || litePipe.allowShardedForeignCollection(nss)); diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index 34767d9b354..ceb1fc4e384 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -40,6 +40,7 @@ #include "mongo/db/pipeline/document_source_unwind.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/s/catalog_cache.h" +#include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/grid.h" #include "mongo/s/query/cluster_query_knobs.h" #include "mongo/s/query/document_source_merge_cursors.h" @@ -462,8 +463,8 @@ boost::optional<ShardedExchangePolicy> checkIfEligibleForExchange(OperationConte return boost::none; } - const auto routingInfo = uassertStatusOK( - grid->catalogCache()->getCollectionRoutingInfo(opCtx, outStage->getOutputNs())); + const auto routingInfo = + uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, outStage->getOutputNs())); if (!routingInfo.cm()) { return boost::none; } diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 50f99690d84..43a6f01e2a2 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -426,7 +426,7 @@ CursorId ClusterFind::runQuery(OperationContext* opCtx, // Re-target and re-send the initial find command to the shards until we have established the // shard version. for (size_t retries = 1; retries <= kMaxRetries; ++retries) { - auto routingInfoStatus = catalogCache->getCollectionRoutingInfo(opCtx, query.nss()); + auto routingInfoStatus = getCollectionRoutingInfoForTxnCmd(opCtx, query.nss()); if (routingInfoStatus == ErrorCodes::NamespaceNotFound) { // If the database doesn't exist, we successfully return an empty result set without // creating a cursor. diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp index 30a6c0054f1..f37dfde9dbb 100644 --- a/src/mongo/s/transaction_router.cpp +++ b/src/mongo/s/transaction_router.cpp @@ -37,6 +37,7 @@ #include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h" #include "mongo/db/logical_clock.h" #include "mongo/db/logical_session_id.h" +#include "mongo/db/logical_session_id.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/at_cluster_time_util.h" @@ -244,6 +245,25 @@ StmtId TransactionRouter::Participant::getStmtIdCreatedAt() const { return _stmtIdCreatedAt; } +LogicalTime TransactionRouter::AtClusterTime::getTime() const { + invariant(_atClusterTime != LogicalTime::kUninitialized); + invariant(_stmtIdSelectedAt != kUninitializedStmtId); + return _atClusterTime; +} + +void TransactionRouter::AtClusterTime::setTime(LogicalTime atClusterTime, StmtId currentStmtId) { + _atClusterTime = atClusterTime; + _stmtIdSelectedAt = currentStmtId; +} + +bool TransactionRouter::AtClusterTime::isSet() const { + return _atClusterTime != LogicalTime::kUninitialized; +} + +bool TransactionRouter::AtClusterTime::canChange(StmtId currentStmtId) const { + return _stmtIdSelectedAt == kUninitializedStmtId || _stmtIdSelectedAt == currentStmtId; +} + TransactionRouter* TransactionRouter::get(OperationContext* opCtx) { auto& opCtxSession = getRouterSessionRuntimeState(opCtx); if (!opCtxSession) { @@ -268,6 +288,11 @@ bool TransactionRouter::isCheckedOut() { return _isCheckedOut; } +const boost::optional<TransactionRouter::AtClusterTime>& TransactionRouter::getAtClusterTime() + const { + return _atClusterTime; +} + boost::optional<ShardId> TransactionRouter::getCoordinatorId() const { return _coordinatorId; } @@ -286,7 +311,7 @@ void TransactionRouter::_verifyReadConcern() { if (_readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) { invariant(_atClusterTime); - invariant(_atClusterTime != LogicalTime::kUninitialized); + invariant(_atClusterTime->isSet()); } } @@ -297,7 +322,8 @@ void TransactionRouter::_verifyParticipantAtClusterTime(const Participant& parti auto participantAtClusterTime = participant.getSharedOptions().atClusterTime; invariant(participantAtClusterTime); - invariant(participantAtClusterTime == _atClusterTime); + invariant(_atClusterTime); + invariant(*participantAtClusterTime == _atClusterTime->getTime()); } boost::optional<TransactionRouter::Participant&> TransactionRouter::getParticipant( @@ -323,12 +349,14 @@ TransactionRouter::Participant& TransactionRouter::_createParticipant(const Shar _verifyReadConcern(); - auto resultPair = _participants.try_emplace( - shard.toString(), - TransactionRouter::Participant( - isFirstParticipant, - _latestStmtId, - SharedTransactionOptions{_txnNumber, _readConcernArgs, _atClusterTime})); + auto sharedOptions = _atClusterTime + ? SharedTransactionOptions{_txnNumber, _readConcernArgs, _atClusterTime->getTime()} + : SharedTransactionOptions{_txnNumber, _readConcernArgs, boost::none}; + + auto resultPair = + _participants.try_emplace(shard.toString(), + TransactionRouter::Participant( + isFirstParticipant, _latestStmtId, std::move(sharedOptions))); return resultPair.first->second; } @@ -390,7 +418,8 @@ void TransactionRouter::onViewResolutionError() { } bool TransactionRouter::_canContinueOnSnapshotError() const { - return _latestStmtId == _firstStmtId; + invariant(_atClusterTime); + return _atClusterTime->canChange(_latestStmtId); } void TransactionRouter::onSnapshotError() { @@ -406,7 +435,9 @@ void TransactionRouter::onSnapshotError() { invariant(!_coordinatorId); // Reset the global snapshot timestamp so the retry will select a new one. + invariant(_atClusterTime); _atClusterTime.reset(); + _atClusterTime.emplace(); } void TransactionRouter::computeAtClusterTime(OperationContext* opCtx, @@ -415,42 +446,31 @@ void TransactionRouter::computeAtClusterTime(OperationContext* opCtx, const NamespaceString& nss, const BSONObj query, const BSONObj collation) { - // TODO SERVER-36688: We should also return immediately if the read concern - // is not snapshot. - if (_atClusterTime) { + if (!_atClusterTime || !_atClusterTime->canChange(_latestStmtId)) { return; } - // atClusterTime could be none if the the read concern is not snapshot. + // TODO SERVER-36688: Remove at_cluster_time_util. auto atClusterTime = at_cluster_time_util::computeAtClusterTime( opCtx, mustRunOnAll, shardIds, nss, query, collation); - // TODO SERVER-36688: atClusterTime should never be none once we add the check above. - invariant(!atClusterTime || *atClusterTime != LogicalTime::kUninitialized); - if (atClusterTime) { - _atClusterTime = *atClusterTime; - } + invariant(atClusterTime && *atClusterTime != LogicalTime::kUninitialized); + _atClusterTime->setTime(*atClusterTime, _latestStmtId); } void TransactionRouter::computeAtClusterTimeForOneShard(OperationContext* opCtx, const ShardId& shardId) { - // TODO SERVER-36688: We should also return immediately if the read concern - // is not snapshot. - if (_atClusterTime) { + if (!_atClusterTime || !_atClusterTime->canChange(_latestStmtId)) { return; } - // atClusterTime could be none if the the read concern is not snapshot. + // TODO SERVER-36688: Remove at_cluster_time_util. auto atClusterTime = at_cluster_time_util::computeAtClusterTimeForOneShard(opCtx, shardId); - // TODO SERVER-36688: atClusterTime should never be none once we add the check above. - invariant(!atClusterTime || *atClusterTime != LogicalTime::kUninitialized); - if (atClusterTime) { - _atClusterTime = *atClusterTime; - } + invariant(atClusterTime && *atClusterTime != LogicalTime::kUninitialized); + _atClusterTime->setTime(*atClusterTime, _latestStmtId); } -void TransactionRouter::setAtClusterTimeToLatestTime(OperationContext* opCtx) { - if (_atClusterTime || - _readConcernArgs.getLevel() != repl::ReadConcernLevel::kSnapshotReadConcern) { +void TransactionRouter::setDefaultAtClusterTime(OperationContext* opCtx) { + if (!_atClusterTime || !_atClusterTime->canChange(_latestStmtId)) { return; } @@ -463,7 +483,7 @@ void TransactionRouter::setAtClusterTimeToLatestTime(OperationContext* opCtx) { atClusterTime = *afterClusterTime; } - _atClusterTime = atClusterTime; + _atClusterTime->setTime(atClusterTime, _latestStmtId); } void TransactionRouter::beginOrContinueTxn(OperationContext* opCtx, @@ -527,6 +547,10 @@ void TransactionRouter::beginOrContinueTxn(OperationContext* opCtx, // command that started the transaction, if one was included. _latestStmtId = kDefaultFirstStmtId; _firstStmtId = kDefaultFirstStmtId; + + if (_readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) { + _atClusterTime.emplace(); + } } diff --git a/src/mongo/s/transaction_router.h b/src/mongo/s/transaction_router.h index 5c869d972d6..007eddc4b93 100644 --- a/src/mongo/s/transaction_router.h +++ b/src/mongo/s/transaction_router.h @@ -108,6 +108,44 @@ public: const SharedTransactionOptions _sharedOptions; }; + /** + * Encapsulates the logic around selecting a global read timestamp for a sharded transaction at + * snapshot level read concern. + * + * The first command in a transaction to target at least one shard must select a cluster time + * timestamp before targeting, but may change the timestamp before contacting any shards to + * allow optimizing the timestamp based on the targeted shards. If the first command encounters + * a retryable error, e.g. StaleShardVersion or SnapshotTooOld, the retry may also select a new + * timestamp. Once the first command has successfully completed, the timestamp cannot be + * changed. + */ + class AtClusterTime { + public: + /** + * Cannot be called until a timestamp has been set. + */ + LogicalTime getTime() const; + + /** + * Sets the timestamp and remembers the statement id of the command that set it. + */ + void setTime(LogicalTime atClusterTime, StmtId currentStmtId); + + /** + * True if the timestamp has been set to a non-null value. + */ + bool isSet() const; + + /** + * True if the timestamp can be changed by a command running at the given statement id. + */ + bool canChange(StmtId currentStmtId) const; + + private: + StmtId _stmtIdSelectedAt = kUninitializedStmtId; + LogicalTime _atClusterTime; + }; + TransactionRouter(LogicalSessionId sessionId); /** @@ -173,7 +211,13 @@ public: * Sets the atClusterTime for the current transaction to the latest time in the router's logical * clock. */ - void setAtClusterTimeToLatestTime(OperationContext* opCtx); + void setDefaultAtClusterTime(OperationContext* opCtx); + + /** + * Returns the global read timestamp for this transaction. Returns boost::none for transactions + * that don't run at snapshot level read concern or if a timestamp has not yet been selected. + */ + const boost::optional<AtClusterTime>& getAtClusterTime() const; bool isCheckedOut(); @@ -280,9 +324,9 @@ private: repl::ReadConcernArgs _readConcernArgs; // The cluster time of the timestamp all participant shards in the current transaction with - // snapshot level read concern must read from. Selected during the first statement of the - // transaction. Should not be changed after the first statement has completed successfully. - boost::optional<LogicalTime> _atClusterTime; + // snapshot level read concern must read from. Only set for transactions running with snapshot + // level read concern. + boost::optional<AtClusterTime> _atClusterTime; // The statement id of the latest received command for this transaction. For batch writes, this // will be the highest stmtId contained in the batch. Incremented by one if new commands do not diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp index dd201c37ab3..847ff7d65b2 100644 --- a/src/mongo/s/transaction_router_test.cpp +++ b/src/mongo/s/transaction_router_test.cpp @@ -80,7 +80,7 @@ TEST_F(TransactionRouterTest, StartTxnShouldBeAttachedOnlyOnFirstStatementToPart TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); - txnRouter.setAtClusterTimeToLatestTime(operationContext()); + txnRouter.setDefaultAtClusterTime(operationContext()); BSONObj expectedNewObj = BSON("insert" << "test" @@ -127,7 +127,7 @@ TEST_F(TransactionRouterTest, BasicStartTxnWithAtClusterTime) { TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); - txnRouter.setAtClusterTimeToLatestTime(operationContext()); + txnRouter.setDefaultAtClusterTime(operationContext()); BSONObj expectedNewObj = BSON("insert" << "test" @@ -184,7 +184,7 @@ TEST_F(TransactionRouterTest, NewParticipantMustAttachTxnAndReadConcern) { TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); - txnRouter.setAtClusterTimeToLatestTime(operationContext()); + txnRouter.setDefaultAtClusterTime(operationContext()); BSONObj expectedNewObj = BSON("insert" << "test" @@ -265,7 +265,7 @@ TEST_F(TransactionRouterTest, StartingNewTxnShouldClearState) { TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); - txnRouter.setAtClusterTimeToLatestTime(operationContext()); + txnRouter.setDefaultAtClusterTime(operationContext()); { auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1, @@ -291,7 +291,7 @@ TEST_F(TransactionRouterTest, StartingNewTxnShouldClearState) { TxnNumber txnNum2{5}; txnRouter.beginOrContinueTxn(operationContext(), txnNum2, true); - txnRouter.setAtClusterTimeToLatestTime(operationContext()); + txnRouter.setDefaultAtClusterTime(operationContext()); BSONObj expectedNewObj = BSON("insert" << "test" @@ -323,7 +323,7 @@ TEST_F(TransactionRouterTest, FirstParticipantIsCoordinator) { TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); - txnRouter.setAtClusterTimeToLatestTime(operationContext()); + txnRouter.setDefaultAtClusterTime(operationContext()); ASSERT_FALSE(txnRouter.getCoordinatorId()); @@ -346,7 +346,7 @@ TEST_F(TransactionRouterTest, FirstParticipantIsCoordinator) { TxnNumber txnNum2{5}; txnRouter.beginOrContinueTxn(operationContext(), txnNum2, true); - txnRouter.setAtClusterTimeToLatestTime(operationContext()); + txnRouter.setDefaultAtClusterTime(operationContext()); ASSERT_FALSE(txnRouter.getCoordinatorId()); @@ -365,7 +365,7 @@ TEST_F(TransactionRouterTest, DoesNotAttachTxnNumIfAlreadyThere) { TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); - txnRouter.setAtClusterTimeToLatestTime(operationContext()); + txnRouter.setDefaultAtClusterTime(operationContext()); BSONObj expectedNewObj = BSON("insert" << "test" @@ -397,7 +397,7 @@ DEATH_TEST_F(TransactionRouterTest, CrashesIfCmdHasDifferentTxnNumber, "invarian TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); - txnRouter.setAtClusterTimeToLatestTime(operationContext()); + txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(shard1, BSON("insert" @@ -412,7 +412,7 @@ TEST_F(TransactionRouterTest, AttachTxnValidatesReadConcernIfAlreadyOnCmd) { TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); - txnRouter.setAtClusterTimeToLatestTime(operationContext()); + txnRouter.setDefaultAtClusterTime(operationContext()); { auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1, @@ -446,7 +446,7 @@ TEST_F(TransactionRouterTest, CannotSpecifyReadConcernAfterFirstStatement) { TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true /* startTransaction */); - txnRouter.setAtClusterTimeToLatestTime(operationContext()); + txnRouter.setDefaultAtClusterTime(operationContext()); ASSERT_THROWS_CODE( txnRouter.beginOrContinueTxn(operationContext(), txnNum, false /* startTransaction */), @@ -461,7 +461,7 @@ TEST_F(TransactionRouterTest, UpconvertToSnapshotIfNoReadConcernLevelGiven) { TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true /* startTransaction */); - txnRouter.setAtClusterTimeToLatestTime(operationContext()); + txnRouter.setDefaultAtClusterTime(operationContext()); BSONObj expectedNewObj = BSON("insert" << "test" @@ -494,7 +494,7 @@ TEST_F(TransactionRouterTest, UpconvertToSnapshotIfNoReadConcernLevelButHasAfter TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true /* startTransaction */); - txnRouter.setAtClusterTimeToLatestTime(operationContext()); + txnRouter.setDefaultAtClusterTime(operationContext()); BSONObj expectedNewObj = BSON("insert" << "test" @@ -597,7 +597,7 @@ TEST_F(TransactionRouterTest, CannotCommitWithoutParticipants) { TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); - txnRouter.setAtClusterTimeToLatestTime(operationContext()); + txnRouter.setDefaultAtClusterTime(operationContext()); ASSERT_THROWS(txnRouter.commitTransaction(operationContext()), AssertionException); } @@ -636,7 +636,7 @@ TEST_F(TransactionRouterTest, SendCommitDirectlyForSingleParticipants) { auto txnRouter = TransactionRouter::get(opCtx); txnRouter->beginOrContinueTxn(opCtx, txnNum, true); - txnRouter->setAtClusterTimeToLatestTime(operationContext()); + txnRouter->setDefaultAtClusterTime(operationContext()); txnRouter->attachTxnFieldsIfNeeded(shard1, {}); auto future = launchAsync([&] { txnRouter->commitTransaction(operationContext()); }); @@ -668,7 +668,7 @@ TEST_F(TransactionRouterTest, SendPrepareAndCoordinateCommitForMultipleParticipa auto txnRouter = TransactionRouter::get(opCtx); txnRouter->beginOrContinueTxn(opCtx, txnNum, true); - txnRouter->setAtClusterTimeToLatestTime(operationContext()); + txnRouter->setDefaultAtClusterTime(operationContext()); txnRouter->attachTxnFieldsIfNeeded(shard1, {}); txnRouter->attachTxnFieldsIfNeeded(shard2, {}); @@ -716,7 +716,7 @@ TEST_F(TransactionRouterTest, SnapshotErrorsResetAtClusterTime) { TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); - txnRouter.setAtClusterTimeToLatestTime(operationContext()); + txnRouter.setDefaultAtClusterTime(operationContext()); BSONObj expectedReadConcern = BSON("level" << "snapshot" @@ -738,7 +738,7 @@ TEST_F(TransactionRouterTest, SnapshotErrorsResetAtClusterTime) { // Simulate a snapshot error. txnRouter.onSnapshotError(); - txnRouter.setAtClusterTimeToLatestTime(operationContext()); + txnRouter.setDefaultAtClusterTime(operationContext()); expectedReadConcern = BSON("level" << "snapshot" @@ -753,15 +753,13 @@ TEST_F(TransactionRouterTest, SnapshotErrorsResetAtClusterTime) { } } -TEST_F(TransactionRouterTest, CannotChangeAtClusterTimeWithoutSnapshotError) { +TEST_F(TransactionRouterTest, CannotChangeAtClusterTimeAfterStatementThatSelectedIt) { TxnNumber txnNum{3}; TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); - txnRouter.setAtClusterTimeToLatestTime(operationContext()); - - txnRouter.setAtClusterTimeToLatestTime(operationContext()); + txnRouter.setDefaultAtClusterTime(operationContext()); BSONObj expectedReadConcern = BSON("level" << "snapshot" @@ -775,11 +773,18 @@ TEST_F(TransactionRouterTest, CannotChangeAtClusterTimeWithoutSnapshotError) { ASSERT_BSONOBJ_EQ(expectedReadConcern, newCmd["readConcern"].Obj()); } - LogicalTime laterTime(Timestamp(1000, 1)); - ASSERT_GT(laterTime, kInMemoryLogicalTime); - LogicalClock::get(operationContext())->setClusterTimeFromTrustedSource(laterTime); + // Changing the atClusterTime during the statement that selected it is allowed. - txnRouter.setAtClusterTimeToLatestTime(operationContext()); + LogicalTime laterTimeSameStmt(Timestamp(100, 1)); + ASSERT_GT(laterTimeSameStmt, kInMemoryLogicalTime); + LogicalClock::get(operationContext())->setClusterTimeFromTrustedSource(laterTimeSameStmt); + + txnRouter.setDefaultAtClusterTime(operationContext()); + + expectedReadConcern = BSON("level" + << "snapshot" + << "atClusterTime" + << laterTimeSameStmt.asTimestamp()); { auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard2, @@ -787,6 +792,24 @@ TEST_F(TransactionRouterTest, CannotChangeAtClusterTimeWithoutSnapshotError) { << "test")); ASSERT_BSONOBJ_EQ(expectedReadConcern, newCmd["readConcern"].Obj()); } + + // Later statements cannot change atClusterTime. + + repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); + txnRouter.beginOrContinueTxn(operationContext(), txnNum, false); + + LogicalTime laterTimeNewStmt(Timestamp(1000, 1)); + ASSERT_GT(laterTimeNewStmt, laterTimeSameStmt); + LogicalClock::get(operationContext())->setClusterTimeFromTrustedSource(laterTimeNewStmt); + + txnRouter.setDefaultAtClusterTime(operationContext()); + + { + auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard3, + BSON("insert" + << "test")); + ASSERT_BSONOBJ_EQ(expectedReadConcern, newCmd["readConcern"].Obj()); + } } TEST_F(TransactionRouterTest, SnapshotErrorsClearsAllParticipants) { @@ -795,7 +818,7 @@ TEST_F(TransactionRouterTest, SnapshotErrorsClearsAllParticipants) { TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); - txnRouter.setAtClusterTimeToLatestTime(operationContext()); + txnRouter.setDefaultAtClusterTime(operationContext()); // Successfully start a transaction on two shards, selecting one as the coordinator. @@ -810,7 +833,7 @@ TEST_F(TransactionRouterTest, SnapshotErrorsClearsAllParticipants) { txnRouter.onSnapshotError(); - txnRouter.setAtClusterTimeToLatestTime(operationContext()); + txnRouter.setDefaultAtClusterTime(operationContext()); ASSERT_FALSE(txnRouter.getCoordinatorId()); @@ -842,12 +865,12 @@ TEST_F(TransactionRouterTest, OnSnapshotErrorThrowsAfterFirstCommand) { TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); - txnRouter.setAtClusterTimeToLatestTime(operationContext()); + txnRouter.setDefaultAtClusterTime(operationContext()); // Should not throw. txnRouter.onSnapshotError(); - txnRouter.setAtClusterTimeToLatestTime(operationContext()); + txnRouter.setDefaultAtClusterTime(operationContext()); repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, false); @@ -866,7 +889,7 @@ TEST_F(TransactionRouterTest, ParticipantsRememberStmtIdCreatedAt) { TxnNumber txnNum{3}; txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); - txnRouter.setAtClusterTimeToLatestTime(operationContext()); + txnRouter.setDefaultAtClusterTime(operationContext()); // Transaction 1 contacts shard1 and shard2 during the first command, then shard3 in the second // command. @@ -895,7 +918,7 @@ TEST_F(TransactionRouterTest, ParticipantsRememberStmtIdCreatedAt) { repl::ReadConcernArgs(repl::ReadConcernLevel::kSnapshotReadConcern); TxnNumber txnNum2{5}; txnRouter.beginOrContinueTxn(operationContext(), txnNum2, true); - txnRouter.setAtClusterTimeToLatestTime(operationContext()); + txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(shard3, {}); txnRouter.attachTxnFieldsIfNeeded(shard2, {}); @@ -916,7 +939,7 @@ TEST_F(TransactionRouterTest, AllParticipantsAndCoordinatorClearedOnStaleErrorOn TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); - txnRouter.setAtClusterTimeToLatestTime(operationContext()); + txnRouter.setDefaultAtClusterTime(operationContext()); // Start a transaction on two shards, selecting one as the coordinator, but simulate a // re-targeting error from at least one of them. @@ -957,7 +980,7 @@ TEST_F(TransactionRouterTest, OnlyNewlyCreatedParticipantsClearedOnStaleError) { TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); - txnRouter.setAtClusterTimeToLatestTime(operationContext()); + txnRouter.setDefaultAtClusterTime(operationContext()); // First statement successfully targets one shard, selecing it as the coordinator. @@ -983,44 +1006,47 @@ TEST_F(TransactionRouterTest, OnlyNewlyCreatedParticipantsClearedOnStaleError) { ASSERT_TRUE(txnRouter.attachTxnFieldsIfNeeded(shard3, {})["startTransaction"].trueValue()); } -TEST_F(TransactionRouterTest, RetryOnStaleErrorCannotPickNewAtClusterTime) { +TEST_F(TransactionRouterTest, RetriesCannotPickNewAtClusterTimeOnStatementAfterSelected) { TxnNumber txnNum{3}; TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); - txnRouter.setAtClusterTimeToLatestTime(operationContext()); - txnRouter.setAtClusterTimeToLatestTime(operationContext()); + // First statement selects an atClusterTime. - BSONObj expectedReadConcern = BSON("level" - << "snapshot" - << "atClusterTime" - << kInMemoryLogicalTime.asTimestamp()); + txnRouter.setDefaultAtClusterTime(operationContext()); - { - auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1, - BSON("find" - << "test")); - ASSERT_BSONOBJ_EQ(expectedReadConcern, newCmd["readConcern"].Obj()); - } + // A later statement retries on a stale version error and a view resolution error and cannot + // change the atClusterTime. + + repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); + txnRouter.beginOrContinueTxn(operationContext(), txnNum, false); - // Advance the latest time in the logical clock, simulate a stale config/db error, and verify - // the retry attempt cannot pick a new atClusterTime. LogicalTime laterTime(Timestamp(1000, 1)); ASSERT_GT(laterTime, kInMemoryLogicalTime); LogicalClock::get(operationContext())->setClusterTimeFromTrustedSource(laterTime); txnRouter.onStaleShardOrDbError("find"); + txnRouter.setDefaultAtClusterTime(operationContext()); - txnRouter.setAtClusterTimeToLatestTime(operationContext()); + BSONObj expectedReadConcern = BSON("level" + << "snapshot" + << "atClusterTime" + << kInMemoryLogicalTime.asTimestamp()); - { - auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1, - BSON("find" - << "test")); - ASSERT_BSONOBJ_EQ(expectedReadConcern, newCmd["readConcern"].Obj()); - } + auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1, + BSON("find" + << "test")); + ASSERT_BSONOBJ_EQ(expectedReadConcern, newCmd["readConcern"].Obj()); + + txnRouter.onViewResolutionError(); + txnRouter.setDefaultAtClusterTime(operationContext()); + + newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1, + BSON("find" + << "test")); + ASSERT_BSONOBJ_EQ(expectedReadConcern, newCmd["readConcern"].Obj()); } TEST_F(TransactionRouterTest, WritesCanOnlyBeRetriedIfFirstOverallCommand) { @@ -1032,7 +1058,7 @@ TEST_F(TransactionRouterTest, WritesCanOnlyBeRetriedIfFirstOverallCommand) { TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); - txnRouter.setAtClusterTimeToLatestTime(operationContext()); + txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(shard1, {}); @@ -1071,7 +1097,7 @@ TEST_F(TransactionRouterTest, AbortThrowsIfNoParticipants) { auto txnRouter = TransactionRouter::get(opCtx); txnRouter->beginOrContinueTxn(opCtx, txnNum, true); - txnRouter->setAtClusterTimeToLatestTime(operationContext()); + txnRouter->setDefaultAtClusterTime(operationContext()); ASSERT_THROWS_CODE( txnRouter->abortTransaction(opCtx), DBException, ErrorCodes::NoSuchTransaction); @@ -1089,7 +1115,7 @@ TEST_F(TransactionRouterTest, AbortForSingleParticipant) { auto txnRouter = TransactionRouter::get(opCtx); txnRouter->beginOrContinueTxn(opCtx, txnNum, true); - txnRouter->setAtClusterTimeToLatestTime(operationContext()); + txnRouter->setDefaultAtClusterTime(operationContext()); txnRouter->attachTxnFieldsIfNeeded(shard1, {}); auto future = launchAsync([&] { return txnRouter->abortTransaction(operationContext()); }); @@ -1122,7 +1148,7 @@ TEST_F(TransactionRouterTest, AbortForMultipleParticipants) { auto txnRouter = TransactionRouter::get(opCtx); txnRouter->beginOrContinueTxn(opCtx, txnNum, true); - txnRouter->setAtClusterTimeToLatestTime(operationContext()); + txnRouter->setDefaultAtClusterTime(operationContext()); txnRouter->attachTxnFieldsIfNeeded(shard1, {}); txnRouter->attachTxnFieldsIfNeeded(shard2, {}); @@ -1162,7 +1188,7 @@ TEST_F(TransactionRouterTest, OnViewResolutionErrorClearsAllNewParticipants) { TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); - txnRouter.setAtClusterTimeToLatestTime(operationContext()); + txnRouter.setDefaultAtClusterTime(operationContext()); // One shard is targeted by the first statement. auto firstShardCmd = txnRouter.attachTxnFieldsIfNeeded(shard1, {}); @@ -1212,7 +1238,7 @@ TEST_F(TransactionRouterTest, ImplicitAbortIsNoopWithNoParticipants) { auto txnRouter = TransactionRouter::get(opCtx); txnRouter->beginOrContinueTxn(opCtx, txnNum, true); - txnRouter->setAtClusterTimeToLatestTime(operationContext()); + txnRouter->setDefaultAtClusterTime(operationContext()); // Should not throw. txnRouter->implicitlyAbortTransaction(opCtx); @@ -1230,7 +1256,7 @@ TEST_F(TransactionRouterTest, ImplicitAbortForSingleParticipant) { auto txnRouter = TransactionRouter::get(opCtx); txnRouter->beginOrContinueTxn(opCtx, txnNum, true); - txnRouter->setAtClusterTimeToLatestTime(operationContext()); + txnRouter->setDefaultAtClusterTime(operationContext()); txnRouter->attachTxnFieldsIfNeeded(shard1, {}); auto future = @@ -1263,7 +1289,7 @@ TEST_F(TransactionRouterTest, ImplicitAbortForMultipleParticipants) { auto txnRouter = TransactionRouter::get(opCtx); txnRouter->beginOrContinueTxn(opCtx, txnNum, true); - txnRouter->setAtClusterTimeToLatestTime(operationContext()); + txnRouter->setDefaultAtClusterTime(operationContext()); txnRouter->attachTxnFieldsIfNeeded(shard1, {}); txnRouter->attachTxnFieldsIfNeeded(shard2, {}); @@ -1309,7 +1335,7 @@ TEST_F(TransactionRouterTest, ImplicitAbortIgnoresErrors) { auto txnRouter = TransactionRouter::get(opCtx); txnRouter->beginOrContinueTxn(opCtx, txnNum, true); - txnRouter->setAtClusterTimeToLatestTime(operationContext()); + txnRouter->setDefaultAtClusterTime(operationContext()); txnRouter->attachTxnFieldsIfNeeded(shard1, {}); auto future = @@ -1337,7 +1363,7 @@ TEST_F(TransactionRouterTest, ContinuingTransactionPlacesItsReadConcernOnOpCtx) TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); - txnRouter.setAtClusterTimeToLatestTime(operationContext()); + txnRouter.setDefaultAtClusterTime(operationContext()); repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, false); @@ -1359,7 +1385,7 @@ TEST_F(TransactionRouterTest, SubsequentStatementCanSelectAtClusterTimeIfNotSele txnRouter.beginOrContinueTxn(operationContext(), txnNum, false); // Subsequent statement does select an atClusterTime and does target a participant. - txnRouter.setAtClusterTimeToLatestTime(operationContext()); + txnRouter.setDefaultAtClusterTime(operationContext()); BSONObj expectedReadConcern = BSON("level" << "snapshot" @@ -1370,7 +1396,26 @@ TEST_F(TransactionRouterTest, SubsequentStatementCanSelectAtClusterTimeIfNotSele BSON("insert" << "test")); ASSERT_BSONOBJ_EQ(expectedReadConcern, newCmd["readConcern"].Obj()); + + // The next statement cannot change the atClusterTime. + + repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); + txnRouter.beginOrContinueTxn(operationContext(), txnNum, false); + + LogicalTime laterTimeSameStmt(Timestamp(100, 1)); + ASSERT_GT(laterTimeSameStmt, kInMemoryLogicalTime); + LogicalClock::get(operationContext())->setClusterTimeFromTrustedSource(laterTimeSameStmt); + + txnRouter.setDefaultAtClusterTime(operationContext()); + + newCmd = txnRouter.attachTxnFieldsIfNeeded(shard2, + BSON("insert" + << "test")); + ASSERT_BSONOBJ_EQ(expectedReadConcern, newCmd["readConcern"].Obj()); } +// TODO SERVER-37630: Verify transactions with majority level read concern don't have and cannot +// select an atClusterTime once they are allowed. + } // unnamed namespace } // namespace mongo diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp index eb8a27b5e33..021148993e8 100644 --- a/src/mongo/s/write_ops/batch_write_exec_test.cpp +++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp @@ -596,7 +596,7 @@ public: auto txnRouter = TransactionRouter::get(operationContext()); txnRouter->checkOut(); txnRouter->beginOrContinueTxn(operationContext(), kTxnNumber, true); - txnRouter->setAtClusterTimeToLatestTime(operationContext()); + txnRouter->setDefaultAtClusterTime(operationContext()); } void tearDown() override { diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.cpp b/src/mongo/s/write_ops/chunk_manager_targeter.cpp index 54358656aaf..3b496e95d05 100644 --- a/src/mongo/s/write_ops/chunk_manager_targeter.cpp +++ b/src/mongo/s/write_ops/chunk_manager_targeter.cpp @@ -325,8 +325,7 @@ Status ChunkManagerTargeter::init(OperationContext* opCtx) { return shardDbStatus.getStatus(); } - const auto routingInfoStatus = - Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, _nss); + const auto routingInfoStatus = getCollectionRoutingInfoForTxnCmd(opCtx, _nss); if (!routingInfoStatus.isOK()) { return routingInfoStatus.getStatus(); } |