summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Larkin-York <dan.larkin-york@mongodb.com>2022-01-21 13:41:33 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-01-21 14:11:04 +0000
commit3f28a62b0911515acd027d501074a13ad7e4afb5 (patch)
treed04c0273137c2be193652c8151111c9a70cae51b
parent86c4fdeebf9a956e75b0fa12a77a8eeb5ad8dcb4 (diff)
downloadmongo-3f28a62b0911515acd027d501074a13ad7e4afb5.tar.gz
SERVER-61196 Simplify BucketCatalog concurrency model
-rw-r--r--jstests/noPassthrough/timeseries_collStats.js40
-rw-r--r--jstests/noPassthrough/timeseries_idle_buckets.js63
-rw-r--r--src/mongo/db/commands/write_commands.cpp24
-rw-r--r--src/mongo/db/timeseries/bucket_catalog.cpp1411
-rw-r--r--src/mongo/db/timeseries/bucket_catalog.h631
-rw-r--r--src/mongo/db/timeseries/bucket_catalog_test.cpp62
6 files changed, 928 insertions, 1303 deletions
diff --git a/jstests/noPassthrough/timeseries_collStats.js b/jstests/noPassthrough/timeseries_collStats.js
index 1aa5625fc0e..8b9c47502e5 100644
--- a/jstests/noPassthrough/timeseries_collStats.js
+++ b/jstests/noPassthrough/timeseries_collStats.js
@@ -12,8 +12,11 @@
load("jstests/core/timeseries/libs/timeseries.js");
-const conn = MongoRunner.runMongod(
- {setParameter: {timeseriesIdleBucketExpiryMemoryUsageThreshold: 104857600}});
+const kIdleBucketExpiryMemoryUsageThreshold = 1024 * 1024 * 10;
+const conn = MongoRunner.runMongod({
+ setParameter:
+ {timeseriesIdleBucketExpiryMemoryUsageThreshold: kIdleBucketExpiryMemoryUsageThreshold}
+});
const dbName = jsTestName();
const testDB = conn.getDB(dbName);
@@ -62,9 +65,28 @@ const checkCollStats = function(empty = false) {
assert.eq(coll.getFullName(), stats.ns);
for (let [stat, value] of Object.entries(expectedStats)) {
- assert.eq(stats.timeseries[stat],
- value,
- "Invalid 'timeseries." + stat + "' value in collStats: " + tojson(stats));
+ if (stat === 'numBucketsClosedDueToMemoryThreshold') {
+ // Idle bucket expiration behavior will be non-deterministic since buckets are hashed
+ // into shards within the catalog based on metadata, and expiration is done on a
+ // per-shard basis. We just want to make sure that if we are expecting the number to be
+ // sufficiently large under a global-expiry regime, that it is at least greater than 0,
+ // signifying we have expired something from some shard.
+ //
+ // The value 33 was chosen as "sufficiently large" simply because we use 32 shards in
+ // the BucketCatalog and so we can apply the pigeon-hole principle to conclude that at
+ // least one of those inserted buckets that we expect to have triggered an expiration
+ // did in fact land in a shard with an existing idle bucket that it could expire.
+ if (value > 33) {
+ assert.gte(
+ stats.timeseries[stat],
+ 1,
+ "Invalid 'timeseries." + stat + "' value in collStats: " + tojson(stats));
+ }
+ } else {
+ assert.eq(stats.timeseries[stat],
+ value,
+ "Invalid 'timeseries." + stat + "' value in collStats: " + tojson(stats));
+ }
}
if (empty) {
@@ -217,17 +239,17 @@ expectedStats.avgNumMeasurementsPerCommit =
Math.floor(expectedStats.numMeasurementsCommitted / expectedStats.numCommits);
checkCollStats();
-const kIdleBucketExpiryMemoryUsageThreshold = 1024 * 1024 * 100;
numDocs = 70;
largeValue = 'a'.repeat(1024 * 1024);
const testIdleBucketExpiry = function(docFn) {
clearCollection();
+ let memoryUsage = 0;
let shouldExpire = false;
for (let i = 0; i < numDocs; i++) {
assert.commandWorked(coll.insert(docFn(i), {ordered: false}));
- const memoryUsage = assert.commandWorked(testDB.serverStatus()).bucketCatalog.memoryUsage;
+ memoryUsage = assert.commandWorked(testDB.serverStatus()).bucketCatalog.memoryUsage;
expectedStats.bucketCount++;
expectedStats.numBucketInserts++;
@@ -244,7 +266,9 @@ const testIdleBucketExpiry = function(docFn) {
shouldExpire = memoryUsage > kIdleBucketExpiryMemoryUsageThreshold;
}
- assert(shouldExpire, 'Memory usage did not reach idle bucket expiry threshold');
+ assert(shouldExpire,
+ `Memory usage did not reach idle bucket expiry threshold: ${memoryUsage} < ${
+ kIdleBucketExpiryMemoryUsageThreshold}`);
};
testIdleBucketExpiry(i => {
diff --git a/jstests/noPassthrough/timeseries_idle_buckets.js b/jstests/noPassthrough/timeseries_idle_buckets.js
index 4d8c1d048fe..651e7a446f2 100644
--- a/jstests/noPassthrough/timeseries_idle_buckets.js
+++ b/jstests/noPassthrough/timeseries_idle_buckets.js
@@ -13,7 +13,7 @@
load("jstests/core/timeseries/libs/timeseries.js"); // For 'TimeseriesTest'.
const rst = new ReplSetTest({nodes: 1});
-rst.startSet({setParameter: {timeseriesIdleBucketExpiryMemoryUsageThreshold: 104857600}});
+rst.startSet({setParameter: {timeseriesIdleBucketExpiryMemoryUsageThreshold: 10485760}});
rst.initiate();
const db = rst.getPrimary().getDB(jsTestName());
@@ -57,34 +57,43 @@ for (let i = 0; i < numDocs; i++) {
}));
}
-// Insert a document with the metadata of a bucket which should have been expired. Thus, a new
-// bucket will be created.
-assert.commandWorked(coll.insert(
- {[timeFieldName]: ISODate(), [metaFieldName]: {0: metaValue}, [valueFieldName]: 3}));
-
-// Check buckets.
-let bucketDocs = bucketsColl.find({meta: {0: metaValue}}).sort({'control.min._id': 1}).toArray();
-assert.eq(bucketDocs.length, 2, 'Invalid number of buckets for metadata 0: ' + tojson(bucketDocs));
+// No go back and insert documents with the same metadata, and verify that we at some point
+// insert into a new bucket, indicating the old one was expired.
+let foundExpiredBucket = false;
+for (let i = 0; i < numDocs; i++) {
+ assert.commandWorked(coll.insert({
+ [timeFieldName]: ISODate(),
+ [metaFieldName]: {[i.toString()]: metaValue},
+ [valueFieldName]: 3
+ }));
-// If bucket compression is enabled the expired bucket should have been compressed
-assert.eq(isTimeseriesBucketCompressionEnabled ? 2 : 1,
- bucketDocs[0].control.version,
- 'unexpected control.version in first bucket: ' + tojson(bucketDocs));
-assert.eq(1,
- bucketDocs[1].control.version,
- 'unexpected control.version in second bucket: ' + tojson(bucketDocs));
+ // Check buckets.
+ let bucketDocs = bucketsColl.find({meta: {[i.toString()]: metaValue}})
+ .sort({'control.min._id': 1})
+ .toArray();
+ if (bucketDocs.length > 1) {
+ // If bucket compression is enabled the expired bucket should have been compressed
+ assert.eq(isTimeseriesBucketCompressionEnabled ? 2 : 1,
+ bucketDocs[0].control.version,
+ 'unexpected control.version in first bucket: ' + tojson(bucketDocs));
+ assert.eq(1,
+ bucketDocs[1].control.version,
+ 'unexpected control.version in second bucket: ' + tojson(bucketDocs));
-// Insert a document with the metadata of a bucket with should still be open. Thus, the existing
-// bucket will be used.
-assert.commandWorked(
- coll.insert({[timeFieldName]: ISODate(), [metaFieldName]: {[numDocs - 1]: metaValue}}));
-bucketDocs = bucketsColl.find({meta: {[numDocs - 1]: metaValue}}).toArray();
-assert.eq(bucketDocs.length,
- 1,
- 'Invalid number of buckets for metadata ' + (numDocs - 1) + ': ' + tojson(bucketDocs));
-assert.eq(1,
- bucketDocs[0].control.version,
- 'unexpected control.version in second bucket: ' + tojson(bucketDocs));
+ foundExpiredBucket = true;
+ break;
+ } else {
+ // The insert landed in an existing bucket, verify that compression didn't take place yet.
+ assert.eq(
+ bucketDocs.length,
+ 1,
+ 'Invalid number of buckets for metadata ' + (numDocs - 1) + ': ' + tojson(bucketDocs));
+ assert.eq(1,
+ bucketDocs[0].control.version,
+ 'unexpected control.version in second bucket: ' + tojson(bucketDocs));
+ }
+}
+assert(foundExpiredBucket, "Did not find an expired bucket");
rst.stopSet();
})();
diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp
index 7c1ef7c0143..6eb1009bc86 100644
--- a/src/mongo/db/commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands.cpp
@@ -206,9 +206,9 @@ write_ops::UpdateOpEntry makeTimeseriesUpdateOpEntry(
options.mustCheckExistenceForInsertOperations =
static_cast<bool>(repl::tenantMigrationRecipientInfo(opCtx));
write_ops::UpdateModification u(updateBuilder.obj(), options);
- write_ops::UpdateOpEntry update(BSON("_id" << batch->bucketId()), std::move(u));
- invariant(!update.getMulti(), batch->bucketId().toString());
- invariant(!update.getUpsert(), batch->bucketId().toString());
+ write_ops::UpdateOpEntry update(BSON("_id" << batch->bucket().id), std::move(u));
+ invariant(!update.getMulti(), batch->bucket().id.toString());
+ invariant(!update.getUpsert(), batch->bucket().id.toString());
return update;
}
@@ -249,7 +249,7 @@ BSONObj makeTimeseriesInsertDocument(std::shared_ptr<BucketCatalog::WriteBatch>
}
BSONObjBuilder builder;
- builder.append("_id", batch->bucketId());
+ builder.append("_id", batch->bucket().id);
{
BSONObjBuilder bucketControlBuilder(builder.subobjStart("control"));
bucketControlBuilder.append(kBucketControlVersionFieldName,
@@ -778,7 +778,7 @@ public:
std::vector<size_t>* docsToRetry) const {
auto& bucketCatalog = BucketCatalog::get(opCtx);
- auto metadata = bucketCatalog.getMetadata(batch->bucketId());
+ auto metadata = bucketCatalog.getMetadata(batch->bucket());
bool prepared = bucketCatalog.prepareCommit(batch);
if (!prepared) {
invariant(batch->finished());
@@ -790,7 +790,7 @@ public:
hangTimeseriesInsertBeforeWrite.pauseWhileSet();
- const auto docId = batch->bucketId();
+ const auto docId = batch->bucket().id;
const bool performInsert = batch->numPreviouslyCommittedMeasurements() == 0;
if (performInsert) {
const auto output =
@@ -866,7 +866,7 @@ public:
// Sort by bucket so that preparing the commit for each batch cannot deadlock.
std::sort(batchesToCommit.begin(), batchesToCommit.end(), [](auto left, auto right) {
- return left.get()->bucketId() < right.get()->bucketId();
+ return left.get()->bucket().id < right.get()->bucket().id;
});
boost::optional<Status> abortStatus;
@@ -882,17 +882,17 @@ public:
std::vector<write_ops::UpdateCommandRequest> updateOps;
for (auto batch : batchesToCommit) {
- auto metadata = bucketCatalog.getMetadata(batch.get()->bucketId());
+ auto metadata = bucketCatalog.getMetadata(batch.get()->bucket());
if (!bucketCatalog.prepareCommit(batch)) {
return false;
}
if (batch.get()->numPreviouslyCommittedMeasurements() == 0) {
insertOps.push_back(_makeTimeseriesInsertOp(
- batch, metadata, std::move(stmtIds[batch.get()->bucketId()])));
+ batch, metadata, std::move(stmtIds[batch.get()->bucket().id])));
} else {
updateOps.push_back(_makeTimeseriesUpdateOp(
- opCtx, batch, metadata, std::move(stmtIds[batch.get()->bucketId()])));
+ opCtx, batch, metadata, std::move(stmtIds[batch.get()->bucket().id])));
}
}
@@ -994,7 +994,7 @@ public:
const auto& batch = result.getValue().batch;
batches.emplace_back(batch, index);
if (isTimeseriesWriteRetryable(opCtx)) {
- stmtIds[batch->bucketId()].push_back(stmtId);
+ stmtIds[batch->bucket().id].push_back(stmtId);
}
}
@@ -1195,7 +1195,7 @@ public:
auto& [batch, index] = batches[itr];
if (batch->claimCommitRights()) {
auto stmtIds = isTimeseriesWriteRetryable(opCtx)
- ? std::move(bucketStmtIds[batch->bucketId()])
+ ? std::move(bucketStmtIds[batch->bucket().id])
: std::vector<StmtId>{};
canContinue = _commitTimeseriesBucket(opCtx,
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index 9b06cdbdc33..9497c269d96 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -195,8 +195,350 @@ std::pair<OID, Date_t> generateBucketId(const Date_t& time, const TimeseriesOpti
}
} // namespace
-const std::shared_ptr<BucketCatalog::ExecutionStats> BucketCatalog::kEmptyStats{
- std::make_shared<BucketCatalog::ExecutionStats>()};
+struct BucketCatalog::ExecutionStats {
+ AtomicWord<long long> numBucketInserts;
+ AtomicWord<long long> numBucketUpdates;
+ AtomicWord<long long> numBucketsOpenedDueToMetadata;
+ AtomicWord<long long> numBucketsClosedDueToCount;
+ AtomicWord<long long> numBucketsClosedDueToSchemaChange;
+ AtomicWord<long long> numBucketsClosedDueToSize;
+ AtomicWord<long long> numBucketsClosedDueToTimeForward;
+ AtomicWord<long long> numBucketsClosedDueToTimeBackward;
+ AtomicWord<long long> numBucketsClosedDueToMemoryThreshold;
+ AtomicWord<long long> numCommits;
+ AtomicWord<long long> numWaits;
+ AtomicWord<long long> numMeasurementsCommitted;
+};
+
+class BucketCatalog::Bucket {
+public:
+ friend class BucketCatalog;
+
+ Bucket(const OID& id, StripeNumber stripe) : _id(id), _stripe(stripe) {}
+
+ /**
+ * Returns the ID for the underlying bucket.
+ */
+ const OID& id() const {
+ return _id;
+ }
+
+ /**
+ * Returns the number of the stripe that owns the bucket
+ */
+ StripeNumber stripe() const {
+ return _stripe;
+ }
+
+ // Returns the time associated with the bucket (id)
+ Date_t getTime() const {
+ return _id.asDateT();
+ }
+
+ /**
+ * Returns the timefield for the underlying bucket.
+ */
+ StringData getTimeField() {
+ return _timeField;
+ }
+
+ /**
+ * Returns whether all measurements have been committed.
+ */
+ bool allCommitted() const {
+ return _batches.empty() && !_preparedBatch;
+ }
+
+ /**
+ * Returns total number of measurements in the bucket.
+ */
+ uint32_t numMeasurements() const {
+ return _numMeasurements;
+ }
+
+ /**
+ * Determines if the schema for an incoming measurement is incompatible with those already
+ * stored in the bucket.
+ *
+ * Returns true if incompatible
+ */
+ bool schemaIncompatible(const BSONObj& input,
+ boost::optional<StringData> metaField,
+ const StringData::ComparatorInterface* comparator) {
+ // (Generic FCV reference): TODO (SERVER-60912): Update once kLastLTS is 6.0
+ if (serverGlobalParams.featureCompatibility.getVersion() ==
+ multiversion::GenericFCV::kLastLTS) {
+ return false;
+ }
+
+ auto result = _schema.update(input, metaField, comparator);
+ return (result == timeseries::Schema::UpdateStatus::Failed);
+ }
+
+private:
+ /**
+ * Determines the effect of adding 'doc' to this bucket. If adding 'doc' causes this bucket
+ * to overflow, we will create a new bucket and recalculate the change to the bucket size
+ * and data fields.
+ */
+ void _calculateBucketFieldsAndSizeChange(const BSONObj& doc,
+ boost::optional<StringData> metaField,
+ NewFieldNames* newFieldNamesToBeInserted,
+ uint32_t* newFieldNamesSize,
+ uint32_t* sizeToBeAdded) const {
+ // BSON size for an object with an empty object field where field name is empty string.
+ // We can use this as an offset to know the size when we have real field names.
+ static constexpr int emptyObjSize = 12;
+ // Validate in debug builds that this size is correct
+ dassert(emptyObjSize == BSON("" << BSONObj()).objsize());
+
+ newFieldNamesToBeInserted->clear();
+ *newFieldNamesSize = 0;
+ *sizeToBeAdded = 0;
+ auto numMeasurementsFieldLength = numDigits(_numMeasurements);
+ for (const auto& elem : doc) {
+ auto fieldName = elem.fieldNameStringData();
+ if (fieldName == metaField) {
+ // Ignore the metadata field since it will not be inserted.
+ continue;
+ }
+
+ // If the field name is new, add the size of an empty object with that field name.
+ auto hashedKey = StringSet::hasher().hashed_key(fieldName);
+ if (!_fieldNames.contains(hashedKey)) {
+ newFieldNamesToBeInserted->push_back(hashedKey);
+ *newFieldNamesSize += elem.fieldNameSize();
+ *sizeToBeAdded += emptyObjSize + fieldName.size();
+ }
+
+ // Add the element size, taking into account that the name will be changed to its
+ // positional number. Add 1 to the calculation since the element's field name size
+ // accounts for a null terminator whereas the stringified position does not.
+ *sizeToBeAdded += elem.size() - elem.fieldNameSize() + numMeasurementsFieldLength + 1;
+ }
+ }
+
+ /**
+ * Returns whether BucketCatalog::commit has been called at least once on this bucket.
+ */
+ bool _hasBeenCommitted() const {
+ return _numCommittedMeasurements != 0 || _preparedBatch;
+ }
+
+ /**
+ * Return a pointer to the current, open batch.
+ */
+ std::shared_ptr<WriteBatch> _activeBatch(OperationId opId,
+ const std::shared_ptr<ExecutionStats>& stats) {
+ auto it = _batches.find(opId);
+ if (it == _batches.end()) {
+ it =
+ _batches
+ .try_emplace(
+ opId, std::make_shared<WriteBatch>(BucketHandle{_id, _stripe}, opId, stats))
+ .first;
+ }
+ return it->second;
+ }
+
+ // The bucket ID for the underlying document
+ const OID _id;
+
+ // The stripe which owns this bucket.
+ const StripeNumber _stripe;
+
+ // The namespace that this bucket is used for.
+ NamespaceString _ns;
+
+ // The metadata of the data that this bucket contains.
+ BucketMetadata _metadata;
+
+ // Extra metadata combinations that are supported without normalizing the metadata object.
+ static constexpr std::size_t kNumFieldOrderCombinationsWithoutNormalizing = 1;
+ boost::container::static_vector<BSONObj, kNumFieldOrderCombinationsWithoutNormalizing>
+ _nonNormalizedKeyMetadatas;
+
+ // Top-level field names of the measurements that have been inserted into the bucket.
+ StringSet _fieldNames;
+
+ // Time field for the measurements that have been inserted into the bucket.
+ std::string _timeField;
+
+ // The minimum and maximum values for each field in the bucket.
+ timeseries::MinMax _minmax;
+
+ // The reference schema for measurements in this bucket. May reflect schema of uncommitted
+ // measurements.
+ timeseries::Schema _schema;
+
+ // The latest time that has been inserted into the bucket.
+ Date_t _latestTime;
+
+ // The total size in bytes of the bucket's BSON serialization, including measurements to be
+ // inserted.
+ uint64_t _size = 0;
+
+ // The total number of measurements in the bucket, including uncommitted measurements and
+ // measurements to be inserted.
+ uint32_t _numMeasurements = 0;
+
+ // The number of committed measurements in the bucket.
+ uint32_t _numCommittedMeasurements = 0;
+
+ // Whether the bucket is full. This can be due to number of measurements, size, or time
+ // range.
+ bool _full = false;
+
+ // The batch that has been prepared and is currently in the process of being committed, if
+ // any.
+ std::shared_ptr<WriteBatch> _preparedBatch;
+
+ // Batches, per operation, that haven't been committed or aborted yet.
+ stdx::unordered_map<OperationId, std::shared_ptr<WriteBatch>> _batches;
+
+ // If the bucket is in idleBuckets, then its position is recorded here.
+ boost::optional<Stripe::IdleList::iterator> _idleListEntry = boost::none;
+
+ // Approximate memory usage of this bucket.
+ uint64_t _memoryUsage = sizeof(*this);
+};
+
+/**
+ * Bundle of information that 'insert' needs to pass down to helper methods that may create a new
+ * bucket.
+ */
+struct BucketCatalog::CreationInfo {
+ const BucketKey& key;
+ StripeNumber stripe;
+ const Date_t& time;
+ const TimeseriesOptions& options;
+ ExecutionStats* stats;
+ ClosedBuckets* closedBuckets;
+ bool openedDuetoMetadata = true;
+};
+
+BucketCatalog::WriteBatch::WriteBatch(const BucketHandle& bucket,
+ OperationId opId,
+ const std::shared_ptr<ExecutionStats>& stats)
+ : _bucket{bucket}, _opId(opId), _stats{stats} {}
+
+bool BucketCatalog::WriteBatch::claimCommitRights() {
+ return !_commitRights.swap(true);
+}
+
+StatusWith<BucketCatalog::CommitInfo> BucketCatalog::WriteBatch::getResult() const {
+ if (!_promise.getFuture().isReady()) {
+ _stats->numWaits.fetchAndAddRelaxed(1);
+ }
+ return _promise.getFuture().getNoThrow();
+}
+
+const BucketCatalog::BucketHandle& BucketCatalog::WriteBatch::bucket() const {
+ return _bucket;
+}
+
+const std::vector<BSONObj>& BucketCatalog::WriteBatch::measurements() const {
+ return _measurements;
+}
+
+const BSONObj& BucketCatalog::WriteBatch::min() const {
+ return _min;
+}
+
+const BSONObj& BucketCatalog::WriteBatch::max() const {
+ return _max;
+}
+
+const StringMap<std::size_t>& BucketCatalog::WriteBatch::newFieldNamesToBeInserted() const {
+ return _newFieldNamesToBeInserted;
+}
+
+uint32_t BucketCatalog::WriteBatch::numPreviouslyCommittedMeasurements() const {
+ return _numPreviouslyCommittedMeasurements;
+}
+
+bool BucketCatalog::WriteBatch::finished() const {
+ return _promise.getFuture().isReady();
+}
+
+BSONObj BucketCatalog::WriteBatch::toBSON() const {
+ auto toFieldName = [](const auto& nameHashPair) { return nameHashPair.first; };
+ return BSON("docs" << _measurements << "bucketMin" << _min << "bucketMax" << _max
+ << "numCommittedMeasurements" << int(_numPreviouslyCommittedMeasurements)
+ << "newFieldNamesToBeInserted"
+ << std::set<std::string>(
+ boost::make_transform_iterator(_newFieldNamesToBeInserted.begin(),
+ toFieldName),
+ boost::make_transform_iterator(_newFieldNamesToBeInserted.end(),
+ toFieldName)));
+}
+
+void BucketCatalog::WriteBatch::_addMeasurement(const BSONObj& doc) {
+ _measurements.push_back(doc);
+}
+
+void BucketCatalog::WriteBatch::_recordNewFields(NewFieldNames&& fields) {
+ for (auto&& field : fields) {
+ _newFieldNamesToBeInserted[field] = field.hash();
+ }
+}
+
+void BucketCatalog::WriteBatch::_prepareCommit(Bucket* bucket) {
+ invariant(_commitRights.load());
+ _numPreviouslyCommittedMeasurements = bucket->_numCommittedMeasurements;
+
+ // Filter out field names that were new at the time of insertion, but have since been committed
+ // by someone else.
+ for (auto it = _newFieldNamesToBeInserted.begin(); it != _newFieldNamesToBeInserted.end();) {
+ StringMapHashedKey fieldName(it->first, it->second);
+ if (bucket->_fieldNames.contains(fieldName)) {
+ _newFieldNamesToBeInserted.erase(it++);
+ continue;
+ }
+
+ bucket->_fieldNames.emplace(fieldName);
+ ++it;
+ }
+
+ for (const auto& doc : _measurements) {
+ bucket->_minmax.update(
+ doc, bucket->_metadata.getMetaField(), bucket->_metadata.getComparator());
+ }
+
+ const bool isUpdate = _numPreviouslyCommittedMeasurements > 0;
+ if (isUpdate) {
+ _min = bucket->_minmax.minUpdates();
+ _max = bucket->_minmax.maxUpdates();
+ } else {
+ _min = bucket->_minmax.min();
+ _max = bucket->_minmax.max();
+
+ // Approximate minmax memory usage by taking sizes of initial commit. Subsequent updates may
+ // add fields but are most likely just to update values.
+ bucket->_memoryUsage += _min.objsize();
+ bucket->_memoryUsage += _max.objsize();
+ }
+}
+
+void BucketCatalog::WriteBatch::_finish(const CommitInfo& info) {
+ invariant(_commitRights.load());
+ _promise.emplaceValue(info);
+}
+
+void BucketCatalog::WriteBatch::_abort(const boost::optional<Status>& status,
+ const Bucket* bucket) {
+ if (finished()) {
+ return;
+ }
+
+ std::string nsIdentification;
+ if (bucket) {
+ nsIdentification.append(str::stream() << " for namespace " << bucket->_ns);
+ }
+ _promise.setError(status.value_or(Status{ErrorCodes::TimeseriesBucketCleared,
+ str::stream() << "Time-series bucket " << _bucket.id
+ << nsIdentification << " was cleared"}));
+}
BucketCatalog& BucketCatalog::get(ServiceContext* svcCtx) {
return getBucketCatalog(svcCtx);
@@ -206,8 +548,11 @@ BucketCatalog& BucketCatalog::get(OperationContext* opCtx) {
return get(opCtx->getServiceContext());
}
-BSONObj BucketCatalog::getMetadata(const OID& bucketId) const {
- BucketAccess bucket{const_cast<BucketCatalog*>(this), bucketId};
+BSONObj BucketCatalog::getMetadata(const BucketHandle& handle) const {
+ auto const& stripe = _stripes[handle.stripe];
+ stdx::lock_guard stripeLock{stripe.mutex};
+
+ const Bucket* bucket = _findBucket(stripe, stripeLock, handle.id);
if (!bucket) {
return {};
}
@@ -223,27 +568,35 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert(
const BSONObj& doc,
CombineWithInsertsFromOtherClients combine) {
- BSONElement metadata;
- auto metaFieldName = options.getMetaField();
- if (metaFieldName) {
- metadata = doc[*metaFieldName];
- }
- auto key = BucketKey{ns, BucketMetadata{metadata, comparator}};
-
- auto stats = _getExecutionStats(ns);
- invariant(stats);
-
auto timeElem = doc[options.getTimeField()];
if (!timeElem || BSONType::Date != timeElem.type()) {
return {ErrorCodes::BadValue,
str::stream() << "'" << options.getTimeField() << "' must be present and contain a "
<< "valid BSON UTC datetime value"};
}
-
auto time = timeElem.Date();
+ auto stats = _getExecutionStats(ns);
+ invariant(stats);
+
+ BSONElement metadata;
+ auto metaFieldName = options.getMetaField();
+ if (metaFieldName) {
+ metadata = doc[*metaFieldName];
+ }
+
+ // Buckets are spread across independently-lockable stripes to improve parallelism. We map a
+ // bucket to a stripe by hashing the BucketKey.
+ auto key = BucketKey{ns, BucketMetadata{metadata, comparator}};
+ auto stripeNumber = _getStripeNumber(key);
+
ClosedBuckets closedBuckets;
- BucketAccess bucket{this, key, options, stats.get(), &closedBuckets, time};
+ CreationInfo info{key, stripeNumber, time, options, stats.get(), &closedBuckets};
+
+ auto& stripe = _stripes[stripeNumber];
+ stdx::lock_guard stripeLock{stripe.mutex};
+
+ Bucket* bucket = _useOrCreateBucket(&stripe, stripeLock, info);
invariant(bucket);
NewFieldNames newFieldNamesToBeInserted;
@@ -255,21 +608,20 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert(
&newFieldNamesSize,
&sizeToBeAdded);
- auto shouldCloseBucket = [&](BucketAccess* bucket) -> bool {
+ auto shouldCloseBucket = [&](Bucket* bucket) -> bool {
if (bucket->schemaIncompatible(doc, metaFieldName, comparator)) {
stats->numBucketsClosedDueToSchemaChange.fetchAndAddRelaxed(1);
return true;
}
- if ((*bucket)->_numMeasurements == static_cast<std::uint64_t>(gTimeseriesBucketMaxCount)) {
+ if (bucket->_numMeasurements == static_cast<std::uint64_t>(gTimeseriesBucketMaxCount)) {
stats->numBucketsClosedDueToCount.fetchAndAddRelaxed(1);
return true;
}
- if ((*bucket)->_size + sizeToBeAdded >
- static_cast<std::uint64_t>(gTimeseriesBucketMaxSize)) {
+ if (bucket->_size + sizeToBeAdded > static_cast<std::uint64_t>(gTimeseriesBucketMaxSize)) {
stats->numBucketsClosedDueToSize.fetchAndAddRelaxed(1);
return true;
}
- auto bucketTime = (*bucket).getTime();
+ auto bucketTime = bucket->getTime();
if (time - bucketTime >= Seconds(*options.getBucketMaxSpanSeconds())) {
stats->numBucketsClosedDueToTimeForward.fetchAndAddRelaxed(1);
return true;
@@ -281,8 +633,9 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert(
return false;
};
- if (!bucket->_ns.isEmpty() && shouldCloseBucket(&bucket)) {
- bucket.rollover(shouldCloseBucket, &closedBuckets);
+ if (!bucket->_ns.isEmpty() && shouldCloseBucket(bucket)) {
+ info.openedDuetoMetadata = false;
+ bucket = _rollover(&stripe, stripeLock, bucket, info);
bucket->_calculateBucketFieldsAndSizeChange(doc,
options.getMetaField(),
@@ -303,15 +656,16 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert(
if (bucket->_ns.isEmpty()) {
// The namespace and metadata only need to be set if this bucket was newly created.
bucket->_ns = ns;
- key.metadata.normalize();
bucket->_metadata = key.metadata;
- // The namespace is stored two times: the bucket itself and _openBuckets.
- // The metadata is stored two times, normalized and un-normalized. A unique pointer to the
- // bucket is stored once: _allBuckets. A raw pointer to the bucket is stored at most twice:
- // _openBuckets, _idleBuckets.
- bucket->_memoryUsage += (ns.size() * 2) + (bucket->_metadata.toBSON().objsize() * 2) +
- sizeof(Bucket) + sizeof(std::unique_ptr<Bucket>) + (sizeof(Bucket*) * 2);
+ // The namespace is stored two times: the bucket itself and openBuckets.
+ // We don't have a great approximation for the
+ // _schema size, so we use initial document size minus metadata as an approximation. Since
+ // the metadata itself is stored once, in the bucket, we can combine the two and just use
+ // the initial document size. A unique pointer to the bucket is stored once: allBuckets. A
+ // raw pointer to the bucket is stored at most twice: openBuckets, idleBuckets.
+ bucket->_memoryUsage += (ns.size() * 2) + doc.objsize() + sizeof(Bucket) +
+ sizeof(std::unique_ptr<Bucket>) + (sizeof(Bucket*) * 2);
bucket->_schema.update(doc, options.getMetaField(), comparator);
} else {
@@ -328,14 +682,18 @@ bool BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) {
return false;
}
- _waitToCommitBatch(batch);
+ auto& stripe = _stripes[batch->bucket().stripe];
+ _waitToCommitBatch(&stripe, batch);
+
+ stdx::lock_guard stripeLock{stripe.mutex};
+ Bucket* bucket =
+ _useBucketInState(&stripe, stripeLock, batch->bucket().id, BucketState::kPrepared);
- BucketAccess bucket(this, batch->bucketId(), BucketState::kPrepared);
if (batch->finished()) {
// Someone may have aborted it while we were waiting.
return false;
} else if (!bucket) {
- abort(batch);
+ _abort(&stripe, stripeLock, batch, boost::none);
return false;
}
@@ -349,13 +707,16 @@ bool BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) {
boost::optional<BucketCatalog::ClosedBucket> BucketCatalog::finish(
std::shared_ptr<WriteBatch> batch, const CommitInfo& info) {
invariant(!batch->finished());
- invariant(!batch->active());
boost::optional<ClosedBucket> closedBucket;
batch->_finish(info);
- BucketAccess bucket(this, batch->bucketId(), BucketState::kNormal);
+ auto& stripe = _stripes[batch->bucket().stripe];
+ stdx::lock_guard stripeLock{stripe.mutex};
+
+ Bucket* bucket =
+ _useBucketInState(&stripe, stripeLock, batch->bucket().id, BucketState::kNormal);
if (bucket) {
bucket->_preparedBatch.reset();
}
@@ -377,13 +738,11 @@ boost::optional<BucketCatalog::ClosedBucket> BucketCatalog::finish(
// It's possible that we cleared the bucket in between preparing the commit and finishing
// here. In this case, we should abort any other ongoing batches and clear the bucket from
// the catalog so it's not hanging around idle.
- auto lk = _lockExclusive();
- auto it = _allBuckets.find(batch->bucketId());
- if (it != _allBuckets.end()) {
- auto bucket = it->second.get();
- stdx::unique_lock blk{bucket->_mutex};
+ auto it = stripe.allBuckets.find(batch->bucket().id);
+ if (it != stripe.allBuckets.end()) {
+ bucket = it->second.get();
bucket->_preparedBatch.reset();
- _abort(blk, bucket, nullptr, boost::none);
+ _abort(&stripe, stripeLock, bucket, nullptr, boost::none);
}
} else if (bucket->allCommitted()) {
if (bucket->_full) {
@@ -391,30 +750,24 @@ boost::optional<BucketCatalog::ClosedBucket> BucketCatalog::finish(
// bucket is full. Thus, we can remove it.
_memoryUsage.fetchAndSubtract(bucket->_memoryUsage);
- bucket.release();
- auto lk = _lockExclusive();
+ auto it = stripe.allBuckets.find(batch->bucket().id);
+ if (it != stripe.allBuckets.end()) {
+ bucket = it->second.get();
- auto it = _allBuckets.find(batch->bucketId());
- if (it != _allBuckets.end()) {
- Bucket* ptr = it->second.get();
- _verifyBucketIsUnused(ptr);
+ closedBucket = ClosedBucket{batch->bucket().id,
+ bucket->getTimeField().toString(),
+ bucket->numMeasurements()};
- closedBucket = ClosedBucket{
- batch->bucketId(), ptr->getTimeField().toString(), ptr->numMeasurements()};
-
- // Only remove from _allBuckets and _idleBuckets. If it was marked full, we know
- // that happened in BucketAccess::rollover, and that there is already a new open
+ // Only remove from allBuckets and idleBuckets. If it was marked full, we know
+ // that happened in Stripe::rollover, and that there is already a new open
// bucket for this metadata.
- _markBucketNotIdle(ptr, false /* locked */);
- {
- stdx::lock_guard statesLk{_statesMutex};
- _bucketStates.erase(batch->bucketId());
- }
+ _markBucketNotIdle(&stripe, stripeLock, bucket);
+ _eraseBucketState(batch->bucket().id);
- _allBuckets.erase(batch->bucketId());
+ stripe.allBuckets.erase(batch->bucket().id);
}
} else {
- _markBucketIdle(bucket);
+ _markBucketIdle(&stripe, stripeLock, bucket);
}
}
return closedBucket;
@@ -429,18 +782,10 @@ void BucketCatalog::abort(std::shared_ptr<WriteBatch> batch,
return;
}
- // Before we access the bucket, make sure it's still there.
- auto lk = _lockExclusive();
- auto it = _allBuckets.find(batch->bucketId());
- if (it == _allBuckets.end()) {
- // Special case, bucket has already been cleared, and we need only abort this batch.
- batch->_abort(status, nullptr);
- return;
- }
+ auto& stripe = _stripes[batch->bucket().stripe];
+ stdx::lock_guard stripeLock{stripe.mutex};
- Bucket* bucket = it->second.get();
- stdx::unique_lock blk{bucket->_mutex};
- _abort(blk, bucket, batch, status);
+ _abort(&stripe, stripeLock, batch, status);
}
void BucketCatalog::clear(const OID& oid) {
@@ -452,20 +797,22 @@ void BucketCatalog::clear(const OID& oid) {
}
void BucketCatalog::clear(const std::function<bool(const NamespaceString&)>& shouldClear) {
- auto lk = _lockExclusive();
- auto statsLk = _statsMutex.lockExclusive();
+ for (auto& stripe : _stripes) {
+ stdx::lock_guard stripeLock{stripe.mutex};
+ for (auto it = stripe.allBuckets.begin(); it != stripe.allBuckets.end();) {
+ auto nextIt = std::next(it);
- for (auto it = _allBuckets.begin(); it != _allBuckets.end();) {
- auto nextIt = std::next(it);
+ const auto& bucket = it->second;
+ if (shouldClear(bucket->_ns)) {
+ {
+ stdx::lock_guard catalogLock{_mutex};
+ _executionStats.erase(bucket->_ns);
+ }
+ _abort(&stripe, stripeLock, bucket.get(), nullptr, boost::none);
+ }
- const auto& bucket = it->second;
- stdx::unique_lock blk{bucket->_mutex};
- if (shouldClear(bucket->_ns)) {
- _executionStats.erase(bucket->_ns);
- _abort(blk, bucket.get(), nullptr, boost::none);
+ it = nextIt;
}
-
- it = nextIt;
}
}
@@ -504,54 +851,137 @@ void BucketCatalog::appendExecutionStats(const NamespaceString& ns, BSONObjBuild
}
}
-BucketCatalog::StripedMutex::ExclusiveLock::ExclusiveLock(const StripedMutex& sm) {
- invariant(sm._mutexes.size() == _locks.size());
- for (std::size_t i = 0; i < sm._mutexes.size(); ++i) {
- _locks[i] = stdx::unique_lock<Mutex>(sm._mutexes[i]);
+BucketCatalog::BucketMetadata::BucketMetadata(BSONElement elem,
+ const StringData::ComparatorInterface* comparator)
+ : _metadataElement(elem), _comparator(comparator) {
+ if (_metadataElement) {
+ BSONObjBuilder objBuilder;
+ // We will get an object of equal size, just with reordered fields.
+ objBuilder.bb().reserveBytes(_metadataElement.size());
+ normalizeTopLevel(&objBuilder, _metadataElement);
+ _metadata = objBuilder.obj();
}
+ // Updates the BSONElement to refer to the copied BSONObj.
+ _metadataElement = _metadata.firstElement();
}
-BucketCatalog::StripedMutex::SharedLock BucketCatalog::StripedMutex::lockShared() const {
- static const std::hash<stdx::thread::id> hasher;
- return SharedLock{_mutexes[hasher(stdx::this_thread::get_id()) % kNumStripes]};
+bool BucketCatalog::BucketMetadata::operator==(const BucketMetadata& other) const {
+ return _metadataElement.binaryEqualValues(other._metadataElement);
}
-BucketCatalog::StripedMutex::ExclusiveLock BucketCatalog::StripedMutex::lockExclusive() const {
- return ExclusiveLock{*this};
+const BSONObj& BucketCatalog::BucketMetadata::toBSON() const {
+ return _metadata;
}
-BucketCatalog::StripedMutex::SharedLock BucketCatalog::_lockShared() const {
- return _bucketMutex.lockShared();
+StringData BucketCatalog::BucketMetadata::getMetaField() const {
+ return StringData(_metadataElement.fieldName());
}
-BucketCatalog::StripedMutex::ExclusiveLock BucketCatalog::_lockExclusive() const {
- return _bucketMutex.lockExclusive();
+const StringData::ComparatorInterface* BucketCatalog::BucketMetadata::getComparator() const {
+ return _comparator;
}
-void BucketCatalog::_waitToCommitBatch(const std::shared_ptr<WriteBatch>& batch) {
- while (true) {
- BucketAccess bucket{this, batch->bucketId()};
- if (!bucket || batch->finished()) {
- return;
+BucketCatalog::BucketKey::BucketKey(const NamespaceString& n, const BucketMetadata& m)
+ : ns(n), metadata(m), hash(absl::Hash<BucketKey>{}(*this)) {}
+
+std::size_t BucketCatalog::BucketHasher::operator()(const BucketKey& key) const {
+ // Use the default absl hasher.
+ return key.hash;
+}
+
+BucketCatalog::StripeNumber BucketCatalog::_getStripeNumber(const BucketKey& key) {
+ return key.hash % kNumberOfStripes;
+}
+
+const BucketCatalog::Bucket* BucketCatalog::_findBucket(const Stripe& stripe,
+ WithLock,
+ const OID& id,
+ ReturnClearedBuckets mode) const {
+ auto it = stripe.allBuckets.find(id);
+ if (it != stripe.allBuckets.end()) {
+ if (mode == ReturnClearedBuckets::kYes) {
+ return it->second.get();
}
- auto current = bucket->_preparedBatch;
- if (!current) {
- // No other batches for this bucket are currently committing, so we can proceed.
- bucket->_preparedBatch = batch;
- bucket->_batches.erase(batch->_opId);
- break;
+ auto state = _getBucketState(id);
+ if (state && state != BucketState::kCleared && state != BucketState::kPreparedAndCleared) {
+ return it->second.get();
+ }
+ }
+ return nullptr;
+}
+
+BucketCatalog::Bucket* BucketCatalog::_useBucket(Stripe* stripe,
+ WithLock stripeLock,
+ const OID& id,
+ ReturnClearedBuckets mode) {
+ return const_cast<Bucket*>(_findBucket(*stripe, stripeLock, id, mode));
+}
+
+BucketCatalog::Bucket* BucketCatalog::_useBucketInState(Stripe* stripe,
+ WithLock,
+ const OID& id,
+ BucketState targetState) {
+ auto it = stripe->allBuckets.find(id);
+ if (it != stripe->allBuckets.end()) {
+ auto state = _setBucketState(it->second->_id, targetState);
+ if (state && state != BucketState::kCleared && state != BucketState::kPreparedAndCleared) {
+ return it->second.get();
+ }
+ }
+ return nullptr;
+}
+
+BucketCatalog::Bucket* BucketCatalog::_useOrCreateBucket(Stripe* stripe,
+ WithLock stripeLock,
+ const CreationInfo& info) {
+ auto it = stripe->openBuckets.find(info.key);
+ if (it == stripe->openBuckets.end()) {
+ // No open bucket for this metadata.
+ return _allocateBucket(stripe, stripeLock, info);
+ }
+
+ Bucket* bucket = it->second;
+
+ auto state = _getBucketState(bucket->id());
+ if (state == BucketState::kNormal || state == BucketState::kPrepared) {
+ _markBucketNotIdle(stripe, stripeLock, bucket);
+ return bucket;
+ }
+
+ _abort(stripe, stripeLock, bucket, nullptr, boost::none);
+ return _allocateBucket(stripe, stripeLock, info);
+}
+
+void BucketCatalog::_waitToCommitBatch(Stripe* stripe, const std::shared_ptr<WriteBatch>& batch) {
+ while (true) {
+ std::shared_ptr<WriteBatch> current;
+
+ {
+ stdx::lock_guard stripeLock{stripe->mutex};
+ Bucket* bucket =
+ _useBucket(stripe, stripeLock, batch->bucket().id, ReturnClearedBuckets::kNo);
+ if (!bucket || batch->finished()) {
+ return;
+ }
+
+ current = bucket->_preparedBatch;
+ if (!current) {
+ // No other batches for this bucket are currently committing, so we can proceed.
+ bucket->_preparedBatch = batch;
+ bucket->_batches.erase(batch->_opId);
+ return;
+ }
}
// We have to wait for someone else to finish.
- bucket.release();
current->getResult().getStatus().ignore(); // We don't care about the result.
}
}
-bool BucketCatalog::_removeBucket(Bucket* bucket, bool expiringBuckets) {
- auto it = _allBuckets.find(bucket->id());
- if (it == _allBuckets.end()) {
+bool BucketCatalog::_removeBucket(Stripe* stripe, WithLock stripeLock, Bucket* bucket) {
+ auto it = stripe->allBuckets.find(bucket->id());
+ if (it == stripe->allBuckets.end()) {
return false;
}
@@ -559,43 +989,50 @@ bool BucketCatalog::_removeBucket(Bucket* bucket, bool expiringBuckets) {
invariant(!bucket->_preparedBatch);
_memoryUsage.fetchAndSubtract(bucket->_memoryUsage);
- _markBucketNotIdle(bucket, expiringBuckets /* locked */);
- _removeNonNormalizedKeysForBucket(bucket);
- _openBuckets.erase({bucket->_ns, bucket->_metadata});
- {
- stdx::lock_guard statesLk{_statesMutex};
- _bucketStates.erase(bucket->id());
- }
- _allBuckets.erase(it);
+ _markBucketNotIdle(stripe, stripeLock, bucket);
+ stripe->openBuckets.erase({bucket->_ns, bucket->_metadata});
+ _eraseBucketState(bucket->id());
+
+ stripe->allBuckets.erase(it);
return true;
}
-void BucketCatalog::_removeNonNormalizedKeysForBucket(Bucket* bucket) {
- auto comparator = bucket->_metadata.getComparator();
- for (auto&& metadata : bucket->_nonNormalizedKeyMetadatas) {
- _openBuckets.erase({bucket->_ns, {metadata.firstElement(), metadata, comparator}});
+void BucketCatalog::_abort(Stripe* stripe,
+ WithLock stripeLock,
+ std::shared_ptr<WriteBatch> batch,
+ const boost::optional<Status>& status) {
+ // Before we access the bucket, make sure it's still there.
+ Bucket* bucket = _useBucket(stripe, stripeLock, batch->bucket().id, ReturnClearedBuckets::kYes);
+ if (!bucket) {
+ // Special case, bucket has already been cleared, and we need only abort this batch.
+ batch->_abort(status, nullptr);
+ return;
}
+
+ // Proceed to abort any unprepared batches and remove the bucket if possible
+ _abort(stripe, stripeLock, bucket, batch, status);
}
-void BucketCatalog::_abort(stdx::unique_lock<Mutex>& lk,
+void BucketCatalog::_abort(Stripe* stripe,
+ WithLock stripeLock,
Bucket* bucket,
std::shared_ptr<WriteBatch> batch,
const boost::optional<Status>& status) {
- // For any uncommitted batches that we need to abort, see if we already have the rights,
- // otherwise try to claim the rights and abort it. If we don't get the rights, then wait
- // for the other writer to resolve the batch.
+ // Abort any unprepared batches. This should be safe since we have a lock on the stripe,
+ // preventing anyone else from using these.
for (const auto& [_, current] : bucket->_batches) {
current->_abort(status, bucket);
}
bucket->_batches.clear();
bool doRemove = true; // We shouldn't remove the bucket if there's a prepared batch outstanding
- // and it's not the on we manage. In that case, we don't know what the
+ // and it's not the one we manage. In that case, we don't know what the
// user is doing with it, but we need to keep the bucket around until
// that batch is finished.
if (auto& prepared = bucket->_preparedBatch) {
if (prepared == batch) {
+ // We own the prepared batch, so we can go ahead and abort it and remove the bucket.
prepared->_abort(status, bucket);
prepared.reset();
} else {
@@ -603,63 +1040,40 @@ void BucketCatalog::_abort(stdx::unique_lock<Mutex>& lk,
}
}
- lk.unlock();
if (doRemove) {
- [[maybe_unused]] bool removed = _removeBucket(bucket, false /* expiringBuckets */);
+ [[maybe_unused]] bool removed = _removeBucket(stripe, stripeLock, bucket);
}
}
-void BucketCatalog::_markBucketIdle(Bucket* bucket) {
+void BucketCatalog::_markBucketIdle(Stripe* stripe, WithLock stripeLock, Bucket* bucket) {
invariant(bucket);
- stdx::lock_guard lk{_idleMutex};
- _idleBuckets.push_front(bucket);
- bucket->_idleListEntry = _idleBuckets.begin();
+ stripe->idleBuckets.push_front(bucket);
+ bucket->_idleListEntry = stripe->idleBuckets.begin();
}
-void BucketCatalog::_markBucketNotIdle(Bucket* bucket, bool locked) {
+void BucketCatalog::_markBucketNotIdle(Stripe* stripe, WithLock stripeLock, Bucket* bucket) {
invariant(bucket);
if (bucket->_idleListEntry) {
- stdx::unique_lock<Mutex> guard;
- if (!locked) {
- guard = stdx::unique_lock{_idleMutex};
- }
- _idleBuckets.erase(*bucket->_idleListEntry);
+ stripe->idleBuckets.erase(*bucket->_idleListEntry);
bucket->_idleListEntry = boost::none;
}
}
-void BucketCatalog::_verifyBucketIsUnused(Bucket* bucket) const {
- // Take a lock on the bucket so we guarantee no one else is accessing it. We can release it
- // right away since no one else can take it again without taking the catalog lock, which we
- // also hold outside this method.
- stdx::lock_guard<Mutex> lk{bucket->_mutex};
-}
-
-void BucketCatalog::_expireIdleBuckets(ExecutionStats* stats,
+void BucketCatalog::_expireIdleBuckets(Stripe* stripe,
+ WithLock stripeLock,
+ ExecutionStats* stats,
std::vector<BucketCatalog::ClosedBucket>* closedBuckets) {
- // Must hold an exclusive lock on _bucketMutex from outside.
- stdx::unique_lock lk{_idleMutex};
-
// As long as we still need space and have entries and remaining attempts, close idle buckets.
int32_t numClosed = 0;
- while (!_idleBuckets.empty() &&
+ while (!stripe->idleBuckets.empty() &&
_memoryUsage.load() > static_cast<std::uint64_t>(
gTimeseriesIdleBucketExpiryMemoryUsageThresholdBytes.load()) &&
numClosed <= gTimeseriesIdleBucketExpiryMaxCountPerAttempt) {
- Bucket* bucket = _idleBuckets.back();
-
- lk.unlock();
- _verifyBucketIsUnused(bucket);
- lk.lock();
- if (!bucket->_idleListEntry) {
- // The bucket may have become non-idle between when we unlocked _idleMutex and locked
- // the bucket's mutex.
- continue;
- }
-
+ Bucket* bucket = stripe->idleBuckets.back();
ClosedBucket closed{
bucket->id(), bucket->getTimeField().toString(), bucket->numMeasurements()};
- if (_removeBucket(bucket, true /* expiringBuckets */)) {
+
+ if (_removeBucket(stripe, stripeLock, bucket)) {
stats->numBucketsClosedDueToMemoryThreshold.fetchAndAddRelaxed(1);
closedBuckets->push_back(closed);
++numClosed;
@@ -667,62 +1081,73 @@ void BucketCatalog::_expireIdleBuckets(ExecutionStats* stats,
}
}
-std::size_t BucketCatalog::_numberOfIdleBuckets() const {
- stdx::lock_guard lk{_idleMutex};
- return _idleBuckets.size();
-}
-
-BucketCatalog::Bucket* BucketCatalog::_allocateBucket(const BucketKey& key,
- const Date_t& time,
- const TimeseriesOptions& options,
- ExecutionStats* stats,
- ClosedBuckets* closedBuckets,
- bool openedDuetoMetadata) {
- _expireIdleBuckets(stats, closedBuckets);
+BucketCatalog::Bucket* BucketCatalog::_allocateBucket(Stripe* stripe,
+ WithLock stripeLock,
+ const CreationInfo& info) {
+ _expireIdleBuckets(stripe, stripeLock, info.stats, info.closedBuckets);
- auto [bucketId, roundedTime] = generateBucketId(time, options);
+ auto [bucketId, roundedTime] = generateBucketId(info.time, info.options);
- auto [it, inserted] = _allBuckets.try_emplace(bucketId, std::make_unique<Bucket>(bucketId));
+ auto [it, inserted] =
+ stripe->allBuckets.try_emplace(bucketId, std::make_unique<Bucket>(bucketId, info.stripe));
tassert(6130900, "Expected bucket to be inserted", inserted);
Bucket* bucket = it->second.get();
- _openBuckets[key] = bucket;
- {
- stdx::lock_guard statesLk{_statesMutex};
- _bucketStates.emplace(bucketId, BucketState::kNormal);
- }
+ stripe->openBuckets[info.key] = bucket;
+ _initializeBucketState(bucketId);
- if (openedDuetoMetadata) {
- stats->numBucketsOpenedDueToMetadata.fetchAndAddRelaxed(1);
+ if (info.openedDuetoMetadata) {
+ info.stats->numBucketsOpenedDueToMetadata.fetchAndAddRelaxed(1);
}
- bucket->_timeField = options.getTimeField().toString();
+ bucket->_timeField = info.options.getTimeField().toString();
// Make sure we set the control.min time field to match the rounded _id timestamp.
- auto controlDoc = buildControlMinTimestampDoc(options.getTimeField(), roundedTime);
+ auto controlDoc = buildControlMinTimestampDoc(info.options.getTimeField(), roundedTime);
bucket->_minmax.update(
controlDoc, bucket->_metadata.getMetaField(), bucket->_metadata.getComparator());
return bucket;
}
+BucketCatalog::Bucket* BucketCatalog::_rollover(Stripe* stripe,
+ WithLock stripeLock,
+ Bucket* bucket,
+ const CreationInfo& info) {
+
+ if (bucket->allCommitted()) {
+ // The bucket does not contain any measurements that are yet to be committed, so we can
+ // remove it now.
+ info.closedBuckets->push_back(ClosedBucket{
+ bucket->id(), bucket->getTimeField().toString(), bucket->numMeasurements()});
+
+ bool removed = _removeBucket(stripe, stripeLock, bucket);
+ invariant(removed);
+ } else {
+ // We must keep the bucket around until it is committed, just mark it full so it we know to
+ // clean it up when the last batch finishes.
+ bucket->_full = true;
+ }
+
+ return _allocateBucket(stripe, stripeLock, info);
+}
+
std::shared_ptr<BucketCatalog::ExecutionStats> BucketCatalog::_getExecutionStats(
const NamespaceString& ns) {
- {
- auto lock = _statsMutex.lockShared();
- auto it = _executionStats.find(ns);
- if (it != _executionStats.end()) {
- return it->second;
- }
+ stdx::lock_guard catalogLock{_mutex};
+ auto it = _executionStats.find(ns);
+ if (it != _executionStats.end()) {
+ return it->second;
}
- auto lock = _statsMutex.lockExclusive();
auto res = _executionStats.emplace(ns, std::make_shared<ExecutionStats>());
return res.first->second;
}
const std::shared_ptr<BucketCatalog::ExecutionStats> BucketCatalog::_getExecutionStats(
const NamespaceString& ns) const {
- auto lock = _statsMutex.lockShared();
+ static const auto kEmptyStats{std::make_shared<ExecutionStats>()};
+
+ stdx::lock_guard catalogLock{_mutex};
auto it = _executionStats.find(ns);
if (it != _executionStats.end()) {
@@ -731,9 +1156,25 @@ const std::shared_ptr<BucketCatalog::ExecutionStats> BucketCatalog::_getExecutio
return kEmptyStats;
}
+void BucketCatalog::_initializeBucketState(const OID& id) {
+ stdx::lock_guard catalogLock{_mutex};
+ _bucketStates.emplace(id, BucketState::kNormal);
+}
+
+void BucketCatalog::_eraseBucketState(const OID& id) {
+ stdx::lock_guard catalogLock{_mutex};
+ _bucketStates.erase(id);
+}
+
+boost::optional<BucketCatalog::BucketState> BucketCatalog::_getBucketState(const OID& id) const {
+ stdx::lock_guard catalogLock{_mutex};
+ auto it = _bucketStates.find(id);
+ return it != _bucketStates.end() ? boost::make_optional(it->second) : boost::none;
+}
+
boost::optional<BucketCatalog::BucketState> BucketCatalog::_setBucketState(const OID& id,
BucketState target) {
- stdx::lock_guard statesLk{_statesMutex};
+ stdx::lock_guard catalogLock{_mutex};
auto it = _bucketStates.find(id);
if (it == _bucketStates.end()) {
return boost::none;
@@ -771,564 +1212,31 @@ boost::optional<BucketCatalog::BucketState> BucketCatalog::_setBucketState(const
return state;
}
-BucketCatalog::BucketMetadata::BucketMetadata(BSONElement elem,
- const StringData::ComparatorInterface* comparator)
- : _metadataElement(elem), _comparator(comparator) {}
-
-BucketCatalog::BucketMetadata::BucketMetadata(BSONElement elem,
- BSONObj obj,
- const StringData::ComparatorInterface* comparator,
- bool normalized,
- bool copied)
- : _metadataElement(elem),
- _metadata(obj),
- _comparator(comparator),
- _normalized(normalized),
- _copied(copied) {}
-
-
-void BucketCatalog::BucketMetadata::normalize() {
- if (!_normalized) {
- if (_metadataElement) {
- BSONObjBuilder objBuilder;
- // We will get an object of equal size, just with reordered fields.
- objBuilder.bb().reserveBytes(_metadataElement.size());
- normalizeTopLevel(&objBuilder, _metadataElement);
- _metadata = objBuilder.obj();
- }
- // Updates the BSONElement to refer to the copied BSONObj.
- _metadataElement = _metadata.firstElement();
- _normalized = true;
- _copied = true;
- }
-}
-
-bool BucketCatalog::BucketMetadata::operator==(const BucketMetadata& other) const {
- return _metadataElement.binaryEqualValues(other._metadataElement);
-}
-
-const BSONObj& BucketCatalog::BucketMetadata::toBSON() const {
- // Should only be called after the metadata is owned.
- invariant(_copied);
- return _metadata;
-}
-
-const BSONElement BucketCatalog::BucketMetadata::getMetaElement() const {
- return _metadataElement;
-}
-
-StringData BucketCatalog::BucketMetadata::getMetaField() const {
- return StringData(_metadataElement.fieldName());
-}
-
-const StringData::ComparatorInterface* BucketCatalog::BucketMetadata::getComparator() const {
- return _comparator;
-}
-
-BucketCatalog::Bucket::Bucket(const OID& id) : _id(id) {}
-
-const OID& BucketCatalog::Bucket::id() const {
- return _id;
-}
-
-StringData BucketCatalog::Bucket::getTimeField() {
- return _timeField;
-}
-
-void BucketCatalog::Bucket::_calculateBucketFieldsAndSizeChange(
- const BSONObj& doc,
- boost::optional<StringData> metaField,
- NewFieldNames* newFieldNamesToBeInserted,
- uint32_t* newFieldNamesSize,
- uint32_t* sizeToBeAdded) const {
- // BSON size for an object with an empty object field where field name is empty string.
- // We can use this as an offset to know the size when we have real field names.
- static constexpr int emptyObjSize = 12;
- // Validate in debug builds that this size is correct
- dassert(emptyObjSize == BSON("" << BSONObj()).objsize());
-
- newFieldNamesToBeInserted->clear();
- *newFieldNamesSize = 0;
- *sizeToBeAdded = 0;
- auto numMeasurementsFieldLength = numDigits(_numMeasurements);
- for (const auto& elem : doc) {
- auto fieldName = elem.fieldNameStringData();
- if (fieldName == metaField) {
- // Ignore the metadata field since it will not be inserted.
- continue;
- }
-
- // If the field name is new, add the size of an empty object with that field name.
- auto hashedKey = StringSet::hasher().hashed_key(fieldName);
- if (!_fieldNames.contains(hashedKey)) {
- newFieldNamesToBeInserted->push_back(hashedKey);
- *newFieldNamesSize += elem.fieldNameSize();
- *sizeToBeAdded += emptyObjSize + fieldName.size();
- }
-
- // Add the element size, taking into account that the name will be changed to its
- // positional number. Add 1 to the calculation since the element's field name size
- // accounts for a null terminator whereas the stringified position does not.
- *sizeToBeAdded += elem.size() - elem.fieldNameSize() + numMeasurementsFieldLength + 1;
- }
-}
-
-bool BucketCatalog::Bucket::_hasBeenCommitted() const {
- return _numCommittedMeasurements != 0 || _preparedBatch;
-}
-
-bool BucketCatalog::Bucket::allCommitted() const {
- return _batches.empty() && !_preparedBatch;
-}
-
-uint32_t BucketCatalog::Bucket::numMeasurements() const {
- return _numMeasurements;
-}
-
-std::shared_ptr<BucketCatalog::WriteBatch> BucketCatalog::Bucket::_activeBatch(
- OperationId opId, const std::shared_ptr<ExecutionStats>& stats) {
- auto it = _batches.find(opId);
- if (it == _batches.end()) {
- it = _batches.try_emplace(opId, std::make_shared<WriteBatch>(_id, opId, stats)).first;
- }
- return it->second;
-}
-
-BucketCatalog::BucketAccess::BucketAccess(BucketCatalog* catalog,
- BucketKey& key,
- const TimeseriesOptions& options,
- ExecutionStats* stats,
- ClosedBuckets* closedBuckets,
- const Date_t& time)
- : _catalog(catalog), _key(&key), _options(&options), _stats(stats), _time(&time) {
-
- auto bucketFound = [](BucketState bucketState) {
- return bucketState == BucketState::kNormal || bucketState == BucketState::kPrepared;
- };
-
- // First we try to find the bucket without normalizing the key as the normalization is an
- // expensive operation.
- auto hashedKey = BucketHasher{}.hashed_key(key);
- if (bucketFound(_findOpenBucketThenLock(hashedKey))) {
- return;
- }
-
- // If not found, we normalize the metadata object and try to find it again.
- // Save a copy of the non-normalized metadata before normalizing so we can add this key if the
- // bucket was found for the normalized metadata. The BSON element is still refering to that of
- // the document in current scope at this point. We will only make a copy of it when we decide to
- // store it.
- BSONElement nonNormalizedMetadata = key.metadata.getMetaElement();
- key.metadata.normalize();
- HashedBucketKey hashedNormalizedKey = BucketHasher{}.hashed_key(key);
-
- if (bucketFound(_findOpenBucketThenLock(hashedNormalizedKey))) {
- // Bucket found, check if we have available slots to store the non-normalized key
- if (_bucket->_nonNormalizedKeyMetadatas.size() ==
- _bucket->_nonNormalizedKeyMetadatas.capacity()) {
- return;
- }
-
- // Release the bucket as we need to acquire the exclusive lock for the catalog.
- release();
-
- // Re-construct the key as it were before normalization.
- auto originalBucketKey = nonNormalizedMetadata
- ? key.withCopiedMetadata(nonNormalizedMetadata.wrap())
- : key.withCopiedMetadata(BSONObj());
- hashedKey.key = &originalBucketKey;
-
- // Find the bucket under a catalog exclusive lock for the catalog. It may have been modified
- // since we released our locks. If found we store the key to avoid the need to normalize for
- // future lookups with this incoming field order.
- BSONObj nonNormalizedMetadataObj =
- nonNormalizedMetadata ? nonNormalizedMetadata.wrap() : BSONObj();
- if (bucketFound(_findOpenBucketThenLockAndStoreKey(
- hashedNormalizedKey, hashedKey, nonNormalizedMetadataObj))) {
- return;
- }
- }
-
- // Bucket not found, grab exclusive lock and create bucket with the key before normalization.
- auto originalBucketKey = nonNormalizedMetadata
- ? key.withCopiedMetadata(nonNormalizedMetadata.wrap())
- : key.withCopiedMetadata(BSONObj());
- hashedKey.key = &originalBucketKey;
- auto lk = _catalog->_lockExclusive();
- _findOrCreateOpenBucketThenLock(hashedNormalizedKey, hashedKey, closedBuckets);
-}
-
-BucketCatalog::BucketAccess::BucketAccess(BucketCatalog* catalog,
- const OID& bucketId,
- boost::optional<BucketState> targetState)
- : _catalog(catalog) {
- invariant(!targetState || targetState == BucketState::kNormal ||
- targetState == BucketState::kPrepared);
-
- {
- auto lk = _catalog->_lockShared();
- auto bucketIt = _catalog->_allBuckets.find(bucketId);
- if (bucketIt == _catalog->_allBuckets.end()) {
- return;
- }
-
- _bucket = bucketIt->second.get();
- _acquire();
- }
-
- auto state =
- targetState ? _catalog->_setBucketState(_bucket->_id, *targetState) : _getBucketState();
- if (!state || state == BucketState::kCleared || state == BucketState::kPreparedAndCleared) {
- release();
- }
-}
-
-BucketCatalog::BucketAccess::~BucketAccess() {
- if (isLocked()) {
- release();
- }
-}
-
-boost::optional<BucketCatalog::BucketState> BucketCatalog::BucketAccess::_getBucketState() const {
- stdx::lock_guard lk{_catalog->_statesMutex};
- auto it = _catalog->_bucketStates.find(_bucket->_id);
- return it != _catalog->_bucketStates.end() ? boost::make_optional(it->second) : boost::none;
-}
-
-BucketCatalog::BucketState BucketCatalog::BucketAccess::_findOpenBucketThenLock(
- const HashedBucketKey& key) {
- {
- auto lk = _catalog->_lockShared();
- auto it = _catalog->_openBuckets.find(key);
- if (it == _catalog->_openBuckets.end()) {
- // Bucket does not exist.
- return BucketState::kCleared;
- }
-
- _bucket = it->second;
- _acquire();
- }
-
- return _confirmStateForAcquiredBucket();
-}
-
-BucketCatalog::BucketState BucketCatalog::BucketAccess::_findOpenBucketThenLockAndStoreKey(
- const HashedBucketKey& normalizedKey,
- const HashedBucketKey& nonNormalizedKey,
- BSONObj nonNormalizedMetadata) {
- invariant(!isLocked());
- {
- auto lk = _catalog->_lockExclusive();
- auto it = _catalog->_openBuckets.find(normalizedKey);
- if (it == _catalog->_openBuckets.end()) {
- // Bucket does not exist.
- return BucketState::kCleared;
- }
-
- _bucket = it->second;
- _acquire();
-
- // Store the non-normalized key if we still have free slots
- if (_bucket->_nonNormalizedKeyMetadatas.size() <
- _bucket->_nonNormalizedKeyMetadatas.capacity()) {
- auto [_, inserted] =
- _catalog->_openBuckets.insert(std::make_pair(nonNormalizedKey, _bucket));
- if (inserted) {
- _bucket->_nonNormalizedKeyMetadatas.push_back(nonNormalizedMetadata);
- // Increment the memory usage to store this key and value in _openBuckets
- _bucket->_memoryUsage += nonNormalizedKey.key->ns.size() +
- nonNormalizedMetadata.objsize() + sizeof(_bucket);
+class BucketCatalog::ServerStatus : public ServerStatusSection {
+ struct BucketCounts {
+ BucketCounts& operator+=(const BucketCounts& other) {
+ if (&other != this) {
+ all += other.all;
+ open += other.open;
+ idle += other.idle;
}
+ return *this;
}
- }
-
- return _confirmStateForAcquiredBucket();
-}
-
-BucketCatalog::BucketState BucketCatalog::BucketAccess::_confirmStateForAcquiredBucket() {
- auto state = *_getBucketState();
- if (state == BucketState::kCleared || state == BucketState::kPreparedAndCleared) {
- release();
- } else {
- _catalog->_markBucketNotIdle(_bucket, false /* locked */);
- }
-
- return state;
-}
-
-void BucketCatalog::BucketAccess::_findOrCreateOpenBucketThenLock(
- const HashedBucketKey& normalizedKey,
- const HashedBucketKey& nonNormalizedKey,
- ClosedBuckets* closedBuckets) {
- auto it = _catalog->_openBuckets.find(normalizedKey);
- if (it == _catalog->_openBuckets.end()) {
- // No open bucket for this metadata.
- _create(normalizedKey, nonNormalizedKey, closedBuckets);
- return;
- }
-
- _bucket = it->second;
- _acquire();
-
- auto state = *_getBucketState();
- if (state == BucketState::kNormal || state == BucketState::kPrepared) {
- _catalog->_markBucketNotIdle(_bucket, false /* locked */);
- return;
- }
-
- _catalog->_abort(_guard, _bucket, nullptr, boost::none);
- _create(normalizedKey, nonNormalizedKey, closedBuckets);
-}
-
-void BucketCatalog::BucketAccess::_acquire() {
- invariant(_bucket);
- _guard = stdx::unique_lock<Mutex>(_bucket->_mutex);
-}
-
-void BucketCatalog::BucketAccess::_create(const HashedBucketKey& normalizedKey,
- const HashedBucketKey& nonNormalizedKey,
- ClosedBuckets* closedBuckets,
- bool openedDuetoMetadata) {
- invariant(_options);
- _bucket = _catalog->_allocateBucket(
- normalizedKey, *_time, *_options, _stats, closedBuckets, openedDuetoMetadata);
- _catalog->_openBuckets[nonNormalizedKey] = _bucket;
- _bucket->_nonNormalizedKeyMetadatas.push_back(nonNormalizedKey.key->metadata.toBSON());
- _acquire();
-}
-
-void BucketCatalog::BucketAccess::release() {
- invariant(_guard.owns_lock());
- _guard.unlock();
- _bucket = nullptr;
-}
-
-bool BucketCatalog::BucketAccess::isLocked() const {
- return _bucket && _guard.owns_lock();
-}
-
-BucketCatalog::Bucket* BucketCatalog::BucketAccess::operator->() {
- invariant(isLocked());
- return _bucket;
-}
-
-BucketCatalog::BucketAccess::operator bool() const {
- return isLocked();
-}
-
-BucketCatalog::BucketAccess::operator BucketCatalog::Bucket*() const {
- return _bucket;
-}
-
-bool BucketCatalog::BucketAccess::schemaIncompatible(
- const BSONObj& input,
- boost::optional<StringData> metaField,
- const StringData::ComparatorInterface* comparator) {
- // (Generic FCV reference): TODO (SERVER-60912): Update once kLastLTS is 6.0
- if (serverGlobalParams.featureCompatibility.getVersion() ==
- multiversion::GenericFCV::kLastLTS) {
- return false;
- }
-
- auto result = _bucket->_schema.update(input, metaField, comparator);
- return (result == timeseries::Schema::UpdateStatus::Failed);
-}
-
-void BucketCatalog::BucketAccess::rollover(
- const std::function<bool(BucketAccess*)>& shouldCloseBucket, ClosedBuckets* closedBuckets) {
- invariant(isLocked());
- invariant(_key);
- invariant(_time);
-
- auto oldId = _bucket->id();
- release();
-
- // Precompute the hash outside the lock, since it's expensive.
- auto prevMetadata = _key->metadata.getMetaElement();
- _key->metadata.normalize();
- auto hashedNormalizedKey = BucketHasher{}.hashed_key(*_key);
- auto prevBucketKey = prevMetadata ? _key->withCopiedMetadata(prevMetadata.wrap())
- : _key->withCopiedMetadata(BSONObj());
- auto hashedKey = BucketHasher{}.hashed_key(prevBucketKey);
-
- auto lk = _catalog->_lockExclusive();
- _findOrCreateOpenBucketThenLock(hashedNormalizedKey, hashedKey, closedBuckets);
-
- // Recheck if still full now that we've reacquired the bucket.
- bool sameBucket =
- oldId == _bucket->id(); // Only record stats if bucket has changed, don't double-count.
- if (sameBucket || shouldCloseBucket(this)) {
- // The bucket is indeed full, so create a new one.
- if (_bucket->allCommitted()) {
- // The bucket does not contain any measurements that are yet to be committed, so we can
- // remove it now. Otherwise, we must keep the bucket around until it is committed.
- closedBuckets->push_back(ClosedBucket{
- _bucket->id(), _bucket->getTimeField().toString(), _bucket->numMeasurements()});
-
- Bucket* oldBucket = _bucket;
- release();
- bool removed = _catalog->_removeBucket(oldBucket, false /* expiringBuckets */);
- invariant(removed);
- } else {
- _bucket->_full = true;
-
- // We will recreate a new bucket for the same key below. We also need to cleanup all
- // extra metadata keys added for the old bucket instance.
- _catalog->_removeNonNormalizedKeysForBucket(_bucket);
- release();
- }
-
- _create(hashedNormalizedKey, hashedKey, closedBuckets, false /* openedDueToMetadata */);
- }
-}
-
-Date_t BucketCatalog::BucketAccess::getTime() const {
- return _bucket->id().asDateT();
-}
-
-BucketCatalog::WriteBatch::WriteBatch(const OID& bucketId,
- OperationId opId,
- const std::shared_ptr<ExecutionStats>& stats)
- : _bucketId{bucketId}, _opId(opId), _stats{stats} {}
-
-bool BucketCatalog::WriteBatch::claimCommitRights() {
- return !_commitRights.swap(true);
-}
-
-StatusWith<BucketCatalog::CommitInfo> BucketCatalog::WriteBatch::getResult() const {
- if (!_promise.getFuture().isReady()) {
- _stats->numWaits.fetchAndAddRelaxed(1);
- }
- return _promise.getFuture().getNoThrow();
-}
-
-const OID& BucketCatalog::WriteBatch::bucketId() const {
- return _bucketId;
-}
-
-const std::vector<BSONObj>& BucketCatalog::WriteBatch::measurements() const {
- invariant(!_active);
- return _measurements;
-}
-
-const BSONObj& BucketCatalog::WriteBatch::min() const {
- invariant(!_active);
- return _min;
-}
-
-const BSONObj& BucketCatalog::WriteBatch::max() const {
- invariant(!_active);
- return _max;
-}
-
-const StringMap<std::size_t>& BucketCatalog::WriteBatch::newFieldNamesToBeInserted() const {
- invariant(!_active);
- return _newFieldNamesToBeInserted;
-}
-
-uint32_t BucketCatalog::WriteBatch::numPreviouslyCommittedMeasurements() const {
- invariant(!_active);
- return _numPreviouslyCommittedMeasurements;
-}
-
-bool BucketCatalog::WriteBatch::active() const {
- return _active;
-}
-
-bool BucketCatalog::WriteBatch::finished() const {
- return _promise.getFuture().isReady();
-}
-
-BSONObj BucketCatalog::WriteBatch::toBSON() const {
- auto toFieldName = [](const auto& nameHashPair) { return nameHashPair.first; };
- return BSON("docs" << _measurements << "bucketMin" << _min << "bucketMax" << _max
- << "numCommittedMeasurements" << int(_numPreviouslyCommittedMeasurements)
- << "newFieldNamesToBeInserted"
- << std::set<std::string>(
- boost::make_transform_iterator(_newFieldNamesToBeInserted.begin(),
- toFieldName),
- boost::make_transform_iterator(_newFieldNamesToBeInserted.end(),
- toFieldName)));
-}
-void BucketCatalog::WriteBatch::_addMeasurement(const BSONObj& doc) {
- invariant(_active);
- _measurements.push_back(doc);
-}
-
-void BucketCatalog::WriteBatch::_recordNewFields(NewFieldNames&& fields) {
- invariant(_active);
- for (auto&& field : fields) {
- _newFieldNamesToBeInserted[field] = field.hash();
- }
-}
-
-void BucketCatalog::WriteBatch::_prepareCommit(Bucket* bucket) {
- invariant(_commitRights.load());
- invariant(_active);
- _active = false;
- _numPreviouslyCommittedMeasurements = bucket->_numCommittedMeasurements;
+ std::size_t all = 0;
+ std::size_t open = 0;
+ std::size_t idle = 0;
+ };
- // Filter out field names that were new at the time of insertion, but have since been committed
- // by someone else.
- for (auto it = _newFieldNamesToBeInserted.begin(); it != _newFieldNamesToBeInserted.end();) {
- StringMapHashedKey fieldName(it->first, it->second);
- if (bucket->_fieldNames.contains(fieldName)) {
- _newFieldNamesToBeInserted.erase(it++);
- continue;
+ BucketCounts _getBucketCounts(const BucketCatalog& catalog) const {
+ BucketCounts sum;
+ for (auto const& stripe : catalog._stripes) {
+ stdx::lock_guard stripeLock{stripe.mutex};
+ sum += {stripe.allBuckets.size(), stripe.openBuckets.size(), stripe.idleBuckets.size()};
}
-
- bucket->_fieldNames.emplace(fieldName);
- ++it;
- }
-
- for (const auto& doc : _measurements) {
- bucket->_minmax.update(
- doc, bucket->_metadata.getMetaField(), bucket->_metadata.getComparator());
- }
-
- const bool isUpdate = _numPreviouslyCommittedMeasurements > 0;
- if (isUpdate) {
- _min = bucket->_minmax.minUpdates();
- _max = bucket->_minmax.maxUpdates();
- } else {
- _min = bucket->_minmax.min();
- _max = bucket->_minmax.max();
-
- // Approximate minmax memory usage by taking sizes of initial commit. Subsequent updates may
- // add fields but are most likely just to update values.
- bucket->_memoryUsage += _min.objsize();
- bucket->_memoryUsage += _max.objsize();
-
- // We don't have a great approximation for the memory usage of _schema, so we use the max as
- // a stand-in.
- bucket->_memoryUsage += _max.objsize();
- }
-}
-
-void BucketCatalog::WriteBatch::_finish(const CommitInfo& info) {
- invariant(_commitRights.load());
- invariant(!_active);
- _promise.emplaceValue(info);
-}
-
-void BucketCatalog::WriteBatch::_abort(const boost::optional<Status>& status,
- const Bucket* bucket) {
- if (finished()) {
- return;
+ return sum;
}
- _active = false;
- std::string nsIdentification;
- if (bucket) {
- nsIdentification.append(str::stream() << " for namespace " << bucket->_ns);
- }
- _promise.setError(status.value_or(Status{ErrorCodes::TimeseriesBucketCleared,
- str::stream() << "Time-series bucket " << _bucketId
- << nsIdentification << " was cleared"}));
-}
-
-class BucketCatalog::ServerStatus : public ServerStatusSection {
public:
ServerStatus() : ServerStatusSection("bucketCatalog") {}
@@ -1339,20 +1247,17 @@ public:
BSONObj generateSection(OperationContext* opCtx, const BSONElement&) const override {
const auto& bucketCatalog = BucketCatalog::get(opCtx);
{
- auto statsLk = bucketCatalog._statsMutex.lockShared();
+ stdx::lock_guard catalogLock{bucketCatalog._mutex};
if (bucketCatalog._executionStats.empty()) {
return {};
}
}
- auto lk = bucketCatalog._lockShared();
+ auto counts = _getBucketCounts(bucketCatalog);
BSONObjBuilder builder;
- builder.appendNumber("numBuckets",
- static_cast<long long>(bucketCatalog._allBuckets.size()));
- builder.appendNumber("numOpenBuckets",
- static_cast<long long>(bucketCatalog._openBuckets.size()));
- builder.appendNumber("numIdleBuckets",
- static_cast<long long>(bucketCatalog._numberOfIdleBuckets()));
+ builder.appendNumber("numBuckets", static_cast<long long>(counts.all));
+ builder.appendNumber("numOpenBuckets", static_cast<long long>(counts.open));
+ builder.appendNumber("numIdleBuckets", static_cast<long long>(counts.idle));
builder.appendNumber("memoryUsage",
static_cast<long long>(bucketCatalog._memoryUsage.load()));
return builder.obj();
diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h
index 3b5b4e752a7..b33a9025b3c 100644
--- a/src/mongo/db/timeseries/bucket_catalog.h
+++ b/src/mongo/db/timeseries/bucket_catalog.h
@@ -46,15 +46,22 @@
namespace mongo {
class BucketCatalog {
- struct ExecutionStats;
-
// Number of new field names we can hold in NewFieldNames without needing to allocate memory.
static constexpr std::size_t kNumStaticNewFields = 10;
using NewFieldNames = boost::container::small_vector<StringMapHashedKey, kNumStaticNewFields>;
-public:
+ using StripeNumber = std::uint8_t;
+
+ struct BucketHandle {
+ const OID id;
+ const StripeNumber stripe;
+ };
+
+ struct ExecutionStats;
class Bucket;
+ struct CreationInfo;
+public:
enum class CombineWithInsertsFromOtherClients {
kAllow,
kDisallow,
@@ -66,7 +73,7 @@ public:
};
/**
- * Information of a Bucket that got closed when performing an operation on this BucketCatalog.
+ * Information of a Bucket that got closed while performing an operation on this BucketCatalog.
*/
struct ClosedBucket {
OID bucketId;
@@ -91,24 +98,28 @@ public:
public:
WriteBatch() = delete;
- WriteBatch(const OID& bucketId,
+ WriteBatch(const BucketHandle& bucketId,
OperationId opId,
const std::shared_ptr<ExecutionStats>& stats);
/**
- * Attempt to claim the right to commit (or abort) a batch. If it returns true, rights are
+ * Attempts to claim the right to commit a batch. If it returns true, rights are
* granted. If it returns false, rights are revoked, and the caller should get the result
* of the batch with getResult(). Non-blocking.
*/
bool claimCommitRights();
/**
- * Retrieve the result of the write batch commit. Should be called by any interested party
+ * Retrieves the result of the write batch commit. Should be called by any interested party
* that does not have commit rights. Blocking.
*/
StatusWith<CommitInfo> getResult() const;
- const OID& bucketId() const;
+ /**
+ * Returns a handle which can be used by the BucketCatalog internally to locate its record
+ * for this bucket.
+ */
+ const BucketHandle& bucket() const;
const std::vector<BSONObj>& measurements() const;
const BSONObj& min() const;
@@ -117,12 +128,7 @@ public:
uint32_t numPreviouslyCommittedMeasurements() const;
/**
- * Whether the batch is active and can be written to.
- */
- bool active() const;
-
- /**
- * Whether the batch has been committed or aborted.
+ * Returns whether the batch has already been committed or aborted.
*/
bool finished() const;
@@ -130,36 +136,36 @@ public:
private:
/**
- * Add a measurement. Active batches only.
+ * Adds a measurement. Active batches only.
*/
void _addMeasurement(const BSONObj& doc);
/**
- * Record a set of new-to-the-bucket fields. Active batches only.
+ * Records a set of new-to-the-bucket fields. Active batches only.
*/
void _recordNewFields(NewFieldNames&& fields);
/**
- * Prepare the batch for commit. Sets min/max appropriately, records the number of documents
- * that have previously been committed to the bucket, and marks the batch inactive. Must
- * have commit rights.
+ * Prepares the batch for commit. Sets min/max appropriately, records the number of
+ * documents that have previously been committed to the bucket, and renders the batch
+ * inactive. Must have commit rights.
*/
void _prepareCommit(Bucket* bucket);
/**
- * Report the result and status of a commit, and notify anyone waiting on getResult(). Must
- * have commit rights. Inactive batches only.
+ * Reports the result and status of a commit, and notifies anyone waiting on getResult().
+ * Must have commit rights. Inactive batches only.
*/
void _finish(const CommitInfo& info);
/**
- * Abandon the write batch and notify any waiters that the bucket has been cleared. Must
- * have commit rights. Parameter 'bucket' provides a pointer to the bucket if still
- * available, nullptr otherwise.
+ * Abandons the write batch and notifies any waiters that the bucket has been cleared.
+ * Parameter 'bucket' provides a pointer to the bucket if still available, nullptr
+ * otherwise.
*/
void _abort(const boost::optional<Status>& status, const Bucket* bucket);
- const OID _bucketId;
+ const BucketHandle _bucket;
OperationId _opId;
std::shared_ptr<ExecutionStats> _stats;
@@ -169,15 +175,12 @@ public:
uint32_t _numPreviouslyCommittedMeasurements = 0;
StringMap<std::size_t> _newFieldNamesToBeInserted; // Value is hash of string key
- bool _active = true;
-
AtomicWord<bool> _commitRights{false};
SharedPromise<CommitInfo> _promise;
};
/**
- * Return type for the insert function.
- * See comment above insert() for more information.
+ * Return type for the insert function. See insert() for more information.
*/
struct InsertResult {
std::shared_ptr<WriteBatch> batch;
@@ -200,12 +203,13 @@ public:
* Returns an empty document if the given bucket cannot be found or if this time-series
* collection was not created with a metadata field name.
*/
- BSONObj getMetadata(const OID& bucketId) const;
+ BSONObj getMetadata(const BucketHandle& bucket) const;
/**
- * Returns the WriteBatch into which the document was inserted and optional information about a
- * bucket if one was closed. Any caller who receives the same batch may commit or abort the
- * batch after claiming commit rights. See WriteBatch for more details.
+ * Returns the WriteBatch into which the document was inserted and a list of any buckets that
+ * were closed in order to make space to insert the document. Any caller who receives the same
+ * batch may commit or abort the batch after claiming commit rights. See WriteBatch for more
+ * details.
*/
StatusWith<InsertResult> insert(OperationContext* opCtx,
const NamespaceString& ns,
@@ -230,9 +234,8 @@ public:
boost::optional<ClosedBucket> finish(std::shared_ptr<WriteBatch> batch, const CommitInfo& info);
/**
- * Aborts the given write batch and any other outstanding batches on the same bucket. Caller
- * must already have commit rights on batch. Uses the provided status when clearing the bucket,
- * or TimeseriesBucketCleared if not provided.
+ * Aborts the given write batch and any other outstanding batches on the same bucket. Uses the
+ * provided status when clearing the bucket, or TimeseriesBucketCleared if not provided.
*/
void abort(std::shared_ptr<WriteBatch> batch,
const boost::optional<Status>& status = boost::none);
@@ -264,40 +267,18 @@ public:
void appendExecutionStats(const NamespaceString& ns, BSONObjBuilder* builder) const;
private:
- /**
- * This class provides a mutex with shared and exclusive locking semantics. Unlike some shared
- * mutex implementations, it does not allow for writer starvation (assuming the underlying
- * Mutex implemenation does not allow for starvation). The underlying mechanism is simply an
- * array of Mutex instances. To take a shared lock, a thread's ID is hashed, mapping the thread
- * to a particular mutex, which is then locked. To take an exclusive lock, all mutexes are
- * locked.
- *
- * This behavior makes it easy to allow concurrent read access while still allowing writes to
- * occur safely with exclusive access. It should only be used for situations where observed
- * access patterns are read-mostly.
- *
- * A shared lock *cannot* be upgraded to an exclusive lock.
- */
- class StripedMutex {
- public:
- static constexpr std::size_t kNumStripes = 16;
- StripedMutex() = default;
-
- using SharedLock = stdx::unique_lock<Mutex>;
- SharedLock lockShared() const;
-
- class ExclusiveLock {
- public:
- ExclusiveLock() = default;
- explicit ExclusiveLock(const StripedMutex&);
-
- private:
- std::array<stdx::unique_lock<Mutex>, kNumStripes> _locks;
- };
- ExclusiveLock lockExclusive() const;
-
- private:
- mutable std::array<Mutex, kNumStripes> _mutexes;
+ enum class BucketState {
+ // Bucket can be inserted into, and does not have an outstanding prepared commit
+ kNormal,
+ // Bucket can be inserted into, and has a prepared commit outstanding.
+ kPrepared,
+ // Bucket can no longer be inserted into, does not have an outstanding prepared
+ // commit.
+ kCleared,
+ // Bucket can no longer be inserted into, but still has an outstanding
+ // prepared commit. Any writer other than the one who prepared the
+ // commit should receive a WriteConflictException.
+ kPreparedAndCleared,
};
struct BucketMetadata {
@@ -305,24 +286,10 @@ private:
BucketMetadata() = default;
BucketMetadata(BSONElement elem, const StringData::ComparatorInterface* comparator);
- // Constructs with a copy of the metadata.
- BucketMetadata(BSONElement elem,
- BSONObj obj,
- const StringData::ComparatorInterface* comparator,
- bool normalized = false,
- bool copied = true);
-
- bool normalized() const {
- return _normalized;
- }
- void normalize();
-
bool operator==(const BucketMetadata& other) const;
const BSONObj& toBSON() const;
- const BSONElement getMetaElement() const;
-
StringData getMetaField() const;
const StringData::ComparatorInterface* getComparator() const;
@@ -337,177 +304,24 @@ private:
private:
// Only the value of '_metadataElement' is used for hashing and comparison.
- // When BucketMetadata does not own the '_metadata', only '_metadataElement' will be present
- // and used to look up buckets. After owning the '_metadata,' the field should refer to the
- // BSONElement within '_metadata'.
BSONElement _metadataElement;
- // Empty when just looking up buckets. Owns a copy when the field is present.
+ // Empty if metadata field isn't present, owns a copy otherwise.
BSONObj _metadata;
- const StringData::ComparatorInterface* _comparator = nullptr;
- bool _normalized = false;
- bool _copied = false;
- };
-
- using IdleList = std::list<Bucket*>;
-
-public:
- class Bucket {
- public:
- friend class BucketAccess;
- friend class BucketCatalog;
-
- Bucket(const OID& id);
- /**
- * Returns the ID for the underlying bucket.
- */
- const OID& id() const;
-
- /**
- * Returns the timefield for the underlying bucket.
- */
- StringData getTimeField();
-
- /**
- * Returns whether all measurements have been committed.
- */
- bool allCommitted() const;
-
- /**
- * Returns total number of measurements in the bucket.
- */
- uint32_t numMeasurements() const;
-
- private:
- /**
- * Determines the effect of adding 'doc' to this bucket. If adding 'doc' causes this bucket
- * to overflow, we will create a new bucket and recalculate the change to the bucket size
- * and data fields.
- */
- void _calculateBucketFieldsAndSizeChange(const BSONObj& doc,
- boost::optional<StringData> metaField,
- NewFieldNames* newFieldNamesToBeInserted,
- uint32_t* newFieldNamesSize,
- uint32_t* sizeToBeAdded) const;
-
- /**
- * Returns whether BucketCatalog::commit has been called at least once on this bucket.
- */
- bool _hasBeenCommitted() const;
-
- /**
- * Return a pointer to the current, open batch.
- */
- std::shared_ptr<WriteBatch> _activeBatch(OperationId opId,
- const std::shared_ptr<ExecutionStats>& stats);
-
- // Access to the bucket is controlled by this lock
- mutable Mutex _mutex =
- MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(2), "BucketCatalog::Bucket::_mutex");
-
- // The bucket ID for the underlying document
- const OID _id;
-
- // The namespace that this bucket is used for.
- NamespaceString _ns;
-
- // The metadata of the data that this bucket contains.
- BucketMetadata _metadata;
-
- // Extra metadata combinations that are supported without normalizing the metadata object.
- static constexpr std::size_t kNumFieldOrderCombinationsWithoutNormalizing = 1;
- boost::container::static_vector<BSONObj, kNumFieldOrderCombinationsWithoutNormalizing>
- _nonNormalizedKeyMetadatas;
-
- // Top-level field names of the measurements that have been inserted into the bucket.
- StringSet _fieldNames;
-
- // Time field for the measurements that have been inserted into the bucket.
- std::string _timeField;
-
- // The minimum and maximum values for each field in the bucket.
- timeseries::MinMax _minmax;
-
- // The reference schema for measurements in this bucket. May reflect schema of uncommitted
- // measurements.
- timeseries::Schema _schema;
-
- // The latest time that has been inserted into the bucket.
- Date_t _latestTime;
-
- // The total size in bytes of the bucket's BSON serialization, including measurements to be
- // inserted.
- uint64_t _size = 0;
-
- // The total number of measurements in the bucket, including uncommitted measurements and
- // measurements to be inserted.
- uint32_t _numMeasurements = 0;
-
- // The number of committed measurements in the bucket.
- uint32_t _numCommittedMeasurements = 0;
-
- // Whether the bucket is full. This can be due to number of measurements, size, or time
- // range.
- bool _full = false;
-
- // The batch that has been prepared and is currently in the process of being committed, if
- // any.
- std::shared_ptr<WriteBatch> _preparedBatch;
-
- // Batches, per operation, that haven't been committed or aborted yet.
- stdx::unordered_map<OperationId, std::shared_ptr<WriteBatch>> _batches;
-
- // If the bucket is in _idleBuckets, then its position is recorded here.
- boost::optional<IdleList::iterator> _idleListEntry = boost::none;
-
- // Approximate memory usage of this bucket.
- uint64_t _memoryUsage = sizeof(*this);
- };
-
-private:
- struct ExecutionStats {
- AtomicWord<long long> numBucketInserts;
- AtomicWord<long long> numBucketUpdates;
- AtomicWord<long long> numBucketsOpenedDueToMetadata;
- AtomicWord<long long> numBucketsClosedDueToCount;
- AtomicWord<long long> numBucketsClosedDueToSchemaChange;
- AtomicWord<long long> numBucketsClosedDueToSize;
- AtomicWord<long long> numBucketsClosedDueToTimeForward;
- AtomicWord<long long> numBucketsClosedDueToTimeBackward;
- AtomicWord<long long> numBucketsClosedDueToMemoryThreshold;
- AtomicWord<long long> numCommits;
- AtomicWord<long long> numWaits;
- AtomicWord<long long> numMeasurementsCommitted;
- };
-
- enum class BucketState {
- // Bucket can be inserted into, and does not have an outstanding prepared commit
- kNormal,
- // Bucket can be inserted into, and has a prepared commit outstanding.
- kPrepared,
- // Bucket can no longer be inserted into, does not have an outstanding prepared
- // commit.
- kCleared,
- // Bucket can no longer be inserted into, but still has an outstanding
- // prepared commit. Any writer other than the one who prepared the
- // commit should receive a WriteConflictException.
- kPreparedAndCleared,
+ const StringData::ComparatorInterface* _comparator = nullptr;
};
/**
- * Key to lookup open Bucket for namespace and metadata.
+ * Key to lookup open Bucket for namespace and metadata, with pre-computed hash.
*/
struct BucketKey {
+ BucketKey() = delete;
+ BucketKey(const NamespaceString& nss, const BucketMetadata& meta);
+
NamespaceString ns;
BucketMetadata metadata;
-
- /**
- * Creates a new BucketKey with a different internal metadata object.
- */
- BucketKey withCopiedMetadata(BSONObj meta) const {
- return {ns, {meta.firstElement(), meta, metadata.getComparator()}};
- }
+ std::size_t hash;
bool operator==(const BucketKey& other) const {
return ns == other.ns && metadata == other.metadata;
@@ -520,235 +334,154 @@ private:
};
/**
- * BucketKey with pre-calculated hash. To avoiding calculating the hash while holding locks.
- *
- * The unhashed BucketKey is stored inside HashedBucketKey by reference and must not go out of
- * scope for the lifetime of the returned HashedBucketKey.
+ * Hasher to support pre-computed hash lookup for BucketKey.
*/
- struct HashedBucketKey {
- operator BucketKey() const {
- return *key;
- }
- const BucketKey* key;
- std::size_t hash;
+ struct BucketHasher {
+ std::size_t operator()(const BucketKey& key) const;
};
/**
- * Hasher to support heterogeneous lookup for BucketKey and HashedBucketKey.
+ * Struct to hold a portion of the buckets managed by the catalog.
+ *
+ * Each of the bucket lists, as well as the buckets themselves, are protected by 'mutex'.
*/
- struct BucketHasher {
- // This using directive activates heterogeneous lookup in the hash table
- using is_transparent = void;
+ struct Stripe {
+ mutable Mutex mutex =
+ MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(1), "BucketCatalog::Stripe::mutex");
- std::size_t operator()(const BucketKey& key) const {
- // Use the default absl hasher.
- return absl::Hash<BucketKey>{}(key);
- }
+ // All buckets currently in the catalog, including buckets which are full but not yet
+ // committed.
+ stdx::unordered_map<OID, std::unique_ptr<Bucket>, OID::Hasher> allBuckets;
- std::size_t operator()(const HashedBucketKey& key) const {
- return key.hash;
- }
+ // The current open bucket for each namespace and metadata pair.
+ stdx::unordered_map<BucketKey, Bucket*, BucketHasher> openBuckets;
- /**
- * Pre-calculates a hashed BucketKey.
- */
- HashedBucketKey hashed_key(const BucketKey& key) {
- return HashedBucketKey{&key, operator()(key)};
- }
+ // Buckets that do not have any outstanding writes.
+ using IdleList = std::list<Bucket*>;
+ IdleList idleBuckets;
};
+ StripeNumber _getStripeNumber(const BucketKey& key);
+
/**
- * Equality, provides comparison between hashed and unhashed bucket keys.
+ * Mode enum to control whether the bucket retrieval methods below will return buckets that are
+ * in kCleared or kPreparedAndCleared state.
*/
- struct BucketEq {
- // This using directive activates heterogeneous lookup in the hash table
- using is_transparent = void;
-
- bool operator()(const BucketKey& lhs, const BucketKey& rhs) const {
- return lhs == rhs;
- }
- bool operator()(const BucketKey& lhs, const HashedBucketKey& rhs) const {
- return lhs == *rhs.key;
- }
- bool operator()(const HashedBucketKey& lhs, const BucketKey& rhs) const {
- return *lhs.key == rhs;
- }
- bool operator()(const HashedBucketKey& lhs, const HashedBucketKey& rhs) const {
- return *lhs.key == *rhs.key;
- }
- };
+ enum class ReturnClearedBuckets { kYes, kNo };
/**
- * Helper class to handle all the locking necessary to lookup and lock a bucket for use. This
- * is intended primarily for using a single bucket, including replacing it when it becomes full.
- * If the usage pattern iterates over several buckets, you will instead want to use raw access
- * using the different mutexes with the locking semantics described below.
+ * Retrieve a bucket for read-only use.
*/
- class BucketAccess {
- public:
- BucketAccess() = delete;
- BucketAccess(BucketCatalog* catalog,
- BucketKey& key,
- const TimeseriesOptions& options,
- ExecutionStats* stats,
- ClosedBuckets* closedBuckets,
- const Date_t& time);
- BucketAccess(BucketCatalog* catalog,
- const OID& bucketId,
- boost::optional<BucketState> targetState = boost::none);
- ~BucketAccess();
-
- bool isLocked() const;
- Bucket* operator->();
- operator bool() const;
- operator Bucket*() const;
-
- // Release the bucket lock, typically in order to reacquire the catalog lock.
- void release();
-
- /**
- * Determines if the schema for an incoming measurement is incompatible with those already
- * stored in the bucket.
- *
- * Returns true if incompatible
- */
- bool schemaIncompatible(const BSONObj& doc,
- boost::optional<StringData> metaField,
- const StringData::ComparatorInterface* comparator);
-
- /**
- * Close the existing, full bucket and open a new one for the same metadata.
- * Parameter is a function which should check that the bucket is indeed still full after
- * reacquiring the necessary locks. The first parameter will give the function access to
- * this BucketAccess instance, with the bucket locked.
- *
- * Returns bucket information of a bucket if one was closed.
- */
- void rollover(const std::function<bool(BucketAccess*)>& isBucketFull,
- ClosedBuckets* closedBuckets);
-
- // Retrieve the time associated with the bucket (id)
- Date_t getTime() const;
-
- private:
- /**
- * Returns the state of the bucket, or boost::none if there is no state for the bucket.
- */
- boost::optional<BucketState> _getBucketState() const;
-
- /**
- * Helper to find and lock an open bucket for the given metadata if it exists. Takes a
- * shared lock on the catalog. Returns the state of the bucket if it is locked and usable.
- * In case the bucket does not exist or was previously cleared and thus is not usable, the
- * return value will be BucketState::kCleared.
- */
- BucketState _findOpenBucketThenLock(const HashedBucketKey& key);
-
- /**
- * Same as _findOpenBucketThenLock above but takes an exclusive lock on the catalog. In
- * addition to finding the bucket it also store a non-normalized key if there are available
- * slots in the bucket.
- */
- BucketState _findOpenBucketThenLockAndStoreKey(const HashedBucketKey& normalizedKey,
- const HashedBucketKey& key,
- BSONObj metadata);
+ const Bucket* _findBucket(const Stripe& stripe,
+ WithLock stripeLock,
+ const OID& id,
+ ReturnClearedBuckets mode = ReturnClearedBuckets::kNo) const;
- /**
- * Helper to determine the state of the bucket that is found by _findOpenBucketThenLock and
- * _findOpenBucketThenLockAndStoreKey. Requires the bucket lock to be acquired before
- * calling this function and it may release the lock depending on the state.
- */
- BucketState _confirmStateForAcquiredBucket();
-
- // Helper to find an open bucket for the given metadata if it exists, create it if it
- // doesn't, and lock it. Requires an exclusive lock on the catalog.
- void _findOrCreateOpenBucketThenLock(const HashedBucketKey& normalizedKey,
- const HashedBucketKey& key,
- ClosedBuckets* closedBuckets);
-
- // Lock _bucket.
- void _acquire();
-
- // Allocate a new bucket in the catalog, set the local state to that bucket, and acquire
- // a lock on it.
- void _create(const HashedBucketKey& normalizedKey,
- const HashedBucketKey& key,
- ClosedBuckets* closedBuckets,
- bool openedDuetoMetadata = true);
-
- BucketCatalog* _catalog;
- BucketKey* _key = nullptr;
- const TimeseriesOptions* _options = nullptr;
- ExecutionStats* _stats = nullptr;
- const Date_t* _time = nullptr;
-
- Bucket* _bucket = nullptr;
- stdx::unique_lock<Mutex> _guard;
- };
+ /**
+ * Retrieve a bucket for write use.
+ */
+ Bucket* _useBucket(Stripe* stripe,
+ WithLock stripeLock,
+ const OID& id,
+ ReturnClearedBuckets mode);
- class ServerStatus;
+ /**
+ * Retrieve a bucket for write use, setting the state in the process.
+ */
+ Bucket* _useBucketInState(Stripe* stripe,
+ WithLock stripeLock,
+ const OID& id,
+ BucketState targetState);
- StripedMutex::SharedLock _lockShared() const;
- StripedMutex::ExclusiveLock _lockExclusive() const;
+ /**
+ * Retrieve a bucket for write use, or create one if a suitable bucket doesn't already exist.
+ */
+ Bucket* _useOrCreateBucket(Stripe* stripe, WithLock stripeLock, const CreationInfo& info);
- void _waitToCommitBatch(const std::shared_ptr<WriteBatch>& batch);
+ /**
+ * Wait for other batches to finish so we can prepare 'batch'
+ */
+ void _waitToCommitBatch(Stripe* stripe, const std::shared_ptr<WriteBatch>& batch);
/**
* Removes the given bucket from the bucket catalog's internal data structures.
*/
- bool _removeBucket(Bucket* bucket, bool expiringBuckets);
+ bool _removeBucket(Stripe* stripe, WithLock stripeLock, Bucket* bucket);
/**
- * Removes extra non-normalized BucketKey's for the given bucket from the
- * bucket catalog's internal data structures.
+ * Aborts 'batch', and if the corresponding bucket still exists, proceeds to abort any other
+ * unprepared batches and remove the bucket from the catalog if there is no unprepared batch.
*/
- void _removeNonNormalizedKeysForBucket(Bucket* bucket);
+ void _abort(Stripe* stripe,
+ WithLock stripeLock,
+ std::shared_ptr<WriteBatch> batch,
+ const boost::optional<Status>& status = boost::none);
/**
- * Aborts any batches it can for the given bucket, then removes the bucket. If batch is
- * non-null, it is assumed that the caller has commit rights for that batch.
+ * Aborts any unprepared batches for the given bucket, then removes the bucket if there is no
+ * prepared batch. If 'batch' is non-null, it is assumed that the caller has commit rights for
+ * that batch.
*/
- void _abort(stdx::unique_lock<Mutex>& lk,
+ void _abort(Stripe* stripe,
+ WithLock stripeLock,
Bucket* bucket,
std::shared_ptr<WriteBatch> batch,
const boost::optional<Status>& status);
/**
- * Adds the bucket to a list of idle buckets to be expired at a later date
+ * Adds the bucket to a list of idle buckets to be expired at a later date.
*/
- void _markBucketIdle(Bucket* bucket);
+ void _markBucketIdle(Stripe* stripe, WithLock stripeLock, Bucket* bucket);
/**
* Remove the bucket from the list of idle buckets. The second parameter encodes whether the
* caller holds a lock on _idleMutex.
*/
- void _markBucketNotIdle(Bucket* bucket, bool locked);
+ void _markBucketNotIdle(Stripe* stripe, WithLock stripeLock, Bucket* bucket);
/**
- * Verify the bucket is currently unused by taking a lock on it. Must hold exclusive lock from
- * the outside for the result to be meaningful.
+ * Expires idle buckets until the bucket catalog's memory usage is below the expiry
+ * threshold.
*/
- void _verifyBucketIsUnused(Bucket* bucket) const;
+ void _expireIdleBuckets(Stripe* stripe,
+ WithLock stripeLock,
+ ExecutionStats* stats,
+ ClosedBuckets* closedBuckets);
/**
- * Expires idle buckets until the bucket catalog's memory usage is below the expiry threshold.
+ * Allocates a new bucket and adds it to the catalog.
*/
- void _expireIdleBuckets(ExecutionStats* stats, ClosedBuckets* closedBuckets);
-
- std::size_t _numberOfIdleBuckets() const;
+ Bucket* _allocateBucket(Stripe* stripe, WithLock stripeLock, const CreationInfo& info);
- // Allocate a new bucket (and ID) and add it to the catalog
- Bucket* _allocateBucket(const BucketKey& key,
- const Date_t& time,
- const TimeseriesOptions& options,
- ExecutionStats* stats,
- ClosedBuckets* closedBuckets,
- bool openedDuetoMetadata);
+ /**
+ * Close the existing, full bucket and open a new one for the same metadata.
+ *
+ * Writes information about the closed bucket to the 'info' parameter.
+ */
+ Bucket* _rollover(Stripe* stripe,
+ WithLock stripeLock,
+ Bucket* bucket,
+ const CreationInfo& info);
std::shared_ptr<ExecutionStats> _getExecutionStats(const NamespaceString& ns);
const std::shared_ptr<ExecutionStats> _getExecutionStats(const NamespaceString& ns) const;
/**
+ * Retreives the bucket state if it is tracked in the catalog.
+ */
+ boost::optional<BucketState> _getBucketState(const OID& id) const;
+
+ /**
+ * Initializes state for the given bucket to kNormal.
+ */
+ void _initializeBucketState(const OID& id);
+
+ /**
+ * Remove state for the given bucket from the catalog.
+ */
+ void _eraseBucketState(const OID& id);
+
+ /**
* Changes the bucket state, taking into account the current state, the specified target state,
* and allowed state transitions. The return value, if set, is the final state of the bucket
* with the given id; if no such bucket exists, the return value will not be set.
@@ -758,59 +491,23 @@ private:
*/
boost::optional<BucketState> _setBucketState(const OID& id, BucketState target);
- /**
- * You must hold a lock on _bucketMutex when accessing _allBuckets or _openBuckets.
- * While holding a lock on _bucketMutex, you can take a lock on an individual bucket, then
- * release _bucketMutex. Any iterators on the protected structures should be considered invalid
- * once the lock is released. Any subsequent access to the structures requires relocking
- * _bucketMutex. You must *not* be holding a lock on a bucket when you attempt to acquire the
- * lock on _mutex, as this can result in deadlock.
- *
- * The StripedMutex class has both shared (read-only) and exclusive (write) locks. If you are
- * going to write to any of the protected structures, you must hold an exclusive lock.
- *
- * Typically, if you want to acquire a bucket, you should use the BucketAccess RAII
- * class to do so, as it will take care of most of this logic for you. Only use the _bucketMutex
- * directly for more global maintenance where you want to take the lock once and interact with
- * multiple buckets atomically.
- */
- mutable StripedMutex _bucketMutex;
-
- // All buckets currently in the catalog, including buckets which are full but not yet committed.
- stdx::unordered_map<OID, std::unique_ptr<Bucket>, OID::Hasher> _allBuckets;
+ static constexpr std::size_t kNumberOfStripes = 32;
+ std::array<Stripe, kNumberOfStripes> _stripes;
- // The current open bucket for each namespace and metadata pair.
- stdx::unordered_map<BucketKey, Bucket*, BucketHasher, BucketEq> _openBuckets;
+ mutable Mutex _mutex =
+ MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "BucketCatalog::_mutex");
- // Bucket state
- mutable Mutex _statesMutex =
- MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "BucketCatalog::_statesMutex");
+ // Bucket state for synchronization with direct writes, protected by '_mutex'
stdx::unordered_map<OID, BucketState, OID::Hasher> _bucketStates;
- // This mutex protects access to _idleBuckets
- mutable Mutex _idleMutex =
- MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(1), "BucketCatalog::_idleMutex");
-
- // Buckets that do not have any writers.
- IdleList _idleBuckets;
-
- /**
- * This mutex protects access to the _executionStats map. Once you complete your lookup, you
- * can keep the shared_ptr to an individual namespace's stats object and release the lock. The
- * object itself is thread-safe (using atomics).
- */
- mutable StripedMutex _statsMutex;
-
- // Per-namespace execution stats.
+ // Per-namespace execution stats. This map is protected by '_mutex'. Once you complete your
+ // lookup, you can keep the shared_ptr to an individual namespace's stats object and release the
+ // lock. The object itself is thread-safe (using atomics).
stdx::unordered_map<NamespaceString, std::shared_ptr<ExecutionStats>> _executionStats;
- // A placeholder to be returned in case a namespace has no allocated statistics object
- static const std::shared_ptr<ExecutionStats> kEmptyStats;
-
- // Counter for buckets created by the bucket catalog.
- uint64_t _bucketNum = 0;
-
// Approximate memory usage of the bucket catalog.
AtomicWord<uint64_t> _memoryUsage;
+
+ class ServerStatus;
};
} // namespace mongo
diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp
index cb1028be245..e83f242df8f 100644
--- a/src/mongo/db/timeseries/bucket_catalog_test.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp
@@ -235,7 +235,7 @@ void BucketCatalogTest::_testMeasurementSchema(
}
TEST_F(BucketCatalogTest, InsertIntoSameBucket) {
- // The first insert should be able to take commit rights, but batch is still active
+ // The first insert should be able to take commit rights
auto result1 =
_bucketCatalog->insert(_opCtx,
_ns1,
@@ -245,7 +245,6 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucket) {
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
auto batch1 = result1.getValue().batch;
ASSERT(batch1->claimCommitRights());
- ASSERT(batch1->active());
// A subsequent insert into the same bucket should land in the same batch, but not be able to
// claim commit rights
@@ -265,9 +264,8 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucket) {
_bucketCatalog->prepareCommit(batch1);
- // Still not finished, but no longer active.
+ // Still not finished.
ASSERT(!batch1->finished());
- ASSERT(!batch1->active());
// The batch should contain both documents since they belong in the same bucket and happened
// in the same commit epoch. Nothing else has been committed in this bucket yet.
@@ -292,9 +290,9 @@ TEST_F(BucketCatalogTest, GetMetadataReturnsEmptyDocOnMissingBucket) {
.getValue()
.batch;
ASSERT(batch->claimCommitRights());
- auto bucketId = batch->bucketId();
+ auto bucket = batch->bucket();
_bucketCatalog->abort(batch);
- ASSERT_BSONOBJ_EQ(BSONObj(), _bucketCatalog->getMetadata(bucketId));
+ ASSERT_BSONOBJ_EQ(BSONObj(), _bucketCatalog->getMetadata(bucket));
}
TEST_F(BucketCatalogTest, InsertIntoDifferentBuckets) {
@@ -327,10 +325,10 @@ TEST_F(BucketCatalogTest, InsertIntoDifferentBuckets) {
// Check metadata in buckets.
ASSERT_BSONOBJ_EQ(BSON(_metaField << "123"),
- _bucketCatalog->getMetadata(result1.getValue().batch->bucketId()));
+ _bucketCatalog->getMetadata(result1.getValue().batch->bucket()));
ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONObj()),
- _bucketCatalog->getMetadata(result2.getValue().batch->bucketId()));
- ASSERT(_bucketCatalog->getMetadata(result3.getValue().batch->bucketId()).isEmpty());
+ _bucketCatalog->getMetadata(result2.getValue().batch->bucket()));
+ ASSERT(_bucketCatalog->getMetadata(result3.getValue().batch->bucket()).isEmpty());
// Committing one bucket should only return the one document in that bucket and should not
// affect the other bucket.
@@ -360,9 +358,9 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketArray) {
// Check metadata in buckets.
ASSERT_BSONOBJ_EQ(BSON(_metaField << BSON_ARRAY(BSON("a" << 0 << "b" << 1))),
- _bucketCatalog->getMetadata(result1.getValue().batch->bucketId()));
+ _bucketCatalog->getMetadata(result1.getValue().batch->bucket()));
ASSERT_BSONOBJ_EQ(BSON(_metaField << BSON_ARRAY(BSON("a" << 0 << "b" << 1))),
- _bucketCatalog->getMetadata(result2.getValue().batch->bucketId()));
+ _bucketCatalog->getMetadata(result2.getValue().batch->bucket()));
}
TEST_F(BucketCatalogTest, InsertIntoSameBucketObjArray) {
@@ -391,11 +389,11 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketObjArray) {
ASSERT_BSONOBJ_EQ(
BSON(_metaField << BSONObj(BSON(
"c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1) << BSON("f" << 1 << "g" << 0))))),
- _bucketCatalog->getMetadata(result1.getValue().batch->bucketId()));
+ _bucketCatalog->getMetadata(result1.getValue().batch->bucket()));
ASSERT_BSONOBJ_EQ(
BSON(_metaField << BSONObj(BSON(
"c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1) << BSON("f" << 1 << "g" << 0))))),
- _bucketCatalog->getMetadata(result2.getValue().batch->bucketId()));
+ _bucketCatalog->getMetadata(result2.getValue().batch->bucket()));
}
@@ -427,11 +425,11 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketNestedArray) {
ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONObj(BSON("c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1)
<< BSON_ARRAY("123"
<< "456"))))),
- _bucketCatalog->getMetadata(result1.getValue().batch->bucketId()));
+ _bucketCatalog->getMetadata(result1.getValue().batch->bucket()));
ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONObj(BSON("c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1)
<< BSON_ARRAY("123"
<< "456"))))),
- _bucketCatalog->getMetadata(result2.getValue().batch->bucketId()));
+ _bucketCatalog->getMetadata(result2.getValue().batch->bucket()));
}
TEST_F(BucketCatalogTest, InsertNullAndMissingMetaFieldIntoDifferentBuckets) {
@@ -455,8 +453,8 @@ TEST_F(BucketCatalogTest, InsertNullAndMissingMetaFieldIntoDifferentBuckets) {
// Check metadata in buckets.
ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONNULL),
- _bucketCatalog->getMetadata(result1.getValue().batch->bucketId()));
- ASSERT(_bucketCatalog->getMetadata(result2.getValue().batch->bucketId()).isEmpty());
+ _bucketCatalog->getMetadata(result1.getValue().batch->bucket()));
+ ASSERT(_bucketCatalog->getMetadata(result2.getValue().batch->bucket()).isEmpty());
// Committing one bucket should only return the one document in that bucket and should not
// affect the other bucket.
@@ -537,18 +535,10 @@ DEATH_TEST_F(BucketCatalogTest, CannotCommitWithoutRights, "invariant") {
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
auto& batch = result.getValue().batch;
_bucketCatalog->prepareCommit(batch);
-}
-DEATH_TEST_F(BucketCatalogTest, CannotFinishUnpreparedBatch, "invariant") {
- auto result = _bucketCatalog->insert(_opCtx,
- _ns1,
- _getCollator(_ns1),
- _getTimeseriesOptions(_ns1),
- BSON(_timeField << Date_t::now()),
- BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
- auto& batch = result.getValue().batch;
- ASSERT(batch->claimCommitRights());
- _bucketCatalog->finish(batch, {});
+ // BucketCatalog::prepareCommit uses dassert, so it will only invariant in debug mode. Ensure we
+ // die here in non-debug mode as well.
+ invariant(kDebugBuild);
}
TEST_F(BucketCatalogWithoutMetadataTest, GetMetadataReturnsEmptyDoc) {
@@ -562,7 +552,7 @@ TEST_F(BucketCatalogWithoutMetadataTest, GetMetadataReturnsEmptyDoc) {
.getValue()
.batch;
- ASSERT_BSONOBJ_EQ(BSONObj(), _bucketCatalog->getMetadata(batch->bucketId()));
+ ASSERT_BSONOBJ_EQ(BSONObj(), _bucketCatalog->getMetadata(batch->bucket()));
_commit(batch, 0);
}
@@ -577,7 +567,7 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) {
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
ASSERT(result.isOK());
auto batch = result.getValue().batch;
- auto oldId = batch->bucketId();
+ auto oldId = batch->bucket().id;
_commit(batch, 0);
ASSERT_EQ(2U, batch->newFieldNamesToBeInserted().size()) << batch->toBSON();
ASSERT(batch->newFieldNamesToBeInserted().count(_timeField)) << batch->toBSON();
@@ -633,7 +623,7 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) {
BSON(_timeField << Date_t::now() << "a" << gTimeseriesBucketMaxCount),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
auto& batch2 = result2.getValue().batch;
- ASSERT_NE(oldId, batch2->bucketId());
+ ASSERT_NE(oldId, batch2->bucket().id);
_commit(batch2, 0);
ASSERT_EQ(2U, batch2->newFieldNamesToBeInserted().size()) << batch2->toBSON();
ASSERT(batch2->newFieldNamesToBeInserted().count(_timeField)) << batch2->toBSON();
@@ -738,7 +728,7 @@ TEST_F(BucketCatalogTest, ClearBucketWithPreparedBatchThrowsConflict) {
ASSERT_EQ(batch->measurements().size(), 1);
ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), 0);
- ASSERT_THROWS(_bucketCatalog->clear(batch->bucketId()), WriteConflictException);
+ ASSERT_THROWS(_bucketCatalog->clear(batch->bucket().id), WriteConflictException);
_bucketCatalog->abort(batch);
ASSERT(batch->finished());
@@ -771,10 +761,10 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) {
.getValue()
.batch;
ASSERT_NE(batch1, batch2);
- ASSERT_EQ(batch1->bucketId(), batch2->bucketId());
+ ASSERT_EQ(batch1->bucket().id, batch2->bucket().id);
// Now clear the bucket. Since there's a prepared batch it should conflict.
- ASSERT_THROWS(_bucketCatalog->clear(batch1->bucketId()), WriteConflictException);
+ ASSERT_THROWS(_bucketCatalog->clear(batch1->bucket().id), WriteConflictException);
// Now try to prepare the second batch. Ensure it aborts the batch.
ASSERT(batch2->claimCommitRights());
@@ -783,7 +773,7 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) {
ASSERT_EQ(batch2->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared);
// Make sure we didn't clear the bucket state when we aborted the second batch.
- ASSERT_THROWS(_bucketCatalog->clear(batch1->bucketId()), WriteConflictException);
+ ASSERT_THROWS(_bucketCatalog->clear(batch1->bucket().id), WriteConflictException);
// Make sure a subsequent insert, which opens a new bucket, doesn't corrupt the old bucket
// state and prevent us from finishing the first batch.
@@ -798,7 +788,7 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) {
.batch;
ASSERT_NE(batch1, batch3);
ASSERT_NE(batch2, batch3);
- ASSERT_NE(batch1->bucketId(), batch3->bucketId());
+ ASSERT_NE(batch1->bucket().id, batch3->bucket().id);
// Clean up this batch
ASSERT(batch3->claimCommitRights());
_bucketCatalog->abort(batch3);