4 files changed, 1007 insertions, 0 deletions
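Orientation note (editor's sketch, not part of the patch): the new $_internalUnpackBucket stage consumes time-series bucket documents of the shape {_id: ..., meta: ..., data: {<field>: {'<rowIndex>': <value>, ...}, ...}} and emits one materialized measurement per row index. The sketch below is distilled from the unit tests added in this patch; fromjson and the relaxed-JSON bucket shape are exactly what those tests use.

    // A stage specification of the form exercised by the tests in this patch.
    auto spec = fromjson(
        "{$_internalUnpackBucket: {include: ['_id', 'time', 'a'], "
        "timeField: 'time', metaField: 'myMeta'}}");

    // Feeding the stage a bucket like
    //     {meta: {m1: 999}, data: {_id:  {'0': 1, '1': 2},
    //                              time: {'0': 1, '1': 2},
    //                              a:    {'0': 1, '1': 2}}}
    // yields two measurements, with the bucket's 'meta' value re-labelled
    // under the user-facing metaField name:
    //     {time: 1, myMeta: {m1: 999}, _id: 1, a: 1}
    //     {time: 2, myMeta: {m1: 999}, _id: 2, a: 2}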
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 6423fa960ef..75babc86a5e 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -252,6 +252,7 @@ pipelineEnv.Library(
         'document_source_tee_consumer.cpp',
         'document_source_union_with.cpp',
         'document_source_unwind.cpp',
+        'document_source_internal_unpack_bucket.cpp',
         'pipeline.cpp',
         'semantic_analysis.cpp',
         'sequential_document_cache.cpp',
@@ -379,6 +380,7 @@ env.CppUnitTest(
        'document_source_sort_by_count_test.cpp',
        'document_source_sort_test.cpp',
        'document_source_union_with_test.cpp',
+        'document_source_internal_unpack_bucket_test.cpp',
        'document_source_unwind_test.cpp',
        'expression_and_test.cpp',
        'expression_compare_test.cpp',
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
new file mode 100644
index 00000000000..02912847281
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
@@ -0,0 +1,210 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source_internal_unpack_bucket.h"
+
+#include "mongo/db/exec/document_value/document.h"
+#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/db/pipeline/lite_parsed_document_source.h"
+
+namespace mongo {
+
+REGISTER_DOCUMENT_SOURCE(_internalUnpackBucket,
+                         LiteParsedDocumentSourceDefault::parse,
+                         DocumentSourceInternalUnpackBucket::createFromBson);
+
+void BucketUnpacker::reset(Document&& bucket) {
+    _fieldIters.clear();
+    _timeFieldIter = boost::none;
+
+    _bucket = std::move(bucket);
+    uassert(5346510, "An empty bucket cannot be unpacked", !_bucket.empty());
+
+    if (_bucket[kBucketDataFieldName].getDocument().empty()) {
+        // If the data field of a bucket is present but it holds an empty object, there's nothing
+        // to unpack.
+        return;
+    }
+
+    _metaValue = _bucket[kBucketMetaFieldName];
+    uassert(5346511,
+            "A metadata value cannot be undefined nor missing if metaField is specified",
+            !_spec.metaField ||
+                (_metaValue.getType() != BSONType::Undefined && !_metaValue.missing()));
+
+    _timeFieldIter = _bucket[kBucketDataFieldName][_spec.timeField].getDocument().fieldIterator();
+
+    // Walk the data region of the bucket, and decide if an iterator should be set up based on the
+    // include or exclude case.
+    auto colIter = _bucket[kBucketDataFieldName].getDocument().fieldIterator();
+    while (colIter.more()) {
+        auto&& [colName, colVal] = colIter.next();
+        if (colName == _spec.timeField) {
+            // Skip adding a FieldIterator for the timeField since the timestamp value from
+            // _timeFieldIter can be placed accordingly in the materialized measurement.
+            continue;
+        }
+        auto found = _spec.fieldSet.find(colName.toString()) != _spec.fieldSet.end();
+        if ((_unpackerBehavior == Behavior::kInclude) == found) {
+            _fieldIters.push_back({colName.toString(), colVal.getDocument().fieldIterator()});
+        }
+    }
+}
+
+Document BucketUnpacker::getNext() {
+    invariant(hasNext());
+
+    auto measurement = MutableDocument{};
+
+    auto&& [currentIdx, timeVal] = _timeFieldIter->next();
+    if (_includeTimeField) {
+        measurement.addField(_spec.timeField, timeVal);
+    }
+
+    if (!_metaValue.nullish()) {
+        measurement.addField(*_spec.metaField, _metaValue);
+    }
+
+    for (auto&& [colName, colIter] : _fieldIters) {
+        if (colIter.fieldName() == currentIdx) {
+            auto&& [_, val] = colIter.next();
+            measurement.addField(colName, val);
+        }
+    }
+
+    return measurement.freeze();
+}
+
+DocumentSourceInternalUnpackBucket::DocumentSourceInternalUnpackBucket(
+    const boost::intrusive_ptr<ExpressionContext>& expCtx, BucketUnpacker bucketUnpacker)
+    : DocumentSource(kStageName, expCtx), _bucketUnpacker(std::move(bucketUnpacker)) {}
+
+boost::intrusive_ptr<DocumentSource> DocumentSourceInternalUnpackBucket::createFromBson(
+    BSONElement specElem, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+    uassert(5346500,
+            "$_internalUnpackBucket specification must be an object",
+            specElem.type() == Object);
+
+    BucketUnpacker::Behavior unpackerBehavior;
+    BucketSpec bucketSpec;
+    auto hasIncludeExclude = false;
+    std::vector<std::string> fields;
+    for (auto&& elem : specElem.embeddedObject()) {
+        auto fieldName = elem.fieldNameStringData();
+        if (fieldName == "include" || fieldName == "exclude") {
+            uassert(5346501,
+                    "include or exclude field must be an array",
+                    elem.type() == BSONType::Array);
+
+            for (auto&& elt : elem.embeddedObject()) {
+                uassert(5346502,
+                        "include or exclude field element must be a string",
+                        elt.type() == BSONType::String);
+                auto field = elt.valueStringData();
+                uassert(5346503,
+                        "include or exclude field element must be a single-element field path",
+                        field.find('.') == std::string::npos);
+                bucketSpec.fieldSet.emplace(field);
+            }
+            unpackerBehavior = fieldName == "include" ? BucketUnpacker::Behavior::kInclude
+                                                      : BucketUnpacker::Behavior::kExclude;
+            hasIncludeExclude = true;
+        } else if (fieldName == kTimeFieldName) {
+            uassert(5346504, "timeField field must be a string", elem.type() == BSONType::String);
+            bucketSpec.timeField = elem.str();
+        } else if (fieldName == kMetaFieldName) {
+            uassert(5346505,
+                    str::stream() << "metaField field must be a string, got: " << elem.type(),
+                    elem.type() == BSONType::String);
+            bucketSpec.metaField = elem.str();
+        } else {
+            uasserted(5346506,
+                      str::stream()
+                          << "unrecognized parameter to $_internalUnpackBucket: " << fieldName);
+        }
+    }
+
+    // Check that none of the required arguments are missing.
+    uassert(5346507,
+            "The $_internalUnpackBucket stage requires an include/exclude parameter",
+            hasIncludeExclude);
+
+    uassert(5346508,
+            "The $_internalUnpackBucket stage requires a timeField parameter",
+            specElem[kTimeFieldName].ok());
+
+    // Determine if timestamp values should be included in the materialized measurements.
+    auto includeTimeField = (unpackerBehavior == BucketUnpacker::Behavior::kInclude) ==
+        (bucketSpec.fieldSet.find(bucketSpec.timeField) != bucketSpec.fieldSet.end());
+
+    return make_intrusive<DocumentSourceInternalUnpackBucket>(
+        expCtx, BucketUnpacker{std::move(bucketSpec), unpackerBehavior, includeTimeField});
+}
+
+Value DocumentSourceInternalUnpackBucket::serialize(
+    boost::optional<ExplainOptions::Verbosity> explain) const {
+    MutableDocument out;
+    auto behavior =
+        _bucketUnpacker.behavior() == BucketUnpacker::Behavior::kInclude ? kInclude : kExclude;
+    auto&& spec = _bucketUnpacker.bucketSpec();
+    std::vector<Value> fields;
+    for (auto&& field : spec.fieldSet) {
+        fields.emplace_back(field);
+    }
+    out.addField(behavior, Value{fields});
+    out.addField(kTimeFieldName, Value{spec.timeField});
+    if (spec.metaField) {
+        out.addField(kMetaFieldName, Value{*spec.metaField});
+    }
+    return Value(DOC(getSourceName() << out.freeze()));
+}
+
+DocumentSource::GetNextResult DocumentSourceInternalUnpackBucket::doGetNext() {
+    if (_bucketUnpacker.hasNext()) {
+        return _bucketUnpacker.getNext();
+    }
+
+    auto nextResult = pSource->getNext();
+    if (nextResult.isAdvanced()) {
+        auto bucket = nextResult.getDocument();
+        _bucketUnpacker.reset(std::move(bucket));
+        uassert(
+            5346509,
+            str::stream() << "A bucket with _id "
+                          << _bucketUnpacker.bucket()[BucketUnpacker::kBucketIdFieldName].toString()
+                          << " contains an empty data region",
+            _bucketUnpacker.hasNext());
+        return _bucketUnpacker.getNext();
+    }
+
+    return nextResult;
+}
+}  // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
new file mode 100644
index 00000000000..9df0d6d6cbd
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
@@ -0,0 +1,160 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <set>
+
+#include "mongo/db/pipeline/document_source.h"
+
+namespace mongo {
+
+/**
+ * Carries parameters for unpacking a bucket.
+ */
+struct BucketSpec {
+    // The user-supplied timestamp field name specified during time-series collection creation.
+    std::string timeField;
+
+    // An optional user-supplied metadata field name specified during time-series collection
+    // creation. This field name is used during materialization of metadata fields of a
+    // measurement after unpacking.
+    boost::optional<std::string> metaField;
+
+    // The set of fields provided to the include (or exclude) argument of the
+    // $_internalUnpackBucket stage.
+    std::set<std::string> fieldSet;
+};
+
+/**
+ * BucketUnpacker unpacks a bucket's metadata and the requested data fields into individual
+ * measurements.
+ */
+class BucketUnpacker {
+public:
+    // These are hard-coded constants in the bucket schema.
+    static constexpr StringData kBucketIdFieldName = "_id"_sd;
+    static constexpr StringData kBucketDataFieldName = "data"_sd;
+    static constexpr StringData kBucketMetaFieldName = "meta"_sd;
+
+    // When BucketUnpacker is created with kInclude, it must produce measurements that contain
+    // only the specified set of fields. Otherwise, if the kExclude option is used, the
+    // measurements will include the set difference between all fields in the bucket and the
+    // provided fields.
+    enum class Behavior { kInclude, kExclude };
+
+    BucketUnpacker(BucketSpec spec, Behavior unpackerBehavior, bool includeTimeField)
+        : _spec(std::move(spec)),
+          _unpackerBehavior(unpackerBehavior),
+          _includeTimeField(includeTimeField) {}
+
+    Document getNext();
+
+    bool hasNext() const {
+        return _timeFieldIter && _timeFieldIter->more();
+    }
+
+    /**
+     * Resets the unpacker to prepare for unpacking a new bucket described by the given document.
+     */
+    void reset(Document&& bucket);
+
+    Behavior behavior() const {
+        return _unpackerBehavior;
+    }
+
+    const BucketSpec& bucketSpec() const {
+        return _spec;
+    }
+
+    const Document& bucket() const {
+        return _bucket;
+    }
+
+private:
+    const BucketSpec _spec;
+    const Behavior _unpackerBehavior;
+
+    // Iterates the timestamp section of the bucket to drive the unpacking iteration.
+    boost::optional<FieldIterator> _timeFieldIter;
+
+    // A flag used to mark that the timestamp value should be materialized in measurements.
+    const bool _includeTimeField;
+
+    // Since the metadata value is the same across all materialized measurements, we can cache it
+    // in the reset phase and use it to materialize the metadata in each measurement.
+    Value _metaValue;
+
+    // The bucket being unpacked.
+    Document _bucket;
+
+    // Iterators over the columns of the above bucket, populated during the reset phase according
+    // to the provided 'Behavior' and 'BucketSpec'.
+    std::vector<std::pair<std::string, FieldIterator>> _fieldIters;
+};
+
+class DocumentSourceInternalUnpackBucket : public DocumentSource {
+public:
+    static constexpr StringData kStageName = "$_internalUnpackBucket"_sd;
+    static constexpr StringData kInclude = "include"_sd;
+    static constexpr StringData kExclude = "exclude"_sd;
+    static constexpr StringData kTimeFieldName = "timeField"_sd;
+    static constexpr StringData kMetaFieldName = "metaField"_sd;
+
+    static boost::intrusive_ptr<DocumentSource> createFromBson(
+        BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
+
+    DocumentSourceInternalUnpackBucket(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+                                       BucketUnpacker bucketUnpacker);
+
+    const char* getSourceName() const override {
+        return kStageName.rawData();
+    }
+
+    StageConstraints constraints(Pipeline::SplitState pipeState) const final {
+        return {StreamType::kStreaming,
+                PositionRequirement::kNone,
+                HostTypeRequirement::kNone,
+                DiskUseRequirement::kNoDiskUse,
+                FacetRequirement::kNotAllowed,
+                TransactionRequirement::kAllowed,
+                LookupRequirement::kAllowed,
+                UnionRequirement::kAllowed,
+                ChangeStreamRequirement::kBlacklist};
+    }
+
+    Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
+
+    boost::optional<DistributedPlanLogic> distributedPlanLogic() final {
+        return boost::none;
+    }
+
+private:
+    GetNextResult doGetNext() final;
+
+    BucketUnpacker _bucketUnpacker;
+};
+}  // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test.cpp
new file mode 100644
index 00000000000..334c12837f0
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test.cpp
@@ -0,0 +1,635 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/exec/document_value/document_value_test_util.h"
+#include "mongo/db/pipeline/aggregation_context_fixture.h"
+#include "mongo/db/pipeline/document_source_internal_unpack_bucket.h"
+#include "mongo/db/pipeline/document_source_mock.h"
+
+namespace mongo {
+namespace {
+
+constexpr auto kUserDefinedTimeName = "time"_sd;
+constexpr auto kUserDefinedMetaName = "myMeta"_sd;
+
+using InternalUnpackBucketStageTest = AggregationContextFixture;
+
+TEST_F(InternalUnpackBucketStageTest, UnpackBasicIncludeAllMeasurementFields) {
+    auto expCtx = getExpCtx();
+
+    auto spec = BSON("$_internalUnpackBucket"
+                     << BSON("include" << BSON_ARRAY("_id"
+                                                     << "time"
+                                                     << "a"
+                                                     << "b")
+                                       << DocumentSourceInternalUnpackBucket::kTimeFieldName
+                                       << kUserDefinedTimeName
+                                       << DocumentSourceInternalUnpackBucket::kMetaFieldName
+                                       << kUserDefinedMetaName));
+    auto unpack = DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), expCtx);
+
+    // This source will produce two buckets.
+    auto source = DocumentSourceMock::createForTest(
+        {"{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'0':1, '1':2}, time: {'0':1, '1':2}, "
+         "a:{'0':1, '1':2}, b:{'1':1}}}",
+         "{meta: {'m1': 9, 'm2': 9, 'm3': 9}, data: {_id: {'0':3, '1':4}, time: {'0':3, '1':4}, "
+         "a:{'0':1, '1':2}, b:{'1':1}}}"},
+        expCtx);
+    unpack->setSource(source.get());
+
+    // The first result exists and is as expected.
+    auto next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(next.getDocument(),
+                       Document(fromjson("{time: 1, myMeta: {m1: 999, m2: 9999}, _id: 1, a: 1}")));
+
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(
+        next.getDocument(),
+        Document(fromjson("{time: 2, myMeta: {m1: 999, m2: 9999}, _id: 2, a: 2, b: 1}")));
+
+    // Second bucket.
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(
+        next.getDocument(),
+        Document(fromjson("{time: 3, myMeta: {m1: 9, m2: 9, m3: 9}, _id: 3, a: 1}")));
+
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(
+        next.getDocument(),
+        Document(fromjson("{time: 4, myMeta: {m1: 9, m2: 9, m3: 9}, _id: 4, a: 2, b: 1}")));
+
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isEOF());
+}
+
+TEST_F(InternalUnpackBucketStageTest, UnpackExcludeASingleField) {
+    auto expCtx = getExpCtx();
+    auto spec = BSON(
+        "$_internalUnpackBucket" << BSON(
+            "exclude" << BSON_ARRAY("b") << DocumentSourceInternalUnpackBucket::kTimeFieldName
+                      << kUserDefinedTimeName << DocumentSourceInternalUnpackBucket::kMetaFieldName
+                      << kUserDefinedMetaName));
+
+    auto unpack = DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), expCtx);
+
+    auto source = DocumentSourceMock::createForTest(
+        {"{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'0':1, '1':2}, time: {'0':1, '1':2}, "
+         "a:{'0':1, '1':2}, b:{'1':1}}}",
+         "{meta: {m1: 9, m2: 9, m3: 9}, data: {_id: {'0':3, '1':4}, time: {'0':3, '1':4}, "
+         "a:{'0':1, '1':2}, b:{'1':1}}}"},
+        expCtx);
+    unpack->setSource(source.get());
+
+    // The first result exists and is as expected.
+    auto next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(next.getDocument(),
+                       Document(fromjson("{time: 1, myMeta: {m1: 999, m2: 9999}, _id: 1, a: 1}")));
+
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(next.getDocument(),
+                       Document(fromjson("{time: 2, myMeta: {m1: 999, m2: 9999}, _id: 2, a: 2}")));
+
+    // Second bucket.
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(
+        next.getDocument(),
+        Document(fromjson("{time: 3, myMeta: {m1: 9, m2: 9, m3: 9}, _id: 3, a: 1}")));
+
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(
+        next.getDocument(),
+        Document(fromjson("{time: 4, myMeta: {m1: 9, m2: 9, m3: 9}, _id: 4, a: 2}")));
+
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isEOF());
+}
+
+TEST_F(InternalUnpackBucketStageTest, UnpackEmptyInclude) {
+    auto expCtx = getExpCtx();
+    auto spec = BSON(
+        "$_internalUnpackBucket"
+        << BSON("include" << BSONArray() << DocumentSourceInternalUnpackBucket::kTimeFieldName
+                          << kUserDefinedTimeName
+                          << DocumentSourceInternalUnpackBucket::kMetaFieldName
+                          << kUserDefinedMetaName));
+    auto unpack = DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), expCtx);
+
+    auto source = DocumentSourceMock::createForTest(
+        {"{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'0':1, '1':2}, time: {'0':1, '1':2}, "
+         "a:{'0':1, '1':2}, b:{'1':1}}}",
+         "{meta: {m1: 9, m2: 9, m3: 9}, data: {_id: {'0':3, '1':4}, time: {'0':3, '1':4}, "
+         "a:{'0':1, '1':2}, b:{'1':1}}}"},
+        expCtx);
+    unpack->setSource(source.get());
+
+    // We should produce metadata only, one document per measurement in each bucket.
+    for (auto idx = 0; idx < 2; ++idx) {
+        auto next = unpack->getNext();
+        ASSERT_TRUE(next.isAdvanced());
+        ASSERT_DOCUMENT_EQ(next.getDocument(),
+                           Document(fromjson("{myMeta: {m1: 999, m2: 9999}}")));
+    }
+
+    for (auto idx = 0; idx < 2; ++idx) {
+        auto next = unpack->getNext();
+        ASSERT_TRUE(next.isAdvanced());
+        ASSERT_DOCUMENT_EQ(next.getDocument(),
+                           Document(fromjson("{myMeta: {m1: 9, m2: 9, m3: 9}}")));
+    }
+
+    auto next = unpack->getNext();
+    ASSERT_TRUE(next.isEOF());
+}
+
+TEST_F(InternalUnpackBucketStageTest, UnpackEmptyExclude) {
+    auto expCtx = getExpCtx();
+    auto spec = BSON(
+        "$_internalUnpackBucket"
+        << BSON("exclude" << BSONArray() << DocumentSourceInternalUnpackBucket::kTimeFieldName
+                          << kUserDefinedTimeName
+                          << DocumentSourceInternalUnpackBucket::kMetaFieldName
+                          << kUserDefinedMetaName));
+    auto unpack = DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), expCtx);
+
+    auto source = DocumentSourceMock::createForTest(
+        {"{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'0':1, '1':2}, time: {'0':1, '1':2}, "
+         "a:{'0':1, '1':2}, b:{'1':1}}}",
+         "{meta: {m1: 9, m2: 9, m3: 9}, data: {_id: {'0':3, '1':4}, time: {'0':3, '1':4}, "
+         "a:{'0':1, '1':2}, b:{'1':1}}}"},
+        expCtx);
+    unpack->setSource(source.get());
+
+    auto next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(next.getDocument(),
+                       Document(fromjson("{time: 1, myMeta: {m1: 999, m2: 9999}, _id: 1, a: 1}")));
+
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(
+        next.getDocument(),
+        Document(fromjson("{time: 2, myMeta: {m1: 999, m2: 9999}, _id: 2, a: 2, b: 1}")));
+
+    // Second bucket.
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(
+        next.getDocument(),
+        Document(fromjson("{time: 3, myMeta: {m1: 9, m2: 9, m3: 9}, _id: 3, a: 1}")));
+
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(
+        next.getDocument(),
+        Document(fromjson("{time: 4, myMeta: {m1: 9, m2: 9, m3: 9}, _id: 4, a: 2, b: 1}")));
+
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isEOF());
+}
+
+TEST_F(InternalUnpackBucketStageTest, UnpackBasicIncludeWithDollarPrefix) {
+    auto expCtx = getExpCtx();
+
+    auto spec = BSON("$_internalUnpackBucket"
+                     << BSON("include" << BSON_ARRAY("_id"
+                                                     << "time"
+                                                     << "$a"
+                                                     << "b")
+                                       << DocumentSourceInternalUnpackBucket::kTimeFieldName
+                                       << kUserDefinedTimeName
+                                       << DocumentSourceInternalUnpackBucket::kMetaFieldName
+                                       << kUserDefinedMetaName));
+    auto unpack = DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), expCtx);
+
+    // This source will produce two buckets.
+    auto source = DocumentSourceMock::createForTest(
+        {"{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'0':1, '1':2}, time: {'0':1, '1':2}, "
+         "$a:{'0':1, '1':2}, b:{'1':1}}}",
+         "{meta: {m1: 9, m2: 9, m3: 9}, data: {_id: {'0':3, '1':4}, time: {'0':3, '1':4}, "
+         "$a:{'0':1, '1':2}, b:{'1':1}}}"},
+        expCtx);
+    unpack->setSource(source.get());
+
+    // The first result exists and is as expected.
+    auto next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(
+        next.getDocument(),
+        Document(fromjson("{time: 1, myMeta: {m1: 999, m2: 9999}, _id: 1, $a: 1}")));
+
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(
+        next.getDocument(),
+        Document(fromjson("{time: 2, myMeta: {m1: 999, m2: 9999}, _id: 2, $a: 2, b: 1}")));
+
+    // Second bucket.
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(
+        next.getDocument(),
+        Document(fromjson("{time: 3, myMeta: {m1: 9, m2: 9, m3: 9}, _id: 3, $a: 1}")));
+
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(
+        next.getDocument(),
+        Document(fromjson("{time: 4, myMeta: {m1: 9, m2: 9, m3: 9}, _id: 4, $a: 2, b: 1}")));
+
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isEOF());
+}
+
+TEST_F(InternalUnpackBucketStageTest, UnpackMetadataOnly) {
+    auto expCtx = getExpCtx();
+    auto spec = BSON(
+        "$_internalUnpackBucket"
+        << BSON("exclude" << BSONArray() << DocumentSourceInternalUnpackBucket::kTimeFieldName
+                          << kUserDefinedTimeName
+                          << DocumentSourceInternalUnpackBucket::kMetaFieldName
+                          << kUserDefinedMetaName));
+    auto unpack = DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), expCtx);
+
+    auto source = DocumentSourceMock::createForTest(
+        {"{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'0':1, '1':2}, time: {'0':1, '1':2}}}",
+         "{meta: {m1: 9, m2: 9, m3: 9}, data: {_id: {'0':3, '1':4}, time: {'0':3, '1':4}}}"},
+        expCtx);
+    unpack->setSource(source.get());
+
+    auto next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(next.getDocument(),
+                       Document(fromjson("{time: 1, myMeta: {m1: 999, m2: 9999}, _id: 1}")));
+
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(next.getDocument(),
+                       Document(fromjson("{time: 2, myMeta: {m1: 999, m2: 9999}, _id: 2}")));
+
+    // Second bucket.
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(next.getDocument(),
+                       Document(fromjson("{time: 3, myMeta: {m1: 9, m2: 9, m3: 9}, _id: 3}")));
+
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(next.getDocument(),
+                       Document(fromjson("{time: 4, myMeta: {m1: 9, m2: 9, m3: 9}, _id: 4}")));
+
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isEOF());
+}
+
+TEST_F(InternalUnpackBucketStageTest, UnpackWithStrangeTimestampOrdering) {
+    auto expCtx = getExpCtx();
+    auto spec = BSON(
+        "$_internalUnpackBucket"
+        << BSON("exclude" << BSONArray() << DocumentSourceInternalUnpackBucket::kTimeFieldName
+                          << kUserDefinedTimeName
+                          << DocumentSourceInternalUnpackBucket::kMetaFieldName
+                          << kUserDefinedMetaName));
+    auto unpack = DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), expCtx);
+
+    auto source = DocumentSourceMock::createForTest(
+        {"{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'1':1, '0':2, '2': 3}, "
+         "time: {'1':1, '0': 2, '2': 3}}}",
+         "{meta: {'m1': 9, 'm2': 9, 'm3': 9}, data: {_id: {'1':4, '0':5, '2':6}, "
+         "time: {'1':4, '0': 5, '2': 6}}}"},
+        expCtx);
+    unpack->setSource(source.get());
+
+    auto next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(next.getDocument(),
+                       Document(fromjson("{time: 1, myMeta: {m1: 999, m2: 9999}, _id: 1}")));
+
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(next.getDocument(),
+                       Document(fromjson("{time: 2, myMeta: {m1: 999, m2: 9999}, _id: 2}")));
+
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(next.getDocument(),
+                       Document(fromjson("{time: 3, myMeta: {m1: 999, m2: 9999}, _id: 3}")));
+
+    // Second bucket.
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(next.getDocument(),
+                       Document(fromjson("{time: 4, myMeta: {m1: 9, m2: 9, m3: 9}, _id: 4}")));
+
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(next.getDocument(),
+                       Document(fromjson("{time: 5, myMeta: {m1: 9, m2: 9, m3: 9}, _id: 5}")));
+
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(next.getDocument(),
+                       Document(fromjson("{time: 6, myMeta: {m1: 9, m2: 9, m3: 9}, _id: 6}")));
+
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isEOF());
+}
+
+TEST_F(InternalUnpackBucketStageTest,
+       BucketUnpackerHandlesMissingMetadataWhenMetaFieldUnspecified) {
+    auto expCtx = getExpCtx();
+    auto spec = BSON(
+        "$_internalUnpackBucket"
+        << BSON("exclude" << BSONArray() << DocumentSourceInternalUnpackBucket::kTimeFieldName
+                          << kUserDefinedTimeName));
+    auto unpack = DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), expCtx);
+
+    auto source = DocumentSourceMock::createForTest(
+        {"{data: {_id: {'1':1, '0':2, '2': 3}, time: {'1':1, '0': 2, '2': 3}}}",
+         "{data: {_id: {'1':4, '0':5, '2':6}, time: {'1':4, '0': 5, '2': 6}}}"},
+        expCtx);
+    unpack->setSource(source.get());
+
+    auto next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(next.getDocument(), Document(fromjson("{time: 1, _id: 1}")));
+
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(next.getDocument(), Document(fromjson("{time: 2, _id: 2}")));
+
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(next.getDocument(), Document(fromjson("{time: 3, _id: 3}")));
+
+    // Second bucket.
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(next.getDocument(), Document(fromjson("{time: 4, _id: 4}")));
+
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(next.getDocument(), Document(fromjson("{time: 5, _id: 5}")));
+
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(next.getDocument(), Document(fromjson("{time: 6, _id: 6}")));
+
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isEOF());
+}
+
+TEST_F(InternalUnpackBucketStageTest, BucketUnpackerThrowsOnUndefinedMetadata) {
+    auto expCtx = getExpCtx();
+    auto spec = BSON(
+        "$_internalUnpackBucket"
+        << BSON("exclude" << BSONArray() << DocumentSourceInternalUnpackBucket::kTimeFieldName
+                          << kUserDefinedTimeName
+                          << DocumentSourceInternalUnpackBucket::kMetaFieldName
+                          << kUserDefinedMetaName));
+    auto unpack = DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), expCtx);
+
+    auto source = DocumentSourceMock::createForTest(
+        {"{meta: undefined, data: {_id: {'1':1, '0':2, '2': 3}, time: {'1':1, '0': 2, '2': 3}}}"},
+        expCtx);
+    unpack->setSource(source.get());
+    ASSERT_THROWS_CODE(unpack->getNext(), AssertionException, 5346511);
+}
+
+TEST_F(InternalUnpackBucketStageTest, BucketUnpackerThrowsOnMissingMetadata) {
+    auto expCtx = getExpCtx();
+    auto spec = BSON(
+        "$_internalUnpackBucket"
+        << BSON("exclude" << BSONArray() << DocumentSourceInternalUnpackBucket::kTimeFieldName
+                          << kUserDefinedTimeName
+                          << DocumentSourceInternalUnpackBucket::kMetaFieldName
+                          << kUserDefinedMetaName));
+    auto unpack = DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), expCtx);
+
+    auto source = DocumentSourceMock::createForTest(
+        {"{data: {_id: {'1':1, '0':2, '2': 3}, time: {'1':1, '0': 2, '2': 3}}}"}, expCtx);
+    unpack->setSource(source.get());
+    ASSERT_THROWS_CODE(unpack->getNext(), AssertionException, 5346511);
+}
+
+TEST_F(InternalUnpackBucketStageTest, BucketUnpackerHandlesNullMetadata) {
+    auto expCtx = getExpCtx();
+    auto spec = BSON(
+        "$_internalUnpackBucket"
+        << BSON("exclude" << BSONArray() << DocumentSourceInternalUnpackBucket::kTimeFieldName
+                          << kUserDefinedTimeName
+                          << DocumentSourceInternalUnpackBucket::kMetaFieldName
+                          << kUserDefinedMetaName));
+    auto unpack = DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), expCtx);
+
+    auto source = DocumentSourceMock::createForTest(
+        {"{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'1':1, '0':2, '2': 3}, "
+         "time: {'1':1, '0': 2, '2': 3}}}",
+         "{meta: null, data: {_id: {'1':4, '0':5, '2':6}, time: {'1':4, '0': 5, '2': 6}}}"},
+        expCtx);
+    unpack->setSource(source.get());
+
+    auto next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(next.getDocument(),
+                       Document(fromjson("{time: 1, myMeta: {m1: 999, m2: 9999}, _id: 1}")));
+
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(next.getDocument(),
+                       Document(fromjson("{time: 2, myMeta: {m1: 999, m2: 9999}, _id: 2}")));
+
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(next.getDocument(),
+                       Document(fromjson("{time: 3, myMeta: {m1: 999, m2: 9999}, _id: 3}")));
+
+    // Second bucket.
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(next.getDocument(), Document(fromjson("{time: 4, _id: 4}")));
+
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(next.getDocument(), Document(fromjson("{time: 5, _id: 5}")));
+
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isAdvanced());
+    ASSERT_DOCUMENT_EQ(next.getDocument(), Document(fromjson("{time: 6, _id: 6}")));
+
+    next = unpack->getNext();
+    ASSERT_TRUE(next.isEOF());
+}
+
+TEST_F(InternalUnpackBucketStageTest, ThrowsOnEmptyDataValue) {
+    auto expCtx = getExpCtx();
+    auto spec = BSON(
+        "$_internalUnpackBucket"
+        << BSON("exclude" << BSONArray() << DocumentSourceInternalUnpackBucket::kTimeFieldName
+                          << kUserDefinedTimeName
+                          << DocumentSourceInternalUnpackBucket::kMetaFieldName
+                          << kUserDefinedMetaName));
+    auto unpack = DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), expCtx);
+
+    auto source = DocumentSourceMock::createForTest(
+        Document{{"_id", 1}, {"meta", Document{{"m1", 999}, {"m2", 9999}}}, {"data", Document{}}},
+        expCtx);
+    unpack->setSource(source.get());
+    ASSERT_THROWS_CODE(unpack->getNext(), AssertionException, 5346509);
+}
+
+TEST_F(InternalUnpackBucketStageTest, HandlesEmptyBucket) {
+    auto expCtx = getExpCtx();
+    auto spec = BSON(
+        "$_internalUnpackBucket"
+        << BSON("exclude" << BSONArray() << DocumentSourceInternalUnpackBucket::kTimeFieldName
+                          << kUserDefinedTimeName
+                          << DocumentSourceInternalUnpackBucket::kMetaFieldName
+                          << kUserDefinedMetaName));
+    auto unpack = DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), expCtx);
+
+    auto source = DocumentSourceMock::createForTest(Document{}, expCtx);
+    unpack->setSource(source.get());
+    ASSERT_THROWS_CODE(unpack->getNext(), AssertionException, 5346510);
+}
+
+TEST_F(InternalUnpackBucketStageTest, ParserRejectsNonObjArgument) {
+    ASSERT_THROWS_CODE(DocumentSourceInternalUnpackBucket::createFromBson(
+                           fromjson("{$_internalUnpackBucket: 1}").firstElement(), getExpCtx()),
+                       AssertionException,
+                       5346500);
+}
+
+TEST_F(InternalUnpackBucketStageTest, ParserRejectsNonArrayInclude) {
+    ASSERT_THROWS_CODE(DocumentSourceInternalUnpackBucket::createFromBson(
+                           fromjson("{$_internalUnpackBucket: {include: 'not array', timeField: "
+                                    "'foo', metaField: 'bar'}}")
+                               .firstElement(),
+                           getExpCtx()),
+                       AssertionException,
+                       5346501);
+}
+
+TEST_F(InternalUnpackBucketStageTest, ParserRejectsNonArrayExclude) {
+    ASSERT_THROWS_CODE(DocumentSourceInternalUnpackBucket::createFromBson(
+                           fromjson("{$_internalUnpackBucket: {exclude: 'not array', timeField: "
+                                    "'foo', metaField: 'bar'}}")
+                               .firstElement(),
+                           getExpCtx()),
+                       AssertionException,
+                       5346501);
+}
+
+TEST_F(InternalUnpackBucketStageTest, ParserRejectsNonStringInclude) {
+    ASSERT_THROWS_CODE(DocumentSourceInternalUnpackBucket::createFromBson(
+                           fromjson("{$_internalUnpackBucket: {include: [999, 1212], timeField: "
+                                    "'foo', metaField: 'bar'}}")
+                               .firstElement(),
+                           getExpCtx()),
+                       AssertionException,
+                       5346502);
+}
+
+TEST_F(InternalUnpackBucketStageTest, ParserRejectsDottedPaths) {
+    ASSERT_THROWS_CODE(
+        DocumentSourceInternalUnpackBucket::createFromBson(
+            fromjson(
+                "{$_internalUnpackBucket: {exclude: ['a.b'], timeField: 'foo', metaField: 'bar'}}")
+                .firstElement(),
+            getExpCtx()),
+        AssertionException,
+        5346503);
+}
+
+TEST_F(InternalUnpackBucketStageTest, ParserRejectsBadIncludeExcludeFieldName) {
+    ASSERT_THROWS_CODE(
+        DocumentSourceInternalUnpackBucket::createFromBson(
+            fromjson("{$_internalUnpackBucket: {TYPO: [], timeField: 'foo', metaField: 'bar'}}")
+                .firstElement(),
+            getExpCtx()),
+        AssertionException,
+        5346506);
+}
+
+TEST_F(InternalUnpackBucketStageTest, ParserRejectsNonStringTimeField) {
+    ASSERT_THROWS_CODE(
+        DocumentSourceInternalUnpackBucket::createFromBson(
+            fromjson("{$_internalUnpackBucket: {include: [], timeField: 999, metaField: 'bar'}}")
+                .firstElement(),
+            getExpCtx()),
+        AssertionException,
+        5346504);
+}
+
+TEST_F(InternalUnpackBucketStageTest, ParserRejectsNonStringMetaField) {
+    ASSERT_THROWS_CODE(
+        DocumentSourceInternalUnpackBucket::createFromBson(
+            fromjson("{$_internalUnpackBucket: {include: [], timeField: 'foo', metaField: 999}}")
+                .firstElement(),
+            getExpCtx()),
+        AssertionException,
+        5346505);
+}
+
+TEST_F(InternalUnpackBucketStageTest, ParserRejectsAdditionalFields) {
+    ASSERT_THROWS_CODE(DocumentSourceInternalUnpackBucket::createFromBson(
+                           fromjson("{$_internalUnpackBucket: {include: [], timeField: 'foo', "
+                                    "metaField: 'bar', extra: 1}}")
+                               .firstElement(),
+                           getExpCtx()),
+                       AssertionException,
+                       5346506);
+}
+
+TEST_F(InternalUnpackBucketStageTest, ParserRejectsMissingIncludeField) {
+    ASSERT_THROWS(DocumentSourceInternalUnpackBucket::createFromBson(
+                      fromjson("{$_internalUnpackBucket: {timeField: 'foo', metaField: 'bar'}}")
+                          .firstElement(),
+                      getExpCtx()),
+                  AssertionException);
+}
+
+TEST_F(InternalUnpackBucketStageTest, ParserRejectsMissingTimeField) {
+    ASSERT_THROWS(
+        DocumentSourceInternalUnpackBucket::createFromBson(
+            fromjson("{$_internalUnpackBucket: {include: [], metaField: 'bar'}}").firstElement(),
+            getExpCtx()),
+        AssertionException);
+}
+}  // namespace
+}  // namespace mongo
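Editor's note on one subtlety in createFromBson above: whether the timestamp appears in materialized measurements is derived rather than passed explicitly. includeTimeField is true exactly when the unpacking behavior and the presence of timeField in the field set agree:

    // includeTimeField = (behavior == kInclude) == (fieldSet contains timeField)
    //
    //   behavior  | timeField listed | timestamp materialized
    //   ----------+------------------+-----------------------
    //   include   | yes              | yes
    //   include   | no               | no
    //   exclude   | yes              | no
    //   exclude   | no               | yes

This is why the UnpackEmptyExclude test still sees 'time' in its output (exclude, not listed), while UnpackEmptyInclude produces metadata-only documents (include, not listed).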
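The unpacking loop in doGetNext drives BucketUnpacker through a small reset/hasNext/getNext protocol. A minimal sketch of that protocol in isolation, under the assumption it runs inside the server's test environment; makeBucketDoc is a hypothetical placeholder for any Document shaped like the buckets in the tests above:

    // Sketch: how a caller drives BucketUnpacker, mirroring doGetNext.
    BucketUnpacker unpacker{
        BucketSpec{"time", std::string{"myMeta"}, {}},  // timeField, metaField, fieldSet
        BucketUnpacker::Behavior::kExclude,             // empty exclude set -> keep every field
        true};                                          // materialize the timestamp

    unpacker.reset(makeBucketDoc());  // throws 5346510 if handed an empty document
    while (unpacker.hasNext()) {
        Document measurement = unpacker.getNext();  // one measurement per timestamp entry
        // ... hand the measurement to the rest of the pipeline ...
    }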