 buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml |   2
 jstests/core/txns/no_snapshot_writes_outside_txn.js                       |   3
 jstests/libs/global_snapshot_reads_util.js                                |  69
 jstests/noPassthrough/readConcern_snapshot_mongos.js                      |  42
 jstests/replsets/non_transaction_snapshot_reads.js                        |  16
 jstests/sharding/sharding_non_transaction_snapshot_read.js                | 143
 jstests/sharding/snapshot_reads_target_at_point_in_time.js                | 203
 jstests/sharding/transactions_target_at_point_in_time.js                  | 125
 src/mongo/db/query/cursor_response.cpp                                    |  15
 src/mongo/db/query/cursor_response.h                                      |   9
 src/mongo/db/query/cursor_response_test.cpp                               |   9
 src/mongo/s/cluster_commands_helpers.cpp                                  |  16
 src/mongo/s/commands/cluster_distinct_cmd.cpp                             |   5
 src/mongo/s/commands/cluster_find_cmd.cpp                                 |   1
 src/mongo/s/commands/strategy.cpp                                         |  25
 src/mongo/s/query/async_results_merger_test.cpp                           |  83
 src/mongo/s/query/cluster_aggregation_planner.cpp                         |   5
 src/mongo/s/query/cluster_client_cursor.h                                 |   5
 src/mongo/s/query/cluster_client_cursor_impl.cpp                          |   4
 src/mongo/s/query/cluster_client_cursor_impl.h                            |   2
 src/mongo/s/query/cluster_client_cursor_mock.cpp                          |   4
 src/mongo/s/query/cluster_client_cursor_mock.h                            |   2
 src/mongo/s/query/cluster_client_cursor_params.h                          |  12
 src/mongo/s/query/cluster_find.cpp                                        |  16
 src/mongo/s/query/establish_cursors.cpp                                   |   5
 src/mongo/s/query/store_possible_cursor.cpp                               |   9
 26 files changed, 586 insertions(+), 244 deletions(-)
diff --git a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
index cc733b396f5..8e623717c26 100644
--- a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
@@ -195,7 +195,7 @@ selector:
# Moves a chunk before continuing a transaction, which can lead to snapshot errors if the
# CSRS failovers are sufficiently slow.
- jstests/sharding/transactions_reject_writes_for_moved_chunks.js
- - jstests/sharding/transactions_target_at_point_in_time.js
+ - jstests/sharding/snapshot_reads_target_at_point_in_time.js
# Tests that rely on shards becoming aware of collection drops regardless of config stepdowns.
# (SERVER-34760)
- jstests/sharding/merge_requires_unique_index.js
diff --git a/jstests/core/txns/no_snapshot_writes_outside_txn.js b/jstests/core/txns/no_snapshot_writes_outside_txn.js
index 1626e7fa009..1694719e04b 100644
--- a/jstests/core/txns/no_snapshot_writes_outside_txn.js
+++ b/jstests/core/txns/no_snapshot_writes_outside_txn.js
@@ -1,7 +1,8 @@
/**
* Verify that readConcern: snapshot is not permitted for writes outside transactions.
*
- * @tags: [uses_transactions]
+ * // TODO(SERVER-47915): remove assumes_against_mongod_not_mongos
+ * @tags: [uses_transactions, assumes_against_mongod_not_mongos]
*/
(function() {
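A rough shell sketch of the behavior that test verifies, for orientation (the collection name and error code here are illustrative; the test file itself is authoritative):

    // Writes still reject readConcern level "snapshot" outside a transaction.
    assert.commandFailedWithCode(
        db.runCommand(
            {insert: "coll", documents: [{_id: 0}], readConcern: {level: "snapshot"}}),
        ErrorCodes.InvalidOptions);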
diff --git a/jstests/libs/global_snapshot_reads_util.js b/jstests/libs/global_snapshot_reads_util.js
index da2630f8fd1..87bcc52ae2d 100644
--- a/jstests/libs/global_snapshot_reads_util.js
+++ b/jstests/libs/global_snapshot_reads_util.js
@@ -32,7 +32,7 @@ function verifyInvalidGetMoreAttempts(mainDb, collName, cursorId, lsid, txnNumbe
ErrorCodes.NoSuchTransaction);
}
-var snapshotReadsCursorTest, snapshotReadsDistinctTest;
+var snapshotReadsTest;
(function() {
function makeSnapshotReadConcern(atClusterTime) {
@@ -43,16 +43,8 @@ function makeSnapshotReadConcern(atClusterTime) {
return {level: "snapshot", atClusterTime: atClusterTime};
}
-function awaitCommitted(db, ts) {
- jsTestLog(`Wait for ${ts} to be committed on ${db.getMongo()}`);
- assert.soonNoExcept(function() {
- const replSetStatus =
- assert.commandWorked(db.getSiblingDB("admin").runCommand({replSetGetStatus: 1}));
- return timestampCmp(replSetStatus.optimes.readConcernMajorityOpTime.ts, ts) >= 0;
- }, `${ts} was never committed on ${db.getMongo()}`);
-}
-
-snapshotReadsCursorTest = function(testScenarioName, primaryDB, secondaryDB, collName) {
+function snapshotReadsCursorTest(
+ {testScenarioName, primaryDB, secondaryDB, collName, awaitCommittedFn}) {
const docs = [...Array(10).keys()].map((i) => ({"_id": i}));
const commands = {
@@ -93,7 +85,7 @@ snapshotReadsCursorTest = function(testScenarioName, primaryDB, secondaryDB, col
assert(insertTimestamp);
jsTestLog(`Inserted 10 documents at timestamp ${insertTimestamp}`);
- awaitCommitted(db, insertTimestamp);
+ awaitCommittedFn(db, insertTimestamp);
// Create a session if useCausalConsistency is true.
let causalDb, sessionTimestamp;
@@ -128,7 +120,7 @@ snapshotReadsCursorTest = function(testScenarioName, primaryDB, secondaryDB, col
{update: collName, updates: [{q: {}, u: {$set: {x: true}}, multi: true}]}));
jsTestLog(`Updated collection "${collName}" at timestamp ${res.operationTime}`);
- awaitCommitted(db, res.operationTime);
+ awaitCommittedFn(db, res.operationTime);
// Retrieve the rest of the read command's result set.
res = assert.commandWorked(
@@ -167,9 +159,10 @@ snapshotReadsCursorTest = function(testScenarioName, primaryDB, secondaryDB, col
}
}
}
-};
+}
-snapshotReadsDistinctTest = function(testScenarioName, primaryDB, secondaryDB, collName) {
+function snapshotReadsDistinctTest(
+ {testScenarioName, primaryDB, secondaryDB, collName, awaitCommittedFn}) {
// Note: this test sets documents' "x" field, the test above uses "_id".
const docs = [...Array(10).keys()].map((i) => ({"x": i}));
@@ -184,9 +177,10 @@ snapshotReadsDistinctTest = function(testScenarioName, primaryDB, secondaryDB, c
for (let useCausalConsistency of [false, true]) {
for (let [db, readPreferenceMode] of [[primaryDB, "primary"], [secondaryDB, "secondary"]]) {
- jsTestLog(`Testing "distinct" on collection ` +
- `${collName} with read preference ${readPreferenceMode} and causal` +
- ` consistency ${useCausalConsistency}`);
+ jsTestLog(
+ `Testing "distinct" with the ${testScenarioName} scenario on` +
+ ` collection ${collName} with read preference ${readPreferenceMode} and causal` +
+ ` consistency ${useCausalConsistency}`);
let res =
assert.commandWorked(primaryDB.runCommand({insert: collName, documents: docs}));
@@ -194,7 +188,7 @@ snapshotReadsDistinctTest = function(testScenarioName, primaryDB, secondaryDB, c
assert(insertTimestamp);
jsTestLog(`Inserted 10 documents at timestamp ${insertTimestamp}`);
- awaitCommitted(db, insertTimestamp);
+ awaitCommittedFn(db, insertTimestamp);
// Create a session if useCausalConsistency is true.
let causalDb, sessionTimestamp;
@@ -227,7 +221,7 @@ snapshotReadsDistinctTest = function(testScenarioName, primaryDB, secondaryDB, c
{update: collName, updates: [{q: {}, u: {$set: {x: 42}}, multi: true}]}));
jsTestLog(`Updated collection "${collName}" at timestamp ${res.operationTime}`);
- awaitCommitted(db, res.operationTime);
+ awaitCommittedFn(db, res.operationTime);
// This read shows the updated docs.
res = assert.commandWorked(causalDb.runCommand(distinctCommand(readPreferenceMode)));
@@ -249,5 +243,40 @@ snapshotReadsDistinctTest = function(testScenarioName, primaryDB, secondaryDB, c
assert.commandWorked(primaryDB[collName].remove({}, {writeConcern: {w: "majority"}}));
}
}
+}
+
+/**
+ * Test non-transaction snapshot reads on primary and secondary.
+ *
+ * Pass two handles to the same database; either both connected to a mongos, or one connected to
+ * a replica set primary and the other connected to a replica set secondary. (The test will also
+ * pass $readPreference, so if the handles are connected to a mongos, then the reads will target
+ * primary/secondary shard servers.)
+ *
+ * For awaitCommittedFn, pass a function that waits for the last write to be committed on all
+ * secondaries.
+ *
+ * @param {string} testScenarioName Used when logging progress
+ * @param {DB} primaryDB Database handle connected to a primary or mongos
+ * @param {DB} secondaryDB Database handle connected to a secondary or mongos
+ * @param {string} collName Name of the collection to read and write
+ * @param {Function} awaitCommittedFn Called with (db, ts); waits until ts is majority committed
+ */
+snapshotReadsTest = function(
+ {testScenarioName, primaryDB, secondaryDB, collName, awaitCommittedFn}) {
+ snapshotReadsCursorTest({
+ testScenarioName: testScenarioName,
+ primaryDB: primaryDB,
+ secondaryDB: secondaryDB,
+ collName: collName,
+ awaitCommittedFn: awaitCommittedFn
+ });
+ snapshotReadsDistinctTest({
+ testScenarioName: testScenarioName,
+ primaryDB: primaryDB,
+ secondaryDB: secondaryDB,
+ collName: collName,
+ awaitCommittedFn: awaitCommittedFn
+ });
};
})();
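A minimal sketch of a caller-supplied awaitCommittedFn, adapted from the awaitCommitted helper this patch removes above: poll replSetGetStatus until the majority-committed optime reaches the write's timestamp (timestampCmp and the assert helpers come from the shell's bundled utilities):

    function awaitCommittedFn(db, ts) {
        jsTestLog(`Wait for ${ts} to be committed on ${db.getMongo()}`);
        assert.soonNoExcept(function() {
            const replSetStatus = assert.commandWorked(
                db.getSiblingDB("admin").runCommand({replSetGetStatus: 1}));
            // readConcernMajorityOpTime advances once the write is majority committed.
            return timestampCmp(replSetStatus.optimes.readConcernMajorityOpTime.ts, ts) >= 0;
        }, `${ts} was never committed on ${db.getMongo()}`);
    }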
diff --git a/jstests/noPassthrough/readConcern_snapshot_mongos.js b/jstests/noPassthrough/readConcern_snapshot_mongos.js
index 472da1af4de..b4bbb2a7680 100644
--- a/jstests/noPassthrough/readConcern_snapshot_mongos.js
+++ b/jstests/noPassthrough/readConcern_snapshot_mongos.js
@@ -13,57 +13,20 @@ function expectSuccessInTxnThenAbort(session, sessionConn, cmdObj) {
assert.commandWorked(session.abortTransaction_forTesting());
}
-// Runs the command as the first in a multi statement txn that is aborted right after, expecting
-// failure with the given error code.
-function expectFailInTxnThenAbort(session, sessionConn, expectedErrorCode, cmdObj) {
- session.startTransaction();
- assert.commandFailedWithCode(sessionConn.runCommand(cmdObj), expectedErrorCode);
- assert.commandFailedWithCode(session.abortTransaction_forTesting(),
- ErrorCodes.NoSuchTransaction);
-}
-
const dbName = "test";
const collName = "coll";
let st = new ShardingTest({shards: 1, rs: {nodes: 2}, config: 2, mongos: 1});
let testDB = st.getDB(dbName);
-let coll = testDB.coll;
// Insert data to create the collection.
assert.commandWorked(testDB[collName].insert({x: 1}));
flushRoutersAndRefreshShardMetadata(st, {ns: dbName + "." + collName, dbNames: [dbName]});
-// noPassthrough tests
-
-// readConcern 'snapshot' is not allowed outside session context.
-assert.commandFailedWithCode(testDB.runCommand({find: collName, readConcern: {level: "snapshot"}}),
- ErrorCodes.InvalidOptions);
-
let session = testDB.getMongo().startSession({causalConsistency: false});
let sessionDb = session.getDatabase(dbName);
-// readConcern 'snapshot' is not allowed outside transaction context.
-assert.commandFailedWithCode(sessionDb.runCommand({
- find: collName,
- readConcern: {level: "snapshot"},
-}),
- ErrorCodes.InvalidOptions);
-
-// readConcern 'snapshot' is not allowed with 'atClusterTime'.
-let pingRes = assert.commandWorked(st.s0.adminCommand({ping: 1}));
-assert(pingRes.hasOwnProperty("$clusterTime"), tojson(pingRes));
-assert(pingRes.$clusterTime.hasOwnProperty("clusterTime"), tojson(pingRes));
-const clusterTime = pingRes.$clusterTime.clusterTime;
-
-expectFailInTxnThenAbort(session, sessionDb, ErrorCodes.InvalidOptions, {
- find: collName,
- readConcern: {level: "snapshot", atClusterTime: clusterTime},
-});
-
-// Passthrough tests. There are parts not implemented on mongod and mongos, they are tracked by
-// separate jiras
-
// readConcern 'snapshot' is supported by insert on mongos in a transaction.
expectSuccessInTxnThenAbort(session, sessionDb, {
insert: collName,
@@ -113,6 +76,11 @@ expectSuccessInTxnThenAbort(session, sessionDb, {
readConcern: {level: "snapshot"},
});
+let pingRes = assert.commandWorked(st.s0.adminCommand({ping: 1}));
+assert(pingRes.hasOwnProperty("$clusterTime"), tojson(pingRes));
+assert(pingRes.$clusterTime.hasOwnProperty("clusterTime"), tojson(pingRes));
+const clusterTime = pingRes.$clusterTime.clusterTime;
+
// readConcern 'snapshot' is allowed with 'afterClusterTime'.
expectSuccessInTxnThenAbort(session, sessionDb, {
find: collName,
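With the InvalidOptions checks above removed, a snapshot read at an explicit cluster time is also expected to succeed outside a transaction; a rough sketch reusing this test's $clusterTime-from-ping pattern:

    const ping = assert.commandWorked(st.s0.adminCommand({ping: 1}));
    assert.commandWorked(testDB.runCommand({
        find: collName,
        readConcern: {level: "snapshot", atClusterTime: ping.$clusterTime.clusterTime},
    }));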
diff --git a/jstests/replsets/non_transaction_snapshot_reads.js b/jstests/replsets/non_transaction_snapshot_reads.js
index 2be6b3233a0..ed4dc19a164 100644
--- a/jstests/replsets/non_transaction_snapshot_reads.js
+++ b/jstests/replsets/non_transaction_snapshot_reads.js
@@ -1,4 +1,5 @@
-/**Tests readConcern level snapshot outside of transactions.
+/**
+ * Tests readConcern level snapshot outside of transactions.
*
* @tags: [
* requires_fcv_46,
@@ -12,15 +13,22 @@ load("jstests/libs/global_snapshot_reads_util.js");
// TODO(SERVER-47672): Use minSnapshotHistoryWindowInSeconds instead.
const options = {
- setParameter: "maxTargetSnapshotHistoryWindowInSeconds=600",
+ setParameter: {maxTargetSnapshotHistoryWindowInSeconds: 600}
};
const replSet = new ReplSetTest({nodes: 3, nodeOptions: options});
replSet.startSet();
replSet.initiateWithHighElectionTimeout();
const primaryDB = replSet.getPrimary().getDB('test');
const secondaryDB = replSet.getSecondary().getDB('test');
-snapshotReadsCursorTest(jsTestName(), primaryDB, secondaryDB, "test");
-snapshotReadsDistinctTest(jsTestName(), primaryDB, secondaryDB, "test");
+snapshotReadsTest({
+ testScenarioName: jsTestName(),
+ primaryDB: primaryDB,
+ secondaryDB: secondaryDB,
+ collName: "test",
+ awaitCommittedFn: () => {
+ replSet.awaitLastOpCommitted();
+ }
+});
// Ensure "atClusterTime" is omitted from regular (non-snapshot) reads.
primaryDB["collection"].insertOne({});
diff --git a/jstests/sharding/sharding_non_transaction_snapshot_read.js b/jstests/sharding/sharding_non_transaction_snapshot_read.js
new file mode 100644
index 00000000000..a7b3d0309be
--- /dev/null
+++ b/jstests/sharding/sharding_non_transaction_snapshot_read.js
@@ -0,0 +1,143 @@
+/**
+ * Tests readConcern level snapshot outside of transactions.
+ *
+ * @tags: [
+ * requires_fcv_46,
+ * requires_majority_read_concern,
+ * requires_find_command
+ * ]
+ */
+
+(function() {
+"use strict";
+
+load("jstests/libs/global_snapshot_reads_util.js");
+load("jstests/sharding/libs/sharded_transactions_helpers.js");
+
+// TODO(SERVER-47672): Use minSnapshotHistoryWindowInSeconds instead.
+const configOptions = {
+ setParameter: {maxTargetSnapshotHistoryWindowInSeconds: 600}
+};
+
+const dbName = "test";
+const shardedCollName = "shardedColl";
+const unshardedCollName = "unshardedColl";
+
+function setUpAllScenarios(st) {
+ assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+ assert.commandWorked(st.s.adminCommand(
+ {shardCollection: st.s.getDB(dbName)[shardedCollName] + "", key: {_id: 1}}));
+}
+
+let shardingScenarios = {
+ singleShard: {
+ compatibleCollections: [shardedCollName, unshardedCollName],
+ setUp: function() {
+ const st = new ShardingTest({
+ mongos: 1,
+ config: 1,
+ shards: {rs0: {nodes: 2}},
+ other: {configOptions: configOptions}
+ });
+ setUpAllScenarios(st);
+ return st;
+ }
+ },
+ multiShardAllShardReads: {
+ compatibleCollections: [shardedCollName],
+ setUp: function() {
+ let st = new ShardingTest({
+ shards: {
+ rs0: {nodes: 2},
+ rs1: {nodes: 2},
+ rs2: {nodes: 2},
+ },
+ mongos: 1,
+ config: 1,
+ other: {configOptions: configOptions}
+ });
+ setUpAllScenarios(st);
+ const mongos = st.s0;
+ const ns = dbName + '.' + shardedCollName;
+
+ // snapshotReadsTest() inserts ids 0-9 and tries snapshot reads on the collection.
+ assert.commandWorked(st.splitAt(ns, {_id: 4}));
+ assert.commandWorked(st.splitAt(ns, {_id: 7}));
+
+ assert.commandWorked(
+ mongos.adminCommand({moveChunk: ns, find: {_id: 0}, to: st.shard0.shardName}));
+ assert.commandWorked(
+ mongos.adminCommand({moveChunk: ns, find: {_id: 4}, to: st.shard1.shardName}));
+ assert.commandWorked(
+ mongos.adminCommand({moveChunk: ns, find: {_id: 7}, to: st.shard2.shardName}));
+
+ assert.eq(1, mongos.getDB('config').chunks.count({ns: ns, shard: st.shard0.shardName}));
+ assert.eq(1, mongos.getDB('config').chunks.count({ns: ns, shard: st.shard1.shardName}));
+ assert.eq(1, mongos.getDB('config').chunks.count({ns: ns, shard: st.shard2.shardName}));
+
+ flushRoutersAndRefreshShardMetadata(st, {ns});
+
+ return st;
+ }
+ },
+ // Only two out of three shards have documents.
+ multiShardSomeShardReads: {
+ compatibleCollections: [shardedCollName],
+ setUp: function() {
+ let st = new ShardingTest({
+ shards: {
+ rs0: {nodes: 2},
+ rs1: {nodes: 2},
+ rs2: {nodes: 2},
+ },
+ mongos: 1,
+ config: 1,
+ other: {configOptions: configOptions}
+ });
+ setUpAllScenarios(st);
+ const mongos = st.s0;
+ const ns = dbName + '.' + shardedCollName;
+
+ // snapshotReadsTest() inserts ids 0-9 and tries snapshot reads on the collection.
+ assert.commandWorked(st.splitAt(ns, {_id: 5}));
+ assert.commandWorked(
+ mongos.adminCommand({moveChunk: ns, find: {_id: 0}, to: st.shard1.shardName}));
+ assert.commandWorked(
+ mongos.adminCommand({moveChunk: ns, find: {_id: 7}, to: st.shard2.shardName}));
+
+ assert.eq(0, mongos.getDB('config').chunks.count({ns: ns, shard: st.shard0.shardName}));
+ assert.eq(1, mongos.getDB('config').chunks.count({ns: ns, shard: st.shard1.shardName}));
+ assert.eq(1, mongos.getDB('config').chunks.count({ns: ns, shard: st.shard2.shardName}));
+
+ flushRoutersAndRefreshShardMetadata(st, {ns});
+
+ return st;
+ }
+ }
+};
+
+for (let [scenarioName, scenario] of Object.entries(shardingScenarios)) {
+ scenario.compatibleCollections.forEach(function(collName) {
+ jsTestLog(`Run scenario ${scenarioName} with collection ${collName}`);
+ let st = scenario.setUp();
+
+ function awaitCommittedFn() {
+ for (let i = 0; st['rs' + i] !== undefined; i++) {
+ st['rs' + i].awaitLastOpCommitted();
+ }
+ }
+
+ // Pass the same DB handle as "primaryDB" and "secondaryDB" params; the test functions will
+ // send readPreference to mongos to target primary/secondary shard servers.
+ let db = st.s.getDB(dbName);
+ snapshotReadsTest({
+ testScenarioName: scenarioName,
+ primaryDB: db,
+ secondaryDB: db,
+ collName: collName,
+ awaitCommittedFn: awaitCommittedFn
+ });
+ st.stop();
+ });
+}
+})();
diff --git a/jstests/sharding/snapshot_reads_target_at_point_in_time.js b/jstests/sharding/snapshot_reads_target_at_point_in_time.js
new file mode 100644
index 00000000000..0828c5b9a75
--- /dev/null
+++ b/jstests/sharding/snapshot_reads_target_at_point_in_time.js
@@ -0,0 +1,203 @@
+// Verifies mongos uses a versioned routing table to target subsequent requests for snapshot reads.
+//
+// @tags: [
+// requires_find_command,
+// requires_sharding,
+// uses_multi_shard_transaction,
+// uses_transactions,
+// requires_fcv_46
+// ]
+(function() {
+"use strict";
+
+load("jstests/sharding/libs/sharded_transactions_helpers.js");
+
+function expectChunks(st, ns, chunks) {
+ for (let i = 0; i < chunks.length; i++) {
+ assert.eq(chunks[i],
+ st.s.getDB("config").chunks.count({ns: ns, shard: st["shard" + i].shardName}),
+ "unexpected number of chunks on shard " + i);
+ }
+}
+
+const dbName = "test";
+const collName = "foo";
+const ns = dbName + '.' + collName;
+
+const st = new ShardingTest({
+ shards: 3,
+ mongos: 1,
+ config: 1,
+ other: {
+ rs0: {nodes: 2},
+ rs1: {nodes: 2},
+ rs2: {nodes: 2},
+ // Disable expiring old chunk history to ensure the transactions are able to read from a
+ // shard that has donated a chunk, even if the migration takes longer than the amount of
+ // time for which a chunk's history is normally stored (see SERVER-39763).
+ configOptions:
+ {setParameter: {"failpoint.skipExpiringOldChunkHistory": "{mode: 'alwaysOn'}"}}
+ }
+});
+
+// Set up one sharded collection with 2 chunks, both on the primary shard.
+
+assert.commandWorked(
+ st.s.getDB(dbName)[collName].insert({_id: -5}, {writeConcern: {w: "majority"}}));
+assert.commandWorked(
+ st.s.getDB(dbName)[collName].insert({_id: 5}, {writeConcern: {w: "majority"}}));
+
+assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+st.ensurePrimaryShard(dbName, st.shard0.shardName);
+
+assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {_id: 1}}));
+assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 0}}));
+
+expectChunks(st, ns, [2, 0, 0]);
+
+// Temporarily move a chunk to Shard2, to avoid picking a global read timestamp before the
+// sharding metadata cache collections are created.
+assert.commandWorked(st.s.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard2.shardName}));
+
+assert.commandWorked(st.s.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard1.shardName}));
+expectChunks(st, ns, [1, 1, 0]);
+
+// The first command targets the first chunk; the second targets the second chunk.
+const kCommandTestCases = [
+ {
+ name: "aggregate",
+ commands: [
+ {aggregate: collName, pipeline: [{$match: {_id: -5}}], cursor: {}},
+ {aggregate: collName, pipeline: [{$match: {_id: 5}}], cursor: {}}
+ ]
+ },
+ {
+ name: "find",
+ commands: [{find: collName, filter: {_id: -5}}, {find: collName, filter: {_id: 5}}]
+ }
+];
+
+const TestMode = {
+ TRANSACTION: 'TRANSACTION',
+ CAUSAL_CONSISTENCY: 'CAUSAL_CONSISTENCY',
+ SNAPSHOT: 'SNAPSHOT',
+ SNAPSHOT_AT_CLUSTER_TIME: 'SNAPSHOT_AT_CLUSTER_TIME'
+};
+
+function runTest(testCase, testMode, readPreferenceMode) {
+ const cmdName = testCase.name;
+ // Clone commands so we can modify readConcern and readPreference.
+ const targetChunk1Cmd = Object.assign({}, testCase.commands[0]);
+ const targetChunk2Cmd = Object.assign({}, testCase.commands[1]);
+ targetChunk1Cmd["$readPreference"] = {mode: readPreferenceMode};
+ targetChunk2Cmd["$readPreference"] = {mode: readPreferenceMode};
+
+ jsTestLog(`Testing ${cmdName} in mode ${testMode}`);
+
+ expectChunks(st, ns, [1, 1, 0]);
+
+ st.refreshCatalogCacheForNs(st.s, ns);
+
+ let session, db;
+ switch (testMode) {
+ case TestMode.TRANSACTION:
+ session = st.s.startSession({causalConsistency: false});
+ session.startTransaction({readConcern: {level: "snapshot"}});
+ db = session.getDatabase(dbName);
+ break;
+ case TestMode.CAUSAL_CONSISTENCY:
+ session = st.s.startSession({causalConsistency: true});
+ db = session.getDatabase(dbName);
+ db[collName].findOne(); // Establish a timestamp in the session.
+ break;
+ case TestMode.SNAPSHOT:
+ db = st.s.getDB(dbName);
+ targetChunk1Cmd.readConcern = targetChunk2Cmd.readConcern = {level: "snapshot"};
+ break;
+ case TestMode.SNAPSHOT_AT_CLUSTER_TIME:
+ db = st.s.getDB(dbName);
+ const opTime = st.s.getDB(dbName).runCommand({ping: 1}).operationTime;
+ targetChunk1Cmd.readConcern = {level: "snapshot", atClusterTime: opTime};
+ break;
+ }
+
+ // Establish a read timestamp.
+ let res = assert.commandWorked(db.runCommand(targetChunk1Cmd));
+ assert.sameMembers([{_id: -5}],
+ res.cursor.firstBatch,
+ `expected to find document in first chunk, command` +
+ ` ${tojson(targetChunk1Cmd)} returned ${tojson(res)}`);
+
+ const targetChunk1CmdTimestamp = res.cursor.atClusterTime;
+ jsTestLog(`Chunk 1 command replied with timestamp ${targetChunk1CmdTimestamp}`);
+
+ // Move a chunk from Shard1 to Shard2 outside of the transaction, and update it. This will
+ // happen at a later logical time than the read timestamp.
+ assert.commandWorked(
+ st.s.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard2.shardName}));
+
+ res = assert.commandWorked(st.s.getDB(dbName).runCommand({
+ update: collName,
+ updates: [{q: {_id: 5}, u: {$set: {x: true}}}],
+ writeConcern: {w: "majority"}
+ }));
+ jsTestLog(`Updated chunk 2 at timestamp ${tojson(res.operationTime)}`);
+ st.refreshCatalogCacheForNs(st.s, ns);
+
+ if (testMode === TestMode.SNAPSHOT_AT_CLUSTER_TIME) {
+ targetChunk2Cmd.readConcern = {level: "snapshot", atClusterTime: targetChunk1CmdTimestamp};
+ }
+
+ res = assert.commandWorked(db.runCommand(targetChunk2Cmd));
+
+ switch (testMode) {
+ case TestMode.CAUSAL_CONSISTENCY:
+ case TestMode.SNAPSHOT:
+ // We may or may not see the result of the update above.
+ assert.eq(1,
+ res.cursor.firstBatch.length,
+ `expected to find document in second chunk, command` +
+ ` ${tojson(targetChunk2Cmd)} returned ${tojson(res)}`);
+ assert.eq(5,
+ res.cursor.firstBatch[0]._id,
+ `expected to find {_id: 5} in second chunk, command` +
+ ` ${tojson(targetChunk2Cmd)} returned ${tojson(res)}`);
+ break;
+ case TestMode.TRANSACTION:
+ case TestMode.SNAPSHOT_AT_CLUSTER_TIME:
+ // Must not see the update's result.
+ assert.sameMembers([{_id: 5}],
+ res.cursor.firstBatch,
+ `expected to find document in second chunk, command` +
+ ` ${tojson(targetChunk2Cmd)} returned ${tojson(res)}`);
+ break;
+ }
+
+ if (testMode === TestMode.TRANSACTION) {
+ assert.commandWorked(session.commitTransaction_forTesting());
+ }
+
+ // Clear the updated field and move the chunk back to Shard1 for the next iteration.
+ assert.commandWorked(st.s.getDB(dbName).runCommand({
+ update: collName,
+ updates: [{q: {_id: 5}, u: {$unset: {x: true}}}],
+ writeConcern: {w: "majority"}
+ }));
+ assert.commandWorked(
+ st.s.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard1.shardName}));
+}
+
+for (let testCase of kCommandTestCases) {
+ for (let testMode of Object.values(TestMode)) {
+ for (let readPreferenceMode of ["primary", "secondary"]) {
+ if (readPreferenceMode === "secondary" && testMode === TestMode.TRANSACTION) {
+ // Transactions aren't supported on secondaries.
+ continue;
+ }
+
+ runTest(testCase, testMode, readPreferenceMode);
+ }
+ }
+}
+st.stop();
+})();
diff --git a/jstests/sharding/transactions_target_at_point_in_time.js b/jstests/sharding/transactions_target_at_point_in_time.js
deleted file mode 100644
index aef0800aa8e..00000000000
--- a/jstests/sharding/transactions_target_at_point_in_time.js
+++ /dev/null
@@ -1,125 +0,0 @@
-// Verifies mongos uses a versioned routing table to target subsequent requests in transactions with
-// snapshot level read concern.
-//
-// @tags: [
-// requires_find_command,
-// requires_sharding,
-// uses_multi_shard_transaction,
-// uses_transactions,
-// ]
-(function() {
-"use strict";
-
-load("jstests/sharding/libs/sharded_transactions_helpers.js");
-
-function expectChunks(st, ns, chunks) {
- for (let i = 0; i < chunks.length; i++) {
- assert.eq(chunks[i],
- st.s.getDB("config").chunks.count({ns: ns, shard: st["shard" + i].shardName}),
- "unexpected number of chunks on shard " + i);
- }
-}
-
-const dbName = "test";
-const collName = "foo";
-const ns = dbName + '.' + collName;
-
-const st = new ShardingTest({
- shards: 3,
- mongos: 1,
- config: 1,
- other: {
- // Disable expiring old chunk history to ensure the transactions are able to read from a
- // shard that has donated a chunk, even if the migration takes longer than the amount of
- // time for which a chunk's history is normally stored (see SERVER-39763).
- configOptions:
- {setParameter: {"failpoint.skipExpiringOldChunkHistory": "{mode: 'alwaysOn'}"}}
- }
-});
-
-// Set up one sharded collection with 2 chunks, both on the primary shard.
-
-assert.commandWorked(
- st.s.getDB(dbName)[collName].insert({_id: -5}, {writeConcern: {w: "majority"}}));
-assert.commandWorked(
- st.s.getDB(dbName)[collName].insert({_id: 5}, {writeConcern: {w: "majority"}}));
-
-assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
-st.ensurePrimaryShard(dbName, st.shard0.shardName);
-
-assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {_id: 1}}));
-assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 0}}));
-
-expectChunks(st, ns, [2, 0, 0]);
-
-// Temporarily move a chunk to Shard2, to avoid picking a global read timestamp before the
-// sharding metadata cache collections are created.
-assert.commandWorked(st.s.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard2.shardName}));
-
-assert.commandWorked(st.s.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard1.shardName}));
-expectChunks(st, ns, [1, 1, 0]);
-
-// First command targets the first chunk, the second command targets the second chunk.
-const kCommandTestCases = [
- {
- name: "aggregate",
- commandFuncs: [
- (coll) => coll.aggregate({$match: {_id: -5}}).itcount(),
- (coll) => coll.aggregate({$match: {_id: 5}}).itcount(),
- ]
- },
- {
- name: "find",
- commandFuncs: [
- (coll) => coll.find({_id: -5}).itcount(),
- (coll) => coll.find({_id: 5}).itcount(),
- ]
- }
-];
-
-function runTest(testCase) {
- const cmdName = testCase.name;
- const targetChunk1Func = testCase.commandFuncs[0];
- const targetChunk2Func = testCase.commandFuncs[1];
-
- jsTestLog("Testing " + cmdName);
-
- expectChunks(st, ns, [1, 1, 0]);
-
- st.refreshCatalogCacheForNs(st.s, ns);
-
- const session = st.s.startSession();
- const sessionDB = session.getDatabase(dbName);
- const sessionColl = sessionDB[collName];
-
- session.startTransaction({readConcern: {level: "snapshot"}});
-
- // Start a transaction on Shard0 which will select and pin a global read timestamp.
- assert.eq(targetChunk1Func(sessionColl),
- 1,
- "expected to find document in first chunk, cmd: " + cmdName);
-
- // Move a chunk from Shard1 to Shard2 outside of the transaction. This will happen at a
- // later logical time than the transaction's read timestamp.
- assert.commandWorked(
- st.s.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard2.shardName}));
-
- // Target a document in the chunk that was moved. The router should get a stale shard
- // version from Shard1 then retry on Shard1 and see the document.
- st.refreshCatalogCacheForNs(st.s, ns);
-
- assert.eq(targetChunk2Func(sessionColl),
- 1,
- "expected to find document in second chunk, cmd: " + cmdName);
-
- assert.commandWorked(session.commitTransaction_forTesting());
-
- // Move the chunk back to Shard1 for the next iteration.
- assert.commandWorked(
- st.s.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard1.shardName}));
-}
-
-kCommandTestCases.forEach(runTest);
-
-st.stop();
-})();
diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp
index 3cd99d7e806..4b3eea43ee8 100644
--- a/src/mongo/db/query/cursor_response.cpp
+++ b/src/mongo/db/query/cursor_response.cpp
@@ -129,6 +129,7 @@ void appendGetMoreResponseObject(long long cursorId,
CursorResponse::CursorResponse(NamespaceString nss,
CursorId cursorId,
std::vector<BSONObj> batch,
+ boost::optional<Timestamp> atClusterTime,
boost::optional<long long> numReturnedSoFar,
boost::optional<BSONObj> postBatchResumeToken,
boost::optional<BSONObj> writeConcernError,
@@ -136,6 +137,7 @@ CursorResponse::CursorResponse(NamespaceString nss,
: _nss(std::move(nss)),
_cursorId(cursorId),
_batch(std::move(batch)),
+ _atClusterTime(std::move(atClusterTime)),
_numReturnedSoFar(numReturnedSoFar),
_postBatchResumeToken(std::move(postBatchResumeToken)),
_writeConcernError(std::move(writeConcernError)),
@@ -235,6 +237,14 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo
<< postBatchResumeTokenElem.type()};
}
+ auto atClusterTimeElem = cursorObj[kAtClusterTimeField];
+ if (atClusterTimeElem && atClusterTimeElem.type() != BSONType::bsonTimestamp) {
+ return {ErrorCodes::BadValue,
+ str::stream() << kAtClusterTimeField
+ << " format is invalid; expected Timestamp, but found: "
+ << atClusterTimeElem.type()};
+ }
+
auto partialResultsReturned = cursorObj[kPartialResultsReturnedField];
if (partialResultsReturned) {
@@ -257,6 +267,7 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo
return {{NamespaceString(fullns),
cursorId,
std::move(batch),
+ atClusterTimeElem ? atClusterTimeElem.timestamp() : boost::optional<Timestamp>{},
boost::none,
postBatchResumeTokenElem ? postBatchResumeTokenElem.Obj().getOwned()
: boost::optional<BSONObj>{},
@@ -283,6 +294,10 @@ void CursorResponse::addToBSON(CursorResponse::ResponseType responseType,
cursorBuilder.append(kPostBatchResumeTokenField, *_postBatchResumeToken);
}
+ if (_atClusterTime) {
+ cursorBuilder.append(kAtClusterTimeField, *_atClusterTime);
+ }
+
if (_partialResultsReturned) {
cursorBuilder.append(kPartialResultsReturnedField, true);
}
diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h
index 5e03d2c5a3e..7e7633b14c1 100644
--- a/src/mongo/db/query/cursor_response.h
+++ b/src/mongo/db/query/cursor_response.h
@@ -50,6 +50,9 @@ class CursorResponseBuilder {
public:
/**
* Structure used to configure the CursorResponseBuilder.
+ *
+ * If we selected atClusterTime or received it from the client, transmit it back to the client
+ * in the cursor reply document by setting it here.
*/
struct Options {
bool isInitialResponse = false;
@@ -192,6 +195,7 @@ public:
CursorResponse(NamespaceString nss,
CursorId cursorId,
std::vector<BSONObj> batch,
+ boost::optional<Timestamp> atClusterTime = boost::none,
boost::optional<long long> numReturnedSoFar = boost::none,
boost::optional<BSONObj> postBatchResumeToken = boost::none,
boost::optional<BSONObj> writeConcernError = boost::none,
@@ -232,6 +236,10 @@ public:
return _writeConcernError;
}
+ boost::optional<Timestamp> getAtClusterTime() const {
+ return _atClusterTime;
+ }
+
bool getPartialResultsReturned() const {
return _partialResultsReturned;
}
@@ -249,6 +257,7 @@ private:
NamespaceString _nss;
CursorId _cursorId;
std::vector<BSONObj> _batch;
+ boost::optional<Timestamp> _atClusterTime;
boost::optional<long long> _numReturnedSoFar;
boost::optional<BSONObj> _postBatchResumeToken;
boost::optional<BSONObj> _writeConcernError;
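From a shell client, the new field appears beside id, ns, and firstBatch in the cursor subdocument of the reply; a sketch (collection name illustrative):

    const res = assert.commandWorked(db.runCommand(
        {find: "coll", readConcern: {level: "snapshot"}}));
    // The timestamp at which the snapshot read executed, set by mongos.
    const readTimestamp = res.cursor.atClusterTime;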
diff --git a/src/mongo/db/query/cursor_response_test.cpp b/src/mongo/db/query/cursor_response_test.cpp
index db1a70b6a74..a3bc5449ac7 100644
--- a/src/mongo/db/query/cursor_response_test.cpp
+++ b/src/mongo/db/query/cursor_response_test.cpp
@@ -300,6 +300,7 @@ TEST(CursorResponseTest, toBSONPartialResultsReturned) {
boost::none,
boost::none,
boost::none,
+ boost::none,
true);
BSONObj responseObj = response.toBSON(CursorResponse::ResponseType::InitialResponse);
BSONObj expectedResponse = BSON(
@@ -347,8 +348,12 @@ TEST(CursorResponseTest, serializePostBatchResumeToken) {
std::vector<BSONObj> batch = {BSON("_id" << 1), BSON("_id" << 2)};
auto postBatchResumeToken =
ResumeToken::makeHighWaterMarkToken(Timestamp(1, 2)).toDocument().toBson();
- CursorResponse response(
- NamespaceString("db.coll"), CursorId(123), batch, boost::none, postBatchResumeToken);
+ CursorResponse response(NamespaceString("db.coll"),
+ CursorId(123),
+ batch,
+ boost::none,
+ boost::none,
+ postBatchResumeToken);
auto serialized = response.toBSON(CursorResponse::ResponseType::SubsequentResponse);
ASSERT_BSONOBJ_EQ(serialized,
BSON("cursor" << BSON("id" << CursorId(123) << "ns"
diff --git a/src/mongo/s/cluster_commands_helpers.cpp b/src/mongo/s/cluster_commands_helpers.cpp
index 62b1bf834f9..f7f24aa31c2 100644
--- a/src/mongo/s/cluster_commands_helpers.cpp
+++ b/src/mongo/s/cluster_commands_helpers.cpp
@@ -267,6 +267,7 @@ BSONObj applyReadWriteConcern(OperationContext* opCtx,
BSONObjBuilder output;
bool seenReadConcern = false;
bool seenWriteConcern = false;
+ const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
for (const auto& elem : cmdObj) {
const auto name = elem.fieldNameStringData();
if (appendRC && name == repl::ReadConcernArgs::kReadConcernFieldName) {
@@ -276,13 +277,18 @@ BSONObj applyReadWriteConcern(OperationContext* opCtx,
seenWriteConcern = true;
}
if (!output.hasField(name)) {
- output.append(elem);
+ // If mongos selected atClusterTime, forward it to the shard.
+ if (name == repl::ReadConcernArgs::kReadConcernFieldName &&
+ readConcernArgs.wasAtClusterTimeSelected()) {
+ output.appendElements(readConcernArgs.toBSON());
+ } else {
+ output.append(elem);
+ }
}
}
// Finally, add the new read/write concern.
if (appendRC && !seenReadConcern) {
- const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
output.appendElements(readConcernArgs.toBSON());
}
if (appendWC && !seenWriteConcern) {
@@ -721,6 +727,12 @@ StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfoForTxnCmd(
auto catalogCache = Grid::get(opCtx)->catalogCache();
invariant(catalogCache);
+ auto argsAtClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime();
+ if (argsAtClusterTime) {
+ return catalogCache->getCollectionRoutingInfoAt(
+ opCtx, nss, argsAtClusterTime->asTimestamp());
+ }
+
// Return the latest routing table if not running in a transaction with snapshot level read
// concern.
auto txnRouter = TransactionRouter::get(opCtx);
diff --git a/src/mongo/s/commands/cluster_distinct_cmd.cpp b/src/mongo/s/commands/cluster_distinct_cmd.cpp
index 0e02d3169a1..32d0ffe4b1e 100644
--- a/src/mongo/s/commands/cluster_distinct_cmd.cpp
+++ b/src/mongo/s/commands/cluster_distinct_cmd.cpp
@@ -250,6 +250,11 @@ public:
}
result.appendArray("values", b.obj());
+ // If mongos selected atClusterTime or received it from the client, transmit it back.
+ if (repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime()) {
+ result.append("atClusterTime"_sd,
+ repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime()->asTimestamp());
+ }
return true;
}
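Because distinct does not return a cursor, the timestamp is appended at the top level of the reply rather than under "cursor"; a sketch (collection and key names illustrative):

    const res = assert.commandWorked(db.runCommand(
        {distinct: "coll", key: "x", readConcern: {level: "snapshot"}}));
    // res.values holds the distinct values; res.atClusterTime the snapshot timestamp.
    jsTestLog(`distinct read at ${res.atClusterTime}`);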
diff --git a/src/mongo/s/commands/cluster_find_cmd.cpp b/src/mongo/s/commands/cluster_find_cmd.cpp
index 66cdf67b00f..7ebab369391 100644
--- a/src/mongo/s/commands/cluster_find_cmd.cpp
+++ b/src/mongo/s/commands/cluster_find_cmd.cpp
@@ -230,6 +230,7 @@ public:
// Build the response document.
CursorResponseBuilder::Options options;
options.isInitialResponse = true;
+ options.atClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime();
CursorResponseBuilder firstBatch(result, options);
for (const auto& obj : batch) {
firstBatch.append(obj);
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index d1dd14d1a70..6b70018dfd8 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -252,10 +252,21 @@ void execCommandClient(OperationContext* opCtx,
}
auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
- if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) {
- uassert(ErrorCodes::InvalidOptions,
- "read concern snapshot is only supported in a multi-statement transaction",
- TransactionRouter::get(opCtx));
+ if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern &&
+ !TransactionRouter::get(opCtx) && !readConcernArgs.getArgsAtClusterTime()) {
+ // Select the latest known clusterTime as the atClusterTime for snapshot reads outside
+ // of transactions.
+ auto atClusterTime = [&] {
+ auto latestKnownClusterTime = LogicalClock::get(opCtx)->getClusterTime();
+ // If the user passed afterClusterTime, the chosen time must be greater than or
+ // equal to it.
+ auto afterClusterTime = readConcernArgs.getArgsAfterClusterTime();
+ if (afterClusterTime && *afterClusterTime > latestKnownClusterTime) {
+ return afterClusterTime->asTimestamp();
+ }
+ return latestKnownClusterTime.asTimestamp();
+ }();
+ readConcernArgs.setArgsAtClusterTimeForSnapshot(atClusterTime);
}
// attach tracking
@@ -401,12 +412,6 @@ void runCommand(OperationContext* opCtx,
return;
}
- if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) {
- uassert(ErrorCodes::InvalidOptions,
- "read concern snapshot is not supported with atClusterTime on mongos",
- !readConcernArgs.getArgsAtClusterTime());
- }
-
boost::optional<RouterOperationContextSession> routerSession;
try {
rpc::readRequestMetadata(opCtx, request.body, command->requiresAuth());
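One consequence of the selection rule above: the timestamp mongos reports back is never earlier than a client-supplied afterClusterTime. A sketch of how a test might check this ("someClusterTime" is a placeholder for a previously observed $clusterTime):

    const res = assert.commandWorked(db.runCommand({
        find: "coll",
        readConcern: {level: "snapshot", afterClusterTime: someClusterTime},
    }));
    assert(timestampCmp(res.cursor.atClusterTime, someClusterTime) >= 0);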
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index 49ca39fed20..ff930da893d 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -1348,7 +1348,8 @@ DEATH_TEST_REGEX_F(
cursors.push_back(makeRemoteCursor(
kTestShardIds[0],
kTestShardHosts[0],
- CursorResponse(kTestNss, 123, {firstCursorResponse}, boost::none, pbrtFirstCursor)));
+ CursorResponse(
+ kTestNss, 123, {firstCursorResponse}, boost::none, boost::none, pbrtFirstCursor)));
// Create a second cursor whose initial batch has no PBRT.
cursors.push_back(
makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 456, {})));
@@ -1381,7 +1382,8 @@ DEATH_TEST_REGEX_F(AsyncResultsMergerTest,
cursors.push_back(makeRemoteCursor(
kTestShardIds[0],
kTestShardHosts[0],
- CursorResponse(kTestNss, 123, {firstCursorResponse}, boost::none, pbrtFirstCursor)));
+ CursorResponse(
+ kTestNss, 123, {firstCursorResponse}, boost::none, boost::none, pbrtFirstCursor)));
params.setRemotes(std::move(cursors));
params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
params.setSort(change_stream_constants::kSortSpec);
@@ -1409,11 +1411,13 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfRemoteHasLowerPostB
cursors.push_back(makeRemoteCursor(
kTestShardIds[0],
kTestShardHosts[0],
- CursorResponse(kTestNss, 123, {firstCursorResponse}, boost::none, pbrtFirstCursor)));
+ CursorResponse(
+ kTestNss, 123, {firstCursorResponse}, boost::none, boost::none, pbrtFirstCursor)));
auto tooLowPBRT = makePostBatchResumeToken(Timestamp(1, 2));
- cursors.push_back(makeRemoteCursor(kTestShardIds[1],
- kTestShardHosts[1],
- CursorResponse(kTestNss, 456, {}, boost::none, tooLowPBRT)));
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[1],
+ kTestShardHosts[1],
+ CursorResponse(kTestNss, 456, {}, boost::none, boost::none, tooLowPBRT)));
params.setRemotes(std::move(cursors));
params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
params.setSort(change_stream_constants::kSortSpec);
@@ -1459,7 +1463,8 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting)
<< firstDocSortKey.firstElement().String() << "'}]}");
std::vector<BSONObj> batch1 = {firstCursorResponse};
auto firstDoc = batch1.front();
- responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, pbrtFirstCursor);
+ responses.emplace_back(
+ kTestNss, CursorId(123), batch1, boost::none, boost::none, pbrtFirstCursor);
scheduleNetworkResponses(std::move(responses));
// Should be ready now.
@@ -1471,7 +1476,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting)
newCursors.push_back(
makeRemoteCursor(kTestShardIds[1],
kTestShardHosts[1],
- CursorResponse(kTestNss, 456, {}, boost::none, tooLowPBRT)));
+ CursorResponse(kTestNss, 456, {}, boost::none, boost::none, tooLowPBRT)));
arm->addNewShardCursors(std::move(newCursors));
// Now shouldn't be ready, our guarantee from the new shard isn't sufficiently advanced.
@@ -1488,7 +1493,8 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting)
<< secondDocSortKey.firstElement().String() << "'}]}");
std::vector<BSONObj> batch2 = {secondCursorResponse};
auto secondDoc = batch2.front();
- responses.emplace_back(kTestNss, CursorId(456), batch2, boost::none, pbrtSecondCursor);
+ responses.emplace_back(
+ kTestNss, CursorId(456), batch2, boost::none, boost::none, pbrtSecondCursor);
scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1536,7 +1542,8 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting
<< "', documentKey: {_id: 1}}, $sortKey: [{_data: '"
<< firstDocSortKey.firstElement().String() << "'}]}");
std::vector<BSONObj> batch1 = {firstCursorResponse};
- responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, pbrtFirstCursor);
+ responses.emplace_back(
+ kTestNss, CursorId(123), batch1, boost::none, boost::none, pbrtFirstCursor);
scheduleNetworkResponses(std::move(responses));
// Should be ready now.
@@ -1548,7 +1555,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting
newCursors.push_back(
makeRemoteCursor(kTestShardIds[1],
kTestShardHosts[1],
- CursorResponse(kTestNss, 456, {}, boost::none, tooLowPBRT)));
+ CursorResponse(kTestNss, 456, {}, boost::none, boost::none, tooLowPBRT)));
arm->addNewShardCursors(std::move(newCursors));
// Now shouldn't be ready, our guarantee from the new shard isn't sufficiently advanced.
@@ -1566,7 +1573,8 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting
std::vector<BSONObj> batch2 = {secondCursorResponse};
// The last observed time should still be later than the first shard, so we can get the data
// from it.
- responses.emplace_back(kTestNss, CursorId(456), batch2, boost::none, pbrtSecondCursor);
+ responses.emplace_back(
+ kTestNss, CursorId(456), batch2, boost::none, boost::none, pbrtSecondCursor);
scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1594,20 +1602,20 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorReturnsHighWaterMarkSortKey)
std::vector<RemoteCursor> cursors;
// Create three cursors with empty initial batches. Each batch has a PBRT.
auto pbrtFirstCursor = makePostBatchResumeToken(Timestamp(1, 5));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0],
- kTestShardHosts[0],
- CursorResponse(kTestNss, 123, {}, boost::none, pbrtFirstCursor)));
+ cursors.push_back(makeRemoteCursor(
+ kTestShardIds[0],
+ kTestShardHosts[0],
+ CursorResponse(kTestNss, 123, {}, boost::none, boost::none, pbrtFirstCursor)));
auto pbrtSecondCursor = makePostBatchResumeToken(Timestamp(1, 1));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[1],
- kTestShardHosts[1],
- CursorResponse(kTestNss, 456, {}, boost::none, pbrtSecondCursor)));
+ cursors.push_back(makeRemoteCursor(
+ kTestShardIds[1],
+ kTestShardHosts[1],
+ CursorResponse(kTestNss, 456, {}, boost::none, boost::none, pbrtSecondCursor)));
auto pbrtThirdCursor = makePostBatchResumeToken(Timestamp(1, 4));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[2],
- kTestShardHosts[2],
- CursorResponse(kTestNss, 789, {}, boost::none, pbrtThirdCursor)));
+ cursors.push_back(makeRemoteCursor(
+ kTestShardIds[2],
+ kTestShardHosts[2],
+ CursorResponse(kTestNss, 789, {}, boost::none, boost::none, pbrtThirdCursor)));
params.setRemotes(std::move(cursors));
params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
params.setSort(change_stream_constants::kSortSpec);
@@ -1625,26 +1633,35 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorReturnsHighWaterMarkSortKey)
// each cursor to be updated in-order, so we keep the first and third PBRTs constant.
pbrtSecondCursor = makePostBatchResumeToken(Timestamp(1, 3));
std::vector<BSONObj> emptyBatch = {};
- scheduleNetworkResponse({kTestNss, CursorId(123), emptyBatch, boost::none, pbrtFirstCursor});
- scheduleNetworkResponse({kTestNss, CursorId(456), emptyBatch, boost::none, pbrtSecondCursor});
- scheduleNetworkResponse({kTestNss, CursorId(789), emptyBatch, boost::none, pbrtThirdCursor});
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(123), emptyBatch, boost::none, boost::none, pbrtFirstCursor});
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(456), emptyBatch, boost::none, boost::none, pbrtSecondCursor});
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(789), emptyBatch, boost::none, boost::none, pbrtThirdCursor});
ASSERT_BSONOBJ_EQ(arm->getHighWaterMark(), pbrtSecondCursor);
ASSERT_FALSE(arm->ready());
// Advance the second cursor again, so that it surpasses the other two. The third cursor becomes
// the new high water mark.
pbrtSecondCursor = makePostBatchResumeToken(Timestamp(1, 6));
- scheduleNetworkResponse({kTestNss, CursorId(123), emptyBatch, boost::none, pbrtFirstCursor});
- scheduleNetworkResponse({kTestNss, CursorId(456), emptyBatch, boost::none, pbrtSecondCursor});
- scheduleNetworkResponse({kTestNss, CursorId(789), emptyBatch, boost::none, pbrtThirdCursor});
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(123), emptyBatch, boost::none, boost::none, pbrtFirstCursor});
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(456), emptyBatch, boost::none, boost::none, pbrtSecondCursor});
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(789), emptyBatch, boost::none, boost::none, pbrtThirdCursor});
ASSERT_BSONOBJ_EQ(arm->getHighWaterMark(), pbrtThirdCursor);
ASSERT_FALSE(arm->ready());
// Advance the third cursor such that the first cursor becomes the high water mark.
pbrtThirdCursor = makePostBatchResumeToken(Timestamp(1, 7));
- scheduleNetworkResponse({kTestNss, CursorId(123), emptyBatch, boost::none, pbrtFirstCursor});
- scheduleNetworkResponse({kTestNss, CursorId(456), emptyBatch, boost::none, pbrtSecondCursor});
- scheduleNetworkResponse({kTestNss, CursorId(789), emptyBatch, boost::none, pbrtThirdCursor});
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(123), emptyBatch, boost::none, boost::none, pbrtFirstCursor});
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(456), emptyBatch, boost::none, boost::none, pbrtSecondCursor});
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(789), emptyBatch, boost::none, boost::none, pbrtThirdCursor});
ASSERT_BSONOBJ_EQ(arm->getHighWaterMark(), pbrtFirstCursor);
ASSERT_FALSE(arm->ready());
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index 56d865508c6..6221b1809c4 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -243,7 +243,8 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging,
const PrivilegeVector& privileges) {
- ClusterClientCursorParams params(requestedNss, ReadPreferenceSetting::get(opCtx));
+ ClusterClientCursorParams params(
+ requestedNss, ReadPreferenceSetting::get(opCtx), ReadConcernArgs::get(opCtx));
params.originatingCommandObj = CurOp::get(opCtx)->opDescription().getOwned();
params.tailableMode = pipelineForMerging->getContext()->tailableMode;
@@ -267,7 +268,7 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
rpc::OpMsgReplyBuilder replyBuilder;
CursorResponseBuilder::Options options;
options.isInitialResponse = true;
-
+ options.atClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime();
CursorResponseBuilder responseBuilder(&replyBuilder, options);
bool stashedResult = false;
diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h
index 31d4211570f..6dc1ae8419e 100644
--- a/src/mongo/s/query/cluster_client_cursor.h
+++ b/src/mongo/s/query/cluster_client_cursor.h
@@ -181,6 +181,11 @@ public:
virtual boost::optional<ReadPreferenceSetting> getReadPreference() const = 0;
/**
+ * Returns the readConcern for this cursor.
+ */
+ virtual boost::optional<ReadConcernArgs> getReadConcern() const = 0;
+
+ /**
* Returns the creation date of the cursor.
*/
virtual Date_t getCreatedDate() const = 0;
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index c6f18579bf1..fefd913f8a7 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -209,6 +209,10 @@ boost::optional<ReadPreferenceSetting> ClusterClientCursorImpl::getReadPreferenc
return _params.readPreference;
}
+boost::optional<ReadConcernArgs> ClusterClientCursorImpl::getReadConcern() const {
+ return _params.readConcern;
+}
+
std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan(
OperationContext* opCtx,
std::shared_ptr<executor::TaskExecutor> executor,
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h
index 91a9c4455f3..23f1c351fda 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.h
+++ b/src/mongo/s/query/cluster_client_cursor_impl.h
@@ -104,6 +104,8 @@ public:
boost::optional<ReadPreferenceSetting> getReadPreference() const final;
+ boost::optional<ReadConcernArgs> getReadConcern() const final;
+
Date_t getCreatedDate() const final;
Date_t getLastUseDate() const final;
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp
index 958a909687c..e6f86dccade 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp
@@ -153,4 +153,8 @@ boost::optional<ReadPreferenceSetting> ClusterClientCursorMock::getReadPreferenc
return boost::none;
}
+boost::optional<ReadConcernArgs> ClusterClientCursorMock::getReadConcern() const {
+ return boost::none;
+}
+
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h
index c86f66315b6..f72551a24da 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.h
+++ b/src/mongo/s/query/cluster_client_cursor_mock.h
@@ -92,6 +92,8 @@ public:
boost::optional<ReadPreferenceSetting> getReadPreference() const final;
+ boost::optional<ReadConcernArgs> getReadConcern() const final;
+
Date_t getCreatedDate() const final;
Date_t getLastUseDate() const final;
diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h
index 24ec074376f..cd6563f842c 100644
--- a/src/mongo/s/query/cluster_client_cursor_params.h
+++ b/src/mongo/s/query/cluster_client_cursor_params.h
@@ -43,6 +43,7 @@
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/tailable_mode.h"
+#include "mongo/db/repl/read_concern_args.h"
#include "mongo/s/client/shard.h"
#include "mongo/s/query/async_results_merger_params_gen.h"
#include "mongo/util/net/hostandport.h"
@@ -55,6 +56,8 @@ class TaskExecutor;
class OperationContext;
class RouterExecStage;
+using repl::ReadConcernArgs;
+
/**
* The resulting ClusterClientCursor will take ownership of the existing remote cursor, generating
* results based on the cursor's current state.
@@ -65,11 +68,15 @@ class RouterExecStage;
*/
struct ClusterClientCursorParams {
ClusterClientCursorParams(NamespaceString nss,
- boost::optional<ReadPreferenceSetting> readPref = boost::none)
+ boost::optional<ReadPreferenceSetting> readPref = boost::none,
+ boost::optional<ReadConcernArgs> readConcernArgs = boost::none)
: nsString(std::move(nss)) {
if (readPref) {
readPreference = std::move(readPref.get());
}
+ if (readConcernArgs) {
+ readConcern = std::move(readConcernArgs.get());
+ }
}
/**
@@ -143,6 +150,9 @@ struct ClusterClientCursorParams {
// Set if a readPreference must be respected throughout the lifetime of the cursor.
boost::optional<ReadPreferenceSetting> readPreference;
+ // Set if a readConcern must be respected throughout the lifetime of the cursor.
+ boost::optional<ReadConcernArgs> readConcern;
+
// Whether the client indicated that it is willing to receive partial results in the case of an
// unreachable host.
bool isAllowPartialResults = false;
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index e3bbfb5d995..422c02406da 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -187,6 +187,12 @@ std::vector<std::pair<ShardId, BSONObj>> constructRequestsForShards(
qrToForward = std::make_unique<QueryRequest>(query.getQueryRequest());
}
+ auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
+ if (readConcernArgs.wasAtClusterTimeSelected()) {
+ // If mongos selected atClusterTime or received it from the client, transmit it to the shard.
+ qrToForward->setReadConcern(readConcernArgs.toBSONInner());
+ }
+
auto shardRegistry = Grid::get(opCtx)->shardRegistry();
std::vector<std::pair<ShardId, BSONObj>> requests;
for (const auto& shardId : shardIds) {
@@ -242,7 +248,7 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
// Construct the query and parameters. Defer setting skip and limit here until
// we determine if the query is targeting multi-shards or a single shard below.
- ClusterClientCursorParams params(query.nss(), readPref);
+ ClusterClientCursorParams params(query.nss(), readPref, ReadConcernArgs::get(opCtx));
params.originatingCommandObj = CurOp::get(opCtx)->opDescription().getOwned();
params.batchSize = query.getQueryRequest().getEffectiveBatchSize();
params.tailableMode = query.getQueryRequest().getTailableMode();
@@ -429,6 +435,11 @@ Status setUpOperationContextStateForGetMore(OperationContext* opCtx,
ReadPreferenceSetting::get(opCtx) = *readPref;
}
+ if (auto readConcern = cursor->getReadConcern()) {
+ // Used to return "atClusterTime" in cursor replies to clients for snapshot reads.
+ ReadConcernArgs::get(opCtx) = *readConcern;
+ }
+
// If the originating command had a 'comment' field, we extract it and set it on opCtx. Note
// that if the 'getMore' command itself has a 'comment' field, we give precedence to it.
auto comment = cursor->getOriginatingCommand()["comment"];
@@ -835,9 +846,12 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
"waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch");
}
+ auto atClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime();
return CursorResponse(request.nss,
idToReturn,
std::move(batch),
+ atClusterTime ? atClusterTime->asTimestamp()
+ : boost::optional<Timestamp>{},
startingFrom,
postBatchResumeToken,
boost::none,
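Since getMore restores the cursor's saved ReadConcernArgs onto the OperationContext (see setUpOperationContextStateForGetMore above), every batch of a snapshot cursor should report the same atClusterTime; a rough shell check:

    const first = assert.commandWorked(db.runCommand(
        {find: "coll", batchSize: 2, readConcern: {level: "snapshot"}}));
    const next = assert.commandWorked(
        db.runCommand({getMore: first.cursor.id, collection: "coll"}));
    assert.eq(0, timestampCmp(first.cursor.atClusterTime, next.cursor.atClusterTime));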
diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp
index 8217de1e2ec..3b0bbf4e85c 100644
--- a/src/mongo/s/query/establish_cursors.cpp
+++ b/src/mongo/s/query/establish_cursors.cpp
@@ -179,8 +179,9 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
}
// This exception is eligible to be swallowed. Add an entry with a cursorID of 0, an
// empty HostAndPort, and which has the 'partialResultsReturned' flag set to true.
- remoteCursors.push_back(
- {response.shardId.toString(), {}, {nss, CursorId{0}, {}, {}, {}, {}, true}});
+ remoteCursors.push_back({response.shardId.toString(),
+ {},
+ {nss, CursorId{0}, {}, {}, {}, {}, {}, true}});
}
}
return remoteCursors;
diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp
index 9877af451a2..b76a47cddbe 100644
--- a/src/mongo/s/query/store_possible_cursor.cpp
+++ b/src/mongo/s/query/store_possible_cursor.cpp
@@ -99,7 +99,8 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
return cmdResult;
}
- ClusterClientCursorParams params(incomingCursorResponse.getValue().getNSS());
+ ClusterClientCursorParams params(
+ incomingCursorResponse.getValue().getNSS(), boost::none, ReadConcernArgs::get(opCtx));
params.remotes.emplace_back();
auto& remoteCursor = params.remotes.back();
remoteCursor.setShardId(shardId.toString());
@@ -136,8 +137,10 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
CurOp::get(opCtx)->debug().cursorid = clusterCursorId.getValue();
- CursorResponse outgoingCursorResponse(
- requestedNss, clusterCursorId.getValue(), incomingCursorResponse.getValue().getBatch());
+ CursorResponse outgoingCursorResponse(requestedNss,
+ clusterCursorId.getValue(),
+ incomingCursorResponse.getValue().getBatch(),
+ incomingCursorResponse.getValue().getAtClusterTime());
return outgoingCursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse);
}