diff options
Diffstat (limited to 'src/mongo/s/query/cluster_client_cursor_impl.cpp')
-rw-r--r-- | src/mongo/s/query/cluster_client_cursor_impl.cpp | 88 |
1 files changed, 11 insertions, 77 deletions
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp index 1b3a665df5e..acda45f66f0 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp @@ -30,14 +30,8 @@ #include "mongo/s/query/cluster_client_cursor_impl.h" -#include "mongo/db/pipeline/cluster_aggregation_planner.h" -#include "mongo/db/pipeline/document_source_limit.h" -#include "mongo/db/pipeline/document_source_skip.h" -#include "mongo/db/pipeline/document_source_sort.h" #include "mongo/s/query/router_stage_limit.h" #include "mongo/s/query/router_stage_merge.h" -#include "mongo/s/query/router_stage_mock.h" -#include "mongo/s/query/router_stage_pipeline.h" #include "mongo/s/query/router_stage_remove_metadata_fields.h" #include "mongo/s/query/router_stage_skip.h" #include "mongo/stdx/memory.h" @@ -70,6 +64,14 @@ ClusterClientCursorGuard ClusterClientCursorImpl::make(OperationContext* opCtx, return ClusterClientCursorGuard(opCtx, std::move(cursor)); } +ClusterClientCursorGuard ClusterClientCursorImpl::make(OperationContext* opCtx, + std::unique_ptr<RouterExecStage> root, + ClusterClientCursorParams&& params) { + std::unique_ptr<ClusterClientCursor> cursor(new ClusterClientCursorImpl( + opCtx, std::move(root), std::move(params), opCtx->getLogicalSessionId())); + return ClusterClientCursorGuard(opCtx, std::move(cursor)); +} + ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx, executor::TaskExecutor* executor, ClusterClientCursorParams&& params, @@ -84,7 +86,7 @@ ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx, } ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx, - std::unique_ptr<RouterStageMock> root, + std::unique_ptr<RouterExecStage> root, ClusterClientCursorParams&& params, boost::optional<LogicalSessionId> lsid) : _params(std::move(params)), _root(std::move(root)), _lsid(lsid), _opCtx(opCtx) { @@ -183,81 +185,13 @@ boost::optional<ReadPreferenceSetting> ClusterClientCursorImpl::getReadPreferenc return _params.readPreference; } -namespace { - -bool isSkipOrLimit(const boost::intrusive_ptr<DocumentSource>& stage) { - return (dynamic_cast<DocumentSourceLimit*>(stage.get()) || - dynamic_cast<DocumentSourceSkip*>(stage.get())); -} - -bool isAllLimitsAndSkips(Pipeline* pipeline) { - const auto stages = pipeline->getSources(); - return std::all_of( - stages.begin(), stages.end(), [&](const auto& stage) { return isSkipOrLimit(stage); }); -} - -/** - * Creates the initial stage to feed data into the execution plan. By default, a RouterExecMerge - * stage, or a custom stage if specified in 'params->creatCustomMerge'. - */ -std::unique_ptr<RouterExecStage> createInitialStage(OperationContext* opCtx, - executor::TaskExecutor* executor, - ClusterClientCursorParams* params) { - if (params->createCustomCursorSource) { - return params->createCustomCursorSource(opCtx, executor, params); - } else { - return stdx::make_unique<RouterStageMerge>(opCtx, executor, params); - } -} - -std::unique_ptr<RouterExecStage> buildPipelinePlan(executor::TaskExecutor* executor, - ClusterClientCursorParams* params) { - invariant(params->mergePipeline); - invariant(!params->skip); - invariant(!params->limit); - auto* pipeline = params->mergePipeline.get(); - auto* opCtx = pipeline->getContext()->opCtx; - - std::unique_ptr<RouterExecStage> root = createInitialStage(opCtx, executor, params); - if (!isAllLimitsAndSkips(pipeline)) { - return stdx::make_unique<RouterStagePipeline>(std::move(root), - std::move(params->mergePipeline)); - } - - // After extracting an optional leading $sort, the pipeline consisted entirely of $skip and - // $limit stages. Avoid creating a RouterStagePipeline (which will go through an expensive - // conversion from BSONObj -> Document for each result), and create a RouterExecStage tree - // instead. - while (!pipeline->getSources().empty()) { - invariant(isSkipOrLimit(pipeline->getSources().front())); - if (auto skip = pipeline->popFrontWithName(DocumentSourceSkip::kStageName)) { - root = stdx::make_unique<RouterStageSkip>( - opCtx, std::move(root), static_cast<DocumentSourceSkip*>(skip.get())->getSkip()); - } else if (auto limit = pipeline->popFrontWithName(DocumentSourceLimit::kStageName)) { - root = stdx::make_unique<RouterStageLimit>( - opCtx, std::move(root), static_cast<DocumentSourceLimit*>(limit.get())->getLimit()); - } - } - // We are executing the pipeline without using an actual Pipeline, so we need to strip out any - // Document metadata ourselves. - return stdx::make_unique<RouterStageRemoveMetadataFields>( - opCtx, std::move(root), Document::allMetadataFieldNames); -} -} // namespace - std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan( OperationContext* opCtx, executor::TaskExecutor* executor, ClusterClientCursorParams* params) { const auto skip = params->skip; const auto limit = params->limit; - if (params->mergePipeline) { - if (auto sort = - cluster_aggregation_planner::popLeadingMergeSort(params->mergePipeline.get())) { - params->sort = *sort; - } - return buildPipelinePlan(executor, params); - } - std::unique_ptr<RouterExecStage> root = createInitialStage(opCtx, executor, params); + std::unique_ptr<RouterExecStage> root = + std::make_unique<RouterStageMerge>(opCtx, executor, params->extractARMParams()); if (skip) { root = stdx::make_unique<RouterStageSkip>(opCtx, std::move(root), *skip); |