From 87194fbe0c24525bc1f2d674012fe6978eca77d2 Mon Sep 17 00:00:00 2001 From: Charlie Swanson Date: Thu, 29 Nov 2018 09:56:42 -0500 Subject: SERVER-38311 Change out merging strategy Allows an $out stage to run in parallel on all shards if the target collection is sharded and so is the input collection to the aggregate. --- src/mongo/db/commands/pipeline_command.cpp | 5 +- src/mongo/db/pipeline/document_source.h | 94 +++++++++------------- .../db/pipeline/document_source_bucket_auto.h | 10 +-- .../document_source_change_stream_close_cursor.h | 11 +-- .../document_source_change_stream_transform.h | 5 ++ .../db/pipeline/document_source_check_invalidate.h | 4 + .../pipeline/document_source_check_resume_token.h | 28 +++---- src/mongo/db/pipeline/document_source_coll_stats.h | 4 + src/mongo/db/pipeline/document_source_current_op.h | 4 + src/mongo/db/pipeline/document_source_cursor.h | 4 + src/mongo/db/pipeline/document_source_exchange.h | 4 + src/mongo/db/pipeline/document_source_facet.h | 10 +-- src/mongo/db/pipeline/document_source_geo_near.cpp | 5 +- src/mongo/db/pipeline/document_source_geo_near.h | 11 +-- .../db/pipeline/document_source_graph_lookup.h | 15 ++-- src/mongo/db/pipeline/document_source_group.cpp | 9 +-- src/mongo/db/pipeline/document_source_group.h | 6 +- .../db/pipeline/document_source_group_test.cpp | 13 ++- .../db/pipeline/document_source_index_stats.h | 4 + ...document_source_internal_inhibit_optimization.h | 4 + .../document_source_internal_split_pipeline.h | 12 +-- src/mongo/db/pipeline/document_source_limit.h | 23 ++---- .../document_source_list_cached_and_active_users.h | 4 + .../pipeline/document_source_list_local_sessions.h | 4 + src/mongo/db/pipeline/document_source_lookup.h | 11 +-- .../document_source_lookup_change_post_image.h | 4 + src/mongo/db/pipeline/document_source_match.h | 4 + src/mongo/db/pipeline/document_source_mock.h | 4 + src/mongo/db/pipeline/document_source_out.h | 39 ++++++--- .../db/pipeline/document_source_plan_cache_stats.h | 4 + src/mongo/db/pipeline/document_source_redact.h | 4 + src/mongo/db/pipeline/document_source_sample.cpp | 25 +++--- src/mongo/db/pipeline/document_source_sample.h | 5 +- .../document_source_sample_from_random_cursor.h | 4 + .../document_source_sequential_document_cache.h | 4 + ...ocument_source_single_document_transformation.h | 4 + src/mongo/db/pipeline/document_source_skip.h | 13 +-- src/mongo/db/pipeline/document_source_sort.cpp | 15 ++-- src/mongo/db/pipeline/document_source_sort.h | 5 +- .../db/pipeline/document_source_sort_test.cpp | 12 +-- .../db/pipeline/document_source_tee_consumer.h | 4 + .../pipeline/document_source_test_optimizations.h | 4 + src/mongo/db/pipeline/document_source_unwind.h | 4 + .../db/pipeline/document_source_watch_for_uuid.h | 4 + src/mongo/db/pipeline/mongo_process_interface.h | 4 +- src/mongo/db/pipeline/mongos_process_interface.cpp | 3 - src/mongo/db/pipeline/pipeline.cpp | 3 +- .../db/pipeline/process_interface_standalone.cpp | 12 ++- src/mongo/db/pipeline/sharded_agg_helpers.cpp | 20 +++-- 49 files changed, 293 insertions(+), 215 deletions(-) (limited to 'src/mongo/db') diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp index 06a91ad79f0..03face35cec 100644 --- a/src/mongo/db/commands/pipeline_command.cpp +++ b/src/mongo/db/commands/pipeline_command.cpp @@ -81,9 +81,8 @@ public: // Aggregations that are run directly against a collection allow any read concern. // Otherwise, if the aggregate is collectionless then the read concern must be 'local' // (e.g. $currentOp). The exception to this is a $changeStream on a whole database, - // which is - // considered collectionless but must be read concern 'majority'. Further read concern - // validation is done one the pipeline is parsed. + // which is considered collectionless but must be read concern 'majority'. Further read + // concern validation is done one the pipeline is parsed. return level == repl::ReadConcernLevel::kLocalReadConcern || level == repl::ReadConcernLevel::kMajorityReadConcern || !AggregationRequest::parseNs(_dbName, _request.body).isCollectionlessAggregateNS(); diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 2c3e6ccdd84..94498c3647c 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -216,6 +216,25 @@ public: Document _result; }; + /** + * A struct representing the information needed to execute this stage on a distributed + * collection. Describes how a pipeline should be split for sharded execution. + */ + struct MergingLogic { + // A stage which executes on each shard in parallel, or nullptr if nothing can be done in + // parallel. For example, a partial $group before a subsequent global $group. + boost::intrusive_ptr shardsStage = nullptr; + + // A stage which executes after merging all the results together, or nullptr if nothing is + // necessary after merging. For example, a $limit stage. + boost::intrusive_ptr mergingStage = nullptr; + + // If set, each document is expected to have sort key metadata which will be serialized in + // the '$sortKey' field. 'inputSortPattern' will then be used to describe which fields are + // ascending and which fields are descending when merging the streams together. + boost::optional inputSortPattern = boost::none; + }; + virtual ~DocumentSource() {} /** @@ -454,6 +473,26 @@ public: return DepsTracker::State::NOT_SUPPORTED; } + /** + * If this stage can be run in parallel across a distributed collection, returns boost::none. + * Otherwise, returns a struct representing what needs to be done to merge each shard's pipeline + * into a single stream of results. Must not mutate the existing source object; if different + * behaviour is required, a new source should be created and configured appropriately. It is an + * error for the returned MergingLogic to have identical pointers for 'shardsStage' and + * 'mergingStage'. + */ + virtual boost::optional mergingLogic() = 0; + + /** + * Returns true if it would be correct to execute this stage in parallel across the shards in + * cases where the final stage is an $out. For example, a $group stage which is just merging the + * groups from the shards can be run in parallel since it will preserve the shard key. + */ + virtual bool canRunInParallelBeforeOut( + const std::set& nameOfShardKeyFieldsUponEntryToStage) const { + return false; + } + protected: explicit DocumentSource(const boost::intrusive_ptr& pExpCtx); @@ -507,59 +546,4 @@ private: boost::optional explain = boost::none) const = 0; }; -/** - * This class marks DocumentSources that should be split between the merger and the shards. See - * Pipeline::Optimizations::Sharded::findSplitPoint() for details. - */ -class NeedsMergerDocumentSource { -public: - /** - * A struct representing the information needed to merge the cursors for the shards half of this - * pipeline. If 'inputSortPattern' is set, each document is expected to have sort key metadata - * which will be serialized in the '$sortKey' field. 'inputSortPattern' will then be used to - * describe which fields are ascending and which fields are descending when merging the streams - * together. - */ - struct MergingLogic { - MergingLogic(boost::intrusive_ptr&& mergingStage, - boost::optional inputSortPattern = boost::none) - : mergingStage(std::move(mergingStage)), inputSortPattern(inputSortPattern) {} - - boost::intrusive_ptr mergingStage; - boost::optional inputSortPattern; - }; - - /** - * Returns a source to be run on the shards, or NULL if no work should be done on the shards for - * this stage. Must not mutate the existing source object; if different behaviour is required in - * the split-pipeline case, a new source should be created and configured appropriately. It is - * an error for getShardSource() to return a pointer to the same object as getMergeSource(), - * since this can result in the source being stitched into both the shard and merge pipelines - * when the latter is executed on mongoS. - */ - virtual boost::intrusive_ptr getShardSource() = 0; - - /** - * Returns a struct representing what needs to be done to merge each shard's pipeline into a - * single stream of results. Must not mutate the existing source object; if different behaviour - * is required, a new source should be created and configured appropriately. It is an error for - * mergingLogic() to return a pointer to the same object as getShardSource(). - */ - virtual MergingLogic mergingLogic() = 0; - - /** - * Returns true if it would be correct to execute this stage in parallel across the shards in - * cases where the final stage is an $out. For example, a $group stage which is just merging the - * groups from the shards can be run in parallel since it will preserve the shard key. - */ - virtual bool canRunInParallelBeforeOut( - const std::set& nameOfShardKeyFieldsUponEntryToStage) const { - return false; - } - -protected: - // It is invalid to delete through a NeedsMergerDocumentSource-typed pointer. - virtual ~NeedsMergerDocumentSource() {} -}; - } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.h b/src/mongo/db/pipeline/document_source_bucket_auto.h index e75d4d16558..cbcce1f46bc 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto.h +++ b/src/mongo/db/pipeline/document_source_bucket_auto.h @@ -43,7 +43,7 @@ namespace mongo { * The $bucketAuto stage takes a user-specified number of buckets and automatically determines * boundaries such that the values are approximately equally distributed between those buckets. */ -class DocumentSourceBucketAuto final : public DocumentSource, public NeedsMergerDocumentSource { +class DocumentSourceBucketAuto final : public DocumentSource { public: Value serialize(boost::optional explain = boost::none) const final; DepsTracker::State getDependencies(DepsTracker* deps) const final; @@ -62,11 +62,9 @@ public: /** * The $bucketAuto stage must be run on the merging shard. */ - boost::intrusive_ptr getShardSource() final { - return nullptr; - } - MergingLogic mergingLogic() final { - return {this}; + boost::optional mergingLogic() final { + // {shardsStage, mergingStage, sortPattern} + return MergingLogic{nullptr, this, boost::none}; } static const uint64_t kDefaultMaxMemoryUsageBytes = 100 * 1024 * 1024; diff --git a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h index 4c3c8ee7960..b4e91e11c39 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h +++ b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h @@ -42,7 +42,7 @@ namespace mongo { * "invalidate" entries. * It is not intended to be created by the user. */ -class DocumentSourceCloseCursor final : public DocumentSource, public NeedsMergerDocumentSource { +class DocumentSourceCloseCursor final : public DocumentSource { public: GetNextResult getNext() final; @@ -76,14 +76,11 @@ public: return new DocumentSourceCloseCursor(expCtx); } - boost::intrusive_ptr getShardSource() final { - return nullptr; - } - - MergingLogic mergingLogic() final { + boost::optional mergingLogic() final { // This stage must run on mongos to ensure it sees any invalidation in the correct order, // and to ensure that all remote cursors are cleaned up properly. - return {this, change_stream_constants::kSortSpec}; + // {shardsStage, mergingStage, sortPattern} + return MergingLogic{nullptr, this, change_stream_constants::kSortSpec}; } private: diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h index 63f516daad9..14c5f51a05f 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.h +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h @@ -54,6 +54,11 @@ public: Value serialize(boost::optional explain) const; StageConstraints constraints(Pipeline::SplitState pipeState) const final; + + boost::optional mergingLogic() final { + return boost::none; + } + DocumentSource::GetNextResult getNext(); const char* getSourceName() const { return DocumentSourceChangeStream::kStageName.rawData(); diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.h b/src/mongo/db/pipeline/document_source_check_invalidate.h index 863199016db..3a6ea3da51c 100644 --- a/src/mongo/db/pipeline/document_source_check_invalidate.h +++ b/src/mongo/db/pipeline/document_source_check_invalidate.h @@ -58,6 +58,10 @@ public: ChangeStreamRequirement::kChangeStreamStage}; } + boost::optional mergingLogic() final { + return boost::none; + } + Value serialize(boost::optional explain = boost::none) const final { // This stage is created by the DocumentSourceChangeStream stage, so serializing it here // would result in it being created twice. diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h index d919f19fa62..58052dce3bb 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.h +++ b/src/mongo/db/pipeline/document_source_check_resume_token.h @@ -73,6 +73,10 @@ public: ChangeStreamRequirement::kChangeStreamStage}; } + boost::optional mergingLogic() final { + return boost::none; + } + Value serialize(boost::optional explain = boost::none) const final; static boost::intrusive_ptr create( @@ -93,8 +97,7 @@ private: * This stage is used internally for change streams to ensure that the resume token is in the * stream. It is not intended to be created by the user. */ -class DocumentSourceEnsureResumeTokenPresent final : public DocumentSource, - public NeedsMergerDocumentSource { +class DocumentSourceEnsureResumeTokenPresent final : public DocumentSource { public: // Used to record the results of comparing the token data extracted from documents in the // resumed stream against the client's resume token. @@ -119,21 +122,18 @@ public: ChangeStreamRequirement::kChangeStreamStage}; } - /** - * NeedsMergerDocumentSource methods; this has to run on the merger, since the resume point - * could be at any shard. Also add a DocumentSourceShardCheckResumability stage on the shards - * pipeline to ensure that each shard has enough oplog history to resume the change stream. - */ - boost::intrusive_ptr getShardSource() final { - return DocumentSourceShardCheckResumability::create(pExpCtx, - _tokenFromClient.getClusterTime()); - }; - - MergingLogic mergingLogic() final { + boost::optional mergingLogic() final { + MergingLogic logic; // This stage must run on mongos to ensure it sees the resume token, which could have come // from any shard. We also must include a mergingPresorted $sort stage to communicate to // the AsyncResultsMerger that we need to merge the streams in a particular order. - return {this, change_stream_constants::kSortSpec}; + logic.mergingStage = this; + // Also add logic to the shards to ensure that each shard has enough oplog history to resume + // the change stream. + logic.shardsStage = DocumentSourceShardCheckResumability::create( + pExpCtx, _tokenFromClient.getClusterTime()); + logic.inputSortPattern = change_stream_constants::kSortSpec; + return logic; }; Value serialize(boost::optional explain = boost::none) const final; diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h index b71898dbca9..d306a867475 100644 --- a/src/mongo/db/pipeline/document_source_coll_stats.h +++ b/src/mongo/db/pipeline/document_source_coll_stats.h @@ -88,6 +88,10 @@ public: return constraints; } + boost::optional mergingLogic() final { + return boost::none; + } + Value serialize(boost::optional explain = boost::none) const final; static boost::intrusive_ptr createFromBson( diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h index 19854342516..fd2ef5e778e 100644 --- a/src/mongo/db/pipeline/document_source_current_op.h +++ b/src/mongo/db/pipeline/document_source_current_op.h @@ -125,6 +125,10 @@ public: return constraints; } + boost::optional mergingLogic() final { + return boost::none; + } + static boost::intrusive_ptr createFromBson( BSONElement spec, const boost::intrusive_ptr& pExpCtx); diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h index 90f3c6d4f1c..2e7a87e004d 100644 --- a/src/mongo/db/pipeline/document_source_cursor.h +++ b/src/mongo/db/pipeline/document_source_cursor.h @@ -70,6 +70,10 @@ public: return constraints; } + boost::optional mergingLogic() final { + return boost::none; + } + void detachFromOperationContext() final; void reattachToOperationContext(OperationContext* opCtx) final; diff --git a/src/mongo/db/pipeline/document_source_exchange.h b/src/mongo/db/pipeline/document_source_exchange.h index 071c04ec32d..414ef2b2f78 100644 --- a/src/mongo/db/pipeline/document_source_exchange.h +++ b/src/mongo/db/pipeline/document_source_exchange.h @@ -214,6 +214,10 @@ public: TransactionRequirement::kAllowed}; } + boost::optional mergingLogic() final { + return boost::none; + } + const char* getSourceName() const final; Value serialize(boost::optional explain = boost::none) const final; diff --git a/src/mongo/db/pipeline/document_source_facet.h b/src/mongo/db/pipeline/document_source_facet.h index bc8f8dad307..570e55b1907 100644 --- a/src/mongo/db/pipeline/document_source_facet.h +++ b/src/mongo/db/pipeline/document_source_facet.h @@ -57,7 +57,7 @@ class NamespaceString; * stage which will produce a document like the following: * {facetA: [], facetB: []}. */ -class DocumentSourceFacet final : public DocumentSource, public NeedsMergerDocumentSource { +class DocumentSourceFacet final : public DocumentSource { public: struct FacetPipeline { FacetPipeline(std::string name, std::unique_ptr pipeline) @@ -126,11 +126,9 @@ public: * TODO SERVER-24154: Should be smarter about splitting so that parts of the sub-pipelines can * potentially be run in parallel on multiple shards. */ - boost::intrusive_ptr getShardSource() final { - return nullptr; - } - MergingLogic mergingLogic() final { - return {this}; + boost::optional mergingLogic() final { + // {shardsStage, mergingStage, sortPattern} + return MergingLogic{nullptr, this, boost::none}; } const std::vector& getFacetPipelines() const { diff --git a/src/mongo/db/pipeline/document_source_geo_near.cpp b/src/mongo/db/pipeline/document_source_geo_near.cpp index 4a9cee80e3f..e80feb995bd 100644 --- a/src/mongo/db/pipeline/document_source_geo_near.cpp +++ b/src/mongo/db/pipeline/document_source_geo_near.cpp @@ -237,8 +237,9 @@ DepsTracker::State DocumentSourceGeoNear::getDependencies(DepsTracker* deps) con DocumentSourceGeoNear::DocumentSourceGeoNear(const intrusive_ptr& pExpCtx) : DocumentSource(pExpCtx), coordsIsArray(false), spherical(false) {} -NeedsMergerDocumentSource::MergingLogic DocumentSourceGeoNear::mergingLogic() { - return {nullptr, BSON(distanceField->fullPath() << 1)}; +boost::optional DocumentSourceGeoNear::mergingLogic() { + // {shardsStage, mergingStage, sortPattern} + return MergingLogic{this, nullptr, BSON(distanceField->fullPath() << 1)}; } } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h index fa9989fe5c7..f4136a5b4d1 100644 --- a/src/mongo/db/pipeline/document_source_geo_near.h +++ b/src/mongo/db/pipeline/document_source_geo_near.h @@ -35,7 +35,7 @@ namespace mongo { -class DocumentSourceGeoNear : public DocumentSource, public NeedsMergerDocumentSource { +class DocumentSourceGeoNear : public DocumentSource { public: static constexpr StringData kKeyFieldName = "key"_sd; static constexpr auto kStageName = "$geoNear"; @@ -121,17 +121,10 @@ public: */ BSONObj asNearQuery(StringData nearFieldName) const; - /** - * This document source is sent as-is to the shards. - */ - boost::intrusive_ptr getShardSource() final { - return this; - } - /** * In a sharded cluster, this becomes a merge sort by distance, from nearest to furthest. */ - MergingLogic mergingLogic() final; + boost::optional mergingLogic() final; private: diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h index 1c7cdd55924..70d4b91e9d5 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.h +++ b/src/mongo/db/pipeline/document_source_graph_lookup.h @@ -38,7 +38,7 @@ namespace mongo { -class DocumentSourceGraphLookUp final : public DocumentSource, public NeedsMergerDocumentSource { +class DocumentSourceGraphLookUp final : public DocumentSource { public: static std::unique_ptr liteParse( const AggregationRequest& request, const BSONElement& spec); @@ -74,19 +74,16 @@ public: return constraints; } + boost::optional mergingLogic() final { + // {shardsStage, mergingStage, sortPattern} + return MergingLogic{nullptr, this, boost::none}; + } + DepsTracker::State getDependencies(DepsTracker* deps) const final { _startWith->addDependencies(deps); return DepsTracker::State::SEE_NEXT; }; - boost::intrusive_ptr getShardSource() final { - return nullptr; - } - - MergingLogic mergingLogic() final { - return {this}; - } - void addInvolvedCollections(std::vector* collections) const final { collections->push_back(_from); } diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp index 318a67df1ff..dee7cd104d4 100644 --- a/src/mongo/db/pipeline/document_source_group.cpp +++ b/src/mongo/db/pipeline/document_source_group.cpp @@ -950,11 +950,7 @@ Document DocumentSourceGroup::makeDocument(const Value& id, return out.freeze(); } -intrusive_ptr DocumentSourceGroup::getShardSource() { - return this; // No modifications necessary when on shard -} - -NeedsMergerDocumentSource::MergingLogic DocumentSourceGroup::mergingLogic() { +boost::optional DocumentSourceGroup::mergingLogic() { intrusive_ptr mergingGroup(new DocumentSourceGroup(pExpCtx)); mergingGroup->setDoingMerge(true); @@ -973,7 +969,8 @@ NeedsMergerDocumentSource::MergingLogic DocumentSourceGroup::mergingLogic() { mergingGroup->addAccumulator(copiedAccumuledField); } - return {mergingGroup}; + // {shardsStage, mergingStage, sortPattern} + return MergingLogic{this, mergingGroup, boost::none}; } bool DocumentSourceGroup::pathIncludedInGroupKeys(const std::string& dottedPath) const { diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h index a1d6fdad059..6348a83ab46 100644 --- a/src/mongo/db/pipeline/document_source_group.h +++ b/src/mongo/db/pipeline/document_source_group.h @@ -87,7 +87,7 @@ private: std::string _groupId; }; -class DocumentSourceGroup final : public DocumentSource, public NeedsMergerDocumentSource { +class DocumentSourceGroup final : public DocumentSource { public: using Accumulators = std::vector>; using GroupsMap = ValueUnorderedMap; @@ -162,9 +162,7 @@ public: */ bool usedDisk() final; - // Virtuals for NeedsMergerDocumentSource. - boost::intrusive_ptr getShardSource() final; - MergingLogic mergingLogic() final; + boost::optional mergingLogic() final; bool canRunInParallelBeforeOut( const std::set& nameOfShardKeyFieldsUponEntryToStage) const final; diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp index bde6dfdeb32..d07f41889ff 100644 --- a/src/mongo/db/pipeline/document_source_group_test.cpp +++ b/src/mongo/db/pipeline/document_source_group_test.cpp @@ -582,13 +582,12 @@ protected: intrusive_ptr createMerger() { // Set up a group merger to simulate merging results in the router. In this // case only one shard is in use. - NeedsMergerDocumentSource* splittable = dynamic_cast(group()); - ASSERT(splittable); - auto mergeLogic = splittable->mergingLogic(); - ASSERT(mergeLogic.mergingStage); - ASSERT_NOT_EQUALS(group(), mergeLogic.mergingStage); - ASSERT_FALSE(static_cast(mergeLogic.inputSortPattern)); - return mergeLogic.mergingStage; + auto mergeLogic = group()->mergingLogic(); + ASSERT(mergeLogic); + ASSERT(mergeLogic->mergingStage); + ASSERT_NOT_EQUALS(group(), mergeLogic->mergingStage); + ASSERT_FALSE(static_cast(mergeLogic->inputSortPattern)); + return mergeLogic->mergingStage; } void checkResultSet(const intrusive_ptr& sink) { // Load the results from the DocumentSourceGroup and sort them by _id. diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h index 51d92262b0f..0713e85f368 100644 --- a/src/mongo/db/pipeline/document_source_index_stats.h +++ b/src/mongo/db/pipeline/document_source_index_stats.h @@ -83,6 +83,10 @@ public: return constraints; } + boost::optional mergingLogic() final { + return boost::none; + } + static boost::intrusive_ptr createFromBson( BSONElement elem, const boost::intrusive_ptr& pExpCtx); diff --git a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h index 401087d12e6..b9e22e6cbf4 100644 --- a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h +++ b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h @@ -63,6 +63,10 @@ public: TransactionRequirement::kAllowed}; } + boost::optional mergingLogic() final { + return boost::none; + } + GetNextResult getNext() final; private: diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h index 8f0a8780dbe..3abcc737db4 100644 --- a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h +++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h @@ -43,8 +43,7 @@ namespace mongo { * pipeline will be executed on mongoS if all other stages are eligible, and will be sent to a * random participating shard otherwise. */ -class DocumentSourceInternalSplitPipeline final : public DocumentSource, - public NeedsMergerDocumentSource { +class DocumentSourceInternalSplitPipeline final : public DocumentSource { public: static constexpr StringData kStageName = "$_internalSplitPipeline"_sd; @@ -60,12 +59,9 @@ public: return kStageName.rawData(); } - boost::intrusive_ptr getShardSource() final { - return nullptr; - } - - MergingLogic mergingLogic() final { - return {this}; + boost::optional mergingLogic() final { + // {shardsStage, mergingStage, sortPattern} + return MergingLogic{nullptr, this, boost::none}; } StageConstraints constraints(Pipeline::SplitState pipeState) const final { diff --git a/src/mongo/db/pipeline/document_source_limit.h b/src/mongo/db/pipeline/document_source_limit.h index 885c0f5c875..78d325c9375 100644 --- a/src/mongo/db/pipeline/document_source_limit.h +++ b/src/mongo/db/pipeline/document_source_limit.h @@ -34,7 +34,7 @@ namespace mongo { -class DocumentSourceLimit final : public DocumentSource, public NeedsMergerDocumentSource { +class DocumentSourceLimit final : public DocumentSource { public: static constexpr StringData kStageName = "$limit"_sd; @@ -81,21 +81,14 @@ public: } /** - * Returns the current DocumentSourceLimit for use in the shards pipeline. Running this stage on - * the shards is an optimization, but is not strictly necessary in order to produce correct - * pipeline output. + * Returns a MergingLogic with two identical $limit stages; one for the shards pipeline and one + * for the merging pipeline. */ - boost::intrusive_ptr getShardSource() final { - return this; - } - - /** - * Returns a new DocumentSourceLimit with the same limit as the current stage, for use in the - * merge pipeline. Unlike the shards source, it is necessary for this stage to run on the - * merging host in order to produce correct pipeline output. - */ - MergingLogic mergingLogic() final { - return {DocumentSourceLimit::create(pExpCtx, _limit)}; + boost::optional mergingLogic() final { + // Running this stage on the shards is an optimization, but is not strictly necessary in + // order to produce correct pipeline output. + // {shardsStage, mergingStage, sortPattern} + return MergingLogic{this, DocumentSourceLimit::create(pExpCtx, _limit), boost::none}; } long long getLimit() const { diff --git a/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h b/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h index 47fbb2759cd..a6312e8a2b9 100644 --- a/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h +++ b/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h @@ -103,6 +103,10 @@ public: return constraints; } + boost::optional mergingLogic() final { + return boost::none; + } + static boost::intrusive_ptr createFromBson( BSONElement elem, const boost::intrusive_ptr& pExpCtx); diff --git a/src/mongo/db/pipeline/document_source_list_local_sessions.h b/src/mongo/db/pipeline/document_source_list_local_sessions.h index 9e0fa9e7363..4c0aa30f4fc 100644 --- a/src/mongo/db/pipeline/document_source_list_local_sessions.h +++ b/src/mongo/db/pipeline/document_source_list_local_sessions.h @@ -117,6 +117,10 @@ public: return constraints; } + boost::optional mergingLogic() final { + return boost::none; + } + static boost::intrusive_ptr createFromBson( BSONElement elem, const boost::intrusive_ptr& pExpCtx); diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index 29454ee5534..8d05b8286d8 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -47,7 +47,7 @@ namespace mongo { * Queries separate collection for equality matches with documents in the pipeline collection. * Adds matching documents to a new array field in the input document. */ -class DocumentSourceLookUp final : public DocumentSource, public NeedsMergerDocumentSource { +class DocumentSourceLookUp final : public DocumentSource { public: static constexpr size_t kMaxSubPipelineDepth = 20; @@ -122,12 +122,9 @@ public: return DocumentSource::truncateSortSet(pSource->getOutputSorts(), {_as.fullPath()}); } - boost::intrusive_ptr getShardSource() final { - return nullptr; - } - - MergingLogic mergingLogic() final { - return {this}; + boost::optional mergingLogic() final { + // {shardsStage, mergingStage, sortPattern} + return MergingLogic{nullptr, this, boost::none}; } void addInvolvedCollections(std::vector* collections) const final { diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h index 9e63d3f5dcd..cd2f8175e00 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h @@ -77,6 +77,10 @@ public: return constraints; } + boost::optional mergingLogic() final { + return boost::none; + } + DepsTracker::State getDependencies(DepsTracker* deps) const { // The namespace is not technically needed yet, but we will if there is more than one // collection involved. diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h index fd08a52250c..d770006e2d4 100644 --- a/src/mongo/db/pipeline/document_source_match.h +++ b/src/mongo/db/pipeline/document_source_match.h @@ -162,6 +162,10 @@ public: const std::string& path, const boost::intrusive_ptr& expCtx); + boost::optional mergingLogic() final { + return boost::none; + } + protected: DocumentSourceMatch(const BSONObj& query, const boost::intrusive_ptr& expCtx); diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h index 617e0cf462c..b5d6dffde47 100644 --- a/src/mongo/db/pipeline/document_source_mock.h +++ b/src/mongo/db/pipeline/document_source_mock.h @@ -98,6 +98,10 @@ public: return {GetModPathsReturn::Type::kFiniteSet, std::set{}, {}}; } + boost::optional mergingLogic() override { + return boost::none; + } + // Return documents from front of queue. std::deque queue; diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h index 906a30f10ee..da09c728e79 100644 --- a/src/mongo/db/pipeline/document_source_out.h +++ b/src/mongo/db/pipeline/document_source_out.h @@ -65,7 +65,7 @@ public: /** * Abstract class for the $out aggregation stage. */ -class DocumentSourceOut : public DocumentSource, public NeedsMergerDocumentSource { +class DocumentSourceOut : public DocumentSource { public: /** * A "lite parsed" $out stage is similar to other stages involving foreign collections except in @@ -121,12 +121,22 @@ public: } StageConstraints constraints(Pipeline::SplitState pipeState) const final { + // A $out to an unsharded collection should merge on the primary shard to perform local + // writes. A $out to a sharded collection has no requirement, since each shard can perform + // its own portion of the write. We use 'kAnyShard' to direct it to execute on one of the + // shards in case some of the writes happen to end up being local. + // + // Note that this decision is inherently racy and subject to become stale. This is okay + // because either choice will work correctly, we are simply applying a heuristic + // optimization. + auto hostTypeRequirement = HostTypeRequirement::kPrimaryShard; + if (pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _outputNs) && + _mode != WriteModeEnum::kModeReplaceCollection) { + hostTypeRequirement = HostTypeRequirement::kAnyShard; + } return {StreamType::kStreaming, PositionRequirement::kLast, - // A $out to an unsharded collection should merge on the primary shard to perform - // local writes. A $out to a sharded collection has no requirement, since each shard - // can perform its own portion of the write. - HostTypeRequirement::kPrimaryShard, + hostTypeRequirement, DiskUseRequirement::kWritesPersistentData, FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed}; @@ -140,12 +150,21 @@ public: return _mode; } - boost::intrusive_ptr getShardSource() final { - return nullptr; - } - MergingLogic mergingLogic() final { - return {this}; + boost::optional mergingLogic() final { + // It should always be faster to avoid splitting the pipeline if the output collection is + // sharded. If we avoid splitting the pipeline then each shard can perform the writes to the + // target collection in parallel. + // + // Note that this decision is inherently racy and subject to become stale. This is okay + // because either choice will work correctly, we are simply applying a heuristic + // optimization. + if (pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _outputNs)) { + return boost::none; + } + // {shardsStage, mergingStage, sortPattern} + return MergingLogic{nullptr, this, boost::none}; } + virtual bool canRunInParallelBeforeOut( const std::set& nameOfShardKeyFieldsUponEntryToStage) const final { // If someone is asking the question, this must be the $out stage in question, so yes! diff --git a/src/mongo/db/pipeline/document_source_plan_cache_stats.h b/src/mongo/db/pipeline/document_source_plan_cache_stats.h index a17225b1a60..3c1beb37e76 100644 --- a/src/mongo/db/pipeline/document_source_plan_cache_stats.h +++ b/src/mongo/db/pipeline/document_source_plan_cache_stats.h @@ -105,6 +105,10 @@ public: return constraints; } + boost::optional mergingLogic() final { + return boost::none; + } + const char* getSourceName() const override { return kStageName; } diff --git a/src/mongo/db/pipeline/document_source_redact.h b/src/mongo/db/pipeline/document_source_redact.h index 0c52b453e7f..70cc16e1492 100644 --- a/src/mongo/db/pipeline/document_source_redact.h +++ b/src/mongo/db/pipeline/document_source_redact.h @@ -52,6 +52,10 @@ public: ChangeStreamRequirement::kWhitelist}; } + boost::optional mergingLogic() final { + return boost::none; + } + /** * Attempts to duplicate the redact-safe portion of a subsequent $match before the $redact * stage. diff --git a/src/mongo/db/pipeline/document_source_sample.cpp b/src/mongo/db/pipeline/document_source_sample.cpp index 90b2e82da1f..2728f5607f7 100644 --- a/src/mongo/db/pipeline/document_source_sample.cpp +++ b/src/mongo/db/pipeline/document_source_sample.cpp @@ -117,17 +117,22 @@ intrusive_ptr DocumentSourceSample::createFromBson( return sample; } -intrusive_ptr DocumentSourceSample::getShardSource() { - return this; -} -NeedsMergerDocumentSource::MergingLogic DocumentSourceSample::mergingLogic() { +boost::optional DocumentSourceSample::mergingLogic() { // On the merger we need to merge the pre-sorted documents by their random values, then limit to - // the number we need. Here we don't use 'randSortSpec' because it uses a metadata sort which - // the merging logic does not understand. The merging logic will use the serialized sort key, - // and this sort pattern is just used to communicate ascending/descending information. A pattern - // like {$meta: "randVal"} is neither ascending nor descending, and so will not be useful when - // constructing the merging logic. - return {_size > 0 ? DocumentSourceLimit::create(pExpCtx, _size) : nullptr, BSON("$rand" << -1)}; + // the number we need. + MergingLogic logic; + logic.shardsStage = this; + if (_size > 0) { + logic.mergingStage = DocumentSourceLimit::create(pExpCtx, _size); + } + + // Here we don't use 'randSortSpec' because it uses a metadata sort which the merging logic does + // not understand. The merging logic will use the serialized sort key, and this sort pattern is + // just used to communicate ascending/descending information. A pattern like {$meta: "randVal"} + // is neither ascending nor descending, and so will not be useful when constructing the merging + // logic. + logic.inputSortPattern = BSON("$rand" << -1); + return logic; } } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_sample.h b/src/mongo/db/pipeline/document_source_sample.h index 11aa6fea8d0..eec90e7c24f 100644 --- a/src/mongo/db/pipeline/document_source_sample.h +++ b/src/mongo/db/pipeline/document_source_sample.h @@ -35,7 +35,7 @@ namespace mongo { -class DocumentSourceSample final : public DocumentSource, public NeedsMergerDocumentSource { +class DocumentSourceSample final : public DocumentSource { public: static constexpr StringData kStageName = "$sample"_sd; @@ -58,8 +58,7 @@ public: return DepsTracker::State::SEE_NEXT; } - boost::intrusive_ptr getShardSource() final; - MergingLogic mergingLogic() final; + boost::optional mergingLogic() final; long long getSampleSize() const { return _size; diff --git a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h index 60289140440..a095ade9fc0 100644 --- a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h +++ b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h @@ -55,6 +55,10 @@ public: TransactionRequirement::kAllowed}; } + boost::optional mergingLogic() final { + return boost::none; + } + static boost::intrusive_ptr create( const boost::intrusive_ptr& expCtx, long long size, diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache.h b/src/mongo/db/pipeline/document_source_sequential_document_cache.h index be9422bfceb..ea460b2e4c1 100644 --- a/src/mongo/db/pipeline/document_source_sequential_document_cache.h +++ b/src/mongo/db/pipeline/document_source_sequential_document_cache.h @@ -64,6 +64,10 @@ public: return constraints; } + boost::optional mergingLogic() final { + return boost::none; + } + GetNextResult getNext() final; static boost::intrusive_ptr create( diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h index 1b9407b9d6b..7f5d2196ab0 100644 --- a/src/mongo/db/pipeline/document_source_single_document_transformation.h +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h @@ -72,6 +72,10 @@ public: return constraints; } + boost::optional mergingLogic() final { + return boost::none; + } + TransformerInterface::TransformerType getType() const { return _parsedTransform->getType(); } diff --git a/src/mongo/db/pipeline/document_source_skip.h b/src/mongo/db/pipeline/document_source_skip.h index 883cde202bd..7a384d083ed 100644 --- a/src/mongo/db/pipeline/document_source_skip.h +++ b/src/mongo/db/pipeline/document_source_skip.h @@ -34,7 +34,7 @@ namespace mongo { -class DocumentSourceSkip final : public DocumentSource, public NeedsMergerDocumentSource { +class DocumentSourceSkip final : public DocumentSource { public: static constexpr StringData kStageName = "$skip"_sd; @@ -84,11 +84,12 @@ public: return DepsTracker::State::SEE_NEXT; // This doesn't affect needed fields } - boost::intrusive_ptr getShardSource() final { - return nullptr; - } - MergingLogic mergingLogic() final { - return {this}; + /** + * The $skip stage must run on the merging half of the pipeline. + */ + boost::optional mergingLogic() final { + // {shardsStage, mergingStage, sortPattern} + return MergingLogic{nullptr, this, boost::none}; } long long getSkip() const { diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp index 475897471ed..aa48535f810 100644 --- a/src/mongo/db/pipeline/document_source_sort.cpp +++ b/src/mongo/db/pipeline/document_source_sort.cpp @@ -524,13 +524,14 @@ int DocumentSourceSort::compare(const Value& lhs, const Value& rhs) const { return 0; } -intrusive_ptr DocumentSourceSort::getShardSource() { - return this; -} - -NeedsMergerDocumentSource::MergingLogic DocumentSourceSort::mergingLogic() { - return {_limitSrc ? DocumentSourceLimit::create(pExpCtx, _limitSrc->getLimit()) : nullptr, - sortKeyPattern(SortKeySerialization::kForSortKeyMerging).toBson()}; +boost::optional DocumentSourceSort::mergingLogic() { + MergingLogic split; + split.shardsStage = this; + split.inputSortPattern = sortKeyPattern(SortKeySerialization::kForSortKeyMerging).toBson(); + if (_limitSrc) { + split.mergingStage = DocumentSourceLimit::create(pExpCtx, _limitSrc->getLimit()); + } + return split; } bool DocumentSourceSort::canRunInParallelBeforeOut( diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h index 4a07cc64f37..32bf2d3ee56 100644 --- a/src/mongo/db/pipeline/document_source_sort.h +++ b/src/mongo/db/pipeline/document_source_sort.h @@ -39,7 +39,7 @@ namespace mongo { -class DocumentSourceSort final : public DocumentSource, public NeedsMergerDocumentSource { +class DocumentSourceSort final : public DocumentSource { public: static constexpr StringData kStageName = "$sort"_sd; enum class SortKeySerialization { @@ -83,8 +83,7 @@ public: DepsTracker::State getDependencies(DepsTracker* deps) const final; - boost::intrusive_ptr getShardSource() final; - MergingLogic mergingLogic() final; + boost::optional mergingLogic() final; bool canRunInParallelBeforeOut( const std::set& nameOfShardKeyFieldsUponEntryToStage) const final; diff --git a/src/mongo/db/pipeline/document_source_sort_test.cpp b/src/mongo/db/pipeline/document_source_sort_test.cpp index 91e54dc7aaf..4d0fb324e60 100644 --- a/src/mongo/db/pipeline/document_source_sort_test.cpp +++ b/src/mongo/db/pipeline/document_source_sort_test.cpp @@ -132,8 +132,9 @@ TEST_F(DocumentSourceSortTest, SortWithLimit) { sort()->serializeToArray(arr); ASSERT_BSONOBJ_EQ(arr[0].getDocument().toBson(), BSON("$sort" << BSON("a" << 1))); - ASSERT(sort()->getShardSource() != nullptr); - ASSERT(sort()->mergingLogic().mergingStage == nullptr); + ASSERT(sort()->mergingLogic()); + ASSERT(sort()->mergingLogic()->shardsStage != nullptr); + ASSERT(sort()->mergingLogic()->mergingStage == nullptr); } container.push_back(DocumentSourceLimit::create(expCtx, 10)); @@ -159,9 +160,10 @@ TEST_F(DocumentSourceSortTest, SortWithLimit) { Value(arr), DOC_ARRAY(DOC("$sort" << DOC("a" << 1)) << DOC("$limit" << sort()->getLimit()))); - ASSERT(sort()->getShardSource() != nullptr); - ASSERT(sort()->mergingLogic().mergingStage != nullptr); - ASSERT(dynamic_cast(sort()->mergingLogic().mergingStage.get())); + ASSERT(sort()->mergingLogic()); + ASSERT(sort()->mergingLogic()->shardsStage != nullptr); + ASSERT(sort()->mergingLogic()->mergingStage != nullptr); + ASSERT(dynamic_cast(sort()->mergingLogic()->mergingStage.get())); } TEST_F(DocumentSourceSortTest, Dependencies) { diff --git a/src/mongo/db/pipeline/document_source_tee_consumer.h b/src/mongo/db/pipeline/document_source_tee_consumer.h index a642533fdf7..04a7f96fa31 100644 --- a/src/mongo/db/pipeline/document_source_tee_consumer.h +++ b/src/mongo/db/pipeline/document_source_tee_consumer.h @@ -64,6 +64,10 @@ public: TransactionRequirement::kAllowed}; } + boost::optional mergingLogic() final { + return boost::none; + } + GetNextResult getNext() final; /** diff --git a/src/mongo/db/pipeline/document_source_test_optimizations.h b/src/mongo/db/pipeline/document_source_test_optimizations.h index b58b35566e4..8739dea49d2 100644 --- a/src/mongo/db/pipeline/document_source_test_optimizations.h +++ b/src/mongo/db/pipeline/document_source_test_optimizations.h @@ -56,6 +56,10 @@ public: TransactionRequirement::kNotAllowed}; } + virtual boost::optional mergingLogic() override { + return boost::none; + } + virtual GetModPathsReturn getModifiedPaths() const override { MONGO_UNREACHABLE; } diff --git a/src/mongo/db/pipeline/document_source_unwind.h b/src/mongo/db/pipeline/document_source_unwind.h index 787249aec6e..8317bf229dd 100644 --- a/src/mongo/db/pipeline/document_source_unwind.h +++ b/src/mongo/db/pipeline/document_source_unwind.h @@ -60,6 +60,10 @@ public: return constraints; } + boost::optional mergingLogic() final { + return boost::none; + } + DepsTracker::State getDependencies(DepsTracker* deps) const final; /** diff --git a/src/mongo/db/pipeline/document_source_watch_for_uuid.h b/src/mongo/db/pipeline/document_source_watch_for_uuid.h index 1ff653378c1..6570454fb05 100644 --- a/src/mongo/db/pipeline/document_source_watch_for_uuid.h +++ b/src/mongo/db/pipeline/document_source_watch_for_uuid.h @@ -63,6 +63,10 @@ public: return Value(); } + boost::optional mergingLogic() final { + return boost::none; + } + static boost::intrusive_ptr create( const boost::intrusive_ptr& expCtx) { // Only created for a single-collection stream where the UUID does not exist. diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h index b01585619b6..21a5d73c94d 100644 --- a/src/mongo/db/pipeline/mongo_process_interface.h +++ b/src/mongo/db/pipeline/mongo_process_interface.h @@ -161,7 +161,9 @@ public: BSONObjBuilder* builder) const = 0; /** - * Gets the collection options for the collection given by 'nss'. + * Gets the collection options for the collection given by 'nss'. Throws + * ErrorCodes::CommandNotSupportedOnView if 'nss' describes a view. Future callers may want to + * parameterize this behavior. */ virtual BSONObj getCollectionOptions(const NamespaceString& nss) = 0; diff --git a/src/mongo/db/pipeline/mongos_process_interface.cpp b/src/mongo/db/pipeline/mongos_process_interface.cpp index 896554f2dc2..9874ed7ba2f 100644 --- a/src/mongo/db/pipeline/mongos_process_interface.cpp +++ b/src/mongo/db/pipeline/mongos_process_interface.cpp @@ -28,8 +28,6 @@ * it in the license file. */ -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand - #include "mongo/platform/basic.h" #include "mongo/db/pipeline/mongos_process_interface.h" @@ -54,7 +52,6 @@ #include "mongo/s/query/router_exec_stage.h" #include "mongo/s/transaction_router.h" #include "mongo/util/fail_point.h" -#include "mongo/util/log.h" namespace mongo { diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 725c655be44..1fddc890a17 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -390,8 +390,7 @@ bool Pipeline::requiredToRunOnMongos() const { for (auto&& stage : _sources) { // If this pipeline is capable of splitting before the mongoS-only stage, then the pipeline // as a whole is not required to run on mongoS. - if (_splitState == SplitState::kUnsplit && - dynamic_cast(stage.get())) { + if (_splitState == SplitState::kUnsplit && stage->mergingLogic()) { return false; } diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp index 56150b9535e..e394a715592 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.cpp +++ b/src/mongo/db/pipeline/process_interface_standalone.cpp @@ -147,7 +147,8 @@ DBClientBase* MongoInterfaceStandalone::directClient() { } bool MongoInterfaceStandalone::isSharded(OperationContext* opCtx, const NamespaceString& nss) { - AutoGetCollectionForRead autoColl(opCtx, nss); + Lock::DBLock dbLock(opCtx, nss.db(), MODE_IS); + Lock::CollectionLock collLock(opCtx->lockState(), nss.ns(), MODE_IS); const auto metadata = CollectionShardingState::get(opCtx, nss)->getCurrentMetadata(); return metadata->isSharded(); } @@ -281,7 +282,14 @@ Status MongoInterfaceStandalone::appendRecordCount(OperationContext* opCtx, BSONObj MongoInterfaceStandalone::getCollectionOptions(const NamespaceString& nss) { const auto infos = _client.getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll())); - return infos.empty() ? BSONObj() : infos.front().getObjectField("options").getOwned(); + if (infos.empty()) { + return BSONObj(); + } + const auto& infoObj = infos.front(); + uassert(ErrorCodes::CommandNotSupportedOnView, + str::stream() << nss.toString() << " is a view, not a collection", + infoObj["type"].valueStringData() != "view"_sd); + return infoObj.getObjectField("options").getOwned(); } void MongoInterfaceStandalone::renameIfOptionsAndIndexesHaveNotChanged( diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index cb1f44f7092..d5bce605c9d 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -26,7 +26,7 @@ * exception statement from all source files in the program, then also delete * it in the license file. */ -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery #include "mongo/platform/basic.h" @@ -34,6 +34,7 @@ #include "mongo/db/curop.h" #include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_out.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/query/cluster_query_knobs.h" @@ -186,10 +187,15 @@ BSONObj createCommandForTargetedShards( // TODO SERVER-38539: the 'mergeByPBRT' flag is no longer necessary in 4.4. targetedCmd[AggregationRequest::kMergeByPBRTName] = Value(litePipe.hasChangeStream()); - // For split pipelines which need merging, do *not* propagate the writeConcern to the shards - // part. Otherwise this is part of an exchange and in that case we should include the - // writeConcern. - targetedCmd[WriteConcernOptions::kWriteConcernField] = Value(); + // If there aren't any stages like $out in the pipeline being sent to the shards, remove the + // write concern. The write concern should only be applied when there are writes performed + // to avoid mistakenly waiting for writes which didn't happen. + const auto& shardsPipe = splitPipeline.shardsPipeline->getSources(); + if (!std::any_of(shardsPipe.begin(), shardsPipe.end(), [](const auto& stage) { + return stage->constraints().writesPersistentData(); + })) { + targetedCmd[WriteConcernOptions::kWriteConcernField] = Value(); + } } targetedCmd[AggregationRequest::kCursorName] = @@ -261,6 +267,10 @@ DispatchShardPipelineResults dispatchShardPipeline( boost::optional splitPipeline; if (needsSplit) { + LOG(5) << "Splitting pipeline: " + << "targeting = " << shardIds.size() + << " shards, needsMongosMerge = " << needsMongosMerge + << ", needsPrimaryShardMerge = " << needsPrimaryShardMerge; splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(pipeline)); exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange( -- cgit v1.2.1