summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Wang <david.wang@mongodb.com>2022-07-15 14:47:24 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-07-15 16:27:33 +0000
commita16f7f1ee85ac854db4e92294ceb392043623fea (patch)
treee02a4ecb22d4456a0d46c97f9f995805c98b9e01
parentd8616301f12951868e52e19cd696cff620caa28e (diff)
downloadmongo-a16f7f1ee85ac854db4e92294ceb392043623fea.tar.gz
SERVER-66695 Introduce BucketCatalog clear registry
-rw-r--r--src/mongo/db/timeseries/bucket_catalog.cpp73
-rw-r--r--src/mongo/db/timeseries/bucket_catalog.h28
-rw-r--r--src/mongo/db/timeseries/bucket_catalog_era_test.cpp93
3 files changed, 165 insertions, 29 deletions
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index 2fa543fe051..f7eecbff245 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -318,25 +318,39 @@ uint64_t BucketCatalog::EraManager::getEra() {
return _era;
}
-void BucketCatalog::EraManager::incrementEra() {
+uint64_t BucketCatalog::EraManager::incrementEra() {
stdx::lock_guard lk{*_mutex};
- ++_era;
+ return ++_era;
}
uint64_t BucketCatalog::EraManager::getEraAndIncrementCount() {
stdx::lock_guard lk{*_mutex};
- auto it = _countMap.find(_era);
- if (it == _countMap.end()) {
- (_countMap)[_era] = 1;
- } else {
- ++it->second;
- }
+ _incrementEraCountHelper(_era);
return _era;
}
void BucketCatalog::EraManager::decrementCountForEra(uint64_t value) {
stdx::lock_guard lk{*_mutex};
+ _decrementEraCountHelper(value);
+}
+
+uint64_t BucketCatalog::EraManager::getCountForEra(uint64_t value) {
+ stdx::lock_guard lk{*_mutex};
auto it = _countMap.find(value);
+ if (it == _countMap.end()) {
+ return 0;
+ } else {
+ return it->second;
+ }
+}
+
+void BucketCatalog::EraManager::insertToRegistry(uint64_t era, ShouldClearFn&& shouldClear) {
+ stdx::lock_guard lk{*_mutex};
+ _clearRegistry[era] = std::move(shouldClear);
+}
+
+void BucketCatalog::EraManager::_decrementEraCountHelper(uint64_t era) {
+ auto it = _countMap.find(era);
invariant(it != _countMap.end());
if (it->second == 1) {
_countMap.erase(it);
@@ -345,14 +359,29 @@ void BucketCatalog::EraManager::decrementCountForEra(uint64_t value) {
}
}
-uint64_t BucketCatalog::EraManager::getCountForEra(uint64_t value) {
- stdx::lock_guard lk{*_mutex};
- auto it = _countMap.find(value);
+void BucketCatalog::EraManager::_incrementEraCountHelper(uint64_t era) {
+ auto it = _countMap.find(era);
if (it == _countMap.end()) {
- return uint64_t(0);
+ (_countMap)[era] = 1;
} else {
- return it->second;
+ ++it->second;
+ }
+}
+
+bool BucketCatalog::EraManager::hasBeenCleared(Bucket* bucket) {
+ stdx::lock_guard lk{*_mutex};
+ for (auto it = _clearRegistry.find(bucket->getEra() + 1); it != _clearRegistry.end(); ++it) {
+ if (it->second(bucket->_ns)) {
+ return true;
+ }
}
+ if (bucket->getEra() != _era) {
+ _decrementEraCountHelper(bucket->getEra());
+ _incrementEraCountHelper(_era);
+ bucket->setEra(_era);
+ }
+
+ return false;
}
BucketCatalog::Bucket::Bucket(const OID& id,
@@ -366,13 +395,17 @@ BucketCatalog::Bucket::Bucket(const OID& id,
_keyHash(hash) {}
BucketCatalog::Bucket::~Bucket() {
- _eraManager->decrementCountForEra(era());
+ _eraManager->decrementCountForEra(getEra());
}
-uint64_t BucketCatalog::Bucket::era() const {
+uint64_t BucketCatalog::Bucket::getEra() const {
return _lastCheckedEra;
}
+void BucketCatalog::Bucket::setEra(uint64_t era) {
+ _lastCheckedEra = era;
+}
+
const OID& BucketCatalog::Bucket::id() const {
return _id;
}
@@ -799,9 +832,7 @@ void BucketCatalog::clear(const OID& oid) {
}
}
-void BucketCatalog::clear(const std::function<bool(const NamespaceString&)>& shouldClear) {
- _eraManager.incrementEra();
-
+void BucketCatalog::clear(ShouldClearFn&& shouldClear) {
for (auto& stripe : _stripes) {
stdx::lock_guard stripeLock{stripe.mutex};
for (auto it = stripe.allBuckets.begin(); it != stripe.allBuckets.end();) {
@@ -823,6 +854,12 @@ void BucketCatalog::clear(const std::function<bool(const NamespaceString&)>& sho
it = nextIt;
}
}
+
+ uint64_t era = _eraManager.incrementEra();
+ if (feature_flags::gTimeseriesScalabilityImprovements.isEnabled(
+ serverGlobalParams.featureCompatibility)) {
+ _eraManager.insertToRegistry(era, std::move(shouldClear));
+ }
}
void BucketCatalog::clear(const NamespaceString& ns) {
diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h
index 987dde2a5ca..7f746ce3653 100644
--- a/src/mongo/db/timeseries/bucket_catalog.h
+++ b/src/mongo/db/timeseries/bucket_catalog.h
@@ -56,6 +56,8 @@ protected:
using EraCountMap = stdx::unordered_map<uint64_t, uint64_t>;
+ using ShouldClearFn = std::function<bool(const NamespaceString&)>;
+
struct BucketHandle {
const OID id;
const StripeNumber stripe;
@@ -351,7 +353,7 @@ public:
/**
* Clears any bucket whose namespace satisfies the predicate.
*/
- void clear(const std::function<bool(const NamespaceString&)>& shouldClear);
+ void clear(ShouldClearFn&& shouldClear);
/**
* Clears the buckets for the given namespace.
@@ -529,12 +531,26 @@ protected:
public:
EraManager(Mutex* m);
uint64_t getEra();
- void incrementEra();
+ uint64_t incrementEra();
uint64_t getEraAndIncrementCount();
void decrementCountForEra(uint64_t value);
uint64_t getCountForEra(uint64_t value);
+ // Records a clear operation in the clearRegistry. The key is the new era after the clear
+ // operation has occurred, and the value is a function which takes a namespace and returns
+ // whether this namespace should be cleared.
+ void insertToRegistry(uint64_t era,
+ std::function<bool(const NamespaceString&)>&& shouldClear);
+
+ // Returns whether the Bucket has been marked as cleared by checking against the
+ // clearRegistry. Advances Bucket's era up to current global era if the bucket has not been
+ // cleared.
+ bool hasBeenCleared(Bucket* bucket);
+
private:
+ void _decrementEraCountHelper(uint64_t era);
+ void _incrementEraCountHelper(uint64_t era);
+
// Pointer to 'BucketCatalog::_mutex'.
Mutex* _mutex;
@@ -544,6 +560,10 @@ protected:
// Mapping of era to counts of how many buckets are associated with that era.
EraCountMap _countMap;
+
+ // Registry storing clear operations. Maps from era to a lambda function which takes in
+ // information about a Bucket and returns whether the Bucket has been cleared.
+ std::map<uint64_t, ShouldClearFn> _clearRegistry;
};
/**
@@ -560,7 +580,9 @@ protected:
~Bucket();
- uint64_t era() const;
+ uint64_t getEra() const;
+
+ void setEra(uint64_t era);
/**
* Returns the ID for the underlying bucket.
diff --git a/src/mongo/db/timeseries/bucket_catalog_era_test.cpp b/src/mongo/db/timeseries/bucket_catalog_era_test.cpp
index 9998f0a44d6..89ef24076d0 100644
--- a/src/mongo/db/timeseries/bucket_catalog_era_test.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog_era_test.cpp
@@ -27,6 +27,7 @@
* it in the license file.
*/
+#include "mongo/db/storage/storage_parameters_gen.h"
#include "mongo/db/timeseries/bucket_catalog.h"
#include "mongo/idl/server_parameter_test_util.h"
#include "mongo/unittest/bson_test_util.h"
@@ -40,9 +41,31 @@ public:
Bucket* createBucket(const CreationInfo& info) {
auto ptr = _allocateBucket(&_stripes[info.stripe], withLock, info);
ptr->setNamespace(info.key.ns);
+ ASSERT_FALSE(_eraManager.hasBeenCleared(ptr));
return ptr;
}
+ void clearForTest(const NamespaceString& ns) {
+ clearForTest([&ns](const NamespaceString& bucketNs) { return bucketNs == ns; });
+ }
+
+ void clearForTest(std::function<bool(const NamespaceString&)>&& shouldClear) {
+ uint64_t era = _eraManager.incrementEra();
+ if (feature_flags::gTimeseriesScalabilityImprovements.isEnabled(
+ serverGlobalParams.featureCompatibility)) {
+ _eraManager.insertToRegistry(era, std::move(shouldClear));
+ }
+ }
+
+ bool cannotAccessBucket(Bucket* bucket) {
+ if (_eraManager.hasBeenCleared(bucket)) {
+ _removeBucket(&_stripes[bucket->stripe()], withLock, bucket, false);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
Stripe stripe;
WithLock withLock = WithLock::withoutLock();
NamespaceString ns1{"db.test1"};
@@ -71,7 +94,7 @@ TEST_F(BucketCatalogEraTest, EraAdvancesAsExpected) {
ASSERT_EQ(_eraManager.getEra(), 0);
auto bucket1 = createBucket(info1);
ASSERT_EQ(_eraManager.getEra(), 0);
- ASSERT_EQ(bucket1->era(), 0);
+ ASSERT_EQ(bucket1->getEra(), 0);
// When clearing buckets, we expect the BucketCatalog's era value to increase while the cleared
// bucket era values should remain unchanged.
@@ -85,11 +108,11 @@ TEST_F(BucketCatalogEraTest, EraAdvancesAsExpected) {
auto bucket2 = createBucket(info1);
auto bucket3 = createBucket(info2);
ASSERT_EQ(_eraManager.getEra(), 1);
- ASSERT_EQ(bucket2->era(), 1);
- ASSERT_EQ(bucket3->era(), 1);
+ ASSERT_EQ(bucket2->getEra(), 1);
+ ASSERT_EQ(bucket3->getEra(), 1);
clear(ns1);
ASSERT_EQ(_eraManager.getEra(), 2);
- ASSERT_EQ(bucket3->era(), 1);
+ ASSERT_EQ(bucket3->getEra(), 1);
// TODO (SERVER-66698): Add checks on the buckets' era values.
// ASSERT_EQ(b1->era(), 0);
// ASSERT_EQ(b2->era(), 1);
@@ -102,7 +125,7 @@ TEST_F(BucketCatalogEraTest, EraCountMapUpdatedCorrectly) {
// TODO (SERVER-66698): Change count assertions now that Buckets are cleared lazily.
// Creating a bucket in a new era should add a counter for that era to the map.
auto bucket1 = createBucket(info1);
- ASSERT_EQ(bucket1->era(), 0);
+ ASSERT_EQ(bucket1->getEra(), 0);
ASSERT_EQ(_eraManager.getCountForEra(0), 1);
clear(ns1);
@@ -113,8 +136,8 @@ TEST_F(BucketCatalogEraTest, EraCountMapUpdatedCorrectly) {
// map.
auto bucket2 = createBucket(info1);
auto bucket3 = createBucket(info2);
- ASSERT_EQ(bucket2->era(), 1);
- ASSERT_EQ(bucket3->era(), 1);
+ ASSERT_EQ(bucket2->getEra(), 1);
+ ASSERT_EQ(bucket3->getEra(), 1);
ASSERT_EQ(_eraManager.getCountForEra(1), 2);
clear(ns2);
ASSERT_EQ(_eraManager.getCountForEra(1), 1);
@@ -122,13 +145,67 @@ TEST_F(BucketCatalogEraTest, EraCountMapUpdatedCorrectly) {
// A bucket in one era being destroyed and the counter decrementing should not affect a
// different era's counter.
auto bucket4 = createBucket(info2);
- ASSERT_EQ(bucket4->era(), 2);
+ ASSERT_EQ(bucket4->getEra(), 2);
ASSERT_EQ(_eraManager.getCountForEra(2), 1);
clear(ns2);
ASSERT_EQ(_eraManager.getCountForEra(2), 0);
ASSERT_EQ(_eraManager.getCountForEra(1), 1);
}
+TEST_F(BucketCatalogEraTest, HasBeenClearedFunctionReturnsAsExpected) {
+ RAIIServerParameterControllerForTest controller{"featureFlagTimeseriesScalabilityImprovements",
+ true};
+
+ auto bucket1 = createBucket(info1);
+ auto bucket2 = createBucket(info2);
+ ASSERT_EQ(bucket1->getEra(), 0);
+ ASSERT_EQ(bucket2->getEra(), 0);
+
+ // After a clear operation, _hasBeenCleared returns whether a particular bucket was cleared or
+ // not. It also advances the bucket's era up to the most recent era.
+ ASSERT_FALSE(cannotAccessBucket(bucket1));
+ ASSERT_FALSE(cannotAccessBucket(bucket2));
+ ASSERT_EQ(_eraManager.getCountForEra(0), 2);
+ clearForTest(ns2);
+ ASSERT_FALSE(cannotAccessBucket(bucket1));
+ ASSERT_EQ(_eraManager.getCountForEra(0), 1);
+ ASSERT_EQ(bucket1->getEra(), 1);
+ ASSERT(cannotAccessBucket(bucket2));
+
+ // Sanity check that all this still works with multiple buckets in a namespace being cleared.
+ auto bucket3 = createBucket(info2);
+ auto bucket4 = createBucket(info2);
+ ASSERT_EQ(bucket3->getEra(), 1);
+ ASSERT_EQ(bucket4->getEra(), 1);
+ clearForTest(ns2);
+ ASSERT(cannotAccessBucket(bucket3));
+ ASSERT(cannotAccessBucket(bucket4));
+ auto bucket5 = createBucket(info2);
+ ASSERT_EQ(bucket5->getEra(), 2);
+ clearForTest(ns2);
+ ASSERT(cannotAccessBucket(bucket5));
+ // _hasBeenCleared should be able to advance a bucket by multiple eras.
+ ASSERT_EQ(bucket1->getEra(), 1);
+ ASSERT_EQ(_eraManager.getCountForEra(1), 1);
+ ASSERT_EQ(_eraManager.getCountForEra(3), 0);
+ ASSERT_FALSE(cannotAccessBucket(bucket1));
+ ASSERT_EQ(bucket1->getEra(), 3);
+ ASSERT_EQ(_eraManager.getCountForEra(1), 0);
+ ASSERT_EQ(_eraManager.getCountForEra(3), 1);
+
+ // _hasBeenCleared works even if the bucket wasn't cleared in the most recent clear.
+ clearForTest(ns1);
+ auto bucket6 = createBucket(info2);
+ ASSERT_EQ(bucket6->getEra(), 4);
+ clearForTest(ns2);
+ ASSERT_EQ(_eraManager.getCountForEra(3), 1);
+ ASSERT_EQ(_eraManager.getCountForEra(4), 1);
+ ASSERT(cannotAccessBucket(bucket1));
+ ASSERT(cannotAccessBucket(bucket6));
+ ASSERT_EQ(_eraManager.getCountForEra(3), 0);
+ ASSERT_EQ(_eraManager.getCountForEra(4), 0);
+}
+
} // namespace
} // namespace mongo