From 9067d947c9c75146ea889f334e5c5cd17e94baba Mon Sep 17 00:00:00 2001
From: Yuhong Zhang
Date: Mon, 15 May 2023 20:11:29 +0000
Subject: SERVER-77151 Make the number of stripes configurable for the bucket catalog

---
 .../db/timeseries/bucket_catalog/bucket_catalog.h  | 18 +++++---
 .../bucket_catalog/bucket_catalog_internal.cpp     |  6 +--
 .../bucket_catalog/bucket_catalog_internal.h       |  2 +-
 .../bucket_catalog/bucket_catalog_test.cpp         | 40 ++++++++++++++++-
 .../bucket_catalog/bucket_state_registry_test.cpp  | 50 ++++++++++++++--------
 5 files changed, 86 insertions(+), 30 deletions(-)

diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.h
index 90fb5d2e4ef..1f14f683b7d 100644
--- a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.h
+++ b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.h
@@ -127,7 +127,9 @@ public:
     static BucketCatalog& get(ServiceContext* svcCtx);
     static BucketCatalog& get(OperationContext* opCtx);
 
-    BucketCatalog() = default;
+    BucketCatalog() : stripes(numberOfStripes) {}
+    BucketCatalog(size_t numberOfStripes)
+        : numberOfStripes(numberOfStripes), stripes(numberOfStripes){};
     BucketCatalog(const BucketCatalog&) = delete;
     BucketCatalog operator=(const BucketCatalog&) = delete;
 
@@ -135,9 +137,10 @@ public:
     BucketStateRegistry bucketStateRegistry;
 
     // The actual buckets in the catalog are distributed across a number of 'Stripe's. Each can be
-    // independently locked and operated on in parallel.
-    static constexpr std::size_t kNumberOfStripes = 32;
-    std::array<Stripe, kNumberOfStripes> stripes;
+    // independently locked and operated on in parallel. The size of the stripe vector should not be
+    // changed after initialization.
+    const std::size_t numberOfStripes = 32;
+    std::vector<Stripe> stripes;
 
     // Per-namespace execution stats. This map is protected by 'mutex'. Once you complete your
     // lookup, you can keep the shared_ptr to an individual namespace's stats object and release the
@@ -168,8 +171,9 @@ BSONObj getMetadata(BucketCatalog& catalog, const BucketHandle& bucket);
 /**
  * Tries to insert 'doc' into a suitable bucket. If an open bucket is full (or has incompatible
  * schema), but is otherwise suitable, we will close it and open a new bucket. If we find no bucket
- * with matching data and a time range that can accomodate 'doc', we will not open a new bucket, but
- * rather let the caller know to search for an archived or closed bucket that can accomodate 'doc'.
+ * with matching data and a time range that can accommodate 'doc', we will not open a new bucket,
+ * but rather let the caller know to search for an archived or closed bucket that can accommodate
+ * 'doc'.
  *
  * If a suitable bucket is found or opened, returns the WriteBatch into which 'doc' was inserted and
  * a list of any buckets that were closed to make space to insert 'doc'. Any caller who receives the
@@ -178,7 +182,7 @@
  *
  * If no suitable bucket is found or opened, returns an optional bucket ID. If set, the bucket ID
  * corresponds to an archived bucket which should be fetched; otherwise the caller should search for
- * a previously-closed bucket that can accomodate 'doc'. The caller should proceed to call 'insert'
+ * a previously-closed bucket that can accommodate 'doc'. The caller should proceed to call 'insert'
  * to insert 'doc', passing any fetched bucket.
  */
 StatusWith<InsertResult> tryInsert(OperationContext* opCtx,
diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.cpp b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.cpp
index 50acf2890ac..cd5ec26c81d 100644
--- a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.cpp
@@ -109,11 +109,11 @@ void abortWriteBatch(WriteBatch& batch, const Status& status) {
 }
 }  // namespace
 
-StripeNumber getStripeNumber(const BucketKey& key) {
+StripeNumber getStripeNumber(const BucketKey& key, size_t numberOfStripes) {
     if (MONGO_unlikely(alwaysUseSameBucketCatalogStripe.shouldFail())) {
         return 0;
     }
-    return key.hash % BucketCatalog::kNumberOfStripes;
+    return key.hash % numberOfStripes;
 }
 
 StatusWith<std::pair<BucketKey, Date_t>> extractBucketingParameters(
@@ -631,7 +631,7 @@ StatusWith<InsertResult> insert(OperationContext* opCtx,
 
     // Buckets are spread across independently-lockable stripes to improve parallelism. We map a
     // bucket to a stripe by hashing the BucketKey.
-    auto stripeNumber = getStripeNumber(key);
+    auto stripeNumber = getStripeNumber(key, catalog.numberOfStripes);
 
     InsertResult result;
     result.catalogEra = getCurrentEra(catalog.bucketStateRegistry);
diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.h b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.h
index 4b5c62f72f0..65e713355f4 100644
--- a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.h
+++ b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.h
@@ -72,7 +72,7 @@ enum class IgnoreBucketState { kYes, kNo };
 /**
  * Maps bucket key to the stripe that is responsible for it.
  */
-StripeNumber getStripeNumber(const BucketKey& key);
+StripeNumber getStripeNumber(const BucketKey& key, size_t numberOfStripes);
 
 /**
  * Extracts the information from the input 'doc' that is used to map the document to a bucket.
diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp
index a0d5e879773..4e16dd68ba3 100644
--- a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp
@@ -272,7 +272,7 @@ Status BucketCatalogTest::_reopenBucket(const CollectionPtr& coll, const BSONObj
     }
     auto bucket = std::move(res.getValue());
 
-    auto stripeNumber = internal::getStripeNumber(key);
+    auto stripeNumber = internal::getStripeNumber(key, _bucketCatalog->numberOfStripes);
 
     // Register the reopened bucket with the catalog.
     auto& stripe = _bucketCatalog->stripes[stripeNumber];
@@ -395,6 +395,44 @@ TEST_F(BucketCatalogTest, InsertIntoDifferentBuckets) {
     }
 }
 
+TEST_F(BucketCatalogTest, InsertThroughDifferentCatalogsIntoDifferentBuckets) {
+    BucketCatalog temporaryBucketCatalog(/*numberOfStripes=*/1);
+    auto result1 = insert(_opCtx,
+                          *_bucketCatalog,
+                          _ns1,
+                          _getCollator(_ns1),
+                          _getTimeseriesOptions(_ns1),
+                          BSON(_timeField << Date_t::now()),
+                          CombineWithInsertsFromOtherClients::kAllow);
+    auto batch1 = result1.getValue().batch;
+    auto result2 = insert(_opCtx,
+                          temporaryBucketCatalog,
+                          _ns1,
+                          _getCollator(_ns1),
+                          _getTimeseriesOptions(_ns1),
+                          BSON(_timeField << Date_t::now()),
+                          CombineWithInsertsFromOtherClients::kAllow);
+    auto batch2 = result2.getValue().batch;
+
+    // Inserts should be into different buckets (and therefore batches) because they went through
+    // different bucket catalogs.
+    ASSERT_NE(batch1, batch2);
+
+    // Committing one bucket should only return the one document in that bucket and should not
+    // affect the other bucket.
+    ASSERT(claimWriteBatchCommitRights(*batch1));
+    ASSERT_OK(prepareCommit(*_bucketCatalog, batch1));
+    ASSERT_EQ(batch1->measurements.size(), 1);
+    ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements, 0);
+    finish(*_bucketCatalog, batch1, {});
+
+    ASSERT(claimWriteBatchCommitRights(*batch2));
+    ASSERT_OK(prepareCommit(temporaryBucketCatalog, batch2));
+    ASSERT_EQ(batch2->measurements.size(), 1);
+    ASSERT_EQ(batch2->numPreviouslyCommittedMeasurements, 0);
+    finish(temporaryBucketCatalog, batch2, {});
+}
+
 TEST_F(BucketCatalogTest, InsertIntoSameBucketArray) {
     auto result1 = insert(
         _opCtx,
diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_state_registry_test.cpp b/src/mongo/db/timeseries/bucket_catalog/bucket_state_registry_test.cpp
index cb7ea2c97e5..c74f85b92f5 100644
--- a/src/mongo/db/timeseries/bucket_catalog/bucket_state_registry_test.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog/bucket_state_registry_test.cpp
@@ -60,7 +60,7 @@ public:
     bool cannotAccessBucket(Bucket& bucket) {
         if (hasBeenCleared(bucket)) {
             internal::removeBucket(*this,
-                                   stripes[internal::getStripeNumber(bucket.key)],
+                                   stripes[internal::getStripeNumber(bucket.key, numberOfStripes)],
                                    withLock,
                                    bucket,
                                    internal::RemovalMode::kAbort);
@@ -71,20 +71,22 @@ public:
     }
 
     void checkAndRemoveClearedBucket(Bucket& bucket) {
-        auto a = internal::findBucket(bucketStateRegistry,
-                                      stripes[internal::getStripeNumber(bucket.key)],
-                                      withLock,
-                                      bucket.bucketId,
-                                      internal::IgnoreBucketState::kYes);
+        auto a =
+            internal::findBucket(bucketStateRegistry,
+                                 stripes[internal::getStripeNumber(bucket.key, numberOfStripes)],
+                                 withLock,
+                                 bucket.bucketId,
+                                 internal::IgnoreBucketState::kYes);
         ASSERT(a == &bucket);
-        auto b = internal::findBucket(bucketStateRegistry,
-                                      stripes[internal::getStripeNumber(bucket.key)],
-                                      withLock,
-                                      bucket.bucketId,
-                                      internal::IgnoreBucketState::kNo);
+        auto b =
+            internal::findBucket(bucketStateRegistry,
+                                 stripes[internal::getStripeNumber(bucket.key, numberOfStripes)],
+                                 withLock,
+                                 bucket.bucketId,
+                                 internal::IgnoreBucketState::kNo);
         ASSERT(b == nullptr);
         internal::removeBucket(*this,
-                               stripes[internal::getStripeNumber(bucket.key)],
+                               stripes[internal::getStripeNumber(bucket.key, numberOfStripes)],
                                withLock,
                                bucket,
                                internal::RemovalMode::kAbort);
@@ -103,12 +105,24 @@ public:
     TimeseriesOptions options;
     ExecutionStatsController stats = internal::getOrInitializeExecutionStats(*this, ns1);
     ClosedBuckets closedBuckets;
-    internal::CreationInfo info1{
-        bucketKey1, internal::getStripeNumber(bucketKey1), date, options, stats, &closedBuckets};
-    internal::CreationInfo info2{
-        bucketKey2, internal::getStripeNumber(bucketKey2), date, options, stats, &closedBuckets};
-    internal::CreationInfo info3{
-        bucketKey3, internal::getStripeNumber(bucketKey3), date, options, stats, &closedBuckets};
+    internal::CreationInfo info1{bucketKey1,
+                                 internal::getStripeNumber(bucketKey1, numberOfStripes),
+                                 date,
+                                 options,
+                                 stats,
+                                 &closedBuckets};
+    internal::CreationInfo info2{bucketKey2,
+                                 internal::getStripeNumber(bucketKey2, numberOfStripes),
+                                 date,
+                                 options,
+                                 stats,
+                                 &closedBuckets};
+    internal::CreationInfo info3{bucketKey3,
+                                 internal::getStripeNumber(bucketKey3, numberOfStripes),
+                                 date,
+                                 options,
+                                 stats,
+                                 &closedBuckets};
 };
 
 TEST_F(BucketStateRegistryTest, BucketStateSetUnsetFlag) {
-- 
cgit v1.2.1
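
For illustration only, and not part of the patch above: a minimal, self-contained sketch of the
stripe-partitioning scheme this change makes configurable. A catalog owns a fixed number of
independently lockable stripes chosen at construction time, and a key is mapped to a stripe by
hashing it modulo the stripe count, so writers that land on different stripes never contend on the
same mutex. All names here (StripedCatalog, Stripe, stripeFor, add) are hypothetical stand-ins,
not MongoDB's actual BucketCatalog API.

    #include <cstddef>
    #include <cstdint>
    #include <functional>
    #include <mutex>
    #include <string>
    #include <unordered_map>
    #include <vector>

    // Hypothetical, simplified analogue of the patch's BucketCatalog: the stripe count is
    // fixed at construction and must not change afterwards, because the key -> stripe
    // mapping is computed as hash % numberOfStripes.
    class StripedCatalog {
    public:
        struct Stripe {
            std::mutex mutex;  // lock one stripe, not the whole catalog
            std::unordered_map<std::string, std::int64_t> buckets;
        };

        // The default of 32 mirrors the patch's default stripe count; assumes
        // numberOfStripes > 0.
        explicit StripedCatalog(std::size_t numberOfStripes = 32)
            : _numberOfStripes(numberOfStripes), _stripes(numberOfStripes) {}

        // Analogue of getStripeNumber(): the same key always maps to the same stripe.
        std::size_t stripeFor(const std::string& key) const {
            return std::hash<std::string>{}(key) % _numberOfStripes;
        }

        // Only the owning stripe is locked while a key is updated, so writes that hash to
        // different stripes can proceed in parallel.
        void add(const std::string& key, std::int64_t delta) {
            Stripe& stripe = _stripes[stripeFor(key)];
            std::lock_guard<std::mutex> lk(stripe.mutex);
            stripe.buckets[key] += delta;
        }

    private:
        const std::size_t _numberOfStripes;
        std::vector<Stripe> _stripes;
    };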
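
A short usage sketch in the same hypothetical terms, reusing the StripedCatalog above and echoing
what the new InsertThroughDifferentCatalogsIntoDifferentBuckets test relies on: a catalog built
with a single stripe funnels every key to stripe 0, and two catalog instances never share stripes,
so work done through one leaves the other untouched.

    #include <cassert>

    int main() {
        StripedCatalog wide;                           // default: 32 stripes
        StripedCatalog narrow(/*numberOfStripes=*/1);  // everything collapses onto stripe 0

        // With a single stripe, every key maps to the same stripe.
        assert(narrow.stripeFor("cpu") == 0);
        assert(narrow.stripeFor("memory") == 0);

        // The two catalogs are fully independent: inserting through one does not
        // touch the other's stripes or buckets.
        wide.add("cpu", 1);
        narrow.add("cpu", 5);
        return 0;
    }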