diff options
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/commands/write_commands.cpp | 24 | ||||
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog.cpp | 1411 | ||||
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog.h | 631 | ||||
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog_test.cpp | 62 |
4 files changed, 860 insertions, 1268 deletions
diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp index 7c1ef7c0143..6eb1009bc86 100644 --- a/src/mongo/db/commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands.cpp @@ -206,9 +206,9 @@ write_ops::UpdateOpEntry makeTimeseriesUpdateOpEntry( options.mustCheckExistenceForInsertOperations = static_cast<bool>(repl::tenantMigrationRecipientInfo(opCtx)); write_ops::UpdateModification u(updateBuilder.obj(), options); - write_ops::UpdateOpEntry update(BSON("_id" << batch->bucketId()), std::move(u)); - invariant(!update.getMulti(), batch->bucketId().toString()); - invariant(!update.getUpsert(), batch->bucketId().toString()); + write_ops::UpdateOpEntry update(BSON("_id" << batch->bucket().id), std::move(u)); + invariant(!update.getMulti(), batch->bucket().id.toString()); + invariant(!update.getUpsert(), batch->bucket().id.toString()); return update; } @@ -249,7 +249,7 @@ BSONObj makeTimeseriesInsertDocument(std::shared_ptr<BucketCatalog::WriteBatch> } BSONObjBuilder builder; - builder.append("_id", batch->bucketId()); + builder.append("_id", batch->bucket().id); { BSONObjBuilder bucketControlBuilder(builder.subobjStart("control")); bucketControlBuilder.append(kBucketControlVersionFieldName, @@ -778,7 +778,7 @@ public: std::vector<size_t>* docsToRetry) const { auto& bucketCatalog = BucketCatalog::get(opCtx); - auto metadata = bucketCatalog.getMetadata(batch->bucketId()); + auto metadata = bucketCatalog.getMetadata(batch->bucket()); bool prepared = bucketCatalog.prepareCommit(batch); if (!prepared) { invariant(batch->finished()); @@ -790,7 +790,7 @@ public: hangTimeseriesInsertBeforeWrite.pauseWhileSet(); - const auto docId = batch->bucketId(); + const auto docId = batch->bucket().id; const bool performInsert = batch->numPreviouslyCommittedMeasurements() == 0; if (performInsert) { const auto output = @@ -866,7 +866,7 @@ public: // Sort by bucket so that preparing the commit for each batch cannot deadlock. std::sort(batchesToCommit.begin(), batchesToCommit.end(), [](auto left, auto right) { - return left.get()->bucketId() < right.get()->bucketId(); + return left.get()->bucket().id < right.get()->bucket().id; }); boost::optional<Status> abortStatus; @@ -882,17 +882,17 @@ public: std::vector<write_ops::UpdateCommandRequest> updateOps; for (auto batch : batchesToCommit) { - auto metadata = bucketCatalog.getMetadata(batch.get()->bucketId()); + auto metadata = bucketCatalog.getMetadata(batch.get()->bucket()); if (!bucketCatalog.prepareCommit(batch)) { return false; } if (batch.get()->numPreviouslyCommittedMeasurements() == 0) { insertOps.push_back(_makeTimeseriesInsertOp( - batch, metadata, std::move(stmtIds[batch.get()->bucketId()]))); + batch, metadata, std::move(stmtIds[batch.get()->bucket().id]))); } else { updateOps.push_back(_makeTimeseriesUpdateOp( - opCtx, batch, metadata, std::move(stmtIds[batch.get()->bucketId()]))); + opCtx, batch, metadata, std::move(stmtIds[batch.get()->bucket().id]))); } } @@ -994,7 +994,7 @@ public: const auto& batch = result.getValue().batch; batches.emplace_back(batch, index); if (isTimeseriesWriteRetryable(opCtx)) { - stmtIds[batch->bucketId()].push_back(stmtId); + stmtIds[batch->bucket().id].push_back(stmtId); } } @@ -1195,7 +1195,7 @@ public: auto& [batch, index] = batches[itr]; if (batch->claimCommitRights()) { auto stmtIds = isTimeseriesWriteRetryable(opCtx) - ? std::move(bucketStmtIds[batch->bucketId()]) + ? std::move(bucketStmtIds[batch->bucket().id]) : std::vector<StmtId>{}; canContinue = _commitTimeseriesBucket(opCtx, diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp index 9b06cdbdc33..9497c269d96 100644 --- a/src/mongo/db/timeseries/bucket_catalog.cpp +++ b/src/mongo/db/timeseries/bucket_catalog.cpp @@ -195,8 +195,350 @@ std::pair<OID, Date_t> generateBucketId(const Date_t& time, const TimeseriesOpti } } // namespace -const std::shared_ptr<BucketCatalog::ExecutionStats> BucketCatalog::kEmptyStats{ - std::make_shared<BucketCatalog::ExecutionStats>()}; +struct BucketCatalog::ExecutionStats { + AtomicWord<long long> numBucketInserts; + AtomicWord<long long> numBucketUpdates; + AtomicWord<long long> numBucketsOpenedDueToMetadata; + AtomicWord<long long> numBucketsClosedDueToCount; + AtomicWord<long long> numBucketsClosedDueToSchemaChange; + AtomicWord<long long> numBucketsClosedDueToSize; + AtomicWord<long long> numBucketsClosedDueToTimeForward; + AtomicWord<long long> numBucketsClosedDueToTimeBackward; + AtomicWord<long long> numBucketsClosedDueToMemoryThreshold; + AtomicWord<long long> numCommits; + AtomicWord<long long> numWaits; + AtomicWord<long long> numMeasurementsCommitted; +}; + +class BucketCatalog::Bucket { +public: + friend class BucketCatalog; + + Bucket(const OID& id, StripeNumber stripe) : _id(id), _stripe(stripe) {} + + /** + * Returns the ID for the underlying bucket. + */ + const OID& id() const { + return _id; + } + + /** + * Returns the number of the stripe that owns the bucket + */ + StripeNumber stripe() const { + return _stripe; + } + + // Returns the time associated with the bucket (id) + Date_t getTime() const { + return _id.asDateT(); + } + + /** + * Returns the timefield for the underlying bucket. + */ + StringData getTimeField() { + return _timeField; + } + + /** + * Returns whether all measurements have been committed. + */ + bool allCommitted() const { + return _batches.empty() && !_preparedBatch; + } + + /** + * Returns total number of measurements in the bucket. + */ + uint32_t numMeasurements() const { + return _numMeasurements; + } + + /** + * Determines if the schema for an incoming measurement is incompatible with those already + * stored in the bucket. + * + * Returns true if incompatible + */ + bool schemaIncompatible(const BSONObj& input, + boost::optional<StringData> metaField, + const StringData::ComparatorInterface* comparator) { + // (Generic FCV reference): TODO (SERVER-60912): Update once kLastLTS is 6.0 + if (serverGlobalParams.featureCompatibility.getVersion() == + multiversion::GenericFCV::kLastLTS) { + return false; + } + + auto result = _schema.update(input, metaField, comparator); + return (result == timeseries::Schema::UpdateStatus::Failed); + } + +private: + /** + * Determines the effect of adding 'doc' to this bucket. If adding 'doc' causes this bucket + * to overflow, we will create a new bucket and recalculate the change to the bucket size + * and data fields. + */ + void _calculateBucketFieldsAndSizeChange(const BSONObj& doc, + boost::optional<StringData> metaField, + NewFieldNames* newFieldNamesToBeInserted, + uint32_t* newFieldNamesSize, + uint32_t* sizeToBeAdded) const { + // BSON size for an object with an empty object field where field name is empty string. + // We can use this as an offset to know the size when we have real field names. + static constexpr int emptyObjSize = 12; + // Validate in debug builds that this size is correct + dassert(emptyObjSize == BSON("" << BSONObj()).objsize()); + + newFieldNamesToBeInserted->clear(); + *newFieldNamesSize = 0; + *sizeToBeAdded = 0; + auto numMeasurementsFieldLength = numDigits(_numMeasurements); + for (const auto& elem : doc) { + auto fieldName = elem.fieldNameStringData(); + if (fieldName == metaField) { + // Ignore the metadata field since it will not be inserted. + continue; + } + + // If the field name is new, add the size of an empty object with that field name. + auto hashedKey = StringSet::hasher().hashed_key(fieldName); + if (!_fieldNames.contains(hashedKey)) { + newFieldNamesToBeInserted->push_back(hashedKey); + *newFieldNamesSize += elem.fieldNameSize(); + *sizeToBeAdded += emptyObjSize + fieldName.size(); + } + + // Add the element size, taking into account that the name will be changed to its + // positional number. Add 1 to the calculation since the element's field name size + // accounts for a null terminator whereas the stringified position does not. + *sizeToBeAdded += elem.size() - elem.fieldNameSize() + numMeasurementsFieldLength + 1; + } + } + + /** + * Returns whether BucketCatalog::commit has been called at least once on this bucket. + */ + bool _hasBeenCommitted() const { + return _numCommittedMeasurements != 0 || _preparedBatch; + } + + /** + * Return a pointer to the current, open batch. + */ + std::shared_ptr<WriteBatch> _activeBatch(OperationId opId, + const std::shared_ptr<ExecutionStats>& stats) { + auto it = _batches.find(opId); + if (it == _batches.end()) { + it = + _batches + .try_emplace( + opId, std::make_shared<WriteBatch>(BucketHandle{_id, _stripe}, opId, stats)) + .first; + } + return it->second; + } + + // The bucket ID for the underlying document + const OID _id; + + // The stripe which owns this bucket. + const StripeNumber _stripe; + + // The namespace that this bucket is used for. + NamespaceString _ns; + + // The metadata of the data that this bucket contains. + BucketMetadata _metadata; + + // Extra metadata combinations that are supported without normalizing the metadata object. + static constexpr std::size_t kNumFieldOrderCombinationsWithoutNormalizing = 1; + boost::container::static_vector<BSONObj, kNumFieldOrderCombinationsWithoutNormalizing> + _nonNormalizedKeyMetadatas; + + // Top-level field names of the measurements that have been inserted into the bucket. + StringSet _fieldNames; + + // Time field for the measurements that have been inserted into the bucket. + std::string _timeField; + + // The minimum and maximum values for each field in the bucket. + timeseries::MinMax _minmax; + + // The reference schema for measurements in this bucket. May reflect schema of uncommitted + // measurements. + timeseries::Schema _schema; + + // The latest time that has been inserted into the bucket. + Date_t _latestTime; + + // The total size in bytes of the bucket's BSON serialization, including measurements to be + // inserted. + uint64_t _size = 0; + + // The total number of measurements in the bucket, including uncommitted measurements and + // measurements to be inserted. + uint32_t _numMeasurements = 0; + + // The number of committed measurements in the bucket. + uint32_t _numCommittedMeasurements = 0; + + // Whether the bucket is full. This can be due to number of measurements, size, or time + // range. + bool _full = false; + + // The batch that has been prepared and is currently in the process of being committed, if + // any. + std::shared_ptr<WriteBatch> _preparedBatch; + + // Batches, per operation, that haven't been committed or aborted yet. + stdx::unordered_map<OperationId, std::shared_ptr<WriteBatch>> _batches; + + // If the bucket is in idleBuckets, then its position is recorded here. + boost::optional<Stripe::IdleList::iterator> _idleListEntry = boost::none; + + // Approximate memory usage of this bucket. + uint64_t _memoryUsage = sizeof(*this); +}; + +/** + * Bundle of information that 'insert' needs to pass down to helper methods that may create a new + * bucket. + */ +struct BucketCatalog::CreationInfo { + const BucketKey& key; + StripeNumber stripe; + const Date_t& time; + const TimeseriesOptions& options; + ExecutionStats* stats; + ClosedBuckets* closedBuckets; + bool openedDuetoMetadata = true; +}; + +BucketCatalog::WriteBatch::WriteBatch(const BucketHandle& bucket, + OperationId opId, + const std::shared_ptr<ExecutionStats>& stats) + : _bucket{bucket}, _opId(opId), _stats{stats} {} + +bool BucketCatalog::WriteBatch::claimCommitRights() { + return !_commitRights.swap(true); +} + +StatusWith<BucketCatalog::CommitInfo> BucketCatalog::WriteBatch::getResult() const { + if (!_promise.getFuture().isReady()) { + _stats->numWaits.fetchAndAddRelaxed(1); + } + return _promise.getFuture().getNoThrow(); +} + +const BucketCatalog::BucketHandle& BucketCatalog::WriteBatch::bucket() const { + return _bucket; +} + +const std::vector<BSONObj>& BucketCatalog::WriteBatch::measurements() const { + return _measurements; +} + +const BSONObj& BucketCatalog::WriteBatch::min() const { + return _min; +} + +const BSONObj& BucketCatalog::WriteBatch::max() const { + return _max; +} + +const StringMap<std::size_t>& BucketCatalog::WriteBatch::newFieldNamesToBeInserted() const { + return _newFieldNamesToBeInserted; +} + +uint32_t BucketCatalog::WriteBatch::numPreviouslyCommittedMeasurements() const { + return _numPreviouslyCommittedMeasurements; +} + +bool BucketCatalog::WriteBatch::finished() const { + return _promise.getFuture().isReady(); +} + +BSONObj BucketCatalog::WriteBatch::toBSON() const { + auto toFieldName = [](const auto& nameHashPair) { return nameHashPair.first; }; + return BSON("docs" << _measurements << "bucketMin" << _min << "bucketMax" << _max + << "numCommittedMeasurements" << int(_numPreviouslyCommittedMeasurements) + << "newFieldNamesToBeInserted" + << std::set<std::string>( + boost::make_transform_iterator(_newFieldNamesToBeInserted.begin(), + toFieldName), + boost::make_transform_iterator(_newFieldNamesToBeInserted.end(), + toFieldName))); +} + +void BucketCatalog::WriteBatch::_addMeasurement(const BSONObj& doc) { + _measurements.push_back(doc); +} + +void BucketCatalog::WriteBatch::_recordNewFields(NewFieldNames&& fields) { + for (auto&& field : fields) { + _newFieldNamesToBeInserted[field] = field.hash(); + } +} + +void BucketCatalog::WriteBatch::_prepareCommit(Bucket* bucket) { + invariant(_commitRights.load()); + _numPreviouslyCommittedMeasurements = bucket->_numCommittedMeasurements; + + // Filter out field names that were new at the time of insertion, but have since been committed + // by someone else. + for (auto it = _newFieldNamesToBeInserted.begin(); it != _newFieldNamesToBeInserted.end();) { + StringMapHashedKey fieldName(it->first, it->second); + if (bucket->_fieldNames.contains(fieldName)) { + _newFieldNamesToBeInserted.erase(it++); + continue; + } + + bucket->_fieldNames.emplace(fieldName); + ++it; + } + + for (const auto& doc : _measurements) { + bucket->_minmax.update( + doc, bucket->_metadata.getMetaField(), bucket->_metadata.getComparator()); + } + + const bool isUpdate = _numPreviouslyCommittedMeasurements > 0; + if (isUpdate) { + _min = bucket->_minmax.minUpdates(); + _max = bucket->_minmax.maxUpdates(); + } else { + _min = bucket->_minmax.min(); + _max = bucket->_minmax.max(); + + // Approximate minmax memory usage by taking sizes of initial commit. Subsequent updates may + // add fields but are most likely just to update values. + bucket->_memoryUsage += _min.objsize(); + bucket->_memoryUsage += _max.objsize(); + } +} + +void BucketCatalog::WriteBatch::_finish(const CommitInfo& info) { + invariant(_commitRights.load()); + _promise.emplaceValue(info); +} + +void BucketCatalog::WriteBatch::_abort(const boost::optional<Status>& status, + const Bucket* bucket) { + if (finished()) { + return; + } + + std::string nsIdentification; + if (bucket) { + nsIdentification.append(str::stream() << " for namespace " << bucket->_ns); + } + _promise.setError(status.value_or(Status{ErrorCodes::TimeseriesBucketCleared, + str::stream() << "Time-series bucket " << _bucket.id + << nsIdentification << " was cleared"})); +} BucketCatalog& BucketCatalog::get(ServiceContext* svcCtx) { return getBucketCatalog(svcCtx); @@ -206,8 +548,11 @@ BucketCatalog& BucketCatalog::get(OperationContext* opCtx) { return get(opCtx->getServiceContext()); } -BSONObj BucketCatalog::getMetadata(const OID& bucketId) const { - BucketAccess bucket{const_cast<BucketCatalog*>(this), bucketId}; +BSONObj BucketCatalog::getMetadata(const BucketHandle& handle) const { + auto const& stripe = _stripes[handle.stripe]; + stdx::lock_guard stripeLock{stripe.mutex}; + + const Bucket* bucket = _findBucket(stripe, stripeLock, handle.id); if (!bucket) { return {}; } @@ -223,27 +568,35 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert( const BSONObj& doc, CombineWithInsertsFromOtherClients combine) { - BSONElement metadata; - auto metaFieldName = options.getMetaField(); - if (metaFieldName) { - metadata = doc[*metaFieldName]; - } - auto key = BucketKey{ns, BucketMetadata{metadata, comparator}}; - - auto stats = _getExecutionStats(ns); - invariant(stats); - auto timeElem = doc[options.getTimeField()]; if (!timeElem || BSONType::Date != timeElem.type()) { return {ErrorCodes::BadValue, str::stream() << "'" << options.getTimeField() << "' must be present and contain a " << "valid BSON UTC datetime value"}; } - auto time = timeElem.Date(); + auto stats = _getExecutionStats(ns); + invariant(stats); + + BSONElement metadata; + auto metaFieldName = options.getMetaField(); + if (metaFieldName) { + metadata = doc[*metaFieldName]; + } + + // Buckets are spread across independently-lockable stripes to improve parallelism. We map a + // bucket to a stripe by hashing the BucketKey. + auto key = BucketKey{ns, BucketMetadata{metadata, comparator}}; + auto stripeNumber = _getStripeNumber(key); + ClosedBuckets closedBuckets; - BucketAccess bucket{this, key, options, stats.get(), &closedBuckets, time}; + CreationInfo info{key, stripeNumber, time, options, stats.get(), &closedBuckets}; + + auto& stripe = _stripes[stripeNumber]; + stdx::lock_guard stripeLock{stripe.mutex}; + + Bucket* bucket = _useOrCreateBucket(&stripe, stripeLock, info); invariant(bucket); NewFieldNames newFieldNamesToBeInserted; @@ -255,21 +608,20 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert( &newFieldNamesSize, &sizeToBeAdded); - auto shouldCloseBucket = [&](BucketAccess* bucket) -> bool { + auto shouldCloseBucket = [&](Bucket* bucket) -> bool { if (bucket->schemaIncompatible(doc, metaFieldName, comparator)) { stats->numBucketsClosedDueToSchemaChange.fetchAndAddRelaxed(1); return true; } - if ((*bucket)->_numMeasurements == static_cast<std::uint64_t>(gTimeseriesBucketMaxCount)) { + if (bucket->_numMeasurements == static_cast<std::uint64_t>(gTimeseriesBucketMaxCount)) { stats->numBucketsClosedDueToCount.fetchAndAddRelaxed(1); return true; } - if ((*bucket)->_size + sizeToBeAdded > - static_cast<std::uint64_t>(gTimeseriesBucketMaxSize)) { + if (bucket->_size + sizeToBeAdded > static_cast<std::uint64_t>(gTimeseriesBucketMaxSize)) { stats->numBucketsClosedDueToSize.fetchAndAddRelaxed(1); return true; } - auto bucketTime = (*bucket).getTime(); + auto bucketTime = bucket->getTime(); if (time - bucketTime >= Seconds(*options.getBucketMaxSpanSeconds())) { stats->numBucketsClosedDueToTimeForward.fetchAndAddRelaxed(1); return true; @@ -281,8 +633,9 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert( return false; }; - if (!bucket->_ns.isEmpty() && shouldCloseBucket(&bucket)) { - bucket.rollover(shouldCloseBucket, &closedBuckets); + if (!bucket->_ns.isEmpty() && shouldCloseBucket(bucket)) { + info.openedDuetoMetadata = false; + bucket = _rollover(&stripe, stripeLock, bucket, info); bucket->_calculateBucketFieldsAndSizeChange(doc, options.getMetaField(), @@ -303,15 +656,16 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert( if (bucket->_ns.isEmpty()) { // The namespace and metadata only need to be set if this bucket was newly created. bucket->_ns = ns; - key.metadata.normalize(); bucket->_metadata = key.metadata; - // The namespace is stored two times: the bucket itself and _openBuckets. - // The metadata is stored two times, normalized and un-normalized. A unique pointer to the - // bucket is stored once: _allBuckets. A raw pointer to the bucket is stored at most twice: - // _openBuckets, _idleBuckets. - bucket->_memoryUsage += (ns.size() * 2) + (bucket->_metadata.toBSON().objsize() * 2) + - sizeof(Bucket) + sizeof(std::unique_ptr<Bucket>) + (sizeof(Bucket*) * 2); + // The namespace is stored two times: the bucket itself and openBuckets. + // We don't have a great approximation for the + // _schema size, so we use initial document size minus metadata as an approximation. Since + // the metadata itself is stored once, in the bucket, we can combine the two and just use + // the initial document size. A unique pointer to the bucket is stored once: allBuckets. A + // raw pointer to the bucket is stored at most twice: openBuckets, idleBuckets. + bucket->_memoryUsage += (ns.size() * 2) + doc.objsize() + sizeof(Bucket) + + sizeof(std::unique_ptr<Bucket>) + (sizeof(Bucket*) * 2); bucket->_schema.update(doc, options.getMetaField(), comparator); } else { @@ -328,14 +682,18 @@ bool BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) { return false; } - _waitToCommitBatch(batch); + auto& stripe = _stripes[batch->bucket().stripe]; + _waitToCommitBatch(&stripe, batch); + + stdx::lock_guard stripeLock{stripe.mutex}; + Bucket* bucket = + _useBucketInState(&stripe, stripeLock, batch->bucket().id, BucketState::kPrepared); - BucketAccess bucket(this, batch->bucketId(), BucketState::kPrepared); if (batch->finished()) { // Someone may have aborted it while we were waiting. return false; } else if (!bucket) { - abort(batch); + _abort(&stripe, stripeLock, batch, boost::none); return false; } @@ -349,13 +707,16 @@ bool BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) { boost::optional<BucketCatalog::ClosedBucket> BucketCatalog::finish( std::shared_ptr<WriteBatch> batch, const CommitInfo& info) { invariant(!batch->finished()); - invariant(!batch->active()); boost::optional<ClosedBucket> closedBucket; batch->_finish(info); - BucketAccess bucket(this, batch->bucketId(), BucketState::kNormal); + auto& stripe = _stripes[batch->bucket().stripe]; + stdx::lock_guard stripeLock{stripe.mutex}; + + Bucket* bucket = + _useBucketInState(&stripe, stripeLock, batch->bucket().id, BucketState::kNormal); if (bucket) { bucket->_preparedBatch.reset(); } @@ -377,13 +738,11 @@ boost::optional<BucketCatalog::ClosedBucket> BucketCatalog::finish( // It's possible that we cleared the bucket in between preparing the commit and finishing // here. In this case, we should abort any other ongoing batches and clear the bucket from // the catalog so it's not hanging around idle. - auto lk = _lockExclusive(); - auto it = _allBuckets.find(batch->bucketId()); - if (it != _allBuckets.end()) { - auto bucket = it->second.get(); - stdx::unique_lock blk{bucket->_mutex}; + auto it = stripe.allBuckets.find(batch->bucket().id); + if (it != stripe.allBuckets.end()) { + bucket = it->second.get(); bucket->_preparedBatch.reset(); - _abort(blk, bucket, nullptr, boost::none); + _abort(&stripe, stripeLock, bucket, nullptr, boost::none); } } else if (bucket->allCommitted()) { if (bucket->_full) { @@ -391,30 +750,24 @@ boost::optional<BucketCatalog::ClosedBucket> BucketCatalog::finish( // bucket is full. Thus, we can remove it. _memoryUsage.fetchAndSubtract(bucket->_memoryUsage); - bucket.release(); - auto lk = _lockExclusive(); + auto it = stripe.allBuckets.find(batch->bucket().id); + if (it != stripe.allBuckets.end()) { + bucket = it->second.get(); - auto it = _allBuckets.find(batch->bucketId()); - if (it != _allBuckets.end()) { - Bucket* ptr = it->second.get(); - _verifyBucketIsUnused(ptr); + closedBucket = ClosedBucket{batch->bucket().id, + bucket->getTimeField().toString(), + bucket->numMeasurements()}; - closedBucket = ClosedBucket{ - batch->bucketId(), ptr->getTimeField().toString(), ptr->numMeasurements()}; - - // Only remove from _allBuckets and _idleBuckets. If it was marked full, we know - // that happened in BucketAccess::rollover, and that there is already a new open + // Only remove from allBuckets and idleBuckets. If it was marked full, we know + // that happened in Stripe::rollover, and that there is already a new open // bucket for this metadata. - _markBucketNotIdle(ptr, false /* locked */); - { - stdx::lock_guard statesLk{_statesMutex}; - _bucketStates.erase(batch->bucketId()); - } + _markBucketNotIdle(&stripe, stripeLock, bucket); + _eraseBucketState(batch->bucket().id); - _allBuckets.erase(batch->bucketId()); + stripe.allBuckets.erase(batch->bucket().id); } } else { - _markBucketIdle(bucket); + _markBucketIdle(&stripe, stripeLock, bucket); } } return closedBucket; @@ -429,18 +782,10 @@ void BucketCatalog::abort(std::shared_ptr<WriteBatch> batch, return; } - // Before we access the bucket, make sure it's still there. - auto lk = _lockExclusive(); - auto it = _allBuckets.find(batch->bucketId()); - if (it == _allBuckets.end()) { - // Special case, bucket has already been cleared, and we need only abort this batch. - batch->_abort(status, nullptr); - return; - } + auto& stripe = _stripes[batch->bucket().stripe]; + stdx::lock_guard stripeLock{stripe.mutex}; - Bucket* bucket = it->second.get(); - stdx::unique_lock blk{bucket->_mutex}; - _abort(blk, bucket, batch, status); + _abort(&stripe, stripeLock, batch, status); } void BucketCatalog::clear(const OID& oid) { @@ -452,20 +797,22 @@ void BucketCatalog::clear(const OID& oid) { } void BucketCatalog::clear(const std::function<bool(const NamespaceString&)>& shouldClear) { - auto lk = _lockExclusive(); - auto statsLk = _statsMutex.lockExclusive(); + for (auto& stripe : _stripes) { + stdx::lock_guard stripeLock{stripe.mutex}; + for (auto it = stripe.allBuckets.begin(); it != stripe.allBuckets.end();) { + auto nextIt = std::next(it); - for (auto it = _allBuckets.begin(); it != _allBuckets.end();) { - auto nextIt = std::next(it); + const auto& bucket = it->second; + if (shouldClear(bucket->_ns)) { + { + stdx::lock_guard catalogLock{_mutex}; + _executionStats.erase(bucket->_ns); + } + _abort(&stripe, stripeLock, bucket.get(), nullptr, boost::none); + } - const auto& bucket = it->second; - stdx::unique_lock blk{bucket->_mutex}; - if (shouldClear(bucket->_ns)) { - _executionStats.erase(bucket->_ns); - _abort(blk, bucket.get(), nullptr, boost::none); + it = nextIt; } - - it = nextIt; } } @@ -504,54 +851,137 @@ void BucketCatalog::appendExecutionStats(const NamespaceString& ns, BSONObjBuild } } -BucketCatalog::StripedMutex::ExclusiveLock::ExclusiveLock(const StripedMutex& sm) { - invariant(sm._mutexes.size() == _locks.size()); - for (std::size_t i = 0; i < sm._mutexes.size(); ++i) { - _locks[i] = stdx::unique_lock<Mutex>(sm._mutexes[i]); +BucketCatalog::BucketMetadata::BucketMetadata(BSONElement elem, + const StringData::ComparatorInterface* comparator) + : _metadataElement(elem), _comparator(comparator) { + if (_metadataElement) { + BSONObjBuilder objBuilder; + // We will get an object of equal size, just with reordered fields. + objBuilder.bb().reserveBytes(_metadataElement.size()); + normalizeTopLevel(&objBuilder, _metadataElement); + _metadata = objBuilder.obj(); } + // Updates the BSONElement to refer to the copied BSONObj. + _metadataElement = _metadata.firstElement(); } -BucketCatalog::StripedMutex::SharedLock BucketCatalog::StripedMutex::lockShared() const { - static const std::hash<stdx::thread::id> hasher; - return SharedLock{_mutexes[hasher(stdx::this_thread::get_id()) % kNumStripes]}; +bool BucketCatalog::BucketMetadata::operator==(const BucketMetadata& other) const { + return _metadataElement.binaryEqualValues(other._metadataElement); } -BucketCatalog::StripedMutex::ExclusiveLock BucketCatalog::StripedMutex::lockExclusive() const { - return ExclusiveLock{*this}; +const BSONObj& BucketCatalog::BucketMetadata::toBSON() const { + return _metadata; } -BucketCatalog::StripedMutex::SharedLock BucketCatalog::_lockShared() const { - return _bucketMutex.lockShared(); +StringData BucketCatalog::BucketMetadata::getMetaField() const { + return StringData(_metadataElement.fieldName()); } -BucketCatalog::StripedMutex::ExclusiveLock BucketCatalog::_lockExclusive() const { - return _bucketMutex.lockExclusive(); +const StringData::ComparatorInterface* BucketCatalog::BucketMetadata::getComparator() const { + return _comparator; } -void BucketCatalog::_waitToCommitBatch(const std::shared_ptr<WriteBatch>& batch) { - while (true) { - BucketAccess bucket{this, batch->bucketId()}; - if (!bucket || batch->finished()) { - return; +BucketCatalog::BucketKey::BucketKey(const NamespaceString& n, const BucketMetadata& m) + : ns(n), metadata(m), hash(absl::Hash<BucketKey>{}(*this)) {} + +std::size_t BucketCatalog::BucketHasher::operator()(const BucketKey& key) const { + // Use the default absl hasher. + return key.hash; +} + +BucketCatalog::StripeNumber BucketCatalog::_getStripeNumber(const BucketKey& key) { + return key.hash % kNumberOfStripes; +} + +const BucketCatalog::Bucket* BucketCatalog::_findBucket(const Stripe& stripe, + WithLock, + const OID& id, + ReturnClearedBuckets mode) const { + auto it = stripe.allBuckets.find(id); + if (it != stripe.allBuckets.end()) { + if (mode == ReturnClearedBuckets::kYes) { + return it->second.get(); } - auto current = bucket->_preparedBatch; - if (!current) { - // No other batches for this bucket are currently committing, so we can proceed. - bucket->_preparedBatch = batch; - bucket->_batches.erase(batch->_opId); - break; + auto state = _getBucketState(id); + if (state && state != BucketState::kCleared && state != BucketState::kPreparedAndCleared) { + return it->second.get(); + } + } + return nullptr; +} + +BucketCatalog::Bucket* BucketCatalog::_useBucket(Stripe* stripe, + WithLock stripeLock, + const OID& id, + ReturnClearedBuckets mode) { + return const_cast<Bucket*>(_findBucket(*stripe, stripeLock, id, mode)); +} + +BucketCatalog::Bucket* BucketCatalog::_useBucketInState(Stripe* stripe, + WithLock, + const OID& id, + BucketState targetState) { + auto it = stripe->allBuckets.find(id); + if (it != stripe->allBuckets.end()) { + auto state = _setBucketState(it->second->_id, targetState); + if (state && state != BucketState::kCleared && state != BucketState::kPreparedAndCleared) { + return it->second.get(); + } + } + return nullptr; +} + +BucketCatalog::Bucket* BucketCatalog::_useOrCreateBucket(Stripe* stripe, + WithLock stripeLock, + const CreationInfo& info) { + auto it = stripe->openBuckets.find(info.key); + if (it == stripe->openBuckets.end()) { + // No open bucket for this metadata. + return _allocateBucket(stripe, stripeLock, info); + } + + Bucket* bucket = it->second; + + auto state = _getBucketState(bucket->id()); + if (state == BucketState::kNormal || state == BucketState::kPrepared) { + _markBucketNotIdle(stripe, stripeLock, bucket); + return bucket; + } + + _abort(stripe, stripeLock, bucket, nullptr, boost::none); + return _allocateBucket(stripe, stripeLock, info); +} + +void BucketCatalog::_waitToCommitBatch(Stripe* stripe, const std::shared_ptr<WriteBatch>& batch) { + while (true) { + std::shared_ptr<WriteBatch> current; + + { + stdx::lock_guard stripeLock{stripe->mutex}; + Bucket* bucket = + _useBucket(stripe, stripeLock, batch->bucket().id, ReturnClearedBuckets::kNo); + if (!bucket || batch->finished()) { + return; + } + + current = bucket->_preparedBatch; + if (!current) { + // No other batches for this bucket are currently committing, so we can proceed. + bucket->_preparedBatch = batch; + bucket->_batches.erase(batch->_opId); + return; + } } // We have to wait for someone else to finish. - bucket.release(); current->getResult().getStatus().ignore(); // We don't care about the result. } } -bool BucketCatalog::_removeBucket(Bucket* bucket, bool expiringBuckets) { - auto it = _allBuckets.find(bucket->id()); - if (it == _allBuckets.end()) { +bool BucketCatalog::_removeBucket(Stripe* stripe, WithLock stripeLock, Bucket* bucket) { + auto it = stripe->allBuckets.find(bucket->id()); + if (it == stripe->allBuckets.end()) { return false; } @@ -559,43 +989,50 @@ bool BucketCatalog::_removeBucket(Bucket* bucket, bool expiringBuckets) { invariant(!bucket->_preparedBatch); _memoryUsage.fetchAndSubtract(bucket->_memoryUsage); - _markBucketNotIdle(bucket, expiringBuckets /* locked */); - _removeNonNormalizedKeysForBucket(bucket); - _openBuckets.erase({bucket->_ns, bucket->_metadata}); - { - stdx::lock_guard statesLk{_statesMutex}; - _bucketStates.erase(bucket->id()); - } - _allBuckets.erase(it); + _markBucketNotIdle(stripe, stripeLock, bucket); + stripe->openBuckets.erase({bucket->_ns, bucket->_metadata}); + _eraseBucketState(bucket->id()); + + stripe->allBuckets.erase(it); return true; } -void BucketCatalog::_removeNonNormalizedKeysForBucket(Bucket* bucket) { - auto comparator = bucket->_metadata.getComparator(); - for (auto&& metadata : bucket->_nonNormalizedKeyMetadatas) { - _openBuckets.erase({bucket->_ns, {metadata.firstElement(), metadata, comparator}}); +void BucketCatalog::_abort(Stripe* stripe, + WithLock stripeLock, + std::shared_ptr<WriteBatch> batch, + const boost::optional<Status>& status) { + // Before we access the bucket, make sure it's still there. + Bucket* bucket = _useBucket(stripe, stripeLock, batch->bucket().id, ReturnClearedBuckets::kYes); + if (!bucket) { + // Special case, bucket has already been cleared, and we need only abort this batch. + batch->_abort(status, nullptr); + return; } + + // Proceed to abort any unprepared batches and remove the bucket if possible + _abort(stripe, stripeLock, bucket, batch, status); } -void BucketCatalog::_abort(stdx::unique_lock<Mutex>& lk, +void BucketCatalog::_abort(Stripe* stripe, + WithLock stripeLock, Bucket* bucket, std::shared_ptr<WriteBatch> batch, const boost::optional<Status>& status) { - // For any uncommitted batches that we need to abort, see if we already have the rights, - // otherwise try to claim the rights and abort it. If we don't get the rights, then wait - // for the other writer to resolve the batch. + // Abort any unprepared batches. This should be safe since we have a lock on the stripe, + // preventing anyone else from using these. for (const auto& [_, current] : bucket->_batches) { current->_abort(status, bucket); } bucket->_batches.clear(); bool doRemove = true; // We shouldn't remove the bucket if there's a prepared batch outstanding - // and it's not the on we manage. In that case, we don't know what the + // and it's not the one we manage. In that case, we don't know what the // user is doing with it, but we need to keep the bucket around until // that batch is finished. if (auto& prepared = bucket->_preparedBatch) { if (prepared == batch) { + // We own the prepared batch, so we can go ahead and abort it and remove the bucket. prepared->_abort(status, bucket); prepared.reset(); } else { @@ -603,63 +1040,40 @@ void BucketCatalog::_abort(stdx::unique_lock<Mutex>& lk, } } - lk.unlock(); if (doRemove) { - [[maybe_unused]] bool removed = _removeBucket(bucket, false /* expiringBuckets */); + [[maybe_unused]] bool removed = _removeBucket(stripe, stripeLock, bucket); } } -void BucketCatalog::_markBucketIdle(Bucket* bucket) { +void BucketCatalog::_markBucketIdle(Stripe* stripe, WithLock stripeLock, Bucket* bucket) { invariant(bucket); - stdx::lock_guard lk{_idleMutex}; - _idleBuckets.push_front(bucket); - bucket->_idleListEntry = _idleBuckets.begin(); + stripe->idleBuckets.push_front(bucket); + bucket->_idleListEntry = stripe->idleBuckets.begin(); } -void BucketCatalog::_markBucketNotIdle(Bucket* bucket, bool locked) { +void BucketCatalog::_markBucketNotIdle(Stripe* stripe, WithLock stripeLock, Bucket* bucket) { invariant(bucket); if (bucket->_idleListEntry) { - stdx::unique_lock<Mutex> guard; - if (!locked) { - guard = stdx::unique_lock{_idleMutex}; - } - _idleBuckets.erase(*bucket->_idleListEntry); + stripe->idleBuckets.erase(*bucket->_idleListEntry); bucket->_idleListEntry = boost::none; } } -void BucketCatalog::_verifyBucketIsUnused(Bucket* bucket) const { - // Take a lock on the bucket so we guarantee no one else is accessing it. We can release it - // right away since no one else can take it again without taking the catalog lock, which we - // also hold outside this method. - stdx::lock_guard<Mutex> lk{bucket->_mutex}; -} - -void BucketCatalog::_expireIdleBuckets(ExecutionStats* stats, +void BucketCatalog::_expireIdleBuckets(Stripe* stripe, + WithLock stripeLock, + ExecutionStats* stats, std::vector<BucketCatalog::ClosedBucket>* closedBuckets) { - // Must hold an exclusive lock on _bucketMutex from outside. - stdx::unique_lock lk{_idleMutex}; - // As long as we still need space and have entries and remaining attempts, close idle buckets. int32_t numClosed = 0; - while (!_idleBuckets.empty() && + while (!stripe->idleBuckets.empty() && _memoryUsage.load() > static_cast<std::uint64_t>( gTimeseriesIdleBucketExpiryMemoryUsageThresholdBytes.load()) && numClosed <= gTimeseriesIdleBucketExpiryMaxCountPerAttempt) { - Bucket* bucket = _idleBuckets.back(); - - lk.unlock(); - _verifyBucketIsUnused(bucket); - lk.lock(); - if (!bucket->_idleListEntry) { - // The bucket may have become non-idle between when we unlocked _idleMutex and locked - // the bucket's mutex. - continue; - } - + Bucket* bucket = stripe->idleBuckets.back(); ClosedBucket closed{ bucket->id(), bucket->getTimeField().toString(), bucket->numMeasurements()}; - if (_removeBucket(bucket, true /* expiringBuckets */)) { + + if (_removeBucket(stripe, stripeLock, bucket)) { stats->numBucketsClosedDueToMemoryThreshold.fetchAndAddRelaxed(1); closedBuckets->push_back(closed); ++numClosed; @@ -667,62 +1081,73 @@ void BucketCatalog::_expireIdleBuckets(ExecutionStats* stats, } } -std::size_t BucketCatalog::_numberOfIdleBuckets() const { - stdx::lock_guard lk{_idleMutex}; - return _idleBuckets.size(); -} - -BucketCatalog::Bucket* BucketCatalog::_allocateBucket(const BucketKey& key, - const Date_t& time, - const TimeseriesOptions& options, - ExecutionStats* stats, - ClosedBuckets* closedBuckets, - bool openedDuetoMetadata) { - _expireIdleBuckets(stats, closedBuckets); +BucketCatalog::Bucket* BucketCatalog::_allocateBucket(Stripe* stripe, + WithLock stripeLock, + const CreationInfo& info) { + _expireIdleBuckets(stripe, stripeLock, info.stats, info.closedBuckets); - auto [bucketId, roundedTime] = generateBucketId(time, options); + auto [bucketId, roundedTime] = generateBucketId(info.time, info.options); - auto [it, inserted] = _allBuckets.try_emplace(bucketId, std::make_unique<Bucket>(bucketId)); + auto [it, inserted] = + stripe->allBuckets.try_emplace(bucketId, std::make_unique<Bucket>(bucketId, info.stripe)); tassert(6130900, "Expected bucket to be inserted", inserted); Bucket* bucket = it->second.get(); - _openBuckets[key] = bucket; - { - stdx::lock_guard statesLk{_statesMutex}; - _bucketStates.emplace(bucketId, BucketState::kNormal); - } + stripe->openBuckets[info.key] = bucket; + _initializeBucketState(bucketId); - if (openedDuetoMetadata) { - stats->numBucketsOpenedDueToMetadata.fetchAndAddRelaxed(1); + if (info.openedDuetoMetadata) { + info.stats->numBucketsOpenedDueToMetadata.fetchAndAddRelaxed(1); } - bucket->_timeField = options.getTimeField().toString(); + bucket->_timeField = info.options.getTimeField().toString(); // Make sure we set the control.min time field to match the rounded _id timestamp. - auto controlDoc = buildControlMinTimestampDoc(options.getTimeField(), roundedTime); + auto controlDoc = buildControlMinTimestampDoc(info.options.getTimeField(), roundedTime); bucket->_minmax.update( controlDoc, bucket->_metadata.getMetaField(), bucket->_metadata.getComparator()); return bucket; } +BucketCatalog::Bucket* BucketCatalog::_rollover(Stripe* stripe, + WithLock stripeLock, + Bucket* bucket, + const CreationInfo& info) { + + if (bucket->allCommitted()) { + // The bucket does not contain any measurements that are yet to be committed, so we can + // remove it now. + info.closedBuckets->push_back(ClosedBucket{ + bucket->id(), bucket->getTimeField().toString(), bucket->numMeasurements()}); + + bool removed = _removeBucket(stripe, stripeLock, bucket); + invariant(removed); + } else { + // We must keep the bucket around until it is committed, just mark it full so it we know to + // clean it up when the last batch finishes. + bucket->_full = true; + } + + return _allocateBucket(stripe, stripeLock, info); +} + std::shared_ptr<BucketCatalog::ExecutionStats> BucketCatalog::_getExecutionStats( const NamespaceString& ns) { - { - auto lock = _statsMutex.lockShared(); - auto it = _executionStats.find(ns); - if (it != _executionStats.end()) { - return it->second; - } + stdx::lock_guard catalogLock{_mutex}; + auto it = _executionStats.find(ns); + if (it != _executionStats.end()) { + return it->second; } - auto lock = _statsMutex.lockExclusive(); auto res = _executionStats.emplace(ns, std::make_shared<ExecutionStats>()); return res.first->second; } const std::shared_ptr<BucketCatalog::ExecutionStats> BucketCatalog::_getExecutionStats( const NamespaceString& ns) const { - auto lock = _statsMutex.lockShared(); + static const auto kEmptyStats{std::make_shared<ExecutionStats>()}; + + stdx::lock_guard catalogLock{_mutex}; auto it = _executionStats.find(ns); if (it != _executionStats.end()) { @@ -731,9 +1156,25 @@ const std::shared_ptr<BucketCatalog::ExecutionStats> BucketCatalog::_getExecutio return kEmptyStats; } +void BucketCatalog::_initializeBucketState(const OID& id) { + stdx::lock_guard catalogLock{_mutex}; + _bucketStates.emplace(id, BucketState::kNormal); +} + +void BucketCatalog::_eraseBucketState(const OID& id) { + stdx::lock_guard catalogLock{_mutex}; + _bucketStates.erase(id); +} + +boost::optional<BucketCatalog::BucketState> BucketCatalog::_getBucketState(const OID& id) const { + stdx::lock_guard catalogLock{_mutex}; + auto it = _bucketStates.find(id); + return it != _bucketStates.end() ? boost::make_optional(it->second) : boost::none; +} + boost::optional<BucketCatalog::BucketState> BucketCatalog::_setBucketState(const OID& id, BucketState target) { - stdx::lock_guard statesLk{_statesMutex}; + stdx::lock_guard catalogLock{_mutex}; auto it = _bucketStates.find(id); if (it == _bucketStates.end()) { return boost::none; @@ -771,564 +1212,31 @@ boost::optional<BucketCatalog::BucketState> BucketCatalog::_setBucketState(const return state; } -BucketCatalog::BucketMetadata::BucketMetadata(BSONElement elem, - const StringData::ComparatorInterface* comparator) - : _metadataElement(elem), _comparator(comparator) {} - -BucketCatalog::BucketMetadata::BucketMetadata(BSONElement elem, - BSONObj obj, - const StringData::ComparatorInterface* comparator, - bool normalized, - bool copied) - : _metadataElement(elem), - _metadata(obj), - _comparator(comparator), - _normalized(normalized), - _copied(copied) {} - - -void BucketCatalog::BucketMetadata::normalize() { - if (!_normalized) { - if (_metadataElement) { - BSONObjBuilder objBuilder; - // We will get an object of equal size, just with reordered fields. - objBuilder.bb().reserveBytes(_metadataElement.size()); - normalizeTopLevel(&objBuilder, _metadataElement); - _metadata = objBuilder.obj(); - } - // Updates the BSONElement to refer to the copied BSONObj. - _metadataElement = _metadata.firstElement(); - _normalized = true; - _copied = true; - } -} - -bool BucketCatalog::BucketMetadata::operator==(const BucketMetadata& other) const { - return _metadataElement.binaryEqualValues(other._metadataElement); -} - -const BSONObj& BucketCatalog::BucketMetadata::toBSON() const { - // Should only be called after the metadata is owned. - invariant(_copied); - return _metadata; -} - -const BSONElement BucketCatalog::BucketMetadata::getMetaElement() const { - return _metadataElement; -} - -StringData BucketCatalog::BucketMetadata::getMetaField() const { - return StringData(_metadataElement.fieldName()); -} - -const StringData::ComparatorInterface* BucketCatalog::BucketMetadata::getComparator() const { - return _comparator; -} - -BucketCatalog::Bucket::Bucket(const OID& id) : _id(id) {} - -const OID& BucketCatalog::Bucket::id() const { - return _id; -} - -StringData BucketCatalog::Bucket::getTimeField() { - return _timeField; -} - -void BucketCatalog::Bucket::_calculateBucketFieldsAndSizeChange( - const BSONObj& doc, - boost::optional<StringData> metaField, - NewFieldNames* newFieldNamesToBeInserted, - uint32_t* newFieldNamesSize, - uint32_t* sizeToBeAdded) const { - // BSON size for an object with an empty object field where field name is empty string. - // We can use this as an offset to know the size when we have real field names. - static constexpr int emptyObjSize = 12; - // Validate in debug builds that this size is correct - dassert(emptyObjSize == BSON("" << BSONObj()).objsize()); - - newFieldNamesToBeInserted->clear(); - *newFieldNamesSize = 0; - *sizeToBeAdded = 0; - auto numMeasurementsFieldLength = numDigits(_numMeasurements); - for (const auto& elem : doc) { - auto fieldName = elem.fieldNameStringData(); - if (fieldName == metaField) { - // Ignore the metadata field since it will not be inserted. - continue; - } - - // If the field name is new, add the size of an empty object with that field name. - auto hashedKey = StringSet::hasher().hashed_key(fieldName); - if (!_fieldNames.contains(hashedKey)) { - newFieldNamesToBeInserted->push_back(hashedKey); - *newFieldNamesSize += elem.fieldNameSize(); - *sizeToBeAdded += emptyObjSize + fieldName.size(); - } - - // Add the element size, taking into account that the name will be changed to its - // positional number. Add 1 to the calculation since the element's field name size - // accounts for a null terminator whereas the stringified position does not. - *sizeToBeAdded += elem.size() - elem.fieldNameSize() + numMeasurementsFieldLength + 1; - } -} - -bool BucketCatalog::Bucket::_hasBeenCommitted() const { - return _numCommittedMeasurements != 0 || _preparedBatch; -} - -bool BucketCatalog::Bucket::allCommitted() const { - return _batches.empty() && !_preparedBatch; -} - -uint32_t BucketCatalog::Bucket::numMeasurements() const { - return _numMeasurements; -} - -std::shared_ptr<BucketCatalog::WriteBatch> BucketCatalog::Bucket::_activeBatch( - OperationId opId, const std::shared_ptr<ExecutionStats>& stats) { - auto it = _batches.find(opId); - if (it == _batches.end()) { - it = _batches.try_emplace(opId, std::make_shared<WriteBatch>(_id, opId, stats)).first; - } - return it->second; -} - -BucketCatalog::BucketAccess::BucketAccess(BucketCatalog* catalog, - BucketKey& key, - const TimeseriesOptions& options, - ExecutionStats* stats, - ClosedBuckets* closedBuckets, - const Date_t& time) - : _catalog(catalog), _key(&key), _options(&options), _stats(stats), _time(&time) { - - auto bucketFound = [](BucketState bucketState) { - return bucketState == BucketState::kNormal || bucketState == BucketState::kPrepared; - }; - - // First we try to find the bucket without normalizing the key as the normalization is an - // expensive operation. - auto hashedKey = BucketHasher{}.hashed_key(key); - if (bucketFound(_findOpenBucketThenLock(hashedKey))) { - return; - } - - // If not found, we normalize the metadata object and try to find it again. - // Save a copy of the non-normalized metadata before normalizing so we can add this key if the - // bucket was found for the normalized metadata. The BSON element is still refering to that of - // the document in current scope at this point. We will only make a copy of it when we decide to - // store it. - BSONElement nonNormalizedMetadata = key.metadata.getMetaElement(); - key.metadata.normalize(); - HashedBucketKey hashedNormalizedKey = BucketHasher{}.hashed_key(key); - - if (bucketFound(_findOpenBucketThenLock(hashedNormalizedKey))) { - // Bucket found, check if we have available slots to store the non-normalized key - if (_bucket->_nonNormalizedKeyMetadatas.size() == - _bucket->_nonNormalizedKeyMetadatas.capacity()) { - return; - } - - // Release the bucket as we need to acquire the exclusive lock for the catalog. - release(); - - // Re-construct the key as it were before normalization. - auto originalBucketKey = nonNormalizedMetadata - ? key.withCopiedMetadata(nonNormalizedMetadata.wrap()) - : key.withCopiedMetadata(BSONObj()); - hashedKey.key = &originalBucketKey; - - // Find the bucket under a catalog exclusive lock for the catalog. It may have been modified - // since we released our locks. If found we store the key to avoid the need to normalize for - // future lookups with this incoming field order. - BSONObj nonNormalizedMetadataObj = - nonNormalizedMetadata ? nonNormalizedMetadata.wrap() : BSONObj(); - if (bucketFound(_findOpenBucketThenLockAndStoreKey( - hashedNormalizedKey, hashedKey, nonNormalizedMetadataObj))) { - return; - } - } - - // Bucket not found, grab exclusive lock and create bucket with the key before normalization. - auto originalBucketKey = nonNormalizedMetadata - ? key.withCopiedMetadata(nonNormalizedMetadata.wrap()) - : key.withCopiedMetadata(BSONObj()); - hashedKey.key = &originalBucketKey; - auto lk = _catalog->_lockExclusive(); - _findOrCreateOpenBucketThenLock(hashedNormalizedKey, hashedKey, closedBuckets); -} - -BucketCatalog::BucketAccess::BucketAccess(BucketCatalog* catalog, - const OID& bucketId, - boost::optional<BucketState> targetState) - : _catalog(catalog) { - invariant(!targetState || targetState == BucketState::kNormal || - targetState == BucketState::kPrepared); - - { - auto lk = _catalog->_lockShared(); - auto bucketIt = _catalog->_allBuckets.find(bucketId); - if (bucketIt == _catalog->_allBuckets.end()) { - return; - } - - _bucket = bucketIt->second.get(); - _acquire(); - } - - auto state = - targetState ? _catalog->_setBucketState(_bucket->_id, *targetState) : _getBucketState(); - if (!state || state == BucketState::kCleared || state == BucketState::kPreparedAndCleared) { - release(); - } -} - -BucketCatalog::BucketAccess::~BucketAccess() { - if (isLocked()) { - release(); - } -} - -boost::optional<BucketCatalog::BucketState> BucketCatalog::BucketAccess::_getBucketState() const { - stdx::lock_guard lk{_catalog->_statesMutex}; - auto it = _catalog->_bucketStates.find(_bucket->_id); - return it != _catalog->_bucketStates.end() ? boost::make_optional(it->second) : boost::none; -} - -BucketCatalog::BucketState BucketCatalog::BucketAccess::_findOpenBucketThenLock( - const HashedBucketKey& key) { - { - auto lk = _catalog->_lockShared(); - auto it = _catalog->_openBuckets.find(key); - if (it == _catalog->_openBuckets.end()) { - // Bucket does not exist. - return BucketState::kCleared; - } - - _bucket = it->second; - _acquire(); - } - - return _confirmStateForAcquiredBucket(); -} - -BucketCatalog::BucketState BucketCatalog::BucketAccess::_findOpenBucketThenLockAndStoreKey( - const HashedBucketKey& normalizedKey, - const HashedBucketKey& nonNormalizedKey, - BSONObj nonNormalizedMetadata) { - invariant(!isLocked()); - { - auto lk = _catalog->_lockExclusive(); - auto it = _catalog->_openBuckets.find(normalizedKey); - if (it == _catalog->_openBuckets.end()) { - // Bucket does not exist. - return BucketState::kCleared; - } - - _bucket = it->second; - _acquire(); - - // Store the non-normalized key if we still have free slots - if (_bucket->_nonNormalizedKeyMetadatas.size() < - _bucket->_nonNormalizedKeyMetadatas.capacity()) { - auto [_, inserted] = - _catalog->_openBuckets.insert(std::make_pair(nonNormalizedKey, _bucket)); - if (inserted) { - _bucket->_nonNormalizedKeyMetadatas.push_back(nonNormalizedMetadata); - // Increment the memory usage to store this key and value in _openBuckets - _bucket->_memoryUsage += nonNormalizedKey.key->ns.size() + - nonNormalizedMetadata.objsize() + sizeof(_bucket); +class BucketCatalog::ServerStatus : public ServerStatusSection { + struct BucketCounts { + BucketCounts& operator+=(const BucketCounts& other) { + if (&other != this) { + all += other.all; + open += other.open; + idle += other.idle; } + return *this; } - } - - return _confirmStateForAcquiredBucket(); -} - -BucketCatalog::BucketState BucketCatalog::BucketAccess::_confirmStateForAcquiredBucket() { - auto state = *_getBucketState(); - if (state == BucketState::kCleared || state == BucketState::kPreparedAndCleared) { - release(); - } else { - _catalog->_markBucketNotIdle(_bucket, false /* locked */); - } - - return state; -} - -void BucketCatalog::BucketAccess::_findOrCreateOpenBucketThenLock( - const HashedBucketKey& normalizedKey, - const HashedBucketKey& nonNormalizedKey, - ClosedBuckets* closedBuckets) { - auto it = _catalog->_openBuckets.find(normalizedKey); - if (it == _catalog->_openBuckets.end()) { - // No open bucket for this metadata. - _create(normalizedKey, nonNormalizedKey, closedBuckets); - return; - } - - _bucket = it->second; - _acquire(); - - auto state = *_getBucketState(); - if (state == BucketState::kNormal || state == BucketState::kPrepared) { - _catalog->_markBucketNotIdle(_bucket, false /* locked */); - return; - } - - _catalog->_abort(_guard, _bucket, nullptr, boost::none); - _create(normalizedKey, nonNormalizedKey, closedBuckets); -} - -void BucketCatalog::BucketAccess::_acquire() { - invariant(_bucket); - _guard = stdx::unique_lock<Mutex>(_bucket->_mutex); -} - -void BucketCatalog::BucketAccess::_create(const HashedBucketKey& normalizedKey, - const HashedBucketKey& nonNormalizedKey, - ClosedBuckets* closedBuckets, - bool openedDuetoMetadata) { - invariant(_options); - _bucket = _catalog->_allocateBucket( - normalizedKey, *_time, *_options, _stats, closedBuckets, openedDuetoMetadata); - _catalog->_openBuckets[nonNormalizedKey] = _bucket; - _bucket->_nonNormalizedKeyMetadatas.push_back(nonNormalizedKey.key->metadata.toBSON()); - _acquire(); -} - -void BucketCatalog::BucketAccess::release() { - invariant(_guard.owns_lock()); - _guard.unlock(); - _bucket = nullptr; -} - -bool BucketCatalog::BucketAccess::isLocked() const { - return _bucket && _guard.owns_lock(); -} - -BucketCatalog::Bucket* BucketCatalog::BucketAccess::operator->() { - invariant(isLocked()); - return _bucket; -} - -BucketCatalog::BucketAccess::operator bool() const { - return isLocked(); -} - -BucketCatalog::BucketAccess::operator BucketCatalog::Bucket*() const { - return _bucket; -} - -bool BucketCatalog::BucketAccess::schemaIncompatible( - const BSONObj& input, - boost::optional<StringData> metaField, - const StringData::ComparatorInterface* comparator) { - // (Generic FCV reference): TODO (SERVER-60912): Update once kLastLTS is 6.0 - if (serverGlobalParams.featureCompatibility.getVersion() == - multiversion::GenericFCV::kLastLTS) { - return false; - } - - auto result = _bucket->_schema.update(input, metaField, comparator); - return (result == timeseries::Schema::UpdateStatus::Failed); -} - -void BucketCatalog::BucketAccess::rollover( - const std::function<bool(BucketAccess*)>& shouldCloseBucket, ClosedBuckets* closedBuckets) { - invariant(isLocked()); - invariant(_key); - invariant(_time); - - auto oldId = _bucket->id(); - release(); - - // Precompute the hash outside the lock, since it's expensive. - auto prevMetadata = _key->metadata.getMetaElement(); - _key->metadata.normalize(); - auto hashedNormalizedKey = BucketHasher{}.hashed_key(*_key); - auto prevBucketKey = prevMetadata ? _key->withCopiedMetadata(prevMetadata.wrap()) - : _key->withCopiedMetadata(BSONObj()); - auto hashedKey = BucketHasher{}.hashed_key(prevBucketKey); - - auto lk = _catalog->_lockExclusive(); - _findOrCreateOpenBucketThenLock(hashedNormalizedKey, hashedKey, closedBuckets); - - // Recheck if still full now that we've reacquired the bucket. - bool sameBucket = - oldId == _bucket->id(); // Only record stats if bucket has changed, don't double-count. - if (sameBucket || shouldCloseBucket(this)) { - // The bucket is indeed full, so create a new one. - if (_bucket->allCommitted()) { - // The bucket does not contain any measurements that are yet to be committed, so we can - // remove it now. Otherwise, we must keep the bucket around until it is committed. - closedBuckets->push_back(ClosedBucket{ - _bucket->id(), _bucket->getTimeField().toString(), _bucket->numMeasurements()}); - - Bucket* oldBucket = _bucket; - release(); - bool removed = _catalog->_removeBucket(oldBucket, false /* expiringBuckets */); - invariant(removed); - } else { - _bucket->_full = true; - - // We will recreate a new bucket for the same key below. We also need to cleanup all - // extra metadata keys added for the old bucket instance. - _catalog->_removeNonNormalizedKeysForBucket(_bucket); - release(); - } - - _create(hashedNormalizedKey, hashedKey, closedBuckets, false /* openedDueToMetadata */); - } -} - -Date_t BucketCatalog::BucketAccess::getTime() const { - return _bucket->id().asDateT(); -} - -BucketCatalog::WriteBatch::WriteBatch(const OID& bucketId, - OperationId opId, - const std::shared_ptr<ExecutionStats>& stats) - : _bucketId{bucketId}, _opId(opId), _stats{stats} {} - -bool BucketCatalog::WriteBatch::claimCommitRights() { - return !_commitRights.swap(true); -} - -StatusWith<BucketCatalog::CommitInfo> BucketCatalog::WriteBatch::getResult() const { - if (!_promise.getFuture().isReady()) { - _stats->numWaits.fetchAndAddRelaxed(1); - } - return _promise.getFuture().getNoThrow(); -} - -const OID& BucketCatalog::WriteBatch::bucketId() const { - return _bucketId; -} - -const std::vector<BSONObj>& BucketCatalog::WriteBatch::measurements() const { - invariant(!_active); - return _measurements; -} - -const BSONObj& BucketCatalog::WriteBatch::min() const { - invariant(!_active); - return _min; -} - -const BSONObj& BucketCatalog::WriteBatch::max() const { - invariant(!_active); - return _max; -} - -const StringMap<std::size_t>& BucketCatalog::WriteBatch::newFieldNamesToBeInserted() const { - invariant(!_active); - return _newFieldNamesToBeInserted; -} - -uint32_t BucketCatalog::WriteBatch::numPreviouslyCommittedMeasurements() const { - invariant(!_active); - return _numPreviouslyCommittedMeasurements; -} - -bool BucketCatalog::WriteBatch::active() const { - return _active; -} - -bool BucketCatalog::WriteBatch::finished() const { - return _promise.getFuture().isReady(); -} - -BSONObj BucketCatalog::WriteBatch::toBSON() const { - auto toFieldName = [](const auto& nameHashPair) { return nameHashPair.first; }; - return BSON("docs" << _measurements << "bucketMin" << _min << "bucketMax" << _max - << "numCommittedMeasurements" << int(_numPreviouslyCommittedMeasurements) - << "newFieldNamesToBeInserted" - << std::set<std::string>( - boost::make_transform_iterator(_newFieldNamesToBeInserted.begin(), - toFieldName), - boost::make_transform_iterator(_newFieldNamesToBeInserted.end(), - toFieldName))); -} -void BucketCatalog::WriteBatch::_addMeasurement(const BSONObj& doc) { - invariant(_active); - _measurements.push_back(doc); -} - -void BucketCatalog::WriteBatch::_recordNewFields(NewFieldNames&& fields) { - invariant(_active); - for (auto&& field : fields) { - _newFieldNamesToBeInserted[field] = field.hash(); - } -} - -void BucketCatalog::WriteBatch::_prepareCommit(Bucket* bucket) { - invariant(_commitRights.load()); - invariant(_active); - _active = false; - _numPreviouslyCommittedMeasurements = bucket->_numCommittedMeasurements; + std::size_t all = 0; + std::size_t open = 0; + std::size_t idle = 0; + }; - // Filter out field names that were new at the time of insertion, but have since been committed - // by someone else. - for (auto it = _newFieldNamesToBeInserted.begin(); it != _newFieldNamesToBeInserted.end();) { - StringMapHashedKey fieldName(it->first, it->second); - if (bucket->_fieldNames.contains(fieldName)) { - _newFieldNamesToBeInserted.erase(it++); - continue; + BucketCounts _getBucketCounts(const BucketCatalog& catalog) const { + BucketCounts sum; + for (auto const& stripe : catalog._stripes) { + stdx::lock_guard stripeLock{stripe.mutex}; + sum += {stripe.allBuckets.size(), stripe.openBuckets.size(), stripe.idleBuckets.size()}; } - - bucket->_fieldNames.emplace(fieldName); - ++it; - } - - for (const auto& doc : _measurements) { - bucket->_minmax.update( - doc, bucket->_metadata.getMetaField(), bucket->_metadata.getComparator()); - } - - const bool isUpdate = _numPreviouslyCommittedMeasurements > 0; - if (isUpdate) { - _min = bucket->_minmax.minUpdates(); - _max = bucket->_minmax.maxUpdates(); - } else { - _min = bucket->_minmax.min(); - _max = bucket->_minmax.max(); - - // Approximate minmax memory usage by taking sizes of initial commit. Subsequent updates may - // add fields but are most likely just to update values. - bucket->_memoryUsage += _min.objsize(); - bucket->_memoryUsage += _max.objsize(); - - // We don't have a great approximation for the memory usage of _schema, so we use the max as - // a stand-in. - bucket->_memoryUsage += _max.objsize(); - } -} - -void BucketCatalog::WriteBatch::_finish(const CommitInfo& info) { - invariant(_commitRights.load()); - invariant(!_active); - _promise.emplaceValue(info); -} - -void BucketCatalog::WriteBatch::_abort(const boost::optional<Status>& status, - const Bucket* bucket) { - if (finished()) { - return; + return sum; } - _active = false; - std::string nsIdentification; - if (bucket) { - nsIdentification.append(str::stream() << " for namespace " << bucket->_ns); - } - _promise.setError(status.value_or(Status{ErrorCodes::TimeseriesBucketCleared, - str::stream() << "Time-series bucket " << _bucketId - << nsIdentification << " was cleared"})); -} - -class BucketCatalog::ServerStatus : public ServerStatusSection { public: ServerStatus() : ServerStatusSection("bucketCatalog") {} @@ -1339,20 +1247,17 @@ public: BSONObj generateSection(OperationContext* opCtx, const BSONElement&) const override { const auto& bucketCatalog = BucketCatalog::get(opCtx); { - auto statsLk = bucketCatalog._statsMutex.lockShared(); + stdx::lock_guard catalogLock{bucketCatalog._mutex}; if (bucketCatalog._executionStats.empty()) { return {}; } } - auto lk = bucketCatalog._lockShared(); + auto counts = _getBucketCounts(bucketCatalog); BSONObjBuilder builder; - builder.appendNumber("numBuckets", - static_cast<long long>(bucketCatalog._allBuckets.size())); - builder.appendNumber("numOpenBuckets", - static_cast<long long>(bucketCatalog._openBuckets.size())); - builder.appendNumber("numIdleBuckets", - static_cast<long long>(bucketCatalog._numberOfIdleBuckets())); + builder.appendNumber("numBuckets", static_cast<long long>(counts.all)); + builder.appendNumber("numOpenBuckets", static_cast<long long>(counts.open)); + builder.appendNumber("numIdleBuckets", static_cast<long long>(counts.idle)); builder.appendNumber("memoryUsage", static_cast<long long>(bucketCatalog._memoryUsage.load())); return builder.obj(); diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h index 3b5b4e752a7..b33a9025b3c 100644 --- a/src/mongo/db/timeseries/bucket_catalog.h +++ b/src/mongo/db/timeseries/bucket_catalog.h @@ -46,15 +46,22 @@ namespace mongo { class BucketCatalog { - struct ExecutionStats; - // Number of new field names we can hold in NewFieldNames without needing to allocate memory. static constexpr std::size_t kNumStaticNewFields = 10; using NewFieldNames = boost::container::small_vector<StringMapHashedKey, kNumStaticNewFields>; -public: + using StripeNumber = std::uint8_t; + + struct BucketHandle { + const OID id; + const StripeNumber stripe; + }; + + struct ExecutionStats; class Bucket; + struct CreationInfo; +public: enum class CombineWithInsertsFromOtherClients { kAllow, kDisallow, @@ -66,7 +73,7 @@ public: }; /** - * Information of a Bucket that got closed when performing an operation on this BucketCatalog. + * Information of a Bucket that got closed while performing an operation on this BucketCatalog. */ struct ClosedBucket { OID bucketId; @@ -91,24 +98,28 @@ public: public: WriteBatch() = delete; - WriteBatch(const OID& bucketId, + WriteBatch(const BucketHandle& bucketId, OperationId opId, const std::shared_ptr<ExecutionStats>& stats); /** - * Attempt to claim the right to commit (or abort) a batch. If it returns true, rights are + * Attempts to claim the right to commit a batch. If it returns true, rights are * granted. If it returns false, rights are revoked, and the caller should get the result * of the batch with getResult(). Non-blocking. */ bool claimCommitRights(); /** - * Retrieve the result of the write batch commit. Should be called by any interested party + * Retrieves the result of the write batch commit. Should be called by any interested party * that does not have commit rights. Blocking. */ StatusWith<CommitInfo> getResult() const; - const OID& bucketId() const; + /** + * Returns a handle which can be used by the BucketCatalog internally to locate its record + * for this bucket. + */ + const BucketHandle& bucket() const; const std::vector<BSONObj>& measurements() const; const BSONObj& min() const; @@ -117,12 +128,7 @@ public: uint32_t numPreviouslyCommittedMeasurements() const; /** - * Whether the batch is active and can be written to. - */ - bool active() const; - - /** - * Whether the batch has been committed or aborted. + * Returns whether the batch has already been committed or aborted. */ bool finished() const; @@ -130,36 +136,36 @@ public: private: /** - * Add a measurement. Active batches only. + * Adds a measurement. Active batches only. */ void _addMeasurement(const BSONObj& doc); /** - * Record a set of new-to-the-bucket fields. Active batches only. + * Records a set of new-to-the-bucket fields. Active batches only. */ void _recordNewFields(NewFieldNames&& fields); /** - * Prepare the batch for commit. Sets min/max appropriately, records the number of documents - * that have previously been committed to the bucket, and marks the batch inactive. Must - * have commit rights. + * Prepares the batch for commit. Sets min/max appropriately, records the number of + * documents that have previously been committed to the bucket, and renders the batch + * inactive. Must have commit rights. */ void _prepareCommit(Bucket* bucket); /** - * Report the result and status of a commit, and notify anyone waiting on getResult(). Must - * have commit rights. Inactive batches only. + * Reports the result and status of a commit, and notifies anyone waiting on getResult(). + * Must have commit rights. Inactive batches only. */ void _finish(const CommitInfo& info); /** - * Abandon the write batch and notify any waiters that the bucket has been cleared. Must - * have commit rights. Parameter 'bucket' provides a pointer to the bucket if still - * available, nullptr otherwise. + * Abandons the write batch and notifies any waiters that the bucket has been cleared. + * Parameter 'bucket' provides a pointer to the bucket if still available, nullptr + * otherwise. */ void _abort(const boost::optional<Status>& status, const Bucket* bucket); - const OID _bucketId; + const BucketHandle _bucket; OperationId _opId; std::shared_ptr<ExecutionStats> _stats; @@ -169,15 +175,12 @@ public: uint32_t _numPreviouslyCommittedMeasurements = 0; StringMap<std::size_t> _newFieldNamesToBeInserted; // Value is hash of string key - bool _active = true; - AtomicWord<bool> _commitRights{false}; SharedPromise<CommitInfo> _promise; }; /** - * Return type for the insert function. - * See comment above insert() for more information. + * Return type for the insert function. See insert() for more information. */ struct InsertResult { std::shared_ptr<WriteBatch> batch; @@ -200,12 +203,13 @@ public: * Returns an empty document if the given bucket cannot be found or if this time-series * collection was not created with a metadata field name. */ - BSONObj getMetadata(const OID& bucketId) const; + BSONObj getMetadata(const BucketHandle& bucket) const; /** - * Returns the WriteBatch into which the document was inserted and optional information about a - * bucket if one was closed. Any caller who receives the same batch may commit or abort the - * batch after claiming commit rights. See WriteBatch for more details. + * Returns the WriteBatch into which the document was inserted and a list of any buckets that + * were closed in order to make space to insert the document. Any caller who receives the same + * batch may commit or abort the batch after claiming commit rights. See WriteBatch for more + * details. */ StatusWith<InsertResult> insert(OperationContext* opCtx, const NamespaceString& ns, @@ -230,9 +234,8 @@ public: boost::optional<ClosedBucket> finish(std::shared_ptr<WriteBatch> batch, const CommitInfo& info); /** - * Aborts the given write batch and any other outstanding batches on the same bucket. Caller - * must already have commit rights on batch. Uses the provided status when clearing the bucket, - * or TimeseriesBucketCleared if not provided. + * Aborts the given write batch and any other outstanding batches on the same bucket. Uses the + * provided status when clearing the bucket, or TimeseriesBucketCleared if not provided. */ void abort(std::shared_ptr<WriteBatch> batch, const boost::optional<Status>& status = boost::none); @@ -264,40 +267,18 @@ public: void appendExecutionStats(const NamespaceString& ns, BSONObjBuilder* builder) const; private: - /** - * This class provides a mutex with shared and exclusive locking semantics. Unlike some shared - * mutex implementations, it does not allow for writer starvation (assuming the underlying - * Mutex implemenation does not allow for starvation). The underlying mechanism is simply an - * array of Mutex instances. To take a shared lock, a thread's ID is hashed, mapping the thread - * to a particular mutex, which is then locked. To take an exclusive lock, all mutexes are - * locked. - * - * This behavior makes it easy to allow concurrent read access while still allowing writes to - * occur safely with exclusive access. It should only be used for situations where observed - * access patterns are read-mostly. - * - * A shared lock *cannot* be upgraded to an exclusive lock. - */ - class StripedMutex { - public: - static constexpr std::size_t kNumStripes = 16; - StripedMutex() = default; - - using SharedLock = stdx::unique_lock<Mutex>; - SharedLock lockShared() const; - - class ExclusiveLock { - public: - ExclusiveLock() = default; - explicit ExclusiveLock(const StripedMutex&); - - private: - std::array<stdx::unique_lock<Mutex>, kNumStripes> _locks; - }; - ExclusiveLock lockExclusive() const; - - private: - mutable std::array<Mutex, kNumStripes> _mutexes; + enum class BucketState { + // Bucket can be inserted into, and does not have an outstanding prepared commit + kNormal, + // Bucket can be inserted into, and has a prepared commit outstanding. + kPrepared, + // Bucket can no longer be inserted into, does not have an outstanding prepared + // commit. + kCleared, + // Bucket can no longer be inserted into, but still has an outstanding + // prepared commit. Any writer other than the one who prepared the + // commit should receive a WriteConflictException. + kPreparedAndCleared, }; struct BucketMetadata { @@ -305,24 +286,10 @@ private: BucketMetadata() = default; BucketMetadata(BSONElement elem, const StringData::ComparatorInterface* comparator); - // Constructs with a copy of the metadata. - BucketMetadata(BSONElement elem, - BSONObj obj, - const StringData::ComparatorInterface* comparator, - bool normalized = false, - bool copied = true); - - bool normalized() const { - return _normalized; - } - void normalize(); - bool operator==(const BucketMetadata& other) const; const BSONObj& toBSON() const; - const BSONElement getMetaElement() const; - StringData getMetaField() const; const StringData::ComparatorInterface* getComparator() const; @@ -337,177 +304,24 @@ private: private: // Only the value of '_metadataElement' is used for hashing and comparison. - // When BucketMetadata does not own the '_metadata', only '_metadataElement' will be present - // and used to look up buckets. After owning the '_metadata,' the field should refer to the - // BSONElement within '_metadata'. BSONElement _metadataElement; - // Empty when just looking up buckets. Owns a copy when the field is present. + // Empty if metadata field isn't present, owns a copy otherwise. BSONObj _metadata; - const StringData::ComparatorInterface* _comparator = nullptr; - bool _normalized = false; - bool _copied = false; - }; - - using IdleList = std::list<Bucket*>; - -public: - class Bucket { - public: - friend class BucketAccess; - friend class BucketCatalog; - - Bucket(const OID& id); - /** - * Returns the ID for the underlying bucket. - */ - const OID& id() const; - - /** - * Returns the timefield for the underlying bucket. - */ - StringData getTimeField(); - - /** - * Returns whether all measurements have been committed. - */ - bool allCommitted() const; - - /** - * Returns total number of measurements in the bucket. - */ - uint32_t numMeasurements() const; - - private: - /** - * Determines the effect of adding 'doc' to this bucket. If adding 'doc' causes this bucket - * to overflow, we will create a new bucket and recalculate the change to the bucket size - * and data fields. - */ - void _calculateBucketFieldsAndSizeChange(const BSONObj& doc, - boost::optional<StringData> metaField, - NewFieldNames* newFieldNamesToBeInserted, - uint32_t* newFieldNamesSize, - uint32_t* sizeToBeAdded) const; - - /** - * Returns whether BucketCatalog::commit has been called at least once on this bucket. - */ - bool _hasBeenCommitted() const; - - /** - * Return a pointer to the current, open batch. - */ - std::shared_ptr<WriteBatch> _activeBatch(OperationId opId, - const std::shared_ptr<ExecutionStats>& stats); - - // Access to the bucket is controlled by this lock - mutable Mutex _mutex = - MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(2), "BucketCatalog::Bucket::_mutex"); - - // The bucket ID for the underlying document - const OID _id; - - // The namespace that this bucket is used for. - NamespaceString _ns; - - // The metadata of the data that this bucket contains. - BucketMetadata _metadata; - - // Extra metadata combinations that are supported without normalizing the metadata object. - static constexpr std::size_t kNumFieldOrderCombinationsWithoutNormalizing = 1; - boost::container::static_vector<BSONObj, kNumFieldOrderCombinationsWithoutNormalizing> - _nonNormalizedKeyMetadatas; - - // Top-level field names of the measurements that have been inserted into the bucket. - StringSet _fieldNames; - - // Time field for the measurements that have been inserted into the bucket. - std::string _timeField; - - // The minimum and maximum values for each field in the bucket. - timeseries::MinMax _minmax; - - // The reference schema for measurements in this bucket. May reflect schema of uncommitted - // measurements. - timeseries::Schema _schema; - - // The latest time that has been inserted into the bucket. - Date_t _latestTime; - - // The total size in bytes of the bucket's BSON serialization, including measurements to be - // inserted. - uint64_t _size = 0; - - // The total number of measurements in the bucket, including uncommitted measurements and - // measurements to be inserted. - uint32_t _numMeasurements = 0; - - // The number of committed measurements in the bucket. - uint32_t _numCommittedMeasurements = 0; - - // Whether the bucket is full. This can be due to number of measurements, size, or time - // range. - bool _full = false; - - // The batch that has been prepared and is currently in the process of being committed, if - // any. - std::shared_ptr<WriteBatch> _preparedBatch; - - // Batches, per operation, that haven't been committed or aborted yet. - stdx::unordered_map<OperationId, std::shared_ptr<WriteBatch>> _batches; - - // If the bucket is in _idleBuckets, then its position is recorded here. - boost::optional<IdleList::iterator> _idleListEntry = boost::none; - - // Approximate memory usage of this bucket. - uint64_t _memoryUsage = sizeof(*this); - }; - -private: - struct ExecutionStats { - AtomicWord<long long> numBucketInserts; - AtomicWord<long long> numBucketUpdates; - AtomicWord<long long> numBucketsOpenedDueToMetadata; - AtomicWord<long long> numBucketsClosedDueToCount; - AtomicWord<long long> numBucketsClosedDueToSchemaChange; - AtomicWord<long long> numBucketsClosedDueToSize; - AtomicWord<long long> numBucketsClosedDueToTimeForward; - AtomicWord<long long> numBucketsClosedDueToTimeBackward; - AtomicWord<long long> numBucketsClosedDueToMemoryThreshold; - AtomicWord<long long> numCommits; - AtomicWord<long long> numWaits; - AtomicWord<long long> numMeasurementsCommitted; - }; - - enum class BucketState { - // Bucket can be inserted into, and does not have an outstanding prepared commit - kNormal, - // Bucket can be inserted into, and has a prepared commit outstanding. - kPrepared, - // Bucket can no longer be inserted into, does not have an outstanding prepared - // commit. - kCleared, - // Bucket can no longer be inserted into, but still has an outstanding - // prepared commit. Any writer other than the one who prepared the - // commit should receive a WriteConflictException. - kPreparedAndCleared, + const StringData::ComparatorInterface* _comparator = nullptr; }; /** - * Key to lookup open Bucket for namespace and metadata. + * Key to lookup open Bucket for namespace and metadata, with pre-computed hash. */ struct BucketKey { + BucketKey() = delete; + BucketKey(const NamespaceString& nss, const BucketMetadata& meta); + NamespaceString ns; BucketMetadata metadata; - - /** - * Creates a new BucketKey with a different internal metadata object. - */ - BucketKey withCopiedMetadata(BSONObj meta) const { - return {ns, {meta.firstElement(), meta, metadata.getComparator()}}; - } + std::size_t hash; bool operator==(const BucketKey& other) const { return ns == other.ns && metadata == other.metadata; @@ -520,235 +334,154 @@ private: }; /** - * BucketKey with pre-calculated hash. To avoiding calculating the hash while holding locks. - * - * The unhashed BucketKey is stored inside HashedBucketKey by reference and must not go out of - * scope for the lifetime of the returned HashedBucketKey. + * Hasher to support pre-computed hash lookup for BucketKey. */ - struct HashedBucketKey { - operator BucketKey() const { - return *key; - } - const BucketKey* key; - std::size_t hash; + struct BucketHasher { + std::size_t operator()(const BucketKey& key) const; }; /** - * Hasher to support heterogeneous lookup for BucketKey and HashedBucketKey. + * Struct to hold a portion of the buckets managed by the catalog. + * + * Each of the bucket lists, as well as the buckets themselves, are protected by 'mutex'. */ - struct BucketHasher { - // This using directive activates heterogeneous lookup in the hash table - using is_transparent = void; + struct Stripe { + mutable Mutex mutex = + MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(1), "BucketCatalog::Stripe::mutex"); - std::size_t operator()(const BucketKey& key) const { - // Use the default absl hasher. - return absl::Hash<BucketKey>{}(key); - } + // All buckets currently in the catalog, including buckets which are full but not yet + // committed. + stdx::unordered_map<OID, std::unique_ptr<Bucket>, OID::Hasher> allBuckets; - std::size_t operator()(const HashedBucketKey& key) const { - return key.hash; - } + // The current open bucket for each namespace and metadata pair. + stdx::unordered_map<BucketKey, Bucket*, BucketHasher> openBuckets; - /** - * Pre-calculates a hashed BucketKey. - */ - HashedBucketKey hashed_key(const BucketKey& key) { - return HashedBucketKey{&key, operator()(key)}; - } + // Buckets that do not have any outstanding writes. + using IdleList = std::list<Bucket*>; + IdleList idleBuckets; }; + StripeNumber _getStripeNumber(const BucketKey& key); + /** - * Equality, provides comparison between hashed and unhashed bucket keys. + * Mode enum to control whether the bucket retrieval methods below will return buckets that are + * in kCleared or kPreparedAndCleared state. */ - struct BucketEq { - // This using directive activates heterogeneous lookup in the hash table - using is_transparent = void; - - bool operator()(const BucketKey& lhs, const BucketKey& rhs) const { - return lhs == rhs; - } - bool operator()(const BucketKey& lhs, const HashedBucketKey& rhs) const { - return lhs == *rhs.key; - } - bool operator()(const HashedBucketKey& lhs, const BucketKey& rhs) const { - return *lhs.key == rhs; - } - bool operator()(const HashedBucketKey& lhs, const HashedBucketKey& rhs) const { - return *lhs.key == *rhs.key; - } - }; + enum class ReturnClearedBuckets { kYes, kNo }; /** - * Helper class to handle all the locking necessary to lookup and lock a bucket for use. This - * is intended primarily for using a single bucket, including replacing it when it becomes full. - * If the usage pattern iterates over several buckets, you will instead want to use raw access - * using the different mutexes with the locking semantics described below. + * Retrieve a bucket for read-only use. */ - class BucketAccess { - public: - BucketAccess() = delete; - BucketAccess(BucketCatalog* catalog, - BucketKey& key, - const TimeseriesOptions& options, - ExecutionStats* stats, - ClosedBuckets* closedBuckets, - const Date_t& time); - BucketAccess(BucketCatalog* catalog, - const OID& bucketId, - boost::optional<BucketState> targetState = boost::none); - ~BucketAccess(); - - bool isLocked() const; - Bucket* operator->(); - operator bool() const; - operator Bucket*() const; - - // Release the bucket lock, typically in order to reacquire the catalog lock. - void release(); - - /** - * Determines if the schema for an incoming measurement is incompatible with those already - * stored in the bucket. - * - * Returns true if incompatible - */ - bool schemaIncompatible(const BSONObj& doc, - boost::optional<StringData> metaField, - const StringData::ComparatorInterface* comparator); - - /** - * Close the existing, full bucket and open a new one for the same metadata. - * Parameter is a function which should check that the bucket is indeed still full after - * reacquiring the necessary locks. The first parameter will give the function access to - * this BucketAccess instance, with the bucket locked. - * - * Returns bucket information of a bucket if one was closed. - */ - void rollover(const std::function<bool(BucketAccess*)>& isBucketFull, - ClosedBuckets* closedBuckets); - - // Retrieve the time associated with the bucket (id) - Date_t getTime() const; - - private: - /** - * Returns the state of the bucket, or boost::none if there is no state for the bucket. - */ - boost::optional<BucketState> _getBucketState() const; - - /** - * Helper to find and lock an open bucket for the given metadata if it exists. Takes a - * shared lock on the catalog. Returns the state of the bucket if it is locked and usable. - * In case the bucket does not exist or was previously cleared and thus is not usable, the - * return value will be BucketState::kCleared. - */ - BucketState _findOpenBucketThenLock(const HashedBucketKey& key); - - /** - * Same as _findOpenBucketThenLock above but takes an exclusive lock on the catalog. In - * addition to finding the bucket it also store a non-normalized key if there are available - * slots in the bucket. - */ - BucketState _findOpenBucketThenLockAndStoreKey(const HashedBucketKey& normalizedKey, - const HashedBucketKey& key, - BSONObj metadata); + const Bucket* _findBucket(const Stripe& stripe, + WithLock stripeLock, + const OID& id, + ReturnClearedBuckets mode = ReturnClearedBuckets::kNo) const; - /** - * Helper to determine the state of the bucket that is found by _findOpenBucketThenLock and - * _findOpenBucketThenLockAndStoreKey. Requires the bucket lock to be acquired before - * calling this function and it may release the lock depending on the state. - */ - BucketState _confirmStateForAcquiredBucket(); - - // Helper to find an open bucket for the given metadata if it exists, create it if it - // doesn't, and lock it. Requires an exclusive lock on the catalog. - void _findOrCreateOpenBucketThenLock(const HashedBucketKey& normalizedKey, - const HashedBucketKey& key, - ClosedBuckets* closedBuckets); - - // Lock _bucket. - void _acquire(); - - // Allocate a new bucket in the catalog, set the local state to that bucket, and acquire - // a lock on it. - void _create(const HashedBucketKey& normalizedKey, - const HashedBucketKey& key, - ClosedBuckets* closedBuckets, - bool openedDuetoMetadata = true); - - BucketCatalog* _catalog; - BucketKey* _key = nullptr; - const TimeseriesOptions* _options = nullptr; - ExecutionStats* _stats = nullptr; - const Date_t* _time = nullptr; - - Bucket* _bucket = nullptr; - stdx::unique_lock<Mutex> _guard; - }; + /** + * Retrieve a bucket for write use. + */ + Bucket* _useBucket(Stripe* stripe, + WithLock stripeLock, + const OID& id, + ReturnClearedBuckets mode); - class ServerStatus; + /** + * Retrieve a bucket for write use, setting the state in the process. + */ + Bucket* _useBucketInState(Stripe* stripe, + WithLock stripeLock, + const OID& id, + BucketState targetState); - StripedMutex::SharedLock _lockShared() const; - StripedMutex::ExclusiveLock _lockExclusive() const; + /** + * Retrieve a bucket for write use, or create one if a suitable bucket doesn't already exist. + */ + Bucket* _useOrCreateBucket(Stripe* stripe, WithLock stripeLock, const CreationInfo& info); - void _waitToCommitBatch(const std::shared_ptr<WriteBatch>& batch); + /** + * Wait for other batches to finish so we can prepare 'batch' + */ + void _waitToCommitBatch(Stripe* stripe, const std::shared_ptr<WriteBatch>& batch); /** * Removes the given bucket from the bucket catalog's internal data structures. */ - bool _removeBucket(Bucket* bucket, bool expiringBuckets); + bool _removeBucket(Stripe* stripe, WithLock stripeLock, Bucket* bucket); /** - * Removes extra non-normalized BucketKey's for the given bucket from the - * bucket catalog's internal data structures. + * Aborts 'batch', and if the corresponding bucket still exists, proceeds to abort any other + * unprepared batches and remove the bucket from the catalog if there is no unprepared batch. */ - void _removeNonNormalizedKeysForBucket(Bucket* bucket); + void _abort(Stripe* stripe, + WithLock stripeLock, + std::shared_ptr<WriteBatch> batch, + const boost::optional<Status>& status = boost::none); /** - * Aborts any batches it can for the given bucket, then removes the bucket. If batch is - * non-null, it is assumed that the caller has commit rights for that batch. + * Aborts any unprepared batches for the given bucket, then removes the bucket if there is no + * prepared batch. If 'batch' is non-null, it is assumed that the caller has commit rights for + * that batch. */ - void _abort(stdx::unique_lock<Mutex>& lk, + void _abort(Stripe* stripe, + WithLock stripeLock, Bucket* bucket, std::shared_ptr<WriteBatch> batch, const boost::optional<Status>& status); /** - * Adds the bucket to a list of idle buckets to be expired at a later date + * Adds the bucket to a list of idle buckets to be expired at a later date. */ - void _markBucketIdle(Bucket* bucket); + void _markBucketIdle(Stripe* stripe, WithLock stripeLock, Bucket* bucket); /** * Remove the bucket from the list of idle buckets. The second parameter encodes whether the * caller holds a lock on _idleMutex. */ - void _markBucketNotIdle(Bucket* bucket, bool locked); + void _markBucketNotIdle(Stripe* stripe, WithLock stripeLock, Bucket* bucket); /** - * Verify the bucket is currently unused by taking a lock on it. Must hold exclusive lock from - * the outside for the result to be meaningful. + * Expires idle buckets until the bucket catalog's memory usage is below the expiry + * threshold. */ - void _verifyBucketIsUnused(Bucket* bucket) const; + void _expireIdleBuckets(Stripe* stripe, + WithLock stripeLock, + ExecutionStats* stats, + ClosedBuckets* closedBuckets); /** - * Expires idle buckets until the bucket catalog's memory usage is below the expiry threshold. + * Allocates a new bucket and adds it to the catalog. */ - void _expireIdleBuckets(ExecutionStats* stats, ClosedBuckets* closedBuckets); - - std::size_t _numberOfIdleBuckets() const; + Bucket* _allocateBucket(Stripe* stripe, WithLock stripeLock, const CreationInfo& info); - // Allocate a new bucket (and ID) and add it to the catalog - Bucket* _allocateBucket(const BucketKey& key, - const Date_t& time, - const TimeseriesOptions& options, - ExecutionStats* stats, - ClosedBuckets* closedBuckets, - bool openedDuetoMetadata); + /** + * Close the existing, full bucket and open a new one for the same metadata. + * + * Writes information about the closed bucket to the 'info' parameter. + */ + Bucket* _rollover(Stripe* stripe, + WithLock stripeLock, + Bucket* bucket, + const CreationInfo& info); std::shared_ptr<ExecutionStats> _getExecutionStats(const NamespaceString& ns); const std::shared_ptr<ExecutionStats> _getExecutionStats(const NamespaceString& ns) const; /** + * Retreives the bucket state if it is tracked in the catalog. + */ + boost::optional<BucketState> _getBucketState(const OID& id) const; + + /** + * Initializes state for the given bucket to kNormal. + */ + void _initializeBucketState(const OID& id); + + /** + * Remove state for the given bucket from the catalog. + */ + void _eraseBucketState(const OID& id); + + /** * Changes the bucket state, taking into account the current state, the specified target state, * and allowed state transitions. The return value, if set, is the final state of the bucket * with the given id; if no such bucket exists, the return value will not be set. @@ -758,59 +491,23 @@ private: */ boost::optional<BucketState> _setBucketState(const OID& id, BucketState target); - /** - * You must hold a lock on _bucketMutex when accessing _allBuckets or _openBuckets. - * While holding a lock on _bucketMutex, you can take a lock on an individual bucket, then - * release _bucketMutex. Any iterators on the protected structures should be considered invalid - * once the lock is released. Any subsequent access to the structures requires relocking - * _bucketMutex. You must *not* be holding a lock on a bucket when you attempt to acquire the - * lock on _mutex, as this can result in deadlock. - * - * The StripedMutex class has both shared (read-only) and exclusive (write) locks. If you are - * going to write to any of the protected structures, you must hold an exclusive lock. - * - * Typically, if you want to acquire a bucket, you should use the BucketAccess RAII - * class to do so, as it will take care of most of this logic for you. Only use the _bucketMutex - * directly for more global maintenance where you want to take the lock once and interact with - * multiple buckets atomically. - */ - mutable StripedMutex _bucketMutex; - - // All buckets currently in the catalog, including buckets which are full but not yet committed. - stdx::unordered_map<OID, std::unique_ptr<Bucket>, OID::Hasher> _allBuckets; + static constexpr std::size_t kNumberOfStripes = 32; + std::array<Stripe, kNumberOfStripes> _stripes; - // The current open bucket for each namespace and metadata pair. - stdx::unordered_map<BucketKey, Bucket*, BucketHasher, BucketEq> _openBuckets; + mutable Mutex _mutex = + MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "BucketCatalog::_mutex"); - // Bucket state - mutable Mutex _statesMutex = - MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "BucketCatalog::_statesMutex"); + // Bucket state for synchronization with direct writes, protected by '_mutex' stdx::unordered_map<OID, BucketState, OID::Hasher> _bucketStates; - // This mutex protects access to _idleBuckets - mutable Mutex _idleMutex = - MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(1), "BucketCatalog::_idleMutex"); - - // Buckets that do not have any writers. - IdleList _idleBuckets; - - /** - * This mutex protects access to the _executionStats map. Once you complete your lookup, you - * can keep the shared_ptr to an individual namespace's stats object and release the lock. The - * object itself is thread-safe (using atomics). - */ - mutable StripedMutex _statsMutex; - - // Per-namespace execution stats. + // Per-namespace execution stats. This map is protected by '_mutex'. Once you complete your + // lookup, you can keep the shared_ptr to an individual namespace's stats object and release the + // lock. The object itself is thread-safe (using atomics). stdx::unordered_map<NamespaceString, std::shared_ptr<ExecutionStats>> _executionStats; - // A placeholder to be returned in case a namespace has no allocated statistics object - static const std::shared_ptr<ExecutionStats> kEmptyStats; - - // Counter for buckets created by the bucket catalog. - uint64_t _bucketNum = 0; - // Approximate memory usage of the bucket catalog. AtomicWord<uint64_t> _memoryUsage; + + class ServerStatus; }; } // namespace mongo diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp index cb1028be245..e83f242df8f 100644 --- a/src/mongo/db/timeseries/bucket_catalog_test.cpp +++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp @@ -235,7 +235,7 @@ void BucketCatalogTest::_testMeasurementSchema( } TEST_F(BucketCatalogTest, InsertIntoSameBucket) { - // The first insert should be able to take commit rights, but batch is still active + // The first insert should be able to take commit rights auto result1 = _bucketCatalog->insert(_opCtx, _ns1, @@ -245,7 +245,6 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucket) { BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); auto batch1 = result1.getValue().batch; ASSERT(batch1->claimCommitRights()); - ASSERT(batch1->active()); // A subsequent insert into the same bucket should land in the same batch, but not be able to // claim commit rights @@ -265,9 +264,8 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucket) { _bucketCatalog->prepareCommit(batch1); - // Still not finished, but no longer active. + // Still not finished. ASSERT(!batch1->finished()); - ASSERT(!batch1->active()); // The batch should contain both documents since they belong in the same bucket and happened // in the same commit epoch. Nothing else has been committed in this bucket yet. @@ -292,9 +290,9 @@ TEST_F(BucketCatalogTest, GetMetadataReturnsEmptyDocOnMissingBucket) { .getValue() .batch; ASSERT(batch->claimCommitRights()); - auto bucketId = batch->bucketId(); + auto bucket = batch->bucket(); _bucketCatalog->abort(batch); - ASSERT_BSONOBJ_EQ(BSONObj(), _bucketCatalog->getMetadata(bucketId)); + ASSERT_BSONOBJ_EQ(BSONObj(), _bucketCatalog->getMetadata(bucket)); } TEST_F(BucketCatalogTest, InsertIntoDifferentBuckets) { @@ -327,10 +325,10 @@ TEST_F(BucketCatalogTest, InsertIntoDifferentBuckets) { // Check metadata in buckets. ASSERT_BSONOBJ_EQ(BSON(_metaField << "123"), - _bucketCatalog->getMetadata(result1.getValue().batch->bucketId())); + _bucketCatalog->getMetadata(result1.getValue().batch->bucket())); ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONObj()), - _bucketCatalog->getMetadata(result2.getValue().batch->bucketId())); - ASSERT(_bucketCatalog->getMetadata(result3.getValue().batch->bucketId()).isEmpty()); + _bucketCatalog->getMetadata(result2.getValue().batch->bucket())); + ASSERT(_bucketCatalog->getMetadata(result3.getValue().batch->bucket()).isEmpty()); // Committing one bucket should only return the one document in that bucket and should not // affect the other bucket. @@ -360,9 +358,9 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketArray) { // Check metadata in buckets. ASSERT_BSONOBJ_EQ(BSON(_metaField << BSON_ARRAY(BSON("a" << 0 << "b" << 1))), - _bucketCatalog->getMetadata(result1.getValue().batch->bucketId())); + _bucketCatalog->getMetadata(result1.getValue().batch->bucket())); ASSERT_BSONOBJ_EQ(BSON(_metaField << BSON_ARRAY(BSON("a" << 0 << "b" << 1))), - _bucketCatalog->getMetadata(result2.getValue().batch->bucketId())); + _bucketCatalog->getMetadata(result2.getValue().batch->bucket())); } TEST_F(BucketCatalogTest, InsertIntoSameBucketObjArray) { @@ -391,11 +389,11 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketObjArray) { ASSERT_BSONOBJ_EQ( BSON(_metaField << BSONObj(BSON( "c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1) << BSON("f" << 1 << "g" << 0))))), - _bucketCatalog->getMetadata(result1.getValue().batch->bucketId())); + _bucketCatalog->getMetadata(result1.getValue().batch->bucket())); ASSERT_BSONOBJ_EQ( BSON(_metaField << BSONObj(BSON( "c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1) << BSON("f" << 1 << "g" << 0))))), - _bucketCatalog->getMetadata(result2.getValue().batch->bucketId())); + _bucketCatalog->getMetadata(result2.getValue().batch->bucket())); } @@ -427,11 +425,11 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketNestedArray) { ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONObj(BSON("c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1) << BSON_ARRAY("123" << "456"))))), - _bucketCatalog->getMetadata(result1.getValue().batch->bucketId())); + _bucketCatalog->getMetadata(result1.getValue().batch->bucket())); ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONObj(BSON("c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1) << BSON_ARRAY("123" << "456"))))), - _bucketCatalog->getMetadata(result2.getValue().batch->bucketId())); + _bucketCatalog->getMetadata(result2.getValue().batch->bucket())); } TEST_F(BucketCatalogTest, InsertNullAndMissingMetaFieldIntoDifferentBuckets) { @@ -455,8 +453,8 @@ TEST_F(BucketCatalogTest, InsertNullAndMissingMetaFieldIntoDifferentBuckets) { // Check metadata in buckets. ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONNULL), - _bucketCatalog->getMetadata(result1.getValue().batch->bucketId())); - ASSERT(_bucketCatalog->getMetadata(result2.getValue().batch->bucketId()).isEmpty()); + _bucketCatalog->getMetadata(result1.getValue().batch->bucket())); + ASSERT(_bucketCatalog->getMetadata(result2.getValue().batch->bucket()).isEmpty()); // Committing one bucket should only return the one document in that bucket and should not // affect the other bucket. @@ -537,18 +535,10 @@ DEATH_TEST_F(BucketCatalogTest, CannotCommitWithoutRights, "invariant") { BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); auto& batch = result.getValue().batch; _bucketCatalog->prepareCommit(batch); -} -DEATH_TEST_F(BucketCatalogTest, CannotFinishUnpreparedBatch, "invariant") { - auto result = _bucketCatalog->insert(_opCtx, - _ns1, - _getCollator(_ns1), - _getTimeseriesOptions(_ns1), - BSON(_timeField << Date_t::now()), - BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); - auto& batch = result.getValue().batch; - ASSERT(batch->claimCommitRights()); - _bucketCatalog->finish(batch, {}); + // BucketCatalog::prepareCommit uses dassert, so it will only invariant in debug mode. Ensure we + // die here in non-debug mode as well. + invariant(kDebugBuild); } TEST_F(BucketCatalogWithoutMetadataTest, GetMetadataReturnsEmptyDoc) { @@ -562,7 +552,7 @@ TEST_F(BucketCatalogWithoutMetadataTest, GetMetadataReturnsEmptyDoc) { .getValue() .batch; - ASSERT_BSONOBJ_EQ(BSONObj(), _bucketCatalog->getMetadata(batch->bucketId())); + ASSERT_BSONOBJ_EQ(BSONObj(), _bucketCatalog->getMetadata(batch->bucket())); _commit(batch, 0); } @@ -577,7 +567,7 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) { BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); ASSERT(result.isOK()); auto batch = result.getValue().batch; - auto oldId = batch->bucketId(); + auto oldId = batch->bucket().id; _commit(batch, 0); ASSERT_EQ(2U, batch->newFieldNamesToBeInserted().size()) << batch->toBSON(); ASSERT(batch->newFieldNamesToBeInserted().count(_timeField)) << batch->toBSON(); @@ -633,7 +623,7 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) { BSON(_timeField << Date_t::now() << "a" << gTimeseriesBucketMaxCount), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); auto& batch2 = result2.getValue().batch; - ASSERT_NE(oldId, batch2->bucketId()); + ASSERT_NE(oldId, batch2->bucket().id); _commit(batch2, 0); ASSERT_EQ(2U, batch2->newFieldNamesToBeInserted().size()) << batch2->toBSON(); ASSERT(batch2->newFieldNamesToBeInserted().count(_timeField)) << batch2->toBSON(); @@ -738,7 +728,7 @@ TEST_F(BucketCatalogTest, ClearBucketWithPreparedBatchThrowsConflict) { ASSERT_EQ(batch->measurements().size(), 1); ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), 0); - ASSERT_THROWS(_bucketCatalog->clear(batch->bucketId()), WriteConflictException); + ASSERT_THROWS(_bucketCatalog->clear(batch->bucket().id), WriteConflictException); _bucketCatalog->abort(batch); ASSERT(batch->finished()); @@ -771,10 +761,10 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) { .getValue() .batch; ASSERT_NE(batch1, batch2); - ASSERT_EQ(batch1->bucketId(), batch2->bucketId()); + ASSERT_EQ(batch1->bucket().id, batch2->bucket().id); // Now clear the bucket. Since there's a prepared batch it should conflict. - ASSERT_THROWS(_bucketCatalog->clear(batch1->bucketId()), WriteConflictException); + ASSERT_THROWS(_bucketCatalog->clear(batch1->bucket().id), WriteConflictException); // Now try to prepare the second batch. Ensure it aborts the batch. ASSERT(batch2->claimCommitRights()); @@ -783,7 +773,7 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) { ASSERT_EQ(batch2->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared); // Make sure we didn't clear the bucket state when we aborted the second batch. - ASSERT_THROWS(_bucketCatalog->clear(batch1->bucketId()), WriteConflictException); + ASSERT_THROWS(_bucketCatalog->clear(batch1->bucket().id), WriteConflictException); // Make sure a subsequent insert, which opens a new bucket, doesn't corrupt the old bucket // state and prevent us from finishing the first batch. @@ -798,7 +788,7 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) { .batch; ASSERT_NE(batch1, batch3); ASSERT_NE(batch2, batch3); - ASSERT_NE(batch1->bucketId(), batch3->bucketId()); + ASSERT_NE(batch1->bucket().id, batch3->bucket().id); // Clean up this batch ASSERT(batch3->claimCommitRights()); _bucketCatalog->abort(batch3); |