Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/db/commands/write_commands.cpp            | 62
-rw-r--r--  src/mongo/db/timeseries/SConscript                   |  1
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog.cpp           |  4
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog.h             |  4
-rw-r--r--  src/mongo/db/timeseries/bucket_compression.cpp       | 56
-rw-r--r--  src/mongo/db/timeseries/bucket_compression.h         |  3
-rw-r--r--  src/mongo/db/timeseries/bucket_compression_test.cpp  | 97
7 files changed, 211 insertions, 16 deletions
diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp
index 27c55c8aed2..1fc9d0d0519 100644
--- a/src/mongo/db/commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands.cpp
@@ -225,11 +225,11 @@ write_ops::UpdateOpEntry makeTimeseriesUpdateOpEntry(
/**
* Transforms a single time-series insert to an update request on an existing bucket.
*/
-write_ops::UpdateOpEntry makeTimeseriesCompressionOpEntry(
+write_ops::UpdateOpEntry makeTimeseriesTransformationOpEntry(
OperationContext* opCtx,
const OID& bucketId,
- write_ops::UpdateModification::TransformFunc compressionFunc) {
- write_ops::UpdateModification u(std::move(compressionFunc));
+ write_ops::UpdateModification::TransformFunc transformationFunc) {
+ write_ops::UpdateModification u(std::move(transformationFunc));
write_ops::UpdateOpEntry update(BSON("_id" << bucketId), std::move(u));
invariant(!update.getMulti(), bucketId.toString());
invariant(!update.getUpsert(), bucketId.toString());
@@ -649,13 +649,14 @@ public:
return op;
}
- write_ops::UpdateCommandRequest _makeTimeseriesCompressionOp(
+ write_ops::UpdateCommandRequest _makeTimeseriesTransformationOp(
OperationContext* opCtx,
const OID& bucketId,
- write_ops::UpdateModification::TransformFunc compressionFunc) const {
+ write_ops::UpdateModification::TransformFunc transformationFunc) const {
write_ops::UpdateCommandRequest op(
makeTimeseriesBucketsNamespace(ns()),
- {makeTimeseriesCompressionOpEntry(opCtx, bucketId, std::move(compressionFunc))});
+ {makeTimeseriesTransformationOpEntry(
+ opCtx, bucketId, std::move(transformationFunc))});
write_ops::WriteCommandRequestBase base;
// The schema validation configured in the bucket collection is intended for direct
@@ -694,15 +695,13 @@ public:
OperationContext* opCtx,
std::shared_ptr<BucketCatalog::WriteBatch> batch,
const BSONObj& metadata,
- std::vector<StmtId>&& stmtIds) const {
+ const write_ops::UpdateCommandRequest& op) const {
if (auto status = checkFailUnorderedTimeseriesInsertFailPoint(metadata)) {
return {status->first, status->second};
}
- return _getTimeseriesSingleWriteResult(write_ops_exec::performUpdates(
- opCtx,
- _makeTimeseriesUpdateOp(opCtx, batch, metadata, std::move(stmtIds)),
- OperationSource::kTimeseriesInsert));
+ return _getTimeseriesSingleWriteResult(
+ write_ops_exec::performUpdates(opCtx, op, OperationSource::kTimeseriesInsert));
}
TimeseriesSingleWriteResult _performTimeseriesBucketCompression(
@@ -753,8 +752,8 @@ public:
return compressed.compressedBucket;
};
- auto compressionOp =
- _makeTimeseriesCompressionOp(opCtx, closedBucket.bucketId, bucketCompressionFunc);
+ auto compressionOp = _makeTimeseriesTransformationOp(
+ opCtx, closedBucket.bucketId, bucketCompressionFunc);
auto result = _getTimeseriesSingleWriteResult(
write_ops_exec::performUpdates(opCtx, compressionOp, OperationSource::kStandard));
@@ -775,6 +774,18 @@ public:
return result;
}
+ write_ops::UpdateCommandRequest _makeTimeseriesDecompressionOp(
+ OperationContext* opCtx,
+ const std::shared_ptr<BucketCatalog::WriteBatch>& batch) const {
+ auto bucketDecompressionFunc =
+ [&](const BSONObj& bucketDoc) -> boost::optional<BSONObj> {
+ return timeseries::decompressBucket(bucketDoc);
+ };
+
+ return _makeTimeseriesTransformationOp(
+ opCtx, batch->bucket().id, bucketDecompressionFunc);
+ }
+
/**
* Returns whether the request can continue.
*/
@@ -816,8 +827,26 @@ public:
<< "Expected 1 insertion of document with _id '" << docId
<< "', but found " << output.result.getValue().getN() << ".");
} else {
- const auto output =
- _performTimeseriesUpdate(opCtx, batch, metadata, std::move(stmtIds));
+ if (batch->needToDecompressBucketBeforeInserting()) {
+ const auto output = _performTimeseriesUpdate(
+ opCtx, batch, metadata, _makeTimeseriesDecompressionOp(opCtx, batch));
+ if (auto error =
+ generateError(opCtx, output.result, start + index, errors->size())) {
+ errors->emplace_back(std::move(*error));
+ bucketCatalog.abort(batch, output.result.getStatus());
+ return output.canContinue;
+ }
+ invariant(output.result.getValue().getNModified() == 1,
+ str::stream() << "Expected 1 update of document with _id '" << docId
+ << "', but found "
+ << output.result.getValue().getNModified() << ".");
+ }
+
+ const auto output = _performTimeseriesUpdate(
+ opCtx,
+ batch,
+ metadata,
+ _makeTimeseriesUpdateOp(opCtx, batch, metadata, std::move(stmtIds)));
if (auto error =
generateError(opCtx, output.result, start + index, errors->size())) {
errors->emplace_back(std::move(*error));
@@ -909,6 +938,9 @@ public:
insertOps.push_back(_makeTimeseriesInsertOp(
batch, metadata, std::move(stmtIds[batch.get()->bucket().id])));
} else {
+ if (batch.get()->needToDecompressBucketBeforeInserting()) {
+ updateOps.push_back(_makeTimeseriesDecompressionOp(opCtx, batch));
+ }
updateOps.push_back(_makeTimeseriesUpdateOp(
opCtx, batch, metadata, std::move(stmtIds[batch.get()->bucket().id])));
}
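
The insert path above leans on transform-style updates: rather than carrying a modifier document or pipeline, the UpdateModification wraps a C++ callback that is handed the bucket's current document and returns the replacement to write. Below is a minimal sketch of that mechanism, mirroring _makeTimeseriesTransformationOp and _makeTimeseriesDecompressionOp from the diff; the function name makeDecompressionOpSketch, its parameters, and the include list are illustrative assumptions for this tree layout, not part of the patch.

#include <boost/optional.hpp>

#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/oid.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/timeseries/bucket_compression.h"

namespace mongo {

// Illustrative only: builds a single-document update whose modification is a callback that
// receives the bucket's current on-disk document and returns its replacement.
write_ops::UpdateCommandRequest makeDecompressionOpSketch(const NamespaceString& bucketsNs,
                                                          const OID& bucketId) {
    // The transform returns boost::none when the stored bucket is not compressed, i.e. there
    // is nothing to rewrite.
    write_ops::UpdateModification::TransformFunc decompress =
        [](const BSONObj& bucketDoc) -> boost::optional<BSONObj> {
        return timeseries::decompressBucket(bucketDoc);
    };

    write_ops::UpdateModification u(std::move(decompress));
    write_ops::UpdateOpEntry entry(BSON("_id" << bucketId), std::move(u));

    // Neither multi nor upsert: exactly one bucket document is rewritten in place. The command
    // code above then executes the request via write_ops_exec::performUpdates().
    return write_ops::UpdateCommandRequest(bucketsNs, {std::move(entry)});
}

}  // namespace mongo
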
diff --git a/src/mongo/db/timeseries/SConscript b/src/mongo/db/timeseries/SConscript
index 414f0841a70..80704b8ea45 100644
--- a/src/mongo/db/timeseries/SConscript
+++ b/src/mongo/db/timeseries/SConscript
@@ -136,6 +136,7 @@ env.CppUnitTest(
'bucket_catalog_helpers_test.cpp',
'bucket_catalog_state_manager_test.cpp',
'bucket_catalog_test.cpp',
+ 'bucket_compression_test.cpp',
'minmax_test.cpp',
'timeseries_dotted_path_support_test.cpp',
'timeseries_extended_range_test.cpp',
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index 03c94f47986..1c91bf38d5a 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -667,6 +667,10 @@ uint32_t BucketCatalog::WriteBatch::numPreviouslyCommittedMeasurements() const {
return _numPreviouslyCommittedMeasurements;
}
+bool BucketCatalog::WriteBatch::needToDecompressBucketBeforeInserting() const {
+ return _needToDecompressBucketBeforeInserting;
+}
+
bool BucketCatalog::WriteBatch::finished() const {
return _promise.getFuture().isReady();
}
diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h
index a6cd1169a11..c68f101fb18 100644
--- a/src/mongo/db/timeseries/bucket_catalog.h
+++ b/src/mongo/db/timeseries/bucket_catalog.h
@@ -185,6 +185,7 @@ public:
const BSONObj& max() const;
const StringMap<std::size_t>& newFieldNamesToBeInserted() const;
uint32_t numPreviouslyCommittedMeasurements() const;
+ bool needToDecompressBucketBeforeInserting() const;
/**
* Returns whether the batch has already been committed or aborted.
@@ -230,7 +231,8 @@ public:
BSONObj _min; // Batch-local min; full if first batch, updates otherwise.
BSONObj _max; // Batch-local max; full if first batch, updates otherwise.
uint32_t _numPreviouslyCommittedMeasurements = 0;
- StringMap<std::size_t> _newFieldNamesToBeInserted; // Value is hash of string key
+ StringMap<std::size_t> _newFieldNamesToBeInserted; // Value is hash of string key
+ bool _needToDecompressBucketBeforeInserting = false; // Bucket is compressed on-disk.
AtomicWord<bool> _commitRights{false};
SharedPromise<CommitInfo> _promise;
diff --git a/src/mongo/db/timeseries/bucket_compression.cpp b/src/mongo/db/timeseries/bucket_compression.cpp
index 1ccd26da0bd..2d1e03f8674 100644
--- a/src/mongo/db/timeseries/bucket_compression.cpp
+++ b/src/mongo/db/timeseries/bucket_compression.cpp
@@ -359,6 +359,62 @@ CompressionResult compressBucket(const BSONObj& bucketDoc,
return {};
}
+boost::optional<BSONObj> decompressBucket(const BSONObj& bucketDoc) {
+ BSONObjBuilder builder;
+
+ for (auto&& topLevel : bucketDoc) {
+ if (topLevel.fieldNameStringData() == kBucketControlFieldName) {
+ BSONObjBuilder controlBuilder{builder.subobjStart(kBucketControlFieldName)};
+
+ for (auto&& e : topLevel.Obj()) {
+ if (e.fieldNameStringData() == kBucketControlVersionFieldName) {
+ // Check that we have a compressed bucket, and rewrite the version to signal
+ // it's uncompressed now.
+ if (e.type() != BSONType::NumberInt ||
+ e.numberInt() != kTimeseriesControlCompressedVersion) {
+ // This bucket isn't compressed.
+ return boost::none;
+ }
+ controlBuilder.append(kBucketControlVersionFieldName,
+ kTimeseriesControlDefaultVersion);
+ } else if (e.fieldNameStringData() == kBucketControlCountFieldName) {
+ // Omit the count field when decompressing.
+ continue;
+ } else {
+ // Just copy all the other fields.
+ controlBuilder.append(e);
+ }
+ }
+ } else if (topLevel.fieldNameStringData() == kBucketDataFieldName) {
+ BSONObjBuilder dataBuilder{builder.subobjStart(kBucketDataFieldName)};
+
+ // Iterate over the compressed data columns and decompress each one.
+ for (auto&& e : topLevel.Obj()) {
+ if (e.type() != BSONType::BinData) {
+ // This bucket isn't actually compressed.
+ return boost::none;
+ }
+
+ BSONObjBuilder columnBuilder{dataBuilder.subobjStart(e.fieldNameStringData())};
+
+ BSONColumn column{e};
+ DecimalCounter<uint32_t> count{0};
+ for (auto&& measurement : column) {
+ if (!measurement.eoo()) {
+ columnBuilder.appendAs(measurement, count);
+ }
+ count++;
+ }
+ }
+ } else {
+ // If it's not control or data, we can just copy it and continue.
+ builder.append(topLevel);
+ }
+ }
+
+ return builder.obj();
+}
+
bool isCompressedBucket(const BSONObj& bucketDoc) {
auto&& controlField = bucketDoc[timeseries::kBucketControlFieldName];
uassert(6540600,
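
For orientation, a brief usage sketch of the new decompressBucket helper follows (illustrative only; compressedBucket stands for any document previously produced by compressBucket, and the document shapes in the comments are abbreviated).

// Assumes the includes and mongo namespace of bucket_compression.cpp.
// Input shape (v2, compressed; BinData subtype 7 is the BSONColumn format):
//   { _id: ..., control: { version: 2, count: 5, min: {...}, max: {...} },
//     data: { t: BinData(7, "..."), _id: BinData(7, "..."), a: BinData(7, "...") } }
// Output shape (v1, uncompressed): control.version is rewritten to 1, control.count is
// dropped, and each column is expanded into an object keyed by measurement index:
//   { _id: ..., control: { version: 1, min: {...}, max: {...} },
//     data: { t: { "0": ..., "1": ... }, _id: { "0": ..., ... }, a: { "0": 0, "1": 1, ... } } }
boost::optional<BSONObj> uncompressed = timeseries::decompressBucket(compressedBucket);
if (!uncompressed) {
    // control.version was not the compressed version, or a data field was not BinData:
    // the input was not a compressed bucket, so there is nothing to decompress.
}
// Indices at which a measurement had no value for a field are simply absent from that
// field's expanded object (the !measurement.eoo() check above skips them).
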
diff --git a/src/mongo/db/timeseries/bucket_compression.h b/src/mongo/db/timeseries/bucket_compression.h
index 30c788a9417..51ef1945235 100644
--- a/src/mongo/db/timeseries/bucket_compression.h
+++ b/src/mongo/db/timeseries/bucket_compression.h
@@ -33,6 +33,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/timeseries/timeseries_gen.h"
namespace mongo {
@@ -61,6 +62,8 @@ CompressionResult compressBucket(const BSONObj& bucketDoc,
bool eligibleForReopening,
bool validateDecompression);
+boost::optional<BSONObj> decompressBucket(const BSONObj& bucketDoc);
+
/**
* Returns whether a timeseries bucket has been compressed to the v2 format.
*/
diff --git a/src/mongo/db/timeseries/bucket_compression_test.cpp b/src/mongo/db/timeseries/bucket_compression_test.cpp
new file mode 100644
index 00000000000..54065085db1
--- /dev/null
+++ b/src/mongo/db/timeseries/bucket_compression_test.cpp
@@ -0,0 +1,97 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/bson/json.h"
+#include "mongo/bson/unordered_fields_bsonobj_comparator.h"
+#include "mongo/db/timeseries/bucket_compression.h"
+#include "mongo/unittest/bson_test_util.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+const BSONObj sampleBucket = mongo::fromjson(R"({
+ "_id" : {"$oid": "630ea4802093f9983fc394dc"},
+ "control" : {
+ "version" : 1,
+ "min" : {
+ "_id" : {"$oid": "630fabf7c388456f8aea4f2d"},
+ "t" : {"$date": "2022-08-31T00:00:00Z"},
+ "a" : 0
+ },
+ "max" : {
+ "_id" : {"$oid": "630fabf7c388456f8aea4f35"},
+ "t" : {"$date": "2022-08-31T00:00:04Z"},
+ "a" : 4
+ }
+ },
+ "data" : {
+ "_id" : {
+ "0" : {"$oid": "630fabf7c388456f8aea4f2d"},
+ "1" : {"$oid": "630fabf7c388456f8aea4f2f"},
+ "2" : {"$oid": "630fabf7c388456f8aea4f31"},
+ "3" : {"$oid": "630fabf7c388456f8aea4f33"},
+ "4" : {"$oid": "630fabf7c388456f8aea4f35"}
+ },
+ "a" : {
+ "0" : 0,
+ "1" : 1,
+ "2" : 2,
+ "3" : 3,
+ "4" : 4
+ },
+ "t" : {
+ "0" : {"$date": "2022-08-31T00:00:00Z"},
+ "1" : {"$date": "2022-08-31T00:00:01Z"},
+ "2" : {"$date": "2022-08-31T00:00:02Z"},
+ "3" : {"$date": "2022-08-31T00:00:03Z"},
+ "4" : {"$date": "2022-08-31T00:00:04Z"}
+ }
+ }
+})");
+
+TEST(TimeseriesBucketCompression, BasicRoundtrip) {
+ auto compressed =
+ timeseries::compressBucket(sampleBucket, "t"_sd, NamespaceString{"test.foo"}, true, false);
+ ASSERT_TRUE(compressed.compressedBucket.has_value());
+ auto decompressed = timeseries::decompressBucket(compressed.compressedBucket.value());
+ ASSERT_TRUE(decompressed.has_value());
+
+ // Compression will re-order data fields, moving the timeField to the front.
+ UnorderedFieldsBSONObjComparator comparator;
+ ASSERT_EQ(0, comparator.compare(decompressed.value(), sampleBucket));
+}
+
+TEST(TimeseriesBucketCompression, CannotDecompressUncompressedBucket) {
+ auto decompressed = timeseries::decompressBucket(sampleBucket);
+ ASSERT_FALSE(decompressed.has_value());
+}
+
+} // namespace
+} // namespace mongo