summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Gregory Wlodarek <gregory.wlodarek@mongodb.com> 2022-05-06 17:34:17 +0000
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2022-05-06 17:57:36 +0000
commit: 8adb070dcb43842e61da3e15415ad4e132d1655a (patch)
tree: 2dfc4215e9c9d4c1d3ec4942ffef613482295fc3
parent: 2d2290fe818dfad7ad08d149ae556bb12cc1c0f6 (diff)
download: mongo-8adb070dcb43842e61da3e15415ad4e132d1655a.tar.gz
SERVER-62102 Append aggregated BucketCatalog execution stats for all namespaces to ServerStatus
-rw-r--r-- src/mongo/db/timeseries/bucket_catalog.cpp | 145
-rw-r--r-- src/mongo/db/timeseries/bucket_catalog.h   |  62
2 files changed, 157 insertions, 50 deletions
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index f2710fff960..5090e358141 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -205,20 +205,70 @@ Status getTimeseriesBucketClearedError(const OID& bucketId,
}
} // namespace
-struct BucketCatalog::ExecutionStats {
- AtomicWord<long long> numBucketInserts;
- AtomicWord<long long> numBucketUpdates;
- AtomicWord<long long> numBucketsOpenedDueToMetadata;
- AtomicWord<long long> numBucketsClosedDueToCount;
- AtomicWord<long long> numBucketsClosedDueToSchemaChange;
- AtomicWord<long long> numBucketsClosedDueToSize;
- AtomicWord<long long> numBucketsClosedDueToTimeForward;
- AtomicWord<long long> numBucketsClosedDueToTimeBackward;
- AtomicWord<long long> numBucketsClosedDueToMemoryThreshold;
- AtomicWord<long long> numCommits;
- AtomicWord<long long> numWaits;
- AtomicWord<long long> numMeasurementsCommitted;
-};
+void BucketCatalog::ExecutionStatsController::incNumBucketInserts(long long increment) {
+ _collectionStats->numBucketInserts.fetchAndAddRelaxed(increment);
+ _globalStats->numBucketInserts.fetchAndAddRelaxed(increment);
+}
+
+void BucketCatalog::ExecutionStatsController::incNumBucketUpdates(long long increment) {
+ _collectionStats->numBucketUpdates.fetchAndAddRelaxed(increment);
+ _globalStats->numBucketUpdates.fetchAndAddRelaxed(increment);
+}
+
+void BucketCatalog::ExecutionStatsController::incNumBucketsOpenedDueToMetadata(
+ long long increment) {
+ _collectionStats->numBucketsOpenedDueToMetadata.fetchAndAddRelaxed(increment);
+ _globalStats->numBucketsOpenedDueToMetadata.fetchAndAddRelaxed(increment);
+}
+
+void BucketCatalog::ExecutionStatsController::incNumBucketsClosedDueToCount(long long increment) {
+ _collectionStats->numBucketsClosedDueToCount.fetchAndAddRelaxed(increment);
+ _globalStats->numBucketsClosedDueToCount.fetchAndAddRelaxed(increment);
+}
+
+void BucketCatalog::ExecutionStatsController::incNumBucketsClosedDueToSchemaChange(
+ long long increment) {
+ _collectionStats->numBucketsClosedDueToSchemaChange.fetchAndAddRelaxed(increment);
+ _globalStats->numBucketsClosedDueToSchemaChange.fetchAndAddRelaxed(increment);
+}
+
+void BucketCatalog::ExecutionStatsController::incNumBucketsClosedDueToSize(long long increment) {
+ _collectionStats->numBucketsClosedDueToSize.fetchAndAddRelaxed(increment);
+ _globalStats->numBucketsClosedDueToSize.fetchAndAddRelaxed(increment);
+}
+
+void BucketCatalog::ExecutionStatsController::incNumBucketsClosedDueToTimeForward(
+ long long increment) {
+ _collectionStats->numBucketsClosedDueToTimeForward.fetchAndAddRelaxed(increment);
+ _globalStats->numBucketsClosedDueToTimeForward.fetchAndAddRelaxed(increment);
+}
+
+void BucketCatalog::ExecutionStatsController::incNumBucketsClosedDueToTimeBackward(
+ long long increment) {
+ _collectionStats->numBucketsClosedDueToTimeBackward.fetchAndAddRelaxed(increment);
+ _globalStats->numBucketsClosedDueToTimeBackward.fetchAndAddRelaxed(increment);
+}
+
+void BucketCatalog::ExecutionStatsController::incNumBucketsClosedDueToMemoryThreshold(
+ long long increment) {
+ _collectionStats->numBucketsClosedDueToMemoryThreshold.fetchAndAddRelaxed(increment);
+ _globalStats->numBucketsClosedDueToMemoryThreshold.fetchAndAddRelaxed(increment);
+}
+
+void BucketCatalog::ExecutionStatsController::incNumCommits(long long increment) {
+ _collectionStats->numCommits.fetchAndAddRelaxed(increment);
+ _globalStats->numCommits.fetchAndAddRelaxed(increment);
+}
+
+void BucketCatalog::ExecutionStatsController::incNumWaits(long long increment) {
+ _collectionStats->numWaits.fetchAndAddRelaxed(increment);
+ _globalStats->numWaits.fetchAndAddRelaxed(increment);
+}
+
+void BucketCatalog::ExecutionStatsController::incNumMeasurementsCommitted(long long increment) {
+ _collectionStats->numMeasurementsCommitted.fetchAndAddRelaxed(increment);
+ _globalStats->numMeasurementsCommitted.fetchAndAddRelaxed(increment);
+}
class BucketCatalog::Bucket {
public:
@@ -338,8 +388,7 @@ private:
/**
* Return a pointer to the current, open batch.
*/
- std::shared_ptr<WriteBatch> _activeBatch(OperationId opId,
- const std::shared_ptr<ExecutionStats>& stats) {
+ std::shared_ptr<WriteBatch> _activeBatch(OperationId opId, ExecutionStatsController& stats) {
auto it = _batches.find(opId);
if (it == _batches.end()) {
it =
@@ -422,23 +471,23 @@ struct BucketCatalog::CreationInfo {
StripeNumber stripe;
const Date_t& time;
const TimeseriesOptions& options;
- ExecutionStats* stats;
+ ExecutionStatsController& stats;
ClosedBuckets* closedBuckets;
bool openedDuetoMetadata = true;
};
BucketCatalog::WriteBatch::WriteBatch(const BucketHandle& bucket,
OperationId opId,
- const std::shared_ptr<ExecutionStats>& stats)
- : _bucket{bucket}, _opId(opId), _stats{stats} {}
+ ExecutionStatsController& stats)
+ : _bucket{bucket}, _opId(opId), _stats(stats) {}
bool BucketCatalog::WriteBatch::claimCommitRights() {
return !_commitRights.swap(true);
}
-StatusWith<BucketCatalog::CommitInfo> BucketCatalog::WriteBatch::getResult() const {
+StatusWith<BucketCatalog::CommitInfo> BucketCatalog::WriteBatch::getResult() {
if (!_promise.getFuture().isReady()) {
- _stats->numWaits.fetchAndAddRelaxed(1);
+ _stats.incNumWaits();
}
return _promise.getFuture().getNoThrow();
}
@@ -579,8 +628,7 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert(
}
auto time = timeElem.Date();
- auto stats = _getExecutionStats(ns);
- invariant(stats);
+ ExecutionStatsController stats = _getExecutionStats(ns);
BSONElement metadata;
auto metaFieldName = options.getMetaField();
@@ -594,7 +642,7 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert(
auto stripeNumber = _getStripeNumber(key);
ClosedBuckets closedBuckets;
- CreationInfo info{key, stripeNumber, time, options, stats.get(), &closedBuckets};
+ CreationInfo info{key, stripeNumber, time, options, stats, &closedBuckets};
auto& stripe = _stripes[stripeNumber];
stdx::lock_guard stripeLock{stripe.mutex};
@@ -613,24 +661,24 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert(
auto shouldCloseBucket = [&](Bucket* bucket) -> bool {
if (bucket->schemaIncompatible(doc, metaFieldName, comparator)) {
- stats->numBucketsClosedDueToSchemaChange.fetchAndAddRelaxed(1);
+ stats.incNumBucketsClosedDueToSchemaChange();
return true;
}
if (bucket->_numMeasurements == static_cast<std::uint64_t>(gTimeseriesBucketMaxCount)) {
- stats->numBucketsClosedDueToCount.fetchAndAddRelaxed(1);
+ stats.incNumBucketsClosedDueToCount();
return true;
}
if (bucket->_size + sizeToBeAdded > static_cast<std::uint64_t>(gTimeseriesBucketMaxSize)) {
- stats->numBucketsClosedDueToSize.fetchAndAddRelaxed(1);
+ stats.incNumBucketsClosedDueToSize();
return true;
}
auto bucketTime = bucket->getTime();
if (time - bucketTime >= Seconds(*options.getBucketMaxSpanSeconds())) {
- stats->numBucketsClosedDueToTimeForward.fetchAndAddRelaxed(1);
+ stats.incNumBucketsClosedDueToTimeForward();
return true;
}
if (time < bucketTime) {
- stats->numBucketsClosedDueToTimeBackward.fetchAndAddRelaxed(1);
+ stats.incNumBucketsClosedDueToTimeBackward();
return true;
}
return false;
@@ -727,14 +775,14 @@ boost::optional<BucketCatalog::ClosedBucket> BucketCatalog::finish(
}
auto& stats = batch->_stats;
- stats->numCommits.fetchAndAddRelaxed(1);
+ stats.incNumCommits();
if (batch->numPreviouslyCommittedMeasurements() == 0) {
- stats->numBucketInserts.fetchAndAddRelaxed(1);
+ stats.incNumBucketInserts();
} else {
- stats->numBucketUpdates.fetchAndAddRelaxed(1);
+ stats.incNumBucketUpdates();
}
- stats->numMeasurementsCommitted.fetchAndAddRelaxed(batch->measurements().size());
+ stats.incNumMeasurementsCommitted(batch->measurements().size());
if (bucket) {
bucket->_numCommittedMeasurements += batch->measurements().size();
}
@@ -836,9 +884,8 @@ void BucketCatalog::clear(StringData dbName) {
clear([&dbName](const NamespaceString& bucketNs) { return bucketNs.db() == dbName; });
}
-void BucketCatalog::appendExecutionStats(const NamespaceString& ns, BSONObjBuilder* builder) const {
- const auto stats = _getExecutionStats(ns);
-
+void BucketCatalog::_appendExecutionStatsToBuilder(const ExecutionStats* stats,
+ BSONObjBuilder* builder) const {
builder->appendNumber("numBucketInserts", stats->numBucketInserts.load());
builder->appendNumber("numBucketUpdates", stats->numBucketUpdates.load());
builder->appendNumber("numBucketsOpenedDueToMetadata",
@@ -863,6 +910,16 @@ void BucketCatalog::appendExecutionStats(const NamespaceString& ns, BSONObjBuild
}
}
+
+void BucketCatalog::appendExecutionStats(const NamespaceString& ns, BSONObjBuilder* builder) const {
+ const std::shared_ptr<ExecutionStats> stats = _getExecutionStats(ns);
+ _appendExecutionStatsToBuilder(stats.get(), builder);
+}
+
+void BucketCatalog::appendGlobalExecutionStats(BSONObjBuilder* builder) const {
+ _appendExecutionStatsToBuilder(&_globalExecutionStats, builder);
+}
+
BucketCatalog::BucketMetadata::BucketMetadata(BSONElement elem,
const StringData::ComparatorInterface* comparator)
: _metadataElement(elem), _comparator(comparator) {
@@ -1078,7 +1135,7 @@ void BucketCatalog::_markBucketNotIdle(Stripe* stripe, WithLock stripeLock, Buck
void BucketCatalog::_expireIdleBuckets(Stripe* stripe,
WithLock stripeLock,
- ExecutionStats* stats,
+ ExecutionStatsController& stats,
std::vector<BucketCatalog::ClosedBucket>* closedBuckets) {
// As long as we still need space and have entries and remaining attempts, close idle buckets.
int32_t numClosed = 0;
@@ -1090,7 +1147,7 @@ void BucketCatalog::_expireIdleBuckets(Stripe* stripe,
bucket->id(), bucket->getTimeField().toString(), bucket->numMeasurements()};
if (_removeBucket(stripe, stripeLock, bucket)) {
- stats->numBucketsClosedDueToMemoryThreshold.fetchAndAddRelaxed(1);
+ stats.incNumBucketsClosedDueToMemoryThreshold();
closedBuckets->push_back(closed);
++numClosed;
}
@@ -1112,7 +1169,7 @@ BucketCatalog::Bucket* BucketCatalog::_allocateBucket(Stripe* stripe,
_initializeBucketState(bucketId);
if (info.openedDuetoMetadata) {
- info.stats->numBucketsOpenedDueToMetadata.fetchAndAddRelaxed(1);
+ info.stats.incNumBucketsOpenedDueToMetadata();
}
bucket->_timeField = info.options.getTimeField().toString();
@@ -1147,16 +1204,16 @@ BucketCatalog::Bucket* BucketCatalog::_rollover(Stripe* stripe,
return _allocateBucket(stripe, stripeLock, info);
}
-std::shared_ptr<BucketCatalog::ExecutionStats> BucketCatalog::_getExecutionStats(
+BucketCatalog::ExecutionStatsController BucketCatalog::_getExecutionStats(
const NamespaceString& ns) {
stdx::lock_guard catalogLock{_mutex};
auto it = _executionStats.find(ns);
if (it != _executionStats.end()) {
- return it->second;
+ return {it->second, &_globalExecutionStats};
}
auto res = _executionStats.emplace(ns, std::make_shared<ExecutionStats>());
- return res.first->second;
+ return {res.first->second, &_globalExecutionStats};
}
const std::shared_ptr<BucketCatalog::ExecutionStats> BucketCatalog::_getExecutionStats(
@@ -1276,6 +1333,10 @@ public:
builder.appendNumber("numIdleBuckets", static_cast<long long>(counts.idle));
builder.appendNumber("memoryUsage",
static_cast<long long>(bucketCatalog._memoryUsage.load()));
+
+ // Append the global execution stats for all namespaces.
+ bucketCatalog.appendGlobalExecutionStats(&builder);
+
return builder.obj();
}
} bucketCatalogServerStatus;
diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h
index 9807cdea1a1..2a3af67ec68 100644
--- a/src/mongo/db/timeseries/bucket_catalog.h
+++ b/src/mongo/db/timeseries/bucket_catalog.h
@@ -57,7 +57,45 @@ class BucketCatalog {
const StripeNumber stripe;
};
- struct ExecutionStats;
+ struct ExecutionStats {
+ AtomicWord<long long> numBucketInserts;
+ AtomicWord<long long> numBucketUpdates;
+ AtomicWord<long long> numBucketsOpenedDueToMetadata;
+ AtomicWord<long long> numBucketsClosedDueToCount;
+ AtomicWord<long long> numBucketsClosedDueToSchemaChange;
+ AtomicWord<long long> numBucketsClosedDueToSize;
+ AtomicWord<long long> numBucketsClosedDueToTimeForward;
+ AtomicWord<long long> numBucketsClosedDueToTimeBackward;
+ AtomicWord<long long> numBucketsClosedDueToMemoryThreshold;
+ AtomicWord<long long> numCommits;
+ AtomicWord<long long> numWaits;
+ AtomicWord<long long> numMeasurementsCommitted;
+ };
+
+ class ExecutionStatsController {
+ public:
+ ExecutionStatsController(const std::shared_ptr<ExecutionStats>& collectionStats,
+ ExecutionStats* globalStats)
+ : _collectionStats(collectionStats), _globalStats(globalStats) {}
+
+ void incNumBucketInserts(long long increment = 1);
+ void incNumBucketUpdates(long long increment = 1);
+ void incNumBucketsOpenedDueToMetadata(long long increment = 1);
+ void incNumBucketsClosedDueToCount(long long increment = 1);
+ void incNumBucketsClosedDueToSchemaChange(long long increment = 1);
+ void incNumBucketsClosedDueToSize(long long increment = 1);
+ void incNumBucketsClosedDueToTimeForward(long long increment = 1);
+ void incNumBucketsClosedDueToTimeBackward(long long increment = 1);
+ void incNumBucketsClosedDueToMemoryThreshold(long long increment = 1);
+ void incNumCommits(long long increment = 1);
+ void incNumWaits(long long increment = 1);
+ void incNumMeasurementsCommitted(long long increment = 1);
+
+ private:
+ std::shared_ptr<ExecutionStats> _collectionStats;
+ ExecutionStats* _globalStats;
+ };
+
class Bucket;
struct CreationInfo;
@@ -98,9 +136,7 @@ public:
public:
WriteBatch() = delete;
- WriteBatch(const BucketHandle& bucketId,
- OperationId opId,
- const std::shared_ptr<ExecutionStats>& stats);
+ WriteBatch(const BucketHandle& bucketId, OperationId opId, ExecutionStatsController& stats);
/**
* Attempts to claim the right to commit a batch. If it returns true, rights are
@@ -113,7 +149,7 @@ public:
* Retrieves the result of the write batch commit. Should be called by any interested party
* that does not have commit rights. Blocking.
*/
- StatusWith<CommitInfo> getResult() const;
+ StatusWith<CommitInfo> getResult();
/**
* Returns a handle which can be used by the BucketCatalog internally to locate its record
@@ -165,7 +201,7 @@ public:
const BucketHandle _bucket;
OperationId _opId;
- std::shared_ptr<ExecutionStats> _stats;
+ ExecutionStatsController _stats;
std::vector<BSONObj> _measurements;
BSONObj _min; // Batch-local min; full if first batch, updates otherwise.
@@ -263,6 +299,11 @@ public:
*/
void appendExecutionStats(const NamespaceString& ns, BSONObjBuilder* builder) const;
+ /**
+ * Appends the global execution stats for all namespaces to the builder.
+ */
+ void appendGlobalExecutionStats(BSONObjBuilder* builder) const;
+
private:
enum class BucketState {
// Bucket can be inserted into, and does not have an outstanding prepared commit
@@ -442,7 +483,7 @@ private:
*/
void _expireIdleBuckets(Stripe* stripe,
WithLock stripeLock,
- ExecutionStats* stats,
+ ExecutionStatsController& stats,
ClosedBuckets* closedBuckets);
/**
@@ -460,9 +501,11 @@ private:
Bucket* bucket,
const CreationInfo& info);
- std::shared_ptr<ExecutionStats> _getExecutionStats(const NamespaceString& ns);
+ ExecutionStatsController _getExecutionStats(const NamespaceString& ns);
const std::shared_ptr<ExecutionStats> _getExecutionStats(const NamespaceString& ns) const;
+ void _appendExecutionStatsToBuilder(const ExecutionStats* stats, BSONObjBuilder* builder) const;
+
/**
* Retreives the bucket state if it is tracked in the catalog.
*/
@@ -502,6 +545,9 @@ private:
// lock. The object itself is thread-safe (using atomics).
stdx::unordered_map<NamespaceString, std::shared_ptr<ExecutionStats>> _executionStats;
+ // Global execution stats used to report aggregated metrics in server status.
+ ExecutionStats _globalExecutionStats;
+
// Approximate memory usage of the bucket catalog.
AtomicWord<uint64_t> _memoryUsage;