author     Dan Larkin-York <13419935+dhly-etc@users.noreply.github.com>  2022-12-08 21:48:18 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-12-08 23:12:35 +0000
commit     2460a8b708a5b0b0ef245cea1ebe9bb6d9929308 (patch)
tree       b02dabfc2798695d66b956c0059ec5195c65c318 /src/mongo/db/timeseries
parent     4ea67148b604e355398a6355f7e348da12c677d9 (diff)
SERVER-71457 Archive on time backwards, and restrict query-based reopening
Diffstat (limited to 'src/mongo/db/timeseries')
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog.cpp                      | 505
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog.h                        | 192
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog_helpers.cpp              |  21
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog_helpers.h                |   2
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog_state_manager_test.cpp   |  47
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog_test.cpp                 |  54
6 files changed, 510 insertions, 311 deletions
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index 424e70002f4..91dcf1e7ff7 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -92,8 +92,8 @@ BSONObj buildControlMinTimestampDoc(StringData timeField, Date_t roundedTime) {
return builder.obj();
}
-std::pair<OID, Date_t> generateBucketId(const Date_t& time, const TimeseriesOptions& options) {
- OID bucketId = OID::gen();
+std::pair<OID, Date_t> generateBucketOID(const Date_t& time, const TimeseriesOptions& options) {
+ OID oid = OID::gen();
// We round the measurement timestamp down to the nearest minute, hour, or day depending on the
// granularity. We do this for two reasons. The first is so that if measurements come in
@@ -103,7 +103,7 @@ std::pair<OID, Date_t> generateBucketId(const Date_t& time, const TimeseriesOpti
// what the bucket timestamp will be, so we can route measurements to the right shard chunk.
auto roundedTime = timeseries::roundTimestampToGranularity(time, options);
int64_t const roundedSeconds = durationCount<Seconds>(roundedTime.toDurationSinceEpoch());
- bucketId.setTimestamp(roundedSeconds);
+ oid.setTimestamp(roundedSeconds);
// Now, if we stopped here we could end up with bucket OID collisions. Consider the case where
// we have the granularity set to 'Hours'. This means we will round down to the nearest day, so
@@ -122,23 +122,18 @@ std::pair<OID, Date_t> generateBucketId(const Date_t& time, const TimeseriesOpti
// by restarting the new primary. It remains an open question whether we can fix this in a
// better way.
// TODO (SERVER-61412): Avoid time-series bucket OID collisions after election
- auto instance = bucketId.getInstanceUnique();
+ auto instance = oid.getInstanceUnique();
uint32_t sum = DataView(reinterpret_cast<char*>(instance.bytes)).read<uint32_t>(1) +
(durationCount<Seconds>(time.toDurationSinceEpoch()) - roundedSeconds);
DataView(reinterpret_cast<char*>(instance.bytes)).write<uint32_t>(sum, 1);
- bucketId.setInstanceUnique(instance);
+ oid.setInstanceUnique(instance);
- return {bucketId, roundedTime};
+ return {oid, roundedTime};
}
-Status getTimeseriesBucketClearedError(const OID& bucketId,
- const boost::optional<NamespaceString>& ns = boost::none) {
- std::string nsIdentification;
- if (ns) {
- nsIdentification.assign(str::stream() << " for namespace " << *ns);
- }
+Status getTimeseriesBucketClearedError(const NamespaceString& ns, const OID& oid) {
return {ErrorCodes::TimeseriesBucketCleared,
- str::stream() << "Time-series bucket " << bucketId << nsIdentification
+ str::stream() << "Time-series bucket " << oid << " for namespace " << ns
<< " was cleared"};
}
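The comments in this hunk describe how a bucket OID is built: the measurement time is rounded down to the configured granularity, stored in the OID's timestamp field, and the leftover sub-granularity offset is folded into the OID's instance-unique bytes so two buckets that share a rounded timestamp rarely collide. The following standalone sketch illustrates that scheme in plain C++; FakeOid, makeBucketOid, and roundToGranularity are illustrative names, not MongoDB APIs.

#include <array>
#include <cstdint>
#include <cstring>
#include <random>

// Illustrative 12-byte identifier: bytes [0,4) hold a big-endian timestamp,
// bytes [4,12) hold a per-process "instance unique" value.
struct FakeOid {
    std::array<std::uint8_t, 12> bytes{};
};

// Round an epoch time (seconds) down to the given granularity (seconds).
std::int64_t roundToGranularity(std::int64_t epochSeconds, std::int64_t granularitySeconds) {
    return (epochSeconds / granularitySeconds) * granularitySeconds;
}

FakeOid makeBucketOid(std::int64_t epochSeconds, std::int64_t granularitySeconds) {
    FakeOid oid;
    // Random instance bytes, standing in for OID::gen().
    std::mt19937_64 rng{std::random_device{}()};
    for (int i = 4; i < 12; ++i) {
        oid.bytes[i] = static_cast<std::uint8_t>(rng());
    }

    // Store the rounded timestamp in the leading four bytes (big-endian).
    const std::int64_t rounded = roundToGranularity(epochSeconds, granularitySeconds);
    const auto ts = static_cast<std::uint32_t>(rounded);
    for (int i = 0; i < 4; ++i) {
        oid.bytes[i] = static_cast<std::uint8_t>(ts >> (8 * (3 - i)));
    }

    // Fold the sub-granularity offset into part of the instance-unique region so buckets
    // created for different measurement times within the same rounded window differ.
    std::uint32_t instance;
    std::memcpy(&instance, &oid.bytes[5], sizeof(instance));
    instance += static_cast<std::uint32_t>(epochSeconds - rounded);
    std::memcpy(&oid.bytes[5], &instance, sizeof(instance));
    return oid;
}

int main() {
    // Two measurements in the same hour share the rounded timestamp prefix.
    FakeOid a = makeBucketOid(1670536098, 3600);
    FakeOid b = makeBucketOid(1670536099, 3600);
    return a.bytes[0] == b.bytes[0] ? 0 : 1;
}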
@@ -232,6 +227,12 @@ void BucketCatalog::ExecutionStatsController::incNumBucketsArchivedDueToMemoryTh
_globalStats->numBucketsArchivedDueToMemoryThreshold.fetchAndAddRelaxed(increment);
}
+void BucketCatalog::ExecutionStatsController::incNumBucketsArchivedDueToTimeBackward(
+ long long increment) {
+ _collectionStats->numBucketsArchivedDueToTimeBackward.fetchAndAddRelaxed(increment);
+ _globalStats->numBucketsArchivedDueToTimeBackward.fetchAndAddRelaxed(increment);
+}
+
void BucketCatalog::ExecutionStatsController::incNumCommits(long long increment) {
_collectionStats->numCommits.fetchAndAddRelaxed(increment);
_globalStats->numCommits.fetchAndAddRelaxed(increment);
@@ -430,7 +431,7 @@ bool BucketCatalog::BucketStateManager::_isMemberOfClearedSet(WithLock catalogLo
Bucket* bucket) {
for (auto it = _clearRegistry.lower_bound(bucket->getEra() + 1); it != _clearRegistry.end();
++it) {
- if (it->second(bucket->_ns)) {
+ if (it->second(bucket->ns())) {
return true;
}
}
@@ -445,7 +446,7 @@ bool BucketCatalog::BucketStateManager::_isMemberOfClearedSet(WithLock catalogLo
boost::optional<BucketCatalog::BucketState>
BucketCatalog::BucketStateManager::_markIndividualBucketCleared(WithLock catalogLock,
- const OID& bucketId) {
+ const BucketId& bucketId) {
return _changeBucketStateHelper(
catalogLock,
bucketId,
@@ -462,16 +463,16 @@ boost::optional<BucketCatalog::BucketState> BucketCatalog::BucketStateManager::g
stdx::lock_guard catalogLock{*_mutex};
// If the bucket has been cleared, we will set the bucket state accordingly to reflect that.
if (_isMemberOfClearedSet(catalogLock, bucket)) {
- return _markIndividualBucketCleared(catalogLock, bucket->id());
+ return _markIndividualBucketCleared(catalogLock, bucket->bucketId());
}
- auto it = _bucketStates.find(bucket->id());
+ auto it = _bucketStates.find(bucket->bucketId());
return it != _bucketStates.end() ? boost::make_optional(it->second) : boost::none;
}
boost::optional<BucketCatalog::BucketState> BucketCatalog::BucketStateManager::getBucketState(
- const OID& oid) const {
+ const BucketId& bucketId) const {
stdx::lock_guard catalogLock{*_mutex};
- auto it = _bucketStates.find(oid);
+ auto it = _bucketStates.find(bucketId);
return it != _bucketStates.end() ? boost::make_optional(it->second) : boost::none;
}
@@ -479,16 +480,16 @@ boost::optional<BucketCatalog::BucketState> BucketCatalog::BucketStateManager::c
Bucket* bucket, const StateChangeFn& change) {
stdx::lock_guard catalogLock{*_mutex};
if (_isMemberOfClearedSet(catalogLock, bucket)) {
- return _markIndividualBucketCleared(catalogLock, bucket->id());
+ return _markIndividualBucketCleared(catalogLock, bucket->bucketId());
}
- return _changeBucketStateHelper(catalogLock, bucket->id(), change);
+ return _changeBucketStateHelper(catalogLock, bucket->bucketId(), change);
}
boost::optional<BucketCatalog::BucketState> BucketCatalog::BucketStateManager::changeBucketState(
- const OID& id, const StateChangeFn& change) {
+ const BucketId& bucketId, const StateChangeFn& change) {
stdx::lock_guard catalogLock{*_mutex};
- return _changeBucketStateHelper(catalogLock, id, change);
+ return _changeBucketStateHelper(catalogLock, bucketId, change);
}
void BucketCatalog::BucketStateManager::appendStats(BSONObjBuilder* base) const {
@@ -504,9 +505,9 @@ void BucketCatalog::BucketStateManager::appendStats(BSONObjBuilder* base) const
boost::optional<BucketCatalog::BucketState>
BucketCatalog::BucketStateManager::_changeBucketStateHelper(WithLock catalogLock,
- const OID& id,
+ const BucketId& bucketId,
const StateChangeFn& change) {
- auto it = _bucketStates.find(id);
+ auto it = _bucketStates.find(bucketId);
const boost::optional<BucketState> initial =
(it == _bucketStates.end()) ? boost::none : boost::make_optional(it->second);
const boost::optional<BucketState> target = change(initial, _era);
@@ -528,7 +529,7 @@ BucketCatalog::BucketStateManager::_changeBucketStateHelper(WithLock catalogLock
}
return boost::none;
} else if (!initial.has_value()) {
- _bucketStates.emplace(id, target.value());
+ _bucketStates.emplace(bucketId, target.value());
return target;
}
@@ -568,15 +569,15 @@ void BucketCatalog::BucketStateManager::_cleanClearRegistry() {
_clearRegistry.erase(_clearRegistry.begin(), endIt);
}
-BucketCatalog::Bucket::Bucket(const OID& id,
+BucketCatalog::Bucket::Bucket(const BucketId& bucketId,
StripeNumber stripe,
- BucketKey::Hash hash,
+ BucketKey::Hash keyHash,
BucketStateManager* bucketStateManager)
: _lastCheckedEra(bucketStateManager->getEraAndIncrementCount()),
_bucketStateManager(bucketStateManager),
- _id(id),
+ _bucketId(bucketId),
_stripe(stripe),
- _keyHash(hash) {}
+ _keyHash(keyHash) {}
BucketCatalog::Bucket::~Bucket() {
_bucketStateManager->decrementCountForEra(getEra());
@@ -590,8 +591,16 @@ void BucketCatalog::Bucket::setEra(uint64_t era) {
_lastCheckedEra = era;
}
-const OID& BucketCatalog::Bucket::id() const {
- return _id;
+const BucketCatalog::BucketId& BucketCatalog::Bucket::bucketId() const {
+ return _bucketId;
+}
+
+const OID& BucketCatalog::Bucket::oid() const {
+ return _bucketId.oid;
+}
+
+const NamespaceString& BucketCatalog::Bucket::ns() const {
+ return _bucketId.ns;
}
BucketCatalog::StripeNumber BucketCatalog::Bucket::stripe() const {
@@ -618,10 +627,6 @@ uint32_t BucketCatalog::Bucket::numMeasurements() const {
return _numMeasurements;
}
-void BucketCatalog::Bucket::setNamespace(const NamespaceString& ns) {
- _ns = ns;
-}
-
void BucketCatalog::Bucket::setRolloverAction(RolloverAction action) {
_rolloverAction = action;
}
@@ -690,8 +695,9 @@ std::shared_ptr<BucketCatalog::WriteBatch> BucketCatalog::Bucket::_activeBatch(
auto it = _batches.find(opId);
if (it == _batches.end()) {
it = _batches
- .try_emplace(opId,
- std::make_shared<WriteBatch>(BucketHandle{_id, _stripe}, opId, stats))
+ .try_emplace(
+ opId,
+ std::make_shared<WriteBatch>(BucketHandle{_bucketId, _stripe}, opId, stats))
.first;
}
return it->second;
@@ -708,11 +714,11 @@ BucketCatalog::ClosedBucket::~ClosedBucket() {
}
BucketCatalog::ClosedBucket::ClosedBucket(BucketStateManager* bsm,
- const OID& id,
+ const BucketId& bucketId,
const std::string& tf,
boost::optional<uint32_t> nm,
bool efr)
- : bucketId{id},
+ : bucketId{bucketId},
timeField{tf},
numMeasurements{nm},
eligibleForReopening{efr},
@@ -918,21 +924,21 @@ Status BucketCatalog::reopenBucket(OperationContext* opCtx,
stdx::lock_guard stripeLock{stripe.mutex};
ClosedBuckets closedBuckets;
- _reopenBucket(&stripe,
- stripeLock,
- stats,
- key,
- std::move(bucket),
- _bucketStateManager.getEra(),
- &closedBuckets);
- return Status::OK();
+ return _reopenBucket(&stripe,
+ stripeLock,
+ stats,
+ key,
+ std::move(bucket),
+ _bucketStateManager.getEra(),
+ &closedBuckets)
+ .getStatus();
}
BSONObj BucketCatalog::getMetadata(const BucketHandle& handle) {
auto const& stripe = _stripes[handle.stripe];
stdx::lock_guard stripeLock{stripe.mutex};
- const Bucket* bucket = _findBucket(stripe, stripeLock, handle.id);
+ const Bucket* bucket = _findBucket(stripe, stripeLock, handle.bucketId);
if (!bucket) {
return {};
}
@@ -977,7 +983,7 @@ Status BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) {
Bucket* bucket = _useBucketAndChangeState(
&stripe,
stripeLock,
- batch->bucket().id,
+ batch->bucket().bucketId,
[](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> {
invariant(input.has_value());
return input.value().setFlag(BucketStateFlag::kPrepared);
@@ -991,7 +997,11 @@ Status BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) {
}
return getBatchStatus();
} else if (!bucket) {
- _abort(&stripe, stripeLock, batch, getTimeseriesBucketClearedError(batch->bucket().id));
+ _abort(&stripe,
+ stripeLock,
+ batch,
+ getTimeseriesBucketClearedError(batch->bucket().bucketId.ns,
+ batch->bucket().bucketId.oid));
return getBatchStatus();
}
@@ -1016,7 +1026,7 @@ boost::optional<BucketCatalog::ClosedBucket> BucketCatalog::finish(
Bucket* bucket = _useBucketAndChangeState(
&stripe,
stripeLock,
- batch->bucket().id,
+ batch->bucket().bucketId,
[](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> {
invariant(input.has_value());
return input.value().unsetFlag(BucketStateFlag::kPrepared);
@@ -1042,7 +1052,7 @@ boost::optional<BucketCatalog::ClosedBucket> BucketCatalog::finish(
// It's possible that we cleared the bucket in between preparing the commit and finishing
// here. In this case, we should abort any other ongoing batches and clear the bucket from
// the catalog so it's not hanging around idle.
- auto it = stripe.allBuckets.find(batch->bucket().id);
+ auto it = stripe.allBuckets.find(batch->bucket().bucketId);
if (it != stripe.allBuckets.end()) {
bucket = it->second.get();
bucket->_preparedBatch.reset();
@@ -1050,7 +1060,7 @@ boost::optional<BucketCatalog::ClosedBucket> BucketCatalog::finish(
stripeLock,
bucket,
nullptr,
- getTimeseriesBucketClearedError(bucket->id(), bucket->_ns));
+ getTimeseriesBucketClearedError(bucket->ns(), bucket->oid()));
}
} else if (bucket->allCommitted()) {
switch (bucket->_rolloverAction) {
@@ -1059,13 +1069,21 @@ boost::optional<BucketCatalog::ClosedBucket> BucketCatalog::finish(
const bool eligibleForReopening =
bucket->_rolloverAction == RolloverAction::kSoftClose;
closedBucket = boost::in_place(&_bucketStateManager,
- bucket->id(),
+ bucket->bucketId(),
bucket->getTimeField().toString(),
bucket->numMeasurements(),
eligibleForReopening);
_removeBucket(&stripe, stripeLock, bucket, RemovalMode::kClose);
break;
}
+ case RolloverAction::kArchive: {
+ ClosedBuckets closedBuckets;
+ _archiveBucket(&stripe, stripeLock, bucket, &closedBuckets);
+ if (!closedBuckets.empty()) {
+ closedBucket = std::move(closedBuckets[0]);
+ }
+ break;
+ }
case RolloverAction::kNone: {
_markBucketIdle(&stripe, stripeLock, bucket);
break;
@@ -1089,9 +1107,11 @@ void BucketCatalog::abort(std::shared_ptr<WriteBatch> batch, const Status& statu
_abort(&stripe, stripeLock, batch, status);
}
-void BucketCatalog::directWriteStart(const OID& oid) {
+void BucketCatalog::directWriteStart(const NamespaceString& ns, const OID& oid) {
+ invariant(!ns.isTimeseriesBucketsCollection());
auto result = _bucketStateManager.changeBucketState(
- oid, [](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> {
+ BucketId{ns, oid},
+ [](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> {
if (input.has_value()) {
return input.value().setFlag(BucketStateFlag::kPendingDirectWrite);
}
@@ -1109,10 +1129,12 @@ void BucketCatalog::directWriteStart(const OID& oid) {
hangTimeseriesDirectModificationAfterStart.pauseWhileSet();
}
-void BucketCatalog::directWriteFinish(const OID& oid) {
+void BucketCatalog::directWriteFinish(const NamespaceString& ns, const OID& oid) {
+ invariant(!ns.isTimeseriesBucketsCollection());
hangTimeseriesDirectModificationBeforeFinish.pauseWhileSet();
(void)_bucketStateManager.changeBucketState(
- oid, [](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> {
+ BucketId{ns, oid},
+ [](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> {
if (!input.has_value()) {
// We may have had multiple direct writes to this document in the same storage
// transaction. If so, a previous call to directWriteFinish may have cleaned up the
@@ -1142,16 +1164,16 @@ void BucketCatalog::clear(ShouldClearFn&& shouldClear) {
auto nextIt = std::next(it);
const auto& bucket = it->second;
- if (shouldClear(bucket->_ns)) {
+ if (shouldClear(bucket->ns())) {
{
stdx::lock_guard catalogLock{_mutex};
- _executionStats.erase(bucket->_ns);
+ _executionStats.erase(bucket->ns());
}
_abort(&stripe,
stripeLock,
bucket.get(),
nullptr,
- getTimeseriesBucketClearedError(bucket->id(), bucket->_ns));
+ getTimeseriesBucketClearedError(bucket->ns(), bucket->oid()));
}
it = nextIt;
@@ -1160,6 +1182,7 @@ void BucketCatalog::clear(ShouldClearFn&& shouldClear) {
}
void BucketCatalog::clear(const NamespaceString& ns) {
+ invariant(!ns.isTimeseriesBucketsCollection());
clear([ns](const NamespaceString& bucketNs) { return bucketNs == ns; });
}
@@ -1201,6 +1224,8 @@ void BucketCatalog::_appendExecutionStatsToBuilder(const ExecutionStats* stats,
stats->numBucketsClosedDueToReopening.load());
builder->appendNumber("numBucketsArchivedDueToMemoryThreshold",
stats->numBucketsArchivedDueToMemoryThreshold.load());
+ builder->appendNumber("numBucketsArchivedDueToTimeBackward",
+ stats->numBucketsArchivedDueToTimeBackward.load());
builder->appendNumber("numBucketsReopened", stats->numBucketsReopened.load());
builder->appendNumber("numBucketsKeptOpenDueToLargeMeasurements",
stats->numBucketsKeptOpenDueToLargeMeasurements.load());
@@ -1217,6 +1242,7 @@ void BucketCatalog::_appendExecutionStatsToBuilder(const ExecutionStats* stats,
}
void BucketCatalog::appendExecutionStats(const NamespaceString& ns, BSONObjBuilder* builder) const {
+ invariant(!ns.isTimeseriesBucketsCollection());
const std::shared_ptr<ExecutionStats> stats = _getExecutionStats(ns);
_appendExecutionStatsToBuilder(stats.get(), builder);
}
@@ -1270,12 +1296,20 @@ const StringData::ComparatorInterface* BucketCatalog::BucketMetadata::getCompara
BucketCatalog::BucketKey::BucketKey(const NamespaceString& n, const BucketMetadata& m)
: ns(n), metadata(m), hash(absl::Hash<BucketKey>{}(*this)) {}
+BucketCatalog::BucketId::BucketId(const NamespaceString& n, const OID& o)
+ : ns{n}, oid{o}, hash{absl::Hash<BucketId>{}(*this)} {}
+
std::size_t BucketCatalog::BucketHasher::operator()(const BucketKey& key) const {
// Use the default absl hasher.
return key.hash;
}
-std::size_t BucketCatalog::PreHashed::operator()(const BucketKey::Hash& key) const {
+std::size_t BucketCatalog::BucketHasher::operator()(const BucketId& bucketId) const {
+ // Use the default absl hasher.
+ return bucketId.hash;
+}
+
+std::size_t BucketCatalog::BucketHasher::operator()(const BucketKey::Hash& key) const {
return key;
}
@@ -1319,9 +1353,9 @@ BucketCatalog::StripeNumber BucketCatalog::_getStripeNumber(const BucketKey& key
const BucketCatalog::Bucket* BucketCatalog::_findBucket(const Stripe& stripe,
WithLock,
- const OID& id,
+ const BucketId& bucketId,
IgnoreBucketState mode) {
- auto it = stripe.allBuckets.find(id);
+ auto it = stripe.allBuckets.find(bucketId);
if (it != stripe.allBuckets.end()) {
if (mode == IgnoreBucketState::kYes) {
return it->second.get();
@@ -1337,17 +1371,17 @@ const BucketCatalog::Bucket* BucketCatalog::_findBucket(const Stripe& stripe,
BucketCatalog::Bucket* BucketCatalog::_useBucket(Stripe* stripe,
WithLock stripeLock,
- const OID& id,
+ const BucketId& bucketId,
IgnoreBucketState mode) {
- return const_cast<Bucket*>(_findBucket(*stripe, stripeLock, id, mode));
+ return const_cast<Bucket*>(_findBucket(*stripe, stripeLock, bucketId, mode));
}
BucketCatalog::Bucket* BucketCatalog::_useBucketAndChangeState(
Stripe* stripe,
WithLock stripeLock,
- const OID& id,
+ const BucketId& bucketId,
const BucketStateManager::StateChangeFn& change) {
- auto it = stripe->allBuckets.find(id);
+ auto it = stripe->allBuckets.find(bucketId);
if (it != stripe->allBuckets.end()) {
if (auto state = _bucketStateManager.changeBucketState(it->second.get(), change);
state && !state.value().conflictsWithInsertion()) {
@@ -1380,7 +1414,7 @@ BucketCatalog::Bucket* BucketCatalog::_useBucket(Stripe* stripe,
stripeLock,
bucket,
nullptr,
- getTimeseriesBucketClearedError(bucket->id(), bucket->_ns));
+ getTimeseriesBucketClearedError(bucket->ns(), bucket->oid()));
return mode == AllowBucketCreation::kYes ? _allocateBucket(stripe, stripeLock, info) : nullptr;
}
@@ -1426,7 +1460,7 @@ StatusWith<std::unique_ptr<BucketCatalog::Bucket>> BucketCatalog::_rehydrateBuck
}
auto stripeNumber = _getStripeNumber(key);
- auto bucketId = bucketIdElem.OID();
+ BucketId bucketId{key.ns, bucketIdElem.OID()};
std::unique_ptr<Bucket> bucket =
std::make_unique<Bucket>(bucketId, stripeNumber, key.hash, &_bucketStateManager);
@@ -1437,7 +1471,6 @@ StatusWith<std::unique_ptr<BucketCatalog::Bucket>> BucketCatalog::_rehydrateBuck
}
// Initialize the remaining member variables from the bucket document.
- bucket->setNamespace(ns);
bucket->_metadata = key.metadata;
bucket->_timeField = options.getTimeField().toString();
if (isCompressed) {
@@ -1502,35 +1535,34 @@ StatusWith<std::unique_ptr<BucketCatalog::Bucket>> BucketCatalog::_rehydrateBuck
return {std::move(bucket)};
}
-BucketCatalog::Bucket* BucketCatalog::_reopenBucket(Stripe* stripe,
- WithLock stripeLock,
- ExecutionStatsController stats,
- const BucketKey& key,
- std::unique_ptr<Bucket>&& bucket,
- std::uint64_t targetEra,
- ClosedBuckets* closedBuckets) {
+StatusWith<BucketCatalog::Bucket*> BucketCatalog::_reopenBucket(Stripe* stripe,
+ WithLock stripeLock,
+ ExecutionStatsController stats,
+ const BucketKey& key,
+ std::unique_ptr<Bucket>&& bucket,
+ std::uint64_t targetEra,
+ ClosedBuckets* closedBuckets) {
invariant(bucket);
_expireIdleBuckets(stripe, stripeLock, stats, closedBuckets);
- hangTimeseriesInsertBeforeReopeningBucket.pauseWhileSet();
-
// We may need to initialize the bucket's state.
- bool initialized = false;
- auto state = _bucketStateManager.changeBucketState(
- bucket->id(),
- [targetEra, &initialized](boost::optional<BucketState> input,
- std::uint64_t currentEra) -> boost::optional<BucketState> {
- if (targetEra < currentEra ||
- (input.has_value() && input.value().conflictsWithReopening())) {
- initialized = false;
- return input;
- }
- initialized = true;
- return BucketState{};
- });
- if (!initialized) {
- return nullptr;
+ bool conflicts = false;
+ auto initializeStateFn =
+ [targetEra, &conflicts](boost::optional<BucketState> input,
+ std::uint64_t currentEra) -> boost::optional<BucketState> {
+ if (targetEra < currentEra ||
+ (input.has_value() && input.value().conflictsWithReopening())) {
+ conflicts = true;
+ return input;
+ }
+ conflicts = false;
+ return input.has_value() ? input.value() : BucketState{};
+ };
+
+ auto state = _bucketStateManager.changeBucketState(bucket->bucketId(), initializeStateFn);
+ if (conflicts) {
+ return {ErrorCodes::WriteConflict, "Bucket may be stale"};
}
// If this bucket was archived, we need to remove it from the set of archived buckets.
@@ -1538,7 +1570,7 @@ BucketCatalog::Bucket* BucketCatalog::_reopenBucket(Stripe* stripe,
setIt != stripe->archivedBuckets.end()) {
auto& archivedSet = setIt->second;
if (auto bucketIt = archivedSet.find(bucket->getTime());
- bucketIt != archivedSet.end() && bucket->id() == bucketIt->second.bucketId) {
+ bucketIt != archivedSet.end() && bucket->bucketId() == bucketIt->second.bucketId) {
long long memory =
_marginalMemoryUsageForArchivedBucket(bucketIt->second, archivedSet.size() == 1);
if (archivedSet.size() == 1) {
@@ -1552,16 +1584,10 @@ BucketCatalog::Bucket* BucketCatalog::_reopenBucket(Stripe* stripe,
}
// Pass ownership of the reopened bucket to the bucket catalog.
- auto [it, inserted] = stripe->allBuckets.try_emplace(bucket->id(), std::move(bucket));
- Bucket* unownedBucket = it->second.get();
-
- // If the bucket wasn't inserted into the stripe, then that bucket is already open and we can
- // return the bucket 'it' points to.
- if (!inserted) {
- stats.incNumDuplicateBucketsReopened();
- _markBucketNotIdle(stripe, stripeLock, unownedBucket);
- return unownedBucket;
- }
+ auto [insertedIt, newlyInserted] =
+ stripe->allBuckets.try_emplace(bucket->bucketId(), std::move(bucket));
+ invariant(newlyInserted);
+ Bucket* unownedBucket = insertedIt->second.get();
// If we already have an open bucket for this key, we need to close it.
if (auto it = stripe->openBuckets.find(key); it != stripe->openBuckets.end()) {
@@ -1570,7 +1596,7 @@ BucketCatalog::Bucket* BucketCatalog::_reopenBucket(Stripe* stripe,
auto* closedBucket = it->second;
constexpr bool eligibleForReopening = true;
closedBuckets->emplace_back(ClosedBucket{&_bucketStateManager,
- closedBucket->id(),
+ closedBucket->bucketId(),
closedBucket->getTimeField().toString(),
closedBucket->numMeasurements(),
eligibleForReopening});
@@ -1590,6 +1616,53 @@ BucketCatalog::Bucket* BucketCatalog::_reopenBucket(Stripe* stripe,
return unownedBucket;
}
+StatusWith<BucketCatalog::Bucket*> BucketCatalog::_reuseExistingBucket(
+ Stripe* stripe,
+ WithLock stripeLock,
+ ExecutionStatsController* stats,
+ const BucketKey& key,
+ Bucket* existingBucket,
+ std::uint64_t targetEra) {
+ invariant(existingBucket);
+
+ // If we have an existing bucket, passing the Bucket* will let us check if the bucket was
+ // cleared as part of a set since the last time it was used. If we were to just check by
+ // OID, we may miss if e.g. there was a move chunk operation.
+ bool conflicts = false;
+ auto state = _bucketStateManager.changeBucketState(
+ existingBucket,
+ [targetEra, &conflicts](boost::optional<BucketState> input,
+ std::uint64_t currentEra) -> boost::optional<BucketState> {
+ if (targetEra < currentEra ||
+ (input.has_value() && input.value().conflictsWithReopening())) {
+ conflicts = true;
+ return input;
+ }
+ conflicts = false;
+ return input.has_value() ? input.value() : BucketState{};
+ });
+ if (state.has_value() && state.value().isSet(BucketStateFlag::kCleared)) {
+ _removeBucket(stripe, stripeLock, existingBucket, RemovalMode::kAbort);
+ conflicts = true;
+ }
+ if (conflicts) {
+ return {ErrorCodes::WriteConflict, "Bucket may be stale"};
+ }
+
+ // It's possible to have two buckets with the same ID in different collections, so let's make
+ // extra sure the existing bucket is the right one.
+ if (existingBucket->ns() != key.ns) {
+ return {ErrorCodes::BadValue, "Cannot re-use bucket: same ID but different namespace"};
+ }
+
+ // If the bucket was already open, wasn't cleared, the state didn't conflict with reopening, and
+ // the namespace matches, then we can simply return it.
+ stats->incNumDuplicateBucketsReopened();
+ _markBucketNotIdle(stripe, stripeLock, existingBucket);
+
+ return existingBucket;
+}
+
StatusWith<BucketCatalog::InsertResult> BucketCatalog::_insert(
OperationContext* opCtx,
const NamespaceString& ns,
@@ -1599,6 +1672,7 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::_insert(
CombineWithInsertsFromOtherClients combine,
AllowBucketCreation mode,
BucketFindResult bucketFindResult) {
+ invariant(!ns.isTimeseriesBucketsCollection());
auto res = _extractBucketingParameters(ns, comparator, options, doc);
if (!res.isOK()) {
@@ -1632,70 +1706,99 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::_insert(
if (rehydratedBucket.isOK()) {
invariant(mode == AllowBucketCreation::kYes);
- if (Bucket* bucket = _reopenBucket(&stripe,
- stripeLock,
- stats,
- key,
- std::move(rehydratedBucket.getValue()),
- bucketToReopen->catalogEra,
- &result.closedBuckets)) {
- result.batch = _insertIntoBucket(opCtx,
- &stripe,
- stripeLock,
- doc,
- combine,
- mode,
- &info,
- bucket,
- &result.closedBuckets);
- invariant(result.batch);
+ hangTimeseriesInsertBeforeReopeningBucket.pauseWhileSet();
+
+ StatusWith<Bucket*> swBucket{nullptr};
+ auto existingIt = stripe.allBuckets.find(rehydratedBucket.getValue()->bucketId());
+ if (existingIt != stripe.allBuckets.end()) {
+ // First let's check the existing bucket if we have one.
+ Bucket* existingBucket = existingIt->second.get();
+ swBucket = _reuseExistingBucket(
+ &stripe, stripeLock, &stats, key, existingBucket, bucketToReopen->catalogEra);
+ } else {
+ // No existing bucket to use, go ahead and try to reopen our rehydrated bucket.
+ swBucket = _reopenBucket(&stripe,
+ stripeLock,
+ stats,
+ key,
+ std::move(rehydratedBucket.getValue()),
+ bucketToReopen->catalogEra,
+ &result.closedBuckets);
+ }
+
+ if (swBucket.isOK()) {
+ Bucket* bucket = swBucket.getValue();
+ invariant(bucket);
+ auto insertionResult = _insertIntoBucket(opCtx,
+ &stripe,
+ stripeLock,
+ doc,
+ combine,
+ mode,
+ &info,
+ bucket,
+ &result.closedBuckets);
+ auto* batch = stdx::get_if<std::shared_ptr<WriteBatch>>(&insertionResult);
+ invariant(batch);
+ result.batch = *batch;
return std::move(result);
} else {
stats.incNumBucketReopeningsFailed();
- return {ErrorCodes::WriteConflict, "Bucket may be stale"};
+ if (swBucket.getStatus().code() == ErrorCodes::WriteConflict) {
+ return swBucket.getStatus();
+ }
+ // If we had a different type of error, then we should fall through and proceed to open
+ // a new bucket.
}
}
Bucket* bucket = _useBucket(&stripe, stripeLock, info, mode);
if (!bucket) {
invariant(mode == AllowBucketCreation::kNo);
- result.candidate = _getReopeningCandidate(&stripe, stripeLock, info);
+ constexpr bool allowQueryBasedReopening = true;
+ result.candidate =
+ _getReopeningCandidate(&stripe, stripeLock, info, allowQueryBasedReopening);
return std::move(result);
}
- result.batch = _insertIntoBucket(
+ auto insertionResult = _insertIntoBucket(
opCtx, &stripe, stripeLock, doc, combine, mode, &info, bucket, &result.closedBuckets);
- if (!result.batch) {
+ if (auto* reason = stdx::get_if<RolloverReason>(&insertionResult)) {
invariant(mode == AllowBucketCreation::kNo);
if (bucket->allCommitted()) {
_markBucketIdle(&stripe, stripeLock, bucket);
}
- result.candidate = _getReopeningCandidate(&stripe, stripeLock, info);
+ bool allowQueryBasedReopening = (*reason == RolloverReason::kTimeBackward);
+ result.candidate =
+ _getReopeningCandidate(&stripe, stripeLock, info, allowQueryBasedReopening);
+ } else {
+ result.batch = *stdx::get_if<std::shared_ptr<WriteBatch>>(&insertionResult);
}
return std::move(result);
}
-std::shared_ptr<BucketCatalog::WriteBatch> BucketCatalog::_insertIntoBucket(
- OperationContext* opCtx,
- Stripe* stripe,
- WithLock stripeLock,
- const BSONObj& doc,
- CombineWithInsertsFromOtherClients combine,
- AllowBucketCreation mode,
- CreationInfo* info,
- Bucket* bucket,
- ClosedBuckets* closedBuckets) {
+stdx::variant<std::shared_ptr<BucketCatalog::WriteBatch>, BucketCatalog::RolloverReason>
+BucketCatalog::_insertIntoBucket(OperationContext* opCtx,
+ Stripe* stripe,
+ WithLock stripeLock,
+ const BSONObj& doc,
+ CombineWithInsertsFromOtherClients combine,
+ AllowBucketCreation mode,
+ CreationInfo* info,
+ Bucket* bucket,
+ ClosedBuckets* closedBuckets) {
NewFieldNames newFieldNamesToBeInserted;
int32_t sizeToBeAdded = 0;
- bool isNewlyOpenedBucket = bucket->_ns.isEmpty();
+ bool isNewlyOpenedBucket = (bucket->_size == 0);
if (!isNewlyOpenedBucket) {
- auto action = _determineRolloverAction(
+ auto [action, reason] = _determineRolloverAction(
opCtx, doc, info, bucket, &newFieldNamesToBeInserted, &sizeToBeAdded, mode);
- if (action == RolloverAction::kSoftClose && mode == AllowBucketCreation::kNo) {
+ if ((action == RolloverAction::kSoftClose || action == RolloverAction::kArchive) &&
+ mode == AllowBucketCreation::kNo) {
// We don't actually want to roll this bucket over yet, bail out.
- return std::shared_ptr<WriteBatch>{};
+ return reason;
} else if (action != RolloverAction::kNone) {
info->openedDuetoMetadata = false;
bucket = _rollover(stripe, stripeLock, bucket, *info, action);
@@ -1714,8 +1817,7 @@ std::shared_ptr<BucketCatalog::WriteBatch> BucketCatalog::_insertIntoBucket(
bucket->_numMeasurements++;
bucket->_size += sizeToBeAdded;
if (isNewlyOpenedBucket) {
- // The namespace and metadata only need to be set if this bucket was newly created.
- bucket->setNamespace(info->key.ns);
+ // The metadata only needs to be set if this bucket was newly created.
bucket->_metadata = info->key.metadata;
// The namespace is stored two times: the bucket itself and openBuckets.
@@ -1745,7 +1847,7 @@ void BucketCatalog::_waitToCommitBatch(Stripe* stripe, const std::shared_ptr<Wri
{
stdx::lock_guard stripeLock{stripe->mutex};
Bucket* bucket =
- _useBucket(stripe, stripeLock, batch->bucket().id, IgnoreBucketState::kNo);
+ _useBucket(stripe, stripeLock, batch->bucket().bucketId, IgnoreBucketState::kNo);
if (!bucket || batch->finished()) {
return;
}
@@ -1771,14 +1873,14 @@ void BucketCatalog::_removeBucket(Stripe* stripe,
invariant(bucket->_batches.empty());
invariant(!bucket->_preparedBatch);
- auto allIt = stripe->allBuckets.find(bucket->id());
+ auto allIt = stripe->allBuckets.find(bucket->bucketId());
invariant(allIt != stripe->allBuckets.end());
_memoryUsage.fetchAndSubtract(bucket->_memoryUsage);
_markBucketNotIdle(stripe, stripeLock, bucket);
// If the bucket was rolled over, then there may be a different open bucket for this metadata.
- auto openIt = stripe->openBuckets.find({bucket->_ns, bucket->_metadata});
+ auto openIt = stripe->openBuckets.find({bucket->ns(), bucket->_metadata});
if (openIt != stripe->openBuckets.end() && openIt->second == bucket) {
stripe->openBuckets.erase(openIt);
}
@@ -1787,14 +1889,14 @@ void BucketCatalog::_removeBucket(Stripe* stripe,
// we can remove the state from the catalog altogether.
switch (mode) {
case RemovalMode::kClose: {
- auto state = _bucketStateManager.getBucketState(bucket->id());
+ auto state = _bucketStateManager.getBucketState(bucket->bucketId());
invariant(state.has_value());
invariant(state.value().isSet(BucketStateFlag::kPendingCompression));
break;
}
case RemovalMode::kAbort:
_bucketStateManager.changeBucketState(
- bucket->id(),
+ bucket->bucketId(),
[](boost::optional<BucketState> input,
std::uint64_t) -> boost::optional<BucketState> {
invariant(input.has_value());
@@ -1821,11 +1923,12 @@ void BucketCatalog::_archiveBucket(Stripe* stripe,
auto& archivedSet = stripe->archivedBuckets[bucket->keyHash()];
auto it = archivedSet.find(bucket->getTime());
if (it == archivedSet.end()) {
- archivedSet.emplace(bucket->getTime(),
- ArchivedBucket{bucket->id(), bucket->getTimeField().toString()});
+ auto [it, inserted] = archivedSet.emplace(
+ bucket->getTime(),
+ ArchivedBucket{bucket->bucketId(), bucket->getTimeField().toString()});
- long long memory = _marginalMemoryUsageForArchivedBucket(archivedSet[bucket->getTime()],
- archivedSet.size() == 1);
+ long long memory =
+ _marginalMemoryUsageForArchivedBucket(it->second, archivedSet.size() == 1);
_memoryUsage.fetchAndAdd(memory);
archived = true;
}
@@ -1842,7 +1945,7 @@ void BucketCatalog::_archiveBucket(Stripe* stripe,
mode = RemovalMode::kClose;
constexpr bool eligibleForReopening = true;
closedBuckets->emplace_back(ClosedBucket{&_bucketStateManager,
- bucket->id(),
+ bucket->bucketId(),
bucket->getTimeField().toString(),
bucket->numMeasurements(),
eligibleForReopening});
@@ -1877,7 +1980,7 @@ boost::optional<OID> BucketCatalog::_findArchivedCandidate(Stripe* stripe,
if (info.time - candidateTime < Seconds(*info.options.getBucketMaxSpanSeconds())) {
auto state = _bucketStateManager.getBucketState(candidateBucket.bucketId);
if (state && !state.value().conflictsWithReopening()) {
- return candidateBucket.bucketId;
+ return candidateBucket.bucketId.oid;
} else {
if (state) {
_bucketStateManager.changeBucketState(
@@ -1907,11 +2010,15 @@ boost::optional<OID> BucketCatalog::_findArchivedCandidate(Stripe* stripe,
}
stdx::variant<std::monostate, OID, BSONObj> BucketCatalog::_getReopeningCandidate(
- Stripe* stripe, WithLock stripeLock, const CreationInfo& info) {
+ Stripe* stripe, WithLock stripeLock, const CreationInfo& info, bool allowQueryBasedReopening) {
if (auto archived = _findArchivedCandidate(stripe, stripeLock, info)) {
return archived.value();
}
+ if (!allowQueryBasedReopening) {
+ return {};
+ }
+
boost::optional<BSONElement> metaElement;
if (info.options.getMetaField().has_value()) {
metaElement = info.key.metadata.element();
@@ -1929,7 +2036,8 @@ void BucketCatalog::_abort(Stripe* stripe,
std::shared_ptr<WriteBatch> batch,
const Status& status) {
// Before we access the bucket, make sure it's still there.
- Bucket* bucket = _useBucket(stripe, stripeLock, batch->bucket().id, IgnoreBucketState::kYes);
+ Bucket* bucket =
+ _useBucket(stripe, stripeLock, batch->bucket().bucketId, IgnoreBucketState::kYes);
if (!bucket) {
// Special case, bucket has already been cleared, and we need only abort this batch.
batch->_abort(status);
@@ -1971,9 +2079,10 @@ void BucketCatalog::_abort(Stripe* stripe,
}
}
-void BucketCatalog::_compressionDone(const OID& id) {
+void BucketCatalog::_compressionDone(const BucketId& bucketId) {
_bucketStateManager.changeBucketState(
- id, [](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> {
+ bucketId,
+ [](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> {
return boost::none;
});
}
@@ -2020,7 +2129,7 @@ void BucketCatalog::_expireIdleBuckets(Stripe* stripe,
_removeBucket(stripe, stripeLock, bucket, RemovalMode::kAbort);
} else {
closedBuckets->emplace_back(ClosedBucket{&_bucketStateManager,
- bucket->id(),
+ bucket->bucketId(),
bucket->getTimeField().toString(),
bucket->numMeasurements(),
eligibleForReopening});
@@ -2066,7 +2175,8 @@ BucketCatalog::Bucket* BucketCatalog::_allocateBucket(Stripe* stripe,
const CreationInfo& info) {
_expireIdleBuckets(stripe, stripeLock, info.stats, info.closedBuckets);
- auto [bucketId, roundedTime] = generateBucketId(info.time, info.options);
+ auto [oid, roundedTime] = generateBucketOID(info.time, info.options);
+ auto bucketId = BucketId{info.key.ns, oid};
auto [it, inserted] = stripe->allBuckets.try_emplace(
bucketId,
@@ -2098,14 +2208,14 @@ BucketCatalog::Bucket* BucketCatalog::_allocateBucket(Stripe* stripe,
return bucket;
}
-BucketCatalog::RolloverAction BucketCatalog::_determineRolloverAction(
- OperationContext* opCtx,
- const BSONObj& doc,
- CreationInfo* info,
- Bucket* bucket,
- NewFieldNames* newFieldNamesToBeInserted,
- int32_t* sizeToBeAdded,
- AllowBucketCreation mode) {
+std::pair<BucketCatalog::RolloverAction, BucketCatalog::RolloverReason>
+BucketCatalog::_determineRolloverAction(OperationContext* opCtx,
+ const BSONObj& doc,
+ CreationInfo* info,
+ Bucket* bucket,
+ NewFieldNames* newFieldNamesToBeInserted,
+ int32_t* sizeToBeAdded,
+ AllowBucketCreation mode) {
// If the mode is enabled to create new buckets, then we should update stats for soft closures
// accordingly. If we specify the mode to not allow bucket creation, it means we are not sure if
// we want to soft close the bucket yet and should wait to update closure stats.
@@ -2116,17 +2226,24 @@ BucketCatalog::RolloverAction BucketCatalog::_determineRolloverAction(
if (shouldUpdateStats) {
info->stats.incNumBucketsClosedDueToTimeForward();
}
- return RolloverAction::kSoftClose;
+ return {RolloverAction::kSoftClose, RolloverReason::kTimeForward};
}
if (info->time < bucketTime) {
+ const bool canArchive = feature_flags::gTimeseriesScalabilityImprovements.isEnabled(
+ serverGlobalParams.featureCompatibility);
if (shouldUpdateStats) {
- info->stats.incNumBucketsClosedDueToTimeBackward();
+ if (canArchive) {
+ info->stats.incNumBucketsArchivedDueToTimeBackward();
+ } else {
+ info->stats.incNumBucketsClosedDueToTimeBackward();
+ }
}
- return RolloverAction::kSoftClose;
+ return {canArchive ? RolloverAction::kArchive : RolloverAction::kSoftClose,
+ RolloverReason::kTimeBackward};
}
if (bucket->_numMeasurements == static_cast<std::uint64_t>(gTimeseriesBucketMaxCount)) {
info->stats.incNumBucketsClosedDueToCount();
- return RolloverAction::kHardClose;
+ return {RolloverAction::kHardClose, RolloverReason::kCount};
}
// In scenarios where we have a high cardinality workload and face increased cache pressure we
@@ -2155,10 +2272,10 @@ BucketCatalog::RolloverAction BucketCatalog::_determineRolloverAction(
if (bucket->_size + *sizeToBeAdded > absoluteMaxSize) {
if (absoluteMaxSize != largeMeasurementsMaxBucketSize) {
info->stats.incNumBucketsClosedDueToCachePressure();
- } else {
- info->stats.incNumBucketsClosedDueToSize();
+ return {RolloverAction::kHardClose, RolloverReason::kCachePressure};
}
- return RolloverAction::kHardClose;
+ info->stats.incNumBucketsClosedDueToSize();
+ return {RolloverAction::kHardClose, RolloverReason::kSize};
}
// There's enough space to add this measurement and we're still below the large
@@ -2168,24 +2285,24 @@ BucketCatalog::RolloverAction BucketCatalog::_determineRolloverAction(
bucket->_keptOpenDueToLargeMeasurements = true;
info->stats.incNumBucketsKeptOpenDueToLargeMeasurements();
}
- return RolloverAction::kNone;
+ return {RolloverAction::kNone, RolloverReason::kNone};
} else {
if (effectiveMaxSize == gTimeseriesBucketMaxSize) {
info->stats.incNumBucketsClosedDueToSize();
- } else {
- info->stats.incNumBucketsClosedDueToCachePressure();
+ return {RolloverAction::kHardClose, RolloverReason::kSize};
}
- return RolloverAction::kHardClose;
+ info->stats.incNumBucketsClosedDueToCachePressure();
+ return {RolloverAction::kHardClose, RolloverReason::kCachePressure};
}
}
if (bucket->schemaIncompatible(
doc, info->options.getMetaField(), info->key.metadata.getComparator())) {
info->stats.incNumBucketsClosedDueToSchemaChange();
- return RolloverAction::kHardClose;
+ return {RolloverAction::kHardClose, RolloverReason::kSchemaChange};
}
- return RolloverAction::kNone;
+ return {RolloverAction::kNone, RolloverReason::kNone};
}
BucketCatalog::Bucket* BucketCatalog::_rollover(Stripe* stripe,
@@ -2197,14 +2314,18 @@ BucketCatalog::Bucket* BucketCatalog::_rollover(Stripe* stripe,
if (bucket->allCommitted()) {
// The bucket does not contain any measurements that are yet to be committed, so we can take
// action now.
- const bool eligibleForReopening = action == RolloverAction::kSoftClose;
- info.closedBuckets->emplace_back(ClosedBucket{&_bucketStateManager,
- bucket->id(),
- bucket->getTimeField().toString(),
- bucket->numMeasurements(),
- eligibleForReopening});
-
- _removeBucket(stripe, stripeLock, bucket, RemovalMode::kClose);
+ if (action == RolloverAction::kArchive) {
+ _archiveBucket(stripe, stripeLock, bucket, info.closedBuckets);
+ } else {
+ const bool eligibleForReopening = action == RolloverAction::kSoftClose;
+ info.closedBuckets->emplace_back(ClosedBucket{&_bucketStateManager,
+ bucket->bucketId(),
+ bucket->getTimeField().toString(),
+ bucket->numMeasurements(),
+ eligibleForReopening});
+
+ _removeBucket(stripe, stripeLock, bucket, RemovalMode::kClose);
+ }
} else {
// We must keep the bucket around until all measurements are committed, just mark
// the action we chose now so we know what to do when the last batch finishes.
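The reworked rollover path above distinguishes an action from a reason, and a measurement whose time precedes the bucket's minimum now archives the bucket (when the scalability feature flag is enabled) rather than soft-closing it. Below is a minimal sketch of that decision shape; Decision and determineRollover are illustrative stand-ins, not the real CreationInfo/Bucket plumbing.

#include <cstdint>
#include <utility>

enum class RolloverAction { kNone, kArchive, kSoftClose, kHardClose };
enum class RolloverReason { kNone, kTimeForward, kTimeBackward, kCount };

// Hypothetical inputs standing in for CreationInfo/Bucket state in the real code.
struct Decision {
    std::int64_t measurementTime;
    std::int64_t bucketMinTime;
    std::int64_t maxSpanSeconds;
    std::uint32_t numMeasurements;
    std::uint32_t maxCount;
    bool archiveOnTimeBackward;  // stands in for the feature flag check
};

std::pair<RolloverAction, RolloverReason> determineRollover(const Decision& d) {
    if (d.measurementTime >= d.bucketMinTime + d.maxSpanSeconds) {
        // Too far forward: close softly so the bucket stays eligible for reopening.
        return {RolloverAction::kSoftClose, RolloverReason::kTimeForward};
    }
    if (d.measurementTime < d.bucketMinTime) {
        // Time went backwards: archive if allowed, otherwise fall back to soft close.
        return {d.archiveOnTimeBackward ? RolloverAction::kArchive : RolloverAction::kSoftClose,
                RolloverReason::kTimeBackward};
    }
    if (d.numMeasurements >= d.maxCount) {
        return {RolloverAction::kHardClose, RolloverReason::kCount};
    }
    return {RolloverAction::kNone, RolloverReason::kNone};
}

int main() {
    Decision d{/*measurementTime=*/90, /*bucketMinTime=*/100, /*maxSpanSeconds=*/3600,
               /*numMeasurements=*/10, /*maxCount=*/1000, /*archiveOnTimeBackward=*/true};
    auto [action, reason] = determineRollover(d);
    return action == RolloverAction::kArchive && reason == RolloverReason::kTimeBackward ? 0 : 1;
}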
diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h
index 8568349bc6c..637129f0e56 100644
--- a/src/mongo/db/timeseries/bucket_catalog.h
+++ b/src/mongo/db/timeseries/bucket_catalog.h
@@ -65,8 +65,34 @@ protected:
class BucketStateManager;
+ /**
+ * Identifier to lookup bucket by namespace and OID, with pre-computed hash.
+ */
+ struct BucketId {
+ using Hash = std::size_t;
+
+ BucketId() = delete;
+ BucketId(const NamespaceString& nss, const OID& oid);
+
+ NamespaceString ns;
+ OID oid;
+ Hash hash;
+
+ bool operator==(const BucketId& other) const {
+ return oid == other.oid && ns == other.ns;
+ }
+ bool operator!=(const BucketId& other) const {
+ return !(*this == other);
+ }
+
+ template <typename H>
+ friend H AbslHashValue(H h, const BucketId& bucketId) {
+ return H::combine(std::move(h), bucketId.oid, bucketId.ns);
+ }
+ };
+
struct BucketHandle {
- const OID id;
+ const BucketId bucketId;
const StripeNumber stripe;
};
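BucketId computes its hash once at construction, and BucketHasher (further down) simply returns the cached value, so the unordered maps keyed by BucketId never re-hash the namespace string during probes. Below is a reduced sketch of that pre-hashed-key pattern using only the standard library; PreHashedId and PreHashedHasher are illustrative, and the real code uses absl::Hash rather than std::hash.

#include <cstddef>
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>

// A key that pre-computes its hash once, mirroring the BucketId pattern above.
struct PreHashedId {
    std::string ns;
    std::string oid;
    std::size_t hash;

    PreHashedId(std::string n, std::string o) : ns(std::move(n)), oid(std::move(o)) {
        // Combine member hashes a single time at construction.
        hash = std::hash<std::string>{}(ns) ^ (std::hash<std::string>{}(oid) << 1);
    }

    bool operator==(const PreHashedId& other) const {
        return oid == other.oid && ns == other.ns;
    }
};

// Hasher that just forwards the cached value, so map probes never re-hash the strings.
struct PreHashedHasher {
    std::size_t operator()(const PreHashedId& id) const {
        return id.hash;
    }
};

int main() {
    std::unordered_map<PreHashedId, int, PreHashedHasher> buckets;
    buckets.emplace(PreHashedId{"db.coll", "63924fbd0000000000000000"}, 1);
    return buckets.count(PreHashedId{"db.coll", "63924fbd0000000000000000"}) == 1 ? 0 : 1;
}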
@@ -83,6 +109,7 @@ protected:
AtomicWord<long long> numBucketsClosedDueToMemoryThreshold;
AtomicWord<long long> numBucketsClosedDueToReopening;
AtomicWord<long long> numBucketsArchivedDueToMemoryThreshold;
+ AtomicWord<long long> numBucketsArchivedDueToTimeBackward;
AtomicWord<long long> numCommits;
AtomicWord<long long> numWaits;
AtomicWord<long long> numMeasurementsCommitted;
@@ -116,6 +143,7 @@ protected:
void incNumBucketsClosedDueToMemoryThreshold(long long increment = 1);
void incNumBucketsClosedDueToReopening(long long increment = 1);
void incNumBucketsArchivedDueToMemoryThreshold(long long increment = 1);
+ void incNumBucketsArchivedDueToTimeBackward(long long increment = 1);
void incNumCommits(long long increment = 1);
void incNumWaits(long long increment = 1);
void incNumMeasurementsCommitted(long long increment = 1);
@@ -158,14 +186,17 @@ public:
public:
ClosedBucket() = default;
~ClosedBucket();
- ClosedBucket(
- BucketStateManager*, const OID&, const std::string&, boost::optional<uint32_t>, bool);
+ ClosedBucket(BucketStateManager*,
+ const BucketId&,
+ const std::string&,
+ boost::optional<uint32_t>,
+ bool);
ClosedBucket(ClosedBucket&&);
ClosedBucket& operator=(ClosedBucket&&);
ClosedBucket(const ClosedBucket&) = delete;
ClosedBucket& operator=(const ClosedBucket&) = delete;
- OID bucketId;
+ BucketId bucketId;
std::string timeField;
boost::optional<uint32_t> numMeasurements;
bool eligibleForReopening = false;
@@ -403,7 +434,7 @@ public:
* 'WriteConflictException'. This should be followed by a call to 'directWriteFinish' after the
* write has been committed, rolled back, or otherwise finished.
*/
- void directWriteStart(const OID& oid);
+ void directWriteStart(const NamespaceString& ns, const OID& oid);
/**
* Notifies the catalog that a pending direct write to the bucket document with the specified ID
@@ -412,7 +443,7 @@ public:
* should have been cleared from the catalog, and it may be safely reopened from the on-disk
* state.
*/
- void directWriteFinish(const OID& oid);
+ void directWriteFinish(const NamespaceString& ns, const OID& oid);
/**
* Clears any bucket whose namespace satisfies the predicate.
@@ -549,12 +580,7 @@ protected:
*/
struct BucketHasher {
std::size_t operator()(const BucketKey& key) const;
- };
-
- /**
- * Hasher to support using a pre-computed hash as a key without having to compute another hash.
- */
- struct PreHashed {
+ std::size_t operator()(const BucketId& bucketId) const;
std::size_t operator()(const BucketKey::Hash& key) const;
};
@@ -563,7 +589,8 @@ protected:
* BucketCatalog.
*/
struct ArchivedBucket {
- OID bucketId;
+ ArchivedBucket() = delete;
+ BucketId bucketId;
std::string timeField;
};
@@ -578,7 +605,7 @@ protected:
// All buckets currently in the catalog, including buckets which are full but not yet
// committed.
- stdx::unordered_map<OID, std::unique_ptr<Bucket>, OID::Hasher> allBuckets;
+ stdx::unordered_map<BucketId, std::unique_ptr<Bucket>, BucketHasher> allBuckets;
// The current open bucket for each namespace and metadata pair.
stdx::unordered_map<BucketKey, Bucket*, BucketHasher> openBuckets;
@@ -595,7 +622,7 @@ protected:
// efficiently find an archived bucket that is a candidate for an incoming measurement.
stdx::unordered_map<BucketKey::Hash,
std::map<Date_t, ArchivedBucket, std::greater<Date_t>>,
- PreHashed>
+ BucketHasher>
archivedBuckets;
};
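The archivedBuckets map above keys each archived bucket by its minimum time in descending order, which lets _findArchivedCandidate locate the newest bucket whose span could still cover an incoming measurement with a single lower_bound. A standalone sketch of that lookup follows; ArchivedSet and findArchivedCandidate are illustrative names.

#include <cstdint>
#include <functional>
#include <map>
#include <optional>
#include <string>

// Archived buckets for one series, keyed by bucket minimum time, newest first
// (mirrors std::map<Date_t, ArchivedBucket, std::greater<Date_t>> above).
using ArchivedSet = std::map<std::int64_t, std::string, std::greater<std::int64_t>>;

// Return the newest archived bucket whose time range could still cover 'measurementTime'.
std::optional<std::string> findArchivedCandidate(const ArchivedSet& archived,
                                                 std::int64_t measurementTime,
                                                 std::int64_t maxSpanSeconds) {
    // First entry with key <= measurementTime, thanks to the descending comparator.
    auto it = archived.lower_bound(measurementTime);
    if (it == archived.end()) {
        return std::nullopt;
    }
    if (measurementTime - it->first < maxSpanSeconds) {
        return it->second;  // candidate bucket id
    }
    return std::nullopt;
}

int main() {
    ArchivedSet archived{{1000, "bucketA"}, {2000, "bucketB"}};
    auto hit = findArchivedCandidate(archived, 2030, 3600);
    return hit && *hit == "bucketB" ? 0 : 1;
}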
@@ -604,11 +631,26 @@ protected:
*/
enum class RolloverAction {
kNone, // Keep bucket open
+ kArchive, // Archive bucket
kSoftClose, // Close bucket so it remains eligible for reopening
kHardClose, // Permanently close bucket
};
/**
+ * Reasons why a bucket was rolled over.
+ */
+ enum class RolloverReason {
+ kNone, // Not actually rolled over
+ kTimeForward, // Measurement time would violate max span for this bucket
+ kTimeBackward, // Measurement time was before bucket min time
+ kCount, // Adding this measurement would violate max count
+ kSchemaChange, // This measurement has a schema incompatible with existing measurements
+ kCachePressure, // System is under cache pressure, and adding this measurement would make
+ // the bucket larger than the dynamic size limit
+ kSize, // Adding this measurement would make the bucket larger than the normal size limit
+ };
+
+ /**
* Bundle of information that 'insert' needs to pass down to helper methods that may create a
* new bucket.
*/
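RolloverReason pairs with the new stdx::variant return of _insertIntoBucket: callers receive either a WriteBatch or the reason the bucket must roll over, and dispatch with stdx::get_if as shown in the .cpp hunks above. A small standalone sketch of that dispatch style using std::variant follows; InsertOutcome, tryInsert, and handle are illustrative, and WriteBatch here is a stub.

#include <memory>
#include <string>
#include <utility>
#include <variant>

enum class RolloverReason { kNone, kTimeForward, kTimeBackward, kCount };

struct WriteBatch {
    std::string doc;
};

// Either the insert produced a batch, or it reports why the bucket must roll over first.
using InsertOutcome = std::variant<std::shared_ptr<WriteBatch>, RolloverReason>;

InsertOutcome tryInsert(bool bucketFull, std::string doc) {
    if (bucketFull) {
        return RolloverReason::kCount;
    }
    return std::make_shared<WriteBatch>(WriteBatch{std::move(doc)});
}

bool handle(const InsertOutcome& outcome) {
    if (auto* batch = std::get_if<std::shared_ptr<WriteBatch>>(&outcome)) {
        // Got a batch; a real caller would commit it.
        return *batch != nullptr;
    }
    // Otherwise decide whether to archive, close, or reopen based on the reason.
    auto* reason = std::get_if<RolloverReason>(&outcome);
    return reason != nullptr && *reason != RolloverReason::kNone;
}

int main() {
    InsertOutcome ok = tryInsert(false, "{t: 1}");
    InsertOutcome full = tryInsert(true, "{t: 2}");
    return handle(ok) && handle(full) ? 0 : 1;
}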
@@ -658,7 +700,7 @@ protected:
/**
* Retrieves the bucket state if it is tracked in the catalog.
*/
- boost::optional<BucketState> getBucketState(const OID& oid) const;
+ boost::optional<BucketState> getBucketState(const BucketId& bucketId) const;
/**
* Checks whether the bucket has been cleared before changing the bucket state as requested.
@@ -681,7 +723,8 @@ protected:
* perform a noop (i.e. if upon inspecting the input, the change would be invalid), 'change'
* may simply return its input state unchanged.
*/
- boost::optional<BucketState> changeBucketState(const OID& id, const StateChangeFn& change);
+ boost::optional<BucketState> changeBucketState(const BucketId& bucketId,
+ const StateChangeFn& change);
/**
* Appends statistics for observability.
@@ -692,7 +735,7 @@ protected:
void _decrementEraCountHelper(uint64_t era);
void _incrementEraCountHelper(uint64_t era);
boost::optional<BucketState> _changeBucketStateHelper(WithLock withLock,
- const OID& id,
+ const BucketId& bucketId,
const StateChangeFn& change);
/**
@@ -707,7 +750,7 @@ protected:
* bucket state isn't currently tracked.
*/
boost::optional<BucketState> _markIndividualBucketCleared(WithLock catalogLock,
- const OID& bucketId);
+ const BucketId& bucketId);
/**
* Removes clear operations from the clear registry that no longer need to be tracked.
@@ -725,7 +768,7 @@ protected:
EraCountMap _countMap;
// Bucket state for synchronization with direct writes
- stdx::unordered_map<OID, BucketState, OID::Hasher> _bucketStates;
+ stdx::unordered_map<BucketId, BucketState, BucketHasher> _bucketStates;
// Registry storing clear operations. Maps from era to a lambda function which takes in
// information about a Bucket and returns whether the Bucket has been cleared.
@@ -742,9 +785,9 @@ protected:
public:
friend class BucketCatalog;
- Bucket(const OID& id,
+ Bucket(const BucketId& bucketId,
StripeNumber stripe,
- BucketKey::Hash hash,
+ BucketKey::Hash keyHash,
BucketStateManager* bucketStateManager);
~Bucket();
@@ -754,9 +797,20 @@ protected:
void setEra(uint64_t era);
/**
- * Returns the ID for the underlying bucket.
+ * Returns the BucketId for the bucket.
*/
- const OID& id() const;
+
+ const BucketId& bucketId() const;
+
+ /**
+ * Returns the OID for the underlying bucket.
+ */
+ const OID& oid() const;
+
+ /**
+ * Returns the namespace for the underlying bucket.
+ */
+ const NamespaceString& ns() const;
/**
* Returns the number of the stripe that owns the bucket.
@@ -789,11 +843,6 @@ protected:
uint32_t numMeasurements() const;
/**
- * Sets the namespace of the bucket.
- */
- void setNamespace(const NamespaceString& ns);
-
- /**
* Sets the rollover action, to determine what to do with a bucket when all measurements
* have been committed.
*/
@@ -838,7 +887,7 @@ protected:
private:
// The bucket ID for the underlying document
- const OID _id;
+ const BucketId _bucketId;
// The stripe which owns this bucket.
const StripeNumber _stripe;
@@ -846,9 +895,6 @@ protected:
// The pre-computed hash of the associated BucketKey
const BucketKey::Hash _keyHash;
- // The namespace that this bucket is used for.
- NamespaceString _ns;
-
// The metadata of the data that this bucket contains.
BucketMetadata _metadata;
@@ -930,20 +976,23 @@ protected:
*/
const Bucket* _findBucket(const Stripe& stripe,
WithLock stripeLock,
- const OID& id,
+ const BucketId& bucketId,
IgnoreBucketState mode = IgnoreBucketState::kNo);
/**
* Retrieve a bucket for write use.
*/
- Bucket* _useBucket(Stripe* stripe, WithLock stripeLock, const OID& id, IgnoreBucketState mode);
+ Bucket* _useBucket(Stripe* stripe,
+ WithLock stripeLock,
+ const BucketId& bucketId,
+ IgnoreBucketState mode);
/**
* Retrieve a bucket for write use, updating the state in the process.
*/
Bucket* _useBucketAndChangeState(Stripe* stripe,
WithLock stripeLock,
- const OID& id,
+ const BucketId& bucketId,
const BucketStateManager::StateChangeFn& change);
/**
@@ -979,13 +1028,26 @@ protected:
* Given a rehydrated 'bucket', passes ownership of that bucket to the catalog, marking the
* bucket as open.
*/
- Bucket* _reopenBucket(Stripe* stripe,
- WithLock stripeLock,
- ExecutionStatsController stats,
- const BucketKey& key,
- std::unique_ptr<Bucket>&& bucket,
- std::uint64_t targetEra,
- ClosedBuckets* closedBuckets);
+ StatusWith<Bucket*> _reopenBucket(Stripe* stripe,
+ WithLock stripeLock,
+ ExecutionStatsController stats,
+ const BucketKey& key,
+ std::unique_ptr<Bucket>&& bucket,
+ std::uint64_t targetEra,
+ ClosedBuckets* closedBuckets);
+
+ /**
+ * Check to see if 'insert' can use existing bucket rather than reopening a candidate bucket. If
+ * true, chances are the caller raced with another thread to reopen the same bucket, but if
+ * false, there might be another bucket that had been cleared, or that has the same _id in a
+ * different namespace.
+ */
+ StatusWith<Bucket*> _reuseExistingBucket(Stripe* stripe,
+ WithLock stripeLock,
+ ExecutionStatsController* stats,
+ const BucketKey& key,
+ Bucket* existingBucket,
+ std::uint64_t targetEra);
/**
* Helper method to perform the heavy lifting for both 'tryInsert' and 'insert'. See
@@ -1004,15 +1066,16 @@ protected:
* Given an already-selected 'bucket', inserts 'doc' to the bucket if possible. If not, and
* 'mode' is set to 'kYes', we will create a new bucket and insert into that bucket.
*/
- std::shared_ptr<WriteBatch> _insertIntoBucket(OperationContext* opCtx,
- Stripe* stripe,
- WithLock stripeLock,
- const BSONObj& doc,
- CombineWithInsertsFromOtherClients combine,
- AllowBucketCreation mode,
- CreationInfo* info,
- Bucket* bucket,
- ClosedBuckets* closedBuckets);
+ stdx::variant<std::shared_ptr<WriteBatch>, RolloverReason> _insertIntoBucket(
+ OperationContext* opCtx,
+ Stripe* stripe,
+ WithLock stripeLock,
+ const BSONObj& doc,
+ CombineWithInsertsFromOtherClients combine,
+ AllowBucketCreation mode,
+ CreationInfo* info,
+ Bucket* bucket,
+ ClosedBuckets* closedBuckets);
/**
* Wait for other batches to finish so we can prepare 'batch'
@@ -1055,9 +1118,11 @@ protected:
* Identifies a previously archived bucket that may be able to accommodate the measurement
* represented by 'info', if one exists.
*/
- stdx::variant<std::monostate, OID, BSONObj> _getReopeningCandidate(Stripe* stripe,
- WithLock stripeLock,
- const CreationInfo& info);
+ stdx::variant<std::monostate, OID, BSONObj> _getReopeningCandidate(
+ Stripe* stripe,
+ WithLock stripeLock,
+ const CreationInfo& info,
+ bool allowQueryBasedReopening);
/**
* Aborts 'batch', and if the corresponding bucket still exists, proceeds to abort any other
@@ -1083,7 +1148,7 @@ protected:
* Records that compression for the given bucket has been completed, and the BucketCatalog can
* forget about the bucket.
*/
- void _compressionDone(const OID& bucketId);
+ void _compressionDone(const BucketId& bucketId);
/**
* Adds the bucket to a list of idle buckets to be expired at a later date.
@@ -1114,13 +1179,14 @@ protected:
* Determines if 'bucket' needs to be rolled over to accommodate 'doc'. If so, determines whether
* to archive or close 'bucket'.
*/
- RolloverAction _determineRolloverAction(OperationContext* opCtx,
- const BSONObj& doc,
- CreationInfo* info,
- Bucket* bucket,
- NewFieldNames* newFieldNamesToBeInserted,
- int32_t* sizeToBeAdded,
- AllowBucketCreation mode);
+ std::pair<RolloverAction, RolloverReason> _determineRolloverAction(
+ OperationContext* opCtx,
+ const BSONObj& doc,
+ CreationInfo* info,
+ Bucket* bucket,
+ NewFieldNames* newFieldNamesToBeInserted,
+ int32_t* sizeToBeAdded,
+ AllowBucketCreation mode);
/**
* Close the existing, full bucket and open a new one for the same metadata.
diff --git a/src/mongo/db/timeseries/bucket_catalog_helpers.cpp b/src/mongo/db/timeseries/bucket_catalog_helpers.cpp
index 4500212e420..13e174cd1bf 100644
--- a/src/mongo/db/timeseries/bucket_catalog_helpers.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog_helpers.cpp
@@ -266,23 +266,28 @@ BSONObj findDocFromOID(OperationContext* opCtx, const Collection* coll, const OI
return (foundDoc) ? bucketObj.value() : BSONObj();
}
-void handleDirectWrite(OperationContext* opCtx, const OID& bucketId) {
+void handleDirectWrite(OperationContext* opCtx, const NamespaceString& ns, const OID& bucketId) {
+ // Ensure we have the view namespace, as that's what the BucketCatalog operates on.
+ NamespaceString resolvedNs =
+ ns.isTimeseriesBucketsCollection() ? ns.getTimeseriesViewNamespace() : ns;
+
// First notify the BucketCatalog that we intend to start a direct write, so we can conflict
// with any already-prepared operation, and also block bucket reopening if it's enabled.
auto& bucketCatalog = BucketCatalog::get(opCtx);
- bucketCatalog.directWriteStart(bucketId);
+ bucketCatalog.directWriteStart(resolvedNs, bucketId);
// Then register callbacks so we can let the BucketCatalog know that we are done with our
// direct write after the actual write takes place (or is abandoned), and allow reopening.
opCtx->recoveryUnit()->onCommit(
- [svcCtx = opCtx->getServiceContext(), bucketId](boost::optional<Timestamp>) {
+ [svcCtx = opCtx->getServiceContext(), resolvedNs, bucketId](boost::optional<Timestamp>) {
+ auto& bucketCatalog = BucketCatalog::get(svcCtx);
+ bucketCatalog.directWriteFinish(resolvedNs, bucketId);
+ });
+ opCtx->recoveryUnit()->onRollback(
+ [svcCtx = opCtx->getServiceContext(), resolvedNs, bucketId]() {
auto& bucketCatalog = BucketCatalog::get(svcCtx);
- bucketCatalog.directWriteFinish(bucketId);
+ bucketCatalog.directWriteFinish(resolvedNs, bucketId);
});
- opCtx->recoveryUnit()->onRollback([svcCtx = opCtx->getServiceContext(), bucketId]() {
- auto& bucketCatalog = BucketCatalog::get(svcCtx);
- bucketCatalog.directWriteFinish(bucketId);
- });
}
} // namespace mongo::timeseries
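
As an aside, here is a self-contained sketch of the start/finish pairing used above, with toy stand-ins for the recovery unit and catalog. Only directWriteStart/directWriteFinish correspond to real names; every other type and value is invented for the example.

```cpp
// Sketch, assuming a toy recovery unit with commit/rollback hooks.
#include <functional>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

struct ToyRecoveryUnit {
    std::vector<std::function<void()>> commitHooks;
    std::vector<std::function<void()>> rollbackHooks;

    void onCommit(std::function<void()> f) { commitHooks.push_back(std::move(f)); }
    void onRollback(std::function<void()> f) { rollbackHooks.push_back(std::move(f)); }

    void commit() { for (auto& f : commitHooks) f(); }
    void rollback() { for (auto& f : rollbackHooks) f(); }
};

struct ToyBucketCatalog {
    void directWriteStart(const std::string& ns, const std::string& oid) {
        std::cout << "blocking reopening of " << ns << "/" << oid << "\n";
    }
    void directWriteFinish(const std::string& ns, const std::string& oid) {
        std::cout << "unblocking reopening of " << ns << "/" << oid << "\n";
    }
};

int main() {
    ToyBucketCatalog catalog;
    ToyRecoveryUnit ru;
    // In the real helper, a buckets namespace is resolved to its view namespace first.
    const std::string viewNs = "test.foo";
    const std::string oid = "63924a7d2f3a5c1b9e8d0001";

    // Start before the write, then finish on either outcome so the bucket is
    // never left permanently blocked.
    catalog.directWriteStart(viewNs, oid);
    ru.onCommit([&] { catalog.directWriteFinish(viewNs, oid); });
    ru.onRollback([&] { catalog.directWriteFinish(viewNs, oid); });

    ru.commit();  // exactly one of commit()/rollback() fires in practice
    return 0;
}
```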
diff --git a/src/mongo/db/timeseries/bucket_catalog_helpers.h b/src/mongo/db/timeseries/bucket_catalog_helpers.h
index aa00c9541da..aef8a22fee4 100644
--- a/src/mongo/db/timeseries/bucket_catalog_helpers.h
+++ b/src/mongo/db/timeseries/bucket_catalog_helpers.h
@@ -119,6 +119,6 @@ BSONObj generateReopeningFilters(const Date_t& time,
*
* To be called from an OpObserver, e.g. in aboutToDelete and onUpdate.
*/
-void handleDirectWrite(OperationContext* opCtx, const OID& bucketId);
+void handleDirectWrite(OperationContext* opCtx, const NamespaceString& ns, const OID& bucketId);
} // namespace mongo::timeseries
diff --git a/src/mongo/db/timeseries/bucket_catalog_state_manager_test.cpp b/src/mongo/db/timeseries/bucket_catalog_state_manager_test.cpp
index 918fdab5254..97c23c451df 100644
--- a/src/mongo/db/timeseries/bucket_catalog_state_manager_test.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog_state_manager_test.cpp
@@ -39,9 +39,9 @@ class BucketCatalogStateManagerTest : public BucketCatalog, public unittest::Tes
public:
BucketCatalogStateManagerTest() {}
- void clearById(const OID& oid) {
- directWriteStart(oid);
- directWriteFinish(oid);
+ void clearById(const NamespaceString& ns, const OID& oid) {
+ directWriteStart(ns, oid);
+ directWriteFinish(ns, oid);
}
bool hasBeenCleared(Bucket* bucket) {
@@ -51,7 +51,6 @@ public:
Bucket* createBucket(const CreationInfo& info) {
auto ptr = _allocateBucket(&_stripes[info.stripe], withLock, info);
- ptr->setNamespace(info.key.ns);
ASSERT_FALSE(hasBeenCleared(ptr));
return ptr;
}
@@ -66,11 +65,15 @@ public:
}
void checkAndRemoveClearedBucket(Bucket* bucket, BucketKey bucketKey, WithLock withLock) {
- auto a = _findBucket(
- _stripes[_getStripeNumber(bucketKey)], withLock, bucket->id(), IgnoreBucketState::kYes);
+ auto a = _findBucket(_stripes[_getStripeNumber(bucketKey)],
+ withLock,
+ bucket->bucketId(),
+ IgnoreBucketState::kYes);
ASSERT(a == bucket);
- auto b = _findBucket(
- _stripes[_getStripeNumber(bucketKey)], withLock, bucket->id(), IgnoreBucketState::kNo);
+ auto b = _findBucket(_stripes[_getStripeNumber(bucketKey)],
+ withLock,
+ bucket->bucketId(),
+ IgnoreBucketState::kNo);
ASSERT(b == nullptr);
_removeBucket(
&_stripes[_getStripeNumber(bucketKey)], withLock, bucket, RemovalMode::kAbort);
@@ -329,7 +332,7 @@ TEST_F(BucketCatalogStateManagerTest, EraAdvancesAsExpected) {
ASSERT_EQ(bucket2->getEra(), 1);
// Era also advances when clearing by OID
- clearById(OID());
+ clearById(ns1, OID());
ASSERT_EQ(_bucketStateManager.getEra(), 4);
}
@@ -500,7 +503,7 @@ TEST_F(BucketCatalogStateManagerTest, HasBeenClearedToleratesGapsInRegistry) {
auto bucket1 = createBucket(info1);
ASSERT_EQ(bucket1->getEra(), 0);
- clearById(OID());
+ clearById(ns1, OID());
ASSERT_EQ(_bucketStateManager.getEra(), 2);
clear(ns1);
ASSERT_EQ(_bucketStateManager.getEra(), 3);
@@ -508,9 +511,9 @@ TEST_F(BucketCatalogStateManagerTest, HasBeenClearedToleratesGapsInRegistry) {
auto bucket2 = createBucket(info2);
ASSERT_EQ(bucket2->getEra(), 3);
- clearById(OID());
- clearById(OID());
- clearById(OID());
+ clearById(ns1, OID());
+ clearById(ns1, OID());
+ clearById(ns1, OID());
ASSERT_EQ(_bucketStateManager.getEra(), 9);
ASSERT_TRUE(hasBeenCleared(bucket1));
ASSERT_FALSE(hasBeenCleared(bucket2));
@@ -525,7 +528,7 @@ TEST_F(BucketCatalogStateManagerTest, ArchivingBucketPreservesState) {
true};
auto bucket = createBucket(info1);
- auto bucketId = bucket->id();
+ auto bucketId = bucket->bucketId();
ClosedBuckets closedBuckets;
_archiveBucket(&_stripes[info1.stripe], WithLock::withoutLock(), bucket, &closedBuckets);
@@ -539,7 +542,7 @@ TEST_F(BucketCatalogStateManagerTest, AbortingBatchRemovesBucketState) {
true};
auto bucket = createBucket(info1);
- auto bucketId = bucket->id();
+ auto bucketId = bucket->bucketId();
auto stats = _getExecutionStats(info1.key.ns);
auto batch = std::make_shared<WriteBatch>(BucketHandle{bucketId, info1.stripe}, 0, stats);
@@ -553,7 +556,7 @@ TEST_F(BucketCatalogStateManagerTest, ClosingBucketGoesThroughPendingCompression
true};
auto bucket = createBucket(info1);
- auto bucketId = bucket->id();
+ auto bucketId = bucket->bucketId();
ASSERT(_bucketStateManager.getBucketState(bucketId).value() == BucketState{});
@@ -571,7 +574,7 @@ TEST_F(BucketCatalogStateManagerTest, ClosingBucketGoesThroughPendingCompression
CommitInfo commitInfo{};
auto closedBucket = finish(batch, commitInfo);
ASSERT(closedBucket.has_value());
- ASSERT_EQ(closedBucket.value().bucketId, bucketId);
+ ASSERT_EQ(closedBucket.value().bucketId.oid, bucketId.oid);
// Bucket should now be in pending compression state.
ASSERT(_bucketStateManager.getBucketState(bucketId).has_value());
@@ -588,8 +591,8 @@ TEST_F(BucketCatalogStateManagerTest, DirectWriteStartInitializesBucketState) {
RAIIServerParameterControllerForTest controller{"featureFlagTimeseriesScalabilityImprovements",
true};
- auto bucketId = OID();
- directWriteStart(bucketId);
+ auto bucketId = BucketId{ns1, OID()};
+ directWriteStart(ns1, bucketId.oid);
auto state = _bucketStateManager.getBucketState(bucketId);
ASSERT_TRUE(state.has_value());
ASSERT_TRUE(state.value().isSet(BucketStateFlag::kPendingDirectWrite));
@@ -599,13 +602,13 @@ TEST_F(BucketCatalogStateManagerTest, DirectWriteFinishRemovesBucketState) {
RAIIServerParameterControllerForTest controller{"featureFlagTimeseriesScalabilityImprovements",
true};
- auto bucketId = OID();
- directWriteStart(bucketId);
+ auto bucketId = BucketId{ns1, OID()};
+ directWriteStart(ns1, bucketId.oid);
auto state = _bucketStateManager.getBucketState(bucketId);
ASSERT_TRUE(state.has_value());
ASSERT_TRUE(state.value().isSet(BucketStateFlag::kPendingDirectWrite));
- directWriteFinish(bucketId);
+ directWriteFinish(ns1, bucketId.oid);
state = _bucketStateManager.getBucketState(bucketId);
ASSERT_FALSE(state.has_value());
}
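
The tests above construct BucketId{ns, oid}, so bucket state is now keyed by namespace as well as OID. A rough sketch of what such a namespace-qualified key might look like; the field names, hashing, and map below are assumptions for illustration, not the actual definition in bucket_catalog.h.

```cpp
// Hypothetical stand-in for a namespace-qualified bucket identifier.
#include <cstddef>
#include <functional>
#include <iostream>
#include <string>
#include <tuple>
#include <unordered_map>

struct FakeBucketId {
    std::string ns;   // time-series view namespace, e.g. "test.foo"
    std::string oid;  // hex ObjectId

    bool operator==(const FakeBucketId& other) const {
        return std::tie(ns, oid) == std::tie(other.ns, other.oid);
    }
};

struct FakeBucketIdHash {
    std::size_t operator()(const FakeBucketId& id) const {
        // Simple hash combine; sufficient for the sketch.
        return std::hash<std::string>{}(id.ns) ^ (std::hash<std::string>{}(id.oid) << 1);
    }
};

int main() {
    // Because the key carries the namespace, clearing or direct-writing a bucket in
    // one collection cannot collide with the same OID used by another collection.
    std::unordered_map<FakeBucketId, std::string, FakeBucketIdHash> state;
    state[{"test.foo", "63924a7d2f3a5c1b9e8d0001"}] = "kPendingDirectWrite";
    state[{"test.bar", "63924a7d2f3a5c1b9e8d0001"}] = "kCleared";

    std::cout << state.size() << " distinct entries\n";  // prints 2
    return 0;
}
```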
diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp
index ef3752a9707..2acb41bc3d8 100644
--- a/src/mongo/db/timeseries/bucket_catalog_test.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp
@@ -570,7 +570,7 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) {
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
ASSERT(result.isOK());
auto batch = result.getValue().batch;
- auto oldId = batch->bucket().id;
+ auto oldId = batch->bucket().bucketId;
_commit(batch, 0);
ASSERT_EQ(2U, batch->newFieldNamesToBeInserted().size()) << batch->toBSON();
ASSERT(batch->newFieldNamesToBeInserted().count(_timeField)) << batch->toBSON();
@@ -626,7 +626,7 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) {
BSON(_timeField << Date_t::now() << "a" << gTimeseriesBucketMaxCount),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
auto& batch2 = result2.getValue().batch;
- ASSERT_NE(oldId, batch2->bucket().id);
+ ASSERT_NE(oldId, batch2->bucket().bucketId);
_commit(batch2, 0);
ASSERT_EQ(2U, batch2->newFieldNamesToBeInserted().size()) << batch2->toBSON();
ASSERT(batch2->newFieldNamesToBeInserted().count(_timeField)) << batch2->toBSON();
@@ -730,7 +730,8 @@ TEST_F(BucketCatalogTest, ClearBucketWithPreparedBatchThrowsConflict) {
ASSERT_EQ(batch->measurements().size(), 1);
ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), 0);
- ASSERT_THROWS(_bucketCatalog->directWriteStart(batch->bucket().id), WriteConflictException);
+ ASSERT_THROWS(_bucketCatalog->directWriteStart(_ns1, batch->bucket().bucketId.oid),
+ WriteConflictException);
_bucketCatalog->abort(batch, {ErrorCodes::TimeseriesBucketCleared, ""});
ASSERT(batch->finished());
@@ -763,10 +764,11 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) {
.getValue()
.batch;
ASSERT_NE(batch1, batch2);
- ASSERT_EQ(batch1->bucket().id, batch2->bucket().id);
+ ASSERT_EQ(batch1->bucket().bucketId, batch2->bucket().bucketId);
// Now clear the bucket. Since there's a prepared batch it should conflict.
- ASSERT_THROWS(_bucketCatalog->directWriteStart(batch1->bucket().id), WriteConflictException);
+ ASSERT_THROWS(_bucketCatalog->directWriteStart(_ns1, batch1->bucket().bucketId.oid),
+ WriteConflictException);
// Now try to prepare the second batch. Ensure it aborts the batch.
ASSERT(batch2->claimCommitRights());
@@ -775,7 +777,8 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) {
ASSERT_EQ(batch2->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared);
// Make sure we didn't clear the bucket state when we aborted the second batch.
- ASSERT_THROWS(_bucketCatalog->directWriteStart(batch1->bucket().id), WriteConflictException);
+ ASSERT_THROWS(_bucketCatalog->directWriteStart(_ns1, batch1->bucket().bucketId.oid),
+ WriteConflictException);
// Make sure a subsequent insert, which opens a new bucket, doesn't corrupt the old bucket
// state and prevent us from finishing the first batch.
@@ -790,7 +793,7 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) {
.batch;
ASSERT_NE(batch1, batch3);
ASSERT_NE(batch2, batch3);
- ASSERT_NE(batch1->bucket().id, batch3->bucket().id);
+ ASSERT_NE(batch1->bucket().bucketId, batch3->bucket().bucketId);
// Clean up this batch
ASSERT(batch3->claimCommitRights());
_bucketCatalog->abort(batch3, {ErrorCodes::TimeseriesBucketCleared, ""});
@@ -940,8 +943,8 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) {
BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
.getValue()
.batch;
- ASSERT_EQ(batch1->bucket().id, batch2->bucket().id);
- ASSERT_EQ(batch1->bucket().id, batch3->bucket().id);
+ ASSERT_EQ(batch1->bucket().bucketId, batch2->bucket().bucketId);
+ ASSERT_EQ(batch1->bucket().bucketId, batch3->bucket().bucketId);
ASSERT(batch1->claimCommitRights());
ASSERT(batch2->claimCommitRights());
@@ -975,7 +978,7 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) {
BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
.getValue()
.batch;
- ASSERT_NE(batch2->bucket().id, batch4->bucket().id);
+ ASSERT_NE(batch2->bucket().bucketId, batch4->bucket().bucketId);
}
TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) {
@@ -1446,7 +1449,7 @@ TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) {
"featureFlagTimeseriesScalabilityImprovements", true};
RAIIServerParameterControllerForTest memoryController{
"timeseriesIdleBucketExpiryMemoryUsageThreshold",
- 200}; // An absurdly low limit that only allows us one open bucket at a time.
+ 250}; // An absurdly low limit that only allows us one open bucket at a time.
setGlobalFailPoint("alwaysUseSameBucketCatalogStripe",
BSON("mode"
<< "alwaysOn"));
@@ -1481,7 +1484,7 @@ TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) {
ASSERT_OK(result.getStatus());
auto batch = result.getValue().batch;
ASSERT(batch);
- auto bucketId = batch->bucket().id;
+ auto bucketId = batch->bucket().bucketId;
ASSERT(batch->claimCommitRights());
ASSERT_OK(_bucketCatalog->prepareCommit(batch));
ASSERT_EQ(batch->measurements().size(), 1);
@@ -1500,7 +1503,7 @@ TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) {
ASSERT(!result.getValue().batch);
ASSERT_TRUE(stdx::holds_alternative<BSONObj>(result.getValue().candidate));
- // So should time forward.
+ // Time forward should not hint to re-open.
result = _bucketCatalog->tryInsert(
_opCtx,
_ns1,
@@ -1511,7 +1514,7 @@ TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) {
ASSERT_OK(result.getStatus());
ASSERT(result.getValue().closedBuckets.empty());
ASSERT(!result.getValue().batch);
- ASSERT_TRUE(stdx::holds_alternative<BSONObj>(result.getValue().candidate));
+ ASSERT_TRUE(stdx::holds_alternative<std::monostate>(result.getValue().candidate));
// Now let's insert something with a different meta, so we open a new bucket, see we're past the
// memory limit, and archive the existing bucket.
@@ -1526,7 +1529,7 @@ TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) {
ASSERT_EQ(1, _getExecutionStat(_ns1, kNumArchivedDueToMemoryThreshold));
ASSERT_EQ(0, _getExecutionStat(_ns1, kNumClosedDueToMemoryThreshold));
batch = result.getValue().batch;
- ASSERT_NE(batch->bucket().id, bucketId);
+ ASSERT_NE(batch->bucket().bucketId, bucketId);
ASSERT(batch);
ASSERT(batch->claimCommitRights());
ASSERT_OK(_bucketCatalog->prepareCommit(batch));
@@ -1546,7 +1549,7 @@ TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) {
ASSERT(result.getValue().closedBuckets.empty());
ASSERT(!result.getValue().batch);
ASSERT_TRUE(stdx::holds_alternative<OID>(result.getValue().candidate));
- ASSERT_EQ(stdx::get<OID>(result.getValue().candidate), bucketId);
+ ASSERT_EQ(stdx::get<OID>(result.getValue().candidate), bucketId.oid);
}
TEST_F(BucketCatalogTest, TryInsertWillCreateBucketIfWeWouldCloseExistingBucket) {
@@ -1565,7 +1568,7 @@ TEST_F(BucketCatalogTest, TryInsertWillCreateBucketIfWeWouldCloseExistingBucket)
ASSERT_OK(result.getStatus());
auto batch = result.getValue().batch;
ASSERT(batch);
- auto bucketId = batch->bucket().id;
+ auto bucketId = batch->bucket().bucketId;
ASSERT(batch->claimCommitRights());
ASSERT_OK(_bucketCatalog->prepareCommit(batch));
ASSERT_EQ(batch->measurements().size(), 1);
@@ -1583,7 +1586,7 @@ TEST_F(BucketCatalogTest, TryInsertWillCreateBucketIfWeWouldCloseExistingBucket)
ASSERT_OK(result.getStatus());
batch = result.getValue().batch;
ASSERT(batch);
- ASSERT_NE(batch->bucket().id, bucketId);
+ ASSERT_NE(batch->bucket().bucketId, bucketId);
ASSERT(batch->claimCommitRights());
ASSERT_OK(_bucketCatalog->prepareCommit(batch));
ASSERT_EQ(batch->measurements().size(), 1);
@@ -1607,7 +1610,7 @@ TEST_F(BucketCatalogTest, InsertIntoReopenedBucket) {
ASSERT_OK(result.getStatus());
auto batch = result.getValue().batch;
ASSERT(batch);
- auto oldBucketId = batch->bucket().id;
+ auto oldBucketId = batch->bucket().bucketId;
ASSERT(batch->claimCommitRights());
ASSERT_OK(_bucketCatalog->prepareCommit(batch));
ASSERT_EQ(batch->measurements().size(), 1);
@@ -1618,7 +1621,7 @@ TEST_F(BucketCatalogTest, InsertIntoReopenedBucket) {
"control":{"version":1,"min":{"time":{"$date":"2022-06-06T15:34:00.000Z"}},
"max":{"time":{"$date":"2022-06-06T15:34:30.000Z"}}},
"data":{"time":{"0":{"$date":"2022-06-06T15:34:30.000Z"}}}})");
- ASSERT_NE(bucketDoc["_id"].OID(), oldBucketId);
+ ASSERT_NE(bucketDoc["_id"].OID(), oldBucketId.oid);
auto validator = [&](OperationContext * opCtx, const BSONObj& bucketDoc) -> auto {
return autoColl->checkValidation(opCtx, bucketDoc);
};
@@ -1638,7 +1641,7 @@ TEST_F(BucketCatalogTest, InsertIntoReopenedBucket) {
ASSERT_OK(result.getStatus());
batch = result.getValue().batch;
ASSERT(batch);
- ASSERT_EQ(batch->bucket().id, bucketDoc["_id"].OID());
+ ASSERT_EQ(batch->bucket().bucketId.oid, bucketDoc["_id"].OID());
ASSERT(batch->claimCommitRights());
ASSERT_OK(_bucketCatalog->prepareCommit(batch));
ASSERT_EQ(batch->measurements().size(), 1);
@@ -1681,7 +1684,7 @@ TEST_F(BucketCatalogTest, CannotInsertIntoOutdatedBucket) {
ASSERT_OK(result.getStatus());
auto batch = result.getValue().batch;
ASSERT(batch);
- auto oldBucketId = batch->bucket().id;
+ auto oldBucketId = batch->bucket().bucketId;
ASSERT(batch->claimCommitRights());
ASSERT_OK(_bucketCatalog->prepareCommit(batch));
ASSERT_EQ(batch->measurements().size(), 1);
@@ -1692,16 +1695,17 @@ TEST_F(BucketCatalogTest, CannotInsertIntoOutdatedBucket) {
"control":{"version":1,"min":{"time":{"$date":"2022-06-06T15:34:00.000Z"}},
"max":{"time":{"$date":"2022-06-06T15:34:30.000Z"}}},
"data":{"time":{"0":{"$date":"2022-06-06T15:34:30.000Z"}}}})");
- ASSERT_NE(bucketDoc["_id"].OID(), oldBucketId);
+ ASSERT_NE(bucketDoc["_id"].OID(), oldBucketId.oid);
auto validator = [&](OperationContext * opCtx, const BSONObj& bucketDoc) -> auto {
return autoColl->checkValidation(opCtx, bucketDoc);
};
// If we advance the catalog era, then we shouldn't use a bucket that was fetched during a
// previous era.
+ const NamespaceString fakeNs{"test.foo"};
const auto fakeId = OID();
- _bucketCatalog->directWriteStart(fakeId);
- _bucketCatalog->directWriteFinish(fakeId);
+ _bucketCatalog->directWriteStart(fakeNs, fakeId);
+ _bucketCatalog->directWriteFinish(fakeNs, fakeId);
BucketCatalog::BucketFindResult findResult;
findResult.bucketToReopen =