summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2016-06-09 19:09:42 -0400
committerCharlie Swanson <charlie.swanson@mongodb.com>2016-06-24 11:51:21 -0400
commit46b4dfbdefe018e1f125a5ebac8b584b20274502 (patch)
tree9c6fea4905fdb235833d2f2010231dbacdcffb2f
parent20e9b2798d69fb2367ff9f16a1b30f4f9b73d93b (diff)
downloadmongo-46b4dfbdefe018e1f125a5ebac8b584b20274502.tar.gz
SERVER-23654 Add $facet aggregation stage
-rw-r--r--jstests/aggregation/sources/facet/inner_graphlookup.js39
-rw-r--r--jstests/aggregation/sources/facet/inner_lookup.js32
-rw-r--r--jstests/aggregation/sources/facet/use_cases.js137
-rw-r--r--src/mongo/db/commands/pipeline_command.cpp2
-rw-r--r--src/mongo/db/pipeline/SConscript35
-rw-r--r--src/mongo/db/pipeline/aggregation_context_fixture.h63
-rw-r--r--src/mongo/db/pipeline/document_source.h29
-rw-r--r--src/mongo/db/pipeline/document_source_facet.cpp200
-rw-r--r--src/mongo/db/pipeline/document_source_facet.h108
-rw-r--r--src/mongo/db/pipeline/document_source_facet_test.cpp346
-rw-r--r--src/mongo/db/pipeline/document_source_mock.cpp9
-rw-r--r--src/mongo/db/pipeline/document_source_tee_consumer.cpp81
-rw-r--r--src/mongo/db/pipeline/document_source_tee_consumer.h68
-rw-r--r--src/mongo/db/pipeline/field_path.h2
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp19
-rw-r--r--src/mongo/db/pipeline/pipeline.h16
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp2
-rw-r--r--src/mongo/db/pipeline/tee_buffer.cpp78
-rw-r--r--src/mongo/db/pipeline/tee_buffer.h88
-rw-r--r--src/mongo/db/pipeline/tee_buffer_test.cpp138
-rw-r--r--src/mongo/s/commands/cluster_pipeline_cmd.cpp3
21 files changed, 1485 insertions, 10 deletions
diff --git a/jstests/aggregation/sources/facet/inner_graphlookup.js b/jstests/aggregation/sources/facet/inner_graphlookup.js
new file mode 100644
index 00000000000..42d10149240
--- /dev/null
+++ b/jstests/aggregation/sources/facet/inner_graphlookup.js
@@ -0,0 +1,39 @@
+/**
+ * Tests that using a $graphLookup stage inside of a $facet stage will yield the same results as
+ * using the $graphLookup stage outside of the $facet stage.
+ */
+(function() {
+ "use strict";
+
+ // We will only use one collection, the $graphLookup will look up from the same collection.
+ var graphColl = db.facetGraphLookup;
+
+ // The graph in ASCII form: 0 --- 1 --- 2 3
+ graphColl.drop();
+ assert.writeOK(graphColl.insert({_id: 0, edges: [1]}));
+ assert.writeOK(graphColl.insert({_id: 1, edges: [0, 2]}));
+ assert.writeOK(graphColl.insert({_id: 2, edges: [1]}));
+ assert.writeOK(graphColl.insert({_id: 3}));
+
+ // For each document in the collection, this will compute all the other documents that are
+ // reachable from this one.
+ const graphLookupStage = {
+ $graphLookup: {
+ from: graphColl.getName(),
+ startWith: "$_id",
+ connectFromField: "edges",
+ connectToField: "_id",
+ as: "connected"
+ }
+ };
+ const normalResults = graphColl.aggregate([graphLookupStage]).toArray();
+ const facetedResults = graphColl.aggregate([{$facet: {nested: [graphLookupStage]}}]).toArray();
+ assert.eq(facetedResults, [{nested: normalResults}]);
+
+ const normalResultsUnwound =
+ graphColl.aggregate([graphLookupStage, {$unwind: "$connected"}]).toArray();
+ const facetedResultsUnwound =
+ graphColl.aggregate([{$facet: {nested: [graphLookupStage, {$unwind: "$connected"}]}}])
+ .toArray();
+ assert.eq(facetedResultsUnwound, [{nested: normalResultsUnwound}]);
+}());
diff --git a/jstests/aggregation/sources/facet/inner_lookup.js b/jstests/aggregation/sources/facet/inner_lookup.js
new file mode 100644
index 00000000000..f4890324b65
--- /dev/null
+++ b/jstests/aggregation/sources/facet/inner_lookup.js
@@ -0,0 +1,32 @@
+/**
+ * Tests that using a $lookup stage inside of a $facet stage will yield the same results as using
+ * the $lookup stage outside of the $facet stage.
+ */
+(function() {
+ "use strict";
+
+ var local = db.facetLookupLocal;
+ var foreign = db.facetLookupForeign;
+
+ local.drop();
+ assert.writeOK(local.insert({_id: 0}));
+ assert.writeOK(local.insert({_id: 1}));
+
+ foreign.drop();
+ assert.writeOK(foreign.insert({_id: 0, foreignKey: 0}));
+ assert.writeOK(foreign.insert({_id: 1, foreignKey: 1}));
+ assert.writeOK(foreign.insert({_id: 2, foreignKey: 2}));
+
+ const lookupStage = {
+ $lookup:
+ {from: foreign.getName(), localField: "_id", foreignField: "foreignKey", as: "joined"}
+ };
+ const lookupResults = local.aggregate([lookupStage]).toArray();
+ const facetedLookupResults = local.aggregate([{$facet: {nested: [lookupStage]}}]).toArray();
+ assert.eq(facetedLookupResults, [{nested: lookupResults}]);
+
+ const lookupResultsUnwound = local.aggregate([lookupStage, {$unwind: "$joined"}]).toArray();
+ const facetedLookupResultsUnwound =
+ local.aggregate([{$facet: {nested: [lookupStage, {$unwind: "$joined"}]}}]).toArray();
+ assert.eq(facetedLookupResultsUnwound, [{nested: lookupResultsUnwound}]);
+}());
diff --git a/jstests/aggregation/sources/facet/use_cases.js b/jstests/aggregation/sources/facet/use_cases.js
new file mode 100644
index 00000000000..1bb3cefdd40
--- /dev/null
+++ b/jstests/aggregation/sources/facet/use_cases.js
@@ -0,0 +1,137 @@
+/**
+ * Tests some practical use cases of the $facet stage.
+ */
+(function() {
+ "use strict";
+ const dbName = "test";
+ const collName = jsTest.name();
+
+ Random.setRandomSeed();
+
+ /**
+ * Helper to get a random entry out of an array.
+ */
+ function randomChoice(array) {
+ return array[Random.randInt(array.length)];
+ }
+
+ /**
+ * Helper to generate a randomized document with the following schema:
+ * {
+ * manufacturer: <string>,
+ * price: <double>,
+ * screenSize: <double>
+ * }
+ */
+ function generateRandomDocument() {
+ const manufacturers =
+ ["Sony", "Samsung", "LG", "Panasonic", "Mitsubishi", "Vizio", "Toshiba", "Sharp"];
+ const minPrice = 100;
+ const maxPrice = 4000;
+ const minScreenSize = 18;
+ const maxScreenSize = 40;
+
+ return {
+ manufacturer: randomChoice(manufacturers),
+ price: Random.randInt(maxPrice - minPrice + 1) + minPrice,
+ screenSize: Random.randInt(maxScreenSize - minScreenSize + 1) + minScreenSize,
+ };
+ }
+
+ function doExecutionTest(conn) {
+ var coll = conn.getDB(dbName).getCollection(collName);
+ coll.drop();
+
+ const nDocs = 1000 * 10;
+ var bulk = coll.initializeUnorderedBulkOp();
+ for (var i = 0; i < nDocs; i++) {
+ const doc = generateRandomDocument();
+ bulk.insert(doc);
+ }
+ assert.writeOK(bulk.execute());
+
+ //
+ // Compute the most common manufacturers, and the number of TVs in each price range.
+ //
+
+ // First compute each separately, to make sure we have the correct results.
+ const manufacturerPipe =
+ [{$group: {_id: "$manufacturer", count: {$sum: 1}}}, {$sort: {count: -1}}];
+ const mostCommonManufacturers = coll.aggregate(manufacturerPipe).toArray();
+
+ const pricePipe = [
+ {
+ $project: {
+ priceBucket: {
+ $switch: {
+ branches: [
+ {case: {$lt: ["$price", 500]}, then: "< 500"},
+ {case: {$lt: ["$price", 1000]}, then: "500-1000"},
+ {case: {$lt: ["$price", 1500]}, then: "1000-1500"},
+ {case: {$lt: ["$price", 2000]}, then: "1500-2000"}
+ ],
+ default: "> 2000"
+ }
+ }
+ }
+ },
+ {$group: {_id: "$priceBucket", count: {$sum: 1}}},
+ {$sort: {count: -1}}
+ ];
+ const numTVsByPriceRange = coll.aggregate(pricePipe).toArray();
+
+ // Then compute the results using $facet.
+ const facetResult =
+ coll.aggregate([{$facet: {manufacturers: manufacturerPipe, prices: pricePipe}}])
+ .toArray();
+ assert.eq(facetResult.length, 1);
+ const facetManufacturers = facetResult[0].manufacturers;
+ const facetPrices = facetResult[0].prices;
+
+ // Then assert they are the same.
+ assert.eq(facetManufacturers, mostCommonManufacturers);
+ assert.eq(facetPrices, numTVsByPriceRange);
+ }
+
+ // Test against the standalone started by resmoke.py.
+ const conn = db.getMongo();
+ doExecutionTest(conn);
+
+ // Test against a sharded cluster.
+ const st = new ShardingTest({shards: 2});
+ doExecutionTest(st.s0);
+
+ // Test that $facet stage propagates information about involved collections, preventing users
+ // from doing things like $lookup from a sharded collection.
+ const shardedDBName = "sharded";
+ const shardedCollName = "collection";
+ const shardedColl = st.getDB(shardedDBName).getCollection(shardedCollName);
+ const unshardedColl = st.getDB(shardedDBName).getCollection(collName);
+
+ assert.commandWorked(st.admin.runCommand({enableSharding: shardedDBName}));
+ assert.commandWorked(
+ st.admin.runCommand({shardCollection: shardedColl.getFullName(), key: {_id: 1}}));
+ assert.commandFailed(unshardedColl.runCommand({
+ aggregate: unshardedColl,
+ pipeline: [{
+ $lookup:
+ {from: shardedCollName, localField: "_id", foreignField: "_id", as: "results"}
+ }]
+ }));
+ assert.commandFailed(unshardedColl.runCommand({
+ aggregate: unshardedColl,
+ pipeline: [{
+ $facet: {
+ a: [{
+ $lookup: {
+ from: shardedCollName,
+ localField: "_id",
+ foreignField: "_id",
+ as: "results"
+ }
+ }]
+ }
+ }]
+ }));
+ st.stop();
+}());
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index 7d17775d507..c0b0e5fd833 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -186,6 +186,7 @@ boost::intrusive_ptr<Pipeline> reparsePipeline(
fassertFailedWithStatusNoTrace(40175, reparsedPipeline.getStatus());
}
+ reparsedPipeline.getValue()->optimizePipeline();
return reparsedPipeline.getValue();
}
@@ -261,6 +262,7 @@ public:
return appendCommandStatus(result, statusWithPipeline.getStatus());
}
auto pipeline = std::move(statusWithPipeline.getValue());
+ pipeline->optimizePipeline();
if (kDebugBuild && !expCtx->isExplain && !expCtx->inShard) {
// Make sure all operations round-trip through Pipeline::serialize() correctly by
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index fb8ac72df4d..32c0f82eae8 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -9,6 +9,7 @@ env.Library(
LIBDEPS=[
'aggregation_request',
'document_source',
+ 'document_source_facet',
'expression_context',
'pipeline',
]
@@ -217,6 +218,40 @@ env.Library(
]
)
+env.Library(
+ target='document_source_facet',
+ source=[
+ 'document_source_facet.cpp',
+ 'tee_buffer.cpp',
+ 'document_source_tee_consumer.cpp',
+ ],
+ LIBDEPS=[
+ 'document_source',
+ 'pipeline',
+ ]
+)
+
+env.CppUnitTest(
+ target='document_source_facet_test',
+ source='document_source_facet_test.cpp',
+ LIBDEPS=[
+ 'document_source_facet',
+ '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
+ '$BUILD_DIR/mongo/db/query/query_test_service_context',
+ '$BUILD_DIR/mongo/db/service_context_noop_init',
+ ],
+)
+
+env.CppUnitTest(
+ target='tee_buffer_test',
+ source='tee_buffer_test.cpp',
+ LIBDEPS=[
+ 'document_source_facet',
+ '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
+ '$BUILD_DIR/mongo/db/service_context_noop_init',
+ ],
+)
+
env.CppUnitTest(
target='agg_expression_test',
source='expression_test.cpp',
diff --git a/src/mongo/db/pipeline/aggregation_context_fixture.h b/src/mongo/db/pipeline/aggregation_context_fixture.h
new file mode 100644
index 00000000000..75353c46e7f
--- /dev/null
+++ b/src/mongo/db/pipeline/aggregation_context_fixture.h
@@ -0,0 +1,63 @@
+/**
+ * Copyright (C) 2016 MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/intrusive_ptr.hpp>
+#include <memory>
+
+#include "mongo/db/client.h"
+#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/db/query/query_test_service_context.h"
+#include "mongo/db/service_context_noop.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+
+/**
+ * Test fixture which provides an ExpressionContext for use in testing.
+ */
+class AggregationContextFixture : public unittest::Test {
+public:
+ AggregationContextFixture()
+ : _queryServiceContext(stdx::make_unique<QueryTestServiceContext>()),
+ _opCtx(_queryServiceContext->makeOperationContext()),
+ _expCtx(new ExpressionContext(
+ _opCtx.get(), AggregationRequest(NamespaceString("unittests.pipeline_test"), {}))) {}
+
+ boost::intrusive_ptr<ExpressionContext> getExpCtx() {
+ return _expCtx.get();
+ }
+
+private:
+ std::unique_ptr<QueryTestServiceContext> _queryServiceContext;
+ ServiceContext::UniqueOperationContext _opCtx;
+ boost::intrusive_ptr<ExpressionContext> _expCtx;
+};
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index f0da2180f2c..df3f4eecaf5 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -360,11 +360,17 @@ public:
DocumentSourceNeedsMongod(const boost::intrusive_ptr<ExpressionContext>& expCtx)
: DocumentSource(expCtx) {}
- virtual void injectMongodInterface(std::shared_ptr<MongodInterface> mongod) {
+ void injectMongodInterface(std::shared_ptr<MongodInterface> mongod) {
_mongod = mongod;
+ doInjectMongodInterface(mongod);
}
- void detachFromOperationContext() final {
+ /**
+ * Derived classes may override this method to register custom inject functionality.
+ */
+ virtual void doInjectMongodInterface(std::shared_ptr<MongodInterface> mongod) {}
+
+ void detachFromOperationContext() override {
invariant(_mongod);
_mongod->setOperationContext(nullptr);
doDetachFromOperationContext();
@@ -871,6 +877,8 @@ private:
class DocumentSourceMock : public DocumentSource {
public:
DocumentSourceMock(std::deque<Document> docs);
+ DocumentSourceMock(std::deque<Document> docs,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx);
boost::optional<Document> getNext() override;
const char* getSourceName() const override;
@@ -892,9 +900,24 @@ public:
static boost::intrusive_ptr<DocumentSourceMock> create(
const std::initializer_list<const char*>& jsons);
+ void reattachToOperationContext(OperationContext* opCtx) {
+ isDetachedFromOpCtx = false;
+ }
+
+ void detachFromOperationContext() {
+ isDetachedFromOpCtx = true;
+ }
+
+ boost::intrusive_ptr<DocumentSource> optimize() override {
+ isOptimized = true;
+ return this;
+ }
+
// Return documents from front of queue.
std::deque<Document> queue;
- bool disposed = false;
+ bool isDisposed = false;
+ bool isDetachedFromOpCtx = false;
+ bool isOptimized = false;
BSONObjSet sorts;
};
diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp
new file mode 100644
index 00000000000..72a69ade0db
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_facet.cpp
@@ -0,0 +1,200 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source_facet.h"
+
+#include <vector>
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/bson/bsontypes.h"
+#include "mongo/db/pipeline/document.h"
+#include "mongo/db/pipeline/document_source_tee_consumer.h"
+#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/db/pipeline/tee_buffer.h"
+#include "mongo/db/pipeline/value.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/mongoutils/str.h"
+
+namespace mongo {
+
+using boost::intrusive_ptr;
+using std::vector;
+
+DocumentSourceFacet::DocumentSourceFacet(StringMap<intrusive_ptr<Pipeline>> facetPipelines,
+ const intrusive_ptr<ExpressionContext>& expCtx)
+ : DocumentSourceNeedsMongod(expCtx), _facetPipelines(std::move(facetPipelines)) {
+
+ // Build the tee stage, and the consumers of the tee.
+ _teeBuffer = TeeBuffer::create();
+ for (auto&& facet : _facetPipelines) {
+ auto pipeline = facet.second;
+ pipeline->addInitialSource(DocumentSourceTeeConsumer::create(pExpCtx, _teeBuffer));
+ }
+}
+
+REGISTER_DOCUMENT_SOURCE(facet, DocumentSourceFacet::createFromBson);
+
+intrusive_ptr<DocumentSourceFacet> DocumentSourceFacet::create(
+ StringMap<intrusive_ptr<Pipeline>> facetPipelines,
+ const intrusive_ptr<ExpressionContext>& expCtx) {
+ return new DocumentSourceFacet(std::move(facetPipelines), expCtx);
+}
+
+void DocumentSourceFacet::setSource(DocumentSource* source) {
+ _teeBuffer->setSource(source);
+}
+
+boost::optional<Document> DocumentSourceFacet::getNext() {
+ pExpCtx->checkForInterrupt();
+
+ if (_done) {
+ return boost::none;
+ }
+ _done = true; // We will only ever produce one result.
+
+ // Build the results by executing each pipeline serially, one at a time.
+ MutableDocument results;
+ for (auto&& facet : _facetPipelines) {
+ auto facetName = facet.first;
+ auto facetPipeline = facet.second;
+
+ std::vector<Value> facetResults;
+ while (auto next = facetPipeline->getSources().back()->getNext()) {
+ facetResults.emplace_back(std::move(*next));
+ }
+ results[facetName] = Value(std::move(facetResults));
+ }
+
+ _teeBuffer->dispose(); // Clear the buffer since we'll no longer need it.
+ return results.freeze();
+}
+
+Value DocumentSourceFacet::serialize(bool explain) const {
+ MutableDocument serialized;
+ for (auto&& facet : _facetPipelines) {
+ serialized[facet.first] =
+ Value(explain ? facet.second->writeExplainOps() : facet.second->serialize());
+ }
+ return Value(Document{{"$facet", serialized.freezeToValue()}});
+}
+
+void DocumentSourceFacet::addInvolvedCollections(std::vector<NamespaceString>* collections) const {
+ for (auto&& facet : _facetPipelines) {
+ for (auto&& source : facet.second->getSources()) {
+ source->addInvolvedCollections(collections);
+ }
+ }
+}
+
+intrusive_ptr<DocumentSource> DocumentSourceFacet::optimize() {
+ for (auto&& facet : _facetPipelines) {
+ facet.second->optimizePipeline();
+ }
+ return this;
+}
+
+void DocumentSourceFacet::doInjectMongodInterface(std::shared_ptr<MongodInterface> mongod) {
+ for (auto&& facet : _facetPipelines) {
+ for (auto&& stage : facet.second->getSources()) {
+ if (auto stageNeedingMongod = dynamic_cast<DocumentSourceNeedsMongod*>(stage.get())) {
+ stageNeedingMongod->injectMongodInterface(mongod);
+ }
+ }
+ }
+}
+
+void DocumentSourceFacet::doDetachFromOperationContext() {
+ for (auto&& facet : _facetPipelines) {
+ facet.second->detachFromOperationContext();
+ }
+}
+
+void DocumentSourceFacet::doReattachToOperationContext(OperationContext* opCtx) {
+ for (auto&& facet : _facetPipelines) {
+ facet.second->reattachToOperationContext(opCtx);
+ }
+}
+
+intrusive_ptr<DocumentSource> DocumentSourceFacet::createFromBson(
+ BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) {
+ uassert(40169,
+ str::stream() << "the $facet specification must be a non-empty object, but found: "
+ << elem,
+ elem.type() == BSONType::Object && !elem.embeddedObject().isEmpty());
+
+ StringMap<intrusive_ptr<Pipeline>> facetPipelines;
+ for (auto&& facetElem : elem.embeddedObject()) {
+ const auto facetName = facetElem.fieldNameStringData();
+ FieldPath::uassertValidFieldName(facetName.toString());
+ uassert(40170,
+ str::stream() << "arguments to $facet must be arrays, " << facetName << " is type "
+ << typeName(facetElem.type()),
+ facetElem.type() == BSONType::Array);
+
+ vector<BSONObj> rawPipeline;
+ for (auto&& subPipeElem : facetElem.Obj()) {
+ uassert(40171,
+ str::stream() << "elements of arrays in $facet spec must be objects, "
+ << facetName
+ << " argument contained an element of type "
+ << typeName(subPipeElem.type()),
+ subPipeElem.type() == BSONType::Object);
+
+ rawPipeline.push_back(subPipeElem.embeddedObject());
+ }
+
+ auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));
+
+ uassert(40172,
+ str::stream() << "sub-pipelines in $facet stage cannot be empty: "
+ << facetElem.toString(),
+ !pipeline->getSources().empty());
+
+ // Disallow $out stages, $facet stages, and any stages that need to be the first stage in
+ // the pipeline.
+ for (auto&& stage : pipeline->getSources()) {
+ if ((dynamic_cast<DocumentSourceOut*>(stage.get())) ||
+ (dynamic_cast<DocumentSourceFacet*>(stage.get())) ||
+ (stage->isValidInitialSource())) {
+ uasserted(40173,
+ str::stream() << stage->getSourceName()
+ << " is not allowed to be used within a $facet stage: "
+ << facetElem.toString());
+ }
+ }
+
+ facetPipelines[facetName] = pipeline;
+ }
+
+ return new DocumentSourceFacet(std::move(facetPipelines), expCtx);
+}
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_facet.h b/src/mongo/db/pipeline/document_source_facet.h
new file mode 100644
index 00000000000..e6ef1bb8496
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_facet.h
@@ -0,0 +1,108 @@
+/**
+ * Copyright (C) 2016 MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/intrusive_ptr.hpp>
+#include <boost/optional.hpp>
+#include <memory>
+#include <vector>
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/util/string_map.h"
+
+namespace mongo {
+
+class BSONElement;
+class TeeBuffer;
+class DocumentSourceTeeConsumer;
+struct ExpressionContext;
+class NamespaceString;
+class Pipeline;
+
+/**
+ * A $facet stage contains multiple sub-pipelines. Each input to the $facet stage will feed into
+ * each of the sub-pipelines. The $facet stage is blocking, and outputs only one document,
+ * containing an array of results for each sub-pipeline.
+ *
+ * For example, {$facet: {facetA: [{$skip: 1}], facetB: [{$limit: 1}]}} would describe a $facet
+ * stage which will produce a document like the following:
+ * {facetA: [<all input documents except the first one>], facetB: [<the first document>]}.
+ *
+ * TODO SERVER-24154: Should inherit from SplittableDocumentSource so that it can split in a sharded
+ * cluster.
+ */
+class DocumentSourceFacet final : public DocumentSourceNeedsMongod {
+public:
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ static boost::intrusive_ptr<DocumentSourceFacet> create(
+ StringMap<boost::intrusive_ptr<Pipeline>> facetPipelines,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx);
+
+ /**
+ * Blocking call. Will consume all input and produces one output document.
+ */
+ boost::optional<Document> getNext() final;
+
+ /**
+ * Optimizes inner pipelines.
+ */
+ boost::intrusive_ptr<DocumentSource> optimize() final;
+
+ // TODO SERVER-24640: implement getDependencies() to take a union of all dependencies of
+ // sub-pipelines.
+
+ const char* getSourceName() const final {
+ return "$facet";
+ }
+
+ /**
+ * Sets 'source' as the source of '_teeBuffer'.
+ */
+ void setSource(DocumentSource* source) final;
+
+ // The following are overridden just to forward calls to sub-pipelines.
+ void addInvolvedCollections(std::vector<NamespaceString>* collections) const final;
+ void doInjectMongodInterface(std::shared_ptr<MongodInterface> mongod) final;
+ void doDetachFromOperationContext() final;
+ void doReattachToOperationContext(OperationContext* opCtx) final;
+
+private:
+ DocumentSourceFacet(StringMap<boost::intrusive_ptr<Pipeline>> facetPipelines,
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+ Value serialize(bool explain = false) const final;
+
+ boost::intrusive_ptr<TeeBuffer> _teeBuffer;
+ StringMap<boost::intrusive_ptr<Pipeline>> _facetPipelines;
+
+ bool _done = false;
+};
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp
new file mode 100644
index 00000000000..393f97632aa
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_facet_test.cpp
@@ -0,0 +1,346 @@
+/**
+ * Copyright (C) 2016 MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source_facet.h"
+
+#include <deque>
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/bson/bsontypes.h"
+#include "mongo/bson/json.h"
+#include "mongo/db/pipeline/aggregation_context_fixture.h"
+#include "mongo/db/pipeline/document.h"
+#include "mongo/util/assert_util.h"
+
+namespace mongo {
+
+// Crutch.
+bool isMongos() {
+ return false;
+}
+
+namespace {
+
+// This provides access to getExpCtx(), but we'll use a different name for this test suite.
+using DocumentSourceFacetTest = AggregationContextFixture;
+
+//
+// Parsing and serialization.
+//
+
+TEST_F(DocumentSourceFacetTest, ShouldRejectNonObjectSpec) {
+ auto ctx = getExpCtx();
+ auto spec = BSON("$facet"
+ << "string");
+ ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException);
+
+ spec = BSON("$facet" << 1);
+ ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException);
+
+ spec = BSON("$facet" << BSON_ARRAY(1 << 2));
+ ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException);
+}
+
+TEST_F(DocumentSourceFacetTest, ShouldRejectEmptyObject) {
+ auto ctx = getExpCtx();
+ auto spec = BSON("$facet" << BSONObj());
+ ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException);
+}
+
+TEST_F(DocumentSourceFacetTest, ShouldRejectFacetsWithInvalidNames) {
+ auto ctx = getExpCtx();
+ auto spec = BSON("$facet" << BSON("" << BSON_ARRAY(BSON("$skip" << 4))));
+ ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException);
+
+ spec = BSON("$facet" << BSON("a.b" << BSON_ARRAY(BSON("$skip" << 4))));
+ ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException);
+
+ spec = BSON("$facet" << BSON("$a" << BSON_ARRAY(BSON("$skip" << 4))));
+ ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException);
+}
+
+TEST_F(DocumentSourceFacetTest, ShouldRejectNonArrayFacets) {
+ auto ctx = getExpCtx();
+ auto spec = BSON("$facet" << BSON("a" << 1));
+ ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException);
+
+ spec = BSON("$facet" << BSON("a" << BSON_ARRAY(BSON("$skip" << 4)) << "b" << 2));
+ ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException);
+}
+
+TEST_F(DocumentSourceFacetTest, ShouldRejectEmptyPipelines) {
+ auto ctx = getExpCtx();
+ auto spec = BSON("$facet" << BSON("a" << BSONArray()));
+ ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException);
+
+ spec = BSON("$facet" << BSON("a" << BSON_ARRAY(BSON("$skip" << 4)) << "b" << BSONArray()));
+ ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException);
+}
+
+TEST_F(DocumentSourceFacetTest, ShouldRejectFacetsWithStagesThatMustBeTheFirstStage) {
+ auto ctx = getExpCtx();
+ auto spec = BSON("$facet" << BSON("a" << BSON_ARRAY(BSON("$indexStats" << BSONObj()))));
+ ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException);
+
+ spec = BSON("$facet" << BSON(
+ "a" << BSON_ARRAY(BSON("$limit" << 1) << BSON("$indexStats" << BSONObj()))));
+ ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException);
+}
+
+TEST_F(DocumentSourceFacetTest, ShouldRejectFacetsContainingAnOutStage) {
+ auto ctx = getExpCtx();
+ auto spec = BSON("$facet" << BSON("a" << BSON_ARRAY(BSON("$out"
+ << "out_collection"))));
+ ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException);
+
+ spec =
+ BSON("$facet" << BSON("a" << BSON_ARRAY(BSON("$skip" << 1) << BSON("$out"
+ << "out_collection"))));
+ ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException);
+
+ spec = BSON("$facet" << BSON("a" << BSON_ARRAY(BSON("$out"
+ << "out_collection")
+ << BSON("$skip" << 1))));
+ ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException);
+}
+
+TEST_F(DocumentSourceFacetTest, ShouldRejectFacetsContainingAFacetStage) {
+ auto ctx = getExpCtx();
+ auto spec = fromjson("{$facet: {a: [{$facet: {a: [{$skip: 2}]}}]}}");
+ ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException);
+
+ spec = fromjson("{$facet: {a: [{$skip: 2}, {$facet: {a: [{$skip: 2}]}}]}}");
+ ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException);
+
+ spec = fromjson("{$facet: {a: [{$skip: 2}], b: [{$facet: {a: [{$skip: 2}]}}]}}");
+ ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException);
+}
+
+TEST_F(DocumentSourceFacetTest, ShouldAcceptLegalSpecification) {
+ auto ctx = getExpCtx();
+ auto spec = BSON("$facet" << BSON("a" << BSON_ARRAY(BSON("$skip" << 4)) << "b"
+ << BSON_ARRAY(BSON("$limit" << 3))));
+ auto facetStage = DocumentSourceFacet::createFromBson(spec.firstElement(), ctx);
+ ASSERT_TRUE(facetStage.get());
+}
+
+//
+// Evaluation.
+//
+
+/**
+ * A dummy DocumentSource which just passes all input along to the next stage.
+ */
+class DocumentSourcePassthrough : public DocumentSourceMock {
+public:
+ DocumentSourcePassthrough(const boost::intrusive_ptr<ExpressionContext>& expCtx)
+ : DocumentSourceMock({}, expCtx) {}
+
+ // We need this to be false so that it can be used in a $facet stage.
+ bool isValidInitialSource() const final {
+ return false;
+ }
+
+ boost::optional<Document> getNext() final {
+ return pSource->getNext();
+ }
+
+ static boost::intrusive_ptr<DocumentSourcePassthrough> create(
+ boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ return new DocumentSourcePassthrough(expCtx);
+ }
+};
+
+TEST_F(DocumentSourceFacetTest, SingleFacetShouldReceiveAllDocuments) {
+ auto ctx = getExpCtx();
+
+ auto dummy = DocumentSourcePassthrough::create(ctx);
+
+ auto statusWithPipeline = Pipeline::create({dummy}, ctx);
+ ASSERT_OK(statusWithPipeline.getStatus());
+ auto pipeline = std::move(statusWithPipeline.getValue());
+
+ auto facetStage = DocumentSourceFacet::create({{"results", pipeline}}, ctx);
+
+ std::deque<Document> inputs = {Document{{"_id", 0}}, Document{{"_id", 1}}};
+ auto mock = DocumentSourceMock::create(inputs);
+ facetStage->setSource(mock.get());
+
+ auto output = facetStage->getNext();
+ ASSERT_TRUE(output);
+ ASSERT_EQ(*output, Document(fromjson("{results: [{_id: 0}, {_id: 1}]}")));
+
+ // Should be exhausted now.
+ ASSERT_FALSE(facetStage->getNext());
+ ASSERT_FALSE(facetStage->getNext());
+ ASSERT_FALSE(facetStage->getNext());
+}
+
+TEST_F(DocumentSourceFacetTest, MultipleFacetsShouldSeeTheSameDocuments) {
+ auto ctx = getExpCtx();
+
+ auto firstDummy = DocumentSourcePassthrough::create(ctx);
+ auto firstPipeline = uassertStatusOK(Pipeline::create({firstDummy}, ctx));
+
+ auto secondDummy = DocumentSourcePassthrough::create(ctx);
+ auto secondPipeline = uassertStatusOK(Pipeline::create({secondDummy}, ctx));
+
+ auto facetStage =
+ DocumentSourceFacet::create({{"first", firstPipeline}, {"second", secondPipeline}}, ctx);
+
+ std::deque<Document> inputs = {Document{{"_id", 0}}, Document{{"_id", 1}}};
+ auto mock = DocumentSourceMock::create(inputs);
+ facetStage->setSource(mock.get());
+
+ auto output = facetStage->getNext();
+
+ // The output fields are in no guaranteed order.
+ std::vector<Value> expectedOutputs(inputs.begin(), inputs.end());
+ ASSERT_TRUE(output);
+ ASSERT_EQ((*output).size(), 2UL);
+ ASSERT_EQ((*output)["first"], Value(expectedOutputs));
+ ASSERT_EQ((*output)["second"], Value(expectedOutputs));
+
+ // Should be exhausted now.
+ ASSERT_FALSE(facetStage->getNext());
+ ASSERT_FALSE(facetStage->getNext());
+ ASSERT_FALSE(facetStage->getNext());
+}
+
+TEST_F(DocumentSourceFacetTest, ShouldBeAbleToEvaluateMultipleStagesWithinOneSubPipeline) {
+ auto ctx = getExpCtx();
+
+ auto firstDummy = DocumentSourcePassthrough::create(ctx);
+ auto secondDummy = DocumentSourcePassthrough::create(ctx);
+ auto pipeline = uassertStatusOK(Pipeline::create({firstDummy, secondDummy}, ctx));
+
+ auto facetStage = DocumentSourceFacet::create({{"subPipe", pipeline}}, ctx);
+
+ std::deque<Document> inputs = {Document{{"_id", 0}}, Document{{"_id", 1}}};
+ auto mock = DocumentSourceMock::create(inputs);
+ facetStage->setSource(mock.get());
+
+ auto output = facetStage->getNext();
+ ASSERT_TRUE(output);
+ ASSERT_EQ(*output, Document(fromjson("{subPipe: [{_id: 0}, {_id: 1}]}")));
+}
+
+//
+// Miscellaneous.
+//
+
+TEST_F(DocumentSourceFacetTest, ShouldBeAbleToReParseSerializedStage) {
+ auto ctx = getExpCtx();
+
+ // Create a facet stage like the following:
+ // {$facet: {
+ // skippedOne: [{$skip: 1}],
+ // skippedTwo: [{$skip: 2}]
+ // }}
+ auto firstSkip = DocumentSourceSkip::create(ctx);
+ firstSkip->setSkip(1);
+ auto firstPipeline = uassertStatusOK(Pipeline::create({firstSkip}, ctx));
+
+ auto secondSkip = DocumentSourceSkip::create(ctx);
+ secondSkip->setSkip(2);
+ auto secondPipeline = uassertStatusOK(Pipeline::create({secondSkip}, ctx));
+
+ auto facetStage = DocumentSourceFacet::create(
+ {{"skippedOne", firstPipeline}, {"skippedTwo", secondPipeline}}, ctx);
+
+ // Serialize the facet stage.
+ std::vector<Value> serialization;
+ facetStage->serializeToArray(serialization);
+ ASSERT_EQ(serialization.size(), 1UL);
+ ASSERT_EQ(serialization[0].getType(), BSONType::Object);
+
+ // The fields are in no guaranteed order, so we can't make a simple Document comparison.
+ ASSERT_EQ(serialization[0].getDocument().size(), 1UL);
+ ASSERT_EQ(serialization[0].getDocument()["$facet"].getType(), BSONType::Object);
+
+ // Should have two fields: "skippedOne" and "skippedTwo".
+ auto serializedStage = serialization[0].getDocument()["$facet"].getDocument();
+ ASSERT_EQ(serializedStage.size(), 2UL);
+ ASSERT_EQ(serializedStage["skippedOne"],
+ Value(std::vector<Value>{Value(Document{{"$skip", 1}})}));
+ ASSERT_EQ(serializedStage["skippedTwo"],
+ Value(std::vector<Value>{Value(Document{{"$skip", 2}})}));
+
+ auto serializedBson = serialization[0].getDocument().toBson();
+ auto roundTripped = DocumentSourceFacet::createFromBson(serializedBson.firstElement(), ctx);
+
+ // Serialize one more time to make sure we get the same thing.
+ std::vector<Value> newSerialization;
+ roundTripped->serializeToArray(newSerialization);
+
+ ASSERT_EQ(newSerialization.size(), 1UL);
+ ASSERT_EQ(newSerialization[0], serialization[0]);
+}
+
+TEST_F(DocumentSourceFacetTest, ShouldOptimizeInnerPipelines) {
+ auto ctx = getExpCtx();
+
+ auto dummy = DocumentSourcePassthrough::create(ctx);
+ auto pipeline = uassertStatusOK(Pipeline::create({dummy}, ctx));
+
+ auto facetStage = DocumentSourceFacet::create({{"subPipe", pipeline}}, ctx);
+
+ ASSERT_FALSE(dummy->isOptimized);
+ facetStage->optimize();
+ ASSERT_TRUE(dummy->isOptimized);
+}
+
+TEST_F(DocumentSourceFacetTest, ShouldPropogateDetachingAndReattachingOfOpCtx) {
+ auto ctx = getExpCtx();
+
+ auto firstDummy = DocumentSourcePassthrough::create(ctx);
+ auto firstPipeline = uassertStatusOK(Pipeline::create({firstDummy}, ctx));
+
+ auto secondDummy = DocumentSourcePassthrough::create(ctx);
+ auto secondPipeline = uassertStatusOK(Pipeline::create({secondDummy}, ctx));
+
+ auto facetStage =
+ DocumentSourceFacet::create({{"one", firstPipeline}, {"two", secondPipeline}}, ctx);
+
+ // Test detaching.
+ ASSERT_FALSE(firstDummy->isDetachedFromOpCtx);
+ ASSERT_FALSE(secondDummy->isDetachedFromOpCtx);
+ facetStage->doDetachFromOperationContext();
+ ASSERT_TRUE(firstDummy->isDetachedFromOpCtx);
+ ASSERT_TRUE(secondDummy->isDetachedFromOpCtx);
+
+ // Test reattaching.
+ facetStage->doReattachToOperationContext(ctx->opCtx);
+ ASSERT_FALSE(firstDummy->isDetachedFromOpCtx);
+ ASSERT_FALSE(secondDummy->isDetachedFromOpCtx);
+}
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_mock.cpp b/src/mongo/db/pipeline/document_source_mock.cpp
index 3db28967636..26152f07674 100644
--- a/src/mongo/db/pipeline/document_source_mock.cpp
+++ b/src/mongo/db/pipeline/document_source_mock.cpp
@@ -38,6 +38,10 @@ using boost::intrusive_ptr;
DocumentSourceMock::DocumentSourceMock(std::deque<Document> docs)
: DocumentSource(NULL), queue(std::move(docs)) {}
+DocumentSourceMock::DocumentSourceMock(std::deque<Document> docs,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx)
+ : DocumentSource(expCtx), queue(std::move(docs)) {}
+
const char* DocumentSourceMock::getSourceName() const {
return "mock";
}
@@ -47,7 +51,7 @@ Value DocumentSourceMock::serialize(bool explain) const {
}
void DocumentSourceMock::dispose() {
- disposed = true;
+ isDisposed = true;
}
intrusive_ptr<DocumentSourceMock> DocumentSourceMock::create(std::deque<Document> docs) {
@@ -77,7 +81,8 @@ intrusive_ptr<DocumentSourceMock> DocumentSourceMock::create(
}
boost::optional<Document> DocumentSourceMock::getNext() {
- invariant(!disposed);
+ invariant(!isDisposed);
+ invariant(!isDetachedFromOpCtx);
if (queue.empty()) {
return {};
diff --git a/src/mongo/db/pipeline/document_source_tee_consumer.cpp b/src/mongo/db/pipeline/document_source_tee_consumer.cpp
new file mode 100644
index 00000000000..321d228b7cc
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_tee_consumer.cpp
@@ -0,0 +1,81 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source_tee_consumer.h"
+
+#include <boost/intrusive_ptr.hpp>
+#include <boost/optional.hpp>
+#include <vector>
+
+#include "mongo/db/pipeline/document.h"
+#include "mongo/db/pipeline/expression_context.h"
+
+namespace mongo {
+
+using boost::intrusive_ptr;
+
+DocumentSourceTeeConsumer::DocumentSourceTeeConsumer(const intrusive_ptr<ExpressionContext>& expCtx,
+ const intrusive_ptr<TeeBuffer>& bufferSource)
+ : DocumentSource(expCtx), _bufferSource(bufferSource), _iterator() {}
+
+boost::intrusive_ptr<DocumentSourceTeeConsumer> DocumentSourceTeeConsumer::create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const boost::intrusive_ptr<TeeBuffer>& bufferSource) {
+ return new DocumentSourceTeeConsumer(expCtx, bufferSource);
+}
+
+boost::optional<Document> DocumentSourceTeeConsumer::getNext() {
+ pExpCtx->checkForInterrupt();
+
+ if (!_initialized) {
+ _bufferSource->populate();
+ _initialized = true;
+ _iterator = _bufferSource->begin();
+ }
+
+ if (_iterator == _bufferSource->end()) {
+ return boost::none;
+ }
+
+ return {*_iterator++};
+}
+
+void DocumentSourceTeeConsumer::dispose() {
+ // Release our reference to the buffer. We shouldn't call dispose() on the buffer, since there
+ // might be other consumers that need to use it.
+ _bufferSource.reset();
+}
+
+Value DocumentSourceTeeConsumer::serialize(bool explain) const {
+ // This stage will be inserted into the beginning of a pipeline, but should not show up in the
+ // explain output.
+ return Value();
+}
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_tee_consumer.h b/src/mongo/db/pipeline/document_source_tee_consumer.h
new file mode 100644
index 00000000000..d274f3995f5
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_tee_consumer.h
@@ -0,0 +1,68 @@
+/**
+ * Copyright (C) 2016 MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/intrusive_ptr.hpp>
+#include <boost/optional.hpp>
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/db/pipeline/tee_buffer.h"
+
+namespace mongo {
+
+class Document;
+struct ExpressionContext;
+class Value;
+
+/**
+ * This stage acts as a proxy between a pipeline within a $facet stage and the buffer of incoming
+ * documents held in a TeeBuffer stage. It will simply open an iterator on the TeeBuffer stage, and
+ * answer calls to getNext() by advancing said iterator.
+ */
+class DocumentSourceTeeConsumer : public DocumentSource {
+public:
+ static boost::intrusive_ptr<DocumentSourceTeeConsumer> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const boost::intrusive_ptr<TeeBuffer>& bufferSource);
+
+ void dispose() final;
+ boost::optional<Document> getNext() final;
+
+ Value serialize(bool explain = false) const final;
+
+private:
+ DocumentSourceTeeConsumer(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const boost::intrusive_ptr<TeeBuffer>& bufferSource);
+
+ bool _initialized = false;
+ boost::intrusive_ptr<TeeBuffer> _bufferSource;
+ TeeBuffer::const_iterator _iterator;
+};
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/field_path.h b/src/mongo/db/pipeline/field_path.h
index 6cec754815c..3f26891cd2b 100644
--- a/src/mongo/db/pipeline/field_path.h
+++ b/src/mongo/db/pipeline/field_path.h
@@ -94,10 +94,10 @@ public:
*/
FieldPath tail() const;
-private:
/** Uassert if a field name does not pass validation. */
static void uassertValidFieldName(const std::string& fieldName);
+private:
/**
* Push a new field name to the back of the vector of names comprising the field path.
* Uassert if 'fieldName' does not pass validation.
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 2dc5ffc7899..3b76d574cc7 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -59,6 +59,9 @@ namespace dps = ::mongo::dotted_path_support;
Pipeline::Pipeline(const intrusive_ptr<ExpressionContext>& pTheCtx) : pCtx(pTheCtx) {}
+Pipeline::Pipeline(SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx)
+ : _sources(stages), pCtx(expCtx) {}
+
StatusWith<intrusive_ptr<Pipeline>> Pipeline::parse(
const std::vector<BSONObj>& rawPipeline, const intrusive_ptr<ExpressionContext>& expCtx) {
intrusive_ptr<Pipeline> pipeline(new Pipeline(expCtx));
@@ -73,9 +76,18 @@ StatusWith<intrusive_ptr<Pipeline>> Pipeline::parse(
if (!status.isOK()) {
return status;
}
+ pipeline->stitch();
+ return pipeline;
+}
- pipeline->optimizePipeline();
-
+StatusWith<intrusive_ptr<Pipeline>> Pipeline::create(
+ SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) {
+ intrusive_ptr<Pipeline> pipeline(new Pipeline(stages, expCtx));
+ auto status = pipeline->ensureAllStagesAreInLegalPositions();
+ if (!status.isOK()) {
+ return status;
+ }
+ pipeline->stitch();
return pipeline;
}
@@ -391,6 +403,9 @@ vector<Value> Pipeline::writeExplainOps() const {
}
void Pipeline::addInitialSource(intrusive_ptr<DocumentSource> source) {
+ if (!_sources.empty()) {
+ _sources.front()->setSource(source.get());
+ }
_sources.push_front(source);
}
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index 84a8333ff8e..66bef44c324 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -58,13 +58,23 @@ public:
/**
* Parses a Pipeline from a BSONElement representing a list of DocumentSources. Returns a non-OK
- * status if it failed to parse.
+ * status if it failed to parse. The returned pipeline is not optimized, but the caller may
+ * convert it to an optimized pipeline by calling optimizePipeline().
*/
static StatusWith<boost::intrusive_ptr<Pipeline>> parse(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx);
/**
+ * Creates a Pipeline from an existing SourceContainer.
+ *
+ * Returns a non-OK status if any stage is in an invalid position. For example, if an $out stage
+ * is present but is not the last stage.
+ */
+ static StatusWith<boost::intrusive_ptr<Pipeline>> create(
+ SourceContainer sources, const boost::intrusive_ptr<ExpressionContext>& expCtx);
+
+ /**
* Helper to implement Command::checkAuthForCommand.
*/
static Status checkAuthForCommand(ClientBasic* client,
@@ -172,6 +182,9 @@ public:
*/
DepsTracker getDependencies(const BSONObj& initialQuery) const;
+ const SourceContainer& getSources() {
+ return _sources;
+ }
/*
PipelineD is a "sister" class that has additional functionality
@@ -198,6 +211,7 @@ private:
friend class Optimizations::Sharded;
Pipeline(const boost::intrusive_ptr<ExpressionContext>& pCtx);
+ Pipeline(SourceContainer stages, const boost::intrusive_ptr<ExpressionContext>& pCtx);
/**
* Stitch together the source pointers by calling setSource() for each source in '_sources'.
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 1bda01e3968..c08278c4737 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -89,6 +89,7 @@ public:
AggregationRequest request(NamespaceString("a.collection"), rawPipeline);
intrusive_ptr<ExpressionContext> ctx = new ExpressionContext(&_opCtx, request);
auto outputPipe = uassertStatusOK(Pipeline::parse(request.getPipeline(), ctx));
+ outputPipe->optimizePipeline();
ASSERT_EQUALS(Value(outputPipe->writeExplainOps()), Value(outputPipeExpected["pipeline"]));
ASSERT_EQUALS(Value(outputPipe->serialize()), Value(serializePipeExpected["pipeline"]));
@@ -735,6 +736,7 @@ public:
AggregationRequest request(NamespaceString("a.collection"), rawPipeline);
intrusive_ptr<ExpressionContext> ctx = new ExpressionContext(&_opCtx, request);
mergePipe = uassertStatusOK(Pipeline::parse(request.getPipeline(), ctx));
+ mergePipe->optimizePipeline();
shardPipe = mergePipe->splitForSharded();
ASSERT(shardPipe != nullptr);
diff --git a/src/mongo/db/pipeline/tee_buffer.cpp b/src/mongo/db/pipeline/tee_buffer.cpp
new file mode 100644
index 00000000000..9119665748e
--- /dev/null
+++ b/src/mongo/db/pipeline/tee_buffer.cpp
@@ -0,0 +1,78 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/tee_buffer.h"
+
+#include "mongo/db/pipeline/document.h"
+#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/db/pipeline/value.h"
+
+namespace mongo {
+
+TeeBuffer::TeeBuffer(uint64_t maxMemoryUsageBytes)
+ : _maxMemoryUsageBytes(maxMemoryUsageBytes), _buffer() {}
+
+boost::intrusive_ptr<TeeBuffer> TeeBuffer::create(uint64_t maxMemoryUsageBytes) {
+ return new TeeBuffer(maxMemoryUsageBytes);
+}
+
+TeeBuffer::const_iterator TeeBuffer::begin() const {
+ invariant(_populated);
+ return _buffer.begin();
+}
+
+TeeBuffer::const_iterator TeeBuffer::end() const {
+ invariant(_populated);
+ return _buffer.end();
+}
+
+void TeeBuffer::dispose() {
+ _buffer.clear();
+ _populated = false; // Set this to ensure no one is calling begin() or end().
+}
+
+void TeeBuffer::populate() {
+ invariant(_source);
+ if (_populated) {
+ return;
+ }
+ _populated = true;
+
+ size_t estimatedMemoryUsageBytes = 0;
+ while (auto next = _source->getNext()) {
+ estimatedMemoryUsageBytes += next->getApproximateSize();
+ uassert(40174,
+ "Exceeded memory limit for $facet",
+ estimatedMemoryUsageBytes <= _maxMemoryUsageBytes);
+
+ _buffer.emplace_back(std::move(*next));
+ }
+}
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/tee_buffer.h b/src/mongo/db/pipeline/tee_buffer.h
new file mode 100644
index 00000000000..57ce3a611d7
--- /dev/null
+++ b/src/mongo/db/pipeline/tee_buffer.h
@@ -0,0 +1,88 @@
+/**
+ * Copyright (C) 2016 MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/intrusive_ptr.hpp>
+#include <boost/optional.hpp>
+#include <vector>
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/util/assert_util.h"
+
+namespace mongo {
+
+class Document;
+struct ExpressionContext;
+class Value;
+
+/**
+ * This stage takes a stream of input documents and makes them available to multiple consumers. To
+ * do so, it will buffer all incoming documents up to the configured memory limit, then provide
+ * access to that buffer via an iterator.
+ *
+ * TODO SERVER-24153: This stage should be able to spill to disk if allowed to and the memory limit
+ * has been exceeded.
+ */
+class TeeBuffer : public RefCountable {
+public:
+ using const_iterator = std::vector<Document>::const_iterator;
+
+ static const uint64_t kMaxMemoryUsageBytes = 100 * 1024 * 1024;
+
+ static boost::intrusive_ptr<TeeBuffer> create(
+ uint64_t maxMemoryUsageBytes = kMaxMemoryUsageBytes);
+
+ void setSource(const boost::intrusive_ptr<DocumentSource>& source) {
+ _source = source;
+ }
+
+ /**
+ * Clears '_buffer'. Once dispose() is called, all iterators are invalid, and it is illegal to
+ * call begin() or end().
+ */
+ void dispose();
+
+ /**
+ * Populates the buffer by consuming all input from '_source'. This must be called before
+ * calling begin() or end().
+ */
+ void populate();
+
+ const_iterator begin() const;
+ const_iterator end() const;
+
+private:
+ TeeBuffer(uint64_t maxMemoryUsageBytes);
+
+ bool _populated = false;
+ uint64_t _maxMemoryUsageBytes;
+ std::vector<Document> _buffer;
+ boost::intrusive_ptr<DocumentSource> _source;
+};
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/tee_buffer_test.cpp b/src/mongo/db/pipeline/tee_buffer_test.cpp
new file mode 100644
index 00000000000..9fb2b56f1fc
--- /dev/null
+++ b/src/mongo/db/pipeline/tee_buffer_test.cpp
@@ -0,0 +1,138 @@
+/**
+ * Copyright (C) 2016 MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/tee_buffer.h"
+
+#include "mongo/db/pipeline/document.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/assert_util.h"
+
+namespace mongo {
+
+// Crutch.
// Linker crutch: unit tests run in a mongod (non-mongos) context, so the
// required isMongos() symbol always reports false here.
bool isMongos() {
    return false;
}
+
+namespace {
+
+TEST(TeeBufferTest, ShouldProduceEmptyIteratorsWhenGivenNoInput) {
+ auto mock = DocumentSourceMock::create();
+ auto teeBuffer = TeeBuffer::create();
+ teeBuffer->setSource(mock.get());
+ teeBuffer->populate();
+
+ // There are no inputs, so begin() should equal end().
+ ASSERT(teeBuffer->begin() == teeBuffer->end());
+}
+
+TEST(TeeBufferTest, ShouldProvideIteratorOverSingleDocument) {
+ auto inputDoc = Document{{"a", 1}};
+ auto mock = DocumentSourceMock::create(inputDoc);
+ auto teeBuffer = TeeBuffer::create();
+ teeBuffer->setSource(mock.get());
+ teeBuffer->populate();
+
+ // Should be able to establish an iterator and get the document back.
+ auto it = teeBuffer->begin();
+ ASSERT(it != teeBuffer->end());
+ ASSERT_EQ(*it, inputDoc);
+ ++it;
+ ASSERT(it == teeBuffer->end());
+}
+
+TEST(TeeBufferTest, ShouldProvideIteratorOverTwoDocuments) {
+ std::deque<Document> inputDocs = {Document{{"a", 1}}, Document{{"a", 2}}};
+ auto mock = DocumentSourceMock::create(inputDocs);
+ auto teeBuffer = TeeBuffer::create();
+ teeBuffer->setSource(mock.get());
+ teeBuffer->populate();
+
+ auto it = teeBuffer->begin();
+ ASSERT(it != teeBuffer->end());
+ ASSERT_EQ(*it, inputDocs.front());
+ ++it;
+ ASSERT(it != teeBuffer->end());
+ ASSERT_EQ(*it, inputDocs.back());
+ ++it;
+ ASSERT(it == teeBuffer->end());
+}
+
+TEST(TeeBufferTest, ShouldBeAbleToProvideMultipleIteratorsOverTheSameInputs) {
+ std::deque<Document> inputDocs = {Document{{"a", 1}}, Document{{"a", 2}}};
+ auto mock = DocumentSourceMock::create(inputDocs);
+ auto teeBuffer = TeeBuffer::create();
+ teeBuffer->setSource(mock.get());
+ teeBuffer->populate();
+
+ auto firstIt = teeBuffer->begin();
+ auto secondIt = teeBuffer->begin();
+
+ // Advance both once.
+ ASSERT(firstIt != teeBuffer->end());
+ ASSERT_EQ(*firstIt, inputDocs.front());
+ ++firstIt;
+ ASSERT(secondIt != teeBuffer->end());
+ ASSERT_EQ(*secondIt, inputDocs.front());
+ ++secondIt;
+
+ // Advance them both again.
+ ASSERT(firstIt != teeBuffer->end());
+ ASSERT_EQ(*firstIt, inputDocs.back());
+ ++firstIt;
+ ASSERT(secondIt != teeBuffer->end());
+ ASSERT_EQ(*secondIt, inputDocs.back());
+ ++secondIt;
+
+ // Assert they've both reached the end.
+ ASSERT(firstIt == teeBuffer->end());
+ ASSERT(secondIt == teeBuffer->end());
+}
+
+TEST(TeeBufferTest, ShouldErrorWhenBufferingTooManyDocuments) {
+ // Queue up at least 2000 bytes of input from a mock stage.
+ std::deque<Document> inputs;
+ auto largeStr = std::string(1000, 'y');
+ auto inputDoc = Document{{"x", largeStr}};
+ ASSERT_GTE(inputDoc.getApproximateSize(), 1000UL);
+ inputs.push_back(inputDoc);
+ inputs.push_back(Document{{"x", largeStr}});
+ auto mock = DocumentSourceMock::create(inputs);
+
+ const uint64_t maxMemoryUsageBytes = 1000;
+ auto teeBuffer = TeeBuffer::create(maxMemoryUsageBytes);
+ teeBuffer->setSource(mock.get());
+
+ // Should exceed the configured memory limit.
+ ASSERT_THROWS(teeBuffer->populate(), UserException);
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
index 46cf44b9570..0dea97cdf7f 100644
--- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp
+++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
@@ -127,11 +127,12 @@ public:
mergeCtx->inRouter = true;
// explicitly *not* setting mergeCtx->tempDir
- // Parse the pipeline specification.
+ // Parse and optimize the pipeline specification.
auto pipeline = Pipeline::parse(request.getValue().getPipeline(), mergeCtx);
if (!pipeline.isOK()) {
return appendCommandStatus(result, pipeline.getStatus());
}
+ pipeline.getValue()->optimizePipeline();
for (auto&& ns : pipeline.getValue()->getInvolvedCollections()) {
uassert(