path: root/src/mongo
author     Dan Larkin-York <dan.larkin-york@mongodb.com>       2022-06-14 20:28:47 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>    2022-06-14 21:13:04 +0000
commit     aac074d9844f61f9a9a44d014987f2da93915a59 (patch)
tree       75e143983312bb60f5fa2fac2895c767b57703ca /src/mongo
parent     f678ddaee2ed81c0bed9f72116135d63a3754e85 (diff)
download   mongo-aac074d9844f61f9a9a44d014987f2da93915a59.tar.gz
SERVER-66683 Archive eligible buckets instead of closing them
Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog.cpp       | 276
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog.h         |  60
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog_test.cpp  | 201
3 files changed, 424 insertions(+), 113 deletions(-)
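
In short, this change teaches the bucket catalog to archive a bucket (keeping only a small ArchivedBucket record) instead of closing it outright when the bucket is rolled over purely because of its time range, or when idle buckets must be expired under memory pressure; the behavior is gated on featureFlagTimeseriesScalabilityImprovements. The following is a minimal standalone sketch of that archive-vs-close decision, using simplified stand-in types and invented thresholds rather than the actual BucketCatalog internals:

#include <chrono>
#include <cstdint>
#include <iostream>

// Stand-in for BucketCatalog::RolloverAction introduced by the diff below.
enum class RolloverAction { kNone, kArchive, kClose };

// Only the bucket fields the decision needs; the real Bucket class holds much more.
struct BucketInfo {
    std::uint64_t numMeasurements;
    std::uint64_t sizeBytes;
    std::chrono::system_clock::time_point minTime;
    bool schemaIncompatible;  // would the incoming document change the schema?
};

RolloverAction determineRolloverAction(const BucketInfo& bucket,
                                       std::chrono::system_clock::time_point docTime,
                                       std::uint64_t sizeToBeAdded,
                                       std::uint64_t maxCount,
                                       std::uint64_t maxSizeBytes,
                                       std::chrono::seconds maxSpan,
                                       bool canArchive) {
    // Schema, count, and size violations always close the bucket.
    if (bucket.schemaIncompatible || bucket.numMeasurements >= maxCount ||
        bucket.sizeBytes + sizeToBeAdded > maxSizeBytes) {
        return RolloverAction::kClose;
    }
    // Time-range violations (too far forward or backward) may archive instead,
    // when the scalability feature flag is enabled.
    if (docTime - bucket.minTime >= maxSpan || docTime < bucket.minTime) {
        return canArchive ? RolloverAction::kArchive : RolloverAction::kClose;
    }
    return RolloverAction::kNone;
}

int main() {
    auto now = std::chrono::system_clock::now();
    BucketInfo bucket{/*numMeasurements=*/10, /*sizeBytes=*/4096, /*minTime=*/now,
                      /*schemaIncompatible=*/false};
    // A document two hours past the bucket's minimum time forces a rollover; with
    // archiving enabled the bucket is archived rather than closed.
    auto action = determineRolloverAction(bucket, now + std::chrono::hours(2),
                                          /*sizeToBeAdded=*/128, /*maxCount=*/1000,
                                          /*maxSizeBytes=*/128 * 1024,
                                          std::chrono::seconds(3600), /*canArchive=*/true);
    std::cout << (action == RolloverAction::kArchive ? "archive\n" : "close or none\n");
}
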
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index 0b365571616..f043c687e28 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -261,6 +261,24 @@ void BucketCatalog::ExecutionStatsController::incNumBucketsClosedDueToMemoryThre
_globalStats->numBucketsClosedDueToMemoryThreshold.fetchAndAddRelaxed(increment);
}
+void BucketCatalog::ExecutionStatsController::incNumBucketsArchivedDueToTimeForward(
+ long long increment) {
+ _collectionStats->numBucketsArchivedDueToTimeForward.fetchAndAddRelaxed(increment);
+ _globalStats->numBucketsArchivedDueToTimeForward.fetchAndAddRelaxed(increment);
+}
+
+void BucketCatalog::ExecutionStatsController::incNumBucketsArchivedDueToTimeBackward(
+ long long increment) {
+ _collectionStats->numBucketsArchivedDueToTimeBackward.fetchAndAddRelaxed(increment);
+ _globalStats->numBucketsArchivedDueToTimeBackward.fetchAndAddRelaxed(increment);
+}
+
+void BucketCatalog::ExecutionStatsController::incNumBucketsArchivedDueToMemoryThreshold(
+ long long increment) {
+ _collectionStats->numBucketsArchivedDueToMemoryThreshold.fetchAndAddRelaxed(increment);
+ _globalStats->numBucketsArchivedDueToMemoryThreshold.fetchAndAddRelaxed(increment);
+}
+
void BucketCatalog::ExecutionStatsController::incNumCommits(long long increment) {
_collectionStats->numCommits.fetchAndAddRelaxed(increment);
_globalStats->numCommits.fetchAndAddRelaxed(increment);
@@ -285,7 +303,8 @@ class BucketCatalog::Bucket {
public:
friend class BucketCatalog;
- Bucket(const OID& id, StripeNumber stripe) : _id(id), _stripe(stripe) {}
+ Bucket(const OID& id, StripeNumber stripe, BucketKey::Hash hash)
+ : _id(id), _stripe(stripe), _keyHash(hash) {}
/**
* Returns the ID for the underlying bucket.
@@ -301,6 +320,13 @@ public:
return _stripe;
}
+ /**
+ * Returns the pre-computed hash of the corresponding BucketKey
+ */
+ BucketKey::Hash keyHash() const {
+ return _keyHash;
+ }
+
// Returns the time associated with the bucket (id)
Date_t getTime() const {
return _minTime;
@@ -421,6 +447,9 @@ private:
// The stripe which owns this bucket.
const StripeNumber _stripe;
+ // The pre-computed hash of the associated BucketKey
+ const BucketKey::Hash _keyHash;
+
// The namespace that this bucket is used for.
NamespaceString _ns;
@@ -457,9 +486,10 @@ private:
// The number of committed measurements in the bucket.
uint32_t _numCommittedMeasurements = 0;
- // Whether the bucket is full. This can be due to number of measurements, size, or time
+ // Whether the bucket has been marked for a rollover action. It can be marked for closure due to
+ // number of measurements, size, or schema changes, or it can be marked for archival due to time
// range.
- bool _full = false;
+ RolloverAction _rolloverAction = RolloverAction::kNone;
// The batch that has been prepared and is currently in the process of being committed, if
// any.
@@ -649,7 +679,7 @@ Status BucketCatalog::reopenBucket(OperationContext* opCtx,
auto stripeNumber = _getStripeNumber(key);
auto bucketId = bucketIdElem.OID();
- std::unique_ptr<Bucket> bucket = std::make_unique<Bucket>(bucketId, stripeNumber);
+ std::unique_ptr<Bucket> bucket = std::make_unique<Bucket>(bucketId, stripeNumber, key.hash);
// Initialize the remaining member variables from the bucket document.
bucket->_ns = ns;
@@ -694,18 +724,6 @@ Status BucketCatalog::reopenBucket(OperationContext* opCtx,
bucket->_numMeasurements = numMeasurements;
bucket->_numCommittedMeasurements = numMeasurements;
- auto isBucketFull = [](Bucket* bucket) -> bool {
- if (bucket->_numMeasurements >= static_cast<std::uint64_t>(gTimeseriesBucketMaxCount)) {
- return true;
- }
- if (bucket->_size >= static_cast<std::uint64_t>(gTimeseriesBucketMaxSize)) {
- return true;
- }
- return false;
- };
-
- bucket->_full = isBucketFull(bucket.get());
-
ExecutionStatsController stats = _getExecutionStats(ns);
stats.incNumBucketsReopened();
@@ -780,37 +798,53 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert(
bucket->_calculateBucketFieldsAndSizeChange(
doc, options.getMetaField(), &newFieldNamesToBeInserted, &sizeToBeAdded);
- auto shouldCloseBucket = [&](Bucket* bucket) -> bool {
+ auto determineRolloverAction = [&](Bucket* bucket) -> RolloverAction {
+ const bool canArchive = feature_flags::gTimeseriesScalabilityImprovements.isEnabled(
+ serverGlobalParams.featureCompatibility);
+
if (bucket->schemaIncompatible(doc, metaFieldName, comparator)) {
stats.incNumBucketsClosedDueToSchemaChange();
- return true;
+ return RolloverAction::kClose;
}
if (bucket->_numMeasurements == static_cast<std::uint64_t>(gTimeseriesBucketMaxCount)) {
stats.incNumBucketsClosedDueToCount();
- return true;
+ return RolloverAction::kClose;
}
if (bucket->_size + sizeToBeAdded > static_cast<std::uint64_t>(gTimeseriesBucketMaxSize)) {
stats.incNumBucketsClosedDueToSize();
- return true;
+ return RolloverAction::kClose;
}
auto bucketTime = bucket->getTime();
if (time - bucketTime >= Seconds(*options.getBucketMaxSpanSeconds())) {
- stats.incNumBucketsClosedDueToTimeForward();
- return true;
+ if (canArchive) {
+ stats.incNumBucketsArchivedDueToTimeForward();
+ return RolloverAction::kArchive;
+ } else {
+ stats.incNumBucketsClosedDueToTimeForward();
+ return RolloverAction::kClose;
+ }
}
if (time < bucketTime) {
- stats.incNumBucketsClosedDueToTimeBackward();
- return true;
+ if (canArchive) {
+ stats.incNumBucketsArchivedDueToTimeBackward();
+ return RolloverAction::kArchive;
+ } else {
+ stats.incNumBucketsClosedDueToTimeBackward();
+ return RolloverAction::kClose;
+ }
}
- return false;
+ return RolloverAction::kNone;
};
- if (!bucket->_ns.isEmpty() && shouldCloseBucket(bucket)) {
- info.openedDuetoMetadata = false;
- bucket = _rollover(&stripe, stripeLock, bucket, info);
+ if (!bucket->_ns.isEmpty()) {
+ auto action = determineRolloverAction(bucket);
+ if (action != RolloverAction::kNone) {
+ info.openedDuetoMetadata = false;
+ bucket = _rollover(&stripe, stripeLock, bucket, info, action);
- bucket->_calculateBucketFieldsAndSizeChange(
- doc, options.getMetaField(), &newFieldNamesToBeInserted, &sizeToBeAdded);
+ bucket->_calculateBucketFieldsAndSizeChange(
+ doc, options.getMetaField(), &newFieldNamesToBeInserted, &sizeToBeAdded);
+ }
}
auto batch = bucket->_activeBatch(getOpId(opCtx, combine), stats);
@@ -917,29 +951,21 @@ boost::optional<BucketCatalog::ClosedBucket> BucketCatalog::finish(
getTimeseriesBucketClearedError(bucket->id(), bucket->_ns));
}
} else if (bucket->allCommitted()) {
- if (bucket->_full) {
- // Everything in the bucket has been committed, and nothing more will be added since the
- // bucket is full. Thus, we can remove it.
- _memoryUsage.fetchAndSubtract(bucket->_memoryUsage);
-
- auto it = stripe.allBuckets.find(batch->bucket().id);
- if (it != stripe.allBuckets.end()) {
- bucket = it->second.get();
-
- closedBucket = ClosedBucket{batch->bucket().id,
- bucket->getTimeField().toString(),
- bucket->numMeasurements()};
-
- // Only remove from allBuckets and idleBuckets. If it was marked full, we know
- // that happened in Stripe::rollover, and that there is already a new open
- // bucket for this metadata.
- _markBucketNotIdle(&stripe, stripeLock, bucket);
- _eraseBucketState(batch->bucket().id);
-
- stripe.allBuckets.erase(batch->bucket().id);
+ switch (bucket->_rolloverAction) {
+ case RolloverAction::kClose: {
+ closedBucket = ClosedBucket{
+ bucket->id(), bucket->getTimeField().toString(), bucket->numMeasurements()};
+ _removeBucket(&stripe, stripeLock, bucket, false);
+ break;
+ }
+ case RolloverAction::kArchive: {
+ _archiveBucket(&stripe, stripeLock, bucket);
+ break;
+ }
+ case RolloverAction::kNone: {
+ _markBucketIdle(&stripe, stripeLock, bucket);
+ break;
}
- } else {
- _markBucketIdle(&stripe, stripeLock, bucket);
}
}
return closedBucket;
@@ -1015,6 +1041,7 @@ void BucketCatalog::_appendExecutionStatsToBuilder(const ExecutionStats* stats,
stats->numBucketsClosedDueToTimeBackward.load());
builder->appendNumber("numBucketsClosedDueToMemoryThreshold",
stats->numBucketsClosedDueToMemoryThreshold.load());
+
auto commits = stats->numCommits.load();
builder->appendNumber("numCommits", commits);
builder->appendNumber("numWaits", stats->numWaits.load());
@@ -1026,11 +1053,16 @@ void BucketCatalog::_appendExecutionStatsToBuilder(const ExecutionStats* stats,
if (feature_flags::gTimeseriesScalabilityImprovements.isEnabled(
serverGlobalParams.featureCompatibility)) {
+ builder->appendNumber("numBucketsArchivedDueToTimeForward",
+ stats->numBucketsArchivedDueToTimeForward.load());
+ builder->appendNumber("numBucketsArchivedDueToTimeBackward",
+ stats->numBucketsArchivedDueToTimeBackward.load());
+ builder->appendNumber("numBucketsArchivedDueToMemoryThreshold",
+ stats->numBucketsArchivedDueToMemoryThreshold.load());
builder->appendNumber("numBucketsReopened", stats->numBucketsReopened.load());
}
}
-
void BucketCatalog::appendExecutionStats(const NamespaceString& ns, BSONObjBuilder* builder) const {
const std::shared_ptr<ExecutionStats> stats = _getExecutionStats(ns);
_appendExecutionStatsToBuilder(stats.get(), builder);
@@ -1078,6 +1110,10 @@ std::size_t BucketCatalog::BucketHasher::operator()(const BucketKey& key) const
return key.hash;
}
+std::size_t BucketCatalog::PreHashed::operator()(const BucketKey::Hash& key) const {
+ return key;
+}
+
BucketCatalog::StripeNumber BucketCatalog::_getStripeNumber(const BucketKey& key) {
return key.hash % kNumberOfStripes;
}
@@ -1173,23 +1209,51 @@ void BucketCatalog::_waitToCommitBatch(Stripe* stripe, const std::shared_ptr<Wri
}
}
-bool BucketCatalog::_removeBucket(Stripe* stripe, WithLock stripeLock, Bucket* bucket) {
- auto it = stripe->allBuckets.find(bucket->id());
- if (it == stripe->allBuckets.end()) {
- return false;
- }
-
+void BucketCatalog::_removeBucket(Stripe* stripe,
+ WithLock stripeLock,
+ Bucket* bucket,
+ bool archiving) {
invariant(bucket->_batches.empty());
invariant(!bucket->_preparedBatch);
+ auto allIt = stripe->allBuckets.find(bucket->id());
+ invariant(allIt != stripe->allBuckets.end());
+
_memoryUsage.fetchAndSubtract(bucket->_memoryUsage);
_markBucketNotIdle(stripe, stripeLock, bucket);
- stripe->openBuckets.erase({bucket->_ns, bucket->_metadata});
- _eraseBucketState(bucket->id());
- stripe->allBuckets.erase(it);
+ // If the bucket was rolled over, then there may be a different open bucket for this metadata.
+ auto openIt = stripe->openBuckets.find({bucket->_ns, bucket->_metadata});
+ if (openIt != stripe->openBuckets.end() && openIt->second == bucket) {
+ stripe->openBuckets.erase(openIt);
+ }
+
+ // If we are cleaning up while archiving a bucket, then we want to preserve its state. Otherwise
+ // we can remove the state from the catalog altogether.
+ if (!archiving) {
+ _eraseBucketState(bucket->id());
+ }
- return true;
+ stripe->allBuckets.erase(allIt);
+}
+
+void BucketCatalog::_archiveBucket(Stripe* stripe, WithLock stripeLock, Bucket* bucket) {
+ bool archived = false;
+ auto& archivedSet = stripe->archivedBuckets[bucket->keyHash()];
+ auto it = archivedSet.find(bucket->getTime());
+ if (it == archivedSet.end()) {
+ archivedSet.emplace(bucket->getTime(),
+ ArchivedBucket{bucket->id(),
+ bucket->getTimeField().toString(),
+ bucket->numMeasurements()});
+
+ long long memory = _marginalMemoryUsageForArchivedBucket(archivedSet[bucket->getTime()],
+ archivedSet.size() == 1);
+ _memoryUsage.fetchAndAdd(memory);
+
+ archived = true;
+ }
+ _removeBucket(stripe, stripeLock, bucket, archived);
}
void BucketCatalog::_abort(Stripe* stripe,
@@ -1235,7 +1299,7 @@ void BucketCatalog::_abort(Stripe* stripe,
}
if (doRemove) {
- [[maybe_unused]] bool removed = _removeBucket(stripe, stripeLock, bucket);
+ _removeBucket(stripe, stripeLock, bucket, false);
}
}
@@ -1258,19 +1322,54 @@ void BucketCatalog::_expireIdleBuckets(Stripe* stripe,
ExecutionStatsController& stats,
std::vector<BucketCatalog::ClosedBucket>* closedBuckets) {
// As long as we still need space and have entries and remaining attempts, close idle buckets.
- int32_t numClosed = 0;
+ int32_t numExpired = 0;
+
+ const bool canArchive = feature_flags::gTimeseriesScalabilityImprovements.isEnabled(
+ serverGlobalParams.featureCompatibility);
+
while (!stripe->idleBuckets.empty() &&
_memoryUsage.load() > getTimeseriesIdleBucketExpiryMemoryUsageThresholdBytes() &&
- numClosed <= gTimeseriesIdleBucketExpiryMaxCountPerAttempt) {
+ numExpired <= gTimeseriesIdleBucketExpiryMaxCountPerAttempt) {
Bucket* bucket = stripe->idleBuckets.back();
- ClosedBucket closed{
- bucket->id(), bucket->getTimeField().toString(), bucket->numMeasurements()};
- if (_removeBucket(stripe, stripeLock, bucket)) {
+ if (canArchive) {
+ _archiveBucket(stripe, stripeLock, bucket);
+ stats.incNumBucketsArchivedDueToMemoryThreshold();
+ } else {
+ ClosedBucket closed{
+ bucket->id(), bucket->getTimeField().toString(), bucket->numMeasurements()};
+ _removeBucket(stripe, stripeLock, bucket, false);
stats.incNumBucketsClosedDueToMemoryThreshold();
closedBuckets->push_back(closed);
- ++numClosed;
}
+
+ ++numExpired;
+ }
+
+ while (canArchive && !stripe->archivedBuckets.empty() &&
+ _memoryUsage.load() > getTimeseriesIdleBucketExpiryMemoryUsageThresholdBytes() &&
+ numExpired <= gTimeseriesIdleBucketExpiryMaxCountPerAttempt) {
+
+ auto& [hash, archivedSet] = *stripe->archivedBuckets.begin();
+ invariant(!archivedSet.empty());
+
+ auto& [timestamp, bucket] = *archivedSet.begin();
+ ClosedBucket closed{bucket.bucketId, bucket.timeField, bucket.numMeasurements, true};
+
+ long long memory = _marginalMemoryUsageForArchivedBucket(bucket, archivedSet.size() == 1);
+ _eraseBucketState(bucket.bucketId);
+ if (archivedSet.size() == 1) {
+ // If this is the only entry, erase the whole map so we don't leave it empty.
+ stripe->archivedBuckets.erase(stripe->archivedBuckets.begin());
+ } else {
+ // Otherwise just erase this bucket from the map.
+ archivedSet.erase(archivedSet.begin());
+ }
+ _memoryUsage.fetchAndSubtract(memory);
+
+ stats.incNumBucketsClosedDueToMemoryThreshold();
+ closedBuckets->push_back(closed);
+ ++numExpired;
}
}
@@ -1281,8 +1380,8 @@ BucketCatalog::Bucket* BucketCatalog::_allocateBucket(Stripe* stripe,
auto [bucketId, roundedTime] = generateBucketId(info.time, info.options);
- auto [it, inserted] =
- stripe->allBuckets.try_emplace(bucketId, std::make_unique<Bucket>(bucketId, info.stripe));
+ auto [it, inserted] = stripe->allBuckets.try_emplace(
+ bucketId, std::make_unique<Bucket>(bucketId, info.stripe, info.key.hash));
tassert(6130900, "Expected bucket to be inserted", inserted);
Bucket* bucket = it->second.get();
stripe->openBuckets[info.key] = bucket;
@@ -1306,20 +1405,25 @@ BucketCatalog::Bucket* BucketCatalog::_allocateBucket(Stripe* stripe,
BucketCatalog::Bucket* BucketCatalog::_rollover(Stripe* stripe,
WithLock stripeLock,
Bucket* bucket,
- const CreationInfo& info) {
-
+ const CreationInfo& info,
+ RolloverAction action) {
+ invariant(action != RolloverAction::kNone);
if (bucket->allCommitted()) {
- // The bucket does not contain any measurements that are yet to be committed, so we can
- // remove it now.
- info.closedBuckets->push_back(ClosedBucket{
- bucket->id(), bucket->getTimeField().toString(), bucket->numMeasurements()});
+ // The bucket does not contain any measurements that are yet to be committed, so we can take
+ // action now.
+ if (action == RolloverAction::kClose) {
+ info.closedBuckets->push_back(ClosedBucket{
+ bucket->id(), bucket->getTimeField().toString(), bucket->numMeasurements()});
- bool removed = _removeBucket(stripe, stripeLock, bucket);
- invariant(removed);
+ _removeBucket(stripe, stripeLock, bucket, false);
+ } else {
+ invariant(action == RolloverAction::kArchive);
+ _archiveBucket(stripe, stripeLock, bucket);
+ }
} else {
- // We must keep the bucket around until it is committed, just mark it full so it we know to
- // clean it up when the last batch finishes.
- bucket->_full = true;
+ // We must keep the bucket around until all measurements are committed; just mark
+ // the action we chose now so we know what to do when the last batch finishes.
+ bucket->_rolloverAction = action;
}
return _allocateBucket(stripe, stripeLock, info);
@@ -1406,6 +1510,12 @@ boost::optional<BucketCatalog::BucketState> BucketCatalog::_setBucketState(const
return state;
}
+long long BucketCatalog::_marginalMemoryUsageForArchivedBucket(const ArchivedBucket& bucket,
+ bool onlyEntryForMatchingMetaHash) {
+ return sizeof(std::size_t) + sizeof(Date_t) + sizeof(ArchivedBucket) + bucket.timeField.size() +
+ (onlyEntryForMatchingMetaHash ? sizeof(decltype(Stripe::archivedBuckets)::value_type) : 0);
+}
+
class BucketCatalog::ServerStatus : public ServerStatusSection {
struct BucketCounts {
BucketCounts& operator+=(const BucketCounts& other) {
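
Before moving on to the header changes, here is a condensed sketch of the two-phase idle expiry that _expireIdleBuckets now performs under memory pressure in the diff above: idle open buckets are archived first (closed outright when the feature flag is off), and only then are the oldest archived buckets evicted and reported as closed. The container layout and the fixed per-entry cost are simplifications, not the catalog's actual accounting:

#include <cstddef>
#include <list>
#include <map>
#include <vector>

// Minimal stand-ins; the real catalog tracks far more state per bucket.
struct Bucket {
    long long id;
    std::size_t memoryUsage;
};

struct ArchivedBucket {
    long long id;
};

struct Stripe {
    std::list<Bucket> idleBuckets;                        // open buckets with no outstanding writes
    std::map<long long, ArchivedBucket> archivedBuckets;  // keyed by min time in the real catalog
};

// Returns the ids of buckets that were fully closed (evicted from the archive, or
// closed directly when archiving is unavailable).
std::vector<long long> expireIdleBuckets(Stripe& stripe,
                                         std::size_t& memoryUsage,
                                         std::size_t threshold,
                                         int maxPerAttempt,
                                         bool canArchive) {
    std::vector<long long> closed;
    int numExpired = 0;
    constexpr std::size_t kArchivedEntryCost = 64;  // stand-in for the marginal-cost helper

    // Phase 1: archive idle open buckets while still over the memory threshold.
    while (!stripe.idleBuckets.empty() && memoryUsage > threshold &&
           numExpired <= maxPerAttempt) {
        Bucket bucket = stripe.idleBuckets.back();
        stripe.idleBuckets.pop_back();
        memoryUsage -= bucket.memoryUsage;  // the open bucket's footprint goes away
        if (canArchive) {
            stripe.archivedBuckets.emplace(bucket.id, ArchivedBucket{bucket.id});
            memoryUsage += kArchivedEntryCost;  // only the small archive record remains
        } else {
            closed.push_back(bucket.id);
        }
        ++numExpired;
    }

    // Phase 2: if still over the threshold, evict archived buckets oldest-first and
    // report them as closed.
    while (canArchive && !stripe.archivedBuckets.empty() && memoryUsage > threshold &&
           numExpired <= maxPerAttempt) {
        auto it = stripe.archivedBuckets.begin();
        closed.push_back(it->second.id);
        stripe.archivedBuckets.erase(it);
        memoryUsage -= kArchivedEntryCost;
        ++numExpired;
    }
    return closed;
}
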
diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h
index c133fea3864..fcd24f8a40f 100644
--- a/src/mongo/db/timeseries/bucket_catalog.h
+++ b/src/mongo/db/timeseries/bucket_catalog.h
@@ -67,6 +67,9 @@ class BucketCatalog {
AtomicWord<long long> numBucketsClosedDueToTimeForward;
AtomicWord<long long> numBucketsClosedDueToTimeBackward;
AtomicWord<long long> numBucketsClosedDueToMemoryThreshold;
+ AtomicWord<long long> numBucketsArchivedDueToTimeForward;
+ AtomicWord<long long> numBucketsArchivedDueToTimeBackward;
+ AtomicWord<long long> numBucketsArchivedDueToMemoryThreshold;
AtomicWord<long long> numCommits;
AtomicWord<long long> numWaits;
AtomicWord<long long> numMeasurementsCommitted;
@@ -88,6 +91,9 @@ class BucketCatalog {
void incNumBucketsClosedDueToTimeForward(long long increment = 1);
void incNumBucketsClosedDueToTimeBackward(long long increment = 1);
void incNumBucketsClosedDueToMemoryThreshold(long long increment = 1);
+ void incNumBucketsArchivedDueToTimeForward(long long increment = 1);
+ void incNumBucketsArchivedDueToTimeBackward(long long increment = 1);
+ void incNumBucketsArchivedDueToMemoryThreshold(long long increment = 1);
void incNumCommits(long long increment = 1);
void incNumWaits(long long increment = 1);
void incNumMeasurementsCommitted(long long increment = 1);
@@ -119,6 +125,7 @@ public:
OID bucketId;
std::string timeField;
uint32_t numMeasurements;
+ bool eligibleForReopening = false;
};
using ClosedBuckets = std::vector<ClosedBucket>;
@@ -363,12 +370,14 @@ private:
* Key to lookup open Bucket for namespace and metadata, with pre-computed hash.
*/
struct BucketKey {
+ using Hash = std::size_t;
+
BucketKey() = delete;
BucketKey(const NamespaceString& nss, const BucketMetadata& meta);
NamespaceString ns;
BucketMetadata metadata;
- std::size_t hash;
+ Hash hash;
bool operator==(const BucketKey& other) const {
return ns == other.ns && metadata == other.metadata;
@@ -388,6 +397,23 @@ private:
};
/**
+ * Hasher to support using a pre-computed hash as a key without having to compute another hash.
+ */
+ struct PreHashed {
+ std::size_t operator()(const BucketKey::Hash& key) const;
+ };
+
+ /**
+ * Information of a Bucket that got archived while performing an operation on this
+ * BucketCatalog.
+ */
+ struct ArchivedBucket {
+ OID bucketId;
+ std::string timeField;
+ uint32_t numMeasurements;
+ };
+
+ /**
* Struct to hold a portion of the buckets managed by the catalog.
*
* Each of the bucket lists, as well as the buckets themselves, are protected by 'mutex'.
@@ -406,6 +432,12 @@ private:
// Buckets that do not have any outstanding writes.
using IdleList = std::list<Bucket*>;
IdleList idleBuckets;
+
+ // Buckets that are not currently in the catalog, but which are eligible to receive more
+ // measurements. The top-level map is keyed by the hash of the BucketKey, while the stored
+ // map is keyed by the bucket's minimum timestamp.
+ stdx::unordered_map<BucketKey::Hash, std::map<Date_t, ArchivedBucket>, PreHashed>
+ archivedBuckets;
};
StripeNumber _getStripeNumber(const BucketKey& key);
@@ -453,7 +485,13 @@ private:
/**
* Removes the given bucket from the bucket catalog's internal data structures.
*/
- bool _removeBucket(Stripe* stripe, WithLock stripeLock, Bucket* bucket);
+ void _removeBucket(Stripe* stripe, WithLock stripeLock, Bucket* bucket, bool archiving);
+
+ /**
+ * Archives the given bucket, minimizing the memory footprint but retaining the necessary
+ * information required to efficiently identify it as a candidate for future insertions.
+ */
+ void _archiveBucket(Stripe* stripe, WithLock stripeLock, Bucket* bucket);
/**
* Aborts 'batch', and if the corresponding bucket still exists, proceeds to abort any other
@@ -501,6 +539,11 @@ private:
Bucket* _allocateBucket(Stripe* stripe, WithLock stripeLock, const CreationInfo& info);
/**
+ * Mode enum to determine the rollover type decision for a given bucket.
+ */
+ enum class RolloverAction { kNone, kArchive, kClose };
+
+ /**
* Close the existing, full bucket and open a new one for the same metadata.
*
* Writes information about the closed bucket to the 'info' parameter.
@@ -508,7 +551,8 @@ private:
Bucket* _rollover(Stripe* stripe,
WithLock stripeLock,
Bucket* bucket,
- const CreationInfo& info);
+ const CreationInfo& info,
+ RolloverAction action);
ExecutionStatsController _getExecutionStats(const NamespaceString& ns);
std::shared_ptr<ExecutionStats> _getExecutionStats(const NamespaceString& ns) const;
@@ -540,6 +584,16 @@ private:
*/
boost::optional<BucketState> _setBucketState(const OID& id, BucketState target);
+ /**
+ * Calculates the marginal memory usage for an archived bucket. The
+ * 'onlyEntryForMatchingMetaHash' parameter indicates that the bucket will be (if inserting)
+ * or was (if removing) the only bucket associated with its meta hash value. If true, then
+ * the returned value will attempt to account for the overhead of the map data structure for
+ * the meta hash value.
+ */
+ static long long _marginalMemoryUsageForArchivedBucket(const ArchivedBucket& bucket,
+ bool onlyEntryForMatchingMetaHash);
+
static constexpr std::size_t kNumberOfStripes = 32;
std::array<Stripe, kNumberOfStripes> _stripes;
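
Before the test changes, a simplified illustration of the archive index declared above: the outer unordered_map is keyed directly by the pre-computed BucketKey hash, so PreHashed is just an identity hasher, and each inner std::map orders archived buckets by their minimum timestamp. Standard containers and a plain time type stand in for the catalog's types, and the memory formula mirrors _marginalMemoryUsageForArchivedBucket without claiming to be the authoritative accounting:

#include <cstddef>
#include <ctime>
#include <map>
#include <string>
#include <unordered_map>
#include <utility>

using KeyHash = std::size_t;  // pre-computed hash of the (namespace, metadata) key
using Date = std::time_t;     // stand-in for Date_t

struct ArchivedBucket {
    std::string bucketId;     // stand-in for the OID
    std::string timeField;
    unsigned numMeasurements;
};

// Identity hasher: the key is already a hash, so re-hashing would be wasted work.
struct PreHashed {
    std::size_t operator()(const KeyHash& key) const { return key; }
};

using ArchiveIndex = std::unordered_map<KeyHash, std::map<Date, ArchivedBucket>, PreHashed>;

// Marginal memory attributed to one archived entry: the hash key, the inner-map key and
// value, the dynamic part of the time-field string, plus the outer-map node when this is
// the only entry for the hash.
long long marginalMemoryUsage(const ArchivedBucket& bucket, bool onlyEntryForHash) {
    return sizeof(KeyHash) + sizeof(Date) + sizeof(ArchivedBucket) + bucket.timeField.size() +
        (onlyEntryForHash ? sizeof(ArchiveIndex::value_type) : 0);
}

// Archiving keeps only this small record; the open bucket itself is removed.
void archive(ArchiveIndex& index, KeyHash keyHash, Date minTime, ArchivedBucket bucket,
             long long& memoryUsage) {
    auto& archivedSet = index[keyHash];
    const bool firstForHash = archivedSet.empty();
    auto [it, inserted] = archivedSet.emplace(minTime, std::move(bucket));
    if (inserted) {
        memoryUsage += marginalMemoryUsage(it->second, firstForHash);
    }
}
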
diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp
index 024c7685b26..daa391d3b58 100644
--- a/src/mongo/db/timeseries/bucket_catalog_test.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp
@@ -43,6 +43,15 @@
namespace mongo {
namespace {
+constexpr StringData kNumSchemaChanges = "numBucketsClosedDueToSchemaChange"_sd;
+constexpr StringData kNumBucketsReopened = "numBucketsReopened"_sd;
+constexpr StringData kNumArchivedDueToTimeForward = "numBucketsArchivedDueToTimeForward"_sd;
+constexpr StringData kNumArchivedDueToTimeBackward = "numBucketsArchivedDueToTimeBackward"_sd;
+constexpr StringData kNumArchivedDueToMemoryThreshold = "numBucketsArchivedDueToMemoryThreshold"_sd;
+constexpr StringData kNumClosedDueToTimeForward = "numBucketsClosedDueToTimeForward"_sd;
+constexpr StringData kNumClosedDueToTimeBackward = "numBucketsClosedDueToTimeBackward"_sd;
+constexpr StringData kNumClosedDueToMemoryThreshold = "numBucketsClosedDueToMemoryThreshold"_sd;
+
class BucketCatalogTest : public CatalogTestFixture {
protected:
class Task {
@@ -74,9 +83,7 @@ protected:
void _insertOneAndCommit(const NamespaceString& ns,
uint16_t numPreviouslyCommittedMeasurements);
- long long _getNumWaits(const NamespaceString& ns);
- long long _getNumSchemaChanges(const NamespaceString& ns);
- long long _getNumBucketsReopened(const NamespaceString& ns);
+ long long _getExecutionStat(const NamespaceString& ns, StringData stat);
// Check that each group of objects has compatible schema with itself, but that inserting the
// first object in new group closes the existing bucket and opens a new one
@@ -180,22 +187,10 @@ void BucketCatalogTest::_insertOneAndCommit(const NamespaceString& ns,
_commit(batch, numPreviouslyCommittedMeasurements);
}
-long long BucketCatalogTest::_getNumWaits(const NamespaceString& ns) {
- BSONObjBuilder builder;
- _bucketCatalog->appendExecutionStats(ns, &builder);
- return builder.obj().getIntField("numWaits");
-}
-
-long long BucketCatalogTest::_getNumSchemaChanges(const NamespaceString& ns) {
+long long BucketCatalogTest::_getExecutionStat(const NamespaceString& ns, StringData stat) {
BSONObjBuilder builder;
_bucketCatalog->appendExecutionStats(ns, &builder);
- return builder.obj().getIntField("numBucketsClosedDueToSchemaChange");
-}
-
-long long BucketCatalogTest::_getNumBucketsReopened(const NamespaceString& ns) {
- BSONObjBuilder builder;
- _bucketCatalog->appendExecutionStats(ns, &builder);
- return builder.obj().getIntField("numBucketsReopened");
+ return builder.obj().getIntField(stat);
}
void BucketCatalogTest::_testMeasurementSchema(
@@ -212,7 +207,7 @@ void BucketCatalogTest::_testMeasurementSchema(
timestampedDoc.append(_timeField, Date_t::now());
timestampedDoc.appendElements(doc);
- auto pre = _getNumSchemaChanges(_ns1);
+ auto pre = _getExecutionStat(_ns1, kNumSchemaChanges);
auto result = _bucketCatalog
->insert(_opCtx,
_ns1,
@@ -221,7 +216,7 @@ void BucketCatalogTest::_testMeasurementSchema(
timestampedDoc.obj(),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
.getValue();
- auto post = _getNumSchemaChanges(_ns1);
+ auto post = _getExecutionStat(_ns1, kNumSchemaChanges);
if (firstMember) {
if (firstGroup) {
@@ -1095,7 +1090,7 @@ TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertCompatibleMeasurement
AutoGetCollection autoColl(_opCtx, _ns1.makeTimeseriesBucketsNamespace(), MODE_IX);
Status status = _bucketCatalog->reopenBucket(_opCtx, autoColl.getCollection(), bucketDoc);
ASSERT_OK(status);
- ASSERT_EQ(1, _getNumBucketsReopened(_ns1));
+ ASSERT_EQ(1, _getExecutionStat(_ns1, kNumBucketsReopened));
// Insert a measurement that is compatible with the reopened bucket.
auto result =
@@ -1109,7 +1104,7 @@ TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertCompatibleMeasurement
// No buckets are closed.
ASSERT(result.getValue().closedBuckets.empty());
- ASSERT_EQ(0, _getNumSchemaChanges(_ns1));
+ ASSERT_EQ(0, _getExecutionStat(_ns1, kNumSchemaChanges));
auto batch = result.getValue().batch;
ASSERT(batch->claimCommitRights());
@@ -1145,7 +1140,7 @@ TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertIncompatibleMeasureme
AutoGetCollection autoColl(_opCtx, _ns1.makeTimeseriesBucketsNamespace(), MODE_IX);
Status status = _bucketCatalog->reopenBucket(_opCtx, autoColl.getCollection(), bucketDoc);
ASSERT_OK(status);
- ASSERT_EQ(1, _getNumBucketsReopened(_ns1));
+ ASSERT_EQ(1, _getExecutionStat(_ns1, kNumBucketsReopened));
// Insert a measurement that is incompatible with the reopened bucket.
auto result =
@@ -1159,7 +1154,7 @@ TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertIncompatibleMeasureme
// The reopened bucket gets closed as the schema is incompatible.
ASSERT_EQ(1, result.getValue().closedBuckets.size());
- ASSERT_EQ(1, _getNumSchemaChanges(_ns1));
+ ASSERT_EQ(1, _getExecutionStat(_ns1, kNumSchemaChanges));
auto batch = result.getValue().batch;
ASSERT(batch->claimCommitRights());
@@ -1194,7 +1189,7 @@ TEST_F(BucketCatalogTest, ReopenCompressedBucketAndInsertCompatibleMeasurement)
Status status =
_bucketCatalog->reopenBucket(_opCtx, autoColl.getCollection(), compressedBucketDoc);
ASSERT_OK(status);
- ASSERT_EQ(1, _getNumBucketsReopened(_ns1));
+ ASSERT_EQ(1, _getExecutionStat(_ns1, kNumBucketsReopened));
// Insert a measurement that is compatible with the reopened bucket.
auto result =
@@ -1208,7 +1203,7 @@ TEST_F(BucketCatalogTest, ReopenCompressedBucketAndInsertCompatibleMeasurement)
// No buckets are closed.
ASSERT(result.getValue().closedBuckets.empty());
- ASSERT_EQ(0, _getNumSchemaChanges(_ns1));
+ ASSERT_EQ(0, _getExecutionStat(_ns1, kNumSchemaChanges));
auto batch = result.getValue().batch;
ASSERT(batch->claimCommitRights());
@@ -1249,7 +1244,7 @@ TEST_F(BucketCatalogTest, ReopenCompressedBucketAndInsertIncompatibleMeasurement
Status status =
_bucketCatalog->reopenBucket(_opCtx, autoColl.getCollection(), compressedBucketDoc);
ASSERT_OK(status);
- ASSERT_EQ(1, _getNumBucketsReopened(_ns1));
+ ASSERT_EQ(1, _getExecutionStat(_ns1, kNumBucketsReopened));
// Insert a measurement that is incompatible with the reopened bucket.
auto result =
@@ -1263,7 +1258,7 @@ TEST_F(BucketCatalogTest, ReopenCompressedBucketAndInsertIncompatibleMeasurement
// The reopened bucket gets closed as the schema is incompatible.
ASSERT_EQ(1, result.getValue().closedBuckets.size());
- ASSERT_EQ(1, _getNumSchemaChanges(_ns1));
+ ASSERT_EQ(1, _getExecutionStat(_ns1, kNumSchemaChanges));
auto batch = result.getValue().batch;
ASSERT(batch->claimCommitRights());
@@ -1275,5 +1270,157 @@ TEST_F(BucketCatalogTest, ReopenCompressedBucketAndInsertIncompatibleMeasurement
_bucketCatalog->finish(batch, {});
}
+
+TEST_F(BucketCatalogTest, ArchiveIfTimeForward) {
+ RAIIServerParameterControllerForTest featureFlag{"featureFlagTimeseriesScalabilityImprovements",
+ true};
+ auto baseTimestamp = Date_t::now();
+
+ // Insert an initial document to make sure we have an open bucket.
+ auto result1 =
+ _bucketCatalog->insert(_opCtx,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ BSON(_timeField << baseTimestamp),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
+ ASSERT_OK(result1.getStatus());
+ auto batch1 = result1.getValue().batch;
+ ASSERT(batch1->claimCommitRights());
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch1));
+ _bucketCatalog->finish(batch1, {});
+
+ // Make sure we start out with nothing closed or archived.
+ ASSERT_EQ(0, _getExecutionStat(_ns1, kNumArchivedDueToTimeForward));
+ ASSERT_EQ(0, _getExecutionStat(_ns1, kNumClosedDueToTimeForward));
+
+ // Now insert another that's too far forward to fit in the same bucket
+ auto result2 =
+ _bucketCatalog->insert(_opCtx,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ BSON(_timeField << (baseTimestamp + Seconds{7200})),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
+ ASSERT_OK(result2.getStatus());
+ auto batch2 = result2.getValue().batch;
+ ASSERT(batch2->claimCommitRights());
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch2));
+ _bucketCatalog->finish(batch2, {});
+
+ // Make sure it was archived, not closed.
+ ASSERT_EQ(1, _getExecutionStat(_ns1, kNumArchivedDueToTimeForward));
+ ASSERT_EQ(0, _getExecutionStat(_ns1, kNumClosedDueToTimeForward));
+}
+
+TEST_F(BucketCatalogTest, ArchiveIfTimeBackward) {
+ RAIIServerParameterControllerForTest featureFlag{"featureFlagTimeseriesScalabilityImprovements",
+ true};
+ auto baseTimestamp = Date_t::now();
+
+ // Insert an initial document to make sure we have an open bucket.
+ auto result1 =
+ _bucketCatalog->insert(_opCtx,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ BSON(_timeField << baseTimestamp),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
+ ASSERT_OK(result1.getStatus());
+ auto batch1 = result1.getValue().batch;
+ ASSERT(batch1->claimCommitRights());
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch1));
+ _bucketCatalog->finish(batch1, {});
+
+ // Make sure we start out with nothing closed or archived.
+ ASSERT_EQ(0, _getExecutionStat(_ns1, kNumArchivedDueToTimeBackward));
+ ASSERT_EQ(0, _getExecutionStat(_ns1, kNumClosedDueToTimeBackward));
+
+ // Now insert another that's too far backward to fit in the same bucket
+ auto result2 =
+ _bucketCatalog->insert(_opCtx,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ BSON(_timeField << (baseTimestamp - Seconds{7200})),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
+ ASSERT_OK(result2.getStatus());
+ auto batch2 = result2.getValue().batch;
+ ASSERT(batch2->claimCommitRights());
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch2));
+ _bucketCatalog->finish(batch2, {});
+
+ // Make sure it was archived, not closed.
+ ASSERT_EQ(1, _getExecutionStat(_ns1, kNumArchivedDueToTimeBackward));
+ ASSERT_EQ(0, _getExecutionStat(_ns1, kNumClosedDueToTimeBackward));
+}
+
+TEST_F(BucketCatalogTest, ArchivingUnderMemoryPressure) {
+ RAIIServerParameterControllerForTest featureFlag{"featureFlagTimeseriesScalabilityImprovements",
+ true};
+ RAIIServerParameterControllerForTest memoryLimit{
+ "timeseriesIdleBucketExpiryMemoryUsageThreshold", 10000};
+
+ // Insert a measurement with a unique meta value, guaranteeing we will open a new bucket but not
+ // close an old one except under memory pressure.
+ long long meta = 0;
+ auto insertDocument = [&meta, this]() -> BucketCatalog::ClosedBuckets {
+ auto result =
+ _bucketCatalog->insert(_opCtx,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ BSON(_timeField << Date_t::now() << _metaField << meta++),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
+ ASSERT_OK(result.getStatus());
+ auto batch = result.getValue().batch;
+ ASSERT(batch->claimCommitRights());
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch));
+ _bucketCatalog->finish(batch, {});
+
+ return result.getValue().closedBuckets;
+ };
+
+ // Ensure we start out with no buckets archived or closed due to memory pressure.
+ ASSERT_EQ(0, _getExecutionStat(_ns1, kNumArchivedDueToMemoryThreshold));
+ ASSERT_EQ(0, _getExecutionStat(_ns1, kNumClosedDueToMemoryThreshold));
+
+ // With a memory limit of 10000 bytes, we should be guaranteed to hit the memory limit with no
+ // more than 1000 buckets since an open bucket takes up at least 10 bytes (in reality,
+ // significantly more, but this is definitely a safe assumption).
+ for (int i = 0; i < 1000; ++i) {
+ [[maybe_unused]] auto closedBuckets = insertDocument();
+
+ if (0 < _getExecutionStat(_ns1, kNumArchivedDueToMemoryThreshold)) {
+ break;
+ }
+ }
+
+ // When we first hit the limit, we should try to archive some buckets prior to closing anything.
+ // However, depending on how the buckets are distributed over the stripes, it's possible that
+ // the current stripe will not have enough open buckets to archive to drop below the limit, and
+ // may immediately close a bucket it has just archived. We should be able to guarantee that we
+ // have archived a bucket prior to closing it though.
+ ASSERT_LT(0, _getExecutionStat(_ns1, kNumArchivedDueToMemoryThreshold));
+ auto numClosedInFirstRound = _getExecutionStat(_ns1, kNumClosedDueToMemoryThreshold);
+ ASSERT_LTE(numClosedInFirstRound, _getExecutionStat(_ns1, kNumArchivedDueToMemoryThreshold));
+
+ // If we continue to open more new buckets with distinct meta values, eventually we'll run out
+ // of open buckets to archive and have to start closing archived buckets to relieve memory
+ // pressure. Again, an archived bucket should take up more than 10 bytes in the catalog, so we
+ // should be fine with a maximum of 1000 iterations.
+ for (int i = 0; i < 1000; ++i) {
+ auto closedBuckets = insertDocument();
+
+ if (numClosedInFirstRound < _getExecutionStat(_ns1, kNumClosedDueToMemoryThreshold)) {
+ ASSERT_FALSE(closedBuckets.empty());
+ break;
+ }
+ }
+
+ // We should have closed some (additional) buckets by now.
+ ASSERT_LT(numClosedInFirstRound, _getExecutionStat(_ns1, kNumClosedDueToMemoryThreshold));
+}
+
} // namespace
} // namespace mongo