author      Faustoleyva54 <fausto.leyva@mongodb.com>            2023-01-27 15:24:18 +0000
committer   Evergreen Agent <no-reply@evergreen.mongodb.com>    2023-01-27 16:00:39 +0000
commit      ffd636b1f009b826725f8c30ed8834e285f6d0e3 (patch)
tree        f47f92e33ce9ddfcf8e86b93678b0a372c087d15 /src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp
parent      a54ab549bb0160c73dfd56f8f64365789409b504 (diff)
download    mongo-ffd636b1f009b826725f8c30ed8834e285f6d0e3.tar.gz
SERVER-73193 Do not proactively set control.closed flag when closing a time-series bucket
Diffstat (limited to 'src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp')
-rw-r--r--   src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp   63
1 file changed, 31 insertions(+), 32 deletions(-)
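
Note: most hunks below follow the same pattern: the eligibleForReopening argument is dropped from every ClosedBucket construction, so the close path no longer decides up front whether control.closed should be written. A minimal sketch of the removed decision follows, using simplified stand-in types rather than the real MongoDB ones.

// Simplified sketch; RolloverAction mirrors the enum referenced in the diff,
// everything else is an illustrative stand-in.
enum class RolloverAction { kNone, kArchive, kSoftClose, kHardClose };

// Before this commit, the rollover close sites computed the flag like this
// (other sites passed a constant true) and handed it to ClosedBucket as the
// trailing constructor argument; a false value led to control.closed being
// set proactively when the bucket was closed.
bool eligibleForReopening(RolloverAction action) {
    return action == RolloverAction::kSoftClose;
}

// After this commit the flag and the trailing constructor argument are gone:
// closed buckets keep control.closed unset and remain reopening candidates.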
diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp
index fd01a992aa8..a5e218123be 100644
--- a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp
@@ -42,7 +42,6 @@
#include "mongo/db/commands/server_status.h"
#include "mongo/db/concurrency/exception_util.h"
#include "mongo/db/operation_context.h"
-#include "mongo/db/storage/kv/kv_engine.h"
#include "mongo/db/storage/storage_parameters_gen.h"
#include "mongo/db/timeseries/bucket_catalog/bucket_catalog_helpers.h"
#include "mongo/db/timeseries/bucket_compression.h"
@@ -129,7 +128,7 @@ Status getTimeseriesBucketClearedError(const NamespaceString& ns, const OID& oid
}
/**
- * Caluculate the bucket max size constrained by the cache size and the cardinality of active
+ * Calculate the bucket max size constrained by the cache size and the cardinality of active
* buckets.
*/
int32_t getCacheDerivedBucketMaxSize(StorageEngine* storageEngine, uint32_t workloadCardinality) {
@@ -411,13 +410,10 @@ boost::optional<ClosedBucket> BucketCatalog::finish(std::shared_ptr<WriteBatch>
switch (bucket->rolloverAction) {
case RolloverAction::kHardClose:
case RolloverAction::kSoftClose: {
- const bool eligibleForReopening =
- bucket->rolloverAction == RolloverAction::kSoftClose;
closedBucket = boost::in_place(&_bucketStateManager,
bucket->bucketId,
bucket->timeField,
- bucket->numMeasurements,
- eligibleForReopening);
+ bucket->numMeasurements);
_removeBucket(&stripe, stripeLock, bucket, RemovalMode::kClose);
break;
}
@@ -945,12 +941,10 @@ StatusWith<Bucket*> BucketCatalog::_reopenBucket(Stripe* stripe,
if (existingBucket->rolloverAction == RolloverAction::kNone) {
stats.incNumBucketsClosedDueToReopening();
if (allCommitted(*existingBucket)) {
- constexpr bool eligibleForReopening = true;
closedBuckets->emplace_back(ClosedBucket{&_bucketStateManager,
existingBucket->bucketId,
existingBucket->timeField,
- existingBucket->numMeasurements,
- eligibleForReopening});
+ existingBucket->numMeasurements});
_removeBucket(stripe, stripeLock, existingBucket, RemovalMode::kClose);
} else {
existingBucket->rolloverAction = RolloverAction::kSoftClose;
@@ -1110,7 +1104,7 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::_insert(
invariant(mode == AllowBucketCreation::kNo);
constexpr bool allowQueryBasedReopening = true;
result.candidate =
- _getReopeningCandidate(&stripe, stripeLock, info, allowQueryBasedReopening);
+ _getReopeningCandidate(opCtx, &stripe, stripeLock, info, allowQueryBasedReopening);
return std::move(result);
}
@@ -1140,7 +1134,7 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::_insert(
bool allowQueryBasedReopening = (*reason == RolloverReason::kTimeBackward);
result.candidate =
- _getReopeningCandidate(&stripe, stripeLock, info, allowQueryBasedReopening);
+ _getReopeningCandidate(opCtx, &stripe, stripeLock, info, allowQueryBasedReopening);
} else {
result.batch = *stdx::get_if<std::shared_ptr<WriteBatch>>(&insertionResult);
}
@@ -1324,12 +1318,8 @@ void BucketCatalog::_archiveBucket(Stripe* stripe,
// and timestamp as this bucket. Since it's somewhat arbitrary which bucket we keep, we'll
// keep the one that's already archived and just plain close this one.
mode = RemovalMode::kClose;
- constexpr bool eligibleForReopening = true;
- closedBuckets->emplace_back(ClosedBucket{&_bucketStateManager,
- bucket->bucketId,
- bucket->timeField,
- bucket->numMeasurements,
- eligibleForReopening});
+ closedBuckets->emplace_back(ClosedBucket{
+ &_bucketStateManager, bucket->bucketId, bucket->timeField, bucket->numMeasurements});
}
_removeBucket(stripe, stripeLock, bucket, mode);
@@ -1390,8 +1380,12 @@ boost::optional<OID> BucketCatalog::_findArchivedCandidate(Stripe* stripe,
return boost::none;
}
-stdx::variant<std::monostate, OID, BSONObj> BucketCatalog::_getReopeningCandidate(
- Stripe* stripe, WithLock stripeLock, const CreationInfo& info, bool allowQueryBasedReopening) {
+stdx::variant<std::monostate, OID, std::vector<BSONObj>> BucketCatalog::_getReopeningCandidate(
+ OperationContext* opCtx,
+ Stripe* stripe,
+ WithLock stripeLock,
+ const CreationInfo& info,
+ bool allowQueryBasedReopening) {
if (auto archived = _findArchivedCandidate(stripe, stripeLock, info)) {
return archived.value();
}
@@ -1406,9 +1400,21 @@ stdx::variant<std::monostate, OID, BSONObj> BucketCatalog::_getReopeningCandidat
}
auto controlMinTimePath = kControlMinFieldNamePrefix.toString() + info.options.getTimeField();
+ auto maxDataTimeFieldPath = kDataFieldNamePrefix.toString() + info.options.getTimeField() +
+ "." + std::to_string(gTimeseriesBucketMaxCount - 1);
- return generateReopeningFilters(
- info.time, metaElement, controlMinTimePath, *info.options.getBucketMaxSpanSeconds());
+ // Derive the maximum bucket size.
+ auto bucketMaxSize = getCacheDerivedBucketMaxSize(
+ opCtx->getServiceContext()->getStorageEngine(), _numberOfActiveBuckets.load());
+ int32_t effectiveMaxSize = std::min(gTimeseriesBucketMaxSize, bucketMaxSize);
+
+ return generateReopeningPipeline(opCtx,
+ info.time,
+ metaElement,
+ controlMinTimePath,
+ maxDataTimeFieldPath,
+ *info.options.getBucketMaxSpanSeconds(),
+ effectiveMaxSize);
}
void BucketCatalog::_abort(Stripe* stripe,
@@ -1499,7 +1505,6 @@ void BucketCatalog::_expireIdleBuckets(Stripe* stripe,
const bool canArchive = feature_flags::gTimeseriesScalabilityImprovements.isEnabled(
serverGlobalParams.featureCompatibility);
- constexpr bool eligibleForReopening{true};
while (!stripe->idleBuckets.empty() &&
_memoryUsage.load() > getTimeseriesIdleBucketExpiryMemoryUsageThresholdBytes() &&
@@ -1518,8 +1523,7 @@ void BucketCatalog::_expireIdleBuckets(Stripe* stripe,
closedBuckets->emplace_back(ClosedBucket{&_bucketStateManager,
bucket->bucketId,
bucket->timeField,
- bucket->numMeasurements,
- eligibleForReopening});
+ bucket->numMeasurements});
_removeBucket(stripe, stripeLock, bucket, RemovalMode::kClose);
stats.incNumBucketsClosedDueToMemoryThreshold();
}
@@ -1535,11 +1539,8 @@ void BucketCatalog::_expireIdleBuckets(Stripe* stripe,
invariant(!archivedSet.empty());
auto& [timestamp, bucket] = *archivedSet.begin();
- closedBuckets->emplace_back(ClosedBucket{&_bucketStateManager,
- bucket.bucketId,
- bucket.timeField,
- boost::none,
- eligibleForReopening});
+ closedBuckets->emplace_back(
+ ClosedBucket{&_bucketStateManager, bucket.bucketId, bucket.timeField, boost::none});
long long memory = _marginalMemoryUsageForArchivedBucket(bucket, archivedSet.size() == 1);
if (archivedSet.size() == 1) {
@@ -1702,12 +1703,10 @@ Bucket* BucketCatalog::_rollover(Stripe* stripe,
if (action == RolloverAction::kArchive) {
_archiveBucket(stripe, stripeLock, bucket, info.closedBuckets);
} else {
- const bool eligibleForReopening = action == RolloverAction::kSoftClose;
info.closedBuckets->emplace_back(ClosedBucket{&_bucketStateManager,
bucket->bucketId,
bucket->timeField,
- bucket->numMeasurements,
- eligibleForReopening});
+ bucket->numMeasurements});
_removeBucket(stripe, stripeLock, bucket, RemovalMode::kClose);
}
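
Besides removing the eligibleForReopening flag, the diff reworks query-based reopening: _getReopeningCandidate() now takes the OperationContext, and instead of a single match filter from generateReopeningFilters() it returns an aggregation pipeline (std::vector<BSONObj>) from generateReopeningPipeline(), constrained by a data-field path for the last possible measurement and by an effective maximum bucket size. That cap is the tighter of the static maximum and a cache-derived maximum. Below is a hedged sketch of the cap only; the division inside cacheDerivedBucketMaxSize() is an assumed illustration, not the server's actual formula, and kTimeseriesBucketMaxSize stands in for the gTimeseriesBucketMaxSize server parameter.

#include <algorithm>
#include <cstdint>
#include <limits>

namespace {

// Illustrative default; the real value comes from the gTimeseriesBucketMaxSize
// server parameter.
constexpr int32_t kTimeseriesBucketMaxSize = 128 * 1024;

// Stand-in for getCacheDerivedBucketMaxSize(StorageEngine*, uint32_t): the
// real function consults the storage engine's cache size. The division below
// is an assumed model (a slice of cache per active bucket), not the actual
// formula.
int32_t cacheDerivedBucketMaxSize(int64_t cacheSizeBytes, uint32_t workloadCardinality) {
    if (workloadCardinality == 0) {
        return kTimeseriesBucketMaxSize;
    }
    int64_t derived = cacheSizeBytes / (2LL * workloadCardinality);
    return static_cast<int32_t>(
        std::min<int64_t>(derived, std::numeric_limits<int32_t>::max()));
}

}  // namespace

// Mirrors the new logic in _getReopeningCandidate(): the reopening pipeline
// only considers buckets smaller than the tighter of the static maximum and
// the cache-derived maximum.
int32_t effectiveMaxBucketSize(int64_t cacheSizeBytes, uint32_t numberOfActiveBuckets) {
    return std::min(kTimeseriesBucketMaxSize,
                    cacheDerivedBucketMaxSize(cacheSizeBytes, numberOfActiveBuckets));
}

In the actual diff the two inputs are opCtx->getServiceContext()->getStorageEngine() and _numberOfActiveBuckets.load(); the sketch only parameterizes them as plain integers to stay self-contained.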