summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
authorCharlie Swanson <cswanson310@gmail.com>2016-08-29 18:02:04 -0400
committerCharlie Swanson <cswanson310@gmail.com>2016-09-01 14:08:25 -0400
commitd289e240b653e70a7d90be885a3ad6de21b7c6cb (patch)
tree5a19c87341c92bc0307733229d114f5d5fe899e6 /src/mongo/db/pipeline
parenta36409076c3de29c15e497ca3065e775e7369fa3 (diff)
downloadmongo-d289e240b653e70a7d90be885a3ad6de21b7c6cb.tar.gz
SERVER-25864 Propagate calls to needsPrimaryShardMerger through $facet.
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--src/mongo/db/pipeline/document_source_facet.cpp12
-rw-r--r--src/mongo/db/pipeline/document_source_facet.h7
-rw-r--r--src/mongo/db/pipeline/document_source_facet_test.cpp44
3 files changed, 60 insertions, 3 deletions
diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp
index 0c3e2981d79..312ed25d1db 100644
--- a/src/mongo/db/pipeline/document_source_facet.cpp
+++ b/src/mongo/db/pipeline/document_source_facet.cpp
@@ -150,6 +150,18 @@ void DocumentSourceFacet::doReattachToOperationContext(OperationContext* opCtx)
}
}
+bool DocumentSourceFacet::needsPrimaryShard() const {
+ // Currently we don't split $facet to have a merger part and a shards part (see SERVER-24154).
+ // This means that if any stage in any of the $facet pipelines requires the primary shard, then
+ // the entire $facet must happen on the merger, and the merger must be the primary shard.
+ for (auto&& facet : _facetPipelines) {
+ if (facet.second->needsPrimaryShardMerger()) {
+ return true;
+ }
+ }
+ return false;
+}
+
DocumentSource::GetDepsReturn DocumentSourceFacet::getDependencies(DepsTracker* deps) const {
for (auto&& facet : _facetPipelines) {
auto subDepsTracker = facet.second->getDependencies(deps->getMetadataAvailable());
diff --git a/src/mongo/db/pipeline/document_source_facet.h b/src/mongo/db/pipeline/document_source_facet.h
index 267ebe58bb2..81f7b23e9cf 100644
--- a/src/mongo/db/pipeline/document_source_facet.h
+++ b/src/mongo/db/pipeline/document_source_facet.h
@@ -53,9 +53,6 @@ class Pipeline;
* For example, {$facet: {facetA: [{$skip: 1}], facetB: [{$limit: 1}]}} would describe a $facet
* stage which will produce a document like the following:
* {facetA: [<all input documents except the first one>], facetB: [<the first document>]}.
- *
- * TODO SERVER-24154: Should inherit from SplittableDocumentSource so that it can split in a sharded
- * cluster.
*/
class DocumentSourceFacet final : public DocumentSourceNeedsMongod,
public SplittableDocumentSource {
@@ -98,6 +95,9 @@ public:
/**
* The $facet stage must be run on the merging shard.
+ *
+ * TODO SERVER-24154: Should be smarter about splitting so that parts of the sub-pipelines can
+ * potentially be run in parallel on multiple shards.
*/
boost::intrusive_ptr<DocumentSource> getShardSource() final {
return nullptr;
@@ -111,6 +111,7 @@ public:
void doInjectMongodInterface(std::shared_ptr<MongodInterface> mongod) final;
void doDetachFromOperationContext() final;
void doReattachToOperationContext(OperationContext* opCtx) final;
+ bool needsPrimaryShard() const final;
private:
DocumentSourceFacet(StringMap<boost::intrusive_ptr<Pipeline>> facetPipelines,
diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp
index 519b8d0a222..c55a4024c5a 100644
--- a/src/mongo/db/pipeline/document_source_facet_test.cpp
+++ b/src/mongo/db/pipeline/document_source_facet_test.cpp
@@ -490,5 +490,49 @@ TEST_F(DocumentSourceFacetTest, ShouldThrowIfAnyPipelineRequiresTextScoreButItIs
ASSERT_THROWS(facetStage->getDependencies(&deps), UserException);
}
+/**
+ * A dummy DocumentSource which needs to run on the primary shard.
+ */
+class DocumentSourceNeedsPrimaryShard final : public DocumentSourcePassthrough {
+public:
+ bool needsPrimaryShard() const final {
+ return true;
+ }
+
+ static boost::intrusive_ptr<DocumentSourceNeedsPrimaryShard> create() {
+ return new DocumentSourceNeedsPrimaryShard();
+ }
+};
+
+TEST_F(DocumentSourceFacetTest, ShouldRequirePrimaryShardIfAnyStageRequiresPrimaryShard) {
+ auto ctx = getExpCtx();
+
+ auto passthrough = DocumentSourcePassthrough::create();
+ auto firstPipeline = unittest::assertGet(Pipeline::create({passthrough}, ctx));
+
+ auto needsPrimaryShard = DocumentSourceNeedsPrimaryShard::create();
+ auto secondPipeline = unittest::assertGet(Pipeline::create({needsPrimaryShard}, ctx));
+
+ auto facetStage = DocumentSourceFacet::create(
+ {{"passthrough", firstPipeline}, {"needsPrimaryShard", secondPipeline}}, ctx);
+
+ ASSERT_TRUE(facetStage->needsPrimaryShard());
+}
+
+TEST_F(DocumentSourceFacetTest, ShouldNotRequirePrimaryShardIfNoStagesRequiresPrimaryShard) {
+ auto ctx = getExpCtx();
+
+ auto firstPassthrough = DocumentSourcePassthrough::create();
+ auto firstPipeline = unittest::assertGet(Pipeline::create({firstPassthrough}, ctx));
+
+ auto secondPassthrough = DocumentSourcePassthrough::create();
+ auto secondPipeline = unittest::assertGet(Pipeline::create({secondPassthrough}, ctx));
+
+ auto facetStage =
+ DocumentSourceFacet::create({{"first", firstPipeline}, {"second", secondPipeline}}, ctx);
+
+ ASSERT_FALSE(facetStage->needsPrimaryShard());
+}
+
} // namespace
} // namespace mongo