Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/db/commands/write_commands.cpp            | 62
-rw-r--r--  src/mongo/db/timeseries/SConscript                   |  1
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog.cpp           |  4
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog.h             |  4
-rw-r--r--  src/mongo/db/timeseries/bucket_compression.cpp       | 56
-rw-r--r--  src/mongo/db/timeseries/bucket_compression.h         |  3
-rw-r--r--  src/mongo/db/timeseries/bucket_compression_test.cpp  | 97
7 files changed, 211 insertions, 16 deletions
diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp
index 27c55c8aed2..1fc9d0d0519 100644
--- a/src/mongo/db/commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands.cpp
@@ -225,11 +225,11 @@ write_ops::UpdateOpEntry makeTimeseriesUpdateOpEntry(
 /**
  * Transforms a single time-series insert to an update request on an existing bucket.
  */
-write_ops::UpdateOpEntry makeTimeseriesCompressionOpEntry(
+write_ops::UpdateOpEntry makeTimeseriesTransformationOpEntry(
     OperationContext* opCtx,
     const OID& bucketId,
-    write_ops::UpdateModification::TransformFunc compressionFunc) {
-    write_ops::UpdateModification u(std::move(compressionFunc));
+    write_ops::UpdateModification::TransformFunc transformationFunc) {
+    write_ops::UpdateModification u(std::move(transformationFunc));
     write_ops::UpdateOpEntry update(BSON("_id" << bucketId), std::move(u));
     invariant(!update.getMulti(), bucketId.toString());
     invariant(!update.getUpsert(), bucketId.toString());
@@ -649,13 +649,14 @@ public:
         return op;
     }
 
-    write_ops::UpdateCommandRequest _makeTimeseriesCompressionOp(
+    write_ops::UpdateCommandRequest _makeTimeseriesTransformationOp(
         OperationContext* opCtx,
         const OID& bucketId,
-        write_ops::UpdateModification::TransformFunc compressionFunc) const {
+        write_ops::UpdateModification::TransformFunc transformationFunc) const {
         write_ops::UpdateCommandRequest op(
             makeTimeseriesBucketsNamespace(ns()),
-            {makeTimeseriesCompressionOpEntry(opCtx, bucketId, std::move(compressionFunc))});
+            {makeTimeseriesTransformationOpEntry(
+                opCtx, bucketId, std::move(transformationFunc))});
 
         write_ops::WriteCommandRequestBase base;
         // The schema validation configured in the bucket collection is intended for direct
@@ -694,15 +695,13 @@ public:
         OperationContext* opCtx,
         std::shared_ptr<BucketCatalog::WriteBatch> batch,
         const BSONObj& metadata,
-        std::vector<StmtId>&& stmtIds) const {
+        const write_ops::UpdateCommandRequest& op) const {
         if (auto status = checkFailUnorderedTimeseriesInsertFailPoint(metadata)) {
             return {status->first, status->second};
         }
 
-        return _getTimeseriesSingleWriteResult(write_ops_exec::performUpdates(
-            opCtx,
-            _makeTimeseriesUpdateOp(opCtx, batch, metadata, std::move(stmtIds)),
-            OperationSource::kTimeseriesInsert));
+        return _getTimeseriesSingleWriteResult(
+            write_ops_exec::performUpdates(opCtx, op, OperationSource::kTimeseriesInsert));
     }
 
     TimeseriesSingleWriteResult _performTimeseriesBucketCompression(
@@ -753,8 +752,8 @@ public:
             return compressed.compressedBucket;
         };
 
-        auto compressionOp =
-            _makeTimeseriesCompressionOp(opCtx, closedBucket.bucketId, bucketCompressionFunc);
+        auto compressionOp = _makeTimeseriesTransformationOp(
+            opCtx, closedBucket.bucketId, bucketCompressionFunc);
         auto result = _getTimeseriesSingleWriteResult(
             write_ops_exec::performUpdates(opCtx, compressionOp, OperationSource::kStandard));
 
@@ -775,6 +774,18 @@ public:
         return result;
     }
 
+    write_ops::UpdateCommandRequest _makeTimeseriesDecompressionOp(
+        OperationContext* opCtx,
+        const std::shared_ptr<BucketCatalog::WriteBatch>& batch) const {
+        auto bucketDecompressionFunc =
+            [&](const BSONObj& bucketDoc) -> boost::optional<BSONObj> {
+            return timeseries::decompressBucket(bucketDoc);
+        };
+
+        return _makeTimeseriesTransformationOp(
+            opCtx, batch->bucket().id, bucketDecompressionFunc);
+    }
+
     /**
      * Returns whether the request can continue.
      */
@@ -816,8 +827,26 @@ public:
                              << "Expected 1 insertion of document with _id '" << docId
                              << "', but found " << output.result.getValue().getN() << ".");
        } else {
-            const auto output =
-                _performTimeseriesUpdate(opCtx, batch, metadata, std::move(stmtIds));
+            if (batch->needToDecompressBucketBeforeInserting()) {
+                const auto output = _performTimeseriesUpdate(
+                    opCtx, batch, metadata, _makeTimeseriesDecompressionOp(opCtx, batch));
+                if (auto error =
+                        generateError(opCtx, output.result, start + index, errors->size())) {
+                    errors->emplace_back(std::move(*error));
+                    bucketCatalog.abort(batch, output.result.getStatus());
+                    return output.canContinue;
+                }
+                invariant(output.result.getValue().getNModified() == 1,
+                          str::stream() << "Expected 1 update of document with _id '" << docId
+                                        << "', but found "
+                                        << output.result.getValue().getNModified() << ".");
+            }
+
+            const auto output = _performTimeseriesUpdate(
+                opCtx,
+                batch,
+                metadata,
+                _makeTimeseriesUpdateOp(opCtx, batch, metadata, std::move(stmtIds)));
             if (auto error =
                     generateError(opCtx, output.result, start + index, errors->size())) {
                 errors->emplace_back(std::move(*error));
@@ -909,6 +938,9 @@ public:
                insertOps.push_back(_makeTimeseriesInsertOp(
                    batch, metadata, std::move(stmtIds[batch.get()->bucket().id])));
            } else {
+                if (batch.get()->needToDecompressBucketBeforeInserting()) {
+                    updateOps.push_back(_makeTimeseriesDecompressionOp(opCtx, batch));
+                }
                updateOps.push_back(_makeTimeseriesUpdateOp(
                    opCtx, batch, metadata, std::move(stmtIds[batch.get()->bucket().id])));
            }
diff --git a/src/mongo/db/timeseries/SConscript b/src/mongo/db/timeseries/SConscript
index 414f0841a70..80704b8ea45 100644
--- a/src/mongo/db/timeseries/SConscript
+++ b/src/mongo/db/timeseries/SConscript
@@ -136,6 +136,7 @@ env.CppUnitTest(
         'bucket_catalog_helpers_test.cpp',
         'bucket_catalog_state_manager_test.cpp',
         'bucket_catalog_test.cpp',
+        'bucket_compression_test.cpp',
         'minmax_test.cpp',
         'timeseries_dotted_path_support_test.cpp',
         'timeseries_extended_range_test.cpp',
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index 03c94f47986..1c91bf38d5a 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -667,6 +667,10 @@ uint32_t BucketCatalog::WriteBatch::numPreviouslyCommittedMeasurements() const {
     return _numPreviouslyCommittedMeasurements;
 }
 
+bool BucketCatalog::WriteBatch::needToDecompressBucketBeforeInserting() const {
+    return _needToDecompressBucketBeforeInserting;
+}
+
 bool BucketCatalog::WriteBatch::finished() const {
     return _promise.getFuture().isReady();
 }
diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h
index a6cd1169a11..c68f101fb18 100644
--- a/src/mongo/db/timeseries/bucket_catalog.h
+++ b/src/mongo/db/timeseries/bucket_catalog.h
@@ -185,6 +185,7 @@ public:
        const BSONObj& max() const;
        const StringMap<std::size_t>& newFieldNamesToBeInserted() const;
        uint32_t numPreviouslyCommittedMeasurements() const;
+        bool needToDecompressBucketBeforeInserting() const;
 
        /**
         * Returns whether the batch has already been committed or aborted.
@@ -230,7 +231,8 @@ public:
        BSONObj _min;  // Batch-local min; full if first batch, updates otherwise.
        BSONObj _max;  // Batch-local max; full if first batch, updates otherwise.
        uint32_t _numPreviouslyCommittedMeasurements = 0;
-       StringMap<std::size_t> _newFieldNamesToBeInserted;  // Value is hash of string key
+       StringMap<std::size_t> _newFieldNamesToBeInserted;    // Value is hash of string key
+       bool _needToDecompressBucketBeforeInserting = false;  // Bucket is compressed on-disk.
 
        AtomicWord<bool> _commitRights{false};
        SharedPromise<CommitInfo> _promise;
diff --git a/src/mongo/db/timeseries/bucket_compression.cpp b/src/mongo/db/timeseries/bucket_compression.cpp
index 1ccd26da0bd..2d1e03f8674 100644
--- a/src/mongo/db/timeseries/bucket_compression.cpp
+++ b/src/mongo/db/timeseries/bucket_compression.cpp
@@ -359,6 +359,62 @@ CompressionResult compressBucket(const BSONObj& bucketDoc,
     return {};
 }
 
+boost::optional<BSONObj> decompressBucket(const BSONObj& bucketDoc) {
+    BSONObjBuilder builder;
+
+    for (auto&& topLevel : bucketDoc) {
+        if (topLevel.fieldNameStringData() == kBucketControlFieldName) {
+            BSONObjBuilder controlBuilder{builder.subobjStart(kBucketControlFieldName)};
+
+            for (auto&& e : topLevel.Obj()) {
+                if (e.fieldNameStringData() == kBucketControlVersionFieldName) {
+                    // Check that we have a compressed bucket, and rewrite the version to signal
+                    // it's uncompressed now.
+                    if (e.type() != BSONType::NumberInt ||
+                        e.numberInt() != kTimeseriesControlCompressedVersion) {
+                        // This bucket isn't compressed.
+                        return boost::none;
+                    }
+                    controlBuilder.append(kBucketControlVersionFieldName,
+                                          kTimeseriesControlDefaultVersion);
+                } else if (e.fieldNameStringData() == kBucketControlCountFieldName) {
+                    // Omit the count field when decompressing.
+                    continue;
+                } else {
+                    // Just copy all the other fields.
+                    controlBuilder.append(e);
+                }
+            }
+        } else if (topLevel.fieldNameStringData() == kBucketDataFieldName) {
+            BSONObjBuilder dataBuilder{builder.subobjStart(kBucketDataFieldName)};
+
+            // Iterate over the compressed data columns and decompress each one.
+            for (auto&& e : topLevel.Obj()) {
+                if (e.type() != BSONType::BinData) {
+                    // This bucket isn't actually compressed.
+                    return boost::none;
+                }
+
+                BSONObjBuilder columnBuilder{dataBuilder.subobjStart(e.fieldNameStringData())};
+
+                BSONColumn column{e};
+                DecimalCounter<uint32_t> count{0};
+                for (auto&& measurement : column) {
+                    if (!measurement.eoo()) {
+                        columnBuilder.appendAs(measurement, count);
+                    }
+                    count++;
+                }
+            }
+        } else {
+            // If it's not control or data, we can just copy it and continue.
+            builder.append(topLevel);
+        }
+    }
+
+    return builder.obj();
+}
+
 bool isCompressedBucket(const BSONObj& bucketDoc) {
     auto&& controlField = bucketDoc[timeseries::kBucketControlFieldName];
     uassert(6540600,
diff --git a/src/mongo/db/timeseries/bucket_compression.h b/src/mongo/db/timeseries/bucket_compression.h
index 30c788a9417..51ef1945235 100644
--- a/src/mongo/db/timeseries/bucket_compression.h
+++ b/src/mongo/db/timeseries/bucket_compression.h
@@ -33,6 +33,7 @@
 
 #include "mongo/bson/bsonobj.h"
 #include "mongo/db/namespace_string.h"
+#include "mongo/db/timeseries/timeseries_gen.h"
 
 namespace mongo {
 
@@ -61,6 +62,8 @@ CompressionResult compressBucket(const BSONObj& bucketDoc,
                                  bool eligibleForReopening,
                                  bool validateDecompression);
 
+boost::optional<BSONObj> decompressBucket(const BSONObj& bucketDoc);
+
 /**
  * Returns whether a timeseries bucket has been compressed to the v2 format.
 */
diff --git a/src/mongo/db/timeseries/bucket_compression_test.cpp b/src/mongo/db/timeseries/bucket_compression_test.cpp
new file mode 100644
index 00000000000..54065085db1
--- /dev/null
+++ b/src/mongo/db/timeseries/bucket_compression_test.cpp
@@ -0,0 +1,97 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/bson/json.h"
+#include "mongo/bson/unordered_fields_bsonobj_comparator.h"
+#include "mongo/db/timeseries/bucket_compression.h"
+#include "mongo/unittest/bson_test_util.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+const BSONObj sampleBucket = mongo::fromjson(R"({
+    "_id" : {"$oid": "630ea4802093f9983fc394dc"},
+    "control" : {
+        "version" : 1,
+        "min" : {
+            "_id" : {"$oid": "630fabf7c388456f8aea4f2d"},
+            "t" : {"$date": "2022-08-31T00:00:00Z"},
+            "a" : 0
+        },
+        "max" : {
+            "_id" : {"$oid": "630fabf7c388456f8aea4f35"},
+            "t" : {"$date": "2022-08-31T00:00:04Z"},
+            "a" : 4
+        }
+    },
+    "data" : {
+        "_id" : {
+            "0" : {"$oid": "630fabf7c388456f8aea4f2d"},
+            "1" : {"$oid": "630fabf7c388456f8aea4f2f"},
+            "2" : {"$oid": "630fabf7c388456f8aea4f31"},
+            "3" : {"$oid": "630fabf7c388456f8aea4f33"},
+            "4" : {"$oid": "630fabf7c388456f8aea4f35"}
+        },
+        "a" : {
+            "0" : 0,
+            "1" : 1,
+            "2" : 2,
+            "3" : 3,
+            "4" : 4
+        },
+        "t" : {
+            "0" : {"$date": "2022-08-31T00:00:00Z"},
+            "1" : {"$date": "2022-08-31T00:00:01Z"},
+            "2" : {"$date": "2022-08-31T00:00:02Z"},
+            "3" : {"$date": "2022-08-31T00:00:03Z"},
+            "4" : {"$date": "2022-08-31T00:00:04Z"}
+        }
+    }
+})");
+
+TEST(TimeseriesBucketCompression, BasicRoundtrip) {
+    auto compressed =
+        timeseries::compressBucket(sampleBucket, "t"_sd, NamespaceString{"test.foo"}, true, false);
+    ASSERT_TRUE(compressed.compressedBucket.has_value());
+    auto decompressed = timeseries::decompressBucket(compressed.compressedBucket.value());
+    ASSERT_TRUE(decompressed.has_value());
+
+    // Compression will re-order data fields, moving the timeField to the front.
+    UnorderedFieldsBSONObjComparator comparator;
+    ASSERT_EQ(0, comparator.compare(decompressed.value(), sampleBucket));
+}
+
+TEST(TimeseriesBucketCompression, CannotDecompressUncompressedBucket) {
+    auto decompressed = timeseries::decompressBucket(sampleBucket);
+    ASSERT_FALSE(decompressed.has_value());
+}
+
+}  // namespace
+}  // namespace mongo
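
For context (not part of the patch): the new `timeseries::decompressBucket` helper is the inverse of `compressBucket` for v2 (compressed) buckets and returns `boost::none` when the input document is not compressed. Below is a minimal sketch of the intended call pattern, modeled on the unit test added above; the namespace `test.foo`, time field `"t"`, and the hypothetical helper name `roundTripBucket` are illustrative, not server code.

```cpp
#include "mongo/db/timeseries/bucket_compression.h"

namespace mongo {

// Sketch: round-trip an uncompressed (v1) bucket document through
// compressBucket/decompressBucket, mirroring bucket_compression_test.cpp.
boost::optional<BSONObj> roundTripBucket(const BSONObj& uncompressedBucket) {
    // Compress using time field "t"; skip the optional decompression validation.
    auto compressed = timeseries::compressBucket(uncompressedBucket,
                                                 "t"_sd,
                                                 NamespaceString{"test.foo"},
                                                 /*eligibleForReopening=*/true,
                                                 /*validateDecompression=*/false);
    if (!compressed.compressedBucket) {
        // Compression may fail and leave the result unset.
        return boost::none;
    }

    // decompressBucket only accepts buckets whose control.version marks the
    // compressed format, so calling it on the original document returns none.
    return timeseries::decompressBucket(*compressed.compressedBucket);
}

}  // namespace mongo
```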