diff options
author | Gregory Wlodarek <gregory.wlodarek@mongodb.com> | 2022-05-06 17:34:17 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-05-06 17:57:36 +0000 |
commit | 8adb070dcb43842e61da3e15415ad4e132d1655a (patch) | |
tree | 2dfc4215e9c9d4c1d3ec4942ffef613482295fc3 | |
parent | 2d2290fe818dfad7ad08d149ae556bb12cc1c0f6 (diff) | |
download | mongo-8adb070dcb43842e61da3e15415ad4e132d1655a.tar.gz |
SERVER-62102 Append aggregated BucketCatalog execution stats for all namespaces to ServerStatus
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog.cpp | 145 | ||||
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog.h | 62 |
2 files changed, 157 insertions, 50 deletions
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp index f2710fff960..5090e358141 100644 --- a/src/mongo/db/timeseries/bucket_catalog.cpp +++ b/src/mongo/db/timeseries/bucket_catalog.cpp @@ -205,20 +205,70 @@ Status getTimeseriesBucketClearedError(const OID& bucketId, } } // namespace -struct BucketCatalog::ExecutionStats { - AtomicWord<long long> numBucketInserts; - AtomicWord<long long> numBucketUpdates; - AtomicWord<long long> numBucketsOpenedDueToMetadata; - AtomicWord<long long> numBucketsClosedDueToCount; - AtomicWord<long long> numBucketsClosedDueToSchemaChange; - AtomicWord<long long> numBucketsClosedDueToSize; - AtomicWord<long long> numBucketsClosedDueToTimeForward; - AtomicWord<long long> numBucketsClosedDueToTimeBackward; - AtomicWord<long long> numBucketsClosedDueToMemoryThreshold; - AtomicWord<long long> numCommits; - AtomicWord<long long> numWaits; - AtomicWord<long long> numMeasurementsCommitted; -}; +void BucketCatalog::ExecutionStatsController::incNumBucketInserts(long long increment) { + _collectionStats->numBucketInserts.fetchAndAddRelaxed(increment); + _globalStats->numBucketInserts.fetchAndAddRelaxed(increment); +} + +void BucketCatalog::ExecutionStatsController::incNumBucketUpdates(long long increment) { + _collectionStats->numBucketUpdates.fetchAndAddRelaxed(increment); + _globalStats->numBucketUpdates.fetchAndAddRelaxed(increment); +} + +void BucketCatalog::ExecutionStatsController::incNumBucketsOpenedDueToMetadata( + long long increment) { + _collectionStats->numBucketsOpenedDueToMetadata.fetchAndAddRelaxed(increment); + _globalStats->numBucketsOpenedDueToMetadata.fetchAndAddRelaxed(increment); +} + +void BucketCatalog::ExecutionStatsController::incNumBucketsClosedDueToCount(long long increment) { + _collectionStats->numBucketsClosedDueToCount.fetchAndAddRelaxed(increment); + _globalStats->numBucketsClosedDueToCount.fetchAndAddRelaxed(increment); +} + +void BucketCatalog::ExecutionStatsController::incNumBucketsClosedDueToSchemaChange( + long long increment) { + _collectionStats->numBucketsClosedDueToSchemaChange.fetchAndAddRelaxed(increment); + _globalStats->numBucketsClosedDueToSchemaChange.fetchAndAddRelaxed(increment); +} + +void BucketCatalog::ExecutionStatsController::incNumBucketsClosedDueToSize(long long increment) { + _collectionStats->numBucketsClosedDueToSize.fetchAndAddRelaxed(increment); + _globalStats->numBucketsClosedDueToSize.fetchAndAddRelaxed(increment); +} + +void BucketCatalog::ExecutionStatsController::incNumBucketsClosedDueToTimeForward( + long long increment) { + _collectionStats->numBucketsClosedDueToTimeForward.fetchAndAddRelaxed(increment); + _globalStats->numBucketsClosedDueToTimeForward.fetchAndAddRelaxed(increment); +} + +void BucketCatalog::ExecutionStatsController::incNumBucketsClosedDueToTimeBackward( + long long increment) { + _collectionStats->numBucketsClosedDueToTimeBackward.fetchAndAddRelaxed(increment); + _globalStats->numBucketsClosedDueToTimeBackward.fetchAndAddRelaxed(increment); +} + +void BucketCatalog::ExecutionStatsController::incNumBucketsClosedDueToMemoryThreshold( + long long increment) { + _collectionStats->numBucketsClosedDueToMemoryThreshold.fetchAndAddRelaxed(increment); + _globalStats->numBucketsClosedDueToMemoryThreshold.fetchAndAddRelaxed(increment); +} + +void BucketCatalog::ExecutionStatsController::incNumCommits(long long increment) { + _collectionStats->numCommits.fetchAndAddRelaxed(increment); + _globalStats->numCommits.fetchAndAddRelaxed(increment); +} + +void BucketCatalog::ExecutionStatsController::incNumWaits(long long increment) { + _collectionStats->numWaits.fetchAndAddRelaxed(increment); + _globalStats->numWaits.fetchAndAddRelaxed(increment); +} + +void BucketCatalog::ExecutionStatsController::incNumMeasurementsCommitted(long long increment) { + _collectionStats->numMeasurementsCommitted.fetchAndAddRelaxed(increment); + _globalStats->numMeasurementsCommitted.fetchAndAddRelaxed(increment); +} class BucketCatalog::Bucket { public: @@ -338,8 +388,7 @@ private: /** * Return a pointer to the current, open batch. */ - std::shared_ptr<WriteBatch> _activeBatch(OperationId opId, - const std::shared_ptr<ExecutionStats>& stats) { + std::shared_ptr<WriteBatch> _activeBatch(OperationId opId, ExecutionStatsController& stats) { auto it = _batches.find(opId); if (it == _batches.end()) { it = @@ -422,23 +471,23 @@ struct BucketCatalog::CreationInfo { StripeNumber stripe; const Date_t& time; const TimeseriesOptions& options; - ExecutionStats* stats; + ExecutionStatsController& stats; ClosedBuckets* closedBuckets; bool openedDuetoMetadata = true; }; BucketCatalog::WriteBatch::WriteBatch(const BucketHandle& bucket, OperationId opId, - const std::shared_ptr<ExecutionStats>& stats) - : _bucket{bucket}, _opId(opId), _stats{stats} {} + ExecutionStatsController& stats) + : _bucket{bucket}, _opId(opId), _stats(stats) {} bool BucketCatalog::WriteBatch::claimCommitRights() { return !_commitRights.swap(true); } -StatusWith<BucketCatalog::CommitInfo> BucketCatalog::WriteBatch::getResult() const { +StatusWith<BucketCatalog::CommitInfo> BucketCatalog::WriteBatch::getResult() { if (!_promise.getFuture().isReady()) { - _stats->numWaits.fetchAndAddRelaxed(1); + _stats.incNumWaits(); } return _promise.getFuture().getNoThrow(); } @@ -579,8 +628,7 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert( } auto time = timeElem.Date(); - auto stats = _getExecutionStats(ns); - invariant(stats); + ExecutionStatsController stats = _getExecutionStats(ns); BSONElement metadata; auto metaFieldName = options.getMetaField(); @@ -594,7 +642,7 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert( auto stripeNumber = _getStripeNumber(key); ClosedBuckets closedBuckets; - CreationInfo info{key, stripeNumber, time, options, stats.get(), &closedBuckets}; + CreationInfo info{key, stripeNumber, time, options, stats, &closedBuckets}; auto& stripe = _stripes[stripeNumber]; stdx::lock_guard stripeLock{stripe.mutex}; @@ -613,24 +661,24 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert( auto shouldCloseBucket = [&](Bucket* bucket) -> bool { if (bucket->schemaIncompatible(doc, metaFieldName, comparator)) { - stats->numBucketsClosedDueToSchemaChange.fetchAndAddRelaxed(1); + stats.incNumBucketsClosedDueToSchemaChange(); return true; } if (bucket->_numMeasurements == static_cast<std::uint64_t>(gTimeseriesBucketMaxCount)) { - stats->numBucketsClosedDueToCount.fetchAndAddRelaxed(1); + stats.incNumBucketsClosedDueToCount(); return true; } if (bucket->_size + sizeToBeAdded > static_cast<std::uint64_t>(gTimeseriesBucketMaxSize)) { - stats->numBucketsClosedDueToSize.fetchAndAddRelaxed(1); + stats.incNumBucketsClosedDueToSize(); return true; } auto bucketTime = bucket->getTime(); if (time - bucketTime >= Seconds(*options.getBucketMaxSpanSeconds())) { - stats->numBucketsClosedDueToTimeForward.fetchAndAddRelaxed(1); + stats.incNumBucketsClosedDueToTimeForward(); return true; } if (time < bucketTime) { - stats->numBucketsClosedDueToTimeBackward.fetchAndAddRelaxed(1); + stats.incNumBucketsClosedDueToTimeBackward(); return true; } return false; @@ -727,14 +775,14 @@ boost::optional<BucketCatalog::ClosedBucket> BucketCatalog::finish( } auto& stats = batch->_stats; - stats->numCommits.fetchAndAddRelaxed(1); + stats.incNumCommits(); if (batch->numPreviouslyCommittedMeasurements() == 0) { - stats->numBucketInserts.fetchAndAddRelaxed(1); + stats.incNumBucketInserts(); } else { - stats->numBucketUpdates.fetchAndAddRelaxed(1); + stats.incNumBucketUpdates(); } - stats->numMeasurementsCommitted.fetchAndAddRelaxed(batch->measurements().size()); + stats.incNumMeasurementsCommitted(batch->measurements().size()); if (bucket) { bucket->_numCommittedMeasurements += batch->measurements().size(); } @@ -836,9 +884,8 @@ void BucketCatalog::clear(StringData dbName) { clear([&dbName](const NamespaceString& bucketNs) { return bucketNs.db() == dbName; }); } -void BucketCatalog::appendExecutionStats(const NamespaceString& ns, BSONObjBuilder* builder) const { - const auto stats = _getExecutionStats(ns); - +void BucketCatalog::_appendExecutionStatsToBuilder(const ExecutionStats* stats, + BSONObjBuilder* builder) const { builder->appendNumber("numBucketInserts", stats->numBucketInserts.load()); builder->appendNumber("numBucketUpdates", stats->numBucketUpdates.load()); builder->appendNumber("numBucketsOpenedDueToMetadata", @@ -863,6 +910,16 @@ void BucketCatalog::appendExecutionStats(const NamespaceString& ns, BSONObjBuild } } + +void BucketCatalog::appendExecutionStats(const NamespaceString& ns, BSONObjBuilder* builder) const { + const std::shared_ptr<ExecutionStats> stats = _getExecutionStats(ns); + _appendExecutionStatsToBuilder(stats.get(), builder); +} + +void BucketCatalog::appendGlobalExecutionStats(BSONObjBuilder* builder) const { + _appendExecutionStatsToBuilder(&_globalExecutionStats, builder); +} + BucketCatalog::BucketMetadata::BucketMetadata(BSONElement elem, const StringData::ComparatorInterface* comparator) : _metadataElement(elem), _comparator(comparator) { @@ -1078,7 +1135,7 @@ void BucketCatalog::_markBucketNotIdle(Stripe* stripe, WithLock stripeLock, Buck void BucketCatalog::_expireIdleBuckets(Stripe* stripe, WithLock stripeLock, - ExecutionStats* stats, + ExecutionStatsController& stats, std::vector<BucketCatalog::ClosedBucket>* closedBuckets) { // As long as we still need space and have entries and remaining attempts, close idle buckets. int32_t numClosed = 0; @@ -1090,7 +1147,7 @@ void BucketCatalog::_expireIdleBuckets(Stripe* stripe, bucket->id(), bucket->getTimeField().toString(), bucket->numMeasurements()}; if (_removeBucket(stripe, stripeLock, bucket)) { - stats->numBucketsClosedDueToMemoryThreshold.fetchAndAddRelaxed(1); + stats.incNumBucketsClosedDueToMemoryThreshold(); closedBuckets->push_back(closed); ++numClosed; } @@ -1112,7 +1169,7 @@ BucketCatalog::Bucket* BucketCatalog::_allocateBucket(Stripe* stripe, _initializeBucketState(bucketId); if (info.openedDuetoMetadata) { - info.stats->numBucketsOpenedDueToMetadata.fetchAndAddRelaxed(1); + info.stats.incNumBucketsOpenedDueToMetadata(); } bucket->_timeField = info.options.getTimeField().toString(); @@ -1147,16 +1204,16 @@ BucketCatalog::Bucket* BucketCatalog::_rollover(Stripe* stripe, return _allocateBucket(stripe, stripeLock, info); } -std::shared_ptr<BucketCatalog::ExecutionStats> BucketCatalog::_getExecutionStats( +BucketCatalog::ExecutionStatsController BucketCatalog::_getExecutionStats( const NamespaceString& ns) { stdx::lock_guard catalogLock{_mutex}; auto it = _executionStats.find(ns); if (it != _executionStats.end()) { - return it->second; + return {it->second, &_globalExecutionStats}; } auto res = _executionStats.emplace(ns, std::make_shared<ExecutionStats>()); - return res.first->second; + return {res.first->second, &_globalExecutionStats}; } const std::shared_ptr<BucketCatalog::ExecutionStats> BucketCatalog::_getExecutionStats( @@ -1276,6 +1333,10 @@ public: builder.appendNumber("numIdleBuckets", static_cast<long long>(counts.idle)); builder.appendNumber("memoryUsage", static_cast<long long>(bucketCatalog._memoryUsage.load())); + + // Append the global execution stats for all namespaces. + bucketCatalog.appendGlobalExecutionStats(&builder); + return builder.obj(); } } bucketCatalogServerStatus; diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h index 9807cdea1a1..2a3af67ec68 100644 --- a/src/mongo/db/timeseries/bucket_catalog.h +++ b/src/mongo/db/timeseries/bucket_catalog.h @@ -57,7 +57,45 @@ class BucketCatalog { const StripeNumber stripe; }; - struct ExecutionStats; + struct ExecutionStats { + AtomicWord<long long> numBucketInserts; + AtomicWord<long long> numBucketUpdates; + AtomicWord<long long> numBucketsOpenedDueToMetadata; + AtomicWord<long long> numBucketsClosedDueToCount; + AtomicWord<long long> numBucketsClosedDueToSchemaChange; + AtomicWord<long long> numBucketsClosedDueToSize; + AtomicWord<long long> numBucketsClosedDueToTimeForward; + AtomicWord<long long> numBucketsClosedDueToTimeBackward; + AtomicWord<long long> numBucketsClosedDueToMemoryThreshold; + AtomicWord<long long> numCommits; + AtomicWord<long long> numWaits; + AtomicWord<long long> numMeasurementsCommitted; + }; + + class ExecutionStatsController { + public: + ExecutionStatsController(const std::shared_ptr<ExecutionStats>& collectionStats, + ExecutionStats* globalStats) + : _collectionStats(collectionStats), _globalStats(globalStats) {} + + void incNumBucketInserts(long long increment = 1); + void incNumBucketUpdates(long long increment = 1); + void incNumBucketsOpenedDueToMetadata(long long increment = 1); + void incNumBucketsClosedDueToCount(long long increment = 1); + void incNumBucketsClosedDueToSchemaChange(long long increment = 1); + void incNumBucketsClosedDueToSize(long long increment = 1); + void incNumBucketsClosedDueToTimeForward(long long increment = 1); + void incNumBucketsClosedDueToTimeBackward(long long increment = 1); + void incNumBucketsClosedDueToMemoryThreshold(long long increment = 1); + void incNumCommits(long long increment = 1); + void incNumWaits(long long increment = 1); + void incNumMeasurementsCommitted(long long increment = 1); + + private: + std::shared_ptr<ExecutionStats> _collectionStats; + ExecutionStats* _globalStats; + }; + class Bucket; struct CreationInfo; @@ -98,9 +136,7 @@ public: public: WriteBatch() = delete; - WriteBatch(const BucketHandle& bucketId, - OperationId opId, - const std::shared_ptr<ExecutionStats>& stats); + WriteBatch(const BucketHandle& bucketId, OperationId opId, ExecutionStatsController& stats); /** * Attempts to claim the right to commit a batch. If it returns true, rights are @@ -113,7 +149,7 @@ public: * Retrieves the result of the write batch commit. Should be called by any interested party * that does not have commit rights. Blocking. */ - StatusWith<CommitInfo> getResult() const; + StatusWith<CommitInfo> getResult(); /** * Returns a handle which can be used by the BucketCatalog internally to locate its record @@ -165,7 +201,7 @@ public: const BucketHandle _bucket; OperationId _opId; - std::shared_ptr<ExecutionStats> _stats; + ExecutionStatsController _stats; std::vector<BSONObj> _measurements; BSONObj _min; // Batch-local min; full if first batch, updates otherwise. @@ -263,6 +299,11 @@ public: */ void appendExecutionStats(const NamespaceString& ns, BSONObjBuilder* builder) const; + /** + * Appends the global execution stats for all namespaces to the builder. + */ + void appendGlobalExecutionStats(BSONObjBuilder* builder) const; + private: enum class BucketState { // Bucket can be inserted into, and does not have an outstanding prepared commit @@ -442,7 +483,7 @@ private: */ void _expireIdleBuckets(Stripe* stripe, WithLock stripeLock, - ExecutionStats* stats, + ExecutionStatsController& stats, ClosedBuckets* closedBuckets); /** @@ -460,9 +501,11 @@ private: Bucket* bucket, const CreationInfo& info); - std::shared_ptr<ExecutionStats> _getExecutionStats(const NamespaceString& ns); + ExecutionStatsController _getExecutionStats(const NamespaceString& ns); const std::shared_ptr<ExecutionStats> _getExecutionStats(const NamespaceString& ns) const; + void _appendExecutionStatsToBuilder(const ExecutionStats* stats, BSONObjBuilder* builder) const; + /** * Retreives the bucket state if it is tracked in the catalog. */ @@ -502,6 +545,9 @@ private: // lock. The object itself is thread-safe (using atomics). stdx::unordered_map<NamespaceString, std::shared_ptr<ExecutionStats>> _executionStats; + // Global execution stats used to report aggregated metrics in server status. + ExecutionStats _globalExecutionStats; + // Approximate memory usage of the bucket catalog. AtomicWord<uint64_t> _memoryUsage; |