summaryrefslogtreecommitdiff
path: root/src/mongo/db/timeseries/bucket_catalog.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/timeseries/bucket_catalog.h')
-rw-r--r--src/mongo/db/timeseries/bucket_catalog.h132
1 files changed, 117 insertions, 15 deletions
diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h
index 87687161b79..09164a166a9 100644
--- a/src/mongo/db/timeseries/bucket_catalog.h
+++ b/src/mongo/db/timeseries/bucket_catalog.h
@@ -51,6 +51,7 @@ public:
const OID* operator->() const;
bool operator==(const BucketId& other) const;
+ bool operator!=(const BucketId& other) const;
bool operator<(const BucketId& other) const;
template <typename H>
@@ -227,6 +228,9 @@ private:
};
struct Bucket {
+ // Access to the bucket is controlled by this lock
+ Mutex lock;
+
// The namespace that this bucket is used for.
NamespaceString ns;
@@ -297,19 +301,20 @@ private:
};
struct ExecutionStats {
- long long numBucketInserts = 0;
- long long numBucketUpdates = 0;
- long long numBucketsOpenedDueToMetadata = 0;
- long long numBucketsClosedDueToCount = 0;
- long long numBucketsClosedDueToSize = 0;
- long long numBucketsClosedDueToTimeForward = 0;
- long long numBucketsClosedDueToTimeBackward = 0;
- long long numBucketsClosedDueToMemoryThreshold = 0;
- long long numCommits = 0;
- long long numWaits = 0;
- long long numMeasurementsCommitted = 0;
+ AtomicWord<long long> numBucketInserts;
+ AtomicWord<long long> numBucketUpdates;
+ AtomicWord<long long> numBucketsOpenedDueToMetadata;
+ AtomicWord<long long> numBucketsClosedDueToCount;
+ AtomicWord<long long> numBucketsClosedDueToSize;
+ AtomicWord<long long> numBucketsClosedDueToTimeForward;
+ AtomicWord<long long> numBucketsClosedDueToTimeBackward;
+ AtomicWord<long long> numBucketsClosedDueToMemoryThreshold;
+ AtomicWord<long long> numCommits;
+ AtomicWord<long long> numWaits;
+ AtomicWord<long long> numMeasurementsCommitted;
};
+ // a wrapper around the numerical id to allow timestamp manipulations
class BucketIdInternal : public BucketId {
public:
static BucketIdInternal min();
@@ -320,11 +325,68 @@ private:
void setTime(const Date_t& time);
};
+ /**
+ * Helper class to handle all the locking necessary to lookup and lock a bucket for use. This
+ * is intended primarily for using a single bucket, including replacing it when it becomes full.
+ * If the usage pattern iterates over several buckets, you will instead want to use raw access
+ * using the different mutexes with the locking semantics described below.
+ */
+ class BucketAccess {
+ public:
+ BucketAccess() = delete;
+ BucketAccess(BucketCatalog* catalog,
+ const std::tuple<NamespaceString, BucketMetadata>& key,
+ ExecutionStats* stats,
+ const Date_t& time);
+ BucketAccess(BucketCatalog* catalog, const BucketId& bucketId);
+ ~BucketAccess();
+
+ bool isLocked() const;
+ Bucket* operator->();
+ operator bool() const;
+
+ // release the bucket lock, typically in order to reacquire the catalog lock
+ void release();
+
+ /**
+ * Close the existing, full bucket and open a new one for the same metadata.
+ * Parameter is a function which should check that the bucket is indeed still full after
+ * reacquiring the necessary locks. The first parameter will give the function access to
+ * this BucketAccess instance, with the bucket locked. The second parameter may provide
+ * the precomputed key hash for the _bucketIds map (or 0, if is hasn't been computed yet).
+ */
+ void rollover(const std::function<bool(BucketAccess*, std::size_t)>& isBucketFull);
+
+ // retrieve the (safely cached) id of the bucket
+ const BucketIdInternal& id();
+
+ /**
+ * Adjust the time associated with the bucket (id) if it hasn't been committed yet. The
+ * hash parameter is the precomputed hash corresponding to the bucket's key for lookup in
+ * the _bucketIds map (to save computation).
+ */
+ void setTime(std::size_t hash);
+
+ private:
+ void _acquire();
+
+ BucketCatalog* _catalog;
+ const std::tuple<NamespaceString, BucketMetadata>* _key;
+ ExecutionStats* _stats;
+ const Date_t* _time;
+
+ BucketIdInternal _id;
+ std::shared_ptr<Bucket> _bucket;
+ stdx::unique_lock<Mutex> _guard;
+ };
+
class ServerStatus;
using NsBuckets = std::set<std::tuple<NamespaceString, BucketId>>;
using IdleBuckets = std::set<BucketId>;
+ stdx::unique_lock<Mutex> _lock() const;
+
/**
* Removes the given bucket from the bucket catalog's internal data structures.
*/
@@ -332,15 +394,42 @@ private:
boost::optional<NsBuckets::iterator> nsBucketsIt = boost::none,
boost::optional<IdleBuckets::iterator> idleBucketsIt = boost::none);
+ void _markBucketIdle(const BucketId& bucketId);
+ void _markBucketNotIdle(const BucketId& bucketId);
+ void _markBucketNotIdle(const IdleBuckets::iterator& it);
+
/**
* Expires idle buckets until the bucket catalog's memory usage is below the expiry threshold.
*/
void _expireIdleBuckets(ExecutionStats* stats);
- mutable Mutex _mutex = MONGO_MAKE_LATCH("BucketCatalog");
+ std::size_t _numberOfIdleBuckets() const;
+
+ /**
+ * Creates a new (internal) bucket ID to identify a bucket.
+ */
+ BucketIdInternal _createNewBucketId(const Date_t& time, ExecutionStats* stats);
+
+ std::shared_ptr<ExecutionStats> _getExecutionStats(const NamespaceString& ns);
+ const std::shared_ptr<ExecutionStats> _getExecutionStats(const NamespaceString& ns) const;
+
+ /**
+ * You must hold _mutex when accessing _buckets, _bucketIds, or _nsBuckets. While, holding a
+ * lock on _mutex, you can take a lock on an individual bucket, then release _mutex. Any
+ * iterators on the protected structures should be considered invalid once the lock is released.
+ * Any subsequent access to the structures requires relocking _mutex. You must *not* be holding
+ * a lock on a bucket when you attempt to acquire the lock on _mutex, as this can result in
+ * deadlock.
+ *
+ * Typically, if you want to acquire a bucket, you should use the BucketAccess RAII
+ * class to do so, as it will take care of most of this logic for you. Only use the _mutex
+ * directly for more global maintenance where you want to take the lock once and interact with
+ * multiple buckets atomically.
+ */
+ mutable Mutex _mutex = MONGO_MAKE_LATCH("BucketCatalog::_mutex");
// All buckets currently in the catalog, including buckets which are full but not yet committed.
- stdx::unordered_map<BucketId, Bucket> _buckets;
+ stdx::unordered_map<BucketId, std::shared_ptr<Bucket>> _buckets;
// The _id of the current bucket for each namespace and metadata pair.
stdx::unordered_map<std::tuple<NamespaceString, BucketMetadata>, BucketIdInternal> _bucketIds;
@@ -348,16 +437,29 @@ private:
// All buckets ordered by their namespaces.
NsBuckets _nsBuckets;
+ // This mutex protects access to _idleBuckets
+ mutable Mutex _idleBucketsLock = MONGO_MAKE_LATCH("BucketCatalog::_idleBucketsLock");
+
// Buckets that do not have any writers.
IdleBuckets _idleBuckets;
+ /**
+ * This mutex protects access to the _executionStats map. Once you complete your lookup, you
+ * can keep the shared_ptr to an individual namespace's stats object and release the lock. The
+ * object itself is thread-safe (atomics).
+ */
+ mutable Mutex _executionStatsLock = MONGO_MAKE_LATCH("BucketCatalog::_executionStatsLock");
+
// Per-collection execution stats.
- stdx::unordered_map<NamespaceString, ExecutionStats> _executionStats;
+ stdx::unordered_map<NamespaceString, std::shared_ptr<ExecutionStats>> _executionStats;
+
+ // A placeholder to be returned in case a namespace has no allocated statistics object
+ static const std::shared_ptr<ExecutionStats> kEmptyStats;
// Counter for buckets created by the bucket catalog.
uint64_t _bucketNum = 0;
// Approximate memory usage of the bucket catalog.
- uint64_t _memoryUsage = 0;
+ AtomicWord<uint64_t> _memoryUsage;
};
} // namespace mongo