Diffstat:
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml                 |   4
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml        |   1
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml   |   1
-rw-r--r--  buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2 |   1
-rw-r--r--  jstests/sharding/transactions_target_at_point_in_time.js                                  | 118
-rw-r--r--  src/mongo/s/at_cluster_time_util.cpp                                                       |  14
-rw-r--r--  src/mongo/s/cluster_commands_helpers.cpp                                                   |  16
-rw-r--r--  src/mongo/s/cluster_commands_helpers.h                                                     |  11
-rw-r--r--  src/mongo/s/commands/cluster_distinct_cmd.cpp                                              |   3
-rw-r--r--  src/mongo/s/commands/cluster_find_and_modify_cmd.cpp                                       |   7
-rw-r--r--  src/mongo/s/commands/cluster_killcursors_cmd.cpp                                           |  10
-rw-r--r--  src/mongo/s/commands/cluster_write_cmd.cpp                                                 |   4
-rw-r--r--  src/mongo/s/commands/strategy.cpp                                                          |   3
-rw-r--r--  src/mongo/s/compute_at_cluster_time_test.cpp                                               |  34
-rw-r--r--  src/mongo/s/query/cluster_aggregate.cpp                                                    |   7
-rw-r--r--  src/mongo/s/query/cluster_aggregation_planner.cpp                                          |   5
-rw-r--r--  src/mongo/s/query/cluster_find.cpp                                                         |   2
-rw-r--r--  src/mongo/s/transaction_router.cpp                                                         |  86
-rw-r--r--  src/mongo/s/transaction_router.h                                                           |  52
-rw-r--r--  src/mongo/s/transaction_router_test.cpp                                                    | 179
-rw-r--r--  src/mongo/s/write_ops/batch_write_exec_test.cpp                                            |   2
-rw-r--r--  src/mongo/s/write_ops/chunk_manager_targeter.cpp                                           |   3
22 files changed, 382 insertions(+), 181 deletions(-)
diff --git a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
index b832285d1a6..71ae75d7cef 100644
--- a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
@@ -177,6 +177,10 @@ selector:
# - jstests/sharding/remove1.js
# - jstests/sharding/remove2.js
+ # Moves a chunk before continuing a transaction, which can lead to snapshot errors if the
+ # CSRS failovers are sufficiently slow.
+ - jstests/sharding/transactions_target_at_point_in_time.js
+
executor:
config:
shell_options:
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index 30dd78acf48..45d1718ce52 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -51,6 +51,7 @@ selector:
- jstests/sharding/transactions_snapshot_errors_subsequent_statements.js
- jstests/sharding/transactions_stale_database_version_errors.js
- jstests/sharding/transactions_stale_shard_version_errors.js
+ - jstests/sharding/transactions_target_at_point_in_time.js
- jstests/sharding/transactions_view_resolution.js
- jstests/sharding/txn_agg.js
- jstests/sharding/txn_basic_two_phase_commit.js
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
index 9bee9c8a4ed..a0ae78a83a8 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
@@ -366,6 +366,7 @@ selector:
- jstests/sharding/transactions_snapshot_errors_subsequent_statements.js
- jstests/sharding/transactions_stale_database_version_errors.js
- jstests/sharding/transactions_stale_shard_version_errors.js
+ - jstests/sharding/transactions_target_at_point_in_time.js
- jstests/sharding/transactions_view_resolution.js
- jstests/sharding/txn_agg.js
- jstests/sharding/txn_basic_two_phase_commit.js
diff --git a/buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2 b/buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2
index aaedbeeb4f9..783ceda5d6c 100644
--- a/buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2
+++ b/buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2
@@ -61,6 +61,7 @@ selector:
- jstests/sharding/transactions_snapshot_errors_subsequent_statements.js
- jstests/sharding/transactions_stale_database_version_errors.js
- jstests/sharding/transactions_stale_shard_version_errors.js
+ - jstests/sharding/transactions_target_at_point_in_time.js
- jstests/sharding/transactions_view_resolution.js
- jstests/sharding/txn_agg.js
- jstests/sharding/txn_basic_two_phase_commit.js
diff --git a/jstests/sharding/transactions_target_at_point_in_time.js b/jstests/sharding/transactions_target_at_point_in_time.js
new file mode 100644
index 00000000000..8422c50160b
--- /dev/null
+++ b/jstests/sharding/transactions_target_at_point_in_time.js
@@ -0,0 +1,118 @@
+// Verifies mongos uses a versioned routing table to target subsequent requests in transactions with
+// snapshot level read concern.
+//
+// @tags: [
+// requires_find_command,
+// requires_sharding,
+// uses_multi_shard_transaction,
+// uses_transactions,
+// ]
+(function() {
+ "use strict";
+
+ load("jstests/sharding/libs/sharded_transactions_helpers.js");
+
+ function expectChunks(st, ns, chunks) {
+ for (let i = 0; i < chunks.length; i++) {
+ assert.eq(chunks[i],
+ st.s.getDB("config").chunks.count({ns: ns, shard: st["shard" + i].shardName}),
+ "unexpected number of chunks on shard " + i);
+ }
+ }
+
+ const dbName = "test";
+ const collName = "foo";
+ const ns = dbName + '.' + collName;
+
+ const st = new ShardingTest({shards: 3, mongos: 1, config: 1});
+
+ // Set up one sharded collection with 2 chunks, both on the primary shard.
+
+ assert.writeOK(st.s.getDB(dbName)[collName].insert({_id: -5}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(st.s.getDB(dbName)[collName].insert({_id: 5}, {writeConcern: {w: "majority"}}));
+
+ assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+ st.ensurePrimaryShard(dbName, st.shard0.shardName);
+
+ assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {_id: 1}}));
+ assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 0}}));
+
+ expectChunks(st, ns, [2, 0, 0]);
+
+ // Temporarily move a chunk to Shard2, to avoid picking a global read timestamp before the
+ // sharding metadata cache collections are created.
+ assert.commandWorked(
+ st.s.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard2.shardName}));
+
+ assert.commandWorked(
+ st.s.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard1.shardName}));
+ expectChunks(st, ns, [1, 1, 0]);
+
+ // First command targets the first chunk, the second command targets the second chunk.
+ const kCommandTestCases = [
+ {
+ name: "aggregate",
+ commandFuncs: [
+ (coll) => coll.aggregate({$match: {_id: -5}}).itcount(),
+ (coll) => coll.aggregate({$match: {_id: 5}}).itcount(),
+ ]
+ },
+ {
+ name: "distinct",
+ commandFuncs: [
+ (coll) => coll.distinct("_id", {_id: -5}).length,
+ (coll) => coll.distinct("_id", {_id: 5}).length,
+ ]
+ },
+ {
+ name: "find",
+ commandFuncs: [
+ (coll) => coll.find({_id: -5}).itcount(),
+ (coll) => coll.find({_id: 5}).itcount(),
+ ]
+ },
+ // TODO SERVER-37350: Verify writes to chunks a shard no longer owns are rejected.
+ ];
+
+ function runTest(testCase) {
+ const cmdName = testCase.name;
+ const targetChunk1Func = testCase.commandFuncs[0];
+ const targetChunk2Func = testCase.commandFuncs[1];
+
+ jsTestLog("Testing " + cmdName);
+
+ expectChunks(st, ns, [1, 1, 0]);
+
+ const session = st.s.startSession();
+ const sessionDB = session.getDatabase(dbName);
+ const sessionColl = sessionDB[collName];
+
+ session.startTransaction({readConcern: {level: "snapshot"}});
+
+ // Start a transaction on Shard0 which will select and pin a global read timestamp.
+ assert.eq(targetChunk1Func(sessionColl),
+ 1,
+ "expected to find document in first chunk, cmd: " + cmdName);
+
+ // Move a chunk from Shard1 to Shard2 outside of the transaction. This will happen at a
+ // later logical time than the transaction's read timestamp.
+ assert.commandWorked(
+ st.s.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard2.shardName}));
+
+ // Target a document in the chunk that was moved. The router should get a stale shard
+ // version from Shard1 then retry on Shard1 and see the document.
+ assert.eq(targetChunk2Func(sessionColl),
+ 1,
+ "expected to find document in second chunk, cmd: " + cmdName);
+
+ session.commitTransaction();
+
+ // Move the chunk back to Shard1 for the next iteration.
+ assert.commandWorked(
+ st.s.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard1.shardName}));
+ }
+
+ kCommandTestCases.forEach(runTest);
+
+ st.stop();
+})();
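
The test above relies on the router resolving chunk ownership at the transaction's pinned read timestamp rather than at the latest cluster time. As a rough illustration only (ChunkHistory, ownerAsOf, and the integer cluster times below are invented stand-ins, not the server's ChunkManager types), a point-in-time ownership lookup can be sketched like this:

    // Hypothetical sketch of point-in-time chunk ownership. A chunk records
    // which shard owned it as of each cluster time; a lookup at time t
    // returns the owner in effect at t.
    #include <cassert>
    #include <iostream>
    #include <iterator>
    #include <map>
    #include <string>

    using ClusterTime = unsigned long long;
    using ShardId = std::string;

    struct ChunkHistory {
        std::map<ClusterTime, ShardId> ownerSince;  // validAfter -> owning shard

        ShardId ownerAsOf(ClusterTime t) const {
            auto it = ownerSince.upper_bound(t);  // first entry strictly after t
            assert(it != ownerSince.begin());     // t must not precede creation
            return std::prev(it)->second;
        }
    };

    int main() {
        ChunkHistory chunk;
        chunk.ownerSince[1] = "shard1";   // chunk placed on shard1 at time 1
        chunk.ownerSince[50] = "shard2";  // moveChunk commits at time 50

        // A transaction that pinned atClusterTime = 10 keeps targeting shard1
        // even after the migration, while operations outside the transaction
        // see the new owner -- the behavior the test verifies end to end.
        std::cout << chunk.ownerAsOf(10) << "\n";  // shard1
        std::cout << chunk.ownerAsOf(60) << "\n";  // shard2
        return 0;
    }
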
diff --git a/src/mongo/s/at_cluster_time_util.cpp b/src/mongo/s/at_cluster_time_util.cpp
index 3a44e50923d..5e5e12dba13 100644
--- a/src/mongo/s/at_cluster_time_util.cpp
+++ b/src/mongo/s/at_cluster_time_util.cpp
@@ -95,13 +95,6 @@ boost::optional<LogicalTime> computeAtClusterTime(OperationContext* opCtx,
const NamespaceString& nss,
const BSONObj query,
const BSONObj collation) {
-
- // TODO SERVER-36688: Move this check to TransactionRouter::computeAtClusterTime.
- if (repl::ReadConcernArgs::get(opCtx).getLevel() !=
- repl::ReadConcernLevel::kSnapshotReadConcern) {
- return boost::none;
- }
-
auto atClusterTime =
_computeAtClusterTime(opCtx, mustRunOnAll, shardIds, nss, query, collation);
@@ -116,13 +109,6 @@ boost::optional<LogicalTime> computeAtClusterTime(OperationContext* opCtx,
boost::optional<LogicalTime> computeAtClusterTimeForOneShard(OperationContext* opCtx,
const ShardId& shardId) {
-
- // TODO SERVER-36688: Move this check to TransactionRouter::computeAtClusterTime.
- if (repl::ReadConcernArgs::get(opCtx).getLevel() !=
- repl::ReadConcernLevel::kSnapshotReadConcern) {
- return boost::none;
- }
-
// TODO SERVER-36312: Re-enable algorithm using the cached opTimes of the targeted shard.
// TODO SERVER-37549: Use the shard's cached lastApplied opTime instead of lastCommitted.
auto atClusterTime = LogicalClock::get(opCtx)->getClusterTime();
diff --git a/src/mongo/s/cluster_commands_helpers.cpp b/src/mongo/s/cluster_commands_helpers.cpp
index faecc0ab52a..2279429438d 100644
--- a/src/mongo/s/cluster_commands_helpers.cpp
+++ b/src/mongo/s/cluster_commands_helpers.cpp
@@ -564,4 +564,20 @@ std::set<ShardId> getTargetedShardsForQuery(OperationContext* opCtx,
return {routingInfo.db().primaryId()};
}
+StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfoForTxnCmd(
+ OperationContext* opCtx, const NamespaceString& nss) {
+ auto catalogCache = Grid::get(opCtx)->catalogCache();
+
+ // Return the latest routing table if not running in a transaction with snapshot level read
+ // concern.
+ auto txnRouter = TransactionRouter::get(opCtx);
+ if (!txnRouter || !txnRouter->getAtClusterTime()) {
+ return catalogCache->getCollectionRoutingInfo(opCtx, nss);
+ }
+
+ auto atClusterTime = txnRouter->getAtClusterTime();
+ return catalogCache->getCollectionRoutingInfoAt(
+ opCtx, nss, atClusterTime->getTime().asTimestamp());
+}
+
} // namespace mongo
diff --git a/src/mongo/s/cluster_commands_helpers.h b/src/mongo/s/cluster_commands_helpers.h
index bfe4e5abdce..31485508597 100644
--- a/src/mongo/s/cluster_commands_helpers.h
+++ b/src/mongo/s/cluster_commands_helpers.h
@@ -203,4 +203,15 @@ std::set<ShardId> getTargetedShardsForQuery(OperationContext* opCtx,
const BSONObj& query,
const BSONObj& collation);
+/**
+ * If the command is running in a transaction, returns the proper routing table to use for targeting
+ * shards. If there is no active transaction or the transaction is not running with snapshot level
+ * read concern, the latest routing table is returned; otherwise a historical routing table is
+ * returned at the global read timestamp, which must have been selected by this point.
+ *
+ * Should be used by all router commands that can be run in a transaction when targeting shards.
+ */
+StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfoForTxnCmd(
+ OperationContext* opCtx, const NamespaceString& nss);
+
} // namespace mongo
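
The doc comment above describes a three-way decision. As a hedged, self-contained sketch of that branch (RoutingCache, RoutingInfo, and Txn are stand-ins for CatalogCache, CachedCollectionRoutingInfo, and TransactionRouter, with error handling omitted):

    #include <optional>

    using Timestamp = unsigned long long;
    struct RoutingInfo {
        Timestamp asOf;  // which version of the routing table this represents
    };

    struct RoutingCache {
        RoutingInfo latest() const { return {100}; }       // newest table
        RoutingInfo at(Timestamp t) const { return {t}; }  // historical table
    };

    struct Txn {
        // Set only once a snapshot-level transaction has pinned a timestamp.
        std::optional<Timestamp> atClusterTime;
    };

    // Mirrors getCollectionRoutingInfoForTxnCmd: outside a transaction, or
    // before a global read timestamp exists, use the latest routing table;
    // otherwise target with the table as of the pinned time.
    RoutingInfo routingInfoForTxnCmd(const RoutingCache& cache, const Txn* txn) {
        if (!txn || !txn->atClusterTime)
            return cache.latest();
        return cache.at(*txn->atClusterTime);
    }

    int main() {
        RoutingCache cache;
        Txn txn;
        txn.atClusterTime = 42;
        bool ok = routingInfoForTxnCmd(cache, &txn).asOf == 42 &&   // historical
                  routingInfoForTxnCmd(cache, nullptr).asOf == 100; // latest
        return ok ? 0 : 1;
    }
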
diff --git a/src/mongo/s/commands/cluster_distinct_cmd.cpp b/src/mongo/s/commands/cluster_distinct_cmd.cpp
index f0ddf09533e..679c51cdf3c 100644
--- a/src/mongo/s/commands/cluster_distinct_cmd.cpp
+++ b/src/mongo/s/commands/cluster_distinct_cmd.cpp
@@ -170,8 +170,7 @@ public:
CollatorFactoryInterface::get(opCtx->getServiceContext())->makeFromBSON(collation));
}
- const auto routingInfo =
- uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
+ const auto routingInfo = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss));
std::vector<AsyncRequestsSender::Response> shardResponses;
try {
diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
index 2832874beee..d0a17135882 100644
--- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
+++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
@@ -167,8 +167,7 @@ public:
// that the parsing be pulled into this function.
uassertStatusOK(createShardDatabase(opCtx, nss.db()));
- const auto routingInfo =
- uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
+ const auto routingInfo = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss));
if (!routingInfo.cm()) {
_runCommand(opCtx,
routingInfo.db().primaryId(),
@@ -203,10 +202,6 @@ private:
const NamespaceString& nss,
const BSONObj& cmdObj,
BSONObjBuilder* result) {
- if (auto txnRouter = TransactionRouter::get(opCtx)) {
- txnRouter->setAtClusterTimeToLatestTime(opCtx);
- }
-
const auto response = [&] {
std::vector<AsyncRequestsSender::Request> requests;
requests.emplace_back(
diff --git a/src/mongo/s/commands/cluster_killcursors_cmd.cpp b/src/mongo/s/commands/cluster_killcursors_cmd.cpp
index aff708870c0..ec05bd23483 100644
--- a/src/mongo/s/commands/cluster_killcursors_cmd.cpp
+++ b/src/mongo/s/commands/cluster_killcursors_cmd.cpp
@@ -52,16 +52,6 @@ public:
const std::string& dbname,
const BSONObj& cmdObj,
BSONObjBuilder& result) final {
- // killCursors must choose a global read timestamp if it is the first command in a
- // transaction with snapshot level read concern because any shards it may contact will not
- // be able to change the snapshot of the local transactions they begin.
- //
- // TODO SERVER-37045: This can be removed once killCursors is not allowed to start a
- // cross-shard transaction.
- if (auto txnRouter = TransactionRouter::get(opCtx)) {
- txnRouter->setAtClusterTimeToLatestTime(opCtx);
- }
-
return runImpl(opCtx, dbname, cmdObj, result);
}
diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp
index 92d8565ecf0..edd8e327f83 100644
--- a/src/mongo/s/commands/cluster_write_cmd.cpp
+++ b/src/mongo/s/commands/cluster_write_cmd.cpp
@@ -248,10 +248,6 @@ private:
batchedRequest.setAllowImplicitCreate(false);
}
- if (auto txnRouter = TransactionRouter::get(opCtx)) {
- txnRouter->setAtClusterTimeToLatestTime(opCtx);
- }
-
BatchWriteExecStats stats;
BatchedCommandResponse response;
ClusterWriter::write(opCtx, batchedRequest, &stats, &response);
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index 09defb7c760..d0fdf1e671c 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -158,6 +158,9 @@ void invokeInTransactionRouter(OperationContext* opCtx,
CommandInvocation* invocation,
TransactionRouter* txnRouter,
rpc::ReplyBuilderInterface* result) {
+ // No-op if the transaction is not running with snapshot read concern.
+ txnRouter->setDefaultAtClusterTime(opCtx);
+
try {
invocation->run(opCtx, result);
} catch (const DBException& e) {
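
This hook replaces the per-command setAtClusterTimeToLatestTime calls removed from the findAndModify, killCursors, and write paths above: every statement that runs inside a transaction now picks its default read timestamp at a single call site. A minimal sketch of that choke-point pattern, under invented names (TxnRouterSketch is not the server class):

    #include <functional>
    #include <iostream>

    struct TxnRouterSketch {
        bool snapshotReadConcern = true;
        bool timestampPinned = false;

        // No-op unless the transaction runs at snapshot read concern and the
        // timestamp may still change (i.e. the statement that selected it has
        // not yet completed).
        void setDefaultAtClusterTime() {
            if (!snapshotReadConcern || timestampPinned)
                return;
            std::cout << "selected default read timestamp\n";
        }
    };

    void invokeInTransaction(TxnRouterSketch& router,
                             const std::function<void()>& invocation) {
        router.setDefaultAtClusterTime();  // single call site for all commands
        invocation();
    }

    int main() {
        TxnRouterSketch router;
        invokeInTransaction(router, [] { std::cout << "statement 1\n"; });
        router.timestampPinned = true;  // first statement completed
        invokeInTransaction(router, [] { std::cout << "statement 2\n"; });
        return 0;
    }
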
diff --git a/src/mongo/s/compute_at_cluster_time_test.cpp b/src/mongo/s/compute_at_cluster_time_test.cpp
index be93abf9103..ffa1a8937f0 100644
--- a/src/mongo/s/compute_at_cluster_time_test.cpp
+++ b/src/mongo/s/compute_at_cluster_time_test.cpp
@@ -210,40 +210,6 @@ TEST_F(AtClusterTimeTargetingTest, ReturnsLatestTimeFromShard) {
operationContext(), true, shards, kNss, query, collation));
}
-// Verifies that a null logical time is returned for all requests without snapshot readConcern.
-TEST_F(AtClusterTimeTargetingTest, NonSnapshotReadConcern) {
- auto routingInfo = loadRoutingTableWithTwoChunksAndTwoShards(kNss);
- auto query = BSON("find" << kNss.coll());
- auto collation = BSONObj();
- auto shards = getTargetedShardsForQuery(operationContext(), routingInfo, query, collation);
-
- // Uninitialized read concern.
- ASSERT_FALSE(at_cluster_time_util::computeAtClusterTime(
- operationContext(), true, shards, kNss, query, collation));
-
- auto& readConcernArgs = repl::ReadConcernArgs::get(operationContext());
-
- // Local readConcern.
- readConcernArgs = repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
- ASSERT_FALSE(at_cluster_time_util::computeAtClusterTime(
- operationContext(), true, shards, kNss, query, collation));
-
- // Majority readConcern.
- readConcernArgs = repl::ReadConcernArgs(repl::ReadConcernLevel::kMajorityReadConcern);
- ASSERT_FALSE(at_cluster_time_util::computeAtClusterTime(
- operationContext(), true, shards, kNss, query, collation));
-
- // Linearizable readConcern.
- readConcernArgs = repl::ReadConcernArgs(repl::ReadConcernLevel::kLinearizableReadConcern);
- ASSERT_FALSE(at_cluster_time_util::computeAtClusterTime(
- operationContext(), true, shards, kNss, query, collation));
-
- // Available readConcern.
- readConcernArgs = repl::ReadConcernArgs(repl::ReadConcernLevel::kAvailableReadConcern);
- ASSERT_FALSE(at_cluster_time_util::computeAtClusterTime(
- operationContext(), true, shards, kNss, query, collation));
-}
-
// Verifies that if atClusterTime is specified in the request, atClusterTime is always greater than
// or equal to it.
TEST_F(AtClusterTimeTargetingTest, AfterClusterTime) {
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index 502b0e9f43b..dddb0192896 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -147,8 +147,9 @@ StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationConte
return {ErrorCodes::ShardNotFound, "No shards are present in the cluster"};
}
- // This call to getCollectionRoutingInfo will return !OK if the database does not exist.
- return Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, execNss);
+ // This call to getCollectionRoutingInfoForTxnCmd will return !OK if the database does not
+ // exist.
+ return getCollectionRoutingInfoForTxnCmd(opCtx, execNss);
}
std::set<ShardId> getTargetedShards(OperationContext* opCtx,
@@ -916,7 +917,7 @@ StringMap<ExpressionContext::ResolvedNamespace> resolveInvolvedNamespaces(
StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
for (auto&& nss : litePipe.getInvolvedNamespaces()) {
const auto resolvedNsRoutingInfo =
- uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
+ uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss));
uassert(28769,
str::stream() << nss.ns() << " cannot be sharded",
!resolvedNsRoutingInfo.cm() || litePipe.allowShardedForeignCollection(nss));
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index 34767d9b354..ceb1fc4e384 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/pipeline/document_source_unwind.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/s/catalog_cache.h"
+#include "mongo/s/cluster_commands_helpers.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_query_knobs.h"
#include "mongo/s/query/document_source_merge_cursors.h"
@@ -462,8 +463,8 @@ boost::optional<ShardedExchangePolicy> checkIfEligibleForExchange(OperationConte
return boost::none;
}
- const auto routingInfo = uassertStatusOK(
- grid->catalogCache()->getCollectionRoutingInfo(opCtx, outStage->getOutputNs()));
+ const auto routingInfo =
+ uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, outStage->getOutputNs()));
if (!routingInfo.cm()) {
return boost::none;
}
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index 50f99690d84..43a6f01e2a2 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -426,7 +426,7 @@ CursorId ClusterFind::runQuery(OperationContext* opCtx,
// Re-target and re-send the initial find command to the shards until we have established the
// shard version.
for (size_t retries = 1; retries <= kMaxRetries; ++retries) {
- auto routingInfoStatus = catalogCache->getCollectionRoutingInfo(opCtx, query.nss());
+ auto routingInfoStatus = getCollectionRoutingInfoForTxnCmd(opCtx, query.nss());
if (routingInfoStatus == ErrorCodes::NamespaceNotFound) {
// If the database doesn't exist, we successfully return an empty result set without
// creating a cursor.
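
For context, the surrounding loop in ClusterFind::runQuery already retries targeting a bounded number of times; this change only swaps which routing table each attempt fetches. The general retry shape, as a hedged standalone sketch (the error enum and attempt logic are invented):

    #include <iostream>
    #include <optional>

    enum class Error { kOk, kStaleShardVersion };

    // Pretend the first attempt observes a stale routing table.
    std::optional<int> tryQuery(int attempt, Error& err) {
        if (attempt == 1) {
            err = Error::kStaleShardVersion;
            return std::nullopt;
        }
        err = Error::kOk;
        return 42;  // cursor id
    }

    int main() {
        const int kMaxRetries = 10;
        for (int retries = 1; retries <= kMaxRetries; ++retries) {
            Error err;
            if (auto cursor = tryQuery(retries, err)) {
                std::cout << "established cursor " << *cursor << "\n";
                break;
            }
            // On a stale version error, refresh the routing info (for a
            // snapshot transaction, still at the pinned timestamp) and retry.
        }
        return 0;
    }
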
diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp
index 30a6c0054f1..f37dfde9dbb 100644
--- a/src/mongo/s/transaction_router.cpp
+++ b/src/mongo/s/transaction_router.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h"
#include "mongo/db/logical_clock.h"
#include "mongo/db/logical_session_id.h"
+#include "mongo/db/logical_session_id.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/at_cluster_time_util.h"
@@ -244,6 +245,25 @@ StmtId TransactionRouter::Participant::getStmtIdCreatedAt() const {
return _stmtIdCreatedAt;
}
+LogicalTime TransactionRouter::AtClusterTime::getTime() const {
+ invariant(_atClusterTime != LogicalTime::kUninitialized);
+ invariant(_stmtIdSelectedAt != kUninitializedStmtId);
+ return _atClusterTime;
+}
+
+void TransactionRouter::AtClusterTime::setTime(LogicalTime atClusterTime, StmtId currentStmtId) {
+ _atClusterTime = atClusterTime;
+ _stmtIdSelectedAt = currentStmtId;
+}
+
+bool TransactionRouter::AtClusterTime::isSet() const {
+ return _atClusterTime != LogicalTime::kUninitialized;
+}
+
+bool TransactionRouter::AtClusterTime::canChange(StmtId currentStmtId) const {
+ return _stmtIdSelectedAt == kUninitializedStmtId || _stmtIdSelectedAt == currentStmtId;
+}
+
TransactionRouter* TransactionRouter::get(OperationContext* opCtx) {
auto& opCtxSession = getRouterSessionRuntimeState(opCtx);
if (!opCtxSession) {
@@ -268,6 +288,11 @@ bool TransactionRouter::isCheckedOut() {
return _isCheckedOut;
}
+const boost::optional<TransactionRouter::AtClusterTime>& TransactionRouter::getAtClusterTime()
+ const {
+ return _atClusterTime;
+}
+
boost::optional<ShardId> TransactionRouter::getCoordinatorId() const {
return _coordinatorId;
}
@@ -286,7 +311,7 @@ void TransactionRouter::_verifyReadConcern() {
if (_readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) {
invariant(_atClusterTime);
- invariant(_atClusterTime != LogicalTime::kUninitialized);
+ invariant(_atClusterTime->isSet());
}
}
@@ -297,7 +322,8 @@ void TransactionRouter::_verifyParticipantAtClusterTime(const Participant& parti
auto participantAtClusterTime = participant.getSharedOptions().atClusterTime;
invariant(participantAtClusterTime);
- invariant(participantAtClusterTime == _atClusterTime);
+ invariant(_atClusterTime);
+ invariant(*participantAtClusterTime == _atClusterTime->getTime());
}
boost::optional<TransactionRouter::Participant&> TransactionRouter::getParticipant(
@@ -323,12 +349,14 @@ TransactionRouter::Participant& TransactionRouter::_createParticipant(const Shar
_verifyReadConcern();
- auto resultPair = _participants.try_emplace(
- shard.toString(),
- TransactionRouter::Participant(
- isFirstParticipant,
- _latestStmtId,
- SharedTransactionOptions{_txnNumber, _readConcernArgs, _atClusterTime}));
+ auto sharedOptions = _atClusterTime
+ ? SharedTransactionOptions{_txnNumber, _readConcernArgs, _atClusterTime->getTime()}
+ : SharedTransactionOptions{_txnNumber, _readConcernArgs, boost::none};
+
+ auto resultPair =
+ _participants.try_emplace(shard.toString(),
+ TransactionRouter::Participant(
+ isFirstParticipant, _latestStmtId, std::move(sharedOptions)));
return resultPair.first->second;
}
@@ -390,7 +418,8 @@ void TransactionRouter::onViewResolutionError() {
}
bool TransactionRouter::_canContinueOnSnapshotError() const {
- return _latestStmtId == _firstStmtId;
+ invariant(_atClusterTime);
+ return _atClusterTime->canChange(_latestStmtId);
}
void TransactionRouter::onSnapshotError() {
@@ -406,7 +435,9 @@ void TransactionRouter::onSnapshotError() {
invariant(!_coordinatorId);
// Reset the global snapshot timestamp so the retry will select a new one.
+ invariant(_atClusterTime);
_atClusterTime.reset();
+ _atClusterTime.emplace();
}
void TransactionRouter::computeAtClusterTime(OperationContext* opCtx,
@@ -415,42 +446,31 @@ void TransactionRouter::computeAtClusterTime(OperationContext* opCtx,
const NamespaceString& nss,
const BSONObj query,
const BSONObj collation) {
- // TODO SERVER-36688: We should also return immediately if the read concern
- // is not snapshot.
- if (_atClusterTime) {
+ if (!_atClusterTime || !_atClusterTime->canChange(_latestStmtId)) {
return;
}
- // atClusterTime could be none if the read concern is not snapshot.
+ // TODO SERVER-36688: Remove at_cluster_time_util.
auto atClusterTime = at_cluster_time_util::computeAtClusterTime(
opCtx, mustRunOnAll, shardIds, nss, query, collation);
- // TODO SERVER-36688: atClusterTime should never be none once we add the check above.
- invariant(!atClusterTime || *atClusterTime != LogicalTime::kUninitialized);
- if (atClusterTime) {
- _atClusterTime = *atClusterTime;
- }
+ invariant(atClusterTime && *atClusterTime != LogicalTime::kUninitialized);
+ _atClusterTime->setTime(*atClusterTime, _latestStmtId);
}
void TransactionRouter::computeAtClusterTimeForOneShard(OperationContext* opCtx,
const ShardId& shardId) {
- // TODO SERVER-36688: We should also return immediately if the read concern
- // is not snapshot.
- if (_atClusterTime) {
+ if (!_atClusterTime || !_atClusterTime->canChange(_latestStmtId)) {
return;
}
- // atClusterTime could be none if the read concern is not snapshot.
+ // TODO SERVER-36688: Remove at_cluster_time_util.
auto atClusterTime = at_cluster_time_util::computeAtClusterTimeForOneShard(opCtx, shardId);
- // TODO SERVER-36688: atClusterTime should never be none once we add the check above.
- invariant(!atClusterTime || *atClusterTime != LogicalTime::kUninitialized);
- if (atClusterTime) {
- _atClusterTime = *atClusterTime;
- }
+ invariant(atClusterTime && *atClusterTime != LogicalTime::kUninitialized);
+ _atClusterTime->setTime(*atClusterTime, _latestStmtId);
}
-void TransactionRouter::setAtClusterTimeToLatestTime(OperationContext* opCtx) {
- if (_atClusterTime ||
- _readConcernArgs.getLevel() != repl::ReadConcernLevel::kSnapshotReadConcern) {
+void TransactionRouter::setDefaultAtClusterTime(OperationContext* opCtx) {
+ if (!_atClusterTime || !_atClusterTime->canChange(_latestStmtId)) {
return;
}
@@ -463,7 +483,7 @@ void TransactionRouter::setAtClusterTimeToLatestTime(OperationContext* opCtx) {
atClusterTime = *afterClusterTime;
}
- _atClusterTime = atClusterTime;
+ _atClusterTime->setTime(atClusterTime, _latestStmtId);
}
void TransactionRouter::beginOrContinueTxn(OperationContext* opCtx,
@@ -527,6 +547,10 @@ void TransactionRouter::beginOrContinueTxn(OperationContext* opCtx,
// command that started the transaction, if one was included.
_latestStmtId = kDefaultFirstStmtId;
_firstStmtId = kDefaultFirstStmtId;
+
+ if (_readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) {
+ _atClusterTime.emplace();
+ }
}
diff --git a/src/mongo/s/transaction_router.h b/src/mongo/s/transaction_router.h
index 5c869d972d6..007eddc4b93 100644
--- a/src/mongo/s/transaction_router.h
+++ b/src/mongo/s/transaction_router.h
@@ -108,6 +108,44 @@ public:
const SharedTransactionOptions _sharedOptions;
};
+ /**
+ * Encapsulates the logic around selecting a global read timestamp for a sharded transaction at
+ * snapshot level read concern.
+ *
+ * The first command in a transaction to target at least one shard must select a cluster time
+ * before targeting, but may change the timestamp before contacting any shards to
+ * allow optimizing the timestamp based on the targeted shards. If the first command encounters
+ * a retryable error, e.g. StaleShardVersion or SnapshotTooOld, the retry may also select a new
+ * timestamp. Once the first command has successfully completed, the timestamp cannot be
+ * changed.
+ */
+ class AtClusterTime {
+ public:
+ /**
+ * Cannot be called until a timestamp has been set.
+ */
+ LogicalTime getTime() const;
+
+ /**
+ * Sets the timestamp and remembers the statement id of the command that set it.
+ */
+ void setTime(LogicalTime atClusterTime, StmtId currentStmtId);
+
+ /**
+ * True if the timestamp has been set to a non-null value.
+ */
+ bool isSet() const;
+
+ /**
+ * True if the timestamp can be changed by a command running at the given statement id.
+ */
+ bool canChange(StmtId currentStmtId) const;
+
+ private:
+ StmtId _stmtIdSelectedAt = kUninitializedStmtId;
+ LogicalTime _atClusterTime;
+ };
+
TransactionRouter(LogicalSessionId sessionId);
/**
@@ -173,7 +211,13 @@ public:
* Sets the atClusterTime for the current transaction to the latest time in the router's logical
* clock.
*/
- void setAtClusterTimeToLatestTime(OperationContext* opCtx);
+ void setDefaultAtClusterTime(OperationContext* opCtx);
+
+ /**
+ * Returns the global read timestamp for this transaction. Returns boost::none for transactions
+ * that don't run at snapshot level read concern or if a timestamp has not yet been selected.
+ */
+ const boost::optional<AtClusterTime>& getAtClusterTime() const;
bool isCheckedOut();
@@ -280,9 +324,9 @@ private:
repl::ReadConcernArgs _readConcernArgs;
// The cluster time of the timestamp all participant shards in the current transaction with
- // snapshot level read concern must read from. Selected during the first statement of the
- // transaction. Should not be changed after the first statement has completed successfully.
- boost::optional<LogicalTime> _atClusterTime;
+ // snapshot level read concern must read from. Only set for transactions running with snapshot
+ // level read concern.
+ boost::optional<AtClusterTime> _atClusterTime;
// The statement id of the latest received command for this transaction. For batch writes, this
// will be the highest stmtId contained in the batch. Incremented by one if new commands do not
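
To make the lifecycle in the AtClusterTime class comment concrete: the timestamp may be set, and re-set, only by the statement that first selected it, and a snapshot error replaces the whole object so the retry starts fresh. Below is a reduced, self-contained rendering of that state machine (a sketch mirroring the diff, with StmtId and LogicalTime shrunk to integers; not the server code):

    #include <cassert>

    using StmtId = int;
    using LogicalTime = unsigned long long;
    const StmtId kUninitializedStmtId = -1;
    const LogicalTime kUninitializedTime = 0;

    // Sketch of TransactionRouter::AtClusterTime: the timestamp may be
    // (re)selected only by the statement that first set it; later statements
    // observe canChange() == false.
    class AtClusterTimeSketch {
    public:
        LogicalTime getTime() const {
            assert(_time != kUninitializedTime);
            return _time;
        }
        void setTime(LogicalTime t, StmtId stmtId) {
            _time = t;
            _stmtIdSelectedAt = stmtId;
        }
        bool isSet() const { return _time != kUninitializedTime; }
        bool canChange(StmtId stmtId) const {
            return _stmtIdSelectedAt == kUninitializedStmtId ||
                   _stmtIdSelectedAt == stmtId;
        }

    private:
        StmtId _stmtIdSelectedAt = kUninitializedStmtId;
        LogicalTime _time = kUninitializedTime;
    };

    int main() {
        AtClusterTimeSketch act;
        assert(act.canChange(0));   // first statement may select a time
        act.setTime(5, 0);
        assert(act.canChange(0));   // ...and may still optimize it
        act.setTime(7, 0);
        assert(act.getTime() == 7 && act.isSet());
        assert(!act.canChange(1));  // later statements cannot change it
        // On a snapshot error the router re-emplaces _atClusterTime, so the
        // retry of the first statement starts from a fresh instance.
        act = AtClusterTimeSketch{};
        assert(!act.isSet() && act.canChange(1));
        return 0;
    }
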
diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp
index dd201c37ab3..847ff7d65b2 100644
--- a/src/mongo/s/transaction_router_test.cpp
+++ b/src/mongo/s/transaction_router_test.cpp
@@ -80,7 +80,7 @@ TEST_F(TransactionRouterTest, StartTxnShouldBeAttachedOnlyOnFirstStatementToPart
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
+ txnRouter.setDefaultAtClusterTime(operationContext());
BSONObj expectedNewObj = BSON("insert"
<< "test"
@@ -127,7 +127,7 @@ TEST_F(TransactionRouterTest, BasicStartTxnWithAtClusterTime) {
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
+ txnRouter.setDefaultAtClusterTime(operationContext());
BSONObj expectedNewObj = BSON("insert"
<< "test"
@@ -184,7 +184,7 @@ TEST_F(TransactionRouterTest, NewParticipantMustAttachTxnAndReadConcern) {
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
+ txnRouter.setDefaultAtClusterTime(operationContext());
BSONObj expectedNewObj = BSON("insert"
<< "test"
@@ -265,7 +265,7 @@ TEST_F(TransactionRouterTest, StartingNewTxnShouldClearState) {
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
+ txnRouter.setDefaultAtClusterTime(operationContext());
{
auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1,
@@ -291,7 +291,7 @@ TEST_F(TransactionRouterTest, StartingNewTxnShouldClearState) {
TxnNumber txnNum2{5};
txnRouter.beginOrContinueTxn(operationContext(), txnNum2, true);
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
+ txnRouter.setDefaultAtClusterTime(operationContext());
BSONObj expectedNewObj = BSON("insert"
<< "test"
@@ -323,7 +323,7 @@ TEST_F(TransactionRouterTest, FirstParticipantIsCoordinator) {
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
+ txnRouter.setDefaultAtClusterTime(operationContext());
ASSERT_FALSE(txnRouter.getCoordinatorId());
@@ -346,7 +346,7 @@ TEST_F(TransactionRouterTest, FirstParticipantIsCoordinator) {
TxnNumber txnNum2{5};
txnRouter.beginOrContinueTxn(operationContext(), txnNum2, true);
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
+ txnRouter.setDefaultAtClusterTime(operationContext());
ASSERT_FALSE(txnRouter.getCoordinatorId());
@@ -365,7 +365,7 @@ TEST_F(TransactionRouterTest, DoesNotAttachTxnNumIfAlreadyThere) {
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
+ txnRouter.setDefaultAtClusterTime(operationContext());
BSONObj expectedNewObj = BSON("insert"
<< "test"
@@ -397,7 +397,7 @@ DEATH_TEST_F(TransactionRouterTest, CrashesIfCmdHasDifferentTxnNumber, "invarian
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
+ txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(shard1,
BSON("insert"
@@ -412,7 +412,7 @@ TEST_F(TransactionRouterTest, AttachTxnValidatesReadConcernIfAlreadyOnCmd) {
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
+ txnRouter.setDefaultAtClusterTime(operationContext());
{
auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1,
@@ -446,7 +446,7 @@ TEST_F(TransactionRouterTest, CannotSpecifyReadConcernAfterFirstStatement) {
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true /* startTransaction */);
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
+ txnRouter.setDefaultAtClusterTime(operationContext());
ASSERT_THROWS_CODE(
txnRouter.beginOrContinueTxn(operationContext(), txnNum, false /* startTransaction */),
@@ -461,7 +461,7 @@ TEST_F(TransactionRouterTest, UpconvertToSnapshotIfNoReadConcernLevelGiven) {
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true /* startTransaction */);
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
+ txnRouter.setDefaultAtClusterTime(operationContext());
BSONObj expectedNewObj = BSON("insert"
<< "test"
@@ -494,7 +494,7 @@ TEST_F(TransactionRouterTest, UpconvertToSnapshotIfNoReadConcernLevelButHasAfter
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true /* startTransaction */);
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
+ txnRouter.setDefaultAtClusterTime(operationContext());
BSONObj expectedNewObj = BSON("insert"
<< "test"
@@ -597,7 +597,7 @@ TEST_F(TransactionRouterTest, CannotCommitWithoutParticipants) {
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
+ txnRouter.setDefaultAtClusterTime(operationContext());
ASSERT_THROWS(txnRouter.commitTransaction(operationContext()), AssertionException);
}
@@ -636,7 +636,7 @@ TEST_F(TransactionRouterTest, SendCommitDirectlyForSingleParticipants) {
auto txnRouter = TransactionRouter::get(opCtx);
txnRouter->beginOrContinueTxn(opCtx, txnNum, true);
- txnRouter->setAtClusterTimeToLatestTime(operationContext());
+ txnRouter->setDefaultAtClusterTime(operationContext());
txnRouter->attachTxnFieldsIfNeeded(shard1, {});
auto future = launchAsync([&] { txnRouter->commitTransaction(operationContext()); });
@@ -668,7 +668,7 @@ TEST_F(TransactionRouterTest, SendPrepareAndCoordinateCommitForMultipleParticipa
auto txnRouter = TransactionRouter::get(opCtx);
txnRouter->beginOrContinueTxn(opCtx, txnNum, true);
- txnRouter->setAtClusterTimeToLatestTime(operationContext());
+ txnRouter->setDefaultAtClusterTime(operationContext());
txnRouter->attachTxnFieldsIfNeeded(shard1, {});
txnRouter->attachTxnFieldsIfNeeded(shard2, {});
@@ -716,7 +716,7 @@ TEST_F(TransactionRouterTest, SnapshotErrorsResetAtClusterTime) {
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
+ txnRouter.setDefaultAtClusterTime(operationContext());
BSONObj expectedReadConcern = BSON("level"
<< "snapshot"
@@ -738,7 +738,7 @@ TEST_F(TransactionRouterTest, SnapshotErrorsResetAtClusterTime) {
// Simulate a snapshot error.
txnRouter.onSnapshotError();
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
+ txnRouter.setDefaultAtClusterTime(operationContext());
expectedReadConcern = BSON("level"
<< "snapshot"
@@ -753,15 +753,13 @@ TEST_F(TransactionRouterTest, SnapshotErrorsResetAtClusterTime) {
}
}
-TEST_F(TransactionRouterTest, CannotChangeAtClusterTimeWithoutSnapshotError) {
+TEST_F(TransactionRouterTest, CannotChangeAtClusterTimeAfterStatementThatSelectedIt) {
TxnNumber txnNum{3};
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
-
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
+ txnRouter.setDefaultAtClusterTime(operationContext());
BSONObj expectedReadConcern = BSON("level"
<< "snapshot"
@@ -775,11 +773,18 @@ TEST_F(TransactionRouterTest, CannotChangeAtClusterTimeWithoutSnapshotError) {
ASSERT_BSONOBJ_EQ(expectedReadConcern, newCmd["readConcern"].Obj());
}
- LogicalTime laterTime(Timestamp(1000, 1));
- ASSERT_GT(laterTime, kInMemoryLogicalTime);
- LogicalClock::get(operationContext())->setClusterTimeFromTrustedSource(laterTime);
+ // Changing the atClusterTime during the statement that selected it is allowed.
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
+ LogicalTime laterTimeSameStmt(Timestamp(100, 1));
+ ASSERT_GT(laterTimeSameStmt, kInMemoryLogicalTime);
+ LogicalClock::get(operationContext())->setClusterTimeFromTrustedSource(laterTimeSameStmt);
+
+ txnRouter.setDefaultAtClusterTime(operationContext());
+
+ expectedReadConcern = BSON("level"
+ << "snapshot"
+ << "atClusterTime"
+ << laterTimeSameStmt.asTimestamp());
{
auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard2,
@@ -787,6 +792,24 @@ TEST_F(TransactionRouterTest, CannotChangeAtClusterTimeWithoutSnapshotError) {
<< "test"));
ASSERT_BSONOBJ_EQ(expectedReadConcern, newCmd["readConcern"].Obj());
}
+
+ // Later statements cannot change atClusterTime.
+
+ repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
+ txnRouter.beginOrContinueTxn(operationContext(), txnNum, false);
+
+ LogicalTime laterTimeNewStmt(Timestamp(1000, 1));
+ ASSERT_GT(laterTimeNewStmt, laterTimeSameStmt);
+ LogicalClock::get(operationContext())->setClusterTimeFromTrustedSource(laterTimeNewStmt);
+
+ txnRouter.setDefaultAtClusterTime(operationContext());
+
+ {
+ auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard3,
+ BSON("insert"
+ << "test"));
+ ASSERT_BSONOBJ_EQ(expectedReadConcern, newCmd["readConcern"].Obj());
+ }
}
TEST_F(TransactionRouterTest, SnapshotErrorsClearsAllParticipants) {
@@ -795,7 +818,7 @@ TEST_F(TransactionRouterTest, SnapshotErrorsClearsAllParticipants) {
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
+ txnRouter.setDefaultAtClusterTime(operationContext());
// Successfully start a transaction on two shards, selecting one as the coordinator.
@@ -810,7 +833,7 @@ TEST_F(TransactionRouterTest, SnapshotErrorsClearsAllParticipants) {
txnRouter.onSnapshotError();
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
+ txnRouter.setDefaultAtClusterTime(operationContext());
ASSERT_FALSE(txnRouter.getCoordinatorId());
@@ -842,12 +865,12 @@ TEST_F(TransactionRouterTest, OnSnapshotErrorThrowsAfterFirstCommand) {
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
+ txnRouter.setDefaultAtClusterTime(operationContext());
// Should not throw.
txnRouter.onSnapshotError();
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
+ txnRouter.setDefaultAtClusterTime(operationContext());
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, false);
@@ -866,7 +889,7 @@ TEST_F(TransactionRouterTest, ParticipantsRememberStmtIdCreatedAt) {
TxnNumber txnNum{3};
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
+ txnRouter.setDefaultAtClusterTime(operationContext());
// Transaction 1 contacts shard1 and shard2 during the first command, then shard3 in the second
// command.
@@ -895,7 +918,7 @@ TEST_F(TransactionRouterTest, ParticipantsRememberStmtIdCreatedAt) {
repl::ReadConcernArgs(repl::ReadConcernLevel::kSnapshotReadConcern);
TxnNumber txnNum2{5};
txnRouter.beginOrContinueTxn(operationContext(), txnNum2, true);
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
+ txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(shard3, {});
txnRouter.attachTxnFieldsIfNeeded(shard2, {});
@@ -916,7 +939,7 @@ TEST_F(TransactionRouterTest, AllParticipantsAndCoordinatorClearedOnStaleErrorOn
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
+ txnRouter.setDefaultAtClusterTime(operationContext());
// Start a transaction on two shards, selecting one as the coordinator, but simulate a
// re-targeting error from at least one of them.
@@ -957,7 +980,7 @@ TEST_F(TransactionRouterTest, OnlyNewlyCreatedParticipantsClearedOnStaleError) {
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
+ txnRouter.setDefaultAtClusterTime(operationContext());
// First statement successfully targets one shard, selecting it as the coordinator.
@@ -983,44 +1006,47 @@ TEST_F(TransactionRouterTest, OnlyNewlyCreatedParticipantsClearedOnStaleError) {
ASSERT_TRUE(txnRouter.attachTxnFieldsIfNeeded(shard3, {})["startTransaction"].trueValue());
}
-TEST_F(TransactionRouterTest, RetryOnStaleErrorCannotPickNewAtClusterTime) {
+TEST_F(TransactionRouterTest, RetriesCannotPickNewAtClusterTimeOnStatementAfterSelected) {
TxnNumber txnNum{3};
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
+ // First statement selects an atClusterTime.
- BSONObj expectedReadConcern = BSON("level"
- << "snapshot"
- << "atClusterTime"
- << kInMemoryLogicalTime.asTimestamp());
+ txnRouter.setDefaultAtClusterTime(operationContext());
- {
- auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1,
- BSON("find"
- << "test"));
- ASSERT_BSONOBJ_EQ(expectedReadConcern, newCmd["readConcern"].Obj());
- }
+ // A later statement retries on a stale version error and a view resolution error and cannot
+ // change the atClusterTime.
+
+ repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
+ txnRouter.beginOrContinueTxn(operationContext(), txnNum, false);
- // Advance the latest time in the logical clock, simulate a stale config/db error, and verify
- // the retry attempt cannot pick a new atClusterTime.
LogicalTime laterTime(Timestamp(1000, 1));
ASSERT_GT(laterTime, kInMemoryLogicalTime);
LogicalClock::get(operationContext())->setClusterTimeFromTrustedSource(laterTime);
txnRouter.onStaleShardOrDbError("find");
+ txnRouter.setDefaultAtClusterTime(operationContext());
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
+ BSONObj expectedReadConcern = BSON("level"
+ << "snapshot"
+ << "atClusterTime"
+ << kInMemoryLogicalTime.asTimestamp());
- {
- auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1,
- BSON("find"
- << "test"));
- ASSERT_BSONOBJ_EQ(expectedReadConcern, newCmd["readConcern"].Obj());
- }
+ auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1,
+ BSON("find"
+ << "test"));
+ ASSERT_BSONOBJ_EQ(expectedReadConcern, newCmd["readConcern"].Obj());
+
+ txnRouter.onViewResolutionError();
+ txnRouter.setDefaultAtClusterTime(operationContext());
+
+ newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1,
+ BSON("find"
+ << "test"));
+ ASSERT_BSONOBJ_EQ(expectedReadConcern, newCmd["readConcern"].Obj());
}
TEST_F(TransactionRouterTest, WritesCanOnlyBeRetriedIfFirstOverallCommand) {
@@ -1032,7 +1058,7 @@ TEST_F(TransactionRouterTest, WritesCanOnlyBeRetriedIfFirstOverallCommand) {
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
+ txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(shard1, {});
@@ -1071,7 +1097,7 @@ TEST_F(TransactionRouterTest, AbortThrowsIfNoParticipants) {
auto txnRouter = TransactionRouter::get(opCtx);
txnRouter->beginOrContinueTxn(opCtx, txnNum, true);
- txnRouter->setAtClusterTimeToLatestTime(operationContext());
+ txnRouter->setDefaultAtClusterTime(operationContext());
ASSERT_THROWS_CODE(
txnRouter->abortTransaction(opCtx), DBException, ErrorCodes::NoSuchTransaction);
@@ -1089,7 +1115,7 @@ TEST_F(TransactionRouterTest, AbortForSingleParticipant) {
auto txnRouter = TransactionRouter::get(opCtx);
txnRouter->beginOrContinueTxn(opCtx, txnNum, true);
- txnRouter->setAtClusterTimeToLatestTime(operationContext());
+ txnRouter->setDefaultAtClusterTime(operationContext());
txnRouter->attachTxnFieldsIfNeeded(shard1, {});
auto future = launchAsync([&] { return txnRouter->abortTransaction(operationContext()); });
@@ -1122,7 +1148,7 @@ TEST_F(TransactionRouterTest, AbortForMultipleParticipants) {
auto txnRouter = TransactionRouter::get(opCtx);
txnRouter->beginOrContinueTxn(opCtx, txnNum, true);
- txnRouter->setAtClusterTimeToLatestTime(operationContext());
+ txnRouter->setDefaultAtClusterTime(operationContext());
txnRouter->attachTxnFieldsIfNeeded(shard1, {});
txnRouter->attachTxnFieldsIfNeeded(shard2, {});
@@ -1162,7 +1188,7 @@ TEST_F(TransactionRouterTest, OnViewResolutionErrorClearsAllNewParticipants) {
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
+ txnRouter.setDefaultAtClusterTime(operationContext());
// One shard is targeted by the first statement.
auto firstShardCmd = txnRouter.attachTxnFieldsIfNeeded(shard1, {});
@@ -1212,7 +1238,7 @@ TEST_F(TransactionRouterTest, ImplicitAbortIsNoopWithNoParticipants) {
auto txnRouter = TransactionRouter::get(opCtx);
txnRouter->beginOrContinueTxn(opCtx, txnNum, true);
- txnRouter->setAtClusterTimeToLatestTime(operationContext());
+ txnRouter->setDefaultAtClusterTime(operationContext());
// Should not throw.
txnRouter->implicitlyAbortTransaction(opCtx);
@@ -1230,7 +1256,7 @@ TEST_F(TransactionRouterTest, ImplicitAbortForSingleParticipant) {
auto txnRouter = TransactionRouter::get(opCtx);
txnRouter->beginOrContinueTxn(opCtx, txnNum, true);
- txnRouter->setAtClusterTimeToLatestTime(operationContext());
+ txnRouter->setDefaultAtClusterTime(operationContext());
txnRouter->attachTxnFieldsIfNeeded(shard1, {});
auto future =
@@ -1263,7 +1289,7 @@ TEST_F(TransactionRouterTest, ImplicitAbortForMultipleParticipants) {
auto txnRouter = TransactionRouter::get(opCtx);
txnRouter->beginOrContinueTxn(opCtx, txnNum, true);
- txnRouter->setAtClusterTimeToLatestTime(operationContext());
+ txnRouter->setDefaultAtClusterTime(operationContext());
txnRouter->attachTxnFieldsIfNeeded(shard1, {});
txnRouter->attachTxnFieldsIfNeeded(shard2, {});
@@ -1309,7 +1335,7 @@ TEST_F(TransactionRouterTest, ImplicitAbortIgnoresErrors) {
auto txnRouter = TransactionRouter::get(opCtx);
txnRouter->beginOrContinueTxn(opCtx, txnNum, true);
- txnRouter->setAtClusterTimeToLatestTime(operationContext());
+ txnRouter->setDefaultAtClusterTime(operationContext());
txnRouter->attachTxnFieldsIfNeeded(shard1, {});
auto future =
@@ -1337,7 +1363,7 @@ TEST_F(TransactionRouterTest, ContinuingTransactionPlacesItsReadConcernOnOpCtx)
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
+ txnRouter.setDefaultAtClusterTime(operationContext());
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, false);
@@ -1359,7 +1385,7 @@ TEST_F(TransactionRouterTest, SubsequentStatementCanSelectAtClusterTimeIfNotSele
txnRouter.beginOrContinueTxn(operationContext(), txnNum, false);
// Subsequent statement does select an atClusterTime and does target a participant.
- txnRouter.setAtClusterTimeToLatestTime(operationContext());
+ txnRouter.setDefaultAtClusterTime(operationContext());
BSONObj expectedReadConcern = BSON("level"
<< "snapshot"
@@ -1370,7 +1396,26 @@ TEST_F(TransactionRouterTest, SubsequentStatementCanSelectAtClusterTimeIfNotSele
BSON("insert"
<< "test"));
ASSERT_BSONOBJ_EQ(expectedReadConcern, newCmd["readConcern"].Obj());
+
+ // The next statement cannot change the atClusterTime.
+
+ repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
+ txnRouter.beginOrContinueTxn(operationContext(), txnNum, false);
+
+ LogicalTime laterTimeSameStmt(Timestamp(100, 1));
+ ASSERT_GT(laterTimeSameStmt, kInMemoryLogicalTime);
+ LogicalClock::get(operationContext())->setClusterTimeFromTrustedSource(laterTimeSameStmt);
+
+ txnRouter.setDefaultAtClusterTime(operationContext());
+
+ newCmd = txnRouter.attachTxnFieldsIfNeeded(shard2,
+ BSON("insert"
+ << "test"));
+ ASSERT_BSONOBJ_EQ(expectedReadConcern, newCmd["readConcern"].Obj());
}
+// TODO SERVER-37630: Verify transactions with majority level read concern don't have and cannot
+// select an atClusterTime once they are allowed.
+
} // unnamed namespace
} // namespace mongo
diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp
index eb8a27b5e33..021148993e8 100644
--- a/src/mongo/s/write_ops/batch_write_exec_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp
@@ -596,7 +596,7 @@ public:
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter->checkOut();
txnRouter->beginOrContinueTxn(operationContext(), kTxnNumber, true);
- txnRouter->setAtClusterTimeToLatestTime(operationContext());
+ txnRouter->setDefaultAtClusterTime(operationContext());
}
void tearDown() override {
diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.cpp b/src/mongo/s/write_ops/chunk_manager_targeter.cpp
index 54358656aaf..3b496e95d05 100644
--- a/src/mongo/s/write_ops/chunk_manager_targeter.cpp
+++ b/src/mongo/s/write_ops/chunk_manager_targeter.cpp
@@ -325,8 +325,7 @@ Status ChunkManagerTargeter::init(OperationContext* opCtx) {
return shardDbStatus.getStatus();
}
- const auto routingInfoStatus =
- Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, _nss);
+ const auto routingInfoStatus = getCollectionRoutingInfoForTxnCmd(opCtx, _nss);
if (!routingInfoStatus.isOK()) {
return routingInfoStatus.getStatus();
}