diff options
author | Dan Larkin-York <dan.larkin-york@mongodb.com> | 2023-03-14 14:58:47 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-03-14 16:54:31 +0000 |
commit | b83e40d9508c662cbc75d363f83974b1efeb3f36 (patch) | |
tree | b7e665c89e730703dbf80bb4ed44faf88839df4b /src | |
parent | c2898908503a4f6b426cc6b0b44e19104e5869e8 (diff) | |
download | mongo-b83e40d9508c662cbc75d363f83974b1efeb3f36.tar.gz |
SERVER-72610 Refactor BucketCatalog to improve testability
Diffstat (limited to 'src')
20 files changed, 3151 insertions, 2893 deletions
diff --git a/src/mongo/db/op_observer/op_observer_impl.cpp b/src/mongo/db/op_observer/op_observer_impl.cpp index eae67c55105..c2cb0cd25f2 100644 --- a/src/mongo/db/op_observer/op_observer_impl.cpp +++ b/src/mongo/db/op_observer/op_observer_impl.cpp @@ -1319,7 +1319,8 @@ void OpObserverImpl::onDropDatabase(OperationContext* opCtx, const DatabaseName& mongoDSessionCatalog->invalidateAllSessions(opCtx); } - timeseries::bucket_catalog::BucketCatalog::get(opCtx).clear(dbName.db()); + auto& bucketCatalog = timeseries::bucket_catalog::BucketCatalog::get(opCtx); + clear(bucketCatalog, dbName.db()); } repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx, @@ -1388,8 +1389,8 @@ repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx, } else if (collectionName == NamespaceString::kConfigSettingsNamespace) { ReadWriteConcernDefaults::get(opCtx).invalidate(); } else if (collectionName.isTimeseriesBucketsCollection()) { - timeseries::bucket_catalog::BucketCatalog::get(opCtx).clear( - collectionName.getTimeseriesViewNamespace()); + auto& bucketCatalog = timeseries::bucket_catalog::BucketCatalog::get(opCtx); + clear(bucketCatalog, collectionName.getTimeseriesViewNamespace()); } else if (collectionName.isSystemDotJavascript()) { // Inform the JavaScript engine of the change to system.js. Scope::storedFuncMod(opCtx); @@ -2270,10 +2271,10 @@ void OpObserverImpl::_onReplicationRollback(OperationContext* opCtx, timeseriesNamespaces.insert(ns.getTimeseriesViewNamespace()); } } - timeseries::bucket_catalog::BucketCatalog::get(opCtx).clear( - [timeseriesNamespaces = std::move(timeseriesNamespaces)](const NamespaceString& bucketNs) { - return timeseriesNamespaces.contains(bucketNs); - }); + auto& bucketCatalog = timeseries::bucket_catalog::BucketCatalog::get(opCtx); + clear(bucketCatalog, + [timeseriesNamespaces = std::move(timeseriesNamespaces)]( + const NamespaceString& bucketNs) { return timeseriesNamespaces.contains(bucketNs); }); } } // namespace mongo diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index f40a849d842..c06d81bd13d 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -1905,12 +1905,12 @@ boost::optional<std::pair<Status, bool>> checkFailUnorderedTimeseriesInsertFailP return boost::none; } -timeseries::bucket_catalog::BucketCatalog::CombineWithInsertsFromOtherClients +timeseries::bucket_catalog::CombineWithInsertsFromOtherClients canCombineTimeseriesInsertWithOtherClients(OperationContext* opCtx, const write_ops::InsertCommandRequest& request) { return isTimeseriesWriteRetryable(opCtx) || request.getOrdered() - ? timeseries::bucket_catalog::BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow - : timeseries::bucket_catalog::BucketCatalog::CombineWithInsertsFromOtherClients::kAllow; + ? timeseries::bucket_catalog::CombineWithInsertsFromOtherClients::kDisallow + : timeseries::bucket_catalog::CombineWithInsertsFromOtherClients::kAllow; } TimeseriesSingleWriteResult getTimeseriesSingleWriteResult( @@ -2132,8 +2132,8 @@ bool commitTimeseriesBucket(OperationContext* opCtx, const write_ops::InsertCommandRequest& request) try { auto& bucketCatalog = timeseries::bucket_catalog::BucketCatalog::get(opCtx); - auto metadata = bucketCatalog.getMetadata(batch->bucketHandle); - auto status = bucketCatalog.prepareCommit(batch); + auto metadata = getMetadata(bucketCatalog, batch->bucketHandle); + auto status = prepareCommit(bucketCatalog, batch); if (!status.isOK()) { invariant(timeseries::bucket_catalog::isWriteBatchFinished(*batch)); docsToRetry->push_back(index); @@ -2150,7 +2150,7 @@ bool commitTimeseriesBucket(OperationContext* opCtx, if (auto error = write_ops_exec::generateError( opCtx, output.result.getStatus(), start + index, errors->size())) { errors->emplace_back(std::move(*error)); - bucketCatalog.abort(batch, output.result.getStatus()); + abort(bucketCatalog, batch, output.result.getStatus()); return output.canContinue; } @@ -2166,18 +2166,18 @@ bool commitTimeseriesBucket(OperationContext* opCtx, if ((output.result.isOK() && output.result.getValue().getNModified() != 1) || output.result.getStatus().code() == ErrorCodes::WriteConflict) { - bucketCatalog.abort( - batch, - output.result.isOK() - ? Status{ErrorCodes::WriteConflict, "Could not update non-existent bucket"} - : output.result.getStatus()); + abort(bucketCatalog, + batch, + output.result.isOK() + ? Status{ErrorCodes::WriteConflict, "Could not update non-existent bucket"} + : output.result.getStatus()); docsToRetry->push_back(index); opCtx->recoveryUnit()->abandonSnapshot(); return true; } else if (auto error = write_ops_exec::generateError( opCtx, output.result.getStatus(), start + index, errors->size())) { errors->emplace_back(std::move(*error)); - bucketCatalog.abort(batch, output.result.getStatus()); + abort(bucketCatalog, batch, output.result.getStatus()); return output.canContinue; } } @@ -2185,7 +2185,7 @@ bool commitTimeseriesBucket(OperationContext* opCtx, getOpTimeAndElectionId(opCtx, opTime, electionId); auto closedBucket = - bucketCatalog.finish(batch, timeseries::bucket_catalog::CommitInfo{*opTime, *electionId}); + finish(bucketCatalog, batch, timeseries::bucket_catalog::CommitInfo{*opTime, *electionId}); if (closedBucket) { // If this write closed a bucket, compress the bucket @@ -2198,7 +2198,8 @@ bool commitTimeseriesBucket(OperationContext* opCtx, } return true; } catch (const DBException& ex) { - timeseries::bucket_catalog::BucketCatalog::get(opCtx).abort(batch, ex.toStatus()); + auto& bucketCatalog = timeseries::bucket_catalog::BucketCatalog::get(opCtx); + abort(bucketCatalog, batch, ex.toStatus()); throw; } @@ -2234,7 +2235,7 @@ TimeseriesAtomicWriteResult commitTimeseriesBucketsAtomically( ScopeGuard batchGuard{[&] { for (auto batch : batchesToCommit) { if (batch.get()) { - bucketCatalog.abort(batch, abortStatus); + abort(bucketCatalog, batch, abortStatus); } } }}; @@ -2244,8 +2245,8 @@ TimeseriesAtomicWriteResult commitTimeseriesBucketsAtomically( std::vector<write_ops::UpdateCommandRequest> updateOps; for (auto batch : batchesToCommit) { - auto metadata = bucketCatalog.getMetadata(batch.get()->bucketHandle); - auto prepareCommitStatus = bucketCatalog.prepareCommit(batch); + auto metadata = getMetadata(bucketCatalog, batch.get()->bucketHandle); + auto prepareCommitStatus = prepareCommit(bucketCatalog, batch); if (!prepareCommitStatus.isOK()) { abortStatus = prepareCommitStatus; return TimeseriesAtomicWriteResult::kContinuableError; @@ -2288,8 +2289,8 @@ TimeseriesAtomicWriteResult commitTimeseriesBucketsAtomically( bool compressClosedBuckets = true; for (auto batch : batchesToCommit) { - auto closedBucket = bucketCatalog.finish( - batch, timeseries::bucket_catalog::CommitInfo{*opTime, *electionId}); + auto closedBucket = finish( + bucketCatalog, batch, timeseries::bucket_catalog::CommitInfo{*opTime, *electionId}); batch.get().reset(); if (!closedBucket || !compressClosedBuckets) { @@ -2436,13 +2437,14 @@ insertIntoBucketCatalog(OperationContext* opCtx, : ns(request); auto& measurementDoc = request.getDocuments()[start + index]; - StatusWith<timeseries::bucket_catalog::BucketCatalog::InsertResult> swResult = + StatusWith<timeseries::bucket_catalog::InsertResult> swResult = Status{ErrorCodes::BadValue, "Uninitialized InsertResult"}; do { if (feature_flags::gTimeseriesScalabilityImprovements.isEnabled( serverGlobalParams.featureCompatibility)) { - swResult = bucketCatalog.tryInsert( + swResult = timeseries::bucket_catalog::tryInsert( opCtx, + bucketCatalog, viewNs, bucketsColl->getDefaultCollator(), timeSeriesOptions, @@ -2506,8 +2508,9 @@ insertIntoBucketCatalog(OperationContext* opCtx, bucketFindResult.bucketToReopen = std::move(bucketToReopen); } - swResult = bucketCatalog.insert( + swResult = timeseries::bucket_catalog::insert( opCtx, + bucketCatalog, viewNs, bucketsColl->getDefaultCollator(), timeSeriesOptions, @@ -2518,14 +2521,15 @@ insertIntoBucketCatalog(OperationContext* opCtx, } } else { timeseries::bucket_catalog::BucketFindResult bucketFindResult; - swResult = - bucketCatalog.insert(opCtx, - viewNs, - bucketsColl->getDefaultCollator(), - timeSeriesOptions, - measurementDoc, - canCombineTimeseriesInsertWithOtherClients(opCtx, request), - bucketFindResult); + swResult = timeseries::bucket_catalog::insert( + opCtx, + bucketCatalog, + viewNs, + bucketsColl->getDefaultCollator(), + timeSeriesOptions, + measurementDoc, + canCombineTimeseriesInsertWithOtherClients(opCtx, request), + bucketFindResult); } // If there is an era offset (between the bucket we want to reopen and the @@ -2608,8 +2612,8 @@ void getTimeseriesBatchResults(OperationContext* opCtx, // error. if (itr > indexOfLastProcessedBatch && timeseries::bucket_catalog::claimWriteBatchCommitRights(*batch)) { - timeseries::bucket_catalog::BucketCatalog::get(opCtx).abort(batch, - lastError->getStatus()); + auto& bucketCatalog = timeseries::bucket_catalog::BucketCatalog::get(opCtx); + abort(bucketCatalog, batch, lastError->getStatus()); errors->emplace_back(start + index, lastError->getStatus()); continue; } diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index b4abcb55e78..a41fa48d4ea 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -589,7 +589,7 @@ void MigrationSourceManager::commitChunkMetadataOnConfig() { // out are no longer updatable. if (nss().isTimeseriesBucketsCollection()) { auto& bucketCatalog = timeseries::bucket_catalog::BucketCatalog::get(_opCtx); - bucketCatalog.clear(nss().getTimeseriesViewNamespace()); + clear(bucketCatalog, nss().getTimeseriesViewNamespace()); } _coordinator->setMigrationDecision(DecisionEnum::kCommitted); diff --git a/src/mongo/db/stats/storage_stats.cpp b/src/mongo/db/stats/storage_stats.cpp index d6d089606c1..0ec9643e27d 100644 --- a/src/mongo/db/stats/storage_stats.cpp +++ b/src/mongo/db/stats/storage_stats.cpp @@ -101,8 +101,9 @@ void _appendRecordStats(OperationContext* opCtx, if (numRecords) { bob.append("avgBucketSize", collection->averageObjectSize(opCtx)); } - timeseries::bucket_catalog::BucketCatalog::get(opCtx).appendExecutionStats( - collNss.getTimeseriesViewNamespace(), &bob); + auto& bucketCatalog = timeseries::bucket_catalog::BucketCatalog::get(opCtx); + timeseries::bucket_catalog::appendExecutionStats( + bucketCatalog, collNss.getTimeseriesViewNamespace(), bob); TimeseriesStats::get(collection.get()).append(&bob); } else { result->appendNumber("count", numRecords); diff --git a/src/mongo/db/timeseries/bucket_catalog/SConscript b/src/mongo/db/timeseries/bucket_catalog/SConscript index 5934016e368..c1547a498d4 100644 --- a/src/mongo/db/timeseries/bucket_catalog/SConscript +++ b/src/mongo/db/timeseries/bucket_catalog/SConscript @@ -9,7 +9,9 @@ env.Library( source=[ 'bucket.cpp', 'bucket_catalog.cpp', + 'bucket_catalog_internal.cpp', 'bucket_catalog_helpers.cpp', + 'bucket_catalog_server_status.cpp', 'bucket_identifiers.cpp', 'bucket_metadata.cpp', 'bucket_state.cpp', diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket.h b/src/mongo/db/timeseries/bucket_catalog/bucket.h index ad2dc46963a..9ed284bd132 100644 --- a/src/mongo/db/timeseries/bucket_catalog/bucket.h +++ b/src/mongo/db/timeseries/bucket_catalog/bucket.h @@ -67,6 +67,11 @@ public: ~Bucket(); + Bucket(const Bucket&) = delete; + Bucket(Bucket&&) = delete; + Bucket& operator=(const Bucket&) = delete; + Bucket& operator=(Bucket&&) = delete; + // The bucket ID for the underlying document const BucketId bucketId; diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp index ee057e1551f..194e27f0600 100644 --- a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp +++ b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp @@ -35,15 +35,13 @@ #include <algorithm> #include <boost/iterator/transform_iterator.hpp> -#include <boost/utility/in_place_factory.hpp> -#include "mongo/bson/util/bsoncolumn.h" #include "mongo/db/catalog/database_holder.h" -#include "mongo/db/commands/server_status.h" #include "mongo/db/concurrency/exception_util.h" #include "mongo/db/operation_context.h" #include "mongo/db/storage/storage_parameters_gen.h" #include "mongo/db/timeseries/bucket_catalog/bucket_catalog_helpers.h" +#include "mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.h" #include "mongo/db/timeseries/bucket_compression.h" #include "mongo/db/timeseries/timeseries_constants.h" #include "mongo/db/timeseries/timeseries_options.h" @@ -58,88 +56,8 @@ namespace mongo::timeseries::bucket_catalog { namespace { const auto getBucketCatalog = ServiceContext::declareDecoration<BucketCatalog>(); MONGO_FAIL_POINT_DEFINE(hangTimeseriesDirectModificationBeforeWriteConflict); -MONGO_FAIL_POINT_DEFINE(hangTimeseriesInsertBeforeReopeningBucket); -MONGO_FAIL_POINT_DEFINE(alwaysUseSameBucketCatalogStripe); MONGO_FAIL_POINT_DEFINE(hangTimeseriesDirectModificationAfterStart); MONGO_FAIL_POINT_DEFINE(hangTimeseriesDirectModificationBeforeFinish); -MONGO_FAIL_POINT_DEFINE(hangWaitingForConflictingPreparedBatch); - -OperationId getOpId(OperationContext* opCtx, - BucketCatalog::CombineWithInsertsFromOtherClients combine) { - switch (combine) { - case BucketCatalog::CombineWithInsertsFromOtherClients::kAllow: - return 0; - case BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow: - invariant(opCtx->getOpID()); - return opCtx->getOpID(); - } - MONGO_UNREACHABLE; -} - -std::pair<OID, Date_t> generateBucketOID(const Date_t& time, const TimeseriesOptions& options) { - OID oid = OID::gen(); - - // We round the measurement timestamp down to the nearest minute, hour, or day depending on the - // granularity. We do this for two reasons. The first is so that if measurements come in - // slightly out of order, we don't have to close the current bucket due to going backwards in - // time. The second, and more important reason, is so that we reliably group measurements - // together into predictable chunks for sharding. This way we know from a measurement timestamp - // what the bucket timestamp will be, so we can route measurements to the right shard chunk. - auto roundedTime = roundTimestampToGranularity(time, options); - int64_t const roundedSeconds = durationCount<Seconds>(roundedTime.toDurationSinceEpoch()); - oid.setTimestamp(roundedSeconds); - - // Now, if we stopped here we could end up with bucket OID collisions. Consider the case where - // we have the granularity set to 'Hours'. This means we will round down to the nearest day, so - // any bucket generated on the same machine on the same day will have the same timestamp portion - // and unique instance portion of the OID. Only the increment will differ. Since we only use 3 - // bytes for the increment portion, we run a serious risk of overflow if we are generating lots - // of buckets. - // - // To address this, we'll take the difference between the actual timestamp and the rounded - // timestamp and add it to the instance portion of the OID to ensure we can't have a collision. - // for timestamps generated on the same machine. - // - // This leaves open the possibility that in the case of step-down/step-up, we could get a - // collision if the old primary and the new primary have unique instance bits that differ by - // less than the maximum rounding difference. This is quite unlikely though, and can be resolved - // by restarting the new primary. It remains an open question whether we can fix this in a - // better way. - // TODO (SERVER-61412): Avoid time-series bucket OID collisions after election - auto instance = oid.getInstanceUnique(); - uint32_t sum = DataView(reinterpret_cast<char*>(instance.bytes)).read<uint32_t>(1) + - (durationCount<Seconds>(time.toDurationSinceEpoch()) - roundedSeconds); - DataView(reinterpret_cast<char*>(instance.bytes)).write<uint32_t>(sum, 1); - oid.setInstanceUnique(instance); - - return {oid, roundedTime}; -} - -Status getTimeseriesBucketClearedError(const NamespaceString& ns, const OID& oid) { - return {ErrorCodes::TimeseriesBucketCleared, - str::stream() << "Time-series bucket " << oid << " for namespace " << ns - << " was cleared"}; -} - -/** - * Calculate the bucket max size constrained by the cache size and the cardinality of active - * buckets. - */ -int32_t getCacheDerivedBucketMaxSize(StorageEngine* storageEngine, uint32_t workloadCardinality) { - invariant(storageEngine); - uint64_t storageCacheSize = - static_cast<uint64_t>(storageEngine->getEngine()->getCacheSizeMB() * 1024 * 1024); - - if (!feature_flags::gTimeseriesScalabilityImprovements.isEnabled( - serverGlobalParams.featureCompatibility) || - storageCacheSize == 0 || workloadCardinality == 0) { - return INT_MAX; - } - - uint64_t derivedMaxSize = storageCacheSize / (2 * workloadCardinality); - uint64_t intMax = static_cast<uint64_t>(std::numeric_limits<int32_t>::max()); - return std::min(derivedMaxSize, intMax); -} /** * Prepares the batch for commit. Sets min/max appropriately, records the number of @@ -211,15 +129,6 @@ void abortWriteBatch(WriteBatch& batch, const Status& status) { batch.promise.setError(status); } - -void closeArchivedBucket(BucketStateRegistry& registry, - ArchivedBucket& bucket, - ClosedBuckets& closedBuckets) { - try { - closedBuckets.emplace_back(®istry, bucket.bucketId, bucket.timeField, boost::none); - } catch (...) { - } -} } // namespace BucketCatalog& BucketCatalog::get(ServiceContext* svcCtx) { @@ -230,62 +139,12 @@ BucketCatalog& BucketCatalog::get(OperationContext* opCtx) { return get(opCtx->getServiceContext()); } -Status BucketCatalog::reopenBucket(OperationContext* opCtx, - const CollectionPtr& coll, - const BSONObj& bucketDoc) { - const NamespaceString ns = coll->ns().getTimeseriesViewNamespace(); - const boost::optional<TimeseriesOptions> options = coll->getTimeseriesOptions(); - invariant(options, - str::stream() << "Attempting to reopen a bucket for a non-timeseries collection: " - << ns); - - BSONElement metadata; - auto metaFieldName = options->getMetaField(); - if (metaFieldName) { - metadata = bucketDoc.getField(kBucketMetaFieldName); - } - auto key = BucketKey{ns, BucketMetadata{metadata, coll->getDefaultCollator(), metaFieldName}}; - - // Validate the bucket document against the schema. - auto validator = [&](OperationContext * opCtx, const BSONObj& bucketDoc) -> auto { - return coll->checkValidation(opCtx, bucketDoc); - }; - - auto stats = _getExecutionStats(ns); - - auto res = _rehydrateBucket(opCtx, - ns, - coll->getDefaultCollator(), - *options, - BucketToReopen{bucketDoc, validator}, - boost::none); - if (!res.isOK()) { - return res.getStatus(); - } - auto bucket = std::move(res.getValue()); - - auto stripeNumber = _getStripeNumber(key); - - // Register the reopened bucket with the catalog. - auto& stripe = _stripes[stripeNumber]; - stdx::lock_guard stripeLock{stripe.mutex}; - - ClosedBuckets closedBuckets; - return _reopenBucket(&stripe, - stripeLock, - stats, - key, - std::move(bucket), - getCurrentEra(_bucketStateRegistry), - &closedBuckets) - .getStatus(); -} - -BSONObj BucketCatalog::getMetadata(const BucketHandle& handle) { - auto const& stripe = _stripes[handle.stripe]; +BSONObj getMetadata(BucketCatalog& catalog, const BucketHandle& handle) { + auto const& stripe = catalog.stripes[handle.stripe]; stdx::lock_guard stripeLock{stripe.mutex}; - const Bucket* bucket = _findBucket(stripe, stripeLock, handle.bucketId); + const Bucket* bucket = + internal::findBucket(catalog.bucketStateRegistry, stripe, stripeLock, handle.bucketId); if (!bucket) { return {}; } @@ -293,29 +152,37 @@ BSONObj BucketCatalog::getMetadata(const BucketHandle& handle) { return bucket->key.metadata.toBSON(); } -StatusWith<BucketCatalog::InsertResult> BucketCatalog::tryInsert( - OperationContext* opCtx, - const NamespaceString& ns, - const StringData::ComparatorInterface* comparator, - const TimeseriesOptions& options, - const BSONObj& doc, - CombineWithInsertsFromOtherClients combine) { - return _insert(opCtx, ns, comparator, options, doc, combine, AllowBucketCreation::kNo); -} - -StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert( - OperationContext* opCtx, - const NamespaceString& ns, - const StringData::ComparatorInterface* comparator, - const TimeseriesOptions& options, - const BSONObj& doc, - CombineWithInsertsFromOtherClients combine, - BucketFindResult bucketFindResult) { - return _insert( - opCtx, ns, comparator, options, doc, combine, AllowBucketCreation::kYes, bucketFindResult); -} - -Status BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) { +StatusWith<InsertResult> tryInsert(OperationContext* opCtx, + BucketCatalog& catalog, + const NamespaceString& ns, + const StringData::ComparatorInterface* comparator, + const TimeseriesOptions& options, + const BSONObj& doc, + CombineWithInsertsFromOtherClients combine) { + return internal::insert( + opCtx, catalog, ns, comparator, options, doc, combine, internal::AllowBucketCreation::kNo); +} + +StatusWith<InsertResult> insert(OperationContext* opCtx, + BucketCatalog& catalog, + const NamespaceString& ns, + const StringData::ComparatorInterface* comparator, + const TimeseriesOptions& options, + const BSONObj& doc, + CombineWithInsertsFromOtherClients combine, + BucketFindResult bucketFindResult) { + return internal::insert(opCtx, + catalog, + ns, + comparator, + options, + doc, + combine, + internal::AllowBucketCreation::kYes, + bucketFindResult); +} + +Status prepareCommit(BucketCatalog& catalog, std::shared_ptr<WriteBatch> batch) { auto getBatchStatus = [&] { return batch->promise.getFuture().getNoThrow().getStatus(); }; @@ -325,12 +192,13 @@ Status BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) { return getBatchStatus(); } - auto& stripe = _stripes[batch->bucketHandle.stripe]; - _waitToCommitBatch(&stripe, batch); + auto& stripe = catalog.stripes[batch->bucketHandle.stripe]; + internal::waitToCommitBatch(catalog.bucketStateRegistry, stripe, batch); stdx::lock_guard stripeLock{stripe.mutex}; - Bucket* bucket = _useBucketAndChangeState( - &stripe, + Bucket* bucket = internal::useBucketAndChangeState( + catalog.bucketStateRegistry, + stripe, stripeLock, batch->bucketHandle.bucketId, [](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> { @@ -342,38 +210,41 @@ Status BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) { // Someone may have aborted it while we were waiting. Since we have the prepared batch, we // should now be able to fully abort the bucket. if (bucket) { - _abort(&stripe, stripeLock, batch, getBatchStatus()); + internal::abort(catalog, stripe, stripeLock, batch, getBatchStatus()); } return getBatchStatus(); } else if (!bucket) { - _abort(&stripe, - stripeLock, - batch, - getTimeseriesBucketClearedError(batch->bucketHandle.bucketId.ns, - batch->bucketHandle.bucketId.oid)); + internal::abort(catalog, + stripe, + stripeLock, + batch, + internal::getTimeseriesBucketClearedError( + batch->bucketHandle.bucketId.ns, batch->bucketHandle.bucketId.oid)); return getBatchStatus(); } auto prevMemoryUsage = bucket->memoryUsage; prepareWriteBatchForCommit(*batch, *bucket); - _memoryUsage.fetchAndAdd(bucket->memoryUsage - prevMemoryUsage); + catalog.memoryUsage.fetchAndAdd(bucket->memoryUsage - prevMemoryUsage); return Status::OK(); } -boost::optional<ClosedBucket> BucketCatalog::finish(std::shared_ptr<WriteBatch> batch, - const CommitInfo& info) { +boost::optional<ClosedBucket> finish(BucketCatalog& catalog, + std::shared_ptr<WriteBatch> batch, + const CommitInfo& info) { invariant(!isWriteBatchFinished(*batch)); boost::optional<ClosedBucket> closedBucket; finishWriteBatch(*batch, info); - auto& stripe = _stripes[batch->bucketHandle.stripe]; + auto& stripe = catalog.stripes[batch->bucketHandle.stripe]; stdx::lock_guard stripeLock{stripe.mutex}; - Bucket* bucket = _useBucketAndChangeState( - &stripe, + Bucket* bucket = internal::useBucketAndChangeState( + catalog.bucketStateRegistry, + stripe, stripeLock, batch->bucketHandle.bucketId, [](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> { @@ -401,33 +272,35 @@ boost::optional<ClosedBucket> BucketCatalog::finish(std::shared_ptr<WriteBatch> // It's possible that we cleared the bucket in between preparing the commit and finishing // here. In this case, we should abort any other ongoing batches and clear the bucket from // the catalog so it's not hanging around idle. - auto it = stripe.allBuckets.find(batch->bucketHandle.bucketId); - if (it != stripe.allBuckets.end()) { + auto it = stripe.openBucketsById.find(batch->bucketHandle.bucketId); + if (it != stripe.openBucketsById.end()) { bucket = it->second.get(); bucket->preparedBatch.reset(); - _abort(&stripe, - stripeLock, - bucket, - nullptr, - getTimeseriesBucketClearedError(bucket->bucketId.ns, bucket->bucketId.oid)); + internal::abort(catalog, + stripe, + stripeLock, + *bucket, + nullptr, + internal::getTimeseriesBucketClearedError(bucket->bucketId.ns, + bucket->bucketId.oid)); } } else if (allCommitted(*bucket)) { switch (bucket->rolloverAction) { case RolloverAction::kHardClose: case RolloverAction::kSoftClose: { - _closeOpenBucket(stripe, stripeLock, *bucket, closedBucket); + internal::closeOpenBucket(catalog, stripe, stripeLock, *bucket, closedBucket); break; } case RolloverAction::kArchive: { ClosedBuckets closedBuckets; - _archiveBucket(&stripe, stripeLock, bucket, &closedBuckets); + internal::archiveBucket(catalog, stripe, stripeLock, *bucket, closedBuckets); if (!closedBuckets.empty()) { closedBucket = std::move(closedBuckets[0]); } break; } case RolloverAction::kNone: { - _markBucketIdle(&stripe, stripeLock, bucket); + internal::markBucketIdle(stripe, stripeLock, *bucket); break; } } @@ -435,7 +308,7 @@ boost::optional<ClosedBucket> BucketCatalog::finish(std::shared_ptr<WriteBatch> return closedBucket; } -void BucketCatalog::abort(std::shared_ptr<WriteBatch> batch, const Status& status) { +void abort(BucketCatalog& catalog, std::shared_ptr<WriteBatch> batch, const Status& status) { invariant(batch); invariant(batch->commitRights.load()); @@ -443,16 +316,16 @@ void BucketCatalog::abort(std::shared_ptr<WriteBatch> batch, const Status& statu return; } - auto& stripe = _stripes[batch->bucketHandle.stripe]; + auto& stripe = catalog.stripes[batch->bucketHandle.stripe]; stdx::lock_guard stripeLock{stripe.mutex}; - _abort(&stripe, stripeLock, batch, status); + internal::abort(catalog, stripe, stripeLock, batch, status); } -void BucketCatalog::directWriteStart(const NamespaceString& ns, const OID& oid) { +void directWriteStart(BucketStateRegistry& registry, const NamespaceString& ns, const OID& oid) { invariant(!ns.isTimeseriesBucketsCollection()); auto result = changeBucketState( - _bucketStateRegistry, + registry, BucketId{ns, oid}, [](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> { if (input.has_value()) { @@ -470,11 +343,11 @@ void BucketCatalog::directWriteStart(const NamespaceString& ns, const OID& oid) hangTimeseriesDirectModificationAfterStart.pauseWhileSet(); } -void BucketCatalog::directWriteFinish(const NamespaceString& ns, const OID& oid) { +void directWriteFinish(BucketStateRegistry& registry, const NamespaceString& ns, const OID& oid) { invariant(!ns.isTimeseriesBucketsCollection()); hangTimeseriesDirectModificationBeforeFinish.pauseWhileSet(); (void)changeBucketState( - _bucketStateRegistry, + registry, BucketId{ns, oid}, [](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> { if (!input.has_value()) { @@ -495,28 +368,30 @@ void BucketCatalog::directWriteFinish(const NamespaceString& ns, const OID& oid) }); } -void BucketCatalog::clear(ShouldClearFn&& shouldClear) { +void clear(BucketCatalog& catalog, ShouldClearFn&& shouldClear) { if (feature_flags::gTimeseriesScalabilityImprovements.isEnabled( serverGlobalParams.featureCompatibility)) { - clearSetOfBuckets(_bucketStateRegistry, std::move(shouldClear)); + clearSetOfBuckets(catalog.bucketStateRegistry, std::move(shouldClear)); return; } - for (auto& stripe : _stripes) { + for (auto& stripe : catalog.stripes) { stdx::lock_guard stripeLock{stripe.mutex}; - for (auto it = stripe.allBuckets.begin(); it != stripe.allBuckets.end();) { + for (auto it = stripe.openBucketsById.begin(); it != stripe.openBucketsById.end();) { auto nextIt = std::next(it); const auto& bucket = it->second; if (shouldClear(bucket->bucketId.ns)) { { - stdx::lock_guard catalogLock{_mutex}; - _executionStats.erase(bucket->bucketId.ns); + stdx::lock_guard catalogLock{catalog.mutex}; + catalog.executionStats.erase(bucket->bucketId.ns); } - _abort(&stripe, - stripeLock, - bucket.get(), - nullptr, - getTimeseriesBucketClearedError(bucket->bucketId.ns, bucket->bucketId.oid)); + internal::abort(catalog, + stripe, + stripeLock, + *bucket, + nullptr, + internal::getTimeseriesBucketClearedError(bucket->bucketId.ns, + bucket->bucketId.oid)); } it = nextIt; @@ -524,1327 +399,23 @@ void BucketCatalog::clear(ShouldClearFn&& shouldClear) { } } -void BucketCatalog::clear(const NamespaceString& ns) { +void clear(BucketCatalog& catalog, const NamespaceString& ns) { invariant(!ns.isTimeseriesBucketsCollection()); - clear([ns](const NamespaceString& bucketNs) { return bucketNs == ns; }); + clear(catalog, [ns](const NamespaceString& bucketNs) { return bucketNs == ns; }); } -void BucketCatalog::clear(StringData dbName) { - clear([dbName = dbName.toString()](const NamespaceString& bucketNs) { +void clear(BucketCatalog& catalog, StringData dbName) { + clear(catalog, [dbName = dbName.toString()](const NamespaceString& bucketNs) { return bucketNs.db() == dbName; }); } -void BucketCatalog::_appendExecutionStatsToBuilder(const ExecutionStats* stats, - BSONObjBuilder* builder) const { - builder->appendNumber("numBucketInserts", stats->numBucketInserts.load()); - builder->appendNumber("numBucketUpdates", stats->numBucketUpdates.load()); - builder->appendNumber("numBucketsOpenedDueToMetadata", - stats->numBucketsOpenedDueToMetadata.load()); - builder->appendNumber("numBucketsClosedDueToCount", stats->numBucketsClosedDueToCount.load()); - builder->appendNumber("numBucketsClosedDueToSchemaChange", - stats->numBucketsClosedDueToSchemaChange.load()); - builder->appendNumber("numBucketsClosedDueToSize", stats->numBucketsClosedDueToSize.load()); - builder->appendNumber("numBucketsClosedDueToTimeForward", - stats->numBucketsClosedDueToTimeForward.load()); - builder->appendNumber("numBucketsClosedDueToTimeBackward", - stats->numBucketsClosedDueToTimeBackward.load()); - builder->appendNumber("numBucketsClosedDueToMemoryThreshold", - stats->numBucketsClosedDueToMemoryThreshold.load()); - - auto commits = stats->numCommits.load(); - builder->appendNumber("numCommits", commits); - builder->appendNumber("numWaits", stats->numWaits.load()); - auto measurementsCommitted = stats->numMeasurementsCommitted.load(); - builder->appendNumber("numMeasurementsCommitted", measurementsCommitted); - if (commits) { - builder->appendNumber("avgNumMeasurementsPerCommit", measurementsCommitted / commits); - } - - if (feature_flags::gTimeseriesScalabilityImprovements.isEnabled( - serverGlobalParams.featureCompatibility)) { - builder->appendNumber("numBucketsClosedDueToReopening", - stats->numBucketsClosedDueToReopening.load()); - builder->appendNumber("numBucketsArchivedDueToMemoryThreshold", - stats->numBucketsArchivedDueToMemoryThreshold.load()); - builder->appendNumber("numBucketsArchivedDueToTimeBackward", - stats->numBucketsArchivedDueToTimeBackward.load()); - builder->appendNumber("numBucketsReopened", stats->numBucketsReopened.load()); - builder->appendNumber("numBucketsKeptOpenDueToLargeMeasurements", - stats->numBucketsKeptOpenDueToLargeMeasurements.load()); - builder->appendNumber("numBucketsClosedDueToCachePressure", - stats->numBucketsClosedDueToCachePressure.load()); - builder->appendNumber("numBucketsFetched", stats->numBucketsFetched.load()); - builder->appendNumber("numBucketsQueried", stats->numBucketsQueried.load()); - builder->appendNumber("numBucketFetchesFailed", stats->numBucketFetchesFailed.load()); - builder->appendNumber("numBucketQueriesFailed", stats->numBucketQueriesFailed.load()); - builder->appendNumber("numBucketReopeningsFailed", stats->numBucketReopeningsFailed.load()); - builder->appendNumber("numDuplicateBucketsReopened", - stats->numDuplicateBucketsReopened.load()); - } -} - -void BucketCatalog::appendExecutionStats(const NamespaceString& ns, BSONObjBuilder* builder) const { - invariant(!ns.isTimeseriesBucketsCollection()); - const std::shared_ptr<ExecutionStats> stats = _getExecutionStats(ns); - _appendExecutionStatsToBuilder(stats.get(), builder); -} - -void BucketCatalog::appendGlobalExecutionStats(BSONObjBuilder* builder) const { - _appendExecutionStatsToBuilder(&_globalExecutionStats, builder); -} - -void BucketCatalog::appendStateManagementStats(BSONObjBuilder* builder) const { - appendStats(_bucketStateRegistry, builder); -} - -long long BucketCatalog::memoryUsage() const { - return _memoryUsage.load(); -} - -StatusWith<std::pair<BucketKey, Date_t>> BucketCatalog::_extractBucketingParameters( - const NamespaceString& ns, - const StringData::ComparatorInterface* comparator, - const TimeseriesOptions& options, - const BSONObj& doc) const { - Date_t time; - BSONElement metadata; - - if (!options.getMetaField().has_value()) { - auto swTime = extractTime(doc, options.getTimeField()); - if (!swTime.isOK()) { - return swTime.getStatus(); - } - time = swTime.getValue(); - } else { - auto swDocTimeAndMeta = - extractTimeAndMeta(doc, options.getTimeField(), options.getMetaField().value()); - if (!swDocTimeAndMeta.isOK()) { - return swDocTimeAndMeta.getStatus(); - } - time = swDocTimeAndMeta.getValue().first; - metadata = swDocTimeAndMeta.getValue().second; - } - - // Buckets are spread across independently-lockable stripes to improve parallelism. We map a - // bucket to a stripe by hashing the BucketKey. - auto key = BucketKey{ns, BucketMetadata{metadata, comparator, options.getMetaField()}}; - - return {std::make_pair(key, time)}; -} - -BucketCatalog::StripeNumber BucketCatalog::_getStripeNumber(const BucketKey& key) const { - if (MONGO_unlikely(alwaysUseSameBucketCatalogStripe.shouldFail())) { - return 0; - } - return key.hash % kNumberOfStripes; -} - -const Bucket* BucketCatalog::_findBucket(const Stripe& stripe, - WithLock, - const BucketId& bucketId, - IgnoreBucketState mode) { - auto it = stripe.allBuckets.find(bucketId); - if (it != stripe.allBuckets.end()) { - if (mode == IgnoreBucketState::kYes) { - return it->second.get(); - } - - if (auto state = getBucketState(_bucketStateRegistry, it->second.get()); - state && !state.value().conflictsWithInsertion()) { - return it->second.get(); - } - } - return nullptr; -} - -Bucket* BucketCatalog::_useBucket(Stripe* stripe, - WithLock stripeLock, - const BucketId& bucketId, - IgnoreBucketState mode) { - return const_cast<Bucket*>(_findBucket(*stripe, stripeLock, bucketId, mode)); -} - -Bucket* BucketCatalog::_useBucketAndChangeState(Stripe* stripe, - WithLock stripeLock, - const BucketId& bucketId, - const BucketStateRegistry::StateChangeFn& change) { - auto it = stripe->allBuckets.find(bucketId); - if (it != stripe->allBuckets.end()) { - if (auto state = changeBucketState(_bucketStateRegistry, it->second.get(), change); - state && !state.value().conflictsWithInsertion()) { - return it->second.get(); - } - } - return nullptr; -} - -Bucket* BucketCatalog::_useBucket(Stripe* stripe, - WithLock stripeLock, - const CreationInfo& info, - AllowBucketCreation mode) { - auto it = stripe->openBuckets.find(info.key); - if (it == stripe->openBuckets.end()) { - // No open bucket for this metadata. - return mode == AllowBucketCreation::kYes ? _allocateBucket(stripe, stripeLock, info) - : nullptr; - } - - auto& openSet = it->second; - Bucket* bucket = nullptr; - for (Bucket* potentialBucket : openSet) { - if (potentialBucket->rolloverAction == RolloverAction::kNone) { - bucket = potentialBucket; - break; - } - } - if (!bucket) { - return mode == AllowBucketCreation::kYes ? _allocateBucket(stripe, stripeLock, info) - : nullptr; - } - - if (auto state = getBucketState(_bucketStateRegistry, bucket); - state && !state.value().conflictsWithInsertion()) { - _markBucketNotIdle(stripe, stripeLock, bucket); - return bucket; - } - - _abort(stripe, - stripeLock, - bucket, - nullptr, - getTimeseriesBucketClearedError(bucket->bucketId.ns, bucket->bucketId.oid)); - - return mode == AllowBucketCreation::kYes ? _allocateBucket(stripe, stripeLock, info) : nullptr; -} - -Bucket* BucketCatalog::_useAlternateBucket(Stripe* stripe, - WithLock stripeLock, - const CreationInfo& info) { - auto it = stripe->openBuckets.find(info.key); - if (it == stripe->openBuckets.end()) { - // No open bucket for this metadata. - return nullptr; - } - - auto& openSet = it->second; - // In order to potentially erase elements of the set while we iterate it (via _abort), we need - // to advance the iterator before we call erase. This means we can't use the more - // straightforward range iteration, and use the somewhat awkward pattern below. - for (auto it = openSet.begin(); it != openSet.end();) { - Bucket* potentialBucket = *it++; - - if (potentialBucket->rolloverAction == RolloverAction::kNone || - potentialBucket->rolloverAction == RolloverAction::kHardClose) { - continue; - } - - auto bucketTime = potentialBucket->minTime; - if (info.time - bucketTime >= Seconds(*info.options.getBucketMaxSpanSeconds()) || - info.time < bucketTime) { - continue; - } - - auto state = getBucketState(_bucketStateRegistry, potentialBucket); - invariant(state); - if (!state.value().conflictsWithInsertion()) { - invariant(!potentialBucket->idleListEntry.has_value()); - return potentialBucket; - } - - // Clean up the bucket if it has been cleared. - if (state.value().isSet(BucketStateFlag::kCleared)) { - _abort(stripe, - stripeLock, - potentialBucket, - nullptr, - getTimeseriesBucketClearedError(potentialBucket->bucketId.ns, - potentialBucket->bucketId.oid)); - } - } - - return nullptr; -} - -StatusWith<std::unique_ptr<Bucket>> BucketCatalog::_rehydrateBucket( - OperationContext* opCtx, - const NamespaceString& ns, - const StringData::ComparatorInterface* comparator, - const TimeseriesOptions& options, - const BucketToReopen& bucketToReopen, - boost::optional<const BucketKey&> expectedKey) { - invariant(feature_flags::gTimeseriesScalabilityImprovements.isEnabled( - serverGlobalParams.featureCompatibility)); - const auto& [bucketDoc, validator, catalogEra] = bucketToReopen; - if (catalogEra < getCurrentEra(_bucketStateRegistry)) { - return {ErrorCodes::WriteConflict, "Bucket is from an earlier era, may be outdated"}; - } - - BSONElement bucketIdElem = bucketDoc.getField(kBucketIdFieldName); - if (bucketIdElem.eoo() || bucketIdElem.type() != BSONType::jstOID) { - return {ErrorCodes::BadValue, - str::stream() << kBucketIdFieldName << " is missing or not an ObjectId"}; - } - - // Validate the bucket document against the schema. - auto result = validator(opCtx, bucketDoc); - if (result.first != Collection::SchemaValidationResult::kPass) { - return result.second; - } - - auto controlField = bucketDoc.getObjectField(kBucketControlFieldName); - auto closedElem = controlField.getField(kBucketControlClosedFieldName); - if (closedElem.booleanSafe()) { - return {ErrorCodes::BadValue, - "Bucket has been marked closed and is not eligible for reopening"}; - } - - BSONElement metadata; - auto metaFieldName = options.getMetaField(); - if (metaFieldName) { - metadata = bucketDoc.getField(kBucketMetaFieldName); - } - - // Buckets are spread across independently-lockable stripes to improve parallelism. We map a - // bucket to a stripe by hashing the BucketKey. - auto key = BucketKey{ns, BucketMetadata{metadata, comparator, options.getMetaField()}}; - if (expectedKey.has_value() && key != expectedKey.value()) { - return {ErrorCodes::BadValue, "Bucket metadata does not match (hash collision)"}; - } - - auto minTime = controlField.getObjectField(kBucketControlMinFieldName) - .getField(options.getTimeField()) - .Date(); - BucketId bucketId{key.ns, bucketIdElem.OID()}; - std::unique_ptr<Bucket> bucket = std::make_unique<Bucket>( - bucketId, key, options.getTimeField(), minTime, _bucketStateRegistry); - - const bool isCompressed = isCompressedBucket(bucketDoc); - - // Initialize the remaining member variables from the bucket document. - if (isCompressed) { - auto decompressed = decompressBucket(bucketDoc); - if (!decompressed.has_value()) { - return Status{ErrorCodes::BadValue, "Bucket could not be decompressed"}; - } - bucket->size = decompressed.value().objsize(); - bucket->decompressed = DecompressionResult{bucketDoc, decompressed.value()}; - bucket->memoryUsage += (decompressed.value().objsize() + bucketDoc.objsize()); - } else { - bucket->size = bucketDoc.objsize(); - } - - // Populate the top-level data field names. - const BSONObj& dataObj = bucketDoc.getObjectField(kBucketDataFieldName); - for (const BSONElement& dataElem : dataObj) { - auto hashedKey = StringSet::hasher().hashed_key(dataElem.fieldName()); - bucket->fieldNames.emplace(hashedKey); - } - - auto swMinMax = generateMinMaxFromBucketDoc(bucketDoc, comparator); - if (!swMinMax.isOK()) { - return swMinMax.getStatus(); - } - bucket->minmax = std::move(swMinMax.getValue()); - - auto swSchema = generateSchemaFromBucketDoc(bucketDoc, comparator); - if (!swSchema.isOK()) { - return swSchema.getStatus(); - } - bucket->schema = std::move(swSchema.getValue()); - - uint32_t numMeasurements = 0; - const BSONElement timeColumnElem = dataObj.getField(options.getTimeField()); - - if (isCompressed && timeColumnElem.type() == BSONType::BinData) { - BSONColumn storage{timeColumnElem}; - numMeasurements = storage.size(); - } else if (timeColumnElem.isABSONObj()) { - numMeasurements = timeColumnElem.Obj().nFields(); - } else { - return {ErrorCodes::BadValue, - "Bucket data field is malformed (missing a valid time column)"}; - } - - bucket->numMeasurements = numMeasurements; - bucket->numCommittedMeasurements = numMeasurements; - - // The namespace is stored two times: the bucket itself and openBuckets. We don't have a great - // approximation for the _schema or _minmax data structure size, so we use the control field - // size as an approximation for _minmax, and half that size for _schema. Since the metadata - // is stored in the bucket, we need to add that as well. A unique pointer to the bucket is - // stored once: allBuckets. A raw pointer to the bucket is stored at most twice: openBuckets, - // idleBuckets. - bucket->memoryUsage += (key.ns.size() * 2) + 1.5 * controlField.objsize() + - key.metadata.toBSON().objsize() + sizeof(Bucket) + sizeof(std::unique_ptr<Bucket>) + - (sizeof(Bucket*) * 2); - - return {std::move(bucket)}; -} - -StatusWith<Bucket*> BucketCatalog::_reopenBucket(Stripe* stripe, - WithLock stripeLock, - ExecutionStatsController stats, - const BucketKey& key, - std::unique_ptr<Bucket>&& bucket, - std::uint64_t targetEra, - ClosedBuckets* closedBuckets) { - invariant(bucket); - - _expireIdleBuckets(stripe, stripeLock, stats, closedBuckets); - - // We may need to initialize the bucket's state. - bool conflicts = false; - auto initializeStateFn = - [targetEra, &conflicts](boost::optional<BucketState> input, - std::uint64_t currentEra) -> boost::optional<BucketState> { - if (targetEra < currentEra || - (input.has_value() && input.value().conflictsWithReopening())) { - conflicts = true; - return input; - } - conflicts = false; - return input.has_value() ? input.value() : BucketState{}; - }; - - auto state = changeBucketState(_bucketStateRegistry, bucket->bucketId, initializeStateFn); - if (conflicts) { - return {ErrorCodes::WriteConflict, "Bucket may be stale"}; - } - - // If this bucket was archived, we need to remove it from the set of archived buckets. - if (auto setIt = stripe->archivedBuckets.find(key.hash); - setIt != stripe->archivedBuckets.end()) { - auto& archivedSet = setIt->second; - if (auto bucketIt = archivedSet.find(bucket->minTime); - bucketIt != archivedSet.end() && bucket->bucketId == bucketIt->second.bucketId) { - long long memory = - _marginalMemoryUsageForArchivedBucket(bucketIt->second, archivedSet.size() == 1); - if (archivedSet.size() == 1) { - stripe->archivedBuckets.erase(setIt); - } else { - archivedSet.erase(bucketIt); - } - _memoryUsage.fetchAndSubtract(memory); - _numberOfActiveBuckets.fetchAndSubtract(1); - } - } - - // Pass ownership of the reopened bucket to the bucket catalog. - auto [insertedIt, newlyInserted] = - stripe->allBuckets.try_emplace(bucket->bucketId, std::move(bucket)); - invariant(newlyInserted); - Bucket* unownedBucket = insertedIt->second.get(); - - // If we already have an open bucket for this key, we need to close it. - if (auto it = stripe->openBuckets.find(key); it != stripe->openBuckets.end()) { - auto& openSet = it->second; - for (Bucket* existingBucket : openSet) { - if (existingBucket->rolloverAction == RolloverAction::kNone) { - stats.incNumBucketsClosedDueToReopening(); - if (allCommitted(*existingBucket)) { - _closeOpenBucket(*stripe, stripeLock, *existingBucket, *closedBuckets); - } else { - existingBucket->rolloverAction = RolloverAction::kSoftClose; - } - // We should only have one open bucket at a time. - break; - } - } - } - - // Now actually mark this bucket as open. - stripe->openBuckets[key].emplace(unownedBucket); - stats.incNumBucketsReopened(); - - _memoryUsage.addAndFetch(unownedBucket->memoryUsage); - _numberOfActiveBuckets.fetchAndAdd(1); - - return unownedBucket; -} - -StatusWith<Bucket*> BucketCatalog::_reuseExistingBucket(Stripe* stripe, - WithLock stripeLock, - ExecutionStatsController* stats, - const BucketKey& key, - Bucket* existingBucket, - std::uint64_t targetEra) { - invariant(existingBucket); - - // If we have an existing bucket, passing the Bucket* will let us check if the bucket was - // cleared as part of a set since the last time it was used. If we were to just check by - // OID, we may miss if e.g. there was a move chunk operation. - bool conflicts = false; - auto state = changeBucketState( - _bucketStateRegistry, - existingBucket, - [targetEra, &conflicts](boost::optional<BucketState> input, - std::uint64_t currentEra) -> boost::optional<BucketState> { - if (targetEra < currentEra || - (input.has_value() && input.value().conflictsWithReopening())) { - conflicts = true; - return input; - } - conflicts = false; - return input.has_value() ? input.value() : BucketState{}; - }); - if (state.has_value() && state.value().isSet(BucketStateFlag::kCleared)) { - _abort(stripe, - stripeLock, - existingBucket, - nullptr, - getTimeseriesBucketClearedError(existingBucket->bucketId.ns, - existingBucket->bucketId.oid)); - conflicts = true; - } - if (conflicts) { - return {ErrorCodes::WriteConflict, "Bucket may be stale"}; - } - - // It's possible to have two buckets with the same ID in different collections, so let's make - // extra sure the existing bucket is the right one. - if (existingBucket->bucketId.ns != key.ns) { - return {ErrorCodes::BadValue, "Cannot re-use bucket: same ID but different namespace"}; - } - - // If the bucket was already open, wasn't cleared, the state didn't conflict with reopening, and - // the namespace matches, then we can simply return it. - stats->incNumDuplicateBucketsReopened(); - _markBucketNotIdle(stripe, stripeLock, existingBucket); - - return existingBucket; -} - -StatusWith<BucketCatalog::InsertResult> BucketCatalog::_insert( - OperationContext* opCtx, - const NamespaceString& ns, - const StringData::ComparatorInterface* comparator, - const TimeseriesOptions& options, - const BSONObj& doc, - CombineWithInsertsFromOtherClients combine, - AllowBucketCreation mode, - BucketFindResult bucketFindResult) { +void appendExecutionStats(const BucketCatalog& catalog, + const NamespaceString& ns, + BSONObjBuilder& builder) { invariant(!ns.isTimeseriesBucketsCollection()); - - auto res = _extractBucketingParameters(ns, comparator, options, doc); - if (!res.isOK()) { - return res.getStatus(); - } - auto& key = res.getValue().first; - auto time = res.getValue().second; - - ExecutionStatsController stats = _getExecutionStats(ns); - _updateBucketFetchAndQueryStats(stats, bucketFindResult); - - // Buckets are spread across independently-lockable stripes to improve parallelism. We map a - // bucket to a stripe by hashing the BucketKey. - auto stripeNumber = _getStripeNumber(key); - - InsertResult result; - result.catalogEra = getCurrentEra(_bucketStateRegistry); - CreationInfo info{key, stripeNumber, time, options, stats, &result.closedBuckets}; - boost::optional<BucketToReopen> bucketToReopen = std::move(bucketFindResult.bucketToReopen); - - auto rehydratedBucket = bucketToReopen.has_value() - ? _rehydrateBucket(opCtx, ns, comparator, options, bucketToReopen.value(), key) - : StatusWith<std::unique_ptr<Bucket>>{ErrorCodes::BadValue, "No bucket to rehydrate"}; - if (rehydratedBucket.getStatus().code() == ErrorCodes::WriteConflict) { - stats.incNumBucketReopeningsFailed(); - return rehydratedBucket.getStatus(); - } - - auto& stripe = _stripes[stripeNumber]; - stdx::lock_guard stripeLock{stripe.mutex}; - - if (rehydratedBucket.isOK()) { - invariant(mode == AllowBucketCreation::kYes); - hangTimeseriesInsertBeforeReopeningBucket.pauseWhileSet(); - - StatusWith<Bucket*> swBucket{nullptr}; - auto existingIt = stripe.allBuckets.find(rehydratedBucket.getValue()->bucketId); - if (existingIt != stripe.allBuckets.end()) { - // First let's check the existing bucket if we have one. - Bucket* existingBucket = existingIt->second.get(); - swBucket = _reuseExistingBucket( - &stripe, stripeLock, &stats, key, existingBucket, bucketToReopen->catalogEra); - } else { - // No existing bucket to use, go ahead and try to reopen our rehydrated bucket. - swBucket = _reopenBucket(&stripe, - stripeLock, - stats, - key, - std::move(rehydratedBucket.getValue()), - bucketToReopen->catalogEra, - &result.closedBuckets); - } - - if (swBucket.isOK()) { - Bucket* bucket = swBucket.getValue(); - invariant(bucket); - auto insertionResult = _insertIntoBucket( - opCtx, &stripe, stripeLock, stripeNumber, doc, combine, mode, &info, bucket); - auto* batch = stdx::get_if<std::shared_ptr<WriteBatch>>(&insertionResult); - invariant(batch); - result.batch = *batch; - - return std::move(result); - } else { - stats.incNumBucketReopeningsFailed(); - if (swBucket.getStatus().code() == ErrorCodes::WriteConflict) { - return swBucket.getStatus(); - } - // If we had a different type of error, then we should fall through and proceed to open - // a new bucket. - } - } - - Bucket* bucket = _useBucket(&stripe, stripeLock, info, mode); - if (!bucket) { - invariant(mode == AllowBucketCreation::kNo); - constexpr bool allowQueryBasedReopening = true; - result.candidate = - _getReopeningCandidate(opCtx, &stripe, stripeLock, info, allowQueryBasedReopening); - return std::move(result); - } - - auto insertionResult = _insertIntoBucket( - opCtx, &stripe, stripeLock, stripeNumber, doc, combine, mode, &info, bucket); - if (auto* reason = stdx::get_if<RolloverReason>(&insertionResult)) { - invariant(mode == AllowBucketCreation::kNo); - if (allCommitted(*bucket)) { - _markBucketIdle(&stripe, stripeLock, bucket); - } - - // If we were time forward or backward, we might be able to "reopen" a bucket we still have - // in memory that's set to be closed when pending operations finish. - if ((*reason == RolloverReason::kTimeBackward || *reason == RolloverReason::kTimeForward)) { - if (Bucket* alternate = _useAlternateBucket(&stripe, stripeLock, info)) { - insertionResult = _insertIntoBucket( - opCtx, &stripe, stripeLock, stripeNumber, doc, combine, mode, &info, alternate); - if (auto* batch = stdx::get_if<std::shared_ptr<WriteBatch>>(&insertionResult)) { - result.batch = *batch; - return std::move(result); - } - - // We weren't able to insert into the other bucket, so fall through to the regular - // reopening procedure. - } - } - - bool allowQueryBasedReopening = (*reason == RolloverReason::kTimeBackward); - result.candidate = - _getReopeningCandidate(opCtx, &stripe, stripeLock, info, allowQueryBasedReopening); - } else { - result.batch = *stdx::get_if<std::shared_ptr<WriteBatch>>(&insertionResult); - } - return std::move(result); -} - -stdx::variant<std::shared_ptr<WriteBatch>, RolloverReason> BucketCatalog::_insertIntoBucket( - OperationContext* opCtx, - Stripe* stripe, - WithLock stripeLock, - StripeNumber stripeNumber, - const BSONObj& doc, - CombineWithInsertsFromOtherClients combine, - AllowBucketCreation mode, - CreationInfo* info, - Bucket* bucket) { - Bucket::NewFieldNames newFieldNamesToBeInserted; - int32_t sizeToBeAdded = 0; - const auto previousMemoryUsage = bucket->memoryUsage; - - bool isNewlyOpenedBucket = (bucket->size == 0); - if (!isNewlyOpenedBucket) { - auto [action, reason] = _determineRolloverAction( - opCtx, doc, info, bucket, newFieldNamesToBeInserted, sizeToBeAdded, mode); - if ((action == RolloverAction::kSoftClose || action == RolloverAction::kArchive) && - mode == AllowBucketCreation::kNo) { - // We don't actually want to roll this bucket over yet, bail out. - return reason; - } else if (action != RolloverAction::kNone) { - info->openedDuetoMetadata = false; - bucket = _rollover(stripe, stripeLock, bucket, *info, action); - isNewlyOpenedBucket = true; - } - } - if (isNewlyOpenedBucket) { - calculateBucketFieldsAndSizeChange( - *bucket, doc, info->options.getMetaField(), newFieldNamesToBeInserted, sizeToBeAdded); - } - - auto batch = activeBatch(*bucket, getOpId(opCtx, combine), stripeNumber, info->stats); - batch->measurements.push_back(doc); - for (auto&& field : newFieldNamesToBeInserted) { - batch->newFieldNamesToBeInserted[field] = field.hash(); - bucket->uncommittedFieldNames.emplace(field); - } - - bucket->numMeasurements++; - bucket->size += sizeToBeAdded; - if (isNewlyOpenedBucket) { - // The namespace is stored two times: the bucket itself and openBuckets. - // We don't have a great approximation for the - // _schema size, so we use initial document size minus metadata as an approximation. Since - // the metadata itself is stored once, in the bucket, we can combine the two and just use - // the initial document size. A unique pointer to the bucket is stored once: allBuckets. A - // raw pointer to the bucket is stored at most twice: openBuckets, idleBuckets. - bucket->memoryUsage += (info->key.ns.size() * 2) + doc.objsize() + sizeof(Bucket) + - sizeof(std::unique_ptr<Bucket>) + (sizeof(Bucket*) * 2); - - auto updateStatus = bucket->schema.update( - doc, info->options.getMetaField(), info->key.metadata.getComparator()); - invariant(updateStatus == Schema::UpdateStatus::Updated); - } else { - _memoryUsage.fetchAndSubtract(previousMemoryUsage); - } - _memoryUsage.fetchAndAdd(bucket->memoryUsage); - - return batch; -} - -void BucketCatalog::_waitToCommitBatch(Stripe* stripe, const std::shared_ptr<WriteBatch>& batch) { - while (true) { - std::shared_ptr<WriteBatch> current; - - { - stdx::lock_guard stripeLock{stripe->mutex}; - Bucket* bucket = _useBucket( - stripe, stripeLock, batch->bucketHandle.bucketId, IgnoreBucketState::kNo); - if (!bucket || isWriteBatchFinished(*batch)) { - return; - } - - current = bucket->preparedBatch; - if (!current) { - // No other batches for this bucket are currently committing, so we can proceed. - bucket->preparedBatch = batch; - bucket->batches.erase(batch->opId); - return; - } - } - - // We only hit this failpoint when there are conflicting prepared batches on the same - // bucket. - hangWaitingForConflictingPreparedBatch.pauseWhileSet(); - - // We have to wait for someone else to finish. - getWriteBatchResult(*current).getStatus().ignore(); // We don't care about the result. - } -} - -void BucketCatalog::_removeBucket(Stripe* stripe, - WithLock stripeLock, - Bucket* bucket, - RemovalMode mode) { - invariant(bucket->batches.empty()); - invariant(!bucket->preparedBatch); - - auto allIt = stripe->allBuckets.find(bucket->bucketId); - invariant(allIt != stripe->allBuckets.end()); - - _memoryUsage.fetchAndSubtract(bucket->memoryUsage); - _markBucketNotIdle(stripe, stripeLock, bucket); - - // If the bucket was rolled over, then there may be a different open bucket for this metadata. - auto openIt = stripe->openBuckets.find({bucket->bucketId.ns, bucket->key.metadata}); - if (openIt != stripe->openBuckets.end()) { - auto& openSet = openIt->second; - auto bucketIt = openSet.find(bucket); - if (bucketIt != openSet.end()) { - if (openSet.size() == 1) { - stripe->openBuckets.erase(openIt); - } else { - openSet.erase(bucketIt); - } - } - } - - // If we are cleaning up while archiving a bucket, then we want to preserve its state. Otherwise - // we can remove the state from the catalog altogether. - switch (mode) { - case RemovalMode::kClose: { - auto state = getBucketState(_bucketStateRegistry, bucket->bucketId); - invariant(state.has_value()); - invariant(state.value().isSet(BucketStateFlag::kPendingCompression)); - break; - } - case RemovalMode::kAbort: - changeBucketState(_bucketStateRegistry, - bucket->bucketId, - [](boost::optional<BucketState> input, - std::uint64_t) -> boost::optional<BucketState> { - invariant(input.has_value()); - if (input->conflictsWithReopening()) { - return input.value().setFlag(BucketStateFlag::kUntracked); - } - return boost::none; - }); - break; - case RemovalMode::kArchive: - // No state change - break; - } - - _numberOfActiveBuckets.fetchAndSubtract(1); - stripe->allBuckets.erase(allIt); -} - -void BucketCatalog::_archiveBucket(Stripe* stripe, - WithLock stripeLock, - Bucket* bucket, - ClosedBuckets* closedBuckets) { - bool archived = false; - auto& archivedSet = stripe->archivedBuckets[bucket->key.hash]; - auto it = archivedSet.find(bucket->minTime); - if (it == archivedSet.end()) { - auto [it, inserted] = archivedSet.emplace( - bucket->minTime, ArchivedBucket{bucket->bucketId, bucket->timeField}); - - long long memory = - _marginalMemoryUsageForArchivedBucket(it->second, archivedSet.size() == 1); - _memoryUsage.fetchAndAdd(memory); - archived = true; - } - - if (archived) { - // If we have an archived bucket, we still want to account for it in numberOfActiveBuckets - // so we will increase it here since removeBucket decrements the count. - _numberOfActiveBuckets.fetchAndAdd(1); - _removeBucket(stripe, stripeLock, bucket, RemovalMode::kArchive); - } else { - // We had a meta hash collision, and already have a bucket archived with the same meta hash - // and timestamp as this bucket. Since it's somewhat arbitrary which bucket we keep, we'll - // keep the one that's already archived and just plain close this one. - _closeOpenBucket(*stripe, stripeLock, *bucket, *closedBuckets); - } -} - -void BucketCatalog::_closeOpenBucket(Stripe& stripe, - WithLock stripeLock, - Bucket& bucket, - ClosedBuckets& closedBuckets) { - bool error = false; - try { - closedBuckets.emplace_back( - &_bucketStateRegistry, bucket.bucketId, bucket.timeField, bucket.numMeasurements); - } catch (...) { - error = true; - } - _removeBucket(&stripe, stripeLock, &bucket, error ? RemovalMode::kAbort : RemovalMode::kClose); -} - -void BucketCatalog::_closeOpenBucket(Stripe& stripe, - WithLock stripeLock, - Bucket& bucket, - boost::optional<ClosedBucket>& closedBucket) { - bool error = false; - try { - closedBucket = boost::in_place( - &_bucketStateRegistry, bucket.bucketId, bucket.timeField, bucket.numMeasurements); - } catch (...) { - closedBucket = boost::none; - error = true; - } - _removeBucket(&stripe, stripeLock, &bucket, error ? RemovalMode::kAbort : RemovalMode::kClose); -} - - -boost::optional<OID> BucketCatalog::_findArchivedCandidate(Stripe* stripe, - WithLock stripeLock, - const CreationInfo& info) { - auto setIt = stripe->archivedBuckets.find(info.key.hash); - if (setIt == stripe->archivedBuckets.end()) { - return boost::none; - } - - auto& archivedSet = setIt->second; - - // We want to find the largest time that is not greater than info.time. Generally lower_bound - // will return the smallest element not less than the search value, but we are using - // std::greater instead of std::less for the map's comparisons. This means the order of keys - // will be reversed, and lower_bound will return what we want. - auto it = archivedSet.lower_bound(info.time); - if (it == archivedSet.end()) { - return boost::none; - } - - const auto& [candidateTime, candidateBucket] = *it; - invariant(candidateTime <= info.time); - // We need to make sure our measurement can fit without violating max span. If not, we - // can't use this bucket. - if (info.time - candidateTime < Seconds(*info.options.getBucketMaxSpanSeconds())) { - auto state = getBucketState(_bucketStateRegistry, candidateBucket.bucketId); - if (state && !state.value().conflictsWithReopening()) { - return candidateBucket.bucketId.oid; - } else { - if (state) { - changeBucketState(_bucketStateRegistry, - candidateBucket.bucketId, - [](boost::optional<BucketState> input, - std::uint64_t) -> boost::optional<BucketState> { - if (!input.has_value()) { - return boost::none; - } - invariant(input.value().conflictsWithReopening()); - return input.value().setFlag(BucketStateFlag::kUntracked); - }); - } - long long memory = - _marginalMemoryUsageForArchivedBucket(candidateBucket, archivedSet.size() == 1); - if (archivedSet.size() == 1) { - stripe->archivedBuckets.erase(setIt); - } else { - archivedSet.erase(it); - } - _memoryUsage.fetchAndSubtract(memory); - _numberOfActiveBuckets.fetchAndSubtract(1); - } - } - - return boost::none; -} - -stdx::variant<std::monostate, OID, std::vector<BSONObj>> BucketCatalog::_getReopeningCandidate( - OperationContext* opCtx, - Stripe* stripe, - WithLock stripeLock, - const CreationInfo& info, - bool allowQueryBasedReopening) { - if (auto archived = _findArchivedCandidate(stripe, stripeLock, info)) { - return archived.value(); - } - - if (!allowQueryBasedReopening) { - return {}; - } - - boost::optional<BSONElement> metaElement; - if (info.options.getMetaField().has_value()) { - metaElement = info.key.metadata.element(); - } - - auto controlMinTimePath = kControlMinFieldNamePrefix.toString() + info.options.getTimeField(); - auto maxDataTimeFieldPath = kDataFieldNamePrefix.toString() + info.options.getTimeField() + - "." + std::to_string(gTimeseriesBucketMaxCount - 1); - - // Derive the maximum bucket size. - auto bucketMaxSize = getCacheDerivedBucketMaxSize( - opCtx->getServiceContext()->getStorageEngine(), _numberOfActiveBuckets.load()); - int32_t effectiveMaxSize = std::min(gTimeseriesBucketMaxSize, bucketMaxSize); - - return generateReopeningPipeline(opCtx, - info.time, - metaElement, - controlMinTimePath, - maxDataTimeFieldPath, - *info.options.getBucketMaxSpanSeconds(), - effectiveMaxSize); -} - -void BucketCatalog::_abort(Stripe* stripe, - WithLock stripeLock, - std::shared_ptr<WriteBatch> batch, - const Status& status) { - // Before we access the bucket, make sure it's still there. - Bucket* bucket = - _useBucket(stripe, stripeLock, batch->bucketHandle.bucketId, IgnoreBucketState::kYes); - if (!bucket) { - // Special case, bucket has already been cleared, and we need only abort this batch. - abortWriteBatch(*batch, status); - return; - } - - // Proceed to abort any unprepared batches and remove the bucket if possible - _abort(stripe, stripeLock, bucket, batch, status); -} - -void BucketCatalog::_abort(Stripe* stripe, - WithLock stripeLock, - Bucket* bucket, - std::shared_ptr<WriteBatch> batch, - const Status& status) { - // Abort any unprepared batches. This should be safe since we have a lock on the stripe, - // preventing anyone else from using these. - for (const auto& [_, current] : bucket->batches) { - abortWriteBatch(*current, status); - } - bucket->batches.clear(); - - bool doRemove = true; // We shouldn't remove the bucket if there's a prepared batch outstanding - // and it's not the one we manage. In that case, we don't know what the - // user is doing with it, but we need to keep the bucket around until - // that batch is finished. - if (auto& prepared = bucket->preparedBatch) { - if (batch && prepared == batch) { - // We own the prepared batch, so we can go ahead and abort it and remove the bucket. - abortWriteBatch(*prepared, status); - prepared.reset(); - } else { - doRemove = false; - } - } - - if (doRemove) { - _removeBucket(stripe, stripeLock, bucket, RemovalMode::kAbort); - } else { - changeBucketState( - _bucketStateRegistry, - bucket->bucketId, - [](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> { - invariant(input.has_value()); - return input.value().setFlag(BucketStateFlag::kCleared); - }); - } + const std::shared_ptr<ExecutionStats> stats = internal::getExecutionStats(catalog, ns); + appendExecutionStatsToBuilder(*stats, builder); } -void BucketCatalog::_compressionDone(const BucketId& bucketId) { - changeBucketState(_bucketStateRegistry, - bucketId, - [](boost::optional<BucketState> input, - std::uint64_t) -> boost::optional<BucketState> { return boost::none; }); -} - -void BucketCatalog::_markBucketIdle(Stripe* stripe, WithLock stripeLock, Bucket* bucket) { - invariant(bucket); - invariant(!bucket->idleListEntry.has_value()); - invariant(allCommitted(*bucket)); - stripe->idleBuckets.push_front(bucket); - bucket->idleListEntry = stripe->idleBuckets.begin(); -} - -void BucketCatalog::_markBucketNotIdle(Stripe* stripe, WithLock stripeLock, Bucket* bucket) { - invariant(bucket); - if (bucket->idleListEntry.has_value()) { - stripe->idleBuckets.erase(bucket->idleListEntry.value()); - bucket->idleListEntry = boost::none; - } -} - -void BucketCatalog::_expireIdleBuckets(Stripe* stripe, - WithLock stripeLock, - ExecutionStatsController& stats, - ClosedBuckets* closedBuckets) { - // As long as we still need space and have entries and remaining attempts, close idle buckets. - int32_t numExpired = 0; - - const bool canArchive = feature_flags::gTimeseriesScalabilityImprovements.isEnabled( - serverGlobalParams.featureCompatibility); - - while (!stripe->idleBuckets.empty() && - _memoryUsage.load() > getTimeseriesIdleBucketExpiryMemoryUsageThresholdBytes() && - numExpired <= gTimeseriesIdleBucketExpiryMaxCountPerAttempt) { - Bucket* bucket = stripe->idleBuckets.back(); - - auto state = getBucketState(_bucketStateRegistry, bucket); - if (canArchive && state && !state.value().conflictsWithInsertion()) { - // Can archive a bucket if it's still eligible for insertions. - _archiveBucket(stripe, stripeLock, bucket, closedBuckets); - stats.incNumBucketsArchivedDueToMemoryThreshold(); - } else if (state && state.value().isSet(BucketStateFlag::kCleared)) { - // Bucket was cleared and just needs to be removed from catalog. - _removeBucket(stripe, stripeLock, bucket, RemovalMode::kAbort); - } else { - _closeOpenBucket(*stripe, stripeLock, *bucket, *closedBuckets); - stats.incNumBucketsClosedDueToMemoryThreshold(); - } - - ++numExpired; - } - - while (canArchive && !stripe->archivedBuckets.empty() && - _memoryUsage.load() > getTimeseriesIdleBucketExpiryMemoryUsageThresholdBytes() && - numExpired <= gTimeseriesIdleBucketExpiryMaxCountPerAttempt) { - - auto& [hash, archivedSet] = *stripe->archivedBuckets.begin(); - invariant(!archivedSet.empty()); - - auto& [timestamp, bucket] = *archivedSet.begin(); - closeArchivedBucket(_bucketStateRegistry, bucket, *closedBuckets); - - long long memory = _marginalMemoryUsageForArchivedBucket(bucket, archivedSet.size() == 1); - if (archivedSet.size() == 1) { - // If this is the only entry, erase the whole map so we don't leave it empty. - stripe->archivedBuckets.erase(stripe->archivedBuckets.begin()); - } else { - // Otherwise just erase this bucket from the map. - archivedSet.erase(archivedSet.begin()); - } - _memoryUsage.fetchAndSubtract(memory); - _numberOfActiveBuckets.fetchAndSubtract(1); - - stats.incNumBucketsClosedDueToMemoryThreshold(); - ++numExpired; - } -} - -Bucket* BucketCatalog::_allocateBucket(Stripe* stripe, - WithLock stripeLock, - const CreationInfo& info) { - _expireIdleBuckets(stripe, stripeLock, info.stats, info.closedBuckets); - - auto [oid, roundedTime] = generateBucketOID(info.time, info.options); - auto bucketId = BucketId{info.key.ns, oid}; - - auto [it, inserted] = stripe->allBuckets.try_emplace( - bucketId, - std::make_unique<Bucket>( - bucketId, info.key, info.options.getTimeField(), roundedTime, _bucketStateRegistry)); - tassert(6130900, "Expected bucket to be inserted", inserted); - Bucket* bucket = it->second.get(); - stripe->openBuckets[info.key].emplace(bucket); - - auto state = changeBucketState( - _bucketStateRegistry, - bucketId, - [](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> { - invariant(!input.has_value()); - return BucketState{}; - }); - invariant(state == BucketState{}); - _numberOfActiveBuckets.fetchAndAdd(1); - - if (info.openedDuetoMetadata) { - info.stats.incNumBucketsOpenedDueToMetadata(); - } - - // Make sure we set the control.min time field to match the rounded _id timestamp. - auto controlDoc = buildControlMinTimestampDoc(info.options.getTimeField(), roundedTime); - bucket->minmax.update( - controlDoc, /*metaField=*/boost::none, bucket->key.metadata.getComparator()); - return bucket; -} - -std::pair<RolloverAction, RolloverReason> BucketCatalog::_determineRolloverAction( - OperationContext* opCtx, - const BSONObj& doc, - CreationInfo* info, - Bucket* bucket, - Bucket::NewFieldNames& newFieldNamesToBeInserted, - int32_t& sizeToBeAdded, - AllowBucketCreation mode) { - // If the mode is enabled to create new buckets, then we should update stats for soft closures - // accordingly. If we specify the mode to not allow bucket creation, it means we are not sure if - // we want to soft close the bucket yet and should wait to update closure stats. - const bool shouldUpdateStats = (mode == AllowBucketCreation::kYes); - - auto bucketTime = bucket->minTime; - if (info->time - bucketTime >= Seconds(*info->options.getBucketMaxSpanSeconds())) { - if (shouldUpdateStats) { - info->stats.incNumBucketsClosedDueToTimeForward(); - } - return {RolloverAction::kSoftClose, RolloverReason::kTimeForward}; - } - if (info->time < bucketTime) { - const bool canArchive = feature_flags::gTimeseriesScalabilityImprovements.isEnabled( - serverGlobalParams.featureCompatibility); - if (shouldUpdateStats) { - if (canArchive) { - info->stats.incNumBucketsArchivedDueToTimeBackward(); - } else { - info->stats.incNumBucketsClosedDueToTimeBackward(); - } - } - return {canArchive ? RolloverAction::kArchive : RolloverAction::kSoftClose, - RolloverReason::kTimeBackward}; - } - if (bucket->numMeasurements == static_cast<std::uint64_t>(gTimeseriesBucketMaxCount)) { - info->stats.incNumBucketsClosedDueToCount(); - return {RolloverAction::kHardClose, RolloverReason::kCount}; - } - - // In scenarios where we have a high cardinality workload and face increased cache pressure we - // will decrease the size of buckets before we close them. - int32_t cacheDerivedBucketMaxSize = getCacheDerivedBucketMaxSize( - opCtx->getServiceContext()->getStorageEngine(), _numberOfActiveBuckets.load()); - int32_t effectiveMaxSize = std::min(gTimeseriesBucketMaxSize, cacheDerivedBucketMaxSize); - - // Before we hit our bucket minimum count, we will allow for large measurements to be inserted - // into buckets. Instead of packing the bucket to the BSON size limit, 16MB, we'll limit the max - // bucket size to 12MB. This is to leave some space in the bucket if we need to add new internal - // fields to existing, full buckets. - static constexpr int32_t largeMeasurementsMaxBucketSize = - BSONObjMaxUserSize - (4 * 1024 * 1024); - // We restrict the ceiling of the bucket max size under cache pressure. - int32_t absoluteMaxSize = std::min(largeMeasurementsMaxBucketSize, cacheDerivedBucketMaxSize); - - calculateBucketFieldsAndSizeChange( - *bucket, doc, info->options.getMetaField(), newFieldNamesToBeInserted, sizeToBeAdded); - if (bucket->size + sizeToBeAdded > effectiveMaxSize) { - bool keepBucketOpenForLargeMeasurements = - bucket->numMeasurements < static_cast<std::uint64_t>(gTimeseriesBucketMinCount) && - feature_flags::gTimeseriesScalabilityImprovements.isEnabled( - serverGlobalParams.featureCompatibility); - if (keepBucketOpenForLargeMeasurements) { - if (bucket->size + sizeToBeAdded > absoluteMaxSize) { - if (absoluteMaxSize != largeMeasurementsMaxBucketSize) { - info->stats.incNumBucketsClosedDueToCachePressure(); - return {RolloverAction::kHardClose, RolloverReason::kCachePressure}; - } - info->stats.incNumBucketsClosedDueToSize(); - return {RolloverAction::kHardClose, RolloverReason::kSize}; - } - - // There's enough space to add this measurement and we're still below the large - // measurement threshold. - if (!bucket->keptOpenDueToLargeMeasurements) { - // Only increment this metric once per bucket. - bucket->keptOpenDueToLargeMeasurements = true; - info->stats.incNumBucketsKeptOpenDueToLargeMeasurements(); - } - return {RolloverAction::kNone, RolloverReason::kNone}; - } else { - if (effectiveMaxSize == gTimeseriesBucketMaxSize) { - info->stats.incNumBucketsClosedDueToSize(); - return {RolloverAction::kHardClose, RolloverReason::kSize}; - } - info->stats.incNumBucketsClosedDueToCachePressure(); - return {RolloverAction::kHardClose, RolloverReason::kCachePressure}; - } - } - - if (schemaIncompatible( - *bucket, doc, info->options.getMetaField(), info->key.metadata.getComparator())) { - info->stats.incNumBucketsClosedDueToSchemaChange(); - return {RolloverAction::kHardClose, RolloverReason::kSchemaChange}; - } - - return {RolloverAction::kNone, RolloverReason::kNone}; -} - -Bucket* BucketCatalog::_rollover(Stripe* stripe, - WithLock stripeLock, - Bucket* bucket, - const CreationInfo& info, - RolloverAction action) { - invariant(action != RolloverAction::kNone); - if (allCommitted(*bucket)) { - // The bucket does not contain any measurements that are yet to be committed, so we can take - // action now. - if (action == RolloverAction::kArchive) { - _archiveBucket(stripe, stripeLock, bucket, info.closedBuckets); - } else { - _closeOpenBucket(*stripe, stripeLock, *bucket, *info.closedBuckets); - } - } else { - // We must keep the bucket around until all measurements are committed committed, just mark - // the action we chose now so it we know what to do when the last batch finishes. - bucket->rolloverAction = action; - } - - return _allocateBucket(stripe, stripeLock, info); -} - -ExecutionStatsController BucketCatalog::_getExecutionStats(const NamespaceString& ns) { - stdx::lock_guard catalogLock{_mutex}; - auto it = _executionStats.find(ns); - if (it != _executionStats.end()) { - return {it->second, _globalExecutionStats}; - } - - auto res = _executionStats.emplace(ns, std::make_shared<ExecutionStats>()); - return {res.first->second, _globalExecutionStats}; -} - -std::shared_ptr<ExecutionStats> BucketCatalog::_getExecutionStats(const NamespaceString& ns) const { - static const auto kEmptyStats{std::make_shared<ExecutionStats>()}; - - stdx::lock_guard catalogLock{_mutex}; - - auto it = _executionStats.find(ns); - if (it != _executionStats.end()) { - return it->second; - } - return kEmptyStats; -} - -long long BucketCatalog::_marginalMemoryUsageForArchivedBucket(const ArchivedBucket& bucket, - bool onlyEntryForMatchingMetaHash) { - return sizeof(Date_t) + // key in set of archived buckets for meta hash - sizeof(ArchivedBucket) + // main data for archived bucket - bucket.timeField.size() + // allocated space for timeField string, ignoring SSO - (onlyEntryForMatchingMetaHash ? sizeof(std::size_t) + // key in set (meta hash) - sizeof(decltype(Stripe::archivedBuckets)::value_type) // set container - : 0); -} - -void BucketCatalog::_updateBucketFetchAndQueryStats(ExecutionStatsController& stats, - const BucketFindResult& findResult) { - if (findResult.fetchedBucket) { - if (findResult.bucketToReopen.has_value()) { - stats.incNumBucketsFetched(); - } else { - stats.incNumBucketFetchesFailed(); - } - } - - if (findResult.queriedBucket) { - if (findResult.bucketToReopen.has_value()) { - stats.incNumBucketsQueried(); - } else { - stats.incNumBucketQueriesFailed(); - } - } -} - -class BucketCatalog::ServerStatus : public ServerStatusSection { - struct BucketCounts { - BucketCounts& operator+=(const BucketCounts& other) { - if (&other != this) { - all += other.all; - open += other.open; - idle += other.idle; - } - return *this; - } - - std::size_t all = 0; - std::size_t open = 0; - std::size_t idle = 0; - }; - - BucketCounts _getBucketCounts(const BucketCatalog& catalog) const { - BucketCounts sum; - for (auto const& stripe : catalog._stripes) { - stdx::lock_guard stripeLock{stripe.mutex}; - sum += {stripe.allBuckets.size(), stripe.openBuckets.size(), stripe.idleBuckets.size()}; - } - return sum; - } - -public: - ServerStatus() : ServerStatusSection("bucketCatalog") {} - - bool includeByDefault() const override { - return true; - } - - BSONObj generateSection(OperationContext* opCtx, const BSONElement&) const override { - const auto& bucketCatalog = BucketCatalog::get(opCtx); - { - stdx::lock_guard catalogLock{bucketCatalog._mutex}; - if (bucketCatalog._executionStats.empty()) { - return {}; - } - } - - auto counts = _getBucketCounts(bucketCatalog); - auto numActive = bucketCatalog._numberOfActiveBuckets.load(); - BSONObjBuilder builder; - builder.appendNumber("numBuckets", static_cast<long long>(numActive)); - builder.appendNumber("numOpenBuckets", static_cast<long long>(counts.open)); - builder.appendNumber("numIdleBuckets", static_cast<long long>(counts.idle)); - builder.appendNumber("numArchivedBuckets", static_cast<long long>(numActive - counts.open)); - builder.appendNumber("memoryUsage", - static_cast<long long>(bucketCatalog._memoryUsage.load())); - - // Append the global execution stats for all namespaces. - bucketCatalog.appendGlobalExecutionStats(&builder); - - // Append the global state management stats for all namespaces. - bucketCatalog.appendStateManagementStats(&builder); - - return builder.obj(); - } -} bucketCatalogServerStatus; } // namespace mongo::timeseries::bucket_catalog diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.h index 751a0266e6a..90fb5d2e4ef 100644 --- a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.h +++ b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.h @@ -57,538 +57,220 @@ namespace mongo::timeseries::bucket_catalog { -class BucketCatalog { -protected: - using StripeNumber = std::uint8_t; - using ShouldClearFn = std::function<bool(const NamespaceString&)>; +using StripeNumber = std::uint8_t; +using ShouldClearFn = std::function<bool(const NamespaceString&)>; + +/** + * Whether to allow inserts to be batched together with those from other clients. + */ +enum class CombineWithInsertsFromOtherClients { + kAllow, + kDisallow, +}; +/** + * Return type for the insert functions. See insert() and tryInsert() for more information. + */ +class InsertResult { public: - enum class CombineWithInsertsFromOtherClients { - kAllow, - kDisallow, - }; - - /** - * Return type for the insert function. See insert() for more information. - */ - class InsertResult { - public: - InsertResult() = default; - InsertResult(InsertResult&&) = default; - InsertResult& operator=(InsertResult&&) = default; - InsertResult(const InsertResult&) = delete; - InsertResult& operator=(const InsertResult&) = delete; - - std::shared_ptr<WriteBatch> batch; - ClosedBuckets closedBuckets; - stdx::variant<std::monostate, OID, std::vector<BSONObj>> candidate; - uint64_t catalogEra = 0; - }; + InsertResult() = default; + InsertResult(InsertResult&&) = default; + InsertResult& operator=(InsertResult&&) = default; + InsertResult(const InsertResult&) = delete; + InsertResult& operator=(const InsertResult&) = delete; + + std::shared_ptr<WriteBatch> batch; + ClosedBuckets closedBuckets; + stdx::variant<std::monostate, OID, std::vector<BSONObj>> candidate; + uint64_t catalogEra = 0; +}; +/** + * Struct to hold a portion of the buckets managed by the catalog. + * + * Each of the bucket lists, as well as the buckets themselves, are protected by 'mutex'. + */ +struct Stripe { + // All access to a stripe should happen while 'mutex' is locked. + mutable Mutex mutex = + MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(1), "BucketCatalog::Stripe::mutex"); + + // All buckets currently open in the catalog, including buckets which are full or pending + // closure but not yet committed, indexed by BucketId. Owning pointers. + stdx::unordered_map<BucketId, std::unique_ptr<Bucket>, BucketHasher> openBucketsById; + + // All buckets currently open in the catalog, including buckets which are full or pending + // closure but not yet committed, indexed by BucketKey. Non-owning pointers. + stdx::unordered_map<BucketKey, std::set<Bucket*>, BucketHasher> openBucketsByKey; + + // Open buckets that do not have any outstanding writes. + using IdleList = std::list<Bucket*>; + IdleList idleBuckets; + + // Buckets that are not currently in the catalog, but which are eligible to receive more + // measurements. The top-level map is keyed by the hash of the BucketKey, while the stored + // map is keyed by the bucket's minimum timestamp. + // + // We invert the key comparison in the inner map so that we can use lower_bound to efficiently + // find an archived bucket that is a candidate for an incoming measurement. + stdx::unordered_map<BucketKey::Hash, + std::map<Date_t, ArchivedBucket, std::greater<Date_t>>, + BucketHasher> + archivedBuckets; +}; + +/** + * This class holds all the data used to coordinate and combine time series inserts amongst threads. + */ +class BucketCatalog { +public: static BucketCatalog& get(ServiceContext* svcCtx); static BucketCatalog& get(OperationContext* opCtx); BucketCatalog() = default; - BucketCatalog(const BucketCatalog&) = delete; BucketCatalog operator=(const BucketCatalog&) = delete; - /** - * Reopens a closed bucket into the catalog given the bucket document. - */ - Status reopenBucket(OperationContext* opCtx, - const CollectionPtr& coll, - const BSONObj& bucketDoc); - - /** - * Returns the metadata for the given bucket in the following format: - * {<metadata field name>: <value>} - * All measurements in the given bucket share same metadata value. - * - * Returns an empty document if the given bucket cannot be found or if this time-series - * collection was not created with a metadata field name. - */ - BSONObj getMetadata(const BucketHandle& bucket); - - /** - * Tries to insert 'doc' into a suitable bucket. If an open bucket is full (or has incompatible - * schema), but is otherwise suitable, we will close it and open a new bucket. If we find no - * bucket with matching data and a time range that can accomodate 'doc', we will not open a new - * bucket, but rather let the caller know to search for an archived or closed bucket that can - * accomodate 'doc'. - * - * If a suitable bucket is found or opened, returns the WriteBatch into which 'doc' was - * inserted and a list of any buckets that were closed to make space to insert 'doc'. Any - * caller who receives the same batch may commit or abort the batch after claiming commit - * rights. See WriteBatch for more details. - * - * If no suitable bucket is found or opened, returns an optional bucket ID. If set, the bucket - * ID corresponds to an archived bucket which should be fetched; otherwise the caller should - * search for a previously-closed bucket that can accomodate 'doc'. The caller should proceed to - * call 'insert' to insert 'doc', passing any fetched bucket. - */ - StatusWith<InsertResult> tryInsert(OperationContext* opCtx, - const NamespaceString& ns, - const StringData::ComparatorInterface* comparator, - const TimeseriesOptions& options, - const BSONObj& doc, - CombineWithInsertsFromOtherClients combine); - - /** - * Returns the WriteBatch into which the document was inserted and a list of any buckets that - * were closed in order to make space to insert the document. Any caller who receives the same - * batch may commit or abort the batch after claiming commit rights. See WriteBatch for more - * details. - * - * If 'bucketToReopen' is passed, we will reopen that bucket and attempt to add 'doc' to that - * bucket. Otherwise we will attempt to find a suitable open bucket, or open a new bucket if - * none exists. - */ - StatusWith<InsertResult> insert(OperationContext* opCtx, - const NamespaceString& ns, - const StringData::ComparatorInterface* comparator, - const TimeseriesOptions& options, - const BSONObj& doc, - CombineWithInsertsFromOtherClients combine, - BucketFindResult bucketFindResult = {}); - - /** - * Prepares a batch for commit, transitioning it to an inactive state. Caller must already have - * commit rights on batch. Returns OK if the batch was successfully prepared, or a status - * indicating why the batch was previously aborted by another operation. - */ - Status prepareCommit(std::shared_ptr<WriteBatch> batch); - - /** - * Records the result of a batch commit. Caller must already have commit rights on batch, and - * batch must have been previously prepared. - * - * Returns bucket information of a bucket if one was closed. - */ - boost::optional<ClosedBucket> finish(std::shared_ptr<WriteBatch> batch, const CommitInfo& info); - - /** - * Aborts the given write batch and any other outstanding batches on the same bucket, using the - * provided status. - */ - void abort(std::shared_ptr<WriteBatch> batch, const Status& status); - - /** - * Notifies the catalog of a direct write (that is, a write not initiated by the BucketCatalog) - * that will be performed on the bucket document with the specified ID. If there is already an - * internally-prepared operation on that bucket, this method will throw a - * 'WriteConflictException'. This should be followed by a call to 'directWriteFinish' after the - * write has been committed, rolled back, or otherwise finished. - */ - void directWriteStart(const NamespaceString& ns, const OID& oid); - - /** - * Notifies the catalog that a pending direct write to the bucket document with the specified ID - * has finished or been abandoned, and normal operations on the bucket can resume. After this - * point any in-memory representation of the on-disk bucket data from before the direct write - * should have been cleared from the catalog, and it may be safely reopened from the on-disk - * state. - */ - void directWriteFinish(const NamespaceString& ns, const OID& oid); - - /** - * Clears any bucket whose namespace satisfies the predicate. - */ - void clear(ShouldClearFn&& shouldClear); - - /** - * Clears the buckets for the given namespace. - */ - void clear(const NamespaceString& ns); - - /** - * Clears the buckets for the given database. - */ - void clear(StringData dbName); - - /** - * Appends the execution stats for the given namespace to the builder. - */ - void appendExecutionStats(const NamespaceString& ns, BSONObjBuilder* builder) const; - - /** - * Appends the global execution stats for all namespaces to the builder. - */ - void appendGlobalExecutionStats(BSONObjBuilder* builder) const; - - /** - * Appends the global bucket state management stats for all namespaces to the builder. - */ - void appendStateManagementStats(BSONObjBuilder* builder) const; - - /** - * Reports the current memory usage. - */ - long long memoryUsage() const; - -protected: - /** - * Struct to hold a portion of the buckets managed by the catalog. - * - * Each of the bucket lists, as well as the buckets themselves, are protected by 'mutex'. - */ - struct Stripe { - mutable Mutex mutex = - MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(1), "BucketCatalog::Stripe::mutex"); - - // All buckets currently in the catalog, including buckets which are full but not yet - // committed. - stdx::unordered_map<BucketId, std::unique_ptr<Bucket>, BucketHasher> allBuckets; - - // The current open bucket for each namespace and metadata pair. - stdx::unordered_map<BucketKey, std::set<Bucket*>, BucketHasher> openBuckets; - - // Buckets that do not have any outstanding writes. - using IdleList = std::list<Bucket*>; - IdleList idleBuckets; - - // Buckets that are not currently in the catalog, but which are eligible to receive more - // measurements. The top-level map is keyed by the hash of the BucketKey, while the stored - // map is keyed by the bucket's minimum timestamp. - // - // We invert the key comparison in the inner map so that we can use lower_bound to - // efficiently find an archived bucket that is a candidate for an incoming measurement. - stdx::unordered_map<BucketKey::Hash, - std::map<Date_t, ArchivedBucket, std::greater<Date_t>>, - BucketHasher> - archivedBuckets; - }; - - /** - * Bundle of information that 'insert' needs to pass down to helper methods that may create a - * new bucket. - */ - struct CreationInfo { - const BucketKey& key; - StripeNumber stripe; - const Date_t& time; - const TimeseriesOptions& options; - ExecutionStatsController& stats; - ClosedBuckets* closedBuckets; - bool openedDuetoMetadata = true; - }; - - /** - * Extracts the information from the input 'doc' that is used to map the document to a bucket. - */ - StatusWith<std::pair<BucketKey, Date_t>> _extractBucketingParameters( - const NamespaceString& ns, - const StringData::ComparatorInterface* comparator, - const TimeseriesOptions& options, - const BSONObj& doc) const; - - /** - * Maps bucket key to the stripe that is responsible for it. - */ - StripeNumber _getStripeNumber(const BucketKey& key) const; - - /** - * Mode enum to control whether the bucket retrieval methods below will return buckets that have - * a state that conflicts with insertion. - */ - enum class IgnoreBucketState { kYes, kNo }; - - /** - * Retrieve a bucket for read-only use. - */ - const Bucket* _findBucket(const Stripe& stripe, - WithLock stripeLock, - const BucketId& bucketId, - IgnoreBucketState mode = IgnoreBucketState::kNo); - - /** - * Retrieve a bucket for write use. - */ - Bucket* _useBucket(Stripe* stripe, - WithLock stripeLock, - const BucketId& bucketId, - IgnoreBucketState mode); - - /** - * Retrieve a bucket for write use, updating the state in the process. - */ - Bucket* _useBucketAndChangeState(Stripe* stripe, - WithLock stripeLock, - const BucketId& bucketId, - const BucketStateRegistry::StateChangeFn& change); - - /** - * Mode enum to control whether the bucket retrieval methods below will create new buckets if no - * suitable bucket exists. - */ - enum class AllowBucketCreation { kYes, kNo }; - - /** - * Retrieve the open bucket for write use if one exists. If none exists and 'mode' is set to - * kYes, then we will create a new bucket. - */ - Bucket* _useBucket(Stripe* stripe, - WithLock stripeLock, - const CreationInfo& info, - AllowBucketCreation mode); - - /** - * Retrieve a previously closed bucket for write use if one exists in the catalog. Considers - * buckets that are pending closure or archival but which are still eligible to recieve new - * measurements. - */ - Bucket* _useAlternateBucket(Stripe* stripe, WithLock stripeLock, const CreationInfo& info); - - /** - * Given a bucket to reopen, performs validation and constructs the in-memory representation of - * the bucket. If specified, 'expectedKey' is matched against the key extracted from the - * document to validate that the bucket is expected (i.e. to help resolve hash collisions for - * archived buckets). Does *not* hand ownership of the bucket to the catalog. - */ - StatusWith<std::unique_ptr<Bucket>> _rehydrateBucket( - OperationContext* opCtx, - const NamespaceString& ns, - const StringData::ComparatorInterface* comparator, - const TimeseriesOptions& options, - const BucketToReopen& bucketToReopen, - boost::optional<const BucketKey&> expectedKey); - - /** - * Given a rehydrated 'bucket', passes ownership of that bucket to the catalog, marking the - * bucket as open. - */ - StatusWith<Bucket*> _reopenBucket(Stripe* stripe, - WithLock stripeLock, - ExecutionStatsController stats, - const BucketKey& key, - std::unique_ptr<Bucket>&& bucket, - std::uint64_t targetEra, - ClosedBuckets* closedBuckets); - - /** - * Check to see if 'insert' can use existing bucket rather than reopening a candidate bucket. If - * true, chances are the caller raced with another thread to reopen the same bucket, but if - * false, there might be another bucket that had been cleared, or that has the same _id in a - * different namespace. - */ - StatusWith<Bucket*> _reuseExistingBucket(Stripe* stripe, - WithLock stripeLock, - ExecutionStatsController* stats, - const BucketKey& key, - Bucket* existingBucket, - std::uint64_t targetEra); - - /** - * Helper method to perform the heavy lifting for both 'tryInsert' and 'insert'. See - * documentation on callers for more details. - */ - StatusWith<InsertResult> _insert(OperationContext* opCtx, - const NamespaceString& ns, - const StringData::ComparatorInterface* comparator, - const TimeseriesOptions& options, - const BSONObj& doc, - CombineWithInsertsFromOtherClients combine, - AllowBucketCreation mode, - BucketFindResult bucketFindResult = {}); - - /** - * Given an already-selected 'bucket', inserts 'doc' to the bucket if possible. If not, and - * 'mode' is set to 'kYes', we will create a new bucket and insert into that bucket. - */ - stdx::variant<std::shared_ptr<WriteBatch>, RolloverReason> _insertIntoBucket( - OperationContext* opCtx, - Stripe* stripe, - WithLock stripeLock, - StripeNumber stripeNumber, - const BSONObj& doc, - CombineWithInsertsFromOtherClients combine, - AllowBucketCreation mode, - CreationInfo* info, - Bucket* bucket); - - /** - * Wait for other batches to finish so we can prepare 'batch' - */ - void _waitToCommitBatch(Stripe* stripe, const std::shared_ptr<WriteBatch>& batch); - - /** - * Mode to signal to '_removeBucket' what's happening to the bucket, and how to handle the - * bucket state change. - */ - enum class RemovalMode { - kClose, // Normal closure, pending compression - kArchive, // Archive bucket, no state change - kAbort, // Bucket is being cleared, possibly due to error, erase state - }; - - /** - * Removes the given bucket from the bucket catalog's internal data structures. - */ - void _removeBucket(Stripe* stripe, WithLock stripeLock, Bucket* bucket, RemovalMode mode); - - /** - * Archives the given bucket, minimizing the memory footprint but retaining the necessary - * information required to efficiently identify it as a candidate for future insertions. - */ - void _archiveBucket(Stripe* stripe, - WithLock stripeLock, - Bucket* bucket, - ClosedBuckets* closedBuckets); - - /** - * Close an open bucket, setting the state appropriately and removing it from the catalog. - */ - void _closeOpenBucket(Stripe& stripe, - WithLock stripeLock, - Bucket& bucket, - ClosedBuckets& closedBuckets); - - /** - * Close an open bucket, setting the state appropriately and removing it from the catalog. - */ - void _closeOpenBucket(Stripe& stripe, - WithLock stripeLock, - Bucket& bucket, - boost::optional<ClosedBucket>& closedBucket); - - /** - * Identifies a previously archived bucket that may be able to accomodate the measurement - * represented by 'info', if one exists. - */ - boost::optional<OID> _findArchivedCandidate(Stripe* stripe, - WithLock stripeLock, - const CreationInfo& info); - - /** - * Identifies a previously archived bucket that may be able to accomodate the measurement - * represented by 'info', if one exists. - */ - stdx::variant<std::monostate, OID, std::vector<BSONObj>> _getReopeningCandidate( - OperationContext* opCtx, - Stripe* stripe, - WithLock stripeLock, - const CreationInfo& info, - bool allowQueryBasedReopening); - - /** - * Aborts 'batch', and if the corresponding bucket still exists, proceeds to abort any other - * unprepared batches and remove the bucket from the catalog if there is no unprepared batch. - */ - void _abort(Stripe* stripe, - WithLock stripeLock, - std::shared_ptr<WriteBatch> batch, - const Status& status); - - /** - * Aborts any unprepared batches for the given bucket, then removes the bucket if there is no - * prepared batch. If 'batch' is non-null, it is assumed that the caller has commit rights for - * that batch. - */ - void _abort(Stripe* stripe, - WithLock stripeLock, - Bucket* bucket, - std::shared_ptr<WriteBatch> batch, - const Status& status); - - /** - * Records that compression for the given bucket has been completed, and the BucketCatalog can - * forget about the bucket. - */ - void _compressionDone(const BucketId& bucketId); - - /** - * Adds the bucket to a list of idle buckets to be expired at a later date. - */ - void _markBucketIdle(Stripe* stripe, WithLock stripeLock, Bucket* bucket); - - /** - * Remove the bucket from the list of idle buckets. The second parameter encodes whether the - * caller holds a lock on _idleMutex. - */ - void _markBucketNotIdle(Stripe* stripe, WithLock stripeLock, Bucket* bucket); - - /** - * Expires idle buckets until the bucket catalog's memory usage is below the expiry - * threshold. - */ - void _expireIdleBuckets(Stripe* stripe, - WithLock stripeLock, - ExecutionStatsController& stats, - ClosedBuckets* closedBuckets); - - /** - * Allocates a new bucket and adds it to the catalog. - */ - Bucket* _allocateBucket(Stripe* stripe, WithLock stripeLock, const CreationInfo& info); - - /** - * Determines if 'bucket' needs to be rolled over to accomodate 'doc'. If so, determines whether - * to archive or close 'bucket'. - */ - std::pair<RolloverAction, RolloverReason> _determineRolloverAction( - OperationContext* opCtx, - const BSONObj& doc, - CreationInfo* info, - Bucket* bucket, - Bucket::NewFieldNames& newFieldNamesToBeInserted, - int32_t& sizeToBeAdded, - AllowBucketCreation mode); - - /** - * Close the existing, full bucket and open a new one for the same metadata. - * - * Writes information about the closed bucket to the 'info' parameter. - */ - Bucket* _rollover(Stripe* stripe, - WithLock stripeLock, - Bucket* bucket, - const CreationInfo& info, - RolloverAction action); - - ExecutionStatsController _getExecutionStats(const NamespaceString& ns); - std::shared_ptr<ExecutionStats> _getExecutionStats(const NamespaceString& ns) const; - - void _appendExecutionStatsToBuilder(const ExecutionStats* stats, BSONObjBuilder* builder) const; - - /** - * Calculates the marginal memory usage for an archived bucket. The - * 'onlyEntryForMatchingMetaHash' parameter indicates that the bucket will be (if inserting) - * or was (if removing) the only bucket associated with it's meta hash value. If true, then - * the returned value will attempt to account for the overhead of the map data structure for - * the meta hash value. - */ - static long long _marginalMemoryUsageForArchivedBucket(const ArchivedBucket& bucket, - bool onlyEntryForMatchingMetaHash); - - /** - * Updates stats to reflect the status of bucket fetches and queries based off of the FindResult - * (which is populated when attempting to reopen a bucket). - */ - void _updateBucketFetchAndQueryStats(ExecutionStatsController& stats, - const BucketFindResult& findResult); - - mutable Mutex _mutex = - MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "BucketCatalog::_mutex"); - - BucketStateRegistry _bucketStateRegistry{_mutex}; + // Stores state information about all buckets managed by the catalog, across stripes. + BucketStateRegistry bucketStateRegistry; + // The actual buckets in the catalog are distributed across a number of 'Stripe's. Each can be + // independently locked and operated on in parallel. static constexpr std::size_t kNumberOfStripes = 32; - std::array<Stripe, kNumberOfStripes> _stripes; + std::array<Stripe, kNumberOfStripes> stripes; - // Per-namespace execution stats. This map is protected by '_mutex'. Once you complete your + // Per-namespace execution stats. This map is protected by 'mutex'. Once you complete your // lookup, you can keep the shared_ptr to an individual namespace's stats object and release the // lock. The object itself is thread-safe (using atomics). - stdx::unordered_map<NamespaceString, std::shared_ptr<ExecutionStats>> _executionStats; + mutable Mutex mutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "BucketCatalog::mutex"); + stdx::unordered_map<NamespaceString, std::shared_ptr<ExecutionStats>> executionStats; // Global execution stats used to report aggregated metrics in server status. - ExecutionStats _globalExecutionStats; + ExecutionStats globalExecutionStats; - // Approximate memory usage of the bucket catalog. - AtomicWord<uint64_t> _memoryUsage; + // Approximate memory usage of the bucket catalog across all stripes. + AtomicWord<uint64_t> memoryUsage; - // Approximate cardinality of opened and archived buckets. - AtomicWord<uint32_t> _numberOfActiveBuckets; - - class ServerStatus; + // Cardinality of opened and archived buckets managed across all stripes. + AtomicWord<uint32_t> numberOfActiveBuckets; }; + +/** + * Returns the metadata for the given bucket in the following format: + * {<metadata field name>: <value>} + * All measurements in the given bucket share same metadata value. + * + * Returns an empty document if the given bucket cannot be found or if this time-series collection + * was not created with a metadata field name. + */ +BSONObj getMetadata(BucketCatalog& catalog, const BucketHandle& bucket); + +/** + * Tries to insert 'doc' into a suitable bucket. If an open bucket is full (or has incompatible + * schema), but is otherwise suitable, we will close it and open a new bucket. If we find no bucket + * with matching data and a time range that can accomodate 'doc', we will not open a new bucket, but + * rather let the caller know to search for an archived or closed bucket that can accomodate 'doc'. + * + * If a suitable bucket is found or opened, returns the WriteBatch into which 'doc' was inserted and + * a list of any buckets that were closed to make space to insert 'doc'. Any caller who receives the + * same batch may commit or abort the batch after claiming commit rights. See WriteBatch for more + * details. + * + * If no suitable bucket is found or opened, returns an optional bucket ID. If set, the bucket ID + * corresponds to an archived bucket which should be fetched; otherwise the caller should search for + * a previously-closed bucket that can accomodate 'doc'. The caller should proceed to call 'insert' + * to insert 'doc', passing any fetched bucket. + */ +StatusWith<InsertResult> tryInsert(OperationContext* opCtx, + BucketCatalog& catalog, + const NamespaceString& ns, + const StringData::ComparatorInterface* comparator, + const TimeseriesOptions& options, + const BSONObj& doc, + CombineWithInsertsFromOtherClients combine); + +/** + * Returns the WriteBatch into which the document was inserted and a list of any buckets that were + * closed in order to make space to insert the document. Any caller who receives the same batch may + * commit or abort the batch after claiming commit rights. See WriteBatch for more details. + * + * If 'bucketToReopen' is passed, we will reopen that bucket and attempt to add 'doc' to that + * bucket. Otherwise we will attempt to find a suitable open bucket, or open a new bucket if none + * exists. + */ +StatusWith<InsertResult> insert(OperationContext* opCtx, + BucketCatalog& catalog, + const NamespaceString& ns, + const StringData::ComparatorInterface* comparator, + const TimeseriesOptions& options, + const BSONObj& doc, + CombineWithInsertsFromOtherClients combine, + BucketFindResult bucketFindResult = {}); + +/** + * Prepares a batch for commit, transitioning it to an inactive state. Caller must already have + * commit rights on batch. Returns OK if the batch was successfully prepared, or a status indicating + * why the batch was previously aborted by another operation. If another batch is already prepared, + * this operation will block waiting for it to complete. + */ +Status prepareCommit(BucketCatalog& catalog, std::shared_ptr<WriteBatch> batch); + +/** + * Records the result of a batch commit. Caller must already have commit rights on batch, and batch + * must have been previously prepared. + * + * Returns bucket information of a bucket if one was closed. + */ +boost::optional<ClosedBucket> finish(BucketCatalog& catalog, + std::shared_ptr<WriteBatch> batch, + const CommitInfo& info); + +/** + * Aborts the given write batch and any other outstanding batches on the same bucket, using the + * provided status. + */ +void abort(BucketCatalog& catalog, std::shared_ptr<WriteBatch> batch, const Status& status); + +/** + * Notifies the catalog of a direct write (that is, a write not initiated by the BucketCatalog) that + * will be performed on the bucket document with the specified ID. If there is already an + * internally-prepared operation on that bucket, this method will throw a 'WriteConflictException'. + * This should be followed by a call to 'directWriteFinish' after the write has been committed, + * rolled back, or otherwise finished. + */ +void directWriteStart(BucketStateRegistry& registry, const NamespaceString& ns, const OID& oid); + +/** + * Notifies the catalog that a pending direct write to the bucket document with the specified ID has + * finished or been abandoned, and normal operations on the bucket can resume. After this point any + * in-memory representation of the on-disk bucket data from before the direct write should have been + * cleared from the catalog, and it may be safely reopened from the on-disk state. + */ +void directWriteFinish(BucketStateRegistry& registry, const NamespaceString& ns, const OID& oid); + +/** + * Clears any bucket whose namespace satisfies the predicate by removing the bucket from the catalog + * asynchronously through the BucketStateRegistry. + */ +void clear(BucketCatalog& catalog, ShouldClearFn&& shouldClear); + +/** + * Clears the buckets for the given namespace by removing the bucket from the catalog asynchronously + * through the BucketStateRegistry. + */ +void clear(BucketCatalog& catalog, const NamespaceString& ns); + +/** + * Clears the buckets for the given database by removing the bucket from the catalog asynchronously + * through the BucketStateRegistry. + */ +void clear(BucketCatalog& catalog, StringData dbName); + +/** + * Appends the execution stats for the given namespace to the builder. + */ +void appendExecutionStats(const BucketCatalog& catalog, + const NamespaceString& ns, + BSONObjBuilder& builder); + } // namespace mongo::timeseries::bucket_catalog diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_helpers.cpp b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_helpers.cpp index e8e616d2ddf..022b9b5630d 100644 --- a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_helpers.cpp +++ b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_helpers.cpp @@ -328,19 +328,19 @@ void handleDirectWrite(OperationContext* opCtx, const NamespaceString& ns, const // First notify the BucketCatalog that we intend to start a direct write, so we can conflict // with any already-prepared operation, and also block bucket reopening if it's enabled. auto& bucketCatalog = BucketCatalog::get(opCtx); - bucketCatalog.directWriteStart(resolvedNs, bucketId); + directWriteStart(bucketCatalog.bucketStateRegistry, resolvedNs, bucketId); // Then register callbacks so we can let the BucketCatalog know that we are done with our direct // write after the actual write takes place (or is abandoned), and allow reopening. opCtx->recoveryUnit()->onCommit([svcCtx = opCtx->getServiceContext(), resolvedNs, bucketId]( OperationContext*, boost::optional<Timestamp>) { auto& bucketCatalog = BucketCatalog::get(svcCtx); - bucketCatalog.directWriteFinish(resolvedNs, bucketId); + directWriteFinish(bucketCatalog.bucketStateRegistry, resolvedNs, bucketId); }); opCtx->recoveryUnit()->onRollback( [svcCtx = opCtx->getServiceContext(), resolvedNs, bucketId](OperationContext*) { auto& bucketCatalog = BucketCatalog::get(svcCtx); - bucketCatalog.directWriteFinish(resolvedNs, bucketId); + directWriteFinish(bucketCatalog.bucketStateRegistry, resolvedNs, bucketId); }); } diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.cpp b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.cpp new file mode 100644 index 00000000000..c0fd0abd8eb --- /dev/null +++ b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.cpp @@ -0,0 +1,1379 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.h" + +#include <boost/utility/in_place_factory.hpp> + +#include "mongo/bson/util/bsoncolumn.h" +#include "mongo/db/storage/storage_parameters_gen.h" +#include "mongo/db/timeseries/bucket_catalog/bucket_catalog_helpers.h" +#include "mongo/db/timeseries/timeseries_constants.h" +#include "mongo/util/fail_point.h" + +namespace mongo::timeseries::bucket_catalog::internal { +namespace { +MONGO_FAIL_POINT_DEFINE(alwaysUseSameBucketCatalogStripe); +MONGO_FAIL_POINT_DEFINE(hangTimeseriesInsertBeforeReopeningBucket); +MONGO_FAIL_POINT_DEFINE(hangWaitingForConflictingPreparedBatch); + +OperationId getOpId(OperationContext* opCtx, CombineWithInsertsFromOtherClients combine) { + switch (combine) { + case CombineWithInsertsFromOtherClients::kAllow: + return 0; + case CombineWithInsertsFromOtherClients::kDisallow: + invariant(opCtx->getOpID()); + return opCtx->getOpID(); + } + MONGO_UNREACHABLE; +} + +/** + * Updates stats to reflect the status of bucket fetches and queries based off of the FindResult + * (which is populated when attempting to reopen a bucket). + */ +void updateBucketFetchAndQueryStats(const BucketFindResult& findResult, + ExecutionStatsController& stats) { + if (findResult.fetchedBucket) { + if (findResult.bucketToReopen.has_value()) { + stats.incNumBucketsFetched(); + } else { + stats.incNumBucketFetchesFailed(); + } + } + + if (findResult.queriedBucket) { + if (findResult.bucketToReopen.has_value()) { + stats.incNumBucketsQueried(); + } else { + stats.incNumBucketQueriesFailed(); + } + } +} + +/** + * Calculate the bucket max size constrained by the cache size and the cardinality of active + * buckets. + */ +int32_t getCacheDerivedBucketMaxSize(StorageEngine* storageEngine, uint32_t workloadCardinality) { + invariant(storageEngine); + uint64_t storageCacheSize = + static_cast<uint64_t>(storageEngine->getEngine()->getCacheSizeMB() * 1024 * 1024); + + if (!feature_flags::gTimeseriesScalabilityImprovements.isEnabled( + serverGlobalParams.featureCompatibility) || + storageCacheSize == 0 || workloadCardinality == 0) { + return INT_MAX; + } + + uint64_t derivedMaxSize = storageCacheSize / (2 * workloadCardinality); + uint64_t intMax = static_cast<uint64_t>(std::numeric_limits<int32_t>::max()); + return std::min(derivedMaxSize, intMax); +} + +/** + * Abandons the write batch and notifies any waiters that the bucket has been cleared. + */ +void abortWriteBatch(WriteBatch& batch, const Status& status) { + if (batch.promise.getFuture().isReady()) { + return; + } + + batch.promise.setError(status); +} +} // namespace + +StripeNumber getStripeNumber(const BucketKey& key) { + if (MONGO_unlikely(alwaysUseSameBucketCatalogStripe.shouldFail())) { + return 0; + } + return key.hash % BucketCatalog::kNumberOfStripes; +} + +StatusWith<std::pair<BucketKey, Date_t>> extractBucketingParameters( + const NamespaceString& ns, + const StringData::ComparatorInterface* comparator, + const TimeseriesOptions& options, + const BSONObj& doc) { + Date_t time; + BSONElement metadata; + + if (!options.getMetaField().has_value()) { + auto swTime = extractTime(doc, options.getTimeField()); + if (!swTime.isOK()) { + return swTime.getStatus(); + } + time = swTime.getValue(); + } else { + auto swDocTimeAndMeta = + extractTimeAndMeta(doc, options.getTimeField(), options.getMetaField().value()); + if (!swDocTimeAndMeta.isOK()) { + return swDocTimeAndMeta.getStatus(); + } + time = swDocTimeAndMeta.getValue().first; + metadata = swDocTimeAndMeta.getValue().second; + } + + // Buckets are spread across independently-lockable stripes to improve parallelism. We map a + // bucket to a stripe by hashing the BucketKey. + auto key = BucketKey{ns, BucketMetadata{metadata, comparator, options.getMetaField()}}; + + return {std::make_pair(key, time)}; +} + +const Bucket* findBucket(BucketStateRegistry& registry, + const Stripe& stripe, + WithLock, + const BucketId& bucketId, + IgnoreBucketState mode) { + auto it = stripe.openBucketsById.find(bucketId); + if (it != stripe.openBucketsById.end()) { + if (mode == IgnoreBucketState::kYes) { + return it->second.get(); + } + + if (auto state = getBucketState(registry, it->second.get()); + state && !state.value().conflictsWithInsertion()) { + return it->second.get(); + } + } + return nullptr; +} + +Bucket* useBucket(BucketStateRegistry& registry, + Stripe& stripe, + WithLock stripeLock, + const BucketId& bucketId, + IgnoreBucketState mode) { + return const_cast<Bucket*>(findBucket(registry, stripe, stripeLock, bucketId, mode)); +} + +Bucket* useBucketAndChangeState(BucketStateRegistry& registry, + Stripe& stripe, + WithLock stripeLock, + const BucketId& bucketId, + const BucketStateRegistry::StateChangeFn& change) { + auto it = stripe.openBucketsById.find(bucketId); + if (it != stripe.openBucketsById.end()) { + if (auto state = changeBucketState(registry, it->second.get(), change); + state && !state.value().conflictsWithInsertion()) { + return it->second.get(); + } + } + return nullptr; +} + +Bucket* useBucket(BucketCatalog& catalog, + Stripe& stripe, + WithLock stripeLock, + const CreationInfo& info, + AllowBucketCreation mode) { + auto it = stripe.openBucketsByKey.find(info.key); + if (it == stripe.openBucketsByKey.end()) { + // No open bucket for this metadata. + return mode == AllowBucketCreation::kYes + ? &allocateBucket(catalog, stripe, stripeLock, info) + : nullptr; + } + + auto& openSet = it->second; + Bucket* bucket = nullptr; + for (Bucket* potentialBucket : openSet) { + if (potentialBucket->rolloverAction == RolloverAction::kNone) { + bucket = potentialBucket; + break; + } + } + if (!bucket) { + return mode == AllowBucketCreation::kYes + ? &allocateBucket(catalog, stripe, stripeLock, info) + : nullptr; + } + + if (auto state = getBucketState(catalog.bucketStateRegistry, bucket); + state && !state.value().conflictsWithInsertion()) { + markBucketNotIdle(stripe, stripeLock, *bucket); + return bucket; + } + + abort(catalog, + stripe, + stripeLock, + *bucket, + nullptr, + getTimeseriesBucketClearedError(bucket->bucketId.ns, bucket->bucketId.oid)); + + return mode == AllowBucketCreation::kYes ? &allocateBucket(catalog, stripe, stripeLock, info) + : nullptr; +} + +Bucket* useAlternateBucket(BucketCatalog& catalog, + Stripe& stripe, + WithLock stripeLock, + const CreationInfo& info) { + auto it = stripe.openBucketsByKey.find(info.key); + if (it == stripe.openBucketsByKey.end()) { + // No open bucket for this metadata. + return nullptr; + } + + auto& openSet = it->second; + // In order to potentially erase elements of the set while we iterate it (via _abort), we need + // to advance the iterator before we call erase. This means we can't use the more + // straightforward range iteration, and use the somewhat awkward pattern below. + for (auto it = openSet.begin(); it != openSet.end();) { + Bucket* potentialBucket = *it++; + + if (potentialBucket->rolloverAction == RolloverAction::kNone || + potentialBucket->rolloverAction == RolloverAction::kHardClose) { + continue; + } + + auto bucketTime = potentialBucket->minTime; + if (info.time - bucketTime >= Seconds(*info.options.getBucketMaxSpanSeconds()) || + info.time < bucketTime) { + continue; + } + + auto state = getBucketState(catalog.bucketStateRegistry, potentialBucket); + invariant(state); + if (!state.value().conflictsWithInsertion()) { + invariant(!potentialBucket->idleListEntry.has_value()); + return potentialBucket; + } + + // Clean up the bucket if it has been cleared. + if (state.value().isSet(BucketStateFlag::kCleared)) { + abort(catalog, + stripe, + stripeLock, + *potentialBucket, + nullptr, + getTimeseriesBucketClearedError(potentialBucket->bucketId.ns, + potentialBucket->bucketId.oid)); + } + } + + return nullptr; +} + +StatusWith<std::unique_ptr<Bucket>> rehydrateBucket( + OperationContext* opCtx, + BucketStateRegistry& registry, + const NamespaceString& ns, + const StringData::ComparatorInterface* comparator, + const TimeseriesOptions& options, + const BucketToReopen& bucketToReopen, + const BucketKey* expectedKey) { + invariant(feature_flags::gTimeseriesScalabilityImprovements.isEnabled( + serverGlobalParams.featureCompatibility)); + const auto& [bucketDoc, validator, catalogEra] = bucketToReopen; + if (catalogEra < getCurrentEra(registry)) { + return {ErrorCodes::WriteConflict, "Bucket is from an earlier era, may be outdated"}; + } + + BSONElement bucketIdElem = bucketDoc.getField(kBucketIdFieldName); + if (bucketIdElem.eoo() || bucketIdElem.type() != BSONType::jstOID) { + return {ErrorCodes::BadValue, + str::stream() << kBucketIdFieldName << " is missing or not an ObjectId"}; + } + + // Validate the bucket document against the schema. + auto result = validator(opCtx, bucketDoc); + if (result.first != Collection::SchemaValidationResult::kPass) { + return result.second; + } + + auto controlField = bucketDoc.getObjectField(kBucketControlFieldName); + auto closedElem = controlField.getField(kBucketControlClosedFieldName); + if (closedElem.booleanSafe()) { + return {ErrorCodes::BadValue, + "Bucket has been marked closed and is not eligible for reopening"}; + } + + BSONElement metadata; + auto metaFieldName = options.getMetaField(); + if (metaFieldName) { + metadata = bucketDoc.getField(kBucketMetaFieldName); + } + + // Buckets are spread across independently-lockable stripes to improve parallelism. We map a + // bucket to a stripe by hashing the BucketKey. + auto key = BucketKey{ns, BucketMetadata{metadata, comparator, options.getMetaField()}}; + if (expectedKey && key != *expectedKey) { + return {ErrorCodes::BadValue, "Bucket metadata does not match (hash collision)"}; + } + + auto minTime = controlField.getObjectField(kBucketControlMinFieldName) + .getField(options.getTimeField()) + .Date(); + BucketId bucketId{key.ns, bucketIdElem.OID()}; + std::unique_ptr<Bucket> bucket = + std::make_unique<Bucket>(bucketId, key, options.getTimeField(), minTime, registry); + + const bool isCompressed = isCompressedBucket(bucketDoc); + + // Initialize the remaining member variables from the bucket document. + if (isCompressed) { + auto decompressed = decompressBucket(bucketDoc); + if (!decompressed.has_value()) { + return Status{ErrorCodes::BadValue, "Bucket could not be decompressed"}; + } + bucket->size = decompressed.value().objsize(); + bucket->decompressed = DecompressionResult{bucketDoc, decompressed.value()}; + bucket->memoryUsage += (decompressed.value().objsize() + bucketDoc.objsize()); + } else { + bucket->size = bucketDoc.objsize(); + } + + // Populate the top-level data field names. + const BSONObj& dataObj = bucketDoc.getObjectField(kBucketDataFieldName); + for (const BSONElement& dataElem : dataObj) { + auto hashedKey = StringSet::hasher().hashed_key(dataElem.fieldName()); + bucket->fieldNames.emplace(hashedKey); + } + + auto swMinMax = generateMinMaxFromBucketDoc(bucketDoc, comparator); + if (!swMinMax.isOK()) { + return swMinMax.getStatus(); + } + bucket->minmax = std::move(swMinMax.getValue()); + + auto swSchema = generateSchemaFromBucketDoc(bucketDoc, comparator); + if (!swSchema.isOK()) { + return swSchema.getStatus(); + } + bucket->schema = std::move(swSchema.getValue()); + + uint32_t numMeasurements = 0; + const BSONElement timeColumnElem = dataObj.getField(options.getTimeField()); + + if (isCompressed && timeColumnElem.type() == BSONType::BinData) { + BSONColumn storage{timeColumnElem}; + numMeasurements = storage.size(); + } else if (timeColumnElem.isABSONObj()) { + numMeasurements = timeColumnElem.Obj().nFields(); + } else { + return {ErrorCodes::BadValue, + "Bucket data field is malformed (missing a valid time column)"}; + } + + bucket->numMeasurements = numMeasurements; + bucket->numCommittedMeasurements = numMeasurements; + + // The namespace is stored two times: the bucket itself and openBucketsByKey. We don't have a + // great approximation for the _schema or _minmax data structure size, so we use the control + // field size as an approximation for _minmax, and half that size for _schema. Since the + // metadata is stored in the bucket, we need to add that as well. A unique pointer to the bucket + // is stored once: openBucketsById. A raw pointer to the bucket is stored at most twice: + // openBucketsByKey, idleBuckets. + bucket->memoryUsage += (key.ns.size() * 2) + 1.5 * controlField.objsize() + + key.metadata.toBSON().objsize() + sizeof(Bucket) + sizeof(std::unique_ptr<Bucket>) + + (sizeof(Bucket*) * 2); + + return {std::move(bucket)}; +} + +StatusWith<std::reference_wrapper<Bucket>> reopenBucket(BucketCatalog& catalog, + Stripe& stripe, + WithLock stripeLock, + ExecutionStatsController& stats, + const BucketKey& key, + std::unique_ptr<Bucket>&& bucket, + std::uint64_t targetEra, + ClosedBuckets& closedBuckets) { + invariant(bucket); + + expireIdleBuckets(catalog, stripe, stripeLock, stats, closedBuckets); + + // We may need to initialize the bucket's state. + bool conflicts = false; + auto initializeStateFn = + [targetEra, &conflicts](boost::optional<BucketState> input, + std::uint64_t currentEra) -> boost::optional<BucketState> { + if (targetEra < currentEra || + (input.has_value() && input.value().conflictsWithReopening())) { + conflicts = true; + return input; + } + conflicts = false; + return input.has_value() ? input.value() : BucketState{}; + }; + + auto state = + changeBucketState(catalog.bucketStateRegistry, bucket->bucketId, initializeStateFn); + if (conflicts) { + return {ErrorCodes::WriteConflict, "Bucket may be stale"}; + } + + // If this bucket was archived, we need to remove it from the set of archived buckets. + if (auto setIt = stripe.archivedBuckets.find(key.hash); setIt != stripe.archivedBuckets.end()) { + auto& archivedSet = setIt->second; + if (auto bucketIt = archivedSet.find(bucket->minTime); + bucketIt != archivedSet.end() && bucket->bucketId == bucketIt->second.bucketId) { + long long memory = + marginalMemoryUsageForArchivedBucket(bucketIt->second, archivedSet.size() == 1); + if (archivedSet.size() == 1) { + stripe.archivedBuckets.erase(setIt); + } else { + archivedSet.erase(bucketIt); + } + catalog.memoryUsage.fetchAndSubtract(memory); + catalog.numberOfActiveBuckets.fetchAndSubtract(1); + } + } + + // Pass ownership of the reopened bucket to the bucket catalog. + auto [insertedIt, newlyInserted] = + stripe.openBucketsById.try_emplace(bucket->bucketId, std::move(bucket)); + invariant(newlyInserted); + Bucket* unownedBucket = insertedIt->second.get(); + + // If we already have an open bucket for this key, we need to close it. + if (auto it = stripe.openBucketsByKey.find(key); it != stripe.openBucketsByKey.end()) { + auto& openSet = it->second; + for (Bucket* existingBucket : openSet) { + if (existingBucket->rolloverAction == RolloverAction::kNone) { + stats.incNumBucketsClosedDueToReopening(); + if (allCommitted(*existingBucket)) { + closeOpenBucket(catalog, stripe, stripeLock, *existingBucket, closedBuckets); + } else { + existingBucket->rolloverAction = RolloverAction::kSoftClose; + } + // We should only have one open bucket at a time. + break; + } + } + } + + // Now actually mark this bucket as open. + stripe.openBucketsByKey[key].emplace(unownedBucket); + stats.incNumBucketsReopened(); + + catalog.memoryUsage.addAndFetch(unownedBucket->memoryUsage); + catalog.numberOfActiveBuckets.fetchAndAdd(1); + + return *unownedBucket; +} + +StatusWith<std::reference_wrapper<Bucket>> reuseExistingBucket(BucketCatalog& catalog, + Stripe& stripe, + WithLock stripeLock, + ExecutionStatsController& stats, + const BucketKey& key, + Bucket& existingBucket, + std::uint64_t targetEra) { + // If we have an existing bucket, passing the Bucket* will let us check if the bucket was + // cleared as part of a set since the last time it was used. If we were to just check by + // OID, we may miss if e.g. there was a move chunk operation. + bool conflicts = false; + auto state = changeBucketState( + catalog.bucketStateRegistry, + &existingBucket, + [targetEra, &conflicts](boost::optional<BucketState> input, + std::uint64_t currentEra) -> boost::optional<BucketState> { + if (targetEra < currentEra || + (input.has_value() && input.value().conflictsWithReopening())) { + conflicts = true; + return input; + } + conflicts = false; + return input.has_value() ? input.value() : BucketState{}; + }); + if (state.has_value() && state.value().isSet(BucketStateFlag::kCleared)) { + abort(catalog, + stripe, + stripeLock, + existingBucket, + nullptr, + getTimeseriesBucketClearedError(existingBucket.bucketId.ns, + existingBucket.bucketId.oid)); + conflicts = true; + } + if (conflicts) { + return {ErrorCodes::WriteConflict, "Bucket may be stale"}; + } + + // It's possible to have two buckets with the same ID in different collections, so let's make + // extra sure the existing bucket is the right one. + if (existingBucket.bucketId.ns != key.ns) { + return {ErrorCodes::BadValue, "Cannot re-use bucket: same ID but different namespace"}; + } + + // If the bucket was already open, wasn't cleared, the state didn't conflict with reopening, and + // the namespace matches, then we can simply return it. + stats.incNumDuplicateBucketsReopened(); + markBucketNotIdle(stripe, stripeLock, existingBucket); + + return existingBucket; +} + +stdx::variant<std::shared_ptr<WriteBatch>, RolloverReason> insertIntoBucket( + OperationContext* opCtx, + BucketCatalog& catalog, + Stripe& stripe, + WithLock stripeLock, + StripeNumber stripeNumber, + const BSONObj& doc, + CombineWithInsertsFromOtherClients combine, + AllowBucketCreation mode, + CreationInfo& info, + Bucket& existingBucket) { + Bucket::NewFieldNames newFieldNamesToBeInserted; + int32_t sizeToBeAdded = 0; + + bool isNewlyOpenedBucket = (existingBucket.size == 0); + std::reference_wrapper<Bucket> bucketToUse{existingBucket}; + if (!isNewlyOpenedBucket) { + auto [action, reason] = determineRolloverAction(opCtx, + doc, + info, + existingBucket, + catalog.numberOfActiveBuckets.load(), + newFieldNamesToBeInserted, + sizeToBeAdded, + mode); + if ((action == RolloverAction::kSoftClose || action == RolloverAction::kArchive) && + mode == AllowBucketCreation::kNo) { + // We don't actually want to roll this bucket over yet, bail out. + return reason; + } else if (action != RolloverAction::kNone) { + info.openedDuetoMetadata = false; + bucketToUse = rollover(catalog, stripe, stripeLock, existingBucket, info, action); + isNewlyOpenedBucket = true; + } + } + Bucket& bucket = bucketToUse.get(); + const auto previousMemoryUsage = bucket.memoryUsage; + + if (isNewlyOpenedBucket) { + calculateBucketFieldsAndSizeChange( + bucket, doc, info.options.getMetaField(), newFieldNamesToBeInserted, sizeToBeAdded); + } + + auto batch = activeBatch(bucket, getOpId(opCtx, combine), stripeNumber, info.stats); + batch->measurements.push_back(doc); + for (auto&& field : newFieldNamesToBeInserted) { + batch->newFieldNamesToBeInserted[field] = field.hash(); + bucket.uncommittedFieldNames.emplace(field); + } + + bucket.numMeasurements++; + bucket.size += sizeToBeAdded; + if (isNewlyOpenedBucket) { + // The namespace is stored two times: the bucket itself and openBucketsByKey. + // We don't have a great approximation for the + // _schema size, so we use initial document size minus metadata as an approximation. Since + // the metadata itself is stored once, in the bucket, we can combine the two and just use + // the initial document size. A unique pointer to the bucket is stored once: + // openBucketsById. A raw pointer to the bucket is stored at most twice: openBucketsByKey, + // idleBuckets. + bucket.memoryUsage += (info.key.ns.size() * 2) + doc.objsize() + sizeof(Bucket) + + sizeof(std::unique_ptr<Bucket>) + (sizeof(Bucket*) * 2); + + auto updateStatus = bucket.schema.update( + doc, info.options.getMetaField(), info.key.metadata.getComparator()); + invariant(updateStatus == Schema::UpdateStatus::Updated); + } else { + catalog.memoryUsage.fetchAndSubtract(previousMemoryUsage); + } + catalog.memoryUsage.fetchAndAdd(bucket.memoryUsage); + + return batch; +} + +StatusWith<InsertResult> insert(OperationContext* opCtx, + BucketCatalog& catalog, + const NamespaceString& ns, + const StringData::ComparatorInterface* comparator, + const TimeseriesOptions& options, + const BSONObj& doc, + CombineWithInsertsFromOtherClients combine, + AllowBucketCreation mode, + BucketFindResult bucketFindResult) { + invariant(!ns.isTimeseriesBucketsCollection()); + + auto res = extractBucketingParameters(ns, comparator, options, doc); + if (!res.isOK()) { + return res.getStatus(); + } + auto& key = res.getValue().first; + auto time = res.getValue().second; + + ExecutionStatsController stats = getOrInitializeExecutionStats(catalog, ns); + updateBucketFetchAndQueryStats(bucketFindResult, stats); + + // Buckets are spread across independently-lockable stripes to improve parallelism. We map a + // bucket to a stripe by hashing the BucketKey. + auto stripeNumber = getStripeNumber(key); + + InsertResult result; + result.catalogEra = getCurrentEra(catalog.bucketStateRegistry); + CreationInfo info{key, stripeNumber, time, options, stats, &result.closedBuckets}; + boost::optional<BucketToReopen> bucketToReopen = std::move(bucketFindResult.bucketToReopen); + + auto rehydratedBucket = bucketToReopen.has_value() + ? rehydrateBucket(opCtx, + catalog.bucketStateRegistry, + ns, + comparator, + options, + bucketToReopen.value(), + &key) + : StatusWith<std::unique_ptr<Bucket>>{ErrorCodes::BadValue, "No bucket to rehydrate"}; + if (rehydratedBucket.getStatus().code() == ErrorCodes::WriteConflict) { + stats.incNumBucketReopeningsFailed(); + return rehydratedBucket.getStatus(); + } + + auto& stripe = catalog.stripes[stripeNumber]; + stdx::lock_guard stripeLock{stripe.mutex}; + + if (rehydratedBucket.isOK()) { + invariant(mode == AllowBucketCreation::kYes); + hangTimeseriesInsertBeforeReopeningBucket.pauseWhileSet(); + + StatusWith<std::reference_wrapper<Bucket>> swBucket{ErrorCodes::BadValue, ""}; + auto existingIt = stripe.openBucketsById.find(rehydratedBucket.getValue()->bucketId); + if (existingIt != stripe.openBucketsById.end()) { + // First let's check the existing bucket if we have one. + Bucket* existingBucket = existingIt->second.get(); + swBucket = reuseExistingBucket(catalog, + stripe, + stripeLock, + stats, + key, + *existingBucket, + bucketToReopen->catalogEra); + } else { + // No existing bucket to use, go ahead and try to reopen our rehydrated bucket. + swBucket = reopenBucket(catalog, + stripe, + stripeLock, + stats, + key, + std::move(rehydratedBucket.getValue()), + bucketToReopen->catalogEra, + result.closedBuckets); + } + + if (swBucket.isOK()) { + Bucket& bucket = swBucket.getValue().get(); + auto insertionResult = insertIntoBucket( + opCtx, catalog, stripe, stripeLock, stripeNumber, doc, combine, mode, info, bucket); + auto* batch = stdx::get_if<std::shared_ptr<WriteBatch>>(&insertionResult); + invariant(batch); + result.batch = *batch; + + return std::move(result); + } else { + stats.incNumBucketReopeningsFailed(); + if (swBucket.getStatus().code() == ErrorCodes::WriteConflict) { + return swBucket.getStatus(); + } + // If we had a different type of error, then we should fall through and proceed to open + // a new bucket. + } + } + + Bucket* bucket = useBucket(catalog, stripe, stripeLock, info, mode); + if (!bucket) { + invariant(mode == AllowBucketCreation::kNo); + constexpr bool allowQueryBasedReopening = true; + result.candidate = getReopeningCandidate( + opCtx, catalog, stripe, stripeLock, info, allowQueryBasedReopening); + return std::move(result); + } + + auto insertionResult = insertIntoBucket( + opCtx, catalog, stripe, stripeLock, stripeNumber, doc, combine, mode, info, *bucket); + if (auto* reason = stdx::get_if<RolloverReason>(&insertionResult)) { + invariant(mode == AllowBucketCreation::kNo); + if (allCommitted(*bucket)) { + markBucketIdle(stripe, stripeLock, *bucket); + } + + // If we were time forward or backward, we might be able to "reopen" a bucket we still have + // in memory that's set to be closed when pending operations finish. + if ((*reason == RolloverReason::kTimeBackward || *reason == RolloverReason::kTimeForward)) { + if (Bucket* alternate = useAlternateBucket(catalog, stripe, stripeLock, info)) { + insertionResult = insertIntoBucket(opCtx, + catalog, + stripe, + stripeLock, + stripeNumber, + doc, + combine, + mode, + info, + *alternate); + if (auto* batch = stdx::get_if<std::shared_ptr<WriteBatch>>(&insertionResult)) { + result.batch = *batch; + return std::move(result); + } + + // We weren't able to insert into the other bucket, so fall through to the regular + // reopening procedure. + } + } + + bool allowQueryBasedReopening = (*reason == RolloverReason::kTimeBackward); + result.candidate = getReopeningCandidate( + opCtx, catalog, stripe, stripeLock, info, allowQueryBasedReopening); + } else { + result.batch = *stdx::get_if<std::shared_ptr<WriteBatch>>(&insertionResult); + } + return std::move(result); +} + +void waitToCommitBatch(BucketStateRegistry& registry, + Stripe& stripe, + const std::shared_ptr<WriteBatch>& batch) { + while (true) { + std::shared_ptr<WriteBatch> current; + + { + stdx::lock_guard stripeLock{stripe.mutex}; + Bucket* bucket = useBucket( + registry, stripe, stripeLock, batch->bucketHandle.bucketId, IgnoreBucketState::kNo); + if (!bucket || isWriteBatchFinished(*batch)) { + return; + } + + current = bucket->preparedBatch; + if (!current) { + // No other batches for this bucket are currently committing, so we can proceed. + bucket->preparedBatch = batch; + bucket->batches.erase(batch->opId); + return; + } + } + + // We only hit this failpoint when there are conflicting prepared batches on the same + // bucket. + hangWaitingForConflictingPreparedBatch.pauseWhileSet(); + + // We have to wait for someone else to finish. + getWriteBatchResult(*current).getStatus().ignore(); // We don't care about the result. + } +} + +void removeBucket( + BucketCatalog& catalog, Stripe& stripe, WithLock stripeLock, Bucket& bucket, RemovalMode mode) { + invariant(bucket.batches.empty()); + invariant(!bucket.preparedBatch); + + auto allIt = stripe.openBucketsById.find(bucket.bucketId); + invariant(allIt != stripe.openBucketsById.end()); + + catalog.memoryUsage.fetchAndSubtract(bucket.memoryUsage); + markBucketNotIdle(stripe, stripeLock, bucket); + + // If the bucket was rolled over, then there may be a different open bucket for this metadata. + auto openIt = stripe.openBucketsByKey.find({bucket.bucketId.ns, bucket.key.metadata}); + if (openIt != stripe.openBucketsByKey.end()) { + auto& openSet = openIt->second; + auto bucketIt = openSet.find(&bucket); + if (bucketIt != openSet.end()) { + if (openSet.size() == 1) { + stripe.openBucketsByKey.erase(openIt); + } else { + openSet.erase(bucketIt); + } + } + } + + // If we are cleaning up while archiving a bucket, then we want to preserve its state. Otherwise + // we can remove the state from the catalog altogether. + switch (mode) { + case RemovalMode::kClose: { + auto state = getBucketState(catalog.bucketStateRegistry, bucket.bucketId); + invariant(state.has_value()); + invariant(state.value().isSet(BucketStateFlag::kPendingCompression)); + break; + } + case RemovalMode::kAbort: + changeBucketState(catalog.bucketStateRegistry, + bucket.bucketId, + [](boost::optional<BucketState> input, + std::uint64_t) -> boost::optional<BucketState> { + invariant(input.has_value()); + if (input->conflictsWithReopening()) { + return input.value().setFlag(BucketStateFlag::kUntracked); + } + return boost::none; + }); + break; + case RemovalMode::kArchive: + // No state change + break; + } + + catalog.numberOfActiveBuckets.fetchAndSubtract(1); + stripe.openBucketsById.erase(allIt); +} + +void archiveBucket(BucketCatalog& catalog, + Stripe& stripe, + WithLock stripeLock, + Bucket& bucket, + ClosedBuckets& closedBuckets) { + bool archived = false; + auto& archivedSet = stripe.archivedBuckets[bucket.key.hash]; + auto it = archivedSet.find(bucket.minTime); + if (it == archivedSet.end()) { + auto [it, inserted] = + archivedSet.emplace(bucket.minTime, ArchivedBucket{bucket.bucketId, bucket.timeField}); + + long long memory = + marginalMemoryUsageForArchivedBucket(it->second, archivedSet.size() == 1); + catalog.memoryUsage.fetchAndAdd(memory); + archived = true; + } + + if (archived) { + // If we have an archived bucket, we still want to account for it in numberOfActiveBuckets + // so we will increase it here since removeBucket decrements the count. + catalog.numberOfActiveBuckets.fetchAndAdd(1); + removeBucket(catalog, stripe, stripeLock, bucket, RemovalMode::kArchive); + } else { + // We had a meta hash collision, and already have a bucket archived with the same meta hash + // and timestamp as this bucket. Since it's somewhat arbitrary which bucket we keep, we'll + // keep the one that's already archived and just plain close this one. + closeOpenBucket(catalog, stripe, stripeLock, bucket, closedBuckets); + } +} + +boost::optional<OID> findArchivedCandidate(BucketCatalog& catalog, + Stripe& stripe, + WithLock stripeLock, + const CreationInfo& info) { + auto setIt = stripe.archivedBuckets.find(info.key.hash); + if (setIt == stripe.archivedBuckets.end()) { + return boost::none; + } + + auto& archivedSet = setIt->second; + + // We want to find the largest time that is not greater than info.time. Generally lower_bound + // will return the smallest element not less than the search value, but we are using + // std::greater instead of std::less for the map's comparisons. This means the order of keys + // will be reversed, and lower_bound will return what we want. + auto it = archivedSet.lower_bound(info.time); + if (it == archivedSet.end()) { + return boost::none; + } + + const auto& [candidateTime, candidateBucket] = *it; + invariant(candidateTime <= info.time); + // We need to make sure our measurement can fit without violating max span. If not, we + // can't use this bucket. + if (info.time - candidateTime < Seconds(*info.options.getBucketMaxSpanSeconds())) { + auto state = getBucketState(catalog.bucketStateRegistry, candidateBucket.bucketId); + if (state && !state.value().conflictsWithReopening()) { + return candidateBucket.bucketId.oid; + } else { + if (state) { + changeBucketState(catalog.bucketStateRegistry, + candidateBucket.bucketId, + [](boost::optional<BucketState> input, + std::uint64_t) -> boost::optional<BucketState> { + if (!input.has_value()) { + return boost::none; + } + invariant(input.value().conflictsWithReopening()); + return input.value().setFlag(BucketStateFlag::kUntracked); + }); + } + long long memory = + marginalMemoryUsageForArchivedBucket(candidateBucket, archivedSet.size() == 1); + if (archivedSet.size() == 1) { + stripe.archivedBuckets.erase(setIt); + } else { + archivedSet.erase(it); + } + catalog.memoryUsage.fetchAndSubtract(memory); + catalog.numberOfActiveBuckets.fetchAndSubtract(1); + } + } + + return boost::none; +} + +stdx::variant<std::monostate, OID, std::vector<BSONObj>> getReopeningCandidate( + OperationContext* opCtx, + BucketCatalog& catalog, + Stripe& stripe, + WithLock stripeLock, + const CreationInfo& info, + bool allowQueryBasedReopening) { + if (auto archived = findArchivedCandidate(catalog, stripe, stripeLock, info)) { + return archived.value(); + } + + if (!allowQueryBasedReopening) { + return {}; + } + + boost::optional<BSONElement> metaElement; + if (info.options.getMetaField().has_value()) { + metaElement = info.key.metadata.element(); + } + + auto controlMinTimePath = kControlMinFieldNamePrefix.toString() + info.options.getTimeField(); + auto maxDataTimeFieldPath = kDataFieldNamePrefix.toString() + info.options.getTimeField() + + "." + std::to_string(gTimeseriesBucketMaxCount - 1); + + // Derive the maximum bucket size. + auto bucketMaxSize = getCacheDerivedBucketMaxSize( + opCtx->getServiceContext()->getStorageEngine(), catalog.numberOfActiveBuckets.load()); + int32_t effectiveMaxSize = std::min(gTimeseriesBucketMaxSize, bucketMaxSize); + + return generateReopeningPipeline(opCtx, + info.time, + metaElement, + controlMinTimePath, + maxDataTimeFieldPath, + *info.options.getBucketMaxSpanSeconds(), + effectiveMaxSize); +} + +void abort(BucketCatalog& catalog, + Stripe& stripe, + WithLock stripeLock, + std::shared_ptr<WriteBatch> batch, + const Status& status) { + // Before we access the bucket, make sure it's still there. + Bucket* bucket = useBucket(catalog.bucketStateRegistry, + stripe, + stripeLock, + batch->bucketHandle.bucketId, + IgnoreBucketState::kYes); + if (!bucket) { + // Special case, bucket has already been cleared, and we need only abort this batch. + abortWriteBatch(*batch, status); + return; + } + + // Proceed to abort any unprepared batches and remove the bucket if possible + abort(catalog, stripe, stripeLock, *bucket, batch, status); +} + +void abort(BucketCatalog& catalog, + Stripe& stripe, + WithLock stripeLock, + Bucket& bucket, + std::shared_ptr<WriteBatch> batch, + const Status& status) { + // Abort any unprepared batches. This should be safe since we have a lock on the stripe, + // preventing anyone else from using these. + for (const auto& [_, current] : bucket.batches) { + abortWriteBatch(*current, status); + } + bucket.batches.clear(); + + bool doRemove = true; // We shouldn't remove the bucket if there's a prepared batch outstanding + // and it's not the one we manage. In that case, we don't know what the + // user is doing with it, but we need to keep the bucket around until + // that batch is finished. + if (auto& prepared = bucket.preparedBatch) { + if (batch && prepared == batch) { + // We own the prepared batch, so we can go ahead and abort it and remove the bucket. + abortWriteBatch(*prepared, status); + prepared.reset(); + } else { + doRemove = false; + } + } + + if (doRemove) { + removeBucket(catalog, stripe, stripeLock, bucket, RemovalMode::kAbort); + } else { + changeBucketState( + catalog.bucketStateRegistry, + bucket.bucketId, + [](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> { + invariant(input.has_value()); + return input.value().setFlag(BucketStateFlag::kCleared); + }); + } +} + +void markBucketIdle(Stripe& stripe, WithLock stripeLock, Bucket& bucket) { + invariant(!bucket.idleListEntry.has_value()); + invariant(allCommitted(bucket)); + stripe.idleBuckets.push_front(&bucket); + bucket.idleListEntry = stripe.idleBuckets.begin(); +} + +void markBucketNotIdle(Stripe& stripe, WithLock stripeLock, Bucket& bucket) { + if (bucket.idleListEntry.has_value()) { + stripe.idleBuckets.erase(bucket.idleListEntry.value()); + bucket.idleListEntry = boost::none; + } +} + +void expireIdleBuckets(BucketCatalog& catalog, + Stripe& stripe, + WithLock stripeLock, + ExecutionStatsController& stats, + ClosedBuckets& closedBuckets) { + // As long as we still need space and have entries and remaining attempts, close idle buckets. + int32_t numExpired = 0; + + const bool canArchive = feature_flags::gTimeseriesScalabilityImprovements.isEnabled( + serverGlobalParams.featureCompatibility); + + while (!stripe.idleBuckets.empty() && + catalog.memoryUsage.load() > getTimeseriesIdleBucketExpiryMemoryUsageThresholdBytes() && + numExpired <= gTimeseriesIdleBucketExpiryMaxCountPerAttempt) { + Bucket* bucket = stripe.idleBuckets.back(); + + auto state = getBucketState(catalog.bucketStateRegistry, bucket); + if (canArchive && state && !state.value().conflictsWithInsertion()) { + // Can archive a bucket if it's still eligible for insertions. + archiveBucket(catalog, stripe, stripeLock, *bucket, closedBuckets); + stats.incNumBucketsArchivedDueToMemoryThreshold(); + } else if (state && state.value().isSet(BucketStateFlag::kCleared)) { + // Bucket was cleared and just needs to be removed from catalog. + removeBucket(catalog, stripe, stripeLock, *bucket, RemovalMode::kAbort); + } else { + closeOpenBucket(catalog, stripe, stripeLock, *bucket, closedBuckets); + stats.incNumBucketsClosedDueToMemoryThreshold(); + } + + ++numExpired; + } + + while (canArchive && !stripe.archivedBuckets.empty() && + catalog.memoryUsage.load() > getTimeseriesIdleBucketExpiryMemoryUsageThresholdBytes() && + numExpired <= gTimeseriesIdleBucketExpiryMaxCountPerAttempt) { + + auto& [hash, archivedSet] = *stripe.archivedBuckets.begin(); + invariant(!archivedSet.empty()); + + auto& [timestamp, bucket] = *archivedSet.begin(); + closeArchivedBucket(catalog.bucketStateRegistry, bucket, closedBuckets); + + long long memory = marginalMemoryUsageForArchivedBucket(bucket, archivedSet.size() == 1); + if (archivedSet.size() == 1) { + // If this is the only entry, erase the whole map so we don't leave it empty. + stripe.archivedBuckets.erase(stripe.archivedBuckets.begin()); + } else { + // Otherwise just erase this bucket from the map. + archivedSet.erase(archivedSet.begin()); + } + catalog.memoryUsage.fetchAndSubtract(memory); + catalog.numberOfActiveBuckets.fetchAndSubtract(1); + + stats.incNumBucketsClosedDueToMemoryThreshold(); + ++numExpired; + } +} + +std::pair<OID, Date_t> generateBucketOID(const Date_t& time, const TimeseriesOptions& options) { + OID oid = OID::gen(); + + // We round the measurement timestamp down to the nearest minute, hour, or day depending on the + // granularity. We do this for two reasons. The first is so that if measurements come in + // slightly out of order, we don't have to close the current bucket due to going backwards in + // time. The second, and more important reason, is so that we reliably group measurements + // together into predictable chunks for sharding. This way we know from a measurement timestamp + // what the bucket timestamp will be, so we can route measurements to the right shard chunk. + auto roundedTime = roundTimestampToGranularity(time, options); + int64_t const roundedSeconds = durationCount<Seconds>(roundedTime.toDurationSinceEpoch()); + oid.setTimestamp(roundedSeconds); + + // Now, if we stopped here we could end up with bucket OID collisions. Consider the case where + // we have the granularity set to 'Hours'. This means we will round down to the nearest day, so + // any bucket generated on the same machine on the same day will have the same timestamp portion + // and unique instance portion of the OID. Only the increment will differ. Since we only use 3 + // bytes for the increment portion, we run a serious risk of overflow if we are generating lots + // of buckets. + // + // To address this, we'll take the difference between the actual timestamp and the rounded + // timestamp and add it to the instance portion of the OID to ensure we can't have a collision. + // for timestamps generated on the same machine. + // + // This leaves open the possibility that in the case of step-down/step-up, we could get a + // collision if the old primary and the new primary have unique instance bits that differ by + // less than the maximum rounding difference. This is quite unlikely though, and can be resolved + // by restarting the new primary. It remains an open question whether we can fix this in a + // better way. + // TODO (SERVER-61412): Avoid time-series bucket OID collisions after election + auto instance = oid.getInstanceUnique(); + uint32_t sum = DataView(reinterpret_cast<char*>(instance.bytes)).read<uint32_t>(1) + + (durationCount<Seconds>(time.toDurationSinceEpoch()) - roundedSeconds); + DataView(reinterpret_cast<char*>(instance.bytes)).write<uint32_t>(sum, 1); + oid.setInstanceUnique(instance); + + return {oid, roundedTime}; +} + +Bucket& allocateBucket(BucketCatalog& catalog, + Stripe& stripe, + WithLock stripeLock, + const CreationInfo& info) { + expireIdleBuckets(catalog, stripe, stripeLock, info.stats, *info.closedBuckets); + + auto [oid, roundedTime] = generateBucketOID(info.time, info.options); + auto bucketId = BucketId{info.key.ns, oid}; + + auto [it, inserted] = + stripe.openBucketsById.try_emplace(bucketId, + std::make_unique<Bucket>(bucketId, + info.key, + info.options.getTimeField(), + roundedTime, + catalog.bucketStateRegistry)); + tassert(6130900, "Expected bucket to be inserted", inserted); + Bucket* bucket = it->second.get(); + stripe.openBucketsByKey[info.key].emplace(bucket); + + auto state = changeBucketState( + catalog.bucketStateRegistry, + bucketId, + [](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> { + invariant(!input.has_value()); + return BucketState{}; + }); + invariant(state == BucketState{}); + catalog.numberOfActiveBuckets.fetchAndAdd(1); + + if (info.openedDuetoMetadata) { + info.stats.incNumBucketsOpenedDueToMetadata(); + } + + // Make sure we set the control.min time field to match the rounded _id timestamp. + auto controlDoc = buildControlMinTimestampDoc(info.options.getTimeField(), roundedTime); + bucket->minmax.update( + controlDoc, /*metaField=*/boost::none, bucket->key.metadata.getComparator()); + return *bucket; +} + +Bucket& rollover(BucketCatalog& catalog, + Stripe& stripe, + WithLock stripeLock, + Bucket& bucket, + const CreationInfo& info, + RolloverAction action) { + invariant(action != RolloverAction::kNone); + if (allCommitted(bucket)) { + // The bucket does not contain any measurements that are yet to be committed, so we can take + // action now. + if (action == RolloverAction::kArchive) { + archiveBucket(catalog, stripe, stripeLock, bucket, *info.closedBuckets); + } else { + closeOpenBucket(catalog, stripe, stripeLock, bucket, *info.closedBuckets); + } + } else { + // We must keep the bucket around until all measurements are committed committed, just mark + // the action we chose now so it we know what to do when the last batch finishes. + bucket.rolloverAction = action; + } + + return allocateBucket(catalog, stripe, stripeLock, info); +} + +std::pair<RolloverAction, RolloverReason> determineRolloverAction( + OperationContext* opCtx, + const BSONObj& doc, + CreationInfo& info, + Bucket& bucket, + uint32_t numberOfActiveBuckets, + Bucket::NewFieldNames& newFieldNamesToBeInserted, + int32_t& sizeToBeAdded, + AllowBucketCreation mode) { + // If the mode is enabled to create new buckets, then we should update stats for soft closures + // accordingly. If we specify the mode to not allow bucket creation, it means we are not sure if + // we want to soft close the bucket yet and should wait to update closure stats. + const bool shouldUpdateStats = (mode == AllowBucketCreation::kYes); + + auto bucketTime = bucket.minTime; + if (info.time - bucketTime >= Seconds(*info.options.getBucketMaxSpanSeconds())) { + if (shouldUpdateStats) { + info.stats.incNumBucketsClosedDueToTimeForward(); + } + return {RolloverAction::kSoftClose, RolloverReason::kTimeForward}; + } + if (info.time < bucketTime) { + const bool canArchive = feature_flags::gTimeseriesScalabilityImprovements.isEnabled( + serverGlobalParams.featureCompatibility); + if (shouldUpdateStats) { + if (canArchive) { + info.stats.incNumBucketsArchivedDueToTimeBackward(); + } else { + info.stats.incNumBucketsClosedDueToTimeBackward(); + } + } + return {canArchive ? RolloverAction::kArchive : RolloverAction::kSoftClose, + RolloverReason::kTimeBackward}; + } + if (bucket.numMeasurements == static_cast<std::uint64_t>(gTimeseriesBucketMaxCount)) { + info.stats.incNumBucketsClosedDueToCount(); + return {RolloverAction::kHardClose, RolloverReason::kCount}; + } + + // In scenarios where we have a high cardinality workload and face increased cache pressure we + // will decrease the size of buckets before we close them. + int32_t cacheDerivedBucketMaxSize = getCacheDerivedBucketMaxSize( + opCtx->getServiceContext()->getStorageEngine(), numberOfActiveBuckets); + int32_t effectiveMaxSize = std::min(gTimeseriesBucketMaxSize, cacheDerivedBucketMaxSize); + + // Before we hit our bucket minimum count, we will allow for large measurements to be inserted + // into buckets. Instead of packing the bucket to the BSON size limit, 16MB, we'll limit the max + // bucket size to 12MB. This is to leave some space in the bucket if we need to add new internal + // fields to existing, full buckets. + static constexpr int32_t largeMeasurementsMaxBucketSize = + BSONObjMaxUserSize - (4 * 1024 * 1024); + // We restrict the ceiling of the bucket max size under cache pressure. + int32_t absoluteMaxSize = std::min(largeMeasurementsMaxBucketSize, cacheDerivedBucketMaxSize); + + calculateBucketFieldsAndSizeChange( + bucket, doc, info.options.getMetaField(), newFieldNamesToBeInserted, sizeToBeAdded); + if (bucket.size + sizeToBeAdded > effectiveMaxSize) { + bool keepBucketOpenForLargeMeasurements = + bucket.numMeasurements < static_cast<std::uint64_t>(gTimeseriesBucketMinCount) && + feature_flags::gTimeseriesScalabilityImprovements.isEnabled( + serverGlobalParams.featureCompatibility); + if (keepBucketOpenForLargeMeasurements) { + if (bucket.size + sizeToBeAdded > absoluteMaxSize) { + if (absoluteMaxSize != largeMeasurementsMaxBucketSize) { + info.stats.incNumBucketsClosedDueToCachePressure(); + return {RolloverAction::kHardClose, RolloverReason::kCachePressure}; + } + info.stats.incNumBucketsClosedDueToSize(); + return {RolloverAction::kHardClose, RolloverReason::kSize}; + } + + // There's enough space to add this measurement and we're still below the large + // measurement threshold. + if (!bucket.keptOpenDueToLargeMeasurements) { + // Only increment this metric once per bucket. + bucket.keptOpenDueToLargeMeasurements = true; + info.stats.incNumBucketsKeptOpenDueToLargeMeasurements(); + } + return {RolloverAction::kNone, RolloverReason::kNone}; + } else { + if (effectiveMaxSize == gTimeseriesBucketMaxSize) { + info.stats.incNumBucketsClosedDueToSize(); + return {RolloverAction::kHardClose, RolloverReason::kSize}; + } + info.stats.incNumBucketsClosedDueToCachePressure(); + return {RolloverAction::kHardClose, RolloverReason::kCachePressure}; + } + } + + if (schemaIncompatible( + bucket, doc, info.options.getMetaField(), info.key.metadata.getComparator())) { + info.stats.incNumBucketsClosedDueToSchemaChange(); + return {RolloverAction::kHardClose, RolloverReason::kSchemaChange}; + } + + return {RolloverAction::kNone, RolloverReason::kNone}; +} + +ExecutionStatsController getOrInitializeExecutionStats(BucketCatalog& catalog, + const NamespaceString& ns) { + stdx::lock_guard catalogLock{catalog.mutex}; + auto it = catalog.executionStats.find(ns); + if (it != catalog.executionStats.end()) { + return {it->second, catalog.globalExecutionStats}; + } + + auto res = catalog.executionStats.emplace(ns, std::make_shared<ExecutionStats>()); + return {res.first->second, catalog.globalExecutionStats}; +} + +std::shared_ptr<ExecutionStats> getExecutionStats(const BucketCatalog& catalog, + const NamespaceString& ns) { + static const auto kEmptyStats{std::make_shared<ExecutionStats>()}; + + stdx::lock_guard catalogLock{catalog.mutex}; + + auto it = catalog.executionStats.find(ns); + if (it != catalog.executionStats.end()) { + return it->second; + } + return kEmptyStats; +} + +Status getTimeseriesBucketClearedError(const NamespaceString& ns, const OID& oid) { + return {ErrorCodes::TimeseriesBucketCleared, + str::stream() << "Time-series bucket " << oid << " for namespace " << ns + << " was cleared"}; +} + +void closeOpenBucket(BucketCatalog& catalog, + Stripe& stripe, + WithLock stripeLock, + Bucket& bucket, + ClosedBuckets& closedBuckets) { + bool error = false; + try { + closedBuckets.emplace_back(&catalog.bucketStateRegistry, + bucket.bucketId, + bucket.timeField, + bucket.numMeasurements); + } catch (...) { + error = true; + } + removeBucket( + catalog, stripe, stripeLock, bucket, error ? RemovalMode::kAbort : RemovalMode::kClose); +} + +void closeOpenBucket(BucketCatalog& catalog, + Stripe& stripe, + WithLock stripeLock, + Bucket& bucket, + boost::optional<ClosedBucket>& closedBucket) { + bool error = false; + try { + closedBucket = boost::in_place(&catalog.bucketStateRegistry, + bucket.bucketId, + bucket.timeField, + bucket.numMeasurements); + } catch (...) { + closedBucket = boost::none; + error = true; + } + removeBucket( + catalog, stripe, stripeLock, bucket, error ? RemovalMode::kAbort : RemovalMode::kClose); +} + +void closeArchivedBucket(BucketStateRegistry& registry, + ArchivedBucket& bucket, + ClosedBuckets& closedBuckets) { + try { + closedBuckets.emplace_back(®istry, bucket.bucketId, bucket.timeField, boost::none); + } catch (...) { + } +} + +} // namespace mongo::timeseries::bucket_catalog::internal diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.h b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.h new file mode 100644 index 00000000000..4b5c62f72f0 --- /dev/null +++ b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.h @@ -0,0 +1,365 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/timeseries/bucket_catalog/bucket_catalog.h" + +namespace mongo::timeseries::bucket_catalog::internal { + +/** + * Bundle of information that 'insert' needs to pass down to helper methods that may create a + * new bucket. + */ +struct CreationInfo { + const BucketKey& key; + StripeNumber stripe; + const Date_t& time; + const TimeseriesOptions& options; + ExecutionStatsController& stats; + ClosedBuckets* closedBuckets; + bool openedDuetoMetadata = true; +}; + +/** + * Mode enum to control whether bucket retrieval methods will create new buckets if no suitable + * bucket exists. + */ +enum class AllowBucketCreation { kYes, kNo }; + +/** + * Mode to signal to 'removeBucket' what's happening to the bucket, and how to handle the bucket + * state change. + */ +enum class RemovalMode { + kClose, // Normal closure, pending compression + kArchive, // Archive bucket, no state change + kAbort, // Bucket is being cleared, possibly due to error, erase state +}; + +/** + * Mode enum to control whether the bucket retrieval methods will return buckets that have a state + * that conflicts with insertion. + */ +enum class IgnoreBucketState { kYes, kNo }; + +/** + * Maps bucket key to the stripe that is responsible for it. + */ +StripeNumber getStripeNumber(const BucketKey& key); + +/** + * Extracts the information from the input 'doc' that is used to map the document to a bucket. + */ +StatusWith<std::pair<BucketKey, Date_t>> extractBucketingParameters( + const NamespaceString& ns, + const StringData::ComparatorInterface* comparator, + const TimeseriesOptions& options, + const BSONObj& doc); + +/** + * Retrieve a bucket for read-only use. + */ +const Bucket* findBucket(BucketStateRegistry& registry, + const Stripe& stripe, + WithLock stripeLock, + const BucketId& bucketId, + IgnoreBucketState mode = IgnoreBucketState::kNo); + +/** + * Retrieve a bucket for write use. + */ +Bucket* useBucket(BucketStateRegistry& registry, + Stripe& stripe, + WithLock stripeLock, + const BucketId& bucketId, + IgnoreBucketState mode); + +/** + * Retrieve a bucket for write use, updating the state in the process. + */ +Bucket* useBucketAndChangeState(BucketStateRegistry& registry, + Stripe& stripe, + WithLock stripeLock, + const BucketId& bucketId, + const BucketStateRegistry::StateChangeFn& change); + +/** + * Retrieve the open bucket for write use if one exists. If none exists and 'mode' is set to kYes, + * then we will create a new bucket. + */ +Bucket* useBucket(Stripe& stripe, + WithLock stripeLock, + const CreationInfo& info, + AllowBucketCreation mode); + +/** + * Retrieve a previously closed bucket for write use if one exists in the catalog. Considers buckets + * that are pending closure or archival but which are still eligible to recieve new measurements. + */ +Bucket* useAlternateBucket(Stripe& stripe, WithLock stripeLock, const CreationInfo& info); + +/** + * Given a bucket to reopen, performs validation and constructs the in-memory representation of the + * bucket. If specified, 'expectedKey' is matched against the key extracted from the document to + * validate that the bucket is expected (i.e. to help resolve hash collisions for archived buckets). + * Does *not* hand ownership of the bucket to the catalog. + */ +StatusWith<std::unique_ptr<Bucket>> rehydrateBucket( + OperationContext* opCtx, + BucketStateRegistry& registry, + const NamespaceString& ns, + const StringData::ComparatorInterface* comparator, + const TimeseriesOptions& options, + const BucketToReopen& bucketToReopen, + const BucketKey* expectedKey); + +/** + * Given a rehydrated 'bucket', passes ownership of that bucket to the catalog, marking the bucket + * as open. + */ +StatusWith<std::reference_wrapper<Bucket>> reopenBucket(BucketCatalog& catalog, + Stripe& stripe, + WithLock stripeLock, + ExecutionStatsController& stats, + const BucketKey& key, + std::unique_ptr<Bucket>&& bucket, + std::uint64_t targetEra, + ClosedBuckets& closedBuckets); + +/** + * Check to see if 'insert' can use existing bucket rather than reopening a candidate bucket. If + * true, chances are the caller raced with another thread to reopen the same bucket, but if false, + * there might be another bucket that had been cleared, or that has the same _id in a different + * namespace. + */ +StatusWith<std::reference_wrapper<Bucket>> reuseExistingBucket(BucketCatalog& catalog, + Stripe& stripe, + WithLock stripeLock, + ExecutionStatsController& stats, + const BucketKey& key, + Bucket& existingBucket, + std::uint64_t targetEra); + +/** + * Given an already-selected 'bucket', inserts 'doc' to the bucket if possible. If not, and 'mode' + * is set to 'kYes', we will create a new bucket and insert into that bucket. + */ +stdx::variant<std::shared_ptr<WriteBatch>, RolloverReason> insertIntoBucket( + OperationContext* opCtx, + BucketCatalog& catalog, + Stripe& stripe, + WithLock stripeLock, + StripeNumber stripeNumber, + const BSONObj& doc, + CombineWithInsertsFromOtherClients combine, + AllowBucketCreation mode, + CreationInfo& info, + Bucket& existingBucket); + +/** + * Helper method to perform the heavy lifting for both 'tryInsert' and 'insert'. See documentation + * on callers for more details. + */ +StatusWith<InsertResult> insert(OperationContext* opCtx, + BucketCatalog& catalog, + const NamespaceString& ns, + const StringData::ComparatorInterface* comparator, + const TimeseriesOptions& options, + const BSONObj& doc, + CombineWithInsertsFromOtherClients combine, + AllowBucketCreation mode, + BucketFindResult bucketFindResult = {}); + +/** + * Wait for other batches to finish so we can prepare 'batch' + */ +void waitToCommitBatch(BucketStateRegistry& registry, + Stripe& stripe, + const std::shared_ptr<WriteBatch>& batch); + +/** + * Removes the given bucket from the bucket catalog's internal data structures. + */ +void removeBucket( + BucketCatalog& catalog, Stripe& stripe, WithLock stripeLock, Bucket& bucket, RemovalMode mode); + +/** + * Archives the given bucket, minimizing the memory footprint but retaining the necessary + * information required to efficiently identify it as a candidate for future insertions. + */ +void archiveBucket(BucketCatalog& catalog, + Stripe& stripe, + WithLock stripeLock, + Bucket& bucket, + ClosedBuckets& closedBuckets); + +/** + * Identifies a previously archived bucket that may be able to accomodate the measurement + * represented by 'info', if one exists. + */ +boost::optional<OID> findArchivedCandidate(BucketCatalog& catalog, + Stripe& stripe, + WithLock stripeLock, + const CreationInfo& info); + +/** + * Identifies a previously archived bucket that may be able to accomodate the measurement + * represented by 'info', if one exists. + */ +stdx::variant<std::monostate, OID, std::vector<BSONObj>> getReopeningCandidate( + OperationContext* opCtx, + BucketCatalog& catalog, + Stripe& stripe, + WithLock stripeLock, + const CreationInfo& info, + bool allowQueryBasedReopening); + +/** + * Aborts 'batch', and if the corresponding bucket still exists, proceeds to abort any other + * unprepared batches and remove the bucket from the catalog if there is no unprepared batch. + */ +void abort(BucketCatalog& catalog, + Stripe& stripe, + WithLock stripeLock, + std::shared_ptr<WriteBatch> batch, + const Status& status); + +/** + * Aborts any unprepared batches for the given bucket, then removes the bucket if there is no + * prepared batch. If 'batch' is non-null, it is assumed that the caller has commit rights for that + * batch. + */ +void abort(BucketCatalog& catalog, + Stripe& stripe, + WithLock stripeLock, + Bucket& bucket, + std::shared_ptr<WriteBatch> batch, + const Status& status); + +/** + * Adds the bucket to a list of idle buckets to be expired at a later date. + */ +void markBucketIdle(Stripe& stripe, WithLock stripeLock, Bucket& bucket); + +/** + * Remove the bucket from the list of idle buckets. The second parameter encodes whether the caller + * holds a lock on _idleMutex. + */ +void markBucketNotIdle(Stripe& stripe, WithLock stripeLock, Bucket& bucket); + +/** + * Expires idle buckets until the bucket catalog's memory usage is below the expiry threshold. + */ +void expireIdleBuckets(BucketCatalog& catalog, + Stripe& stripe, + WithLock stripeLock, + ExecutionStatsController& stats, + ClosedBuckets& closedBuckets); + +/** + * Generates an OID for the bucket _id field, setting the timestamp portion to a value determined by + * rounding 'time' based on 'options'. + */ +std::pair<OID, Date_t> generateBucketOID(const Date_t& time, const TimeseriesOptions& options); + +/** + * Allocates a new bucket and adds it to the catalog. + */ +Bucket& allocateBucket(BucketCatalog& catalog, + Stripe& stripe, + WithLock stripeLock, + const CreationInfo& info); + +/** + * Close the existing, full bucket and open a new one for the same metadata. + * + * Writes information about the closed bucket to the 'info' parameter. + */ +Bucket& rollover(BucketCatalog& catalog, + Stripe& stripe, + WithLock stripeLock, + Bucket& bucket, + const CreationInfo& info, + RolloverAction action); + +/** + * Determines if 'bucket' needs to be rolled over to accomodate 'doc'. If so, determines whether + * to archive or close 'bucket'. + */ +std::pair<RolloverAction, RolloverReason> determineRolloverAction( + OperationContext* opCtx, + const BSONObj& doc, + CreationInfo& info, + Bucket& bucket, + uint32_t numberOfActiveBuckets, + Bucket::NewFieldNames& newFieldNamesToBeInserted, + int32_t& sizeToBeAdded, + AllowBucketCreation mode); + +/** + * Retrieves or initializes the execution stats for the given namespace, for writing. + */ +ExecutionStatsController getOrInitializeExecutionStats(BucketCatalog& catalog, + const NamespaceString& ns); + +/** + * Retrieves the execution stats for the given namespace, if they have already been initialized. + */ +std::shared_ptr<ExecutionStats> getExecutionStats(const BucketCatalog& catalog, + const NamespaceString& ns); + +/** + * Generates a status with code TimeseriesBucketCleared and an appropriate error message. + */ +Status getTimeseriesBucketClearedError(const NamespaceString& ns, const OID& oid); + +/** + * Close an open bucket, setting the state appropriately and removing it from the catalog. + */ +void closeOpenBucket(BucketCatalog& catalog, + Stripe& stripe, + WithLock stripeLock, + Bucket& bucket, + ClosedBuckets& closedBuckets); +/** + * Close an open bucket, setting the state appropriately and removing it from the catalog. + */ +void closeOpenBucket(BucketCatalog& catalog, + Stripe& stripe, + WithLock stripeLock, + Bucket& bucket, + boost::optional<ClosedBucket>& closedBucket); +/** + * Close an archived bucket, setting the state appropriately and removing it from the catalog. + */ +void closeArchivedBucket(BucketStateRegistry& registry, + ArchivedBucket& bucket, + ClosedBuckets& closedBuckets); +} // namespace mongo::timeseries::bucket_catalog::internal diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_server_status.cpp b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_server_status.cpp new file mode 100644 index 00000000000..e747230a970 --- /dev/null +++ b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_server_status.cpp @@ -0,0 +1,100 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/commands/server_status.h" +#include "mongo/db/timeseries/bucket_catalog/bucket_catalog.h" + +namespace mongo::timeseries::bucket_catalog { +namespace { + +class BucketCatalogServerStatus : public ServerStatusSection { + struct BucketCounts { + BucketCounts& operator+=(const BucketCounts& other) { + if (&other != this) { + all += other.all; + open += other.open; + idle += other.idle; + } + return *this; + } + + std::size_t all = 0; + std::size_t open = 0; + std::size_t idle = 0; + }; + + BucketCounts _getBucketCounts(const BucketCatalog& catalog) const { + BucketCounts sum; + for (auto const& stripe : catalog.stripes) { + stdx::lock_guard stripeLock{stripe.mutex}; + sum += {stripe.openBucketsById.size(), + stripe.openBucketsByKey.size(), + stripe.idleBuckets.size()}; + } + return sum; + } + +public: + BucketCatalogServerStatus() : ServerStatusSection("bucketCatalog") {} + + bool includeByDefault() const override { + return true; + } + + BSONObj generateSection(OperationContext* opCtx, const BSONElement&) const override { + const auto& bucketCatalog = BucketCatalog::get(opCtx); + { + stdx::lock_guard catalogLock{bucketCatalog.mutex}; + if (bucketCatalog.executionStats.empty()) { + return {}; + } + } + + auto counts = _getBucketCounts(bucketCatalog); + auto numActive = bucketCatalog.numberOfActiveBuckets.load(); + BSONObjBuilder builder; + builder.appendNumber("numBuckets", static_cast<long long>(numActive)); + builder.appendNumber("numOpenBuckets", static_cast<long long>(counts.open)); + builder.appendNumber("numIdleBuckets", static_cast<long long>(counts.idle)); + builder.appendNumber("numArchivedBuckets", static_cast<long long>(numActive - counts.open)); + builder.appendNumber("memoryUsage", + static_cast<long long>(bucketCatalog.memoryUsage.load())); + + // Append the global execution stats for all namespaces. + appendExecutionStatsToBuilder(bucketCatalog.globalExecutionStats, builder); + + // Append the global state management stats for all namespaces. + appendStats(bucketCatalog.bucketStateRegistry, builder); + + return builder.obj(); + } +} bucketCatalogServerStatus; + +} // namespace +} // namespace mongo::timeseries::bucket_catalog diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp index 74e0a535d32..f40d3c02aa0 100644 --- a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp +++ b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp @@ -32,7 +32,9 @@ #include "mongo/db/catalog/create_collection.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/timeseries/bucket_catalog/bucket_catalog.h" +#include "mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.h" #include "mongo/db/timeseries/bucket_compression.h" +#include "mongo/db/timeseries/timeseries_constants.h" #include "mongo/idl/server_parameter_test_util.h" #include "mongo/stdx/future.h" #include "mongo/unittest/bson_test_util.h" @@ -86,6 +88,8 @@ protected: void _testMeasurementSchema( const std::initializer_list<std::initializer_list<BSONObj>>& groups); + Status _reopenBucket(const CollectionPtr& coll, const BSONObj& bucketDoc); + OperationContext* _opCtx; BucketCatalog* _bucketCatalog; @@ -165,36 +169,37 @@ void BucketCatalogTest::_commit(const std::shared_ptr<WriteBatch>& batch, uint16_t numPreviouslyCommittedMeasurements, size_t expectedBatchSize) { ASSERT(claimWriteBatchCommitRights(*batch)); - ASSERT_OK(_bucketCatalog->prepareCommit(batch)); + ASSERT_OK(prepareCommit(*_bucketCatalog, batch)); ASSERT_EQ(batch->measurements.size(), expectedBatchSize); ASSERT_EQ(batch->numPreviouslyCommittedMeasurements, numPreviouslyCommittedMeasurements); - _bucketCatalog->finish(batch, {}); + finish(*_bucketCatalog, batch, {}); } void BucketCatalogTest::_insertOneAndCommit(const NamespaceString& ns, uint16_t numPreviouslyCommittedMeasurements) { - auto result = _bucketCatalog->insert(_opCtx, - ns, - _getCollator(ns), - _getTimeseriesOptions(ns), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + auto result = insert(_opCtx, + *_bucketCatalog, + ns, + _getCollator(ns), + _getTimeseriesOptions(ns), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kAllow); auto& batch = result.getValue().batch; _commit(batch, numPreviouslyCommittedMeasurements); } long long BucketCatalogTest::_getExecutionStat(const NamespaceString& ns, StringData stat) { BSONObjBuilder builder; - _bucketCatalog->appendExecutionStats(ns, &builder); + appendExecutionStats(*_bucketCatalog, ns, builder); return builder.obj().getIntField(stat); } void BucketCatalogTest::_testMeasurementSchema( const std::initializer_list<std::initializer_list<BSONObj>>& groups) { // Make sure we start and end with a clean slate. - _bucketCatalog->clear(_ns1); - ScopeGuard guard([this]() { _bucketCatalog->clear(_ns1); }); + clear(*_bucketCatalog, _ns1); + ScopeGuard guard([this]() { clear(*_bucketCatalog, _ns1); }); bool firstGroup = true; for (const auto& group : groups) { @@ -205,13 +210,13 @@ void BucketCatalogTest::_testMeasurementSchema( timestampedDoc.appendElements(doc); auto pre = _getExecutionStat(_ns1, kNumSchemaChanges); - ASSERT(_bucketCatalog - ->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - timestampedDoc.obj(), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) + ASSERT(insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + timestampedDoc.obj(), + CombineWithInsertsFromOtherClients::kAllow) .isOK()); auto post = _getExecutionStat(_ns1, kNumSchemaChanges); @@ -234,27 +239,79 @@ void BucketCatalogTest::_testMeasurementSchema( } } +Status BucketCatalogTest::_reopenBucket(const CollectionPtr& coll, const BSONObj& bucketDoc) { + const NamespaceString ns = coll->ns().getTimeseriesViewNamespace(); + const boost::optional<TimeseriesOptions> options = coll->getTimeseriesOptions(); + invariant(options, + str::stream() << "Attempting to reopen a bucket for a non-timeseries collection: " + << ns); + + BSONElement metadata; + auto metaFieldName = options->getMetaField(); + if (metaFieldName) { + metadata = bucketDoc.getField(kBucketMetaFieldName); + } + auto key = BucketKey{ns, BucketMetadata{metadata, coll->getDefaultCollator(), metaFieldName}}; + + // Validate the bucket document against the schema. + auto validator = [&](OperationContext * opCtx, const BSONObj& bucketDoc) -> auto { + return coll->checkValidation(opCtx, bucketDoc); + }; + + auto stats = internal::getOrInitializeExecutionStats(*_bucketCatalog, ns); + + auto res = internal::rehydrateBucket(_opCtx, + _bucketCatalog->bucketStateRegistry, + ns, + coll->getDefaultCollator(), + *options, + BucketToReopen{bucketDoc, validator}, + nullptr); + if (!res.isOK()) { + return res.getStatus(); + } + auto bucket = std::move(res.getValue()); + + auto stripeNumber = internal::getStripeNumber(key); + + // Register the reopened bucket with the catalog. + auto& stripe = _bucketCatalog->stripes[stripeNumber]; + stdx::lock_guard stripeLock{stripe.mutex}; + + ClosedBuckets closedBuckets; + return internal::reopenBucket(*_bucketCatalog, + stripe, + stripeLock, + stats, + key, + std::move(bucket), + getCurrentEra(_bucketCatalog->bucketStateRegistry), + closedBuckets) + .getStatus(); +} + + TEST_F(BucketCatalogTest, InsertIntoSameBucket) { // The first insert should be able to take commit rights - auto result1 = - _bucketCatalog->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + auto result1 = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kAllow); auto batch1 = result1.getValue().batch; ASSERT(claimWriteBatchCommitRights(*batch1)); // A subsequent insert into the same bucket should land in the same batch, but not be able to // claim commit rights - auto result2 = - _bucketCatalog->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + auto result2 = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kAllow); auto batch2 = result2.getValue().batch; ASSERT_EQ(batch1, batch2); ASSERT(!claimWriteBatchCommitRights(*batch2)); @@ -262,7 +319,7 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucket) { // The batch hasn't actually been committed yet. ASSERT(!isWriteBatchFinished(*batch1)); - ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); + ASSERT_OK(prepareCommit(*_bucketCatalog, batch1)); // Still not finished. ASSERT(!isWriteBatchFinished(*batch1)); @@ -273,50 +330,50 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucket) { ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements, 0); // Once the commit has occurred, the waiter should be notified. - _bucketCatalog->finish(batch1, {}); + finish(*_bucketCatalog, batch1, {}); ASSERT(isWriteBatchFinished(*batch2)); auto result3 = getWriteBatchResult(*batch2); ASSERT_OK(result3.getStatus()); } TEST_F(BucketCatalogTest, GetMetadataReturnsEmptyDocOnMissingBucket) { - auto batch = _bucketCatalog - ->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) + auto batch = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kAllow) .getValue() .batch; ASSERT(claimWriteBatchCommitRights(*batch)); auto bucket = batch->bucketHandle; - _bucketCatalog->abort(batch, {ErrorCodes::TimeseriesBucketCleared, ""}); - ASSERT_BSONOBJ_EQ(BSONObj(), _bucketCatalog->getMetadata(bucket)); + abort(*_bucketCatalog, batch, {ErrorCodes::TimeseriesBucketCleared, ""}); + ASSERT_BSONOBJ_EQ(BSONObj(), getMetadata(*_bucketCatalog, bucket)); } TEST_F(BucketCatalogTest, InsertIntoDifferentBuckets) { - auto result1 = - _bucketCatalog->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now() << _metaField << "123"), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); - auto result2 = - _bucketCatalog->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now() << _metaField << BSONObj()), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); - auto result3 = - _bucketCatalog->insert(_opCtx, - _ns2, - _getCollator(_ns2), - _getTimeseriesOptions(_ns2), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + auto result1 = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now() << _metaField << "123"), + CombineWithInsertsFromOtherClients::kAllow); + auto result2 = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now() << _metaField << BSONObj()), + CombineWithInsertsFromOtherClients::kAllow); + auto result3 = insert(_opCtx, + *_bucketCatalog, + _ns2, + _getCollator(_ns2), + _getTimeseriesOptions(_ns2), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kAllow); // Inserts should all be into three distinct buckets (and therefore batches). ASSERT_NE(result1.getValue().batch, result2.getValue().batch); @@ -325,10 +382,10 @@ TEST_F(BucketCatalogTest, InsertIntoDifferentBuckets) { // Check metadata in buckets. ASSERT_BSONOBJ_EQ(BSON(_metaField << "123"), - _bucketCatalog->getMetadata(result1.getValue().batch->bucketHandle)); + getMetadata(*_bucketCatalog, result1.getValue().batch->bucketHandle)); ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONObj()), - _bucketCatalog->getMetadata(result2.getValue().batch->bucketHandle)); - ASSERT(_bucketCatalog->getMetadata(result3.getValue().batch->bucketHandle).isEmpty()); + getMetadata(*_bucketCatalog, result2.getValue().batch->bucketHandle)); + ASSERT(getMetadata(*_bucketCatalog, result3.getValue().batch->bucketHandle).isEmpty()); // Committing one bucket should only return the one document in that bucket and should not // affect the other bucket. @@ -339,49 +396,53 @@ TEST_F(BucketCatalogTest, InsertIntoDifferentBuckets) { } TEST_F(BucketCatalogTest, InsertIntoSameBucketArray) { - auto result1 = _bucketCatalog->insert( + auto result1 = insert( _opCtx, + *_bucketCatalog, _ns1, _getCollator(_ns1), _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now() << _metaField << BSON_ARRAY(BSON("a" << 0 << "b" << 1))), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); - auto result2 = _bucketCatalog->insert( + CombineWithInsertsFromOtherClients::kAllow); + auto result2 = insert( _opCtx, + *_bucketCatalog, _ns1, _getCollator(_ns1), _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now() << _metaField << BSON_ARRAY(BSON("b" << 1 << "a" << 0))), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + CombineWithInsertsFromOtherClients::kAllow); ASSERT_EQ(result1.getValue().batch, result2.getValue().batch); // Check metadata in buckets. ASSERT_BSONOBJ_EQ(BSON(_metaField << BSON_ARRAY(BSON("a" << 0 << "b" << 1))), - _bucketCatalog->getMetadata(result1.getValue().batch->bucketHandle)); + getMetadata(*_bucketCatalog, result1.getValue().batch->bucketHandle)); ASSERT_BSONOBJ_EQ(BSON(_metaField << BSON_ARRAY(BSON("a" << 0 << "b" << 1))), - _bucketCatalog->getMetadata(result2.getValue().batch->bucketHandle)); + getMetadata(*_bucketCatalog, result2.getValue().batch->bucketHandle)); } TEST_F(BucketCatalogTest, InsertIntoSameBucketObjArray) { - auto result1 = _bucketCatalog->insert( - _opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now() << _metaField - << BSONObj(BSON("c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1) - << BSON("f" << 1 << "g" << 0))))), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); - auto result2 = _bucketCatalog->insert( - _opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now() << _metaField - << BSONObj(BSON("c" << BSON_ARRAY(BSON("b" << 1 << "a" << 0) - << BSON("g" << 0 << "f" << 1))))), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + auto result1 = + insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now() << _metaField + << BSONObj(BSON("c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1) + << BSON("f" << 1 << "g" << 0))))), + CombineWithInsertsFromOtherClients::kAllow); + auto result2 = + insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now() << _metaField + << BSONObj(BSON("c" << BSON_ARRAY(BSON("b" << 1 << "a" << 0) + << BSON("g" << 0 << "f" << 1))))), + CombineWithInsertsFromOtherClients::kAllow); ASSERT_EQ(result1.getValue().batch, result2.getValue().batch); @@ -389,35 +450,37 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketObjArray) { ASSERT_BSONOBJ_EQ( BSON(_metaField << BSONObj(BSON( "c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1) << BSON("f" << 1 << "g" << 0))))), - _bucketCatalog->getMetadata(result1.getValue().batch->bucketHandle)); + getMetadata(*_bucketCatalog, result1.getValue().batch->bucketHandle)); ASSERT_BSONOBJ_EQ( BSON(_metaField << BSONObj(BSON( "c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1) << BSON("f" << 1 << "g" << 0))))), - _bucketCatalog->getMetadata(result2.getValue().batch->bucketHandle)); + getMetadata(*_bucketCatalog, result2.getValue().batch->bucketHandle)); } TEST_F(BucketCatalogTest, InsertIntoSameBucketNestedArray) { - auto result1 = _bucketCatalog->insert( - _opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now() << _metaField - << BSONObj(BSON("c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1) - << BSON_ARRAY("123" - << "456"))))), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); - auto result2 = _bucketCatalog->insert( - _opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now() << _metaField - << BSONObj(BSON("c" << BSON_ARRAY(BSON("b" << 1 << "a" << 0) - << BSON_ARRAY("123" - << "456"))))), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + auto result1 = + insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now() << _metaField + << BSONObj(BSON("c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1) + << BSON_ARRAY("123" + << "456"))))), + CombineWithInsertsFromOtherClients::kAllow); + auto result2 = + insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now() << _metaField + << BSONObj(BSON("c" << BSON_ARRAY(BSON("b" << 1 << "a" << 0) + << BSON_ARRAY("123" + << "456"))))), + CombineWithInsertsFromOtherClients::kAllow); ASSERT_EQ(result1.getValue().batch, result2.getValue().batch); @@ -425,36 +488,36 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketNestedArray) { ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONObj(BSON("c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1) << BSON_ARRAY("123" << "456"))))), - _bucketCatalog->getMetadata(result1.getValue().batch->bucketHandle)); + getMetadata(*_bucketCatalog, result1.getValue().batch->bucketHandle)); ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONObj(BSON("c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1) << BSON_ARRAY("123" << "456"))))), - _bucketCatalog->getMetadata(result2.getValue().batch->bucketHandle)); + getMetadata(*_bucketCatalog, result2.getValue().batch->bucketHandle)); } TEST_F(BucketCatalogTest, InsertNullAndMissingMetaFieldIntoDifferentBuckets) { - auto result1 = - _bucketCatalog->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now() << _metaField << BSONNULL), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); - auto result2 = - _bucketCatalog->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + auto result1 = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now() << _metaField << BSONNULL), + CombineWithInsertsFromOtherClients::kAllow); + auto result2 = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kAllow); // Inserts should all be into three distinct buckets (and therefore batches). ASSERT_NE(result1.getValue().batch, result2.getValue().batch); // Check metadata in buckets. ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONNULL), - _bucketCatalog->getMetadata(result1.getValue().batch->bucketHandle)); - ASSERT(_bucketCatalog->getMetadata(result2.getValue().batch->bucketHandle).isEmpty()); + getMetadata(*_bucketCatalog, result1.getValue().batch->bucketHandle)); + ASSERT(getMetadata(*_bucketCatalog, result2.getValue().batch->bucketHandle).isEmpty()); // Committing one bucket should only return the one document in that bucket and should not // affect the other bucket. @@ -474,7 +537,7 @@ TEST_F(BucketCatalogTest, ClearNamespaceBuckets) { _insertOneAndCommit(_ns1, 0); _insertOneAndCommit(_ns2, 0); - _bucketCatalog->clear(_ns1); + clear(*_bucketCatalog, _ns1); _insertOneAndCommit(_ns1, 0); _insertOneAndCommit(_ns2, 1); @@ -485,7 +548,7 @@ TEST_F(BucketCatalogTest, ClearDatabaseBuckets) { _insertOneAndCommit(_ns2, 0); _insertOneAndCommit(_ns3, 0); - _bucketCatalog->clear(_ns1.db()); + clear(*_bucketCatalog, _ns1.db()); _insertOneAndCommit(_ns1, 0); _insertOneAndCommit(_ns2, 0); @@ -493,33 +556,33 @@ TEST_F(BucketCatalogTest, ClearDatabaseBuckets) { } TEST_F(BucketCatalogTest, InsertBetweenPrepareAndFinish) { - auto batch1 = _bucketCatalog - ->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) + auto batch1 = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kAllow) .getValue() .batch; ASSERT(claimWriteBatchCommitRights(*batch1)); - ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); + ASSERT_OK(prepareCommit(*_bucketCatalog, batch1)); ASSERT_EQ(batch1->measurements.size(), 1); ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements, 0); // Insert before finish so there's a second batch live at the same time. - auto batch2 = _bucketCatalog - ->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) + auto batch2 = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kAllow) .getValue() .batch; ASSERT_NE(batch1, batch2); - _bucketCatalog->finish(batch1, {}); + finish(*_bucketCatalog, batch1, {}); ASSERT(isWriteBatchFinished(*batch1)); // Verify the second batch still commits one doc, and that the first batch only commited one. @@ -527,14 +590,15 @@ TEST_F(BucketCatalogTest, InsertBetweenPrepareAndFinish) { } DEATH_TEST_F(BucketCatalogTest, CannotCommitWithoutRights, "invariant") { - auto result = _bucketCatalog->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + auto result = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kAllow); auto& batch = result.getValue().batch; - ASSERT_OK(_bucketCatalog->prepareCommit(batch)); + ASSERT_OK(prepareCommit(*_bucketCatalog, batch)); // BucketCatalog::prepareCommit uses dassert, so it will only invariant in debug mode. Ensure we // die here in non-debug mode as well. @@ -542,29 +606,30 @@ DEATH_TEST_F(BucketCatalogTest, CannotCommitWithoutRights, "invariant") { } TEST_F(BucketCatalogWithoutMetadataTest, GetMetadataReturnsEmptyDoc) { - auto batch = _bucketCatalog - ->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) + auto batch = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kAllow) .getValue() .batch; - ASSERT_BSONOBJ_EQ(BSONObj(), _bucketCatalog->getMetadata(batch->bucketHandle)); + ASSERT_BSONOBJ_EQ(BSONObj(), getMetadata(*_bucketCatalog, batch->bucketHandle)); _commit(batch, 0); } TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) { // Creating a new bucket should return all fields from the initial measurement. - auto result = _bucketCatalog->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now() << "a" << 0), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + auto result = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now() << "a" << 0), + CombineWithInsertsFromOtherClients::kAllow); ASSERT(result.isOK()); auto batch = result.getValue().batch; auto oldId = batch->bucketHandle.bucketId; @@ -575,24 +640,26 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) { // Inserting a new measurement with the same fields should return an empty set of new fields. - result = _bucketCatalog->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now() << "a" << 1), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + result = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now() << "a" << 1), + CombineWithInsertsFromOtherClients::kAllow); ASSERT(result.isOK()); batch = result.getValue().batch; _commit(batch, 1); ASSERT_EQ(0U, batch->newFieldNamesToBeInserted.size()) << batch->toBSON(); // Insert a new measurement with the a new field. - result = _bucketCatalog->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now() << "a" << 2 << "b" << 2), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + result = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now() << "a" << 2 << "b" << 2), + CombineWithInsertsFromOtherClients::kAllow); ASSERT(result.isOK()); batch = result.getValue().batch; _commit(batch, 2); @@ -601,12 +668,13 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) { // Fill up the bucket. for (auto i = 3; i < gTimeseriesBucketMaxCount; ++i) { - result = _bucketCatalog->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now() << "a" << i), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + result = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now() << "a" << i), + CombineWithInsertsFromOtherClients::kAllow); ASSERT(result.isOK()); batch = result.getValue().batch; _commit(batch, i); @@ -615,13 +683,13 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) { // When a bucket overflows, committing to the new overflow bucket should return the fields of // the first measurement as new fields. - auto result2 = _bucketCatalog->insert( - _opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now() << "a" << gTimeseriesBucketMaxCount), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + auto result2 = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now() << "a" << gTimeseriesBucketMaxCount), + CombineWithInsertsFromOtherClients::kAllow); auto& batch2 = result2.getValue().batch; ASSERT_NE(oldId, batch2->bucketHandle.bucketId); _commit(batch2, 0); @@ -631,161 +699,164 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) { } TEST_F(BucketCatalogTest, AbortBatchOnBucketWithPreparedCommit) { - auto batch1 = _bucketCatalog - ->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) + auto batch1 = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kAllow) .getValue() .batch; ASSERT(claimWriteBatchCommitRights(*batch1)); - ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); + ASSERT_OK(prepareCommit(*_bucketCatalog, batch1)); ASSERT_EQ(batch1->measurements.size(), 1); ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements, 0); // Insert before finish so there's a second batch live at the same time. - auto batch2 = _bucketCatalog - ->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) + auto batch2 = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kAllow) .getValue() .batch; ASSERT_NE(batch1, batch2); ASSERT(claimWriteBatchCommitRights(*batch2)); - _bucketCatalog->abort(batch2, {ErrorCodes::TimeseriesBucketCleared, ""}); + abort(*_bucketCatalog, batch2, {ErrorCodes::TimeseriesBucketCleared, ""}); ASSERT(isWriteBatchFinished(*batch2)); ASSERT_EQ(getWriteBatchResult(*batch2).getStatus(), ErrorCodes::TimeseriesBucketCleared); - _bucketCatalog->finish(batch1, {}); + finish(*_bucketCatalog, batch1, {}); ASSERT(isWriteBatchFinished(*batch1)); ASSERT_OK(getWriteBatchResult(*batch1).getStatus()); } TEST_F(BucketCatalogTest, ClearNamespaceWithConcurrentWrites) { - auto batch = _bucketCatalog - ->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) + auto batch = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kAllow) .getValue() .batch; ASSERT(claimWriteBatchCommitRights(*batch)); - _bucketCatalog->clear(_ns1); + clear(*_bucketCatalog, _ns1); - ASSERT_NOT_OK(_bucketCatalog->prepareCommit(batch)); + ASSERT_NOT_OK(prepareCommit(*_bucketCatalog, batch)); ASSERT(isWriteBatchFinished(*batch)); ASSERT_EQ(getWriteBatchResult(*batch).getStatus(), ErrorCodes::TimeseriesBucketCleared); - batch = _bucketCatalog - ->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) + batch = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kAllow) .getValue() .batch; ASSERT(claimWriteBatchCommitRights(*batch)); - ASSERT_OK(_bucketCatalog->prepareCommit(batch)); + ASSERT_OK(prepareCommit(*_bucketCatalog, batch)); ASSERT_EQ(batch->measurements.size(), 1); ASSERT_EQ(batch->numPreviouslyCommittedMeasurements, 0); - _bucketCatalog->clear(_ns1); + clear(*_bucketCatalog, _ns1); // Even though bucket has been cleared, finish should still report success. Basically, in this // case we know that the write succeeded, so it must have happened before the namespace drop // operation got the collection lock. So the write did actually happen, but is has since been // removed, and that's fine for our purposes. The finish just records the result to the batch // and updates some statistics. - _bucketCatalog->finish(batch, {}); + finish(*_bucketCatalog, batch, {}); ASSERT(isWriteBatchFinished(*batch)); ASSERT_OK(getWriteBatchResult(*batch).getStatus()); } TEST_F(BucketCatalogTest, ClearBucketWithPreparedBatchThrowsConflict) { - auto batch = _bucketCatalog - ->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) + auto batch = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kAllow) .getValue() .batch; ASSERT(claimWriteBatchCommitRights(*batch)); - ASSERT_OK(_bucketCatalog->prepareCommit(batch)); + ASSERT_OK(prepareCommit(*_bucketCatalog, batch)); ASSERT_EQ(batch->measurements.size(), 1); ASSERT_EQ(batch->numPreviouslyCommittedMeasurements, 0); - ASSERT_THROWS(_bucketCatalog->directWriteStart(_ns1, batch->bucketHandle.bucketId.oid), + ASSERT_THROWS(directWriteStart( + _bucketCatalog->bucketStateRegistry, _ns1, batch->bucketHandle.bucketId.oid), WriteConflictException); - _bucketCatalog->abort(batch, {ErrorCodes::TimeseriesBucketCleared, ""}); + abort(*_bucketCatalog, batch, {ErrorCodes::TimeseriesBucketCleared, ""}); ASSERT(isWriteBatchFinished(*batch)); ASSERT_EQ(getWriteBatchResult(*batch).getStatus(), ErrorCodes::TimeseriesBucketCleared); } TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) { - auto batch1 = _bucketCatalog - ->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) + auto batch1 = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kAllow) .getValue() .batch; ASSERT(claimWriteBatchCommitRights(*batch1)); - ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); + ASSERT_OK(prepareCommit(*_bucketCatalog, batch1)); ASSERT_EQ(batch1->measurements.size(), 1); ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements, 0); // Insert before clear so there's a second batch live at the same time. - auto batch2 = _bucketCatalog - ->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) + auto batch2 = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kAllow) .getValue() .batch; ASSERT_NE(batch1, batch2); ASSERT_EQ(batch1->bucketHandle.bucketId, batch2->bucketHandle.bucketId); // Now clear the bucket. Since there's a prepared batch it should conflict. - ASSERT_THROWS(_bucketCatalog->directWriteStart(_ns1, batch1->bucketHandle.bucketId.oid), + ASSERT_THROWS(directWriteStart( + _bucketCatalog->bucketStateRegistry, _ns1, batch1->bucketHandle.bucketId.oid), WriteConflictException); // Now try to prepare the second batch. Ensure it aborts the batch. ASSERT(claimWriteBatchCommitRights(*batch2)); - ASSERT_NOT_OK(_bucketCatalog->prepareCommit(batch2)); + ASSERT_NOT_OK(prepareCommit(*_bucketCatalog, batch2)); ASSERT(isWriteBatchFinished(*batch2)); ASSERT_EQ(getWriteBatchResult(*batch2).getStatus(), ErrorCodes::TimeseriesBucketCleared); // Make sure we didn't clear the bucket state when we aborted the second batch. - ASSERT_THROWS(_bucketCatalog->directWriteStart(_ns1, batch1->bucketHandle.bucketId.oid), + ASSERT_THROWS(directWriteStart( + _bucketCatalog->bucketStateRegistry, _ns1, batch1->bucketHandle.bucketId.oid), WriteConflictException); // Make sure a subsequent insert, which opens a new bucket, doesn't corrupt the old bucket // state and prevent us from finishing the first batch. - auto batch3 = _bucketCatalog - ->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) + auto batch3 = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kAllow) .getValue() .batch; ASSERT_NE(batch1, batch3); @@ -793,73 +864,73 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) { ASSERT_NE(batch1->bucketHandle.bucketId, batch3->bucketHandle.bucketId); // Clean up this batch ASSERT(claimWriteBatchCommitRights(*batch3)); - _bucketCatalog->abort(batch3, {ErrorCodes::TimeseriesBucketCleared, ""}); + abort(*_bucketCatalog, batch3, {ErrorCodes::TimeseriesBucketCleared, ""}); // Make sure we can finish the cleanly prepared batch. - _bucketCatalog->finish(batch1, {}); + finish(*_bucketCatalog, batch1, {}); ASSERT(isWriteBatchFinished(*batch1)); ASSERT_OK(getWriteBatchResult(*batch1).getStatus()); } TEST_F(BucketCatalogTest, PrepareCommitOnAlreadyAbortedBatch) { - auto batch = _bucketCatalog - ->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) + auto batch = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kAllow) .getValue() .batch; ASSERT(claimWriteBatchCommitRights(*batch)); - _bucketCatalog->abort(batch, {ErrorCodes::TimeseriesBucketCleared, ""}); + abort(*_bucketCatalog, batch, {ErrorCodes::TimeseriesBucketCleared, ""}); ASSERT(isWriteBatchFinished(*batch)); ASSERT_EQ(getWriteBatchResult(*batch).getStatus(), ErrorCodes::TimeseriesBucketCleared); - ASSERT_NOT_OK(_bucketCatalog->prepareCommit(batch)); + ASSERT_NOT_OK(prepareCommit(*_bucketCatalog, batch)); ASSERT(isWriteBatchFinished(*batch)); ASSERT_EQ(getWriteBatchResult(*batch).getStatus(), ErrorCodes::TimeseriesBucketCleared); } TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) { - auto batch1 = _bucketCatalog - ->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) + auto batch1 = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kDisallow) .getValue() .batch; - auto batch2 = _bucketCatalog - ->insert(_makeOperationContext().second.get(), - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) + auto batch2 = insert(_makeOperationContext().second.get(), + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kDisallow) .getValue() .batch; - auto batch3 = _bucketCatalog - ->insert(_makeOperationContext().second.get(), - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) + auto batch3 = insert(_makeOperationContext().second.get(), + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kAllow) .getValue() .batch; - auto batch4 = _bucketCatalog - ->insert(_makeOperationContext().second.get(), - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) + auto batch4 = insert(_makeOperationContext().second.get(), + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kAllow) .getValue() .batch; @@ -874,23 +945,23 @@ TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) { } TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) { - auto batch1 = _bucketCatalog - ->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) + auto batch1 = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kDisallow) .getValue() .batch; - auto batch2 = _bucketCatalog - ->insert(_makeOperationContext().second.get(), - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) + auto batch2 = insert(_makeOperationContext().second.get(), + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kDisallow) .getValue() .batch; @@ -898,50 +969,50 @@ TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) { ASSERT(claimWriteBatchCommitRights(*batch2)); // Batch 2 will not be able to commit until batch 1 has finished. - ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); + ASSERT_OK(prepareCommit(*_bucketCatalog, batch1)); { auto task = RunBackgroundTaskAndWaitForFailpoint{ "hangWaitingForConflictingPreparedBatch", [&]() { - ASSERT_OK(_bucketCatalog->prepareCommit(batch2)); + ASSERT_OK(prepareCommit(*_bucketCatalog, batch2)); }}; // Finish the first batch. - _bucketCatalog->finish(batch1, {}); + finish(*_bucketCatalog, batch1, {}); ASSERT(isWriteBatchFinished(*batch1)); } - _bucketCatalog->finish(batch2, {}); + finish(*_bucketCatalog, batch2, {}); ASSERT(isWriteBatchFinished(*batch2)); } TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) { - auto batch1 = _bucketCatalog - ->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) + auto batch1 = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kDisallow) .getValue() .batch; - auto batch2 = _bucketCatalog - ->insert(_makeOperationContext().second.get(), - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) + auto batch2 = insert(_makeOperationContext().second.get(), + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kDisallow) .getValue() .batch; - auto batch3 = _bucketCatalog - ->insert(_makeOperationContext().second.get(), - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) + auto batch3 = insert(_makeOperationContext().second.get(), + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kDisallow) .getValue() .batch; ASSERT_EQ(batch1->bucketHandle.bucketId, batch2->bucketHandle.bucketId); @@ -952,20 +1023,20 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) { ASSERT(claimWriteBatchCommitRights(*batch3)); // Batch 2 will not be able to commit until batch 1 has finished. - ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); + ASSERT_OK(prepareCommit(*_bucketCatalog, batch1)); { auto task = RunBackgroundTaskAndWaitForFailpoint{ "hangWaitingForConflictingPreparedBatch", [&]() { - ASSERT_NOT_OK(_bucketCatalog->prepareCommit(batch2)); + ASSERT_NOT_OK(prepareCommit(*_bucketCatalog, batch2)); }}; // If we abort the third batch, it should abort the second one too, as it isn't prepared. // However, since the first batch is prepared, we can't abort it or clean up the bucket. We // can then finish the first batch, which will allow the second batch to proceed. It should // recognize it has been aborted and clean up the bucket. - _bucketCatalog->abort(batch3, Status{ErrorCodes::TimeseriesBucketCleared, "cleared"}); - _bucketCatalog->finish(batch1, {}); + abort(*_bucketCatalog, batch3, Status{ErrorCodes::TimeseriesBucketCleared, "cleared"}); + finish(*_bucketCatalog, batch1, {}); ASSERT(isWriteBatchFinished(*batch1)); } // Wait for the batch 2 task to finish preparing commit. Since batch 1 finished, batch 2 should @@ -974,36 +1045,36 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) { ASSERT(isWriteBatchFinished(*batch2)); // Make sure a new batch ends up in a new bucket. - auto batch4 = _bucketCatalog - ->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) + auto batch4 = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kDisallow) .getValue() .batch; ASSERT_NE(batch2->bucketHandle.bucketId, batch4->bucketHandle.bucketId); } TEST_F(BucketCatalogTest, AbortingBatchEnsuresNewInsertsGoToNewBucket) { - auto batch1 = _bucketCatalog - ->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) + auto batch1 = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kDisallow) .getValue() .batch; - auto batch2 = _bucketCatalog - ->insert(_makeOperationContext().second.get(), - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) + auto batch2 = insert(_makeOperationContext().second.get(), + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kDisallow) .getValue() .batch; @@ -1011,63 +1082,63 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresNewInsertsGoToNewBucket) { ASSERT_EQ(batch1->bucketHandle.bucketId, batch2->bucketHandle.bucketId); ASSERT(claimWriteBatchCommitRights(*batch1)); ASSERT(claimWriteBatchCommitRights(*batch2)); - ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); + ASSERT_OK(prepareCommit(*_bucketCatalog, batch1)); // Batch 1 will be in a prepared state now. Abort the second batch so that bucket 1 will be // closed after batch 1 finishes. - _bucketCatalog->abort(batch2, Status{ErrorCodes::TimeseriesBucketCleared, "cleared"}); - _bucketCatalog->finish(batch1, {}); + abort(*_bucketCatalog, batch2, Status{ErrorCodes::TimeseriesBucketCleared, "cleared"}); + finish(*_bucketCatalog, batch1, {}); ASSERT(isWriteBatchFinished(*batch1)); ASSERT(isWriteBatchFinished(*batch2)); // Ensure a batch started after batch 2 aborts, does not insert future measurements into the // aborted batch/bucket. - auto batch3 = _bucketCatalog - ->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) + auto batch3 = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kDisallow) .getValue() .batch; ASSERT_NE(batch1->bucketHandle.bucketId, batch3->bucketHandle.bucketId); } TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) { - auto batch1 = _bucketCatalog - ->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) + auto batch1 = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kDisallow) .getValue() .batch; - auto batch2 = _bucketCatalog - ->insert(_makeOperationContext().second.get(), - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) + auto batch2 = insert(_makeOperationContext().second.get(), + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + CombineWithInsertsFromOtherClients::kDisallow) .getValue() .batch; // Batch 2 is the first batch to commit the time field. ASSERT(claimWriteBatchCommitRights(*batch2)); - ASSERT_OK(_bucketCatalog->prepareCommit(batch2)); + ASSERT_OK(prepareCommit(*_bucketCatalog, batch2)); ASSERT_EQ(batch2->newFieldNamesToBeInserted.size(), 1); ASSERT_EQ(batch2->newFieldNamesToBeInserted.begin()->first, _timeField); - _bucketCatalog->finish(batch2, {}); + finish(*_bucketCatalog, batch2, {}); // Batch 1 was the first batch to insert the time field, but by commit time it was already // committed by batch 2. ASSERT(claimWriteBatchCommitRights(*batch1)); - ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); + ASSERT_OK(prepareCommit(*_bucketCatalog, batch1)); ASSERT(batch1->newFieldNamesToBeInserted.empty()); - _bucketCatalog->finish(batch1, {}); + finish(*_bucketCatalog, batch1, {}); } TEST_F(BucketCatalogTest, SchemaChanges) { @@ -1124,23 +1195,21 @@ TEST_F(BucketCatalogTest, ReopenMalformedBucket) { { // Missing _id field. BSONObj missingIdObj = bucketDoc.removeField("_id"); - ASSERT_NOT_OK(_bucketCatalog->reopenBucket(_opCtx, autoColl.getCollection(), missingIdObj)); + ASSERT_NOT_OK(_reopenBucket(autoColl.getCollection(), missingIdObj)); // Bad _id type. BSONObj badIdObj = bucketDoc.addFields(BSON("_id" << 123)); - ASSERT_NOT_OK(_bucketCatalog->reopenBucket(_opCtx, autoColl.getCollection(), badIdObj)); + ASSERT_NOT_OK(_reopenBucket(autoColl.getCollection(), badIdObj)); } { // Missing control field. BSONObj missingControlObj = bucketDoc.removeField("control"); - ASSERT_NOT_OK( - _bucketCatalog->reopenBucket(_opCtx, autoColl.getCollection(), missingControlObj)); + ASSERT_NOT_OK(_reopenBucket(autoColl.getCollection(), missingControlObj)); // Bad control type. BSONObj badControlObj = bucketDoc.addFields(BSON("control" << BSONArray())); - ASSERT_NOT_OK( - _bucketCatalog->reopenBucket(_opCtx, autoColl.getCollection(), badControlObj)); + ASSERT_NOT_OK(_reopenBucket(autoColl.getCollection(), badControlObj)); // Bad control.version type. BSONObj badVersionObj = bucketDoc.addFields(BSON( @@ -1150,15 +1219,14 @@ TEST_F(BucketCatalogTest, ReopenMalformedBucket) { << "max" << BSON("time" << BSON("$date" << "2022-06-06T15:34:30.000Z"))))); - ASSERT_NOT_OK( - _bucketCatalog->reopenBucket(_opCtx, autoColl.getCollection(), badVersionObj)); + ASSERT_NOT_OK(_reopenBucket(autoColl.getCollection(), badVersionObj)); // Bad control.min type. BSONObj badMinObj = bucketDoc.addFields(BSON( "control" << BSON("version" << 1 << "min" << 123 << "max" << BSON("time" << BSON("$date" << "2022-06-06T15:34:30.000Z"))))); - ASSERT_NOT_OK(_bucketCatalog->reopenBucket(_opCtx, autoColl.getCollection(), badMinObj)); + ASSERT_NOT_OK(_reopenBucket(autoColl.getCollection(), badMinObj)); // Bad control.max type. BSONObj badMaxObj = bucketDoc.addFields( @@ -1166,15 +1234,14 @@ TEST_F(BucketCatalogTest, ReopenMalformedBucket) { << BSON("time" << BSON("$date" << "2022-06-06T15:34:00.000Z")) << "max" << 123))); - ASSERT_NOT_OK(_bucketCatalog->reopenBucket(_opCtx, autoColl.getCollection(), badMaxObj)); + ASSERT_NOT_OK(_reopenBucket(autoColl.getCollection(), badMaxObj)); // Missing control.min.time. BSONObj missingMinTimeObj = bucketDoc.addFields(BSON( "control" << BSON("version" << 1 << "min" << BSON("abc" << 1) << "max" << BSON("time" << BSON("$date" << "2022-06-06T15:34:30.000Z"))))); - ASSERT_NOT_OK( - _bucketCatalog->reopenBucket(_opCtx, autoColl.getCollection(), missingMinTimeObj)); + ASSERT_NOT_OK(_reopenBucket(autoColl.getCollection(), missingMinTimeObj)); // Missing control.max.time. BSONObj missingMaxTimeObj = bucketDoc.addFields( @@ -1182,20 +1249,18 @@ TEST_F(BucketCatalogTest, ReopenMalformedBucket) { << BSON("time" << BSON("$date" << "2022-06-06T15:34:00.000Z")) << "max" << BSON("abc" << 1)))); - ASSERT_NOT_OK( - _bucketCatalog->reopenBucket(_opCtx, autoColl.getCollection(), missingMaxTimeObj)); + ASSERT_NOT_OK(_reopenBucket(autoColl.getCollection(), missingMaxTimeObj)); } { // Missing data field. BSONObj missingDataObj = bucketDoc.removeField("data"); - ASSERT_NOT_OK( - _bucketCatalog->reopenBucket(_opCtx, autoColl.getCollection(), missingDataObj)); + ASSERT_NOT_OK(_reopenBucket(autoColl.getCollection(), missingDataObj)); // Bad data type. BSONObj badDataObj = bucketDoc.addFields(BSON("data" << 123)); - ASSERT_NOT_OK(_bucketCatalog->reopenBucket(_opCtx, autoColl.getCollection(), badDataObj)); + ASSERT_NOT_OK(_reopenBucket(autoColl.getCollection(), badDataObj)); } } @@ -1217,7 +1282,7 @@ TEST_F(BucketCatalogTest, ReopenClosedBuckets) { "2":{"$date":"2022-06-06T15:34:30.000Z"}}, "a":{"0":1,"1":2,"2":3}, "b":{"0":1,"1":2,"2":3}}})"); - ASSERT_NOT_OK(_bucketCatalog->reopenBucket(_opCtx, autoColl.getCollection(), closedBucket)); + ASSERT_NOT_OK(_reopenBucket(autoColl.getCollection(), closedBucket)); } { @@ -1232,7 +1297,7 @@ TEST_F(BucketCatalogTest, ReopenClosedBuckets) { "2":{"$date":"2022-06-06T15:34:30.000Z"}}, "a":{"0":1,"1":2,"2":3}, "b":{"0":1,"1":2,"2":3}}})"); - ASSERT_OK(_bucketCatalog->reopenBucket(_opCtx, autoColl.getCollection(), openBucket)); + ASSERT_OK(_reopenBucket(autoColl.getCollection(), openBucket)); } { @@ -1246,7 +1311,7 @@ TEST_F(BucketCatalogTest, ReopenClosedBuckets) { "2":{"$date":"2022-06-06T15:34:30.000Z"}}, "a":{"0":1,"1":2,"2":3}, "b":{"0":1,"1":2,"2":3}}})"); - ASSERT_OK(_bucketCatalog->reopenBucket(_opCtx, autoColl.getCollection(), openBucket)); + ASSERT_OK(_reopenBucket(autoColl.getCollection(), openBucket)); } } @@ -1268,22 +1333,22 @@ TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertCompatibleMeasurement RAIIServerParameterControllerForTest controller{"featureFlagTimeseriesScalabilityImprovements", true}; AutoGetCollection autoColl(_opCtx, _ns1.makeTimeseriesBucketsNamespace(), MODE_IX); - auto memUsageBefore = _bucketCatalog->memoryUsage(); - Status status = _bucketCatalog->reopenBucket(_opCtx, autoColl.getCollection(), bucketDoc); - auto memUsageAfter = _bucketCatalog->memoryUsage(); + auto memUsageBefore = _bucketCatalog->memoryUsage.load(); + Status status = _reopenBucket(autoColl.getCollection(), bucketDoc); + auto memUsageAfter = _bucketCatalog->memoryUsage.load(); ASSERT_OK(status); ASSERT_EQ(1, _getExecutionStat(_ns1, kNumBucketsReopened)); ASSERT_GT(memUsageAfter, memUsageBefore); // Insert a measurement that is compatible with the reopened bucket. - auto result = - _bucketCatalog->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"}, + auto result = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"}, "a":-100,"b":100})"), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + CombineWithInsertsFromOtherClients::kAllow); // No buckets are closed. ASSERT(result.getValue().closedBuckets.empty()); @@ -1291,7 +1356,7 @@ TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertCompatibleMeasurement auto batch = result.getValue().batch; ASSERT(claimWriteBatchCommitRights(*batch)); - ASSERT_OK(_bucketCatalog->prepareCommit(batch)); + ASSERT_OK(prepareCommit(*_bucketCatalog, batch)); ASSERT_EQ(batch->measurements.size(), 1); // The reopened bucket already contains three committed measurements. @@ -1303,7 +1368,7 @@ TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertCompatibleMeasurement batch->max, BSON("u" << BSON("time" << Date_t::fromMillisSinceEpoch(1654529680000) << "b" << 100))); - _bucketCatalog->finish(batch, {}); + finish(*_bucketCatalog, batch, {}); } TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertCompatibleMeasurementWithMeta) { @@ -1322,18 +1387,18 @@ TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertCompatibleMeasurement "b":{"0":1,"1":2,"2":3}}})"); AutoGetCollection autoColl(_opCtx, _ns1.makeTimeseriesBucketsNamespace(), MODE_IX); - Status status = _bucketCatalog->reopenBucket(_opCtx, autoColl.getCollection(), bucketDoc); + Status status = _reopenBucket(autoColl.getCollection(), bucketDoc); ASSERT_OK(status); // Insert a measurement that is compatible with the reopened bucket. - auto result = _bucketCatalog->insert( - _opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"},"tag":42, + auto result = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"},"tag":42, "a":-100,"b":100})"), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + CombineWithInsertsFromOtherClients::kAllow); // No buckets are closed. ASSERT(result.getValue().closedBuckets.empty()); @@ -1341,7 +1406,7 @@ TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertCompatibleMeasurement auto batch = result.getValue().batch; ASSERT(claimWriteBatchCommitRights(*batch)); - ASSERT_OK(_bucketCatalog->prepareCommit(batch)); + ASSERT_OK(prepareCommit(*_bucketCatalog, batch)); ASSERT_EQ(batch->measurements.size(), 1); // The reopened bucket already contains three committed measurements. @@ -1353,7 +1418,7 @@ TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertCompatibleMeasurement batch->max, BSON("u" << BSON("time" << Date_t::fromMillisSinceEpoch(1654529680000) << "b" << 100))); - _bucketCatalog->finish(batch, {}); + finish(*_bucketCatalog, batch, {}); } TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertIncompatibleMeasurement) { @@ -1374,22 +1439,22 @@ TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertIncompatibleMeasureme RAIIServerParameterControllerForTest controller{"featureFlagTimeseriesScalabilityImprovements", true}; AutoGetCollection autoColl(_opCtx, _ns1.makeTimeseriesBucketsNamespace(), MODE_IX); - auto memUsageBefore = _bucketCatalog->memoryUsage(); - Status status = _bucketCatalog->reopenBucket(_opCtx, autoColl.getCollection(), bucketDoc); - auto memUsageAfter = _bucketCatalog->memoryUsage(); + auto memUsageBefore = _bucketCatalog->memoryUsage.load(); + Status status = _reopenBucket(autoColl.getCollection(), bucketDoc); + auto memUsageAfter = _bucketCatalog->memoryUsage.load(); ASSERT_OK(status); ASSERT_EQ(1, _getExecutionStat(_ns1, kNumBucketsReopened)); ASSERT_GT(memUsageAfter, memUsageBefore); // Insert a measurement that is incompatible with the reopened bucket. - auto result = - _bucketCatalog->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"}, + auto result = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"}, "a":{},"b":{}})"), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + CombineWithInsertsFromOtherClients::kAllow); // The reopened bucket gets closed as the schema is incompatible. ASSERT_EQ(1, result.getValue().closedBuckets.size()); @@ -1397,13 +1462,13 @@ TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertIncompatibleMeasureme auto batch = result.getValue().batch; ASSERT(claimWriteBatchCommitRights(*batch)); - ASSERT_OK(_bucketCatalog->prepareCommit(batch)); + ASSERT_OK(prepareCommit(*_bucketCatalog, batch)); ASSERT_EQ(batch->measurements.size(), 1); // Since the reopened bucket was incompatible, we opened a new one. ASSERT_EQ(batch->numPreviouslyCommittedMeasurements, 0); - _bucketCatalog->finish(batch, {}); + finish(*_bucketCatalog, batch, {}); } TEST_F(BucketCatalogTest, ReopenCompressedBucketAndInsertCompatibleMeasurement) { @@ -1428,23 +1493,22 @@ TEST_F(BucketCatalogTest, ReopenCompressedBucketAndInsertCompatibleMeasurement) const BSONObj& compressedBucketDoc = compressionResult.compressedBucket.value(); AutoGetCollection autoColl(_opCtx, _ns1.makeTimeseriesBucketsNamespace(), MODE_IX); - auto memUsageBefore = _bucketCatalog->memoryUsage(); - Status status = - _bucketCatalog->reopenBucket(_opCtx, autoColl.getCollection(), compressedBucketDoc); - auto memUsageAfter = _bucketCatalog->memoryUsage(); + auto memUsageBefore = _bucketCatalog->memoryUsage.load(); + Status status = _reopenBucket(autoColl.getCollection(), compressedBucketDoc); + auto memUsageAfter = _bucketCatalog->memoryUsage.load(); ASSERT_OK(status); ASSERT_EQ(1, _getExecutionStat(_ns1, kNumBucketsReopened)); ASSERT_GT(memUsageAfter, memUsageBefore); // Insert a measurement that is compatible with the reopened bucket. - auto result = - _bucketCatalog->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"}, + auto result = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"}, "a":-100,"b":100})"), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + CombineWithInsertsFromOtherClients::kAllow); // No buckets are closed. ASSERT(result.getValue().closedBuckets.empty()); @@ -1452,7 +1516,7 @@ TEST_F(BucketCatalogTest, ReopenCompressedBucketAndInsertCompatibleMeasurement) auto batch = result.getValue().batch; ASSERT(claimWriteBatchCommitRights(*batch)); - ASSERT_OK(_bucketCatalog->prepareCommit(batch)); + ASSERT_OK(prepareCommit(*_bucketCatalog, batch)); ASSERT_EQ(batch->measurements.size(), 1); // The reopened bucket already contains three committed measurements. @@ -1464,7 +1528,7 @@ TEST_F(BucketCatalogTest, ReopenCompressedBucketAndInsertCompatibleMeasurement) batch->max, BSON("u" << BSON("time" << Date_t::fromMillisSinceEpoch(1654529680000) << "b" << 100))); - _bucketCatalog->finish(batch, {}); + finish(*_bucketCatalog, batch, {}); } TEST_F(BucketCatalogTest, ReopenCompressedBucketAndInsertIncompatibleMeasurement) { @@ -1489,23 +1553,22 @@ TEST_F(BucketCatalogTest, ReopenCompressedBucketAndInsertIncompatibleMeasurement const BSONObj& compressedBucketDoc = compressionResult.compressedBucket.value(); AutoGetCollection autoColl(_opCtx, _ns1.makeTimeseriesBucketsNamespace(), MODE_IX); - auto memUsageBefore = _bucketCatalog->memoryUsage(); - Status status = - _bucketCatalog->reopenBucket(_opCtx, autoColl.getCollection(), compressedBucketDoc); - auto memUsageAfter = _bucketCatalog->memoryUsage(); + auto memUsageBefore = _bucketCatalog->memoryUsage.load(); + Status status = _reopenBucket(autoColl.getCollection(), compressedBucketDoc); + auto memUsageAfter = _bucketCatalog->memoryUsage.load(); ASSERT_OK(status); ASSERT_EQ(1, _getExecutionStat(_ns1, kNumBucketsReopened)); ASSERT_GT(memUsageAfter, memUsageBefore); // Insert a measurement that is incompatible with the reopened bucket. - auto result = - _bucketCatalog->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"}, + auto result = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"}, "a":{},"b":{}})"), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + CombineWithInsertsFromOtherClients::kAllow); // The reopened bucket gets closed as the schema is incompatible. ASSERT_EQ(1, result.getValue().closedBuckets.size()); @@ -1513,13 +1576,13 @@ TEST_F(BucketCatalogTest, ReopenCompressedBucketAndInsertIncompatibleMeasurement auto batch = result.getValue().batch; ASSERT(claimWriteBatchCommitRights(*batch)); - ASSERT_OK(_bucketCatalog->prepareCommit(batch)); + ASSERT_OK(prepareCommit(*_bucketCatalog, batch)); ASSERT_EQ(batch->measurements.size(), 1); // Since the reopened bucket was incompatible, we opened a new one. ASSERT_EQ(batch->numPreviouslyCommittedMeasurements, 0); - _bucketCatalog->finish(batch, {}); + finish(*_bucketCatalog, batch, {}); } TEST_F(BucketCatalogTest, ArchivingUnderMemoryPressure) { @@ -1532,18 +1595,18 @@ TEST_F(BucketCatalogTest, ArchivingUnderMemoryPressure) { // close an old one except under memory pressure. long long meta = 0; auto insertDocument = [&meta, this]() -> ClosedBuckets { - auto result = - _bucketCatalog->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now() << _metaField << meta++), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + auto result = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now() << _metaField << meta++), + CombineWithInsertsFromOtherClients::kAllow); ASSERT_OK(result.getStatus()); auto batch = result.getValue().batch; ASSERT(claimWriteBatchCommitRights(*batch)); - ASSERT_OK(_bucketCatalog->prepareCommit(batch)); - _bucketCatalog->finish(batch, {}); + ASSERT_OK(prepareCommit(*_bucketCatalog, batch)); + finish(*_bucketCatalog, batch, {}); return std::move(result.getValue().closedBuckets); }; @@ -1606,56 +1669,56 @@ TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) { AutoGetCollection autoColl(_opCtx, _ns1.makeTimeseriesBucketsNamespace(), MODE_IX); // Try to insert with no open bucket. Should hint to re-open. - auto result = _bucketCatalog->tryInsert( - _opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"}})"), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + auto result = tryInsert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"}})"), + CombineWithInsertsFromOtherClients::kAllow); ASSERT_OK(result.getStatus()); ASSERT(result.getValue().closedBuckets.empty()); ASSERT(!result.getValue().batch); ASSERT_TRUE(stdx::holds_alternative<std::vector<BSONObj>>(result.getValue().candidate)); // Actually insert so we do have an open bucket to test against. - result = _bucketCatalog->insert( - _opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"}})"), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + result = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"}})"), + CombineWithInsertsFromOtherClients::kAllow); ASSERT_OK(result.getStatus()); auto batch = result.getValue().batch; ASSERT(batch); auto bucketId = batch->bucketHandle.bucketId; ASSERT(claimWriteBatchCommitRights(*batch)); - ASSERT_OK(_bucketCatalog->prepareCommit(batch)); + ASSERT_OK(prepareCommit(*_bucketCatalog, batch)); ASSERT_EQ(batch->measurements.size(), 1); - _bucketCatalog->finish(batch, {}); + finish(*_bucketCatalog, batch, {}); // Time backwards should hint to re-open. - result = _bucketCatalog->tryInsert( - _opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - ::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:34:40.000Z"}})"), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + result = tryInsert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + ::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:34:40.000Z"}})"), + CombineWithInsertsFromOtherClients::kAllow); ASSERT_OK(result.getStatus()); ASSERT(result.getValue().closedBuckets.empty()); ASSERT(!result.getValue().batch); ASSERT_TRUE(stdx::holds_alternative<std::vector<BSONObj>>(result.getValue().candidate)); // Time forward should not hint to re-open. - result = _bucketCatalog->tryInsert( - _opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - ::mongo::fromjson(R"({"time":{"$date":"2022-06-07T15:34:40.000Z"}})"), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + result = tryInsert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + ::mongo::fromjson(R"({"time":{"$date":"2022-06-07T15:34:40.000Z"}})"), + CombineWithInsertsFromOtherClients::kAllow); ASSERT_OK(result.getStatus()); ASSERT(result.getValue().closedBuckets.empty()); ASSERT(!result.getValue().batch); @@ -1663,13 +1726,14 @@ TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) { // Now let's insert something with a different meta, so we open a new bucket, see we're past the // memory limit, and archive the existing bucket. - result = _bucketCatalog->insert( - _opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - ::mongo::fromjson(R"({"time":{"$date":"2022-06-07T15:34:40.000Z"}, "tag": "foo"})"), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + result = + insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + ::mongo::fromjson(R"({"time":{"$date":"2022-06-07T15:34:40.000Z"}, "tag": "foo"})"), + CombineWithInsertsFromOtherClients::kAllow); ASSERT_OK(result.getStatus()); ASSERT_EQ(1, _getExecutionStat(_ns1, kNumArchivedDueToMemoryThreshold)); ASSERT_EQ(0, _getExecutionStat(_ns1, kNumClosedDueToMemoryThreshold)); @@ -1677,19 +1741,19 @@ TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) { ASSERT_NE(batch->bucketHandle.bucketId, bucketId); ASSERT(batch); ASSERT(claimWriteBatchCommitRights(*batch)); - ASSERT_OK(_bucketCatalog->prepareCommit(batch)); + ASSERT_OK(prepareCommit(*_bucketCatalog, batch)); ASSERT_EQ(batch->measurements.size(), 1); - _bucketCatalog->finish(batch, {}); + finish(*_bucketCatalog, batch, {}); // If we try to insert something that could fit in the archived bucket, we should get it back as // a candidate. - result = _bucketCatalog->tryInsert( - _opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:35:40.000Z"}})"), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + result = tryInsert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:35:40.000Z"}})"), + CombineWithInsertsFromOtherClients::kAllow); ASSERT_OK(result.getStatus()); ASSERT(result.getValue().closedBuckets.empty()); ASSERT(!result.getValue().batch); @@ -1703,39 +1767,41 @@ TEST_F(BucketCatalogTest, TryInsertWillCreateBucketIfWeWouldCloseExistingBucket) AutoGetCollection autoColl(_opCtx, _ns1.makeTimeseriesBucketsNamespace(), MODE_IX); // Insert a document so we have a base bucket - auto result = _bucketCatalog->insert( - _opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"}, "a": true})"), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + auto result = + insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"}, "a": true})"), + CombineWithInsertsFromOtherClients::kAllow); ASSERT_OK(result.getStatus()); auto batch = result.getValue().batch; ASSERT(batch); auto bucketId = batch->bucketHandle.bucketId; ASSERT(claimWriteBatchCommitRights(*batch)); - ASSERT_OK(_bucketCatalog->prepareCommit(batch)); + ASSERT_OK(prepareCommit(*_bucketCatalog, batch)); ASSERT_EQ(batch->measurements.size(), 1); - _bucketCatalog->finish(batch, {}); + finish(*_bucketCatalog, batch, {}); // Incompatible schema would close the existing bucket, so we should expect to open a new bucket // and proceed to insert the document. - result = _bucketCatalog->tryInsert( - _opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:35:40.000Z"}, "a": {}})"), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + result = + tryInsert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:35:40.000Z"}, "a": {}})"), + CombineWithInsertsFromOtherClients::kAllow); ASSERT_OK(result.getStatus()); batch = result.getValue().batch; ASSERT(batch); ASSERT_NE(batch->bucketHandle.bucketId, bucketId); ASSERT(claimWriteBatchCommitRights(*batch)); - ASSERT_OK(_bucketCatalog->prepareCommit(batch)); + ASSERT_OK(prepareCommit(*_bucketCatalog, batch)); ASSERT_EQ(batch->measurements.size(), 1); - _bucketCatalog->finish(batch, {}); + finish(*_bucketCatalog, batch, {}); } TEST_F(BucketCatalogTest, InsertIntoReopenedBucket) { @@ -1745,21 +1811,21 @@ TEST_F(BucketCatalogTest, InsertIntoReopenedBucket) { // Insert a document so we have a base bucket and we can test that we soft close it when we // reopen a conflicting bucket. - auto result = _bucketCatalog->insert( - _opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - ::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:34:40.000Z"}})"), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + auto result = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + ::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:34:40.000Z"}})"), + CombineWithInsertsFromOtherClients::kAllow); ASSERT_OK(result.getStatus()); auto batch = result.getValue().batch; ASSERT(batch); auto oldBucketId = batch->bucketHandle.bucketId; ASSERT(claimWriteBatchCommitRights(*batch)); - ASSERT_OK(_bucketCatalog->prepareCommit(batch)); + ASSERT_OK(prepareCommit(*_bucketCatalog, batch)); ASSERT_EQ(batch->measurements.size(), 1); - _bucketCatalog->finish(batch, {}); + finish(*_bucketCatalog, batch, {}); BSONObj bucketDoc = ::mongo::fromjson( R"({"_id":{"$oid":"629e1e680958e279dc29a517"}, @@ -1775,22 +1841,22 @@ TEST_F(BucketCatalogTest, InsertIntoReopenedBucket) { findResult.bucketToReopen = BucketToReopen{bucketDoc, validator}; // We should be able to pass in a valid bucket and insert into it. - result = _bucketCatalog->insert( - _opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:35:40.000Z"}})"), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow, - findResult); + result = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:35:40.000Z"}})"), + CombineWithInsertsFromOtherClients::kAllow, + findResult); ASSERT_OK(result.getStatus()); batch = result.getValue().batch; ASSERT(batch); ASSERT_EQ(batch->bucketHandle.bucketId.oid, bucketDoc["_id"].OID()); ASSERT(claimWriteBatchCommitRights(*batch)); - ASSERT_OK(_bucketCatalog->prepareCommit(batch)); + ASSERT_OK(prepareCommit(*_bucketCatalog, batch)); ASSERT_EQ(batch->measurements.size(), 1); - _bucketCatalog->finish(batch, {}); + finish(*_bucketCatalog, batch, {}); // Verify the old bucket was soft-closed ASSERT_EQ(1, _getExecutionStat(_ns1, kNumClosedDueToReopening)); ASSERT_EQ(1, _getExecutionStat(_ns1, kNumBucketsReopened)); @@ -1798,13 +1864,13 @@ TEST_F(BucketCatalogTest, InsertIntoReopenedBucket) { // Verify that if we try another insert for the soft-closed bucket, we get a query-based // reopening candidate. - result = _bucketCatalog->tryInsert( - _opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - ::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:35:40.000Z"}})"), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + result = tryInsert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + ::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:35:40.000Z"}})"), + CombineWithInsertsFromOtherClients::kAllow); ASSERT_OK(result.getStatus()); ASSERT_TRUE(result.getValue().closedBuckets.empty()); ASSERT(!result.getValue().batch); @@ -1818,21 +1884,21 @@ TEST_F(BucketCatalogTest, CannotInsertIntoOutdatedBucket) { // Insert a document so we have a base bucket and we can test that we archive it when we reopen // a conflicting bucket. - auto result = _bucketCatalog->insert( - _opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - ::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:34:40.000Z"}})"), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); + auto result = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + ::mongo::fromjson(R"({"time":{"$date":"2022-06-05T15:34:40.000Z"}})"), + CombineWithInsertsFromOtherClients::kAllow); ASSERT_OK(result.getStatus()); auto batch = result.getValue().batch; ASSERT(batch); auto oldBucketId = batch->bucketHandle.bucketId; ASSERT(claimWriteBatchCommitRights(*batch)); - ASSERT_OK(_bucketCatalog->prepareCommit(batch)); + ASSERT_OK(prepareCommit(*_bucketCatalog, batch)); ASSERT_EQ(batch->measurements.size(), 1); - _bucketCatalog->finish(batch, {}); + finish(*_bucketCatalog, batch, {}); BSONObj bucketDoc = ::mongo::fromjson( R"({"_id":{"$oid":"629e1e680958e279dc29a517"}, @@ -1848,21 +1914,21 @@ TEST_F(BucketCatalogTest, CannotInsertIntoOutdatedBucket) { // previous era. const NamespaceString fakeNs = NamespaceString::createNamespaceString_forTest("test.foo"); const auto fakeId = OID(); - _bucketCatalog->directWriteStart(fakeNs, fakeId); - _bucketCatalog->directWriteFinish(fakeNs, fakeId); + directWriteStart(_bucketCatalog->bucketStateRegistry, fakeNs, fakeId); + directWriteFinish(_bucketCatalog->bucketStateRegistry, fakeNs, fakeId); BucketFindResult findResult; findResult.bucketToReopen = BucketToReopen{bucketDoc, validator, result.getValue().catalogEra}; // We should get an WriteConflict back if we pass in an outdated bucket. - result = _bucketCatalog->insert( - _opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:35:40.000Z"}})"), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow, - findResult); + result = insert(_opCtx, + *_bucketCatalog, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:35:40.000Z"}})"), + CombineWithInsertsFromOtherClients::kAllow, + findResult); ASSERT_NOT_OK(result.getStatus()); ASSERT_EQ(result.getStatus().code(), ErrorCodes::WriteConflict); } diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_state_registry.cpp b/src/mongo/db/timeseries/bucket_catalog/bucket_state_registry.cpp index 4448ff0d94c..0767a7843b4 100644 --- a/src/mongo/db/timeseries/bucket_catalog/bucket_state_registry.cpp +++ b/src/mongo/db/timeseries/bucket_catalog/bucket_state_registry.cpp @@ -70,7 +70,7 @@ void incrementEraCountHelper(BucketStateRegistry& registry, BucketStateRegistry: } } -bool isMemberOfClearedSet(BucketStateRegistry& registry, WithLock catalogLock, Bucket* bucket) { +bool isMemberOfClearedSet(BucketStateRegistry& registry, WithLock lock, Bucket* bucket) { for (auto it = registry.clearedSets.lower_bound(bucket->lastChecked + 1); it != registry.clearedSets.end(); ++it) { @@ -89,7 +89,7 @@ bool isMemberOfClearedSet(BucketStateRegistry& registry, WithLock catalogLock, B boost::optional<BucketState> changeBucketStateHelper( BucketStateRegistry& registry, - WithLock catalogLock, + WithLock lock, const BucketId& bucketId, const BucketStateRegistry::StateChangeFn& change) { auto it = registry.bucketStates.find(bucketId); @@ -138,11 +138,11 @@ boost::optional<BucketState> changeBucketStateHelper( } boost::optional<BucketState> markIndividualBucketCleared(BucketStateRegistry& registry, - WithLock catalogLock, + WithLock lock, const BucketId& bucketId) { return changeBucketStateHelper( registry, - catalogLock, + lock, bucketId, [](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> { if (!input.has_value()) { @@ -153,27 +153,25 @@ boost::optional<BucketState> markIndividualBucketCleared(BucketStateRegistry& re } } // namespace -BucketStateRegistry::BucketStateRegistry(Mutex& m) : catalogMutex(m), currentEra(0) {} - BucketStateRegistry::Era getCurrentEra(const BucketStateRegistry& registry) { - stdx::lock_guard lk{registry.catalogMutex}; + stdx::lock_guard lk{registry.mutex}; return registry.currentEra; } BucketStateRegistry::Era getCurrentEraAndIncrementBucketCount(BucketStateRegistry& registry) { - stdx::lock_guard lk{registry.catalogMutex}; + stdx::lock_guard lk{registry.mutex}; incrementEraCountHelper(registry, registry.currentEra); return registry.currentEra; } void decrementBucketCountForEra(BucketStateRegistry& registry, BucketStateRegistry::Era value) { - stdx::lock_guard lk{registry.catalogMutex}; + stdx::lock_guard lk{registry.mutex}; decrementEraCountHelper(registry, value); } BucketStateRegistry::Era getBucketCountForEra(BucketStateRegistry& registry, BucketStateRegistry::Era value) { - stdx::lock_guard lk{registry.catalogMutex}; + stdx::lock_guard lk{registry.mutex}; auto it = registry.bucketsPerEra.find(value); if (it == registry.bucketsPerEra.end()) { return 0; @@ -184,17 +182,17 @@ BucketStateRegistry::Era getBucketCountForEra(BucketStateRegistry& registry, void clearSetOfBuckets(BucketStateRegistry& registry, BucketStateRegistry::ShouldClearFn&& shouldClear) { - stdx::lock_guard lk{registry.catalogMutex}; + stdx::lock_guard lk{registry.mutex}; registry.clearedSets[++registry.currentEra] = std::move(shouldClear); } std::uint64_t getClearedSetsCount(const BucketStateRegistry& registry) { - stdx::lock_guard lk{registry.catalogMutex}; + stdx::lock_guard lk{registry.mutex}; return registry.clearedSets.size(); } boost::optional<BucketState> getBucketState(BucketStateRegistry& registry, Bucket* bucket) { - stdx::lock_guard catalogLock{registry.catalogMutex}; + stdx::lock_guard catalogLock{registry.mutex}; // If the bucket has been cleared, we will set the bucket state accordingly to reflect that. if (isMemberOfClearedSet(registry, catalogLock, bucket)) { return markIndividualBucketCleared(registry, catalogLock, bucket->bucketId); @@ -205,7 +203,7 @@ boost::optional<BucketState> getBucketState(BucketStateRegistry& registry, Bucke boost::optional<BucketState> getBucketState(const BucketStateRegistry& registry, const BucketId& bucketId) { - stdx::lock_guard catalogLock{registry.catalogMutex}; + stdx::lock_guard catalogLock{registry.mutex}; auto it = registry.bucketStates.find(bucketId); return it != registry.bucketStates.end() ? boost::make_optional(it->second) : boost::none; } @@ -213,7 +211,7 @@ boost::optional<BucketState> getBucketState(const BucketStateRegistry& registry, boost::optional<BucketState> changeBucketState(BucketStateRegistry& registry, Bucket* bucket, const BucketStateRegistry::StateChangeFn& change) { - stdx::lock_guard catalogLock{registry.catalogMutex}; + stdx::lock_guard catalogLock{registry.mutex}; if (isMemberOfClearedSet(registry, catalogLock, bucket)) { return markIndividualBucketCleared(registry, catalogLock, bucket->bucketId); } @@ -224,14 +222,14 @@ boost::optional<BucketState> changeBucketState(BucketStateRegistry& registry, boost::optional<BucketState> changeBucketState(BucketStateRegistry& registry, const BucketId& bucketId, const BucketStateRegistry::StateChangeFn& change) { - stdx::lock_guard catalogLock{registry.catalogMutex}; + stdx::lock_guard catalogLock{registry.mutex}; return changeBucketStateHelper(registry, catalogLock, bucketId, change); } -void appendStats(const BucketStateRegistry& registry, BSONObjBuilder* base) { - stdx::lock_guard catalogLock{registry.catalogMutex}; +void appendStats(const BucketStateRegistry& registry, BSONObjBuilder& base) { + stdx::lock_guard catalogLock{registry.mutex}; - BSONObjBuilder builder{base->subobjStart("stateManagement")}; + BSONObjBuilder builder{base.subobjStart("stateManagement")}; builder.appendNumber("bucketsManaged", static_cast<long long>(registry.bucketStates.size())); builder.appendNumber("currentEra", static_cast<long long>(registry.currentEra)); diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_state_registry.h b/src/mongo/db/timeseries/bucket_catalog/bucket_state_registry.h index fcffbaa9829..1276fe929b6 100644 --- a/src/mongo/db/timeseries/bucket_catalog/bucket_state_registry.h +++ b/src/mongo/db/timeseries/bucket_catalog/bucket_state_registry.h @@ -52,14 +52,12 @@ struct BucketStateRegistry { using StateChangeFn = std::function<boost::optional<BucketState>(boost::optional<BucketState>, Era)>; - explicit BucketStateRegistry(Mutex& catalogMutex); - - // Pointer to 'BucketCatalog::_mutex'. - Mutex& catalogMutex; + mutable Mutex mutex = + MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "BucketStateRegistry::mutex"); // Global number tracking the current number of eras that have passed. Incremented each time // a bucket is cleared. - Era currentEra; + Era currentEra = 0; // Mapping of era to counts of how many buckets are associated with that era. std::map<Era, uint64_t> bucketsPerEra; @@ -133,6 +131,6 @@ boost::optional<BucketState> changeBucketState(BucketStateRegistry& registry, /** * Appends statistics for observability. */ -void appendStats(const BucketStateRegistry& registry, BSONObjBuilder* builder); +void appendStats(const BucketStateRegistry& registry, BSONObjBuilder& builder); } // namespace mongo::timeseries::bucket_catalog diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_state_registry_test.cpp b/src/mongo/db/timeseries/bucket_catalog/bucket_state_registry_test.cpp index 363659c18a6..cb7ea2c97e5 100644 --- a/src/mongo/db/timeseries/bucket_catalog/bucket_state_registry_test.cpp +++ b/src/mongo/db/timeseries/bucket_catalog/bucket_state_registry_test.cpp @@ -29,6 +29,7 @@ #include "mongo/db/storage/storage_parameters_gen.h" #include "mongo/db/timeseries/bucket_catalog/bucket_catalog.h" +#include "mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.h" #include "mongo/db/timeseries/bucket_catalog/bucket_state.h" #include "mongo/idl/server_parameter_test_util.h" #include "mongo/unittest/bson_test_util.h" @@ -41,44 +42,52 @@ public: BucketStateRegistryTest() {} void clearById(const NamespaceString& ns, const OID& oid) { - directWriteStart(ns, oid); - directWriteFinish(ns, oid); + directWriteStart(bucketStateRegistry, ns, oid); + directWriteFinish(bucketStateRegistry, ns, oid); } - bool hasBeenCleared(Bucket* bucket) { - auto state = getBucketState(_bucketStateRegistry, bucket); + bool hasBeenCleared(Bucket& bucket) { + auto state = getBucketState(bucketStateRegistry, &bucket); return (state && state.value().isSet(BucketStateFlag::kCleared)); } - Bucket* createBucket(const CreationInfo& info) { - auto ptr = _allocateBucket(&_stripes[info.stripe], withLock, info); - ASSERT_FALSE(hasBeenCleared(ptr)); - return ptr; + Bucket& createBucket(const internal::CreationInfo& info) { + auto ptr = &internal::allocateBucket(*this, stripes[info.stripe], withLock, info); + ASSERT_FALSE(hasBeenCleared(*ptr)); + return *ptr; } - bool cannotAccessBucket(Bucket* bucket) { + bool cannotAccessBucket(Bucket& bucket) { if (hasBeenCleared(bucket)) { - _removeBucket( - &_stripes[_getStripeNumber(bucket->key)], withLock, bucket, RemovalMode::kAbort); + internal::removeBucket(*this, + stripes[internal::getStripeNumber(bucket.key)], + withLock, + bucket, + internal::RemovalMode::kAbort); return true; } else { return false; } } - void checkAndRemoveClearedBucket(Bucket* bucket) { - auto a = _findBucket(_stripes[_getStripeNumber(bucket->key)], - withLock, - bucket->bucketId, - IgnoreBucketState::kYes); - ASSERT(a == bucket); - auto b = _findBucket(_stripes[_getStripeNumber(bucket->key)], - withLock, - bucket->bucketId, - IgnoreBucketState::kNo); + void checkAndRemoveClearedBucket(Bucket& bucket) { + auto a = internal::findBucket(bucketStateRegistry, + stripes[internal::getStripeNumber(bucket.key)], + withLock, + bucket.bucketId, + internal::IgnoreBucketState::kYes); + ASSERT(a == &bucket); + auto b = internal::findBucket(bucketStateRegistry, + stripes[internal::getStripeNumber(bucket.key)], + withLock, + bucket.bucketId, + internal::IgnoreBucketState::kNo); ASSERT(b == nullptr); - _removeBucket( - &_stripes[_getStripeNumber(bucket->key)], withLock, bucket, RemovalMode::kAbort); + internal::removeBucket(*this, + stripes[internal::getStripeNumber(bucket.key)], + withLock, + bucket, + internal::RemovalMode::kAbort); } WithLock withLock = WithLock::withoutLock(); @@ -92,14 +101,14 @@ public: BucketKey bucketKey3{ns3, bucketMetadata}; Date_t date = Date_t::now(); TimeseriesOptions options; - ExecutionStatsController stats = _getExecutionStats(ns1); + ExecutionStatsController stats = internal::getOrInitializeExecutionStats(*this, ns1); ClosedBuckets closedBuckets; - BucketCatalog::CreationInfo info1{ - bucketKey1, _getStripeNumber(bucketKey1), date, options, stats, &closedBuckets}; - BucketCatalog::CreationInfo info2{ - bucketKey2, _getStripeNumber(bucketKey2), date, options, stats, &closedBuckets}; - BucketCatalog::CreationInfo info3{ - bucketKey3, _getStripeNumber(bucketKey3), date, options, stats, &closedBuckets}; + internal::CreationInfo info1{ + bucketKey1, internal::getStripeNumber(bucketKey1), date, options, stats, &closedBuckets}; + internal::CreationInfo info2{ + bucketKey2, internal::getStripeNumber(bucketKey2), date, options, stats, &closedBuckets}; + internal::CreationInfo info3{ + bucketKey3, internal::getStripeNumber(bucketKey3), date, options, stats, &closedBuckets}; }; TEST_F(BucketStateRegistryTest, BucketStateSetUnsetFlag) { @@ -309,33 +318,33 @@ TEST_F(BucketStateRegistryTest, EraAdvancesAsExpected) { true}; // When allocating new buckets, we expect their era value to match the BucketCatalog's era. - ASSERT_EQ(getCurrentEra(_bucketStateRegistry), 0); - auto bucket1 = createBucket(info1); - ASSERT_EQ(getCurrentEra(_bucketStateRegistry), 0); - ASSERT_EQ(bucket1->lastChecked, 0); + ASSERT_EQ(getCurrentEra(bucketStateRegistry), 0); + auto& bucket1 = createBucket(info1); + ASSERT_EQ(getCurrentEra(bucketStateRegistry), 0); + ASSERT_EQ(bucket1.lastChecked, 0); // When clearing buckets, we expect the BucketCatalog's era value to increase while the cleared // bucket era values should remain unchanged. - clear(ns1); - ASSERT_EQ(getCurrentEra(_bucketStateRegistry), 1); - ASSERT_EQ(bucket1->lastChecked, 0); + clear(*this, ns1); + ASSERT_EQ(getCurrentEra(bucketStateRegistry), 1); + ASSERT_EQ(bucket1.lastChecked, 0); // When clearing buckets of one namespace, we expect the era of buckets of any other namespace // to not change. - auto bucket2 = createBucket(info1); - auto bucket3 = createBucket(info2); - ASSERT_EQ(getCurrentEra(_bucketStateRegistry), 1); - ASSERT_EQ(bucket2->lastChecked, 1); - ASSERT_EQ(bucket3->lastChecked, 1); - clear(ns1); - ASSERT_EQ(getCurrentEra(_bucketStateRegistry), 2); - ASSERT_EQ(bucket3->lastChecked, 1); - ASSERT_EQ(bucket1->lastChecked, 0); - ASSERT_EQ(bucket2->lastChecked, 1); + auto& bucket2 = createBucket(info1); + auto& bucket3 = createBucket(info2); + ASSERT_EQ(getCurrentEra(bucketStateRegistry), 1); + ASSERT_EQ(bucket2.lastChecked, 1); + ASSERT_EQ(bucket3.lastChecked, 1); + clear(*this, ns1); + ASSERT_EQ(getCurrentEra(bucketStateRegistry), 2); + ASSERT_EQ(bucket3.lastChecked, 1); + ASSERT_EQ(bucket1.lastChecked, 0); + ASSERT_EQ(bucket2.lastChecked, 1); // Era also advances when clearing by OID clearById(ns1, OID()); - ASSERT_EQ(getCurrentEra(_bucketStateRegistry), 4); + ASSERT_EQ(getCurrentEra(bucketStateRegistry), 4); } TEST_F(BucketStateRegistryTest, EraCountMapUpdatedCorrectly) { @@ -343,184 +352,184 @@ TEST_F(BucketStateRegistryTest, EraCountMapUpdatedCorrectly) { true}; // Creating a bucket in a new era should add a counter for that era to the map. - auto bucket1 = createBucket(info1); - ASSERT_EQ(bucket1->lastChecked, 0); - ASSERT_EQ(getBucketCountForEra(_bucketStateRegistry, 0), 1); - clear(ns1); + auto& bucket1 = createBucket(info1); + ASSERT_EQ(bucket1.lastChecked, 0); + ASSERT_EQ(getBucketCountForEra(bucketStateRegistry, 0), 1); + clear(*this, ns1); checkAndRemoveClearedBucket(bucket1); // When the last bucket in an era is destructed, the counter in the map should be removed. - ASSERT_EQ(getBucketCountForEra(_bucketStateRegistry, 0), 0); + ASSERT_EQ(getBucketCountForEra(bucketStateRegistry, 0), 0); // If there are still buckets in the era, however, the counter should still exist in the // map. - auto bucket2 = createBucket(info1); - auto bucket3 = createBucket(info2); - ASSERT_EQ(bucket2->lastChecked, 1); - ASSERT_EQ(bucket3->lastChecked, 1); - ASSERT_EQ(getBucketCountForEra(_bucketStateRegistry, 1), 2); - clear(ns2); + auto& bucket2 = createBucket(info1); + auto& bucket3 = createBucket(info2); + ASSERT_EQ(bucket2.lastChecked, 1); + ASSERT_EQ(bucket3.lastChecked, 1); + ASSERT_EQ(getBucketCountForEra(bucketStateRegistry, 1), 2); + clear(*this, ns2); checkAndRemoveClearedBucket(bucket3); - ASSERT_EQ(getBucketCountForEra(_bucketStateRegistry, 1), 1); + ASSERT_EQ(getBucketCountForEra(bucketStateRegistry, 1), 1); // A bucket in one era being destroyed and the counter decrementing should not affect a // different era's counter. - auto bucket4 = createBucket(info2); - ASSERT_EQ(bucket4->lastChecked, 2); - ASSERT_EQ(getBucketCountForEra(_bucketStateRegistry, 2), 1); - clear(ns2); + auto& bucket4 = createBucket(info2); + ASSERT_EQ(bucket4.lastChecked, 2); + ASSERT_EQ(getBucketCountForEra(bucketStateRegistry, 2), 1); + clear(*this, ns2); checkAndRemoveClearedBucket(bucket4); - ASSERT_EQ(getBucketCountForEra(_bucketStateRegistry, 2), 0); - ASSERT_EQ(getBucketCountForEra(_bucketStateRegistry, 1), 1); + ASSERT_EQ(getBucketCountForEra(bucketStateRegistry, 2), 0); + ASSERT_EQ(getBucketCountForEra(bucketStateRegistry, 1), 1); } TEST_F(BucketStateRegistryTest, HasBeenClearedFunctionReturnsAsExpected) { RAIIServerParameterControllerForTest controller{"featureFlagTimeseriesScalabilityImprovements", true}; - auto bucket1 = createBucket(info1); - auto bucket2 = createBucket(info2); - ASSERT_EQ(bucket1->lastChecked, 0); - ASSERT_EQ(bucket2->lastChecked, 0); + auto& bucket1 = createBucket(info1); + auto& bucket2 = createBucket(info2); + ASSERT_EQ(bucket1.lastChecked, 0); + ASSERT_EQ(bucket2.lastChecked, 0); // After a clear operation, _isMemberOfClearedSet returns whether a particular bucket was // cleared or not. It also advances the bucket's era up to the most recent era. ASSERT_FALSE(cannotAccessBucket(bucket1)); ASSERT_FALSE(cannotAccessBucket(bucket2)); - ASSERT_EQ(getBucketCountForEra(_bucketStateRegistry, 0), 2); - clear(ns2); + ASSERT_EQ(getBucketCountForEra(bucketStateRegistry, 0), 2); + clear(*this, ns2); ASSERT_FALSE(cannotAccessBucket(bucket1)); - ASSERT_EQ(getBucketCountForEra(_bucketStateRegistry, 0), 1); - ASSERT_EQ(bucket1->lastChecked, 1); + ASSERT_EQ(getBucketCountForEra(bucketStateRegistry, 0), 1); + ASSERT_EQ(bucket1.lastChecked, 1); ASSERT(cannotAccessBucket(bucket2)); // Sanity check that all this still works with multiple buckets in a namespace being cleared. - auto bucket3 = createBucket(info2); - auto bucket4 = createBucket(info2); - ASSERT_EQ(bucket3->lastChecked, 1); - ASSERT_EQ(bucket4->lastChecked, 1); - clear(ns2); + auto& bucket3 = createBucket(info2); + auto& bucket4 = createBucket(info2); + ASSERT_EQ(bucket3.lastChecked, 1); + ASSERT_EQ(bucket4.lastChecked, 1); + clear(*this, ns2); ASSERT(cannotAccessBucket(bucket3)); ASSERT(cannotAccessBucket(bucket4)); - auto bucket5 = createBucket(info2); - ASSERT_EQ(bucket5->lastChecked, 2); - clear(ns2); + auto& bucket5 = createBucket(info2); + ASSERT_EQ(bucket5.lastChecked, 2); + clear(*this, ns2); ASSERT(cannotAccessBucket(bucket5)); // _isMemberOfClearedSet should be able to advance a bucket by multiple eras. - ASSERT_EQ(bucket1->lastChecked, 1); - ASSERT_EQ(getBucketCountForEra(_bucketStateRegistry, 1), 1); - ASSERT_EQ(getBucketCountForEra(_bucketStateRegistry, 3), 0); + ASSERT_EQ(bucket1.lastChecked, 1); + ASSERT_EQ(getBucketCountForEra(bucketStateRegistry, 1), 1); + ASSERT_EQ(getBucketCountForEra(bucketStateRegistry, 3), 0); ASSERT_FALSE(cannotAccessBucket(bucket1)); - ASSERT_EQ(bucket1->lastChecked, 3); - ASSERT_EQ(getBucketCountForEra(_bucketStateRegistry, 1), 0); - ASSERT_EQ(getBucketCountForEra(_bucketStateRegistry, 3), 1); + ASSERT_EQ(bucket1.lastChecked, 3); + ASSERT_EQ(getBucketCountForEra(bucketStateRegistry, 1), 0); + ASSERT_EQ(getBucketCountForEra(bucketStateRegistry, 3), 1); // _isMemberOfClearedSet works even if the bucket wasn't cleared in the most recent clear. - clear(ns1); - auto bucket6 = createBucket(info2); - ASSERT_EQ(bucket6->lastChecked, 4); - clear(ns2); - ASSERT_EQ(getBucketCountForEra(_bucketStateRegistry, 3), 1); - ASSERT_EQ(getBucketCountForEra(_bucketStateRegistry, 4), 1); + clear(*this, ns1); + auto& bucket6 = createBucket(info2); + ASSERT_EQ(bucket6.lastChecked, 4); + clear(*this, ns2); + ASSERT_EQ(getBucketCountForEra(bucketStateRegistry, 3), 1); + ASSERT_EQ(getBucketCountForEra(bucketStateRegistry, 4), 1); ASSERT(cannotAccessBucket(bucket1)); ASSERT(cannotAccessBucket(bucket6)); - ASSERT_EQ(getBucketCountForEra(_bucketStateRegistry, 3), 0); - ASSERT_EQ(getBucketCountForEra(_bucketStateRegistry, 4), 0); + ASSERT_EQ(getBucketCountForEra(bucketStateRegistry, 3), 0); + ASSERT_EQ(getBucketCountForEra(bucketStateRegistry, 4), 0); } TEST_F(BucketStateRegistryTest, ClearRegistryGarbageCollection) { RAIIServerParameterControllerForTest controller{"featureFlagTimeseriesScalabilityImprovements", true}; - auto bucket1 = createBucket(info1); - auto bucket2 = createBucket(info2); - ASSERT_EQ(bucket1->lastChecked, 0); - ASSERT_EQ(bucket2->lastChecked, 0); - ASSERT_EQUALS(getClearedSetsCount(_bucketStateRegistry), 0); - clear(ns1); + auto& bucket1 = createBucket(info1); + auto& bucket2 = createBucket(info2); + ASSERT_EQ(bucket1.lastChecked, 0); + ASSERT_EQ(bucket2.lastChecked, 0); + ASSERT_EQUALS(getClearedSetsCount(bucketStateRegistry), 0); + clear(*this, ns1); checkAndRemoveClearedBucket(bucket1); // Era 0 still has non-zero count after this clear because bucket2 is still in era 0. - ASSERT_EQUALS(getClearedSetsCount(_bucketStateRegistry), 1); - clear(ns2); + ASSERT_EQUALS(getClearedSetsCount(bucketStateRegistry), 1); + clear(*this, ns2); checkAndRemoveClearedBucket(bucket2); // Bucket2 gets deleted, which makes era 0's count decrease to 0, then clear registry gets // cleaned. - ASSERT_EQUALS(getClearedSetsCount(_bucketStateRegistry), 0); + ASSERT_EQUALS(getClearedSetsCount(bucketStateRegistry), 0); - auto bucket3 = createBucket(info1); - auto bucket4 = createBucket(info2); - ASSERT_EQ(bucket3->lastChecked, 2); - ASSERT_EQ(bucket4->lastChecked, 2); - clear(ns1); + auto& bucket3 = createBucket(info1); + auto& bucket4 = createBucket(info2); + ASSERT_EQ(bucket3.lastChecked, 2); + ASSERT_EQ(bucket4.lastChecked, 2); + clear(*this, ns1); checkAndRemoveClearedBucket(bucket3); // Era 2 still has bucket4 in it, so its count remains non-zero. - ASSERT_EQUALS(getClearedSetsCount(_bucketStateRegistry), 1); - auto bucket5 = createBucket(info1); - auto bucket6 = createBucket(info2); - ASSERT_EQ(bucket5->lastChecked, 3); - ASSERT_EQ(bucket6->lastChecked, 3); - clear(ns1); + ASSERT_EQUALS(getClearedSetsCount(bucketStateRegistry), 1); + auto& bucket5 = createBucket(info1); + auto& bucket6 = createBucket(info2); + ASSERT_EQ(bucket5.lastChecked, 3); + ASSERT_EQ(bucket6.lastChecked, 3); + clear(*this, ns1); checkAndRemoveClearedBucket(bucket5); // Eras 2 and 3 still have bucket4 and bucket6 in them respectively, so their counts remain // non-zero. - ASSERT_EQUALS(getClearedSetsCount(_bucketStateRegistry), 2); - clear(ns2); + ASSERT_EQUALS(getClearedSetsCount(bucketStateRegistry), 2); + clear(*this, ns2); checkAndRemoveClearedBucket(bucket4); checkAndRemoveClearedBucket(bucket6); // Eras 2 and 3 have their counts become 0 because bucket4 and bucket6 are cleared. The clear // registry is emptied. - ASSERT_EQUALS(getClearedSetsCount(_bucketStateRegistry), 0); + ASSERT_EQUALS(getClearedSetsCount(bucketStateRegistry), 0); - auto bucket7 = createBucket(info1); - auto bucket8 = createBucket(info3); - ASSERT_EQ(bucket7->lastChecked, 5); - ASSERT_EQ(bucket8->lastChecked, 5); - clear(ns3); + auto& bucket7 = createBucket(info1); + auto& bucket8 = createBucket(info3); + ASSERT_EQ(bucket7.lastChecked, 5); + ASSERT_EQ(bucket8.lastChecked, 5); + clear(*this, ns3); checkAndRemoveClearedBucket(bucket8); // Era 5 still has bucket7 in it so its count remains non-zero. - ASSERT_EQUALS(getClearedSetsCount(_bucketStateRegistry), 1); - auto bucket9 = createBucket(info2); - ASSERT_EQ(bucket9->lastChecked, 6); - clear(ns2); + ASSERT_EQUALS(getClearedSetsCount(bucketStateRegistry), 1); + auto& bucket9 = createBucket(info2); + ASSERT_EQ(bucket9.lastChecked, 6); + clear(*this, ns2); checkAndRemoveClearedBucket(bucket9); // Era 6's count becomes 0. Since era 5 is the smallest era with non-zero count, no clear ops // are removed. - ASSERT_EQUALS(getClearedSetsCount(_bucketStateRegistry), 2); - auto bucket10 = createBucket(info3); - ASSERT_EQ(bucket10->lastChecked, 7); - clear(ns3); + ASSERT_EQUALS(getClearedSetsCount(bucketStateRegistry), 2); + auto& bucket10 = createBucket(info3); + ASSERT_EQ(bucket10.lastChecked, 7); + clear(*this, ns3); checkAndRemoveClearedBucket(bucket10); // Era 7's count becomes 0. Since era 5 is the smallest era with non-zero count, no clear ops // are removed. - ASSERT_EQUALS(getClearedSetsCount(_bucketStateRegistry), 3); - clear(ns1); + ASSERT_EQUALS(getClearedSetsCount(bucketStateRegistry), 3); + clear(*this, ns1); checkAndRemoveClearedBucket(bucket7); // Era 5's count becomes 0. No eras with non-zero counts remain, so all clear ops are removed. - ASSERT_EQUALS(getClearedSetsCount(_bucketStateRegistry), 0); + ASSERT_EQUALS(getClearedSetsCount(bucketStateRegistry), 0); } TEST_F(BucketStateRegistryTest, HasBeenClearedToleratesGapsInRegistry) { RAIIServerParameterControllerForTest controller{"featureFlagTimeseriesScalabilityImprovements", true}; - auto bucket1 = createBucket(info1); - ASSERT_EQ(bucket1->lastChecked, 0); + auto& bucket1 = createBucket(info1); + ASSERT_EQ(bucket1.lastChecked, 0); clearById(ns1, OID()); - ASSERT_EQ(getCurrentEra(_bucketStateRegistry), 2); - clear(ns1); - ASSERT_EQ(getCurrentEra(_bucketStateRegistry), 3); + ASSERT_EQ(getCurrentEra(bucketStateRegistry), 2); + clear(*this, ns1); + ASSERT_EQ(getCurrentEra(bucketStateRegistry), 3); ASSERT_TRUE(hasBeenCleared(bucket1)); - auto bucket2 = createBucket(info2); - ASSERT_EQ(bucket2->lastChecked, 3); + auto& bucket2 = createBucket(info2); + ASSERT_EQ(bucket2.lastChecked, 3); clearById(ns1, OID()); clearById(ns1, OID()); clearById(ns1, OID()); - ASSERT_EQ(getCurrentEra(_bucketStateRegistry), 9); + ASSERT_EQ(getCurrentEra(bucketStateRegistry), 9); ASSERT_TRUE(hasBeenCleared(bucket1)); ASSERT_FALSE(hasBeenCleared(bucket2)); - clear(ns2); - ASSERT_EQ(getCurrentEra(_bucketStateRegistry), 10); + clear(*this, ns2); + ASSERT_EQ(getCurrentEra(bucketStateRegistry), 10); ASSERT_TRUE(hasBeenCleared(bucket1)); ASSERT_TRUE(hasBeenCleared(bucket2)); } @@ -529,12 +538,13 @@ TEST_F(BucketStateRegistryTest, ArchivingBucketPreservesState) { RAIIServerParameterControllerForTest controller{"featureFlagTimeseriesScalabilityImprovements", true}; - auto bucket = createBucket(info1); - auto bucketId = bucket->bucketId; + auto& bucket = createBucket(info1); + auto bucketId = bucket.bucketId; ClosedBuckets closedBuckets; - _archiveBucket(&_stripes[info1.stripe], WithLock::withoutLock(), bucket, &closedBuckets); - auto state = getBucketState(_bucketStateRegistry, bucketId); + internal::archiveBucket( + *this, stripes[info1.stripe], WithLock::withoutLock(), bucket, closedBuckets); + auto state = getBucketState(bucketStateRegistry, bucketId); ASSERT_TRUE(state.has_value()); ASSERT_TRUE(state == BucketState{}); } @@ -543,50 +553,50 @@ TEST_F(BucketStateRegistryTest, AbortingBatchRemovesBucketState) { RAIIServerParameterControllerForTest controller{"featureFlagTimeseriesScalabilityImprovements", true}; - auto bucket = createBucket(info1); - auto bucketId = bucket->bucketId; + auto& bucket = createBucket(info1); + auto bucketId = bucket.bucketId; - auto stats = _getExecutionStats(info1.key.ns); + auto stats = internal::getOrInitializeExecutionStats(*this, info1.key.ns); auto batch = std::make_shared<WriteBatch>(BucketHandle{bucketId, info1.stripe}, 0, stats); - _abort(&_stripes[info1.stripe], WithLock::withoutLock(), batch, Status::OK()); - ASSERT(getBucketState(_bucketStateRegistry, bucketId) == boost::none); + internal::abort(*this, stripes[info1.stripe], WithLock::withoutLock(), batch, Status::OK()); + ASSERT(getBucketState(bucketStateRegistry, bucketId) == boost::none); } TEST_F(BucketStateRegistryTest, ClosingBucketGoesThroughPendingCompressionState) { RAIIServerParameterControllerForTest controller{"featureFlagTimeseriesScalabilityImprovements", true}; - auto bucket = createBucket(info1); - auto bucketId = bucket->bucketId; + auto& bucket = createBucket(info1); + auto bucketId = bucket.bucketId; - ASSERT(getBucketState(_bucketStateRegistry, bucketId).value() == BucketState{}); + ASSERT(getBucketState(bucketStateRegistry, bucketId).value() == BucketState{}); - auto stats = _getExecutionStats(info1.key.ns); + auto stats = internal::getOrInitializeExecutionStats(*this, info1.key.ns); auto batch = std::make_shared<WriteBatch>(BucketHandle{bucketId, info1.stripe}, 0, stats); ASSERT(claimWriteBatchCommitRights(*batch)); - ASSERT_OK(prepareCommit(batch)); - ASSERT(getBucketState(_bucketStateRegistry, bucketId).value() == + ASSERT_OK(prepareCommit(*this, batch)); + ASSERT(getBucketState(bucketStateRegistry, bucketId).value() == BucketState{}.setFlag(BucketStateFlag::kPrepared)); { // Fool the system by marking the bucket for closure, then finish the batch so it detects // this and closes the bucket. - bucket->rolloverAction = RolloverAction::kHardClose; + bucket.rolloverAction = RolloverAction::kHardClose; CommitInfo commitInfo{}; - auto closedBucket = finish(batch, commitInfo); + auto closedBucket = finish(*this, batch, commitInfo); ASSERT(closedBucket.has_value()); ASSERT_EQ(closedBucket.value().bucketId.oid, bucketId.oid); // Bucket should now be in pending compression state. - ASSERT(getBucketState(_bucketStateRegistry, bucketId).has_value()); - ASSERT(getBucketState(_bucketStateRegistry, bucketId).value() == + ASSERT(getBucketState(bucketStateRegistry, bucketId).has_value()); + ASSERT(getBucketState(bucketStateRegistry, bucketId).value() == BucketState{}.setFlag(BucketStateFlag::kPendingCompression)); } // Destructing the 'ClosedBucket' struct should report it compressed should remove it from the // catalog. - ASSERT(getBucketState(_bucketStateRegistry, bucketId) == boost::none); + ASSERT(getBucketState(bucketStateRegistry, bucketId) == boost::none); } TEST_F(BucketStateRegistryTest, DirectWriteStartInitializesBucketState) { @@ -594,8 +604,8 @@ TEST_F(BucketStateRegistryTest, DirectWriteStartInitializesBucketState) { true}; auto bucketId = BucketId{ns1, OID()}; - directWriteStart(ns1, bucketId.oid); - auto state = getBucketState(_bucketStateRegistry, bucketId); + directWriteStart(bucketStateRegistry, ns1, bucketId.oid); + auto state = getBucketState(bucketStateRegistry, bucketId); ASSERT_TRUE(state.has_value()); ASSERT_TRUE(state.value().isSet(BucketStateFlag::kPendingDirectWrite)); } @@ -605,50 +615,50 @@ TEST_F(BucketStateRegistryTest, DirectWriteFinishRemovesBucketState) { true}; auto bucketId = BucketId{ns1, OID()}; - directWriteStart(ns1, bucketId.oid); - auto state = getBucketState(_bucketStateRegistry, bucketId); + directWriteStart(bucketStateRegistry, ns1, bucketId.oid); + auto state = getBucketState(bucketStateRegistry, bucketId); ASSERT_TRUE(state.has_value()); ASSERT_TRUE(state.value().isSet(BucketStateFlag::kPendingDirectWrite)); - directWriteFinish(ns1, bucketId.oid); - state = getBucketState(_bucketStateRegistry, bucketId); + directWriteFinish(bucketStateRegistry, ns1, bucketId.oid); + state = getBucketState(bucketStateRegistry, bucketId); ASSERT_FALSE(state.has_value()); } TEST_F(BucketStateRegistryTest, TestDirectWriteStartCounter) { RAIIServerParameterControllerForTest controller{"featureFlagTimeseriesScalabilityImprovements", true}; - auto bucket = createBucket(info1); - auto bucketId = bucket->bucketId; + auto& bucket = createBucket(info1); + auto bucketId = bucket.bucketId; // Under the hood, the BucketState will contain a counter on the number of ongoing DirectWrites. int32_t dwCounter = 0; // If no direct write has been initiated, the direct write counter should be 0. - auto state = getBucketState(_bucketStateRegistry, bucketId); + auto state = getBucketState(bucketStateRegistry, bucketId); ASSERT_TRUE(state.has_value()); ASSERT_EQ(dwCounter, state.value().getNumberOfDirectWrites()); // Start a direct write and ensure the counter is incremented correctly. while (dwCounter < 4) { - directWriteStart(ns1, bucketId.oid); + directWriteStart(bucketStateRegistry, ns1, bucketId.oid); dwCounter++; - state = getBucketState(_bucketStateRegistry, bucketId); + state = getBucketState(bucketStateRegistry, bucketId); ASSERT_TRUE(state.value().isSet(BucketStateFlag::kPendingDirectWrite)); ASSERT_EQ(dwCounter, state.value().getNumberOfDirectWrites()); } while (dwCounter > 1) { - directWriteFinish(ns1, bucketId.oid); + directWriteFinish(bucketStateRegistry, ns1, bucketId.oid); dwCounter--; - state = getBucketState(_bucketStateRegistry, bucketId); + state = getBucketState(bucketStateRegistry, bucketId); ASSERT_TRUE(state.value().isSet(BucketStateFlag::kPendingDirectWrite)); ASSERT_EQ(dwCounter, state.value().getNumberOfDirectWrites()); } // When the number of direct writes reaches 0, we should clear the bucket. - directWriteFinish(ns1, bucketId.oid); - state = getBucketState(_bucketStateRegistry, bucketId); + directWriteFinish(bucketStateRegistry, ns1, bucketId.oid); + state = getBucketState(bucketStateRegistry, bucketId); ASSERT_TRUE(hasBeenCleared(bucket)); } @@ -656,28 +666,28 @@ TEST_F(BucketStateRegistryTest, ConflictingDirectWrites) { // While two direct writes (e.g. two racing updates) should correctly conflict at the storage // engine layer, we expect the directWriteStart/Finish pairs to work successfully. BucketId bucketId{ns1, OID()}; - auto state = getBucketState(_bucketStateRegistry, bucketId); + auto state = getBucketState(bucketStateRegistry, bucketId); ASSERT_FALSE(state.has_value()); // First direct write initializes state as untracked. - directWriteStart(bucketId.ns, bucketId.oid); - state = getBucketState(_bucketStateRegistry, bucketId); + directWriteStart(bucketStateRegistry, bucketId.ns, bucketId.oid); + state = getBucketState(bucketStateRegistry, bucketId); ASSERT(state.has_value()); ASSERT(state.value().isSet(BucketStateFlag::kPendingDirectWrite)); ASSERT(state.value().isSet(BucketStateFlag::kUntracked)); - directWriteStart(bucketId.ns, bucketId.oid); + directWriteStart(bucketStateRegistry, bucketId.ns, bucketId.oid); // First finish does not remove the state from the registry. - directWriteFinish(bucketId.ns, bucketId.oid); - state = getBucketState(_bucketStateRegistry, bucketId); + directWriteFinish(bucketStateRegistry, bucketId.ns, bucketId.oid); + state = getBucketState(bucketStateRegistry, bucketId); ASSERT(state.has_value()); ASSERT(state.value().isSet(BucketStateFlag::kPendingDirectWrite)); ASSERT(state.value().isSet(BucketStateFlag::kUntracked)); // Second one removes it. - directWriteFinish(bucketId.ns, bucketId.oid); - state = getBucketState(_bucketStateRegistry, bucketId); + directWriteFinish(bucketStateRegistry, bucketId.ns, bucketId.oid); + state = getBucketState(bucketStateRegistry, bucketId); ASSERT_FALSE(state.has_value()); } diff --git a/src/mongo/db/timeseries/bucket_catalog/execution_stats.cpp b/src/mongo/db/timeseries/bucket_catalog/execution_stats.cpp index 98556bc339c..174a15fa9ec 100644 --- a/src/mongo/db/timeseries/bucket_catalog/execution_stats.cpp +++ b/src/mongo/db/timeseries/bucket_catalog/execution_stats.cpp @@ -29,6 +29,8 @@ #include "mongo/db/timeseries/bucket_catalog/execution_stats.h" +#include "mongo/db/storage/storage_parameters_gen.h" + namespace mongo::timeseries::bucket_catalog { void ExecutionStatsController::incNumBucketInserts(long long increment) { @@ -151,4 +153,53 @@ void ExecutionStatsController::incNumDuplicateBucketsReopened(long long incremen _globalStats.numDuplicateBucketsReopened.fetchAndAddRelaxed(increment); } +void appendExecutionStatsToBuilder(const ExecutionStats& stats, BSONObjBuilder& builder) { + builder.appendNumber("numBucketInserts", stats.numBucketInserts.load()); + builder.appendNumber("numBucketUpdates", stats.numBucketUpdates.load()); + builder.appendNumber("numBucketsOpenedDueToMetadata", + stats.numBucketsOpenedDueToMetadata.load()); + builder.appendNumber("numBucketsClosedDueToCount", stats.numBucketsClosedDueToCount.load()); + builder.appendNumber("numBucketsClosedDueToSchemaChange", + stats.numBucketsClosedDueToSchemaChange.load()); + builder.appendNumber("numBucketsClosedDueToSize", stats.numBucketsClosedDueToSize.load()); + builder.appendNumber("numBucketsClosedDueToTimeForward", + stats.numBucketsClosedDueToTimeForward.load()); + builder.appendNumber("numBucketsClosedDueToTimeBackward", + stats.numBucketsClosedDueToTimeBackward.load()); + builder.appendNumber("numBucketsClosedDueToMemoryThreshold", + stats.numBucketsClosedDueToMemoryThreshold.load()); + + auto commits = stats.numCommits.load(); + builder.appendNumber("numCommits", commits); + builder.appendNumber("numWaits", stats.numWaits.load()); + auto measurementsCommitted = stats.numMeasurementsCommitted.load(); + builder.appendNumber("numMeasurementsCommitted", measurementsCommitted); + if (commits) { + builder.appendNumber("avgNumMeasurementsPerCommit", measurementsCommitted / commits); + } + + if (feature_flags::gTimeseriesScalabilityImprovements.isEnabled( + serverGlobalParams.featureCompatibility)) { + builder.appendNumber("numBucketsClosedDueToReopening", + stats.numBucketsClosedDueToReopening.load()); + builder.appendNumber("numBucketsArchivedDueToMemoryThreshold", + stats.numBucketsArchivedDueToMemoryThreshold.load()); + builder.appendNumber("numBucketsArchivedDueToTimeBackward", + stats.numBucketsArchivedDueToTimeBackward.load()); + builder.appendNumber("numBucketsReopened", stats.numBucketsReopened.load()); + builder.appendNumber("numBucketsKeptOpenDueToLargeMeasurements", + stats.numBucketsKeptOpenDueToLargeMeasurements.load()); + builder.appendNumber("numBucketsClosedDueToCachePressure", + stats.numBucketsClosedDueToCachePressure.load()); + builder.appendNumber("numBucketsFetched", stats.numBucketsFetched.load()); + builder.appendNumber("numBucketsQueried", stats.numBucketsQueried.load()); + builder.appendNumber("numBucketFetchesFailed", stats.numBucketFetchesFailed.load()); + builder.appendNumber("numBucketQueriesFailed", stats.numBucketQueriesFailed.load()); + builder.appendNumber("numBucketReopeningsFailed", stats.numBucketReopeningsFailed.load()); + builder.appendNumber("numDuplicateBucketsReopened", + stats.numDuplicateBucketsReopened.load()); + } +} + + } // namespace mongo::timeseries::bucket_catalog diff --git a/src/mongo/db/timeseries/bucket_catalog/execution_stats.h b/src/mongo/db/timeseries/bucket_catalog/execution_stats.h index 00551c0e2af..5b2b00c990a 100644 --- a/src/mongo/db/timeseries/bucket_catalog/execution_stats.h +++ b/src/mongo/db/timeseries/bucket_catalog/execution_stats.h @@ -31,6 +31,7 @@ #include <memory> +#include "mongo/bson/bsonobjbuilder.h" #include "mongo/platform/atomic_word.h" namespace mongo::timeseries::bucket_catalog { @@ -100,4 +101,6 @@ private: ExecutionStats& _globalStats; }; +void appendExecutionStatsToBuilder(const ExecutionStats& stats, BSONObjBuilder& builder); + } // namespace mongo::timeseries::bucket_catalog diff --git a/src/mongo/db/timeseries/bucket_catalog/reopening.cpp b/src/mongo/db/timeseries/bucket_catalog/reopening.cpp index 5bad6a05afc..54f3a729ccf 100644 --- a/src/mongo/db/timeseries/bucket_catalog/reopening.cpp +++ b/src/mongo/db/timeseries/bucket_catalog/reopening.cpp @@ -29,9 +29,21 @@ #include "mongo/db/timeseries/bucket_catalog/reopening.h" +#include "mongo/db/timeseries/bucket_catalog/bucket_catalog.h" + namespace mongo::timeseries::bucket_catalog { ArchivedBucket::ArchivedBucket(const BucketId& b, const std::string& t) : bucketId{b}, timeField{t} {} +long long marginalMemoryUsageForArchivedBucket(const ArchivedBucket& bucket, + bool onlyEntryForMatchingMetaHash) { + return sizeof(Date_t) + // key in set of archived buckets for meta hash + sizeof(ArchivedBucket) + // main data for archived bucket + bucket.timeField.size() + // allocated space for timeField string, ignoring SSO + (onlyEntryForMatchingMetaHash ? sizeof(std::size_t) + // key in set (meta hash) + sizeof(decltype(Stripe::archivedBuckets)::value_type) // set container + : 0); +} + } // namespace mongo::timeseries::bucket_catalog diff --git a/src/mongo/db/timeseries/bucket_catalog/reopening.h b/src/mongo/db/timeseries/bucket_catalog/reopening.h index d0cca14e3d9..65ceb05f608 100644 --- a/src/mongo/db/timeseries/bucket_catalog/reopening.h +++ b/src/mongo/db/timeseries/bucket_catalog/reopening.h @@ -79,4 +79,14 @@ struct ArchivedBucket { std::string timeField; }; +/** + * Calculates the marginal memory usage for an archived bucket. The 'onlyEntryForMatchingMetaHash' + * parameter indicates that the bucket will be (if inserting) or was (if removing) the only bucket + * associated with it's meta hash value. If true, then the returned value will attempt to account + * for the overhead of the map data structure for the meta hash value. + */ +long long marginalMemoryUsageForArchivedBucket(const ArchivedBucket& bucket, + bool onlyEntryForMatchingMetaHash); + + } // namespace mongo::timeseries::bucket_catalog |