diff options
Diffstat (limited to 'src/mongo/db/pipeline/document_source_internal_unpack_bucket.h')
-rw-r--r-- | src/mongo/db/pipeline/document_source_internal_unpack_bucket.h | 197 |
1 files changed, 5 insertions, 192 deletions
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h index c5133643874..051cc2161ca 100644 --- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h +++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h @@ -31,160 +31,11 @@ #include <set> +#include "mongo/db/exec/bucket_unpacker.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_match.h" namespace mongo { - -/** - * Carries parameters for unpacking a bucket. - */ -struct BucketSpec { - // The user-supplied timestamp field name specified during time-series collection creation. - std::string timeField; - - // An optional user-supplied metadata field name specified during time-series collection - // creation. This field name is used during materialization of metadata fields of a measurement - // after unpacking. - boost::optional<std::string> metaField; - - // The set of field names in the data region that should be included or excluded. - std::set<std::string> fieldSet; - - // Vector of computed meta field projection names. Added at the end of materialized - // measurements. - std::vector<std::string> computedMetaProjFields; -}; - - -/** - * BucketUnpacker will unpack bucket fields for metadata and the provided fields. - */ -class BucketUnpacker { -public: - // A table that is useful for interpolations between the number of measurements in a bucket and - // the byte size of a bucket's data section timestamp column. Each table entry is a pair (b_i, - // S_i), where b_i is the number of measurements in the bucket and S_i is the byte size of the - // timestamp BSONObj. The table is bounded by 16 MB (2 << 23 bytes) where the table entries are - // pairs of b_i and S_i for the lower bounds of the row key digit intervals [0, 9], [10, 99], - // [100, 999], [1000, 9999] and so on. The last entry in the table, S7, is the first entry to - // exceed the server BSON object limit of 16 MB. - static constexpr std::array<std::pair<int32_t, int32_t>, 8> kTimestampObjSizeTable{ - {{0, BSONObj::kMinBSONLength}, - {10, 115}, - {100, 1195}, - {1000, 12895}, - {10000, 138895}, - {100000, 1488895}, - {1000000, 15888895}, - {10000000, 168888895}}}; - - /** - * Given the size of a BSONObj timestamp column, formatted as it would be in a time-series - * system.buckets.X collection, returns the number of measurements in the bucket in O(1) time. - */ - static int computeMeasurementCount(int targetTimestampObjSize); - - // Set of field names reserved for time-series buckets. - static const std::set<StringData> reservedBucketFieldNames; - - // When BucketUnpacker is created with kInclude it must produce measurements that contain the - // set of fields. Otherwise, if the kExclude option is used, the measurements will include the - // set difference between all fields in the bucket and the provided fields. - enum class Behavior { kInclude, kExclude }; - - BucketUnpacker(BucketSpec spec, - Behavior unpackerBehavior, - bool includeTimeField, - bool includeMetaField) - : _spec(std::move(spec)), - _unpackerBehavior(unpackerBehavior), - _includeTimeField(includeTimeField), - _includeMetaField(includeMetaField) {} - - /** - * This method will continue to materialize Documents until the bucket is exhausted. A - * precondition of this method is that 'hasNext()' must be true. - */ - Document getNext(); - - /** - * This method will extract the j-th measurement from the bucket. A precondition of this method - * is that j >= 0 && j <= the number of measurements within the underlying bucket. - */ - Document extractSingleMeasurement(int j); - - bool hasNext() const { - return _timeFieldIter && _timeFieldIter->more(); - } - - /** - * This resets the unpacker to prepare to unpack a new bucket described by the given document. - */ - void reset(BSONObj&& bucket); - - Behavior behavior() const { - return _unpackerBehavior; - } - - const BucketSpec& bucketSpec() const { - return _spec; - } - - const BSONObj& bucket() const { - return _bucket; - } - - bool includeMetaField() const { - return _includeMetaField; - } - - bool includeTimeField() const { - return _includeTimeField; - } - - int32_t numberOfMeasurements() const { - return _numberOfMeasurements; - } - - void setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior behavior); - - // Add computed meta projection names to the bucket specification. - void addComputedMetaProjFields(const std::vector<StringData>& computedFieldNames); - -private: - BucketSpec _spec; - Behavior _unpackerBehavior; - - // Iterates the timestamp section of the bucket to drive the unpacking iteration. - boost::optional<BSONObjIterator> _timeFieldIter; - - // A flag used to mark that the timestamp value should be materialized in measurements. - bool _includeTimeField; - - // A flag used to mark that a bucket's metadata value should be materialized in measurements. - bool _includeMetaField; - - // The bucket being unpacked. - BSONObj _bucket; - - // Since the metadata value is the same across all materialized measurements we can cache the - // metadata BSONElement in the reset phase and use it to materialize the metadata in each - // measurement. - BSONElement _metaValue; - - // Iterators used to unpack the columns of the above bucket that are populated during the reset - // phase according to the provided 'Behavior' and 'BucketSpec'. - std::vector<std::pair<std::string, BSONObjIterator>> _fieldIters; - - // Map <name, BSONElement> for the computed meta field projections. Updated for - // every bucket upon reset(). - stdx::unordered_map<std::string, BSONElement> _computedMetaProjections; - - // The number of measurements in the bucket. - int32_t _numberOfMeasurements = 0; -}; - class DocumentSourceInternalUnpackBucket : public DocumentSource { public: static constexpr StringData kStageName = "$_internalUnpackBucket"_sd; @@ -244,6 +95,10 @@ public: return boost::none; }; + BucketUnpacker bucketUnpacker() const { + return _bucketUnpacker; + } + Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) final; @@ -323,53 +178,11 @@ public: Pipeline::SourceContainer* container); private: - /** - * Carries the bucket _id and index for the measurement that was sampled by - * 'sampleRandomBucketOptimized'. - */ - struct SampledMeasurementKey { - SampledMeasurementKey(OID bucketId, int64_t measurementIndex) - : bucketId(bucketId), measurementIndex(measurementIndex) {} - - bool operator==(const SampledMeasurementKey& key) const { - return this->bucketId == key.bucketId && this->measurementIndex == key.measurementIndex; - } - - OID bucketId; - int32_t measurementIndex; - }; - - /** - * Computes a hash of 'SampledMeasurementKey' so measurements that have already been seen can - * be kept track of for de-duplication after sampling. - */ - struct SampledMeasurementKeyHasher { - size_t operator()(const SampledMeasurementKey& s) const { - return absl::Hash<uint64_t>{}(s.bucketId.view().read<uint64_t>()) ^ - absl::Hash<uint32_t>{}(s.bucketId.view().read<uint32_t>(8)) ^ - absl::Hash<int32_t>{}(s.measurementIndex); - } - }; - - // Tracks which measurements have been seen so far. This is only used when sampling is enabled - // for the purpose of de-duplicating measurements. - using SeenSet = stdx::unordered_set<SampledMeasurementKey, SampledMeasurementKeyHasher>; - GetNextResult doGetNext() final; - /** - * Keeps trying to sample a unique measurement by using the optimized ARHASH algorithm up to a - * hardcoded maximum number of attempts. If a unique measurement isn't found before the maximum - * number of tries is exhausted this method will throw. - */ - GetNextResult sampleUniqueMeasurementFromBuckets(); - BucketUnpacker _bucketUnpacker; - long long _nSampledSoFar = 0; int _bucketMaxCount = 0; boost::optional<long long> _sampleSize; - - SeenSet _seenSet; }; } // namespace mongo |