diff options
author | A. Jesse Jiryu Davis <jesse@mongodb.com> | 2020-05-07 13:11:37 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-05-07 17:25:48 +0000 |
commit | b17baeb4a77d8400e0283db06ae3a959e05b2560 (patch) | |
tree | 5a2a17ba72af56349fcfe79d3ae03ad6f9b0ae1c /src | |
parent | dd5aab7183ab8342d6c3bd664932dc32d43050a6 (diff) | |
download | mongo-b17baeb4a77d8400e0283db06ae3a959e05b2560.tar.gz |
SERVER-47690 Snapshot reads via mongos
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/query/cursor_response.cpp | 15 | ||||
-rw-r--r-- | src/mongo/db/query/cursor_response.h | 9 | ||||
-rw-r--r-- | src/mongo/db/query/cursor_response_test.cpp | 9 | ||||
-rw-r--r-- | src/mongo/s/cluster_commands_helpers.cpp | 16 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_distinct_cmd.cpp | 5 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_find_cmd.cpp | 1 | ||||
-rw-r--r-- | src/mongo/s/commands/strategy.cpp | 25 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger_test.cpp | 83 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_aggregation_planner.cpp | 5 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_client_cursor.h | 5 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_client_cursor_impl.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_client_cursor_impl.h | 2 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_client_cursor_mock.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_client_cursor_mock.h | 2 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_client_cursor_params.h | 12 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_find.cpp | 16 | ||||
-rw-r--r-- | src/mongo/s/query/establish_cursors.cpp | 5 | ||||
-rw-r--r-- | src/mongo/s/query/store_possible_cursor.cpp | 9 |
18 files changed, 171 insertions, 56 deletions
diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp index 3cd99d7e806..4b3eea43ee8 100644 --- a/src/mongo/db/query/cursor_response.cpp +++ b/src/mongo/db/query/cursor_response.cpp @@ -129,6 +129,7 @@ void appendGetMoreResponseObject(long long cursorId, CursorResponse::CursorResponse(NamespaceString nss, CursorId cursorId, std::vector<BSONObj> batch, + boost::optional<Timestamp> atClusterTime, boost::optional<long long> numReturnedSoFar, boost::optional<BSONObj> postBatchResumeToken, boost::optional<BSONObj> writeConcernError, @@ -136,6 +137,7 @@ CursorResponse::CursorResponse(NamespaceString nss, : _nss(std::move(nss)), _cursorId(cursorId), _batch(std::move(batch)), + _atClusterTime(std::move(atClusterTime)), _numReturnedSoFar(numReturnedSoFar), _postBatchResumeToken(std::move(postBatchResumeToken)), _writeConcernError(std::move(writeConcernError)), @@ -235,6 +237,14 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo << postBatchResumeTokenElem.type()}; } + auto atClusterTimeElem = cursorObj[kAtClusterTimeField]; + if (atClusterTimeElem && atClusterTimeElem.type() != BSONType::bsonTimestamp) { + return {ErrorCodes::BadValue, + str::stream() << kAtClusterTimeField + << " format is invalid; expected Timestamp, but found: " + << atClusterTimeElem.type()}; + } + auto partialResultsReturned = cursorObj[kPartialResultsReturnedField]; if (partialResultsReturned) { @@ -257,6 +267,7 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo return {{NamespaceString(fullns), cursorId, std::move(batch), + atClusterTimeElem ? atClusterTimeElem.timestamp() : boost::optional<Timestamp>{}, boost::none, postBatchResumeTokenElem ? postBatchResumeTokenElem.Obj().getOwned() : boost::optional<BSONObj>{}, @@ -283,6 +294,10 @@ void CursorResponse::addToBSON(CursorResponse::ResponseType responseType, cursorBuilder.append(kPostBatchResumeTokenField, *_postBatchResumeToken); } + if (_atClusterTime) { + cursorBuilder.append(kAtClusterTimeField, *_atClusterTime); + } + if (_partialResultsReturned) { cursorBuilder.append(kPartialResultsReturnedField, true); } diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h index 5e03d2c5a3e..7e7633b14c1 100644 --- a/src/mongo/db/query/cursor_response.h +++ b/src/mongo/db/query/cursor_response.h @@ -50,6 +50,9 @@ class CursorResponseBuilder { public: /** * Structure used to configure the CursorResponseBuilder. + * + * If we selected atClusterTime or received it from the client, transmit it back to the client + * in the cursor reply document by setting it here. */ struct Options { bool isInitialResponse = false; @@ -192,6 +195,7 @@ public: CursorResponse(NamespaceString nss, CursorId cursorId, std::vector<BSONObj> batch, + boost::optional<Timestamp> atClusterTime = boost::none, boost::optional<long long> numReturnedSoFar = boost::none, boost::optional<BSONObj> postBatchResumeToken = boost::none, boost::optional<BSONObj> writeConcernError = boost::none, @@ -232,6 +236,10 @@ public: return _writeConcernError; } + boost::optional<Timestamp> getAtClusterTime() const { + return _atClusterTime; + } + bool getPartialResultsReturned() const { return _partialResultsReturned; } @@ -249,6 +257,7 @@ private: NamespaceString _nss; CursorId _cursorId; std::vector<BSONObj> _batch; + boost::optional<Timestamp> _atClusterTime; boost::optional<long long> _numReturnedSoFar; boost::optional<BSONObj> _postBatchResumeToken; boost::optional<BSONObj> _writeConcernError; diff --git a/src/mongo/db/query/cursor_response_test.cpp b/src/mongo/db/query/cursor_response_test.cpp index db1a70b6a74..a3bc5449ac7 100644 --- a/src/mongo/db/query/cursor_response_test.cpp +++ b/src/mongo/db/query/cursor_response_test.cpp @@ -300,6 +300,7 @@ TEST(CursorResponseTest, toBSONPartialResultsReturned) { boost::none, boost::none, boost::none, + boost::none, true); BSONObj responseObj = response.toBSON(CursorResponse::ResponseType::InitialResponse); BSONObj expectedResponse = BSON( @@ -347,8 +348,12 @@ TEST(CursorResponseTest, serializePostBatchResumeToken) { std::vector<BSONObj> batch = {BSON("_id" << 1), BSON("_id" << 2)}; auto postBatchResumeToken = ResumeToken::makeHighWaterMarkToken(Timestamp(1, 2)).toDocument().toBson(); - CursorResponse response( - NamespaceString("db.coll"), CursorId(123), batch, boost::none, postBatchResumeToken); + CursorResponse response(NamespaceString("db.coll"), + CursorId(123), + batch, + boost::none, + boost::none, + postBatchResumeToken); auto serialized = response.toBSON(CursorResponse::ResponseType::SubsequentResponse); ASSERT_BSONOBJ_EQ(serialized, BSON("cursor" << BSON("id" << CursorId(123) << "ns" diff --git a/src/mongo/s/cluster_commands_helpers.cpp b/src/mongo/s/cluster_commands_helpers.cpp index 62b1bf834f9..f7f24aa31c2 100644 --- a/src/mongo/s/cluster_commands_helpers.cpp +++ b/src/mongo/s/cluster_commands_helpers.cpp @@ -267,6 +267,7 @@ BSONObj applyReadWriteConcern(OperationContext* opCtx, BSONObjBuilder output; bool seenReadConcern = false; bool seenWriteConcern = false; + const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); for (const auto& elem : cmdObj) { const auto name = elem.fieldNameStringData(); if (appendRC && name == repl::ReadConcernArgs::kReadConcernFieldName) { @@ -276,13 +277,18 @@ BSONObj applyReadWriteConcern(OperationContext* opCtx, seenWriteConcern = true; } if (!output.hasField(name)) { - output.append(elem); + // If mongos selected atClusterTime, forward it to the shard. + if (name == repl::ReadConcernArgs::kReadConcernFieldName && + readConcernArgs.wasAtClusterTimeSelected()) { + output.appendElements(readConcernArgs.toBSON()); + } else { + output.append(elem); + } } } // Finally, add the new read/write concern. if (appendRC && !seenReadConcern) { - const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); output.appendElements(readConcernArgs.toBSON()); } if (appendWC && !seenWriteConcern) { @@ -721,6 +727,12 @@ StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfoForTxnCmd( auto catalogCache = Grid::get(opCtx)->catalogCache(); invariant(catalogCache); + auto argsAtClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime(); + if (argsAtClusterTime) { + return catalogCache->getCollectionRoutingInfoAt( + opCtx, nss, argsAtClusterTime->asTimestamp()); + } + // Return the latest routing table if not running in a transaction with snapshot level read // concern. auto txnRouter = TransactionRouter::get(opCtx); diff --git a/src/mongo/s/commands/cluster_distinct_cmd.cpp b/src/mongo/s/commands/cluster_distinct_cmd.cpp index 0e02d3169a1..32d0ffe4b1e 100644 --- a/src/mongo/s/commands/cluster_distinct_cmd.cpp +++ b/src/mongo/s/commands/cluster_distinct_cmd.cpp @@ -250,6 +250,11 @@ public: } result.appendArray("values", b.obj()); + // If mongos selected atClusterTime or received it from client, transmit it back. + if (repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime()) { + result.append("atClusterTime"_sd, + repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime()->asTimestamp()); + } return true; } diff --git a/src/mongo/s/commands/cluster_find_cmd.cpp b/src/mongo/s/commands/cluster_find_cmd.cpp index 66cdf67b00f..7ebab369391 100644 --- a/src/mongo/s/commands/cluster_find_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_cmd.cpp @@ -230,6 +230,7 @@ public: // Build the response document. CursorResponseBuilder::Options options; options.isInitialResponse = true; + options.atClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime(); CursorResponseBuilder firstBatch(result, options); for (const auto& obj : batch) { firstBatch.append(obj); diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index d1dd14d1a70..6b70018dfd8 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -252,10 +252,21 @@ void execCommandClient(OperationContext* opCtx, } auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); - if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) { - uassert(ErrorCodes::InvalidOptions, - "read concern snapshot is only supported in a multi-statement transaction", - TransactionRouter::get(opCtx)); + if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern && + !TransactionRouter::get(opCtx) && !readConcernArgs.getArgsAtClusterTime()) { + // Select the latest known clusterTime as the atClusterTime for snapshot reads outside + // of transactions. + auto atClusterTime = [&] { + auto latestKnownClusterTime = LogicalClock::get(opCtx)->getClusterTime(); + // If the user passed afterClusterTime, the chosen time must be greater than or + // equal to it. + auto afterClusterTime = readConcernArgs.getArgsAfterClusterTime(); + if (afterClusterTime && *afterClusterTime > latestKnownClusterTime) { + return afterClusterTime->asTimestamp(); + } + return latestKnownClusterTime.asTimestamp(); + }(); + readConcernArgs.setArgsAtClusterTimeForSnapshot(atClusterTime); } // attach tracking @@ -401,12 +412,6 @@ void runCommand(OperationContext* opCtx, return; } - if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) { - uassert(ErrorCodes::InvalidOptions, - "read concern snapshot is not supported with atClusterTime on mongos", - !readConcernArgs.getArgsAtClusterTime()); - } - boost::optional<RouterOperationContextSession> routerSession; try { rpc::readRequestMetadata(opCtx, request.body, command->requiresAuth()); diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index 49ca39fed20..ff930da893d 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -1348,7 +1348,8 @@ DEATH_TEST_REGEX_F( cursors.push_back(makeRemoteCursor( kTestShardIds[0], kTestShardHosts[0], - CursorResponse(kTestNss, 123, {firstCursorResponse}, boost::none, pbrtFirstCursor))); + CursorResponse( + kTestNss, 123, {firstCursorResponse}, boost::none, boost::none, pbrtFirstCursor))); // Create a second cursor whose initial batch has no PBRT. cursors.push_back( makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 456, {}))); @@ -1381,7 +1382,8 @@ DEATH_TEST_REGEX_F(AsyncResultsMergerTest, cursors.push_back(makeRemoteCursor( kTestShardIds[0], kTestShardHosts[0], - CursorResponse(kTestNss, 123, {firstCursorResponse}, boost::none, pbrtFirstCursor))); + CursorResponse( + kTestNss, 123, {firstCursorResponse}, boost::none, boost::none, pbrtFirstCursor))); params.setRemotes(std::move(cursors)); params.setTailableMode(TailableModeEnum::kTailableAndAwaitData); params.setSort(change_stream_constants::kSortSpec); @@ -1409,11 +1411,13 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfRemoteHasLowerPostB cursors.push_back(makeRemoteCursor( kTestShardIds[0], kTestShardHosts[0], - CursorResponse(kTestNss, 123, {firstCursorResponse}, boost::none, pbrtFirstCursor))); + CursorResponse( + kTestNss, 123, {firstCursorResponse}, boost::none, boost::none, pbrtFirstCursor))); auto tooLowPBRT = makePostBatchResumeToken(Timestamp(1, 2)); - cursors.push_back(makeRemoteCursor(kTestShardIds[1], - kTestShardHosts[1], - CursorResponse(kTestNss, 456, {}, boost::none, tooLowPBRT))); + cursors.push_back( + makeRemoteCursor(kTestShardIds[1], + kTestShardHosts[1], + CursorResponse(kTestNss, 456, {}, boost::none, boost::none, tooLowPBRT))); params.setRemotes(std::move(cursors)); params.setTailableMode(TailableModeEnum::kTailableAndAwaitData); params.setSort(change_stream_constants::kSortSpec); @@ -1459,7 +1463,8 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting) << firstDocSortKey.firstElement().String() << "'}]}"); std::vector<BSONObj> batch1 = {firstCursorResponse}; auto firstDoc = batch1.front(); - responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, pbrtFirstCursor); + responses.emplace_back( + kTestNss, CursorId(123), batch1, boost::none, boost::none, pbrtFirstCursor); scheduleNetworkResponses(std::move(responses)); // Should be ready now. @@ -1471,7 +1476,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting) newCursors.push_back( makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], - CursorResponse(kTestNss, 456, {}, boost::none, tooLowPBRT))); + CursorResponse(kTestNss, 456, {}, boost::none, boost::none, tooLowPBRT))); arm->addNewShardCursors(std::move(newCursors)); // Now shouldn't be ready, our guarantee from the new shard isn't sufficiently advanced. @@ -1488,7 +1493,8 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting) << secondDocSortKey.firstElement().String() << "'}]}"); std::vector<BSONObj> batch2 = {secondCursorResponse}; auto secondDoc = batch2.front(); - responses.emplace_back(kTestNss, CursorId(456), batch2, boost::none, pbrtSecondCursor); + responses.emplace_back( + kTestNss, CursorId(456), batch2, boost::none, boost::none, pbrtSecondCursor); scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -1536,7 +1542,8 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting << "', documentKey: {_id: 1}}, $sortKey: [{_data: '" << firstDocSortKey.firstElement().String() << "'}]}"); std::vector<BSONObj> batch1 = {firstCursorResponse}; - responses.emplace_back(kTestNss, CursorId(123), batch1, boost::none, pbrtFirstCursor); + responses.emplace_back( + kTestNss, CursorId(123), batch1, boost::none, boost::none, pbrtFirstCursor); scheduleNetworkResponses(std::move(responses)); // Should be ready now. @@ -1548,7 +1555,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting newCursors.push_back( makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], - CursorResponse(kTestNss, 456, {}, boost::none, tooLowPBRT))); + CursorResponse(kTestNss, 456, {}, boost::none, boost::none, tooLowPBRT))); arm->addNewShardCursors(std::move(newCursors)); // Now shouldn't be ready, our guarantee from the new shard isn't sufficiently advanced. @@ -1566,7 +1573,8 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting std::vector<BSONObj> batch2 = {secondCursorResponse}; // The last observed time should still be later than the first shard, so we can get the data // from it. - responses.emplace_back(kTestNss, CursorId(456), batch2, boost::none, pbrtSecondCursor); + responses.emplace_back( + kTestNss, CursorId(456), batch2, boost::none, boost::none, pbrtSecondCursor); scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -1594,20 +1602,20 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorReturnsHighWaterMarkSortKey) std::vector<RemoteCursor> cursors; // Create three cursors with empty initial batches. Each batch has a PBRT. auto pbrtFirstCursor = makePostBatchResumeToken(Timestamp(1, 5)); - cursors.push_back( - makeRemoteCursor(kTestShardIds[0], - kTestShardHosts[0], - CursorResponse(kTestNss, 123, {}, boost::none, pbrtFirstCursor))); + cursors.push_back(makeRemoteCursor( + kTestShardIds[0], + kTestShardHosts[0], + CursorResponse(kTestNss, 123, {}, boost::none, boost::none, pbrtFirstCursor))); auto pbrtSecondCursor = makePostBatchResumeToken(Timestamp(1, 1)); - cursors.push_back( - makeRemoteCursor(kTestShardIds[1], - kTestShardHosts[1], - CursorResponse(kTestNss, 456, {}, boost::none, pbrtSecondCursor))); + cursors.push_back(makeRemoteCursor( + kTestShardIds[1], + kTestShardHosts[1], + CursorResponse(kTestNss, 456, {}, boost::none, boost::none, pbrtSecondCursor))); auto pbrtThirdCursor = makePostBatchResumeToken(Timestamp(1, 4)); - cursors.push_back( - makeRemoteCursor(kTestShardIds[2], - kTestShardHosts[2], - CursorResponse(kTestNss, 789, {}, boost::none, pbrtThirdCursor))); + cursors.push_back(makeRemoteCursor( + kTestShardIds[2], + kTestShardHosts[2], + CursorResponse(kTestNss, 789, {}, boost::none, boost::none, pbrtThirdCursor))); params.setRemotes(std::move(cursors)); params.setTailableMode(TailableModeEnum::kTailableAndAwaitData); params.setSort(change_stream_constants::kSortSpec); @@ -1625,26 +1633,35 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorReturnsHighWaterMarkSortKey) // each cursor to be updated in-order, so we keep the first and third PBRTs constant. pbrtSecondCursor = makePostBatchResumeToken(Timestamp(1, 3)); std::vector<BSONObj> emptyBatch = {}; - scheduleNetworkResponse({kTestNss, CursorId(123), emptyBatch, boost::none, pbrtFirstCursor}); - scheduleNetworkResponse({kTestNss, CursorId(456), emptyBatch, boost::none, pbrtSecondCursor}); - scheduleNetworkResponse({kTestNss, CursorId(789), emptyBatch, boost::none, pbrtThirdCursor}); + scheduleNetworkResponse( + {kTestNss, CursorId(123), emptyBatch, boost::none, boost::none, pbrtFirstCursor}); + scheduleNetworkResponse( + {kTestNss, CursorId(456), emptyBatch, boost::none, boost::none, pbrtSecondCursor}); + scheduleNetworkResponse( + {kTestNss, CursorId(789), emptyBatch, boost::none, boost::none, pbrtThirdCursor}); ASSERT_BSONOBJ_EQ(arm->getHighWaterMark(), pbrtSecondCursor); ASSERT_FALSE(arm->ready()); // Advance the second cursor again, so that it surpasses the other two. The third cursor becomes // the new high water mark. pbrtSecondCursor = makePostBatchResumeToken(Timestamp(1, 6)); - scheduleNetworkResponse({kTestNss, CursorId(123), emptyBatch, boost::none, pbrtFirstCursor}); - scheduleNetworkResponse({kTestNss, CursorId(456), emptyBatch, boost::none, pbrtSecondCursor}); - scheduleNetworkResponse({kTestNss, CursorId(789), emptyBatch, boost::none, pbrtThirdCursor}); + scheduleNetworkResponse( + {kTestNss, CursorId(123), emptyBatch, boost::none, boost::none, pbrtFirstCursor}); + scheduleNetworkResponse( + {kTestNss, CursorId(456), emptyBatch, boost::none, boost::none, pbrtSecondCursor}); + scheduleNetworkResponse( + {kTestNss, CursorId(789), emptyBatch, boost::none, boost::none, pbrtThirdCursor}); ASSERT_BSONOBJ_EQ(arm->getHighWaterMark(), pbrtThirdCursor); ASSERT_FALSE(arm->ready()); // Advance the third cursor such that the first cursor becomes the high water mark. pbrtThirdCursor = makePostBatchResumeToken(Timestamp(1, 7)); - scheduleNetworkResponse({kTestNss, CursorId(123), emptyBatch, boost::none, pbrtFirstCursor}); - scheduleNetworkResponse({kTestNss, CursorId(456), emptyBatch, boost::none, pbrtSecondCursor}); - scheduleNetworkResponse({kTestNss, CursorId(789), emptyBatch, boost::none, pbrtThirdCursor}); + scheduleNetworkResponse( + {kTestNss, CursorId(123), emptyBatch, boost::none, boost::none, pbrtFirstCursor}); + scheduleNetworkResponse( + {kTestNss, CursorId(456), emptyBatch, boost::none, boost::none, pbrtSecondCursor}); + scheduleNetworkResponse( + {kTestNss, CursorId(789), emptyBatch, boost::none, boost::none, pbrtThirdCursor}); ASSERT_BSONOBJ_EQ(arm->getHighWaterMark(), pbrtFirstCursor); ASSERT_FALSE(arm->ready()); diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index 56d865508c6..6221b1809c4 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -243,7 +243,8 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx, std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging, const PrivilegeVector& privileges) { - ClusterClientCursorParams params(requestedNss, ReadPreferenceSetting::get(opCtx)); + ClusterClientCursorParams params( + requestedNss, ReadPreferenceSetting::get(opCtx), ReadConcernArgs::get(opCtx)); params.originatingCommandObj = CurOp::get(opCtx)->opDescription().getOwned(); params.tailableMode = pipelineForMerging->getContext()->tailableMode; @@ -267,7 +268,7 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx, rpc::OpMsgReplyBuilder replyBuilder; CursorResponseBuilder::Options options; options.isInitialResponse = true; - + options.atClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime(); CursorResponseBuilder responseBuilder(&replyBuilder, options); bool stashedResult = false; diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h index 31d4211570f..6dc1ae8419e 100644 --- a/src/mongo/s/query/cluster_client_cursor.h +++ b/src/mongo/s/query/cluster_client_cursor.h @@ -181,6 +181,11 @@ public: virtual boost::optional<ReadPreferenceSetting> getReadPreference() const = 0; /** + * Returns the readConcern for this cursor. + */ + virtual boost::optional<ReadConcernArgs> getReadConcern() const = 0; + + /** * Returns the creation date of the cursor. */ virtual Date_t getCreatedDate() const = 0; diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp index c6f18579bf1..fefd913f8a7 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp @@ -209,6 +209,10 @@ boost::optional<ReadPreferenceSetting> ClusterClientCursorImpl::getReadPreferenc return _params.readPreference; } +boost::optional<ReadConcernArgs> ClusterClientCursorImpl::getReadConcern() const { + return _params.readConcern; +} + std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan( OperationContext* opCtx, std::shared_ptr<executor::TaskExecutor> executor, diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h index 91a9c4455f3..23f1c351fda 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.h +++ b/src/mongo/s/query/cluster_client_cursor_impl.h @@ -104,6 +104,8 @@ public: boost::optional<ReadPreferenceSetting> getReadPreference() const final; + boost::optional<ReadConcernArgs> getReadConcern() const final; + Date_t getCreatedDate() const final; Date_t getLastUseDate() const final; diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp index 958a909687c..e6f86dccade 100644 --- a/src/mongo/s/query/cluster_client_cursor_mock.cpp +++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp @@ -153,4 +153,8 @@ boost::optional<ReadPreferenceSetting> ClusterClientCursorMock::getReadPreferenc return boost::none; } +boost::optional<ReadConcernArgs> ClusterClientCursorMock::getReadConcern() const { + return boost::none; +} + } // namespace mongo diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h index c86f66315b6..f72551a24da 100644 --- a/src/mongo/s/query/cluster_client_cursor_mock.h +++ b/src/mongo/s/query/cluster_client_cursor_mock.h @@ -92,6 +92,8 @@ public: boost::optional<ReadPreferenceSetting> getReadPreference() const final; + boost::optional<ReadConcernArgs> getReadConcern() const final; + Date_t getCreatedDate() const final; Date_t getLastUseDate() const final; diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h index 24ec074376f..cd6563f842c 100644 --- a/src/mongo/s/query/cluster_client_cursor_params.h +++ b/src/mongo/s/query/cluster_client_cursor_params.h @@ -43,6 +43,7 @@ #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/query/cursor_response.h" #include "mongo/db/query/tailable_mode.h" +#include "mongo/db/repl/read_concern_args.h" #include "mongo/s/client/shard.h" #include "mongo/s/query/async_results_merger_params_gen.h" #include "mongo/util/net/hostandport.h" @@ -55,6 +56,8 @@ class TaskExecutor; class OperationContext; class RouterExecStage; +using repl::ReadConcernArgs; + /** * The resulting ClusterClientCursor will take ownership of the existing remote cursor, generating * results based on the cursor's current state. @@ -65,11 +68,15 @@ class RouterExecStage; */ struct ClusterClientCursorParams { ClusterClientCursorParams(NamespaceString nss, - boost::optional<ReadPreferenceSetting> readPref = boost::none) + boost::optional<ReadPreferenceSetting> readPref = boost::none, + boost::optional<ReadConcernArgs> readConcernArgs = boost::none) : nsString(std::move(nss)) { if (readPref) { readPreference = std::move(readPref.get()); } + if (readConcernArgs) { + readConcern = std::move(readConcernArgs.get()); + } } /** @@ -143,6 +150,9 @@ struct ClusterClientCursorParams { // Set if a readPreference must be respected throughout the lifetime of the cursor. boost::optional<ReadPreferenceSetting> readPreference; + // Set if a readConcern must be respected throughout the lifetime of the cursor. + boost::optional<ReadConcernArgs> readConcern; + // Whether the client indicated that it is willing to receive partial results in the case of an // unreachable host. bool isAllowPartialResults = false; diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index e3bbfb5d995..422c02406da 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -187,6 +187,12 @@ std::vector<std::pair<ShardId, BSONObj>> constructRequestsForShards( qrToForward = std::make_unique<QueryRequest>(query.getQueryRequest()); } + auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); + if (readConcernArgs.wasAtClusterTimeSelected()) { + // If mongos selected atClusterTime or received it from client, transmit it to shard. + qrToForward->setReadConcern(readConcernArgs.toBSONInner()); + } + auto shardRegistry = Grid::get(opCtx)->shardRegistry(); std::vector<std::pair<ShardId, BSONObj>> requests; for (const auto& shardId : shardIds) { @@ -242,7 +248,7 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx, // Construct the query and parameters. Defer setting skip and limit here until // we determine if the query is targeting multi-shards or a single shard below. - ClusterClientCursorParams params(query.nss(), readPref); + ClusterClientCursorParams params(query.nss(), readPref, ReadConcernArgs::get(opCtx)); params.originatingCommandObj = CurOp::get(opCtx)->opDescription().getOwned(); params.batchSize = query.getQueryRequest().getEffectiveBatchSize(); params.tailableMode = query.getQueryRequest().getTailableMode(); @@ -429,6 +435,11 @@ Status setUpOperationContextStateForGetMore(OperationContext* opCtx, ReadPreferenceSetting::get(opCtx) = *readPref; } + if (auto readConcern = cursor->getReadConcern()) { + // Used to return "atClusterTime" in cursor replies to clients for snapshot reads. + ReadConcernArgs::get(opCtx) = *readConcern; + } + // If the originating command had a 'comment' field, we extract it and set it on opCtx. Note // that if the 'getMore' command itself has a 'comment' field, we give precedence to it. auto comment = cursor->getOriginatingCommand()["comment"]; @@ -835,9 +846,12 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, "waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch"); } + auto atClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime(); return CursorResponse(request.nss, idToReturn, std::move(batch), + atClusterTime ? atClusterTime->asTimestamp() + : boost::optional<Timestamp>{}, startingFrom, postBatchResumeToken, boost::none, diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp index 8217de1e2ec..3b0bbf4e85c 100644 --- a/src/mongo/s/query/establish_cursors.cpp +++ b/src/mongo/s/query/establish_cursors.cpp @@ -179,8 +179,9 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx, } // This exception is eligible to be swallowed. Add an entry with a cursorID of 0, an // empty HostAndPort, and which has the 'partialResultsReturned' flag set to true. - remoteCursors.push_back( - {response.shardId.toString(), {}, {nss, CursorId{0}, {}, {}, {}, {}, true}}); + remoteCursors.push_back({response.shardId.toString(), + {}, + {nss, CursorId{0}, {}, {}, {}, {}, {}, true}}); } } return remoteCursors; diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp index 9877af451a2..b76a47cddbe 100644 --- a/src/mongo/s/query/store_possible_cursor.cpp +++ b/src/mongo/s/query/store_possible_cursor.cpp @@ -99,7 +99,8 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, return cmdResult; } - ClusterClientCursorParams params(incomingCursorResponse.getValue().getNSS()); + ClusterClientCursorParams params( + incomingCursorResponse.getValue().getNSS(), boost::none, ReadConcernArgs::get(opCtx)); params.remotes.emplace_back(); auto& remoteCursor = params.remotes.back(); remoteCursor.setShardId(shardId.toString()); @@ -136,8 +137,10 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, CurOp::get(opCtx)->debug().cursorid = clusterCursorId.getValue(); - CursorResponse outgoingCursorResponse( - requestedNss, clusterCursorId.getValue(), incomingCursorResponse.getValue().getBatch()); + CursorResponse outgoingCursorResponse(requestedNss, + clusterCursorId.getValue(), + incomingCursorResponse.getValue().getBatch(), + incomingCursorResponse.getValue().getAtClusterTime()); return outgoingCursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse); } |