summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r-- src/mongo/db/pipeline/SConscript | 2
-rw-r--r-- src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp | 210
-rw-r--r-- src/mongo/db/pipeline/document_source_internal_unpack_bucket.h | 160
-rw-r--r-- src/mongo/db/pipeline/document_source_internal_unpack_bucket_test.cpp | 635
4 files changed, 1007 insertions, 0 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 6423fa960ef..75babc86a5e 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -252,6 +252,7 @@ pipelineEnv.Library(
'document_source_tee_consumer.cpp',
'document_source_union_with.cpp',
'document_source_unwind.cpp',
+ 'document_source_internal_unpack_bucket.cpp',
'pipeline.cpp',
'semantic_analysis.cpp',
'sequential_document_cache.cpp',
@@ -379,6 +380,7 @@ env.CppUnitTest(
'document_source_sort_by_count_test.cpp',
'document_source_sort_test.cpp',
'document_source_union_with_test.cpp',
+ 'document_source_internal_unpack_bucket_test.cpp',
'document_source_unwind_test.cpp',
'expression_and_test.cpp',
'expression_compare_test.cpp',
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
new file mode 100644
index 00000000000..02912847281
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp
@@ -0,0 +1,210 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source_internal_unpack_bucket.h"
+
+#include "mongo/db/exec/document_value/document.h"
+#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/db/pipeline/lite_parsed_document_source.h"
+
+namespace mongo {
+
+REGISTER_DOCUMENT_SOURCE(_internalUnpackBucket,
+ LiteParsedDocumentSourceDefault::parse,
+ DocumentSourceInternalUnpackBucket::createFromBson);
+
+void BucketUnpacker::reset(Document&& bucket) {
+ _fieldIters.clear();
+ _timeFieldIter = boost::none;
+
+ _bucket = std::move(bucket);
+ uassert(5346510, "An empty bucket cannot be unpacked", !_bucket.empty());
+
+ if (_bucket[kBucketDataFieldName].getDocument().empty()) {
+ // If the data field of a bucket is present but it holds an empty object, there's nothing to
+ // unpack.
+ return;
+ }
+
+ _metaValue = _bucket[kBucketMetaFieldName];
+ uassert(5346511,
+ "A metadata value cannot be undefined nor missing if metaField is specified",
+ !_spec.metaField ||
+ (_metaValue.getType() != BSONType::Undefined && !_metaValue.missing()));
+
+ _timeFieldIter = _bucket[kBucketDataFieldName][_spec.timeField].getDocument().fieldIterator();
+
+ // Walk the data region of the bucket, and decide if an iterator should be set up based on the
+ // include or exclude case.
+ auto colIter = _bucket[kBucketDataFieldName].getDocument().fieldIterator();
+ while (colIter.more()) {
+ auto&& [colName, colVal] = colIter.next();
+ if (colName == _spec.timeField) {
+ // Skip adding a FieldIterator for the timeField since the timestamp value from
+ // _timeFieldIter can be placed accordingly in the materialized measurement.
+ continue;
+ }
+ auto found = _spec.fieldSet.find(colName.toString()) != _spec.fieldSet.end();
+ if ((_unpackerBehavior == Behavior::kInclude) == found) {
+ _fieldIters.push_back({colName.toString(), colVal.getDocument().fieldIterator()});
+ }
+ }
+}
+
+Document BucketUnpacker::getNext() {
+ invariant(hasNext());
+
+ auto measurement = MutableDocument{};
+
+ auto&& [currentIdx, timeVal] = _timeFieldIter->next();
+ if (_includeTimeField) {
+ measurement.addField(_spec.timeField, timeVal);
+ }
+
+ if (!_metaValue.nullish()) {
+ measurement.addField(*_spec.metaField, _metaValue);
+ }
+
+ for (auto&& [colName, colIter] : _fieldIters) {
+ if (colIter.fieldName() == currentIdx) {
+ auto&& [_, val] = colIter.next();
+ measurement.addField(colName, val);
+ }
+ }
+
+ return measurement.freeze();
+}
+
+DocumentSourceInternalUnpackBucket::DocumentSourceInternalUnpackBucket(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, BucketUnpacker bucketUnpacker)
+ : DocumentSource(kStageName, expCtx), _bucketUnpacker(std::move(bucketUnpacker)) {}
+
+boost::intrusive_ptr<DocumentSource> DocumentSourceInternalUnpackBucket::createFromBson(
+ BSONElement specElem, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ uassert(5346500,
+ "$_internalUnpackBucket specification must be an object",
+ specElem.type() == Object);
+
+ BucketUnpacker::Behavior unpackerBehavior;
+ BucketSpec bucketSpec;
+ auto hasIncludeExclude = false;
+ std::vector<std::string> fields;
+ for (auto&& elem : specElem.embeddedObject()) {
+ auto fieldName = elem.fieldNameStringData();
+ if (fieldName == "include" || fieldName == "exclude") {
+ uassert(5346501,
+ "include or exclude field must be an array",
+ elem.type() == BSONType::Array);
+
+ for (auto&& elt : elem.embeddedObject()) {
+ uassert(5346502,
+ "include or exclude field element must be a string",
+ elt.type() == BSONType::String);
+ auto field = elt.valueStringData();
+ uassert(5346503,
+ "include or exclude field element must be a single-element field path",
+ field.find('.') == std::string::npos);
+ bucketSpec.fieldSet.emplace(field);
+ }
+ unpackerBehavior = fieldName == "include" ? BucketUnpacker::Behavior::kInclude
+ : BucketUnpacker::Behavior::kExclude;
+ hasIncludeExclude = true;
+ } else if (fieldName == kTimeFieldName) {
+ uassert(5346504, "timeField field must be a string", elem.type() == BSONType::String);
+ bucketSpec.timeField = elem.str();
+ } else if (fieldName == kMetaFieldName) {
+ uassert(5346505,
+ str::stream() << "metaField field must be a string, got: " << elem.type(),
+ elem.type() == BSONType::String);
+ bucketSpec.metaField = elem.str();
+ } else {
+ uasserted(5346506,
+ str::stream()
+ << "unrecognized parameter to $_internalUnpackBucket: " << fieldName);
+ }
+ }
+
+ // Check that none of the required arguments are missing.
+ uassert(5346507,
+            "The $_internalUnpackBucket stage requires an include/exclude parameter",
+ hasIncludeExclude);
+
+ uassert(5346508,
+ "The $_internalUnpackBucket stage requires a timeField parameter",
+ specElem[kTimeFieldName].ok());
+
+ // Determine if timestamp values should be included in the materialized measurements.
+ auto includeTimeField = (unpackerBehavior == BucketUnpacker::Behavior::kInclude) ==
+ (bucketSpec.fieldSet.find(bucketSpec.timeField) != bucketSpec.fieldSet.end());
+
+ return make_intrusive<DocumentSourceInternalUnpackBucket>(
+ expCtx, BucketUnpacker{std::move(bucketSpec), unpackerBehavior, includeTimeField});
+}
+
+Value DocumentSourceInternalUnpackBucket::serialize(
+ boost::optional<ExplainOptions::Verbosity> explain) const {
+ MutableDocument out;
+ auto behavior =
+ _bucketUnpacker.behavior() == BucketUnpacker::Behavior::kInclude ? kInclude : kExclude;
+ auto&& spec = _bucketUnpacker.bucketSpec();
+ std::vector<Value> fields;
+ for (auto&& field : spec.fieldSet) {
+ fields.emplace_back(field);
+ }
+ out.addField(behavior, Value{fields});
+ out.addField(kTimeFieldName, Value{spec.timeField});
+ if (spec.metaField) {
+ out.addField(kMetaFieldName, Value{*spec.metaField});
+ }
+ return Value(DOC(getSourceName() << out.freeze()));
+}
+
+DocumentSource::GetNextResult DocumentSourceInternalUnpackBucket::doGetNext() {
+ if (_bucketUnpacker.hasNext()) {
+ return _bucketUnpacker.getNext();
+ }
+
+ auto nextResult = pSource->getNext();
+ if (nextResult.isAdvanced()) {
+ auto bucket = nextResult.getDocument();
+ _bucketUnpacker.reset(std::move(bucket));
+ uassert(
+ 5346509,
+ str::stream() << "A bucket with _id "
+ << _bucketUnpacker.bucket()[BucketUnpacker::kBucketIdFieldName].toString()
+ << " contains an empty data region",
+ _bucketUnpacker.hasNext());
+ return _bucketUnpacker.getNext();
+ }
+
+ return nextResult;
+}
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
new file mode 100644
index 00000000000..9df0d6d6cbd
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
@@ -0,0 +1,160 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <set>
+
+#include "mongo/db/pipeline/document_source.h"
+
+namespace mongo {
+
+/**
+ * Carries parameters for unpacking a bucket.
+ */
+struct BucketSpec {
+ // The user-supplied timestamp field name specified during time-series collection creation.
+ std::string timeField;
+
+ // An optional user-supplied metadata field name specified during time-series collection
+ // creation. This field name is used during materialization of metadata fields of a measurement
+ // after unpacking.
+ boost::optional<std::string> metaField;
+
+ // The set of fields provided to the include (or exclude) argument to the
+    // $_internalUnpackBucket stage.
+ std::set<std::string> fieldSet;
+};
+
+/**
+ * BucketUnpacker will unpack bucket fields for metadata and the provided fields.
+ */
+class BucketUnpacker {
+public:
+ // These are hard-coded constants in the bucket schema.
+ static constexpr StringData kBucketIdFieldName = "_id"_sd;
+ static constexpr StringData kBucketDataFieldName = "data"_sd;
+ static constexpr StringData kBucketMetaFieldName = "meta"_sd;
+
+ // When BucketUnpacker is created with kInclude it must produce measurements that contain the
+ // set of fields. Otherwise, if the kExclude option is used, the measurements will include the
+ // set difference between all fields in the bucket and the provided fields.
+ enum class Behavior { kInclude, kExclude };
+
+ BucketUnpacker(BucketSpec spec, Behavior unpackerBehavior, bool includeTimeField)
+ : _spec(std::move(spec)),
+ _unpackerBehavior(unpackerBehavior),
+ _includeTimeField(includeTimeField) {}
+
+ Document getNext();
+ bool hasNext() const {
+ return _timeFieldIter && _timeFieldIter->more();
+ }
+
+ /**
+ * This resets the unpacker to prepare to unpack a new bucket described by the given document.
+ */
+ void reset(Document&& bucket);
+
+ Behavior behavior() const {
+ return _unpackerBehavior;
+ }
+
+ const BucketSpec& bucketSpec() const {
+ return _spec;
+ }
+
+ const Document& bucket() const {
+ return _bucket;
+ }
+
+private:
+ const BucketSpec _spec;
+ const Behavior _unpackerBehavior;
+
+ // Iterates the timestamp section of the bucket to drive the unpacking iteration.
+ boost::optional<FieldIterator> _timeFieldIter;
+
+ // A flag used to mark that the timestamp value should be materialized in measurements.
+ const bool _includeTimeField;
+
+ // Since the metadata value is the same across all materialized measurements we can cache the
+ // metadata value in the reset phase and use it to materialize the metadata in each measurement.
+ Value _metaValue;
+
+ // The bucket being unpacked.
+ Document _bucket;
+
+ // Iterators used to unpack the columns of the above bucket that are populated during the reset
+ // phase according to the provided 'Behavior' and 'BucketSpec'.
+ std::vector<std::pair<std::string, FieldIterator>> _fieldIters;
+};
+
+class DocumentSourceInternalUnpackBucket : public DocumentSource {
+public:
+ static constexpr StringData kStageName = "$_internalUnpackBucket"_sd;
+ static constexpr StringData kInclude = "include"_sd;
+ static constexpr StringData kExclude = "exclude"_sd;
+ static constexpr StringData kTimeFieldName = "timeField"_sd;
+ static constexpr StringData kMetaFieldName = "metaField"_sd;
+
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
+
+ DocumentSourceInternalUnpackBucket(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ BucketUnpacker bucketUnpacker);
+
+ const char* getSourceName() const override {
+ return kStageName.rawData();
+ }
+
+ StageConstraints constraints(Pipeline::SplitState pipeState) const final {
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kAllowed,
+ LookupRequirement::kAllowed,
+ UnionRequirement::kAllowed,
+ ChangeStreamRequirement::kBlacklist};
+ }
+
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
+
+ boost::optional<DistributedPlanLogic> distributedPlanLogic() final {
+ return boost::none;
+ };
+
+private:
+ GetNextResult doGetNext() final;
+
+ BucketUnpacker _bucketUnpacker;
+};
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test.cpp
new file mode 100644
index 00000000000..334c12837f0
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test.cpp
@@ -0,0 +1,635 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/exec/document_value/document_value_test_util.h"
+#include "mongo/db/pipeline/aggregation_context_fixture.h"
+#include "mongo/db/pipeline/document_source_internal_unpack_bucket.h"
+#include "mongo/db/pipeline/document_source_mock.h"
+
+namespace mongo {
+namespace {
+
+constexpr auto kUserDefinedTimeName = "time"_sd;
+constexpr auto kUserDefinedMetaName = "myMeta"_sd;
+
+using InternalUnpackBucketStageTest = AggregationContextFixture;
+
+TEST_F(InternalUnpackBucketStageTest, UnpackBasicIncludeAllMeasurementFields) {
+ auto expCtx = getExpCtx();
+
+ auto spec = BSON("$_internalUnpackBucket"
+ << BSON("include" << BSON_ARRAY("_id"
+ << "time"
+ << "a"
+ << "b")
+ << DocumentSourceInternalUnpackBucket::kTimeFieldName
+ << kUserDefinedTimeName
+ << DocumentSourceInternalUnpackBucket::kMetaFieldName
+ << kUserDefinedMetaName));
+ auto unpack = DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), expCtx);
+ // This source will produce two buckets.
+ auto source = DocumentSourceMock::createForTest(
+ {"{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'0':1, '1':2}, time: {'0':1, '1':2}, "
+ "a:{'0':1, '1':2}, b:{'1':1}}}",
+ "{meta: {'m1': 9, 'm2': 9, 'm3': 9}, data: {_id: {'0':3, '1':4}, time: {'0':3, '1':4}, "
+ "a:{'0':1, '1':2}, b:{'1':1}}}"},
+ expCtx);
+ unpack->setSource(source.get());
+ // The first result exists and is as expected.
+ auto next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.getDocument(),
+ Document(fromjson("{time: 1, myMeta: {m1: 999, m2: 9999}, _id: 1, a: 1}")));
+
+ next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(
+ next.getDocument(),
+ Document(fromjson("{time: 2, myMeta: {m1: 999, m2: 9999}, _id: 2, a: 2, b: 1}")));
+
+ // Second bucket
+ next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(
+ next.getDocument(),
+ Document(fromjson("{time: 3, myMeta: {m1: 9, m2: 9, m3: 9}, _id: 3, a: 1}")));
+
+ next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(
+ next.getDocument(),
+ Document(fromjson("{time: 4, myMeta: {m1: 9, m2: 9, m3: 9}, _id: 4, a: 2, b: 1}")));
+
+ next = unpack->getNext();
+ ASSERT_TRUE(next.isEOF());
+}
+
+TEST_F(InternalUnpackBucketStageTest, UnpackExcludeASingleField) {
+ auto expCtx = getExpCtx();
+ auto spec = BSON(
+ "$_internalUnpackBucket" << BSON(
+ "exclude" << BSON_ARRAY("b") << DocumentSourceInternalUnpackBucket::kTimeFieldName
+ << kUserDefinedTimeName << DocumentSourceInternalUnpackBucket::kMetaFieldName
+ << kUserDefinedMetaName));
+
+ auto unpack = DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), expCtx);
+
+ auto source = DocumentSourceMock::createForTest(
+ {"{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'0':1, '1':2}, time: {'0':1, '1':2}, "
+ "a:{'0':1, '1':2}, b:{'1':1}}}",
+ "{meta: {m1: 9, m2: 9, m3: 9}, data: {_id: {'0':3, '1':4}, time: {'0':3, '1':4}, "
+ "a:{'0':1, '1':2}, b:{'1':1}}}"},
+ expCtx);
+ unpack->setSource(source.get());
+ // The first result exists and is as expected.
+ auto next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.getDocument(),
+ Document(fromjson("{time: 1, myMeta: {m1: 999, m2: 9999}, _id: 1, a: 1}")));
+
+ next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.getDocument(),
+ Document(fromjson("{time: 2, myMeta: {m1: 999, m2: 9999}, _id: 2, a: 2}")));
+
+ // Second bucket
+ next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(
+ next.getDocument(),
+ Document(fromjson("{time: 3, myMeta: {m1: 9, m2: 9, m3: 9}, _id: 3, a: 1}")));
+
+ next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(
+ next.getDocument(),
+ Document(fromjson("{time: 4, myMeta: {m1: 9, m2: 9, m3: 9}, _id: 4, a: 2}")));
+
+ next = unpack->getNext();
+ ASSERT_TRUE(next.isEOF());
+}
+
+TEST_F(InternalUnpackBucketStageTest, UnpackEmptyInclude) {
+ auto expCtx = getExpCtx();
+ auto spec =
+ BSON("$_internalUnpackBucket"
+ << BSON("include" << BSONArray() << DocumentSourceInternalUnpackBucket::kTimeFieldName
+ << kUserDefinedTimeName
+ << DocumentSourceInternalUnpackBucket::kMetaFieldName
+ << kUserDefinedMetaName));
+ auto unpack = DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), expCtx);
+
+ auto source = DocumentSourceMock::createForTest(
+ {"{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'0':1, '1':2}, time: {'0':1, '1':2}, "
+ "a:{'0':1, '1':2}, b:{'1':1}}}",
+ "{meta: {m1: 9, m2: 9, m3: 9}, data: {_id: {'0':3, '1':4}, time: {'0':3, '1':4}, "
+ "a:{'0':1, '1':2}, b:{'1':1}}}"},
+ expCtx);
+ unpack->setSource(source.get());
+
+ // We should produce metadata only, one per measurement in the bucket.
+ for (auto idx = 0; idx < 2; ++idx) {
+ auto next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.getDocument(), Document(fromjson("{myMeta: {m1: 999, m2: 9999}}")));
+ }
+
+ for (auto idx = 0; idx < 2; ++idx) {
+ auto next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.getDocument(),
+ Document(fromjson("{myMeta: {m1: 9, m2: 9, m3: 9}}")));
+ }
+
+ auto next = unpack->getNext();
+ ASSERT_TRUE(next.isEOF());
+}
+
+TEST_F(InternalUnpackBucketStageTest, UnpackEmptyExclude) {
+ auto expCtx = getExpCtx();
+ auto spec =
+ BSON("$_internalUnpackBucket"
+ << BSON("exclude" << BSONArray() << DocumentSourceInternalUnpackBucket::kTimeFieldName
+ << kUserDefinedTimeName
+ << DocumentSourceInternalUnpackBucket::kMetaFieldName
+ << kUserDefinedMetaName));
+ auto unpack = DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), expCtx);
+
+ auto source = DocumentSourceMock::createForTest(
+ {"{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'0':1, '1':2}, time: {'0':1, '1':2}, "
+ "a:{'0':1, '1':2}, b:{'1':1}}}",
+ "{meta: {m1: 9, m2: 9, m3: 9}, data: {_id: {'0':3, '1':4}, time: {'0':3, '1':4}, "
+ "a:{'0':1, '1':2}, b:{'1':1}}}"},
+ expCtx);
+ unpack->setSource(source.get());
+
+ auto next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.getDocument(),
+ Document(fromjson("{time: 1, myMeta: {m1: 999, m2: 9999}, _id: 1, a: 1}")));
+
+ next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(
+ next.getDocument(),
+ Document(fromjson("{time: 2, myMeta: {m1: 999, m2: 9999}, _id: 2, a: 2, b: 1}")));
+
+ // Second bucket
+ next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(
+ next.getDocument(),
+ Document(fromjson("{time: 3, myMeta: {m1: 9, m2: 9, m3: 9}, _id: 3, a: 1}")));
+
+ next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(
+ next.getDocument(),
+ Document(fromjson("{time: 4, myMeta: {m1: 9, m2: 9, m3: 9}, _id: 4, a: 2, b: 1}")));
+
+ next = unpack->getNext();
+ ASSERT_TRUE(next.isEOF());
+}
+
+TEST_F(InternalUnpackBucketStageTest, UnpackBasicIncludeWithDollarPrefix) {
+ auto expCtx = getExpCtx();
+
+ auto spec = BSON("$_internalUnpackBucket"
+ << BSON("include" << BSON_ARRAY("_id"
+ << "time"
+ << "$a"
+ << "b")
+ << DocumentSourceInternalUnpackBucket::kTimeFieldName
+ << kUserDefinedTimeName
+ << DocumentSourceInternalUnpackBucket::kMetaFieldName
+ << kUserDefinedMetaName));
+ auto unpack = DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), expCtx);
+ // This source will produce two buckets.
+ auto source = DocumentSourceMock::createForTest(
+ {"{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'0':1, '1':2}, time: {'0':1, '1':2}, "
+ "$a:{'0':1, '1':2}, b:{'1':1}}}",
+ "{meta: {m1: 9, m2: 9, m3: 9}, data: {_id: {'0':3, '1':4}, time: {'0':3, '1':4}, "
+ "$a:{'0':1, '1':2}, b:{'1':1}}}"},
+ expCtx);
+ unpack->setSource(source.get());
+ // The first result exists and is as expected.
+ auto next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.getDocument(),
+ Document(fromjson("{time: 1, myMeta: {m1: 999, m2: 9999}, _id: 1, $a: 1}")));
+
+ next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(
+ next.getDocument(),
+ Document(fromjson("{time: 2, myMeta: {m1: 999, m2: 9999}, _id: 2, $a: 2, b: 1}")));
+
+ // Second bucket
+ next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(
+ next.getDocument(),
+ Document(fromjson("{time: 3, myMeta: {m1: 9, m2: 9, m3: 9}, _id: 3, $a: 1}")));
+
+ next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(
+ next.getDocument(),
+ Document(fromjson("{time: 4, myMeta: {m1: 9, m2: 9, m3: 9}, _id: 4, $a: 2, b: 1}")));
+
+ next = unpack->getNext();
+ ASSERT_TRUE(next.isEOF());
+}
+
+TEST_F(InternalUnpackBucketStageTest, UnpackMetadataOnly) {
+ auto expCtx = getExpCtx();
+ auto spec =
+ BSON("$_internalUnpackBucket"
+ << BSON("exclude" << BSONArray() << DocumentSourceInternalUnpackBucket::kTimeFieldName
+ << kUserDefinedTimeName
+ << DocumentSourceInternalUnpackBucket::kMetaFieldName
+ << kUserDefinedMetaName));
+ auto unpack = DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), expCtx);
+
+ auto source = DocumentSourceMock::createForTest(
+ {"{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'0':1, '1':2}, time: {'0':1, '1':2}}}",
+ "{meta: {m1: 9, m2: 9, m3: 9}, data: {_id: {'0':3, '1':4}, time: {'0':3, '1':4}}}"},
+ expCtx);
+ unpack->setSource(source.get());
+
+ auto next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.getDocument(),
+ Document(fromjson("{time: 1, myMeta: {m1: 999, m2: 9999}, _id: 1}")));
+
+ next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.getDocument(),
+ Document(fromjson("{time: 2, myMeta: {m1: 999, m2: 9999}, _id: 2}")));
+
+ // Second bucket
+ next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.getDocument(),
+ Document(fromjson("{time: 3, myMeta: {m1: 9, m2: 9, m3: 9}, _id: 3}")));
+
+ next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.getDocument(),
+ Document(fromjson("{time: 4, myMeta: {m1: 9, m2: 9, m3: 9}, _id: 4}")));
+
+ next = unpack->getNext();
+ ASSERT_TRUE(next.isEOF());
+}
+
+TEST_F(InternalUnpackBucketStageTest, UnpackWithStrangeTimestampOrdering) {
+ auto expCtx = getExpCtx();
+ auto spec =
+ BSON("$_internalUnpackBucket"
+ << BSON("exclude" << BSONArray() << DocumentSourceInternalUnpackBucket::kTimeFieldName
+ << kUserDefinedTimeName
+ << DocumentSourceInternalUnpackBucket::kMetaFieldName
+ << kUserDefinedMetaName));
+ auto unpack = DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), expCtx);
+
+ auto source = DocumentSourceMock::createForTest(
+ {"{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'1':1, "
+ "'0':2, '2': 3}, time: {'1':1, '0': 2, '2': 3}}}",
+ "{meta: {'m1': 9, 'm2': 9, 'm3': 9}, data: {_id: {'1':4, "
+ "'0':5, '2':6}, time: {'1':4, '0': 5, '2': 6}}}"},
+ expCtx);
+ unpack->setSource(source.get());
+
+ auto next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.getDocument(),
+ Document(fromjson("{time: 1, myMeta: {m1: 999, m2: 9999}, _id: 1}")));
+
+ next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.getDocument(),
+ Document(fromjson("{time: 2, myMeta: {m1: 999, m2: 9999}, _id: 2}")));
+
+ next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.getDocument(),
+ Document(fromjson("{time: 3, myMeta: {m1: 999, m2: 9999}, _id: 3}")));
+
+ // Second bucket
+ next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.getDocument(),
+ Document(fromjson("{time: 4, myMeta: {m1: 9, m2: 9, m3: 9}, _id: 4}")));
+
+ next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.getDocument(),
+ Document(fromjson("{time: 5, myMeta: {m1: 9, m2: 9, m3: 9}, _id: 5}")));
+
+ next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.getDocument(),
+ Document(fromjson("{time: 6, myMeta: {m1: 9, m2: 9, m3: 9}, _id: 6}")));
+
+ next = unpack->getNext();
+ ASSERT_TRUE(next.isEOF());
+}
+
+TEST_F(InternalUnpackBucketStageTest,
+ BucketUnpackerHandlesMissingMetadataWhenMetaFieldUnspecified) {
+ auto expCtx = getExpCtx();
+ auto spec =
+ BSON("$_internalUnpackBucket"
+ << BSON("exclude" << BSONArray() << DocumentSourceInternalUnpackBucket::kTimeFieldName
+ << kUserDefinedTimeName));
+ auto unpack = DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), expCtx);
+
+ auto source =
+ DocumentSourceMock::createForTest({"{data: {_id: {'1':1, "
+ "'0':2, '2': 3}, time: {'1':1, '0': 2, '2': 3}}}",
+ "{data: {_id: {'1':4, "
+ "'0':5, '2':6}, time: {'1':4, '0': 5, '2': 6}}}"},
+ expCtx);
+ unpack->setSource(source.get());
+
+ auto next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.getDocument(), Document(fromjson("{time: 1, _id: 1}")));
+
+ next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.getDocument(), Document(fromjson("{time: 2, _id: 2}")));
+
+ next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.getDocument(), Document(fromjson("{time: 3, _id: 3}")));
+
+ // Second bucket
+ next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.getDocument(), Document(fromjson("{time: 4, _id: 4}")));
+
+ next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.getDocument(), Document(fromjson("{time: 5, _id: 5}")));
+
+ next = unpack->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(next.getDocument(), Document(fromjson("{time: 6, _id: 6}")));
+
+ next = unpack->getNext();
+ ASSERT_TRUE(next.isEOF());
+}
+
+TEST_F(InternalUnpackBucketStageTest, BucketUnpackerThrowsOnUndefinedMetadata) {
+ auto expCtx = getExpCtx();
+ auto spec =
+ BSON("$_internalUnpackBucket"
+ << BSON("exclude" << BSONArray() << DocumentSourceInternalUnpackBucket::kTimeFieldName
+ << kUserDefinedTimeName
+ << DocumentSourceInternalUnpackBucket::kMetaFieldName
+ << kUserDefinedMetaName));
+ auto unpack = DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), expCtx);
+
+ auto source =
+ DocumentSourceMock::createForTest({"{meta: undefined, data: {_id: {'1':1, "
+ "'0':2, '2': 3}, time: {'1':1, '0': 2, '2': 3}}}"},
+ expCtx);
+ unpack->setSource(source.get());
+ ASSERT_THROWS_CODE(unpack->getNext(), AssertionException, 5346511);
+}
+
+TEST_F(InternalUnpackBucketStageTest, BucketUnpackerThrowsOnMissingMetadata) {
+    const auto ctx = getExpCtx();
+    // Exclude-nothing unpack spec using the user-defined time and meta field names.
+    const auto specObj =
+        BSON("$_internalUnpackBucket"
+             << BSON("exclude" << BSONArray() << DocumentSourceInternalUnpackBucket::kTimeFieldName
+                               << kUserDefinedTimeName
+                               << DocumentSourceInternalUnpackBucket::kMetaFieldName
+                               << kUserDefinedMetaName));
+    auto stage = DocumentSourceInternalUnpackBucket::createFromBson(specObj.firstElement(), ctx);
+
+    // A bucket with no 'meta' field at all fails with the same code as an undefined one.
+    auto mockSource = DocumentSourceMock::createForTest(
+        {"{data: {_id: {'1':1, '0':2, '2': 3}, time: {'1':1, '0': 2, '2': 3}}}"}, ctx);
+    stage->setSource(mockSource.get());
+    ASSERT_THROWS_CODE(stage->getNext(), AssertionException, 5346511);
+}
+
+
+TEST_F(InternalUnpackBucketStageTest, BucketUnpackerHandlesNullMetadata) {
+    const auto ctx = getExpCtx();
+    // Exclude-nothing unpack spec using the user-defined time and meta field names.
+    const auto specObj =
+        BSON("$_internalUnpackBucket"
+             << BSON("exclude" << BSONArray() << DocumentSourceInternalUnpackBucket::kTimeFieldName
+                               << kUserDefinedTimeName
+                               << DocumentSourceInternalUnpackBucket::kMetaFieldName
+                               << kUserDefinedMetaName));
+    auto stage = DocumentSourceInternalUnpackBucket::createFromBson(specObj.firstElement(), ctx);
+
+    // Two buckets: the first with object metadata, the second with an explicit null.
+    auto mockSource = DocumentSourceMock::createForTest(
+        {"{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'1':1, '0':2, '2': 3}, time: {'1':1, '0': "
+         "2, '2': 3}}}",
+         "{meta: null, data: {_id: {'1':4, '0':5, '2':6}, time: {'1':4, '0': 5, '2': 6}}}"},
+        ctx);
+    stage->setSource(mockSource.get());
+
+    // Pull one measurement and check it matches the expected JSON document.
+    auto expectNext = [&](const char* expectedJson) {
+        auto result = stage->getNext();
+        ASSERT_TRUE(result.isAdvanced());
+        ASSERT_DOCUMENT_EQ(result.getDocument(), Document(fromjson(expectedJson)));
+    };
+
+    // First bucket: the metadata object is materialized under the user-defined meta name.
+    expectNext("{time: 1, myMeta: {m1: 999, m2: 9999}, _id: 1}");
+    expectNext("{time: 2, myMeta: {m1: 999, m2: 9999}, _id: 2}");
+    expectNext("{time: 3, myMeta: {m1: 999, m2: 9999}, _id: 3}");
+
+    // Second bucket: null metadata produces documents with no meta field at all.
+    expectNext("{time: 4, _id: 4}");
+    expectNext("{time: 5, _id: 5}");
+    expectNext("{time: 6, _id: 6}");
+
+    ASSERT_TRUE(stage->getNext().isEOF());
+}
+
+TEST_F(InternalUnpackBucketStageTest, ThrowsOnEmptyDataValue) {
+    const auto ctx = getExpCtx();
+    // Exclude-nothing unpack spec using the user-defined time and meta field names.
+    const auto specObj =
+        BSON("$_internalUnpackBucket"
+             << BSON("exclude" << BSONArray() << DocumentSourceInternalUnpackBucket::kTimeFieldName
+                               << kUserDefinedTimeName
+                               << DocumentSourceInternalUnpackBucket::kMetaFieldName
+                               << kUserDefinedMetaName));
+    auto stage = DocumentSourceInternalUnpackBucket::createFromBson(specObj.firstElement(), ctx);
+
+    // A bucket whose 'data' field is an empty object cannot be unpacked.
+    auto mockSource = DocumentSourceMock::createForTest(
+        Document{{"_id", 1}, {"meta", Document{{"m1", 999}, {"m2", 9999}}}, {"data", Document{}}},
+        ctx);
+    stage->setSource(mockSource.get());
+    ASSERT_THROWS_CODE(stage->getNext(), AssertionException, 5346509);
+}
+
+TEST_F(InternalUnpackBucketStageTest, HandlesEmptyBucket) {
+    const auto ctx = getExpCtx();
+    // Exclude-nothing unpack spec using the user-defined time and meta field names.
+    const auto specObj =
+        BSON("$_internalUnpackBucket"
+             << BSON("exclude" << BSONArray() << DocumentSourceInternalUnpackBucket::kTimeFieldName
+                               << kUserDefinedTimeName
+                               << DocumentSourceInternalUnpackBucket::kMetaFieldName
+                               << kUserDefinedMetaName));
+    auto stage = DocumentSourceInternalUnpackBucket::createFromBson(specObj.firstElement(), ctx);
+
+    // An entirely empty bucket document is rejected with error 5346510.
+    // NOTE(review): the name says "Handles" but the expectation is a throw — consider
+    // renaming to ThrowsOnEmptyBucket for clarity.
+    auto mockSource = DocumentSourceMock::createForTest(Document{}, ctx);
+    stage->setSource(mockSource.get());
+    ASSERT_THROWS_CODE(stage->getNext(), AssertionException, 5346510);
+}
+
+TEST_F(InternalUnpackBucketStageTest, ParserRejectsNonObjArgment) {
+    // NOTE(review): "Argment" is a typo for "Argument" in the test name; left as-is since
+    // the registered name is the block's public identifier.
+    // The stage argument must be an object; a scalar fails with 5346500.
+    const auto spec = fromjson("{$_internalUnpackBucket: 1}");
+    ASSERT_THROWS_CODE(
+        DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), getExpCtx()),
+        AssertionException,
+        5346500);
+}
+
+TEST_F(InternalUnpackBucketStageTest, ParserRejectsNonArrayInclude) {
+    // 'include' must be an array of field names; a string is rejected with 5346501.
+    const auto spec = fromjson(
+        "{$_internalUnpackBucket: {include: 'not array', timeField: 'foo', metaField: 'bar'}}");
+    ASSERT_THROWS_CODE(
+        DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), getExpCtx()),
+        AssertionException,
+        5346501);
+}
+
+TEST_F(InternalUnpackBucketStageTest, ParserRejectsNonArrayExclude) {
+    // 'exclude' must be an array of field names; a string is rejected with 5346501.
+    const auto spec = fromjson(
+        "{$_internalUnpackBucket: {exclude: 'not array', timeField: 'foo', metaField: 'bar'}}");
+    ASSERT_THROWS_CODE(
+        DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), getExpCtx()),
+        AssertionException,
+        5346501);
+}
+
+TEST_F(InternalUnpackBucketStageTest, ParserRejectsNonStringInclude) {
+    // Every element of 'include' must be a string; numeric elements fail with 5346502.
+    const auto spec = fromjson(
+        "{$_internalUnpackBucket: {include: [999, 1212], timeField: 'foo', metaField: 'bar'}}");
+    ASSERT_THROWS_CODE(
+        DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), getExpCtx()),
+        AssertionException,
+        5346502);
+}
+
+TEST_F(InternalUnpackBucketStageTest, ParserRejectsDottedPaths) {
+    // Include/exclude entries must be top-level field names; dotted paths fail with 5346503.
+    const auto spec = fromjson(
+        "{$_internalUnpackBucket: {exclude: ['a.b'], timeField: 'foo', metaField: 'bar'}}");
+    ASSERT_THROWS_CODE(
+        DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), getExpCtx()),
+        AssertionException,
+        5346503);
+}
+
+TEST_F(InternalUnpackBucketStageTest, ParserRejectsBadIncludeExcludeFieldName) {
+    // An unrecognized top-level field in the spec is rejected with 5346506.
+    const auto spec =
+        fromjson("{$_internalUnpackBucket: {TYPO: [], timeField: 'foo', metaField: 'bar'}}");
+    ASSERT_THROWS_CODE(
+        DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), getExpCtx()),
+        AssertionException,
+        5346506);
+}
+
+TEST_F(InternalUnpackBucketStageTest, ParserRejectsNonStringTimeField) {
+    // 'timeField' must be a string; a numeric value fails with 5346504.
+    const auto spec =
+        fromjson("{$_internalUnpackBucket: {include: [], timeField: 999, metaField: 'bar'}}");
+    ASSERT_THROWS_CODE(
+        DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), getExpCtx()),
+        AssertionException,
+        5346504);
+}
+
+TEST_F(InternalUnpackBucketStageTest, ParserRejectsNonStringMetaField) {
+    // 'metaField' must be a string; a numeric value fails with 5346505.
+    const auto spec =
+        fromjson("{$_internalUnpackBucket: {include: [], timeField: 'foo', metaField: 999}}");
+    ASSERT_THROWS_CODE(
+        DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), getExpCtx()),
+        AssertionException,
+        5346505);
+}
+
+TEST_F(InternalUnpackBucketStageTest, ParserRejectsAdditionalFields) {
+    // Any field beyond include/exclude/timeField/metaField is rejected with 5346506.
+    const auto spec = fromjson(
+        "{$_internalUnpackBucket: {include: [], timeField: 'foo', metaField: 'bar', extra: 1}}");
+    ASSERT_THROWS_CODE(
+        DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), getExpCtx()),
+        AssertionException,
+        5346506);
+}
+
+TEST_F(InternalUnpackBucketStageTest, ParserRejectsMissingIncludeField) {
+    // A spec with neither 'include' nor 'exclude' must fail to parse (any assertion code).
+    const auto spec = fromjson("{$_internalUnpackBucket: {timeField: 'foo', metaField: 'bar'}}");
+    ASSERT_THROWS(
+        DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), getExpCtx()),
+        AssertionException);
+}
+
+TEST_F(InternalUnpackBucketStageTest, ParserRejectsMissingTimeField) {
+    // A spec without a 'timeField' must fail to parse (any assertion code).
+    const auto spec = fromjson("{$_internalUnpackBucket: {include: [], metaField: 'bar'}}");
+    ASSERT_THROWS(
+        DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), getExpCtx()),
+        AssertionException);
+}
+} // namespace
+} // namespace mongo