diff options
54 files changed, 226 insertions, 366 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml index 088e63f1a9b..de2b7f8eec6 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml @@ -43,7 +43,6 @@ selector: - jstests/sharding/lookup_stale_mongos.js - jstests/sharding/out_cannot_run_on_mongos.js - jstests/sharding/out_command_options.js - - jstests/sharding/out_does_not_force_merge.js - jstests/sharding/out_from_stale_mongos.js - jstests/sharding/out_hashed_shard_key.js - jstests/sharding/out_stale_unique_key.js diff --git a/jstests/aggregation/sharded_agg_cleanup_on_error.js b/jstests/aggregation/sharded_agg_cleanup_on_error.js index c8122290afe..01b31b270fd 100644 --- a/jstests/aggregation/sharded_agg_cleanup_on_error.js +++ b/jstests/aggregation/sharded_agg_cleanup_on_error.js @@ -113,14 +113,10 @@ })); // Run an aggregation which is eligible for $exchange. This should assert because of - // the failpoint. Add a $group stage to force an exchange-eligible split of the pipeline - // before the $out. Without the $group we won't use the exchange optimization and instead - // will send the $out to each shard. + // the failpoint. st.shardColl(mongosDB.target, {_id: 1}, {_id: 0}, {_id: 1}, kDBName, false); assertErrorCode( - coll, - [{$group: {_id: "$fakeShardKey"}}, {$out: {to: "target", mode: "replaceDocuments"}}], - ErrorCodes.FailPointEnabled); + coll, [{$out: {to: "target", mode: "replaceDocuments"}}], ErrorCodes.FailPointEnabled); // Neither mongos or the shards should leave cursors open. assert.eq(mongosDB.serverStatus().metrics.cursor.open.total, 0); diff --git a/jstests/aggregation/sources/out/exchange_explain.js b/jstests/aggregation/sources/out/exchange_explain.js index 84a5b02588f..5a6b7a33ef7 100644 --- a/jstests/aggregation/sources/out/exchange_explain.js +++ b/jstests/aggregation/sources/out/exchange_explain.js @@ -112,7 +112,7 @@ load('jstests/aggregation/extras/utils.js'); explain = runExplainQuery(outCollRange); // Make sure there is no exchange. - assert.eq(explain.mergeType, "anyShard", tojson(explain)); + assert.eq(explain.mergeType, "primaryShard", tojson(explain)); assert(explain.hasOwnProperty("splitPipeline"), tojson(explain)); assert(!explain.splitPipeline.hasOwnProperty("exchange"), tojson(explain)); diff --git a/jstests/sharding/out_does_not_force_merge.js b/jstests/sharding/out_does_not_force_merge.js deleted file mode 100644 index 01585fbe518..00000000000 --- a/jstests/sharding/out_does_not_force_merge.js +++ /dev/null @@ -1,62 +0,0 @@ -// Tests that an $out stage does not force a pipeline to split into a "shards part" and a "merging -// part" if no other stage in the pipeline would force such a split. -(function() { - "use strict"; - - const st = new ShardingTest({shards: 2, rs: {nodes: 1}}); - - const mongosDB = st.s.getDB("test_db"); - - const inColl = mongosDB["inColl"]; - // Two different output collections will be sharded by different keys. - const outCollById = mongosDB["outCollById"]; - const outCollBySK = mongosDB["outCollBySK"]; - st.shardColl(outCollById, {_id: 1}, {_id: 500}, {_id: 500}, mongosDB.getName()); - st.shardColl(outCollBySK, {sk: 1}, {sk: 500}, {sk: 500}, mongosDB.getName()); - - const numDocs = 1000; - - // Shard the input collection. - st.shardColl(inColl, {_id: 1}, {_id: 500}, {_id: 500}, mongosDB.getName()); - - // Insert some data to the input collection. - const bulk = inColl.initializeUnorderedBulkOp(); - for (let i = 0; i < numDocs; i++) { - bulk.insert({_id: i, sk: numDocs - i}); - } - assert.commandWorked(bulk.execute()); - - function assertOutRunsOnShards(explain) { - assert(explain.hasOwnProperty("splitPipeline"), tojson(explain)); - assert(explain.splitPipeline.hasOwnProperty("shardsPart"), tojson(explain)); - assert.eq( - explain.splitPipeline.shardsPart.filter(stage => stage.hasOwnProperty("$out")).length, - 1, - tojson(explain)); - assert(explain.splitPipeline.hasOwnProperty("mergerPart"), tojson(explain)); - assert.eq([], explain.splitPipeline.mergerPart, tojson(explain)); - } - - // Test that a simple $out can run in parallel. Note that we still expect a 'splitPipeline' in - // the explain output, but the merging half should be empty to indicate that the entire thing is - // executing in parallel on the shards. - let explain = - inColl.explain().aggregate([{$out: {to: outCollById.getName(), mode: "insertDocuments"}}]); - assertOutRunsOnShards(explain); - // Actually execute the pipeline and make sure it works as expected. - assert.eq(outCollById.find().itcount(), 0); - inColl.aggregate([{$out: {to: outCollById.getName(), mode: "insertDocuments"}}]); - assert.eq(outCollById.find().itcount(), numDocs); - - // Test the same thing but in a pipeline where the output collection's shard key differs from - // the input collection's. - explain = - inColl.explain().aggregate([{$out: {to: outCollBySK.getName(), mode: "insertDocuments"}}]); - assertOutRunsOnShards(explain); - // Again, test that execution works as expected. - assert.eq(outCollBySK.find().itcount(), 0); - inColl.aggregate([{$out: {to: outCollBySK.getName(), mode: "insertDocuments"}}]); - assert.eq(outCollBySK.find().itcount(), numDocs); - - st.stop(); -}()); diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp index 9d023d1cd45..edcd4af42b1 100644 --- a/src/mongo/db/commands/pipeline_command.cpp +++ b/src/mongo/db/commands/pipeline_command.cpp @@ -65,23 +65,16 @@ public: private: bool supportsWriteConcern() const override { - // For an aggregate command that specifies a writeConcern, we check on mongoS whether - // there's an $out in the pipeline, and reject the command if there isn't. Otherwise, - // we forward the writeConcern to any and all aggregates sent to the shards, even for an - // aggregate which represents a part of the global pipeline that does not contain the - // $out. So if the command is from mongos we can trust that the write concern makes - // sense. Otherwise we validate that writeConcern is only passed when there's an $out - // stage. - return this->_request.body["fromMongos"].trueValue() || - Pipeline::aggSupportsWriteConcern(this->_request.body); + return Pipeline::aggSupportsWriteConcern(this->_request.body); } bool supportsReadConcern(repl::ReadConcernLevel level) const override { // Aggregations that are run directly against a collection allow any read concern. // Otherwise, if the aggregate is collectionless then the read concern must be 'local' // (e.g. $currentOp). The exception to this is a $changeStream on a whole database, - // which is considered collectionless but must be read concern 'majority'. Further read - // concern validation is done once the pipeline is parsed. + // which is + // considered collectionless but must be read concern 'majority'. Further read concern + // validation is done one the pipeline is parsed. return level == repl::ReadConcernLevel::kLocalReadConcern || level == repl::ReadConcernLevel::kMajorityReadConcern || !AggregationRequest::parseNs(_dbName, _request.body).isCollectionlessAggregateNS(); diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 339a7a74b40..2c3e6ccdd84 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -216,26 +216,6 @@ public: Document _result; }; - /** - * A struct representing the information needed to execute this stage on a distributed - * collection. - */ - struct MergingLogic { - - // A stage which executes on each shard in parallel, or nullptr if nothing can be done in - // parallel. For example, a partial $group before a subsequent global $group. - boost::intrusive_ptr<DocumentSource> shardsStage = nullptr; - - // A stage which executes after merging all the results together, or nullptr if nothing is - // necessary after merging. For example, a $limit stage. - boost::intrusive_ptr<DocumentSource> mergingStage = nullptr; - - // If set, each document is expected to have sort key metadata which will be serialized in - // the '$sortKey' field. 'inputSortPattern' will then be used to describe which fields are - // ascending and which fields are descending when merging the streams together. - boost::optional<BSONObj> inputSortPattern = boost::none; - }; - virtual ~DocumentSource() {} /** @@ -474,26 +454,6 @@ public: return DepsTracker::State::NOT_SUPPORTED; } - /** - * If this stage can be run in parallel across a distributed collection, returns boost::none. - * Otherwise, returns a struct representing what needs to be done to merge each shard's pipeline - * into a single stream of results. Must not mutate the existing source object; if different - * behaviour is required, a new source should be created and configured appropriately. It is an - * error for the returned MergingLogic to have identical pointers for 'shardsStage' and - * 'mergingStage'. - */ - virtual boost::optional<MergingLogic> mergingLogic() = 0; - - /** - * Returns true if it would be correct to execute this stage in parallel across the shards in - * cases where the final stage is an $out. For example, a $group stage which is just merging the - * groups from the shards can be run in parallel since it will preserve the shard key. - */ - virtual bool canRunInParallelBeforeOut( - const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const { - return false; - } - protected: explicit DocumentSource(const boost::intrusive_ptr<ExpressionContext>& pExpCtx); @@ -547,4 +507,59 @@ private: boost::optional<ExplainOptions::Verbosity> explain = boost::none) const = 0; }; +/** + * This class marks DocumentSources that should be split between the merger and the shards. See + * Pipeline::Optimizations::Sharded::findSplitPoint() for details. + */ +class NeedsMergerDocumentSource { +public: + /** + * A struct representing the information needed to merge the cursors for the shards half of this + * pipeline. If 'inputSortPattern' is set, each document is expected to have sort key metadata + * which will be serialized in the '$sortKey' field. 'inputSortPattern' will then be used to + * describe which fields are ascending and which fields are descending when merging the streams + * together. + */ + struct MergingLogic { + MergingLogic(boost::intrusive_ptr<DocumentSource>&& mergingStage, + boost::optional<BSONObj> inputSortPattern = boost::none) + : mergingStage(std::move(mergingStage)), inputSortPattern(inputSortPattern) {} + + boost::intrusive_ptr<DocumentSource> mergingStage; + boost::optional<BSONObj> inputSortPattern; + }; + + /** + * Returns a source to be run on the shards, or NULL if no work should be done on the shards for + * this stage. Must not mutate the existing source object; if different behaviour is required in + * the split-pipeline case, a new source should be created and configured appropriately. It is + * an error for getShardSource() to return a pointer to the same object as getMergeSource(), + * since this can result in the source being stitched into both the shard and merge pipelines + * when the latter is executed on mongoS. + */ + virtual boost::intrusive_ptr<DocumentSource> getShardSource() = 0; + + /** + * Returns a struct representing what needs to be done to merge each shard's pipeline into a + * single stream of results. Must not mutate the existing source object; if different behaviour + * is required, a new source should be created and configured appropriately. It is an error for + * mergingLogic() to return a pointer to the same object as getShardSource(). + */ + virtual MergingLogic mergingLogic() = 0; + + /** + * Returns true if it would be correct to execute this stage in parallel across the shards in + * cases where the final stage is an $out. For example, a $group stage which is just merging the + * groups from the shards can be run in parallel since it will preserve the shard key. + */ + virtual bool canRunInParallelBeforeOut( + const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const { + return false; + } + +protected: + // It is invalid to delete through a NeedsMergerDocumentSource-typed pointer. + virtual ~NeedsMergerDocumentSource() {} +}; + } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.h b/src/mongo/db/pipeline/document_source_bucket_auto.h index cbcce1f46bc..e75d4d16558 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto.h +++ b/src/mongo/db/pipeline/document_source_bucket_auto.h @@ -43,7 +43,7 @@ namespace mongo { * The $bucketAuto stage takes a user-specified number of buckets and automatically determines * boundaries such that the values are approximately equally distributed between those buckets. */ -class DocumentSourceBucketAuto final : public DocumentSource { +class DocumentSourceBucketAuto final : public DocumentSource, public NeedsMergerDocumentSource { public: Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; DepsTracker::State getDependencies(DepsTracker* deps) const final; @@ -62,9 +62,11 @@ public: /** * The $bucketAuto stage must be run on the merging shard. */ - boost::optional<MergingLogic> mergingLogic() final { - // {shardsStage, mergingStage, sortPattern} - return MergingLogic{nullptr, this, boost::none}; + boost::intrusive_ptr<DocumentSource> getShardSource() final { + return nullptr; + } + MergingLogic mergingLogic() final { + return {this}; } static const uint64_t kDefaultMaxMemoryUsageBytes = 100 * 1024 * 1024; diff --git a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h index b4e91e11c39..4c3c8ee7960 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h +++ b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h @@ -42,7 +42,7 @@ namespace mongo { * "invalidate" entries. * It is not intended to be created by the user. */ -class DocumentSourceCloseCursor final : public DocumentSource { +class DocumentSourceCloseCursor final : public DocumentSource, public NeedsMergerDocumentSource { public: GetNextResult getNext() final; @@ -76,11 +76,14 @@ public: return new DocumentSourceCloseCursor(expCtx); } - boost::optional<MergingLogic> mergingLogic() final { + boost::intrusive_ptr<DocumentSource> getShardSource() final { + return nullptr; + } + + MergingLogic mergingLogic() final { // This stage must run on mongos to ensure it sees any invalidation in the correct order, // and to ensure that all remote cursors are cleaned up properly. - // {shardsStage, mergingStage, sortPattern} - return MergingLogic{nullptr, this, change_stream_constants::kSortSpec}; + return {this, change_stream_constants::kSortSpec}; } private: diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h index 14c5f51a05f..63f516daad9 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.h +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h @@ -54,11 +54,6 @@ public: Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const; StageConstraints constraints(Pipeline::SplitState pipeState) const final; - - boost::optional<MergingLogic> mergingLogic() final { - return boost::none; - } - DocumentSource::GetNextResult getNext(); const char* getSourceName() const { return DocumentSourceChangeStream::kStageName.rawData(); diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.h b/src/mongo/db/pipeline/document_source_check_invalidate.h index 3a6ea3da51c..863199016db 100644 --- a/src/mongo/db/pipeline/document_source_check_invalidate.h +++ b/src/mongo/db/pipeline/document_source_check_invalidate.h @@ -58,10 +58,6 @@ public: ChangeStreamRequirement::kChangeStreamStage}; } - boost::optional<MergingLogic> mergingLogic() final { - return boost::none; - } - Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final { // This stage is created by the DocumentSourceChangeStream stage, so serializing it here // would result in it being created twice. diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h index 6dd3c949420..7489e156029 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.h +++ b/src/mongo/db/pipeline/document_source_check_resume_token.h @@ -73,10 +73,6 @@ public: ChangeStreamRequirement::kChangeStreamStage}; } - boost::optional<MergingLogic> mergingLogic() final { - return boost::none; - } - Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; static boost::intrusive_ptr<DocumentSourceShardCheckResumability> create( @@ -97,7 +93,8 @@ private: * This stage is used internally for change streams to ensure that the resume token is in the * stream. It is not intended to be created by the user. */ -class DocumentSourceEnsureResumeTokenPresent final : public DocumentSource { +class DocumentSourceEnsureResumeTokenPresent final : public DocumentSource, + public NeedsMergerDocumentSource { public: // Used to record the results of comparing the token data extracted from documents in the // resumed stream against the client's resume token. @@ -122,18 +119,21 @@ public: ChangeStreamRequirement::kChangeStreamStage}; } - boost::optional<MergingLogic> mergingLogic() final { - MergingLogic logic; + /** + * NeedsMergerDocumentSource methods; this has to run on the merger, since the resume point + * could be at any shard. Also add a DocumentSourceShardCheckResumability stage on the shards + * pipeline to ensure that each shard has enough oplog history to resume the change stream. + */ + boost::intrusive_ptr<DocumentSource> getShardSource() final { + return DocumentSourceShardCheckResumability::create(pExpCtx, + _tokenFromClient.getClusterTime()); + }; + + MergingLogic mergingLogic() final { // This stage must run on mongos to ensure it sees the resume token, which could have come // from any shard. We also must include a mergingPresorted $sort stage to communicate to // the AsyncResultsMerger that we need to merge the streams in a particular order. - logic.mergingStage = this; - // Also add logic to the shards to ensure that each shard has enough oplog history to resume - // the change stream. - logic.shardsStage = DocumentSourceShardCheckResumability::create( - pExpCtx, _tokenFromClient.getClusterTime()); - logic.inputSortPattern = change_stream_constants::kSortSpec; - return logic; + return {this, change_stream_constants::kSortSpec}; }; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h index d306a867475..b71898dbca9 100644 --- a/src/mongo/db/pipeline/document_source_coll_stats.h +++ b/src/mongo/db/pipeline/document_source_coll_stats.h @@ -88,10 +88,6 @@ public: return constraints; } - boost::optional<MergingLogic> mergingLogic() final { - return boost::none; - } - Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; static boost::intrusive_ptr<DocumentSource> createFromBson( diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h index fd2ef5e778e..19854342516 100644 --- a/src/mongo/db/pipeline/document_source_current_op.h +++ b/src/mongo/db/pipeline/document_source_current_op.h @@ -125,10 +125,6 @@ public: return constraints; } - boost::optional<MergingLogic> mergingLogic() final { - return boost::none; - } - static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h index 5943070b48f..d15c23c2849 100644 --- a/src/mongo/db/pipeline/document_source_cursor.h +++ b/src/mongo/db/pipeline/document_source_cursor.h @@ -70,10 +70,6 @@ public: return constraints; } - boost::optional<MergingLogic> mergingLogic() final { - return boost::none; - } - void detachFromOperationContext() final; void reattachToOperationContext(OperationContext* opCtx) final; diff --git a/src/mongo/db/pipeline/document_source_exchange.h b/src/mongo/db/pipeline/document_source_exchange.h index 414ef2b2f78..071c04ec32d 100644 --- a/src/mongo/db/pipeline/document_source_exchange.h +++ b/src/mongo/db/pipeline/document_source_exchange.h @@ -214,10 +214,6 @@ public: TransactionRequirement::kAllowed}; } - boost::optional<MergingLogic> mergingLogic() final { - return boost::none; - } - const char* getSourceName() const final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; diff --git a/src/mongo/db/pipeline/document_source_facet.h b/src/mongo/db/pipeline/document_source_facet.h index 570e55b1907..bc8f8dad307 100644 --- a/src/mongo/db/pipeline/document_source_facet.h +++ b/src/mongo/db/pipeline/document_source_facet.h @@ -57,7 +57,7 @@ class NamespaceString; * stage which will produce a document like the following: * {facetA: [<all input documents except the first one>], facetB: [<the first document>]}. */ -class DocumentSourceFacet final : public DocumentSource { +class DocumentSourceFacet final : public DocumentSource, public NeedsMergerDocumentSource { public: struct FacetPipeline { FacetPipeline(std::string name, std::unique_ptr<Pipeline, PipelineDeleter> pipeline) @@ -126,9 +126,11 @@ public: * TODO SERVER-24154: Should be smarter about splitting so that parts of the sub-pipelines can * potentially be run in parallel on multiple shards. */ - boost::optional<MergingLogic> mergingLogic() final { - // {shardsStage, mergingStage, sortPattern} - return MergingLogic{nullptr, this, boost::none}; + boost::intrusive_ptr<DocumentSource> getShardSource() final { + return nullptr; + } + MergingLogic mergingLogic() final { + return {this}; } const std::vector<FacetPipeline>& getFacetPipelines() const { diff --git a/src/mongo/db/pipeline/document_source_geo_near.cpp b/src/mongo/db/pipeline/document_source_geo_near.cpp index e80feb995bd..4a9cee80e3f 100644 --- a/src/mongo/db/pipeline/document_source_geo_near.cpp +++ b/src/mongo/db/pipeline/document_source_geo_near.cpp @@ -237,9 +237,8 @@ DepsTracker::State DocumentSourceGeoNear::getDependencies(DepsTracker* deps) con DocumentSourceGeoNear::DocumentSourceGeoNear(const intrusive_ptr<ExpressionContext>& pExpCtx) : DocumentSource(pExpCtx), coordsIsArray(false), spherical(false) {} -boost::optional<DocumentSource::MergingLogic> DocumentSourceGeoNear::mergingLogic() { - // {shardsStage, mergingStage, sortPattern} - return MergingLogic{this, nullptr, BSON(distanceField->fullPath() << 1)}; +NeedsMergerDocumentSource::MergingLogic DocumentSourceGeoNear::mergingLogic() { + return {nullptr, BSON(distanceField->fullPath() << 1)}; } } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h index d95d59dd402..13f915afd2f 100644 --- a/src/mongo/db/pipeline/document_source_geo_near.h +++ b/src/mongo/db/pipeline/document_source_geo_near.h @@ -35,7 +35,7 @@ namespace mongo { -class DocumentSourceGeoNear : public DocumentSource { +class DocumentSourceGeoNear : public DocumentSource, public NeedsMergerDocumentSource { public: static constexpr StringData kKeyFieldName = "key"_sd; static constexpr auto kStageName = "$geoNear"; @@ -121,9 +121,16 @@ public: BSONObj asNearQuery(StringData nearFieldName) const; /** + * This document source is sent as-is to the shards. + */ + boost::intrusive_ptr<DocumentSource> getShardSource() final { + return this; + } + + /** * In a sharded cluster, this becomes a merge sort by distance, from nearest to furthest. */ - boost::optional<MergingLogic> mergingLogic() final; + MergingLogic mergingLogic() final; private: diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h index 42ef2b5ca38..cc186225ce8 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.h +++ b/src/mongo/db/pipeline/document_source_graph_lookup.h @@ -67,10 +67,6 @@ public: return constraints; } - boost::optional<MergingLogic> mergingLogic() final { - return boost::none; - } - DepsTracker::State getDependencies(DepsTracker* deps) const final { _startWith->addDependencies(deps); return DepsTracker::State::SEE_NEXT; diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp index dee7cd104d4..318a67df1ff 100644 --- a/src/mongo/db/pipeline/document_source_group.cpp +++ b/src/mongo/db/pipeline/document_source_group.cpp @@ -950,7 +950,11 @@ Document DocumentSourceGroup::makeDocument(const Value& id, return out.freeze(); } -boost::optional<DocumentSource::MergingLogic> DocumentSourceGroup::mergingLogic() { +intrusive_ptr<DocumentSource> DocumentSourceGroup::getShardSource() { + return this; // No modifications necessary when on shard +} + +NeedsMergerDocumentSource::MergingLogic DocumentSourceGroup::mergingLogic() { intrusive_ptr<DocumentSourceGroup> mergingGroup(new DocumentSourceGroup(pExpCtx)); mergingGroup->setDoingMerge(true); @@ -969,8 +973,7 @@ boost::optional<DocumentSource::MergingLogic> DocumentSourceGroup::mergingLogic( mergingGroup->addAccumulator(copiedAccumuledField); } - // {shardsStage, mergingStage, sortPattern} - return MergingLogic{this, mergingGroup, boost::none}; + return {mergingGroup}; } bool DocumentSourceGroup::pathIncludedInGroupKeys(const std::string& dottedPath) const { diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h index 6348a83ab46..a1d6fdad059 100644 --- a/src/mongo/db/pipeline/document_source_group.h +++ b/src/mongo/db/pipeline/document_source_group.h @@ -87,7 +87,7 @@ private: std::string _groupId; }; -class DocumentSourceGroup final : public DocumentSource { +class DocumentSourceGroup final : public DocumentSource, public NeedsMergerDocumentSource { public: using Accumulators = std::vector<boost::intrusive_ptr<Accumulator>>; using GroupsMap = ValueUnorderedMap<Accumulators>; @@ -162,7 +162,9 @@ public: */ bool usedDisk() final; - boost::optional<MergingLogic> mergingLogic() final; + // Virtuals for NeedsMergerDocumentSource. + boost::intrusive_ptr<DocumentSource> getShardSource() final; + MergingLogic mergingLogic() final; bool canRunInParallelBeforeOut( const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const final; diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp index d07f41889ff..bde6dfdeb32 100644 --- a/src/mongo/db/pipeline/document_source_group_test.cpp +++ b/src/mongo/db/pipeline/document_source_group_test.cpp @@ -582,12 +582,13 @@ protected: intrusive_ptr<DocumentSource> createMerger() { // Set up a group merger to simulate merging results in the router. In this // case only one shard is in use. - auto mergeLogic = group()->mergingLogic(); - ASSERT(mergeLogic); - ASSERT(mergeLogic->mergingStage); - ASSERT_NOT_EQUALS(group(), mergeLogic->mergingStage); - ASSERT_FALSE(static_cast<bool>(mergeLogic->inputSortPattern)); - return mergeLogic->mergingStage; + NeedsMergerDocumentSource* splittable = dynamic_cast<NeedsMergerDocumentSource*>(group()); + ASSERT(splittable); + auto mergeLogic = splittable->mergingLogic(); + ASSERT(mergeLogic.mergingStage); + ASSERT_NOT_EQUALS(group(), mergeLogic.mergingStage); + ASSERT_FALSE(static_cast<bool>(mergeLogic.inputSortPattern)); + return mergeLogic.mergingStage; } void checkResultSet(const intrusive_ptr<DocumentSource>& sink) { // Load the results from the DocumentSourceGroup and sort them by _id. diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h index 0713e85f368..51d92262b0f 100644 --- a/src/mongo/db/pipeline/document_source_index_stats.h +++ b/src/mongo/db/pipeline/document_source_index_stats.h @@ -83,10 +83,6 @@ public: return constraints; } - boost::optional<MergingLogic> mergingLogic() final { - return boost::none; - } - static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); diff --git a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h index b9e22e6cbf4..401087d12e6 100644 --- a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h +++ b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h @@ -63,10 +63,6 @@ public: TransactionRequirement::kAllowed}; } - boost::optional<MergingLogic> mergingLogic() final { - return boost::none; - } - GetNextResult getNext() final; private: diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h index 3abcc737db4..8f0a8780dbe 100644 --- a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h +++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h @@ -43,7 +43,8 @@ namespace mongo { * pipeline will be executed on mongoS if all other stages are eligible, and will be sent to a * random participating shard otherwise. */ -class DocumentSourceInternalSplitPipeline final : public DocumentSource { +class DocumentSourceInternalSplitPipeline final : public DocumentSource, + public NeedsMergerDocumentSource { public: static constexpr StringData kStageName = "$_internalSplitPipeline"_sd; @@ -59,9 +60,12 @@ public: return kStageName.rawData(); } - boost::optional<MergingLogic> mergingLogic() final { - // {shardsStage, mergingStage, sortPattern} - return MergingLogic{nullptr, this, boost::none}; + boost::intrusive_ptr<DocumentSource> getShardSource() final { + return nullptr; + } + + MergingLogic mergingLogic() final { + return {this}; } StageConstraints constraints(Pipeline::SplitState pipeState) const final { diff --git a/src/mongo/db/pipeline/document_source_limit.h b/src/mongo/db/pipeline/document_source_limit.h index 78d325c9375..885c0f5c875 100644 --- a/src/mongo/db/pipeline/document_source_limit.h +++ b/src/mongo/db/pipeline/document_source_limit.h @@ -34,7 +34,7 @@ namespace mongo { -class DocumentSourceLimit final : public DocumentSource { +class DocumentSourceLimit final : public DocumentSource, public NeedsMergerDocumentSource { public: static constexpr StringData kStageName = "$limit"_sd; @@ -81,14 +81,21 @@ public: } /** - * Returns a MergingLogic with two identical $limit stages; one for the shards pipeline and one - * for the merging pipeline. + * Returns the current DocumentSourceLimit for use in the shards pipeline. Running this stage on + * the shards is an optimization, but is not strictly necessary in order to produce correct + * pipeline output. */ - boost::optional<MergingLogic> mergingLogic() final { - // Running this stage on the shards is an optimization, but is not strictly necessary in - // order to produce correct pipeline output. - // {shardsStage, mergingStage, sortPattern} - return MergingLogic{this, DocumentSourceLimit::create(pExpCtx, _limit), boost::none}; + boost::intrusive_ptr<DocumentSource> getShardSource() final { + return this; + } + + /** + * Returns a new DocumentSourceLimit with the same limit as the current stage, for use in the + * merge pipeline. Unlike the shards source, it is necessary for this stage to run on the + * merging host in order to produce correct pipeline output. + */ + MergingLogic mergingLogic() final { + return {DocumentSourceLimit::create(pExpCtx, _limit)}; } long long getLimit() const { diff --git a/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h b/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h index a6312e8a2b9..47fbb2759cd 100644 --- a/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h +++ b/src/mongo/db/pipeline/document_source_list_cached_and_active_users.h @@ -103,10 +103,6 @@ public: return constraints; } - boost::optional<MergingLogic> mergingLogic() final { - return boost::none; - } - static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); diff --git a/src/mongo/db/pipeline/document_source_list_local_sessions.h b/src/mongo/db/pipeline/document_source_list_local_sessions.h index 4c0aa30f4fc..9e0fa9e7363 100644 --- a/src/mongo/db/pipeline/document_source_list_local_sessions.h +++ b/src/mongo/db/pipeline/document_source_list_local_sessions.h @@ -117,10 +117,6 @@ public: return constraints; } - boost::optional<MergingLogic> mergingLogic() final { - return boost::none; - } - static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index 8a0c068f766..2a2b3338789 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -47,7 +47,7 @@ namespace mongo { * Queries separate collection for equality matches with documents in the pipeline collection. * Adds matching documents to a new array field in the input document. */ -class DocumentSourceLookUp final : public DocumentSource { +class DocumentSourceLookUp final : public DocumentSource, public NeedsMergerDocumentSource { public: static constexpr size_t kMaxSubPipelineDepth = 20; @@ -110,9 +110,12 @@ public: return DocumentSource::truncateSortSet(pSource->getOutputSorts(), {_as.fullPath()}); } - boost::optional<MergingLogic> mergingLogic() final { - // {shardsStage, mergingStage, sortPattern} - return MergingLogic{nullptr, this, boost::none}; + boost::intrusive_ptr<DocumentSource> getShardSource() final { + return nullptr; + } + + MergingLogic mergingLogic() final { + return {this}; } void addInvolvedCollections(std::vector<NamespaceString>* collections) const final { diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h index cd2f8175e00..9e63d3f5dcd 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h @@ -77,10 +77,6 @@ public: return constraints; } - boost::optional<MergingLogic> mergingLogic() final { - return boost::none; - } - DepsTracker::State getDependencies(DepsTracker* deps) const { // The namespace is not technically needed yet, but we will if there is more than one // collection involved. diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h index d770006e2d4..fd08a52250c 100644 --- a/src/mongo/db/pipeline/document_source_match.h +++ b/src/mongo/db/pipeline/document_source_match.h @@ -162,10 +162,6 @@ public: const std::string& path, const boost::intrusive_ptr<ExpressionContext>& expCtx); - boost::optional<MergingLogic> mergingLogic() final { - return boost::none; - } - protected: DocumentSourceMatch(const BSONObj& query, const boost::intrusive_ptr<ExpressionContext>& expCtx); diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h index b5d6dffde47..617e0cf462c 100644 --- a/src/mongo/db/pipeline/document_source_mock.h +++ b/src/mongo/db/pipeline/document_source_mock.h @@ -98,10 +98,6 @@ public: return {GetModPathsReturn::Type::kFiniteSet, std::set<std::string>{}, {}}; } - boost::optional<MergingLogic> mergingLogic() override { - return boost::none; - } - // Return documents from front of queue. std::deque<GetNextResult> queue; diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h index cfd9249146b..906a30f10ee 100644 --- a/src/mongo/db/pipeline/document_source_out.h +++ b/src/mongo/db/pipeline/document_source_out.h @@ -65,7 +65,7 @@ public: /** * Abstract class for the $out aggregation stage. */ -class DocumentSourceOut : public DocumentSource { +class DocumentSourceOut : public DocumentSource, public NeedsMergerDocumentSource { public: /** * A "lite parsed" $out stage is similar to other stages involving foreign collections except in @@ -125,12 +125,8 @@ public: PositionRequirement::kLast, // A $out to an unsharded collection should merge on the primary shard to perform // local writes. A $out to a sharded collection has no requirement, since each shard - // can perform its own portion of the write. We use 'kAnyShard' to direct it to - // execute on one of the shards in case some of the writes happen to end up being - // local. - pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _outputNs) - ? HostTypeRequirement::kAnyShard - : HostTypeRequirement::kPrimaryShard, + // can perform its own portion of the write. + HostTypeRequirement::kPrimaryShard, DiskUseRequirement::kWritesPersistentData, FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed}; @@ -144,17 +140,12 @@ public: return _mode; } - boost::optional<MergingLogic> mergingLogic() final { - // It should always be faster to avoid splitting the pipeline if the output collection is - // sharded. If we avoid splitting the pipeline then each shard can perform the writes to the - // target collection in parallel. - if (pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _outputNs)) { - return boost::none; - } - // {shardsStage, mergingStage, sortPattern} - return MergingLogic{nullptr, this, boost::none}; + boost::intrusive_ptr<DocumentSource> getShardSource() final { + return nullptr; + } + MergingLogic mergingLogic() final { + return {this}; } - virtual bool canRunInParallelBeforeOut( const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const final { // If someone is asking the question, this must be the $out stage in question, so yes! diff --git a/src/mongo/db/pipeline/document_source_plan_cache_stats.h b/src/mongo/db/pipeline/document_source_plan_cache_stats.h index 3c1beb37e76..a17225b1a60 100644 --- a/src/mongo/db/pipeline/document_source_plan_cache_stats.h +++ b/src/mongo/db/pipeline/document_source_plan_cache_stats.h @@ -105,10 +105,6 @@ public: return constraints; } - boost::optional<MergingLogic> mergingLogic() final { - return boost::none; - } - const char* getSourceName() const override { return kStageName; } diff --git a/src/mongo/db/pipeline/document_source_redact.h b/src/mongo/db/pipeline/document_source_redact.h index 70cc16e1492..0c52b453e7f 100644 --- a/src/mongo/db/pipeline/document_source_redact.h +++ b/src/mongo/db/pipeline/document_source_redact.h @@ -52,10 +52,6 @@ public: ChangeStreamRequirement::kWhitelist}; } - boost::optional<MergingLogic> mergingLogic() final { - return boost::none; - } - /** * Attempts to duplicate the redact-safe portion of a subsequent $match before the $redact * stage. diff --git a/src/mongo/db/pipeline/document_source_sample.cpp b/src/mongo/db/pipeline/document_source_sample.cpp index 2728f5607f7..90b2e82da1f 100644 --- a/src/mongo/db/pipeline/document_source_sample.cpp +++ b/src/mongo/db/pipeline/document_source_sample.cpp @@ -117,22 +117,17 @@ intrusive_ptr<DocumentSource> DocumentSourceSample::createFromBson( return sample; } +intrusive_ptr<DocumentSource> DocumentSourceSample::getShardSource() { + return this; +} -boost::optional<DocumentSource::MergingLogic> DocumentSourceSample::mergingLogic() { +NeedsMergerDocumentSource::MergingLogic DocumentSourceSample::mergingLogic() { // On the merger we need to merge the pre-sorted documents by their random values, then limit to - // the number we need. - MergingLogic logic; - logic.shardsStage = this; - if (_size > 0) { - logic.mergingStage = DocumentSourceLimit::create(pExpCtx, _size); - } - - // Here we don't use 'randSortSpec' because it uses a metadata sort which the merging logic does - // not understand. The merging logic will use the serialized sort key, and this sort pattern is - // just used to communicate ascending/descending information. A pattern like {$meta: "randVal"} - // is neither ascending nor descending, and so will not be useful when constructing the merging - // logic. - logic.inputSortPattern = BSON("$rand" << -1); - return logic; + // the number we need. Here we don't use 'randSortSpec' because it uses a metadata sort which + // the merging logic does not understand. The merging logic will use the serialized sort key, + // and this sort pattern is just used to communicate ascending/descending information. A pattern + // like {$meta: "randVal"} is neither ascending nor descending, and so will not be useful when + // constructing the merging logic. + return {_size > 0 ? DocumentSourceLimit::create(pExpCtx, _size) : nullptr, BSON("$rand" << -1)}; } } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_sample.h b/src/mongo/db/pipeline/document_source_sample.h index eec90e7c24f..11aa6fea8d0 100644 --- a/src/mongo/db/pipeline/document_source_sample.h +++ b/src/mongo/db/pipeline/document_source_sample.h @@ -35,7 +35,7 @@ namespace mongo { -class DocumentSourceSample final : public DocumentSource { +class DocumentSourceSample final : public DocumentSource, public NeedsMergerDocumentSource { public: static constexpr StringData kStageName = "$sample"_sd; @@ -58,7 +58,8 @@ public: return DepsTracker::State::SEE_NEXT; } - boost::optional<MergingLogic> mergingLogic() final; + boost::intrusive_ptr<DocumentSource> getShardSource() final; + MergingLogic mergingLogic() final; long long getSampleSize() const { return _size; diff --git a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h index a095ade9fc0..60289140440 100644 --- a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h +++ b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h @@ -55,10 +55,6 @@ public: TransactionRequirement::kAllowed}; } - boost::optional<MergingLogic> mergingLogic() final { - return boost::none; - } - static boost::intrusive_ptr<DocumentSourceSampleFromRandomCursor> create( const boost::intrusive_ptr<ExpressionContext>& expCtx, long long size, diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache.h b/src/mongo/db/pipeline/document_source_sequential_document_cache.h index c6b4f21ab45..48e14142829 100644 --- a/src/mongo/db/pipeline/document_source_sequential_document_cache.h +++ b/src/mongo/db/pipeline/document_source_sequential_document_cache.h @@ -64,10 +64,6 @@ public: return constraints; } - boost::optional<MergingLogic> mergingLogic() final { - return boost::none; - } - GetNextResult getNext() final; static boost::intrusive_ptr<DocumentSourceSequentialDocumentCache> create( diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h index 7f5d2196ab0..1b9407b9d6b 100644 --- a/src/mongo/db/pipeline/document_source_single_document_transformation.h +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h @@ -72,10 +72,6 @@ public: return constraints; } - boost::optional<MergingLogic> mergingLogic() final { - return boost::none; - } - TransformerInterface::TransformerType getType() const { return _parsedTransform->getType(); } diff --git a/src/mongo/db/pipeline/document_source_skip.h b/src/mongo/db/pipeline/document_source_skip.h index 7a384d083ed..883cde202bd 100644 --- a/src/mongo/db/pipeline/document_source_skip.h +++ b/src/mongo/db/pipeline/document_source_skip.h @@ -34,7 +34,7 @@ namespace mongo { -class DocumentSourceSkip final : public DocumentSource { +class DocumentSourceSkip final : public DocumentSource, public NeedsMergerDocumentSource { public: static constexpr StringData kStageName = "$skip"_sd; @@ -84,12 +84,11 @@ public: return DepsTracker::State::SEE_NEXT; // This doesn't affect needed fields } - /** - * The $skip stage must run on the merging half of the pipeline. - */ - boost::optional<MergingLogic> mergingLogic() final { - // {shardsStage, mergingStage, sortPattern} - return MergingLogic{nullptr, this, boost::none}; + boost::intrusive_ptr<DocumentSource> getShardSource() final { + return nullptr; + } + MergingLogic mergingLogic() final { + return {this}; } long long getSkip() const { diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp index aa48535f810..475897471ed 100644 --- a/src/mongo/db/pipeline/document_source_sort.cpp +++ b/src/mongo/db/pipeline/document_source_sort.cpp @@ -524,14 +524,13 @@ int DocumentSourceSort::compare(const Value& lhs, const Value& rhs) const { return 0; } -boost::optional<DocumentSource::MergingLogic> DocumentSourceSort::mergingLogic() { - MergingLogic split; - split.shardsStage = this; - split.inputSortPattern = sortKeyPattern(SortKeySerialization::kForSortKeyMerging).toBson(); - if (_limitSrc) { - split.mergingStage = DocumentSourceLimit::create(pExpCtx, _limitSrc->getLimit()); - } - return split; +intrusive_ptr<DocumentSource> DocumentSourceSort::getShardSource() { + return this; +} + +NeedsMergerDocumentSource::MergingLogic DocumentSourceSort::mergingLogic() { + return {_limitSrc ? DocumentSourceLimit::create(pExpCtx, _limitSrc->getLimit()) : nullptr, + sortKeyPattern(SortKeySerialization::kForSortKeyMerging).toBson()}; } bool DocumentSourceSort::canRunInParallelBeforeOut( diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h index 80ef1ed525a..4ee2584bc45 100644 --- a/src/mongo/db/pipeline/document_source_sort.h +++ b/src/mongo/db/pipeline/document_source_sort.h @@ -39,7 +39,7 @@ namespace mongo { -class DocumentSourceSort final : public DocumentSource { +class DocumentSourceSort final : public DocumentSource, public NeedsMergerDocumentSource { public: static constexpr StringData kStageName = "$sort"_sd; enum class SortKeySerialization { @@ -83,7 +83,8 @@ public: DepsTracker::State getDependencies(DepsTracker* deps) const final; - boost::optional<MergingLogic> mergingLogic() final; + boost::intrusive_ptr<DocumentSource> getShardSource() final; + MergingLogic mergingLogic() final; bool canRunInParallelBeforeOut( const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) const final; diff --git a/src/mongo/db/pipeline/document_source_sort_test.cpp b/src/mongo/db/pipeline/document_source_sort_test.cpp index 4d0fb324e60..91e54dc7aaf 100644 --- a/src/mongo/db/pipeline/document_source_sort_test.cpp +++ b/src/mongo/db/pipeline/document_source_sort_test.cpp @@ -132,9 +132,8 @@ TEST_F(DocumentSourceSortTest, SortWithLimit) { sort()->serializeToArray(arr); ASSERT_BSONOBJ_EQ(arr[0].getDocument().toBson(), BSON("$sort" << BSON("a" << 1))); - ASSERT(sort()->mergingLogic()); - ASSERT(sort()->mergingLogic()->shardsStage != nullptr); - ASSERT(sort()->mergingLogic()->mergingStage == nullptr); + ASSERT(sort()->getShardSource() != nullptr); + ASSERT(sort()->mergingLogic().mergingStage == nullptr); } container.push_back(DocumentSourceLimit::create(expCtx, 10)); @@ -160,10 +159,9 @@ TEST_F(DocumentSourceSortTest, SortWithLimit) { Value(arr), DOC_ARRAY(DOC("$sort" << DOC("a" << 1)) << DOC("$limit" << sort()->getLimit()))); - ASSERT(sort()->mergingLogic()); - ASSERT(sort()->mergingLogic()->shardsStage != nullptr); - ASSERT(sort()->mergingLogic()->mergingStage != nullptr); - ASSERT(dynamic_cast<DocumentSourceLimit*>(sort()->mergingLogic()->mergingStage.get())); + ASSERT(sort()->getShardSource() != nullptr); + ASSERT(sort()->mergingLogic().mergingStage != nullptr); + ASSERT(dynamic_cast<DocumentSourceLimit*>(sort()->mergingLogic().mergingStage.get())); } TEST_F(DocumentSourceSortTest, Dependencies) { diff --git a/src/mongo/db/pipeline/document_source_tee_consumer.h b/src/mongo/db/pipeline/document_source_tee_consumer.h index 04a7f96fa31..a642533fdf7 100644 --- a/src/mongo/db/pipeline/document_source_tee_consumer.h +++ b/src/mongo/db/pipeline/document_source_tee_consumer.h @@ -64,10 +64,6 @@ public: TransactionRequirement::kAllowed}; } - boost::optional<MergingLogic> mergingLogic() final { - return boost::none; - } - GetNextResult getNext() final; /** diff --git a/src/mongo/db/pipeline/document_source_test_optimizations.h b/src/mongo/db/pipeline/document_source_test_optimizations.h index 8739dea49d2..b58b35566e4 100644 --- a/src/mongo/db/pipeline/document_source_test_optimizations.h +++ b/src/mongo/db/pipeline/document_source_test_optimizations.h @@ -56,10 +56,6 @@ public: TransactionRequirement::kNotAllowed}; } - virtual boost::optional<MergingLogic> mergingLogic() override { - return boost::none; - } - virtual GetModPathsReturn getModifiedPaths() const override { MONGO_UNREACHABLE; } diff --git a/src/mongo/db/pipeline/document_source_unwind.h b/src/mongo/db/pipeline/document_source_unwind.h index 8317bf229dd..787249aec6e 100644 --- a/src/mongo/db/pipeline/document_source_unwind.h +++ b/src/mongo/db/pipeline/document_source_unwind.h @@ -60,10 +60,6 @@ public: return constraints; } - boost::optional<MergingLogic> mergingLogic() final { - return boost::none; - } - DepsTracker::State getDependencies(DepsTracker* deps) const final; /** diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h index 21a5d73c94d..b01585619b6 100644 --- a/src/mongo/db/pipeline/mongo_process_interface.h +++ b/src/mongo/db/pipeline/mongo_process_interface.h @@ -161,9 +161,7 @@ public: BSONObjBuilder* builder) const = 0; /** - * Gets the collection options for the collection given by 'nss'. Throws - * ErrorCodes::CommandNotSupportedOnView if 'nss' describes a view. Future callers may want to - * parameterize this behavior. + * Gets the collection options for the collection given by 'nss'. */ virtual BSONObj getCollectionOptions(const NamespaceString& nss) = 0; diff --git a/src/mongo/db/pipeline/mongos_process_interface.cpp b/src/mongo/db/pipeline/mongos_process_interface.cpp index 68aa5ae9d3d..effb02d4e6f 100644 --- a/src/mongo/db/pipeline/mongos_process_interface.cpp +++ b/src/mongo/db/pipeline/mongos_process_interface.cpp @@ -28,7 +28,7 @@ * it in the license file. */ -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand #include "mongo/platform/basic.h" @@ -298,6 +298,11 @@ BSONObj MongoSInterface::createCommandForTargetedShards( // notifies the shards that the mongoS is capable of merging streams based on resume token. // TODO SERVER-38539: the 'mergeByPBRT' flag is no longer necessary in 4.4. targetedCmd[AggregationRequest::kMergeByPBRTName] = Value(litePipe.hasChangeStream()); + + // For split pipelines which need merging, do *not* propagate the writeConcern to the shards + // part. Otherwise this is part of an exchange and in that case we should include the + // writeConcern. + targetedCmd[WriteConcernOptions::kWriteConcernField] = Value(); } targetedCmd[AggregationRequest::kCursorName] = @@ -415,10 +420,6 @@ MongoSInterface::DispatchShardPipelineResults MongoSInterface::dispatchShardPipe boost::optional<cluster_aggregation_planner::SplitPipeline> splitPipeline; if (needsSplit) { - LOG(5) << "Splitting pipeline: " - << "targeting = " << shardIds.size() - << " shards, needsMongosMerge = " << needsMongosMerge - << ", needsPrimaryShardMerge = " << needsPrimaryShardMerge; splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(pipeline)); exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange( diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 1fddc890a17..725c655be44 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -390,7 +390,8 @@ bool Pipeline::requiredToRunOnMongos() const { for (auto&& stage : _sources) { // If this pipeline is capable of splitting before the mongoS-only stage, then the pipeline // as a whole is not required to run on mongoS. - if (_splitState == SplitState::kUnsplit && stage->mergingLogic()) { + if (_splitState == SplitState::kUnsplit && + dynamic_cast<NeedsMergerDocumentSource*>(stage.get())) { return false; } diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp index 38c521d71e4..aeef3174c5c 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.cpp +++ b/src/mongo/db/pipeline/process_interface_standalone.cpp @@ -142,8 +142,7 @@ DBClientBase* MongoInterfaceStandalone::directClient() { } bool MongoInterfaceStandalone::isSharded(OperationContext* opCtx, const NamespaceString& nss) { - Lock::DBLock dbLock(opCtx, nss.db(), MODE_IS); - Lock::CollectionLock collLock(opCtx->lockState(), nss.ns(), MODE_IS); + AutoGetCollectionForRead autoColl(opCtx, nss); const auto metadata = CollectionShardingState::get(opCtx, nss)->getCurrentMetadata(); return metadata->isSharded(); } @@ -277,14 +276,7 @@ Status MongoInterfaceStandalone::appendRecordCount(OperationContext* opCtx, BSONObj MongoInterfaceStandalone::getCollectionOptions(const NamespaceString& nss) { const auto infos = _client.getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll())); - if (infos.empty()) { - return BSONObj(); - } - auto res = infos.front(); - uassert(ErrorCodes::CommandNotSupportedOnView, - str::stream() << nss.toString() << " is a view, not a collection", - res["type"].String() != "view"); - return res.getObjectField("options").getOwned(); + return infos.empty() ? BSONObj() : infos.front().getObjectField("options").getOwned(); } void MongoInterfaceStandalone::renameIfOptionsAndIndexesHaveNotChanged( diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index c0329d181b1..2fd4d769bfb 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -66,6 +66,8 @@ namespace { * * It is not safe to call this optimization multiple times. * + * NOTE: looks for NeedsMergerDocumentSources and uses that API + * * Returns the sort specification if the input streams are sorted, and false otherwise. */ boost::optional<BSONObj> findSplitPoint(Pipeline::SourceContainer* shardPipe, Pipeline* mergePipe) { @@ -73,23 +75,28 @@ boost::optional<BSONObj> findSplitPoint(Pipeline::SourceContainer* shardPipe, Pi boost::intrusive_ptr<DocumentSource> current = mergePipe->popFront(); // Check if this source is splittable. - auto mergeLogic = current->mergingLogic(); - if (!mergeLogic) { + NeedsMergerDocumentSource* splittable = + dynamic_cast<NeedsMergerDocumentSource*>(current.get()); + + if (!splittable) { // Move the source from the merger _sources to the shard _sources. shardPipe->push_back(current); - continue; - } + } else { + // Split this source into 'merge' and 'shard' _sources. + boost::intrusive_ptr<DocumentSource> shardSource = splittable->getShardSource(); + auto mergeLogic = splittable->mergingLogic(); - // A source may not simultaneously be present on both sides of the split. - invariant(mergeLogic->shardsStage != mergeLogic->mergingStage); + // A source may not simultaneously be present on both sides of the split. + invariant(shardSource != mergeLogic.mergingStage); - if (mergeLogic->shardsStage) - shardPipe->push_back(std::move(mergeLogic->shardsStage)); + if (shardSource) + shardPipe->push_back(std::move(shardSource)); - if (mergeLogic->mergingStage) - mergePipe->addInitialSource(std::move(mergeLogic->mergingStage)); + if (mergeLogic.mergingStage) + mergePipe->addInitialSource(std::move(mergeLogic.mergingStage)); - return mergeLogic->inputSortPattern; + return mergeLogic.inputSortPattern; + } } return boost::none; } @@ -288,8 +295,8 @@ ClusterClientCursorGuard convertPipelineToRouterStages( bool stageCanRunInParallel(const boost::intrusive_ptr<DocumentSource>& stage, const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) { - if (stage->mergingLogic()) { - return stage->canRunInParallelBeforeOut(nameOfShardKeyFieldsUponEntryToStage); + if (auto needsMerger = dynamic_cast<NeedsMergerDocumentSource*>(stage.get())) { + return needsMerger->canRunInParallelBeforeOut(nameOfShardKeyFieldsUponEntryToStage); } else { // This stage is fine to execute in parallel on each stream. For example, a $match can be // applied to each stream in parallel. diff --git a/src/mongo/s/query/document_source_merge_cursors.h b/src/mongo/s/query/document_source_merge_cursors.h index 259b7ea088b..f733d2c3088 100644 --- a/src/mongo/s/query/document_source_merge_cursors.h +++ b/src/mongo/s/query/document_source_merge_cursors.h @@ -95,10 +95,6 @@ public: return constraints; } - boost::optional<MergingLogic> mergingLogic() final { - return boost::none; - } - GetNextResult getNext() final; std::size_t getNumRemotes() const; diff --git a/src/mongo/s/query/document_source_update_on_add_shard.h b/src/mongo/s/query/document_source_update_on_add_shard.h index 37a32a774a3..7f6833b5494 100644 --- a/src/mongo/s/query/document_source_update_on_add_shard.h +++ b/src/mongo/s/query/document_source_update_on_add_shard.h @@ -73,10 +73,6 @@ public: ChangeStreamRequirement::kChangeStreamStage}; } - boost::optional<MergingLogic> mergingLogic() final { - return boost::none; - } - GetNextResult getNext() final; private: |