From 7fca187deec6b2b28339bbb8613a9f8809ab07c8 Mon Sep 17 00:00:00 2001 From: Benety Goh Date: Thu, 15 Sep 2022 15:47:31 -0400 Subject: SERVER-68860 add TransactionOperations --- src/mongo/db/transaction/SConscript | 2 + .../db/transaction/transaction_operations.cpp | 116 ++++++++++++++ src/mongo/db/transaction/transaction_operations.h | 123 +++++++++++++++ .../db/transaction/transaction_operations_test.cpp | 175 +++++++++++++++++++++ 4 files changed, 416 insertions(+) create mode 100644 src/mongo/db/transaction/transaction_operations.cpp create mode 100644 src/mongo/db/transaction/transaction_operations.h create mode 100644 src/mongo/db/transaction/transaction_operations_test.cpp diff --git a/src/mongo/db/transaction/SConscript b/src/mongo/db/transaction/SConscript index e119e0616dd..1140cfaf546 100644 --- a/src/mongo/db/transaction/SConscript +++ b/src/mongo/db/transaction/SConscript @@ -16,6 +16,7 @@ env.Library( 'transaction_participant.cpp', 'transaction_participant_resource_yielder.cpp', 'internal_transactions_reap_service.idl', + 'transaction_operations.cpp', 'transaction_participant.idl', 'transactions_stats.idl', ], @@ -81,6 +82,7 @@ env.CppUnitTest( 'internal_transactions_reap_service_test.cpp', 'transaction_api_test.cpp', 'transaction_history_iterator_test.cpp', + 'transaction_operations_test.cpp', 'transaction_participant_retryable_writes_test.cpp', 'transaction_participant_test.cpp', ], diff --git a/src/mongo/db/transaction/transaction_operations.cpp b/src/mongo/db/transaction/transaction_operations.cpp new file mode 100644 index 00000000000..4dd72641184 --- /dev/null +++ b/src/mongo/db/transaction/transaction_operations.cpp @@ -0,0 +1,116 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/transaction/transaction_operations.h" + +#include + +namespace mongo { + +bool TransactionOperations::isEmpty() const { + return _transactionOperations.empty(); +} + +std::size_t TransactionOperations::numOperations() const { + return _transactionOperations.size(); +} + +std::size_t TransactionOperations::getTotalOperationBytes() const { + return _totalOperationBytes; +} + +std::size_t TransactionOperations::getNumberOfPrePostImagesToWrite() const { + return _numberOfPrePostImagesToWrite; +} + +void TransactionOperations::clear() { + _transactionOperations.clear(); + _transactionStmtIds.clear(); + _totalOperationBytes = 0; + _numberOfPrePostImagesToWrite = 0; +} + +Status TransactionOperations::addOperation(const TransactionOperation& operation, + boost::optional transactionSizeLimitBytes) { + auto stmtIdsToInsert = operation.getStatementIds(); + auto newTransactionStmtIds = _transactionStmtIds; + for (auto stmtId : stmtIdsToInsert) { + auto [_, inserted] = newTransactionStmtIds.insert(stmtId); + if (inserted) { + continue; + } + return Status(static_cast(5875600), + fmt::format("Found two operations using the same stmtId of {}", stmtId)); + } + + auto opSize = repl::DurableOplogEntry::getDurableReplOperationSize(operation); + + // The pre-image size is always added to the collection transaction size, but + // there are additional conditions for adding the pre-image to + // '_numberOfPrePostImagesToWrite'. See SERVER-59694. + std::size_t numberOfPrePostImagesToWrite = 0; + if (const auto& preImage = operation.getPreImage(); !preImage.isEmpty()) { + opSize += static_cast(preImage.objsize()); + if (operation.isPreImageRecordedForRetryableInternalTransaction()) { + numberOfPrePostImagesToWrite++; + } + } + + // The post-image, if present, is always included in the size and pre/post image counters. + if (const auto& postImage = operation.getPostImage(); !postImage.isEmpty()) { + opSize += operation.getPostImage().objsize(); + numberOfPrePostImagesToWrite++; + } + + if (transactionSizeLimitBytes && (_totalOperationBytes + opSize) > *transactionSizeLimitBytes) { + return Status(ErrorCodes::TransactionTooLarge, + fmt::format("Total size of all transaction operations must be less than " + "server parameter 'transactionSizeLimitBytes' = {}", + *transactionSizeLimitBytes)); + } + + _transactionOperations.push_back(operation); + _transactionStmtIds = std::move(newTransactionStmtIds); + _totalOperationBytes += opSize; + _numberOfPrePostImagesToWrite += numberOfPrePostImagesToWrite; + + return Status::OK(); +} + +std::vector* +TransactionOperations::getMutableOperationsForTransactionParticipant() { + return &_transactionOperations; +} + +std::vector +TransactionOperations::getOperationsForTest() const { + return _transactionOperations; +} + +} // namespace mongo diff --git a/src/mongo/db/transaction/transaction_operations.h b/src/mongo/db/transaction/transaction_operations.h new file mode 100644 index 00000000000..c0e936b0827 --- /dev/null +++ b/src/mongo/db/transaction/transaction_operations.h @@ -0,0 +1,123 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include +#include +#include + +#include "mongo/base/status.h" +#include "mongo/db/repl/oplog_entry.h" // for ReplOperation +#include "mongo/stdx/unordered_set.h" + +namespace mongo { + +/** + * Container for ReplOperation used in multi-doc transactions and batched writer context. + * Includes statistics on operations held in this container. + * Provides methods for exporting ReplOperations in one or more applyOps oplog entries. + * Concurrency control for this class is maintained by the TransactionParticipant. + */ +class TransactionOperations { +public: + using TransactionOperation = repl::ReplOperation; + + TransactionOperations() = default; + + /** + * Returns true if '_transactionsOperations' is empty. + */ + bool isEmpty() const; + + /** + * Returns number of items in '_transactionOperations'. + */ + std::size_t numOperations() const; + + /** + * Total size in bytes of all operations within the _transactionOperations vector. + * See DurableOplogEntry::getDurableReplOperationSize(). + */ + std::size_t getTotalOperationBytes() const; + + /** + * Returns number of operations that have pre-images or post-images to be written to + * noop oplog entries or the image collection. + */ + std::size_t getNumberOfPrePostImagesToWrite() const; + + /** + * Clears the operations stored in this container along with corresponding statistics. + */ + void clear(); + + /** + * Adds an operation to this container and updates relevant statistics. + * + * Ensures that statement ids in operation do not conflict with the operations + * already added. + * + * Ensures that total size of collected operations after adding operation does not + * exceed 'transactionSizeLimitBytes' (if provided). + */ + Status addOperation(const TransactionOperation& operation, + boost::optional transactionSizeLimitBytes = boost::none); + + /** + * Returns pointer to vector of operations for integrating with + * TransactionParticipant and OpObserver interfaces for multi-doc transactions. + * + * Caller assumes responsibility for keeping contents referenced by the pointer + * in sync with statistics maintained in this container. + */ + std::vector* getMutableOperationsForTransactionParticipant(); + + /** + * Returns copy of operations for TransactionParticipant testing. + */ + std::vector getOperationsForTest() const; + +private: + std::vector _transactionOperations; + + // Holds stmtIds for operations which have been applied in the current multi-document + // transaction. + stdx::unordered_set _transactionStmtIds; + + // Size of operations in _transactionOperations as calculated by + // DurableOplogEntry::getDurableReplOperationSize(). + std::size_t _totalOperationBytes{0}; + + // Number of operations that have pre-images or post-images to be written to noop oplog + // entries or the image collection. + std::size_t _numberOfPrePostImagesToWrite{0}; +}; + +} // namespace mongo diff --git a/src/mongo/db/transaction/transaction_operations_test.cpp b/src/mongo/db/transaction/transaction_operations_test.cpp new file mode 100644 index 00000000000..93e8303bbce --- /dev/null +++ b/src/mongo/db/transaction/transaction_operations_test.cpp @@ -0,0 +1,175 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/transaction/transaction_operations.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +TEST(TransactionOperationsTest, Basic) { + TransactionOperations ops; + ASSERT(ops.isEmpty()); + ASSERT_EQ(ops.numOperations(), 0); + ASSERT_EQ(ops.getTotalOperationBytes(), 0); + + TransactionOperations::TransactionOperation op; + auto opSize = repl::DurableOplogEntry::getDurableReplOperationSize(op); + ASSERT_GTE(opSize, 1U); + ASSERT_OK(ops.addOperation(op)); + ASSERT_FALSE(ops.isEmpty()); + ASSERT_EQ(ops.numOperations(), 1U); + + // Empty pre-images and post-images do not count towards operation size. + ASSERT(op.getPreImage().isEmpty()); + ASSERT(op.getPostImage().isEmpty()); + ASSERT_EQ(ops.getTotalOperationBytes(), opSize); + + // The getMutableOperationsForTransactionParticipant() method supports integration with + // existing TransactionParticipant usage and OpObserver interfaces. + auto* mutableOps = ops.getMutableOperationsForTransactionParticipant(); + ASSERT_EQ(mutableOps->size(), ops.numOperations()); + std::size_t mutableOpsTotalOperationBytes = 0; + for (const auto& mutableOp : *mutableOps) { + mutableOpsTotalOperationBytes += + repl::DurableOplogEntry::getDurableReplOperationSize(mutableOp); + } + ASSERT_EQ(mutableOpsTotalOperationBytes, ops.getTotalOperationBytes()); + + // Use clear() to reset container state. + ops.clear(); + ASSERT(ops.isEmpty()); + ASSERT_EQ(ops.numOperations(), 0); + ASSERT_EQ(ops.getTotalOperationBytes(), 0); +} + +TEST(TransactionOperationsTest, AddTransactionFailsOnDuplicateStatementIds) { + TransactionOperations::TransactionOperation op1; + std::vector stmtIds1 = {1, 2, 3}; + op1.setStatementIds(stdx::variant>(stmtIds1)); + + TransactionOperations::TransactionOperation op2; + std::vector stmtIds2 = {3, 4, 5}; + op2.setStatementIds(stdx::variant>(stmtIds2)); + + TransactionOperations ops; + ASSERT_OK(ops.addOperation(op1)); + ASSERT_EQ(static_cast(5875600), ops.addOperation(op2)); +} + +TEST(TransactionOperationsTest, AddTransactionIncludesPreImageStatistics) { + TransactionOperations ops; + + // The size of 'op1' is added to the total byte count but it does not have + // the additional criteria to be added to 'numberOfPrePostImages'. + // See SERVER-58694. + TransactionOperations::TransactionOperation op1; + op1.setPreImage(BSON("a" << 123)); + ASSERT_OK(ops.addOperation(op1)); + ASSERT_EQ(ops.getTotalOperationBytes(), + repl::DurableOplogEntry::getDurableReplOperationSize(op1) + + static_cast(op1.getPreImage().objsize())); + ASSERT_EQ(ops.getNumberOfPrePostImagesToWrite(), 0); + + // Set "pre-image for retryable writes" flag to include the pre-image in + // the pre/post image count. + TransactionOperations::TransactionOperation op3; + op3.setPreImage(BSON("c" << 123)); + op3.setPreImageRecordedForRetryableInternalTransaction(); + ASSERT_OK(ops.addOperation(op3)); + ASSERT_EQ(ops.getTotalOperationBytes(), + repl::DurableOplogEntry::getDurableReplOperationSize(op1) + + static_cast(op1.getPreImage().objsize()) + + repl::DurableOplogEntry::getDurableReplOperationSize(op3) + + static_cast(op3.getPreImage().objsize())); + ASSERT_EQ(ops.getNumberOfPrePostImagesToWrite(), 1U); + + // Pre/post image counter should be reset after clear(). + ops.clear(); + ASSERT_EQ(ops.getNumberOfPrePostImagesToWrite(), 0); +} + +TEST(TransactionOperationsTest, AddTransactionIncludesPostImageStatistics) { + TransactionOperations ops; + + TransactionOperations::TransactionOperation op1; + op1.setPostImage(BSON("a" << 123)); + ASSERT_OK(ops.addOperation(op1)); + ASSERT_EQ(ops.getTotalOperationBytes(), + repl::DurableOplogEntry::getDurableReplOperationSize(op1) + + static_cast(op1.getPostImage().objsize())); + ASSERT_EQ(ops.getNumberOfPrePostImagesToWrite(), 1U); + + // Pre/post image counter should be reset after clear(). + ops.clear(); + ASSERT_EQ(ops.getNumberOfPrePostImagesToWrite(), 0); +} + +TEST(TransactionOperationsTest, AddTransactionIncludesPreAndPostImageStatistics) { + TransactionOperations ops; + + TransactionOperations::TransactionOperation op1; + op1.setPreImage(BSON("a" << 123)); + op1.setPreImageRecordedForRetryableInternalTransaction(); + ASSERT_OK(ops.addOperation(op1)); + ASSERT_EQ(ops.getTotalOperationBytes(), + repl::DurableOplogEntry::getDurableReplOperationSize(op1) + + static_cast(op1.getPreImage().objsize())); + ASSERT_EQ(ops.getNumberOfPrePostImagesToWrite(), 1U); + + TransactionOperations::TransactionOperation op2; + op2.setPostImage(BSON("b" << 123)); + ASSERT_OK(ops.addOperation(op2)); + ASSERT_EQ(ops.getTotalOperationBytes(), + repl::DurableOplogEntry::getDurableReplOperationSize(op1) + + static_cast(op1.getPreImage().objsize()) + + repl::DurableOplogEntry::getDurableReplOperationSize(op2) + + static_cast(op2.getPostImage().objsize())); + ASSERT_EQ(ops.getNumberOfPrePostImagesToWrite(), 2U); + + // Pre/post image counter should be reset after clear(). + ops.clear(); + ASSERT_EQ(ops.getNumberOfPrePostImagesToWrite(), 0); +} + +TEST(TransactionOperationsTest, AddTransactionEnforceTotalOperationSizeLimit) { + TransactionOperations::TransactionOperation op1; + auto opSize1 = repl::DurableOplogEntry::getDurableReplOperationSize(op1); + + TransactionOperations::TransactionOperation op2; + auto opSize2 = repl::DurableOplogEntry::getDurableReplOperationSize(op2); + + auto sizeLimit = opSize1 + opSize2 - 1; + TransactionOperations ops; + ASSERT_OK(ops.addOperation(op1, sizeLimit)); + ASSERT_EQ(ErrorCodes::TransactionTooLarge, ops.addOperation(op2, sizeLimit)); +} + +} // namespace +} // namespace mongo -- cgit v1.2.1