summary | refs | log | tree | commit | diff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
author: Charlie Swanson <charlie.swanson@mongodb.com> 2018-11-29 09:56:42 -0500
committer: Charlie Swanson <charlie.swanson@mongodb.com> 2019-02-13 07:30:10 -0500
commit: 87194fbe0c24525bc1f2d674012fe6978eca77d2 (patch)
tree: 0f13b14046152f0b475ae16d9a95b5e2ba0c4cbb /src/mongo/db/pipeline
parent: 69f26fa3798b0d7927858aa704243cdac676c6e9 (diff)
download: mongo-87194fbe0c24525bc1f2d674012fe6978eca77d2.tar.gz
SERVER-38311 Change $out merging strategy
Allows an $out stage to run in parallel on all shards when both the input collection of the aggregate and the target collection are sharded.
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r-- src/mongo/db/pipeline/document_source.h 94
-rw-r--r-- src/mongo/db/pipeline/document_source_bucket_auto.h 10
-rw-r--r-- src/mongo/db/pipeline/document_source_change_stream_close_cursor.h 11
-rw-r--r-- src/mongo/db/pipeline/document_source_change_stream_transform.h 5
-rw-r--r-- src/mongo/db/pipeline/document_source_check_invalidate.h 4
-rw-r--r-- src/mongo/db/pipeline/document_source_check_resume_token.h 28
-rw-r--r-- src/mongo/db/pipeline/document_source_coll_stats.h 4
-rw-r--r-- src/mongo/db/pipeline/document_source_current_op.h 4
-rw-r--r-- src/mongo/db/pipeline/document_source_cursor.h 4
-rw-r--r-- src/mongo/db/pipeline/document_source_exchange.h 4
-rw-r--r-- src/mongo/db/pipeline/document_source_facet.h 10
-rw-r--r-- src/mongo/db/pipeline/document_source_geo_near.cpp 5
-rw-r--r-- src/mongo/db/pipeline/document_source_geo_near.h 11
-rw-r--r-- src/mongo/db/pipeline/document_source_graph_lookup.h 15
-rw-r--r-- src/mongo/db/pipeline/document_source_group.cpp 9
-rw-r--r-- src/mongo/db/pipeline/document_source_group.h 6
-rw-r--r-- src/mongo/db/pipeline/document_source_group_test.cpp 13
-rw-r--r-- src/mongo/db/pipeline/document_source_index_stats.h 4
-rw-r--r-- src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h 4
-rw-r--r-- src/mongo/db/pipeline/document_source_internal_split_pipeline.h 12
-rw-r--r-- src/mongo/db/pipeline/document_source_limit.h 23
-rw-r--r-- src/mongo/db/pipeline/document_source_list_cached_and_active_users.h 4
-rw-r--r-- src/mongo/db/pipeline/document_source_list_local_sessions.h 4
-rw-r--r-- src/mongo/db/pipeline/document_source_lookup.h 11
-rw-r--r-- src/mongo/db/pipeline/document_source_lookup_change_post_image.h 4
-rw-r--r-- src/mongo/db/pipeline/document_source_match.h 4
-rw-r--r-- src/mongo/db/pipeline/document_source_mock.h 4
-rw-r--r-- src/mongo/db/pipeline/document_source_out.h 39
-rw-r--r-- src/mongo/db/pipeline/document_source_plan_cache_stats.h 4
-rw-r--r-- src/mongo/db/pipeline/document_source_redact.h 4
-rw-r--r-- src/mongo/db/pipeline/document_source_sample.cpp 25
-rw-r--r-- src/mongo/db/pipeline/document_source_sample.h 5
-rw-r--r-- src/mongo/db/pipeline/document_source_sample_from_random_cursor.h 4
-rw-r--r-- src/mongo/db/pipeline/document_source_sequential_document_cache.h 4
-rw-r--r-- src/mongo/db/pipeline/document_source_single_document_transformation.h 4
-rw-r--r-- src/mongo/db/pipeline/document_source_skip.h 13
-rw-r--r-- src/mongo/db/pipeline/document_source_sort.cpp 15
-rw-r--r-- src/mongo/db/pipeline/document_source_sort.h 5
-rw-r--r-- src/mongo/db/pipeline/document_source_sort_test.cpp 12
-rw-r--r-- src/mongo/db/pipeline/document_source_tee_consumer.h 4
-rw-r--r-- src/mongo/db/pipeline/document_source_test_optimizations.h 4
-rw-r--r-- src/mongo/db/pipeline/document_source_unwind.h 4
-rw-r--r-- src/mongo/db/pipeline/document_source_watch_for_uuid.h 4
-rw-r--r-- src/mongo/db/pipeline/mongo_process_interface.h 4
-rw-r--r-- src/mongo/db/pipeline/mongos_process_interface.cpp 3
-rw-r--r-- src/mongo/db/pipeline/pipeline.cpp 3
-rw-r--r-- src/mongo/db/pipeline/process_interface_standalone.cpp 12
-rw-r--r-- src/mongo/db/pipeline/sharded_agg_helpers.cpp 20
48 files changed, 291 insertions, 212 deletions
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 2c3e6ccdd84..94498c3647c 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -216,6 +216,25 @@ public:
Document _result;
};
+ /**
+ * A struct representing the information needed to execute this stage on a distributed
+ * collection. Describes how a pipeline should be split for sharded execution.
+ */
+ struct MergingLogic {
+ // A stage which executes on each shard in parallel, or nullptr if nothing can be done in
+ // parallel. For example, a partial $group before a subsequent global $group.
+ boost::intrusive_ptr<DocumentSource> shardsStage = nullptr;
+
+ // A stage which executes after merging all the results together, or nullptr if nothing is
+ // necessary after merging. For example, a $limit stage.
+ boost::intrusive_ptr<DocumentSource> mergingStage = nullptr;
+
+ // If set, each document is expected to have sort key metadata which will be serialized in
+ // the '$sortKey' field. 'inputSortPattern' will then be used to describe which fields are
+ // ascending and which fields are descending when merging the streams together.
+ boost::optional<BSONObj> inputSortPattern = boost::none;
+ };
+
virtual ~DocumentSource() {}
/**
@@ -454,6 +473,26 @@ public:
return DepsTracker::State::NOT_SUPPORTED;
}
+ /**
+ * If this stage can be run in parallel across a distributed collection, returns boost::none.
+ * Otherwise, returns a struct representing what needs to be done to merge each shard's pipeline
+ * into a single stream of results. Must not mutate the existing source object; if different
+ * behaviour is required, a new source should be created and configured appropriately. It is an
+ * error for the returned MergingLogic to have identical pointers for 'shardsStage' and
+ * 'mergingStage'.
+ */
+ virtual boost::optional<MergingLogic> mergingLogic() = 0;
+
+ /**
+ * Returns true if it would be correct to execute this stage in parallel across the shards in
+ * cases where the final stage is an $out. For example, a $group stage which is just merging the
+ * groups from the shards can be run in parallel since it will preserve the shard key.
+ */
+ virtual bool canRunInParallelBeforeOut(
+ const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const {
+ return false;
+ }
+
protected:
explicit DocumentSource(const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
@@ -507,59 +546,4 @@ private:
boost::optional<ExplainOptions::Verbosity> explain = boost::none) const = 0;
};
-/**
- * This class marks DocumentSources that should be split between the merger and the shards. See
- * Pipeline::Optimizations::Sharded::findSplitPoint() for details.
- */
-class NeedsMergerDocumentSource {
-public:
- /**
- * A struct representing the information needed to merge the cursors for the shards half of this
- * pipeline. If 'inputSortPattern' is set, each document is expected to have sort key metadata
- * which will be serialized in the '$sortKey' field. 'inputSortPattern' will then be used to
- * describe which fields are ascending and which fields are descending when merging the streams
- * together.
- */
- struct MergingLogic {
- MergingLogic(boost::intrusive_ptr<DocumentSource>&& mergingStage,
- boost::optional<BSONObj> inputSortPattern = boost::none)
- : mergingStage(std::move(mergingStage)), inputSortPattern(inputSortPattern) {}
-
- boost::intrusive_ptr<DocumentSource> mergingStage;
- boost::optional<BSONObj> inputSortPattern;
- };
-
- /**
- * Returns a source to be run on the shards, or NULL if no work should be done on the shards for
- * this stage. Must not mutate the existing source object; if different behaviour is required in
- * the split-pipeline case, a new source should be created and configured appropriately. It is
- * an error for getShardSource() to return a pointer to the same object as getMergeSource(),
- * since this can result in the source being stitched into both the shard and merge pipelines
- * when the latter is executed on mongoS.
- */
- virtual boost::intrusive_ptr<DocumentSource> getShardSource() = 0;
-
- /**
- * Returns a struct representing what needs to be done to merge each shard's pipeline into a
- * single stream of results. Must not mutate the existing source object; if different behaviour
- * is required, a new source should be created and configured appropriately. It is an error for
- * mergingLogic() to return a pointer to the same object as getShardSource().
- */
- virtual MergingLogic mergingLogic() = 0;
-
- /**
- * Returns true if it would be correct to execute this stage in parallel across the shards in
- * cases where the final stage is an $out. For example, a $group stage which is just merging the
- * groups from the shards can be run in parallel since it will preserve the shard key.
- */
- virtual bool canRunInParallelBeforeOut(
- const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const {
- return false;
- }
-
-protected:
- // It is invalid to delete through a NeedsMergerDocumentSource-typed pointer.
- virtual ~NeedsMergerDocumentSource() {}
-};
-
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.h b/src/mongo/db/pipeline/document_source_bucket_auto.h
index e75d4d16558..cbcce1f46bc 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto.h
+++ b/src/mongo/db/pipeline/document_source_bucket_auto.h
@@ -43,7 +43,7 @@ namespace mongo {
* The $bucketAuto stage takes a user-specified number of buckets and automatically determines
* boundaries such that the values are approximately equally distributed between those buckets.
*/
-class DocumentSourceBucketAuto final : public DocumentSource, public NeedsMergerDocumentSource {
+class DocumentSourceBucketAuto final : public DocumentSource {
public:
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
DepsTracker::State getDependencies(DepsTracker* deps) const final;
@@ -62,11 +62,9 @@ public:
/**
* The $bucketAuto stage must be run on the merging shard.
*/
- boost::intrusive_ptr<DocumentSource> getShardSource() final {
- return nullptr;
- }
- MergingLogic mergingLogic() final {
- return {this};
+ boost::optional<MergingLogic> mergingLogic() final {
+ // {shardsStage, mergingStage, sortPattern}
+ return MergingLogic{nullptr, this, boost::none};
}
static const uint64_t kDefaultMaxMemoryUsageBytes = 100 * 1024 * 1024;
diff --git a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h
index 4c3c8ee7960..b4e91e11c39 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h
@@ -42,7 +42,7 @@ namespace mongo {
* "invalidate" entries.
* It is not intended to be created by the user.
*/
-class DocumentSourceCloseCursor final : public DocumentSource, public NeedsMergerDocumentSource {
+class DocumentSourceCloseCursor final : public DocumentSource {
public:
GetNextResult getNext() final;
@@ -76,14 +76,11 @@ public:
return new DocumentSourceCloseCursor(expCtx);
}
- boost::intrusive_ptr<DocumentSource> getShardSource() final {
- return nullptr;
- }
-
- MergingLogic mergingLogic() final {
+ boost::optional<MergingLogic> mergingLogic() final {
// This stage must run on mongos to ensure it sees any invalidation in the correct order,
// and to ensure that all remote cursors are cleaned up properly.
- return {this, change_stream_constants::kSortSpec};
+ // {shardsStage, mergingStage, sortPattern}
+ return MergingLogic{nullptr, this, change_stream_constants::kSortSpec};
}
private:
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h
index 63f516daad9..14c5f51a05f 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h
@@ -54,6 +54,11 @@ public:
Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const;
StageConstraints constraints(Pipeline::SplitState pipeState) const final;
+
+ boost::optional<MergingLogic> mergingLogic() final {
+ return boost::none;
+ }
+
DocumentSource::GetNextResult getNext();
const char* getSourceName() const {
return DocumentSourceChangeStream::kStageName.rawData();
diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.h b/src/mongo/db/pipeline/document_source_check_invalidate.h
index 863199016db..3a6ea3da51c 100644
--- a/src/mongo/db/pipeline/document_source_check_invalidate.h
+++ b/src/mongo/db/pipeline/document_source_check_invalidate.h
@@ -58,6 +58,10 @@ public:
ChangeStreamRequirement::kChangeStreamStage};
}
+ boost::optional<MergingLogic> mergingLogic() final {
+ return boost::none;
+ }
+
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final {
// This stage is created by the DocumentSourceChangeStream stage, so serializing it here
// would result in it being created twice.
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h
index d919f19fa62..58052dce3bb 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.h
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.h
@@ -73,6 +73,10 @@ public:
ChangeStreamRequirement::kChangeStreamStage};
}
+ boost::optional<MergingLogic> mergingLogic() final {
+ return boost::none;
+ }
+
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
static boost::intrusive_ptr<DocumentSourceShardCheckResumability> create(
@@ -93,8 +97,7 @@ private:
* This stage is used internally for change streams to ensure that the resume token is in the
* stream. It is not intended to be created by the user.
*/
-class DocumentSourceEnsureResumeTokenPresent final : public DocumentSource,
- public NeedsMergerDocumentSource {
+class DocumentSourceEnsureResumeTokenPresent final : public DocumentSource {
public:
// Used to record the results of comparing the token data extracted from documents in the
// resumed stream against the client's resume token.
@@ -119,21 +122,18 @@ public:
ChangeStreamRequirement::kChangeStreamStage};
}
- /**
- * NeedsMergerDocumentSource methods; this has to run on the merger, since the resume point
- * could be at any shard. Also add a DocumentSourceShardCheckResumability stage on the shards
- * pipeline to ensure that each shard has enough oplog history to resume the change stream.
- */
- boost::intrusive_ptr<DocumentSource> getShardSource() final {
- return DocumentSourceShardCheckResumability::create(pExpCtx,
- _tokenFromClient.getClusterTime());
- };
-
- MergingLogic mergingLogic() final {
+ boost::optional<MergingLogic> mergingLogic() final {
+ MergingLogic logic;
// This stage must run on mongos to ensure it sees the resume token, which could have come
// from any shard. We also must include a mergingPresorted $sort stage to communicate to
// the AsyncResultsMerger that we need to merge the streams in a particular order.
- return {this, change_stream_constants::kSortSpec};
+ logic.mergingStage = this;
+ // Also add logic to the shards to ensure that each shard has enough oplog history to resume
+ // the change stream.
+ logic.shardsStage = DocumentSourceShardCheckResumability::create(
+ pExpCtx, _tokenFromClient.getClusterTime());
+ logic.inputSortPattern = change_stream_constants::kSortSpec;
+ return logic;
};
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h
index b71898dbca9..d306a867475 100644
--- a/src/mongo/db/pipeline/document_source_coll_stats.h
+++ b/src/mongo/db/pipeline/document_source_coll_stats.h
@@ -88,6 +88,10 @@ public:
return constraints;
}
+ boost::optional<MergingLogic> mergingLogic() final {
+ return boost::none;
+ }
+
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
static boost::intrusive_ptr<DocumentSource> createFromBson(
diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h
index 19854342516..fd2ef5e778e 100644
--- a/src/mongo/db/pipeline/document_source_current_op.h
+++ b/src/mongo/db/pipeline/document_source_current_op.h
@@ -125,6 +125,10 @@ public:
return constraints;
}
+ boost::optional<MergingLogic> mergingLogic() final {
+ return boost::none;
+ }
+
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h
index 90f3c6d4f1c..2e7a87e004d 100644
--- a/src/mongo/db/pipeline/document_source_cursor.h
+++ b/src/mongo/db/pipeline/document_source_cursor.h
@@ -70,6 +70,10 @@ public:
return constraints;
}
+ boost::optional<MergingLogic> mergingLogic() final {
+ return boost::none;
+ }
+
void detachFromOperationContext() final;
void reattachToOperationContext(OperationContext* opCtx) final;
diff --git a/src/mongo/db/pipeline/document_source_exchange.h b/src/mongo/db/pipeline/document_source_exchange.h
index 071c04ec32d..414ef2b2f78 100644
--- a/src/mongo/db/pipeline/document_source_exchange.h
+++ b/src/mongo/db/pipeline/document_source_exchange.h
@@ -214,6 +214,10 @@ public:
TransactionRequirement::kAllowed};
}
+ boost::optional<MergingLogic> mergingLogic() final {
+ return boost::none;
+ }
+
const char* getSourceName() const final;
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
diff --git a/src/mongo/db/pipeline/document_source_facet.h b/src/mongo/db/pipeline/document_source_facet.h
index bc8f8dad307..570e55b1907 100644
--- a/src/mongo/db/pipeline/document_source_facet.h
+++ b/src/mongo/db/pipeline/document_source_facet.h
@@ -57,7 +57,7 @@ class NamespaceString;
* stage which will produce a document like the following:
* {facetA: [<all input documents except the first one>], facetB: [<the first document>]}.
*/
-class DocumentSourceFacet final : public DocumentSource, public NeedsMergerDocumentSource {
+class DocumentSourceFacet final : public DocumentSource {
public:
struct FacetPipeline {
FacetPipeline(std::string name, std::unique_ptr<Pipeline, PipelineDeleter> pipeline)
@@ -126,11 +126,9 @@ public:
* TODO SERVER-24154: Should be smarter about splitting so that parts of the sub-pipelines can
* potentially be run in parallel on multiple shards.
*/
- boost::intrusive_ptr<DocumentSource> getShardSource() final {
- return nullptr;
- }
- MergingLogic mergingLogic() final {
- return {this};
+ boost::optional<MergingLogic> mergingLogic() final {
+ // {shardsStage, mergingStage, sortPattern}
+ return MergingLogic{nullptr, this, boost::none};
}
const std::vector<FacetPipeline>& getFacetPipelines() const {
diff --git a/src/mongo/db/pipeline/document_source_geo_near.cpp b/src/mongo/db/pipeline/document_source_geo_near.cpp
index 4a9cee80e3f..e80feb995bd 100644
--- a/src/mongo/db/pipeline/document_source_geo_near.cpp
+++ b/src/mongo/db/pipeline/document_source_geo_near.cpp
@@ -237,8 +237,9 @@ DepsTracker::State DocumentSourceGeoNear::getDependencies(DepsTracker* deps) con
DocumentSourceGeoNear::DocumentSourceGeoNear(const intrusive_ptr<ExpressionContext>& pExpCtx)
: DocumentSource(pExpCtx), coordsIsArray(false), spherical(false) {}
-NeedsMergerDocumentSource::MergingLogic DocumentSourceGeoNear::mergingLogic() {
- return {nullptr, BSON(distanceField->fullPath() << 1)};
+boost::optional<DocumentSource::MergingLogic> DocumentSourceGeoNear::mergingLogic() {
+ // {shardsStage, mergingStage, sortPattern}
+ return MergingLogic{this, nullptr, BSON(distanceField->fullPath() << 1)};
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h
index fa9989fe5c7..f4136a5b4d1 100644
--- a/src/mongo/db/pipeline/document_source_geo_near.h
+++ b/src/mongo/db/pipeline/document_source_geo_near.h
@@ -35,7 +35,7 @@
namespace mongo {
-class DocumentSourceGeoNear : public DocumentSource, public NeedsMergerDocumentSource {
+class DocumentSourceGeoNear : public DocumentSource {
public:
static constexpr StringData kKeyFieldName = "key"_sd;
static constexpr auto kStageName = "$geoNear";
@@ -122,16 +122,9 @@ public:
BSONObj asNearQuery(StringData nearFieldName) const;
/**
- * This document source is sent as-is to the shards.
- */
- boost::intrusive_ptr<DocumentSource> getShardSource() final {
- return this;
- }
-
- /**
* In a sharded cluster, this becomes a merge sort by distance, from nearest to furthest.
*/
- MergingLogic mergingLogic() final;
+ boost::optional<MergingLogic> mergingLogic() final;
private:
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h
index 1c7cdd55924..70d4b91e9d5 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.h
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.h
@@ -38,7 +38,7 @@
namespace mongo {
-class DocumentSourceGraphLookUp final : public DocumentSource, public NeedsMergerDocumentSource {
+class DocumentSourceGraphLookUp final : public DocumentSource {
public:
static std::unique_ptr<LiteParsedDocumentSourceForeignCollections> liteParse(
const AggregationRequest& request, const BSONElement& spec);
@@ -74,19 +74,16 @@ public:
return constraints;
}
+ boost::optional<MergingLogic> mergingLogic() final {
+ // {shardsStage, mergingStage, sortPattern}
+ return MergingLogic{nullptr, this, boost::none};
+ }
+
DepsTracker::State getDependencies(DepsTracker* deps) const final {
_startWith->addDependencies(deps);
return DepsTracker::State::SEE_NEXT;
};
- boost::intrusive_ptr<DocumentSource> getShardSource() final {
- return nullptr;
- }
-
- MergingLogic mergingLogic() final {
- return {this};
- }
-
void addInvolvedCollections(std::vector<NamespaceString>* collections) const final {
collections->push_back(_from);
}
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp
index 318a67df1ff..dee7cd104d4 100644
--- a/src/mongo/db/pipeline/document_source_group.cpp
+++ b/src/mongo/db/pipeline/document_source_group.cpp
@@ -950,11 +950,7 @@ Document DocumentSourceGroup::makeDocument(const Value& id,
return out.freeze();
}
-intrusive_ptr<DocumentSource> DocumentSourceGroup::getShardSource() {
- return this; // No modifications necessary when on shard
-}
-
-NeedsMergerDocumentSource::MergingLogic DocumentSourceGroup::mergingLogic() {
+boost::optional<DocumentSource::MergingLogic> DocumentSourceGroup::mergingLogic() {
intrusive_ptr<DocumentSourceGroup> mergingGroup(new DocumentSourceGroup(pExpCtx));
mergingGroup->setDoingMerge(true);
@@ -973,7 +969,8 @@ NeedsMergerDocumentSource::MergingLogic DocumentSourceGroup::mergingLogic() {
mergingGroup->addAccumulator(copiedAccumuledField);
}
- return {mergingGroup};
+ // {shardsStage, mergingStage, sortPattern}
+ return MergingLogic{this, mergingGroup, boost::none};
}
bool DocumentSourceGroup::pathIncludedInGroupKeys(const std::string& dottedPath) const {
diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h
index a1d6fdad059..6348a83ab46 100644
--- a/src/mongo/db/pipeline/document_source_group.h
+++ b/src/mongo/db/pipeline/document_source_group.h
@@ -87,7 +87,7 @@ private:
std::string _groupId;
};
-class DocumentSourceGroup final : public DocumentSource, public NeedsMergerDocumentSource {
+class DocumentSourceGroup final : public DocumentSource {
public:
using Accumulators = std::vector<boost::intrusive_ptr<Accumulator>>;
using GroupsMap = ValueUnorderedMap<Accumulators>;
@@ -162,9 +162,7 @@ public:
*/
bool usedDisk() final;
- // Virtuals for NeedsMergerDocumentSource.
- boost::intrusive_ptr<DocumentSource> getShardSource() final;
- MergingLogic mergingLogic() final;
+ boost::optional<MergingLogic> mergingLogic() final;
bool canRunInParallelBeforeOut(
const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const final;
diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp
index bde6dfdeb32..d07f41889ff 100644
--- a/src/mongo/db/pipeline/document_source_group_test.cpp
+++ b/src/mongo/db/pipeline/document_source_group_test.cpp
@@ -582,13 +582,12 @@ protected:
intrusive_ptr<DocumentSource> createMerger() {
// Set up a group merger to simulate merging results in the router. In this
// case only one shard is in use.
- NeedsMergerDocumentSource* splittable = dynamic_cast<NeedsMergerDocumentSource*>(group());
- ASSERT(splittable);
- auto mergeLogic = splittable->mergingLogic();
- ASSERT(mergeLogic.mergingStage);
- ASSERT_NOT_EQUALS(group(), mergeLogic.mergingStage);
- ASSERT_FALSE(static_cast<bool>(mergeLogic.inputSortPattern));
- return mergeLogic.mergingStage;
+ auto mergeLogic = group()->mergingLogic();
+ ASSERT(mergeLogic);
+ ASSERT(mergeLogic->mergingStage);
+ ASSERT_NOT_EQUALS(group(), mergeLogic->mergingStage);
+ ASSERT_FALSE(static_cast<bool>(mergeLogic->inputSortPattern));
+ return mergeLogic->mergingStage;
}
void checkResultSet(const intrusive_ptr<DocumentSource>& sink) {
// Load the results from the DocumentSourceGroup and sort them by _id.
diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h
index 51d92262b0f..0713e85f368 100644
--- a/src/mongo/db/pipeline/document_source_index_stats.h
+++ b/src/mongo/db/pipeline/document_source_index_stats.h
@@ -83,6 +83,10 @@ public:
return constraints;
}
+ boost::optional<MergingLogic> mergingLogic() final {
+ return boost::none;
+ }
+
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
diff --git a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h
index 401087d12e6..b9e22e6cbf4 100644
--- a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h
+++ b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h
@@ -63,6 +63,10 @@ public:
TransactionRequirement::kAllowed};
}
+ boost::optional<MergingLogic> mergingLogic() final {
+ return boost::none;
+ }
+
GetNextResult getNext() final;
private:
diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h
index 8f0a8780dbe..3abcc737db4 100644
--- a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h
+++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h
@@ -43,8 +43,7 @@ namespace mongo {
* pipeline will be executed on mongoS if all other stages are eligible, and will be sent to a
* random participating shard otherwise.
*/
-class DocumentSourceInternalSplitPipeline final : public DocumentSource,
- public NeedsMergerDocumentSource {
+class DocumentSourceInternalSplitPipeline final : public DocumentSource {
public:
static constexpr StringData kStageName = "$_internalSplitPipeline"_sd;
@@ -60,12 +59,9 @@ public:
return kStageName.rawData();
}
- boost::intrusive_ptr<DocumentSource> getShardSource() final {
- return nullptr;
- }
-
- MergingLogic mergingLogic() final {
- return {this};
+ boost::optional<MergingLogic> mergingLogic() final {
+ // {shardsStage, mergingStage, sortPattern}
+ return MergingLogic{nullptr, this, boost::none};
}
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
diff --git a/src/mongo/db/pipeline/document_source_limit.h b/src/mongo/db/pipeline/document_source_limit.h
index 885c0f5c875..78d325c9375 100644
--- a/src/mongo/db/pipeline/document_source_limit.h
+++ b/src/mongo/db/pipeline/document_source_limit.h
@@ -34,7 +34,7 @@
namespace mongo {
-class DocumentSourceLimit final : public DocumentSource, public NeedsMergerDocumentSource {
+class DocumentSourceLimit final : public DocumentSource {
public:
static constexpr StringData kStageName = "$limit"_sd;
@@ -81,21 +81,14 @@ public:
}
/**
- * Returns the current DocumentSourceLimit for use in the shards pipeline. Running this stage on
- * the shards is an optimization, but is not strictly necessary in order to produce correct
- * pipeline output.
+ * Returns a MergingLogic with two identical $limit stages; one for the shards pipeline and one
+ * for the merging pipeline.
*/
- boost::intrusive_ptr<DocumentSource> getShardSource() final {
- return this;
- }
-
- /**
- * Returns a new DocumentSourceLimit with the same limit as the current stage, for use in the
- * merge pipeline. Unlike the shards source, it is necessary for this stage to run on the
- * merging host in order to produce correct pipeline output.
- */
- MergingLogic mergingLogic() final {
- return {DocumentSourceLimit::create(pExpCtx, _limit)};
+ boost::optional<MergingLogic> mergingLogic() final {
+ // Running this stage on the shards is an optimization, but is not strictly necessary in
+ // order to produce correct pipeline output.
+ // {shardsStage, mergingStage, sortPattern}
+ return MergingLogic{this, DocumentSourceLimit::create(pExpCtx, _limit), boost::none};
}
long long getLimit() const {
diff --git a/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h b/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h
index 47fbb2759cd..a6312e8a2b9 100644
--- a/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h
+++ b/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h
@@ -103,6 +103,10 @@ public:
return constraints;
}
+ boost::optional<MergingLogic> mergingLogic() final {
+ return boost::none;
+ }
+
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
diff --git a/src/mongo/db/pipeline/document_source_list_local_sessions.h b/src/mongo/db/pipeline/document_source_list_local_sessions.h
index 9e0fa9e7363..4c0aa30f4fc 100644
--- a/src/mongo/db/pipeline/document_source_list_local_sessions.h
+++ b/src/mongo/db/pipeline/document_source_list_local_sessions.h
@@ -117,6 +117,10 @@ public:
return constraints;
}
+ boost::optional<MergingLogic> mergingLogic() final {
+ return boost::none;
+ }
+
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index 29454ee5534..8d05b8286d8 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -47,7 +47,7 @@ namespace mongo {
* Queries separate collection for equality matches with documents in the pipeline collection.
* Adds matching documents to a new array field in the input document.
*/
-class DocumentSourceLookUp final : public DocumentSource, public NeedsMergerDocumentSource {
+class DocumentSourceLookUp final : public DocumentSource {
public:
static constexpr size_t kMaxSubPipelineDepth = 20;
@@ -122,12 +122,9 @@ public:
return DocumentSource::truncateSortSet(pSource->getOutputSorts(), {_as.fullPath()});
}
- boost::intrusive_ptr<DocumentSource> getShardSource() final {
- return nullptr;
- }
-
- MergingLogic mergingLogic() final {
- return {this};
+ boost::optional<MergingLogic> mergingLogic() final {
+ // {shardsStage, mergingStage, sortPattern}
+ return MergingLogic{nullptr, this, boost::none};
}
void addInvolvedCollections(std::vector<NamespaceString>* collections) const final {
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
index 9e63d3f5dcd..cd2f8175e00 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
@@ -77,6 +77,10 @@ public:
return constraints;
}
+ boost::optional<MergingLogic> mergingLogic() final {
+ return boost::none;
+ }
+
DepsTracker::State getDependencies(DepsTracker* deps) const {
// The namespace is not technically needed yet, but we will if there is more than one
// collection involved.
diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h
index fd08a52250c..d770006e2d4 100644
--- a/src/mongo/db/pipeline/document_source_match.h
+++ b/src/mongo/db/pipeline/document_source_match.h
@@ -162,6 +162,10 @@ public:
const std::string& path,
const boost::intrusive_ptr<ExpressionContext>& expCtx);
+ boost::optional<MergingLogic> mergingLogic() final {
+ return boost::none;
+ }
+
protected:
DocumentSourceMatch(const BSONObj& query,
const boost::intrusive_ptr<ExpressionContext>& expCtx);
diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h
index 617e0cf462c..b5d6dffde47 100644
--- a/src/mongo/db/pipeline/document_source_mock.h
+++ b/src/mongo/db/pipeline/document_source_mock.h
@@ -98,6 +98,10 @@ public:
return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {}};
}
+ boost::optional<MergingLogic> mergingLogic() override {
+ return boost::none;
+ }
+
// Return documents from front of queue.
std::deque<GetNextResult> queue;
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
index 906a30f10ee..da09c728e79 100644
--- a/src/mongo/db/pipeline/document_source_out.h
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -65,7 +65,7 @@ public:
/**
* Abstract class for the $out aggregation stage.
*/
-class DocumentSourceOut : public DocumentSource, public NeedsMergerDocumentSource {
+class DocumentSourceOut : public DocumentSource {
public:
/**
* A "lite parsed" $out stage is similar to other stages involving foreign collections except in
@@ -121,12 +121,22 @@ public:
}
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
+ // A $out to an unsharded collection should merge on the primary shard to perform local
+ // writes. A $out to a sharded collection has no requirement, since each shard can perform
+ // its own portion of the write. We use 'kAnyShard' to direct it to execute on one of the
+ // shards in case some of the writes happen to end up being local.
+ //
+ // Note that this decision is inherently racy and subject to becoming stale. This is
+ // okay because either choice will work correctly; we are simply applying a heuristic
+ // optimization.
+ auto hostTypeRequirement = HostTypeRequirement::kPrimaryShard;
+ if (pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _outputNs) &&
+ _mode != WriteModeEnum::kModeReplaceCollection) {
+ hostTypeRequirement = HostTypeRequirement::kAnyShard;
+ }
return {StreamType::kStreaming,
PositionRequirement::kLast,
- // A $out to an unsharded collection should merge on the primary shard to perform
- // local writes. A $out to a sharded collection has no requirement, since each shard
- // can perform its own portion of the write.
- HostTypeRequirement::kPrimaryShard,
+ hostTypeRequirement,
DiskUseRequirement::kWritesPersistentData,
FacetRequirement::kNotAllowed,
TransactionRequirement::kNotAllowed};
@@ -140,12 +150,21 @@ public:
return _mode;
}
- boost::intrusive_ptr<DocumentSource> getShardSource() final {
- return nullptr;
- }
- MergingLogic mergingLogic() final {
- return {this};
+ boost::optional<MergingLogic> mergingLogic() final {
+ // It should always be faster to avoid splitting the pipeline if the output collection is
+ // sharded. If we avoid splitting the pipeline then each shard can perform the writes to the
+ // target collection in parallel.
+ //
+ // Note that this decision is inherently racy and subject to becoming stale. This is
+ // okay because either choice will work correctly; we are simply applying a heuristic
+ // optimization.
+ if (pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _outputNs)) {
+ return boost::none;
+ }
+ // {shardsStage, mergingStage, inputSortPattern}
+ return MergingLogic{nullptr, this, boost::none};
}
+
virtual bool canRunInParallelBeforeOut(
const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const final {
// If someone is asking the question, this must be the $out stage in question, so yes!
diff --git a/src/mongo/db/pipeline/document_source_plan_cache_stats.h b/src/mongo/db/pipeline/document_source_plan_cache_stats.h
index a17225b1a60..3c1beb37e76 100644
--- a/src/mongo/db/pipeline/document_source_plan_cache_stats.h
+++ b/src/mongo/db/pipeline/document_source_plan_cache_stats.h
@@ -105,6 +105,10 @@ public:
return constraints;
}
+ boost::optional<MergingLogic> mergingLogic() final {
+ return boost::none;
+ }
+
const char* getSourceName() const override {
return kStageName;
}
diff --git a/src/mongo/db/pipeline/document_source_redact.h b/src/mongo/db/pipeline/document_source_redact.h
index 0c52b453e7f..70cc16e1492 100644
--- a/src/mongo/db/pipeline/document_source_redact.h
+++ b/src/mongo/db/pipeline/document_source_redact.h
@@ -52,6 +52,10 @@ public:
ChangeStreamRequirement::kWhitelist};
}
+ boost::optional<MergingLogic> mergingLogic() final {
+ return boost::none;
+ }
+
/**
* Attempts to duplicate the redact-safe portion of a subsequent $match before the $redact
* stage.
diff --git a/src/mongo/db/pipeline/document_source_sample.cpp b/src/mongo/db/pipeline/document_source_sample.cpp
index 90b2e82da1f..2728f5607f7 100644
--- a/src/mongo/db/pipeline/document_source_sample.cpp
+++ b/src/mongo/db/pipeline/document_source_sample.cpp
@@ -117,17 +117,22 @@ intrusive_ptr<DocumentSource> DocumentSourceSample::createFromBson(
return sample;
}
-intrusive_ptr<DocumentSource> DocumentSourceSample::getShardSource() {
- return this;
-}
-NeedsMergerDocumentSource::MergingLogic DocumentSourceSample::mergingLogic() {
+boost::optional<DocumentSource::MergingLogic> DocumentSourceSample::mergingLogic() {
// On the merger we need to merge the pre-sorted documents by their random values, then limit to
- // the number we need. Here we don't use 'randSortSpec' because it uses a metadata sort which
- // the merging logic does not understand. The merging logic will use the serialized sort key,
- // and this sort pattern is just used to communicate ascending/descending information. A pattern
- // like {$meta: "randVal"} is neither ascending nor descending, and so will not be useful when
- // constructing the merging logic.
- return {_size > 0 ? DocumentSourceLimit::create(pExpCtx, _size) : nullptr, BSON("$rand" << -1)};
+ // the number we need.
+ MergingLogic logic;
+ logic.shardsStage = this;
+ if (_size > 0) {
+ logic.mergingStage = DocumentSourceLimit::create(pExpCtx, _size);
+ }
+
+ // Here we don't use 'randSortSpec' because it uses a metadata sort which the merging logic does
+ // not understand. The merging logic will use the serialized sort key, and this sort pattern is
+ // just used to communicate ascending/descending information. A pattern like {$meta: "randVal"}
+ // is neither ascending nor descending, and so will not be useful when constructing the merging
+ // logic.
+ logic.inputSortPattern = BSON("$rand" << -1);
+ return logic;
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_sample.h b/src/mongo/db/pipeline/document_source_sample.h
index 11aa6fea8d0..eec90e7c24f 100644
--- a/src/mongo/db/pipeline/document_source_sample.h
+++ b/src/mongo/db/pipeline/document_source_sample.h
@@ -35,7 +35,7 @@
namespace mongo {
-class DocumentSourceSample final : public DocumentSource, public NeedsMergerDocumentSource {
+class DocumentSourceSample final : public DocumentSource {
public:
static constexpr StringData kStageName = "$sample"_sd;
@@ -58,8 +58,7 @@ public:
return DepsTracker::State::SEE_NEXT;
}
- boost::intrusive_ptr<DocumentSource> getShardSource() final;
- MergingLogic mergingLogic() final;
+ boost::optional<MergingLogic> mergingLogic() final;
long long getSampleSize() const {
return _size;
diff --git a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h
index 60289140440..a095ade9fc0 100644
--- a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h
+++ b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h
@@ -55,6 +55,10 @@ public:
TransactionRequirement::kAllowed};
}
+ boost::optional<MergingLogic> mergingLogic() final {
+ return boost::none;
+ }
+
static boost::intrusive_ptr<DocumentSourceSampleFromRandomCursor> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
long long size,
diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache.h b/src/mongo/db/pipeline/document_source_sequential_document_cache.h
index be9422bfceb..ea460b2e4c1 100644
--- a/src/mongo/db/pipeline/document_source_sequential_document_cache.h
+++ b/src/mongo/db/pipeline/document_source_sequential_document_cache.h
@@ -64,6 +64,10 @@ public:
return constraints;
}
+ boost::optional<MergingLogic> mergingLogic() final {
+ return boost::none;
+ }
+
GetNextResult getNext() final;
static boost::intrusive_ptr<DocumentSourceSequentialDocumentCache> create(
diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h
index 1b9407b9d6b..7f5d2196ab0 100644
--- a/src/mongo/db/pipeline/document_source_single_document_transformation.h
+++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h
@@ -72,6 +72,10 @@ public:
return constraints;
}
+ boost::optional<MergingLogic> mergingLogic() final {
+ return boost::none;
+ }
+
TransformerInterface::TransformerType getType() const {
return _parsedTransform->getType();
}
diff --git a/src/mongo/db/pipeline/document_source_skip.h b/src/mongo/db/pipeline/document_source_skip.h
index 883cde202bd..7a384d083ed 100644
--- a/src/mongo/db/pipeline/document_source_skip.h
+++ b/src/mongo/db/pipeline/document_source_skip.h
@@ -34,7 +34,7 @@
namespace mongo {
-class DocumentSourceSkip final : public DocumentSource, public NeedsMergerDocumentSource {
+class DocumentSourceSkip final : public DocumentSource {
public:
static constexpr StringData kStageName = "$skip"_sd;
@@ -84,11 +84,12 @@ public:
return DepsTracker::State::SEE_NEXT; // This doesn't affect needed fields
}
- boost::intrusive_ptr<DocumentSource> getShardSource() final {
- return nullptr;
- }
- MergingLogic mergingLogic() final {
- return {this};
+ /**
+ * The $skip stage must run on the merging half of the pipeline.
+ */
+ boost::optional<MergingLogic> mergingLogic() final {
+ // {shardsStage, mergingStage, inputSortPattern}
+ return MergingLogic{nullptr, this, boost::none};
}
long long getSkip() const {
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp
index 475897471ed..aa48535f810 100644
--- a/src/mongo/db/pipeline/document_source_sort.cpp
+++ b/src/mongo/db/pipeline/document_source_sort.cpp
@@ -524,13 +524,14 @@ int DocumentSourceSort::compare(const Value& lhs, const Value& rhs) const {
return 0;
}
-intrusive_ptr<DocumentSource> DocumentSourceSort::getShardSource() {
- return this;
-}
-
-NeedsMergerDocumentSource::MergingLogic DocumentSourceSort::mergingLogic() {
- return {_limitSrc ? DocumentSourceLimit::create(pExpCtx, _limitSrc->getLimit()) : nullptr,
- sortKeyPattern(SortKeySerialization::kForSortKeyMerging).toBson()};
+boost::optional<DocumentSource::MergingLogic> DocumentSourceSort::mergingLogic() {
+ MergingLogic split;
+ split.shardsStage = this;
+ split.inputSortPattern = sortKeyPattern(SortKeySerialization::kForSortKeyMerging).toBson();
+ if (_limitSrc) {
+ split.mergingStage = DocumentSourceLimit::create(pExpCtx, _limitSrc->getLimit());
+ }
+ return split;
}
bool DocumentSourceSort::canRunInParallelBeforeOut(
diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h
index 4a07cc64f37..32bf2d3ee56 100644
--- a/src/mongo/db/pipeline/document_source_sort.h
+++ b/src/mongo/db/pipeline/document_source_sort.h
@@ -39,7 +39,7 @@
namespace mongo {
-class DocumentSourceSort final : public DocumentSource, public NeedsMergerDocumentSource {
+class DocumentSourceSort final : public DocumentSource {
public:
static constexpr StringData kStageName = "$sort"_sd;
enum class SortKeySerialization {
@@ -83,8 +83,7 @@ public:
DepsTracker::State getDependencies(DepsTracker* deps) const final;
- boost::intrusive_ptr<DocumentSource> getShardSource() final;
- MergingLogic mergingLogic() final;
+ boost::optional<MergingLogic> mergingLogic() final;
bool canRunInParallelBeforeOut(
const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const final;
diff --git a/src/mongo/db/pipeline/document_source_sort_test.cpp b/src/mongo/db/pipeline/document_source_sort_test.cpp
index 91e54dc7aaf..4d0fb324e60 100644
--- a/src/mongo/db/pipeline/document_source_sort_test.cpp
+++ b/src/mongo/db/pipeline/document_source_sort_test.cpp
@@ -132,8 +132,9 @@ TEST_F(DocumentSourceSortTest, SortWithLimit) {
sort()->serializeToArray(arr);
ASSERT_BSONOBJ_EQ(arr[0].getDocument().toBson(), BSON("$sort" << BSON("a" << 1)));
- ASSERT(sort()->getShardSource() != nullptr);
- ASSERT(sort()->mergingLogic().mergingStage == nullptr);
+ ASSERT(sort()->mergingLogic());
+ ASSERT(sort()->mergingLogic()->shardsStage != nullptr);
+ ASSERT(sort()->mergingLogic()->mergingStage == nullptr);
}
container.push_back(DocumentSourceLimit::create(expCtx, 10));
@@ -159,9 +160,10 @@ TEST_F(DocumentSourceSortTest, SortWithLimit) {
Value(arr),
DOC_ARRAY(DOC("$sort" << DOC("a" << 1)) << DOC("$limit" << sort()->getLimit())));
- ASSERT(sort()->getShardSource() != nullptr);
- ASSERT(sort()->mergingLogic().mergingStage != nullptr);
- ASSERT(dynamic_cast<DocumentSourceLimit*>(sort()->mergingLogic().mergingStage.get()));
+ ASSERT(sort()->mergingLogic());
+ ASSERT(sort()->mergingLogic()->shardsStage != nullptr);
+ ASSERT(sort()->mergingLogic()->mergingStage != nullptr);
+ ASSERT(dynamic_cast<DocumentSourceLimit*>(sort()->mergingLogic()->mergingStage.get()));
}
TEST_F(DocumentSourceSortTest, Dependencies) {
diff --git a/src/mongo/db/pipeline/document_source_tee_consumer.h b/src/mongo/db/pipeline/document_source_tee_consumer.h
index a642533fdf7..04a7f96fa31 100644
--- a/src/mongo/db/pipeline/document_source_tee_consumer.h
+++ b/src/mongo/db/pipeline/document_source_tee_consumer.h
@@ -64,6 +64,10 @@ public:
TransactionRequirement::kAllowed};
}
+ boost::optional<MergingLogic> mergingLogic() final {
+ return boost::none;
+ }
+
GetNextResult getNext() final;
/**
diff --git a/src/mongo/db/pipeline/document_source_test_optimizations.h b/src/mongo/db/pipeline/document_source_test_optimizations.h
index b58b35566e4..8739dea49d2 100644
--- a/src/mongo/db/pipeline/document_source_test_optimizations.h
+++ b/src/mongo/db/pipeline/document_source_test_optimizations.h
@@ -56,6 +56,10 @@ public:
TransactionRequirement::kNotAllowed};
}
+ virtual boost::optional<MergingLogic> mergingLogic() override {
+ return boost::none;
+ }
+
virtual GetModPathsReturn getModifiedPaths() const override {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/db/pipeline/document_source_unwind.h b/src/mongo/db/pipeline/document_source_unwind.h
index 787249aec6e..8317bf229dd 100644
--- a/src/mongo/db/pipeline/document_source_unwind.h
+++ b/src/mongo/db/pipeline/document_source_unwind.h
@@ -60,6 +60,10 @@ public:
return constraints;
}
+ boost::optional<MergingLogic> mergingLogic() final {
+ return boost::none;
+ }
+
DepsTracker::State getDependencies(DepsTracker* deps) const final;
/**
diff --git a/src/mongo/db/pipeline/document_source_watch_for_uuid.h b/src/mongo/db/pipeline/document_source_watch_for_uuid.h
index 1ff653378c1..6570454fb05 100644
--- a/src/mongo/db/pipeline/document_source_watch_for_uuid.h
+++ b/src/mongo/db/pipeline/document_source_watch_for_uuid.h
@@ -63,6 +63,10 @@ public:
return Value();
}
+ boost::optional<MergingLogic> mergingLogic() final {
+ return boost::none;
+ }
+
static boost::intrusive_ptr<DocumentSourceWatchForUUID> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx) {
// Only created for a single-collection stream where the UUID does not exist.
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
index b01585619b6..21a5d73c94d 100644
--- a/src/mongo/db/pipeline/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -161,7 +161,9 @@ public:
BSONObjBuilder* builder) const = 0;
/**
- * Gets the collection options for the collection given by 'nss'.
+ * Gets the collection options for the collection given by 'nss'. Throws
+ * ErrorCodes::CommandNotSupportedOnView if 'nss' describes a view. Future callers may want to
+ * parameterize this behavior.
*/
virtual BSONObj getCollectionOptions(const NamespaceString& nss) = 0;
diff --git a/src/mongo/db/pipeline/mongos_process_interface.cpp b/src/mongo/db/pipeline/mongos_process_interface.cpp
index 896554f2dc2..9874ed7ba2f 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.cpp
+++ b/src/mongo/db/pipeline/mongos_process_interface.cpp
@@ -28,8 +28,6 @@
* it in the license file.
*/
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
-
#include "mongo/platform/basic.h"
#include "mongo/db/pipeline/mongos_process_interface.h"
@@ -54,7 +52,6 @@
#include "mongo/s/query/router_exec_stage.h"
#include "mongo/s/transaction_router.h"
#include "mongo/util/fail_point.h"
-#include "mongo/util/log.h"
namespace mongo {
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 725c655be44..1fddc890a17 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -390,8 +390,7 @@ bool Pipeline::requiredToRunOnMongos() const {
for (auto&& stage : _sources) {
// If this pipeline is capable of splitting before the mongoS-only stage, then the pipeline
// as a whole is not required to run on mongoS.
- if (_splitState == SplitState::kUnsplit &&
- dynamic_cast<NeedsMergerDocumentSource*>(stage.get())) {
+ if (_splitState == SplitState::kUnsplit && stage->mergingLogic()) {
return false;
}
diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index 56150b9535e..e394a715592 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -147,7 +147,8 @@ DBClientBase* MongoInterfaceStandalone::directClient() {
}
bool MongoInterfaceStandalone::isSharded(OperationContext* opCtx, const NamespaceString& nss) {
- AutoGetCollectionForRead autoColl(opCtx, nss);
+ Lock::DBLock dbLock(opCtx, nss.db(), MODE_IS);
+ Lock::CollectionLock collLock(opCtx->lockState(), nss.ns(), MODE_IS);
const auto metadata = CollectionShardingState::get(opCtx, nss)->getCurrentMetadata();
return metadata->isSharded();
}
@@ -281,7 +282,14 @@ Status MongoInterfaceStandalone::appendRecordCount(OperationContext* opCtx,
BSONObj MongoInterfaceStandalone::getCollectionOptions(const NamespaceString& nss) {
const auto infos = _client.getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll()));
- return infos.empty() ? BSONObj() : infos.front().getObjectField("options").getOwned();
+ if (infos.empty()) {
+ return BSONObj();
+ }
+ const auto& infoObj = infos.front();
+ uassert(ErrorCodes::CommandNotSupportedOnView,
+ str::stream() << nss.toString() << " is a view, not a collection",
+ infoObj["type"].valueStringData() != "view"_sd);
+ return infoObj.getObjectField("options").getOwned();
}
void MongoInterfaceStandalone::renameIfOptionsAndIndexesHaveNotChanged(
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index cb1f44f7092..d5bce605c9d 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -26,7 +26,7 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
#include "mongo/platform/basic.h"
@@ -34,6 +34,7 @@
#include "mongo/db/curop.h"
#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_out.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/cluster_commands_helpers.h"
#include "mongo/s/query/cluster_query_knobs.h"
@@ -186,10 +187,15 @@ BSONObj createCommandForTargetedShards(
// TODO SERVER-38539: the 'mergeByPBRT' flag is no longer necessary in 4.4.
targetedCmd[AggregationRequest::kMergeByPBRTName] = Value(litePipe.hasChangeStream());
- // For split pipelines which need merging, do *not* propagate the writeConcern to the shards
- // part. Otherwise this is part of an exchange and in that case we should include the
- // writeConcern.
- targetedCmd[WriteConcernOptions::kWriteConcernField] = Value();
+ // If there aren't any stages like $out in the pipeline being sent to the shards, remove the
+ // write concern. The write concern should only be applied when there are writes performed
+ // to avoid mistakenly waiting for writes which didn't happen.
+ const auto& shardsPipe = splitPipeline.shardsPipeline->getSources();
+ if (!std::any_of(shardsPipe.begin(), shardsPipe.end(), [](const auto& stage) {
+ return stage->constraints().writesPersistentData();
+ })) {
+ targetedCmd[WriteConcernOptions::kWriteConcernField] = Value();
+ }
}
targetedCmd[AggregationRequest::kCursorName] =
@@ -261,6 +267,10 @@ DispatchShardPipelineResults dispatchShardPipeline(
boost::optional<cluster_aggregation_planner::SplitPipeline> splitPipeline;
if (needsSplit) {
+ LOG(5) << "Splitting pipeline: "
+ << "targeting = " << shardIds.size()
+ << " shards, needsMongosMerge = " << needsMongosMerge
+ << ", needsPrimaryShardMerge = " << needsPrimaryShardMerge;
splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(pipeline));
exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(