diff options
author | Dan Larkin-York <dan.larkin-york@mongodb.com> | 2022-10-17 13:34:46 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-10-17 14:05:03 +0000 |
commit | a749c7f391420dc339b3a873568d635dd729630f (patch) | |
tree | c5bdbe0c07ca9ad508b6fbb6b22b21d0fb2f2686 | |
parent | e180e7292c9310b449ac21c6e63254eb1be39407 (diff) | |
download | mongo-a749c7f391420dc339b3a873568d635dd729630f.tar.gz |
SERVER-70504 Handle transform updates in performAtomicTimeseriesWrites
-rw-r--r-- | src/mongo/db/ops/SConscript | 5 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_exec.cpp | 29 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_exec_test.cpp | 112 |
3 files changed, 140 insertions, 6 deletions
diff --git a/src/mongo/db/ops/SConscript b/src/mongo/db/ops/SConscript index be406be81d0..695633c9342 100644 --- a/src/mongo/db/ops/SConscript +++ b/src/mongo/db/ops/SConscript @@ -114,15 +114,20 @@ env.Library( env.CppUnitTest( target='db_ops_test', source=[ + 'write_ops_exec_test.cpp', 'write_ops_parsers_test.cpp', 'write_ops_retryability_test.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/auth/authmocks', + '$BUILD_DIR/mongo/db/catalog/catalog_test_fixture', + '$BUILD_DIR/mongo/db/catalog/collection_crud', '$BUILD_DIR/mongo/db/query_exec', + '$BUILD_DIR/mongo/db/record_id_helpers', '$BUILD_DIR/mongo/db/repl/mock_repl_coord_server_fixture', '$BUILD_DIR/mongo/db/repl/oplog_entry', '$BUILD_DIR/mongo/db/session/session_catalog', + '$BUILD_DIR/mongo/db/timeseries/bucket_compression', '$BUILD_DIR/mongo/db/transaction/transaction', 'write_ops', 'write_ops_exec', diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index 1fbd221c707..0b299c38580 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -1421,20 +1421,37 @@ Status performAtomicTimeseriesWrites( auto recordId = record_id_helpers::keyForOID(update.getQ()["_id"].OID()); auto original = coll->docFor(opCtx, recordId); - auto [updated, indexesAffected] = - doc_diff::applyDiff(original.value(), - update.getU().getDiff(), - &CollectionQueryInfo::get(*coll).getIndexKeys(opCtx), - static_cast<bool>(repl::tenantMigrationInfo(opCtx))); CollectionUpdateArgs args; if (const auto& stmtIds = op.getStmtIds()) { args.stmtIds = *stmtIds; } args.preImageDoc = original.value(); - args.update = update_oplog_entry::makeDeltaOplogEntry(update.getU().getDiff()); args.criteria = update.getQ(); args.source = OperationSource::kTimeseriesInsert; + + BSONObj updated; + bool indexesAffected = true; + if (update.getU().type() == write_ops::UpdateModification::Type::kDelta) { + auto result = doc_diff::applyDiff(original.value(), + update.getU().getDiff(), + &CollectionQueryInfo::get(*coll).getIndexKeys(opCtx), + static_cast<bool>(repl::tenantMigrationInfo(opCtx))); + updated = result.postImage; + indexesAffected = result.indexesAffected; + args.update = update_oplog_entry::makeDeltaOplogEntry(update.getU().getDiff()); + } else if (update.getU().type() == write_ops::UpdateModification::Type::kTransform) { + const auto& transform = update.getU().getTransform(); + auto transformed = transform(original.value()); + tassert(7050400, + "Could not apply transformation to time series bucket document", + transformed.has_value()); + updated = std::move(transformed.value()); + args.update = update_oplog_entry::makeReplacementOplogEntry(updated); + } else { + invariant(false, "Unexpected update type"); + } + if (slot) { args.oplogSlots = {**slot}; fassert(5481600, diff --git a/src/mongo/db/ops/write_ops_exec_test.cpp b/src/mongo/db/ops/write_ops_exec_test.cpp new file mode 100644 index 00000000000..9c4caac330d --- /dev/null +++ b/src/mongo/db/ops/write_ops_exec_test.cpp @@ -0,0 +1,112 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/bson/unordered_fields_bsonobj_comparator.h" +#include "mongo/db/catalog/catalog_test_fixture.h" +#include "mongo/db/catalog/collection_write_path.h" +#include "mongo/db/catalog/create_collection.h" +#include "mongo/db/catalog_raii.h" +#include "mongo/db/ops/write_ops_exec.h" +#include "mongo/db/timeseries/bucket_compression.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +class WriteOpsExecTest : public CatalogTestFixture { +protected: + using CatalogTestFixture::setUp; +}; + +TEST_F(WriteOpsExecTest, PerformAtomicTimeseriesWritesWithTransform) { + NamespaceString ns{"db_write_ops_exec_test", "ts"}; + auto opCtx = operationContext(); + ASSERT_OK(createCollection(opCtx, + ns.dbName(), + BSON("create" << ns.coll() << "timeseries" + << BSON("timeField" + << "time")))); + + // We're going to insert a compressed bucket and ensure we can successfully decompress it via a + // transform update using performAtomicTimeseriesWrites. + const BSONObj bucketDoc = ::mongo::fromjson( + R"({"_id":{"$oid":"629e1e680958e279dc29a517"}, + "control":{"version":1,"min":{"time":{"$date":"2022-06-06T15:34:00.000Z"},"a":1,"b":1}, + "max":{"time":{"$date":"2022-06-06T15:34:30.000Z"},"a":3,"b":3}}, + "data":{"time":{"0":{"$date":"2022-06-06T15:34:30.000Z"}, + "1":{"$date":"2022-06-06T15:34:30.000Z"}, + "2":{"$date":"2022-06-06T15:34:30.000Z"}}, + "a":{"0":1,"1":2,"2":3}, + "b":{"0":1,"1":2,"2":3}}})"); + OID bucketId = OID::createFromString("629e1e680958e279dc29a517"_sd); + auto compressionResult = timeseries::compressBucket(bucketDoc, "time", ns, true, false); + ASSERT_TRUE(compressionResult.compressedBucket.has_value()); + const BSONObj compressedBucket = compressionResult.compressedBucket.value(); + + // Insert the compressed bucket. + AutoGetCollection bucketsColl(opCtx, ns.makeTimeseriesBucketsNamespace(), LockMode::MODE_IX); + { + WriteUnitOfWork wunit{opCtx}; + ASSERT_OK(collection_internal::insertDocument( + opCtx, *bucketsColl, InsertStatement{compressedBucket}, nullptr)); + wunit.commit(); + } + + // Decompress via transform. + { + auto bucketDecompressionFunc = [&](const BSONObj& bucketDoc) -> boost::optional<BSONObj> { + return timeseries::decompressBucket(bucketDoc); + }; + + + write_ops::UpdateModification u(std::move(bucketDecompressionFunc)); + write_ops::UpdateOpEntry update(BSON("_id" << bucketId), std::move(u)); + write_ops::UpdateCommandRequest op(ns.makeTimeseriesBucketsNamespace(), {update}); + + write_ops::WriteCommandRequestBase base; + base.setBypassDocumentValidation(true); + base.setStmtIds(std::vector<StmtId>{kUninitializedStmtId}); + + op.setWriteCommandRequestBase(std::move(base)); + + ASSERT_OK(write_ops_exec::performAtomicTimeseriesWrites(opCtx, {}, {op})); + } + + // Check the document is actually decompressed on disk. + { + auto recordId = record_id_helpers::keyForOID(bucketId); + auto retrievedBucket = bucketsColl->docFor(opCtx, recordId); + + UnorderedFieldsBSONObjComparator comparator; + ASSERT_EQ(0, comparator.compare(retrievedBucket.value(), bucketDoc)); + } +} + +} // namespace +} // namespace mongo |