author     Henrik Edin <henrik.edin@mongodb.com>  2021-09-16 09:29:47 -0400
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-09-16 15:05:58 +0000
commit     a7001db2c3a43c6632904a7a4c34f4d99a61d295 (patch)
tree       cb6f90e4aadc387597c057ca21e6db39effcd775
parent     aa36306977363038be233195ac5645edb3d71dab (diff)
download   mongo-a7001db2c3a43c6632904a7a4c34f4d99a61d295.tar.gz
SERVER-58578 Timeseries bucket compression
All fields under 'data' are compressed using BSON Binary Subtype 7. Measurements are sorted by the time field. Buckets are compressed in a separate update operation when the BucketCatalog closes the bucket and no further writes to it will be performed. The compression operation is transparent to the user; if it fails for any reason, the bucket is left uncompressed.
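For illustration only (not part of the patch): a minimal sketch of how the new compression entry point in src/mongo/db/timeseries/bucket_compression.h is driven. The signature and return type of timeseries::compressBucket() are inferred from its call sites in write_commands.cpp and bucket_unpacker_test.cpp below; the sample bucket document and the wrapper function are made up.

#include "mongo/bson/json.h"
#include "mongo/db/timeseries/bucket_compression.h"

namespace mongo {
// Build a tiny version-1 bucket with two measurements and compress it.
void compressBucketExample() {
    BSONObj bucket = fromjson(R"({
        _id: {$oid: "629e1e680958e279dc29a517"},
        control: {version: 1,
                  min: {time: {$date: "2021-09-16T00:00:00Z"}, x: 1},
                  max: {time: {$date: "2021-09-16T00:00:01Z"}, x: 2}},
        data: {time: {"0": {$date: "2021-09-16T00:00:00Z"},
                      "1": {$date: "2021-09-16T00:00:01Z"}},
               x: {"0": 1, "1": 2}}})");

    // compressBucket() is assumed to return boost::optional<BSONObj>; boost::none means the
    // bucket could not be compressed and the caller leaves it in its version-1 form.
    auto compressed = timeseries::compressBucket(bucket, "time"_sd);
    if (compressed) {
        // The result carries control.version: 2, and every field under 'data' is rewritten as
        // BSON Binary Subtype 7 (BSONColumn) with measurements sorted on the time field.
    }
}
}  // namespace mongo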
-rw-r--r--  jstests/core/timeseries/libs/timeseries.js                     |  10
-rw-r--r--  jstests/core/timeseries/timeseries_bucket_limit_count.js       |   9
-rw-r--r--  jstests/core/timeseries/timeseries_bucket_limit_size.js        |  22
-rw-r--r--  jstests/core/timeseries/timeseries_bucket_limit_time_range.js  |  26
-rw-r--r--  jstests/core/timeseries/timeseries_idle_buckets.js             |  43
-rw-r--r--  src/mongo/bson/util/bsoncolumnbuilder.cpp                      |  10
-rw-r--r--  src/mongo/bson/util/bsoncolumnbuilder.h                        |   7
-rw-r--r--  src/mongo/db/commands/SConscript                               |   1
-rw-r--r--  src/mongo/db/commands/write_commands.cpp                       | 161
-rw-r--r--  src/mongo/db/exec/SConscript                                   |   3
-rw-r--r--  src/mongo/db/exec/bucket_unpacker_test.cpp                     |  86
-rw-r--r--  src/mongo/db/ops/write_ops.cpp                                 |  12
-rw-r--r--  src/mongo/db/ops/write_ops_parsers.h                           |  41
-rw-r--r--  src/mongo/db/timeseries/SConscript                             |  13
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog.cpp                     |  70
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog.h                       |  68
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog_test.cpp                | 132
-rw-r--r--  src/mongo/db/timeseries/bucket_compression.cpp                 | 203
-rw-r--r--  src/mongo/db/timeseries/bucket_compression.h                   |  50
-rw-r--r--  src/mongo/db/timeseries/timeseries.idl                         |   7
-rw-r--r--  src/mongo/db/update/SConscript                                 |   2
-rw-r--r--  src/mongo/db/update/object_transform_executor.cpp              |  74
-rw-r--r--  src/mongo/db/update/object_transform_executor.h                |  83
-rw-r--r--  src/mongo/db/update/object_transform_executor_test.cpp         | 210
-rw-r--r--  src/mongo/db/update/update_driver.cpp                          |  13
-rw-r--r--  src/mongo/db/update/update_driver.h                            |   2
-rw-r--r--  src/mongo/shell/bench.cpp                                      |   3
27 files changed, 1151 insertions, 210 deletions
diff --git a/jstests/core/timeseries/libs/timeseries.js b/jstests/core/timeseries/libs/timeseries.js
index c9d27895566..88a3d38d705 100644
--- a/jstests/core/timeseries/libs/timeseries.js
+++ b/jstests/core/timeseries/libs/timeseries.js
@@ -11,6 +11,16 @@ var TimeseriesTest = class {
}
/**
+ * Returns whether time-series bucket compression is supported.
+ */
+ static timeseriesBucketCompressionEnabled(conn) {
+ return assert
+ .commandWorked(
+ conn.adminCommand({getParameter: 1, featureFlagTimeseriesBucketCompression: 1}))
+ .featureFlagTimeseriesBucketCompression.value;
+ }
+
+ /**
* Returns whether time-series updates and deletes are supported.
*/
static timeseriesUpdatesAndDeletesEnabled(conn) {
diff --git a/jstests/core/timeseries/timeseries_bucket_limit_count.js b/jstests/core/timeseries/timeseries_bucket_limit_count.js
index 5f6869cc838..7183a561ec6 100644
--- a/jstests/core/timeseries/timeseries_bucket_limit_count.js
+++ b/jstests/core/timeseries/timeseries_bucket_limit_count.js
@@ -13,6 +13,9 @@
load("jstests/core/timeseries/libs/timeseries.js");
TimeseriesTest.run((insert) => {
+ const isTimeseriesBucketCompressionEnabled =
+ TimeseriesTest.timeseriesBucketCompressionEnabled(db);
+
const collNamePrefix = 'timeseries_bucket_limit_count_';
// Assumes each bucket has a limit of 1000 measurements.
@@ -68,6 +71,9 @@ TimeseriesTest.run((insert) => {
assert.eq(bucketMaxCount - 1,
bucketDocs[0].control.max.x,
'invalid control.max for x in first bucket: ' + tojson(bucketDocs));
+ assert.eq(isTimeseriesBucketCompressionEnabled ? 2 : 1,
+ bucketDocs[0].control.version,
+ 'unexpected control.version in first bucket: ' + tojson(bucketDocs));
// Second bucket should contain the remaining documents.
assert.eq(bucketMaxCount,
@@ -82,6 +88,9 @@ TimeseriesTest.run((insert) => {
assert.eq(numDocs - 1,
bucketDocs[1].control.max.x,
'invalid control.max for x in second bucket: ' + tojson(bucketDocs));
+ assert.eq(1,
+ bucketDocs[1].control.version,
+ 'unexpected control.version in second bucket: ' + tojson(bucketDocs));
};
runTest(1);
diff --git a/jstests/core/timeseries/timeseries_bucket_limit_size.js b/jstests/core/timeseries/timeseries_bucket_limit_size.js
index bb29d1141a8..2d410f9eb1c 100644
--- a/jstests/core/timeseries/timeseries_bucket_limit_size.js
+++ b/jstests/core/timeseries/timeseries_bucket_limit_size.js
@@ -13,17 +13,21 @@
load("jstests/core/timeseries/libs/timeseries.js");
TimeseriesTest.run((insert) => {
+ const isTimeseriesBucketCompressionEnabled =
+ TimeseriesTest.timeseriesBucketCompressionEnabled(db);
+
const collNamePrefix = 'timeseries_bucket_limit_size_';
const timeFieldName = 'time';
// Assumes each bucket has a limit of 125kB on the measurements stored in the 'data' field.
const bucketMaxSizeKB = 125;
- const numDocs = 2;
+ const numDocs = 3;
- // The measurement data should not take up all of the 'bucketMaxSizeKB' limit because we need
- // to leave a little room for the _id and the time fields.
- const largeValue = 'x'.repeat((bucketMaxSizeKB - 1) * 1024);
+ // The measurement data should not take up all of the 'bucketMaxSizeKB' limit because we need to
+ // leave a little room for the _id and the time fields. We need to fit two measurements within
+ // this limit to trigger compression if enabled.
+ const largeValue = 'x'.repeat(((bucketMaxSizeKB - 1) / 2) * 1024);
const runTest = function(numDocsPerInsert) {
const coll = db.getCollection(collNamePrefix + numDocsPerInsert);
@@ -58,7 +62,7 @@ TimeseriesTest.run((insert) => {
assert.eq(2, bucketDocs.length, bucketDocs);
// Check both buckets.
- // First bucket should be full with one document since we spill the second document over
+ // First bucket should be full with two documents since we spill the third document over
// into the second bucket due to size constraints on 'data'.
assert.eq(0,
bucketDocs[0].control.min._id,
@@ -66,12 +70,15 @@ TimeseriesTest.run((insert) => {
assert.eq(largeValue,
bucketDocs[0].control.min.x,
'invalid control.min for x in first bucket: ' + tojson(bucketDocs[0].control));
- assert.eq(0,
+ assert.eq(1,
bucketDocs[0].control.max._id,
'invalid control.max for _id in first bucket: ' + tojson(bucketDocs[0].control));
assert.eq(largeValue,
bucketDocs[0].control.max.x,
'invalid control.max for x in first bucket: ' + tojson(bucketDocs[0].control));
+ assert.eq(isTimeseriesBucketCompressionEnabled ? 2 : 1,
+ bucketDocs[0].control.version,
+ 'unexpected control.version in first bucket: ' + tojson(bucketDocs));
// Second bucket should contain the remaining document.
assert.eq(numDocs - 1,
@@ -86,6 +93,9 @@ TimeseriesTest.run((insert) => {
assert.eq(largeValue,
bucketDocs[1].control.max.x,
'invalid control.max for x in second bucket: ' + tojson(bucketDocs[1].control));
+ assert.eq(1,
+ bucketDocs[1].control.version,
+ 'unexpected control.version in second bucket: ' + tojson(bucketDocs));
};
runTest(1);
diff --git a/jstests/core/timeseries/timeseries_bucket_limit_time_range.js b/jstests/core/timeseries/timeseries_bucket_limit_time_range.js
index a46c8f52bf6..e2ece34dbb5 100644
--- a/jstests/core/timeseries/timeseries_bucket_limit_time_range.js
+++ b/jstests/core/timeseries/timeseries_bucket_limit_time_range.js
@@ -13,13 +13,23 @@
load("jstests/core/timeseries/libs/timeseries.js");
TimeseriesTest.run((insert) => {
+ const isTimeseriesBucketCompressionEnabled =
+ TimeseriesTest.timeseriesBucketCompressionEnabled(db);
+
const collNamePrefix = 'timeseries_bucket_limit_time_range_';
const timeFieldName = 'time';
// Assumes the measurements in each bucket span at most one hour (based on the time field).
- const docTimes = [ISODate("2020-11-13T01:00:00Z"), ISODate("2020-11-13T03:00:00Z")];
- const numDocs = 2;
+ // Make sure we have three measurements to trigger compression if enabled. The data types in
+ // this test are so small that two measurements may not yield a smaller compressed object.
+ const docTimes = [
+ ISODate("2020-11-13T01:00:00Z"),
+ ISODate("2020-11-13T01:00:01Z"),
+ ISODate("2020-11-13T01:00:02Z"),
+ ISODate("2020-11-13T03:00:00Z")
+ ];
+ const numDocs = 4;
const runTest = function(numDocsPerInsert) {
const coll = db.getCollection(collNamePrefix + numDocsPerInsert);
@@ -67,15 +77,18 @@ TimeseriesTest.run((insert) => {
assert.eq(docTimes[0],
bucketDocs[0].control.min[timeFieldName],
'invalid control.min for time in first bucket: ' + tojson(bucketDocs[0].control));
- assert.eq(0,
+ assert.eq(2,
bucketDocs[0].control.max._id,
'invalid control.max for _id in first bucket: ' + tojson(bucketDocs[0].control));
- assert.eq(0,
+ assert.eq(2,
bucketDocs[0].control.max.x,
'invalid control.max for x in first bucket: ' + tojson(bucketDocs[0].control));
- assert.eq(docTimes[0],
+ assert.eq(docTimes[2],
bucketDocs[0].control.max[timeFieldName],
'invalid control.max for time in first bucket: ' + tojson(bucketDocs[0].control));
+ assert.eq(isTimeseriesBucketCompressionEnabled ? 2 : 1,
+ bucketDocs[0].control.version,
+ 'unexpected control.version in first bucket: ' + tojson(bucketDocs));
// Second bucket should contain the remaining document.
assert.eq(numDocs - 1,
@@ -98,6 +111,9 @@ TimeseriesTest.run((insert) => {
docTimes[numDocs - 1],
bucketDocs[1].control.max[timeFieldName],
'invalid control.max for time in second bucket: ' + tojson(bucketDocs[1].control));
+ assert.eq(1,
+ bucketDocs[1].control.version,
+ 'unexpected control.version in second bucket: ' + tojson(bucketDocs));
};
runTest(1);
diff --git a/jstests/core/timeseries/timeseries_idle_buckets.js b/jstests/core/timeseries/timeseries_idle_buckets.js
index b0d183b493a..dc402a7fadb 100644
--- a/jstests/core/timeseries/timeseries_idle_buckets.js
+++ b/jstests/core/timeseries/timeseries_idle_buckets.js
@@ -14,11 +14,15 @@
load("jstests/core/timeseries/libs/timeseries.js");
TimeseriesTest.run((insert) => {
+ const isTimeseriesBucketCompressionEnabled =
+ TimeseriesTest.timeseriesBucketCompressionEnabled(db);
+
const coll = db.timeseries_idle_buckets;
const bucketsColl = db.getCollection('system.buckets.' + coll.getName());
const timeFieldName = 'time';
const metaFieldName = 'meta';
+ const valueFieldName = 'value';
coll.drop();
assert.commandWorked(db.createCollection(
@@ -30,18 +34,44 @@ TimeseriesTest.run((insert) => {
const numDocs = 100;
const metaValue = 'a'.repeat(1024 * 1024);
for (let i = 0; i < numDocs; i++) {
- assert.commandWorked(insert(
- coll, {[timeFieldName]: ISODate(), [metaFieldName]: {[i.toString()]: metaValue}}));
+ // Insert a few measurements into the bucket to make sure compression is triggered if
+ // enabled.
+ assert.commandWorked(insert(coll, {
+ [timeFieldName]: ISODate(),
+ [metaFieldName]: {[i.toString()]: metaValue},
+ [valueFieldName]: 0
+ }));
+ assert.commandWorked(insert(coll, {
+ [timeFieldName]: ISODate(),
+ [metaFieldName]: {[i.toString()]: metaValue},
+ [valueFieldName]: 1
+ }));
+ assert.commandWorked(insert(coll, {
+ [timeFieldName]: ISODate(),
+ [metaFieldName]: {[i.toString()]: metaValue},
+ [valueFieldName]: 3
+ }));
}
// Insert a document with the metadata of a bucket which should have been expired. Thus, a new
// bucket will be created.
- assert.commandWorked(
- insert(coll, {[timeFieldName]: ISODate(), [metaFieldName]: {0: metaValue}}));
- let bucketDocs = bucketsColl.find({meta: {0: metaValue}}).toArray();
+ assert.commandWorked(insert(
+ coll, {[timeFieldName]: ISODate(), [metaFieldName]: {0: metaValue}, [valueFieldName]: 3}));
+
+ // Check buckets.
+ let bucketDocs =
+ bucketsColl.find({meta: {0: metaValue}}).sort({'control.min._id': 1}).toArray();
assert.eq(
bucketDocs.length, 2, 'Invalid number of buckets for metadata 0: ' + tojson(bucketDocs));
+ // If bucket compression is enabled, the expired bucket should have been compressed.
+ assert.eq(isTimeseriesBucketCompressionEnabled ? 2 : 1,
+ bucketDocs[0].control.version,
+ 'unexpected control.version in first bucket: ' + tojson(bucketDocs));
+ assert.eq(1,
+ bucketDocs[1].control.version,
+ 'unexpected control.version in second bucket: ' + tojson(bucketDocs));
+
// Insert a document with the metadata of a bucket which should still be open. Thus, the existing
// bucket will be used.
assert.commandWorked(
@@ -51,5 +81,8 @@ TimeseriesTest.run((insert) => {
bucketDocs.length,
1,
'Invalid number of buckets for metadata ' + (numDocs - 1) + ': ' + tojson(bucketDocs));
+ assert.eq(1,
+ bucketDocs[0].control.version,
+ 'unexpected control.version in open bucket: ' + tojson(bucketDocs));
});
})();
diff --git a/src/mongo/bson/util/bsoncolumnbuilder.cpp b/src/mongo/bson/util/bsoncolumnbuilder.cpp
index 1d7ba9334c4..c7dee5223cb 100644
--- a/src/mongo/bson/util/bsoncolumnbuilder.cpp
+++ b/src/mongo/bson/util/bsoncolumnbuilder.cpp
@@ -60,13 +60,18 @@ std::pair<int64_t, uint8_t> scaleAndEncodeDouble(double value, uint8_t minScaleI
} // namespace
BSONColumnBuilder::BSONColumnBuilder(StringData fieldName)
+ : BSONColumnBuilder(fieldName, BufBuilder()) {}
+
+BSONColumnBuilder::BSONColumnBuilder(StringData fieldName, BufBuilder&& builder)
: _simple8bBuilder64(_createBufferWriter()),
_simple8bBuilder128(_createBufferWriter()),
_scaleIndex(Simple8bTypeUtil::kMemoryAsInteger),
+ _bufBuilder(std::move(builder)),
_fieldName(fieldName) {
// Leave space for element count at the beginning
static_assert(sizeof(_elementCount) == kElementCountBytes,
"Element count for BSONColumn should be 4 bytes");
+ _bufBuilder.reset();
_bufBuilder.skip(kElementCountBytes);
// Store EOO type with empty field name as previous.
_storePrevious(BSONElement());
@@ -370,6 +375,11 @@ BSONBinData BSONColumnBuilder::finalize() {
return {_bufBuilder.buf(), _bufBuilder.len(), BinDataType::Column};
}
+BufBuilder BSONColumnBuilder::detach() {
+ return std::move(_bufBuilder);
+}
+
+
void BSONColumnBuilder::_storePrevious(BSONElement elem) {
auto valuesize = elem.valuesize();
diff --git a/src/mongo/bson/util/bsoncolumnbuilder.h b/src/mongo/bson/util/bsoncolumnbuilder.h
index 449a08e9a2c..036f30cccf9 100644
--- a/src/mongo/bson/util/bsoncolumnbuilder.h
+++ b/src/mongo/bson/util/bsoncolumnbuilder.h
@@ -45,6 +45,7 @@ namespace mongo {
class BSONColumnBuilder {
public:
BSONColumnBuilder(StringData fieldName);
+ BSONColumnBuilder(StringData fieldName, BufBuilder&& builder);
BSONColumnBuilder(BSONColumnBuilder&&) = delete;
/**
@@ -84,6 +85,12 @@ public:
*/
BSONBinData finalize();
+ /**
+ * Detaches the buffer associated with this BSONColumnBuilder. Allows the memory to be reused
+ * for building another BSONColumn.
+ */
+ BufBuilder detach();
+
private:
BSONElement _previous() const;
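An illustrative sketch (not part of the patch) of how the BufBuilder&& constructor and detach() added above let one allocation be reused across columns, which is what makes it cheap for the bucket compressor to build a BSONColumn per data field. The buildCompressedData() helper and its input container are hypothetical; append(), finalize(), fieldName() and detach() are the BSONColumnBuilder members visible in this commit.

#include <utility>
#include <vector>

#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/util/bsoncolumnbuilder.h"

namespace mongo {
// Compress several columns of BSONElements into one 'data' object, reusing a single buffer.
BSONObj buildCompressedData(
    const std::vector<std::pair<StringData, std::vector<BSONElement>>>& columns) {
    BSONObjBuilder dataBuilder;
    BufBuilder reusable;  // allocated once, then handed from column to column
    for (const auto& [name, elements] : columns) {
        BSONColumnBuilder column(name, std::move(reusable));
        for (const auto& elem : elements) {
            column.append(elem);  // the element's value is encoded; its field name is ignored
        }
        BSONBinData binData = column.finalize();  // BinData subtype 7 view into the buffer
        dataBuilder.append(column.fieldName(), binData);  // copies the bytes out
        reusable = column.detach();  // reclaim the underlying memory for the next column
    }
    return dataBuilder.obj();
}
}  // namespace mongo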
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index 712dd0d573e..3d72dd2dbd7 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -363,6 +363,7 @@ env.Library(
'$BUILD_DIR/mongo/db/storage/storage_engine_common',
"$BUILD_DIR/mongo/db/storage/two_phase_index_build_knobs_idl",
'$BUILD_DIR/mongo/db/timeseries/bucket_catalog',
+ '$BUILD_DIR/mongo/db/timeseries/bucket_compression',
'$BUILD_DIR/mongo/db/timeseries/catalog_helper',
'$BUILD_DIR/mongo/db/timeseries/timeseries_commands_conversion_helper',
'$BUILD_DIR/mongo/db/timeseries/timeseries_index_schema_conversion_functions',
diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp
index d2b41047053..ceafad09b8a 100644
--- a/src/mongo/db/commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands.cpp
@@ -27,6 +27,8 @@
* it in the license file.
*/
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault
+
#include "mongo/base/checked_cast.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/mutable/document.h"
@@ -65,12 +67,14 @@
#include "mongo/db/storage/duplicate_key_error_info.h"
#include "mongo/db/storage/storage_parameters_gen.h"
#include "mongo/db/timeseries/bucket_catalog.h"
+#include "mongo/db/timeseries/bucket_compression.h"
#include "mongo/db/timeseries/timeseries_constants.h"
#include "mongo/db/timeseries/timeseries_index_schema_conversion_functions.h"
#include "mongo/db/timeseries/timeseries_update_delete_util.h"
#include "mongo/db/transaction_participant.h"
#include "mongo/db/views/view_catalog.h"
#include "mongo/db/write_concern.h"
+#include "mongo/logv2/log.h"
#include "mongo/logv2/redaction.h"
#include "mongo/s/stale_exception.h"
#include "mongo/util/fail_point.h"
@@ -209,6 +213,20 @@ write_ops::UpdateOpEntry makeTimeseriesUpdateOpEntry(
}
/**
+ * Returns an update op entry that applies a compression transform to an existing bucket.
+ */
+write_ops::UpdateOpEntry makeTimeseriesCompressionOpEntry(
+ OperationContext* opCtx,
+ const OID& bucketId,
+ write_ops::UpdateModification::TransformFunc compressionFunc) {
+ write_ops::UpdateModification u(std::move(compressionFunc));
+ write_ops::UpdateOpEntry update(BSON("_id" << bucketId), std::move(u));
+ invariant(!update.getMulti(), bucketId.toString());
+ invariant(!update.getUpsert(), bucketId.toString());
+ return update;
+}
+
+/**
* Returns the document for inserting a new bucket.
*/
BSONObj makeTimeseriesInsertDocument(std::shared_ptr<BucketCatalog::WriteBatch> batch,
@@ -611,6 +629,27 @@ public:
return op;
}
+ write_ops::UpdateCommandRequest _makeTimeseriesCompressionOp(
+ OperationContext* opCtx,
+ const OID& bucketId,
+ write_ops::UpdateModification::TransformFunc compressionFunc) const {
+ write_ops::UpdateCommandRequest op(
+ makeTimeseriesBucketsNamespace(ns()),
+ {makeTimeseriesCompressionOpEntry(opCtx, bucketId, std::move(compressionFunc))});
+
+ write_ops::WriteCommandRequestBase base;
+ // The schema validation configured in the bucket collection is intended for direct
+ // operations by end users and is not applicable here.
+ base.setBypassDocumentValidation(true);
+
+ // Timeseries compression operation is not a user operation and should not use a
+ // statement id from any user op. Set to Uninitialized to bypass.
+ base.setStmtIds(std::vector<StmtId>{kUninitializedStmtId});
+
+ op.setWriteCommandRequestBase(std::move(base));
+ return op;
+ }
+
StatusWith<SingleWriteResult> _performTimeseriesInsert(
OperationContext* opCtx,
std::shared_ptr<BucketCatalog::WriteBatch> batch,
@@ -641,6 +680,40 @@ public:
OperationSource::kTimeseriesInsert));
}
+ StatusWith<SingleWriteResult> _performTimeseriesBucketCompression(
+ OperationContext* opCtx, const BucketCatalog::ClosedBucket& closedBucket) const {
+ if (!feature_flags::gTimeseriesBucketCompression.isEnabled(
+ serverGlobalParams.featureCompatibility)) {
+ return SingleWriteResult();
+ }
+
+ // Buckets with just a single measurement are not worth compressing.
+ if (closedBucket.numMeasurements <= 1) {
+ return SingleWriteResult();
+ }
+
+ auto bucketCompressionFunc =
+ [&closedBucket](const BSONObj& bucketDoc) -> boost::optional<BSONObj> {
+ auto compressed = timeseries::compressBucket(bucketDoc, closedBucket.timeField);
+ // If compressed object size is larger than uncompressed, skip compression update.
+ if (compressed && compressed->objsize() > bucketDoc.objsize()) {
+ LOGV2_DEBUG(5857802,
+ 1,
+ "Skipping time-series bucket compression, compressed object is "
+ "larger than original",
+ "originalSize"_attr = bucketDoc.objsize(),
+ "compressedSize"_attr = compressed->objsize());
+ return boost::none;
+ }
+ return compressed;
+ };
+
+ return _getTimeseriesSingleWriteResult(write_ops_exec::performUpdates(
+ opCtx,
+ _makeTimeseriesCompressionOp(opCtx, closedBucket.bucketId, bucketCompressionFunc),
+ OperationSource::kStandard));
+ }
+
void _commitTimeseriesBucket(OperationContext* opCtx,
std::shared_ptr<BucketCatalog::WriteBatch> batch,
size_t start,
@@ -705,8 +778,19 @@ public:
getOpTimeAndElectionId(opCtx, opTime, electionId);
- bucketCatalog.finish(batch, BucketCatalog::CommitInfo{*opTime, *electionId});
+ auto closedBucket =
+ bucketCatalog.finish(batch, BucketCatalog::CommitInfo{*opTime, *electionId});
+
batchGuard.dismiss();
+
+ if (closedBucket) {
+ // If this write closed a bucket, compress the bucket
+ auto result = _performTimeseriesBucketCompression(opCtx, *closedBucket);
+ if (auto error = generateError(opCtx, result, start + index, errors->size())) {
+ errors->push_back(*error);
+ return;
+ }
+ }
}
bool _commitTimeseriesBucketsAtomically(OperationContext* opCtx,
@@ -769,21 +853,38 @@ public:
getOpTimeAndElectionId(opCtx, opTime, electionId);
+ bool compressClosedBuckets = true;
for (auto batch : batchesToCommit) {
- bucketCatalog.finish(batch, BucketCatalog::CommitInfo{*opTime, *electionId});
+ auto closedBucket =
+ bucketCatalog.finish(batch, BucketCatalog::CommitInfo{*opTime, *electionId});
batch.get().reset();
+
+ if (!closedBucket || !compressClosedBuckets) {
+ continue;
+ }
+
+ // If this write closed a bucket, compress the bucket
+ auto ret = _performTimeseriesBucketCompression(opCtx, *closedBucket);
+ if (!ret.isOK()) {
+ // Don't try to compress any other buckets if we fail. We're not allowed to do
+ // more write operations.
+ compressClosedBuckets = false;
+ }
}
return true;
}
- std::tuple<TimeseriesBatches, TimeseriesStmtIds, size_t> _insertIntoBucketCatalog(
- OperationContext* opCtx,
- size_t start,
- size_t numDocs,
- const std::vector<size_t>& indices,
- std::vector<BSONObj>* errors,
- bool* containsRetry) const {
+ std::tuple<TimeseriesBatches,
+ TimeseriesStmtIds,
+ size_t /* numInserted */,
+ bool /* canContinue */>
+ _insertIntoBucketCatalog(OperationContext* opCtx,
+ size_t start,
+ size_t numDocs,
+ const std::vector<size_t>& indices,
+ std::vector<BSONObj>* errors,
+ bool* containsRetry) const {
auto& bucketCatalog = BucketCatalog::get(opCtx);
auto bucketsNs = makeTimeseriesBucketsNamespace(ns());
@@ -800,6 +901,7 @@ public:
TimeseriesBatches batches;
TimeseriesStmtIds stmtIds;
+ bool canContinue = true;
auto insert = [&](size_t index) {
invariant(start + index < request().getDocuments().size());
@@ -828,13 +930,32 @@ public:
errors->push_back(*error);
return false;
} else {
- const auto& batch = result.getValue();
+ const auto& batch = result.getValue().batch;
batches.emplace_back(batch, index);
if (isTimeseriesWriteRetryable(opCtx)) {
stmtIds[batch->bucket()].push_back(stmtId);
}
}
+ // If this insert closed any buckets, rewrite their data fields as compressed columns. If we
+ // cannot perform write operations at this point, the buckets will be left uncompressed.
+ for (const auto& closedBucket : result.getValue().closedBuckets) {
+ if (!canContinue) {
+ break;
+ }
+
+ // If this write closed a bucket, compress the bucket
+ auto ret = _performTimeseriesBucketCompression(opCtx, closedBucket);
+ if (auto error = generateError(opCtx, ret, start + index, errors->size())) {
+ // Bucket compression can only fail when we may not try to perform any other
+ // write operation, i.e. when handleError() inside write_ops_exec.cpp returns
+ // false.
+ errors->push_back(*error);
+ canContinue = false;
+ return false;
+ }
+ }
+
return true;
};
@@ -843,12 +964,15 @@ public:
} else {
for (size_t i = 0; i < numDocs; i++) {
if (!insert(i) && request().getOrdered()) {
- return {std::move(batches), std::move(stmtIds), i};
+ return {std::move(batches), std::move(stmtIds), i, canContinue};
}
}
}
- return {std::move(batches), std::move(stmtIds), request().getDocuments().size()};
+ return {std::move(batches),
+ std::move(stmtIds),
+ request().getDocuments().size(),
+ canContinue};
}
void _getTimeseriesBatchResults(OperationContext* opCtx,
@@ -889,8 +1013,13 @@ public:
boost::optional<repl::OpTime>* opTime,
boost::optional<OID>* electionId,
bool* containsRetry) const {
- auto [batches, stmtIds, numInserted] = _insertIntoBucketCatalog(
+ auto [batches, stmtIds, numInserted, canContinue] = _insertIntoBucketCatalog(
opCtx, 0, request().getDocuments().size(), {}, errors, containsRetry);
+ if (!canContinue) {
+ // If we are not allowed to continue with any write operation, return true here to
+ // prevent the ordered inserts from being retried one by one.
+ return true;
+ }
hangTimeseriesInsertBeforeCommit.pauseWhileSet();
@@ -942,13 +1071,17 @@ public:
boost::optional<repl::OpTime>* opTime,
boost::optional<OID>* electionId,
bool* containsRetry) const {
- auto [batches, bucketStmtIds, _] =
+ auto [batches, bucketStmtIds, _, canContinue] =
_insertIntoBucketCatalog(opCtx, start, numDocs, indices, errors, containsRetry);
hangTimeseriesInsertBeforeCommit.pauseWhileSet();
std::vector<size_t> docsToRetry;
+ if (!canContinue) {
+ return docsToRetry;
+ }
+
for (auto& [batch, index] : batches) {
if (batch->claimCommitRights()) {
auto stmtIds = isTimeseriesWriteRetryable(opCtx)
diff --git a/src/mongo/db/exec/SConscript b/src/mongo/db/exec/SConscript
index 963d12c5372..88655a990db 100644
--- a/src/mongo/db/exec/SConscript
+++ b/src/mongo/db/exec/SConscript
@@ -151,7 +151,7 @@ env.CppUnitTest(
],
LIBDEPS=[
"$BUILD_DIR/mongo/base",
- "$BUILD_DIR/mongo/bson/util/bson_column",
+ "$BUILD_DIR/mongo/bson/util/bson_column",
"$BUILD_DIR/mongo/db/auth/authmocks",
"$BUILD_DIR/mongo/db/query/collation/collator_factory_mock",
"$BUILD_DIR/mongo/db/query/collation/collator_interface_mock",
@@ -159,6 +159,7 @@ env.CppUnitTest(
"$BUILD_DIR/mongo/db/query_exec",
"$BUILD_DIR/mongo/db/record_id_helpers",
"$BUILD_DIR/mongo/db/service_context_d_test_fixture",
+ "$BUILD_DIR/mongo/db/timeseries/bucket_compression",
"$BUILD_DIR/mongo/dbtests/mocklib",
"$BUILD_DIR/mongo/util/clock_source_mock",
"document_value/document_value",
diff --git a/src/mongo/db/exec/bucket_unpacker_test.cpp b/src/mongo/db/exec/bucket_unpacker_test.cpp
index fcb285dc94b..85e2bfb45d5 100644
--- a/src/mongo/db/exec/bucket_unpacker_test.cpp
+++ b/src/mongo/db/exec/bucket_unpacker_test.cpp
@@ -30,9 +30,9 @@
#include "mongo/platform/basic.h"
#include "mongo/bson/json.h"
-#include "mongo/bson/util/bsoncolumnbuilder.h"
#include "mongo/db/exec/bucket_unpacker.h"
#include "mongo/db/exec/document_value/document_value_test_util.h"
+#include "mongo/db/timeseries/bucket_compression.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -102,65 +102,6 @@ public:
BSONObj obj = root.obj();
return {obj, "time"_sd};
}
-
- // Simple bucket compressor. Does not handle data fields out of order and does not sort fields
- // on time.
- // TODO (SERVER-58578): Replace with real bucket compressor
- BSONObj compress(BSONObj uncompressed, StringData timeField) {
- // Rewrite data fields as columns.
- BSONObjBuilder builder;
- for (auto& elem : uncompressed) {
- if (elem.fieldNameStringData() == "control") {
- BSONObjBuilder control(builder.subobjStart("control"));
-
- // Set right version, leave other control fields unchanged
- for (const auto& controlField : elem.Obj()) {
- if (controlField.fieldNameStringData() == "version") {
- control.append("version", 2);
- } else {
- control.append(controlField);
- }
- }
-
- continue;
- }
- if (elem.fieldNameStringData() != "data") {
- // Non-data fields can be unchanged.
- builder.append(elem);
- continue;
- }
-
- BSONObjBuilder dataBuilder = builder.subobjStart("data");
- std::list<BSONColumnBuilder> columnBuilders;
- size_t numTimeFields = 0;
- for (auto& column : elem.Obj()) {
- // Compress all data fields
- columnBuilders.emplace_back(column.fieldNameStringData());
- auto& columnBuilder = columnBuilders.back();
-
- for (auto& measurement : column.Obj()) {
- int index = std::atoi(measurement.fieldName());
- for (int i = columnBuilder.size(); i < index; ++i) {
- columnBuilder.skip();
- }
- columnBuilder.append(measurement);
- }
- if (columnBuilder.fieldName() == timeField) {
- numTimeFields = columnBuilder.size();
- }
- }
-
- for (auto& builder : columnBuilders) {
- for (size_t i = builder.size(); i < numTimeFields; ++i) {
- builder.skip();
- }
-
- BSONBinData binData = builder.finalize();
- dataBuilder.append(builder.fieldName(), binData);
- }
- }
- return builder.obj();
- }
};
TEST_F(BucketUnpackerTest, UnpackBasicIncludeAllMeasurementFields) {
@@ -212,7 +153,7 @@ TEST_F(BucketUnpackerTest, ExcludeASingleField) {
};
test(bucket);
- test(compress(bucket, "time"_sd));
+ test(*timeseries::compressBucket(bucket, "time"_sd));
}
TEST_F(BucketUnpackerTest, EmptyIncludeGetsEmptyMeasurements) {
@@ -238,7 +179,7 @@ TEST_F(BucketUnpackerTest, EmptyIncludeGetsEmptyMeasurements) {
};
test(bucket);
- test(compress(bucket, "time"_sd));
+ test(*timeseries::compressBucket(bucket, "time"_sd));
}
TEST_F(BucketUnpackerTest, EmptyExcludeMaterializesAllFields) {
@@ -266,7 +207,7 @@ TEST_F(BucketUnpackerTest, EmptyExcludeMaterializesAllFields) {
};
test(bucket);
- test(compress(bucket, "time"_sd));
+ test(*timeseries::compressBucket(bucket, "time"_sd));
}
TEST_F(BucketUnpackerTest, SparseColumnsWhereOneColumnIsExhaustedBeforeTheOther) {
@@ -292,7 +233,7 @@ TEST_F(BucketUnpackerTest, SparseColumnsWhereOneColumnIsExhaustedBeforeTheOther)
};
test(bucket);
- test(compress(bucket, "time"_sd));
+ test(*timeseries::compressBucket(bucket, "time"_sd));
}
TEST_F(BucketUnpackerTest, UnpackBasicIncludeWithDollarPrefix) {
@@ -321,7 +262,7 @@ TEST_F(BucketUnpackerTest, UnpackBasicIncludeWithDollarPrefix) {
};
test(bucket);
- test(compress(bucket, "time"_sd));
+ test(*timeseries::compressBucket(bucket, "time"_sd));
}
TEST_F(BucketUnpackerTest, BucketsWithMetadataOnly) {
@@ -346,7 +287,7 @@ TEST_F(BucketUnpackerTest, BucketsWithMetadataOnly) {
};
test(bucket);
- test(compress(bucket, "time"_sd));
+ test(*timeseries::compressBucket(bucket, "time"_sd));
}
TEST_F(BucketUnpackerTest, UnorderedRowKeysDoesntAffectMaterialization) {
@@ -404,7 +345,7 @@ TEST_F(BucketUnpackerTest, MissingMetaFieldDoesntMaterializeMetadata) {
};
test(bucket);
- test(compress(bucket, "time"_sd));
+ test(*timeseries::compressBucket(bucket, "time"_sd));
}
TEST_F(BucketUnpackerTest, MissingMetaFieldDoesntMaterializeMetadataUnorderedKeys) {
@@ -459,7 +400,7 @@ TEST_F(BucketUnpackerTest, ExcludedMetaFieldDoesntMaterializeMetadataWhenBucketH
};
test(bucket);
- test(compress(bucket, "time"_sd));
+ test(*timeseries::compressBucket(bucket, "time"_sd));
}
TEST_F(BucketUnpackerTest, UnpackerResetThrowsOnUndefinedMeta) {
@@ -478,7 +419,7 @@ TEST_F(BucketUnpackerTest, UnpackerResetThrowsOnUndefinedMeta) {
};
test(bucket);
- test(compress(bucket, "time"_sd));
+ test(*timeseries::compressBucket(bucket, "time"_sd));
}
TEST_F(BucketUnpackerTest, UnpackerResetThrowsOnUnexpectedMeta) {
@@ -498,7 +439,7 @@ TEST_F(BucketUnpackerTest, UnpackerResetThrowsOnUnexpectedMeta) {
};
test(bucket);
- test(compress(bucket, "time"_sd));
+ test(*timeseries::compressBucket(bucket, "time"_sd));
}
TEST_F(BucketUnpackerTest, NullMetaInBucketMaterializesAsNull) {
@@ -525,7 +466,7 @@ TEST_F(BucketUnpackerTest, NullMetaInBucketMaterializesAsNull) {
};
test(bucket);
- test(compress(bucket, "time"_sd));
+ test(*timeseries::compressBucket(bucket, "time"_sd));
}
TEST_F(BucketUnpackerTest, GetNextHandlesMissingMetaInBucket) {
@@ -557,7 +498,7 @@ TEST_F(BucketUnpackerTest, GetNextHandlesMissingMetaInBucket) {
};
test(bucket);
- test(compress(bucket, "time"_sd));
+ test(*timeseries::compressBucket(bucket, "time"_sd));
}
TEST_F(BucketUnpackerTest, EmptyDataRegionInBucketIsTolerated) {
@@ -576,7 +517,6 @@ TEST_F(BucketUnpackerTest, EmptyDataRegionInBucketIsTolerated) {
};
test(bucket);
- test(compress(bucket, "time"_sd));
}
TEST_F(BucketUnpackerTest, UnpackerResetThrowsOnEmptyBucket) {
diff --git a/src/mongo/db/ops/write_ops.cpp b/src/mongo/db/ops/write_ops.cpp
index 5c60f26fb6c..ea387ae920c 100644
--- a/src/mongo/db/ops/write_ops.cpp
+++ b/src/mongo/db/ops/write_ops.cpp
@@ -247,6 +247,9 @@ write_ops::UpdateModification write_ops::UpdateModification::parseFromOplogEntry
write_ops::UpdateModification::UpdateModification(doc_diff::Diff diff, DiffOptions options)
: _update(DeltaUpdate{std::move(diff), options}) {}
+write_ops::UpdateModification::UpdateModification(TransformFunc transform)
+ : _update(TransformUpdate{std::move(transform)}) {}
+
write_ops::UpdateModification::UpdateModification(BSONElement update) {
const auto type = update.type();
if (type == BSONType::Object) {
@@ -297,7 +300,8 @@ int write_ops::UpdateModification::objsize() const {
return size + kWriteCommandBSONArrayPerElementOverheadBytes;
},
- [](const DeltaUpdate& delta) -> int { return delta.diff.objsize(); }},
+ [](const DeltaUpdate& delta) -> int { return delta.diff.objsize(); },
+ [](const TransformUpdate& transform) -> int { return 0; }},
_update);
}
@@ -307,7 +311,8 @@ write_ops::UpdateModification::Type write_ops::UpdateModification::type() const
visit_helper::Overloaded{
[](const ClassicUpdate& classic) { return Type::kClassic; },
[](const PipelineUpdate& pipelineUpdate) { return Type::kPipeline; },
- [](const DeltaUpdate& delta) { return Type::kDelta; }},
+ [](const DeltaUpdate& delta) { return Type::kDelta; },
+ [](const TransformUpdate& transform) { return Type::kTransform; }},
_update);
}
@@ -328,7 +333,8 @@ void write_ops::UpdateModification::serializeToBSON(StringData fieldName,
}
arrayBuilder.doneFast();
},
- [fieldName, bob](const DeltaUpdate& delta) { *bob << fieldName << delta.diff; }},
+ [fieldName, bob](const DeltaUpdate& delta) { *bob << fieldName << delta.diff; },
+ [](const TransformUpdate& transform) {}},
_update);
}
diff --git a/src/mongo/db/ops/write_ops_parsers.h b/src/mongo/db/ops/write_ops_parsers.h
index 788ae259722..4a70bfdc0da 100644
--- a/src/mongo/db/ops/write_ops_parsers.h
+++ b/src/mongo/db/ops/write_ops_parsers.h
@@ -79,7 +79,8 @@ repl::OpTime opTimeParser(BSONElement elem);
class UpdateModification {
public:
- enum class Type { kClassic, kPipeline, kDelta };
+ enum class Type { kClassic, kPipeline, kDelta, kTransform };
+ using TransformFunc = std::function<boost::optional<BSONObj>(const BSONObj&)>;
/**
* Used to indicate that a certain type of update is being passed to the constructor.
@@ -105,6 +106,8 @@ public:
UpdateModification(BSONElement update);
UpdateModification(std::vector<BSONObj> pipeline);
UpdateModification(doc_diff::Diff, DiffOptions);
+ // Creates a transform-style update. The transform function MUST preserve the _id element.
+ UpdateModification(TransformFunc transform);
// This constructor exists only to provide a fast-path for constructing classic-style updates.
UpdateModification(const BSONObj& update, ClassicTag);
@@ -141,6 +144,11 @@ public:
return stdx::get<DeltaUpdate>(_update).diff;
}
+ const TransformFunc& getTransform() const {
+ invariant(type() == Type::kTransform);
+ return stdx::get<TransformUpdate>(_update).transform;
+ }
+
bool mustCheckExistenceForInsertOperations() const {
invariant(type() == Type::kDelta);
return stdx::get<DeltaUpdate>(_update).options.mustCheckExistenceForInsertOperations;
@@ -149,19 +157,19 @@ public:
std::string toString() const {
StringBuilder sb;
- stdx::visit(visit_helper::Overloaded{[&sb](const ClassicUpdate& classic) {
- sb << "{type: Classic, update: " << classic.bson
- << "}";
- },
- [&sb](const PipelineUpdate& pipeline) {
- sb << "{type: Pipeline, update: "
- << Value(pipeline).toString() << "}";
- },
- [&sb](const DeltaUpdate& delta) {
- sb << "{type: Delta, update: " << delta.diff
- << "}";
- }},
- _update);
+ stdx::visit(
+ visit_helper::Overloaded{
+ [&sb](const ClassicUpdate& classic) {
+ sb << "{type: Classic, update: " << classic.bson << "}";
+ },
+ [&sb](const PipelineUpdate& pipeline) {
+ sb << "{type: Pipeline, update: " << Value(pipeline).toString() << "}";
+ },
+ [&sb](const DeltaUpdate& delta) {
+ sb << "{type: Delta, update: " << delta.diff << "}";
+ },
+ [&sb](const TransformUpdate& transform) { sb << "{type: Transform}"; }},
+ _update);
return sb.str();
}
@@ -176,7 +184,10 @@ private:
doc_diff::Diff diff;
DiffOptions options;
};
- stdx::variant<ClassicUpdate, PipelineUpdate, DeltaUpdate> _update;
+ struct TransformUpdate {
+ TransformFunc transform;
+ };
+ stdx::variant<ClassicUpdate, PipelineUpdate, DeltaUpdate, TransformUpdate> _update;
};
} // namespace write_ops
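A minimal sketch (not from the patch) of how the new kTransform variant is intended to be used, mirroring the bucket-compression call site in write_commands.cpp: the update is expressed as a function from the current document to its replacement, and returning boost::none makes the update a no-op. makeCompressionUpdate() is a hypothetical wrapper; UpdateModification, UpdateOpEntry and timeseries::compressBucket() are the APIs touched by this commit.

#include <boost/optional.hpp>

#include "mongo/db/ops/write_ops.h"
#include "mongo/db/timeseries/bucket_compression.h"

namespace mongo {
write_ops::UpdateOpEntry makeCompressionUpdate(const OID& bucketId, StringData timeField) {
    // The transform receives the current bucket document and must preserve its _id element.
    write_ops::UpdateModification::TransformFunc transform =
        [timeField = timeField.toString()](const BSONObj& bucketDoc) -> boost::optional<BSONObj> {
        auto compressed = timeseries::compressBucket(bucketDoc, timeField);
        if (compressed && compressed->objsize() > bucketDoc.objsize()) {
            return boost::none;  // compression would not help; leave the document untouched
        }
        return compressed;
    };

    write_ops::UpdateModification u(std::move(transform));
    return write_ops::UpdateOpEntry(BSON("_id" << bucketId), std::move(u));
}
}  // namespace mongo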
diff --git a/src/mongo/db/timeseries/SConscript b/src/mongo/db/timeseries/SConscript
index 18ae4a3885b..505e7ba6cec 100644
--- a/src/mongo/db/timeseries/SConscript
+++ b/src/mongo/db/timeseries/SConscript
@@ -34,6 +34,19 @@ env.Library(
)
env.Library(
+ target='bucket_compression',
+ source=[
+ 'bucket_compression.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/bson/util/bson_column',
+ ]
+)
+
+env.Library(
target='timeseries_index_schema_conversion_functions',
source=[
'timeseries_index_schema_conversion_functions.cpp',
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index 2780942ce78..3b5da81a3bb 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -177,7 +177,7 @@ BSONObj BucketCatalog::getMetadata(Bucket* ptr) const {
return bucket->_metadata.toBSON();
}
-StatusWith<std::shared_ptr<BucketCatalog::WriteBatch>> BucketCatalog::insert(
+StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert(
OperationContext* opCtx,
const NamespaceString& ns,
const StringData::ComparatorInterface* comparator,
@@ -204,7 +204,8 @@ StatusWith<std::shared_ptr<BucketCatalog::WriteBatch>> BucketCatalog::insert(
auto time = timeElem.Date();
- BucketAccess bucket{this, key, options, stats.get(), time};
+ ClosedBuckets closedBuckets;
+ BucketAccess bucket{this, key, options, stats.get(), &closedBuckets, time};
invariant(bucket);
NewFieldNames newFieldNamesToBeInserted;
@@ -239,7 +240,8 @@ StatusWith<std::shared_ptr<BucketCatalog::WriteBatch>> BucketCatalog::insert(
};
if (!bucket->_ns.isEmpty() && isBucketFull(&bucket)) {
- bucket.rollover(isBucketFull);
+ bucket.rollover(isBucketFull, &closedBuckets);
+
bucket->_calculateBucketFieldsAndSizeChange(doc,
options.getMetaField(),
&newFieldNamesToBeInserted,
@@ -273,7 +275,7 @@ StatusWith<std::shared_ptr<BucketCatalog::WriteBatch>> BucketCatalog::insert(
}
_memoryUsage.fetchAndAdd(bucket->_memoryUsage);
- return batch;
+ return InsertResult{batch, closedBuckets};
}
bool BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) {
@@ -302,10 +304,13 @@ bool BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) {
return true;
}
-void BucketCatalog::finish(std::shared_ptr<WriteBatch> batch, const CommitInfo& info) {
+boost::optional<BucketCatalog::ClosedBucket> BucketCatalog::finish(
+ std::shared_ptr<WriteBatch> batch, const CommitInfo& info) {
invariant(!batch->finished());
invariant(!batch->active());
+ boost::optional<ClosedBucket> closedBucket;
+
Bucket* ptr(batch->bucket());
batch->_finish(info);
@@ -346,6 +351,9 @@ void BucketCatalog::finish(std::shared_ptr<WriteBatch> batch, const CommitInfo&
bucket.release();
auto lk = _lockExclusive();
+ closedBucket =
+ ClosedBucket{ptr->_id, ptr->getTimeField().toString(), ptr->numMeasurements()};
+
// Only remove from _allBuckets and _idleBuckets. If it was marked full, we know that
// happened in BucketAccess::rollover, and that there is already a new open bucket for
// this metadata.
@@ -359,6 +367,7 @@ void BucketCatalog::finish(std::shared_ptr<WriteBatch> batch, const CommitInfo&
_markBucketIdle(bucket);
}
}
+ return closedBucket;
}
void BucketCatalog::abort(std::shared_ptr<WriteBatch> batch,
@@ -580,18 +589,25 @@ void BucketCatalog::_verifyBucketIsUnused(Bucket* bucket) const {
stdx::lock_guard<Mutex> lk{bucket->_mutex};
}
-void BucketCatalog::_expireIdleBuckets(ExecutionStats* stats) {
+void BucketCatalog::_expireIdleBuckets(ExecutionStats* stats,
+ std::vector<BucketCatalog::ClosedBucket>* closedBuckets) {
// Must hold an exclusive lock on _bucketMutex from outside.
stdx::lock_guard lk{_idleMutex};
- // As long as we still need space and have entries, close idle buckets.
+ // As long as we still need space and have entries and remaining attempts, close idle buckets.
+ int32_t numClosed = 0;
while (!_idleBuckets.empty() &&
_memoryUsage.load() >
- static_cast<std::uint64_t>(gTimeseriesIdleBucketExpiryMemoryUsageThreshold)) {
+ static_cast<std::uint64_t>(gTimeseriesIdleBucketExpiryMemoryUsageThreshold) &&
+ numClosed <= gTimeseriesIdleBucketExpiryMaxCountPerAttempt) {
Bucket* bucket = _idleBuckets.back();
_verifyBucketIsUnused(bucket);
+ ClosedBucket closed{
+ bucket->id(), bucket->getTimeField().toString(), bucket->numMeasurements()};
if (_removeBucket(bucket, true /* expiringBuckets */)) {
stats->numBucketsClosedDueToMemoryThreshold.fetchAndAddRelaxed(1);
+ closedBuckets->push_back(closed);
+ ++numClosed;
}
}
}
@@ -605,8 +621,9 @@ BucketCatalog::Bucket* BucketCatalog::_allocateBucket(const BucketKey& key,
const Date_t& time,
const TimeseriesOptions& options,
ExecutionStats* stats,
+ ClosedBuckets* closedBuckets,
bool openedDuetoMetadata) {
- _expireIdleBuckets(stats);
+ _expireIdleBuckets(stats, closedBuckets);
auto [it, inserted] = _allBuckets.insert(std::make_unique<Bucket>());
Bucket* bucket = it->get();
@@ -652,6 +669,7 @@ void BucketCatalog::_setIdTimestamp(Bucket* bucket,
auto roundedTime = timeseries::roundTimestampToGranularity(time, options.getGranularity());
auto const roundedSeconds = durationCount<Seconds>(roundedTime.toDurationSinceEpoch());
bucket->_id.setTimestamp(roundedSeconds);
+ bucket->_timeField = options.getTimeField().toString();
// Make sure we set the control.min time field to match the rounded _id timestamp.
auto controlDoc = buildControlMinTimestampDoc(options.getTimeField(), roundedTime);
@@ -760,6 +778,10 @@ const OID& BucketCatalog::Bucket::id() const {
return _id;
}
+StringData BucketCatalog::Bucket::getTimeField() {
+ return _timeField;
+}
+
void BucketCatalog::Bucket::_calculateBucketFieldsAndSizeChange(
const BSONObj& doc,
boost::optional<StringData> metaField,
@@ -806,6 +828,10 @@ bool BucketCatalog::Bucket::allCommitted() const {
return _batches.empty() && !_preparedBatch;
}
+uint32_t BucketCatalog::Bucket::numMeasurements() const {
+ return _numMeasurements;
+}
+
std::shared_ptr<BucketCatalog::WriteBatch> BucketCatalog::Bucket::_activeBatch(
OperationId opId, const std::shared_ptr<ExecutionStats>& stats) {
auto it = _batches.find(opId);
@@ -819,6 +845,7 @@ BucketCatalog::BucketAccess::BucketAccess(BucketCatalog* catalog,
BucketKey& key,
const TimeseriesOptions& options,
ExecutionStats* stats,
+ ClosedBuckets* closedBuckets,
const Date_t& time)
: _catalog(catalog), _key(&key), _options(&options), _stats(stats), _time(&time) {
@@ -875,7 +902,7 @@ BucketCatalog::BucketAccess::BucketAccess(BucketCatalog* catalog,
: key.withCopiedMetadata(BSONObj());
hashedKey.key = &originalBucketKey;
auto lk = _catalog->_lockExclusive();
- _findOrCreateOpenBucketThenLock(hashedNormalizedKey, hashedKey);
+ _findOrCreateOpenBucketThenLock(hashedNormalizedKey, hashedKey, closedBuckets);
}
BucketCatalog::BucketAccess::BucketAccess(BucketCatalog* catalog,
@@ -977,11 +1004,13 @@ BucketCatalog::BucketState BucketCatalog::BucketAccess::_confirmStateForAcquired
}
void BucketCatalog::BucketAccess::_findOrCreateOpenBucketThenLock(
- const HashedBucketKey& normalizedKey, const HashedBucketKey& nonNormalizedKey) {
+ const HashedBucketKey& normalizedKey,
+ const HashedBucketKey& nonNormalizedKey,
+ ClosedBuckets* closedBuckets) {
auto it = _catalog->_openBuckets.find(normalizedKey);
if (it == _catalog->_openBuckets.end()) {
// No open bucket for this metadata.
- _create(normalizedKey, nonNormalizedKey);
+ _create(normalizedKey, nonNormalizedKey, closedBuckets);
return;
}
@@ -995,7 +1024,7 @@ void BucketCatalog::BucketAccess::_findOrCreateOpenBucketThenLock(
}
_catalog->_abort(_guard, _bucket, nullptr, boost::none);
- _create(normalizedKey, nonNormalizedKey);
+ _create(normalizedKey, nonNormalizedKey, closedBuckets);
}
void BucketCatalog::BucketAccess::_acquire() {
@@ -1005,10 +1034,11 @@ void BucketCatalog::BucketAccess::_acquire() {
void BucketCatalog::BucketAccess::_create(const HashedBucketKey& normalizedKey,
const HashedBucketKey& nonNormalizedKey,
+ ClosedBuckets* closedBuckets,
bool openedDuetoMetadata) {
invariant(_options);
- _bucket =
- _catalog->_allocateBucket(normalizedKey, *_time, *_options, _stats, openedDuetoMetadata);
+ _bucket = _catalog->_allocateBucket(
+ normalizedKey, *_time, *_options, _stats, closedBuckets, openedDuetoMetadata);
_catalog->_openBuckets[nonNormalizedKey] = _bucket;
_bucket->_nonNormalizedKeyMetadatas.push_back(nonNormalizedKey.key->metadata.toBSON());
_acquire();
@@ -1037,7 +1067,8 @@ BucketCatalog::BucketAccess::operator BucketCatalog::Bucket*() const {
return _bucket;
}
-void BucketCatalog::BucketAccess::rollover(const std::function<bool(BucketAccess*)>& isBucketFull) {
+void BucketCatalog::BucketAccess::rollover(const std::function<bool(BucketAccess*)>& isBucketFull,
+ ClosedBuckets* closedBuckets) {
invariant(isLocked());
invariant(_key);
invariant(_time);
@@ -1054,7 +1085,7 @@ void BucketCatalog::BucketAccess::rollover(const std::function<bool(BucketAccess
auto hashedKey = BucketHasher{}.hashed_key(prevBucketKey);
auto lk = _catalog->_lockExclusive();
- _findOrCreateOpenBucketThenLock(hashedNormalizedKey, hashedKey);
+ _findOrCreateOpenBucketThenLock(hashedNormalizedKey, hashedKey, closedBuckets);
// Recheck if still full now that we've reacquired the bucket.
bool sameBucket =
@@ -1064,6 +1095,9 @@ void BucketCatalog::BucketAccess::rollover(const std::function<bool(BucketAccess
if (_bucket->allCommitted()) {
// The bucket does not contain any measurements that are yet to be committed, so we can
// remove it now. Otherwise, we must keep the bucket around until it is committed.
+ closedBuckets->push_back(ClosedBucket{
+ _bucket->id(), _bucket->getTimeField().toString(), _bucket->numMeasurements()});
+
oldBucket = _bucket;
release();
bool removed = _catalog->_removeBucket(oldBucket, false /* expiringBuckets */);
@@ -1077,7 +1111,7 @@ void BucketCatalog::BucketAccess::rollover(const std::function<bool(BucketAccess
release();
}
- _create(hashedNormalizedKey, hashedKey, false /* openedDueToMetadata */);
+ _create(hashedNormalizedKey, hashedKey, closedBuckets, false /* openedDueToMetadata */);
}
}
diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h
index 44c59a9ede1..2c2b1994443 100644
--- a/src/mongo/db/timeseries/bucket_catalog.h
+++ b/src/mongo/db/timeseries/bucket_catalog.h
@@ -66,6 +66,16 @@ public:
};
/**
+ * Information about a Bucket that was closed while performing an operation on this BucketCatalog.
+ */
+ struct ClosedBucket {
+ OID bucketId;
+ std::string timeField;
+ uint32_t numMeasurements;
+ };
+ using ClosedBuckets = std::vector<ClosedBucket>;
+
+ /**
* The basic unit of work for a bucket. Each insert will return a shared_ptr to a WriteBatch.
* When a writer is finished with all their insertions, they should then take steps to ensure
* each batch they wrote into is committed. To ensure a batch is committed, a writer should
@@ -164,6 +174,15 @@ public:
SharedPromise<CommitInfo> _promise;
};
+ /**
+ * Return type for the insert function.
+ * See comment above insert() for more information.
+ */
+ struct InsertResult {
+ std::shared_ptr<WriteBatch> batch;
+ ClosedBuckets closedBuckets;
+ };
+
static BucketCatalog& get(ServiceContext* svcCtx);
static BucketCatalog& get(OperationContext* opCtx);
@@ -183,17 +202,16 @@ public:
BSONObj getMetadata(Bucket* bucket) const;
/**
- * Returns the WriteBatch into which the document was inserted. Any caller who receives the same
- * batch may commit or abort the batch after claiming commit rights. See WriteBatch for more
- * details.
+ * Returns the WriteBatch into which the document was inserted and information about any buckets
+ * that were closed in the process. Any caller who receives the same batch may commit or abort the
+ * batch after claiming commit rights. See WriteBatch for more details.
*/
- StatusWith<std::shared_ptr<WriteBatch>> insert(
- OperationContext* opCtx,
- const NamespaceString& ns,
- const StringData::ComparatorInterface* comparator,
- const TimeseriesOptions& options,
- const BSONObj& doc,
- CombineWithInsertsFromOtherClients combine);
+ StatusWith<InsertResult> insert(OperationContext* opCtx,
+ const NamespaceString& ns,
+ const StringData::ComparatorInterface* comparator,
+ const TimeseriesOptions& options,
+ const BSONObj& doc,
+ CombineWithInsertsFromOtherClients combine);
/**
* Prepares a batch for commit, transitioning it to an inactive state. Caller must already have
@@ -205,8 +223,10 @@ public:
/**
* Records the result of a batch commit. Caller must already have commit rights on batch, and
* batch must have been previously prepared.
+ *
+ * Returns information about the bucket if it was closed as a result of this commit.
*/
- void finish(std::shared_ptr<WriteBatch> batch, const CommitInfo& info);
+ boost::optional<ClosedBucket> finish(std::shared_ptr<WriteBatch> batch, const CommitInfo& info);
/**
* Aborts the given write batch and any other outstanding batches on the same bucket. Caller
@@ -342,10 +362,20 @@ public:
const OID& id() const;
/**
+ * Returns the time field for the underlying bucket.
+ */
+ StringData getTimeField();
+
+ /**
* Returns whether all measurements have been committed.
*/
bool allCommitted() const;
+ /**
+ * Returns the total number of measurements in the bucket.
+ */
+ uint32_t numMeasurements() const;
+
private:
/**
* Determines the effect of adding 'doc' to this bucket. If adding 'doc' causes this bucket
@@ -389,6 +419,9 @@ public:
// Top-level field names of the measurements that have been inserted into the bucket.
StringSet _fieldNames;
+ // Time field for the measurements that have been inserted into the bucket.
+ std::string _timeField;
+
// The minimum and maximum values for each field in the bucket.
timeseries::MinMax _minmax;
@@ -549,6 +582,7 @@ private:
BucketKey& key,
const TimeseriesOptions& options,
ExecutionStats* stats,
+ ClosedBuckets* closedBuckets,
const Date_t& time);
BucketAccess(BucketCatalog* catalog,
Bucket* bucket,
@@ -568,8 +602,11 @@ private:
* Parameter is a function which should check that the bucket is indeed still full after
* reacquiring the necessary locks. The first parameter will give the function access to
* this BucketAccess instance, with the bucket locked.
+ *
+ * Appends information about any bucket that gets closed to 'closedBuckets'.
*/
- void rollover(const std::function<bool(BucketAccess*)>& isBucketFull);
+ void rollover(const std::function<bool(BucketAccess*)>& isBucketFull,
+ ClosedBuckets* closedBuckets);
// Retrieve the time associated with the bucket (id)
Date_t getTime() const;
@@ -607,7 +644,8 @@ private:
// Helper to find an open bucket for the given metadata if it exists, create it if it
// doesn't, and lock it. Requires an exclusive lock on the catalog.
void _findOrCreateOpenBucketThenLock(const HashedBucketKey& normalizedKey,
- const HashedBucketKey& key);
+ const HashedBucketKey& key,
+ ClosedBuckets* closedBuckets);
// Lock _bucket.
void _acquire();
@@ -616,6 +654,7 @@ private:
// a lock on it.
void _create(const HashedBucketKey& normalizedKey,
const HashedBucketKey& key,
+ ClosedBuckets* closedBuckets,
bool openedDuetoMetadata = true);
BucketCatalog* _catalog;
@@ -675,7 +714,7 @@ private:
/**
* Expires idle buckets until the bucket catalog's memory usage is below the expiry threshold.
*/
- void _expireIdleBuckets(ExecutionStats* stats);
+ void _expireIdleBuckets(ExecutionStats* stats, ClosedBuckets* closedBuckets);
std::size_t _numberOfIdleBuckets() const;
@@ -684,6 +723,7 @@ private:
const Date_t& time,
const TimeseriesOptions& options,
ExecutionStats* stats,
+ ClosedBuckets* closedBuckets,
bool openedDuetoMetadata);
std::shared_ptr<ExecutionStats> _getExecutionStats(const NamespaceString& ns);
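A hedged sketch (not part of the patch) of how a caller is expected to consume the reworked BucketCatalog API above, following the pattern in write_commands.cpp: insert() now also reports buckets that were closed as a side effect, finish() reports the bucket it closed, and every ClosedBucket is a candidate for compression. insertMeasurement() and compressClosedBucket() are hypothetical stand-ins for the real write path and its _performTimeseriesBucketCompression() helper; error handling around commit is elided.

#include "mongo/base/status.h"
#include "mongo/db/timeseries/bucket_catalog.h"

namespace mongo {
// Hypothetical helper; the real patch issues a transform update that compresses the bucket.
void compressClosedBucket(OperationContext* opCtx, const BucketCatalog::ClosedBucket& closed);

Status insertMeasurement(OperationContext* opCtx,
                         const NamespaceString& ns,
                         const StringData::ComparatorInterface* comparator,
                         const TimeseriesOptions& options,
                         const BSONObj& doc) {
    auto& catalog = BucketCatalog::get(opCtx);
    auto swResult = catalog.insert(
        opCtx, ns, comparator, options, doc,
        BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
    if (!swResult.isOK()) {
        return swResult.getStatus();
    }

    auto& [batch, closedBuckets] = swResult.getValue();
    // Buckets closed to make room for this insert (idle expiry, rollover) can be
    // compressed right away; they will receive no further writes.
    for (const auto& closed : closedBuckets) {
        compressClosedBucket(opCtx, closed);
    }

    if (batch->claimCommitRights()) {
        catalog.prepareCommit(batch);
        // ... perform the actual insert/update of the bucket document here ...
        if (auto closed = catalog.finish(batch, {})) {
            // The batch's own bucket was closed by this commit; compress it as well.
            compressClosedBucket(opCtx, *closed);
        }
    }
    return Status::OK();
}
}  // namespace mongo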
diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp
index 8658b6807dd..02f056ff629 100644
--- a/src/mongo/db/timeseries/bucket_catalog_test.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp
@@ -168,7 +168,7 @@ void BucketCatalogTest::_insertOneAndCommit(const NamespaceString& ns,
_getTimeseriesOptions(ns),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
- auto& batch = result.getValue();
+ auto& batch = result.getValue().batch;
_commit(batch, numPreviouslyCommittedMeasurements);
}
@@ -187,7 +187,7 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucket) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
- auto batch1 = result1.getValue();
+ auto batch1 = result1.getValue().batch;
ASSERT(batch1->claimCommitRights());
ASSERT(batch1->active());
@@ -200,7 +200,7 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucket) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
- auto batch2 = result2.getValue();
+ auto batch2 = result2.getValue().batch;
ASSERT_EQ(batch1, batch2);
ASSERT(!batch2->claimCommitRights());
@@ -233,7 +233,8 @@ TEST_F(BucketCatalogTest, GetMetadataReturnsEmptyDocOnMissingBucket) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
- .getValue();
+ .getValue()
+ .batch;
ASSERT(batch->claimCommitRights());
auto bucket = batch->bucket();
_bucketCatalog->abort(batch);
@@ -264,20 +265,21 @@ TEST_F(BucketCatalogTest, InsertIntoDifferentBuckets) {
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
// Inserts should all be into three distinct buckets (and therefore batches).
- ASSERT_NE(result1.getValue(), result2.getValue());
- ASSERT_NE(result1.getValue(), result3.getValue());
- ASSERT_NE(result2.getValue(), result3.getValue());
+ ASSERT_NE(result1.getValue().batch, result2.getValue().batch);
+ ASSERT_NE(result1.getValue().batch, result3.getValue().batch);
+ ASSERT_NE(result2.getValue().batch, result3.getValue().batch);
// Check metadata in buckets.
ASSERT_BSONOBJ_EQ(BSON(_metaField << "123"),
- _bucketCatalog->getMetadata(result1.getValue()->bucket()));
+ _bucketCatalog->getMetadata(result1.getValue().batch->bucket()));
ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONObj()),
- _bucketCatalog->getMetadata(result2.getValue()->bucket()));
- ASSERT(_bucketCatalog->getMetadata(result3.getValue()->bucket()).isEmpty());
+ _bucketCatalog->getMetadata(result2.getValue().batch->bucket()));
+ ASSERT(_bucketCatalog->getMetadata(result3.getValue().batch->bucket()).isEmpty());
// Committing one bucket should only return the one document in that bucket and should not
// affect the other bucket.
- for (const auto& batch : {result1.getValue(), result2.getValue(), result3.getValue()}) {
+ for (const auto& batch :
+ {result1.getValue().batch, result2.getValue().batch, result3.getValue().batch}) {
_commit(batch, 0);
}
}
@@ -298,13 +300,13 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketArray) {
BSON(_timeField << Date_t::now() << _metaField << BSON_ARRAY(BSON("b" << 1 << "a" << 0))),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
- ASSERT_EQ(result1.getValue(), result2.getValue());
+ ASSERT_EQ(result1.getValue().batch, result2.getValue().batch);
// Check metadata in buckets.
ASSERT_BSONOBJ_EQ(BSON(_metaField << BSON_ARRAY(BSON("a" << 0 << "b" << 1))),
- _bucketCatalog->getMetadata(result1.getValue()->bucket()));
+ _bucketCatalog->getMetadata(result1.getValue().batch->bucket()));
ASSERT_BSONOBJ_EQ(BSON(_metaField << BSON_ARRAY(BSON("a" << 0 << "b" << 1))),
- _bucketCatalog->getMetadata(result2.getValue()->bucket()));
+ _bucketCatalog->getMetadata(result2.getValue().batch->bucket()));
}
TEST_F(BucketCatalogTest, InsertIntoSameBucketObjArray) {
@@ -327,17 +329,17 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketObjArray) {
<< BSON("g" << 0 << "f" << 1))))),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
- ASSERT_EQ(result1.getValue(), result2.getValue());
+ ASSERT_EQ(result1.getValue().batch, result2.getValue().batch);
// Check metadata in buckets.
ASSERT_BSONOBJ_EQ(
BSON(_metaField << BSONObj(BSON(
"c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1) << BSON("f" << 1 << "g" << 0))))),
- _bucketCatalog->getMetadata(result1.getValue()->bucket()));
+ _bucketCatalog->getMetadata(result1.getValue().batch->bucket()));
ASSERT_BSONOBJ_EQ(
BSON(_metaField << BSONObj(BSON(
"c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1) << BSON("f" << 1 << "g" << 0))))),
- _bucketCatalog->getMetadata(result2.getValue()->bucket()));
+ _bucketCatalog->getMetadata(result2.getValue().batch->bucket()));
}
@@ -363,17 +365,17 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketNestedArray) {
<< "456"))))),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
- ASSERT_EQ(result1.getValue(), result2.getValue());
+ ASSERT_EQ(result1.getValue().batch, result2.getValue().batch);
// Check metadata in buckets.
ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONObj(BSON("c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1)
<< BSON_ARRAY("123"
<< "456"))))),
- _bucketCatalog->getMetadata(result1.getValue()->bucket()));
+ _bucketCatalog->getMetadata(result1.getValue().batch->bucket()));
ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONObj(BSON("c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1)
<< BSON_ARRAY("123"
<< "456"))))),
- _bucketCatalog->getMetadata(result2.getValue()->bucket()));
+ _bucketCatalog->getMetadata(result2.getValue().batch->bucket()));
}
TEST_F(BucketCatalogTest, InsertNullAndMissingMetaFieldIntoDifferentBuckets) {
@@ -393,16 +395,16 @@ TEST_F(BucketCatalogTest, InsertNullAndMissingMetaFieldIntoDifferentBuckets) {
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
// Inserts should all be into three distinct buckets (and therefore batches).
- ASSERT_NE(result1.getValue(), result2.getValue());
+ ASSERT_NE(result1.getValue().batch, result2.getValue().batch);
// Check metadata in buckets.
ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONNULL),
- _bucketCatalog->getMetadata(result1.getValue()->bucket()));
- ASSERT(_bucketCatalog->getMetadata(result2.getValue()->bucket()).isEmpty());
+ _bucketCatalog->getMetadata(result1.getValue().batch->bucket()));
+ ASSERT(_bucketCatalog->getMetadata(result2.getValue().batch->bucket()).isEmpty());
// Committing one bucket should only return the one document in that bucket and should not
// affect the other bucket.
- for (const auto& batch : {result1.getValue(), result2.getValue()}) {
+ for (const auto& batch : {result1.getValue().batch, result2.getValue().batch}) {
_commit(batch, 0);
}
}
@@ -444,7 +446,8 @@ TEST_F(BucketCatalogTest, InsertBetweenPrepareAndFinish) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
- .getValue();
+ .getValue()
+ .batch;
ASSERT(batch1->claimCommitRights());
_bucketCatalog->prepareCommit(batch1);
ASSERT_EQ(batch1->measurements().size(), 1);
@@ -458,7 +461,8 @@ TEST_F(BucketCatalogTest, InsertBetweenPrepareAndFinish) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
- .getValue();
+ .getValue()
+ .batch;
ASSERT_NE(batch1, batch2);
_bucketCatalog->finish(batch1, {});
@@ -475,7 +479,7 @@ DEATH_TEST_F(BucketCatalogTest, CannotCommitWithoutRights, "invariant") {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
- auto& batch = result.getValue();
+ auto& batch = result.getValue().batch;
_bucketCatalog->prepareCommit(batch);
}
@@ -486,7 +490,7 @@ DEATH_TEST_F(BucketCatalogTest, CannotFinishUnpreparedBatch, "invariant") {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
- auto& batch = result.getValue();
+ auto& batch = result.getValue().batch;
ASSERT(batch->claimCommitRights());
_bucketCatalog->finish(batch, {});
}
@@ -499,7 +503,8 @@ TEST_F(BucketCatalogWithoutMetadataTest, GetMetadataReturnsEmptyDoc) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
- .getValue();
+ .getValue()
+ .batch;
ASSERT_BSONOBJ_EQ(BSONObj(), _bucketCatalog->getMetadata(batch->bucket()));
@@ -514,8 +519,8 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now() << "a" << 0),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
- ASSERT_OK(result);
- auto batch = result.getValue();
+ ASSERT(result.isOK());
+ auto batch = result.getValue().batch;
auto oldId = batch->bucket()->id();
_commit(batch, 0);
ASSERT_EQ(2U, batch->newFieldNamesToBeInserted().size()) << batch->toBSON();
@@ -530,8 +535,8 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now() << "a" << 1),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
- ASSERT_OK(result);
- batch = result.getValue();
+ ASSERT(result.isOK());
+ batch = result.getValue().batch;
_commit(batch, 1);
ASSERT_EQ(0U, batch->newFieldNamesToBeInserted().size()) << batch->toBSON();
@@ -542,8 +547,8 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now() << "a" << 2 << "b" << 2),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
- ASSERT_OK(result);
- batch = result.getValue();
+ ASSERT(result.isOK());
+ batch = result.getValue().batch;
_commit(batch, 2);
ASSERT_EQ(1U, batch->newFieldNamesToBeInserted().size()) << batch->toBSON();
ASSERT(batch->newFieldNamesToBeInserted().count("b")) << batch->toBSON();
@@ -556,8 +561,8 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now() << "a" << i),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
- ASSERT_OK(result);
- batch = result.getValue();
+ ASSERT(result.isOK());
+ batch = result.getValue().batch;
_commit(batch, i);
ASSERT_EQ(0U, batch->newFieldNamesToBeInserted().size()) << i << ":" << batch->toBSON();
}
@@ -571,7 +576,7 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now() << "a" << gTimeseriesBucketMaxCount),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
- auto& batch2 = result2.getValue();
+ auto& batch2 = result2.getValue().batch;
ASSERT_NE(oldId, batch2->bucket()->id());
_commit(batch2, 0);
ASSERT_EQ(2U, batch2->newFieldNamesToBeInserted().size()) << batch2->toBSON();
@@ -587,7 +592,8 @@ TEST_F(BucketCatalogTest, AbortBatchOnBucketWithPreparedCommit) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
- .getValue();
+ .getValue()
+ .batch;
ASSERT(batch1->claimCommitRights());
_bucketCatalog->prepareCommit(batch1);
ASSERT_EQ(batch1->measurements().size(), 1);
@@ -601,7 +607,8 @@ TEST_F(BucketCatalogTest, AbortBatchOnBucketWithPreparedCommit) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
- .getValue();
+ .getValue()
+ .batch;
ASSERT_NE(batch1, batch2);
ASSERT(batch2->claimCommitRights());
@@ -622,7 +629,8 @@ TEST_F(BucketCatalogTest, ClearNamespaceWithConcurrentWrites) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
- .getValue();
+ .getValue()
+ .batch;
ASSERT(batch->claimCommitRights());
_bucketCatalog->clear(_ns1);
@@ -639,7 +647,8 @@ TEST_F(BucketCatalogTest, ClearNamespaceWithConcurrentWrites) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
- .getValue();
+ .getValue()
+ .batch;
ASSERT(batch->claimCommitRights());
_bucketCatalog->prepareCommit(batch);
ASSERT_EQ(batch->measurements().size(), 1);
@@ -666,7 +675,8 @@ TEST_F(BucketCatalogTest, ClearBucketWithPreparedBatchThrowsConflict) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
- .getValue();
+ .getValue()
+ .batch;
ASSERT(batch->claimCommitRights());
_bucketCatalog->prepareCommit(batch);
ASSERT_EQ(batch->measurements().size(), 1);
@@ -687,7 +697,8 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
- .getValue();
+ .getValue()
+ .batch;
ASSERT(batch1->claimCommitRights());
_bucketCatalog->prepareCommit(batch1);
ASSERT_EQ(batch1->measurements().size(), 1);
@@ -701,7 +712,8 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
- .getValue();
+ .getValue()
+ .batch;
ASSERT_NE(batch1, batch2);
ASSERT_EQ(batch1->bucket(), batch2->bucket());
@@ -726,7 +738,8 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
- .getValue();
+ .getValue()
+ .batch;
ASSERT_NE(batch1, batch3);
ASSERT_NE(batch2, batch3);
ASSERT_NE(batch1->bucket(), batch3->bucket());
@@ -748,7 +761,8 @@ TEST_F(BucketCatalogTest, PrepareCommitOnAlreadyAbortedBatch) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
- .getValue();
+ .getValue()
+ .batch;
ASSERT(batch->claimCommitRights());
_bucketCatalog->abort(batch);
@@ -769,7 +783,8 @@ TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
- .getValue();
+ .getValue()
+ .batch;
auto batch2 = _bucketCatalog
->insert(_makeOperationContext().second.get(),
@@ -778,7 +793,8 @@ TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
- .getValue();
+ .getValue()
+ .batch;
auto batch3 = _bucketCatalog
->insert(_makeOperationContext().second.get(),
@@ -787,7 +803,8 @@ TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
- .getValue();
+ .getValue()
+ .batch;
auto batch4 = _bucketCatalog
->insert(_makeOperationContext().second.get(),
@@ -796,7 +813,8 @@ TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
- .getValue();
+ .getValue()
+ .batch;
ASSERT_NE(batch1, batch2);
ASSERT_NE(batch1, batch3);
@@ -816,7 +834,8 @@ TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
- .getValue();
+ .getValue()
+ .batch;
auto batch2 = _bucketCatalog
->insert(_makeOperationContext().second.get(),
@@ -825,7 +844,8 @@ TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
- .getValue();
+ .getValue()
+ .batch;
ASSERT(batch1->claimCommitRights());
ASSERT(batch2->claimCommitRights());
@@ -852,7 +872,8 @@ TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
- .getValue();
+ .getValue()
+ .batch;
auto batch2 = _bucketCatalog
->insert(_makeOperationContext().second.get(),
@@ -861,7 +882,8 @@ TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
- .getValue();
+ .getValue()
+ .batch;
// Batch 2 is the first batch to commit the time field.
ASSERT(batch2->claimCommitRights());
diff --git a/src/mongo/db/timeseries/bucket_compression.cpp b/src/mongo/db/timeseries/bucket_compression.cpp
new file mode 100644
index 00000000000..06f65f5252e
--- /dev/null
+++ b/src/mongo/db/timeseries/bucket_compression.cpp
@@ -0,0 +1,203 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage
+
+#include "mongo/db/timeseries/bucket_compression.h"
+
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/bson/util/bsoncolumnbuilder.h"
+#include "mongo/db/timeseries/timeseries_constants.h"
+#include "mongo/logv2/log.h"
+#include "mongo/util/assert_util.h"
+
+namespace mongo {
+
+namespace timeseries {
+
+boost::optional<BSONObj> compressBucket(const BSONObj& bucketDoc, StringData timeFieldName) try {
+ // Helper for uncompressed measurements
+ struct Measurement {
+ BSONElement timeField;
+ std::vector<BSONElement> dataFields;
+ };
+
+ BSONObjBuilder builder; // builder to build the compressed bucket
+ std::vector<Measurement> measurements; // Extracted measurements from uncompressed bucket
+ boost::optional<BSONObjIterator> time; // Iterator to read time fields from uncompressed bucket
+ std::vector<std::pair<StringData, BSONObjIterator>>
+ columns; // Iterators to read data fields from uncompressed bucket
+
+ // Read everything from the uncompressed bucket
+ for (auto& elem : bucketDoc) {
+ // Control field is left as-is except for the version field.
+ if (elem.fieldNameStringData() == kBucketControlFieldName) {
+ BSONObjBuilder control(builder.subobjStart(kBucketControlFieldName));
+
+ // Set right version, leave other control fields unchanged
+ bool versionSet = false;
+ for (const auto& controlField : elem.Obj()) {
+ if (controlField.fieldNameStringData() == kBucketControlVersionFieldName) {
+ control.append(kBucketControlVersionFieldName,
+ kTimeseriesControlCompressedVersion);
+ versionSet = true;
+ } else {
+ control.append(controlField);
+ }
+ }
+
+ // Set version if it was missing from uncompressed bucket
+ if (!versionSet) {
+ control.append(kBucketControlVersionFieldName, kTimeseriesControlCompressedVersion);
+ }
+
+ continue;
+ }
+
+ // Everything that's not under data or control is left as-is
+ if (elem.fieldNameStringData() != kBucketDataFieldName) {
+ // Copy the field through to the compressed bucket unchanged.
+ builder.append(elem);
+ continue;
+ }
+
+ // Set up iterators to read all fields under data in lock-step.
+ for (auto& columnObj : elem.Obj()) {
+ if (columnObj.fieldNameStringData() == timeFieldName) {
+ time.emplace(columnObj.Obj());
+ } else {
+ columns.emplace_back(columnObj.fieldNameStringData(), columnObj.Obj());
+ }
+ }
+ }
+
+ // If the provided time field did not exist, there is nothing to do.
+ if (!time) {
+ return boost::none;
+ }
+
+ // Read all measurements from bucket
+ while (time->more()) {
+ // Get and advance the time iterator
+ auto timeElement = time->next();
+
+ // Get BSONElements for all data fields. Missing data fields are represented as EOO.
+ Measurement measurement;
+ measurement.timeField = timeElement;
+ measurement.dataFields.resize(columns.size());
+
+ // Read one element from each data field iterator
+ for (size_t i = 0; i < columns.size(); ++i) {
+ auto& column = columns[i].second;
+ // If we have reached the end there is nothing more to do; all subsequent elements
+ // will be left as EOO/missing.
+ if (!column.more()) {
+ continue;
+ }
+
+ // Check that the field name matches the name of the time field. Field names are
+ // strings of integer indexes: "0", "1", etc. Data fields may have missing entries
+ // where the time field does not, so a simple string compare against the time field
+ // is enough. If it does not match, this data field is missing the current index and
+ // the iterator is already positioned at an element with a higher index. In that case
+ // leave the data field as EOO and do not advance the data iterator.
+ auto elem = *column;
+ if (timeElement.fieldNameStringData() == elem.fieldNameStringData()) {
+ // Extract the element and advance the iterator
+ measurement.dataFields[i] = elem;
+ column.advance(elem);
+ }
+ }
+
+ measurements.push_back(std::move(measurement));
+ }
+
+ // Verify that all data iterators have been exhausted; if any has not, the bucket is
+ // malformed and we have not read everything. We cannot compress in that case, as it
+ // would lose user data.
+ // This can happen if the bucket contains unordered keys in its data fields, e.g.
+ // {"0": ..., "2": ..., "1": ...}, or if a data field has more entries than the time field.
+ if (std::any_of(columns.begin(), columns.end(), [](const auto& entry) {
+ return entry.second.more();
+ })) {
+ LOGV2_DEBUG(5857801,
+ 1,
+ "Failed to parse timeseries bucket during compression, leaving uncompressed");
+ return boost::none;
+ }
+
+ // Sort all the measurements on time order.
+ std::sort(measurements.begin(),
+ measurements.end(),
+ [](const Measurement& lhs, const Measurement& rhs) {
+ return lhs.timeField.timestamp() < rhs.timeField.timestamp();
+ });
+
+ // Last, compress elements and build compressed bucket
+ {
+ BSONObjBuilder dataBuilder = builder.subobjStart(kBucketDataFieldName);
+ BufBuilder columnBuffer; // Reusable buffer to avoid extra allocs per column.
+
+ // Add compressed time field first
+ {
+ BSONColumnBuilder timeColumn(timeFieldName, std::move(columnBuffer));
+ for (const auto& measurement : measurements) {
+ timeColumn.append(measurement.timeField);
+ }
+ dataBuilder.append(timeFieldName, timeColumn.finalize());
+ columnBuffer = timeColumn.detach();
+ }
+
+ // Then add compressed data fields.
+ for (size_t i = 0; i < columns.size(); ++i) {
+ BSONColumnBuilder column(columns[i].first, std::move(columnBuffer));
+ for (const auto& measurement : measurements) {
+ if (auto elem = measurement.dataFields[i]) {
+ column.append(elem);
+ } else {
+ column.skip();
+ }
+ }
+ dataBuilder.append(column.fieldName(), column.finalize());
+ columnBuffer = column.detach();
+ }
+ }
+
+ return builder.obj();
+} catch (...) {
+ // Skip compression if we encounter any exception
+ LOGV2_DEBUG(5857800,
+ 1,
+ "Exception when compressing timeseries bucket, leaving it uncompressed",
+ "error"_attr = exceptionToStatus());
+ return boost::none;
+}
+
+} // namespace timeseries
+} // namespace mongo
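The compression loop above reuses a single BufBuilder across all BSONColumnBuilder instances by moving it into each builder and detaching it again after finalize(). A minimal sketch of that pattern, assuming the per-column elements have already been gathered and assuming the buffer-taking BSONColumnBuilder constructor used above; container shape and names are illustrative, not part of this patch:

    #include <utility>
    #include <vector>

    #include "mongo/bson/bsonobjbuilder.h"
    #include "mongo/bson/util/bsoncolumnbuilder.h"

    // Append one compressed column per entry in 'columns'. EOO elements mark
    // measurements where the field is missing.
    void appendCompressedColumns(
        mongo::BSONObjBuilder& dataBuilder,
        const std::vector<std::pair<mongo::StringData, std::vector<mongo::BSONElement>>>& columns) {
        mongo::BufBuilder reusable;  // one allocation reused for every column
        for (const auto& [name, elems] : columns) {
            mongo::BSONColumnBuilder col(name, std::move(reusable));
            for (const mongo::BSONElement& e : elems) {
                if (e)
                    col.append(e);  // entry present for this measurement
                else
                    col.skip();     // missing entry is recorded as a skip
            }
            dataBuilder.append(col.fieldName(), col.finalize());  // compressed column binary
            reusable = col.detach();  // take the buffer back for the next column
        }
    }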
diff --git a/src/mongo/db/timeseries/bucket_compression.h b/src/mongo/db/timeseries/bucket_compression.h
new file mode 100644
index 00000000000..f28360bf786
--- /dev/null
+++ b/src/mongo/db/timeseries/bucket_compression.h
@@ -0,0 +1,50 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/namespace_string.h"
+
+namespace mongo {
+
+namespace timeseries {
+
+/**
+ * Returns a compressed timeseries bucket in v2 format for a given uncompressed v1 bucket and time
+ * field. The compressed bucket will have all measurements sorted by time.
+ *
+ * If bucket compression is not possible for any reason, boost::none is returned.
+ */
+boost::optional<BSONObj> compressBucket(const BSONObj& bucketDoc, StringData timeFieldName);
+
+} // namespace timeseries
+} // namespace mongo
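A minimal usage sketch for compressBucket(), built around a hand-written v1 bucket document. The field names, values, and the extended-JSON date literals below are illustrative and rely on fromjson's relaxed parsing; a boost::none return means the bucket should simply be left uncompressed:

    #include "mongo/db/json.h"
    #include "mongo/db/timeseries/bucket_compression.h"

    void compressionSketch() {
        // data.temp has no entry for index "1", so that measurement is stored as a skip.
        mongo::BSONObj bucket = mongo::fromjson(
            "{_id: 1,"
            " control: {version: 1},"
            " data: {"
            "   t: {'0': {$date: '2021-09-16T00:00:00Z'}, '1': {$date: '2021-09-16T00:00:01Z'}},"
            "   temp: {'0': 20.5}}}");

        auto compressed = mongo::timeseries::compressBucket(bucket, "t");
        if (compressed) {
            // 'compressed' holds the v2 bucket: control.version is updated and every field
            // under data is stored as a single compressed column, sorted on the time field.
        } else {
            // Compression was not possible; keep the original bucket as-is.
        }
    }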
diff --git a/src/mongo/db/timeseries/timeseries.idl b/src/mongo/db/timeseries/timeseries.idl
index c97e9cf1887..ba01d4e7121 100644
--- a/src/mongo/db/timeseries/timeseries.idl
+++ b/src/mongo/db/timeseries/timeseries.idl
@@ -54,6 +54,13 @@ server_parameters:
cpp_varname: "gTimeseriesIdleBucketExpiryMemoryUsageThreshold"
default: 104857600 # 100MB
validator: { gte: 1 }
+ "timeseriesIdleBucketExpiryMaxCountPerAttempt":
+ description: "The maximum number of buckets that may be closed due to expiry at each attempt"
+ set_at: [ startup ]
+ cpp_vartype: "std::int32_t"
+ cpp_varname: "gTimeseriesIdleBucketExpiryMaxCountPerAttempt"
+ default: 3
+ validator: { gte: 2 }
enums:
BucketGranularity:
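Since the new timeseriesIdleBucketExpiryMaxCountPerAttempt parameter is startup-only (set_at: [startup]), it would be tuned on the mongod command line rather than at runtime; for example (the value 5 is illustrative):

    mongod --setParameter timeseriesIdleBucketExpiryMaxCountPerAttempt=5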
diff --git a/src/mongo/db/update/SConscript b/src/mongo/db/update/SConscript
index 500fdde1adc..a8f0bd1dc2b 100644
--- a/src/mongo/db/update/SConscript
+++ b/src/mongo/db/update/SConscript
@@ -59,6 +59,7 @@ env.Library(
source=[
'delta_executor.cpp',
'object_replace_executor.cpp',
+ 'object_transform_executor.cpp',
'pipeline_executor.cpp',
],
LIBDEPS=[
@@ -125,6 +126,7 @@ env.CppUnitTest(
'field_checker_test.cpp',
'modifier_table_test.cpp',
'object_replace_executor_test.cpp',
+ 'object_transform_executor_test.cpp',
'path_support_test.cpp',
'pipeline_executor_test.cpp',
'pop_node_test.cpp',
diff --git a/src/mongo/db/update/object_transform_executor.cpp b/src/mongo/db/update/object_transform_executor.cpp
new file mode 100644
index 00000000000..597b7dbb335
--- /dev/null
+++ b/src/mongo/db/update/object_transform_executor.cpp
@@ -0,0 +1,74 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/update/object_transform_executor.h"
+
+#include "mongo/db/update/object_replace_executor.h"
+#include "mongo/db/update/storage_validation.h"
+#include "mongo/db/update/update_oplog_entry_serialization.h"
+
+namespace mongo {
+
+ObjectTransformExecutor::ObjectTransformExecutor(TransformFunc transformFunc)
+ : _transformFunc(std::move(transformFunc)) {}
+
+UpdateExecutor::ApplyResult ObjectTransformExecutor::applyTransformUpdate(
+ ApplyParams applyParams, const TransformFunc& transformFunc) {
+
+ auto originalDoc = applyParams.element.getDocument().getObject();
+ auto transformedDoc = transformFunc(originalDoc);
+ if (!transformedDoc) {
+ return ApplyResult::noopResult();
+ }
+
+ return ObjectReplaceExecutor::applyReplacementUpdate(
+ std::move(applyParams),
+ *transformedDoc,
+ true /* replacementDocContainsIdField */,
+ true /* allowTopLevelDollarPrefixedFields */);
+}
+
+UpdateExecutor::ApplyResult ObjectTransformExecutor::applyUpdate(ApplyParams applyParams) const {
+ auto ret = applyTransformUpdate(applyParams, _transformFunc);
+
+ if (!ret.noop && applyParams.logMode != ApplyParams::LogMode::kDoNotGenerateOplogEntry) {
+ // Use a full replacement oplog entry, as the current use case makes major changes to
+ // the document. Consider supporting v:2 updates in the future.
+ ret.oplogEntry = update_oplog_entry::makeReplacementOplogEntry(
+ applyParams.element.getDocument().getObject());
+ }
+ return ret;
+}
+
+Value ObjectTransformExecutor::serialize() const {
+ MONGO_UNREACHABLE_TASSERT(5857810);
+}
+} // namespace mongo
diff --git a/src/mongo/db/update/object_transform_executor.h b/src/mongo/db/update/object_transform_executor.h
new file mode 100644
index 00000000000..aee80ac80b7
--- /dev/null
+++ b/src/mongo/db/update/object_transform_executor.h
@@ -0,0 +1,83 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <functional>
+
+#include "mongo/db/update/update_executor.h"
+
+namespace mongo {
+
+/**
+ * An UpdateExecutor representing a transform-style update.
+ *
+ * A transform-style update is similar to a replacement-style update, with the difference that the
+ * replacement document is computed from the original document by a user-provided function. If the
+ * update needs to be retried, for example after a WriteConflict, the output document is
+ * recalculated from the new original document.
+ *
+ * The transform function MUST preserve the _id element from the original document.
+ *
+ * If the transform function returns boost::none, the update operation is considered a no-op.
+ */
+class ObjectTransformExecutor : public UpdateExecutor {
+public:
+ using TransformFunc = std::function<boost::optional<BSONObj>(const BSONObj&)>;
+
+ /**
+ * Applies a transform style update to 'applyParams.element'.
+ *
+ * The transform function MUST preserve the _id element in the original document.
+ *
+ * This function will ignore the log mode provided in 'applyParams'. The 'oplogEntry' field
+ * of the returned ApplyResult is always empty.
+ */
+ static ApplyResult applyTransformUpdate(ApplyParams applyParams,
+ const TransformFunc& transformFunc);
+
+ /**
+ * Initializes the node with the transform function to apply to the original document.
+ */
+ explicit ObjectTransformExecutor(TransformFunc transform);
+
+ /**
+ * Replaces the document that 'applyParams.element' belongs to with the result of the transform
+ * function. 'applyParams.element' must be the root of the document. Always returns a result
+ * stating that indexes are affected when the replacement is not a noop.
+ */
+ ApplyResult applyUpdate(ApplyParams applyParams) const final;
+
+ Value serialize() const final;
+
+private:
+ TransformFunc _transformFunc;
+};
+
+} // namespace mongo
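As a concrete illustration of the contract described above, a minimal sketch of constructing the executor with a transform that either produces a replacement or signals a no-op. The 'payload' and 'compressed' field names are hypothetical and not from this patch:

    // Hypothetical transform; real callers supply their own function.
    mongo::ObjectTransformExecutor makeSketchExecutor() {
        return mongo::ObjectTransformExecutor(
            [](const mongo::BSONObj& preImage) -> boost::optional<mongo::BSONObj> {
                if (!preImage.hasField("payload"))
                    return boost::none;               // treated as a no-op update
                mongo::BSONObjBuilder replacement;
                replacement.append(preImage["_id"]);  // the transform must preserve _id
                replacement.append("compressed", true);
                return replacement.obj();
            });
    }

Applying the executor then goes through applyUpdate(), which delegates to the replacement-update path shown in object_transform_executor.cpp above, so the usual immutable-field checks still apply (see the tests that follow).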
diff --git a/src/mongo/db/update/object_transform_executor_test.cpp b/src/mongo/db/update/object_transform_executor_test.cpp
new file mode 100644
index 00000000000..06cca4da7cd
--- /dev/null
+++ b/src/mongo/db/update/object_transform_executor_test.cpp
@@ -0,0 +1,210 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/update/object_transform_executor.h"
+
+#include "mongo/bson/mutable/algorithm.h"
+#include "mongo/bson/mutable/mutable_bson_test_utils.h"
+#include "mongo/db/json.h"
+#include "mongo/db/update/update_node_test_fixture.h"
+#include "mongo/idl/server_parameter_test_util.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+using ObjectTransformExecutorTest = UpdateNodeTest;
+using mongo::mutablebson::countChildren;
+using mongo::mutablebson::Element;
+
+TEST_F(ObjectTransformExecutorTest, Noop) {
+ BSONObj input = fromjson("{a: 1, b: 2}");
+
+ ObjectTransformExecutor node([&input](const BSONObj& pre) {
+ ASSERT_BSONOBJ_BINARY_EQ(pre, input);
+ return pre;
+ });
+
+ mutablebson::Document doc(input);
+ auto result = node.applyUpdate(getApplyParams(doc.root()));
+ ASSERT_TRUE(result.noop);
+ ASSERT_FALSE(result.indexesAffected);
+ ASSERT_EQUALS(fromjson("{a: 1, b: 2}"), doc);
+ ASSERT_TRUE(doc.isInPlaceModeEnabled());
+ ASSERT_BSONOBJ_BINARY_EQ(fromjson("{}"), result.oplogEntry);
+}
+
+TEST_F(ObjectTransformExecutorTest, NoneNoop) {
+ ObjectTransformExecutor node([](const BSONObj& pre) { return boost::none; });
+
+ mutablebson::Document doc(fromjson("{a: 1, b: 2}"));
+ auto result = node.applyUpdate(getApplyParams(doc.root()));
+ ASSERT_TRUE(result.noop);
+ ASSERT_FALSE(result.indexesAffected);
+ ASSERT_EQUALS(fromjson("{a: 1, b: 2}"), doc);
+ ASSERT_TRUE(doc.isInPlaceModeEnabled());
+ ASSERT_BSONOBJ_BINARY_EQ(fromjson("{}"), result.oplogEntry);
+}
+
+TEST_F(ObjectTransformExecutorTest, Replace) {
+ BSONObj from = fromjson("{a: 1, b: 2}");
+ BSONObj to = fromjson("{a: 1, b: 3}");
+
+ ObjectTransformExecutor node([&from, &to](const BSONObj& pre) {
+ ASSERT_BSONOBJ_BINARY_EQ(pre, from);
+ return to;
+ });
+
+ mutablebson::Document doc(from);
+ auto result = node.applyUpdate(getApplyParams(doc.root()));
+ ASSERT_FALSE(result.noop);
+ ASSERT_TRUE(result.indexesAffected);
+ ASSERT_EQUALS(to, doc);
+ ASSERT_FALSE(doc.isInPlaceModeEnabled());
+ ASSERT_BSONOBJ_BINARY_EQ(to, result.oplogEntry);
+}
+
+TEST_F(ObjectTransformExecutorTest, ShouldSucceedWhenImmutableIdIsNotModified) {
+ auto obj = fromjson("{_id: 0, a: 1, b: 2}");
+ auto to = fromjson("{_id: 0, c: 1, d: 2}");
+ ObjectTransformExecutor node([&obj, &to](const BSONObj& pre) {
+ ASSERT_BSONOBJ_BINARY_EQ(pre, obj);
+ return to;
+ });
+
+ mutablebson::Document doc(obj);
+ addImmutablePath("_id");
+ auto result = node.applyUpdate(getApplyParams(doc.root()));
+ ASSERT_FALSE(result.noop);
+ ASSERT_TRUE(result.indexesAffected);
+ ASSERT_EQUALS(to, doc);
+ ASSERT_FALSE(doc.isInPlaceModeEnabled());
+ ASSERT_BSONOBJ_BINARY_EQ(to, result.oplogEntry);
+}
+
+TEST_F(ObjectTransformExecutorTest, CannotRemoveImmutablePath) {
+ auto from = fromjson("{_id: 0, a: {b: 1}}");
+ auto obj = fromjson("{_id: 0, c: 1}");
+ ObjectTransformExecutor node([&from, &obj](const BSONObj& pre) {
+ ASSERT_BSONOBJ_BINARY_EQ(pre, from);
+ return obj;
+ });
+
+ mutablebson::Document doc(from);
+ addImmutablePath("a.b");
+ ASSERT_THROWS_CODE_AND_WHAT(node.applyUpdate(getApplyParams(doc.root())),
+ AssertionException,
+ ErrorCodes::ImmutableField,
+ "After applying the update, the 'a.b' (required and immutable) "
+ "field was found to have been removed --{ _id: 0, a: { b: 1 } }");
+}
+
+TEST_F(ObjectTransformExecutorTest, CannotReplaceImmutablePathWithArrayField) {
+ auto obj = fromjson("{_id: 0, a: [{b: 1}]}");
+ ObjectTransformExecutor node([&obj](const BSONObj& pre) { return obj; });
+
+ mutablebson::Document doc(fromjson("{_id: 0, a: {b: 1}}"));
+ addImmutablePath("a.b");
+ ASSERT_THROWS_CODE_AND_WHAT(node.applyUpdate(getApplyParams(doc.root())),
+ AssertionException,
+ ErrorCodes::NotSingleValueField,
+ "After applying the update to the document, the (immutable) field "
+ "'a.b' was found to be an array or array descendant.");
+}
+
+TEST_F(ObjectTransformExecutorTest, CannotMakeImmutablePathArrayDescendant) {
+ auto obj = fromjson("{_id: 0, a: [1]}");
+ ObjectTransformExecutor node([&obj](const BSONObj& pre) { return obj; });
+
+ mutablebson::Document doc(fromjson("{_id: 0, a: {'0': 1}}"));
+ addImmutablePath("a.0");
+ ASSERT_THROWS_CODE_AND_WHAT(node.applyUpdate(getApplyParams(doc.root())),
+ AssertionException,
+ ErrorCodes::NotSingleValueField,
+ "After applying the update to the document, the (immutable) field "
+ "'a.0' was found to be an array or array descendant.");
+}
+
+TEST_F(ObjectTransformExecutorTest, CannotModifyImmutablePath) {
+ auto obj = fromjson("{_id: 0, a: {b: 2}}");
+ ObjectTransformExecutor node([&obj](const BSONObj& pre) { return obj; });
+
+ mutablebson::Document doc(fromjson("{_id: 0, a: {b: 1}}"));
+ addImmutablePath("a.b");
+ ASSERT_THROWS_CODE_AND_WHAT(node.applyUpdate(getApplyParams(doc.root())),
+ AssertionException,
+ ErrorCodes::ImmutableField,
+ "After applying the update, the (immutable) field 'a.b' was found "
+ "to have been altered to b: 2");
+}
+
+TEST_F(ObjectTransformExecutorTest, CannotModifyImmutableId) {
+ auto obj = fromjson("{_id: 1}");
+ ObjectTransformExecutor node([&obj](const BSONObj& pre) { return obj; });
+
+ mutablebson::Document doc(fromjson("{_id: 0}"));
+ addImmutablePath("_id");
+ ASSERT_THROWS_CODE_AND_WHAT(node.applyUpdate(getApplyParams(doc.root())),
+ AssertionException,
+ ErrorCodes::ImmutableField,
+ "After applying the update, the (immutable) field '_id' was found "
+ "to have been altered to _id: 1");
+}
+
+TEST_F(ObjectTransformExecutorTest, CanAddImmutableField) {
+ auto obj = fromjson("{a: {b: 1}}");
+ ObjectTransformExecutor node([&obj](const BSONObj& pre) { return obj; });
+
+ mutablebson::Document doc(fromjson("{c: 1}"));
+ addImmutablePath("a.b");
+ auto result = node.applyUpdate(getApplyParams(doc.root()));
+ ASSERT_FALSE(result.noop);
+ ASSERT_TRUE(result.indexesAffected);
+ ASSERT_EQUALS(fromjson("{a: {b: 1}}"), doc);
+ ASSERT_FALSE(doc.isInPlaceModeEnabled());
+ ASSERT_BSONOBJ_BINARY_EQ(fromjson("{a: {b: 1}}"), result.oplogEntry);
+}
+
+TEST_F(ObjectTransformExecutorTest, CanAddImmutableId) {
+ auto obj = fromjson("{_id: 0}");
+ ObjectTransformExecutor node([&obj](const BSONObj& pre) { return obj; });
+
+ mutablebson::Document doc(fromjson("{c: 1}"));
+ addImmutablePath("_id");
+ auto result = node.applyUpdate(getApplyParams(doc.root()));
+ ASSERT_FALSE(result.noop);
+ ASSERT_TRUE(result.indexesAffected);
+ ASSERT_EQUALS(fromjson("{_id: 0}"), doc);
+ ASSERT_FALSE(doc.isInPlaceModeEnabled());
+ ASSERT_BSONOBJ_BINARY_EQ(fromjson("{_id: 0}"), result.oplogEntry);
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/update/update_driver.cpp b/src/mongo/db/update/update_driver.cpp
index 1826267443d..2dd809aa99b 100644
--- a/src/mongo/db/update/update_driver.cpp
+++ b/src/mongo/db/update/update_driver.cpp
@@ -43,6 +43,7 @@
#include "mongo/db/update/delta_executor.h"
#include "mongo/db/update/modifier_table.h"
#include "mongo/db/update/object_replace_executor.h"
+#include "mongo/db/update/object_transform_executor.h"
#include "mongo/db/update/path_support.h"
#include "mongo/db/update/storage_validation.h"
#include "mongo/db/update/update_oplog_entry_version.h"
@@ -165,6 +166,18 @@ void UpdateDriver::parse(
return;
}
+ if (updateMod.type() == write_ops::UpdateModification::Type::kTransform) {
+ uassert(5857811, "multi update is not supported for transform-style update", !multi);
+
+ uassert(5857812,
+ "arrayFilters may not be specified for transform-syle updates",
+ arrayFilters.empty());
+
+ _updateType = UpdateType::kTransform;
+ _updateExecutor = std::make_unique<ObjectTransformExecutor>(updateMod.getTransform());
+ return;
+ }
+
invariant(_updateType == UpdateType::kOperator);
// By this point we are expecting a "classic" update. This version of mongod only supports $v:
diff --git a/src/mongo/db/update/update_driver.h b/src/mongo/db/update/update_driver.h
index 18cce2c38b8..96f34b4cfaf 100644
--- a/src/mongo/db/update/update_driver.h
+++ b/src/mongo/db/update/update_driver.h
@@ -56,7 +56,7 @@ class OperationContext;
class UpdateDriver {
public:
- enum class UpdateType { kOperator, kReplacement, kPipeline, kDelta };
+ enum class UpdateType { kOperator, kReplacement, kPipeline, kDelta, kTransform };
UpdateDriver(const boost::intrusive_ptr<ExpressionContext>& expCtx);
diff --git a/src/mongo/shell/bench.cpp b/src/mongo/shell/bench.cpp
index 79a20cdfc2d..88f5d685df0 100644
--- a/src/mongo/shell/bench.cpp
+++ b/src/mongo/shell/bench.cpp
@@ -1193,6 +1193,9 @@ void BenchRunOp::executeOnce(DBClientBase* conn,
// Delta updates are only executed on secondaries as part of oplog
// application.
MONGO_UNREACHABLE;
+ case write_ops::UpdateModification::Type::kTransform:
+ // It is not possible to run a transform update directly from a client.
+ MONGO_UNREACHABLE;
}
singleUpdate.append("multi", this->multi);
singleUpdate.append("upsert", this->upsert);