summaryrefslogtreecommitdiff
path: root/src/mongo/s/query/cluster_client_cursor_impl.cpp
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2018-07-02 18:23:25 -0400
committerCharlie Swanson <charlie.swanson@mongodb.com>2018-08-15 13:30:12 -0400
commitee06e6cbe5a75775f76836449558be2f6a98ddfd (patch)
treed4dbf37110d25f7f4876337a7b1e11abe251fac5 /src/mongo/s/query/cluster_client_cursor_impl.cpp
parenta5bde2f3e9afc3f72da01788b76829fb29c2f4e7 (diff)
downloadmongo-ee06e6cbe5a75775f76836449558be2f6a98ddfd.tar.gz
SERVER-33323 Refactor agg cursor merging on mongos
This commit makes it so that aggregations will always use a $mergeCursors as a wrapper around a AsyncResultsMerger, which is new behavior for mongos. As part of this refactor, we can delete the concept of a 'merging presorted' $sort stage (which is now handled by the AsyncResultsMerger) and delete the DocumentSourceRouterAdapter stage which talked to a RouterStageMerge, instead directly using a $mergeCursors stage.
Diffstat (limited to 'src/mongo/s/query/cluster_client_cursor_impl.cpp')
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.cpp88
1 files changed, 11 insertions, 77 deletions
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index 1b3a665df5e..acda45f66f0 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -30,14 +30,8 @@
#include "mongo/s/query/cluster_client_cursor_impl.h"
-#include "mongo/db/pipeline/cluster_aggregation_planner.h"
-#include "mongo/db/pipeline/document_source_limit.h"
-#include "mongo/db/pipeline/document_source_skip.h"
-#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/s/query/router_stage_limit.h"
#include "mongo/s/query/router_stage_merge.h"
-#include "mongo/s/query/router_stage_mock.h"
-#include "mongo/s/query/router_stage_pipeline.h"
#include "mongo/s/query/router_stage_remove_metadata_fields.h"
#include "mongo/s/query/router_stage_skip.h"
#include "mongo/stdx/memory.h"
@@ -70,6 +64,14 @@ ClusterClientCursorGuard ClusterClientCursorImpl::make(OperationContext* opCtx,
return ClusterClientCursorGuard(opCtx, std::move(cursor));
}
+ClusterClientCursorGuard ClusterClientCursorImpl::make(OperationContext* opCtx,
+ std::unique_ptr<RouterExecStage> root,
+ ClusterClientCursorParams&& params) {
+ std::unique_ptr<ClusterClientCursor> cursor(new ClusterClientCursorImpl(
+ opCtx, std::move(root), std::move(params), opCtx->getLogicalSessionId()));
+ return ClusterClientCursorGuard(opCtx, std::move(cursor));
+}
+
ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx,
executor::TaskExecutor* executor,
ClusterClientCursorParams&& params,
@@ -84,7 +86,7 @@ ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx,
}
ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx,
- std::unique_ptr<RouterStageMock> root,
+ std::unique_ptr<RouterExecStage> root,
ClusterClientCursorParams&& params,
boost::optional<LogicalSessionId> lsid)
: _params(std::move(params)), _root(std::move(root)), _lsid(lsid), _opCtx(opCtx) {
@@ -183,81 +185,13 @@ boost::optional<ReadPreferenceSetting> ClusterClientCursorImpl::getReadPreferenc
return _params.readPreference;
}
-namespace {
-
-bool isSkipOrLimit(const boost::intrusive_ptr<DocumentSource>& stage) {
- return (dynamic_cast<DocumentSourceLimit*>(stage.get()) ||
- dynamic_cast<DocumentSourceSkip*>(stage.get()));
-}
-
-bool isAllLimitsAndSkips(Pipeline* pipeline) {
- const auto stages = pipeline->getSources();
- return std::all_of(
- stages.begin(), stages.end(), [&](const auto& stage) { return isSkipOrLimit(stage); });
-}
-
-/**
- * Creates the initial stage to feed data into the execution plan. By default, a RouterExecMerge
- * stage, or a custom stage if specified in 'params->creatCustomMerge'.
- */
-std::unique_ptr<RouterExecStage> createInitialStage(OperationContext* opCtx,
- executor::TaskExecutor* executor,
- ClusterClientCursorParams* params) {
- if (params->createCustomCursorSource) {
- return params->createCustomCursorSource(opCtx, executor, params);
- } else {
- return stdx::make_unique<RouterStageMerge>(opCtx, executor, params);
- }
-}
-
-std::unique_ptr<RouterExecStage> buildPipelinePlan(executor::TaskExecutor* executor,
- ClusterClientCursorParams* params) {
- invariant(params->mergePipeline);
- invariant(!params->skip);
- invariant(!params->limit);
- auto* pipeline = params->mergePipeline.get();
- auto* opCtx = pipeline->getContext()->opCtx;
-
- std::unique_ptr<RouterExecStage> root = createInitialStage(opCtx, executor, params);
- if (!isAllLimitsAndSkips(pipeline)) {
- return stdx::make_unique<RouterStagePipeline>(std::move(root),
- std::move(params->mergePipeline));
- }
-
- // After extracting an optional leading $sort, the pipeline consisted entirely of $skip and
- // $limit stages. Avoid creating a RouterStagePipeline (which will go through an expensive
- // conversion from BSONObj -> Document for each result), and create a RouterExecStage tree
- // instead.
- while (!pipeline->getSources().empty()) {
- invariant(isSkipOrLimit(pipeline->getSources().front()));
- if (auto skip = pipeline->popFrontWithName(DocumentSourceSkip::kStageName)) {
- root = stdx::make_unique<RouterStageSkip>(
- opCtx, std::move(root), static_cast<DocumentSourceSkip*>(skip.get())->getSkip());
- } else if (auto limit = pipeline->popFrontWithName(DocumentSourceLimit::kStageName)) {
- root = stdx::make_unique<RouterStageLimit>(
- opCtx, std::move(root), static_cast<DocumentSourceLimit*>(limit.get())->getLimit());
- }
- }
- // We are executing the pipeline without using an actual Pipeline, so we need to strip out any
- // Document metadata ourselves.
- return stdx::make_unique<RouterStageRemoveMetadataFields>(
- opCtx, std::move(root), Document::allMetadataFieldNames);
-}
-} // namespace
-
std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan(
OperationContext* opCtx, executor::TaskExecutor* executor, ClusterClientCursorParams* params) {
const auto skip = params->skip;
const auto limit = params->limit;
- if (params->mergePipeline) {
- if (auto sort =
- cluster_aggregation_planner::popLeadingMergeSort(params->mergePipeline.get())) {
- params->sort = *sort;
- }
- return buildPipelinePlan(executor, params);
- }
- std::unique_ptr<RouterExecStage> root = createInitialStage(opCtx, executor, params);
+ std::unique_ptr<RouterExecStage> root =
+ std::make_unique<RouterStageMerge>(opCtx, executor, params->extractARMParams());
if (skip) {
root = stdx::make_unique<RouterStageSkip>(opCtx, std::move(root), *skip);