 jstests/noPassthrough/timeseries_dynamic_bucket_sizing.js       | 173 ++++++++++++++
 jstests/noPassthrough/timeseries_dynamic_bucket_sizing_large.js | 187 +++++++++++++++
 src/mongo/db/storage/kv/kv_engine.h                             |   7 +
 src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp        |   7 +-
 src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h          |   5 +
 src/mongo/db/timeseries/bucket_catalog.cpp                      |  90 ++++++--
 src/mongo/db/timeseries/bucket_catalog.h                        |  14 +-
 src/mongo/db/timeseries/timeseries.idl                          |   9 +
 8 files changed, 471 insertions(+), 21 deletions(-)
diff --git a/jstests/noPassthrough/timeseries_dynamic_bucket_sizing.js b/jstests/noPassthrough/timeseries_dynamic_bucket_sizing.js
new file mode 100644
index 00000000000..dd799c3bd94
--- /dev/null
+++ b/jstests/noPassthrough/timeseries_dynamic_bucket_sizing.js
@@ -0,0 +1,173 @@
+/**
+ * Evaluate the behaviour of bucket closure when we simulate high cache pressure due to a high
+ * cardinality workload. After we hit a certain cardinality (the number of active buckets generated
+ * in this test by distinct metaField values) we expect buckets to be closed with a smaller bucket
+ * size limit to alleviate pressure on the cache.
+ *
+ * @tags: [
+ *   # Exclude in-memory engine, rollbacks due to pinned cache content rely on eviction.
+ *   requires_persistence,
+ *   requires_replication,
+ *   requires_wiredtiger,
+ * ]
+ */
+(function() {
+"use strict";
+
+load("jstests/core/timeseries/libs/timeseries.js");
+
+const minWiredTigerCacheSizeGB = 0.256;
+const cacheSize = minWiredTigerCacheSizeGB * 1000 * 1000 * 1000;  // 256 MB
+const defaultBucketMaxSize = 128000;                              // 125 KB
+const minBucketCount = 10;
+
+// A cardinality higher than this calculated value will trigger a smaller bucket size limit due to
+// cache pressure.
+const cardinalityForCachePressure = Math.ceil(cacheSize / (2 * defaultBucketMaxSize));  // 1000
+
+const replSet = new ReplSetTest({
+    nodes: 1,
+    nodeOptions: {wiredTigerCacheSizeGB: minWiredTigerCacheSizeGB},
+});
+replSet.startSet({setParameter: {timeseriesBucketMaxSize: defaultBucketMaxSize}});
+replSet.initiate();
+
+const db = replSet.getPrimary().getDB("test");
+const coll = db.getCollection('t');
+coll.drop();
+
+if (!TimeseriesTest.timeseriesScalabilityImprovementsEnabled(db)) {
+    replSet.stopSet();
+    jsTestLog(
+        'Skipping test because the TimeseriesScalabilityImprovements feature flag is disabled.');
+    return;
+}
+
+const timeFieldName = 'time';
+const metaFieldName = 'meta';
+
+const resetCollection = (() => {
+    coll.drop();
+    assert.commandWorked(db.createCollection(
+        coll.getName(), {timeseries: {timeField: timeFieldName, metaField: metaFieldName}}));
+});
+
+// Inserts documents into the collection with increasing meta fields to generate N buckets. We make
+// sure to exceed the bucket min count per bucket to bypass large measurement checks.
+const initializeBucketsPastMinCount = function(numOfBuckets = 1) {
+    jsTestLog("Inserting and generating buckets.");
+    let batch = [];
+    for (let i = 0; i < numOfBuckets; i++) {
+        for (let j = 0; j < minBucketCount; ++j) {
+            const doc = {
+                _id: '' + i + j,
+                [timeFieldName]: ISODate(),
+                [metaFieldName]: i,
+                value: "a".repeat(1000)
+            };
+            batch.push(doc);
+
+            if (batch.length >= 100) {
+                assert.commandWorked(coll.insertMany(batch));
+                batch = [];
+            }
+        }
+    }
+    if (batch.length > 0) {
+        assert.commandWorked(coll.insertMany(batch));
+    }
+};
+
+resetCollection();
+
+const belowCardinalityThreshold = cardinalityForCachePressure;
+initializeBucketsPastMinCount(belowCardinalityThreshold);
+
+let timeseriesStats = assert.commandWorked(coll.stats()).timeseries;
+let bucketsClosedDueToSize = timeseriesStats.numBucketsClosedDueToSize;
+let bucketsClosedDueToCachePressure = timeseriesStats.numBucketsClosedDueToCachePressure;
+let compressedBuckets = timeseriesStats.numCompressedBuckets;
+
+// We only end up doing two passes before we start to close buckets due to size limits.
+while (bucketsClosedDueToSize == 0) {
+    let batch = [];
+    for (let i = 0; i < belowCardinalityThreshold; i++) {
+        const doc1 = {
+            _id: '00' + i,
+            [timeFieldName]: ISODate(),
+            [metaFieldName]: i,
+            value: "a".repeat(30000)
+        };
+        const doc2 = {
+            _id: '00' + i,
+            [timeFieldName]: ISODate(),
+            [metaFieldName]: i,
+            value: "a".repeat(20000)
+        };
+        batch.push(doc1);
+        batch.push(doc2);
+
+        if (batch.length >= 100) {
+            assert.commandWorked(coll.insertMany(batch));
+            batch = [];
+        }
+    }
+
+    if (batch.length != 0) {
+        assert.commandWorked(coll.insertMany(batch));
+    }
+
+    timeseriesStats = assert.commandWorked(coll.stats()).timeseries;
+    bucketsClosedDueToSize = timeseriesStats.numBucketsClosedDueToSize;
+    bucketsClosedDueToCachePressure = timeseriesStats.numBucketsClosedDueToCachePressure;
+    compressedBuckets = timeseriesStats.numCompressedBuckets;
+}
+
+// On the second pass of inserts, we will close buckets due to the default size constraints. No
+// buckets should be closed due to cache pressure.
+assert.eq(bucketsClosedDueToSize, cardinalityForCachePressure);
+assert.eq(bucketsClosedDueToCachePressure, 0);
+assert.eq(compressedBuckets, cardinalityForCachePressure);
+
+// If we pass the cardinality point to simulate cache pressure, we will begin to see buckets closed
+// due to 'CachePressure' and not 'DueToSize'.
+const aboveCardinalityThreshold = cardinalityForCachePressure * 3 / 2;
+initializeBucketsPastMinCount(aboveCardinalityThreshold);
+
+let batch = [];
+for (let i = 0; i < aboveCardinalityThreshold; i++) {
+    const doc =
+        {_id: '00' + i, [timeFieldName]: ISODate(), [metaFieldName]: i, value: "a".repeat(20000)};
+    batch.push(doc);
+
+    if (batch.length >= 100) {
+        assert.commandWorked(coll.insertMany(batch));
+        batch = [];
+    }
+}
+if (batch.length != 0) {
+    print(batch.length);
+    assert.commandWorked(coll.insertMany(batch));
+}
+
+timeseriesStats = assert.commandWorked(coll.stats()).timeseries;
+bucketsClosedDueToSize = timeseriesStats.numBucketsClosedDueToSize;
+bucketsClosedDueToCachePressure = timeseriesStats.numBucketsClosedDueToCachePressure;
+compressedBuckets = timeseriesStats.numCompressedBuckets;
+
+// We expect 'bucketsClosedDueToSize' to remain the same but 'bucketsClosedDueToCachePressure' to
+// increase.
+assert.eq(bucketsClosedDueToSize, cardinalityForCachePressure);
+
+// Previously, the bucket max size was 128000 bytes, but under cache pressure using
+// 'aboveCardinalityThreshold', the max size drops to roughly ~85334. This means the old
+// measurements (up to 'cardinalityForCachePressure') will need to be closed since they are sized at
+// ~120000 bytes. The newly inserted measurements are only sized at ~(20000 * 3) bytes so stay open.
+assert.eq(bucketsClosedDueToCachePressure, cardinalityForCachePressure);
+
+// We expect the number of compressed buckets to double (independent of whether the buckets were
+// closed due to size or cache pressure).
+assert.eq(compressedBuckets, 2 * cardinalityForCachePressure);
+
+replSet.stopSet();
+})();
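For reference, the thresholds this test asserts on follow directly from the cache-derived formula
introduced in bucket_catalog.cpp later in this diff (cache size divided by twice the number of
active buckets); approximate worked numbers:

    cardinalityForCachePressure = ceil(256,000,000 / (2 * 128,000)) = 1000

    At 1000 active buckets the derived limit still equals the 128,000-byte default, so the
    first passes close buckets through the ordinary size limit. At 1500 active buckets
    ('aboveCardinalityThreshold') the limit shrinks to 256,000,000 / 3,000 = 85,333 bytes,
    so buckets already filled to roughly 10,000 + 2 * 50,000 = ~110,000-120,000 bytes roll
    over as cache-pressure closures instead.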
diff --git a/jstests/noPassthrough/timeseries_dynamic_bucket_sizing_large.js b/jstests/noPassthrough/timeseries_dynamic_bucket_sizing_large.js
new file mode 100644
index 00000000000..3c644219837
--- /dev/null
+++ b/jstests/noPassthrough/timeseries_dynamic_bucket_sizing_large.js
@@ -0,0 +1,187 @@
+/**
+ * Evaluate the behaviour of bucket closure when we simulate high cache pressure due to a high
+ * cardinality workload. After we hit a certain cardinality (the number of active buckets generated
+ * in this test by distinct metaField values) we expect buckets to be closed with a smaller bucket
+ * size limit to alleviate pressure on the cache with respect to large measurement insertions.
+ *
+ * @tags: [
+ *   # Exclude in-memory engine, rollbacks due to pinned cache content rely on eviction.
+ *   requires_persistence,
+ *   requires_replication,
+ *   requires_wiredtiger,
+ * ]
+ */
+(function() {
+"use strict";
+
+load("jstests/core/timeseries/libs/timeseries.js");
+
+const defaultBucketMaxSize = 128000;                                           // 125 KB
+const minWiredTigerCacheSizeGB = 0.256;                                        // 256 MB
+const minWiredTigerCacheSize = minWiredTigerCacheSizeGB * 1024 * 1024 * 1024;  // 256 MB
+const measurementValueLength = 1 * 1024 * 1024;                                // 1 MB
+const defaultBucketMinCount = 10;
+
+const replSet = new ReplSetTest({
+    nodes: 1,
+    nodeOptions: {wiredTigerCacheSizeGB: minWiredTigerCacheSizeGB},
+});
+replSet.startSet({setParameter: {timeseriesBucketMaxSize: defaultBucketMaxSize}});
+replSet.initiate();
+
+const db = replSet.getPrimary().getDB("test");
+let coll = db.getCollection('t');
+coll.drop();
+
+if (!TimeseriesTest.timeseriesScalabilityImprovementsEnabled(db)) {
+    replSet.stopSet();
+    jsTestLog(
+        'Skipping test because the TimeseriesScalabilityImprovements feature flag is disabled.');
+    return;
+}
+
+const timeFieldName = 'time';
+const metaFieldName = 'meta';
+
+const resetCollection = (() => {
+    coll.drop();
+    assert.commandWorked(db.createCollection(
+        coll.getName(), {timeseries: {timeField: timeFieldName, metaField: metaFieldName}}));
+});
+
+// Inserts small documents into the collection with increasing meta fields to generate N buckets.
+const initializeBuckets = function(numOfBuckets = 1) {
+    jsTestLog("Inserting and generating buckets.");
+    let batch = [];
+    for (let i = 0; i < numOfBuckets; i++) {
+        const doc = {_id: i, [timeFieldName]: ISODate(), [metaFieldName]: i, value: "a"};
+        batch.push(doc);
+    }
+    assert.commandWorked(coll.insertMany(batch));
+};
+
+(function largeMeasurementsNoCachePressure() {
+    jsTestLog("Entering largeMeasurementsNoCachePressure...");
+    coll = db.getCollection('largeMeasurementsNoCachePressure');
+    resetCollection();
+
+    let expectedBucketCount = 0;
+    let numBucketsClosedDueToSize = 0;
+    let numBucketsClosedDueToCachePressure = 0;
+    let numCompressedBuckets = 0;
+
+    const meta1 = 1;
+    const meta2 = 2;
+
+    // Insert 9 large measurements into the same bucket (mapping to meta1), resulting in a bucket
+    // of size ~11.5 MB (right under the largest bucket size we allow, which is 12 MB).
+    for (let i = 0; i < defaultBucketMinCount - 1; i++) {
+        const doc = {
+            _id: i,
+            [timeFieldName]: ISODate(),
+            [metaFieldName]: meta1,
+            value: "a".repeat(measurementValueLength)
+        };
+        assert.commandWorked(coll.insert(doc));
+    }
+    expectedBucketCount++;
+
+    let timeseriesStats = assert.commandWorked(coll.stats()).timeseries;
+    assert.eq(timeseriesStats.bucketCount, expectedBucketCount);
+    assert.eq(timeseriesStats.numBucketsClosedDueToSize, numBucketsClosedDueToSize);
+    assert.eq(timeseriesStats.numBucketsClosedDueToCachePressure,
+              numBucketsClosedDueToCachePressure);
+    assert.eq(timeseriesStats.numCompressedBuckets, numCompressedBuckets);
+
+    // If we exceed the min bucket count of 10, we should close the bucket since it exceeds our
+    // default bucket size of 125 KB. (This requires two additional insertions.)
+    const doc = {_id: 4, [timeFieldName]: ISODate(), [metaFieldName]: meta1, value: "a"};
+    assert.commandWorked(coll.insert(doc));
+    assert.commandWorked(coll.insert(doc));
+
+    expectedBucketCount++;
+    numBucketsClosedDueToSize++;
+    numCompressedBuckets++;
+
+    timeseriesStats = assert.commandWorked(coll.stats()).timeseries;
+    assert.eq(timeseriesStats.bucketCount, expectedBucketCount);
+    assert.eq(timeseriesStats.numBucketsClosedDueToSize, numBucketsClosedDueToSize);
+    assert.eq(timeseriesStats.numBucketsClosedDueToCachePressure,
+              numBucketsClosedDueToCachePressure);
+    assert.eq(timeseriesStats.numCompressedBuckets, numCompressedBuckets);
+
+    // Since the maximum size for buckets is capped at 12 MB, we should hit the size limit before
+    // closing the bucket due to the minimum count, so we expect to close the oversized bucket and
+    // create another bucket.
+    for (let i = 0; i < defaultBucketMinCount; i++) {
+        const doc = {
+            _id: i,
+            [timeFieldName]: ISODate(),
+            [metaFieldName]: meta2,
+            value: "b".repeat(measurementValueLength)
+        };
+        assert.commandWorked(coll.insert(doc));
+    }
+
+    // We create one bucket for 'meta2', fill it up, and create another one for future insertions.
+    expectedBucketCount += 2;
+    numBucketsClosedDueToSize++;
+    numCompressedBuckets++;
+
+    timeseriesStats = assert.commandWorked(coll.stats()).timeseries;
+    assert.eq(timeseriesStats.bucketCount, expectedBucketCount);
+    assert.eq(timeseriesStats.numBucketsClosedDueToSize, numBucketsClosedDueToSize);
+    assert.eq(timeseriesStats.numBucketsClosedDueToCachePressure,
+              numBucketsClosedDueToCachePressure);
+    assert.eq(timeseriesStats.numCompressedBuckets, numCompressedBuckets);
+})();
+
+(function largeMeasurementsWithCachePressure() {
+    jsTestLog("Entering largeMeasurementsWithCachePressure...");
+    coll = db.getCollection('largeMeasurementsWithCachePressure');
+    resetCollection();
+
+    // We want the 'cacheDerivedMaxSize' to equal 5.5 MB.
+    const cacheDerivedMaxSize = 5.5 * 1024 * 1024;
+    const bucketCount =
+        Math.ceil(minWiredTigerCacheSize / (2 * cacheDerivedMaxSize));  // Evaluates to 24.
+    const meta = bucketCount;
+
+    // We expect the bucket mapping to 'meta' to be around ~5 MB in size, so no buckets should be
+    // closed yet. We generate a cardinality equal to 'bucketCount'.
+    initializeBuckets(bucketCount - 1);
+    for (let i = 0; i < 3; i++) {
+        const doc = {
+            _id: i,
+            [timeFieldName]: ISODate(),
+            [metaFieldName]: meta,
+            value: "a".repeat(measurementValueLength)
+        };
+        assert.commandWorked(coll.insert(doc));
+    }
+
+    let timeseriesStats = assert.commandWorked(coll.stats()).timeseries;
+    assert.eq(timeseriesStats.bucketCount, bucketCount);
+    assert.eq(timeseriesStats.numBucketsClosedDueToSize, 0);
+    assert.eq(timeseriesStats.numBucketsClosedDueToCachePressure, 0);
+    assert.eq(timeseriesStats.numCompressedBuckets, 0);
+
+    // We expect this insert to cause the bucket to close due to cache pressure, since it will
+    // exceed the rough 'cacheDerivedMaxSize' of 5.5 MB, and to create a new bucket for this
+    // measurement.
+    const doc = {
+        _id: bucketCount,
+        [timeFieldName]: ISODate(),
+        [metaFieldName]: meta,
+        value: "a".repeat(measurementValueLength)
+    };
+    assert.commandWorked(coll.insert(doc));
+
+    timeseriesStats = assert.commandWorked(coll.stats()).timeseries;
+    assert.eq(timeseriesStats.bucketCount, bucketCount + 1);
+    assert.eq(timeseriesStats.numBucketsClosedDueToSize, 0);
+    assert.eq(timeseriesStats.numBucketsClosedDueToCachePressure, 1);
+    assert.eq(timeseriesStats.numCompressedBuckets, 1);
+})();
+
+replSet.stopSet();
+})();
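The fixture's magic numbers come from the same division; a minimal standalone check of that
arithmetic (variable names here are illustrative, not server code):

    #include <cmath>
    #include <cstdint>
    #include <iostream>

    int main() {
        const double cacheBytes = 0.256 * 1024 * 1024 * 1024;  // minWiredTigerCacheSize
        const double targetMaxSize = 5.5 * 1024 * 1024;        // desired cacheDerivedMaxSize

        // ceil(274877906.944 / 11534336) == 24: with 24 active buckets the derived
        // per-bucket limit is ~5.5 MB, so a ~5 MB bucket closes on the next ~1 MB insert.
        const std::uint32_t bucketCount =
            static_cast<std::uint32_t>(std::ceil(cacheBytes / (2 * targetMaxSize)));
        std::cout << "bucketCount = " << bucketCount << std::endl;  // 24
    }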
diff --git a/src/mongo/db/storage/kv/kv_engine.h b/src/mongo/db/storage/kv/kv_engine.h
index d3ce95f8c58..16a6eb3f325 100644
--- a/src/mongo/db/storage/kv/kv_engine.h
+++ b/src/mongo/db/storage/kv/kv_engine.h
@@ -470,6 +470,13 @@ public:
     }
 
     /**
+     * Returns the cache size in MB.
+     */
+    virtual size_t getCacheSizeMB() const {
+        return 0;
+    }
+
+    /**
      * The destructor will never be called from mongod, but may be called from tests.
      * Engines may assume that this will only be called in the case of clean shutdown, even if
      * cleanShutdown() hasn't been called.
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index 73041899938..41ee195fd7b 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -334,7 +334,8 @@ WiredTigerKVEngine::WiredTigerKVEngine(OperationContext* opCtx,
       _sizeStorerSyncTracker(cs, 100000, Seconds(60)),
       _ephemeral(ephemeral),
       _inRepairMode(repair),
-      _keepDataHistory(serverGlobalParams.enableMajorityReadConcern) {
+      _keepDataHistory(serverGlobalParams.enableMajorityReadConcern),
+      _cacheSizeMB(cacheSizeMB) {
     _pinnedOplogTimestamp.store(Timestamp::max().asULL());
     boost::filesystem::path journalPath = path;
     journalPath /= "journal";
@@ -2696,4 +2697,8 @@ KeyFormat WiredTigerKVEngine::getKeyFormat(OperationContext* opCtx, StringData i
     return wtTableConfig.find("key_format=u") != string::npos ? KeyFormat::String : KeyFormat::Long;
 }
 
+size_t WiredTigerKVEngine::getCacheSizeMB() const {
+    return _cacheSizeMB;
+}
+
 } // namespace mongo
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
index c1128d6292b..73b720774d5 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
@@ -418,6 +418,8 @@ public:
 
     KeyFormat getKeyFormat(OperationContext* opCtx, StringData ident) const override;
 
+    size_t getCacheSizeMB() const override;
+
 private:
     class WiredTigerSessionSweeper;
 
@@ -556,5 +558,8 @@ private:
     // Pins the oplog so that OplogStones will not truncate oplog history equal or newer to this
    // timestamp.
     AtomicWord<std::uint64_t> _pinnedOplogTimestamp;
+
+    // The amount of memory allotted for the WiredTiger cache.
+    size_t _cacheSizeMB;
 };
 
 } // namespace mongo
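The three storage-layer changes above only plumb the configured cache size (already passed to
WiredTigerKVEngine's constructor) out through a virtual accessor; engines without a cache keep the
default of 0, which disables the new limit. The consumer-side lookup, mirroring the call chain
bucket_catalog.cpp uses below, is roughly this sketch (not a new API):

    // Inside the server: walk from the operation to the KV engine's cache size.
    // A zero result (e.g. a non-WiredTiger engine) disables the cache-derived limit.
    uint64_t storageCacheSizeBytes(OperationContext* opCtx) {
        StorageEngine* storageEngine = opCtx->getServiceContext()->getStorageEngine();
        invariant(storageEngine);
        return static_cast<uint64_t>(storageEngine->getEngine()->getCacheSizeMB()) * 1024 * 1024;
    }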
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index dcda06f4a4e..0c18177c5bd 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -41,6 +41,7 @@
 #include "mongo/db/commands/server_status.h"
 #include "mongo/db/concurrency/exception_util.h"
 #include "mongo/db/operation_context.h"
+#include "mongo/db/storage/kv/kv_engine.h"
 #include "mongo/db/storage/storage_parameters_gen.h"
 #include "mongo/db/timeseries/bucket_catalog_helpers.h"
 #include "mongo/db/timeseries/bucket_compression.h"
@@ -135,6 +136,27 @@ Status getTimeseriesBucketClearedError(const OID& bucketId,
                           str::stream() << "Time-series bucket " << bucketId << nsIdentification
                                         << " was cleared"};
 }
+
+/**
+ * Calculate the bucket max size constrained by the cache size and the cardinality of active
+ * buckets.
+ */
+int32_t getCacheDerivedBucketMaxSize(StorageEngine* storageEngine, uint32_t workloadCardinality) {
+    invariant(storageEngine);
+    uint64_t storageCacheSize =
+        static_cast<uint64_t>(storageEngine->getEngine()->getCacheSizeMB() * 1024 * 1024);
+
+    if (!feature_flags::gTimeseriesScalabilityImprovements.isEnabled(
+            serverGlobalParams.featureCompatibility) ||
+        storageCacheSize == 0 || workloadCardinality == 0) {
+        return INT_MAX;
+    }
+
+    uint64_t derivedMaxSize = storageCacheSize / (2 * workloadCardinality);
+    uint64_t intMax = static_cast<uint64_t>(std::numeric_limits<int32_t>::max());
+    return std::min(derivedMaxSize, intMax);
+}
+
 }  // namespace
 
 void BucketCatalog::ExecutionStatsController::incNumBucketInserts(long long increment) {
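To make the new helper concrete, here is a standalone mirror of its arithmetic (feature-flag check
elided, cache size taken in bytes; runnable outside the server):

    #include <algorithm>
    #include <climits>
    #include <cstdint>
    #include <iostream>
    #include <limits>

    // Standalone mirror of getCacheDerivedBucketMaxSize's arithmetic.
    int32_t cacheDerivedBucketMaxSize(std::uint64_t cacheSizeBytes, std::uint32_t cardinality) {
        if (cacheSizeBytes == 0 || cardinality == 0) {
            return INT_MAX;  // No constraint available; the static limits apply unchanged.
        }
        std::uint64_t derivedMaxSize = cacheSizeBytes / (2 * cardinality);
        std::uint64_t intMax = static_cast<std::uint64_t>(std::numeric_limits<int32_t>::max());
        return static_cast<int32_t>(std::min(derivedMaxSize, intMax));
    }

    int main() {
        const std::uint64_t cache = 256ULL * 1000 * 1000;  // the 0.256 GB cache from the tests
        std::cout << cacheDerivedBucketMaxSize(cache, 1000) << "\n";  // 128000 (== default max)
        std::cout << cacheDerivedBucketMaxSize(cache, 1500) << "\n";  // 85333  (shrunken limit)
        std::cout << cacheDerivedBucketMaxSize(cache, 0) << "\n";     // 2147483647 (no constraint)
    }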
@@ -169,6 +191,12 @@ void BucketCatalog::ExecutionStatsController::incNumBucketsClosedDueToSize(long
     _globalStats->numBucketsClosedDueToSize.fetchAndAddRelaxed(increment);
 }
 
+void BucketCatalog::ExecutionStatsController::incNumBucketsClosedDueToCachePressure(
+    long long increment) {
+    _collectionStats->numBucketsClosedDueToCachePressure.fetchAndAddRelaxed(increment);
+    _globalStats->numBucketsClosedDueToCachePressure.fetchAndAddRelaxed(increment);
+}
+
 void BucketCatalog::ExecutionStatsController::incNumBucketsClosedDueToTimeForward(
     long long increment) {
     _collectionStats->numBucketsClosedDueToTimeForward.fetchAndAddRelaxed(increment);
@@ -489,7 +517,7 @@ void BucketCatalog::Bucket::_calculateBucketFieldsAndSizeChange(
     const BSONObj& doc,
     boost::optional<StringData> metaField,
     NewFieldNames* newFieldNamesToBeInserted,
-    uint32_t* sizeToBeAdded) const {
+    int32_t* sizeToBeAdded) const {
     // BSON size for an object with an empty object field where field name is empty string.
     // We can use this as an offset to know the size when we have real field names.
     static constexpr int emptyObjSize = 12;
@@ -957,6 +985,8 @@ void BucketCatalog::_appendExecutionStatsToBuilder(const ExecutionStats* stats,
         builder->appendNumber("numBucketsReopened", stats->numBucketsReopened.load());
         builder->appendNumber("numBucketsKeptOpenDueToLargeMeasurements",
                               stats->numBucketsKeptOpenDueToLargeMeasurements.load());
+        builder->appendNumber("numBucketsClosedDueToCachePressure",
+                              stats->numBucketsClosedDueToCachePressure.load());
     }
 }
 
@@ -1300,6 +1330,7 @@ BucketCatalog::Bucket* BucketCatalog::_reopenBucket(Stripe* stripe,
     stats.incNumBucketsReopened();
 
     _memoryUsage.addAndFetch(unownedBucket->_memoryUsage);
+    _numberOfActiveBuckets.fetchAndAdd(1);
 
     return unownedBucket;
 }
@@ -1395,13 +1426,14 @@ std::shared_ptr<BucketCatalog::WriteBatch> BucketCatalog::_insertIntoBucket(
     Bucket* bucket,
     ClosedBuckets* closedBuckets) {
     NewFieldNames newFieldNamesToBeInserted;
-    uint32_t sizeToBeAdded = 0;
+    int32_t sizeToBeAdded = 0;
     bucket->_calculateBucketFieldsAndSizeChange(
         doc, info->options.getMetaField(), &newFieldNamesToBeInserted, &sizeToBeAdded);
 
     bool isNewlyOpenedBucket = bucket->_ns.isEmpty();
     if (!isNewlyOpenedBucket) {
-        auto action = _determineRolloverAction(doc, info, bucket, sizeToBeAdded, mode);
+
+        auto action = _determineRolloverAction(opCtx, doc, info, bucket, sizeToBeAdded, mode);
         if (action == RolloverAction::kSoftClose && mode == AllowBucketCreation::kNo) {
             // We don't actually want to roll this bucket over yet, bail out.
             return std::shared_ptr<WriteBatch>{};
@@ -1497,6 +1529,7 @@ void BucketCatalog::_removeBucket(Stripe* stripe,
         _bucketStateManager.eraseBucketState(bucket->id());
     }
 
+    _numberOfActiveBuckets.fetchAndSubtract(1);
     stripe->allBuckets.erase(allIt);
 }
 
@@ -1511,9 +1544,12 @@ void BucketCatalog::_archiveBucket(Stripe* stripe, WithLock stripeLock, Bucket*
         long long memory = _marginalMemoryUsageForArchivedBucket(archivedSet[bucket->getTime()],
                                                                  archivedSet.size() == 1);
         _memoryUsage.fetchAndAdd(memory);
-
         archived = true;
     }
+
+    // If we have an archived bucket, we still want to account for it in '_numberOfActiveBuckets'
+    // so we will increase it here since _removeBucket decrements the count.
+    _numberOfActiveBuckets.fetchAndAdd(1);
     _removeBucket(stripe, stripeLock, bucket, archived);
 }
 
@@ -1695,6 +1731,7 @@ void BucketCatalog::_expireIdleBuckets(Stripe* stripe,
                 archivedSet.erase(archivedSet.begin());
             }
             _memoryUsage.fetchAndSubtract(memory);
+            _numberOfActiveBuckets.fetchAndSubtract(1);
             stats.incNumBucketsClosedDueToMemoryThreshold();
 
             ++numExpired;
@@ -1714,8 +1751,10 @@ BucketCatalog::Bucket* BucketCatalog::_allocateBucket(Stripe* stripe,
     tassert(6130900, "Expected bucket to be inserted", inserted);
     Bucket* bucket = it->second.get();
     stripe->openBuckets[info.key] = bucket;
+
     bool initialized = _bucketStateManager.initializeBucketState(bucketId, boost::none);
     invariant(initialized);
+    _numberOfActiveBuckets.fetchAndAdd(1);
 
     if (info.openedDuetoMetadata) {
         info.stats.incNumBucketsOpenedDueToMetadata();
@@ -1731,10 +1770,11 @@ BucketCatalog::Bucket* BucketCatalog::_allocateBucket(Stripe* stripe,
     return bucket;
 }
 
-BucketCatalog::RolloverAction BucketCatalog::_determineRolloverAction(const BSONObj& doc,
+BucketCatalog::RolloverAction BucketCatalog::_determineRolloverAction(OperationContext* opCtx,
+                                                                      const BSONObj& doc,
                                                                       CreationInfo* info,
                                                                       Bucket* bucket,
-                                                                      uint32_t sizeToBeAdded,
+                                                                      int32_t sizeToBeAdded,
                                                                       AllowBucketCreation mode) {
     // If the mode is enabled to create new buckets, then we should update stats for soft closures
    // accordingly. If we specify the mode to not allow bucket creation, it means we are not sure if
@@ -1763,20 +1803,34 @@ BucketCatalog::RolloverAction BucketCatalog::_determineRolloverAction(const BSON
             info->stats.incNumBucketsClosedDueToSchemaChange();
             return RolloverAction::kHardClose;
         }
-        if (bucket->_size + sizeToBeAdded > static_cast<std::uint64_t>(gTimeseriesBucketMaxSize)) {
+
+        // In scenarios where we have a high cardinality workload and face increased cache pressure
+        // we will decrease the size of buckets before we close them.
+        int32_t cacheDerivedBucketMaxSize = getCacheDerivedBucketMaxSize(
+            opCtx->getServiceContext()->getStorageEngine(), _numberOfActiveBuckets.load());
+        int32_t effectiveMaxSize = std::min(gTimeseriesBucketMaxSize, cacheDerivedBucketMaxSize);
+
+        // Before we hit our bucket minimum count, we will allow for large measurements to be
+        // inserted into buckets. Instead of packing the bucket to the BSON size limit, 16MB, we'll
+        // limit the max bucket size to 12MB. This is to leave some space in the bucket if we need
+        // to add new internal fields to existing, full buckets.
+        static constexpr int32_t largeMeasurementsMaxBucketSize =
+            BSONObjMaxUserSize - (4 * 1024 * 1024);
+        // We restrict the ceiling of the bucket max size under cache pressure.
+        int32_t absoluteMaxSize =
+            std::min(largeMeasurementsMaxBucketSize, cacheDerivedBucketMaxSize);
+
+        if (bucket->_size + sizeToBeAdded > effectiveMaxSize) {
             bool keepBucketOpenForLargeMeasurements =
                 bucket->_numMeasurements < static_cast<std::uint64_t>(gTimeseriesBucketMinCount) &&
                 feature_flags::gTimeseriesScalabilityImprovements.isEnabled(
                     serverGlobalParams.featureCompatibility);
             if (keepBucketOpenForLargeMeasurements) {
-                // Instead of packing the bucket to the BSON size limit, 16MB, we'll limit the max
-                // bucket size to 12MB. This is to leave some space in the bucket if we need to add
-                // new internal fields to existing, full buckets.
-                static constexpr size_t largeMeasurementsMaxBucketSize =
-                    BSONObjMaxUserSize - (4 * 1024 * 1024);
-
-                if (bucket->_size + sizeToBeAdded > largeMeasurementsMaxBucketSize) {
-                    info->stats.incNumBucketsClosedDueToSize();
+                if (bucket->_size + sizeToBeAdded > absoluteMaxSize) {
+                    if (absoluteMaxSize != largeMeasurementsMaxBucketSize) {
+                        info->stats.incNumBucketsClosedDueToCachePressure();
+                    } else {
+                        info->stats.incNumBucketsClosedDueToSize();
+                    }
                     return RolloverAction::kHardClose;
                 }
@@ -1789,7 +1843,11 @@ BucketCatalog::RolloverAction BucketCatalog::_determineRolloverAction(const BSON
             }
             return RolloverAction::kNone;
         } else {
-            info->stats.incNumBucketsClosedDueToSize();
+            if (effectiveMaxSize == gTimeseriesBucketMaxSize) {
+                info->stats.incNumBucketsClosedDueToSize();
+            } else {
+                info->stats.incNumBucketsClosedDueToCachePressure();
+            }
             return RolloverAction::kHardClose;
         }
     }
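The stats attribution in _determineRolloverAction reduces to one comparison: a closure counts
against cache pressure exactly when the cache-derived limit, rather than the static one, was the
binding constraint. A condensed, runnable illustration using the first test's numbers (the
surrounding rollover checks are stripped; names are illustrative):

    #include <algorithm>
    #include <cstdint>
    #include <iostream>

    int main() {
        const std::int32_t bucketMaxSize = 128000;  // gTimeseriesBucketMaxSize
        const std::int32_t cacheDerived = 85333;    // derived limit at 1500 active buckets
        const std::int32_t effectiveMaxSize = std::min(bucketMaxSize, cacheDerived);

        const std::int32_t bucketSize = 120000;     // a bucket filled under the old limit
        const std::int32_t sizeToBeAdded = 20000;
        if (bucketSize + sizeToBeAdded > effectiveMaxSize) {
            // If the static limit is still the effective one, this is an ordinary
            // size-based closure; otherwise the cache-derived limit caused it.
            if (effectiveMaxSize == bucketMaxSize) {
                std::cout << "numBucketsClosedDueToSize++\n";
            } else {
                std::cout << "numBucketsClosedDueToCachePressure++\n";  // taken here
            }
        }
    }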
diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h
index 23dbe7a9243..a03fe11e525 100644
--- a/src/mongo/db/timeseries/bucket_catalog.h
+++ b/src/mongo/db/timeseries/bucket_catalog.h
@@ -71,6 +71,7 @@ protected:
         AtomicWord<long long> numBucketsClosedDueToCount;
         AtomicWord<long long> numBucketsClosedDueToSchemaChange;
         AtomicWord<long long> numBucketsClosedDueToSize;
+        AtomicWord<long long> numBucketsClosedDueToCachePressure;
         AtomicWord<long long> numBucketsClosedDueToTimeForward;
         AtomicWord<long long> numBucketsClosedDueToTimeBackward;
         AtomicWord<long long> numBucketsClosedDueToMemoryThreshold;
@@ -97,6 +98,7 @@ protected:
         void incNumBucketsClosedDueToCount(long long increment = 1);
         void incNumBucketsClosedDueToSchemaChange(long long increment = 1);
         void incNumBucketsClosedDueToSize(long long increment = 1);
+        void incNumBucketsClosedDueToCachePressure(long long increment = 1);
         void incNumBucketsClosedDueToTimeForward(long long increment = 1);
         void incNumBucketsClosedDueToTimeBackward(long long increment = 1);
         void incNumBucketsClosedDueToMemoryThreshold(long long increment = 1);
@@ -729,7 +731,7 @@ protected:
         void _calculateBucketFieldsAndSizeChange(const BSONObj& doc,
                                                  boost::optional<StringData> metaField,
                                                  NewFieldNames* newFieldNamesToBeInserted,
-                                                 uint32_t* sizeToBeAdded) const;
+                                                 int32_t* sizeToBeAdded) const;
 
         /**
          * Returns whether BucketCatalog::commit has been called at least once on this bucket.
@@ -784,7 +786,7 @@ protected:
         // The total size in bytes of the bucket's BSON serialization, including measurements to be
         // inserted.
-        uint64_t _size = 0;
+        int32_t _size = 0;
 
         // The total number of measurements in the bucket, including uncommitted measurements and
         // measurements to be inserted.
@@ -1010,10 +1012,11 @@ protected:
      * Determines if 'bucket' needs to be rolled over to accommodate 'doc'. If so, determines
      * whether to archive or close 'bucket'.
      */
-    RolloverAction _determineRolloverAction(const BSONObj& doc,
+    RolloverAction _determineRolloverAction(OperationContext* opCtx,
+                                            const BSONObj& doc,
                                             CreationInfo* info,
                                             Bucket* bucket,
-                                            uint32_t sizeToBeAdded,
+                                            int32_t sizeToBeAdded,
                                             AllowBucketCreation mode);
 
     /**
@@ -1061,6 +1064,9 @@ protected:
     // Approximate memory usage of the bucket catalog.
     AtomicWord<uint64_t> _memoryUsage;
 
+    // Approximate cardinality of opened and archived buckets.
+    AtomicWord<uint32_t> _numberOfActiveBuckets;
+
     class ServerStatus;
 };
 } // namespace mongo
diff --git a/src/mongo/db/timeseries/timeseries.idl b/src/mongo/db/timeseries/timeseries.idl
index 92324068b30..5b32ae19ec2 100644
--- a/src/mongo/db/timeseries/timeseries.idl
+++ b/src/mongo/db/timeseries/timeseries.idl
@@ -74,6 +74,15 @@ server_parameters:
       cpp_varname: "gTimeseriesBucketMinCount"
       default: 10
       validator: { gte: 1 }
+    "timeseriesBucketMinSize":
+        description: "If there is high memory pressure on the system, we will lower the maximum
+                      size (in bytes) of measurements we pack into buckets. This value represents
+                      the absolute lowest size we will limit buckets to."
+        set_at: [ startup ]
+        cpp_vartype: "std::int32_t"
+        cpp_varname: "gTimeseriesBucketMinSize"
+        default: 5120 # 5KB
+        validator: { gte: 1 }
 
 enums:
     BucketGranularity:
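Note that none of the hunks above read the new 'timeseriesBucketMinSize' parameter yet; going by
its description, it bounds how far cache pressure may shrink the bucket size limit. A sketch of
that presumed clamp — an assumption about intended use, not code from this commit:

    // ASSUMPTION: not in this diff. The parameter's description implies the cache-derived
    // limit is floored at gTimeseriesBucketMinSize (default 5120 bytes), e.g.:
    int32_t effectiveMaxSize = std::max(
        gTimeseriesBucketMinSize, std::min(gTimeseriesBucketMaxSize, cacheDerivedBucketMaxSize));

Since it is a startup-only parameter, it would be set with
--setParameter timeseriesBucketMinSize=<bytes> when starting mongod.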