diff options
author | Dan Larkin-York <dan.larkin-york@mongodb.com> | 2022-06-14 20:28:47 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-06-14 21:13:04 +0000 |
commit | aac074d9844f61f9a9a44d014987f2da93915a59 (patch) | |
tree | 75e143983312bb60f5fa2fac2895c767b57703ca /src/mongo | |
parent | f678ddaee2ed81c0bed9f72116135d63a3754e85 (diff) | |
download | mongo-aac074d9844f61f9a9a44d014987f2da93915a59.tar.gz |
SERVER-66683 Archive eligible buckets instead of closing them
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog.cpp | 276 | ||||
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog.h | 60 | ||||
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog_test.cpp | 201 |
3 files changed, 424 insertions, 113 deletions
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp index 0b365571616..f043c687e28 100644 --- a/src/mongo/db/timeseries/bucket_catalog.cpp +++ b/src/mongo/db/timeseries/bucket_catalog.cpp @@ -261,6 +261,24 @@ void BucketCatalog::ExecutionStatsController::incNumBucketsClosedDueToMemoryThre _globalStats->numBucketsClosedDueToMemoryThreshold.fetchAndAddRelaxed(increment); } +void BucketCatalog::ExecutionStatsController::incNumBucketsArchivedDueToTimeForward( + long long increment) { + _collectionStats->numBucketsArchivedDueToTimeForward.fetchAndAddRelaxed(increment); + _globalStats->numBucketsArchivedDueToTimeForward.fetchAndAddRelaxed(increment); +} + +void BucketCatalog::ExecutionStatsController::incNumBucketsArchivedDueToTimeBackward( + long long increment) { + _collectionStats->numBucketsArchivedDueToTimeBackward.fetchAndAddRelaxed(increment); + _globalStats->numBucketsArchivedDueToTimeBackward.fetchAndAddRelaxed(increment); +} + +void BucketCatalog::ExecutionStatsController::incNumBucketsArchivedDueToMemoryThreshold( + long long increment) { + _collectionStats->numBucketsArchivedDueToMemoryThreshold.fetchAndAddRelaxed(increment); + _globalStats->numBucketsArchivedDueToMemoryThreshold.fetchAndAddRelaxed(increment); +} + void BucketCatalog::ExecutionStatsController::incNumCommits(long long increment) { _collectionStats->numCommits.fetchAndAddRelaxed(increment); _globalStats->numCommits.fetchAndAddRelaxed(increment); @@ -285,7 +303,8 @@ class BucketCatalog::Bucket { public: friend class BucketCatalog; - Bucket(const OID& id, StripeNumber stripe) : _id(id), _stripe(stripe) {} + Bucket(const OID& id, StripeNumber stripe, BucketKey::Hash hash) + : _id(id), _stripe(stripe), _keyHash(hash) {} /** * Returns the ID for the underlying bucket. 
@@ -301,6 +320,13 @@ public: return _stripe; } + /** + * Returns the pre-computed hash of the corresponding BucketKey + */ + BucketKey::Hash keyHash() const { + return _keyHash; + } + // Returns the time associated with the bucket (id) Date_t getTime() const { return _minTime; @@ -421,6 +447,9 @@ private: // The stripe which owns this bucket. const StripeNumber _stripe; + // The pre-computed hash of the associated BucketKey + const BucketKey::Hash _keyHash; + // The namespace that this bucket is used for. NamespaceString _ns; @@ -457,9 +486,10 @@ private: // The number of committed measurements in the bucket. uint32_t _numCommittedMeasurements = 0; - // Whether the bucket is full. This can be due to number of measurements, size, or time + // Whether the bucket has been marked for a rollover action. It can be marked for closure due to + // number of measurements, size, or schema changes, or it can be marked for archival due to time // range. - bool _full = false; + RolloverAction _rolloverAction = RolloverAction::kNone; // The batch that has been prepared and is currently in the process of being committed, if // any. @@ -649,7 +679,7 @@ Status BucketCatalog::reopenBucket(OperationContext* opCtx, auto stripeNumber = _getStripeNumber(key); auto bucketId = bucketIdElem.OID(); - std::unique_ptr<Bucket> bucket = std::make_unique<Bucket>(bucketId, stripeNumber); + std::unique_ptr<Bucket> bucket = std::make_unique<Bucket>(bucketId, stripeNumber, key.hash); // Initialize the remaining member variables from the bucket document. 
bucket->_ns = ns; @@ -694,18 +724,6 @@ Status BucketCatalog::reopenBucket(OperationContext* opCtx, bucket->_numMeasurements = numMeasurements; bucket->_numCommittedMeasurements = numMeasurements; - auto isBucketFull = [](Bucket* bucket) -> bool { - if (bucket->_numMeasurements >= static_cast<std::uint64_t>(gTimeseriesBucketMaxCount)) { - return true; - } - if (bucket->_size >= static_cast<std::uint64_t>(gTimeseriesBucketMaxSize)) { - return true; - } - return false; - }; - - bucket->_full = isBucketFull(bucket.get()); - ExecutionStatsController stats = _getExecutionStats(ns); stats.incNumBucketsReopened(); @@ -780,37 +798,53 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert( bucket->_calculateBucketFieldsAndSizeChange( doc, options.getMetaField(), &newFieldNamesToBeInserted, &sizeToBeAdded); - auto shouldCloseBucket = [&](Bucket* bucket) -> bool { + auto determineRolloverAction = [&](Bucket* bucket) -> RolloverAction { + const bool canArchive = feature_flags::gTimeseriesScalabilityImprovements.isEnabled( + serverGlobalParams.featureCompatibility); + if (bucket->schemaIncompatible(doc, metaFieldName, comparator)) { stats.incNumBucketsClosedDueToSchemaChange(); - return true; + return RolloverAction::kClose; } if (bucket->_numMeasurements == static_cast<std::uint64_t>(gTimeseriesBucketMaxCount)) { stats.incNumBucketsClosedDueToCount(); - return true; + return RolloverAction::kClose; } if (bucket->_size + sizeToBeAdded > static_cast<std::uint64_t>(gTimeseriesBucketMaxSize)) { stats.incNumBucketsClosedDueToSize(); - return true; + return RolloverAction::kClose; } auto bucketTime = bucket->getTime(); if (time - bucketTime >= Seconds(*options.getBucketMaxSpanSeconds())) { - stats.incNumBucketsClosedDueToTimeForward(); - return true; + if (canArchive) { + stats.incNumBucketsArchivedDueToTimeForward(); + return RolloverAction::kArchive; + } else { + stats.incNumBucketsClosedDueToTimeForward(); + return RolloverAction::kClose; + } } if (time < bucketTime) { - 
stats.incNumBucketsClosedDueToTimeBackward(); - return true; + if (canArchive) { + stats.incNumBucketsArchivedDueToTimeBackward(); + return RolloverAction::kArchive; + } else { + stats.incNumBucketsClosedDueToTimeBackward(); + return RolloverAction::kClose; + } } - return false; + return RolloverAction::kNone; }; - if (!bucket->_ns.isEmpty() && shouldCloseBucket(bucket)) { - info.openedDuetoMetadata = false; - bucket = _rollover(&stripe, stripeLock, bucket, info); + if (!bucket->_ns.isEmpty()) { + auto action = determineRolloverAction(bucket); + if (action != RolloverAction::kNone) { + info.openedDuetoMetadata = false; + bucket = _rollover(&stripe, stripeLock, bucket, info, action); - bucket->_calculateBucketFieldsAndSizeChange( - doc, options.getMetaField(), &newFieldNamesToBeInserted, &sizeToBeAdded); + bucket->_calculateBucketFieldsAndSizeChange( + doc, options.getMetaField(), &newFieldNamesToBeInserted, &sizeToBeAdded); + } } auto batch = bucket->_activeBatch(getOpId(opCtx, combine), stats); @@ -917,29 +951,21 @@ boost::optional<BucketCatalog::ClosedBucket> BucketCatalog::finish( getTimeseriesBucketClearedError(bucket->id(), bucket->_ns)); } } else if (bucket->allCommitted()) { - if (bucket->_full) { - // Everything in the bucket has been committed, and nothing more will be added since the - // bucket is full. Thus, we can remove it. - _memoryUsage.fetchAndSubtract(bucket->_memoryUsage); - - auto it = stripe.allBuckets.find(batch->bucket().id); - if (it != stripe.allBuckets.end()) { - bucket = it->second.get(); - - closedBucket = ClosedBucket{batch->bucket().id, - bucket->getTimeField().toString(), - bucket->numMeasurements()}; - - // Only remove from allBuckets and idleBuckets. If it was marked full, we know - // that happened in Stripe::rollover, and that there is already a new open - // bucket for this metadata. 
- _markBucketNotIdle(&stripe, stripeLock, bucket); - _eraseBucketState(batch->bucket().id); - - stripe.allBuckets.erase(batch->bucket().id); + switch (bucket->_rolloverAction) { + case RolloverAction::kClose: { + closedBucket = ClosedBucket{ + bucket->id(), bucket->getTimeField().toString(), bucket->numMeasurements()}; + _removeBucket(&stripe, stripeLock, bucket, false); + break; + } + case RolloverAction::kArchive: { + _archiveBucket(&stripe, stripeLock, bucket); + break; + } + case RolloverAction::kNone: { + _markBucketIdle(&stripe, stripeLock, bucket); + break; } - } else { - _markBucketIdle(&stripe, stripeLock, bucket); } } return closedBucket; @@ -1015,6 +1041,7 @@ void BucketCatalog::_appendExecutionStatsToBuilder(const ExecutionStats* stats, stats->numBucketsClosedDueToTimeBackward.load()); builder->appendNumber("numBucketsClosedDueToMemoryThreshold", stats->numBucketsClosedDueToMemoryThreshold.load()); + auto commits = stats->numCommits.load(); builder->appendNumber("numCommits", commits); builder->appendNumber("numWaits", stats->numWaits.load()); @@ -1026,11 +1053,16 @@ void BucketCatalog::_appendExecutionStatsToBuilder(const ExecutionStats* stats, if (feature_flags::gTimeseriesScalabilityImprovements.isEnabled( serverGlobalParams.featureCompatibility)) { + builder->appendNumber("numBucketsArchivedDueToTimeForward", + stats->numBucketsArchivedDueToTimeForward.load()); + builder->appendNumber("numBucketsArchivedDueToTimeBackward", + stats->numBucketsArchivedDueToTimeBackward.load()); + builder->appendNumber("numBucketsArchivedDueToMemoryThreshold", + stats->numBucketsArchivedDueToMemoryThreshold.load()); builder->appendNumber("numBucketsReopened", stats->numBucketsReopened.load()); } } - void BucketCatalog::appendExecutionStats(const NamespaceString& ns, BSONObjBuilder* builder) const { const std::shared_ptr<ExecutionStats> stats = _getExecutionStats(ns); _appendExecutionStatsToBuilder(stats.get(), builder); @@ -1078,6 +1110,10 @@ std::size_t 
BucketCatalog::BucketHasher::operator()(const BucketKey& key) const return key.hash; } +std::size_t BucketCatalog::PreHashed::operator()(const BucketKey::Hash& key) const { + return key; +} + BucketCatalog::StripeNumber BucketCatalog::_getStripeNumber(const BucketKey& key) { return key.hash % kNumberOfStripes; } @@ -1173,23 +1209,51 @@ void BucketCatalog::_waitToCommitBatch(Stripe* stripe, const std::shared_ptr<Wri } } -bool BucketCatalog::_removeBucket(Stripe* stripe, WithLock stripeLock, Bucket* bucket) { - auto it = stripe->allBuckets.find(bucket->id()); - if (it == stripe->allBuckets.end()) { - return false; - } - +void BucketCatalog::_removeBucket(Stripe* stripe, + WithLock stripeLock, + Bucket* bucket, + bool archiving) { invariant(bucket->_batches.empty()); invariant(!bucket->_preparedBatch); + auto allIt = stripe->allBuckets.find(bucket->id()); + invariant(allIt != stripe->allBuckets.end()); + _memoryUsage.fetchAndSubtract(bucket->_memoryUsage); _markBucketNotIdle(stripe, stripeLock, bucket); - stripe->openBuckets.erase({bucket->_ns, bucket->_metadata}); - _eraseBucketState(bucket->id()); - stripe->allBuckets.erase(it); + // If the bucket was rolled over, then there may be a different open bucket for this metadata. + auto openIt = stripe->openBuckets.find({bucket->_ns, bucket->_metadata}); + if (openIt != stripe->openBuckets.end() && openIt->second == bucket) { + stripe->openBuckets.erase(openIt); + } + + // If we are cleaning up while archiving a bucket, then we want to preserve its state. Otherwise + // we can remove the state from the catalog altogether. 
+ if (!archiving) { + _eraseBucketState(bucket->id()); + } - return true; + stripe->allBuckets.erase(allIt); +} + +void BucketCatalog::_archiveBucket(Stripe* stripe, WithLock stripeLock, Bucket* bucket) { + bool archived = false; + auto& archivedSet = stripe->archivedBuckets[bucket->keyHash()]; + auto it = archivedSet.find(bucket->getTime()); + if (it == archivedSet.end()) { + archivedSet.emplace(bucket->getTime(), + ArchivedBucket{bucket->id(), + bucket->getTimeField().toString(), + bucket->numMeasurements()}); + + long long memory = _marginalMemoryUsageForArchivedBucket(archivedSet[bucket->getTime()], + archivedSet.size() == 1); + _memoryUsage.fetchAndAdd(memory); + + archived = true; + } + _removeBucket(stripe, stripeLock, bucket, archived); } void BucketCatalog::_abort(Stripe* stripe, @@ -1235,7 +1299,7 @@ void BucketCatalog::_abort(Stripe* stripe, } if (doRemove) { - [[maybe_unused]] bool removed = _removeBucket(stripe, stripeLock, bucket); + _removeBucket(stripe, stripeLock, bucket, false); } } @@ -1258,19 +1322,54 @@ void BucketCatalog::_expireIdleBuckets(Stripe* stripe, ExecutionStatsController& stats, std::vector<BucketCatalog::ClosedBucket>* closedBuckets) { // As long as we still need space and have entries and remaining attempts, close idle buckets. 
- int32_t numClosed = 0; + int32_t numExpired = 0; + + const bool canArchive = feature_flags::gTimeseriesScalabilityImprovements.isEnabled( + serverGlobalParams.featureCompatibility); + while (!stripe->idleBuckets.empty() && _memoryUsage.load() > getTimeseriesIdleBucketExpiryMemoryUsageThresholdBytes() && - numClosed <= gTimeseriesIdleBucketExpiryMaxCountPerAttempt) { + numExpired <= gTimeseriesIdleBucketExpiryMaxCountPerAttempt) { Bucket* bucket = stripe->idleBuckets.back(); - ClosedBucket closed{ - bucket->id(), bucket->getTimeField().toString(), bucket->numMeasurements()}; - if (_removeBucket(stripe, stripeLock, bucket)) { + if (canArchive) { + _archiveBucket(stripe, stripeLock, bucket); + stats.incNumBucketsArchivedDueToMemoryThreshold(); + } else { + ClosedBucket closed{ + bucket->id(), bucket->getTimeField().toString(), bucket->numMeasurements()}; + _removeBucket(stripe, stripeLock, bucket, false); stats.incNumBucketsClosedDueToMemoryThreshold(); closedBuckets->push_back(closed); - ++numClosed; } + + ++numExpired; + } + + while (canArchive && !stripe->archivedBuckets.empty() && + _memoryUsage.load() > getTimeseriesIdleBucketExpiryMemoryUsageThresholdBytes() && + numExpired <= gTimeseriesIdleBucketExpiryMaxCountPerAttempt) { + + auto& [hash, archivedSet] = *stripe->archivedBuckets.begin(); + invariant(!archivedSet.empty()); + + auto& [timestamp, bucket] = *archivedSet.begin(); + ClosedBucket closed{bucket.bucketId, bucket.timeField, bucket.numMeasurements, true}; + + long long memory = _marginalMemoryUsageForArchivedBucket(bucket, archivedSet.size() == 1); + _eraseBucketState(bucket.bucketId); + if (archivedSet.size() == 1) { + // If this is the only entry, erase the whole map so we don't leave it empty. + stripe->archivedBuckets.erase(stripe->archivedBuckets.begin()); + } else { + // Otherwise just erase this bucket from the map. 
+ archivedSet.erase(archivedSet.begin()); + } + _memoryUsage.fetchAndSubtract(memory); + + stats.incNumBucketsClosedDueToMemoryThreshold(); + closedBuckets->push_back(closed); + ++numExpired; } } @@ -1281,8 +1380,8 @@ BucketCatalog::Bucket* BucketCatalog::_allocateBucket(Stripe* stripe, auto [bucketId, roundedTime] = generateBucketId(info.time, info.options); - auto [it, inserted] = - stripe->allBuckets.try_emplace(bucketId, std::make_unique<Bucket>(bucketId, info.stripe)); + auto [it, inserted] = stripe->allBuckets.try_emplace( + bucketId, std::make_unique<Bucket>(bucketId, info.stripe, info.key.hash)); tassert(6130900, "Expected bucket to be inserted", inserted); Bucket* bucket = it->second.get(); stripe->openBuckets[info.key] = bucket; @@ -1306,20 +1405,25 @@ BucketCatalog::Bucket* BucketCatalog::_allocateBucket(Stripe* stripe, BucketCatalog::Bucket* BucketCatalog::_rollover(Stripe* stripe, WithLock stripeLock, Bucket* bucket, - const CreationInfo& info) { - + const CreationInfo& info, + RolloverAction action) { + invariant(action != RolloverAction::kNone); if (bucket->allCommitted()) { - // The bucket does not contain any measurements that are yet to be committed, so we can - // remove it now. - info.closedBuckets->push_back(ClosedBucket{ - bucket->id(), bucket->getTimeField().toString(), bucket->numMeasurements()}); + // The bucket does not contain any measurements that are yet to be committed, so we can take + // action now. 
+ if (action == RolloverAction::kClose) { + info.closedBuckets->push_back(ClosedBucket{ + bucket->id(), bucket->getTimeField().toString(), bucket->numMeasurements()}); - bool removed = _removeBucket(stripe, stripeLock, bucket); - invariant(removed); + _removeBucket(stripe, stripeLock, bucket, false); + } else { + invariant(action == RolloverAction::kArchive); + _archiveBucket(stripe, stripeLock, bucket); + } } else { - // We must keep the bucket around until it is committed, just mark it full so it we know to - // clean it up when the last batch finishes. - bucket->_full = true; + // We must keep the bucket around until all measurements are committed, just mark + // the action we chose now so we know what to do when the last batch finishes. + bucket->_rolloverAction = action; } return _allocateBucket(stripe, stripeLock, info); } @@ -1406,6 +1510,12 @@ boost::optional<BucketCatalog::BucketState> BucketCatalog::_setBucketState(const return state; } +long long BucketCatalog::_marginalMemoryUsageForArchivedBucket(const ArchivedBucket& bucket, + bool onlyEntryForMatchingMetaHash) { + return sizeof(std::size_t) + sizeof(Date_t) + sizeof(ArchivedBucket) + bucket.timeField.size() + + (onlyEntryForMatchingMetaHash ?
sizeof(decltype(Stripe::archivedBuckets)::value_type) : 0); +} + class BucketCatalog::ServerStatus : public ServerStatusSection { struct BucketCounts { BucketCounts& operator+=(const BucketCounts& other) { diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h index c133fea3864..fcd24f8a40f 100644 --- a/src/mongo/db/timeseries/bucket_catalog.h +++ b/src/mongo/db/timeseries/bucket_catalog.h @@ -67,6 +67,9 @@ class BucketCatalog { AtomicWord<long long> numBucketsClosedDueToTimeForward; AtomicWord<long long> numBucketsClosedDueToTimeBackward; AtomicWord<long long> numBucketsClosedDueToMemoryThreshold; + AtomicWord<long long> numBucketsArchivedDueToTimeForward; + AtomicWord<long long> numBucketsArchivedDueToTimeBackward; + AtomicWord<long long> numBucketsArchivedDueToMemoryThreshold; AtomicWord<long long> numCommits; AtomicWord<long long> numWaits; AtomicWord<long long> numMeasurementsCommitted; @@ -88,6 +91,9 @@ class BucketCatalog { void incNumBucketsClosedDueToTimeForward(long long increment = 1); void incNumBucketsClosedDueToTimeBackward(long long increment = 1); void incNumBucketsClosedDueToMemoryThreshold(long long increment = 1); + void incNumBucketsArchivedDueToTimeForward(long long increment = 1); + void incNumBucketsArchivedDueToTimeBackward(long long increment = 1); + void incNumBucketsArchivedDueToMemoryThreshold(long long increment = 1); void incNumCommits(long long increment = 1); void incNumWaits(long long increment = 1); void incNumMeasurementsCommitted(long long increment = 1); @@ -119,6 +125,7 @@ public: OID bucketId; std::string timeField; uint32_t numMeasurements; + bool eligibleForReopening = false; }; using ClosedBuckets = std::vector<ClosedBucket>; @@ -363,12 +370,14 @@ private: * Key to lookup open Bucket for namespace and metadata, with pre-computed hash. 
*/ struct BucketKey { + using Hash = std::size_t; + BucketKey() = delete; BucketKey(const NamespaceString& nss, const BucketMetadata& meta); NamespaceString ns; BucketMetadata metadata; - std::size_t hash; + Hash hash; bool operator==(const BucketKey& other) const { return ns == other.ns && metadata == other.metadata; @@ -388,6 +397,23 @@ private: }; /** + * Hasher to support using a pre-computed hash as a key without having to compute another hash. + */ + struct PreHashed { + std::size_t operator()(const BucketKey::Hash& key) const; + }; + + /** + * Information of a Bucket that got archived while performing an operation on this + * BucketCatalog. + */ + struct ArchivedBucket { + OID bucketId; + std::string timeField; + uint32_t numMeasurements; + }; + + /** * Struct to hold a portion of the buckets managed by the catalog. * * Each of the bucket lists, as well as the buckets themselves, are protected by 'mutex'. @@ -406,6 +432,12 @@ private: // Buckets that do not have any outstanding writes. using IdleList = std::list<Bucket*>; IdleList idleBuckets; + + // Buckets that are not currently in the catalog, but which are eligible to receive more + // measurements. The top-level map is keyed by the hash of the BucketKey, while the stored + // map is keyed by the bucket's minimum timestamp. + stdx::unordered_map<BucketKey::Hash, std::map<Date_t, ArchivedBucket>, PreHashed> + archivedBuckets; }; StripeNumber _getStripeNumber(const BucketKey& key); @@ -453,7 +485,13 @@ private: /** * Removes the given bucket from the bucket catalog's internal data structures. */ - bool _removeBucket(Stripe* stripe, WithLock stripeLock, Bucket* bucket); + void _removeBucket(Stripe* stripe, WithLock stripeLock, Bucket* bucket, bool archiving); + + /** + * Archives the given bucket, minimizing the memory footprint but retaining the necessary + * information required to efficiently identify it as a candidate for future insertions. 
+ */ + void _archiveBucket(Stripe* stripe, WithLock stripeLock, Bucket* bucket); /** * Aborts 'batch', and if the corresponding bucket still exists, proceeds to abort any other @@ -501,6 +539,11 @@ private: Bucket* _allocateBucket(Stripe* stripe, WithLock stripeLock, const CreationInfo& info); /** + * Mode enum to determine the rollover type decision for a given bucket. + */ + enum class RolloverAction { kNone, kArchive, kClose }; + + /** * Close the existing, full bucket and open a new one for the same metadata. * * Writes information about the closed bucket to the 'info' parameter. @@ -508,7 +551,8 @@ Bucket* _rollover(Stripe* stripe, WithLock stripeLock, Bucket* bucket, - const CreationInfo& info); + const CreationInfo& info, + RolloverAction action); ExecutionStatsController _getExecutionStats(const NamespaceString& ns); std::shared_ptr<ExecutionStats> _getExecutionStats(const NamespaceString& ns) const; @@ -540,6 +584,16 @@ private: */ boost::optional<BucketState> _setBucketState(const OID& id, BucketState target); + /** + * Calculates the marginal memory usage for an archived bucket. The + * 'onlyEntryForMatchingMetaHash' parameter indicates that the bucket will be (if inserting) + * or was (if removing) the only bucket associated with its meta hash value. If true, then + * the returned value will attempt to account for the overhead of the map data structure for + * the meta hash value.
+ */ + static long long _marginalMemoryUsageForArchivedBucket(const ArchivedBucket& bucket, + bool onlyEntryForMatchingMetaHash); + static constexpr std::size_t kNumberOfStripes = 32; std::array<Stripe, kNumberOfStripes> _stripes; diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp index 024c7685b26..daa391d3b58 100644 --- a/src/mongo/db/timeseries/bucket_catalog_test.cpp +++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp @@ -43,6 +43,15 @@ namespace mongo { namespace { +constexpr StringData kNumSchemaChanges = "numBucketsClosedDueToSchemaChange"_sd; +constexpr StringData kNumBucketsReopened = "numBucketsReopened"_sd; +constexpr StringData kNumArchivedDueToTimeForward = "numBucketsArchivedDueToTimeForward"_sd; +constexpr StringData kNumArchivedDueToTimeBackward = "numBucketsArchivedDueToTimeBackward"_sd; +constexpr StringData kNumArchivedDueToMemoryThreshold = "numBucketsArchivedDueToMemoryThreshold"_sd; +constexpr StringData kNumClosedDueToTimeForward = "numBucketsClosedDueToTimeForward"_sd; +constexpr StringData kNumClosedDueToTimeBackward = "numBucketsClosedDueToTimeBackward"_sd; +constexpr StringData kNumClosedDueToMemoryThreshold = "numBucketsClosedDueToMemoryThreshold"_sd; + class BucketCatalogTest : public CatalogTestFixture { protected: class Task { @@ -74,9 +83,7 @@ protected: void _insertOneAndCommit(const NamespaceString& ns, uint16_t numPreviouslyCommittedMeasurements); - long long _getNumWaits(const NamespaceString& ns); - long long _getNumSchemaChanges(const NamespaceString& ns); - long long _getNumBucketsReopened(const NamespaceString& ns); + long long _getExecutionStat(const NamespaceString& ns, StringData stat); // Check that each group of objects has compatible schema with itself, but that inserting the // first object in new group closes the existing bucket and opens a new one @@ -180,22 +187,10 @@ void BucketCatalogTest::_insertOneAndCommit(const NamespaceString& ns, _commit(batch, 
numPreviouslyCommittedMeasurements); } -long long BucketCatalogTest::_getNumWaits(const NamespaceString& ns) { - BSONObjBuilder builder; - _bucketCatalog->appendExecutionStats(ns, &builder); - return builder.obj().getIntField("numWaits"); -} - -long long BucketCatalogTest::_getNumSchemaChanges(const NamespaceString& ns) { +long long BucketCatalogTest::_getExecutionStat(const NamespaceString& ns, StringData stat) { BSONObjBuilder builder; _bucketCatalog->appendExecutionStats(ns, &builder); - return builder.obj().getIntField("numBucketsClosedDueToSchemaChange"); -} - -long long BucketCatalogTest::_getNumBucketsReopened(const NamespaceString& ns) { - BSONObjBuilder builder; - _bucketCatalog->appendExecutionStats(ns, &builder); - return builder.obj().getIntField("numBucketsReopened"); + return builder.obj().getIntField(stat); } void BucketCatalogTest::_testMeasurementSchema( @@ -212,7 +207,7 @@ void BucketCatalogTest::_testMeasurementSchema( timestampedDoc.append(_timeField, Date_t::now()); timestampedDoc.appendElements(doc); - auto pre = _getNumSchemaChanges(_ns1); + auto pre = _getExecutionStat(_ns1, kNumSchemaChanges); auto result = _bucketCatalog ->insert(_opCtx, _ns1, @@ -221,7 +216,7 @@ void BucketCatalogTest::_testMeasurementSchema( timestampedDoc.obj(), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) .getValue(); - auto post = _getNumSchemaChanges(_ns1); + auto post = _getExecutionStat(_ns1, kNumSchemaChanges); if (firstMember) { if (firstGroup) { @@ -1095,7 +1090,7 @@ TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertCompatibleMeasurement AutoGetCollection autoColl(_opCtx, _ns1.makeTimeseriesBucketsNamespace(), MODE_IX); Status status = _bucketCatalog->reopenBucket(_opCtx, autoColl.getCollection(), bucketDoc); ASSERT_OK(status); - ASSERT_EQ(1, _getNumBucketsReopened(_ns1)); + ASSERT_EQ(1, _getExecutionStat(_ns1, kNumBucketsReopened)); // Insert a measurement that is compatible with the reopened bucket. 
auto result = @@ -1109,7 +1104,7 @@ TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertCompatibleMeasurement // No buckets are closed. ASSERT(result.getValue().closedBuckets.empty()); - ASSERT_EQ(0, _getNumSchemaChanges(_ns1)); + ASSERT_EQ(0, _getExecutionStat(_ns1, kNumSchemaChanges)); auto batch = result.getValue().batch; ASSERT(batch->claimCommitRights()); @@ -1145,7 +1140,7 @@ TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertIncompatibleMeasureme AutoGetCollection autoColl(_opCtx, _ns1.makeTimeseriesBucketsNamespace(), MODE_IX); Status status = _bucketCatalog->reopenBucket(_opCtx, autoColl.getCollection(), bucketDoc); ASSERT_OK(status); - ASSERT_EQ(1, _getNumBucketsReopened(_ns1)); + ASSERT_EQ(1, _getExecutionStat(_ns1, kNumBucketsReopened)); // Insert a measurement that is incompatible with the reopened bucket. auto result = @@ -1159,7 +1154,7 @@ TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertIncompatibleMeasureme // The reopened bucket gets closed as the schema is incompatible. ASSERT_EQ(1, result.getValue().closedBuckets.size()); - ASSERT_EQ(1, _getNumSchemaChanges(_ns1)); + ASSERT_EQ(1, _getExecutionStat(_ns1, kNumSchemaChanges)); auto batch = result.getValue().batch; ASSERT(batch->claimCommitRights()); @@ -1194,7 +1189,7 @@ TEST_F(BucketCatalogTest, ReopenCompressedBucketAndInsertCompatibleMeasurement) Status status = _bucketCatalog->reopenBucket(_opCtx, autoColl.getCollection(), compressedBucketDoc); ASSERT_OK(status); - ASSERT_EQ(1, _getNumBucketsReopened(_ns1)); + ASSERT_EQ(1, _getExecutionStat(_ns1, kNumBucketsReopened)); // Insert a measurement that is compatible with the reopened bucket. auto result = @@ -1208,7 +1203,7 @@ TEST_F(BucketCatalogTest, ReopenCompressedBucketAndInsertCompatibleMeasurement) // No buckets are closed. 
ASSERT(result.getValue().closedBuckets.empty()); - ASSERT_EQ(0, _getNumSchemaChanges(_ns1)); + ASSERT_EQ(0, _getExecutionStat(_ns1, kNumSchemaChanges)); auto batch = result.getValue().batch; ASSERT(batch->claimCommitRights()); @@ -1249,7 +1244,7 @@ TEST_F(BucketCatalogTest, ReopenCompressedBucketAndInsertIncompatibleMeasurement Status status = _bucketCatalog->reopenBucket(_opCtx, autoColl.getCollection(), compressedBucketDoc); ASSERT_OK(status); - ASSERT_EQ(1, _getNumBucketsReopened(_ns1)); + ASSERT_EQ(1, _getExecutionStat(_ns1, kNumBucketsReopened)); // Insert a measurement that is incompatible with the reopened bucket. auto result = @@ -1263,7 +1258,7 @@ TEST_F(BucketCatalogTest, ReopenCompressedBucketAndInsertIncompatibleMeasurement // The reopened bucket gets closed as the schema is incompatible. ASSERT_EQ(1, result.getValue().closedBuckets.size()); - ASSERT_EQ(1, _getNumSchemaChanges(_ns1)); + ASSERT_EQ(1, _getExecutionStat(_ns1, kNumSchemaChanges)); auto batch = result.getValue().batch; ASSERT(batch->claimCommitRights()); @@ -1275,5 +1270,157 @@ TEST_F(BucketCatalogTest, ReopenCompressedBucketAndInsertIncompatibleMeasurement _bucketCatalog->finish(batch, {}); } + +TEST_F(BucketCatalogTest, ArchiveIfTimeForward) { + RAIIServerParameterControllerForTest featureFlag{"featureFlagTimeseriesScalabilityImprovements", + true}; + auto baseTimestamp = Date_t::now(); + + // Insert an initial document to make sure we have an open bucket. + auto result1 = + _bucketCatalog->insert(_opCtx, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << baseTimestamp), + BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + ASSERT_OK(result1.getStatus()); + auto batch1 = result1.getValue().batch; + ASSERT(batch1->claimCommitRights()); + ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); + _bucketCatalog->finish(batch1, {}); + + // Make sure we start out with nothing closed or archived. 
+ ASSERT_EQ(0, _getExecutionStat(_ns1, kNumArchivedDueToTimeForward)); + ASSERT_EQ(0, _getExecutionStat(_ns1, kNumClosedDueToTimeForward)); + + // Now insert another that's too far forward to fit in the same bucket + auto result2 = + _bucketCatalog->insert(_opCtx, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << (baseTimestamp + Seconds{7200})), + BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + ASSERT_OK(result2.getStatus()); + auto batch2 = result2.getValue().batch; + ASSERT(batch2->claimCommitRights()); + ASSERT_OK(_bucketCatalog->prepareCommit(batch2)); + _bucketCatalog->finish(batch2, {}); + + // Make sure it was archived, not closed. + ASSERT_EQ(1, _getExecutionStat(_ns1, kNumArchivedDueToTimeForward)); + ASSERT_EQ(0, _getExecutionStat(_ns1, kNumClosedDueToTimeForward)); +} + +TEST_F(BucketCatalogTest, ArchiveIfTimeBackward) { + RAIIServerParameterControllerForTest featureFlag{"featureFlagTimeseriesScalabilityImprovements", + true}; + auto baseTimestamp = Date_t::now(); + + // Insert an initial document to make sure we have an open bucket. + auto result1 = + _bucketCatalog->insert(_opCtx, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << baseTimestamp), + BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + ASSERT_OK(result1.getStatus()); + auto batch1 = result1.getValue().batch; + ASSERT(batch1->claimCommitRights()); + ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); + _bucketCatalog->finish(batch1, {}); + + // Make sure we start out with nothing closed or archived. 
+ ASSERT_EQ(0, _getExecutionStat(_ns1, kNumArchivedDueToTimeBackward)); + ASSERT_EQ(0, _getExecutionStat(_ns1, kNumClosedDueToTimeBackward)); + + // Now insert another that's too far Backward to fit in the same bucket + auto result2 = + _bucketCatalog->insert(_opCtx, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << (baseTimestamp - Seconds{7200})), + BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + ASSERT_OK(result2.getStatus()); + auto batch2 = result2.getValue().batch; + ASSERT(batch2->claimCommitRights()); + ASSERT_OK(_bucketCatalog->prepareCommit(batch2)); + _bucketCatalog->finish(batch2, {}); + + // Make sure it was archived, not closed. + ASSERT_EQ(1, _getExecutionStat(_ns1, kNumArchivedDueToTimeBackward)); + ASSERT_EQ(0, _getExecutionStat(_ns1, kNumClosedDueToTimeBackward)); +} + +TEST_F(BucketCatalogTest, ArchivingUnderMemoryPressure) { + RAIIServerParameterControllerForTest featureFlag{"featureFlagTimeseriesScalabilityImprovements", + true}; + RAIIServerParameterControllerForTest memoryLimit{ + "timeseriesIdleBucketExpiryMemoryUsageThreshold", 10000}; + + // Insert a measurement with a unique meta value, guaranteeing we will open a new bucket but not + // close an old one except under memory pressure. + long long meta = 0; + auto insertDocument = [&meta, this]() -> BucketCatalog::ClosedBuckets { + auto result = + _bucketCatalog->insert(_opCtx, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now() << _metaField << meta++), + BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + ASSERT_OK(result.getStatus()); + auto batch = result.getValue().batch; + ASSERT(batch->claimCommitRights()); + ASSERT_OK(_bucketCatalog->prepareCommit(batch)); + _bucketCatalog->finish(batch, {}); + + return result.getValue().closedBuckets; + }; + + // Ensure we start out with no buckets archived or closed due to memory pressure. 
+ ASSERT_EQ(0, _getExecutionStat(_ns1, kNumArchivedDueToMemoryThreshold)); + ASSERT_EQ(0, _getExecutionStat(_ns1, kNumClosedDueToMemoryThreshold)); + + // With a memory limit of 10000 bytes, we should be guaranteed to hit the memory limit with no + // more than 1000 buckets since an open bucket takes up at least 10 bytes (in reality, + // significantly more, but this is definitely a safe assumption). + for (int i = 0; i < 1000; ++i) { + [[maybe_unused]] auto closedBuckets = insertDocument(); + + if (0 < _getExecutionStat(_ns1, kNumArchivedDueToMemoryThreshold)) { + break; + } + } + + // When we first hit the limit, we should try to archive some buckets prior to closing anything. + // However, depending on how the buckets are distributed over the stripes, it's possible that + // the current stripe will not have enough open buckets to archive to drop below the limit, and + // may immediately close a bucket it has just archived. We should be able to guarantee that we + // have archived a bucket prior to closing it though. + ASSERT_LT(0, _getExecutionStat(_ns1, kNumArchivedDueToMemoryThreshold)); + auto numClosedInFirstRound = _getExecutionStat(_ns1, kNumClosedDueToMemoryThreshold); + ASSERT_LTE(numClosedInFirstRound, _getExecutionStat(_ns1, kNumArchivedDueToMemoryThreshold)); + + // If we continue to open more new buckets with distinct meta values, eventually we'll run out + // of open buckets to archive and have to start closing archived buckets to relieve memory + // pressure. Again, an archived bucket should take up more than 10 bytes in the catalog, so we + // should be fine with a maximum of 1000 iterations. + for (int i = 0; i < 1000; ++i) { + auto closedBuckets = insertDocument(); + + if (numClosedInFirstRound < _getExecutionStat(_ns1, kNumClosedDueToMemoryThreshold)) { + ASSERT_FALSE(closedBuckets.empty()); + break; + } + } + + // We should have closed some (additional) buckets by now. 
+ ASSERT_LT(numClosedInFirstRound, _getExecutionStat(_ns1, kNumClosedDueToMemoryThreshold)); +} + } // namespace } // namespace mongo |