diff options
author | David Wang <david.wang@mongodb.com> | 2022-07-15 14:47:24 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-07-15 16:27:33 +0000 |
commit | a16f7f1ee85ac854db4e92294ceb392043623fea (patch) | |
tree | e02a4ecb22d4456a0d46c97f9f995805c98b9e01 | |
parent | d8616301f12951868e52e19cd696cff620caa28e (diff) | |
download | mongo-a16f7f1ee85ac854db4e92294ceb392043623fea.tar.gz |
SERVER-66695 Introduce BucketCatalog clear registry
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog.cpp | 73 | ||||
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog.h | 28 | ||||
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog_era_test.cpp | 93 |
3 files changed, 165 insertions, 29 deletions
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp index 2fa543fe051..f7eecbff245 100644 --- a/src/mongo/db/timeseries/bucket_catalog.cpp +++ b/src/mongo/db/timeseries/bucket_catalog.cpp @@ -318,25 +318,39 @@ uint64_t BucketCatalog::EraManager::getEra() { return _era; } -void BucketCatalog::EraManager::incrementEra() { +uint64_t BucketCatalog::EraManager::incrementEra() { stdx::lock_guard lk{*_mutex}; - ++_era; + return ++_era; } uint64_t BucketCatalog::EraManager::getEraAndIncrementCount() { stdx::lock_guard lk{*_mutex}; - auto it = _countMap.find(_era); - if (it == _countMap.end()) { - (_countMap)[_era] = 1; - } else { - ++it->second; - } + _incrementEraCountHelper(_era); return _era; } void BucketCatalog::EraManager::decrementCountForEra(uint64_t value) { stdx::lock_guard lk{*_mutex}; + _decrementEraCountHelper(value); +} + +uint64_t BucketCatalog::EraManager::getCountForEra(uint64_t value) { + stdx::lock_guard lk{*_mutex}; auto it = _countMap.find(value); + if (it == _countMap.end()) { + return 0; + } else { + return it->second; + } +} + +void BucketCatalog::EraManager::insertToRegistry(uint64_t era, ShouldClearFn&& shouldClear) { + stdx::lock_guard lk{*_mutex}; + _clearRegistry[era] = std::move(shouldClear); +} + +void BucketCatalog::EraManager::_decrementEraCountHelper(uint64_t era) { + auto it = _countMap.find(era); invariant(it != _countMap.end()); if (it->second == 1) { _countMap.erase(it); @@ -345,14 +359,29 @@ void BucketCatalog::EraManager::decrementCountForEra(uint64_t value) { } } -uint64_t BucketCatalog::EraManager::getCountForEra(uint64_t value) { - stdx::lock_guard lk{*_mutex}; - auto it = _countMap.find(value); +void BucketCatalog::EraManager::_incrementEraCountHelper(uint64_t era) { + auto it = _countMap.find(era); if (it == _countMap.end()) { - return uint64_t(0); + (_countMap)[era] = 1; } else { - return it->second; + ++it->second; + } +} + +bool BucketCatalog::EraManager::hasBeenCleared(Bucket* bucket) { + stdx::lock_guard lk{*_mutex}; + for (auto it = _clearRegistry.find(bucket->getEra() + 1); it != _clearRegistry.end(); ++it) { + if (it->second(bucket->_ns)) { + return true; + } } + if (bucket->getEra() != _era) { + _decrementEraCountHelper(bucket->getEra()); + _incrementEraCountHelper(_era); + bucket->setEra(_era); + } + + return false; } BucketCatalog::Bucket::Bucket(const OID& id, @@ -366,13 +395,17 @@ BucketCatalog::Bucket::Bucket(const OID& id, _keyHash(hash) {} BucketCatalog::Bucket::~Bucket() { - _eraManager->decrementCountForEra(era()); + _eraManager->decrementCountForEra(getEra()); } -uint64_t BucketCatalog::Bucket::era() const { +uint64_t BucketCatalog::Bucket::getEra() const { return _lastCheckedEra; } +void BucketCatalog::Bucket::setEra(uint64_t era) { + _lastCheckedEra = era; +} + const OID& BucketCatalog::Bucket::id() const { return _id; } @@ -799,9 +832,7 @@ void BucketCatalog::clear(const OID& oid) { } } -void BucketCatalog::clear(const std::function<bool(const NamespaceString&)>& shouldClear) { - _eraManager.incrementEra(); - +void BucketCatalog::clear(ShouldClearFn&& shouldClear) { for (auto& stripe : _stripes) { stdx::lock_guard stripeLock{stripe.mutex}; for (auto it = stripe.allBuckets.begin(); it != stripe.allBuckets.end();) { @@ -823,6 +854,12 @@ void BucketCatalog::clear(const std::function<bool(const NamespaceString&)>& sho it = nextIt; } } + + uint64_t era = _eraManager.incrementEra(); + if (feature_flags::gTimeseriesScalabilityImprovements.isEnabled( + serverGlobalParams.featureCompatibility)) { + _eraManager.insertToRegistry(era, std::move(shouldClear)); + } } void BucketCatalog::clear(const NamespaceString& ns) { diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h index 987dde2a5ca..7f746ce3653 100644 --- a/src/mongo/db/timeseries/bucket_catalog.h +++ b/src/mongo/db/timeseries/bucket_catalog.h @@ -56,6 +56,8 @@ protected: using EraCountMap = stdx::unordered_map<uint64_t, uint64_t>; + using ShouldClearFn = std::function<bool(const NamespaceString&)>; + struct BucketHandle { const OID id; const StripeNumber stripe; @@ -351,7 +353,7 @@ public: /** * Clears any bucket whose namespace satisfies the predicate. */ - void clear(const std::function<bool(const NamespaceString&)>& shouldClear); + void clear(ShouldClearFn&& shouldClear); /** * Clears the buckets for the given namespace. @@ -529,12 +531,26 @@ protected: public: EraManager(Mutex* m); uint64_t getEra(); - void incrementEra(); + uint64_t incrementEra(); uint64_t getEraAndIncrementCount(); void decrementCountForEra(uint64_t value); uint64_t getCountForEra(uint64_t value); + // Records a clear operation in the clearRegistry. The key is the new era after the clear + // operation has occurred, and the value is a function which takes a namespace and returns + // whether this namespace should be cleared. + void insertToRegistry(uint64_t era, + std::function<bool(const NamespaceString&)>&& shouldClear); + + // Returns whether the Bucket has been marked as cleared by checking against the + // clearRegistry. Advances Bucket's era up to current global era if the bucket has not been + // cleared. + bool hasBeenCleared(Bucket* bucket); + private: + void _decrementEraCountHelper(uint64_t era); + void _incrementEraCountHelper(uint64_t era); + // Pointer to 'BucketCatalog::_mutex'. Mutex* _mutex; @@ -544,6 +560,10 @@ protected: // Mapping of era to counts of how many buckets are associated with that era. EraCountMap _countMap; + + // Registry storing clear operations. Maps from era to a lambda function which takes in + // information about a Bucket and returns whether the Bucket has been cleared. + std::map<uint64_t, ShouldClearFn> _clearRegistry; }; /** @@ -560,7 +580,9 @@ protected: ~Bucket(); - uint64_t era() const; + uint64_t getEra() const; + + void setEra(uint64_t era); /** * Returns the ID for the underlying bucket. diff --git a/src/mongo/db/timeseries/bucket_catalog_era_test.cpp b/src/mongo/db/timeseries/bucket_catalog_era_test.cpp index 9998f0a44d6..89ef24076d0 100644 --- a/src/mongo/db/timeseries/bucket_catalog_era_test.cpp +++ b/src/mongo/db/timeseries/bucket_catalog_era_test.cpp @@ -27,6 +27,7 @@ * it in the license file. */ +#include "mongo/db/storage/storage_parameters_gen.h" #include "mongo/db/timeseries/bucket_catalog.h" #include "mongo/idl/server_parameter_test_util.h" #include "mongo/unittest/bson_test_util.h" @@ -40,9 +41,31 @@ public: Bucket* createBucket(const CreationInfo& info) { auto ptr = _allocateBucket(&_stripes[info.stripe], withLock, info); ptr->setNamespace(info.key.ns); + ASSERT_FALSE(_eraManager.hasBeenCleared(ptr)); return ptr; } + void clearForTest(const NamespaceString& ns) { + clearForTest([&ns](const NamespaceString& bucketNs) { return bucketNs == ns; }); + } + + void clearForTest(std::function<bool(const NamespaceString&)>&& shouldClear) { + uint64_t era = _eraManager.incrementEra(); + if (feature_flags::gTimeseriesScalabilityImprovements.isEnabled( + serverGlobalParams.featureCompatibility)) { + _eraManager.insertToRegistry(era, std::move(shouldClear)); + } + } + + bool cannotAccessBucket(Bucket* bucket) { + if (_eraManager.hasBeenCleared(bucket)) { + _removeBucket(&_stripes[bucket->stripe()], withLock, bucket, false); + return true; + } else { + return false; + } + } + Stripe stripe; WithLock withLock = WithLock::withoutLock(); NamespaceString ns1{"db.test1"}; @@ -71,7 +94,7 @@ TEST_F(BucketCatalogEraTest, EraAdvancesAsExpected) { ASSERT_EQ(_eraManager.getEra(), 0); auto bucket1 = createBucket(info1); ASSERT_EQ(_eraManager.getEra(), 0); - ASSERT_EQ(bucket1->era(), 0); + ASSERT_EQ(bucket1->getEra(), 0); // When clearing buckets, we expect the BucketCatalog's era value to increase while the cleared // bucket era values should remain unchanged. @@ -85,11 +108,11 @@ TEST_F(BucketCatalogEraTest, EraAdvancesAsExpected) { auto bucket2 = createBucket(info1); auto bucket3 = createBucket(info2); ASSERT_EQ(_eraManager.getEra(), 1); - ASSERT_EQ(bucket2->era(), 1); - ASSERT_EQ(bucket3->era(), 1); + ASSERT_EQ(bucket2->getEra(), 1); + ASSERT_EQ(bucket3->getEra(), 1); clear(ns1); ASSERT_EQ(_eraManager.getEra(), 2); - ASSERT_EQ(bucket3->era(), 1); + ASSERT_EQ(bucket3->getEra(), 1); // TODO (SERVER-66698): Add checks on the buckets' era values. // ASSERT_EQ(b1->era(), 0); // ASSERT_EQ(b2->era(), 1); @@ -102,7 +125,7 @@ TEST_F(BucketCatalogEraTest, EraCountMapUpdatedCorrectly) { // TODO (SERVER-66698): Change count assertions now that Buckets are cleared lazily. // Creating a bucket in a new era should add a counter for that era to the map. auto bucket1 = createBucket(info1); - ASSERT_EQ(bucket1->era(), 0); + ASSERT_EQ(bucket1->getEra(), 0); ASSERT_EQ(_eraManager.getCountForEra(0), 1); clear(ns1); @@ -113,8 +136,8 @@ TEST_F(BucketCatalogEraTest, EraCountMapUpdatedCorrectly) { // map. auto bucket2 = createBucket(info1); auto bucket3 = createBucket(info2); - ASSERT_EQ(bucket2->era(), 1); - ASSERT_EQ(bucket3->era(), 1); + ASSERT_EQ(bucket2->getEra(), 1); + ASSERT_EQ(bucket3->getEra(), 1); ASSERT_EQ(_eraManager.getCountForEra(1), 2); clear(ns2); ASSERT_EQ(_eraManager.getCountForEra(1), 1); @@ -122,13 +145,67 @@ TEST_F(BucketCatalogEraTest, EraCountMapUpdatedCorrectly) { // A bucket in one era being destroyed and the counter decrementing should not affect a // different era's counter. auto bucket4 = createBucket(info2); - ASSERT_EQ(bucket4->era(), 2); + ASSERT_EQ(bucket4->getEra(), 2); ASSERT_EQ(_eraManager.getCountForEra(2), 1); clear(ns2); ASSERT_EQ(_eraManager.getCountForEra(2), 0); ASSERT_EQ(_eraManager.getCountForEra(1), 1); } +TEST_F(BucketCatalogEraTest, HasBeenClearedFunctionReturnsAsExpected) { + RAIIServerParameterControllerForTest controller{"featureFlagTimeseriesScalabilityImprovements", + true}; + + auto bucket1 = createBucket(info1); + auto bucket2 = createBucket(info2); + ASSERT_EQ(bucket1->getEra(), 0); + ASSERT_EQ(bucket2->getEra(), 0); + + // After a clear operation, _hasBeenCleared returns whether a particular bucket was cleared or + // not. It also advances the bucket's era up to the most recent era. + ASSERT_FALSE(cannotAccessBucket(bucket1)); + ASSERT_FALSE(cannotAccessBucket(bucket2)); + ASSERT_EQ(_eraManager.getCountForEra(0), 2); + clearForTest(ns2); + ASSERT_FALSE(cannotAccessBucket(bucket1)); + ASSERT_EQ(_eraManager.getCountForEra(0), 1); + ASSERT_EQ(bucket1->getEra(), 1); + ASSERT(cannotAccessBucket(bucket2)); + + // Sanity check that all this still works with multiple buckets in a namespace being cleared. + auto bucket3 = createBucket(info2); + auto bucket4 = createBucket(info2); + ASSERT_EQ(bucket3->getEra(), 1); + ASSERT_EQ(bucket4->getEra(), 1); + clearForTest(ns2); + ASSERT(cannotAccessBucket(bucket3)); + ASSERT(cannotAccessBucket(bucket4)); + auto bucket5 = createBucket(info2); + ASSERT_EQ(bucket5->getEra(), 2); + clearForTest(ns2); + ASSERT(cannotAccessBucket(bucket5)); + // _hasBeenCleared should be able to advance a bucket by multiple eras. + ASSERT_EQ(bucket1->getEra(), 1); + ASSERT_EQ(_eraManager.getCountForEra(1), 1); + ASSERT_EQ(_eraManager.getCountForEra(3), 0); + ASSERT_FALSE(cannotAccessBucket(bucket1)); + ASSERT_EQ(bucket1->getEra(), 3); + ASSERT_EQ(_eraManager.getCountForEra(1), 0); + ASSERT_EQ(_eraManager.getCountForEra(3), 1); + + // _hasBeenCleared works even if the bucket wasn't cleared in the most recent clear. + clearForTest(ns1); + auto bucket6 = createBucket(info2); + ASSERT_EQ(bucket6->getEra(), 4); + clearForTest(ns2); + ASSERT_EQ(_eraManager.getCountForEra(3), 1); + ASSERT_EQ(_eraManager.getCountForEra(4), 1); + ASSERT(cannotAccessBucket(bucket1)); + ASSERT(cannotAccessBucket(bucket6)); + ASSERT_EQ(_eraManager.getCountForEra(3), 0); + ASSERT_EQ(_eraManager.getCountForEra(4), 0); +} + } // namespace } // namespace mongo |