summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYuhong Zhang <yuhong.zhang@mongodb.com>2023-05-15 20:11:29 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-05-16 19:41:57 +0000
commit9067d947c9c75146ea889f334e5c5cd17e94baba (patch)
tree514aecf8108f2f0185c0791d21298fbfbc10bd42
parent2dfb0409715bfc35e12d7622e1c6e0a4727db1aa (diff)
downloadmongo-9067d947c9c75146ea889f334e5c5cd17e94baba.tar.gz
SERVER-77151 Make the number of stripes configurable for the bucket catalog
-rw-r--r--src/mongo/db/timeseries/bucket_catalog/bucket_catalog.h18
-rw-r--r--src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.cpp6
-rw-r--r--src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.h2
-rw-r--r--src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp40
-rw-r--r--src/mongo/db/timeseries/bucket_catalog/bucket_state_registry_test.cpp50
5 files changed, 86 insertions, 30 deletions
diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.h
index 90fb5d2e4ef..1f14f683b7d 100644
--- a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.h
+++ b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.h
@@ -127,7 +127,9 @@ public:
static BucketCatalog& get(ServiceContext* svcCtx);
static BucketCatalog& get(OperationContext* opCtx);
- BucketCatalog() = default;
+ BucketCatalog() : stripes(numberOfStripes) {}
+ BucketCatalog(size_t numberOfStripes)
+ : numberOfStripes(numberOfStripes), stripes(numberOfStripes){};
BucketCatalog(const BucketCatalog&) = delete;
BucketCatalog operator=(const BucketCatalog&) = delete;
@@ -135,9 +137,10 @@ public:
BucketStateRegistry bucketStateRegistry;
// The actual buckets in the catalog are distributed across a number of 'Stripe's. Each can be
- // independently locked and operated on in parallel.
- static constexpr std::size_t kNumberOfStripes = 32;
- std::array<Stripe, kNumberOfStripes> stripes;
+ // independently locked and operated on in parallel. The size of the stripe vector should not be
+ // changed after initialization.
+ const std::size_t numberOfStripes = 32;
+ std::vector<Stripe> stripes;
// Per-namespace execution stats. This map is protected by 'mutex'. Once you complete your
// lookup, you can keep the shared_ptr to an individual namespace's stats object and release the
@@ -168,8 +171,9 @@ BSONObj getMetadata(BucketCatalog& catalog, const BucketHandle& bucket);
/**
* Tries to insert 'doc' into a suitable bucket. If an open bucket is full (or has incompatible
* schema), but is otherwise suitable, we will close it and open a new bucket. If we find no bucket
- * with matching data and a time range that can accomodate 'doc', we will not open a new bucket, but
- * rather let the caller know to search for an archived or closed bucket that can accomodate 'doc'.
+ * with matching data and a time range that can accommodate 'doc', we will not open a new bucket,
+ * but rather let the caller know to search for an archived or closed bucket that can accommodate
+ * 'doc'.
*
* If a suitable bucket is found or opened, returns the WriteBatch into which 'doc' was inserted and
* a list of any buckets that were closed to make space to insert 'doc'. Any caller who receives the
@@ -178,7 +182,7 @@ BSONObj getMetadata(BucketCatalog& catalog, const BucketHandle& bucket);
*
* If no suitable bucket is found or opened, returns an optional bucket ID. If set, the bucket ID
* corresponds to an archived bucket which should be fetched; otherwise the caller should search for
- * a previously-closed bucket that can accomodate 'doc'. The caller should proceed to call 'insert'
+ * a previously-closed bucket that can accommodate 'doc'. The caller should proceed to call 'insert'
* to insert 'doc', passing any fetched bucket.
*/
StatusWith<InsertResult> tryInsert(OperationContext* opCtx,
diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.cpp b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.cpp
index 50acf2890ac..cd5ec26c81d 100644
--- a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.cpp
@@ -109,11 +109,11 @@ void abortWriteBatch(WriteBatch& batch, const Status& status) {
}
} // namespace
-StripeNumber getStripeNumber(const BucketKey& key) {
+StripeNumber getStripeNumber(const BucketKey& key, size_t numberOfStripes) {
if (MONGO_unlikely(alwaysUseSameBucketCatalogStripe.shouldFail())) {
return 0;
}
- return key.hash % BucketCatalog::kNumberOfStripes;
+ return key.hash % numberOfStripes;
}
StatusWith<std::pair<BucketKey, Date_t>> extractBucketingParameters(
@@ -631,7 +631,7 @@ StatusWith<InsertResult> insert(OperationContext* opCtx,
// Buckets are spread across independently-lockable stripes to improve parallelism. We map a
// bucket to a stripe by hashing the BucketKey.
- auto stripeNumber = getStripeNumber(key);
+ auto stripeNumber = getStripeNumber(key, catalog.numberOfStripes);
InsertResult result;
result.catalogEra = getCurrentEra(catalog.bucketStateRegistry);
diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.h b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.h
index 4b5c62f72f0..65e713355f4 100644
--- a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.h
+++ b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.h
@@ -72,7 +72,7 @@ enum class IgnoreBucketState { kYes, kNo };
/**
* Maps bucket key to the stripe that is responsible for it.
*/
-StripeNumber getStripeNumber(const BucketKey& key);
+StripeNumber getStripeNumber(const BucketKey& key, size_t numberOfStripes);
/**
* Extracts the information from the input 'doc' that is used to map the document to a bucket.
diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp
index a0d5e879773..4e16dd68ba3 100644
--- a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp
@@ -272,7 +272,7 @@ Status BucketCatalogTest::_reopenBucket(const CollectionPtr& coll, const BSONObj
}
auto bucket = std::move(res.getValue());
- auto stripeNumber = internal::getStripeNumber(key);
+ auto stripeNumber = internal::getStripeNumber(key, _bucketCatalog->numberOfStripes);
// Register the reopened bucket with the catalog.
auto& stripe = _bucketCatalog->stripes[stripeNumber];
@@ -395,6 +395,44 @@ TEST_F(BucketCatalogTest, InsertIntoDifferentBuckets) {
}
}
+TEST_F(BucketCatalogTest, InsertThroughDifferentCatalogsIntoDifferentBuckets) {
+ BucketCatalog temporaryBucketCatalog(/*numberOfStripes=*/1);
+ auto result1 = insert(_opCtx,
+ *_bucketCatalog,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ BSON(_timeField << Date_t::now()),
+ CombineWithInsertsFromOtherClients::kAllow);
+ auto batch1 = result1.getValue().batch;
+ auto result2 = insert(_opCtx,
+ temporaryBucketCatalog,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ BSON(_timeField << Date_t::now()),
+ CombineWithInsertsFromOtherClients::kAllow);
+ auto batch2 = result2.getValue().batch;
+
+ // Inserts should be into different buckets (and therefore batches) because they went through
+ // different bucket catalogs.
+ ASSERT_NE(batch1, batch2);
+
+ // Committing one bucket should only return the one document in that bucket and should not
+ // affect the other bucket.
+ ASSERT(claimWriteBatchCommitRights(*batch1));
+ ASSERT_OK(prepareCommit(*_bucketCatalog, batch1));
+ ASSERT_EQ(batch1->measurements.size(), 1);
+ ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements, 0);
+ finish(*_bucketCatalog, batch1, {});
+
+ ASSERT(claimWriteBatchCommitRights(*batch2));
+ ASSERT_OK(prepareCommit(temporaryBucketCatalog, batch2));
+ ASSERT_EQ(batch2->measurements.size(), 1);
+ ASSERT_EQ(batch2->numPreviouslyCommittedMeasurements, 0);
+ finish(temporaryBucketCatalog, batch2, {});
+}
+
TEST_F(BucketCatalogTest, InsertIntoSameBucketArray) {
auto result1 = insert(
_opCtx,
diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_state_registry_test.cpp b/src/mongo/db/timeseries/bucket_catalog/bucket_state_registry_test.cpp
index cb7ea2c97e5..c74f85b92f5 100644
--- a/src/mongo/db/timeseries/bucket_catalog/bucket_state_registry_test.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog/bucket_state_registry_test.cpp
@@ -60,7 +60,7 @@ public:
bool cannotAccessBucket(Bucket& bucket) {
if (hasBeenCleared(bucket)) {
internal::removeBucket(*this,
- stripes[internal::getStripeNumber(bucket.key)],
+ stripes[internal::getStripeNumber(bucket.key, numberOfStripes)],
withLock,
bucket,
internal::RemovalMode::kAbort);
@@ -71,20 +71,22 @@ public:
}
void checkAndRemoveClearedBucket(Bucket& bucket) {
- auto a = internal::findBucket(bucketStateRegistry,
- stripes[internal::getStripeNumber(bucket.key)],
- withLock,
- bucket.bucketId,
- internal::IgnoreBucketState::kYes);
+ auto a =
+ internal::findBucket(bucketStateRegistry,
+ stripes[internal::getStripeNumber(bucket.key, numberOfStripes)],
+ withLock,
+ bucket.bucketId,
+ internal::IgnoreBucketState::kYes);
ASSERT(a == &bucket);
- auto b = internal::findBucket(bucketStateRegistry,
- stripes[internal::getStripeNumber(bucket.key)],
- withLock,
- bucket.bucketId,
- internal::IgnoreBucketState::kNo);
+ auto b =
+ internal::findBucket(bucketStateRegistry,
+ stripes[internal::getStripeNumber(bucket.key, numberOfStripes)],
+ withLock,
+ bucket.bucketId,
+ internal::IgnoreBucketState::kNo);
ASSERT(b == nullptr);
internal::removeBucket(*this,
- stripes[internal::getStripeNumber(bucket.key)],
+ stripes[internal::getStripeNumber(bucket.key, numberOfStripes)],
withLock,
bucket,
internal::RemovalMode::kAbort);
@@ -103,12 +105,24 @@ public:
TimeseriesOptions options;
ExecutionStatsController stats = internal::getOrInitializeExecutionStats(*this, ns1);
ClosedBuckets closedBuckets;
- internal::CreationInfo info1{
- bucketKey1, internal::getStripeNumber(bucketKey1), date, options, stats, &closedBuckets};
- internal::CreationInfo info2{
- bucketKey2, internal::getStripeNumber(bucketKey2), date, options, stats, &closedBuckets};
- internal::CreationInfo info3{
- bucketKey3, internal::getStripeNumber(bucketKey3), date, options, stats, &closedBuckets};
+ internal::CreationInfo info1{bucketKey1,
+ internal::getStripeNumber(bucketKey1, numberOfStripes),
+ date,
+ options,
+ stats,
+ &closedBuckets};
+ internal::CreationInfo info2{bucketKey2,
+ internal::getStripeNumber(bucketKey2, numberOfStripes),
+ date,
+ options,
+ stats,
+ &closedBuckets};
+ internal::CreationInfo info3{bucketKey3,
+ internal::getStripeNumber(bucketKey3, numberOfStripes),
+ date,
+ options,
+ stats,
+ &closedBuckets};
};
TEST_F(BucketStateRegistryTest, BucketStateSetUnsetFlag) {