diff options
author | Sally McNichols <sally.mcnichols@mongodb.com> | 2016-07-26 17:52:36 -0400 |
---|---|---|
committer | Sally McNichols <sally.mcnichols@mongodb.com> | 2016-07-26 17:52:36 -0400 |
commit | 39d63ea21d7236a88616e89cb8381b34414ac349 (patch) | |
tree | 2acb3ee8c69049c8edb467e9ee122360c945c600 /src | |
parent | f22f6e220e5471c0876938bd0812ffa62901e3a7 (diff) | |
download | mongo-39d63ea21d7236a88616e89cb8381b34414ac349.tar.gz |
SERVER-24152 add $bucketAuto aggregation stage
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/pipeline/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/pipeline/accumulator.cpp | 38 | ||||
-rw-r--r-- | src/mongo/db/pipeline/accumulator.h | 8 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source.h | 98 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_bucket_auto.cpp | 391 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_group.cpp | 48 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_test.cpp | 469 |
7 files changed, 1008 insertions, 45 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 9fad09d05ff..8dbab039bd8 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -179,6 +179,7 @@ docSourceEnv.Library( source=[ 'document_source.cpp', 'document_source_bucket.cpp', + 'document_source_bucket_auto.cpp', 'document_source_coll_stats.cpp', 'document_source_count.cpp', 'document_source_geo_near.cpp', diff --git a/src/mongo/db/pipeline/accumulator.cpp b/src/mongo/db/pipeline/accumulator.cpp index b2fc55a455e..f86191c1d05 100644 --- a/src/mongo/db/pipeline/accumulator.cpp +++ b/src/mongo/db/pipeline/accumulator.cpp @@ -37,6 +37,10 @@ namespace mongo { +using boost::intrusive_ptr; +using std::string; +using std::pair; + using Factory = Accumulator::Factory; namespace { @@ -59,4 +63,38 @@ Factory Accumulator::getFactory(StringData name) { return it->second; } +pair<StringData, intrusive_ptr<Expression>> Accumulator::parseAccumulator( + const BSONElement& elem, const VariablesParseState& vps) { + auto fieldName = elem.fieldNameStringData(); + uassert(40234, + str::stream() << "The field '" << fieldName << "' must be an accumulator object", + elem.type() == BSONType::Object && + elem.embeddedObject().firstElementFieldName()[0] == '$'); + + uassert(40235, + str::stream() << "The field name '" << fieldName << "' cannot contain '.'", + fieldName.find('.') == string::npos); + + uassert(40236, + str::stream() << "The field name '" << fieldName << "' cannot be an operator name", + fieldName[0] != '$'); + + intrusive_ptr<Expression> accExpression; + size_t accCount = 0; + for (auto&& acc : elem.embeddedObject()) { + accCount++; + auto accName = acc.fieldNameStringData(); + + uassert(40237, + str::stream() << "The " << accName << " accumulator is a unary operator", + acc.type() != BSONType::Array); + + accExpression = Expression::parseOperand(acc, vps); + } + uassert(40238, + str::stream() << "The field '" << fieldName << "' must specify one accumulator", + accCount == 1); + + return {fieldName, accExpression}; +} } // namespace mongo diff --git a/src/mongo/db/pipeline/accumulator.h b/src/mongo/db/pipeline/accumulator.h index 81b84bf1982..eba774295f2 100644 --- a/src/mongo/db/pipeline/accumulator.h +++ b/src/mongo/db/pipeline/accumulator.h @@ -37,6 +37,7 @@ #include "mongo/base/init.h" #include "mongo/bson/bsontypes.h" +#include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/value.h" #include "mongo/db/pipeline/value_comparator.h" @@ -103,6 +104,13 @@ public: */ static Factory getFactory(StringData name); + /** + * Parses a BSONElement that is an accumulator field, and returns a pair containing (field name, + * accumulator expression). + */ + static std::pair<StringData, boost::intrusive_ptr<Expression>> parseAccumulator( + const BSONElement& elem, const VariablesParseState& vps); + virtual bool isAssociative() const { return false; } diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 8a1d0011890..28aaa1e1a9f 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -1834,7 +1834,6 @@ private: long long _outputIndex; }; - class DocumentSourceSortByCount final { public: static std::vector<boost::intrusive_ptr<DocumentSource>> createFromBson( @@ -1886,4 +1885,101 @@ private: bool _latencySpecified = false; bool _finished = false; }; + +/** + * The $bucketAuto stage takes a user-specified number of buckets and automatically determines + * boundaries such that the values are approximately equally distributed between those buckets. + */ +class DocumentSourceBucketAuto final : public DocumentSource { +public: + Value serialize(bool explain = false) const final; + GetDepsReturn getDependencies(DepsTracker* deps) const final; + boost::optional<Document> getNext() final; + void dispose() final; + const char* getSourceName() const final; + + static const uint64_t kMaxMemoryUsageBytes = 100 * 1024 * 1024; + + static boost::intrusive_ptr<DocumentSourceBucketAuto> create( + const boost::intrusive_ptr<ExpressionContext>& pExpCtx, + int numBuckets = 0, + uint64_t maxMemoryUsageBytes = kMaxMemoryUsageBytes); + + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + +private: + explicit DocumentSourceBucketAuto(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, + int numBuckets, + uint64_t maxMemoryUsageBytes); + + // struct for holding information about a bucket. + struct Bucket { + Bucket(Value min, Value max, std::vector<Accumulator::Factory> accumulatorFactories); + Value _min; + Value _max; + std::vector<boost::intrusive_ptr<Accumulator>> _accums; + }; + + /** + * Consumes all of the documents from the source in the pipeline and sorts them by their + * 'groupBy' value. + */ + void populateSorter(); + + /** + * Computes the 'groupBy' expression value for 'doc'. + */ + Value extractKey(const Document& doc); + + /** + * Calculates the bucket boundaries for the input documents and places them into buckets. + */ + void populateBuckets(); + + /** + * Adds an accumulator to this stage. + */ + void addAccumulator(StringData fieldName, + Accumulator::Factory accumulatorFactory, + const boost::intrusive_ptr<Expression>& expression); + + /** + * Adds the document in 'entry' to 'bucket' by updating the accumulators in 'bucket'. + */ + void addDocumentToBucket(const std::pair<Value, Document>& entry, Bucket& bucket); + + /** + * Adds 'newBucket' to _buckets and updates any boundaries if necessary. + */ + void addBucket(const Bucket& newBucket); + + /** + * Makes a document using the information from bucket. This is what is returned when getNext() + * is called. + */ + Document makeDocument(const Bucket& bucket); + + void parseGroupByExpression(const BSONElement& groupByField, const VariablesParseState& vps); + + std::unique_ptr<Sorter<Value, Document>> _sorter; + std::unique_ptr<Sorter<Value, Document>::Iterator> _sortedInput; + + // _fieldNames contains the field names for the result documents, _accumulatorFactories contains + // the accumulator factories for the result documents, and _expressions contains the common + // expressions used by each instance of each accumulator in order to find the right-hand side of + // what gets added to the accumulator. These three vectors parallel each other. + std::vector<std::string> _fieldNames; + std::vector<Accumulator::Factory> _accumulatorFactories; + std::vector<boost::intrusive_ptr<Expression>> _expressions; + + int _nBuckets; + uint64_t _maxMemoryUsageBytes; + bool _populated = false; + std::vector<Bucket> _buckets; + std::vector<Bucket>::iterator _bucketsIterator; + std::unique_ptr<Variables> _variables; + boost::intrusive_ptr<Expression> _groupByExpression; + long long _nDocuments = 0; +}; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.cpp b/src/mongo/db/pipeline/document_source_bucket_auto.cpp new file mode 100644 index 00000000000..1155bdd0249 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_bucket_auto.cpp @@ -0,0 +1,391 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/pipeline/document_source.h" + +namespace mongo { + +using boost::intrusive_ptr; +using std::pair; +using std::vector; + +REGISTER_DOCUMENT_SOURCE(bucketAuto, DocumentSourceBucketAuto::createFromBson); + +const char* DocumentSourceBucketAuto::getSourceName() const { + return "$bucketAuto"; +} + +boost::optional<Document> DocumentSourceBucketAuto::getNext() { + pExpCtx->checkForInterrupt(); + + if (!_populated) { + populateSorter(); + populateBuckets(); + + _populated = true; + _bucketsIterator = _buckets.begin(); + } + + if (_bucketsIterator == _buckets.end()) { + dispose(); + return boost::none; + } + + return makeDocument(*(_bucketsIterator++)); +} + +DocumentSource::GetDepsReturn DocumentSourceBucketAuto::getDependencies(DepsTracker* deps) const { + // Add the 'groupBy' expression. + _groupByExpression->addDependencies(deps); + + // Add the 'output' fields. + for (auto&& exp : _expressions) { + exp->addDependencies(deps); + } + + // We know exactly which fields will be present in the output document. Future stages cannot + // depend on any further fields. The grouping process will remove any metadata from the + // documents, so there can be no further dependencies on metadata. + return EXHAUSTIVE_ALL; +} + +void DocumentSourceBucketAuto::populateSorter() { + if (!_sorter) { + SortOptions opts; + opts.maxMemoryUsageBytes = _maxMemoryUsageBytes; + const auto& valueCmp = pExpCtx->getValueComparator(); + auto comparator = [valueCmp](const Sorter<Value, Document>::Data& lhs, + const Sorter<Value, Document>::Data& rhs) { + return valueCmp.compare(lhs.first, rhs.first); + }; + + _sorter.reset(Sorter<Value, Document>::make(opts, comparator)); + } + + _nDocuments = 0; + while (boost::optional<Document> next = pSource->getNext()) { + Document doc = *next; + _sorter->add(extractKey(doc), doc); + _nDocuments++; + } +} + +Value DocumentSourceBucketAuto::extractKey(const Document& doc) { + if (!_groupByExpression) { + return Value(BSONNULL); + } + + _variables->setRoot(doc); + Value key = _groupByExpression->evaluate(_variables.get()); + + // To be consistent with the $group stage, we consider "missing" to be equivalent to null when + // grouping values into buckets. + return key.missing() ? Value(BSONNULL) : std::move(key); +} + +void DocumentSourceBucketAuto::addDocumentToBucket(const pair<Value, Document>& entry, + Bucket& bucket) { + invariant(pExpCtx->getValueComparator().evaluate(entry.first >= bucket._max)); + bucket._max = entry.first; + + const size_t numAccumulators = _accumulatorFactories.size(); + _variables->setRoot(entry.second); + for (size_t k = 0; k < numAccumulators; k++) { + bucket._accums[k]->process(_expressions[k]->evaluate(_variables.get()), false); + } +} + +void DocumentSourceBucketAuto::populateBuckets() { + invariant(_sorter); + _sortedInput.reset(_sorter->done()); + _sorter.reset(); + + // If there are no buckets, then we don't need to populate anything. + if (_nBuckets == 0) { + return; + } + + // Calculate the approximate bucket size. We attempt to fill each bucket with this many + // documents. + long long approxBucketSize = std::round(double(_nDocuments) / double(_nBuckets)); + + if (approxBucketSize < 1) { + // If the number of buckets is larger than the number of documents, then we try to make as + // many buckets as possible by placing each document in its own bucket. + approxBucketSize = 1; + } + + boost::optional<pair<Value, Document>> firstEntryInNextBucket; + + // Start creating and populating the buckets. + for (int i = 0; i < _nBuckets; i++) { + bool isLastBucket = (i == _nBuckets - 1); + + // Get the first value to place in this bucket. + pair<Value, Document> currentValue; + if (firstEntryInNextBucket) { + currentValue = *firstEntryInNextBucket; + firstEntryInNextBucket = boost::none; + } else if (_sortedInput->more()) { + currentValue = _sortedInput->next(); + } else { + // No more values to process. + break; + } + + // Initialize the current bucket. + Bucket currentBucket(currentValue.first, currentValue.first, _accumulatorFactories); + + // Add the first value into the current bucket. + addDocumentToBucket(currentValue, currentBucket); + + if (isLastBucket) { + // If this is the last bucket allowed, we need to put any remaining documents in + // the current bucket. + while (_sortedInput->more()) { + addDocumentToBucket(_sortedInput->next(), currentBucket); + } + } else { + // We go to approxBucketSize - 1 because we already added the first value in order + // to keep track of the minimum value. + for (long long j = 0; j < approxBucketSize - 1; j++) { + if (_sortedInput->more()) { + addDocumentToBucket(_sortedInput->next(), currentBucket); + } else { + // No more values to process. + break; + } + } + + boost::optional<pair<Value, Document>> nextValue = _sortedInput->more() + ? boost::optional<pair<Value, Document>>(_sortedInput->next()) + : boost::none; + + // If there are any more values that are equal to the boundary value, then absorb them + // into the current bucket too. + while (nextValue && + pExpCtx->getValueComparator().evaluate(currentBucket._max == nextValue->first)) { + addDocumentToBucket(*nextValue, currentBucket); + nextValue = _sortedInput->more() + ? boost::optional<pair<Value, Document>>(_sortedInput->next()) + : boost::none; + } + firstEntryInNextBucket = nextValue; + } + + // Add the current bucket to the vector of buckets. + addBucket(currentBucket); + } +} + +DocumentSourceBucketAuto::Bucket::Bucket(Value min, + Value max, + vector<Accumulator::Factory> accumulatorFactories) + : _min(min), _max(max) { + _accums.reserve(accumulatorFactories.size()); + for (auto&& factory : accumulatorFactories) { + _accums.push_back(factory()); + } +} + +void DocumentSourceBucketAuto::addBucket(const Bucket& newBucket) { + // If there is a bucket that comes before the new bucket being added, then the previous bucket's + // max boundary is updated to the new bucket's min. This is makes it so that buckets' min + // boundaries are inclusive and max boundaries are exclusive (except for the last bucket, which + // has an inclusive max). + if (!_buckets.empty()) { + Bucket& previous = _buckets.back(); + previous._max = newBucket._min; + } + _buckets.push_back(newBucket); +} + +Document DocumentSourceBucketAuto::makeDocument(const Bucket& bucket) { + const size_t nAccumulatedFields = _fieldNames.size(); + MutableDocument out(1 + nAccumulatedFields); + + out.addField("_id", Value{Document{{"min", bucket._min}, {"max", bucket._max}}}); + + const bool mergingOutput = false; + for (size_t i = 0; i < nAccumulatedFields; i++) { + Value val = bucket._accums[i]->getValue(mergingOutput); + + // To be consistent with the $group stage, we consider "missing" to be equivalent to null + // when evaluating accumulators. + out.addField(_fieldNames[i], val.missing() ? Value(BSONNULL) : std::move(val)); + } + return out.freeze(); +} + +void DocumentSourceBucketAuto::dispose() { + _sortedInput.reset(); + _bucketsIterator = _buckets.end(); + pSource->dispose(); +} + +Value DocumentSourceBucketAuto::serialize(bool explain) const { + MutableDocument insides; + + insides["groupBy"] = _groupByExpression->serialize(explain); + insides["buckets"] = Value(_nBuckets); + + const size_t nOutputFields = _fieldNames.size(); + MutableDocument outputSpec(nOutputFields); + for (size_t i = 0; i < nOutputFields; i++) { + intrusive_ptr<Accumulator> accum = _accumulatorFactories[i](); + outputSpec[_fieldNames[i]] = + Value{Document{{accum->getOpName(), _expressions[i]->serialize(explain)}}}; + } + insides["output"] = outputSpec.freezeToValue(); + + // TODO SERVER-24152: handle granularity field + + return Value{Document{{getSourceName(), insides.freezeToValue()}}}; +} + +intrusive_ptr<DocumentSourceBucketAuto> DocumentSourceBucketAuto::create( + const intrusive_ptr<ExpressionContext>& pExpCtx, int numBuckets, uint64_t maxMemoryUsageBytes) { + return new DocumentSourceBucketAuto(pExpCtx, numBuckets, maxMemoryUsageBytes); +} + +DocumentSourceBucketAuto::DocumentSourceBucketAuto(const intrusive_ptr<ExpressionContext>& pExpCtx, + int numBuckets, + uint64_t maxMemoryUsageBytes) + : DocumentSource(pExpCtx), _nBuckets(numBuckets), _maxMemoryUsageBytes(maxMemoryUsageBytes) {} + +void DocumentSourceBucketAuto::parseGroupByExpression(const BSONElement& groupByField, + const VariablesParseState& vps) { + if (groupByField.type() == BSONType::Object && + groupByField.embeddedObject().firstElementFieldName()[0] == '$') { + _groupByExpression = Expression::parseObject(groupByField.embeddedObject(), vps); + } else if (groupByField.type() == BSONType::String && + groupByField.valueStringData()[0] == '$') { + _groupByExpression = ExpressionFieldPath::parse(groupByField.str(), vps); + } else { + uasserted( + 40239, + str::stream() << "The $bucketAuto 'groupBy' field must be defined as a $-prefixed " + "path or an expression object, but found: " + << groupByField.toString(false, false)); + } +} + +void DocumentSourceBucketAuto::addAccumulator(StringData fieldName, + Accumulator::Factory accumulatorFactory, + const intrusive_ptr<Expression>& expression) { + + _fieldNames.push_back(fieldName.toString()); + _accumulatorFactories.push_back(accumulatorFactory); + _expressions.push_back(expression); +} + +intrusive_ptr<DocumentSource> DocumentSourceBucketAuto::createFromBson( + BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) { + uassert(40240, + str::stream() << "The argument to $bucketAuto must be an object, but found type: " + << typeName(elem.type()), + elem.type() == BSONType::Object); + + intrusive_ptr<DocumentSourceBucketAuto> bucketAuto(DocumentSourceBucketAuto::create(pExpCtx)); + + const BSONObj bucketAutoObj = elem.embeddedObject(); + VariablesIdGenerator idGenerator; + VariablesParseState vps(&idGenerator); + bool outputFieldSpecified = false; + + for (auto&& argument : bucketAutoObj) { + const auto argName = argument.fieldNameStringData(); + if ("groupBy" == argName) { + bucketAuto->parseGroupByExpression(argument, vps); + } else if ("buckets" == argName) { + Value bucketsValue = Value(argument); + + uassert( + 40241, + str::stream() + << "The $bucketAuto 'buckets' field must be a numeric value, but found type: " + << typeName(argument.type()), + bucketsValue.numeric()); + + uassert(40242, + str::stream() << "The $bucketAuto 'buckets' field must be representable as a " + "32-bit integer, but found " + << Value(argument).coerceToDouble(), + bucketsValue.integral()); + + int numBuckets = bucketsValue.coerceToInt(); + uassert(40243, + str::stream() + << "The $bucketAuto 'buckets' field must be greater than 0, but found: " + << numBuckets, + numBuckets > 0); + bucketAuto->_nBuckets = numBuckets; + } else if ("output" == argName) { + uassert(40244, + str::stream() + << "The $bucketAuto 'output' field must be an object, but found type: " + << typeName(argument.type()), + argument.type() == BSONType::Object); + + outputFieldSpecified = true; + for (auto&& outputField : argument.embeddedObject()) { + auto parsedAccumulator = Accumulator::parseAccumulator(outputField, vps); + + auto fieldName = parsedAccumulator.first; + auto accExpression = parsedAccumulator.second; + auto factory = + Accumulator::getFactory(outputField.embeddedObject().firstElementFieldName()); + + bucketAuto->addAccumulator(fieldName, factory, accExpression); + } + } else { + uasserted(40245, str::stream() << "Unrecognized option to $bucketAuto: " << argName); + } + + // TODO SERVER-24152: handle granularity field + } + + uassert(40246, + "$bucketAuto requires 'groupBy' and 'buckets' to be specified", + bucketAuto->_groupByExpression && bucketAuto->_nBuckets > 0); + + // If there is no output field specified, then add the default one. + if (!outputFieldSpecified) { + bucketAuto->addAccumulator("count"_sd, + Accumulator::getFactory("$sum"), + ExpressionConstant::create(pExpCtx, Value(1))); + } + + bucketAuto->_variables.reset(new Variables(idGenerator.getIdCount())); + + return bucketAuto; +} +} // namespace mongo + +#include "mongo/db/sorter/sorter.cpp" +// Explicit instantiation unneeded since we aren't exposing Sorter outside of this file. diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp index 0a37706b455..d10eb4dc513 100644 --- a/src/mongo/db/pipeline/document_source_group.cpp +++ b/src/mongo/db/pipeline/document_source_group.cpp @@ -306,49 +306,13 @@ intrusive_ptr<DocumentSource> DocumentSourceGroup::createFromBson( Treat as a projection field with the additional ability to add aggregation operators. */ - uassert( - 16414, - str::stream() << "the group aggregate field name '" << pFieldName - << "' cannot be used because $group's field names cannot contain '.'", - !str::contains(pFieldName, '.')); - - uassert(15950, - str::stream() << "the group aggregate field name '" << pFieldName - << "' cannot be an operator name", - pFieldName[0] != '$'); - - uassert(15951, - str::stream() << "the group aggregate field '" << pFieldName - << "' must be defined as an expression inside an object", - groupField.type() == Object); - - BSONObj subField(groupField.Obj()); - BSONObjIterator subIterator(subField); - size_t subCount = 0; - for (; subIterator.more(); ++subCount) { - BSONElement subElement(subIterator.next()); - - auto name = subElement.fieldNameStringData(); - Accumulator::Factory factory = Accumulator::getFactory(name); - intrusive_ptr<Expression> pGroupExpr; - BSONType elementType = subElement.type(); - if (elementType == Object) { - pGroupExpr = Expression::parseObject(subElement.Obj(), vps); - } else if (elementType == Array) { - uasserted(15953, - str::stream() << "aggregating group operators are unary (" << name - << ")"); - } else { /* assume its an atomic single operand */ - pGroupExpr = Expression::parseOperand(subElement, vps); - } - - pGroup->addAccumulator(pFieldName, factory, pGroupExpr); - } + auto parsedAccumulator = Accumulator::parseAccumulator(groupField, vps); + auto fieldName = parsedAccumulator.first.toString(); + auto accExpression = parsedAccumulator.second; + auto factory = + Accumulator::getFactory(groupField.embeddedObject().firstElementFieldName()); - uassert(15954, - str::stream() << "the computed aggregate '" << pFieldName - << "' must specify exactly one operator", - subCount == 1); + pGroup->addAccumulator(fieldName, factory, accExpression); } } diff --git a/src/mongo/db/pipeline/document_source_test.cpp b/src/mongo/db/pipeline/document_source_test.cpp index eb38ddc7436..f8c44b1e9e6 100644 --- a/src/mongo/db/pipeline/document_source_test.cpp +++ b/src/mongo/db/pipeline/document_source_test.cpp @@ -3834,12 +3834,12 @@ TEST_F(InvalidBucketSpec, GroupFailsForBucketWithInvalidOutputField) { spec = fromjson( "{$bucket : {groupBy : '$x', boundaries : [1, 2, 3], output : {number : 'test'}}}"); - ASSERT_THROWS_CODE(createBucket(spec), UserException, 15951); + ASSERT_THROWS_CODE(createBucket(spec), UserException, 40234); spec = fromjson( "{$bucket : {groupBy : '$x', boundaries : [1, 2, 3], output : {'test.test' : {$sum : " "1}}}}"); - ASSERT_THROWS_CODE(createBucket(spec), UserException, 16414); + ASSERT_THROWS_CODE(createBucket(spec), UserException, 40235); } TEST_F(InvalidBucketSpec, SwitchFailsForBucketWhenNoDefaultSpecified) { @@ -3861,6 +3861,471 @@ TEST_F(InvalidBucketSpec, SwitchFailsForBucketWhenNoDefaultSpecified) { } } // namespace DocumentSourceBucket +namespace DocumentSourceBucketAuto { +using mongo::DocumentSourceBucketAuto; +using mongo::DocumentSourceMock; +using std::vector; +using std::deque; +using boost::intrusive_ptr; + +class BucketAutoTests : public Mock::Base, public unittest::Test { +public: + intrusive_ptr<DocumentSource> createBucketAuto(BSONObj bucketAutoSpec) { + return DocumentSourceBucketAuto::createFromBson(bucketAutoSpec.firstElement(), ctx()); + } + + vector<Document> getResults(BSONObj bucketAutoSpec, deque<Document> docs) { + auto bucketAutoStage = createBucketAuto(bucketAutoSpec); + assertBucketAutoType(bucketAutoStage); + + auto source = DocumentSourceMock::create(docs); + bucketAutoStage->setSource(source.get()); + + vector<Document> results; + while (boost::optional<Document> next = bucketAutoStage->getNext()) { + results.push_back(*next); + } + + return results; + } + + void testSerialize(BSONObj bucketAutoSpec, BSONObj expectedObj) { + auto bucketAutoStage = createBucketAuto(bucketAutoSpec); + assertBucketAutoType(bucketAutoStage); + + const bool explain = true; + vector<Value> explainedStages; + bucketAutoStage->serializeToArray(explainedStages, explain); + ASSERT_EQUALS(explainedStages.size(), 1UL); + + Value expectedExplain = Value(expectedObj); + + auto bucketAutoExplain = explainedStages[0]; + ASSERT_VALUE_EQ(bucketAutoExplain["$bucketAuto"], expectedExplain); + } + +private: + void assertBucketAutoType(intrusive_ptr<DocumentSource> documentSource) { + const auto* bucketAutoStage = dynamic_cast<DocumentSourceBucketAuto*>(documentSource.get()); + ASSERT(bucketAutoStage); + } +}; + +TEST_F(BucketAutoTests, ReturnsNoBucketsWhenSourceIsEmpty) { + auto bucketAutoSpec = fromjson("{$bucketAuto : {groupBy : '$x', buckets: 1}}"); + auto results = getResults(bucketAutoSpec, {}); + ASSERT_EQUALS(results.size(), 0UL); +} + +TEST_F(BucketAutoTests, Returns1Of1RequestedBucketWhenAllUniqueValues) { + auto bucketAutoSpec = fromjson("{$bucketAuto : {groupBy : '$x', buckets: 1}}"); + + // Values are 1, 2, 3, 4 + auto intDocs = {Document{{"x", 4}}, Document{{"x", 1}}, Document{{"x", 3}}, Document{{"x", 2}}}; + auto results = getResults(bucketAutoSpec, intDocs); + ASSERT_EQUALS(results.size(), 1UL); + ASSERT_DOCUMENT_EQ(results[0], Document(fromjson("{_id : {min : 1, max : 4}, count : 4}"))); + + // Values are 'a', 'b', 'c', 'd' + auto stringDocs = { + Document{{"x", "d"}}, Document{{"x", "b"}}, Document{{"x", "a"}}, Document{{"x", "c"}}}; + results = getResults(bucketAutoSpec, stringDocs); + ASSERT_EQUALS(results.size(), 1UL); + ASSERT_DOCUMENT_EQ(results[0], Document(fromjson("{_id : {min : 'a', max : 'd'}, count : 4}"))); +} + +TEST_F(BucketAutoTests, Returns1Of1RequestedBucketWithNonUniqueValues) { + auto bucketAutoSpec = fromjson("{$bucketAuto : {groupBy : '$x', buckets: 1}}"); + + // Values are 1, 2, 7, 7, 7 + auto docs = {Document{{"x", 7}}, + Document{{"x", 1}}, + Document{{"x", 7}}, + Document{{"x", 2}}, + Document{{"x", 7}}}; + auto results = getResults(bucketAutoSpec, docs); + ASSERT_EQUALS(results.size(), 1UL); + ASSERT_DOCUMENT_EQ(results[0], Document(fromjson("{_id : {min : 1, max : 7}, count : 5}"))); +} + +TEST_F(BucketAutoTests, Returns1Of1RequestedBucketWhen1ValueInSource) { + auto bucketAutoSpec = fromjson("{$bucketAuto : {groupBy : '$x', buckets: 1}}"); + auto intDocs = {Document{{"x", 1}}}; + auto results = getResults(bucketAutoSpec, intDocs); + ASSERT_EQUALS(results.size(), 1UL); + ASSERT_DOCUMENT_EQ(results[0], Document(fromjson("{_id : {min : 1, max : 1}, count : 1}"))); + + auto stringDocs = {Document{{"x", "a"}}}; + results = getResults(bucketAutoSpec, stringDocs); + ASSERT_EQUALS(results.size(), 1UL); + ASSERT_DOCUMENT_EQ(results[0], Document(fromjson("{_id : {min : 'a', max : 'a'}, count : 1}"))); +} + +TEST_F(BucketAutoTests, Returns2Of2RequestedBucketsWhenSmallestValueHasManyDuplicates) { + auto bucketAutoSpec = fromjson("{$bucketAuto : {groupBy : '$x', buckets : 2}}"); + + // Values are 1, 1, 1, 1, 2 + auto docs = {Document{{"x", 1}}, + Document{{"x", 1}}, + Document{{"x", 1}}, + Document{{"x", 2}}, + Document{{"x", 1}}}; + auto results = getResults(bucketAutoSpec, docs); + ASSERT_EQUALS(results.size(), 2UL); + ASSERT_DOCUMENT_EQ(results[0], Document(fromjson("{_id : {min : 1, max : 2}, count : 4}"))); + ASSERT_DOCUMENT_EQ(results[1], Document(fromjson("{_id : {min : 2, max : 2}, count : 1}"))); +} + +TEST_F(BucketAutoTests, Returns2Of2RequestedBucketsWhenLargestValueHasManyDuplicates) { + auto bucketAutoSpec = fromjson("{$bucketAuto : {groupBy : '$x', buckets : 2}}"); + + // Values are 0, 1, 2, 3, 4, 5, 5, 5, 5 + auto docs = {Document{{"x", 5}}, + Document{{"x", 0}}, + Document{{"x", 2}}, + Document{{"x", 3}}, + Document{{"x", 5}}, + Document{{"x", 1}}, + Document{{"x", 5}}, + Document{{"x", 4}}, + Document{{"x", 5}}}; + auto results = getResults(bucketAutoSpec, docs); + + ASSERT_EQUALS(results.size(), 2UL); + ASSERT_DOCUMENT_EQ(results[0], Document(fromjson("{_id : {min : 0, max : 5}, count : 5}"))); + ASSERT_DOCUMENT_EQ(results[1], Document(fromjson("{_id : {min : 5, max : 5}, count : 4}"))); +} + +TEST_F(BucketAutoTests, Returns3Of3RequestedBucketsWhenAllUniqueValues) { + auto bucketAutoSpec = fromjson("{$bucketAuto : {groupBy : '$x', buckets : 3}}"); + + // Values are 0, 1, 2, 3, 4, 5, 6, 7 + auto docs = {Document{{"x", 2}}, + Document{{"x", 4}}, + Document{{"x", 1}}, + Document{{"x", 7}}, + Document{{"x", 0}}, + Document{{"x", 5}}, + Document{{"x", 3}}, + Document{{"x", 6}}}; + auto results = getResults(bucketAutoSpec, docs); + + ASSERT_EQUALS(results.size(), 3UL); + ASSERT_DOCUMENT_EQ(results[0], Document(fromjson("{_id : {min : 0, max : 3}, count : 3}"))); + ASSERT_DOCUMENT_EQ(results[1], Document(fromjson("{_id : {min : 3, max : 6}, count : 3}"))); + ASSERT_DOCUMENT_EQ(results[2], Document(fromjson("{_id : {min : 6, max : 7}, count : 2}"))); +} + +TEST_F(BucketAutoTests, Returns2Of3RequestedBucketsWhenLargestValueHasManyDuplicates) { + // In this case, two buckets will be made because the approximate bucket size calculated will be + // 7/3, which rounds to 2. Therefore, the boundaries will be calculated so that values 0 and 1 + // into the first bucket. All of the 2 values will then fall into a second bucket. + auto bucketAutoSpec = fromjson("{$bucketAuto : {groupBy : '$x', buckets : 3}}"); + + // Values are 0, 1, 2, 2, 2, 2, 2 + auto docs = {Document{{"x", 2}}, + Document{{"x", 0}}, + Document{{"x", 2}}, + Document{{"x", 2}}, + Document{{"x", 1}}, + Document{{"x", 2}}, + Document{{"x", 2}}}; + auto results = getResults(bucketAutoSpec, docs); + + ASSERT_EQUALS(results.size(), 2UL); + ASSERT_DOCUMENT_EQ(results[0], Document(fromjson("{_id : {min : 0, max : 2}, count : 2}"))); + ASSERT_DOCUMENT_EQ(results[1], Document(fromjson("{_id : {min : 2, max : 2}, count : 5}"))); +} + +TEST_F(BucketAutoTests, Returns1Of3RequestedBucketsWhenLargestValueHasManyDuplicates) { + // In this case, one bucket will be made because the approximate bucket size calculated will be + // 8/3, which rounds to 3. Therefore, the boundaries will be calculated so that values 0, 1, and + // 2 fall into the first bucket. Since 2 is repeated many times, all of the 2 values will be + // pulled into the first bucket. + auto bucketAutoSpec = fromjson("{$bucketAuto : {groupBy : '$x', buckets : 3}}"); + + // Values are 0, 1, 2, 2, 2, 2, 2, 2 + auto docs = {Document{{"x", 2}}, + Document{{"x", 2}}, + Document{{"x", 0}}, + Document{{"x", 2}}, + Document{{"x", 2}}, + Document{{"x", 2}}, + Document{{"x", 1}}, + Document{{"x", 2}}}; + auto results = getResults(bucketAutoSpec, docs); + + ASSERT_EQUALS(results.size(), 1UL); + ASSERT_DOCUMENT_EQ(results[0], Document(fromjson("{_id : {min : 0, max : 2}, count : 8}"))); +} + +TEST_F(BucketAutoTests, Returns3Of3RequestedBucketsWhen3ValuesInSource) { + auto bucketAutoSpec = fromjson("{$bucketAuto : {groupBy : '$x', buckets : 3}}"); + auto docs = {Document{{"x", 0}}, Document{{"x", 1}}, Document{{"x", 2}}}; + auto results = getResults(bucketAutoSpec, docs); + + ASSERT_EQUALS(results.size(), 3UL); + ASSERT_DOCUMENT_EQ(results[0], Document(fromjson("{_id : {min : 0, max : 1}, count : 1}"))); + ASSERT_DOCUMENT_EQ(results[1], Document(fromjson("{_id : {min : 1, max : 2}, count : 1}"))); + ASSERT_DOCUMENT_EQ(results[2], Document(fromjson("{_id : {min : 2, max : 2}, count : 1}"))); +} + +TEST_F(BucketAutoTests, Returns3Of10RequestedBucketsWhen3ValuesInSource) { + auto bucketAutoSpec = fromjson("{$bucketAuto : {groupBy : '$x', buckets : 10}}"); + auto docs = {Document{{"x", 0}}, Document{{"x", 1}}, Document{{"x", 2}}}; + auto results = getResults(bucketAutoSpec, docs); + + ASSERT_EQUALS(results.size(), 3UL); + ASSERT_DOCUMENT_EQ(results[0], Document(fromjson("{_id : {min : 0, max : 1}, count : 1}"))); + ASSERT_DOCUMENT_EQ(results[1], Document(fromjson("{_id : {min : 1, max : 2}, count : 1}"))); + ASSERT_DOCUMENT_EQ(results[2], Document(fromjson("{_id : {min : 2, max : 2}, count : 1}"))); +} + +TEST_F(BucketAutoTests, EvaluatesAccumulatorsInOutputField) { + auto bucketAutoSpec = + fromjson("{$bucketAuto : {groupBy : '$x', buckets : 2, output : {avg : {$avg : '$x'}}}}"); + auto docs = {Document{{"x", 0}}, Document{{"x", 2}}, Document{{"x", 4}}, Document{{"x", 6}}}; + auto results = getResults(bucketAutoSpec, docs); + + ASSERT_EQUALS(results.size(), 2UL); + ASSERT_DOCUMENT_EQ(results[0], Document(fromjson("{_id : {min : 0, max : 4}, avg : 1}"))); + ASSERT_DOCUMENT_EQ(results[1], Document(fromjson("{_id : {min : 4, max : 6}, avg : 5}"))); +} + +TEST_F(BucketAutoTests, EvaluatesNonFieldPathExpressionInGroupByField) { + auto bucketAutoSpec = fromjson("{$bucketAuto : {groupBy : {$add : ['$x', 1]}, buckets : 2}}"); + auto docs = {Document{{"x", 0}}, Document{{"x", 1}}, Document{{"x", 2}}, Document{{"x", 3}}}; + auto results = getResults(bucketAutoSpec, docs); + + ASSERT_EQUALS(results.size(), 2UL); + ASSERT_DOCUMENT_EQ(results[0], Document(fromjson("{_id : {min : 1, max : 3}, count : 2}"))); + ASSERT_DOCUMENT_EQ(results[1], Document(fromjson("{_id : {min : 3, max : 4}, count : 2}"))); +} + +TEST_F(BucketAutoTests, RespectsCanonicalTypeOrderingOfValues) { + auto bucketAutoSpec = fromjson("{$bucketAuto : {groupBy : '$x', buckets : 2}}"); + auto docs = {Document{{"x", "a"}}, + Document{{"x", 1}}, + Document{{"x", "b"}}, + Document{{"x", 2}}, + Document{{"x", 0.0}}}; + auto results = getResults(bucketAutoSpec, docs); + + ASSERT_EQUALS(results.size(), 2UL); + ASSERT_DOCUMENT_EQ(results[0], Document(fromjson("{_id : {min : 0.0, max : 'a'}, count : 3}"))); + ASSERT_DOCUMENT_EQ(results[1], Document(fromjson("{_id : {min : 'a', max : 'b'}, count : 2}"))); +} + +TEST_F(BucketAutoTests, SourceNameIsBucketAuto) { + auto bucketAuto = createBucketAuto(fromjson("{$bucketAuto : {groupBy : '$x', buckets : 2}}")); + ASSERT_EQUALS(bucketAuto->getSourceName(), "$bucketAuto"); +} + +TEST_F(BucketAutoTests, ShouldAddDependenciesOfGroupByFieldAndComputedFields) { + auto bucketAuto = + createBucketAuto(fromjson("{$bucketAuto : {groupBy : '$x', buckets : 2, output: {field1 : " + "{$sum : '$a'}, field2 : {$avg : '$b'}}}}")); + + DepsTracker dependencies; + ASSERT_EQUALS(DocumentSource::EXHAUSTIVE_ALL, bucketAuto->getDependencies(&dependencies)); + ASSERT_EQUALS(3U, dependencies.fields.size()); + + // Dependency from 'groupBy' + ASSERT_EQUALS(1U, dependencies.fields.count("x")); + + // Dependencies from 'output' + ASSERT_EQUALS(1U, dependencies.fields.count("a")); + ASSERT_EQUALS(1U, dependencies.fields.count("b")); + + ASSERT_EQUALS(false, dependencies.needWholeDocument); + ASSERT_EQUALS(false, dependencies.getNeedTextScore()); +} + +TEST_F(BucketAutoTests, ShouldNeedTextScoreInDependenciesFromGroupByField) { + auto bucketAuto = + createBucketAuto(fromjson("{$bucketAuto : {groupBy : {$meta: 'textScore'}, buckets : 2}}")); + + DepsTracker dependencies(DepsTracker::MetadataAvailable::kTextScore); + ASSERT_EQUALS(DocumentSource::EXHAUSTIVE_ALL, bucketAuto->getDependencies(&dependencies)); + ASSERT_EQUALS(0U, dependencies.fields.size()); + + ASSERT_EQUALS(false, dependencies.needWholeDocument); + ASSERT_EQUALS(true, dependencies.getNeedTextScore()); +} + +TEST_F(BucketAutoTests, ShouldNeedTextScoreInDependenciesFromOutputField) { + auto bucketAuto = + createBucketAuto(fromjson("{$bucketAuto : {groupBy : '$x', buckets : 2, output: {avg : " + "{$avg : {$meta : 'textScore'}}}}}")); + + DepsTracker dependencies(DepsTracker::MetadataAvailable::kTextScore); + ASSERT_EQUALS(DocumentSource::EXHAUSTIVE_ALL, bucketAuto->getDependencies(&dependencies)); + ASSERT_EQUALS(1U, dependencies.fields.size()); + + // Dependency from 'groupBy' + ASSERT_EQUALS(1U, dependencies.fields.count("x")); + + ASSERT_EQUALS(false, dependencies.needWholeDocument); + ASSERT_EQUALS(true, dependencies.getNeedTextScore()); +} + +TEST_F(BucketAutoTests, SerializesDefaultAccumulatorIfOutputFieldIsNotSpecified) { + BSONObj spec = fromjson("{$bucketAuto : {groupBy : '$x', buckets : 2}}"); + BSONObj expected = + fromjson("{groupBy : '$x', buckets : 2, output : {count : {$sum : {$const : 1}}}}"); + + testSerialize(spec, expected); +} + +TEST_F(BucketAutoTests, SerializesOutputFieldIfSpecified) { + BSONObj spec = + fromjson("{$bucketAuto : {groupBy : '$x', buckets : 2, output : {field : {$avg : '$x'}}}}"); + BSONObj expected = fromjson("{groupBy : '$x', buckets : 2, output : {field : {$avg : '$x'}}}"); + + testSerialize(spec, expected); +} + +TEST_F(BucketAutoTests, ShouldBeAbleToReParseSerializedStage) { + auto bucketAuto = createBucketAuto(fromjson( + "{$bucketAuto : {groupBy : '$x', buckets : 2, output : {field : {$avg : '$x'}}}}")); + vector<Value> serialization; + bucketAuto->serializeToArray(serialization); + ASSERT_EQUALS(serialization.size(), 1UL); + ASSERT_EQUALS(serialization[0].getType(), BSONType::Object); + + ASSERT_EQUALS(serialization[0].getDocument().size(), 1UL); + ASSERT_EQUALS(serialization[0].getDocument()["$bucketAuto"].getType(), BSONType::Object); + + auto serializedBson = serialization[0].getDocument().toBson(); + auto roundTripped = createBucketAuto(serializedBson); + + vector<Value> newSerialization; + roundTripped->serializeToArray(newSerialization); + + ASSERT_EQUALS(newSerialization.size(), 1UL); + ASSERT_VALUE_EQ(newSerialization[0], serialization[0]); +} + +TEST_F(BucketAutoTests, ReturnsNoBucketsWhenNoBucketsAreSpecifiedInCreate) { + auto docs = {Document{{"x", 1}}}; + auto mock = DocumentSourceMock::create(docs); + auto bucketAuto = DocumentSourceBucketAuto::create(ctx()); + + bucketAuto->setSource(mock.get()); + auto result = bucketAuto->getNext(); + ASSERT(!result); +} + +TEST_F(BucketAutoTests, FailsWithInvalidNumberOfBuckets) { + auto spec = fromjson("{$bucketAuto : {groupBy : '$x', buckets : 'test'}}"); + ASSERT_THROWS_CODE(createBucketAuto(spec), UserException, 40241); + + spec = fromjson("{$bucketAuto : {groupBy : '$x', buckets : 2147483648}}"); + ASSERT_THROWS_CODE(createBucketAuto(spec), UserException, 40242); + + spec = fromjson("{$bucketAuto : {groupBy : '$x', buckets : 1.5}}"); + ASSERT_THROWS_CODE(createBucketAuto(spec), UserException, 40242); + + spec = fromjson("{$bucketAuto : {groupBy : '$x', buckets : 0}}"); + ASSERT_THROWS_CODE(createBucketAuto(spec), UserException, 40243); + + spec = fromjson("{$bucketAuto : {groupBy : '$x', buckets : -1}}"); + ASSERT_THROWS_CODE(createBucketAuto(spec), UserException, 40243); +} + +TEST_F(BucketAutoTests, FailsWithNonExpressionGroupBy) { + auto spec = fromjson("{$bucketAuto : {groupBy : 'test', buckets : 1}}"); + ASSERT_THROWS_CODE(createBucketAuto(spec), UserException, 40239); + + spec = fromjson("{$bucketAuto : {groupBy : {test : 'test'}, buckets : 1}}"); + ASSERT_THROWS_CODE(createBucketAuto(spec), UserException, 40239); +} + +TEST_F(BucketAutoTests, FailsWithNonObjectArgument) { + auto spec = fromjson("{$bucketAuto : 'test'}"); + ASSERT_THROWS_CODE(createBucketAuto(spec), UserException, 40240); + + spec = fromjson("{$bucketAuto : [1, 2, 3]}"); + ASSERT_THROWS_CODE(createBucketAuto(spec), UserException, 40240); +} + +TEST_F(BucketAutoTests, FailsWithNonObjectOutput) { + auto spec = fromjson("{$bucketAuto : {groupBy : '$x', buckets : 1, output : 'test'}}"); + ASSERT_THROWS_CODE(createBucketAuto(spec), UserException, 40244); + + spec = fromjson("{$bucketAuto : {groupBy : '$x', buckets : 1, output : [1, 2, 3]}}"); + ASSERT_THROWS_CODE(createBucketAuto(spec), UserException, 40244); + + spec = fromjson("{$bucketAuto : {groupBy : '$x', buckets : 1, output : 1}}"); + ASSERT_THROWS_CODE(createBucketAuto(spec), UserException, 40244); +} + +TEST_F(BucketAutoTests, FailsWhenGroupByMissing) { + auto spec = fromjson("{$bucketAuto : {buckets : 1}}"); + ASSERT_THROWS_CODE(createBucketAuto(spec), UserException, 40246); +} + +TEST_F(BucketAutoTests, FailsWhenBucketsMissing) { + auto spec = fromjson("{$bucketAuto : {groupBy : '$x'}}"); + ASSERT_THROWS_CODE(createBucketAuto(spec), UserException, 40246); +} + +TEST_F(BucketAutoTests, FailsWithUnknownField) { + auto spec = fromjson("{$bucketAuto : {groupBy : '$x', buckets : 1, field : 'test'}}"); + ASSERT_THROWS_CODE(createBucketAuto(spec), UserException, 40245); +} + +TEST_F(BucketAutoTests, FailsWithInvalidExpressionToAccumulator) { + auto spec = fromjson( + "{$bucketAuto : {groupBy : '$x', buckets : 1, output : {avg : {$avg : ['$x', 1]}}}}"); + ASSERT_THROWS_CODE(createBucketAuto(spec), UserException, 40237); + + spec = fromjson( + "{$bucketAuto : {groupBy : '$x', buckets : 1, output : {test : {$avg : '$x', $sum : " + "'$x'}}}}"); + ASSERT_THROWS_CODE(createBucketAuto(spec), UserException, 40238); +} + +TEST_F(BucketAutoTests, FailsWithNonAccumulatorObjectOutputField) { + auto spec = + fromjson("{$bucketAuto : {groupBy : '$x', buckets : 1, output : {field : 'test'}}}"); + ASSERT_THROWS_CODE(createBucketAuto(spec), UserException, 40234); + + spec = fromjson("{$bucketAuto : {groupBy : '$x', buckets : 1, output : {field : 1}}}"); + ASSERT_THROWS_CODE(createBucketAuto(spec), UserException, 40234); + + spec = fromjson( + "{$bucketAuto : {groupBy : '$x', buckets : 1, output : {test : {field : 'test'}}}}"); + ASSERT_THROWS_CODE(createBucketAuto(spec), UserException, 40234); +} + +TEST_F(BucketAutoTests, FailsWithInvalidOutputFieldName) { + auto spec = fromjson( + "{$bucketAuto : {groupBy : '$x', buckets : 1, output : {'field.test' : {$avg : '$x'}}}}"); + ASSERT_THROWS_CODE(createBucketAuto(spec), UserException, 40235); + + spec = fromjson( + "{$bucketAuto : {groupBy : '$x', buckets : 1, output : {'$field' : {$avg : '$x'}}}}"); + ASSERT_THROWS_CODE(createBucketAuto(spec), UserException, 40236); +} + +TEST_F(BucketAutoTests, FailsWhenBufferingTooManyDocuments) { + std::deque<Document> inputs; + auto largeStr = std::string(1000, 'b'); + auto inputDoc = Document{{"a", largeStr}}; + ASSERT_GTE(inputDoc.getApproximateSize(), 1000UL); + inputs.push_back(inputDoc); + inputs.push_back(Document{{"a", largeStr}}); + auto mock = DocumentSourceMock::create(inputs); + + const uint64_t maxMemoryUsageBytes = 1000; + const int numBuckets = 1; + auto bucketAuto = DocumentSourceBucketAuto::create(ctx(), numBuckets, maxMemoryUsageBytes); + bucketAuto->setSource(mock.get()); + ASSERT_THROWS_CODE(bucketAuto->getNext(), UserException, 16819); +} +} // namespace DocumentSourceBucketAuto + class All : public Suite { public: All() : Suite("documentsource") {} |