summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2022-10-13 07:32:39 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-10-13 12:25:48 +0000
commit1deed7e1caacf7050201f41cf3b58d1b66768c9f (patch)
treed7907a225f35dd8fc6cf39e30276dcaad1567ff5
parentbe29f323c412bbd1487c954fade83817b77beaf4 (diff)
downloadmongo-1deed7e1caacf7050201f41cf3b58d1b66768c9f.tar.gz
SERVER-69749 move multi-doc transaction settings out of packOperationsIntoApplyOps()
-rw-r--r--src/mongo/db/op_observer/op_observer_impl.cpp89
1 files changed, 64 insertions, 25 deletions
diff --git a/src/mongo/db/op_observer/op_observer_impl.cpp b/src/mongo/db/op_observer/op_observer_impl.cpp
index 391de491530..c3bdd5bb3b8 100644
--- a/src/mongo/db/op_observer/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer/op_observer_impl.cpp
@@ -1525,6 +1525,36 @@ void writeChangeStreamPreImagesForApplyOpsEntries(
}
/**
+ * Returns maximum number of operations to pack into a single oplog entry,
+ * when multi-oplog format for transactions is in use.
+ *
+ * Stop packing when either number of transaction operations is reached, or when the
+ * next one would make the total size of operations larger than the maximum BSON Object
+ * User Size. We rely on the headroom between BSONObjMaxUserSize and
+ * BSONObjMaxInternalSize to cover the BSON overhead and the other "applyOps" entry
+ * fields. But if a single operation in the set exceeds BSONObjMaxUserSize, we still fit
+ * it, as a single max-length operation should be able to be packed into an "applyOps"
+ * entry.
+ */
+std::size_t getMaxNumberOfTransactionOperationsInSingleOplogEntry() {
+ tassert(6278503,
+ "gMaxNumberOfTransactionOperationsInSingleOplogEntry should be positive number",
+ gMaxNumberOfTransactionOperationsInSingleOplogEntry > 0);
+ return static_cast<std::size_t>(gMaxNumberOfTransactionOperationsInSingleOplogEntry);
+}
+
+/**
+ * Returns maximum size (bytes) of operations to pack into a single oplog entry,
+ * when multi-oplog format for transactions is in use.
+ *
+ * Refer to getMaxNumberOfTransactionOperationsInSingleOplogEntry() comments for a
+ * description on packing transaction operations into "applyOps" entries.
+ */
+std::size_t getMaxSizeOfTransactionOperationsInSingleOplogEntryBytes() {
+ return static_cast<std::size_t>(BSONObjMaxUserSize);
+}
+
+/**
* Returns operations that can fit into an "applyOps" entry. The returned operations are
* serialized to BSON. The operations are given by range ['operationsBegin',
* 'operationsEnd').
@@ -1538,40 +1568,34 @@ void writeChangeStreamPreImagesForApplyOpsEntries(
* (16MB), the batched write will fail with TransactionTooLarge.
*/
std::vector<BSONObj> packOperationsIntoApplyOps(
- OperationContext* opCtx,
std::vector<repl::ReplOperation>::const_iterator operationsBegin,
- std::vector<repl::ReplOperation>::const_iterator operationsEnd) {
+ std::vector<repl::ReplOperation>::const_iterator operationsEnd,
+ boost::optional<std::size_t> oplogEntryCountLimit,
+ boost::optional<std::size_t> oplogEntrySizeLimitBytes) {
// Conservative BSON array element overhead assuming maximum 6 digit array index.
constexpr size_t kBSONArrayElementOverhead{8};
- tassert(6278503,
- "gMaxNumberOfTransactionOperationsInSingleOplogEntry should be positive number",
- gMaxNumberOfTransactionOperationsInSingleOplogEntry > 0);
std::vector<BSONObj> operations;
size_t totalOperationsSize{0};
for (auto operationIter = operationsBegin; operationIter != operationsEnd; ++operationIter) {
const auto& operation = *operationIter;
- if (TransactionParticipant::get(opCtx)) {
- // Stop packing when either number of transaction operations is reached, or when the
- // next one would make the total size of operations larger than the maximum BSON Object
- // User Size. We rely on the headroom between BSONObjMaxUserSize and
- // BSONObjMaxInternalSize to cover the BSON overhead and the other "applyOps" entry
- // fields. But if a single operation in the set exceeds BSONObjMaxUserSize, we still fit
- // it, as a single max-length operation should be able to be packed into an "applyOps"
- // entry.
- if (operations.size() ==
- static_cast<size_t>(gMaxNumberOfTransactionOperationsInSingleOplogEntry) ||
- (operations.size() > 0 &&
+ if (oplogEntryCountLimit) {
+ if (operations.size() == *oplogEntryCountLimit) {
+ break;
+ }
+ }
+ if (oplogEntrySizeLimitBytes) {
+ if ((operations.size() > 0 &&
(totalOperationsSize + DurableOplogEntry::getDurableReplOperationSize(operation) >
- BSONObjMaxUserSize))) {
+ *oplogEntrySizeLimitBytes))) {
break;
}
- } else {
- // This a batched write, so we don't break the batch into multiple applyOps. It is the
- // reponsibility of the caller to generate a batch that fits within a single applyOps.
- // If the batch doesn't fit within an applyOps, we throw a TransactionTooLarge later
- // on when serializing to BSON.
}
+ // If neither 'oplogEntryCountLimit' nor 'oplogEntrySizeLimitBytes' is provided,
+ // this is a batched write, so we don't break the batch into multiple applyOps. It is the
+ // responsibility of the caller to generate a batch that fits within a single applyOps.
+ // If the batch doesn't fit within an applyOps, we throw a TransactionTooLarge later
+ // on when serializing to BSON.
auto serializedOperation = operation.toBSON();
totalOperationsSize += static_cast<size_t>(serializedOperation.objsize());
@@ -1595,6 +1619,8 @@ getApplyOpsOplogSlotAndOperationAssignmentForTransaction(
OperationContext* opCtx,
const std::vector<OplogSlot>& oplogSlots,
bool prepare,
+ boost::optional<std::size_t> oplogEntryCountLimit,
+ boost::optional<std::size_t> oplogEntrySizeLimitBytes,
std::vector<repl::ReplOperation>& operations) {
if (operations.empty()) {
return {{}, 0 /*numberOfOplogSlotsUsed*/};
@@ -1614,7 +1640,8 @@ getApplyOpsOplogSlotAndOperationAssignmentForTransaction(
// Assign operations to "applyOps" entries.
for (auto operationIt = operations.begin(); operationIt != operations.end();) {
- auto applyOpsOperations = packOperationsIntoApplyOps(opCtx, operationIt, operations.end());
+ auto applyOpsOperations = packOperationsIntoApplyOps(
+ operationIt, operations.end(), oplogEntryCountLimit, oplogEntrySizeLimitBytes);
const auto opCountWithNeedsRetryImage =
std::count_if(operationIt, operationIt + applyOpsOperations.size(), hasNeedsRetryImage);
if (opCountWithNeedsRetryImage > 0) {
@@ -2060,7 +2087,12 @@ void OpObserverImpl::onUnpreparedTransactionCommit(OperationContext* opCtx,
// entries.
const auto applyOpsOplogSlotAndOperationAssignment =
getApplyOpsOplogSlotAndOperationAssignmentForTransaction(
- opCtx, oplogSlots, false /*prepare*/, *statements);
+ opCtx,
+ oplogSlots,
+ /*prepare=*/false,
+ getMaxNumberOfTransactionOperationsInSingleOplogEntry(),
+ getMaxSizeOfTransactionOperationsInSingleOplogEntryBytes(),
+ *statements);
const auto wallClockTime = getWallClockTimeForOpLog(opCtx);
// Log in-progress entries for the transaction along with the implicit commit.
@@ -2132,6 +2164,8 @@ void OpObserverImpl::onBatchedWriteCommit(OperationContext* opCtx) {
opCtx,
oplogSlots,
false /*prepare*/,
+ /*oplogEntryCountLimit=*/boost::none,
+ /*oplogEntrySizeLimitBytes=*/boost::none,
*(batchedOps->getMutableOperationsForOpObserver()));
const auto wallClockTime = getWallClockTimeForOpLog(opCtx);
logOplogEntries(opCtx,
@@ -2182,7 +2216,12 @@ OpObserverImpl::preTransactionPrepare(OperationContext* opCtx,
auto* statements = transactionOperations->getMutableOperationsForOpObserver();
auto applyOpsOplogSlotAndOperationAssignment =
getApplyOpsOplogSlotAndOperationAssignmentForTransaction(
- opCtx, reservedSlots, true /*prepare*/, *statements);
+ opCtx,
+ reservedSlots,
+ /*prepare=*/true,
+ getMaxNumberOfTransactionOperationsInSingleOplogEntry(),
+ getMaxSizeOfTransactionOperationsInSingleOplogEntryBytes(),
+ *statements);
writeChangeStreamPreImagesForTransaction(
opCtx, *statements, applyOpsOplogSlotAndOperationAssignment, wallClockTime);
return std::make_unique<OpObserver::ApplyOpsOplogSlotAndOperationAssignment>(