author    Dan Larkin-York <dan.larkin-york@mongodb.com>    2022-06-24 20:39:58 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2022-06-24 21:35:53 +0000
commit    11aa03c2de2d008bb87264a24c225ff7da4e1a68
tree      7d5765b2fdc4207ca7ac2d0cd91e6520012992c0 /src/mongo/db/timeseries
parent    b05965036e8659ea4ef5a312dca8d12761c17a33
SERVER-67262 Refactor BucketCatalog APIs to allow for bucket re-opening
Diffstat (limited to 'src/mongo/db/timeseries')
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog.cpp      | 651
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog.h        | 158
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog_test.cpp | 221
3 files changed, 811 insertions, 219 deletions
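Before the per-file hunks, a caller-side sketch may help orient the refactor: the tryInsert()/insert() contracts documented in bucket_catalog.h below imply a two-phase flow (try against open buckets, then reopen or allocate). This is an illustrative sketch only, not part of the patch: error handling is abbreviated, 'catalog', 'coll', and the insert arguments are assumed to be in scope, and fetchArchivedBucketDoc() is a hypothetical helper standing in for however the caller fetches the archived bucket document identified by the candidate OID.

// Illustrative sketch only; not part of this commit.
StatusWith<BucketCatalog::InsertResult> swTry =
    catalog.tryInsert(opCtx, ns, comparator, options, doc, combine);
if (!swTry.isOK()) {
    return swTry.getStatus();
}
auto& tryResult = swTry.getValue();
if (tryResult.batch) {
    // A suitable open bucket accepted the measurement.
    return swTry;
}

// No suitable open bucket. If tryInsert() surfaced an archived candidate, fetch
// its document and hand it back so the catalog can reopen it; otherwise insert()
// simply allocates a fresh bucket.
boost::optional<BucketCatalog::BucketToReopen> bucketToReopen;
if (tryResult.candidate) {
    BSONObj bucketDoc = fetchArchivedBucketDoc(opCtx, ns, *tryResult.candidate);
    auto validator = [&](OperationContext* opCtx, const BSONObj& bucketDocument) {
        return coll->checkValidation(opCtx, bucketDocument);
    };
    bucketToReopen = BucketCatalog::BucketToReopen{bucketDoc, validator};
}
return catalog.insert(opCtx, ns, comparator, options, doc, combine, bucketToReopen);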
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index 41148c11dd0..fb506164f75 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -279,6 +279,12 @@ void BucketCatalog::ExecutionStatsController::incNumBucketsArchivedDueToMemoryTh
_globalStats->numBucketsArchivedDueToMemoryThreshold.fetchAndAddRelaxed(increment);
}
+void BucketCatalog::ExecutionStatsController::incNumBucketsArchivedDueToReopening(
+ long long increment) {
+ _collectionStats->numBucketsArchivedDueToReopening.fetchAndAddRelaxed(increment);
+ _globalStats->numBucketsArchivedDueToReopening.fetchAndAddRelaxed(increment);
+}
+
void BucketCatalog::ExecutionStatsController::incNumCommits(long long increment) {
_collectionStats->numCommits.fetchAndAddRelaxed(increment);
_globalStats->numCommits.fetchAndAddRelaxed(increment);
@@ -664,92 +670,40 @@ Status BucketCatalog::reopenBucket(OperationContext* opCtx,
str::stream() << "Attempting to reopen a bucket for a non-timeseries collection: "
<< ns);
- BSONElement bucketIdElem = bucketDoc.getField(timeseries::kBucketIdFieldName);
- if (bucketIdElem.eoo() || bucketIdElem.type() != BSONType::jstOID) {
- return {ErrorCodes::BadValue,
- str::stream() << timeseries::kBucketIdFieldName
- << " is missing or not an ObjectId"};
- }
-
- // Validate the bucket document against the schema.
- auto result = coll->checkValidation(opCtx, bucketDoc);
- if (result.first != Collection::SchemaValidationResult::kPass) {
- return result.second;
- }
-
BSONElement metadata;
auto metaFieldName = options->getMetaField();
if (metaFieldName) {
metadata = bucketDoc.getField(*metaFieldName);
}
-
- // Buckets are spread across independently-lockable stripes to improve parallelism. We map a
- // bucket to a stripe by hashing the BucketKey.
auto key = BucketKey{ns, BucketMetadata{metadata, coll->getDefaultCollator()}};
- auto stripeNumber = _getStripeNumber(key);
-
- auto bucketId = bucketIdElem.OID();
- std::unique_ptr<Bucket> bucket = std::make_unique<Bucket>(bucketId, stripeNumber, key.hash);
- // Initialize the remaining member variables from the bucket document.
- bucket->_ns = ns;
- bucket->_metadata = key.metadata;
- bucket->_timeField = options->getTimeField().toString();
- bucket->_size = bucketDoc.objsize();
- bucket->_minTime = bucketDoc.getObjectField(timeseries::kBucketControlFieldName)
- .getObjectField(timeseries::kBucketControlMinFieldName)
- .getField(options->getTimeField())
- .Date();
-
- // Populate the top-level data field names.
- const BSONObj& dataObj = bucketDoc.getObjectField(timeseries::kBucketDataFieldName);
- for (const BSONElement& dataElem : dataObj) {
- auto hashedKey = StringSet::hasher().hashed_key(dataElem.fieldName());
- bucket->_fieldNames.emplace(hashedKey);
- }
-
- auto swMinMax = timeseries::generateMinMaxFromBucketDoc(bucketDoc, coll->getDefaultCollator());
- if (!swMinMax.isOK()) {
- return swMinMax.getStatus();
- }
- bucket->_minmax = std::move(swMinMax.getValue());
-
- auto swSchema = timeseries::generateSchemaFromBucketDoc(bucketDoc, coll->getDefaultCollator());
- if (!swSchema.isOK()) {
- return swSchema.getStatus();
- }
- bucket->_schema = std::move(swSchema.getValue());
+ // Validate the bucket document against the schema.
+ auto validator = [&](OperationContext * opCtx, const BSONObj& bucketDoc) -> auto {
+ return coll->checkValidation(opCtx, bucketDoc);
+ };
- uint32_t numMeasurements = 0;
- const bool isCompressed = timeseries::isCompressedBucket(bucketDoc);
- const BSONElement timeColumnElem = dataObj.getField(options->getTimeField());
+ auto stats = _getExecutionStats(ns);
- if (isCompressed && timeColumnElem.type() == BSONType::BinData) {
- BSONColumn storage{timeColumnElem};
- numMeasurements = storage.size();
- } else {
- numMeasurements = timeColumnElem.Obj().nFields();
+ auto res = _rehydrateBucket(opCtx,
+ ns,
+ coll->getDefaultCollator(),
+ *options,
+ stats,
+ BucketToReopen{bucketDoc, validator},
+ boost::none);
+ if (!res.isOK()) {
+ return res.getStatus();
}
+ auto bucket = std::move(res.getValue());
- bucket->_numMeasurements = numMeasurements;
- bucket->_numCommittedMeasurements = numMeasurements;
-
- ExecutionStatsController stats = _getExecutionStats(ns);
- stats.incNumBucketsReopened();
+ auto stripeNumber = _getStripeNumber(key);
// Register the reopened bucket with the catalog.
auto& stripe = _stripes[stripeNumber];
stdx::lock_guard stripeLock{stripe.mutex};
ClosedBuckets closedBuckets;
- _expireIdleBuckets(&stripe, stripeLock, stats, &closedBuckets);
-
- auto [it, inserted] = stripe.allBuckets.try_emplace(bucketId, std::move(bucket));
- tassert(6668200, "Expected bucket to be inserted", inserted);
- Bucket* unownedBucket = it->second.get();
- stripe.openBuckets[key] = unownedBucket;
- _initializeBucketState(bucketId);
-
+ _reopenBucket(&stripe, stripeLock, stats, key, std::move(bucket), &closedBuckets);
return Status::OK();
}
@@ -765,151 +719,26 @@ BSONObj BucketCatalog::getMetadata(const BucketHandle& handle) const {
return bucket->_metadata.toBSON();
}
-StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert(
+StatusWith<BucketCatalog::InsertResult> BucketCatalog::tryInsert(
OperationContext* opCtx,
const NamespaceString& ns,
const StringData::ComparatorInterface* comparator,
const TimeseriesOptions& options,
const BSONObj& doc,
CombineWithInsertsFromOtherClients combine) {
+ return _insert(opCtx, ns, comparator, options, doc, combine, AllowBucketCreation::kNo);
+}
- auto timeElem = doc[options.getTimeField()];
- if (!timeElem || BSONType::Date != timeElem.type()) {
- return {ErrorCodes::BadValue,
- str::stream() << "'" << options.getTimeField() << "' must be present and contain a "
- << "valid BSON UTC datetime value"};
- }
- auto time = timeElem.Date();
-
- ExecutionStatsController stats = _getExecutionStats(ns);
-
- BSONElement metadata;
- auto metaFieldName = options.getMetaField();
- if (metaFieldName) {
- metadata = doc[*metaFieldName];
- }
-
- // Buckets are spread across independently-lockable stripes to improve parallelism. We map a
- // bucket to a stripe by hashing the BucketKey.
- auto key = BucketKey{ns, BucketMetadata{metadata, comparator}};
- auto stripeNumber = _getStripeNumber(key);
-
- ClosedBuckets closedBuckets;
- CreationInfo info{key, stripeNumber, time, options, stats, &closedBuckets};
-
- auto& stripe = _stripes[stripeNumber];
- stdx::lock_guard stripeLock{stripe.mutex};
-
- Bucket* bucket = _useOrCreateBucket(&stripe, stripeLock, info);
- invariant(bucket);
-
- NewFieldNames newFieldNamesToBeInserted;
- uint32_t sizeToBeAdded = 0;
- bucket->_calculateBucketFieldsAndSizeChange(
- doc, options.getMetaField(), &newFieldNamesToBeInserted, &sizeToBeAdded);
-
- auto determineRolloverAction = [&](Bucket* bucket) -> RolloverAction {
- const bool canArchive = feature_flags::gTimeseriesScalabilityImprovements.isEnabled(
- serverGlobalParams.featureCompatibility);
-
- if (bucket->schemaIncompatible(doc, metaFieldName, comparator)) {
- stats.incNumBucketsClosedDueToSchemaChange();
- return RolloverAction::kClose;
- }
- if (bucket->_numMeasurements == static_cast<std::uint64_t>(gTimeseriesBucketMaxCount)) {
- stats.incNumBucketsClosedDueToCount();
- return RolloverAction::kClose;
- }
- auto bucketTime = bucket->getTime();
- if (time - bucketTime >= Seconds(*options.getBucketMaxSpanSeconds())) {
- if (canArchive) {
- stats.incNumBucketsArchivedDueToTimeForward();
- return RolloverAction::kArchive;
- } else {
- stats.incNumBucketsClosedDueToTimeForward();
- return RolloverAction::kClose;
- }
- }
- if (time < bucketTime) {
- if (canArchive) {
- stats.incNumBucketsArchivedDueToTimeBackward();
- return RolloverAction::kArchive;
- } else {
- stats.incNumBucketsClosedDueToTimeBackward();
- return RolloverAction::kClose;
- }
- }
- if (bucket->_size + sizeToBeAdded > static_cast<std::uint64_t>(gTimeseriesBucketMaxSize)) {
- bool keepBucketOpenForLargeMeasurements =
- bucket->_numMeasurements < static_cast<std::uint64_t>(gTimeseriesBucketMinCount) &&
- feature_flags::gTimeseriesScalabilityImprovements.isEnabled(
- serverGlobalParams.featureCompatibility);
- if (keepBucketOpenForLargeMeasurements) {
- // Instead of packing the bucket to the BSON size limit, 16MB, we'll limit the max
- // bucket size to 12MB. This is to leave some space in the bucket if we need to add
- // new internal fields to existing, full buckets.
- static constexpr size_t largeMeasurementsMaxBucketSize =
- BSONObjMaxUserSize - (4 * 1024 * 1024);
-
- if (bucket->_size + sizeToBeAdded > largeMeasurementsMaxBucketSize) {
- stats.incNumBucketsClosedDueToSize();
- return RolloverAction::kClose;
- }
-
- // There's enough space to add this measurement and we're still below the large
- // measurement threshold.
- if (!bucket->_keptOpenDueToLargeMeasurements) {
- // Only increment this metric once per bucket.
- bucket->_keptOpenDueToLargeMeasurements = true;
- stats.incNumBucketsKeptOpenDueToLargeMeasurements();
- }
- return RolloverAction::kNone;
- } else {
- stats.incNumBucketsClosedDueToSize();
- return RolloverAction::kClose;
- }
- }
- return RolloverAction::kNone;
- };
-
- if (!bucket->_ns.isEmpty()) {
- auto action = determineRolloverAction(bucket);
- if (action != RolloverAction::kNone) {
- info.openedDuetoMetadata = false;
- bucket = _rollover(&stripe, stripeLock, bucket, info, action);
-
- bucket->_calculateBucketFieldsAndSizeChange(
- doc, options.getMetaField(), &newFieldNamesToBeInserted, &sizeToBeAdded);
- }
- }
-
- auto batch = bucket->_activeBatch(getOpId(opCtx, combine), stats);
- batch->_addMeasurement(doc);
- batch->_recordNewFields(bucket, std::move(newFieldNamesToBeInserted));
-
- bucket->_numMeasurements++;
- bucket->_size += sizeToBeAdded;
- if (bucket->_ns.isEmpty()) {
- // The namespace and metadata only need to be set if this bucket was newly created.
- bucket->_ns = ns;
- bucket->_metadata = key.metadata;
-
- // The namespace is stored two times: the bucket itself and openBuckets.
- // We don't have a great approximation for the
- // _schema size, so we use initial document size minus metadata as an approximation. Since
- // the metadata itself is stored once, in the bucket, we can combine the two and just use
- // the initial document size. A unique pointer to the bucket is stored once: allBuckets. A
- // raw pointer to the bucket is stored at most twice: openBuckets, idleBuckets.
- bucket->_memoryUsage += (ns.size() * 2) + doc.objsize() + sizeof(Bucket) +
- sizeof(std::unique_ptr<Bucket>) + (sizeof(Bucket*) * 2);
-
- bucket->_schema.update(doc, options.getMetaField(), comparator);
- } else {
- _memoryUsage.fetchAndSubtract(bucket->_memoryUsage);
- }
- _memoryUsage.fetchAndAdd(bucket->_memoryUsage);
-
- return InsertResult{batch, closedBuckets};
+StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert(
+ OperationContext* opCtx,
+ const NamespaceString& ns,
+ const StringData::ComparatorInterface* comparator,
+ const TimeseriesOptions& options,
+ const BSONObj& doc,
+ CombineWithInsertsFromOtherClients combine,
+ boost::optional<BucketToReopen> bucketToReopen) {
+ return _insert(
+ opCtx, ns, comparator, options, doc, combine, AllowBucketCreation::kYes, bucketToReopen);
}
Status BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) {
@@ -1095,6 +924,8 @@ void BucketCatalog::_appendExecutionStatsToBuilder(const ExecutionStats* stats,
stats->numBucketsArchivedDueToTimeBackward.load());
builder->appendNumber("numBucketsArchivedDueToMemoryThreshold",
stats->numBucketsArchivedDueToMemoryThreshold.load());
+ builder->appendNumber("numBucketsArchivedDueToReopening",
+ stats->numBucketsArchivedDueToReopening.load());
builder->appendNumber("numBucketsReopened", stats->numBucketsReopened.load());
builder->appendNumber("numBucketsKeptOpenDueToLargeMeasurements",
stats->numBucketsKeptOpenDueToLargeMeasurements.load());
@@ -1152,7 +983,33 @@ std::size_t BucketCatalog::PreHashed::operator()(const BucketKey::Hash& key) con
return key;
}
-BucketCatalog::StripeNumber BucketCatalog::_getStripeNumber(const BucketKey& key) {
+StatusWith<std::pair<BucketCatalog::BucketKey, Date_t>> BucketCatalog::_extractBucketingParameters(
+ const NamespaceString& ns,
+ const StringData::ComparatorInterface* comparator,
+ const TimeseriesOptions& options,
+ const BSONObj& doc) const {
+ auto timeElem = doc[options.getTimeField()];
+ if (!timeElem || BSONType::Date != timeElem.type()) {
+ return {ErrorCodes::BadValue,
+ str::stream() << "'" << options.getTimeField() << "' must be present and contain a "
+ << "valid BSON UTC datetime value"};
+ }
+ auto time = timeElem.Date();
+
+ BSONElement metadata;
+ auto metaFieldName = options.getMetaField();
+ if (metaFieldName) {
+ metadata = doc[*metaFieldName];
+ }
+
+ // Buckets are spread across independently-lockable stripes to improve parallelism. We map a
+ // bucket to a stripe by hashing the BucketKey.
+ auto key = BucketKey{ns, BucketMetadata{metadata, comparator}};
+
+ return {std::make_pair(key, time)};
+}
+
+BucketCatalog::StripeNumber BucketCatalog::_getStripeNumber(const BucketKey& key) const {
return key.hash % kNumberOfStripes;
}
@@ -1195,13 +1052,15 @@ BucketCatalog::Bucket* BucketCatalog::_useBucketInState(Stripe* stripe,
return nullptr;
}
-BucketCatalog::Bucket* BucketCatalog::_useOrCreateBucket(Stripe* stripe,
- WithLock stripeLock,
- const CreationInfo& info) {
+BucketCatalog::Bucket* BucketCatalog::_useBucket(Stripe* stripe,
+ WithLock stripeLock,
+ const CreationInfo& info,
+ AllowBucketCreation mode) {
auto it = stripe->openBuckets.find(info.key);
if (it == stripe->openBuckets.end()) {
// No open bucket for this metadata.
- return _allocateBucket(stripe, stripeLock, info);
+ return mode == AllowBucketCreation::kYes ? _allocateBucket(stripe, stripeLock, info)
+ : nullptr;
}
Bucket* bucket = it->second;
@@ -1218,7 +1077,273 @@ BucketCatalog::Bucket* BucketCatalog::_useOrCreateBucket(Stripe* stripe,
nullptr,
getTimeseriesBucketClearedError(bucket->id(), bucket->_ns));
- return _allocateBucket(stripe, stripeLock, info);
+ return mode == AllowBucketCreation::kYes ? _allocateBucket(stripe, stripeLock, info) : nullptr;
+}
+
+StatusWith<std::unique_ptr<BucketCatalog::Bucket>> BucketCatalog::_rehydrateBucket(
+ OperationContext* opCtx,
+ const NamespaceString& ns,
+ const StringData::ComparatorInterface* comparator,
+ const TimeseriesOptions& options,
+ ExecutionStatsController stats,
+ boost::optional<BucketToReopen> bucketToReopen,
+ boost::optional<const BucketKey&> expectedKey) const {
+ if (!bucketToReopen) {
+ // Nothing to rehydrate.
+ return {ErrorCodes::BadValue, "No bucket to rehydrate"};
+ }
+ invariant(feature_flags::gTimeseriesScalabilityImprovements.isEnabled(
+ serverGlobalParams.featureCompatibility));
+ const auto& [bucketDoc, validator] = bucketToReopen.value();
+
+ BSONElement bucketIdElem = bucketDoc.getField(timeseries::kBucketIdFieldName);
+ if (bucketIdElem.eoo() || bucketIdElem.type() != BSONType::jstOID) {
+ return {ErrorCodes::BadValue,
+ str::stream() << timeseries::kBucketIdFieldName
+ << " is missing or not an ObjectId"};
+ }
+
+ // Validate the bucket document against the schema.
+ auto result = validator(opCtx, bucketDoc);
+ if (result.first != Collection::SchemaValidationResult::kPass) {
+ return result.second;
+ }
+
+ BSONElement metadata;
+ auto metaFieldName = options.getMetaField();
+ if (metaFieldName) {
+ metadata = bucketDoc.getField(*metaFieldName);
+ }
+
+ // Buckets are spread across independently-lockable stripes to improve parallelism. We map a
+ // bucket to a stripe by hashing the BucketKey.
+ auto key = BucketKey{ns, BucketMetadata{metadata, comparator}};
+ if (expectedKey.has_value() && key != expectedKey.value()) {
+ return {ErrorCodes::BadValue, "Bucket metadata does not match (hash collision)"};
+ }
+ auto stripeNumber = _getStripeNumber(key);
+
+ auto bucketId = bucketIdElem.OID();
+ std::unique_ptr<Bucket> bucket = std::make_unique<Bucket>(bucketId, stripeNumber, key.hash);
+
+ // Initialize the remaining member variables from the bucket document.
+ bucket->_ns = ns;
+ bucket->_metadata = key.metadata;
+ bucket->_timeField = options.getTimeField().toString();
+ bucket->_size = bucketDoc.objsize();
+ bucket->_minTime = bucketDoc.getObjectField(timeseries::kBucketControlFieldName)
+ .getObjectField(timeseries::kBucketControlMinFieldName)
+ .getField(options.getTimeField())
+ .Date();
+
+ // Populate the top-level data field names.
+ const BSONObj& dataObj = bucketDoc.getObjectField(timeseries::kBucketDataFieldName);
+ for (const BSONElement& dataElem : dataObj) {
+ auto hashedKey = StringSet::hasher().hashed_key(dataElem.fieldName());
+ bucket->_fieldNames.emplace(hashedKey);
+ }
+
+ auto swMinMax = timeseries::generateMinMaxFromBucketDoc(bucketDoc, comparator);
+ if (!swMinMax.isOK()) {
+ return swMinMax.getStatus();
+ }
+ bucket->_minmax = std::move(swMinMax.getValue());
+
+ auto swSchema = timeseries::generateSchemaFromBucketDoc(bucketDoc, comparator);
+ if (!swSchema.isOK()) {
+ return swSchema.getStatus();
+ }
+ bucket->_schema = std::move(swSchema.getValue());
+
+ uint32_t numMeasurements = 0;
+ const bool isCompressed = timeseries::isCompressedBucket(bucketDoc);
+ const BSONElement timeColumnElem = dataObj.getField(options.getTimeField());
+
+ if (isCompressed && timeColumnElem.type() == BSONType::BinData) {
+ BSONColumn storage{timeColumnElem};
+ numMeasurements = storage.size();
+ } else {
+ numMeasurements = timeColumnElem.Obj().nFields();
+ }
+
+ bucket->_numMeasurements = numMeasurements;
+ bucket->_numCommittedMeasurements = numMeasurements;
+
+ return {std::move(bucket)};
+}
+
+BucketCatalog::Bucket* BucketCatalog::_reopenBucket(Stripe* stripe,
+ WithLock stripeLock,
+ ExecutionStatsController stats,
+ const BucketKey& key,
+ std::unique_ptr<Bucket>&& bucket,
+ ClosedBuckets* closedBuckets) {
+ invariant(bucket);
+
+ _expireIdleBuckets(stripe, stripeLock, stats, closedBuckets);
+
+ // If this bucket was archived, we need to remove it from the set of archived buckets.
+ if (auto setIt = stripe->archivedBuckets.find(key.hash);
+ setIt != stripe->archivedBuckets.end()) {
+ auto& archivedSet = setIt->second;
+ if (auto bucketIt = archivedSet.find(bucket->getTime());
+ bucketIt != archivedSet.end() && bucket->id() == bucketIt->second.bucketId) {
+ if (archivedSet.size() == 1) {
+ stripe->archivedBuckets.erase(setIt);
+ } else {
+ archivedSet.erase(bucketIt);
+ }
+ }
+ }
+
+ // We may need to initialize the bucket's state.
+ _initializeBucketState(bucket->id());
+
+ // Pass ownership of the reopened bucket to the bucket catalog.
+ auto [it, inserted] = stripe->allBuckets.try_emplace(bucket->id(), std::move(bucket));
+ tassert(6668200, "Expected bucket to be inserted", inserted);
+ Bucket* unownedBucket = it->second.get();
+
+ // If we already have an open bucket for this key, we need to archive it.
+ if (auto it = stripe->openBuckets.find(key); it != stripe->openBuckets.end()) {
+ stats.incNumBucketsArchivedDueToReopening();
+ if (it->second->allCommitted()) {
+ _archiveBucket(stripe, stripeLock, it->second);
+ } else {
+ it->second->_rolloverAction = RolloverAction::kArchive;
+ }
+ }
+
+ // Now actually mark this bucket as open.
+ stripe->openBuckets[key] = unownedBucket;
+ stats.incNumBucketsReopened();
+
+ return unownedBucket;
+}
+
+StatusWith<BucketCatalog::InsertResult> BucketCatalog::_insert(
+ OperationContext* opCtx,
+ const NamespaceString& ns,
+ const StringData::ComparatorInterface* comparator,
+ const TimeseriesOptions& options,
+ const BSONObj& doc,
+ CombineWithInsertsFromOtherClients combine,
+ AllowBucketCreation mode,
+ boost::optional<BucketToReopen> bucketToReopen) {
+ auto res = _extractBucketingParameters(ns, comparator, options, doc);
+ if (!res.isOK()) {
+ return res.getStatus();
+ }
+ auto& key = res.getValue().first;
+ auto time = res.getValue().second;
+
+ ExecutionStatsController stats = _getExecutionStats(ns);
+
+ // Buckets are spread across independently-lockable stripes to improve parallelism. We map a
+ // bucket to a stripe by hashing the BucketKey.
+ auto stripeNumber = _getStripeNumber(key);
+
+ InsertResult result;
+ CreationInfo info{key, stripeNumber, time, options, stats, &result.closedBuckets};
+
+ auto rehydratedBucket =
+ _rehydrateBucket(opCtx, ns, comparator, options, stats, bucketToReopen, key);
+
+ auto& stripe = _stripes[stripeNumber];
+ stdx::lock_guard stripeLock{stripe.mutex};
+
+ if (rehydratedBucket.isOK()) {
+ invariant(mode == AllowBucketCreation::kYes);
+
+ Bucket* bucket = _reopenBucket(&stripe,
+ stripeLock,
+ stats,
+ key,
+ std::move(rehydratedBucket.getValue()),
+ &result.closedBuckets);
+
+ result.batch = _insertIntoBucket(
+ opCtx, &stripe, stripeLock, doc, combine, mode, &info, bucket, &result.closedBuckets);
+ invariant(result.batch);
+
+ return result;
+ }
+
+ Bucket* bucket = _useBucket(&stripe, stripeLock, info, mode);
+ if (!bucket) {
+ invariant(mode == AllowBucketCreation::kNo);
+ result.candidate = _findArchivedCandidate(stripe, stripeLock, info);
+ return result;
+ }
+
+ result.batch = _insertIntoBucket(
+ opCtx, &stripe, stripeLock, doc, combine, mode, &info, bucket, &result.closedBuckets);
+ if (!result.batch) {
+ invariant(mode == AllowBucketCreation::kNo);
+ result.candidate = _findArchivedCandidate(stripe, stripeLock, info);
+ }
+ return result;
+}
+
+std::shared_ptr<BucketCatalog::WriteBatch> BucketCatalog::_insertIntoBucket(
+ OperationContext* opCtx,
+ Stripe* stripe,
+ WithLock stripeLock,
+ const BSONObj& doc,
+ CombineWithInsertsFromOtherClients combine,
+ AllowBucketCreation mode,
+ CreationInfo* info,
+ Bucket* bucket,
+ ClosedBuckets* closedBuckets) {
+ NewFieldNames newFieldNamesToBeInserted;
+ uint32_t sizeToBeAdded = 0;
+ bucket->_calculateBucketFieldsAndSizeChange(
+ doc, info->options.getMetaField(), &newFieldNamesToBeInserted, &sizeToBeAdded);
+
+ bool isNewlyOpenedBucket = bucket->_ns.isEmpty();
+ if (!isNewlyOpenedBucket) {
+ auto action = _determineRolloverAction(doc, info, bucket, sizeToBeAdded);
+ if (action == RolloverAction::kArchive && mode == AllowBucketCreation::kNo) {
+ // We don't actually want to roll this bucket over yet, bail out.
+ return std::shared_ptr<WriteBatch>{};
+ } else if (action != RolloverAction::kNone) {
+ info->openedDuetoMetadata = false;
+ bucket = _rollover(stripe, stripeLock, bucket, *info, action);
+ isNewlyOpenedBucket = true;
+
+ bucket->_calculateBucketFieldsAndSizeChange(
+ doc, info->options.getMetaField(), &newFieldNamesToBeInserted, &sizeToBeAdded);
+ }
+ }
+
+ auto batch = bucket->_activeBatch(getOpId(opCtx, combine), info->stats);
+ batch->_addMeasurement(doc);
+ batch->_recordNewFields(bucket, std::move(newFieldNamesToBeInserted));
+
+ bucket->_numMeasurements++;
+ bucket->_size += sizeToBeAdded;
+ if (isNewlyOpenedBucket) {
+ // The namespace and metadata only need to be set if this bucket was newly created.
+ bucket->_ns = info->key.ns;
+ bucket->_metadata = info->key.metadata;
+
+ // The namespace is stored two times: the bucket itself and openBuckets.
+ // We don't have a great approximation for the
+ // _schema size, so we use initial document size minus metadata as an approximation. Since
+ // the metadata itself is stored once, in the bucket, we can combine the two and just use
+ // the initial document size. A unique pointer to the bucket is stored once: allBuckets. A
+ // raw pointer to the bucket is stored at most twice: openBuckets, idleBuckets.
+ bucket->_memoryUsage += (info->key.ns.size() * 2) + doc.objsize() + sizeof(Bucket) +
+ sizeof(std::unique_ptr<Bucket>) + (sizeof(Bucket*) * 2);
+
+ bucket->_schema.update(
+ doc, info->options.getMetaField(), info->key.metadata.getComparator());
+ } else {
+ _memoryUsage.fetchAndSubtract(bucket->_memoryUsage);
+ }
+ _memoryUsage.fetchAndAdd(bucket->_memoryUsage);
+
+ return batch;
}
void BucketCatalog::_waitToCommitBatch(Stripe* stripe, const std::shared_ptr<WriteBatch>& batch) {
@@ -1294,6 +1419,36 @@ void BucketCatalog::_archiveBucket(Stripe* stripe, WithLock stripeLock, Bucket*
_removeBucket(stripe, stripeLock, bucket, archived);
}
+boost::optional<OID> BucketCatalog::_findArchivedCandidate(const Stripe& stripe,
+ WithLock stripeLock,
+ const CreationInfo& info) const {
+ const auto setIt = stripe.archivedBuckets.find(info.key.hash);
+ if (setIt == stripe.archivedBuckets.end()) {
+ return boost::none;
+ }
+
+ const auto& archivedSet = setIt->second;
+
+ // We want to find the largest time that is not greater than info.time. Generally lower_bound
+ // will return the smallest element not less than the search value, but we are using
+ // std::greater instead of std::less for the map's comparisons. This means the order of keys
+ // will be reversed, and lower_bound will return what we want.
+ auto it = archivedSet.lower_bound(info.time);
+ if (it == archivedSet.end()) {
+ return boost::none;
+ }
+
+ const auto& [candidateTime, candidateBucket] = *it;
+ invariant(candidateTime <= info.time);
+ // We need to make sure our measurement can fit without violating max span. If not, we
+ // can't use this bucket.
+ if (info.time - candidateTime < Seconds(*info.options.getBucketMaxSpanSeconds())) {
+ return candidateBucket.bucketId;
+ }
+
+ return boost::none;
+}
+
void BucketCatalog::_abort(Stripe* stripe,
WithLock stripeLock,
std::shared_ptr<WriteBatch> batch,
@@ -1436,10 +1591,78 @@ BucketCatalog::Bucket* BucketCatalog::_allocateBucket(Stripe* stripe,
auto controlDoc = buildControlMinTimestampDoc(info.options.getTimeField(), roundedTime);
bucket->_minmax.update(
controlDoc, bucket->_metadata.getMetaField(), bucket->_metadata.getComparator());
-
return bucket;
}
+BucketCatalog::RolloverAction BucketCatalog::_determineRolloverAction(const BSONObj& doc,
+ CreationInfo* info,
+ Bucket* bucket,
+ uint32_t sizeToBeAdded) {
+ const bool canArchive = feature_flags::gTimeseriesScalabilityImprovements.isEnabled(
+ serverGlobalParams.featureCompatibility);
+
+ auto bucketTime = bucket->getTime();
+ if (info->time - bucketTime >= Seconds(*info->options.getBucketMaxSpanSeconds())) {
+ if (canArchive) {
+ info->stats.incNumBucketsArchivedDueToTimeForward();
+ return RolloverAction::kArchive;
+ } else {
+ info->stats.incNumBucketsClosedDueToTimeForward();
+ return RolloverAction::kClose;
+ }
+ }
+ if (info->time < bucketTime) {
+ if (canArchive) {
+ info->stats.incNumBucketsArchivedDueToTimeBackward();
+ return RolloverAction::kArchive;
+ } else {
+ info->stats.incNumBucketsClosedDueToTimeBackward();
+ return RolloverAction::kClose;
+ }
+ }
+ if (bucket->_numMeasurements == static_cast<std::uint64_t>(gTimeseriesBucketMaxCount)) {
+ info->stats.incNumBucketsClosedDueToCount();
+ return RolloverAction::kClose;
+ }
+ if (bucket->schemaIncompatible(
+ doc, info->options.getMetaField(), info->key.metadata.getComparator())) {
+ info->stats.incNumBucketsClosedDueToSchemaChange();
+ return RolloverAction::kClose;
+ }
+ if (bucket->_size + sizeToBeAdded > static_cast<std::uint64_t>(gTimeseriesBucketMaxSize)) {
+ bool keepBucketOpenForLargeMeasurements =
+ bucket->_numMeasurements < static_cast<std::uint64_t>(gTimeseriesBucketMinCount) &&
+ feature_flags::gTimeseriesScalabilityImprovements.isEnabled(
+ serverGlobalParams.featureCompatibility);
+ if (keepBucketOpenForLargeMeasurements) {
+ // Instead of packing the bucket to the BSON size limit, 16MB, we'll limit the max
+ // bucket size to 12MB. This is to leave some space in the bucket if we need to add
+ // new internal fields to existing, full buckets.
+ static constexpr size_t largeMeasurementsMaxBucketSize =
+ BSONObjMaxUserSize - (4 * 1024 * 1024);
+
+ if (bucket->_size + sizeToBeAdded > largeMeasurementsMaxBucketSize) {
+ info->stats.incNumBucketsClosedDueToSize();
+ return RolloverAction::kClose;
+ }
+
+ // There's enough space to add this measurement and we're still below the large
+ // measurement threshold.
+ if (!bucket->_keptOpenDueToLargeMeasurements) {
+ // Only increment this metric once per bucket.
+ bucket->_keptOpenDueToLargeMeasurements = true;
+ info->stats.incNumBucketsKeptOpenDueToLargeMeasurements();
+ }
+ return RolloverAction::kNone;
+ } else {
+ info->stats.incNumBucketsClosedDueToSize();
+ return RolloverAction::kClose;
+ }
+ }
+
+ return RolloverAction::kNone;
+}
+
BucketCatalog::Bucket* BucketCatalog::_rollover(Stripe* stripe,
WithLock stripeLock,
Bucket* bucket,
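Before moving on to the header changes, a standalone illustration (not part of this patch) of the descending-key map that _findArchivedCandidate relies on: with std::greater as the comparator, lower_bound(t) returns the first key not greater than t, i.e. the newest archived bucket at or before the incoming measurement's time. Plain ints stand in for Date_t and strings for ArchivedBucket here.

// Standalone illustration of the inverted-comparator lookup; compiles on its own.
#include <cassert>
#include <functional>
#include <map>
#include <string>

int main() {
    std::map<int, std::string, std::greater<int>> archivedSet{
        {100, "bucketA"}, {200, "bucketB"}, {300, "bucketC"}};

    auto it = archivedSet.lower_bound(250);  // first key <= 250
    assert(it != archivedSet.end() && it->first == 200 && it->second == "bucketB");

    it = archivedSet.lower_bound(50);  // no archived bucket at or before t=50
    assert(it == archivedSet.end());
    return 0;
}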
diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h
index c2a82039ee8..f5a5adb1a12 100644
--- a/src/mongo/db/timeseries/bucket_catalog.h
+++ b/src/mongo/db/timeseries/bucket_catalog.h
@@ -34,6 +34,7 @@
#include <queue>
#include "mongo/bson/unordered_fields_bsonobj_comparator.h"
+#include "mongo/db/catalog/collection.h"
#include "mongo/db/ops/single_write_result_gen.h"
#include "mongo/db/service_context.h"
#include "mongo/db/timeseries/flat_bson.h"
@@ -70,6 +71,7 @@ class BucketCatalog {
AtomicWord<long long> numBucketsArchivedDueToTimeForward;
AtomicWord<long long> numBucketsArchivedDueToTimeBackward;
AtomicWord<long long> numBucketsArchivedDueToMemoryThreshold;
+ AtomicWord<long long> numBucketsArchivedDueToReopening;
AtomicWord<long long> numCommits;
AtomicWord<long long> numWaits;
AtomicWord<long long> numMeasurementsCommitted;
@@ -83,6 +85,8 @@ class BucketCatalog {
ExecutionStats* globalStats)
: _collectionStats(collectionStats), _globalStats(globalStats) {}
+ ExecutionStatsController() = delete;
+
void incNumBucketInserts(long long increment = 1);
void incNumBucketUpdates(long long increment = 1);
void incNumBucketsOpenedDueToMetadata(long long increment = 1);
@@ -95,6 +99,7 @@ class BucketCatalog {
void incNumBucketsArchivedDueToTimeForward(long long increment = 1);
void incNumBucketsArchivedDueToTimeBackward(long long increment = 1);
void incNumBucketsArchivedDueToMemoryThreshold(long long increment = 1);
+ void incNumBucketsArchivedDueToReopening(long long increment = 1);
void incNumCommits(long long increment = 1);
void incNumWaits(long long increment = 1);
void incNumMeasurementsCommitted(long long increment = 1);
@@ -230,6 +235,19 @@ public:
struct InsertResult {
std::shared_ptr<WriteBatch> batch;
ClosedBuckets closedBuckets;
+ boost::optional<OID> candidate;
+ };
+
+ /**
+ * Function that should run validation against the bucket to ensure it's a proper bucket
+ * document. Typically, this should execute Collection::checkValidation.
+ */
+ using BucketDocumentValidator =
+ std::function<std::pair<Collection::SchemaValidationResult, Status>(OperationContext*,
+ const BSONObj&)>;
+ struct BucketToReopen {
+ BSONObj bucketDocument;
+ BucketDocumentValidator validator;
};
static BucketCatalog& get(ServiceContext* svcCtx);
@@ -258,17 +276,46 @@ public:
BSONObj getMetadata(const BucketHandle& bucket) const;
/**
+ * Tries to insert 'doc' into a suitable bucket. If an open bucket is full (or has incompatible
+ * schema), but is otherwise suitable, we will close it and open a new bucket. If we find no
+ * bucket with matching data and a time range that can accommodate 'doc', we will not open a new
+ * bucket, but rather let the caller know to search for an archived or closed bucket that can
+ * accommodate 'doc'.
+ *
+ * If a suitable bucket is found or opened, returns the WriteBatch into which 'doc' was
+ * inserted and a list of any buckets that were closed to make space to insert 'doc'. Any
+ * caller who receives the same batch may commit or abort the batch after claiming commit
+ * rights. See WriteBatch for more details.
+ *
+ * If no suitable bucket is found or opened, returns an optional bucket ID. If set, the bucket
+ * ID corresponds to an archived bucket which should be fetched; otherwise the caller should
+ * search for a previously-closed bucket that can accommodate 'doc'. The caller should proceed to
+ * call 'insert' to insert 'doc', passing any fetched bucket.
+ */
+ StatusWith<InsertResult> tryInsert(OperationContext* opCtx,
+ const NamespaceString& ns,
+ const StringData::ComparatorInterface* comparator,
+ const TimeseriesOptions& options,
+ const BSONObj& doc,
+ CombineWithInsertsFromOtherClients combine);
+
+ /**
* Returns the WriteBatch into which the document was inserted and a list of any buckets that
* were closed in order to make space to insert the document. Any caller who receives the same
* batch may commit or abort the batch after claiming commit rights. See WriteBatch for more
* details.
+ *
+ * If 'bucketToReopen' is passed, we will reopen that bucket and attempt to add 'doc' to that
+ * bucket. Otherwise we will attempt to find a suitable open bucket, or open a new bucket if
+ * none exists.
*/
StatusWith<InsertResult> insert(OperationContext* opCtx,
const NamespaceString& ns,
const StringData::ComparatorInterface* comparator,
const TimeseriesOptions& options,
const BSONObj& doc,
- CombineWithInsertsFromOtherClients combine);
+ CombineWithInsertsFromOtherClients combine,
+ boost::optional<BucketToReopen> bucketToReopen = boost::none);
/**
* Prepares a batch for commit, transitioning it to an inactive state. Caller must already have
@@ -343,6 +390,7 @@ private:
BucketMetadata(BSONElement elem, const StringData::ComparatorInterface* comparator);
bool operator==(const BucketMetadata& other) const;
+ bool operator!=(const BucketMetadata& other) const;
const BSONObj& toBSON() const;
@@ -384,6 +432,9 @@ private:
bool operator==(const BucketKey& other) const {
return ns == other.ns && metadata == other.metadata;
}
+ bool operator!=(const BucketKey& other) const {
+ return !(*this == other);
+ }
template <typename H>
friend H AbslHashValue(H h, const BucketKey& key) {
@@ -438,11 +489,28 @@ private:
// Buckets that are not currently in the catalog, but which are eligible to receive more
// measurements. The top-level map is keyed by the hash of the BucketKey, while the stored
// map is keyed by the bucket's minimum timestamp.
- stdx::unordered_map<BucketKey::Hash, std::map<Date_t, ArchivedBucket>, PreHashed>
+ //
+ // We invert the key comparison in the inner map so that we can use lower_bound to
+ // efficiently find an archived bucket that is a candidate for an incoming measurement.
+ stdx::unordered_map<BucketKey::Hash,
+ std::map<Date_t, ArchivedBucket, std::greater<Date_t>>,
+ PreHashed>
archivedBuckets;
};
- StripeNumber _getStripeNumber(const BucketKey& key);
+ /**
+ * Extracts the information from the input 'doc' that is used to map the document to a bucket.
+ */
+ StatusWith<std::pair<BucketKey, Date_t>> _extractBucketingParameters(
+ const NamespaceString& ns,
+ const StringData::ComparatorInterface* comparator,
+ const TimeseriesOptions& options,
+ const BSONObj& doc) const;
+
+ /**
+ * Maps bucket key to the stripe that is responsible for it.
+ */
+ StripeNumber _getStripeNumber(const BucketKey& key) const;
/**
* Mode enum to control whether the bucket retrieval methods below will return buckets that are
@@ -475,9 +543,72 @@ private:
BucketState targetState);
/**
- * Retrieve a bucket for write use, or create one if a suitable bucket doesn't already exist.
+ * Mode enum to control whether the bucket retrieval methods below will create new buckets if no
+ * suitable bucket exists.
+ */
+ enum class AllowBucketCreation { kYes, kNo };
+
+ /**
+ * Retrieve a bucket for write use if one exists. If none exists and 'mode' is set to kYes, then
+ * we will create a new bucket.
*/
- Bucket* _useOrCreateBucket(Stripe* stripe, WithLock stripeLock, const CreationInfo& info);
+ Bucket* _useBucket(Stripe* stripe,
+ WithLock stripeLock,
+ const CreationInfo& info,
+ AllowBucketCreation mode);
+
+ /**
+ * Given a bucket to reopen, performs validation and constructs the in-memory representation of
+ * the bucket. If specified, 'expectedKey' is matched against the key extracted from the
+ * document to validate that the bucket is expected (i.e. to help resolve hash collisions for
+ * archived buckets). Does *not* hand ownership of the bucket to the catalog.
+ */
+ StatusWith<std::unique_ptr<Bucket>> _rehydrateBucket(
+ OperationContext* opCtx,
+ const NamespaceString& ns,
+ const StringData::ComparatorInterface* comparator,
+ const TimeseriesOptions& options,
+ ExecutionStatsController stats,
+ boost::optional<BucketToReopen> bucketToReopen,
+ boost::optional<const BucketKey&> expectedKey) const;
+
+ /**
+ * Given a rehydrated 'bucket', passes ownership of that bucket to the catalog, marking the
+ * bucket as open.
+ */
+ Bucket* _reopenBucket(Stripe* stripe,
+ WithLock stripeLock,
+ ExecutionStatsController stats,
+ const BucketKey& key,
+ std::unique_ptr<Bucket>&& bucket,
+ ClosedBuckets* closedBuckets);
+
+ /**
+ * Helper method to perform the heavy lifting for both 'tryInsert' and 'insert'. See
+ * documentation on callers for more details.
+ */
+ StatusWith<InsertResult> _insert(OperationContext* opCtx,
+ const NamespaceString& ns,
+ const StringData::ComparatorInterface* comparator,
+ const TimeseriesOptions& options,
+ const BSONObj& doc,
+ CombineWithInsertsFromOtherClients combine,
+ AllowBucketCreation mode,
+ boost::optional<BucketToReopen> bucketToReopen = boost::none);
+
+ /**
+ * Given an already-selected 'bucket', inserts 'doc' to the bucket if possible. If not, and
+ * 'mode' is set to 'kYes', we will create a new bucket and insert into that bucket.
+ */
+ std::shared_ptr<WriteBatch> _insertIntoBucket(OperationContext* opCtx,
+ Stripe* stripe,
+ WithLock stripeLock,
+ const BSONObj& doc,
+ CombineWithInsertsFromOtherClients combine,
+ AllowBucketCreation mode,
+ CreationInfo* info,
+ Bucket* bucket,
+ ClosedBuckets* closedBuckets);
/**
* Wait for other batches to finish so we can prepare 'batch'
@@ -496,6 +627,14 @@ private:
void _archiveBucket(Stripe* stripe, WithLock stripeLock, Bucket* bucket);
/**
+ * Identifies a previously archived bucket that may be able to accommodate the measurement
+ * represented by 'info', if one exists.
+ */
+ boost::optional<OID> _findArchivedCandidate(const Stripe& stripe,
+ WithLock stripeLock,
+ const CreationInfo& info) const;
+
+ /**
* Aborts 'batch', and if the corresponding bucket still exists, proceeds to abort any other
* unprepared batches and remove the bucket from the catalog if there is no unprepared batch.
*/
@@ -546,6 +685,15 @@ private:
enum class RolloverAction { kNone, kArchive, kClose };
/**
+ * Determines if 'bucket' needs to be rolled over to accommodate 'doc'. If so, determines whether
+ * to archive or close 'bucket'.
+ */
+ RolloverAction _determineRolloverAction(const BSONObj& doc,
+ CreationInfo* info,
+ Bucket* bucket,
+ uint32_t sizeToBeAdded);
+
+ /**
* Close the existing, full bucket and open a new one for the same metadata.
*
* Writes information about the closed bucket to the 'info' parameter.
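Before the test changes, one note on the BucketDocumentValidator alias declared earlier in this header: any callable with the matching signature works. A minimal pass-through sketch follows (illustrative only; production callers should delegate to Collection::checkValidation as the comment above the alias recommends).

// Illustrative only: a permissive validator matching BucketDocumentValidator.
BucketCatalog::BucketDocumentValidator permissiveValidator =
    [](OperationContext*, const BSONObj&)
        -> std::pair<Collection::SchemaValidationResult, Status> {
    return {Collection::SchemaValidationResult::kPass, Status::OK()};
};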
diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp
index c6e91d25b53..045729e31b0 100644
--- a/src/mongo/db/timeseries/bucket_catalog_test.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp
@@ -27,6 +27,8 @@
* it in the license file.
*/
+#include <boost/optional/optional_io.hpp>
+
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/catalog/catalog_test_fixture.h"
#include "mongo/db/catalog/create_collection.h"
@@ -48,6 +50,7 @@ constexpr StringData kNumBucketsReopened = "numBucketsReopened"_sd;
constexpr StringData kNumArchivedDueToTimeForward = "numBucketsArchivedDueToTimeForward"_sd;
constexpr StringData kNumArchivedDueToTimeBackward = "numBucketsArchivedDueToTimeBackward"_sd;
constexpr StringData kNumArchivedDueToMemoryThreshold = "numBucketsArchivedDueToMemoryThreshold"_sd;
+constexpr StringData kNumArchivedDueToReopening = "numBucketsArchivedDueToReopening"_sd;
constexpr StringData kNumClosedDueToTimeForward = "numBucketsClosedDueToTimeForward"_sd;
constexpr StringData kNumClosedDueToTimeBackward = "numBucketsClosedDueToTimeBackward"_sd;
constexpr StringData kNumClosedDueToMemoryThreshold = "numBucketsClosedDueToMemoryThreshold"_sd;
@@ -983,6 +986,9 @@ TEST_F(BucketCatalogTest, SchemaChanges) {
}
TEST_F(BucketCatalogTest, ReopenMalformedBucket) {
+ RAIIServerParameterControllerForTest featureFlag{"featureFlagTimeseriesScalabilityImprovements",
+ true};
+
BSONObj bucketDoc = ::mongo::fromjson(
R"({"_id":{"$oid":"629e1e680958e279dc29a517"},
"control":{"version":1,"min":{"time":{"$date":"2022-06-06T15:34:00.000Z"},"a":1,"b":1},
@@ -1074,6 +1080,9 @@ TEST_F(BucketCatalogTest, ReopenMalformedBucket) {
}
TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertCompatibleMeasurement) {
+ RAIIServerParameterControllerForTest featureFlag{"featureFlagTimeseriesScalabilityImprovements",
+ true};
+
// Bucket document to reopen.
BSONObj bucketDoc = ::mongo::fromjson(
R"({"_id":{"$oid":"629e1e680958e279dc29a517"},
@@ -1124,6 +1133,9 @@ TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertCompatibleMeasurement
}
TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertIncompatibleMeasurement) {
+ RAIIServerParameterControllerForTest featureFlag{"featureFlagTimeseriesScalabilityImprovements",
+ true};
+
// Bucket document to reopen.
BSONObj bucketDoc = ::mongo::fromjson(
R"({"_id":{"$oid":"629e1e680958e279dc29a517"},
@@ -1168,6 +1180,9 @@ TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertIncompatibleMeasureme
}
TEST_F(BucketCatalogTest, ReopenCompressedBucketAndInsertCompatibleMeasurement) {
+ RAIIServerParameterControllerForTest featureFlag{"featureFlagTimeseriesScalabilityImprovements",
+ true};
+
// Bucket document to reopen.
BSONObj bucketDoc = ::mongo::fromjson(
R"({"_id":{"$oid":"629e1e680958e279dc29a517"},
@@ -1227,6 +1242,9 @@ TEST_F(BucketCatalogTest, ReopenCompressedBucketAndInsertCompatibleMeasurement)
}
TEST_F(BucketCatalogTest, ReopenCompressedBucketAndInsertIncompatibleMeasurement) {
+ RAIIServerParameterControllerForTest featureFlag{"featureFlagTimeseriesScalabilityImprovements",
+ true};
+
// Bucket document to reopen.
BSONObj bucketDoc = ::mongo::fromjson(
R"({"_id":{"$oid":"629e1e680958e279dc29a517"},
@@ -1430,5 +1448,208 @@ TEST_F(BucketCatalogTest, ArchivingUnderMemoryPressure) {
ASSERT_LT(numClosedInFirstRound, _getExecutionStat(_ns1, kNumClosedDueToMemoryThreshold));
}
+TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) {
+ RAIIServerParameterControllerForTest controller{"featureFlagTimeseriesScalabilityImprovements",
+ true};
+ AutoGetCollection autoColl(_opCtx, _ns1.makeTimeseriesBucketsNamespace(), MODE_IX);
+
+ // Try to insert with no open bucket. Should hint to re-open.
+ auto result = _bucketCatalog->tryInsert(
+ _opCtx,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"}})"),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
+ ASSERT_OK(result.getStatus());
+ ASSERT(result.getValue().closedBuckets.empty());
+ ASSERT(!result.getValue().batch);
+ ASSERT_EQ(result.getValue().candidate, boost::none);
+
+ // Actually insert so we do have an open bucket to test against.
+ result = _bucketCatalog->insert(
+ _opCtx,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"}})"),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
+ ASSERT_OK(result.getStatus());
+ auto batch = result.getValue().batch;
+ ASSERT(batch);
+ auto bucketId = batch->bucket().id;
+ ASSERT(batch->claimCommitRights());
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch));
+ ASSERT_EQ(batch->measurements().size(), 1);
+ _bucketCatalog->finish(batch, {});
+
+ // Time backwards should hint to re-open.
+ result = _bucketCatalog->tryInsert(
+ _opCtx,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ ::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:34:40.000Z"}})"),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
+ ASSERT_OK(result.getStatus());
+ ASSERT(result.getValue().closedBuckets.empty());
+ ASSERT(!result.getValue().batch);
+ ASSERT_EQ(result.getValue().candidate, boost::none);
+
+ // So should time forward.
+ result = _bucketCatalog->tryInsert(
+ _opCtx,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ ::mongo::fromjson(R"({"time":{"$date":"2022-06-07T15:34:40.000Z"}})"),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
+ ASSERT_OK(result.getStatus());
+ ASSERT(result.getValue().closedBuckets.empty());
+ ASSERT(!result.getValue().batch);
+ ASSERT_EQ(result.getValue().candidate, boost::none);
+
+ // Now let's insert something so we archive the existing bucket.
+ result = _bucketCatalog->insert(
+ _opCtx,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ ::mongo::fromjson(R"({"time":{"$date":"2022-06-07T15:34:40.000Z"}})"),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
+ ASSERT_OK(result.getStatus());
+ batch = result.getValue().batch;
+ ASSERT_NE(batch->bucket().id, bucketId);
+ ASSERT(batch);
+ ASSERT(batch->claimCommitRights());
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch));
+ ASSERT_EQ(batch->measurements().size(), 1);
+ _bucketCatalog->finish(batch, {});
+
+ // If we try to insert something that could fit in the archived bucket, we should get it back as
+ // a candidate.
+ result = _bucketCatalog->tryInsert(
+ _opCtx,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:35:40.000Z"}})"),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
+ ASSERT_OK(result.getStatus());
+ ASSERT(result.getValue().closedBuckets.empty());
+ ASSERT(!result.getValue().batch);
+ ASSERT(result.getValue().candidate.has_value());
+ ASSERT_EQ(result.getValue().candidate.value(), bucketId);
+}
+
+TEST_F(BucketCatalogTest, TryInsertWillCreateBucketIfWeWouldCloseExistingBucket) {
+ RAIIServerParameterControllerForTest controller{"featureFlagTimeseriesScalabilityImprovements",
+ true};
+ AutoGetCollection autoColl(_opCtx, _ns1.makeTimeseriesBucketsNamespace(), MODE_IX);
+
+ // Insert a document so we have a base bucket
+ auto result = _bucketCatalog->insert(
+ _opCtx,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"}, "a": true})"),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
+ ASSERT_OK(result.getStatus());
+ auto batch = result.getValue().batch;
+ ASSERT(batch);
+ auto bucketId = batch->bucket().id;
+ ASSERT(batch->claimCommitRights());
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch));
+ ASSERT_EQ(batch->measurements().size(), 1);
+ _bucketCatalog->finish(batch, {});
+
+ // Incompatible schema would close the existing bucket, so we should expect to open a new bucket
+ // and proceed to insert the document.
+ result = _bucketCatalog->tryInsert(
+ _opCtx,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:35:40.000Z"}, "a": {}})"),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
+ ASSERT_OK(result.getStatus());
+ batch = result.getValue().batch;
+ ASSERT(batch);
+ ASSERT_NE(batch->bucket().id, bucketId);
+ ASSERT(batch->claimCommitRights());
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch));
+ ASSERT_EQ(batch->measurements().size(), 1);
+ _bucketCatalog->finish(batch, {});
+}
+
+TEST_F(BucketCatalogTest, InsertIntoReopenedBucket) {
+ RAIIServerParameterControllerForTest controller{"featureFlagTimeseriesScalabilityImprovements",
+ true};
+ AutoGetCollection autoColl(_opCtx, _ns1.makeTimeseriesBucketsNamespace(), MODE_IX);
+
+ // Insert a document so we have a base bucket and we can test that we archive it when we reopen
+ // a conflicting bucket.
+ auto result = _bucketCatalog->insert(
+ _opCtx,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ ::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:34:40.000Z"}})"),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
+ ASSERT_OK(result.getStatus());
+ auto batch = result.getValue().batch;
+ ASSERT(batch);
+ auto oldBucketId = batch->bucket().id;
+ ASSERT(batch->claimCommitRights());
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch));
+ ASSERT_EQ(batch->measurements().size(), 1);
+ _bucketCatalog->finish(batch, {});
+
+ BSONObj bucketDoc = ::mongo::fromjson(
+ R"({"_id":{"$oid":"629e1e680958e279dc29a517"},
+ "control":{"version":1,"min":{"time":{"$date":"2022-06-06T15:34:00.000Z"}},
+ "max":{"time":{"$date":"2022-06-06T15:34:30.000Z"}}},
+ "data":{"time":{"0":{"$date":"2022-06-06T15:34:30.000Z"}}}})");
+ ASSERT_NE(bucketDoc["_id"].OID(), oldBucketId);
+ auto validator = [&](OperationContext * opCtx, const BSONObj& bucketDoc) -> auto {
+ return autoColl->checkValidation(opCtx, bucketDoc);
+ };
+
+ // We should be able to pass in a valid bucket and insert into it.
+ result = _bucketCatalog->insert(
+ _opCtx,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:35:40.000Z"}})"),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kAllow,
+ BucketCatalog::BucketToReopen{bucketDoc, validator});
+ ASSERT_OK(result.getStatus());
+ batch = result.getValue().batch;
+ ASSERT(batch);
+ ASSERT_EQ(batch->bucket().id, bucketDoc["_id"].OID());
+ ASSERT(batch->claimCommitRights());
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch));
+ ASSERT_EQ(batch->measurements().size(), 1);
+ _bucketCatalog->finish(batch, {});
+ ASSERT_EQ(1, _getExecutionStat(_ns1, kNumArchivedDueToReopening));
+ ASSERT_EQ(1, _getExecutionStat(_ns1, kNumBucketsReopened));
+
+ // Verify the old bucket was archived and we'll get it back as a candidate.
+ result = _bucketCatalog->tryInsert(
+ _opCtx,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ ::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:35:40.000Z"}})"),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
+ ASSERT_OK(result.getStatus());
+ ASSERT(result.getValue().closedBuckets.empty());
+ ASSERT(!result.getValue().batch);
+ ASSERT(result.getValue().candidate.has_value());
+ ASSERT_EQ(result.getValue().candidate.value(), oldBucketId);
+}
+
} // namespace
} // namespace mongo