diff options
author | Bernard Gorman <bernard.gorman@mongodb.com> | 2019-11-06 01:37:15 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-11-06 01:37:15 +0000 |
commit | a4b84bfd711ab0b2c236831abb11f29590ac4393 (patch) | |
tree | 83ba881af8449af39b7e7794153c15bd19083bf0 /src | |
parent | e2bd86fe3f5118b092067eaab0d944f899389cef (diff) | |
download | mongo-a4b84bfd711ab0b2c236831abb11f29590ac4393.tar.gz |
SERVER-43996 Return a cursor flag to the client if only a partial subset of results are available
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/query/cursor_response.cpp | 28 | ||||
-rw-r--r-- | src/mongo/db/query/cursor_response.h | 15 | ||||
-rw-r--r-- | src/mongo/db/query/cursor_response_test.cpp | 95 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_find_cmd.cpp | 6 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger.cpp | 47 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger.h | 23 | ||||
-rw-r--r-- | src/mongo/s/query/blocking_results_merger.h | 4 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_client_cursor.h | 8 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_client_cursor_impl.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_client_cursor_impl.h | 2 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_client_cursor_mock.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_client_cursor_mock.h | 2 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_find.cpp | 31 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_find.h | 3 | ||||
-rw-r--r-- | src/mongo/s/query/establish_cursors.cpp | 29 | ||||
-rw-r--r-- | src/mongo/s/query/establish_cursors_test.cpp | 22 | ||||
-rw-r--r-- | src/mongo/s/query/router_exec_stage.h | 8 | ||||
-rw-r--r-- | src/mongo/s/query/router_stage_merge.h | 4 |
18 files changed, 287 insertions, 48 deletions
diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp index 8cb8a063e1f..8063c51ac7e 100644 --- a/src/mongo/db/query/cursor_response.cpp +++ b/src/mongo/db/query/cursor_response.cpp @@ -49,6 +49,7 @@ const char kBatchFieldInitial[] = "firstBatch"; const char kBatchDocSequenceField[] = "cursor.nextBatch"; const char kBatchDocSequenceFieldInitial[] = "cursor.firstBatch"; const char kPostBatchResumeTokenField[] = "postBatchResumeToken"; +const char kPartialResultsReturnedField[] = "partialResultsReturned"; } // namespace @@ -78,6 +79,9 @@ void CursorResponseBuilder::done(CursorId cursorId, StringData cursorNamespace) if (!_postBatchResumeToken.isEmpty()) { _cursorObject->append(kPostBatchResumeTokenField, _postBatchResumeToken); } + if (_partialResultsReturned) { + _cursorObject->append(kPartialResultsReturnedField, true); + } _cursorObject->append(kIdField, cursorId); _cursorObject->append(kNsField, cursorNamespace); _cursorObject.reset(); @@ -123,13 +127,15 @@ CursorResponse::CursorResponse(NamespaceString nss, std::vector<BSONObj> batch, boost::optional<long long> numReturnedSoFar, boost::optional<BSONObj> postBatchResumeToken, - boost::optional<BSONObj> writeConcernError) + boost::optional<BSONObj> writeConcernError, + bool partialResultsReturned) : _nss(std::move(nss)), _cursorId(cursorId), _batch(std::move(batch)), _numReturnedSoFar(numReturnedSoFar), _postBatchResumeToken(std::move(postBatchResumeToken)), - _writeConcernError(std::move(writeConcernError)) {} + _writeConcernError(std::move(writeConcernError)), + _partialResultsReturned(partialResultsReturned) {} std::vector<StatusWith<CursorResponse>> CursorResponse::parseFromBSONMany( const BSONObj& cmdResponse) { @@ -225,6 +231,17 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo << postBatchResumeTokenElem.type()}; } + auto partialResultsReturned = cursorObj[kPartialResultsReturnedField]; + + if (partialResultsReturned) { + if (partialResultsReturned.type() != BSONType::Bool) { + return {ErrorCodes::BadValue, + str::stream() << kPartialResultsReturnedField + << " format is invalid; expected Bool, but found: " + << partialResultsReturned.type()}; + } + } + auto writeConcernError = cmdResponse["writeConcernError"]; if (writeConcernError && writeConcernError.type() != BSONType::Object) { @@ -239,7 +256,8 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo boost::none, postBatchResumeTokenElem ? postBatchResumeTokenElem.Obj().getOwned() : boost::optional<BSONObj>{}, - writeConcernError ? writeConcernError.Obj().getOwned() : boost::optional<BSONObj>{}}}; + writeConcernError ? writeConcernError.Obj().getOwned() : boost::optional<BSONObj>{}, + partialResultsReturned.trueValue()}}; } void CursorResponse::addToBSON(CursorResponse::ResponseType responseType, @@ -261,6 +279,10 @@ void CursorResponse::addToBSON(CursorResponse::ResponseType responseType, cursorBuilder.append(kPostBatchResumeTokenField, *_postBatchResumeToken); } + if (_partialResultsReturned) { + cursorBuilder.append(kPartialResultsReturnedField, true); + } + cursorBuilder.doneFast(); builder->append("ok", 1.0); diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h index a92750dfeee..509600103e4 100644 --- a/src/mongo/db/query/cursor_response.h +++ b/src/mongo/db/query/cursor_response.h @@ -49,7 +49,7 @@ class CursorResponseBuilder { public: /** - * Structure used to confiugre the CursorResponseBuilder. + * Structure used to configure the CursorResponseBuilder. */ struct Options { bool isInitialResponse = false; @@ -91,6 +91,10 @@ public: _postBatchResumeToken = token.getOwned(); } + void setPartialResultsReturned(bool partialResults) { + _partialResultsReturned = partialResults; + } + long long numDocs() const { return _numDocs; } @@ -120,6 +124,7 @@ private: bool _active = true; long long _numDocs = 0; BSONObj _postBatchResumeToken; + bool _partialResultsReturned = false; }; /** @@ -188,7 +193,8 @@ public: std::vector<BSONObj> batch, boost::optional<long long> numReturnedSoFar = boost::none, boost::optional<BSONObj> postBatchResumeToken = boost::none, - boost::optional<BSONObj> writeConcernError = boost::none); + boost::optional<BSONObj> writeConcernError = boost::none, + bool partialResultsReturned = false); CursorResponse(CursorResponse&& other) = default; CursorResponse& operator=(CursorResponse&& other) = default; @@ -225,6 +231,10 @@ public: return _writeConcernError; } + bool getPartialResultsReturned() const { + return _partialResultsReturned; + } + /** * Converts this response to its raw BSON representation. */ @@ -241,6 +251,7 @@ private: boost::optional<long long> _numReturnedSoFar; boost::optional<BSONObj> _postBatchResumeToken; boost::optional<BSONObj> _writeConcernError; + bool _partialResultsReturned = false; }; } // namespace mongo diff --git a/src/mongo/db/query/cursor_response_test.cpp b/src/mongo/db/query/cursor_response_test.cpp index ecb5d7570b6..db1a70b6a74 100644 --- a/src/mongo/db/query/cursor_response_test.cpp +++ b/src/mongo/db/query/cursor_response_test.cpp @@ -183,6 +183,82 @@ TEST(CursorResponseTest, parseFromBSONOkFieldMissing) { ASSERT_NOT_OK(result.getStatus()); } +TEST(CursorResponseTest, parseFromBSONPartialResultsReturnedField) { + StatusWith<CursorResponse> result = CursorResponse::parseFromBSON(BSON( + "cursor" << BSON("id" << CursorId(123) << "ns" + << "db.coll" + << "firstBatch" << BSON_ARRAY(BSON("_id" << 1) << BSON("_id" << 2)) + << "partialResultsReturned" << true) + << "ok" << 1)); + ASSERT_OK(result.getStatus()); + + CursorResponse response = std::move(result.getValue()); + ASSERT_EQ(response.getCursorId(), CursorId(123)); + ASSERT_EQ(response.getNSS().ns(), "db.coll"); + ASSERT_EQ(response.getBatch().size(), 2U); + ASSERT_BSONOBJ_EQ(response.getBatch()[0], BSON("_id" << 1)); + ASSERT_BSONOBJ_EQ(response.getBatch()[1], BSON("_id" << 2)); + ASSERT_EQ(response.getPartialResultsReturned(), true); +} + +TEST(CursorResponseTest, parseFromBSONPartialResultsReturnedFieldWrongType) { + StatusWith<CursorResponse> result = CursorResponse::parseFromBSON(BSON( + "cursor" << BSON("id" << CursorId(123) << "ns" + << "db.coll" + << "firstBatch" << BSON_ARRAY(BSON("_id" << 1) << BSON("_id" << 2)) + << "partialResultsReturned" << 1) + << "ok" << 1)); + ASSERT_NOT_OK(result.getStatus()); +} + +TEST(CursorResponseTest, roundTripThroughCursorResponseBuilderWithPartialResultsReturned) { + CursorResponseBuilder::Options options; + options.isInitialResponse = true; + rpc::OpMsgReplyBuilder builder; + BSONObj okStatus = BSON("ok" << 1); + BSONObj testDoc = BSON("_id" << 1); + BSONObj expectedBody = + BSON("cursor" << BSON("firstBatch" << BSON_ARRAY(testDoc) << "partialResultsReturned" + << true << "id" << CursorId(123) << "ns" + << "db.coll")); + + // Use CursorResponseBuilder to serialize the cursor response to OpMsgReplyBuilder. + CursorResponseBuilder crb(&builder, options); + crb.append(testDoc); + crb.setPartialResultsReturned(true); + crb.done(CursorId(123), "db.coll"); + + // Confirm that the resulting BSONObj response matches the expected body. + auto msg = builder.done(); + auto opMsg = OpMsg::parse(msg); + ASSERT_BSONOBJ_EQ(expectedBody, opMsg.body); + + // Append {"ok": 1} to the opMsg body so that it can be parsed by CursorResponse. + auto swCursorResponse = CursorResponse::parseFromBSON(opMsg.body.addField(okStatus["ok"])); + ASSERT_OK(swCursorResponse.getStatus()); + + // Confirm the CursorReponse parsed from CursorResponseBuilder output has the correct content. + CursorResponse response = std::move(swCursorResponse.getValue()); + ASSERT_EQ(response.getCursorId(), CursorId(123)); + ASSERT_EQ(response.getNSS().ns(), "db.coll"); + ASSERT_EQ(response.getBatch().size(), 1U); + ASSERT_BSONOBJ_EQ(response.getBatch()[0], testDoc); + ASSERT_EQ(response.getPartialResultsReturned(), true); + + // Re-serialize a BSONObj response from the CursorResponse. + auto cursorResBSON = response.toBSONAsInitialResponse(); + + // Confirm that the BSON serialized by the CursorResponse is the same as that serialized by the + // CursorResponseBuilder. Field ordering differs between the two, so compare per-element. + BSONObjIteratorSorted cursorResIt(cursorResBSON["cursor"].Obj()); + BSONObjIteratorSorted cursorBuilderIt(opMsg.body["cursor"].Obj()); + while (cursorResIt.more()) { + ASSERT(cursorBuilderIt.more()); + ASSERT_EQ(cursorResIt.next().woCompare(cursorBuilderIt.next()), 0); + } + ASSERT(!cursorBuilderIt.more()); +} + TEST(CursorResponseTest, parseFromBSONHandleErrorResponse) { StatusWith<CursorResponse> result = CursorResponse::parseFromBSON(BSON("ok" << 0 << "code" << 123 << "errmsg" @@ -216,6 +292,25 @@ TEST(CursorResponseTest, toBSONSubsequentResponse) { ASSERT_BSONOBJ_EQ(responseObj, expectedResponse); } +TEST(CursorResponseTest, toBSONPartialResultsReturned) { + std::vector<BSONObj> batch = {BSON("_id" << 1), BSON("_id" << 2)}; + CursorResponse response(NamespaceString("testdb.testcoll"), + CursorId(123), + batch, + boost::none, + boost::none, + boost::none, + true); + BSONObj responseObj = response.toBSON(CursorResponse::ResponseType::InitialResponse); + BSONObj expectedResponse = BSON( + "cursor" << BSON("id" << CursorId(123) << "ns" + << "testdb.testcoll" + << "firstBatch" << BSON_ARRAY(BSON("_id" << 1) << BSON("_id" << 2)) + << "partialResultsReturned" << true) + << "ok" << 1.0); + ASSERT_BSONOBJ_EQ(responseObj, expectedResponse); +} + TEST(CursorResponseTest, addToBSONInitialResponse) { std::vector<BSONObj> batch = {BSON("_id" << 1), BSON("_id" << 2)}; CursorResponse response(NamespaceString("testdb.testcoll"), CursorId(123), batch); diff --git a/src/mongo/s/commands/cluster_find_cmd.cpp b/src/mongo/s/commands/cluster_find_cmd.cpp index 091cac8879b..36d15ca6ce8 100644 --- a/src/mongo/s/commands/cluster_find_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_cmd.cpp @@ -213,9 +213,10 @@ public: try { // Do the work to generate the first batch of results. This blocks waiting to get // responses from the shard(s). + bool partialResultsReturned = false; std::vector<BSONObj> batch; - auto cursorId = - ClusterFind::runQuery(opCtx, *cq, ReadPreferenceSetting::get(opCtx), &batch); + auto cursorId = ClusterFind::runQuery( + opCtx, *cq, ReadPreferenceSetting::get(opCtx), &batch, &partialResultsReturned); // Build the response document. CursorResponseBuilder::Options options; @@ -224,6 +225,7 @@ public: for (const auto& obj : batch) { firstBatch.append(obj); } + firstBatch.setPartialResultsReturned(partialResultsReturned); firstBatch.done(cursorId, cq->ns()); } catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>& ex) { result->reset(); diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 51bbcf84db4..0cb80cf3e42 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -102,7 +102,11 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx, for (const auto& remote : _params.getRemotes()) { _remotes.emplace_back(remote.getHostAndPort(), remote.getCursorResponse().getNSS(), - remote.getCursorResponse().getCursorId()); + remote.getCursorResponse().getCursorId(), + remote.getCursorResponse().getPartialResultsReturned()); + + // A remote cannot be flagged as 'partialResultsReturned' if 'allowPartialResults' is false. + invariant(!(_remotes.back().partialResultsReturned && !_params.getAllowPartialResults())); // We don't check the return value of _addBatchToBuffer here; if there was an error, // it will be stored in the remote and the first call to ready() will return true. @@ -183,11 +187,33 @@ void AsyncResultsMerger::addNewShardCursors(std::vector<RemoteCursor>&& newCurso const auto newIndex = _remotes.size(); _remotes.emplace_back(remote.getHostAndPort(), remote.getCursorResponse().getNSS(), - remote.getCursorResponse().getCursorId()); + remote.getCursorResponse().getCursorId(), + remote.getCursorResponse().getPartialResultsReturned()); _addBatchToBuffer(lk, newIndex, remote.getCursorResponse()); } } +bool AsyncResultsMerger::partialResultsReturned() const { + stdx::lock_guard<Latch> lk(_mutex); + return std::any_of(_remotes.begin(), _remotes.end(), [](const auto& remote) { + return remote.partialResultsReturned; + }); +} + +std::size_t AsyncResultsMerger::getNumRemotes() const { + // Take the lock to guard against shard additions or disconnections. + stdx::lock_guard<Latch> lk(_mutex); + + // If 'allowPartialResults' is false, the number of participating remotes is constant. + if (!_params.getAllowPartialResults()) { + return _remotes.size(); + } + // Otherwise, discount remotes which failed to connect or disconnected prematurely. + return std::count_if(_remotes.begin(), _remotes.end(), [](const auto& remote) { + return !remote.partialResultsReturned; + }); +} + BSONObj AsyncResultsMerger::getHighWaterMark() { stdx::lock_guard<Latch> lk(_mutex); auto minPromisedSortKey = _getMinPromisedSortKey(lk); @@ -579,14 +605,14 @@ void AsyncResultsMerger::_cleanUpFailedBatch(WithLock lk, Status status, size_t // // The ExchangePassthrough error code is an internal-only error code used specifically to // communicate that an error has occurred, but some other thread is responsible for returning - // the error to the user. In order to avoid polluting the user's error message, we ingore such + // the error to the user. In order to avoid polluting the user's error message, we ignore such // errors with the expectation that all outstanding cursors will be closed promptly. if (_params.getAllowPartialResults() || remote.status == ErrorCodes::ExchangePassthrough) { - remote.status = Status::OK(); - - // Clear the results buffer and cursor id. + // Clear the results buffer and cursor id, and set 'partialResultsReturned' if appropriate. + remote.partialResultsReturned = (remote.status != ErrorCodes::ExchangePassthrough); std::queue<ClusterQueryResult> emptyBuffer; std::swap(remote.docBuffer, emptyBuffer); + remote.status = Status::OK(); remote.cursorId = 0; } } @@ -758,10 +784,15 @@ executor::TaskExecutor::EventHandle AsyncResultsMerger::kill(OperationContext* o AsyncResultsMerger::RemoteCursorData::RemoteCursorData(HostAndPort hostAndPort, NamespaceString cursorNss, - CursorId establishedCursorId) + CursorId establishedCursorId, + bool partialResultsReturned) : cursorId(establishedCursorId), cursorNss(std::move(cursorNss)), - shardHostAndPort(std::move(hostAndPort)) {} + shardHostAndPort(std::move(hostAndPort)), + partialResultsReturned(partialResultsReturned) { + // If the 'partialResultsReturned' flag is set, the cursorId must be zero (closed). + invariant(!(partialResultsReturned && cursorId != 0)); +} const HostAndPort& AsyncResultsMerger::RemoteCursorData::getTargetHost() const { return shardHostAndPort; diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h index e0e0f2e94c1..05c0ee6c1f4 100644 --- a/src/mongo/s/query/async_results_merger.h +++ b/src/mongo/s/query/async_results_merger.h @@ -210,9 +210,16 @@ public: */ void addNewShardCursors(std::vector<RemoteCursor>&& newCursors); - std::size_t getNumRemotes() const { - return _remotes.size(); - } + /** + * Returns true if the cursor was opened with 'allowPartialResults:true' and results are not + * available from one or more shards. + */ + bool partialResultsReturned() const; + + /** + * Returns the number of remotes involved in this operation. + */ + std::size_t getNumRemotes() const; /** * For sorted tailable cursors, returns the most recent available sort key. This guarantees that @@ -250,7 +257,8 @@ private: struct RemoteCursorData { RemoteCursorData(HostAndPort hostAndPort, NamespaceString cursorNss, - CursorId establishedCursorId); + CursorId establishedCursorId, + bool partialResultsReturned); /** * Returns the resolved host and port on which the remote cursor resides. @@ -283,12 +291,17 @@ private: // the operation if there is a view. NamespaceString cursorNss; - // The exact host in the shard on which the cursor resides. + // The exact host in the shard on which the cursor resides. Can be empty if this merger has + // 'allowPartialResults' set to true and initial cursor establishment failed on this shard. HostAndPort shardHostAndPort; // The identity of the shard which the cursor belongs to. ShardId shardId; + // This flag is set if the connection to the remote shard was lost, or never established in + // the first place. Only applicable if the 'allowPartialResults' option is enabled. + bool partialResultsReturned = false; + // The buffer of results that have been retrieved but not yet returned to the caller. std::queue<ClusterQueryResult> docBuffer; diff --git a/src/mongo/s/query/blocking_results_merger.h b/src/mongo/s/query/blocking_results_merger.h index 563a1e36010..8ac29a6d803 100644 --- a/src/mongo/s/query/blocking_results_merger.h +++ b/src/mongo/s/query/blocking_results_merger.h @@ -68,6 +68,10 @@ public: return _arm.remotesExhausted(); } + bool partialResultsReturned() const { + return _arm.partialResultsReturned(); + } + std::size_t getNumRemotes() const { return _arm.getNumRemotes(); } diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h index f68ab207f74..31d4211570f 100644 --- a/src/mongo/s/query/cluster_client_cursor.h +++ b/src/mongo/s/query/cluster_client_cursor.h @@ -119,7 +119,13 @@ public: void getOriginatingPrivileges() && = delete; /** - * Returns a reference to the vector of remote hosts involved in this operation. + * Returns true if the cursor was opened with 'allowPartialResults:true' and results are not + * available from one or more shards. + */ + virtual bool partialResultsReturned() const = 0; + + /** + * Returns the number of remote hosts involved in this operation. */ virtual std::size_t getNumRemotes() const = 0; diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp index 19ab85886e1..49d7f8c2e76 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp @@ -145,6 +145,10 @@ const PrivilegeVector& ClusterClientCursorImpl::getOriginatingPrivileges() const return _params.originatingPrivileges; } +bool ClusterClientCursorImpl::partialResultsReturned() const { + return _root->partialResultsReturned(); +} + std::size_t ClusterClientCursorImpl::getNumRemotes() const { return _root->getNumRemotes(); } diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h index daf73e58172..91a9c4455f3 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.h +++ b/src/mongo/s/query/cluster_client_cursor_impl.h @@ -84,6 +84,8 @@ public: const PrivilegeVector& getOriginatingPrivileges() const& final; void getOriginatingPrivileges() && = delete; + bool partialResultsReturned() const final; + std::size_t getNumRemotes() const final; BSONObj getPostBatchResumeToken() const final; diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp index e0fcb5c9b59..78c02246301 100644 --- a/src/mongo/s/query/cluster_client_cursor_mock.cpp +++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp @@ -74,6 +74,10 @@ const PrivilegeVector& ClusterClientCursorMock::getOriginatingPrivileges() const return _originatingPrivileges; } +bool ClusterClientCursorMock::partialResultsReturned() const { + MONGO_UNREACHABLE; +} + std::size_t ClusterClientCursorMock::getNumRemotes() const { MONGO_UNREACHABLE; } diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h index 065a9a9211a..c86f66315b6 100644 --- a/src/mongo/s/query/cluster_client_cursor_mock.h +++ b/src/mongo/s/query/cluster_client_cursor_mock.h @@ -74,6 +74,8 @@ public: const PrivilegeVector& getOriginatingPrivileges() const& final; void getOriginatingPrivileges() && = delete; + bool partialResultsReturned() const final; + std::size_t getNumRemotes() const final; BSONObj getPostBatchResumeToken() const final; diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index f5e9b0609d1..013f91900f8 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -238,7 +238,8 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx, const CanonicalQuery& query, const ReadPreferenceSetting& readPref, const CachedCollectionRoutingInfo& routingInfo, - std::vector<BSONObj>* results) { + std::vector<BSONObj>* results, + bool* partialResultsReturned) { // Get the set of shards on which we will run the query. auto shardIds = getTargetedShardsForQuery(opCtx, routingInfo, @@ -299,7 +300,6 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx, opCtx, routingInfo, shardIds, query, appendGeoNearDistanceProjection); // Establish the cursors with a consistent shardVersion across shards. - params.remotes = establishCursors(opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), query.nss(), @@ -367,6 +367,11 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx, CurOp::get(opCtx)->debug().nShards = ccc->getNumRemotes(); CurOp::get(opCtx)->debug().nreturned = results->size(); + // If the caller wants to know whether the cursor returned partial results, set it here. + if (partialResultsReturned) { + *partialResultsReturned = ccc->partialResultsReturned(); + } + // If the cursor is exhausted, then there are no more results to return and we don't need to // allocate a cursor id. if (cursorState == ClusterCursorManager::CursorState::Exhausted) { @@ -435,7 +440,14 @@ const size_t ClusterFind::kMaxRetries = 10; CursorId ClusterFind::runQuery(OperationContext* opCtx, const CanonicalQuery& query, const ReadPreferenceSetting& readPref, - std::vector<BSONObj>* results) { + std::vector<BSONObj>* results, + bool* partialResultsReturned) { + // If the user supplied a 'partialResultsReturned' out-parameter, default it to false here. + if (partialResultsReturned) { + *partialResultsReturned = false; + } + + // We must always have a BSONObj vector into which to output our results. invariant(results); // Projection on the reserved sort key field is illegal in mongos. @@ -461,7 +473,8 @@ CursorId ClusterFind::runQuery(OperationContext* opCtx, auto routingInfo = uassertStatusOK(routingInfoStatus); try { - return runQueryWithoutRetrying(opCtx, query, readPref, routingInfo, results); + return runQueryWithoutRetrying( + opCtx, query, readPref, routingInfo, results, partialResultsReturned); } catch (ExceptionFor<ErrorCodes::StaleDbVersion>& ex) { if (retries >= kMaxRetries) { // Check if there are no retries remaining, so the last received error can be @@ -731,6 +744,7 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, postBatchResumeToken = pinnedCursor.getValue()->getPostBatchResumeToken(); } + const bool partialResultsReturned = pinnedCursor.getValue()->partialResultsReturned(); pinnedCursor.getValue()->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros()); pinnedCursor.getValue()->incNBatches(); // Upon successful completion, transfer ownership of the cursor back to the cursor manager. If @@ -748,8 +762,13 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, "waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch"); } - return CursorResponse( - request.nss, idToReturn, std::move(batch), startingFrom, postBatchResumeToken); + return CursorResponse(request.nss, + idToReturn, + std::move(batch), + startingFrom, + postBatchResumeToken, + boost::none, + partialResultsReturned); } } // namespace mongo diff --git a/src/mongo/s/query/cluster_find.h b/src/mongo/s/query/cluster_find.h index 0916ecf810a..89ab0d24731 100644 --- a/src/mongo/s/query/cluster_find.h +++ b/src/mongo/s/query/cluster_find.h @@ -64,7 +64,8 @@ public: static CursorId runQuery(OperationContext* opCtx, const CanonicalQuery& query, const ReadPreferenceSetting& readPref, - std::vector<BSONObj>* results); + std::vector<BSONObj>* results, + bool* partialResultsReturned = nullptr); /** * Executes the getMore request 'request', and on success returns a CursorResponse. diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp index 829c38246d1..1b4c1f19a22 100644 --- a/src/mongo/s/query/establish_cursors.cpp +++ b/src/mongo/s/query/establish_cursors.cpp @@ -70,8 +70,8 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx, try { // Get the responses while (!ars.done()) { + auto response = ars.next(); try { - auto response = ars.next(); // Note the shardHostAndPort may not be populated if there was an error, so be sure // to do this after parsing the cursor response to ensure the response was ok. // Additionally, be careful not to push into 'remoteCursors' until we are sure we @@ -95,21 +95,20 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx, uassertStatusOK(cursor.getStatus()); } - } catch (const ExceptionForCat<ErrorCategory::RetriableError>&) { - // Retriable errors are swallowed if 'allowPartialResults' is true. - if (allowPartialResults) { - continue; - } - throw; // Fail this loop. - } catch (const ExceptionFor<ErrorCodes::FailedToSatisfyReadPreference>&) { - // The errors marked as retriable errors are meant to correspond to the driver's - // spec (see SERVER-42908), but targeting a replica set shard can fail with - // FailedToSatisfyReadPreference, which is not a retriable error in the driver's - // spec, so we swallow it separately here if allowPartialResults is true. - if (allowPartialResults) { - continue; + } catch (const AssertionException& ex) { + // Retriable errors are swallowed if 'allowPartialResults' is true. Targeting shard + // replica sets can also throw FailedToSatisfyReadPreference, so we swallow it too. + bool isEligibleException = (ex.isA<ErrorCategory::RetriableError>() || + ex.code() == ErrorCodes::FailedToSatisfyReadPreference); + // Fail if the exception is something other than a retriable or read preference + // error, or if the 'allowPartialResults' query parameter was not enabled. + if (!allowPartialResults || !isEligibleException) { + throw; } - throw; // Fail this loop. + // This exception is eligible to be swallowed. Add an entry with a cursorID of 0, an + // empty HostAndPort, and which has the 'partialResultsReturned' flag set to true. + remoteCursors.push_back( + {response.shardId.toString(), {}, {nss, CursorId{0}, {}, {}, {}, {}, true}}); } } return remoteCursors; diff --git a/src/mongo/s/query/establish_cursors_test.cpp b/src/mongo/s/query/establish_cursors_test.cpp index c94ed902268..389417a76ca 100644 --- a/src/mongo/s/query/establish_cursors_test.cpp +++ b/src/mongo/s/query/establish_cursors_test.cpp @@ -275,9 +275,13 @@ TEST_F(EstablishCursorsTest, SingleRemoteMaxesOutRetriableErrorsAllowPartialResu true); // allowPartialResults // Failure to establish a cursor due to maxing out retriable errors on one remote (in this - // case, the only remote) was ignored, since allowPartialResults is true, and one less - // cursor was established. - ASSERT_EQUALS(remotes.size() - 1, cursors.size()); + // case, the only remote) was ignored, since allowPartialResults is true. The cursor entry + // is marked as 'partialResultReturned:true', with a CursorId of 0 and no HostAndPort. + ASSERT_EQ(cursors.size(), 1); + ASSERT(cursors.front().getHostAndPort().empty()); + ASSERT_EQ(cursors.front().getShardId(), kTestShardIds[0]); + ASSERT(cursors.front().getCursorResponse().getPartialResultsReturned()); + ASSERT_EQ(cursors.front().getCursorResponse().getCursorId(), CursorId{0}); }); // Remote repeatedly responds with retriable errors. @@ -571,8 +575,16 @@ TEST_F(EstablishCursorsTest, MultipleRemotesOneRemoteMaxesOutRetriableErrorsAllo remotes, true); // allowPartialResults // Failure to establish a cursor due to maxing out retriable errors on one remote was - // ignored, since allowPartialResults is true, and one less cursor was established. - ASSERT_EQUALS(remotes.size() - 1, cursors.size()); + // ignored, since allowPartialResults is true. The cursor entry for that shard is marked + // 'partialResultReturned:true', with a CursorId of 0 and no HostAndPort. + ASSERT_EQ(remotes.size(), cursors.size()); + for (auto&& cursor : cursors) { + const bool isMaxedOutShard = (cursor.getShardId() == kTestShardIds[1]); + ASSERT_EQ(cursor.getHostAndPort().empty(), isMaxedOutShard); + ASSERT_EQ(cursor.getCursorResponse().getPartialResultsReturned(), isMaxedOutShard); + ASSERT(isMaxedOutShard ? cursor.getCursorResponse().getCursorId() == CursorId{0} + : cursor.getCursorResponse().getCursorId() > CursorId{0}); + } }); // First remote responds with success. diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h index 5b611623ea9..7c5ee7aefb6 100644 --- a/src/mongo/s/query/router_exec_stage.h +++ b/src/mongo/s/query/router_exec_stage.h @@ -90,6 +90,14 @@ public: } /** + * Returns true if only a subset of the all relevant results are being returned by this cursor. + * Only applicable if the 'allowPartialResults' option was enabled in the query request. + */ + virtual bool partialResultsReturned() const { + return _child ? _child->partialResultsReturned() : false; + } + + /** * Returns the number of remote hosts involved in this execution plan. */ virtual std::size_t getNumRemotes() const { diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h index a7cead17d89..0b088f12cbd 100644 --- a/src/mongo/s/query/router_stage_merge.h +++ b/src/mongo/s/query/router_stage_merge.h @@ -63,6 +63,10 @@ public: return _resultsMerger.remotesExhausted(); } + bool partialResultsReturned() const final { + return _resultsMerger.partialResultsReturned(); + } + std::size_t getNumRemotes() const final { return _resultsMerger.getNumRemotes(); } |