 jstests/noPassthrough/readConcern_snapshot_mongos.js  |   2
 jstests/sharding/single_shard_snapshot_read.js         |  88
 src/mongo/db/read_concern_mongod.cpp                   |   2
 src/mongo/db/repl/read_concern_args.cpp                |  36
 src/mongo/db/repl/read_concern_args.h                  |  28
 src/mongo/s/commands/cluster_command_test_fixture.cpp  | 191
 src/mongo/s/commands/cluster_command_test_fixture.h    |  71
 src/mongo/s/commands/cluster_delete_test.cpp           |   4
 src/mongo/s/commands/cluster_distinct_test.cpp         |   6
 src/mongo/s/commands/cluster_find_and_modify_test.cpp  |   4
 src/mongo/s/commands/cluster_find_test.cpp             |  55
 src/mongo/s/commands/cluster_insert_test.cpp           |   4
 src/mongo/s/commands/cluster_update_test.cpp           |   4
 src/mongo/s/commands/strategy.cpp                      |   5
 src/mongo/s/query/cluster_aggregate_test.cpp           |   7
 src/mongo/s/query/cluster_find.cpp                     |  26
 16 files changed, 415 insertions(+), 118 deletions(-)
diff --git a/jstests/noPassthrough/readConcern_snapshot_mongos.js b/jstests/noPassthrough/readConcern_snapshot_mongos.js
index c1be784e73a..066c281c5cd 100644
--- a/jstests/noPassthrough/readConcern_snapshot_mongos.js
+++ b/jstests/noPassthrough/readConcern_snapshot_mongos.js
@@ -20,7 +20,7 @@ let st = new ShardingTest({shards: 1, rs: {nodes: 2}, config: 2, mongos: 1});
let testDB = st.getDB(dbName);
// Insert data to create the collection.
-assert.commandWorked(testDB[collName].insert({x: 1}));
+assert.commandWorked(testDB[collName].insert({x: 1}, {writeConcern: {w: "majority"}}));
flushRoutersAndRefreshShardMetadata(st, {ns: dbName + "." + collName, dbNames: [dbName]});
diff --git a/jstests/sharding/single_shard_snapshot_read.js b/jstests/sharding/single_shard_snapshot_read.js
new file mode 100644
index 00000000000..69837719784
--- /dev/null
+++ b/jstests/sharding/single_shard_snapshot_read.js
@@ -0,0 +1,88 @@
+/**
+ * Tests readConcern level snapshot outside of transactions, targeting one shard out of two. The
+ * snapshot timestamp selection algorithm is different for single-shard reads from a sharded
+ * collection (SERVER-47952); make sure mongos returns the correct atClusterTime with cursor
+ * replies.
+ *
+ * @tags: [
+ * requires_fcv_46,
+ * requires_majority_read_concern,
+ * requires_find_command
+ * ]
+ */
+
+(function() {
+"use strict";
+
+load("jstests/libs/global_snapshot_reads_util.js");
+load("jstests/sharding/libs/sharded_transactions_helpers.js");
+
+const nodeOptions = {
+ // Set a large snapshot window of 10 minutes for the test.
+ setParameter: {minSnapshotHistoryWindowInSeconds: 600}
+};
+
+const dbName = "test";
+const collName = "collection";
+
+const st =
+ new ShardingTest({shards: 2, other: {configOptions: nodeOptions, rsOptions: nodeOptions}});
+
+jsTestLog("initiated");
+const mongos = st.s0;
+
+assert.commandWorked(mongos.adminCommand({enableSharding: dbName}));
+assert.commandWorked(mongos.adminCommand({shardCollection: "test.collection", key: {_id: 1}}));
+
+const ns = dbName + "." + collName;
+assert.commandWorked(st.splitAt(ns, {_id: 5}));
+
+assert.commandWorked(mongos.adminCommand({moveChunk: ns, find: {_id: 0}, to: st.shard0.shardName}));
+assert.commandWorked(mongos.adminCommand({moveChunk: ns, find: {_id: 9}, to: st.shard1.shardName}));
+
+const db = mongos.getDB(dbName);
+const docs = [...Array(10).keys()].map((i) => ({"_id": i}));
+assert.commandWorked(
+ db.runCommand({insert: collName, documents: docs, writeConcern: {w: "majority"}}));
+
+jsTestLog("Advance mongos's clusterTime by writing to shard 1");
+
+const insertReply = assert.commandWorked(db.runCommand({insert: collName, documents: [{_id: 10}]}));
+
+jsTestLog(`Wrote to shard 1 at timestamp ${insertReply.operationTime}`);
+
+jsTestLog("Read from shard 0");
+
+let reply0 = assert.commandWorked(db.runCommand(
+ {find: collName, filter: {_id: {$lt: 4}}, batchSize: 1, readConcern: {level: "snapshot"}}));
+
+jsTestLog(`find reply: ${tojson(reply0)}`);
+
+let cursorId = reply0.cursor.id;
+assert.neq(cursorId, 0);
+assert(reply0.cursor.hasOwnProperty("atClusterTime"));
+assert.lt(reply0.cursor.atClusterTime, insertReply.operationTime);
+
+jsTestLog("Write to shard 0");
+
+const updateReply = assert.commandWorked(db.runCommand({
+ update: collName,
+ updates: [{q: {_id: {$lt: 4}}, u: {$set: {x: 1}}}],
+ writeConcern: {w: "majority"}
+}));
+
+jsTestLog(`Wrote to shard 0 at timestamp ${updateReply.operationTime}`);
+
+let reply1 =
+ assert.commandWorked(db.runCommand({getMore: cursorId, collection: collName, batchSize: 1}));
+
+jsTestLog(`getMore reply: ${tojson(reply1)}`);
+
+assert(reply1.cursor.hasOwnProperty("atClusterTime"));
+assert.eq(reply0.cursor.atClusterTime, reply1.cursor.atClusterTime);
+
+// We have read the version of the document prior to the update.
+assert(!reply1.cursor.nextBatch[0].hasOwnProperty("x"));
+
+st.stop();
+})();
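
The test above lets the server pick the snapshot timestamp. As a complement, here is a minimal shell sketch (not part of the patch) that reuses the atClusterTime mongos reports to pin a second, independent read to the same snapshot. It assumes the same db and collName as the test and that the timestamp is still within the configured minSnapshotHistoryWindowInSeconds.

// Sketch only: reuse the reported atClusterTime to pin a later, independent read.
const firstReply = assert.commandWorked(db.runCommand(
    {find: collName, filter: {_id: {$lt: 4}}, readConcern: {level: "snapshot"}}));
const snapshotTime = firstReply.cursor.atClusterTime;

// A write applied after the snapshot should not be visible at snapshotTime.
assert.commandWorked(db.runCommand(
    {update: collName, updates: [{q: {_id: 1}, u: {$set: {y: 1}}}], writeConcern: {w: "majority"}}));

const secondReply = assert.commandWorked(db.runCommand({
    find: collName,
    filter: {_id: 1},
    readConcern: {level: "snapshot", atClusterTime: snapshotTime}
}));
assert(!secondReply.cursor.firstBatch[0].hasOwnProperty("y"));
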
diff --git a/src/mongo/db/read_concern_mongod.cpp b/src/mongo/db/read_concern_mongod.cpp
index 3663b9cdcc5..5d8027b3f71 100644
--- a/src/mongo/db/read_concern_mongod.cpp
+++ b/src/mongo/db/read_concern_mongod.cpp
@@ -374,7 +374,7 @@ Status waitForReadConcernImpl(OperationContext* opCtx,
"No committed OpTime for snapshot read",
!opTime.isNull());
ru->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, opTime.getTimestamp());
- repl::ReadConcernArgs::get(opCtx).setArgsAtClusterTimeForSnapshot(opTime.getTimestamp());
+ repl::ReadConcernArgs::get(opCtx).setArgsAtClusterTime(opTime.getTimestamp());
} else if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kMajorityReadConcern &&
replCoord->getReplicationMode() == repl::ReplicationCoordinator::Mode::modeReplSet) {
// This block is not used for kSnapshotReadConcern because snapshots are always speculative;
diff --git a/src/mongo/db/repl/read_concern_args.cpp b/src/mongo/db/repl/read_concern_args.cpp
index d1067a0ff2e..9de1dd105d8 100644
--- a/src/mongo/db/repl/read_concern_args.cpp
+++ b/src/mongo/db/repl/read_concern_args.cpp
@@ -73,9 +73,11 @@ ReadConcernArgs::ReadConcernArgs(boost::optional<OpTime> opTime,
ReadConcernArgs::ReadConcernArgs(boost::optional<LogicalTime> clusterTime,
boost::optional<ReadConcernLevel> level)
- : _afterClusterTime(std::move(clusterTime)),
+ : _clientAfterClusterTime(std::move(clusterTime)),
_level(std::move(level)),
- _specified(_afterClusterTime || _level) {}
+ _specified(_clientAfterClusterTime || _level) {
+ _computedAfterClusterTime = _clientAfterClusterTime;
+}
std::string ReadConcernArgs::toString() const {
return toBSON().toString();
@@ -94,7 +96,7 @@ BSONObj ReadConcernArgs::toBSONInner() const {
}
bool ReadConcernArgs::isEmpty() const {
- return !_afterClusterTime && !_opTime && !_atClusterTime && !_level;
+ return !_computedAfterClusterTime && !_opTime && !_computedAtClusterTime && !_level;
}
bool ReadConcernArgs::isSpecified() const {
@@ -114,11 +116,11 @@ boost::optional<OpTime> ReadConcernArgs::getArgsOpTime() const {
}
boost::optional<LogicalTime> ReadConcernArgs::getArgsAfterClusterTime() const {
- return _afterClusterTime;
+ return _computedAfterClusterTime;
}
boost::optional<LogicalTime> ReadConcernArgs::getArgsAtClusterTime() const {
- return _atClusterTime;
+ return _computedAtClusterTime;
}
Status ReadConcernArgs::initialize(const BSONElement& readConcernElem) {
@@ -158,7 +160,7 @@ Status ReadConcernArgs::parse(const BSONObj& readConcernObj) {
if (!afterClusterTimeStatus.isOK()) {
return afterClusterTimeStatus;
}
- _afterClusterTime = LogicalTime(afterClusterTime);
+ _computedAfterClusterTime = _clientAfterClusterTime = LogicalTime(afterClusterTime);
} else if (fieldName == kAtClusterTimeFieldName) {
Timestamp atClusterTime;
auto atClusterTimeStatus =
@@ -166,7 +168,7 @@ Status ReadConcernArgs::parse(const BSONObj& readConcernObj) {
if (!atClusterTimeStatus.isOK()) {
return atClusterTimeStatus;
}
- _atClusterTime = LogicalTime(atClusterTime);
+ _computedAtClusterTime = _clientAtClusterTime = LogicalTime(atClusterTime);
} else if (fieldName == kLevelFieldName) {
std::string levelString;
// TODO pass field in rather than scanning again.
@@ -201,13 +203,13 @@ Status ReadConcernArgs::parse(const BSONObj& readConcernObj) {
}
}
- if (_afterClusterTime && _opTime) {
+ if (_clientAfterClusterTime && _opTime) {
return Status(ErrorCodes::InvalidOptions,
str::stream() << "Can not specify both " << kAfterClusterTimeFieldName
<< " and " << kAfterOpTimeFieldName);
}
- if (_afterClusterTime && _atClusterTime) {
+ if (_clientAfterClusterTime && _clientAtClusterTime) {
return Status(ErrorCodes::InvalidOptions,
"Specifying a timestamp for readConcern snapshot in a causally consistent "
"session is not allowed. See "
@@ -218,7 +220,7 @@ Status ReadConcernArgs::parse(const BSONObj& readConcernObj) {
// Note: 'available' should not be used with after cluster time, as cluster time can wait for
// replication whereas the premise of 'available' is to avoid waiting. 'linearizable' should not
// be used with after cluster time, since linearizable reads are inherently causally consistent.
- if (_afterClusterTime && getLevel() != ReadConcernLevel::kMajorityReadConcern &&
+ if (_clientAfterClusterTime && getLevel() != ReadConcernLevel::kMajorityReadConcern &&
getLevel() != ReadConcernLevel::kLocalReadConcern &&
getLevel() != ReadConcernLevel::kSnapshotReadConcern) {
return Status(ErrorCodes::InvalidOptions,
@@ -236,7 +238,7 @@ Status ReadConcernArgs::parse(const BSONObj& readConcernObj) {
<< " is equal to " << readConcernLevels::kSnapshotName);
}
- if (_atClusterTime && getLevel() != ReadConcernLevel::kSnapshotReadConcern) {
+ if (_clientAtClusterTime && getLevel() != ReadConcernLevel::kSnapshotReadConcern) {
return Status(ErrorCodes::InvalidOptions,
str::stream() << kAtClusterTimeFieldName << " field can be set only if "
<< kLevelFieldName << " is equal to "
@@ -244,14 +246,14 @@ Status ReadConcernArgs::parse(const BSONObj& readConcernObj) {
}
// Make sure that atClusterTime wasn't specified with zero seconds.
- if (_atClusterTime && _atClusterTime->asTimestamp().isNull()) {
+ if (_clientAtClusterTime && _clientAtClusterTime->asTimestamp().isNull()) {
return Status(ErrorCodes::InvalidOptions,
str::stream() << kAtClusterTimeFieldName << " cannot be a null timestamp");
}
// It's okay for afterClusterTime to be specified with zero seconds, but not an uninitialized
// timestamp.
- if (_afterClusterTime && _afterClusterTime == LogicalTime::kUninitialized) {
+ if (_clientAfterClusterTime && _clientAfterClusterTime == LogicalTime::kUninitialized) {
return Status(ErrorCodes::InvalidOptions,
str::stream() << kAfterClusterTimeFieldName << " cannot be a null timestamp");
}
@@ -290,12 +292,12 @@ void ReadConcernArgs::_appendInfoInner(BSONObjBuilder* builder) const {
_opTime->append(builder, kAfterOpTimeFieldName.toString());
}
- if (_afterClusterTime) {
- builder->append(kAfterClusterTimeFieldName, _afterClusterTime->asTimestamp());
+ if (_computedAfterClusterTime) {
+ builder->append(kAfterClusterTimeFieldName, _computedAfterClusterTime->asTimestamp());
}
- if (_atClusterTime) {
- builder->append(kAtClusterTimeFieldName, _atClusterTime->asTimestamp());
+ if (_computedAtClusterTime) {
+ builder->append(kAtClusterTimeFieldName, _computedAtClusterTime->asTimestamp());
}
_provenance.serialize(builder);
diff --git a/src/mongo/db/repl/read_concern_args.h b/src/mongo/db/repl/read_concern_args.h
index ba6004d13bf..2c7aca60282 100644
--- a/src/mongo/db/repl/read_concern_args.h
+++ b/src/mongo/db/repl/read_concern_args.h
@@ -182,13 +182,21 @@ public:
* Set atClusterTime, clear afterClusterTime. The BSON representation becomes
* {level: "snapshot", atClusterTime: <ts>}.
*/
- void setArgsAtClusterTimeForSnapshot(Timestamp ts) {
+ void setArgsAtClusterTime(Timestamp ts) {
invariant(_level && _level == ReadConcernLevel::kSnapshotReadConcern);
// Only overwrite a server-selected atClusterTime, not user-supplied.
- invariant(_atClusterTime.is_initialized() == _atClusterTimeSelected);
- _afterClusterTime = boost::none;
- _atClusterTime = LogicalTime(ts);
- _atClusterTimeSelected = true;
+ invariant(!_clientAtClusterTime);
+ _computedAfterClusterTime = boost::none;
+ _computedAtClusterTime = LogicalTime(ts);
+ }
+
+ /**
+ * If we have selected an atClusterTime for non-transaction snapshot reads, clear it and restore
+ * the atClusterTime and afterClusterTime passed by the client.
+ */
+ void clearArgsAtClusterTime() {
+ _computedAfterClusterTime = _clientAfterClusterTime;
+ _computedAtClusterTime = _clientAtClusterTime;
}
/**
@@ -196,7 +204,7 @@ public:
* function returns false if the atClusterTime was specified by the client.
*/
bool wasAtClusterTimeSelected() const {
- return _atClusterTimeSelected;
+ return _computedAtClusterTime && _computedAtClusterTime != _clientAtClusterTime;
}
private:
@@ -213,11 +221,13 @@ private:
/**
* Read data after cluster-wide cluster time.
*/
- boost::optional<LogicalTime> _afterClusterTime;
+ boost::optional<LogicalTime> _clientAfterClusterTime;
+ boost::optional<LogicalTime> _computedAfterClusterTime;
/**
* Read data at a particular cluster time.
*/
- boost::optional<LogicalTime> _atClusterTime;
+ boost::optional<LogicalTime> _clientAtClusterTime;
+ boost::optional<LogicalTime> _computedAtClusterTime;
boost::optional<ReadConcernLevel> _level;
/**
@@ -233,8 +243,6 @@ private:
bool _specified;
ReadWriteConcernProvenance _provenance;
-
- bool _atClusterTimeSelected = false;
};
} // namespace repl
diff --git a/src/mongo/s/commands/cluster_command_test_fixture.cpp b/src/mongo/s/commands/cluster_command_test_fixture.cpp
index 0d1a88bf578..81951cd425c 100644
--- a/src/mongo/s/commands/cluster_command_test_fixture.cpp
+++ b/src/mongo/s/commands/cluster_command_test_fixture.cpp
@@ -43,12 +43,19 @@
#include "mongo/db/logical_time_validator.h"
#include "mongo/db/vector_clock.h"
#include "mongo/s/cluster_last_error_info.h"
+#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/options_parser/startup_option_init.h"
#include "mongo/util/tick_source_mock.h"
namespace mongo {
+const Timestamp ClusterCommandTestFixture::kAfterClusterTime(50, 2);
+
+const Timestamp ClusterCommandTestFixture::kShardClusterTime(51, 3);
+
+const CursorId ClusterCommandTestFixture::kCursorId(123);
+
void ClusterCommandTestFixture::setUp() {
CatalogCacheTestFixture::setUp();
CatalogCacheTestFixture::setupNShards(numShards);
@@ -80,13 +87,24 @@ void ClusterCommandTestFixture::setUp() {
"enableStaleVersionAndSnapshotRetriesWithinTransactions");
}
-BSONObj ClusterCommandTestFixture::_makeCmd(BSONObj cmdObj, bool includeAfterClusterTime) {
+void ClusterCommandTestFixture::tearDown() {
+ // Delete any cursors left behind by tests, to avoid invariant in ~ClusterCursorManager.
+ auto opCtx = operationContext();
+ auto cursorManager = Grid::get(opCtx)->getCursorManager();
+ cursorManager->killAllCursors(opCtx);
+}
+
+BSONObj ClusterCommandTestFixture::_makeCmd(BSONObj cmdObj,
+ bool startTransaction,
+ bool includeAfterClusterTime) {
BSONObjBuilder bob(cmdObj);
// Each command runs in a new session.
bob.append("lsid", makeLogicalSessionIdForTest().toBSON());
- bob.append("txnNumber", TxnNumber(1));
- bob.append("autocommit", false);
- bob.append("startTransaction", true);
+ if (startTransaction) {
+ bob.append("txnNumber", TxnNumber(1));
+ bob.append("autocommit", false);
+ bob.append("startTransaction", true);
+ }
BSONObjBuilder readConcernBob = bob.subobjStart(repl::ReadConcernArgs::kReadConcernFieldName);
readConcernBob.append("level", "snapshot");
@@ -98,6 +116,14 @@ BSONObj ClusterCommandTestFixture::_makeCmd(BSONObj cmdObj, bool includeAfterClu
return bob.obj();
}
+BSONObj ClusterCommandTestFixture::_makeTxnCmd(BSONObj cmdObj, bool includeAfterClusterTime) {
+ return _makeCmd(cmdObj, true, includeAfterClusterTime);
+}
+
+BSONObj ClusterCommandTestFixture::_makeNonTxnCmd(BSONObj cmdObj, bool includeAfterClusterTime) {
+ return _makeCmd(cmdObj, false, includeAfterClusterTime);
+}
+
void ClusterCommandTestFixture::expectReturnsError(ErrorCodes::Error code) {
onCommandForPoolExecutor([code](const executor::RemoteCommandRequest& request) {
BSONObjBuilder resBob;
@@ -227,80 +253,108 @@ void ClusterCommandTestFixture::runTxnCommandMaxErrors(BSONObj cmd,
}
void ClusterCommandTestFixture::testNoErrors(BSONObj targetedCmd, BSONObj scatterGatherCmd) {
-
// Target one shard.
- runCommandSuccessful(_makeCmd(targetedCmd), true);
+ runCommandSuccessful(_makeTxnCmd(targetedCmd), true);
// Target all shards.
if (!scatterGatherCmd.isEmpty()) {
- runCommandSuccessful(_makeCmd(scatterGatherCmd), false);
+ runCommandSuccessful(_makeTxnCmd(scatterGatherCmd), false);
}
}
void ClusterCommandTestFixture::testRetryOnSnapshotError(BSONObj targetedCmd,
BSONObj scatterGatherCmd) {
// Target one shard.
- runTxnCommandOneError(_makeCmd(targetedCmd), ErrorCodes::SnapshotUnavailable, true);
- runTxnCommandOneError(_makeCmd(targetedCmd), ErrorCodes::SnapshotTooOld, true);
+ runTxnCommandOneError(_makeTxnCmd(targetedCmd), ErrorCodes::SnapshotUnavailable, true);
+ runTxnCommandOneError(_makeTxnCmd(targetedCmd), ErrorCodes::SnapshotTooOld, true);
// Target all shards
if (!scatterGatherCmd.isEmpty()) {
- runTxnCommandOneError(_makeCmd(scatterGatherCmd), ErrorCodes::SnapshotUnavailable, false);
- runTxnCommandOneError(_makeCmd(scatterGatherCmd), ErrorCodes::SnapshotTooOld, false);
+ runTxnCommandOneError(
+ _makeTxnCmd(scatterGatherCmd), ErrorCodes::SnapshotUnavailable, false);
+ runTxnCommandOneError(_makeTxnCmd(scatterGatherCmd), ErrorCodes::SnapshotTooOld, false);
}
}
void ClusterCommandTestFixture::testMaxRetriesSnapshotErrors(BSONObj targetedCmd,
BSONObj scatterGatherCmd) {
// Target one shard.
- runTxnCommandMaxErrors(_makeCmd(targetedCmd), ErrorCodes::SnapshotUnavailable, true);
- runTxnCommandMaxErrors(_makeCmd(targetedCmd), ErrorCodes::SnapshotTooOld, true);
+ runTxnCommandMaxErrors(_makeTxnCmd(targetedCmd), ErrorCodes::SnapshotUnavailable, true);
+ runTxnCommandMaxErrors(_makeTxnCmd(targetedCmd), ErrorCodes::SnapshotTooOld, true);
// Target all shards
if (!scatterGatherCmd.isEmpty()) {
- runTxnCommandMaxErrors(_makeCmd(scatterGatherCmd), ErrorCodes::SnapshotUnavailable, false);
- runTxnCommandMaxErrors(_makeCmd(scatterGatherCmd), ErrorCodes::SnapshotTooOld, false);
+ runTxnCommandMaxErrors(
+ _makeTxnCmd(scatterGatherCmd), ErrorCodes::SnapshotUnavailable, false);
+ runTxnCommandMaxErrors(_makeTxnCmd(scatterGatherCmd), ErrorCodes::SnapshotTooOld, false);
}
}
-void ClusterCommandTestFixture::testAttachesAtClusterTimeForSnapshotReadConcern(
- BSONObj targetedCmd, BSONObj scatterGatherCmd) {
+void ClusterCommandTestFixture::testAttachesAtClusterTimeForTxnSnapshotReadConcern(
+ BSONObj targetedCmd, BSONObj scatterGatherCmd, bool createsCursor) {
+
+ // Target one shard.
+ runCommandInspectRequests(_makeTxnCmd(targetedCmd), _containsAtClusterTimeOnly, true);
+ if (createsCursor)
+ _assertCursorReadConcern(true, boost::none);
+
+ // Target all shards.
+ if (!scatterGatherCmd.isEmpty()) {
+ runCommandInspectRequests(_makeTxnCmd(scatterGatherCmd), _containsAtClusterTimeOnly, false);
+ if (createsCursor)
+ _assertCursorReadConcern(false, boost::none);
+ }
+}
- auto containsAtClusterTime = [](const executor::RemoteCommandRequest& request) {
- ASSERT(!request.cmdObj["readConcern"]["atClusterTime"].eoo());
- };
+void ClusterCommandTestFixture::testTxnSnapshotReadConcernWithAfterClusterTime(
+ BSONObj targetedCmd, BSONObj scatterGatherCmd, bool createsCursor) {
// Target one shard.
- runCommandInspectRequests(_makeCmd(targetedCmd), containsAtClusterTime, true);
+ runCommandInspectRequests(_makeTxnCmd(targetedCmd, true), _containsAtClusterTimeOnly, true);
+ if (createsCursor)
+ _assertCursorReadConcern(true, boost::none);
// Target all shards.
if (!scatterGatherCmd.isEmpty()) {
- runCommandInspectRequests(_makeCmd(scatterGatherCmd), containsAtClusterTime, false);
+ runCommandInspectRequests(
+ _makeTxnCmd(scatterGatherCmd, true), _containsAtClusterTimeOnly, false);
+ if (createsCursor)
+ _assertCursorReadConcern(false, boost::none);
}
}
-void ClusterCommandTestFixture::testSnapshotReadConcernWithAfterClusterTime(
- BSONObj targetedCmd, BSONObj scatterGatherCmd) {
+void ClusterCommandTestFixture::testAttachesAtClusterTimeForNonTxnSnapshotReadConcern(
+ BSONObj targetedCmd, BSONObj scatterGatherCmd, bool createsCursor) {
+
+ // Target one shard.
+ runCommandInspectRequests(_makeNonTxnCmd(targetedCmd), _omitsClusterTime, true);
+ if (createsCursor)
+ _assertCursorReadConcern(true, kShardClusterTime);
- auto containsAtClusterTimeNoAfterClusterTime =
- [&](const executor::RemoteCommandRequest& request) {
- ASSERT(!request.cmdObj["readConcern"]["atClusterTime"].eoo());
- ASSERT(request.cmdObj["readConcern"]["afterClusterTime"].eoo());
+ // Target all shards.
+ if (!scatterGatherCmd.isEmpty()) {
+ runCommandInspectRequests(
+ _makeNonTxnCmd(scatterGatherCmd), _containsAtClusterTimeOnly, false);
+ if (createsCursor)
+ _assertCursorReadConcern(false, kInMemoryLogicalTime.asTimestamp());
+ }
+}
- // The chosen atClusterTime should be greater than or equal to the request's
- // afterClusterTime.
- ASSERT_GTE(LogicalTime(request.cmdObj["readConcern"]["atClusterTime"].timestamp()),
- LogicalTime(kAfterClusterTime));
- };
+void ClusterCommandTestFixture::testNonTxnSnapshotReadConcernWithAfterClusterTime(
+ BSONObj targetedCmd, BSONObj scatterGatherCmd, bool createsCursor) {
// Target one shard.
runCommandInspectRequests(
- _makeCmd(targetedCmd, true), containsAtClusterTimeNoAfterClusterTime, true);
+ _makeNonTxnCmd(targetedCmd, true), _containsAfterClusterTimeOnly, true);
+ if (createsCursor)
+ _assertCursorReadConcern(true, kShardClusterTime);
// Target all shards.
if (!scatterGatherCmd.isEmpty()) {
runCommandInspectRequests(
- _makeCmd(scatterGatherCmd, true), containsAtClusterTimeNoAfterClusterTime, false);
+ _makeNonTxnCmd(scatterGatherCmd, true), _containsAtClusterTimeOnly, false);
+ if (createsCursor)
+ _assertCursorReadConcern(false, kAfterClusterTime);
}
}
@@ -310,6 +364,73 @@ void ClusterCommandTestFixture::appendTxnResponseMetadata(BSONObjBuilder& bob) {
txnResponseMetadata.serialize(&bob);
}
+void ClusterCommandTestFixture::_assertCursorReadConcern(
+ bool isTargeted, boost::optional<Timestamp> expectedAtClusterTime) {
+ auto opCtx = operationContext();
+ auto cursorManager = Grid::get(opCtx)->getCursorManager();
+ auto cursors =
+ cursorManager->getIdleCursors(opCtx, MongoProcessInterface::CurrentOpUserMode::kIncludeAll);
+ ASSERT_EQUALS(cursors.size(), 1);
+ auto cursorId = *cursors[0].getCursorId();
+ auto pinnedCursor = cursorManager->checkOutCursor(
+ kNss, cursorId, opCtx, [](UserNameIterator) { return Status::OK(); });
+
+ ASSERT_OK(pinnedCursor.getStatus());
+ auto readConcern = pinnedCursor.getValue()->getReadConcern();
+ auto nRemotes = pinnedCursor.getValue()->getNumRemotes();
+ ASSERT_EQUALS(nRemotes, isTargeted ? 1 : 2);
+
+ // User supplies no atClusterTime. If not isTargeted then mongos selected a timestamp and called
+ // setArgsAtClusterTime. Else mongos let the shard select atClusterTime. The shard returned it
+ // to mongos, and mongos called setArgsAtClusterTime.
+ if (expectedAtClusterTime) {
+ ASSERT_EQUALS(readConcern->getArgsAtClusterTime()->asTimestamp(), *expectedAtClusterTime);
+ ASSERT_TRUE(readConcern->wasAtClusterTimeSelected());
+ } else {
+ ASSERT_FALSE(readConcern->getArgsAtClusterTime());
+ ASSERT_FALSE(readConcern->wasAtClusterTimeSelected());
+ }
+
+ // Kill cursor on shard(s) and remove from ClusterCursorManager, to ensure we have exactly 1
+ // idle cursor next time this method is called. (Otherwise this test would need a complex
+ // method for determining which cursor to examine.)
+ pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::Exhausted);
+ for (std::size_t i = 0; i < nRemotes; ++i) {
+ onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) {
+ ASSERT_EQUALS("killCursors"_sd, request.cmdObj.firstElement().fieldNameStringData());
+ ASSERT_EQUALS(kNss.coll(), request.cmdObj.firstElement().valueStringData());
+ return BSON("ok" << 1);
+ });
+ }
+}
+
+void ClusterCommandTestFixture::_containsSelectedAtClusterTime(
+ const executor::RemoteCommandRequest& request) {
+ ASSERT(!request.cmdObj["readConcern"]["atClusterTime"].eoo());
+ ASSERT(request.cmdObj["readConcern"]["afterClusterTime"].eoo());
+
+ // The chosen atClusterTime should be greater than or equal to the request's afterClusterTime.
+ ASSERT_GTE(LogicalTime(request.cmdObj["readConcern"]["atClusterTime"].timestamp()),
+ LogicalTime(kAfterClusterTime));
+}
+
+void ClusterCommandTestFixture::_containsAtClusterTimeOnly(
+ const executor::RemoteCommandRequest& request) {
+ ASSERT(!request.cmdObj["readConcern"]["atClusterTime"].eoo());
+ ASSERT(request.cmdObj["readConcern"]["afterClusterTime"].eoo());
+}
+
+void ClusterCommandTestFixture::_containsAfterClusterTimeOnly(
+ const executor::RemoteCommandRequest& request) {
+ ASSERT(request.cmdObj["readConcern"]["atClusterTime"].eoo());
+ ASSERT(!request.cmdObj["readConcern"]["afterClusterTime"].eoo());
+}
+
+void ClusterCommandTestFixture::_omitsClusterTime(const executor::RemoteCommandRequest& request) {
+ ASSERT(request.cmdObj["readConcern"]["atClusterTime"].eoo());
+ ASSERT(request.cmdObj["readConcern"]["afterClusterTime"].eoo());
+}
+
// Satisfies dependency from StoreSASLOPtions.
MONGO_STARTUP_OPTIONS_STORE(CoreOptions)(InitializerContext*) {
return Status::OK();
diff --git a/src/mongo/s/commands/cluster_command_test_fixture.h b/src/mongo/s/commands/cluster_command_test_fixture.h
index 9c5406c073b..9c0f3f4d5b7 100644
--- a/src/mongo/s/commands/cluster_command_test_fixture.h
+++ b/src/mongo/s/commands/cluster_command_test_fixture.h
@@ -31,6 +31,7 @@
#include "mongo/platform/basic.h"
+#include "mongo/db/cursor_id.h"
#include "mongo/s/catalog_cache_test_fixture.h"
#include "mongo/s/commands/strategy.h"
@@ -46,10 +47,16 @@ protected:
const LogicalTime kInMemoryLogicalTime = LogicalTime(Timestamp(10, 1));
- const Timestamp kAfterClusterTime = Timestamp(50, 2);
+ static const Timestamp kAfterClusterTime;
+
+ static const Timestamp kShardClusterTime;
+
+ static const CursorId kCursorId;
void setUp() override;
+ void tearDown() override;
+
virtual void expectInspectRequest(int shardIndex, InspectionCallback cb) = 0;
virtual void expectReturnsSuccess(int shardIndex) = 0;
@@ -85,17 +92,34 @@ protected:
void testMaxRetriesSnapshotErrors(BSONObj targetedCmd, BSONObj scatterGatherCmd = BSONObj());
/**
- * Verifies that atClusterTime is attached to the given commands.
+ * Verifies that atClusterTime is attached to the given commands in a transaction.
*/
- void testAttachesAtClusterTimeForSnapshotReadConcern(BSONObj targetedCmd,
- BSONObj scatterGatherCmd = BSONObj());
+ void testAttachesAtClusterTimeForTxnSnapshotReadConcern(BSONObj targetedCmd,
+ BSONObj scatterGatherCmd = BSONObj(),
+ bool createsCursor = false);
/**
* Verifies that the chosen atClusterTime is greater than or equal to each command's
- * afterClusterTime.
+ * afterClusterTime in a transaction.
+ */
+ void testTxnSnapshotReadConcernWithAfterClusterTime(BSONObj targetedCmd,
+ BSONObj scatterGatherCmd = BSONObj(),
+ bool createsCursor = false);
+
+ /**
+ * Verifies that atClusterTime is attached to the given non-transaction snapshot commands.
*/
- void testSnapshotReadConcernWithAfterClusterTime(BSONObj targetedCmd,
- BSONObj scatterGatherCmd = BSONObj());
+ void testAttachesAtClusterTimeForNonTxnSnapshotReadConcern(BSONObj targetedCmd,
+ BSONObj scatterGatherCmd = BSONObj(),
+ bool createsCursor = false);
+
+ /**
+ * Verifies that the chosen atClusterTime is greater than or equal to each non-transaction
+ * snapshot command's afterClusterTime.
+ */
+ void testNonTxnSnapshotReadConcernWithAfterClusterTime(BSONObj targetedCmd,
+ BSONObj scatterGatherCmd = BSONObj(),
+ bool createsCursor = false);
/**
* Appends the metadata shards return on responses to transaction statements, such as the
@@ -105,11 +129,36 @@ protected:
private:
/**
- * Makes a new command object from the one given by apppending read concern
- * snapshot and the appropriate transaction options. If includeAfterClusterTime
- * is true, also appends afterClusterTime to the read concern.
+ * Makes a new command object from the one given by appending read concern snapshot and the
+ * appropriate transaction options. If includeAfterClusterTime is true, also appends
+ * afterClusterTime to the read concern.
*/
- BSONObj _makeCmd(BSONObj cmdObj, bool includeAfterClusterTime = false);
+ BSONObj _makeTxnCmd(BSONObj cmdObj, bool includeAfterClusterTime = false);
+
+ /**
+ * Makes a new command object from the one given by appending read concern snapshot. If
+ * includeAfterClusterTime is true, also appends afterClusterTime to the read concern.
+ */
+ BSONObj _makeNonTxnCmd(BSONObj cmdObj, bool includeAfterClusterTime = false);
+
+ /**
+ * Helper method.
+ */
+ BSONObj _makeCmd(BSONObj cmdObj, bool startTransaction, bool includeAfterClusterTime);
+
+ /*
+ * Check that the ClusterCursorManager contains an idle cursor with proper readConcern set.
+ */
+ void _assertCursorReadConcern(bool isTargeted,
+ boost::optional<Timestamp> expectedAtClusterTime);
+
+ static void _containsSelectedAtClusterTime(const executor::RemoteCommandRequest& request);
+
+ static void _containsAtClusterTimeOnly(const executor::RemoteCommandRequest& request);
+
+ static void _containsAfterClusterTimeOnly(const executor::RemoteCommandRequest& request);
+
+ static void _omitsClusterTime(const executor::RemoteCommandRequest& request);
// Enables the transaction router to retry within a transaction on stale version and snapshot
// errors for the duration of each test.
diff --git a/src/mongo/s/commands/cluster_delete_test.cpp b/src/mongo/s/commands/cluster_delete_test.cpp
index 407ce5e78e2..f0fae5cbeb2 100644
--- a/src/mongo/s/commands/cluster_delete_test.cpp
+++ b/src/mongo/s/commands/cluster_delete_test.cpp
@@ -73,11 +73,11 @@ TEST_F(ClusterDeleteTest, NoErrors) {
}
TEST_F(ClusterDeleteTest, AttachesAtClusterTimeForSnapshotReadConcern) {
- testAttachesAtClusterTimeForSnapshotReadConcern(kDeleteCmdTargeted, kDeleteCmdScatterGather);
+ testAttachesAtClusterTimeForTxnSnapshotReadConcern(kDeleteCmdTargeted, kDeleteCmdScatterGather);
}
TEST_F(ClusterDeleteTest, SnapshotReadConcernWithAfterClusterTime) {
- testSnapshotReadConcernWithAfterClusterTime(kDeleteCmdTargeted, kDeleteCmdScatterGather);
+ testTxnSnapshotReadConcernWithAfterClusterTime(kDeleteCmdTargeted, kDeleteCmdScatterGather);
}
} // namespace
diff --git a/src/mongo/s/commands/cluster_distinct_test.cpp b/src/mongo/s/commands/cluster_distinct_test.cpp
index 40f0226e820..d8513f36dc5 100644
--- a/src/mongo/s/commands/cluster_distinct_test.cpp
+++ b/src/mongo/s/commands/cluster_distinct_test.cpp
@@ -80,12 +80,12 @@ TEST_F(ClusterDistinctTest, MaxRetriesSnapshotErrors) {
}
TEST_F(ClusterDistinctTest, AttachesAtClusterTimeForSnapshotReadConcern) {
- testAttachesAtClusterTimeForSnapshotReadConcern(kDistinctCmdTargeted,
- kDistinctCmdScatterGather);
+ testAttachesAtClusterTimeForTxnSnapshotReadConcern(kDistinctCmdTargeted,
+ kDistinctCmdScatterGather);
}
TEST_F(ClusterDistinctTest, SnapshotReadConcernWithAfterClusterTime) {
- testSnapshotReadConcernWithAfterClusterTime(kDistinctCmdTargeted, kDistinctCmdScatterGather);
+ testTxnSnapshotReadConcernWithAfterClusterTime(kDistinctCmdTargeted, kDistinctCmdScatterGather);
}
} // namespace
diff --git a/src/mongo/s/commands/cluster_find_and_modify_test.cpp b/src/mongo/s/commands/cluster_find_and_modify_test.cpp
index f0a44494591..8f126039fd3 100644
--- a/src/mongo/s/commands/cluster_find_and_modify_test.cpp
+++ b/src/mongo/s/commands/cluster_find_and_modify_test.cpp
@@ -78,11 +78,11 @@ TEST_F(ClusterFindAndModifyTest, MaxRetriesSnapshotErrors) {
}
TEST_F(ClusterFindAndModifyTest, AttachesAtClusterTimeForSnapshotReadConcern) {
- testAttachesAtClusterTimeForSnapshotReadConcern(kFindAndModifyCmdTargeted);
+ testAttachesAtClusterTimeForTxnSnapshotReadConcern(kFindAndModifyCmdTargeted);
}
TEST_F(ClusterFindAndModifyTest, SnapshotReadConcernWithAfterClusterTime) {
- testSnapshotReadConcernWithAfterClusterTime(kFindAndModifyCmdTargeted);
+ testTxnSnapshotReadConcernWithAfterClusterTime(kFindAndModifyCmdTargeted);
}
} // namespace
diff --git a/src/mongo/s/commands/cluster_find_test.cpp b/src/mongo/s/commands/cluster_find_test.cpp
index 8d0dc6792d4..7d9e219961d 100644
--- a/src/mongo/s/commands/cluster_find_test.cpp
+++ b/src/mongo/s/commands/cluster_find_test.cpp
@@ -31,34 +31,23 @@
#include "mongo/db/query/cursor_response.h"
#include "mongo/s/commands/cluster_command_test_fixture.h"
+#include "mongo/s/query/cluster_cursor_manager.h"
namespace mongo {
namespace {
class ClusterFindTest : public ClusterCommandTestFixture {
protected:
- const BSONObj kFindCmdScatterGather = BSON("find"
- << "coll");
- const BSONObj kFindCmdTargeted = BSON("find"
- << "coll"
- << "filter" << BSON("_id" << 0));
+ // Batch size 1, so when expectInspectRequest returns one doc, mongos doesn't wait for more.
+ const BSONObj kFindCmdScatterGather = BSON("find" << kNss.coll() << "batchSize" << 1);
+ const BSONObj kFindCmdTargeted =
+ BSON("find" << kNss.coll() << "filter" << BSON("_id" << 0) << "batchSize" << 1);
// The index of the shard expected to receive the response is used to prevent different shards
// from returning documents with the same shard key. This is expected to be 0 for queries
// targeting one shard.
void expectReturnsSuccess(int shardIndex) override {
- onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) {
- ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData());
-
- std::vector<BSONObj> batch = {BSON("_id" << shardIndex)};
- CursorResponse cursorResponse(kNss, CursorId(0), batch);
-
- BSONObjBuilder bob;
- bob.appendElementsUnique(
- cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse));
- appendTxnResponseMetadata(bob);
- return bob.obj();
- });
+ expectInspectRequest(shardIndex, [](const executor::RemoteCommandRequest&) {});
}
void expectInspectRequest(int shardIndex, InspectionCallback cb) override {
@@ -68,8 +57,17 @@ protected:
cb(request);
std::vector<BSONObj> batch = {BSON("_id" << shardIndex)};
- CursorResponse cursorResponse(kNss, CursorId(0), batch);
-
+ // User supplies no atClusterTime. For single-shard non-transaction snapshot reads,
+ // mongos lets the shard (which this function simulates) select a read timestamp.
+ boost::optional<Timestamp> atClusterTime;
+ if (request.cmdObj["txnNumber"].eoo() && !request.cmdObj["readConcern"].eoo()) {
+ auto rc = request.cmdObj["readConcern"].Obj();
+ if (rc["level"].String() == "snapshot" && rc["atClusterTime"].eoo()) {
+ atClusterTime = kShardClusterTime;
+ }
+ }
+
+ CursorResponse cursorResponse(kNss, kCursorId, batch, atClusterTime);
BSONObjBuilder bob;
bob.appendElementsUnique(
cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse));
@@ -91,12 +89,23 @@ TEST_F(ClusterFindTest, MaxRetriesSnapshotErrors) {
testMaxRetriesSnapshotErrors(kFindCmdTargeted, kFindCmdScatterGather);
}
-TEST_F(ClusterFindTest, AttachesAtClusterTimeForSnapshotReadConcern) {
- testAttachesAtClusterTimeForSnapshotReadConcern(kFindCmdTargeted, kFindCmdScatterGather);
+TEST_F(ClusterFindTest, AttachesAtClusterTimeForTransactionSnapshotReadConcern) {
+ testAttachesAtClusterTimeForTxnSnapshotReadConcern(
+ kFindCmdTargeted, kFindCmdScatterGather, true);
+}
+
+TEST_F(ClusterFindTest, TransactionSnapshotReadConcernWithAfterClusterTime) {
+ testTxnSnapshotReadConcernWithAfterClusterTime(kFindCmdTargeted, kFindCmdScatterGather, true);
+}
+
+TEST_F(ClusterFindTest, AttachesAtClusterTimeForNonTransactionSnapshotReadConcern) {
+ testAttachesAtClusterTimeForNonTxnSnapshotReadConcern(
+ kFindCmdTargeted, kFindCmdScatterGather, true);
}
-TEST_F(ClusterFindTest, SnapshotReadConcernWithAfterClusterTime) {
- testSnapshotReadConcernWithAfterClusterTime(kFindCmdTargeted, kFindCmdScatterGather);
+TEST_F(ClusterFindTest, NonTransactionSnapshotReadConcernWithAfterClusterTime) {
+ testNonTxnSnapshotReadConcernWithAfterClusterTime(
+ kFindCmdTargeted, kFindCmdScatterGather, true);
}
} // namespace
diff --git a/src/mongo/s/commands/cluster_insert_test.cpp b/src/mongo/s/commands/cluster_insert_test.cpp
index ff5fb075486..24544eeb8d4 100644
--- a/src/mongo/s/commands/cluster_insert_test.cpp
+++ b/src/mongo/s/commands/cluster_insert_test.cpp
@@ -73,11 +73,11 @@ TEST_F(ClusterInsertTest, NoErrors) {
}
TEST_F(ClusterInsertTest, AttachesAtClusterTimeForSnapshotReadConcern) {
- testAttachesAtClusterTimeForSnapshotReadConcern(kInsertCmdTargeted, kInsertCmdScatterGather);
+ testAttachesAtClusterTimeForTxnSnapshotReadConcern(kInsertCmdTargeted, kInsertCmdScatterGather);
}
TEST_F(ClusterInsertTest, SnapshotReadConcernWithAfterClusterTime) {
- testSnapshotReadConcernWithAfterClusterTime(kInsertCmdTargeted, kInsertCmdScatterGather);
+ testTxnSnapshotReadConcernWithAfterClusterTime(kInsertCmdTargeted, kInsertCmdScatterGather);
}
} // namespace
diff --git a/src/mongo/s/commands/cluster_update_test.cpp b/src/mongo/s/commands/cluster_update_test.cpp
index 53aee707522..9ee788c2a1e 100644
--- a/src/mongo/s/commands/cluster_update_test.cpp
+++ b/src/mongo/s/commands/cluster_update_test.cpp
@@ -77,11 +77,11 @@ TEST_F(ClusterUpdateTest, NoErrors) {
}
TEST_F(ClusterUpdateTest, AttachesAtClusterTimeForSnapshotReadConcern) {
- testAttachesAtClusterTimeForSnapshotReadConcern(kUpdateCmdTargeted, kUpdateCmdScatterGather);
+ testAttachesAtClusterTimeForTxnSnapshotReadConcern(kUpdateCmdTargeted, kUpdateCmdScatterGather);
}
TEST_F(ClusterUpdateTest, SnapshotReadConcernWithAfterClusterTime) {
- testSnapshotReadConcernWithAfterClusterTime(kUpdateCmdTargeted, kUpdateCmdScatterGather);
+ testTxnSnapshotReadConcernWithAfterClusterTime(kUpdateCmdTargeted, kUpdateCmdScatterGather);
}
} // namespace
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index 39763246f92..c1c7a18e106 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -632,7 +632,7 @@ void runCommand(OperationContext* opCtx,
}
return latestKnownClusterTime.asTimestamp();
}();
- readConcernArgs.setArgsAtClusterTimeForSnapshot(atClusterTime);
+ readConcernArgs.setArgsAtClusterTime(atClusterTime);
}
replyBuilder->reset();
@@ -822,7 +822,8 @@ void runCommand(OperationContext* opCtx,
abortGuard.dismiss();
continue;
- } else if (!ReadConcernArgs::get(opCtx).wasAtClusterTimeSelected()) {
+ } else if (auto rc = ReadConcernArgs::get(opCtx);
+ rc.getArgsAtClusterTime() && !rc.wasAtClusterTimeSelected()) {
// Non-transaction snapshot read. The client sent readConcern: {level:
// "snapshot", atClusterTime: T}, where T is older than
// minSnapshotHistoryWindowInSeconds, retrying won't succeed.
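
The branch above distinguishes a client-supplied atClusterTime from a mongos-selected one: when the client pinned the timestamp and it has aged past minSnapshotHistoryWindowInSeconds, mongos does not retry and the error reaches the caller. A hedged shell sketch of the client-side view follows; staleTimestamp and the error handling are illustrative.

// Sketch only: a client-pinned timestamp older than the snapshot history window is not
// retried by mongos; the caller typically sees SnapshotTooOld and must choose a newer
// timestamp (or omit atClusterTime so the server selects one).
const res = db.runCommand(
    {find: collName, readConcern: {level: "snapshot", atClusterTime: staleTimestamp}});
if (res.ok === 0 && res.code === ErrorCodes.SnapshotTooOld) {
    jsTestLog("atClusterTime aged out of the history window; reissue with a newer timestamp");
}
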
diff --git a/src/mongo/s/query/cluster_aggregate_test.cpp b/src/mongo/s/query/cluster_aggregate_test.cpp
index 08bcc8081e9..3bb16f7fb0d 100644
--- a/src/mongo/s/query/cluster_aggregate_test.cpp
+++ b/src/mongo/s/query/cluster_aggregate_test.cpp
@@ -121,12 +121,13 @@ TEST_F(ClusterAggregateTest, MaxRetriesSnapshotErrors) {
}
TEST_F(ClusterAggregateTest, AttachesAtClusterTimeForSnapshotReadConcern) {
- testAttachesAtClusterTimeForSnapshotReadConcern(kAggregateCmdTargeted,
- kAggregateCmdScatterGather);
+ testAttachesAtClusterTimeForTxnSnapshotReadConcern(kAggregateCmdTargeted,
+ kAggregateCmdScatterGather);
}
TEST_F(ClusterAggregateTest, SnapshotReadConcernWithAfterClusterTime) {
- testSnapshotReadConcernWithAfterClusterTime(kAggregateCmdTargeted, kAggregateCmdScatterGather);
+ testTxnSnapshotReadConcernWithAfterClusterTime(kAggregateCmdTargeted,
+ kAggregateCmdScatterGather);
}
TEST_F(ClusterAggregateTest, ShouldFailWhenFromMongosIsTrue) {
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index 422c02406da..380143ed612 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -188,8 +188,9 @@ std::vector<std::pair<ShardId, BSONObj>> constructRequestsForShards(
}
auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
- if (readConcernArgs.wasAtClusterTimeSelected()) {
- // If mongos selected atClusterTime or received it from client, transmit it to shard.
+ // mongos selects atClusterTime for multi-shard snapshot reads. For a single-shard read, let the
+ // shard select it.
+ if (readConcernArgs.wasAtClusterTimeSelected() && shardIds.size() > 1) {
qrToForward->setReadConcern(readConcernArgs.toBSONInner());
}
@@ -246,9 +247,11 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
query.getQueryRequest().getFilter(),
query.getQueryRequest().getCollation());
+ auto& readConcern = ReadConcernArgs::get(opCtx);
+
// Construct the query and parameters. Defer setting skip and limit here until
// we determine if the query is targeting multi-shards or a single shard below.
- ClusterClientCursorParams params(query.nss(), readPref, ReadConcernArgs::get(opCtx));
+ ClusterClientCursorParams params(query.nss(), readPref, readConcern);
params.originatingCommandObj = CurOp::get(opCtx)->opDescription().getOwned();
params.batchSize = query.getQueryRequest().getEffectiveBatchSize();
params.tailableMode = query.getQueryRequest().getTailableMode();
@@ -262,6 +265,12 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
params.isAutoCommit = false;
}
+ // For single-shard non-transaction snapshot reads, we pass readConcern level "snapshot" to
+ // the shard. It selects "atClusterTime" and replies with it. Store it on the mongos cursor.
+ if (shardIds.size() == 1 && readConcern.wasAtClusterTimeSelected()) {
+ params.readConcern->clearArgsAtClusterTime();
+ }
+
// This is the batchSize passed to each subsequent getMore command issued by the cursor. We
// usually use the batchSize associated with the initial find, but as it is illegal to send a
// getMore with a batchSize of 0, we set it to use the default batchSize logic.
@@ -323,13 +332,22 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
: ClusterCursorManager::CursorType::SingleTarget;
// Only set skip, limit and sort to be applied to on the router for the multi-shard case. For
- // the single-shard case skip/limit as well as sorts are appled on mongod.
+ // the single-shard case skip/limit as well as sorts are applied on mongod.
if (cursorType == ClusterCursorManager::CursorType::MultiTarget) {
const auto qr = query.getQueryRequest();
params.skipToApplyOnRouter = qr.getSkip();
params.limit = qr.getLimit();
params.sortToApplyOnRouter = sortComparatorObj;
params.compareWholeSortKeyOnRouter = compareWholeSortKeyOnRouter;
+ } else if (!params.remotes.empty()) {
+ // For single-shard non-transaction snapshot reads, we pass readConcern level "snapshot" to
+ // the shard. It selects "atClusterTime" and replies with it. Store it on the mongos cursor
+ // and opCtx readConcern, whence it is returned to the caller in our reply's atClusterTime.
+ auto clusterTime = params.remotes[0].getCursorResponse().getAtClusterTime();
+ if (clusterTime && !params.readConcern->getArgsAtClusterTime()) {
+ params.readConcern->setArgsAtClusterTime(*clusterTime);
+ readConcern.setArgsAtClusterTime(*clusterTime);
+ }
}
// Transfer the established cursors to a ClusterClientCursor.
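
The cluster_find.cpp changes above forward a mongos-selected atClusterTime only when more than one shard is targeted; for a single shard, the shard picks the timestamp and mongos adopts it from the cursor response. A short shell sketch of the multi-shard side (assuming the same db/collName and two-shard split as the new jstest), where mongos itself selects the timestamp and should still report it with the cursor.

// Sketch only: with no filter the find targets both shards, so mongos selects
// atClusterTime itself and should still return it in the cursor reply.
const scatterReply = assert.commandWorked(db.runCommand(
    {find: collName, batchSize: 2, readConcern: {level: "snapshot"}}));
assert(scatterReply.cursor.hasOwnProperty("atClusterTime"));

// getMore batches continue at the same timestamp, whichever shard serves them.
const moreReply = assert.commandWorked(db.runCommand(
    {getMore: scatterReply.cursor.id, collection: collName, batchSize: 2}));
assert.eq(moreReply.cursor.atClusterTime, scatterReply.cursor.atClusterTime);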