summaryrefslogtreecommitdiff
path: root/src/mongo/db/timeseries/bucket_catalog.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/timeseries/bucket_catalog.h')
-rw-r--r--src/mongo/db/timeseries/bucket_catalog.h73
1 files changed, 69 insertions, 4 deletions
diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h
index 2df33182d31..c2a82039ee8 100644
--- a/src/mongo/db/timeseries/bucket_catalog.h
+++ b/src/mongo/db/timeseries/bucket_catalog.h
@@ -67,9 +67,14 @@ class BucketCatalog {
AtomicWord<long long> numBucketsClosedDueToTimeForward;
AtomicWord<long long> numBucketsClosedDueToTimeBackward;
AtomicWord<long long> numBucketsClosedDueToMemoryThreshold;
+ AtomicWord<long long> numBucketsArchivedDueToTimeForward;
+ AtomicWord<long long> numBucketsArchivedDueToTimeBackward;
+ AtomicWord<long long> numBucketsArchivedDueToMemoryThreshold;
AtomicWord<long long> numCommits;
AtomicWord<long long> numWaits;
AtomicWord<long long> numMeasurementsCommitted;
+ AtomicWord<long long> numBucketsReopened;
+ AtomicWord<long long> numBucketsKeptOpenDueToLargeMeasurements;
};
class ExecutionStatsController {
@@ -87,9 +92,14 @@ class BucketCatalog {
void incNumBucketsClosedDueToTimeForward(long long increment = 1);
void incNumBucketsClosedDueToTimeBackward(long long increment = 1);
void incNumBucketsClosedDueToMemoryThreshold(long long increment = 1);
+ void incNumBucketsArchivedDueToTimeForward(long long increment = 1);
+ void incNumBucketsArchivedDueToTimeBackward(long long increment = 1);
+ void incNumBucketsArchivedDueToMemoryThreshold(long long increment = 1);
void incNumCommits(long long increment = 1);
void incNumWaits(long long increment = 1);
void incNumMeasurementsCommitted(long long increment = 1);
+ void incNumBucketsReopened(long long increment = 1);
+ void incNumBucketsKeptOpenDueToLargeMeasurements(long long increment = 1);
private:
std::shared_ptr<ExecutionStats> _collectionStats;
@@ -117,6 +127,7 @@ public:
OID bucketId;
std::string timeField;
uint32_t numMeasurements;
+ bool eligibleForReopening = false;
};
using ClosedBuckets = std::vector<ClosedBucket>;
@@ -179,7 +190,7 @@ public:
/**
* Records a set of new-to-the-bucket fields. Active batches only.
*/
- void _recordNewFields(NewFieldNames&& fields);
+ void _recordNewFields(Bucket* bucket, NewFieldNames&& fields);
/**
* Prepares the batch for commit. Sets min/max appropriately, records the number of
@@ -230,6 +241,13 @@ public:
BucketCatalog operator=(const BucketCatalog&) = delete;
/**
+ * Reopens a closed bucket into the catalog given the bucket document.
+ */
+ Status reopenBucket(OperationContext* opCtx,
+ const CollectionPtr& coll,
+ const BSONObj& bucketDoc);
+
+ /**
* Returns the metadata for the given bucket in the following format:
* {<metadata field name>: <value>}
* All measurements in the given bucket share same metadata value.
@@ -354,12 +372,14 @@ private:
* Key to lookup open Bucket for namespace and metadata, with pre-computed hash.
*/
struct BucketKey {
+ using Hash = std::size_t;
+
BucketKey() = delete;
BucketKey(const NamespaceString& nss, const BucketMetadata& meta);
NamespaceString ns;
BucketMetadata metadata;
- std::size_t hash;
+ Hash hash;
bool operator==(const BucketKey& other) const {
return ns == other.ns && metadata == other.metadata;
@@ -379,6 +399,23 @@ private:
};
/**
+ * Hasher to support using a pre-computed hash as a key without having to compute another hash.
+ */
+ struct PreHashed {
+ std::size_t operator()(const BucketKey::Hash& key) const;
+ };
+
+ /**
+ * Information of a Bucket that got archived while performing an operation on this
+ * BucketCatalog.
+ */
+ struct ArchivedBucket {
+ OID bucketId;
+ std::string timeField;
+ uint32_t numMeasurements;
+ };
+
+ /**
* Struct to hold a portion of the buckets managed by the catalog.
*
* Each of the bucket lists, as well as the buckets themselves, are protected by 'mutex'.
@@ -397,6 +434,12 @@ private:
// Buckets that do not have any outstanding writes.
using IdleList = std::list<Bucket*>;
IdleList idleBuckets;
+
+ // Buckets that are not currently in the catalog, but which are eligible to receive more
+ // measurements. The top-level map is keyed by the hash of the BucketKey, while the stored
+ // map is keyed by the bucket's minimum timestamp.
+ stdx::unordered_map<BucketKey::Hash, std::map<Date_t, ArchivedBucket>, PreHashed>
+ archivedBuckets;
};
StripeNumber _getStripeNumber(const BucketKey& key);
@@ -444,7 +487,13 @@ private:
/**
* Removes the given bucket from the bucket catalog's internal data structures.
*/
- bool _removeBucket(Stripe* stripe, WithLock stripeLock, Bucket* bucket);
+ void _removeBucket(Stripe* stripe, WithLock stripeLock, Bucket* bucket, bool archiving);
+
+ /**
+ * Archives the given bucket, minimizing the memory footprint but retaining the necessary
+ * information required to efficiently identify it as a candidate for future insertions.
+ */
+ void _archiveBucket(Stripe* stripe, WithLock stripeLock, Bucket* bucket);
/**
* Aborts 'batch', and if the corresponding bucket still exists, proceeds to abort any other
@@ -492,6 +541,11 @@ private:
Bucket* _allocateBucket(Stripe* stripe, WithLock stripeLock, const CreationInfo& info);
/**
+ * Mode enum to determine the rollover type decision for a given bucket.
+ */
+ enum class RolloverAction { kNone, kArchive, kClose };
+
+ /**
* Close the existing, full bucket and open a new one for the same metadata.
*
* Writes information about the closed bucket to the 'info' parameter.
@@ -499,7 +553,8 @@ private:
Bucket* _rollover(Stripe* stripe,
WithLock stripeLock,
Bucket* bucket,
- const CreationInfo& info);
+ const CreationInfo& info,
+ RolloverAction action);
ExecutionStatsController _getExecutionStats(const NamespaceString& ns);
std::shared_ptr<ExecutionStats> _getExecutionStats(const NamespaceString& ns) const;
@@ -531,6 +586,16 @@ private:
*/
boost::optional<BucketState> _setBucketState(const OID& id, BucketState target);
+ /**
+ * Calculates the marginal memory usage for an archived bucket. The
+ * 'onlyEntryForMatchingMetaHash' parameter indicates that the bucket will be (if inserting)
+ * or was (if removing) the only bucket associated with it's meta hash value. If true, then
+ * the returned value will attempt to account for the overhead of the map data structure for
+ * the meta hash value.
+ */
+ static long long _marginalMemoryUsageForArchivedBucket(const ArchivedBucket& bucket,
+ bool onlyEntryForMatchingMetaHash);
+
static constexpr std::size_t kNumberOfStripes = 32;
std::array<Stripe, kNumberOfStripes> _stripes;