| author | Dan Larkin-York <13419935+dhly-etc@users.noreply.github.com> | 2022-12-08 21:48:18 +0000 |
|---|---|---|
| committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-12-08 23:12:35 +0000 |
| commit | 2460a8b708a5b0b0ef245cea1ebe9bb6d9929308 (patch) | |
| tree | b02dabfc2798695d66b956c0059ec5195c65c318 /src/mongo/db/timeseries | |
| parent | 4ea67148b604e355398a6355f7e348da12c677d9 (diff) | |
| download | mongo-2460a8b708a5b0b0ef245cea1ebe9bb6d9929308.tar.gz | |
SERVER-71457 Archive on time backwards, and restrict query-based reopening
Diffstat (limited to 'src/mongo/db/timeseries')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | src/mongo/db/timeseries/bucket_catalog.cpp | 505 |
| -rw-r--r-- | src/mongo/db/timeseries/bucket_catalog.h | 192 |
| -rw-r--r-- | src/mongo/db/timeseries/bucket_catalog_helpers.cpp | 21 |
| -rw-r--r-- | src/mongo/db/timeseries/bucket_catalog_helpers.h | 2 |
| -rw-r--r-- | src/mongo/db/timeseries/bucket_catalog_state_manager_test.cpp | 47 |
| -rw-r--r-- | src/mongo/db/timeseries/bucket_catalog_test.cpp | 54 |
6 files changed, 510 insertions, 311 deletions
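
The core behavioral change in bucket_catalog.cpp is the rollover decision: when a measurement's timestamp precedes the bucket's minimum time and the TimeseriesScalabilityImprovements feature flag is enabled, the bucket is archived (`RolloverAction::kArchive`, `RolloverReason::kTimeBackward`) instead of soft-closed, and the insert path only allows query-based reopening for that time-backward case. The sketch below is a minimal, self-contained illustration of that decision under those assumptions, with stand-in enums and plain integer timestamps; it is not the actual BucketCatalog code.

```cpp
#include <iostream>
#include <utility>

enum class RolloverAction { kNone, kArchive, kSoftClose, kHardClose };
enum class RolloverReason { kNone, kTimeForward, kTimeBackward, kCount, kSchemaChange, kCachePressure, kSize };

// Hypothetical helper: timestamps are plain integers (seconds) for brevity.
std::pair<RolloverAction, RolloverReason> determineTimeRollover(long long measurementTime,
                                                                long long bucketMinTime,
                                                                long long maxSpanSeconds,
                                                                bool canArchive) {
    if (measurementTime - bucketMinTime >= maxSpanSeconds) {
        return {RolloverAction::kSoftClose, RolloverReason::kTimeForward};
    }
    if (measurementTime < bucketMinTime) {
        // Archiving keeps the bucket cheaply re-openable instead of closing it outright.
        return {canArchive ? RolloverAction::kArchive : RolloverAction::kSoftClose,
                RolloverReason::kTimeBackward};
    }
    return {RolloverAction::kNone, RolloverReason::kNone};
}

int main() {
    auto [action, reason] = determineTimeRollover(/*measurementTime=*/50,
                                                  /*bucketMinTime=*/100,
                                                  /*maxSpanSeconds=*/3600,
                                                  /*canArchive=*/true);
    // Query-based reopening is only attempted for the time-backward case.
    const bool allowQueryBasedReopening = (reason == RolloverReason::kTimeBackward);
    std::cout << std::boolalpha << (action == RolloverAction::kArchive) << ' '
              << allowQueryBasedReopening << '\n';  // prints: true true
}
```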
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp index 424e70002f4..91dcf1e7ff7 100644 --- a/src/mongo/db/timeseries/bucket_catalog.cpp +++ b/src/mongo/db/timeseries/bucket_catalog.cpp @@ -92,8 +92,8 @@ BSONObj buildControlMinTimestampDoc(StringData timeField, Date_t roundedTime) { return builder.obj(); } -std::pair<OID, Date_t> generateBucketId(const Date_t& time, const TimeseriesOptions& options) { - OID bucketId = OID::gen(); +std::pair<OID, Date_t> generateBucketOID(const Date_t& time, const TimeseriesOptions& options) { + OID oid = OID::gen(); // We round the measurement timestamp down to the nearest minute, hour, or day depending on the // granularity. We do this for two reasons. The first is so that if measurements come in @@ -103,7 +103,7 @@ std::pair<OID, Date_t> generateBucketId(const Date_t& time, const TimeseriesOpti // what the bucket timestamp will be, so we can route measurements to the right shard chunk. auto roundedTime = timeseries::roundTimestampToGranularity(time, options); int64_t const roundedSeconds = durationCount<Seconds>(roundedTime.toDurationSinceEpoch()); - bucketId.setTimestamp(roundedSeconds); + oid.setTimestamp(roundedSeconds); // Now, if we stopped here we could end up with bucket OID collisions. Consider the case where // we have the granularity set to 'Hours'. This means we will round down to the nearest day, so @@ -122,23 +122,18 @@ std::pair<OID, Date_t> generateBucketId(const Date_t& time, const TimeseriesOpti // by restarting the new primary. It remains an open question whether we can fix this in a // better way. // TODO (SERVER-61412): Avoid time-series bucket OID collisions after election - auto instance = bucketId.getInstanceUnique(); + auto instance = oid.getInstanceUnique(); uint32_t sum = DataView(reinterpret_cast<char*>(instance.bytes)).read<uint32_t>(1) + (durationCount<Seconds>(time.toDurationSinceEpoch()) - roundedSeconds); DataView(reinterpret_cast<char*>(instance.bytes)).write<uint32_t>(sum, 1); - bucketId.setInstanceUnique(instance); + oid.setInstanceUnique(instance); - return {bucketId, roundedTime}; + return {oid, roundedTime}; } -Status getTimeseriesBucketClearedError(const OID& bucketId, - const boost::optional<NamespaceString>& ns = boost::none) { - std::string nsIdentification; - if (ns) { - nsIdentification.assign(str::stream() << " for namespace " << *ns); - } +Status getTimeseriesBucketClearedError(const NamespaceString& ns, const OID& oid) { return {ErrorCodes::TimeseriesBucketCleared, - str::stream() << "Time-series bucket " << bucketId << nsIdentification + str::stream() << "Time-series bucket " << oid << " for namespace " << ns << " was cleared"}; } @@ -232,6 +227,12 @@ void BucketCatalog::ExecutionStatsController::incNumBucketsArchivedDueToMemoryTh _globalStats->numBucketsArchivedDueToMemoryThreshold.fetchAndAddRelaxed(increment); } +void BucketCatalog::ExecutionStatsController::incNumBucketsArchivedDueToTimeBackward( + long long increment) { + _collectionStats->numBucketsArchivedDueToTimeBackward.fetchAndAddRelaxed(increment); + _globalStats->numBucketsArchivedDueToTimeBackward.fetchAndAddRelaxed(increment); +} + void BucketCatalog::ExecutionStatsController::incNumCommits(long long increment) { _collectionStats->numCommits.fetchAndAddRelaxed(increment); _globalStats->numCommits.fetchAndAddRelaxed(increment); @@ -430,7 +431,7 @@ bool BucketCatalog::BucketStateManager::_isMemberOfClearedSet(WithLock catalogLo Bucket* bucket) { for (auto it = 
_clearRegistry.lower_bound(bucket->getEra() + 1); it != _clearRegistry.end(); ++it) { - if (it->second(bucket->_ns)) { + if (it->second(bucket->ns())) { return true; } } @@ -445,7 +446,7 @@ bool BucketCatalog::BucketStateManager::_isMemberOfClearedSet(WithLock catalogLo boost::optional<BucketCatalog::BucketState> BucketCatalog::BucketStateManager::_markIndividualBucketCleared(WithLock catalogLock, - const OID& bucketId) { + const BucketId& bucketId) { return _changeBucketStateHelper( catalogLock, bucketId, @@ -462,16 +463,16 @@ boost::optional<BucketCatalog::BucketState> BucketCatalog::BucketStateManager::g stdx::lock_guard catalogLock{*_mutex}; // If the bucket has been cleared, we will set the bucket state accordingly to reflect that. if (_isMemberOfClearedSet(catalogLock, bucket)) { - return _markIndividualBucketCleared(catalogLock, bucket->id()); + return _markIndividualBucketCleared(catalogLock, bucket->bucketId()); } - auto it = _bucketStates.find(bucket->id()); + auto it = _bucketStates.find(bucket->bucketId()); return it != _bucketStates.end() ? boost::make_optional(it->second) : boost::none; } boost::optional<BucketCatalog::BucketState> BucketCatalog::BucketStateManager::getBucketState( - const OID& oid) const { + const BucketId& bucketId) const { stdx::lock_guard catalogLock{*_mutex}; - auto it = _bucketStates.find(oid); + auto it = _bucketStates.find(bucketId); return it != _bucketStates.end() ? boost::make_optional(it->second) : boost::none; } @@ -479,16 +480,16 @@ boost::optional<BucketCatalog::BucketState> BucketCatalog::BucketStateManager::c Bucket* bucket, const StateChangeFn& change) { stdx::lock_guard catalogLock{*_mutex}; if (_isMemberOfClearedSet(catalogLock, bucket)) { - return _markIndividualBucketCleared(catalogLock, bucket->id()); + return _markIndividualBucketCleared(catalogLock, bucket->bucketId()); } - return _changeBucketStateHelper(catalogLock, bucket->id(), change); + return _changeBucketStateHelper(catalogLock, bucket->bucketId(), change); } boost::optional<BucketCatalog::BucketState> BucketCatalog::BucketStateManager::changeBucketState( - const OID& id, const StateChangeFn& change) { + const BucketId& bucketId, const StateChangeFn& change) { stdx::lock_guard catalogLock{*_mutex}; - return _changeBucketStateHelper(catalogLock, id, change); + return _changeBucketStateHelper(catalogLock, bucketId, change); } void BucketCatalog::BucketStateManager::appendStats(BSONObjBuilder* base) const { @@ -504,9 +505,9 @@ void BucketCatalog::BucketStateManager::appendStats(BSONObjBuilder* base) const boost::optional<BucketCatalog::BucketState> BucketCatalog::BucketStateManager::_changeBucketStateHelper(WithLock catalogLock, - const OID& id, + const BucketId& bucketId, const StateChangeFn& change) { - auto it = _bucketStates.find(id); + auto it = _bucketStates.find(bucketId); const boost::optional<BucketState> initial = (it == _bucketStates.end()) ? 
boost::none : boost::make_optional(it->second); const boost::optional<BucketState> target = change(initial, _era); @@ -528,7 +529,7 @@ BucketCatalog::BucketStateManager::_changeBucketStateHelper(WithLock catalogLock } return boost::none; } else if (!initial.has_value()) { - _bucketStates.emplace(id, target.value()); + _bucketStates.emplace(bucketId, target.value()); return target; } @@ -568,15 +569,15 @@ void BucketCatalog::BucketStateManager::_cleanClearRegistry() { _clearRegistry.erase(_clearRegistry.begin(), endIt); } -BucketCatalog::Bucket::Bucket(const OID& id, +BucketCatalog::Bucket::Bucket(const BucketId& bucketId, StripeNumber stripe, - BucketKey::Hash hash, + BucketKey::Hash keyHash, BucketStateManager* bucketStateManager) : _lastCheckedEra(bucketStateManager->getEraAndIncrementCount()), _bucketStateManager(bucketStateManager), - _id(id), + _bucketId(bucketId), _stripe(stripe), - _keyHash(hash) {} + _keyHash(keyHash) {} BucketCatalog::Bucket::~Bucket() { _bucketStateManager->decrementCountForEra(getEra()); @@ -590,8 +591,16 @@ void BucketCatalog::Bucket::setEra(uint64_t era) { _lastCheckedEra = era; } -const OID& BucketCatalog::Bucket::id() const { - return _id; +const BucketCatalog::BucketId& BucketCatalog::Bucket::bucketId() const { + return _bucketId; +} + +const OID& BucketCatalog::Bucket::oid() const { + return _bucketId.oid; +} + +const NamespaceString& BucketCatalog::Bucket::ns() const { + return _bucketId.ns; } BucketCatalog::StripeNumber BucketCatalog::Bucket::stripe() const { @@ -618,10 +627,6 @@ uint32_t BucketCatalog::Bucket::numMeasurements() const { return _numMeasurements; } -void BucketCatalog::Bucket::setNamespace(const NamespaceString& ns) { - _ns = ns; -} - void BucketCatalog::Bucket::setRolloverAction(RolloverAction action) { _rolloverAction = action; } @@ -690,8 +695,9 @@ std::shared_ptr<BucketCatalog::WriteBatch> BucketCatalog::Bucket::_activeBatch( auto it = _batches.find(opId); if (it == _batches.end()) { it = _batches - .try_emplace(opId, - std::make_shared<WriteBatch>(BucketHandle{_id, _stripe}, opId, stats)) + .try_emplace( + opId, + std::make_shared<WriteBatch>(BucketHandle{_bucketId, _stripe}, opId, stats)) .first; } return it->second; @@ -708,11 +714,11 @@ BucketCatalog::ClosedBucket::~ClosedBucket() { } BucketCatalog::ClosedBucket::ClosedBucket(BucketStateManager* bsm, - const OID& id, + const BucketId& bucketId, const std::string& tf, boost::optional<uint32_t> nm, bool efr) - : bucketId{id}, + : bucketId{bucketId}, timeField{tf}, numMeasurements{nm}, eligibleForReopening{efr}, @@ -918,21 +924,21 @@ Status BucketCatalog::reopenBucket(OperationContext* opCtx, stdx::lock_guard stripeLock{stripe.mutex}; ClosedBuckets closedBuckets; - _reopenBucket(&stripe, - stripeLock, - stats, - key, - std::move(bucket), - _bucketStateManager.getEra(), - &closedBuckets); - return Status::OK(); + return _reopenBucket(&stripe, + stripeLock, + stats, + key, + std::move(bucket), + _bucketStateManager.getEra(), + &closedBuckets) + .getStatus(); } BSONObj BucketCatalog::getMetadata(const BucketHandle& handle) { auto const& stripe = _stripes[handle.stripe]; stdx::lock_guard stripeLock{stripe.mutex}; - const Bucket* bucket = _findBucket(stripe, stripeLock, handle.id); + const Bucket* bucket = _findBucket(stripe, stripeLock, handle.bucketId); if (!bucket) { return {}; } @@ -977,7 +983,7 @@ Status BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) { Bucket* bucket = _useBucketAndChangeState( &stripe, stripeLock, - batch->bucket().id, + batch->bucket().bucketId, 
[](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> { invariant(input.has_value()); return input.value().setFlag(BucketStateFlag::kPrepared); @@ -991,7 +997,11 @@ Status BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) { } return getBatchStatus(); } else if (!bucket) { - _abort(&stripe, stripeLock, batch, getTimeseriesBucketClearedError(batch->bucket().id)); + _abort(&stripe, + stripeLock, + batch, + getTimeseriesBucketClearedError(batch->bucket().bucketId.ns, + batch->bucket().bucketId.oid)); return getBatchStatus(); } @@ -1016,7 +1026,7 @@ boost::optional<BucketCatalog::ClosedBucket> BucketCatalog::finish( Bucket* bucket = _useBucketAndChangeState( &stripe, stripeLock, - batch->bucket().id, + batch->bucket().bucketId, [](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> { invariant(input.has_value()); return input.value().unsetFlag(BucketStateFlag::kPrepared); @@ -1042,7 +1052,7 @@ boost::optional<BucketCatalog::ClosedBucket> BucketCatalog::finish( // It's possible that we cleared the bucket in between preparing the commit and finishing // here. In this case, we should abort any other ongoing batches and clear the bucket from // the catalog so it's not hanging around idle. - auto it = stripe.allBuckets.find(batch->bucket().id); + auto it = stripe.allBuckets.find(batch->bucket().bucketId); if (it != stripe.allBuckets.end()) { bucket = it->second.get(); bucket->_preparedBatch.reset(); @@ -1050,7 +1060,7 @@ boost::optional<BucketCatalog::ClosedBucket> BucketCatalog::finish( stripeLock, bucket, nullptr, - getTimeseriesBucketClearedError(bucket->id(), bucket->_ns)); + getTimeseriesBucketClearedError(bucket->ns(), bucket->oid())); } } else if (bucket->allCommitted()) { switch (bucket->_rolloverAction) { @@ -1059,13 +1069,21 @@ boost::optional<BucketCatalog::ClosedBucket> BucketCatalog::finish( const bool eligibleForReopening = bucket->_rolloverAction == RolloverAction::kSoftClose; closedBucket = boost::in_place(&_bucketStateManager, - bucket->id(), + bucket->bucketId(), bucket->getTimeField().toString(), bucket->numMeasurements(), eligibleForReopening); _removeBucket(&stripe, stripeLock, bucket, RemovalMode::kClose); break; } + case RolloverAction::kArchive: { + ClosedBuckets closedBuckets; + _archiveBucket(&stripe, stripeLock, bucket, &closedBuckets); + if (!closedBuckets.empty()) { + closedBucket = std::move(closedBuckets[0]); + } + break; + } case RolloverAction::kNone: { _markBucketIdle(&stripe, stripeLock, bucket); break; @@ -1089,9 +1107,11 @@ void BucketCatalog::abort(std::shared_ptr<WriteBatch> batch, const Status& statu _abort(&stripe, stripeLock, batch, status); } -void BucketCatalog::directWriteStart(const OID& oid) { +void BucketCatalog::directWriteStart(const NamespaceString& ns, const OID& oid) { + invariant(!ns.isTimeseriesBucketsCollection()); auto result = _bucketStateManager.changeBucketState( - oid, [](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> { + BucketId{ns, oid}, + [](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> { if (input.has_value()) { return input.value().setFlag(BucketStateFlag::kPendingDirectWrite); } @@ -1109,10 +1129,12 @@ void BucketCatalog::directWriteStart(const OID& oid) { hangTimeseriesDirectModificationAfterStart.pauseWhileSet(); } -void BucketCatalog::directWriteFinish(const OID& oid) { +void BucketCatalog::directWriteFinish(const NamespaceString& ns, const OID& oid) { + 
invariant(!ns.isTimeseriesBucketsCollection()); hangTimeseriesDirectModificationBeforeFinish.pauseWhileSet(); (void)_bucketStateManager.changeBucketState( - oid, [](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> { + BucketId{ns, oid}, + [](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> { if (!input.has_value()) { // We may have had multiple direct writes to this document in the same storage // transaction. If so, a previous call to directWriteFinish may have cleaned up the @@ -1142,16 +1164,16 @@ void BucketCatalog::clear(ShouldClearFn&& shouldClear) { auto nextIt = std::next(it); const auto& bucket = it->second; - if (shouldClear(bucket->_ns)) { + if (shouldClear(bucket->ns())) { { stdx::lock_guard catalogLock{_mutex}; - _executionStats.erase(bucket->_ns); + _executionStats.erase(bucket->ns()); } _abort(&stripe, stripeLock, bucket.get(), nullptr, - getTimeseriesBucketClearedError(bucket->id(), bucket->_ns)); + getTimeseriesBucketClearedError(bucket->ns(), bucket->oid())); } it = nextIt; @@ -1160,6 +1182,7 @@ void BucketCatalog::clear(ShouldClearFn&& shouldClear) { } void BucketCatalog::clear(const NamespaceString& ns) { + invariant(!ns.isTimeseriesBucketsCollection()); clear([ns](const NamespaceString& bucketNs) { return bucketNs == ns; }); } @@ -1201,6 +1224,8 @@ void BucketCatalog::_appendExecutionStatsToBuilder(const ExecutionStats* stats, stats->numBucketsClosedDueToReopening.load()); builder->appendNumber("numBucketsArchivedDueToMemoryThreshold", stats->numBucketsArchivedDueToMemoryThreshold.load()); + builder->appendNumber("numBucketsArchivedDueToTimeBackward", + stats->numBucketsArchivedDueToTimeBackward.load()); builder->appendNumber("numBucketsReopened", stats->numBucketsReopened.load()); builder->appendNumber("numBucketsKeptOpenDueToLargeMeasurements", stats->numBucketsKeptOpenDueToLargeMeasurements.load()); @@ -1217,6 +1242,7 @@ void BucketCatalog::_appendExecutionStatsToBuilder(const ExecutionStats* stats, } void BucketCatalog::appendExecutionStats(const NamespaceString& ns, BSONObjBuilder* builder) const { + invariant(!ns.isTimeseriesBucketsCollection()); const std::shared_ptr<ExecutionStats> stats = _getExecutionStats(ns); _appendExecutionStatsToBuilder(stats.get(), builder); } @@ -1270,12 +1296,20 @@ const StringData::ComparatorInterface* BucketCatalog::BucketMetadata::getCompara BucketCatalog::BucketKey::BucketKey(const NamespaceString& n, const BucketMetadata& m) : ns(n), metadata(m), hash(absl::Hash<BucketKey>{}(*this)) {} +BucketCatalog::BucketId::BucketId(const NamespaceString& n, const OID& o) + : ns{n}, oid{o}, hash{absl::Hash<BucketId>{}(*this)} {} + std::size_t BucketCatalog::BucketHasher::operator()(const BucketKey& key) const { // Use the default absl hasher. return key.hash; } -std::size_t BucketCatalog::PreHashed::operator()(const BucketKey::Hash& key) const { +std::size_t BucketCatalog::BucketHasher::operator()(const BucketId& bucketId) const { + // Use the default absl hasher. 
+ return bucketId.hash; +} + +std::size_t BucketCatalog::BucketHasher::operator()(const BucketKey::Hash& key) const { return key; } @@ -1319,9 +1353,9 @@ BucketCatalog::StripeNumber BucketCatalog::_getStripeNumber(const BucketKey& key const BucketCatalog::Bucket* BucketCatalog::_findBucket(const Stripe& stripe, WithLock, - const OID& id, + const BucketId& bucketId, IgnoreBucketState mode) { - auto it = stripe.allBuckets.find(id); + auto it = stripe.allBuckets.find(bucketId); if (it != stripe.allBuckets.end()) { if (mode == IgnoreBucketState::kYes) { return it->second.get(); @@ -1337,17 +1371,17 @@ const BucketCatalog::Bucket* BucketCatalog::_findBucket(const Stripe& stripe, BucketCatalog::Bucket* BucketCatalog::_useBucket(Stripe* stripe, WithLock stripeLock, - const OID& id, + const BucketId& bucketId, IgnoreBucketState mode) { - return const_cast<Bucket*>(_findBucket(*stripe, stripeLock, id, mode)); + return const_cast<Bucket*>(_findBucket(*stripe, stripeLock, bucketId, mode)); } BucketCatalog::Bucket* BucketCatalog::_useBucketAndChangeState( Stripe* stripe, WithLock stripeLock, - const OID& id, + const BucketId& bucketId, const BucketStateManager::StateChangeFn& change) { - auto it = stripe->allBuckets.find(id); + auto it = stripe->allBuckets.find(bucketId); if (it != stripe->allBuckets.end()) { if (auto state = _bucketStateManager.changeBucketState(it->second.get(), change); state && !state.value().conflictsWithInsertion()) { @@ -1380,7 +1414,7 @@ BucketCatalog::Bucket* BucketCatalog::_useBucket(Stripe* stripe, stripeLock, bucket, nullptr, - getTimeseriesBucketClearedError(bucket->id(), bucket->_ns)); + getTimeseriesBucketClearedError(bucket->ns(), bucket->oid())); return mode == AllowBucketCreation::kYes ? _allocateBucket(stripe, stripeLock, info) : nullptr; } @@ -1426,7 +1460,7 @@ StatusWith<std::unique_ptr<BucketCatalog::Bucket>> BucketCatalog::_rehydrateBuck } auto stripeNumber = _getStripeNumber(key); - auto bucketId = bucketIdElem.OID(); + BucketId bucketId{key.ns, bucketIdElem.OID()}; std::unique_ptr<Bucket> bucket = std::make_unique<Bucket>(bucketId, stripeNumber, key.hash, &_bucketStateManager); @@ -1437,7 +1471,6 @@ StatusWith<std::unique_ptr<BucketCatalog::Bucket>> BucketCatalog::_rehydrateBuck } // Initialize the remaining member variables from the bucket document. - bucket->setNamespace(ns); bucket->_metadata = key.metadata; bucket->_timeField = options.getTimeField().toString(); if (isCompressed) { @@ -1502,35 +1535,34 @@ StatusWith<std::unique_ptr<BucketCatalog::Bucket>> BucketCatalog::_rehydrateBuck return {std::move(bucket)}; } -BucketCatalog::Bucket* BucketCatalog::_reopenBucket(Stripe* stripe, - WithLock stripeLock, - ExecutionStatsController stats, - const BucketKey& key, - std::unique_ptr<Bucket>&& bucket, - std::uint64_t targetEra, - ClosedBuckets* closedBuckets) { +StatusWith<BucketCatalog::Bucket*> BucketCatalog::_reopenBucket(Stripe* stripe, + WithLock stripeLock, + ExecutionStatsController stats, + const BucketKey& key, + std::unique_ptr<Bucket>&& bucket, + std::uint64_t targetEra, + ClosedBuckets* closedBuckets) { invariant(bucket); _expireIdleBuckets(stripe, stripeLock, stats, closedBuckets); - hangTimeseriesInsertBeforeReopeningBucket.pauseWhileSet(); - // We may need to initialize the bucket's state. 
- bool initialized = false; - auto state = _bucketStateManager.changeBucketState( - bucket->id(), - [targetEra, &initialized](boost::optional<BucketState> input, - std::uint64_t currentEra) -> boost::optional<BucketState> { - if (targetEra < currentEra || - (input.has_value() && input.value().conflictsWithReopening())) { - initialized = false; - return input; - } - initialized = true; - return BucketState{}; - }); - if (!initialized) { - return nullptr; + bool conflicts = false; + auto initializeStateFn = + [targetEra, &conflicts](boost::optional<BucketState> input, + std::uint64_t currentEra) -> boost::optional<BucketState> { + if (targetEra < currentEra || + (input.has_value() && input.value().conflictsWithReopening())) { + conflicts = true; + return input; + } + conflicts = false; + return input.has_value() ? input.value() : BucketState{}; + }; + + auto state = _bucketStateManager.changeBucketState(bucket->bucketId(), initializeStateFn); + if (conflicts) { + return {ErrorCodes::WriteConflict, "Bucket may be stale"}; } // If this bucket was archived, we need to remove it from the set of archived buckets. @@ -1538,7 +1570,7 @@ BucketCatalog::Bucket* BucketCatalog::_reopenBucket(Stripe* stripe, setIt != stripe->archivedBuckets.end()) { auto& archivedSet = setIt->second; if (auto bucketIt = archivedSet.find(bucket->getTime()); - bucketIt != archivedSet.end() && bucket->id() == bucketIt->second.bucketId) { + bucketIt != archivedSet.end() && bucket->bucketId() == bucketIt->second.bucketId) { long long memory = _marginalMemoryUsageForArchivedBucket(bucketIt->second, archivedSet.size() == 1); if (archivedSet.size() == 1) { @@ -1552,16 +1584,10 @@ BucketCatalog::Bucket* BucketCatalog::_reopenBucket(Stripe* stripe, } // Pass ownership of the reopened bucket to the bucket catalog. - auto [it, inserted] = stripe->allBuckets.try_emplace(bucket->id(), std::move(bucket)); - Bucket* unownedBucket = it->second.get(); - - // If the bucket wasn't inserted into the stripe, then that bucket is already open and we can - // return the bucket 'it' points to. - if (!inserted) { - stats.incNumDuplicateBucketsReopened(); - _markBucketNotIdle(stripe, stripeLock, unownedBucket); - return unownedBucket; - } + auto [insertedIt, newlyInserted] = + stripe->allBuckets.try_emplace(bucket->bucketId(), std::move(bucket)); + invariant(newlyInserted); + Bucket* unownedBucket = insertedIt->second.get(); // If we already have an open bucket for this key, we need to close it. if (auto it = stripe->openBuckets.find(key); it != stripe->openBuckets.end()) { @@ -1570,7 +1596,7 @@ BucketCatalog::Bucket* BucketCatalog::_reopenBucket(Stripe* stripe, auto* closedBucket = it->second; constexpr bool eligibleForReopening = true; closedBuckets->emplace_back(ClosedBucket{&_bucketStateManager, - closedBucket->id(), + closedBucket->bucketId(), closedBucket->getTimeField().toString(), closedBucket->numMeasurements(), eligibleForReopening}); @@ -1590,6 +1616,53 @@ BucketCatalog::Bucket* BucketCatalog::_reopenBucket(Stripe* stripe, return unownedBucket; } +StatusWith<BucketCatalog::Bucket*> BucketCatalog::_reuseExistingBucket( + Stripe* stripe, + WithLock stripeLock, + ExecutionStatsController* stats, + const BucketKey& key, + Bucket* existingBucket, + std::uint64_t targetEra) { + invariant(existingBucket); + + // If we have an existing bucket, passing the Bucket* will let us check if the bucket was + // cleared as part of a set since the last time it was used. If we were to just check by + // OID, we may miss if e.g. 
there was a move chunk operation. + bool conflicts = false; + auto state = _bucketStateManager.changeBucketState( + existingBucket, + [targetEra, &conflicts](boost::optional<BucketState> input, + std::uint64_t currentEra) -> boost::optional<BucketState> { + if (targetEra < currentEra || + (input.has_value() && input.value().conflictsWithReopening())) { + conflicts = true; + return input; + } + conflicts = false; + return input.has_value() ? input.value() : BucketState{}; + }); + if (state.has_value() && state.value().isSet(BucketStateFlag::kCleared)) { + _removeBucket(stripe, stripeLock, existingBucket, RemovalMode::kAbort); + conflicts = true; + } + if (conflicts) { + return {ErrorCodes::WriteConflict, "Bucket may be stale"}; + } + + // It's possible to have two buckets with the same ID in different collections, so let's make + // extra sure the existing bucket is the right one. + if (existingBucket->ns() != key.ns) { + return {ErrorCodes::BadValue, "Cannot re-use bucket: same ID but different namespace"}; + } + + // If the bucket was already open, wasn't cleared, the state didn't conflict with reopening, and + // the namespace matches, then we can simply return it. + stats->incNumDuplicateBucketsReopened(); + _markBucketNotIdle(stripe, stripeLock, existingBucket); + + return existingBucket; +} + StatusWith<BucketCatalog::InsertResult> BucketCatalog::_insert( OperationContext* opCtx, const NamespaceString& ns, @@ -1599,6 +1672,7 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::_insert( CombineWithInsertsFromOtherClients combine, AllowBucketCreation mode, BucketFindResult bucketFindResult) { + invariant(!ns.isTimeseriesBucketsCollection()); auto res = _extractBucketingParameters(ns, comparator, options, doc); if (!res.isOK()) { @@ -1632,70 +1706,99 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::_insert( if (rehydratedBucket.isOK()) { invariant(mode == AllowBucketCreation::kYes); - if (Bucket* bucket = _reopenBucket(&stripe, - stripeLock, - stats, - key, - std::move(rehydratedBucket.getValue()), - bucketToReopen->catalogEra, - &result.closedBuckets)) { - result.batch = _insertIntoBucket(opCtx, - &stripe, - stripeLock, - doc, - combine, - mode, - &info, - bucket, - &result.closedBuckets); - invariant(result.batch); + hangTimeseriesInsertBeforeReopeningBucket.pauseWhileSet(); + + StatusWith<Bucket*> swBucket{nullptr}; + auto existingIt = stripe.allBuckets.find(rehydratedBucket.getValue()->bucketId()); + if (existingIt != stripe.allBuckets.end()) { + // First let's check the existing bucket if we have one. + Bucket* existingBucket = existingIt->second.get(); + swBucket = _reuseExistingBucket( + &stripe, stripeLock, &stats, key, existingBucket, bucketToReopen->catalogEra); + } else { + // No existing bucket to use, go ahead and try to reopen our rehydrated bucket. 
+ swBucket = _reopenBucket(&stripe, + stripeLock, + stats, + key, + std::move(rehydratedBucket.getValue()), + bucketToReopen->catalogEra, + &result.closedBuckets); + } + + if (swBucket.isOK()) { + Bucket* bucket = swBucket.getValue(); + invariant(bucket); + auto insertionResult = _insertIntoBucket(opCtx, + &stripe, + stripeLock, + doc, + combine, + mode, + &info, + bucket, + &result.closedBuckets); + auto* batch = stdx::get_if<std::shared_ptr<WriteBatch>>(&insertionResult); + invariant(batch); + result.batch = *batch; return std::move(result); } else { stats.incNumBucketReopeningsFailed(); - return {ErrorCodes::WriteConflict, "Bucket may be stale"}; + if (swBucket.getStatus().code() == ErrorCodes::WriteConflict) { + return swBucket.getStatus(); + } + // If we had a different type of error, then we should fall through and proceed to open + // a new bucket. } } Bucket* bucket = _useBucket(&stripe, stripeLock, info, mode); if (!bucket) { invariant(mode == AllowBucketCreation::kNo); - result.candidate = _getReopeningCandidate(&stripe, stripeLock, info); + constexpr bool allowQueryBasedReopening = true; + result.candidate = + _getReopeningCandidate(&stripe, stripeLock, info, allowQueryBasedReopening); return std::move(result); } - result.batch = _insertIntoBucket( + auto insertionResult = _insertIntoBucket( opCtx, &stripe, stripeLock, doc, combine, mode, &info, bucket, &result.closedBuckets); - if (!result.batch) { + if (auto* reason = stdx::get_if<RolloverReason>(&insertionResult)) { invariant(mode == AllowBucketCreation::kNo); if (bucket->allCommitted()) { _markBucketIdle(&stripe, stripeLock, bucket); } - result.candidate = _getReopeningCandidate(&stripe, stripeLock, info); + bool allowQueryBasedReopening = (*reason == RolloverReason::kTimeBackward); + result.candidate = + _getReopeningCandidate(&stripe, stripeLock, info, allowQueryBasedReopening); + } else { + result.batch = *stdx::get_if<std::shared_ptr<WriteBatch>>(&insertionResult); } return std::move(result); } -std::shared_ptr<BucketCatalog::WriteBatch> BucketCatalog::_insertIntoBucket( - OperationContext* opCtx, - Stripe* stripe, - WithLock stripeLock, - const BSONObj& doc, - CombineWithInsertsFromOtherClients combine, - AllowBucketCreation mode, - CreationInfo* info, - Bucket* bucket, - ClosedBuckets* closedBuckets) { +stdx::variant<std::shared_ptr<BucketCatalog::WriteBatch>, BucketCatalog::RolloverReason> +BucketCatalog::_insertIntoBucket(OperationContext* opCtx, + Stripe* stripe, + WithLock stripeLock, + const BSONObj& doc, + CombineWithInsertsFromOtherClients combine, + AllowBucketCreation mode, + CreationInfo* info, + Bucket* bucket, + ClosedBuckets* closedBuckets) { NewFieldNames newFieldNamesToBeInserted; int32_t sizeToBeAdded = 0; - bool isNewlyOpenedBucket = bucket->_ns.isEmpty(); + bool isNewlyOpenedBucket = (bucket->_size == 0); if (!isNewlyOpenedBucket) { - auto action = _determineRolloverAction( + auto [action, reason] = _determineRolloverAction( opCtx, doc, info, bucket, &newFieldNamesToBeInserted, &sizeToBeAdded, mode); - if (action == RolloverAction::kSoftClose && mode == AllowBucketCreation::kNo) { + if ((action == RolloverAction::kSoftClose || action == RolloverAction::kArchive) && + mode == AllowBucketCreation::kNo) { // We don't actually want to roll this bucket over yet, bail out. 
- return std::shared_ptr<WriteBatch>{}; + return reason; } else if (action != RolloverAction::kNone) { info->openedDuetoMetadata = false; bucket = _rollover(stripe, stripeLock, bucket, *info, action); @@ -1714,8 +1817,7 @@ std::shared_ptr<BucketCatalog::WriteBatch> BucketCatalog::_insertIntoBucket( bucket->_numMeasurements++; bucket->_size += sizeToBeAdded; if (isNewlyOpenedBucket) { - // The namespace and metadata only need to be set if this bucket was newly created. - bucket->setNamespace(info->key.ns); + // The metadata only needs to be set if this bucket was newly created. bucket->_metadata = info->key.metadata; // The namespace is stored two times: the bucket itself and openBuckets. @@ -1745,7 +1847,7 @@ void BucketCatalog::_waitToCommitBatch(Stripe* stripe, const std::shared_ptr<Wri { stdx::lock_guard stripeLock{stripe->mutex}; Bucket* bucket = - _useBucket(stripe, stripeLock, batch->bucket().id, IgnoreBucketState::kNo); + _useBucket(stripe, stripeLock, batch->bucket().bucketId, IgnoreBucketState::kNo); if (!bucket || batch->finished()) { return; } @@ -1771,14 +1873,14 @@ void BucketCatalog::_removeBucket(Stripe* stripe, invariant(bucket->_batches.empty()); invariant(!bucket->_preparedBatch); - auto allIt = stripe->allBuckets.find(bucket->id()); + auto allIt = stripe->allBuckets.find(bucket->bucketId()); invariant(allIt != stripe->allBuckets.end()); _memoryUsage.fetchAndSubtract(bucket->_memoryUsage); _markBucketNotIdle(stripe, stripeLock, bucket); // If the bucket was rolled over, then there may be a different open bucket for this metadata. - auto openIt = stripe->openBuckets.find({bucket->_ns, bucket->_metadata}); + auto openIt = stripe->openBuckets.find({bucket->ns(), bucket->_metadata}); if (openIt != stripe->openBuckets.end() && openIt->second == bucket) { stripe->openBuckets.erase(openIt); } @@ -1787,14 +1889,14 @@ void BucketCatalog::_removeBucket(Stripe* stripe, // we can remove the state from the catalog altogether. 
switch (mode) { case RemovalMode::kClose: { - auto state = _bucketStateManager.getBucketState(bucket->id()); + auto state = _bucketStateManager.getBucketState(bucket->bucketId()); invariant(state.has_value()); invariant(state.value().isSet(BucketStateFlag::kPendingCompression)); break; } case RemovalMode::kAbort: _bucketStateManager.changeBucketState( - bucket->id(), + bucket->bucketId(), [](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> { invariant(input.has_value()); @@ -1821,11 +1923,12 @@ void BucketCatalog::_archiveBucket(Stripe* stripe, auto& archivedSet = stripe->archivedBuckets[bucket->keyHash()]; auto it = archivedSet.find(bucket->getTime()); if (it == archivedSet.end()) { - archivedSet.emplace(bucket->getTime(), - ArchivedBucket{bucket->id(), bucket->getTimeField().toString()}); + auto [it, inserted] = archivedSet.emplace( + bucket->getTime(), + ArchivedBucket{bucket->bucketId(), bucket->getTimeField().toString()}); - long long memory = _marginalMemoryUsageForArchivedBucket(archivedSet[bucket->getTime()], - archivedSet.size() == 1); + long long memory = + _marginalMemoryUsageForArchivedBucket(it->second, archivedSet.size() == 1); _memoryUsage.fetchAndAdd(memory); archived = true; } @@ -1842,7 +1945,7 @@ void BucketCatalog::_archiveBucket(Stripe* stripe, mode = RemovalMode::kClose; constexpr bool eligibleForReopening = true; closedBuckets->emplace_back(ClosedBucket{&_bucketStateManager, - bucket->id(), + bucket->bucketId(), bucket->getTimeField().toString(), bucket->numMeasurements(), eligibleForReopening}); @@ -1877,7 +1980,7 @@ boost::optional<OID> BucketCatalog::_findArchivedCandidate(Stripe* stripe, if (info.time - candidateTime < Seconds(*info.options.getBucketMaxSpanSeconds())) { auto state = _bucketStateManager.getBucketState(candidateBucket.bucketId); if (state && !state.value().conflictsWithReopening()) { - return candidateBucket.bucketId; + return candidateBucket.bucketId.oid; } else { if (state) { _bucketStateManager.changeBucketState( @@ -1907,11 +2010,15 @@ boost::optional<OID> BucketCatalog::_findArchivedCandidate(Stripe* stripe, } stdx::variant<std::monostate, OID, BSONObj> BucketCatalog::_getReopeningCandidate( - Stripe* stripe, WithLock stripeLock, const CreationInfo& info) { + Stripe* stripe, WithLock stripeLock, const CreationInfo& info, bool allowQueryBasedReopening) { if (auto archived = _findArchivedCandidate(stripe, stripeLock, info)) { return archived.value(); } + if (!allowQueryBasedReopening) { + return {}; + } + boost::optional<BSONElement> metaElement; if (info.options.getMetaField().has_value()) { metaElement = info.key.metadata.element(); @@ -1929,7 +2036,8 @@ void BucketCatalog::_abort(Stripe* stripe, std::shared_ptr<WriteBatch> batch, const Status& status) { // Before we access the bucket, make sure it's still there. - Bucket* bucket = _useBucket(stripe, stripeLock, batch->bucket().id, IgnoreBucketState::kYes); + Bucket* bucket = + _useBucket(stripe, stripeLock, batch->bucket().bucketId, IgnoreBucketState::kYes); if (!bucket) { // Special case, bucket has already been cleared, and we need only abort this batch. 
batch->_abort(status); @@ -1971,9 +2079,10 @@ void BucketCatalog::_abort(Stripe* stripe, } } -void BucketCatalog::_compressionDone(const OID& id) { +void BucketCatalog::_compressionDone(const BucketId& bucketId) { _bucketStateManager.changeBucketState( - id, [](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> { + bucketId, + [](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> { return boost::none; }); } @@ -2020,7 +2129,7 @@ void BucketCatalog::_expireIdleBuckets(Stripe* stripe, _removeBucket(stripe, stripeLock, bucket, RemovalMode::kAbort); } else { closedBuckets->emplace_back(ClosedBucket{&_bucketStateManager, - bucket->id(), + bucket->bucketId(), bucket->getTimeField().toString(), bucket->numMeasurements(), eligibleForReopening}); @@ -2066,7 +2175,8 @@ BucketCatalog::Bucket* BucketCatalog::_allocateBucket(Stripe* stripe, const CreationInfo& info) { _expireIdleBuckets(stripe, stripeLock, info.stats, info.closedBuckets); - auto [bucketId, roundedTime] = generateBucketId(info.time, info.options); + auto [oid, roundedTime] = generateBucketOID(info.time, info.options); + auto bucketId = BucketId{info.key.ns, oid}; auto [it, inserted] = stripe->allBuckets.try_emplace( bucketId, @@ -2098,14 +2208,14 @@ BucketCatalog::Bucket* BucketCatalog::_allocateBucket(Stripe* stripe, return bucket; } -BucketCatalog::RolloverAction BucketCatalog::_determineRolloverAction( - OperationContext* opCtx, - const BSONObj& doc, - CreationInfo* info, - Bucket* bucket, - NewFieldNames* newFieldNamesToBeInserted, - int32_t* sizeToBeAdded, - AllowBucketCreation mode) { +std::pair<BucketCatalog::RolloverAction, BucketCatalog::RolloverReason> +BucketCatalog::_determineRolloverAction(OperationContext* opCtx, + const BSONObj& doc, + CreationInfo* info, + Bucket* bucket, + NewFieldNames* newFieldNamesToBeInserted, + int32_t* sizeToBeAdded, + AllowBucketCreation mode) { // If the mode is enabled to create new buckets, then we should update stats for soft closures // accordingly. If we specify the mode to not allow bucket creation, it means we are not sure if // we want to soft close the bucket yet and should wait to update closure stats. @@ -2116,17 +2226,24 @@ BucketCatalog::RolloverAction BucketCatalog::_determineRolloverAction( if (shouldUpdateStats) { info->stats.incNumBucketsClosedDueToTimeForward(); } - return RolloverAction::kSoftClose; + return {RolloverAction::kSoftClose, RolloverReason::kTimeForward}; } if (info->time < bucketTime) { + const bool canArchive = feature_flags::gTimeseriesScalabilityImprovements.isEnabled( + serverGlobalParams.featureCompatibility); if (shouldUpdateStats) { - info->stats.incNumBucketsClosedDueToTimeBackward(); + if (canArchive) { + info->stats.incNumBucketsArchivedDueToTimeBackward(); + } else { + info->stats.incNumBucketsClosedDueToTimeBackward(); + } } - return RolloverAction::kSoftClose; + return {canArchive ? 
RolloverAction::kArchive : RolloverAction::kSoftClose, + RolloverReason::kTimeBackward}; } if (bucket->_numMeasurements == static_cast<std::uint64_t>(gTimeseriesBucketMaxCount)) { info->stats.incNumBucketsClosedDueToCount(); - return RolloverAction::kHardClose; + return {RolloverAction::kHardClose, RolloverReason::kCount}; } // In scenarios where we have a high cardinality workload and face increased cache pressure we @@ -2155,10 +2272,10 @@ BucketCatalog::RolloverAction BucketCatalog::_determineRolloverAction( if (bucket->_size + *sizeToBeAdded > absoluteMaxSize) { if (absoluteMaxSize != largeMeasurementsMaxBucketSize) { info->stats.incNumBucketsClosedDueToCachePressure(); - } else { - info->stats.incNumBucketsClosedDueToSize(); + return {RolloverAction::kHardClose, RolloverReason::kCachePressure}; } - return RolloverAction::kHardClose; + info->stats.incNumBucketsClosedDueToSize(); + return {RolloverAction::kHardClose, RolloverReason::kSize}; } // There's enough space to add this measurement and we're still below the large @@ -2168,24 +2285,24 @@ BucketCatalog::RolloverAction BucketCatalog::_determineRolloverAction( bucket->_keptOpenDueToLargeMeasurements = true; info->stats.incNumBucketsKeptOpenDueToLargeMeasurements(); } - return RolloverAction::kNone; + return {RolloverAction::kNone, RolloverReason::kNone}; } else { if (effectiveMaxSize == gTimeseriesBucketMaxSize) { info->stats.incNumBucketsClosedDueToSize(); - } else { - info->stats.incNumBucketsClosedDueToCachePressure(); + return {RolloverAction::kHardClose, RolloverReason::kSize}; } - return RolloverAction::kHardClose; + info->stats.incNumBucketsClosedDueToCachePressure(); + return {RolloverAction::kHardClose, RolloverReason::kCachePressure}; } } if (bucket->schemaIncompatible( doc, info->options.getMetaField(), info->key.metadata.getComparator())) { info->stats.incNumBucketsClosedDueToSchemaChange(); - return RolloverAction::kHardClose; + return {RolloverAction::kHardClose, RolloverReason::kSchemaChange}; } - return RolloverAction::kNone; + return {RolloverAction::kNone, RolloverReason::kNone}; } BucketCatalog::Bucket* BucketCatalog::_rollover(Stripe* stripe, @@ -2197,14 +2314,18 @@ BucketCatalog::Bucket* BucketCatalog::_rollover(Stripe* stripe, if (bucket->allCommitted()) { // The bucket does not contain any measurements that are yet to be committed, so we can take // action now. - const bool eligibleForReopening = action == RolloverAction::kSoftClose; - info.closedBuckets->emplace_back(ClosedBucket{&_bucketStateManager, - bucket->id(), - bucket->getTimeField().toString(), - bucket->numMeasurements(), - eligibleForReopening}); - - _removeBucket(stripe, stripeLock, bucket, RemovalMode::kClose); + if (action == RolloverAction::kArchive) { + _archiveBucket(stripe, stripeLock, bucket, info.closedBuckets); + } else { + const bool eligibleForReopening = action == RolloverAction::kSoftClose; + info.closedBuckets->emplace_back(ClosedBucket{&_bucketStateManager, + bucket->bucketId(), + bucket->getTimeField().toString(), + bucket->numMeasurements(), + eligibleForReopening}); + + _removeBucket(stripe, stripeLock, bucket, RemovalMode::kClose); + } } else { // We must keep the bucket around until all measurements are committed committed, just mark // the action we chose now so it we know what to do when the last batch finishes. 
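
To support this, `_insertIntoBucket` now returns a variant holding either a `WriteBatch` or the `RolloverReason` that prevented the insert, and `_insert` branches on the alternative with `stdx::get_if`. Below is a minimal sketch of that caller-side pattern using `std::variant` and simplified stand-in types (the real code uses MongoDB's `stdx::variant`, its own `WriteBatch`, and stripe locking).

```cpp
#include <iostream>
#include <memory>
#include <variant>

struct WriteBatch {};

enum class RolloverReason { kNone, kTimeForward, kTimeBackward, kCount, kSchemaChange, kCachePressure, kSize };

// Stand-in for the new return type of _insertIntoBucket.
using InsertionResult = std::variant<std::shared_ptr<WriteBatch>, RolloverReason>;

void handleInsertion(const InsertionResult& insertionResult) {
    if (auto* reason = std::get_if<RolloverReason>(&insertionResult)) {
        // No batch was produced: the caller decides whether to archive, close,
        // or look for a reopening candidate based on *reason.
        std::cout << "rollover required, reason=" << static_cast<int>(*reason) << '\n';
    } else {
        auto batch = std::get<std::shared_ptr<WriteBatch>>(insertionResult);
        std::cout << "got a batch: " << std::boolalpha << static_cast<bool>(batch) << '\n';
    }
}

int main() {
    handleInsertion(InsertionResult{std::make_shared<WriteBatch>()});
    handleInsertion(InsertionResult{RolloverReason::kTimeBackward});
}
```

Returning the reason rather than an empty batch lets the caller choose between archiving, closing, or hunting for a reopening candidate without re-deriving why the insert failed.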
diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h index 8568349bc6c..637129f0e56 100644 --- a/src/mongo/db/timeseries/bucket_catalog.h +++ b/src/mongo/db/timeseries/bucket_catalog.h @@ -65,8 +65,34 @@ protected: class BucketStateManager; + /** + * Identifier to lookup bucket by namespace and OID, with pre-computed hash. + */ + struct BucketId { + using Hash = std::size_t; + + BucketId() = delete; + BucketId(const NamespaceString& nss, const OID& oid); + + NamespaceString ns; + OID oid; + Hash hash; + + bool operator==(const BucketId& other) const { + return oid == other.oid && ns == other.ns; + } + bool operator!=(const BucketId& other) const { + return !(*this == other); + } + + template <typename H> + friend H AbslHashValue(H h, const BucketId& bucketId) { + return H::combine(std::move(h), bucketId.oid, bucketId.ns); + } + }; + struct BucketHandle { - const OID id; + const BucketId bucketId; const StripeNumber stripe; }; @@ -83,6 +109,7 @@ protected: AtomicWord<long long> numBucketsClosedDueToMemoryThreshold; AtomicWord<long long> numBucketsClosedDueToReopening; AtomicWord<long long> numBucketsArchivedDueToMemoryThreshold; + AtomicWord<long long> numBucketsArchivedDueToTimeBackward; AtomicWord<long long> numCommits; AtomicWord<long long> numWaits; AtomicWord<long long> numMeasurementsCommitted; @@ -116,6 +143,7 @@ protected: void incNumBucketsClosedDueToMemoryThreshold(long long increment = 1); void incNumBucketsClosedDueToReopening(long long increment = 1); void incNumBucketsArchivedDueToMemoryThreshold(long long increment = 1); + void incNumBucketsArchivedDueToTimeBackward(long long increment = 1); void incNumCommits(long long increment = 1); void incNumWaits(long long increment = 1); void incNumMeasurementsCommitted(long long increment = 1); @@ -158,14 +186,17 @@ public: public: ClosedBucket() = default; ~ClosedBucket(); - ClosedBucket( - BucketStateManager*, const OID&, const std::string&, boost::optional<uint32_t>, bool); + ClosedBucket(BucketStateManager*, + const BucketId&, + const std::string&, + boost::optional<uint32_t>, + bool); ClosedBucket(ClosedBucket&&); ClosedBucket& operator=(ClosedBucket&&); ClosedBucket(const ClosedBucket&) = delete; ClosedBucket& operator=(const ClosedBucket&) = delete; - OID bucketId; + BucketId bucketId; std::string timeField; boost::optional<uint32_t> numMeasurements; bool eligibleForReopening = false; @@ -403,7 +434,7 @@ public: * 'WriteConflictException'. This should be followed by a call to 'directWriteFinish' after the * write has been committed, rolled back, or otherwise finished. */ - void directWriteStart(const OID& oid); + void directWriteStart(const NamespaceString& ns, const OID& oid); /** * Notifies the catalog that a pending direct write to the bucket document with the specified ID @@ -412,7 +443,7 @@ public: * should have been cleared from the catalog, and it may be safely reopened from the on-disk * state. */ - void directWriteFinish(const OID& oid); + void directWriteFinish(const NamespaceString& ns, const OID& oid); /** * Clears any bucket whose namespace satisfies the predicate. @@ -549,12 +580,7 @@ protected: */ struct BucketHasher { std::size_t operator()(const BucketKey& key) const; - }; - - /** - * Hasher to support using a pre-computed hash as a key without having to compute another hash. - */ - struct PreHashed { + std::size_t operator()(const BucketId& bucketId) const; std::size_t operator()(const BucketKey::Hash& key) const; }; @@ -563,7 +589,8 @@ protected: * BucketCatalog. 
*/ struct ArchivedBucket { - OID bucketId; + ArchivedBucket() = delete; + BucketId bucketId; std::string timeField; }; @@ -578,7 +605,7 @@ protected: // All buckets currently in the catalog, including buckets which are full but not yet // committed. - stdx::unordered_map<OID, std::unique_ptr<Bucket>, OID::Hasher> allBuckets; + stdx::unordered_map<BucketId, std::unique_ptr<Bucket>, BucketHasher> allBuckets; // The current open bucket for each namespace and metadata pair. stdx::unordered_map<BucketKey, Bucket*, BucketHasher> openBuckets; @@ -595,7 +622,7 @@ protected: // efficiently find an archived bucket that is a candidate for an incoming measurement. stdx::unordered_map<BucketKey::Hash, std::map<Date_t, ArchivedBucket, std::greater<Date_t>>, - PreHashed> + BucketHasher> archivedBuckets; }; @@ -604,11 +631,26 @@ protected: */ enum class RolloverAction { kNone, // Keep bucket open + kArchive, // Archive bucket kSoftClose, // Close bucket so it remains eligible for reopening kHardClose, // Permanently close bucket }; /** + * Reasons why a bucket was rolled over. + */ + enum class RolloverReason { + kNone, // Not actually rolled over + kTimeForward, // Measurement time would violate max span for this bucket + kTimeBackward, // Measurement time was before bucket min time + kCount, // Adding this measurement would violate max count + kSchemaChange, // This measurement has a schema incompatible with existing measurements + kCachePressure, // System is under cache pressure, and adding this measurement would make + // the bucket larger than the dynamic size limit + kSize, // Adding this measurement would make the bucket larger than the normal size limit + }; + + /** * Bundle of information that 'insert' needs to pass down to helper methods that may create a * new bucket. */ @@ -658,7 +700,7 @@ protected: /** * Retrieves the bucket state if it is tracked in the catalog. */ - boost::optional<BucketState> getBucketState(const OID& oid) const; + boost::optional<BucketState> getBucketState(const BucketId& bucketId) const; /** * Checks whether the bucket has been cleared before changing the bucket state as requested. @@ -681,7 +723,8 @@ protected: * perform a noop (i.e. if upon inspecting the input, the change would be invalid), 'change' * may simply return its input state unchanged. */ - boost::optional<BucketState> changeBucketState(const OID& id, const StateChangeFn& change); + boost::optional<BucketState> changeBucketState(const BucketId& bucketId, + const StateChangeFn& change); /** * Appends statistics for observability. @@ -692,7 +735,7 @@ protected: void _decrementEraCountHelper(uint64_t era); void _incrementEraCountHelper(uint64_t era); boost::optional<BucketState> _changeBucketStateHelper(WithLock withLock, - const OID& id, + const BucketId& bucketId, const StateChangeFn& change); /** @@ -707,7 +750,7 @@ protected: * bucket state isn't currently tracked. */ boost::optional<BucketState> _markIndividualBucketCleared(WithLock catalogLock, - const OID& bucketId); + const BucketId& bucketId); /** * Removes clear operations from the clear registry that no longer need to be tracked. @@ -725,7 +768,7 @@ protected: EraCountMap _countMap; // Bucket state for synchronization with direct writes - stdx::unordered_map<OID, BucketState, OID::Hasher> _bucketStates; + stdx::unordered_map<BucketId, BucketState, BucketHasher> _bucketStates; // Registry storing clear operations. Maps from era to a lambda function which takes in // information about a Bucket and returns whether the Bucket has been cleared. 
@@ -742,9 +785,9 @@ protected: public: friend class BucketCatalog; - Bucket(const OID& id, + Bucket(const BucketId& bucketId, StripeNumber stripe, - BucketKey::Hash hash, + BucketKey::Hash keyHash, BucketStateManager* bucketStateManager); ~Bucket(); @@ -754,9 +797,20 @@ protected: void setEra(uint64_t era); /** - * Returns the ID for the underlying bucket. + * Returns the BucketId for the bucket. */ - const OID& id() const; + + const BucketId& bucketId() const; + + /** + * Returns the OID for the underlying bucket. + */ + const OID& oid() const; + + /** + * Returns the namespace for the underlying bucket. + */ + const NamespaceString& ns() const; /** * Returns the number of the stripe that owns the bucket. @@ -789,11 +843,6 @@ protected: uint32_t numMeasurements() const; /** - * Sets the namespace of the bucket. - */ - void setNamespace(const NamespaceString& ns); - - /** * Sets the rollover action, to determine what to do with a bucket when all measurements * have been committed. */ @@ -838,7 +887,7 @@ protected: private: // The bucket ID for the underlying document - const OID _id; + const BucketId _bucketId; // The stripe which owns this bucket. const StripeNumber _stripe; @@ -846,9 +895,6 @@ protected: // The pre-computed hash of the associated BucketKey const BucketKey::Hash _keyHash; - // The namespace that this bucket is used for. - NamespaceString _ns; - // The metadata of the data that this bucket contains. BucketMetadata _metadata; @@ -930,20 +976,23 @@ protected: */ const Bucket* _findBucket(const Stripe& stripe, WithLock stripeLock, - const OID& id, + const BucketId& bucketId, IgnoreBucketState mode = IgnoreBucketState::kNo); /** * Retrieve a bucket for write use. */ - Bucket* _useBucket(Stripe* stripe, WithLock stripeLock, const OID& id, IgnoreBucketState mode); + Bucket* _useBucket(Stripe* stripe, + WithLock stripeLock, + const BucketId& bucketId, + IgnoreBucketState mode); /** * Retrieve a bucket for write use, updating the state in the process. */ Bucket* _useBucketAndChangeState(Stripe* stripe, WithLock stripeLock, - const OID& id, + const BucketId& bucketId, const BucketStateManager::StateChangeFn& change); /** @@ -979,13 +1028,26 @@ protected: * Given a rehydrated 'bucket', passes ownership of that bucket to the catalog, marking the * bucket as open. */ - Bucket* _reopenBucket(Stripe* stripe, - WithLock stripeLock, - ExecutionStatsController stats, - const BucketKey& key, - std::unique_ptr<Bucket>&& bucket, - std::uint64_t targetEra, - ClosedBuckets* closedBuckets); + StatusWith<Bucket*> _reopenBucket(Stripe* stripe, + WithLock stripeLock, + ExecutionStatsController stats, + const BucketKey& key, + std::unique_ptr<Bucket>&& bucket, + std::uint64_t targetEra, + ClosedBuckets* closedBuckets); + + /** + * Check to see if 'insert' can use existing bucket rather than reopening a candidate bucket. If + * true, chances are the caller raced with another thread to reopen the same bucket, but if + * false, there might be another bucket that had been cleared, or that has the same _id in a + * different namespace. + */ + StatusWith<Bucket*> _reuseExistingBucket(Stripe* stripe, + WithLock stripeLock, + ExecutionStatsController* stats, + const BucketKey& key, + Bucket* existingBucket, + std::uint64_t targetEra); /** * Helper method to perform the heavy lifting for both 'tryInsert' and 'insert'. See @@ -1004,15 +1066,16 @@ protected: * Given an already-selected 'bucket', inserts 'doc' to the bucket if possible. 
If not, and * 'mode' is set to 'kYes', we will create a new bucket and insert into that bucket. */ - std::shared_ptr<WriteBatch> _insertIntoBucket(OperationContext* opCtx, - Stripe* stripe, - WithLock stripeLock, - const BSONObj& doc, - CombineWithInsertsFromOtherClients combine, - AllowBucketCreation mode, - CreationInfo* info, - Bucket* bucket, - ClosedBuckets* closedBuckets); + stdx::variant<std::shared_ptr<WriteBatch>, RolloverReason> _insertIntoBucket( + OperationContext* opCtx, + Stripe* stripe, + WithLock stripeLock, + const BSONObj& doc, + CombineWithInsertsFromOtherClients combine, + AllowBucketCreation mode, + CreationInfo* info, + Bucket* bucket, + ClosedBuckets* closedBuckets); /** * Wait for other batches to finish so we can prepare 'batch' @@ -1055,9 +1118,11 @@ protected: * Identifies a previously archived bucket that may be able to accomodate the measurement * represented by 'info', if one exists. */ - stdx::variant<std::monostate, OID, BSONObj> _getReopeningCandidate(Stripe* stripe, - WithLock stripeLock, - const CreationInfo& info); + stdx::variant<std::monostate, OID, BSONObj> _getReopeningCandidate( + Stripe* stripe, + WithLock stripeLock, + const CreationInfo& info, + bool allowQueryBasedReopening); /** * Aborts 'batch', and if the corresponding bucket still exists, proceeds to abort any other @@ -1083,7 +1148,7 @@ protected: * Records that compression for the given bucket has been completed, and the BucketCatalog can * forget about the bucket. */ - void _compressionDone(const OID& bucketId); + void _compressionDone(const BucketId& bucketId); /** * Adds the bucket to a list of idle buckets to be expired at a later date. @@ -1114,13 +1179,14 @@ protected: * Determines if 'bucket' needs to be rolled over to accomodate 'doc'. If so, determines whether * to archive or close 'bucket'. */ - RolloverAction _determineRolloverAction(OperationContext* opCtx, - const BSONObj& doc, - CreationInfo* info, - Bucket* bucket, - NewFieldNames* newFieldNamesToBeInserted, - int32_t* sizeToBeAdded, - AllowBucketCreation mode); + std::pair<RolloverAction, RolloverReason> _determineRolloverAction( + OperationContext* opCtx, + const BSONObj& doc, + CreationInfo* info, + Bucket* bucket, + NewFieldNames* newFieldNamesToBeInserted, + int32_t* sizeToBeAdded, + AllowBucketCreation mode); /** * Close the existing, full bucket and open a new one for the same metadata. diff --git a/src/mongo/db/timeseries/bucket_catalog_helpers.cpp b/src/mongo/db/timeseries/bucket_catalog_helpers.cpp index 4500212e420..13e174cd1bf 100644 --- a/src/mongo/db/timeseries/bucket_catalog_helpers.cpp +++ b/src/mongo/db/timeseries/bucket_catalog_helpers.cpp @@ -266,23 +266,28 @@ BSONObj findDocFromOID(OperationContext* opCtx, const Collection* coll, const OI return (foundDoc) ? bucketObj.value() : BSONObj(); } -void handleDirectWrite(OperationContext* opCtx, const OID& bucketId) { +void handleDirectWrite(OperationContext* opCtx, const NamespaceString& ns, const OID& bucketId) { + // Ensure we have the view namespace, as that's what the BucketCatalog operates on. + NamespaceString resolvedNs = + ns.isTimeseriesBucketsCollection() ? ns.getTimeseriesViewNamespace() : ns; + // First notify the BucketCatalog that we intend to start a direct write, so we can conflict // with any already-prepared operation, and also block bucket reopening if it's enabled. 
auto& bucketCatalog = BucketCatalog::get(opCtx); - bucketCatalog.directWriteStart(bucketId); + bucketCatalog.directWriteStart(resolvedNs, bucketId); // Then register callbacks so we can let the BucketCatalog know that we are done with our // direct write after the actual write takes place (or is abandoned), and allow reopening. opCtx->recoveryUnit()->onCommit( - [svcCtx = opCtx->getServiceContext(), bucketId](boost::optional<Timestamp>) { + [svcCtx = opCtx->getServiceContext(), resolvedNs, bucketId](boost::optional<Timestamp>) { + auto& bucketCatalog = BucketCatalog::get(svcCtx); + bucketCatalog.directWriteFinish(resolvedNs, bucketId); + }); + opCtx->recoveryUnit()->onRollback( + [svcCtx = opCtx->getServiceContext(), resolvedNs, bucketId]() { auto& bucketCatalog = BucketCatalog::get(svcCtx); - bucketCatalog.directWriteFinish(bucketId); + bucketCatalog.directWriteFinish(resolvedNs, bucketId); }); - opCtx->recoveryUnit()->onRollback([svcCtx = opCtx->getServiceContext(), bucketId]() { - auto& bucketCatalog = BucketCatalog::get(svcCtx); - bucketCatalog.directWriteFinish(bucketId); - }); } } // namespace mongo::timeseries diff --git a/src/mongo/db/timeseries/bucket_catalog_helpers.h b/src/mongo/db/timeseries/bucket_catalog_helpers.h index aa00c9541da..aef8a22fee4 100644 --- a/src/mongo/db/timeseries/bucket_catalog_helpers.h +++ b/src/mongo/db/timeseries/bucket_catalog_helpers.h @@ -119,6 +119,6 @@ BSONObj generateReopeningFilters(const Date_t& time, * * To be called from an OpObserver, e.g. in aboutToDelete and onUpdate. */ -void handleDirectWrite(OperationContext* opCtx, const OID& bucketId); +void handleDirectWrite(OperationContext* opCtx, const NamespaceString& ns, const OID& bucketId); } // namespace mongo::timeseries diff --git a/src/mongo/db/timeseries/bucket_catalog_state_manager_test.cpp b/src/mongo/db/timeseries/bucket_catalog_state_manager_test.cpp index 918fdab5254..97c23c451df 100644 --- a/src/mongo/db/timeseries/bucket_catalog_state_manager_test.cpp +++ b/src/mongo/db/timeseries/bucket_catalog_state_manager_test.cpp @@ -39,9 +39,9 @@ class BucketCatalogStateManagerTest : public BucketCatalog, public unittest::Tes public: BucketCatalogStateManagerTest() {} - void clearById(const OID& oid) { - directWriteStart(oid); - directWriteFinish(oid); + void clearById(const NamespaceString& ns, const OID& oid) { + directWriteStart(ns, oid); + directWriteFinish(ns, oid); } bool hasBeenCleared(Bucket* bucket) { @@ -51,7 +51,6 @@ public: Bucket* createBucket(const CreationInfo& info) { auto ptr = _allocateBucket(&_stripes[info.stripe], withLock, info); - ptr->setNamespace(info.key.ns); ASSERT_FALSE(hasBeenCleared(ptr)); return ptr; } @@ -66,11 +65,15 @@ public: } void checkAndRemoveClearedBucket(Bucket* bucket, BucketKey bucketKey, WithLock withLock) { - auto a = _findBucket( - _stripes[_getStripeNumber(bucketKey)], withLock, bucket->id(), IgnoreBucketState::kYes); + auto a = _findBucket(_stripes[_getStripeNumber(bucketKey)], + withLock, + bucket->bucketId(), + IgnoreBucketState::kYes); ASSERT(a == bucket); - auto b = _findBucket( - _stripes[_getStripeNumber(bucketKey)], withLock, bucket->id(), IgnoreBucketState::kNo); + auto b = _findBucket(_stripes[_getStripeNumber(bucketKey)], + withLock, + bucket->bucketId(), + IgnoreBucketState::kNo); ASSERT(b == nullptr); _removeBucket( &_stripes[_getStripeNumber(bucketKey)], withLock, bucket, RemovalMode::kAbort); @@ -329,7 +332,7 @@ TEST_F(BucketCatalogStateManagerTest, EraAdvancesAsExpected) { ASSERT_EQ(bucket2->getEra(), 1); // Era also advances when 
clearing by OID - clearById(OID()); + clearById(ns1, OID()); ASSERT_EQ(_bucketStateManager.getEra(), 4); } @@ -500,7 +503,7 @@ TEST_F(BucketCatalogStateManagerTest, HasBeenClearedToleratesGapsInRegistry) { auto bucket1 = createBucket(info1); ASSERT_EQ(bucket1->getEra(), 0); - clearById(OID()); + clearById(ns1, OID()); ASSERT_EQ(_bucketStateManager.getEra(), 2); clear(ns1); ASSERT_EQ(_bucketStateManager.getEra(), 3); @@ -508,9 +511,9 @@ TEST_F(BucketCatalogStateManagerTest, HasBeenClearedToleratesGapsInRegistry) { auto bucket2 = createBucket(info2); ASSERT_EQ(bucket2->getEra(), 3); - clearById(OID()); - clearById(OID()); - clearById(OID()); + clearById(ns1, OID()); + clearById(ns1, OID()); + clearById(ns1, OID()); ASSERT_EQ(_bucketStateManager.getEra(), 9); ASSERT_TRUE(hasBeenCleared(bucket1)); ASSERT_FALSE(hasBeenCleared(bucket2)); @@ -525,7 +528,7 @@ TEST_F(BucketCatalogStateManagerTest, ArchivingBucketPreservesState) { true}; auto bucket = createBucket(info1); - auto bucketId = bucket->id(); + auto bucketId = bucket->bucketId(); ClosedBuckets closedBuckets; _archiveBucket(&_stripes[info1.stripe], WithLock::withoutLock(), bucket, &closedBuckets); @@ -539,7 +542,7 @@ TEST_F(BucketCatalogStateManagerTest, AbortingBatchRemovesBucketState) { true}; auto bucket = createBucket(info1); - auto bucketId = bucket->id(); + auto bucketId = bucket->bucketId(); auto stats = _getExecutionStats(info1.key.ns); auto batch = std::make_shared<WriteBatch>(BucketHandle{bucketId, info1.stripe}, 0, stats); @@ -553,7 +556,7 @@ TEST_F(BucketCatalogStateManagerTest, ClosingBucketGoesThroughPendingCompression true}; auto bucket = createBucket(info1); - auto bucketId = bucket->id(); + auto bucketId = bucket->bucketId(); ASSERT(_bucketStateManager.getBucketState(bucketId).value() == BucketState{}); @@ -571,7 +574,7 @@ TEST_F(BucketCatalogStateManagerTest, ClosingBucketGoesThroughPendingCompression CommitInfo commitInfo{}; auto closedBucket = finish(batch, commitInfo); ASSERT(closedBucket.has_value()); - ASSERT_EQ(closedBucket.value().bucketId, bucketId); + ASSERT_EQ(closedBucket.value().bucketId.oid, bucketId.oid); // Bucket should now be in pending compression state. 
ASSERT(_bucketStateManager.getBucketState(bucketId).has_value()); @@ -588,8 +591,8 @@ TEST_F(BucketCatalogStateManagerTest, DirectWriteStartInitializesBucketState) { RAIIServerParameterControllerForTest controller{"featureFlagTimeseriesScalabilityImprovements", true}; - auto bucketId = OID(); - directWriteStart(bucketId); + auto bucketId = BucketId{ns1, OID()}; + directWriteStart(ns1, bucketId.oid); auto state = _bucketStateManager.getBucketState(bucketId); ASSERT_TRUE(state.has_value()); ASSERT_TRUE(state.value().isSet(BucketStateFlag::kPendingDirectWrite)); @@ -599,13 +602,13 @@ TEST_F(BucketCatalogStateManagerTest, DirectWriteFinishRemovesBucketState) { RAIIServerParameterControllerForTest controller{"featureFlagTimeseriesScalabilityImprovements", true}; - auto bucketId = OID(); - directWriteStart(bucketId); + auto bucketId = BucketId{ns1, OID()}; + directWriteStart(ns1, bucketId.oid); auto state = _bucketStateManager.getBucketState(bucketId); ASSERT_TRUE(state.has_value()); ASSERT_TRUE(state.value().isSet(BucketStateFlag::kPendingDirectWrite)); - directWriteFinish(bucketId); + directWriteFinish(ns1, bucketId.oid); state = _bucketStateManager.getBucketState(bucketId); ASSERT_FALSE(state.has_value()); } diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp index ef3752a9707..2acb41bc3d8 100644 --- a/src/mongo/db/timeseries/bucket_catalog_test.cpp +++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp @@ -570,7 +570,7 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) { BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); ASSERT(result.isOK()); auto batch = result.getValue().batch; - auto oldId = batch->bucket().id; + auto oldId = batch->bucket().bucketId; _commit(batch, 0); ASSERT_EQ(2U, batch->newFieldNamesToBeInserted().size()) << batch->toBSON(); ASSERT(batch->newFieldNamesToBeInserted().count(_timeField)) << batch->toBSON(); @@ -626,7 +626,7 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) { BSON(_timeField << Date_t::now() << "a" << gTimeseriesBucketMaxCount), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); auto& batch2 = result2.getValue().batch; - ASSERT_NE(oldId, batch2->bucket().id); + ASSERT_NE(oldId, batch2->bucket().bucketId); _commit(batch2, 0); ASSERT_EQ(2U, batch2->newFieldNamesToBeInserted().size()) << batch2->toBSON(); ASSERT(batch2->newFieldNamesToBeInserted().count(_timeField)) << batch2->toBSON(); @@ -730,7 +730,8 @@ TEST_F(BucketCatalogTest, ClearBucketWithPreparedBatchThrowsConflict) { ASSERT_EQ(batch->measurements().size(), 1); ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), 0); - ASSERT_THROWS(_bucketCatalog->directWriteStart(batch->bucket().id), WriteConflictException); + ASSERT_THROWS(_bucketCatalog->directWriteStart(_ns1, batch->bucket().bucketId.oid), + WriteConflictException); _bucketCatalog->abort(batch, {ErrorCodes::TimeseriesBucketCleared, ""}); ASSERT(batch->finished()); @@ -763,10 +764,11 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) { .getValue() .batch; ASSERT_NE(batch1, batch2); - ASSERT_EQ(batch1->bucket().id, batch2->bucket().id); + ASSERT_EQ(batch1->bucket().bucketId, batch2->bucket().bucketId); // Now clear the bucket. Since there's a prepared batch it should conflict. 
- ASSERT_THROWS(_bucketCatalog->directWriteStart(batch1->bucket().id), WriteConflictException); + ASSERT_THROWS(_bucketCatalog->directWriteStart(_ns1, batch1->bucket().bucketId.oid), + WriteConflictException); // Now try to prepare the second batch. Ensure it aborts the batch. ASSERT(batch2->claimCommitRights()); @@ -775,7 +777,8 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) { ASSERT_EQ(batch2->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared); // Make sure we didn't clear the bucket state when we aborted the second batch. - ASSERT_THROWS(_bucketCatalog->directWriteStart(batch1->bucket().id), WriteConflictException); + ASSERT_THROWS(_bucketCatalog->directWriteStart(_ns1, batch1->bucket().bucketId.oid), + WriteConflictException); // Make sure a subsequent insert, which opens a new bucket, doesn't corrupt the old bucket // state and prevent us from finishing the first batch. @@ -790,7 +793,7 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) { .batch; ASSERT_NE(batch1, batch3); ASSERT_NE(batch2, batch3); - ASSERT_NE(batch1->bucket().id, batch3->bucket().id); + ASSERT_NE(batch1->bucket().bucketId, batch3->bucket().bucketId); // Clean up this batch ASSERT(batch3->claimCommitRights()); _bucketCatalog->abort(batch3, {ErrorCodes::TimeseriesBucketCleared, ""}); @@ -940,8 +943,8 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) { BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) .getValue() .batch; - ASSERT_EQ(batch1->bucket().id, batch2->bucket().id); - ASSERT_EQ(batch1->bucket().id, batch3->bucket().id); + ASSERT_EQ(batch1->bucket().bucketId, batch2->bucket().bucketId); + ASSERT_EQ(batch1->bucket().bucketId, batch3->bucket().bucketId); ASSERT(batch1->claimCommitRights()); ASSERT(batch2->claimCommitRights()); @@ -975,7 +978,7 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) { BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) .getValue() .batch; - ASSERT_NE(batch2->bucket().id, batch4->bucket().id); + ASSERT_NE(batch2->bucket().bucketId, batch4->bucket().bucketId); } TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) { @@ -1446,7 +1449,7 @@ TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) { "featureFlagTimeseriesScalabilityImprovements", true}; RAIIServerParameterControllerForTest memoryController{ "timeseriesIdleBucketExpiryMemoryUsageThreshold", - 200}; // An absurdly low limit that only allows us one open bucket at a time. + 250}; // An absurdly low limit that only allows us one open bucket at a time. setGlobalFailPoint("alwaysUseSameBucketCatalogStripe", BSON("mode" << "alwaysOn")); @@ -1481,7 +1484,7 @@ TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) { ASSERT_OK(result.getStatus()); auto batch = result.getValue().batch; ASSERT(batch); - auto bucketId = batch->bucket().id; + auto bucketId = batch->bucket().bucketId; ASSERT(batch->claimCommitRights()); ASSERT_OK(_bucketCatalog->prepareCommit(batch)); ASSERT_EQ(batch->measurements().size(), 1); @@ -1500,7 +1503,7 @@ TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) { ASSERT(!result.getValue().batch); ASSERT_TRUE(stdx::holds_alternative<BSONObj>(result.getValue().candidate)); - // So should time forward. + // Time forward should not hint to re-open. 
result = _bucketCatalog->tryInsert( _opCtx, _ns1, @@ -1511,7 +1514,7 @@ TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) { ASSERT_OK(result.getStatus()); ASSERT(result.getValue().closedBuckets.empty()); ASSERT(!result.getValue().batch); - ASSERT_TRUE(stdx::holds_alternative<BSONObj>(result.getValue().candidate)); + ASSERT_TRUE(stdx::holds_alternative<std::monostate>(result.getValue().candidate)); // Now let's insert something with a different meta, so we open a new bucket, see we're past the // memory limit, and archive the existing bucket. @@ -1526,7 +1529,7 @@ TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) { ASSERT_EQ(1, _getExecutionStat(_ns1, kNumArchivedDueToMemoryThreshold)); ASSERT_EQ(0, _getExecutionStat(_ns1, kNumClosedDueToMemoryThreshold)); batch = result.getValue().batch; - ASSERT_NE(batch->bucket().id, bucketId); + ASSERT_NE(batch->bucket().bucketId, bucketId); ASSERT(batch); ASSERT(batch->claimCommitRights()); ASSERT_OK(_bucketCatalog->prepareCommit(batch)); @@ -1546,7 +1549,7 @@ TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) { ASSERT(result.getValue().closedBuckets.empty()); ASSERT(!result.getValue().batch); ASSERT_TRUE(stdx::holds_alternative<OID>(result.getValue().candidate)); - ASSERT_EQ(stdx::get<OID>(result.getValue().candidate), bucketId); + ASSERT_EQ(stdx::get<OID>(result.getValue().candidate), bucketId.oid); } TEST_F(BucketCatalogTest, TryInsertWillCreateBucketIfWeWouldCloseExistingBucket) { @@ -1565,7 +1568,7 @@ TEST_F(BucketCatalogTest, TryInsertWillCreateBucketIfWeWouldCloseExistingBucket) ASSERT_OK(result.getStatus()); auto batch = result.getValue().batch; ASSERT(batch); - auto bucketId = batch->bucket().id; + auto bucketId = batch->bucket().bucketId; ASSERT(batch->claimCommitRights()); ASSERT_OK(_bucketCatalog->prepareCommit(batch)); ASSERT_EQ(batch->measurements().size(), 1); @@ -1583,7 +1586,7 @@ TEST_F(BucketCatalogTest, TryInsertWillCreateBucketIfWeWouldCloseExistingBucket) ASSERT_OK(result.getStatus()); batch = result.getValue().batch; ASSERT(batch); - ASSERT_NE(batch->bucket().id, bucketId); + ASSERT_NE(batch->bucket().bucketId, bucketId); ASSERT(batch->claimCommitRights()); ASSERT_OK(_bucketCatalog->prepareCommit(batch)); ASSERT_EQ(batch->measurements().size(), 1); @@ -1607,7 +1610,7 @@ TEST_F(BucketCatalogTest, InsertIntoReopenedBucket) { ASSERT_OK(result.getStatus()); auto batch = result.getValue().batch; ASSERT(batch); - auto oldBucketId = batch->bucket().id; + auto oldBucketId = batch->bucket().bucketId; ASSERT(batch->claimCommitRights()); ASSERT_OK(_bucketCatalog->prepareCommit(batch)); ASSERT_EQ(batch->measurements().size(), 1); @@ -1618,7 +1621,7 @@ TEST_F(BucketCatalogTest, InsertIntoReopenedBucket) { "control":{"version":1,"min":{"time":{"$date":"2022-06-06T15:34:00.000Z"}}, "max":{"time":{"$date":"2022-06-06T15:34:30.000Z"}}}, "data":{"time":{"0":{"$date":"2022-06-06T15:34:30.000Z"}}}})"); - ASSERT_NE(bucketDoc["_id"].OID(), oldBucketId); + ASSERT_NE(bucketDoc["_id"].OID(), oldBucketId.oid); auto validator = [&](OperationContext * opCtx, const BSONObj& bucketDoc) -> auto { return autoColl->checkValidation(opCtx, bucketDoc); }; @@ -1638,7 +1641,7 @@ TEST_F(BucketCatalogTest, InsertIntoReopenedBucket) { ASSERT_OK(result.getStatus()); batch = result.getValue().batch; ASSERT(batch); - ASSERT_EQ(batch->bucket().id, bucketDoc["_id"].OID()); + ASSERT_EQ(batch->bucket().bucketId.oid, bucketDoc["_id"].OID()); ASSERT(batch->claimCommitRights()); 
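The TryInsertWillNotCreateBucketWhenWeShouldTryToReopen assertions above distinguish the three shapes a reopening candidate can take: std::monostate when there is no hint, an OID pointing at an archived bucket, and a BSONObj filter for query-based reopening. A self-contained sketch of that three-way dispatch follows; ObjectId and QueryDoc are hypothetical stand-ins for OID and BSONObj, and the strings are made-up example data.

    #include <iostream>
    #include <string>
    #include <variant>

    // Illustrative stand-ins only; the real code uses OID and BSONObj.
    struct ObjectId { std::string hex; };
    struct QueryDoc { std::string json; };

    using ReopeningCandidate = std::variant<std::monostate, ObjectId, QueryDoc>;

    // Mirrors the three assertions above: no hint, archive-based hint, or
    // query-based hint.
    void dispatch(const ReopeningCandidate& candidate) {
        if (std::holds_alternative<std::monostate>(candidate)) {
            std::cout << "no reopening hint: open a new bucket\n";
        } else if (auto oid = std::get_if<ObjectId>(&candidate)) {
            std::cout << "archive-based reopening: fetch bucket " << oid->hex << "\n";
        } else {
            std::cout << "query-based reopening: run " << std::get<QueryDoc>(candidate).json << "\n";
        }
    }

    int main() {
        dispatch(ReopeningCandidate{});  // defaults to std::monostate
        dispatch(ObjectId{"0123456789abcdef01234567"});
        dispatch(QueryDoc{"{ \"control.min.time\": ... }"});
        return 0;
    }
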
ASSERT_OK(_bucketCatalog->prepareCommit(batch)); ASSERT_EQ(batch->measurements().size(), 1); @@ -1681,7 +1684,7 @@ TEST_F(BucketCatalogTest, CannotInsertIntoOutdatedBucket) { ASSERT_OK(result.getStatus()); auto batch = result.getValue().batch; ASSERT(batch); - auto oldBucketId = batch->bucket().id; + auto oldBucketId = batch->bucket().bucketId; ASSERT(batch->claimCommitRights()); ASSERT_OK(_bucketCatalog->prepareCommit(batch)); ASSERT_EQ(batch->measurements().size(), 1); @@ -1692,16 +1695,17 @@ TEST_F(BucketCatalogTest, CannotInsertIntoOutdatedBucket) { "control":{"version":1,"min":{"time":{"$date":"2022-06-06T15:34:00.000Z"}}, "max":{"time":{"$date":"2022-06-06T15:34:30.000Z"}}}, "data":{"time":{"0":{"$date":"2022-06-06T15:34:30.000Z"}}}})"); - ASSERT_NE(bucketDoc["_id"].OID(), oldBucketId); + ASSERT_NE(bucketDoc["_id"].OID(), oldBucketId.oid); auto validator = [&](OperationContext * opCtx, const BSONObj& bucketDoc) -> auto { return autoColl->checkValidation(opCtx, bucketDoc); }; // If we advance the catalog era, then we shouldn't use a bucket that was fetched during a // previous era. + const NamespaceString fakeNs{"test.foo"}; const auto fakeId = OID(); - _bucketCatalog->directWriteStart(fakeId); - _bucketCatalog->directWriteFinish(fakeId); + _bucketCatalog->directWriteStart(fakeNs, fakeId); + _bucketCatalog->directWriteFinish(fakeNs, fakeId); BucketCatalog::BucketFindResult findResult; findResult.bucketToReopen = |