diff options
author | Benety Goh <benety@mongodb.com> | 2022-10-13 07:32:39 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-10-13 12:25:48 +0000 |
commit | 1deed7e1caacf7050201f41cf3b58d1b66768c9f (patch) | |
tree | d7907a225f35dd8fc6cf39e30276dcaad1567ff5 | |
parent | be29f323c412bbd1487c954fade83817b77beaf4 (diff) | |
download | mongo-1deed7e1caacf7050201f41cf3b58d1b66768c9f.tar.gz |
SERVER-69749 move multi-doc transaction settings out of packOperationsIntoApplyOps()
-rw-r--r-- | src/mongo/db/op_observer/op_observer_impl.cpp | 89 |
1 files changed, 64 insertions, 25 deletions
diff --git a/src/mongo/db/op_observer/op_observer_impl.cpp b/src/mongo/db/op_observer/op_observer_impl.cpp index 391de491530..c3bdd5bb3b8 100644 --- a/src/mongo/db/op_observer/op_observer_impl.cpp +++ b/src/mongo/db/op_observer/op_observer_impl.cpp @@ -1525,6 +1525,36 @@ void writeChangeStreamPreImagesForApplyOpsEntries( } /** + * Returns maximum number of operations to pack into a single oplog entry, + * when multi-oplog format for transactions is in use. + * + * Stop packing when either number of transaction operations is reached, or when the + * next one would make the total size of operations larger than the maximum BSON Object + * User Size. We rely on the headroom between BSONObjMaxUserSize and + * BSONObjMaxInternalSize to cover the BSON overhead and the other "applyOps" entry + * fields. But if a single operation in the set exceeds BSONObjMaxUserSize, we still fit + * it, as a single max-length operation should be able to be packed into an "applyOps" + * entry. + */ +std::size_t getMaxNumberOfTransactionOperationsInSingleOplogEntry() { + tassert(6278503, + "gMaxNumberOfTransactionOperationsInSingleOplogEntry should be positive number", + gMaxNumberOfTransactionOperationsInSingleOplogEntry > 0); + return static_cast<std::size_t>(gMaxNumberOfTransactionOperationsInSingleOplogEntry); +} + +/** + * Returns maximum size (bytes) of operations to pack into a single oplog entry, + * when multi-oplog format for transactions is in use. + * + * Refer to getMaxNumberOfTransactionOperationsInSingleOplogEntry() comments for a + * description on packing transaction operations into "applyOps" entries. + */ +std::size_t getMaxSizeOfTransactionOperationsInSingleOplogEntryBytes() { + return static_cast<std::size_t>(BSONObjMaxUserSize); +} + +/** * Returns operations that can fit into an "applyOps" entry. The returned operations are * serialized to BSON. The operations are given by range ['operationsBegin', * 'operationsEnd'). @@ -1538,40 +1568,34 @@ void writeChangeStreamPreImagesForApplyOpsEntries( * (16MB), the batched write will fail with TransactionTooLarge. */ std::vector<BSONObj> packOperationsIntoApplyOps( - OperationContext* opCtx, std::vector<repl::ReplOperation>::const_iterator operationsBegin, - std::vector<repl::ReplOperation>::const_iterator operationsEnd) { + std::vector<repl::ReplOperation>::const_iterator operationsEnd, + boost::optional<std::size_t> oplogEntryCountLimit, + boost::optional<std::size_t> oplogEntrySizeLimitBytes) { // Conservative BSON array element overhead assuming maximum 6 digit array index. constexpr size_t kBSONArrayElementOverhead{8}; - tassert(6278503, - "gMaxNumberOfTransactionOperationsInSingleOplogEntry should be positive number", - gMaxNumberOfTransactionOperationsInSingleOplogEntry > 0); std::vector<BSONObj> operations; size_t totalOperationsSize{0}; for (auto operationIter = operationsBegin; operationIter != operationsEnd; ++operationIter) { const auto& operation = *operationIter; - if (TransactionParticipant::get(opCtx)) { - // Stop packing when either number of transaction operations is reached, or when the - // next one would make the total size of operations larger than the maximum BSON Object - // User Size. We rely on the headroom between BSONObjMaxUserSize and - // BSONObjMaxInternalSize to cover the BSON overhead and the other "applyOps" entry - // fields. But if a single operation in the set exceeds BSONObjMaxUserSize, we still fit - // it, as a single max-length operation should be able to be packed into an "applyOps" - // entry. - if (operations.size() == - static_cast<size_t>(gMaxNumberOfTransactionOperationsInSingleOplogEntry) || - (operations.size() > 0 && + if (oplogEntryCountLimit) { + if (operations.size() == *oplogEntryCountLimit) { + break; + } + } + if (oplogEntrySizeLimitBytes) { + if ((operations.size() > 0 && (totalOperationsSize + DurableOplogEntry::getDurableReplOperationSize(operation) > - BSONObjMaxUserSize))) { + *oplogEntrySizeLimitBytes))) { break; } - } else { - // This a batched write, so we don't break the batch into multiple applyOps. It is the - // reponsibility of the caller to generate a batch that fits within a single applyOps. - // If the batch doesn't fit within an applyOps, we throw a TransactionTooLarge later - // on when serializing to BSON. } + // If neither 'oplogEntryCountLimit' nor 'oplogEntrySizeLimitBytes' is provided, + // this is a batched write, so we don't break the batch into multiple applyOps. It is the + // responsibility of the caller to generate a batch that fits within a single applyOps. + // If the batch doesn't fit within an applyOps, we throw a TransactionTooLarge later + // on when serializing to BSON. auto serializedOperation = operation.toBSON(); totalOperationsSize += static_cast<size_t>(serializedOperation.objsize()); @@ -1595,6 +1619,8 @@ getApplyOpsOplogSlotAndOperationAssignmentForTransaction( OperationContext* opCtx, const std::vector<OplogSlot>& oplogSlots, bool prepare, + boost::optional<std::size_t> oplogEntryCountLimit, + boost::optional<std::size_t> oplogEntrySizeLimitBytes, std::vector<repl::ReplOperation>& operations) { if (operations.empty()) { return {{}, 0 /*numberOfOplogSlotsUsed*/}; @@ -1614,7 +1640,8 @@ getApplyOpsOplogSlotAndOperationAssignmentForTransaction( // Assign operations to "applyOps" entries. for (auto operationIt = operations.begin(); operationIt != operations.end();) { - auto applyOpsOperations = packOperationsIntoApplyOps(opCtx, operationIt, operations.end()); + auto applyOpsOperations = packOperationsIntoApplyOps( + operationIt, operations.end(), oplogEntryCountLimit, oplogEntrySizeLimitBytes); const auto opCountWithNeedsRetryImage = std::count_if(operationIt, operationIt + applyOpsOperations.size(), hasNeedsRetryImage); if (opCountWithNeedsRetryImage > 0) { @@ -2060,7 +2087,12 @@ void OpObserverImpl::onUnpreparedTransactionCommit(OperationContext* opCtx, // entries. const auto applyOpsOplogSlotAndOperationAssignment = getApplyOpsOplogSlotAndOperationAssignmentForTransaction( - opCtx, oplogSlots, false /*prepare*/, *statements); + opCtx, + oplogSlots, + /*prepare=*/false, + getMaxNumberOfTransactionOperationsInSingleOplogEntry(), + getMaxSizeOfTransactionOperationsInSingleOplogEntryBytes(), + *statements); const auto wallClockTime = getWallClockTimeForOpLog(opCtx); // Log in-progress entries for the transaction along with the implicit commit. @@ -2132,6 +2164,8 @@ void OpObserverImpl::onBatchedWriteCommit(OperationContext* opCtx) { opCtx, oplogSlots, false /*prepare*/, + /*oplogEntryCountLimit=*/boost::none, + /*oplogEntrySizeLimitBytes=*/boost::none, *(batchedOps->getMutableOperationsForOpObserver())); const auto wallClockTime = getWallClockTimeForOpLog(opCtx); logOplogEntries(opCtx, @@ -2182,7 +2216,12 @@ OpObserverImpl::preTransactionPrepare(OperationContext* opCtx, auto* statements = transactionOperations->getMutableOperationsForOpObserver(); auto applyOpsOplogSlotAndOperationAssignment = getApplyOpsOplogSlotAndOperationAssignmentForTransaction( - opCtx, reservedSlots, true /*prepare*/, *statements); + opCtx, + reservedSlots, + /*prepare=*/true, + getMaxNumberOfTransactionOperationsInSingleOplogEntry(), + getMaxSizeOfTransactionOperationsInSingleOplogEntryBytes(), + *statements); writeChangeStreamPreImagesForTransaction( opCtx, *statements, applyOpsOplogSlotAndOperationAssignment, wallClockTime); return std::make_unique<OpObserver::ApplyOpsOplogSlotAndOperationAssignment>( |