diff options
author | Faustoleyva54 <fausto.leyva@mongodb.com> | 2022-11-07 22:37:44 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-11-07 23:23:54 +0000 |
commit | 924c1769d07b776695cf511b0fa87b23219128be (patch) | |
tree | b070ea74b6d96ab838add418e4606738460b7935 /src/mongo/db/timeseries | |
parent | 72c2339d1e0485488e3a3a7c9aa4833a6ad369c1 (diff) | |
download | mongo-924c1769d07b776695cf511b0fa87b23219128be.tar.gz |
SERVER-66691 Track statistics around fetching closed buckets in execution stats and serverStatus
Diffstat (limited to 'src/mongo/db/timeseries')
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog.cpp | 68 | ||||
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog.h | 31 | ||||
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog_test.cpp | 11 |
3 files changed, 103 insertions, 7 deletions
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp index 854a2805d66..cb02b01a0d9 100644 --- a/src/mongo/db/timeseries/bucket_catalog.cpp +++ b/src/mongo/db/timeseries/bucket_catalog.cpp @@ -254,6 +254,36 @@ void BucketCatalog::ExecutionStatsController::incNumBucketsKeptOpenDueToLargeMea _globalStats->numBucketsKeptOpenDueToLargeMeasurements.fetchAndAddRelaxed(increment); } +void BucketCatalog::ExecutionStatsController::incNumBucketFetchesFailed(long long increment) { + _collectionStats->numBucketFetchesFailed.fetchAndAddRelaxed(increment); + _globalStats->numBucketFetchesFailed.fetchAndAddRelaxed(increment); +} + +void BucketCatalog::ExecutionStatsController::incNumBucketQueriesFailed(long long increment) { + _collectionStats->numBucketQueriesFailed.fetchAndAddRelaxed(increment); + _globalStats->numBucketQueriesFailed.fetchAndAddRelaxed(increment); +} + +void BucketCatalog::ExecutionStatsController::incNumBucketsFetched(long long increment) { + _collectionStats->numBucketsFetched.fetchAndAddRelaxed(increment); + _globalStats->numBucketsFetched.fetchAndAddRelaxed(increment); +} + +void BucketCatalog::ExecutionStatsController::incNumBucketsQueried(long long increment) { + _collectionStats->numBucketsQueried.fetchAndAddRelaxed(increment); + _globalStats->numBucketsQueried.fetchAndAddRelaxed(increment); +} + +void BucketCatalog::ExecutionStatsController::incNumBucketReopeningsFailed(long long increment) { + _collectionStats->numBucketReopeningsFailed.fetchAndAddRelaxed(increment); + _globalStats->numBucketReopeningsFailed.fetchAndAddRelaxed(increment); +} + +void BucketCatalog::ExecutionStatsController::incNumDuplicateBucketsReopened(long long increment) { + _collectionStats->numDuplicateBucketsReopened.fetchAndAddRelaxed(increment); + _globalStats->numDuplicateBucketsReopened.fetchAndAddRelaxed(increment); +} + BucketCatalog::BucketStateManager::BucketStateManager(Mutex* m) : _mutex(m), _era(0) {} uint64_t BucketCatalog::BucketStateManager::getEra() { @@ -839,9 +869,9 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert( const TimeseriesOptions& options, const BSONObj& doc, CombineWithInsertsFromOtherClients combine, - boost::optional<BucketToReopen> bucketToReopen) { + BucketFindResult bucketFindResult) { return _insert( - opCtx, ns, comparator, options, doc, combine, AllowBucketCreation::kYes, bucketToReopen); + opCtx, ns, comparator, options, doc, combine, AllowBucketCreation::kYes, bucketFindResult); } Status BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) { @@ -1043,6 +1073,13 @@ void BucketCatalog::_appendExecutionStatsToBuilder(const ExecutionStats* stats, stats->numBucketsKeptOpenDueToLargeMeasurements.load()); builder->appendNumber("numBucketsClosedDueToCachePressure", stats->numBucketsClosedDueToCachePressure.load()); + builder->appendNumber("numBucketsFetched", stats->numBucketsFetched.load()); + builder->appendNumber("numBucketsQueried", stats->numBucketsQueried.load()); + builder->appendNumber("numBucketFetchesFailed", stats->numBucketFetchesFailed.load()); + builder->appendNumber("numBucketQueriesFailed", stats->numBucketQueriesFailed.load()); + builder->appendNumber("numBucketReopeningsFailed", stats->numBucketReopeningsFailed.load()); + builder->appendNumber("numDuplicateBucketsReopened", + stats->numDuplicateBucketsReopened.load()); } } @@ -1365,6 +1402,7 @@ BucketCatalog::Bucket* BucketCatalog::_reopenBucket(Stripe* stripe, // If the bucket wasn't inserted into the stripe, then that bucket is already open and we can // return the bucket 'it' points to. if (!inserted) { + stats.incNumDuplicateBucketsReopened(); _markBucketNotIdle(stripe, stripeLock, unownedBucket); return unownedBucket; } @@ -1404,7 +1442,8 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::_insert( const BSONObj& doc, CombineWithInsertsFromOtherClients combine, AllowBucketCreation mode, - boost::optional<BucketToReopen> bucketToReopen) { + BucketFindResult bucketFindResult) { + auto res = _extractBucketingParameters(ns, comparator, options, doc); if (!res.isOK()) { return res.getStatus(); @@ -1413,6 +1452,7 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::_insert( auto time = res.getValue().second; ExecutionStatsController stats = _getExecutionStats(ns); + _updateBucketFetchAndQueryStats(stats, bucketFindResult); // Buckets are spread across independently-lockable stripes to improve parallelism. We map a // bucket to a stripe by hashing the BucketKey. @@ -1421,10 +1461,12 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::_insert( InsertResult result; result.catalogEra = _bucketStateManager.getEra(); CreationInfo info{key, stripeNumber, time, options, stats, &result.closedBuckets}; + boost::optional<BucketToReopen> bucketToReopen = std::move(bucketFindResult.bucketToReopen); auto rehydratedBucket = _rehydrateBucket(opCtx, ns, comparator, options, stats, bucketToReopen, key); if (rehydratedBucket.getStatus().code() == ErrorCodes::WriteConflict) { + stats.incNumBucketReopeningsFailed(); return rehydratedBucket.getStatus(); } @@ -1453,6 +1495,7 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::_insert( return std::move(result); } else { + stats.incNumBucketReopeningsFailed(); return {ErrorCodes::WriteConflict, "Bucket may be stale"}; } } @@ -1997,6 +2040,25 @@ long long BucketCatalog::_marginalMemoryUsageForArchivedBucket(const ArchivedBuc : 0); } +void BucketCatalog::_updateBucketFetchAndQueryStats(ExecutionStatsController& stats, + const BucketFindResult& findResult) { + if (findResult.fetchedBucket) { + if (findResult.bucketToReopen.has_value()) { + stats.incNumBucketsFetched(); + } else { + stats.incNumBucketFetchesFailed(); + } + } + + if (findResult.queriedBucket) { + if (findResult.bucketToReopen.has_value()) { + stats.incNumBucketsQueried(); + } else { + stats.incNumBucketQueriesFailed(); + } + } +} + class BucketCatalog::ServerStatus : public ServerStatusSection { struct BucketCounts { BucketCounts& operator+=(const BucketCounts& other) { diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h index cfe6658bab2..49dd7c47ceb 100644 --- a/src/mongo/db/timeseries/bucket_catalog.h +++ b/src/mongo/db/timeseries/bucket_catalog.h @@ -82,6 +82,12 @@ protected: AtomicWord<long long> numMeasurementsCommitted; AtomicWord<long long> numBucketsReopened; AtomicWord<long long> numBucketsKeptOpenDueToLargeMeasurements; + AtomicWord<long long> numBucketsFetched; + AtomicWord<long long> numBucketsQueried; + AtomicWord<long long> numBucketFetchesFailed; + AtomicWord<long long> numBucketQueriesFailed; + AtomicWord<long long> numBucketReopeningsFailed; + AtomicWord<long long> numDuplicateBucketsReopened; }; class ExecutionStatsController { @@ -109,6 +115,12 @@ protected: void incNumMeasurementsCommitted(long long increment = 1); void incNumBucketsReopened(long long increment = 1); void incNumBucketsKeptOpenDueToLargeMeasurements(long long increment = 1); + void incNumBucketsFetched(long long increment = 1); + void incNumBucketsQueried(long long increment = 1); + void incNumBucketFetchesFailed(long long increment = 1); + void incNumBucketQueriesFailed(long long increment = 1); + void incNumBucketReopeningsFailed(long long increment = 1); + void incNumDuplicateBucketsReopened(long long increment = 1); private: std::shared_ptr<ExecutionStats> _collectionStats; @@ -282,6 +294,14 @@ public: uint64_t catalogEra = 0; }; + struct BucketFindResult { + BucketFindResult() {} + + bool fetchedBucket{false}; + bool queriedBucket{false}; + boost::optional<BucketToReopen> bucketToReopen{boost::none}; + }; + static BucketCatalog& get(ServiceContext* svcCtx); static BucketCatalog& get(OperationContext* opCtx); @@ -347,7 +367,7 @@ public: const TimeseriesOptions& options, const BSONObj& doc, CombineWithInsertsFromOtherClients combine, - boost::optional<BucketToReopen> bucketToReopen = boost::none); + BucketFindResult bucketFindResult = {}); /** * Prepares a batch for commit, transitioning it to an inactive state. Caller must already have @@ -946,7 +966,7 @@ protected: const BSONObj& doc, CombineWithInsertsFromOtherClients combine, AllowBucketCreation mode, - boost::optional<BucketToReopen> bucketToReopen = boost::none); + BucketFindResult bucketFindResult = {}); /** * Given an already-selected 'bucket', inserts 'doc' to the bucket if possible. If not, and @@ -1092,6 +1112,13 @@ protected: static long long _marginalMemoryUsageForArchivedBucket(const ArchivedBucket& bucket, bool onlyEntryForMatchingMetaHash); + /** + * Updates stats to reflect the status of bucket fetches and queries based off of the FindResult + * (which is populated when attempting to reopen a bucket). + */ + void _updateBucketFetchAndQueryStats(ExecutionStatsController& stats, + const BucketFindResult& findResult); + mutable Mutex _mutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "BucketCatalog::_mutex"); diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp index 7253111cbf2..d1ba2e0c4ac 100644 --- a/src/mongo/db/timeseries/bucket_catalog_test.cpp +++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp @@ -1622,6 +1622,9 @@ TEST_F(BucketCatalogTest, InsertIntoReopenedBucket) { return autoColl->checkValidation(opCtx, bucketDoc); }; + BucketCatalog::BucketFindResult findResult; + findResult.bucketToReopen = BucketCatalog::BucketToReopen{bucketDoc, validator}; + // We should be able to pass in a valid bucket and insert into it. result = _bucketCatalog->insert( _opCtx, @@ -1630,7 +1633,7 @@ TEST_F(BucketCatalogTest, InsertIntoReopenedBucket) { _getTimeseriesOptions(_ns1), ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:35:40.000Z"}})"), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow, - BucketCatalog::BucketToReopen{bucketDoc, validator}); + findResult); ASSERT_OK(result.getStatus()); batch = result.getValue().batch; ASSERT(batch); @@ -1697,6 +1700,10 @@ TEST_F(BucketCatalogTest, CannotInsertIntoOutdatedBucket) { // previous era. _bucketCatalog->clear(OID()); + BucketCatalog::BucketFindResult findResult; + findResult.bucketToReopen = + BucketCatalog::BucketToReopen{bucketDoc, validator, result.getValue().catalogEra}; + // We should get an WriteConflict back if we pass in an outdated bucket. result = _bucketCatalog->insert( _opCtx, @@ -1705,7 +1712,7 @@ TEST_F(BucketCatalogTest, CannotInsertIntoOutdatedBucket) { _getTimeseriesOptions(_ns1), ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:35:40.000Z"}})"), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow, - BucketCatalog::BucketToReopen{bucketDoc, validator, result.getValue().catalogEra}); + findResult); ASSERT_NOT_OK(result.getStatus()); ASSERT_EQ(result.getStatus().code(), ErrorCodes::WriteConflict); } |