diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2016-06-23 17:47:13 -0400 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2016-06-24 11:51:20 -0400 |
commit | 20e9b2798d69fb2367ff9f16a1b30f4f9b73d93b (patch) | |
tree | 19acf1fff91817744a202958808800544b783486 | |
parent | 5bdf5d6b8995637193a37d04a0b816b71e47b9fb (diff) | |
download | mongo-20e9b2798d69fb2367ff9f16a1b30f4f9b73d93b.tar.gz |
SERVER-24638 Move command processing from Pipeline to AggregationRequest
23 files changed, 918 insertions, 415 deletions
diff --git a/jstests/aggregation/bugs/server7781.js b/jstests/aggregation/bugs/server7781.js index e8684dd813f..678a224630a 100644 --- a/jstests/aggregation/bugs/server7781.js +++ b/jstests/aggregation/bugs/server7781.js @@ -11,10 +11,9 @@ db[coll].insert({loc: [0, 0]}); // $geoNear is only allowed as the first stage in a pipeline, nowhere else. - assertErrorCode( - db[coll], - [{$match: {x: 1}}, {$geoNear: {near: [1, 1], spherical: true, distanceField: 'dis'}}], - 28837); + assert.throws( + () => db[coll].aggregate( + [{$match: {x: 1}}, {$geoNear: {near: [1, 1], spherical: true, distanceField: 'dis'}}])); function checkOutput(cmdOut, aggOut, expectedNum) { assert.commandWorked(cmdOut, "geoNear command"); diff --git a/jstests/aggregation/bugs/server9444.js b/jstests/aggregation/bugs/server9444.js index f3dc2748b0a..b2f027d314c 100644 --- a/jstests/aggregation/bugs/server9444.js +++ b/jstests/aggregation/bugs/server9444.js @@ -29,17 +29,16 @@ assert.eq(res.code, outOfMemoryCode); // ensure allowDiskUse: false does what it says - var res = t.runCommand('aggregate', {pipeline: pipeline, allowDiskUse: false}); + res = t.runCommand('aggregate', {pipeline: pipeline, allowDiskUse: false}); assert.commandFailed(res); assert.eq(res.code, outOfMemoryCode); // allowDiskUse only supports bool. In particular, numbers aren't allowed. - var res = t.runCommand('aggregate', {pipeline: pipeline, allowDiskUse: 1}); + res = t.runCommand('aggregate', {pipeline: pipeline, allowDiskUse: 1}); assert.commandFailed(res); - assert.eq(res.code, 16949); // ensure we work when allowDiskUse === true - var res = t.aggregate(pipeline, {allowDiskUse: true}); + res = t.aggregate(pipeline, {allowDiskUse: true}); assert.eq(res.itcount(), t.count()); // all tests output one doc per input doc } diff --git a/jstests/noPassthrough/read_majority.js b/jstests/noPassthrough/read_majority.js index c2b20df292a..828b7fba4a5 100644 --- a/jstests/noPassthrough/read_majority.js +++ b/jstests/noPassthrough/read_majority.js @@ -41,8 +41,8 @@ load("jstests/libs/analyze_plan.js"); } function getReadMajorityAggCursor() { - var res = - t.runCommand('aggregate', {cursor: {batchSize: 2}, readConcern: {level: "majority"}}); + var res = t.runCommand( + 'aggregate', {pipeline: [], cursor: {batchSize: 2}, readConcern: {level: "majority"}}); assert.commandWorked(res); return new DBCommandCursor(db.getMongo(), res, 2); } diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 83ca9eb0139..9b722fb3b62 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -726,8 +726,7 @@ serveronlyLibdeps = [ "matcher/expressions_mongod_only", "ops/update_driver", "ops/write_ops_parsers", - "pipeline/document_source", - "pipeline/pipeline", + "pipeline/aggregation", "query/query", "range_deleter", "repl/bgsync", diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp index b5c49095b9f..7d17775d507 100644 --- a/src/mongo/db/commands/pipeline_command.cpp +++ b/src/mongo/db/commands/pipeline_command.cpp @@ -43,6 +43,7 @@ #include "mongo/db/exec/pipeline_proxy.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/pipeline/accumulator.h" +#include "mongo/db/pipeline/aggregation_request.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/expression.h" @@ -68,16 +69,18 @@ using std::stringstream; using std::unique_ptr; using stdx::make_unique; +namespace { + /** * Returns true if we need to keep a ClientCursor saved for this pipeline (for future getMore * requests). Otherwise, returns false. */ -static bool handleCursorCommand(OperationContext* txn, - const string& ns, - ClientCursorPin* pin, - PlanExecutor* exec, - const BSONObj& cmdObj, - BSONObjBuilder& result) { +bool handleCursorCommand(OperationContext* txn, + const string& ns, + ClientCursorPin* pin, + PlanExecutor* exec, + const BSONObj& cmdObj, + BSONObjBuilder& result) { ClientCursor* cursor = pin ? pin->c() : NULL; if (pin) { invariant(cursor); @@ -156,10 +159,42 @@ static bool handleCursorCommand(OperationContext* txn, return static_cast<bool>(cursor); } +/** + * Round trips the pipeline through serialization by calling serialize(), then Pipeline::parse(). + * fasserts if it fails to parse after being serialized. + */ +boost::intrusive_ptr<Pipeline> reparsePipeline( + const boost::intrusive_ptr<Pipeline>& pipeline, + const AggregationRequest& request, + const boost::intrusive_ptr<ExpressionContext>& expCtx) { + auto serialized = pipeline->serialize(); + + // Convert vector<Value> to vector<BSONObj>. + std::vector<BSONObj> parseableSerialization; + parseableSerialization.reserve(serialized.size()); + for (auto&& serializedStage : serialized) { + invariant(serializedStage.getType() == BSONType::Object); + parseableSerialization.push_back(serializedStage.getDocument().toBson()); + } + + auto reparsedPipeline = Pipeline::parse(parseableSerialization, expCtx); + if (!reparsedPipeline.isOK()) { + error() << "Aggregation command did not round trip through parsing and serialization " + "correctly. Input pipeline: " + << Value(request.getPipeline()).toString() + << ", serialized pipeline: " << Value(serialized).toString(); + fassertFailedWithStatusNoTrace(40175, reparsedPipeline.getStatus()); + } + + return reparsedPipeline.getValue(); +} + +} // namespace class PipelineCommand : public Command { public: - PipelineCommand() : Command(Pipeline::commandName) {} // command is called "aggregate" + PipelineCommand() + : Command(AggregationRequest::kCommandName) {} // command is called "aggregate" // Locks are managed manually, in particular by DocumentSourceCursor. virtual bool supportsWriteConcern(const BSONObj& cmd) const override { @@ -210,24 +245,29 @@ public: } NamespaceString nss(ns); - intrusive_ptr<ExpressionContext> pCtx = new ExpressionContext(txn, nss); - pCtx->tempDir = storageGlobalParams.dbpath + "/_tmp"; + // Parse the options for this request. + auto request = AggregationRequest::parseFromBSON(nss, cmdObj); + if (!request.isOK()) { + return appendCommandStatus(result, request.getStatus()); + } - /* try to parse the command; if this fails, then we didn't run */ - intrusive_ptr<Pipeline> pPipeline = Pipeline::parseCommand(errmsg, cmdObj, pCtx); - if (!pPipeline.get()) - return false; + // Set up the ExpressionContext. + intrusive_ptr<ExpressionContext> expCtx = new ExpressionContext(txn, request.getValue()); + expCtx->tempDir = storageGlobalParams.dbpath + "/_tmp"; - // This is outside of the if block to keep the object alive until the pipeline is finished. - BSONObj parsed; - if (kDebugBuild && !pPipeline->isExplain() && !pCtx->inShard) { - // Make sure all operations round-trip through Pipeline::toBson() correctly by - // reparsing every command in debug builds. This is important because sharded - // aggregations rely on this ability. Skipping when inShard because this has - // already been through the transformation (and this unsets pCtx->inShard). - parsed = pPipeline->serialize().toBson(); - pPipeline = Pipeline::parseCommand(errmsg, parsed, pCtx); - verify(pPipeline); + // Parse the pipeline. + auto statusWithPipeline = Pipeline::parse(request.getValue().getPipeline(), expCtx); + if (!statusWithPipeline.isOK()) { + return appendCommandStatus(result, statusWithPipeline.getStatus()); + } + auto pipeline = std::move(statusWithPipeline.getValue()); + + if (kDebugBuild && !expCtx->isExplain && !expCtx->inShard) { + // Make sure all operations round-trip through Pipeline::serialize() correctly by + // re-parsing every command in debug builds. This is important because sharded + // aggregations rely on this ability. Skipping when inShard because this has already + // been through the transformation (and this un-sets expCtx->inShard). + pipeline = reparsePipeline(pipeline, request.getValue(), expCtx); } unique_ptr<ClientCursorPin> pin; // either this OR the exec will be non-null @@ -246,22 +286,21 @@ public: // If the pipeline does not have a user-specified collation, set it from the // collection default. - if (pPipeline->getContext()->collation.isEmpty() && collection && + if (request.getValue().getCollation().isEmpty() && collection && collection->getDefaultCollator()) { - pPipeline->setCollator(collection->getDefaultCollator()->clone()); + pipeline->setCollator(collection->getDefaultCollator()->clone()); } // This does mongod-specific stuff like creating the input PlanExecutor and adding // it to the front of the pipeline if needed. std::shared_ptr<PlanExecutor> input = - PipelineD::prepareCursorSource(txn, collection, nss, pPipeline, pCtx); - pPipeline->stitch(); + PipelineD::prepareCursorSource(txn, collection, nss, pipeline, expCtx); // Create the PlanExecutor which returns results from the pipeline. The WorkingSet // ('ws') and the PipelineProxyStage ('proxy') will be owned by the created // PlanExecutor. auto ws = make_unique<WorkingSet>(); - auto proxy = make_unique<PipelineProxyStage>(txn, pPipeline, input, ws.get()); + auto proxy = make_unique<PipelineProxyStage>(txn, pipeline, input, ws.get()); auto statusWithPlanExecutor = (NULL == collection) ? PlanExecutor::make( @@ -327,8 +366,8 @@ public: } // If both explain and cursor are specified, explain wins. - if (pPipeline->isExplain()) { - result << "stages" << Value(pPipeline->writeExplainOps()); + if (expCtx->isExplain) { + result << "stages" << Value(pipeline->writeExplainOps()); } else if (isCursorCommand) { keepCursor = handleCursorCommand(txn, nss.ns(), @@ -337,10 +376,10 @@ public: cmdObj, result); } else { - pPipeline->run(result); + pipeline->run(result); } - if (!pPipeline->isExplain()) { + if (!expCtx->isExplain) { PlanSummaryStats stats; Explain::getSummaryStats(pin ? *pin->c()->getExecutor() : *exec.get(), &stats); curOp->debug().setPlanSummaryMetrics(stats); diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 1732f2bb376..fb8ac72df4d 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -1,7 +1,20 @@ +# -*- mode: python -*- Import('env') env.Library( + target='aggregation', + source=[ + ], + LIBDEPS=[ + 'aggregation_request', + 'document_source', + 'expression_context', + 'pipeline', + ] +) + +env.Library( target='field_path', source=[ 'field_path.cpp', @@ -41,6 +54,42 @@ env.CppUnitTest( ], ) +env.Library( + target='aggregation_request', + source=[ + 'aggregation_request.cpp', + ], + LIBDEPS=[ + 'document_value', + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/namespace_string', + '$BUILD_DIR/mongo/db/query/query_request', + '$BUILD_DIR/mongo/db/repl/read_concern_args', + '$BUILD_DIR/mongo/db/storage/storage_options', + ] +) + +env.CppUnitTest( + target='aggregation_request_test', + source='aggregation_request_test.cpp', + LIBDEPS=[ + 'aggregation_request', + ], +) + +env.Library( + target='expression_context', + source=[ + 'expression_context.cpp', + ], + LIBDEPS=[ + 'aggregation_request', + '$BUILD_DIR/mongo/db/query/collation/collator_factory_interface', + '$BUILD_DIR/mongo/db/service_context', + '$BUILD_DIR/mongo/util/intrusive_counter', + ] +) + env.CppUnitTest( target='document_source_test', source='document_source_test.cpp', @@ -130,6 +179,7 @@ docSourceEnv.Library( 'dependencies', 'document_value', 'expression', + 'expression_context', '$BUILD_DIR/mongo/client/clientdriver', '$BUILD_DIR/mongo/db/bson/dotted_path_support', '$BUILD_DIR/mongo/db/matcher/expressions', @@ -145,7 +195,6 @@ docSourceEnv.Library( # which is not uniquely defined 'incomplete' ], - ) env.Library( @@ -157,6 +206,7 @@ env.Library( 'dependencies', 'document_source', 'document_value', + 'expression_context', '$BUILD_DIR/mongo/db/auth/authorization_manager_global', '$BUILD_DIR/mongo/db/auth/authcore', '$BUILD_DIR/mongo/db/bson/dotted_path_support', diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp new file mode 100644 index 00000000000..365debf849c --- /dev/null +++ b/src/mongo/db/pipeline/aggregation_request.cpp @@ -0,0 +1,154 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/aggregation_request.h" + +#include <algorithm> + +#include "mongo/base/error_codes.h" +#include "mongo/base/status_with.h" +#include "mongo/base/string_data.h" +#include "mongo/db/catalog/document_validation.h" +#include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/value.h" +#include "mongo/db/query/query_request.h" +#include "mongo/db/repl/read_concern_args.h" +#include "mongo/db/storage/storage_options.h" + +namespace mongo { + +const StringData AggregationRequest::kCommandName = "aggregate"_sd; +const StringData AggregationRequest::kFromRouterName = "fromRouter"_sd; +const StringData AggregationRequest::kPipelineName = "pipeline"_sd; +const StringData AggregationRequest::kCollationName = "collation"_sd; +const StringData AggregationRequest::kExplainName = "explain"_sd; +const StringData AggregationRequest::kAllowDiskUseName = "allowDiskUse"_sd; + +AggregationRequest::AggregationRequest(NamespaceString nss, std::vector<BSONObj> pipeline) + : _nss(std::move(nss)), _pipeline(std::move(pipeline)) {} + +StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(NamespaceString nss, + const BSONObj& cmdObj) { + // Parse required parameters. + auto pipelineElem = cmdObj[kPipelineName]; + if (pipelineElem.eoo() || pipelineElem.type() != BSONType::Array) { + return {ErrorCodes::TypeMismatch, "'pipeline' option must be specified as an array"}; + } + std::vector<BSONObj> pipeline; + for (auto elem : pipelineElem.Obj()) { + if (elem.type() != BSONType::Object) { + return {ErrorCodes::TypeMismatch, + "Each element of the 'pipeline' array must be an object"}; + } + pipeline.push_back(elem.embeddedObject().getOwned()); + } + + AggregationRequest request(std::move(nss), std::move(pipeline)); + + const std::initializer_list<StringData> optionsParsedElseWhere = { + QueryRequest::cmdOptionMaxTimeMS, + "cursor"_sd, + "writeConcern"_sd, + kPipelineName, + kCommandName, + repl::ReadConcernArgs::kReadConcernFieldName}; + + // Parse optional parameters. + for (auto&& elem : cmdObj) { + auto fieldName = elem.fieldNameStringData(); + + // Ignore top-level fields prefixed with $. They are for the command processor, not us. + if (fieldName[0] == '$') { + continue; + } + + // Ignore options that are parsed elsewhere. + if (std::find(optionsParsedElseWhere.begin(), optionsParsedElseWhere.end(), fieldName) != + optionsParsedElseWhere.end()) { + continue; + } + + if (kCollationName == fieldName) { + if (elem.type() != BSONType::Object) { + return {ErrorCodes::TypeMismatch, + str::stream() << kCollationName << " must be an object, not a " + << typeName(elem.type())}; + } + request.setCollation(elem.embeddedObject().getOwned()); + } else if (kExplainName == fieldName) { + if (elem.type() != BSONType::Bool) { + return {ErrorCodes::TypeMismatch, + str::stream() << kExplainName << " must be a boolean, not a " + << typeName(elem.type())}; + } + request.setExplain(elem.Bool()); + } else if (kFromRouterName == fieldName) { + if (elem.type() != BSONType::Bool) { + return {ErrorCodes::TypeMismatch, + str::stream() << kFromRouterName << " must be a boolean, not a " + << typeName(elem.type())}; + } + request.setFromRouter(elem.Bool()); + } else if (kAllowDiskUseName == fieldName) { + if (storageGlobalParams.readOnly) { + return {ErrorCodes::IllegalOperation, + str::stream() << "The '" << kAllowDiskUseName + << "' option is not permitted in read-only mode."}; + } else if (elem.type() != BSONType::Bool) { + return {ErrorCodes::TypeMismatch, + str::stream() << kAllowDiskUseName << " must be a boolean, not a " + << typeName(elem.type())}; + } + request.setAllowDiskUse(elem.Bool()); + } else if (bypassDocumentValidationCommandOption() == fieldName) { + request.setBypassDocumentValidation(elem.trueValue()); + } else { + return {ErrorCodes::FailedToParse, + str::stream() << "unrecognized field '" << elem.fieldName() << "'"}; + } + } + return request; +} + +Document AggregationRequest::serializeToCommandObj() const { + MutableDocument serialized; + return Document{{kCommandName, _nss.coll()}, + {kPipelineName, _pipeline}, + // Only serialize booleans if different than their default. + {kExplainName, _explain ? Value(true) : Value()}, + {kAllowDiskUseName, _allowDiskUse ? Value(true) : Value()}, + {kFromRouterName, _fromRouter ? Value(true) : Value()}, + {bypassDocumentValidationCommandOption(), + _bypassDocumentValidation ? Value(true) : Value()}, + // Only serialize a collation if one was specified. + {kCollationName, _collation.isEmpty() ? Value() : Value(_collation)}}; +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h new file mode 100644 index 00000000000..23601869245 --- /dev/null +++ b/src/mongo/db/pipeline/aggregation_request.h @@ -0,0 +1,157 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <boost/optional.hpp> +#include <vector> + +#include "mongo/bson/bsonelement.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/db/namespace_string.h" + +namespace mongo { + +template <typename T> +class StatusWith; +class Document; + +/** + * Represents the user-supplied options to the aggregate command. + */ +class AggregationRequest { +public: + static const StringData kCommandName; + static const StringData kFromRouterName; + static const StringData kPipelineName; + static const StringData kCollationName; + static const StringData kExplainName; + static const StringData kAllowDiskUseName; + + /** + * Create a new instance of AggregationRequest by parsing the raw command object. Returns a + * non-OK status if a required field was missing, if there was an unrecognized field name or if + * there was a bad value for one of the fields. + */ + static StatusWith<AggregationRequest> parseFromBSON(NamespaceString nss, const BSONObj& cmdObj); + + /** + * Constructs an AggregationRequest over the given namespace with the given pipeline. All + * options aside from the pipeline assume their default values. + */ + AggregationRequest(NamespaceString nss, std::vector<BSONObj> pipeline); + + /** + * Serializes the options to a Document. Note that this serialization includes the original + * pipeline object, as specified. Callers will likely want to override this field with a + * serialization of a parsed and optimized Pipeline object. + */ + Document serializeToCommandObj() const; + + // + // Getters. + // + + const NamespaceString& getNamespaceString() const { + return _nss; + } + + /** + * An unparsed version of the pipeline. All BSONObjs are owned. + */ + const std::vector<BSONObj>& getPipeline() const { + return _pipeline; + } + + bool isExplain() const { + return _explain; + } + + bool isFromRouter() const { + return _fromRouter; + } + + bool shouldAllowDiskUse() const { + return _allowDiskUse; + } + + bool shouldBypassDocumentValidation() const { + return _bypassDocumentValidation; + } + + /** + * Returns an empty object if no collation was specified. + */ + BSONObj getCollation() const { + return _collation; + } + + // + // Setters for optional fields. + // + + void setCollation(BSONObj collation) { + _collation = collation.getOwned(); + } + + void setExplain(bool isExplain) { + _explain = isExplain; + } + + void setAllowDiskUse(bool allowDiskUse) { + _allowDiskUse = allowDiskUse; + } + + void setFromRouter(bool isFromRouter) { + _fromRouter = isFromRouter; + } + + void setBypassDocumentValidation(bool shouldBypassDocumentValidation) { + _bypassDocumentValidation = shouldBypassDocumentValidation; + } + +private: + // Required fields. + + const NamespaceString _nss; + + // An unparsed version of the pipeline. + const std::vector<BSONObj> _pipeline; + + // Optional fields. + + // An owned copy of the user-specified collation object, or an empty object if no collation was + // specified. + BSONObj _collation; + + bool _explain = false; + bool _allowDiskUse = false; + bool _fromRouter = false; + bool _bypassDocumentValidation = false; +}; +} // namespace mongo diff --git a/src/mongo/db/pipeline/aggregation_request_test.cpp b/src/mongo/db/pipeline/aggregation_request_test.cpp new file mode 100644 index 00000000000..ac11018da85 --- /dev/null +++ b/src/mongo/db/pipeline/aggregation_request_test.cpp @@ -0,0 +1,197 @@ +/** + * Copyright (C) 2016 MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/aggregation_request.h" + +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/bson/json.h" +#include "mongo/db/catalog/document_validation.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/value.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/assert_util.h" + +namespace mongo { +namespace { + +// +// Parsing +// + +TEST(AggregationRequestTest, ShouldParseAllKnownOptions) { + NamespaceString nss("a.collection"); + const BSONObj inputBson = fromjson( + "{pipeline: [{$match: {a: 'abc'}}], explain: true, allowDiskUse: true, fromRouter: true, " + "bypassDocumentValidation: true, collation: {locale: 'en_US'}}"); + auto request = unittest::assertGet(AggregationRequest::parseFromBSON(nss, inputBson)); + ASSERT_TRUE(request.isExplain()); + ASSERT_TRUE(request.shouldAllowDiskUse()); + ASSERT_TRUE(request.isFromRouter()); + ASSERT_TRUE(request.shouldBypassDocumentValidation()); + ASSERT_EQ(request.getCollation(), + BSON("locale" + << "en_US")); +} + +// +// Serialization +// + +TEST(AggregationRequestTest, ShouldOnlySerializeRequiredFieldsIfNoOptionalFieldsAreSpecified) { + NamespaceString nss("a.collection"); + AggregationRequest request(nss, {}); + + auto expectedSerialization = + Document{{AggregationRequest::kCommandName, nss.coll()}, + {AggregationRequest::kPipelineName, Value(std::vector<Value>{})}}; + ASSERT_EQ(request.serializeToCommandObj(), expectedSerialization); +} + +TEST(AggregationRequestTest, ShouldNotSerializeOptionalValuesIfEquivalentToDefault) { + NamespaceString nss("a.collection"); + AggregationRequest request(nss, {}); + request.setExplain(false); + request.setAllowDiskUse(false); + request.setFromRouter(false); + request.setBypassDocumentValidation(false); + request.setCollation(BSONObj()); + + auto expectedSerialization = + Document{{AggregationRequest::kCommandName, nss.coll()}, + {AggregationRequest::kPipelineName, Value(std::vector<Value>{})}}; + ASSERT_EQ(request.serializeToCommandObj(), expectedSerialization); +} + +TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) { + NamespaceString nss("a.collection"); + AggregationRequest request(nss, {}); + request.setExplain(true); + request.setAllowDiskUse(true); + request.setFromRouter(true); + request.setBypassDocumentValidation(true); + const auto collationObj = BSON("locale" + << "en_US"); + request.setCollation(collationObj); + + auto expectedSerialization = + Document{{AggregationRequest::kCommandName, nss.coll()}, + {AggregationRequest::kPipelineName, Value(std::vector<Value>{})}, + {AggregationRequest::kExplainName, true}, + {AggregationRequest::kAllowDiskUseName, true}, + {AggregationRequest::kFromRouterName, true}, + {bypassDocumentValidationCommandOption(), true}, + {AggregationRequest::kCollationName, collationObj}}; + ASSERT_EQ(request.serializeToCommandObj(), expectedSerialization); +} + +// +// Error cases. +// + +TEST(AggregationRequestTest, ShouldRejectNonArrayPipeline) { + NamespaceString nss("a.collection"); + const BSONObj inputBson = fromjson("{pipeline: {}}"); + ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); +} + +TEST(AggregationRequestTest, ShouldRejectPipelineArrayIfAnElementIsNotAnObject) { + NamespaceString nss("a.collection"); + BSONObj inputBson = fromjson("{pipeline: [4]}"); + ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); + + inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}, 4]}"); + ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); +} + +TEST(AggregationRequestTest, ShouldRejectNonObjectCollation) { + NamespaceString nss("a.collection"); + const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], collation: 1}"); + ASSERT_NOT_OK( + AggregationRequest::parseFromBSON(NamespaceString("a.collection"), inputBson).getStatus()); +} + +TEST(AggregationRequestTest, ShouldRejectNonBoolExplain) { + NamespaceString nss("a.collection"); + const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], explain: 1}"); + ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); +} + +TEST(AggregationRequestTest, ShouldRejectNonBoolFromRouter) { + NamespaceString nss("a.collection"); + const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], fromRouter: 1}"); + ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); +} + +TEST(AggregationRequestTest, ShouldRejectNonBoolAllowDiskUse) { + NamespaceString nss("a.collection"); + const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], allowDiskUse: 1}"); + ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); +} + +// +// Ignore fields parsed elsewhere. +// + +TEST(AggregationRequestTest, ShouldIgnoreFieldsPrefixedWithDollar) { + NamespaceString nss("a.collection"); + const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], $unknown: 1}"); + ASSERT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); +} + +TEST(AggregationRequestTest, ShouldIgnoreCursorOption) { + NamespaceString nss("a.collection"); + const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: 1}"); + ASSERT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); +} + +TEST(AggregationRequestTest, ShouldIgnoreWriteConcernOption) { + NamespaceString nss("a.collection"); + const BSONObj inputBson = + fromjson("{pipeline: [{$match: {a: 'abc'}}], writeConcern: 'invalid'}"); + ASSERT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); +} + +TEST(AggregationRequestTest, ShouldIgnoreMaxTimeMsOption) { + NamespaceString nss("a.collection"); + const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], maxTimeMS: 'invalid'}"); + ASSERT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); +} + +TEST(AggregationRequestTest, ShouldIgnoreReadConcernOption) { + NamespaceString nss("a.collection"); + const BSONObj inputBson = + fromjson("{pipeline: [{$match: {a: 'abc'}}], readConcern: 'invalid'}"); + ASSERT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp index 430e3cf1496..5b9b3b39b3b 100644 --- a/src/mongo/db/pipeline/document_source.cpp +++ b/src/mongo/db/pipeline/document_source.cpp @@ -81,7 +81,6 @@ const char* DocumentSource::getSourceName() const { void DocumentSource::setSource(DocumentSource* pTheSource) { verify(!isValidInitialSource()); - verify(!pSource); pSource = pTheSource; } diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp index afc131ac7d8..d6cd692f0fe 100644 --- a/src/mongo/db/pipeline/document_source_out.cpp +++ b/src/mongo/db/pipeline/document_source_out.cpp @@ -183,6 +183,10 @@ intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson( str::stream() << "$out only supports a string argument, not " << typeName(elem.type()), elem.type() == String); + uassert(ErrorCodes::InvalidOptions, + "$out can only be used with the 'local' read concern level", + !pExpCtx->opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot()); + NamespaceString outputNs(pExpCtx->ns.db().toString() + '.' + elem.str()); uassert(17385, "Can't $out to special collection: " + elem.str(), !outputNs.isSpecial()); return new DocumentSourceOut(outputNs, pExpCtx); diff --git a/src/mongo/db/pipeline/document_source_test.cpp b/src/mongo/db/pipeline/document_source_test.cpp index 9a9331d013e..58b9732cdae 100644 --- a/src/mongo/db/pipeline/document_source_test.cpp +++ b/src/mongo/db/pipeline/document_source_test.cpp @@ -208,7 +208,7 @@ public: : _service(makeTestServiceContext()), _client(_service->makeClient("DocumentSourceTest")), _opCtx(_client->makeOperationContext()), - _ctx(new ExpressionContext(_opCtx.get(), NamespaceString(ns))) {} + _ctx(new ExpressionContext(_opCtx.get(), AggregationRequest(NamespaceString(ns), {}))) {} protected: intrusive_ptr<ExpressionContext> ctx() { @@ -447,7 +447,7 @@ protected: BSONElement specElement = namedSpec.firstElement(); intrusive_ptr<ExpressionContext> expressionContext = - new ExpressionContext(_opCtx.get(), NamespaceString(ns)); + new ExpressionContext(_opCtx.get(), AggregationRequest(NamespaceString(ns), {})); expressionContext->inShard = inShard; expressionContext->inRouter = inRouter; // Won't spill to disk properly if it needs to. diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp new file mode 100644 index 00000000000..d7c5a6708a0 --- /dev/null +++ b/src/mongo/db/pipeline/expression_context.cpp @@ -0,0 +1,59 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/query/collation/collator_factory_interface.h" + +namespace mongo { + +ExpressionContext::ExpressionContext(OperationContext* opCtx, const AggregationRequest& request) + : isExplain(request.isExplain()), + inShard(request.isFromRouter()), + extSortAllowed(request.shouldAllowDiskUse()), + bypassDocumentValidation(request.shouldBypassDocumentValidation()), + ns(request.getNamespaceString()), + opCtx(opCtx), + collation(request.getCollation()) { + if (!collation.isEmpty()) { + auto statusWithCollator = + CollatorFactoryInterface::get(opCtx->getServiceContext())->makeFromBSON(collation); + uassertStatusOK(statusWithCollator.getStatus()); + collator = std::move(statusWithCollator.getValue()); + } +} + +void ExpressionContext::checkForInterrupt() { + // This check could be expensive, at least in relative terms, so don't check every time. + if (--interruptCounter == 0) { + opCtx->checkForInterrupt(); + interruptCounter = kInterruptCheckPeriod; + } +} +} // namespace mongo diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index 19c5bd59071..90d5bad3a90 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -28,10 +28,13 @@ #pragma once +#include <boost/intrusive_ptr.hpp> +#include <memory> #include <string> #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" +#include "mongo/db/pipeline/aggregation_request.h" #include "mongo/db/query/collation/collator_interface.h" #include "mongo/util/intrusive_counter.h" @@ -39,23 +42,19 @@ namespace mongo { struct ExpressionContext : public IntrusiveCounterUnsigned { public: - ExpressionContext(OperationContext* opCtx, const NamespaceString& ns) : ns(ns), opCtx(opCtx) {} + ExpressionContext(OperationContext* opCtx, const AggregationRequest& request); - /** Used by a pipeline to check for interrupts so that killOp() works. - * @throws if the operation has been interrupted + /** + * Used by a pipeline to check for interrupts so that killOp() works. Throws a UserAssertion if + * this aggregation pipeline has been interrupted. */ - void checkForInterrupt() { - if (opCtx && --interruptCounter == 0) { // XXX SERVER-13931 for opCtx check - // The checkForInterrupt could be expensive, at least in relative terms. - opCtx->checkForInterrupt(); - interruptCounter = kInterruptCheckPeriod; - } - } + void checkForInterrupt(); - bool inShard = false; + bool isExplain; + bool inShard; bool inRouter = false; - bool extSortAllowed = false; - bool bypassDocumentValidation = false; + bool extSortAllowed; + bool bypassDocumentValidation; NamespaceString ns; std::string tempDir; // Defaults to empty to prevent external sorting in mongos. @@ -64,7 +63,7 @@ public: // Collation requested by the user for this pipeline. Empty if the user did not request a // collation. - BSONObj collation; + const BSONObj collation; // Collator used to compare elements. 'collator' is initialized from 'collation', except in the // case where 'collation' is empty and there is a collection default collation. diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index ca8b671a673..2dc5ffc7899 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -32,12 +32,12 @@ #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/pipeline/pipeline_optimizations.h" +#include "mongo/base/error_codes.h" #include "mongo/db/auth/action_set.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/auth/privilege.h" #include "mongo/db/bson/dotted_path_support.h" #include "mongo/db/catalog/document_validation.h" -#include "mongo/db/commands.h" #include "mongo/db/jsobj.h" #include "mongo/db/operation_context.h" #include "mongo/db/pipeline/accumulator.h" @@ -45,8 +45,6 @@ #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/expression_context.h" -#include "mongo/db/query/collation/collator_factory_interface.h" -#include "mongo/db/repl/read_concern_args.h" #include "mongo/util/mongoutils/str.h" namespace mongo { @@ -59,182 +57,63 @@ using std::vector; namespace dps = ::mongo::dotted_path_support; -const char Pipeline::commandName[] = "aggregate"; -const char Pipeline::pipelineName[] = "pipeline"; -const char Pipeline::collationName[] = "collation"; -const char Pipeline::explainName[] = "explain"; -const char Pipeline::fromRouterName[] = "fromRouter"; -const char Pipeline::serverPipelineName[] = "serverPipeline"; -const char Pipeline::mongosPipelineName[] = "mongosPipeline"; - -Pipeline::Pipeline(const intrusive_ptr<ExpressionContext>& pTheCtx) - : explain(false), pCtx(pTheCtx) {} - -intrusive_ptr<Pipeline> Pipeline::parseCommand(string& errmsg, - const BSONObj& cmdObj, - const intrusive_ptr<ExpressionContext>& pCtx) { - intrusive_ptr<Pipeline> pPipeline(new Pipeline(pCtx)); - vector<BSONElement> pipeline; - repl::ReadConcernArgs readConcernArgs; - - /* gather the specification for the aggregation */ - for (BSONObj::iterator cmdIterator = cmdObj.begin(); cmdIterator.more();) { - BSONElement cmdElement(cmdIterator.next()); - const char* pFieldName = cmdElement.fieldName(); - - // ignore top-level fields prefixed with $. They are for the command processor, not us. - if (pFieldName[0] == '$') { - continue; - } - - // maxTimeMS is also for the command processor. - if (str::equals(pFieldName, QueryRequest::cmdOptionMaxTimeMS)) { - continue; - } +Pipeline::Pipeline(const intrusive_ptr<ExpressionContext>& pTheCtx) : pCtx(pTheCtx) {} - if (pFieldName == repl::ReadConcernArgs::kReadConcernFieldName) { - uassertStatusOK(readConcernArgs.initialize(cmdElement)); - continue; - } - - // ignore cursor options since they are handled externally. - if (str::equals(pFieldName, "cursor")) { - continue; - } +StatusWith<intrusive_ptr<Pipeline>> Pipeline::parse( + const std::vector<BSONObj>& rawPipeline, const intrusive_ptr<ExpressionContext>& expCtx) { + intrusive_ptr<Pipeline> pipeline(new Pipeline(expCtx)); - // ignore writeConcern since it's handled externally - if (str::equals(pFieldName, "writeConcern")) { - continue; - } - - if (str::equals(pFieldName, collationName)) { - uassert(ErrorCodes::BadValue, - str::stream() << collationName << " must be an object, not a " - << typeName(cmdElement.type()), - cmdElement.type() == BSONType::Object); - pCtx->collation = cmdElement.Obj().getOwned(); - auto statusWithCollator = - CollatorFactoryInterface::get(pCtx->opCtx->getServiceContext()) - ->makeFromBSON(cmdElement.Obj()); - uassertStatusOK(statusWithCollator.getStatus()); - pCtx->collator = std::move(statusWithCollator.getValue()); - continue; - } - - /* look for the aggregation command */ - if (!strcmp(pFieldName, commandName)) { - continue; - } + for (auto&& stageObj : rawPipeline) { + auto parsedSources = DocumentSource::parse(expCtx, stageObj); + pipeline->_sources.insert( + pipeline->_sources.end(), parsedSources.begin(), parsedSources.end()); + } - /* check for the collection name */ - if (!strcmp(pFieldName, pipelineName)) { - pipeline = cmdElement.Array(); - continue; - } + auto status = pipeline->ensureAllStagesAreInLegalPositions(); + if (!status.isOK()) { + return status; + } - /* check for explain option */ - if (!strcmp(pFieldName, explainName)) { - pPipeline->explain = cmdElement.Bool(); - continue; - } + pipeline->optimizePipeline(); - /* if the request came from the router, we're in a shard */ - if (!strcmp(pFieldName, fromRouterName)) { - pCtx->inShard = cmdElement.Bool(); - continue; - } + return pipeline; +} - if (str::equals(pFieldName, "allowDiskUse")) { - uassert(ErrorCodes::IllegalOperation, - "The 'allowDiskUse' option is not permitted in read-only mode.", - !storageGlobalParams.readOnly); - - uassert(16949, - str::stream() << "allowDiskUse must be a bool, not a " - << typeName(cmdElement.type()), - cmdElement.type() == Bool); - pCtx->extSortAllowed = cmdElement.Bool(); - continue; +Status Pipeline::ensureAllStagesAreInLegalPositions() const { + size_t i = 0; + for (auto&& stage : _sources) { + if (stage->isValidInitialSource() && i != 0) { + return {ErrorCodes::BadValue, + str::stream() << stage->getSourceName() + << " is only valid as the first stage in a pipeline."}; } - if (pFieldName == bypassDocumentValidationCommandOption()) { - pCtx->bypassDocumentValidation = cmdElement.trueValue(); - continue; + if (dynamic_cast<DocumentSourceOut*>(stage.get()) && i != _sources.size() - 1) { + return {ErrorCodes::BadValue, "$out can only be the final stage in the pipeline"}; } - - /* we didn't recognize a field in the command */ - ostringstream sb; - sb << "unrecognized field '" << cmdElement.fieldName() << "'"; - errmsg = sb.str(); - return intrusive_ptr<Pipeline>(); + ++i; } - - /* - If we get here, we've harvested the fields we expect for a pipeline. - - Set up the specified document source pipeline. - */ - SourceContainer& sources = pPipeline->sources; // shorthand - - /* iterate over the steps in the pipeline */ - const size_t nSteps = pipeline.size(); - for (size_t iStep = 0; iStep < nSteps; ++iStep) { - /* pull out the pipeline element as an object */ - BSONElement pipeElement(pipeline[iStep]); - uassert(15942, - str::stream() << "pipeline element " << iStep << " is not an object", - pipeElement.type() == Object); - - vector<intrusive_ptr<DocumentSource>> stepSources = - DocumentSource::parse(pCtx, pipeElement.Obj()); - - // Iterate over the steps in stepSource. stepSource may have more than one step if the - // current step is a DocumentSource alias. - const size_t nStepSources = stepSources.size(); - for (size_t iStepSource = 0; iStepSource < nStepSources; ++iStepSource) { - sources.push_back(stepSources[iStepSource]); - - if (sources.back()->isValidInitialSource()) { - uassert(28837, - str::stream() << sources.back()->getSourceName() - << " is only valid as the first stage in a pipeline.", - iStep == 0 && iStepSource == 0); - } - - if (dynamic_cast<DocumentSourceOut*>(sources.back().get())) { - uassert(16991, - "$out can only be the final stage in the pipeline", - iStep == nSteps - 1 && iStepSource == nStepSources - 1); - - uassert(ErrorCodes::InvalidOptions, - "$out can only be used with the 'local' read concern level", - readConcernArgs.getLevel() == repl::ReadConcernLevel::kLocalReadConcern); - } - } - } - - pPipeline->optimizePipeline(); - - return pPipeline; + return Status::OK(); } void Pipeline::optimizePipeline() { SourceContainer optimizedSources; - SourceContainer::iterator itr = sources.begin(); + SourceContainer::iterator itr = _sources.begin(); - while (itr != sources.end() && std::next(itr) != sources.end()) { + while (itr != _sources.end() && std::next(itr) != _sources.end()) { invariant((*itr).get()); - itr = (*itr).get()->optimizeAt(itr, &sources); + itr = (*itr).get()->optimizeAt(itr, &_sources); } // Once we have reached our final number of stages, optimize each individually. - for (auto&& source : sources) { + for (auto&& source : _sources) { if (auto out = source->optimize()) { optimizedSources.push_back(out); } } - sources.swap(optimizedSources); + _sources.swap(optimizedSources); + stitch(); } Status Pipeline::checkAuthForCommand(ClientBasic* client, @@ -316,7 +195,7 @@ bool Pipeline::aggSupportsWriteConcern(const BSONObj& cmd) { void Pipeline::detachFromOperationContext() { pCtx->opCtx = nullptr; - for (auto&& source : sources) { + for (auto&& source : _sources) { source->detachFromOperationContext(); } } @@ -325,7 +204,7 @@ void Pipeline::reattachToOperationContext(OperationContext* opCtx) { invariant(pCtx->opCtx == nullptr); pCtx->opCtx = opCtx; - for (auto&& source : sources) { + for (auto&& source : _sources) { source->reattachToOperationContext(opCtx); } } @@ -342,7 +221,6 @@ intrusive_ptr<Pipeline> Pipeline::splitForSharded() { // shards and all work being done in the merger. Optimizations can move operations between // the pipelines to be more efficient. intrusive_ptr<Pipeline> shardPipeline(new Pipeline(pCtx)); - shardPipeline->explain = explain; // The order in which optimizations are applied can have significant impact on the // efficiency of the final pipeline. Be Careful! @@ -354,25 +232,25 @@ intrusive_ptr<Pipeline> Pipeline::splitForSharded() { } void Pipeline::Optimizations::Sharded::findSplitPoint(Pipeline* shardPipe, Pipeline* mergePipe) { - while (!mergePipe->sources.empty()) { - intrusive_ptr<DocumentSource> current = mergePipe->sources.front(); - mergePipe->sources.pop_front(); + while (!mergePipe->_sources.empty()) { + intrusive_ptr<DocumentSource> current = mergePipe->_sources.front(); + mergePipe->_sources.pop_front(); // Check if this source is splittable SplittableDocumentSource* splittable = dynamic_cast<SplittableDocumentSource*>(current.get()); if (!splittable) { - // move the source from the merger sources to the shard sources - shardPipe->sources.push_back(current); + // move the source from the merger _sources to the shard _sources + shardPipe->_sources.push_back(current); } else { - // split this source into Merge and Shard sources + // split this source into Merge and Shard _sources intrusive_ptr<DocumentSource> shardSource = splittable->getShardSource(); intrusive_ptr<DocumentSource> mergeSource = splittable->getMergeSource(); if (shardSource) - shardPipe->sources.push_back(shardSource); + shardPipe->_sources.push_back(shardSource); if (mergeSource) - mergePipe->sources.push_front(mergeSource); + mergePipe->_sources.push_front(mergeSource); break; } @@ -381,10 +259,10 @@ void Pipeline::Optimizations::Sharded::findSplitPoint(Pipeline* shardPipe, Pipel void Pipeline::Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(Pipeline* shardPipe, Pipeline* mergePipe) { - while (!shardPipe->sources.empty() && - dynamic_cast<DocumentSourceUnwind*>(shardPipe->sources.back().get())) { - mergePipe->sources.push_front(shardPipe->sources.back()); - shardPipe->sources.pop_back(); + while (!shardPipe->_sources.empty() && + dynamic_cast<DocumentSourceUnwind*>(shardPipe->_sources.back().get())) { + mergePipe->_sources.push_front(shardPipe->_sources.back()); + shardPipe->_sources.pop_back(); } } @@ -411,27 +289,27 @@ void Pipeline::Optimizations::Sharded::limitFieldsSentFromShardsToMerger(Pipelin // objects even though only a subset of fields are needed. // 2) Optimization IS NOT applied immediately following a $project or $group since it would // add an unnecessary project (and therefore a deep-copy). - for (auto&& source : shardPipe->sources) { + for (auto&& source : shardPipe->_sources) { DepsTracker dt; if (source->getDependencies(&dt) & DocumentSource::EXHAUSTIVE_FIELDS) return; } // if we get here, add the project. - shardPipe->sources.push_back(DocumentSourceProject::createFromBson( + shardPipe->_sources.push_back(DocumentSourceProject::createFromBson( BSON("$project" << mergeDeps.toProjection()).firstElement(), shardPipe->pCtx)); } BSONObj Pipeline::getInitialQuery() const { - if (sources.empty()) + if (_sources.empty()) return BSONObj(); /* look for an initial $match */ - DocumentSourceMatch* match = dynamic_cast<DocumentSourceMatch*>(sources.front().get()); + DocumentSourceMatch* match = dynamic_cast<DocumentSourceMatch*>(_sources.front().get()); if (match) { return match->getQuery(); } - DocumentSourceGeoNear* geoNear = dynamic_cast<DocumentSourceGeoNear*>(sources.front().get()); + DocumentSourceGeoNear* geoNear = dynamic_cast<DocumentSourceGeoNear*>(_sources.front().get()); if (geoNear) { return geoNear->getQuery(); } @@ -440,7 +318,7 @@ BSONObj Pipeline::getInitialQuery() const { } bool Pipeline::needsPrimaryShardMerger() const { - for (auto&& source : sources) { + for (auto&& source : _sources) { if (source->needsPrimaryShard()) { return true; } @@ -450,52 +328,28 @@ bool Pipeline::needsPrimaryShardMerger() const { std::vector<NamespaceString> Pipeline::getInvolvedCollections() const { std::vector<NamespaceString> collections; - for (auto&& source : sources) { + for (auto&& source : _sources) { source->addInvolvedCollections(&collections); } return collections; } -Document Pipeline::serialize() const { - MutableDocument serialized; - // create an array out of the pipeline operations - vector<Value> array; - for (SourceContainer::const_iterator iter(sources.begin()), listEnd(sources.end()); - iter != listEnd; - ++iter) { - intrusive_ptr<DocumentSource> pSource(*iter); - pSource->serializeToArray(array); - } - - // add the top-level items to the command - serialized.setField(commandName, Value(pCtx->ns.coll())); - serialized.setField(pipelineName, Value(array)); - - if (explain) { - serialized.setField(explainName, Value(explain)); +vector<Value> Pipeline::serialize() const { + vector<Value> serializedSources; + for (auto&& source : _sources) { + source->serializeToArray(serializedSources); } - - if (pCtx->extSortAllowed) { - serialized.setField("allowDiskUse", Value(true)); - } - - if (pCtx->bypassDocumentValidation) { - serialized.setField(bypassDocumentValidationCommandOption(), Value(true)); - } - - if (pCtx->collator.get()) { - serialized.setField(collationName, Value(pCtx->collator->getSpec().toBSON())); - } - - return serialized.freeze(); + return serializedSources; } void Pipeline::stitch() { - massert(16600, "should not have an empty pipeline", !sources.empty()); - - /* chain together the sources we found */ - DocumentSource* prevSource = sources.front().get(); - for (SourceContainer::iterator iter(++sources.begin()), listEnd(sources.end()); iter != listEnd; + if (_sources.empty()) { + return; + } + // Chain together all the stages. + DocumentSource* prevSource = _sources.front().get(); + for (SourceContainer::iterator iter(++_sources.begin()), listEnd(_sources.end()); + iter != listEnd; ++iter) { intrusive_ptr<DocumentSource> pTemp(*iter); pTemp->setSource(prevSource); @@ -504,13 +358,13 @@ void Pipeline::stitch() { } void Pipeline::run(BSONObjBuilder& result) { - // should not get here in the explain case - verify(!explain); + // We should not get here in the explain case. + verify(!pCtx->isExplain); // the array in which the aggregation results reside // cant use subArrayStart() due to error handling BSONArrayBuilder resultArray; - DocumentSource* finalSource = sources.back().get(); + DocumentSource* finalSource = _sources.back().get(); while (boost::optional<Document> next = finalSource->getNext()) { // add the document to the result set BSONObjBuilder documentBuilder(resultArray.subobjStart()); @@ -530,21 +384,21 @@ void Pipeline::run(BSONObjBuilder& result) { vector<Value> Pipeline::writeExplainOps() const { vector<Value> array; - for (SourceContainer::const_iterator it = sources.begin(); it != sources.end(); ++it) { + for (SourceContainer::const_iterator it = _sources.begin(); it != _sources.end(); ++it) { (*it)->serializeToArray(array, /*explain=*/true); } return array; } void Pipeline::addInitialSource(intrusive_ptr<DocumentSource> source) { - sources.push_front(source); + _sources.push_front(source); } DepsTracker Pipeline::getDependencies(const BSONObj& initialQuery) const { DepsTracker deps; bool knowAllFields = false; bool knowAllMeta = false; - for (auto&& source : sources) { + for (auto&& source : _sources) { DepsTracker localDeps; DocumentSource::GetDepsReturn status = source->getDependencies(&localDeps); diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index f851edefd2d..84a8333ff8e 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -28,7 +28,8 @@ #pragma once -#include <deque> +#include <list> +#include <vector> #include <boost/intrusive_ptr.hpp> @@ -42,33 +43,30 @@ class BSONObj; class BSONObjBuilder; class ClientBasic; class CollatorInterface; -class Command; struct DepsTracker; class DocumentSource; struct ExpressionContext; class OperationContext; -class Privilege; -/** mongodb "commands" (sent via db.$cmd.findOne(...)) - subclass to make a command. define a singleton object for it. - */ +/** + * A Pipeline object represents a list of DocumentSources and is responsible for optimizing the + * pipeline. + */ class Pipeline : public IntrusiveCounterUnsigned { public: typedef std::list<boost::intrusive_ptr<DocumentSource>> SourceContainer; /** - * Create a pipeline from the command. - * - * @param errmsg where to write errors, if there are any - * @param cmdObj the command object sent from the client - * @returns the pipeline, if created, otherwise a NULL reference + * Parses a Pipeline from a BSONElement representing a list of DocumentSources. Returns a non-OK + * status if it failed to parse. */ - static boost::intrusive_ptr<Pipeline> parseCommand( - std::string& errmsg, - const BSONObj& cmdObj, - const boost::intrusive_ptr<ExpressionContext>& pCtx); + static StatusWith<boost::intrusive_ptr<Pipeline>> parse( + const std::vector<BSONObj>& rawPipeline, + const boost::intrusive_ptr<ExpressionContext>& expCtx); - // Helper to implement Command::checkAuthForCommand + /** + * Helper to implement Command::checkAuthForCommand. + */ static Status checkAuthForCommand(ClientBasic* client, const std::string& dbname, const BSONObj& cmdObj); @@ -86,7 +84,7 @@ public: * Sets the OperationContext of 'pCtx' to nullptr. * * The PipelineProxyStage is responsible for detaching the OperationContext and releasing any - * storage-engine state of the DocumentSourceCursor that may be present in 'sources'. + * storage-engine state of the DocumentSourceCursor that may be present in '_sources'. */ void detachFromOperationContext(); @@ -94,7 +92,7 @@ public: * Sets the OperationContext of 'pCtx' to 'opCtx'. * * The PipelineProxyStage is responsible for reattaching the OperationContext and reacquiring - * any storage-engine state of the DocumentSourceCursor that may be present in 'sources'. + * any storage-engine state of the DocumentSourceCursor that may be present in '_sources'. */ void reattachToOperationContext(OperationContext* opCtx); @@ -140,22 +138,9 @@ public: std::vector<NamespaceString> getInvolvedCollections() const; /** - Write the Pipeline as a BSONObj command. This should be the - inverse of parseCommand(). - - This is only intended to be used by the shard command obtained - from splitForSharded(). Some pipeline operations in the merge - process do not have equivalent command forms, and using this on - the mongos Pipeline will cause assertions. - - @param the builder to write the command to - */ - Document serialize() const; - - /** Stitch together the source pointers (by calling setSource) for each source in sources. - * Must be called after optimize and addInitialSource but before trying to get results. + * Serializes the pipeline into a form that can be parsed into an equivalent pipeline. */ - void stitch(); + std::vector<Value> serialize() const; /** Run the Pipeline on the given source. @@ -164,17 +149,13 @@ public: */ void run(BSONObjBuilder& result); - bool isExplain() const { - return explain; - } - /// The initial source is special since it varies between mongos and mongod. void addInitialSource(boost::intrusive_ptr<DocumentSource> source); /// The source that represents the output. Returns a non-owning pointer. DocumentSource* output() { - invariant(!sources.empty()); - return sources.back().get(); + invariant(!_sources.empty()); + return _sources.back().get(); } /** @@ -191,10 +172,6 @@ public: */ DepsTracker getDependencies(const BSONObj& initialQuery) const; - /** - The aggregation command name. - */ - static const char commandName[]; /* PipelineD is a "sister" class that has additional functionality @@ -220,17 +197,22 @@ private: friend class Optimizations::Local; friend class Optimizations::Sharded; - static const char pipelineName[]; - static const char collationName[]; - static const char explainName[]; - static const char fromRouterName[]; - static const char serverPipelineName[]; - static const char mongosPipelineName[]; - Pipeline(const boost::intrusive_ptr<ExpressionContext>& pCtx); - SourceContainer sources; - bool explain; + /** + * Stitch together the source pointers by calling setSource() for each source in '_sources'. + * This function must be called any time the order of stages within the pipeline changes, e.g. + * in optimizePipeline(). + */ + void stitch(); + + /** + * Returns a non-OK status if any stage is in an invalid position. For example, if an $out stage + * is present but is not the last stage in the pipeline. + */ + Status ensureAllStagesAreInLegalPositions() const; + + SourceContainer _sources; boost::intrusive_ptr<ExpressionContext> pCtx; }; diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index d88e7932f73..c26333245bd 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -254,7 +254,7 @@ shared_ptr<PlanExecutor> PipelineD::prepareCursorSource( const intrusive_ptr<Pipeline>& pPipeline, const intrusive_ptr<ExpressionContext>& pExpCtx) { // We will be modifying the source vector as we go. - Pipeline::SourceContainer& sources = pPipeline->sources; + Pipeline::SourceContainer& sources = pPipeline->_sources; // Inject a MongodImplementation to sources that need them. for (auto&& source : sources) { @@ -422,11 +422,11 @@ std::shared_ptr<PlanExecutor> PipelineD::prepareExecutor( } // We know the sort is being handled by the query system, so remove the $sort stage. - pipeline->sources.pop_front(); + pipeline->_sources.pop_front(); if (sortStage->getLimitSrc()) { // We need to reinsert the coalesced $limit after removing the $sort. - pipeline->sources.push_front(sortStage->getLimitSrc()); + pipeline->_sources.push_front(sortStage->getLimitSrc()); } return exec; } @@ -501,7 +501,7 @@ shared_ptr<PlanExecutor> PipelineD::addCursorSource(const intrusive_ptr<Pipeline std::string PipelineD::getPlanSummaryStr(const boost::intrusive_ptr<Pipeline>& pPipeline) { if (auto docSourceCursor = - dynamic_cast<DocumentSourceCursor*>(pPipeline->sources.front().get())) { + dynamic_cast<DocumentSourceCursor*>(pPipeline->_sources.front().get())) { return docSourceCursor->getPlanSummaryStr(); } @@ -513,12 +513,12 @@ void PipelineD::getPlanSummaryStats(const boost::intrusive_ptr<Pipeline>& pPipel invariant(statsOut); if (auto docSourceCursor = - dynamic_cast<DocumentSourceCursor*>(pPipeline->sources.front().get())) { + dynamic_cast<DocumentSourceCursor*>(pPipeline->_sources.front().get())) { *statsOut = docSourceCursor->getPlanSummaryStats(); } bool hasSortStage{false}; - for (auto&& source : pPipeline->sources) { + for (auto&& source : pPipeline->_sources) { if (dynamic_cast<DocumentSourceSort*>(source.get())) { hasSortStage = true; break; diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index ca31b0ab4e6..1bda01e3968 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -28,6 +28,10 @@ #include "mongo/platform/basic.h" +#include <boost/intrusive_ptr.hpp> +#include <string> +#include <vector> + #include "mongo/db/operation_context_noop.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/expression_context.h" @@ -47,6 +51,7 @@ namespace PipelineTests { using boost::intrusive_ptr; using std::string; +using std::vector; namespace Optimizations { using namespace mongo; @@ -75,16 +80,18 @@ public: const BSONObj outputPipeExpected = pipelineFromJsonArray(outputPipeJson()); const BSONObj serializePipeExpected = pipelineFromJsonArray(serializedPipeJson()); - intrusive_ptr<ExpressionContext> ctx = - new ExpressionContext(&_opCtx, NamespaceString("a.collection")); - string errmsg; - intrusive_ptr<Pipeline> outputPipe = Pipeline::parseCommand(errmsg, inputBson, ctx); - ASSERT_EQUALS(errmsg, ""); - ASSERT(outputPipe != NULL); + ASSERT_EQUALS(inputBson["pipeline"].type(), BSONType::Array); + vector<BSONObj> rawPipeline; + for (auto&& stageElem : inputBson["pipeline"].Array()) { + ASSERT_EQUALS(stageElem.type(), BSONType::Object); + rawPipeline.push_back(stageElem.embeddedObject()); + } + AggregationRequest request(NamespaceString("a.collection"), rawPipeline); + intrusive_ptr<ExpressionContext> ctx = new ExpressionContext(&_opCtx, request); + auto outputPipe = uassertStatusOK(Pipeline::parse(request.getPipeline(), ctx)); ASSERT_EQUALS(Value(outputPipe->writeExplainOps()), Value(outputPipeExpected["pipeline"])); - ASSERT_EQUALS(outputPipe->serialize()["pipeline"], - Value(serializePipeExpected["pipeline"])); + ASSERT_EQUALS(Value(outputPipe->serialize()), Value(serializePipeExpected["pipeline"])); } virtual ~Base() {} @@ -719,15 +726,18 @@ public: const BSONObj shardPipeExpected = pipelineFromJsonArray(shardPipeJson()); const BSONObj mergePipeExpected = pipelineFromJsonArray(mergePipeJson()); - intrusive_ptr<ExpressionContext> ctx = - new ExpressionContext(&_opCtx, NamespaceString("a.collection")); - string errmsg; - mergePipe = Pipeline::parseCommand(errmsg, inputBson, ctx); - ASSERT_EQUALS(errmsg, ""); - ASSERT(mergePipe != NULL); + ASSERT_EQUALS(inputBson["pipeline"].type(), BSONType::Array); + vector<BSONObj> rawPipeline; + for (auto&& stageElem : inputBson["pipeline"].Array()) { + ASSERT_EQUALS(stageElem.type(), BSONType::Object); + rawPipeline.push_back(stageElem.embeddedObject()); + } + AggregationRequest request(NamespaceString("a.collection"), rawPipeline); + intrusive_ptr<ExpressionContext> ctx = new ExpressionContext(&_opCtx, request); + mergePipe = uassertStatusOK(Pipeline::parse(request.getPipeline(), ctx)); shardPipe = mergePipe->splitForSharded(); - ASSERT(shardPipe != NULL); + ASSERT(shardPipe != nullptr); ASSERT_EQUALS(Value(shardPipe->writeExplainOps()), Value(shardPipeExpected["pipeline"])); ASSERT_EQUALS(Value(mergePipe->writeExplainOps()), Value(mergePipeExpected["pipeline"])); @@ -1063,58 +1073,41 @@ class LookUp : public needsPrimaryShardMergerBase { namespace { TEST(PipelineInitialSource, GeoNearInitialQuery) { - const BSONObj inputBson = - fromjson("{pipeline: [{$geoNear: {distanceField: 'd', near: [0, 0], query: {a: 1}}}]}"); - OperationContextNoop _opCtx; - intrusive_ptr<ExpressionContext> ctx = - new ExpressionContext(&_opCtx, NamespaceString("a.collection")); - string errmsg; - intrusive_ptr<Pipeline> pipe = Pipeline::parseCommand(errmsg, inputBson, ctx); + const std::vector<BSONObj> rawPipeline = { + fromjson("{$geoNear: {distanceField: 'd', near: [0, 0], query: {a: 1}}}")}; + intrusive_ptr<ExpressionContext> ctx = new ExpressionContext( + &_opCtx, AggregationRequest(NamespaceString("a.collection"), rawPipeline)); + auto pipe = uassertStatusOK(Pipeline::parse(rawPipeline, ctx)); ASSERT_EQ(pipe->getInitialQuery(), BSON("a" << 1)); } TEST(PipelineInitialSource, MatchInitialQuery) { - const BSONObj inputBson = fromjson("{pipeline: [{$match: {'a': 4}}]}"); - OperationContextNoop _opCtx; - intrusive_ptr<ExpressionContext> ctx = - new ExpressionContext(&_opCtx, NamespaceString("a.collection")); - string errmsg; - intrusive_ptr<Pipeline> pipe = Pipeline::parseCommand(errmsg, inputBson, ctx); - ASSERT_EQ(pipe->getInitialQuery(), BSON("a" << 4)); -} + const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {'a': 4}}")}; + intrusive_ptr<ExpressionContext> ctx = new ExpressionContext( + &_opCtx, AggregationRequest(NamespaceString("a.collection"), rawPipeline)); -TEST(PipelineInitialSource, CollationNotAnObjectFailsToParse) { - QueryTestServiceContext serviceContext; - auto txn = serviceContext.makeOperationContext(); - - const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], collation: 1}"); - - intrusive_ptr<ExpressionContext> ctx = - new ExpressionContext(txn.get(), NamespaceString("a.collection")); - string errmsg; - ASSERT_THROWS(Pipeline::parseCommand(errmsg, inputBson, ctx), UserException); + auto pipe = uassertStatusOK(Pipeline::parse(rawPipeline, ctx)); + ASSERT_EQ(pipe->getInitialQuery(), BSON("a" << 4)); } TEST(PipelineInitialSource, ParseCollation) { QueryTestServiceContext serviceContext; - auto txn = serviceContext.makeOperationContext(); + auto opCtx = serviceContext.makeOperationContext(); const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], collation: {locale: 'reverse'}}"); + auto request = AggregationRequest::parseFromBSON(NamespaceString("a.collection"), inputBson); + ASSERT_OK(request.getStatus()); - intrusive_ptr<ExpressionContext> ctx = - new ExpressionContext(txn.get(), NamespaceString("a.collection")); - string errmsg; - intrusive_ptr<Pipeline> pipe = Pipeline::parseCommand(errmsg, inputBson, ctx); + intrusive_ptr<ExpressionContext> ctx = new ExpressionContext(opCtx.get(), request.getValue()); ASSERT(ctx->collator.get()); CollatorInterfaceMock collator(CollatorInterfaceMock::MockType::kReverseString); ASSERT_TRUE(CollatorInterface::collatorsMatch(ctx->collator.get(), &collator)); } } - class All : public Suite { public: All() : Suite("PipelineOptimizations") {} diff --git a/src/mongo/db/pipeline/value.cpp b/src/mongo/db/pipeline/value.cpp index 37b3b88afae..4334af8260c 100644 --- a/src/mongo/db/pipeline/value.cpp +++ b/src/mongo/db/pipeline/value.cpp @@ -241,6 +241,15 @@ Value::Value(const BSONArray& arr) : _storage(Array) { _storage.putVector(vec.get()); } +Value::Value(const vector<BSONObj>& arr) : _storage(Array) { + intrusive_ptr<RCVector> vec(new RCVector); + vec->vec.reserve(arr.size()); + for (auto&& obj : arr) { + vec->vec.push_back(Value(obj)); + } + _storage.putVector(vec.get()); +} + Value Value::createIntOrLong(long long longValue) { int intValue = longValue; if (intValue != longValue) { diff --git a/src/mongo/db/pipeline/value.h b/src/mongo/db/pipeline/value.h index 5ac5c61180b..bda0723d471 100644 --- a/src/mongo/db/pipeline/value.h +++ b/src/mongo/db/pipeline/value.h @@ -80,6 +80,7 @@ public: explicit Value(const Document& doc) : _storage(Object, doc) {} explicit Value(const BSONObj& obj); explicit Value(const BSONArray& arr); + explicit Value(const std::vector<BSONObj>& arr); explicit Value(std::vector<Value> vec) : _storage(Array, new RCVector(std::move(vec))) {} explicit Value(const BSONBinData& bd) : _storage(BinData, bd) {} explicit Value(const BSONRegEx& re) : _storage(RegEx, re) {} diff --git a/src/mongo/dbtests/documentsourcetests.cpp b/src/mongo/dbtests/documentsourcetests.cpp index 5206be6974f..4118fa78c07 100644 --- a/src/mongo/dbtests/documentsourcetests.cpp +++ b/src/mongo/dbtests/documentsourcetests.cpp @@ -84,7 +84,7 @@ using mongo::DocumentSourceCursor; class Base : public CollectionBase { public: - Base() : _ctx(new ExpressionContext(&_opCtx, nss)) { + Base() : _ctx(new ExpressionContext(&_opCtx, AggregationRequest(nss, {}))) { _ctx->tempDir = storageGlobalParams.dbpath + "/_tmp"; } diff --git a/src/mongo/dbtests/query_plan_executor.cpp b/src/mongo/dbtests/query_plan_executor.cpp index b06793e3b3f..845456ce7d3 100644 --- a/src/mongo/dbtests/query_plan_executor.cpp +++ b/src/mongo/dbtests/query_plan_executor.cpp @@ -279,13 +279,12 @@ public: std::shared_ptr<PlanExecutor> innerExec(makeIndexScanExec(ctx.db(), indexSpec, 7, 10)); // Create the aggregation pipeline. - boost::intrusive_ptr<ExpressionContext> expCtx = - new ExpressionContext(&_txn, NamespaceString(nss.ns())); + std::vector<BSONObj> rawPipeline = {fromjson("{$match: {a: {$gte: 7, $lte: 10}}}")}; + boost::intrusive_ptr<ExpressionContext> expCtx = new ExpressionContext( + &_txn, AggregationRequest(NamespaceString(nss.ns()), rawPipeline)); - string errmsg; - BSONObj inputBson = fromjson("{$match: {a: {$gte: 7, $lte: 10}}}"); - boost::intrusive_ptr<Pipeline> pipeline = Pipeline::parseCommand(errmsg, inputBson, expCtx); - ASSERT_EQUALS(errmsg, ""); + auto statusWithPipeline = Pipeline::parse(rawPipeline, expCtx); + auto pipeline = assertGet(statusWithPipeline); // Create the output PlanExecutor that pulls results from the pipeline. auto ws = make_unique<WorkingSet>(); diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp index 8fb7a896f89..46cf44b9570 100644 --- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp +++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp @@ -39,6 +39,7 @@ #include "mongo/base/status.h" #include "mongo/db/client.h" #include "mongo/db/commands.h" +#include "mongo/db/pipeline/aggregation_request.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/pipeline.h" @@ -72,7 +73,7 @@ namespace { */ class PipelineCommand : public Command { public: - PipelineCommand() : Command(Pipeline::commandName, false) {} + PipelineCommand() : Command(AggregationRequest::kCommandName, false) {} virtual bool slaveOk() const { return true; @@ -117,19 +118,22 @@ public: return aggPassthrough(txn, conf, cmdObj, result, options); } - intrusive_ptr<ExpressionContext> mergeCtx = - new ExpressionContext(txn, NamespaceString(fullns)); + auto request = AggregationRequest::parseFromBSON(NamespaceString(fullns), cmdObj); + if (!request.isOK()) { + return appendCommandStatus(result, request.getStatus()); + } + + intrusive_ptr<ExpressionContext> mergeCtx = new ExpressionContext(txn, request.getValue()); mergeCtx->inRouter = true; // explicitly *not* setting mergeCtx->tempDir - // Parse the pipeline specification - intrusive_ptr<Pipeline> pipeline(Pipeline::parseCommand(errmsg, cmdObj, mergeCtx)); - if (!pipeline.get()) { - // There was some parsing error - return false; + // Parse the pipeline specification. + auto pipeline = Pipeline::parse(request.getValue().getPipeline(), mergeCtx); + if (!pipeline.isOK()) { + return appendCommandStatus(result, pipeline.getStatus()); } - for (auto&& ns : pipeline->getInvolvedCollections()) { + for (auto&& ns : pipeline.getValue()->getInvolvedCollections()) { uassert( 28769, str::stream() << ns.ns() << " cannot be sharded", !conf->isSharded(ns.ns())); } @@ -140,30 +144,34 @@ public: // If the first $match stage is an exact match on the shard key, we only have to send it // to one shard, so send the command to that shard. - BSONObj firstMatchQuery = pipeline->getInitialQuery(); + BSONObj firstMatchQuery = pipeline.getValue()->getInitialQuery(); ChunkManagerPtr chunkMgr = conf->getChunkManager(txn, fullns); BSONObj shardKeyMatches = uassertStatusOK( chunkMgr->getShardKeyPattern().extractShardKeyFromQuery(txn, firstMatchQuery)); // Don't need to split pipeline if the first $match is an exact match on shard key, unless // there is a stage that needs to be run on the primary shard. - const bool needPrimaryShardMerger = pipeline->needsPrimaryShardMerger(); + const bool needPrimaryShardMerger = pipeline.getValue()->needsPrimaryShardMerger(); const bool needSplit = shardKeyMatches.isEmpty() || needPrimaryShardMerger; // Split the pipeline into pieces for mongod(s) and this mongos. If needSplit is true, // 'pipeline' will become the merger side. - intrusive_ptr<Pipeline> shardPipeline(needSplit ? pipeline->splitForSharded() : pipeline); + intrusive_ptr<Pipeline> shardPipeline(needSplit ? pipeline.getValue()->splitForSharded() + : pipeline.getValue()); - // Create the command for the shards. The 'fromRouter' field means produce output to - // be merged. - MutableDocument commandBuilder(shardPipeline->serialize()); + // Create the command for the shards. The 'fromRouter' field means produce output to be + // merged. + MutableDocument commandBuilder(request.getValue().serializeToCommandObj()); + commandBuilder[AggregationRequest::kPipelineName] = Value(shardPipeline->serialize()); if (needSplit) { - commandBuilder.setField("fromRouter", Value(true)); - commandBuilder.setField("cursor", Value(DOC("batchSize" << 0))); + commandBuilder[AggregationRequest::kFromRouterName] = Value(true); + commandBuilder["cursor"] = Value(DOC("batchSize" << 0)); } else { - commandBuilder.setField("cursor", Value(cmdObj["cursor"])); + commandBuilder["cursor"] = Value(cmdObj["cursor"]); } + // These fields are not part of the AggregationRequest since they are not handled by the + // aggregation subsystem, so we serialize them separately. const std::initializer_list<StringData> fieldsToPropagateToShards = { "$queryOptions", "readConcern", QueryRequest::cmdOptionMaxTimeMS, }; @@ -180,14 +188,14 @@ public: Strategy::commandOp( txn, dbname, shardedCommand, options, fullns, shardQuery, &shardResults); - if (pipeline->isExplain()) { + if (mergeCtx->isExplain) { // This must be checked before we start modifying result. uassertAllShardsSupportExplain(shardResults); if (needSplit) { result << "needsPrimaryShardMerger" << needPrimaryShardMerger << "splitPipeline" << DOC("shardsPart" << shardPipeline->writeExplainOps() << "mergerPart" - << pipeline->writeExplainOps()); + << pipeline.getValue()->writeExplainOps()); } else { result << "splitPipeline" << BSONNULL; } @@ -215,10 +223,11 @@ public: return reply["ok"].trueValue(); } - pipeline->addInitialSource( + pipeline.getValue()->addInitialSource( DocumentSourceMergeCursors::create(parseCursors(shardResults), mergeCtx)); - MutableDocument mergeCmd(pipeline->serialize()); + MutableDocument mergeCmd(request.getValue().serializeToCommandObj()); + mergeCmd["pipeline"] = Value(pipeline.getValue()->serialize()); mergeCmd["cursor"] = Value(cmdObj["cursor"]); if (cmdObj.hasField("$queryOptions")) { @@ -235,7 +244,8 @@ public: // Not propagating readConcern to merger since it doesn't do local reads. string outputNsOrEmpty; - if (DocumentSourceOut* out = dynamic_cast<DocumentSourceOut*>(pipeline->output())) { + if (DocumentSourceOut* out = + dynamic_cast<DocumentSourceOut*>(pipeline.getValue()->output())) { outputNsOrEmpty = out->getOutputNs().ns(); } |