author     Faustoleyva54 <fausto.leyva@mongodb.com>  2022-11-07 22:37:44 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-11-07 23:23:54 +0000
commit     924c1769d07b776695cf511b0fa87b23219128be (patch)
tree       b070ea74b6d96ab838add418e4606738460b7935 /src/mongo/db/timeseries
parent     72c2339d1e0485488e3a3a7c9aa4833a6ad369c1 (diff)
download   mongo-924c1769d07b776695cf511b0fa87b23219128be.tar.gz
SERVER-66691 Track statistics around fetching closed buckets in execution stats and serverStatus
Diffstat (limited to 'src/mongo/db/timeseries')
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog.cpp       68
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog.h         31
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog_test.cpp  11
3 files changed, 103 insertions(+), 7 deletions(-)
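
The patch extends the bucket catalog's existing two-level stats pattern: each counter lives in a per-collection ExecutionStats object and in a global one, and every incNum* helper on ExecutionStatsController bumps both with a relaxed atomic add. A minimal standalone sketch of that pattern, using std::atomic in place of MongoDB's AtomicWord (the names below are illustrative, not from the patch):

    #include <atomic>
    #include <memory>

    // Stand-in for one of the ExecutionStats counters added by this patch.
    struct Stats {
        std::atomic<long long> numBucketsFetched{0};
    };

    // Stand-in for ExecutionStatsController: holds a collection-level and a
    // global Stats object and increments both together. Relaxed ordering is
    // sufficient because these are monotonic counters read only for reporting.
    class StatsController {
    public:
        StatsController(std::shared_ptr<Stats> collectionStats, Stats* globalStats)
            : _collectionStats(std::move(collectionStats)), _globalStats(globalStats) {}

        void incNumBucketsFetched(long long increment = 1) {
            _collectionStats->numBucketsFetched.fetch_add(increment, std::memory_order_relaxed);
            _globalStats->numBucketsFetched.fetch_add(increment, std::memory_order_relaxed);
        }

    private:
        std::shared_ptr<Stats> _collectionStats;
        Stats* _globalStats;
    };

AtomicWord<long long>::fetchAndAddRelaxed in the diff below is MongoDB's equivalent of fetch_add with std::memory_order_relaxed.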
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index 854a2805d66..cb02b01a0d9 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -254,6 +254,36 @@ void BucketCatalog::ExecutionStatsController::incNumBucketsKeptOpenDueToLargeMea
_globalStats->numBucketsKeptOpenDueToLargeMeasurements.fetchAndAddRelaxed(increment);
}
+void BucketCatalog::ExecutionStatsController::incNumBucketFetchesFailed(long long increment) {
+ _collectionStats->numBucketFetchesFailed.fetchAndAddRelaxed(increment);
+ _globalStats->numBucketFetchesFailed.fetchAndAddRelaxed(increment);
+}
+
+void BucketCatalog::ExecutionStatsController::incNumBucketQueriesFailed(long long increment) {
+ _collectionStats->numBucketQueriesFailed.fetchAndAddRelaxed(increment);
+ _globalStats->numBucketQueriesFailed.fetchAndAddRelaxed(increment);
+}
+
+void BucketCatalog::ExecutionStatsController::incNumBucketsFetched(long long increment) {
+ _collectionStats->numBucketsFetched.fetchAndAddRelaxed(increment);
+ _globalStats->numBucketsFetched.fetchAndAddRelaxed(increment);
+}
+
+void BucketCatalog::ExecutionStatsController::incNumBucketsQueried(long long increment) {
+ _collectionStats->numBucketsQueried.fetchAndAddRelaxed(increment);
+ _globalStats->numBucketsQueried.fetchAndAddRelaxed(increment);
+}
+
+void BucketCatalog::ExecutionStatsController::incNumBucketReopeningsFailed(long long increment) {
+ _collectionStats->numBucketReopeningsFailed.fetchAndAddRelaxed(increment);
+ _globalStats->numBucketReopeningsFailed.fetchAndAddRelaxed(increment);
+}
+
+void BucketCatalog::ExecutionStatsController::incNumDuplicateBucketsReopened(long long increment) {
+ _collectionStats->numDuplicateBucketsReopened.fetchAndAddRelaxed(increment);
+ _globalStats->numDuplicateBucketsReopened.fetchAndAddRelaxed(increment);
+}
+
BucketCatalog::BucketStateManager::BucketStateManager(Mutex* m) : _mutex(m), _era(0) {}
uint64_t BucketCatalog::BucketStateManager::getEra() {
@@ -839,9 +869,9 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert(
const TimeseriesOptions& options,
const BSONObj& doc,
CombineWithInsertsFromOtherClients combine,
- boost::optional<BucketToReopen> bucketToReopen) {
+ BucketFindResult bucketFindResult) {
return _insert(
- opCtx, ns, comparator, options, doc, combine, AllowBucketCreation::kYes, bucketToReopen);
+ opCtx, ns, comparator, options, doc, combine, AllowBucketCreation::kYes, bucketFindResult);
}
Status BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) {
@@ -1043,6 +1073,13 @@ void BucketCatalog::_appendExecutionStatsToBuilder(const ExecutionStats* stats,
stats->numBucketsKeptOpenDueToLargeMeasurements.load());
builder->appendNumber("numBucketsClosedDueToCachePressure",
stats->numBucketsClosedDueToCachePressure.load());
+ builder->appendNumber("numBucketsFetched", stats->numBucketsFetched.load());
+ builder->appendNumber("numBucketsQueried", stats->numBucketsQueried.load());
+ builder->appendNumber("numBucketFetchesFailed", stats->numBucketFetchesFailed.load());
+ builder->appendNumber("numBucketQueriesFailed", stats->numBucketQueriesFailed.load());
+ builder->appendNumber("numBucketReopeningsFailed", stats->numBucketReopeningsFailed.load());
+ builder->appendNumber("numDuplicateBucketsReopened",
+ stats->numDuplicateBucketsReopened.load());
}
}
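
With the six appendNumber calls above, the bucketCatalog section reported in serverStatus (and the per-collection execution stats) gains the new counters. An illustrative excerpt of the added fields (field names are from the patch; the zero values are placeholders):

    numBucketsFetched: 0,
    numBucketsQueried: 0,
    numBucketFetchesFailed: 0,
    numBucketQueriesFailed: 0,
    numBucketReopeningsFailed: 0,
    numDuplicateBucketsReopened: 0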
@@ -1365,6 +1402,7 @@ BucketCatalog::Bucket* BucketCatalog::_reopenBucket(Stripe* stripe,
// If the bucket wasn't inserted into the stripe, then that bucket is already open and we can
// return the bucket 'it' points to.
if (!inserted) {
+ stats.incNumDuplicateBucketsReopened();
_markBucketNotIdle(stripe, stripeLock, unownedBucket);
return unownedBucket;
}
@@ -1404,7 +1442,8 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::_insert(
const BSONObj& doc,
CombineWithInsertsFromOtherClients combine,
AllowBucketCreation mode,
- boost::optional<BucketToReopen> bucketToReopen) {
+ BucketFindResult bucketFindResult) {
+
auto res = _extractBucketingParameters(ns, comparator, options, doc);
if (!res.isOK()) {
return res.getStatus();
@@ -1413,6 +1452,7 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::_insert(
auto time = res.getValue().second;
ExecutionStatsController stats = _getExecutionStats(ns);
+ _updateBucketFetchAndQueryStats(stats, bucketFindResult);
// Buckets are spread across independently-lockable stripes to improve parallelism. We map a
// bucket to a stripe by hashing the BucketKey.
@@ -1421,10 +1461,12 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::_insert(
InsertResult result;
result.catalogEra = _bucketStateManager.getEra();
CreationInfo info{key, stripeNumber, time, options, stats, &result.closedBuckets};
+ boost::optional<BucketToReopen> bucketToReopen = std::move(bucketFindResult.bucketToReopen);
auto rehydratedBucket =
_rehydrateBucket(opCtx, ns, comparator, options, stats, bucketToReopen, key);
if (rehydratedBucket.getStatus().code() == ErrorCodes::WriteConflict) {
+ stats.incNumBucketReopeningsFailed();
return rehydratedBucket.getStatus();
}
@@ -1453,6 +1495,7 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::_insert(
return std::move(result);
} else {
+ stats.incNumBucketReopeningsFailed();
return {ErrorCodes::WriteConflict, "Bucket may be stale"};
}
}
@@ -1997,6 +2040,25 @@ long long BucketCatalog::_marginalMemoryUsageForArchivedBucket(const ArchivedBuc
: 0);
}
+void BucketCatalog::_updateBucketFetchAndQueryStats(ExecutionStatsController& stats,
+ const BucketFindResult& findResult) {
+ if (findResult.fetchedBucket) {
+ if (findResult.bucketToReopen.has_value()) {
+ stats.incNumBucketsFetched();
+ } else {
+ stats.incNumBucketFetchesFailed();
+ }
+ }
+
+ if (findResult.queriedBucket) {
+ if (findResult.bucketToReopen.has_value()) {
+ stats.incNumBucketsQueried();
+ } else {
+ stats.incNumBucketQueriesFailed();
+ }
+ }
+}
+
class BucketCatalog::ServerStatus : public ServerStatusSection {
struct BucketCounts {
BucketCounts& operator+=(const BucketCounts& other) {
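
The new _updateBucketFetchAndQueryStats helper classifies each reopening attempt into one of four counters: a set fetchedBucket (or queriedBucket) flag records that a fetch (or query) was attempted, and a populated bucketToReopen records that the attempt produced a bucket. A hedged sketch of how a caller might populate a BucketFindResult on the fetch path (the caller and fetchBucketById are hypothetical; the actual call sites are outside this diff):

    BucketCatalog::BucketFindResult findResult;
    findResult.fetchedBucket = true;  // a fetch was attempted
    if (boost::optional<BSONObj> bucketDoc = fetchBucketById(opCtx, ns, bucketId)) {
        // The fetch hit: _insert() will count this in numBucketsFetched.
        findResult.bucketToReopen = BucketCatalog::BucketToReopen{*bucketDoc, validator};
    }
    // If the fetch missed, bucketToReopen stays boost::none and _insert()
    // counts the attempt in numBucketFetchesFailed instead.

numBucketReopeningsFailed is incremented separately, in _insert() itself, when a found bucket cannot actually be reopened (a WriteConflict or a stale bucket).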
diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h
index cfe6658bab2..49dd7c47ceb 100644
--- a/src/mongo/db/timeseries/bucket_catalog.h
+++ b/src/mongo/db/timeseries/bucket_catalog.h
@@ -82,6 +82,12 @@ protected:
AtomicWord<long long> numMeasurementsCommitted;
AtomicWord<long long> numBucketsReopened;
AtomicWord<long long> numBucketsKeptOpenDueToLargeMeasurements;
+ AtomicWord<long long> numBucketsFetched;
+ AtomicWord<long long> numBucketsQueried;
+ AtomicWord<long long> numBucketFetchesFailed;
+ AtomicWord<long long> numBucketQueriesFailed;
+ AtomicWord<long long> numBucketReopeningsFailed;
+ AtomicWord<long long> numDuplicateBucketsReopened;
};
class ExecutionStatsController {
@@ -109,6 +115,12 @@ protected:
void incNumMeasurementsCommitted(long long increment = 1);
void incNumBucketsReopened(long long increment = 1);
void incNumBucketsKeptOpenDueToLargeMeasurements(long long increment = 1);
+ void incNumBucketsFetched(long long increment = 1);
+ void incNumBucketsQueried(long long increment = 1);
+ void incNumBucketFetchesFailed(long long increment = 1);
+ void incNumBucketQueriesFailed(long long increment = 1);
+ void incNumBucketReopeningsFailed(long long increment = 1);
+ void incNumDuplicateBucketsReopened(long long increment = 1);
private:
std::shared_ptr<ExecutionStats> _collectionStats;
@@ -282,6 +294,14 @@ public:
uint64_t catalogEra = 0;
};
+ struct BucketFindResult {
+ BucketFindResult() = default;
+
+ bool fetchedBucket{false};
+ bool queriedBucket{false};
+ boost::optional<BucketToReopen> bucketToReopen{boost::none};
+ };
+
static BucketCatalog& get(ServiceContext* svcCtx);
static BucketCatalog& get(OperationContext* opCtx);
@@ -347,7 +367,7 @@ public:
const TimeseriesOptions& options,
const BSONObj& doc,
CombineWithInsertsFromOtherClients combine,
- boost::optional<BucketToReopen> bucketToReopen = boost::none);
+ BucketFindResult bucketFindResult = {});
/**
* Prepares a batch for commit, transitioning it to an inactive state. Caller must already have
@@ -946,7 +966,7 @@ protected:
const BSONObj& doc,
CombineWithInsertsFromOtherClients combine,
AllowBucketCreation mode,
- boost::optional<BucketToReopen> bucketToReopen = boost::none);
+ BucketFindResult bucketFindResult = {});
/**
* Given an already-selected 'bucket', inserts 'doc' to the bucket if possible. If not, and
@@ -1092,6 +1112,13 @@ protected:
static long long _marginalMemoryUsageForArchivedBucket(const ArchivedBucket& bucket,
bool onlyEntryForMatchingMetaHash);
+ /**
+ * Updates the stats to reflect the outcome of bucket fetches and queries, based on the
+ * BucketFindResult (which is populated when attempting to reopen a bucket).
+ */
+ void _updateBucketFetchAndQueryStats(ExecutionStatsController& stats,
+ const BucketFindResult& findResult);
+
mutable Mutex _mutex =
MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "BucketCatalog::_mutex");
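
Because the new parameter is taken by value and defaults to an empty BucketFindResult, call sites that never attempt reopening compile unchanged; callers that previously passed a boost::optional<BucketToReopen> now wrap it in a BucketFindResult, as the test changes below do. A sketch of the updated call shape (mirroring the tests; other arguments as before):

    BucketCatalog::BucketFindResult findResult;
    findResult.bucketToReopen = BucketCatalog::BucketToReopen{bucketDoc, validator};
    auto result = bucketCatalog.insert(
        opCtx, ns, comparator, options, doc,
        BucketCatalog::CombineWithInsertsFromOtherClients::kAllow, findResult);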
diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp
index 7253111cbf2..d1ba2e0c4ac 100644
--- a/src/mongo/db/timeseries/bucket_catalog_test.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp
@@ -1622,6 +1622,9 @@ TEST_F(BucketCatalogTest, InsertIntoReopenedBucket) {
return autoColl->checkValidation(opCtx, bucketDoc);
};
+ BucketCatalog::BucketFindResult findResult;
+ findResult.bucketToReopen = BucketCatalog::BucketToReopen{bucketDoc, validator};
+
// We should be able to pass in a valid bucket and insert into it.
result = _bucketCatalog->insert(
_opCtx,
@@ -1630,7 +1633,7 @@ TEST_F(BucketCatalogTest, InsertIntoReopenedBucket) {
_getTimeseriesOptions(_ns1),
::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:35:40.000Z"}})"),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow,
- BucketCatalog::BucketToReopen{bucketDoc, validator});
+ findResult);
ASSERT_OK(result.getStatus());
batch = result.getValue().batch;
ASSERT(batch);
@@ -1697,6 +1700,10 @@ TEST_F(BucketCatalogTest, CannotInsertIntoOutdatedBucket) {
// previous era.
_bucketCatalog->clear(OID());
+ BucketCatalog::BucketFindResult findResult;
+ findResult.bucketToReopen =
+ BucketCatalog::BucketToReopen{bucketDoc, validator, result.getValue().catalogEra};
+
// We should get a WriteConflict back if we pass in an outdated bucket.
result = _bucketCatalog->insert(
_opCtx,
@@ -1705,7 +1712,7 @@ TEST_F(BucketCatalogTest, CannotInsertIntoOutdatedBucket) {
_getTimeseriesOptions(_ns1),
::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:35:40.000Z"}})"),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow,
- BucketCatalog::BucketToReopen{bucketDoc, validator, result.getValue().catalogEra});
+ findResult);
ASSERT_NOT_OK(result.getStatus());
ASSERT_EQ(result.getStatus().code(), ErrorCodes::WriteConflict);
}