summaryrefslogtreecommitdiff
path: root/src/mongo/s/query/cluster_client_cursor_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/query/cluster_client_cursor_impl.cpp')
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.cpp41
1 files changed, 32 insertions, 9 deletions
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index 1a4be45f1be..58484e87bfa 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -30,7 +30,6 @@
#include "mongo/s/query/cluster_client_cursor_impl.h"
-#include "mongo/db/pipeline/cluster_aggregation_planner.h"
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/document_source_skip.h"
#include "mongo/db/pipeline/document_source_sort.h"
@@ -136,19 +135,20 @@ OperationContext* ClusterClientCursorImpl::getCurrentOperationContext() const {
}
bool ClusterClientCursorImpl::isTailable() const {
- return _params.tailableMode != TailableModeEnum::kNormal;
+ return _params.tailableMode != TailableMode::kNormal;
}
bool ClusterClientCursorImpl::isTailableAndAwaitData() const {
- return _params.tailableMode == TailableModeEnum::kTailableAndAwaitData;
+ return _params.tailableMode == TailableMode::kTailableAndAwaitData;
}
BSONObj ClusterClientCursorImpl::getOriginatingCommand() const {
return _params.originatingCommandObj;
}
-const std::size_t ClusterClientCursorImpl::getNumRemotes() const {
- return _root->getNumRemotes();
+const std::vector<ClusterClientCursorParams::RemoteCursor>& ClusterClientCursorImpl::getRemotes()
+ const {
+ return _params.remotes;
}
long long ClusterClientCursorImpl::getNumReturnedSoFar() const {
@@ -181,6 +181,30 @@ boost::optional<ReadPreferenceSetting> ClusterClientCursorImpl::getReadPreferenc
namespace {
+/**
+ * Rips off an initial $sort stage that will be handled by mongos execution machinery. Returns the
+ * sort key pattern of such a $sort stage if there was one, and boost::none otherwise.
+ */
+boost::optional<BSONObj> extractLeadingSort(Pipeline* mergePipeline) {
+ // Remove a leading $sort iff it is a mergesort, since the ARM cannot handle blocking $sort.
+ auto frontSort = mergePipeline->popFrontWithCriteria(
+ DocumentSourceSort::kStageName, [](const DocumentSource* const source) {
+ return static_cast<const DocumentSourceSort* const>(source)->mergingPresorted();
+ });
+
+ if (frontSort) {
+ auto sortStage = static_cast<DocumentSourceSort*>(frontSort.get());
+ if (auto sortLimit = sortStage->getLimitSrc()) {
+ // There was a limit stage absorbed into the sort stage, so we need to preserve that.
+ mergePipeline->addInitialSource(sortLimit);
+ }
+ return sortStage
+ ->sortKeyPattern(DocumentSourceSort::SortKeySerialization::kForSortKeyMerging)
+ .toBson();
+ }
+ return boost::none;
+}
+
bool isSkipOrLimit(const boost::intrusive_ptr<DocumentSource>& stage) {
return (dynamic_cast<DocumentSourceLimit*>(stage.get()) ||
dynamic_cast<DocumentSourceSkip*>(stage.get()));
@@ -226,10 +250,10 @@ std::unique_ptr<RouterExecStage> buildPipelinePlan(executor::TaskExecutor* execu
// instead.
while (!pipeline->getSources().empty()) {
invariant(isSkipOrLimit(pipeline->getSources().front()));
- if (auto skip = pipeline->popFrontWithName(DocumentSourceSkip::kStageName)) {
+ if (auto skip = pipeline->popFrontWithCriteria(DocumentSourceSkip::kStageName)) {
root = stdx::make_unique<RouterStageSkip>(
opCtx, std::move(root), static_cast<DocumentSourceSkip*>(skip.get())->getSkip());
- } else if (auto limit = pipeline->popFrontWithName(DocumentSourceLimit::kStageName)) {
+ } else if (auto limit = pipeline->popFrontWithCriteria(DocumentSourceLimit::kStageName)) {
root = stdx::make_unique<RouterStageLimit>(
opCtx, std::move(root), static_cast<DocumentSourceLimit*>(limit.get())->getLimit());
}
@@ -246,8 +270,7 @@ std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan(
const auto skip = params->skip;
const auto limit = params->limit;
if (params->mergePipeline) {
- if (auto sort =
- cluster_aggregation_planner::popLeadingMergeSort(params->mergePipeline.get())) {
+ if (auto sort = extractLeadingSort(params->mergePipeline.get())) {
params->sort = *sort;
}
return buildPipelinePlan(executor, params);