diff options
author | Arun Banala <arun.banala@mongodb.com> | 2019-05-23 11:12:19 +0100 |
---|---|---|
committer | Arun Banala <arun.banala@mongodb.com> | 2019-05-24 14:24:51 +0100 |
commit | 7536959e9afa7e5dd0ef7bc807630630e48d5706 (patch) | |
tree | 202f6bf9dd8264f1c8de61737e48a86e1ff2ab6a | |
parent | 4f034e89cc7978317d4a6ef34bc718a83ab55ecb (diff) | |
download | mongo-7536959e9afa7e5dd0ef7bc807630630e48d5706.tar.gz |
SERVER-41180 Rename 'MergingLogic' to 'DistributedPlanLogic' to avoid confusion with $merge
48 files changed, 89 insertions, 87 deletions
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 8bc5d419321..5e8244e7984 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -220,7 +220,7 @@ public: * A struct representing the information needed to execute this stage on a distributed * collection. Describes how a pipeline should be split for sharded execution. */ - struct MergingLogic { + struct DistributedPlanLogic { // A stage which executes on each shard in parallel, or nullptr if nothing can be done in // parallel. For example, a partial $group before a subsequent global $group. boost::intrusive_ptr<DocumentSource> shardsStage = nullptr; @@ -454,10 +454,10 @@ public: * Otherwise, returns a struct representing what needs to be done to merge each shard's pipeline * into a single stream of results. Must not mutate the existing source object; if different * behaviour is required, a new source should be created and configured appropriately. It is an - * error for the returned MergingLogic to have identical pointers for 'shardsStage' and + * error for the returned DistributedPlanLogic to have identical pointers for 'shardsStage' and * 'mergingStage'. */ - virtual boost::optional<MergingLogic> mergingLogic() = 0; + virtual boost::optional<DistributedPlanLogic> distributedPlanLogic() = 0; /** * Returns true if it would be correct to execute this stage in parallel across the shards in diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.h b/src/mongo/db/pipeline/document_source_bucket_auto.h index 34b4c7ffc60..bd0cc2483e8 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto.h +++ b/src/mongo/db/pipeline/document_source_bucket_auto.h @@ -62,9 +62,9 @@ public: /** * The $bucketAuto stage must be run on the merging shard. */ - boost::optional<MergingLogic> mergingLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { // {shardsStage, mergingStage, sortPattern} - return MergingLogic{nullptr, this, boost::none}; + return DistributedPlanLogic{nullptr, this, boost::none}; } static const uint64_t kDefaultMaxMemoryUsageBytes = 100 * 1024 * 1024; diff --git a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h index 0e5e6ee87ee..ab28dee5e8d 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h +++ b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h @@ -76,11 +76,11 @@ public: return new DocumentSourceCloseCursor(expCtx); } - boost::optional<MergingLogic> mergingLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { // This stage must run on mongos to ensure it sees any invalidation in the correct order, // and to ensure that all remote cursors are cleaned up properly. // {shardsStage, mergingStage, sortPattern} - return MergingLogic{nullptr, this, change_stream_constants::kSortSpec}; + return DistributedPlanLogic{nullptr, this, change_stream_constants::kSortSpec}; } private: diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h index ca1003128fd..415dacac16d 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.h +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h @@ -54,7 +54,7 @@ public: Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const; StageConstraints constraints(Pipeline::SplitState pipeState) const final; - boost::optional<MergingLogic> mergingLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { return boost::none; } diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.h b/src/mongo/db/pipeline/document_source_check_invalidate.h index 7f5564a4a38..66b289bb36b 100644 --- a/src/mongo/db/pipeline/document_source_check_invalidate.h +++ b/src/mongo/db/pipeline/document_source_check_invalidate.h @@ -58,7 +58,7 @@ public: ChangeStreamRequirement::kChangeStreamStage}; } - boost::optional<MergingLogic> mergingLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { return boost::none; } diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h index d449595faf7..c4cd53bb2cf 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.h +++ b/src/mongo/db/pipeline/document_source_check_resume_token.h @@ -73,7 +73,7 @@ public: ChangeStreamRequirement::kChangeStreamStage}; } - boost::optional<MergingLogic> mergingLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { return boost::none; } @@ -129,8 +129,8 @@ public: ChangeStreamRequirement::kChangeStreamStage}; } - boost::optional<MergingLogic> mergingLogic() final { - MergingLogic logic; + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { + DistributedPlanLogic logic; // This stage must run on mongos to ensure it sees the resume token, which could have come // from any shard. We also must include a mergingPresorted $sort stage to communicate to // the AsyncResultsMerger that we need to merge the streams in a particular order. diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h index 9573da4c514..9fd9fedf791 100644 --- a/src/mongo/db/pipeline/document_source_coll_stats.h +++ b/src/mongo/db/pipeline/document_source_coll_stats.h @@ -88,7 +88,7 @@ public: return constraints; } - boost::optional<MergingLogic> mergingLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { return boost::none; } diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h index 13d59e8e368..a77c3f363dc 100644 --- a/src/mongo/db/pipeline/document_source_current_op.h +++ b/src/mongo/db/pipeline/document_source_current_op.h @@ -125,7 +125,7 @@ public: return constraints; } - boost::optional<MergingLogic> mergingLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { return boost::none; } diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h index 4baab7d91c7..215d94c3387 100644 --- a/src/mongo/db/pipeline/document_source_cursor.h +++ b/src/mongo/db/pipeline/document_source_cursor.h @@ -66,7 +66,7 @@ public: return constraints; } - boost::optional<MergingLogic> mergingLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { return boost::none; } diff --git a/src/mongo/db/pipeline/document_source_exchange.h b/src/mongo/db/pipeline/document_source_exchange.h index b3c5c4431c8..68b3a037047 100644 --- a/src/mongo/db/pipeline/document_source_exchange.h +++ b/src/mongo/db/pipeline/document_source_exchange.h @@ -214,7 +214,7 @@ public: LookupRequirement::kNotAllowed}; } - boost::optional<MergingLogic> mergingLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { return boost::none; } diff --git a/src/mongo/db/pipeline/document_source_facet.h b/src/mongo/db/pipeline/document_source_facet.h index 74281663fb6..868159ddb06 100644 --- a/src/mongo/db/pipeline/document_source_facet.h +++ b/src/mongo/db/pipeline/document_source_facet.h @@ -125,9 +125,9 @@ public: * TODO SERVER-24154: Should be smarter about splitting so that parts of the sub-pipelines can * potentially be run in parallel on multiple shards. */ - boost::optional<MergingLogic> mergingLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { // {shardsStage, mergingStage, sortPattern} - return MergingLogic{nullptr, this, boost::none}; + return DistributedPlanLogic{nullptr, this, boost::none}; } const std::vector<FacetPipeline>& getFacetPipelines() const { diff --git a/src/mongo/db/pipeline/document_source_geo_near.cpp b/src/mongo/db/pipeline/document_source_geo_near.cpp index a2c529bcb08..6df14d1e05a 100644 --- a/src/mongo/db/pipeline/document_source_geo_near.cpp +++ b/src/mongo/db/pipeline/document_source_geo_near.cpp @@ -236,9 +236,10 @@ DepsTracker::State DocumentSourceGeoNear::getDependencies(DepsTracker* deps) con DocumentSourceGeoNear::DocumentSourceGeoNear(const intrusive_ptr<ExpressionContext>& pExpCtx) : DocumentSource(pExpCtx), coordsIsArray(false), spherical(false) {} -boost::optional<DocumentSource::MergingLogic> DocumentSourceGeoNear::mergingLogic() { +boost::optional<DocumentSource::DistributedPlanLogic> +DocumentSourceGeoNear::distributedPlanLogic() { // {shardsStage, mergingStage, sortPattern} - return MergingLogic{this, nullptr, BSON(distanceField->fullPath() << 1)}; + return DistributedPlanLogic{this, nullptr, BSON(distanceField->fullPath() << 1)}; } } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h index 7c84ff79d18..cbb490dd807 100644 --- a/src/mongo/db/pipeline/document_source_geo_near.h +++ b/src/mongo/db/pipeline/document_source_geo_near.h @@ -131,7 +131,7 @@ public: /** * In a sharded cluster, this becomes a merge sort by distance, from nearest to furthest. */ - boost::optional<MergingLogic> mergingLogic() final; + boost::optional<DistributedPlanLogic> distributedPlanLogic() final; private: diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h index 7454a998901..535e630acc0 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.h +++ b/src/mongo/db/pipeline/document_source_graph_lookup.h @@ -77,9 +77,9 @@ public: return constraints; } - boost::optional<MergingLogic> mergingLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { // {shardsStage, mergingStage, sortPattern} - return MergingLogic{nullptr, this, boost::none}; + return DistributedPlanLogic{nullptr, this, boost::none}; } DepsTracker::State getDependencies(DepsTracker* deps) const final { diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp index 9625bc65d97..c2d2c0d7509 100644 --- a/src/mongo/db/pipeline/document_source_group.cpp +++ b/src/mongo/db/pipeline/document_source_group.cpp @@ -666,7 +666,7 @@ Document DocumentSourceGroup::makeDocument(const Value& id, return out.freeze(); } -boost::optional<DocumentSource::MergingLogic> DocumentSourceGroup::mergingLogic() { +boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceGroup::distributedPlanLogic() { intrusive_ptr<DocumentSourceGroup> mergingGroup(new DocumentSourceGroup(pExpCtx)); mergingGroup->setDoingMerge(true); @@ -686,7 +686,7 @@ boost::optional<DocumentSource::MergingLogic> DocumentSourceGroup::mergingLogic( } // {shardsStage, mergingStage, sortPattern} - return MergingLogic{this, mergingGroup, boost::none}; + return DistributedPlanLogic{this, mergingGroup, boost::none}; } bool DocumentSourceGroup::pathIncludedInGroupKeys(const std::string& dottedPath) const { diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h index f59843a1c16..8324fc2b7f5 100644 --- a/src/mongo/db/pipeline/document_source_group.h +++ b/src/mongo/db/pipeline/document_source_group.h @@ -157,7 +157,7 @@ public: */ bool usedDisk() final; - boost::optional<MergingLogic> mergingLogic() final; + boost::optional<DistributedPlanLogic> distributedPlanLogic() final; bool canRunInParallelBeforeOut( const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const final; diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp index 8f0d7d3c7a2..8ea0cbc912a 100644 --- a/src/mongo/db/pipeline/document_source_group_test.cpp +++ b/src/mongo/db/pipeline/document_source_group_test.cpp @@ -597,12 +597,12 @@ protected: intrusive_ptr<DocumentSource> createMerger() { // Set up a group merger to simulate merging results in the router. In this // case only one shard is in use. - auto mergeLogic = group()->mergingLogic(); - ASSERT(mergeLogic); - ASSERT(mergeLogic->mergingStage); - ASSERT_NOT_EQUALS(group(), mergeLogic->mergingStage); - ASSERT_FALSE(static_cast<bool>(mergeLogic->inputSortPattern)); - return mergeLogic->mergingStage; + auto distributedPlanLogic = group()->distributedPlanLogic(); + ASSERT(distributedPlanLogic); + ASSERT(distributedPlanLogic->mergingStage); + ASSERT_NOT_EQUALS(group(), distributedPlanLogic->mergingStage); + ASSERT_FALSE(static_cast<bool>(distributedPlanLogic->inputSortPattern)); + return distributedPlanLogic->mergingStage; } void checkResultSet(const intrusive_ptr<DocumentSource>& sink) { // Load the results from the DocumentSourceGroup and sort them by _id. diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h index 3073f4c54bc..6462c109af7 100644 --- a/src/mongo/db/pipeline/document_source_index_stats.h +++ b/src/mongo/db/pipeline/document_source_index_stats.h @@ -83,7 +83,7 @@ public: return constraints; } - boost::optional<MergingLogic> mergingLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { return boost::none; } diff --git a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h index d0901f50998..86b919fb848 100644 --- a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h +++ b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h @@ -63,7 +63,7 @@ public: LookupRequirement::kAllowed}; } - boost::optional<MergingLogic> mergingLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { return boost::none; } diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h index ef08fd54488..9d58b7e3fd5 100644 --- a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h +++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h @@ -58,9 +58,9 @@ public: return kStageName.rawData(); } - boost::optional<MergingLogic> mergingLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { // {shardsStage, mergingStage, sortPattern} - return MergingLogic{nullptr, this, boost::none}; + return DistributedPlanLogic{nullptr, this, boost::none}; } StageConstraints constraints(Pipeline::SplitState pipeState) const final { diff --git a/src/mongo/db/pipeline/document_source_limit.h b/src/mongo/db/pipeline/document_source_limit.h index 5105f2fa787..3cecb165cf0 100644 --- a/src/mongo/db/pipeline/document_source_limit.h +++ b/src/mongo/db/pipeline/document_source_limit.h @@ -76,14 +76,15 @@ public: } /** - * Returns a MergingLogic with two identical $limit stages; one for the shards pipeline and one - * for the merging pipeline. + * Returns a DistributedPlanLogic with two identical $limit stages; one for the shards pipeline + * and one for the merging pipeline. */ - boost::optional<MergingLogic> mergingLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { // Running this stage on the shards is an optimization, but is not strictly necessary in // order to produce correct pipeline output. // {shardsStage, mergingStage, sortPattern} - return MergingLogic{this, DocumentSourceLimit::create(pExpCtx, _limit), boost::none}; + return DistributedPlanLogic{ + this, DocumentSourceLimit::create(pExpCtx, _limit), boost::none}; } long long getLimit() const { diff --git a/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h b/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h index 08f6defcd84..3dca331efee 100644 --- a/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h +++ b/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h @@ -103,7 +103,7 @@ public: return constraints; } - boost::optional<MergingLogic> mergingLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { return boost::none; } diff --git a/src/mongo/db/pipeline/document_source_list_local_sessions.h b/src/mongo/db/pipeline/document_source_list_local_sessions.h index 9c04b35f935..80129190c83 100644 --- a/src/mongo/db/pipeline/document_source_list_local_sessions.h +++ b/src/mongo/db/pipeline/document_source_list_local_sessions.h @@ -117,7 +117,7 @@ public: return constraints; } - boost::optional<MergingLogic> mergingLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { return boost::none; } diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index 1718ad5dc63..054a9b9b896 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -117,9 +117,9 @@ public: DepsTracker::State getDependencies(DepsTracker* deps) const final; - boost::optional<MergingLogic> mergingLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { // {shardsStage, mergingStage, sortPattern} - return MergingLogic{nullptr, this, boost::none}; + return DistributedPlanLogic{nullptr, this, boost::none}; } void addInvolvedCollections(stdx::unordered_set<NamespaceString>* collectionNames) const final; diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h index 8920d207555..83bad0c7ee5 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h @@ -77,7 +77,7 @@ public: return constraints; } - boost::optional<MergingLogic> mergingLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { return boost::none; } diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h index 5fc6d0b9ded..4645a2e1937 100644 --- a/src/mongo/db/pipeline/document_source_match.h +++ b/src/mongo/db/pipeline/document_source_match.h @@ -165,7 +165,7 @@ public: std::pair<boost::intrusive_ptr<DocumentSourceMatch>, boost::intrusive_ptr<DocumentSourceMatch>> splitSourceBy(const std::set<std::string>& fields, const StringMap<std::string>& renames); - boost::optional<MergingLogic> mergingLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { return boost::none; } diff --git a/src/mongo/db/pipeline/document_source_merge.h b/src/mongo/db/pipeline/document_source_merge.h index 3b0d143ad9b..aafae386acf 100644 --- a/src/mongo/db/pipeline/document_source_merge.h +++ b/src/mongo/db/pipeline/document_source_merge.h @@ -136,7 +136,7 @@ public: LookupRequirement::kNotAllowed}; } - boost::optional<MergingLogic> mergingLogic() final override { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final override { // It should always be faster to avoid splitting the pipeline if the output collection is // sharded. If we avoid splitting the pipeline then each shard can perform the writes to the // target collection in parallel. @@ -148,7 +148,7 @@ public: return boost::none; } // {shardsStage, mergingStage, sortPattern} - return MergingLogic{nullptr, this, boost::none}; + return DistributedPlanLogic{nullptr, this, boost::none}; } bool canRunInParallelBeforeOut( diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h index 11156ef4422..738fc802cc4 100644 --- a/src/mongo/db/pipeline/document_source_mock.h +++ b/src/mongo/db/pipeline/document_source_mock.h @@ -91,7 +91,7 @@ public: return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {}}; } - boost::optional<MergingLogic> mergingLogic() override { + boost::optional<DistributedPlanLogic> distributedPlanLogic() override { return boost::none; } diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h index ce9e4282b14..bfb9bb08771 100644 --- a/src/mongo/db/pipeline/document_source_out.h +++ b/src/mongo/db/pipeline/document_source_out.h @@ -156,7 +156,7 @@ public: return _mode; } - boost::optional<MergingLogic> mergingLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { // It should always be faster to avoid splitting the pipeline if the output collection is // sharded. If we avoid splitting the pipeline then each shard can perform the writes to the // target collection in parallel. @@ -168,7 +168,7 @@ public: return boost::none; } // {shardsStage, mergingStage, sortPattern} - return MergingLogic{nullptr, this, boost::none}; + return DistributedPlanLogic{nullptr, this, boost::none}; } virtual bool canRunInParallelBeforeOut( diff --git a/src/mongo/db/pipeline/document_source_plan_cache_stats.h b/src/mongo/db/pipeline/document_source_plan_cache_stats.h index 91283cea1ff..fd7d332a660 100644 --- a/src/mongo/db/pipeline/document_source_plan_cache_stats.h +++ b/src/mongo/db/pipeline/document_source_plan_cache_stats.h @@ -105,7 +105,7 @@ public: return constraints; } - boost::optional<MergingLogic> mergingLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { return boost::none; } diff --git a/src/mongo/db/pipeline/document_source_queue.h b/src/mongo/db/pipeline/document_source_queue.h index b38c992248e..4f5a61f5fb8 100644 --- a/src/mongo/db/pipeline/document_source_queue.h +++ b/src/mongo/db/pipeline/document_source_queue.h @@ -82,7 +82,7 @@ public: return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {}}; } - boost::optional<MergingLogic> mergingLogic() override { + boost::optional<DistributedPlanLogic> distributedPlanLogic() override { return boost::none; } diff --git a/src/mongo/db/pipeline/document_source_redact.h b/src/mongo/db/pipeline/document_source_redact.h index bced5c25a01..1e64ba15787 100644 --- a/src/mongo/db/pipeline/document_source_redact.h +++ b/src/mongo/db/pipeline/document_source_redact.h @@ -51,7 +51,7 @@ public: ChangeStreamRequirement::kWhitelist}; } - boost::optional<MergingLogic> mergingLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { return boost::none; } diff --git a/src/mongo/db/pipeline/document_source_sample.cpp b/src/mongo/db/pipeline/document_source_sample.cpp index b0f709366a1..23a71562a30 100644 --- a/src/mongo/db/pipeline/document_source_sample.cpp +++ b/src/mongo/db/pipeline/document_source_sample.cpp @@ -118,10 +118,10 @@ intrusive_ptr<DocumentSource> DocumentSourceSample::createFromBson( } -boost::optional<DocumentSource::MergingLogic> DocumentSourceSample::mergingLogic() { +boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceSample::distributedPlanLogic() { // On the merger we need to merge the pre-sorted documents by their random values, then limit to // the number we need. - MergingLogic logic; + DistributedPlanLogic logic; logic.shardsStage = this; if (_size > 0) { logic.mergingStage = DocumentSourceLimit::create(pExpCtx, _size); diff --git a/src/mongo/db/pipeline/document_source_sample.h b/src/mongo/db/pipeline/document_source_sample.h index 7b1d45c1dcc..987f8f8ba94 100644 --- a/src/mongo/db/pipeline/document_source_sample.h +++ b/src/mongo/db/pipeline/document_source_sample.h @@ -58,7 +58,7 @@ public: return DepsTracker::State::SEE_NEXT; } - boost::optional<MergingLogic> mergingLogic() final; + boost::optional<DistributedPlanLogic> distributedPlanLogic() final; long long getSampleSize() const { return _size; diff --git a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h index 0e24c6f420e..31f49bec10f 100644 --- a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h +++ b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h @@ -55,7 +55,7 @@ public: LookupRequirement::kAllowed}; } - boost::optional<MergingLogic> mergingLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { return boost::none; } diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache.h b/src/mongo/db/pipeline/document_source_sequential_document_cache.h index c08be4d311f..19119a1a0f3 100644 --- a/src/mongo/db/pipeline/document_source_sequential_document_cache.h +++ b/src/mongo/db/pipeline/document_source_sequential_document_cache.h @@ -64,7 +64,7 @@ public: return constraints; } - boost::optional<MergingLogic> mergingLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { return boost::none; } diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h index f486a7ce2d8..44a624013e3 100644 --- a/src/mongo/db/pipeline/document_source_single_document_transformation.h +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h @@ -73,7 +73,7 @@ public: return constraints; } - boost::optional<MergingLogic> mergingLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { return boost::none; } diff --git a/src/mongo/db/pipeline/document_source_skip.h b/src/mongo/db/pipeline/document_source_skip.h index 3b7b4b4cb0e..fdd430ccf6c 100644 --- a/src/mongo/db/pipeline/document_source_skip.h +++ b/src/mongo/db/pipeline/document_source_skip.h @@ -85,9 +85,9 @@ public: /** * The $skip stage must run on the merging half of the pipeline. */ - boost::optional<MergingLogic> mergingLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { // {shardsStage, mergingStage, sortPattern} - return MergingLogic{nullptr, this, boost::none}; + return DistributedPlanLogic{nullptr, this, boost::none}; } long long getSkip() const { diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp index 3bc7dd75aa3..88a05b076bb 100644 --- a/src/mongo/db/pipeline/document_source_sort.cpp +++ b/src/mongo/db/pipeline/document_source_sort.cpp @@ -531,8 +531,8 @@ int DocumentSourceSort::compare(const Value& lhs, const Value& rhs) const { return 0; } -boost::optional<DocumentSource::MergingLogic> DocumentSourceSort::mergingLogic() { - MergingLogic split; +boost::optional<DocumentSource::DistributedPlanLogic> DocumentSourceSort::distributedPlanLogic() { + DistributedPlanLogic split; split.shardsStage = this; split.inputSortPattern = serializeSortKeyPattern(SortKeySerialization::kForSortKeyMerging).toBson(); diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h index e4d2071184d..ca68b71ad72 100644 --- a/src/mongo/db/pipeline/document_source_sort.h +++ b/src/mongo/db/pipeline/document_source_sort.h @@ -90,7 +90,7 @@ public: DepsTracker::State getDependencies(DepsTracker* deps) const final; - boost::optional<MergingLogic> mergingLogic() final; + boost::optional<DistributedPlanLogic> distributedPlanLogic() final; bool canRunInParallelBeforeOut( const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const final; diff --git a/src/mongo/db/pipeline/document_source_sort_test.cpp b/src/mongo/db/pipeline/document_source_sort_test.cpp index 78936091362..b73d90a2d1a 100644 --- a/src/mongo/db/pipeline/document_source_sort_test.cpp +++ b/src/mongo/db/pipeline/document_source_sort_test.cpp @@ -131,9 +131,9 @@ TEST_F(DocumentSourceSortTest, SortWithLimit) { sort()->serializeToArray(arr); ASSERT_BSONOBJ_EQ(arr[0].getDocument().toBson(), BSON("$sort" << BSON("a" << 1))); - ASSERT(sort()->mergingLogic()); - ASSERT(sort()->mergingLogic()->shardsStage != nullptr); - ASSERT(sort()->mergingLogic()->mergingStage == nullptr); + ASSERT(sort()->distributedPlanLogic()); + ASSERT(sort()->distributedPlanLogic()->shardsStage != nullptr); + ASSERT(sort()->distributedPlanLogic()->mergingStage == nullptr); } container.push_back(DocumentSourceLimit::create(expCtx, 10)); @@ -159,10 +159,10 @@ TEST_F(DocumentSourceSortTest, SortWithLimit) { Value(arr), DOC_ARRAY(DOC("$sort" << DOC("a" << 1)) << DOC("$limit" << sort()->getLimit()))); - ASSERT(sort()->mergingLogic()); - ASSERT(sort()->mergingLogic()->shardsStage != nullptr); - ASSERT(sort()->mergingLogic()->mergingStage != nullptr); - ASSERT(dynamic_cast<DocumentSourceLimit*>(sort()->mergingLogic()->mergingStage.get())); + ASSERT(sort()->distributedPlanLogic()); + ASSERT(sort()->distributedPlanLogic()->shardsStage != nullptr); + ASSERT(sort()->distributedPlanLogic()->mergingStage != nullptr); + ASSERT(dynamic_cast<DocumentSourceLimit*>(sort()->distributedPlanLogic()->mergingStage.get())); } TEST_F(DocumentSourceSortTest, Dependencies) { diff --git a/src/mongo/db/pipeline/document_source_tee_consumer.h b/src/mongo/db/pipeline/document_source_tee_consumer.h index 8f701a16403..e9054678296 100644 --- a/src/mongo/db/pipeline/document_source_tee_consumer.h +++ b/src/mongo/db/pipeline/document_source_tee_consumer.h @@ -64,7 +64,7 @@ public: LookupRequirement::kAllowed}; } - boost::optional<MergingLogic> mergingLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { return boost::none; } diff --git a/src/mongo/db/pipeline/document_source_test_optimizations.h b/src/mongo/db/pipeline/document_source_test_optimizations.h index 56542e5e4fa..ba535906ea6 100644 --- a/src/mongo/db/pipeline/document_source_test_optimizations.h +++ b/src/mongo/db/pipeline/document_source_test_optimizations.h @@ -56,7 +56,7 @@ public: LookupRequirement::kAllowed}; } - virtual boost::optional<MergingLogic> mergingLogic() override { + virtual boost::optional<DistributedPlanLogic> distributedPlanLogic() override { return boost::none; } diff --git a/src/mongo/db/pipeline/document_source_unwind.h b/src/mongo/db/pipeline/document_source_unwind.h index 0582001e910..4cef6983564 100644 --- a/src/mongo/db/pipeline/document_source_unwind.h +++ b/src/mongo/db/pipeline/document_source_unwind.h @@ -61,7 +61,7 @@ public: return constraints; } - boost::optional<MergingLogic> mergingLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { return boost::none; } diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 08392b4c0de..439d4d9ab88 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -407,7 +407,7 @@ bool Pipeline::requiredToRunOnMongos() const { for (auto&& stage : _sources) { // If this pipeline is capable of splitting before the mongoS-only stage, then the pipeline // as a whole is not required to run on mongoS. - if (_splitState == SplitState::kUnsplit && stage->mergingLogic()) { + if (_splitState == SplitState::kUnsplit && stage->distributedPlanLogic()) { return false; } diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index 7af165e2803..200f83ebc26 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -74,23 +74,23 @@ boost::optional<BSONObj> findSplitPoint(Pipeline::SourceContainer* shardPipe, Pi boost::intrusive_ptr<DocumentSource> current = mergePipe->popFront(); // Check if this source is splittable. - auto mergeLogic = current->mergingLogic(); - if (!mergeLogic) { + auto distributedPlanLogic = current->distributedPlanLogic(); + if (!distributedPlanLogic) { // Move the source from the merger _sources to the shard _sources. shardPipe->push_back(current); continue; } // A source may not simultaneously be present on both sides of the split. - invariant(mergeLogic->shardsStage != mergeLogic->mergingStage); + invariant(distributedPlanLogic->shardsStage != distributedPlanLogic->mergingStage); - if (mergeLogic->shardsStage) - shardPipe->push_back(std::move(mergeLogic->shardsStage)); + if (distributedPlanLogic->shardsStage) + shardPipe->push_back(std::move(distributedPlanLogic->shardsStage)); - if (mergeLogic->mergingStage) - mergePipe->addInitialSource(std::move(mergeLogic->mergingStage)); + if (distributedPlanLogic->mergingStage) + mergePipe->addInitialSource(std::move(distributedPlanLogic->mergingStage)); - return mergeLogic->inputSortPattern; + return distributedPlanLogic->inputSortPattern; } return boost::none; } @@ -289,7 +289,7 @@ ClusterClientCursorGuard convertPipelineToRouterStages( bool stageCanRunInParallel(const boost::intrusive_ptr<DocumentSource>& stage, const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) { - if (stage->mergingLogic()) { + if (stage->distributedPlanLogic()) { return stage->canRunInParallelBeforeOut(nameOfShardKeyFieldsUponEntryToStage); } else { // This stage is fine to execute in parallel on each stream. For example, a $match can be diff --git a/src/mongo/s/query/document_source_merge_cursors.h b/src/mongo/s/query/document_source_merge_cursors.h index efc3d707a53..60cdf7d25f6 100644 --- a/src/mongo/s/query/document_source_merge_cursors.h +++ b/src/mongo/s/query/document_source_merge_cursors.h @@ -95,7 +95,7 @@ public: return constraints; } - boost::optional<MergingLogic> mergingLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { return boost::none; } diff --git a/src/mongo/s/query/document_source_update_on_add_shard.h b/src/mongo/s/query/document_source_update_on_add_shard.h index 90a68fc230d..5b00ece5e52 100644 --- a/src/mongo/s/query/document_source_update_on_add_shard.h +++ b/src/mongo/s/query/document_source_update_on_add_shard.h @@ -73,7 +73,7 @@ public: ChangeStreamRequirement::kChangeStreamStage}; } - boost::optional<MergingLogic> mergingLogic() final { + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { return boost::none; } |