-rw-r--r--  jstests/noPassthrough/timeseries_dynamic_bucket_sizing.js | 173
-rw-r--r--  jstests/noPassthrough/timeseries_dynamic_bucket_sizing_large.js | 187
-rw-r--r--  src/mongo/db/storage/kv/kv_engine.h | 7
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp | 7
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h | 5
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog.cpp | 90
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog.h | 14
-rw-r--r--  src/mongo/db/timeseries/timeseries.idl | 9
8 files changed, 471 insertions(+), 21 deletions(-)
diff --git a/jstests/noPassthrough/timeseries_dynamic_bucket_sizing.js b/jstests/noPassthrough/timeseries_dynamic_bucket_sizing.js
new file mode 100644
index 00000000000..dd799c3bd94
--- /dev/null
+++ b/jstests/noPassthrough/timeseries_dynamic_bucket_sizing.js
@@ -0,0 +1,173 @@
+/**
+ * Evaluate the behaviour of bucket closure when we simulate high cache pressure due to a high
+ * cardinality workload. After we hit a certain cardinality (the number of active buckets generated
+ * in this test by distinct metaField values) we expect buckets to be closed with a smaller bucket
+ * size limit to alleviate pressure on the cache.
+ *
+ * @tags: [
+ * # Exclude in-memory engine, rollbacks due to pinned cache content rely on eviction.
+ * requires_persistence,
+ * requires_replication,
+ * requires_wiredtiger,
+ * ]
+ */
+(function() {
+"use strict";
+
+load("jstests/core/timeseries/libs/timeseries.js");
+
+const minWiredTigerCacheSizeGB = 0.256;
+const cacheSize = minWiredTigerCacheSizeGB * 1000 * 1000 * 1000; // 256 MB
+const defaultBucketMaxSize = 128000; // 125 KB
+const minBucketCount = 10;
+
+// A cardinality higher than this calculated value will result in a smaller bucket size limit due to
+// cache pressure.
+const cardinalityForCachePressure = Math.ceil(cacheSize / (2 * defaultBucketMaxSize)); // 1000
+
+const replSet = new ReplSetTest({
+ nodes: 1,
+ nodeOptions: {wiredTigerCacheSizeGB: minWiredTigerCacheSizeGB},
+});
+replSet.startSet({setParameter: {timeseriesBucketMaxSize: defaultBucketMaxSize}});
+replSet.initiate();
+
+const db = replSet.getPrimary().getDB("test");
+const coll = db.getCollection('t');
+coll.drop();
+
+if (!TimeseriesTest.timeseriesScalabilityImprovementsEnabled(db)) {
+ replSet.stopSet();
+ jsTestLog(
+ 'Skipping test because the TimeseriesScalabilityImprovements feature flag is disabled.');
+ return;
+}
+
+const timeFieldName = 'time';
+const metaFieldName = 'meta';
+
+const resetCollection = (() => {
+ coll.drop();
+ assert.commandWorked(db.createCollection(
+ coll.getName(), {timeseries: {timeField: timeFieldName, metaField: metaFieldName}}));
+});
+
+// Inserts documents into the collection with increasing meta fields to generate N buckets. We make
+// sure to exceed the bucket min count per bucket to bypass large measurement checks.
+const initializeBucketsPastMinCount = function(numOfBuckets = 1) {
+ jsTestLog("Inserting and generating buckets.");
+ let batch = [];
+ for (let i = 0; i < numOfBuckets; i++) {
+ for (let j = 0; j < minBucketCount; ++j) {
+ const doc = {
+ _id: '' + i + j,
+ [timeFieldName]: ISODate(),
+ [metaFieldName]: i,
+ value: "a".repeat(1000)
+ };
+ batch.push(doc);
+
+ if (batch.length >= 100) {
+ assert.commandWorked(coll.insertMany(batch));
+ batch = [];
+ }
+ }
+ }
+ if (batch.length > 0) {
+ assert.commandWorked(coll.insertMany(batch));
+ }
+};
+
+resetCollection();
+
+const belowCardinalityThreshold = cardinalityForCachePressure;
+initializeBucketsPastMinCount(belowCardinalityThreshold);
+
+let timeseriesStats = assert.commandWorked(coll.stats()).timeseries;
+let bucketsClosedDueToSize = timeseriesStats.numBucketsClosedDueToSize;
+let bucketsClosedDueToCachePressure = timeseriesStats.numBucketsClosedDueToCachePressure;
+let compressedBuckets = timeseriesStats.numCompressedBuckets;
+
+// We only end up doing two passes before we start to close buckets due to size limits.
+while (bucketsClosedDueToSize == 0) {
+ let batch = [];
+ for (let i = 0; i < belowCardinalityThreshold; i++) {
+ const doc1 = {
+ _id: '00' + i,
+ [timeFieldName]: ISODate(),
+ [metaFieldName]: i,
+ value: "a".repeat(30000)
+ };
+ const doc2 = {
+ _id: '00' + i,
+ [timeFieldName]: ISODate(),
+ [metaFieldName]: i,
+ value: "a".repeat(20000)
+ };
+ batch.push(doc1);
+ batch.push(doc2);
+
+ if (batch.length >= 100) {
+ assert.commandWorked(coll.insertMany(batch));
+ batch = [];
+ }
+ }
+
+ if (batch.length != 0) {
+ assert.commandWorked(coll.insertMany(batch));
+ }
+
+ timeseriesStats = assert.commandWorked(coll.stats()).timeseries;
+ bucketsClosedDueToSize = timeseriesStats.numBucketsClosedDueToSize;
+ bucketsClosedDueToCachePressure = timeseriesStats.numBucketsClosedDueToCachePressure;
+ compressedBuckets = timeseriesStats.numCompressedBuckets;
+}
+
+// On the second pass of inserts, we will close buckets due to the default size constraints. No
+// buckets should be closed due to cache pressure.
+assert.eq(bucketsClosedDueToSize, cardinalityForCachePressure);
+assert.eq(bucketsClosedDueToCachePressure, 0);
+assert.eq(compressedBuckets, cardinalityForCachePressure);
+
+// Once we pass the cardinality threshold and simulate cache pressure, buckets begin to close due to
+// cache pressure rather than size.
+const aboveCardinalityThreshold = cardinalityForCachePressure * 3 / 2;
+initializeBucketsPastMinCount(aboveCardinalityThreshold);
+
+let batch = [];
+for (let i = 0; i < aboveCardinalityThreshold; i++) {
+ const doc =
+ {_id: '00' + i, [timeFieldName]: ISODate(), [metaFieldName]: i, value: "a".repeat(20000)};
+ batch.push(doc);
+
+ if (batch.length >= 100) {
+ assert.commandWorked(coll.insertMany(batch));
+ batch = [];
+ }
+}
+if (batch.length != 0) {
+ print(batch.length);
+ assert.commandWorked(coll.insertMany(batch));
+}
+
+timeseriesStats = assert.commandWorked(coll.stats()).timeseries;
+bucketsClosedDueToSize = timeseriesStats.numBucketsClosedDueToSize;
+bucketsClosedDueToCachePressure = timeseriesStats.numBucketsClosedDueToCachePressure;
+compressedBuckets = timeseriesStats.numCompressedBuckets;
+
+// We expect 'bucketsClosedDueToSize' to remain the same but 'bucketsClosedDueToCachePressure' to
+// increase.
+assert.eq(bucketsClosedDueToSize, cardinalityForCachePressure);
+
+// Previously, the bucket max size was 128000 bytes, but under cache pressure using
+// 'aboveCardinalityThreshold', the max size drops to roughly 85334 bytes. This means the old
+// buckets (up to 'cardinalityForCachePressure') will need to be closed since they are sized at
+// ~120000 bytes. The newly inserted measurements only total ~(20000 * 3) bytes, so their buckets
+// stay open.
+assert.eq(bucketsClosedDueToCachePressure, cardinalityForCachePressure);
+
+// We expect the number of compressed buckets to double (independent of whether the buckets were
+// closed due to size or cache pressure).
+assert.eq(compressedBuckets, 2 * cardinalityForCachePressure);
+
+replSet.stopSet();
+})();
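A minimal worked sketch (not part of this patch) of the sizing arithmetic the test above relies on, in plain shell JavaScript; the constants are the ones the test configures, and the server's own rounding may differ slightly:

// Cache-derived bucket sizing, as exercised by the test above (hedged sketch).
const cacheSizeBytes = 0.256 * 1000 * 1000 * 1000;   // 256 MB WiredTiger cache
const defaultBucketMaxSize = 128000;                 // timeseriesBucketMaxSize
// Cardinality at which the derived limit equals the default limit.
const cardinalityForCachePressure = Math.ceil(cacheSizeBytes / (2 * defaultBucketMaxSize));  // 1000
// At 1.5x that cardinality the derived limit falls below the default ...
const aboveCardinalityThreshold = cardinalityForCachePressure * 3 / 2;                       // 1500
const cacheDerivedMaxSize = Math.floor(cacheSizeBytes / (2 * aboveCardinalityThreshold));    // ~85333
// ... so buckets that grew toward the old 128000-byte limit now close due to cache pressure.
print("derived max bucket size under pressure: " + cacheDerivedMaxSize + " bytes");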
diff --git a/jstests/noPassthrough/timeseries_dynamic_bucket_sizing_large.js b/jstests/noPassthrough/timeseries_dynamic_bucket_sizing_large.js
new file mode 100644
index 00000000000..3c644219837
--- /dev/null
+++ b/jstests/noPassthrough/timeseries_dynamic_bucket_sizing_large.js
@@ -0,0 +1,187 @@
+/**
+ * Evaluate the behaviour of bucket closure when we simulate high cache pressure due to a high
+ * cardinality workload. After we hit a certain cardinality (the number of active buckets generated
+ * in this test by distinct metaField values) we expect buckets to be closed with a smaller bucket
+ * size limit to alleviate pressure on the cache with respect to large measurement insertions.
+ *
+ * @tags: [
+ * # Exclude in-memory engine, rollbacks due to pinned cache content rely on eviction.
+ * requires_persistence,
+ * requires_replication,
+ * requires_wiredtiger,
+ * ]
+ */
+(function() {
+"use strict";
+
+load("jstests/core/timeseries/libs/timeseries.js");
+
+const defaultBucketMaxSize = 128000; // 125 KB
+const minWiredTigerCacheSizeGB = 0.256; // 256 MB
+const minWiredTigerCacheSize = minWiredTigerCacheSizeGB * 1024 * 1024 * 1024; // 256 MB
+const measurementValueLength = 1 * 1024 * 1024; // 1 MB
+const defaultBucketMinCount = 10;
+
+const replSet = new ReplSetTest({
+ nodes: 1,
+ nodeOptions: {wiredTigerCacheSizeGB: minWiredTigerCacheSizeGB},
+});
+replSet.startSet({setParameter: {timeseriesBucketMaxSize: defaultBucketMaxSize}});
+replSet.initiate();
+
+const db = replSet.getPrimary().getDB("test");
+let coll = db.getCollection('t');
+coll.drop();
+
+if (!TimeseriesTest.timeseriesScalabilityImprovementsEnabled(db)) {
+ replSet.stopSet();
+ jsTestLog(
+ 'Skipping test because the TimeseriesScalabilityImprovements feature flag is disabled.');
+ return;
+}
+
+const timeFieldName = 'time';
+const metaFieldName = 'meta';
+
+const resetCollection = (() => {
+ coll.drop();
+ assert.commandWorked(db.createCollection(
+ coll.getName(), {timeseries: {timeField: timeFieldName, metaField: metaFieldName}}));
+});
+
+// Inserts small documents into the collection with increasing meta fields to generate N buckets.
+const initializeBuckets = function(numOfBuckets = 1) {
+ jsTestLog("Inserting and generating buckets.");
+ let batch = [];
+ for (let i = 0; i < numOfBuckets; i++) {
+ const doc = {_id: i, [timeFieldName]: ISODate(), [metaFieldName]: i, value: "a"};
+ batch.push(doc);
+ }
+ assert.commandWorked(coll.insertMany(batch));
+};
+
+(function largeMeasurementsNoCachePressure() {
+ jsTestLog("Entering largeMeasurementsNoCachePressure...");
+ coll = db.getCollection('largeMeasurementsNoCachePressure');
+ resetCollection();
+
+ let expectedBucketCount = 0;
+ let numBucketsClosedDueToSize = 0;
+ let numBucketsClosedDueToCachePressure = 0;
+ let numCompressedBuckets = 0;
+
+ const meta1 = 1;
+ const meta2 = 2;
+
+ // Insert 9 large measurements into the same bucket (mapping to meta1), resulting in a bucket of
+ // size ~11.5 MB (just under the 12 MB cap we allow for buckets).
+ for (let i = 0; i < defaultBucketMinCount - 1; i++) {
+ const doc = {
+ _id: i,
+ [timeFieldName]: ISODate(),
+ [metaFieldName]: meta1,
+ value: "a".repeat(measurementValueLength)
+ };
+ assert.commandWorked(coll.insert(doc));
+ }
+ expectedBucketCount++;
+
+ let timeseriesStats = assert.commandWorked(coll.stats()).timeseries;
+ assert.eq(timeseriesStats.bucketCount, expectedBucketCount);
+ assert.eq(timeseriesStats.numBucketsClosedDueToSize, numBucketsClosedDueToSize);
+ assert.eq(timeseriesStats.numBucketsClosedDueToCachePressure,
+ numBucketsClosedDueToCachePressure);
+ assert.eq(timeseriesStats.numCompressedBuckets, numCompressedBuckets);
+
+ // Once we exceed the min bucket count of 10, the bucket should be closed since it exceeds the
+ // default bucket max size of 125 KB. (This requires two additional insertions.)
+ const doc = {_id: 4, [timeFieldName]: ISODate(), [metaFieldName]: meta1, value: "a"};
+ assert.commandWorked(coll.insert(doc));
+ assert.commandWorked(coll.insert(doc));
+
+ expectedBucketCount++;
+ numBucketsClosedDueToSize++;
+ numCompressedBuckets++;
+
+ timeseriesStats = assert.commandWorked(coll.stats()).timeseries;
+ assert.eq(timeseriesStats.bucketCount, expectedBucketCount);
+ assert.eq(timeseriesStats.numBucketsClosedDueToSize, numBucketsClosedDueToSize);
+ assert.eq(timeseriesStats.numBucketsClosedDueToCachePressure,
+ numBucketsClosedDueToCachePressure);
+ assert.eq(timeseriesStats.numCompressedBuckets, numCompressedBuckets);
+
+ // Since the maximum bucket size is capped at 12 MB, we should hit that limit before reaching the
+ // minimum bucket count, so we expect the oversized bucket to be closed and another bucket to be
+ // created.
+ for (let i = 0; i < defaultBucketMinCount; i++) {
+ const doc = {
+ _id: i,
+ [timeFieldName]: ISODate(),
+ [metaFieldName]: meta2,
+ value: "b".repeat(measurementValueLength)
+ };
+ assert.commandWorked(coll.insert(doc));
+ }
+
+ // We create one bucket for 'meta2', fill it up and create another one for future insertions.
+ expectedBucketCount += 2;
+ numBucketsClosedDueToSize++;
+ numCompressedBuckets++;
+
+ timeseriesStats = assert.commandWorked(coll.stats()).timeseries;
+ assert.eq(timeseriesStats.bucketCount, expectedBucketCount);
+ assert.eq(timeseriesStats.numBucketsClosedDueToSize, numBucketsClosedDueToSize);
+ assert.eq(timeseriesStats.numBucketsClosedDueToCachePressure,
+ numBucketsClosedDueToCachePressure);
+ assert.eq(timeseriesStats.numCompressedBuckets, numCompressedBuckets);
+})();
+
+(function largeMeasurementsWithCachePressure() {
+ jsTestLog("Entering largeMeasurementsWithCachePressure...");
+ coll = db.getCollection('largeMeasurementsWithCachePressure');
+ resetCollection();
+
+ // We want the 'cacheDerivedMaxSize' to equal 5.5 MB.
+ const cacheDerivedMaxSize = 5.5 * 1024 * 1024;
+ const bucketCount =
+ Math.ceil(minWiredTigerCacheSize / (2 * cacheDerivedMaxSize)); // Evaluates to 24.
+ const meta = bucketCount;
+
+ // We expect the bucket mapping to 'meta' to be ~5 MB in size, so no buckets should be closed
+ // yet. We generate a cardinality equal to 'bucketCount'.
+ initializeBuckets(bucketCount - 1);
+ for (let i = 0; i < 3; i++) {
+ const doc = {
+ _id: i,
+ [timeFieldName]: ISODate(),
+ [metaFieldName]: meta,
+ value: "a".repeat(measurementValueLength)
+ };
+ assert.commandWorked(coll.insert(doc));
+ }
+
+ let timeseriesStats = assert.commandWorked(coll.stats()).timeseries;
+ assert.eq(timeseriesStats.bucketCount, bucketCount);
+ assert.eq(timeseriesStats.numBucketsClosedDueToSize, 0);
+ assert.eq(timeseriesStats.numBucketsClosedDueToCachePressure, 0);
+ assert.eq(timeseriesStats.numCompressedBuckets, 0);
+
+ // We expect this insert to close the bucket due to cache pressure, since it will exceed the
+ // approximate 'cacheDerivedMaxSize' of 5.5 MB, and a new bucket to be created for this
+ // measurement.
+ const doc = {
+ _id: bucketCount,
+ [timeFieldName]: ISODate(),
+ [metaFieldName]: meta,
+ value: "a".repeat(measurementValueLength)
+ };
+ assert.commandWorked(coll.insert(doc));
+
+ timeseriesStats = assert.commandWorked(coll.stats()).timeseries;
+ assert.eq(timeseriesStats.bucketCount, bucketCount + 1);
+ assert.eq(timeseriesStats.numBucketsClosedDueToSize, 0);
+ assert.eq(timeseriesStats.numBucketsClosedDueToCachePressure, 1);
+ assert.eq(timeseriesStats.numCompressedBuckets, 1);
+})();
+
+replSet.stopSet();
+})();
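The constants used by largeMeasurementsWithCachePressure can be reproduced with this short sketch (not part of this patch, plain shell JavaScript); the 2x divisor and the 12 MB ceiling match the test and the bucket catalog change further below:

// Arithmetic behind largeMeasurementsWithCachePressure (hedged sketch).
const minWiredTigerCacheSize = 0.256 * 1024 * 1024 * 1024;   // ~268 MB cache, in bytes
const cacheDerivedMaxSize = 5.5 * 1024 * 1024;               // 5.5 MB target limit
// Cardinality needed so that cache / (2 * cardinality) lands at roughly 5.5 MB.
const bucketCount = Math.ceil(minWiredTigerCacheSize / (2 * cacheDerivedMaxSize));  // 24
// Hard ceiling for buckets kept open for large measurements: BSONObjMaxUserSize - 4 MB.
const largeMeasurementsMaxBucketSize = (16 - 4) * 1024 * 1024;                      // 12 MB
print("bucketCount: " + bucketCount);
print("large-measurement ceiling: " + largeMeasurementsMaxBucketSize + " bytes");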
diff --git a/src/mongo/db/storage/kv/kv_engine.h b/src/mongo/db/storage/kv/kv_engine.h
index d3ce95f8c58..16a6eb3f325 100644
--- a/src/mongo/db/storage/kv/kv_engine.h
+++ b/src/mongo/db/storage/kv/kv_engine.h
@@ -470,6 +470,13 @@ public:
}
/**
+ * Returns the cache size in MB.
+ */
+ virtual size_t getCacheSizeMB() const {
+ return 0;
+ }
+
+ /**
* The destructor will never be called from mongod, but may be called from tests.
* Engines may assume that this will only be called in the case of clean shutdown, even if
* cleanShutdown() hasn't been called.
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index 73041899938..41ee195fd7b 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -334,7 +334,8 @@ WiredTigerKVEngine::WiredTigerKVEngine(OperationContext* opCtx,
_sizeStorerSyncTracker(cs, 100000, Seconds(60)),
_ephemeral(ephemeral),
_inRepairMode(repair),
- _keepDataHistory(serverGlobalParams.enableMajorityReadConcern) {
+ _keepDataHistory(serverGlobalParams.enableMajorityReadConcern),
+ _cacheSizeMB(cacheSizeMB) {
_pinnedOplogTimestamp.store(Timestamp::max().asULL());
boost::filesystem::path journalPath = path;
journalPath /= "journal";
@@ -2696,4 +2697,8 @@ KeyFormat WiredTigerKVEngine::getKeyFormat(OperationContext* opCtx, StringData i
return wtTableConfig.find("key_format=u") != string::npos ? KeyFormat::String : KeyFormat::Long;
}
+size_t WiredTigerKVEngine::getCacheSizeMB() const {
+ return _cacheSizeMB;
+}
+
} // namespace mongo
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
index c1128d6292b..73b720774d5 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
@@ -418,6 +418,8 @@ public:
KeyFormat getKeyFormat(OperationContext* opCtx, StringData ident) const override;
+ size_t getCacheSizeMB() const override;
+
private:
class WiredTigerSessionSweeper;
@@ -556,5 +558,8 @@ private:
// Pins the oplog so that OplogStones will not truncate oplog history equal or newer to this
// timestamp.
AtomicWord<std::uint64_t> _pinnedOplogTimestamp;
+
+ // The amount of memory allotted for the WiredTiger cache.
+ size_t _cacheSizeMB;
};
} // namespace mongo
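From the shell, the cache size the new getter reports should correspond to the WiredTiger-configured maximum. A small sanity check (not part of this patch), assuming the stock WiredTiger statistics names exposed through serverStatus:

// Compare the configured WiredTiger cache size (the value getCacheSizeMB() is based on)
// against what serverStatus reports.
const cacheStats = db.serverStatus().wiredTiger.cache;
const configuredBytes = cacheStats["maximum bytes configured"];
print("configured WiredTiger cache: " + (configuredBytes / (1024 * 1024)) + " MB");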
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index dcda06f4a4e..0c18177c5bd 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/commands/server_status.h"
#include "mongo/db/concurrency/exception_util.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/storage/kv/kv_engine.h"
#include "mongo/db/storage/storage_parameters_gen.h"
#include "mongo/db/timeseries/bucket_catalog_helpers.h"
#include "mongo/db/timeseries/bucket_compression.h"
@@ -135,6 +136,27 @@ Status getTimeseriesBucketClearedError(const OID& bucketId,
str::stream() << "Time-series bucket " << bucketId << nsIdentification
<< " was cleared"};
}
+
+/**
+ * Calculate the bucket max size constrained by the cache size and the cardinality of active
+ * buckets.
+ */
+int32_t getCacheDerivedBucketMaxSize(StorageEngine* storageEngine, uint32_t workloadCardinality) {
+ invariant(storageEngine);
+ uint64_t storageCacheSize =
+ static_cast<uint64_t>(storageEngine->getEngine()->getCacheSizeMB() * 1024 * 1024);
+
+ if (!feature_flags::gTimeseriesScalabilityImprovements.isEnabled(
+ serverGlobalParams.featureCompatibility) ||
+ storageCacheSize == 0 || workloadCardinality == 0) {
+ return INT_MAX;
+ }
+
+ uint64_t derivedMaxSize = storageCacheSize / (2 * workloadCardinality);
+ uint64_t intMax = static_cast<uint64_t>(std::numeric_limits<int32_t>::max());
+ return std::min(derivedMaxSize, intMax);
+}
+
} // namespace
void BucketCatalog::ExecutionStatsController::incNumBucketInserts(long long increment) {
@@ -169,6 +191,12 @@ void BucketCatalog::ExecutionStatsController::incNumBucketsClosedDueToSize(long
_globalStats->numBucketsClosedDueToSize.fetchAndAddRelaxed(increment);
}
+void BucketCatalog::ExecutionStatsController::incNumBucketsClosedDueToCachePressure(
+ long long increment) {
+ _collectionStats->numBucketsClosedDueToCachePressure.fetchAndAddRelaxed(increment);
+ _globalStats->numBucketsClosedDueToCachePressure.fetchAndAddRelaxed(increment);
+}
+
void BucketCatalog::ExecutionStatsController::incNumBucketsClosedDueToTimeForward(
long long increment) {
_collectionStats->numBucketsClosedDueToTimeForward.fetchAndAddRelaxed(increment);
@@ -489,7 +517,7 @@ void BucketCatalog::Bucket::_calculateBucketFieldsAndSizeChange(
const BSONObj& doc,
boost::optional<StringData> metaField,
NewFieldNames* newFieldNamesToBeInserted,
- uint32_t* sizeToBeAdded) const {
+ int32_t* sizeToBeAdded) const {
// BSON size for an object with an empty object field where field name is empty string.
// We can use this as an offset to know the size when we have real field names.
static constexpr int emptyObjSize = 12;
@@ -957,6 +985,8 @@ void BucketCatalog::_appendExecutionStatsToBuilder(const ExecutionStats* stats,
builder->appendNumber("numBucketsReopened", stats->numBucketsReopened.load());
builder->appendNumber("numBucketsKeptOpenDueToLargeMeasurements",
stats->numBucketsKeptOpenDueToLargeMeasurements.load());
+ builder->appendNumber("numBucketsClosedDueToCachePressure",
+ stats->numBucketsClosedDueToCachePressure.load());
}
}
@@ -1300,6 +1330,7 @@ BucketCatalog::Bucket* BucketCatalog::_reopenBucket(Stripe* stripe,
stats.incNumBucketsReopened();
_memoryUsage.addAndFetch(unownedBucket->_memoryUsage);
+ _numberOfActiveBuckets.fetchAndAdd(1);
return unownedBucket;
}
@@ -1395,13 +1426,14 @@ std::shared_ptr<BucketCatalog::WriteBatch> BucketCatalog::_insertIntoBucket(
Bucket* bucket,
ClosedBuckets* closedBuckets) {
NewFieldNames newFieldNamesToBeInserted;
- uint32_t sizeToBeAdded = 0;
+ int32_t sizeToBeAdded = 0;
bucket->_calculateBucketFieldsAndSizeChange(
doc, info->options.getMetaField(), &newFieldNamesToBeInserted, &sizeToBeAdded);
bool isNewlyOpenedBucket = bucket->_ns.isEmpty();
if (!isNewlyOpenedBucket) {
- auto action = _determineRolloverAction(doc, info, bucket, sizeToBeAdded, mode);
+
+ auto action = _determineRolloverAction(opCtx, doc, info, bucket, sizeToBeAdded, mode);
if (action == RolloverAction::kSoftClose && mode == AllowBucketCreation::kNo) {
// We don't actually want to roll this bucket over yet, bail out.
return std::shared_ptr<WriteBatch>{};
@@ -1497,6 +1529,7 @@ void BucketCatalog::_removeBucket(Stripe* stripe,
_bucketStateManager.eraseBucketState(bucket->id());
}
+ _numberOfActiveBuckets.fetchAndSubtract(1);
stripe->allBuckets.erase(allIt);
}
@@ -1511,9 +1544,12 @@ void BucketCatalog::_archiveBucket(Stripe* stripe, WithLock stripeLock, Bucket*
long long memory = _marginalMemoryUsageForArchivedBucket(archivedSet[bucket->getTime()],
archivedSet.size() == 1);
_memoryUsage.fetchAndAdd(memory);
-
archived = true;
}
+
+ // If we have an archived bucket, we still want to account for it in '_numberOfActiveBuckets',
+ // so we increment the counter here since '_removeBucket()' decrements it.
+ _numberOfActiveBuckets.fetchAndAdd(1);
_removeBucket(stripe, stripeLock, bucket, archived);
}
@@ -1695,6 +1731,7 @@ void BucketCatalog::_expireIdleBuckets(Stripe* stripe,
archivedSet.erase(archivedSet.begin());
}
_memoryUsage.fetchAndSubtract(memory);
+ _numberOfActiveBuckets.fetchAndSubtract(1);
stats.incNumBucketsClosedDueToMemoryThreshold();
++numExpired;
@@ -1714,8 +1751,10 @@ BucketCatalog::Bucket* BucketCatalog::_allocateBucket(Stripe* stripe,
tassert(6130900, "Expected bucket to be inserted", inserted);
Bucket* bucket = it->second.get();
stripe->openBuckets[info.key] = bucket;
+
bool initialized = _bucketStateManager.initializeBucketState(bucketId, boost::none);
invariant(initialized);
+ _numberOfActiveBuckets.fetchAndAdd(1);
if (info.openedDuetoMetadata) {
info.stats.incNumBucketsOpenedDueToMetadata();
@@ -1731,10 +1770,11 @@ BucketCatalog::Bucket* BucketCatalog::_allocateBucket(Stripe* stripe,
return bucket;
}
-BucketCatalog::RolloverAction BucketCatalog::_determineRolloverAction(const BSONObj& doc,
+BucketCatalog::RolloverAction BucketCatalog::_determineRolloverAction(OperationContext* opCtx,
+ const BSONObj& doc,
CreationInfo* info,
Bucket* bucket,
- uint32_t sizeToBeAdded,
+ int32_t sizeToBeAdded,
AllowBucketCreation mode) {
// If the mode is enabled to create new buckets, then we should update stats for soft closures
// accordingly. If we specify the mode to not allow bucket creation, it means we are not sure if
@@ -1763,20 +1803,34 @@ BucketCatalog::RolloverAction BucketCatalog::_determineRolloverAction(const BSON
info->stats.incNumBucketsClosedDueToSchemaChange();
return RolloverAction::kHardClose;
}
- if (bucket->_size + sizeToBeAdded > static_cast<std::uint64_t>(gTimeseriesBucketMaxSize)) {
+
+ // In scenarios where we have a high cardinality workload and face increased cache pressure, we
+ // lower the bucket size limit so that buckets are closed at a smaller size.
+ int32_t cacheDerivedBucketMaxSize = getCacheDerivedBucketMaxSize(
+ opCtx->getServiceContext()->getStorageEngine(), _numberOfActiveBuckets.load());
+ int32_t effectiveMaxSize = std::min(gTimeseriesBucketMaxSize, cacheDerivedBucketMaxSize);
+
+ // Before we hit our bucket minimum count, we will allow for large measurements to be inserted
+ // into buckets. Instead of packing the bucket to the BSON size limit, 16MB, we'll limit the max
+ // bucket size to 12MB. This is to leave some space in the bucket if we need to add new internal
+ // fields to existing, full buckets.
+ static constexpr int32_t largeMeasurementsMaxBucketSize =
+ BSONObjMaxUserSize - (4 * 1024 * 1024);
+ // We restrict the ceiling of the bucket max size under cache pressure.
+ int32_t absoluteMaxSize = std::min(largeMeasurementsMaxBucketSize, cacheDerivedBucketMaxSize);
+
+ if (bucket->_size + sizeToBeAdded > effectiveMaxSize) {
bool keepBucketOpenForLargeMeasurements =
bucket->_numMeasurements < static_cast<std::uint64_t>(gTimeseriesBucketMinCount) &&
feature_flags::gTimeseriesScalabilityImprovements.isEnabled(
serverGlobalParams.featureCompatibility);
if (keepBucketOpenForLargeMeasurements) {
- // Instead of packing the bucket to the BSON size limit, 16MB, we'll limit the max
- // bucket size to 12MB. This is to leave some space in the bucket if we need to add
- // new internal fields to existing, full buckets.
- static constexpr size_t largeMeasurementsMaxBucketSize =
- BSONObjMaxUserSize - (4 * 1024 * 1024);
-
- if (bucket->_size + sizeToBeAdded > largeMeasurementsMaxBucketSize) {
- info->stats.incNumBucketsClosedDueToSize();
+ if (bucket->_size + sizeToBeAdded > absoluteMaxSize) {
+ if (absoluteMaxSize != largeMeasurementsMaxBucketSize) {
+ info->stats.incNumBucketsClosedDueToCachePressure();
+ } else {
+ info->stats.incNumBucketsClosedDueToSize();
+ }
return RolloverAction::kHardClose;
}
@@ -1789,7 +1843,11 @@ BucketCatalog::RolloverAction BucketCatalog::_determineRolloverAction(const BSON
}
return RolloverAction::kNone;
} else {
- info->stats.incNumBucketsClosedDueToSize();
+ if (effectiveMaxSize == gTimeseriesBucketMaxSize) {
+ info->stats.incNumBucketsClosedDueToSize();
+ } else {
+ info->stats.incNumBucketsClosedDueToCachePressure();
+ }
return RolloverAction::kHardClose;
}
}
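The size-related branches of _determineRolloverAction can be summarized as below. A simplified sketch (not part of this patch) in plain JavaScript; the helper name rolloverReason is illustrative, the constants and decision order mirror the diff, and the time-ordering, schema-change, count-limit, and archival paths are omitted, with the scalability feature flag assumed to be enabled:

// Hedged sketch of the size-based rollover decision in _determineRolloverAction.
function rolloverReason(bucketSize, sizeToBeAdded, numMeasurements, cacheSizeBytes, activeBuckets) {
    const kDefaultMax = 128000;                             // gTimeseriesBucketMaxSize
    const kMinCount = 10;                                   // gTimeseriesBucketMinCount
    const kLargeMeasurementsMax = (16 - 4) * 1024 * 1024;   // BSONObjMaxUserSize - 4 MB
    const cacheDerivedMax = activeBuckets > 0
        ? Math.floor(cacheSizeBytes / (2 * activeBuckets))  // getCacheDerivedBucketMaxSize
        : Number.MAX_SAFE_INTEGER;
    const effectiveMax = Math.min(kDefaultMax, cacheDerivedMax);
    const absoluteMax = Math.min(kLargeMeasurementsMax, cacheDerivedMax);

    if (bucketSize + sizeToBeAdded <= effectiveMax) {
        return "keepOpen";
    }
    if (numMeasurements < kMinCount) {
        // Large-measurement path: only close once the hard ceiling is exceeded.
        if (bucketSize + sizeToBeAdded <= absoluteMax) {
            return "keepOpenForLargeMeasurements";
        }
        return absoluteMax === kLargeMeasurementsMax ? "closeDueToSize" : "closeDueToCachePressure";
    }
    return effectiveMax === kDefaultMax ? "closeDueToSize" : "closeDueToCachePressure";
}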
diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h
index 23dbe7a9243..a03fe11e525 100644
--- a/src/mongo/db/timeseries/bucket_catalog.h
+++ b/src/mongo/db/timeseries/bucket_catalog.h
@@ -71,6 +71,7 @@ protected:
AtomicWord<long long> numBucketsClosedDueToCount;
AtomicWord<long long> numBucketsClosedDueToSchemaChange;
AtomicWord<long long> numBucketsClosedDueToSize;
+ AtomicWord<long long> numBucketsClosedDueToCachePressure;
AtomicWord<long long> numBucketsClosedDueToTimeForward;
AtomicWord<long long> numBucketsClosedDueToTimeBackward;
AtomicWord<long long> numBucketsClosedDueToMemoryThreshold;
@@ -97,6 +98,7 @@ protected:
void incNumBucketsClosedDueToCount(long long increment = 1);
void incNumBucketsClosedDueToSchemaChange(long long increment = 1);
void incNumBucketsClosedDueToSize(long long increment = 1);
+ void incNumBucketsClosedDueToCachePressure(long long increment = 1);
void incNumBucketsClosedDueToTimeForward(long long increment = 1);
void incNumBucketsClosedDueToTimeBackward(long long increment = 1);
void incNumBucketsClosedDueToMemoryThreshold(long long increment = 1);
@@ -729,7 +731,7 @@ protected:
void _calculateBucketFieldsAndSizeChange(const BSONObj& doc,
boost::optional<StringData> metaField,
NewFieldNames* newFieldNamesToBeInserted,
- uint32_t* sizeToBeAdded) const;
+ int32_t* sizeToBeAdded) const;
/**
* Returns whether BucketCatalog::commit has been called at least once on this bucket.
@@ -784,7 +786,7 @@ protected:
// The total size in bytes of the bucket's BSON serialization, including measurements to be
// inserted.
- uint64_t _size = 0;
+ int32_t _size = 0;
// The total number of measurements in the bucket, including uncommitted measurements and
// measurements to be inserted.
@@ -1010,10 +1012,11 @@ protected:
* Determines if 'bucket' needs to be rolled over to accommodate 'doc'. If so, determines whether
* to archive or close 'bucket'.
*/
- RolloverAction _determineRolloverAction(const BSONObj& doc,
+ RolloverAction _determineRolloverAction(OperationContext* opCtx,
+ const BSONObj& doc,
CreationInfo* info,
Bucket* bucket,
- uint32_t sizeToBeAdded,
+ int32_t sizeToBeAdded,
AllowBucketCreation mode);
/**
@@ -1061,6 +1064,9 @@ protected:
// Approximate memory usage of the bucket catalog.
AtomicWord<uint64_t> _memoryUsage;
+ // Approximate cardinality of opened and archived buckets.
+ AtomicWord<uint32_t> _numberOfActiveBuckets;
+
class ServerStatus;
};
} // namespace mongo
diff --git a/src/mongo/db/timeseries/timeseries.idl b/src/mongo/db/timeseries/timeseries.idl
index 92324068b30..5b32ae19ec2 100644
--- a/src/mongo/db/timeseries/timeseries.idl
+++ b/src/mongo/db/timeseries/timeseries.idl
@@ -74,6 +74,15 @@ server_parameters:
cpp_varname: "gTimeseriesBucketMinCount"
default: 10
validator: { gte: 1 }
+ "timeseriesBucketMinSize":
+ description: "If there is high memory pressure on the system, we will lower the maximum
+ size (in bytes) of measurements we pack into buckets. This value represents
+ the absolute lowest size we will limit buckets to."
+ set_at: [ startup ]
+ cpp_vartype: "std::int32_t"
+ cpp_varname: "gTimeseriesBucketMinSize"
+ default: 5120 # 5KB
+ validator: { gte: 1 }
enums:
BucketGranularity:
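The new parameter is startup-only. A hedged usage sketch in the same ReplSetTest style as the tests above (the workload and the collection name are placeholders, not part of this patch):

// Start a single-node replica set with a small cache and both sizing parameters set at startup,
// then inspect the cache-pressure counter after running a workload.
const rst = new ReplSetTest({
    nodes: 1,
    nodeOptions: {wiredTigerCacheSizeGB: 0.256},
});
rst.startSet({setParameter: {timeseriesBucketMaxSize: 128000, timeseriesBucketMinSize: 5120}});
rst.initiate();

const testDB = rst.getPrimary().getDB("test");
// ... create a time-series collection and insert a high-cardinality workload here ...
// const ts = testDB.getCollection("t").stats().timeseries;
// print(ts.numBucketsClosedDueToCachePressure);

rst.stopSet();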