summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml1
-rw-r--r--jstests/aggregation/sharded_agg_cleanup_on_error.js8
-rw-r--r--jstests/aggregation/sources/out/exchange_explain.js2
-rw-r--r--jstests/sharding/out_does_not_force_merge.js62
-rw-r--r--src/mongo/db/commands/pipeline_command.cpp15
-rw-r--r--src/mongo/db/pipeline/document_source.h95
-rw-r--r--src/mongo/db/pipeline/document_source_bucket_auto.h10
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_close_cursor.h11
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.h5
-rw-r--r--src/mongo/db/pipeline/document_source_check_invalidate.h4
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token.h28
-rw-r--r--src/mongo/db/pipeline/document_source_coll_stats.h4
-rw-r--r--src/mongo/db/pipeline/document_source_current_op.h4
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.h4
-rw-r--r--src/mongo/db/pipeline/document_source_exchange.h4
-rw-r--r--src/mongo/db/pipeline/document_source_facet.h10
-rw-r--r--src/mongo/db/pipeline/document_source_geo_near.cpp5
-rw-r--r--src/mongo/db/pipeline/document_source_geo_near.h11
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup.h4
-rw-r--r--src/mongo/db/pipeline/document_source_group.cpp9
-rw-r--r--src/mongo/db/pipeline/document_source_group.h6
-rw-r--r--src/mongo/db/pipeline/document_source_group_test.cpp13
-rw-r--r--src/mongo/db/pipeline/document_source_index_stats.h4
-rw-r--r--src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h4
-rw-r--r--src/mongo/db/pipeline/document_source_internal_split_pipeline.h12
-rw-r--r--src/mongo/db/pipeline/document_source_limit.h23
-rw-r--r--src/mongo/db/pipeline/document_source_list_cached_and_active_users.h4
-rw-r--r--src/mongo/db/pipeline/document_source_list_local_sessions.h4
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.h11
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_change_post_image.h4
-rw-r--r--src/mongo/db/pipeline/document_source_match.h4
-rw-r--r--src/mongo/db/pipeline/document_source_mock.h4
-rw-r--r--src/mongo/db/pipeline/document_source_out.h25
-rw-r--r--src/mongo/db/pipeline/document_source_plan_cache_stats.h4
-rw-r--r--src/mongo/db/pipeline/document_source_redact.h4
-rw-r--r--src/mongo/db/pipeline/document_source_sample.cpp25
-rw-r--r--src/mongo/db/pipeline/document_source_sample.h5
-rw-r--r--src/mongo/db/pipeline/document_source_sample_from_random_cursor.h4
-rw-r--r--src/mongo/db/pipeline/document_source_sequential_document_cache.h4
-rw-r--r--src/mongo/db/pipeline/document_source_single_document_transformation.h4
-rw-r--r--src/mongo/db/pipeline/document_source_skip.h13
-rw-r--r--src/mongo/db/pipeline/document_source_sort.cpp15
-rw-r--r--src/mongo/db/pipeline/document_source_sort.h5
-rw-r--r--src/mongo/db/pipeline/document_source_sort_test.cpp12
-rw-r--r--src/mongo/db/pipeline/document_source_tee_consumer.h4
-rw-r--r--src/mongo/db/pipeline/document_source_test_optimizations.h4
-rw-r--r--src/mongo/db/pipeline/document_source_unwind.h4
-rw-r--r--src/mongo/db/pipeline/mongo_process_interface.h4
-rw-r--r--src/mongo/db/pipeline/mongos_process_interface.cpp11
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp3
-rw-r--r--src/mongo/db/pipeline/process_interface_standalone.cpp12
-rw-r--r--src/mongo/s/query/cluster_aggregation_planner.cpp33
-rw-r--r--src/mongo/s/query/document_source_merge_cursors.h4
-rw-r--r--src/mongo/s/query/document_source_update_on_add_shard.h4
54 files changed, 226 insertions, 366 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index 088e63f1a9b..de2b7f8eec6 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -43,7 +43,6 @@ selector:
- jstests/sharding/lookup_stale_mongos.js
- jstests/sharding/out_cannot_run_on_mongos.js
- jstests/sharding/out_command_options.js
- - jstests/sharding/out_does_not_force_merge.js
- jstests/sharding/out_from_stale_mongos.js
- jstests/sharding/out_hashed_shard_key.js
- jstests/sharding/out_stale_unique_key.js
diff --git a/jstests/aggregation/sharded_agg_cleanup_on_error.js b/jstests/aggregation/sharded_agg_cleanup_on_error.js
index c8122290afe..01b31b270fd 100644
--- a/jstests/aggregation/sharded_agg_cleanup_on_error.js
+++ b/jstests/aggregation/sharded_agg_cleanup_on_error.js
@@ -113,14 +113,10 @@
}));
// Run an aggregation which is eligible for $exchange. This should assert because of
- // the failpoint. Add a $group stage to force an exchange-eligible split of the pipeline
- // before the $out. Without the $group we won't use the exchange optimization and instead
- // will send the $out to each shard.
+ // the failpoint.
st.shardColl(mongosDB.target, {_id: 1}, {_id: 0}, {_id: 1}, kDBName, false);
assertErrorCode(
- coll,
- [{$group: {_id: "$fakeShardKey"}}, {$out: {to: "target", mode: "replaceDocuments"}}],
- ErrorCodes.FailPointEnabled);
+ coll, [{$out: {to: "target", mode: "replaceDocuments"}}], ErrorCodes.FailPointEnabled);
// Neither mongos or the shards should leave cursors open.
assert.eq(mongosDB.serverStatus().metrics.cursor.open.total, 0);
diff --git a/jstests/aggregation/sources/out/exchange_explain.js b/jstests/aggregation/sources/out/exchange_explain.js
index 84a5b02588f..5a6b7a33ef7 100644
--- a/jstests/aggregation/sources/out/exchange_explain.js
+++ b/jstests/aggregation/sources/out/exchange_explain.js
@@ -112,7 +112,7 @@ load('jstests/aggregation/extras/utils.js');
explain = runExplainQuery(outCollRange);
// Make sure there is no exchange.
- assert.eq(explain.mergeType, "anyShard", tojson(explain));
+ assert.eq(explain.mergeType, "primaryShard", tojson(explain));
assert(explain.hasOwnProperty("splitPipeline"), tojson(explain));
assert(!explain.splitPipeline.hasOwnProperty("exchange"), tojson(explain));
diff --git a/jstests/sharding/out_does_not_force_merge.js b/jstests/sharding/out_does_not_force_merge.js
deleted file mode 100644
index 01585fbe518..00000000000
--- a/jstests/sharding/out_does_not_force_merge.js
+++ /dev/null
@@ -1,62 +0,0 @@
-// Tests that an $out stage does not force a pipeline to split into a "shards part" and a "merging
-// part" if no other stage in the pipeline would force such a split.
-(function() {
- "use strict";
-
- const st = new ShardingTest({shards: 2, rs: {nodes: 1}});
-
- const mongosDB = st.s.getDB("test_db");
-
- const inColl = mongosDB["inColl"];
- // Two different output collections will be sharded by different keys.
- const outCollById = mongosDB["outCollById"];
- const outCollBySK = mongosDB["outCollBySK"];
- st.shardColl(outCollById, {_id: 1}, {_id: 500}, {_id: 500}, mongosDB.getName());
- st.shardColl(outCollBySK, {sk: 1}, {sk: 500}, {sk: 500}, mongosDB.getName());
-
- const numDocs = 1000;
-
- // Shard the input collection.
- st.shardColl(inColl, {_id: 1}, {_id: 500}, {_id: 500}, mongosDB.getName());
-
- // Insert some data to the input collection.
- const bulk = inColl.initializeUnorderedBulkOp();
- for (let i = 0; i < numDocs; i++) {
- bulk.insert({_id: i, sk: numDocs - i});
- }
- assert.commandWorked(bulk.execute());
-
- function assertOutRunsOnShards(explain) {
- assert(explain.hasOwnProperty("splitPipeline"), tojson(explain));
- assert(explain.splitPipeline.hasOwnProperty("shardsPart"), tojson(explain));
- assert.eq(
- explain.splitPipeline.shardsPart.filter(stage => stage.hasOwnProperty("$out")).length,
- 1,
- tojson(explain));
- assert(explain.splitPipeline.hasOwnProperty("mergerPart"), tojson(explain));
- assert.eq([], explain.splitPipeline.mergerPart, tojson(explain));
- }
-
- // Test that a simple $out can run in parallel. Note that we still expect a 'splitPipeline' in
- // the explain output, but the merging half should be empty to indicate that the entire thing is
- // executing in parallel on the shards.
- let explain =
- inColl.explain().aggregate([{$out: {to: outCollById.getName(), mode: "insertDocuments"}}]);
- assertOutRunsOnShards(explain);
- // Actually execute the pipeline and make sure it works as expected.
- assert.eq(outCollById.find().itcount(), 0);
- inColl.aggregate([{$out: {to: outCollById.getName(), mode: "insertDocuments"}}]);
- assert.eq(outCollById.find().itcount(), numDocs);
-
- // Test the same thing but in a pipeline where the output collection's shard key differs from
- // the input collection's.
- explain =
- inColl.explain().aggregate([{$out: {to: outCollBySK.getName(), mode: "insertDocuments"}}]);
- assertOutRunsOnShards(explain);
- // Again, test that execution works as expected.
- assert.eq(outCollBySK.find().itcount(), 0);
- inColl.aggregate([{$out: {to: outCollBySK.getName(), mode: "insertDocuments"}}]);
- assert.eq(outCollBySK.find().itcount(), numDocs);
-
- st.stop();
-}());
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index 9d023d1cd45..edcd4af42b1 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -65,23 +65,16 @@ public:
private:
bool supportsWriteConcern() const override {
- // For an aggregate command that specifies a writeConcern, we check on mongoS whether
- // there's an $out in the pipeline, and reject the command if there isn't. Otherwise,
- // we forward the writeConcern to any and all aggregates sent to the shards, even for an
- // aggregate which represents a part of the global pipeline that does not contain the
- // $out. So if the command is from mongos we can trust that the write concern makes
- // sense. Otherwise we validate that writeConcern is only passed when there's an $out
- // stage.
- return this->_request.body["fromMongos"].trueValue() ||
- Pipeline::aggSupportsWriteConcern(this->_request.body);
+ return Pipeline::aggSupportsWriteConcern(this->_request.body);
}
bool supportsReadConcern(repl::ReadConcernLevel level) const override {
// Aggregations that are run directly against a collection allow any read concern.
// Otherwise, if the aggregate is collectionless then the read concern must be 'local'
// (e.g. $currentOp). The exception to this is a $changeStream on a whole database,
- // which is considered collectionless but must be read concern 'majority'. Further read
- // concern validation is done once the pipeline is parsed.
+ // which is
+ // considered collectionless but must be read concern 'majority'. Further read concern
+ // validation is done one the pipeline is parsed.
return level == repl::ReadConcernLevel::kLocalReadConcern ||
level == repl::ReadConcernLevel::kMajorityReadConcern ||
!AggregationRequest::parseNs(_dbName, _request.body).isCollectionlessAggregateNS();
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 339a7a74b40..2c3e6ccdd84 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -216,26 +216,6 @@ public:
Document _result;
};
- /**
- * A struct representing the information needed to execute this stage on a distributed
- * collection.
- */
- struct MergingLogic {
-
- // A stage which executes on each shard in parallel, or nullptr if nothing can be done in
- // parallel. For example, a partial $group before a subsequent global $group.
- boost::intrusive_ptr<DocumentSource> shardsStage = nullptr;
-
- // A stage which executes after merging all the results together, or nullptr if nothing is
- // necessary after merging. For example, a $limit stage.
- boost::intrusive_ptr<DocumentSource> mergingStage = nullptr;
-
- // If set, each document is expected to have sort key metadata which will be serialized in
- // the '$sortKey' field. 'inputSortPattern' will then be used to describe which fields are
- // ascending and which fields are descending when merging the streams together.
- boost::optional<BSONObj> inputSortPattern = boost::none;
- };
-
virtual ~DocumentSource() {}
/**
@@ -474,26 +454,6 @@ public:
return DepsTracker::State::NOT_SUPPORTED;
}
- /**
- * If this stage can be run in parallel across a distributed collection, returns boost::none.
- * Otherwise, returns a struct representing what needs to be done to merge each shard's pipeline
- * into a single stream of results. Must not mutate the existing source object; if different
- * behaviour is required, a new source should be created and configured appropriately. It is an
- * error for the returned MergingLogic to have identical pointers for 'shardsStage' and
- * 'mergingStage'.
- */
- virtual boost::optional<MergingLogic> mergingLogic() = 0;
-
- /**
- * Returns true if it would be correct to execute this stage in parallel across the shards in
- * cases where the final stage is an $out. For example, a $group stage which is just merging the
- * groups from the shards can be run in parallel since it will preserve the shard key.
- */
- virtual bool canRunInParallelBeforeOut(
- const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const {
- return false;
- }
-
protected:
explicit DocumentSource(const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
@@ -547,4 +507,59 @@ private:
boost::optional<ExplainOptions::Verbosity> explain = boost::none) const = 0;
};
+/**
+ * This class marks DocumentSources that should be split between the merger and the shards. See
+ * Pipeline::Optimizations::Sharded::findSplitPoint() for details.
+ */
+class NeedsMergerDocumentSource {
+public:
+ /**
+ * A struct representing the information needed to merge the cursors for the shards half of this
+ * pipeline. If 'inputSortPattern' is set, each document is expected to have sort key metadata
+ * which will be serialized in the '$sortKey' field. 'inputSortPattern' will then be used to
+ * describe which fields are ascending and which fields are descending when merging the streams
+ * together.
+ */
+ struct MergingLogic {
+ MergingLogic(boost::intrusive_ptr<DocumentSource>&& mergingStage,
+ boost::optional<BSONObj> inputSortPattern = boost::none)
+ : mergingStage(std::move(mergingStage)), inputSortPattern(inputSortPattern) {}
+
+ boost::intrusive_ptr<DocumentSource> mergingStage;
+ boost::optional<BSONObj> inputSortPattern;
+ };
+
+ /**
+ * Returns a source to be run on the shards, or NULL if no work should be done on the shards for
+ * this stage. Must not mutate the existing source object; if different behaviour is required in
+ * the split-pipeline case, a new source should be created and configured appropriately. It is
+ * an error for getShardSource() to return a pointer to the same object as getMergeSource(),
+ * since this can result in the source being stitched into both the shard and merge pipelines
+ * when the latter is executed on mongoS.
+ */
+ virtual boost::intrusive_ptr<DocumentSource> getShardSource() = 0;
+
+ /**
+ * Returns a struct representing what needs to be done to merge each shard's pipeline into a
+ * single stream of results. Must not mutate the existing source object; if different behaviour
+ * is required, a new source should be created and configured appropriately. It is an error for
+ * mergingLogic() to return a pointer to the same object as getShardSource().
+ */
+ virtual MergingLogic mergingLogic() = 0;
+
+ /**
+ * Returns true if it would be correct to execute this stage in parallel across the shards in
+ * cases where the final stage is an $out. For example, a $group stage which is just merging the
+ * groups from the shards can be run in parallel since it will preserve the shard key.
+ */
+ virtual bool canRunInParallelBeforeOut(
+ const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const {
+ return false;
+ }
+
+protected:
+ // It is invalid to delete through a NeedsMergerDocumentSource-typed pointer.
+ virtual ~NeedsMergerDocumentSource() {}
+};
+
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.h b/src/mongo/db/pipeline/document_source_bucket_auto.h
index cbcce1f46bc..e75d4d16558 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto.h
+++ b/src/mongo/db/pipeline/document_source_bucket_auto.h
@@ -43,7 +43,7 @@ namespace mongo {
* The $bucketAuto stage takes a user-specified number of buckets and automatically determines
* boundaries such that the values are approximately equally distributed between those buckets.
*/
-class DocumentSourceBucketAuto final : public DocumentSource {
+class DocumentSourceBucketAuto final : public DocumentSource, public NeedsMergerDocumentSource {
public:
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
DepsTracker::State getDependencies(DepsTracker* deps) const final;
@@ -62,9 +62,11 @@ public:
/**
* The $bucketAuto stage must be run on the merging shard.
*/
- boost::optional<MergingLogic> mergingLogic() final {
- // {shardsStage, mergingStage, sortPattern}
- return MergingLogic{nullptr, this, boost::none};
+ boost::intrusive_ptr<DocumentSource> getShardSource() final {
+ return nullptr;
+ }
+ MergingLogic mergingLogic() final {
+ return {this};
}
static const uint64_t kDefaultMaxMemoryUsageBytes = 100 * 1024 * 1024;
diff --git a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h
index b4e91e11c39..4c3c8ee7960 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h
@@ -42,7 +42,7 @@ namespace mongo {
* "invalidate" entries.
* It is not intended to be created by the user.
*/
-class DocumentSourceCloseCursor final : public DocumentSource {
+class DocumentSourceCloseCursor final : public DocumentSource, public NeedsMergerDocumentSource {
public:
GetNextResult getNext() final;
@@ -76,11 +76,14 @@ public:
return new DocumentSourceCloseCursor(expCtx);
}
- boost::optional<MergingLogic> mergingLogic() final {
+ boost::intrusive_ptr<DocumentSource> getShardSource() final {
+ return nullptr;
+ }
+
+ MergingLogic mergingLogic() final {
// This stage must run on mongos to ensure it sees any invalidation in the correct order,
// and to ensure that all remote cursors are cleaned up properly.
- // {shardsStage, mergingStage, sortPattern}
- return MergingLogic{nullptr, this, change_stream_constants::kSortSpec};
+ return {this, change_stream_constants::kSortSpec};
}
private:
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h
index 14c5f51a05f..63f516daad9 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h
@@ -54,11 +54,6 @@ public:
Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const;
StageConstraints constraints(Pipeline::SplitState pipeState) const final;
-
- boost::optional<MergingLogic> mergingLogic() final {
- return boost::none;
- }
-
DocumentSource::GetNextResult getNext();
const char* getSourceName() const {
return DocumentSourceChangeStream::kStageName.rawData();
diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.h b/src/mongo/db/pipeline/document_source_check_invalidate.h
index 3a6ea3da51c..863199016db 100644
--- a/src/mongo/db/pipeline/document_source_check_invalidate.h
+++ b/src/mongo/db/pipeline/document_source_check_invalidate.h
@@ -58,10 +58,6 @@ public:
ChangeStreamRequirement::kChangeStreamStage};
}
- boost::optional<MergingLogic> mergingLogic() final {
- return boost::none;
- }
-
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final {
// This stage is created by the DocumentSourceChangeStream stage, so serializing it here
// would result in it being created twice.
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h
index 6dd3c949420..7489e156029 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.h
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.h
@@ -73,10 +73,6 @@ public:
ChangeStreamRequirement::kChangeStreamStage};
}
- boost::optional<MergingLogic> mergingLogic() final {
- return boost::none;
- }
-
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
static boost::intrusive_ptr<DocumentSourceShardCheckResumability> create(
@@ -97,7 +93,8 @@ private:
* This stage is used internally for change streams to ensure that the resume token is in the
* stream. It is not intended to be created by the user.
*/
-class DocumentSourceEnsureResumeTokenPresent final : public DocumentSource {
+class DocumentSourceEnsureResumeTokenPresent final : public DocumentSource,
+ public NeedsMergerDocumentSource {
public:
// Used to record the results of comparing the token data extracted from documents in the
// resumed stream against the client's resume token.
@@ -122,18 +119,21 @@ public:
ChangeStreamRequirement::kChangeStreamStage};
}
- boost::optional<MergingLogic> mergingLogic() final {
- MergingLogic logic;
+ /**
+ * NeedsMergerDocumentSource methods; this has to run on the merger, since the resume point
+ * could be at any shard. Also add a DocumentSourceShardCheckResumability stage on the shards
+ * pipeline to ensure that each shard has enough oplog history to resume the change stream.
+ */
+ boost::intrusive_ptr<DocumentSource> getShardSource() final {
+ return DocumentSourceShardCheckResumability::create(pExpCtx,
+ _tokenFromClient.getClusterTime());
+ };
+
+ MergingLogic mergingLogic() final {
// This stage must run on mongos to ensure it sees the resume token, which could have come
// from any shard. We also must include a mergingPresorted $sort stage to communicate to
// the AsyncResultsMerger that we need to merge the streams in a particular order.
- logic.mergingStage = this;
- // Also add logic to the shards to ensure that each shard has enough oplog history to resume
- // the change stream.
- logic.shardsStage = DocumentSourceShardCheckResumability::create(
- pExpCtx, _tokenFromClient.getClusterTime());
- logic.inputSortPattern = change_stream_constants::kSortSpec;
- return logic;
+ return {this, change_stream_constants::kSortSpec};
};
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h
index d306a867475..b71898dbca9 100644
--- a/src/mongo/db/pipeline/document_source_coll_stats.h
+++ b/src/mongo/db/pipeline/document_source_coll_stats.h
@@ -88,10 +88,6 @@ public:
return constraints;
}
- boost::optional<MergingLogic> mergingLogic() final {
- return boost::none;
- }
-
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
static boost::intrusive_ptr<DocumentSource> createFromBson(
diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h
index fd2ef5e778e..19854342516 100644
--- a/src/mongo/db/pipeline/document_source_current_op.h
+++ b/src/mongo/db/pipeline/document_source_current_op.h
@@ -125,10 +125,6 @@ public:
return constraints;
}
- boost::optional<MergingLogic> mergingLogic() final {
- return boost::none;
- }
-
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h
index 5943070b48f..d15c23c2849 100644
--- a/src/mongo/db/pipeline/document_source_cursor.h
+++ b/src/mongo/db/pipeline/document_source_cursor.h
@@ -70,10 +70,6 @@ public:
return constraints;
}
- boost::optional<MergingLogic> mergingLogic() final {
- return boost::none;
- }
-
void detachFromOperationContext() final;
void reattachToOperationContext(OperationContext* opCtx) final;
diff --git a/src/mongo/db/pipeline/document_source_exchange.h b/src/mongo/db/pipeline/document_source_exchange.h
index 414ef2b2f78..071c04ec32d 100644
--- a/src/mongo/db/pipeline/document_source_exchange.h
+++ b/src/mongo/db/pipeline/document_source_exchange.h
@@ -214,10 +214,6 @@ public:
TransactionRequirement::kAllowed};
}
- boost::optional<MergingLogic> mergingLogic() final {
- return boost::none;
- }
-
const char* getSourceName() const final;
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
diff --git a/src/mongo/db/pipeline/document_source_facet.h b/src/mongo/db/pipeline/document_source_facet.h
index 570e55b1907..bc8f8dad307 100644
--- a/src/mongo/db/pipeline/document_source_facet.h
+++ b/src/mongo/db/pipeline/document_source_facet.h
@@ -57,7 +57,7 @@ class NamespaceString;
* stage which will produce a document like the following:
* {facetA: [<all input documents except the first one>], facetB: [<the first document>]}.
*/
-class DocumentSourceFacet final : public DocumentSource {
+class DocumentSourceFacet final : public DocumentSource, public NeedsMergerDocumentSource {
public:
struct FacetPipeline {
FacetPipeline(std::string name, std::unique_ptr<Pipeline, PipelineDeleter> pipeline)
@@ -126,9 +126,11 @@ public:
* TODO SERVER-24154: Should be smarter about splitting so that parts of the sub-pipelines can
* potentially be run in parallel on multiple shards.
*/
- boost::optional<MergingLogic> mergingLogic() final {
- // {shardsStage, mergingStage, sortPattern}
- return MergingLogic{nullptr, this, boost::none};
+ boost::intrusive_ptr<DocumentSource> getShardSource() final {
+ return nullptr;
+ }
+ MergingLogic mergingLogic() final {
+ return {this};
}
const std::vector<FacetPipeline>& getFacetPipelines() const {
diff --git a/src/mongo/db/pipeline/document_source_geo_near.cpp b/src/mongo/db/pipeline/document_source_geo_near.cpp
index e80feb995bd..4a9cee80e3f 100644
--- a/src/mongo/db/pipeline/document_source_geo_near.cpp
+++ b/src/mongo/db/pipeline/document_source_geo_near.cpp
@@ -237,9 +237,8 @@ DepsTracker::State DocumentSourceGeoNear::getDependencies(DepsTracker* deps) con
DocumentSourceGeoNear::DocumentSourceGeoNear(const intrusive_ptr<ExpressionContext>& pExpCtx)
: DocumentSource(pExpCtx), coordsIsArray(false), spherical(false) {}
-boost::optional<DocumentSource::MergingLogic> DocumentSourceGeoNear::mergingLogic() {
- // {shardsStage, mergingStage, sortPattern}
- return MergingLogic{this, nullptr, BSON(distanceField->fullPath() << 1)};
+NeedsMergerDocumentSource::MergingLogic DocumentSourceGeoNear::mergingLogic() {
+ return {nullptr, BSON(distanceField->fullPath() << 1)};
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h
index d95d59dd402..13f915afd2f 100644
--- a/src/mongo/db/pipeline/document_source_geo_near.h
+++ b/src/mongo/db/pipeline/document_source_geo_near.h
@@ -35,7 +35,7 @@
namespace mongo {
-class DocumentSourceGeoNear : public DocumentSource {
+class DocumentSourceGeoNear : public DocumentSource, public NeedsMergerDocumentSource {
public:
static constexpr StringData kKeyFieldName = "key"_sd;
static constexpr auto kStageName = "$geoNear";
@@ -121,9 +121,16 @@ public:
BSONObj asNearQuery(StringData nearFieldName) const;
/**
+ * This document source is sent as-is to the shards.
+ */
+ boost::intrusive_ptr<DocumentSource> getShardSource() final {
+ return this;
+ }
+
+ /**
* In a sharded cluster, this becomes a merge sort by distance, from nearest to furthest.
*/
- boost::optional<MergingLogic> mergingLogic() final;
+ MergingLogic mergingLogic() final;
private:
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h
index 42ef2b5ca38..cc186225ce8 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.h
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.h
@@ -67,10 +67,6 @@ public:
return constraints;
}
- boost::optional<MergingLogic> mergingLogic() final {
- return boost::none;
- }
-
DepsTracker::State getDependencies(DepsTracker* deps) const final {
_startWith->addDependencies(deps);
return DepsTracker::State::SEE_NEXT;
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp
index dee7cd104d4..318a67df1ff 100644
--- a/src/mongo/db/pipeline/document_source_group.cpp
+++ b/src/mongo/db/pipeline/document_source_group.cpp
@@ -950,7 +950,11 @@ Document DocumentSourceGroup::makeDocument(const Value& id,
return out.freeze();
}
-boost::optional<DocumentSource::MergingLogic> DocumentSourceGroup::mergingLogic() {
+intrusive_ptr<DocumentSource> DocumentSourceGroup::getShardSource() {
+ return this; // No modifications necessary when on shard
+}
+
+NeedsMergerDocumentSource::MergingLogic DocumentSourceGroup::mergingLogic() {
intrusive_ptr<DocumentSourceGroup> mergingGroup(new DocumentSourceGroup(pExpCtx));
mergingGroup->setDoingMerge(true);
@@ -969,8 +973,7 @@ boost::optional<DocumentSource::MergingLogic> DocumentSourceGroup::mergingLogic(
mergingGroup->addAccumulator(copiedAccumuledField);
}
- // {shardsStage, mergingStage, sortPattern}
- return MergingLogic{this, mergingGroup, boost::none};
+ return {mergingGroup};
}
bool DocumentSourceGroup::pathIncludedInGroupKeys(const std::string& dottedPath) const {
diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h
index 6348a83ab46..a1d6fdad059 100644
--- a/src/mongo/db/pipeline/document_source_group.h
+++ b/src/mongo/db/pipeline/document_source_group.h
@@ -87,7 +87,7 @@ private:
std::string _groupId;
};
-class DocumentSourceGroup final : public DocumentSource {
+class DocumentSourceGroup final : public DocumentSource, public NeedsMergerDocumentSource {
public:
using Accumulators = std::vector<boost::intrusive_ptr<Accumulator>>;
using GroupsMap = ValueUnorderedMap<Accumulators>;
@@ -162,7 +162,9 @@ public:
*/
bool usedDisk() final;
- boost::optional<MergingLogic> mergingLogic() final;
+ // Virtuals for NeedsMergerDocumentSource.
+ boost::intrusive_ptr<DocumentSource> getShardSource() final;
+ MergingLogic mergingLogic() final;
bool canRunInParallelBeforeOut(
const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const final;
diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp
index d07f41889ff..bde6dfdeb32 100644
--- a/src/mongo/db/pipeline/document_source_group_test.cpp
+++ b/src/mongo/db/pipeline/document_source_group_test.cpp
@@ -582,12 +582,13 @@ protected:
intrusive_ptr<DocumentSource> createMerger() {
// Set up a group merger to simulate merging results in the router. In this
// case only one shard is in use.
- auto mergeLogic = group()->mergingLogic();
- ASSERT(mergeLogic);
- ASSERT(mergeLogic->mergingStage);
- ASSERT_NOT_EQUALS(group(), mergeLogic->mergingStage);
- ASSERT_FALSE(static_cast<bool>(mergeLogic->inputSortPattern));
- return mergeLogic->mergingStage;
+ NeedsMergerDocumentSource* splittable = dynamic_cast<NeedsMergerDocumentSource*>(group());
+ ASSERT(splittable);
+ auto mergeLogic = splittable->mergingLogic();
+ ASSERT(mergeLogic.mergingStage);
+ ASSERT_NOT_EQUALS(group(), mergeLogic.mergingStage);
+ ASSERT_FALSE(static_cast<bool>(mergeLogic.inputSortPattern));
+ return mergeLogic.mergingStage;
}
void checkResultSet(const intrusive_ptr<DocumentSource>& sink) {
// Load the results from the DocumentSourceGroup and sort them by _id.
diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h
index 0713e85f368..51d92262b0f 100644
--- a/src/mongo/db/pipeline/document_source_index_stats.h
+++ b/src/mongo/db/pipeline/document_source_index_stats.h
@@ -83,10 +83,6 @@ public:
return constraints;
}
- boost::optional<MergingLogic> mergingLogic() final {
- return boost::none;
- }
-
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
diff --git a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h
index b9e22e6cbf4..401087d12e6 100644
--- a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h
+++ b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h
@@ -63,10 +63,6 @@ public:
TransactionRequirement::kAllowed};
}
- boost::optional<MergingLogic> mergingLogic() final {
- return boost::none;
- }
-
GetNextResult getNext() final;
private:
diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h
index 3abcc737db4..8f0a8780dbe 100644
--- a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h
+++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h
@@ -43,7 +43,8 @@ namespace mongo {
* pipeline will be executed on mongoS if all other stages are eligible, and will be sent to a
* random participating shard otherwise.
*/
-class DocumentSourceInternalSplitPipeline final : public DocumentSource {
+class DocumentSourceInternalSplitPipeline final : public DocumentSource,
+ public NeedsMergerDocumentSource {
public:
static constexpr StringData kStageName = "$_internalSplitPipeline"_sd;
@@ -59,9 +60,12 @@ public:
return kStageName.rawData();
}
- boost::optional<MergingLogic> mergingLogic() final {
- // {shardsStage, mergingStage, sortPattern}
- return MergingLogic{nullptr, this, boost::none};
+ boost::intrusive_ptr<DocumentSource> getShardSource() final {
+ return nullptr;
+ }
+
+ MergingLogic mergingLogic() final {
+ return {this};
}
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
diff --git a/src/mongo/db/pipeline/document_source_limit.h b/src/mongo/db/pipeline/document_source_limit.h
index 78d325c9375..885c0f5c875 100644
--- a/src/mongo/db/pipeline/document_source_limit.h
+++ b/src/mongo/db/pipeline/document_source_limit.h
@@ -34,7 +34,7 @@
namespace mongo {
-class DocumentSourceLimit final : public DocumentSource {
+class DocumentSourceLimit final : public DocumentSource, public NeedsMergerDocumentSource {
public:
static constexpr StringData kStageName = "$limit"_sd;
@@ -81,14 +81,21 @@ public:
}
/**
- * Returns a MergingLogic with two identical $limit stages; one for the shards pipeline and one
- * for the merging pipeline.
+ * Returns the current DocumentSourceLimit for use in the shards pipeline. Running this stage on
+ * the shards is an optimization, but is not strictly necessary in order to produce correct
+ * pipeline output.
*/
- boost::optional<MergingLogic> mergingLogic() final {
- // Running this stage on the shards is an optimization, but is not strictly necessary in
- // order to produce correct pipeline output.
- // {shardsStage, mergingStage, sortPattern}
- return MergingLogic{this, DocumentSourceLimit::create(pExpCtx, _limit), boost::none};
+ boost::intrusive_ptr<DocumentSource> getShardSource() final {
+ return this;
+ }
+
+ /**
+ * Returns a new DocumentSourceLimit with the same limit as the current stage, for use in the
+ * merge pipeline. Unlike the shards source, it is necessary for this stage to run on the
+ * merging host in order to produce correct pipeline output.
+ */
+ MergingLogic mergingLogic() final {
+ return {DocumentSourceLimit::create(pExpCtx, _limit)};
}
long long getLimit() const {
diff --git a/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h b/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h
index a6312e8a2b9..47fbb2759cd 100644
--- a/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h
+++ b/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h
@@ -103,10 +103,6 @@ public:
return constraints;
}
- boost::optional<MergingLogic> mergingLogic() final {
- return boost::none;
- }
-
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
diff --git a/src/mongo/db/pipeline/document_source_list_local_sessions.h b/src/mongo/db/pipeline/document_source_list_local_sessions.h
index 4c0aa30f4fc..9e0fa9e7363 100644
--- a/src/mongo/db/pipeline/document_source_list_local_sessions.h
+++ b/src/mongo/db/pipeline/document_source_list_local_sessions.h
@@ -117,10 +117,6 @@ public:
return constraints;
}
- boost::optional<MergingLogic> mergingLogic() final {
- return boost::none;
- }
-
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index 8a0c068f766..2a2b3338789 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -47,7 +47,7 @@ namespace mongo {
* Queries separate collection for equality matches with documents in the pipeline collection.
* Adds matching documents to a new array field in the input document.
*/
-class DocumentSourceLookUp final : public DocumentSource {
+class DocumentSourceLookUp final : public DocumentSource, public NeedsMergerDocumentSource {
public:
static constexpr size_t kMaxSubPipelineDepth = 20;
@@ -110,9 +110,12 @@ public:
return DocumentSource::truncateSortSet(pSource->getOutputSorts(), {_as.fullPath()});
}
- boost::optional<MergingLogic> mergingLogic() final {
- // {shardsStage, mergingStage, sortPattern}
- return MergingLogic{nullptr, this, boost::none};
+ boost::intrusive_ptr<DocumentSource> getShardSource() final {
+ return nullptr;
+ }
+
+ MergingLogic mergingLogic() final {
+ return {this};
}
void addInvolvedCollections(std::vector<NamespaceString>* collections) const final {
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
index cd2f8175e00..9e63d3f5dcd 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
@@ -77,10 +77,6 @@ public:
return constraints;
}
- boost::optional<MergingLogic> mergingLogic() final {
- return boost::none;
- }
-
DepsTracker::State getDependencies(DepsTracker* deps) const {
// The namespace is not technically needed yet, but we will if there is more than one
// collection involved.
diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h
index d770006e2d4..fd08a52250c 100644
--- a/src/mongo/db/pipeline/document_source_match.h
+++ b/src/mongo/db/pipeline/document_source_match.h
@@ -162,10 +162,6 @@ public:
const std::string& path,
const boost::intrusive_ptr<ExpressionContext>& expCtx);
- boost::optional<MergingLogic> mergingLogic() final {
- return boost::none;
- }
-
protected:
DocumentSourceMatch(const BSONObj& query,
const boost::intrusive_ptr<ExpressionContext>& expCtx);
diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h
index b5d6dffde47..617e0cf462c 100644
--- a/src/mongo/db/pipeline/document_source_mock.h
+++ b/src/mongo/db/pipeline/document_source_mock.h
@@ -98,10 +98,6 @@ public:
return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {}};
}
- boost::optional<MergingLogic> mergingLogic() override {
- return boost::none;
- }
-
// Return documents from front of queue.
std::deque<GetNextResult> queue;
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
index cfd9249146b..906a30f10ee 100644
--- a/src/mongo/db/pipeline/document_source_out.h
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -65,7 +65,7 @@ public:
/**
* Abstract class for the $out aggregation stage.
*/
-class DocumentSourceOut : public DocumentSource {
+class DocumentSourceOut : public DocumentSource, public NeedsMergerDocumentSource {
public:
/**
* A "lite parsed" $out stage is similar to other stages involving foreign collections except in
@@ -125,12 +125,8 @@ public:
PositionRequirement::kLast,
// A $out to an unsharded collection should merge on the primary shard to perform
// local writes. A $out to a sharded collection has no requirement, since each shard
- // can perform its own portion of the write. We use 'kAnyShard' to direct it to
- // execute on one of the shards in case some of the writes happen to end up being
- // local.
- pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _outputNs)
- ? HostTypeRequirement::kAnyShard
- : HostTypeRequirement::kPrimaryShard,
+ // can perform its own portion of the write.
+ HostTypeRequirement::kPrimaryShard,
DiskUseRequirement::kWritesPersistentData,
FacetRequirement::kNotAllowed,
TransactionRequirement::kNotAllowed};
@@ -144,17 +140,12 @@ public:
return _mode;
}
- boost::optional<MergingLogic> mergingLogic() final {
- // It should always be faster to avoid splitting the pipeline if the output collection is
- // sharded. If we avoid splitting the pipeline then each shard can perform the writes to the
- // target collection in parallel.
- if (pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _outputNs)) {
- return boost::none;
- }
- // {shardsStage, mergingStage, sortPattern}
- return MergingLogic{nullptr, this, boost::none};
+ boost::intrusive_ptr<DocumentSource> getShardSource() final {
+ return nullptr;
+ }
+ MergingLogic mergingLogic() final {
+ return {this};
}
-
virtual bool canRunInParallelBeforeOut(
const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const final {
// If someone is asking the question, this must be the $out stage in question, so yes!
diff --git a/src/mongo/db/pipeline/document_source_plan_cache_stats.h b/src/mongo/db/pipeline/document_source_plan_cache_stats.h
index 3c1beb37e76..a17225b1a60 100644
--- a/src/mongo/db/pipeline/document_source_plan_cache_stats.h
+++ b/src/mongo/db/pipeline/document_source_plan_cache_stats.h
@@ -105,10 +105,6 @@ public:
return constraints;
}
- boost::optional<MergingLogic> mergingLogic() final {
- return boost::none;
- }
-
const char* getSourceName() const override {
return kStageName;
}
diff --git a/src/mongo/db/pipeline/document_source_redact.h b/src/mongo/db/pipeline/document_source_redact.h
index 70cc16e1492..0c52b453e7f 100644
--- a/src/mongo/db/pipeline/document_source_redact.h
+++ b/src/mongo/db/pipeline/document_source_redact.h
@@ -52,10 +52,6 @@ public:
ChangeStreamRequirement::kWhitelist};
}
- boost::optional<MergingLogic> mergingLogic() final {
- return boost::none;
- }
-
/**
* Attempts to duplicate the redact-safe portion of a subsequent $match before the $redact
* stage.
diff --git a/src/mongo/db/pipeline/document_source_sample.cpp b/src/mongo/db/pipeline/document_source_sample.cpp
index 2728f5607f7..90b2e82da1f 100644
--- a/src/mongo/db/pipeline/document_source_sample.cpp
+++ b/src/mongo/db/pipeline/document_source_sample.cpp
@@ -117,22 +117,17 @@ intrusive_ptr<DocumentSource> DocumentSourceSample::createFromBson(
return sample;
}
+intrusive_ptr<DocumentSource> DocumentSourceSample::getShardSource() {
+ return this;
+}
-boost::optional<DocumentSource::MergingLogic> DocumentSourceSample::mergingLogic() {
+NeedsMergerDocumentSource::MergingLogic DocumentSourceSample::mergingLogic() {
// On the merger we need to merge the pre-sorted documents by their random values, then limit to
- // the number we need.
- MergingLogic logic;
- logic.shardsStage = this;
- if (_size > 0) {
- logic.mergingStage = DocumentSourceLimit::create(pExpCtx, _size);
- }
-
- // Here we don't use 'randSortSpec' because it uses a metadata sort which the merging logic does
- // not understand. The merging logic will use the serialized sort key, and this sort pattern is
- // just used to communicate ascending/descending information. A pattern like {$meta: "randVal"}
- // is neither ascending nor descending, and so will not be useful when constructing the merging
- // logic.
- logic.inputSortPattern = BSON("$rand" << -1);
- return logic;
+ // the number we need. Here we don't use 'randSortSpec' because it uses a metadata sort which
+ // the merging logic does not understand. The merging logic will use the serialized sort key,
+ // and this sort pattern is just used to communicate ascending/descending information. A pattern
+ // like {$meta: "randVal"} is neither ascending nor descending, and so will not be useful when
+ // constructing the merging logic.
+ return {_size > 0 ? DocumentSourceLimit::create(pExpCtx, _size) : nullptr, BSON("$rand" << -1)};
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_sample.h b/src/mongo/db/pipeline/document_source_sample.h
index eec90e7c24f..11aa6fea8d0 100644
--- a/src/mongo/db/pipeline/document_source_sample.h
+++ b/src/mongo/db/pipeline/document_source_sample.h
@@ -35,7 +35,7 @@
namespace mongo {
-class DocumentSourceSample final : public DocumentSource {
+class DocumentSourceSample final : public DocumentSource, public NeedsMergerDocumentSource {
public:
static constexpr StringData kStageName = "$sample"_sd;
@@ -58,7 +58,8 @@ public:
return DepsTracker::State::SEE_NEXT;
}
- boost::optional<MergingLogic> mergingLogic() final;
+ boost::intrusive_ptr<DocumentSource> getShardSource() final;
+ MergingLogic mergingLogic() final;
long long getSampleSize() const {
return _size;
diff --git a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h
index a095ade9fc0..60289140440 100644
--- a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h
+++ b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h
@@ -55,10 +55,6 @@ public:
TransactionRequirement::kAllowed};
}
- boost::optional<MergingLogic> mergingLogic() final {
- return boost::none;
- }
-
static boost::intrusive_ptr<DocumentSourceSampleFromRandomCursor> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
long long size,
diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache.h b/src/mongo/db/pipeline/document_source_sequential_document_cache.h
index c6b4f21ab45..48e14142829 100644
--- a/src/mongo/db/pipeline/document_source_sequential_document_cache.h
+++ b/src/mongo/db/pipeline/document_source_sequential_document_cache.h
@@ -64,10 +64,6 @@ public:
return constraints;
}
- boost::optional<MergingLogic> mergingLogic() final {
- return boost::none;
- }
-
GetNextResult getNext() final;
static boost::intrusive_ptr<DocumentSourceSequentialDocumentCache> create(
diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h
index 7f5d2196ab0..1b9407b9d6b 100644
--- a/src/mongo/db/pipeline/document_source_single_document_transformation.h
+++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h
@@ -72,10 +72,6 @@ public:
return constraints;
}
- boost::optional<MergingLogic> mergingLogic() final {
- return boost::none;
- }
-
TransformerInterface::TransformerType getType() const {
return _parsedTransform->getType();
}
diff --git a/src/mongo/db/pipeline/document_source_skip.h b/src/mongo/db/pipeline/document_source_skip.h
index 7a384d083ed..883cde202bd 100644
--- a/src/mongo/db/pipeline/document_source_skip.h
+++ b/src/mongo/db/pipeline/document_source_skip.h
@@ -34,7 +34,7 @@
namespace mongo {
-class DocumentSourceSkip final : public DocumentSource {
+class DocumentSourceSkip final : public DocumentSource, public NeedsMergerDocumentSource {
public:
static constexpr StringData kStageName = "$skip"_sd;
@@ -84,12 +84,11 @@ public:
return DepsTracker::State::SEE_NEXT; // This doesn't affect needed fields
}
- /**
- * The $skip stage must run on the merging half of the pipeline.
- */
- boost::optional<MergingLogic> mergingLogic() final {
- // {shardsStage, mergingStage, sortPattern}
- return MergingLogic{nullptr, this, boost::none};
+ boost::intrusive_ptr<DocumentSource> getShardSource() final {
+ return nullptr;
+ }
+ MergingLogic mergingLogic() final {
+ return {this};
}
long long getSkip() const {
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp
index aa48535f810..475897471ed 100644
--- a/src/mongo/db/pipeline/document_source_sort.cpp
+++ b/src/mongo/db/pipeline/document_source_sort.cpp
@@ -524,14 +524,13 @@ int DocumentSourceSort::compare(const Value& lhs, const Value& rhs) const {
return 0;
}
-boost::optional<DocumentSource::MergingLogic> DocumentSourceSort::mergingLogic() {
- MergingLogic split;
- split.shardsStage = this;
- split.inputSortPattern = sortKeyPattern(SortKeySerialization::kForSortKeyMerging).toBson();
- if (_limitSrc) {
- split.mergingStage = DocumentSourceLimit::create(pExpCtx, _limitSrc->getLimit());
- }
- return split;
+intrusive_ptr<DocumentSource> DocumentSourceSort::getShardSource() {
+ return this;
+}
+
+NeedsMergerDocumentSource::MergingLogic DocumentSourceSort::mergingLogic() {
+ return {_limitSrc ? DocumentSourceLimit::create(pExpCtx, _limitSrc->getLimit()) : nullptr,
+ sortKeyPattern(SortKeySerialization::kForSortKeyMerging).toBson()};
}
bool DocumentSourceSort::canRunInParallelBeforeOut(
diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h
index 80ef1ed525a..4ee2584bc45 100644
--- a/src/mongo/db/pipeline/document_source_sort.h
+++ b/src/mongo/db/pipeline/document_source_sort.h
@@ -39,7 +39,7 @@
namespace mongo {
-class DocumentSourceSort final : public DocumentSource {
+class DocumentSourceSort final : public DocumentSource, public NeedsMergerDocumentSource {
public:
static constexpr StringData kStageName = "$sort"_sd;
enum class SortKeySerialization {
@@ -83,7 +83,8 @@ public:
DepsTracker::State getDependencies(DepsTracker* deps) const final;
- boost::optional<MergingLogic> mergingLogic() final;
+ boost::intrusive_ptr<DocumentSource> getShardSource() final;
+ MergingLogic mergingLogic() final;
bool canRunInParallelBeforeOut(
const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const final;
diff --git a/src/mongo/db/pipeline/document_source_sort_test.cpp b/src/mongo/db/pipeline/document_source_sort_test.cpp
index 4d0fb324e60..91e54dc7aaf 100644
--- a/src/mongo/db/pipeline/document_source_sort_test.cpp
+++ b/src/mongo/db/pipeline/document_source_sort_test.cpp
@@ -132,9 +132,8 @@ TEST_F(DocumentSourceSortTest, SortWithLimit) {
sort()->serializeToArray(arr);
ASSERT_BSONOBJ_EQ(arr[0].getDocument().toBson(), BSON("$sort" << BSON("a" << 1)));
- ASSERT(sort()->mergingLogic());
- ASSERT(sort()->mergingLogic()->shardsStage != nullptr);
- ASSERT(sort()->mergingLogic()->mergingStage == nullptr);
+ ASSERT(sort()->getShardSource() != nullptr);
+ ASSERT(sort()->mergingLogic().mergingStage == nullptr);
}
container.push_back(DocumentSourceLimit::create(expCtx, 10));
@@ -160,10 +159,9 @@ TEST_F(DocumentSourceSortTest, SortWithLimit) {
Value(arr),
DOC_ARRAY(DOC("$sort" << DOC("a" << 1)) << DOC("$limit" << sort()->getLimit())));
- ASSERT(sort()->mergingLogic());
- ASSERT(sort()->mergingLogic()->shardsStage != nullptr);
- ASSERT(sort()->mergingLogic()->mergingStage != nullptr);
- ASSERT(dynamic_cast<DocumentSourceLimit*>(sort()->mergingLogic()->mergingStage.get()));
+ ASSERT(sort()->getShardSource() != nullptr);
+ ASSERT(sort()->mergingLogic().mergingStage != nullptr);
+ ASSERT(dynamic_cast<DocumentSourceLimit*>(sort()->mergingLogic().mergingStage.get()));
}
TEST_F(DocumentSourceSortTest, Dependencies) {
diff --git a/src/mongo/db/pipeline/document_source_tee_consumer.h b/src/mongo/db/pipeline/document_source_tee_consumer.h
index 04a7f96fa31..a642533fdf7 100644
--- a/src/mongo/db/pipeline/document_source_tee_consumer.h
+++ b/src/mongo/db/pipeline/document_source_tee_consumer.h
@@ -64,10 +64,6 @@ public:
TransactionRequirement::kAllowed};
}
- boost::optional<MergingLogic> mergingLogic() final {
- return boost::none;
- }
-
GetNextResult getNext() final;
/**
diff --git a/src/mongo/db/pipeline/document_source_test_optimizations.h b/src/mongo/db/pipeline/document_source_test_optimizations.h
index 8739dea49d2..b58b35566e4 100644
--- a/src/mongo/db/pipeline/document_source_test_optimizations.h
+++ b/src/mongo/db/pipeline/document_source_test_optimizations.h
@@ -56,10 +56,6 @@ public:
TransactionRequirement::kNotAllowed};
}
- virtual boost::optional<MergingLogic> mergingLogic() override {
- return boost::none;
- }
-
virtual GetModPathsReturn getModifiedPaths() const override {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/db/pipeline/document_source_unwind.h b/src/mongo/db/pipeline/document_source_unwind.h
index 8317bf229dd..787249aec6e 100644
--- a/src/mongo/db/pipeline/document_source_unwind.h
+++ b/src/mongo/db/pipeline/document_source_unwind.h
@@ -60,10 +60,6 @@ public:
return constraints;
}
- boost::optional<MergingLogic> mergingLogic() final {
- return boost::none;
- }
-
DepsTracker::State getDependencies(DepsTracker* deps) const final;
/**
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
index 21a5d73c94d..b01585619b6 100644
--- a/src/mongo/db/pipeline/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -161,9 +161,7 @@ public:
BSONObjBuilder* builder) const = 0;
/**
- * Gets the collection options for the collection given by 'nss'. Throws
- * ErrorCodes::CommandNotSupportedOnView if 'nss' describes a view. Future callers may want to
- * parameterize this behavior.
+ * Gets the collection options for the collection given by 'nss'.
*/
virtual BSONObj getCollectionOptions(const NamespaceString& nss) = 0;
diff --git a/src/mongo/db/pipeline/mongos_process_interface.cpp b/src/mongo/db/pipeline/mongos_process_interface.cpp
index 68aa5ae9d3d..effb02d4e6f 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.cpp
+++ b/src/mongo/db/pipeline/mongos_process_interface.cpp
@@ -28,7 +28,7 @@
* it in the license file.
*/
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
#include "mongo/platform/basic.h"
@@ -298,6 +298,11 @@ BSONObj MongoSInterface::createCommandForTargetedShards(
// notifies the shards that the mongoS is capable of merging streams based on resume token.
// TODO SERVER-38539: the 'mergeByPBRT' flag is no longer necessary in 4.4.
targetedCmd[AggregationRequest::kMergeByPBRTName] = Value(litePipe.hasChangeStream());
+
+ // For split pipelines which need merging, do *not* propagate the writeConcern to the shards
+ // part. Otherwise this is part of an exchange and in that case we should include the
+ // writeConcern.
+ targetedCmd[WriteConcernOptions::kWriteConcernField] = Value();
}
targetedCmd[AggregationRequest::kCursorName] =
@@ -415,10 +420,6 @@ MongoSInterface::DispatchShardPipelineResults MongoSInterface::dispatchShardPipe
boost::optional<cluster_aggregation_planner::SplitPipeline> splitPipeline;
if (needsSplit) {
- LOG(5) << "Splitting pipeline: "
- << "targeting = " << shardIds.size()
- << " shards, needsMongosMerge = " << needsMongosMerge
- << ", needsPrimaryShardMerge = " << needsPrimaryShardMerge;
splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(pipeline));
exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 1fddc890a17..725c655be44 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -390,7 +390,8 @@ bool Pipeline::requiredToRunOnMongos() const {
for (auto&& stage : _sources) {
// If this pipeline is capable of splitting before the mongoS-only stage, then the pipeline
// as a whole is not required to run on mongoS.
- if (_splitState == SplitState::kUnsplit && stage->mergingLogic()) {
+ if (_splitState == SplitState::kUnsplit &&
+ dynamic_cast<NeedsMergerDocumentSource*>(stage.get())) {
return false;
}
diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index 38c521d71e4..aeef3174c5c 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -142,8 +142,7 @@ DBClientBase* MongoInterfaceStandalone::directClient() {
}
bool MongoInterfaceStandalone::isSharded(OperationContext* opCtx, const NamespaceString& nss) {
- Lock::DBLock dbLock(opCtx, nss.db(), MODE_IS);
- Lock::CollectionLock collLock(opCtx->lockState(), nss.ns(), MODE_IS);
+ AutoGetCollectionForRead autoColl(opCtx, nss);
const auto metadata = CollectionShardingState::get(opCtx, nss)->getCurrentMetadata();
return metadata->isSharded();
}
@@ -277,14 +276,7 @@ Status MongoInterfaceStandalone::appendRecordCount(OperationContext* opCtx,
BSONObj MongoInterfaceStandalone::getCollectionOptions(const NamespaceString& nss) {
const auto infos = _client.getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll()));
- if (infos.empty()) {
- return BSONObj();
- }
- auto res = infos.front();
- uassert(ErrorCodes::CommandNotSupportedOnView,
- str::stream() << nss.toString() << " is a view, not a collection",
- res["type"].String() != "view");
- return res.getObjectField("options").getOwned();
+ return infos.empty() ? BSONObj() : infos.front().getObjectField("options").getOwned();
}
void MongoInterfaceStandalone::renameIfOptionsAndIndexesHaveNotChanged(
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index c0329d181b1..2fd4d769bfb 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -66,6 +66,8 @@ namespace {
*
* It is not safe to call this optimization multiple times.
*
+ * NOTE: looks for NeedsMergerDocumentSources and uses that API
+ *
* Returns the sort specification if the input streams are sorted, and false otherwise.
*/
boost::optional<BSONObj> findSplitPoint(Pipeline::SourceContainer* shardPipe, Pipeline* mergePipe) {
@@ -73,23 +75,28 @@ boost::optional<BSONObj> findSplitPoint(Pipeline::SourceContainer* shardPipe, Pi
boost::intrusive_ptr<DocumentSource> current = mergePipe->popFront();
// Check if this source is splittable.
- auto mergeLogic = current->mergingLogic();
- if (!mergeLogic) {
+ NeedsMergerDocumentSource* splittable =
+ dynamic_cast<NeedsMergerDocumentSource*>(current.get());
+
+ if (!splittable) {
// Move the source from the merger _sources to the shard _sources.
shardPipe->push_back(current);
- continue;
- }
+ } else {
+ // Split this source into 'merge' and 'shard' _sources.
+ boost::intrusive_ptr<DocumentSource> shardSource = splittable->getShardSource();
+ auto mergeLogic = splittable->mergingLogic();
- // A source may not simultaneously be present on both sides of the split.
- invariant(mergeLogic->shardsStage != mergeLogic->mergingStage);
+ // A source may not simultaneously be present on both sides of the split.
+ invariant(shardSource != mergeLogic.mergingStage);
- if (mergeLogic->shardsStage)
- shardPipe->push_back(std::move(mergeLogic->shardsStage));
+ if (shardSource)
+ shardPipe->push_back(std::move(shardSource));
- if (mergeLogic->mergingStage)
- mergePipe->addInitialSource(std::move(mergeLogic->mergingStage));
+ if (mergeLogic.mergingStage)
+ mergePipe->addInitialSource(std::move(mergeLogic.mergingStage));
- return mergeLogic->inputSortPattern;
+ return mergeLogic.inputSortPattern;
+ }
}
return boost::none;
}
@@ -288,8 +295,8 @@ ClusterClientCursorGuard convertPipelineToRouterStages(
bool stageCanRunInParallel(const boost::intrusive_ptr<DocumentSource>& stage,
const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) {
- if (stage->mergingLogic()) {
- return stage->canRunInParallelBeforeOut(nameOfShardKeyFieldsUponEntryToStage);
+ if (auto needsMerger = dynamic_cast<NeedsMergerDocumentSource*>(stage.get())) {
+ return needsMerger->canRunInParallelBeforeOut(nameOfShardKeyFieldsUponEntryToStage);
} else {
// This stage is fine to execute in parallel on each stream. For example, a $match can be
// applied to each stream in parallel.
diff --git a/src/mongo/s/query/document_source_merge_cursors.h b/src/mongo/s/query/document_source_merge_cursors.h
index 259b7ea088b..f733d2c3088 100644
--- a/src/mongo/s/query/document_source_merge_cursors.h
+++ b/src/mongo/s/query/document_source_merge_cursors.h
@@ -95,10 +95,6 @@ public:
return constraints;
}
- boost::optional<MergingLogic> mergingLogic() final {
- return boost::none;
- }
-
GetNextResult getNext() final;
std::size_t getNumRemotes() const;
diff --git a/src/mongo/s/query/document_source_update_on_add_shard.h b/src/mongo/s/query/document_source_update_on_add_shard.h
index 37a32a774a3..7f6833b5494 100644
--- a/src/mongo/s/query/document_source_update_on_add_shard.h
+++ b/src/mongo/s/query/document_source_update_on_add_shard.h
@@ -73,10 +73,6 @@ public:
ChangeStreamRequirement::kChangeStreamStage};
}
- boost::optional<MergingLogic> mergingLogic() final {
- return boost::none;
- }
-
GetNextResult getNext() final;
private: