diff options
author | Dan Larkin-York <dan.larkin-york@mongodb.com> | 2021-02-12 21:02:32 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-02-20 00:03:52 +0000 |
commit | dbf6cdde5434c5e0fe7d6435fbe74b5da53595d4 (patch) | |
tree | 595b1e0440bfd9ab5dc23d7cc41324a8e743c3b6 /src/mongo | |
parent | 9819cd8260e0878324cf729cdc48110b84008ef4 (diff) | |
download | mongo-dbf6cdde5434c5e0fe7d6435fbe74b5da53595d4.tar.gz |
SERVER-54516 Introduce finer-grained locking in BucketCatalog and tighten critical sections
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog.cpp | 479 | ||||
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog.h | 132 |
2 files changed, 472 insertions, 139 deletions
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp index f4d21d6af7d..5da44c9d501 100644 --- a/src/mongo/db/timeseries/bucket_catalog.cpp +++ b/src/mongo/db/timeseries/bucket_catalog.cpp @@ -50,6 +50,9 @@ uint8_t numDigits(uint32_t num) { } } // namespace +const std::shared_ptr<BucketCatalog::ExecutionStats> BucketCatalog::kEmptyStats{ + std::make_shared<BucketCatalog::ExecutionStats>()}; + BSONObj BucketCatalog::CommitData::toBSON() const { return BSON("docs" << docs << "bucketMin" << bucketMin << "bucketMax" << bucketMax << "numCommittedMeasurements" << int(numCommittedMeasurements) @@ -67,20 +70,17 @@ BucketCatalog& BucketCatalog::get(OperationContext* opCtx) { } BSONObj BucketCatalog::getMetadata(const BucketId& bucketId) const { - stdx::lock_guard lk(_mutex); - auto it = _buckets.find(bucketId); - if (it == _buckets.cend()) { + BucketAccess bucket{const_cast<BucketCatalog*>(this), bucketId}; + if (!bucket) { return {}; } - const auto& bucket = it->second; - return bucket.metadata.metadata; + + return bucket->metadata.metadata; } StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert(OperationContext* opCtx, const NamespaceString& ns, const BSONObj& doc) { - stdx::lock_guard lk(_mutex); - auto viewCatalog = DatabaseHolder::get(opCtx)->getViewCatalog(opCtx, ns.db()); invariant(viewCatalog); auto viewDef = viewCatalog->lookup(opCtx, ns.ns()); @@ -97,7 +97,8 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert(OperationContext* } auto key = std::make_tuple(ns, BucketMetadata{metadata.obj(), viewDef}); - auto& stats = _executionStats[ns]; + auto stats = _getExecutionStats(ns); + invariant(stats); auto timeElem = doc[options.getTimeField()]; if (!timeElem || BSONType::Date != timeElem.type()) { @@ -107,21 +108,8 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert(OperationContext* } auto time = timeElem.Date(); - auto createNewBucketId = [&] { - _expireIdleBuckets(&stats); - return 
BucketIdInternal{time, ++_bucketNum}; - }; - - auto it = _bucketIds.find(key); - if (it == _bucketIds.end()) { - // A bucket for this namespace and metadata pair does not yet exist. - it = _bucketIds.insert({std::move(key), createNewBucketId()}).first; - _nsBuckets.insert({ns, it->second}); - stats.numBucketsOpenedDueToMetadata++; - } - _idleBuckets.erase(it->second); - auto bucket = &_buckets[it->second]; + BucketAccess bucket{this, key, stats.get(), time}; StringSet newFieldNamesToBeInserted; uint32_t newFieldNamesSize = 0; @@ -132,47 +120,35 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert(OperationContext* &newFieldNamesSize, &sizeToBeAdded); - auto isBucketFull = [&]() { - if (bucket->numMeasurements == static_cast<std::uint64_t>(gTimeseriesBucketMaxCount)) { - stats.numBucketsClosedDueToCount++; + auto isBucketFull = [&](BucketAccess* bucket, std::size_t hash) -> bool { + if ((*bucket)->numMeasurements == static_cast<std::uint64_t>(gTimeseriesBucketMaxCount)) { + stats->numBucketsClosedDueToCount.fetchAndAddRelaxed(1); return true; } - if (bucket->size + sizeToBeAdded > static_cast<std::uint64_t>(gTimeseriesBucketMaxSize)) { - stats.numBucketsClosedDueToSize++; + if ((*bucket)->size + sizeToBeAdded > + static_cast<std::uint64_t>(gTimeseriesBucketMaxSize)) { + stats->numBucketsClosedDueToSize.fetchAndAddRelaxed(1); return true; } - auto bucketTime = it->second.getTime(); + auto bucketTime = (*bucket).id().getTime(); if (time - bucketTime >= kTimeseriesBucketMaxTimeRange) { - stats.numBucketsClosedDueToTimeForward++; + stats->numBucketsClosedDueToTimeForward.fetchAndAddRelaxed(1); return true; } if (time < bucketTime) { - if (!bucket->hasBeenCommitted() && - bucket->latestTime - time < kTimeseriesBucketMaxTimeRange) { - it->second.setTime(time); + if (!(*bucket)->hasBeenCommitted() && + (*bucket)->latestTime - time < kTimeseriesBucketMaxTimeRange) { + (*bucket).setTime(hash); } else { - stats.numBucketsClosedDueToTimeBackward++; + 
stats->numBucketsClosedDueToTimeBackward.fetchAndAddRelaxed(1); return true; } } return false; }; - if (!bucket->ns.isEmpty() && isBucketFull()) { - // The bucket is full, so create a new one. - if (bucket->numPendingCommitMeasurements == 0 && - bucket->numCommittedMeasurements == bucket->numMeasurements) { - // The bucket does not contain any measurements that are yet to be committed, so we can - // remove it now. Otherwise, we must keep the bucket around until it is committed. - _memoryUsage -= bucket->memoryUsage; - _buckets.erase(it->second); - _nsBuckets.erase({std::get<NamespaceString>(it->first), it->second}); - } else { - bucket->full = true; - } - it->second = createNewBucketId(); - _nsBuckets.insert({ns, it->second}); - bucket = &_buckets[it->second]; + if (!bucket->ns.isEmpty() && isBucketFull(&bucket, 0)) { + bucket.rollover(isBucketFull); bucket->calculateBucketFieldsAndSizeChange(doc, options.getMetaField(), &newFieldNamesToBeInserted, @@ -202,7 +178,7 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert(OperationContext* if (bucket->ns.isEmpty()) { // The namespace and metadata only need to be set if this bucket was newly created. bucket->ns = ns; - bucket->metadata = std::get<BucketMetadata>(it->first); + bucket->metadata = std::get<BucketMetadata>(key); // The namespace is stored three times: the bucket itself, _bucketIds, and _nsBuckets. // The metadata is stored two times: the bucket itself and _bucketIds. 
@@ -211,59 +187,57 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert(OperationContext* bucket->memoryUsage += (ns.size() * 3) + (bucket->metadata.metadata.objsize() * 2) + ((sizeof(BucketId) + sizeof(OID)) * 4); } else { - _memoryUsage -= bucket->memoryUsage; + _memoryUsage.fetchAndSubtract(bucket->memoryUsage); } bucket->memoryUsage -= bucket->min.getMemoryUsage() + bucket->max.getMemoryUsage(); bucket->min.update(doc, options.getMetaField(), viewDef->defaultCollator(), std::less<>()); bucket->max.update(doc, options.getMetaField(), viewDef->defaultCollator(), std::greater<>()); bucket->memoryUsage += newFieldNamesSize + bucket->min.getMemoryUsage() + bucket->max.getMemoryUsage(); - _memoryUsage += bucket->memoryUsage; + _memoryUsage.fetchAndAdd(bucket->memoryUsage); - return {InsertResult{it->second, std::move(commitInfoFuture)}}; + return {InsertResult{bucket.id(), std::move(commitInfoFuture)}}; } BucketCatalog::CommitData BucketCatalog::commit(const BucketId& bucketId, boost::optional<CommitInfo> previousCommitInfo) { - stdx::lock_guard lk(_mutex); - auto it = _buckets.find(bucketId); - invariant(it != _buckets.end()); - auto& bucket = it->second; + BucketAccess bucket{this, bucketId}; + invariant(bucket); // The only case in which previousCommitInfo should not be provided is the first time a given // committer calls this function. 
- invariant(!previousCommitInfo || bucket.hasBeenCommitted()); + invariant(!previousCommitInfo || bucket->hasBeenCommitted()); - auto newFieldNamesToBeInserted = bucket.newFieldNamesToBeInserted; - bucket.fieldNames.merge(bucket.newFieldNamesToBeInserted); - bucket.newFieldNamesToBeInserted.clear(); + auto newFieldNamesToBeInserted = bucket->newFieldNamesToBeInserted; + bucket->fieldNames.merge(bucket->newFieldNamesToBeInserted); + bucket->newFieldNamesToBeInserted.clear(); std::vector<BSONObj> measurements; - bucket.measurementsToBeInserted.swap(measurements); + bucket->measurementsToBeInserted.swap(measurements); - auto& stats = _executionStats[bucket.ns]; - stats.numMeasurementsCommitted += measurements.size(); + auto stats = _getExecutionStats(bucket->ns); + stats->numMeasurementsCommitted.fetchAndAddRelaxed(measurements.size()); // Inform waiters that their measurements have been committed. - for (uint32_t i = 0; i < bucket.numPendingCommitMeasurements; i++) { - if (auto& promise = bucket.promises.front()) { + for (uint32_t i = 0; i < bucket->numPendingCommitMeasurements; i++) { + if (auto& promise = bucket->promises.front()) { promise->emplaceValue(*previousCommitInfo); } - bucket.promises.pop(); + bucket->promises.pop(); } - if (bucket.numPendingCommitMeasurements) { - stats.numWaits += bucket.numPendingCommitMeasurements - 1; + if (bucket->numPendingCommitMeasurements) { + stats->numWaits.fetchAndAddRelaxed(bucket->numPendingCommitMeasurements - 1); } - bucket.numWriters -= bucket.numPendingCommitMeasurements; - bucket.numCommittedMeasurements += - std::exchange(bucket.numPendingCommitMeasurements, measurements.size()); + bucket->numWriters -= bucket->numPendingCommitMeasurements; + bucket->numCommittedMeasurements += + std::exchange(bucket->numPendingCommitMeasurements, measurements.size()); auto [bucketMin, bucketMax] = [&bucket]() -> std::pair<BSONObj, BSONObj> { - if (bucket.numCommittedMeasurements == 0) { - return {bucket.min.toBSON(), 
bucket.max.toBSON()}; + if (bucket->numCommittedMeasurements == 0) { + return {bucket->min.toBSON(), bucket->max.toBSON()}; } else { - return {bucket.min.getUpdates(), bucket.max.getUpdates()}; + return {bucket->min.getUpdates(), bucket->max.getUpdates()}; } }(); @@ -271,25 +245,32 @@ BucketCatalog::CommitData BucketCatalog::commit(const BucketId& bucketId, CommitData data = {std::move(measurements), std::move(bucketMin), std::move(bucketMax), - bucket.numCommittedMeasurements, + bucket->numCommittedMeasurements, std::move(newFieldNamesToBeInserted)}; if (allCommitted) { - if (bucket.full) { + if (bucket->full) { // Everything in the bucket has been committed, and nothing more will be added since the // bucket is full. Thus, we can remove it. - _memoryUsage -= bucket.memoryUsage; - _nsBuckets.erase({std::move(it->second.ns), bucketId}); + _memoryUsage.fetchAndSubtract(bucket->memoryUsage); + + invariant(bucket->promises.empty()); + bucket.release(); + auto lk = _lock(); + + auto it = _buckets.find(bucketId); + invariant(it != _buckets.end()); + _nsBuckets.erase({std::move(it->second->ns), bucketId}); _buckets.erase(it); - } else if (bucket.numWriters == 0) { - _idleBuckets.insert(bucketId); + } else if (bucket->numWriters == 0) { + _markBucketIdle(bucketId); } } else { - stats.numCommits++; - if (bucket.numCommittedMeasurements == 0) { - stats.numBucketInserts++; + stats->numCommits.fetchAndAddRelaxed(1); + if (bucket->numCommittedMeasurements == 0) { + stats->numBucketInserts.fetchAndAddRelaxed(1); } else { - stats.numBucketUpdates++; + stats->numBucketUpdates.fetchAndAddRelaxed(1); } } @@ -297,28 +278,27 @@ BucketCatalog::CommitData BucketCatalog::commit(const BucketId& bucketId, } void BucketCatalog::clear(const BucketId& bucketId) { - stdx::lock_guard lk(_mutex); - - auto it = _buckets.find(bucketId); - if (it == _buckets.end()) { + BucketAccess bucket{this, bucketId}; + if (!bucket) { return; } - auto& bucket = it->second; - while (!bucket.promises.empty()) { 
- if (auto& promise = bucket.promises.front()) { + while (!bucket->promises.empty()) { + if (auto& promise = bucket->promises.front()) { promise->setError({ErrorCodes::TimeseriesBucketCleared, str::stream() << "Time-series bucket " << *bucketId << " for " - << bucket.ns << " was cleared"}); + << bucket->ns << " was cleared"}); } - bucket.promises.pop(); + bucket->promises.pop(); } + bucket.release(); + auto lk = _lock(); _removeBucket(bucketId); } void BucketCatalog::clear(const NamespaceString& ns) { - stdx::lock_guard lk(_mutex); + auto lk = _lock(); auto shouldClear = [&ns](const NamespaceString& bucketNs) { return ns.coll().empty() ? ns.db() == bucketNs.db() : ns == bucketNs; @@ -327,7 +307,10 @@ void BucketCatalog::clear(const NamespaceString& ns) { for (auto it = _nsBuckets.lower_bound({ns, BucketIdInternal::min()}); it != _nsBuckets.end() && shouldClear(std::get<NamespaceString>(*it));) { auto nextIt = std::next(it); - _executionStats.erase(std::get<NamespaceString>(*it)); + { + stdx::lock_guard statsLock{_executionStatsLock}; + _executionStats.erase(std::get<NamespaceString>(*it)); + } _removeBucket(std::get<BucketId>(*it), it); it = nextIt; } @@ -338,60 +321,119 @@ void BucketCatalog::clear(StringData dbName) { } void BucketCatalog::appendExecutionStats(const NamespaceString& ns, BSONObjBuilder* builder) const { - stdx::lock_guard lk(_mutex); - - auto it = _executionStats.find(ns); - const auto& stats = it == _executionStats.end() ? 
ExecutionStats() : it->second; - - builder->appendNumber("numBucketInserts", stats.numBucketInserts); - builder->appendNumber("numBucketUpdates", stats.numBucketUpdates); - builder->appendNumber("numBucketsOpenedDueToMetadata", stats.numBucketsOpenedDueToMetadata); - builder->appendNumber("numBucketsClosedDueToCount", stats.numBucketsClosedDueToCount); - builder->appendNumber("numBucketsClosedDueToSize", stats.numBucketsClosedDueToSize); + const auto stats = _getExecutionStats(ns); + + builder->appendNumber("numBucketInserts", stats->numBucketInserts.load()); + builder->appendNumber("numBucketUpdates", stats->numBucketUpdates.load()); + builder->appendNumber("numBucketsOpenedDueToMetadata", + stats->numBucketsOpenedDueToMetadata.load()); + builder->appendNumber("numBucketsClosedDueToCount", stats->numBucketsClosedDueToCount.load()); + builder->appendNumber("numBucketsClosedDueToSize", stats->numBucketsClosedDueToSize.load()); builder->appendNumber("numBucketsClosedDueToTimeForward", - stats.numBucketsClosedDueToTimeForward); + stats->numBucketsClosedDueToTimeForward.load()); builder->appendNumber("numBucketsClosedDueToTimeBackward", - stats.numBucketsClosedDueToTimeBackward); + stats->numBucketsClosedDueToTimeBackward.load()); builder->appendNumber("numBucketsClosedDueToMemoryThreshold", - stats.numBucketsClosedDueToMemoryThreshold); - builder->appendNumber("numCommits", stats.numCommits); - builder->appendNumber("numWaits", stats.numWaits); - builder->appendNumber("numMeasurementsCommitted", stats.numMeasurementsCommitted); - if (stats.numCommits) { - builder->appendNumber("avgNumMeasurementsPerCommit", - stats.numMeasurementsCommitted / stats.numCommits); + stats->numBucketsClosedDueToMemoryThreshold.load()); + auto commits = stats->numCommits.load(); + builder->appendNumber("numCommits", commits); + builder->appendNumber("numWaits", stats->numWaits.load()); + auto measurementsCommitted = stats->numMeasurementsCommitted.load(); + 
builder->appendNumber("numMeasurementsCommitted", measurementsCommitted); + if (commits) { + builder->appendNumber("avgNumMeasurementsPerCommit", measurementsCommitted / commits); } } +stdx::unique_lock<Mutex> BucketCatalog::_lock() const { + return stdx::unique_lock<Mutex>{_mutex}; +} + void BucketCatalog::_removeBucket(const BucketId& bucketId, boost::optional<NsBuckets::iterator> nsBucketsIt, boost::optional<IdleBuckets::iterator> idleBucketsIt) { auto it = _buckets.find(bucketId); - _memoryUsage -= it->second.memoryUsage; + { + // take a lock on the bucket so we guarantee no one else is accessing it; + // we can release it right away since no one else can take it again without taking the + // catalog lock, which we also hold + stdx::lock_guard<Mutex> lk{it->second->lock}; + _memoryUsage.fetchAndSubtract(it->second->memoryUsage); + } if (nsBucketsIt) { _nsBuckets.erase(*nsBucketsIt); } else { - _nsBuckets.erase({it->second.ns, it->first}); + _nsBuckets.erase({it->second->ns, it->first}); } if (idleBucketsIt) { - _idleBuckets.erase(*idleBucketsIt); + _markBucketNotIdle(*idleBucketsIt); } else { - _idleBuckets.erase(it->first); + _markBucketNotIdle(it->first); } - _bucketIds.erase({std::move(it->second.ns), std::move(it->second.metadata)}); + _bucketIds.erase({std::move(it->second->ns), std::move(it->second->metadata)}); _buckets.erase(it); } +void BucketCatalog::_markBucketIdle(const BucketId& bucketId) { + stdx::lock_guard lk{_idleBucketsLock}; + _idleBuckets.insert(bucketId); +} + +void BucketCatalog::_markBucketNotIdle(const BucketId& bucketId) { + stdx::lock_guard lk{_idleBucketsLock}; + _idleBuckets.erase(bucketId); +} + +void BucketCatalog::_markBucketNotIdle(const IdleBuckets::iterator& it) { + _idleBuckets.erase(it); +} + void BucketCatalog::_expireIdleBuckets(ExecutionStats* stats) { + stdx::lock_guard lk{_idleBucketsLock}; while (!_idleBuckets.empty() && - _memoryUsage > + _memoryUsage.load() > 
static_cast<std::uint64_t>(gTimeseriesIdleBucketExpiryMemoryUsageThreshold)) { _removeBucket(*_idleBuckets.begin(), boost::none, _idleBuckets.begin()); - stats->numBucketsClosedDueToMemoryThreshold++; + stats->numBucketsClosedDueToMemoryThreshold.fetchAndAddRelaxed(1); + } +} + +std::size_t BucketCatalog::_numberOfIdleBuckets() const { + stdx::lock_guard lk{_idleBucketsLock}; + return _idleBuckets.size(); +} + +BucketCatalog::BucketIdInternal BucketCatalog::_createNewBucketId(const Date_t& time, + ExecutionStats* stats) { + _expireIdleBuckets(stats); + return BucketIdInternal{time, ++_bucketNum}; +} + +std::shared_ptr<BucketCatalog::ExecutionStats> BucketCatalog::_getExecutionStats( + const NamespaceString& ns) { + stdx::lock_guard lock(_executionStatsLock); + + auto it = _executionStats.find(ns); + if (it != _executionStats.end()) { + return it->second; } + + auto res = _executionStats.emplace(ns, std::make_shared<ExecutionStats>()); + return res.first->second; +} + +const std::shared_ptr<BucketCatalog::ExecutionStats> BucketCatalog::_getExecutionStats( + const NamespaceString& ns) const { + stdx::lock_guard lock(_executionStatsLock); + + auto it = _executionStats.find(ns); + if (it != _executionStats.end()) { + return it->second; + } + return kEmptyStats; } bool BucketCatalog::BucketMetadata::operator<(const BucketMetadata& other) const { @@ -441,6 +483,189 @@ bool BucketCatalog::Bucket::hasBeenCommitted() const { return numCommittedMeasurements != 0 || numPendingCommitMeasurements != 0; } +BucketCatalog::BucketAccess::BucketAccess(BucketCatalog* catalog, + const std::tuple<NamespaceString, BucketMetadata>& key, + ExecutionStats* stats, + const Date_t& time) + : _catalog(catalog), _key(&key), _stats(stats), _time(&time), _id(Date_t(), 0) { + // precompute the hash outside the lock, since it's expensive + auto hasher = _catalog->_bucketIds.hash_function(); + auto hash = hasher(*_key); + + auto lk = _catalog->_lock(); + { + auto it = 
_catalog->_bucketIds.find(*_key, hash); + if (it == _catalog->_bucketIds.end()) { + // A bucket for this namespace and metadata pair does not yet exist. + it = _catalog->_bucketIds.insert({*_key, _catalog->_createNewBucketId(*_time, _stats)}) + .first; + _catalog->_nsBuckets.insert({std::get<mongo::NamespaceString>(*_key), it->second}); + _stats->numBucketsOpenedDueToMetadata.fetchAndAddRelaxed(1); + } + _id = it->second; + } + + _catalog->_markBucketNotIdle(_id); + auto it = _catalog->_buckets.find(_id); + if (it != _catalog->_buckets.end()) { + _bucket = it->second; + } else { + auto res = _catalog->_buckets.emplace(_id, std::make_shared<Bucket>()); + _bucket = res.first->second; + } + _acquire(); +} + +BucketCatalog::BucketAccess::BucketAccess(BucketCatalog* catalog, const BucketId& bucketId) + : _catalog(catalog), _id(Date_t(), 0) { + auto lk = _catalog->_lock(); + + auto it = _catalog->_buckets.find(bucketId); + if (it != _catalog->_buckets.end()) { + _bucket = it->second; + _acquire(); + } +} + +BucketCatalog::BucketAccess::~BucketAccess() { + if (isLocked()) { + release(); + } +} + +void BucketCatalog::BucketAccess::_acquire() { + invariant(_bucket); + _guard = stdx::unique_lock<Mutex>(_bucket->lock); +} + +void BucketCatalog::BucketAccess::release() { + invariant(_guard.owns_lock()); + _guard.unlock(); + _bucket.reset(); +} + +bool BucketCatalog::BucketAccess::isLocked() const { + return _bucket && _guard.owns_lock(); +} + +BucketCatalog::Bucket* BucketCatalog::BucketAccess::operator->() { + invariant(isLocked()); + return _bucket.get(); +} + +BucketCatalog::BucketAccess::operator bool() const { + return isLocked(); +} + +void BucketCatalog::BucketAccess::rollover( + const std::function<bool(BucketAccess*, std::size_t)>& isBucketFull) { + invariant(isLocked()); + invariant(_key); + invariant(_time); + + auto oldId = _id; + release(); + + // precompute the hash outside the lock, since it's expensive + auto hasher = _catalog->_bucketIds.hash_function(); + 
auto hash = hasher(*_key); + + auto lk = _catalog->_lock(); + BucketIdInternal* actualId; + { + auto it = _catalog->_bucketIds.find(*_key, hash); + if (it == _catalog->_bucketIds.end()) { + // A bucket for this namespace and metadata pair does not yet exist. + it = _catalog->_bucketIds.insert({*_key, _catalog->_createNewBucketId(*_time, _stats)}) + .first; + _catalog->_nsBuckets.insert({std::get<mongo::NamespaceString>(*_key), it->second}); + _stats->numBucketsOpenedDueToMetadata.fetchAndAddRelaxed(1); + } + _id = it->second; + actualId = &it->second; + } + + _catalog->_markBucketNotIdle(_id); + auto it = _catalog->_buckets.find(_id); + if (it != _catalog->_buckets.end()) { + _bucket = it->second; + } else { + auto res = _catalog->_buckets.emplace(_id, std::make_shared<Bucket>()); + _bucket = res.first->second; + } + _acquire(); + + // recheck if full now that we've reacquired the bucket + bool newBucket = oldId != _id; // only record stats if bucket has changed, don't double-count + if (!newBucket || isBucketFull(this, hash)) { + // The bucket is full, so create a new one. + if (_bucket->numPendingCommitMeasurements == 0 && + _bucket->numCommittedMeasurements == _bucket->numMeasurements) { + // The bucket does not contain any measurements that are yet to be committed, so we can + // remove it now. Otherwise, we must keep the bucket around until it is committed. 
+ _catalog->_memoryUsage.fetchAndSubtract(_bucket->memoryUsage); + + release(); + + _catalog->_buckets.erase(_id); + _catalog->_nsBuckets.erase({std::get<NamespaceString>(*_key), _id}); + } else { + _bucket->full = true; + release(); + } + + *actualId = _catalog->_createNewBucketId(*_time, _stats); + _id = *actualId; + _catalog->_nsBuckets.insert({std::get<mongo::NamespaceString>(*_key), _id}); + auto res = _catalog->_buckets.emplace(_id, std::make_shared<Bucket>()); + invariant(res.second); + _bucket = res.first->second; + _acquire(); + } +} + +const BucketCatalog::BucketIdInternal& BucketCatalog::BucketAccess::id() { + invariant(_time); // id is only set appropriately if we set time in the constructor + + return _id; +} + +void BucketCatalog::BucketAccess::setTime(std::size_t hash) { + invariant(isLocked()); + invariant(_key); + invariant(_stats); + invariant(_time); + + bool isCatalogLocked = hash != 0; + stdx::unique_lock<Mutex> lk; + if (!isCatalogLocked) { + // precompute the hash outside the lock, since it's expensive + auto hasher = _catalog->_bucketIds.hash_function(); + hash = hasher(*_key); + + release(); + lk = _catalog->_lock(); + } + + auto it = _catalog->_bucketIds.find(*_key, hash); + if (it == _catalog->_bucketIds.end()) { + // someone else got rid of our bucket between releasing it and reacquiring the catalog lock, + // just generate a new bucket and lock it + it = _catalog->_bucketIds.insert({*_key, _catalog->_createNewBucketId(*_time, _stats)}) + .first; + _catalog->_nsBuckets.insert({std::get<mongo::NamespaceString>(*_key), it->second}); + _stats->numBucketsOpenedDueToMetadata.fetchAndAddRelaxed(1); + } else { + it->second.setTime(*_time); + } + + if (!isCatalogLocked) { + _id = it->second; + _bucket = _catalog->_buckets[_id]; + _acquire(); + } +} + void BucketCatalog::MinMax::update(const BSONObj& doc, boost::optional<StringData> metaField, const StringData::ComparatorInterface* stringComparator, @@ -678,6 +903,10 @@ bool 
BucketCatalog::BucketId::operator==(const BucketId& other) const { return _num == other._num; } +bool BucketCatalog::BucketId::operator!=(const BucketId& other) const { + return _num != other._num; +} + bool BucketCatalog::BucketId::operator<(const BucketId& other) const { return _num < other._num; } @@ -709,7 +938,8 @@ public: BSONObj generateSection(OperationContext* opCtx, const BSONElement&) const override { const auto& bucketCatalog = BucketCatalog::get(opCtx); - stdx::lock_guard lk(bucketCatalog._mutex); + auto lk = bucketCatalog._lock(); + stdx::lock_guard eslk{bucketCatalog._executionStatsLock}; if (bucketCatalog._executionStats.empty()) { return {}; @@ -718,8 +948,9 @@ public: BSONObjBuilder builder; builder.appendNumber("numBuckets", bucketCatalog._buckets.size()); builder.appendNumber("numOpenBuckets", bucketCatalog._bucketIds.size()); - builder.appendNumber("numIdleBuckets", bucketCatalog._idleBuckets.size()); - builder.appendNumber("memoryUsage", static_cast<long long>(bucketCatalog._memoryUsage)); + builder.appendNumber("numIdleBuckets", bucketCatalog._numberOfIdleBuckets()); + builder.appendNumber("memoryUsage", + static_cast<long long>(bucketCatalog._memoryUsage.load())); return builder.obj(); } } bucketCatalogServerStatus; diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h index 87687161b79..09164a166a9 100644 --- a/src/mongo/db/timeseries/bucket_catalog.h +++ b/src/mongo/db/timeseries/bucket_catalog.h @@ -51,6 +51,7 @@ public: const OID* operator->() const; bool operator==(const BucketId& other) const; + bool operator!=(const BucketId& other) const; bool operator<(const BucketId& other) const; template <typename H> @@ -227,6 +228,9 @@ private: }; struct Bucket { + // Access to the bucket is controlled by this lock + Mutex lock; + // The namespace that this bucket is used for. 
NamespaceString ns; @@ -297,19 +301,20 @@ private: }; struct ExecutionStats { - long long numBucketInserts = 0; - long long numBucketUpdates = 0; - long long numBucketsOpenedDueToMetadata = 0; - long long numBucketsClosedDueToCount = 0; - long long numBucketsClosedDueToSize = 0; - long long numBucketsClosedDueToTimeForward = 0; - long long numBucketsClosedDueToTimeBackward = 0; - long long numBucketsClosedDueToMemoryThreshold = 0; - long long numCommits = 0; - long long numWaits = 0; - long long numMeasurementsCommitted = 0; + AtomicWord<long long> numBucketInserts; + AtomicWord<long long> numBucketUpdates; + AtomicWord<long long> numBucketsOpenedDueToMetadata; + AtomicWord<long long> numBucketsClosedDueToCount; + AtomicWord<long long> numBucketsClosedDueToSize; + AtomicWord<long long> numBucketsClosedDueToTimeForward; + AtomicWord<long long> numBucketsClosedDueToTimeBackward; + AtomicWord<long long> numBucketsClosedDueToMemoryThreshold; + AtomicWord<long long> numCommits; + AtomicWord<long long> numWaits; + AtomicWord<long long> numMeasurementsCommitted; }; + // a wrapper around the numerical id to allow timestamp manipulations class BucketIdInternal : public BucketId { public: static BucketIdInternal min(); @@ -320,11 +325,68 @@ private: void setTime(const Date_t& time); }; + /** + * Helper class to handle all the locking necessary to lookup and lock a bucket for use. This + * is intended primarily for using a single bucket, including replacing it when it becomes full. + * If the usage pattern iterates over several buckets, you will instead want to use raw access + * using the different mutexes with the locking semantics described below. 
+ */ + class BucketAccess { + public: + BucketAccess() = delete; + BucketAccess(BucketCatalog* catalog, + const std::tuple<NamespaceString, BucketMetadata>& key, + ExecutionStats* stats, + const Date_t& time); + BucketAccess(BucketCatalog* catalog, const BucketId& bucketId); + ~BucketAccess(); + + bool isLocked() const; + Bucket* operator->(); + operator bool() const; + + // release the bucket lock, typically in order to reacquire the catalog lock + void release(); + + /** + * Close the existing, full bucket and open a new one for the same metadata. + * Parameter is a function which should check that the bucket is indeed still full after + * reacquiring the necessary locks. The first parameter will give the function access to + * this BucketAccess instance, with the bucket locked. The second parameter may provide + * the precomputed key hash for the _bucketIds map (or 0, if it hasn't been computed yet). + */ + void rollover(const std::function<bool(BucketAccess*, std::size_t)>& isBucketFull); + + // retrieve the (safely cached) id of the bucket + const BucketIdInternal& id(); + + /** + * Adjust the time associated with the bucket (id) if it hasn't been committed yet. The + * hash parameter is the precomputed hash corresponding to the bucket's key for lookup in + * the _bucketIds map (to save computation). + */ + void setTime(std::size_t hash); + + private: + void _acquire(); + + BucketCatalog* _catalog; + const std::tuple<NamespaceString, BucketMetadata>* _key; + ExecutionStats* _stats; + const Date_t* _time; + + BucketIdInternal _id; + std::shared_ptr<Bucket> _bucket; + stdx::unique_lock<Mutex> _guard; + }; + + class ServerStatus; + using NsBuckets = std::set<std::tuple<NamespaceString, BucketId>>; using IdleBuckets = std::set<BucketId>; + stdx::unique_lock<Mutex> _lock() const; + /** * Removes the given bucket from the bucket catalog's internal data structures. 
*/ @@ -332,15 +394,42 @@ private: boost::optional<NsBuckets::iterator> nsBucketsIt = boost::none, boost::optional<IdleBuckets::iterator> idleBucketsIt = boost::none); + void _markBucketIdle(const BucketId& bucketId); + void _markBucketNotIdle(const BucketId& bucketId); + void _markBucketNotIdle(const IdleBuckets::iterator& it); + /** * Expires idle buckets until the bucket catalog's memory usage is below the expiry threshold. */ void _expireIdleBuckets(ExecutionStats* stats); - mutable Mutex _mutex = MONGO_MAKE_LATCH("BucketCatalog"); + std::size_t _numberOfIdleBuckets() const; + + /** + * Creates a new (internal) bucket ID to identify a bucket. + */ + BucketIdInternal _createNewBucketId(const Date_t& time, ExecutionStats* stats); + + std::shared_ptr<ExecutionStats> _getExecutionStats(const NamespaceString& ns); + const std::shared_ptr<ExecutionStats> _getExecutionStats(const NamespaceString& ns) const; + + /** + * You must hold _mutex when accessing _buckets, _bucketIds, or _nsBuckets. While, holding a + * lock on _mutex, you can take a lock on an individual bucket, then release _mutex. Any + * iterators on the protected structures should be considered invalid once the lock is released. + * Any subsequent access to the structures requires relocking _mutex. You must *not* be holding + * a lock on a bucket when you attempt to acquire the lock on _mutex, as this can result in + * deadlock. + * + * Typically, if you want to acquire a bucket, you should use the BucketAccess RAII + * class to do so, as it will take care of most of this logic for you. Only use the _mutex + * directly for more global maintenance where you want to take the lock once and interact with + * multiple buckets atomically. + */ + mutable Mutex _mutex = MONGO_MAKE_LATCH("BucketCatalog::_mutex"); // All buckets currently in the catalog, including buckets which are full but not yet committed. 
- stdx::unordered_map<BucketId, Bucket> _buckets; + stdx::unordered_map<BucketId, std::shared_ptr<Bucket>> _buckets; // The _id of the current bucket for each namespace and metadata pair. stdx::unordered_map<std::tuple<NamespaceString, BucketMetadata>, BucketIdInternal> _bucketIds; @@ -348,16 +437,29 @@ private: // All buckets ordered by their namespaces. NsBuckets _nsBuckets; + // This mutex protects access to _idleBuckets + mutable Mutex _idleBucketsLock = MONGO_MAKE_LATCH("BucketCatalog::_idleBucketsLock"); + // Buckets that do not have any writers. IdleBuckets _idleBuckets; + /** + * This mutex protects access to the _executionStats map. Once you complete your lookup, you + * can keep the shared_ptr to an individual namespace's stats object and release the lock. The + * object itself is thread-safe (atomics). + */ + mutable Mutex _executionStatsLock = MONGO_MAKE_LATCH("BucketCatalog::_executionStatsLock"); + // Per-collection execution stats. - stdx::unordered_map<NamespaceString, ExecutionStats> _executionStats; + stdx::unordered_map<NamespaceString, std::shared_ptr<ExecutionStats>> _executionStats; + + // A placeholder to be returned in case a namespace has no allocated statistics object + static const std::shared_ptr<ExecutionStats> kEmptyStats; // Counter for buckets created by the bucket catalog. uint64_t _bucketNum = 0; // Approximate memory usage of the bucket catalog. - uint64_t _memoryUsage = 0; + AtomicWord<uint64_t> _memoryUsage; }; } // namespace mongo |