diff options
Diffstat (limited to 'src/mongo/db/timeseries/bucket_catalog.h')
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog.h | 73 |
1 files changed, 69 insertions, 4 deletions
diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h index 2df33182d31..c2a82039ee8 100644 --- a/src/mongo/db/timeseries/bucket_catalog.h +++ b/src/mongo/db/timeseries/bucket_catalog.h @@ -67,9 +67,14 @@ class BucketCatalog { AtomicWord<long long> numBucketsClosedDueToTimeForward; AtomicWord<long long> numBucketsClosedDueToTimeBackward; AtomicWord<long long> numBucketsClosedDueToMemoryThreshold; + AtomicWord<long long> numBucketsArchivedDueToTimeForward; + AtomicWord<long long> numBucketsArchivedDueToTimeBackward; + AtomicWord<long long> numBucketsArchivedDueToMemoryThreshold; AtomicWord<long long> numCommits; AtomicWord<long long> numWaits; AtomicWord<long long> numMeasurementsCommitted; + AtomicWord<long long> numBucketsReopened; + AtomicWord<long long> numBucketsKeptOpenDueToLargeMeasurements; }; class ExecutionStatsController { @@ -87,9 +92,14 @@ class BucketCatalog { void incNumBucketsClosedDueToTimeForward(long long increment = 1); void incNumBucketsClosedDueToTimeBackward(long long increment = 1); void incNumBucketsClosedDueToMemoryThreshold(long long increment = 1); + void incNumBucketsArchivedDueToTimeForward(long long increment = 1); + void incNumBucketsArchivedDueToTimeBackward(long long increment = 1); + void incNumBucketsArchivedDueToMemoryThreshold(long long increment = 1); void incNumCommits(long long increment = 1); void incNumWaits(long long increment = 1); void incNumMeasurementsCommitted(long long increment = 1); + void incNumBucketsReopened(long long increment = 1); + void incNumBucketsKeptOpenDueToLargeMeasurements(long long increment = 1); private: std::shared_ptr<ExecutionStats> _collectionStats; @@ -117,6 +127,7 @@ public: OID bucketId; std::string timeField; uint32_t numMeasurements; + bool eligibleForReopening = false; }; using ClosedBuckets = std::vector<ClosedBucket>; @@ -179,7 +190,7 @@ public: /** * Records a set of new-to-the-bucket fields. Active batches only. */ - void _recordNewFields(NewFieldNames&& fields); + void _recordNewFields(Bucket* bucket, NewFieldNames&& fields); /** * Prepares the batch for commit. Sets min/max appropriately, records the number of @@ -230,6 +241,13 @@ public: BucketCatalog operator=(const BucketCatalog&) = delete; /** + * Reopens a closed bucket into the catalog given the bucket document. + */ + Status reopenBucket(OperationContext* opCtx, + const CollectionPtr& coll, + const BSONObj& bucketDoc); + + /** * Returns the metadata for the given bucket in the following format: * {<metadata field name>: <value>} * All measurements in the given bucket share same metadata value. @@ -354,12 +372,14 @@ private: * Key to lookup open Bucket for namespace and metadata, with pre-computed hash. */ struct BucketKey { + using Hash = std::size_t; + BucketKey() = delete; BucketKey(const NamespaceString& nss, const BucketMetadata& meta); NamespaceString ns; BucketMetadata metadata; - std::size_t hash; + Hash hash; bool operator==(const BucketKey& other) const { return ns == other.ns && metadata == other.metadata; @@ -379,6 +399,23 @@ private: }; /** + * Hasher to support using a pre-computed hash as a key without having to compute another hash. + */ + struct PreHashed { + std::size_t operator()(const BucketKey::Hash& key) const; + }; + + /** + * Information of a Bucket that got archived while performing an operation on this + * BucketCatalog. + */ + struct ArchivedBucket { + OID bucketId; + std::string timeField; + uint32_t numMeasurements; + }; + + /** * Struct to hold a portion of the buckets managed by the catalog. * * Each of the bucket lists, as well as the buckets themselves, are protected by 'mutex'. @@ -397,6 +434,12 @@ private: // Buckets that do not have any outstanding writes. using IdleList = std::list<Bucket*>; IdleList idleBuckets; + + // Buckets that are not currently in the catalog, but which are eligible to receive more + // measurements. The top-level map is keyed by the hash of the BucketKey, while the stored + // map is keyed by the bucket's minimum timestamp. + stdx::unordered_map<BucketKey::Hash, std::map<Date_t, ArchivedBucket>, PreHashed> + archivedBuckets; }; StripeNumber _getStripeNumber(const BucketKey& key); @@ -444,7 +487,13 @@ private: /** * Removes the given bucket from the bucket catalog's internal data structures. */ - bool _removeBucket(Stripe* stripe, WithLock stripeLock, Bucket* bucket); + void _removeBucket(Stripe* stripe, WithLock stripeLock, Bucket* bucket, bool archiving); + + /** + * Archives the given bucket, minimizing the memory footprint but retaining the necessary + * information required to efficiently identify it as a candidate for future insertions. + */ + void _archiveBucket(Stripe* stripe, WithLock stripeLock, Bucket* bucket); /** * Aborts 'batch', and if the corresponding bucket still exists, proceeds to abort any other @@ -492,6 +541,11 @@ private: Bucket* _allocateBucket(Stripe* stripe, WithLock stripeLock, const CreationInfo& info); /** + * Mode enum to determine the rollover type decision for a given bucket. + */ + enum class RolloverAction { kNone, kArchive, kClose }; + + /** * Close the existing, full bucket and open a new one for the same metadata. * * Writes information about the closed bucket to the 'info' parameter. @@ -499,7 +553,8 @@ private: Bucket* _rollover(Stripe* stripe, WithLock stripeLock, Bucket* bucket, - const CreationInfo& info); + const CreationInfo& info, + RolloverAction action); ExecutionStatsController _getExecutionStats(const NamespaceString& ns); std::shared_ptr<ExecutionStats> _getExecutionStats(const NamespaceString& ns) const; @@ -531,6 +586,16 @@ private: */ boost::optional<BucketState> _setBucketState(const OID& id, BucketState target); + /** + * Calculates the marginal memory usage for an archived bucket. The + * 'onlyEntryForMatchingMetaHash' parameter indicates that the bucket will be (if inserting) + * or was (if removing) the only bucket associated with it's meta hash value. If true, then + * the returned value will attempt to account for the overhead of the map data structure for + * the meta hash value. + */ + static long long _marginalMemoryUsageForArchivedBucket(const ArchivedBucket& bucket, + bool onlyEntryForMatchingMetaHash); + static constexpr std::size_t kNumberOfStripes = 32; std::array<Stripe, kNumberOfStripes> _stripes; |