diff options
author | Charlie Swanson <cswanson310@gmail.com> | 2016-08-29 18:02:04 -0400 |
---|---|---|
committer | Charlie Swanson <cswanson310@gmail.com> | 2016-09-01 14:08:25 -0400 |
commit | d289e240b653e70a7d90be885a3ad6de21b7c6cb (patch) | |
tree | 5a19c87341c92bc0307733229d114f5d5fe899e6 | |
parent | a36409076c3de29c15e497ca3065e775e7369fa3 (diff) | |
download | mongo-d289e240b653e70a7d90be885a3ad6de21b7c6cb.tar.gz |
SERVER-25864 Propagate calls to needsPrimaryShardMerger through $facet.
-rw-r--r-- | src/mongo/db/pipeline/document_source_facet.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_facet.h | 7 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_facet_test.cpp | 44 |
3 files changed, 60 insertions, 3 deletions
diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp index 0c3e2981d79..312ed25d1db 100644 --- a/src/mongo/db/pipeline/document_source_facet.cpp +++ b/src/mongo/db/pipeline/document_source_facet.cpp @@ -150,6 +150,18 @@ void DocumentSourceFacet::doReattachToOperationContext(OperationContext* opCtx) } } +bool DocumentSourceFacet::needsPrimaryShard() const { + // Currently we don't split $facet to have a merger part and a shards part (see SERVER-24154). + // This means that if any stage in any of the $facet pipelines requires the primary shard, then + // the entire $facet must happen on the merger, and the merger must be the primary shard. + for (auto&& facet : _facetPipelines) { + if (facet.second->needsPrimaryShardMerger()) { + return true; + } + } + return false; +} + DocumentSource::GetDepsReturn DocumentSourceFacet::getDependencies(DepsTracker* deps) const { for (auto&& facet : _facetPipelines) { auto subDepsTracker = facet.second->getDependencies(deps->getMetadataAvailable()); diff --git a/src/mongo/db/pipeline/document_source_facet.h b/src/mongo/db/pipeline/document_source_facet.h index 267ebe58bb2..81f7b23e9cf 100644 --- a/src/mongo/db/pipeline/document_source_facet.h +++ b/src/mongo/db/pipeline/document_source_facet.h @@ -53,9 +53,6 @@ class Pipeline; * For example, {$facet: {facetA: [{$skip: 1}], facetB: [{$limit: 1}]}} would describe a $facet * stage which will produce a document like the following: * {facetA: [<all input documents except the first one>], facetB: [<the first document>]}. - * - * TODO SERVER-24154: Should inherit from SplittableDocumentSource so that it can split in a sharded - * cluster. */ class DocumentSourceFacet final : public DocumentSourceNeedsMongod, public SplittableDocumentSource { @@ -98,6 +95,9 @@ public: /** * The $facet stage must be run on the merging shard. + * + * TODO SERVER-24154: Should be smarter about splitting so that parts of the sub-pipelines can + * potentially be run in parallel on multiple shards. */ boost::intrusive_ptr<DocumentSource> getShardSource() final { return nullptr; @@ -111,6 +111,7 @@ public: void doInjectMongodInterface(std::shared_ptr<MongodInterface> mongod) final; void doDetachFromOperationContext() final; void doReattachToOperationContext(OperationContext* opCtx) final; + bool needsPrimaryShard() const final; private: DocumentSourceFacet(StringMap<boost::intrusive_ptr<Pipeline>> facetPipelines, diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp index 519b8d0a222..c55a4024c5a 100644 --- a/src/mongo/db/pipeline/document_source_facet_test.cpp +++ b/src/mongo/db/pipeline/document_source_facet_test.cpp @@ -490,5 +490,49 @@ TEST_F(DocumentSourceFacetTest, ShouldThrowIfAnyPipelineRequiresTextScoreButItIs ASSERT_THROWS(facetStage->getDependencies(&deps), UserException); } +/** + * A dummy DocumentSource which needs to run on the primary shard. + */ +class DocumentSourceNeedsPrimaryShard final : public DocumentSourcePassthrough { +public: + bool needsPrimaryShard() const final { + return true; + } + + static boost::intrusive_ptr<DocumentSourceNeedsPrimaryShard> create() { + return new DocumentSourceNeedsPrimaryShard(); + } +}; + +TEST_F(DocumentSourceFacetTest, ShouldRequirePrimaryShardIfAnyStageRequiresPrimaryShard) { + auto ctx = getExpCtx(); + + auto passthrough = DocumentSourcePassthrough::create(); + auto firstPipeline = unittest::assertGet(Pipeline::create({passthrough}, ctx)); + + auto needsPrimaryShard = DocumentSourceNeedsPrimaryShard::create(); + auto secondPipeline = unittest::assertGet(Pipeline::create({needsPrimaryShard}, ctx)); + + auto facetStage = DocumentSourceFacet::create( + {{"passthrough", firstPipeline}, {"needsPrimaryShard", secondPipeline}}, ctx); + + ASSERT_TRUE(facetStage->needsPrimaryShard()); +} + +TEST_F(DocumentSourceFacetTest, ShouldNotRequirePrimaryShardIfNoStagesRequiresPrimaryShard) { + auto ctx = getExpCtx(); + + auto firstPassthrough = DocumentSourcePassthrough::create(); + auto firstPipeline = unittest::assertGet(Pipeline::create({firstPassthrough}, ctx)); + + auto secondPassthrough = DocumentSourcePassthrough::create(); + auto secondPipeline = unittest::assertGet(Pipeline::create({secondPassthrough}, ctx)); + + auto facetStage = + DocumentSourceFacet::create({{"first", firstPipeline}, {"second", secondPipeline}}, ctx); + + ASSERT_FALSE(facetStage->needsPrimaryShard()); +} + } // namespace } // namespace mongo |