author    Charlie Swanson <charlie.swanson@mongodb.com>  2016-06-23 17:47:13 -0400
committer Charlie Swanson <charlie.swanson@mongodb.com>  2016-06-24 11:51:20 -0400
commit    20e9b2798d69fb2367ff9f16a1b30f4f9b73d93b (patch)
tree      19acf1fff91817744a202958808800544b783486
parent    5bdf5d6b8995637193a37d04a0b816b71e47b9fb (diff)
download  mongo-20e9b2798d69fb2367ff9f16a1b30f4f9b73d93b.tar.gz
SERVER-24638 Move command processing from Pipeline to AggregationRequest
 jstests/aggregation/bugs/server7781.js             |   7
 jstests/aggregation/bugs/server9444.js             |   7
 jstests/noPassthrough/read_majority.js             |   4
 src/mongo/db/SConscript                            |   3
 src/mongo/db/commands/pipeline_command.cpp         | 103
 src/mongo/db/pipeline/SConscript                   |  52
 src/mongo/db/pipeline/aggregation_request.cpp      | 154
 src/mongo/db/pipeline/aggregation_request.h        | 157
 src/mongo/db/pipeline/aggregation_request_test.cpp | 197
 src/mongo/db/pipeline/document_source.cpp          |   1
 src/mongo/db/pipeline/document_source_out.cpp      |   4
 src/mongo/db/pipeline/document_source_test.cpp     |   4
 src/mongo/db/pipeline/expression_context.cpp       |  59
 src/mongo/db/pipeline/expression_context.h         |  27
 src/mongo/db/pipeline/pipeline.cpp                 | 292
 src/mongo/db/pipeline/pipeline.h                   |  86
 src/mongo/db/pipeline/pipeline_d.cpp               |  12
 src/mongo/db/pipeline/pipeline_test.cpp            |  85
 src/mongo/db/pipeline/value.cpp                    |   9
 src/mongo/db/pipeline/value.h                      |   1
 src/mongo/dbtests/documentsourcetests.cpp          |   2
 src/mongo/dbtests/query_plan_executor.cpp          |  11
 src/mongo/s/commands/cluster_pipeline_cmd.cpp      |  56
 23 files changed, 918 insertions(+), 415 deletions(-)
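
The commit retires Pipeline::parseCommand() in favor of a dedicated AggregationRequest class that owns option parsing; the ExpressionContext is then initialized from the parsed request, and Pipeline::parse() consumes only the raw stage array. A condensed sketch of the new flow, using the names from the pipeline_command.cpp hunk below (error paths abbreviated):

    // Parse the command options into an AggregationRequest, build the
    // ExpressionContext from it, then parse just the stage array.
    auto request = AggregationRequest::parseFromBSON(nss, cmdObj);
    if (!request.isOK()) {
        return appendCommandStatus(result, request.getStatus());
    }
    intrusive_ptr<ExpressionContext> expCtx =
        new ExpressionContext(txn, request.getValue());
    auto statusWithPipeline =
        Pipeline::parse(request.getValue().getPipeline(), expCtx);
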
diff --git a/jstests/aggregation/bugs/server7781.js b/jstests/aggregation/bugs/server7781.js
index e8684dd813f..678a224630a 100644
--- a/jstests/aggregation/bugs/server7781.js
+++ b/jstests/aggregation/bugs/server7781.js
@@ -11,10 +11,9 @@
db[coll].insert({loc: [0, 0]});
// $geoNear is only allowed as the first stage in a pipeline, nowhere else.
- assertErrorCode(
- db[coll],
- [{$match: {x: 1}}, {$geoNear: {near: [1, 1], spherical: true, distanceField: 'dis'}}],
- 28837);
+ assert.throws(
+ () => db[coll].aggregate(
+ [{$match: {x: 1}}, {$geoNear: {near: [1, 1], spherical: true, distanceField: 'dis'}}]));
function checkOutput(cmdOut, aggOut, expectedNum) {
assert.commandWorked(cmdOut, "geoNear command");
diff --git a/jstests/aggregation/bugs/server9444.js b/jstests/aggregation/bugs/server9444.js
index f3dc2748b0a..b2f027d314c 100644
--- a/jstests/aggregation/bugs/server9444.js
+++ b/jstests/aggregation/bugs/server9444.js
@@ -29,17 +29,16 @@
assert.eq(res.code, outOfMemoryCode);
// ensure allowDiskUse: false does what it says
- var res = t.runCommand('aggregate', {pipeline: pipeline, allowDiskUse: false});
+ res = t.runCommand('aggregate', {pipeline: pipeline, allowDiskUse: false});
assert.commandFailed(res);
assert.eq(res.code, outOfMemoryCode);
// allowDiskUse only supports bool. In particular, numbers aren't allowed.
- var res = t.runCommand('aggregate', {pipeline: pipeline, allowDiskUse: 1});
+ res = t.runCommand('aggregate', {pipeline: pipeline, allowDiskUse: 1});
assert.commandFailed(res);
- assert.eq(res.code, 16949);
// ensure we work when allowDiskUse === true
- var res = t.aggregate(pipeline, {allowDiskUse: true});
+ res = t.aggregate(pipeline, {allowDiskUse: true});
assert.eq(res.itcount(), t.count()); // all tests output one doc per input doc
}
diff --git a/jstests/noPassthrough/read_majority.js b/jstests/noPassthrough/read_majority.js
index c2b20df292a..828b7fba4a5 100644
--- a/jstests/noPassthrough/read_majority.js
+++ b/jstests/noPassthrough/read_majority.js
@@ -41,8 +41,8 @@ load("jstests/libs/analyze_plan.js");
}
function getReadMajorityAggCursor() {
- var res =
- t.runCommand('aggregate', {cursor: {batchSize: 2}, readConcern: {level: "majority"}});
+ var res = t.runCommand(
+ 'aggregate', {pipeline: [], cursor: {batchSize: 2}, readConcern: {level: "majority"}});
assert.commandWorked(res);
return new DBCommandCursor(db.getMongo(), res, 2);
}
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 83ca9eb0139..9b722fb3b62 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -726,8 +726,7 @@ serveronlyLibdeps = [
"matcher/expressions_mongod_only",
"ops/update_driver",
"ops/write_ops_parsers",
- "pipeline/document_source",
- "pipeline/pipeline",
+ "pipeline/aggregation",
"query/query",
"range_deleter",
"repl/bgsync",
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index b5c49095b9f..7d17775d507 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -43,6 +43,7 @@
#include "mongo/db/exec/pipeline_proxy.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/pipeline/accumulator.h"
+#include "mongo/db/pipeline/aggregation_request.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/expression.h"
@@ -68,16 +69,18 @@ using std::stringstream;
using std::unique_ptr;
using stdx::make_unique;
+namespace {
+
/**
* Returns true if we need to keep a ClientCursor saved for this pipeline (for future getMore
* requests). Otherwise, returns false.
*/
-static bool handleCursorCommand(OperationContext* txn,
- const string& ns,
- ClientCursorPin* pin,
- PlanExecutor* exec,
- const BSONObj& cmdObj,
- BSONObjBuilder& result) {
+bool handleCursorCommand(OperationContext* txn,
+ const string& ns,
+ ClientCursorPin* pin,
+ PlanExecutor* exec,
+ const BSONObj& cmdObj,
+ BSONObjBuilder& result) {
ClientCursor* cursor = pin ? pin->c() : NULL;
if (pin) {
invariant(cursor);
@@ -156,10 +159,42 @@ static bool handleCursorCommand(OperationContext* txn,
return static_cast<bool>(cursor);
}
+/**
+ * Round trips the pipeline through serialization by calling serialize(), then Pipeline::parse().
+ * fasserts if it fails to parse after being serialized.
+ */
+boost::intrusive_ptr<Pipeline> reparsePipeline(
+ const boost::intrusive_ptr<Pipeline>& pipeline,
+ const AggregationRequest& request,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ auto serialized = pipeline->serialize();
+
+ // Convert vector<Value> to vector<BSONObj>.
+ std::vector<BSONObj> parseableSerialization;
+ parseableSerialization.reserve(serialized.size());
+ for (auto&& serializedStage : serialized) {
+ invariant(serializedStage.getType() == BSONType::Object);
+ parseableSerialization.push_back(serializedStage.getDocument().toBson());
+ }
+
+ auto reparsedPipeline = Pipeline::parse(parseableSerialization, expCtx);
+ if (!reparsedPipeline.isOK()) {
+ error() << "Aggregation command did not round trip through parsing and serialization "
+ "correctly. Input pipeline: "
+ << Value(request.getPipeline()).toString()
+ << ", serialized pipeline: " << Value(serialized).toString();
+ fassertFailedWithStatusNoTrace(40175, reparsedPipeline.getStatus());
+ }
+
+ return reparsedPipeline.getValue();
+}
+
+} // namespace
class PipelineCommand : public Command {
public:
- PipelineCommand() : Command(Pipeline::commandName) {} // command is called "aggregate"
+ PipelineCommand()
+ : Command(AggregationRequest::kCommandName) {} // command is called "aggregate"
// Locks are managed manually, in particular by DocumentSourceCursor.
virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
@@ -210,24 +245,29 @@ public:
}
NamespaceString nss(ns);
- intrusive_ptr<ExpressionContext> pCtx = new ExpressionContext(txn, nss);
- pCtx->tempDir = storageGlobalParams.dbpath + "/_tmp";
+ // Parse the options for this request.
+ auto request = AggregationRequest::parseFromBSON(nss, cmdObj);
+ if (!request.isOK()) {
+ return appendCommandStatus(result, request.getStatus());
+ }
- /* try to parse the command; if this fails, then we didn't run */
- intrusive_ptr<Pipeline> pPipeline = Pipeline::parseCommand(errmsg, cmdObj, pCtx);
- if (!pPipeline.get())
- return false;
+ // Set up the ExpressionContext.
+ intrusive_ptr<ExpressionContext> expCtx = new ExpressionContext(txn, request.getValue());
+ expCtx->tempDir = storageGlobalParams.dbpath + "/_tmp";
- // This is outside of the if block to keep the object alive until the pipeline is finished.
- BSONObj parsed;
- if (kDebugBuild && !pPipeline->isExplain() && !pCtx->inShard) {
- // Make sure all operations round-trip through Pipeline::toBson() correctly by
- // reparsing every command in debug builds. This is important because sharded
- // aggregations rely on this ability. Skipping when inShard because this has
- // already been through the transformation (and this unsets pCtx->inShard).
- parsed = pPipeline->serialize().toBson();
- pPipeline = Pipeline::parseCommand(errmsg, parsed, pCtx);
- verify(pPipeline);
+ // Parse the pipeline.
+ auto statusWithPipeline = Pipeline::parse(request.getValue().getPipeline(), expCtx);
+ if (!statusWithPipeline.isOK()) {
+ return appendCommandStatus(result, statusWithPipeline.getStatus());
+ }
+ auto pipeline = std::move(statusWithPipeline.getValue());
+
+ if (kDebugBuild && !expCtx->isExplain && !expCtx->inShard) {
+ // Make sure all operations round-trip through Pipeline::serialize() correctly by
+ // re-parsing every command in debug builds. This is important because sharded
+ // aggregations rely on this ability. Skipping when inShard because this has already
+ // been through the transformation (and this un-sets expCtx->inShard).
+ pipeline = reparsePipeline(pipeline, request.getValue(), expCtx);
}
unique_ptr<ClientCursorPin> pin; // either this OR the exec will be non-null
@@ -246,22 +286,21 @@ public:
// If the pipeline does not have a user-specified collation, set it from the
// collection default.
- if (pPipeline->getContext()->collation.isEmpty() && collection &&
+ if (request.getValue().getCollation().isEmpty() && collection &&
collection->getDefaultCollator()) {
- pPipeline->setCollator(collection->getDefaultCollator()->clone());
+ pipeline->setCollator(collection->getDefaultCollator()->clone());
}
// This does mongod-specific stuff like creating the input PlanExecutor and adding
// it to the front of the pipeline if needed.
std::shared_ptr<PlanExecutor> input =
- PipelineD::prepareCursorSource(txn, collection, nss, pPipeline, pCtx);
- pPipeline->stitch();
+ PipelineD::prepareCursorSource(txn, collection, nss, pipeline, expCtx);
// Create the PlanExecutor which returns results from the pipeline. The WorkingSet
// ('ws') and the PipelineProxyStage ('proxy') will be owned by the created
// PlanExecutor.
auto ws = make_unique<WorkingSet>();
- auto proxy = make_unique<PipelineProxyStage>(txn, pPipeline, input, ws.get());
+ auto proxy = make_unique<PipelineProxyStage>(txn, pipeline, input, ws.get());
auto statusWithPlanExecutor = (NULL == collection)
? PlanExecutor::make(
@@ -327,8 +366,8 @@ public:
}
// If both explain and cursor are specified, explain wins.
- if (pPipeline->isExplain()) {
- result << "stages" << Value(pPipeline->writeExplainOps());
+ if (expCtx->isExplain) {
+ result << "stages" << Value(pipeline->writeExplainOps());
} else if (isCursorCommand) {
keepCursor = handleCursorCommand(txn,
nss.ns(),
@@ -337,10 +376,10 @@ public:
cmdObj,
result);
} else {
- pPipeline->run(result);
+ pipeline->run(result);
}
- if (!pPipeline->isExplain()) {
+ if (!expCtx->isExplain) {
PlanSummaryStats stats;
Explain::getSummaryStats(pin ? *pin->c()->getExecutor() : *exec.get(), &stats);
curOp->debug().setPlanSummaryMetrics(stats);
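
Since serialize() now returns a vector<Value> rather than a full command Document, round-tripping a pipeline means converting each serialized stage back to BSON before handing it to Pipeline::parse(), as reparsePipeline() above does. A minimal sketch of that conversion, with pipeline and expCtx assumed in scope:

    // Each serialized stage is an Object-typed Value; convert it back to
    // the vector<BSONObj> form that Pipeline::parse() accepts.
    std::vector<Value> serialized = pipeline->serialize();
    std::vector<BSONObj> stages;
    stages.reserve(serialized.size());
    for (auto&& stage : serialized) {
        stages.push_back(stage.getDocument().toBson());
    }
    auto reparsed = Pipeline::parse(stages, expCtx);
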
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 1732f2bb376..fb8ac72df4d 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -1,7 +1,20 @@
+# -*- mode: python -*-
Import('env')
env.Library(
+ target='aggregation',
+ source=[
+ ],
+ LIBDEPS=[
+ 'aggregation_request',
+ 'document_source',
+ 'expression_context',
+ 'pipeline',
+ ]
+)
+
+env.Library(
target='field_path',
source=[
'field_path.cpp',
@@ -41,6 +54,42 @@ env.CppUnitTest(
],
)
+env.Library(
+ target='aggregation_request',
+ source=[
+ 'aggregation_request.cpp',
+ ],
+ LIBDEPS=[
+ 'document_value',
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/namespace_string',
+ '$BUILD_DIR/mongo/db/query/query_request',
+ '$BUILD_DIR/mongo/db/repl/read_concern_args',
+ '$BUILD_DIR/mongo/db/storage/storage_options',
+ ]
+)
+
+env.CppUnitTest(
+ target='aggregation_request_test',
+ source='aggregation_request_test.cpp',
+ LIBDEPS=[
+ 'aggregation_request',
+ ],
+)
+
+env.Library(
+ target='expression_context',
+ source=[
+ 'expression_context.cpp',
+ ],
+ LIBDEPS=[
+ 'aggregation_request',
+ '$BUILD_DIR/mongo/db/query/collation/collator_factory_interface',
+ '$BUILD_DIR/mongo/db/service_context',
+ '$BUILD_DIR/mongo/util/intrusive_counter',
+ ]
+)
+
env.CppUnitTest(
target='document_source_test',
source='document_source_test.cpp',
@@ -130,6 +179,7 @@ docSourceEnv.Library(
'dependencies',
'document_value',
'expression',
+ 'expression_context',
'$BUILD_DIR/mongo/client/clientdriver',
'$BUILD_DIR/mongo/db/bson/dotted_path_support',
'$BUILD_DIR/mongo/db/matcher/expressions',
@@ -145,7 +195,6 @@ docSourceEnv.Library(
# which is not uniquely defined
'incomplete'
],
-
)
env.Library(
@@ -157,6 +206,7 @@ env.Library(
'dependencies',
'document_source',
'document_value',
+ 'expression_context',
'$BUILD_DIR/mongo/db/auth/authorization_manager_global',
'$BUILD_DIR/mongo/db/auth/authcore',
'$BUILD_DIR/mongo/db/bson/dotted_path_support',
diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp
new file mode 100644
index 00000000000..365debf849c
--- /dev/null
+++ b/src/mongo/db/pipeline/aggregation_request.cpp
@@ -0,0 +1,154 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/aggregation_request.h"
+
+#include <algorithm>
+
+#include "mongo/base/error_codes.h"
+#include "mongo/base/status_with.h"
+#include "mongo/base/string_data.h"
+#include "mongo/db/catalog/document_validation.h"
+#include "mongo/db/pipeline/document.h"
+#include "mongo/db/pipeline/value.h"
+#include "mongo/db/query/query_request.h"
+#include "mongo/db/repl/read_concern_args.h"
+#include "mongo/db/storage/storage_options.h"
+
+namespace mongo {
+
+const StringData AggregationRequest::kCommandName = "aggregate"_sd;
+const StringData AggregationRequest::kFromRouterName = "fromRouter"_sd;
+const StringData AggregationRequest::kPipelineName = "pipeline"_sd;
+const StringData AggregationRequest::kCollationName = "collation"_sd;
+const StringData AggregationRequest::kExplainName = "explain"_sd;
+const StringData AggregationRequest::kAllowDiskUseName = "allowDiskUse"_sd;
+
+AggregationRequest::AggregationRequest(NamespaceString nss, std::vector<BSONObj> pipeline)
+ : _nss(std::move(nss)), _pipeline(std::move(pipeline)) {}
+
+StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(NamespaceString nss,
+ const BSONObj& cmdObj) {
+ // Parse required parameters.
+ auto pipelineElem = cmdObj[kPipelineName];
+ if (pipelineElem.eoo() || pipelineElem.type() != BSONType::Array) {
+ return {ErrorCodes::TypeMismatch, "'pipeline' option must be specified as an array"};
+ }
+ std::vector<BSONObj> pipeline;
+ for (auto elem : pipelineElem.Obj()) {
+ if (elem.type() != BSONType::Object) {
+ return {ErrorCodes::TypeMismatch,
+ "Each element of the 'pipeline' array must be an object"};
+ }
+ pipeline.push_back(elem.embeddedObject().getOwned());
+ }
+
+ AggregationRequest request(std::move(nss), std::move(pipeline));
+
+ const std::initializer_list<StringData> optionsParsedElseWhere = {
+ QueryRequest::cmdOptionMaxTimeMS,
+ "cursor"_sd,
+ "writeConcern"_sd,
+ kPipelineName,
+ kCommandName,
+ repl::ReadConcernArgs::kReadConcernFieldName};
+
+ // Parse optional parameters.
+ for (auto&& elem : cmdObj) {
+ auto fieldName = elem.fieldNameStringData();
+
+ // Ignore top-level fields prefixed with $. They are for the command processor, not us.
+ if (fieldName[0] == '$') {
+ continue;
+ }
+
+ // Ignore options that are parsed elsewhere.
+ if (std::find(optionsParsedElseWhere.begin(), optionsParsedElseWhere.end(), fieldName) !=
+ optionsParsedElseWhere.end()) {
+ continue;
+ }
+
+ if (kCollationName == fieldName) {
+ if (elem.type() != BSONType::Object) {
+ return {ErrorCodes::TypeMismatch,
+ str::stream() << kCollationName << " must be an object, not a "
+ << typeName(elem.type())};
+ }
+ request.setCollation(elem.embeddedObject().getOwned());
+ } else if (kExplainName == fieldName) {
+ if (elem.type() != BSONType::Bool) {
+ return {ErrorCodes::TypeMismatch,
+ str::stream() << kExplainName << " must be a boolean, not a "
+ << typeName(elem.type())};
+ }
+ request.setExplain(elem.Bool());
+ } else if (kFromRouterName == fieldName) {
+ if (elem.type() != BSONType::Bool) {
+ return {ErrorCodes::TypeMismatch,
+ str::stream() << kFromRouterName << " must be a boolean, not a "
+ << typeName(elem.type())};
+ }
+ request.setFromRouter(elem.Bool());
+ } else if (kAllowDiskUseName == fieldName) {
+ if (storageGlobalParams.readOnly) {
+ return {ErrorCodes::IllegalOperation,
+ str::stream() << "The '" << kAllowDiskUseName
+ << "' option is not permitted in read-only mode."};
+ } else if (elem.type() != BSONType::Bool) {
+ return {ErrorCodes::TypeMismatch,
+ str::stream() << kAllowDiskUseName << " must be a boolean, not a "
+ << typeName(elem.type())};
+ }
+ request.setAllowDiskUse(elem.Bool());
+ } else if (bypassDocumentValidationCommandOption() == fieldName) {
+ request.setBypassDocumentValidation(elem.trueValue());
+ } else {
+ return {ErrorCodes::FailedToParse,
+ str::stream() << "unrecognized field '" << elem.fieldName() << "'"};
+ }
+ }
+ return request;
+}
+
+Document AggregationRequest::serializeToCommandObj() const {
+ MutableDocument serialized;
+ return Document{{kCommandName, _nss.coll()},
+ {kPipelineName, _pipeline},
+ // Only serialize booleans if different than their default.
+ {kExplainName, _explain ? Value(true) : Value()},
+ {kAllowDiskUseName, _allowDiskUse ? Value(true) : Value()},
+ {kFromRouterName, _fromRouter ? Value(true) : Value()},
+ {bypassDocumentValidationCommandOption(),
+ _bypassDocumentValidation ? Value(true) : Value()},
+ // Only serialize a collation if one was specified.
+ {kCollationName, _collation.isEmpty() ? Value() : Value(_collation)}};
+}
+
+} // namespace mongo
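
parseFromBSON() and serializeToCommandObj() are intended to round-trip: booleans at their defaults and an empty collation are dropped on serialization, so only explicitly set options survive. A small illustration using just the API defined in this file (the input document is hypothetical):

    // Hypothetical command object; 'aggregate' is skipped by the parser
    // since it is consumed by the command processor.
    BSONObj cmdObj = fromjson(
        "{aggregate: 'coll', pipeline: [{$match: {a: 1}}], allowDiskUse: true}");
    auto request =
        AggregationRequest::parseFromBSON(NamespaceString("test.coll"), cmdObj);
    invariant(request.isOK());
    // Serializes back to {aggregate: 'coll', pipeline: [...], allowDiskUse:
    // true}; unset booleans and the empty collation are omitted.
    Document serialized = request.getValue().serializeToCommandObj();
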
diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h
new file mode 100644
index 00000000000..23601869245
--- /dev/null
+++ b/src/mongo/db/pipeline/aggregation_request.h
@@ -0,0 +1,157 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+#include <vector>
+
+#include "mongo/bson/bsonelement.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/namespace_string.h"
+
+namespace mongo {
+
+template <typename T>
+class StatusWith;
+class Document;
+
+/**
+ * Represents the user-supplied options to the aggregate command.
+ */
+class AggregationRequest {
+public:
+ static const StringData kCommandName;
+ static const StringData kFromRouterName;
+ static const StringData kPipelineName;
+ static const StringData kCollationName;
+ static const StringData kExplainName;
+ static const StringData kAllowDiskUseName;
+
+ /**
+ * Create a new instance of AggregationRequest by parsing the raw command object. Returns a
+ * non-OK status if a required field was missing, if there was an unrecognized field name or if
+ * there was a bad value for one of the fields.
+ */
+ static StatusWith<AggregationRequest> parseFromBSON(NamespaceString nss, const BSONObj& cmdObj);
+
+ /**
+ * Constructs an AggregationRequest over the given namespace with the given pipeline. All
+ * options aside from the pipeline assume their default values.
+ */
+ AggregationRequest(NamespaceString nss, std::vector<BSONObj> pipeline);
+
+ /**
+ * Serializes the options to a Document. Note that this serialization includes the original
+ * pipeline object, as specified. Callers will likely want to override this field with a
+ * serialization of a parsed and optimized Pipeline object.
+ */
+ Document serializeToCommandObj() const;
+
+ //
+ // Getters.
+ //
+
+ const NamespaceString& getNamespaceString() const {
+ return _nss;
+ }
+
+ /**
+ * An unparsed version of the pipeline. All BSONObjs are owned.
+ */
+ const std::vector<BSONObj>& getPipeline() const {
+ return _pipeline;
+ }
+
+ bool isExplain() const {
+ return _explain;
+ }
+
+ bool isFromRouter() const {
+ return _fromRouter;
+ }
+
+ bool shouldAllowDiskUse() const {
+ return _allowDiskUse;
+ }
+
+ bool shouldBypassDocumentValidation() const {
+ return _bypassDocumentValidation;
+ }
+
+ /**
+ * Returns an empty object if no collation was specified.
+ */
+ BSONObj getCollation() const {
+ return _collation;
+ }
+
+ //
+ // Setters for optional fields.
+ //
+
+ void setCollation(BSONObj collation) {
+ _collation = collation.getOwned();
+ }
+
+ void setExplain(bool isExplain) {
+ _explain = isExplain;
+ }
+
+ void setAllowDiskUse(bool allowDiskUse) {
+ _allowDiskUse = allowDiskUse;
+ }
+
+ void setFromRouter(bool isFromRouter) {
+ _fromRouter = isFromRouter;
+ }
+
+ void setBypassDocumentValidation(bool shouldBypassDocumentValidation) {
+ _bypassDocumentValidation = shouldBypassDocumentValidation;
+ }
+
+private:
+ // Required fields.
+
+ const NamespaceString _nss;
+
+ // An unparsed version of the pipeline.
+ const std::vector<BSONObj> _pipeline;
+
+ // Optional fields.
+
+ // An owned copy of the user-specified collation object, or an empty object if no collation was
+ // specified.
+ BSONObj _collation;
+
+ bool _explain = false;
+ bool _allowDiskUse = false;
+ bool _fromRouter = false;
+ bool _bypassDocumentValidation = false;
+};
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/aggregation_request_test.cpp b/src/mongo/db/pipeline/aggregation_request_test.cpp
new file mode 100644
index 00000000000..ac11018da85
--- /dev/null
+++ b/src/mongo/db/pipeline/aggregation_request_test.cpp
@@ -0,0 +1,197 @@
+/**
+ * Copyright (C) 2016 MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/aggregation_request.h"
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/bson/json.h"
+#include "mongo/db/catalog/document_validation.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/pipeline/document.h"
+#include "mongo/db/pipeline/value.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/assert_util.h"
+
+namespace mongo {
+namespace {
+
+//
+// Parsing
+//
+
+TEST(AggregationRequestTest, ShouldParseAllKnownOptions) {
+ NamespaceString nss("a.collection");
+ const BSONObj inputBson = fromjson(
+ "{pipeline: [{$match: {a: 'abc'}}], explain: true, allowDiskUse: true, fromRouter: true, "
+ "bypassDocumentValidation: true, collation: {locale: 'en_US'}}");
+ auto request = unittest::assertGet(AggregationRequest::parseFromBSON(nss, inputBson));
+ ASSERT_TRUE(request.isExplain());
+ ASSERT_TRUE(request.shouldAllowDiskUse());
+ ASSERT_TRUE(request.isFromRouter());
+ ASSERT_TRUE(request.shouldBypassDocumentValidation());
+ ASSERT_EQ(request.getCollation(),
+ BSON("locale"
+ << "en_US"));
+}
+
+//
+// Serialization
+//
+
+TEST(AggregationRequestTest, ShouldOnlySerializeRequiredFieldsIfNoOptionalFieldsAreSpecified) {
+ NamespaceString nss("a.collection");
+ AggregationRequest request(nss, {});
+
+ auto expectedSerialization =
+ Document{{AggregationRequest::kCommandName, nss.coll()},
+ {AggregationRequest::kPipelineName, Value(std::vector<Value>{})}};
+ ASSERT_EQ(request.serializeToCommandObj(), expectedSerialization);
+}
+
+TEST(AggregationRequestTest, ShouldNotSerializeOptionalValuesIfEquivalentToDefault) {
+ NamespaceString nss("a.collection");
+ AggregationRequest request(nss, {});
+ request.setExplain(false);
+ request.setAllowDiskUse(false);
+ request.setFromRouter(false);
+ request.setBypassDocumentValidation(false);
+ request.setCollation(BSONObj());
+
+ auto expectedSerialization =
+ Document{{AggregationRequest::kCommandName, nss.coll()},
+ {AggregationRequest::kPipelineName, Value(std::vector<Value>{})}};
+ ASSERT_EQ(request.serializeToCommandObj(), expectedSerialization);
+}
+
+TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) {
+ NamespaceString nss("a.collection");
+ AggregationRequest request(nss, {});
+ request.setExplain(true);
+ request.setAllowDiskUse(true);
+ request.setFromRouter(true);
+ request.setBypassDocumentValidation(true);
+ const auto collationObj = BSON("locale"
+ << "en_US");
+ request.setCollation(collationObj);
+
+ auto expectedSerialization =
+ Document{{AggregationRequest::kCommandName, nss.coll()},
+ {AggregationRequest::kPipelineName, Value(std::vector<Value>{})},
+ {AggregationRequest::kExplainName, true},
+ {AggregationRequest::kAllowDiskUseName, true},
+ {AggregationRequest::kFromRouterName, true},
+ {bypassDocumentValidationCommandOption(), true},
+ {AggregationRequest::kCollationName, collationObj}};
+ ASSERT_EQ(request.serializeToCommandObj(), expectedSerialization);
+}
+
+//
+// Error cases.
+//
+
+TEST(AggregationRequestTest, ShouldRejectNonArrayPipeline) {
+ NamespaceString nss("a.collection");
+ const BSONObj inputBson = fromjson("{pipeline: {}}");
+ ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+}
+
+TEST(AggregationRequestTest, ShouldRejectPipelineArrayIfAnElementIsNotAnObject) {
+ NamespaceString nss("a.collection");
+ BSONObj inputBson = fromjson("{pipeline: [4]}");
+ ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+
+ inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}, 4]}");
+ ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+}
+
+TEST(AggregationRequestTest, ShouldRejectNonObjectCollation) {
+ NamespaceString nss("a.collection");
+ const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], collation: 1}");
+ ASSERT_NOT_OK(
+ AggregationRequest::parseFromBSON(NamespaceString("a.collection"), inputBson).getStatus());
+}
+
+TEST(AggregationRequestTest, ShouldRejectNonBoolExplain) {
+ NamespaceString nss("a.collection");
+ const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], explain: 1}");
+ ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+}
+
+TEST(AggregationRequestTest, ShouldRejectNonBoolFromRouter) {
+ NamespaceString nss("a.collection");
+ const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], fromRouter: 1}");
+ ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+}
+
+TEST(AggregationRequestTest, ShouldRejectNonBoolAllowDiskUse) {
+ NamespaceString nss("a.collection");
+ const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], allowDiskUse: 1}");
+ ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+}
+
+//
+// Ignore fields parsed elsewhere.
+//
+
+TEST(AggregationRequestTest, ShouldIgnoreFieldsPrefixedWithDollar) {
+ NamespaceString nss("a.collection");
+ const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], $unknown: 1}");
+ ASSERT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+}
+
+TEST(AggregationRequestTest, ShouldIgnoreCursorOption) {
+ NamespaceString nss("a.collection");
+ const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: 1}");
+ ASSERT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+}
+
+TEST(AggregationRequestTest, ShouldIgnoreWriteConcernOption) {
+ NamespaceString nss("a.collection");
+ const BSONObj inputBson =
+ fromjson("{pipeline: [{$match: {a: 'abc'}}], writeConcern: 'invalid'}");
+ ASSERT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+}
+
+TEST(AggregationRequestTest, ShouldIgnoreMaxTimeMsOption) {
+ NamespaceString nss("a.collection");
+ const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], maxTimeMS: 'invalid'}");
+ ASSERT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+}
+
+TEST(AggregationRequestTest, ShouldIgnoreReadConcernOption) {
+ NamespaceString nss("a.collection");
+ const BSONObj inputBson =
+ fromjson("{pipeline: [{$match: {a: 'abc'}}], readConcern: 'invalid'}");
+ ASSERT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp
index 430e3cf1496..5b9b3b39b3b 100644
--- a/src/mongo/db/pipeline/document_source.cpp
+++ b/src/mongo/db/pipeline/document_source.cpp
@@ -81,7 +81,6 @@ const char* DocumentSource::getSourceName() const {
void DocumentSource::setSource(DocumentSource* pTheSource) {
verify(!isValidInitialSource());
- verify(!pSource);
pSource = pTheSource;
}
diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp
index afc131ac7d8..d6cd692f0fe 100644
--- a/src/mongo/db/pipeline/document_source_out.cpp
+++ b/src/mongo/db/pipeline/document_source_out.cpp
@@ -183,6 +183,10 @@ intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson(
str::stream() << "$out only supports a string argument, not " << typeName(elem.type()),
elem.type() == String);
+ uassert(ErrorCodes::InvalidOptions,
+ "$out can only be used with the 'local' read concern level",
+ !pExpCtx->opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot());
+
NamespaceString outputNs(pExpCtx->ns.db().toString() + '.' + elem.str());
uassert(17385, "Can't $out to special collection: " + elem.str(), !outputNs.isSpecial());
return new DocumentSourceOut(outputNs, pExpCtx);
diff --git a/src/mongo/db/pipeline/document_source_test.cpp b/src/mongo/db/pipeline/document_source_test.cpp
index 9a9331d013e..58b9732cdae 100644
--- a/src/mongo/db/pipeline/document_source_test.cpp
+++ b/src/mongo/db/pipeline/document_source_test.cpp
@@ -208,7 +208,7 @@ public:
: _service(makeTestServiceContext()),
_client(_service->makeClient("DocumentSourceTest")),
_opCtx(_client->makeOperationContext()),
- _ctx(new ExpressionContext(_opCtx.get(), NamespaceString(ns))) {}
+ _ctx(new ExpressionContext(_opCtx.get(), AggregationRequest(NamespaceString(ns), {}))) {}
protected:
intrusive_ptr<ExpressionContext> ctx() {
@@ -447,7 +447,7 @@ protected:
BSONElement specElement = namedSpec.firstElement();
intrusive_ptr<ExpressionContext> expressionContext =
- new ExpressionContext(_opCtx.get(), NamespaceString(ns));
+ new ExpressionContext(_opCtx.get(), AggregationRequest(NamespaceString(ns), {}));
expressionContext->inShard = inShard;
expressionContext->inRouter = inRouter;
// Won't spill to disk properly if it needs to.
diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp
new file mode 100644
index 00000000000..d7c5a6708a0
--- /dev/null
+++ b/src/mongo/db/pipeline/expression_context.cpp
@@ -0,0 +1,59 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/db/query/collation/collator_factory_interface.h"
+
+namespace mongo {
+
+ExpressionContext::ExpressionContext(OperationContext* opCtx, const AggregationRequest& request)
+ : isExplain(request.isExplain()),
+ inShard(request.isFromRouter()),
+ extSortAllowed(request.shouldAllowDiskUse()),
+ bypassDocumentValidation(request.shouldBypassDocumentValidation()),
+ ns(request.getNamespaceString()),
+ opCtx(opCtx),
+ collation(request.getCollation()) {
+ if (!collation.isEmpty()) {
+ auto statusWithCollator =
+ CollatorFactoryInterface::get(opCtx->getServiceContext())->makeFromBSON(collation);
+ uassertStatusOK(statusWithCollator.getStatus());
+ collator = std::move(statusWithCollator.getValue());
+ }
+}
+
+void ExpressionContext::checkForInterrupt() {
+ // This check could be expensive, at least in relative terms, so don't check every time.
+ if (--interruptCounter == 0) {
+ opCtx->checkForInterrupt();
+ interruptCounter = kInterruptCheckPeriod;
+ }
+}
+} // namespace mongo
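
Collator construction moves here from Pipeline::parseCommand(): if the request carries a collation spec, the constructor builds the collator through CollatorFactoryInterface and uasserts on a bad spec, so an invalid collation now fails at ExpressionContext construction time rather than during command parsing. A sketch, assuming a valid OperationContext* opCtx is in scope and the locale is illustrative:

    // The constructor turns the collation spec into a CollatorInterface,
    // throwing a UserException if the spec is invalid.
    AggregationRequest request(NamespaceString("test.coll"), {});
    request.setCollation(BSON("locale"
                              << "en_US"));
    intrusive_ptr<ExpressionContext> expCtx =
        new ExpressionContext(opCtx, request);
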
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index 19c5bd59071..90d5bad3a90 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -28,10 +28,13 @@
#pragma once
+#include <boost/intrusive_ptr.hpp>
+#include <memory>
#include <string>
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/pipeline/aggregation_request.h"
#include "mongo/db/query/collation/collator_interface.h"
#include "mongo/util/intrusive_counter.h"
@@ -39,23 +42,19 @@ namespace mongo {
struct ExpressionContext : public IntrusiveCounterUnsigned {
public:
- ExpressionContext(OperationContext* opCtx, const NamespaceString& ns) : ns(ns), opCtx(opCtx) {}
+ ExpressionContext(OperationContext* opCtx, const AggregationRequest& request);
- /** Used by a pipeline to check for interrupts so that killOp() works.
- * @throws if the operation has been interrupted
+ /**
+ * Used by a pipeline to check for interrupts so that killOp() works. Throws a UserAssertion if
+ * this aggregation pipeline has been interrupted.
*/
- void checkForInterrupt() {
- if (opCtx && --interruptCounter == 0) { // XXX SERVER-13931 for opCtx check
- // The checkForInterrupt could be expensive, at least in relative terms.
- opCtx->checkForInterrupt();
- interruptCounter = kInterruptCheckPeriod;
- }
- }
+ void checkForInterrupt();
- bool inShard = false;
+ bool isExplain;
+ bool inShard;
bool inRouter = false;
- bool extSortAllowed = false;
- bool bypassDocumentValidation = false;
+ bool extSortAllowed;
+ bool bypassDocumentValidation;
NamespaceString ns;
std::string tempDir; // Defaults to empty to prevent external sorting in mongos.
@@ -64,7 +63,7 @@ public:
// Collation requested by the user for this pipeline. Empty if the user did not request a
// collation.
- BSONObj collation;
+ const BSONObj collation;
// Collator used to compare elements. 'collator' is initialized from 'collation', except in the
// case where 'collation' is empty and there is a collection default collation.
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index ca8b671a673..2dc5ffc7899 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -32,12 +32,12 @@
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/pipeline/pipeline_optimizations.h"
+#include "mongo/base/error_codes.h"
#include "mongo/db/auth/action_set.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/auth/privilege.h"
#include "mongo/db/bson/dotted_path_support.h"
#include "mongo/db/catalog/document_validation.h"
-#include "mongo/db/commands.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/pipeline/accumulator.h"
@@ -45,8 +45,6 @@
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/expression_context.h"
-#include "mongo/db/query/collation/collator_factory_interface.h"
-#include "mongo/db/repl/read_concern_args.h"
#include "mongo/util/mongoutils/str.h"
namespace mongo {
@@ -59,182 +57,63 @@ using std::vector;
namespace dps = ::mongo::dotted_path_support;
-const char Pipeline::commandName[] = "aggregate";
-const char Pipeline::pipelineName[] = "pipeline";
-const char Pipeline::collationName[] = "collation";
-const char Pipeline::explainName[] = "explain";
-const char Pipeline::fromRouterName[] = "fromRouter";
-const char Pipeline::serverPipelineName[] = "serverPipeline";
-const char Pipeline::mongosPipelineName[] = "mongosPipeline";
-
-Pipeline::Pipeline(const intrusive_ptr<ExpressionContext>& pTheCtx)
- : explain(false), pCtx(pTheCtx) {}
-
-intrusive_ptr<Pipeline> Pipeline::parseCommand(string& errmsg,
- const BSONObj& cmdObj,
- const intrusive_ptr<ExpressionContext>& pCtx) {
- intrusive_ptr<Pipeline> pPipeline(new Pipeline(pCtx));
- vector<BSONElement> pipeline;
- repl::ReadConcernArgs readConcernArgs;
-
- /* gather the specification for the aggregation */
- for (BSONObj::iterator cmdIterator = cmdObj.begin(); cmdIterator.more();) {
- BSONElement cmdElement(cmdIterator.next());
- const char* pFieldName = cmdElement.fieldName();
-
- // ignore top-level fields prefixed with $. They are for the command processor, not us.
- if (pFieldName[0] == '$') {
- continue;
- }
-
- // maxTimeMS is also for the command processor.
- if (str::equals(pFieldName, QueryRequest::cmdOptionMaxTimeMS)) {
- continue;
- }
+Pipeline::Pipeline(const intrusive_ptr<ExpressionContext>& pTheCtx) : pCtx(pTheCtx) {}
- if (pFieldName == repl::ReadConcernArgs::kReadConcernFieldName) {
- uassertStatusOK(readConcernArgs.initialize(cmdElement));
- continue;
- }
-
- // ignore cursor options since they are handled externally.
- if (str::equals(pFieldName, "cursor")) {
- continue;
- }
+StatusWith<intrusive_ptr<Pipeline>> Pipeline::parse(
+ const std::vector<BSONObj>& rawPipeline, const intrusive_ptr<ExpressionContext>& expCtx) {
+ intrusive_ptr<Pipeline> pipeline(new Pipeline(expCtx));
- // ignore writeConcern since it's handled externally
- if (str::equals(pFieldName, "writeConcern")) {
- continue;
- }
-
- if (str::equals(pFieldName, collationName)) {
- uassert(ErrorCodes::BadValue,
- str::stream() << collationName << " must be an object, not a "
- << typeName(cmdElement.type()),
- cmdElement.type() == BSONType::Object);
- pCtx->collation = cmdElement.Obj().getOwned();
- auto statusWithCollator =
- CollatorFactoryInterface::get(pCtx->opCtx->getServiceContext())
- ->makeFromBSON(cmdElement.Obj());
- uassertStatusOK(statusWithCollator.getStatus());
- pCtx->collator = std::move(statusWithCollator.getValue());
- continue;
- }
-
- /* look for the aggregation command */
- if (!strcmp(pFieldName, commandName)) {
- continue;
- }
+ for (auto&& stageObj : rawPipeline) {
+ auto parsedSources = DocumentSource::parse(expCtx, stageObj);
+ pipeline->_sources.insert(
+ pipeline->_sources.end(), parsedSources.begin(), parsedSources.end());
+ }
- /* check for the collection name */
- if (!strcmp(pFieldName, pipelineName)) {
- pipeline = cmdElement.Array();
- continue;
- }
+ auto status = pipeline->ensureAllStagesAreInLegalPositions();
+ if (!status.isOK()) {
+ return status;
+ }
- /* check for explain option */
- if (!strcmp(pFieldName, explainName)) {
- pPipeline->explain = cmdElement.Bool();
- continue;
- }
+ pipeline->optimizePipeline();
- /* if the request came from the router, we're in a shard */
- if (!strcmp(pFieldName, fromRouterName)) {
- pCtx->inShard = cmdElement.Bool();
- continue;
- }
+ return pipeline;
+}
- if (str::equals(pFieldName, "allowDiskUse")) {
- uassert(ErrorCodes::IllegalOperation,
- "The 'allowDiskUse' option is not permitted in read-only mode.",
- !storageGlobalParams.readOnly);
-
- uassert(16949,
- str::stream() << "allowDiskUse must be a bool, not a "
- << typeName(cmdElement.type()),
- cmdElement.type() == Bool);
- pCtx->extSortAllowed = cmdElement.Bool();
- continue;
+Status Pipeline::ensureAllStagesAreInLegalPositions() const {
+ size_t i = 0;
+ for (auto&& stage : _sources) {
+ if (stage->isValidInitialSource() && i != 0) {
+ return {ErrorCodes::BadValue,
+ str::stream() << stage->getSourceName()
+ << " is only valid as the first stage in a pipeline."};
}
- if (pFieldName == bypassDocumentValidationCommandOption()) {
- pCtx->bypassDocumentValidation = cmdElement.trueValue();
- continue;
+ if (dynamic_cast<DocumentSourceOut*>(stage.get()) && i != _sources.size() - 1) {
+ return {ErrorCodes::BadValue, "$out can only be the final stage in the pipeline"};
}
-
- /* we didn't recognize a field in the command */
- ostringstream sb;
- sb << "unrecognized field '" << cmdElement.fieldName() << "'";
- errmsg = sb.str();
- return intrusive_ptr<Pipeline>();
+ ++i;
}
-
- /*
- If we get here, we've harvested the fields we expect for a pipeline.
-
- Set up the specified document source pipeline.
- */
- SourceContainer& sources = pPipeline->sources; // shorthand
-
- /* iterate over the steps in the pipeline */
- const size_t nSteps = pipeline.size();
- for (size_t iStep = 0; iStep < nSteps; ++iStep) {
- /* pull out the pipeline element as an object */
- BSONElement pipeElement(pipeline[iStep]);
- uassert(15942,
- str::stream() << "pipeline element " << iStep << " is not an object",
- pipeElement.type() == Object);
-
- vector<intrusive_ptr<DocumentSource>> stepSources =
- DocumentSource::parse(pCtx, pipeElement.Obj());
-
- // Iterate over the steps in stepSource. stepSource may have more than one step if the
- // current step is a DocumentSource alias.
- const size_t nStepSources = stepSources.size();
- for (size_t iStepSource = 0; iStepSource < nStepSources; ++iStepSource) {
- sources.push_back(stepSources[iStepSource]);
-
- if (sources.back()->isValidInitialSource()) {
- uassert(28837,
- str::stream() << sources.back()->getSourceName()
- << " is only valid as the first stage in a pipeline.",
- iStep == 0 && iStepSource == 0);
- }
-
- if (dynamic_cast<DocumentSourceOut*>(sources.back().get())) {
- uassert(16991,
- "$out can only be the final stage in the pipeline",
- iStep == nSteps - 1 && iStepSource == nStepSources - 1);
-
- uassert(ErrorCodes::InvalidOptions,
- "$out can only be used with the 'local' read concern level",
- readConcernArgs.getLevel() == repl::ReadConcernLevel::kLocalReadConcern);
- }
- }
- }
-
- pPipeline->optimizePipeline();
-
- return pPipeline;
+ return Status::OK();
}
void Pipeline::optimizePipeline() {
SourceContainer optimizedSources;
- SourceContainer::iterator itr = sources.begin();
+ SourceContainer::iterator itr = _sources.begin();
- while (itr != sources.end() && std::next(itr) != sources.end()) {
+ while (itr != _sources.end() && std::next(itr) != _sources.end()) {
invariant((*itr).get());
- itr = (*itr).get()->optimizeAt(itr, &sources);
+ itr = (*itr).get()->optimizeAt(itr, &_sources);
}
// Once we have reached our final number of stages, optimize each individually.
- for (auto&& source : sources) {
+ for (auto&& source : _sources) {
if (auto out = source->optimize()) {
optimizedSources.push_back(out);
}
}
- sources.swap(optimizedSources);
+ _sources.swap(optimizedSources);
+ stitch();
}
Status Pipeline::checkAuthForCommand(ClientBasic* client,
@@ -316,7 +195,7 @@ bool Pipeline::aggSupportsWriteConcern(const BSONObj& cmd) {
void Pipeline::detachFromOperationContext() {
pCtx->opCtx = nullptr;
- for (auto&& source : sources) {
+ for (auto&& source : _sources) {
source->detachFromOperationContext();
}
}
@@ -325,7 +204,7 @@ void Pipeline::reattachToOperationContext(OperationContext* opCtx) {
invariant(pCtx->opCtx == nullptr);
pCtx->opCtx = opCtx;
- for (auto&& source : sources) {
+ for (auto&& source : _sources) {
source->reattachToOperationContext(opCtx);
}
}
@@ -342,7 +221,6 @@ intrusive_ptr<Pipeline> Pipeline::splitForSharded() {
// shards and all work being done in the merger. Optimizations can move operations between
// the pipelines to be more efficient.
intrusive_ptr<Pipeline> shardPipeline(new Pipeline(pCtx));
- shardPipeline->explain = explain;
// The order in which optimizations are applied can have significant impact on the
// efficiency of the final pipeline. Be Careful!
@@ -354,25 +232,25 @@ intrusive_ptr<Pipeline> Pipeline::splitForSharded() {
}
void Pipeline::Optimizations::Sharded::findSplitPoint(Pipeline* shardPipe, Pipeline* mergePipe) {
- while (!mergePipe->sources.empty()) {
- intrusive_ptr<DocumentSource> current = mergePipe->sources.front();
- mergePipe->sources.pop_front();
+ while (!mergePipe->_sources.empty()) {
+ intrusive_ptr<DocumentSource> current = mergePipe->_sources.front();
+ mergePipe->_sources.pop_front();
// Check if this source is splittable
SplittableDocumentSource* splittable =
dynamic_cast<SplittableDocumentSource*>(current.get());
if (!splittable) {
- // move the source from the merger sources to the shard sources
- shardPipe->sources.push_back(current);
+ // move the source from the merger _sources to the shard _sources
+ shardPipe->_sources.push_back(current);
} else {
- // split this source into Merge and Shard sources
+ // split this source into Merge and Shard _sources
intrusive_ptr<DocumentSource> shardSource = splittable->getShardSource();
intrusive_ptr<DocumentSource> mergeSource = splittable->getMergeSource();
if (shardSource)
- shardPipe->sources.push_back(shardSource);
+ shardPipe->_sources.push_back(shardSource);
if (mergeSource)
- mergePipe->sources.push_front(mergeSource);
+ mergePipe->_sources.push_front(mergeSource);
break;
}
@@ -381,10 +259,10 @@ void Pipeline::Optimizations::Sharded::findSplitPoint(Pipeline* shardPipe, Pipel
void Pipeline::Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(Pipeline* shardPipe,
Pipeline* mergePipe) {
- while (!shardPipe->sources.empty() &&
- dynamic_cast<DocumentSourceUnwind*>(shardPipe->sources.back().get())) {
- mergePipe->sources.push_front(shardPipe->sources.back());
- shardPipe->sources.pop_back();
+ while (!shardPipe->_sources.empty() &&
+ dynamic_cast<DocumentSourceUnwind*>(shardPipe->_sources.back().get())) {
+ mergePipe->_sources.push_front(shardPipe->_sources.back());
+ shardPipe->_sources.pop_back();
}
}
@@ -411,27 +289,27 @@ void Pipeline::Optimizations::Sharded::limitFieldsSentFromShardsToMerger(Pipelin
// objects even though only a subset of fields are needed.
// 2) Optimization IS NOT applied immediately following a $project or $group since it would
// add an unnecessary project (and therefore a deep-copy).
- for (auto&& source : shardPipe->sources) {
+ for (auto&& source : shardPipe->_sources) {
DepsTracker dt;
if (source->getDependencies(&dt) & DocumentSource::EXHAUSTIVE_FIELDS)
return;
}
// if we get here, add the project.
- shardPipe->sources.push_back(DocumentSourceProject::createFromBson(
+ shardPipe->_sources.push_back(DocumentSourceProject::createFromBson(
BSON("$project" << mergeDeps.toProjection()).firstElement(), shardPipe->pCtx));
}
BSONObj Pipeline::getInitialQuery() const {
- if (sources.empty())
+ if (_sources.empty())
return BSONObj();
/* look for an initial $match */
- DocumentSourceMatch* match = dynamic_cast<DocumentSourceMatch*>(sources.front().get());
+ DocumentSourceMatch* match = dynamic_cast<DocumentSourceMatch*>(_sources.front().get());
if (match) {
return match->getQuery();
}
- DocumentSourceGeoNear* geoNear = dynamic_cast<DocumentSourceGeoNear*>(sources.front().get());
+ DocumentSourceGeoNear* geoNear = dynamic_cast<DocumentSourceGeoNear*>(_sources.front().get());
if (geoNear) {
return geoNear->getQuery();
}
@@ -440,7 +318,7 @@ BSONObj Pipeline::getInitialQuery() const {
}
bool Pipeline::needsPrimaryShardMerger() const {
- for (auto&& source : sources) {
+ for (auto&& source : _sources) {
if (source->needsPrimaryShard()) {
return true;
}
@@ -450,52 +328,28 @@ bool Pipeline::needsPrimaryShardMerger() const {
std::vector<NamespaceString> Pipeline::getInvolvedCollections() const {
std::vector<NamespaceString> collections;
- for (auto&& source : sources) {
+ for (auto&& source : _sources) {
source->addInvolvedCollections(&collections);
}
return collections;
}
-Document Pipeline::serialize() const {
- MutableDocument serialized;
- // create an array out of the pipeline operations
- vector<Value> array;
- for (SourceContainer::const_iterator iter(sources.begin()), listEnd(sources.end());
- iter != listEnd;
- ++iter) {
- intrusive_ptr<DocumentSource> pSource(*iter);
- pSource->serializeToArray(array);
- }
-
- // add the top-level items to the command
- serialized.setField(commandName, Value(pCtx->ns.coll()));
- serialized.setField(pipelineName, Value(array));
-
- if (explain) {
- serialized.setField(explainName, Value(explain));
+vector<Value> Pipeline::serialize() const {
+ vector<Value> serializedSources;
+ for (auto&& source : _sources) {
+ source->serializeToArray(serializedSources);
}
-
- if (pCtx->extSortAllowed) {
- serialized.setField("allowDiskUse", Value(true));
- }
-
- if (pCtx->bypassDocumentValidation) {
- serialized.setField(bypassDocumentValidationCommandOption(), Value(true));
- }
-
- if (pCtx->collator.get()) {
- serialized.setField(collationName, Value(pCtx->collator->getSpec().toBSON()));
- }
-
- return serialized.freeze();
+ return serializedSources;
}
void Pipeline::stitch() {
- massert(16600, "should not have an empty pipeline", !sources.empty());
-
- /* chain together the sources we found */
- DocumentSource* prevSource = sources.front().get();
- for (SourceContainer::iterator iter(++sources.begin()), listEnd(sources.end()); iter != listEnd;
+ if (_sources.empty()) {
+ return;
+ }
+ // Chain together all the stages.
+ DocumentSource* prevSource = _sources.front().get();
+ for (SourceContainer::iterator iter(++_sources.begin()), listEnd(_sources.end());
+ iter != listEnd;
++iter) {
intrusive_ptr<DocumentSource> pTemp(*iter);
pTemp->setSource(prevSource);
@@ -504,13 +358,13 @@ void Pipeline::stitch() {
}
void Pipeline::run(BSONObjBuilder& result) {
- // should not get here in the explain case
- verify(!explain);
+ // We should not get here in the explain case.
+ verify(!pCtx->isExplain);
// the array in which the aggregation results reside
// cant use subArrayStart() due to error handling
BSONArrayBuilder resultArray;
- DocumentSource* finalSource = sources.back().get();
+ DocumentSource* finalSource = _sources.back().get();
while (boost::optional<Document> next = finalSource->getNext()) {
// add the document to the result set
BSONObjBuilder documentBuilder(resultArray.subobjStart());
@@ -530,21 +384,21 @@ void Pipeline::run(BSONObjBuilder& result) {
vector<Value> Pipeline::writeExplainOps() const {
vector<Value> array;
- for (SourceContainer::const_iterator it = sources.begin(); it != sources.end(); ++it) {
+ for (SourceContainer::const_iterator it = _sources.begin(); it != _sources.end(); ++it) {
(*it)->serializeToArray(array, /*explain=*/true);
}
return array;
}
void Pipeline::addInitialSource(intrusive_ptr<DocumentSource> source) {
- sources.push_front(source);
+ _sources.push_front(source);
}
DepsTracker Pipeline::getDependencies(const BSONObj& initialQuery) const {
DepsTracker deps;
bool knowAllFields = false;
bool knowAllMeta = false;
- for (auto&& source : sources) {
+ for (auto&& source : _sources) {
DepsTracker localDeps;
DocumentSource::GetDepsReturn status = source->getDependencies(&localDeps);
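
Stage-position validation now happens in ensureAllStagesAreInLegalPositions() after all stages are parsed, so a misplaced $out or $geoNear surfaces as a BadValue Status from Pipeline::parse() instead of a numbered uassert. A sketch, assuming an expCtx with a valid OperationContext in scope:

    // $out anywhere but the final position is now a parse-time Status,
    // not a 16991 uassert.
    std::vector<BSONObj> rawPipeline = {BSON("$out"
                                             << "target"),
                                        BSON("$match" << BSON("a" << 1))};
    auto statusWithPipeline = Pipeline::parse(rawPipeline, expCtx);
    invariant(statusWithPipeline.getStatus().code() == ErrorCodes::BadValue);
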
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index f851edefd2d..84a8333ff8e 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -28,7 +28,8 @@
#pragma once
-#include <deque>
+#include <list>
+#include <vector>
#include <boost/intrusive_ptr.hpp>
@@ -42,33 +43,30 @@ class BSONObj;
class BSONObjBuilder;
class ClientBasic;
class CollatorInterface;
-class Command;
struct DepsTracker;
class DocumentSource;
struct ExpressionContext;
class OperationContext;
-class Privilege;
-/** mongodb "commands" (sent via db.$cmd.findOne(...))
- subclass to make a command. define a singleton object for it.
- */
+/**
+ * A Pipeline object represents a list of DocumentSources and is responsible for optimizing the
+ * pipeline.
+ */
class Pipeline : public IntrusiveCounterUnsigned {
public:
typedef std::list<boost::intrusive_ptr<DocumentSource>> SourceContainer;
/**
- * Create a pipeline from the command.
- *
- * @param errmsg where to write errors, if there are any
- * @param cmdObj the command object sent from the client
- * @returns the pipeline, if created, otherwise a NULL reference
+ * Parses a Pipeline from a vector of BSONObjs representing DocumentSources. Returns a non-OK
+ * status if it failed to parse.
*/
- static boost::intrusive_ptr<Pipeline> parseCommand(
- std::string& errmsg,
- const BSONObj& cmdObj,
- const boost::intrusive_ptr<ExpressionContext>& pCtx);
+ static StatusWith<boost::intrusive_ptr<Pipeline>> parse(
+ const std::vector<BSONObj>& rawPipeline,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx);
- // Helper to implement Command::checkAuthForCommand
+ /**
+ * Helper to implement Command::checkAuthForCommand.
+ */
static Status checkAuthForCommand(ClientBasic* client,
const std::string& dbname,
const BSONObj& cmdObj);
@@ -86,7 +84,7 @@ public:
* Sets the OperationContext of 'pCtx' to nullptr.
*
* The PipelineProxyStage is responsible for detaching the OperationContext and releasing any
- * storage-engine state of the DocumentSourceCursor that may be present in 'sources'.
+ * storage-engine state of the DocumentSourceCursor that may be present in '_sources'.
*/
void detachFromOperationContext();
@@ -94,7 +92,7 @@ public:
* Sets the OperationContext of 'pCtx' to 'opCtx'.
*
* The PipelineProxyStage is responsible for reattaching the OperationContext and reacquiring
- * any storage-engine state of the DocumentSourceCursor that may be present in 'sources'.
+ * any storage-engine state of the DocumentSourceCursor that may be present in '_sources'.
*/
void reattachToOperationContext(OperationContext* opCtx);
@@ -140,22 +138,9 @@ public:
std::vector<NamespaceString> getInvolvedCollections() const;
/**
- Write the Pipeline as a BSONObj command. This should be the
- inverse of parseCommand().
-
- This is only intended to be used by the shard command obtained
- from splitForSharded(). Some pipeline operations in the merge
- process do not have equivalent command forms, and using this on
- the mongos Pipeline will cause assertions.
-
- @param the builder to write the command to
- */
- Document serialize() const;
-
- /** Stitch together the source pointers (by calling setSource) for each source in sources.
- * Must be called after optimize and addInitialSource but before trying to get results.
+ * Serializes the pipeline into a form that can be parsed into an equivalent pipeline.
*/
- void stitch();
+ std::vector<Value> serialize() const;
/**
Run the Pipeline on the given source.
@@ -164,17 +149,13 @@ public:
*/
void run(BSONObjBuilder& result);
- bool isExplain() const {
- return explain;
- }
-
/// The initial source is special since it varies between mongos and mongod.
void addInitialSource(boost::intrusive_ptr<DocumentSource> source);
/// The source that represents the output. Returns a non-owning pointer.
DocumentSource* output() {
- invariant(!sources.empty());
- return sources.back().get();
+ invariant(!_sources.empty());
+ return _sources.back().get();
}
/**
@@ -191,10 +172,6 @@ public:
*/
DepsTracker getDependencies(const BSONObj& initialQuery) const;
- /**
- The aggregation command name.
- */
- static const char commandName[];
/*
PipelineD is a "sister" class that has additional functionality
@@ -220,17 +197,22 @@ private:
friend class Optimizations::Local;
friend class Optimizations::Sharded;
- static const char pipelineName[];
- static const char collationName[];
- static const char explainName[];
- static const char fromRouterName[];
- static const char serverPipelineName[];
- static const char mongosPipelineName[];
-
Pipeline(const boost::intrusive_ptr<ExpressionContext>& pCtx);
- SourceContainer sources;
- bool explain;
+ /**
+ * Stitch together the source pointers by calling setSource() for each source in '_sources'.
+ * This function must be called any time the order of stages within the pipeline changes, e.g.
+ * in optimizePipeline().
+ */
+ void stitch();
+
+ /**
+ * Returns a non-OK status if any stage is in an invalid position; for example, if an $out
+ * stage is present but is not the last stage in the pipeline.
+ */
+ Status ensureAllStagesAreInLegalPositions() const;
+
+ SourceContainer _sources;
boost::intrusive_ptr<ExpressionContext> pCtx;
};
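
With parseCommand() gone, every caller switches from the errmsg/NULL convention to a StatusWith. A minimal sketch of the new calling convention (assumes 'rawPipeline' and 'expCtx' already exist, as in the tests further down):

    auto statusWithPipeline = Pipeline::parse(rawPipeline, expCtx);
    if (!statusWithPipeline.isOK()) {
        return statusWithPipeline.getStatus();  // propagate instead of filling an errmsg
    }
    boost::intrusive_ptr<Pipeline> pipeline = statusWithPipeline.getValue();
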
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index d88e7932f73..c26333245bd 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -254,7 +254,7 @@ shared_ptr<PlanExecutor> PipelineD::prepareCursorSource(
const intrusive_ptr<Pipeline>& pPipeline,
const intrusive_ptr<ExpressionContext>& pExpCtx) {
// We will be modifying the source vector as we go.
- Pipeline::SourceContainer& sources = pPipeline->sources;
+ Pipeline::SourceContainer& sources = pPipeline->_sources;
// Inject a MongodImplementation to sources that need them.
for (auto&& source : sources) {
@@ -422,11 +422,11 @@ std::shared_ptr<PlanExecutor> PipelineD::prepareExecutor(
}
// We know the sort is being handled by the query system, so remove the $sort stage.
- pipeline->sources.pop_front();
+ pipeline->_sources.pop_front();
if (sortStage->getLimitSrc()) {
// We need to reinsert the coalesced $limit after removing the $sort.
- pipeline->sources.push_front(sortStage->getLimitSrc());
+ pipeline->_sources.push_front(sortStage->getLimitSrc());
}
return exec;
}
@@ -501,7 +501,7 @@ shared_ptr<PlanExecutor> PipelineD::addCursorSource(const intrusive_ptr<Pipeline
std::string PipelineD::getPlanSummaryStr(const boost::intrusive_ptr<Pipeline>& pPipeline) {
if (auto docSourceCursor =
- dynamic_cast<DocumentSourceCursor*>(pPipeline->sources.front().get())) {
+ dynamic_cast<DocumentSourceCursor*>(pPipeline->_sources.front().get())) {
return docSourceCursor->getPlanSummaryStr();
}
@@ -513,12 +513,12 @@ void PipelineD::getPlanSummaryStats(const boost::intrusive_ptr<Pipeline>& pPipel
invariant(statsOut);
if (auto docSourceCursor =
- dynamic_cast<DocumentSourceCursor*>(pPipeline->sources.front().get())) {
+ dynamic_cast<DocumentSourceCursor*>(pPipeline->_sources.front().get())) {
*statsOut = docSourceCursor->getPlanSummaryStats();
}
bool hasSortStage{false};
- for (auto&& source : pPipeline->sources) {
+ for (auto&& source : pPipeline->_sources) {
if (dynamic_cast<DocumentSourceSort*>(source.get())) {
hasSortStage = true;
break;
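
PipelineD can reorder '_sources' directly because it is a friend of Pipeline, but per the new header comment, any such reordering must be followed by stitch() so every stage's source pointer is re-chained. An illustrative sketch of that invariant (not verbatim from the patch):

    // Drop a $sort now handled by the query system, reinsert its coalesced
    // $limit, then re-link each stage's source pointer.
    pipeline->_sources.pop_front();
    if (auto limitSrc = sortStage->getLimitSrc()) {
        pipeline->_sources.push_front(limitSrc);
    }
    pipeline->stitch();
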
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index ca31b0ab4e6..1bda01e3968 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -28,6 +28,10 @@
#include "mongo/platform/basic.h"
+#include <boost/intrusive_ptr.hpp>
+#include <string>
+#include <vector>
+
#include "mongo/db/operation_context_noop.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/expression_context.h"
@@ -47,6 +51,7 @@ namespace PipelineTests {
using boost::intrusive_ptr;
using std::string;
+using std::vector;
namespace Optimizations {
using namespace mongo;
@@ -75,16 +80,18 @@ public:
const BSONObj outputPipeExpected = pipelineFromJsonArray(outputPipeJson());
const BSONObj serializePipeExpected = pipelineFromJsonArray(serializedPipeJson());
- intrusive_ptr<ExpressionContext> ctx =
- new ExpressionContext(&_opCtx, NamespaceString("a.collection"));
- string errmsg;
- intrusive_ptr<Pipeline> outputPipe = Pipeline::parseCommand(errmsg, inputBson, ctx);
- ASSERT_EQUALS(errmsg, "");
- ASSERT(outputPipe != NULL);
+ ASSERT_EQUALS(inputBson["pipeline"].type(), BSONType::Array);
+ vector<BSONObj> rawPipeline;
+ for (auto&& stageElem : inputBson["pipeline"].Array()) {
+ ASSERT_EQUALS(stageElem.type(), BSONType::Object);
+ rawPipeline.push_back(stageElem.embeddedObject());
+ }
+ AggregationRequest request(NamespaceString("a.collection"), rawPipeline);
+ intrusive_ptr<ExpressionContext> ctx = new ExpressionContext(&_opCtx, request);
+ auto outputPipe = uassertStatusOK(Pipeline::parse(request.getPipeline(), ctx));
ASSERT_EQUALS(Value(outputPipe->writeExplainOps()), Value(outputPipeExpected["pipeline"]));
- ASSERT_EQUALS(outputPipe->serialize()["pipeline"],
- Value(serializePipeExpected["pipeline"]));
+ ASSERT_EQUALS(Value(outputPipe->serialize()), Value(serializePipeExpected["pipeline"]));
}
virtual ~Base() {}
@@ -719,15 +726,18 @@ public:
const BSONObj shardPipeExpected = pipelineFromJsonArray(shardPipeJson());
const BSONObj mergePipeExpected = pipelineFromJsonArray(mergePipeJson());
- intrusive_ptr<ExpressionContext> ctx =
- new ExpressionContext(&_opCtx, NamespaceString("a.collection"));
- string errmsg;
- mergePipe = Pipeline::parseCommand(errmsg, inputBson, ctx);
- ASSERT_EQUALS(errmsg, "");
- ASSERT(mergePipe != NULL);
+ ASSERT_EQUALS(inputBson["pipeline"].type(), BSONType::Array);
+ vector<BSONObj> rawPipeline;
+ for (auto&& stageElem : inputBson["pipeline"].Array()) {
+ ASSERT_EQUALS(stageElem.type(), BSONType::Object);
+ rawPipeline.push_back(stageElem.embeddedObject());
+ }
+ AggregationRequest request(NamespaceString("a.collection"), rawPipeline);
+ intrusive_ptr<ExpressionContext> ctx = new ExpressionContext(&_opCtx, request);
+ mergePipe = uassertStatusOK(Pipeline::parse(request.getPipeline(), ctx));
shardPipe = mergePipe->splitForSharded();
- ASSERT(shardPipe != NULL);
+ ASSERT(shardPipe != nullptr);
ASSERT_EQUALS(Value(shardPipe->writeExplainOps()), Value(shardPipeExpected["pipeline"]));
ASSERT_EQUALS(Value(mergePipe->writeExplainOps()), Value(mergePipeExpected["pipeline"]));
@@ -1063,58 +1073,41 @@ class LookUp : public needsPrimaryShardMergerBase {
namespace {
TEST(PipelineInitialSource, GeoNearInitialQuery) {
- const BSONObj inputBson =
- fromjson("{pipeline: [{$geoNear: {distanceField: 'd', near: [0, 0], query: {a: 1}}}]}");
-
OperationContextNoop _opCtx;
- intrusive_ptr<ExpressionContext> ctx =
- new ExpressionContext(&_opCtx, NamespaceString("a.collection"));
- string errmsg;
- intrusive_ptr<Pipeline> pipe = Pipeline::parseCommand(errmsg, inputBson, ctx);
+ const std::vector<BSONObj> rawPipeline = {
+ fromjson("{$geoNear: {distanceField: 'd', near: [0, 0], query: {a: 1}}}")};
+ intrusive_ptr<ExpressionContext> ctx = new ExpressionContext(
+ &_opCtx, AggregationRequest(NamespaceString("a.collection"), rawPipeline));
+ auto pipe = uassertStatusOK(Pipeline::parse(rawPipeline, ctx));
ASSERT_EQ(pipe->getInitialQuery(), BSON("a" << 1));
}
TEST(PipelineInitialSource, MatchInitialQuery) {
- const BSONObj inputBson = fromjson("{pipeline: [{$match: {'a': 4}}]}");
-
OperationContextNoop _opCtx;
- intrusive_ptr<ExpressionContext> ctx =
- new ExpressionContext(&_opCtx, NamespaceString("a.collection"));
- string errmsg;
- intrusive_ptr<Pipeline> pipe = Pipeline::parseCommand(errmsg, inputBson, ctx);
- ASSERT_EQ(pipe->getInitialQuery(), BSON("a" << 4));
-}
+ const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {'a': 4}}")};
+ intrusive_ptr<ExpressionContext> ctx = new ExpressionContext(
+ &_opCtx, AggregationRequest(NamespaceString("a.collection"), rawPipeline));
-TEST(PipelineInitialSource, CollationNotAnObjectFailsToParse) {
- QueryTestServiceContext serviceContext;
- auto txn = serviceContext.makeOperationContext();
-
- const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], collation: 1}");
-
- intrusive_ptr<ExpressionContext> ctx =
- new ExpressionContext(txn.get(), NamespaceString("a.collection"));
- string errmsg;
- ASSERT_THROWS(Pipeline::parseCommand(errmsg, inputBson, ctx), UserException);
+ auto pipe = uassertStatusOK(Pipeline::parse(rawPipeline, ctx));
+ ASSERT_EQ(pipe->getInitialQuery(), BSON("a" << 4));
}
TEST(PipelineInitialSource, ParseCollation) {
QueryTestServiceContext serviceContext;
- auto txn = serviceContext.makeOperationContext();
+ auto opCtx = serviceContext.makeOperationContext();
const BSONObj inputBson =
fromjson("{pipeline: [{$match: {a: 'abc'}}], collation: {locale: 'reverse'}}");
+ auto request = AggregationRequest::parseFromBSON(NamespaceString("a.collection"), inputBson);
+ ASSERT_OK(request.getStatus());
- intrusive_ptr<ExpressionContext> ctx =
- new ExpressionContext(txn.get(), NamespaceString("a.collection"));
- string errmsg;
- intrusive_ptr<Pipeline> pipe = Pipeline::parseCommand(errmsg, inputBson, ctx);
+ intrusive_ptr<ExpressionContext> ctx = new ExpressionContext(opCtx.get(), request.getValue());
ASSERT(ctx->collator.get());
CollatorInterfaceMock collator(CollatorInterfaceMock::MockType::kReverseString);
ASSERT_TRUE(CollatorInterface::collatorsMatch(ctx->collator.get(), &collator));
}
}
-
class All : public Suite {
public:
All() : Suite("PipelineOptimizations") {}
diff --git a/src/mongo/db/pipeline/value.cpp b/src/mongo/db/pipeline/value.cpp
index 37b3b88afae..4334af8260c 100644
--- a/src/mongo/db/pipeline/value.cpp
+++ b/src/mongo/db/pipeline/value.cpp
@@ -241,6 +241,15 @@ Value::Value(const BSONArray& arr) : _storage(Array) {
_storage.putVector(vec.get());
}
+Value::Value(const vector<BSONObj>& arr) : _storage(Array) {
+ intrusive_ptr<RCVector> vec(new RCVector);
+ vec->vec.reserve(arr.size());
+ for (auto&& obj : arr) {
+ vec->vec.push_back(Value(obj));
+ }
+ _storage.putVector(vec.get());
+}
+
Value Value::createIntOrLong(long long longValue) {
int intValue = longValue;
if (intValue != longValue) {
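
The new array constructor exists so a raw stage list can be dropped straight into a Document, for example when a command's 'pipeline' field is rebuilt. A usage sketch (the stage contents are arbitrary examples):

    std::vector<BSONObj> rawPipeline = {fromjson("{$match: {a: 1}}"),
                                        fromjson("{$skip: 2}")};
    Value pipelineValue(rawPipeline);  // array Value: [{$match: ...}, {$skip: ...}]
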
diff --git a/src/mongo/db/pipeline/value.h b/src/mongo/db/pipeline/value.h
index 5ac5c61180b..bda0723d471 100644
--- a/src/mongo/db/pipeline/value.h
+++ b/src/mongo/db/pipeline/value.h
@@ -80,6 +80,7 @@ public:
explicit Value(const Document& doc) : _storage(Object, doc) {}
explicit Value(const BSONObj& obj);
explicit Value(const BSONArray& arr);
+ explicit Value(const std::vector<BSONObj>& arr);
explicit Value(std::vector<Value> vec) : _storage(Array, new RCVector(std::move(vec))) {}
explicit Value(const BSONBinData& bd) : _storage(BinData, bd) {}
explicit Value(const BSONRegEx& re) : _storage(RegEx, re) {}
diff --git a/src/mongo/dbtests/documentsourcetests.cpp b/src/mongo/dbtests/documentsourcetests.cpp
index 5206be6974f..4118fa78c07 100644
--- a/src/mongo/dbtests/documentsourcetests.cpp
+++ b/src/mongo/dbtests/documentsourcetests.cpp
@@ -84,7 +84,7 @@ using mongo::DocumentSourceCursor;
class Base : public CollectionBase {
public:
- Base() : _ctx(new ExpressionContext(&_opCtx, nss)) {
+ Base() : _ctx(new ExpressionContext(&_opCtx, AggregationRequest(nss, {}))) {
_ctx->tempDir = storageGlobalParams.dbpath + "/_tmp";
}
diff --git a/src/mongo/dbtests/query_plan_executor.cpp b/src/mongo/dbtests/query_plan_executor.cpp
index b06793e3b3f..845456ce7d3 100644
--- a/src/mongo/dbtests/query_plan_executor.cpp
+++ b/src/mongo/dbtests/query_plan_executor.cpp
@@ -279,13 +279,12 @@ public:
std::shared_ptr<PlanExecutor> innerExec(makeIndexScanExec(ctx.db(), indexSpec, 7, 10));
// Create the aggregation pipeline.
- boost::intrusive_ptr<ExpressionContext> expCtx =
- new ExpressionContext(&_txn, NamespaceString(nss.ns()));
+ std::vector<BSONObj> rawPipeline = {fromjson("{$match: {a: {$gte: 7, $lte: 10}}}")};
+ boost::intrusive_ptr<ExpressionContext> expCtx = new ExpressionContext(
+ &_txn, AggregationRequest(NamespaceString(nss.ns()), rawPipeline));
- string errmsg;
- BSONObj inputBson = fromjson("{$match: {a: {$gte: 7, $lte: 10}}}");
- boost::intrusive_ptr<Pipeline> pipeline = Pipeline::parseCommand(errmsg, inputBson, expCtx);
- ASSERT_EQUALS(errmsg, "");
+ auto statusWithPipeline = Pipeline::parse(rawPipeline, expCtx);
+ auto pipeline = assertGet(statusWithPipeline);
// Create the output PlanExecutor that pulls results from the pipeline.
auto ws = make_unique<WorkingSet>();
diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
index 8fb7a896f89..46cf44b9570 100644
--- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp
+++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
@@ -39,6 +39,7 @@
#include "mongo/base/status.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
+#include "mongo/db/pipeline/aggregation_request.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/pipeline.h"
@@ -72,7 +73,7 @@ namespace {
*/
class PipelineCommand : public Command {
public:
- PipelineCommand() : Command(Pipeline::commandName, false) {}
+ PipelineCommand() : Command(AggregationRequest::kCommandName, false) {}
virtual bool slaveOk() const {
return true;
@@ -117,19 +118,22 @@ public:
return aggPassthrough(txn, conf, cmdObj, result, options);
}
- intrusive_ptr<ExpressionContext> mergeCtx =
- new ExpressionContext(txn, NamespaceString(fullns));
+ auto request = AggregationRequest::parseFromBSON(NamespaceString(fullns), cmdObj);
+ if (!request.isOK()) {
+ return appendCommandStatus(result, request.getStatus());
+ }
+
+ intrusive_ptr<ExpressionContext> mergeCtx = new ExpressionContext(txn, request.getValue());
mergeCtx->inRouter = true;
// explicitly *not* setting mergeCtx->tempDir
- // Parse the pipeline specification
- intrusive_ptr<Pipeline> pipeline(Pipeline::parseCommand(errmsg, cmdObj, mergeCtx));
- if (!pipeline.get()) {
- // There was some parsing error
- return false;
+ // Parse the pipeline specification.
+ auto pipeline = Pipeline::parse(request.getValue().getPipeline(), mergeCtx);
+ if (!pipeline.isOK()) {
+ return appendCommandStatus(result, pipeline.getStatus());
}
- for (auto&& ns : pipeline->getInvolvedCollections()) {
+ for (auto&& ns : pipeline.getValue()->getInvolvedCollections()) {
uassert(
28769, str::stream() << ns.ns() << " cannot be sharded", !conf->isSharded(ns.ns()));
}
@@ -140,30 +144,34 @@ public:
// If the first $match stage is an exact match on the shard key, we only have to send it
// to one shard, so send the command to that shard.
- BSONObj firstMatchQuery = pipeline->getInitialQuery();
+ BSONObj firstMatchQuery = pipeline.getValue()->getInitialQuery();
ChunkManagerPtr chunkMgr = conf->getChunkManager(txn, fullns);
BSONObj shardKeyMatches = uassertStatusOK(
chunkMgr->getShardKeyPattern().extractShardKeyFromQuery(txn, firstMatchQuery));
// Don't need to split pipeline if the first $match is an exact match on shard key, unless
// there is a stage that needs to be run on the primary shard.
- const bool needPrimaryShardMerger = pipeline->needsPrimaryShardMerger();
+ const bool needPrimaryShardMerger = pipeline.getValue()->needsPrimaryShardMerger();
const bool needSplit = shardKeyMatches.isEmpty() || needPrimaryShardMerger;
// Split the pipeline into pieces for mongod(s) and this mongos. If needSplit is true,
// 'pipeline' will become the merger side.
- intrusive_ptr<Pipeline> shardPipeline(needSplit ? pipeline->splitForSharded() : pipeline);
+ intrusive_ptr<Pipeline> shardPipeline(needSplit ? pipeline.getValue()->splitForSharded()
+ : pipeline.getValue());
- // Create the command for the shards. The 'fromRouter' field means produce output to
- // be merged.
- MutableDocument commandBuilder(shardPipeline->serialize());
+ // Create the command for the shards. The 'fromRouter' field means produce output to be
+ // merged.
+ MutableDocument commandBuilder(request.getValue().serializeToCommandObj());
+ commandBuilder[AggregationRequest::kPipelineName] = Value(shardPipeline->serialize());
if (needSplit) {
- commandBuilder.setField("fromRouter", Value(true));
- commandBuilder.setField("cursor", Value(DOC("batchSize" << 0)));
+ commandBuilder[AggregationRequest::kFromRouterName] = Value(true);
+ commandBuilder["cursor"] = Value(DOC("batchSize" << 0));
} else {
- commandBuilder.setField("cursor", Value(cmdObj["cursor"]));
+ commandBuilder["cursor"] = Value(cmdObj["cursor"]);
}
+ // These fields are not part of the AggregationRequest since they are not handled by the
+ // aggregation subsystem, so we serialize them separately.
const std::initializer_list<StringData> fieldsToPropagateToShards = {
"$queryOptions", "readConcern", QueryRequest::cmdOptionMaxTimeMS,
};
@@ -180,14 +188,14 @@ public:
Strategy::commandOp(
txn, dbname, shardedCommand, options, fullns, shardQuery, &shardResults);
- if (pipeline->isExplain()) {
+ if (mergeCtx->isExplain) {
// This must be checked before we start modifying result.
uassertAllShardsSupportExplain(shardResults);
if (needSplit) {
result << "needsPrimaryShardMerger" << needPrimaryShardMerger << "splitPipeline"
<< DOC("shardsPart" << shardPipeline->writeExplainOps() << "mergerPart"
- << pipeline->writeExplainOps());
+ << pipeline.getValue()->writeExplainOps());
} else {
result << "splitPipeline" << BSONNULL;
}
@@ -215,10 +223,11 @@ public:
return reply["ok"].trueValue();
}
- pipeline->addInitialSource(
+ pipeline.getValue()->addInitialSource(
DocumentSourceMergeCursors::create(parseCursors(shardResults), mergeCtx));
- MutableDocument mergeCmd(pipeline->serialize());
+ MutableDocument mergeCmd(request.getValue().serializeToCommandObj());
+ mergeCmd["pipeline"] = Value(pipeline.getValue()->serialize());
mergeCmd["cursor"] = Value(cmdObj["cursor"]);
if (cmdObj.hasField("$queryOptions")) {
@@ -235,7 +244,8 @@ public:
// Not propagating readConcern to merger since it doesn't do local reads.
string outputNsOrEmpty;
- if (DocumentSourceOut* out = dynamic_cast<DocumentSourceOut*>(pipeline->output())) {
+ if (DocumentSourceOut* out =
+ dynamic_cast<DocumentSourceOut*>(pipeline.getValue()->output())) {
outputNsOrEmpty = out->getOutputNs().ns();
}
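
Taken together, the mongos changes reduce to one flow: parse the request once, build the ExpressionContext and Pipeline from it, and serialize the (possibly split) pipeline back over the original command. A condensed sketch using only names from the diff above:

    auto request = AggregationRequest::parseFromBSON(NamespaceString(fullns), cmdObj);
    if (!request.isOK())
        return appendCommandStatus(result, request.getStatus());

    intrusive_ptr<ExpressionContext> mergeCtx = new ExpressionContext(txn, request.getValue());
    auto pipeline = Pipeline::parse(request.getValue().getPipeline(), mergeCtx);
    if (!pipeline.isOK())
        return appendCommandStatus(result, pipeline.getStatus());

    MutableDocument shardCmd(request.getValue().serializeToCommandObj());
    shardCmd[AggregationRequest::kPipelineName] =
        Value(pipeline.getValue()->splitForSharded()->serialize());
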