summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorA. Jesse Jiryu Davis <jesse@mongodb.com>2020-05-07 13:11:37 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-05-07 17:25:48 +0000
commitb17baeb4a77d8400e0283db06ae3a959e05b2560 (patch)
tree5a2a17ba72af56349fcfe79d3ae03ad6f9b0ae1c /src
parentdd5aab7183ab8342d6c3bd664932dc32d43050a6 (diff)
downloadmongo-b17baeb4a77d8400e0283db06ae3a959e05b2560.tar.gz
SERVER-47690 Snapshot reads via mongos
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/query/cursor_response.cpp15
-rw-r--r--src/mongo/db/query/cursor_response.h9
-rw-r--r--src/mongo/db/query/cursor_response_test.cpp9
-rw-r--r--src/mongo/s/cluster_commands_helpers.cpp16
-rw-r--r--src/mongo/s/commands/cluster_distinct_cmd.cpp5
-rw-r--r--src/mongo/s/commands/cluster_find_cmd.cpp1
-rw-r--r--src/mongo/s/commands/strategy.cpp25
-rw-r--r--src/mongo/s/query/async_results_merger_test.cpp83
-rw-r--r--src/mongo/s/query/cluster_aggregation_planner.cpp5
-rw-r--r--src/mongo/s/query/cluster_client_cursor.h5
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.cpp4
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.h2
-rw-r--r--src/mongo/s/query/cluster_client_cursor_mock.cpp4
-rw-r--r--src/mongo/s/query/cluster_client_cursor_mock.h2
-rw-r--r--src/mongo/s/query/cluster_client_cursor_params.h12
-rw-r--r--src/mongo/s/query/cluster_find.cpp16
-rw-r--r--src/mongo/s/query/establish_cursors.cpp5
-rw-r--r--src/mongo/s/query/store_possible_cursor.cpp9
18 files changed, 171 insertions, 56 deletions
diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp
index 3cd99d7e806..4b3eea43ee8 100644
--- a/src/mongo/db/query/cursor_response.cpp
+++ b/src/mongo/db/query/cursor_response.cpp
@@ -129,6 +129,7 @@ void appendGetMoreResponseObject(long long cursorId,
CursorResponse::CursorResponse(NamespaceString nss,
CursorId cursorId,
std::vector<BSONObj> batch,
+ boost::optional<Timestamp> atClusterTime,
boost::optional<long long> numReturnedSoFar,
boost::optional<BSONObj> postBatchResumeToken,
boost::optional<BSONObj> writeConcernError,
@@ -136,6 +137,7 @@ CursorResponse::CursorResponse(NamespaceString nss,
: _nss(std::move(nss)),
_cursorId(cursorId),
_batch(std::move(batch)),
+ _atClusterTime(std::move(atClusterTime)),
_numReturnedSoFar(numReturnedSoFar),
_postBatchResumeToken(std::move(postBatchResumeToken)),
_writeConcernError(std::move(writeConcernError)),
@@ -235,6 +237,14 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo
<< postBatchResumeTokenElem.type()};
}
+ auto atClusterTimeElem = cursorObj[kAtClusterTimeField];
+ if (atClusterTimeElem && atClusterTimeElem.type() != BSONType::bsonTimestamp) {
+ return {ErrorCodes::BadValue,
+ str::stream() << kAtClusterTimeField
+ << " format is invalid; expected Timestamp, but found: "
+ << atClusterTimeElem.type()};
+ }
+
auto partialResultsReturned = cursorObj[kPartialResultsReturnedField];
if (partialResultsReturned) {
@@ -257,6 +267,7 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo
return {{NamespaceString(fullns),
cursorId,
std::move(batch),
+ atClusterTimeElem ? atClusterTimeElem.timestamp() : boost::optional<Timestamp>{},
boost::none,
postBatchResumeTokenElem ? postBatchResumeTokenElem.Obj().getOwned()
: boost::optional<BSONObj>{},
@@ -283,6 +294,10 @@ void CursorResponse::addToBSON(CursorResponse::ResponseType responseType,
cursorBuilder.append(kPostBatchResumeTokenField, *_postBatchResumeToken);
}
+ if (_atClusterTime) {
+ cursorBuilder.append(kAtClusterTimeField, *_atClusterTime);
+ }
+
if (_partialResultsReturned) {
cursorBuilder.append(kPartialResultsReturnedField, true);
}
diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h
index 5e03d2c5a3e..7e7633b14c1 100644
--- a/src/mongo/db/query/cursor_response.h
+++ b/src/mongo/db/query/cursor_response.h
@@ -50,6 +50,9 @@ class CursorResponseBuilder {
public:
/**
* Structure used to configure the CursorResponseBuilder.
+ *
+ * If we selected atClusterTime or received it from the client, transmit it back to the client
+ * in the cursor reply document by setting it here.
*/
struct Options {
bool isInitialResponse = false;
@@ -192,6 +195,7 @@ public:
CursorResponse(NamespaceString nss,
CursorId cursorId,
std::vector<BSONObj> batch,
+ boost::optional<Timestamp> atClusterTime = boost::none,
boost::optional<long long> numReturnedSoFar = boost::none,
boost::optional<BSONObj> postBatchResumeToken = boost::none,
boost::optional<BSONObj> writeConcernError = boost::none,
@@ -232,6 +236,10 @@ public:
return _writeConcernError;
}
+ boost::optional<Timestamp> getAtClusterTime() const {
+ return _atClusterTime;
+ }
+
bool getPartialResultsReturned() const {
return _partialResultsReturned;
}
@@ -249,6 +257,7 @@ private:
NamespaceString _nss;
CursorId _cursorId;
std::vector<BSONObj> _batch;
+ boost::optional<Timestamp> _atClusterTime;
boost::optional<long long> _numReturnedSoFar;
boost::optional<BSONObj> _postBatchResumeToken;
boost::optional<BSONObj> _writeConcernError;
diff --git a/src/mongo/db/query/cursor_response_test.cpp b/src/mongo/db/query/cursor_response_test.cpp
index db1a70b6a74..a3bc5449ac7 100644
--- a/src/mongo/db/query/cursor_response_test.cpp
+++ b/src/mongo/db/query/cursor_response_test.cpp
@@ -300,6 +300,7 @@ TEST(CursorResponseTest, toBSONPartialResultsReturned) {
boost::none,
boost::none,
boost::none,
+ boost::none,
true);
BSONObj responseObj = response.toBSON(CursorResponse::ResponseType::InitialResponse);
BSONObj expectedResponse = BSON(
@@ -347,8 +348,12 @@ TEST(CursorResponseTest, serializePostBatchResumeToken) {
std::vector<BSONObj> batch = {BSON("_id" << 1), BSON("_id" << 2)};
auto postBatchResumeToken =
ResumeToken::makeHighWaterMarkToken(Timestamp(1, 2)).toDocument().toBson();
- CursorResponse response(
- NamespaceString("db.coll"), CursorId(123), batch, boost::none, postBatchResumeToken);
+ CursorResponse response(NamespaceString("db.coll"),
+ CursorId(123),
+ batch,
+ boost::none,
+ boost::none,
+ postBatchResumeToken);
auto serialized = response.toBSON(CursorResponse::ResponseType::SubsequentResponse);
ASSERT_BSONOBJ_EQ(serialized,
BSON("cursor" << BSON("id" << CursorId(123) << "ns"
diff --git a/src/mongo/s/cluster_commands_helpers.cpp b/src/mongo/s/cluster_commands_helpers.cpp
index 62b1bf834f9..f7f24aa31c2 100644
--- a/src/mongo/s/cluster_commands_helpers.cpp
+++ b/src/mongo/s/cluster_commands_helpers.cpp
@@ -267,6 +267,7 @@ BSONObj applyReadWriteConcern(OperationContext* opCtx,
BSONObjBuilder output;
bool seenReadConcern = false;
bool seenWriteConcern = false;
+ const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
for (const auto& elem : cmdObj) {
const auto name = elem.fieldNameStringData();
if (appendRC && name == repl::ReadConcernArgs::kReadConcernFieldName) {
@@ -276,13 +277,18 @@ BSONObj applyReadWriteConcern(OperationContext* opCtx,
seenWriteConcern = true;
}
if (!output.hasField(name)) {
- output.append(elem);
+ // If mongos selected atClusterTime, forward it to the shard.
+ if (name == repl::ReadConcernArgs::kReadConcernFieldName &&
+ readConcernArgs.wasAtClusterTimeSelected()) {
+ output.appendElements(readConcernArgs.toBSON());
+ } else {
+ output.append(elem);
+ }
}
}
// Finally, add the new read/write concern.
if (appendRC && !seenReadConcern) {
- const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
output.appendElements(readConcernArgs.toBSON());
}
if (appendWC && !seenWriteConcern) {
@@ -721,6 +727,12 @@ StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfoForTxnCmd(
auto catalogCache = Grid::get(opCtx)->catalogCache();
invariant(catalogCache);
+ auto argsAtClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime();
+ if (argsAtClusterTime) {
+ return catalogCache->getCollectionRoutingInfoAt(
+ opCtx, nss, argsAtClusterTime->asTimestamp());
+ }
+
// Return the latest routing table if not running in a transaction with snapshot level read
// concern.
auto txnRouter = TransactionRouter::get(opCtx);
diff --git a/src/mongo/s/commands/cluster_distinct_cmd.cpp b/src/mongo/s/commands/cluster_distinct_cmd.cpp
index 0e02d3169a1..32d0ffe4b1e 100644
--- a/src/mongo/s/commands/cluster_distinct_cmd.cpp
+++ b/src/mongo/s/commands/cluster_distinct_cmd.cpp
@@ -250,6 +250,11 @@ public:
}
result.appendArray("values", b.obj());
+ // If mongos selected atClusterTime or received it from client, transmit it back.
+ if (repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime()) {
+ result.append("atClusterTime"_sd,
+ repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime()->asTimestamp());
+ }
return true;
}
diff --git a/src/mongo/s/commands/cluster_find_cmd.cpp b/src/mongo/s/commands/cluster_find_cmd.cpp
index 66cdf67b00f..7ebab369391 100644
--- a/src/mongo/s/commands/cluster_find_cmd.cpp
+++ b/src/mongo/s/commands/cluster_find_cmd.cpp
@@ -230,6 +230,7 @@ public:
// Build the response document.
CursorResponseBuilder::Options options;
options.isInitialResponse = true;
+ options.atClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime();
CursorResponseBuilder firstBatch(result, options);
for (const auto& obj : batch) {
firstBatch.append(obj);
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index d1dd14d1a70..6b70018dfd8 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -252,10 +252,21 @@ void execCommandClient(OperationContext* opCtx,
}
auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
- if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) {
- uassert(ErrorCodes::InvalidOptions,
- "read concern snapshot is only supported in a multi-statement transaction",
- TransactionRouter::get(opCtx));
+ if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern &&
+ !TransactionRouter::get(opCtx) && !readConcernArgs.getArgsAtClusterTime()) {
+ // Select the latest known clusterTime as the atClusterTime for snapshot reads outside
+ // of transactions.
+ auto atClusterTime = [&] {
+ auto latestKnownClusterTime = LogicalClock::get(opCtx)->getClusterTime();
+ // If the user passed afterClusterTime, the chosen time must be greater than or
+ // equal to it.
+ auto afterClusterTime = readConcernArgs.getArgsAfterClusterTime();
+ if (afterClusterTime && *afterClusterTime > latestKnownClusterTime) {
+ return afterClusterTime->asTimestamp();
+ }
+ return latestKnownClusterTime.asTimestamp();
+ }();
+ readConcernArgs.setArgsAtClusterTimeForSnapshot(atClusterTime);
}
// attach tracking
@@ -401,12 +412,6 @@ void runCommand(OperationContext* opCtx,
return;
}
- if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) {
- uassert(ErrorCodes::InvalidOptions,
- "read concern snapshot is not supported with atClusterTime on mongos",
- !readConcernArgs.getArgsAtClusterTime());
- }
-
boost::optional<RouterOperationContextSession> routerSession;
try {
rpc::readRequestMetadata(opCtx, request.body, command->requiresAuth());
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index 49ca39fed20..ff930da893d 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -1348,7 +1348,8 @@ DEATH_TEST_REGEX_F(
cursors.push_back(makeRemoteCursor(
kTestShardIds[0],
kTestShardHosts[0],
- CursorResponse(kTestNss, 123, {firstCursorResponse}, boost::none, pbrtFirstCursor)));
+ CursorResponse(
+ kTestNss, 123, {firstCursorResponse}, boost::none, boost::none, pbrtFirstCursor)));
// Create a second cursor whose initial batch has no PBRT.
cursors.push_back(
makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 456, {})));
@@ -1381,7 +1382,8 @@ DEATH_TEST_REGEX_F(AsyncResultsMergerTest,
cursors.push_back(makeRemoteCursor(
kTestShardIds[0],
kTestShardHosts[0],
- CursorResponse(kTestNss, 123, {firstCursorResponse}, boost::none, pbrtFirstCursor)));
+ CursorResponse(
+ kTestNss, 123, {firstCursorResponse}, boost::none, boost::none, pbrtFirstCursor)));
params.setRemotes(std::move(cursors));
params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
params.setSort(change_stream_constants::kSortSpec);
@@ -1409,11 +1411,13 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfRemoteHasLowerPostB
cursors.push_back(makeRemoteCursor(
kTestShardIds[0],
kTestShardHosts[0],
- CursorResponse(kTestNss, 123, {firstCursorResponse}, boost::none, pbrtFirstCursor)));
+ CursorResponse(
+ kTestNss, 123, {firstCursorResponse}, boost::none, boost::none, pbrtFirstCursor)));
auto tooLowPBRT = makePostBatchResumeToken(Timestamp(1, 2));
- cursors.push_back(makeRemoteCursor(kTestShardIds[1],
- kTestShardHosts[1],
- CursorResponse(kTestNss, 456, {}, boost::none, tooLowPBRT)));
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[1],
+ kTestShardHosts[1],
+ CursorResponse(kTestNss, 456, {}, boost::none, boost::none, tooLowPBRT)));
params.setRemotes(std::move(cursors));
params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
params.setSort(change_stream_constants::kSortSpec);
@@ -1459,7 +1463,8 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting)
<< firstDocSortKey.firstElement().String() << "'}]}");
std::vector<BSONObj> batch1 = {firstCursorResponse};
auto firstDoc = batch1.front();
- responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, pbrtFirstCursor);
+ responses.emplace_back(
+ kTestNss, CursorId(123), batch1, boost::none, boost::none, pbrtFirstCursor);
scheduleNetworkResponses(std::move(responses));
// Should be ready now.
@@ -1471,7 +1476,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting)
newCursors.push_back(
makeRemoteCursor(kTestShardIds[1],
kTestShardHosts[1],
- CursorResponse(kTestNss, 456, {}, boost::none, tooLowPBRT)));
+ CursorResponse(kTestNss, 456, {}, boost::none, boost::none, tooLowPBRT)));
arm->addNewShardCursors(std::move(newCursors));
// Now shouldn't be ready, our guarantee from the new shard isn't sufficiently advanced.
@@ -1488,7 +1493,8 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting)
<< secondDocSortKey.firstElement().String() << "'}]}");
std::vector<BSONObj> batch2 = {secondCursorResponse};
auto secondDoc = batch2.front();
- responses.emplace_back(kTestNss, CursorId(456), batch2, boost::none, pbrtSecondCursor);
+ responses.emplace_back(
+ kTestNss, CursorId(456), batch2, boost::none, boost::none, pbrtSecondCursor);
scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1536,7 +1542,8 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting
<< "', documentKey: {_id: 1}}, $sortKey: [{_data: '"
<< firstDocSortKey.firstElement().String() << "'}]}");
std::vector<BSONObj> batch1 = {firstCursorResponse};
- responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, pbrtFirstCursor);
+ responses.emplace_back(
+ kTestNss, CursorId(123), batch1, boost::none, boost::none, pbrtFirstCursor);
scheduleNetworkResponses(std::move(responses));
// Should be ready now.
@@ -1548,7 +1555,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting
newCursors.push_back(
makeRemoteCursor(kTestShardIds[1],
kTestShardHosts[1],
- CursorResponse(kTestNss, 456, {}, boost::none, tooLowPBRT)));
+ CursorResponse(kTestNss, 456, {}, boost::none, boost::none, tooLowPBRT)));
arm->addNewShardCursors(std::move(newCursors));
// Now shouldn't be ready, our guarantee from the new shard isn't sufficiently advanced.
@@ -1566,7 +1573,8 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting
std::vector<BSONObj> batch2 = {secondCursorResponse};
// The last observed time should still be later than the first shard, so we can get the data
// from it.
- responses.emplace_back(kTestNss, CursorId(456), batch2, boost::none, pbrtSecondCursor);
+ responses.emplace_back(
+ kTestNss, CursorId(456), batch2, boost::none, boost::none, pbrtSecondCursor);
scheduleNetworkResponses(std::move(responses));
executor()->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -1594,20 +1602,20 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorReturnsHighWaterMarkSortKey)
std::vector<RemoteCursor> cursors;
// Create three cursors with empty initial batches. Each batch has a PBRT.
auto pbrtFirstCursor = makePostBatchResumeToken(Timestamp(1, 5));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[0],
- kTestShardHosts[0],
- CursorResponse(kTestNss, 123, {}, boost::none, pbrtFirstCursor)));
+ cursors.push_back(makeRemoteCursor(
+ kTestShardIds[0],
+ kTestShardHosts[0],
+ CursorResponse(kTestNss, 123, {}, boost::none, boost::none, pbrtFirstCursor)));
auto pbrtSecondCursor = makePostBatchResumeToken(Timestamp(1, 1));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[1],
- kTestShardHosts[1],
- CursorResponse(kTestNss, 456, {}, boost::none, pbrtSecondCursor)));
+ cursors.push_back(makeRemoteCursor(
+ kTestShardIds[1],
+ kTestShardHosts[1],
+ CursorResponse(kTestNss, 456, {}, boost::none, boost::none, pbrtSecondCursor)));
auto pbrtThirdCursor = makePostBatchResumeToken(Timestamp(1, 4));
- cursors.push_back(
- makeRemoteCursor(kTestShardIds[2],
- kTestShardHosts[2],
- CursorResponse(kTestNss, 789, {}, boost::none, pbrtThirdCursor)));
+ cursors.push_back(makeRemoteCursor(
+ kTestShardIds[2],
+ kTestShardHosts[2],
+ CursorResponse(kTestNss, 789, {}, boost::none, boost::none, pbrtThirdCursor)));
params.setRemotes(std::move(cursors));
params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
params.setSort(change_stream_constants::kSortSpec);
@@ -1625,26 +1633,35 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorReturnsHighWaterMarkSortKey)
// each cursor to be updated in-order, so we keep the first and third PBRTs constant.
pbrtSecondCursor = makePostBatchResumeToken(Timestamp(1, 3));
std::vector<BSONObj> emptyBatch = {};
- scheduleNetworkResponse({kTestNss, CursorId(123), emptyBatch, boost::none, pbrtFirstCursor});
- scheduleNetworkResponse({kTestNss, CursorId(456), emptyBatch, boost::none, pbrtSecondCursor});
- scheduleNetworkResponse({kTestNss, CursorId(789), emptyBatch, boost::none, pbrtThirdCursor});
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(123), emptyBatch, boost::none, boost::none, pbrtFirstCursor});
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(456), emptyBatch, boost::none, boost::none, pbrtSecondCursor});
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(789), emptyBatch, boost::none, boost::none, pbrtThirdCursor});
ASSERT_BSONOBJ_EQ(arm->getHighWaterMark(), pbrtSecondCursor);
ASSERT_FALSE(arm->ready());
// Advance the second cursor again, so that it surpasses the other two. The third cursor becomes
// the new high water mark.
pbrtSecondCursor = makePostBatchResumeToken(Timestamp(1, 6));
- scheduleNetworkResponse({kTestNss, CursorId(123), emptyBatch, boost::none, pbrtFirstCursor});
- scheduleNetworkResponse({kTestNss, CursorId(456), emptyBatch, boost::none, pbrtSecondCursor});
- scheduleNetworkResponse({kTestNss, CursorId(789), emptyBatch, boost::none, pbrtThirdCursor});
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(123), emptyBatch, boost::none, boost::none, pbrtFirstCursor});
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(456), emptyBatch, boost::none, boost::none, pbrtSecondCursor});
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(789), emptyBatch, boost::none, boost::none, pbrtThirdCursor});
ASSERT_BSONOBJ_EQ(arm->getHighWaterMark(), pbrtThirdCursor);
ASSERT_FALSE(arm->ready());
// Advance the third cursor such that the first cursor becomes the high water mark.
pbrtThirdCursor = makePostBatchResumeToken(Timestamp(1, 7));
- scheduleNetworkResponse({kTestNss, CursorId(123), emptyBatch, boost::none, pbrtFirstCursor});
- scheduleNetworkResponse({kTestNss, CursorId(456), emptyBatch, boost::none, pbrtSecondCursor});
- scheduleNetworkResponse({kTestNss, CursorId(789), emptyBatch, boost::none, pbrtThirdCursor});
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(123), emptyBatch, boost::none, boost::none, pbrtFirstCursor});
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(456), emptyBatch, boost::none, boost::none, pbrtSecondCursor});
+ scheduleNetworkResponse(
+ {kTestNss, CursorId(789), emptyBatch, boost::none, boost::none, pbrtThirdCursor});
ASSERT_BSONOBJ_EQ(arm->getHighWaterMark(), pbrtFirstCursor);
ASSERT_FALSE(arm->ready());
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index 56d865508c6..6221b1809c4 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -243,7 +243,8 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging,
const PrivilegeVector& privileges) {
- ClusterClientCursorParams params(requestedNss, ReadPreferenceSetting::get(opCtx));
+ ClusterClientCursorParams params(
+ requestedNss, ReadPreferenceSetting::get(opCtx), ReadConcernArgs::get(opCtx));
params.originatingCommandObj = CurOp::get(opCtx)->opDescription().getOwned();
params.tailableMode = pipelineForMerging->getContext()->tailableMode;
@@ -267,7 +268,7 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
rpc::OpMsgReplyBuilder replyBuilder;
CursorResponseBuilder::Options options;
options.isInitialResponse = true;
-
+ options.atClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime();
CursorResponseBuilder responseBuilder(&replyBuilder, options);
bool stashedResult = false;
diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h
index 31d4211570f..6dc1ae8419e 100644
--- a/src/mongo/s/query/cluster_client_cursor.h
+++ b/src/mongo/s/query/cluster_client_cursor.h
@@ -181,6 +181,11 @@ public:
virtual boost::optional<ReadPreferenceSetting> getReadPreference() const = 0;
/**
+ * Returns the readConcern for this cursor.
+ */
+ virtual boost::optional<ReadConcernArgs> getReadConcern() const = 0;
+
+ /**
* Returns the creation date of the cursor.
*/
virtual Date_t getCreatedDate() const = 0;
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index c6f18579bf1..fefd913f8a7 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -209,6 +209,10 @@ boost::optional<ReadPreferenceSetting> ClusterClientCursorImpl::getReadPreferenc
return _params.readPreference;
}
+boost::optional<ReadConcernArgs> ClusterClientCursorImpl::getReadConcern() const {
+ return _params.readConcern;
+}
+
std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan(
OperationContext* opCtx,
std::shared_ptr<executor::TaskExecutor> executor,
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h
index 91a9c4455f3..23f1c351fda 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.h
+++ b/src/mongo/s/query/cluster_client_cursor_impl.h
@@ -104,6 +104,8 @@ public:
boost::optional<ReadPreferenceSetting> getReadPreference() const final;
+ boost::optional<ReadConcernArgs> getReadConcern() const final;
+
Date_t getCreatedDate() const final;
Date_t getLastUseDate() const final;
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp
index 958a909687c..e6f86dccade 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp
@@ -153,4 +153,8 @@ boost::optional<ReadPreferenceSetting> ClusterClientCursorMock::getReadPreferenc
return boost::none;
}
+boost::optional<ReadConcernArgs> ClusterClientCursorMock::getReadConcern() const {
+ return boost::none;
+}
+
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h
index c86f66315b6..f72551a24da 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.h
+++ b/src/mongo/s/query/cluster_client_cursor_mock.h
@@ -92,6 +92,8 @@ public:
boost::optional<ReadPreferenceSetting> getReadPreference() const final;
+ boost::optional<ReadConcernArgs> getReadConcern() const final;
+
Date_t getCreatedDate() const final;
Date_t getLastUseDate() const final;
diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h
index 24ec074376f..cd6563f842c 100644
--- a/src/mongo/s/query/cluster_client_cursor_params.h
+++ b/src/mongo/s/query/cluster_client_cursor_params.h
@@ -43,6 +43,7 @@
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/tailable_mode.h"
+#include "mongo/db/repl/read_concern_args.h"
#include "mongo/s/client/shard.h"
#include "mongo/s/query/async_results_merger_params_gen.h"
#include "mongo/util/net/hostandport.h"
@@ -55,6 +56,8 @@ class TaskExecutor;
class OperationContext;
class RouterExecStage;
+using repl::ReadConcernArgs;
+
/**
* The resulting ClusterClientCursor will take ownership of the existing remote cursor, generating
* results based on the cursor's current state.
@@ -65,11 +68,15 @@ class RouterExecStage;
*/
struct ClusterClientCursorParams {
ClusterClientCursorParams(NamespaceString nss,
- boost::optional<ReadPreferenceSetting> readPref = boost::none)
+ boost::optional<ReadPreferenceSetting> readPref = boost::none,
+ boost::optional<ReadConcernArgs> readConcernArgs = boost::none)
: nsString(std::move(nss)) {
if (readPref) {
readPreference = std::move(readPref.get());
}
+ if (readConcernArgs) {
+ readConcern = std::move(readConcernArgs.get());
+ }
}
/**
@@ -143,6 +150,9 @@ struct ClusterClientCursorParams {
// Set if a readPreference must be respected throughout the lifetime of the cursor.
boost::optional<ReadPreferenceSetting> readPreference;
+ // Set if a readConcern must be respected throughout the lifetime of the cursor.
+ boost::optional<ReadConcernArgs> readConcern;
+
// Whether the client indicated that it is willing to receive partial results in the case of an
// unreachable host.
bool isAllowPartialResults = false;
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index e3bbfb5d995..422c02406da 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -187,6 +187,12 @@ std::vector<std::pair<ShardId, BSONObj>> constructRequestsForShards(
qrToForward = std::make_unique<QueryRequest>(query.getQueryRequest());
}
+ auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
+ if (readConcernArgs.wasAtClusterTimeSelected()) {
+ // If mongos selected atClusterTime or received it from client, transmit it to shard.
+ qrToForward->setReadConcern(readConcernArgs.toBSONInner());
+ }
+
auto shardRegistry = Grid::get(opCtx)->shardRegistry();
std::vector<std::pair<ShardId, BSONObj>> requests;
for (const auto& shardId : shardIds) {
@@ -242,7 +248,7 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
// Construct the query and parameters. Defer setting skip and limit here until
// we determine if the query is targeting multi-shards or a single shard below.
- ClusterClientCursorParams params(query.nss(), readPref);
+ ClusterClientCursorParams params(query.nss(), readPref, ReadConcernArgs::get(opCtx));
params.originatingCommandObj = CurOp::get(opCtx)->opDescription().getOwned();
params.batchSize = query.getQueryRequest().getEffectiveBatchSize();
params.tailableMode = query.getQueryRequest().getTailableMode();
@@ -429,6 +435,11 @@ Status setUpOperationContextStateForGetMore(OperationContext* opCtx,
ReadPreferenceSetting::get(opCtx) = *readPref;
}
+ if (auto readConcern = cursor->getReadConcern()) {
+ // Used to return "atClusterTime" in cursor replies to clients for snapshot reads.
+ ReadConcernArgs::get(opCtx) = *readConcern;
+ }
+
// If the originating command had a 'comment' field, we extract it and set it on opCtx. Note
// that if the 'getMore' command itself has a 'comment' field, we give precedence to it.
auto comment = cursor->getOriginatingCommand()["comment"];
@@ -835,9 +846,12 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
"waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch");
}
+ auto atClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime();
return CursorResponse(request.nss,
idToReturn,
std::move(batch),
+ atClusterTime ? atClusterTime->asTimestamp()
+ : boost::optional<Timestamp>{},
startingFrom,
postBatchResumeToken,
boost::none,
diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp
index 8217de1e2ec..3b0bbf4e85c 100644
--- a/src/mongo/s/query/establish_cursors.cpp
+++ b/src/mongo/s/query/establish_cursors.cpp
@@ -179,8 +179,9 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
}
// This exception is eligible to be swallowed. Add an entry with a cursorID of 0, an
// empty HostAndPort, and which has the 'partialResultsReturned' flag set to true.
- remoteCursors.push_back(
- {response.shardId.toString(), {}, {nss, CursorId{0}, {}, {}, {}, {}, true}});
+ remoteCursors.push_back({response.shardId.toString(),
+ {},
+ {nss, CursorId{0}, {}, {}, {}, {}, {}, true}});
}
}
return remoteCursors;
diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp
index 9877af451a2..b76a47cddbe 100644
--- a/src/mongo/s/query/store_possible_cursor.cpp
+++ b/src/mongo/s/query/store_possible_cursor.cpp
@@ -99,7 +99,8 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
return cmdResult;
}
- ClusterClientCursorParams params(incomingCursorResponse.getValue().getNSS());
+ ClusterClientCursorParams params(
+ incomingCursorResponse.getValue().getNSS(), boost::none, ReadConcernArgs::get(opCtx));
params.remotes.emplace_back();
auto& remoteCursor = params.remotes.back();
remoteCursor.setShardId(shardId.toString());
@@ -136,8 +137,10 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
CurOp::get(opCtx)->debug().cursorid = clusterCursorId.getValue();
- CursorResponse outgoingCursorResponse(
- requestedNss, clusterCursorId.getValue(), incomingCursorResponse.getValue().getBatch());
+ CursorResponse outgoingCursorResponse(requestedNss,
+ clusterCursorId.getValue(),
+ incomingCursorResponse.getValue().getBatch(),
+ incomingCursorResponse.getValue().getAtClusterTime());
return outgoingCursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse);
}