diff options
author | Jonathan Reams <jbreams@mongodb.com> | 2020-02-10 10:14:32 +0100 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-02-10 13:08:24 +0000 |
commit | 6c9c0b19d5980f065e1ff2ad624bb8d18bb88fe5 (patch) | |
tree | 678fca12abb4d786006bac635c430f806bb0ab13 /src/mongo/db/op_observer_impl.cpp | |
parent | 43c2b5b172cf6783319944c0d6931478db01eefa (diff) | |
download | mongo-6c9c0b19d5980f065e1ff2ad624bb8d18bb88fe5.tar.gz |
SERVER-45806 Record pre-images on updates and deletes when recordPreImage is enabled
Diffstat (limited to 'src/mongo/db/op_observer_impl.cpp')
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 147 |
1 files changed, 91 insertions, 56 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index af7bd9a41eb..26de717a90a 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -152,45 +152,45 @@ struct OpTimeBundle { * Write oplog entry(ies) for the update operation. */ OpTimeBundle replLogUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) { - BSONObj storeObj; - if (args.updateArgs.storeDocOption == CollectionUpdateArgs::StoreDocOption::PreImage) { - invariant(args.updateArgs.preImageDoc); - storeObj = *args.updateArgs.preImageDoc; - } else if (args.updateArgs.storeDocOption == CollectionUpdateArgs::StoreDocOption::PostImage) { - storeObj = args.updateArgs.updatedDoc; - } - MutableOplogEntry oplogEntry; oplogEntry.setNss(args.nss); oplogEntry.setUuid(args.uuid); repl::OplogLink oplogLink; - repl::appendRetryableWriteInfo(opCtx, &oplogEntry, &oplogLink, args.updateArgs.stmtId); + repl::appendOplogEntryChainInfo(opCtx, &oplogEntry, &oplogLink, args.updateArgs.stmtId); OpTimeBundle opTimes; - - if (!storeObj.isEmpty() && opCtx->getTxnNumber()) { + const auto storePreImageForRetryableWrite = + (args.updateArgs.storeDocOption == CollectionUpdateArgs::StoreDocOption::PreImage && + opCtx->getTxnNumber()); + if (storePreImageForRetryableWrite || args.updateArgs.preImageRecordingEnabledForCollection) { MutableOplogEntry noopEntry = oplogEntry; + invariant(args.updateArgs.preImageDoc); noopEntry.setOpType(repl::OpTypeEnum::kNoop); - noopEntry.setObject(std::move(storeObj)); - auto noteUpdateOpTime = logOperation(opCtx, &noopEntry); - - opTimes.prePostImageOpTime = noteUpdateOpTime; - - if (args.updateArgs.storeDocOption == CollectionUpdateArgs::StoreDocOption::PreImage) { - oplogLink.preImageOpTime = noteUpdateOpTime; - } else if (args.updateArgs.storeDocOption == - CollectionUpdateArgs::StoreDocOption::PostImage) { - oplogLink.postImageOpTime = noteUpdateOpTime; + noopEntry.setObject(*args.updateArgs.preImageDoc); + oplogLink.preImageOpTime = logOperation(opCtx, &noopEntry); + if (storePreImageForRetryableWrite) { + opTimes.prePostImageOpTime = oplogLink.preImageOpTime; } } + // This case handles storing the post image for retryable findAndModify's. + if (args.updateArgs.storeDocOption == CollectionUpdateArgs::StoreDocOption::PostImage && + opCtx->getTxnNumber()) { + MutableOplogEntry noopEntry = oplogEntry; + noopEntry.setOpType(repl::OpTypeEnum::kNoop); + noopEntry.setObject(args.updateArgs.updatedDoc); + oplogLink.postImageOpTime = logOperation(opCtx, &noopEntry); + invariant(opTimes.prePostImageOpTime.isNull()); + opTimes.prePostImageOpTime = oplogLink.postImageOpTime; + } + oplogEntry.setOpType(repl::OpTypeEnum::kUpdate); oplogEntry.setObject(args.updateArgs.update); oplogEntry.setObject2(args.updateArgs.criteria); oplogEntry.setFromMigrateIfTrue(args.updateArgs.fromMigrate); // oplogLink could have been changed to include pre/postImageOpTime by the previous no-op write. - repl::appendRetryableWriteInfo(opCtx, &oplogEntry, &oplogLink, args.updateArgs.stmtId); + repl::appendOplogEntryChainInfo(opCtx, &oplogEntry, &oplogLink, args.updateArgs.stmtId); opTimes.writeOpTime = logOperation(opCtx, &oplogEntry); opTimes.wallClockTime = oplogEntry.getWallClockTime(); return opTimes; @@ -210,14 +210,13 @@ OpTimeBundle replLogDelete(OperationContext* opCtx, oplogEntry.setUuid(uuid); repl::OplogLink oplogLink; - repl::appendRetryableWriteInfo(opCtx, &oplogEntry, &oplogLink, stmtId); + repl::appendOplogEntryChainInfo(opCtx, &oplogEntry, &oplogLink, stmtId); OpTimeBundle opTimes; - - if (deletedDoc && opCtx->getTxnNumber()) { + if (deletedDoc) { MutableOplogEntry noopEntry = oplogEntry; noopEntry.setOpType(repl::OpTypeEnum::kNoop); - noopEntry.setObject(deletedDoc.get()); + noopEntry.setObject(*deletedDoc); auto noteOplog = logOperation(opCtx, &noopEntry); opTimes.prePostImageOpTime = noteOplog; oplogLink.preImageOpTime = noteOplog; @@ -227,7 +226,7 @@ OpTimeBundle replLogDelete(OperationContext* opCtx, oplogEntry.setObject(documentKeyDecoration(opCtx)); oplogEntry.setFromMigrateIfTrue(fromMigrate); // oplogLink could have been changed to include preImageOpTime by the previous no-op write. - repl::appendRetryableWriteInfo(opCtx, &oplogEntry, &oplogLink, stmtId); + repl::appendOplogEntryChainInfo(opCtx, &oplogEntry, &oplogLink, stmtId); opTimes.writeOpTime = logOperation(opCtx, &oplogEntry); opTimes.wallClockTime = oplogEntry.getWallClockTime(); return opTimes; @@ -500,6 +499,12 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg if (inMultiDocumentTransaction) { auto operation = MutableOplogEntry::makeUpdateOperation( args.nss, args.uuid, args.updateArgs.update, args.updateArgs.criteria); + + if (args.updateArgs.preImageRecordingEnabledForCollection) { + invariant(args.updateArgs.preImageDoc); + operation.setPreImage(args.updateArgs.preImageDoc->getOwned()); + } + txnParticipant.addTransactionOperation(opCtx, operation); } else { opTime = replLogUpdate(opCtx, args); @@ -562,8 +567,11 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, OpTimeBundle opTime; if (inMultiDocumentTransaction) { - auto operation = MutableOplogEntry::makeDeleteOperation( - nss, uuid.get(), deletedDoc ? deletedDoc.get() : documentKey); + auto operation = MutableOplogEntry::makeDeleteOperation(nss, uuid.get(), documentKey); + if (deletedDoc) { + operation.setPreImage(deletedDoc->getOwned()); + } + txnParticipant.addTransactionOperation(opCtx, operation); } else { opTime = replLogDelete(opCtx, nss, uuid, stmtId, fromMigrate, deletedDoc); @@ -827,18 +835,17 @@ void OpObserverImpl::onEmptyCapped(OperationContext* opCtx, } namespace { - // Accepts an empty BSON builder and appends the given transaction statements to an 'applyOps' array // field. Appends as many operations as possible until either the constructed object exceeds the // 16MB limit or the maximum number of transaction statements allowed in one entry. // // Returns an iterator to the first statement that wasn't packed into the applyOps object. -std::vector<repl::ReplOperation>::const_iterator packTransactionStatementsForApplyOps( +std::vector<repl::ReplOperation>::iterator packTransactionStatementsForApplyOps( BSONObjBuilder* applyOpsBuilder, - std::vector<repl::ReplOperation>::const_iterator stmtBegin, - std::vector<repl::ReplOperation>::const_iterator stmtEnd) { + std::vector<repl::ReplOperation>::iterator stmtBegin, + std::vector<repl::ReplOperation>::iterator stmtEnd) { - std::vector<repl::ReplOperation>::const_iterator stmtIter; + std::vector<repl::ReplOperation>::iterator stmtIter; BSONArrayBuilder opsArray(applyOpsBuilder->subarrayStart("applyOps"_sd)); for (stmtIter = stmtBegin; stmtIter != stmtEnd; stmtIter++) { const auto& stmt = *stmtIter; @@ -933,11 +940,12 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, // // The number of oplog entries written is returned. int logOplogEntriesForTransaction(OperationContext* opCtx, - const std::vector<repl::ReplOperation>& stmts, + std::vector<repl::ReplOperation>* stmts, const std::vector<OplogSlot>& oplogSlots, + size_t numberOfPreImagesToWrite, bool prepare) { - invariant(!stmts.empty()); - invariant(stmts.size() <= oplogSlots.size()); + invariant(!stmts->empty()); + invariant(stmts->size() <= oplogSlots.size()); // Storage transaction commit is the last place inside a transaction that can throw an // exception. In order to safely allow exceptions to be thrown at that point, this function must @@ -956,21 +964,42 @@ int logOplogEntriesForTransaction(OperationContext* opCtx, prevWriteOpTime.writeOpTime = txnParticipant.getLastWriteOpTime(); auto currOplogSlot = oplogSlots.begin(); + if (numberOfPreImagesToWrite > 0) { + for (auto& statement : *stmts) { + if (statement.getPreImage().isEmpty()) { + continue; + } + + auto slot = *currOplogSlot; + ++currOplogSlot; + + MutableOplogEntry preImageEntry; + preImageEntry.setOpType(repl::OpTypeEnum::kNoop); + preImageEntry.setObject(statement.getPreImage()); + preImageEntry.setNss(statement.getNss()); + preImageEntry.setUuid(statement.getUuid()); + preImageEntry.setOpTime(slot); + + auto opTime = logOperation(opCtx, &preImageEntry); + statement.setPreImageOpTime(opTime); + } + } + // At the beginning of each loop iteration below, 'stmtsIter' will always point to the // first statement of the sequence of remaining, unpacked transaction statements. If all // statements have been packed, it should point to stmts.end(), which is the loop's // termination condition. - auto stmtsIter = stmts.begin(); - while (stmtsIter != stmts.end()) { + auto stmtsIter = stmts->begin(); + while (stmtsIter != stmts->end()) { BSONObjBuilder applyOpsBuilder; auto nextStmt = - packTransactionStatementsForApplyOps(&applyOpsBuilder, stmtsIter, stmts.end()); + packTransactionStatementsForApplyOps(&applyOpsBuilder, stmtsIter, stmts->end()); // If we packed the last op, then the next oplog entry we log should be the implicit // commit or implicit prepare, i.e. we omit the 'partialTxn' field. - auto firstOp = stmtsIter == stmts.begin(); - auto lastOp = nextStmt == stmts.end(); + auto firstOp = stmtsIter == stmts->begin(); + auto lastOp = nextStmt == stmts->end(); auto implicitCommit = lastOp && !prepare; auto implicitPrepare = lastOp && prepare; @@ -987,7 +1016,7 @@ int logOplogEntriesForTransaction(OperationContext* opCtx, // The 'count' field gives the total number of individual operations in the // transaction, and is included on a non-initial implicit commit or prepare entry. if (lastOp && !firstOp) { - applyOpsBuilder.append("count", static_cast<long long>(stmts.size())); + applyOpsBuilder.append("count", static_cast<long long>(stmts->size())); } // For both prepared and unprepared transactions, update the transactions table on @@ -1067,8 +1096,9 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx, } // namespace -void OpObserverImpl::onUnpreparedTransactionCommit( - OperationContext* opCtx, const std::vector<repl::ReplOperation>& statements) { +void OpObserverImpl::onUnpreparedTransactionCommit(OperationContext* opCtx, + std::vector<repl::ReplOperation>* statements, + size_t numberOfPreImagesToWrite) { invariant(opCtx->getTxnNumber()); if (!opCtx->writesAreReplicated()) { @@ -1077,14 +1107,13 @@ void OpObserverImpl::onUnpreparedTransactionCommit( // It is possible that the transaction resulted in no changes. In that case, we should // not write an empty applyOps entry. - if (statements.empty()) + if (statements->empty()) return; repl::OpTime commitOpTime; // Reserve all the optimes in advance, so we only need to get the optime mutex once. We // reserve enough entries for all statements in the transaction. - auto oplogSlots = repl::getNextOpTimes(opCtx, statements.size()); - invariant(oplogSlots.size() == statements.size()); + auto oplogSlots = repl::getNextOpTimes(opCtx, statements->size() + numberOfPreImagesToWrite); if (MONGO_unlikely(hangAndFailUnpreparedCommitAfterReservingOplogSlot.shouldFail())) { hangAndFailUnpreparedCommitAfterReservingOplogSlot.pauseWhileSet(opCtx); @@ -1092,10 +1121,11 @@ void OpObserverImpl::onUnpreparedTransactionCommit( } // Log in-progress entries for the transaction along with the implicit commit. - int numOplogEntries = logOplogEntriesForTransaction(opCtx, statements, oplogSlots, false); + int numOplogEntries = logOplogEntriesForTransaction( + opCtx, statements, oplogSlots, numberOfPreImagesToWrite, false); commitOpTime = oplogSlots[numOplogEntries - 1]; invariant(!commitOpTime.isNull()); - shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, statements, commitOpTime); + shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, *statements, commitOpTime); } void OpObserverImpl::onPreparedTransactionCommit( @@ -1123,7 +1153,8 @@ void OpObserverImpl::onPreparedTransactionCommit( void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>& statements) { + std::vector<repl::ReplOperation>* statements, + size_t numberOfPreImagesToWrite) { invariant(!reservedSlots.empty()); const auto prepareOpTime = reservedSlots.back(); invariant(opCtx->getTxnNumber()); @@ -1136,7 +1167,7 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, { // We should have reserved enough slots. - invariant(reservedSlots.size() >= statements.size()); + invariant(reservedSlots.size() >= statements->size()); TransactionParticipant::SideTransactionBlock sideTxn(opCtx); writeConflictRetry( @@ -1148,14 +1179,18 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, WriteUnitOfWork wuow(opCtx); // It is possible that the transaction resulted in no changes, In that case, we // should not write any operations other than the prepare oplog entry. - if (!statements.empty()) { + if (!statements->empty()) { // We had reserved enough oplog slots for the worst case where each operation // produced one oplog entry. When operations are smaller and can be packed, we // will waste the extra slots. The implicit prepare oplog entry will still use // the last reserved slot, because the transaction participant has already used // that as the prepare time. - logOplogEntriesForTransaction( - opCtx, statements, reservedSlots, true /* prepare */); + logOplogEntriesForTransaction(opCtx, + statements, + reservedSlots, + numberOfPreImagesToWrite, + true /* prepare */); + } else { // Log an empty 'prepare' oplog entry. // We need to have at least one reserved slot. @@ -1180,7 +1215,7 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, }); } - shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, statements, prepareOpTime); + shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, *statements, prepareOpTime); } void OpObserverImpl::onTransactionAbort(OperationContext* opCtx, |