author | Charlie Swanson <charlie.swanson@mongodb.com> | 2016-06-09 19:09:42 -0400
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2016-06-24 11:51:21 -0400
commit | 46b4dfbdefe018e1f125a5ebac8b584b20274502 (patch)
tree | 9c6fea4905fdb235833d2f2010231dbacdcffb2f
parent | 20e9b2798d69fb2367ff9f16a1b30f4f9b73d93b (diff)
download | mongo-46b4dfbdefe018e1f125a5ebac8b584b20274502.tar.gz
SERVER-23654 Add $facet aggregation stage
21 files changed, 1485 insertions, 10 deletions
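Editor's note: a minimal shell sketch of the stage this commit introduces, distilled from the tests and stage implementation in the diff below. The `tvs` collection and its fields are hypothetical, not part of this patch. Each named sub-pipeline receives every input document, and the stage emits a single document containing one array of results per facet.

```js
// Hypothetical collection of TV documents: {manufacturer: <string>, price: <double>}.
// Both sub-pipelines below see the same input documents.
db.tvs.aggregate([{
    $facet: {
        // Most common manufacturers, descending by count.
        manufacturers: [
            {$group: {_id: "$manufacturer", count: {$sum: 1}}},
            {$sort: {count: -1}}
        ],
        // The single cheapest TV.
        cheapest: [{$sort: {price: 1}}, {$limit: 1}]
    }
}]);
// Produces one document: {manufacturers: [...], cheapest: [...]}
```

Per the validation logic in `DocumentSourceFacet::createFromBson`, sub-pipelines may not be empty and may not contain `$out`, a nested `$facet`, or any stage that must be first in a pipeline (error codes 40172, 40173); per `TeeBuffer`, the buffered input is capped at 100MB (error code 40174).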
diff --git a/jstests/aggregation/sources/facet/inner_graphlookup.js b/jstests/aggregation/sources/facet/inner_graphlookup.js new file mode 100644 index 00000000000..42d10149240 --- /dev/null +++ b/jstests/aggregation/sources/facet/inner_graphlookup.js @@ -0,0 +1,39 @@ +/** + * Tests that using a $graphLookup stage inside of a $facet stage will yield the same results as + * using the $graphLookup stage outside of the $facet stage. + */ +(function() { + "use strict"; + + // We will only use one collection, the $graphLookup will look up from the same collection. + var graphColl = db.facetGraphLookup; + + // The graph in ASCII form: 0 --- 1 --- 2 3 + graphColl.drop(); + assert.writeOK(graphColl.insert({_id: 0, edges: [1]})); + assert.writeOK(graphColl.insert({_id: 1, edges: [0, 2]})); + assert.writeOK(graphColl.insert({_id: 2, edges: [1]})); + assert.writeOK(graphColl.insert({_id: 3})); + + // For each document in the collection, this will compute all the other documents that are + // reachable from this one. + const graphLookupStage = { + $graphLookup: { + from: graphColl.getName(), + startWith: "$_id", + connectFromField: "edges", + connectToField: "_id", + as: "connected" + } + }; + const normalResults = graphColl.aggregate([graphLookupStage]).toArray(); + const facetedResults = graphColl.aggregate([{$facet: {nested: [graphLookupStage]}}]).toArray(); + assert.eq(facetedResults, [{nested: normalResults}]); + + const normalResultsUnwound = + graphColl.aggregate([graphLookupStage, {$unwind: "$connected"}]).toArray(); + const facetedResultsUnwound = + graphColl.aggregate([{$facet: {nested: [graphLookupStage, {$unwind: "$connected"}]}}]) + .toArray(); + assert.eq(facetedResultsUnwound, [{nested: normalResultsUnwound}]); +}()); diff --git a/jstests/aggregation/sources/facet/inner_lookup.js b/jstests/aggregation/sources/facet/inner_lookup.js new file mode 100644 index 00000000000..f4890324b65 --- /dev/null +++ b/jstests/aggregation/sources/facet/inner_lookup.js @@ -0,0 +1,32 @@ +/** + * Tests that using a $lookup stage inside of a $facet stage will yield the same results as using + * the $lookup stage outside of the $facet stage. + */ +(function() { + "use strict"; + + var local = db.facetLookupLocal; + var foreign = db.facetLookupForeign; + + local.drop(); + assert.writeOK(local.insert({_id: 0})); + assert.writeOK(local.insert({_id: 1})); + + foreign.drop(); + assert.writeOK(foreign.insert({_id: 0, foreignKey: 0})); + assert.writeOK(foreign.insert({_id: 1, foreignKey: 1})); + assert.writeOK(foreign.insert({_id: 2, foreignKey: 2})); + + const lookupStage = { + $lookup: + {from: foreign.getName(), localField: "_id", foreignField: "foreignKey", as: "joined"} + }; + const lookupResults = local.aggregate([lookupStage]).toArray(); + const facetedLookupResults = local.aggregate([{$facet: {nested: [lookupStage]}}]).toArray(); + assert.eq(facetedLookupResults, [{nested: lookupResults}]); + + const lookupResultsUnwound = local.aggregate([lookupStage, {$unwind: "$joined"}]).toArray(); + const facetedLookupResultsUnwound = + local.aggregate([{$facet: {nested: [lookupStage, {$unwind: "$joined"}]}}]).toArray(); + assert.eq(facetedLookupResultsUnwound, [{nested: lookupResultsUnwound}]); +}()); diff --git a/jstests/aggregation/sources/facet/use_cases.js b/jstests/aggregation/sources/facet/use_cases.js new file mode 100644 index 00000000000..1bb3cefdd40 --- /dev/null +++ b/jstests/aggregation/sources/facet/use_cases.js @@ -0,0 +1,137 @@ +/** + * Tests some practical use cases of the $facet stage. 
+ */ +(function() { + "use strict"; + const dbName = "test"; + const collName = jsTest.name(); + + Random.setRandomSeed(); + + /** + * Helper to get a random entry out of an array. + */ + function randomChoice(array) { + return array[Random.randInt(array.length)]; + } + + /** + * Helper to generate a randomized document with the following schema: + * { + * manufacturer: <string>, + * price: <double>, + * screenSize: <double> + * } + */ + function generateRandomDocument() { + const manufacturers = + ["Sony", "Samsung", "LG", "Panasonic", "Mitsubishi", "Vizio", "Toshiba", "Sharp"]; + const minPrice = 100; + const maxPrice = 4000; + const minScreenSize = 18; + const maxScreenSize = 40; + + return { + manufacturer: randomChoice(manufacturers), + price: Random.randInt(maxPrice - minPrice + 1) + minPrice, + screenSize: Random.randInt(maxScreenSize - minScreenSize + 1) + minScreenSize, + }; + } + + function doExecutionTest(conn) { + var coll = conn.getDB(dbName).getCollection(collName); + coll.drop(); + + const nDocs = 1000 * 10; + var bulk = coll.initializeUnorderedBulkOp(); + for (var i = 0; i < nDocs; i++) { + const doc = generateRandomDocument(); + bulk.insert(doc); + } + assert.writeOK(bulk.execute()); + + // + // Compute the most common manufacturers, and the number of TVs in each price range. + // + + // First compute each separately, to make sure we have the correct results. + const manufacturerPipe = + [{$group: {_id: "$manufacturer", count: {$sum: 1}}}, {$sort: {count: -1}}]; + const mostCommonManufacturers = coll.aggregate(manufacturerPipe).toArray(); + + const pricePipe = [ + { + $project: { + priceBucket: { + $switch: { + branches: [ + {case: {$lt: ["$price", 500]}, then: "< 500"}, + {case: {$lt: ["$price", 1000]}, then: "500-1000"}, + {case: {$lt: ["$price", 1500]}, then: "1000-1500"}, + {case: {$lt: ["$price", 2000]}, then: "1500-2000"} + ], + default: "> 2000" + } + } + } + }, + {$group: {_id: "$priceBucket", count: {$sum: 1}}}, + {$sort: {count: -1}} + ]; + const numTVsByPriceRange = coll.aggregate(pricePipe).toArray(); + + // Then compute the results using $facet. + const facetResult = + coll.aggregate([{$facet: {manufacturers: manufacturerPipe, prices: pricePipe}}]) + .toArray(); + assert.eq(facetResult.length, 1); + const facetManufacturers = facetResult[0].manufacturers; + const facetPrices = facetResult[0].prices; + + // Then assert they are the same. + assert.eq(facetManufacturers, mostCommonManufacturers); + assert.eq(facetPrices, numTVsByPriceRange); + } + + // Test against the standalone started by resmoke.py. + const conn = db.getMongo(); + doExecutionTest(conn); + + // Test against a sharded cluster. + const st = new ShardingTest({shards: 2}); + doExecutionTest(st.s0); + + // Test that $facet stage propagates information about involved collections, preventing users + // from doing things like $lookup from a sharded collection. 
+ const shardedDBName = "sharded"; + const shardedCollName = "collection"; + const shardedColl = st.getDB(shardedDBName).getCollection(shardedCollName); + const unshardedColl = st.getDB(shardedDBName).getCollection(collName); + + assert.commandWorked(st.admin.runCommand({enableSharding: shardedDBName})); + assert.commandWorked( + st.admin.runCommand({shardCollection: shardedColl.getFullName(), key: {_id: 1}})); + assert.commandFailed(unshardedColl.runCommand({ + aggregate: unshardedColl, + pipeline: [{ + $lookup: + {from: shardedCollName, localField: "_id", foreignField: "_id", as: "results"} + }] + })); + assert.commandFailed(unshardedColl.runCommand({ + aggregate: unshardedColl, + pipeline: [{ + $facet: { + a: [{ + $lookup: { + from: shardedCollName, + localField: "_id", + foreignField: "_id", + as: "results" + } + }] + } + }] + })); + st.stop(); +}()); diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp index 7d17775d507..c0b0e5fd833 100644 --- a/src/mongo/db/commands/pipeline_command.cpp +++ b/src/mongo/db/commands/pipeline_command.cpp @@ -186,6 +186,7 @@ boost::intrusive_ptr<Pipeline> reparsePipeline( fassertFailedWithStatusNoTrace(40175, reparsedPipeline.getStatus()); } + reparsedPipeline.getValue()->optimizePipeline(); return reparsedPipeline.getValue(); } @@ -261,6 +262,7 @@ public: return appendCommandStatus(result, statusWithPipeline.getStatus()); } auto pipeline = std::move(statusWithPipeline.getValue()); + pipeline->optimizePipeline(); if (kDebugBuild && !expCtx->isExplain && !expCtx->inShard) { // Make sure all operations round-trip through Pipeline::serialize() correctly by diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index fb8ac72df4d..32c0f82eae8 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -9,6 +9,7 @@ env.Library( LIBDEPS=[ 'aggregation_request', 'document_source', + 'document_source_facet', 'expression_context', 'pipeline', ] @@ -217,6 +218,40 @@ env.Library( ] ) +env.Library( + target='document_source_facet', + source=[ + 'document_source_facet.cpp', + 'tee_buffer.cpp', + 'document_source_tee_consumer.cpp', + ], + LIBDEPS=[ + 'document_source', + 'pipeline', + ] +) + +env.CppUnitTest( + target='document_source_facet_test', + source='document_source_facet_test.cpp', + LIBDEPS=[ + 'document_source_facet', + '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', + '$BUILD_DIR/mongo/db/query/query_test_service_context', + '$BUILD_DIR/mongo/db/service_context_noop_init', + ], +) + +env.CppUnitTest( + target='tee_buffer_test', + source='tee_buffer_test.cpp', + LIBDEPS=[ + 'document_source_facet', + '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', + '$BUILD_DIR/mongo/db/service_context_noop_init', + ], +) + env.CppUnitTest( target='agg_expression_test', source='expression_test.cpp', diff --git a/src/mongo/db/pipeline/aggregation_context_fixture.h b/src/mongo/db/pipeline/aggregation_context_fixture.h new file mode 100644 index 00000000000..75353c46e7f --- /dev/null +++ b/src/mongo/db/pipeline/aggregation_context_fixture.h @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2016 MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation.
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <boost/intrusive_ptr.hpp> +#include <memory> + +#include "mongo/db/client.h" +#include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/query/query_test_service_context.h" +#include "mongo/db/service_context_noop.h" +#include "mongo/stdx/memory.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { + +/** + * Test fixture which provides an ExpressionContext for use in testing. + */ +class AggregationContextFixture : public unittest::Test { +public: + AggregationContextFixture() + : _queryServiceContext(stdx::make_unique<QueryTestServiceContext>()), + _opCtx(_queryServiceContext->makeOperationContext()), + _expCtx(new ExpressionContext( + _opCtx.get(), AggregationRequest(NamespaceString("unittests.pipeline_test"), {}))) {} + + boost::intrusive_ptr<ExpressionContext> getExpCtx() { + return _expCtx.get(); + } + +private: + std::unique_ptr<QueryTestServiceContext> _queryServiceContext; + ServiceContext::UniqueOperationContext _opCtx; + boost::intrusive_ptr<ExpressionContext> _expCtx; +}; +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index f0da2180f2c..df3f4eecaf5 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -360,11 +360,17 @@ public: DocumentSourceNeedsMongod(const boost::intrusive_ptr<ExpressionContext>& expCtx) : DocumentSource(expCtx) {} - virtual void injectMongodInterface(std::shared_ptr<MongodInterface> mongod) { + void injectMongodInterface(std::shared_ptr<MongodInterface> mongod) { _mongod = mongod; + doInjectMongodInterface(mongod); } - void detachFromOperationContext() final { + /** + * Derived classes may override this method to register custom inject functionality. 
+ */ + virtual void doInjectMongodInterface(std::shared_ptr<MongodInterface> mongod) {} + + void detachFromOperationContext() override { invariant(_mongod); _mongod->setOperationContext(nullptr); doDetachFromOperationContext(); @@ -871,6 +877,8 @@ private: class DocumentSourceMock : public DocumentSource { public: DocumentSourceMock(std::deque<Document> docs); + DocumentSourceMock(std::deque<Document> docs, + const boost::intrusive_ptr<ExpressionContext>& expCtx); boost::optional<Document> getNext() override; const char* getSourceName() const override; @@ -892,9 +900,24 @@ public: static boost::intrusive_ptr<DocumentSourceMock> create( const std::initializer_list<const char*>& jsons); + void reattachToOperationContext(OperationContext* opCtx) { + isDetachedFromOpCtx = false; + } + + void detachFromOperationContext() { + isDetachedFromOpCtx = true; + } + + boost::intrusive_ptr<DocumentSource> optimize() override { + isOptimized = true; + return this; + } + // Return documents from front of queue. std::deque<Document> queue; - bool disposed = false; + bool isDisposed = false; + bool isDetachedFromOpCtx = false; + bool isOptimized = false; BSONObjSet sorts; }; diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp new file mode 100644 index 00000000000..72a69ade0db --- /dev/null +++ b/src/mongo/db/pipeline/document_source_facet.cpp @@ -0,0 +1,200 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/document_source_facet.h" + +#include <vector> + +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/bson/bsontypes.h" +#include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/document_source_tee_consumer.h" +#include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/pipeline/pipeline.h" +#include "mongo/db/pipeline/tee_buffer.h" +#include "mongo/db/pipeline/value.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/mongoutils/str.h" + +namespace mongo { + +using boost::intrusive_ptr; +using std::vector; + +DocumentSourceFacet::DocumentSourceFacet(StringMap<intrusive_ptr<Pipeline>> facetPipelines, + const intrusive_ptr<ExpressionContext>& expCtx) + : DocumentSourceNeedsMongod(expCtx), _facetPipelines(std::move(facetPipelines)) { + + // Build the tee stage, and the consumers of the tee. + _teeBuffer = TeeBuffer::create(); + for (auto&& facet : _facetPipelines) { + auto pipeline = facet.second; + pipeline->addInitialSource(DocumentSourceTeeConsumer::create(pExpCtx, _teeBuffer)); + } +} + +REGISTER_DOCUMENT_SOURCE(facet, DocumentSourceFacet::createFromBson); + +intrusive_ptr<DocumentSourceFacet> DocumentSourceFacet::create( + StringMap<intrusive_ptr<Pipeline>> facetPipelines, + const intrusive_ptr<ExpressionContext>& expCtx) { + return new DocumentSourceFacet(std::move(facetPipelines), expCtx); +} + +void DocumentSourceFacet::setSource(DocumentSource* source) { + _teeBuffer->setSource(source); +} + +boost::optional<Document> DocumentSourceFacet::getNext() { + pExpCtx->checkForInterrupt(); + + if (_done) { + return boost::none; + } + _done = true; // We will only ever produce one result. + + // Build the results by executing each pipeline serially, one at a time. + MutableDocument results; + for (auto&& facet : _facetPipelines) { + auto facetName = facet.first; + auto facetPipeline = facet.second; + + std::vector<Value> facetResults; + while (auto next = facetPipeline->getSources().back()->getNext()) { + facetResults.emplace_back(std::move(*next)); + } + results[facetName] = Value(std::move(facetResults)); + } + + _teeBuffer->dispose(); // Clear the buffer since we'll no longer need it. + return results.freeze(); +} + +Value DocumentSourceFacet::serialize(bool explain) const { + MutableDocument serialized; + for (auto&& facet : _facetPipelines) { + serialized[facet.first] = + Value(explain ? 
facet.second->writeExplainOps() : facet.second->serialize()); + } + return Value(Document{{"$facet", serialized.freezeToValue()}}); +} + +void DocumentSourceFacet::addInvolvedCollections(std::vector<NamespaceString>* collections) const { + for (auto&& facet : _facetPipelines) { + for (auto&& source : facet.second->getSources()) { + source->addInvolvedCollections(collections); + } + } +} + +intrusive_ptr<DocumentSource> DocumentSourceFacet::optimize() { + for (auto&& facet : _facetPipelines) { + facet.second->optimizePipeline(); + } + return this; +} + +void DocumentSourceFacet::doInjectMongodInterface(std::shared_ptr<MongodInterface> mongod) { + for (auto&& facet : _facetPipelines) { + for (auto&& stage : facet.second->getSources()) { + if (auto stageNeedingMongod = dynamic_cast<DocumentSourceNeedsMongod*>(stage.get())) { + stageNeedingMongod->injectMongodInterface(mongod); + } + } + } +} + +void DocumentSourceFacet::doDetachFromOperationContext() { + for (auto&& facet : _facetPipelines) { + facet.second->detachFromOperationContext(); + } +} + +void DocumentSourceFacet::doReattachToOperationContext(OperationContext* opCtx) { + for (auto&& facet : _facetPipelines) { + facet.second->reattachToOperationContext(opCtx); + } +} + +intrusive_ptr<DocumentSource> DocumentSourceFacet::createFromBson( + BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) { + uassert(40169, + str::stream() << "the $facet specification must be a non-empty object, but found: " + << elem, + elem.type() == BSONType::Object && !elem.embeddedObject().isEmpty()); + + StringMap<intrusive_ptr<Pipeline>> facetPipelines; + for (auto&& facetElem : elem.embeddedObject()) { + const auto facetName = facetElem.fieldNameStringData(); + FieldPath::uassertValidFieldName(facetName.toString()); + uassert(40170, + str::stream() << "arguments to $facet must be arrays, " << facetName << " is type " + << typeName(facetElem.type()), + facetElem.type() == BSONType::Array); + + vector<BSONObj> rawPipeline; + for (auto&& subPipeElem : facetElem.Obj()) { + uassert(40171, + str::stream() << "elements of arrays in $facet spec must be objects, " + << facetName + << " argument contained an element of type " + << typeName(subPipeElem.type()), + subPipeElem.type() == BSONType::Object); + + rawPipeline.push_back(subPipeElem.embeddedObject()); + } + + auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx)); + + uassert(40172, + str::stream() << "sub-pipelines in $facet stage cannot be empty: " + << facetElem.toString(), + !pipeline->getSources().empty()); + + // Disallow $out stages, $facet stages, and any stages that need to be the first stage in + // the pipeline. + for (auto&& stage : pipeline->getSources()) { + if ((dynamic_cast<DocumentSourceOut*>(stage.get())) || + (dynamic_cast<DocumentSourceFacet*>(stage.get())) || + (stage->isValidInitialSource())) { + uasserted(40173, + str::stream() << stage->getSourceName() + << " is not allowed to be used within a $facet stage: " + << facetElem.toString()); + } + } + + facetPipelines[facetName] = pipeline; + } + + return new DocumentSourceFacet(std::move(facetPipelines), expCtx); +} +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_facet.h b/src/mongo/db/pipeline/document_source_facet.h new file mode 100644 index 00000000000..e6ef1bb8496 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_facet.h @@ -0,0 +1,108 @@ +/** + * Copyright (C) 2016 MongoDB, Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <boost/intrusive_ptr.hpp> +#include <boost/optional.hpp> +#include <memory> +#include <vector> + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/util/string_map.h" + +namespace mongo { + +class BSONElement; +class TeeBuffer; +class DocumentSourceTeeConsumer; +struct ExpressionContext; +class NamespaceString; +class Pipeline; + +/** + * A $facet stage contains multiple sub-pipelines. Each input to the $facet stage will feed into + * each of the sub-pipelines. The $facet stage is blocking, and outputs only one document, + * containing an array of results for each sub-pipeline. + * + * For example, {$facet: {facetA: [{$skip: 1}], facetB: [{$limit: 1}]}} would describe a $facet + * stage which will produce a document like the following: + * {facetA: [<all input documents except the first one>], facetB: [<the first document>]}. + * + * TODO SERVER-24154: Should inherit from SplittableDocumentSource so that it can split in a sharded + * cluster. + */ +class DocumentSourceFacet final : public DocumentSourceNeedsMongod { +public: + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + + static boost::intrusive_ptr<DocumentSourceFacet> create( + StringMap<boost::intrusive_ptr<Pipeline>> facetPipelines, + const boost::intrusive_ptr<ExpressionContext>& expCtx); + + /** + * Blocking call. Will consume all input and produces one output document. + */ + boost::optional<Document> getNext() final; + + /** + * Optimizes inner pipelines. + */ + boost::intrusive_ptr<DocumentSource> optimize() final; + + // TODO SERVER-24640: implement getDependencies() to take a union of all dependencies of + // sub-pipelines. + + const char* getSourceName() const final { + return "$facet"; + } + + /** + * Sets 'source' as the source of '_teeBuffer'. + */ + void setSource(DocumentSource* source) final; + + // The following are overridden just to forward calls to sub-pipelines. 
+ void addInvolvedCollections(std::vector<NamespaceString>* collections) const final; + void doInjectMongodInterface(std::shared_ptr<MongodInterface> mongod) final; + void doDetachFromOperationContext() final; + void doReattachToOperationContext(OperationContext* opCtx) final; + +private: + DocumentSourceFacet(StringMap<boost::intrusive_ptr<Pipeline>> facetPipelines, + const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + + Value serialize(bool explain = false) const final; + + boost::intrusive_ptr<TeeBuffer> _teeBuffer; + StringMap<boost::intrusive_ptr<Pipeline>> _facetPipelines; + + bool _done = false; +}; +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp new file mode 100644 index 00000000000..393f97632aa --- /dev/null +++ b/src/mongo/db/pipeline/document_source_facet_test.cpp @@ -0,0 +1,346 @@ +/** + * Copyright (C) 2016 MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/document_source_facet.h" + +#include <deque> + +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/bson/bsontypes.h" +#include "mongo/bson/json.h" +#include "mongo/db/pipeline/aggregation_context_fixture.h" +#include "mongo/db/pipeline/document.h" +#include "mongo/util/assert_util.h" + +namespace mongo { + +// Crutch. +bool isMongos() { + return false; +} + +namespace { + +// This provides access to getExpCtx(), but we'll use a different name for this test suite. +using DocumentSourceFacetTest = AggregationContextFixture; + +// +// Parsing and serialization. 
+// + +TEST_F(DocumentSourceFacetTest, ShouldRejectNonObjectSpec) { + auto ctx = getExpCtx(); + auto spec = BSON("$facet" + << "string"); + ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException); + + spec = BSON("$facet" << 1); + ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException); + + spec = BSON("$facet" << BSON_ARRAY(1 << 2)); + ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException); +} + +TEST_F(DocumentSourceFacetTest, ShouldRejectEmptyObject) { + auto ctx = getExpCtx(); + auto spec = BSON("$facet" << BSONObj()); + ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException); +} + +TEST_F(DocumentSourceFacetTest, ShouldRejectFacetsWithInvalidNames) { + auto ctx = getExpCtx(); + auto spec = BSON("$facet" << BSON("" << BSON_ARRAY(BSON("$skip" << 4)))); + ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException); + + spec = BSON("$facet" << BSON("a.b" << BSON_ARRAY(BSON("$skip" << 4)))); + ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException); + + spec = BSON("$facet" << BSON("$a" << BSON_ARRAY(BSON("$skip" << 4)))); + ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException); +} + +TEST_F(DocumentSourceFacetTest, ShouldRejectNonArrayFacets) { + auto ctx = getExpCtx(); + auto spec = BSON("$facet" << BSON("a" << 1)); + ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException); + + spec = BSON("$facet" << BSON("a" << BSON_ARRAY(BSON("$skip" << 4)) << "b" << 2)); + ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException); +} + +TEST_F(DocumentSourceFacetTest, ShouldRejectEmptyPipelines) { + auto ctx = getExpCtx(); + auto spec = BSON("$facet" << BSON("a" << BSONArray())); + ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException); + + spec = BSON("$facet" << BSON("a" << BSON_ARRAY(BSON("$skip" << 4)) << "b" << BSONArray())); + ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException); +} + +TEST_F(DocumentSourceFacetTest, ShouldRejectFacetsWithStagesThatMustBeTheFirstStage) { + auto ctx = getExpCtx(); + auto spec = BSON("$facet" << BSON("a" << BSON_ARRAY(BSON("$indexStats" << BSONObj())))); + ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException); + + spec = BSON("$facet" << BSON( + "a" << BSON_ARRAY(BSON("$limit" << 1) << BSON("$indexStats" << BSONObj())))); + ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException); +} + +TEST_F(DocumentSourceFacetTest, ShouldRejectFacetsContainingAnOutStage) { + auto ctx = getExpCtx(); + auto spec = BSON("$facet" << BSON("a" << BSON_ARRAY(BSON("$out" + << "out_collection")))); + ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException); + + spec = + BSON("$facet" << BSON("a" << BSON_ARRAY(BSON("$skip" << 1) << BSON("$out" + << "out_collection")))); + ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException); + + spec = BSON("$facet" << BSON("a" << BSON_ARRAY(BSON("$out" + << "out_collection") + << BSON("$skip" << 1)))); + ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException); +} + +TEST_F(DocumentSourceFacetTest, ShouldRejectFacetsContainingAFacetStage) { + auto ctx = getExpCtx(); + auto spec = 
fromjson("{$facet: {a: [{$facet: {a: [{$skip: 2}]}}]}}"); + ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException); + + spec = fromjson("{$facet: {a: [{$skip: 2}, {$facet: {a: [{$skip: 2}]}}]}}"); + ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException); + + spec = fromjson("{$facet: {a: [{$skip: 2}], b: [{$facet: {a: [{$skip: 2}]}}]}}"); + ASSERT_THROWS(DocumentSourceFacet::createFromBson(spec.firstElement(), ctx), UserException); +} + +TEST_F(DocumentSourceFacetTest, ShouldAcceptLegalSpecification) { + auto ctx = getExpCtx(); + auto spec = BSON("$facet" << BSON("a" << BSON_ARRAY(BSON("$skip" << 4)) << "b" + << BSON_ARRAY(BSON("$limit" << 3)))); + auto facetStage = DocumentSourceFacet::createFromBson(spec.firstElement(), ctx); + ASSERT_TRUE(facetStage.get()); +} + +// +// Evaluation. +// + +/** + * A dummy DocumentSource which just passes all input along to the next stage. + */ +class DocumentSourcePassthrough : public DocumentSourceMock { +public: + DocumentSourcePassthrough(const boost::intrusive_ptr<ExpressionContext>& expCtx) + : DocumentSourceMock({}, expCtx) {} + + // We need this to be false so that it can be used in a $facet stage. + bool isValidInitialSource() const final { + return false; + } + + boost::optional<Document> getNext() final { + return pSource->getNext(); + } + + static boost::intrusive_ptr<DocumentSourcePassthrough> create( + boost::intrusive_ptr<ExpressionContext>& expCtx) { + return new DocumentSourcePassthrough(expCtx); + } +}; + +TEST_F(DocumentSourceFacetTest, SingleFacetShouldReceiveAllDocuments) { + auto ctx = getExpCtx(); + + auto dummy = DocumentSourcePassthrough::create(ctx); + + auto statusWithPipeline = Pipeline::create({dummy}, ctx); + ASSERT_OK(statusWithPipeline.getStatus()); + auto pipeline = std::move(statusWithPipeline.getValue()); + + auto facetStage = DocumentSourceFacet::create({{"results", pipeline}}, ctx); + + std::deque<Document> inputs = {Document{{"_id", 0}}, Document{{"_id", 1}}}; + auto mock = DocumentSourceMock::create(inputs); + facetStage->setSource(mock.get()); + + auto output = facetStage->getNext(); + ASSERT_TRUE(output); + ASSERT_EQ(*output, Document(fromjson("{results: [{_id: 0}, {_id: 1}]}"))); + + // Should be exhausted now. + ASSERT_FALSE(facetStage->getNext()); + ASSERT_FALSE(facetStage->getNext()); + ASSERT_FALSE(facetStage->getNext()); +} + +TEST_F(DocumentSourceFacetTest, MultipleFacetsShouldSeeTheSameDocuments) { + auto ctx = getExpCtx(); + + auto firstDummy = DocumentSourcePassthrough::create(ctx); + auto firstPipeline = uassertStatusOK(Pipeline::create({firstDummy}, ctx)); + + auto secondDummy = DocumentSourcePassthrough::create(ctx); + auto secondPipeline = uassertStatusOK(Pipeline::create({secondDummy}, ctx)); + + auto facetStage = + DocumentSourceFacet::create({{"first", firstPipeline}, {"second", secondPipeline}}, ctx); + + std::deque<Document> inputs = {Document{{"_id", 0}}, Document{{"_id", 1}}}; + auto mock = DocumentSourceMock::create(inputs); + facetStage->setSource(mock.get()); + + auto output = facetStage->getNext(); + + // The output fields are in no guaranteed order. + std::vector<Value> expectedOutputs(inputs.begin(), inputs.end()); + ASSERT_TRUE(output); + ASSERT_EQ((*output).size(), 2UL); + ASSERT_EQ((*output)["first"], Value(expectedOutputs)); + ASSERT_EQ((*output)["second"], Value(expectedOutputs)); + + // Should be exhausted now. 
+ ASSERT_FALSE(facetStage->getNext()); + ASSERT_FALSE(facetStage->getNext()); + ASSERT_FALSE(facetStage->getNext()); +} + +TEST_F(DocumentSourceFacetTest, ShouldBeAbleToEvaluateMultipleStagesWithinOneSubPipeline) { + auto ctx = getExpCtx(); + + auto firstDummy = DocumentSourcePassthrough::create(ctx); + auto secondDummy = DocumentSourcePassthrough::create(ctx); + auto pipeline = uassertStatusOK(Pipeline::create({firstDummy, secondDummy}, ctx)); + + auto facetStage = DocumentSourceFacet::create({{"subPipe", pipeline}}, ctx); + + std::deque<Document> inputs = {Document{{"_id", 0}}, Document{{"_id", 1}}}; + auto mock = DocumentSourceMock::create(inputs); + facetStage->setSource(mock.get()); + + auto output = facetStage->getNext(); + ASSERT_TRUE(output); + ASSERT_EQ(*output, Document(fromjson("{subPipe: [{_id: 0}, {_id: 1}]}"))); +} + +// +// Miscellaneous. +// + +TEST_F(DocumentSourceFacetTest, ShouldBeAbleToReParseSerializedStage) { + auto ctx = getExpCtx(); + + // Create a facet stage like the following: + // {$facet: { + // skippedOne: [{$skip: 1}], + // skippedTwo: [{$skip: 2}] + // }} + auto firstSkip = DocumentSourceSkip::create(ctx); + firstSkip->setSkip(1); + auto firstPipeline = uassertStatusOK(Pipeline::create({firstSkip}, ctx)); + + auto secondSkip = DocumentSourceSkip::create(ctx); + secondSkip->setSkip(2); + auto secondPipeline = uassertStatusOK(Pipeline::create({secondSkip}, ctx)); + + auto facetStage = DocumentSourceFacet::create( + {{"skippedOne", firstPipeline}, {"skippedTwo", secondPipeline}}, ctx); + + // Serialize the facet stage. + std::vector<Value> serialization; + facetStage->serializeToArray(serialization); + ASSERT_EQ(serialization.size(), 1UL); + ASSERT_EQ(serialization[0].getType(), BSONType::Object); + + // The fields are in no guaranteed order, so we can't make a simple Document comparison. + ASSERT_EQ(serialization[0].getDocument().size(), 1UL); + ASSERT_EQ(serialization[0].getDocument()["$facet"].getType(), BSONType::Object); + + // Should have two fields: "skippedOne" and "skippedTwo". + auto serializedStage = serialization[0].getDocument()["$facet"].getDocument(); + ASSERT_EQ(serializedStage.size(), 2UL); + ASSERT_EQ(serializedStage["skippedOne"], + Value(std::vector<Value>{Value(Document{{"$skip", 1}})})); + ASSERT_EQ(serializedStage["skippedTwo"], + Value(std::vector<Value>{Value(Document{{"$skip", 2}})})); + + auto serializedBson = serialization[0].getDocument().toBson(); + auto roundTripped = DocumentSourceFacet::createFromBson(serializedBson.firstElement(), ctx); + + // Serialize one more time to make sure we get the same thing. 
+ std::vector<Value> newSerialization; + roundTripped->serializeToArray(newSerialization); + + ASSERT_EQ(newSerialization.size(), 1UL); + ASSERT_EQ(newSerialization[0], serialization[0]); +} + +TEST_F(DocumentSourceFacetTest, ShouldOptimizeInnerPipelines) { + auto ctx = getExpCtx(); + + auto dummy = DocumentSourcePassthrough::create(ctx); + auto pipeline = uassertStatusOK(Pipeline::create({dummy}, ctx)); + + auto facetStage = DocumentSourceFacet::create({{"subPipe", pipeline}}, ctx); + + ASSERT_FALSE(dummy->isOptimized); + facetStage->optimize(); + ASSERT_TRUE(dummy->isOptimized); +} + +TEST_F(DocumentSourceFacetTest, ShouldPropagateDetachingAndReattachingOfOpCtx) { + auto ctx = getExpCtx(); + + auto firstDummy = DocumentSourcePassthrough::create(ctx); + auto firstPipeline = uassertStatusOK(Pipeline::create({firstDummy}, ctx)); + + auto secondDummy = DocumentSourcePassthrough::create(ctx); + auto secondPipeline = uassertStatusOK(Pipeline::create({secondDummy}, ctx)); + + auto facetStage = + DocumentSourceFacet::create({{"one", firstPipeline}, {"two", secondPipeline}}, ctx); + + // Test detaching. + ASSERT_FALSE(firstDummy->isDetachedFromOpCtx); + ASSERT_FALSE(secondDummy->isDetachedFromOpCtx); + facetStage->doDetachFromOperationContext(); + ASSERT_TRUE(firstDummy->isDetachedFromOpCtx); + ASSERT_TRUE(secondDummy->isDetachedFromOpCtx); + + // Test reattaching. + facetStage->doReattachToOperationContext(ctx->opCtx); + ASSERT_FALSE(firstDummy->isDetachedFromOpCtx); + ASSERT_FALSE(secondDummy->isDetachedFromOpCtx); +} +} // namespace +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_mock.cpp b/src/mongo/db/pipeline/document_source_mock.cpp index 3db28967636..26152f07674 100644 --- a/src/mongo/db/pipeline/document_source_mock.cpp +++ b/src/mongo/db/pipeline/document_source_mock.cpp @@ -38,6 +38,10 @@ using boost::intrusive_ptr; DocumentSourceMock::DocumentSourceMock(std::deque<Document> docs) : DocumentSource(NULL), queue(std::move(docs)) {} +DocumentSourceMock::DocumentSourceMock(std::deque<Document> docs, + const boost::intrusive_ptr<ExpressionContext>& expCtx) + : DocumentSource(expCtx), queue(std::move(docs)) {} + const char* DocumentSourceMock::getSourceName() const { return "mock"; } @@ -47,7 +51,7 @@ Value DocumentSourceMock::serialize(bool explain) const { } void DocumentSourceMock::dispose() { - disposed = true; + isDisposed = true; } intrusive_ptr<DocumentSourceMock> DocumentSourceMock::create(std::deque<Document> docs) { @@ -77,7 +81,8 @@ intrusive_ptr<DocumentSourceMock> DocumentSourceMock::create( } boost::optional<Document> DocumentSourceMock::getNext() { - invariant(!disposed); + invariant(!isDisposed); + invariant(!isDetachedFromOpCtx); if (queue.empty()) { return {}; diff --git a/src/mongo/db/pipeline/document_source_tee_consumer.cpp b/src/mongo/db/pipeline/document_source_tee_consumer.cpp new file mode 100644 index 00000000000..321d228b7cc --- /dev/null +++ b/src/mongo/db/pipeline/document_source_tee_consumer.cpp @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details.
+ * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/document_source_tee_consumer.h" + +#include <boost/intrusive_ptr.hpp> +#include <boost/optional.hpp> +#include <vector> + +#include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/expression_context.h" + +namespace mongo { + +using boost::intrusive_ptr; + +DocumentSourceTeeConsumer::DocumentSourceTeeConsumer(const intrusive_ptr<ExpressionContext>& expCtx, + const intrusive_ptr<TeeBuffer>& bufferSource) + : DocumentSource(expCtx), _bufferSource(bufferSource), _iterator() {} + +boost::intrusive_ptr<DocumentSourceTeeConsumer> DocumentSourceTeeConsumer::create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const boost::intrusive_ptr<TeeBuffer>& bufferSource) { + return new DocumentSourceTeeConsumer(expCtx, bufferSource); +} + +boost::optional<Document> DocumentSourceTeeConsumer::getNext() { + pExpCtx->checkForInterrupt(); + + if (!_initialized) { + _bufferSource->populate(); + _initialized = true; + _iterator = _bufferSource->begin(); + } + + if (_iterator == _bufferSource->end()) { + return boost::none; + } + + return {*_iterator++}; +} + +void DocumentSourceTeeConsumer::dispose() { + // Release our reference to the buffer. We shouldn't call dispose() on the buffer, since there + // might be other consumers that need to use it. + _bufferSource.reset(); +} + +Value DocumentSourceTeeConsumer::serialize(bool explain) const { + // This stage will be inserted into the beginning of a pipeline, but should not show up in the + // explain output. + return Value(); +} +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_tee_consumer.h b/src/mongo/db/pipeline/document_source_tee_consumer.h new file mode 100644 index 00000000000..d274f3995f5 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_tee_consumer.h @@ -0,0 +1,68 @@ +/** + * Copyright (C) 2016 MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <boost/intrusive_ptr.hpp> +#include <boost/optional.hpp> + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/pipeline/tee_buffer.h" + +namespace mongo { + +class Document; +struct ExpressionContext; +class Value; + +/** + * This stage acts as a proxy between a pipeline within a $facet stage and the buffer of incoming + * documents held in a TeeBuffer stage. It will simply open an iterator on the TeeBuffer stage, and + * answer calls to getNext() by advancing said iterator. + */ +class DocumentSourceTeeConsumer : public DocumentSource { +public: + static boost::intrusive_ptr<DocumentSourceTeeConsumer> create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const boost::intrusive_ptr<TeeBuffer>& bufferSource); + + void dispose() final; + boost::optional<Document> getNext() final; + + Value serialize(bool explain = false) const final; + +private: + DocumentSourceTeeConsumer(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const boost::intrusive_ptr<TeeBuffer>& bufferSource); + + bool _initialized = false; + boost::intrusive_ptr<TeeBuffer> _bufferSource; + TeeBuffer::const_iterator _iterator; +}; +} // namespace mongo diff --git a/src/mongo/db/pipeline/field_path.h b/src/mongo/db/pipeline/field_path.h index 6cec754815c..3f26891cd2b 100644 --- a/src/mongo/db/pipeline/field_path.h +++ b/src/mongo/db/pipeline/field_path.h @@ -94,10 +94,10 @@ public: */ FieldPath tail() const; -private: /** Uassert if a field name does not pass validation. */ static void uassertValidFieldName(const std::string& fieldName); +private: /** * Push a new field name to the back of the vector of names comprising the field path. * Uassert if 'fieldName' does not pass validation. 
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 2dc5ffc7899..3b76d574cc7 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -59,6 +59,9 @@ namespace dps = ::mongo::dotted_path_support; Pipeline::Pipeline(const intrusive_ptr<ExpressionContext>& pTheCtx) : pCtx(pTheCtx) {} +Pipeline::Pipeline(SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) + : _sources(stages), pCtx(expCtx) {} + StatusWith<intrusive_ptr<Pipeline>> Pipeline::parse( const std::vector<BSONObj>& rawPipeline, const intrusive_ptr<ExpressionContext>& expCtx) { intrusive_ptr<Pipeline> pipeline(new Pipeline(expCtx)); @@ -73,9 +76,18 @@ StatusWith<intrusive_ptr<Pipeline>> Pipeline::parse( if (!status.isOK()) { return status; } + pipeline->stitch(); + return pipeline; +} - pipeline->optimizePipeline(); - +StatusWith<intrusive_ptr<Pipeline>> Pipeline::create( + SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) { + intrusive_ptr<Pipeline> pipeline(new Pipeline(stages, expCtx)); + auto status = pipeline->ensureAllStagesAreInLegalPositions(); + if (!status.isOK()) { + return status; + } + pipeline->stitch(); return pipeline; } @@ -391,6 +403,9 @@ vector<Value> Pipeline::writeExplainOps() const { } void Pipeline::addInitialSource(intrusive_ptr<DocumentSource> source) { + if (!_sources.empty()) { + _sources.front()->setSource(source.get()); + } _sources.push_front(source); } diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index 84a8333ff8e..66bef44c324 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -58,13 +58,23 @@ public: /** * Parses a Pipeline from a BSONElement representing a list of DocumentSources. Returns a non-OK - * status if it failed to parse. + * status if it failed to parse. The returned pipeline is not optimized, but the caller may + * convert it to an optimized pipeline by calling optimizePipeline(). */ static StatusWith<boost::intrusive_ptr<Pipeline>> parse( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx); /** + * Creates a Pipeline from an existing SourceContainer. + * + * Returns a non-OK status if any stage is in an invalid position. For example, if an $out stage + * is present but is not the last stage. + */ + static StatusWith<boost::intrusive_ptr<Pipeline>> create( + SourceContainer sources, const boost::intrusive_ptr<ExpressionContext>& expCtx); + + /** * Helper to implement Command::checkAuthForCommand. */ static Status checkAuthForCommand(ClientBasic* client, @@ -172,6 +182,9 @@ public: */ DepsTracker getDependencies(const BSONObj& initialQuery) const; + const SourceContainer& getSources() { + return _sources; + } /* PipelineD is a "sister" class that has additional functionality @@ -198,6 +211,7 @@ private: friend class Optimizations::Sharded; Pipeline(const boost::intrusive_ptr<ExpressionContext>& pCtx); + Pipeline(SourceContainer stages, const boost::intrusive_ptr<ExpressionContext>& pCtx); /** * Stitch together the source pointers by calling setSource() for each source in '_sources'. 
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 1bda01e3968..c08278c4737 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -89,6 +89,7 @@ public: AggregationRequest request(NamespaceString("a.collection"), rawPipeline); intrusive_ptr<ExpressionContext> ctx = new ExpressionContext(&_opCtx, request); auto outputPipe = uassertStatusOK(Pipeline::parse(request.getPipeline(), ctx)); + outputPipe->optimizePipeline(); ASSERT_EQUALS(Value(outputPipe->writeExplainOps()), Value(outputPipeExpected["pipeline"])); ASSERT_EQUALS(Value(outputPipe->serialize()), Value(serializePipeExpected["pipeline"])); @@ -735,6 +736,7 @@ public: AggregationRequest request(NamespaceString("a.collection"), rawPipeline); intrusive_ptr<ExpressionContext> ctx = new ExpressionContext(&_opCtx, request); mergePipe = uassertStatusOK(Pipeline::parse(request.getPipeline(), ctx)); + mergePipe->optimizePipeline(); shardPipe = mergePipe->splitForSharded(); ASSERT(shardPipe != nullptr); diff --git a/src/mongo/db/pipeline/tee_buffer.cpp b/src/mongo/db/pipeline/tee_buffer.cpp new file mode 100644 index 00000000000..9119665748e --- /dev/null +++ b/src/mongo/db/pipeline/tee_buffer.cpp @@ -0,0 +1,78 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/tee_buffer.h" + +#include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/pipeline/value.h" + +namespace mongo { + +TeeBuffer::TeeBuffer(uint64_t maxMemoryUsageBytes) + : _maxMemoryUsageBytes(maxMemoryUsageBytes), _buffer() {} + +boost::intrusive_ptr<TeeBuffer> TeeBuffer::create(uint64_t maxMemoryUsageBytes) { + return new TeeBuffer(maxMemoryUsageBytes); +} + +TeeBuffer::const_iterator TeeBuffer::begin() const { + invariant(_populated); + return _buffer.begin(); +} + +TeeBuffer::const_iterator TeeBuffer::end() const { + invariant(_populated); + return _buffer.end(); +} + +void TeeBuffer::dispose() { + _buffer.clear(); + _populated = false; // Set this to ensure no one is calling begin() or end(). +} + +void TeeBuffer::populate() { + invariant(_source); + if (_populated) { + return; + } + _populated = true; + + size_t estimatedMemoryUsageBytes = 0; + while (auto next = _source->getNext()) { + estimatedMemoryUsageBytes += next->getApproximateSize(); + uassert(40174, + "Exceeded memory limit for $facet", + estimatedMemoryUsageBytes <= _maxMemoryUsageBytes); + + _buffer.emplace_back(std::move(*next)); + } +} +} // namespace mongo diff --git a/src/mongo/db/pipeline/tee_buffer.h b/src/mongo/db/pipeline/tee_buffer.h new file mode 100644 index 00000000000..57ce3a611d7 --- /dev/null +++ b/src/mongo/db/pipeline/tee_buffer.h @@ -0,0 +1,88 @@ +/** + * Copyright (C) 2016 MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <boost/intrusive_ptr.hpp> +#include <boost/optional.hpp> +#include <vector> + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/util/assert_util.h" + +namespace mongo { + +class Document; +struct ExpressionContext; +class Value; + +/** + * This stage takes a stream of input documents and makes them available to multiple consumers. To + * do so, it will buffer all incoming documents up to the configured memory limit, then provide + * access to that buffer via an iterator. 
+ *
+ * TODO SERVER-24153: This stage should be able to spill to disk, if allowed, when the memory
+ * limit has been exceeded.
+ */
+class TeeBuffer : public RefCountable {
+public:
+    using const_iterator = std::vector<Document>::const_iterator;
+
+    static const uint64_t kMaxMemoryUsageBytes = 100 * 1024 * 1024;
+
+    static boost::intrusive_ptr<TeeBuffer> create(
+        uint64_t maxMemoryUsageBytes = kMaxMemoryUsageBytes);
+
+    void setSource(const boost::intrusive_ptr<DocumentSource>& source) {
+        _source = source;
+    }
+
+    /**
+     * Clears '_buffer'. Once dispose() is called, all iterators are invalid, and it is illegal to
+     * call begin() or end().
+     */
+    void dispose();
+
+    /**
+     * Populates the buffer by consuming all input from '_source'. This must be called before
+     * calling begin() or end().
+     */
+    void populate();
+
+    const_iterator begin() const;
+    const_iterator end() const;
+
+private:
+    TeeBuffer(uint64_t maxMemoryUsageBytes);
+
+    bool _populated = false;
+    uint64_t _maxMemoryUsageBytes;
+    std::vector<Document> _buffer;
+    boost::intrusive_ptr<DocumentSource> _source;
+};
+}  // namespace mongo
diff --git a/src/mongo/db/pipeline/tee_buffer_test.cpp b/src/mongo/db/pipeline/tee_buffer_test.cpp new file mode 100644 index 00000000000..9fb2b56f1fc --- /dev/null +++ b/src/mongo/db/pipeline/tee_buffer_test.cpp @@ -0,0 +1,138 @@
+/**
+ * Copyright (C) 2016 MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/tee_buffer.h"
+
+#include "mongo/db/pipeline/document.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/assert_util.h"
+
+namespace mongo {
+
+// Crutch: linked library code expects a definition of isMongos().
+bool isMongos() {
+    return false;
+}
+
+namespace {
+
+TEST(TeeBufferTest, ShouldProduceEmptyIteratorsWhenGivenNoInput) {
+    auto mock = DocumentSourceMock::create();
+    auto teeBuffer = TeeBuffer::create();
+    teeBuffer->setSource(mock.get());
+    teeBuffer->populate();
+
+    // There are no inputs, so begin() should equal end().
+    ASSERT(teeBuffer->begin() == teeBuffer->end());
+}
+
+TEST(TeeBufferTest, ShouldProvideIteratorOverSingleDocument) {
+    auto inputDoc = Document{{"a", 1}};
+    auto mock = DocumentSourceMock::create(inputDoc);
+    auto teeBuffer = TeeBuffer::create();
+    teeBuffer->setSource(mock.get());
+    teeBuffer->populate();
+
+    // Should be able to establish an iterator and get the document back.
+    auto it = teeBuffer->begin();
+    ASSERT(it != teeBuffer->end());
+    ASSERT_EQ(*it, inputDoc);
+    ++it;
+    ASSERT(it == teeBuffer->end());
+}
+
+TEST(TeeBufferTest, ShouldProvideIteratorOverTwoDocuments) {
+    std::deque<Document> inputDocs = {Document{{"a", 1}}, Document{{"a", 2}}};
+    auto mock = DocumentSourceMock::create(inputDocs);
+    auto teeBuffer = TeeBuffer::create();
+    teeBuffer->setSource(mock.get());
+    teeBuffer->populate();
+
+    auto it = teeBuffer->begin();
+    ASSERT(it != teeBuffer->end());
+    ASSERT_EQ(*it, inputDocs.front());
+    ++it;
+    ASSERT(it != teeBuffer->end());
+    ASSERT_EQ(*it, inputDocs.back());
+    ++it;
+    ASSERT(it == teeBuffer->end());
+}
+
+TEST(TeeBufferTest, ShouldBeAbleToProvideMultipleIteratorsOverTheSameInputs) {
+    std::deque<Document> inputDocs = {Document{{"a", 1}}, Document{{"a", 2}}};
+    auto mock = DocumentSourceMock::create(inputDocs);
+    auto teeBuffer = TeeBuffer::create();
+    teeBuffer->setSource(mock.get());
+    teeBuffer->populate();
+
+    auto firstIt = teeBuffer->begin();
+    auto secondIt = teeBuffer->begin();
+
+    // Advance both once.
+    ASSERT(firstIt != teeBuffer->end());
+    ASSERT_EQ(*firstIt, inputDocs.front());
+    ++firstIt;
+    ASSERT(secondIt != teeBuffer->end());
+    ASSERT_EQ(*secondIt, inputDocs.front());
+    ++secondIt;
+
+    // Advance them both again.
+    ASSERT(firstIt != teeBuffer->end());
+    ASSERT_EQ(*firstIt, inputDocs.back());
+    ++firstIt;
+    ASSERT(secondIt != teeBuffer->end());
+    ASSERT_EQ(*secondIt, inputDocs.back());
+    ++secondIt;
+
+    // Assert they've both reached the end.
+    ASSERT(firstIt == teeBuffer->end());
+    ASSERT(secondIt == teeBuffer->end());
+}
+
+TEST(TeeBufferTest, ShouldErrorWhenBufferingTooManyDocuments) {
+    // Queue up at least 2000 bytes of input from a mock stage.
+    std::deque<Document> inputs;
+    auto largeStr = std::string(1000, 'y');
+    auto inputDoc = Document{{"x", largeStr}};
+    ASSERT_GTE(inputDoc.getApproximateSize(), 1000UL);
+    inputs.push_back(inputDoc);
+    inputs.push_back(Document{{"x", largeStr}});
+    auto mock = DocumentSourceMock::create(inputs);
+
+    const uint64_t maxMemoryUsageBytes = 1000;
+    auto teeBuffer = TeeBuffer::create(maxMemoryUsageBytes);
+    teeBuffer->setSource(mock.get());
+
+    // Should exceed the configured memory limit.
+    ASSERT_THROWS(teeBuffer->populate(), UserException);
+}
+
+}  // namespace
+}  // namespace mongo
diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp index 46cf44b9570..0dea97cdf7f 100644 --- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp +++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp @@ -127,11 +127,12 @@ public:
         mergeCtx->inRouter = true;
         // explicitly *not* setting mergeCtx->tempDir
 
-        // Parse the pipeline specification.
+        // Parse and optimize the pipeline specification.
         auto pipeline = Pipeline::parse(request.getValue().getPipeline(), mergeCtx);
         if (!pipeline.isOK()) {
             return appendCommandStatus(result, pipeline.getStatus());
         }
+        pipeline.getValue()->optimizePipeline();
 
         for (auto&& ns : pipeline.getValue()->getInvolvedCollections()) {
             uassert(
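The pipeline_test.cpp and cluster_pipeline_cmd.cpp hunks above both add the same call sequence: a pipeline is parsed, then explicitly optimized before it is explained, serialized, or split for sharding. A condensed sketch of that ordering, assuming the fixture scaffolding from pipeline_test.cpp ('rawPipeline' and '_opCtx' come from the surrounding test class):

    // Parse-then-optimize, as both hunks in this patch now do.
    AggregationRequest request(NamespaceString("a.collection"), rawPipeline);
    intrusive_ptr<ExpressionContext> ctx = new ExpressionContext(&_opCtx, request);
    auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), ctx));
    pipeline->optimizePipeline();  // In this patch, runs before writeExplainOps(),
                                   // serialize(), or splitForSharded().
    auto shardPipe = pipeline->splitForSharded();  // 'pipeline' keeps the merge half.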
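TeeBuffer's contract, as exercised by the unit tests above, is populate-once, iterate-many: a single populate() drains the source into memory, after which each consumer (in $facet, each sub-pipeline) walks its own iterator over the same buffer. A condensed lifecycle sketch using only names from this patch; the enclosing function body is illustrative:

    // Condensed TeeBuffer lifecycle, modeled on the tests above.
    std::deque<Document> inputs = {Document{{"a", 1}}, Document{{"a", 2}}};
    auto mock = DocumentSourceMock::create(inputs);
    auto teeBuffer = TeeBuffer::create();  // Defaults to kMaxMemoryUsageBytes (100MB).
    teeBuffer->setSource(mock.get());
    teeBuffer->populate();  // Buffers all input; uasserts with code 40174 past the limit.

    // Any number of consumers may now iterate independently.
    for (auto it = teeBuffer->begin(); it != teeBuffer->end(); ++it) {
        const Document& doc = *it;  // Hand 'doc' to one consumer.
    }

    teeBuffer->dispose();  // Frees the buffer and invalidates every iterator.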
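The memory guard inside TeeBuffer::populate() keeps a running size estimate and fails as soon as the cap is crossed, rather than after the whole input has been buffered. A dependency-free sketch of the same pattern; the Doc type and its byte estimate are stand-ins for Document::getApproximateSize(), not MongoDB APIs:

    #include <cstdint>
    #include <stdexcept>
    #include <string>
    #include <utility>
    #include <vector>

    // Stand-in for Document and getApproximateSize(); purely illustrative.
    struct Doc {
        std::string payload;
        std::uint64_t approximateSize() const {
            return payload.size() + sizeof(*this);
        }
    };

    // Buffer every input, but abort the moment the running estimate exceeds
    // the cap, mirroring uassert(40174, ...) in TeeBuffer::populate().
    std::vector<Doc> bufferAll(std::vector<Doc> source, std::uint64_t maxBytes) {
        std::vector<Doc> buffer;
        std::uint64_t estimatedBytes = 0;
        for (auto& doc : source) {
            estimatedBytes += doc.approximateSize();
            if (estimatedBytes > maxBytes) {
                throw std::runtime_error("Exceeded memory limit for $facet");
            }
            buffer.push_back(std::move(doc));
        }
        return buffer;
    }

Checking before keeping each document means the buffer never holds more than the cap's worth of estimated bytes, which is what lets the TODO above (SERVER-24153, spilling to disk) slot in later as an alternative to throwing.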