author    Dan Larkin-York <dan.larkin-york@mongodb.com>  2021-02-12 21:02:32 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-02-20 00:03:52 +0000
commit    dbf6cdde5434c5e0fe7d6435fbe74b5da53595d4 (patch)
tree      595b1e0440bfd9ab5dc23d7cc41324a8e743c3b6 /src/mongo
parent    9819cd8260e0878324cf729cdc48110b84008ef4 (diff)
download  mongo-dbf6cdde5434c5e0fe7d6435fbe74b5da53595d4.tar.gz
SERVER-54516 Introduce finer-grained locking in BucketCatalog and tighten critical sections
Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog.cpp | 479
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog.h   | 132
2 files changed, 472 insertions, 139 deletions
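
The heart of this change is a move from one coarse BucketCatalog::_mutex around every operation to a two-level scheme: the catalog mutex now guards only the lookup structures (_buckets, _bucketIds, _nsBuckets), while each Bucket carries its own lock, and a new RAII helper (BucketAccess, below) handles the handoff. A minimal standalone sketch of that handoff, using std::mutex and illustrative names rather than MongoDB's Mutex/stdx wrappers:

#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

class Catalog {
public:
    struct Bucket {
        std::mutex lock;                    // guards this bucket's fields
        std::uint64_t numMeasurements = 0;
    };

    // RAII accessor modeled on the commit's BucketAccess: look the bucket up
    // under the catalog-wide mutex, lock the bucket itself, then drop the
    // catalog mutex so other threads can reach other buckets.
    class Access {
    public:
        Access(Catalog* catalog, const std::string& key) {
            std::unique_lock<std::mutex> lk(catalog->_mutex);
            auto& slot = catalog->_buckets[key];
            if (!slot)
                slot = std::make_shared<Bucket>();
            _bucket = slot;  // shared_ptr keeps the bucket alive after unlock
            _guard = std::unique_lock<std::mutex>(_bucket->lock);
        }  // lk unlocks here; only the per-bucket lock is still held

        Bucket* operator->() { return _bucket.get(); }

    private:
        std::shared_ptr<Bucket> _bucket;
        std::unique_lock<std::mutex> _guard;
    };

private:
    std::mutex _mutex;  // guards _buckets; never held across long bucket work
    std::unordered_map<std::string, std::shared_ptr<Bucket>> _buckets;
};

int main() {
    Catalog catalog;
    Catalog::Access bucket(&catalog, "db.coll|{sensor: 1}");
    bucket->numMeasurements++;  // mutated under the per-bucket lock only
    return 0;
}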
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index f4d21d6af7d..5da44c9d501 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -50,6 +50,9 @@ uint8_t numDigits(uint32_t num) {
}
} // namespace
+const std::shared_ptr<BucketCatalog::ExecutionStats> BucketCatalog::kEmptyStats{
+ std::make_shared<BucketCatalog::ExecutionStats>()};
+
BSONObj BucketCatalog::CommitData::toBSON() const {
return BSON("docs" << docs << "bucketMin" << bucketMin << "bucketMax" << bucketMax
<< "numCommittedMeasurements" << int(numCommittedMeasurements)
@@ -67,20 +70,17 @@ BucketCatalog& BucketCatalog::get(OperationContext* opCtx) {
}
BSONObj BucketCatalog::getMetadata(const BucketId& bucketId) const {
- stdx::lock_guard lk(_mutex);
- auto it = _buckets.find(bucketId);
- if (it == _buckets.cend()) {
+ BucketAccess bucket{const_cast<BucketCatalog*>(this), bucketId};
+ if (!bucket) {
return {};
}
- const auto& bucket = it->second;
- return bucket.metadata.metadata;
+
+ return bucket->metadata.metadata;
}
StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert(OperationContext* opCtx,
const NamespaceString& ns,
const BSONObj& doc) {
- stdx::lock_guard lk(_mutex);
-
auto viewCatalog = DatabaseHolder::get(opCtx)->getViewCatalog(opCtx, ns.db());
invariant(viewCatalog);
auto viewDef = viewCatalog->lookup(opCtx, ns.ns());
@@ -97,7 +97,8 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert(OperationContext*
}
auto key = std::make_tuple(ns, BucketMetadata{metadata.obj(), viewDef});
- auto& stats = _executionStats[ns];
+ auto stats = _getExecutionStats(ns);
+ invariant(stats);
auto timeElem = doc[options.getTimeField()];
if (!timeElem || BSONType::Date != timeElem.type()) {
@@ -107,21 +108,8 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert(OperationContext*
}
auto time = timeElem.Date();
- auto createNewBucketId = [&] {
- _expireIdleBuckets(&stats);
- return BucketIdInternal{time, ++_bucketNum};
- };
-
- auto it = _bucketIds.find(key);
- if (it == _bucketIds.end()) {
- // A bucket for this namespace and metadata pair does not yet exist.
- it = _bucketIds.insert({std::move(key), createNewBucketId()}).first;
- _nsBuckets.insert({ns, it->second});
- stats.numBucketsOpenedDueToMetadata++;
- }
- _idleBuckets.erase(it->second);
- auto bucket = &_buckets[it->second];
+ BucketAccess bucket{this, key, stats.get(), time};
StringSet newFieldNamesToBeInserted;
uint32_t newFieldNamesSize = 0;
@@ -132,47 +120,35 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert(OperationContext*
&newFieldNamesSize,
&sizeToBeAdded);
- auto isBucketFull = [&]() {
- if (bucket->numMeasurements == static_cast<std::uint64_t>(gTimeseriesBucketMaxCount)) {
- stats.numBucketsClosedDueToCount++;
+ auto isBucketFull = [&](BucketAccess* bucket, std::size_t hash) -> bool {
+ if ((*bucket)->numMeasurements == static_cast<std::uint64_t>(gTimeseriesBucketMaxCount)) {
+ stats->numBucketsClosedDueToCount.fetchAndAddRelaxed(1);
return true;
}
- if (bucket->size + sizeToBeAdded > static_cast<std::uint64_t>(gTimeseriesBucketMaxSize)) {
- stats.numBucketsClosedDueToSize++;
+ if ((*bucket)->size + sizeToBeAdded >
+ static_cast<std::uint64_t>(gTimeseriesBucketMaxSize)) {
+ stats->numBucketsClosedDueToSize.fetchAndAddRelaxed(1);
return true;
}
- auto bucketTime = it->second.getTime();
+ auto bucketTime = (*bucket).id().getTime();
if (time - bucketTime >= kTimeseriesBucketMaxTimeRange) {
- stats.numBucketsClosedDueToTimeForward++;
+ stats->numBucketsClosedDueToTimeForward.fetchAndAddRelaxed(1);
return true;
}
if (time < bucketTime) {
- if (!bucket->hasBeenCommitted() &&
- bucket->latestTime - time < kTimeseriesBucketMaxTimeRange) {
- it->second.setTime(time);
+ if (!(*bucket)->hasBeenCommitted() &&
+ (*bucket)->latestTime - time < kTimeseriesBucketMaxTimeRange) {
+ (*bucket).setTime(hash);
} else {
- stats.numBucketsClosedDueToTimeBackward++;
+ stats->numBucketsClosedDueToTimeBackward.fetchAndAddRelaxed(1);
return true;
}
}
return false;
};
- if (!bucket->ns.isEmpty() && isBucketFull()) {
- // The bucket is full, so create a new one.
- if (bucket->numPendingCommitMeasurements == 0 &&
- bucket->numCommittedMeasurements == bucket->numMeasurements) {
- // The bucket does not contain any measurements that are yet to be committed, so we can
- // remove it now. Otherwise, we must keep the bucket around until it is committed.
- _memoryUsage -= bucket->memoryUsage;
- _buckets.erase(it->second);
- _nsBuckets.erase({std::get<NamespaceString>(it->first), it->second});
- } else {
- bucket->full = true;
- }
- it->second = createNewBucketId();
- _nsBuckets.insert({ns, it->second});
- bucket = &_buckets[it->second];
+ if (!bucket->ns.isEmpty() && isBucketFull(&bucket, 0)) {
+ bucket.rollover(isBucketFull);
bucket->calculateBucketFieldsAndSizeChange(doc,
options.getMetaField(),
&newFieldNamesToBeInserted,
@@ -202,7 +178,7 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert(OperationContext*
if (bucket->ns.isEmpty()) {
// The namespace and metadata only need to be set if this bucket was newly created.
bucket->ns = ns;
- bucket->metadata = std::get<BucketMetadata>(it->first);
+ bucket->metadata = std::get<BucketMetadata>(key);
// The namespace is stored three times: the bucket itself, _bucketIds, and _nsBuckets.
// The metadata is stored two times: the bucket itself and _bucketIds.
@@ -211,59 +187,57 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert(OperationContext*
bucket->memoryUsage += (ns.size() * 3) + (bucket->metadata.metadata.objsize() * 2) +
((sizeof(BucketId) + sizeof(OID)) * 4);
} else {
- _memoryUsage -= bucket->memoryUsage;
+ _memoryUsage.fetchAndSubtract(bucket->memoryUsage);
}
bucket->memoryUsage -= bucket->min.getMemoryUsage() + bucket->max.getMemoryUsage();
bucket->min.update(doc, options.getMetaField(), viewDef->defaultCollator(), std::less<>());
bucket->max.update(doc, options.getMetaField(), viewDef->defaultCollator(), std::greater<>());
bucket->memoryUsage +=
newFieldNamesSize + bucket->min.getMemoryUsage() + bucket->max.getMemoryUsage();
- _memoryUsage += bucket->memoryUsage;
+ _memoryUsage.fetchAndAdd(bucket->memoryUsage);
- return {InsertResult{it->second, std::move(commitInfoFuture)}};
+ return {InsertResult{bucket.id(), std::move(commitInfoFuture)}};
}
BucketCatalog::CommitData BucketCatalog::commit(const BucketId& bucketId,
boost::optional<CommitInfo> previousCommitInfo) {
- stdx::lock_guard lk(_mutex);
- auto it = _buckets.find(bucketId);
- invariant(it != _buckets.end());
- auto& bucket = it->second;
+ BucketAccess bucket{this, bucketId};
+ invariant(bucket);
// The only case in which previousCommitInfo should not be provided is the first time a given
// committer calls this function.
- invariant(!previousCommitInfo || bucket.hasBeenCommitted());
+ invariant(!previousCommitInfo || bucket->hasBeenCommitted());
- auto newFieldNamesToBeInserted = bucket.newFieldNamesToBeInserted;
- bucket.fieldNames.merge(bucket.newFieldNamesToBeInserted);
- bucket.newFieldNamesToBeInserted.clear();
+ auto newFieldNamesToBeInserted = bucket->newFieldNamesToBeInserted;
+ bucket->fieldNames.merge(bucket->newFieldNamesToBeInserted);
+ bucket->newFieldNamesToBeInserted.clear();
std::vector<BSONObj> measurements;
- bucket.measurementsToBeInserted.swap(measurements);
+ bucket->measurementsToBeInserted.swap(measurements);
- auto& stats = _executionStats[bucket.ns];
- stats.numMeasurementsCommitted += measurements.size();
+ auto stats = _getExecutionStats(bucket->ns);
+ stats->numMeasurementsCommitted.fetchAndAddRelaxed(measurements.size());
// Inform waiters that their measurements have been committed.
- for (uint32_t i = 0; i < bucket.numPendingCommitMeasurements; i++) {
- if (auto& promise = bucket.promises.front()) {
+ for (uint32_t i = 0; i < bucket->numPendingCommitMeasurements; i++) {
+ if (auto& promise = bucket->promises.front()) {
promise->emplaceValue(*previousCommitInfo);
}
- bucket.promises.pop();
+ bucket->promises.pop();
}
- if (bucket.numPendingCommitMeasurements) {
- stats.numWaits += bucket.numPendingCommitMeasurements - 1;
+ if (bucket->numPendingCommitMeasurements) {
+ stats->numWaits.fetchAndAddRelaxed(bucket->numPendingCommitMeasurements - 1);
}
- bucket.numWriters -= bucket.numPendingCommitMeasurements;
- bucket.numCommittedMeasurements +=
- std::exchange(bucket.numPendingCommitMeasurements, measurements.size());
+ bucket->numWriters -= bucket->numPendingCommitMeasurements;
+ bucket->numCommittedMeasurements +=
+ std::exchange(bucket->numPendingCommitMeasurements, measurements.size());
auto [bucketMin, bucketMax] = [&bucket]() -> std::pair<BSONObj, BSONObj> {
- if (bucket.numCommittedMeasurements == 0) {
- return {bucket.min.toBSON(), bucket.max.toBSON()};
+ if (bucket->numCommittedMeasurements == 0) {
+ return {bucket->min.toBSON(), bucket->max.toBSON()};
} else {
- return {bucket.min.getUpdates(), bucket.max.getUpdates()};
+ return {bucket->min.getUpdates(), bucket->max.getUpdates()};
}
}();
@@ -271,25 +245,32 @@ BucketCatalog::CommitData BucketCatalog::commit(const BucketId& bucketId,
CommitData data = {std::move(measurements),
std::move(bucketMin),
std::move(bucketMax),
- bucket.numCommittedMeasurements,
+ bucket->numCommittedMeasurements,
std::move(newFieldNamesToBeInserted)};
if (allCommitted) {
- if (bucket.full) {
+ if (bucket->full) {
// Everything in the bucket has been committed, and nothing more will be added since the
// bucket is full. Thus, we can remove it.
- _memoryUsage -= bucket.memoryUsage;
- _nsBuckets.erase({std::move(it->second.ns), bucketId});
+ _memoryUsage.fetchAndSubtract(bucket->memoryUsage);
+
+ invariant(bucket->promises.empty());
+ bucket.release();
+ auto lk = _lock();
+
+ auto it = _buckets.find(bucketId);
+ invariant(it != _buckets.end());
+ _nsBuckets.erase({std::move(it->second->ns), bucketId});
_buckets.erase(it);
- } else if (bucket.numWriters == 0) {
- _idleBuckets.insert(bucketId);
+ } else if (bucket->numWriters == 0) {
+ _markBucketIdle(bucketId);
}
} else {
- stats.numCommits++;
- if (bucket.numCommittedMeasurements == 0) {
- stats.numBucketInserts++;
+ stats->numCommits.fetchAndAddRelaxed(1);
+ if (bucket->numCommittedMeasurements == 0) {
+ stats->numBucketInserts.fetchAndAddRelaxed(1);
} else {
- stats.numBucketUpdates++;
+ stats->numBucketUpdates.fetchAndAddRelaxed(1);
}
}
@@ -297,28 +278,27 @@ BucketCatalog::CommitData BucketCatalog::commit(const BucketId& bucketId,
}
void BucketCatalog::clear(const BucketId& bucketId) {
- stdx::lock_guard lk(_mutex);
-
- auto it = _buckets.find(bucketId);
- if (it == _buckets.end()) {
+ BucketAccess bucket{this, bucketId};
+ if (!bucket) {
return;
}
- auto& bucket = it->second;
- while (!bucket.promises.empty()) {
- if (auto& promise = bucket.promises.front()) {
+ while (!bucket->promises.empty()) {
+ if (auto& promise = bucket->promises.front()) {
promise->setError({ErrorCodes::TimeseriesBucketCleared,
str::stream() << "Time-series bucket " << *bucketId << " for "
- << bucket.ns << " was cleared"});
+ << bucket->ns << " was cleared"});
}
- bucket.promises.pop();
+ bucket->promises.pop();
}
+ bucket.release();
+ auto lk = _lock();
_removeBucket(bucketId);
}
void BucketCatalog::clear(const NamespaceString& ns) {
- stdx::lock_guard lk(_mutex);
+ auto lk = _lock();
auto shouldClear = [&ns](const NamespaceString& bucketNs) {
return ns.coll().empty() ? ns.db() == bucketNs.db() : ns == bucketNs;
@@ -327,7 +307,10 @@ void BucketCatalog::clear(const NamespaceString& ns) {
for (auto it = _nsBuckets.lower_bound({ns, BucketIdInternal::min()});
it != _nsBuckets.end() && shouldClear(std::get<NamespaceString>(*it));) {
auto nextIt = std::next(it);
- _executionStats.erase(std::get<NamespaceString>(*it));
+ {
+ stdx::lock_guard statsLock{_executionStatsLock};
+ _executionStats.erase(std::get<NamespaceString>(*it));
+ }
_removeBucket(std::get<BucketId>(*it), it);
it = nextIt;
}
@@ -338,60 +321,119 @@ void BucketCatalog::clear(StringData dbName) {
}
void BucketCatalog::appendExecutionStats(const NamespaceString& ns, BSONObjBuilder* builder) const {
- stdx::lock_guard lk(_mutex);
-
- auto it = _executionStats.find(ns);
- const auto& stats = it == _executionStats.end() ? ExecutionStats() : it->second;
-
- builder->appendNumber("numBucketInserts", stats.numBucketInserts);
- builder->appendNumber("numBucketUpdates", stats.numBucketUpdates);
- builder->appendNumber("numBucketsOpenedDueToMetadata", stats.numBucketsOpenedDueToMetadata);
- builder->appendNumber("numBucketsClosedDueToCount", stats.numBucketsClosedDueToCount);
- builder->appendNumber("numBucketsClosedDueToSize", stats.numBucketsClosedDueToSize);
+ const auto stats = _getExecutionStats(ns);
+
+ builder->appendNumber("numBucketInserts", stats->numBucketInserts.load());
+ builder->appendNumber("numBucketUpdates", stats->numBucketUpdates.load());
+ builder->appendNumber("numBucketsOpenedDueToMetadata",
+ stats->numBucketsOpenedDueToMetadata.load());
+ builder->appendNumber("numBucketsClosedDueToCount", stats->numBucketsClosedDueToCount.load());
+ builder->appendNumber("numBucketsClosedDueToSize", stats->numBucketsClosedDueToSize.load());
builder->appendNumber("numBucketsClosedDueToTimeForward",
- stats.numBucketsClosedDueToTimeForward);
+ stats->numBucketsClosedDueToTimeForward.load());
builder->appendNumber("numBucketsClosedDueToTimeBackward",
- stats.numBucketsClosedDueToTimeBackward);
+ stats->numBucketsClosedDueToTimeBackward.load());
builder->appendNumber("numBucketsClosedDueToMemoryThreshold",
- stats.numBucketsClosedDueToMemoryThreshold);
- builder->appendNumber("numCommits", stats.numCommits);
- builder->appendNumber("numWaits", stats.numWaits);
- builder->appendNumber("numMeasurementsCommitted", stats.numMeasurementsCommitted);
- if (stats.numCommits) {
- builder->appendNumber("avgNumMeasurementsPerCommit",
- stats.numMeasurementsCommitted / stats.numCommits);
+ stats->numBucketsClosedDueToMemoryThreshold.load());
+ auto commits = stats->numCommits.load();
+ builder->appendNumber("numCommits", commits);
+ builder->appendNumber("numWaits", stats->numWaits.load());
+ auto measurementsCommitted = stats->numMeasurementsCommitted.load();
+ builder->appendNumber("numMeasurementsCommitted", measurementsCommitted);
+ if (commits) {
+ builder->appendNumber("avgNumMeasurementsPerCommit", measurementsCommitted / commits);
}
}
+stdx::unique_lock<Mutex> BucketCatalog::_lock() const {
+ return stdx::unique_lock<Mutex>{_mutex};
+}
+
void BucketCatalog::_removeBucket(const BucketId& bucketId,
boost::optional<NsBuckets::iterator> nsBucketsIt,
boost::optional<IdleBuckets::iterator> idleBucketsIt) {
auto it = _buckets.find(bucketId);
- _memoryUsage -= it->second.memoryUsage;
+ {
+ // take a lock on the bucket so we guarantee no one else is accessing it;
+ // we can release it right away since no one else can take it again without taking the
+ // catalog lock, which we also hold
+ stdx::lock_guard<Mutex> lk{it->second->lock};
+ _memoryUsage.fetchAndSubtract(it->second->memoryUsage);
+ }
if (nsBucketsIt) {
_nsBuckets.erase(*nsBucketsIt);
} else {
- _nsBuckets.erase({it->second.ns, it->first});
+ _nsBuckets.erase({it->second->ns, it->first});
}
if (idleBucketsIt) {
- _idleBuckets.erase(*idleBucketsIt);
+ _markBucketNotIdle(*idleBucketsIt);
} else {
- _idleBuckets.erase(it->first);
+ _markBucketNotIdle(it->first);
}
- _bucketIds.erase({std::move(it->second.ns), std::move(it->second.metadata)});
+ _bucketIds.erase({std::move(it->second->ns), std::move(it->second->metadata)});
_buckets.erase(it);
}
+void BucketCatalog::_markBucketIdle(const BucketId& bucketId) {
+ stdx::lock_guard lk{_idleBucketsLock};
+ _idleBuckets.insert(bucketId);
+}
+
+void BucketCatalog::_markBucketNotIdle(const BucketId& bucketId) {
+ stdx::lock_guard lk{_idleBucketsLock};
+ _idleBuckets.erase(bucketId);
+}
+
+void BucketCatalog::_markBucketNotIdle(const IdleBuckets::iterator& it) {
+ _idleBuckets.erase(it);
+}
+
void BucketCatalog::_expireIdleBuckets(ExecutionStats* stats) {
+ stdx::lock_guard lk{_idleBucketsLock};
while (!_idleBuckets.empty() &&
- _memoryUsage >
+ _memoryUsage.load() >
static_cast<std::uint64_t>(gTimeseriesIdleBucketExpiryMemoryUsageThreshold)) {
_removeBucket(*_idleBuckets.begin(), boost::none, _idleBuckets.begin());
- stats->numBucketsClosedDueToMemoryThreshold++;
+ stats->numBucketsClosedDueToMemoryThreshold.fetchAndAddRelaxed(1);
+ }
+}
+
+std::size_t BucketCatalog::_numberOfIdleBuckets() const {
+ stdx::lock_guard lk{_idleBucketsLock};
+ return _idleBuckets.size();
+}
+
+BucketCatalog::BucketIdInternal BucketCatalog::_createNewBucketId(const Date_t& time,
+ ExecutionStats* stats) {
+ _expireIdleBuckets(stats);
+ return BucketIdInternal{time, ++_bucketNum};
+}
+
+std::shared_ptr<BucketCatalog::ExecutionStats> BucketCatalog::_getExecutionStats(
+ const NamespaceString& ns) {
+ stdx::lock_guard lock(_executionStatsLock);
+
+ auto it = _executionStats.find(ns);
+ if (it != _executionStats.end()) {
+ return it->second;
}
+
+ auto res = _executionStats.emplace(ns, std::make_shared<ExecutionStats>());
+ return res.first->second;
+}
+
+const std::shared_ptr<BucketCatalog::ExecutionStats> BucketCatalog::_getExecutionStats(
+ const NamespaceString& ns) const {
+ stdx::lock_guard lock(_executionStatsLock);
+
+ auto it = _executionStats.find(ns);
+ if (it != _executionStats.end()) {
+ return it->second;
+ }
+ return kEmptyStats;
}
bool BucketCatalog::BucketMetadata::operator<(const BucketMetadata& other) const {
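
The two _getExecutionStats overloads above follow a common pattern for shared stats registries: the map stores shared_ptr so a caller can keep its stats object after _executionStatsLock is dropped, and the const overload returns the static kEmptyStats placeholder rather than mutating the map. A hedged sketch of the same shape with stand-in types:

#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

struct Stats {};  // the real struct holds the atomic counters

class StatsRegistry {
public:
    // Writers get (or lazily create) the per-namespace stats object; the
    // returned shared_ptr stays valid after _lock is released.
    std::shared_ptr<Stats> get(const std::string& ns) {
        std::lock_guard<std::mutex> lk(_lock);
        auto& slot = _stats[ns];  // inserts an empty slot if missing
        if (!slot)
            slot = std::make_shared<Stats>();
        return slot;
    }

    // The const path must not mutate the map, so unknown namespaces get a
    // shared all-zero placeholder, mirroring kEmptyStats above.
    std::shared_ptr<const Stats> get(const std::string& ns) const {
        std::lock_guard<std::mutex> lk(_lock);
        auto it = _stats.find(ns);
        return it != _stats.end() ? std::shared_ptr<const Stats>(it->second)
                                  : kEmpty;
    }

private:
    static const std::shared_ptr<const Stats> kEmpty;
    mutable std::mutex _lock;
    std::unordered_map<std::string, std::shared_ptr<Stats>> _stats;
};

const std::shared_ptr<const Stats> StatsRegistry::kEmpty =
    std::make_shared<Stats>();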
@@ -441,6 +483,189 @@ bool BucketCatalog::Bucket::hasBeenCommitted() const {
return numCommittedMeasurements != 0 || numPendingCommitMeasurements != 0;
}
+BucketCatalog::BucketAccess::BucketAccess(BucketCatalog* catalog,
+ const std::tuple<NamespaceString, BucketMetadata>& key,
+ ExecutionStats* stats,
+ const Date_t& time)
+ : _catalog(catalog), _key(&key), _stats(stats), _time(&time), _id(Date_t(), 0) {
+ // precompute the hash outside the lock, since it's expensive
+ auto hasher = _catalog->_bucketIds.hash_function();
+ auto hash = hasher(*_key);
+
+ auto lk = _catalog->_lock();
+ {
+ auto it = _catalog->_bucketIds.find(*_key, hash);
+ if (it == _catalog->_bucketIds.end()) {
+ // A bucket for this namespace and metadata pair does not yet exist.
+ it = _catalog->_bucketIds.insert({*_key, _catalog->_createNewBucketId(*_time, _stats)})
+ .first;
+ _catalog->_nsBuckets.insert({std::get<mongo::NamespaceString>(*_key), it->second});
+ _stats->numBucketsOpenedDueToMetadata.fetchAndAddRelaxed(1);
+ }
+ _id = it->second;
+ }
+
+ _catalog->_markBucketNotIdle(_id);
+ auto it = _catalog->_buckets.find(_id);
+ if (it != _catalog->_buckets.end()) {
+ _bucket = it->second;
+ } else {
+ auto res = _catalog->_buckets.emplace(_id, std::make_shared<Bucket>());
+ _bucket = res.first->second;
+ }
+ _acquire();
+}
+
+BucketCatalog::BucketAccess::BucketAccess(BucketCatalog* catalog, const BucketId& bucketId)
+ : _catalog(catalog), _id(Date_t(), 0) {
+ auto lk = _catalog->_lock();
+
+ auto it = _catalog->_buckets.find(bucketId);
+ if (it != _catalog->_buckets.end()) {
+ _bucket = it->second;
+ _acquire();
+ }
+}
+
+BucketCatalog::BucketAccess::~BucketAccess() {
+ if (isLocked()) {
+ release();
+ }
+}
+
+void BucketCatalog::BucketAccess::_acquire() {
+ invariant(_bucket);
+ _guard = stdx::unique_lock<Mutex>(_bucket->lock);
+}
+
+void BucketCatalog::BucketAccess::release() {
+ invariant(_guard.owns_lock());
+ _guard.unlock();
+ _bucket.reset();
+}
+
+bool BucketCatalog::BucketAccess::isLocked() const {
+ return _bucket && _guard.owns_lock();
+}
+
+BucketCatalog::Bucket* BucketCatalog::BucketAccess::operator->() {
+ invariant(isLocked());
+ return _bucket.get();
+}
+
+BucketCatalog::BucketAccess::operator bool() const {
+ return isLocked();
+}
+
+void BucketCatalog::BucketAccess::rollover(
+ const std::function<bool(BucketAccess*, std::size_t)>& isBucketFull) {
+ invariant(isLocked());
+ invariant(_key);
+ invariant(_time);
+
+ auto oldId = _id;
+ release();
+
+ // precompute the hash outside the lock, since it's expensive
+ auto hasher = _catalog->_bucketIds.hash_function();
+ auto hash = hasher(*_key);
+
+ auto lk = _catalog->_lock();
+ BucketIdInternal* actualId;
+ {
+ auto it = _catalog->_bucketIds.find(*_key, hash);
+ if (it == _catalog->_bucketIds.end()) {
+ // A bucket for this namespace and metadata pair does not yet exist.
+ it = _catalog->_bucketIds.insert({*_key, _catalog->_createNewBucketId(*_time, _stats)})
+ .first;
+ _catalog->_nsBuckets.insert({std::get<mongo::NamespaceString>(*_key), it->second});
+ _stats->numBucketsOpenedDueToMetadata.fetchAndAddRelaxed(1);
+ }
+ _id = it->second;
+ actualId = &it->second;
+ }
+
+ _catalog->_markBucketNotIdle(_id);
+ auto it = _catalog->_buckets.find(_id);
+ if (it != _catalog->_buckets.end()) {
+ _bucket = it->second;
+ } else {
+ auto res = _catalog->_buckets.emplace(_id, std::make_shared<Bucket>());
+ _bucket = res.first->second;
+ }
+ _acquire();
+
+ // recheck if full now that we've reacquired the bucket
+ bool newBucket = oldId != _id; // only record stats if bucket has changed, don't double-count
+ if (!newBucket || isBucketFull(this, hash)) {
+ // The bucket is full, so create a new one.
+ if (_bucket->numPendingCommitMeasurements == 0 &&
+ _bucket->numCommittedMeasurements == _bucket->numMeasurements) {
+ // The bucket does not contain any measurements that are yet to be committed, so we can
+ // remove it now. Otherwise, we must keep the bucket around until it is committed.
+ _catalog->_memoryUsage.fetchAndSubtract(_bucket->memoryUsage);
+
+ release();
+
+ _catalog->_buckets.erase(_id);
+ _catalog->_nsBuckets.erase({std::get<NamespaceString>(*_key), _id});
+ } else {
+ _bucket->full = true;
+ release();
+ }
+
+ *actualId = _catalog->_createNewBucketId(*_time, _stats);
+ _id = *actualId;
+ _catalog->_nsBuckets.insert({std::get<mongo::NamespaceString>(*_key), _id});
+ auto res = _catalog->_buckets.emplace(_id, std::make_shared<Bucket>());
+ invariant(res.second);
+ _bucket = res.first->second;
+ _acquire();
+ }
+}
+
+const BucketCatalog::BucketIdInternal& BucketCatalog::BucketAccess::id() {
+ invariant(_time); // id is only set appropriately if we set time in the constructor
+
+ return _id;
+}
+
+void BucketCatalog::BucketAccess::setTime(std::size_t hash) {
+ invariant(isLocked());
+ invariant(_key);
+ invariant(_stats);
+ invariant(_time);
+
+ bool isCatalogLocked = hash != 0;
+ stdx::unique_lock<Mutex> lk;
+ if (!isCatalogLocked) {
+ // precompute the hash outside the lock, since it's expensive
+ auto hasher = _catalog->_bucketIds.hash_function();
+ hash = hasher(*_key);
+
+ release();
+ lk = _catalog->_lock();
+ }
+
+ auto it = _catalog->_bucketIds.find(*_key, hash);
+ if (it == _catalog->_bucketIds.end()) {
+ // someone else got rid of our bucket between releasing it and reacquiring the catalog lock,
+ // just generate a new bucket and lock it
+ it = _catalog->_bucketIds.insert({*_key, _catalog->_createNewBucketId(*_time, _stats)})
+ .first;
+ _catalog->_nsBuckets.insert({std::get<mongo::NamespaceString>(*_key), it->second});
+ _stats->numBucketsOpenedDueToMetadata.fetchAndAddRelaxed(1);
+ } else {
+ it->second.setTime(*_time);
+ }
+
+ if (!isCatalogLocked) {
+ _id = it->second;
+ _bucket = _catalog->_buckets[_id];
+ _acquire();
+ }
+}
+
void BucketCatalog::MinMax::update(const BSONObj& doc,
boost::optional<StringData> metaField,
const StringData::ComparatorInterface* stringComparator,
@@ -678,6 +903,10 @@ bool BucketCatalog::BucketId::operator==(const BucketId& other) const {
return _num == other._num;
}
+bool BucketCatalog::BucketId::operator!=(const BucketId& other) const {
+ return _num != other._num;
+}
+
bool BucketCatalog::BucketId::operator<(const BucketId& other) const {
return _num < other._num;
}
@@ -709,7 +938,8 @@ public:
BSONObj generateSection(OperationContext* opCtx, const BSONElement&) const override {
const auto& bucketCatalog = BucketCatalog::get(opCtx);
- stdx::lock_guard lk(bucketCatalog._mutex);
+ auto lk = bucketCatalog._lock();
+ stdx::lock_guard eslk{bucketCatalog._executionStatsLock};
if (bucketCatalog._executionStats.empty()) {
return {};
@@ -718,8 +948,9 @@ public:
BSONObjBuilder builder;
builder.appendNumber("numBuckets", bucketCatalog._buckets.size());
builder.appendNumber("numOpenBuckets", bucketCatalog._bucketIds.size());
- builder.appendNumber("numIdleBuckets", bucketCatalog._idleBuckets.size());
- builder.appendNumber("memoryUsage", static_cast<long long>(bucketCatalog._memoryUsage));
+ builder.appendNumber("numIdleBuckets", bucketCatalog._numberOfIdleBuckets());
+ builder.appendNumber("memoryUsage",
+ static_cast<long long>(bucketCatalog._memoryUsage.load()));
return builder.obj();
}
} bucketCatalogServerStatus;
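
Before moving to the header, it is worth spelling out the trickiest piece of the .cpp changes: BucketAccess::rollover() must drop the bucket lock before retaking the catalog lock, and therefore must re-check fullness afterwards, since another thread may have rolled the bucket over in the window (hence the oldId != _id check). A simplified sketch of that release-and-recheck shape, with stand-in types, not the actual method:

#include <functional>
#include <memory>
#include <mutex>

struct Bucket {
    std::mutex lock;
    bool full = false;
};

struct Catalog {
    std::mutex mutex;                 // catalog-wide lock
    std::shared_ptr<Bucket> current;  // stand-in for the _bucketIds slot
};

// guard enters holding bucket->lock and leaves holding the (possibly new)
// bucket's lock; the catalog lock is only ever taken with no bucket lock held.
void rollover(Catalog* catalog,
              std::shared_ptr<Bucket>& bucket,
              std::unique_lock<std::mutex>& guard,
              const std::function<bool(Bucket*)>& isBucketFull) {
    auto old = bucket;
    guard.unlock();  // drop the bucket lock *before* taking the catalog lock

    std::lock_guard<std::mutex> lk(catalog->mutex);
    bucket = catalog->current;  // someone may have swapped the bucket already
    guard = std::unique_lock<std::mutex>(bucket->lock);

    // Re-check under the reacquired locks: roll over only if we got the same
    // full bucket back, or the replacement has itself filled up meanwhile.
    if (bucket == old || isBucketFull(bucket.get())) {
        bucket->full = true;
        guard.unlock();
        catalog->current = std::make_shared<Bucket>();
        bucket = catalog->current;
        guard = std::unique_lock<std::mutex>(bucket->lock);
    }
}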
diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h
index 87687161b79..09164a166a9 100644
--- a/src/mongo/db/timeseries/bucket_catalog.h
+++ b/src/mongo/db/timeseries/bucket_catalog.h
@@ -51,6 +51,7 @@ public:
const OID* operator->() const;
bool operator==(const BucketId& other) const;
+ bool operator!=(const BucketId& other) const;
bool operator<(const BucketId& other) const;
template <typename H>
@@ -227,6 +228,9 @@ private:
};
struct Bucket {
+ // Access to the bucket is controlled by this lock
+ Mutex lock;
+
// The namespace that this bucket is used for.
NamespaceString ns;
@@ -297,19 +301,20 @@ private:
};
struct ExecutionStats {
- long long numBucketInserts = 0;
- long long numBucketUpdates = 0;
- long long numBucketsOpenedDueToMetadata = 0;
- long long numBucketsClosedDueToCount = 0;
- long long numBucketsClosedDueToSize = 0;
- long long numBucketsClosedDueToTimeForward = 0;
- long long numBucketsClosedDueToTimeBackward = 0;
- long long numBucketsClosedDueToMemoryThreshold = 0;
- long long numCommits = 0;
- long long numWaits = 0;
- long long numMeasurementsCommitted = 0;
+ AtomicWord<long long> numBucketInserts;
+ AtomicWord<long long> numBucketUpdates;
+ AtomicWord<long long> numBucketsOpenedDueToMetadata;
+ AtomicWord<long long> numBucketsClosedDueToCount;
+ AtomicWord<long long> numBucketsClosedDueToSize;
+ AtomicWord<long long> numBucketsClosedDueToTimeForward;
+ AtomicWord<long long> numBucketsClosedDueToTimeBackward;
+ AtomicWord<long long> numBucketsClosedDueToMemoryThreshold;
+ AtomicWord<long long> numCommits;
+ AtomicWord<long long> numWaits;
+ AtomicWord<long long> numMeasurementsCommitted;
};
+ // a wrapper around the numerical id to allow timestamp manipulations
class BucketIdInternal : public BucketId {
public:
static BucketIdInternal min();
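
Converting ExecutionStats to AtomicWord counters is what lets the commit bump statistics after the catalog lock is released: each increment is a relaxed atomic add, and reporting simply loads each counter. An equivalent standalone sketch using std::atomic in place of AtomicWord:

#include <atomic>
#include <iostream>

struct ExecutionStats {
    std::atomic<long long> numCommits{0};
    std::atomic<long long> numMeasurementsCommitted{0};
};

// Hot path: relaxed increments, the analogue of fetchAndAddRelaxed. Only the
// eventual totals matter, so no ordering with other memory is required.
void recordCommit(ExecutionStats* stats, long long measurements) {
    stats->numCommits.fetch_add(1, std::memory_order_relaxed);
    stats->numMeasurementsCommitted.fetch_add(measurements,
                                              std::memory_order_relaxed);
}

int main() {
    ExecutionStats stats;
    recordCommit(&stats, 8);
    // Reporting loads each counter once and reuses the value, as the
    // appendExecutionStats() change above does for numCommits.
    auto commits = stats.numCommits.load();
    auto committed = stats.numMeasurementsCommitted.load();
    if (commits)
        std::cout << "avgNumMeasurementsPerCommit: " << committed / commits
                  << '\n';
    return 0;
}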
@@ -320,11 +325,68 @@ private:
void setTime(const Date_t& time);
};
+ /**
+ * Helper class to handle all the locking necessary to look up and lock a bucket for use. This
+ * is intended primarily for working with a single bucket, including replacing it when it
+ * becomes full. If the usage pattern iterates over several buckets, you will instead want raw
+ * access via the individual mutexes, with the locking semantics described below.
+ */
+ class BucketAccess {
+ public:
+ BucketAccess() = delete;
+ BucketAccess(BucketCatalog* catalog,
+ const std::tuple<NamespaceString, BucketMetadata>& key,
+ ExecutionStats* stats,
+ const Date_t& time);
+ BucketAccess(BucketCatalog* catalog, const BucketId& bucketId);
+ ~BucketAccess();
+
+ bool isLocked() const;
+ Bucket* operator->();
+ operator bool() const;
+
+ // release the bucket lock, typically in order to reacquire the catalog lock
+ void release();
+
+ /**
+ * Close the existing, full bucket and open a new one for the same metadata.
+ * Parameter is a function which should check that the bucket is indeed still full after
+ * reacquiring the necessary locks. The first parameter will give the function access to
+ * this BucketAccess instance, with the bucket locked. The second parameter may provide
+ * the precomputed key hash for the _bucketIds map (or 0, if it hasn't been computed yet).
+ */
+ void rollover(const std::function<bool(BucketAccess*, std::size_t)>& isBucketFull);
+
+ // retrieve the (safely cached) id of the bucket
+ const BucketIdInternal& id();
+
+ /**
+ * Adjust the time associated with the bucket (id) if it hasn't been committed yet. The
+ * hash parameter is the precomputed hash corresponding to the bucket's key for lookup in
+ * the _bucketIds map (to save computation).
+ */
+ void setTime(std::size_t hash);
+
+ private:
+ void _acquire();
+
+ BucketCatalog* _catalog;
+ const std::tuple<NamespaceString, BucketMetadata>* _key;
+ ExecutionStats* _stats;
+ const Date_t* _time;
+
+ BucketIdInternal _id;
+ std::shared_ptr<Bucket> _bucket;
+ stdx::unique_lock<Mutex> _guard;
+ };
+
class ServerStatus;
using NsBuckets = std::set<std::tuple<NamespaceString, BucketId>>;
using IdleBuckets = std::set<BucketId>;
+ stdx::unique_lock<Mutex> _lock() const;
+
/**
* Removes the given bucket from the bucket catalog's internal data structures.
*/
@@ -332,15 +394,42 @@ private:
boost::optional<NsBuckets::iterator> nsBucketsIt = boost::none,
boost::optional<IdleBuckets::iterator> idleBucketsIt = boost::none);
+ void _markBucketIdle(const BucketId& bucketId);
+ void _markBucketNotIdle(const BucketId& bucketId);
+ void _markBucketNotIdle(const IdleBuckets::iterator& it);
+
/**
* Expires idle buckets until the bucket catalog's memory usage is below the expiry threshold.
*/
void _expireIdleBuckets(ExecutionStats* stats);
- mutable Mutex _mutex = MONGO_MAKE_LATCH("BucketCatalog");
+ std::size_t _numberOfIdleBuckets() const;
+
+ /**
+ * Creates a new (internal) bucket ID to identify a bucket.
+ */
+ BucketIdInternal _createNewBucketId(const Date_t& time, ExecutionStats* stats);
+
+ std::shared_ptr<ExecutionStats> _getExecutionStats(const NamespaceString& ns);
+ const std::shared_ptr<ExecutionStats> _getExecutionStats(const NamespaceString& ns) const;
+
+ /**
+ * You must hold _mutex when accessing _buckets, _bucketIds, or _nsBuckets. While holding a
+ * lock on _mutex, you can take a lock on an individual bucket, then release _mutex. Any
+ * iterators on the protected structures should be considered invalid once the lock is released.
+ * Any subsequent access to the structures requires relocking _mutex. You must *not* be holding
+ * a lock on a bucket when you attempt to acquire the lock on _mutex, as this can result in
+ * deadlock.
+ *
+ * Typically, if you want to acquire a bucket, you should use the BucketAccess RAII
+ * class to do so, as it will take care of most of this logic for you. Only use the _mutex
+ * directly for more global maintenance where you want to take the lock once and interact with
+ * multiple buckets atomically.
+ */
+ mutable Mutex _mutex = MONGO_MAKE_LATCH("BucketCatalog::_mutex");
// All buckets currently in the catalog, including buckets which are full but not yet committed.
- stdx::unordered_map<BucketId, Bucket> _buckets;
+ stdx::unordered_map<BucketId, std::shared_ptr<Bucket>> _buckets;
// The _id of the current bucket for each namespace and metadata pair.
stdx::unordered_map<std::tuple<NamespaceString, BucketMetadata>, BucketIdInternal> _bucketIds;
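
The lock-ordering comment above is strict for a reason; a small illustration with plain std::mutex stand-ins of why the documented order (_mutex first, bucket lock second) is safe and the inverse can deadlock:

#include <mutex>

std::mutex catalogMutex;  // stands in for BucketCatalog::_mutex
std::mutex bucketMutex;   // stands in for one Bucket::lock

void allowedOrder() {
    std::unique_lock<std::mutex> catalogLk(catalogMutex);
    std::unique_lock<std::mutex> bucketLk(bucketMutex);  // _mutex -> bucket: OK
    catalogLk.unlock();  // keep working on the bucket without blocking others
}

void forbiddenOrder() {
    std::unique_lock<std::mutex> bucketLk(bucketMutex);
    // Taking _mutex right here would invert the order: a thread inside
    // allowedOrder() holds _mutex and waits for the bucket, while we hold the
    // bucket and would wait for _mutex -- a classic deadlock. The commit's
    // code instead calls release() on the bucket first (see clear() above).
    bucketLk.unlock();
    std::unique_lock<std::mutex> catalogLk(catalogMutex);  // safe: bucket released
}

int main() {
    allowedOrder();
    forbiddenOrder();
    return 0;
}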
@@ -348,16 +437,29 @@ private:
// All buckets ordered by their namespaces.
NsBuckets _nsBuckets;
+ // This mutex protects access to _idleBuckets
+ mutable Mutex _idleBucketsLock = MONGO_MAKE_LATCH("BucketCatalog::_idleBucketsLock");
+
// Buckets that do not have any writers.
IdleBuckets _idleBuckets;
+ /**
+ * This mutex protects access to the _executionStats map. Once you complete your lookup, you
+ * can keep the shared_ptr to an individual namespace's stats object and release the lock. The
+ * object itself is thread-safe (atomics).
+ */
+ mutable Mutex _executionStatsLock = MONGO_MAKE_LATCH("BucketCatalog::_executionStatsLock");
+
// Per-collection execution stats.
- stdx::unordered_map<NamespaceString, ExecutionStats> _executionStats;
+ stdx::unordered_map<NamespaceString, std::shared_ptr<ExecutionStats>> _executionStats;
+
+ // A placeholder to be returned in case a namespace has no allocated statistics object
+ static const std::shared_ptr<ExecutionStats> kEmptyStats;
// Counter for buckets created by the bucket catalog.
uint64_t _bucketNum = 0;
// Approximate memory usage of the bucket catalog.
- uint64_t _memoryUsage = 0;
+ AtomicWord<uint64_t> _memoryUsage;
};
} // namespace mongo
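
Finally, the idle-bucket machinery now lives behind its own _idleBucketsLock, with _memoryUsage as an atomic so the expiry loop can consult it without the catalog mutex. A rough standalone sketch of that shape (ids and sizes are invented stand-ins):

#include <atomic>
#include <cstdint>
#include <map>
#include <mutex>

class IdleTracker {
public:
    void markIdle(std::uint64_t id, std::uint64_t bytes) {
        std::lock_guard<std::mutex> lk(_lock);
        _idle[id] = bytes;
    }

    void markNotIdle(std::uint64_t id) {
        std::lock_guard<std::mutex> lk(_lock);
        _idle.erase(id);
    }

    // Evict idle buckets until memory usage falls to the threshold. The usage
    // counter is an atomic, so it can be read without any catalog-wide lock.
    void expire(std::atomic<std::uint64_t>& memoryUsage,
                std::uint64_t threshold) {
        std::lock_guard<std::mutex> lk(_lock);
        while (!_idle.empty() && memoryUsage.load() > threshold) {
            auto it = _idle.begin();
            memoryUsage.fetch_sub(it->second);  // give back the bucket's bytes
            _idle.erase(it);
        }
    }

private:
    std::mutex _lock;  // protects _idle only, like _idleBucketsLock above
    std::map<std::uint64_t, std::uint64_t> _idle;  // bucket id -> approx bytes
};

int main() {
    IdleTracker tracker;
    std::atomic<std::uint64_t> memoryUsage{4096};
    tracker.markIdle(1, 1024);
    tracker.markIdle(2, 1024);
    tracker.expire(memoryUsage, 3000);  // evicts until usage is at or below 3000
    return 0;
}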