diff options
author | Gregory Noma <gregory.noma@gmail.com> | 2020-11-25 16:52:41 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-11-25 23:03:46 +0000 |
commit | 82aea1d428e3a06994d6624464b74b92e47eae2d (patch) | |
tree | a8112379aafd05ba6e581d22d4d5584b0cb7b8e9 /src/mongo/db | |
parent | a194505325087b1e841fdee55c51312a042ce9d2 (diff) | |
download | mongo-82aea1d428e3a06994d6624464b74b92e47eae2d.tar.gz |
SERVER-52523 Implement in-memory time-series bucket catalog
Diffstat (limited to 'src/mongo/db')
23 files changed, 989 insertions, 234 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 3a45337f310..31d9b2a99a4 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -40,6 +40,7 @@ env.SConscript( 'sorter', 'stats', 'storage', + 'timeseries', 'update', 'views', ], diff --git a/src/mongo/db/catalog/SConscript b/src/mongo/db/catalog/SConscript index 2e7152c8a7d..19f23c6310d 100644 --- a/src/mongo/db/catalog/SConscript +++ b/src/mongo/db/catalog/SConscript @@ -438,6 +438,7 @@ env.Library( '$BUILD_DIR/mongo/db/index_builds_coordinator_interface', '$BUILD_DIR/mongo/db/query_exec', '$BUILD_DIR/mongo/db/server_options_core', + '$BUILD_DIR/mongo/db/timeseries/bucket_catalog', '$BUILD_DIR/mongo/db/views/views', '$BUILD_DIR/mongo/db/write_ops', 'collection', diff --git a/src/mongo/db/catalog/create_collection.cpp b/src/mongo/db/catalog/create_collection.cpp index a69f55633b2..ef867c99730 100644 --- a/src/mongo/db/catalog/create_collection.cpp +++ b/src/mongo/db/catalog/create_collection.cpp @@ -206,7 +206,7 @@ Status _createTimeseries(OperationContext* opCtx, // If the buckets collection and time-series view creation roll back, ensure that their Top // entries are deleted. 
opCtx->recoveryUnit()->onRollback( - [serviceContext = opCtx->getServiceContext(), &ns, &bucketsNs]() { + [serviceContext = opCtx->getServiceContext(), ns, bucketsNs]() { Top::get(serviceContext).collectionDropped(ns); Top::get(serviceContext).collectionDropped(bucketsNs); }); diff --git a/src/mongo/db/catalog/database_impl.cpp b/src/mongo/db/catalog/database_impl.cpp index 8167dd2c1a9..6ebf6dc207b 100644 --- a/src/mongo/db/catalog/database_impl.cpp +++ b/src/mongo/db/catalog/database_impl.cpp @@ -619,7 +619,7 @@ Status DatabaseImpl::createView(OperationContext* opCtx, str::stream() << "invalid namespace name for a view: " + viewName.toString()}; } else { status = ViewCatalog::get(this)->createView( - opCtx, viewName, viewOnNss, pipeline, options.collation); + opCtx, viewName, viewOnNss, pipeline, options.collation, options.timeseries); } audit::logCreateView(&cc(), viewName.toString(), viewOnNss.toString(), pipeline, status.code()); diff --git a/src/mongo/db/catalog/drop_collection.cpp b/src/mongo/db/catalog/drop_collection.cpp index 2642b81bc50..21264f369cf 100644 --- a/src/mongo/db/catalog/drop_collection.cpp +++ b/src/mongo/db/catalog/drop_collection.cpp @@ -44,6 +44,7 @@ #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/server_options.h" #include "mongo/db/service_context.h" +#include "mongo/db/timeseries/bucket_catalog.h" #include "mongo/db/views/view_catalog.h" #include "mongo/logv2/log.h" #include "mongo/util/fail_point.h" @@ -70,7 +71,8 @@ Status _checkNssAndReplState(OperationContext* opCtx, const CollectionPtr& coll) Status _dropView(OperationContext* opCtx, Database* db, const NamespaceString& collectionName, - BSONObjBuilder* result) { + BSONObjBuilder* result, + bool clearBucketCatalog = false) { if (!db) { return Status(ErrorCodes::NamespaceNotFound, "ns not found"); } @@ -114,6 +116,10 @@ Status _dropView(OperationContext* opCtx, } wunit.commit(); + if (clearBucketCatalog) { + BucketCatalog::get(opCtx).clear(collectionName); + } + 
result->append("ns", collectionName.ns()); return Status::OK(); } @@ -322,7 +328,7 @@ Status dropCollection(OperationContext* opCtx, return Status(ErrorCodes::NamespaceNotFound, "ns not found"); } - if (!view->isTimeseries()) { + if (!view->timeseries()) { return _dropView(opCtx, db, collectionName, &result); } @@ -331,15 +337,14 @@ Status dropCollection(OperationContext* opCtx, std::move(autoDb), view->viewOn(), [opCtx, &collectionName, &result](Database* db, const NamespaceString& bucketsNs) { - WriteUnitOfWork wuow(opCtx); - auto status = _dropView(opCtx, db, collectionName, &result); + auto status = _dropView( + opCtx, db, collectionName, &result, true /* clearBucketCatalog */); if (!status.isOK()) { return status; } - wuow.commit(); - // Drop the buckets collection in its own writeConflictRetry so that - // if it throws a WCE, only the buckets collection drop is retried. + // Drop the buckets collection in its own writeConflictRetry so that if it + // throws a WCE, only the buckets collection drop is retried. 
writeConflictRetry(opCtx, "drop", bucketsNs.ns(), [opCtx, db, &bucketsNs] { WriteUnitOfWork wuow(opCtx); db->dropCollectionEvenIfSystem(opCtx, bucketsNs).ignore(); diff --git a/src/mongo/db/catalog/drop_database.cpp b/src/mongo/db/catalog/drop_database.cpp index 3a177f3f907..cbb84773fd6 100644 --- a/src/mongo/db/catalog/drop_database.cpp +++ b/src/mongo/db/catalog/drop_database.cpp @@ -45,6 +45,7 @@ #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/service_context.h" +#include "mongo/db/timeseries/bucket_catalog.h" #include "mongo/db/write_concern_options.h" #include "mongo/logv2/log.h" #include "mongo/util/duration.h" @@ -114,6 +115,8 @@ void _finishDropDatabase(OperationContext* opCtx, databaseHolder->dropDb(opCtx, db); dropPendingGuard.dismiss(); + BucketCatalog::get(opCtx).clear(dbName); + LOGV2(20336, "dropDatabase {dbName} - finished, dropped {numCollections} collection(s)", "dropDatabase", diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index 5f61a1dc421..c00b230299b 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -270,6 +270,9 @@ env.Library( 'create.idl', 'create_command_validation.cpp', ], + LIBDEPS=[ + '$BUILD_DIR/mongo/db/timeseries/timeseries_idl', + ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/idl/idl_parser', @@ -337,6 +340,7 @@ env.Library( '$BUILD_DIR/mongo/db/stats/server_read_concern_write_concern_metrics', '$BUILD_DIR/mongo/db/storage/storage_engine_common', "$BUILD_DIR/mongo/db/storage/two_phase_index_build_knobs_idl", + '$BUILD_DIR/mongo/db/timeseries/bucket_catalog', '$BUILD_DIR/mongo/db/transaction', '$BUILD_DIR/mongo/db/views/views_mongod', '$BUILD_DIR/mongo/idl/feature_flag', diff --git a/src/mongo/db/commands/create.idl b/src/mongo/db/commands/create.idl index 5c4fdfef520..8515c2cf023 100644 --- a/src/mongo/db/commands/create.idl +++ b/src/mongo/db/commands/create.idl @@ -33,6 +33,7 @@ 
global: imports: - "mongo/idl/basic_types.idl" + - "mongo/db/timeseries/timeseries.idl" structs: IndexOptionDefaults: @@ -46,27 +47,6 @@ structs: validator: callback: create_command_validation::validateStorageEngineOptions - TimeseriesOptions: - description: "The options that define a time-series collection." - strict: true - fields: - timeField: - description: "The name of the field to be used for time. Inserted documents must - have this field, and the field must be of the BSON UTC datetime type - (0x9)" - type: string - metaField: - description: "The name of the field describing the series. This field is used to - group related data and may be of any BSON type. This may not be - \"_id\" or the same as 'timeField'." - type: string - optional: true - expireAfterSeconds: - description: "The number of seconds after which old time-series data should be - deleted." - type: long - optional: true - commands: create: description: "Parser for the 'create' Command" diff --git a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp index bca6a26e20b..92eb7526073 100644 --- a/src/mongo/db/commands/set_feature_compatibility_version_command.cpp +++ b/src/mongo/db/commands/set_feature_compatibility_version_command.cpp @@ -264,7 +264,6 @@ public: } else { // Time-series collections are only supported in 5.0. If the user tries to downgrade the // cluster to an earlier version, they must first remove all time-series collections. - // TODO(SERVER-52523): Use the bucket catalog to detect time-series collections. for (const auto& dbName : DatabaseHolder::get(opCtx)->getNames()) { auto viewCatalog = DatabaseHolder::get(opCtx)->getSharedViewCatalog(opCtx, dbName); if (!viewCatalog) { @@ -277,7 +276,7 @@ public: "collections present; drop all time-series collections before " "downgrading. 
First detected time-series collection: " << view.name(), - !view.isTimeseries()); + !view.timeseries()); }); } diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp index 8b53fe99b84..a17dd90b683 100644 --- a/src/mongo/db/commands/write_commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands/write_commands.cpp @@ -59,6 +59,7 @@ #include "mongo/db/repl/tenant_migration_conflict_info.h" #include "mongo/db/stats/counters.h" #include "mongo/db/storage/duplicate_key_error_info.h" +#include "mongo/db/timeseries/bucket_catalog.h" #include "mongo/db/views/view_catalog.h" #include "mongo/db/write_concern.h" #include "mongo/s/stale_exception.h" @@ -106,81 +107,80 @@ bool isTimeseries(OperationContext* opCtx, const NamespaceString& ns) { return false; } - return view->isTimeseries(); + return view->timeseries().has_value(); } // Default for control.version in time-series bucket collection. const int kTimeseriesControlVersion = 1; -const int kTimeseriesBucketMaxCount = 1000; -const int kTimeseriesBucketMaxSizeKB = 125; -const Hours kTimeseriesBucketMaxTimeRange(1); /** * Returns min/max $set expressions for the bucket's control field. */ -BSONObj makeTimeseriesControlMinMaxStages(const BSONObj& doc) { +BSONObj makeTimeseriesControlMinMaxStages(const std::vector<BSONObj>& docs) { + struct MinMaxBuilders { + BSONArrayBuilder min; + BSONArrayBuilder max; + }; + StringDataMap<MinMaxBuilders> minMaxBuilders; + + for (const auto& doc : docs) { + for (const auto& elem : doc) { + auto key = elem.fieldNameStringData(); + auto [it, created] = minMaxBuilders.insert({key, MinMaxBuilders{}}); + if (created) { + it->second.min.append("$control.min." + key); + it->second.max.append("$control.max." 
+ key); + } + it->second.min.append(elem); + it->second.max.append(elem); + } + } + BSONObjBuilder builder; - for (const auto& elem : doc) { - auto key = elem.fieldNameStringData(); - builder.append("control.min." + key, - BSON("$min" << BSON_ARRAY(("$control.min." + key) << elem))); - builder.append("control.max." + key, - BSON("$max" << BSON_ARRAY(("$control.max." + key) << elem))); + for (auto& builders : minMaxBuilders) { + builder.append("control.min." + builders.first, BSON("$min" << builders.second.min.arr())); + builder.append("control.max." + builders.first, BSON("$max" << builders.second.max.arr())); } + return builder.obj(); } /** * Returns $set expressions for the bucket's data field. */ -BSONObj makeTimeseriesDataStages(const BSONObj& doc) { +BSONObj makeTimeseriesDataStages(const std::vector<BSONObj>& docs, uint16_t count) { + StringDataMap<BSONArrayBuilder> measurements; + for (const auto& doc : docs) { + for (const auto& elem : doc) { + auto key = elem.fieldNameStringData(); + measurements[key].append( + BSON("k" << std::to_string(count) << elem.wrap("v").firstElement())); + } + count++; + } + BSONObjBuilder builder; - for (const auto& elem : doc) { - auto key = elem.fieldNameStringData(); + for (auto& field : measurements) { builder.append( - "data." + key, + "data." + field.first, BSON("$arrayToObject" << BSON( "$setUnion" << BSON_ARRAY( - BSON("$objectToArray" - << BSON("$ifNull" << BSON_ARRAY(("$data." + key) << BSONObj()))) - << BSON_ARRAY(BSON( - "k" << BSON("$toString" - << BSON("$ifNull" << BSON_ARRAY("$control.count" << 0))) - << elem.wrap("v").firstElement())))))); + BSON("$objectToArray" << BSON( + "$ifNull" << BSON_ARRAY(("$data." + field.first) << BSONObj()))) + << field.second.arr())))); } + return builder.obj(); } /** * Transforms a single time-series insert to an upsert request. 
*/ -BSONObj makeTimeseriesUpsertRequest(const BSONObj& doc) { +BSONObj makeTimeseriesUpsertRequest(const OID& oid, + const std::vector<BSONObj>& docs, + uint16_t count) { BSONObjBuilder builder; - // TODO(SERVER-52523): Obtain _id of bucket to update and name of time field from in-memory - // catalog. - const auto timeField = "time"_sd; - { - BSONObjBuilder queryBuilder(builder.subobjStart(write_ops::UpdateOpEntry::kQFieldName)); - BSONArrayBuilder andBuilder(queryBuilder.subarrayStart("$and")); - // Each bucket can hold up to 'kTimeseriesBucketMaxCount' measurements. - andBuilder.append(BSON(std::string(str::stream() << "data." << timeField << "." - << (kTimeseriesBucketMaxCount - 1)) - << BSON("$exists" << false))); - // The total size of measurements in a bucket cannot exceed 'kTimeseriesBucketMaxSizeKB'. - // Ideally, we would use the following expression to avoid relying on 'control.size': - // {$expr: {$lte: [{$bsonSize: '$data'}, (bucketMaxSizeKB * 1024 - doc.objsize())]}} - // but $expr is not allowed in an upsert. See SERVER-30731. - andBuilder.append(BSON( - "control.size" << BSON("$lte" << (kTimeseriesBucketMaxSizeKB * 1024 - doc.objsize())))); - // The maximum time-range of a bucket is limited, so index scans looking for buckets - // containing a time T only need to consider buckets that are newer than - // T - 'kTimeseriesBucketMaxTimeRange'. - auto docTime = doc[timeField].Date(); - const std::string minTimeFieldName = str::stream() << "control.min." 
<< timeField; - andBuilder.append(BSON(minTimeFieldName << BSON("$lte" << docTime))); - andBuilder.append( - BSON(minTimeFieldName << BSON("$gte" << (docTime - kTimeseriesBucketMaxTimeRange)))); - } + builder.append(write_ops::UpdateOpEntry::kQFieldName, BSON("_id" << oid)); builder.append(write_ops::UpdateOpEntry::kMultiFieldName, false); builder.append(write_ops::UpdateOpEntry::kUpsertFieldName, true); { @@ -190,23 +190,96 @@ BSONObj makeTimeseriesUpsertRequest(const BSONObj& doc) { BSON("$set" << BSON("control.version" << BSON("$ifNull" << BSON_ARRAY("$control.version" << kTimeseriesControlVersion))))); - stagesBuilder.append(BSON( - "$set" << BSON("control.size" - << BSON("$sum" - << BSON_ARRAY(BSON("$ifNull" << BSON_ARRAY("$control.size" << 0)) - << doc.objsize()))))); - stagesBuilder.append(BSON("$set" << makeTimeseriesControlMinMaxStages(doc))); - stagesBuilder.append(BSON("$set" << makeTimeseriesDataStages(doc))); - // Update 'control.count' last because it is referenced in preceding $set stages in this - // aggregation pipeline. 
- stagesBuilder.append(BSON( - "$set" << BSON("control.count" << BSON( - "$sum" << BSON_ARRAY( - BSON("$ifNull" << BSON_ARRAY("$control.count" << 0)) << 1))))); + stagesBuilder.append(BSON("$set" << makeTimeseriesControlMinMaxStages(docs))); + stagesBuilder.append(BSON("$set" << makeTimeseriesDataStages(docs, count))); } return builder.obj(); } +void appendOpTime(const repl::OpTime& opTime, BSONObjBuilder* out) { + if (opTime.getTerm() == repl::OpTime::kUninitializedTerm) { + out->append("opTime", opTime.getTimestamp()); + } else { + opTime.append(out, "opTime"); + } +} + +boost::optional<BSONObj> generateError(OperationContext* opCtx, + const StatusWith<SingleWriteResult>& result, + int index, + size_t numErrors) { + auto status = result.getStatus(); + if (status.isOK()) { + return boost::none; + } + + auto errorMessage = [numErrors, errorSize = size_t(0)](StringData rawMessage) mutable { + // Start truncating error messages once both of these limits are exceeded. + constexpr size_t kErrorSizeTruncationMin = 1024 * 1024; + constexpr size_t kErrorCountTruncationMin = 2; + if (errorSize >= kErrorSizeTruncationMin && numErrors >= kErrorCountTruncationMin) { + return ""_sd; + } + + errorSize += rawMessage.size(); + return rawMessage; + }; + + BSONSizeTracker errorsSizeTracker; + BSONObjBuilder error(errorsSizeTracker); + error.append("index", index); + if (auto staleInfo = status.extraInfo<StaleConfigInfo>()) { + error.append("code", int(ErrorCodes::StaleShardVersion)); // Different from exception! 
+ { + BSONObjBuilder errInfo(error.subobjStart("errInfo")); + staleInfo->serialize(&errInfo); + } + } else if (ErrorCodes::DocumentValidationFailure == status.code() && status.extraInfo()) { + auto docValidationError = + status.extraInfo<doc_validation_error::DocumentValidationFailureInfo>(); + error.append("code", static_cast<int>(ErrorCodes::DocumentValidationFailure)); + error.append("errInfo", docValidationError->getDetails()); + } else if (ErrorCodes::isTenantMigrationError(status.code())) { + if (ErrorCodes::TenantMigrationConflict == status.code()) { + auto migrationConflictInfo = status.extraInfo<TenantMigrationConflictInfo>(); + + hangWriteBeforeWaitingForMigrationDecision.pauseWhileSet(opCtx); + + auto mtab = migrationConflictInfo->getTenantMigrationAccessBlocker(); + + auto migrationStatus = mtab->waitUntilCommittedOrAborted(opCtx); + error.append("code", static_cast<int>(migrationStatus.code())); + + // We want to append an empty errmsg for the errors after the first one, so let the + // code below that appends errmsg do that. + if (status.reason() != "") { + error.append("errmsg", errorMessage(migrationStatus.reason())); + } + if (migrationStatus.extraInfo()) { + error.append("errInfo", + migrationStatus.extraInfo<TenantMigrationCommittedInfo>()->toBSON()); + } + } else { + error.append("code", int(status.code())); + if (status.extraInfo()) { + error.append("errInfo", status.extraInfo<TenantMigrationCommittedInfo>()->toBSON()); + } + } + } else { + error.append("code", int(status.code())); + if (auto const extraInfo = status.extraInfo()) { + extraInfo->serialize(&error); + } + } + + // Skip appending errmsg if it has already been appended like in the case of + // TenantMigrationConflict. + if (!error.hasField("errmsg")) { + error.append("errmsg", errorMessage(status.reason())); + } + return error.obj(); +} + enum class ReplyStyle { kUpdate, kNotUpdate }; // update has extra fields. 
void serializeReply(OperationContext* opCtx, ReplyStyle replyStyle, @@ -238,91 +311,24 @@ void serializeReply(OperationContext* opCtx, std::vector<BSONObj> upsertInfo; std::vector<BSONObj> errors; BSONSizeTracker upsertInfoSizeTracker; - BSONSizeTracker errorsSizeTracker; - - auto errorMessage = [&, errorSize = size_t(0)](StringData rawMessage) mutable { - // Start truncating error messages once both of these limits are exceeded. - constexpr size_t kErrorSizeTruncationMin = 1024 * 1024; - constexpr size_t kErrorCountTruncationMin = 2; - if (errorSize >= kErrorSizeTruncationMin && errors.size() >= kErrorCountTruncationMin) { - return ""_sd; - } - - errorSize += rawMessage.size(); - return rawMessage; - }; for (size_t i = 0; i < result.results.size(); i++) { - if (result.results[i].isOK()) { - const auto& opResult = result.results[i].getValue(); - nVal += opResult.getN(); // Always there. - if (replyStyle == ReplyStyle::kUpdate) { - nModified += opResult.getNModified(); - if (auto idElement = opResult.getUpsertedId().firstElement()) { - BSONObjBuilder upsertedId(upsertInfoSizeTracker); - upsertedId.append("index", int(i)); - upsertedId.appendAs(idElement, "_id"); - upsertInfo.push_back(upsertedId.obj()); - } - } + if (auto error = generateError(opCtx, result.results[i], i, errors.size())) { + errors.push_back(*error); continue; } - const auto& status = result.results[i].getStatus(); - BSONObjBuilder error(errorsSizeTracker); - error.append("index", int(i)); - if (auto staleInfo = status.extraInfo<StaleConfigInfo>()) { - error.append("code", int(ErrorCodes::StaleShardVersion)); // Different from exception! 
- { - BSONObjBuilder errInfo(error.subobjStart("errInfo")); - staleInfo->serialize(&errInfo); - } - } else if (ErrorCodes::DocumentValidationFailure == status.code() && status.extraInfo()) { - auto docValidationError = - status.extraInfo<doc_validation_error::DocumentValidationFailureInfo>(); - error.append("code", static_cast<int>(ErrorCodes::DocumentValidationFailure)); - error.append("errInfo", docValidationError->getDetails()); - } else if (ErrorCodes::isTenantMigrationError(status.code())) { - if (ErrorCodes::TenantMigrationConflict == status.code()) { - auto migrationConflictInfo = status.extraInfo<TenantMigrationConflictInfo>(); - - hangWriteBeforeWaitingForMigrationDecision.pauseWhileSet(opCtx); - - auto mtab = migrationConflictInfo->getTenantMigrationAccessBlocker(); - - auto migrationStatus = mtab->waitUntilCommittedOrAborted(opCtx); - error.append("code", static_cast<int>(migrationStatus.code())); - - // We want to append an empty errmsg for the errors after the first one, so let the - // code below that appends errmsg do that. - if (status.reason() != "") { - error.append("errmsg", errorMessage(migrationStatus.reason())); - } - if (migrationStatus.extraInfo()) { - error.append( - "errInfo", - migrationStatus.extraInfo<TenantMigrationCommittedInfo>()->toBSON()); - } - } else { - error.append("code", int(status.code())); - if (status.extraInfo()) { - error.append("errInfo", - status.extraInfo<TenantMigrationCommittedInfo>()->toBSON()); - } + const auto& opResult = result.results[i].getValue(); + nVal += opResult.getN(); // Always there. 
+ if (replyStyle == ReplyStyle::kUpdate) { + nModified += opResult.getNModified(); + if (auto idElement = opResult.getUpsertedId().firstElement()) { + BSONObjBuilder upsertedId(upsertInfoSizeTracker); + upsertedId.append("index", int(i)); + upsertedId.appendAs(idElement, "_id"); + upsertInfo.push_back(upsertedId.obj()); } - } else { - error.append("code", int(status.code())); - if (auto const extraInfo = status.extraInfo()) { - extraInfo->serialize(&error); - } - } - - // Skip appending errmsg if it has already been appended like in the case of - // TenantMigrationConflict. - if (!error.hasField("errmsg")) { - error.append("errmsg", errorMessage(status.reason())); } - errors.push_back(error.obj()); } out->appendNumber("n", nVal); @@ -345,12 +351,7 @@ void serializeReply(OperationContext* opCtx, auto* replCoord = repl::ReplicationCoordinator::get(opCtx->getServiceContext()); const auto replMode = replCoord->getReplicationMode(); if (replMode != repl::ReplicationCoordinator::modeNone) { - const auto lastOp = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); - if (lastOp.getTerm() == repl::OpTime::kUninitializedTerm) { - out->append("opTime", lastOp.getTimestamp()); - } else { - lastOp.append(out, "opTime"); - } + appendOpTime(repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(), out); if (replMode == repl::ReplicationCoordinator::modeReplSet) { out->append("electionId", replCoord->getElectionId()); @@ -468,39 +469,102 @@ private: /** * Writes to the underlying system.buckets collection. 
*/ - void _performTimeseriesWrites(OperationContext* opCtx, BSONObjBuilder& result) const { + void _performTimeseriesWrites(OperationContext* opCtx, BSONObjBuilder* result) const { auto ns = _batch.getNamespace(); auto bucketsNs = ns.makeTimeseriesBucketsNamespace(); - BSONObjBuilder builder; - builder.append(write_ops::Update::kCommandName, bucketsNs.coll()); - // The schema validation configured in the bucket collection is intended for direct - // operations by end users and is not applicable here. - builder.append(write_ops::Update::kBypassDocumentValidationFieldName, true); - builder.append(write_ops::Update::kOrderedFieldName, _batch.getOrdered()); - if (auto stmtId = _batch.getStmtId()) { - builder.append(write_ops::Update::kStmtIdFieldName, *stmtId); - } else if (auto stmtIds = _batch.getStmtIds()) { - builder.append(write_ops::Update::kStmtIdsFieldName, *stmtIds); + auto& bucketCatalog = BucketCatalog::get(opCtx); + std::vector<std::pair<OID, size_t>> bucketsToCommit; + std::vector<std::pair<Future<BucketCatalog::CommitInfo>, size_t>> bucketsToWaitOn; + for (size_t i = 0; i < _batch.getDocuments().size(); i++) { + auto [bucketId, commitInfo] = + bucketCatalog.insert(opCtx, ns, _batch.getDocuments()[i]); + if (commitInfo) { + bucketsToWaitOn.push_back({std::move(*commitInfo), i}); + } else { + bucketsToCommit.push_back({std::move(bucketId), i}); + } } - { - BSONArrayBuilder updatesBuilder( - builder.subarrayStart(write_ops::Update::kUpdatesFieldName)); - for (const auto& doc : _batch.getDocuments()) { - updatesBuilder.append(makeTimeseriesUpsertRequest(doc)); + + std::vector<BSONObj> errors; + boost::optional<repl::OpTime> opTime; + boost::optional<OID> electionId; + + for (const auto& [bucketId, index] : bucketsToCommit) { + auto data = bucketCatalog.commit(bucketId); + while (!data.docs.empty()) { + BSONObjBuilder builder; + builder.append(write_ops::Update::kCommandName, bucketsNs.coll()); + // The schema validation configured in the bucket collection is 
intended for + // direct operations by end users and is not applicable here. + builder.append(write_ops::Update::kBypassDocumentValidationFieldName, true); + builder.append(write_ops::Update::kOrderedFieldName, _batch.getOrdered()); + if (auto stmtId = _batch.getStmtId()) { + builder.append(write_ops::Update::kStmtIdFieldName, *stmtId); + } else if (auto stmtIds = _batch.getStmtIds()) { + builder.append(write_ops::Update::kStmtIdsFieldName, *stmtIds); + } + + { + BSONArrayBuilder updatesBuilder( + builder.subarrayStart(write_ops::Update::kUpdatesFieldName)); + updatesBuilder.append(makeTimeseriesUpsertRequest( + bucketId, data.docs, data.numCommittedMeasurements)); + } + + auto request = OpMsgRequest::fromDBAndBody(bucketsNs.db(), builder.obj()); + auto timeseriesUpsertBatch = UpdateOp::parse(request); + auto reply = write_ops_exec::performUpdates(opCtx, timeseriesUpsertBatch); + + invariant(reply.results.size() == 1, + str::stream() + << "Unexpected number of results (" << reply.results.size() + << ") for insert on time-series collection " << ns); + + if (auto error = generateError(opCtx, reply.results[0], index, errors.size())) { + errors.push_back(*error); + } + + auto* replCoord = repl::ReplicationCoordinator::get(opCtx->getServiceContext()); + const auto replMode = replCoord->getReplicationMode(); + + opTime = replMode != repl::ReplicationCoordinator::modeNone + ? boost::make_optional( + repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp()) + : boost::none; + electionId = replMode == repl::ReplicationCoordinator::modeReplSet + ? 
boost::make_optional(replCoord->getElectionId()) + : boost::none; + + data = bucketCatalog.commit( + bucketId, + BucketCatalog::CommitInfo{std::move(reply.results[0]), opTime, electionId}); } } - auto request = OpMsgRequest::fromDBAndBody(bucketsNs.db(), builder.obj()); - auto timeseriesUpsertBatch = UpdateOp::parse(request); + for (const auto& [future, index] : bucketsToWaitOn) { + auto commitInfo = future.get(opCtx); + if (auto error = generateError(opCtx, commitInfo.result, index, errors.size())) { + errors.push_back(*error); + } + if (commitInfo.opTime) { + opTime = std::max(opTime.value_or(repl::OpTime()), *commitInfo.opTime); + } + if (commitInfo.electionId) { + electionId = std::max(electionId.value_or(OID()), *commitInfo.electionId); + } + } - auto reply = write_ops_exec::performUpdates(opCtx, timeseriesUpsertBatch); - serializeReply(opCtx, - ReplyStyle::kUpdate, - !timeseriesUpsertBatch.getWriteCommandBase().getOrdered(), - timeseriesUpsertBatch.getUpdates().size(), - std::move(reply), - &result); + result->appendNumber("n", _batch.getDocuments().size() - errors.size()); + if (!errors.empty()) { + result->append("writeErrors", errors); + } + if (opTime) { + appendOpTime(*opTime, result); + } + if (electionId) { + result->append("electionId", *electionId); + } } void runImpl(OperationContext* opCtx, BSONObjBuilder& result) const override { @@ -508,7 +572,7 @@ private: // Re-throw parsing exceptions to be consistent with CmdInsert::Invocation's // constructor. 
try { - _performTimeseriesWrites(opCtx, result); + _performTimeseriesWrites(opCtx, &result); } catch (DBException& ex) { ex.addContext(str::stream() << "time-series insert failed: " << ns().ns()); throw; diff --git a/src/mongo/db/timeseries/SConscript b/src/mongo/db/timeseries/SConscript new file mode 100644 index 00000000000..dc89606b46b --- /dev/null +++ b/src/mongo/db/timeseries/SConscript @@ -0,0 +1,38 @@ +# -*- mode: python -*- + +Import("env") + +env = env.Clone() + +env.Library( + target='timeseries_idl', + source=[ + 'timeseries.idl', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/idl/idl_parser', + ], +) + +env.Library( + target='bucket_catalog', + source=[ + 'bucket_catalog.cpp', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/catalog/database_holder', + '$BUILD_DIR/mongo/db/views/views', + 'timeseries_idl', + ], +) + +env.CppUnitTest( + target='bucket_catalog_test', + source=[ + 'bucket_catalog_test.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/db/catalog/catalog_test_fixture', + 'bucket_catalog', + ], +) diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp new file mode 100644 index 00000000000..5fde11b5772 --- /dev/null +++ b/src/mongo/db/timeseries/bucket_catalog.cpp @@ -0,0 +1,223 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/timeseries/bucket_catalog.h" + +#include "mongo/db/catalog/database_holder.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/views/view_catalog.h" + +namespace mongo { +namespace { +const auto getBucketCatalog = ServiceContext::declareDecoration<BucketCatalog>(); + +const int kTimeseriesBucketMaxCount = 1000; +const int kTimeseriesBucketMaxSizeBytes = 125 * 1024; // 125 KB +const Hours kTimeseriesBucketMaxTimeRange(1); +} // namespace + +BucketCatalog& BucketCatalog::get(ServiceContext* svcCtx) { + return getBucketCatalog(svcCtx); +} + +BucketCatalog& BucketCatalog::get(OperationContext* opCtx) { + return get(opCtx->getServiceContext()); +} + +BucketCatalog::InsertResult BucketCatalog::insert(OperationContext* opCtx, + const NamespaceString& ns, + const BSONObj& doc) { + stdx::lock_guard lk(_mutex); + + auto viewCatalog = DatabaseHolder::get(opCtx)->getSharedViewCatalog(opCtx, ns.db()); + invariant(viewCatalog); + auto viewDef = viewCatalog->lookup(opCtx, ns.ns()); + invariant(viewDef); + const auto& options = *viewDef->timeseries(); + + BSONObjBuilder metadata; + if (auto metaField = options.getMetaField()) 
{ + if (auto elem = doc[*metaField]) { + metadata.appendAs(elem, *metaField); + } else { + metadata.appendNull(*metaField); + } + } + auto key = std::make_pair(ns, BucketMetadata{metadata.obj()}); + + auto time = doc[options.getTimeField()].Date(); + auto setBucketTime = [time = durationCount<Seconds>(time.toDurationSinceEpoch())]( + OID* bucketId) { bucketId->setTimestamp(time); }; + + auto it = _bucketIds.find(key); + if (it == _bucketIds.end()) { + // A bucket for this namespace and metadata pair does not yet exist. + it = _bucketIds.insert({std::move(key), OID::gen()}).first; + setBucketTime(&it->second); + _orderedBuckets.insert({ns, it->first.second, it->second}); + } + + _idleBuckets.erase(it->second); + auto bucket = &_buckets[it->second]; + + StringSet newFieldNamesToBeInserted; + uint32_t sizeToBeAdded = 0; + for (const auto& elem : doc) { + if (options.getMetaField() && elem.fieldNameStringData() == *options.getMetaField()) { + // Ignore the metadata field since it will not be inserted. + continue; + } + + // If the field name is new, add the size of an empty object with that field name. + if (!bucket->fieldNames.contains(elem.fieldName())) { + newFieldNamesToBeInserted.insert(elem.fieldName()); + sizeToBeAdded += BSON(elem.fieldName() << BSONObj()).objsize(); + } + + // Add the element size, taking into account that the name will be changed to its positional + // number. Add 1 to the calculation since the element's field name size accounts for a null + // terminator whereas the stringified position does not. + sizeToBeAdded += + elem.size() - elem.fieldNameSize() + std::to_string(bucket->numMeasurements).size() + 1; + } + + auto bucketTime = it->second.asDateT(); + if (bucket->numMeasurements == kTimeseriesBucketMaxCount || + bucket->size + sizeToBeAdded > kTimeseriesBucketMaxSizeBytes || + time - bucketTime >= kTimeseriesBucketMaxTimeRange || time < bucketTime) { + // The bucket is full, so create a new one. 
+ bucket->full = true; + it->second = OID::gen(); + setBucketTime(&it->second); + _orderedBuckets.insert({ns, it->first.second, it->second}); + bucket = &_buckets[it->second]; + } + + bucket->numWriters++; + bucket->numMeasurements++; + bucket->size += sizeToBeAdded; + bucket->measurementsToBeInserted.push_back(doc); + bucket->newFieldNamesToBeInserted.merge(newFieldNamesToBeInserted); + if (bucket->ns.isEmpty()) { + // The namespace and metadata only need to be set if this bucket was newly created. + bucket->ns = ns; + bucket->metadata = it->first.second; + } + + // If there is exactly 1 uncommitted measurement, the caller is the committer. Otherwise, it is + // a waiter. + boost::optional<Future<CommitInfo>> commitInfoFuture; + if (bucket->numMeasurements - bucket->numCommittedMeasurements > 1) { + auto [promise, future] = makePromiseFuture<CommitInfo>(); + bucket->promises[bucket->numMeasurements - 1] = std::move(promise); + commitInfoFuture = std::move(future); + } + + return {it->second, std::move(commitInfoFuture)}; +} + +BucketCatalog::CommitData BucketCatalog::commit(const OID& bucketId, + boost::optional<CommitInfo> previousCommitInfo) { + stdx::lock_guard lk(_mutex); + auto it = _buckets.find(bucketId); + auto& bucket = it->second; + + // The only case in which previousCommitInfo should not be provided is the first time a given + // committer calls this function. + invariant(!previousCommitInfo || bucket.numCommittedMeasurements != 0 || + bucket.numPendingCommitMeasurements != 0); + + bucket.fieldNames.merge(bucket.newFieldNamesToBeInserted); + bucket.newFieldNamesToBeInserted.clear(); + + std::vector<BSONObj> measurements; + bucket.measurementsToBeInserted.swap(measurements); + + // Inform waiters that their measurements have been committed. 
+ for (uint32_t i = 0; i < bucket.numPendingCommitMeasurements; i++) { + auto it = bucket.promises.find(i + bucket.numCommittedMeasurements); + if (it != bucket.promises.end()) { + it->second.emplaceValue(*previousCommitInfo); + bucket.promises.erase(it); + } + } + + bucket.numWriters -= bucket.numPendingCommitMeasurements; + auto numCommittedMeasurements = bucket.numCommittedMeasurements += + std::exchange(bucket.numPendingCommitMeasurements, measurements.size()); + + if (measurements.empty()) { + if (bucket.full) { + // Everything in the bucket has been committed, and nothing more will be added since the + // bucket is full. Thus, we can remove it. + _orderedBuckets.erase( + {std::move(it->second.ns), std::move(it->second.metadata), bucketId}); + _buckets.erase(it); + } else if (--bucket.numWriters == 0) { + _idleBuckets.insert(bucketId); + } + } + + return {std::move(measurements), numCommittedMeasurements}; +} + +void BucketCatalog::clear(const NamespaceString& ns) { + stdx::lock_guard lk(_mutex); + + auto shouldClear = [&ns](const NamespaceString& bucketNs) { + return ns.coll().empty() ? ns.db() == bucketNs.db() : ns == bucketNs; + }; + + for (auto it = _orderedBuckets.lower_bound({ns, {}, {}}); + it != _orderedBuckets.end() && shouldClear(std::get<NamespaceString>(*it));) { + auto& bucketId = std::get<OID>(*it); + _buckets.erase(bucketId); + _idleBuckets.erase(bucketId); + _bucketIds.erase({std::get<NamespaceString>(*it), std::get<BucketMetadata>(*it)}); + it = _orderedBuckets.erase(it); + } +} + +void BucketCatalog::clear(StringData dbName) { + clear(NamespaceString(dbName, "")); +} + +bool BucketCatalog::BucketMetadata::operator<(const BucketMetadata& other) const { + auto size = metadata.objsize(); + auto otherSize = other.metadata.objsize(); + auto cmp = std::memcmp(metadata.objdata(), other.metadata.objdata(), std::min(size, otherSize)); + return cmp == 0 && size != otherSize ? 
size < otherSize : cmp < 0; +} + +bool BucketCatalog::BucketMetadata::operator==(const BucketMetadata& other) const { + return metadata.binaryEqual(other.metadata); +} +} // namespace mongo diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h new file mode 100644 index 00000000000..c8aedce23a2 --- /dev/null +++ b/src/mongo/db/timeseries/bucket_catalog.h @@ -0,0 +1,160 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#pragma once + +#include "mongo/db/ops/single_write_result_gen.h" +#include "mongo/db/service_context.h" +#include "mongo/db/timeseries/timeseries_gen.h" +#include "mongo/util/string_map.h" + +namespace mongo { +class BucketCatalog { +public: + struct CommitInfo { + StatusWith<SingleWriteResult> result; + boost::optional<repl::OpTime> opTime; + boost::optional<OID> electionId; + }; + + struct InsertResult { + OID bucketId; + boost::optional<Future<CommitInfo>> commitInfo; + }; + + struct CommitData { + std::vector<BSONObj> docs; + uint16_t numCommittedMeasurements; + }; + + static BucketCatalog& get(ServiceContext* svcCtx); + static BucketCatalog& get(OperationContext* opCtx); + + BucketCatalog() = default; + + BucketCatalog(const BucketCatalog&) = delete; + BucketCatalog operator=(const BucketCatalog&) = delete; + + /** + * Returns the id of the bucket that the document belongs in, and a Future to wait on if the + * caller is a waiter for the bucket. If no Future is provided, the caller is the committer for + * this bucket. + */ + InsertResult insert(OperationContext* opCtx, const NamespaceString& ns, const BSONObj& doc); + + /** + * Returns the uncommitted measurements and the number of measurements that have already been + * committed for the given bucket. This should be called continuously by the committer until + * there are no more uncommitted measurements. + */ + CommitData commit(const OID& bucketId, + boost::optional<CommitInfo> previousCommitInfo = boost::none); + + /** + * Clears the buckets for the given namespace. + */ + void clear(const NamespaceString& ns); + + /** + * Clears the buckets for the given database. 
+ */ + void clear(StringData dbName); + +private: + struct BucketMetadata { + bool operator<(const BucketMetadata& other) const; + bool operator==(const BucketMetadata& other) const; + + template <typename H> + friend H AbslHashValue(H h, const BucketMetadata& metadata) { + // TODO (SERVER-52967): Hash the metadata in a way that does not depend on its ordering. + SimpleBSONObjComparator::Hasher hasher; + return H::combine(std::move(h), hasher(metadata.metadata)); + } + + BSONObj metadata; + }; + + struct Bucket { + // The namespace that this bucket is used for. + NamespaceString ns; + + // The metadata of the data that this bucket contains. + BucketMetadata metadata; + + // Measurements to be inserted into the bucket. + std::vector<BSONObj> measurementsToBeInserted; + + // New top-level field names of the measurements to be inserted. + StringSet newFieldNamesToBeInserted; + + // Top-level field names of the measurements that have been inserted into the bucket. + StringSet fieldNames; + + // The total size in bytes of the bucket's BSON serialization, including measurements to be + // inserted. + uint32_t size = 0; + + // The total number of measurements in the bucket, including uncommitted measurements and + // measurements to be inserted. + uint16_t numMeasurements = 0; + + // The number of measurements that were most recently returned from a call to commit(). + uint16_t numPendingCommitMeasurements = 0; + + // The number of committed measurements in the bucket. + uint16_t numCommittedMeasurements = 0; + + // The number of current writers for the bucket. + uint32_t numWriters = 0; + + // Promises for committers to fulfill in order to signal to waiters that their measurements + // have been committed. + stdx::unordered_map<uint16_t, Promise<CommitInfo>> promises; + + // Whether the bucket is full. This can be due to number of measurements, size, or time + // range. 
+ bool full = false; + }; + + Mutex _mutex = MONGO_MAKE_LATCH("BucketCatalog"); + + // All buckets currently in the catalog, including buckets which are full but not yet committed. + stdx::unordered_map<OID, Bucket, OID::Hasher> _buckets; + + // The _id of the current bucket for each namespace and metadata pair. + stdx::unordered_map<std::pair<NamespaceString, BucketMetadata>, OID> _bucketIds; + + // All namespace, metadata, and _id tuples which currently have a bucket in the catalog. + std::set<std::tuple<NamespaceString, BucketMetadata, OID>> _orderedBuckets; + + // Buckets that do not have any writers. + std::set<OID> _idleBuckets; +}; +} // namespace mongo diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp new file mode 100644 index 00000000000..469c874f14f --- /dev/null +++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp @@ -0,0 +1,171 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. 
You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/catalog/catalog_test_fixture.h" +#include "mongo/db/catalog/create_collection.h" +#include "mongo/db/timeseries/bucket_catalog.h" +#include "mongo/db/views/view_catalog.h" +#include "mongo/unittest/death_test.h" + +namespace mongo { +namespace { +class BucketCatalogTest : public CatalogTestFixture { +protected: + void setUp() override; + + void _commit(const OID& bucketId, uint16_t numCommittedMeasurements); + void _insertOneAndCommit(const NamespaceString& ns, uint16_t numCommittedMeasurements); + + OperationContext* _opCtx; + BucketCatalog* _bucketCatalog; + + StringData _timeField = "time"; + StringData _metaField = "meta"; + + NamespaceString _ns1{"bucket_catalog_test_1", "t_1"}; + NamespaceString _ns2{"bucket_catalog_test_1", "t_2"}; + NamespaceString _ns3{"bucket_catalog_test_2", "t_1"}; + + BucketCatalog::CommitInfo _commitInfo{StatusWith<SingleWriteResult>(SingleWriteResult{})}; +}; + +void BucketCatalogTest::setUp() { + CatalogTestFixture::setUp(); + + _opCtx = operationContext(); + _bucketCatalog = &BucketCatalog::get(_opCtx); + + for (const auto& ns : {_ns1, _ns2, _ns3}) { + ASSERT_OK(createCollection( + _opCtx, + ns.db().toString(), + BSON("create" << ns.coll() << "timeseries" + << BSON("timeField" << _timeField << "metaField" << _metaField)))); + } +} + +void BucketCatalogTest::_commit(const OID& bucketId, uint16_t numCommittedMeasurements) { + auto data = _bucketCatalog->commit(bucketId); + 
ASSERT_EQ(data.docs.size(), 1); + ASSERT_EQ(data.numCommittedMeasurements, numCommittedMeasurements); + + data = _bucketCatalog->commit(bucketId, _commitInfo); + ASSERT_EQ(data.docs.size(), 0); + ASSERT_EQ(data.numCommittedMeasurements, numCommittedMeasurements + 1); +} + +void BucketCatalogTest::_insertOneAndCommit(const NamespaceString& ns, + uint16_t numCommittedMeasurements) { + auto [bucketId, commitInfo] = + _bucketCatalog->insert(_opCtx, ns, BSON(_timeField << Date_t::now())); + ASSERT(!commitInfo); + + _commit(bucketId, numCommittedMeasurements); +} + +TEST_F(BucketCatalogTest, InsertIntoSameBucket) { + // The first insert should be the committer. + auto result1 = _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now())); + ASSERT(!result1.commitInfo); + + // A subsequent insert into the same bucket should be a waiter. + auto result2 = _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now())); + ASSERT(result2.commitInfo); + ASSERT(!result2.commitInfo->isReady()); + + // Committing should return both documents since they belong in the same bucket. + auto data = _bucketCatalog->commit(result1.bucketId); + ASSERT_EQ(data.docs.size(), 2); + ASSERT_EQ(data.numCommittedMeasurements, 0); + ASSERT(!result2.commitInfo->isReady()); + + // Once the commit has occurred, the waiter should be notified. + data = _bucketCatalog->commit(result1.bucketId, _commitInfo); + ASSERT_EQ(data.docs.size(), 0); + ASSERT_EQ(data.numCommittedMeasurements, 2); + ASSERT(result2.commitInfo->isReady()); +} + +TEST_F(BucketCatalogTest, InsertInfoDifferentBuckets) { + // The first insert should be the committer. + auto result1 = _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now())); + ASSERT(!result1.commitInfo); + + // Subsequent inserts into different buckets should also be committers. 
+ auto result2 = _bucketCatalog->insert( + _opCtx, _ns1, BSON(_timeField << Date_t::now() << _metaField << BSONObj())); + ASSERT(!result2.commitInfo); + + auto result3 = _bucketCatalog->insert(_opCtx, _ns2, BSON(_timeField << Date_t::now())); + ASSERT(!result3.commitInfo); + + // Committing one bucket should only return the one document in that bucket and should not + affect the other bucket. + for (const auto& bucketId : {result1.bucketId, result2.bucketId, result3.bucketId}) { + _commit(bucketId, 0); + } +} + +TEST_F(BucketCatalogTest, NumCommittedMeasurementsAccumulates) { + // The numCommittedMeasurements returned when committing should accumulate as more entries in + // the bucket are committed. + _insertOneAndCommit(_ns1, 0); + _insertOneAndCommit(_ns1, 1); +} + +TEST_F(BucketCatalogTest, ClearNamespaceBuckets) { + _insertOneAndCommit(_ns1, 0); + _insertOneAndCommit(_ns2, 0); + + _bucketCatalog->clear(_ns1); + + _insertOneAndCommit(_ns1, 0); + _insertOneAndCommit(_ns2, 1); +} + +TEST_F(BucketCatalogTest, ClearDatabaseBuckets) { + _insertOneAndCommit(_ns1, 0); + _insertOneAndCommit(_ns2, 0); + _insertOneAndCommit(_ns3, 0); + + _bucketCatalog->clear(_ns1.db()); + + _insertOneAndCommit(_ns1, 0); + _insertOneAndCommit(_ns2, 0); + _insertOneAndCommit(_ns3, 1); +} + +DEATH_TEST_F(BucketCatalogTest, CannotProvideCommitInfoOnFirstCommit, "invariant") { + auto [bucketId, _] = _bucketCatalog->insert(_opCtx, _ns1, BSON(_timeField << Date_t::now())); + _bucketCatalog->commit(bucketId, _commitInfo); +} +} // namespace +} // namespace mongo diff --git a/src/mongo/db/timeseries/timeseries.idl b/src/mongo/db/timeseries/timeseries.idl new file mode 100644 index 00000000000..c2ecaf9227b --- /dev/null +++ b/src/mongo/db/timeseries/timeseries.idl @@ -0,0 +1,54 @@ +# Copyright (C) 2020-present MongoDB, Inc.
+# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# <http://www.mongodb.com/licensing/server-side-public-license>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. + +global: + cpp_namespace: "mongo" + +imports: + - "mongo/idl/basic_types.idl" + +structs: + TimeseriesOptions: + description: "The options that define a time-series collection." + strict: true + fields: + timeField: + description: "The name of the field to be used for time. Inserted documents must + have this field, and the field must be of the BSON UTC datetime type + (0x9)" + type: string + metaField: + description: "The name of the field describing the series. This field is used to + group related data and may be of any BSON type. This may not be + \"_id\" or the same as 'timeField'." 
+ type: string + optional: true + expireAfterSeconds: + description: "The number of seconds after which old time-series data should be + deleted." + type: long + optional: true diff --git a/src/mongo/db/views/durable_view_catalog.cpp b/src/mongo/db/views/durable_view_catalog.cpp index 1031f87682f..d6770ca7a99 100644 --- a/src/mongo/db/views/durable_view_catalog.cpp +++ b/src/mongo/db/views/durable_view_catalog.cpp @@ -142,7 +142,8 @@ BSONObj DurableViewCatalogImpl::_validateViewDefinition(OperationContext* opCtx, for (const BSONElement& e : viewDefinition) { std::string name(e.fieldName()); - valid &= name == "_id" || name == "viewOn" || name == "pipeline" || name == "collation"; + valid &= name == "_id" || name == "viewOn" || name == "pipeline" || name == "collation" || + name == "timeseries"; } const auto viewName = viewDefinition["_id"].str(); @@ -168,6 +169,9 @@ BSONObj DurableViewCatalogImpl::_validateViewDefinition(OperationContext* opCtx, valid &= (!viewDefinition.hasField("collation") || viewDefinition["collation"].type() == BSONType::Object); + valid &= !viewDefinition.hasField("timeseries") || + viewDefinition["timeseries"].type() == BSONType::Object; + uassert(ErrorCodes::InvalidViewDefinition, str::stream() << "found invalid view definition " << viewDefinition["_id"] << " while reading '" << _db->getSystemViewsName() << "'", diff --git a/src/mongo/db/views/view.cpp b/src/mongo/db/views/view.cpp index e5812dedf19..1f2f6d797bd 100644 --- a/src/mongo/db/views/view.cpp +++ b/src/mongo/db/views/view.cpp @@ -41,8 +41,12 @@ ViewDefinition::ViewDefinition(StringData dbName, StringData viewName, StringData viewOnName, const BSONObj& pipeline, - std::unique_ptr<CollatorInterface> collator) - : _viewNss(dbName, viewName), _viewOnNss(dbName, viewOnName), _collator(std::move(collator)) { + std::unique_ptr<CollatorInterface> collator, + const boost::optional<TimeseriesOptions>& timeseries) + : _viewNss(dbName, viewName), + _viewOnNss(dbName, viewOnName), + 
_collator(std::move(collator)), + _timeseries(timeseries) { for (BSONElement e : pipeline) { _pipeline.push_back(e.Obj().getOwned()); } @@ -52,22 +56,19 @@ ViewDefinition::ViewDefinition(const ViewDefinition& other) : _viewNss(other._viewNss), _viewOnNss(other._viewOnNss), _collator(CollatorInterface::cloneCollator(other._collator.get())), - _pipeline(other._pipeline) {} + _pipeline(other._pipeline), + _timeseries(other._timeseries) {} ViewDefinition& ViewDefinition::operator=(const ViewDefinition& other) { _viewNss = other._viewNss; _viewOnNss = other._viewOnNss; _collator = CollatorInterface::cloneCollator(other._collator.get()); _pipeline = other._pipeline; + _timeseries = other._timeseries; return *this; } -bool ViewDefinition::isTimeseries() const { - auto bucketsNs = _viewNss.makeTimeseriesBucketsNamespace(); - return bucketsNs == _viewOnNss; -} - void ViewDefinition::setViewOn(const NamespaceString& viewOnNss) { invariant(_viewNss.db() == viewOnNss.db()); _viewOnNss = viewOnNss; diff --git a/src/mongo/db/views/view.h b/src/mongo/db/views/view.h index 715399ef746..07edd41fddc 100644 --- a/src/mongo/db/views/view.h +++ b/src/mongo/db/views/view.h @@ -36,6 +36,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/db/namespace_string.h" #include "mongo/db/query/collation/collator_interface.h" +#include "mongo/db/timeseries/timeseries_gen.h" namespace mongo { @@ -52,7 +53,8 @@ public: StringData viewName, StringData viewOnName, const BSONObj& pipeline, - std::unique_ptr<CollatorInterface> collation); + std::unique_ptr<CollatorInterface> collation, + const boost::optional<TimeseriesOptions>& timeseries); /** * Copying a view 'other' clones its collator and does a simple copy of all other fields. @@ -91,9 +93,11 @@ public: } /** - * Returns true if this view represents a time-series collection. + * Returns the time-series options for the view, or boost::none if not a time-series view. 
*/ - bool isTimeseries() const; + const boost::optional<TimeseriesOptions>& timeseries() const { + return _timeseries; + } void setViewOn(const NamespaceString& viewOnNss); @@ -107,5 +111,6 @@ private: NamespaceString _viewOnNss; std::unique_ptr<CollatorInterface> _collator; std::vector<BSONObj> _pipeline; + boost::optional<TimeseriesOptions> _timeseries; }; } // namespace mongo diff --git a/src/mongo/db/views/view_catalog.cpp b/src/mongo/db/views/view_catalog.cpp index c606eede46a..a20cec26e77 100644 --- a/src/mongo/db/views/view_catalog.cpp +++ b/src/mongo/db/views/view_catalog.cpp @@ -122,11 +122,22 @@ Status ViewCatalog::_reload(WithLock, } } + boost::optional<TimeseriesOptions> timeseries; + if (view.hasField("timeseries")) { + try { + timeseries = + TimeseriesOptions::parse({"ViewCatalog::_reload"}, view["timeseries"].Obj()); + } catch (const DBException& ex) { + return ex.toStatus(); + } + } + _viewMap[viewName.ns()] = std::make_shared<ViewDefinition>(viewName.db(), viewName.coll(), view["viewOn"].str(), pipeline, - std::move(collator.getValue())); + std::move(collator.getValue()), + timeseries); return Status::OK(); }; @@ -185,7 +196,8 @@ Status ViewCatalog::_createOrUpdateView(WithLock lk, const NamespaceString& viewName, const NamespaceString& viewOn, const BSONArray& pipeline, - std::unique_ptr<CollatorInterface> collator) { + std::unique_ptr<CollatorInterface> collator, + const boost::optional<TimeseriesOptions>& timeseries) { invariant(opCtx->lockState()->isDbLockedForMode(viewName.db(), MODE_IX)); invariant(opCtx->lockState()->isCollectionLockedForMode(viewName, MODE_IX)); invariant(opCtx->lockState()->isCollectionLockedForMode( @@ -206,10 +218,17 @@ Status ViewCatalog::_createOrUpdateView(WithLock lk, if (collator) { viewDefBuilder.append("collation", collator->getSpec().toBSON()); } + if (timeseries) { + viewDefBuilder.append("timeseries", timeseries->toBSON()); + } BSONObj ownedPipeline = pipeline.getOwned(); - auto view = 
std::make_shared<ViewDefinition>( - viewName.db(), viewName.coll(), viewOn.coll(), ownedPipeline, std::move(collator)); + auto view = std::make_shared<ViewDefinition>(viewName.db(), + viewName.coll(), + viewOn.coll(), + ownedPipeline, + std::move(collator), + timeseries); // Check that the resulting dependency graph is acyclic and within the maximum depth. Status graphStatus = _upsertIntoGraph(lk, opCtx, *(view.get())); @@ -403,7 +422,8 @@ Status ViewCatalog::createView(OperationContext* opCtx, const NamespaceString& viewName, const NamespaceString& viewOn, const BSONArray& pipeline, - const BSONObj& collation) { + const BSONObj& collation, + const boost::optional<TimeseriesOptions>& timeseries) { invariant(opCtx->lockState()->isDbLockedForMode(viewName.db(), MODE_IX)); invariant(opCtx->lockState()->isCollectionLockedForMode(viewName, MODE_IX)); invariant(opCtx->lockState()->isCollectionLockedForMode( @@ -428,7 +448,7 @@ Status ViewCatalog::createView(OperationContext* opCtx, return collator.getStatus(); return _createOrUpdateView( - lk, opCtx, viewName, viewOn, pipeline, std::move(collator.getValue())); + lk, opCtx, viewName, viewOn, pipeline, std::move(collator.getValue()), timeseries); } Status ViewCatalog::modifyView(OperationContext* opCtx, diff --git a/src/mongo/db/views/view_catalog.h b/src/mongo/db/views/view_catalog.h index ecfff9de990..46a6735cf9c 100644 --- a/src/mongo/db/views/view_catalog.h +++ b/src/mongo/db/views/view_catalog.h @@ -102,7 +102,8 @@ public: const NamespaceString& viewName, const NamespaceString& viewOn, const BSONArray& pipeline, - const BSONObj& collation); + const BSONObj& collation, + const boost::optional<TimeseriesOptions>& timeseries); /** * Drop the view named 'viewName'. 
@@ -168,7 +169,8 @@ private: const NamespaceString& viewName, const NamespaceString& viewOn, const BSONArray& pipeline, - std::unique_ptr<CollatorInterface> collator); + std::unique_ptr<CollatorInterface> collator, + const boost::optional<TimeseriesOptions>& timeseries = boost::none); /** * Parses the view definition pipeline, attempts to upsert into the view graph, and refreshes * the graph if necessary. Returns an error status if the resulting graph would be invalid. diff --git a/src/mongo/db/views/view_catalog_test.cpp b/src/mongo/db/views/view_catalog_test.cpp index c2d7367db3e..30d1b396444 100644 --- a/src/mongo/db/views/view_catalog_test.cpp +++ b/src/mongo/db/views/view_catalog_test.cpp @@ -123,7 +123,8 @@ public: MODE_X); WriteUnitOfWork wuow(opCtx); - Status s = _viewCatalog->createView(opCtx, viewName, viewOn, pipeline, collation); + Status s = + _viewCatalog->createView(opCtx, viewName, viewOn, pipeline, collation, boost::none); wuow.commit(); return s; @@ -530,7 +531,7 @@ TEST_F(ViewCatalogFixture, LookupRIDExistingViewRollback) { WriteUnitOfWork wunit(operationContext()); ASSERT_OK(getViewCatalog()->createView( - operationContext(), viewName, viewOn, emptyPipeline, emptyCollation)); + operationContext(), viewName, viewOn, emptyPipeline, emptyCollation, boost::none)); } auto resourceID = ResourceId(RESOURCE_COLLECTION, "db.view"_sd); auto collectionCatalog = CollectionCatalog::get(operationContext()); diff --git a/src/mongo/db/views/view_definition_test.cpp b/src/mongo/db/views/view_definition_test.cpp index 6b2aa57cba9..7ea401caee4 100644 --- a/src/mongo/db/views/view_definition_test.cpp +++ b/src/mongo/db/views/view_definition_test.cpp @@ -47,10 +47,11 @@ namespace { const NamespaceString viewNss("testdb.testview"); const NamespaceString backingNss("testdb.testcoll"); const BSONObj samplePipeline = BSON_ARRAY(BSON("limit" << 9)); +const TimeseriesOptions timeseries("time"); TEST(ViewDefinitionTest, 
ViewDefinitionCreationCorrectlyBuildsNamespaceStrings) { ViewDefinition viewDef( - viewNss.db(), viewNss.coll(), backingNss.coll(), samplePipeline, nullptr); + viewNss.db(), viewNss.coll(), backingNss.coll(), samplePipeline, nullptr, boost::none); ASSERT_EQ(viewDef.name(), viewNss); ASSERT_EQ(viewDef.viewOn(), backingNss); } @@ -58,8 +59,12 @@ TEST(ViewDefinitionTest, ViewDefinitionCreationCorrectlyBuildsNamespaceStrings) TEST(ViewDefinitionTest, CopyConstructorProperlyClonesAllFields) { auto collator = std::make_unique<CollatorInterfaceMock>(CollatorInterfaceMock::MockType::kReverseString); - ViewDefinition originalView( - viewNss.db(), viewNss.coll(), backingNss.coll(), samplePipeline, std::move(collator)); + ViewDefinition originalView(viewNss.db(), + viewNss.coll(), + backingNss.coll(), + samplePipeline, + std::move(collator), + timeseries); ViewDefinition copiedView(originalView); ASSERT_EQ(originalView.name(), copiedView.name()); @@ -70,13 +75,18 @@ TEST(ViewDefinitionTest, CopyConstructorProperlyClonesAllFields) { SimpleBSONObjComparator::kInstance.makeEqualTo())); ASSERT(CollatorInterface::collatorsMatch(originalView.defaultCollator(), copiedView.defaultCollator())); + ASSERT(originalView.timeseries()->toBSON().binaryEqual(copiedView.timeseries()->toBSON())); } TEST(ViewDefinitionTest, CopyAssignmentOperatorProperlyClonesAllFields) { auto collator = std::make_unique<CollatorInterfaceMock>(CollatorInterfaceMock::MockType::kReverseString); - ViewDefinition originalView( - viewNss.db(), viewNss.coll(), backingNss.coll(), samplePipeline, std::move(collator)); + ViewDefinition originalView(viewNss.db(), + viewNss.coll(), + backingNss.coll(), + samplePipeline, + std::move(collator), + timeseries); ViewDefinition copiedView = originalView; ASSERT_EQ(originalView.name(), copiedView.name()); @@ -87,20 +97,21 @@ TEST(ViewDefinitionTest, CopyAssignmentOperatorProperlyClonesAllFields) { SimpleBSONObjComparator::kInstance.makeEqualTo())); 
ASSERT(CollatorInterface::collatorsMatch(originalView.defaultCollator(), copiedView.defaultCollator())); + ASSERT(originalView.timeseries()->toBSON().binaryEqual(copiedView.timeseries()->toBSON())); } DEATH_TEST_REGEX(ViewDefinitionTest, SetViewOnFailsIfNewViewOnNotInSameDatabaseAsView, R"#(Invariant failure.*_viewNss.db\(\) == viewOnNss.db\(\))#") { ViewDefinition viewDef( - viewNss.db(), viewNss.coll(), backingNss.coll(), samplePipeline, nullptr); + viewNss.db(), viewNss.coll(), backingNss.coll(), samplePipeline, nullptr, boost::none); NamespaceString badViewOn("someOtherDb.someOtherCollection"); viewDef.setViewOn(badViewOn); } TEST(ViewDefinitionTest, SetViewOnSucceedsIfNewViewOnIsInSameDatabaseAsView) { ViewDefinition viewDef( - viewNss.db(), viewNss.coll(), backingNss.coll(), samplePipeline, nullptr); + viewNss.db(), viewNss.coll(), backingNss.coll(), samplePipeline, nullptr, boost::none); ASSERT_EQ(viewDef.viewOn(), backingNss); NamespaceString newViewOn("testdb.othercollection"); @@ -112,7 +123,7 @@ DEATH_TEST_REGEX(ViewDefinitionTest, SetPiplineFailsIfPipelineTypeIsNotArray, R"#(Invariant failure.*pipeline.type\(\) == Array)#") { ViewDefinition viewDef( - viewNss.db(), viewNss.coll(), backingNss.coll(), samplePipeline, nullptr); + viewNss.db(), viewNss.coll(), backingNss.coll(), samplePipeline, nullptr, boost::none); // We'll pass in a BSONElement that could be a valid array, but is BSONType::Object rather than // BSONType::Array. 
@@ -127,7 +138,8 @@ DEATH_TEST_REGEX(ViewDefinitionTest, } TEST(ViewDefinitionTest, SetPipelineSucceedsOnValidArrayBSONElement) { - ViewDefinition viewDef(viewNss.db(), viewNss.coll(), backingNss.coll(), BSONObj(), nullptr); + ViewDefinition viewDef( + viewNss.db(), viewNss.coll(), backingNss.coll(), BSONObj(), nullptr, boost::none); ASSERT(viewDef.pipeline().empty()); BSONObj matchStage = BSON("match" << BSON("x" << 9)); @@ -142,5 +154,12 @@ TEST(ViewDefinitionTest, SetPipelineSucceedsOnValidArrayBSONElement) { viewDef.pipeline().begin(), SimpleBSONObjComparator::kInstance.makeEqualTo())); } + +TEST(ViewDefinitionTest, ViewDefinitionCreationCorrectlySetsTimeseries) { + ViewDefinition viewDef( + viewNss.db(), viewNss.coll(), backingNss.coll(), samplePipeline, nullptr, timeseries); + ASSERT(viewDef.timeseries()); + ASSERT_EQ(viewDef.timeseries()->getTimeField(), "time"); +} } // namespace } // namespace mongo diff --git a/src/mongo/db/views/view_graph_test.cpp b/src/mongo/db/views/view_graph_test.cpp index 5ace80ef15a..0dbdab653d4 100644 --- a/src/mongo/db/views/view_graph_test.cpp +++ b/src/mongo/db/views/view_graph_test.cpp @@ -83,7 +83,7 @@ public: collator = std::move(factoryCollator.getValue()); } - return {db, view, viewOn, pipeline, std::move(collator)}; + return {db, view, viewOn, pipeline, std::move(collator), boost::none}; } private: |