diff options
Diffstat (limited to 'src/mongo/db/timeseries/bucket_catalog.h')
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog.h | 132 |
1 files changed, 117 insertions, 15 deletions
diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h index 87687161b79..09164a166a9 100644 --- a/src/mongo/db/timeseries/bucket_catalog.h +++ b/src/mongo/db/timeseries/bucket_catalog.h @@ -51,6 +51,7 @@ public: const OID* operator->() const; bool operator==(const BucketId& other) const; + bool operator!=(const BucketId& other) const; bool operator<(const BucketId& other) const; template <typename H> @@ -227,6 +228,9 @@ private: }; struct Bucket { + // Access to the bucket is controlled by this lock + Mutex lock; + // The namespace that this bucket is used for. NamespaceString ns; @@ -297,19 +301,20 @@ private: }; struct ExecutionStats { - long long numBucketInserts = 0; - long long numBucketUpdates = 0; - long long numBucketsOpenedDueToMetadata = 0; - long long numBucketsClosedDueToCount = 0; - long long numBucketsClosedDueToSize = 0; - long long numBucketsClosedDueToTimeForward = 0; - long long numBucketsClosedDueToTimeBackward = 0; - long long numBucketsClosedDueToMemoryThreshold = 0; - long long numCommits = 0; - long long numWaits = 0; - long long numMeasurementsCommitted = 0; + AtomicWord<long long> numBucketInserts; + AtomicWord<long long> numBucketUpdates; + AtomicWord<long long> numBucketsOpenedDueToMetadata; + AtomicWord<long long> numBucketsClosedDueToCount; + AtomicWord<long long> numBucketsClosedDueToSize; + AtomicWord<long long> numBucketsClosedDueToTimeForward; + AtomicWord<long long> numBucketsClosedDueToTimeBackward; + AtomicWord<long long> numBucketsClosedDueToMemoryThreshold; + AtomicWord<long long> numCommits; + AtomicWord<long long> numWaits; + AtomicWord<long long> numMeasurementsCommitted; }; + // a wrapper around the numerical id to allow timestamp manipulations class BucketIdInternal : public BucketId { public: static BucketIdInternal min(); @@ -320,11 +325,68 @@ private: void setTime(const Date_t& time); }; + /** + * Helper class to handle all the locking necessary to lookup and lock a bucket for use. This + * is intended primarily for using a single bucket, including replacing it when it becomes full. + * If the usage pattern iterates over several buckets, you will instead want to use raw access + * using the different mutexes with the locking semantics described below. + */ + class BucketAccess { + public: + BucketAccess() = delete; + BucketAccess(BucketCatalog* catalog, + const std::tuple<NamespaceString, BucketMetadata>& key, + ExecutionStats* stats, + const Date_t& time); + BucketAccess(BucketCatalog* catalog, const BucketId& bucketId); + ~BucketAccess(); + + bool isLocked() const; + Bucket* operator->(); + operator bool() const; + + // release the bucket lock, typically in order to reacquire the catalog lock + void release(); + + /** + * Close the existing, full bucket and open a new one for the same metadata. + * Parameter is a function which should check that the bucket is indeed still full after + * reacquiring the necessary locks. The first parameter will give the function access to + * this BucketAccess instance, with the bucket locked. The second parameter may provide + * the precomputed key hash for the _bucketIds map (or 0, if is hasn't been computed yet). + */ + void rollover(const std::function<bool(BucketAccess*, std::size_t)>& isBucketFull); + + // retrieve the (safely cached) id of the bucket + const BucketIdInternal& id(); + + /** + * Adjust the time associated with the bucket (id) if it hasn't been committed yet. The + * hash parameter is the precomputed hash corresponding to the bucket's key for lookup in + * the _bucketIds map (to save computation). + */ + void setTime(std::size_t hash); + + private: + void _acquire(); + + BucketCatalog* _catalog; + const std::tuple<NamespaceString, BucketMetadata>* _key; + ExecutionStats* _stats; + const Date_t* _time; + + BucketIdInternal _id; + std::shared_ptr<Bucket> _bucket; + stdx::unique_lock<Mutex> _guard; + }; + class ServerStatus; using NsBuckets = std::set<std::tuple<NamespaceString, BucketId>>; using IdleBuckets = std::set<BucketId>; + stdx::unique_lock<Mutex> _lock() const; + /** * Removes the given bucket from the bucket catalog's internal data structures. */ @@ -332,15 +394,42 @@ private: boost::optional<NsBuckets::iterator> nsBucketsIt = boost::none, boost::optional<IdleBuckets::iterator> idleBucketsIt = boost::none); + void _markBucketIdle(const BucketId& bucketId); + void _markBucketNotIdle(const BucketId& bucketId); + void _markBucketNotIdle(const IdleBuckets::iterator& it); + /** * Expires idle buckets until the bucket catalog's memory usage is below the expiry threshold. */ void _expireIdleBuckets(ExecutionStats* stats); - mutable Mutex _mutex = MONGO_MAKE_LATCH("BucketCatalog"); + std::size_t _numberOfIdleBuckets() const; + + /** + * Creates a new (internal) bucket ID to identify a bucket. + */ + BucketIdInternal _createNewBucketId(const Date_t& time, ExecutionStats* stats); + + std::shared_ptr<ExecutionStats> _getExecutionStats(const NamespaceString& ns); + const std::shared_ptr<ExecutionStats> _getExecutionStats(const NamespaceString& ns) const; + + /** + * You must hold _mutex when accessing _buckets, _bucketIds, or _nsBuckets. While, holding a + * lock on _mutex, you can take a lock on an individual bucket, then release _mutex. Any + * iterators on the protected structures should be considered invalid once the lock is released. + * Any subsequent access to the structures requires relocking _mutex. You must *not* be holding + * a lock on a bucket when you attempt to acquire the lock on _mutex, as this can result in + * deadlock. + * + * Typically, if you want to acquire a bucket, you should use the BucketAccess RAII + * class to do so, as it will take care of most of this logic for you. Only use the _mutex + * directly for more global maintenance where you want to take the lock once and interact with + * multiple buckets atomically. + */ + mutable Mutex _mutex = MONGO_MAKE_LATCH("BucketCatalog::_mutex"); // All buckets currently in the catalog, including buckets which are full but not yet committed. - stdx::unordered_map<BucketId, Bucket> _buckets; + stdx::unordered_map<BucketId, std::shared_ptr<Bucket>> _buckets; // The _id of the current bucket for each namespace and metadata pair. stdx::unordered_map<std::tuple<NamespaceString, BucketMetadata>, BucketIdInternal> _bucketIds; @@ -348,16 +437,29 @@ private: // All buckets ordered by their namespaces. NsBuckets _nsBuckets; + // This mutex protects access to _idleBuckets + mutable Mutex _idleBucketsLock = MONGO_MAKE_LATCH("BucketCatalog::_idleBucketsLock"); + // Buckets that do not have any writers. IdleBuckets _idleBuckets; + /** + * This mutex protects access to the _executionStats map. Once you complete your lookup, you + * can keep the shared_ptr to an individual namespace's stats object and release the lock. The + * object itself is thread-safe (atomics). + */ + mutable Mutex _executionStatsLock = MONGO_MAKE_LATCH("BucketCatalog::_executionStatsLock"); + // Per-collection execution stats. - stdx::unordered_map<NamespaceString, ExecutionStats> _executionStats; + stdx::unordered_map<NamespaceString, std::shared_ptr<ExecutionStats>> _executionStats; + + // A placeholder to be returned in case a namespace has no allocated statistics object + static const std::shared_ptr<ExecutionStats> kEmptyStats; // Counter for buckets created by the bucket catalog. uint64_t _bucketNum = 0; // Approximate memory usage of the bucket catalog. - uint64_t _memoryUsage = 0; + AtomicWord<uint64_t> _memoryUsage; }; } // namespace mongo |