27 files changed, 1151 insertions, 210 deletions
diff --git a/jstests/core/timeseries/libs/timeseries.js b/jstests/core/timeseries/libs/timeseries.js index c9d27895566..88a3d38d705 100644 --- a/jstests/core/timeseries/libs/timeseries.js +++ b/jstests/core/timeseries/libs/timeseries.js @@ -11,6 +11,16 @@ var TimeseriesTest = class { } /** + * Returns whether time-series bucket compression is supported. + */ + static timeseriesBucketCompressionEnabled(conn) { + return assert + .commandWorked( + conn.adminCommand({getParameter: 1, featureFlagTimeseriesBucketCompression: 1})) + .featureFlagTimeseriesBucketCompression.value; + } + + /** * Returns whether time-series updates and deletes are supported. */ static timeseriesUpdatesAndDeletesEnabled(conn) { diff --git a/jstests/core/timeseries/timeseries_bucket_limit_count.js b/jstests/core/timeseries/timeseries_bucket_limit_count.js index 5f6869cc838..7183a561ec6 100644 --- a/jstests/core/timeseries/timeseries_bucket_limit_count.js +++ b/jstests/core/timeseries/timeseries_bucket_limit_count.js @@ -13,6 +13,9 @@ load("jstests/core/timeseries/libs/timeseries.js"); TimeseriesTest.run((insert) => { + const isTimeseriesBucketCompressionEnabled = + TimeseriesTest.timeseriesBucketCompressionEnabled(db); + const collNamePrefix = 'timeseries_bucket_limit_count_'; // Assumes each bucket has a limit of 1000 measurements. @@ -68,6 +71,9 @@ TimeseriesTest.run((insert) => { assert.eq(bucketMaxCount - 1, bucketDocs[0].control.max.x, 'invalid control.max for x in first bucket: ' + tojson(bucketDocs)); + assert.eq(isTimeseriesBucketCompressionEnabled ? 2 : 1, + bucketDocs[0].control.version, + 'unexpected control.version in first bucket: ' + tojson(bucketDocs)); // Second bucket should contain the remaining documents. assert.eq(bucketMaxCount, @@ -82,6 +88,9 @@ TimeseriesTest.run((insert) => { assert.eq(numDocs - 1, bucketDocs[1].control.max.x, 'invalid control.max for x in second bucket: ' + tojson(bucketDocs)); + assert.eq(1, + bucketDocs[1].control.version, + 'unexpected control.version in second bucket: ' + tojson(bucketDocs)); }; runTest(1); diff --git a/jstests/core/timeseries/timeseries_bucket_limit_size.js b/jstests/core/timeseries/timeseries_bucket_limit_size.js index bb29d1141a8..2d410f9eb1c 100644 --- a/jstests/core/timeseries/timeseries_bucket_limit_size.js +++ b/jstests/core/timeseries/timeseries_bucket_limit_size.js @@ -13,17 +13,21 @@ load("jstests/core/timeseries/libs/timeseries.js"); TimeseriesTest.run((insert) => { + const isTimeseriesBucketCompressionEnabled = + TimeseriesTest.timeseriesBucketCompressionEnabled(db); + const collNamePrefix = 'timeseries_bucket_limit_size_'; const timeFieldName = 'time'; // Assumes each bucket has a limit of 125kB on the measurements stored in the 'data' field. const bucketMaxSizeKB = 125; - const numDocs = 2; + const numDocs = 3; - // The measurement data should not take up all of the 'bucketMaxSizeKB' limit because we need - // to leave a little room for the _id and the time fields. - const largeValue = 'x'.repeat((bucketMaxSizeKB - 1) * 1024); + // The measurement data should not take up all of the 'bucketMaxSizeKB' limit because we need to + // leave a little room for the _id and the time fields. We need to fit two measurements within + // this limit to trigger compression if enabled.
+ const largeValue = 'x'.repeat(((bucketMaxSizeKB - 1) / 2) * 1024); const runTest = function(numDocsPerInsert) { const coll = db.getCollection(collNamePrefix + numDocsPerInsert); @@ -58,7 +62,7 @@ TimeseriesTest.run((insert) => { assert.eq(2, bucketDocs.length, bucketDocs); // Check both buckets. - // First bucket should be full with one document since we spill the second document over + // First bucket should be full with two documents since we spill the third document over // into the second bucket due to size constraints on 'data'. assert.eq(0, bucketDocs[0].control.min._id, @@ -66,12 +70,15 @@ TimeseriesTest.run((insert) => { assert.eq(largeValue, bucketDocs[0].control.min.x, 'invalid control.min for x in first bucket: ' + tojson(bucketDocs[0].control)); - assert.eq(0, + assert.eq(1, bucketDocs[0].control.max._id, 'invalid control.max for _id in first bucket: ' + tojson(bucketDocs[0].control)); assert.eq(largeValue, bucketDocs[0].control.max.x, 'invalid control.max for x in first bucket: ' + tojson(bucketDocs[0].control)); + assert.eq(isTimeseriesBucketCompressionEnabled ? 2 : 1, + bucketDocs[0].control.version, + 'unexpected control.version in first bucket: ' + tojson(bucketDocs)); // Second bucket should contain the remaining document. assert.eq(numDocs - 1, @@ -86,6 +93,9 @@ TimeseriesTest.run((insert) => { assert.eq(largeValue, bucketDocs[1].control.max.x, 'invalid control.max for x in second bucket: ' + tojson(bucketDocs[1].control)); + assert.eq(1, + bucketDocs[1].control.version, + 'unexpected control.version in second bucket: ' + tojson(bucketDocs)); }; runTest(1); diff --git a/jstests/core/timeseries/timeseries_bucket_limit_time_range.js b/jstests/core/timeseries/timeseries_bucket_limit_time_range.js index a46c8f52bf6..e2ece34dbb5 100644 --- a/jstests/core/timeseries/timeseries_bucket_limit_time_range.js +++ b/jstests/core/timeseries/timeseries_bucket_limit_time_range.js @@ -13,13 +13,23 @@ load("jstests/core/timeseries/libs/timeseries.js"); TimeseriesTest.run((insert) => { + const isTimeseriesBucketCompressionEnabled = + TimeseriesTest.timeseriesBucketCompressionEnabled(db); + const collNamePrefix = 'timeseries_bucket_limit_time_range_'; const timeFieldName = 'time'; // Assumes the measurements in each bucket span at most one hour (based on the time field). - const docTimes = [ISODate("2020-11-13T01:00:00Z"), ISODate("2020-11-13T03:00:00Z")]; - const numDocs = 2; + // Make sure we have three measurements to trigger compression if enabled.
The data types in + // this test are so small that two measurements may not yield a smaller compressed object. + const docTimes = [ + ISODate("2020-11-13T01:00:00Z"), + ISODate("2020-11-13T01:00:01Z"), + ISODate("2020-11-13T01:00:02Z"), + ISODate("2020-11-13T03:00:00Z") + ]; + const numDocs = 4; const runTest = function(numDocsPerInsert) { const coll = db.getCollection(collNamePrefix + numDocsPerInsert); @@ -67,15 +77,18 @@ TimeseriesTest.run((insert) => { assert.eq(docTimes[0], bucketDocs[0].control.min[timeFieldName], 'invalid control.min for time in first bucket: ' + tojson(bucketDocs[0].control)); - assert.eq(0, + assert.eq(2, bucketDocs[0].control.max._id, 'invalid control.max for _id in first bucket: ' + tojson(bucketDocs[0].control)); - assert.eq(0, + assert.eq(2, bucketDocs[0].control.max.x, 'invalid control.max for x in first bucket: ' + tojson(bucketDocs[0].control)); - assert.eq(docTimes[0], + assert.eq(docTimes[2], bucketDocs[0].control.max[timeFieldName], 'invalid control.max for time in first bucket: ' + tojson(bucketDocs[0].control)); + assert.eq(isTimeseriesBucketCompressionEnabled ? 2 : 1, + bucketDocs[0].control.version, + 'unexpected control.version in first bucket: ' + tojson(bucketDocs)); // Second bucket should contain the remaining document. assert.eq(numDocs - 1, @@ -98,6 +111,9 @@ TimeseriesTest.run((insert) => { docTimes[numDocs - 1], bucketDocs[1].control.max[timeFieldName], 'invalid control.max for time in second bucket: ' + tojson(bucketDocs[1].control)); + assert.eq(1, + bucketDocs[1].control.version, + 'unexpected control.version in second bucket: ' + tojson(bucketDocs)); }; runTest(1); diff --git a/jstests/core/timeseries/timeseries_idle_buckets.js b/jstests/core/timeseries/timeseries_idle_buckets.js index b0d183b493a..dc402a7fadb 100644 --- a/jstests/core/timeseries/timeseries_idle_buckets.js +++ b/jstests/core/timeseries/timeseries_idle_buckets.js @@ -14,11 +14,15 @@ load("jstests/core/timeseries/libs/timeseries.js"); TimeseriesTest.run((insert) => { + const isTimeseriesBucketCompressionEnabled = + TimeseriesTest.timeseriesBucketCompressionEnabled(db); + const coll = db.timeseries_idle_buckets; const bucketsColl = db.getCollection('system.buckets.' + coll.getName()); const timeFieldName = 'time'; const metaFieldName = 'meta'; + const valueFieldName = 'value'; coll.drop(); assert.commandWorked(db.createCollection( @@ -30,18 +34,44 @@ TimeseriesTest.run((insert) => { const numDocs = 100; const metaValue = 'a'.repeat(1024 * 1024); for (let i = 0; i < numDocs; i++) { - assert.commandWorked(insert( - coll, {[timeFieldName]: ISODate(), [metaFieldName]: {[i.toString()]: metaValue}})); + // Insert a few measurements in the bucket to make sure compression is triggered if + // enabled. + assert.commandWorked(insert(coll, { + [timeFieldName]: ISODate(), + [metaFieldName]: {[i.toString()]: metaValue}, + [valueFieldName]: 0 + })); + assert.commandWorked(insert(coll, { + [timeFieldName]: ISODate(), + [metaFieldName]: {[i.toString()]: metaValue}, + [valueFieldName]: 1 + })); + assert.commandWorked(insert(coll, { + [timeFieldName]: ISODate(), + [metaFieldName]: {[i.toString()]: metaValue}, + [valueFieldName]: 3 + })); } // Insert a document with the metadata of a bucket which should have been expired. Thus, a new // bucket will be created.
- assert.commandWorked( - insert(coll, {[timeFieldName]: ISODate(), [metaFieldName]: {0: metaValue}})); - let bucketDocs = bucketsColl.find({meta: {0: metaValue}}).toArray(); + assert.commandWorked(insert( + coll, {[timeFieldName]: ISODate(), [metaFieldName]: {0: metaValue}, [valueFieldName]: 3})); + + // Check buckets. + let bucketDocs = + bucketsColl.find({meta: {0: metaValue}}).sort({'control.min._id': 1}).toArray(); assert.eq( bucketDocs.length, 2, 'Invalid number of buckets for metadata 0: ' + tojson(bucketDocs)); + // If bucket compression is enabled the expired bucket should have been compressed + assert.eq(isTimeseriesBucketCompressionEnabled ? 2 : 1, + bucketDocs[0].control.version, + 'unexpected control.version in first bucket: ' + tojson(bucketDocs)); + assert.eq(1, + bucketDocs[1].control.version, + 'unexpected control.version in second bucket: ' + tojson(bucketDocs)); + // Insert a document with the metadata of a bucket with should still be open. Thus, the existing // bucket will be used. assert.commandWorked( @@ -51,5 +81,8 @@ TimeseriesTest.run((insert) => { bucketDocs.length, 1, 'Invalid number of buckets for metadata ' + (numDocs - 1) + ': ' + tojson(bucketDocs)); + assert.eq(1, + bucketDocs[0].control.version, + 'unexpected control.version in second bucket: ' + tojson(bucketDocs)); }); })(); diff --git a/src/mongo/bson/util/bsoncolumnbuilder.cpp b/src/mongo/bson/util/bsoncolumnbuilder.cpp index 1d7ba9334c4..c7dee5223cb 100644 --- a/src/mongo/bson/util/bsoncolumnbuilder.cpp +++ b/src/mongo/bson/util/bsoncolumnbuilder.cpp @@ -60,13 +60,18 @@ std::pair<int64_t, uint8_t> scaleAndEncodeDouble(double value, uint8_t minScaleI } // namespace BSONColumnBuilder::BSONColumnBuilder(StringData fieldName) + : BSONColumnBuilder(fieldName, BufBuilder()) {} + +BSONColumnBuilder::BSONColumnBuilder(StringData fieldName, BufBuilder&& builder) : _simple8bBuilder64(_createBufferWriter()), _simple8bBuilder128(_createBufferWriter()), _scaleIndex(Simple8bTypeUtil::kMemoryAsInteger), + _bufBuilder(std::move(builder)), _fieldName(fieldName) { // Leave space for element count at the beginning static_assert(sizeof(_elementCount) == kElementCountBytes, "Element count for BSONColumn should be 4 bytes"); + _bufBuilder.reset(); _bufBuilder.skip(kElementCountBytes); // Store EOO type with empty field name as previous. _storePrevious(BSONElement()); @@ -370,6 +375,11 @@ BSONBinData BSONColumnBuilder::finalize() { return {_bufBuilder.buf(), _bufBuilder.len(), BinDataType::Column}; } +BufBuilder BSONColumnBuilder::detach() { + return std::move(_bufBuilder); +} + + void BSONColumnBuilder::_storePrevious(BSONElement elem) { auto valuesize = elem.valuesize(); diff --git a/src/mongo/bson/util/bsoncolumnbuilder.h b/src/mongo/bson/util/bsoncolumnbuilder.h index 449a08e9a2c..036f30cccf9 100644 --- a/src/mongo/bson/util/bsoncolumnbuilder.h +++ b/src/mongo/bson/util/bsoncolumnbuilder.h @@ -45,6 +45,7 @@ namespace mongo { class BSONColumnBuilder { public: BSONColumnBuilder(StringData fieldName); + BSONColumnBuilder(StringData fieldName, BufBuilder&& builder); BSONColumnBuilder(BSONColumnBuilder&&) = delete; /** @@ -84,6 +85,12 @@ public: */ BSONBinData finalize(); + /** + * Detaches the buffer associated with this BSONColumnBuilder. Allows the memory to be reused + * for building another BSONColumn. 
+ */ + BufBuilder detach(); + private: BSONElement _previous() const; diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index 712dd0d573e..3d72dd2dbd7 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -363,6 +363,7 @@ env.Library( '$BUILD_DIR/mongo/db/storage/storage_engine_common', "$BUILD_DIR/mongo/db/storage/two_phase_index_build_knobs_idl", '$BUILD_DIR/mongo/db/timeseries/bucket_catalog', + '$BUILD_DIR/mongo/db/timeseries/bucket_compression', '$BUILD_DIR/mongo/db/timeseries/catalog_helper', '$BUILD_DIR/mongo/db/timeseries/timeseries_commands_conversion_helper', '$BUILD_DIR/mongo/db/timeseries/timeseries_index_schema_conversion_functions', diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp index d2b41047053..ceafad09b8a 100644 --- a/src/mongo/db/commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands.cpp @@ -27,6 +27,8 @@ * it in the license file. */ +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault + #include "mongo/base/checked_cast.h" #include "mongo/bson/bsonobjbuilder.h" #include "mongo/bson/mutable/document.h" @@ -65,12 +67,14 @@ #include "mongo/db/storage/duplicate_key_error_info.h" #include "mongo/db/storage/storage_parameters_gen.h" #include "mongo/db/timeseries/bucket_catalog.h" +#include "mongo/db/timeseries/bucket_compression.h" #include "mongo/db/timeseries/timeseries_constants.h" #include "mongo/db/timeseries/timeseries_index_schema_conversion_functions.h" #include "mongo/db/timeseries/timeseries_update_delete_util.h" #include "mongo/db/transaction_participant.h" #include "mongo/db/views/view_catalog.h" #include "mongo/db/write_concern.h" +#include "mongo/logv2/log.h" #include "mongo/logv2/redaction.h" #include "mongo/s/stale_exception.h" #include "mongo/util/fail_point.h" @@ -209,6 +213,20 @@ write_ops::UpdateOpEntry makeTimeseriesUpdateOpEntry( } /** + * Transforms a single time-series insert to an update request on an existing bucket. + */ +write_ops::UpdateOpEntry makeTimeseriesCompressionOpEntry( + OperationContext* opCtx, + const OID& bucketId, + write_ops::UpdateModification::TransformFunc compressionFunc) { + write_ops::UpdateModification u(std::move(compressionFunc)); + write_ops::UpdateOpEntry update(BSON("_id" << bucketId), std::move(u)); + invariant(!update.getMulti(), bucketId.toString()); + invariant(!update.getUpsert(), bucketId.toString()); + return update; +} + +/** * Returns the document for inserting a new bucket. */ BSONObj makeTimeseriesInsertDocument(std::shared_ptr<BucketCatalog::WriteBatch> batch, @@ -611,6 +629,27 @@ public: return op; } + write_ops::UpdateCommandRequest _makeTimeseriesCompressionOp( + OperationContext* opCtx, + const OID& bucketId, + write_ops::UpdateModification::TransformFunc compressionFunc) const { + write_ops::UpdateCommandRequest op( + makeTimeseriesBucketsNamespace(ns()), + {makeTimeseriesCompressionOpEntry(opCtx, bucketId, std::move(compressionFunc))}); + + write_ops::WriteCommandRequestBase base; + // The schema validation configured in the bucket collection is intended for direct + // operations by end users and is not applicable here. + base.setBypassDocumentValidation(true); + + // Timeseries compression operation is not a user operation and should not use a + // statement id from any user op. Set to Uninitialized to bypass. 
+ base.setStmtIds(std::vector<StmtId>{kUninitializedStmtId}); + + op.setWriteCommandRequestBase(std::move(base)); + return op; + } + StatusWith<SingleWriteResult> _performTimeseriesInsert( OperationContext* opCtx, std::shared_ptr<BucketCatalog::WriteBatch> batch, @@ -641,6 +680,40 @@ public: OperationSource::kTimeseriesInsert)); } + StatusWith<SingleWriteResult> _performTimeseriesBucketCompression( + OperationContext* opCtx, const BucketCatalog::ClosedBucket& closedBucket) const { + if (!feature_flags::gTimeseriesBucketCompression.isEnabled( + serverGlobalParams.featureCompatibility)) { + return SingleWriteResult(); + } + + // Buckets with just a single measurement is not worth compressing. + if (closedBucket.numMeasurements <= 1) { + return SingleWriteResult(); + } + + auto bucketCompressionFunc = + [&closedBucket](const BSONObj& bucketDoc) -> boost::optional<BSONObj> { + auto compressed = timeseries::compressBucket(bucketDoc, closedBucket.timeField); + // If compressed object size is larger than uncompressed, skip compression update. + if (compressed && compressed->objsize() > bucketDoc.objsize()) { + LOGV2_DEBUG(5857802, + 1, + "Skipping time-series bucket compression, compressed object is " + "larger than original", + "originalSize"_attr = bucketDoc.objsize(), + "compressedSize"_attr = compressed->objsize()); + return boost::none; + } + return compressed; + }; + + return _getTimeseriesSingleWriteResult(write_ops_exec::performUpdates( + opCtx, + _makeTimeseriesCompressionOp(opCtx, closedBucket.bucketId, bucketCompressionFunc), + OperationSource::kStandard)); + } + void _commitTimeseriesBucket(OperationContext* opCtx, std::shared_ptr<BucketCatalog::WriteBatch> batch, size_t start, @@ -705,8 +778,19 @@ public: getOpTimeAndElectionId(opCtx, opTime, electionId); - bucketCatalog.finish(batch, BucketCatalog::CommitInfo{*opTime, *electionId}); + auto closedBucket = + bucketCatalog.finish(batch, BucketCatalog::CommitInfo{*opTime, *electionId}); + batchGuard.dismiss(); + + if (closedBucket) { + // If this write closed a bucket, compress the bucket + auto result = _performTimeseriesBucketCompression(opCtx, *closedBucket); + if (auto error = generateError(opCtx, result, start + index, errors->size())) { + errors->push_back(*error); + return; + } + } } bool _commitTimeseriesBucketsAtomically(OperationContext* opCtx, @@ -769,21 +853,38 @@ public: getOpTimeAndElectionId(opCtx, opTime, electionId); + bool compressClosedBuckets = true; for (auto batch : batchesToCommit) { - bucketCatalog.finish(batch, BucketCatalog::CommitInfo{*opTime, *electionId}); + auto closedBucket = + bucketCatalog.finish(batch, BucketCatalog::CommitInfo{*opTime, *electionId}); batch.get().reset(); + + if (!closedBucket || !compressClosedBuckets) { + continue; + } + + // If this write closed a bucket, compress the bucket + auto ret = _performTimeseriesBucketCompression(opCtx, *closedBucket); + if (!ret.isOK()) { + // Don't try to compress any other buckets if we fail. We're not allowed to do + // more write operations. 
+ compressClosedBuckets = false; + } } return true; } - std::tuple<TimeseriesBatches, TimeseriesStmtIds, size_t> _insertIntoBucketCatalog( - OperationContext* opCtx, - size_t start, - size_t numDocs, - const std::vector<size_t>& indices, - std::vector<BSONObj>* errors, - bool* containsRetry) const { + std::tuple<TimeseriesBatches, + TimeseriesStmtIds, + size_t /* numInserted */, + bool /* canContinue */> + _insertIntoBucketCatalog(OperationContext* opCtx, + size_t start, + size_t numDocs, + const std::vector<size_t>& indices, + std::vector<BSONObj>* errors, + bool* containsRetry) const { auto& bucketCatalog = BucketCatalog::get(opCtx); auto bucketsNs = makeTimeseriesBucketsNamespace(ns()); @@ -800,6 +901,7 @@ public: TimeseriesBatches batches; TimeseriesStmtIds stmtIds; + bool canContinue = true; auto insert = [&](size_t index) { invariant(start + index < request().getDocuments().size()); @@ -828,13 +930,32 @@ public: errors->push_back(*error); return false; } else { - const auto& batch = result.getValue(); + const auto& batch = result.getValue().batch; batches.emplace_back(batch, index); if (isTimeseriesWriteRetryable(opCtx)) { stmtIds[batch->bucket()].push_back(stmtId); } } + // If this insert closed buckets, rewrite to be a compressed column. If we cannot + // perform write operations at this point the bucket will be left uncompressed. + for (const auto& closedBucket : result.getValue().closedBuckets) { + if (!canContinue) { + break; + } + + // If this write closed a bucket, compress the bucket + auto ret = _performTimeseriesBucketCompression(opCtx, closedBucket); + if (auto error = generateError(opCtx, ret, start + index, errors->size())) { + // Bucket compression only fail when we may not try to perform any other + // write operation. When handleError() inside write_ops_exec.cpp return + // false. + errors->push_back(*error); + canContinue = false; + return false; + } + } + return true; }; @@ -843,12 +964,15 @@ public: } else { for (size_t i = 0; i < numDocs; i++) { if (!insert(i) && request().getOrdered()) { - return {std::move(batches), std::move(stmtIds), i}; + return {std::move(batches), std::move(stmtIds), i, canContinue}; } } } - return {std::move(batches), std::move(stmtIds), request().getDocuments().size()}; + return {std::move(batches), + std::move(stmtIds), + request().getDocuments().size(), + canContinue}; } void _getTimeseriesBatchResults(OperationContext* opCtx, @@ -889,8 +1013,13 @@ public: boost::optional<repl::OpTime>* opTime, boost::optional<OID>* electionId, bool* containsRetry) const { - auto [batches, stmtIds, numInserted] = _insertIntoBucketCatalog( + auto [batches, stmtIds, numInserted, canContinue] = _insertIntoBucketCatalog( opCtx, 0, request().getDocuments().size(), {}, errors, containsRetry); + if (!canContinue) { + // If we are not allowed to continue with any write operation return true here to + // prevent the ordered inserts from being retried one by one. 
+ return true; + } hangTimeseriesInsertBeforeCommit.pauseWhileSet(); @@ -942,13 +1071,17 @@ public: boost::optional<repl::OpTime>* opTime, boost::optional<OID>* electionId, bool* containsRetry) const { - auto [batches, bucketStmtIds, _] = + auto [batches, bucketStmtIds, _, canContinue] = _insertIntoBucketCatalog(opCtx, start, numDocs, indices, errors, containsRetry); hangTimeseriesInsertBeforeCommit.pauseWhileSet(); std::vector<size_t> docsToRetry; + if (!canContinue) { + return docsToRetry; + } + for (auto& [batch, index] : batches) { if (batch->claimCommitRights()) { auto stmtIds = isTimeseriesWriteRetryable(opCtx) diff --git a/src/mongo/db/exec/SConscript b/src/mongo/db/exec/SConscript index 963d12c5372..88655a990db 100644 --- a/src/mongo/db/exec/SConscript +++ b/src/mongo/db/exec/SConscript @@ -151,7 +151,7 @@ env.CppUnitTest( ], LIBDEPS=[ "$BUILD_DIR/mongo/base", - "$BUILD_DIR/mongo/bson/util/bson_column", + "$BUILD_DIR/mongo/bson/util/bson_column", "$BUILD_DIR/mongo/db/auth/authmocks", "$BUILD_DIR/mongo/db/query/collation/collator_factory_mock", "$BUILD_DIR/mongo/db/query/collation/collator_interface_mock", @@ -159,6 +159,7 @@ env.CppUnitTest( "$BUILD_DIR/mongo/db/query_exec", "$BUILD_DIR/mongo/db/record_id_helpers", "$BUILD_DIR/mongo/db/service_context_d_test_fixture", + "$BUILD_DIR/mongo/db/timeseries/bucket_compression", "$BUILD_DIR/mongo/dbtests/mocklib", "$BUILD_DIR/mongo/util/clock_source_mock", "document_value/document_value", diff --git a/src/mongo/db/exec/bucket_unpacker_test.cpp b/src/mongo/db/exec/bucket_unpacker_test.cpp index fcb285dc94b..85e2bfb45d5 100644 --- a/src/mongo/db/exec/bucket_unpacker_test.cpp +++ b/src/mongo/db/exec/bucket_unpacker_test.cpp @@ -30,9 +30,9 @@ #include "mongo/platform/basic.h" #include "mongo/bson/json.h" -#include "mongo/bson/util/bsoncolumnbuilder.h" #include "mongo/db/exec/bucket_unpacker.h" #include "mongo/db/exec/document_value/document_value_test_util.h" +#include "mongo/db/timeseries/bucket_compression.h" #include "mongo/unittest/unittest.h" namespace mongo { @@ -102,65 +102,6 @@ public: BSONObj obj = root.obj(); return {obj, "time"_sd}; } - - // Simple bucket compressor. Does not handle data fields out of order and does not sort fields - // on time. - // TODO (SERVER-58578): Replace with real bucket compressor - BSONObj compress(BSONObj uncompressed, StringData timeField) { - // Rewrite data fields as columns. - BSONObjBuilder builder; - for (auto& elem : uncompressed) { - if (elem.fieldNameStringData() == "control") { - BSONObjBuilder control(builder.subobjStart("control")); - - // Set right version, leave other control fields unchanged - for (const auto& controlField : elem.Obj()) { - if (controlField.fieldNameStringData() == "version") { - control.append("version", 2); - } else { - control.append(controlField); - } - } - - continue; - } - if (elem.fieldNameStringData() != "data") { - // Non-data fields can be unchanged. 
- builder.append(elem); - continue; - } - - BSONObjBuilder dataBuilder = builder.subobjStart("data"); - std::list<BSONColumnBuilder> columnBuilders; - size_t numTimeFields = 0; - for (auto& column : elem.Obj()) { - // Compress all data fields - columnBuilders.emplace_back(column.fieldNameStringData()); - auto& columnBuilder = columnBuilders.back(); - - for (auto& measurement : column.Obj()) { - int index = std::atoi(measurement.fieldName()); - for (int i = columnBuilder.size(); i < index; ++i) { - columnBuilder.skip(); - } - columnBuilder.append(measurement); - } - if (columnBuilder.fieldName() == timeField) { - numTimeFields = columnBuilder.size(); - } - } - - for (auto& builder : columnBuilders) { - for (size_t i = builder.size(); i < numTimeFields; ++i) { - builder.skip(); - } - - BSONBinData binData = builder.finalize(); - dataBuilder.append(builder.fieldName(), binData); - } - } - return builder.obj(); - } }; TEST_F(BucketUnpackerTest, UnpackBasicIncludeAllMeasurementFields) { @@ -212,7 +153,7 @@ TEST_F(BucketUnpackerTest, ExcludeASingleField) { }; test(bucket); - test(compress(bucket, "time"_sd)); + test(*timeseries::compressBucket(bucket, "time"_sd)); } TEST_F(BucketUnpackerTest, EmptyIncludeGetsEmptyMeasurements) { @@ -238,7 +179,7 @@ TEST_F(BucketUnpackerTest, EmptyIncludeGetsEmptyMeasurements) { }; test(bucket); - test(compress(bucket, "time"_sd)); + test(*timeseries::compressBucket(bucket, "time"_sd)); } TEST_F(BucketUnpackerTest, EmptyExcludeMaterializesAllFields) { @@ -266,7 +207,7 @@ TEST_F(BucketUnpackerTest, EmptyExcludeMaterializesAllFields) { }; test(bucket); - test(compress(bucket, "time"_sd)); + test(*timeseries::compressBucket(bucket, "time"_sd)); } TEST_F(BucketUnpackerTest, SparseColumnsWhereOneColumnIsExhaustedBeforeTheOther) { @@ -292,7 +233,7 @@ TEST_F(BucketUnpackerTest, SparseColumnsWhereOneColumnIsExhaustedBeforeTheOther) }; test(bucket); - test(compress(bucket, "time"_sd)); + test(*timeseries::compressBucket(bucket, "time"_sd)); } TEST_F(BucketUnpackerTest, UnpackBasicIncludeWithDollarPrefix) { @@ -321,7 +262,7 @@ TEST_F(BucketUnpackerTest, UnpackBasicIncludeWithDollarPrefix) { }; test(bucket); - test(compress(bucket, "time"_sd)); + test(*timeseries::compressBucket(bucket, "time"_sd)); } TEST_F(BucketUnpackerTest, BucketsWithMetadataOnly) { @@ -346,7 +287,7 @@ TEST_F(BucketUnpackerTest, BucketsWithMetadataOnly) { }; test(bucket); - test(compress(bucket, "time"_sd)); + test(*timeseries::compressBucket(bucket, "time"_sd)); } TEST_F(BucketUnpackerTest, UnorderedRowKeysDoesntAffectMaterialization) { @@ -404,7 +345,7 @@ TEST_F(BucketUnpackerTest, MissingMetaFieldDoesntMaterializeMetadata) { }; test(bucket); - test(compress(bucket, "time"_sd)); + test(*timeseries::compressBucket(bucket, "time"_sd)); } TEST_F(BucketUnpackerTest, MissingMetaFieldDoesntMaterializeMetadataUnorderedKeys) { @@ -459,7 +400,7 @@ TEST_F(BucketUnpackerTest, ExcludedMetaFieldDoesntMaterializeMetadataWhenBucketH }; test(bucket); - test(compress(bucket, "time"_sd)); + test(*timeseries::compressBucket(bucket, "time"_sd)); } TEST_F(BucketUnpackerTest, UnpackerResetThrowsOnUndefinedMeta) { @@ -478,7 +419,7 @@ TEST_F(BucketUnpackerTest, UnpackerResetThrowsOnUndefinedMeta) { }; test(bucket); - test(compress(bucket, "time"_sd)); + test(*timeseries::compressBucket(bucket, "time"_sd)); } TEST_F(BucketUnpackerTest, UnpackerResetThrowsOnUnexpectedMeta) { @@ -498,7 +439,7 @@ TEST_F(BucketUnpackerTest, UnpackerResetThrowsOnUnexpectedMeta) { }; test(bucket); - test(compress(bucket, "time"_sd)); + 
test(*timeseries::compressBucket(bucket, "time"_sd)); } TEST_F(BucketUnpackerTest, NullMetaInBucketMaterializesAsNull) { @@ -525,7 +466,7 @@ TEST_F(BucketUnpackerTest, NullMetaInBucketMaterializesAsNull) { }; test(bucket); - test(compress(bucket, "time"_sd)); + test(*timeseries::compressBucket(bucket, "time"_sd)); } TEST_F(BucketUnpackerTest, GetNextHandlesMissingMetaInBucket) { @@ -557,7 +498,7 @@ TEST_F(BucketUnpackerTest, GetNextHandlesMissingMetaInBucket) { }; test(bucket); - test(compress(bucket, "time"_sd)); + test(*timeseries::compressBucket(bucket, "time"_sd)); } TEST_F(BucketUnpackerTest, EmptyDataRegionInBucketIsTolerated) { @@ -576,7 +517,6 @@ TEST_F(BucketUnpackerTest, EmptyDataRegionInBucketIsTolerated) { }; test(bucket); - test(compress(bucket, "time"_sd)); } TEST_F(BucketUnpackerTest, UnpackerResetThrowsOnEmptyBucket) { diff --git a/src/mongo/db/ops/write_ops.cpp b/src/mongo/db/ops/write_ops.cpp index 5c60f26fb6c..ea387ae920c 100644 --- a/src/mongo/db/ops/write_ops.cpp +++ b/src/mongo/db/ops/write_ops.cpp @@ -247,6 +247,9 @@ write_ops::UpdateModification write_ops::UpdateModification::parseFromOplogEntry write_ops::UpdateModification::UpdateModification(doc_diff::Diff diff, DiffOptions options) : _update(DeltaUpdate{std::move(diff), options}) {} +write_ops::UpdateModification::UpdateModification(TransformFunc transform) + : _update(TransformUpdate{std::move(transform)}) {} + write_ops::UpdateModification::UpdateModification(BSONElement update) { const auto type = update.type(); if (type == BSONType::Object) { @@ -297,7 +300,8 @@ int write_ops::UpdateModification::objsize() const { return size + kWriteCommandBSONArrayPerElementOverheadBytes; }, - [](const DeltaUpdate& delta) -> int { return delta.diff.objsize(); }}, + [](const DeltaUpdate& delta) -> int { return delta.diff.objsize(); }, + [](const TransformUpdate& transform) -> int { return 0; }}, _update); } @@ -307,7 +311,8 @@ write_ops::UpdateModification::Type write_ops::UpdateModification::type() const visit_helper::Overloaded{ [](const ClassicUpdate& classic) { return Type::kClassic; }, [](const PipelineUpdate& pipelineUpdate) { return Type::kPipeline; }, - [](const DeltaUpdate& delta) { return Type::kDelta; }}, + [](const DeltaUpdate& delta) { return Type::kDelta; }, + [](const TransformUpdate& transform) { return Type::kTransform; }}, _update); } @@ -328,7 +333,8 @@ void write_ops::UpdateModification::serializeToBSON(StringData fieldName, } arrayBuilder.doneFast(); }, - [fieldName, bob](const DeltaUpdate& delta) { *bob << fieldName << delta.diff; }}, + [fieldName, bob](const DeltaUpdate& delta) { *bob << fieldName << delta.diff; }, + [](const TransformUpdate& transform) {}}, _update); } diff --git a/src/mongo/db/ops/write_ops_parsers.h b/src/mongo/db/ops/write_ops_parsers.h index 788ae259722..4a70bfdc0da 100644 --- a/src/mongo/db/ops/write_ops_parsers.h +++ b/src/mongo/db/ops/write_ops_parsers.h @@ -79,7 +79,8 @@ repl::OpTime opTimeParser(BSONElement elem); class UpdateModification { public: - enum class Type { kClassic, kPipeline, kDelta }; + enum class Type { kClassic, kPipeline, kDelta, kTransform }; + using TransformFunc = std::function<boost::optional<BSONObj>(const BSONObj&)>; /** * Used to indicate that a certain type of update is being passed to the constructor. @@ -105,6 +106,8 @@ public: UpdateModification(BSONElement update); UpdateModification(std::vector<BSONObj> pipeline); UpdateModification(doc_diff::Diff, DiffOptions); + // Creates an transform-style update. 
The transform function MUST preserve the _id element. + UpdateModification(TransformFunc transform); // This constructor exists only to provide a fast-path for constructing classic-style updates. UpdateModification(const BSONObj& update, ClassicTag); @@ -141,6 +144,11 @@ public: return stdx::get<DeltaUpdate>(_update).diff; } + const TransformFunc& getTransform() const { + invariant(type() == Type::kTransform); + return stdx::get<TransformUpdate>(_update).transform; + } + bool mustCheckExistenceForInsertOperations() const { invariant(type() == Type::kDelta); return stdx::get<DeltaUpdate>(_update).options.mustCheckExistenceForInsertOperations; @@ -149,19 +157,19 @@ public: std::string toString() const { StringBuilder sb; - stdx::visit(visit_helper::Overloaded{[&sb](const ClassicUpdate& classic) { - sb << "{type: Classic, update: " << classic.bson - << "}"; - }, - [&sb](const PipelineUpdate& pipeline) { - sb << "{type: Pipeline, update: " - << Value(pipeline).toString() << "}"; - }, - [&sb](const DeltaUpdate& delta) { - sb << "{type: Delta, update: " << delta.diff - << "}"; - }}, - _update); + stdx::visit( + visit_helper::Overloaded{ + [&sb](const ClassicUpdate& classic) { + sb << "{type: Classic, update: " << classic.bson << "}"; + }, + [&sb](const PipelineUpdate& pipeline) { + sb << "{type: Pipeline, update: " << Value(pipeline).toString() << "}"; + }, + [&sb](const DeltaUpdate& delta) { + sb << "{type: Delta, update: " << delta.diff << "}"; + }, + [&sb](const TransformUpdate& transform) { sb << "{type: Transform}"; }}, + _update); return sb.str(); } @@ -176,7 +184,10 @@ private: doc_diff::Diff diff; DiffOptions options; }; - stdx::variant<ClassicUpdate, PipelineUpdate, DeltaUpdate> _update; + struct TransformUpdate { + TransformFunc transform; + }; + stdx::variant<ClassicUpdate, PipelineUpdate, DeltaUpdate, TransformUpdate> _update; }; } // namespace write_ops diff --git a/src/mongo/db/timeseries/SConscript b/src/mongo/db/timeseries/SConscript index 18ae4a3885b..505e7ba6cec 100644 --- a/src/mongo/db/timeseries/SConscript +++ b/src/mongo/db/timeseries/SConscript @@ -34,6 +34,19 @@ env.Library( ) env.Library( + target='bucket_compression', + source=[ + 'bucket_compression.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/bson/util/bson_column', + ] +) + +env.Library( target='timeseries_index_schema_conversion_functions', source=[ 'timeseries_index_schema_conversion_functions.cpp', diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp index 2780942ce78..3b5da81a3bb 100644 --- a/src/mongo/db/timeseries/bucket_catalog.cpp +++ b/src/mongo/db/timeseries/bucket_catalog.cpp @@ -177,7 +177,7 @@ BSONObj BucketCatalog::getMetadata(Bucket* ptr) const { return bucket->_metadata.toBSON(); } -StatusWith<std::shared_ptr<BucketCatalog::WriteBatch>> BucketCatalog::insert( +StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert( OperationContext* opCtx, const NamespaceString& ns, const StringData::ComparatorInterface* comparator, @@ -204,7 +204,8 @@ StatusWith<std::shared_ptr<BucketCatalog::WriteBatch>> BucketCatalog::insert( auto time = timeElem.Date(); - BucketAccess bucket{this, key, options, stats.get(), time}; + ClosedBuckets closedBuckets; + BucketAccess bucket{this, key, options, stats.get(), &closedBuckets, time}; invariant(bucket); NewFieldNames newFieldNamesToBeInserted; @@ -239,7 +240,8 @@ StatusWith<std::shared_ptr<BucketCatalog::WriteBatch>> BucketCatalog::insert( }; if (!bucket->_ns.isEmpty() && 
isBucketFull(&bucket)) { - bucket.rollover(isBucketFull); + bucket.rollover(isBucketFull, &closedBuckets); + bucket->_calculateBucketFieldsAndSizeChange(doc, options.getMetaField(), &newFieldNamesToBeInserted, @@ -273,7 +275,7 @@ StatusWith<std::shared_ptr<BucketCatalog::WriteBatch>> BucketCatalog::insert( } _memoryUsage.fetchAndAdd(bucket->_memoryUsage); - return batch; + return InsertResult{batch, closedBuckets}; } bool BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) { @@ -302,10 +304,13 @@ bool BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) { return true; } -void BucketCatalog::finish(std::shared_ptr<WriteBatch> batch, const CommitInfo& info) { +boost::optional<BucketCatalog::ClosedBucket> BucketCatalog::finish( + std::shared_ptr<WriteBatch> batch, const CommitInfo& info) { invariant(!batch->finished()); invariant(!batch->active()); + boost::optional<ClosedBucket> closedBucket; + Bucket* ptr(batch->bucket()); batch->_finish(info); @@ -346,6 +351,9 @@ void BucketCatalog::finish(std::shared_ptr<WriteBatch> batch, const CommitInfo& bucket.release(); auto lk = _lockExclusive(); + closedBucket = + ClosedBucket{ptr->_id, ptr->getTimeField().toString(), ptr->numMeasurements()}; + // Only remove from _allBuckets and _idleBuckets. If it was marked full, we know that // happened in BucketAccess::rollover, and that there is already a new open bucket for // this metadata. @@ -359,6 +367,7 @@ void BucketCatalog::finish(std::shared_ptr<WriteBatch> batch, const CommitInfo& _markBucketIdle(bucket); } } + return closedBucket; } void BucketCatalog::abort(std::shared_ptr<WriteBatch> batch, @@ -580,18 +589,25 @@ void BucketCatalog::_verifyBucketIsUnused(Bucket* bucket) const { stdx::lock_guard<Mutex> lk{bucket->_mutex}; } -void BucketCatalog::_expireIdleBuckets(ExecutionStats* stats) { +void BucketCatalog::_expireIdleBuckets(ExecutionStats* stats, + std::vector<BucketCatalog::ClosedBucket>* closedBuckets) { // Must hold an exclusive lock on _bucketMutex from outside. stdx::lock_guard lk{_idleMutex}; - // As long as we still need space and have entries, close idle buckets. + // As long as we still need space and have entries and remaining attempts, close idle buckets. 
+ int32_t numClosed = 0; while (!_idleBuckets.empty() && _memoryUsage.load() > - static_cast<std::uint64_t>(gTimeseriesIdleBucketExpiryMemoryUsageThreshold)) { + static_cast<std::uint64_t>(gTimeseriesIdleBucketExpiryMemoryUsageThreshold) && + numClosed <= gTimeseriesIdleBucketExpiryMaxCountPerAttempt) { Bucket* bucket = _idleBuckets.back(); _verifyBucketIsUnused(bucket); + ClosedBucket closed{ + bucket->id(), bucket->getTimeField().toString(), bucket->numMeasurements()}; if (_removeBucket(bucket, true /* expiringBuckets */)) { stats->numBucketsClosedDueToMemoryThreshold.fetchAndAddRelaxed(1); + closedBuckets->push_back(closed); + ++numClosed; } } } @@ -605,8 +621,9 @@ BucketCatalog::Bucket* BucketCatalog::_allocateBucket(const BucketKey& key, const Date_t& time, const TimeseriesOptions& options, ExecutionStats* stats, + ClosedBuckets* closedBuckets, bool openedDuetoMetadata) { - _expireIdleBuckets(stats); + _expireIdleBuckets(stats, closedBuckets); auto [it, inserted] = _allBuckets.insert(std::make_unique<Bucket>()); Bucket* bucket = it->get(); @@ -652,6 +669,7 @@ void BucketCatalog::_setIdTimestamp(Bucket* bucket, auto roundedTime = timeseries::roundTimestampToGranularity(time, options.getGranularity()); auto const roundedSeconds = durationCount<Seconds>(roundedTime.toDurationSinceEpoch()); bucket->_id.setTimestamp(roundedSeconds); + bucket->_timeField = options.getTimeField().toString(); // Make sure we set the control.min time field to match the rounded _id timestamp. auto controlDoc = buildControlMinTimestampDoc(options.getTimeField(), roundedTime); @@ -760,6 +778,10 @@ const OID& BucketCatalog::Bucket::id() const { return _id; } +StringData BucketCatalog::Bucket::getTimeField() { + return _timeField; +} + void BucketCatalog::Bucket::_calculateBucketFieldsAndSizeChange( const BSONObj& doc, boost::optional<StringData> metaField, @@ -806,6 +828,10 @@ bool BucketCatalog::Bucket::allCommitted() const { return _batches.empty() && !_preparedBatch; } +uint32_t BucketCatalog::Bucket::numMeasurements() const { + return _numMeasurements; +} + std::shared_ptr<BucketCatalog::WriteBatch> BucketCatalog::Bucket::_activeBatch( OperationId opId, const std::shared_ptr<ExecutionStats>& stats) { auto it = _batches.find(opId); @@ -819,6 +845,7 @@ BucketCatalog::BucketAccess::BucketAccess(BucketCatalog* catalog, BucketKey& key, const TimeseriesOptions& options, ExecutionStats* stats, + ClosedBuckets* closedBuckets, const Date_t& time) : _catalog(catalog), _key(&key), _options(&options), _stats(stats), _time(&time) { @@ -875,7 +902,7 @@ BucketCatalog::BucketAccess::BucketAccess(BucketCatalog* catalog, : key.withCopiedMetadata(BSONObj()); hashedKey.key = &originalBucketKey; auto lk = _catalog->_lockExclusive(); - _findOrCreateOpenBucketThenLock(hashedNormalizedKey, hashedKey); + _findOrCreateOpenBucketThenLock(hashedNormalizedKey, hashedKey, closedBuckets); } BucketCatalog::BucketAccess::BucketAccess(BucketCatalog* catalog, @@ -977,11 +1004,13 @@ BucketCatalog::BucketState BucketCatalog::BucketAccess::_confirmStateForAcquired } void BucketCatalog::BucketAccess::_findOrCreateOpenBucketThenLock( - const HashedBucketKey& normalizedKey, const HashedBucketKey& nonNormalizedKey) { + const HashedBucketKey& normalizedKey, + const HashedBucketKey& nonNormalizedKey, + ClosedBuckets* closedBuckets) { auto it = _catalog->_openBuckets.find(normalizedKey); if (it == _catalog->_openBuckets.end()) { // No open bucket for this metadata. 
- _create(normalizedKey, nonNormalizedKey); + _create(normalizedKey, nonNormalizedKey, closedBuckets); return; } @@ -995,7 +1024,7 @@ void BucketCatalog::BucketAccess::_findOrCreateOpenBucketThenLock( } _catalog->_abort(_guard, _bucket, nullptr, boost::none); - _create(normalizedKey, nonNormalizedKey); + _create(normalizedKey, nonNormalizedKey, closedBuckets); } void BucketCatalog::BucketAccess::_acquire() { @@ -1005,10 +1034,11 @@ void BucketCatalog::BucketAccess::_acquire() { void BucketCatalog::BucketAccess::_create(const HashedBucketKey& normalizedKey, const HashedBucketKey& nonNormalizedKey, + ClosedBuckets* closedBuckets, bool openedDuetoMetadata) { invariant(_options); - _bucket = - _catalog->_allocateBucket(normalizedKey, *_time, *_options, _stats, openedDuetoMetadata); + _bucket = _catalog->_allocateBucket( + normalizedKey, *_time, *_options, _stats, closedBuckets, openedDuetoMetadata); _catalog->_openBuckets[nonNormalizedKey] = _bucket; _bucket->_nonNormalizedKeyMetadatas.push_back(nonNormalizedKey.key->metadata.toBSON()); _acquire(); @@ -1037,7 +1067,8 @@ BucketCatalog::BucketAccess::operator BucketCatalog::Bucket*() const { return _bucket; } -void BucketCatalog::BucketAccess::rollover(const std::function<bool(BucketAccess*)>& isBucketFull) { +void BucketCatalog::BucketAccess::rollover(const std::function<bool(BucketAccess*)>& isBucketFull, + ClosedBuckets* closedBuckets) { invariant(isLocked()); invariant(_key); invariant(_time); @@ -1054,7 +1085,7 @@ void BucketCatalog::BucketAccess::rollover(const std::function<bool(BucketAccess auto hashedKey = BucketHasher{}.hashed_key(prevBucketKey); auto lk = _catalog->_lockExclusive(); - _findOrCreateOpenBucketThenLock(hashedNormalizedKey, hashedKey); + _findOrCreateOpenBucketThenLock(hashedNormalizedKey, hashedKey, closedBuckets); // Recheck if still full now that we've reacquired the bucket. bool sameBucket = @@ -1064,6 +1095,9 @@ void BucketCatalog::BucketAccess::rollover(const std::function<bool(BucketAccess if (_bucket->allCommitted()) { // The bucket does not contain any measurements that are yet to be committed, so we can // remove it now. Otherwise, we must keep the bucket around until it is committed. + closedBuckets->push_back(ClosedBucket{ + _bucket->id(), _bucket->getTimeField().toString(), _bucket->numMeasurements()}); + oldBucket = _bucket; release(); bool removed = _catalog->_removeBucket(oldBucket, false /* expiringBuckets */); @@ -1077,7 +1111,7 @@ void BucketCatalog::BucketAccess::rollover(const std::function<bool(BucketAccess release(); } - _create(hashedNormalizedKey, hashedKey, false /* openedDueToMetadata */); + _create(hashedNormalizedKey, hashedKey, closedBuckets, false /* openedDueToMetadata */); } } diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h index 44c59a9ede1..2c2b1994443 100644 --- a/src/mongo/db/timeseries/bucket_catalog.h +++ b/src/mongo/db/timeseries/bucket_catalog.h @@ -66,6 +66,16 @@ public: }; /** + * Information of a Bucket that got closed when performing an operation on this BucketCatalog. + */ + struct ClosedBucket { + OID bucketId; + std::string timeField; + uint32_t numMeasurements; + }; + using ClosedBuckets = std::vector<ClosedBucket>; + + /** * The basic unit of work for a bucket. Each insert will return a shared_ptr to a WriteBatch. * When a writer is finished with all their insertions, they should then take steps to ensure * each batch they wrote into is committed. 
To ensure a batch is committed, a writer should @@ -164,6 +174,15 @@ public: SharedPromise<CommitInfo> _promise; }; + /** + * Return type for the insert function. + * See comment above insert() for more information. + */ + struct InsertResult { + std::shared_ptr<WriteBatch> batch; + ClosedBuckets closedBuckets; + }; + static BucketCatalog& get(ServiceContext* svcCtx); static BucketCatalog& get(OperationContext* opCtx); @@ -183,17 +202,16 @@ public: BSONObj getMetadata(Bucket* bucket) const; /** - * Returns the WriteBatch into which the document was inserted. Any caller who receives the same - * batch may commit or abort the batch after claiming commit rights. See WriteBatch for more - * details. + * Returns the WriteBatch into which the document was inserted and optional information about a + * bucket if one was closed. Any caller who receives the same batch may commit or abort the + * batch after claiming commit rights. See WriteBatch for more details. */ - StatusWith<std::shared_ptr<WriteBatch>> insert( - OperationContext* opCtx, - const NamespaceString& ns, - const StringData::ComparatorInterface* comparator, - const TimeseriesOptions& options, - const BSONObj& doc, - CombineWithInsertsFromOtherClients combine); + StatusWith<InsertResult> insert(OperationContext* opCtx, + const NamespaceString& ns, + const StringData::ComparatorInterface* comparator, + const TimeseriesOptions& options, + const BSONObj& doc, + CombineWithInsertsFromOtherClients combine); /** * Prepares a batch for commit, transitioning it to an inactive state. Caller must already have @@ -205,8 +223,10 @@ public: /** * Records the result of a batch commit. Caller must already have commit rights on batch, and * batch must have been previously prepared. + * + * Returns bucket information of a bucket if one was closed. */ - void finish(std::shared_ptr<WriteBatch> batch, const CommitInfo& info); + boost::optional<ClosedBucket> finish(std::shared_ptr<WriteBatch> batch, const CommitInfo& info); /** * Aborts the given write batch and any other outstanding batches on the same bucket. Caller @@ -342,10 +362,20 @@ public: const OID& id() const; /** + * Returns the timefield for the underlying bucket. + */ + StringData getTimeField(); + + /** * Returns whether all measurements have been committed. */ bool allCommitted() const; + /** + * Returns total number of measurements in the bucket. + */ + uint32_t numMeasurements() const; + private: /** * Determines the effect of adding 'doc' to this bucket. If adding 'doc' causes this bucket @@ -389,6 +419,9 @@ public: // Top-level field names of the measurements that have been inserted into the bucket. StringSet _fieldNames; + // Time field for the measurements that have been inserted into the bucket. + std::string _timeField; + // The minimum and maximum values for each field in the bucket. timeseries::MinMax _minmax; @@ -549,6 +582,7 @@ private: BucketKey& key, const TimeseriesOptions& options, ExecutionStats* stats, + ClosedBuckets* closedBuckets, const Date_t& time); BucketAccess(BucketCatalog* catalog, Bucket* bucket, @@ -568,8 +602,11 @@ private: * Parameter is a function which should check that the bucket is indeed still full after * reacquiring the necessary locks. The first parameter will give the function access to * this BucketAccess instance, with the bucket locked. + * + * Returns bucket information of a bucket if one was closed. 
*/ - void rollover(const std::function<bool(BucketAccess*)>& isBucketFull); + void rollover(const std::function<bool(BucketAccess*)>& isBucketFull, + ClosedBuckets* closedBuckets); // Retrieve the time associated with the bucket (id) Date_t getTime() const; @@ -607,7 +644,8 @@ private: // Helper to find an open bucket for the given metadata if it exists, create it if it // doesn't, and lock it. Requires an exclusive lock on the catalog. void _findOrCreateOpenBucketThenLock(const HashedBucketKey& normalizedKey, - const HashedBucketKey& key); + const HashedBucketKey& key, + ClosedBuckets* closedBuckets); // Lock _bucket. void _acquire(); @@ -616,6 +654,7 @@ private: // a lock on it. void _create(const HashedBucketKey& normalizedKey, const HashedBucketKey& key, + ClosedBuckets* closedBuckets, bool openedDuetoMetadata = true); BucketCatalog* _catalog; @@ -675,7 +714,7 @@ private: /** * Expires idle buckets until the bucket catalog's memory usage is below the expiry threshold. */ - void _expireIdleBuckets(ExecutionStats* stats); + void _expireIdleBuckets(ExecutionStats* stats, ClosedBuckets* closedBuckets); std::size_t _numberOfIdleBuckets() const; @@ -684,6 +723,7 @@ private: const Date_t& time, const TimeseriesOptions& options, ExecutionStats* stats, + ClosedBuckets* closedBuckets, bool openedDuetoMetadata); std::shared_ptr<ExecutionStats> _getExecutionStats(const NamespaceString& ns); diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp index 8658b6807dd..02f056ff629 100644 --- a/src/mongo/db/timeseries/bucket_catalog_test.cpp +++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp @@ -168,7 +168,7 @@ void BucketCatalogTest::_insertOneAndCommit(const NamespaceString& ns, _getTimeseriesOptions(ns), BSON(_timeField << Date_t::now()), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); - auto& batch = result.getValue(); + auto& batch = result.getValue().batch; _commit(batch, numPreviouslyCommittedMeasurements); } @@ -187,7 +187,7 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucket) { _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); - auto batch1 = result1.getValue(); + auto batch1 = result1.getValue().batch; ASSERT(batch1->claimCommitRights()); ASSERT(batch1->active()); @@ -200,7 +200,7 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucket) { _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); - auto batch2 = result2.getValue(); + auto batch2 = result2.getValue().batch; ASSERT_EQ(batch1, batch2); ASSERT(!batch2->claimCommitRights()); @@ -233,7 +233,8 @@ TEST_F(BucketCatalogTest, GetMetadataReturnsEmptyDocOnMissingBucket) { _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) - .getValue(); + .getValue() + .batch; ASSERT(batch->claimCommitRights()); auto bucket = batch->bucket(); _bucketCatalog->abort(batch); @@ -264,20 +265,21 @@ TEST_F(BucketCatalogTest, InsertIntoDifferentBuckets) { BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); // Inserts should all be into three distinct buckets (and therefore batches). 
- ASSERT_NE(result1.getValue(), result2.getValue()); - ASSERT_NE(result1.getValue(), result3.getValue()); - ASSERT_NE(result2.getValue(), result3.getValue()); + ASSERT_NE(result1.getValue().batch, result2.getValue().batch); + ASSERT_NE(result1.getValue().batch, result3.getValue().batch); + ASSERT_NE(result2.getValue().batch, result3.getValue().batch); // Check metadata in buckets. ASSERT_BSONOBJ_EQ(BSON(_metaField << "123"), - _bucketCatalog->getMetadata(result1.getValue()->bucket())); + _bucketCatalog->getMetadata(result1.getValue().batch->bucket())); ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONObj()), - _bucketCatalog->getMetadata(result2.getValue()->bucket())); - ASSERT(_bucketCatalog->getMetadata(result3.getValue()->bucket()).isEmpty()); + _bucketCatalog->getMetadata(result2.getValue().batch->bucket())); + ASSERT(_bucketCatalog->getMetadata(result3.getValue().batch->bucket()).isEmpty()); // Committing one bucket should only return the one document in that bucket and should not // affect the other bucket. - for (const auto& batch : {result1.getValue(), result2.getValue(), result3.getValue()}) { + for (const auto& batch : + {result1.getValue().batch, result2.getValue().batch, result3.getValue().batch}) { _commit(batch, 0); } } @@ -298,13 +300,13 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketArray) { BSON(_timeField << Date_t::now() << _metaField << BSON_ARRAY(BSON("b" << 1 << "a" << 0))), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); - ASSERT_EQ(result1.getValue(), result2.getValue()); + ASSERT_EQ(result1.getValue().batch, result2.getValue().batch); // Check metadata in buckets. ASSERT_BSONOBJ_EQ(BSON(_metaField << BSON_ARRAY(BSON("a" << 0 << "b" << 1))), - _bucketCatalog->getMetadata(result1.getValue()->bucket())); + _bucketCatalog->getMetadata(result1.getValue().batch->bucket())); ASSERT_BSONOBJ_EQ(BSON(_metaField << BSON_ARRAY(BSON("a" << 0 << "b" << 1))), - _bucketCatalog->getMetadata(result2.getValue()->bucket())); + _bucketCatalog->getMetadata(result2.getValue().batch->bucket())); } TEST_F(BucketCatalogTest, InsertIntoSameBucketObjArray) { @@ -327,17 +329,17 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketObjArray) { << BSON("g" << 0 << "f" << 1))))), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); - ASSERT_EQ(result1.getValue(), result2.getValue()); + ASSERT_EQ(result1.getValue().batch, result2.getValue().batch); // Check metadata in buckets. ASSERT_BSONOBJ_EQ( BSON(_metaField << BSONObj(BSON( "c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1) << BSON("f" << 1 << "g" << 0))))), - _bucketCatalog->getMetadata(result1.getValue()->bucket())); + _bucketCatalog->getMetadata(result1.getValue().batch->bucket())); ASSERT_BSONOBJ_EQ( BSON(_metaField << BSONObj(BSON( "c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1) << BSON("f" << 1 << "g" << 0))))), - _bucketCatalog->getMetadata(result2.getValue()->bucket())); + _bucketCatalog->getMetadata(result2.getValue().batch->bucket())); } @@ -363,17 +365,17 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketNestedArray) { << "456"))))), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); - ASSERT_EQ(result1.getValue(), result2.getValue()); + ASSERT_EQ(result1.getValue().batch, result2.getValue().batch); // Check metadata in buckets. 
ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONObj(BSON("c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1) << BSON_ARRAY("123" << "456"))))), - _bucketCatalog->getMetadata(result1.getValue()->bucket())); + _bucketCatalog->getMetadata(result1.getValue().batch->bucket())); ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONObj(BSON("c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1) << BSON_ARRAY("123" << "456"))))), - _bucketCatalog->getMetadata(result2.getValue()->bucket())); + _bucketCatalog->getMetadata(result2.getValue().batch->bucket())); } TEST_F(BucketCatalogTest, InsertNullAndMissingMetaFieldIntoDifferentBuckets) { @@ -393,16 +395,16 @@ TEST_F(BucketCatalogTest, InsertNullAndMissingMetaFieldIntoDifferentBuckets) { BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); // Inserts should all be into three distinct buckets (and therefore batches). - ASSERT_NE(result1.getValue(), result2.getValue()); + ASSERT_NE(result1.getValue().batch, result2.getValue().batch); // Check metadata in buckets. ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONNULL), - _bucketCatalog->getMetadata(result1.getValue()->bucket())); - ASSERT(_bucketCatalog->getMetadata(result2.getValue()->bucket()).isEmpty()); + _bucketCatalog->getMetadata(result1.getValue().batch->bucket())); + ASSERT(_bucketCatalog->getMetadata(result2.getValue().batch->bucket()).isEmpty()); // Committing one bucket should only return the one document in that bucket and should not // affect the other bucket. - for (const auto& batch : {result1.getValue(), result2.getValue()}) { + for (const auto& batch : {result1.getValue().batch, result2.getValue().batch}) { _commit(batch, 0); } } @@ -444,7 +446,8 @@ TEST_F(BucketCatalogTest, InsertBetweenPrepareAndFinish) { _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) - .getValue(); + .getValue() + .batch; ASSERT(batch1->claimCommitRights()); _bucketCatalog->prepareCommit(batch1); ASSERT_EQ(batch1->measurements().size(), 1); @@ -458,7 +461,8 @@ TEST_F(BucketCatalogTest, InsertBetweenPrepareAndFinish) { _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) - .getValue(); + .getValue() + .batch; ASSERT_NE(batch1, batch2); _bucketCatalog->finish(batch1, {}); @@ -475,7 +479,7 @@ DEATH_TEST_F(BucketCatalogTest, CannotCommitWithoutRights, "invariant") { _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); - auto& batch = result.getValue(); + auto& batch = result.getValue().batch; _bucketCatalog->prepareCommit(batch); } @@ -486,7 +490,7 @@ DEATH_TEST_F(BucketCatalogTest, CannotFinishUnpreparedBatch, "invariant") { _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); - auto& batch = result.getValue(); + auto& batch = result.getValue().batch; ASSERT(batch->claimCommitRights()); _bucketCatalog->finish(batch, {}); } @@ -499,7 +503,8 @@ TEST_F(BucketCatalogWithoutMetadataTest, GetMetadataReturnsEmptyDoc) { _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) - .getValue(); + .getValue() + .batch; ASSERT_BSONOBJ_EQ(BSONObj(), _bucketCatalog->getMetadata(batch->bucket())); @@ -514,8 +519,8 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) { _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now() << "a" << 0), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); - 
ASSERT_OK(result); - auto batch = result.getValue(); + ASSERT(result.isOK()); + auto batch = result.getValue().batch; auto oldId = batch->bucket()->id(); _commit(batch, 0); ASSERT_EQ(2U, batch->newFieldNamesToBeInserted().size()) << batch->toBSON(); @@ -530,8 +535,8 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) { _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now() << "a" << 1), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); - ASSERT_OK(result); - batch = result.getValue(); + ASSERT(result.isOK()); + batch = result.getValue().batch; _commit(batch, 1); ASSERT_EQ(0U, batch->newFieldNamesToBeInserted().size()) << batch->toBSON(); @@ -542,8 +547,8 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) { _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now() << "a" << 2 << "b" << 2), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); - ASSERT_OK(result); - batch = result.getValue(); + ASSERT(result.isOK()); + batch = result.getValue().batch; _commit(batch, 2); ASSERT_EQ(1U, batch->newFieldNamesToBeInserted().size()) << batch->toBSON(); ASSERT(batch->newFieldNamesToBeInserted().count("b")) << batch->toBSON(); @@ -556,8 +561,8 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) { _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now() << "a" << i), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); - ASSERT_OK(result); - batch = result.getValue(); + ASSERT(result.isOK()); + batch = result.getValue().batch; _commit(batch, i); ASSERT_EQ(0U, batch->newFieldNamesToBeInserted().size()) << i << ":" << batch->toBSON(); } @@ -571,7 +576,7 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) { _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now() << "a" << gTimeseriesBucketMaxCount), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); - auto& batch2 = result2.getValue(); + auto& batch2 = result2.getValue().batch; ASSERT_NE(oldId, batch2->bucket()->id()); _commit(batch2, 0); ASSERT_EQ(2U, batch2->newFieldNamesToBeInserted().size()) << batch2->toBSON(); @@ -587,7 +592,8 @@ TEST_F(BucketCatalogTest, AbortBatchOnBucketWithPreparedCommit) { _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) - .getValue(); + .getValue() + .batch; ASSERT(batch1->claimCommitRights()); _bucketCatalog->prepareCommit(batch1); ASSERT_EQ(batch1->measurements().size(), 1); @@ -601,7 +607,8 @@ TEST_F(BucketCatalogTest, AbortBatchOnBucketWithPreparedCommit) { _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) - .getValue(); + .getValue() + .batch; ASSERT_NE(batch1, batch2); ASSERT(batch2->claimCommitRights()); @@ -622,7 +629,8 @@ TEST_F(BucketCatalogTest, ClearNamespaceWithConcurrentWrites) { _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) - .getValue(); + .getValue() + .batch; ASSERT(batch->claimCommitRights()); _bucketCatalog->clear(_ns1); @@ -639,7 +647,8 @@ TEST_F(BucketCatalogTest, ClearNamespaceWithConcurrentWrites) { _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) - .getValue(); + .getValue() + .batch; ASSERT(batch->claimCommitRights()); _bucketCatalog->prepareCommit(batch); ASSERT_EQ(batch->measurements().size(), 1); @@ -666,7 +675,8 @@ TEST_F(BucketCatalogTest, ClearBucketWithPreparedBatchThrowsConflict) { 
_getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) - .getValue(); + .getValue() + .batch; ASSERT(batch->claimCommitRights()); _bucketCatalog->prepareCommit(batch); ASSERT_EQ(batch->measurements().size(), 1); @@ -687,7 +697,8 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) { _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) - .getValue(); + .getValue() + .batch; ASSERT(batch1->claimCommitRights()); _bucketCatalog->prepareCommit(batch1); ASSERT_EQ(batch1->measurements().size(), 1); @@ -701,7 +712,8 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) { _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) - .getValue(); + .getValue() + .batch; ASSERT_NE(batch1, batch2); ASSERT_EQ(batch1->bucket(), batch2->bucket()); @@ -726,7 +738,8 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) { _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) - .getValue(); + .getValue() + .batch; ASSERT_NE(batch1, batch3); ASSERT_NE(batch2, batch3); ASSERT_NE(batch1->bucket(), batch3->bucket()); @@ -748,7 +761,8 @@ TEST_F(BucketCatalogTest, PrepareCommitOnAlreadyAbortedBatch) { _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) - .getValue(); + .getValue() + .batch; ASSERT(batch->claimCommitRights()); _bucketCatalog->abort(batch); @@ -769,7 +783,8 @@ TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) { _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) - .getValue(); + .getValue() + .batch; auto batch2 = _bucketCatalog ->insert(_makeOperationContext().second.get(), @@ -778,7 +793,8 @@ TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) { _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) - .getValue(); + .getValue() + .batch; auto batch3 = _bucketCatalog ->insert(_makeOperationContext().second.get(), @@ -787,7 +803,8 @@ TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) { _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) - .getValue(); + .getValue() + .batch; auto batch4 = _bucketCatalog ->insert(_makeOperationContext().second.get(), @@ -796,7 +813,8 @@ TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) { _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) - .getValue(); + .getValue() + .batch; ASSERT_NE(batch1, batch2); ASSERT_NE(batch1, batch3); @@ -816,7 +834,8 @@ TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) { _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) - .getValue(); + .getValue() + .batch; auto batch2 = _bucketCatalog ->insert(_makeOperationContext().second.get(), @@ -825,7 +844,8 @@ TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) { _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) - .getValue(); + .getValue() + .batch; 
ASSERT(batch1->claimCommitRights()); ASSERT(batch2->claimCommitRights()); @@ -852,7 +872,8 @@ TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) { _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) - .getValue(); + .getValue() + .batch; auto batch2 = _bucketCatalog ->insert(_makeOperationContext().second.get(), @@ -861,7 +882,8 @@ TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) { _getTimeseriesOptions(_ns1), BSON(_timeField << Date_t::now()), BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) - .getValue(); + .getValue() + .batch; // Batch 2 is the first batch to commit the time field. ASSERT(batch2->claimCommitRights()); diff --git a/src/mongo/db/timeseries/bucket_compression.cpp b/src/mongo/db/timeseries/bucket_compression.cpp new file mode 100644 index 00000000000..06f65f5252e --- /dev/null +++ b/src/mongo/db/timeseries/bucket_compression.cpp @@ -0,0 +1,203 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage + +#include "mongo/db/timeseries/bucket_compression.h" + +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/bson/util/bsoncolumnbuilder.h" +#include "mongo/db/timeseries/timeseries_constants.h" +#include "mongo/logv2/log.h" +#include "mongo/util/assert_util.h" + +namespace mongo { + +namespace timeseries { + +boost::optional<BSONObj> compressBucket(const BSONObj& bucketDoc, StringData timeFieldName) try { + // Helper for uncompressed measurements + struct Measurement { + BSONElement timeField; + std::vector<BSONElement> dataFields; + }; + + BSONObjBuilder builder; // builder to build the compressed bucket + std::vector<Measurement> measurements; // Extracted measurements from uncompressed bucket + boost::optional<BSONObjIterator> time; // Iterator to read time fields from uncompressed bucket + std::vector<std::pair<StringData, BSONObjIterator>> + columns; // Iterators to read data fields from uncompressed bucket + + // Read everything from the uncompressed bucket + for (auto& elem : bucketDoc) { + // Control field is left as-is except for the version field. + if (elem.fieldNameStringData() == kBucketControlFieldName) { + BSONObjBuilder control(builder.subobjStart(kBucketControlFieldName)); + + // Set the right version, leave other control fields unchanged + bool versionSet = false; + for (const auto& controlField : elem.Obj()) { + if (controlField.fieldNameStringData() == kBucketControlVersionFieldName) { + control.append(kBucketControlVersionFieldName, + kTimeseriesControlCompressedVersion); + versionSet = true; + } else { + control.append(controlField); + } + } + + // Set version if it was missing from uncompressed bucket + if (!versionSet) { + control.append(kBucketControlVersionFieldName, kTimeseriesControlCompressedVersion); + } + + continue; + } + + // Everything that's not under data or control is left as-is + if (elem.fieldNameStringData() != kBucketDataFieldName) { + // Non-data fields are copied through unchanged. + builder.append(elem); + continue; + } + + // Set up iterators to read all fields under data in lock-step + for (auto& columnObj : elem.Obj()) { + if (columnObj.fieldNameStringData() == timeFieldName) { + time.emplace(columnObj.Obj()); + } else { + columns.emplace_back(columnObj.fieldNameStringData(), columnObj.Obj()); + } + } + } + + // If the provided time field doesn't exist then there is nothing to do + if (!time) { + return boost::none; + } + + // Read all measurements from bucket + while (time->more()) { + // Get and advance the time iterator + auto timeElement = time->next(); + + // Get BSONElements for all data elements. Missing data fields are represented as EOO. + Measurement measurement; + measurement.timeField = timeElement; + measurement.dataFields.resize(columns.size()); + + // Read one element from each data field iterator + for (size_t i = 0; i < columns.size(); ++i) { + auto& column = columns[i].second; + // If we reach the end there is nothing more to do; all subsequent elements will be left as + // EOO/missing. + if (!column.more()) { + continue; + } + + // Check that the field name matches the name of the time field. Field names are + // strings of integer indexes, "0", "1" etc. Data fields may have missing entries + // where the time field may not. So we can use this fact and just do a simple string + // compare against the time field. If it does not match, our data field is skipped + // and the iterator is positioned at an element with a higher index.
We should then + // leave the data field as EOO and not advance the data iterator. + auto elem = *column; + if (timeElement.fieldNameStringData() == elem.fieldNameStringData()) { + // Extract the element and advance the iterator + measurement.dataFields[i] = elem; + column.advance(elem); + } + } + + measurements.push_back(std::move(measurement)); + } + + // Verify that we are at the end of all data iterators; if we are not, then there is something + // funky with the bucket and we have not read everything. We cannot compress as that would + // lose user data. + // This can happen if the bucket contains unordered keys in its data fields {"0": ..., "2": + // ..., "1": ...}, or if there are more data fields than time fields. + if (std::any_of(columns.begin(), columns.end(), [](const auto& entry) { + return entry.second.more(); + })) { + LOGV2_DEBUG(5857801, + 1, + "Failed to parse timeseries bucket during compression, leaving uncompressed"); + return boost::none; + } + + // Sort all the measurements in time order. + std::sort(measurements.begin(), + measurements.end(), + [](const Measurement& lhs, const Measurement& rhs) { + return lhs.timeField.timestamp() < rhs.timeField.timestamp(); + }); + + // Lastly, compress elements and build the compressed bucket + { + BSONObjBuilder dataBuilder = builder.subobjStart(kBucketDataFieldName); + BufBuilder columnBuffer; // Reusable buffer to avoid extra allocs per column. + + // Add compressed time field first + { + BSONColumnBuilder timeColumn(timeFieldName, std::move(columnBuffer)); + for (const auto& measurement : measurements) { + timeColumn.append(measurement.timeField); + } + dataBuilder.append(timeFieldName, timeColumn.finalize()); + columnBuffer = timeColumn.detach(); + } + + // Then add compressed data fields. + for (size_t i = 0; i < columns.size(); ++i) { + BSONColumnBuilder column(columns[i].first, std::move(columnBuffer)); + for (const auto& measurement : measurements) { + if (auto elem = measurement.dataFields[i]) { + column.append(elem); + } else { + column.skip(); + } + } + dataBuilder.append(column.fieldName(), column.finalize()); + columnBuffer = column.detach(); + } + } + + return builder.obj(); +} catch (...) { + // Skip compression if we encounter any exception + LOGV2_DEBUG(5857800, + 1, + "Exception when compressing timeseries bucket, leaving it uncompressed", + "error"_attr = exceptionToStatus()); + return boost::none; +} + +} // namespace timeseries +} // namespace mongo diff --git a/src/mongo/db/timeseries/bucket_compression.h b/src/mongo/db/timeseries/bucket_compression.h new file mode 100644 index 00000000000..f28360bf786 --- /dev/null +++ b/src/mongo/db/timeseries/bucket_compression.h @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>.
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <boost/optional.hpp> + +#include "mongo/bson/bsonobj.h" +#include "mongo/db/namespace_string.h" + +namespace mongo { + +namespace timeseries { + +/** + * Returns a compressed timeseries bucket in v2 format for a given uncompressed v1 bucket and time + * field. The compressed bucket will have all measurements sorted by time. + * + * If bucket compression is not possible for any reason, boost::none is returned. + */ +boost::optional<BSONObj> compressBucket(const BSONObj& bucketDoc, StringData timeFieldName); + +} // namespace timeseries +} // namespace mongo diff --git a/src/mongo/db/timeseries/timeseries.idl b/src/mongo/db/timeseries/timeseries.idl index c97e9cf1887..ba01d4e7121 100644 --- a/src/mongo/db/timeseries/timeseries.idl +++ b/src/mongo/db/timeseries/timeseries.idl @@ -54,6 +54,13 @@ server_parameters: cpp_varname: "gTimeseriesIdleBucketExpiryMemoryUsageThreshold" default: 104857600 # 100MB validator: { gte: 1 } + "timeseriesIdleBucketExpiryMaxCountPerAttempt": + description: "The maximum number of buckets that may be closed due to expiry at each attempt" + set_at: [ startup ] + cpp_vartype: "std::int32_t" + cpp_varname: "gTimeseriesIdleBucketExpiryMaxCountPerAttempt" + default: 3 + validator: { gte: 2 } enums: BucketGranularity: diff --git a/src/mongo/db/update/SConscript b/src/mongo/db/update/SConscript index 500fdde1adc..a8f0bd1dc2b 100644 --- a/src/mongo/db/update/SConscript +++ b/src/mongo/db/update/SConscript @@ -59,6 +59,7 @@ env.Library( source=[ 'delta_executor.cpp', 'object_replace_executor.cpp', + 'object_transform_executor.cpp', 'pipeline_executor.cpp', ], LIBDEPS=[ @@ -125,6 +126,7 @@ env.CppUnitTest( 'field_checker_test.cpp', 'modifier_table_test.cpp', 'object_replace_executor_test.cpp', + 'object_transform_executor_test.cpp', 'path_support_test.cpp', 'pipeline_executor_test.cpp', 'pop_node_test.cpp', diff --git a/src/mongo/db/update/object_transform_executor.cpp b/src/mongo/db/update/object_transform_executor.cpp new file mode 100644 index 00000000000..597b7dbb335 --- /dev/null +++ b/src/mongo/db/update/object_transform_executor.cpp @@ -0,0 +1,74 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. 
If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/update/object_transform_executor.h" + +#include "mongo/db/update/object_replace_executor.h" +#include "mongo/db/update/storage_validation.h" +#include "mongo/db/update/update_oplog_entry_serialization.h" + +namespace mongo { + +ObjectTransformExecutor::ObjectTransformExecutor(TransformFunc transformFunc) + : _transformFunc(std::move(transformFunc)) {} + +UpdateExecutor::ApplyResult ObjectTransformExecutor::applyTransformUpdate( + ApplyParams applyParams, const TransformFunc& transformFunc) { + + auto originalDoc = applyParams.element.getDocument().getObject(); + auto transformedDoc = transformFunc(originalDoc); + if (!transformedDoc) { + return ApplyResult::noopResult(); + } + + return ObjectReplaceExecutor::applyReplacementUpdate( + std::move(applyParams), + *transformedDoc, + true /* replacementDocContainsIdField */, + true /* allowTopLevelDollarPrefixedFields */); +} + +UpdateExecutor::ApplyResult ObjectTransformExecutor::applyUpdate(ApplyParams applyParams) const { + auto ret = applyTransformUpdate(applyParams, _transformFunc); + + if (!ret.noop && applyParams.logMode != ApplyParams::LogMode::kDoNotGenerateOplogEntry) { + // Use a full replacement oplog entry, as the current use case makes major changes to the + // document. Consider supporting v:2 update in the future. + ret.oplogEntry = update_oplog_entry::makeReplacementOplogEntry( + applyParams.element.getDocument().getObject()); + } + return ret; +} + +Value ObjectTransformExecutor::serialize() const { + MONGO_UNREACHABLE_TASSERT(5857810); +} +} // namespace mongo diff --git a/src/mongo/db/update/object_transform_executor.h b/src/mongo/db/update/object_transform_executor.h new file mode 100644 index 00000000000..aee80ac80b7 --- /dev/null +++ b/src/mongo/db/update/object_transform_executor.h @@ -0,0 +1,83 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>.
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <functional> + +#include "mongo/db/update/update_executor.h" + +namespace mongo { + +/** + * An UpdateExecutor representing a transform-style update. + * + * A transform-style update is similar to a replacement-style update with the difference that the + * replacement document is calculated from the original document via a user-provided function. In + * the case of a WriteConflict or similar error where the update needs to be retried, the output + * document will be re-calculated. + * + * The replacement function MUST preserve the _id element from the original document. + * + * If the replacement function returns boost::none, the update operation will be considered a no-op. + */ +class ObjectTransformExecutor : public UpdateExecutor { +public: + using TransformFunc = std::function<boost::optional<BSONObj>(const BSONObj&)>; + + /** + * Applies a transform-style update to 'applyParams.element'. + * + * The transform function MUST preserve the _id element in the original document. + * + * This function will ignore the log mode provided in 'applyParams'. The 'oplogEntry' field + * of the returned ApplyResult is always empty. + */ + static ApplyResult applyTransformUpdate(ApplyParams applyParams, + const TransformFunc& transformFunc); + + /** + * Initializes the node with the transform function to apply to the original document. + */ + explicit ObjectTransformExecutor(TransformFunc transform); + + /** + * Replaces the document that 'applyParams.element' belongs to with the result of the transform + * function. 'applyParams.element' must be the root of the document. Always returns a result + * stating that indexes are affected when the replacement is not a noop. + */ + ApplyResult applyUpdate(ApplyParams applyParams) const final; + + Value serialize() const final; + +private: + TransformFunc _transformFunc; +}; + +} // namespace mongo diff --git a/src/mongo/db/update/object_transform_executor_test.cpp b/src/mongo/db/update/object_transform_executor_test.cpp new file mode 100644 index 00000000000..06cca4da7cd --- /dev/null +++ b/src/mongo/db/update/object_transform_executor_test.cpp @@ -0,0 +1,210 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program.
If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/update/object_transform_executor.h" + +#include "mongo/bson/mutable/algorithm.h" +#include "mongo/bson/mutable/mutable_bson_test_utils.h" +#include "mongo/db/json.h" +#include "mongo/db/update/update_node_test_fixture.h" +#include "mongo/idl/server_parameter_test_util.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { +using ObjectTransformExecutorTest = UpdateNodeTest; +using mongo::mutablebson::countChildren; +using mongo::mutablebson::Element; + +TEST_F(ObjectTransformExecutorTest, Noop) { + BSONObj input = fromjson("{a: 1, b: 2}"); + + ObjectTransformExecutor node([&input](const BSONObj& pre) { + ASSERT_BSONOBJ_BINARY_EQ(pre, input); + return pre; + }); + + mutablebson::Document doc(input); + auto result = node.applyUpdate(getApplyParams(doc.root())); + ASSERT_TRUE(result.noop); + ASSERT_FALSE(result.indexesAffected); + ASSERT_EQUALS(fromjson("{a: 1, b: 2}"), doc); + ASSERT_TRUE(doc.isInPlaceModeEnabled()); + ASSERT_BSONOBJ_BINARY_EQ(fromjson("{}"), result.oplogEntry); +} + +TEST_F(ObjectTransformExecutorTest, NoneNoop) { + ObjectTransformExecutor node([](const BSONObj& pre) { return boost::none; }); + + mutablebson::Document doc(fromjson("{a: 1, b: 2}")); + auto result = node.applyUpdate(getApplyParams(doc.root())); + ASSERT_TRUE(result.noop); + ASSERT_FALSE(result.indexesAffected); + ASSERT_EQUALS(fromjson("{a: 1, b: 2}"), doc); + ASSERT_TRUE(doc.isInPlaceModeEnabled()); + ASSERT_BSONOBJ_BINARY_EQ(fromjson("{}"), result.oplogEntry); +} + +TEST_F(ObjectTransformExecutorTest, Replace) { + BSONObj from = fromjson("{a: 1, b: 2}"); + BSONObj to = fromjson("{a: 1, b: 3}"); + + ObjectTransformExecutor node([&from, &to](const BSONObj& pre) { + ASSERT_BSONOBJ_BINARY_EQ(pre, from); + return to; + }); + + mutablebson::Document doc(from); + auto result = node.applyUpdate(getApplyParams(doc.root())); + ASSERT_FALSE(result.noop); + ASSERT_TRUE(result.indexesAffected); + ASSERT_EQUALS(to, doc); + ASSERT_FALSE(doc.isInPlaceModeEnabled()); + ASSERT_BSONOBJ_BINARY_EQ(to, result.oplogEntry); +} + +TEST_F(ObjectTransformExecutorTest, ShouldSucceedWhenImmutableIdIsNotModified) { + auto obj = fromjson("{_id: 0, a: 1, b: 2}"); + auto to = fromjson("{_id: 0, c: 1, d: 2}"); + ObjectTransformExecutor node([&obj, &to](const BSONObj& pre) { + ASSERT_BSONOBJ_BINARY_EQ(pre, obj); + return to; + }); + + mutablebson::Document doc(obj); + addImmutablePath("_id"); + auto result = node.applyUpdate(getApplyParams(doc.root())); + ASSERT_FALSE(result.noop); + ASSERT_TRUE(result.indexesAffected); + ASSERT_EQUALS(to, doc); + ASSERT_FALSE(doc.isInPlaceModeEnabled()); + 
ASSERT_BSONOBJ_BINARY_EQ(to, result.oplogEntry); +} + +TEST_F(ObjectTransformExecutorTest, CannotRemoveImmutablePath) { + auto from = fromjson("{_id: 0, a: {b: 1}}"); + auto obj = fromjson("{_id: 0, c: 1}"); + ObjectTransformExecutor node([&from, &obj](const BSONObj& pre) { + ASSERT_BSONOBJ_BINARY_EQ(pre, from); + return obj; + }); + + mutablebson::Document doc(from); + addImmutablePath("a.b"); + ASSERT_THROWS_CODE_AND_WHAT(node.applyUpdate(getApplyParams(doc.root())), + AssertionException, + ErrorCodes::ImmutableField, + "After applying the update, the 'a.b' (required and immutable) " + "field was found to have been removed --{ _id: 0, a: { b: 1 } }"); +} + +TEST_F(ObjectTransformExecutorTest, CannotReplaceImmutablePathWithArrayField) { + auto obj = fromjson("{_id: 0, a: [{b: 1}]}"); + ObjectTransformExecutor node([&obj](const BSONObj& pre) { return obj; }); + + mutablebson::Document doc(fromjson("{_id: 0, a: {b: 1}}")); + addImmutablePath("a.b"); + ASSERT_THROWS_CODE_AND_WHAT(node.applyUpdate(getApplyParams(doc.root())), + AssertionException, + ErrorCodes::NotSingleValueField, + "After applying the update to the document, the (immutable) field " + "'a.b' was found to be an array or array descendant."); +} + +TEST_F(ObjectTransformExecutorTest, CannotMakeImmutablePathArrayDescendant) { + auto obj = fromjson("{_id: 0, a: [1]}"); + ObjectTransformExecutor node([&obj](const BSONObj& pre) { return obj; }); + + mutablebson::Document doc(fromjson("{_id: 0, a: {'0': 1}}")); + addImmutablePath("a.0"); + ASSERT_THROWS_CODE_AND_WHAT(node.applyUpdate(getApplyParams(doc.root())), + AssertionException, + ErrorCodes::NotSingleValueField, + "After applying the update to the document, the (immutable) field " + "'a.0' was found to be an array or array descendant."); +} + +TEST_F(ObjectTransformExecutorTest, CannotModifyImmutablePath) { + auto obj = fromjson("{_id: 0, a: {b: 2}}"); + ObjectTransformExecutor node([&obj](const BSONObj& pre) { return obj; }); + + mutablebson::Document doc(fromjson("{_id: 0, a: {b: 1}}")); + addImmutablePath("a.b"); + ASSERT_THROWS_CODE_AND_WHAT(node.applyUpdate(getApplyParams(doc.root())), + AssertionException, + ErrorCodes::ImmutableField, + "After applying the update, the (immutable) field 'a.b' was found " + "to have been altered to b: 2"); +} + +TEST_F(ObjectTransformExecutorTest, CannotModifyImmutableId) { + auto obj = fromjson("{_id: 1}"); + ObjectTransformExecutor node([&obj](const BSONObj& pre) { return obj; }); + + mutablebson::Document doc(fromjson("{_id: 0}")); + addImmutablePath("_id"); + ASSERT_THROWS_CODE_AND_WHAT(node.applyUpdate(getApplyParams(doc.root())), + AssertionException, + ErrorCodes::ImmutableField, + "After applying the update, the (immutable) field '_id' was found " + "to have been altered to _id: 1"); +} + +TEST_F(ObjectTransformExecutorTest, CanAddImmutableField) { + auto obj = fromjson("{a: {b: 1}}"); + ObjectTransformExecutor node([&obj](const BSONObj& pre) { return obj; }); + + mutablebson::Document doc(fromjson("{c: 1}")); + addImmutablePath("a.b"); + auto result = node.applyUpdate(getApplyParams(doc.root())); + ASSERT_FALSE(result.noop); + ASSERT_TRUE(result.indexesAffected); + ASSERT_EQUALS(fromjson("{a: {b: 1}}"), doc); + ASSERT_FALSE(doc.isInPlaceModeEnabled()); + ASSERT_BSONOBJ_BINARY_EQ(fromjson("{a: {b: 1}}"), result.oplogEntry); +} + +TEST_F(ObjectTransformExecutorTest, CanAddImmutableId) { + auto obj = fromjson("{_id: 0}"); + ObjectTransformExecutor node([&obj](const BSONObj& pre) { return obj; }); + + mutablebson::Document 
doc(fromjson("{c: 1}")); + addImmutablePath("_id"); + auto result = node.applyUpdate(getApplyParams(doc.root())); + ASSERT_FALSE(result.noop); + ASSERT_TRUE(result.indexesAffected); + ASSERT_EQUALS(fromjson("{_id: 0}"), doc); + ASSERT_FALSE(doc.isInPlaceModeEnabled()); + ASSERT_BSONOBJ_BINARY_EQ(fromjson("{_id: 0}"), result.oplogEntry); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/update/update_driver.cpp b/src/mongo/db/update/update_driver.cpp index 1826267443d..2dd809aa99b 100644 --- a/src/mongo/db/update/update_driver.cpp +++ b/src/mongo/db/update/update_driver.cpp @@ -43,6 +43,7 @@ #include "mongo/db/update/delta_executor.h" #include "mongo/db/update/modifier_table.h" #include "mongo/db/update/object_replace_executor.h" +#include "mongo/db/update/object_transform_executor.h" #include "mongo/db/update/path_support.h" #include "mongo/db/update/storage_validation.h" #include "mongo/db/update/update_oplog_entry_version.h" @@ -165,6 +166,18 @@ void UpdateDriver::parse( return; } + if (updateMod.type() == write_ops::UpdateModification::Type::kTransform) { + uassert(5857811, "multi update is not supported for transform-style update", !multi); + + uassert(5857812, + "arrayFilters may not be specified for transform-style updates", + arrayFilters.empty()); + + _updateType = UpdateType::kTransform; + _updateExecutor = std::make_unique<ObjectTransformExecutor>(updateMod.getTransform()); + return; + } + invariant(_updateType == UpdateType::kOperator); // By this point we are expecting a "classic" update. This version of mongod only supports $v: diff --git a/src/mongo/db/update/update_driver.h b/src/mongo/db/update/update_driver.h index 18cce2c38b8..96f34b4cfaf 100644 --- a/src/mongo/db/update/update_driver.h +++ b/src/mongo/db/update/update_driver.h @@ -56,7 +56,7 @@ class OperationContext; class UpdateDriver { public: - enum class UpdateType { kOperator, kReplacement, kPipeline, kDelta }; + enum class UpdateType { kOperator, kReplacement, kPipeline, kDelta, kTransform }; UpdateDriver(const boost::intrusive_ptr<ExpressionContext>& expCtx); diff --git a/src/mongo/shell/bench.cpp b/src/mongo/shell/bench.cpp index 79a20cdfc2d..88f5d685df0 100644 --- a/src/mongo/shell/bench.cpp +++ b/src/mongo/shell/bench.cpp @@ -1193,6 +1193,9 @@ void BenchRunOp::executeOnce(DBClientBase* conn, // Delta updates are only executed on secondaries as part of oplog // application. MONGO_UNREACHABLE; + case write_ops::UpdateModification::Type::kTransform: + // It is not possible to run a transform update directly from a client. + MONGO_UNREACHABLE; } singleUpdate.append("multi", this->multi); singleUpdate.append("upsert", this->upsert);
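Reviewer note: below is a minimal, hypothetical usage sketch of the new timeseries::compressBucket() helper declared in src/mongo/db/timeseries/bucket_compression.h. The bucket layout and the compressExampleBucket helper name are illustrative assumptions and not part of this patch; only compressBucket() itself comes from the change above.

#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/oid.h"
#include "mongo/db/timeseries/bucket_compression.h"
#include "mongo/util/time_support.h"

namespace mongo {

// Hypothetical example, not part of the patch: builds a small uncompressed
// (control.version: 1) bucket with two measurements of a single data field "x"
// and runs it through the new compression routine.
boost::optional<BSONObj> compressExampleBucket() {
    const Date_t t0 = Date_t::now();
    const Date_t t1 = t0 + Seconds(1);

    // Field names under "data.<field>" are the measurement row indexes: "0", "1", ...
    BSONObj bucketDoc = BSON(
        "_id" << OID::gen() << "control"
              << BSON("version" << 1 << "min" << BSON("time" << t0 << "x" << 1) << "max"
                                << BSON("time" << t1 << "x" << 2))
              << "data"
              << BSON("time" << BSON("0" << t0 << "1" << t1) << "x"
                             << BSON("0" << 1 << "1" << 2)));

    // On success this returns a control.version: 2 bucket whose data columns are
    // BSONColumn-compressed and whose measurements are sorted by time; boost::none
    // means the bucket was left uncompressed (e.g. malformed data field indexes).
    return timeseries::compressBucket(bucketDoc, "time");
}

}  // namespace mongo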
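Reviewer note: the following hypothetical sketch illustrates the TransformFunc contract documented in object_transform_executor.h: the transform must preserve _id, and returning boost::none makes the update a no-op. The "counter" field and the makeCounterBumpExecutor helper are invented for illustration and are not part of this patch.

#include <boost/optional.hpp>

#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/update/object_transform_executor.h"

namespace mongo {

// Hypothetical example, not part of the patch: an executor whose transform bumps a
// numeric "counter" field and copies every other field (including _id) through
// unchanged, as the TransformFunc contract requires.
ObjectTransformExecutor makeCounterBumpExecutor() {
    return ObjectTransformExecutor([](const BSONObj& pre) -> boost::optional<BSONObj> {
        if (!pre.hasField("counter")) {
            // Returning boost::none turns the whole update into a no-op.
            return boost::none;
        }
        BSONObjBuilder post;
        for (const BSONElement& elem : pre) {
            if (elem.fieldNameStringData() == "counter" && elem.isNumber()) {
                post.append("counter", elem.numberLong() + 1);
            } else {
                post.append(elem);  // _id and all other fields are preserved verbatim.
            }
        }
        return post.obj();
    });
}

}  // namespace mongo

Because the executor re-applies the transform if the write needs to be retried, the function should be a pure function of the pre-image, as in this sketch.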