 26 files changed, 586 insertions(+), 244 deletions(-)
diff --git a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
index cc733b396f5..8e623717c26 100644
--- a/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml
@@ -195,7 +195,7 @@ selector:
   # Moves a chunk before continuing a transaction, which can lead to snapshot errors if the
   # CSRS failovers are sufficiently slow.
   - jstests/sharding/transactions_reject_writes_for_moved_chunks.js
-  - jstests/sharding/transactions_target_at_point_in_time.js
+  - jstests/sharding/snapshot_reads_target_at_point_in_time.js
   # Tests that rely on shards becoming aware of collection drops regardless of config stepdowns.
   # (SERVER-34760)
   - jstests/sharding/merge_requires_unique_index.js
diff --git a/jstests/core/txns/no_snapshot_writes_outside_txn.js b/jstests/core/txns/no_snapshot_writes_outside_txn.js
index 1626e7fa009..1694719e04b 100644
--- a/jstests/core/txns/no_snapshot_writes_outside_txn.js
+++ b/jstests/core/txns/no_snapshot_writes_outside_txn.js
@@ -1,7 +1,8 @@
 /**
  * Verify that readConcern: snapshot is not permitted for writes outside transactions.
  *
- * @tags: [uses_transactions]
+ * // TODO(SERVER-47915): remove assumes_against_mongod_not_mongos
+ * @tags: [uses_transactions, assumes_against_mongod_not_mongos]
  */
 (function() {
diff --git a/jstests/libs/global_snapshot_reads_util.js b/jstests/libs/global_snapshot_reads_util.js
index da2630f8fd1..87bcc52ae2d 100644
--- a/jstests/libs/global_snapshot_reads_util.js
+++ b/jstests/libs/global_snapshot_reads_util.js
@@ -32,7 +32,7 @@ function verifyInvalidGetMoreAttempts(mainDb, collName, cursorId, lsid, txnNumbe
         ErrorCodes.NoSuchTransaction);
 }
 
-var snapshotReadsCursorTest, snapshotReadsDistinctTest;
+var snapshotReadsTest;
 
 (function() {
 function makeSnapshotReadConcern(atClusterTime) {
@@ -43,16 +43,8 @@ function makeSnapshotReadConcern(atClusterTime) {
     return {level: "snapshot", atClusterTime: atClusterTime};
 }
 
-function awaitCommitted(db, ts) {
-    jsTestLog(`Wait for ${ts} to be committed on ${db.getMongo()}`);
-    assert.soonNoExcept(function() {
-        const replSetStatus =
-            assert.commandWorked(db.getSiblingDB("admin").runCommand({replSetGetStatus: 1}));
-        return timestampCmp(replSetStatus.optimes.readConcernMajorityOpTime.ts, ts) >= 0;
-    }, `${ts} was never committed on ${db.getMongo()}`);
-}
-
-snapshotReadsCursorTest = function(testScenarioName, primaryDB, secondaryDB, collName) {
+function snapshotReadsCursorTest(
+    {testScenarioName, primaryDB, secondaryDB, collName, awaitCommittedFn}) {
     const docs = [...Array(10).keys()].map((i) => ({"_id": i}));
 
     const commands = {
@@ -93,7 +85,7 @@ snapshotReadsCursorTest = function(testScenarioName, primaryDB, secondaryDB, col
             assert(insertTimestamp);
 
             jsTestLog(`Inserted 10 documents at timestamp ${insertTimestamp}`);
-            awaitCommitted(db, insertTimestamp);
+            awaitCommittedFn(db, insertTimestamp);
 
             // Create a session if useCausalConsistency is true.
             let causalDb, sessionTimestamp;
@@ -128,7 +120,7 @@ snapshotReadsCursorTest = function(testScenarioName, primaryDB, secondaryDB, col
                 {update: collName, updates: [{q: {}, u: {$set: {x: true}}, multi: true}]}));
 
             jsTestLog(`Updated collection "${collName}" at timestamp ${res.operationTime}`);
-            awaitCommitted(db, res.operationTime);
+            awaitCommittedFn(db, res.operationTime);
 
             // Retrieve the rest of the read command's result set.
             res = assert.commandWorked(
@@ -167,9 +159,10 @@ snapshotReadsCursorTest = function(testScenarioName, primaryDB, secondaryDB, col
             }
         }
     }
-};
+}
 
-snapshotReadsDistinctTest = function(testScenarioName, primaryDB, secondaryDB, collName) {
+function snapshotReadsDistinctTest(
+    {testScenarioName, primaryDB, secondaryDB, collName, awaitCommittedFn}) {
     // Note: this test sets documents' "x" field, the test above uses "_id".
     const docs = [...Array(10).keys()].map((i) => ({"x": i}));
 
@@ -184,9 +177,10 @@ snapshotReadsDistinctTest = function(testScenarioName, primaryDB, secondaryDB, c
 
     for (let useCausalConsistency of [false, true]) {
         for (let [db, readPreferenceMode] of [[primaryDB, "primary"], [secondaryDB, "secondary"]]) {
-            jsTestLog(`Testing "distinct" on collection ` +
-                      `${collName} with read preference ${readPreferenceMode} and causal` +
-                      ` consistency ${useCausalConsistency}`);
+            jsTestLog(
+                `Testing "distinct" with the ${testScenarioName} scenario on` +
+                ` collection ${collName} with read preference ${readPreferenceMode} and causal` +
+                ` consistency ${useCausalConsistency}`);
 
             let res =
                 assert.commandWorked(primaryDB.runCommand({insert: collName, documents: docs}));
@@ -194,7 +188,7 @@ snapshotReadsDistinctTest = function(testScenarioName, primaryDB, secondaryDB, c
             assert(insertTimestamp);
 
             jsTestLog(`Inserted 10 documents at timestamp ${insertTimestamp}`);
-            awaitCommitted(db, insertTimestamp);
+            awaitCommittedFn(db, insertTimestamp);
 
             // Create a session if useCausalConsistency is true.
             let causalDb, sessionTimestamp;
@@ -227,7 +221,7 @@ snapshotReadsDistinctTest = function(testScenarioName, primaryDB, secondaryDB, c
                 {update: collName, updates: [{q: {}, u: {$set: {x: 42}}, multi: true}]}));
 
             jsTestLog(`Updated collection "${collName}" at timestamp ${res.operationTime}`);
-            awaitCommitted(db, res.operationTime);
+            awaitCommittedFn(db, res.operationTime);
 
             // This read shows the updated docs.
             res = assert.commandWorked(causalDb.runCommand(distinctCommand(readPreferenceMode)));
@@ -249,5 +243,40 @@ snapshotReadsDistinctTest = function(testScenarioName, primaryDB, secondaryDB, c
             assert.commandWorked(primaryDB[collName].remove({}, {writeConcern: {w: "majority"}}));
         }
     }
+}
+
+/**
+ * Test non-transaction snapshot reads on primary and secondary.
+ *
+ * Pass two handles to the same database; either both connected to a mongos, or one connected to
+ * a replica set primary and the other connected to a replica set secondary. (The test will also
+ * pass $readPreference, so if the handles are connected to a mongos, then the reads will target
+ * primary/secondary shard servers.)
+ *
+ * For awaitCommittedFn, pass a function that waits for the last write to be committed on all
+ * secondaries.
+ *
+ * @param {testScenarioName} String used when logging progress
+ * @param {primaryDB} Database handle connected to a primary or mongos
+ * @param {secondaryDB} Database handle connected to a secondary or mongos
+ * @param {collName} String
+ * @param {awaitCommittedFn} A function with no arguments or return value
+ */
+snapshotReadsTest = function(
+    {testScenarioName, primaryDB, secondaryDB, collName, awaitCommittedFn}) {
+    snapshotReadsCursorTest({
+        testScenarioName: testScenarioName,
+        primaryDB: primaryDB,
+        secondaryDB: secondaryDB,
+        collName: collName,
+        awaitCommittedFn: awaitCommittedFn
+    });
+    snapshotReadsDistinctTest({
+        testScenarioName: testScenarioName,
+        primaryDB: primaryDB,
+        secondaryDB: secondaryDB,
+        collName: collName,
+        awaitCommittedFn: awaitCommittedFn
+    });
 };
 })();
diff --git a/jstests/noPassthrough/readConcern_snapshot_mongos.js b/jstests/noPassthrough/readConcern_snapshot_mongos.js
index 472da1af4de..b4bbb2a7680 100644
--- a/jstests/noPassthrough/readConcern_snapshot_mongos.js
+++ b/jstests/noPassthrough/readConcern_snapshot_mongos.js
@@ -13,57 +13,20 @@ function expectSuccessInTxnThenAbort(session, sessionConn, cmdObj) {
     assert.commandWorked(session.abortTransaction_forTesting());
 }
 
-// Runs the command as the first in a multi statement txn that is aborted right after, expecting
-// failure with the given error code.
-function expectFailInTxnThenAbort(session, sessionConn, expectedErrorCode, cmdObj) {
-    session.startTransaction();
-    assert.commandFailedWithCode(sessionConn.runCommand(cmdObj), expectedErrorCode);
-    assert.commandFailedWithCode(session.abortTransaction_forTesting(),
-                                 ErrorCodes.NoSuchTransaction);
-}
-
 const dbName = "test";
 const collName = "coll";
 
 let st = new ShardingTest({shards: 1, rs: {nodes: 2}, config: 2, mongos: 1});
 let testDB = st.getDB(dbName);
-let coll = testDB.coll;
 
 // Insert data to create the collection.
 assert.commandWorked(testDB[collName].insert({x: 1}));
 
 flushRoutersAndRefreshShardMetadata(st, {ns: dbName + "." + collName, dbNames: [dbName]});
 
-// noPassthrough tests
-
-// readConcern 'snapshot' is not allowed outside session context.
-assert.commandFailedWithCode(testDB.runCommand({find: collName, readConcern: {level: "snapshot"}}),
-                             ErrorCodes.InvalidOptions);
-
 let session = testDB.getMongo().startSession({causalConsistency: false});
 let sessionDb = session.getDatabase(dbName);
 
-// readConcern 'snapshot' is not allowed outside transaction context.
-assert.commandFailedWithCode(sessionDb.runCommand({
-    find: collName,
-    readConcern: {level: "snapshot"},
-}),
-                             ErrorCodes.InvalidOptions);
-
-// readConcern 'snapshot' is not allowed with 'atClusterTime'.
-let pingRes = assert.commandWorked(st.s0.adminCommand({ping: 1}));
-assert(pingRes.hasOwnProperty("$clusterTime"), tojson(pingRes));
-assert(pingRes.$clusterTime.hasOwnProperty("clusterTime"), tojson(pingRes));
-const clusterTime = pingRes.$clusterTime.clusterTime;
-
-expectFailInTxnThenAbort(session, sessionDb, ErrorCodes.InvalidOptions, {
-    find: collName,
-    readConcern: {level: "snapshot", atClusterTime: clusterTime},
-});
-
-// Passthrough tests. There are parts not implemented on mongod and mongos, they are tracked by
-// separate jiras
-
 // readConcern 'snapshot' is supported by insert on mongos in a transaction.
 expectSuccessInTxnThenAbort(session, sessionDb, {
     insert: collName,
@@ -113,6 +76,11 @@ expectSuccessInTxnThenAbort(session, sessionDb, {
     readConcern: {level: "snapshot"},
 });
 
+let pingRes = assert.commandWorked(st.s0.adminCommand({ping: 1}));
+assert(pingRes.hasOwnProperty("$clusterTime"), tojson(pingRes));
+assert(pingRes.$clusterTime.hasOwnProperty("clusterTime"), tojson(pingRes));
+const clusterTime = pingRes.$clusterTime.clusterTime;
+
 // readConcern 'snapshot' is allowed with 'afterClusterTime'.
 expectSuccessInTxnThenAbort(session, sessionDb, {
     find: collName,
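Note: the snapshotReadsTest() utility and the loosened mongos restrictions above are the core of this patch: on mongos, readConcern level "snapshot" no longer requires a session or transaction, and the server reports the timestamp it used. A minimal shell sketch of the new flow (illustrative only, not part of the patch; assumes a mongos at a hypothetical localhost:20005 with documents in test.coll):

    const conn = new Mongo("localhost:20005");  // hypothetical mongos address
    const testDB = conn.getDB("test");

    // After this change, no session or transaction is needed for level "snapshot".
    const res = assert.commandWorked(
        testDB.runCommand({find: "coll", readConcern: {level: "snapshot"}}));

    // mongos reports the timestamp it selected in the cursor reply.
    jsTestLog(`mongos chose atClusterTime ${tojson(res.cursor.atClusterTime)}`);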
diff --git a/jstests/replsets/non_transaction_snapshot_reads.js b/jstests/replsets/non_transaction_snapshot_reads.js
index 2be6b3233a0..ed4dc19a164 100644
--- a/jstests/replsets/non_transaction_snapshot_reads.js
+++ b/jstests/replsets/non_transaction_snapshot_reads.js
@@ -1,4 +1,5 @@
-/**Tests readConcern level snapshot outside of transactions.
+/**
+ * Tests readConcern level snapshot outside of transactions.
  *
  * @tags: [
  *   requires_fcv_46,
@@ -12,15 +13,22 @@
 load("jstests/libs/global_snapshot_reads_util.js");
 
 // TODO(SERVER-47672): Use minSnapshotHistoryWindowInSeconds instead.
 const options = {
-    setParameter: "maxTargetSnapshotHistoryWindowInSeconds=600",
+    setParameter: {maxTargetSnapshotHistoryWindowInSeconds: 600}
 };
 const replSet = new ReplSetTest({nodes: 3, nodeOptions: options});
 replSet.startSet();
 replSet.initiateWithHighElectionTimeout();
 const primaryDB = replSet.getPrimary().getDB('test');
 const secondaryDB = replSet.getSecondary().getDB('test');
-snapshotReadsCursorTest(jsTestName(), primaryDB, secondaryDB, "test");
-snapshotReadsDistinctTest(jsTestName(), primaryDB, secondaryDB, "test");
+snapshotReadsTest({
+    testScenarioName: jsTestName(),
+    primaryDB: primaryDB,
+    secondaryDB: secondaryDB,
+    collName: "test",
+    awaitCommittedFn: () => {
+        replSet.awaitLastOpCommitted();
+    }
+});
 
 // Ensure "atClusterTime" is omitted from a regular (non-snapshot) reads.
 primaryDB["collection"].insertOne({});
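Note: the cursor.atClusterTime returned by a snapshot read can be fed back as an explicit atClusterTime to repeat the read at the same point in time, as long as the timestamp is still within the history window configured by the setParameter above. A sketch, reusing the test's primaryDB handle (assumption: the "test" collection already contains data):

    const first = assert.commandWorked(
        primaryDB.runCommand({find: "test", readConcern: {level: "snapshot"}}));
    const ts = first.cursor.atClusterTime;

    // Pinning a second read to the same timestamp yields the same result set,
    // even if writes have committed in between.
    const second = assert.commandWorked(primaryDB.runCommand(
        {find: "test", readConcern: {level: "snapshot", atClusterTime: ts}}));
    assert.sameMembers(first.cursor.firstBatch, second.cursor.firstBatch);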
diff --git a/jstests/sharding/sharding_non_transaction_snapshot_read.js b/jstests/sharding/sharding_non_transaction_snapshot_read.js
new file mode 100644
index 00000000000..a7b3d0309be
--- /dev/null
+++ b/jstests/sharding/sharding_non_transaction_snapshot_read.js
@@ -0,0 +1,143 @@
+/**
+ * Tests readConcern level snapshot outside of transactions.
+ *
+ * @tags: [
+ *   requires_fcv_46,
+ *   requires_majority_read_concern,
+ *   requires_find_command
+ * ]
+ */
+
+(function() {
+"use strict";
+
+load("jstests/libs/global_snapshot_reads_util.js");
+load("jstests/sharding/libs/sharded_transactions_helpers.js");
+
+// TODO(SERVER-47672): Use minSnapshotHistoryWindowInSeconds instead.
+const configOptions = {
+    setParameter: {maxTargetSnapshotHistoryWindowInSeconds: 600}
+};
+
+const dbName = "test";
+const shardedCollName = "shardedColl";
+const unshardedCollName = "unshardedColl";
+
+function setUpAllScenarios(st) {
+    assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+    assert.commandWorked(st.s.adminCommand(
+        {shardCollection: st.s.getDB(dbName)[shardedCollName] + "", key: {_id: 1}}));
+}
+
+let shardingScenarios = {
+    singleShard: {
+        compatibleCollections: [shardedCollName, unshardedCollName],
+        setUp: function() {
+            const st = new ShardingTest({
+                mongos: 1,
+                config: 1,
+                shards: {rs0: {nodes: 2}},
+                other: {configOptions: configOptions}
+            });
+            setUpAllScenarios(st);
+            return st;
+        }
+    },
+    multiShardAllShardReads: {
+        compatibleCollections: [shardedCollName],
+        setUp: function() {
+            let st = new ShardingTest({
+                shards: {
+                    rs0: {nodes: 2},
+                    rs1: {nodes: 2},
+                    rs2: {nodes: 2},
+                },
+                mongos: 1,
+                config: 1,
+                other: {configOptions: configOptions}
+            });
+            setUpAllScenarios(st);
+            const mongos = st.s0;
+            const ns = dbName + '.' + shardedCollName;
+
+            // snapshotReadsTest() inserts ids 0-9 and tries snapshot reads on the collection.
+            assert.commandWorked(st.splitAt(ns, {_id: 4}));
+            assert.commandWorked(st.splitAt(ns, {_id: 7}));
+
+            assert.commandWorked(
+                mongos.adminCommand({moveChunk: ns, find: {_id: 0}, to: st.shard0.shardName}));
+            assert.commandWorked(
+                mongos.adminCommand({moveChunk: ns, find: {_id: 4}, to: st.shard1.shardName}));
+            assert.commandWorked(
+                mongos.adminCommand({moveChunk: ns, find: {_id: 7}, to: st.shard2.shardName}));
+
+            assert.eq(1, mongos.getDB('config').chunks.count({ns: ns, shard: st.shard0.shardName}));
+            assert.eq(1, mongos.getDB('config').chunks.count({ns: ns, shard: st.shard1.shardName}));
+            assert.eq(1, mongos.getDB('config').chunks.count({ns: ns, shard: st.shard2.shardName}));
+
+            flushRoutersAndRefreshShardMetadata(st, {ns});
+
+            return st;
+        }
+    },
+    // Only two out of three shards have documents.
+    multiShardSomeShardReads: {
+        compatibleCollections: [shardedCollName],
+        setUp: function() {
+            let st = new ShardingTest({
+                shards: {
+                    rs0: {nodes: 2},
+                    rs1: {nodes: 2},
+                    rs2: {nodes: 2},
+                },
+                mongos: 1,
+                config: 1,
+                other: {configOptions: configOptions}
+            });
+            setUpAllScenarios(st);
+            const mongos = st.s0;
+            const ns = dbName + '.' + shardedCollName;
+
+            // snapshotReadsTest() inserts ids 0-9 and tries snapshot reads on the collection.
+            assert.commandWorked(st.splitAt(ns, {_id: 5}));
+            assert.commandWorked(
+                mongos.adminCommand({moveChunk: ns, find: {_id: 0}, to: st.shard1.shardName}));
+            assert.commandWorked(
+                mongos.adminCommand({moveChunk: ns, find: {_id: 7}, to: st.shard2.shardName}));
+
+            assert.eq(0, mongos.getDB('config').chunks.count({ns: ns, shard: st.shard0.shardName}));
+            assert.eq(1, mongos.getDB('config').chunks.count({ns: ns, shard: st.shard1.shardName}));
+            assert.eq(1, mongos.getDB('config').chunks.count({ns: ns, shard: st.shard2.shardName}));
+
+            flushRoutersAndRefreshShardMetadata(st, {ns});
+
+            return st;
+        }
+    }
+};
+
+for (let [scenarioName, scenario] of Object.entries(shardingScenarios)) {
+    scenario.compatibleCollections.forEach(function(collName) {
+        jsTestLog(`Run scenario ${scenarioName} with collection ${collName}`);
+        let st = scenario.setUp();
+
+        function awaitCommittedFn() {
+            for (let i = 0; st['rs' + i] !== undefined; i++) {
+                st['rs' + i].awaitLastOpCommitted();
+            }
+        }
+
+        // Pass the same DB handle as "primaryDB" and "secondaryDB" params; the test functions will
+        // send readPreference to mongos to target primary/secondary shard servers.
+        let db = st.s.getDB(dbName);
+        snapshotReadsTest({
+            testScenarioName: scenarioName,
+            primaryDB: db,
+            secondaryDB: db,
+            collName: collName,
+            awaitCommittedFn: awaitCommittedFn
+        });
+        st.stop();
+    });
+}
+})();
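Note: each scenario above passes the same mongos handle as both primaryDB and secondaryDB and relies on $readPreference to pick the shard-server node, so the interesting case is a snapshot read served by a secondary. Condensed sketch of what snapshotReadsTest() ends up sending (assumes st is a running ShardingTest set up as above):

    const db = st.s.getDB("test");
    const res = assert.commandWorked(db.runCommand({
        find: "shardedColl",
        readConcern: {level: "snapshot"},
        $readPreference: {mode: "secondary"}
    }));
    // awaitCommittedFn matters here: a secondary can only serve the snapshot
    // once the majority commit point has advanced past the writes being read.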
diff --git a/jstests/sharding/snapshot_reads_target_at_point_in_time.js b/jstests/sharding/snapshot_reads_target_at_point_in_time.js
new file mode 100644
index 00000000000..0828c5b9a75
--- /dev/null
+++ b/jstests/sharding/snapshot_reads_target_at_point_in_time.js
@@ -0,0 +1,203 @@
+// Verifies mongos uses a versioned routing table to target subsequent requests for snapshot reads.
+//
+// @tags: [
+//   requires_find_command,
+//   requires_sharding,
+//   uses_multi_shard_transaction,
+//   uses_transactions,
+//   requires_fcv_46
+// ]
+(function() {
+"use strict";
+
+load("jstests/sharding/libs/sharded_transactions_helpers.js");
+
+function expectChunks(st, ns, chunks) {
+    for (let i = 0; i < chunks.length; i++) {
+        assert.eq(chunks[i],
+                  st.s.getDB("config").chunks.count({ns: ns, shard: st["shard" + i].shardName}),
+                  "unexpected number of chunks on shard " + i);
+    }
+}
+
+const dbName = "test";
+const collName = "foo";
+const ns = dbName + '.' + collName;
+
+const st = new ShardingTest({
+    shards: 3,
+    mongos: 1,
+    config: 1,
+    other: {
+        rs0: {nodes: 2},
+        rs1: {nodes: 2},
+        rs2: {nodes: 2},
+        // Disable expiring old chunk history to ensure the transactions are able to read from a
+        // shard that has donated a chunk, even if the migration takes longer than the amount of
+        // time for which a chunk's history is normally stored (see SERVER-39763).
+        configOptions:
+            {setParameter: {"failpoint.skipExpiringOldChunkHistory": "{mode: 'alwaysOn'}"}}
+    }
+});
+
+// Set up one sharded collection with 2 chunks, both on the primary shard.
+
+assert.commandWorked(
+    st.s.getDB(dbName)[collName].insert({_id: -5}, {writeConcern: {w: "majority"}}));
+assert.commandWorked(
+    st.s.getDB(dbName)[collName].insert({_id: 5}, {writeConcern: {w: "majority"}}));
+
+assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+st.ensurePrimaryShard(dbName, st.shard0.shardName);
+
+assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {_id: 1}}));
+assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 0}}));
+
+expectChunks(st, ns, [2, 0, 0]);
+
+// Temporarily move a chunk to Shard2, to avoid picking a global read timestamp before the
+// sharding metadata cache collections are created.
+assert.commandWorked(st.s.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard2.shardName}));
+
+assert.commandWorked(st.s.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard1.shardName}));
+expectChunks(st, ns, [1, 1, 0]);
+
+// First command targets the first chunk, the second command targets the second chunk.
+const kCommandTestCases = [
+    {
+        name: "aggregate",
+        commands: [
+            {aggregate: collName, pipeline: [{$match: {_id: -5}}], cursor: {}},
+            {aggregate: collName, pipeline: [{$match: {_id: 5}}], cursor: {}}
+        ]
+    },
+    {
+        name: "find",
+        commands: [{find: collName, filter: {_id: -5}}, {find: collName, filter: {_id: 5}}]
+    }
+];
+
+const TestMode = {
+    TRANSACTION: 'TRANSACTION',
+    CAUSAL_CONSISTENCY: 'CAUSAL_CONSISTENCY',
+    SNAPSHOT: 'SNAPSHOT',
+    SNAPSHOT_AT_CLUSTER_TIME: 'SNAPSHOT_AT_CLUSTER_TIME'
+};
+
+function runTest(testCase, testMode, readPreferenceMode) {
+    const cmdName = testCase.name;
+    // Clone commands so we can modify readConcern and readPreference.
+    const targetChunk1Cmd = Object.assign({}, testCase.commands[0]);
+    const targetChunk2Cmd = Object.assign({}, testCase.commands[1]);
+    targetChunk1Cmd["$readPreference"] = {mode: readPreferenceMode};
+    targetChunk2Cmd["$readPreference"] = {mode: readPreferenceMode};
+
+    jsTestLog(`Testing ${cmdName} in mode ${testMode}`);
+
+    expectChunks(st, ns, [1, 1, 0]);
+
+    st.refreshCatalogCacheForNs(st.s, ns);
+
+    let session, db;
+    switch (testMode) {
+        case TestMode.TRANSACTION:
+            session = st.s.startSession({causalConsistency: false});
+            session.startTransaction({readConcern: {level: "snapshot"}});
+            db = session.getDatabase(dbName);
+            break;
+        case TestMode.CAUSAL_CONSISTENCY:
+            session = st.s.startSession({causalConsistency: true});
+            db = session.getDatabase(dbName);
+            db[collName].findOne();  // Establish a timestamp in the session.
+            break;
+        case TestMode.SNAPSHOT:
+            db = st.s.getDB(dbName);
+            targetChunk1Cmd.readConcern = targetChunk2Cmd.readConcern = {level: "snapshot"};
+            break;
+        case TestMode.SNAPSHOT_AT_CLUSTER_TIME:
+            db = st.s.getDB(dbName);
+            const opTime = st.s.getDB(dbName).runCommand({ping: 1}).operationTime;
+            targetChunk1Cmd.readConcern = {level: "snapshot", atClusterTime: opTime};
+            break;
+    }
+
+    // Establish a read timestamp.
+    let res = assert.commandWorked(db.runCommand(targetChunk1Cmd));
+    assert.sameMembers([{_id: -5}],
+                       res.cursor.firstBatch,
+                       `expected to find document in first chunk, command` +
+                           ` ${tojson(targetChunk1Cmd)} returned ${tojson(res)}`);
+
+    const targetChunk1CmdTimestamp = res.cursor.atClusterTime;
+    jsTestLog(`Chunk 1 command replied with timestamp ${targetChunk1CmdTimestamp}`);
+
+    // Move a chunk from Shard1 to Shard2 outside of the transaction, and update it. This will
+    // happen at a later logical time than the read timestamp.
+    assert.commandWorked(
+        st.s.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard2.shardName}));
+
+    res = assert.commandWorked(st.s.getDB(dbName).runCommand({
+        update: collName,
+        updates: [{q: {_id: 5}, u: {$set: {x: true}}}],
+        writeConcern: {w: "majority"}
+    }));
+    jsTestLog(`Updated chunk 2 at timestamp ${tojson(res.operationTime)}`);
+    st.refreshCatalogCacheForNs(st.s, ns);
+
+    if (testMode === TestMode.SNAPSHOT_AT_CLUSTER_TIME) {
+        targetChunk2Cmd.readConcern = {level: "snapshot", atClusterTime: targetChunk1CmdTimestamp};
+    }
+
+    res = assert.commandWorked(db.runCommand(targetChunk2Cmd));
+
+    switch (testMode) {
+        case TestMode.CAUSAL_CONSISTENCY:
+        case TestMode.SNAPSHOT:
+            // We may or may not see the result of the update above.
+            assert.eq(1,
+                      res.cursor.firstBatch.length,
+                      `expected to find document in second chunk, command` +
+                          ` ${tojson(targetChunk2Cmd)} returned ${tojson(res)}`);
+            assert.eq(5,
+                      res.cursor.firstBatch[0]._id,
+                      `expected to find {_id: 5} in second chunk, command` +
+                          ` ${tojson(targetChunk2Cmd)} returned ${tojson(res)}`);
+            break;
+        case TestMode.TRANSACTION:
+        case TestMode.SNAPSHOT_AT_CLUSTER_TIME:
+            // Must not see the update's result.
+            assert.sameMembers([{_id: 5}],
+                               res.cursor.firstBatch,
+                               `expected to find document in second chunk, command` +
+                                   ` ${tojson(targetChunk2Cmd)} returned ${tojson(res)}`);
+            break;
+    }
+
+    if (testMode === TestMode.TRANSACTION) {
+        assert.commandWorked(session.commitTransaction_forTesting());
+    }
+
+    // Move the chunk back to Shard1 and clear updated field for the next iteration.
+    assert.commandWorked(st.s.getDB(dbName).runCommand({
+        update: collName,
+        updates: [{q: {_id: 5}, u: {$unset: {x: true}}}],
+        writeConcern: {w: "majority"}
+    }));
+    assert.commandWorked(
+        st.s.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard1.shardName}));
+}
+
+for (let testCase of kCommandTestCases) {
+    for (let testMode of Object.values(TestMode)) {
+        for (let readPreferenceMode of ["primary", "secondary"]) {
+            if (readPreferenceMode === "secondary" && testMode === TestMode.TRANSACTION) {
+                // Transactions aren't supported on secondaries.
+                continue;
+            }
+
+            runTest(testCase, testMode, readPreferenceMode);
+        }
+    }
+}
+st.stop();
+})();
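Note: the SNAPSHOT_AT_CLUSTER_TIME mode above is the key new capability this test covers: two separate commands pinned to one timestamp observe a stable view even across a chunk migration. Condensed sketch (assumes db is the mongos test database with the two-chunk "foo" collection from the setup above):

    const opTime = db.runCommand({ping: 1}).operationTime;
    const rc = {level: "snapshot", atClusterTime: opTime};

    const a = assert.commandWorked(
        db.runCommand({find: "foo", filter: {_id: -5}, readConcern: rc}));
    // ... a moveChunk and an update may happen here ...
    const b = assert.commandWorked(
        db.runCommand({find: "foo", filter: {_id: 5}, readConcern: rc}));
    // Both reads observe the collection as of opTime, despite the migration.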
diff --git a/jstests/sharding/transactions_target_at_point_in_time.js b/jstests/sharding/transactions_target_at_point_in_time.js
deleted file mode 100644
index aef0800aa8e..00000000000
--- a/jstests/sharding/transactions_target_at_point_in_time.js
+++ /dev/null
@@ -1,125 +0,0 @@
-// Verifies mongos uses a versioned routing table to target subsequent requests in transactions with
-// snapshot level read concern.
-//
-// @tags: [
-//   requires_find_command,
-//   requires_sharding,
-//   uses_multi_shard_transaction,
-//   uses_transactions,
-// ]
-(function() {
-"use strict";
-
-load("jstests/sharding/libs/sharded_transactions_helpers.js");
-
-function expectChunks(st, ns, chunks) {
-    for (let i = 0; i < chunks.length; i++) {
-        assert.eq(chunks[i],
-                  st.s.getDB("config").chunks.count({ns: ns, shard: st["shard" + i].shardName}),
-                  "unexpected number of chunks on shard " + i);
-    }
-}
-
-const dbName = "test";
-const collName = "foo";
-const ns = dbName + '.' + collName;
-
-const st = new ShardingTest({
-    shards: 3,
-    mongos: 1,
-    config: 1,
-    other: {
-        // Disable expiring old chunk history to ensure the transactions are able to read from a
-        // shard that has donated a chunk, even if the migration takes longer than the amount of
-        // time for which a chunk's history is normally stored (see SERVER-39763).
-        configOptions:
-            {setParameter: {"failpoint.skipExpiringOldChunkHistory": "{mode: 'alwaysOn'}"}}
-    }
-});
-
-// Set up one sharded collection with 2 chunks, both on the primary shard.
-
-assert.commandWorked(
-    st.s.getDB(dbName)[collName].insert({_id: -5}, {writeConcern: {w: "majority"}}));
-assert.commandWorked(
-    st.s.getDB(dbName)[collName].insert({_id: 5}, {writeConcern: {w: "majority"}}));
-
-assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
-st.ensurePrimaryShard(dbName, st.shard0.shardName);
-
-assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {_id: 1}}));
-assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 0}}));
-
-expectChunks(st, ns, [2, 0, 0]);
-
-// Temporarily move a chunk to Shard2, to avoid picking a global read timestamp before the
-// sharding metadata cache collections are created.
-assert.commandWorked(st.s.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard2.shardName}));
-
-assert.commandWorked(st.s.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard1.shardName}));
-expectChunks(st, ns, [1, 1, 0]);
-
-// First command targets the first chunk, the second command targets the second chunk.
-const kCommandTestCases = [
-    {
-        name: "aggregate",
-        commandFuncs: [
-            (coll) => coll.aggregate({$match: {_id: -5}}).itcount(),
-            (coll) => coll.aggregate({$match: {_id: 5}}).itcount(),
-        ]
-    },
-    {
-        name: "find",
-        commandFuncs: [
-            (coll) => coll.find({_id: -5}).itcount(),
-            (coll) => coll.find({_id: 5}).itcount(),
-        ]
-    }
-];
-
-function runTest(testCase) {
-    const cmdName = testCase.name;
-    const targetChunk1Func = testCase.commandFuncs[0];
-    const targetChunk2Func = testCase.commandFuncs[1];
-
-    jsTestLog("Testing " + cmdName);
-
-    expectChunks(st, ns, [1, 1, 0]);
-
-    st.refreshCatalogCacheForNs(st.s, ns);
-
-    const session = st.s.startSession();
-    const sessionDB = session.getDatabase(dbName);
-    const sessionColl = sessionDB[collName];
-
-    session.startTransaction({readConcern: {level: "snapshot"}});
-
-    // Start a transaction on Shard0 which will select and pin a global read timestamp.
-    assert.eq(targetChunk1Func(sessionColl),
-              1,
-              "expected to find document in first chunk, cmd: " + cmdName);
-
-    // Move a chunk from Shard1 to Shard2 outside of the transaction. This will happen at a
-    // later logical time than the transaction's read timestamp.
-    assert.commandWorked(
-        st.s.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard2.shardName}));
-
-    // Target a document in the chunk that was moved. The router should get a stale shard
-    // version from Shard1 then retry on Shard1 and see the document.
-    st.refreshCatalogCacheForNs(st.s, ns);
-
-    assert.eq(targetChunk2Func(sessionColl),
-              1,
-              "expected to find document in second chunk, cmd: " + cmdName);
-
-    assert.commandWorked(session.commitTransaction_forTesting());
-
-    // Move the chunk back to Shard1 for the next iteration.
-    assert.commandWorked(
-        st.s.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard1.shardName}));
-}
-
-kCommandTestCases.forEach(runTest);
-
-st.stop();
-})();
diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp
index 3cd99d7e806..4b3eea43ee8 100644
--- a/src/mongo/db/query/cursor_response.cpp
+++ b/src/mongo/db/query/cursor_response.cpp
@@ -129,6 +129,7 @@ void appendGetMoreResponseObject(long long cursorId,
 CursorResponse::CursorResponse(NamespaceString nss,
                                CursorId cursorId,
                                std::vector<BSONObj> batch,
+                               boost::optional<Timestamp> atClusterTime,
                                boost::optional<long long> numReturnedSoFar,
                                boost::optional<BSONObj> postBatchResumeToken,
                                boost::optional<BSONObj> writeConcernError,
@@ -136,6 +137,7 @@ CursorResponse::CursorResponse(NamespaceString nss,
     : _nss(std::move(nss)),
       _cursorId(cursorId),
       _batch(std::move(batch)),
+      _atClusterTime(std::move(atClusterTime)),
      _numReturnedSoFar(numReturnedSoFar),
      _postBatchResumeToken(std::move(postBatchResumeToken)),
      _writeConcernError(std::move(writeConcernError)),
@@ -235,6 +237,14 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo
                               << postBatchResumeTokenElem.type()};
     }
 
+    auto atClusterTimeElem = cursorObj[kAtClusterTimeField];
+    if (atClusterTimeElem && atClusterTimeElem.type() != BSONType::bsonTimestamp) {
+        return {ErrorCodes::BadValue,
+                str::stream() << kAtClusterTimeField
+                              << " format is invalid; expected Timestamp, but found: "
+                              << atClusterTimeElem.type()};
+    }
+
     auto partialResultsReturned = cursorObj[kPartialResultsReturnedField];
 
     if (partialResultsReturned) {
@@ -257,6 +267,7 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo
     return {{NamespaceString(fullns),
              cursorId,
             std::move(batch),
+             atClusterTimeElem ? atClusterTimeElem.timestamp() : boost::optional<Timestamp>{},
             boost::none,
             postBatchResumeTokenElem ? postBatchResumeTokenElem.Obj().getOwned()
                                      : boost::optional<BSONObj>{},
@@ -283,6 +294,10 @@ void CursorResponse::addToBSON(CursorResponse::ResponseType responseType,
         cursorBuilder.append(kPostBatchResumeTokenField, *_postBatchResumeToken);
     }
 
+    if (_atClusterTime) {
+        cursorBuilder.append(kAtClusterTimeField, *_atClusterTime);
+    }
+
     if (_partialResultsReturned) {
         cursorBuilder.append(kPartialResultsReturnedField, true);
     }
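Note: with the CursorResponse changes above, a snapshot read's reply carries the chosen timestamp inside the cursor document. Illustrative reply shape (field values invented) and a shell check, assuming a testDB handle:

    // {
    //     cursor: {
    //         firstBatch: [{_id: 0}],
    //         id: NumberLong(0),
    //         ns: "test.coll",
    //         atClusterTime: Timestamp(1587654321, 3)
    //     },
    //     ok: 1
    // }
    const res = assert.commandWorked(
        testDB.runCommand({find: "coll", readConcern: {level: "snapshot"}}));
    assert(res.cursor.hasOwnProperty("atClusterTime"), tojson(res));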
diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h
index 5e03d2c5a3e..7e7633b14c1 100644
--- a/src/mongo/db/query/cursor_response.h
+++ b/src/mongo/db/query/cursor_response.h
@@ -50,6 +50,9 @@ class CursorResponseBuilder {
 public:
     /**
      * Structure used to configure the CursorResponseBuilder.
+     *
+     * If we selected atClusterTime or received it from the client, transmit it back to the client
+     * in the cursor reply document by setting it here.
      */
     struct Options {
         bool isInitialResponse = false;
@@ -192,6 +195,7 @@ public:
     CursorResponse(NamespaceString nss,
                    CursorId cursorId,
                    std::vector<BSONObj> batch,
+                   boost::optional<Timestamp> atClusterTime = boost::none,
                    boost::optional<long long> numReturnedSoFar = boost::none,
                    boost::optional<BSONObj> postBatchResumeToken = boost::none,
                    boost::optional<BSONObj> writeConcernError = boost::none,
@@ -232,6 +236,10 @@ public:
         return _writeConcernError;
     }
 
+    boost::optional<Timestamp> getAtClusterTime() const {
+        return _atClusterTime;
+    }
+
     bool getPartialResultsReturned() const {
         return _partialResultsReturned;
     }
@@ -249,6 +257,7 @@ private:
     NamespaceString _nss;
     CursorId _cursorId;
     std::vector<BSONObj> _batch;
+    boost::optional<Timestamp> _atClusterTime;
     boost::optional<long long> _numReturnedSoFar;
     boost::optional<BSONObj> _postBatchResumeToken;
     boost::optional<BSONObj> _writeConcernError;
diff --git a/src/mongo/db/query/cursor_response_test.cpp b/src/mongo/db/query/cursor_response_test.cpp
index db1a70b6a74..a3bc5449ac7 100644
--- a/src/mongo/db/query/cursor_response_test.cpp
+++ b/src/mongo/db/query/cursor_response_test.cpp
@@ -300,6 +300,7 @@ TEST(CursorResponseTest, toBSONPartialResultsReturned) {
                             boost::none,
                             boost::none,
                             boost::none,
+                            boost::none,
                             true);
     BSONObj responseObj = response.toBSON(CursorResponse::ResponseType::InitialResponse);
     BSONObj expectedResponse = BSON(
@@ -347,8 +348,12 @@ TEST(CursorResponseTest, serializePostBatchResumeToken) {
     std::vector<BSONObj> batch = {BSON("_id" << 1), BSON("_id" << 2)};
     auto postBatchResumeToken =
         ResumeToken::makeHighWaterMarkToken(Timestamp(1, 2)).toDocument().toBson();
-    CursorResponse response(
-        NamespaceString("db.coll"), CursorId(123), batch, boost::none, postBatchResumeToken);
+    CursorResponse response(NamespaceString("db.coll"),
+                            CursorId(123),
+                            batch,
+                            boost::none,
+                            boost::none,
+                            postBatchResumeToken);
     auto serialized = response.toBSON(CursorResponse::ResponseType::SubsequentResponse);
     ASSERT_BSONOBJ_EQ(serialized,
                       BSON("cursor" << BSON("id" << CursorId(123) << "ns"
diff --git a/src/mongo/s/cluster_commands_helpers.cpp b/src/mongo/s/cluster_commands_helpers.cpp
index 62b1bf834f9..f7f24aa31c2 100644
--- a/src/mongo/s/cluster_commands_helpers.cpp
+++ b/src/mongo/s/cluster_commands_helpers.cpp
@@ -267,6 +267,7 @@ BSONObj applyReadWriteConcern(OperationContext* opCtx,
     BSONObjBuilder output;
     bool seenReadConcern = false;
     bool seenWriteConcern = false;
+    const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
     for (const auto& elem : cmdObj) {
         const auto name = elem.fieldNameStringData();
         if (appendRC && name == repl::ReadConcernArgs::kReadConcernFieldName) {
@@ -276,13 +277,18 @@ BSONObj applyReadWriteConcern(OperationContext* opCtx,
             seenWriteConcern = true;
         }
         if (!output.hasField(name)) {
-            output.append(elem);
+            // If mongos selected atClusterTime, forward it to the shard.
+            if (name == repl::ReadConcernArgs::kReadConcernFieldName &&
+                readConcernArgs.wasAtClusterTimeSelected()) {
+                output.appendElements(readConcernArgs.toBSON());
+            } else {
+                output.append(elem);
+            }
         }
     }
 
     // Finally, add the new read/write concern.
     if (appendRC && !seenReadConcern) {
-        const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
         output.appendElements(readConcernArgs.toBSON());
     }
     if (appendWC && !seenWriteConcern) {
@@ -721,6 +727,12 @@ StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfoForTxnCmd(
     auto catalogCache = Grid::get(opCtx)->catalogCache();
     invariant(catalogCache);
 
+    auto argsAtClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime();
+    if (argsAtClusterTime) {
+        return catalogCache->getCollectionRoutingInfoAt(
+            opCtx, nss, argsAtClusterTime->asTimestamp());
+    }
+
     // Return the latest routing table if not running in a transaction with snapshot level read
     // concern.
     auto txnRouter = TransactionRouter::get(opCtx);
diff --git a/src/mongo/s/commands/cluster_distinct_cmd.cpp b/src/mongo/s/commands/cluster_distinct_cmd.cpp
index 0e02d3169a1..32d0ffe4b1e 100644
--- a/src/mongo/s/commands/cluster_distinct_cmd.cpp
+++ b/src/mongo/s/commands/cluster_distinct_cmd.cpp
@@ -250,6 +250,11 @@ public:
         }
 
         result.appendArray("values", b.obj());
+        // If mongos selected atClusterTime or received it from client, transmit it back.
+        if (repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime()) {
+            result.append("atClusterTime"_sd,
+                          repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime()->asTimestamp());
+        }
         return true;
     }
 
diff --git a/src/mongo/s/commands/cluster_find_cmd.cpp b/src/mongo/s/commands/cluster_find_cmd.cpp
index 66cdf67b00f..7ebab369391 100644
--- a/src/mongo/s/commands/cluster_find_cmd.cpp
+++ b/src/mongo/s/commands/cluster_find_cmd.cpp
@@ -230,6 +230,7 @@ public:
             // Build the response document.
             CursorResponseBuilder::Options options;
             options.isInitialResponse = true;
+            options.atClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime();
             CursorResponseBuilder firstBatch(result, options);
             for (const auto& obj : batch) {
                 firstBatch.append(obj);
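Note: distinct's reply has no cursor sub-document, so cluster_distinct_cmd.cpp above appends the timestamp as a top-level field instead. Sketch (assumes a shell testDB handle against mongos):

    const res = assert.commandWorked(testDB.runCommand(
        {distinct: "coll", key: "x", readConcern: {level: "snapshot"}}));
    jsTestLog(`distinct ran at ${tojson(res.atClusterTime)}`);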
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index d1dd14d1a70..6b70018dfd8 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -252,10 +252,21 @@ void execCommandClient(OperationContext* opCtx,
     }
 
     auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
-    if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) {
-        uassert(ErrorCodes::InvalidOptions,
-                "read concern snapshot is only supported in a multi-statement transaction",
-                TransactionRouter::get(opCtx));
+    if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern &&
+        !TransactionRouter::get(opCtx) && !readConcernArgs.getArgsAtClusterTime()) {
+        // Select the latest known clusterTime as the atClusterTime for snapshot reads outside
+        // of transactions.
+        auto atClusterTime = [&] {
+            auto latestKnownClusterTime = LogicalClock::get(opCtx)->getClusterTime();
+            // If the user passed afterClusterTime, the chosen time must be greater than or
+            // equal to it.
+            auto afterClusterTime = readConcernArgs.getArgsAfterClusterTime();
+            if (afterClusterTime && *afterClusterTime > latestKnownClusterTime) {
+                return afterClusterTime->asTimestamp();
+            }
+            return latestKnownClusterTime.asTimestamp();
+        }();
+        readConcernArgs.setArgsAtClusterTimeForSnapshot(atClusterTime);
     }
 
     // attach tracking
@@ -401,12 +412,6 @@ void runCommand(OperationContext* opCtx,
             return;
         }
 
-        if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) {
-            uassert(ErrorCodes::InvalidOptions,
-                    "read concern snapshot is not supported with atClusterTime on mongos",
-                    !readConcernArgs.getArgsAtClusterTime());
-        }
-
         boost::optional<RouterOperationContextSession> routerSession;
         try {
            rpc::readRequestMetadata(opCtx, request.body, command->requiresAuth());
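Note: the selection rule implemented in execCommandClient() above is effectively max(latest known clusterTime, afterClusterTime). A sketch of the observable guarantee (assumes testDB against mongos; timestampCmp is the shell helper also used by the removed awaitCommitted()):

    const ping = assert.commandWorked(testDB.runCommand({ping: 1}));
    const after = ping.$clusterTime.clusterTime;

    const res = assert.commandWorked(testDB.runCommand(
        {find: "coll", readConcern: {level: "snapshot", afterClusterTime: after}}));

    // The timestamp mongos selects never precedes the requested afterClusterTime.
    assert(timestampCmp(res.cursor.atClusterTime, after) >= 0, tojson(res));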
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index 49ca39fed20..ff930da893d 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -1348,7 +1348,8 @@ DEATH_TEST_REGEX_F(
     cursors.push_back(makeRemoteCursor(
         kTestShardIds[0],
         kTestShardHosts[0],
-        CursorResponse(kTestNss, 123, {firstCursorResponse}, boost::none, pbrtFirstCursor)));
+        CursorResponse(
+            kTestNss, 123, {firstCursorResponse}, boost::none, boost::none, pbrtFirstCursor)));
     // Create a second cursor whose initial batch has no PBRT.
     cursors.push_back(
         makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 456, {})));
@@ -1381,7 +1382,8 @@ DEATH_TEST_REGEX_F(AsyncResultsMergerTest,
     cursors.push_back(makeRemoteCursor(
         kTestShardIds[0],
         kTestShardHosts[0],
-        CursorResponse(kTestNss, 123, {firstCursorResponse}, boost::none, pbrtFirstCursor)));
+        CursorResponse(
+            kTestNss, 123, {firstCursorResponse}, boost::none, boost::none, pbrtFirstCursor)));
     params.setRemotes(std::move(cursors));
     params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
     params.setSort(change_stream_constants::kSortSpec);
@@ -1409,11 +1411,13 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfRemoteHasLowerPostB
     cursors.push_back(makeRemoteCursor(
         kTestShardIds[0],
         kTestShardHosts[0],
-        CursorResponse(kTestNss, 123, {firstCursorResponse}, boost::none, pbrtFirstCursor)));
+        CursorResponse(
+            kTestNss, 123, {firstCursorResponse}, boost::none, boost::none, pbrtFirstCursor)));
     auto tooLowPBRT = makePostBatchResumeToken(Timestamp(1, 2));
-    cursors.push_back(makeRemoteCursor(kTestShardIds[1],
-                                       kTestShardHosts[1],
-                                       CursorResponse(kTestNss, 456, {}, boost::none, tooLowPBRT)));
+    cursors.push_back(
+        makeRemoteCursor(kTestShardIds[1],
+                         kTestShardHosts[1],
+                         CursorResponse(kTestNss, 456, {}, boost::none, boost::none, tooLowPBRT)));
     params.setRemotes(std::move(cursors));
     params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
     params.setSort(change_stream_constants::kSortSpec);
@@ -1459,7 +1463,8 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting)
                          << firstDocSortKey.firstElement().String() << "'}]}");
     std::vector<BSONObj> batch1 = {firstCursorResponse};
     auto firstDoc = batch1.front();
-    responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, pbrtFirstCursor);
+    responses.emplace_back(
+        kTestNss, CursorId(123), batch1, boost::none, boost::none, pbrtFirstCursor);
     scheduleNetworkResponses(std::move(responses));
 
     // Should be ready now.
@@ -1471,7 +1476,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting)
     newCursors.push_back(
         makeRemoteCursor(kTestShardIds[1],
                          kTestShardHosts[1],
-                         CursorResponse(kTestNss, 456, {}, boost::none, tooLowPBRT)));
+                         CursorResponse(kTestNss, 456, {}, boost::none, boost::none, tooLowPBRT)));
     arm->addNewShardCursors(std::move(newCursors));
 
     // Now shouldn't be ready, our guarantee from the new shard isn't sufficiently advanced.
@@ -1488,7 +1493,8 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting)
                          << secondDocSortKey.firstElement().String() << "'}]}");
     std::vector<BSONObj> batch2 = {secondCursorResponse};
     auto secondDoc = batch2.front();
-    responses.emplace_back(kTestNss, CursorId(456), batch2, boost::none, pbrtSecondCursor);
+    responses.emplace_back(
+        kTestNss, CursorId(456), batch2, boost::none, boost::none, pbrtSecondCursor);
     scheduleNetworkResponses(std::move(responses));
     executor()->waitForEvent(readyEvent);
     ASSERT_TRUE(arm->ready());
@@ -1536,7 +1542,8 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting)
                          << "', documentKey: {_id: 1}}, $sortKey: [{_data: '"
                          << firstDocSortKey.firstElement().String() << "'}]}");
     std::vector<BSONObj> batch1 = {firstCursorResponse};
-    responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, pbrtFirstCursor);
+    responses.emplace_back(
+        kTestNss, CursorId(123), batch1, boost::none, boost::none, pbrtFirstCursor);
     scheduleNetworkResponses(std::move(responses));
 
     // Should be ready now.
@@ -1548,7 +1555,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting)
     newCursors.push_back(
         makeRemoteCursor(kTestShardIds[1],
                          kTestShardHosts[1],
-                         CursorResponse(kTestNss, 456, {}, boost::none, tooLowPBRT)));
+                         CursorResponse(kTestNss, 456, {}, boost::none, boost::none, tooLowPBRT)));
     arm->addNewShardCursors(std::move(newCursors));
 
     // Now shouldn't be ready, our guarantee from the new shard isn't sufficiently advanced.
@@ -1566,7 +1573,8 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting)
     std::vector<BSONObj> batch2 = {secondCursorResponse};
     // The last observed time should still be later than the first shard, so we can get the data
     // from it.
-    responses.emplace_back(kTestNss, CursorId(456), batch2, boost::none, pbrtSecondCursor);
+    responses.emplace_back(
+        kTestNss, CursorId(456), batch2, boost::none, boost::none, pbrtSecondCursor);
     scheduleNetworkResponses(std::move(responses));
     executor()->waitForEvent(readyEvent);
     ASSERT_TRUE(arm->ready());
@@ -1594,20 +1602,20 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorReturnsHighWaterMarkSortKey)
     std::vector<RemoteCursor> cursors;
     // Create three cursors with empty initial batches. Each batch has a PBRT.
     auto pbrtFirstCursor = makePostBatchResumeToken(Timestamp(1, 5));
-    cursors.push_back(
-        makeRemoteCursor(kTestShardIds[0],
-                         kTestShardHosts[0],
-                         CursorResponse(kTestNss, 123, {}, boost::none, pbrtFirstCursor)));
+    cursors.push_back(makeRemoteCursor(
+        kTestShardIds[0],
+        kTestShardHosts[0],
+        CursorResponse(kTestNss, 123, {}, boost::none, boost::none, pbrtFirstCursor)));
     auto pbrtSecondCursor = makePostBatchResumeToken(Timestamp(1, 1));
-    cursors.push_back(
-        makeRemoteCursor(kTestShardIds[1],
-                         kTestShardHosts[1],
-                         CursorResponse(kTestNss, 456, {}, boost::none, pbrtSecondCursor)));
+    cursors.push_back(makeRemoteCursor(
+        kTestShardIds[1],
+        kTestShardHosts[1],
+        CursorResponse(kTestNss, 456, {}, boost::none, boost::none, pbrtSecondCursor)));
     auto pbrtThirdCursor = makePostBatchResumeToken(Timestamp(1, 4));
-    cursors.push_back(
-        makeRemoteCursor(kTestShardIds[2],
-                         kTestShardHosts[2],
-                         CursorResponse(kTestNss, 789, {}, boost::none, pbrtThirdCursor)));
+    cursors.push_back(makeRemoteCursor(
+        kTestShardIds[2],
+        kTestShardHosts[2],
+        CursorResponse(kTestNss, 789, {}, boost::none, boost::none, pbrtThirdCursor)));
     params.setRemotes(std::move(cursors));
     params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
     params.setSort(change_stream_constants::kSortSpec);
@@ -1625,26 +1633,35 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorReturnsHighWaterMarkSortKey)
     // each cursor to be updated in-order, so we keep the first and third PBRTs constant.
     pbrtSecondCursor = makePostBatchResumeToken(Timestamp(1, 3));
     std::vector<BSONObj> emptyBatch = {};
-    scheduleNetworkResponse({kTestNss, CursorId(123), emptyBatch, boost::none, pbrtFirstCursor});
-    scheduleNetworkResponse({kTestNss, CursorId(456), emptyBatch, boost::none, pbrtSecondCursor});
-    scheduleNetworkResponse({kTestNss, CursorId(789), emptyBatch, boost::none, pbrtThirdCursor});
+    scheduleNetworkResponse(
+        {kTestNss, CursorId(123), emptyBatch, boost::none, boost::none, pbrtFirstCursor});
+    scheduleNetworkResponse(
+        {kTestNss, CursorId(456), emptyBatch, boost::none, boost::none, pbrtSecondCursor});
+    scheduleNetworkResponse(
+        {kTestNss, CursorId(789), emptyBatch, boost::none, boost::none, pbrtThirdCursor});
     ASSERT_BSONOBJ_EQ(arm->getHighWaterMark(), pbrtSecondCursor);
     ASSERT_FALSE(arm->ready());
 
     // Advance the second cursor again, so that it surpasses the other two. The third cursor becomes
     // the new high water mark.
     pbrtSecondCursor = makePostBatchResumeToken(Timestamp(1, 6));
-    scheduleNetworkResponse({kTestNss, CursorId(123), emptyBatch, boost::none, pbrtFirstCursor});
-    scheduleNetworkResponse({kTestNss, CursorId(456), emptyBatch, boost::none, pbrtSecondCursor});
-    scheduleNetworkResponse({kTestNss, CursorId(789), emptyBatch, boost::none, pbrtThirdCursor});
+    scheduleNetworkResponse(
+        {kTestNss, CursorId(123), emptyBatch, boost::none, boost::none, pbrtFirstCursor});
+    scheduleNetworkResponse(
+        {kTestNss, CursorId(456), emptyBatch, boost::none, boost::none, pbrtSecondCursor});
+    scheduleNetworkResponse(
+        {kTestNss, CursorId(789), emptyBatch, boost::none, boost::none, pbrtThirdCursor});
     ASSERT_BSONOBJ_EQ(arm->getHighWaterMark(), pbrtThirdCursor);
     ASSERT_FALSE(arm->ready());
 
     // Advance the third cursor such that the first cursor becomes the high water mark.
     pbrtThirdCursor = makePostBatchResumeToken(Timestamp(1, 7));
-    scheduleNetworkResponse({kTestNss, CursorId(123), emptyBatch, boost::none, pbrtFirstCursor});
-    scheduleNetworkResponse({kTestNss, CursorId(456), emptyBatch, boost::none, pbrtSecondCursor});
-    scheduleNetworkResponse({kTestNss, CursorId(789), emptyBatch, boost::none, pbrtThirdCursor});
+    scheduleNetworkResponse(
+        {kTestNss, CursorId(123), emptyBatch, boost::none, boost::none, pbrtFirstCursor});
+    scheduleNetworkResponse(
+        {kTestNss, CursorId(456), emptyBatch, boost::none, boost::none, pbrtSecondCursor});
+    scheduleNetworkResponse(
+        {kTestNss, CursorId(789), emptyBatch, boost::none, boost::none, pbrtThirdCursor});
     ASSERT_BSONOBJ_EQ(arm->getHighWaterMark(), pbrtFirstCursor);
     ASSERT_FALSE(arm->ready());
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index 56d865508c6..6221b1809c4 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -243,7 +243,8 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
                                      std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging,
                                      const PrivilegeVector& privileges) {
 
-    ClusterClientCursorParams params(requestedNss, ReadPreferenceSetting::get(opCtx));
+    ClusterClientCursorParams params(
+        requestedNss, ReadPreferenceSetting::get(opCtx), ReadConcernArgs::get(opCtx));
 
     params.originatingCommandObj = CurOp::get(opCtx)->opDescription().getOwned();
     params.tailableMode = pipelineForMerging->getContext()->tailableMode;
@@ -267,7 +268,7 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
     rpc::OpMsgReplyBuilder replyBuilder;
     CursorResponseBuilder::Options options;
     options.isInitialResponse = true;
-
+    options.atClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime();
     CursorResponseBuilder responseBuilder(&replyBuilder, options);
     bool stashedResult = false;
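Note: establishMergingMongosCursor() above stashes the readConcern in the merging cursor's params and stamps atClusterTime on the merged first batch, so snapshot aggregates behave like snapshot finds. Sketch (assumes testDB against mongos):

    const res = assert.commandWorked(testDB.runCommand({
        aggregate: "coll",
        pipeline: [{$match: {}}],
        cursor: {batchSize: 2},
        readConcern: {level: "snapshot"}
    }));
    jsTestLog(`merged aggregate pinned at ${tojson(res.cursor.atClusterTime)}`);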
diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h
index 31d4211570f..6dc1ae8419e 100644
--- a/src/mongo/s/query/cluster_client_cursor.h
+++ b/src/mongo/s/query/cluster_client_cursor.h
@@ -181,6 +181,11 @@ public:
     virtual boost::optional<ReadPreferenceSetting> getReadPreference() const = 0;
 
     /**
+     * Returns the readConcern for this cursor.
+     */
+    virtual boost::optional<ReadConcernArgs> getReadConcern() const = 0;
+
+    /**
      * Returns the creation date of the cursor.
      */
     virtual Date_t getCreatedDate() const = 0;
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index c6f18579bf1..fefd913f8a7 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -209,6 +209,10 @@ boost::optional<ReadPreferenceSetting> ClusterClientCursorImpl::getReadPreferenc
     return _params.readPreference;
 }
 
+boost::optional<ReadConcernArgs> ClusterClientCursorImpl::getReadConcern() const {
+    return _params.readConcern;
+}
+
 std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan(
     OperationContext* opCtx,
     std::shared_ptr<executor::TaskExecutor> executor,
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h
index 91a9c4455f3..23f1c351fda 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.h
+++ b/src/mongo/s/query/cluster_client_cursor_impl.h
@@ -104,6 +104,8 @@ public:
     boost::optional<ReadPreferenceSetting> getReadPreference() const final;
 
+    boost::optional<ReadConcernArgs> getReadConcern() const final;
+
     Date_t getCreatedDate() const final;
 
     Date_t getLastUseDate() const final;
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp
index 958a909687c..e6f86dccade 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp
@@ -153,4 +153,8 @@ boost::optional<ReadPreferenceSetting> ClusterClientCursorMock::getReadPreferenc
     return boost::none;
 }
 
+boost::optional<ReadConcernArgs> ClusterClientCursorMock::getReadConcern() const {
+    return boost::none;
+}
+
 }  // namespace mongo
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h
index c86f66315b6..f72551a24da 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.h
+++ b/src/mongo/s/query/cluster_client_cursor_mock.h
@@ -92,6 +92,8 @@ public:
     boost::optional<ReadPreferenceSetting> getReadPreference() const final;
 
+    boost::optional<ReadConcernArgs> getReadConcern() const final;
+
     Date_t getCreatedDate() const final;
 
     Date_t getLastUseDate() const final;
diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h
index 24ec074376f..cd6563f842c 100644
--- a/src/mongo/s/query/cluster_client_cursor_params.h
+++ b/src/mongo/s/query/cluster_client_cursor_params.h
@@ -43,6 +43,7 @@
 #include "mongo/db/pipeline/pipeline.h"
 #include "mongo/db/query/cursor_response.h"
 #include "mongo/db/query/tailable_mode.h"
+#include "mongo/db/repl/read_concern_args.h"
 #include "mongo/s/client/shard.h"
 #include "mongo/s/query/async_results_merger_params_gen.h"
 #include "mongo/util/net/hostandport.h"
@@ -55,6 +56,8 @@ class TaskExecutor;
 class OperationContext;
 class RouterExecStage;
 
+using repl::ReadConcernArgs;
+
 /**
  * The resulting ClusterClientCursor will take ownership of the existing remote cursor, generating
  * results based on the cursor's current state.
@@ -65,11 +68,15 @@ class RouterExecStage;
  */
 struct ClusterClientCursorParams {
     ClusterClientCursorParams(NamespaceString nss,
-                              boost::optional<ReadPreferenceSetting> readPref = boost::none)
+                              boost::optional<ReadPreferenceSetting> readPref = boost::none,
+                              boost::optional<ReadConcernArgs> readConcernArgs = boost::none)
         : nsString(std::move(nss)) {
         if (readPref) {
             readPreference = std::move(readPref.get());
         }
+        if (readConcernArgs) {
+            readConcern = std::move(readConcernArgs.get());
+        }
     }
 
     /**
@@ -143,6 +150,9 @@ struct ClusterClientCursorParams {
     // Set if a readPreference must be respected throughout the lifetime of the cursor.
     boost::optional<ReadPreferenceSetting> readPreference;
 
+    // Set if a readConcern must be respected throughout the lifetime of the cursor.
+    boost::optional<ReadConcernArgs> readConcern;
+
     // Whether the client indicated that it is willing to receive partial results in the case of an
     // unreachable host.
     bool isAllowPartialResults = false;
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index e3bbfb5d995..422c02406da 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -187,6 +187,12 @@ std::vector<std::pair<ShardId, BSONObj>> constructRequestsForShards(
         qrToForward = std::make_unique<QueryRequest>(query.getQueryRequest());
     }
 
+    auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
+    if (readConcernArgs.wasAtClusterTimeSelected()) {
+        // If mongos selected atClusterTime or received it from client, transmit it to shard.
+        qrToForward->setReadConcern(readConcernArgs.toBSONInner());
+    }
+
     auto shardRegistry = Grid::get(opCtx)->shardRegistry();
     std::vector<std::pair<ShardId, BSONObj>> requests;
     for (const auto& shardId : shardIds) {
@@ -242,7 +248,7 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
 
     // Construct the query and parameters. Defer setting skip and limit here until
     // we determine if the query is targeting multi-shards or a single shard below.
-    ClusterClientCursorParams params(query.nss(), readPref);
+    ClusterClientCursorParams params(query.nss(), readPref, ReadConcernArgs::get(opCtx));
     params.originatingCommandObj = CurOp::get(opCtx)->opDescription().getOwned();
     params.batchSize = query.getQueryRequest().getEffectiveBatchSize();
     params.tailableMode = query.getQueryRequest().getTailableMode();
@@ -429,6 +435,11 @@ Status setUpOperationContextStateForGetMore(OperationContext* opCtx,
         ReadPreferenceSetting::get(opCtx) = *readPref;
     }
 
+    if (auto readConcern = cursor->getReadConcern()) {
+        // Used to return "atClusterTime" in cursor replies to clients for snapshot reads.
+        ReadConcernArgs::get(opCtx) = *readConcern;
+    }
+
     // If the originating command had a 'comment' field, we extract it and set it on opCtx. Note
     // that if the 'getMore' command itself has a 'comment' field, we give precedence to it.
     auto comment = cursor->getOriginatingCommand()["comment"];
@@ -835,9 +846,12 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
             "waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch");
     }
 
+    auto atClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime();
     return CursorResponse(request.nss,
                           idToReturn,
                           std::move(batch),
+                          atClusterTime ? atClusterTime->asTimestamp()
+                                        : boost::optional<Timestamp>{},
                           startingFrom,
                           postBatchResumeToken,
                           boost::none,
diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp
index 8217de1e2ec..3b0bbf4e85c 100644
--- a/src/mongo/s/query/establish_cursors.cpp
+++ b/src/mongo/s/query/establish_cursors.cpp
@@ -179,8 +179,9 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
             }
             // This exception is eligible to be swallowed. Add an entry with a cursorID of 0, an
             // empty HostAndPort, and which has the 'partialResultsReturned' flag set to true.
-            remoteCursors.push_back(
-                {response.shardId.toString(), {}, {nss, CursorId{0}, {}, {}, {}, {}, true}});
+            remoteCursors.push_back({response.shardId.toString(),
+                                     {},
+                                     {nss, CursorId{0}, {}, {}, {}, {}, {}, true}});
         }
     }
     return remoteCursors;
diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp
index 9877af451a2..b76a47cddbe 100644
--- a/src/mongo/s/query/store_possible_cursor.cpp
+++ b/src/mongo/s/query/store_possible_cursor.cpp
@@ -99,7 +99,8 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
         return cmdResult;
     }
 
-    ClusterClientCursorParams params(incomingCursorResponse.getValue().getNSS());
+    ClusterClientCursorParams params(
+        incomingCursorResponse.getValue().getNSS(), boost::none, ReadConcernArgs::get(opCtx));
     params.remotes.emplace_back();
     auto& remoteCursor = params.remotes.back();
     remoteCursor.setShardId(shardId.toString());
@@ -136,8 +137,10 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
 
     CurOp::get(opCtx)->debug().cursorid = clusterCursorId.getValue();
 
-    CursorResponse outgoingCursorResponse(
-        requestedNss, clusterCursorId.getValue(), incomingCursorResponse.getValue().getBatch());
+    CursorResponse outgoingCursorResponse(requestedNss,
+                                          clusterCursorId.getValue(),
+                                          incomingCursorResponse.getValue().getBatch(),
+                                          incomingCursorResponse.getValue().getAtClusterTime());
 
     return outgoingCursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse);
 }
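Note: because runGetMore() above restores the cursor's stored readConcern onto the OperationContext, every batch of a snapshot cursor reports the same pinned timestamp. End-to-end sketch (assumes testDB against mongos with several documents in coll; timestampCmp is the shell comparison helper):

    let res = assert.commandWorked(testDB.runCommand(
        {find: "coll", batchSize: 2, readConcern: {level: "snapshot"}}));
    const pinned = res.cursor.atClusterTime;

    while (res.cursor.id != 0) {
        res = assert.commandWorked(testDB.runCommand(
            {getMore: res.cursor.id, collection: "coll", batchSize: 2}));
        // Each getMore reply carries the same pinned timestamp.
        assert.eq(0, timestampCmp(pinned, res.cursor.atClusterTime));
    }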