summaryrefslogtreecommitdiff
path: root/src/mongo/s/query/async_results_merger.cpp
diff options
context:
space:
mode:
authorTess Avitabile <tess.avitabile@mongodb.com>2017-12-19 17:43:50 -0500
committerTess Avitabile <tess.avitabile@mongodb.com>2017-12-28 11:01:26 -0500
commitf98cb60d80f281d3065b0282ed6f25b5f419ae1b (patch)
treeef3834ff588eaab6ff8382fa9f4bc74ebbff7d57 /src/mongo/s/query/async_results_merger.cpp
parente64b7e331649fd1d6beb83d981857ffd7ad6e539 (diff)
downloadmongo-f98cb60d80f281d3065b0282ed6f25b5f419ae1b.tar.gz
SERVER-1981 Support near and nearSphere predicates on sharded collections
Diffstat (limited to 'src/mongo/s/query/async_results_merger.cpp')
-rw-r--r--src/mongo/s/query/async_results_merger.cpp44
1 files changed, 30 insertions, 14 deletions
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index fb4a6ab4459..59b01d775f5 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -55,8 +55,12 @@ const int kMaxNumFailedHostRetryAttempts = 3;
* Returns the sort key out of the $sortKey metadata field in 'obj'. This object is of the form
* {'': 'firstSortKey', '': 'secondSortKey', ...}.
*/
-BSONObj extractSortKey(BSONObj obj) {
+BSONObj extractSortKey(BSONObj obj, bool compareWholeSortKey) {
auto key = obj[ClusterClientCursorParams::kSortKeyField];
+ invariant(key);
+ if (compareWholeSortKey) {
+ return key.wrap();
+ }
invariant(key.type() == BSONType::Object);
return key.Obj();
}
@@ -80,7 +84,7 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx,
: _opCtx(opCtx),
_executor(executor),
_params(params),
- _mergeQueue(MergingComparator(_remotes, _params->sort)) {
+ _mergeQueue(MergingComparator(_remotes, _params->sort, _params->compareWholeSortKey)) {
size_t remoteIndex = 0;
for (const auto& remote : _params->remotes) {
_remotes.emplace_back(remote.hostAndPort,
@@ -218,7 +222,8 @@ bool AsyncResultsMerger::_readySortedTailable(WithLock) {
auto smallestRemote = _mergeQueue.top();
auto smallestResult = _remotes[smallestRemote].docBuffer.front();
- auto keyWeWantToReturn = extractSortKey(*smallestResult.getResult());
+ auto keyWeWantToReturn =
+ extractSortKey(*smallestResult.getResult(), _params->compareWholeSortKey);
for (const auto& remote : _remotes) {
if (!remote.promisedMinSortKey) {
// In order to merge sorted tailable cursors, we need this value to be populated.
@@ -473,7 +478,9 @@ void AsyncResultsMerger::updateRemoteMetadata(RemoteCursorData* remote,
// of {lastOplogTimestamp, uuid, docID} will be greater than the artificial promised min
// sort key of {lastOplogTimestamp, MINKEY, MINKEY}.
auto maxSortKeyFromResponse =
- (response.getBatch().empty() ? BSONObj() : extractSortKey(response.getBatch().back()));
+ (response.getBatch().empty()
+ ? BSONObj()
+ : extractSortKey(response.getBatch().back(), _params->compareWholeSortKey));
remote->promisedMinSortKey =
(compareSortKeys(
@@ -584,14 +591,23 @@ bool AsyncResultsMerger::_addBatchToBuffer(WithLock lk,
updateRemoteMetadata(&remote, response);
for (const auto& obj : response.getBatch()) {
// If there's a sort, we're expecting the remote node to have given us back a sort key.
- if (!_params->sort.isEmpty() &&
- obj[ClusterClientCursorParams::kSortKeyField].type() != BSONType::Object) {
- remote.status = Status(ErrorCodes::InternalError,
- str::stream() << "Missing field '"
- << ClusterClientCursorParams::kSortKeyField
- << "' in document: "
- << obj);
- return false;
+ if (!_params->sort.isEmpty()) {
+ auto key = obj[ClusterClientCursorParams::kSortKeyField];
+ if (!key) {
+ remote.status = Status(ErrorCodes::InternalError,
+ str::stream() << "Missing field '"
+ << ClusterClientCursorParams::kSortKeyField
+ << "' in document: "
+ << obj);
+ return false;
+ } else if (!_params->compareWholeSortKey && key.type() != BSONType::Object) {
+ remote.status =
+ Status(ErrorCodes::InternalError,
+ str::stream() << "Field '" << ClusterClientCursorParams::kSortKeyField
+ << "' was not of type Object in document: "
+ << obj);
+ return false;
+ }
}
ClusterQueryResult result(obj);
@@ -714,8 +730,8 @@ bool AsyncResultsMerger::MergingComparator::operator()(const size_t& lhs, const
const ClusterQueryResult& leftDoc = _remotes[lhs].docBuffer.front();
const ClusterQueryResult& rightDoc = _remotes[rhs].docBuffer.front();
- return compareSortKeys(extractSortKey(*leftDoc.getResult()),
- extractSortKey(*rightDoc.getResult()),
+ return compareSortKeys(extractSortKey(*leftDoc.getResult(), _compareWholeSortKey),
+ extractSortKey(*rightDoc.getResult(), _compareWholeSortKey),
_sort) > 0;
}