summary | refs | log | tree | commit | diff
path: root/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/document_source_internal_unpack_bucket.h')
-rw-r--r-- src/mongo/db/pipeline/document_source_internal_unpack_bucket.h 197
1 files changed, 5 insertions, 192 deletions
diff --git a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
index c5133643874..051cc2161ca 100644
--- a/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
+++ b/src/mongo/db/pipeline/document_source_internal_unpack_bucket.h
@@ -31,160 +31,11 @@
#include <set>
+#include "mongo/db/exec/bucket_unpacker.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_match.h"
namespace mongo {
-
-/**
- * Carries parameters for unpacking a bucket.
- */
-struct BucketSpec {
- // The user-supplied timestamp field name specified during time-series collection creation.
- std::string timeField;
-
- // An optional user-supplied metadata field name specified during time-series collection
- // creation. This field name is used during materialization of metadata fields of a measurement
- // after unpacking.
- boost::optional<std::string> metaField;
-
- // The set of field names in the data region that should be included or excluded.
- std::set<std::string> fieldSet;
-
- // Vector of computed meta field projection names. Added at the end of materialized
- // measurements.
- std::vector<std::string> computedMetaProjFields;
-};
-
-
-/**
- * BucketUnpacker will unpack bucket fields for metadata and the provided fields.
- */
-class BucketUnpacker {
-public:
- // A table that is useful for interpolations between the number of measurements in a bucket and
- // the byte size of a bucket's data section timestamp column. Each table entry is a pair (b_i,
- // S_i), where b_i is the number of measurements in the bucket and S_i is the byte size of the
- // timestamp BSONObj. The table is bounded by 16 MB (2 << 23 bytes) where the table entries are
- // pairs of b_i and S_i for the lower bounds of the row key digit intervals [0, 9], [10, 99],
- // [100, 999], [1000, 9999] and so on. The last entry in the table, S7, is the first entry to
- // exceed the server BSON object limit of 16 MB.
- static constexpr std::array<std::pair<int32_t, int32_t>, 8> kTimestampObjSizeTable{
- {{0, BSONObj::kMinBSONLength},
- {10, 115},
- {100, 1195},
- {1000, 12895},
- {10000, 138895},
- {100000, 1488895},
- {1000000, 15888895},
- {10000000, 168888895}}};
-
- /**
- * Given the size of a BSONObj timestamp column, formatted as it would be in a time-series
- * system.buckets.X collection, returns the number of measurements in the bucket in O(1) time.
- */
- static int computeMeasurementCount(int targetTimestampObjSize);
-
- // Set of field names reserved for time-series buckets.
- static const std::set<StringData> reservedBucketFieldNames;
-
- // When BucketUnpacker is created with kInclude it must produce measurements that contain the
- // set of fields. Otherwise, if the kExclude option is used, the measurements will include the
- // set difference between all fields in the bucket and the provided fields.
- enum class Behavior { kInclude, kExclude };
-
- BucketUnpacker(BucketSpec spec,
- Behavior unpackerBehavior,
- bool includeTimeField,
- bool includeMetaField)
- : _spec(std::move(spec)),
- _unpackerBehavior(unpackerBehavior),
- _includeTimeField(includeTimeField),
- _includeMetaField(includeMetaField) {}
-
- /**
- * This method will continue to materialize Documents until the bucket is exhausted. A
- * precondition of this method is that 'hasNext()' must be true.
- */
- Document getNext();
-
- /**
- * This method will extract the j-th measurement from the bucket. A precondition of this method
- * is that j >= 0 && j < the number of measurements within the underlying bucket.
- */
- Document extractSingleMeasurement(int j);
-
- bool hasNext() const {
- return _timeFieldIter && _timeFieldIter->more();
- }
-
- /**
- * This resets the unpacker to prepare to unpack a new bucket described by the given document.
- */
- void reset(BSONObj&& bucket);
-
- Behavior behavior() const {
- return _unpackerBehavior;
- }
-
- const BucketSpec& bucketSpec() const {
- return _spec;
- }
-
- const BSONObj& bucket() const {
- return _bucket;
- }
-
- bool includeMetaField() const {
- return _includeMetaField;
- }
-
- bool includeTimeField() const {
- return _includeTimeField;
- }
-
- int32_t numberOfMeasurements() const {
- return _numberOfMeasurements;
- }
-
- void setBucketSpecAndBehavior(BucketSpec&& bucketSpec, Behavior behavior);
-
- // Add computed meta projection names to the bucket specification.
- void addComputedMetaProjFields(const std::vector<StringData>& computedFieldNames);
-
-private:
- BucketSpec _spec;
- Behavior _unpackerBehavior;
-
- // Iterates the timestamp section of the bucket to drive the unpacking iteration.
- boost::optional<BSONObjIterator> _timeFieldIter;
-
- // A flag used to mark that the timestamp value should be materialized in measurements.
- bool _includeTimeField;
-
- // A flag used to mark that a bucket's metadata value should be materialized in measurements.
- bool _includeMetaField;
-
- // The bucket being unpacked.
- BSONObj _bucket;
-
- // Since the metadata value is the same across all materialized measurements we can cache the
- // metadata BSONElement in the reset phase and use it to materialize the metadata in each
- // measurement.
- BSONElement _metaValue;
-
- // Iterators used to unpack the columns of the above bucket that are populated during the reset
- // phase according to the provided 'Behavior' and 'BucketSpec'.
- std::vector<std::pair<std::string, BSONObjIterator>> _fieldIters;
-
- // Map <name, BSONElement> for the computed meta field projections. Updated for
- // every bucket upon reset().
- stdx::unordered_map<std::string, BSONElement> _computedMetaProjections;
-
- // The number of measurements in the bucket.
- int32_t _numberOfMeasurements = 0;
-};
-
class DocumentSourceInternalUnpackBucket : public DocumentSource {
public:
static constexpr StringData kStageName = "$_internalUnpackBucket"_sd;
@@ -244,6 +95,10 @@ public:
return boost::none;
};
+ BucketUnpacker bucketUnpacker() const {
+ return _bucketUnpacker;
+ }
+
Pipeline::SourceContainer::iterator doOptimizeAt(Pipeline::SourceContainer::iterator itr,
Pipeline::SourceContainer* container) final;
@@ -323,53 +178,11 @@ public:
Pipeline::SourceContainer* container);
private:
- /**
- * Carries the bucket _id and index for the measurement that was sampled by
- * 'sampleRandomBucketOptimized'.
- */
- struct SampledMeasurementKey {
- SampledMeasurementKey(OID bucketId, int64_t measurementIndex)
- : bucketId(bucketId), measurementIndex(measurementIndex) {}
-
- bool operator==(const SampledMeasurementKey& key) const {
- return this->bucketId == key.bucketId && this->measurementIndex == key.measurementIndex;
- }
-
- OID bucketId;
- int32_t measurementIndex;
- };
-
- /**
- * Computes a hash of 'SampledMeasurementKey' so measurements that have already been seen can
- * be kept track of for de-duplication after sampling.
- */
- struct SampledMeasurementKeyHasher {
- size_t operator()(const SampledMeasurementKey& s) const {
- return absl::Hash<uint64_t>{}(s.bucketId.view().read<uint64_t>()) ^
- absl::Hash<uint32_t>{}(s.bucketId.view().read<uint32_t>(8)) ^
- absl::Hash<int32_t>{}(s.measurementIndex);
- }
- };
-
- // Tracks which measurements have been seen so far. This is only used when sampling is enabled
- // for the purpose of de-duplicating measurements.
- using SeenSet = stdx::unordered_set<SampledMeasurementKey, SampledMeasurementKeyHasher>;
-
GetNextResult doGetNext() final;
- /**
- * Keeps trying to sample a unique measurement by using the optimized ARHASH algorithm up to a
- * hardcoded maximum number of attempts. If a unique measurement isn't found before the maximum
- * number of tries is exhausted this method will throw.
- */
- GetNextResult sampleUniqueMeasurementFromBuckets();
-
BucketUnpacker _bucketUnpacker;
- long long _nSampledSoFar = 0;
int _bucketMaxCount = 0;
boost::optional<long long> _sampleSize;
-
- SeenSet _seenSet;
};
} // namespace mongo