summaryrefslogtreecommitdiff
path: root/src/mongo/db/timeseries/bucket_catalog.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/timeseries/bucket_catalog.cpp')
-rw-r--r--src/mongo/db/timeseries/bucket_catalog.cpp73
1 files changed, 55 insertions, 18 deletions
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index 2fa543fe051..f7eecbff245 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -318,25 +318,39 @@ uint64_t BucketCatalog::EraManager::getEra() {
return _era;
}
-void BucketCatalog::EraManager::incrementEra() {
+uint64_t BucketCatalog::EraManager::incrementEra() {
stdx::lock_guard lk{*_mutex};
- ++_era;
+ return ++_era;
}
uint64_t BucketCatalog::EraManager::getEraAndIncrementCount() {
stdx::lock_guard lk{*_mutex};
- auto it = _countMap.find(_era);
- if (it == _countMap.end()) {
- (_countMap)[_era] = 1;
- } else {
- ++it->second;
- }
+ _incrementEraCountHelper(_era);
return _era;
}
void BucketCatalog::EraManager::decrementCountForEra(uint64_t value) {
stdx::lock_guard lk{*_mutex};
+ _decrementEraCountHelper(value);
+}
+
+uint64_t BucketCatalog::EraManager::getCountForEra(uint64_t value) {
+ stdx::lock_guard lk{*_mutex};
auto it = _countMap.find(value);
+ if (it == _countMap.end()) {
+ return 0;
+ } else {
+ return it->second;
+ }
+}
+
+void BucketCatalog::EraManager::insertToRegistry(uint64_t era, ShouldClearFn&& shouldClear) {
+ stdx::lock_guard lk{*_mutex};
+ _clearRegistry[era] = std::move(shouldClear);
+}
+
+void BucketCatalog::EraManager::_decrementEraCountHelper(uint64_t era) {
+ auto it = _countMap.find(era);
invariant(it != _countMap.end());
if (it->second == 1) {
_countMap.erase(it);
@@ -345,14 +359,29 @@ void BucketCatalog::EraManager::decrementCountForEra(uint64_t value) {
}
}
-uint64_t BucketCatalog::EraManager::getCountForEra(uint64_t value) {
- stdx::lock_guard lk{*_mutex};
- auto it = _countMap.find(value);
+void BucketCatalog::EraManager::_incrementEraCountHelper(uint64_t era) {
+ auto it = _countMap.find(era);
if (it == _countMap.end()) {
- return uint64_t(0);
+ (_countMap)[era] = 1;
} else {
- return it->second;
+ ++it->second;
+ }
+}
+
+bool BucketCatalog::EraManager::hasBeenCleared(Bucket* bucket) {
+ stdx::lock_guard lk{*_mutex};
+ for (auto it = _clearRegistry.find(bucket->getEra() + 1); it != _clearRegistry.end(); ++it) {
+ if (it->second(bucket->_ns)) {
+ return true;
+ }
}
+ if (bucket->getEra() != _era) {
+ _decrementEraCountHelper(bucket->getEra());
+ _incrementEraCountHelper(_era);
+ bucket->setEra(_era);
+ }
+
+ return false;
}
BucketCatalog::Bucket::Bucket(const OID& id,
@@ -366,13 +395,17 @@ BucketCatalog::Bucket::Bucket(const OID& id,
_keyHash(hash) {}
BucketCatalog::Bucket::~Bucket() {
- _eraManager->decrementCountForEra(era());
+ _eraManager->decrementCountForEra(getEra());
}
-uint64_t BucketCatalog::Bucket::era() const {
+uint64_t BucketCatalog::Bucket::getEra() const {
return _lastCheckedEra;
}
+void BucketCatalog::Bucket::setEra(uint64_t era) {
+ _lastCheckedEra = era;
+}
+
const OID& BucketCatalog::Bucket::id() const {
return _id;
}
@@ -799,9 +832,7 @@ void BucketCatalog::clear(const OID& oid) {
}
}
-void BucketCatalog::clear(const std::function<bool(const NamespaceString&)>& shouldClear) {
- _eraManager.incrementEra();
-
+void BucketCatalog::clear(ShouldClearFn&& shouldClear) {
for (auto& stripe : _stripes) {
stdx::lock_guard stripeLock{stripe.mutex};
for (auto it = stripe.allBuckets.begin(); it != stripe.allBuckets.end();) {
@@ -823,6 +854,12 @@ void BucketCatalog::clear(const std::function<bool(const NamespaceString&)>& sho
it = nextIt;
}
}
+
+ uint64_t era = _eraManager.incrementEra();
+ if (feature_flags::gTimeseriesScalabilityImprovements.isEnabled(
+ serverGlobalParams.featureCompatibility)) {
+ _eraManager.insertToRegistry(era, std::move(shouldClear));
+ }
}
void BucketCatalog::clear(const NamespaceString& ns) {