summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
author: Bernard Gorman <bernard.gorman@mongodb.com> 2019-11-06 01:37:15 +0000
committer: evergreen <evergreen@mongodb.com> 2019-11-06 01:37:15 +0000
commit: a4b84bfd711ab0b2c236831abb11f29590ac4393 (patch)
tree: 83ba881af8449af39b7e7794153c15bd19083bf0 /src
parent: e2bd86fe3f5118b092067eaab0d944f899389cef (diff)
download: mongo-a4b84bfd711ab0b2c236831abb11f29590ac4393.tar.gz
SERVER-43996 Return a cursor flag to the client if only a partial subset of results are available
Diffstat (limited to 'src')
-rw-r--r-- src/mongo/db/query/cursor_response.cpp 28
-rw-r--r-- src/mongo/db/query/cursor_response.h 15
-rw-r--r-- src/mongo/db/query/cursor_response_test.cpp 95
-rw-r--r-- src/mongo/s/commands/cluster_find_cmd.cpp 6
-rw-r--r-- src/mongo/s/query/async_results_merger.cpp 47
-rw-r--r-- src/mongo/s/query/async_results_merger.h 23
-rw-r--r-- src/mongo/s/query/blocking_results_merger.h 4
-rw-r--r-- src/mongo/s/query/cluster_client_cursor.h 8
-rw-r--r-- src/mongo/s/query/cluster_client_cursor_impl.cpp 4
-rw-r--r-- src/mongo/s/query/cluster_client_cursor_impl.h 2
-rw-r--r-- src/mongo/s/query/cluster_client_cursor_mock.cpp 4
-rw-r--r-- src/mongo/s/query/cluster_client_cursor_mock.h 2
-rw-r--r-- src/mongo/s/query/cluster_find.cpp 31
-rw-r--r-- src/mongo/s/query/cluster_find.h 3
-rw-r--r-- src/mongo/s/query/establish_cursors.cpp 29
-rw-r--r-- src/mongo/s/query/establish_cursors_test.cpp 22
-rw-r--r-- src/mongo/s/query/router_exec_stage.h 8
-rw-r--r-- src/mongo/s/query/router_stage_merge.h 4
18 files changed, 287 insertions, 48 deletions
diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp
index 8cb8a063e1f..8063c51ac7e 100644
--- a/src/mongo/db/query/cursor_response.cpp
+++ b/src/mongo/db/query/cursor_response.cpp
@@ -49,6 +49,7 @@ const char kBatchFieldInitial[] = "firstBatch";
const char kBatchDocSequenceField[] = "cursor.nextBatch";
const char kBatchDocSequenceFieldInitial[] = "cursor.firstBatch";
const char kPostBatchResumeTokenField[] = "postBatchResumeToken";
+const char kPartialResultsReturnedField[] = "partialResultsReturned";
} // namespace
@@ -78,6 +79,9 @@ void CursorResponseBuilder::done(CursorId cursorId, StringData cursorNamespace)
if (!_postBatchResumeToken.isEmpty()) {
_cursorObject->append(kPostBatchResumeTokenField, _postBatchResumeToken);
}
+ if (_partialResultsReturned) {
+ _cursorObject->append(kPartialResultsReturnedField, true);
+ }
_cursorObject->append(kIdField, cursorId);
_cursorObject->append(kNsField, cursorNamespace);
_cursorObject.reset();
@@ -123,13 +127,15 @@ CursorResponse::CursorResponse(NamespaceString nss,
std::vector<BSONObj> batch,
boost::optional<long long> numReturnedSoFar,
boost::optional<BSONObj> postBatchResumeToken,
- boost::optional<BSONObj> writeConcernError)
+ boost::optional<BSONObj> writeConcernError,
+ bool partialResultsReturned)
: _nss(std::move(nss)),
_cursorId(cursorId),
_batch(std::move(batch)),
_numReturnedSoFar(numReturnedSoFar),
_postBatchResumeToken(std::move(postBatchResumeToken)),
- _writeConcernError(std::move(writeConcernError)) {}
+ _writeConcernError(std::move(writeConcernError)),
+ _partialResultsReturned(partialResultsReturned) {}
std::vector<StatusWith<CursorResponse>> CursorResponse::parseFromBSONMany(
const BSONObj& cmdResponse) {
@@ -225,6 +231,17 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo
<< postBatchResumeTokenElem.type()};
}
+ auto partialResultsReturned = cursorObj[kPartialResultsReturnedField];
+
+ if (partialResultsReturned) {
+ if (partialResultsReturned.type() != BSONType::Bool) {
+ return {ErrorCodes::BadValue,
+ str::stream() << kPartialResultsReturnedField
+ << " format is invalid; expected Bool, but found: "
+ << partialResultsReturned.type()};
+ }
+ }
+
auto writeConcernError = cmdResponse["writeConcernError"];
if (writeConcernError && writeConcernError.type() != BSONType::Object) {
@@ -239,7 +256,8 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo
boost::none,
postBatchResumeTokenElem ? postBatchResumeTokenElem.Obj().getOwned()
: boost::optional<BSONObj>{},
- writeConcernError ? writeConcernError.Obj().getOwned() : boost::optional<BSONObj>{}}};
+ writeConcernError ? writeConcernError.Obj().getOwned() : boost::optional<BSONObj>{},
+ partialResultsReturned.trueValue()}};
}
void CursorResponse::addToBSON(CursorResponse::ResponseType responseType,
@@ -261,6 +279,10 @@ void CursorResponse::addToBSON(CursorResponse::ResponseType responseType,
cursorBuilder.append(kPostBatchResumeTokenField, *_postBatchResumeToken);
}
+ if (_partialResultsReturned) {
+ cursorBuilder.append(kPartialResultsReturnedField, true);
+ }
+
cursorBuilder.doneFast();
builder->append("ok", 1.0);
diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h
index a92750dfeee..509600103e4 100644
--- a/src/mongo/db/query/cursor_response.h
+++ b/src/mongo/db/query/cursor_response.h
@@ -49,7 +49,7 @@ class CursorResponseBuilder {
public:
/**
- * Structure used to confiugre the CursorResponseBuilder.
+ * Structure used to configure the CursorResponseBuilder.
*/
struct Options {
bool isInitialResponse = false;
@@ -91,6 +91,10 @@ public:
_postBatchResumeToken = token.getOwned();
}
+ void setPartialResultsReturned(bool partialResults) {
+ _partialResultsReturned = partialResults;
+ }
+
long long numDocs() const {
return _numDocs;
}
@@ -120,6 +124,7 @@ private:
bool _active = true;
long long _numDocs = 0;
BSONObj _postBatchResumeToken;
+ bool _partialResultsReturned = false;
};
/**
@@ -188,7 +193,8 @@ public:
std::vector<BSONObj> batch,
boost::optional<long long> numReturnedSoFar = boost::none,
boost::optional<BSONObj> postBatchResumeToken = boost::none,
- boost::optional<BSONObj> writeConcernError = boost::none);
+ boost::optional<BSONObj> writeConcernError = boost::none,
+ bool partialResultsReturned = false);
CursorResponse(CursorResponse&& other) = default;
CursorResponse& operator=(CursorResponse&& other) = default;
@@ -225,6 +231,10 @@ public:
return _writeConcernError;
}
+ bool getPartialResultsReturned() const {
+ return _partialResultsReturned;
+ }
+
/**
* Converts this response to its raw BSON representation.
*/
@@ -241,6 +251,7 @@ private:
boost::optional<long long> _numReturnedSoFar;
boost::optional<BSONObj> _postBatchResumeToken;
boost::optional<BSONObj> _writeConcernError;
+ bool _partialResultsReturned = false;
};
} // namespace mongo
diff --git a/src/mongo/db/query/cursor_response_test.cpp b/src/mongo/db/query/cursor_response_test.cpp
index ecb5d7570b6..db1a70b6a74 100644
--- a/src/mongo/db/query/cursor_response_test.cpp
+++ b/src/mongo/db/query/cursor_response_test.cpp
@@ -183,6 +183,82 @@ TEST(CursorResponseTest, parseFromBSONOkFieldMissing) {
ASSERT_NOT_OK(result.getStatus());
}
+TEST(CursorResponseTest, parseFromBSONPartialResultsReturnedField) {
+ StatusWith<CursorResponse> result = CursorResponse::parseFromBSON(BSON(
+ "cursor" << BSON("id" << CursorId(123) << "ns"
+ << "db.coll"
+ << "firstBatch" << BSON_ARRAY(BSON("_id" << 1) << BSON("_id" << 2))
+ << "partialResultsReturned" << true)
+ << "ok" << 1));
+ ASSERT_OK(result.getStatus());
+
+ CursorResponse response = std::move(result.getValue());
+ ASSERT_EQ(response.getCursorId(), CursorId(123));
+ ASSERT_EQ(response.getNSS().ns(), "db.coll");
+ ASSERT_EQ(response.getBatch().size(), 2U);
+ ASSERT_BSONOBJ_EQ(response.getBatch()[0], BSON("_id" << 1));
+ ASSERT_BSONOBJ_EQ(response.getBatch()[1], BSON("_id" << 2));
+ ASSERT_EQ(response.getPartialResultsReturned(), true);
+}
+
+TEST(CursorResponseTest, parseFromBSONPartialResultsReturnedFieldWrongType) {
+ StatusWith<CursorResponse> result = CursorResponse::parseFromBSON(BSON(
+ "cursor" << BSON("id" << CursorId(123) << "ns"
+ << "db.coll"
+ << "firstBatch" << BSON_ARRAY(BSON("_id" << 1) << BSON("_id" << 2))
+ << "partialResultsReturned" << 1)
+ << "ok" << 1));
+ ASSERT_NOT_OK(result.getStatus());
+}
+
+TEST(CursorResponseTest, roundTripThroughCursorResponseBuilderWithPartialResultsReturned) {
+ CursorResponseBuilder::Options options;
+ options.isInitialResponse = true;
+ rpc::OpMsgReplyBuilder builder;
+ BSONObj okStatus = BSON("ok" << 1);
+ BSONObj testDoc = BSON("_id" << 1);
+ BSONObj expectedBody =
+ BSON("cursor" << BSON("firstBatch" << BSON_ARRAY(testDoc) << "partialResultsReturned"
+ << true << "id" << CursorId(123) << "ns"
+ << "db.coll"));
+
+ // Use CursorResponseBuilder to serialize the cursor response to OpMsgReplyBuilder.
+ CursorResponseBuilder crb(&builder, options);
+ crb.append(testDoc);
+ crb.setPartialResultsReturned(true);
+ crb.done(CursorId(123), "db.coll");
+
+ // Confirm that the resulting BSONObj response matches the expected body.
+ auto msg = builder.done();
+ auto opMsg = OpMsg::parse(msg);
+ ASSERT_BSONOBJ_EQ(expectedBody, opMsg.body);
+
+ // Append {"ok": 1} to the opMsg body so that it can be parsed by CursorResponse.
+ auto swCursorResponse = CursorResponse::parseFromBSON(opMsg.body.addField(okStatus["ok"]));
+ ASSERT_OK(swCursorResponse.getStatus());
+
+ // Confirm the CursorResponse parsed from CursorResponseBuilder output has the correct content.
+ CursorResponse response = std::move(swCursorResponse.getValue());
+ ASSERT_EQ(response.getCursorId(), CursorId(123));
+ ASSERT_EQ(response.getNSS().ns(), "db.coll");
+ ASSERT_EQ(response.getBatch().size(), 1U);
+ ASSERT_BSONOBJ_EQ(response.getBatch()[0], testDoc);
+ ASSERT_EQ(response.getPartialResultsReturned(), true);
+
+ // Re-serialize a BSONObj response from the CursorResponse.
+ auto cursorResBSON = response.toBSONAsInitialResponse();
+
+ // Confirm that the BSON serialized by the CursorResponse is the same as that serialized by the
+ // CursorResponseBuilder. Field ordering differs between the two, so compare per-element.
+ BSONObjIteratorSorted cursorResIt(cursorResBSON["cursor"].Obj());
+ BSONObjIteratorSorted cursorBuilderIt(opMsg.body["cursor"].Obj());
+ while (cursorResIt.more()) {
+ ASSERT(cursorBuilderIt.more());
+ ASSERT_EQ(cursorResIt.next().woCompare(cursorBuilderIt.next()), 0);
+ }
+ ASSERT(!cursorBuilderIt.more());
+}
+
TEST(CursorResponseTest, parseFromBSONHandleErrorResponse) {
StatusWith<CursorResponse> result =
CursorResponse::parseFromBSON(BSON("ok" << 0 << "code" << 123 << "errmsg"
@@ -216,6 +292,25 @@ TEST(CursorResponseTest, toBSONSubsequentResponse) {
ASSERT_BSONOBJ_EQ(responseObj, expectedResponse);
}
+TEST(CursorResponseTest, toBSONPartialResultsReturned) {
+ std::vector<BSONObj> batch = {BSON("_id" << 1), BSON("_id" << 2)};
+ CursorResponse response(NamespaceString("testdb.testcoll"),
+ CursorId(123),
+ batch,
+ boost::none,
+ boost::none,
+ boost::none,
+ true);
+ BSONObj responseObj = response.toBSON(CursorResponse::ResponseType::InitialResponse);
+ BSONObj expectedResponse = BSON(
+ "cursor" << BSON("id" << CursorId(123) << "ns"
+ << "testdb.testcoll"
+ << "firstBatch" << BSON_ARRAY(BSON("_id" << 1) << BSON("_id" << 2))
+ << "partialResultsReturned" << true)
+ << "ok" << 1.0);
+ ASSERT_BSONOBJ_EQ(responseObj, expectedResponse);
+}
+
TEST(CursorResponseTest, addToBSONInitialResponse) {
std::vector<BSONObj> batch = {BSON("_id" << 1), BSON("_id" << 2)};
CursorResponse response(NamespaceString("testdb.testcoll"), CursorId(123), batch);
diff --git a/src/mongo/s/commands/cluster_find_cmd.cpp b/src/mongo/s/commands/cluster_find_cmd.cpp
index 091cac8879b..36d15ca6ce8 100644
--- a/src/mongo/s/commands/cluster_find_cmd.cpp
+++ b/src/mongo/s/commands/cluster_find_cmd.cpp
@@ -213,9 +213,10 @@ public:
try {
// Do the work to generate the first batch of results. This blocks waiting to get
// responses from the shard(s).
+ bool partialResultsReturned = false;
std::vector<BSONObj> batch;
- auto cursorId =
- ClusterFind::runQuery(opCtx, *cq, ReadPreferenceSetting::get(opCtx), &batch);
+ auto cursorId = ClusterFind::runQuery(
+ opCtx, *cq, ReadPreferenceSetting::get(opCtx), &batch, &partialResultsReturned);
// Build the response document.
CursorResponseBuilder::Options options;
@@ -224,6 +225,7 @@ public:
for (const auto& obj : batch) {
firstBatch.append(obj);
}
+ firstBatch.setPartialResultsReturned(partialResultsReturned);
firstBatch.done(cursorId, cq->ns());
} catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>& ex) {
result->reset();
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 51bbcf84db4..0cb80cf3e42 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -102,7 +102,11 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx,
for (const auto& remote : _params.getRemotes()) {
_remotes.emplace_back(remote.getHostAndPort(),
remote.getCursorResponse().getNSS(),
- remote.getCursorResponse().getCursorId());
+ remote.getCursorResponse().getCursorId(),
+ remote.getCursorResponse().getPartialResultsReturned());
+
+ // A remote cannot be flagged as 'partialResultsReturned' if 'allowPartialResults' is false.
+ invariant(!(_remotes.back().partialResultsReturned && !_params.getAllowPartialResults()));
// We don't check the return value of _addBatchToBuffer here; if there was an error,
// it will be stored in the remote and the first call to ready() will return true.
@@ -183,11 +187,33 @@ void AsyncResultsMerger::addNewShardCursors(std::vector<RemoteCursor>&& newCurso
const auto newIndex = _remotes.size();
_remotes.emplace_back(remote.getHostAndPort(),
remote.getCursorResponse().getNSS(),
- remote.getCursorResponse().getCursorId());
+ remote.getCursorResponse().getCursorId(),
+ remote.getCursorResponse().getPartialResultsReturned());
_addBatchToBuffer(lk, newIndex, remote.getCursorResponse());
}
}
+bool AsyncResultsMerger::partialResultsReturned() const {
+ stdx::lock_guard<Latch> lk(_mutex);
+ return std::any_of(_remotes.begin(), _remotes.end(), [](const auto& remote) {
+ return remote.partialResultsReturned;
+ });
+}
+
+std::size_t AsyncResultsMerger::getNumRemotes() const {
+ // Take the lock to guard against shard additions or disconnections.
+ stdx::lock_guard<Latch> lk(_mutex);
+
+ // If 'allowPartialResults' is false, the number of participating remotes is constant.
+ if (!_params.getAllowPartialResults()) {
+ return _remotes.size();
+ }
+ // Otherwise, discount remotes which failed to connect or disconnected prematurely.
+ return std::count_if(_remotes.begin(), _remotes.end(), [](const auto& remote) {
+ return !remote.partialResultsReturned;
+ });
+}
+
BSONObj AsyncResultsMerger::getHighWaterMark() {
stdx::lock_guard<Latch> lk(_mutex);
auto minPromisedSortKey = _getMinPromisedSortKey(lk);
@@ -579,14 +605,14 @@ void AsyncResultsMerger::_cleanUpFailedBatch(WithLock lk, Status status, size_t
//
// The ExchangePassthrough error code is an internal-only error code used specifically to
// communicate that an error has occurred, but some other thread is responsible for returning
- // the error to the user. In order to avoid polluting the user's error message, we ingore such
+ // the error to the user. In order to avoid polluting the user's error message, we ignore such
// errors with the expectation that all outstanding cursors will be closed promptly.
if (_params.getAllowPartialResults() || remote.status == ErrorCodes::ExchangePassthrough) {
- remote.status = Status::OK();
-
- // Clear the results buffer and cursor id.
+ // Clear the results buffer and cursor id, and set 'partialResultsReturned' if appropriate.
+ remote.partialResultsReturned = (remote.status != ErrorCodes::ExchangePassthrough);
std::queue<ClusterQueryResult> emptyBuffer;
std::swap(remote.docBuffer, emptyBuffer);
+ remote.status = Status::OK();
remote.cursorId = 0;
}
}
@@ -758,10 +784,15 @@ executor::TaskExecutor::EventHandle AsyncResultsMerger::kill(OperationContext* o
AsyncResultsMerger::RemoteCursorData::RemoteCursorData(HostAndPort hostAndPort,
NamespaceString cursorNss,
- CursorId establishedCursorId)
+ CursorId establishedCursorId,
+ bool partialResultsReturned)
: cursorId(establishedCursorId),
cursorNss(std::move(cursorNss)),
- shardHostAndPort(std::move(hostAndPort)) {}
+ shardHostAndPort(std::move(hostAndPort)),
+ partialResultsReturned(partialResultsReturned) {
+ // If the 'partialResultsReturned' flag is set, the cursorId must be zero (closed).
+ invariant(!(partialResultsReturned && cursorId != 0));
+}
const HostAndPort& AsyncResultsMerger::RemoteCursorData::getTargetHost() const {
return shardHostAndPort;
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index e0e0f2e94c1..05c0ee6c1f4 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -210,9 +210,16 @@ public:
*/
void addNewShardCursors(std::vector<RemoteCursor>&& newCursors);
- std::size_t getNumRemotes() const {
- return _remotes.size();
- }
+ /**
+ * Returns true if the cursor was opened with 'allowPartialResults:true' and results are not
+ * available from one or more shards.
+ */
+ bool partialResultsReturned() const;
+
+ /**
+ * Returns the number of remotes involved in this operation.
+ */
+ std::size_t getNumRemotes() const;
/**
* For sorted tailable cursors, returns the most recent available sort key. This guarantees that
@@ -250,7 +257,8 @@ private:
struct RemoteCursorData {
RemoteCursorData(HostAndPort hostAndPort,
NamespaceString cursorNss,
- CursorId establishedCursorId);
+ CursorId establishedCursorId,
+ bool partialResultsReturned);
/**
* Returns the resolved host and port on which the remote cursor resides.
@@ -283,12 +291,17 @@ private:
// the operation if there is a view.
NamespaceString cursorNss;
- // The exact host in the shard on which the cursor resides.
+ // The exact host in the shard on which the cursor resides. Can be empty if this merger has
+ // 'allowPartialResults' set to true and initial cursor establishment failed on this shard.
HostAndPort shardHostAndPort;
// The identity of the shard which the cursor belongs to.
ShardId shardId;
+ // This flag is set if the connection to the remote shard was lost, or never established in
+ // the first place. Only applicable if the 'allowPartialResults' option is enabled.
+ bool partialResultsReturned = false;
+
// The buffer of results that have been retrieved but not yet returned to the caller.
std::queue<ClusterQueryResult> docBuffer;
diff --git a/src/mongo/s/query/blocking_results_merger.h b/src/mongo/s/query/blocking_results_merger.h
index 563a1e36010..8ac29a6d803 100644
--- a/src/mongo/s/query/blocking_results_merger.h
+++ b/src/mongo/s/query/blocking_results_merger.h
@@ -68,6 +68,10 @@ public:
return _arm.remotesExhausted();
}
+ bool partialResultsReturned() const {
+ return _arm.partialResultsReturned();
+ }
+
std::size_t getNumRemotes() const {
return _arm.getNumRemotes();
}
diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h
index f68ab207f74..31d4211570f 100644
--- a/src/mongo/s/query/cluster_client_cursor.h
+++ b/src/mongo/s/query/cluster_client_cursor.h
@@ -119,7 +119,13 @@ public:
void getOriginatingPrivileges() && = delete;
/**
- * Returns a reference to the vector of remote hosts involved in this operation.
+ * Returns true if the cursor was opened with 'allowPartialResults:true' and results are not
+ * available from one or more shards.
+ */
+ virtual bool partialResultsReturned() const = 0;
+
+ /**
+ * Returns the number of remote hosts involved in this operation.
*/
virtual std::size_t getNumRemotes() const = 0;
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index 19ab85886e1..49d7f8c2e76 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -145,6 +145,10 @@ const PrivilegeVector& ClusterClientCursorImpl::getOriginatingPrivileges() const
return _params.originatingPrivileges;
}
+bool ClusterClientCursorImpl::partialResultsReturned() const {
+ return _root->partialResultsReturned();
+}
+
std::size_t ClusterClientCursorImpl::getNumRemotes() const {
return _root->getNumRemotes();
}
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h
index daf73e58172..91a9c4455f3 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.h
+++ b/src/mongo/s/query/cluster_client_cursor_impl.h
@@ -84,6 +84,8 @@ public:
const PrivilegeVector& getOriginatingPrivileges() const& final;
void getOriginatingPrivileges() && = delete;
+ bool partialResultsReturned() const final;
+
std::size_t getNumRemotes() const final;
BSONObj getPostBatchResumeToken() const final;
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp
index e0fcb5c9b59..78c02246301 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp
@@ -74,6 +74,10 @@ const PrivilegeVector& ClusterClientCursorMock::getOriginatingPrivileges() const
return _originatingPrivileges;
}
+bool ClusterClientCursorMock::partialResultsReturned() const {
+ MONGO_UNREACHABLE;
+}
+
std::size_t ClusterClientCursorMock::getNumRemotes() const {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h
index 065a9a9211a..c86f66315b6 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.h
+++ b/src/mongo/s/query/cluster_client_cursor_mock.h
@@ -74,6 +74,8 @@ public:
const PrivilegeVector& getOriginatingPrivileges() const& final;
void getOriginatingPrivileges() && = delete;
+ bool partialResultsReturned() const final;
+
std::size_t getNumRemotes() const final;
BSONObj getPostBatchResumeToken() const final;
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index f5e9b0609d1..013f91900f8 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -238,7 +238,8 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
const CanonicalQuery& query,
const ReadPreferenceSetting& readPref,
const CachedCollectionRoutingInfo& routingInfo,
- std::vector<BSONObj>* results) {
+ std::vector<BSONObj>* results,
+ bool* partialResultsReturned) {
// Get the set of shards on which we will run the query.
auto shardIds = getTargetedShardsForQuery(opCtx,
routingInfo,
@@ -299,7 +300,6 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
opCtx, routingInfo, shardIds, query, appendGeoNearDistanceProjection);
// Establish the cursors with a consistent shardVersion across shards.
-
params.remotes = establishCursors(opCtx,
Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
query.nss(),
@@ -367,6 +367,11 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
CurOp::get(opCtx)->debug().nShards = ccc->getNumRemotes();
CurOp::get(opCtx)->debug().nreturned = results->size();
+ // If the caller wants to know whether the cursor returned partial results, set it here.
+ if (partialResultsReturned) {
+ *partialResultsReturned = ccc->partialResultsReturned();
+ }
+
// If the cursor is exhausted, then there are no more results to return and we don't need to
// allocate a cursor id.
if (cursorState == ClusterCursorManager::CursorState::Exhausted) {
@@ -435,7 +440,14 @@ const size_t ClusterFind::kMaxRetries = 10;
CursorId ClusterFind::runQuery(OperationContext* opCtx,
const CanonicalQuery& query,
const ReadPreferenceSetting& readPref,
- std::vector<BSONObj>* results) {
+ std::vector<BSONObj>* results,
+ bool* partialResultsReturned) {
+ // If the user supplied a 'partialResultsReturned' out-parameter, default it to false here.
+ if (partialResultsReturned) {
+ *partialResultsReturned = false;
+ }
+
+ // We must always have a BSONObj vector into which to output our results.
invariant(results);
// Projection on the reserved sort key field is illegal in mongos.
@@ -461,7 +473,8 @@ CursorId ClusterFind::runQuery(OperationContext* opCtx,
auto routingInfo = uassertStatusOK(routingInfoStatus);
try {
- return runQueryWithoutRetrying(opCtx, query, readPref, routingInfo, results);
+ return runQueryWithoutRetrying(
+ opCtx, query, readPref, routingInfo, results, partialResultsReturned);
} catch (ExceptionFor<ErrorCodes::StaleDbVersion>& ex) {
if (retries >= kMaxRetries) {
// Check if there are no retries remaining, so the last received error can be
@@ -731,6 +744,7 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
postBatchResumeToken = pinnedCursor.getValue()->getPostBatchResumeToken();
}
+ const bool partialResultsReturned = pinnedCursor.getValue()->partialResultsReturned();
pinnedCursor.getValue()->setLeftoverMaxTimeMicros(opCtx->getRemainingMaxTimeMicros());
pinnedCursor.getValue()->incNBatches();
// Upon successful completion, transfer ownership of the cursor back to the cursor manager. If
@@ -748,8 +762,13 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
"waitBeforeUnpinningOrDeletingCursorAfterGetMoreBatch");
}
- return CursorResponse(
- request.nss, idToReturn, std::move(batch), startingFrom, postBatchResumeToken);
+ return CursorResponse(request.nss,
+ idToReturn,
+ std::move(batch),
+ startingFrom,
+ postBatchResumeToken,
+ boost::none,
+ partialResultsReturned);
}
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_find.h b/src/mongo/s/query/cluster_find.h
index 0916ecf810a..89ab0d24731 100644
--- a/src/mongo/s/query/cluster_find.h
+++ b/src/mongo/s/query/cluster_find.h
@@ -64,7 +64,8 @@ public:
static CursorId runQuery(OperationContext* opCtx,
const CanonicalQuery& query,
const ReadPreferenceSetting& readPref,
- std::vector<BSONObj>* results);
+ std::vector<BSONObj>* results,
+ bool* partialResultsReturned = nullptr);
/**
* Executes the getMore request 'request', and on success returns a CursorResponse.
diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp
index 829c38246d1..1b4c1f19a22 100644
--- a/src/mongo/s/query/establish_cursors.cpp
+++ b/src/mongo/s/query/establish_cursors.cpp
@@ -70,8 +70,8 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
try {
// Get the responses
while (!ars.done()) {
+ auto response = ars.next();
try {
- auto response = ars.next();
// Note the shardHostAndPort may not be populated if there was an error, so be sure
// to do this after parsing the cursor response to ensure the response was ok.
// Additionally, be careful not to push into 'remoteCursors' until we are sure we
@@ -95,21 +95,20 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
uassertStatusOK(cursor.getStatus());
}
- } catch (const ExceptionForCat<ErrorCategory::RetriableError>&) {
- // Retriable errors are swallowed if 'allowPartialResults' is true.
- if (allowPartialResults) {
- continue;
- }
- throw; // Fail this loop.
- } catch (const ExceptionFor<ErrorCodes::FailedToSatisfyReadPreference>&) {
- // The errors marked as retriable errors are meant to correspond to the driver's
- // spec (see SERVER-42908), but targeting a replica set shard can fail with
- // FailedToSatisfyReadPreference, which is not a retriable error in the driver's
- // spec, so we swallow it separately here if allowPartialResults is true.
- if (allowPartialResults) {
- continue;
+ } catch (const AssertionException& ex) {
+ // Retriable errors are swallowed if 'allowPartialResults' is true. Targeting shard
+ // replica sets can also throw FailedToSatisfyReadPreference, so we swallow it too.
+ bool isEligibleException = (ex.isA<ErrorCategory::RetriableError>() ||
+ ex.code() == ErrorCodes::FailedToSatisfyReadPreference);
+ // Fail if the exception is something other than a retriable or read preference
+ // error, or if the 'allowPartialResults' query parameter was not enabled.
+ if (!allowPartialResults || !isEligibleException) {
+ throw;
}
- throw; // Fail this loop.
+ // This exception is eligible to be swallowed. Add an entry with a CursorId of 0, an
+ // empty HostAndPort, and the 'partialResultsReturned' flag set to true.
+ remoteCursors.push_back(
+ {response.shardId.toString(), {}, {nss, CursorId{0}, {}, {}, {}, {}, true}});
}
}
return remoteCursors;
diff --git a/src/mongo/s/query/establish_cursors_test.cpp b/src/mongo/s/query/establish_cursors_test.cpp
index c94ed902268..389417a76ca 100644
--- a/src/mongo/s/query/establish_cursors_test.cpp
+++ b/src/mongo/s/query/establish_cursors_test.cpp
@@ -275,9 +275,13 @@ TEST_F(EstablishCursorsTest, SingleRemoteMaxesOutRetriableErrorsAllowPartialResu
true); // allowPartialResults
// Failure to establish a cursor due to maxing out retriable errors on one remote (in this
- // case, the only remote) was ignored, since allowPartialResults is true, and one less
- // cursor was established.
- ASSERT_EQUALS(remotes.size() - 1, cursors.size());
+ // case, the only remote) was ignored, since allowPartialResults is true. The cursor entry
+ // is marked as 'partialResultsReturned:true', with a CursorId of 0 and no HostAndPort.
+ ASSERT_EQ(cursors.size(), 1);
+ ASSERT(cursors.front().getHostAndPort().empty());
+ ASSERT_EQ(cursors.front().getShardId(), kTestShardIds[0]);
+ ASSERT(cursors.front().getCursorResponse().getPartialResultsReturned());
+ ASSERT_EQ(cursors.front().getCursorResponse().getCursorId(), CursorId{0});
});
// Remote repeatedly responds with retriable errors.
@@ -571,8 +575,16 @@ TEST_F(EstablishCursorsTest, MultipleRemotesOneRemoteMaxesOutRetriableErrorsAllo
remotes,
true); // allowPartialResults
// Failure to establish a cursor due to maxing out retriable errors on one remote was
- // ignored, since allowPartialResults is true, and one less cursor was established.
- ASSERT_EQUALS(remotes.size() - 1, cursors.size());
+ // ignored, since allowPartialResults is true. The cursor entry for that shard is marked
+ // 'partialResultsReturned:true', with a CursorId of 0 and no HostAndPort.
+ ASSERT_EQ(remotes.size(), cursors.size());
+ for (auto&& cursor : cursors) {
+ const bool isMaxedOutShard = (cursor.getShardId() == kTestShardIds[1]);
+ ASSERT_EQ(cursor.getHostAndPort().empty(), isMaxedOutShard);
+ ASSERT_EQ(cursor.getCursorResponse().getPartialResultsReturned(), isMaxedOutShard);
+ ASSERT(isMaxedOutShard ? cursor.getCursorResponse().getCursorId() == CursorId{0}
+ : cursor.getCursorResponse().getCursorId() > CursorId{0});
+ }
});
// First remote responds with success.
diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h
index 5b611623ea9..7c5ee7aefb6 100644
--- a/src/mongo/s/query/router_exec_stage.h
+++ b/src/mongo/s/query/router_exec_stage.h
@@ -90,6 +90,14 @@ public:
}
/**
+ * Returns true if only a subset of all the relevant results is being returned by this cursor.
+ * Only applicable if the 'allowPartialResults' option was enabled in the query request.
+ */
+ virtual bool partialResultsReturned() const {
+ return _child ? _child->partialResultsReturned() : false;
+ }
+
+ /**
* Returns the number of remote hosts involved in this execution plan.
*/
virtual std::size_t getNumRemotes() const {
diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h
index a7cead17d89..0b088f12cbd 100644
--- a/src/mongo/s/query/router_stage_merge.h
+++ b/src/mongo/s/query/router_stage_merge.h
@@ -63,6 +63,10 @@ public:
return _resultsMerger.remotesExhausted();
}
+ bool partialResultsReturned() const final {
+ return _resultsMerger.partialResultsReturned();
+ }
+
std::size_t getNumRemotes() const final {
return _resultsMerger.getNumRemotes();
}