summaryrefslogtreecommitdiff
path: root/src/mongo/db/transaction
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2022-10-13 10:50:58 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-10-13 15:49:09 +0000
commitaaadd6214e5abfe206fcab752fc5ba000909a191 (patch)
tree4390d096785c9d715fd5a09bff77edc5c579a70e /src/mongo/db/transaction
parent7993e99666a66429457ebaf7e1326ffdcbe40f64 (diff)
downloadmongo-aaadd6214e5abfe206fcab752fc5ba000909a191.tar.gz
SERVER-69749 move getApplyOpsOplogSlotAndOperationAssignmentForTransaction() from OpObserverImpl to TransactionOperations
Diffstat (limited to 'src/mongo/db/transaction')
-rw-r--r--src/mongo/db/transaction/transaction_operations.cpp103
-rw-r--r--src/mongo/db/transaction/transaction_operations.h35
-rw-r--r--src/mongo/db/transaction/transaction_operations_test.cpp224
3 files changed, 362 insertions, 0 deletions
diff --git a/src/mongo/db/transaction/transaction_operations.cpp b/src/mongo/db/transaction/transaction_operations.cpp
index dd0a5544c71..5624c306a03 100644
--- a/src/mongo/db/transaction/transaction_operations.cpp
+++ b/src/mongo/db/transaction/transaction_operations.cpp
@@ -29,9 +29,66 @@
#include "mongo/db/transaction/transaction_operations.h"
+#include <algorithm>
#include <fmt/format.h>
namespace mongo {
+namespace {
+
+/**
+ * Returns operations that can fit into an "applyOps" entry. The returned operations are
+ * serialized to BSON. The operations are given by range ['operationsBegin',
+ * 'operationsEnd').
+ * Multi-document transactions follow the following constraints for fitting the operations: (1) the
+ * resulting "applyOps" entry shouldn't exceed the 16MB limit, unless only one operation is
+ * allocated to it; (2) the number of operations is not larger than the maximum number of
+ * transaction statements allowed in one entry as defined by
+ * 'gMaxNumberOfTransactionOperationsInSingleOplogEntry'. Batched writes (WUOWs that pack writes
+ * into a single applyOps outside of a multi-doc transaction) are exempt from the constraints above.
+ * If the operations cannot be packed into a single applyOps that's within the BSON size limit
+ * (16MB), the batched write will fail with TransactionTooLarge.
+ */
+std::vector<BSONObj> packOperationsIntoApplyOps(
+ std::vector<repl::ReplOperation>::const_iterator operationsBegin,
+ std::vector<repl::ReplOperation>::const_iterator operationsEnd,
+ boost::optional<std::size_t> oplogEntryCountLimit,
+ boost::optional<std::size_t> oplogEntrySizeLimitBytes) {
+ std::vector<BSONObj> operations;
+ std::size_t totalOperationsSize{0};
+ for (auto operationIter = operationsBegin; operationIter != operationsEnd; ++operationIter) {
+ const auto& operation = *operationIter;
+
+ if (oplogEntryCountLimit) {
+ if (operations.size() == *oplogEntryCountLimit) {
+ break;
+ }
+ }
+ if (oplogEntrySizeLimitBytes) {
+ if ((operations.size() > 0 &&
+ (totalOperationsSize +
+ repl::DurableOplogEntry::getDurableReplOperationSize(operation) >
+ *oplogEntrySizeLimitBytes))) {
+ break;
+ }
+ }
+ // If neither 'oplogEntryCountLimit' nor 'oplogEntrySizeLimitBytes' is provided,
+ // this is a batched write, so we don't break the batch into multiple applyOps. It is the
+ // responsibility of the caller to generate a batch that fits within a single applyOps.
+ // If the batch doesn't fit within an applyOps, we throw a TransactionTooLarge later
+ // on when serializing to BSON.
+ auto serializedOperation = operation.toBSON();
+ totalOperationsSize += static_cast<std::size_t>(serializedOperation.objsize());
+
+ // Add BSON array element overhead since operations will ultimately be packed into BSON
+ // array.
+ totalOperationsSize += TransactionOperations::ApplyOpsInfo::kBSONArrayElementOverhead;
+
+ operations.emplace_back(std::move(serializedOperation));
+ }
+ return operations;
+}
+
+} // namespace
bool TransactionOperations::isEmpty() const {
return _transactionOperations.empty();
@@ -124,6 +181,52 @@ TransactionOperations::CollectionUUIDs TransactionOperations::getCollectionUUIDs
return uuids;
}
+TransactionOperations::ApplyOpsInfo TransactionOperations::getApplyOpsInfo(
+ const std::vector<OplogSlot>& oplogSlots,
+ bool prepare,
+ boost::optional<std::size_t> oplogEntryCountLimit,
+ boost::optional<std::size_t> oplogEntrySizeLimitBytes) const {
+ const auto& operations = _transactionOperations;
+ if (operations.empty()) {
+ return {{}, /*numberOfOplogSlotsUsed=*/0};
+ }
+ tassert(6278504, "Insufficient number of oplogSlots", operations.size() <= oplogSlots.size());
+
+ std::vector<ApplyOpsInfo::ApplyOpsEntry> applyOpsEntries;
+ auto oplogSlotIter = oplogSlots.begin();
+ auto getNextOplogSlot = [&]() {
+ tassert(6278505, "Unexpected end of oplog slot vector", oplogSlotIter != oplogSlots.end());
+ return *oplogSlotIter++;
+ };
+
+ auto hasNeedsRetryImage = [](const repl::ReplOperation& operation) {
+ return static_cast<bool>(operation.getNeedsRetryImage());
+ };
+
+ // Assign operations to "applyOps" entries.
+ for (auto operationIt = operations.begin(); operationIt != operations.end();) {
+ auto applyOpsOperations = packOperationsIntoApplyOps(
+ operationIt, operations.end(), oplogEntryCountLimit, oplogEntrySizeLimitBytes);
+ const auto opCountWithNeedsRetryImage =
+ std::count_if(operationIt, operationIt + applyOpsOperations.size(), hasNeedsRetryImage);
+ if (opCountWithNeedsRetryImage > 0) {
+ // Reserve a slot for a forged no-op entry.
+ getNextOplogSlot();
+ }
+ operationIt += applyOpsOperations.size();
+ applyOpsEntries.emplace_back(
+ ApplyOpsInfo::ApplyOpsEntry{getNextOplogSlot(), std::move(applyOpsOperations)});
+ }
+
+ // In the special case of writing the implicit 'prepare' oplog entry, we use the last reserved
+ // oplog slot. This may mean we skipped over some reserved slots, but there's no harm in that.
+ if (prepare) {
+ applyOpsEntries.back().oplogSlot = oplogSlots.back();
+ }
+ return {std::move(applyOpsEntries),
+ static_cast<std::size_t>(oplogSlotIter - oplogSlots.begin())};
+}
+
std::vector<TransactionOperations::TransactionOperation>*
TransactionOperations::getMutableOperationsForOpObserver() {
return &_transactionOperations;
diff --git a/src/mongo/db/transaction/transaction_operations.h b/src/mongo/db/transaction/transaction_operations.h
index 13a0ea664f0..2ac9a069e78 100644
--- a/src/mongo/db/transaction/transaction_operations.h
+++ b/src/mongo/db/transaction/transaction_operations.h
@@ -34,6 +34,7 @@
#include <vector>
#include "mongo/base/status.h"
+#include "mongo/db/repl/oplog.h" // for OplogSlot
#include "mongo/db/repl/oplog_entry.h" // for ReplOperation
#include "mongo/stdx/unordered_set.h"
#include "mongo/util/uuid.h"
@@ -51,6 +52,28 @@ public:
using TransactionOperation = repl::ReplOperation;
using CollectionUUIDs = stdx::unordered_set<UUID, UUID::Hash>;
+ /**
+ * Contains "applyOps" oplog entries for a transaction. "applyOps" entries are not actual
+ * "applyOps" entries to be written to the oplog, but comprise certain parts of those entries -
+ * BSON serialized operations, and the assigned oplog slot. The operations in field
+ * 'ApplyOpsEntry::operations' should be considered opaque outside the OpObserver.
+ */
+ struct ApplyOpsInfo {
+ // Conservative BSON array element overhead assuming maximum 6 digit array index.
+ static constexpr std::size_t kBSONArrayElementOverhead = 8U;
+
+ struct ApplyOpsEntry {
+ OplogSlot oplogSlot;
+ std::vector<BSONObj> operations;
+ };
+
+ // Representation of "applyOps" oplog entries.
+ std::vector<ApplyOpsEntry> applyOpsEntries;
+
+ // Number of oplog slots utilized.
+ std::size_t numberOfOplogSlotsUsed;
+ };
+
TransactionOperations() = default;
/**
@@ -102,6 +125,18 @@ public:
CollectionUUIDs getCollectionUUIDs() const;
/**
+ * Returns oplog slots to be used for "applyOps" oplog entries, BSON serialized operations,
+ * their assignments to "applyOps" entries, and oplog slots to be used for writing pre- and
+ * post- image oplog entries for the transaction consisting of 'operations'. Allocates oplog
+ * slots from 'oplogSlots'. The 'prepare' indicates if the function is called when preparing a
+ * transaction.
+ */
+ ApplyOpsInfo getApplyOpsInfo(const std::vector<OplogSlot>& oplogSlots,
+ bool prepare,
+ boost::optional<std::size_t> oplogEntryCountLimit,
+ boost::optional<std::size_t> oplogEntrySizeLimitBytes) const;
+
+ /**
* Returns pointer to vector of operations for integrating with
* BatchedWriteContext, TransactionParticipant, and OpObserver interfaces
* for multi-doc transactions.
diff --git a/src/mongo/db/transaction/transaction_operations_test.cpp b/src/mongo/db/transaction/transaction_operations_test.cpp
index b386cbe2e77..c3c84e2c5d7 100644
--- a/src/mongo/db/transaction/transaction_operations_test.cpp
+++ b/src/mongo/db/transaction/transaction_operations_test.cpp
@@ -28,6 +28,7 @@
*/
#include "mongo/db/transaction/transaction_operations.h"
+#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -204,5 +205,228 @@ TEST(TransactionOperationsTest, GetCollectionUUIDsIgnoresNoopOperations) {
ASSERT(uuids.count(*op2.getUuid()));
}
+TEST(TransactionOperationsTest, GetApplyOpsInfoEmptyOps) {
+ TransactionOperations ops;
+ auto info = ops.getApplyOpsInfo(/*oplogSlots=*/{},
+ /*prepare=*/false,
+ /*oplogEntryCountLimit=*/boost::none,
+ /*oplogEntrySizeLimitBytes=*/boost::none);
+ ASSERT_EQ(info.applyOpsEntries.size(), 0);
+ ASSERT_EQ(info.numberOfOplogSlotsUsed, 0);
+}
+
+DEATH_TEST(TransactionOperationsTest,
+ GetApplyOpsInfoInsufficientSlots,
+ "Insufficient number of oplogSlots") {
+ TransactionOperations ops;
+ TransactionOperations::TransactionOperation op;
+ ASSERT_OK(ops.addOperation(op));
+ ops.getApplyOpsInfo(/*oplogSlots=*/{},
+ /*prepare=*/false,
+ /*oplogEntryCountLimit=*/boost::none,
+ /*oplogEntrySizeLimitBytes=*/boost::none);
+}
+
+TEST(TransactionOperationsTest, GetApplyOpsInfoReturnsOneEntryContainingTwoOperations) {
+ TransactionOperations ops;
+
+ TransactionOperations::TransactionOperation op1;
+ op1.setOpType(repl::OpTypeEnum::kInsert); // required for DurableReplOperation::serialize()
+ op1.setNss(NamespaceString{"test.t"}); // required for DurableReplOperation::serialize()
+ op1.setObject(BSON("_id" << 1)); // required for DurableReplOperation::serialize()
+ ASSERT_OK(ops.addOperation(op1));
+
+ TransactionOperations::TransactionOperation op2;
+ op2.setOpType(repl::OpTypeEnum::kInsert); // required for DurableReplOperation::serialize()
+ op2.setNss(NamespaceString{"test.t"}); // required for DurableReplOperation::serialize()
+ op2.setObject(BSON("_id" << 2)); // required for DurableReplOperation::serialize()
+ ASSERT_OK(ops.addOperation(op2));
+
+ // We have to allocate as many oplog slots as operations even though only
+ // one applyOps entry will be generated.
+ std::vector<OplogSlot> oplogSlots;
+ oplogSlots.push_back(OplogSlot{Timestamp(1, 0), /*term=*/1LL});
+ oplogSlots.push_back(OplogSlot{Timestamp(2, 0), /*term=*/1LL});
+
+ auto info = ops.getApplyOpsInfo(oplogSlots,
+ /*prepare=*/false,
+ /*oplogEntryCountLimit=*/boost::none,
+ /*oplogEntrySizeLimitBytes=*/boost::none);
+
+ ASSERT_EQ(info.numberOfOplogSlotsUsed, 1U);
+ ASSERT_EQ(info.applyOpsEntries.size(), 1U);
+ ASSERT_EQ(info.applyOpsEntries[0].oplogSlot, oplogSlots[0]); // first oplog slot
+ ASSERT_EQ(info.applyOpsEntries[0].operations.size(), 2U);
+ ASSERT_BSONOBJ_EQ(info.applyOpsEntries[0].operations[0], op1.toBSON());
+ ASSERT_BSONOBJ_EQ(info.applyOpsEntries[0].operations[1], op2.toBSON());
+}
+
+TEST(TransactionOperationsTest, GetApplyOpsInfoRespectsOperationCountLimit) {
+ TransactionOperations ops;
+
+ TransactionOperations::TransactionOperation op1;
+ op1.setOpType(repl::OpTypeEnum::kInsert); // required for DurableReplOperation::serialize()
+ op1.setNss(NamespaceString{"test.t"}); // required for DurableReplOperation::serialize()
+ op1.setObject(BSON("_id" << 1)); // required for DurableReplOperation::serialize()
+ ASSERT_OK(ops.addOperation(op1));
+
+ TransactionOperations::TransactionOperation op2;
+ op2.setOpType(repl::OpTypeEnum::kInsert); // required for DurableReplOperation::serialize()
+ op2.setNss(NamespaceString{"test.t"}); // required for DurableReplOperation::serialize()
+ op2.setObject(BSON("_id" << 2)); // required for DurableReplOperation::serialize()
+ ASSERT_OK(ops.addOperation(op2));
+
+ // We have to allocate as many oplog slots as operations even though only
+ // one applyOps entry will be generated.
+ std::vector<OplogSlot> oplogSlots;
+ oplogSlots.push_back(OplogSlot{Timestamp(1, 0), /*term=*/1LL});
+ oplogSlots.push_back(OplogSlot{Timestamp(2, 0), /*term=*/1LL});
+
+ // Restrict each applyOps entry to holding at most one operation.
+ auto info = ops.getApplyOpsInfo(
+ oplogSlots,
+ /*prepare=*/false,
+ /*oplogEntryCountLimit=*/1U,
+ /*oplogEntrySizeLimitBytes=*/static_cast<std::size_t>(BSONObjMaxUserSize));
+
+ ASSERT_EQ(info.numberOfOplogSlotsUsed, 2U);
+ ASSERT_EQ(info.applyOpsEntries.size(), 2U);
+
+ // Check first applyOps entry.
+ ASSERT_EQ(info.applyOpsEntries[0].oplogSlot, oplogSlots[0]);
+ ASSERT_EQ(info.applyOpsEntries[0].operations.size(), 1U);
+ ASSERT_BSONOBJ_EQ(info.applyOpsEntries[0].operations[0], op1.toBSON());
+
+ // Check second applyOps entry.
+ ASSERT_EQ(info.applyOpsEntries[1].oplogSlot, oplogSlots[1]);
+ ASSERT_EQ(info.applyOpsEntries[1].operations.size(), 1U);
+ ASSERT_BSONOBJ_EQ(info.applyOpsEntries[1].operations[0], op2.toBSON());
+}
+
+TEST(TransactionOperationsTest, GetApplyOpsInfoRespectsOperationSizeLimit) {
+ TransactionOperations ops;
+
+ TransactionOperations::TransactionOperation op1;
+ op1.setOpType(repl::OpTypeEnum::kInsert); // required for DurableReplOperation::serialize()
+ op1.setNss(NamespaceString{"test.t"}); // required for DurableReplOperation::serialize()
+ op1.setObject(BSON("_id" << 1)); // required for DurableReplOperation::serialize()
+ ASSERT_OK(ops.addOperation(op1));
+
+ TransactionOperations::TransactionOperation op2;
+ op2.setOpType(repl::OpTypeEnum::kInsert); // required for DurableReplOperation::serialize()
+ op2.setNss(NamespaceString{"test.t"}); // required for DurableReplOperation::serialize()
+ op2.setObject(BSON("_id" << 2)); // required for DurableReplOperation::serialize()
+ ASSERT_OK(ops.addOperation(op2));
+
+ // We have to allocate as many oplog slots as operations even though only
+ // one applyOps entry will be generated.
+ std::vector<OplogSlot> oplogSlots;
+ oplogSlots.push_back(OplogSlot{Timestamp(1, 0), /*term=*/1LL});
+ oplogSlots.push_back(OplogSlot{Timestamp(2, 0), /*term=*/1LL});
+
+ // Restrict each applyOps entry to holding at most one operation.
+ auto info = ops.getApplyOpsInfo(
+ oplogSlots,
+ /*prepare=*/false,
+ /*oplogEntryCountLimit=*/100U,
+ /*oplogEntrySizeLimitBytes=*/repl::DurableOplogEntry::getDurableReplOperationSize(op1) +
+ TransactionOperations::ApplyOpsInfo::kBSONArrayElementOverhead);
+
+ ASSERT_EQ(info.numberOfOplogSlotsUsed, 2U);
+ ASSERT_EQ(info.applyOpsEntries.size(), 2U);
+
+ // Check first applyOps entry.
+ ASSERT_EQ(info.applyOpsEntries[0].oplogSlot, oplogSlots[0]);
+ ASSERT_EQ(info.applyOpsEntries[0].operations.size(), 1U);
+ ASSERT_BSONOBJ_EQ(info.applyOpsEntries[0].operations[0], op1.toBSON());
+
+ // Check second applyOps entry.
+ ASSERT_EQ(info.applyOpsEntries[1].oplogSlot, oplogSlots[1]);
+ ASSERT_EQ(info.applyOpsEntries[1].operations.size(), 1U);
+ ASSERT_BSONOBJ_EQ(info.applyOpsEntries[1].operations[0], op2.toBSON());
+}
+
+DEATH_TEST(TransactionOperationsTest,
+ GetApplyOpsInfoInsufficientSlotsDueToPreImage,
+ "Unexpected end of oplog slot vector") {
+ TransactionOperations ops;
+
+ // Setting the "needs retry image" flag on 'op' forces getApplyOpsInfo()
+ // to request an additional slot, which will not be available due to an
+ // insufficiently sized 'oplogSlots' array.
+ TransactionOperations::TransactionOperation op;
+ op.setNeedsRetryImage(repl::RetryImageEnum::kPreImage);
+ op.setOpType(repl::OpTypeEnum::kInsert); // required for DurableReplOperation::serialize()
+ op.setNss(NamespaceString{"test.t"}); // required for DurableReplOperation::serialize()
+ op.setObject(BSON("_id" << 1)); // required for DurableReplOperation::serialize()
+ ASSERT_OK(ops.addOperation(op));
+
+ // We allocated a slot for the operation but not for the pre-image.
+ std::vector<OplogSlot> oplogSlots;
+ oplogSlots.push_back(OplogSlot{Timestamp(1, 0), /*term=*/1LL});
+
+ ops.getApplyOpsInfo(oplogSlots,
+ /*prepare=*/false,
+ /*oplogEntryCountLimit=*/boost::none,
+ /*oplogEntrySizeLimitBytes=*/boost::none);
+}
+
+TEST(TransactionOperationsTest, GetApplyOpsInfoAssignsPreImageSlotBeforeOperation) {
+ TransactionOperations ops;
+
+ // Setting the "needs retry image" flag on 'op' forces getApplyOpsInfo()
+ // to request an additional slot, which will not be available due to an
+ // insufficiently sized 'oplogSlots' array.
+ TransactionOperations::TransactionOperation op;
+ op.setNeedsRetryImage(repl::RetryImageEnum::kPreImage);
+ op.setOpType(repl::OpTypeEnum::kInsert); // required for DurableReplOperation::serialize()
+ op.setNss(NamespaceString{"test.t"}); // required for DurableReplOperation::serialize()
+ op.setObject(BSON("_id" << 1)); // required for DurableReplOperation::serialize()
+ ASSERT_OK(ops.addOperation(op));
+
+ // We allocated a slot for the operation but not for the pre-image.
+ std::vector<OplogSlot> oplogSlots;
+ oplogSlots.push_back(OplogSlot{Timestamp(1, 0), /*term=*/1LL});
+ oplogSlots.push_back(OplogSlot{Timestamp(2, 0), /*term=*/1LL});
+
+ auto info = ops.getApplyOpsInfo(oplogSlots,
+ /*prepare=*/false,
+ /*oplogEntryCountLimit=*/boost::none,
+ /*oplogEntrySizeLimitBytes=*/boost::none);
+
+ ASSERT_EQ(info.numberOfOplogSlotsUsed, 2U);
+ ASSERT_EQ(info.applyOpsEntries.size(), 1U);
+ ASSERT_EQ(info.applyOpsEntries[0].oplogSlot, oplogSlots[1]);
+ ASSERT_EQ(info.applyOpsEntries[0].operations.size(), 1U);
+ ASSERT_BSONOBJ_EQ(info.applyOpsEntries[0].operations[0], op.toBSON());
+}
+
+TEST(TransactionOperationsTest, GetApplyOpsInfoAssignsLastOplogSlotForPrepare) {
+ TransactionOperations ops;
+
+ TransactionOperations::TransactionOperation op;
+ op.setOpType(repl::OpTypeEnum::kInsert); // required for DurableReplOperation::serialize()
+ op.setNss(NamespaceString{"test.t"}); // required for DurableReplOperation::serialize()
+ op.setObject(BSON("_id" << 1)); // required for DurableReplOperation::serialize()
+ ASSERT_OK(ops.addOperation(op));
+
+ // We allocate two oplog slots and confirm that the second oplog slot is assigned
+ // to the only applyOps entry
+ std::vector<OplogSlot> oplogSlots;
+ oplogSlots.push_back(OplogSlot{Timestamp(1, 0), /*term=*/1LL});
+ oplogSlots.push_back(OplogSlot{Timestamp(2, 0), /*term=*/1LL});
+
+ auto info = ops.getApplyOpsInfo(oplogSlots,
+ /*prepare=*/true,
+ /*oplogEntryCountLimit=*/boost::none,
+ /*oplogEntrySizeLimitBytes=*/boost::none);
+
+ ASSERT_EQ(info.numberOfOplogSlotsUsed, 1U);
+ ASSERT_EQ(info.applyOpsEntries.size(), 1U);
+ ASSERT_EQ(info.applyOpsEntries[0].oplogSlot, oplogSlots[1]); // last oplog slot
+ ASSERT_EQ(info.applyOpsEntries[0].operations.size(), 1U);
+ ASSERT_BSONOBJ_EQ(info.applyOpsEntries[0].operations[0], op.toBSON());
+}
+
} // namespace
} // namespace mongo