/**
 *    Copyright (C) 2023-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#include "mongo/db/timeseries/timeseries_write_util.h"

#include "mongo/db/catalog/collection_write_path.h"
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/curop.h"
#include "mongo/db/ops/write_ops_exec_util.h"
#include "mongo/db/query/collection_query_info.h"
#include "mongo/db/repl/tenant_migration_decoration.h"
#include "mongo/db/timeseries/bucket_catalog/bucket_catalog_helpers.h"
#include "mongo/db/timeseries/bucket_catalog/bucket_catalog_internal.h"
#include "mongo/db/timeseries/timeseries_constants.h"
#include "mongo/db/update/document_diff_applier.h"
#include "mongo/db/update/update_oplog_entry_serialization.h"

namespace mongo::timeseries {
namespace {

// Builds the data field of a bucket document. Computes the min and max fields if necessary.
boost::optional<bucket_catalog::MinMax> processTimeseriesMeasurements(
    const std::vector<BSONObj>& measurements,
    const BSONObj& metadata,
    StringDataMap<BSONObjBuilder>& dataBuilders,
    const boost::optional<TimeseriesOptions>& options = boost::none,
    const boost::optional<const StringData::ComparatorInterface*>& comparator = boost::none) {
    bucket_catalog::MinMax minmax;
    bool computeMinmax = options && comparator;

    auto metadataElem = metadata.firstElement();
    boost::optional<StringData> metaFieldName;
    if (metadataElem) {
        metaFieldName = metadataElem.fieldNameStringData();
    }

    // Each measurement gets a monotonically increasing string key ("0", "1", ...) inside
    // the per-field column builders.
    DecimalCounter<uint32_t> count;
    for (const auto& doc : measurements) {
        if (computeMinmax) {
            minmax.update(doc, metaFieldName, *comparator);
        }
        for (const auto& elem : doc) {
            auto key = elem.fieldNameStringData();
            if (key == metaFieldName) {
                continue;
            }
            dataBuilders[key].appendAs(elem, count);
        }
        ++count;
    }

    // Rounds the minimum timestamp and updates the min time field.
    if (computeMinmax) {
        auto minTime = roundTimestampToGranularity(
            minmax.min().getField(options->getTimeField()).Date(), *options);
        auto controlDoc =
            bucket_catalog::buildControlMinTimestampDoc(options->getTimeField(), minTime);
        minmax.update(controlDoc, /*metaField=*/boost::none, *comparator);
        return minmax;
    }

    return boost::none;
}
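// For example, with metaField "tag" (field names illustrative), the measurements
//     {t: T0, tag: "a", temp: 1}
//     {t: T1, tag: "a", temp: 2}
// are pivoted into one column builder per data field, keyed by measurement index:
//     t:    {"0": T0, "1": T1}
//     temp: {"0": 1,  "1": 2}
// The meta field is skipped here; it is written once at the bucket's top level by
// makeNewDocument() below.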
// Builds a complete and new bucket document.
BSONObj makeNewDocument(const OID& bucketId,
                        const BSONObj& metadata,
                        const BSONObj& min,
                        const BSONObj& max,
                        StringDataMap<BSONObjBuilder>& dataBuilders) {
    auto metadataElem = metadata.firstElement();
    BSONObjBuilder builder;
    builder.append("_id", bucketId);
    {
        BSONObjBuilder bucketControlBuilder(builder.subobjStart("control"));
        bucketControlBuilder.append(kBucketControlVersionFieldName,
                                    kTimeseriesControlUncompressedVersion);
        bucketControlBuilder.append(kBucketControlMinFieldName, min);
        bucketControlBuilder.append(kBucketControlMaxFieldName, max);
    }
    if (metadataElem) {
        builder.appendAs(metadataElem, kBucketMetaFieldName);
    }
    {
        BSONObjBuilder bucketDataBuilder(builder.subobjStart(kBucketDataFieldName));
        for (auto& dataBuilder : dataBuilders) {
            bucketDataBuilder.append(dataBuilder.first, dataBuilder.second.obj());
        }
    }

    return builder.obj();
}

}  // namespace

BSONObj makeNewDocumentForWrite(std::shared_ptr<bucket_catalog::WriteBatch> batch,
                                const BSONObj& metadata) {
    StringDataMap<BSONObjBuilder> dataBuilders;
    processTimeseriesMeasurements(
        {batch->measurements.begin(), batch->measurements.end()}, metadata, dataBuilders);

    return makeNewDocument(
        batch->bucketHandle.bucketId.oid, metadata, batch->min, batch->max, dataBuilders);
}

BSONObj makeNewDocumentForWrite(
    const OID& bucketId,
    const std::vector<BSONObj>& measurements,
    const BSONObj& metadata,
    const boost::optional<TimeseriesOptions>& options,
    const boost::optional<const StringData::ComparatorInterface*>& comparator) {
    StringDataMap<BSONObjBuilder> dataBuilders;
    auto minmax =
        processTimeseriesMeasurements(measurements, metadata, dataBuilders, options, comparator);

    invariant(minmax);

    return makeNewDocument(bucketId, metadata, minmax->min(), minmax->max(), dataBuilders);
}

std::vector<write_ops::InsertCommandRequest> makeInsertsToNewBuckets(
    const std::vector<BSONObj>& measurements,
    const NamespaceString& nss,
    const TimeseriesOptions& options,
    const StringData::ComparatorInterface* comparator) {
    std::vector<write_ops::InsertCommandRequest> insertOps;
    for (const auto& measurement : measurements) {
        auto res = uassertStatusOK(bucket_catalog::internal::extractBucketingParameters(
            nss, comparator, options, measurement));
        auto time = res.second;
        auto [oid, _] = bucket_catalog::internal::generateBucketOID(time, options);
        insertOps.push_back(
            {nss,
             {makeNewDocumentForWrite(
                 oid, {measurement}, res.first.metadata.toBSON(), options, comparator)}});
    }

    return insertOps;
}

stdx::variant<write_ops::UpdateCommandRequest, write_ops::DeleteCommandRequest> makeModificationOp(
    const OID& bucketId, const CollectionPtr& coll, const std::vector<BSONObj>& measurements) {
    // A bucket left with no measurements is deleted outright.
    if (measurements.empty()) {
        write_ops::DeleteOpEntry deleteEntry(BSON("_id" << bucketId), false);
        write_ops::DeleteCommandRequest op(coll->ns(), {deleteEntry});
        return op;
    }
    auto timeseriesOptions = coll->getTimeseriesOptions();
    auto metaFieldName = timeseriesOptions->getMetaField();
    auto metadata = [&] {
        if (!metaFieldName) {
            // Collection has no metadata field.
            return BSONObj();
        }
        // Look for the metadata field on this bucket and return it if present.
        auto metaField = measurements[0].getField(*metaFieldName);
        return metaField ? metaField.wrap() : BSONObj();
    }();
    auto replaceBucket = timeseries::makeNewDocumentForWrite(
        bucketId, measurements, metadata, timeseriesOptions, coll->getDefaultCollator());

    write_ops::UpdateModification u(replaceBucket);
    write_ops::UpdateOpEntry updateEntry(BSON("_id" << bucketId), std::move(u));
    write_ops::UpdateCommandRequest op(coll->ns(), {updateEntry});
    return op;
}
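// A sketch of the two shapes makeModificationOp() can produce (wire forms shown for
// illustration only): deleting every measurement of a bucket yields
//     {delete: <ns>, deletes: [{q: {_id: <bucketId>}, limit: 1}]}
// while a partial modification yields a replacement update whose document is rebuilt by
// makeNewDocumentForWrite() from the surviving measurements:
//     {update: <ns>, updates: [{q: {_id: <bucketId>}, u: <new bucket document>}]}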
void performAtomicWrites(
    OperationContext* opCtx,
    const CollectionPtr& coll,
    const RecordId& recordId,
    const stdx::variant<write_ops::UpdateCommandRequest, write_ops::DeleteCommandRequest>&
        modificationOp,
    const std::vector<write_ops::InsertCommandRequest>& insertOps,
    bool fromMigrate,
    StmtId stmtId) {
    NamespaceString ns = coll->ns();

    DisableDocumentValidation disableDocumentValidation{opCtx};

    write_ops_exec::LastOpFixer lastOpFixer{opCtx};
    lastOpFixer.startingOp(ns);

    auto curOp = CurOp::get(opCtx);
    curOp->raiseDbProfileLevel(CollectionCatalog::get(opCtx)->getDatabaseProfileLevel(ns.dbName()));

    write_ops_exec::assertCanWrite_inlock(opCtx, ns);

    // Groups all operations in one or several chained oplog entries to ensure the writes are
    // replicated atomically.
    // TODO(SERVER-76432): Handle the updateOne case for retryable writes.
    auto groupOplogEntries = !opCtx->getTxnNumber() && !insertOps.empty();
    WriteUnitOfWork wuow{opCtx, groupOplogEntries};

    stdx::visit(
        OverloadedVisitor{
            [&](const write_ops::UpdateCommandRequest& updateOp) {
                invariant(updateOp.getUpdates().size() == 1);
                auto& update = updateOp.getUpdates().front();

                invariant(coll->isClustered());

                auto original = coll->docFor(opCtx, recordId);

                CollectionUpdateArgs args{original.value()};
                args.criteria = update.getQ();
                args.stmtIds = {stmtId};
                if (fromMigrate) {
                    args.source = OperationSource::kFromMigrate;
                }

                BSONObj diffFromUpdate;
                const BSONObj* diffOnIndexes =
                    collection_internal::kUpdateAllIndexes;  // Assume all indexes are affected.

                // Overwrites the original bucket.
                invariant(update.getU().type() ==
                          write_ops::UpdateModification::Type::kReplacement);
                auto updated = update.getU().getUpdateReplacement();
                args.update = update_oplog_entry::makeReplacementOplogEntry(updated);

                collection_internal::updateDocument(opCtx,
                                                    coll,
                                                    recordId,
                                                    original,
                                                    updated,
                                                    diffOnIndexes,
                                                    nullptr /* indexesAffected */,
                                                    &curOp->debug(),
                                                    &args);
            },
            [&](const write_ops::DeleteCommandRequest& deleteOp) {
                invariant(deleteOp.getDeletes().size() == 1);
                auto deleteId = record_id_helpers::keyForOID(
                    deleteOp.getDeletes().front().getQ()["_id"].OID());
                invariant(recordId == deleteId);
                collection_internal::deleteDocument(
                    opCtx, coll, stmtId, recordId, &curOp->debug(), fromMigrate);
            }},
        modificationOp);

    if (!insertOps.empty()) {
        std::vector<InsertStatement> insertStatements;
        for (auto& op : insertOps) {
            invariant(op.getDocuments().size() == 1);
            insertStatements.emplace_back(op.getDocuments().front());
        }
        uassertStatusOK(collection_internal::insertDocuments(
            opCtx, coll, insertStatements.begin(), insertStatements.end(), &curOp->debug()));
    }

    wuow.commit();

    lastOpFixer.finishedOpSuccessfully();
}

}  // namespace mongo::timeseries
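// Caller-side usage sketch (illustrative; 'remainingMeasurements' is an assumption, not a
// name from this file): a time-series deleteOne that strips one measurement from a bucket
// and rewrites the rest atomically could be driven as
//
//     auto modificationOp = makeModificationOp(bucketId, coll, remainingMeasurements);
//     performAtomicWrites(opCtx, coll, recordId, modificationOp, /*insertOps=*/{},
//                         /*fromMigrate=*/false, kUninitializedStmtId);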