-rw-r--r--  jstests/core/timeseries/libs/timeseries.js  10
-rw-r--r--  jstests/core/timeseries/timeseries_bucket_limit_count.js  9
-rw-r--r--  jstests/core/timeseries/timeseries_bucket_limit_size.js  22
-rw-r--r--  jstests/core/timeseries/timeseries_bucket_limit_time_range.js  26
-rw-r--r--  jstests/core/timeseries/timeseries_idle_buckets.js  43
-rw-r--r--  src/mongo/bson/util/bsoncolumnbuilder.cpp  10
-rw-r--r--  src/mongo/bson/util/bsoncolumnbuilder.h  7
-rw-r--r--  src/mongo/db/commands/SConscript  1
-rw-r--r--  src/mongo/db/commands/write_commands.cpp  161
-rw-r--r--  src/mongo/db/exec/SConscript  3
-rw-r--r--  src/mongo/db/exec/bucket_unpacker_test.cpp  86
-rw-r--r--  src/mongo/db/ops/write_ops.cpp  12
-rw-r--r--  src/mongo/db/ops/write_ops_parsers.h  41
-rw-r--r--  src/mongo/db/timeseries/SConscript  13
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog.cpp  70
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog.h  68
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog_test.cpp  132
-rw-r--r--  src/mongo/db/timeseries/bucket_compression.cpp  203
-rw-r--r--  src/mongo/db/timeseries/bucket_compression.h  50
-rw-r--r--  src/mongo/db/timeseries/timeseries.idl  7
-rw-r--r--  src/mongo/db/update/SConscript  2
-rw-r--r--  src/mongo/db/update/object_transform_executor.cpp  74
-rw-r--r--  src/mongo/db/update/object_transform_executor.h  83
-rw-r--r--  src/mongo/db/update/object_transform_executor_test.cpp  210
-rw-r--r--  src/mongo/db/update/update_driver.cpp  13
-rw-r--r--  src/mongo/db/update/update_driver.h  2
-rw-r--r--  src/mongo/shell/bench.cpp  3
27 files changed, 1151 insertions, 210 deletions
diff --git a/jstests/core/timeseries/libs/timeseries.js b/jstests/core/timeseries/libs/timeseries.js
index c9d27895566..88a3d38d705 100644
--- a/jstests/core/timeseries/libs/timeseries.js
+++ b/jstests/core/timeseries/libs/timeseries.js
@@ -11,6 +11,16 @@ var TimeseriesTest = class {
}
/**
+ * Returns whether time-series bucket compression is supported.
+ */
+ static timeseriesBucketCompressionEnabled(conn) {
+ return assert
+ .commandWorked(
+ conn.adminCommand({getParameter: 1, featureFlagTimeseriesBucketCompression: 1}))
+ .featureFlagTimeseriesBucketCompression.value;
+ }
+
+ /**
* Returns whether time-series updates and deletes are supported.
*/
static timeseriesUpdatesAndDeletesEnabled(conn) {
diff --git a/jstests/core/timeseries/timeseries_bucket_limit_count.js b/jstests/core/timeseries/timeseries_bucket_limit_count.js
index 5f6869cc838..7183a561ec6 100644
--- a/jstests/core/timeseries/timeseries_bucket_limit_count.js
+++ b/jstests/core/timeseries/timeseries_bucket_limit_count.js
@@ -13,6 +13,9 @@
load("jstests/core/timeseries/libs/timeseries.js");
TimeseriesTest.run((insert) => {
+ const isTimeseriesBucketCompressionEnabled =
+ TimeseriesTest.timeseriesBucketCompressionEnabled(db);
+
const collNamePrefix = 'timeseries_bucket_limit_count_';
// Assumes each bucket has a limit of 1000 measurements.
@@ -68,6 +71,9 @@ TimeseriesTest.run((insert) => {
assert.eq(bucketMaxCount - 1,
bucketDocs[0].control.max.x,
'invalid control.max for x in first bucket: ' + tojson(bucketDocs));
+ assert.eq(isTimeseriesBucketCompressionEnabled ? 2 : 1,
+ bucketDocs[0].control.version,
+ 'unexpected control.version in first bucket: ' + tojson(bucketDocs));
// Second bucket should contain the remaining documents.
assert.eq(bucketMaxCount,
@@ -82,6 +88,9 @@ TimeseriesTest.run((insert) => {
assert.eq(numDocs - 1,
bucketDocs[1].control.max.x,
'invalid control.max for x in second bucket: ' + tojson(bucketDocs));
+ assert.eq(1,
+ bucketDocs[1].control.version,
+ 'unexpected control.version in second bucket: ' + tojson(bucketDocs));
};
runTest(1);
diff --git a/jstests/core/timeseries/timeseries_bucket_limit_size.js b/jstests/core/timeseries/timeseries_bucket_limit_size.js
index bb29d1141a8..2d410f9eb1c 100644
--- a/jstests/core/timeseries/timeseries_bucket_limit_size.js
+++ b/jstests/core/timeseries/timeseries_bucket_limit_size.js
@@ -13,17 +13,21 @@
load("jstests/core/timeseries/libs/timeseries.js");
TimeseriesTest.run((insert) => {
+ const isTimeseriesBucketCompressionEnabled =
+ TimeseriesTest.timeseriesBucketCompressionEnabled(db);
+
const collNamePrefix = 'timeseries_bucket_limit_size_';
const timeFieldName = 'time';
// Assumes each bucket has a limit of 125kB on the measurements stored in the 'data' field.
const bucketMaxSizeKB = 125;
- const numDocs = 2;
+ const numDocs = 3;
- // The measurement data should not take up all of the 'bucketMaxSizeKB' limit because we need
- // to leave a little room for the _id and the time fields.
- const largeValue = 'x'.repeat((bucketMaxSizeKB - 1) * 1024);
+ // The measurement data should not take up all of the 'bucketMaxSizeKB' limit because we need to
+ // leave a little room for the _id and the time fields. We need to fit two measurements within
+ // this limit to trigger compression if enabled.
+ const largeValue = 'x'.repeat(((bucketMaxSizeKB - 1) / 2) * 1024);
const runTest = function(numDocsPerInsert) {
const coll = db.getCollection(collNamePrefix + numDocsPerInsert);
@@ -58,7 +62,7 @@ TimeseriesTest.run((insert) => {
assert.eq(2, bucketDocs.length, bucketDocs);
// Check both buckets.
- // First bucket should be full with one document since we spill the second document over
+ // First bucket should be full with two documents since we spill the third document over
// into the second bucket due to size constraints on 'data'.
assert.eq(0,
bucketDocs[0].control.min._id,
@@ -66,12 +70,15 @@ TimeseriesTest.run((insert) => {
assert.eq(largeValue,
bucketDocs[0].control.min.x,
'invalid control.min for x in first bucket: ' + tojson(bucketDocs[0].control));
- assert.eq(0,
+ assert.eq(1,
bucketDocs[0].control.max._id,
'invalid control.max for _id in first bucket: ' + tojson(bucketDocs[0].control));
assert.eq(largeValue,
bucketDocs[0].control.max.x,
'invalid control.max for x in first bucket: ' + tojson(bucketDocs[0].control));
+ assert.eq(isTimeseriesBucketCompressionEnabled ? 2 : 1,
+ bucketDocs[0].control.version,
+ 'unexpected control.version in first bucket: ' + tojson(bucketDocs));
// Second bucket should contain the remaining document.
assert.eq(numDocs - 1,
@@ -86,6 +93,9 @@ TimeseriesTest.run((insert) => {
assert.eq(largeValue,
bucketDocs[1].control.max.x,
'invalid control.max for x in second bucket: ' + tojson(bucketDocs[1].control));
+ assert.eq(1,
+ bucketDocs[1].control.version,
+ 'unexpected control.version in second bucket: ' + tojson(bucketDocs));
};
runTest(1);
diff --git a/jstests/core/timeseries/timeseries_bucket_limit_time_range.js b/jstests/core/timeseries/timeseries_bucket_limit_time_range.js
index a46c8f52bf6..e2ece34dbb5 100644
--- a/jstests/core/timeseries/timeseries_bucket_limit_time_range.js
+++ b/jstests/core/timeseries/timeseries_bucket_limit_time_range.js
@@ -13,13 +13,23 @@
load("jstests/core/timeseries/libs/timeseries.js");
TimeseriesTest.run((insert) => {
+ const isTimeseriesBucketCompressionEnabled =
+ TimeseriesTest.timeseriesBucketCompressionEnabled(db);
+
const collNamePrefix = 'timeseries_bucket_limit_time_range_';
const timeFieldName = 'time';
// Assumes the measurements in each bucket span at most one hour (based on the time field).
- const docTimes = [ISODate("2020-11-13T01:00:00Z"), ISODate("2020-11-13T03:00:00Z")];
- const numDocs = 2;
+ // Make sure we have three measurements to trigger compression if enabled. The data types in
+ // this test are so small that two measurements may not yield a smaller compressed object.
+ const docTimes = [
+ ISODate("2020-11-13T01:00:00Z"),
+ ISODate("2020-11-13T01:00:01Z"),
+ ISODate("2020-11-13T01:00:02Z"),
+ ISODate("2020-11-13T03:00:00Z")
+ ];
+ const numDocs = 4;
const runTest = function(numDocsPerInsert) {
const coll = db.getCollection(collNamePrefix + numDocsPerInsert);
@@ -67,15 +77,18 @@ TimeseriesTest.run((insert) => {
assert.eq(docTimes[0],
bucketDocs[0].control.min[timeFieldName],
'invalid control.min for time in first bucket: ' + tojson(bucketDocs[0].control));
- assert.eq(0,
+ assert.eq(2,
bucketDocs[0].control.max._id,
'invalid control.max for _id in first bucket: ' + tojson(bucketDocs[0].control));
- assert.eq(0,
+ assert.eq(2,
bucketDocs[0].control.max.x,
'invalid control.max for x in first bucket: ' + tojson(bucketDocs[0].control));
- assert.eq(docTimes[0],
+ assert.eq(docTimes[2],
bucketDocs[0].control.max[timeFieldName],
'invalid control.max for time in first bucket: ' + tojson(bucketDocs[0].control));
+ assert.eq(isTimeseriesBucketCompressionEnabled ? 2 : 1,
+ bucketDocs[0].control.version,
+ 'unexpected control.version in first bucket: ' + tojson(bucketDocs));
// Second bucket should contain the remaining document.
assert.eq(numDocs - 1,
@@ -98,6 +111,9 @@ TimeseriesTest.run((insert) => {
docTimes[numDocs - 1],
bucketDocs[1].control.max[timeFieldName],
'invalid control.max for time in second bucket: ' + tojson(bucketDocs[1].control));
+ assert.eq(1,
+ bucketDocs[1].control.version,
+ 'unexpected control.version in second bucket: ' + tojson(bucketDocs));
};
runTest(1);
diff --git a/jstests/core/timeseries/timeseries_idle_buckets.js b/jstests/core/timeseries/timeseries_idle_buckets.js
index b0d183b493a..dc402a7fadb 100644
--- a/jstests/core/timeseries/timeseries_idle_buckets.js
+++ b/jstests/core/timeseries/timeseries_idle_buckets.js
@@ -14,11 +14,15 @@
load("jstests/core/timeseries/libs/timeseries.js");
TimeseriesTest.run((insert) => {
+ const isTimeseriesBucketCompressionEnabled =
+ TimeseriesTest.timeseriesBucketCompressionEnabled(db);
+
const coll = db.timeseries_idle_buckets;
const bucketsColl = db.getCollection('system.buckets.' + coll.getName());
const timeFieldName = 'time';
const metaFieldName = 'meta';
+ const valueFieldName = 'value';
coll.drop();
assert.commandWorked(db.createCollection(
@@ -30,18 +34,44 @@ TimeseriesTest.run((insert) => {
const numDocs = 100;
const metaValue = 'a'.repeat(1024 * 1024);
for (let i = 0; i < numDocs; i++) {
- assert.commandWorked(insert(
- coll, {[timeFieldName]: ISODate(), [metaFieldName]: {[i.toString()]: metaValue}}));
+ // Insert a few measurements into the bucket to make sure compression is triggered if
+ // enabled.
+ assert.commandWorked(insert(coll, {
+ [timeFieldName]: ISODate(),
+ [metaFieldName]: {[i.toString()]: metaValue},
+ [valueFieldName]: 0
+ }));
+ assert.commandWorked(insert(coll, {
+ [timeFieldName]: ISODate(),
+ [metaFieldName]: {[i.toString()]: metaValue},
+ [valueFieldName]: 1
+ }));
+ assert.commandWorked(insert(coll, {
+ [timeFieldName]: ISODate(),
+ [metaFieldName]: {[i.toString()]: metaValue},
+ [valueFieldName]: 3
+ }));
}
// Insert a document with the metadata of a bucket which should have been expired. Thus, a new
// bucket will be created.
- assert.commandWorked(
- insert(coll, {[timeFieldName]: ISODate(), [metaFieldName]: {0: metaValue}}));
- let bucketDocs = bucketsColl.find({meta: {0: metaValue}}).toArray();
+ assert.commandWorked(insert(
+ coll, {[timeFieldName]: ISODate(), [metaFieldName]: {0: metaValue}, [valueFieldName]: 3}));
+
+ // Check buckets.
+ let bucketDocs =
+ bucketsColl.find({meta: {0: metaValue}}).sort({'control.min._id': 1}).toArray();
assert.eq(
bucketDocs.length, 2, 'Invalid number of buckets for metadata 0: ' + tojson(bucketDocs));
+ // If bucket compression is enabled, the expired bucket should have been compressed.
+ assert.eq(isTimeseriesBucketCompressionEnabled ? 2 : 1,
+ bucketDocs[0].control.version,
+ 'unexpected control.version in first bucket: ' + tojson(bucketDocs));
+ assert.eq(1,
+ bucketDocs[1].control.version,
+ 'unexpected control.version in second bucket: ' + tojson(bucketDocs));
+
// Insert a document with the metadata of a bucket which should still be open. Thus, the existing
// bucket will be used.
assert.commandWorked(
@@ -51,5 +81,8 @@ TimeseriesTest.run((insert) => {
bucketDocs.length,
1,
'Invalid number of buckets for metadata ' + (numDocs - 1) + ': ' + tojson(bucketDocs));
+ assert.eq(1,
+ bucketDocs[0].control.version,
+ 'unexpected control.version in bucket: ' + tojson(bucketDocs));
});
})();
diff --git a/src/mongo/bson/util/bsoncolumnbuilder.cpp b/src/mongo/bson/util/bsoncolumnbuilder.cpp
index 1d7ba9334c4..c7dee5223cb 100644
--- a/src/mongo/bson/util/bsoncolumnbuilder.cpp
+++ b/src/mongo/bson/util/bsoncolumnbuilder.cpp
@@ -60,13 +60,18 @@ std::pair<int64_t, uint8_t> scaleAndEncodeDouble(double value, uint8_t minScaleI
} // namespace
BSONColumnBuilder::BSONColumnBuilder(StringData fieldName)
+ : BSONColumnBuilder(fieldName, BufBuilder()) {}
+
+BSONColumnBuilder::BSONColumnBuilder(StringData fieldName, BufBuilder&& builder)
: _simple8bBuilder64(_createBufferWriter()),
_simple8bBuilder128(_createBufferWriter()),
_scaleIndex(Simple8bTypeUtil::kMemoryAsInteger),
+ _bufBuilder(std::move(builder)),
_fieldName(fieldName) {
// Leave space for element count at the beginning
static_assert(sizeof(_elementCount) == kElementCountBytes,
"Element count for BSONColumn should be 4 bytes");
+ _bufBuilder.reset();
_bufBuilder.skip(kElementCountBytes);
// Store EOO type with empty field name as previous.
_storePrevious(BSONElement());
@@ -370,6 +375,11 @@ BSONBinData BSONColumnBuilder::finalize() {
return {_bufBuilder.buf(), _bufBuilder.len(), BinDataType::Column};
}
+BufBuilder BSONColumnBuilder::detach() {
+ return std::move(_bufBuilder);
+}
+
+
void BSONColumnBuilder::_storePrevious(BSONElement elem) {
auto valuesize = elem.valuesize();
diff --git a/src/mongo/bson/util/bsoncolumnbuilder.h b/src/mongo/bson/util/bsoncolumnbuilder.h
index 449a08e9a2c..036f30cccf9 100644
--- a/src/mongo/bson/util/bsoncolumnbuilder.h
+++ b/src/mongo/bson/util/bsoncolumnbuilder.h
@@ -45,6 +45,7 @@ namespace mongo {
class BSONColumnBuilder {
public:
BSONColumnBuilder(StringData fieldName);
+ BSONColumnBuilder(StringData fieldName, BufBuilder&& builder);
BSONColumnBuilder(BSONColumnBuilder&&) = delete;
/**
@@ -84,6 +85,12 @@ public:
*/
BSONBinData finalize();
+ /**
+ * Detaches the buffer associated with this BSONColumnBuilder. Allows the memory to be reused
+ * for building another BSONColumn.
+ */
+ BufBuilder detach();
+
private:
BSONElement _previous() const;
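A minimal caller-side sketch (not part of this patch) of the buffer-reuse pattern that the new BufBuilder&& constructor and detach() enable. The 'columns' container, 'dataBuilder' output and function name are assumptions for illustration; only the BSONColumnBuilder calls themselves are taken from the patch.
#include <string>
#include <utility>
#include <vector>
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/util/bsoncolumnbuilder.h"
namespace mongo {
// Compresses several columns while recycling a single BufBuilder allocation.
void appendCompressedColumns(
    const std::vector<std::pair<std::string, std::vector<BSONElement>>>& columns,
    BSONObjBuilder* dataBuilder) {
    BufBuilder recycled;  // reused across BSONColumnBuilder instances
    for (const auto& [fieldName, elements] : columns) {
        BSONColumnBuilder column(fieldName, std::move(recycled));
        for (const auto& elem : elements) {
            column.append(elem);
        }
        // finalize() returns a view into the builder's buffer, so copy the data
        // out (via append) before detaching the buffer for reuse.
        BSONBinData binData = column.finalize();
        dataBuilder->append(fieldName, binData);
        recycled = column.detach();
    }
}
}  // namespace mongo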
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index 712dd0d573e..3d72dd2dbd7 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -363,6 +363,7 @@ env.Library(
'$BUILD_DIR/mongo/db/storage/storage_engine_common',
"$BUILD_DIR/mongo/db/storage/two_phase_index_build_knobs_idl",
'$BUILD_DIR/mongo/db/timeseries/bucket_catalog',
+ '$BUILD_DIR/mongo/db/timeseries/bucket_compression',
'$BUILD_DIR/mongo/db/timeseries/catalog_helper',
'$BUILD_DIR/mongo/db/timeseries/timeseries_commands_conversion_helper',
'$BUILD_DIR/mongo/db/timeseries/timeseries_index_schema_conversion_functions',
diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp
index d2b41047053..ceafad09b8a 100644
--- a/src/mongo/db/commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands.cpp
@@ -27,6 +27,8 @@
* it in the license file.
*/
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault
+
#include "mongo/base/checked_cast.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/mutable/document.h"
@@ -65,12 +67,14 @@
#include "mongo/db/storage/duplicate_key_error_info.h"
#include "mongo/db/storage/storage_parameters_gen.h"
#include "mongo/db/timeseries/bucket_catalog.h"
+#include "mongo/db/timeseries/bucket_compression.h"
#include "mongo/db/timeseries/timeseries_constants.h"
#include "mongo/db/timeseries/timeseries_index_schema_conversion_functions.h"
#include "mongo/db/timeseries/timeseries_update_delete_util.h"
#include "mongo/db/transaction_participant.h"
#include "mongo/db/views/view_catalog.h"
#include "mongo/db/write_concern.h"
+#include "mongo/logv2/log.h"
#include "mongo/logv2/redaction.h"
#include "mongo/s/stale_exception.h"
#include "mongo/util/fail_point.h"
@@ -209,6 +213,20 @@ write_ops::UpdateOpEntry makeTimeseriesUpdateOpEntry(
}
/**
+ * Returns an update op entry that applies the given compression transform function to an existing bucket.
+ */
+write_ops::UpdateOpEntry makeTimeseriesCompressionOpEntry(
+ OperationContext* opCtx,
+ const OID& bucketId,
+ write_ops::UpdateModification::TransformFunc compressionFunc) {
+ write_ops::UpdateModification u(std::move(compressionFunc));
+ write_ops::UpdateOpEntry update(BSON("_id" << bucketId), std::move(u));
+ invariant(!update.getMulti(), bucketId.toString());
+ invariant(!update.getUpsert(), bucketId.toString());
+ return update;
+}
+
+/**
* Returns the document for inserting a new bucket.
*/
BSONObj makeTimeseriesInsertDocument(std::shared_ptr<BucketCatalog::WriteBatch> batch,
@@ -611,6 +629,27 @@ public:
return op;
}
+ write_ops::UpdateCommandRequest _makeTimeseriesCompressionOp(
+ OperationContext* opCtx,
+ const OID& bucketId,
+ write_ops::UpdateModification::TransformFunc compressionFunc) const {
+ write_ops::UpdateCommandRequest op(
+ makeTimeseriesBucketsNamespace(ns()),
+ {makeTimeseriesCompressionOpEntry(opCtx, bucketId, std::move(compressionFunc))});
+
+ write_ops::WriteCommandRequestBase base;
+ // The schema validation configured in the bucket collection is intended for direct
+ // operations by end users and is not applicable here.
+ base.setBypassDocumentValidation(true);
+
+ // The time-series compression operation is not a user operation and should not use a
+ // statement id from any user op. Set to kUninitializedStmtId to bypass.
+ base.setStmtIds(std::vector<StmtId>{kUninitializedStmtId});
+
+ op.setWriteCommandRequestBase(std::move(base));
+ return op;
+ }
+
StatusWith<SingleWriteResult> _performTimeseriesInsert(
OperationContext* opCtx,
std::shared_ptr<BucketCatalog::WriteBatch> batch,
@@ -641,6 +680,40 @@ public:
OperationSource::kTimeseriesInsert));
}
+ StatusWith<SingleWriteResult> _performTimeseriesBucketCompression(
+ OperationContext* opCtx, const BucketCatalog::ClosedBucket& closedBucket) const {
+ if (!feature_flags::gTimeseriesBucketCompression.isEnabled(
+ serverGlobalParams.featureCompatibility)) {
+ return SingleWriteResult();
+ }
+
+ // Buckets with just a single measurement are not worth compressing.
+ if (closedBucket.numMeasurements <= 1) {
+ return SingleWriteResult();
+ }
+
+ auto bucketCompressionFunc =
+ [&closedBucket](const BSONObj& bucketDoc) -> boost::optional<BSONObj> {
+ auto compressed = timeseries::compressBucket(bucketDoc, closedBucket.timeField);
+ // If compressed object size is larger than uncompressed, skip compression update.
+ if (compressed && compressed->objsize() > bucketDoc.objsize()) {
+ LOGV2_DEBUG(5857802,
+ 1,
+ "Skipping time-series bucket compression, compressed object is "
+ "larger than original",
+ "originalSize"_attr = bucketDoc.objsize(),
+ "compressedSize"_attr = compressed->objsize());
+ return boost::none;
+ }
+ return compressed;
+ };
+
+ return _getTimeseriesSingleWriteResult(write_ops_exec::performUpdates(
+ opCtx,
+ _makeTimeseriesCompressionOp(opCtx, closedBucket.bucketId, bucketCompressionFunc),
+ OperationSource::kStandard));
+ }
+
void _commitTimeseriesBucket(OperationContext* opCtx,
std::shared_ptr<BucketCatalog::WriteBatch> batch,
size_t start,
@@ -705,8 +778,19 @@ public:
getOpTimeAndElectionId(opCtx, opTime, electionId);
- bucketCatalog.finish(batch, BucketCatalog::CommitInfo{*opTime, *electionId});
+ auto closedBucket =
+ bucketCatalog.finish(batch, BucketCatalog::CommitInfo{*opTime, *electionId});
+
batchGuard.dismiss();
+
+ if (closedBucket) {
+ // If this write closed a bucket, compress the bucket
+ auto result = _performTimeseriesBucketCompression(opCtx, *closedBucket);
+ if (auto error = generateError(opCtx, result, start + index, errors->size())) {
+ errors->push_back(*error);
+ return;
+ }
+ }
}
bool _commitTimeseriesBucketsAtomically(OperationContext* opCtx,
@@ -769,21 +853,38 @@ public:
getOpTimeAndElectionId(opCtx, opTime, electionId);
+ bool compressClosedBuckets = true;
for (auto batch : batchesToCommit) {
- bucketCatalog.finish(batch, BucketCatalog::CommitInfo{*opTime, *electionId});
+ auto closedBucket =
+ bucketCatalog.finish(batch, BucketCatalog::CommitInfo{*opTime, *electionId});
batch.get().reset();
+
+ if (!closedBucket || !compressClosedBuckets) {
+ continue;
+ }
+
+ // If this write closed a bucket, compress the bucket
+ auto ret = _performTimeseriesBucketCompression(opCtx, *closedBucket);
+ if (!ret.isOK()) {
+ // Don't try to compress any other buckets if we fail. We're not allowed to do
+ // more write operations.
+ compressClosedBuckets = false;
+ }
}
return true;
}
- std::tuple<TimeseriesBatches, TimeseriesStmtIds, size_t> _insertIntoBucketCatalog(
- OperationContext* opCtx,
- size_t start,
- size_t numDocs,
- const std::vector<size_t>& indices,
- std::vector<BSONObj>* errors,
- bool* containsRetry) const {
+ std::tuple<TimeseriesBatches,
+ TimeseriesStmtIds,
+ size_t /* numInserted */,
+ bool /* canContinue */>
+ _insertIntoBucketCatalog(OperationContext* opCtx,
+ size_t start,
+ size_t numDocs,
+ const std::vector<size_t>& indices,
+ std::vector<BSONObj>* errors,
+ bool* containsRetry) const {
auto& bucketCatalog = BucketCatalog::get(opCtx);
auto bucketsNs = makeTimeseriesBucketsNamespace(ns());
@@ -800,6 +901,7 @@ public:
TimeseriesBatches batches;
TimeseriesStmtIds stmtIds;
+ bool canContinue = true;
auto insert = [&](size_t index) {
invariant(start + index < request().getDocuments().size());
@@ -828,13 +930,32 @@ public:
errors->push_back(*error);
return false;
} else {
- const auto& batch = result.getValue();
+ const auto& batch = result.getValue().batch;
batches.emplace_back(batch, index);
if (isTimeseriesWriteRetryable(opCtx)) {
stmtIds[batch->bucket()].push_back(stmtId);
}
}
+ // If this insert closed any buckets, rewrite them to the compressed column format. If we
+ // cannot perform write operations at this point, the buckets will be left uncompressed.
+ for (const auto& closedBucket : result.getValue().closedBuckets) {
+ if (!canContinue) {
+ break;
+ }
+
+ // If this write closed a bucket, compress the bucket
+ auto ret = _performTimeseriesBucketCompression(opCtx, closedBucket);
+ if (auto error = generateError(opCtx, ret, start + index, errors->size())) {
+ // Bucket compression only fails when we may not attempt any other write
+ // operation, i.e. when handleError() inside write_ops_exec.cpp returns
+ // false.
+ errors->push_back(*error);
+ canContinue = false;
+ return false;
+ }
+ }
+
return true;
};
@@ -843,12 +964,15 @@ public:
} else {
for (size_t i = 0; i < numDocs; i++) {
if (!insert(i) && request().getOrdered()) {
- return {std::move(batches), std::move(stmtIds), i};
+ return {std::move(batches), std::move(stmtIds), i, canContinue};
}
}
}
- return {std::move(batches), std::move(stmtIds), request().getDocuments().size()};
+ return {std::move(batches),
+ std::move(stmtIds),
+ request().getDocuments().size(),
+ canContinue};
}
void _getTimeseriesBatchResults(OperationContext* opCtx,
@@ -889,8 +1013,13 @@ public:
boost::optional<repl::OpTime>* opTime,
boost::optional<OID>* electionId,
bool* containsRetry) const {
- auto [batches, stmtIds, numInserted] = _insertIntoBucketCatalog(
+ auto [batches, stmtIds, numInserted, canContinue] = _insertIntoBucketCatalog(
opCtx, 0, request().getDocuments().size(), {}, errors, containsRetry);
+ if (!canContinue) {
+ // If we are not allowed to continue with any write operation return true here to
+ // prevent the ordered inserts from being retried one by one.
+ return true;
+ }
hangTimeseriesInsertBeforeCommit.pauseWhileSet();
@@ -942,13 +1071,17 @@ public:
boost::optional<repl::OpTime>* opTime,
boost::optional<OID>* electionId,
bool* containsRetry) const {
- auto [batches, bucketStmtIds, _] =
+ auto [batches, bucketStmtIds, _, canContinue] =
_insertIntoBucketCatalog(opCtx, start, numDocs, indices, errors, containsRetry);
hangTimeseriesInsertBeforeCommit.pauseWhileSet();
std::vector<size_t> docsToRetry;
+ if (!canContinue) {
+ return docsToRetry;
+ }
+
for (auto& [batch, index] : batches) {
if (batch->claimCommitRights()) {
auto stmtIds = isTimeseriesWriteRetryable(opCtx)
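For reference, the interface that the new timeseries::compressBucket() is inferred to expose from its call sites above (and in bucket_unpacker_test.cpp below); the actual declaration lives in bucket_compression.h, which is listed in the diffstat but not shown in this view.
#include <boost/optional.hpp>
#include "mongo/base/string_data.h"
#include "mongo/bson/bsonobj.h"
namespace mongo {
namespace timeseries {
// Inferred signature (assumption): compresses an uncompressed (control.version = 1)
// bucket document into the compressed (control.version = 2) format, returning
// boost::none if the bucket could not be compressed.
boost::optional<BSONObj> compressBucket(const BSONObj& bucketDoc, StringData timeFieldName);
}  // namespace timeseries
}  // namespace mongo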
diff --git a/src/mongo/db/exec/SConscript b/src/mongo/db/exec/SConscript
index 963d12c5372..88655a990db 100644
--- a/src/mongo/db/exec/SConscript
+++ b/src/mongo/db/exec/SConscript
@@ -151,7 +151,7 @@ env.CppUnitTest(
],
LIBDEPS=[
"$BUILD_DIR/mongo/base",
- "$BUILD_DIR/mongo/bson/util/bson_column",
+ "$BUILD_DIR/mongo/bson/util/bson_column",
"$BUILD_DIR/mongo/db/auth/authmocks",
"$BUILD_DIR/mongo/db/query/collation/collator_factory_mock",
"$BUILD_DIR/mongo/db/query/collation/collator_interface_mock",
@@ -159,6 +159,7 @@ env.CppUnitTest(
"$BUILD_DIR/mongo/db/query_exec",
"$BUILD_DIR/mongo/db/record_id_helpers",
"$BUILD_DIR/mongo/db/service_context_d_test_fixture",
+ "$BUILD_DIR/mongo/db/timeseries/bucket_compression",
"$BUILD_DIR/mongo/dbtests/mocklib",
"$BUILD_DIR/mongo/util/clock_source_mock",
"document_value/document_value",
diff --git a/src/mongo/db/exec/bucket_unpacker_test.cpp b/src/mongo/db/exec/bucket_unpacker_test.cpp
index fcb285dc94b..85e2bfb45d5 100644
--- a/src/mongo/db/exec/bucket_unpacker_test.cpp
+++ b/src/mongo/db/exec/bucket_unpacker_test.cpp
@@ -30,9 +30,9 @@
#include "mongo/platform/basic.h"
#include "mongo/bson/json.h"
-#include "mongo/bson/util/bsoncolumnbuilder.h"
#include "mongo/db/exec/bucket_unpacker.h"
#include "mongo/db/exec/document_value/document_value_test_util.h"
+#include "mongo/db/timeseries/bucket_compression.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -102,65 +102,6 @@ public:
BSONObj obj = root.obj();
return {obj, "time"_sd};
}
-
- // Simple bucket compressor. Does not handle data fields out of order and does not sort fields
- // on time.
- // TODO (SERVER-58578): Replace with real bucket compressor
- BSONObj compress(BSONObj uncompressed, StringData timeField) {
- // Rewrite data fields as columns.
- BSONObjBuilder builder;
- for (auto& elem : uncompressed) {
- if (elem.fieldNameStringData() == "control") {
- BSONObjBuilder control(builder.subobjStart("control"));
-
- // Set right version, leave other control fields unchanged
- for (const auto& controlField : elem.Obj()) {
- if (controlField.fieldNameStringData() == "version") {
- control.append("version", 2);
- } else {
- control.append(controlField);
- }
- }
-
- continue;
- }
- if (elem.fieldNameStringData() != "data") {
- // Non-data fields can be unchanged.
- builder.append(elem);
- continue;
- }
-
- BSONObjBuilder dataBuilder = builder.subobjStart("data");
- std::list<BSONColumnBuilder> columnBuilders;
- size_t numTimeFields = 0;
- for (auto& column : elem.Obj()) {
- // Compress all data fields
- columnBuilders.emplace_back(column.fieldNameStringData());
- auto& columnBuilder = columnBuilders.back();
-
- for (auto& measurement : column.Obj()) {
- int index = std::atoi(measurement.fieldName());
- for (int i = columnBuilder.size(); i < index; ++i) {
- columnBuilder.skip();
- }
- columnBuilder.append(measurement);
- }
- if (columnBuilder.fieldName() == timeField) {
- numTimeFields = columnBuilder.size();
- }
- }
-
- for (auto& builder : columnBuilders) {
- for (size_t i = builder.size(); i < numTimeFields; ++i) {
- builder.skip();
- }
-
- BSONBinData binData = builder.finalize();
- dataBuilder.append(builder.fieldName(), binData);
- }
- }
- return builder.obj();
- }
};
TEST_F(BucketUnpackerTest, UnpackBasicIncludeAllMeasurementFields) {
@@ -212,7 +153,7 @@ TEST_F(BucketUnpackerTest, ExcludeASingleField) {
};
test(bucket);
- test(compress(bucket, "time"_sd));
+ test(*timeseries::compressBucket(bucket, "time"_sd));
}
TEST_F(BucketUnpackerTest, EmptyIncludeGetsEmptyMeasurements) {
@@ -238,7 +179,7 @@ TEST_F(BucketUnpackerTest, EmptyIncludeGetsEmptyMeasurements) {
};
test(bucket);
- test(compress(bucket, "time"_sd));
+ test(*timeseries::compressBucket(bucket, "time"_sd));
}
TEST_F(BucketUnpackerTest, EmptyExcludeMaterializesAllFields) {
@@ -266,7 +207,7 @@ TEST_F(BucketUnpackerTest, EmptyExcludeMaterializesAllFields) {
};
test(bucket);
- test(compress(bucket, "time"_sd));
+ test(*timeseries::compressBucket(bucket, "time"_sd));
}
TEST_F(BucketUnpackerTest, SparseColumnsWhereOneColumnIsExhaustedBeforeTheOther) {
@@ -292,7 +233,7 @@ TEST_F(BucketUnpackerTest, SparseColumnsWhereOneColumnIsExhaustedBeforeTheOther)
};
test(bucket);
- test(compress(bucket, "time"_sd));
+ test(*timeseries::compressBucket(bucket, "time"_sd));
}
TEST_F(BucketUnpackerTest, UnpackBasicIncludeWithDollarPrefix) {
@@ -321,7 +262,7 @@ TEST_F(BucketUnpackerTest, UnpackBasicIncludeWithDollarPrefix) {
};
test(bucket);
- test(compress(bucket, "time"_sd));
+ test(*timeseries::compressBucket(bucket, "time"_sd));
}
TEST_F(BucketUnpackerTest, BucketsWithMetadataOnly) {
@@ -346,7 +287,7 @@ TEST_F(BucketUnpackerTest, BucketsWithMetadataOnly) {
};
test(bucket);
- test(compress(bucket, "time"_sd));
+ test(*timeseries::compressBucket(bucket, "time"_sd));
}
TEST_F(BucketUnpackerTest, UnorderedRowKeysDoesntAffectMaterialization) {
@@ -404,7 +345,7 @@ TEST_F(BucketUnpackerTest, MissingMetaFieldDoesntMaterializeMetadata) {
};
test(bucket);
- test(compress(bucket, "time"_sd));
+ test(*timeseries::compressBucket(bucket, "time"_sd));
}
TEST_F(BucketUnpackerTest, MissingMetaFieldDoesntMaterializeMetadataUnorderedKeys) {
@@ -459,7 +400,7 @@ TEST_F(BucketUnpackerTest, ExcludedMetaFieldDoesntMaterializeMetadataWhenBucketH
};
test(bucket);
- test(compress(bucket, "time"_sd));
+ test(*timeseries::compressBucket(bucket, "time"_sd));
}
TEST_F(BucketUnpackerTest, UnpackerResetThrowsOnUndefinedMeta) {
@@ -478,7 +419,7 @@ TEST_F(BucketUnpackerTest, UnpackerResetThrowsOnUndefinedMeta) {
};
test(bucket);
- test(compress(bucket, "time"_sd));
+ test(*timeseries::compressBucket(bucket, "time"_sd));
}
TEST_F(BucketUnpackerTest, UnpackerResetThrowsOnUnexpectedMeta) {
@@ -498,7 +439,7 @@ TEST_F(BucketUnpackerTest, UnpackerResetThrowsOnUnexpectedMeta) {
};
test(bucket);
- test(compress(bucket, "time"_sd));
+ test(*timeseries::compressBucket(bucket, "time"_sd));
}
TEST_F(BucketUnpackerTest, NullMetaInBucketMaterializesAsNull) {
@@ -525,7 +466,7 @@ TEST_F(BucketUnpackerTest, NullMetaInBucketMaterializesAsNull) {
};
test(bucket);
- test(compress(bucket, "time"_sd));
+ test(*timeseries::compressBucket(bucket, "time"_sd));
}
TEST_F(BucketUnpackerTest, GetNextHandlesMissingMetaInBucket) {
@@ -557,7 +498,7 @@ TEST_F(BucketUnpackerTest, GetNextHandlesMissingMetaInBucket) {
};
test(bucket);
- test(compress(bucket, "time"_sd));
+ test(*timeseries::compressBucket(bucket, "time"_sd));
}
TEST_F(BucketUnpackerTest, EmptyDataRegionInBucketIsTolerated) {
@@ -576,7 +517,6 @@ TEST_F(BucketUnpackerTest, EmptyDataRegionInBucketIsTolerated) {
};
test(bucket);
- test(compress(bucket, "time"_sd));
}
TEST_F(BucketUnpackerTest, UnpackerResetThrowsOnEmptyBucket) {
diff --git a/src/mongo/db/ops/write_ops.cpp b/src/mongo/db/ops/write_ops.cpp
index 5c60f26fb6c..ea387ae920c 100644
--- a/src/mongo/db/ops/write_ops.cpp
+++ b/src/mongo/db/ops/write_ops.cpp
@@ -247,6 +247,9 @@ write_ops::UpdateModification write_ops::UpdateModification::parseFromOplogEntry
write_ops::UpdateModification::UpdateModification(doc_diff::Diff diff, DiffOptions options)
: _update(DeltaUpdate{std::move(diff), options}) {}
+write_ops::UpdateModification::UpdateModification(TransformFunc transform)
+ : _update(TransformUpdate{std::move(transform)}) {}
+
write_ops::UpdateModification::UpdateModification(BSONElement update) {
const auto type = update.type();
if (type == BSONType::Object) {
@@ -297,7 +300,8 @@ int write_ops::UpdateModification::objsize() const {
return size + kWriteCommandBSONArrayPerElementOverheadBytes;
},
- [](const DeltaUpdate& delta) -> int { return delta.diff.objsize(); }},
+ [](const DeltaUpdate& delta) -> int { return delta.diff.objsize(); },
+ [](const TransformUpdate& transform) -> int { return 0; }},
_update);
}
@@ -307,7 +311,8 @@ write_ops::UpdateModification::Type write_ops::UpdateModification::type() const
visit_helper::Overloaded{
[](const ClassicUpdate& classic) { return Type::kClassic; },
[](const PipelineUpdate& pipelineUpdate) { return Type::kPipeline; },
- [](const DeltaUpdate& delta) { return Type::kDelta; }},
+ [](const DeltaUpdate& delta) { return Type::kDelta; },
+ [](const TransformUpdate& transform) { return Type::kTransform; }},
_update);
}
@@ -328,7 +333,8 @@ void write_ops::UpdateModification::serializeToBSON(StringData fieldName,
}
arrayBuilder.doneFast();
},
- [fieldName, bob](const DeltaUpdate& delta) { *bob << fieldName << delta.diff; }},
+ [fieldName, bob](const DeltaUpdate& delta) { *bob << fieldName << delta.diff; },
+ [](const TransformUpdate& transform) {}},
_update);
}
diff --git a/src/mongo/db/ops/write_ops_parsers.h b/src/mongo/db/ops/write_ops_parsers.h
index 788ae259722..4a70bfdc0da 100644
--- a/src/mongo/db/ops/write_ops_parsers.h
+++ b/src/mongo/db/ops/write_ops_parsers.h
@@ -79,7 +79,8 @@ repl::OpTime opTimeParser(BSONElement elem);
class UpdateModification {
public:
- enum class Type { kClassic, kPipeline, kDelta };
+ enum class Type { kClassic, kPipeline, kDelta, kTransform };
+ using TransformFunc = std::function<boost::optional<BSONObj>(const BSONObj&)>;
/**
* Used to indicate that a certain type of update is being passed to the constructor.
@@ -105,6 +106,8 @@ public:
UpdateModification(BSONElement update);
UpdateModification(std::vector<BSONObj> pipeline);
UpdateModification(doc_diff::Diff, DiffOptions);
+ // Creates a transform-style update. The transform function MUST preserve the _id element.
+ UpdateModification(TransformFunc transform);
// This constructor exists only to provide a fast-path for constructing classic-style updates.
UpdateModification(const BSONObj& update, ClassicTag);
@@ -141,6 +144,11 @@ public:
return stdx::get<DeltaUpdate>(_update).diff;
}
+ const TransformFunc& getTransform() const {
+ invariant(type() == Type::kTransform);
+ return stdx::get<TransformUpdate>(_update).transform;
+ }
+
bool mustCheckExistenceForInsertOperations() const {
invariant(type() == Type::kDelta);
return stdx::get<DeltaUpdate>(_update).options.mustCheckExistenceForInsertOperations;
@@ -149,19 +157,19 @@ public:
std::string toString() const {
StringBuilder sb;
- stdx::visit(visit_helper::Overloaded{[&sb](const ClassicUpdate& classic) {
- sb << "{type: Classic, update: " << classic.bson
- << "}";
- },
- [&sb](const PipelineUpdate& pipeline) {
- sb << "{type: Pipeline, update: "
- << Value(pipeline).toString() << "}";
- },
- [&sb](const DeltaUpdate& delta) {
- sb << "{type: Delta, update: " << delta.diff
- << "}";
- }},
- _update);
+ stdx::visit(
+ visit_helper::Overloaded{
+ [&sb](const ClassicUpdate& classic) {
+ sb << "{type: Classic, update: " << classic.bson << "}";
+ },
+ [&sb](const PipelineUpdate& pipeline) {
+ sb << "{type: Pipeline, update: " << Value(pipeline).toString() << "}";
+ },
+ [&sb](const DeltaUpdate& delta) {
+ sb << "{type: Delta, update: " << delta.diff << "}";
+ },
+ [&sb](const TransformUpdate& transform) { sb << "{type: Transform}"; }},
+ _update);
return sb.str();
}
@@ -176,7 +184,10 @@ private:
doc_diff::Diff diff;
DiffOptions options;
};
- stdx::variant<ClassicUpdate, PipelineUpdate, DeltaUpdate> _update;
+ struct TransformUpdate {
+ TransformFunc transform;
+ };
+ stdx::variant<ClassicUpdate, PipelineUpdate, DeltaUpdate, TransformUpdate> _update;
};
} // namespace write_ops
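A minimal sketch, assuming the contract described above, of how a kTransform update would be consumed on the write path. The real consumer (object_transform_executor.cpp) is listed in the diffstat but not shown in this view; applyTransformUpdate is a hypothetical helper, not part of the patch.
#include <boost/optional.hpp>
#include "mongo/bson/bsonobj.h"
#include "mongo/db/ops/write_ops_parsers.h"
#include "mongo/util/assert_util.h"
namespace mongo {
// Applies a kTransform update to the pre-image document. The transform must
// preserve the _id element; returning boost::none makes the update a no-op.
boost::optional<BSONObj> applyTransformUpdate(const write_ops::UpdateModification& mod,
                                              const BSONObj& preImage) {
    invariant(mod.type() == write_ops::UpdateModification::Type::kTransform);
    return mod.getTransform()(preImage);
}
}  // namespace mongo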
diff --git a/src/mongo/db/timeseries/SConscript b/src/mongo/db/timeseries/SConscript
index 18ae4a3885b..505e7ba6cec 100644
--- a/src/mongo/db/timeseries/SConscript
+++ b/src/mongo/db/timeseries/SConscript
@@ -34,6 +34,19 @@ env.Library(
)
env.Library(
+ target='bucket_compression',
+ source=[
+ 'bucket_compression.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/bson/util/bson_column',
+ ]
+)
+
+env.Library(
target='timeseries_index_schema_conversion_functions',
source=[
'timeseries_index_schema_conversion_functions.cpp',
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index 2780942ce78..3b5da81a3bb 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -177,7 +177,7 @@ BSONObj BucketCatalog::getMetadata(Bucket* ptr) const {
return bucket->_metadata.toBSON();
}
-StatusWith<std::shared_ptr<BucketCatalog::WriteBatch>> BucketCatalog::insert(
+StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert(
OperationContext* opCtx,
const NamespaceString& ns,
const StringData::ComparatorInterface* comparator,
@@ -204,7 +204,8 @@ StatusWith<std::shared_ptr<BucketCatalog::WriteBatch>> BucketCatalog::insert(
auto time = timeElem.Date();
- BucketAccess bucket{this, key, options, stats.get(), time};
+ ClosedBuckets closedBuckets;
+ BucketAccess bucket{this, key, options, stats.get(), &closedBuckets, time};
invariant(bucket);
NewFieldNames newFieldNamesToBeInserted;
@@ -239,7 +240,8 @@ StatusWith<std::shared_ptr<BucketCatalog::WriteBatch>> BucketCatalog::insert(
};
if (!bucket->_ns.isEmpty() && isBucketFull(&bucket)) {
- bucket.rollover(isBucketFull);
+ bucket.rollover(isBucketFull, &closedBuckets);
+
bucket->_calculateBucketFieldsAndSizeChange(doc,
options.getMetaField(),
&newFieldNamesToBeInserted,
@@ -273,7 +275,7 @@ StatusWith<std::shared_ptr<BucketCatalog::WriteBatch>> BucketCatalog::insert(
}
_memoryUsage.fetchAndAdd(bucket->_memoryUsage);
- return batch;
+ return InsertResult{batch, closedBuckets};
}
bool BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) {
@@ -302,10 +304,13 @@ bool BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) {
return true;
}
-void BucketCatalog::finish(std::shared_ptr<WriteBatch> batch, const CommitInfo& info) {
+boost::optional<BucketCatalog::ClosedBucket> BucketCatalog::finish(
+ std::shared_ptr<WriteBatch> batch, const CommitInfo& info) {
invariant(!batch->finished());
invariant(!batch->active());
+ boost::optional<ClosedBucket> closedBucket;
+
Bucket* ptr(batch->bucket());
batch->_finish(info);
@@ -346,6 +351,9 @@ void BucketCatalog::finish(std::shared_ptr<WriteBatch> batch, const CommitInfo&
bucket.release();
auto lk = _lockExclusive();
+ closedBucket =
+ ClosedBucket{ptr->_id, ptr->getTimeField().toString(), ptr->numMeasurements()};
+
// Only remove from _allBuckets and _idleBuckets. If it was marked full, we know that
// happened in BucketAccess::rollover, and that there is already a new open bucket for
// this metadata.
@@ -359,6 +367,7 @@ void BucketCatalog::finish(std::shared_ptr<WriteBatch> batch, const CommitInfo&
_markBucketIdle(bucket);
}
}
+ return closedBucket;
}
void BucketCatalog::abort(std::shared_ptr<WriteBatch> batch,
@@ -580,18 +589,25 @@ void BucketCatalog::_verifyBucketIsUnused(Bucket* bucket) const {
stdx::lock_guard<Mutex> lk{bucket->_mutex};
}
-void BucketCatalog::_expireIdleBuckets(ExecutionStats* stats) {
+void BucketCatalog::_expireIdleBuckets(ExecutionStats* stats,
+ std::vector<BucketCatalog::ClosedBucket>* closedBuckets) {
// Must hold an exclusive lock on _bucketMutex from outside.
stdx::lock_guard lk{_idleMutex};
- // As long as we still need space and have entries, close idle buckets.
+ // As long as we still need space and have entries and remaining attempts, close idle buckets.
+ int32_t numClosed = 0;
while (!_idleBuckets.empty() &&
_memoryUsage.load() >
- static_cast<std::uint64_t>(gTimeseriesIdleBucketExpiryMemoryUsageThreshold)) {
+ static_cast<std::uint64_t>(gTimeseriesIdleBucketExpiryMemoryUsageThreshold) &&
+ numClosed <= gTimeseriesIdleBucketExpiryMaxCountPerAttempt) {
Bucket* bucket = _idleBuckets.back();
_verifyBucketIsUnused(bucket);
+ ClosedBucket closed{
+ bucket->id(), bucket->getTimeField().toString(), bucket->numMeasurements()};
if (_removeBucket(bucket, true /* expiringBuckets */)) {
stats->numBucketsClosedDueToMemoryThreshold.fetchAndAddRelaxed(1);
+ closedBuckets->push_back(closed);
+ ++numClosed;
}
}
}
@@ -605,8 +621,9 @@ BucketCatalog::Bucket* BucketCatalog::_allocateBucket(const BucketKey& key,
const Date_t& time,
const TimeseriesOptions& options,
ExecutionStats* stats,
+ ClosedBuckets* closedBuckets,
bool openedDuetoMetadata) {
- _expireIdleBuckets(stats);
+ _expireIdleBuckets(stats, closedBuckets);
auto [it, inserted] = _allBuckets.insert(std::make_unique<Bucket>());
Bucket* bucket = it->get();
@@ -652,6 +669,7 @@ void BucketCatalog::_setIdTimestamp(Bucket* bucket,
auto roundedTime = timeseries::roundTimestampToGranularity(time, options.getGranularity());
auto const roundedSeconds = durationCount<Seconds>(roundedTime.toDurationSinceEpoch());
bucket->_id.setTimestamp(roundedSeconds);
+ bucket->_timeField = options.getTimeField().toString();
// Make sure we set the control.min time field to match the rounded _id timestamp.
auto controlDoc = buildControlMinTimestampDoc(options.getTimeField(), roundedTime);
@@ -760,6 +778,10 @@ const OID& BucketCatalog::Bucket::id() const {
return _id;
}
+StringData BucketCatalog::Bucket::getTimeField() {
+ return _timeField;
+}
+
void BucketCatalog::Bucket::_calculateBucketFieldsAndSizeChange(
const BSONObj& doc,
boost::optional<StringData> metaField,
@@ -806,6 +828,10 @@ bool BucketCatalog::Bucket::allCommitted() const {
return _batches.empty() && !_preparedBatch;
}
+uint32_t BucketCatalog::Bucket::numMeasurements() const {
+ return _numMeasurements;
+}
+
std::shared_ptr<BucketCatalog::WriteBatch> BucketCatalog::Bucket::_activeBatch(
OperationId opId, const std::shared_ptr<ExecutionStats>& stats) {
auto it = _batches.find(opId);
@@ -819,6 +845,7 @@ BucketCatalog::BucketAccess::BucketAccess(BucketCatalog* catalog,
BucketKey& key,
const TimeseriesOptions& options,
ExecutionStats* stats,
+ ClosedBuckets* closedBuckets,
const Date_t& time)
: _catalog(catalog), _key(&key), _options(&options), _stats(stats), _time(&time) {
@@ -875,7 +902,7 @@ BucketCatalog::BucketAccess::BucketAccess(BucketCatalog* catalog,
: key.withCopiedMetadata(BSONObj());
hashedKey.key = &originalBucketKey;
auto lk = _catalog->_lockExclusive();
- _findOrCreateOpenBucketThenLock(hashedNormalizedKey, hashedKey);
+ _findOrCreateOpenBucketThenLock(hashedNormalizedKey, hashedKey, closedBuckets);
}
BucketCatalog::BucketAccess::BucketAccess(BucketCatalog* catalog,
@@ -977,11 +1004,13 @@ BucketCatalog::BucketState BucketCatalog::BucketAccess::_confirmStateForAcquired
}
void BucketCatalog::BucketAccess::_findOrCreateOpenBucketThenLock(
- const HashedBucketKey& normalizedKey, const HashedBucketKey& nonNormalizedKey) {
+ const HashedBucketKey& normalizedKey,
+ const HashedBucketKey& nonNormalizedKey,
+ ClosedBuckets* closedBuckets) {
auto it = _catalog->_openBuckets.find(normalizedKey);
if (it == _catalog->_openBuckets.end()) {
// No open bucket for this metadata.
- _create(normalizedKey, nonNormalizedKey);
+ _create(normalizedKey, nonNormalizedKey, closedBuckets);
return;
}
@@ -995,7 +1024,7 @@ void BucketCatalog::BucketAccess::_findOrCreateOpenBucketThenLock(
}
_catalog->_abort(_guard, _bucket, nullptr, boost::none);
- _create(normalizedKey, nonNormalizedKey);
+ _create(normalizedKey, nonNormalizedKey, closedBuckets);
}
void BucketCatalog::BucketAccess::_acquire() {
@@ -1005,10 +1034,11 @@ void BucketCatalog::BucketAccess::_acquire() {
void BucketCatalog::BucketAccess::_create(const HashedBucketKey& normalizedKey,
const HashedBucketKey& nonNormalizedKey,
+ ClosedBuckets* closedBuckets,
bool openedDuetoMetadata) {
invariant(_options);
- _bucket =
- _catalog->_allocateBucket(normalizedKey, *_time, *_options, _stats, openedDuetoMetadata);
+ _bucket = _catalog->_allocateBucket(
+ normalizedKey, *_time, *_options, _stats, closedBuckets, openedDuetoMetadata);
_catalog->_openBuckets[nonNormalizedKey] = _bucket;
_bucket->_nonNormalizedKeyMetadatas.push_back(nonNormalizedKey.key->metadata.toBSON());
_acquire();
@@ -1037,7 +1067,8 @@ BucketCatalog::BucketAccess::operator BucketCatalog::Bucket*() const {
return _bucket;
}
-void BucketCatalog::BucketAccess::rollover(const std::function<bool(BucketAccess*)>& isBucketFull) {
+void BucketCatalog::BucketAccess::rollover(const std::function<bool(BucketAccess*)>& isBucketFull,
+ ClosedBuckets* closedBuckets) {
invariant(isLocked());
invariant(_key);
invariant(_time);
@@ -1054,7 +1085,7 @@ void BucketCatalog::BucketAccess::rollover(const std::function<bool(BucketAccess
auto hashedKey = BucketHasher{}.hashed_key(prevBucketKey);
auto lk = _catalog->_lockExclusive();
- _findOrCreateOpenBucketThenLock(hashedNormalizedKey, hashedKey);
+ _findOrCreateOpenBucketThenLock(hashedNormalizedKey, hashedKey, closedBuckets);
// Recheck if still full now that we've reacquired the bucket.
bool sameBucket =
@@ -1064,6 +1095,9 @@ void BucketCatalog::BucketAccess::rollover(const std::function<bool(BucketAccess
if (_bucket->allCommitted()) {
// The bucket does not contain any measurements that are yet to be committed, so we can
// remove it now. Otherwise, we must keep the bucket around until it is committed.
+ closedBuckets->push_back(ClosedBucket{
+ _bucket->id(), _bucket->getTimeField().toString(), _bucket->numMeasurements()});
+
oldBucket = _bucket;
release();
bool removed = _catalog->_removeBucket(oldBucket, false /* expiringBuckets */);
@@ -1077,7 +1111,7 @@ void BucketCatalog::BucketAccess::rollover(const std::function<bool(BucketAccess
release();
}
- _create(hashedNormalizedKey, hashedKey, false /* openedDueToMetadata */);
+ _create(hashedNormalizedKey, hashedKey, closedBuckets, false /* openedDueToMetadata */);
}
}
diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h
index 44c59a9ede1..2c2b1994443 100644
--- a/src/mongo/db/timeseries/bucket_catalog.h
+++ b/src/mongo/db/timeseries/bucket_catalog.h
@@ -66,6 +66,16 @@ public:
};
/**
+ * Information about a bucket that was closed while performing an operation on this BucketCatalog.
+ */
+ struct ClosedBucket {
+ OID bucketId;
+ std::string timeField;
+ uint32_t numMeasurements;
+ };
+ using ClosedBuckets = std::vector<ClosedBucket>;
+
+ /**
* The basic unit of work for a bucket. Each insert will return a shared_ptr to a WriteBatch.
* When a writer is finished with all their insertions, they should then take steps to ensure
* each batch they wrote into is committed. To ensure a batch is committed, a writer should
@@ -164,6 +174,15 @@ public:
SharedPromise<CommitInfo> _promise;
};
+ /**
+ * Return type for the insert function.
+ * See comment above insert() for more information.
+ */
+ struct InsertResult {
+ std::shared_ptr<WriteBatch> batch;
+ ClosedBuckets closedBuckets;
+ };
+
static BucketCatalog& get(ServiceContext* svcCtx);
static BucketCatalog& get(OperationContext* opCtx);
@@ -183,17 +202,16 @@ public:
BSONObj getMetadata(Bucket* bucket) const;
/**
- * Returns the WriteBatch into which the document was inserted. Any caller who receives the same
- * batch may commit or abort the batch after claiming commit rights. See WriteBatch for more
- * details.
+ * Returns the WriteBatch into which the document was inserted and information about any
+ * buckets that were closed as a side effect. Any caller who receives the same batch may
+ * commit or abort the batch after claiming commit rights. See WriteBatch for more details.
*/
- StatusWith<std::shared_ptr<WriteBatch>> insert(
- OperationContext* opCtx,
- const NamespaceString& ns,
- const StringData::ComparatorInterface* comparator,
- const TimeseriesOptions& options,
- const BSONObj& doc,
- CombineWithInsertsFromOtherClients combine);
+ StatusWith<InsertResult> insert(OperationContext* opCtx,
+ const NamespaceString& ns,
+ const StringData::ComparatorInterface* comparator,
+ const TimeseriesOptions& options,
+ const BSONObj& doc,
+ CombineWithInsertsFromOtherClients combine);
/**
* Prepares a batch for commit, transitioning it to an inactive state. Caller must already have
@@ -205,8 +223,10 @@ public:
/**
* Records the result of a batch commit. Caller must already have commit rights on batch, and
* batch must have been previously prepared.
+ *
+ * Returns information about the bucket if it was closed as a result of this commit.
*/
- void finish(std::shared_ptr<WriteBatch> batch, const CommitInfo& info);
+ boost::optional<ClosedBucket> finish(std::shared_ptr<WriteBatch> batch, const CommitInfo& info);
/**
* Aborts the given write batch and any other outstanding batches on the same bucket. Caller
@@ -342,10 +362,20 @@ public:
const OID& id() const;
/**
+ * Returns the time field name for the underlying bucket.
+ */
+ StringData getTimeField();
+
+ /**
* Returns whether all measurements have been committed.
*/
bool allCommitted() const;
+ /**
+ * Returns the total number of measurements in the bucket.
+ */
+ uint32_t numMeasurements() const;
+
private:
/**
* Determines the effect of adding 'doc' to this bucket. If adding 'doc' causes this bucket
@@ -389,6 +419,9 @@ public:
// Top-level field names of the measurements that have been inserted into the bucket.
StringSet _fieldNames;
+ // Time field for the measurements that have been inserted into the bucket.
+ std::string _timeField;
+
// The minimum and maximum values for each field in the bucket.
timeseries::MinMax _minmax;
@@ -549,6 +582,7 @@ private:
BucketKey& key,
const TimeseriesOptions& options,
ExecutionStats* stats,
+ ClosedBuckets* closedBuckets,
const Date_t& time);
BucketAccess(BucketCatalog* catalog,
Bucket* bucket,
@@ -568,8 +602,11 @@ private:
* Parameter is a function which should check that the bucket is indeed still full after
* reacquiring the necessary locks. The first parameter will give the function access to
* this BucketAccess instance, with the bucket locked.
+ *
+ * Appends information about any bucket that was closed as a result to 'closedBuckets'.
*/
- void rollover(const std::function<bool(BucketAccess*)>& isBucketFull);
+ void rollover(const std::function<bool(BucketAccess*)>& isBucketFull,
+ ClosedBuckets* closedBuckets);
// Retrieve the time associated with the bucket (id)
Date_t getTime() const;
@@ -607,7 +644,8 @@ private:
// Helper to find an open bucket for the given metadata if it exists, create it if it
// doesn't, and lock it. Requires an exclusive lock on the catalog.
void _findOrCreateOpenBucketThenLock(const HashedBucketKey& normalizedKey,
- const HashedBucketKey& key);
+ const HashedBucketKey& key,
+ ClosedBuckets* closedBuckets);
// Lock _bucket.
void _acquire();
@@ -616,6 +654,7 @@ private:
// a lock on it.
void _create(const HashedBucketKey& normalizedKey,
const HashedBucketKey& key,
+ ClosedBuckets* closedBuckets,
bool openedDuetoMetadata = true);
BucketCatalog* _catalog;
@@ -675,7 +714,7 @@ private:
/**
* Expires idle buckets until the bucket catalog's memory usage is below the expiry threshold.
*/
- void _expireIdleBuckets(ExecutionStats* stats);
+ void _expireIdleBuckets(ExecutionStats* stats, ClosedBuckets* closedBuckets);
std::size_t _numberOfIdleBuckets() const;
@@ -684,6 +723,7 @@ private:
const Date_t& time,
const TimeseriesOptions& options,
ExecutionStats* stats,
+ ClosedBuckets* closedBuckets,
bool openedDuetoMetadata);
std::shared_ptr<ExecutionStats> _getExecutionStats(const NamespaceString& ns);
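A minimal caller-side sketch, assuming the API above, of how the new InsertResult and the optional ClosedBucket returned by finish() are consumed. It mirrors the flow in write_commands.cpp in simplified form; 'compressClosedBucket', 'commitInfo' and the function name are stand-ins for the caller's own logic and are not part of the patch.
#include <functional>
#include "mongo/db/timeseries/bucket_catalog.h"
namespace mongo {
Status insertOneMeasurement(
    OperationContext* opCtx,
    BucketCatalog& bucketCatalog,
    const NamespaceString& ns,
    const StringData::ComparatorInterface* comparator,
    const TimeseriesOptions& options,
    const BSONObj& doc,
    const BucketCatalog::CommitInfo& commitInfo,
    const std::function<void(const BucketCatalog::ClosedBucket&)>& compressClosedBucket) {
    auto swResult = bucketCatalog.insert(
        opCtx, ns, comparator, options, doc,
        BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
    if (!swResult.isOK()) {
        return swResult.getStatus();
    }
    auto& [batch, closedBuckets] = swResult.getValue();
    for (const auto& closed : closedBuckets) {
        compressClosedBucket(closed);  // buckets closed as a side effect of the insert
    }
    if (batch->claimCommitRights()) {
        bucketCatalog.prepareCommit(batch);
        // ... perform the actual write of the batch here (error handling elided) ...
        if (auto closed = bucketCatalog.finish(batch, commitInfo)) {
            compressClosedBucket(*closed);  // the bucket this batch filled up
        }
    }
    return Status::OK();
}
}  // namespace mongo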
diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp
index 8658b6807dd..02f056ff629 100644
--- a/src/mongo/db/timeseries/bucket_catalog_test.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp
@@ -168,7 +168,7 @@ void BucketCatalogTest::_insertOneAndCommit(const NamespaceString& ns,
_getTimeseriesOptions(ns),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
- auto& batch = result.getValue();
+ auto& batch = result.getValue().batch;
_commit(batch, numPreviouslyCommittedMeasurements);
}
@@ -187,7 +187,7 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucket) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
- auto batch1 = result1.getValue();
+ auto batch1 = result1.getValue().batch;
ASSERT(batch1->claimCommitRights());
ASSERT(batch1->active());
@@ -200,7 +200,7 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucket) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
- auto batch2 = result2.getValue();
+ auto batch2 = result2.getValue().batch;
ASSERT_EQ(batch1, batch2);
ASSERT(!batch2->claimCommitRights());
@@ -233,7 +233,8 @@ TEST_F(BucketCatalogTest, GetMetadataReturnsEmptyDocOnMissingBucket) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
- .getValue();
+ .getValue()
+ .batch;
ASSERT(batch->claimCommitRights());
auto bucket = batch->bucket();
_bucketCatalog->abort(batch);
@@ -264,20 +265,21 @@ TEST_F(BucketCatalogTest, InsertIntoDifferentBuckets) {
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
// Inserts should all be into three distinct buckets (and therefore batches).
- ASSERT_NE(result1.getValue(), result2.getValue());
- ASSERT_NE(result1.getValue(), result3.getValue());
- ASSERT_NE(result2.getValue(), result3.getValue());
+ ASSERT_NE(result1.getValue().batch, result2.getValue().batch);
+ ASSERT_NE(result1.getValue().batch, result3.getValue().batch);
+ ASSERT_NE(result2.getValue().batch, result3.getValue().batch);
// Check metadata in buckets.
ASSERT_BSONOBJ_EQ(BSON(_metaField << "123"),
- _bucketCatalog->getMetadata(result1.getValue()->bucket()));
+ _bucketCatalog->getMetadata(result1.getValue().batch->bucket()));
ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONObj()),
- _bucketCatalog->getMetadata(result2.getValue()->bucket()));
- ASSERT(_bucketCatalog->getMetadata(result3.getValue()->bucket()).isEmpty());
+ _bucketCatalog->getMetadata(result2.getValue().batch->bucket()));
+ ASSERT(_bucketCatalog->getMetadata(result3.getValue().batch->bucket()).isEmpty());
// Committing one bucket should only return the one document in that bucket and should not
// affect the other bucket.
- for (const auto& batch : {result1.getValue(), result2.getValue(), result3.getValue()}) {
+ for (const auto& batch :
+ {result1.getValue().batch, result2.getValue().batch, result3.getValue().batch}) {
_commit(batch, 0);
}
}
@@ -298,13 +300,13 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketArray) {
BSON(_timeField << Date_t::now() << _metaField << BSON_ARRAY(BSON("b" << 1 << "a" << 0))),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
- ASSERT_EQ(result1.getValue(), result2.getValue());
+ ASSERT_EQ(result1.getValue().batch, result2.getValue().batch);
// Check metadata in buckets.
ASSERT_BSONOBJ_EQ(BSON(_metaField << BSON_ARRAY(BSON("a" << 0 << "b" << 1))),
- _bucketCatalog->getMetadata(result1.getValue()->bucket()));
+ _bucketCatalog->getMetadata(result1.getValue().batch->bucket()));
ASSERT_BSONOBJ_EQ(BSON(_metaField << BSON_ARRAY(BSON("a" << 0 << "b" << 1))),
- _bucketCatalog->getMetadata(result2.getValue()->bucket()));
+ _bucketCatalog->getMetadata(result2.getValue().batch->bucket()));
}
TEST_F(BucketCatalogTest, InsertIntoSameBucketObjArray) {
@@ -327,17 +329,17 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketObjArray) {
<< BSON("g" << 0 << "f" << 1))))),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
- ASSERT_EQ(result1.getValue(), result2.getValue());
+ ASSERT_EQ(result1.getValue().batch, result2.getValue().batch);
// Check metadata in buckets.
ASSERT_BSONOBJ_EQ(
BSON(_metaField << BSONObj(BSON(
"c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1) << BSON("f" << 1 << "g" << 0))))),
- _bucketCatalog->getMetadata(result1.getValue()->bucket()));
+ _bucketCatalog->getMetadata(result1.getValue().batch->bucket()));
ASSERT_BSONOBJ_EQ(
BSON(_metaField << BSONObj(BSON(
"c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1) << BSON("f" << 1 << "g" << 0))))),
- _bucketCatalog->getMetadata(result2.getValue()->bucket()));
+ _bucketCatalog->getMetadata(result2.getValue().batch->bucket()));
}
@@ -363,17 +365,17 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketNestedArray) {
<< "456"))))),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
- ASSERT_EQ(result1.getValue(), result2.getValue());
+ ASSERT_EQ(result1.getValue().batch, result2.getValue().batch);
// Check metadata in buckets.
ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONObj(BSON("c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1)
<< BSON_ARRAY("123"
<< "456"))))),
- _bucketCatalog->getMetadata(result1.getValue()->bucket()));
+ _bucketCatalog->getMetadata(result1.getValue().batch->bucket()));
ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONObj(BSON("c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1)
<< BSON_ARRAY("123"
<< "456"))))),
- _bucketCatalog->getMetadata(result2.getValue()->bucket()));
+ _bucketCatalog->getMetadata(result2.getValue().batch->bucket()));
}
TEST_F(BucketCatalogTest, InsertNullAndMissingMetaFieldIntoDifferentBuckets) {
@@ -393,16 +395,16 @@ TEST_F(BucketCatalogTest, InsertNullAndMissingMetaFieldIntoDifferentBuckets) {
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
// Inserts should all be into three distinct buckets (and therefore batches).
- ASSERT_NE(result1.getValue(), result2.getValue());
+ ASSERT_NE(result1.getValue().batch, result2.getValue().batch);
// Check metadata in buckets.
ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONNULL),
- _bucketCatalog->getMetadata(result1.getValue()->bucket()));
- ASSERT(_bucketCatalog->getMetadata(result2.getValue()->bucket()).isEmpty());
+ _bucketCatalog->getMetadata(result1.getValue().batch->bucket()));
+ ASSERT(_bucketCatalog->getMetadata(result2.getValue().batch->bucket()).isEmpty());
// Committing one bucket should only return the one document in that bucket and should not
// affect the other bucket.
- for (const auto& batch : {result1.getValue(), result2.getValue()}) {
+ for (const auto& batch : {result1.getValue().batch, result2.getValue().batch}) {
_commit(batch, 0);
}
}
@@ -444,7 +446,8 @@ TEST_F(BucketCatalogTest, InsertBetweenPrepareAndFinish) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
- .getValue();
+ .getValue()
+ .batch;
ASSERT(batch1->claimCommitRights());
_bucketCatalog->prepareCommit(batch1);
ASSERT_EQ(batch1->measurements().size(), 1);
@@ -458,7 +461,8 @@ TEST_F(BucketCatalogTest, InsertBetweenPrepareAndFinish) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
- .getValue();
+ .getValue()
+ .batch;
ASSERT_NE(batch1, batch2);
_bucketCatalog->finish(batch1, {});
@@ -475,7 +479,7 @@ DEATH_TEST_F(BucketCatalogTest, CannotCommitWithoutRights, "invariant") {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
- auto& batch = result.getValue();
+ auto& batch = result.getValue().batch;
_bucketCatalog->prepareCommit(batch);
}
@@ -486,7 +490,7 @@ DEATH_TEST_F(BucketCatalogTest, CannotFinishUnpreparedBatch, "invariant") {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
- auto& batch = result.getValue();
+ auto& batch = result.getValue().batch;
ASSERT(batch->claimCommitRights());
_bucketCatalog->finish(batch, {});
}
@@ -499,7 +503,8 @@ TEST_F(BucketCatalogWithoutMetadataTest, GetMetadataReturnsEmptyDoc) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
- .getValue();
+ .getValue()
+ .batch;
ASSERT_BSONOBJ_EQ(BSONObj(), _bucketCatalog->getMetadata(batch->bucket()));
@@ -514,8 +519,8 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now() << "a" << 0),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
- ASSERT_OK(result);
- auto batch = result.getValue();
+ ASSERT(result.isOK());
+ auto batch = result.getValue().batch;
auto oldId = batch->bucket()->id();
_commit(batch, 0);
ASSERT_EQ(2U, batch->newFieldNamesToBeInserted().size()) << batch->toBSON();
@@ -530,8 +535,8 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now() << "a" << 1),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
- ASSERT_OK(result);
- batch = result.getValue();
+ ASSERT(result.isOK());
+ batch = result.getValue().batch;
_commit(batch, 1);
ASSERT_EQ(0U, batch->newFieldNamesToBeInserted().size()) << batch->toBSON();
@@ -542,8 +547,8 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now() << "a" << 2 << "b" << 2),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
- ASSERT_OK(result);
- batch = result.getValue();
+ ASSERT(result.isOK());
+ batch = result.getValue().batch;
_commit(batch, 2);
ASSERT_EQ(1U, batch->newFieldNamesToBeInserted().size()) << batch->toBSON();
ASSERT(batch->newFieldNamesToBeInserted().count("b")) << batch->toBSON();
@@ -556,8 +561,8 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now() << "a" << i),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
- ASSERT_OK(result);
- batch = result.getValue();
+ ASSERT(result.isOK());
+ batch = result.getValue().batch;
_commit(batch, i);
ASSERT_EQ(0U, batch->newFieldNamesToBeInserted().size()) << i << ":" << batch->toBSON();
}
@@ -571,7 +576,7 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now() << "a" << gTimeseriesBucketMaxCount),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
- auto& batch2 = result2.getValue();
+ auto& batch2 = result2.getValue().batch;
ASSERT_NE(oldId, batch2->bucket()->id());
_commit(batch2, 0);
ASSERT_EQ(2U, batch2->newFieldNamesToBeInserted().size()) << batch2->toBSON();
@@ -587,7 +592,8 @@ TEST_F(BucketCatalogTest, AbortBatchOnBucketWithPreparedCommit) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
- .getValue();
+ .getValue()
+ .batch;
ASSERT(batch1->claimCommitRights());
_bucketCatalog->prepareCommit(batch1);
ASSERT_EQ(batch1->measurements().size(), 1);
@@ -601,7 +607,8 @@ TEST_F(BucketCatalogTest, AbortBatchOnBucketWithPreparedCommit) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
- .getValue();
+ .getValue()
+ .batch;
ASSERT_NE(batch1, batch2);
ASSERT(batch2->claimCommitRights());
@@ -622,7 +629,8 @@ TEST_F(BucketCatalogTest, ClearNamespaceWithConcurrentWrites) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
- .getValue();
+ .getValue()
+ .batch;
ASSERT(batch->claimCommitRights());
_bucketCatalog->clear(_ns1);
@@ -639,7 +647,8 @@ TEST_F(BucketCatalogTest, ClearNamespaceWithConcurrentWrites) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
- .getValue();
+ .getValue()
+ .batch;
ASSERT(batch->claimCommitRights());
_bucketCatalog->prepareCommit(batch);
ASSERT_EQ(batch->measurements().size(), 1);
@@ -666,7 +675,8 @@ TEST_F(BucketCatalogTest, ClearBucketWithPreparedBatchThrowsConflict) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
- .getValue();
+ .getValue()
+ .batch;
ASSERT(batch->claimCommitRights());
_bucketCatalog->prepareCommit(batch);
ASSERT_EQ(batch->measurements().size(), 1);
@@ -687,7 +697,8 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
- .getValue();
+ .getValue()
+ .batch;
ASSERT(batch1->claimCommitRights());
_bucketCatalog->prepareCommit(batch1);
ASSERT_EQ(batch1->measurements().size(), 1);
@@ -701,7 +712,8 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
- .getValue();
+ .getValue()
+ .batch;
ASSERT_NE(batch1, batch2);
ASSERT_EQ(batch1->bucket(), batch2->bucket());
@@ -726,7 +738,8 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
- .getValue();
+ .getValue()
+ .batch;
ASSERT_NE(batch1, batch3);
ASSERT_NE(batch2, batch3);
ASSERT_NE(batch1->bucket(), batch3->bucket());
@@ -748,7 +761,8 @@ TEST_F(BucketCatalogTest, PrepareCommitOnAlreadyAbortedBatch) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
- .getValue();
+ .getValue()
+ .batch;
ASSERT(batch->claimCommitRights());
_bucketCatalog->abort(batch);
@@ -769,7 +783,8 @@ TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
- .getValue();
+ .getValue()
+ .batch;
auto batch2 = _bucketCatalog
->insert(_makeOperationContext().second.get(),
@@ -778,7 +793,8 @@ TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
- .getValue();
+ .getValue()
+ .batch;
auto batch3 = _bucketCatalog
->insert(_makeOperationContext().second.get(),
@@ -787,7 +803,8 @@ TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
- .getValue();
+ .getValue()
+ .batch;
auto batch4 = _bucketCatalog
->insert(_makeOperationContext().second.get(),
@@ -796,7 +813,8 @@ TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
- .getValue();
+ .getValue()
+ .batch;
ASSERT_NE(batch1, batch2);
ASSERT_NE(batch1, batch3);
@@ -816,7 +834,8 @@ TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
- .getValue();
+ .getValue()
+ .batch;
auto batch2 = _bucketCatalog
->insert(_makeOperationContext().second.get(),
@@ -825,7 +844,8 @@ TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
- .getValue();
+ .getValue()
+ .batch;
ASSERT(batch1->claimCommitRights());
ASSERT(batch2->claimCommitRights());
@@ -852,7 +872,8 @@ TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
- .getValue();
+ .getValue()
+ .batch;
auto batch2 = _bucketCatalog
->insert(_makeOperationContext().second.get(),
@@ -861,7 +882,8 @@ TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) {
_getTimeseriesOptions(_ns1),
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
- .getValue();
+ .getValue()
+ .batch;
// Batch 2 is the first batch to commit the time field.
ASSERT(batch2->claimCommitRights());
diff --git a/src/mongo/db/timeseries/bucket_compression.cpp b/src/mongo/db/timeseries/bucket_compression.cpp
new file mode 100644
index 00000000000..06f65f5252e
--- /dev/null
+++ b/src/mongo/db/timeseries/bucket_compression.cpp
@@ -0,0 +1,203 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage
+
+#include "mongo/db/timeseries/bucket_compression.h"
+
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/bson/util/bsoncolumnbuilder.h"
+#include "mongo/db/timeseries/timeseries_constants.h"
+#include "mongo/logv2/log.h"
+#include "mongo/util/assert_util.h"
+
+namespace mongo {
+
+namespace timeseries {
+
+boost::optional<BSONObj> compressBucket(const BSONObj& bucketDoc, StringData timeFieldName) try {
+ // Helper for uncompressed measurements
+ struct Measurement {
+ BSONElement timeField;
+ std::vector<BSONElement> dataFields;
+ };
+
+ BSONObjBuilder builder; // builder to build the compressed bucket
+ std::vector<Measurement> measurements; // Extracted measurements from uncompressed bucket
+ boost::optional<BSONObjIterator> time; // Iterator to read time fields from uncompressed bucket
+ std::vector<std::pair<StringData, BSONObjIterator>>
+ columns; // Iterators to read data fields from uncompressed bucket
+
+ // Read everything from the uncompressed bucket
+ for (auto& elem : bucketDoc) {
+ // Control field is left as-is except for the version field.
+ if (elem.fieldNameStringData() == kBucketControlFieldName) {
+ BSONObjBuilder control(builder.subobjStart(kBucketControlFieldName));
+
+            // Set the right version, leave other control fields unchanged
+ bool versionSet = false;
+ for (const auto& controlField : elem.Obj()) {
+ if (controlField.fieldNameStringData() == kBucketControlVersionFieldName) {
+ control.append(kBucketControlVersionFieldName,
+ kTimeseriesControlCompressedVersion);
+ versionSet = true;
+ } else {
+ control.append(controlField);
+ }
+ }
+
+ // Set version if it was missing from uncompressed bucket
+ if (!versionSet) {
+ control.append(kBucketControlVersionFieldName, kTimeseriesControlCompressedVersion);
+ }
+
+ continue;
+ }
+
+ // Everything that's not under data or control is left as-is
+ if (elem.fieldNameStringData() != kBucketDataFieldName) {
+            // Copy the field through unchanged.
+ builder.append(elem);
+ continue;
+ }
+
+        // Set up iterators to read all fields under data in lock-step
+ for (auto& columnObj : elem.Obj()) {
+ if (columnObj.fieldNameStringData() == timeFieldName) {
+ time.emplace(columnObj.Obj());
+ } else {
+ columns.emplace_back(columnObj.fieldNameStringData(), columnObj.Obj());
+ }
+ }
+ }
+
+    // If the provided time field doesn't exist, there is nothing to do
+ if (!time) {
+ return boost::none;
+ }
+
+ // Read all measurements from bucket
+ while (time->more()) {
+ // Get and advance the time iterator
+ auto timeElement = time->next();
+
+        // Get BSONElements for all data fields. Missing data fields are represented as EOO.
+ Measurement measurement;
+ measurement.timeField = timeElement;
+ measurement.dataFields.resize(columns.size());
+
+ // Read one element from each data field iterator
+ for (size_t i = 0; i < columns.size(); ++i) {
+ auto& column = columns[i].second;
+            // If we reach the end there is nothing more to do; all subsequent elements will be
+            // left as EOO/missing.
+ if (!column.more()) {
+ continue;
+ }
+
+            // Check that the field name matches the name of the time field. Field names are
+            // strings of integer indexes, "0", "1" etc. Data fields may have missing entries
+            // where the time field does not, so we can rely on this and do a simple string
+            // compare against the time field. If it does not match, the data field is missing
+            // for this measurement and the iterator is positioned at an element with a higher
+            // index; we then leave the data field as EOO and do not advance the data iterator.
+ auto elem = *column;
+ if (timeElement.fieldNameStringData() == elem.fieldNameStringData()) {
+ // Extract the element and advance the iterator
+ measurement.dataFields[i] = elem;
+ column.advance(elem);
+ }
+ }
+
+ measurements.push_back(std::move(measurement));
+ }
+
+    // Verify that all data iterators have been exhausted. If they have not, something is wrong
+    // with the bucket and we have not read everything; we cannot compress as that would lose
+    // user data. This can happen if the bucket contains unordered keys in its data fields, e.g.
+    // {"0": ..., "2": ..., "1": ...}, or if there are more data entries than time entries.
+ if (std::any_of(columns.begin(), columns.end(), [](const auto& entry) {
+ return entry.second.more();
+ })) {
+ LOGV2_DEBUG(5857801,
+ 1,
+ "Failed to parse timeseries bucket during compression, leaving uncompressed");
+ return boost::none;
+ }
+
+    // Sort all the measurements in time order.
+ std::sort(measurements.begin(),
+ measurements.end(),
+ [](const Measurement& lhs, const Measurement& rhs) {
+ return lhs.timeField.timestamp() < rhs.timeField.timestamp();
+ });
+
+ // Last, compress elements and build compressed bucket
+ {
+ BSONObjBuilder dataBuilder = builder.subobjStart(kBucketDataFieldName);
+ BufBuilder columnBuffer; // Reusable buffer to avoid extra allocs per column.
+
+ // Add compressed time field first
+ {
+ BSONColumnBuilder timeColumn(timeFieldName, std::move(columnBuffer));
+ for (const auto& measurement : measurements) {
+ timeColumn.append(measurement.timeField);
+ }
+ dataBuilder.append(timeFieldName, timeColumn.finalize());
+ columnBuffer = timeColumn.detach();
+ }
+
+ // Then add compressed data fields.
+ for (size_t i = 0; i < columns.size(); ++i) {
+ BSONColumnBuilder column(columns[i].first, std::move(columnBuffer));
+ for (const auto& measurement : measurements) {
+ if (auto elem = measurement.dataFields[i]) {
+ column.append(elem);
+ } else {
+ column.skip();
+ }
+ }
+ dataBuilder.append(column.fieldName(), column.finalize());
+ columnBuffer = column.detach();
+ }
+ }
+
+ return builder.obj();
+} catch (...) {
+ // Skip compression if we encounter any exception
+ LOGV2_DEBUG(5857800,
+ 1,
+ "Exception when compressing timeseries bucket, leaving it uncompressed",
+ "error"_attr = exceptionToStatus());
+ return boost::none;
+}
+
+} // namespace timeseries
+} // namespace mongo
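
The column-building loop above recycles one BufBuilder across every BSONColumnBuilder: the
builder takes ownership of the buffer via std::move at construction and hands it back through
detach() once finalize() has been called. A minimal sketch of that pattern, not part of the
patch, assuming the mongo namespace, the bsoncolumnbuilder.h include, and a hypothetical
per-column input container:

// Sketch only: 'columnsToCompress' maps a field name to its elements in measurement order,
// with EOO marking a missing measurement. The helper name and container are made up.
void appendCompressedColumns(
    BSONObjBuilder& dataBuilder,
    const std::vector<std::pair<StringData, std::vector<BSONElement>>>& columnsToCompress) {
    BufBuilder reusable;  // recycled so each column does not pay for its own allocation
    for (const auto& [fieldName, elements] : columnsToCompress) {
        BSONColumnBuilder column(fieldName, std::move(reusable));
        for (const auto& elem : elements) {
            if (elem) {
                column.append(elem);  // measurement present for this column
            } else {
                column.skip();  // EOO: measurement missing for this column
            }
        }
        dataBuilder.append(column.fieldName(), column.finalize());
        reusable = column.detach();  // reclaim the internal buffer for the next column
    }
}
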
diff --git a/src/mongo/db/timeseries/bucket_compression.h b/src/mongo/db/timeseries/bucket_compression.h
new file mode 100644
index 00000000000..f28360bf786
--- /dev/null
+++ b/src/mongo/db/timeseries/bucket_compression.h
@@ -0,0 +1,50 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/namespace_string.h"
+
+namespace mongo {
+
+namespace timeseries {
+
+/**
+ * Returns a compressed timeseries bucket in v2 format for a given uncompressed v1 bucket and time
+ * field. The compressed bucket will have all measurements sorted by time.
+ *
+ * If bucket compression is not possible for any reason, boost::none is returned.
+ */
+boost::optional<BSONObj> compressBucket(const BSONObj& bucketDoc, StringData timeFieldName);
+
+} // namespace timeseries
+} // namespace mongo
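
A usage sketch for the new API, not part of the patch: a caller holds an uncompressed v1 bucket
document and wants the v2 form when compression succeeds, falling back to the original
otherwise. The helper name is made up; compressBucket() has the signature declared above and is
assumed to be called from within the mongo namespace.

// Sketch only, assuming the bucket_compression.h include above.
BSONObj compressOrKeep(const BSONObj& v1Bucket, StringData timeFieldName) {
    // compressBucket() returns boost::none whenever the bucket cannot be compressed safely,
    // e.g. when a data field has unordered keys; keep the v1 bucket as-is in that case.
    if (auto compressed = timeseries::compressBucket(v1Bucket, timeFieldName)) {
        return *compressed;  // control.version is now the compressed (v2) version
    }
    return v1Bucket;
}
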
diff --git a/src/mongo/db/timeseries/timeseries.idl b/src/mongo/db/timeseries/timeseries.idl
index c97e9cf1887..ba01d4e7121 100644
--- a/src/mongo/db/timeseries/timeseries.idl
+++ b/src/mongo/db/timeseries/timeseries.idl
@@ -54,6 +54,13 @@ server_parameters:
cpp_varname: "gTimeseriesIdleBucketExpiryMemoryUsageThreshold"
default: 104857600 # 100MB
validator: { gte: 1 }
+ "timeseriesIdleBucketExpiryMaxCountPerAttempt":
+ description: "The maximum number of buckets that may be closed due to expiry at each attempt"
+ set_at: [ startup ]
+ cpp_vartype: "std::int32_t"
+ cpp_varname: "gTimeseriesIdleBucketExpiryMaxCountPerAttempt"
+ default: 3
+ validator: { gte: 2 }
enums:
BucketGranularity:
diff --git a/src/mongo/db/update/SConscript b/src/mongo/db/update/SConscript
index 500fdde1adc..a8f0bd1dc2b 100644
--- a/src/mongo/db/update/SConscript
+++ b/src/mongo/db/update/SConscript
@@ -59,6 +59,7 @@ env.Library(
source=[
'delta_executor.cpp',
'object_replace_executor.cpp',
+ 'object_transform_executor.cpp',
'pipeline_executor.cpp',
],
LIBDEPS=[
@@ -125,6 +126,7 @@ env.CppUnitTest(
'field_checker_test.cpp',
'modifier_table_test.cpp',
'object_replace_executor_test.cpp',
+ 'object_transform_executor_test.cpp',
'path_support_test.cpp',
'pipeline_executor_test.cpp',
'pop_node_test.cpp',
diff --git a/src/mongo/db/update/object_transform_executor.cpp b/src/mongo/db/update/object_transform_executor.cpp
new file mode 100644
index 00000000000..597b7dbb335
--- /dev/null
+++ b/src/mongo/db/update/object_transform_executor.cpp
@@ -0,0 +1,74 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/update/object_transform_executor.h"
+
+#include "mongo/db/update/object_replace_executor.h"
+#include "mongo/db/update/storage_validation.h"
+#include "mongo/db/update/update_oplog_entry_serialization.h"
+
+namespace mongo {
+
+ObjectTransformExecutor::ObjectTransformExecutor(TransformFunc transformFunc)
+ : _transformFunc(std::move(transformFunc)) {}
+
+UpdateExecutor::ApplyResult ObjectTransformExecutor::applyTransformUpdate(
+ ApplyParams applyParams, const TransformFunc& transformFunc) {
+
+ auto originalDoc = applyParams.element.getDocument().getObject();
+ auto transformedDoc = transformFunc(originalDoc);
+ if (!transformedDoc) {
+ return ApplyResult::noopResult();
+ }
+
+ return ObjectReplaceExecutor::applyReplacementUpdate(
+ std::move(applyParams),
+ *transformedDoc,
+ true /* replacementDocContainsIdField */,
+ true /* allowTopLevelDollarPrefixedFields */);
+}
+
+UpdateExecutor::ApplyResult ObjectTransformExecutor::applyUpdate(ApplyParams applyParams) const {
+ auto ret = applyTransformUpdate(applyParams, _transformFunc);
+
+ if (!ret.noop && applyParams.logMode != ApplyParams::LogMode::kDoNotGenerateOplogEntry) {
+        // Use a full replacement oplog entry, as the current use case makes major changes to the
+        // document. Consider supporting v:2 updates in the future.
+ ret.oplogEntry = update_oplog_entry::makeReplacementOplogEntry(
+ applyParams.element.getDocument().getObject());
+ }
+ return ret;
+}
+
+Value ObjectTransformExecutor::serialize() const {
+ MONGO_UNREACHABLE_TASSERT(5857810);
+}
+} // namespace mongo
diff --git a/src/mongo/db/update/object_transform_executor.h b/src/mongo/db/update/object_transform_executor.h
new file mode 100644
index 00000000000..aee80ac80b7
--- /dev/null
+++ b/src/mongo/db/update/object_transform_executor.h
@@ -0,0 +1,83 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <functional>
+
+#include "mongo/db/update/update_executor.h"
+
+namespace mongo {
+
+/**
+ * An UpdateExecutor representing a transform-style update.
+ *
+ * A transform-style update is similar to a replacement-style update, with the difference that the
+ * replacement document is computed from the original document by a user-provided function. If the
+ * update needs to be retried, for example after a WriteConflict, the output document is
+ * re-calculated.
+ *
+ * The replacement function MUST preserve the _id element from the original document.
+ *
+ * If the replacement function returns boost::none the update operation will be considered a no-op.
+ */
+class ObjectTransformExecutor : public UpdateExecutor {
+public:
+ using TransformFunc = std::function<boost::optional<BSONObj>(const BSONObj&)>;
+
+ /**
+ * Applies a transform style update to 'applyParams.element'.
+ *
+ * The transform function MUST preserve the _id element in the original document.
+ *
+ * This function will ignore the log mode provided in 'applyParams'. The 'oplogEntry' field
+ * of the returned ApplyResult is always empty.
+ */
+ static ApplyResult applyTransformUpdate(ApplyParams applyParams,
+ const TransformFunc& transformFunc);
+
+ /**
+ * Initializes the node with the transform function to apply to the original document.
+ */
+ explicit ObjectTransformExecutor(TransformFunc transform);
+
+ /**
+     * Replaces the document that 'applyParams.element' belongs to with the result of the transform
+ * function. 'applyParams.element' must be the root of the document. Always returns a result
+ * stating that indexes are affected when the replacement is not a noop.
+ */
+ ApplyResult applyUpdate(ApplyParams applyParams) const final;
+
+ Value serialize() const final;
+
+private:
+ TransformFunc _transformFunc;
+};
+
+} // namespace mongo
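
A sketch of the TransformFunc contract, not part of the patch: the function receives the
pre-image, must carry the _id element over to the document it returns, and may return
boost::none to turn the update into a no-op. The 'count' field, the threshold, and the helper
name are made up for illustration; only the ObjectTransformExecutor API above is assumed.

// Sketch only, assuming the mongo namespace and the object_transform_executor.h include above.
ObjectTransformExecutor makeCappedIncrementExecutor() {
    return ObjectTransformExecutor([](const BSONObj& pre) -> boost::optional<BSONObj> {
        if (pre["count"].numberInt() >= 10) {
            return boost::none;  // treated as a no-op by the executor
        }
        BSONObjBuilder post;
        post.append(pre["_id"]);                             // the _id element must be preserved
        post.append("count", pre["count"].numberInt() + 1);  // everything else may be rewritten
        return post.obj();
    });
}
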
diff --git a/src/mongo/db/update/object_transform_executor_test.cpp b/src/mongo/db/update/object_transform_executor_test.cpp
new file mode 100644
index 00000000000..06cca4da7cd
--- /dev/null
+++ b/src/mongo/db/update/object_transform_executor_test.cpp
@@ -0,0 +1,210 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/update/object_transform_executor.h"
+
+#include "mongo/bson/mutable/algorithm.h"
+#include "mongo/bson/mutable/mutable_bson_test_utils.h"
+#include "mongo/db/json.h"
+#include "mongo/db/update/update_node_test_fixture.h"
+#include "mongo/idl/server_parameter_test_util.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+using ObjectTransformExecutorTest = UpdateNodeTest;
+using mongo::mutablebson::countChildren;
+using mongo::mutablebson::Element;
+
+TEST_F(ObjectTransformExecutorTest, Noop) {
+ BSONObj input = fromjson("{a: 1, b: 2}");
+
+ ObjectTransformExecutor node([&input](const BSONObj& pre) {
+ ASSERT_BSONOBJ_BINARY_EQ(pre, input);
+ return pre;
+ });
+
+ mutablebson::Document doc(input);
+ auto result = node.applyUpdate(getApplyParams(doc.root()));
+ ASSERT_TRUE(result.noop);
+ ASSERT_FALSE(result.indexesAffected);
+ ASSERT_EQUALS(fromjson("{a: 1, b: 2}"), doc);
+ ASSERT_TRUE(doc.isInPlaceModeEnabled());
+ ASSERT_BSONOBJ_BINARY_EQ(fromjson("{}"), result.oplogEntry);
+}
+
+TEST_F(ObjectTransformExecutorTest, NoneNoop) {
+ ObjectTransformExecutor node([](const BSONObj& pre) { return boost::none; });
+
+ mutablebson::Document doc(fromjson("{a: 1, b: 2}"));
+ auto result = node.applyUpdate(getApplyParams(doc.root()));
+ ASSERT_TRUE(result.noop);
+ ASSERT_FALSE(result.indexesAffected);
+ ASSERT_EQUALS(fromjson("{a: 1, b: 2}"), doc);
+ ASSERT_TRUE(doc.isInPlaceModeEnabled());
+ ASSERT_BSONOBJ_BINARY_EQ(fromjson("{}"), result.oplogEntry);
+}
+
+TEST_F(ObjectTransformExecutorTest, Replace) {
+ BSONObj from = fromjson("{a: 1, b: 2}");
+ BSONObj to = fromjson("{a: 1, b: 3}");
+
+ ObjectTransformExecutor node([&from, &to](const BSONObj& pre) {
+ ASSERT_BSONOBJ_BINARY_EQ(pre, from);
+ return to;
+ });
+
+ mutablebson::Document doc(from);
+ auto result = node.applyUpdate(getApplyParams(doc.root()));
+ ASSERT_FALSE(result.noop);
+ ASSERT_TRUE(result.indexesAffected);
+ ASSERT_EQUALS(to, doc);
+ ASSERT_FALSE(doc.isInPlaceModeEnabled());
+ ASSERT_BSONOBJ_BINARY_EQ(to, result.oplogEntry);
+}
+
+TEST_F(ObjectTransformExecutorTest, ShouldSucceedWhenImmutableIdIsNotModified) {
+ auto obj = fromjson("{_id: 0, a: 1, b: 2}");
+ auto to = fromjson("{_id: 0, c: 1, d: 2}");
+ ObjectTransformExecutor node([&obj, &to](const BSONObj& pre) {
+ ASSERT_BSONOBJ_BINARY_EQ(pre, obj);
+ return to;
+ });
+
+ mutablebson::Document doc(obj);
+ addImmutablePath("_id");
+ auto result = node.applyUpdate(getApplyParams(doc.root()));
+ ASSERT_FALSE(result.noop);
+ ASSERT_TRUE(result.indexesAffected);
+ ASSERT_EQUALS(to, doc);
+ ASSERT_FALSE(doc.isInPlaceModeEnabled());
+ ASSERT_BSONOBJ_BINARY_EQ(to, result.oplogEntry);
+}
+
+TEST_F(ObjectTransformExecutorTest, CannotRemoveImmutablePath) {
+ auto from = fromjson("{_id: 0, a: {b: 1}}");
+ auto obj = fromjson("{_id: 0, c: 1}");
+ ObjectTransformExecutor node([&from, &obj](const BSONObj& pre) {
+ ASSERT_BSONOBJ_BINARY_EQ(pre, from);
+ return obj;
+ });
+
+ mutablebson::Document doc(from);
+ addImmutablePath("a.b");
+ ASSERT_THROWS_CODE_AND_WHAT(node.applyUpdate(getApplyParams(doc.root())),
+ AssertionException,
+ ErrorCodes::ImmutableField,
+ "After applying the update, the 'a.b' (required and immutable) "
+ "field was found to have been removed --{ _id: 0, a: { b: 1 } }");
+}
+
+TEST_F(ObjectTransformExecutorTest, CannotReplaceImmutablePathWithArrayField) {
+ auto obj = fromjson("{_id: 0, a: [{b: 1}]}");
+ ObjectTransformExecutor node([&obj](const BSONObj& pre) { return obj; });
+
+ mutablebson::Document doc(fromjson("{_id: 0, a: {b: 1}}"));
+ addImmutablePath("a.b");
+ ASSERT_THROWS_CODE_AND_WHAT(node.applyUpdate(getApplyParams(doc.root())),
+ AssertionException,
+ ErrorCodes::NotSingleValueField,
+ "After applying the update to the document, the (immutable) field "
+ "'a.b' was found to be an array or array descendant.");
+}
+
+TEST_F(ObjectTransformExecutorTest, CannotMakeImmutablePathArrayDescendant) {
+ auto obj = fromjson("{_id: 0, a: [1]}");
+ ObjectTransformExecutor node([&obj](const BSONObj& pre) { return obj; });
+
+ mutablebson::Document doc(fromjson("{_id: 0, a: {'0': 1}}"));
+ addImmutablePath("a.0");
+ ASSERT_THROWS_CODE_AND_WHAT(node.applyUpdate(getApplyParams(doc.root())),
+ AssertionException,
+ ErrorCodes::NotSingleValueField,
+ "After applying the update to the document, the (immutable) field "
+ "'a.0' was found to be an array or array descendant.");
+}
+
+TEST_F(ObjectTransformExecutorTest, CannotModifyImmutablePath) {
+ auto obj = fromjson("{_id: 0, a: {b: 2}}");
+ ObjectTransformExecutor node([&obj](const BSONObj& pre) { return obj; });
+
+ mutablebson::Document doc(fromjson("{_id: 0, a: {b: 1}}"));
+ addImmutablePath("a.b");
+ ASSERT_THROWS_CODE_AND_WHAT(node.applyUpdate(getApplyParams(doc.root())),
+ AssertionException,
+ ErrorCodes::ImmutableField,
+ "After applying the update, the (immutable) field 'a.b' was found "
+ "to have been altered to b: 2");
+}
+
+TEST_F(ObjectTransformExecutorTest, CannotModifyImmutableId) {
+ auto obj = fromjson("{_id: 1}");
+ ObjectTransformExecutor node([&obj](const BSONObj& pre) { return obj; });
+
+ mutablebson::Document doc(fromjson("{_id: 0}"));
+ addImmutablePath("_id");
+ ASSERT_THROWS_CODE_AND_WHAT(node.applyUpdate(getApplyParams(doc.root())),
+ AssertionException,
+ ErrorCodes::ImmutableField,
+ "After applying the update, the (immutable) field '_id' was found "
+ "to have been altered to _id: 1");
+}
+
+TEST_F(ObjectTransformExecutorTest, CanAddImmutableField) {
+ auto obj = fromjson("{a: {b: 1}}");
+ ObjectTransformExecutor node([&obj](const BSONObj& pre) { return obj; });
+
+ mutablebson::Document doc(fromjson("{c: 1}"));
+ addImmutablePath("a.b");
+ auto result = node.applyUpdate(getApplyParams(doc.root()));
+ ASSERT_FALSE(result.noop);
+ ASSERT_TRUE(result.indexesAffected);
+ ASSERT_EQUALS(fromjson("{a: {b: 1}}"), doc);
+ ASSERT_FALSE(doc.isInPlaceModeEnabled());
+ ASSERT_BSONOBJ_BINARY_EQ(fromjson("{a: {b: 1}}"), result.oplogEntry);
+}
+
+TEST_F(ObjectTransformExecutorTest, CanAddImmutableId) {
+ auto obj = fromjson("{_id: 0}");
+ ObjectTransformExecutor node([&obj](const BSONObj& pre) { return obj; });
+
+ mutablebson::Document doc(fromjson("{c: 1}"));
+ addImmutablePath("_id");
+ auto result = node.applyUpdate(getApplyParams(doc.root()));
+ ASSERT_FALSE(result.noop);
+ ASSERT_TRUE(result.indexesAffected);
+ ASSERT_EQUALS(fromjson("{_id: 0}"), doc);
+ ASSERT_FALSE(doc.isInPlaceModeEnabled());
+ ASSERT_BSONOBJ_BINARY_EQ(fromjson("{_id: 0}"), result.oplogEntry);
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/update/update_driver.cpp b/src/mongo/db/update/update_driver.cpp
index 1826267443d..2dd809aa99b 100644
--- a/src/mongo/db/update/update_driver.cpp
+++ b/src/mongo/db/update/update_driver.cpp
@@ -43,6 +43,7 @@
#include "mongo/db/update/delta_executor.h"
#include "mongo/db/update/modifier_table.h"
#include "mongo/db/update/object_replace_executor.h"
+#include "mongo/db/update/object_transform_executor.h"
#include "mongo/db/update/path_support.h"
#include "mongo/db/update/storage_validation.h"
#include "mongo/db/update/update_oplog_entry_version.h"
@@ -165,6 +166,18 @@ void UpdateDriver::parse(
return;
}
+ if (updateMod.type() == write_ops::UpdateModification::Type::kTransform) {
+ uassert(5857811, "multi update is not supported for transform-style update", !multi);
+
+ uassert(5857812,
+ "arrayFilters may not be specified for transform-syle updates",
+ arrayFilters.empty());
+
+ _updateType = UpdateType::kTransform;
+ _updateExecutor = std::make_unique<ObjectTransformExecutor>(updateMod.getTransform());
+ return;
+ }
+
invariant(_updateType == UpdateType::kOperator);
// By this point we are expecting a "classic" update. This version of mongod only supports $v:
diff --git a/src/mongo/db/update/update_driver.h b/src/mongo/db/update/update_driver.h
index 18cce2c38b8..96f34b4cfaf 100644
--- a/src/mongo/db/update/update_driver.h
+++ b/src/mongo/db/update/update_driver.h
@@ -56,7 +56,7 @@ class OperationContext;
class UpdateDriver {
public:
- enum class UpdateType { kOperator, kReplacement, kPipeline, kDelta };
+ enum class UpdateType { kOperator, kReplacement, kPipeline, kDelta, kTransform };
UpdateDriver(const boost::intrusive_ptr<ExpressionContext>& expCtx);
diff --git a/src/mongo/shell/bench.cpp b/src/mongo/shell/bench.cpp
index 79a20cdfc2d..88f5d685df0 100644
--- a/src/mongo/shell/bench.cpp
+++ b/src/mongo/shell/bench.cpp
@@ -1193,6 +1193,9 @@ void BenchRunOp::executeOnce(DBClientBase* conn,
// Delta updates are only executed on secondaries as part of oplog
// application.
MONGO_UNREACHABLE;
+ case write_ops::UpdateModification::Type::kTransform:
+ // It is not possible to run a transform update directly from a client.
+ MONGO_UNREACHABLE;
}
singleUpdate.append("multi", this->multi);
singleUpdate.append("upsert", this->upsert);