diff options
-rw-r--r-- | jstests/noPassthrough/readConcern_snapshot_mongos.js | 2 | ||||
-rw-r--r-- | jstests/sharding/single_shard_snapshot_read.js | 88 | ||||
-rw-r--r-- | src/mongo/db/read_concern_mongod.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/read_concern_args.cpp | 36 | ||||
-rw-r--r-- | src/mongo/db/repl/read_concern_args.h | 28 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_command_test_fixture.cpp | 191 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_command_test_fixture.h | 71 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_delete_test.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_distinct_test.cpp | 6 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_find_and_modify_test.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_find_test.cpp | 55 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_insert_test.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_update_test.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/commands/strategy.cpp | 5 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_aggregate_test.cpp | 7 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_find.cpp | 26 |
16 files changed, 415 insertions, 118 deletions
diff --git a/jstests/noPassthrough/readConcern_snapshot_mongos.js b/jstests/noPassthrough/readConcern_snapshot_mongos.js index c1be784e73a..066c281c5cd 100644 --- a/jstests/noPassthrough/readConcern_snapshot_mongos.js +++ b/jstests/noPassthrough/readConcern_snapshot_mongos.js @@ -20,7 +20,7 @@ let st = new ShardingTest({shards: 1, rs: {nodes: 2}, config: 2, mongos: 1}); let testDB = st.getDB(dbName); // Insert data to create the collection. -assert.commandWorked(testDB[collName].insert({x: 1})); +assert.commandWorked(testDB[collName].insert({x: 1}, {writeConcern: {w: "majority"}})); flushRoutersAndRefreshShardMetadata(st, {ns: dbName + "." + collName, dbNames: [dbName]}); diff --git a/jstests/sharding/single_shard_snapshot_read.js b/jstests/sharding/single_shard_snapshot_read.js new file mode 100644 index 00000000000..69837719784 --- /dev/null +++ b/jstests/sharding/single_shard_snapshot_read.js @@ -0,0 +1,88 @@ +/** + * Tests readConcern level snapshot outside of transactions, targeting one shard out of two. The + * snapshot timestamp selection algorithm is different for single-shard reads from a sharded + * collection (SERVER-47952), make sure mongos returns the correct atClusterTime with cursor + * replies. + * + * @tags: [ + * requires_fcv_46, + * requires_majority_read_concern, + * requires_find_command + * ] + */ + +(function() { +"use strict"; + +load("jstests/libs/global_snapshot_reads_util.js"); +load("jstests/sharding/libs/sharded_transactions_helpers.js"); + +const nodeOptions = { + // Set a large snapshot window of 10 minutes for the test. + setParameter: {minSnapshotHistoryWindowInSeconds: 600} +}; + +const dbName = "test"; +const collName = "collection"; + +const st = + new ShardingTest({shards: 2, other: {configOptions: nodeOptions, rsOptions: nodeOptions}}); + +jsTestLog("initiated"); +const mongos = st.s0; + +assert.commandWorked(mongos.adminCommand({enableSharding: dbName})); +assert.commandWorked(mongos.adminCommand({shardCollection: "test.collection", key: {_id: 1}})); + +const ns = dbName + "." + collName; +assert.commandWorked(st.splitAt(ns, {_id: 5})); + +assert.commandWorked(mongos.adminCommand({moveChunk: ns, find: {_id: 0}, to: st.shard0.shardName})); +assert.commandWorked(mongos.adminCommand({moveChunk: ns, find: {_id: 9}, to: st.shard1.shardName})); + +const db = mongos.getDB(dbName); +const docs = [...Array(10).keys()].map((i) => ({"_id": i})); +assert.commandWorked( + db.runCommand({insert: collName, documents: docs, writeConcern: {w: "majority"}})); + +jsTestLog("Advance mongos's clusterTime by writing to shard 1"); + +const insertReply = assert.commandWorked(db.runCommand({insert: collName, documents: [{_id: 10}]})); + +jsTestLog(`Wrote to shard 1 at timestamp ${insertReply.operationTime}`); + +jsTestLog("Read from shard 0"); + +let reply0 = assert.commandWorked(db.runCommand( + {find: collName, filter: {_id: {$lt: 4}}, batchSize: 1, readConcern: {level: "snapshot"}})); + +jsTestLog(`find reply: ${tojson(reply0)}`); + +let cursorId = reply0.cursor.id; +assert.neq(cursorId, 0); +assert(reply0.cursor.hasOwnProperty("atClusterTime")); +assert.lt(reply0.cursor.atClusterTime, insertReply.operationTime); + +jsTestLog("Write to shard 0"); + +const updateReply = assert.commandWorked(db.runCommand({ + update: collName, + updates: [{q: {_id: {$lt: 4}}, u: {$set: {x: 1}}}], + writeConcern: {w: "majority"} +})); + +jsTestLog(`Wrote to shard 0 at timestamp ${updateReply.operationTime}`); + +let reply1 = + assert.commandWorked(db.runCommand({getMore: cursorId, collection: collName, batchSize: 1})); + +jsTestLog(`getMore reply: ${tojson(reply1)}`); + +assert(reply1.cursor.hasOwnProperty("atClusterTime")); +assert.eq(reply0.cursor.atClusterTime, reply1.cursor.atClusterTime); + +// We have read the version of the document prior to the update. +assert(!reply1.cursor.nextBatch[0].hasOwnProperty("x")); + +st.stop(); +})(); diff --git a/src/mongo/db/read_concern_mongod.cpp b/src/mongo/db/read_concern_mongod.cpp index 3663b9cdcc5..5d8027b3f71 100644 --- a/src/mongo/db/read_concern_mongod.cpp +++ b/src/mongo/db/read_concern_mongod.cpp @@ -374,7 +374,7 @@ Status waitForReadConcernImpl(OperationContext* opCtx, "No committed OpTime for snapshot read", !opTime.isNull()); ru->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, opTime.getTimestamp()); - repl::ReadConcernArgs::get(opCtx).setArgsAtClusterTimeForSnapshot(opTime.getTimestamp()); + repl::ReadConcernArgs::get(opCtx).setArgsAtClusterTime(opTime.getTimestamp()); } else if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kMajorityReadConcern && replCoord->getReplicationMode() == repl::ReplicationCoordinator::Mode::modeReplSet) { // This block is not used for kSnapshotReadConcern because snapshots are always speculative; diff --git a/src/mongo/db/repl/read_concern_args.cpp b/src/mongo/db/repl/read_concern_args.cpp index d1067a0ff2e..9de1dd105d8 100644 --- a/src/mongo/db/repl/read_concern_args.cpp +++ b/src/mongo/db/repl/read_concern_args.cpp @@ -73,9 +73,11 @@ ReadConcernArgs::ReadConcernArgs(boost::optional<OpTime> opTime, ReadConcernArgs::ReadConcernArgs(boost::optional<LogicalTime> clusterTime, boost::optional<ReadConcernLevel> level) - : _afterClusterTime(std::move(clusterTime)), + : _clientAfterClusterTime(std::move(clusterTime)), _level(std::move(level)), - _specified(_afterClusterTime || _level) {} + _specified(_clientAfterClusterTime || _level) { + _computedAfterClusterTime = _clientAfterClusterTime; +} std::string ReadConcernArgs::toString() const { return toBSON().toString(); @@ -94,7 +96,7 @@ BSONObj ReadConcernArgs::toBSONInner() const { } bool ReadConcernArgs::isEmpty() const { - return !_afterClusterTime && !_opTime && !_atClusterTime && !_level; + return !_computedAfterClusterTime && !_opTime && !_computedAtClusterTime && !_level; } bool ReadConcernArgs::isSpecified() const { @@ -114,11 +116,11 @@ boost::optional<OpTime> ReadConcernArgs::getArgsOpTime() const { } boost::optional<LogicalTime> ReadConcernArgs::getArgsAfterClusterTime() const { - return _afterClusterTime; + return _computedAfterClusterTime; } boost::optional<LogicalTime> ReadConcernArgs::getArgsAtClusterTime() const { - return _atClusterTime; + return _computedAtClusterTime; } Status ReadConcernArgs::initialize(const BSONElement& readConcernElem) { @@ -158,7 +160,7 @@ Status ReadConcernArgs::parse(const BSONObj& readConcernObj) { if (!afterClusterTimeStatus.isOK()) { return afterClusterTimeStatus; } - _afterClusterTime = LogicalTime(afterClusterTime); + _computedAfterClusterTime = _clientAfterClusterTime = LogicalTime(afterClusterTime); } else if (fieldName == kAtClusterTimeFieldName) { Timestamp atClusterTime; auto atClusterTimeStatus = @@ -166,7 +168,7 @@ Status ReadConcernArgs::parse(const BSONObj& readConcernObj) { if (!atClusterTimeStatus.isOK()) { return atClusterTimeStatus; } - _atClusterTime = LogicalTime(atClusterTime); + _computedAtClusterTime = _clientAtClusterTime = LogicalTime(atClusterTime); } else if (fieldName == kLevelFieldName) { std::string levelString; // TODO pass field in rather than scanning again. @@ -201,13 +203,13 @@ Status ReadConcernArgs::parse(const BSONObj& readConcernObj) { } } - if (_afterClusterTime && _opTime) { + if (_clientAfterClusterTime && _opTime) { return Status(ErrorCodes::InvalidOptions, str::stream() << "Can not specify both " << kAfterClusterTimeFieldName << " and " << kAfterOpTimeFieldName); } - if (_afterClusterTime && _atClusterTime) { + if (_clientAfterClusterTime && _clientAtClusterTime) { return Status(ErrorCodes::InvalidOptions, "Specifying a timestamp for readConcern snapshot in a causally consistent " "session is not allowed. See " @@ -218,7 +220,7 @@ Status ReadConcernArgs::parse(const BSONObj& readConcernObj) { // Note: 'available' should not be used with after cluster time, as cluster time can wait for // replication whereas the premise of 'available' is to avoid waiting. 'linearizable' should not // be used with after cluster time, since linearizable reads are inherently causally consistent. - if (_afterClusterTime && getLevel() != ReadConcernLevel::kMajorityReadConcern && + if (_clientAfterClusterTime && getLevel() != ReadConcernLevel::kMajorityReadConcern && getLevel() != ReadConcernLevel::kLocalReadConcern && getLevel() != ReadConcernLevel::kSnapshotReadConcern) { return Status(ErrorCodes::InvalidOptions, @@ -236,7 +238,7 @@ Status ReadConcernArgs::parse(const BSONObj& readConcernObj) { << " is equal to " << readConcernLevels::kSnapshotName); } - if (_atClusterTime && getLevel() != ReadConcernLevel::kSnapshotReadConcern) { + if (_clientAtClusterTime && getLevel() != ReadConcernLevel::kSnapshotReadConcern) { return Status(ErrorCodes::InvalidOptions, str::stream() << kAtClusterTimeFieldName << " field can be set only if " << kLevelFieldName << " is equal to " @@ -244,14 +246,14 @@ Status ReadConcernArgs::parse(const BSONObj& readConcernObj) { } // Make sure that atClusterTime wasn't specified with zero seconds. - if (_atClusterTime && _atClusterTime->asTimestamp().isNull()) { + if (_clientAtClusterTime && _clientAtClusterTime->asTimestamp().isNull()) { return Status(ErrorCodes::InvalidOptions, str::stream() << kAtClusterTimeFieldName << " cannot be a null timestamp"); } // It's okay for afterClusterTime to be specified with zero seconds, but not an uninitialized // timestamp. - if (_afterClusterTime && _afterClusterTime == LogicalTime::kUninitialized) { + if (_clientAfterClusterTime && _clientAfterClusterTime == LogicalTime::kUninitialized) { return Status(ErrorCodes::InvalidOptions, str::stream() << kAfterClusterTimeFieldName << " cannot be a null timestamp"); } @@ -290,12 +292,12 @@ void ReadConcernArgs::_appendInfoInner(BSONObjBuilder* builder) const { _opTime->append(builder, kAfterOpTimeFieldName.toString()); } - if (_afterClusterTime) { - builder->append(kAfterClusterTimeFieldName, _afterClusterTime->asTimestamp()); + if (_computedAfterClusterTime) { + builder->append(kAfterClusterTimeFieldName, _computedAfterClusterTime->asTimestamp()); } - if (_atClusterTime) { - builder->append(kAtClusterTimeFieldName, _atClusterTime->asTimestamp()); + if (_computedAtClusterTime) { + builder->append(kAtClusterTimeFieldName, _computedAtClusterTime->asTimestamp()); } _provenance.serialize(builder); diff --git a/src/mongo/db/repl/read_concern_args.h b/src/mongo/db/repl/read_concern_args.h index ba6004d13bf..2c7aca60282 100644 --- a/src/mongo/db/repl/read_concern_args.h +++ b/src/mongo/db/repl/read_concern_args.h @@ -182,13 +182,21 @@ public: * Set atClusterTime, clear afterClusterTime. The BSON representation becomes * {level: "snapshot", atClusterTime: <ts>}. */ - void setArgsAtClusterTimeForSnapshot(Timestamp ts) { + void setArgsAtClusterTime(Timestamp ts) { invariant(_level && _level == ReadConcernLevel::kSnapshotReadConcern); // Only overwrite a server-selected atClusterTime, not user-supplied. - invariant(_atClusterTime.is_initialized() == _atClusterTimeSelected); - _afterClusterTime = boost::none; - _atClusterTime = LogicalTime(ts); - _atClusterTimeSelected = true; + invariant(!_clientAtClusterTime); + _computedAfterClusterTime = boost::none; + _computedAtClusterTime = LogicalTime(ts); + } + + /** + * If we have selected an atClusterTime for non-transaction snapshot reads, clear it and restore + * the atClusterTime and afterClusterTime passed by the client. + */ + void clearArgsAtClusterTime() { + _computedAfterClusterTime = _clientAfterClusterTime; + _computedAtClusterTime = _clientAtClusterTime; } /** @@ -196,7 +204,7 @@ public: * function returns false if the atClusterTime was specified by the client. */ bool wasAtClusterTimeSelected() const { - return _atClusterTimeSelected; + return _computedAtClusterTime && _computedAtClusterTime != _clientAtClusterTime; } private: @@ -213,11 +221,13 @@ private: /** * Read data after cluster-wide cluster time. */ - boost::optional<LogicalTime> _afterClusterTime; + boost::optional<LogicalTime> _clientAfterClusterTime; + boost::optional<LogicalTime> _computedAfterClusterTime; /** * Read data at a particular cluster time. */ - boost::optional<LogicalTime> _atClusterTime; + boost::optional<LogicalTime> _clientAtClusterTime; + boost::optional<LogicalTime> _computedAtClusterTime; boost::optional<ReadConcernLevel> _level; /** @@ -233,8 +243,6 @@ private: bool _specified; ReadWriteConcernProvenance _provenance; - - bool _atClusterTimeSelected = false; }; } // namespace repl diff --git a/src/mongo/s/commands/cluster_command_test_fixture.cpp b/src/mongo/s/commands/cluster_command_test_fixture.cpp index 0d1a88bf578..81951cd425c 100644 --- a/src/mongo/s/commands/cluster_command_test_fixture.cpp +++ b/src/mongo/s/commands/cluster_command_test_fixture.cpp @@ -43,12 +43,19 @@ #include "mongo/db/logical_time_validator.h" #include "mongo/db/vector_clock.h" #include "mongo/s/cluster_last_error_info.h" +#include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/util/fail_point.h" #include "mongo/util/options_parser/startup_option_init.h" #include "mongo/util/tick_source_mock.h" namespace mongo { +const Timestamp ClusterCommandTestFixture::kAfterClusterTime(50, 2); + +const Timestamp ClusterCommandTestFixture::kShardClusterTime(51, 3); + +const CursorId ClusterCommandTestFixture::kCursorId(123); + void ClusterCommandTestFixture::setUp() { CatalogCacheTestFixture::setUp(); CatalogCacheTestFixture::setupNShards(numShards); @@ -80,13 +87,24 @@ void ClusterCommandTestFixture::setUp() { "enableStaleVersionAndSnapshotRetriesWithinTransactions"); } -BSONObj ClusterCommandTestFixture::_makeCmd(BSONObj cmdObj, bool includeAfterClusterTime) { +void ClusterCommandTestFixture::tearDown() { + // Delete any cursors left behind by tests, to avoid invariant in ~ClusterCursorManager. + auto opCtx = operationContext(); + auto cursorManager = Grid::get(opCtx)->getCursorManager(); + cursorManager->killAllCursors(opCtx); +} + +BSONObj ClusterCommandTestFixture::_makeCmd(BSONObj cmdObj, + bool startTransaction, + bool includeAfterClusterTime) { BSONObjBuilder bob(cmdObj); // Each command runs in a new session. bob.append("lsid", makeLogicalSessionIdForTest().toBSON()); - bob.append("txnNumber", TxnNumber(1)); - bob.append("autocommit", false); - bob.append("startTransaction", true); + if (startTransaction) { + bob.append("txnNumber", TxnNumber(1)); + bob.append("autocommit", false); + bob.append("startTransaction", true); + } BSONObjBuilder readConcernBob = bob.subobjStart(repl::ReadConcernArgs::kReadConcernFieldName); readConcernBob.append("level", "snapshot"); @@ -98,6 +116,14 @@ BSONObj ClusterCommandTestFixture::_makeCmd(BSONObj cmdObj, bool includeAfterClu return bob.obj(); } +BSONObj ClusterCommandTestFixture::_makeTxnCmd(BSONObj cmdObj, bool includeAfterClusterTime) { + return _makeCmd(cmdObj, true, includeAfterClusterTime); +} + +BSONObj ClusterCommandTestFixture::_makeNonTxnCmd(BSONObj cmdObj, bool includeAfterClusterTime) { + return _makeCmd(cmdObj, false, includeAfterClusterTime); +} + void ClusterCommandTestFixture::expectReturnsError(ErrorCodes::Error code) { onCommandForPoolExecutor([code](const executor::RemoteCommandRequest& request) { BSONObjBuilder resBob; @@ -227,80 +253,108 @@ void ClusterCommandTestFixture::runTxnCommandMaxErrors(BSONObj cmd, } void ClusterCommandTestFixture::testNoErrors(BSONObj targetedCmd, BSONObj scatterGatherCmd) { - // Target one shard. - runCommandSuccessful(_makeCmd(targetedCmd), true); + runCommandSuccessful(_makeTxnCmd(targetedCmd), true); // Target all shards. if (!scatterGatherCmd.isEmpty()) { - runCommandSuccessful(_makeCmd(scatterGatherCmd), false); + runCommandSuccessful(_makeTxnCmd(scatterGatherCmd), false); } } void ClusterCommandTestFixture::testRetryOnSnapshotError(BSONObj targetedCmd, BSONObj scatterGatherCmd) { // Target one shard. - runTxnCommandOneError(_makeCmd(targetedCmd), ErrorCodes::SnapshotUnavailable, true); - runTxnCommandOneError(_makeCmd(targetedCmd), ErrorCodes::SnapshotTooOld, true); + runTxnCommandOneError(_makeTxnCmd(targetedCmd), ErrorCodes::SnapshotUnavailable, true); + runTxnCommandOneError(_makeTxnCmd(targetedCmd), ErrorCodes::SnapshotTooOld, true); // Target all shards if (!scatterGatherCmd.isEmpty()) { - runTxnCommandOneError(_makeCmd(scatterGatherCmd), ErrorCodes::SnapshotUnavailable, false); - runTxnCommandOneError(_makeCmd(scatterGatherCmd), ErrorCodes::SnapshotTooOld, false); + runTxnCommandOneError( + _makeTxnCmd(scatterGatherCmd), ErrorCodes::SnapshotUnavailable, false); + runTxnCommandOneError(_makeTxnCmd(scatterGatherCmd), ErrorCodes::SnapshotTooOld, false); } } void ClusterCommandTestFixture::testMaxRetriesSnapshotErrors(BSONObj targetedCmd, BSONObj scatterGatherCmd) { // Target one shard. - runTxnCommandMaxErrors(_makeCmd(targetedCmd), ErrorCodes::SnapshotUnavailable, true); - runTxnCommandMaxErrors(_makeCmd(targetedCmd), ErrorCodes::SnapshotTooOld, true); + runTxnCommandMaxErrors(_makeTxnCmd(targetedCmd), ErrorCodes::SnapshotUnavailable, true); + runTxnCommandMaxErrors(_makeTxnCmd(targetedCmd), ErrorCodes::SnapshotTooOld, true); // Target all shards if (!scatterGatherCmd.isEmpty()) { - runTxnCommandMaxErrors(_makeCmd(scatterGatherCmd), ErrorCodes::SnapshotUnavailable, false); - runTxnCommandMaxErrors(_makeCmd(scatterGatherCmd), ErrorCodes::SnapshotTooOld, false); + runTxnCommandMaxErrors( + _makeTxnCmd(scatterGatherCmd), ErrorCodes::SnapshotUnavailable, false); + runTxnCommandMaxErrors(_makeTxnCmd(scatterGatherCmd), ErrorCodes::SnapshotTooOld, false); } } -void ClusterCommandTestFixture::testAttachesAtClusterTimeForSnapshotReadConcern( - BSONObj targetedCmd, BSONObj scatterGatherCmd) { +void ClusterCommandTestFixture::testAttachesAtClusterTimeForTxnSnapshotReadConcern( + BSONObj targetedCmd, BSONObj scatterGatherCmd, bool createsCursor) { + + // Target one shard. + runCommandInspectRequests(_makeTxnCmd(targetedCmd), _containsAtClusterTimeOnly, true); + if (createsCursor) + _assertCursorReadConcern(true, boost::none); + + // Target all shards. + if (!scatterGatherCmd.isEmpty()) { + runCommandInspectRequests(_makeTxnCmd(scatterGatherCmd), _containsAtClusterTimeOnly, false); + if (createsCursor) + _assertCursorReadConcern(false, boost::none); + } +} - auto containsAtClusterTime = [](const executor::RemoteCommandRequest& request) { - ASSERT(!request.cmdObj["readConcern"]["atClusterTime"].eoo()); - }; +void ClusterCommandTestFixture::testTxnSnapshotReadConcernWithAfterClusterTime( + BSONObj targetedCmd, BSONObj scatterGatherCmd, bool createsCursor) { // Target one shard. - runCommandInspectRequests(_makeCmd(targetedCmd), containsAtClusterTime, true); + runCommandInspectRequests(_makeTxnCmd(targetedCmd, true), _containsAtClusterTimeOnly, true); + if (createsCursor) + _assertCursorReadConcern(true, boost::none); // Target all shards. if (!scatterGatherCmd.isEmpty()) { - runCommandInspectRequests(_makeCmd(scatterGatherCmd), containsAtClusterTime, false); + runCommandInspectRequests( + _makeTxnCmd(scatterGatherCmd, true), _containsAtClusterTimeOnly, false); + if (createsCursor) + _assertCursorReadConcern(false, boost::none); } } -void ClusterCommandTestFixture::testSnapshotReadConcernWithAfterClusterTime( - BSONObj targetedCmd, BSONObj scatterGatherCmd) { +void ClusterCommandTestFixture::testAttachesAtClusterTimeForNonTxnSnapshotReadConcern( + BSONObj targetedCmd, BSONObj scatterGatherCmd, bool createsCursor) { + + // Target one shard. + runCommandInspectRequests(_makeNonTxnCmd(targetedCmd), _omitsClusterTime, true); + if (createsCursor) + _assertCursorReadConcern(true, kShardClusterTime); - auto containsAtClusterTimeNoAfterClusterTime = - [&](const executor::RemoteCommandRequest& request) { - ASSERT(!request.cmdObj["readConcern"]["atClusterTime"].eoo()); - ASSERT(request.cmdObj["readConcern"]["afterClusterTime"].eoo()); + // Target all shards. + if (!scatterGatherCmd.isEmpty()) { + runCommandInspectRequests( + _makeNonTxnCmd(scatterGatherCmd), _containsAtClusterTimeOnly, false); + if (createsCursor) + _assertCursorReadConcern(false, kInMemoryLogicalTime.asTimestamp()); + } +} - // The chosen atClusterTime should be greater than or equal to the request's - // afterClusterTime. - ASSERT_GTE(LogicalTime(request.cmdObj["readConcern"]["atClusterTime"].timestamp()), - LogicalTime(kAfterClusterTime)); - }; +void ClusterCommandTestFixture::testNonTxnSnapshotReadConcernWithAfterClusterTime( + BSONObj targetedCmd, BSONObj scatterGatherCmd, bool createsCursor) { // Target one shard. runCommandInspectRequests( - _makeCmd(targetedCmd, true), containsAtClusterTimeNoAfterClusterTime, true); + _makeNonTxnCmd(targetedCmd, true), _containsAfterClusterTimeOnly, true); + if (createsCursor) + _assertCursorReadConcern(true, kShardClusterTime); // Target all shards. if (!scatterGatherCmd.isEmpty()) { runCommandInspectRequests( - _makeCmd(scatterGatherCmd, true), containsAtClusterTimeNoAfterClusterTime, false); + _makeNonTxnCmd(scatterGatherCmd, true), _containsAtClusterTimeOnly, false); + if (createsCursor) + _assertCursorReadConcern(false, kAfterClusterTime); } } @@ -310,6 +364,73 @@ void ClusterCommandTestFixture::appendTxnResponseMetadata(BSONObjBuilder& bob) { txnResponseMetadata.serialize(&bob); } +void ClusterCommandTestFixture::_assertCursorReadConcern( + bool isTargeted, boost::optional<Timestamp> expectedAtClusterTime) { + auto opCtx = operationContext(); + auto cursorManager = Grid::get(opCtx)->getCursorManager(); + auto cursors = + cursorManager->getIdleCursors(opCtx, MongoProcessInterface::CurrentOpUserMode::kIncludeAll); + ASSERT_EQUALS(cursors.size(), 1); + auto cursorId = *cursors[0].getCursorId(); + auto pinnedCursor = cursorManager->checkOutCursor( + kNss, cursorId, opCtx, [](UserNameIterator) { return Status::OK(); }); + + ASSERT_OK(pinnedCursor.getStatus()); + auto readConcern = pinnedCursor.getValue()->getReadConcern(); + auto nRemotes = pinnedCursor.getValue()->getNumRemotes(); + ASSERT_EQUALS(nRemotes, isTargeted ? 1 : 2); + + // User supplies no atClusterTime. If not isTargeted then mongos selected a timestamp and called + // setArgsAtClusterTime. Else mongos let the shard select atClusterTime. The shard returned it + // to mongos, and mongos called setArgsAtClusterTime. + if (expectedAtClusterTime) { + ASSERT_EQUALS(readConcern->getArgsAtClusterTime()->asTimestamp(), *expectedAtClusterTime); + ASSERT_TRUE(readConcern->wasAtClusterTimeSelected()); + } else { + ASSERT_FALSE(readConcern->getArgsAtClusterTime()); + ASSERT_FALSE(readConcern->wasAtClusterTimeSelected()); + } + + // Kill cursor on shard(s) and remove from ClusterCursorManager, to ensure we have exactly 1 + // idle cursor next time this method is called. (Otherwise this test would need a complex + // method for determining which cursor to examine.) + pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::Exhausted); + for (std::size_t i = 0; i < nRemotes; ++i) { + onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) { + ASSERT_EQUALS("killCursors"_sd, request.cmdObj.firstElement().fieldNameStringData()); + ASSERT_EQUALS(kNss.coll(), request.cmdObj.firstElement().valueStringData()); + return BSON("ok" << 1); + }); + } +} + +void ClusterCommandTestFixture::_containsSelectedAtClusterTime( + const executor::RemoteCommandRequest& request) { + ASSERT(!request.cmdObj["readConcern"]["atClusterTime"].eoo()); + ASSERT(request.cmdObj["readConcern"]["afterClusterTime"].eoo()); + + // The chosen atClusterTime should be greater than or equal to the request's afterClusterTime. + ASSERT_GTE(LogicalTime(request.cmdObj["readConcern"]["atClusterTime"].timestamp()), + LogicalTime(kAfterClusterTime)); +} + +void ClusterCommandTestFixture::_containsAtClusterTimeOnly( + const executor::RemoteCommandRequest& request) { + ASSERT(!request.cmdObj["readConcern"]["atClusterTime"].eoo()); + ASSERT(request.cmdObj["readConcern"]["afterClusterTime"].eoo()); +} + +void ClusterCommandTestFixture::_containsAfterClusterTimeOnly( + const executor::RemoteCommandRequest& request) { + ASSERT(request.cmdObj["readConcern"]["atClusterTime"].eoo()); + ASSERT(!request.cmdObj["readConcern"]["afterClusterTime"].eoo()); +} + +void ClusterCommandTestFixture::_omitsClusterTime(const executor::RemoteCommandRequest& request) { + ASSERT(request.cmdObj["readConcern"]["atClusterTime"].eoo()); + ASSERT(request.cmdObj["readConcern"]["afterClusterTime"].eoo()); +} + // Satisfies dependency from StoreSASLOPtions. MONGO_STARTUP_OPTIONS_STORE(CoreOptions)(InitializerContext*) { return Status::OK(); diff --git a/src/mongo/s/commands/cluster_command_test_fixture.h b/src/mongo/s/commands/cluster_command_test_fixture.h index 9c5406c073b..9c0f3f4d5b7 100644 --- a/src/mongo/s/commands/cluster_command_test_fixture.h +++ b/src/mongo/s/commands/cluster_command_test_fixture.h @@ -31,6 +31,7 @@ #include "mongo/platform/basic.h" +#include "mongo/db/cursor_id.h" #include "mongo/s/catalog_cache_test_fixture.h" #include "mongo/s/commands/strategy.h" @@ -46,10 +47,16 @@ protected: const LogicalTime kInMemoryLogicalTime = LogicalTime(Timestamp(10, 1)); - const Timestamp kAfterClusterTime = Timestamp(50, 2); + static const Timestamp kAfterClusterTime; + + static const Timestamp kShardClusterTime; + + static const CursorId kCursorId; void setUp() override; + void tearDown() override; + virtual void expectInspectRequest(int shardIndex, InspectionCallback cb) = 0; virtual void expectReturnsSuccess(int shardIndex) = 0; @@ -85,17 +92,34 @@ protected: void testMaxRetriesSnapshotErrors(BSONObj targetedCmd, BSONObj scatterGatherCmd = BSONObj()); /** - * Verifies that atClusterTime is attached to the given commands. + * Verifies that atClusterTime is attached to the given commands in a transaction. */ - void testAttachesAtClusterTimeForSnapshotReadConcern(BSONObj targetedCmd, - BSONObj scatterGatherCmd = BSONObj()); + void testAttachesAtClusterTimeForTxnSnapshotReadConcern(BSONObj targetedCmd, + BSONObj scatterGatherCmd = BSONObj(), + bool createsCursor = false); /** * Verifies that the chosen atClusterTime is greater than or equal to each command's - * afterClusterTime. + * afterClusterTime in a transaction. + */ + void testTxnSnapshotReadConcernWithAfterClusterTime(BSONObj targetedCmd, + BSONObj scatterGatherCmd = BSONObj(), + bool createsCursor = false); + + /** + * Verifies that atClusterTime is attached to the given non-transaction snapshot commands. */ - void testSnapshotReadConcernWithAfterClusterTime(BSONObj targetedCmd, - BSONObj scatterGatherCmd = BSONObj()); + void testAttachesAtClusterTimeForNonTxnSnapshotReadConcern(BSONObj targetedCmd, + BSONObj scatterGatherCmd = BSONObj(), + bool createsCursor = false); + + /** + * Verifies that the chosen atClusterTime is greater than or equal to each non-transaction + * snapshot command's afterClusterTime. + */ + void testNonTxnSnapshotReadConcernWithAfterClusterTime(BSONObj targetedCmd, + BSONObj scatterGatherCmd = BSONObj(), + bool createsCursor = false); /** * Appends the metadata shards return on responses to transaction statements, such as the @@ -105,11 +129,36 @@ protected: private: /** - * Makes a new command object from the one given by apppending read concern - * snapshot and the appropriate transaction options. If includeAfterClusterTime - * is true, also appends afterClusterTime to the read concern. + * Makes a new command object from the one given by appending read concern snapshot and the + * appropriate transaction options. If includeAfterClusterTime is true, also appends + * afterClusterTime to the read concern. */ - BSONObj _makeCmd(BSONObj cmdObj, bool includeAfterClusterTime = false); + BSONObj _makeTxnCmd(BSONObj cmdObj, bool includeAfterClusterTime = false); + + /** + * Makes a new command object from the one given by appending read concern snapshot. If + * includeAfterClusterTime is true, also appends afterClusterTime to the read concern. + */ + BSONObj _makeNonTxnCmd(BSONObj cmdObj, bool includeAfterClusterTime = false); + + /** + * Helper method. + */ + BSONObj _makeCmd(BSONObj cmdObj, bool startTransaction, bool includeAfterClusterTime); + + /* + * Check that the ClusterCursorManager contains an idle cursor with proper readConcern set. + */ + void _assertCursorReadConcern(bool isTargeted, + boost::optional<Timestamp> expectedAtClusterTime); + + static void _containsSelectedAtClusterTime(const executor::RemoteCommandRequest& request); + + static void _containsAtClusterTimeOnly(const executor::RemoteCommandRequest& request); + + static void _containsAfterClusterTimeOnly(const executor::RemoteCommandRequest& request); + + static void _omitsClusterTime(const executor::RemoteCommandRequest& request); // Enables the transaction router to retry within a transaction on stale version and snapshot // errors for the duration of each test. diff --git a/src/mongo/s/commands/cluster_delete_test.cpp b/src/mongo/s/commands/cluster_delete_test.cpp index 407ce5e78e2..f0fae5cbeb2 100644 --- a/src/mongo/s/commands/cluster_delete_test.cpp +++ b/src/mongo/s/commands/cluster_delete_test.cpp @@ -73,11 +73,11 @@ TEST_F(ClusterDeleteTest, NoErrors) { } TEST_F(ClusterDeleteTest, AttachesAtClusterTimeForSnapshotReadConcern) { - testAttachesAtClusterTimeForSnapshotReadConcern(kDeleteCmdTargeted, kDeleteCmdScatterGather); + testAttachesAtClusterTimeForTxnSnapshotReadConcern(kDeleteCmdTargeted, kDeleteCmdScatterGather); } TEST_F(ClusterDeleteTest, SnapshotReadConcernWithAfterClusterTime) { - testSnapshotReadConcernWithAfterClusterTime(kDeleteCmdTargeted, kDeleteCmdScatterGather); + testTxnSnapshotReadConcernWithAfterClusterTime(kDeleteCmdTargeted, kDeleteCmdScatterGather); } } // namespace diff --git a/src/mongo/s/commands/cluster_distinct_test.cpp b/src/mongo/s/commands/cluster_distinct_test.cpp index 40f0226e820..d8513f36dc5 100644 --- a/src/mongo/s/commands/cluster_distinct_test.cpp +++ b/src/mongo/s/commands/cluster_distinct_test.cpp @@ -80,12 +80,12 @@ TEST_F(ClusterDistinctTest, MaxRetriesSnapshotErrors) { } TEST_F(ClusterDistinctTest, AttachesAtClusterTimeForSnapshotReadConcern) { - testAttachesAtClusterTimeForSnapshotReadConcern(kDistinctCmdTargeted, - kDistinctCmdScatterGather); + testAttachesAtClusterTimeForTxnSnapshotReadConcern(kDistinctCmdTargeted, + kDistinctCmdScatterGather); } TEST_F(ClusterDistinctTest, SnapshotReadConcernWithAfterClusterTime) { - testSnapshotReadConcernWithAfterClusterTime(kDistinctCmdTargeted, kDistinctCmdScatterGather); + testTxnSnapshotReadConcernWithAfterClusterTime(kDistinctCmdTargeted, kDistinctCmdScatterGather); } } // namespace diff --git a/src/mongo/s/commands/cluster_find_and_modify_test.cpp b/src/mongo/s/commands/cluster_find_and_modify_test.cpp index f0a44494591..8f126039fd3 100644 --- a/src/mongo/s/commands/cluster_find_and_modify_test.cpp +++ b/src/mongo/s/commands/cluster_find_and_modify_test.cpp @@ -78,11 +78,11 @@ TEST_F(ClusterFindAndModifyTest, MaxRetriesSnapshotErrors) { } TEST_F(ClusterFindAndModifyTest, AttachesAtClusterTimeForSnapshotReadConcern) { - testAttachesAtClusterTimeForSnapshotReadConcern(kFindAndModifyCmdTargeted); + testAttachesAtClusterTimeForTxnSnapshotReadConcern(kFindAndModifyCmdTargeted); } TEST_F(ClusterFindAndModifyTest, SnapshotReadConcernWithAfterClusterTime) { - testSnapshotReadConcernWithAfterClusterTime(kFindAndModifyCmdTargeted); + testTxnSnapshotReadConcernWithAfterClusterTime(kFindAndModifyCmdTargeted); } } // namespace diff --git a/src/mongo/s/commands/cluster_find_test.cpp b/src/mongo/s/commands/cluster_find_test.cpp index 8d0dc6792d4..7d9e219961d 100644 --- a/src/mongo/s/commands/cluster_find_test.cpp +++ b/src/mongo/s/commands/cluster_find_test.cpp @@ -31,34 +31,23 @@ #include "mongo/db/query/cursor_response.h" #include "mongo/s/commands/cluster_command_test_fixture.h" +#include "mongo/s/query/cluster_cursor_manager.h" namespace mongo { namespace { class ClusterFindTest : public ClusterCommandTestFixture { protected: - const BSONObj kFindCmdScatterGather = BSON("find" - << "coll"); - const BSONObj kFindCmdTargeted = BSON("find" - << "coll" - << "filter" << BSON("_id" << 0)); + // Batch size 1, so when expectInspectRequest returns one doc, mongos doesn't wait for more. + const BSONObj kFindCmdScatterGather = BSON("find" << kNss.coll() << "batchSize" << 1); + const BSONObj kFindCmdTargeted = + BSON("find" << kNss.coll() << "filter" << BSON("_id" << 0) << "batchSize" << 1); // The index of the shard expected to receive the response is used to prevent different shards // from returning documents with the same shard key. This is expected to be 0 for queries // targeting one shard. void expectReturnsSuccess(int shardIndex) override { - onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) { - ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData()); - - std::vector<BSONObj> batch = {BSON("_id" << shardIndex)}; - CursorResponse cursorResponse(kNss, CursorId(0), batch); - - BSONObjBuilder bob; - bob.appendElementsUnique( - cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse)); - appendTxnResponseMetadata(bob); - return bob.obj(); - }); + expectInspectRequest(shardIndex, [](const executor::RemoteCommandRequest&) {}); } void expectInspectRequest(int shardIndex, InspectionCallback cb) override { @@ -68,8 +57,17 @@ protected: cb(request); std::vector<BSONObj> batch = {BSON("_id" << shardIndex)}; - CursorResponse cursorResponse(kNss, CursorId(0), batch); - + // User supplies no atClusterTime. For single-shard non-transaction snapshot reads, + // mongos lets the shard (which this function simulates) select a read timestamp. + boost::optional<Timestamp> atClusterTime; + if (request.cmdObj["txnNumber"].eoo() && !request.cmdObj["readConcern"].eoo()) { + auto rc = request.cmdObj["readConcern"].Obj(); + if (rc["level"].String() == "snapshot" && rc["atClusterTime"].eoo()) { + atClusterTime = kShardClusterTime; + } + } + + CursorResponse cursorResponse(kNss, kCursorId, batch, atClusterTime); BSONObjBuilder bob; bob.appendElementsUnique( cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse)); @@ -91,12 +89,23 @@ TEST_F(ClusterFindTest, MaxRetriesSnapshotErrors) { testMaxRetriesSnapshotErrors(kFindCmdTargeted, kFindCmdScatterGather); } -TEST_F(ClusterFindTest, AttachesAtClusterTimeForSnapshotReadConcern) { - testAttachesAtClusterTimeForSnapshotReadConcern(kFindCmdTargeted, kFindCmdScatterGather); +TEST_F(ClusterFindTest, AttachesAtClusterTimeForTransactionSnapshotReadConcern) { + testAttachesAtClusterTimeForTxnSnapshotReadConcern( + kFindCmdTargeted, kFindCmdScatterGather, true); +} + +TEST_F(ClusterFindTest, TransactionSnapshotReadConcernWithAfterClusterTime) { + testTxnSnapshotReadConcernWithAfterClusterTime(kFindCmdTargeted, kFindCmdScatterGather, true); +} + +TEST_F(ClusterFindTest, AttachesAtClusterTimeForNonTransactionSnapshotReadConcern) { + testAttachesAtClusterTimeForNonTxnSnapshotReadConcern( + kFindCmdTargeted, kFindCmdScatterGather, true); } -TEST_F(ClusterFindTest, SnapshotReadConcernWithAfterClusterTime) { - testSnapshotReadConcernWithAfterClusterTime(kFindCmdTargeted, kFindCmdScatterGather); +TEST_F(ClusterFindTest, NonTransactionSnapshotReadConcernWithAfterClusterTime) { + testNonTxnSnapshotReadConcernWithAfterClusterTime( + kFindCmdTargeted, kFindCmdScatterGather, true); } } // namespace diff --git a/src/mongo/s/commands/cluster_insert_test.cpp b/src/mongo/s/commands/cluster_insert_test.cpp index ff5fb075486..24544eeb8d4 100644 --- a/src/mongo/s/commands/cluster_insert_test.cpp +++ b/src/mongo/s/commands/cluster_insert_test.cpp @@ -73,11 +73,11 @@ TEST_F(ClusterInsertTest, NoErrors) { } TEST_F(ClusterInsertTest, AttachesAtClusterTimeForSnapshotReadConcern) { - testAttachesAtClusterTimeForSnapshotReadConcern(kInsertCmdTargeted, kInsertCmdScatterGather); + testAttachesAtClusterTimeForTxnSnapshotReadConcern(kInsertCmdTargeted, kInsertCmdScatterGather); } TEST_F(ClusterInsertTest, SnapshotReadConcernWithAfterClusterTime) { - testSnapshotReadConcernWithAfterClusterTime(kInsertCmdTargeted, kInsertCmdScatterGather); + testTxnSnapshotReadConcernWithAfterClusterTime(kInsertCmdTargeted, kInsertCmdScatterGather); } } // namespace diff --git a/src/mongo/s/commands/cluster_update_test.cpp b/src/mongo/s/commands/cluster_update_test.cpp index 53aee707522..9ee788c2a1e 100644 --- a/src/mongo/s/commands/cluster_update_test.cpp +++ b/src/mongo/s/commands/cluster_update_test.cpp @@ -77,11 +77,11 @@ TEST_F(ClusterUpdateTest, NoErrors) { } TEST_F(ClusterUpdateTest, AttachesAtClusterTimeForSnapshotReadConcern) { - testAttachesAtClusterTimeForSnapshotReadConcern(kUpdateCmdTargeted, kUpdateCmdScatterGather); + testAttachesAtClusterTimeForTxnSnapshotReadConcern(kUpdateCmdTargeted, kUpdateCmdScatterGather); } TEST_F(ClusterUpdateTest, SnapshotReadConcernWithAfterClusterTime) { - testSnapshotReadConcernWithAfterClusterTime(kUpdateCmdTargeted, kUpdateCmdScatterGather); + testTxnSnapshotReadConcernWithAfterClusterTime(kUpdateCmdTargeted, kUpdateCmdScatterGather); } } // namespace diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index 39763246f92..c1c7a18e106 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -632,7 +632,7 @@ void runCommand(OperationContext* opCtx, } return latestKnownClusterTime.asTimestamp(); }(); - readConcernArgs.setArgsAtClusterTimeForSnapshot(atClusterTime); + readConcernArgs.setArgsAtClusterTime(atClusterTime); } replyBuilder->reset(); @@ -822,7 +822,8 @@ void runCommand(OperationContext* opCtx, abortGuard.dismiss(); continue; - } else if (!ReadConcernArgs::get(opCtx).wasAtClusterTimeSelected()) { + } else if (auto rc = ReadConcernArgs::get(opCtx); + rc.getArgsAtClusterTime() && !rc.wasAtClusterTimeSelected()) { // Non-transaction snapshot read. The client sent readConcern: {level: // "snapshot", atClusterTime: T}, where T is older than // minSnapshotHistoryWindowInSeconds, retrying won't succeed. diff --git a/src/mongo/s/query/cluster_aggregate_test.cpp b/src/mongo/s/query/cluster_aggregate_test.cpp index 08bcc8081e9..3bb16f7fb0d 100644 --- a/src/mongo/s/query/cluster_aggregate_test.cpp +++ b/src/mongo/s/query/cluster_aggregate_test.cpp @@ -121,12 +121,13 @@ TEST_F(ClusterAggregateTest, MaxRetriesSnapshotErrors) { } TEST_F(ClusterAggregateTest, AttachesAtClusterTimeForSnapshotReadConcern) { - testAttachesAtClusterTimeForSnapshotReadConcern(kAggregateCmdTargeted, - kAggregateCmdScatterGather); + testAttachesAtClusterTimeForTxnSnapshotReadConcern(kAggregateCmdTargeted, + kAggregateCmdScatterGather); } TEST_F(ClusterAggregateTest, SnapshotReadConcernWithAfterClusterTime) { - testSnapshotReadConcernWithAfterClusterTime(kAggregateCmdTargeted, kAggregateCmdScatterGather); + testTxnSnapshotReadConcernWithAfterClusterTime(kAggregateCmdTargeted, + kAggregateCmdScatterGather); } TEST_F(ClusterAggregateTest, ShouldFailWhenFromMongosIsTrue) { diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 422c02406da..380143ed612 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -188,8 +188,9 @@ std::vector<std::pair<ShardId, BSONObj>> constructRequestsForShards( } auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); - if (readConcernArgs.wasAtClusterTimeSelected()) { - // If mongos selected atClusterTime or received it from client, transmit it to shard. + // mongos selects atClusterTime for multi-shard snapshot reads. For a single-shard read, let the + // shard select it. + if (readConcernArgs.wasAtClusterTimeSelected() && shardIds.size() > 1) { qrToForward->setReadConcern(readConcernArgs.toBSONInner()); } @@ -246,9 +247,11 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx, query.getQueryRequest().getFilter(), query.getQueryRequest().getCollation()); + auto& readConcern = ReadConcernArgs::get(opCtx); + // Construct the query and parameters. Defer setting skip and limit here until // we determine if the query is targeting multi-shards or a single shard below. - ClusterClientCursorParams params(query.nss(), readPref, ReadConcernArgs::get(opCtx)); + ClusterClientCursorParams params(query.nss(), readPref, readConcern); params.originatingCommandObj = CurOp::get(opCtx)->opDescription().getOwned(); params.batchSize = query.getQueryRequest().getEffectiveBatchSize(); params.tailableMode = query.getQueryRequest().getTailableMode(); @@ -262,6 +265,12 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx, params.isAutoCommit = false; } + // For single-shard non-transaction snapshot reads, we pass readConcern level "snapshot" to + // the shard. It selects "atClusterTime" and replies with it. Store it on the mongos cursor. + if (shardIds.size() == 1 && readConcern.wasAtClusterTimeSelected()) { + params.readConcern->clearArgsAtClusterTime(); + } + // This is the batchSize passed to each subsequent getMore command issued by the cursor. We // usually use the batchSize associated with the initial find, but as it is illegal to send a // getMore with a batchSize of 0, we set it to use the default batchSize logic. @@ -323,13 +332,22 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx, : ClusterCursorManager::CursorType::SingleTarget; // Only set skip, limit and sort to be applied to on the router for the multi-shard case. For - // the single-shard case skip/limit as well as sorts are appled on mongod. + // the single-shard case skip/limit as well as sorts are applied on mongod. if (cursorType == ClusterCursorManager::CursorType::MultiTarget) { const auto qr = query.getQueryRequest(); params.skipToApplyOnRouter = qr.getSkip(); params.limit = qr.getLimit(); params.sortToApplyOnRouter = sortComparatorObj; params.compareWholeSortKeyOnRouter = compareWholeSortKeyOnRouter; + } else if (!params.remotes.empty()) { + // For single-shard non-transaction snapshot reads, we pass readConcern level "snapshot" to + // the shard. It selects "atClusterTime" and replies with it. Store it on the mongos cursor + // and opCtx readConcern, whence it is returned to the caller in our reply's atClusterTime. + auto clusterTime = params.remotes[0].getCursorResponse().getAtClusterTime(); + if (clusterTime && !params.readConcern->getArgsAtClusterTime()) { + params.readConcern->setArgsAtClusterTime(*clusterTime); + readConcern.setArgsAtClusterTime(*clusterTime); + } } // Transfer the established cursors to a ClusterClientCursor. |