diff options
author | Faustoleyva54 <fausto.leyva@mongodb.com> | 2023-01-27 15:24:18 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-01-27 16:00:39 +0000 |
commit | ffd636b1f009b826725f8c30ed8834e285f6d0e3 (patch) | |
tree | f47f92e33ce9ddfcf8e86b93678b0a372c087d15 /src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp | |
parent | a54ab549bb0160c73dfd56f8f64365789409b504 (diff) | |
download | mongo-ffd636b1f009b826725f8c30ed8834e285f6d0e3.tar.gz |
SERVER-73193 Do not proactively set control.closed flag when closing a time-series bucket
Diffstat (limited to 'src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp')
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp | 63 |
1 files changed, 31 insertions, 32 deletions
diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp index fd01a992aa8..a5e218123be 100644 --- a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp +++ b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp @@ -42,7 +42,6 @@ #include "mongo/db/commands/server_status.h" #include "mongo/db/concurrency/exception_util.h" #include "mongo/db/operation_context.h" -#include "mongo/db/storage/kv/kv_engine.h" #include "mongo/db/storage/storage_parameters_gen.h" #include "mongo/db/timeseries/bucket_catalog/bucket_catalog_helpers.h" #include "mongo/db/timeseries/bucket_compression.h" @@ -129,7 +128,7 @@ Status getTimeseriesBucketClearedError(const NamespaceString& ns, const OID& oid } /** - * Caluculate the bucket max size constrained by the cache size and the cardinality of active + * Calculate the bucket max size constrained by the cache size and the cardinality of active * buckets. */ int32_t getCacheDerivedBucketMaxSize(StorageEngine* storageEngine, uint32_t workloadCardinality) { @@ -411,13 +410,10 @@ boost::optional<ClosedBucket> BucketCatalog::finish(std::shared_ptr<WriteBatch> switch (bucket->rolloverAction) { case RolloverAction::kHardClose: case RolloverAction::kSoftClose: { - const bool eligibleForReopening = - bucket->rolloverAction == RolloverAction::kSoftClose; closedBucket = boost::in_place(&_bucketStateManager, bucket->bucketId, bucket->timeField, - bucket->numMeasurements, - eligibleForReopening); + bucket->numMeasurements); _removeBucket(&stripe, stripeLock, bucket, RemovalMode::kClose); break; } @@ -945,12 +941,10 @@ StatusWith<Bucket*> BucketCatalog::_reopenBucket(Stripe* stripe, if (existingBucket->rolloverAction == RolloverAction::kNone) { stats.incNumBucketsClosedDueToReopening(); if (allCommitted(*existingBucket)) { - constexpr bool eligibleForReopening = true; closedBuckets->emplace_back(ClosedBucket{&_bucketStateManager, existingBucket->bucketId, existingBucket->timeField, - existingBucket->numMeasurements, - eligibleForReopening}); + existingBucket->numMeasurements}); _removeBucket(stripe, stripeLock, existingBucket, RemovalMode::kClose); } else { existingBucket->rolloverAction = RolloverAction::kSoftClose; @@ -1110,7 +1104,7 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::_insert( invariant(mode == AllowBucketCreation::kNo); constexpr bool allowQueryBasedReopening = true; result.candidate = - _getReopeningCandidate(&stripe, stripeLock, info, allowQueryBasedReopening); + _getReopeningCandidate(opCtx, &stripe, stripeLock, info, allowQueryBasedReopening); return std::move(result); } @@ -1140,7 +1134,7 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::_insert( bool allowQueryBasedReopening = (*reason == RolloverReason::kTimeBackward); result.candidate = - _getReopeningCandidate(&stripe, stripeLock, info, allowQueryBasedReopening); + _getReopeningCandidate(opCtx, &stripe, stripeLock, info, allowQueryBasedReopening); } else { result.batch = *stdx::get_if<std::shared_ptr<WriteBatch>>(&insertionResult); } @@ -1324,12 +1318,8 @@ void BucketCatalog::_archiveBucket(Stripe* stripe, // and timestamp as this bucket. Since it's somewhat arbitrary which bucket we keep, we'll // keep the one that's already archived and just plain close this one. mode = RemovalMode::kClose; - constexpr bool eligibleForReopening = true; - closedBuckets->emplace_back(ClosedBucket{&_bucketStateManager, - bucket->bucketId, - bucket->timeField, - bucket->numMeasurements, - eligibleForReopening}); + closedBuckets->emplace_back(ClosedBucket{ + &_bucketStateManager, bucket->bucketId, bucket->timeField, bucket->numMeasurements}); } _removeBucket(stripe, stripeLock, bucket, mode); @@ -1390,8 +1380,12 @@ boost::optional<OID> BucketCatalog::_findArchivedCandidate(Stripe* stripe, return boost::none; } -stdx::variant<std::monostate, OID, BSONObj> BucketCatalog::_getReopeningCandidate( - Stripe* stripe, WithLock stripeLock, const CreationInfo& info, bool allowQueryBasedReopening) { +stdx::variant<std::monostate, OID, std::vector<BSONObj>> BucketCatalog::_getReopeningCandidate( + OperationContext* opCtx, + Stripe* stripe, + WithLock stripeLock, + const CreationInfo& info, + bool allowQueryBasedReopening) { if (auto archived = _findArchivedCandidate(stripe, stripeLock, info)) { return archived.value(); } @@ -1406,9 +1400,21 @@ stdx::variant<std::monostate, OID, BSONObj> BucketCatalog::_getReopeningCandidat } auto controlMinTimePath = kControlMinFieldNamePrefix.toString() + info.options.getTimeField(); + auto maxDataTimeFieldPath = kDataFieldNamePrefix.toString() + info.options.getTimeField() + + "." + std::to_string(gTimeseriesBucketMaxCount - 1); - return generateReopeningFilters( - info.time, metaElement, controlMinTimePath, *info.options.getBucketMaxSpanSeconds()); + // Derive the maximum bucket size. + auto bucketMaxSize = getCacheDerivedBucketMaxSize( + opCtx->getServiceContext()->getStorageEngine(), _numberOfActiveBuckets.load()); + int32_t effectiveMaxSize = std::min(gTimeseriesBucketMaxSize, bucketMaxSize); + + return generateReopeningPipeline(opCtx, + info.time, + metaElement, + controlMinTimePath, + maxDataTimeFieldPath, + *info.options.getBucketMaxSpanSeconds(), + effectiveMaxSize); } void BucketCatalog::_abort(Stripe* stripe, @@ -1499,7 +1505,6 @@ void BucketCatalog::_expireIdleBuckets(Stripe* stripe, const bool canArchive = feature_flags::gTimeseriesScalabilityImprovements.isEnabled( serverGlobalParams.featureCompatibility); - constexpr bool eligibleForReopening{true}; while (!stripe->idleBuckets.empty() && _memoryUsage.load() > getTimeseriesIdleBucketExpiryMemoryUsageThresholdBytes() && @@ -1518,8 +1523,7 @@ void BucketCatalog::_expireIdleBuckets(Stripe* stripe, closedBuckets->emplace_back(ClosedBucket{&_bucketStateManager, bucket->bucketId, bucket->timeField, - bucket->numMeasurements, - eligibleForReopening}); + bucket->numMeasurements}); _removeBucket(stripe, stripeLock, bucket, RemovalMode::kClose); stats.incNumBucketsClosedDueToMemoryThreshold(); } @@ -1535,11 +1539,8 @@ void BucketCatalog::_expireIdleBuckets(Stripe* stripe, invariant(!archivedSet.empty()); auto& [timestamp, bucket] = *archivedSet.begin(); - closedBuckets->emplace_back(ClosedBucket{&_bucketStateManager, - bucket.bucketId, - bucket.timeField, - boost::none, - eligibleForReopening}); + closedBuckets->emplace_back( + ClosedBucket{&_bucketStateManager, bucket.bucketId, bucket.timeField, boost::none}); long long memory = _marginalMemoryUsageForArchivedBucket(bucket, archivedSet.size() == 1); if (archivedSet.size() == 1) { @@ -1702,12 +1703,10 @@ Bucket* BucketCatalog::_rollover(Stripe* stripe, if (action == RolloverAction::kArchive) { _archiveBucket(stripe, stripeLock, bucket, info.closedBuckets); } else { - const bool eligibleForReopening = action == RolloverAction::kSoftClose; info.closedBuckets->emplace_back(ClosedBucket{&_bucketStateManager, bucket->bucketId, bucket->timeField, - bucket->numMeasurements, - eligibleForReopening}); + bucket->numMeasurements}); _removeBucket(stripe, stripeLock, bucket, RemovalMode::kClose); } |