diff options
author | Jonathan Reams <jbreams@mongodb.com> | 2020-02-10 10:14:32 +0100 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-02-10 13:08:24 +0000 |
commit | 6c9c0b19d5980f065e1ff2ad624bb8d18bb88fe5 (patch) | |
tree | 678fca12abb4d786006bac635c430f806bb0ab13 /src/mongo | |
parent | 43c2b5b172cf6783319944c0d6931478db01eefa (diff) | |
download | mongo-6c9c0b19d5980f065e1ff2ad624bb8d18bb88fe5.tar.gz |
SERVER-45806 Record pre-images on updates and deletes when recordPreImage is enabled
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/auth/auth_op_observer.h | 6 | ||||
-rw-r--r-- | src/mongo/db/auth/auth_op_observer_test.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/free_mon/free_mon_op_observer.h | 6 | ||||
-rw-r--r-- | src/mongo/db/op_observer.h | 20 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 147 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.h | 6 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl_test.cpp | 267 | ||||
-rw-r--r-- | src/mongo/db/op_observer_noop.h | 8 | ||||
-rw-r--r-- | src/mongo/db/op_observer_registry.h | 13 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 22 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.h | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.h | 17 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.idl | 10 | ||||
-rw-r--r-- | src/mongo/db/s/config_server_op_observer.h | 8 | ||||
-rw-r--r-- | src/mongo/db/s/shard_server_op_observer.h | 8 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.h | 3 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant_retryable_writes_test.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant_test.cpp | 21 |
18 files changed, 421 insertions, 172 deletions
diff --git a/src/mongo/db/auth/auth_op_observer.h b/src/mongo/db/auth/auth_op_observer.h index 7287876df0b..47293e12e09 100644 --- a/src/mongo/db/auth/auth_op_observer.h +++ b/src/mongo/db/auth/auth_op_observer.h @@ -161,7 +161,8 @@ public: OptionalCollectionUUID uuid) final; void onUnpreparedTransactionCommit(OperationContext* opCtx, - const std::vector<repl::ReplOperation>& statements) final {} + std::vector<repl::ReplOperation>* statements, + size_t numberOfPreImagesToWrite) final {} void onPreparedTransactionCommit( OperationContext* opCtx, @@ -171,7 +172,8 @@ public: void onTransactionPrepare(OperationContext* opCtx, const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>& statements) final {} + std::vector<repl::ReplOperation>* statements, + size_t numberOfPreImagesToWrite) final {} void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} diff --git a/src/mongo/db/auth/auth_op_observer_test.cpp b/src/mongo/db/auth/auth_op_observer_test.cpp index e97db4656ef..da3c89d0541 100644 --- a/src/mongo/db/auth/auth_op_observer_test.cpp +++ b/src/mongo/db/auth/auth_op_observer_test.cpp @@ -137,9 +137,9 @@ TEST_F(AuthOpObserverTest, MultipleAboutToDeleteAndOnDelete) { AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X); WriteUnitOfWork wunit(opCtx.get()); opObserver.aboutToDelete(opCtx.get(), nss, BSON("_id" << 1)); - opObserver.onDelete(opCtx.get(), nss, uuid, {}, false, {}); + opObserver.onDelete(opCtx.get(), nss, uuid, {}, false, boost::none); opObserver.aboutToDelete(opCtx.get(), nss, BSON("_id" << 1)); - opObserver.onDelete(opCtx.get(), nss, uuid, {}, false, {}); + opObserver.onDelete(opCtx.get(), nss, uuid, {}, false, boost::none); } DEATH_TEST_F(AuthOpObserverTest, AboutToDeleteMustPreceedOnDelete, "invariant") { @@ -147,7 +147,7 @@ DEATH_TEST_F(AuthOpObserverTest, AboutToDeleteMustPreceedOnDelete, "invariant") auto opCtx = cc().makeOperationContext(); cc().swapLockState(std::make_unique<LockerNoop>()); NamespaceString nss = {"test", "coll"}; - opObserver.onDelete(opCtx.get(), nss, {}, {}, false, {}); + opObserver.onDelete(opCtx.get(), nss, {}, {}, false, boost::none); } DEATH_TEST_F(AuthOpObserverTest, EachOnDeleteRequiresAboutToDelete, "invariant") { @@ -156,8 +156,8 @@ DEATH_TEST_F(AuthOpObserverTest, EachOnDeleteRequiresAboutToDelete, "invariant") cc().swapLockState(std::make_unique<LockerNoop>()); NamespaceString nss = {"test", "coll"}; opObserver.aboutToDelete(opCtx.get(), nss, {}); - opObserver.onDelete(opCtx.get(), nss, {}, {}, false, {}); - opObserver.onDelete(opCtx.get(), nss, {}, {}, false, {}); + opObserver.onDelete(opCtx.get(), nss, {}, {}, false, boost::none); + opObserver.onDelete(opCtx.get(), nss, {}, {}, false, boost::none); } } // namespace diff --git a/src/mongo/db/free_mon/free_mon_op_observer.h b/src/mongo/db/free_mon/free_mon_op_observer.h index 3220c2b3985..21478a822e5 100644 --- a/src/mongo/db/free_mon/free_mon_op_observer.h +++ b/src/mongo/db/free_mon/free_mon_op_observer.h @@ -161,7 +161,8 @@ public: OptionalCollectionUUID uuid) final {} void onUnpreparedTransactionCommit(OperationContext* opCtx, - const std::vector<repl::ReplOperation>& statements) final {} + std::vector<repl::ReplOperation>* statements, + size_t numberOfPreImagesToWrite) final {} void onPreparedTransactionCommit( OperationContext* opCtx, @@ -171,7 +172,8 @@ public: void onTransactionPrepare(OperationContext* opCtx, const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>& statements) final {} + std::vector<repl::ReplOperation>* statements, + size_t numberOfPreImagesToWrite) final {} void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h index 2c98b853cd9..e75669a01cb 100644 --- a/src/mongo/db/op_observer.h +++ b/src/mongo/db/op_observer.h @@ -139,6 +139,10 @@ public: * "fromMigrate" indicates whether the delete was induced by a chunk migration, and * so should be ignored by the user as an internal maintenance operation and not a * real delete. + * + * "deletedDoc" is a reference to an optional copy of the pre-image of the doc before deletion. + * If deletedDoc != boost::none, then the opObserver should assume that the caller intended + * the pre-image to be stored/logged in addition to the documentKey. */ virtual void onDelete(OperationContext* opCtx, const NamespaceString& nss, @@ -291,9 +295,14 @@ public: * transaction is active. * * The 'statements' are the list of CRUD operations to be applied in this transaction. + * + * The 'numberOfPreImagesToWrite' is the number of CRUD operations that have a pre-image + * to write as a noop oplog entry. The op observer will reserve oplog slots for these + * preimages in addition to the statements. */ - virtual void onUnpreparedTransactionCommit( - OperationContext* opCtx, const std::vector<repl::ReplOperation>& statements) = 0; + virtual void onUnpreparedTransactionCommit(OperationContext* opCtx, + std::vector<repl::ReplOperation>* statements, + size_t numberOfPreImagesToWrite) = 0; /** * The onPreparedTransactionCommit method is called on the commit of a prepared transaction, * after the RecoveryUnit onCommit() is called. It must not be called when no transaction is @@ -318,10 +327,15 @@ public: * last reserved slot represents the prepareOpTime used for the prepare oplog entry. * * The 'statements' are the list of CRUD operations to be applied in this transaction. + * + * The 'numberOfPreImagesToWrite' is the number of CRUD operations that have a pre-image + * to write as a noop oplog entry. The op observer will reserve oplog slots for these + * preimages in addition to the statements. */ virtual void onTransactionPrepare(OperationContext* opCtx, const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>& statements) = 0; + std::vector<repl::ReplOperation>* statements, + size_t numberOfPreImagesToWrite) = 0; /** * The onTransactionAbort method is called when an atomic transaction aborts, before the diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index af7bd9a41eb..26de717a90a 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -152,45 +152,45 @@ struct OpTimeBundle { * Write oplog entry(ies) for the update operation. */ OpTimeBundle replLogUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) { - BSONObj storeObj; - if (args.updateArgs.storeDocOption == CollectionUpdateArgs::StoreDocOption::PreImage) { - invariant(args.updateArgs.preImageDoc); - storeObj = *args.updateArgs.preImageDoc; - } else if (args.updateArgs.storeDocOption == CollectionUpdateArgs::StoreDocOption::PostImage) { - storeObj = args.updateArgs.updatedDoc; - } - MutableOplogEntry oplogEntry; oplogEntry.setNss(args.nss); oplogEntry.setUuid(args.uuid); repl::OplogLink oplogLink; - repl::appendRetryableWriteInfo(opCtx, &oplogEntry, &oplogLink, args.updateArgs.stmtId); + repl::appendOplogEntryChainInfo(opCtx, &oplogEntry, &oplogLink, args.updateArgs.stmtId); OpTimeBundle opTimes; - - if (!storeObj.isEmpty() && opCtx->getTxnNumber()) { + const auto storePreImageForRetryableWrite = + (args.updateArgs.storeDocOption == CollectionUpdateArgs::StoreDocOption::PreImage && + opCtx->getTxnNumber()); + if (storePreImageForRetryableWrite || args.updateArgs.preImageRecordingEnabledForCollection) { MutableOplogEntry noopEntry = oplogEntry; + invariant(args.updateArgs.preImageDoc); noopEntry.setOpType(repl::OpTypeEnum::kNoop); - noopEntry.setObject(std::move(storeObj)); - auto noteUpdateOpTime = logOperation(opCtx, &noopEntry); - - opTimes.prePostImageOpTime = noteUpdateOpTime; - - if (args.updateArgs.storeDocOption == CollectionUpdateArgs::StoreDocOption::PreImage) { - oplogLink.preImageOpTime = noteUpdateOpTime; - } else if (args.updateArgs.storeDocOption == - CollectionUpdateArgs::StoreDocOption::PostImage) { - oplogLink.postImageOpTime = noteUpdateOpTime; + noopEntry.setObject(*args.updateArgs.preImageDoc); + oplogLink.preImageOpTime = logOperation(opCtx, &noopEntry); + if (storePreImageForRetryableWrite) { + opTimes.prePostImageOpTime = oplogLink.preImageOpTime; } } + // This case handles storing the post image for retryable findAndModify's. + if (args.updateArgs.storeDocOption == CollectionUpdateArgs::StoreDocOption::PostImage && + opCtx->getTxnNumber()) { + MutableOplogEntry noopEntry = oplogEntry; + noopEntry.setOpType(repl::OpTypeEnum::kNoop); + noopEntry.setObject(args.updateArgs.updatedDoc); + oplogLink.postImageOpTime = logOperation(opCtx, &noopEntry); + invariant(opTimes.prePostImageOpTime.isNull()); + opTimes.prePostImageOpTime = oplogLink.postImageOpTime; + } + oplogEntry.setOpType(repl::OpTypeEnum::kUpdate); oplogEntry.setObject(args.updateArgs.update); oplogEntry.setObject2(args.updateArgs.criteria); oplogEntry.setFromMigrateIfTrue(args.updateArgs.fromMigrate); // oplogLink could have been changed to include pre/postImageOpTime by the previous no-op write. - repl::appendRetryableWriteInfo(opCtx, &oplogEntry, &oplogLink, args.updateArgs.stmtId); + repl::appendOplogEntryChainInfo(opCtx, &oplogEntry, &oplogLink, args.updateArgs.stmtId); opTimes.writeOpTime = logOperation(opCtx, &oplogEntry); opTimes.wallClockTime = oplogEntry.getWallClockTime(); return opTimes; @@ -210,14 +210,13 @@ OpTimeBundle replLogDelete(OperationContext* opCtx, oplogEntry.setUuid(uuid); repl::OplogLink oplogLink; - repl::appendRetryableWriteInfo(opCtx, &oplogEntry, &oplogLink, stmtId); + repl::appendOplogEntryChainInfo(opCtx, &oplogEntry, &oplogLink, stmtId); OpTimeBundle opTimes; - - if (deletedDoc && opCtx->getTxnNumber()) { + if (deletedDoc) { MutableOplogEntry noopEntry = oplogEntry; noopEntry.setOpType(repl::OpTypeEnum::kNoop); - noopEntry.setObject(deletedDoc.get()); + noopEntry.setObject(*deletedDoc); auto noteOplog = logOperation(opCtx, &noopEntry); opTimes.prePostImageOpTime = noteOplog; oplogLink.preImageOpTime = noteOplog; @@ -227,7 +226,7 @@ OpTimeBundle replLogDelete(OperationContext* opCtx, oplogEntry.setObject(documentKeyDecoration(opCtx)); oplogEntry.setFromMigrateIfTrue(fromMigrate); // oplogLink could have been changed to include preImageOpTime by the previous no-op write. - repl::appendRetryableWriteInfo(opCtx, &oplogEntry, &oplogLink, stmtId); + repl::appendOplogEntryChainInfo(opCtx, &oplogEntry, &oplogLink, stmtId); opTimes.writeOpTime = logOperation(opCtx, &oplogEntry); opTimes.wallClockTime = oplogEntry.getWallClockTime(); return opTimes; @@ -500,6 +499,12 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg if (inMultiDocumentTransaction) { auto operation = MutableOplogEntry::makeUpdateOperation( args.nss, args.uuid, args.updateArgs.update, args.updateArgs.criteria); + + if (args.updateArgs.preImageRecordingEnabledForCollection) { + invariant(args.updateArgs.preImageDoc); + operation.setPreImage(args.updateArgs.preImageDoc->getOwned()); + } + txnParticipant.addTransactionOperation(opCtx, operation); } else { opTime = replLogUpdate(opCtx, args); @@ -562,8 +567,11 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, OpTimeBundle opTime; if (inMultiDocumentTransaction) { - auto operation = MutableOplogEntry::makeDeleteOperation( - nss, uuid.get(), deletedDoc ? deletedDoc.get() : documentKey); + auto operation = MutableOplogEntry::makeDeleteOperation(nss, uuid.get(), documentKey); + if (deletedDoc) { + operation.setPreImage(deletedDoc->getOwned()); + } + txnParticipant.addTransactionOperation(opCtx, operation); } else { opTime = replLogDelete(opCtx, nss, uuid, stmtId, fromMigrate, deletedDoc); @@ -827,18 +835,17 @@ void OpObserverImpl::onEmptyCapped(OperationContext* opCtx, } namespace { - // Accepts an empty BSON builder and appends the given transaction statements to an 'applyOps' array // field. Appends as many operations as possible until either the constructed object exceeds the // 16MB limit or the maximum number of transaction statements allowed in one entry. // // Returns an iterator to the first statement that wasn't packed into the applyOps object. -std::vector<repl::ReplOperation>::const_iterator packTransactionStatementsForApplyOps( +std::vector<repl::ReplOperation>::iterator packTransactionStatementsForApplyOps( BSONObjBuilder* applyOpsBuilder, - std::vector<repl::ReplOperation>::const_iterator stmtBegin, - std::vector<repl::ReplOperation>::const_iterator stmtEnd) { + std::vector<repl::ReplOperation>::iterator stmtBegin, + std::vector<repl::ReplOperation>::iterator stmtEnd) { - std::vector<repl::ReplOperation>::const_iterator stmtIter; + std::vector<repl::ReplOperation>::iterator stmtIter; BSONArrayBuilder opsArray(applyOpsBuilder->subarrayStart("applyOps"_sd)); for (stmtIter = stmtBegin; stmtIter != stmtEnd; stmtIter++) { const auto& stmt = *stmtIter; @@ -933,11 +940,12 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, // // The number of oplog entries written is returned. int logOplogEntriesForTransaction(OperationContext* opCtx, - const std::vector<repl::ReplOperation>& stmts, + std::vector<repl::ReplOperation>* stmts, const std::vector<OplogSlot>& oplogSlots, + size_t numberOfPreImagesToWrite, bool prepare) { - invariant(!stmts.empty()); - invariant(stmts.size() <= oplogSlots.size()); + invariant(!stmts->empty()); + invariant(stmts->size() <= oplogSlots.size()); // Storage transaction commit is the last place inside a transaction that can throw an // exception. In order to safely allow exceptions to be thrown at that point, this function must @@ -956,21 +964,42 @@ int logOplogEntriesForTransaction(OperationContext* opCtx, prevWriteOpTime.writeOpTime = txnParticipant.getLastWriteOpTime(); auto currOplogSlot = oplogSlots.begin(); + if (numberOfPreImagesToWrite > 0) { + for (auto& statement : *stmts) { + if (statement.getPreImage().isEmpty()) { + continue; + } + + auto slot = *currOplogSlot; + ++currOplogSlot; + + MutableOplogEntry preImageEntry; + preImageEntry.setOpType(repl::OpTypeEnum::kNoop); + preImageEntry.setObject(statement.getPreImage()); + preImageEntry.setNss(statement.getNss()); + preImageEntry.setUuid(statement.getUuid()); + preImageEntry.setOpTime(slot); + + auto opTime = logOperation(opCtx, &preImageEntry); + statement.setPreImageOpTime(opTime); + } + } + // At the beginning of each loop iteration below, 'stmtsIter' will always point to the // first statement of the sequence of remaining, unpacked transaction statements. If all // statements have been packed, it should point to stmts.end(), which is the loop's // termination condition. - auto stmtsIter = stmts.begin(); - while (stmtsIter != stmts.end()) { + auto stmtsIter = stmts->begin(); + while (stmtsIter != stmts->end()) { BSONObjBuilder applyOpsBuilder; auto nextStmt = - packTransactionStatementsForApplyOps(&applyOpsBuilder, stmtsIter, stmts.end()); + packTransactionStatementsForApplyOps(&applyOpsBuilder, stmtsIter, stmts->end()); // If we packed the last op, then the next oplog entry we log should be the implicit // commit or implicit prepare, i.e. we omit the 'partialTxn' field. - auto firstOp = stmtsIter == stmts.begin(); - auto lastOp = nextStmt == stmts.end(); + auto firstOp = stmtsIter == stmts->begin(); + auto lastOp = nextStmt == stmts->end(); auto implicitCommit = lastOp && !prepare; auto implicitPrepare = lastOp && prepare; @@ -987,7 +1016,7 @@ int logOplogEntriesForTransaction(OperationContext* opCtx, // The 'count' field gives the total number of individual operations in the // transaction, and is included on a non-initial implicit commit or prepare entry. if (lastOp && !firstOp) { - applyOpsBuilder.append("count", static_cast<long long>(stmts.size())); + applyOpsBuilder.append("count", static_cast<long long>(stmts->size())); } // For both prepared and unprepared transactions, update the transactions table on @@ -1067,8 +1096,9 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx, } // namespace -void OpObserverImpl::onUnpreparedTransactionCommit( - OperationContext* opCtx, const std::vector<repl::ReplOperation>& statements) { +void OpObserverImpl::onUnpreparedTransactionCommit(OperationContext* opCtx, + std::vector<repl::ReplOperation>* statements, + size_t numberOfPreImagesToWrite) { invariant(opCtx->getTxnNumber()); if (!opCtx->writesAreReplicated()) { @@ -1077,14 +1107,13 @@ void OpObserverImpl::onUnpreparedTransactionCommit( // It is possible that the transaction resulted in no changes. In that case, we should // not write an empty applyOps entry. - if (statements.empty()) + if (statements->empty()) return; repl::OpTime commitOpTime; // Reserve all the optimes in advance, so we only need to get the optime mutex once. We // reserve enough entries for all statements in the transaction. - auto oplogSlots = repl::getNextOpTimes(opCtx, statements.size()); - invariant(oplogSlots.size() == statements.size()); + auto oplogSlots = repl::getNextOpTimes(opCtx, statements->size() + numberOfPreImagesToWrite); if (MONGO_unlikely(hangAndFailUnpreparedCommitAfterReservingOplogSlot.shouldFail())) { hangAndFailUnpreparedCommitAfterReservingOplogSlot.pauseWhileSet(opCtx); @@ -1092,10 +1121,11 @@ void OpObserverImpl::onUnpreparedTransactionCommit( } // Log in-progress entries for the transaction along with the implicit commit. - int numOplogEntries = logOplogEntriesForTransaction(opCtx, statements, oplogSlots, false); + int numOplogEntries = logOplogEntriesForTransaction( + opCtx, statements, oplogSlots, numberOfPreImagesToWrite, false); commitOpTime = oplogSlots[numOplogEntries - 1]; invariant(!commitOpTime.isNull()); - shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, statements, commitOpTime); + shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, *statements, commitOpTime); } void OpObserverImpl::onPreparedTransactionCommit( @@ -1123,7 +1153,8 @@ void OpObserverImpl::onPreparedTransactionCommit( void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>& statements) { + std::vector<repl::ReplOperation>* statements, + size_t numberOfPreImagesToWrite) { invariant(!reservedSlots.empty()); const auto prepareOpTime = reservedSlots.back(); invariant(opCtx->getTxnNumber()); @@ -1136,7 +1167,7 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, { // We should have reserved enough slots. - invariant(reservedSlots.size() >= statements.size()); + invariant(reservedSlots.size() >= statements->size()); TransactionParticipant::SideTransactionBlock sideTxn(opCtx); writeConflictRetry( @@ -1148,14 +1179,18 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, WriteUnitOfWork wuow(opCtx); // It is possible that the transaction resulted in no changes, In that case, we // should not write any operations other than the prepare oplog entry. - if (!statements.empty()) { + if (!statements->empty()) { // We had reserved enough oplog slots for the worst case where each operation // produced one oplog entry. When operations are smaller and can be packed, we // will waste the extra slots. The implicit prepare oplog entry will still use // the last reserved slot, because the transaction participant has already used // that as the prepare time. - logOplogEntriesForTransaction( - opCtx, statements, reservedSlots, true /* prepare */); + logOplogEntriesForTransaction(opCtx, + statements, + reservedSlots, + numberOfPreImagesToWrite, + true /* prepare */); + } else { // Log an empty 'prepare' oplog entry. // We need to have at least one reserved slot. @@ -1180,7 +1215,7 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, }); } - shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, statements, prepareOpTime); + shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, *statements, prepareOpTime); } void OpObserverImpl::onTransactionAbort(OperationContext* opCtx, diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h index b2d09b6d58c..51d6ebd6c99 100644 --- a/src/mongo/db/op_observer_impl.h +++ b/src/mongo/db/op_observer_impl.h @@ -141,7 +141,8 @@ public: const NamespaceString& collectionName, OptionalCollectionUUID uuid); void onUnpreparedTransactionCommit(OperationContext* opCtx, - const std::vector<repl::ReplOperation>& statements) final; + std::vector<repl::ReplOperation>* statements, + size_t numberOfPreImagesToWrite) final; void onPreparedTransactionCommit( OperationContext* opCtx, OplogSlot commitOplogEntryOpTime, @@ -149,7 +150,8 @@ public: const std::vector<repl::ReplOperation>& statements) noexcept final; void onTransactionPrepare(OperationContext* opCtx, const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>& statements) final; + std::vector<repl::ReplOperation>* statements, + size_t numberOfPreImagesToWrite) final; void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final; void onReplicationRollback(OperationContext* opCtx, const RollbackObserverInfo& rbInfo) final; diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index 54e856c418d..06b18144353 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -532,9 +532,9 @@ TEST_F(OpObserverTest, MultipleAboutToDeleteAndOnDelete) { AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X); WriteUnitOfWork wunit(opCtx.get()); opObserver.aboutToDelete(opCtx.get(), nss, BSON("_id" << 1)); - opObserver.onDelete(opCtx.get(), nss, uuid, kUninitializedStmtId, false, {}); + opObserver.onDelete(opCtx.get(), nss, uuid, kUninitializedStmtId, false, boost::none); opObserver.aboutToDelete(opCtx.get(), nss, BSON("_id" << 1)); - opObserver.onDelete(opCtx.get(), nss, uuid, kUninitializedStmtId, false, {}); + opObserver.onDelete(opCtx.get(), nss, uuid, kUninitializedStmtId, false, boost::none); } DEATH_TEST_F(OpObserverTest, AboutToDeleteMustPreceedOnDelete, "invariant") { @@ -542,7 +542,7 @@ DEATH_TEST_F(OpObserverTest, AboutToDeleteMustPreceedOnDelete, "invariant") { auto opCtx = cc().makeOperationContext(); cc().swapLockState(std::make_unique<LockerNoop>()); NamespaceString nss = {"test", "coll"}; - opObserver.onDelete(opCtx.get(), nss, {}, kUninitializedStmtId, false, {}); + opObserver.onDelete(opCtx.get(), nss, {}, kUninitializedStmtId, false, boost::none); } DEATH_TEST_F(OpObserverTest, EachOnDeleteRequiresAboutToDelete, "invariant") { @@ -551,8 +551,8 @@ DEATH_TEST_F(OpObserverTest, EachOnDeleteRequiresAboutToDelete, "invariant") { cc().swapLockState(std::make_unique<LockerNoop>()); NamespaceString nss = {"test", "coll"}; opObserver.aboutToDelete(opCtx.get(), nss, {}); - opObserver.onDelete(opCtx.get(), nss, {}, kUninitializedStmtId, false, {}); - opObserver.onDelete(opCtx.get(), nss, {}, kUninitializedStmtId, false, {}); + opObserver.onDelete(opCtx.get(), nss, {}, kUninitializedStmtId, false, boost::none); + opObserver.onDelete(opCtx.get(), nss, {}, kUninitializedStmtId, false, boost::none); } DEATH_TEST_F(OpObserverTest, @@ -742,8 +742,8 @@ TEST_F(OpObserverTransactionTest, TransactionalPrepareTest) { auto prepareOpTime = reservedSlots.back(); txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); - opObserver().onTransactionPrepare( - opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); + opObserver().onTransactionPrepare(opCtx(), reservedSlots, &txnOps, 0); } auto oplogEntryObj = getSingleOplogEntry(opCtx()); @@ -799,8 +799,8 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedCommitTest) { const auto prepareSlot = repl::getNextOpTime(opCtx()); txnParticipant.transitionToPreparedforTest(opCtx(), prepareSlot); prepareTimestamp = prepareSlot.getTimestamp(); - opObserver().onTransactionPrepare( - opCtx(), {prepareSlot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); + opObserver().onTransactionPrepare(opCtx(), {prepareSlot}, &txnOps, 0); commitSlot = repl::getNextOpTime(opCtx()); } @@ -866,8 +866,8 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedAbortTest) { const auto prepareSlot = repl::getNextOpTime(opCtx()); txnParticipant.transitionToPreparedforTest(opCtx(), prepareSlot); - opObserver().onTransactionPrepare( - opCtx(), {prepareSlot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); + opObserver().onTransactionPrepare(opCtx(), {prepareSlot}, &txnOps, 0); abortSlot = repl::getNextOpTime(opCtx()); } @@ -946,10 +946,8 @@ TEST_F(OpObserverTransactionTest, prepareOpTime = repl::getNextOpTime(opCtx()); txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); - opObserver().onTransactionPrepare( - opCtx(), - {prepareOpTime}, - txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); + opObserver().onTransactionPrepare(opCtx(), {prepareOpTime}, &txnOps, 0); } auto oplogEntryObj = getSingleOplogEntry(opCtx()); @@ -980,8 +978,8 @@ TEST_F(OpObserverTransactionTest, PreparingTransactionWritesToTransactionTable) OplogSlot slot = repl::getNextOpTime(opCtx()); txnParticipant.transitionToPreparedforTest(opCtx(), slot); prepareOpTime = slot; - opObserver().onTransactionPrepare( - opCtx(), {slot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); + opObserver().onTransactionPrepare(opCtx(), {slot}, &txnOps, 0); opCtx()->recoveryUnit()->setPrepareTimestamp(slot.getTimestamp()); } @@ -1014,8 +1012,8 @@ TEST_F(OpObserverTransactionTest, AbortingPreparedTransactionWritesToTransaction WriteUnitOfWork wuow(opCtx()); OplogSlot slot = repl::getNextOpTime(opCtx()); opCtx()->recoveryUnit()->setPrepareTimestamp(slot.getTimestamp()); - opObserver().onTransactionPrepare( - opCtx(), {slot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); + opObserver().onTransactionPrepare(opCtx(), {slot}, &txnOps, 0); txnParticipant.transitionToPreparedforTest(opCtx(), slot); abortSlot = repl::getNextOpTime(opCtx()); } @@ -1052,8 +1050,8 @@ TEST_F(OpObserverTransactionTest, CommittingUnpreparedNonEmptyTransactionWritesT opObserver().onInserts(opCtx(), nss, uuid, insert.begin(), insert.end(), false); } - opObserver().onUnpreparedTransactionCommit( - opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); + opObserver().onUnpreparedTransactionCommit(opCtx(), &txnOps, 0); opCtx()->getWriteUnitOfWork()->commit(); assertTxnRecord(txnNum(), {}, DurableTxnStateEnum::kCommitted); @@ -1064,8 +1062,8 @@ TEST_F(OpObserverTransactionTest, auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.unstashTransactionResources(opCtx(), "prepareTransaction"); - opObserver().onUnpreparedTransactionCommit( - opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); + opObserver().onUnpreparedTransactionCommit(opCtx(), &txnOps, 0); txnParticipant.stashTransactionResources(opCtx()); @@ -1088,8 +1086,8 @@ TEST_F(OpObserverTransactionTest, CommittingPreparedTransactionWritesToTransacti OplogSlot slot = repl::getNextOpTime(opCtx()); prepareOpTime = slot; opCtx()->recoveryUnit()->setPrepareTimestamp(slot.getTimestamp()); - opObserver().onTransactionPrepare( - opCtx(), {slot}, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); + opObserver().onTransactionPrepare(opCtx(), {slot}, &txnOps, 0); txnParticipant.transitionToPreparedforTest(opCtx(), slot); } @@ -1139,8 +1137,8 @@ TEST_F(OpObserverTransactionTest, TransactionalInsertTest) { AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX); opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false); opObserver().onInserts(opCtx(), nss2, uuid2, inserts2.begin(), inserts2.end(), false); - opObserver().onUnpreparedTransactionCommit( - opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); + opObserver().onUnpreparedTransactionCommit(opCtx(), &txnOps, 0); auto oplogEntryObj = getSingleOplogEntry(opCtx()); checkCommonFields(oplogEntryObj); OplogEntry oplogEntry = assertGet(OplogEntry::parse(oplogEntryObj)); @@ -1202,8 +1200,8 @@ TEST_F(OpObserverTransactionTest, TransactionalUpdateTest) { AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX); opObserver().onUpdate(opCtx(), update1); opObserver().onUpdate(opCtx(), update2); - opObserver().onUnpreparedTransactionCommit( - opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); + opObserver().onUnpreparedTransactionCommit(opCtx(), &txnOps, 0); auto oplogEntry = getSingleOplogEntry(opCtx()); checkCommonFields(oplogEntry); auto o = oplogEntry.getObjectField("o"); @@ -1247,8 +1245,8 @@ TEST_F(OpObserverTransactionTest, TransactionalDeleteTest) { BSON("_id" << 1 << "data" << "y")); opObserver().onDelete(opCtx(), nss2, uuid2, 0, false, boost::none); - opObserver().onUnpreparedTransactionCommit( - opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); + opObserver().onUnpreparedTransactionCommit(opCtx(), &txnOps, 0); auto oplogEntry = getSingleOplogEntry(opCtx()); checkCommonFields(oplogEntry); auto o = oplogEntry.getObjectField("o"); @@ -1292,8 +1290,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionSingleStatementTest) { WriteUnitOfWork wuow(opCtx()); AutoGetCollection autoColl1(opCtx(), nss, MODE_IX); opObserver().onInserts(opCtx(), nss, uuid, inserts.begin(), inserts.end(), false); - opObserver().onUnpreparedTransactionCommit( - opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); + opObserver().onUnpreparedTransactionCommit(opCtx(), &txnOps, 0); auto oplogEntryObj = getNOplogEntries(opCtx(), 1)[0]; checkSessionAndTransactionFields(oplogEntryObj); auto oplogEntry = assertGet(OplogEntry::parse(oplogEntryObj)); @@ -1327,8 +1325,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertTest) { AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX); opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false); opObserver().onInserts(opCtx(), nss2, uuid2, inserts2.begin(), inserts2.end(), false); - opObserver().onUnpreparedTransactionCommit( - opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); + opObserver().onUnpreparedTransactionCommit(opCtx(), &txnOps, 0); auto oplogEntryObjs = getNOplogEntries(opCtx(), 4); std::vector<OplogEntry> oplogEntries; mongo::repl::OpTime expectedPrevWriteOpTime; @@ -1404,8 +1402,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdateTest) { AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX); opObserver().onUpdate(opCtx(), update1); opObserver().onUpdate(opCtx(), update2); - opObserver().onUnpreparedTransactionCommit( - opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); + opObserver().onUnpreparedTransactionCommit(opCtx(), &txnOps, 0); auto oplogEntryObjs = getNOplogEntries(opCtx(), 2); std::vector<OplogEntry> oplogEntries; mongo::repl::OpTime expectedPrevWriteOpTime; @@ -1443,6 +1441,161 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdateTest) { ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject()); } +TEST_F(OpObserverMultiEntryTransactionTest, TransactionPreImageTest) { + const NamespaceString nss1("testDB", "testColl"); + auto uuid1 = CollectionUUID::gen(); + auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant.unstashTransactionResources(opCtx(), "txntest"); + + CollectionUpdateArgs updateArgs1; + const auto updateSpec = BSON("$set" << BSON("data" + << "x")); + const auto updatePreImage = BSON("_id" << 0 << "data" + << "y"); + const auto updatePostImage = BSON("_id" << 0 << "data" + << "x"); + const auto updateFilter = BSON("_id" << 0); + + updateArgs1.stmtId = 0; + updateArgs1.updatedDoc = updatePostImage; + updateArgs1.update = updateSpec; + updateArgs1.preImageDoc = updatePreImage; + updateArgs1.preImageRecordingEnabledForCollection = true; + updateArgs1.criteria = updateFilter; + OplogUpdateEntryArgs update1(std::move(updateArgs1), nss1, uuid1); + + WriteUnitOfWork wuow(opCtx()); + AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX); + opObserver().onUpdate(opCtx(), update1); + + const auto deletedDoc = BSON("_id" << 1 << "data" + << "z"); + opObserver().aboutToDelete(opCtx(), nss1, deletedDoc); + opObserver().onDelete(opCtx(), nss1, uuid1, 0, false, deletedDoc); + + auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); + opObserver().onUnpreparedTransactionCommit(opCtx(), &txnOps, 2); + + auto oplogEntryObjs = getNOplogEntries(opCtx(), 4); + std::vector<OplogEntry> oplogEntries; + mongo::repl::OpTime expectedPrevWriteOpTime; + for (const auto& oplogEntryObj : oplogEntryObjs) { + oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj))); + const auto& oplogEntry = oplogEntries.back(); + if (oplogEntry.getOpType() == repl::OpTypeEnum::kNoop) { + continue; + } + checkSessionAndTransactionFields(oplogEntryObj); + ASSERT(!oplogEntry.shouldPrepare()); + ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction()); + ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction()); + ASSERT_LT(expectedPrevWriteOpTime.getTimestamp(), oplogEntry.getTimestamp()); + expectedPrevWriteOpTime = repl::OpTime{oplogEntry.getTimestamp(), *oplogEntry.getTerm()}; + } + + ASSERT(oplogEntries[0].getOpType() == repl::OpTypeEnum::kNoop); + ASSERT_BSONOBJ_EQ(updatePreImage, oplogEntries[0].getObject()); + ASSERT(oplogEntries[1].getOpType() == repl::OpTypeEnum::kNoop); + ASSERT_BSONOBJ_EQ(deletedDoc, oplogEntries[1].getObject()); + ASSERT_BSONOBJ_EQ(BSON("applyOps" + << BSON_ARRAY(BSON("op" + << "u" + << "ns" << nss1.toString() << "ui" << uuid1 << "o" + << updateSpec << "o2" << BSON("_id" << 0) + << "preImageOpTime" << oplogEntries[0].getOpTime())) + << "partialTxn" << true), + oplogEntries[2].getObject()); + ASSERT_BSONOBJ_EQ(BSON("applyOps" + << BSON_ARRAY(BSON("op" + << "d" + << "ns" << nss1.toString() << "ui" << uuid1 << "o" + << BSON("_id" << 1) << "preImageOpTime" + << oplogEntries[1].getOpTime())) + << "count" << 2), + oplogEntries[3].getObject()); +} + +TEST_F(OpObserverMultiEntryTransactionTest, PreparedTransactionPreImageTest) { + const NamespaceString nss1("testDB", "testColl"); + auto uuid1 = CollectionUUID::gen(); + auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant.unstashTransactionResources(opCtx(), "txntest"); + + CollectionUpdateArgs updateArgs1; + const auto updateSpec = BSON("$set" << BSON("data" + << "x")); + const auto updatePreImage = BSON("_id" << 0 << "data" + << "y"); + const auto updatePostImage = BSON("_id" << 0 << "data" + << "x"); + const auto updateFilter = BSON("_id" << 0); + + updateArgs1.stmtId = 0; + updateArgs1.updatedDoc = updatePostImage; + updateArgs1.update = updateSpec; + updateArgs1.preImageDoc = updatePreImage; + updateArgs1.preImageRecordingEnabledForCollection = true; + updateArgs1.criteria = updateFilter; + OplogUpdateEntryArgs update1(std::move(updateArgs1), nss1, uuid1); + + WriteUnitOfWork wuow(opCtx()); + AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX); + opObserver().onUpdate(opCtx(), update1); + + const auto deletedDoc = BSON("_id" << 1 << "data" + << "z"); + opObserver().aboutToDelete(opCtx(), nss1, deletedDoc); + opObserver().onDelete(opCtx(), nss1, uuid1, 0, false, deletedDoc); + + repl::OpTime prepareOpTime; + { + Lock::GlobalLock lk(opCtx(), MODE_IX); + auto reservedSlots = repl::getNextOpTimes(opCtx(), 4); + prepareOpTime = reservedSlots.back(); + txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); + opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); + auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); + opObserver().onTransactionPrepare(opCtx(), reservedSlots, &txnOps, 2); + } + + auto oplogEntryObjs = getNOplogEntries(opCtx(), 4); + std::vector<OplogEntry> oplogEntries; + mongo::repl::OpTime expectedPrevWriteOpTime; + for (const auto& oplogEntryObj : oplogEntryObjs) { + oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj))); + const auto& oplogEntry = oplogEntries.back(); + if (oplogEntry.getOpType() == repl::OpTypeEnum::kNoop) { + continue; + } + checkSessionAndTransactionFields(oplogEntryObj); + ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction()); + ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction()); + ASSERT_LT(expectedPrevWriteOpTime.getTimestamp(), oplogEntry.getTimestamp()); + expectedPrevWriteOpTime = repl::OpTime{oplogEntry.getTimestamp(), *oplogEntry.getTerm()}; + } + + ASSERT(oplogEntries[0].getOpType() == repl::OpTypeEnum::kNoop); + ASSERT_BSONOBJ_EQ(updatePreImage, oplogEntries[0].getObject()); + ASSERT(oplogEntries[1].getOpType() == repl::OpTypeEnum::kNoop); + ASSERT_BSONOBJ_EQ(deletedDoc, oplogEntries[1].getObject()); + ASSERT_BSONOBJ_EQ(BSON("applyOps" + << BSON_ARRAY(BSON("op" + << "u" + << "ns" << nss1.toString() << "ui" << uuid1 << "o" + << updateSpec << "o2" << BSON("_id" << 0) + << "preImageOpTime" << oplogEntries[0].getOpTime())) + << "partialTxn" << true), + oplogEntries[2].getObject()); + ASSERT_BSONOBJ_EQ(BSON("applyOps" + << BSON_ARRAY(BSON("op" + << "d" + << "ns" << nss1.toString() << "ui" << uuid1 << "o" + << BSON("_id" << 1) << "preImageOpTime" + << oplogEntries[1].getOpTime())) + << "prepare" << true << "count" << 2), + oplogEntries[3].getObject()); +} + TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeleteTest) { const NamespaceString nss1("testDB", "testColl"); const NamespaceString nss2("testDB2", "testColl2"); @@ -1465,8 +1618,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeleteTest) { BSON("_id" << 1 << "data" << "y")); opObserver().onDelete(opCtx(), nss2, uuid2, 0, false, boost::none); - opObserver().onUnpreparedTransactionCommit( - opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); + opObserver().onUnpreparedTransactionCommit(opCtx(), &txnOps, 0); auto oplogEntryObjs = getNOplogEntries(opCtx(), 2); std::vector<OplogEntry> oplogEntries; mongo::repl::OpTime expectedPrevWriteOpTime; @@ -1526,8 +1679,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertPrepareTest) { prepareOpTime = reservedSlots.back(); txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); - opObserver().onTransactionPrepare( - opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); + opObserver().onTransactionPrepare(opCtx(), reservedSlots, &txnOps, 0); } auto oplogEntryObjs = getNOplogEntries(opCtx(), 4); std::vector<OplogEntry> oplogEntries; @@ -1614,8 +1767,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdatePrepareTest) { prepareOpTime = reservedSlots.back(); txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); - opObserver().onTransactionPrepare( - opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); + opObserver().onTransactionPrepare(opCtx(), reservedSlots, &txnOps, 0); auto oplogEntryObjs = getNOplogEntries(opCtx(), 2); std::vector<OplogEntry> oplogEntries; @@ -1688,8 +1841,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeletePrepareTest) { txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); prepareOpTime = reservedSlots.back(); opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); - opObserver().onTransactionPrepare( - opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); + opObserver().onTransactionPrepare(opCtx(), reservedSlots, &txnOps, 0); } auto oplogEntryObjs = getNOplogEntries(opCtx(), 2); @@ -1752,8 +1905,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedTest) { txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); - opObserver().onTransactionPrepare( - opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); + opObserver().onTransactionPrepare(opCtx(), reservedSlots, &txnOps, 0); } auto oplogEntryObjs = getNOplogEntries(opCtx(), 2); @@ -1835,8 +1988,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, AbortPreparedTest) { txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); - opObserver().onTransactionPrepare( - opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); + opObserver().onTransactionPrepare(opCtx(), reservedSlots, &txnOps, 0); } auto oplogEntryObjs = getNOplogEntries(opCtx(), 1); @@ -1906,8 +2059,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, UnpreparedTransactionPackingTest) { AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX); opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false); opObserver().onInserts(opCtx(), nss2, uuid2, inserts2.begin(), inserts2.end(), false); - opObserver().onUnpreparedTransactionCommit( - opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); + opObserver().onUnpreparedTransactionCommit(opCtx(), &txnOps, 0); auto oplogEntryObjs = getNOplogEntries(opCtx(), 1); std::vector<OplogEntry> oplogEntries; mongo::repl::OpTime expectedPrevWriteOpTime; @@ -1966,8 +2119,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, PreparedTransactionPackingTest) { prepareOpTime = reservedSlots.back(); txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); - opObserver().onTransactionPrepare( - opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); + opObserver().onTransactionPrepare(opCtx(), reservedSlots, &txnOps, 0); auto oplogEntryObj = getSingleOplogEntry(opCtx()); std::vector<OplogEntry> oplogEntries; mongo::repl::OpTime expectedPrevWriteOpTime; @@ -2023,8 +2176,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedPackingTest) { txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); - opObserver().onTransactionPrepare( - opCtx(), reservedSlots, txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); + opObserver().onTransactionPrepare(opCtx(), reservedSlots, &txnOps, 0); auto oplogEntryObjs = getNOplogEntries(opCtx(), 1); @@ -2123,8 +2276,8 @@ TEST_F(OpObserverLargeTransactionTest, LargeTransactionCreatesMultipleOplogEntri << BSONBinData(halfTransactionData.get(), kHalfTransactionSize, BinDataGeneral))); txnParticipant.addTransactionOperation(opCtx(), operation1); txnParticipant.addTransactionOperation(opCtx(), operation2); - opObserver().onUnpreparedTransactionCommit( - opCtx(), txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); + opObserver().onUnpreparedTransactionCommit(opCtx(), &txnOps, 0); auto oplogEntryObjs = getNOplogEntries(opCtx(), 2); std::vector<OplogEntry> oplogEntries; mongo::repl::OpTime expectedPrevWriteOpTime; diff --git a/src/mongo/db/op_observer_noop.h b/src/mongo/db/op_observer_noop.h index e22a3d9097b..5c5816e7c0b 100644 --- a/src/mongo/db/op_observer_noop.h +++ b/src/mongo/db/op_observer_noop.h @@ -140,8 +140,9 @@ public: void onEmptyCapped(OperationContext* opCtx, const NamespaceString& collectionName, OptionalCollectionUUID uuid) override {} - void onUnpreparedTransactionCommit( - OperationContext* opCtx, const std::vector<repl::ReplOperation>& statements) override{}; + void onUnpreparedTransactionCommit(OperationContext* opCtx, + std::vector<repl::ReplOperation>* statements, + size_t numberOfPreImagesToWrite) override {} void onPreparedTransactionCommit( OperationContext* opCtx, OplogSlot commitOplogEntryOpTime, @@ -149,7 +150,8 @@ public: const std::vector<repl::ReplOperation>& statements) noexcept override{}; void onTransactionPrepare(OperationContext* opCtx, const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>& statements) override{}; + std::vector<repl::ReplOperation>* statements, + size_t numberOfPreImagesToWrite) override{}; void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override{}; void onReplicationRollback(OperationContext* opCtx, diff --git a/src/mongo/db/op_observer_registry.h b/src/mongo/db/op_observer_registry.h index 8da173e1c91..2470baa74cc 100644 --- a/src/mongo/db/op_observer_registry.h +++ b/src/mongo/db/op_observer_registry.h @@ -268,11 +268,12 @@ public: o->onEmptyCapped(opCtx, collectionName, uuid); } - void onUnpreparedTransactionCommit( - OperationContext* opCtx, const std::vector<repl::ReplOperation>& statements) override { + void onUnpreparedTransactionCommit(OperationContext* opCtx, + std::vector<repl::ReplOperation>* statements, + size_t numberOfPreImagesToWrite) override { ReservedTimes times{opCtx}; for (auto& o : _observers) - o->onUnpreparedTransactionCommit(opCtx, statements); + o->onUnpreparedTransactionCommit(opCtx, statements, numberOfPreImagesToWrite); } void onPreparedTransactionCommit( @@ -288,10 +289,12 @@ public: void onTransactionPrepare(OperationContext* opCtx, const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>& statements) override { + std::vector<repl::ReplOperation>* statements, + size_t numberOfPreImagesToWrite) override { ReservedTimes times{opCtx}; for (auto& observer : _observers) { - observer->onTransactionPrepare(opCtx, reservedSlots, statements); + observer->onTransactionPrepare( + opCtx, reservedSlots, statements, numberOfPreImagesToWrite); } } diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 9818c9b050f..bbd877e2a00 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -402,7 +402,7 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx, OplogLink oplogLink; if (i > 0) oplogLink.prevOpTime = opTimes[i - 1]; - appendRetryableWriteInfo(opCtx, &oplogEntry, &oplogLink, begin[i].stmtId); + appendOplogEntryChainInfo(opCtx, &oplogEntry, &oplogLink, begin[i].stmtId); opTimes[i] = insertStatementOplogSlot; timestamps[i] = insertStatementOplogSlot.getTimestamp(); @@ -430,13 +430,20 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx, return opTimes; } -void appendRetryableWriteInfo(OperationContext* opCtx, - MutableOplogEntry* oplogEntry, - OplogLink* oplogLink, - StmtId stmtId) { +void appendOplogEntryChainInfo(OperationContext* opCtx, + MutableOplogEntry* oplogEntry, + OplogLink* oplogLink, + StmtId stmtId) { + // We sometimes have a pre-image no-op entry even for normal non-retryable writes + // if recordPreImages is enabled on the collection. + if (!oplogLink->preImageOpTime.isNull()) { + oplogEntry->setPreImageOpTime(oplogLink->preImageOpTime); + } + // Not a retryable write. - if (stmtId == kUninitializedStmtId) + if (stmtId == kUninitializedStmtId) { return; + } const auto txnParticipant = TransactionParticipant::get(opCtx); invariant(txnParticipant); @@ -447,9 +454,6 @@ void appendRetryableWriteInfo(OperationContext* opCtx, oplogLink->prevOpTime = txnParticipant.getLastWriteOpTime(); } oplogEntry->setPrevWriteOpTimeInTransaction(oplogLink->prevOpTime); - if (!oplogLink->preImageOpTime.isNull()) { - oplogEntry->setPreImageOpTime(oplogLink->preImageOpTime); - } if (!oplogLink->postImageOpTime.isNull()) { oplogEntry->setPostImageOpTime(oplogLink->postImageOpTime); } diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index d23f689e2b4..eaac4f4eadc 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -91,10 +91,10 @@ struct OplogLink { * Similarly, the "postImageOpTime" field will only be set if the given oplogLink.postImageOpTime is * not null. */ -void appendRetryableWriteInfo(OperationContext* opCtx, - MutableOplogEntry* oplogEntry, - OplogLink* oplogLink, - StmtId stmtId); +void appendOplogEntryChainInfo(OperationContext* opCtx, + MutableOplogEntry* oplogEntry, + OplogLink* oplogLink, + StmtId stmtId); /** * Create a new capped collection for the oplog if it doesn't yet exist. diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h index 263da41308b..d17c0a4c512 100644 --- a/src/mongo/db/repl/oplog_entry.h +++ b/src/mongo/db/repl/oplog_entry.h @@ -67,8 +67,17 @@ public: _preImageDocumentKey = std::move(value); } + const BSONObj& getPreImage() const { + return _fullPreImage; + } + + void setPreImage(BSONObj value) { + _fullPreImage = std::move(value); + } + private: BSONObj _preImageDocumentKey; + BSONObj _fullPreImage; }; /** @@ -139,6 +148,14 @@ public: getDurableReplOperation().setUpsert(std::move(value)); } + void setPreImageOpTime(boost::optional<OpTime> value) { + getDurableReplOperation().setPreImageOpTime(std::move(value)); + } + + const boost::optional<OpTime>& getPreImageOpTime() const { + return getDurableReplOperation().getPreImageOpTime(); + } + void setTimestamp(Timestamp value) & { getOpTimeAndWallTimeBase().setTimestamp(std::move(value)); } diff --git a/src/mongo/db/repl/oplog_entry.idl b/src/mongo/db/repl/oplog_entry.idl index 30d269f0120..16311f2c71a 100644 --- a/src/mongo/db/repl/oplog_entry.idl +++ b/src/mongo/db/repl/oplog_entry.idl @@ -84,6 +84,11 @@ structs: entry of an applyOps command that was executed with alwaysUpsert true (the default). Originally added for backwards compatibility with updates from 3.6 and before." + preImageOpTime: + type: optime + optional: true + description: "The optime of another oplog entry that contains the document + before an update/remove was applied." OplogEntryBase: description: A document in which the server stores an oplog entry. @@ -122,11 +127,6 @@ structs: type: optime optional: true # Only for writes that are part of a transaction description: "The opTime of the previous write with the same transaction." - preImageOpTime: - type: optime - optional: true - description: "The optime of another oplog entry that contains the document - before an update/remove was applied." postImageOpTime: type: optime optional: true diff --git a/src/mongo/db/s/config_server_op_observer.h b/src/mongo/db/s/config_server_op_observer.h index 38f31c6caef..d274dfef21a 100644 --- a/src/mongo/db/s/config_server_op_observer.h +++ b/src/mongo/db/s/config_server_op_observer.h @@ -161,8 +161,9 @@ public: const NamespaceString& collectionName, OptionalCollectionUUID uuid) override {} - void onUnpreparedTransactionCommit( - OperationContext* opCtx, const std::vector<repl::ReplOperation>& statements) override {} + void onUnpreparedTransactionCommit(OperationContext* opCtx, + std::vector<repl::ReplOperation>* statements, + size_t numberOfPreImagesToWrite) override {} void onPreparedTransactionCommit( OperationContext* opCtx, @@ -172,7 +173,8 @@ public: void onTransactionPrepare(OperationContext* opCtx, const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>& statements) override {} + std::vector<repl::ReplOperation>* statements, + size_t numberOfPreImagesToWrite) override {} void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override {} diff --git a/src/mongo/db/s/shard_server_op_observer.h b/src/mongo/db/s/shard_server_op_observer.h index b834fb751f7..2b16ac88150 100644 --- a/src/mongo/db/s/shard_server_op_observer.h +++ b/src/mongo/db/s/shard_server_op_observer.h @@ -161,8 +161,9 @@ public: const NamespaceString& collectionName, OptionalCollectionUUID uuid) override {} - void onUnpreparedTransactionCommit( - OperationContext* opCtx, const std::vector<repl::ReplOperation>& statements) override {} + void onUnpreparedTransactionCommit(OperationContext* opCtx, + std::vector<repl::ReplOperation>* statements, + size_t numberOfPreImagesToWrite) override {} void onPreparedTransactionCommit( OperationContext* opCtx, @@ -172,7 +173,8 @@ public: void onTransactionPrepare(OperationContext* opCtx, const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>& statements) override {} + std::vector<repl::ReplOperation>* statements, + size_t numberOfPreImagesToWrite) override {} void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override {} diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index 5225cf6fcaf..1317807cca6 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -900,6 +900,9 @@ private: // Total size in bytes of all operations within the _transactionOperations vector. size_t transactionOperationBytes{0}; + // Number of operations that have pre-images to be written to noop oplog entries. + size_t numberOfPreImagesToWrite{0}; + // The autocommit setting of this transaction. Should always be false for multi-statement // transaction. Currently only needed for diagnostics reporting. boost::optional<bool> autoCommit; diff --git a/src/mongo/db/transaction_participant_retryable_writes_test.cpp b/src/mongo/db/transaction_participant_retryable_writes_test.cpp index ac84b7dde9f..6e4d4b096e7 100644 --- a/src/mongo/db/transaction_participant_retryable_writes_test.cpp +++ b/src/mongo/db/transaction_participant_retryable_writes_test.cpp @@ -90,9 +90,11 @@ class OpObserverMock : public OpObserverNoop { public: void onTransactionPrepare(OperationContext* opCtx, const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>& statements) override { + std::vector<repl::ReplOperation>* statements, + size_t numberOfPreImagesToWrite) override { ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork()); - OpObserverNoop::onTransactionPrepare(opCtx, reservedSlots, statements); + OpObserverNoop::onTransactionPrepare( + opCtx, reservedSlots, statements, numberOfPreImagesToWrite); uassert(ErrorCodes::OperationFailed, "onTransactionPrepare() failed", @@ -105,10 +107,11 @@ public: bool transactionPrepared = false; std::function<void()> onTransactionPrepareFn = [this]() { transactionPrepared = true; }; - void onUnpreparedTransactionCommit( - OperationContext* opCtx, const std::vector<repl::ReplOperation>& statements) override { + void onUnpreparedTransactionCommit(OperationContext* opCtx, + std::vector<repl::ReplOperation>* statements, + size_t numberOfPreImagesToWrite) override { ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork()); - OpObserverNoop::onUnpreparedTransactionCommit(opCtx, statements); + OpObserverNoop::onUnpreparedTransactionCommit(opCtx, statements, numberOfPreImagesToWrite); uassert(ErrorCodes::OperationFailed, "onUnpreparedTransactionCommit() failed", diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp index a364e9f8283..932ef261e88 100644 --- a/src/mongo/db/transaction_participant_test.cpp +++ b/src/mongo/db/transaction_participant_test.cpp @@ -100,14 +100,16 @@ class OpObserverMock : public OpObserverNoop { public: void onTransactionPrepare(OperationContext* opCtx, const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>& statements) override; + std::vector<repl::ReplOperation>* statements, + size_t numberOfPreImagesToWrite) override; bool onTransactionPrepareThrowsException = false; bool transactionPrepared = false; std::function<void()> onTransactionPrepareFn = []() {}; void onUnpreparedTransactionCommit(OperationContext* opCtx, - const std::vector<repl::ReplOperation>& statements) override; + std::vector<repl::ReplOperation>* statements, + size_t numberOfPreImagesToWrite) override; bool onUnpreparedTransactionCommitThrowsException = false; bool unpreparedTransactionCommitted = false; std::function<void(const std::vector<repl::ReplOperation>&)> onUnpreparedTransactionCommitFn = @@ -142,9 +144,11 @@ public: void OpObserverMock::onTransactionPrepare(OperationContext* opCtx, const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>& statements) { + std::vector<repl::ReplOperation>* statements, + size_t numberOfPreImagesToWrite) { ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork()); - OpObserverNoop::onTransactionPrepare(opCtx, reservedSlots, statements); + OpObserverNoop::onTransactionPrepare( + opCtx, reservedSlots, statements, numberOfPreImagesToWrite); uassert(ErrorCodes::OperationFailed, "onTransactionPrepare() failed", @@ -153,18 +157,19 @@ void OpObserverMock::onTransactionPrepare(OperationContext* opCtx, onTransactionPrepareFn(); } -void OpObserverMock::onUnpreparedTransactionCommit( - OperationContext* opCtx, const std::vector<repl::ReplOperation>& statements) { +void OpObserverMock::onUnpreparedTransactionCommit(OperationContext* opCtx, + std::vector<repl::ReplOperation>* statements, + size_t numberOfPreImagesToWrite) { ASSERT(opCtx->lockState()->inAWriteUnitOfWork()); - OpObserverNoop::onUnpreparedTransactionCommit(opCtx, statements); + OpObserverNoop::onUnpreparedTransactionCommit(opCtx, statements, numberOfPreImagesToWrite); uassert(ErrorCodes::OperationFailed, "onUnpreparedTransactionCommit() failed", !onUnpreparedTransactionCommitThrowsException); unpreparedTransactionCommitted = true; - onUnpreparedTransactionCommitFn(statements); + onUnpreparedTransactionCommitFn(*statements); } void OpObserverMock::onPreparedTransactionCommit( |