summaryrefslogtreecommitdiff
path: root/src/mongo/db/op_observer_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/op_observer_impl.cpp')
-rw-r--r--src/mongo/db/op_observer_impl.cpp147
1 files changed, 91 insertions, 56 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index af7bd9a41eb..26de717a90a 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -152,45 +152,45 @@ struct OpTimeBundle {
* Write oplog entry(ies) for the update operation.
*/
OpTimeBundle replLogUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) {
- BSONObj storeObj;
- if (args.updateArgs.storeDocOption == CollectionUpdateArgs::StoreDocOption::PreImage) {
- invariant(args.updateArgs.preImageDoc);
- storeObj = *args.updateArgs.preImageDoc;
- } else if (args.updateArgs.storeDocOption == CollectionUpdateArgs::StoreDocOption::PostImage) {
- storeObj = args.updateArgs.updatedDoc;
- }
-
MutableOplogEntry oplogEntry;
oplogEntry.setNss(args.nss);
oplogEntry.setUuid(args.uuid);
repl::OplogLink oplogLink;
- repl::appendRetryableWriteInfo(opCtx, &oplogEntry, &oplogLink, args.updateArgs.stmtId);
+ repl::appendOplogEntryChainInfo(opCtx, &oplogEntry, &oplogLink, args.updateArgs.stmtId);
OpTimeBundle opTimes;
-
- if (!storeObj.isEmpty() && opCtx->getTxnNumber()) {
+ const auto storePreImageForRetryableWrite =
+ (args.updateArgs.storeDocOption == CollectionUpdateArgs::StoreDocOption::PreImage &&
+ opCtx->getTxnNumber());
+ if (storePreImageForRetryableWrite || args.updateArgs.preImageRecordingEnabledForCollection) {
MutableOplogEntry noopEntry = oplogEntry;
+ invariant(args.updateArgs.preImageDoc);
noopEntry.setOpType(repl::OpTypeEnum::kNoop);
- noopEntry.setObject(std::move(storeObj));
- auto noteUpdateOpTime = logOperation(opCtx, &noopEntry);
-
- opTimes.prePostImageOpTime = noteUpdateOpTime;
-
- if (args.updateArgs.storeDocOption == CollectionUpdateArgs::StoreDocOption::PreImage) {
- oplogLink.preImageOpTime = noteUpdateOpTime;
- } else if (args.updateArgs.storeDocOption ==
- CollectionUpdateArgs::StoreDocOption::PostImage) {
- oplogLink.postImageOpTime = noteUpdateOpTime;
+ noopEntry.setObject(*args.updateArgs.preImageDoc);
+ oplogLink.preImageOpTime = logOperation(opCtx, &noopEntry);
+ if (storePreImageForRetryableWrite) {
+ opTimes.prePostImageOpTime = oplogLink.preImageOpTime;
}
}
+ // This case handles storing the post image for retryable findAndModify operations.
+ if (args.updateArgs.storeDocOption == CollectionUpdateArgs::StoreDocOption::PostImage &&
+ opCtx->getTxnNumber()) {
+ MutableOplogEntry noopEntry = oplogEntry;
+ noopEntry.setOpType(repl::OpTypeEnum::kNoop);
+ noopEntry.setObject(args.updateArgs.updatedDoc);
+ oplogLink.postImageOpTime = logOperation(opCtx, &noopEntry);
+ invariant(opTimes.prePostImageOpTime.isNull());
+ opTimes.prePostImageOpTime = oplogLink.postImageOpTime;
+ }
+
oplogEntry.setOpType(repl::OpTypeEnum::kUpdate);
oplogEntry.setObject(args.updateArgs.update);
oplogEntry.setObject2(args.updateArgs.criteria);
oplogEntry.setFromMigrateIfTrue(args.updateArgs.fromMigrate);
// oplogLink could have been changed to include pre/postImageOpTime by the previous no-op write.
- repl::appendRetryableWriteInfo(opCtx, &oplogEntry, &oplogLink, args.updateArgs.stmtId);
+ repl::appendOplogEntryChainInfo(opCtx, &oplogEntry, &oplogLink, args.updateArgs.stmtId);
opTimes.writeOpTime = logOperation(opCtx, &oplogEntry);
opTimes.wallClockTime = oplogEntry.getWallClockTime();
return opTimes;
@@ -210,14 +210,13 @@ OpTimeBundle replLogDelete(OperationContext* opCtx,
oplogEntry.setUuid(uuid);
repl::OplogLink oplogLink;
- repl::appendRetryableWriteInfo(opCtx, &oplogEntry, &oplogLink, stmtId);
+ repl::appendOplogEntryChainInfo(opCtx, &oplogEntry, &oplogLink, stmtId);
OpTimeBundle opTimes;
-
- if (deletedDoc && opCtx->getTxnNumber()) {
+ if (deletedDoc) {
MutableOplogEntry noopEntry = oplogEntry;
noopEntry.setOpType(repl::OpTypeEnum::kNoop);
- noopEntry.setObject(deletedDoc.get());
+ noopEntry.setObject(*deletedDoc);
auto noteOplog = logOperation(opCtx, &noopEntry);
opTimes.prePostImageOpTime = noteOplog;
oplogLink.preImageOpTime = noteOplog;
@@ -227,7 +226,7 @@ OpTimeBundle replLogDelete(OperationContext* opCtx,
oplogEntry.setObject(documentKeyDecoration(opCtx));
oplogEntry.setFromMigrateIfTrue(fromMigrate);
// oplogLink could have been changed to include preImageOpTime by the previous no-op write.
- repl::appendRetryableWriteInfo(opCtx, &oplogEntry, &oplogLink, stmtId);
+ repl::appendOplogEntryChainInfo(opCtx, &oplogEntry, &oplogLink, stmtId);
opTimes.writeOpTime = logOperation(opCtx, &oplogEntry);
opTimes.wallClockTime = oplogEntry.getWallClockTime();
return opTimes;
@@ -500,6 +499,12 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
if (inMultiDocumentTransaction) {
auto operation = MutableOplogEntry::makeUpdateOperation(
args.nss, args.uuid, args.updateArgs.update, args.updateArgs.criteria);
+
+ if (args.updateArgs.preImageRecordingEnabledForCollection) {
+ invariant(args.updateArgs.preImageDoc);
+ operation.setPreImage(args.updateArgs.preImageDoc->getOwned());
+ }
+
txnParticipant.addTransactionOperation(opCtx, operation);
} else {
opTime = replLogUpdate(opCtx, args);
@@ -562,8 +567,11 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
OpTimeBundle opTime;
if (inMultiDocumentTransaction) {
- auto operation = MutableOplogEntry::makeDeleteOperation(
- nss, uuid.get(), deletedDoc ? deletedDoc.get() : documentKey);
+ auto operation = MutableOplogEntry::makeDeleteOperation(nss, uuid.get(), documentKey);
+ if (deletedDoc) {
+ operation.setPreImage(deletedDoc->getOwned());
+ }
+
txnParticipant.addTransactionOperation(opCtx, operation);
} else {
opTime = replLogDelete(opCtx, nss, uuid, stmtId, fromMigrate, deletedDoc);
@@ -827,18 +835,17 @@ void OpObserverImpl::onEmptyCapped(OperationContext* opCtx,
}
namespace {
-
// Accepts an empty BSON builder and appends the given transaction statements to an 'applyOps' array
// field. Appends as many operations as possible until either the constructed object exceeds the
// 16MB limit or the maximum number of transaction statements allowed in one entry is reached.
//
// Returns an iterator to the first statement that wasn't packed into the applyOps object.
-std::vector<repl::ReplOperation>::const_iterator packTransactionStatementsForApplyOps(
+std::vector<repl::ReplOperation>::iterator packTransactionStatementsForApplyOps(
BSONObjBuilder* applyOpsBuilder,
- std::vector<repl::ReplOperation>::const_iterator stmtBegin,
- std::vector<repl::ReplOperation>::const_iterator stmtEnd) {
+ std::vector<repl::ReplOperation>::iterator stmtBegin,
+ std::vector<repl::ReplOperation>::iterator stmtEnd) {
- std::vector<repl::ReplOperation>::const_iterator stmtIter;
+ std::vector<repl::ReplOperation>::iterator stmtIter;
BSONArrayBuilder opsArray(applyOpsBuilder->subarrayStart("applyOps"_sd));
for (stmtIter = stmtBegin; stmtIter != stmtEnd; stmtIter++) {
const auto& stmt = *stmtIter;
@@ -933,11 +940,12 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx,
//
// The number of oplog entries written is returned.
int logOplogEntriesForTransaction(OperationContext* opCtx,
- const std::vector<repl::ReplOperation>& stmts,
+ std::vector<repl::ReplOperation>* stmts,
const std::vector<OplogSlot>& oplogSlots,
+ size_t numberOfPreImagesToWrite,
bool prepare) {
- invariant(!stmts.empty());
- invariant(stmts.size() <= oplogSlots.size());
+ invariant(!stmts->empty());
+ invariant(stmts->size() <= oplogSlots.size());
// Storage transaction commit is the last place inside a transaction that can throw an
// exception. In order to safely allow exceptions to be thrown at that point, this function must
@@ -956,21 +964,42 @@ int logOplogEntriesForTransaction(OperationContext* opCtx,
prevWriteOpTime.writeOpTime = txnParticipant.getLastWriteOpTime();
auto currOplogSlot = oplogSlots.begin();
+ if (numberOfPreImagesToWrite > 0) {
+ for (auto& statement : *stmts) {
+ if (statement.getPreImage().isEmpty()) {
+ continue;
+ }
+
+ auto slot = *currOplogSlot;
+ ++currOplogSlot;
+
+ MutableOplogEntry preImageEntry;
+ preImageEntry.setOpType(repl::OpTypeEnum::kNoop);
+ preImageEntry.setObject(statement.getPreImage());
+ preImageEntry.setNss(statement.getNss());
+ preImageEntry.setUuid(statement.getUuid());
+ preImageEntry.setOpTime(slot);
+
+ auto opTime = logOperation(opCtx, &preImageEntry);
+ statement.setPreImageOpTime(opTime);
+ }
+ }
+
// At the beginning of each loop iteration below, 'stmtsIter' will always point to the
// first statement of the sequence of remaining, unpacked transaction statements. If all
// statements have been packed, it should point to stmts->end(), which is the loop's
// termination condition.
- auto stmtsIter = stmts.begin();
- while (stmtsIter != stmts.end()) {
+ auto stmtsIter = stmts->begin();
+ while (stmtsIter != stmts->end()) {
BSONObjBuilder applyOpsBuilder;
auto nextStmt =
- packTransactionStatementsForApplyOps(&applyOpsBuilder, stmtsIter, stmts.end());
+ packTransactionStatementsForApplyOps(&applyOpsBuilder, stmtsIter, stmts->end());
// If we packed the last op, then the next oplog entry we log should be the implicit
// commit or implicit prepare, i.e. we omit the 'partialTxn' field.
- auto firstOp = stmtsIter == stmts.begin();
- auto lastOp = nextStmt == stmts.end();
+ auto firstOp = stmtsIter == stmts->begin();
+ auto lastOp = nextStmt == stmts->end();
auto implicitCommit = lastOp && !prepare;
auto implicitPrepare = lastOp && prepare;
@@ -987,7 +1016,7 @@ int logOplogEntriesForTransaction(OperationContext* opCtx,
// The 'count' field gives the total number of individual operations in the
// transaction, and is included on a non-initial implicit commit or prepare entry.
if (lastOp && !firstOp) {
- applyOpsBuilder.append("count", static_cast<long long>(stmts.size()));
+ applyOpsBuilder.append("count", static_cast<long long>(stmts->size()));
}
// For both prepared and unprepared transactions, update the transactions table on
@@ -1067,8 +1096,9 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx,
} // namespace
-void OpObserverImpl::onUnpreparedTransactionCommit(
- OperationContext* opCtx, const std::vector<repl::ReplOperation>& statements) {
+void OpObserverImpl::onUnpreparedTransactionCommit(OperationContext* opCtx,
+ std::vector<repl::ReplOperation>* statements,
+ size_t numberOfPreImagesToWrite) {
invariant(opCtx->getTxnNumber());
if (!opCtx->writesAreReplicated()) {
@@ -1077,14 +1107,13 @@ void OpObserverImpl::onUnpreparedTransactionCommit(
// It is possible that the transaction resulted in no changes. In that case, we should
// not write an empty applyOps entry.
- if (statements.empty())
+ if (statements->empty())
return;
repl::OpTime commitOpTime;
// Reserve all the optimes in advance, so we only need to get the optime mutex once. We
// reserve enough entries for all statements in the transaction, plus one for each pre-image.
- auto oplogSlots = repl::getNextOpTimes(opCtx, statements.size());
- invariant(oplogSlots.size() == statements.size());
+ auto oplogSlots = repl::getNextOpTimes(opCtx, statements->size() + numberOfPreImagesToWrite);
if (MONGO_unlikely(hangAndFailUnpreparedCommitAfterReservingOplogSlot.shouldFail())) {
hangAndFailUnpreparedCommitAfterReservingOplogSlot.pauseWhileSet(opCtx);
@@ -1092,10 +1121,11 @@ void OpObserverImpl::onUnpreparedTransactionCommit(
}
// Log in-progress entries for the transaction along with the implicit commit.
- int numOplogEntries = logOplogEntriesForTransaction(opCtx, statements, oplogSlots, false);
+ int numOplogEntries = logOplogEntriesForTransaction(
+ opCtx, statements, oplogSlots, numberOfPreImagesToWrite, false);
commitOpTime = oplogSlots[numOplogEntries - 1];
invariant(!commitOpTime.isNull());
- shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, statements, commitOpTime);
+ shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, *statements, commitOpTime);
}
void OpObserverImpl::onPreparedTransactionCommit(
@@ -1123,7 +1153,8 @@ void OpObserverImpl::onPreparedTransactionCommit(
void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
const std::vector<OplogSlot>& reservedSlots,
- std::vector<repl::ReplOperation>& statements) {
+ std::vector<repl::ReplOperation>* statements,
+ size_t numberOfPreImagesToWrite) {
invariant(!reservedSlots.empty());
const auto prepareOpTime = reservedSlots.back();
invariant(opCtx->getTxnNumber());
@@ -1136,7 +1167,7 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
{
// We should have reserved enough slots.
- invariant(reservedSlots.size() >= statements.size());
+ invariant(reservedSlots.size() >= statements->size());
TransactionParticipant::SideTransactionBlock sideTxn(opCtx);
writeConflictRetry(
@@ -1148,14 +1179,18 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
WriteUnitOfWork wuow(opCtx);
// It is possible that the transaction resulted in no changes. In that case, we
// should not write any operations other than the prepare oplog entry.
- if (!statements.empty()) {
+ if (!statements->empty()) {
// We had reserved enough oplog slots for the worst case where each operation
// produced one oplog entry. When operations are smaller and can be packed, we
// will waste the extra slots. The implicit prepare oplog entry will still use
// the last reserved slot, because the transaction participant has already used
// that as the prepare time.
- logOplogEntriesForTransaction(
- opCtx, statements, reservedSlots, true /* prepare */);
+ logOplogEntriesForTransaction(opCtx,
+ statements,
+ reservedSlots,
+ numberOfPreImagesToWrite,
+ true /* prepare */);
+
} else {
// Log an empty 'prepare' oplog entry.
// We need to have at least one reserved slot.
@@ -1180,7 +1215,7 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
});
}
- shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, statements, prepareOpTime);
+ shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, *statements, prepareOpTime);
}
void OpObserverImpl::onTransactionAbort(OperationContext* opCtx,