diff options
author | Benety Goh <benety@mongodb.com> | 2022-10-13 10:50:58 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-10-13 15:49:09 +0000 |
commit | aaadd6214e5abfe206fcab752fc5ba000909a191 (patch) | |
tree | 4390d096785c9d715fd5a09bff77edc5c579a70e /src/mongo/db/transaction | |
parent | 7993e99666a66429457ebaf7e1326ffdcbe40f64 (diff) | |
download | mongo-aaadd6214e5abfe206fcab752fc5ba000909a191.tar.gz |
SERVER-69749 move getApplyOpsOplogSlotAndOperationAssignmentForTransaction() from OpObserverImpl to TransactionOperations
Diffstat (limited to 'src/mongo/db/transaction')
-rw-r--r-- | src/mongo/db/transaction/transaction_operations.cpp | 103 | ||||
-rw-r--r-- | src/mongo/db/transaction/transaction_operations.h | 35 | ||||
-rw-r--r-- | src/mongo/db/transaction/transaction_operations_test.cpp | 224 |
3 files changed, 362 insertions, 0 deletions
diff --git a/src/mongo/db/transaction/transaction_operations.cpp b/src/mongo/db/transaction/transaction_operations.cpp index dd0a5544c71..5624c306a03 100644 --- a/src/mongo/db/transaction/transaction_operations.cpp +++ b/src/mongo/db/transaction/transaction_operations.cpp @@ -29,9 +29,66 @@ #include "mongo/db/transaction/transaction_operations.h" +#include <algorithm> #include <fmt/format.h> namespace mongo { +namespace { + +/** + * Returns operations that can fit into an "applyOps" entry. The returned operations are + * serialized to BSON. The operations are given by range ['operationsBegin', + * 'operationsEnd'). + * Multi-document transactions follow the following constraints for fitting the operations: (1) the + * resulting "applyOps" entry shouldn't exceed the 16MB limit, unless only one operation is + * allocated to it; (2) the number of operations is not larger than the maximum number of + * transaction statements allowed in one entry as defined by + * 'gMaxNumberOfTransactionOperationsInSingleOplogEntry'. Batched writes (WUOWs that pack writes + * into a single applyOps outside of a multi-doc transaction) are exempt from the constraints above. + * If the operations cannot be packed into a single applyOps that's within the BSON size limit + * (16MB), the batched write will fail with TransactionTooLarge. 
+ */ +std::vector<BSONObj> packOperationsIntoApplyOps( + std::vector<repl::ReplOperation>::const_iterator operationsBegin, + std::vector<repl::ReplOperation>::const_iterator operationsEnd, + boost::optional<std::size_t> oplogEntryCountLimit, + boost::optional<std::size_t> oplogEntrySizeLimitBytes) { + std::vector<BSONObj> operations; + std::size_t totalOperationsSize{0}; + for (auto operationIter = operationsBegin; operationIter != operationsEnd; ++operationIter) { + const auto& operation = *operationIter; + + if (oplogEntryCountLimit) { + if (operations.size() == *oplogEntryCountLimit) { + break; + } + } + if (oplogEntrySizeLimitBytes) { + if ((operations.size() > 0 && + (totalOperationsSize + + repl::DurableOplogEntry::getDurableReplOperationSize(operation) > + *oplogEntrySizeLimitBytes))) { + break; + } + } + // If neither 'oplogEntryCountLimit' nor 'oplogEntrySizeLimitBytes' is provided, + // this is a batched write, so we don't break the batch into multiple applyOps. It is the + // responsibility of the caller to generate a batch that fits within a single applyOps. + // If the batch doesn't fit within an applyOps, we throw a TransactionTooLarge later + // on when serializing to BSON. + auto serializedOperation = operation.toBSON(); + totalOperationsSize += static_cast<std::size_t>(serializedOperation.objsize()); + + // Add BSON array element overhead since operations will ultimately be packed into BSON + // array. 
+ totalOperationsSize += TransactionOperations::ApplyOpsInfo::kBSONArrayElementOverhead; + + operations.emplace_back(std::move(serializedOperation)); + } + return operations; +} + +} // namespace bool TransactionOperations::isEmpty() const { return _transactionOperations.empty(); @@ -124,6 +181,52 @@ TransactionOperations::CollectionUUIDs TransactionOperations::getCollectionUUIDs return uuids; } +TransactionOperations::ApplyOpsInfo TransactionOperations::getApplyOpsInfo( + const std::vector<OplogSlot>& oplogSlots, + bool prepare, + boost::optional<std::size_t> oplogEntryCountLimit, + boost::optional<std::size_t> oplogEntrySizeLimitBytes) const { + const auto& operations = _transactionOperations; + if (operations.empty()) { + return {{}, /*numberOfOplogSlotsUsed=*/0}; + } + tassert(6278504, "Insufficient number of oplogSlots", operations.size() <= oplogSlots.size()); + + std::vector<ApplyOpsInfo::ApplyOpsEntry> applyOpsEntries; + auto oplogSlotIter = oplogSlots.begin(); + auto getNextOplogSlot = [&]() { + tassert(6278505, "Unexpected end of oplog slot vector", oplogSlotIter != oplogSlots.end()); + return *oplogSlotIter++; + }; + + auto hasNeedsRetryImage = [](const repl::ReplOperation& operation) { + return static_cast<bool>(operation.getNeedsRetryImage()); + }; + + // Assign operations to "applyOps" entries. + for (auto operationIt = operations.begin(); operationIt != operations.end();) { + auto applyOpsOperations = packOperationsIntoApplyOps( + operationIt, operations.end(), oplogEntryCountLimit, oplogEntrySizeLimitBytes); + const auto opCountWithNeedsRetryImage = + std::count_if(operationIt, operationIt + applyOpsOperations.size(), hasNeedsRetryImage); + if (opCountWithNeedsRetryImage > 0) { + // Reserve a slot for a forged no-op entry. 
+ getNextOplogSlot(); + } + operationIt += applyOpsOperations.size(); + applyOpsEntries.emplace_back( + ApplyOpsInfo::ApplyOpsEntry{getNextOplogSlot(), std::move(applyOpsOperations)}); + } + + // In the special case of writing the implicit 'prepare' oplog entry, we use the last reserved + // oplog slot. This may mean we skipped over some reserved slots, but there's no harm in that. + if (prepare) { + applyOpsEntries.back().oplogSlot = oplogSlots.back(); + } + return {std::move(applyOpsEntries), + static_cast<std::size_t>(oplogSlotIter - oplogSlots.begin())}; +} + std::vector<TransactionOperations::TransactionOperation>* TransactionOperations::getMutableOperationsForOpObserver() { return &_transactionOperations; diff --git a/src/mongo/db/transaction/transaction_operations.h b/src/mongo/db/transaction/transaction_operations.h index 13a0ea664f0..2ac9a069e78 100644 --- a/src/mongo/db/transaction/transaction_operations.h +++ b/src/mongo/db/transaction/transaction_operations.h @@ -34,6 +34,7 @@ #include <vector> #include "mongo/base/status.h" +#include "mongo/db/repl/oplog.h" // for OplogSlot #include "mongo/db/repl/oplog_entry.h" // for ReplOperation #include "mongo/stdx/unordered_set.h" #include "mongo/util/uuid.h" @@ -51,6 +52,28 @@ public: using TransactionOperation = repl::ReplOperation; using CollectionUUIDs = stdx::unordered_set<UUID, UUID::Hash>; + /** + * Contains "applyOps" oplog entries for a transaction. "applyOps" entries are not actual + * "applyOps" entries to be written to the oplog, but comprise certain parts of those entries - + * BSON serialized operations, and the assigned oplog slot. The operations in field + * 'ApplyOpsEntry::operations' should be considered opaque outside the OpObserver. + */ + struct ApplyOpsInfo { + // Conservative BSON array element overhead assuming maximum 6 digit array index. 
+ static constexpr std::size_t kBSONArrayElementOverhead = 8U; + + struct ApplyOpsEntry { + OplogSlot oplogSlot; + std::vector<BSONObj> operations; + }; + + // Representation of "applyOps" oplog entries. + std::vector<ApplyOpsEntry> applyOpsEntries; + + // Number of oplog slots utilized. + std::size_t numberOfOplogSlotsUsed; + }; + TransactionOperations() = default; /** @@ -102,6 +125,18 @@ public: CollectionUUIDs getCollectionUUIDs() const; /** + * Returns oplog slots to be used for "applyOps" oplog entries, BSON serialized operations, + * their assignments to "applyOps" entries, and oplog slots to be used for writing pre- and + * post- image oplog entries for the transaction consisting of 'operations'. Allocates oplog + * slots from 'oplogSlots'. The 'prepare' indicates if the function is called when preparing a + * transaction. + */ + ApplyOpsInfo getApplyOpsInfo(const std::vector<OplogSlot>& oplogSlots, + bool prepare, + boost::optional<std::size_t> oplogEntryCountLimit, + boost::optional<std::size_t> oplogEntrySizeLimitBytes) const; + + /** * Returns pointer to vector of operations for integrating with * BatchedWriteContext, TransactionParticipant, and OpObserver interfaces * for multi-doc transactions. 
diff --git a/src/mongo/db/transaction/transaction_operations_test.cpp b/src/mongo/db/transaction/transaction_operations_test.cpp index b386cbe2e77..c3c84e2c5d7 100644 --- a/src/mongo/db/transaction/transaction_operations_test.cpp +++ b/src/mongo/db/transaction/transaction_operations_test.cpp @@ -28,6 +28,7 @@ */ #include "mongo/db/transaction/transaction_operations.h" +#include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" namespace mongo { @@ -204,5 +205,228 @@ TEST(TransactionOperationsTest, GetCollectionUUIDsIgnoresNoopOperations) { ASSERT(uuids.count(*op2.getUuid())); } +TEST(TransactionOperationsTest, GetApplyOpsInfoEmptyOps) { + TransactionOperations ops; + auto info = ops.getApplyOpsInfo(/*oplogSlots=*/{}, + /*prepare=*/false, + /*oplogEntryCountLimit=*/boost::none, + /*oplogEntrySizeLimitBytes=*/boost::none); + ASSERT_EQ(info.applyOpsEntries.size(), 0); + ASSERT_EQ(info.numberOfOplogSlotsUsed, 0); +} + +DEATH_TEST(TransactionOperationsTest, + GetApplyOpsInfoInsufficientSlots, + "Insufficient number of oplogSlots") { + TransactionOperations ops; + TransactionOperations::TransactionOperation op; + ASSERT_OK(ops.addOperation(op)); + ops.getApplyOpsInfo(/*oplogSlots=*/{}, + /*prepare=*/false, + /*oplogEntryCountLimit=*/boost::none, + /*oplogEntrySizeLimitBytes=*/boost::none); +} + +TEST(TransactionOperationsTest, GetApplyOpsInfoReturnsOneEntryContainingTwoOperations) { + TransactionOperations ops; + + TransactionOperations::TransactionOperation op1; + op1.setOpType(repl::OpTypeEnum::kInsert); // required for DurableReplOperation::serialize() + op1.setNss(NamespaceString{"test.t"}); // required for DurableReplOperation::serialize() + op1.setObject(BSON("_id" << 1)); // required for DurableReplOperation::serialize() + ASSERT_OK(ops.addOperation(op1)); + + TransactionOperations::TransactionOperation op2; + op2.setOpType(repl::OpTypeEnum::kInsert); // required for DurableReplOperation::serialize() + op2.setNss(NamespaceString{"test.t"}); // 
required for DurableReplOperation::serialize() + op2.setObject(BSON("_id" << 2)); // required for DurableReplOperation::serialize() + ASSERT_OK(ops.addOperation(op2)); + + // We have to allocate as many oplog slots as operations even though only + // one applyOps entry will be generated. + std::vector<OplogSlot> oplogSlots; + oplogSlots.push_back(OplogSlot{Timestamp(1, 0), /*term=*/1LL}); + oplogSlots.push_back(OplogSlot{Timestamp(2, 0), /*term=*/1LL}); + + auto info = ops.getApplyOpsInfo(oplogSlots, + /*prepare=*/false, + /*oplogEntryCountLimit=*/boost::none, + /*oplogEntrySizeLimitBytes=*/boost::none); + + ASSERT_EQ(info.numberOfOplogSlotsUsed, 1U); + ASSERT_EQ(info.applyOpsEntries.size(), 1U); + ASSERT_EQ(info.applyOpsEntries[0].oplogSlot, oplogSlots[0]); // first oplog slot + ASSERT_EQ(info.applyOpsEntries[0].operations.size(), 2U); + ASSERT_BSONOBJ_EQ(info.applyOpsEntries[0].operations[0], op1.toBSON()); + ASSERT_BSONOBJ_EQ(info.applyOpsEntries[0].operations[1], op2.toBSON()); +} + +TEST(TransactionOperationsTest, GetApplyOpsInfoRespectsOperationCountLimit) { + TransactionOperations ops; + + TransactionOperations::TransactionOperation op1; + op1.setOpType(repl::OpTypeEnum::kInsert); // required for DurableReplOperation::serialize() + op1.setNss(NamespaceString{"test.t"}); // required for DurableReplOperation::serialize() + op1.setObject(BSON("_id" << 1)); // required for DurableReplOperation::serialize() + ASSERT_OK(ops.addOperation(op1)); + + TransactionOperations::TransactionOperation op2; + op2.setOpType(repl::OpTypeEnum::kInsert); // required for DurableReplOperation::serialize() + op2.setNss(NamespaceString{"test.t"}); // required for DurableReplOperation::serialize() + op2.setObject(BSON("_id" << 2)); // required for DurableReplOperation::serialize() + ASSERT_OK(ops.addOperation(op2)); + + // Allocate one oplog slot per operation. Both slots are used because the count limit + // forces each operation into its own applyOps entry. 
+ std::vector<OplogSlot> oplogSlots; + oplogSlots.push_back(OplogSlot{Timestamp(1, 0), /*term=*/1LL}); + oplogSlots.push_back(OplogSlot{Timestamp(2, 0), /*term=*/1LL}); + + // Restrict each applyOps entry to holding at most one operation. + auto info = ops.getApplyOpsInfo( + oplogSlots, + /*prepare=*/false, + /*oplogEntryCountLimit=*/1U, + /*oplogEntrySizeLimitBytes=*/static_cast<std::size_t>(BSONObjMaxUserSize)); + + ASSERT_EQ(info.numberOfOplogSlotsUsed, 2U); + ASSERT_EQ(info.applyOpsEntries.size(), 2U); + + // Check first applyOps entry. + ASSERT_EQ(info.applyOpsEntries[0].oplogSlot, oplogSlots[0]); + ASSERT_EQ(info.applyOpsEntries[0].operations.size(), 1U); + ASSERT_BSONOBJ_EQ(info.applyOpsEntries[0].operations[0], op1.toBSON()); + + // Check second applyOps entry. + ASSERT_EQ(info.applyOpsEntries[1].oplogSlot, oplogSlots[1]); + ASSERT_EQ(info.applyOpsEntries[1].operations.size(), 1U); + ASSERT_BSONOBJ_EQ(info.applyOpsEntries[1].operations[0], op2.toBSON()); +} + +TEST(TransactionOperationsTest, GetApplyOpsInfoRespectsOperationSizeLimit) { + TransactionOperations ops; + + TransactionOperations::TransactionOperation op1; + op1.setOpType(repl::OpTypeEnum::kInsert); // required for DurableReplOperation::serialize() + op1.setNss(NamespaceString{"test.t"}); // required for DurableReplOperation::serialize() + op1.setObject(BSON("_id" << 1)); // required for DurableReplOperation::serialize() + ASSERT_OK(ops.addOperation(op1)); + + TransactionOperations::TransactionOperation op2; + op2.setOpType(repl::OpTypeEnum::kInsert); // required for DurableReplOperation::serialize() + op2.setNss(NamespaceString{"test.t"}); // required for DurableReplOperation::serialize() + op2.setObject(BSON("_id" << 2)); // required for DurableReplOperation::serialize() + ASSERT_OK(ops.addOperation(op2)); + + // Allocate one oplog slot per operation. Both slots are used because the size limit + // forces each operation into its own applyOps entry. 
+ std::vector<OplogSlot> oplogSlots; + oplogSlots.push_back(OplogSlot{Timestamp(1, 0), /*term=*/1LL}); + oplogSlots.push_back(OplogSlot{Timestamp(2, 0), /*term=*/1LL}); + + // Restrict each applyOps entry to holding at most one operation. + auto info = ops.getApplyOpsInfo( + oplogSlots, + /*prepare=*/false, + /*oplogEntryCountLimit=*/100U, + /*oplogEntrySizeLimitBytes=*/repl::DurableOplogEntry::getDurableReplOperationSize(op1) + + TransactionOperations::ApplyOpsInfo::kBSONArrayElementOverhead); + + ASSERT_EQ(info.numberOfOplogSlotsUsed, 2U); + ASSERT_EQ(info.applyOpsEntries.size(), 2U); + + // Check first applyOps entry. + ASSERT_EQ(info.applyOpsEntries[0].oplogSlot, oplogSlots[0]); + ASSERT_EQ(info.applyOpsEntries[0].operations.size(), 1U); + ASSERT_BSONOBJ_EQ(info.applyOpsEntries[0].operations[0], op1.toBSON()); + + // Check second applyOps entry. + ASSERT_EQ(info.applyOpsEntries[1].oplogSlot, oplogSlots[1]); + ASSERT_EQ(info.applyOpsEntries[1].operations.size(), 1U); + ASSERT_BSONOBJ_EQ(info.applyOpsEntries[1].operations[0], op2.toBSON()); +} + +DEATH_TEST(TransactionOperationsTest, + GetApplyOpsInfoInsufficientSlotsDueToPreImage, + "Unexpected end of oplog slot vector") { + TransactionOperations ops; + + // Setting the "needs retry image" flag on 'op' forces getApplyOpsInfo() + // to request an additional slot, which will not be available due to an + // insufficiently sized 'oplogSlots' array. + TransactionOperations::TransactionOperation op; + op.setNeedsRetryImage(repl::RetryImageEnum::kPreImage); + op.setOpType(repl::OpTypeEnum::kInsert); // required for DurableReplOperation::serialize() + op.setNss(NamespaceString{"test.t"}); // required for DurableReplOperation::serialize() + op.setObject(BSON("_id" << 1)); // required for DurableReplOperation::serialize() + ASSERT_OK(ops.addOperation(op)); + + // We allocated a slot for the operation but not for the pre-image. 
+ std::vector<OplogSlot> oplogSlots; + oplogSlots.push_back(OplogSlot{Timestamp(1, 0), /*term=*/1LL}); + + ops.getApplyOpsInfo(oplogSlots, + /*prepare=*/false, + /*oplogEntryCountLimit=*/boost::none, + /*oplogEntrySizeLimitBytes=*/boost::none); +} + +TEST(TransactionOperationsTest, GetApplyOpsInfoAssignsPreImageSlotBeforeOperation) { + TransactionOperations ops; + + // Setting the "needs retry image" flag on 'op' forces getApplyOpsInfo() + // to reserve an additional slot for the forged no-op image entry. That slot + // is taken before the slot assigned to the applyOps entry. + TransactionOperations::TransactionOperation op; + op.setNeedsRetryImage(repl::RetryImageEnum::kPreImage); + op.setOpType(repl::OpTypeEnum::kInsert); // required for DurableReplOperation::serialize() + op.setNss(NamespaceString{"test.t"}); // required for DurableReplOperation::serialize() + op.setObject(BSON("_id" << 1)); // required for DurableReplOperation::serialize() + ASSERT_OK(ops.addOperation(op)); + + // We allocate two slots: the first for the pre-image and the second for the operation. 
+ std::vector<OplogSlot> oplogSlots; + oplogSlots.push_back(OplogSlot{Timestamp(1, 0), /*term=*/1LL}); + oplogSlots.push_back(OplogSlot{Timestamp(2, 0), /*term=*/1LL}); + + auto info = ops.getApplyOpsInfo(oplogSlots, + /*prepare=*/false, + /*oplogEntryCountLimit=*/boost::none, + /*oplogEntrySizeLimitBytes=*/boost::none); + + ASSERT_EQ(info.numberOfOplogSlotsUsed, 2U); + ASSERT_EQ(info.applyOpsEntries.size(), 1U); + ASSERT_EQ(info.applyOpsEntries[0].oplogSlot, oplogSlots[1]); + ASSERT_EQ(info.applyOpsEntries[0].operations.size(), 1U); + ASSERT_BSONOBJ_EQ(info.applyOpsEntries[0].operations[0], op.toBSON()); +} + +TEST(TransactionOperationsTest, GetApplyOpsInfoAssignsLastOplogSlotForPrepare) { + TransactionOperations ops; + + TransactionOperations::TransactionOperation op; + op.setOpType(repl::OpTypeEnum::kInsert); // required for DurableReplOperation::serialize() + op.setNss(NamespaceString{"test.t"}); // required for DurableReplOperation::serialize() + op.setObject(BSON("_id" << 1)); // required for DurableReplOperation::serialize() + ASSERT_OK(ops.addOperation(op)); + + // We allocate two oplog slots and confirm that the second oplog slot is assigned + // to the only applyOps entry + std::vector<OplogSlot> oplogSlots; + oplogSlots.push_back(OplogSlot{Timestamp(1, 0), /*term=*/1LL}); + oplogSlots.push_back(OplogSlot{Timestamp(2, 0), /*term=*/1LL}); + + auto info = ops.getApplyOpsInfo(oplogSlots, + /*prepare=*/true, + /*oplogEntryCountLimit=*/boost::none, + /*oplogEntrySizeLimitBytes=*/boost::none); + + ASSERT_EQ(info.numberOfOplogSlotsUsed, 1U); + ASSERT_EQ(info.applyOpsEntries.size(), 1U); + ASSERT_EQ(info.applyOpsEntries[0].oplogSlot, oplogSlots[1]); // last oplog slot + ASSERT_EQ(info.applyOpsEntries[0].operations.size(), 1U); + ASSERT_BSONOBJ_EQ(info.applyOpsEntries[0].operations[0], op.toBSON()); +} + } // namespace } // namespace mongo |