/** * Copyright (C) 2016 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #include "mongo/platform/basic.h" #include "mongo/db/pipeline/aggregation_request.h" #include #include "mongo/base/error_codes.h" #include "mongo/base/status_with.h" #include "mongo/base/string_data.h" #include "mongo/db/catalog/document_validation.h" #include "mongo/db/commands.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/value.h" #include "mongo/db/query/cursor_request.h" #include "mongo/db/query/query_request.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/storage/storage_options.h" namespace mongo { const StringData AggregationRequest::kCommandName = "aggregate"_sd; const StringData AggregationRequest::kCursorName = "cursor"_sd; const StringData AggregationRequest::kBatchSizeName = "batchSize"_sd; const StringData AggregationRequest::kFromRouterName = "fromRouter"_sd; const StringData AggregationRequest::kPipelineName = "pipeline"_sd; const StringData AggregationRequest::kCollationName = "collation"_sd; const StringData AggregationRequest::kExplainName = "explain"_sd; const StringData AggregationRequest::kAllowDiskUseName = "allowDiskUse"_sd; const long long AggregationRequest::kDefaultBatchSize = 101; AggregationRequest::AggregationRequest(NamespaceString nss, std::vector pipeline) : _nss(std::move(nss)), _pipeline(std::move(pipeline)) {} StatusWith AggregationRequest::parseFromBSON(NamespaceString nss, const BSONObj& cmdObj) { // Parse required parameters. auto pipelineElem = cmdObj[kPipelineName]; if (pipelineElem.eoo() || pipelineElem.type() != BSONType::Array) { return {ErrorCodes::TypeMismatch, "'pipeline' option must be specified as an array"}; } std::vector pipeline; for (auto elem : pipelineElem.Obj()) { if (elem.type() != BSONType::Object) { return {ErrorCodes::TypeMismatch, "Each element of the 'pipeline' array must be an object"}; } pipeline.push_back(elem.embeddedObject().getOwned()); } AggregationRequest request(std::move(nss), std::move(pipeline)); const std::initializer_list optionsParsedElseWhere = { QueryRequest::cmdOptionMaxTimeMS, "writeConcern"_sd, kPipelineName, kCommandName, repl::ReadConcernArgs::kReadConcernFieldName}; // Parse optional parameters. for (auto&& elem : cmdObj) { auto fieldName = elem.fieldNameStringData(); // Ignore top-level fields prefixed with $. They are for the command processor, not us. if (fieldName[0] == '$') { continue; } // Ignore options that are parsed elsewhere. if (std::find(optionsParsedElseWhere.begin(), optionsParsedElseWhere.end(), fieldName) != optionsParsedElseWhere.end()) { continue; } if (kCursorName == fieldName) { long long batchSize; auto status = CursorRequest::parseCommandCursorOptions(cmdObj, kDefaultBatchSize, &batchSize); if (!status.isOK()) { return status; } request.setCursorCommand(true); request.setBatchSize(batchSize); } else if (kCollationName == fieldName) { if (elem.type() != BSONType::Object) { return {ErrorCodes::TypeMismatch, str::stream() << kCollationName << " must be an object, not a " << typeName(elem.type())}; } request.setCollation(elem.embeddedObject().getOwned()); } else if (kExplainName == fieldName) { if (elem.type() != BSONType::Bool) { return {ErrorCodes::TypeMismatch, str::stream() << kExplainName << " must be a boolean, not a " << typeName(elem.type())}; } request.setExplain(elem.Bool()); } else if (kFromRouterName == fieldName) { if (elem.type() != BSONType::Bool) { return {ErrorCodes::TypeMismatch, str::stream() << kFromRouterName << " must be a boolean, not a " << typeName(elem.type())}; } request.setFromRouter(elem.Bool()); } else if (kAllowDiskUseName == fieldName) { if (storageGlobalParams.readOnly) { return {ErrorCodes::IllegalOperation, str::stream() << "The '" << kAllowDiskUseName << "' option is not permitted in read-only mode."}; } else if (elem.type() != BSONType::Bool) { return {ErrorCodes::TypeMismatch, str::stream() << kAllowDiskUseName << " must be a boolean, not a " << typeName(elem.type())}; } request.setAllowDiskUse(elem.Bool()); } else if (bypassDocumentValidationCommandOption() == fieldName) { request.setBypassDocumentValidation(elem.trueValue()); } else { return {ErrorCodes::FailedToParse, str::stream() << "unrecognized field '" << elem.fieldName() << "'"}; } } return request; } Document AggregationRequest::serializeToCommandObj() const { MutableDocument serialized; return Document{ {kCommandName, _nss.coll()}, {kPipelineName, _pipeline}, // Only serialize booleans if different than their default. {kExplainName, _explain ? Value(true) : Value()}, {kAllowDiskUseName, _allowDiskUse ? Value(true) : Value()}, {kFromRouterName, _fromRouter ? Value(true) : Value()}, {bypassDocumentValidationCommandOption(), _bypassDocumentValidation ? Value(true) : Value()}, // Only serialize a collation if one was specified. {kCollationName, _collation.isEmpty() ? Value() : Value(_collation)}, {kCursorName, _batchSize ? Value(Document{{kBatchSizeName, _batchSize.get()}}) : Value()}}; } } // namespace mongo