author     Dan Larkin-York <dan.larkin-york@mongodb.com>      2022-06-24 20:39:58 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>   2022-06-24 21:35:53 +0000
commit     11aa03c2de2d008bb87264a24c225ff7da4e1a68 (patch)
tree       7d5765b2fdc4207ca7ac2d0cd91e6520012992c0 /src/mongo/db/timeseries
parent     b05965036e8659ea4ef5a312dca8d12761c17a33 (diff)
SERVER-67262 Refactor BucketCatalog APIs to allow for bucket re-opening
Diffstat (limited to 'src/mongo/db/timeseries')
 src/mongo/db/timeseries/bucket_catalog.cpp      | 651
 src/mongo/db/timeseries/bucket_catalog.h        | 158
 src/mongo/db/timeseries/bucket_catalog_test.cpp | 221
 3 files changed, 811 insertions(+), 219 deletions(-)
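Note on the new API shape: this commit splits the old monolithic BucketCatalog::insert path into tryInsert (which never allocates a bucket and may instead return the OID of an archived candidate) and insert (which accepts an optional BucketToReopen built from a fetched bucket document). The following is a minimal caller-side sketch of how the two calls are meant to be chained, mirroring the contract documented on tryInsert in bucket_catalog.h and exercised by the InsertIntoReopenedBucket test below. It is not code from this commit, and fetchBucketDocById is a hypothetical helper assumed here purely for illustration.

#include "mongo/db/catalog/collection.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/timeseries/bucket_catalog.h"

namespace mongo {

// Hypothetical helper (not part of this commit): look up the candidate bucket
// document in the underlying system.buckets collection by _id.
boost::optional<BSONObj> fetchBucketDocById(OperationContext* opCtx,
                                            const Collection* bucketsColl,
                                            const OID& bucketId);

StatusWith<BucketCatalog::InsertResult> insertWithReopening(OperationContext* opCtx,
                                                            BucketCatalog& catalog,
                                                            const Collection* bucketsColl,
                                                            const NamespaceString& ns,
                                                            const TimeseriesOptions& options,
                                                            const BSONObj& doc) {
    // First pass: never allocates a new bucket; may return an archived-bucket candidate.
    auto swResult = catalog.tryInsert(opCtx,
                                      ns,
                                      bucketsColl->getDefaultCollator(),
                                      options,
                                      doc,
                                      BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
    if (!swResult.isOK() || swResult.getValue().batch) {
        // Either an error, or the measurement already landed in an open bucket.
        return swResult;
    }

    boost::optional<BucketCatalog::BucketToReopen> bucketToReopen;
    if (auto candidateId = swResult.getValue().candidate) {
        // tryInsert identified an archived bucket that can accommodate 'doc';
        // fetch its document so the catalog can rehydrate and reopen it.
        if (auto bucketDoc = fetchBucketDocById(opCtx, bucketsColl, *candidateId)) {
            auto validator = [&](OperationContext* validationCtx, const BSONObj& toValidate) {
                return bucketsColl->checkValidation(validationCtx, toValidate);
            };
            bucketToReopen = BucketCatalog::BucketToReopen{*bucketDoc, validator};
        }
    }

    // Second pass: may reopen the fetched bucket, or fall back to an open or new bucket.
    return catalog.insert(opCtx,
                          ns,
                          bucketsColl->getDefaultCollator(),
                          options,
                          doc,
                          BucketCatalog::CombineWithInsertsFromOtherClients::kAllow,
                          bucketToReopen);
}

}  // namespace mongo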
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp index 41148c11dd0..fb506164f75 100644 --- a/src/mongo/db/timeseries/bucket_catalog.cpp +++ b/src/mongo/db/timeseries/bucket_catalog.cpp @@ -279,6 +279,12 @@ void BucketCatalog::ExecutionStatsController::incNumBucketsArchivedDueToMemoryTh _globalStats->numBucketsArchivedDueToMemoryThreshold.fetchAndAddRelaxed(increment); } +void BucketCatalog::ExecutionStatsController::incNumBucketsArchivedDueToReopening( + long long increment) { + _collectionStats->numBucketsArchivedDueToReopening.fetchAndAddRelaxed(increment); + _globalStats->numBucketsArchivedDueToReopening.fetchAndAddRelaxed(increment); +} + void BucketCatalog::ExecutionStatsController::incNumCommits(long long increment) { _collectionStats->numCommits.fetchAndAddRelaxed(increment); _globalStats->numCommits.fetchAndAddRelaxed(increment); @@ -664,92 +670,40 @@ Status BucketCatalog::reopenBucket(OperationContext* opCtx, str::stream() << "Attempting to reopen a bucket for a non-timeseries collection: " << ns); - BSONElement bucketIdElem = bucketDoc.getField(timeseries::kBucketIdFieldName); - if (bucketIdElem.eoo() || bucketIdElem.type() != BSONType::jstOID) { - return {ErrorCodes::BadValue, - str::stream() << timeseries::kBucketIdFieldName - << " is missing or not an ObjectId"}; - } - - // Validate the bucket document against the schema. - auto result = coll->checkValidation(opCtx, bucketDoc); - if (result.first != Collection::SchemaValidationResult::kPass) { - return result.second; - } - BSONElement metadata; auto metaFieldName = options->getMetaField(); if (metaFieldName) { metadata = bucketDoc.getField(*metaFieldName); } - - // Buckets are spread across independently-lockable stripes to improve parallelism. We map a - // bucket to a stripe by hashing the BucketKey. auto key = BucketKey{ns, BucketMetadata{metadata, coll->getDefaultCollator()}}; - auto stripeNumber = _getStripeNumber(key); - - auto bucketId = bucketIdElem.OID(); - std::unique_ptr<Bucket> bucket = std::make_unique<Bucket>(bucketId, stripeNumber, key.hash); - // Initialize the remaining member variables from the bucket document. - bucket->_ns = ns; - bucket->_metadata = key.metadata; - bucket->_timeField = options->getTimeField().toString(); - bucket->_size = bucketDoc.objsize(); - bucket->_minTime = bucketDoc.getObjectField(timeseries::kBucketControlFieldName) - .getObjectField(timeseries::kBucketControlMinFieldName) - .getField(options->getTimeField()) - .Date(); - - // Populate the top-level data field names. - const BSONObj& dataObj = bucketDoc.getObjectField(timeseries::kBucketDataFieldName); - for (const BSONElement& dataElem : dataObj) { - auto hashedKey = StringSet::hasher().hashed_key(dataElem.fieldName()); - bucket->_fieldNames.emplace(hashedKey); - } - - auto swMinMax = timeseries::generateMinMaxFromBucketDoc(bucketDoc, coll->getDefaultCollator()); - if (!swMinMax.isOK()) { - return swMinMax.getStatus(); - } - bucket->_minmax = std::move(swMinMax.getValue()); - - auto swSchema = timeseries::generateSchemaFromBucketDoc(bucketDoc, coll->getDefaultCollator()); - if (!swSchema.isOK()) { - return swSchema.getStatus(); - } - bucket->_schema = std::move(swSchema.getValue()); + // Validate the bucket document against the schema. 
+ auto validator = [&](OperationContext * opCtx, const BSONObj& bucketDoc) -> auto { + return coll->checkValidation(opCtx, bucketDoc); + }; - uint32_t numMeasurements = 0; - const bool isCompressed = timeseries::isCompressedBucket(bucketDoc); - const BSONElement timeColumnElem = dataObj.getField(options->getTimeField()); + auto stats = _getExecutionStats(ns); - if (isCompressed && timeColumnElem.type() == BSONType::BinData) { - BSONColumn storage{timeColumnElem}; - numMeasurements = storage.size(); - } else { - numMeasurements = timeColumnElem.Obj().nFields(); + auto res = _rehydrateBucket(opCtx, + ns, + coll->getDefaultCollator(), + *options, + stats, + BucketToReopen{bucketDoc, validator}, + boost::none); + if (!res.isOK()) { + return res.getStatus(); } + auto bucket = std::move(res.getValue()); - bucket->_numMeasurements = numMeasurements; - bucket->_numCommittedMeasurements = numMeasurements; - - ExecutionStatsController stats = _getExecutionStats(ns); - stats.incNumBucketsReopened(); + auto stripeNumber = _getStripeNumber(key); // Register the reopened bucket with the catalog. auto& stripe = _stripes[stripeNumber]; stdx::lock_guard stripeLock{stripe.mutex}; ClosedBuckets closedBuckets; - _expireIdleBuckets(&stripe, stripeLock, stats, &closedBuckets); - - auto [it, inserted] = stripe.allBuckets.try_emplace(bucketId, std::move(bucket)); - tassert(6668200, "Expected bucket to be inserted", inserted); - Bucket* unownedBucket = it->second.get(); - stripe.openBuckets[key] = unownedBucket; - _initializeBucketState(bucketId); - + _reopenBucket(&stripe, stripeLock, stats, key, std::move(bucket), &closedBuckets); return Status::OK(); } @@ -765,151 +719,26 @@ BSONObj BucketCatalog::getMetadata(const BucketHandle& handle) const { return bucket->_metadata.toBSON(); } -StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert( +StatusWith<BucketCatalog::InsertResult> BucketCatalog::tryInsert( OperationContext* opCtx, const NamespaceString& ns, const StringData::ComparatorInterface* comparator, const TimeseriesOptions& options, const BSONObj& doc, CombineWithInsertsFromOtherClients combine) { + return _insert(opCtx, ns, comparator, options, doc, combine, AllowBucketCreation::kNo); +} - auto timeElem = doc[options.getTimeField()]; - if (!timeElem || BSONType::Date != timeElem.type()) { - return {ErrorCodes::BadValue, - str::stream() << "'" << options.getTimeField() << "' must be present and contain a " - << "valid BSON UTC datetime value"}; - } - auto time = timeElem.Date(); - - ExecutionStatsController stats = _getExecutionStats(ns); - - BSONElement metadata; - auto metaFieldName = options.getMetaField(); - if (metaFieldName) { - metadata = doc[*metaFieldName]; - } - - // Buckets are spread across independently-lockable stripes to improve parallelism. We map a - // bucket to a stripe by hashing the BucketKey. 
- auto key = BucketKey{ns, BucketMetadata{metadata, comparator}}; - auto stripeNumber = _getStripeNumber(key); - - ClosedBuckets closedBuckets; - CreationInfo info{key, stripeNumber, time, options, stats, &closedBuckets}; - - auto& stripe = _stripes[stripeNumber]; - stdx::lock_guard stripeLock{stripe.mutex}; - - Bucket* bucket = _useOrCreateBucket(&stripe, stripeLock, info); - invariant(bucket); - - NewFieldNames newFieldNamesToBeInserted; - uint32_t sizeToBeAdded = 0; - bucket->_calculateBucketFieldsAndSizeChange( - doc, options.getMetaField(), &newFieldNamesToBeInserted, &sizeToBeAdded); - - auto determineRolloverAction = [&](Bucket* bucket) -> RolloverAction { - const bool canArchive = feature_flags::gTimeseriesScalabilityImprovements.isEnabled( - serverGlobalParams.featureCompatibility); - - if (bucket->schemaIncompatible(doc, metaFieldName, comparator)) { - stats.incNumBucketsClosedDueToSchemaChange(); - return RolloverAction::kClose; - } - if (bucket->_numMeasurements == static_cast<std::uint64_t>(gTimeseriesBucketMaxCount)) { - stats.incNumBucketsClosedDueToCount(); - return RolloverAction::kClose; - } - auto bucketTime = bucket->getTime(); - if (time - bucketTime >= Seconds(*options.getBucketMaxSpanSeconds())) { - if (canArchive) { - stats.incNumBucketsArchivedDueToTimeForward(); - return RolloverAction::kArchive; - } else { - stats.incNumBucketsClosedDueToTimeForward(); - return RolloverAction::kClose; - } - } - if (time < bucketTime) { - if (canArchive) { - stats.incNumBucketsArchivedDueToTimeBackward(); - return RolloverAction::kArchive; - } else { - stats.incNumBucketsClosedDueToTimeBackward(); - return RolloverAction::kClose; - } - } - if (bucket->_size + sizeToBeAdded > static_cast<std::uint64_t>(gTimeseriesBucketMaxSize)) { - bool keepBucketOpenForLargeMeasurements = - bucket->_numMeasurements < static_cast<std::uint64_t>(gTimeseriesBucketMinCount) && - feature_flags::gTimeseriesScalabilityImprovements.isEnabled( - serverGlobalParams.featureCompatibility); - if (keepBucketOpenForLargeMeasurements) { - // Instead of packing the bucket to the BSON size limit, 16MB, we'll limit the max - // bucket size to 12MB. This is to leave some space in the bucket if we need to add - // new internal fields to existing, full buckets. - static constexpr size_t largeMeasurementsMaxBucketSize = - BSONObjMaxUserSize - (4 * 1024 * 1024); - - if (bucket->_size + sizeToBeAdded > largeMeasurementsMaxBucketSize) { - stats.incNumBucketsClosedDueToSize(); - return RolloverAction::kClose; - } - - // There's enough space to add this measurement and we're still below the large - // measurement threshold. - if (!bucket->_keptOpenDueToLargeMeasurements) { - // Only increment this metric once per bucket. 
- bucket->_keptOpenDueToLargeMeasurements = true; - stats.incNumBucketsKeptOpenDueToLargeMeasurements(); - } - return RolloverAction::kNone; - } else { - stats.incNumBucketsClosedDueToSize(); - return RolloverAction::kClose; - } - } - return RolloverAction::kNone; - }; - - if (!bucket->_ns.isEmpty()) { - auto action = determineRolloverAction(bucket); - if (action != RolloverAction::kNone) { - info.openedDuetoMetadata = false; - bucket = _rollover(&stripe, stripeLock, bucket, info, action); - - bucket->_calculateBucketFieldsAndSizeChange( - doc, options.getMetaField(), &newFieldNamesToBeInserted, &sizeToBeAdded); - } - } - - auto batch = bucket->_activeBatch(getOpId(opCtx, combine), stats); - batch->_addMeasurement(doc); - batch->_recordNewFields(bucket, std::move(newFieldNamesToBeInserted)); - - bucket->_numMeasurements++; - bucket->_size += sizeToBeAdded; - if (bucket->_ns.isEmpty()) { - // The namespace and metadata only need to be set if this bucket was newly created. - bucket->_ns = ns; - bucket->_metadata = key.metadata; - - // The namespace is stored two times: the bucket itself and openBuckets. - // We don't have a great approximation for the - // _schema size, so we use initial document size minus metadata as an approximation. Since - // the metadata itself is stored once, in the bucket, we can combine the two and just use - // the initial document size. A unique pointer to the bucket is stored once: allBuckets. A - // raw pointer to the bucket is stored at most twice: openBuckets, idleBuckets. - bucket->_memoryUsage += (ns.size() * 2) + doc.objsize() + sizeof(Bucket) + - sizeof(std::unique_ptr<Bucket>) + (sizeof(Bucket*) * 2); - - bucket->_schema.update(doc, options.getMetaField(), comparator); - } else { - _memoryUsage.fetchAndSubtract(bucket->_memoryUsage); - } - _memoryUsage.fetchAndAdd(bucket->_memoryUsage); - - return InsertResult{batch, closedBuckets}; +StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert( + OperationContext* opCtx, + const NamespaceString& ns, + const StringData::ComparatorInterface* comparator, + const TimeseriesOptions& options, + const BSONObj& doc, + CombineWithInsertsFromOtherClients combine, + boost::optional<BucketToReopen> bucketToReopen) { + return _insert( + opCtx, ns, comparator, options, doc, combine, AllowBucketCreation::kYes, bucketToReopen); } Status BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) { @@ -1095,6 +924,8 @@ void BucketCatalog::_appendExecutionStatsToBuilder(const ExecutionStats* stats, stats->numBucketsArchivedDueToTimeBackward.load()); builder->appendNumber("numBucketsArchivedDueToMemoryThreshold", stats->numBucketsArchivedDueToMemoryThreshold.load()); + builder->appendNumber("numBucketsArchivedDueToReopening", + stats->numBucketsArchivedDueToReopening.load()); builder->appendNumber("numBucketsReopened", stats->numBucketsReopened.load()); builder->appendNumber("numBucketsKeptOpenDueToLargeMeasurements", stats->numBucketsKeptOpenDueToLargeMeasurements.load()); @@ -1152,7 +983,33 @@ std::size_t BucketCatalog::PreHashed::operator()(const BucketKey::Hash& key) con return key; } -BucketCatalog::StripeNumber BucketCatalog::_getStripeNumber(const BucketKey& key) { +StatusWith<std::pair<BucketCatalog::BucketKey, Date_t>> BucketCatalog::_extractBucketingParameters( + const NamespaceString& ns, + const StringData::ComparatorInterface* comparator, + const TimeseriesOptions& options, + const BSONObj& doc) const { + auto timeElem = doc[options.getTimeField()]; + if (!timeElem || BSONType::Date != timeElem.type()) { + 
return {ErrorCodes::BadValue, + str::stream() << "'" << options.getTimeField() << "' must be present and contain a " + << "valid BSON UTC datetime value"}; + } + auto time = timeElem.Date(); + + BSONElement metadata; + auto metaFieldName = options.getMetaField(); + if (metaFieldName) { + metadata = doc[*metaFieldName]; + } + + // Buckets are spread across independently-lockable stripes to improve parallelism. We map a + // bucket to a stripe by hashing the BucketKey. + auto key = BucketKey{ns, BucketMetadata{metadata, comparator}}; + + return {std::make_pair(key, time)}; +} + +BucketCatalog::StripeNumber BucketCatalog::_getStripeNumber(const BucketKey& key) const { return key.hash % kNumberOfStripes; } @@ -1195,13 +1052,15 @@ BucketCatalog::Bucket* BucketCatalog::_useBucketInState(Stripe* stripe, return nullptr; } -BucketCatalog::Bucket* BucketCatalog::_useOrCreateBucket(Stripe* stripe, - WithLock stripeLock, - const CreationInfo& info) { +BucketCatalog::Bucket* BucketCatalog::_useBucket(Stripe* stripe, + WithLock stripeLock, + const CreationInfo& info, + AllowBucketCreation mode) { auto it = stripe->openBuckets.find(info.key); if (it == stripe->openBuckets.end()) { // No open bucket for this metadata. - return _allocateBucket(stripe, stripeLock, info); + return mode == AllowBucketCreation::kYes ? _allocateBucket(stripe, stripeLock, info) + : nullptr; } Bucket* bucket = it->second; @@ -1218,7 +1077,273 @@ BucketCatalog::Bucket* BucketCatalog::_useOrCreateBucket(Stripe* stripe, nullptr, getTimeseriesBucketClearedError(bucket->id(), bucket->_ns)); - return _allocateBucket(stripe, stripeLock, info); + return mode == AllowBucketCreation::kYes ? _allocateBucket(stripe, stripeLock, info) : nullptr; +} + +StatusWith<std::unique_ptr<BucketCatalog::Bucket>> BucketCatalog::_rehydrateBucket( + OperationContext* opCtx, + const NamespaceString& ns, + const StringData::ComparatorInterface* comparator, + const TimeseriesOptions& options, + ExecutionStatsController stats, + boost::optional<BucketToReopen> bucketToReopen, + boost::optional<const BucketKey&> expectedKey) const { + if (!bucketToReopen) { + // Nothing to rehydrate. + return {ErrorCodes::BadValue, "No bucket to rehydrate"}; + } + invariant(feature_flags::gTimeseriesScalabilityImprovements.isEnabled( + serverGlobalParams.featureCompatibility)); + const auto& [bucketDoc, validator] = bucketToReopen.value(); + + BSONElement bucketIdElem = bucketDoc.getField(timeseries::kBucketIdFieldName); + if (bucketIdElem.eoo() || bucketIdElem.type() != BSONType::jstOID) { + return {ErrorCodes::BadValue, + str::stream() << timeseries::kBucketIdFieldName + << " is missing or not an ObjectId"}; + } + + // Validate the bucket document against the schema. + auto result = validator(opCtx, bucketDoc); + if (result.first != Collection::SchemaValidationResult::kPass) { + return result.second; + } + + BSONElement metadata; + auto metaFieldName = options.getMetaField(); + if (metaFieldName) { + metadata = bucketDoc.getField(*metaFieldName); + } + + // Buckets are spread across independently-lockable stripes to improve parallelism. We map a + // bucket to a stripe by hashing the BucketKey. 
+ auto key = BucketKey{ns, BucketMetadata{metadata, comparator}}; + if (expectedKey.has_value() && key != expectedKey.value()) { + return {ErrorCodes::BadValue, "Bucket metadata does not match (hash collision)"}; + } + auto stripeNumber = _getStripeNumber(key); + + auto bucketId = bucketIdElem.OID(); + std::unique_ptr<Bucket> bucket = std::make_unique<Bucket>(bucketId, stripeNumber, key.hash); + + // Initialize the remaining member variables from the bucket document. + bucket->_ns = ns; + bucket->_metadata = key.metadata; + bucket->_timeField = options.getTimeField().toString(); + bucket->_size = bucketDoc.objsize(); + bucket->_minTime = bucketDoc.getObjectField(timeseries::kBucketControlFieldName) + .getObjectField(timeseries::kBucketControlMinFieldName) + .getField(options.getTimeField()) + .Date(); + + // Populate the top-level data field names. + const BSONObj& dataObj = bucketDoc.getObjectField(timeseries::kBucketDataFieldName); + for (const BSONElement& dataElem : dataObj) { + auto hashedKey = StringSet::hasher().hashed_key(dataElem.fieldName()); + bucket->_fieldNames.emplace(hashedKey); + } + + auto swMinMax = timeseries::generateMinMaxFromBucketDoc(bucketDoc, comparator); + if (!swMinMax.isOK()) { + return swMinMax.getStatus(); + } + bucket->_minmax = std::move(swMinMax.getValue()); + + auto swSchema = timeseries::generateSchemaFromBucketDoc(bucketDoc, comparator); + if (!swSchema.isOK()) { + return swSchema.getStatus(); + } + bucket->_schema = std::move(swSchema.getValue()); + + uint32_t numMeasurements = 0; + const bool isCompressed = timeseries::isCompressedBucket(bucketDoc); + const BSONElement timeColumnElem = dataObj.getField(options.getTimeField()); + + if (isCompressed && timeColumnElem.type() == BSONType::BinData) { + BSONColumn storage{timeColumnElem}; + numMeasurements = storage.size(); + } else { + numMeasurements = timeColumnElem.Obj().nFields(); + } + + bucket->_numMeasurements = numMeasurements; + bucket->_numCommittedMeasurements = numMeasurements; + + return {std::move(bucket)}; +} + +BucketCatalog::Bucket* BucketCatalog::_reopenBucket(Stripe* stripe, + WithLock stripeLock, + ExecutionStatsController stats, + const BucketKey& key, + std::unique_ptr<Bucket>&& bucket, + ClosedBuckets* closedBuckets) { + invariant(bucket); + + _expireIdleBuckets(stripe, stripeLock, stats, closedBuckets); + + // If this bucket was archived, we need to remove it from the set of archived buckets. + if (auto setIt = stripe->archivedBuckets.find(key.hash); + setIt != stripe->archivedBuckets.end()) { + auto& archivedSet = setIt->second; + if (auto bucketIt = archivedSet.find(bucket->getTime()); + bucketIt != archivedSet.end() && bucket->id() == bucketIt->second.bucketId) { + if (archivedSet.size() == 1) { + stripe->archivedBuckets.erase(setIt); + } else { + archivedSet.erase(bucketIt); + } + } + } + + // We may need to initialize the bucket's state. + _initializeBucketState(bucket->id()); + + // Pass ownership of the reopened bucket to the bucket catalog. + auto [it, inserted] = stripe->allBuckets.try_emplace(bucket->id(), std::move(bucket)); + tassert(6668200, "Expected bucket to be inserted", inserted); + Bucket* unownedBucket = it->second.get(); + + // If we already have an open bucket for this key, we need to archive it. 
+ if (auto it = stripe->openBuckets.find(key); it != stripe->openBuckets.end()) { + stats.incNumBucketsArchivedDueToReopening(); + if (it->second->allCommitted()) { + _archiveBucket(stripe, stripeLock, it->second); + } else { + it->second->_rolloverAction = RolloverAction::kArchive; + } + } + + // Now actually mark this bucket as open. + stripe->openBuckets[key] = unownedBucket; + stats.incNumBucketsReopened(); + + return unownedBucket; +} + +StatusWith<BucketCatalog::InsertResult> BucketCatalog::_insert( + OperationContext* opCtx, + const NamespaceString& ns, + const StringData::ComparatorInterface* comparator, + const TimeseriesOptions& options, + const BSONObj& doc, + CombineWithInsertsFromOtherClients combine, + AllowBucketCreation mode, + boost::optional<BucketToReopen> bucketToReopen) { + auto res = _extractBucketingParameters(ns, comparator, options, doc); + if (!res.isOK()) { + return res.getStatus(); + } + auto& key = res.getValue().first; + auto time = res.getValue().second; + + ExecutionStatsController stats = _getExecutionStats(ns); + + // Buckets are spread across independently-lockable stripes to improve parallelism. We map a + // bucket to a stripe by hashing the BucketKey. + auto stripeNumber = _getStripeNumber(key); + + InsertResult result; + CreationInfo info{key, stripeNumber, time, options, stats, &result.closedBuckets}; + + auto rehydratedBucket = + _rehydrateBucket(opCtx, ns, comparator, options, stats, bucketToReopen, key); + + auto& stripe = _stripes[stripeNumber]; + stdx::lock_guard stripeLock{stripe.mutex}; + + if (rehydratedBucket.isOK()) { + invariant(mode == AllowBucketCreation::kYes); + + Bucket* bucket = _reopenBucket(&stripe, + stripeLock, + stats, + key, + std::move(rehydratedBucket.getValue()), + &result.closedBuckets); + + result.batch = _insertIntoBucket( + opCtx, &stripe, stripeLock, doc, combine, mode, &info, bucket, &result.closedBuckets); + invariant(result.batch); + + return result; + } + + Bucket* bucket = _useBucket(&stripe, stripeLock, info, mode); + if (!bucket) { + invariant(mode == AllowBucketCreation::kNo); + result.candidate = _findArchivedCandidate(stripe, stripeLock, info); + return result; + } + + result.batch = _insertIntoBucket( + opCtx, &stripe, stripeLock, doc, combine, mode, &info, bucket, &result.closedBuckets); + if (!result.batch) { + invariant(mode == AllowBucketCreation::kNo); + result.candidate = _findArchivedCandidate(stripe, stripeLock, info); + } + return result; +} + +std::shared_ptr<BucketCatalog::WriteBatch> BucketCatalog::_insertIntoBucket( + OperationContext* opCtx, + Stripe* stripe, + WithLock stripeLock, + const BSONObj& doc, + CombineWithInsertsFromOtherClients combine, + AllowBucketCreation mode, + CreationInfo* info, + Bucket* bucket, + ClosedBuckets* closedBuckets) { + NewFieldNames newFieldNamesToBeInserted; + uint32_t sizeToBeAdded = 0; + bucket->_calculateBucketFieldsAndSizeChange( + doc, info->options.getMetaField(), &newFieldNamesToBeInserted, &sizeToBeAdded); + + bool isNewlyOpenedBucket = bucket->_ns.isEmpty(); + if (!isNewlyOpenedBucket) { + auto action = _determineRolloverAction(doc, info, bucket, sizeToBeAdded); + if (action == RolloverAction::kArchive && mode == AllowBucketCreation::kNo) { + // We don't actually want to roll this bucket over yet, bail out. 
+ return std::shared_ptr<WriteBatch>{}; + } else if (action != RolloverAction::kNone) { + info->openedDuetoMetadata = false; + bucket = _rollover(stripe, stripeLock, bucket, *info, action); + isNewlyOpenedBucket = true; + + bucket->_calculateBucketFieldsAndSizeChange( + doc, info->options.getMetaField(), &newFieldNamesToBeInserted, &sizeToBeAdded); + } + } + + auto batch = bucket->_activeBatch(getOpId(opCtx, combine), info->stats); + batch->_addMeasurement(doc); + batch->_recordNewFields(bucket, std::move(newFieldNamesToBeInserted)); + + bucket->_numMeasurements++; + bucket->_size += sizeToBeAdded; + if (isNewlyOpenedBucket) { + // The namespace and metadata only need to be set if this bucket was newly created. + bucket->_ns = info->key.ns; + bucket->_metadata = info->key.metadata; + + // The namespace is stored two times: the bucket itself and openBuckets. + // We don't have a great approximation for the + // _schema size, so we use initial document size minus metadata as an approximation. Since + // the metadata itself is stored once, in the bucket, we can combine the two and just use + // the initial document size. A unique pointer to the bucket is stored once: allBuckets. A + // raw pointer to the bucket is stored at most twice: openBuckets, idleBuckets. + bucket->_memoryUsage += (info->key.ns.size() * 2) + doc.objsize() + sizeof(Bucket) + + sizeof(std::unique_ptr<Bucket>) + (sizeof(Bucket*) * 2); + + bucket->_schema.update( + doc, info->options.getMetaField(), info->key.metadata.getComparator()); + } else { + _memoryUsage.fetchAndSubtract(bucket->_memoryUsage); + } + _memoryUsage.fetchAndAdd(bucket->_memoryUsage); + + return batch; } void BucketCatalog::_waitToCommitBatch(Stripe* stripe, const std::shared_ptr<WriteBatch>& batch) { @@ -1294,6 +1419,36 @@ void BucketCatalog::_archiveBucket(Stripe* stripe, WithLock stripeLock, Bucket* _removeBucket(stripe, stripeLock, bucket, archived); } +boost::optional<OID> BucketCatalog::_findArchivedCandidate(const Stripe& stripe, + WithLock stripeLock, + const CreationInfo& info) const { + const auto setIt = stripe.archivedBuckets.find(info.key.hash); + if (setIt == stripe.archivedBuckets.end()) { + return boost::none; + } + + const auto& archivedSet = setIt->second; + + // We want to find the largest time that is not greater than info.time. Generally lower_bound + // will return the smallest element not less than the search value, but we are using + // std::greater instead of std::less for the map's comparisons. This means the order of keys + // will be reversed, and lower_bound will return what we want. + auto it = archivedSet.lower_bound(info.time); + if (it == archivedSet.end()) { + return boost::none; + } + + const auto& [candidateTime, candidateBucket] = *it; + invariant(candidateTime <= info.time); + // We need to make sure our measurement can fit without violating max span. If not, we + // can't use this bucket. 
+ if (info.time - candidateTime < Seconds(*info.options.getBucketMaxSpanSeconds())) { + return candidateBucket.bucketId; + } + + return boost::none; +} + void BucketCatalog::_abort(Stripe* stripe, WithLock stripeLock, std::shared_ptr<WriteBatch> batch, @@ -1436,10 +1591,78 @@ BucketCatalog::Bucket* BucketCatalog::_allocateBucket(Stripe* stripe, auto controlDoc = buildControlMinTimestampDoc(info.options.getTimeField(), roundedTime); bucket->_minmax.update( controlDoc, bucket->_metadata.getMetaField(), bucket->_metadata.getComparator()); - return bucket; } +BucketCatalog::RolloverAction BucketCatalog::_determineRolloverAction(const BSONObj& doc, + CreationInfo* info, + Bucket* bucket, + uint32_t sizeToBeAdded) { + const bool canArchive = feature_flags::gTimeseriesScalabilityImprovements.isEnabled( + serverGlobalParams.featureCompatibility); + + auto bucketTime = bucket->getTime(); + if (info->time - bucketTime >= Seconds(*info->options.getBucketMaxSpanSeconds())) { + if (canArchive) { + info->stats.incNumBucketsArchivedDueToTimeForward(); + return RolloverAction::kArchive; + } else { + info->stats.incNumBucketsClosedDueToTimeForward(); + return RolloverAction::kClose; + } + } + if (info->time < bucketTime) { + if (canArchive) { + info->stats.incNumBucketsArchivedDueToTimeBackward(); + return RolloverAction::kArchive; + } else { + info->stats.incNumBucketsClosedDueToTimeBackward(); + return RolloverAction::kClose; + } + } + if (bucket->_numMeasurements == static_cast<std::uint64_t>(gTimeseriesBucketMaxCount)) { + info->stats.incNumBucketsClosedDueToCount(); + return RolloverAction::kClose; + } + if (bucket->schemaIncompatible( + doc, info->options.getMetaField(), info->key.metadata.getComparator())) { + info->stats.incNumBucketsClosedDueToSchemaChange(); + return RolloverAction::kClose; + } + if (bucket->_size + sizeToBeAdded > static_cast<std::uint64_t>(gTimeseriesBucketMaxSize)) { + bool keepBucketOpenForLargeMeasurements = + bucket->_numMeasurements < static_cast<std::uint64_t>(gTimeseriesBucketMinCount) && + feature_flags::gTimeseriesScalabilityImprovements.isEnabled( + serverGlobalParams.featureCompatibility); + if (keepBucketOpenForLargeMeasurements) { + // Instead of packing the bucket to the BSON size limit, 16MB, we'll limit the max + // bucket size to 12MB. This is to leave some space in the bucket if we need to add + // new internal fields to existing, full buckets. + static constexpr size_t largeMeasurementsMaxBucketSize = + BSONObjMaxUserSize - (4 * 1024 * 1024); + + if (bucket->_size + sizeToBeAdded > largeMeasurementsMaxBucketSize) { + info->stats.incNumBucketsClosedDueToSize(); + return RolloverAction::kClose; + } + + // There's enough space to add this measurement and we're still below the large + // measurement threshold. + if (!bucket->_keptOpenDueToLargeMeasurements) { + // Only increment this metric once per bucket. 
+ bucket->_keptOpenDueToLargeMeasurements = true; + info->stats.incNumBucketsKeptOpenDueToLargeMeasurements(); + } + return RolloverAction::kNone; + } else { + info->stats.incNumBucketsClosedDueToSize(); + return RolloverAction::kClose; + } + } + + return RolloverAction::kNone; +} + BucketCatalog::Bucket* BucketCatalog::_rollover(Stripe* stripe, WithLock stripeLock, Bucket* bucket, diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h index c2a82039ee8..f5a5adb1a12 100644 --- a/src/mongo/db/timeseries/bucket_catalog.h +++ b/src/mongo/db/timeseries/bucket_catalog.h @@ -34,6 +34,7 @@ #include <queue> #include "mongo/bson/unordered_fields_bsonobj_comparator.h" +#include "mongo/db/catalog/collection.h" #include "mongo/db/ops/single_write_result_gen.h" #include "mongo/db/service_context.h" #include "mongo/db/timeseries/flat_bson.h" @@ -70,6 +71,7 @@ class BucketCatalog { AtomicWord<long long> numBucketsArchivedDueToTimeForward; AtomicWord<long long> numBucketsArchivedDueToTimeBackward; AtomicWord<long long> numBucketsArchivedDueToMemoryThreshold; + AtomicWord<long long> numBucketsArchivedDueToReopening; AtomicWord<long long> numCommits; AtomicWord<long long> numWaits; AtomicWord<long long> numMeasurementsCommitted; @@ -83,6 +85,8 @@ class BucketCatalog { ExecutionStats* globalStats) : _collectionStats(collectionStats), _globalStats(globalStats) {} + ExecutionStatsController() = delete; + void incNumBucketInserts(long long increment = 1); void incNumBucketUpdates(long long increment = 1); void incNumBucketsOpenedDueToMetadata(long long increment = 1); @@ -95,6 +99,7 @@ class BucketCatalog { void incNumBucketsArchivedDueToTimeForward(long long increment = 1); void incNumBucketsArchivedDueToTimeBackward(long long increment = 1); void incNumBucketsArchivedDueToMemoryThreshold(long long increment = 1); + void incNumBucketsArchivedDueToReopening(long long increment = 1); void incNumCommits(long long increment = 1); void incNumWaits(long long increment = 1); void incNumMeasurementsCommitted(long long increment = 1); @@ -230,6 +235,19 @@ public: struct InsertResult { std::shared_ptr<WriteBatch> batch; ClosedBuckets closedBuckets; + boost::optional<OID> candidate; + }; + + /** + * Function that should run validation against the bucket to ensure it's a proper bucket + * document. Typically, this should execute Collection::checkValidation. + */ + using BucketDocumentValidator = + std::function<std::pair<Collection::SchemaValidationResult, Status>(OperationContext*, + const BSONObj&)>; + struct BucketToReopen { + BSONObj bucketDocument; + BucketDocumentValidator validator; }; static BucketCatalog& get(ServiceContext* svcCtx); @@ -258,17 +276,46 @@ public: BSONObj getMetadata(const BucketHandle& bucket) const; /** + * Tries to insert 'doc' into a suitable bucket. If an open bucket is full (or has incompatible + * schema), but is otherwise suitable, we will close it and open a new bucket. If we find no + * bucket with matching data and a time range that can accomodate 'doc', we will not open a new + * bucket, but rather let the caller know to search for an archived or closed bucket that can + * accomodate 'doc'. + * + * If a suitable bucket is found or opened, returns the WriteBatch into which 'doc' was + * inserted and a list of any buckets that were closed to make space to insert 'doc'. Any + * caller who receives the same batch may commit or abort the batch after claiming commit + * rights. See WriteBatch for more details. 
+ * + * If no suitable bucket is found or opened, returns an optional bucket ID. If set, the bucket + * ID corresponds to an archived bucket which should be fetched; otherwise the caller should + * search for a previously-closed bucket that can accomodate 'doc'. The caller should proceed to + * call 'insert' to insert 'doc', passing any fetched bucket. + */ + StatusWith<InsertResult> tryInsert(OperationContext* opCtx, + const NamespaceString& ns, + const StringData::ComparatorInterface* comparator, + const TimeseriesOptions& options, + const BSONObj& doc, + CombineWithInsertsFromOtherClients combine); + + /** * Returns the WriteBatch into which the document was inserted and a list of any buckets that * were closed in order to make space to insert the document. Any caller who receives the same * batch may commit or abort the batch after claiming commit rights. See WriteBatch for more * details. + * + * If 'bucketToReopen' is passed, we will reopen that bucket and attempt to add 'doc' to that + * bucket. Otherwise we will attempt to find a suitable open bucket, or open a new bucket if + * none exists. */ StatusWith<InsertResult> insert(OperationContext* opCtx, const NamespaceString& ns, const StringData::ComparatorInterface* comparator, const TimeseriesOptions& options, const BSONObj& doc, - CombineWithInsertsFromOtherClients combine); + CombineWithInsertsFromOtherClients combine, + boost::optional<BucketToReopen> bucketToReopen = boost::none); /** * Prepares a batch for commit, transitioning it to an inactive state. Caller must already have @@ -343,6 +390,7 @@ private: BucketMetadata(BSONElement elem, const StringData::ComparatorInterface* comparator); bool operator==(const BucketMetadata& other) const; + bool operator!=(const BucketMetadata& other) const; const BSONObj& toBSON() const; @@ -384,6 +432,9 @@ private: bool operator==(const BucketKey& other) const { return ns == other.ns && metadata == other.metadata; } + bool operator!=(const BucketKey& other) const { + return !(*this == other); + } template <typename H> friend H AbslHashValue(H h, const BucketKey& key) { @@ -438,11 +489,28 @@ private: // Buckets that are not currently in the catalog, but which are eligible to receive more // measurements. The top-level map is keyed by the hash of the BucketKey, while the stored // map is keyed by the bucket's minimum timestamp. - stdx::unordered_map<BucketKey::Hash, std::map<Date_t, ArchivedBucket>, PreHashed> + // + // We invert the key comparison in the inner map so that we can use lower_bound to + // efficiently find an archived bucket that is a candidate for an incoming measurement. + stdx::unordered_map<BucketKey::Hash, + std::map<Date_t, ArchivedBucket, std::greater<Date_t>>, + PreHashed> archivedBuckets; }; - StripeNumber _getStripeNumber(const BucketKey& key); + /** + * Extracts the information from the input 'doc' that is used to map the document to a bucket. + */ + StatusWith<std::pair<BucketKey, Date_t>> _extractBucketingParameters( + const NamespaceString& ns, + const StringData::ComparatorInterface* comparator, + const TimeseriesOptions& options, + const BSONObj& doc) const; + + /** + * Maps bucket key to the stripe that is responsible for it. + */ + StripeNumber _getStripeNumber(const BucketKey& key) const; /** * Mode enum to control whether the bucket retrieval methods below will return buckets that are @@ -475,9 +543,72 @@ private: BucketState targetState); /** - * Retrieve a bucket for write use, or create one if a suitable bucket doesn't already exist. 
+ * Mode enum to control whether the bucket retrieval methods below will create new buckets if no + * suitable bucket exists. + */ + enum class AllowBucketCreation { kYes, kNo }; + + /** + * Retrieve a bucket for write use if one exists. If none exists and 'mode' is set to kYes, then + * we will create a new bucket. */ - Bucket* _useOrCreateBucket(Stripe* stripe, WithLock stripeLock, const CreationInfo& info); + Bucket* _useBucket(Stripe* stripe, + WithLock stripeLock, + const CreationInfo& info, + AllowBucketCreation mode); + + /** + * Given a bucket to reopen, performs validation and constructs the in-memory representation of + * the bucket. If specified, 'expectedKey' is matched against the key extracted from the + * document to validate that the bucket is expected (i.e. to help resolve hash collisions for + * archived buckets). Does *not* hand ownership of the bucket to the catalog. + */ + StatusWith<std::unique_ptr<Bucket>> _rehydrateBucket( + OperationContext* opCtx, + const NamespaceString& ns, + const StringData::ComparatorInterface* comparator, + const TimeseriesOptions& options, + ExecutionStatsController stats, + boost::optional<BucketToReopen> bucketToReopen, + boost::optional<const BucketKey&> expectedKey) const; + + /** + * Given a rehydrated 'bucket', passes ownership of that bucket to the catalog, marking the + * bucket as open. + */ + Bucket* _reopenBucket(Stripe* stripe, + WithLock stripeLock, + ExecutionStatsController stats, + const BucketKey& key, + std::unique_ptr<Bucket>&& bucket, + ClosedBuckets* closedBuckets); + + /** + * Helper method to perform the heavy lifting for both 'tryInsert' and 'insert'. See + * documentation on callers for more details. + */ + StatusWith<InsertResult> _insert(OperationContext* opCtx, + const NamespaceString& ns, + const StringData::ComparatorInterface* comparator, + const TimeseriesOptions& options, + const BSONObj& doc, + CombineWithInsertsFromOtherClients combine, + AllowBucketCreation mode, + boost::optional<BucketToReopen> bucketToReopen = boost::none); + + /** + * Given an already-selected 'bucket', inserts 'doc' to the bucket if possible. If not, and + * 'mode' is set to 'kYes', we will create a new bucket and insert into that bucket. + */ + std::shared_ptr<WriteBatch> _insertIntoBucket(OperationContext* opCtx, + Stripe* stripe, + WithLock stripeLock, + const BSONObj& doc, + CombineWithInsertsFromOtherClients combine, + AllowBucketCreation mode, + CreationInfo* info, + Bucket* bucket, + ClosedBuckets* closedBuckets); /** * Wait for other batches to finish so we can prepare 'batch' @@ -496,6 +627,14 @@ private: void _archiveBucket(Stripe* stripe, WithLock stripeLock, Bucket* bucket); /** + * Identifies a previously archived bucket that may be able to accomodate the measurement + * represented by 'info', if one exists. + */ + boost::optional<OID> _findArchivedCandidate(const Stripe& stripe, + WithLock stripeLock, + const CreationInfo& info) const; + + /** * Aborts 'batch', and if the corresponding bucket still exists, proceeds to abort any other * unprepared batches and remove the bucket from the catalog if there is no unprepared batch. */ @@ -546,6 +685,15 @@ private: enum class RolloverAction { kNone, kArchive, kClose }; /** + * Determines if 'bucket' needs to be rolled over to accomodate 'doc'. If so, determines whether + * to archive or close 'bucket'. 
+ */ + RolloverAction _determineRolloverAction(const BSONObj& doc, + CreationInfo* info, + Bucket* bucket, + uint32_t sizeToBeAdded); + + /** * Close the existing, full bucket and open a new one for the same metadata. * * Writes information about the closed bucket to the 'info' parameter. diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp index c6e91d25b53..045729e31b0 100644 --- a/src/mongo/db/timeseries/bucket_catalog_test.cpp +++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp @@ -27,6 +27,8 @@ * it in the license file. */ +#include <boost/optional/optional_io.hpp> + #include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/catalog/catalog_test_fixture.h" #include "mongo/db/catalog/create_collection.h" @@ -48,6 +50,7 @@ constexpr StringData kNumBucketsReopened = "numBucketsReopened"_sd; constexpr StringData kNumArchivedDueToTimeForward = "numBucketsArchivedDueToTimeForward"_sd; constexpr StringData kNumArchivedDueToTimeBackward = "numBucketsArchivedDueToTimeBackward"_sd; constexpr StringData kNumArchivedDueToMemoryThreshold = "numBucketsArchivedDueToMemoryThreshold"_sd; +constexpr StringData kNumArchivedDueToReopening = "numBucketsArchivedDueToReopening"_sd; constexpr StringData kNumClosedDueToTimeForward = "numBucketsClosedDueToTimeForward"_sd; constexpr StringData kNumClosedDueToTimeBackward = "numBucketsClosedDueToTimeBackward"_sd; constexpr StringData kNumClosedDueToMemoryThreshold = "numBucketsClosedDueToMemoryThreshold"_sd; @@ -983,6 +986,9 @@ TEST_F(BucketCatalogTest, SchemaChanges) { } TEST_F(BucketCatalogTest, ReopenMalformedBucket) { + RAIIServerParameterControllerForTest featureFlag{"featureFlagTimeseriesScalabilityImprovements", + true}; + BSONObj bucketDoc = ::mongo::fromjson( R"({"_id":{"$oid":"629e1e680958e279dc29a517"}, "control":{"version":1,"min":{"time":{"$date":"2022-06-06T15:34:00.000Z"},"a":1,"b":1}, @@ -1074,6 +1080,9 @@ TEST_F(BucketCatalogTest, ReopenMalformedBucket) { } TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertCompatibleMeasurement) { + RAIIServerParameterControllerForTest featureFlag{"featureFlagTimeseriesScalabilityImprovements", + true}; + // Bucket document to reopen. BSONObj bucketDoc = ::mongo::fromjson( R"({"_id":{"$oid":"629e1e680958e279dc29a517"}, @@ -1124,6 +1133,9 @@ TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertCompatibleMeasurement } TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertIncompatibleMeasurement) { + RAIIServerParameterControllerForTest featureFlag{"featureFlagTimeseriesScalabilityImprovements", + true}; + // Bucket document to reopen. BSONObj bucketDoc = ::mongo::fromjson( R"({"_id":{"$oid":"629e1e680958e279dc29a517"}, @@ -1168,6 +1180,9 @@ TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertIncompatibleMeasureme } TEST_F(BucketCatalogTest, ReopenCompressedBucketAndInsertCompatibleMeasurement) { + RAIIServerParameterControllerForTest featureFlag{"featureFlagTimeseriesScalabilityImprovements", + true}; + // Bucket document to reopen. BSONObj bucketDoc = ::mongo::fromjson( R"({"_id":{"$oid":"629e1e680958e279dc29a517"}, @@ -1227,6 +1242,9 @@ TEST_F(BucketCatalogTest, ReopenCompressedBucketAndInsertCompatibleMeasurement) } TEST_F(BucketCatalogTest, ReopenCompressedBucketAndInsertIncompatibleMeasurement) { + RAIIServerParameterControllerForTest featureFlag{"featureFlagTimeseriesScalabilityImprovements", + true}; + // Bucket document to reopen. 
BSONObj bucketDoc = ::mongo::fromjson( R"({"_id":{"$oid":"629e1e680958e279dc29a517"}, @@ -1430,5 +1448,208 @@ TEST_F(BucketCatalogTest, ArchivingUnderMemoryPressure) { ASSERT_LT(numClosedInFirstRound, _getExecutionStat(_ns1, kNumClosedDueToMemoryThreshold)); } +TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) { + RAIIServerParameterControllerForTest controller{"featureFlagTimeseriesScalabilityImprovements", + true}; + AutoGetCollection autoColl(_opCtx, _ns1.makeTimeseriesBucketsNamespace(), MODE_IX); + + // Try to insert with no open bucket. Should hint to re-open. + auto result = _bucketCatalog->tryInsert( + _opCtx, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"}})"), + BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + ASSERT_OK(result.getStatus()); + ASSERT(result.getValue().closedBuckets.empty()); + ASSERT(!result.getValue().batch); + ASSERT_EQ(result.getValue().candidate, boost::none); + + // Actually insert so we do have an open bucket to test against. + result = _bucketCatalog->insert( + _opCtx, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"}})"), + BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + ASSERT_OK(result.getStatus()); + auto batch = result.getValue().batch; + ASSERT(batch); + auto bucketId = batch->bucket().id; + ASSERT(batch->claimCommitRights()); + ASSERT_OK(_bucketCatalog->prepareCommit(batch)); + ASSERT_EQ(batch->measurements().size(), 1); + _bucketCatalog->finish(batch, {}); + + // Time backwards should hint to re-open. + result = _bucketCatalog->tryInsert( + _opCtx, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + ::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:34:40.000Z"}})"), + BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + ASSERT_OK(result.getStatus()); + ASSERT(result.getValue().closedBuckets.empty()); + ASSERT(!result.getValue().batch); + ASSERT_EQ(result.getValue().candidate, boost::none); + + // So should time forward. + result = _bucketCatalog->tryInsert( + _opCtx, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + ::mongo::fromjson(R"({"time":{"$date":"2022-06-07T15:34:40.000Z"}})"), + BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + ASSERT_OK(result.getStatus()); + ASSERT(result.getValue().closedBuckets.empty()); + ASSERT(!result.getValue().batch); + ASSERT_EQ(result.getValue().candidate, boost::none); + + // Now let's insert something so we archive the existing bucket. + result = _bucketCatalog->insert( + _opCtx, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + ::mongo::fromjson(R"({"time":{"$date":"2022-06-07T15:34:40.000Z"}})"), + BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + ASSERT_OK(result.getStatus()); + batch = result.getValue().batch; + ASSERT_NE(batch->bucket().id, bucketId); + ASSERT(batch); + ASSERT(batch->claimCommitRights()); + ASSERT_OK(_bucketCatalog->prepareCommit(batch)); + ASSERT_EQ(batch->measurements().size(), 1); + _bucketCatalog->finish(batch, {}); + + // If we try to insert something that could fit in the archived bucket, we should get it back as + // a candidate. 
+ result = _bucketCatalog->tryInsert( + _opCtx, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:35:40.000Z"}})"), + BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + ASSERT_OK(result.getStatus()); + ASSERT(result.getValue().closedBuckets.empty()); + ASSERT(!result.getValue().batch); + ASSERT(result.getValue().candidate.has_value()); + ASSERT_EQ(result.getValue().candidate.value(), bucketId); +} + +TEST_F(BucketCatalogTest, TryInsertWillCreateBucketIfWeWouldCloseExistingBucket) { + RAIIServerParameterControllerForTest controller{"featureFlagTimeseriesScalabilityImprovements", + true}; + AutoGetCollection autoColl(_opCtx, _ns1.makeTimeseriesBucketsNamespace(), MODE_IX); + + // Insert a document so we have a base bucket + auto result = _bucketCatalog->insert( + _opCtx, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"}, "a": true})"), + BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + ASSERT_OK(result.getStatus()); + auto batch = result.getValue().batch; + ASSERT(batch); + auto bucketId = batch->bucket().id; + ASSERT(batch->claimCommitRights()); + ASSERT_OK(_bucketCatalog->prepareCommit(batch)); + ASSERT_EQ(batch->measurements().size(), 1); + _bucketCatalog->finish(batch, {}); + + // Incompatible schema would close the existing bucket, so we should expect to open a new bucket + // and proceed to insert the document. + result = _bucketCatalog->tryInsert( + _opCtx, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:35:40.000Z"}, "a": {}})"), + BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + ASSERT_OK(result.getStatus()); + batch = result.getValue().batch; + ASSERT(batch); + ASSERT_NE(batch->bucket().id, bucketId); + ASSERT(batch->claimCommitRights()); + ASSERT_OK(_bucketCatalog->prepareCommit(batch)); + ASSERT_EQ(batch->measurements().size(), 1); + _bucketCatalog->finish(batch, {}); +} + +TEST_F(BucketCatalogTest, InsertIntoReopenedBucket) { + RAIIServerParameterControllerForTest controller{"featureFlagTimeseriesScalabilityImprovements", + true}; + AutoGetCollection autoColl(_opCtx, _ns1.makeTimeseriesBucketsNamespace(), MODE_IX); + + // Insert a document so we have a base bucket and we can test that we archive it when we reopen + // a conflicting bucket. + auto result = _bucketCatalog->insert( + _opCtx, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + ::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:34:40.000Z"}})"), + BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + ASSERT_OK(result.getStatus()); + auto batch = result.getValue().batch; + ASSERT(batch); + auto oldBucketId = batch->bucket().id; + ASSERT(batch->claimCommitRights()); + ASSERT_OK(_bucketCatalog->prepareCommit(batch)); + ASSERT_EQ(batch->measurements().size(), 1); + _bucketCatalog->finish(batch, {}); + + BSONObj bucketDoc = ::mongo::fromjson( + R"({"_id":{"$oid":"629e1e680958e279dc29a517"}, + "control":{"version":1,"min":{"time":{"$date":"2022-06-06T15:34:00.000Z"}}, + "max":{"time":{"$date":"2022-06-06T15:34:30.000Z"}}}, + "data":{"time":{"0":{"$date":"2022-06-06T15:34:30.000Z"}}}})"); + ASSERT_NE(bucketDoc["_id"].OID(), oldBucketId); + auto validator = [&](OperationContext * opCtx, const BSONObj& bucketDoc) -> auto { + return autoColl->checkValidation(opCtx, bucketDoc); + }; + + // We should be able to pass in a valid bucket and insert into it. 
+ result = _bucketCatalog->insert( + _opCtx, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:35:40.000Z"}})"), + BucketCatalog::CombineWithInsertsFromOtherClients::kAllow, + BucketCatalog::BucketToReopen{bucketDoc, validator}); + ASSERT_OK(result.getStatus()); + batch = result.getValue().batch; + ASSERT(batch); + ASSERT_EQ(batch->bucket().id, bucketDoc["_id"].OID()); + ASSERT(batch->claimCommitRights()); + ASSERT_OK(_bucketCatalog->prepareCommit(batch)); + ASSERT_EQ(batch->measurements().size(), 1); + _bucketCatalog->finish(batch, {}); + ASSERT_EQ(1, _getExecutionStat(_ns1, kNumArchivedDueToReopening)); + ASSERT_EQ(1, _getExecutionStat(_ns1, kNumBucketsReopened)); + + // Verify the old bucket was archived and we'll get it back as a candidate. + result = _bucketCatalog->tryInsert( + _opCtx, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + ::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:35:40.000Z"}})"), + BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + ASSERT_OK(result.getStatus()); + ASSERT(result.getValue().closedBuckets.empty()); + ASSERT(!result.getValue().batch); + ASSERT(result.getValue().candidate.has_value()); + ASSERT_EQ(result.getValue().candidate.value(), oldBucketId); +} + } // namespace } // namespace mongo |
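A note on _findArchivedCandidate: the inner archived-bucket map is keyed with std::greater<Date_t> precisely so that lower_bound returns the newest archived bucket whose minimum time does not exceed the incoming measurement's time; the candidate is then discarded if accepting the measurement would violate bucketMaxSpanSeconds. The standalone example below (plain ints standing in for Date_t and strings standing in for bucket ids; not MongoDB code) demonstrates the inverted-comparator lower_bound behavior.

#include <functional>
#include <iostream>
#include <map>
#include <string>

int main() {
    // Analogue of the inner archived-bucket map: keys are bucket minimum times,
    // ordered descending via std::greater; values stand in for bucket ids.
    std::map<int, std::string, std::greater<int>> archivedSet{
        {100, "bucketA"}, {200, "bucketB"}, {300, "bucketC"}};

    int measurementTime = 250;

    // With std::greater the iteration order is 300, 200, 100. lower_bound
    // returns the first element whose key is not ordered before the search
    // value, i.e. the first key <= 250 -- the largest time not exceeding the
    // measurement's time.
    auto it = archivedSet.lower_bound(measurementTime);
    if (it != archivedSet.end()) {
        std::cout << it->first << " -> " << it->second << std::endl;  // prints: 200 -> bucketB
    }
    return 0;
}

With the default std::less ordering, lower_bound(250) would instead yield the 300 entry (the smallest key not less than the search value), so inverting the comparator is what lets the catalog find the newest bucket at or before the measurement's time in logarithmic time.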