summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2022-09-15 15:47:31 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-09-15 22:33:44 +0000
commit7fca187deec6b2b28339bbb8613a9f8809ab07c8 (patch)
treec3977f1c171f763e1eae9387568238f235647623
parent55f4d89a5b15ed14d930c7b06cb39e299d886406 (diff)
downloadmongo-7fca187deec6b2b28339bbb8613a9f8809ab07c8.tar.gz
SERVER-68860 add TransactionOperations
-rw-r--r--src/mongo/db/transaction/SConscript2
-rw-r--r--src/mongo/db/transaction/transaction_operations.cpp116
-rw-r--r--src/mongo/db/transaction/transaction_operations.h123
-rw-r--r--src/mongo/db/transaction/transaction_operations_test.cpp175
4 files changed, 416 insertions, 0 deletions
diff --git a/src/mongo/db/transaction/SConscript b/src/mongo/db/transaction/SConscript
index e119e0616dd..1140cfaf546 100644
--- a/src/mongo/db/transaction/SConscript
+++ b/src/mongo/db/transaction/SConscript
@@ -16,6 +16,7 @@ env.Library(
'transaction_participant.cpp',
'transaction_participant_resource_yielder.cpp',
'internal_transactions_reap_service.idl',
+ 'transaction_operations.cpp',
'transaction_participant.idl',
'transactions_stats.idl',
],
@@ -81,6 +82,7 @@ env.CppUnitTest(
'internal_transactions_reap_service_test.cpp',
'transaction_api_test.cpp',
'transaction_history_iterator_test.cpp',
+ 'transaction_operations_test.cpp',
'transaction_participant_retryable_writes_test.cpp',
'transaction_participant_test.cpp',
],
diff --git a/src/mongo/db/transaction/transaction_operations.cpp b/src/mongo/db/transaction/transaction_operations.cpp
new file mode 100644
index 00000000000..4dd72641184
--- /dev/null
+++ b/src/mongo/db/transaction/transaction_operations.cpp
@@ -0,0 +1,116 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/transaction/transaction_operations.h"
+
+#include <fmt/format.h>
+
+namespace mongo {
+
+bool TransactionOperations::isEmpty() const {
+ return _transactionOperations.empty();
+}
+
+std::size_t TransactionOperations::numOperations() const {
+ return _transactionOperations.size();
+}
+
+std::size_t TransactionOperations::getTotalOperationBytes() const {
+ return _totalOperationBytes;
+}
+
+std::size_t TransactionOperations::getNumberOfPrePostImagesToWrite() const {
+ return _numberOfPrePostImagesToWrite;
+}
+
+void TransactionOperations::clear() {
+ _transactionOperations.clear();
+ _transactionStmtIds.clear();
+ _totalOperationBytes = 0;
+ _numberOfPrePostImagesToWrite = 0;
+}
+
+Status TransactionOperations::addOperation(const TransactionOperation& operation,
+ boost::optional<std::size_t> transactionSizeLimitBytes) {
+ auto stmtIdsToInsert = operation.getStatementIds();
+ auto newTransactionStmtIds = _transactionStmtIds;
+ for (auto stmtId : stmtIdsToInsert) {
+ auto [_, inserted] = newTransactionStmtIds.insert(stmtId);
+ if (inserted) {
+ continue;
+ }
+ return Status(static_cast<ErrorCodes::Error>(5875600),
+ fmt::format("Found two operations using the same stmtId of {}", stmtId));
+ }
+
+ auto opSize = repl::DurableOplogEntry::getDurableReplOperationSize(operation);
+
+ // The pre-image size is always added to the collection transaction size, but
+ // there are additional conditions for adding the pre-image to
+ // '_numberOfPrePostImagesToWrite'. See SERVER-59694.
+ std::size_t numberOfPrePostImagesToWrite = 0;
+ if (const auto& preImage = operation.getPreImage(); !preImage.isEmpty()) {
+ opSize += static_cast<std::size_t>(preImage.objsize());
+ if (operation.isPreImageRecordedForRetryableInternalTransaction()) {
+ numberOfPrePostImagesToWrite++;
+ }
+ }
+
+ // The post-image, if present, is always included in the size and pre/post image counters.
+ if (const auto& postImage = operation.getPostImage(); !postImage.isEmpty()) {
+ opSize += operation.getPostImage().objsize();
+ numberOfPrePostImagesToWrite++;
+ }
+
+ if (transactionSizeLimitBytes && (_totalOperationBytes + opSize) > *transactionSizeLimitBytes) {
+ return Status(ErrorCodes::TransactionTooLarge,
+ fmt::format("Total size of all transaction operations must be less than "
+ "server parameter 'transactionSizeLimitBytes' = {}",
+ *transactionSizeLimitBytes));
+ }
+
+ _transactionOperations.push_back(operation);
+ _transactionStmtIds = std::move(newTransactionStmtIds);
+ _totalOperationBytes += opSize;
+ _numberOfPrePostImagesToWrite += numberOfPrePostImagesToWrite;
+
+ return Status::OK();
+}
+
+std::vector<TransactionOperations::TransactionOperation>*
+TransactionOperations::getMutableOperationsForTransactionParticipant() {
+ return &_transactionOperations;
+}
+
+std::vector<TransactionOperations::TransactionOperation>
+TransactionOperations::getOperationsForTest() const {
+ return _transactionOperations;
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/transaction/transaction_operations.h b/src/mongo/db/transaction/transaction_operations.h
new file mode 100644
index 00000000000..c0e936b0827
--- /dev/null
+++ b/src/mongo/db/transaction/transaction_operations.h
@@ -0,0 +1,123 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+#include <cstddef>
+#include <vector>
+
+#include "mongo/base/status.h"
+#include "mongo/db/repl/oplog_entry.h" // for ReplOperation
+#include "mongo/stdx/unordered_set.h"
+
+namespace mongo {
+
+/**
+ * Container for ReplOperation used in multi-doc transactions and batched writer context.
+ * Includes statistics on operations held in this container.
+ * Provides methods for exporting ReplOperations in one or more applyOps oplog entries.
+ * Concurrency control for this class is maintained by the TransactionParticipant.
+ */
+class TransactionOperations {
+public:
+ using TransactionOperation = repl::ReplOperation;
+
+ TransactionOperations() = default;
+
+ /**
+ * Returns true if '_transactionsOperations' is empty.
+ */
+ bool isEmpty() const;
+
+ /**
+ * Returns number of items in '_transactionOperations'.
+ */
+ std::size_t numOperations() const;
+
+ /**
+ * Total size in bytes of all operations within the _transactionOperations vector.
+ * See DurableOplogEntry::getDurableReplOperationSize().
+ */
+ std::size_t getTotalOperationBytes() const;
+
+ /**
+ * Returns number of operations that have pre-images or post-images to be written to
+ * noop oplog entries or the image collection.
+ */
+ std::size_t getNumberOfPrePostImagesToWrite() const;
+
+ /**
+ * Clears the operations stored in this container along with corresponding statistics.
+ */
+ void clear();
+
+ /**
+ * Adds an operation to this container and updates relevant statistics.
+ *
+ * Ensures that statement ids in operation do not conflict with the operations
+ * already added.
+ *
+ * Ensures that total size of collected operations after adding operation does not
+ * exceed 'transactionSizeLimitBytes' (if provided).
+ */
+ Status addOperation(const TransactionOperation& operation,
+ boost::optional<std::size_t> transactionSizeLimitBytes = boost::none);
+
+ /**
+ * Returns pointer to vector of operations for integrating with
+ * TransactionParticipant and OpObserver interfaces for multi-doc transactions.
+ *
+ * Caller assumes responsibility for keeping contents referenced by the pointer
+ * in sync with statistics maintained in this container.
+ */
+ std::vector<TransactionOperation>* getMutableOperationsForTransactionParticipant();
+
+ /**
+ * Returns copy of operations for TransactionParticipant testing.
+ */
+ std::vector<TransactionOperation> getOperationsForTest() const;
+
+private:
+ std::vector<TransactionOperation> _transactionOperations;
+
+ // Holds stmtIds for operations which have been applied in the current multi-document
+ // transaction.
+ stdx::unordered_set<StmtId> _transactionStmtIds;
+
+ // Size of operations in _transactionOperations as calculated by
+ // DurableOplogEntry::getDurableReplOperationSize().
+ std::size_t _totalOperationBytes{0};
+
+ // Number of operations that have pre-images or post-images to be written to noop oplog
+ // entries or the image collection.
+ std::size_t _numberOfPrePostImagesToWrite{0};
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/transaction/transaction_operations_test.cpp b/src/mongo/db/transaction/transaction_operations_test.cpp
new file mode 100644
index 00000000000..93e8303bbce
--- /dev/null
+++ b/src/mongo/db/transaction/transaction_operations_test.cpp
@@ -0,0 +1,175 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/transaction/transaction_operations.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+TEST(TransactionOperationsTest, Basic) {
+ TransactionOperations ops;
+ ASSERT(ops.isEmpty());
+ ASSERT_EQ(ops.numOperations(), 0);
+ ASSERT_EQ(ops.getTotalOperationBytes(), 0);
+
+ TransactionOperations::TransactionOperation op;
+ auto opSize = repl::DurableOplogEntry::getDurableReplOperationSize(op);
+ ASSERT_GTE(opSize, 1U);
+ ASSERT_OK(ops.addOperation(op));
+ ASSERT_FALSE(ops.isEmpty());
+ ASSERT_EQ(ops.numOperations(), 1U);
+
+ // Empty pre-images and post-images do not count towards operation size.
+ ASSERT(op.getPreImage().isEmpty());
+ ASSERT(op.getPostImage().isEmpty());
+ ASSERT_EQ(ops.getTotalOperationBytes(), opSize);
+
+ // The getMutableOperationsForTransactionParticipant() method supports integration with
+ // existing TransactionParticipant usage and OpObserver interfaces.
+ auto* mutableOps = ops.getMutableOperationsForTransactionParticipant();
+ ASSERT_EQ(mutableOps->size(), ops.numOperations());
+ std::size_t mutableOpsTotalOperationBytes = 0;
+ for (const auto& mutableOp : *mutableOps) {
+ mutableOpsTotalOperationBytes +=
+ repl::DurableOplogEntry::getDurableReplOperationSize(mutableOp);
+ }
+ ASSERT_EQ(mutableOpsTotalOperationBytes, ops.getTotalOperationBytes());
+
+ // Use clear() to reset container state.
+ ops.clear();
+ ASSERT(ops.isEmpty());
+ ASSERT_EQ(ops.numOperations(), 0);
+ ASSERT_EQ(ops.getTotalOperationBytes(), 0);
+}
+
+TEST(TransactionOperationsTest, AddTransactionFailsOnDuplicateStatementIds) {
+ TransactionOperations::TransactionOperation op1;
+ std::vector<StmtId> stmtIds1 = {1, 2, 3};
+ op1.setStatementIds(stdx::variant<StmtId, std::vector<StmtId>>(stmtIds1));
+
+ TransactionOperations::TransactionOperation op2;
+ std::vector<StmtId> stmtIds2 = {3, 4, 5};
+ op2.setStatementIds(stdx::variant<StmtId, std::vector<StmtId>>(stmtIds2));
+
+ TransactionOperations ops;
+ ASSERT_OK(ops.addOperation(op1));
+ ASSERT_EQ(static_cast<ErrorCodes::Error>(5875600), ops.addOperation(op2));
+}
+
+TEST(TransactionOperationsTest, AddTransactionIncludesPreImageStatistics) {
+ TransactionOperations ops;
+
+ // The size of 'op1' is added to the total byte count but it does not have
+ // the additional criteria to be added to 'numberOfPrePostImages'.
+ // See SERVER-58694.
+ TransactionOperations::TransactionOperation op1;
+ op1.setPreImage(BSON("a" << 123));
+ ASSERT_OK(ops.addOperation(op1));
+ ASSERT_EQ(ops.getTotalOperationBytes(),
+ repl::DurableOplogEntry::getDurableReplOperationSize(op1) +
+ static_cast<std::size_t>(op1.getPreImage().objsize()));
+ ASSERT_EQ(ops.getNumberOfPrePostImagesToWrite(), 0);
+
+ // Set "pre-image for retryable writes" flag to include the pre-image in
+ // the pre/post image count.
+ TransactionOperations::TransactionOperation op3;
+ op3.setPreImage(BSON("c" << 123));
+ op3.setPreImageRecordedForRetryableInternalTransaction();
+ ASSERT_OK(ops.addOperation(op3));
+ ASSERT_EQ(ops.getTotalOperationBytes(),
+ repl::DurableOplogEntry::getDurableReplOperationSize(op1) +
+ static_cast<std::size_t>(op1.getPreImage().objsize()) +
+ repl::DurableOplogEntry::getDurableReplOperationSize(op3) +
+ static_cast<std::size_t>(op3.getPreImage().objsize()));
+ ASSERT_EQ(ops.getNumberOfPrePostImagesToWrite(), 1U);
+
+ // Pre/post image counter should be reset after clear().
+ ops.clear();
+ ASSERT_EQ(ops.getNumberOfPrePostImagesToWrite(), 0);
+}
+
+TEST(TransactionOperationsTest, AddTransactionIncludesPostImageStatistics) {
+ TransactionOperations ops;
+
+ TransactionOperations::TransactionOperation op1;
+ op1.setPostImage(BSON("a" << 123));
+ ASSERT_OK(ops.addOperation(op1));
+ ASSERT_EQ(ops.getTotalOperationBytes(),
+ repl::DurableOplogEntry::getDurableReplOperationSize(op1) +
+ static_cast<std::size_t>(op1.getPostImage().objsize()));
+ ASSERT_EQ(ops.getNumberOfPrePostImagesToWrite(), 1U);
+
+ // Pre/post image counter should be reset after clear().
+ ops.clear();
+ ASSERT_EQ(ops.getNumberOfPrePostImagesToWrite(), 0);
+}
+
+TEST(TransactionOperationsTest, AddTransactionIncludesPreAndPostImageStatistics) {
+ TransactionOperations ops;
+
+ TransactionOperations::TransactionOperation op1;
+ op1.setPreImage(BSON("a" << 123));
+ op1.setPreImageRecordedForRetryableInternalTransaction();
+ ASSERT_OK(ops.addOperation(op1));
+ ASSERT_EQ(ops.getTotalOperationBytes(),
+ repl::DurableOplogEntry::getDurableReplOperationSize(op1) +
+ static_cast<std::size_t>(op1.getPreImage().objsize()));
+ ASSERT_EQ(ops.getNumberOfPrePostImagesToWrite(), 1U);
+
+ TransactionOperations::TransactionOperation op2;
+ op2.setPostImage(BSON("b" << 123));
+ ASSERT_OK(ops.addOperation(op2));
+ ASSERT_EQ(ops.getTotalOperationBytes(),
+ repl::DurableOplogEntry::getDurableReplOperationSize(op1) +
+ static_cast<std::size_t>(op1.getPreImage().objsize()) +
+ repl::DurableOplogEntry::getDurableReplOperationSize(op2) +
+ static_cast<std::size_t>(op2.getPostImage().objsize()));
+ ASSERT_EQ(ops.getNumberOfPrePostImagesToWrite(), 2U);
+
+ // Pre/post image counter should be reset after clear().
+ ops.clear();
+ ASSERT_EQ(ops.getNumberOfPrePostImagesToWrite(), 0);
+}
+
+TEST(TransactionOperationsTest, AddTransactionEnforceTotalOperationSizeLimit) {
+ TransactionOperations::TransactionOperation op1;
+ auto opSize1 = repl::DurableOplogEntry::getDurableReplOperationSize(op1);
+
+ TransactionOperations::TransactionOperation op2;
+ auto opSize2 = repl::DurableOplogEntry::getDurableReplOperationSize(op2);
+
+ auto sizeLimit = opSize1 + opSize2 - 1;
+ TransactionOperations ops;
+ ASSERT_OK(ops.addOperation(op1, sizeLimit));
+ ASSERT_EQ(ErrorCodes::TransactionTooLarge, ops.addOperation(op2, sizeLimit));
+}
+
+} // namespace
+} // namespace mongo