Diffstat (limited to 'src/mongo/db')
19 files changed, 1639 insertions, 807 deletions
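For orientation: this patch factors the BucketUnpacker out of document_source_internal_unpack_bucket.cpp into its own exec library (src/mongo/db/exec/bucket_unpacker.{h,cpp}) and adds two PlanStages that drive it, UNPACK_BUCKET and SAMPLE_FROM_TIMESERIES_BUCKET. The sketch below shows how the unpacker API introduced in this diff is meant to be driven; the helper function, field names, and bucket contents are illustrative only and are not part of the patch.

#include <vector>

#include "mongo/db/exec/bucket_unpacker.h"

namespace mongo {
// Minimal usage sketch, assuming 'bucketObj' is one bucket document from a time-series
// collection created with {timeField: "time", metaField: "myMeta"}. Only the BucketSpec and
// BucketUnpacker calls come from this patch; everything else here is hypothetical.
std::vector<Document> unpackWholeBucket(const BSONObj& bucketObj) {
    // An empty fieldSet combined with Behavior::kExclude excludes nothing, so the time field,
    // the metadata, and every data column are materialized (see the
    // EmptyExcludeMaterializesAllFields unit test below).
    auto spec = BucketSpec{"time", std::string{"myMeta"}, /*fieldSet*/ {}};
    BucketUnpacker unpacker{std::move(spec), BucketUnpacker::Behavior::kExclude};

    // reset() hands the unpacker an owned bucket document; getNext() requires ownership
    // because unpacking one bucket spans many calls.
    unpacker.reset(bucketObj.getOwned());

    std::vector<Document> measurements;
    while (unpacker.hasNext()) {
        measurements.push_back(unpacker.getNext());
    }
    return measurements;
}
}  // namespace mongo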
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index b6029533ebf..2edde6d77ac 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1187,6 +1187,7 @@ env.Library( 'exec/requires_collection_stage.cpp', 'exec/requires_index_stage.cpp', 'exec/return_key.cpp', + 'exec/sample_from_timeseries_bucket.cpp', 'exec/shard_filter.cpp', 'exec/shard_filterer_impl.cpp', 'exec/skip.cpp', @@ -1198,6 +1199,7 @@ env.Library( 'exec/trial_period_utils.cpp', 'exec/trial_stage.cpp', 'exec/update_stage.cpp', + 'exec/unpack_timeseries_bucket.cpp', 'exec/upsert_stage.cpp', 'exec/working_set_common.cpp', 'exec/write_stage_common.cpp', diff --git a/src/mongo/db/exec/SConscript b/src/mongo/db/exec/SConscript index 981512045ea..0a238aae69c 100644 --- a/src/mongo/db/exec/SConscript +++ b/src/mongo/db/exec/SConscript @@ -51,6 +51,16 @@ env.Library( ], ) +env.Library( + target = "bucket_unpacker", + source = [ + "bucket_unpacker.cpp", + ], + LIBDEPS = [ + "document_value/document_value", + ], +) + sortExecutorEnv = env.Clone() sortExecutorEnv.InjectThirdParty(libraries=['snappy']) sortExecutorEnv.Library( @@ -133,6 +143,7 @@ env.CppUnitTest( "queued_data_stage_test.cpp", "sort_test.cpp", "working_set_test.cpp", + "bucket_unpacker_test.cpp", ], LIBDEPS=[ "$BUILD_DIR/mongo/base", diff --git a/src/mongo/db/exec/bucket_unpacker.cpp b/src/mongo/db/exec/bucket_unpacker.cpp new file mode 100644 index 00000000000..37d4ec4a65c --- /dev/null +++ b/src/mongo/db/exec/bucket_unpacker.cpp @@ -0,0 +1,240 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/exec/bucket_unpacker.h" +#include "mongo/db/timeseries/timeseries_field_names.h" + +namespace mongo { + +/** + * Erase computed meta projection fields if they are present in the exclusion field set. 
+ */ +void eraseExcludedComputedMetaProjFields(BucketUnpacker::Behavior unpackerBehavior, + BucketSpec* bucketSpec) { + if (unpackerBehavior == BucketUnpacker::Behavior::kExclude && + bucketSpec->computedMetaProjFields.size() > 0) { + for (auto it = bucketSpec->computedMetaProjFields.begin(); + it != bucketSpec->computedMetaProjFields.end();) { + if (bucketSpec->fieldSet.find(*it) != bucketSpec->fieldSet.end()) { + it = bucketSpec->computedMetaProjFields.erase(it); + } else { + it++; + } + } + } +} + +BucketUnpacker::BucketUnpacker(BucketSpec spec, Behavior unpackerBehavior) { + setBucketSpecAndBehavior(std::move(spec), unpackerBehavior); +} + +// Calculates the number of measurements in a bucket given the 'targetTimestampObjSize' using the +// 'BucketUnpacker::kTimestampObjSizeTable' table. If the 'targetTimestampObjSize' hits a record in +// the table, this helper returns the measurement count corresponding to the table record. +// Otherwise, the 'targetTimestampObjSize' is used to probe the table for the smallest {b_i, S_i} +// pair such that 'targetTimestampObjSize' < S_i. Once the interval is found, the upper bound of the +// pair for the interval is computed and then linear interpolation is used to compute the +// measurement count corresponding to the 'targetTimestampObjSize' provided. +int BucketUnpacker::computeMeasurementCount(int targetTimestampObjSize) { + auto currentInterval = + std::find_if(std::begin(BucketUnpacker::kTimestampObjSizeTable), + std::end(BucketUnpacker::kTimestampObjSizeTable), + [&](const auto& entry) { return targetTimestampObjSize <= entry.second; }); + + if (currentInterval->second == targetTimestampObjSize) { + return currentInterval->first; + } + // This points to the first interval larger than the target 'targetTimestampObjSize', the actual + // interval that will cover the object size is the interval before the current one. + tassert(5422104, + "currentInterval should not point to the first table entry", + currentInterval > BucketUnpacker::kTimestampObjSizeTable.begin()); + --currentInterval; + + auto nDigitsInRowKey = 1 + (currentInterval - BucketUnpacker::kTimestampObjSizeTable.begin()); + + return currentInterval->first + + ((targetTimestampObjSize - currentInterval->second) / (10 + nDigitsInRowKey)); +} + +void BucketUnpacker::reset(BSONObj&& bucket) { + _fieldIters.clear(); + _timeFieldIter = boost::none; + + _bucket = std::move(bucket); + uassert(5346510, "An empty bucket cannot be unpacked", !_bucket.isEmpty()); + + auto&& dataRegion = _bucket.getField(timeseries::kBucketDataFieldName).Obj(); + if (dataRegion.isEmpty()) { + // If the data field of a bucket is present but it holds an empty object, there's nothing to + // unpack. + return; + } + + auto&& timeFieldElem = dataRegion.getField(_spec.timeField); + uassert(5346700, + "The $_internalUnpackBucket stage requires the data region to have a timeField object", + timeFieldElem); + + _timeFieldIter = BSONObjIterator{timeFieldElem.Obj()}; + + _metaValue = _bucket[timeseries::kBucketMetaFieldName]; + if (_spec.metaField) { + // The spec indicates that there might be a metadata region. Missing metadata in + // measurements is expressed with missing metadata in a bucket. But we disallow undefined + // since the undefined BSON type is deprecated. 
+ uassert(5369600, + "The $_internalUnpackBucket stage allows metadata to be absent or otherwise, it " + "must not be the deprecated undefined bson type", + !_metaValue || _metaValue.type() != BSONType::Undefined); + } else { + // If the spec indicates that the time series collection has no metadata field, then we + // should not find a metadata region in the underlying bucket documents. + uassert(5369601, + "The $_internalUnpackBucket stage expects buckets to have missing metadata regions " + "if the metaField parameter is not provided", + !_metaValue); + } + + // Walk the data region of the bucket, and decide if an iterator should be set up based on the + // include or exclude case. + for (auto&& elem : dataRegion) { + auto& colName = elem.fieldNameStringData(); + if (colName == _spec.timeField) { + // Skip adding a FieldIterator for the timeField since the timestamp value from + // _timeFieldIter can be placed accordingly in the materialized measurement. + continue; + } + + // Includes a field when '_unpackerBehavior' is 'kInclude' and it's found in 'fieldSet' or + // _unpackerBehavior is 'kExclude' and it's not found in 'fieldSet'. + if (determineIncludeField(colName, _unpackerBehavior, _spec)) { + _fieldIters.emplace_back(colName.toString(), BSONObjIterator{elem.Obj()}); + } + } + + // Update computed meta projections with values from this bucket. + if (!_spec.computedMetaProjFields.empty()) { + for (auto&& name : _spec.computedMetaProjFields) { + _computedMetaProjections[name] = _bucket[name]; + } + } + + // Save the measurement count for the bucket. + _numberOfMeasurements = computeMeasurementCount(timeFieldElem.objsize()); +} + +void BucketUnpacker::setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior behavior) { + _includeMetaField = eraseMetaFromFieldSetAndDetermineIncludeMeta(behavior, &bucketSpec); + _includeTimeField = determineIncludeTimeField(behavior, &bucketSpec); + _unpackerBehavior = behavior; + eraseExcludedComputedMetaProjFields(behavior, &bucketSpec); + _spec = std::move(bucketSpec); +} + +const std::set<StringData> BucketUnpacker::reservedBucketFieldNames = { + timeseries::kBucketIdFieldName, + timeseries::kBucketDataFieldName, + timeseries::kBucketMetaFieldName, + timeseries::kBucketControlFieldName}; + +void BucketUnpacker::addComputedMetaProjFields(const std::vector<StringData>& computedFieldNames) { + for (auto&& field : computedFieldNames) { + _spec.computedMetaProjFields.emplace_back(field.toString()); + } +} + +Document BucketUnpacker::getNext() { + tassert(5521503, "'getNext()' requires the bucket to be owned", _bucket.isOwned()); + tassert(5422100, "'getNext()' was called after the bucket has been exhausted", hasNext()); + + auto measurement = MutableDocument{}; + auto&& timeElem = _timeFieldIter->next(); + if (_includeTimeField) { + measurement.addField(_spec.timeField, Value{timeElem}); + } + + // Includes metaField when we're instructed to do so and metaField value exists. + if (_includeMetaField && _metaValue) { + measurement.addField(*_spec.metaField, Value{_metaValue}); + } + + auto& currentIdx = timeElem.fieldNameStringData(); + for (auto&& [colName, colIter] : _fieldIters) { + if (auto&& elem = *colIter; colIter.more() && elem.fieldNameStringData() == currentIdx) { + measurement.addField(colName, Value{elem}); + colIter.advance(elem); + } + } + + // Add computed meta projections. 
+ for (auto&& name : _spec.computedMetaProjFields) { + measurement.addField(name, Value{_computedMetaProjections[name]}); + } + + return measurement.freeze(); +} + +Document BucketUnpacker::extractSingleMeasurement(int j) { + tassert(5422101, + "'extractSingleMeasurment' expects j to be greater than or equal to zero and less than " + "or equal to the number of measurements in a bucket", + j >= 0 && j < _numberOfMeasurements); + + auto measurement = MutableDocument{}; + + auto rowKey = std::to_string(j); + auto targetIdx = StringData{rowKey}; + auto&& dataRegion = _bucket.getField(timeseries::kBucketDataFieldName).Obj(); + + if (_includeMetaField && !_metaValue.isNull()) { + measurement.addField(*_spec.metaField, Value{_metaValue}); + } + + for (auto&& dataElem : dataRegion) { + auto colName = dataElem.fieldNameStringData(); + if (!determineIncludeField(colName, _unpackerBehavior, _spec)) { + continue; + } + auto value = dataElem[targetIdx]; + if (value) { + measurement.addField(dataElem.fieldNameStringData(), Value{value}); + } + } + + // Add computed meta projections. + for (auto&& name : _spec.computedMetaProjFields) { + measurement.addField(name, Value{_computedMetaProjections[name]}); + } + + return measurement.freeze(); +} +} // namespace mongo diff --git a/src/mongo/db/exec/bucket_unpacker.h b/src/mongo/db/exec/bucket_unpacker.h new file mode 100644 index 00000000000..9c505bc1682 --- /dev/null +++ b/src/mongo/db/exec/bucket_unpacker.h @@ -0,0 +1,214 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <set> + +#include "mongo/bson/bsonobj.h" +#include "mongo/db/exec/document_value/document.h" + +namespace mongo { +/** + * Carries parameters for unpacking a bucket. + */ +struct BucketSpec { + // The user-supplied timestamp field name specified during time-series collection creation. + std::string timeField; + + // An optional user-supplied metadata field name specified during time-series collection + // creation. This field name is used during materialization of metadata fields of a measurement + // after unpacking. 
+ boost::optional<std::string> metaField; + + // The set of field names in the data region that should be included or excluded. + std::set<std::string> fieldSet; + + // Vector of computed meta field projection names. Added at the end of materialized + // measurements. + std::vector<std::string> computedMetaProjFields; +}; + +/** + * BucketUnpacker will unpack bucket fields for metadata and the provided fields. + */ +class BucketUnpacker { +public: + // A table that is useful for interpolations between the number of measurements in a bucket and + // the byte size of a bucket's data section timestamp column. Each table entry is a pair (b_i, + // S_i), where b_i is the number of measurements in the bucket and S_i is the byte size of the + // timestamp BSONObj. The table is bounded by 16 MB (2 << 23 bytes) where the table entries are + // pairs of b_i and S_i for the lower bounds of the row key digit intervals [0, 9], [10, 99], + // [100, 999], [1000, 9999] and so on. The last entry in the table, S7, is the first entry to + // exceed the server BSON object limit of 16 MB. + static constexpr std::array<std::pair<int32_t, int32_t>, 8> kTimestampObjSizeTable{ + {{0, BSONObj::kMinBSONLength}, + {10, 115}, + {100, 1195}, + {1000, 12895}, + {10000, 138895}, + {100000, 1488895}, + {1000000, 15888895}, + {10000000, 168888895}}}; + + /** + * Given the size of a BSONObj timestamp column, formatted as it would be in a time-series + * system.buckets.X collection, returns the number of measurements in the bucket in O(1) time. + */ + static int computeMeasurementCount(int targetTimestampObjSize); + + // Set of field names reserved for time-series buckets. + static const std::set<StringData> reservedBucketFieldNames; + + // When BucketUnpacker is created with kInclude it must produce measurements that contain the + // set of fields. Otherwise, if the kExclude option is used, the measurements will include the + // set difference between all fields in the bucket and the provided fields. + enum class Behavior { kInclude, kExclude }; + + BucketUnpacker(BucketSpec spec, Behavior unpackerBehavior); + + /** + * This method will continue to materialize Documents until the bucket is exhausted. A + * precondition of this method is that 'hasNext()' must be true. + */ + Document getNext(); + + /** + * This method will extract the j-th measurement from the bucket. A precondition of this method + * is that j >= 0 && j <= the number of measurements within the underlying bucket. + */ + Document extractSingleMeasurement(int j); + + bool hasNext() const { + return _timeFieldIter && _timeFieldIter->more(); + } + + /** + * This resets the unpacker to prepare to unpack a new bucket described by the given document. + */ + void reset(BSONObj&& bucket); + + Behavior behavior() const { + return _unpackerBehavior; + } + + const BucketSpec& bucketSpec() const { + return _spec; + } + + const BSONObj& bucket() const { + return _bucket; + } + + bool includeMetaField() const { + return _includeMetaField; + } + + bool includeTimeField() const { + return _includeTimeField; + } + + int32_t numberOfMeasurements() const { + return _numberOfMeasurements; + } + + void setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior behavior); + + // Add computed meta projection names to the bucket specification. + void addComputedMetaProjFields(const std::vector<StringData>& computedFieldNames); + +private: + BucketSpec _spec; + Behavior _unpackerBehavior; + + // Iterates the timestamp section of the bucket to drive the unpacking iteration. 
+ boost::optional<BSONObjIterator> _timeFieldIter; + + // A flag used to mark that the timestamp value should be materialized in measurements. + bool _includeTimeField; + + // A flag used to mark that a bucket's metadata value should be materialized in measurements. + bool _includeMetaField; + + // The bucket being unpacked. + BSONObj _bucket; + + // Since the metadata value is the same across all materialized measurements we can cache the + // metadata BSONElement in the reset phase and use it to materialize the metadata in each + // measurement. + BSONElement _metaValue; + + // Iterators used to unpack the columns of the above bucket that are populated during the reset + // phase according to the provided 'Behavior' and 'BucketSpec'. + std::vector<std::pair<std::string, BSONObjIterator>> _fieldIters; + + // Map <name, BSONElement> for the computed meta field projections. Updated for + // every bucket upon reset(). + stdx::unordered_map<std::string, BSONElement> _computedMetaProjections; + + // The number of measurements in the bucket. + int32_t _numberOfMeasurements = 0; +}; + +/** + * Removes metaField from the field set and returns a boolean indicating whether metaField should be + * included in the materialized measurements. Always returns false if metaField does not exist. + */ +inline bool eraseMetaFromFieldSetAndDetermineIncludeMeta(BucketUnpacker::Behavior unpackerBehavior, + BucketSpec* bucketSpec) { + if (!bucketSpec->metaField) { + return false; + } else if (auto itr = bucketSpec->fieldSet.find(*bucketSpec->metaField); + itr != bucketSpec->fieldSet.end()) { + bucketSpec->fieldSet.erase(itr); + return unpackerBehavior == BucketUnpacker::Behavior::kInclude; + } else { + return unpackerBehavior == BucketUnpacker::Behavior::kExclude; + } +} + +/** + * Determines if timestamp values should be included in the materialized measurements. + */ +inline bool determineIncludeTimeField(BucketUnpacker::Behavior unpackerBehavior, + BucketSpec* bucketSpec) { + return (unpackerBehavior == BucketUnpacker::Behavior::kInclude) == + (bucketSpec->fieldSet.find(bucketSpec->timeField) != bucketSpec->fieldSet.end()); +} + +/** + * Determines if an arbitrary field should be included in the materialized measurements. + */ +inline bool determineIncludeField(StringData fieldName, + BucketUnpacker::Behavior unpackerBehavior, + const BucketSpec& bucketSpec) { + return (unpackerBehavior == BucketUnpacker::Behavior::kInclude) == + (bucketSpec.fieldSet.find(fieldName.toString()) != bucketSpec.fieldSet.end()); +} +} // namespace mongo diff --git a/src/mongo/db/exec/bucket_unpacker_test.cpp b/src/mongo/db/exec/bucket_unpacker_test.cpp new file mode 100644 index 00000000000..311f626854e --- /dev/null +++ b/src/mongo/db/exec/bucket_unpacker_test.cpp @@ -0,0 +1,596 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/bson/json.h" +#include "mongo/db/exec/bucket_unpacker.h" +#include "mongo/db/exec/document_value/document_value_test_util.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +constexpr auto kUserDefinedTimeName = "time"_sd; +constexpr auto kUserDefinedMetaName = "myMeta"_sd; + +/** + * A fixture to test the BucketUnpacker + */ +class BucketUnpackerTest : public mongo::unittest::Test { +public: + /** + * Makes a fresh BucketUnpacker, resets it to unpack the given 'bucket', and then returns it + * before actually doing any unpacking. + */ + BucketUnpacker makeBucketUnpacker(std::set<std::string> fields, + BucketUnpacker::Behavior behavior, + BSONObj bucket, + boost::optional<std::string> metaFieldName = boost::none) { + auto spec = BucketSpec{kUserDefinedTimeName.toString(), metaFieldName, std::move(fields)}; + + BucketUnpacker unpacker{std::move(spec), behavior}; + unpacker.reset(std::move(bucket)); + return unpacker; + } + + /** + * Constructs a 'BucketUnpacker' based on the provided parameters and then resets it to unpack + * the given 'bucket'. Asserts that 'reset()' throws the given 'errorCode'. 
+ */ + void assertUnpackerThrowsCode(std::set<std::string> fields, + BucketUnpacker::Behavior behavior, + BSONObj bucket, + boost::optional<std::string> metaFieldName, + int errorCode) { + auto spec = BucketSpec{kUserDefinedTimeName.toString(), metaFieldName, std::move(fields)}; + BucketUnpacker unpacker{std::move(spec), behavior}; + ASSERT_THROWS_CODE(unpacker.reset(std::move(bucket)), AssertionException, errorCode); + } + + void assertGetNext(BucketUnpacker& unpacker, const Document& expected) { + ASSERT_DOCUMENT_EQ(unpacker.getNext(), expected); + } +}; + +TEST_F(BucketUnpackerTest, UnpackBasicIncludeAllMeasurementFields) { + std::set<std::string> fields{ + "_id", kUserDefinedMetaName.toString(), kUserDefinedTimeName.toString(), "a", "b"}; + + auto bucket = fromjson( + "{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'0':1, '1':2}, time: {'0':1, '1':2}, " + "a:{'0':1, '1':2}, b:{'1':1}}}"); + + auto unpacker = makeBucketUnpacker(std::move(fields), + BucketUnpacker::Behavior::kInclude, + std::move(bucket), + kUserDefinedMetaName.toString()); + + ASSERT_TRUE(unpacker.hasNext()); + assertGetNext(unpacker, + Document{fromjson("{time: 1, myMeta: {m1: 999, m2: 9999}, _id: 1, a: 1}")}); + + ASSERT_TRUE(unpacker.hasNext()); + assertGetNext(unpacker, + Document{fromjson("{time: 2, myMeta: {m1: 999, m2: 9999}, _id: 2, a :2, b: 1}")}); + ASSERT_FALSE(unpacker.hasNext()); +} + +TEST_F(BucketUnpackerTest, ExcludeASingleField) { + std::set<std::string> fields{"b"}; + + auto bucket = fromjson( + "{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'0':1, '1':2}, time: {'0':1, '1':2}, " + "a:{'0':1, '1':2}, b:{'1':1}}}"); + + auto unpacker = makeBucketUnpacker(std::move(fields), + BucketUnpacker::Behavior::kExclude, + std::move(bucket), + kUserDefinedMetaName.toString()); + + ASSERT_TRUE(unpacker.hasNext()); + assertGetNext(unpacker, + Document{fromjson("{time: 1, myMeta: {m1: 999, m2: 9999}, _id: 1, a: 1}")}); + + ASSERT_TRUE(unpacker.hasNext()); + assertGetNext(unpacker, + Document{fromjson("{time: 2, myMeta: {m1: 999, m2: 9999}, _id: 2, a: 2}")}); + ASSERT_FALSE(unpacker.hasNext()); +} + +TEST_F(BucketUnpackerTest, EmptyIncludeGetsEmptyMeasurements) { + std::set<std::string> fields{}; + + auto bucket = fromjson( + "{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'0':1, '1':2}, time: {'0':1, '1':2}, " + "a:{'0':1, '1':2}, b:{'1':1}}}"); + + auto unpacker = makeBucketUnpacker(std::move(fields), + BucketUnpacker::Behavior::kInclude, + std::move(bucket), + kUserDefinedMetaName.toString()); + + // We should produce empty documents, one per measurement in the bucket. 
+ for (auto idx = 0; idx < 2; ++idx) { + ASSERT_TRUE(unpacker.hasNext()); + assertGetNext(unpacker, Document(fromjson("{}"))); + } + ASSERT_FALSE(unpacker.hasNext()); +} + +TEST_F(BucketUnpackerTest, EmptyExcludeMaterializesAllFields) { + std::set<std::string> fields{}; + + auto bucket = fromjson( + "{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'0':1, '1':2}, time: {'0':1, '1':2}, " + "a:{'0':1, '1':2}, b:{'1':1}}}"); + + auto unpacker = makeBucketUnpacker(std::move(fields), + BucketUnpacker::Behavior::kExclude, + std::move(bucket), + kUserDefinedMetaName.toString()); + ASSERT_TRUE(unpacker.hasNext()); + assertGetNext(unpacker, + Document{fromjson("{time: 1, myMeta: {m1: 999, m2: 9999}, _id: 1, a: 1}")}); + + ASSERT_TRUE(unpacker.hasNext()); + assertGetNext(unpacker, + Document{fromjson("{time: 2, myMeta: {m1: 999, m2: 9999}, _id: 2, a :2, b: 1}")}); + ASSERT_FALSE(unpacker.hasNext()); +} + +TEST_F(BucketUnpackerTest, SparseColumnsWhereOneColumnIsExhaustedBeforeTheOther) { + std::set<std::string> fields{}; + + auto bucket = fromjson( + "{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'0':1, '1':2}, time: {'0':1, '1':2}, " + "a:{'0':1}, b:{'1':1}}}"); + + auto unpacker = makeBucketUnpacker(std::move(fields), + BucketUnpacker::Behavior::kExclude, + std::move(bucket), + kUserDefinedMetaName.toString()); + ASSERT_TRUE(unpacker.hasNext()); + assertGetNext(unpacker, + Document{fromjson("{time: 1, myMeta: {m1: 999, m2: 9999}, _id: 1, a: 1}")}); + ASSERT_TRUE(unpacker.hasNext()); + assertGetNext(unpacker, + Document{fromjson("{time: 2, myMeta: {m1: 999, m2: 9999}, _id: 2, b: 1}")}); + ASSERT_FALSE(unpacker.hasNext()); +} + +TEST_F(BucketUnpackerTest, UnpackBasicIncludeWithDollarPrefix) { + std::set<std::string> fields{ + "_id", "$a", "b", kUserDefinedMetaName.toString(), kUserDefinedTimeName.toString()}; + + auto bucket = fromjson( + "{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'0':1, '1':2}, time: {'0':1, '1':2}, " + "$a:{'0':1, '1':2}, b:{'1':1}}}"); + + auto unpacker = makeBucketUnpacker(std::move(fields), + BucketUnpacker::Behavior::kInclude, + std::move(bucket), + kUserDefinedMetaName.toString()); + ASSERT_TRUE(unpacker.hasNext()); + assertGetNext(unpacker, + Document{fromjson("{time: 1, myMeta: {m1: 999, m2: 9999}, _id: 1, $a: 1}")}); + + ASSERT_TRUE(unpacker.hasNext()); + assertGetNext( + unpacker, + Document{fromjson("{time: 2, myMeta: {m1: 999, m2: 9999}, _id: 2, $a: 2, b: 1}")}); + ASSERT_FALSE(unpacker.hasNext()); +} + +TEST_F(BucketUnpackerTest, BucketsWithMetadataOnly) { + std::set<std::string> fields{}; + + auto bucket = fromjson( + "{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'0':1, '1':2}, time: {'0':1, '1':2}}}"); + + auto unpacker = makeBucketUnpacker(std::move(fields), + BucketUnpacker::Behavior::kExclude, + std::move(bucket), + kUserDefinedMetaName.toString()); + ASSERT_TRUE(unpacker.hasNext()); + assertGetNext(unpacker, Document{fromjson("{time: 1, myMeta: {m1: 999, m2: 9999}, _id: 1}")}); + ASSERT_TRUE(unpacker.hasNext()); + assertGetNext(unpacker, Document{fromjson("{time: 2, myMeta: {m1: 999, m2: 9999}, _id: 2}")}); + ASSERT_FALSE(unpacker.hasNext()); +} + +TEST_F(BucketUnpackerTest, UnorderedRowKeysDoesntAffectMaterialization) { + std::set<std::string> fields{}; + + auto bucket = fromjson( + "{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'1':1, '0':2, '2': 3}, time: {'1':1, '0': 2, " + "'2': 3}}}"); + + auto unpacker = makeBucketUnpacker(std::move(fields), + BucketUnpacker::Behavior::kExclude, + std::move(bucket), + kUserDefinedMetaName.toString()); + 
ASSERT_TRUE(unpacker.hasNext()); + assertGetNext(unpacker, Document{fromjson("{time: 1, myMeta: {m1: 999, m2: 9999}, _id: 1}")}); + + ASSERT_TRUE(unpacker.hasNext()); + assertGetNext(unpacker, Document{fromjson("{time: 2, myMeta: {m1: 999, m2: 9999}, _id: 2}")}); + + ASSERT_TRUE(unpacker.hasNext()); + assertGetNext(unpacker, Document{fromjson("{time: 3, myMeta: {m1: 999, m2: 9999}, _id: 3}")}); + ASSERT_FALSE(unpacker.hasNext()); +} + +TEST_F(BucketUnpackerTest, MissingMetaFieldDoesntMaterializeMetadata) { + std::set<std::string> fields{}; + + auto bucket = fromjson("{data: {_id: {'1':1, '0':2, '2': 3}, time: {'1':1, '0': 2, '2': 3}}}"); + + auto unpacker = makeBucketUnpacker(std::move(fields), + BucketUnpacker::Behavior::kExclude, + std::move(bucket), + kUserDefinedMetaName.toString()); + ASSERT_TRUE(unpacker.hasNext()); + assertGetNext(unpacker, Document{fromjson("{time: 1, _id: 1}")}); + + ASSERT_TRUE(unpacker.hasNext()); + assertGetNext(unpacker, Document{fromjson("{time: 2, _id: 2}")}); + + ASSERT_TRUE(unpacker.hasNext()); + assertGetNext(unpacker, Document{fromjson("{time: 3, _id: 3}")}); + ASSERT_FALSE(unpacker.hasNext()); +} + +TEST_F(BucketUnpackerTest, ExcludedMetaFieldDoesntMaterializeMetadataWhenBucketHasMeta) { + std::set<std::string> fields{kUserDefinedMetaName.toString()}; + + auto bucket = fromjson( + "{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'1':1, '0':2, '2': 3}, time: {'1':1, '0': 2, " + "'2': 3}}}"); + + auto unpacker = makeBucketUnpacker(std::move(fields), + BucketUnpacker::Behavior::kExclude, + std::move(bucket), + kUserDefinedMetaName.toString()); + ASSERT_TRUE(unpacker.hasNext()); + assertGetNext(unpacker, Document{fromjson("{time: 1, _id: 1}")}); + + ASSERT_TRUE(unpacker.hasNext()); + assertGetNext(unpacker, Document{fromjson("{time: 2, _id: 2}")}); + + ASSERT_TRUE(unpacker.hasNext()); + assertGetNext(unpacker, Document{fromjson("{time: 3, _id: 3}")}); + ASSERT_FALSE(unpacker.hasNext()); +} + +TEST_F(BucketUnpackerTest, UnpackerResetThrowsOnUndefinedMeta) { + std::set<std::string> fields{}; + + auto bucket = fromjson( + "{meta: undefined, data: {_id: {'1':1, '0':2, '2': 3}, time: {'1':1, '0': 2, '2': 3}}}"); + + assertUnpackerThrowsCode(std::move(fields), + BucketUnpacker::Behavior::kExclude, + std::move(bucket), + kUserDefinedMetaName.toString(), + 5369600); +} + +TEST_F(BucketUnpackerTest, UnpackerResetThrowsOnUnexpectedMeta) { + std::set<std::string> fields{}; + + auto bucket = fromjson( + "{meta: {'m1': 999, 'm2': 9999}, data: {_id: {'1':1, '0':2, '2': 3}, time: {'1':1, '0': 2, " + "'2': 3}}}"); + + assertUnpackerThrowsCode(std::move(fields), + BucketUnpacker::Behavior::kExclude, + std::move(bucket), + boost::none /* no metaField provided */, + 5369601); +} + +TEST_F(BucketUnpackerTest, NullMetaInBucketMaterializesAsNull) { + std::set<std::string> fields{}; + + auto bucket = + fromjson("{meta: null, data: {_id: {'1':4, '0':5, '2':6}, time: {'1':4, '0': 5, '2': 6}}}"); + + auto unpacker = makeBucketUnpacker(std::move(fields), + BucketUnpacker::Behavior::kExclude, + std::move(bucket), + kUserDefinedMetaName.toString()); + ASSERT_TRUE(unpacker.hasNext()); + assertGetNext(unpacker, Document{fromjson("{time: 4, myMeta: null, _id: 4}")}); + + ASSERT_TRUE(unpacker.hasNext()); + assertGetNext(unpacker, Document{fromjson("{time: 5, myMeta: null, _id: 5}")}); + + ASSERT_TRUE(unpacker.hasNext()); + assertGetNext(unpacker, Document{fromjson("{time: 6, myMeta: null, _id: 6}")}); + ASSERT_FALSE(unpacker.hasNext()); +} + +TEST_F(BucketUnpackerTest, 
GetNextHandlesMissingMetaInBucket) { + std::set<std::string> fields{}; + + auto bucket = fromjson(R"( +{ + data: { + _id: {'1':4, '0':5, '2':6}, + time: {'1':4, '0': 5, '2': 6} + } +})"); + + auto unpacker = makeBucketUnpacker(std::move(fields), + BucketUnpacker::Behavior::kExclude, + std::move(bucket), + kUserDefinedMetaName.toString()); + ASSERT_TRUE(unpacker.hasNext()); + assertGetNext(unpacker, Document{fromjson("{time: 4, _id: 4}")}); + + ASSERT_TRUE(unpacker.hasNext()); + assertGetNext(unpacker, Document{fromjson("{time: 5, _id: 5}")}); + + ASSERT_TRUE(unpacker.hasNext()); + assertGetNext(unpacker, Document{fromjson("{time: 6, _id: 6}")}); + ASSERT_FALSE(unpacker.hasNext()); +} + +TEST_F(BucketUnpackerTest, EmptyDataRegionInBucketIsTolerated) { + std::set<std::string> fields{}; + + auto bucket = + Document{{"_id", 1}, {"meta", Document{{"m1", 999}, {"m2", 9999}}}, {"data", Document{}}}; + auto unpacker = makeBucketUnpacker(std::move(fields), + BucketUnpacker::Behavior::kExclude, + bucket.toBson(), + kUserDefinedMetaName.toString()); + ASSERT_FALSE(unpacker.hasNext()); +} + +TEST_F(BucketUnpackerTest, UnpackerResetThrowsOnEmptyBucket) { + std::set<std::string> fields{}; + + auto bucket = Document{}; + assertUnpackerThrowsCode(std::move(fields), + BucketUnpacker::Behavior::kExclude, + bucket.toBson(), + kUserDefinedMetaName.toString(), + 5346510); +} + +TEST_F(BucketUnpackerTest, EraseMetaFromFieldSetAndDetermineIncludeMeta) { + // Tests a missing 'metaField' in the spec. + std::set<std::string> empFields{}; + auto spec = BucketSpec{kUserDefinedTimeName.toString(), boost::none, std::move(empFields)}; + ASSERT_FALSE( + eraseMetaFromFieldSetAndDetermineIncludeMeta(BucketUnpacker::Behavior::kInclude, &spec)); + + // Tests a spec with 'metaField' in include list. + std::set<std::string> fields{kUserDefinedMetaName.toString()}; + auto specWithMetaInclude = BucketSpec{ + kUserDefinedTimeName.toString(), kUserDefinedMetaName.toString(), std::move(fields)}; + ASSERT_TRUE(eraseMetaFromFieldSetAndDetermineIncludeMeta(BucketUnpacker::Behavior::kInclude, + &specWithMetaInclude)); + ASSERT_EQ(specWithMetaInclude.fieldSet.count(kUserDefinedMetaName.toString()), 0); + + std::set<std::string> fieldsNoMeta{"foo"}; + auto specWithFooInclude = BucketSpec{ + kUserDefinedTimeName.toString(), kUserDefinedMetaName.toString(), std::move(fieldsNoMeta)}; + ASSERT_TRUE(eraseMetaFromFieldSetAndDetermineIncludeMeta(BucketUnpacker::Behavior::kExclude, + &specWithFooInclude)); + ASSERT_FALSE(eraseMetaFromFieldSetAndDetermineIncludeMeta(BucketUnpacker::Behavior::kInclude, + &specWithFooInclude)); + + // Tests a spec with 'metaField' not in exclude list. 
+ std::set<std::string> excludeFields{}; + auto specWithMetaExclude = BucketSpec{ + kUserDefinedTimeName.toString(), kUserDefinedMetaName.toString(), std::move(excludeFields)}; + ASSERT_TRUE(eraseMetaFromFieldSetAndDetermineIncludeMeta(BucketUnpacker::Behavior::kExclude, + &specWithMetaExclude)); + ASSERT_FALSE(eraseMetaFromFieldSetAndDetermineIncludeMeta(BucketUnpacker::Behavior::kInclude, + &specWithMetaExclude)); +} + +TEST_F(BucketUnpackerTest, DetermineIncludeTimeField) { + std::set<std::string> fields{kUserDefinedTimeName.toString()}; + auto spec = BucketSpec{ + kUserDefinedTimeName.toString(), kUserDefinedMetaName.toString(), std::move(fields)}; + ASSERT_TRUE(determineIncludeTimeField(BucketUnpacker::Behavior::kInclude, &spec)); + ASSERT_FALSE(determineIncludeTimeField(BucketUnpacker::Behavior::kExclude, &spec)); +} + +TEST_F(BucketUnpackerTest, DetermineIncludeField) { + std::string includedMeasurementField = "measurementField1"; + std::string excludedMeasurementField = "measurementField2"; + std::set<std::string> fields{kUserDefinedTimeName.toString(), includedMeasurementField}; + auto spec = BucketSpec{ + kUserDefinedTimeName.toString(), kUserDefinedMetaName.toString(), std::move(fields)}; + + ASSERT_TRUE(determineIncludeField( + kUserDefinedTimeName.toString(), BucketUnpacker::Behavior::kInclude, spec)); + ASSERT_FALSE(determineIncludeField( + kUserDefinedTimeName.toString(), BucketUnpacker::Behavior::kExclude, spec)); + + ASSERT_TRUE( + determineIncludeField(includedMeasurementField, BucketUnpacker::Behavior::kInclude, spec)); + ASSERT_FALSE( + determineIncludeField(includedMeasurementField, BucketUnpacker::Behavior::kExclude, spec)); + + ASSERT_FALSE( + determineIncludeField(excludedMeasurementField, BucketUnpacker::Behavior::kInclude, spec)); + ASSERT_TRUE( + determineIncludeField(excludedMeasurementField, BucketUnpacker::Behavior::kExclude, spec)); +} + +/** + * Manually computes the timestamp object size for n timestamps. 
+ */ +auto expectedTimestampObjSize(int32_t rowKeyOffset, int32_t n) { + BSONObjBuilder bob; + for (auto i = 0; i < n; ++i) { + bob.appendDate(std::to_string(i + rowKeyOffset), Date_t::now()); + } + return bob.done().objsize(); +} + +TEST_F(BucketUnpackerTest, ExtractSingleMeasurement) { + std::set<std::string> fields{ + "_id", kUserDefinedMetaName.toString(), kUserDefinedTimeName.toString(), "a", "b"}; + auto spec = BucketSpec{ + kUserDefinedTimeName.toString(), kUserDefinedMetaName.toString(), std::move(fields)}; + auto unpacker = BucketUnpacker{std::move(spec), BucketUnpacker::Behavior::kInclude}; + + auto d1 = dateFromISOString("2020-02-17T00:00:00.000Z").getValue(); + auto d2 = dateFromISOString("2020-02-17T01:00:00.000Z").getValue(); + auto d3 = dateFromISOString("2020-02-17T02:00:00.000Z").getValue(); + auto bucket = BSON("meta" << BSON("m1" << 999 << "m2" << 9999) << "data" + << BSON("_id" << BSON("0" << 1 << "1" << 2 << "2" << 3) << "time" + << BSON("0" << d1 << "1" << d2 << "2" << d3) << "a" + << BSON("0" << 1 << "1" << 2 << "2" << 3) << "b" + << BSON("1" << 1 << "2" << 2))); + + unpacker.reset(std::move(bucket)); + + auto next = unpacker.extractSingleMeasurement(0); + auto expected = Document{ + {"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, {"_id", 1}, {"time", d1}, {"a", 1}}; + ASSERT_DOCUMENT_EQ(next, expected); + + next = unpacker.extractSingleMeasurement(2); + expected = Document{{"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, + {"_id", 3}, + {"time", d3}, + {"a", 3}, + {"b", 2}}; + ASSERT_DOCUMENT_EQ(next, expected); + + next = unpacker.extractSingleMeasurement(1); + expected = Document{{"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, + {"_id", 2}, + {"time", d2}, + {"a", 2}, + {"b", 1}}; + ASSERT_DOCUMENT_EQ(next, expected); + + // Can we extract the middle element again? + next = unpacker.extractSingleMeasurement(1); + ASSERT_DOCUMENT_EQ(next, expected); +} + +TEST_F(BucketUnpackerTest, ExtractSingleMeasurementSparse) { + std::set<std::string> fields{ + "_id", kUserDefinedMetaName.toString(), kUserDefinedTimeName.toString(), "a", "b"}; + auto spec = BucketSpec{ + kUserDefinedTimeName.toString(), kUserDefinedMetaName.toString(), std::move(fields)}; + auto unpacker = BucketUnpacker{std::move(spec), BucketUnpacker::Behavior::kInclude}; + + auto d1 = dateFromISOString("2020-02-17T00:00:00.000Z").getValue(); + auto d2 = dateFromISOString("2020-02-17T01:00:00.000Z").getValue(); + auto bucket = BSON("meta" << BSON("m1" << 999 << "m2" << 9999) << "data" + << BSON("_id" << BSON("0" << 1 << "1" << 2) << "time" + << BSON("0" << d1 << "1" << d2) << "a" << BSON("0" << 1) + << "b" << BSON("1" << 1))); + + unpacker.reset(std::move(bucket)); + auto next = unpacker.extractSingleMeasurement(1); + auto expected = Document{ + {"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, {"_id", 2}, {"time", d2}, {"b", 1}}; + ASSERT_DOCUMENT_EQ(next, expected); + + // Can we extract the same element again? + next = unpacker.extractSingleMeasurement(1); + ASSERT_DOCUMENT_EQ(next, expected); + + next = unpacker.extractSingleMeasurement(0); + expected = Document{ + {"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, {"_id", 1}, {"time", d1}, {"a", 1}}; + ASSERT_DOCUMENT_EQ(next, expected); + + // Can we extract the same element twice in a row? 
+ next = unpacker.extractSingleMeasurement(0); + ASSERT_DOCUMENT_EQ(next, expected); + + next = unpacker.extractSingleMeasurement(0); + ASSERT_DOCUMENT_EQ(next, expected); +} + +TEST_F(BucketUnpackerTest, ComputeMeasurementCountLowerBoundsAreCorrect) { + // The last table entry is a sentinel for an upper bound on the interval that covers measurement + // counts up to 16 MB. + const auto maxTableEntry = BucketUnpacker::kTimestampObjSizeTable.size() - 1; + + // Test the case when the target size hits a table entry which represents the lower bound of an + // interval. + for (size_t index = 0; index < maxTableEntry; ++index) { + auto interval = BucketUnpacker::kTimestampObjSizeTable[index]; + ASSERT_EQ(interval.first, BucketUnpacker::computeMeasurementCount(interval.second)); + } +} + +TEST_F(BucketUnpackerTest, ComputeMeasurementCountUpperBoundsAreCorrect) { + const auto maxTableEntry = BucketUnpacker::kTimestampObjSizeTable.size() - 1; + + // The lower bound sizes of each interval in the kTimestampObjSizeTable are hardcoded. Use this + // fact and walk the table backwards to check the correctness of the S_i'th interval's upper + // bound by using the lower bound size for the S_i+1 interval and subtracting the BSONObj size + // containing one timestamp with the appropriate rowKey. + std::pair<int, int> currentInterval; + auto currentIntervalSize = 0; + auto currentIntervalCount = 0; + auto size = 0; + for (size_t index = maxTableEntry; index > 0; --index) { + currentInterval = BucketUnpacker::kTimestampObjSizeTable[index]; + currentIntervalSize = currentInterval.second; + currentIntervalCount = currentInterval.first; + auto rowKey = currentIntervalCount - 1; + size = expectedTimestampObjSize(rowKey, 1); + // We need to add back the kMinBSONLength since it's subtracted out. + ASSERT_EQ(currentIntervalCount - 1, + BucketUnpacker::computeMeasurementCount(currentIntervalSize - size + + BSONObj::kMinBSONLength)); + } +} + +TEST_F(BucketUnpackerTest, ComputeMeasurementCountAllPointsInSmallerIntervals) { + // Test all values for some of the smaller intervals up to 100 measurements. 
+ for (auto bucketCount = 0; bucketCount < 25; ++bucketCount) { + auto size = expectedTimestampObjSize(0, bucketCount); + ASSERT_EQ(bucketCount, BucketUnpacker::computeMeasurementCount(size)); + } +} + +TEST_F(BucketUnpackerTest, ComputeMeasurementCountInLargerIntervals) { + ASSERT_EQ(2222, BucketUnpacker::computeMeasurementCount(30003)); + ASSERT_EQ(11111, BucketUnpacker::computeMeasurementCount(155560)); + ASSERT_EQ(449998, BucketUnpacker::computeMeasurementCount(7088863)); +} +} // namespace +} // namespace mongo diff --git a/src/mongo/db/exec/plan_stats.h b/src/mongo/db/exec/plan_stats.h index b80b22710e6..bf21a599597 100644 --- a/src/mongo/db/exec/plan_stats.h +++ b/src/mongo/db/exec/plan_stats.h @@ -878,4 +878,29 @@ struct UnionWithStats final : public SpecificStats { PlanSummaryStats planSummaryStats; }; +struct UnpackTimeseriesBucketStats final : public SpecificStats { + std::unique_ptr<SpecificStats> clone() const final { + return std::make_unique<UnpackTimeseriesBucketStats>(*this); + } + + uint64_t estimateObjectSizeInBytes() const { + return sizeof(*this); + } + + size_t nBucketsUnpacked = 0u; +}; + +struct SampleFromTimeseriesBucketStats final : public SpecificStats { + std::unique_ptr<SpecificStats> clone() const final { + return std::make_unique<SampleFromTimeseriesBucketStats>(*this); + } + + uint64_t estimateObjectSizeInBytes() const { + return sizeof(*this); + } + + size_t nBucketsDiscarded = 0u; + size_t dupsTested = 0u; + size_t dupsDropped = 0u; +}; } // namespace mongo diff --git a/src/mongo/db/exec/sample_from_timeseries_bucket.cpp b/src/mongo/db/exec/sample_from_timeseries_bucket.cpp new file mode 100644 index 00000000000..2d0fe7e72aa --- /dev/null +++ b/src/mongo/db/exec/sample_from_timeseries_bucket.cpp @@ -0,0 +1,121 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/db/exec/sample_from_timeseries_bucket.h" +#include "mongo/db/timeseries/timeseries_field_names.h" + +namespace mongo { +const char* SampleFromTimeseriesBucket::kStageType = "SAMPLE_FROM_TIMESERIES_BUCKET"; + +SampleFromTimeseriesBucket::SampleFromTimeseriesBucket(ExpressionContext* expCtx, + WorkingSet* ws, + std::unique_ptr<PlanStage> child, + BucketUnpacker bucketUnpacker, + int maxConsecutiveAttempts, + long long sampleSize, + int bucketMaxCount) + : PlanStage{kStageType, expCtx}, + _ws{*ws}, + _bucketUnpacker{std::move(bucketUnpacker)}, + _maxConsecutiveAttempts{maxConsecutiveAttempts}, + _sampleSize{sampleSize}, + _bucketMaxCount{bucketMaxCount} { + tassert(5521500, "sampleSize must be gte to 0", sampleSize >= 0); + tassert(5521501, "bucketMaxCount must be gt 0", bucketMaxCount > 0); + + _children.emplace_back(std::move(child)); +} + +void SampleFromTimeseriesBucket::materializeMeasurement(int64_t measurementIdx, + WorkingSetMember* member) { + auto sampledDocument = _bucketUnpacker.extractSingleMeasurement(measurementIdx); + + member->keyData.clear(); + member->recordId = {}; + member->doc = {{}, std::move(sampledDocument)}; + member->transitionToOwnedObj(); +} + +std::unique_ptr<PlanStageStats> SampleFromTimeseriesBucket::getStats() { + _commonStats.isEOF = isEOF(); + auto ret = std::make_unique<PlanStageStats>(_commonStats, stageType()); + ret->specific = std::make_unique<SampleFromTimeseriesBucketStats>(_specificStats); + ret->children.emplace_back(child()->getStats()); + return ret; +} + +PlanStage::StageState SampleFromTimeseriesBucket::doWork(WorkingSetID* out) { + if (isEOF()) { + return PlanStage::IS_EOF; + } + + auto id = WorkingSet::INVALID_ID; + auto status = child()->work(&id); + + if (PlanStage::ADVANCED == status) { + auto member = _ws.get(id); + + auto bucket = member->doc.value().toBson(); + _bucketUnpacker.reset(std::move(bucket)); + + auto& prng = expCtx()->opCtx->getClient()->getPrng(); + auto j = prng.nextInt64(_bucketMaxCount); + + if (j < _bucketUnpacker.numberOfMeasurements()) { + auto bucketId = _bucketUnpacker.bucket()[timeseries::kBucketIdFieldName]; + auto bucketIdMeasurementIdxKey = SampledMeasurementKey{bucketId.OID(), j}; + + ++_specificStats.dupsTested; + if (_seenSet.insert(std::move(bucketIdMeasurementIdxKey)).second) { + materializeMeasurement(j, member); + ++_nSampledSoFar; + _worksSinceLastAdvanced = 0; + *out = id; + } else { + ++_specificStats.dupsDropped; + ++_worksSinceLastAdvanced; + _ws.free(id); + return PlanStage::NEED_TIME; + } + } else { + ++_specificStats.nBucketsDiscarded; + ++_worksSinceLastAdvanced; + _ws.free(id); + return PlanStage::NEED_TIME; + } + uassert(5521504, + str::stream() << kStageType << " could not find a non-duplicate measurement after " + << _worksSinceLastAdvanced << " attempts", + _worksSinceLastAdvanced < _maxConsecutiveAttempts); + } else if (PlanStage::NEED_YIELD == status) { + *out = id; + } + return status; +} +} // namespace mongo diff --git a/src/mongo/db/exec/sample_from_timeseries_bucket.h b/src/mongo/db/exec/sample_from_timeseries_bucket.h new file mode 100644 index 00000000000..2aa26aabe7a --- /dev/null +++ b/src/mongo/db/exec/sample_from_timeseries_bucket.h @@ -0,0 +1,129 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/exec/bucket_unpacker.h" +#include "mongo/db/exec/plan_stage.h" + +namespace mongo { +/** + * This stage implements a variation on the ARHASH algorithm (see + * https://dl.acm.org/doi/10.1145/93605.98746), by running one iteration of the ARHASH algorithm to + * materialze a random measurement from a randomly sampled bucket once per doWork() call. The plan + * is constructed such that the input documents to this stage are coming from a storage-provided + * random cursor. + */ +class SampleFromTimeseriesBucket final : public PlanStage { +public: + static const char* kStageType; + + /** + * Constructs a 'SampleFromTimeseriesBucket' stage which uses 'bucketUnpacker' to materialize + * the sampled measurment from the buckets returned by the child stage. + * - 'sampleSize' is the user-requested number of documents to sample. + * - 'maxConsecutiveAttempts' configures the maximum number of consecutive "misses" when + * performing the ARHASH algorithm. A miss may happen either when we sample a duplicate, or the + * index 'j' selected by the PRNG exceeds the number of measurements in the bucket. If we miss + * enough times in a row, we throw an exception that terminates the execution of the query. + * - 'bucketMaxCount' is the maximum number of measurements allowed in a bucket, which can be + * configured via a server parameter. + */ + SampleFromTimeseriesBucket(ExpressionContext* expCtx, + WorkingSet* ws, + std::unique_ptr<PlanStage> child, + BucketUnpacker bucketUnpacker, + int maxConsecutiveAttempts, + long long sampleSize, + int bucketMaxCount); + + StageType stageType() const final { + return STAGE_SAMPLE_FROM_TIMESERIES_BUCKET; + } + + bool isEOF() final { + return _nSampledSoFar >= _sampleSize; + } + + std::unique_ptr<PlanStageStats> getStats() final; + + const SpecificStats* getSpecificStats() const final { + return &_specificStats; + } + + PlanStage::StageState doWork(WorkingSetID* id); + +private: + /** + * Carries the bucket _id and index for the measurement that was sampled. 
+ */ + struct SampledMeasurementKey { + SampledMeasurementKey(OID bucketId, int64_t measurementIndex) + : bucketId(bucketId), measurementIndex(measurementIndex) {} + + bool operator==(const SampledMeasurementKey& key) const { + return this->bucketId == key.bucketId && this->measurementIndex == key.measurementIndex; + } + + OID bucketId; + int32_t measurementIndex; + }; + + /** + * Computes a hash of 'SampledMeasurementKey' so measurements that have already been seen can + * be kept track of for de-duplication after sampling. + */ + struct SampledMeasurementKeyHasher { + size_t operator()(const SampledMeasurementKey& s) const { + return absl::Hash<uint64_t>{}(s.bucketId.view().read<uint64_t>()) ^ + absl::Hash<uint32_t>{}(s.bucketId.view().read<uint32_t>(8)) ^ + absl::Hash<int32_t>{}(s.measurementIndex); + } + }; + + // Tracks which measurements have been seen so far. + using SeenSet = stdx::unordered_set<SampledMeasurementKey, SampledMeasurementKeyHasher>; + + void materializeMeasurement(int64_t measurementIdx, WorkingSetMember* out); + + WorkingSet& _ws; + BucketUnpacker _bucketUnpacker; + SampleFromTimeseriesBucketStats _specificStats; + + const int _maxConsecutiveAttempts; + const long long _sampleSize; + const int _bucketMaxCount; + + int _worksSinceLastAdvanced = 0; + long long _nSampledSoFar = 0; + + // Used to de-duplicate randomly sampled measurements. + SeenSet _seenSet; +}; +} // namespace mongo diff --git a/src/mongo/db/exec/unpack_timeseries_bucket.cpp b/src/mongo/db/exec/unpack_timeseries_bucket.cpp new file mode 100644 index 00000000000..bbd4b61cf12 --- /dev/null +++ b/src/mongo/db/exec/unpack_timeseries_bucket.cpp @@ -0,0 +1,99 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/db/exec/unpack_timeseries_bucket.h" + +namespace mongo { +namespace { + +void transitionToOwnedObj(Document&& doc, WorkingSetMember* member) { + member->keyData.clear(); + member->recordId = {}; + member->doc = {{}, std::move(doc)}; + member->transitionToOwnedObj(); +} +} // namespace + +const char* UnpackTimeseriesBucket::kStageType = "UNPACK_BUCKET"; + +UnpackTimeseriesBucket::UnpackTimeseriesBucket(ExpressionContext* expCtx, + WorkingSet* ws, + std::unique_ptr<PlanStage> child, + BucketUnpacker bucketUnpacker) + : PlanStage{kStageType, expCtx}, _ws{*ws}, _bucketUnpacker{std::move(bucketUnpacker)} { + _children.emplace_back(std::move(child)); +} + +std::unique_ptr<PlanStageStats> UnpackTimeseriesBucket::getStats() { + _commonStats.isEOF = isEOF(); + auto ret = std::make_unique<PlanStageStats>(_commonStats, stageType()); + ret->specific = std::make_unique<UnpackTimeseriesBucketStats>(_specificStats); + ret->children.emplace_back(child()->getStats()); + return ret; +} + +PlanStage::StageState UnpackTimeseriesBucket::doWork(WorkingSetID* out) { + if (isEOF()) { + return PlanStage::IS_EOF; + } + + if (!_bucketUnpacker.hasNext()) { + auto id = WorkingSet::INVALID_ID; + auto status = child()->work(&id); + + if (PlanStage::ADVANCED == status) { + auto member = _ws.get(id); + + // Make an owned copy of the bucket document if necessary. The bucket will be unwound + // across multiple calls to 'doWork()', so we need to hold our own copy in the query + // execution layer in case the storage engine reclaims the memory for the bucket between + // calls to 'doWork()'. + auto ownedBucket = member->doc.value().toBson().getOwned(); + _bucketUnpacker.reset(std::move(ownedBucket)); + + auto measurement = _bucketUnpacker.getNext(); + transitionToOwnedObj(std::move(measurement), member); + ++_specificStats.nBucketsUnpacked; + + *out = id; + } else if (PlanStage::NEED_YIELD == status) { + *out = id; + } + return status; + } + + auto measurement = _bucketUnpacker.getNext(); + *out = _ws.allocate(); + auto member = _ws.get(*out); + + transitionToOwnedObj(std::move(measurement), member); + + return PlanStage::ADVANCED; +} +} // namespace mongo diff --git a/src/mongo/db/exec/unpack_timeseries_bucket.h b/src/mongo/db/exec/unpack_timeseries_bucket.h new file mode 100644 index 00000000000..84a1392369e --- /dev/null +++ b/src/mongo/db/exec/unpack_timeseries_bucket.h @@ -0,0 +1,71 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. 
If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/exec/bucket_unpacker.h" +#include "mongo/db/exec/plan_stage.h" + +namespace mongo { +/** + * This PlanStage is the analog of DocumentSourceInternalUnpackBucket, but in the PlanStage layer. + * It fetches a bucket from it's child as an owned BSONObj and uses the BucketUnpacker to + * materialize time-series measurements until the time-series bucket collection is exhausted. + */ +class UnpackTimeseriesBucket final : public PlanStage { +public: + static const char* kStageType; + + UnpackTimeseriesBucket(ExpressionContext* expCtx, + WorkingSet* ws, + std::unique_ptr<PlanStage> child, + BucketUnpacker bucketUnpacker); + + StageType stageType() const final { + return STAGE_UNPACK_TIMESERIES_BUCKET; + } + + bool isEOF() final { + return !_bucketUnpacker.hasNext() && child()->isEOF(); + } + + std::unique_ptr<PlanStageStats> getStats() final; + + const SpecificStats* getSpecificStats() const final { + return &_specificStats; + } + + PlanStage::StageState doWork(WorkingSetID* id); + +private: + WorkingSet& _ws; + BucketUnpacker _bucketUnpacker; + UnpackTimeseriesBucketStats _specificStats; +}; +} // namespace mongo diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index c9c8ffae144..bbfe3182218 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -276,6 +276,7 @@ pipelineEnv.Library( '$BUILD_DIR/mongo/db/bson/dotted_path_support', '$BUILD_DIR/mongo/db/curop', '$BUILD_DIR/mongo/db/curop_failpoint_helpers', + '$BUILD_DIR/mongo/db/exec/bucket_unpacker', '$BUILD_DIR/mongo/db/exec/document_value/document_value', '$BUILD_DIR/mongo/db/exec/projection_executor', '$BUILD_DIR/mongo/db/exec/scoped_timer', diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp index 8e2e81d2d14..961ca3d610b 100644 --- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp +++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.cpp @@ -33,7 +33,6 @@ #include "mongo/db/pipeline/document_source_internal_unpack_bucket.h" -#include "mongo/bson/bsonobj.h" #include "mongo/db/exec/document_value/document.h" #include "mongo/db/matcher/expression.h" #include "mongo/db/matcher/expression_algo.h" @@ -59,59 +58,6 @@ REGISTER_DOCUMENT_SOURCE(_internalUnpackBucket, namespace { /** - * Removes metaField from the field set and returns a boolean indicating whether metaField should be - * included in the materialized measurements. Always returns false if metaField does not exist. - */ -auto eraseMetaFromFieldSetAndDetermineIncludeMeta(BucketUnpacker::Behavior unpackerBehavior, - BucketSpec* bucketSpec) { - if (!bucketSpec->metaField) { - return false; - } else if (auto itr = bucketSpec->fieldSet.find(*bucketSpec->metaField); - itr != bucketSpec->fieldSet.end()) { - bucketSpec->fieldSet.erase(itr); - return unpackerBehavior == BucketUnpacker::Behavior::kInclude; - } else { - return unpackerBehavior == BucketUnpacker::Behavior::kExclude; - } -} - -/** - * Determine if timestamp values should be included in the materialized measurements. 
- */ -auto determineIncludeTimeField(BucketUnpacker::Behavior unpackerBehavior, BucketSpec* bucketSpec) { - return (unpackerBehavior == BucketUnpacker::Behavior::kInclude) == - (bucketSpec->fieldSet.find(bucketSpec->timeField) != bucketSpec->fieldSet.end()); -} - -/** - * Determine if an arbitrary field should be included in the materialized measurements. - */ -auto determineIncludeField(StringData fieldName, - BucketUnpacker::Behavior unpackerBehavior, - const BucketSpec& bucketSpec) { - return (unpackerBehavior == BucketUnpacker::Behavior::kInclude) == - (bucketSpec.fieldSet.find(fieldName.toString()) != bucketSpec.fieldSet.end()); -} - -/** - * Erase computed meta projection fields if they are present in the exclusion field set. - */ -void eraseExcludedComputedMetaProjFields(BucketUnpacker::Behavior unpackerBehavior, - BucketSpec* bucketSpec) { - if (unpackerBehavior == BucketUnpacker::Behavior::kExclude && - bucketSpec->computedMetaProjFields.size() > 0) { - for (auto it = bucketSpec->computedMetaProjFields.begin(); - it != bucketSpec->computedMetaProjFields.end();) { - if (bucketSpec->fieldSet.find(*it) != bucketSpec->fieldSet.end()) { - it = bucketSpec->computedMetaProjFields.erase(it); - } else { - it++; - } - } - } -} - -/** * A projection can be internalized if every field corresponds to a boolean value. Note that this * correctly rejects dotted fieldnames, which are mapped to objects internally. */ @@ -234,190 +180,6 @@ void optimizePrefix(Pipeline::SourceContainer::iterator itr, Pipeline::SourceCon } } // namespace -// Calculates the number of measurements in a bucket given the 'targetTimestampObjSize' using the -// 'BucketUnpacker::kTimestampObjSizeTable' table. If the 'targetTimestampObjSize' hits a record in -// the table, this helper returns the measurement count corresponding to the table record. -// Otherwise, the 'targetTimestampObjSize' is used to probe the table for the smallest {b_i, S_i} -// pair such that 'targetTimestampObjSize' < S_i. Once the interval is found, the upper bound of the -// pair for the interval is computed and then linear interpolation is used to compute the -// measurement count corresponding to the 'targetTimestampObjSize' provided. -int BucketUnpacker::computeMeasurementCount(int targetTimestampObjSize) { - auto currentInterval = - std::find_if(std::begin(BucketUnpacker::kTimestampObjSizeTable), - std::end(BucketUnpacker::kTimestampObjSizeTable), - [&](const auto& entry) { return targetTimestampObjSize <= entry.second; }); - - if (currentInterval->second == targetTimestampObjSize) { - return currentInterval->first; - } - // This points to the first interval larger than the target 'targetTimestampObjSize', the actual - // interval that will cover the object size is the interval before the current one. 
- tassert(5422104, - "currentInterval should not point to the first table entry", - currentInterval > BucketUnpacker::kTimestampObjSizeTable.begin()); - --currentInterval; - - auto nDigitsInRowKey = 1 + (currentInterval - BucketUnpacker::kTimestampObjSizeTable.begin()); - - return currentInterval->first + - ((targetTimestampObjSize - currentInterval->second) / (10 + nDigitsInRowKey)); -} - -void BucketUnpacker::reset(BSONObj&& bucket) { - _fieldIters.clear(); - _timeFieldIter = boost::none; - - _bucket = std::move(bucket); - uassert(5346510, "An empty bucket cannot be unpacked", !_bucket.isEmpty()); - tassert(5346701, - "The $_internalUnpackBucket stage requires the bucket to be owned", - _bucket.isOwned()); - - auto&& dataRegion = _bucket.getField(timeseries::kBucketDataFieldName).Obj(); - if (dataRegion.isEmpty()) { - // If the data field of a bucket is present but it holds an empty object, there's nothing to - // unpack. - return; - } - - auto&& timeFieldElem = dataRegion.getField(_spec.timeField); - uassert(5346700, - "The $_internalUnpackBucket stage requires the data region to have a timeField object", - timeFieldElem); - - _timeFieldIter = BSONObjIterator{timeFieldElem.Obj()}; - - _metaValue = _bucket[timeseries::kBucketMetaFieldName]; - if (_spec.metaField) { - // The spec indicates that there might be a metadata region. Missing metadata in - // measurements is expressed with missing metadata in a bucket. But we disallow undefined - // since the undefined BSON type is deprecated. - uassert(5369600, - "The $_internalUnpackBucket stage allows metadata to be absent or otherwise, it " - "must not be the deprecated undefined bson type", - !_metaValue || _metaValue.type() != BSONType::Undefined); - } else { - // If the spec indicates that the time series collection has no metadata field, then we - // should not find a metadata region in the underlying bucket documents. - uassert(5369601, - "The $_internalUnpackBucket stage expects buckets to have missing metadata regions " - "if the metaField parameter is not provided", - !_metaValue); - } - - // Walk the data region of the bucket, and decide if an iterator should be set up based on the - // include or exclude case. - for (auto&& elem : dataRegion) { - auto& colName = elem.fieldNameStringData(); - if (colName == _spec.timeField) { - // Skip adding a FieldIterator for the timeField since the timestamp value from - // _timeFieldIter can be placed accordingly in the materialized measurement. - continue; - } - - // Includes a field when '_unpackerBehavior' is 'kInclude' and it's found in 'fieldSet' or - // _unpackerBehavior is 'kExclude' and it's not found in 'fieldSet'. - if (determineIncludeField(colName, _unpackerBehavior, _spec)) { - _fieldIters.emplace_back(colName.toString(), BSONObjIterator{elem.Obj()}); - } - } - - // Update computed meta projections with values from this bucket. - if (!_spec.computedMetaProjFields.empty()) { - for (auto&& name : _spec.computedMetaProjFields) { - _computedMetaProjections[name] = _bucket[name]; - } - } - - // Save the measurement count for the owned bucket. 
- _numberOfMeasurements = computeMeasurementCount(timeFieldElem.objsize()); -} - -void BucketUnpacker::setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior behavior) { - _includeMetaField = eraseMetaFromFieldSetAndDetermineIncludeMeta(behavior, &bucketSpec); - _includeTimeField = determineIncludeTimeField(behavior, &bucketSpec); - _unpackerBehavior = behavior; - eraseExcludedComputedMetaProjFields(behavior, &bucketSpec); - _spec = std::move(bucketSpec); -} - -Document BucketUnpacker::getNext() { - tassert(5422100, "'getNext()' was called after the bucket has been exhausted", hasNext()); - - auto measurement = MutableDocument{}; - auto&& timeElem = _timeFieldIter->next(); - if (_includeTimeField) { - measurement.addField(_spec.timeField, Value{timeElem}); - } - - // Includes metaField when we're instructed to do so and metaField value exists. - if (_includeMetaField && _metaValue) { - measurement.addField(*_spec.metaField, Value{_metaValue}); - } - - auto& currentIdx = timeElem.fieldNameStringData(); - for (auto&& [colName, colIter] : _fieldIters) { - if (auto&& elem = *colIter; colIter.more() && elem.fieldNameStringData() == currentIdx) { - measurement.addField(colName, Value{elem}); - colIter.advance(elem); - } - } - - // Add computed meta projections. - for (auto&& name : _spec.computedMetaProjFields) { - measurement.addField(name, Value{_computedMetaProjections[name]}); - } - - return measurement.freeze(); -} - -Document BucketUnpacker::extractSingleMeasurement(int j) { - tassert(5422101, - "'extractSingleMeasurment' expects j to be greater than or equal to zero and less than " - "or equal to the number of measurements in a bucket", - j >= 0 && j < _numberOfMeasurements); - - auto measurement = MutableDocument{}; - - auto rowKey = std::to_string(j); - auto targetIdx = StringData{rowKey}; - auto&& dataRegion = _bucket.getField(timeseries::kBucketDataFieldName).Obj(); - - if (_includeMetaField && !_metaValue.isNull()) { - measurement.addField(*_spec.metaField, Value{_metaValue}); - } - - for (auto&& dataElem : dataRegion) { - auto colName = dataElem.fieldNameStringData(); - if (!determineIncludeField(colName, _unpackerBehavior, _spec)) { - continue; - } - auto value = dataElem[targetIdx]; - if (value) { - measurement.addField(dataElem.fieldNameStringData(), Value{value}); - } - } - - // Add computed meta projections. 
- for (auto&& name : _spec.computedMetaProjFields) { - measurement.addField(name, Value{_computedMetaProjections[name]}); - } - - return measurement.freeze(); -} - -const std::set<StringData> BucketUnpacker::reservedBucketFieldNames = { - timeseries::kBucketIdFieldName, - timeseries::kBucketDataFieldName, - timeseries::kBucketMetaFieldName, - timeseries::kBucketControlFieldName}; - -void BucketUnpacker::addComputedMetaProjFields(const std::vector<StringData>& computedFieldNames) { - for (auto&& field : computedFieldNames) { - _spec.computedMetaProjFields.emplace_back(field.toString()); - } -} - DocumentSourceInternalUnpackBucket::DocumentSourceInternalUnpackBucket( const boost::intrusive_ptr<ExpressionContext>& expCtx, BucketUnpacker bucketUnpacker) : DocumentSource(kStageName, expCtx), _bucketUnpacker(std::move(bucketUnpacker)) {} @@ -482,15 +244,8 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceInternalUnpackBucket::createF "The $_internalUnpackBucket stage requires a timeField parameter", specElem[timeseries::kTimeFieldName].ok()); - auto includeTimeField = determineIncludeTimeField(unpackerBehavior, &bucketSpec); - - auto includeMetaField = - eraseMetaFromFieldSetAndDetermineIncludeMeta(unpackerBehavior, &bucketSpec); - return make_intrusive<DocumentSourceInternalUnpackBucket>( - expCtx, - BucketUnpacker{ - std::move(bucketSpec), unpackerBehavior, includeTimeField, includeMetaField}); + expCtx, BucketUnpacker{std::move(bucketSpec), unpackerBehavior}); } void DocumentSourceInternalUnpackBucket::serializeToArray( @@ -524,65 +279,8 @@ void DocumentSourceInternalUnpackBucket::serializeToArray( } } -DocumentSource::GetNextResult -DocumentSourceInternalUnpackBucket::sampleUniqueMeasurementFromBuckets() { - const auto kMaxAttempts = 100; - for (auto attempt = 0; attempt < kMaxAttempts; ++attempt) { - auto randResult = pSource->getNext(); - switch (randResult.getStatus()) { - case GetNextResult::ReturnStatus::kAdvanced: { - auto bucket = randResult.getDocument().toBson(); - _bucketUnpacker.reset(std::move(bucket)); - - auto& prng = pExpCtx->opCtx->getClient()->getPrng(); - auto j = prng.nextInt64(_bucketMaxCount); - - if (j < _bucketUnpacker.numberOfMeasurements()) { - auto sampledDocument = _bucketUnpacker.extractSingleMeasurement(j); - - auto bucketId = _bucketUnpacker.bucket()[timeseries::kBucketIdFieldName]; - auto bucketIdMeasurementIdxKey = SampledMeasurementKey{bucketId.OID(), j}; - - if (_seenSet.insert(std::move(bucketIdMeasurementIdxKey)).second) { - _nSampledSoFar++; - return sampledDocument; - } else { - LOGV2_DEBUG( - 5422102, - 1, - "$_internalUnpackBucket optimized for sample saw duplicate measurement", - "measurementIndex"_attr = j, - "bucketId"_attr = bucketId); - } - } - break; - } - case GetNextResult::ReturnStatus::kPauseExecution: { - // This state should never be reached since the input stage is a random cursor. - MONGO_UNREACHABLE; - } - case GetNextResult::ReturnStatus::kEOF: { - return randResult; - } - } - } - uasserted(5422103, - str::stream() - << "$_internalUnpackBucket stage could not find a non-duplicate document after " - << kMaxAttempts - << " attempts while using a random cursor. This is likely a " - "sporadic failure, please try again"); -} - DocumentSource::GetNextResult DocumentSourceInternalUnpackBucket::doGetNext() { - // If the '_sampleSize' member is present, then the stage will produce randomly sampled - // documents from buckets. 
- if (_sampleSize) { - if (_nSampledSoFar >= _sampleSize) { - return GetNextResult::makeEOF(); - } - return sampleUniqueMeasurementFromBuckets(); - } + tassert(5521502, "calling doGetNext() when '_sampleSize' is set is disallowed", !_sampleSize); // Otherwise, fallback to unpacking every measurement in all buckets until the child stage is // exhausted. @@ -653,8 +351,7 @@ bool DocumentSourceInternalUnpackBucket::pushDownComputedMetaProjection( void DocumentSourceInternalUnpackBucket::internalizeProject(const BSONObj& project, bool isInclusion) { // 'fields' are the top-level fields to be included/excluded by the unpacker. We handle the - // special case of _id, which may be excluded in an inclusion $project (or vice versa), - // here. + // special case of _id, which may be excluded in an inclusion $project (or vice versa), here. auto fields = project.getFieldNames<std::set<std::string>>(); if (auto elt = project.getField("_id"); (elt.isBoolean() && elt.Bool() != isInclusion) || (elt.isNumber() && (elt.Int() == 1) != isInclusion)) { @@ -672,8 +369,7 @@ void DocumentSourceInternalUnpackBucket::internalizeProject(const BSONObj& proje std::pair<BSONObj, bool> DocumentSourceInternalUnpackBucket::extractOrBuildProjectToInternalize( Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) const { if (std::next(itr) == container->end() || !_bucketUnpacker.bucketSpec().fieldSet.empty()) { - // There is no project to internalize or there are already fields being - // included/excluded. + // There is no project to internalize or there are already fields being included/excluded. return {BSONObj{}, false}; } @@ -684,9 +380,9 @@ std::pair<BSONObj, bool> DocumentSourceInternalUnpackBucket::extractOrBuildProje return {existingProj, isInclusion}; } - // Attempt to get an inclusion $project representing the root-level dependencies of the - // pipeline after the $_internalUnpackBucket. If this $project is not empty, then the - // dependency set was finite. + // Attempt to get an inclusion $project representing the root-level dependencies of the pipeline + // after the $_internalUnpackBucket. If this $project is not empty, then the dependency set was + // finite. Pipeline::SourceContainer restOfPipeline(std::next(itr), container->end()); auto deps = Pipeline::getDependenciesForContainer(pExpCtx, restOfPipeline, boost::none); if (auto dependencyProj = @@ -709,18 +405,17 @@ std::unique_ptr<MatchExpression> createComparisonPredicate( auto path = matchExpr->path(); auto rhs = matchExpr->getData(); - // The control field's min and max are chosen using a field-order insensitive comparator, - // while MatchExpressions use a comparator that treats field-order as significant. Because - // of this we will not perform this optimization on queries with operands of compound types. + // The control field's min and max are chosen using a field-order insensitive comparator, while + // MatchExpressions use a comparator that treats field-order as significant. Because of this we + // will not perform this optimization on queries with operands of compound types. if (rhs.type() == BSONType::Object || rhs.type() == BSONType::Array) { return nullptr; } - // MatchExpressions have special comparison semantics regarding null, in that {$eq: null} - // will match all documents where the field is either null or missing. 
Because this is - // different from both the comparison semantics that InternalExprComparison expressions and - // the control's min and max fields use, we will not perform this optimization on queries - // with null operands. + // MatchExpressions have special comparison semantics regarding null, in that {$eq: null} will + // match all documents where the field is either null or missing. Because this is different from + // both the comparison semantics that InternalExprComparison expressions and the control's min + // and max fields use, we will not perform this optimization on queries with null operands. if (rhs.type() == BSONType::jstNULL) { return nullptr; } diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h index c5133643874..051cc2161ca 100644 --- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h +++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h @@ -31,160 +31,11 @@ #include <set> +#include "mongo/db/exec/bucket_unpacker.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_match.h" namespace mongo { - -/** - * Carries parameters for unpacking a bucket. - */ -struct BucketSpec { - // The user-supplied timestamp field name specified during time-series collection creation. - std::string timeField; - - // An optional user-supplied metadata field name specified during time-series collection - // creation. This field name is used during materialization of metadata fields of a measurement - // after unpacking. - boost::optional<std::string> metaField; - - // The set of field names in the data region that should be included or excluded. - std::set<std::string> fieldSet; - - // Vector of computed meta field projection names. Added at the end of materialized - // measurements. - std::vector<std::string> computedMetaProjFields; -}; - - -/** - * BucketUnpacker will unpack bucket fields for metadata and the provided fields. - */ -class BucketUnpacker { -public: - // A table that is useful for interpolations between the number of measurements in a bucket and - // the byte size of a bucket's data section timestamp column. Each table entry is a pair (b_i, - // S_i), where b_i is the number of measurements in the bucket and S_i is the byte size of the - // timestamp BSONObj. The table is bounded by 16 MB (2 << 23 bytes) where the table entries are - // pairs of b_i and S_i for the lower bounds of the row key digit intervals [0, 9], [10, 99], - // [100, 999], [1000, 9999] and so on. The last entry in the table, S7, is the first entry to - // exceed the server BSON object limit of 16 MB. - static constexpr std::array<std::pair<int32_t, int32_t>, 8> kTimestampObjSizeTable{ - {{0, BSONObj::kMinBSONLength}, - {10, 115}, - {100, 1195}, - {1000, 12895}, - {10000, 138895}, - {100000, 1488895}, - {1000000, 15888895}, - {10000000, 168888895}}}; - - /** - * Given the size of a BSONObj timestamp column, formatted as it would be in a time-series - * system.buckets.X collection, returns the number of measurements in the bucket in O(1) time. - */ - static int computeMeasurementCount(int targetTimestampObjSize); - - // Set of field names reserved for time-series buckets. - static const std::set<StringData> reservedBucketFieldNames; - - // When BucketUnpacker is created with kInclude it must produce measurements that contain the - // set of fields. 
Otherwise, if the kExclude option is used, the measurements will include the - // set difference between all fields in the bucket and the provided fields. - enum class Behavior { kInclude, kExclude }; - - BucketUnpacker(BucketSpec spec, - Behavior unpackerBehavior, - bool includeTimeField, - bool includeMetaField) - : _spec(std::move(spec)), - _unpackerBehavior(unpackerBehavior), - _includeTimeField(includeTimeField), - _includeMetaField(includeMetaField) {} - - /** - * This method will continue to materialize Documents until the bucket is exhausted. A - * precondition of this method is that 'hasNext()' must be true. - */ - Document getNext(); - - /** - * This method will extract the j-th measurement from the bucket. A precondition of this method - * is that j >= 0 && j <= the number of measurements within the underlying bucket. - */ - Document extractSingleMeasurement(int j); - - bool hasNext() const { - return _timeFieldIter && _timeFieldIter->more(); - } - - /** - * This resets the unpacker to prepare to unpack a new bucket described by the given document. - */ - void reset(BSONObj&& bucket); - - Behavior behavior() const { - return _unpackerBehavior; - } - - const BucketSpec& bucketSpec() const { - return _spec; - } - - const BSONObj& bucket() const { - return _bucket; - } - - bool includeMetaField() const { - return _includeMetaField; - } - - bool includeTimeField() const { - return _includeTimeField; - } - - int32_t numberOfMeasurements() const { - return _numberOfMeasurements; - } - - void setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior behavior); - - // Add computed meta projection names to the bucket specification. - void addComputedMetaProjFields(const std::vector<StringData>& computedFieldNames); - -private: - BucketSpec _spec; - Behavior _unpackerBehavior; - - // Iterates the timestamp section of the bucket to drive the unpacking iteration. - boost::optional<BSONObjIterator> _timeFieldIter; - - // A flag used to mark that the timestamp value should be materialized in measurements. - bool _includeTimeField; - - // A flag used to mark that a bucket's metadata value should be materialized in measurements. - bool _includeMetaField; - - // The bucket being unpacked. - BSONObj _bucket; - - // Since the metadata value is the same across all materialized measurements we can cache the - // metadata BSONElement in the reset phase and use it to materialize the metadata in each - // measurement. - BSONElement _metaValue; - - // Iterators used to unpack the columns of the above bucket that are populated during the reset - // phase according to the provided 'Behavior' and 'BucketSpec'. - std::vector<std::pair<std::string, BSONObjIterator>> _fieldIters; - - // Map <name, BSONElement> for the computed meta field projections. Updated for - // every bucket upon reset(). - stdx::unordered_map<std::string, BSONElement> _computedMetaProjections; - - // The number of measurements in the bucket. 
- int32_t _numberOfMeasurements = 0; -}; - class DocumentSourceInternalUnpackBucket : public DocumentSource { public: static constexpr StringData kStageName = "$_internalUnpackBucket"_sd; @@ -244,6 +95,10 @@ public: return boost::none; }; + BucketUnpacker bucketUnpacker() const { + return _bucketUnpacker; + } + Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) final; @@ -323,53 +178,11 @@ public: Pipeline::SourceContainer* container); private: - /** - * Carries the bucket _id and index for the measurement that was sampled by - * 'sampleRandomBucketOptimized'. - */ - struct SampledMeasurementKey { - SampledMeasurementKey(OID bucketId, int64_t measurementIndex) - : bucketId(bucketId), measurementIndex(measurementIndex) {} - - bool operator==(const SampledMeasurementKey& key) const { - return this->bucketId == key.bucketId && this->measurementIndex == key.measurementIndex; - } - - OID bucketId; - int32_t measurementIndex; - }; - - /** - * Computes a hash of 'SampledMeasurementKey' so measurements that have already been seen can - * be kept track of for de-duplication after sampling. - */ - struct SampledMeasurementKeyHasher { - size_t operator()(const SampledMeasurementKey& s) const { - return absl::Hash<uint64_t>{}(s.bucketId.view().read<uint64_t>()) ^ - absl::Hash<uint32_t>{}(s.bucketId.view().read<uint32_t>(8)) ^ - absl::Hash<int32_t>{}(s.measurementIndex); - } - }; - - // Tracks which measurements have been seen so far. This is only used when sampling is enabled - // for the purpose of de-duplicating measurements. - using SeenSet = stdx::unordered_set<SampledMeasurementKey, SampledMeasurementKeyHasher>; - GetNextResult doGetNext() final; - /** - * Keeps trying to sample a unique measurement by using the optimized ARHASH algorithm up to a - * hardcoded maximum number of attempts. If a unique measurement isn't found before the maximum - * number of tries is exhausted this method will throw. 
- */ - GetNextResult sampleUniqueMeasurementFromBuckets(); - BucketUnpacker _bucketUnpacker; - long long _nSampledSoFar = 0; int _bucketMaxCount = 0; boost::optional<long long> _sampleSize; - - SeenSet _seenSet; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp index 92e43593e32..9c0ca61a175 100644 --- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp +++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket_test/unpack_bucket_exec_test.cpp @@ -812,280 +812,5 @@ TEST_F(InternalUnpackBucketExecTest, ParserRejectsBothIncludeAndExcludeParameter AssertionException, 5408000); } - -TEST_F(InternalUnpackBucketExecTest, BucketUnpackerExtractSingleMeasurement) { - auto expCtx = getExpCtx(); - - std::set<std::string> fields{ - "_id", kUserDefinedMetaName.toString(), kUserDefinedTimeName.toString(), "a", "b"}; - auto spec = BucketSpec{ - kUserDefinedTimeName.toString(), kUserDefinedMetaName.toString(), std::move(fields)}; - auto unpacker = BucketUnpacker{std::move(spec), BucketUnpacker::Behavior::kInclude, true, true}; - - auto d1 = dateFromISOString("2020-02-17T00:00:00.000Z").getValue(); - auto d2 = dateFromISOString("2020-02-17T01:00:00.000Z").getValue(); - auto d3 = dateFromISOString("2020-02-17T02:00:00.000Z").getValue(); - auto bucket = BSON("meta" << BSON("m1" << 999 << "m2" << 9999) << "data" - << BSON("_id" << BSON("0" << 1 << "1" << 2 << "2" << 3) << "time" - << BSON("0" << d1 << "1" << d2 << "2" << d3) << "a" - << BSON("0" << 1 << "1" << 2 << "2" << 3) << "b" - << BSON("1" << 1 << "2" << 2))); - - unpacker.reset(std::move(bucket)); - - auto next = unpacker.extractSingleMeasurement(0); - auto expected = Document{ - {"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, {"_id", 1}, {"time", d1}, {"a", 1}}; - ASSERT_DOCUMENT_EQ(next, expected); - - next = unpacker.extractSingleMeasurement(2); - expected = Document{{"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, - {"_id", 3}, - {"time", d3}, - {"a", 3}, - {"b", 2}}; - ASSERT_DOCUMENT_EQ(next, expected); - - next = unpacker.extractSingleMeasurement(1); - expected = Document{{"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, - {"_id", 2}, - {"time", d2}, - {"a", 2}, - {"b", 1}}; - ASSERT_DOCUMENT_EQ(next, expected); - - // Can we extract the middle element again? 
- next = unpacker.extractSingleMeasurement(1); - ASSERT_DOCUMENT_EQ(next, expected); -} - -TEST_F(InternalUnpackBucketExecTest, BucketUnpackerExtractSingleMeasurementSparse) { - auto expCtx = getExpCtx(); - - std::set<std::string> fields{ - "_id", kUserDefinedMetaName.toString(), kUserDefinedTimeName.toString(), "a", "b"}; - auto spec = BucketSpec{ - kUserDefinedTimeName.toString(), kUserDefinedMetaName.toString(), std::move(fields)}; - auto unpacker = BucketUnpacker{std::move(spec), BucketUnpacker::Behavior::kInclude, true, true}; - - auto d1 = dateFromISOString("2020-02-17T00:00:00.000Z").getValue(); - auto d2 = dateFromISOString("2020-02-17T01:00:00.000Z").getValue(); - auto bucket = BSON("meta" << BSON("m1" << 999 << "m2" << 9999) << "data" - << BSON("_id" << BSON("0" << 1 << "1" << 2) << "time" - << BSON("0" << d1 << "1" << d2) << "a" << BSON("0" << 1) - << "b" << BSON("1" << 1))); - - unpacker.reset(std::move(bucket)); - auto next = unpacker.extractSingleMeasurement(1); - auto expected = Document{ - {"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, {"_id", 2}, {"time", d2}, {"b", 1}}; - ASSERT_DOCUMENT_EQ(next, expected); - - // Can we extract the same element again? - next = unpacker.extractSingleMeasurement(1); - ASSERT_DOCUMENT_EQ(next, expected); - - next = unpacker.extractSingleMeasurement(0); - expected = Document{ - {"myMeta", Document{{"m1", 999}, {"m2", 9999}}}, {"_id", 1}, {"time", d1}, {"a", 1}}; - ASSERT_DOCUMENT_EQ(next, expected); - - // Can we extract the same element twice in a row? - next = unpacker.extractSingleMeasurement(0); - ASSERT_DOCUMENT_EQ(next, expected); - - next = unpacker.extractSingleMeasurement(0); - ASSERT_DOCUMENT_EQ(next, expected); -} - -class InternalUnpackBucketRandomSampleTest : public AggregationContextFixture { -protected: - BSONObj makeIncludeAllSpec() { - return BSON("$_internalUnpackBucket" - << BSON("include" << BSON_ARRAY("_id" - << "time" << kUserDefinedMetaName << "a" - << "b") - << timeseries::kTimeFieldName << kUserDefinedTimeName - << timeseries::kMetaFieldName << kUserDefinedMetaName)); - } - - boost::intrusive_ptr<DocumentSource> makeUnpackStage(const BSONObj& spec, - long long nSample, - int bucketMaxCount) { - auto ds = - DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), getExpCtx()); - auto unpack = dynamic_cast<DocumentSourceInternalUnpackBucket*>(ds.get()); - unpack->setSampleParameters(nSample, bucketMaxCount); - return unpack; - } - - boost::intrusive_ptr<DocumentSource> makeInternalUnpackBucketSample(int nSample, - int nBuckets, - int nMeasurements) { - auto spec = makeIncludeAllSpec(); - generateBuckets(nBuckets, nMeasurements); - auto ds = - DocumentSourceInternalUnpackBucket::createFromBson(spec.firstElement(), getExpCtx()); - auto unpack = dynamic_cast<DocumentSourceInternalUnpackBucket*>(ds.get()); - unpack->setSampleParameters(nSample, 1000); - return unpack; - } - - boost::intrusive_ptr<DocumentSource> prepareMock() { - auto mock = DocumentSourceMock::createForTest(getExpCtx()); - for (auto&& b : _buckets) { - mock->push_back(DocumentSource::GetNextResult{std::move(b)}); - } - return mock; - } - - Document makeBucketPart(int nMeasurements, std::function<Value(int)> gen) { - auto doc = MutableDocument{}; - for (auto i = 0; i < nMeasurements; ++i) { - doc.addField(std::to_string(i), gen(i)); - } - return doc.freeze(); - } - - void generateBuckets(int nBuckets, int nMeasurements) { - auto& prng = getExpCtx()->opCtx->getClient()->getPrng(); - std::vector<Document> buckets; - for (auto m = 0; m < 
nBuckets; m++) { - auto idDoc = makeBucketPart(nMeasurements, [](int i) { return Value{OID::gen()}; }); - auto timeDoc = makeBucketPart(nMeasurements, [](int i) { return Value{Date_t{}}; }); - auto aCol = makeBucketPart(nMeasurements, - [&](int i) { return Value{prng.nextCanonicalDouble()}; }); - buckets.push_back({Document{ - {"_id", Value{OID::gen()}}, - {"meta", Document{{"m1", m}, {"m2", m + 1}}}, - {"data", - Document{{"_id", idDoc}, {"time", std::move(timeDoc)}, {"a", std::move(aCol)}}}}}); - } - - _buckets = std::move(buckets); - } - -private: - std::vector<Document> _buckets; -}; - -TEST_F(InternalUnpackBucketRandomSampleTest, SampleHasExpectedStatProperties) { - auto unpack = makeInternalUnpackBucketSample(100, 1000, 1000); - auto mock = prepareMock(); - unpack->setSource(mock.get()); - - auto next = unpack->getNext(); - ASSERT_TRUE(next.isAdvanced()); - - auto avg = 0.0; - auto nSampled = 0; - while (next.isAdvanced()) { - avg += next.getDocument()["a"].getDouble(); - next = unpack->getNext(); - nSampled++; - } - avg /= nSampled; - ASSERT_EQ(nSampled, 100); - - // The average for the uniform distribution on [0, 1) is ~0.5, and the stdev is sqrt(1/12). - // We will check if the avg is between +/- 2*sqrt(1/12). - auto stddev = std::sqrt(1.0 / 12.0); - ASSERT_GT(avg, 0.5 - 2 * stddev); - ASSERT_LT(avg, 0.5 + 2 * stddev); -} - -TEST_F(InternalUnpackBucketRandomSampleTest, SampleIgnoresDuplicates) { - auto spec = BSON("$_internalUnpackBucket" - << BSON("include" << BSON_ARRAY("_id" - << "time" << kUserDefinedMetaName << "a" - << "b") - << timeseries::kTimeFieldName << kUserDefinedTimeName - << timeseries::kMetaFieldName << kUserDefinedMetaName)); - - // Make an unpack bucket stage initialized with a sample size of 2 and bucketMaxCount of 1. - auto unpack = makeUnpackStage(spec, 2, 1); - - // Fill mock with duplicate buckets to simulate random sampling the same buckets over and over - // again until the 'kMaxAttempts' are reached in 'doGetNext'. - auto mock = DocumentSourceMock::createForTest(getExpCtx()); - for (auto i = 0; i < 101; ++i) { - mock->push_back(Document{{"_id", Value{OID::createFromString("000000000000000000000001")}}, - {"meta", Document{{"m1", 1}, {"m2", 2}}}, - {"data", - Document{{"_id", Document{{"0", 1}}}, - {"time", Document{{"0", Date_t::now()}}}, - {"a", Document{{"0", 1}}}}}}); - } - unpack->setSource(mock.get()); - - // The sample size is 2 and there's only one unique measurement in the mock. The second - // 'getNext' call should spin until the it reaches 'kMaxAttempts' of tries and then throw. - ASSERT_TRUE(unpack->getNext().isAdvanced()); - ASSERT_THROWS_CODE(unpack->getNext(), AssertionException, 5422103); -} - -namespace { -/** - * Manually computes the timestamp object size for n timestamps. - */ -auto expectedTimestampObjSize(int32_t rowKeyOffset, int32_t n) { - BSONObjBuilder bob; - for (auto i = 0; i < n; ++i) { - bob.appendDate(std::to_string(i + rowKeyOffset), Date_t::now()); - } - return bob.done().objsize(); -} -} // namespace - -TEST_F(InternalUnpackBucketExecTest, ComputeMeasurementCountLowerBoundsAreCorrect) { - // The last table entry is a sentinel for an upper bound on the interval that covers measurement - // counts up to 16 MB. - const auto maxTableEntry = BucketUnpacker::kTimestampObjSizeTable.size() - 1; - - // Test the case when the target size hits a table entry which represents the lower bound of an - // interval. 
- for (size_t index = 0; index < maxTableEntry; ++index) { - auto interval = BucketUnpacker::kTimestampObjSizeTable[index]; - ASSERT_EQ(interval.first, BucketUnpacker::computeMeasurementCount(interval.second)); - } -} - -TEST_F(InternalUnpackBucketExecTest, ComputeMeasurementCountUpperBoundsAreCorrect) { - const auto maxTableEntry = BucketUnpacker::kTimestampObjSizeTable.size() - 1; - - // The lower bound sizes of each interval in the kTimestampObjSizeTable are hardcoded. Use this - // fact and walk the table backwards to check the correctness of the S_i'th interval's upper - // bound by using the lower bound size for the S_i+1 interval and subtracting the BSONObj size - // containing one timestamp with the appropriate rowKey. - std::pair<int, int> currentInterval; - auto currentIntervalSize = 0; - auto currentIntervalCount = 0; - auto size = 0; - for (size_t index = maxTableEntry; index > 0; --index) { - currentInterval = BucketUnpacker::kTimestampObjSizeTable[index]; - currentIntervalSize = currentInterval.second; - currentIntervalCount = currentInterval.first; - auto rowKey = currentIntervalCount - 1; - size = expectedTimestampObjSize(rowKey, 1); - // We need to add back the kMinBSONLength since it's subtracted out. - ASSERT_EQ(currentIntervalCount - 1, - BucketUnpacker::computeMeasurementCount(currentIntervalSize - size + - BSONObj::kMinBSONLength)); - } -} - -TEST_F(InternalUnpackBucketExecTest, ComputeMeasurementCountAllPointsInSmallerIntervals) { - // Test all values for some of the smaller intervals up to 100 measurements. - for (auto bucketCount = 0; bucketCount < 25; ++bucketCount) { - auto size = expectedTimestampObjSize(0, bucketCount); - ASSERT_EQ(bucketCount, BucketUnpacker::computeMeasurementCount(size)); - } -} - -TEST_F(InternalUnpackBucketExecTest, ComputeMeasurementCountInLargerIntervals) { - ASSERT_EQ(2222, BucketUnpacker::computeMeasurementCount(30003)); - ASSERT_EQ(11111, BucketUnpacker::computeMeasurementCount(155560)); - ASSERT_EQ(449998, BucketUnpacker::computeMeasurementCount(7088863)); -} } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 1750ce38027..12131e95780 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -45,8 +45,10 @@ #include "mongo/db/exec/fetch.h" #include "mongo/db/exec/multi_iterator.h" #include "mongo/db/exec/queued_data_stage.h" +#include "mongo/db/exec/sample_from_timeseries_bucket.h" #include "mongo/db/exec/shard_filter.h" #include "mongo/db/exec/trial_stage.h" +#include "mongo/db/exec/unpack_timeseries_bucket.h" #include "mongo/db/exec/working_set.h" #include "mongo/db/index/index_access_method.h" #include "mongo/db/matcher/extensions_callback_real.h" @@ -101,7 +103,8 @@ StatusWith<std::pair<unique_ptr<PlanExecutor, PlanExecutor::Deleter>, bool>> createRandomCursorExecutor(const CollectionPtr& coll, const boost::intrusive_ptr<ExpressionContext>& expCtx, long long sampleSize, - long long numRecords) { + long long numRecords, + boost::optional<BucketUnpacker> bucketUnpacker) { OperationContext* opCtx = expCtx->opCtx; // Verify that we are already under a collection lock. We avoid taking locks ourselves in this @@ -139,14 +142,14 @@ createRandomCursorExecutor(const CollectionPtr& coll, // cursor may have been mistaken. 
For sharded collections, build a TRIAL plan that will switch // to a collection scan if the ratio of orphaned to owned documents encountered over the first // 100 works() is such that we would have chosen not to optimize. - if (collectionFilter.isSharded()) { + static const size_t kMaxPresampleSize = 100; + if (collectionFilter.isSharded() && !expCtx->ns.isTimeseriesBucketsCollection()) { // The ratio of owned to orphaned documents must be at least equal to the ratio between the // requested sampleSize and the maximum permitted sampleSize for the original constraints to // be satisfied. For instance, if there are 200 documents and the sampleSize is 5, then at // least (5 / (200*0.05)) = (5/10) = 50% of those documents must be owned. If less than 5% // of the documents in the collection are owned, we default to the backup plan. - static const size_t kMaxPresampleSize = 100; - const auto minWorkAdvancedRatio = std::max( + const auto minAdvancedToWorkRatio = std::max( sampleSize / (numRecords * kMaxSampleRatioForRandCursor), kMaxSampleRatioForRandCursor); // The trial plan is SHARDING_FILTER-MULTI_ITERATOR. auto randomCursorPlan = std::make_unique<ShardFilterStage>( @@ -162,7 +165,65 @@ createRandomCursorExecutor(const CollectionPtr& coll, std::move(randomCursorPlan), std::move(collScanPlan), kMaxPresampleSize, - minWorkAdvancedRatio); + minAdvancedToWorkRatio); + trialStage = static_cast<TrialStage*>(root.get()); + } else if (expCtx->ns.isTimeseriesBucketsCollection()) { + // Use a 'TrialStage' to run a trial between 'SampleFromTimeseriesBucket' and + // 'UnpackTimeseriesBucket' with $sample left in the pipeline in-place. If the buckets are + // not sufficiently full, or the 'SampleFromTimeseriesBucket' plan draws too many + // duplicates, then we will fall back to the 'TrialStage' backup plan. This backup plan uses + // the top-k sort sampling approach. + // + // Suppose the 'gTimeseriesBucketMaxCount' is 1000, but each bucket only contains 500 + // documents on average. The observed trial advanced/work ratio approximates the average + // bucket fullness, noted here as "abf". In this example, abf = 500 / 1000 = 0.5. + // Experiments have shown that the optimized 'SampleFromTimeseriesBucket' algorithm performs + // better than backup plan when + // + // sampleSize < 0.02 * abf * numRecords * gTimeseriesBucketMaxCount + // + // This inequality can be rewritten as + // + // abf > sampleSize / (0.02 * numRecords * gTimeseriesBucketMaxCount) + // + // Therefore, if the advanced/work ratio exceeds this threshold, we will use the + // 'SampleFromTimeseriesBucket' plan. Note that as the sample size requested by the user + // becomes larger with respect to the number of buckets, we require a higher advanced/work + // ratio in order to justify using 'SampleFromTimeseriesBucket'. + // + // Additionally, we require the 'TrialStage' to approximate the abf as at least 0.25. When + // buckets are mostly empty, the 'SampleFromTimeseriesBucket' will be inefficient due to a + // lot of sampling "misses". 
+ static const auto kCoefficient = 0.02; + static const auto kMinBucketFullness = 0.25; + const auto minAdvancedToWorkRatio = std::max( + std::min(sampleSize / (kCoefficient * numRecords * gTimeseriesBucketMaxCount), 1.0), + kMinBucketFullness); + + auto arhashPlan = std::make_unique<SampleFromTimeseriesBucket>( + expCtx.get(), + ws.get(), + std::move(root), + *bucketUnpacker, + // By using a quantity slightly higher than 'kMaxPresampleSize', we ensure that the + // 'SampleFromTimeseriesBucket' stage won't fail due to too many consecutive sampling + // attempts during the 'TrialStage's trial period. + kMaxPresampleSize + 5, + sampleSize, + gTimeseriesBucketMaxCount); + + std::unique_ptr<PlanStage> collScanPlan = std::make_unique<CollectionScan>( + expCtx.get(), coll, CollectionScanParams{}, ws.get(), nullptr); + + auto topkSortPlan = std::make_unique<UnpackTimeseriesBucket>( + expCtx.get(), ws.get(), std::move(collScanPlan), *bucketUnpacker); + + root = std::make_unique<TrialStage>(expCtx.get(), + ws.get(), + std::move(arhashPlan), + std::move(topkSortPlan), + kMaxPresampleSize, + minAdvancedToWorkRatio); trialStage = static_cast<TrialStage*>(root.get()); } @@ -365,32 +426,38 @@ PipelineD::buildInnerQueryExecutorSample(DocumentSourceSample* sampleStage, const long long sampleSize = sampleStage->getSampleSize(); const long long numRecords = collection->getRecordStore()->numRecords(expCtx->opCtx); - auto&& [exec, isStorageOptimizedSample] = - uassertStatusOK(createRandomCursorExecutor(collection, expCtx, sampleSize, numRecords)); + + boost::optional<BucketUnpacker> bucketUnpacker; + if (unpackBucketStage) { + bucketUnpacker = unpackBucketStage->bucketUnpacker(); + } + auto&& [exec, isStorageOptimizedSample] = uassertStatusOK(createRandomCursorExecutor( + collection, expCtx, sampleSize, numRecords, std::move(bucketUnpacker))); AttachExecutorCallback attachExecutorCallback; if (exec) { - if (isStorageOptimizedSample) { - if (!unpackBucketStage) { + if (!unpackBucketStage) { + if (isStorageOptimizedSample) { // Replace $sample stage with $sampleFromRandomCursor stage. pipeline->popFront(); std::string idString = collection->ns().isOplog() ? "ts" : "_id"; pipeline->addInitialSource(DocumentSourceSampleFromRandomCursor::create( expCtx, sampleSize, idString, numRecords)); - } else { + } + } else { + if (isStorageOptimizedSample) { // If there are non-nullptrs for 'sampleStage' and 'unpackBucketStage', then // 'unpackBucketStage' is at the front of the pipeline immediately followed by a - // 'sampleStage'. Coalesce a $_internalUnpackBucket followed by a $sample. - unpackBucketStage->setSampleParameters(sampleSize, gTimeseriesBucketMaxCount); - sources.erase(std::next(sources.begin())); - - // Fix the source for the next stage by pointing it to the $_internalUnpackBucket - // stage. - auto sourcesIt = sources.begin(); - if (std::next(sourcesIt) != sources.end()) { - ++sourcesIt; - (*sourcesIt)->setSource(unpackBucketStage); - } + // 'sampleStage'. We need to use a TrialStage approach to handle a problem where + // ARHASH sampling can fail due to small measurement counts. We can push sampling + // and bucket unpacking down to the PlanStage layer and erase $_internalUnpackBucket + // and $sample. + sources.erase(sources.begin()); + sources.erase(sources.begin()); + } else { + // The TrialStage chose the backup plan and we need to erase just the + // $_internalUnpackBucket stage and leave $sample where it is. 
+ sources.erase(sources.begin()); } } diff --git a/src/mongo/db/query/classic_stage_builder.cpp b/src/mongo/db/query/classic_stage_builder.cpp index bc9a9ab756b..ea83f27bd56 100644 --- a/src/mongo/db/query/classic_stage_builder.cpp +++ b/src/mongo/db/query/classic_stage_builder.cpp @@ -416,9 +416,11 @@ std::unique_ptr<PlanStage> ClassicStageBuilder::build(const QuerySolutionNode* r case STAGE_MULTI_PLAN: case STAGE_QUEUED_DATA: case STAGE_RECORD_STORE_FAST_COUNT: + case STAGE_SAMPLE_FROM_TIMESERIES_BUCKET: case STAGE_SUBPLAN: case STAGE_TRIAL: case STAGE_UNKNOWN: + case STAGE_UNPACK_TIMESERIES_BUCKET: case STAGE_UPDATE: { LOGV2_WARNING(4615604, "Can't build exec tree for node", "node"_attr = *root); } diff --git a/src/mongo/db/query/plan_explainer_impl.cpp b/src/mongo/db/query/plan_explainer_impl.cpp index f75c88d140b..17023980407 100644 --- a/src/mongo/db/query/plan_explainer_impl.cpp +++ b/src/mongo/db/query/plan_explainer_impl.cpp @@ -427,6 +427,15 @@ void statsToBSON(const PlanStageStats& stats, bob->appendNumber("nCounted", spec->nCounted); bob->appendNumber("nSkipped", spec->nSkipped); } + } else if (STAGE_SAMPLE_FROM_TIMESERIES_BUCKET == stats.stageType) { + SampleFromTimeseriesBucketStats* spec = + static_cast<SampleFromTimeseriesBucketStats*>(stats.specific.get()); + + if (verbosity >= ExplainOptions::Verbosity::kExecStats) { + bob->appendNumber("nBucketsDiscarded", static_cast<long long>(spec->nBucketsDiscarded)); + bob->appendNumber("dupsTested", static_cast<long long>(spec->dupsTested)); + bob->appendNumber("dupsDropped", static_cast<long long>(spec->dupsDropped)); + } } else if (STAGE_SHARDING_FILTER == stats.stageType) { ShardingFilterStats* spec = static_cast<ShardingFilterStats*>(stats.specific.get()); @@ -477,6 +486,13 @@ void statsToBSON(const PlanStageStats& stats, if (verbosity >= ExplainOptions::Verbosity::kExecStats) { bob->appendNumber("docsExamined", static_cast<long long>(spec->fetches)); } + } else if (STAGE_UNPACK_TIMESERIES_BUCKET == stats.stageType) { + UnpackTimeseriesBucketStats* spec = + static_cast<UnpackTimeseriesBucketStats*>(stats.specific.get()); + + if (verbosity >= ExplainOptions::Verbosity::kExecStats) { + bob->appendNumber("nBucketsUnpacked", static_cast<long long>(spec->nBucketsUnpacked)); + } } else if (STAGE_UPDATE == stats.stageType) { UpdateStats* spec = static_cast<UpdateStats*>(stats.specific.get()); diff --git a/src/mongo/db/query/stage_types.cpp b/src/mongo/db/query/stage_types.cpp index 6d3b0f1fd2b..9c54f668495 100644 --- a/src/mongo/db/query/stage_types.cpp +++ b/src/mongo/db/query/stage_types.cpp @@ -61,6 +61,7 @@ StringData stageTypeToString(StageType stageType) { {STAGE_QUEUED_DATA, "QUEUED_DATA"_sd}, {STAGE_RECORD_STORE_FAST_COUNT, "RECORD_STORE_FAST_COUNT"_sd}, {STAGE_RETURN_KEY, "RETURN_KEY"_sd}, + {STAGE_SAMPLE_FROM_TIMESERIES_BUCKET, "SAMPLE_FROM_TIMESERIES_BUCKET"_sd}, {STAGE_SHARDING_FILTER, "SHARDING_FILTER"_sd}, {STAGE_SKIP, "SKIP"_sd}, {STAGE_SORT_DEFAULT, "SORT"_sd}, @@ -72,6 +73,7 @@ StringData stageTypeToString(StageType stageType) { {STAGE_TEXT_MATCH, "TEXT_MATCH"_sd}, {STAGE_TRIAL, "TRIAL"_sd}, {STAGE_UNKNOWN, "UNKNOWN"_sd}, + {STAGE_UNPACK_TIMESERIES_BUCKET, "UNPACK_TIMESERIES_BUCKET"_sd}, {STAGE_UPDATE, "UPDATE"_sd}, }; if (auto it = kStageTypesMap.find(stageType); it != kStageTypesMap.end()) { diff --git a/src/mongo/db/query/stage_types.h b/src/mongo/db/query/stage_types.h index a0dc411028b..fd3ce6fa261 100644 --- a/src/mongo/db/query/stage_types.h +++ b/src/mongo/db/query/stage_types.h @@ -102,6 +102,7 @@ enum 
StageType { STAGE_QUEUED_DATA, STAGE_RECORD_STORE_FAST_COUNT, STAGE_RETURN_KEY, + STAGE_SAMPLE_FROM_TIMESERIES_BUCKET, STAGE_SHARDING_FILTER, STAGE_SKIP, @@ -121,6 +122,8 @@ enum StageType { STAGE_UNKNOWN, + STAGE_UNPACK_TIMESERIES_BUCKET, + STAGE_UPDATE, };
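
The two algorithmic comments in this diff -- the measurement-count interpolation in bucket_unpacker.cpp and the advanced/work-ratio threshold used to arbitrate between the 'SampleFromTimeseriesBucket' and 'UnpackTimeseriesBucket' plans in pipeline_d.cpp -- can be exercised outside the server tree. The sketch below is not MongoDB source: the table values, the interpolation formula, and the 0.02 / 0.25 constants are copied from the diff, while the surrounding names (kTable, minAdvancedToWorkRatio, the main() driver) and the plain asserts are illustrative assumptions.

#include <algorithm>
#include <array>
#include <cassert>
#include <cstdint>
#include <utility>

namespace {
// {measurement count b_i, byte size S_i of the data.time BSONObj holding b_i timestamps}.
// 5 is the size of an empty BSONObj (BSONObj::kMinBSONLength).
constexpr std::array<std::pair<int32_t, int32_t>, 8> kTable{{{0, 5},
                                                             {10, 115},
                                                             {100, 1195},
                                                             {1000, 12895},
                                                             {10000, 138895},
                                                             {100000, 1488895},
                                                             {1000000, 15888895},
                                                             {10000000, 168888895}}};

// O(1) measurement count from the timestamp column size, following the description of
// BucketUnpacker::computeMeasurementCount(). Assumes the target size does not exceed the
// last table entry (the 16 MB sentinel).
int computeMeasurementCount(int targetTimestampObjSize) {
    auto it = std::find_if(kTable.begin(), kTable.end(), [&](const auto& e) {
        return targetTimestampObjSize <= e.second;
    });
    if (it->second == targetTimestampObjSize) {
        return it->first;  // Exact hit on an interval's lower bound.
    }
    assert(it != kTable.begin());  // Mirrors tassert 5422104.
    --it;                          // Interval that actually contains the target size.

    // Row keys in this interval have (index + 1) decimal digits, and each timestamp element
    // costs 10 bytes of fixed overhead plus the key length, so interpolate linearly.
    auto nDigitsInRowKey = 1 + (it - kTable.begin());
    return it->first + (targetTimestampObjSize - it->second) / (10 + nDigitsInRowKey);
}

// Advanced/work ratio the TrialStage must observe for the ARHASH plan to win, per the
// comment in createRandomCursorExecutor().
double minAdvancedToWorkRatio(long long sampleSize, long long numRecords, int bucketMaxCount) {
    static const auto kCoefficient = 0.02;
    static const auto kMinBucketFullness = 0.25;
    return std::max(std::min(sampleSize / (kCoefficient * numRecords * bucketMaxCount), 1.0),
                    kMinBucketFullness);
}
}  // namespace

int main() {
    // Expected values taken from ComputeMeasurementCountInLargerIntervals in the diff.
    assert(computeMeasurementCount(30003) == 2222);
    assert(computeMeasurementCount(155560) == 11111);
    assert(computeMeasurementCount(7088863) == 449998);

    // A small sample over many full-sized buckets bottoms out at the 0.25 fullness floor.
    assert(minAdvancedToWorkRatio(100, 1000, 1000) == 0.25);
    return 0;
}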