author    Dan Larkin-York <dan.larkin-york@mongodb.com>    2022-10-17 13:34:46 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>    2022-10-17 14:05:03 +0000
commit    a749c7f391420dc339b3a873568d635dd729630f
tree      c5bdbe0c07ca9ad508b6fbb6b22b21d0fb2f2686
parent    e180e7292c9310b449ac21c6e63254eb1be39407
SERVER-70504 Handle transform updates in performAtomicTimeseriesWrites
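In short: performAtomicTimeseriesWrites previously assumed every update entry carried a
delta (diff) modification. With this change it also accepts transform updates, where the
caller supplies a callback that maps the pre-image bucket document to its replacement;
such writes are replicated as full replacement oplog entries rather than deltas. A
minimal sketch of how a caller constructs a transform update (this mirrors the new unit
test below; the decompression callback and bucketId are illustrative, not a fixed API):

    // Any callable BSONObj -> boost::optional<BSONObj> can serve as the transform.
    auto transformFn = [](const BSONObj& bucketDoc) -> boost::optional<BSONObj> {
        return timeseries::decompressBucket(bucketDoc);
    };
    write_ops::UpdateModification mod{std::move(transformFn)};
    write_ops::UpdateOpEntry entry{BSON("_id" << bucketId), std::move(mod)};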
 src/mongo/db/ops/SConscript              |   5
 src/mongo/db/ops/write_ops_exec.cpp      |  29
 src/mongo/db/ops/write_ops_exec_test.cpp | 112
 3 files changed, 140 insertions(+), 6 deletions(-)
diff --git a/src/mongo/db/ops/SConscript b/src/mongo/db/ops/SConscript
index be406be81d0..695633c9342 100644
--- a/src/mongo/db/ops/SConscript
+++ b/src/mongo/db/ops/SConscript
@@ -114,15 +114,20 @@ env.Library(
env.CppUnitTest(
target='db_ops_test',
source=[
+ 'write_ops_exec_test.cpp',
'write_ops_parsers_test.cpp',
'write_ops_retryability_test.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/auth/authmocks',
+ '$BUILD_DIR/mongo/db/catalog/catalog_test_fixture',
+ '$BUILD_DIR/mongo/db/catalog/collection_crud',
'$BUILD_DIR/mongo/db/query_exec',
+ '$BUILD_DIR/mongo/db/record_id_helpers',
'$BUILD_DIR/mongo/db/repl/mock_repl_coord_server_fixture',
'$BUILD_DIR/mongo/db/repl/oplog_entry',
'$BUILD_DIR/mongo/db/session/session_catalog',
+ '$BUILD_DIR/mongo/db/timeseries/bucket_compression',
'$BUILD_DIR/mongo/db/transaction/transaction',
'write_ops',
'write_ops_exec',
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index 1fbd221c707..0b299c38580 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -1421,20 +1421,37 @@ Status performAtomicTimeseriesWrites(
auto recordId = record_id_helpers::keyForOID(update.getQ()["_id"].OID());
auto original = coll->docFor(opCtx, recordId);

- auto [updated, indexesAffected] =
- doc_diff::applyDiff(original.value(),
- update.getU().getDiff(),
- &CollectionQueryInfo::get(*coll).getIndexKeys(opCtx),
- static_cast<bool>(repl::tenantMigrationInfo(opCtx)));

CollectionUpdateArgs args;
if (const auto& stmtIds = op.getStmtIds()) {
args.stmtIds = *stmtIds;
}
args.preImageDoc = original.value();
- args.update = update_oplog_entry::makeDeltaOplogEntry(update.getU().getDiff());
args.criteria = update.getQ();
args.source = OperationSource::kTimeseriesInsert;
+
+ BSONObj updated;
+ bool indexesAffected = true;
+ if (update.getU().type() == write_ops::UpdateModification::Type::kDelta) {
+ auto result = doc_diff::applyDiff(original.value(),
+ update.getU().getDiff(),
+ &CollectionQueryInfo::get(*coll).getIndexKeys(opCtx),
+ static_cast<bool>(repl::tenantMigrationInfo(opCtx)));
+ updated = result.postImage;
+ indexesAffected = result.indexesAffected;
+ args.update = update_oplog_entry::makeDeltaOplogEntry(update.getU().getDiff());
+ } else if (update.getU().type() == write_ops::UpdateModification::Type::kTransform) {
+ const auto& transform = update.getU().getTransform();
+ auto transformed = transform(original.value());
+ tassert(7050400,
+ "Could not apply transformation to time series bucket document",
+ transformed.has_value());
+ updated = std::move(transformed.value());
+ args.update = update_oplog_entry::makeReplacementOplogEntry(updated);
+ } else {
+ invariant(false, "Unexpected update type");
+ }
+
if (slot) {
args.oplogSlots = {**slot};
fassert(5481600,
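
For context on the branch added above: the delta path replicates only the diff (via
update_oplog_entry::makeDeltaOplogEntry), while the transform path has no diff to ship,
so it replicates the entire post-image (via makeReplacementOplogEntry). Note also that
indexesAffected stays conservatively true on the transform path, since no index-key
analysis is performed there. The resulting oplog update payloads look roughly like this
(shapes are a sketch inferred from the helpers used above, not exact server output):

    // kDelta:     args.update == { "$v": 2, "diff": { ... } }        // delta entry
    // kTransform: args.update == <full transformed bucket document>  // replacement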
diff --git a/src/mongo/db/ops/write_ops_exec_test.cpp b/src/mongo/db/ops/write_ops_exec_test.cpp
new file mode 100644
index 00000000000..9c4caac330d
--- /dev/null
+++ b/src/mongo/db/ops/write_ops_exec_test.cpp
@@ -0,0 +1,112 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/bson/unordered_fields_bsonobj_comparator.h"
+#include "mongo/db/catalog/catalog_test_fixture.h"
+#include "mongo/db/catalog/collection_write_path.h"
+#include "mongo/db/catalog/create_collection.h"
+#include "mongo/db/catalog_raii.h"
+#include "mongo/db/ops/write_ops_exec.h"
+#include "mongo/db/timeseries/bucket_compression.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+class WriteOpsExecTest : public CatalogTestFixture {
+protected:
+ using CatalogTestFixture::setUp;
+};
+
+TEST_F(WriteOpsExecTest, PerformAtomicTimeseriesWritesWithTransform) {
+ NamespaceString ns{"db_write_ops_exec_test", "ts"};
+ auto opCtx = operationContext();
+ ASSERT_OK(createCollection(opCtx,
+ ns.dbName(),
+ BSON("create" << ns.coll() << "timeseries"
+ << BSON("timeField"
+ << "time"))));
+
+ // We're going to insert a compressed bucket and ensure we can successfully decompress it via a
+ // transform update using performAtomicTimeseriesWrites.
+ const BSONObj bucketDoc = ::mongo::fromjson(
+ R"({"_id":{"$oid":"629e1e680958e279dc29a517"},
+ "control":{"version":1,"min":{"time":{"$date":"2022-06-06T15:34:00.000Z"},"a":1,"b":1},
+ "max":{"time":{"$date":"2022-06-06T15:34:30.000Z"},"a":3,"b":3}},
+ "data":{"time":{"0":{"$date":"2022-06-06T15:34:30.000Z"},
+ "1":{"$date":"2022-06-06T15:34:30.000Z"},
+ "2":{"$date":"2022-06-06T15:34:30.000Z"}},
+ "a":{"0":1,"1":2,"2":3},
+ "b":{"0":1,"1":2,"2":3}}})");
+ OID bucketId = OID::createFromString("629e1e680958e279dc29a517"_sd);
+ auto compressionResult = timeseries::compressBucket(bucketDoc, "time", ns, true, false);
+ ASSERT_TRUE(compressionResult.compressedBucket.has_value());
+ const BSONObj compressedBucket = compressionResult.compressedBucket.value();
+
+ // Insert the compressed bucket.
+ AutoGetCollection bucketsColl(opCtx, ns.makeTimeseriesBucketsNamespace(), LockMode::MODE_IX);
+ {
+ WriteUnitOfWork wunit{opCtx};
+ ASSERT_OK(collection_internal::insertDocument(
+ opCtx, *bucketsColl, InsertStatement{compressedBucket}, nullptr));
+ wunit.commit();
+ }
+
+ // Decompress via transform.
+ {
+ auto bucketDecompressionFunc = [&](const BSONObj& bucketDoc) -> boost::optional<BSONObj> {
+ return timeseries::decompressBucket(bucketDoc);
+ };
+
+
+ write_ops::UpdateModification u(std::move(bucketDecompressionFunc));
+ write_ops::UpdateOpEntry update(BSON("_id" << bucketId), std::move(u));
+ write_ops::UpdateCommandRequest op(ns.makeTimeseriesBucketsNamespace(), {update});
+
+ write_ops::WriteCommandRequestBase base;
+ base.setBypassDocumentValidation(true);
+ base.setStmtIds(std::vector<StmtId>{kUninitializedStmtId});
+
+ op.setWriteCommandRequestBase(std::move(base));
+
+ ASSERT_OK(write_ops_exec::performAtomicTimeseriesWrites(opCtx, {}, {op}));
+ }
+
+ // Check the document is actually decompressed on disk.
+ {
+ auto recordId = record_id_helpers::keyForOID(bucketId);
+ auto retrievedBucket = bucketsColl->docFor(opCtx, recordId);
+
+ UnorderedFieldsBSONObjComparator comparator;
+ ASSERT_EQ(0, comparator.compare(retrievedBucket.value(), bucketDoc));
+ }
+}
+
+} // namespace
+} // namespace mongo
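
A note on the round trip being tested: timeseries::compressBucket produces a version-2
bucket (control.version becomes 2 and the data columns are replaced by compressed binary
columns), and decompressBucket restores the version-1 form, which is why the final
unordered-fields comparison against the original bucketDoc succeeds. Hypothetical extra
assertions that would make this explicit (not part of the patch; field shapes assumed
from the time-series bucket format):

    ASSERT_EQ(1, bucketDoc["control"]["version"].numberInt());
    ASSERT_EQ(2, compressedBucket["control"]["version"].numberInt());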