/** * Copyright (C) 2016 MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #include "mongo/platform/basic.h" #include "mongo/db/pipeline/aggregation_request.h" #include "mongo/bson/bsonobj.h" #include "mongo/bson/bsonobjbuilder.h" #include "mongo/bson/json.h" #include "mongo/db/catalog/document_validation.h" #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/document_value_test_util.h" #include "mongo/db/pipeline/value.h" #include "mongo/db/query/query_request.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" namespace mongo { namespace { const Document kDefaultCursorOptionDocument{ {AggregationRequest::kBatchSizeName, AggregationRequest::kDefaultBatchSize}}; // // Parsing // TEST(AggregationRequestTest, ShouldParseAllKnownOptions) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson( "{pipeline: [{$match: {a: 'abc'}}], explain: false, allowDiskUse: true, fromMongos: true, " "needsMerge: true, bypassDocumentValidation: true, collation: {locale: 'en_US'}, cursor: " "{batchSize: 10}, hint: {a: 1}, maxTimeMS: 100, readConcern: {level: 'linearizable'}, " "$queryOptions: {$readPreference: 'nearest'}, comment: 'agg_comment', exchange: {policy: " "'roundrobin', consumers:NumberInt(2)}}"); auto request = unittest::assertGet(AggregationRequest::parseFromBSON(nss, inputBson)); ASSERT_FALSE(request.getExplain()); ASSERT_TRUE(request.shouldAllowDiskUse()); ASSERT_TRUE(request.isFromMongos()); ASSERT_TRUE(request.needsMerge()); ASSERT_TRUE(request.shouldBypassDocumentValidation()); ASSERT_EQ(request.getBatchSize(), 10); ASSERT_BSONOBJ_EQ(request.getHint(), BSON("a" << 1)); ASSERT_EQ(request.getComment(), "agg_comment"); ASSERT_BSONOBJ_EQ(request.getCollation(), BSON("locale" << "en_US")); ASSERT_EQ(request.getMaxTimeMS(), 100u); ASSERT_BSONOBJ_EQ(request.getReadConcern(), BSON("level" << "linearizable")); ASSERT_BSONOBJ_EQ(request.getUnwrappedReadPref(), BSON("$readPreference" << "nearest")); ASSERT_TRUE(request.getExchangeSpec().is_initialized()); } TEST(AggregationRequestTest, ShouldParseExplicitExplainTrue) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: [], explain: true, cursor: {}}"); auto request = unittest::assertGet(AggregationRequest::parseFromBSON(nss, inputBson)); ASSERT_TRUE(request.getExplain()); ASSERT(*request.getExplain() == ExplainOptions::Verbosity::kQueryPlanner); } TEST(AggregationRequestTest, ShouldParseExplicitExplainFalseWithCursorOption) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: [], explain: false, cursor: {batchSize: 10}}"); auto request = unittest::assertGet(AggregationRequest::parseFromBSON(nss, inputBson)); ASSERT_FALSE(request.getExplain()); ASSERT_EQ(request.getBatchSize(), 10); } TEST(AggregationRequestTest, ShouldParseWithSeparateQueryPlannerExplainModeArg) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: [], cursor: {}}"); auto request = unittest::assertGet(AggregationRequest::parseFromBSON( nss, inputBson, ExplainOptions::Verbosity::kQueryPlanner)); ASSERT_TRUE(request.getExplain()); ASSERT(*request.getExplain() == ExplainOptions::Verbosity::kQueryPlanner); } TEST(AggregationRequestTest, ShouldParseWithSeparateQueryPlannerExplainModeArgAndCursorOption) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: [], cursor: {batchSize: 10}}"); auto request = unittest::assertGet( AggregationRequest::parseFromBSON(nss, inputBson, ExplainOptions::Verbosity::kExecStats)); ASSERT_TRUE(request.getExplain()); ASSERT(*request.getExplain() == ExplainOptions::Verbosity::kExecStats); ASSERT_EQ(request.getBatchSize(), 10); } TEST(AggregationRequestTest, ShouldParseExplainFlagWithReadConcern) { NamespaceString nss("a.collection"); // Non-local readConcern should not be allowed with the explain flag, but this is checked // elsewhere to avoid having to parse the readConcern in AggregationRequest. const BSONObj inputBson = fromjson("{pipeline: [], explain: true, readConcern: {level: 'majority'}}"); auto request = unittest::assertGet(AggregationRequest::parseFromBSON(nss, inputBson)); ASSERT_TRUE(request.getExplain()); ASSERT_BSONOBJ_EQ(request.getReadConcern(), BSON("level" << "majority")); } // // Serialization // TEST(AggregationRequestTest, ShouldOnlySerializeRequiredFieldsIfNoOptionalFieldsAreSpecified) { NamespaceString nss("a.collection"); AggregationRequest request(nss, {}); auto expectedSerialization = Document{{AggregationRequest::kCommandName, nss.coll()}, {AggregationRequest::kPipelineName, Value(std::vector{})}, {AggregationRequest::kCursorName, Value(kDefaultCursorOptionDocument)}}; ASSERT_DOCUMENT_EQ(request.serializeToCommandObj(), expectedSerialization); } TEST(AggregationRequestTest, ShouldNotSerializeOptionalValuesIfEquivalentToDefault) { NamespaceString nss("a.collection"); AggregationRequest request(nss, {}); request.setExplain(boost::none); request.setAllowDiskUse(false); request.setFromMongos(false); request.setNeedsMerge(false); request.setBypassDocumentValidation(false); request.setCollation(BSONObj()); request.setHint(BSONObj()); request.setComment(""); request.setMaxTimeMS(0u); request.setUnwrappedReadPref(BSONObj()); request.setReadConcern(BSONObj()); auto expectedSerialization = Document{{AggregationRequest::kCommandName, nss.coll()}, {AggregationRequest::kPipelineName, Value(std::vector{})}, {AggregationRequest::kCursorName, Value(kDefaultCursorOptionDocument)}}; ASSERT_DOCUMENT_EQ(request.serializeToCommandObj(), expectedSerialization); } TEST(AggregationRequestTest, ShouldSerializeOptionalValuesIfSet) { NamespaceString nss("a.collection"); AggregationRequest request(nss, {}); request.setAllowDiskUse(true); request.setFromMongos(true); request.setNeedsMerge(true); request.setBypassDocumentValidation(true); request.setBatchSize(10); request.setMaxTimeMS(10u); const auto hintObj = BSON("a" << 1); request.setHint(hintObj); const auto comment = std::string("agg_comment"); request.setComment(comment); const auto collationObj = BSON("locale" << "en_US"); request.setCollation(collationObj); const auto readPrefObj = BSON("$readPreference" << "nearest"); request.setUnwrappedReadPref(readPrefObj); const auto readConcernObj = BSON("level" << "linearizable"); request.setReadConcern(readConcernObj); auto expectedSerialization = Document{{AggregationRequest::kCommandName, nss.coll()}, {AggregationRequest::kPipelineName, Value(std::vector{})}, {AggregationRequest::kAllowDiskUseName, true}, {AggregationRequest::kFromMongosName, true}, {AggregationRequest::kNeedsMergeName, true}, {bypassDocumentValidationCommandOption(), true}, {AggregationRequest::kCollationName, collationObj}, {AggregationRequest::kCursorName, Value(Document({{AggregationRequest::kBatchSizeName, 10}}))}, {AggregationRequest::kHintName, hintObj}, {AggregationRequest::kCommentName, comment}, {repl::ReadConcernArgs::kReadConcernFieldName, readConcernObj}, {QueryRequest::kUnwrappedReadPrefField, readPrefObj}, {QueryRequest::cmdOptionMaxTimeMS, 10}}; ASSERT_DOCUMENT_EQ(request.serializeToCommandObj(), expectedSerialization); } TEST(AggregationRequestTest, ShouldSerializeBatchSizeIfSetAndExplainFalse) { NamespaceString nss("a.collection"); AggregationRequest request(nss, {}); request.setBatchSize(10); auto expectedSerialization = Document{{AggregationRequest::kCommandName, nss.coll()}, {AggregationRequest::kPipelineName, Value(std::vector{})}, {AggregationRequest::kCursorName, Value(Document({{AggregationRequest::kBatchSizeName, 10}}))}}; ASSERT_DOCUMENT_EQ(request.serializeToCommandObj(), expectedSerialization); } TEST(AggregationRequestTest, ShouldSerialiseAggregateFieldToOneIfCollectionIsAggregateOneNSS) { NamespaceString nss = NamespaceString::makeCollectionlessAggregateNSS("a"); AggregationRequest request(nss, {}); auto expectedSerialization = Document{{AggregationRequest::kCommandName, 1}, {AggregationRequest::kPipelineName, Value(std::vector{})}, {AggregationRequest::kCursorName, Value(Document({{AggregationRequest::kBatchSizeName, AggregationRequest::kDefaultBatchSize}}))}}; ASSERT_DOCUMENT_EQ(request.serializeToCommandObj(), expectedSerialization); } TEST(AggregationRequestTest, ShouldSetBatchSizeToDefaultOnEmptyCursorObject) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}}"); auto request = AggregationRequest::parseFromBSON(nss, inputBson); ASSERT_OK(request.getStatus()); ASSERT_EQ(request.getValue().getBatchSize(), AggregationRequest::kDefaultBatchSize); } TEST(AggregationRequestTest, ShouldAcceptHintAsString) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], hint: 'a_1', cursor: {}}"); auto request = AggregationRequest::parseFromBSON(nss, inputBson); ASSERT_OK(request.getStatus()); ASSERT_BSONOBJ_EQ(request.getValue().getHint(), BSON("$hint" << "a_1")); } TEST(AggregationRequestTest, ShouldNotSerializeBatchSizeWhenExplainSet) { NamespaceString nss("a.collection"); AggregationRequest request(nss, {}); request.setBatchSize(10); request.setExplain(ExplainOptions::Verbosity::kQueryPlanner); auto expectedSerialization = Document{{AggregationRequest::kCommandName, nss.coll()}, {AggregationRequest::kPipelineName, Value(std::vector{})}, {AggregationRequest::kCursorName, Value(Document())}}; ASSERT_DOCUMENT_EQ(request.serializeToCommandObj(), expectedSerialization); } // // Error cases. // TEST(AggregationRequestTest, ShouldRejectNonArrayPipeline) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: {}, cursor: {}}"); ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); } TEST(AggregationRequestTest, ShouldRejectPipelineArrayIfAnElementIsNotAnObject) { NamespaceString nss("a.collection"); BSONObj inputBson = fromjson("{pipeline: [4], cursor: {}}"); ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}, 4], cursor: {}}"); ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); } TEST(AggregationRequestTest, ShouldRejectNonObjectCollation) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, collation: 1}"); ASSERT_NOT_OK( AggregationRequest::parseFromBSON(NamespaceString("a.collection"), inputBson).getStatus()); } TEST(AggregationRequestTest, ShouldRejectNonStringNonObjectHint) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, hint: 1}"); ASSERT_NOT_OK( AggregationRequest::parseFromBSON(NamespaceString("a.collection"), inputBson).getStatus()); } TEST(AggregationRequestTest, ShouldRejectHintAsArray) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, hint: []}]}"); ASSERT_NOT_OK( AggregationRequest::parseFromBSON(NamespaceString("a.collection"), inputBson).getStatus()); } TEST(AggregationRequestTest, ShouldRejectNonStringComment) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, comment: 1}"); ASSERT_NOT_OK( AggregationRequest::parseFromBSON(NamespaceString("a.collection"), inputBson).getStatus()); } TEST(AggregationRequestTest, ShouldRejectExplainIfNumber) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, explain: 1}"); ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); } TEST(AggregationRequestTest, ShouldRejectExplainIfObject) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, explain: {}}"); ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); } TEST(AggregationRequestTest, ShouldRejectNonBoolFromMongos) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, fromMongos: 1}"); ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); } TEST(AggregationRequestTest, ShouldRejectNonBoolNeedsMerge) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, needsMerge: 1, fromMongos: true}"); ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); } TEST(AggregationRequestTest, ShouldRejectNeedsMergeIfFromMongosNotPresent) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, needsMerge: true}"); ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); } TEST(AggregationRequestTest, ShouldRejectNonBoolNeedsMerge34) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, fromRouter: 1}"); ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); } TEST(AggregationRequestTest, ShouldRejectNeedsMergeIfNeedsMerge34AlsoPresent) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson( "{pipeline: [{$match: {a: 'abc'}}], cursor: {}, needsMerge: true, fromMongos: true, " "fromRouter: true}"); ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); } TEST(AggregationRequestTest, ShouldRejectFromMongosIfNeedsMerge34AlsoPresent) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson( "{pipeline: [{$match: {a: 'abc'}}], cursor: {}, fromMongos: true, fromRouter: true}"); ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); } TEST(AggregationRequestTest, ShouldRejectNonBoolAllowDiskUse) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, allowDiskUse: 1}"); ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); } TEST(AggregationRequestTest, ShouldRejectNoCursorNoExplain) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}]}"); ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); } TEST(AggregationRequestTest, ShouldRejectExplainTrueWithSeparateExplainArg) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: [], explain: true}"); ASSERT_NOT_OK( AggregationRequest::parseFromBSON(nss, inputBson, ExplainOptions::Verbosity::kExecStats) .getStatus()); } TEST(AggregationRequestTest, ShouldRejectExplainFalseWithSeparateExplainArg) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: [], explain: false}"); ASSERT_NOT_OK( AggregationRequest::parseFromBSON(nss, inputBson, ExplainOptions::Verbosity::kExecStats) .getStatus()); } TEST(AggregationRequestTest, ShouldRejectExplainExecStatsVerbosityWithReadConcernMajority) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: [], readConcern: {level: 'majority'}}"); ASSERT_NOT_OK( AggregationRequest::parseFromBSON(nss, inputBson, ExplainOptions::Verbosity::kExecStats) .getStatus()); } TEST(AggregationRequestTest, ShouldRejectExplainWithWriteConcernMajority) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: [], explain: true, writeConcern: {w: 'majority'}}"); ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); } TEST(AggregationRequestTest, ShouldRejectExplainExecStatsVerbosityWithWriteConcernMajority) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: [], writeConcern: {w: 'majority'}}"); ASSERT_NOT_OK( AggregationRequest::parseFromBSON(nss, inputBson, ExplainOptions::Verbosity::kExecStats) .getStatus()); } TEST(AggregationRequestTest, CannotParseNeedsMerge34) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, fromRouter: true}"); ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); } TEST(AggregationRequestTest, ParseNSShouldReturnAggregateOneNSIfAggregateFieldIsOne) { const std::vector ones{ "1", "1.0", "NumberInt(1)", "NumberLong(1)", "NumberDecimal('1')"}; for (auto& one : ones) { const BSONObj inputBSON = fromjson(str::stream() << "{aggregate: " << one << ", pipeline: []}"); ASSERT(AggregationRequest::parseNs("a", inputBSON).isCollectionlessAggregateNS()); } } TEST(AggregationRequestTest, ParseNSShouldRejectNumericNSIfAggregateFieldIsNotOne) { const BSONObj inputBSON = fromjson("{aggregate: 2, pipeline: []}"); ASSERT_THROWS_CODE( AggregationRequest::parseNs("a", inputBSON), AssertionException, ErrorCodes::FailedToParse); } TEST(AggregationRequestTest, ParseNSShouldRejectNonStringNonNumericNS) { const BSONObj inputBSON = fromjson("{aggregate: {}, pipeline: []}"); ASSERT_THROWS_CODE( AggregationRequest::parseNs("a", inputBSON), AssertionException, ErrorCodes::TypeMismatch); } TEST(AggregationRequestTest, ParseNSShouldRejectAggregateOneStringAsCollectionName) { const BSONObj inputBSON = fromjson("{aggregate: '$cmd.aggregate', pipeline: []}"); ASSERT_THROWS_CODE(AggregationRequest::parseNs("a", inputBSON), AssertionException, ErrorCodes::InvalidNamespace); } TEST(AggregationRequestTest, ParseNSShouldRejectInvalidCollectionName) { const BSONObj inputBSON = fromjson("{aggregate: '', pipeline: []}"); ASSERT_THROWS_CODE(AggregationRequest::parseNs("a", inputBSON), AssertionException, ErrorCodes::InvalidNamespace); } TEST(AggregationRequestTest, ParseFromBSONOverloadsShouldProduceIdenticalRequests) { const BSONObj inputBSON = fromjson("{aggregate: 'collection', pipeline: [{$match: {}}, {$project: {}}], cursor: {}}"); NamespaceString nss("a.collection"); auto aggReqDBName = unittest::assertGet(AggregationRequest::parseFromBSON("a", inputBSON)); auto aggReqNSS = unittest::assertGet(AggregationRequest::parseFromBSON(nss, inputBSON)); ASSERT_DOCUMENT_EQ(aggReqDBName.serializeToCommandObj(), aggReqNSS.serializeToCommandObj()); } TEST(AggregationRequestTest, ShouldRejectExchangeNotObject) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: [], exchage: '42'}"); ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); } TEST(AggregationRequestTest, ShouldRejectExchangeInvalidSpec) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: [], exchage: {}}"); ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); } // // Ignore fields parsed elsewhere. // TEST(AggregationRequestTest, ShouldIgnoreQueryOptions) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, $queryOptions: {}}"); ASSERT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); } TEST(AggregationRequestTest, ShouldIgnoreWriteConcernOption) { NamespaceString nss("a.collection"); const BSONObj inputBson = fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, writeConcern: 'invalid'}"); ASSERT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); } } // namespace } // namespace mongo