diff options
author | Dan Larkin-York <dan.larkin-york@mongodb.com> | 2023-01-20 02:40:10 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-01-20 03:16:06 +0000 |
commit | 45378b57c88584ee8ef36393ae1a336310d17d12 (patch) | |
tree | e65c359203dc5155c4b8a0a0c6e1b17266f82f17 /src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp | |
parent | 042b97b3e3f1423bdcf66a6436ea4e236565e61b (diff) | |
download | mongo-45378b57c88584ee8ef36393ae1a336310d17d12.tar.gz |
SERVER-72845 Flatten BucketCatalog class structure
Diffstat (limited to 'src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp')
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp | 1786 |
1 files changed, 1786 insertions, 0 deletions
diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp new file mode 100644 index 00000000000..d84cb7af36d --- /dev/null +++ b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp @@ -0,0 +1,1786 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/platform/basic.h" + +#include "mongo/db/timeseries/bucket_catalog/bucket_catalog.h" + +#include <algorithm> +#include <boost/iterator/transform_iterator.hpp> +#include <boost/utility/in_place_factory.hpp> + +#include "mongo/bson/util/bsoncolumn.h" +#include "mongo/db/catalog/database_holder.h" +#include "mongo/db/commands/server_status.h" +#include "mongo/db/concurrency/exception_util.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/storage/kv/kv_engine.h" +#include "mongo/db/storage/storage_parameters_gen.h" +#include "mongo/db/timeseries/bucket_catalog/bucket_catalog_helpers.h" +#include "mongo/db/timeseries/bucket_compression.h" +#include "mongo/db/timeseries/timeseries_constants.h" +#include "mongo/db/timeseries/timeseries_options.h" +#include "mongo/logv2/log.h" +#include "mongo/platform/compiler.h" +#include "mongo/stdx/thread.h" +#include "mongo/util/fail_point.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage + +namespace mongo::timeseries::bucket_catalog { +namespace { +const auto getBucketCatalog = ServiceContext::declareDecoration<BucketCatalog>(); +MONGO_FAIL_POINT_DEFINE(hangTimeseriesDirectModificationBeforeWriteConflict); +MONGO_FAIL_POINT_DEFINE(hangTimeseriesInsertBeforeReopeningBucket); +MONGO_FAIL_POINT_DEFINE(alwaysUseSameBucketCatalogStripe); +MONGO_FAIL_POINT_DEFINE(hangTimeseriesDirectModificationAfterStart); +MONGO_FAIL_POINT_DEFINE(hangTimeseriesDirectModificationBeforeFinish); +MONGO_FAIL_POINT_DEFINE(hangWaitingForConflictingPreparedBatch); + +OperationId getOpId(OperationContext* opCtx, + BucketCatalog::CombineWithInsertsFromOtherClients combine) { + switch (combine) { + case BucketCatalog::CombineWithInsertsFromOtherClients::kAllow: + return 0; + case BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow: + invariant(opCtx->getOpID()); + return opCtx->getOpID(); + } + MONGO_UNREACHABLE; +} + +BSONObj buildControlMinTimestampDoc(StringData timeField, Date_t roundedTime) { + BSONObjBuilder builder; + builder.append(timeField, roundedTime); + return builder.obj(); +} + +std::pair<OID, Date_t> generateBucketOID(const Date_t& time, const TimeseriesOptions& options) { + OID oid = OID::gen(); + + // We round the measurement timestamp down to the nearest minute, hour, or day depending on the + // granularity. We do this for two reasons. The first is so that if measurements come in + // slightly out of order, we don't have to close the current bucket due to going backwards in + // time. The second, and more important reason, is so that we reliably group measurements + // together into predictable chunks for sharding. This way we know from a measurement timestamp + // what the bucket timestamp will be, so we can route measurements to the right shard chunk. + auto roundedTime = roundTimestampToGranularity(time, options); + int64_t const roundedSeconds = durationCount<Seconds>(roundedTime.toDurationSinceEpoch()); + oid.setTimestamp(roundedSeconds); + + // Now, if we stopped here we could end up with bucket OID collisions. Consider the case where + // we have the granularity set to 'Hours'. This means we will round down to the nearest day, so + // any bucket generated on the same machine on the same day will have the same timestamp portion + // and unique instance portion of the OID. Only the increment will differ. Since we only use 3 + // bytes for the increment portion, we run a serious risk of overflow if we are generating lots + // of buckets. + // + // To address this, we'll take the difference between the actual timestamp and the rounded + // timestamp and add it to the instance portion of the OID to ensure we can't have a collision. + // for timestamps generated on the same machine. + // + // This leaves open the possibility that in the case of step-down/step-up, we could get a + // collision if the old primary and the new primary have unique instance bits that differ by + // less than the maximum rounding difference. This is quite unlikely though, and can be resolved + // by restarting the new primary. It remains an open question whether we can fix this in a + // better way. + // TODO (SERVER-61412): Avoid time-series bucket OID collisions after election + auto instance = oid.getInstanceUnique(); + uint32_t sum = DataView(reinterpret_cast<char*>(instance.bytes)).read<uint32_t>(1) + + (durationCount<Seconds>(time.toDurationSinceEpoch()) - roundedSeconds); + DataView(reinterpret_cast<char*>(instance.bytes)).write<uint32_t>(sum, 1); + oid.setInstanceUnique(instance); + + return {oid, roundedTime}; +} + +Status getTimeseriesBucketClearedError(const NamespaceString& ns, const OID& oid) { + return {ErrorCodes::TimeseriesBucketCleared, + str::stream() << "Time-series bucket " << oid << " for namespace " << ns + << " was cleared"}; +} + +/** + * Caluculate the bucket max size constrained by the cache size and the cardinality of active + * buckets. + */ +int32_t getCacheDerivedBucketMaxSize(StorageEngine* storageEngine, uint32_t workloadCardinality) { + invariant(storageEngine); + uint64_t storageCacheSize = + static_cast<uint64_t>(storageEngine->getEngine()->getCacheSizeMB() * 1024 * 1024); + + if (!feature_flags::gTimeseriesScalabilityImprovements.isEnabled( + serverGlobalParams.featureCompatibility) || + storageCacheSize == 0 || workloadCardinality == 0) { + return INT_MAX; + } + + uint64_t derivedMaxSize = storageCacheSize / (2 * workloadCardinality); + uint64_t intMax = static_cast<uint64_t>(std::numeric_limits<int32_t>::max()); + return std::min(derivedMaxSize, intMax); +} + +} // namespace + +BucketCatalog& BucketCatalog::get(ServiceContext* svcCtx) { + return getBucketCatalog(svcCtx); +} + +BucketCatalog& BucketCatalog::get(OperationContext* opCtx) { + return get(opCtx->getServiceContext()); +} + +Status BucketCatalog::reopenBucket(OperationContext* opCtx, + const CollectionPtr& coll, + const BSONObj& bucketDoc) { + const NamespaceString ns = coll->ns().getTimeseriesViewNamespace(); + const boost::optional<TimeseriesOptions> options = coll->getTimeseriesOptions(); + invariant(options, + str::stream() << "Attempting to reopen a bucket for a non-timeseries collection: " + << ns); + + BSONElement metadata; + auto metaFieldName = options->getMetaField(); + if (metaFieldName) { + metadata = bucketDoc.getField(*metaFieldName); + } + auto key = BucketKey{ns, BucketMetadata{metadata, coll->getDefaultCollator()}}; + + // Validate the bucket document against the schema. + auto validator = [&](OperationContext * opCtx, const BSONObj& bucketDoc) -> auto { + return coll->checkValidation(opCtx, bucketDoc); + }; + + auto stats = _getExecutionStats(ns); + + auto res = _rehydrateBucket(opCtx, + ns, + coll->getDefaultCollator(), + *options, + BucketToReopen{bucketDoc, validator}, + boost::none); + if (!res.isOK()) { + return res.getStatus(); + } + auto bucket = std::move(res.getValue()); + + auto stripeNumber = _getStripeNumber(key); + + // Register the reopened bucket with the catalog. + auto& stripe = _stripes[stripeNumber]; + stdx::lock_guard stripeLock{stripe.mutex}; + + ClosedBuckets closedBuckets; + return _reopenBucket(&stripe, + stripeLock, + stats, + key, + std::move(bucket), + _bucketStateManager.getEra(), + &closedBuckets) + .getStatus(); +} + +BSONObj BucketCatalog::getMetadata(const BucketHandle& handle) { + auto const& stripe = _stripes[handle.stripe]; + stdx::lock_guard stripeLock{stripe.mutex}; + + const Bucket* bucket = _findBucket(stripe, stripeLock, handle.bucketId); + if (!bucket) { + return {}; + } + + return bucket->_metadata.toBSON(); +} + +StatusWith<BucketCatalog::InsertResult> BucketCatalog::tryInsert( + OperationContext* opCtx, + const NamespaceString& ns, + const StringData::ComparatorInterface* comparator, + const TimeseriesOptions& options, + const BSONObj& doc, + CombineWithInsertsFromOtherClients combine) { + return _insert(opCtx, ns, comparator, options, doc, combine, AllowBucketCreation::kNo); +} + +StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert( + OperationContext* opCtx, + const NamespaceString& ns, + const StringData::ComparatorInterface* comparator, + const TimeseriesOptions& options, + const BSONObj& doc, + CombineWithInsertsFromOtherClients combine, + BucketFindResult bucketFindResult) { + return _insert( + opCtx, ns, comparator, options, doc, combine, AllowBucketCreation::kYes, bucketFindResult); +} + +Status BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) { + auto getBatchStatus = [&] { return batch->_promise.getFuture().getNoThrow().getStatus(); }; + + if (batch->finished()) { + // In this case, someone else aborted the batch behind our back. Oops. + return getBatchStatus(); + } + + auto& stripe = _stripes[batch->bucket().stripe]; + _waitToCommitBatch(&stripe, batch); + + stdx::lock_guard stripeLock{stripe.mutex}; + Bucket* bucket = _useBucketAndChangeState( + &stripe, + stripeLock, + batch->bucket().bucketId, + [](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> { + invariant(input.has_value()); + return input.value().setFlag(BucketStateFlag::kPrepared); + }); + + if (batch->finished()) { + // Someone may have aborted it while we were waiting. Since we have the prepared batch, we + // should now be able to fully abort the bucket. + if (bucket) { + _abort(&stripe, stripeLock, batch, getBatchStatus()); + } + return getBatchStatus(); + } else if (!bucket) { + _abort(&stripe, + stripeLock, + batch, + getTimeseriesBucketClearedError(batch->bucket().bucketId.ns, + batch->bucket().bucketId.oid)); + return getBatchStatus(); + } + + auto prevMemoryUsage = bucket->_memoryUsage; + batch->_prepareCommit(bucket); + _memoryUsage.fetchAndAdd(bucket->_memoryUsage - prevMemoryUsage); + + return Status::OK(); +} + +boost::optional<ClosedBucket> BucketCatalog::finish(std::shared_ptr<WriteBatch> batch, + const CommitInfo& info) { + invariant(!batch->finished()); + + boost::optional<ClosedBucket> closedBucket; + + batch->_finish(info); + + auto& stripe = _stripes[batch->bucket().stripe]; + stdx::lock_guard stripeLock{stripe.mutex}; + + Bucket* bucket = _useBucketAndChangeState( + &stripe, + stripeLock, + batch->bucket().bucketId, + [](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> { + invariant(input.has_value()); + return input.value().unsetFlag(BucketStateFlag::kPrepared); + }); + if (bucket) { + bucket->_preparedBatch.reset(); + } + + auto& stats = batch->_stats; + stats.incNumCommits(); + if (batch->numPreviouslyCommittedMeasurements() == 0) { + stats.incNumBucketInserts(); + } else { + stats.incNumBucketUpdates(); + } + + stats.incNumMeasurementsCommitted(batch->measurements().size()); + if (bucket) { + bucket->_numCommittedMeasurements += batch->measurements().size(); + } + + if (!bucket) { + // It's possible that we cleared the bucket in between preparing the commit and finishing + // here. In this case, we should abort any other ongoing batches and clear the bucket from + // the catalog so it's not hanging around idle. + auto it = stripe.allBuckets.find(batch->bucket().bucketId); + if (it != stripe.allBuckets.end()) { + bucket = it->second.get(); + bucket->_preparedBatch.reset(); + _abort(&stripe, + stripeLock, + bucket, + nullptr, + getTimeseriesBucketClearedError(bucket->ns(), bucket->oid())); + } + } else if (bucket->allCommitted()) { + switch (bucket->_rolloverAction) { + case RolloverAction::kHardClose: + case RolloverAction::kSoftClose: { + const bool eligibleForReopening = + bucket->_rolloverAction == RolloverAction::kSoftClose; + closedBucket = boost::in_place(&_bucketStateManager, + bucket->bucketId(), + bucket->getTimeField().toString(), + bucket->numMeasurements(), + eligibleForReopening); + _removeBucket(&stripe, stripeLock, bucket, RemovalMode::kClose); + break; + } + case RolloverAction::kArchive: { + ClosedBuckets closedBuckets; + _archiveBucket(&stripe, stripeLock, bucket, &closedBuckets); + if (!closedBuckets.empty()) { + closedBucket = std::move(closedBuckets[0]); + } + break; + } + case RolloverAction::kNone: { + _markBucketIdle(&stripe, stripeLock, bucket); + break; + } + } + } + return closedBucket; +} + +void BucketCatalog::abort(std::shared_ptr<WriteBatch> batch, const Status& status) { + invariant(batch); + invariant(batch->_commitRights.load()); + + if (batch->finished()) { + return; + } + + auto& stripe = _stripes[batch->bucket().stripe]; + stdx::lock_guard stripeLock{stripe.mutex}; + + _abort(&stripe, stripeLock, batch, status); +} + +void BucketCatalog::directWriteStart(const NamespaceString& ns, const OID& oid) { + invariant(!ns.isTimeseriesBucketsCollection()); + auto result = _bucketStateManager.changeBucketState( + BucketId{ns, oid}, + [](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> { + if (input.has_value()) { + return input.value().setFlag(BucketStateFlag::kPendingDirectWrite); + } + // The underlying bucket isn't tracked by the catalog, but we need to insert a state + // here so that we can conflict reopening this bucket until we've completed our write + // and the reader has refetched. + return BucketState{} + .setFlag(BucketStateFlag::kPendingDirectWrite) + .setFlag(BucketStateFlag::kUntracked); + }); + if (result && result.value().isPrepared()) { + hangTimeseriesDirectModificationBeforeWriteConflict.pauseWhileSet(); + throwWriteConflictException("Prepared bucket can no longer be inserted into."); + } + hangTimeseriesDirectModificationAfterStart.pauseWhileSet(); +} + +void BucketCatalog::directWriteFinish(const NamespaceString& ns, const OID& oid) { + invariant(!ns.isTimeseriesBucketsCollection()); + hangTimeseriesDirectModificationBeforeFinish.pauseWhileSet(); + (void)_bucketStateManager.changeBucketState( + BucketId{ns, oid}, + [](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> { + if (!input.has_value()) { + // We may have had multiple direct writes to this document in the same storage + // transaction. If so, a previous call to directWriteFinish may have cleaned up the + // state. + return boost::none; + } + if (input.value().isSet(BucketStateFlag::kUntracked)) { + // The underlying bucket is not tracked by the catalog, so we can clean up the + // state. + return boost::none; + } + return input.value() + .unsetFlag(BucketStateFlag::kPendingDirectWrite) + .setFlag(BucketStateFlag::kCleared); + }); +} + +void BucketCatalog::clear(ShouldClearFn&& shouldClear) { + if (feature_flags::gTimeseriesScalabilityImprovements.isEnabled( + serverGlobalParams.featureCompatibility)) { + _bucketStateManager.clearSetOfBuckets(std::move(shouldClear)); + return; + } + for (auto& stripe : _stripes) { + stdx::lock_guard stripeLock{stripe.mutex}; + for (auto it = stripe.allBuckets.begin(); it != stripe.allBuckets.end();) { + auto nextIt = std::next(it); + + const auto& bucket = it->second; + if (shouldClear(bucket->ns())) { + { + stdx::lock_guard catalogLock{_mutex}; + _executionStats.erase(bucket->ns()); + } + _abort(&stripe, + stripeLock, + bucket.get(), + nullptr, + getTimeseriesBucketClearedError(bucket->ns(), bucket->oid())); + } + + it = nextIt; + } + } +} + +void BucketCatalog::clear(const NamespaceString& ns) { + invariant(!ns.isTimeseriesBucketsCollection()); + clear([ns](const NamespaceString& bucketNs) { return bucketNs == ns; }); +} + +void BucketCatalog::clear(StringData dbName) { + clear([dbName = dbName.toString()](const NamespaceString& bucketNs) { + return bucketNs.db() == dbName; + }); +} + +void BucketCatalog::_appendExecutionStatsToBuilder(const ExecutionStats* stats, + BSONObjBuilder* builder) const { + builder->appendNumber("numBucketInserts", stats->numBucketInserts.load()); + builder->appendNumber("numBucketUpdates", stats->numBucketUpdates.load()); + builder->appendNumber("numBucketsOpenedDueToMetadata", + stats->numBucketsOpenedDueToMetadata.load()); + builder->appendNumber("numBucketsClosedDueToCount", stats->numBucketsClosedDueToCount.load()); + builder->appendNumber("numBucketsClosedDueToSchemaChange", + stats->numBucketsClosedDueToSchemaChange.load()); + builder->appendNumber("numBucketsClosedDueToSize", stats->numBucketsClosedDueToSize.load()); + builder->appendNumber("numBucketsClosedDueToTimeForward", + stats->numBucketsClosedDueToTimeForward.load()); + builder->appendNumber("numBucketsClosedDueToTimeBackward", + stats->numBucketsClosedDueToTimeBackward.load()); + builder->appendNumber("numBucketsClosedDueToMemoryThreshold", + stats->numBucketsClosedDueToMemoryThreshold.load()); + + auto commits = stats->numCommits.load(); + builder->appendNumber("numCommits", commits); + builder->appendNumber("numWaits", stats->numWaits.load()); + auto measurementsCommitted = stats->numMeasurementsCommitted.load(); + builder->appendNumber("numMeasurementsCommitted", measurementsCommitted); + if (commits) { + builder->appendNumber("avgNumMeasurementsPerCommit", measurementsCommitted / commits); + } + + if (feature_flags::gTimeseriesScalabilityImprovements.isEnabled( + serverGlobalParams.featureCompatibility)) { + builder->appendNumber("numBucketsClosedDueToReopening", + stats->numBucketsClosedDueToReopening.load()); + builder->appendNumber("numBucketsArchivedDueToMemoryThreshold", + stats->numBucketsArchivedDueToMemoryThreshold.load()); + builder->appendNumber("numBucketsArchivedDueToTimeBackward", + stats->numBucketsArchivedDueToTimeBackward.load()); + builder->appendNumber("numBucketsReopened", stats->numBucketsReopened.load()); + builder->appendNumber("numBucketsKeptOpenDueToLargeMeasurements", + stats->numBucketsKeptOpenDueToLargeMeasurements.load()); + builder->appendNumber("numBucketsClosedDueToCachePressure", + stats->numBucketsClosedDueToCachePressure.load()); + builder->appendNumber("numBucketsFetched", stats->numBucketsFetched.load()); + builder->appendNumber("numBucketsQueried", stats->numBucketsQueried.load()); + builder->appendNumber("numBucketFetchesFailed", stats->numBucketFetchesFailed.load()); + builder->appendNumber("numBucketQueriesFailed", stats->numBucketQueriesFailed.load()); + builder->appendNumber("numBucketReopeningsFailed", stats->numBucketReopeningsFailed.load()); + builder->appendNumber("numDuplicateBucketsReopened", + stats->numDuplicateBucketsReopened.load()); + } +} + +void BucketCatalog::appendExecutionStats(const NamespaceString& ns, BSONObjBuilder* builder) const { + invariant(!ns.isTimeseriesBucketsCollection()); + const std::shared_ptr<ExecutionStats> stats = _getExecutionStats(ns); + _appendExecutionStatsToBuilder(stats.get(), builder); +} + +void BucketCatalog::appendGlobalExecutionStats(BSONObjBuilder* builder) const { + _appendExecutionStatsToBuilder(&_globalExecutionStats, builder); +} + +void BucketCatalog::appendStateManagementStats(BSONObjBuilder* builder) const { + _bucketStateManager.appendStats(builder); +} + +long long BucketCatalog::memoryUsage() const { + return _memoryUsage.load(); +} + +StatusWith<std::pair<BucketKey, Date_t>> BucketCatalog::_extractBucketingParameters( + const NamespaceString& ns, + const StringData::ComparatorInterface* comparator, + const TimeseriesOptions& options, + const BSONObj& doc) const { + Date_t time; + BSONElement metadata; + + if (!options.getMetaField().has_value()) { + auto swTime = extractTime(doc, options.getTimeField()); + if (!swTime.isOK()) { + return swTime.getStatus(); + } + time = swTime.getValue(); + } else { + auto swDocTimeAndMeta = + extractTimeAndMeta(doc, options.getTimeField(), options.getMetaField().value()); + if (!swDocTimeAndMeta.isOK()) { + return swDocTimeAndMeta.getStatus(); + } + time = swDocTimeAndMeta.getValue().first; + metadata = swDocTimeAndMeta.getValue().second; + } + + // Buckets are spread across independently-lockable stripes to improve parallelism. We map a + // bucket to a stripe by hashing the BucketKey. + auto key = BucketKey{ns, BucketMetadata{metadata, comparator}}; + + return {std::make_pair(key, time)}; +} + +BucketCatalog::StripeNumber BucketCatalog::_getStripeNumber(const BucketKey& key) const { + if (MONGO_unlikely(alwaysUseSameBucketCatalogStripe.shouldFail())) { + return 0; + } + return key.hash % kNumberOfStripes; +} + +const Bucket* BucketCatalog::_findBucket(const Stripe& stripe, + WithLock, + const BucketId& bucketId, + IgnoreBucketState mode) { + auto it = stripe.allBuckets.find(bucketId); + if (it != stripe.allBuckets.end()) { + if (mode == IgnoreBucketState::kYes) { + return it->second.get(); + } + + if (auto state = _bucketStateManager.getBucketState(it->second.get()); + state && !state.value().conflictsWithInsertion()) { + return it->second.get(); + } + } + return nullptr; +} + +Bucket* BucketCatalog::_useBucket(Stripe* stripe, + WithLock stripeLock, + const BucketId& bucketId, + IgnoreBucketState mode) { + return const_cast<Bucket*>(_findBucket(*stripe, stripeLock, bucketId, mode)); +} + +Bucket* BucketCatalog::_useBucketAndChangeState(Stripe* stripe, + WithLock stripeLock, + const BucketId& bucketId, + const BucketStateManager::StateChangeFn& change) { + auto it = stripe->allBuckets.find(bucketId); + if (it != stripe->allBuckets.end()) { + if (auto state = _bucketStateManager.changeBucketState(it->second.get(), change); + state && !state.value().conflictsWithInsertion()) { + return it->second.get(); + } + } + return nullptr; +} + +Bucket* BucketCatalog::_useBucket(Stripe* stripe, + WithLock stripeLock, + const CreationInfo& info, + AllowBucketCreation mode) { + auto it = stripe->openBuckets.find(info.key); + if (it == stripe->openBuckets.end()) { + // No open bucket for this metadata. + return mode == AllowBucketCreation::kYes ? _allocateBucket(stripe, stripeLock, info) + : nullptr; + } + + auto& openSet = it->second; + Bucket* bucket = nullptr; + for (Bucket* potentialBucket : openSet) { + if (potentialBucket->_rolloverAction == RolloverAction::kNone) { + bucket = potentialBucket; + break; + } + } + if (!bucket) { + return mode == AllowBucketCreation::kYes ? _allocateBucket(stripe, stripeLock, info) + : nullptr; + } + + if (auto state = _bucketStateManager.getBucketState(bucket); + state && !state.value().conflictsWithInsertion()) { + _markBucketNotIdle(stripe, stripeLock, bucket); + return bucket; + } + + _abort(stripe, + stripeLock, + bucket, + nullptr, + getTimeseriesBucketClearedError(bucket->ns(), bucket->oid())); + + return mode == AllowBucketCreation::kYes ? _allocateBucket(stripe, stripeLock, info) : nullptr; +} + +Bucket* BucketCatalog::_useAlternateBucket(Stripe* stripe, + WithLock stripeLock, + const CreationInfo& info) { + auto it = stripe->openBuckets.find(info.key); + if (it == stripe->openBuckets.end()) { + // No open bucket for this metadata. + return nullptr; + } + + auto& openSet = it->second; + // In order to potentially erase elements of the set while we iterate it (via _abort), we need + // to advance the iterator before we call erase. This means we can't use the more + // straightforward range iteration, and use the somewhat awkward pattern below. + for (auto it = openSet.begin(); it != openSet.end();) { + Bucket* potentialBucket = *it++; + + if (potentialBucket->_rolloverAction == RolloverAction::kNone || + potentialBucket->_rolloverAction == RolloverAction::kHardClose) { + continue; + } + + auto bucketTime = potentialBucket->getTime(); + if (info.time - bucketTime >= Seconds(*info.options.getBucketMaxSpanSeconds()) || + info.time < bucketTime) { + continue; + } + + auto state = _bucketStateManager.getBucketState(potentialBucket); + invariant(state); + if (!state.value().conflictsWithInsertion()) { + invariant(!potentialBucket->_idleListEntry.has_value()); + return potentialBucket; + } + + // If we still have an entry for the bucket in the open set, but it conflicts with + // insertion, then it must have been cleared, and we can clean it up. + invariant(state.value().isSet(BucketStateFlag::kCleared)); + _abort(stripe, + stripeLock, + potentialBucket, + nullptr, + getTimeseriesBucketClearedError(potentialBucket->bucketId().ns, + potentialBucket->bucketId().oid)); + } + + return nullptr; +} + +StatusWith<std::unique_ptr<Bucket>> BucketCatalog::_rehydrateBucket( + OperationContext* opCtx, + const NamespaceString& ns, + const StringData::ComparatorInterface* comparator, + const TimeseriesOptions& options, + const BucketToReopen& bucketToReopen, + boost::optional<const BucketKey&> expectedKey) { + invariant(feature_flags::gTimeseriesScalabilityImprovements.isEnabled( + serverGlobalParams.featureCompatibility)); + const auto& [bucketDoc, validator, catalogEra] = bucketToReopen; + if (catalogEra < _bucketStateManager.getEra()) { + return {ErrorCodes::WriteConflict, "Bucket is from an earlier era, may be outdated"}; + } + + BSONElement bucketIdElem = bucketDoc.getField(kBucketIdFieldName); + if (bucketIdElem.eoo() || bucketIdElem.type() != BSONType::jstOID) { + return {ErrorCodes::BadValue, + str::stream() << kBucketIdFieldName << " is missing or not an ObjectId"}; + } + + // Validate the bucket document against the schema. + auto result = validator(opCtx, bucketDoc); + if (result.first != Collection::SchemaValidationResult::kPass) { + return result.second; + } + + auto controlField = bucketDoc.getObjectField(kBucketControlFieldName); + auto closedElem = controlField.getField(kBucketControlClosedFieldName); + if (closedElem.booleanSafe()) { + return {ErrorCodes::BadValue, + "Bucket has been marked closed and is not eligible for reopening"}; + } + + BSONElement metadata; + auto metaFieldName = options.getMetaField(); + if (metaFieldName) { + metadata = bucketDoc.getField(kBucketMetaFieldName); + } + + // Buckets are spread across independently-lockable stripes to improve parallelism. We map a + // bucket to a stripe by hashing the BucketKey. + auto key = BucketKey{ns, BucketMetadata{metadata, comparator}}; + if (expectedKey.has_value() && key != expectedKey.value()) { + return {ErrorCodes::BadValue, "Bucket metadata does not match (hash collision)"}; + } + auto stripeNumber = _getStripeNumber(key); + + BucketId bucketId{key.ns, bucketIdElem.OID()}; + std::unique_ptr<Bucket> bucket = + std::make_unique<Bucket>(bucketId, stripeNumber, key.hash, &_bucketStateManager); + + const bool isCompressed = isCompressedBucket(bucketDoc); + + // Initialize the remaining member variables from the bucket document. + bucket->_metadata = key.metadata; + bucket->_timeField = options.getTimeField().toString(); + if (isCompressed) { + auto decompressed = decompressBucket(bucketDoc); + if (!decompressed.has_value()) { + return Status{ErrorCodes::BadValue, "Bucket could not be decompressed"}; + } + bucket->_size = decompressed.value().objsize(); + bucket->_decompressed = DecompressionResult{bucketDoc, decompressed.value()}; + bucket->_memoryUsage += (decompressed.value().objsize() + bucketDoc.objsize()); + } else { + bucket->_size = bucketDoc.objsize(); + } + bucket->_minTime = controlField.getObjectField(kBucketControlMinFieldName) + .getField(options.getTimeField()) + .Date(); + + // Populate the top-level data field names. + const BSONObj& dataObj = bucketDoc.getObjectField(kBucketDataFieldName); + for (const BSONElement& dataElem : dataObj) { + auto hashedKey = StringSet::hasher().hashed_key(dataElem.fieldName()); + bucket->_fieldNames.emplace(hashedKey); + } + + auto swMinMax = generateMinMaxFromBucketDoc(bucketDoc, comparator); + if (!swMinMax.isOK()) { + return swMinMax.getStatus(); + } + bucket->_minmax = std::move(swMinMax.getValue()); + + auto swSchema = generateSchemaFromBucketDoc(bucketDoc, comparator); + if (!swSchema.isOK()) { + return swSchema.getStatus(); + } + bucket->_schema = std::move(swSchema.getValue()); + + uint32_t numMeasurements = 0; + const BSONElement timeColumnElem = dataObj.getField(options.getTimeField()); + + if (isCompressed && timeColumnElem.type() == BSONType::BinData) { + BSONColumn storage{timeColumnElem}; + numMeasurements = storage.size(); + } else if (timeColumnElem.isABSONObj()) { + numMeasurements = timeColumnElem.Obj().nFields(); + } else { + return {ErrorCodes::BadValue, + "Bucket data field is malformed (missing a valid time column)"}; + } + + bucket->_numMeasurements = numMeasurements; + bucket->_numCommittedMeasurements = numMeasurements; + + // The namespace is stored two times: the bucket itself and openBuckets. We don't have a great + // approximation for the _schema or _minmax data structure size, so we use the control field + // size as an approximation for _minmax, and half that size for _schema. Since the metadata + // is stored in the bucket, we need to add that as well. A unique pointer to the bucket is + // stored once: allBuckets. A raw pointer to the bucket is stored at most twice: openBuckets, + // idleBuckets. + bucket->_memoryUsage += (key.ns.size() * 2) + 1.5 * controlField.objsize() + + key.metadata.toBSON().objsize() + sizeof(Bucket) + sizeof(std::unique_ptr<Bucket>) + + (sizeof(Bucket*) * 2); + + return {std::move(bucket)}; +} + +StatusWith<Bucket*> BucketCatalog::_reopenBucket(Stripe* stripe, + WithLock stripeLock, + ExecutionStatsController stats, + const BucketKey& key, + std::unique_ptr<Bucket>&& bucket, + std::uint64_t targetEra, + ClosedBuckets* closedBuckets) { + invariant(bucket); + + _expireIdleBuckets(stripe, stripeLock, stats, closedBuckets); + + // We may need to initialize the bucket's state. + bool conflicts = false; + auto initializeStateFn = + [targetEra, &conflicts](boost::optional<BucketState> input, + std::uint64_t currentEra) -> boost::optional<BucketState> { + if (targetEra < currentEra || + (input.has_value() && input.value().conflictsWithReopening())) { + conflicts = true; + return input; + } + conflicts = false; + return input.has_value() ? input.value() : BucketState{}; + }; + + auto state = _bucketStateManager.changeBucketState(bucket->bucketId(), initializeStateFn); + if (conflicts) { + return {ErrorCodes::WriteConflict, "Bucket may be stale"}; + } + + // If this bucket was archived, we need to remove it from the set of archived buckets. + if (auto setIt = stripe->archivedBuckets.find(key.hash); + setIt != stripe->archivedBuckets.end()) { + auto& archivedSet = setIt->second; + if (auto bucketIt = archivedSet.find(bucket->getTime()); + bucketIt != archivedSet.end() && bucket->bucketId() == bucketIt->second.bucketId) { + long long memory = + _marginalMemoryUsageForArchivedBucket(bucketIt->second, archivedSet.size() == 1); + if (archivedSet.size() == 1) { + stripe->archivedBuckets.erase(setIt); + } else { + archivedSet.erase(bucketIt); + } + _memoryUsage.fetchAndSubtract(memory); + _numberOfActiveBuckets.fetchAndSubtract(1); + } + } + + // Pass ownership of the reopened bucket to the bucket catalog. + auto [insertedIt, newlyInserted] = + stripe->allBuckets.try_emplace(bucket->bucketId(), std::move(bucket)); + invariant(newlyInserted); + Bucket* unownedBucket = insertedIt->second.get(); + + // If we already have an open bucket for this key, we need to close it. + if (auto it = stripe->openBuckets.find(key); it != stripe->openBuckets.end()) { + auto& openSet = it->second; + for (Bucket* existingBucket : openSet) { + if (existingBucket->_rolloverAction == RolloverAction::kNone) { + stats.incNumBucketsClosedDueToReopening(); + if (existingBucket->allCommitted()) { + constexpr bool eligibleForReopening = true; + closedBuckets->emplace_back( + ClosedBucket{&_bucketStateManager, + existingBucket->bucketId(), + existingBucket->getTimeField().toString(), + existingBucket->numMeasurements(), + eligibleForReopening}); + _removeBucket(stripe, stripeLock, existingBucket, RemovalMode::kClose); + } else { + existingBucket->setRolloverAction(RolloverAction::kSoftClose); + } + // We should only have one open bucket at a time. + break; + } + } + } + + // Now actually mark this bucket as open. + stripe->openBuckets[key].emplace(unownedBucket); + stats.incNumBucketsReopened(); + + _memoryUsage.addAndFetch(unownedBucket->_memoryUsage); + _numberOfActiveBuckets.fetchAndAdd(1); + + return unownedBucket; +} + +StatusWith<Bucket*> BucketCatalog::_reuseExistingBucket(Stripe* stripe, + WithLock stripeLock, + ExecutionStatsController* stats, + const BucketKey& key, + Bucket* existingBucket, + std::uint64_t targetEra) { + invariant(existingBucket); + + // If we have an existing bucket, passing the Bucket* will let us check if the bucket was + // cleared as part of a set since the last time it was used. If we were to just check by + // OID, we may miss if e.g. there was a move chunk operation. + bool conflicts = false; + auto state = _bucketStateManager.changeBucketState( + existingBucket, + [targetEra, &conflicts](boost::optional<BucketState> input, + std::uint64_t currentEra) -> boost::optional<BucketState> { + if (targetEra < currentEra || + (input.has_value() && input.value().conflictsWithReopening())) { + conflicts = true; + return input; + } + conflicts = false; + return input.has_value() ? input.value() : BucketState{}; + }); + if (state.has_value() && state.value().isSet(BucketStateFlag::kCleared)) { + _abort(stripe, + stripeLock, + existingBucket, + nullptr, + getTimeseriesBucketClearedError(existingBucket->_bucketId.ns, + existingBucket->_bucketId.oid)); + conflicts = true; + } + if (conflicts) { + return {ErrorCodes::WriteConflict, "Bucket may be stale"}; + } + + // It's possible to have two buckets with the same ID in different collections, so let's make + // extra sure the existing bucket is the right one. + if (existingBucket->ns() != key.ns) { + return {ErrorCodes::BadValue, "Cannot re-use bucket: same ID but different namespace"}; + } + + // If the bucket was already open, wasn't cleared, the state didn't conflict with reopening, and + // the namespace matches, then we can simply return it. + stats->incNumDuplicateBucketsReopened(); + _markBucketNotIdle(stripe, stripeLock, existingBucket); + + return existingBucket; +} + +StatusWith<BucketCatalog::InsertResult> BucketCatalog::_insert( + OperationContext* opCtx, + const NamespaceString& ns, + const StringData::ComparatorInterface* comparator, + const TimeseriesOptions& options, + const BSONObj& doc, + CombineWithInsertsFromOtherClients combine, + AllowBucketCreation mode, + BucketFindResult bucketFindResult) { + invariant(!ns.isTimeseriesBucketsCollection()); + + auto res = _extractBucketingParameters(ns, comparator, options, doc); + if (!res.isOK()) { + return res.getStatus(); + } + auto& key = res.getValue().first; + auto time = res.getValue().second; + + ExecutionStatsController stats = _getExecutionStats(ns); + _updateBucketFetchAndQueryStats(stats, bucketFindResult); + + // Buckets are spread across independently-lockable stripes to improve parallelism. We map a + // bucket to a stripe by hashing the BucketKey. + auto stripeNumber = _getStripeNumber(key); + + InsertResult result; + result.catalogEra = _bucketStateManager.getEra(); + CreationInfo info{key, stripeNumber, time, options, stats, &result.closedBuckets}; + boost::optional<BucketToReopen> bucketToReopen = std::move(bucketFindResult.bucketToReopen); + + auto rehydratedBucket = bucketToReopen.has_value() + ? _rehydrateBucket(opCtx, ns, comparator, options, bucketToReopen.value(), key) + : StatusWith<std::unique_ptr<Bucket>>{ErrorCodes::BadValue, "No bucket to rehydrate"}; + if (rehydratedBucket.getStatus().code() == ErrorCodes::WriteConflict) { + stats.incNumBucketReopeningsFailed(); + return rehydratedBucket.getStatus(); + } + + auto& stripe = _stripes[stripeNumber]; + stdx::lock_guard stripeLock{stripe.mutex}; + + if (rehydratedBucket.isOK()) { + invariant(mode == AllowBucketCreation::kYes); + hangTimeseriesInsertBeforeReopeningBucket.pauseWhileSet(); + + StatusWith<Bucket*> swBucket{nullptr}; + auto existingIt = stripe.allBuckets.find(rehydratedBucket.getValue()->bucketId()); + if (existingIt != stripe.allBuckets.end()) { + // First let's check the existing bucket if we have one. + Bucket* existingBucket = existingIt->second.get(); + swBucket = _reuseExistingBucket( + &stripe, stripeLock, &stats, key, existingBucket, bucketToReopen->catalogEra); + } else { + // No existing bucket to use, go ahead and try to reopen our rehydrated bucket. + swBucket = _reopenBucket(&stripe, + stripeLock, + stats, + key, + std::move(rehydratedBucket.getValue()), + bucketToReopen->catalogEra, + &result.closedBuckets); + } + + if (swBucket.isOK()) { + Bucket* bucket = swBucket.getValue(); + invariant(bucket); + auto insertionResult = _insertIntoBucket(opCtx, + &stripe, + stripeLock, + doc, + combine, + mode, + &info, + bucket, + &result.closedBuckets); + auto* batch = stdx::get_if<std::shared_ptr<WriteBatch>>(&insertionResult); + invariant(batch); + result.batch = *batch; + + return std::move(result); + } else { + stats.incNumBucketReopeningsFailed(); + if (swBucket.getStatus().code() == ErrorCodes::WriteConflict) { + return swBucket.getStatus(); + } + // If we had a different type of error, then we should fall through and proceed to open + // a new bucket. + } + } + + Bucket* bucket = _useBucket(&stripe, stripeLock, info, mode); + if (!bucket) { + invariant(mode == AllowBucketCreation::kNo); + constexpr bool allowQueryBasedReopening = true; + result.candidate = + _getReopeningCandidate(&stripe, stripeLock, info, allowQueryBasedReopening); + return std::move(result); + } + + auto insertionResult = _insertIntoBucket( + opCtx, &stripe, stripeLock, doc, combine, mode, &info, bucket, &result.closedBuckets); + if (auto* reason = stdx::get_if<RolloverReason>(&insertionResult)) { + invariant(mode == AllowBucketCreation::kNo); + if (bucket->allCommitted()) { + _markBucketIdle(&stripe, stripeLock, bucket); + } + + // If we were time forward or backward, we might be able to "reopen" a bucket we still have + // in memory that's set to be closed when pending operations finish. + if ((*reason == RolloverReason::kTimeBackward || *reason == RolloverReason::kTimeForward)) { + if (Bucket* alternate = _useAlternateBucket(&stripe, stripeLock, info)) { + insertionResult = _insertIntoBucket(opCtx, + &stripe, + stripeLock, + doc, + combine, + mode, + &info, + alternate, + &result.closedBuckets); + if (auto* batch = stdx::get_if<std::shared_ptr<WriteBatch>>(&insertionResult)) { + result.batch = *batch; + return std::move(result); + } + + // We weren't able to insert into the other bucket, so fall through to the regular + // reopening procedure. + } + } + + bool allowQueryBasedReopening = (*reason == RolloverReason::kTimeBackward); + result.candidate = + _getReopeningCandidate(&stripe, stripeLock, info, allowQueryBasedReopening); + } else { + result.batch = *stdx::get_if<std::shared_ptr<WriteBatch>>(&insertionResult); + } + return std::move(result); +} + +stdx::variant<std::shared_ptr<WriteBatch>, RolloverReason> BucketCatalog::_insertIntoBucket( + OperationContext* opCtx, + Stripe* stripe, + WithLock stripeLock, + const BSONObj& doc, + CombineWithInsertsFromOtherClients combine, + AllowBucketCreation mode, + CreationInfo* info, + Bucket* bucket, + ClosedBuckets* closedBuckets) { + NewFieldNames newFieldNamesToBeInserted; + int32_t sizeToBeAdded = 0; + const auto previousMemoryUsage = bucket->_memoryUsage; + + bool isNewlyOpenedBucket = (bucket->_size == 0); + if (!isNewlyOpenedBucket) { + auto [action, reason] = _determineRolloverAction( + opCtx, doc, info, bucket, &newFieldNamesToBeInserted, &sizeToBeAdded, mode); + if ((action == RolloverAction::kSoftClose || action == RolloverAction::kArchive) && + mode == AllowBucketCreation::kNo) { + // We don't actually want to roll this bucket over yet, bail out. + return reason; + } else if (action != RolloverAction::kNone) { + info->openedDuetoMetadata = false; + bucket = _rollover(stripe, stripeLock, bucket, *info, action); + isNewlyOpenedBucket = true; + } + } + if (isNewlyOpenedBucket) { + bucket->_calculateBucketFieldsAndSizeChange( + doc, info->options.getMetaField(), &newFieldNamesToBeInserted, &sizeToBeAdded); + } + + auto batch = bucket->_activeBatch(getOpId(opCtx, combine), info->stats); + batch->_addMeasurement(doc); + batch->_recordNewFields(bucket, std::move(newFieldNamesToBeInserted)); + + bucket->_numMeasurements++; + bucket->_size += sizeToBeAdded; + if (isNewlyOpenedBucket) { + // The metadata only needs to be set if this bucket was newly created. + bucket->_metadata = info->key.metadata; + + // The namespace is stored two times: the bucket itself and openBuckets. + // We don't have a great approximation for the + // _schema size, so we use initial document size minus metadata as an approximation. Since + // the metadata itself is stored once, in the bucket, we can combine the two and just use + // the initial document size. A unique pointer to the bucket is stored once: allBuckets. A + // raw pointer to the bucket is stored at most twice: openBuckets, idleBuckets. + bucket->_memoryUsage += (info->key.ns.size() * 2) + doc.objsize() + sizeof(Bucket) + + sizeof(std::unique_ptr<Bucket>) + (sizeof(Bucket*) * 2); + + auto updateStatus = bucket->_schema.update( + doc, info->options.getMetaField(), info->key.metadata.getComparator()); + invariant(updateStatus == Schema::UpdateStatus::Updated); + } else { + _memoryUsage.fetchAndSubtract(previousMemoryUsage); + } + _memoryUsage.fetchAndAdd(bucket->_memoryUsage); + + return batch; +} + +void BucketCatalog::_waitToCommitBatch(Stripe* stripe, const std::shared_ptr<WriteBatch>& batch) { + while (true) { + std::shared_ptr<WriteBatch> current; + + { + stdx::lock_guard stripeLock{stripe->mutex}; + Bucket* bucket = + _useBucket(stripe, stripeLock, batch->bucket().bucketId, IgnoreBucketState::kNo); + if (!bucket || batch->finished()) { + return; + } + + current = bucket->_preparedBatch; + if (!current) { + // No other batches for this bucket are currently committing, so we can proceed. + bucket->_preparedBatch = batch; + bucket->_batches.erase(batch->_opId); + return; + } + } + + // We only hit this failpoint when there are conflicting prepared batches on the same + // bucket. + hangWaitingForConflictingPreparedBatch.pauseWhileSet(); + + // We have to wait for someone else to finish. + current->getResult().getStatus().ignore(); // We don't care about the result. + } +} + +void BucketCatalog::_removeBucket(Stripe* stripe, + WithLock stripeLock, + Bucket* bucket, + RemovalMode mode) { + invariant(bucket->_batches.empty()); + invariant(!bucket->_preparedBatch); + + auto allIt = stripe->allBuckets.find(bucket->bucketId()); + invariant(allIt != stripe->allBuckets.end()); + + _memoryUsage.fetchAndSubtract(bucket->_memoryUsage); + _markBucketNotIdle(stripe, stripeLock, bucket); + + // If the bucket was rolled over, then there may be a different open bucket for this metadata. + auto openIt = stripe->openBuckets.find({bucket->ns(), bucket->_metadata}); + if (openIt != stripe->openBuckets.end()) { + auto& openSet = openIt->second; + auto bucketIt = openSet.find(bucket); + if (bucketIt != openSet.end()) { + if (openSet.size() == 1) { + stripe->openBuckets.erase(openIt); + } else { + openSet.erase(bucketIt); + } + } + } + + // If we are cleaning up while archiving a bucket, then we want to preserve its state. Otherwise + // we can remove the state from the catalog altogether. + switch (mode) { + case RemovalMode::kClose: { + auto state = _bucketStateManager.getBucketState(bucket->bucketId()); + invariant(state.has_value()); + invariant(state.value().isSet(BucketStateFlag::kPendingCompression)); + break; + } + case RemovalMode::kAbort: + _bucketStateManager.changeBucketState( + bucket->bucketId(), + [](boost::optional<BucketState> input, + std::uint64_t) -> boost::optional<BucketState> { + invariant(input.has_value()); + if (input->conflictsWithReopening()) { + return input.value().setFlag(BucketStateFlag::kUntracked); + } + return boost::none; + }); + break; + case RemovalMode::kArchive: + // No state change + break; + } + + _numberOfActiveBuckets.fetchAndSubtract(1); + stripe->allBuckets.erase(allIt); +} + +void BucketCatalog::_archiveBucket(Stripe* stripe, + WithLock stripeLock, + Bucket* bucket, + ClosedBuckets* closedBuckets) { + bool archived = false; + auto& archivedSet = stripe->archivedBuckets[bucket->keyHash()]; + auto it = archivedSet.find(bucket->getTime()); + if (it == archivedSet.end()) { + auto [it, inserted] = archivedSet.emplace( + bucket->getTime(), + ArchivedBucket{bucket->bucketId(), bucket->getTimeField().toString()}); + + long long memory = + _marginalMemoryUsageForArchivedBucket(it->second, archivedSet.size() == 1); + _memoryUsage.fetchAndAdd(memory); + archived = true; + } + + RemovalMode mode = RemovalMode::kArchive; + if (archived) { + // If we have an archived bucket, we still want to account for it in numberOfActiveBuckets + // so we will increase it here since removeBucket decrements the count. + _numberOfActiveBuckets.fetchAndAdd(1); + } else { + // We had a meta hash collision, and already have a bucket archived with the same meta hash + // and timestamp as this bucket. Since it's somewhat arbitrary which bucket we keep, we'll + // keep the one that's already archived and just plain close this one. + mode = RemovalMode::kClose; + constexpr bool eligibleForReopening = true; + closedBuckets->emplace_back(ClosedBucket{&_bucketStateManager, + bucket->bucketId(), + bucket->getTimeField().toString(), + bucket->numMeasurements(), + eligibleForReopening}); + } + + _removeBucket(stripe, stripeLock, bucket, mode); +} + +boost::optional<OID> BucketCatalog::_findArchivedCandidate(Stripe* stripe, + WithLock stripeLock, + const CreationInfo& info) { + auto setIt = stripe->archivedBuckets.find(info.key.hash); + if (setIt == stripe->archivedBuckets.end()) { + return boost::none; + } + + auto& archivedSet = setIt->second; + + // We want to find the largest time that is not greater than info.time. Generally lower_bound + // will return the smallest element not less than the search value, but we are using + // std::greater instead of std::less for the map's comparisons. This means the order of keys + // will be reversed, and lower_bound will return what we want. + auto it = archivedSet.lower_bound(info.time); + if (it == archivedSet.end()) { + return boost::none; + } + + const auto& [candidateTime, candidateBucket] = *it; + invariant(candidateTime <= info.time); + // We need to make sure our measurement can fit without violating max span. If not, we + // can't use this bucket. + if (info.time - candidateTime < Seconds(*info.options.getBucketMaxSpanSeconds())) { + auto state = _bucketStateManager.getBucketState(candidateBucket.bucketId); + if (state && !state.value().conflictsWithReopening()) { + return candidateBucket.bucketId.oid; + } else { + if (state) { + _bucketStateManager.changeBucketState( + candidateBucket.bucketId, + [](boost::optional<BucketState> input, + std::uint64_t) -> boost::optional<BucketState> { + if (!input.has_value()) { + return boost::none; + } + invariant(input.value().conflictsWithReopening()); + return input.value().setFlag(BucketStateFlag::kUntracked); + }); + } + long long memory = + _marginalMemoryUsageForArchivedBucket(candidateBucket, archivedSet.size() == 1); + if (archivedSet.size() == 1) { + stripe->archivedBuckets.erase(setIt); + } else { + archivedSet.erase(it); + } + _memoryUsage.fetchAndSubtract(memory); + _numberOfActiveBuckets.fetchAndSubtract(1); + } + } + + return boost::none; +} + +stdx::variant<std::monostate, OID, BSONObj> BucketCatalog::_getReopeningCandidate( + Stripe* stripe, WithLock stripeLock, const CreationInfo& info, bool allowQueryBasedReopening) { + if (auto archived = _findArchivedCandidate(stripe, stripeLock, info)) { + return archived.value(); + } + + if (!allowQueryBasedReopening) { + return {}; + } + + boost::optional<BSONElement> metaElement; + if (info.options.getMetaField().has_value()) { + metaElement = info.key.metadata.element(); + } + + auto controlMinTimePath = kControlMinFieldNamePrefix.toString() + info.options.getTimeField(); + + return generateReopeningFilters( + info.time, metaElement, controlMinTimePath, *info.options.getBucketMaxSpanSeconds()); +} + +void BucketCatalog::_abort(Stripe* stripe, + WithLock stripeLock, + std::shared_ptr<WriteBatch> batch, + const Status& status) { + // Before we access the bucket, make sure it's still there. + Bucket* bucket = + _useBucket(stripe, stripeLock, batch->bucket().bucketId, IgnoreBucketState::kYes); + if (!bucket) { + // Special case, bucket has already been cleared, and we need only abort this batch. + batch->_abort(status); + return; + } + + // Proceed to abort any unprepared batches and remove the bucket if possible + _abort(stripe, stripeLock, bucket, batch, status); +} + +void BucketCatalog::_abort(Stripe* stripe, + WithLock stripeLock, + Bucket* bucket, + std::shared_ptr<WriteBatch> batch, + const Status& status) { + // Abort any unprepared batches. This should be safe since we have a lock on the stripe, + // preventing anyone else from using these. + for (const auto& [_, current] : bucket->_batches) { + current->_abort(status); + } + bucket->_batches.clear(); + + bool doRemove = true; // We shouldn't remove the bucket if there's a prepared batch outstanding + // and it's not the one we manage. In that case, we don't know what the + // user is doing with it, but we need to keep the bucket around until + // that batch is finished. + if (auto& prepared = bucket->_preparedBatch) { + if (batch && prepared == batch) { + // We own the prepared batch, so we can go ahead and abort it and remove the bucket. + prepared->_abort(status); + prepared.reset(); + } else { + doRemove = false; + } + } + + if (doRemove) { + _removeBucket(stripe, stripeLock, bucket, RemovalMode::kAbort); + } else { + _bucketStateManager.changeBucketState( + bucket->bucketId(), + [](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> { + invariant(input.has_value()); + return input.value().setFlag(BucketStateFlag::kCleared); + }); + } +} + +void BucketCatalog::_compressionDone(const BucketId& bucketId) { + _bucketStateManager.changeBucketState( + bucketId, + [](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> { + return boost::none; + }); +} + +void BucketCatalog::_markBucketIdle(Stripe* stripe, WithLock stripeLock, Bucket* bucket) { + invariant(bucket); + invariant(!bucket->_idleListEntry.has_value()); + invariant(bucket->allCommitted()); + stripe->idleBuckets.push_front(bucket); + bucket->_idleListEntry = stripe->idleBuckets.begin(); +} + +void BucketCatalog::_markBucketNotIdle(Stripe* stripe, WithLock stripeLock, Bucket* bucket) { + invariant(bucket); + if (bucket->_idleListEntry.has_value()) { + stripe->idleBuckets.erase(bucket->_idleListEntry.value()); + bucket->_idleListEntry = boost::none; + } +} + +void BucketCatalog::_expireIdleBuckets(Stripe* stripe, + WithLock stripeLock, + ExecutionStatsController& stats, + ClosedBuckets* closedBuckets) { + // As long as we still need space and have entries and remaining attempts, close idle buckets. + int32_t numExpired = 0; + + const bool canArchive = feature_flags::gTimeseriesScalabilityImprovements.isEnabled( + serverGlobalParams.featureCompatibility); + constexpr bool eligibleForReopening{true}; + + while (!stripe->idleBuckets.empty() && + _memoryUsage.load() > getTimeseriesIdleBucketExpiryMemoryUsageThresholdBytes() && + numExpired <= gTimeseriesIdleBucketExpiryMaxCountPerAttempt) { + Bucket* bucket = stripe->idleBuckets.back(); + + auto state = _bucketStateManager.getBucketState(bucket); + if (canArchive && state && !state.value().conflictsWithInsertion()) { + // Can archive a bucket if it's still eligible for insertions. + _archiveBucket(stripe, stripeLock, bucket, closedBuckets); + stats.incNumBucketsArchivedDueToMemoryThreshold(); + } else if (state && state.value().isSet(BucketStateFlag::kCleared)) { + // Bucket was cleared and just needs to be removed from catalog. + _removeBucket(stripe, stripeLock, bucket, RemovalMode::kAbort); + } else { + closedBuckets->emplace_back(ClosedBucket{&_bucketStateManager, + bucket->bucketId(), + bucket->getTimeField().toString(), + bucket->numMeasurements(), + eligibleForReopening}); + _removeBucket(stripe, stripeLock, bucket, RemovalMode::kClose); + stats.incNumBucketsClosedDueToMemoryThreshold(); + } + + ++numExpired; + } + + while (canArchive && !stripe->archivedBuckets.empty() && + _memoryUsage.load() > getTimeseriesIdleBucketExpiryMemoryUsageThresholdBytes() && + numExpired <= gTimeseriesIdleBucketExpiryMaxCountPerAttempt) { + + auto& [hash, archivedSet] = *stripe->archivedBuckets.begin(); + invariant(!archivedSet.empty()); + + auto& [timestamp, bucket] = *archivedSet.begin(); + closedBuckets->emplace_back(ClosedBucket{&_bucketStateManager, + bucket.bucketId, + bucket.timeField, + boost::none, + eligibleForReopening}); + + long long memory = _marginalMemoryUsageForArchivedBucket(bucket, archivedSet.size() == 1); + if (archivedSet.size() == 1) { + // If this is the only entry, erase the whole map so we don't leave it empty. + stripe->archivedBuckets.erase(stripe->archivedBuckets.begin()); + } else { + // Otherwise just erase this bucket from the map. + archivedSet.erase(archivedSet.begin()); + } + _memoryUsage.fetchAndSubtract(memory); + _numberOfActiveBuckets.fetchAndSubtract(1); + + stats.incNumBucketsClosedDueToMemoryThreshold(); + ++numExpired; + } +} + +Bucket* BucketCatalog::_allocateBucket(Stripe* stripe, + WithLock stripeLock, + const CreationInfo& info) { + _expireIdleBuckets(stripe, stripeLock, info.stats, info.closedBuckets); + + auto [oid, roundedTime] = generateBucketOID(info.time, info.options); + auto bucketId = BucketId{info.key.ns, oid}; + + auto [it, inserted] = stripe->allBuckets.try_emplace( + bucketId, + std::make_unique<Bucket>(bucketId, info.stripe, info.key.hash, &_bucketStateManager)); + tassert(6130900, "Expected bucket to be inserted", inserted); + Bucket* bucket = it->second.get(); + stripe->openBuckets[info.key].emplace(bucket); + + auto state = _bucketStateManager.changeBucketState( + bucketId, + [](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> { + invariant(!input.has_value()); + return BucketState{}; + }); + invariant(state == BucketState{}); + _numberOfActiveBuckets.fetchAndAdd(1); + + if (info.openedDuetoMetadata) { + info.stats.incNumBucketsOpenedDueToMetadata(); + } + + bucket->_timeField = info.options.getTimeField().toString(); + bucket->_minTime = roundedTime; + + // Make sure we set the control.min time field to match the rounded _id timestamp. + auto controlDoc = buildControlMinTimestampDoc(info.options.getTimeField(), roundedTime); + bucket->_minmax.update( + controlDoc, bucket->_metadata.getMetaField(), bucket->_metadata.getComparator()); + return bucket; +} + +std::pair<RolloverAction, RolloverReason> BucketCatalog::_determineRolloverAction( + OperationContext* opCtx, + const BSONObj& doc, + CreationInfo* info, + Bucket* bucket, + NewFieldNames* newFieldNamesToBeInserted, + int32_t* sizeToBeAdded, + AllowBucketCreation mode) { + // If the mode is enabled to create new buckets, then we should update stats for soft closures + // accordingly. If we specify the mode to not allow bucket creation, it means we are not sure if + // we want to soft close the bucket yet and should wait to update closure stats. + const bool shouldUpdateStats = (mode == AllowBucketCreation::kYes); + + auto bucketTime = bucket->getTime(); + if (info->time - bucketTime >= Seconds(*info->options.getBucketMaxSpanSeconds())) { + if (shouldUpdateStats) { + info->stats.incNumBucketsClosedDueToTimeForward(); + } + return {RolloverAction::kSoftClose, RolloverReason::kTimeForward}; + } + if (info->time < bucketTime) { + const bool canArchive = feature_flags::gTimeseriesScalabilityImprovements.isEnabled( + serverGlobalParams.featureCompatibility); + if (shouldUpdateStats) { + if (canArchive) { + info->stats.incNumBucketsArchivedDueToTimeBackward(); + } else { + info->stats.incNumBucketsClosedDueToTimeBackward(); + } + } + return {canArchive ? RolloverAction::kArchive : RolloverAction::kSoftClose, + RolloverReason::kTimeBackward}; + } + if (bucket->_numMeasurements == static_cast<std::uint64_t>(gTimeseriesBucketMaxCount)) { + info->stats.incNumBucketsClosedDueToCount(); + return {RolloverAction::kHardClose, RolloverReason::kCount}; + } + + // In scenarios where we have a high cardinality workload and face increased cache pressure we + // will decrease the size of buckets before we close them. + int32_t cacheDerivedBucketMaxSize = getCacheDerivedBucketMaxSize( + opCtx->getServiceContext()->getStorageEngine(), _numberOfActiveBuckets.load()); + int32_t effectiveMaxSize = std::min(gTimeseriesBucketMaxSize, cacheDerivedBucketMaxSize); + + // Before we hit our bucket minimum count, we will allow for large measurements to be inserted + // into buckets. Instead of packing the bucket to the BSON size limit, 16MB, we'll limit the max + // bucket size to 12MB. This is to leave some space in the bucket if we need to add new internal + // fields to existing, full buckets. + static constexpr int32_t largeMeasurementsMaxBucketSize = + BSONObjMaxUserSize - (4 * 1024 * 1024); + // We restrict the ceiling of the bucket max size under cache pressure. + int32_t absoluteMaxSize = std::min(largeMeasurementsMaxBucketSize, cacheDerivedBucketMaxSize); + + bucket->_calculateBucketFieldsAndSizeChange( + doc, info->options.getMetaField(), newFieldNamesToBeInserted, sizeToBeAdded); + if (bucket->_size + *sizeToBeAdded > effectiveMaxSize) { + bool keepBucketOpenForLargeMeasurements = + bucket->_numMeasurements < static_cast<std::uint64_t>(gTimeseriesBucketMinCount) && + feature_flags::gTimeseriesScalabilityImprovements.isEnabled( + serverGlobalParams.featureCompatibility); + if (keepBucketOpenForLargeMeasurements) { + if (bucket->_size + *sizeToBeAdded > absoluteMaxSize) { + if (absoluteMaxSize != largeMeasurementsMaxBucketSize) { + info->stats.incNumBucketsClosedDueToCachePressure(); + return {RolloverAction::kHardClose, RolloverReason::kCachePressure}; + } + info->stats.incNumBucketsClosedDueToSize(); + return {RolloverAction::kHardClose, RolloverReason::kSize}; + } + + // There's enough space to add this measurement and we're still below the large + // measurement threshold. + if (!bucket->_keptOpenDueToLargeMeasurements) { + // Only increment this metric once per bucket. + bucket->_keptOpenDueToLargeMeasurements = true; + info->stats.incNumBucketsKeptOpenDueToLargeMeasurements(); + } + return {RolloverAction::kNone, RolloverReason::kNone}; + } else { + if (effectiveMaxSize == gTimeseriesBucketMaxSize) { + info->stats.incNumBucketsClosedDueToSize(); + return {RolloverAction::kHardClose, RolloverReason::kSize}; + } + info->stats.incNumBucketsClosedDueToCachePressure(); + return {RolloverAction::kHardClose, RolloverReason::kCachePressure}; + } + } + + if (bucket->schemaIncompatible( + doc, info->options.getMetaField(), info->key.metadata.getComparator())) { + info->stats.incNumBucketsClosedDueToSchemaChange(); + return {RolloverAction::kHardClose, RolloverReason::kSchemaChange}; + } + + return {RolloverAction::kNone, RolloverReason::kNone}; +} + +Bucket* BucketCatalog::_rollover(Stripe* stripe, + WithLock stripeLock, + Bucket* bucket, + const CreationInfo& info, + RolloverAction action) { + invariant(action != RolloverAction::kNone); + if (bucket->allCommitted()) { + // The bucket does not contain any measurements that are yet to be committed, so we can take + // action now. + if (action == RolloverAction::kArchive) { + _archiveBucket(stripe, stripeLock, bucket, info.closedBuckets); + } else { + const bool eligibleForReopening = action == RolloverAction::kSoftClose; + info.closedBuckets->emplace_back(ClosedBucket{&_bucketStateManager, + bucket->bucketId(), + bucket->getTimeField().toString(), + bucket->numMeasurements(), + eligibleForReopening}); + + _removeBucket(stripe, stripeLock, bucket, RemovalMode::kClose); + } + } else { + // We must keep the bucket around until all measurements are committed committed, just mark + // the action we chose now so it we know what to do when the last batch finishes. + bucket->setRolloverAction(action); + } + + return _allocateBucket(stripe, stripeLock, info); +} + +ExecutionStatsController BucketCatalog::_getExecutionStats(const NamespaceString& ns) { + stdx::lock_guard catalogLock{_mutex}; + auto it = _executionStats.find(ns); + if (it != _executionStats.end()) { + return {it->second, _globalExecutionStats}; + } + + auto res = _executionStats.emplace(ns, std::make_shared<ExecutionStats>()); + return {res.first->second, _globalExecutionStats}; +} + +std::shared_ptr<ExecutionStats> BucketCatalog::_getExecutionStats(const NamespaceString& ns) const { + static const auto kEmptyStats{std::make_shared<ExecutionStats>()}; + + stdx::lock_guard catalogLock{_mutex}; + + auto it = _executionStats.find(ns); + if (it != _executionStats.end()) { + return it->second; + } + return kEmptyStats; +} + +long long BucketCatalog::_marginalMemoryUsageForArchivedBucket(const ArchivedBucket& bucket, + bool onlyEntryForMatchingMetaHash) { + return sizeof(Date_t) + // key in set of archived buckets for meta hash + sizeof(ArchivedBucket) + // main data for archived bucket + bucket.timeField.size() + // allocated space for timeField string, ignoring SSO + (onlyEntryForMatchingMetaHash ? sizeof(std::size_t) + // key in set (meta hash) + sizeof(decltype(Stripe::archivedBuckets)::value_type) // set container + : 0); +} + +void BucketCatalog::_updateBucketFetchAndQueryStats(ExecutionStatsController& stats, + const BucketFindResult& findResult) { + if (findResult.fetchedBucket) { + if (findResult.bucketToReopen.has_value()) { + stats.incNumBucketsFetched(); + } else { + stats.incNumBucketFetchesFailed(); + } + } + + if (findResult.queriedBucket) { + if (findResult.bucketToReopen.has_value()) { + stats.incNumBucketsQueried(); + } else { + stats.incNumBucketQueriesFailed(); + } + } +} + +class BucketCatalog::ServerStatus : public ServerStatusSection { + struct BucketCounts { + BucketCounts& operator+=(const BucketCounts& other) { + if (&other != this) { + all += other.all; + open += other.open; + idle += other.idle; + } + return *this; + } + + std::size_t all = 0; + std::size_t open = 0; + std::size_t idle = 0; + }; + + BucketCounts _getBucketCounts(const BucketCatalog& catalog) const { + BucketCounts sum; + for (auto const& stripe : catalog._stripes) { + stdx::lock_guard stripeLock{stripe.mutex}; + sum += {stripe.allBuckets.size(), stripe.openBuckets.size(), stripe.idleBuckets.size()}; + } + return sum; + } + +public: + ServerStatus() : ServerStatusSection("bucketCatalog") {} + + bool includeByDefault() const override { + return true; + } + + BSONObj generateSection(OperationContext* opCtx, const BSONElement&) const override { + const auto& bucketCatalog = BucketCatalog::get(opCtx); + { + stdx::lock_guard catalogLock{bucketCatalog._mutex}; + if (bucketCatalog._executionStats.empty()) { + return {}; + } + } + + auto counts = _getBucketCounts(bucketCatalog); + auto numActive = bucketCatalog._numberOfActiveBuckets.load(); + BSONObjBuilder builder; + builder.appendNumber("numBuckets", static_cast<long long>(numActive)); + builder.appendNumber("numOpenBuckets", static_cast<long long>(counts.open)); + builder.appendNumber("numIdleBuckets", static_cast<long long>(counts.idle)); + builder.appendNumber("numArchivedBuckets", static_cast<long long>(numActive - counts.open)); + builder.appendNumber("memoryUsage", + static_cast<long long>(bucketCatalog._memoryUsage.load())); + + // Append the global execution stats for all namespaces. + bucketCatalog.appendGlobalExecutionStats(&builder); + + // Append the global state management stats for all namespaces. + bucketCatalog.appendStateManagementStats(&builder); + + return builder.obj(); + } +} bucketCatalogServerStatus; +} // namespace mongo::timeseries::bucket_catalog |