diff options
author | Mindaugas Malinauskas <mindaugas.malinauskas@mongodb.com> | 2022-02-08 17:01:49 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-02-09 15:36:16 +0000 |
commit | 88d59bfe9d5ee2c9938ae251f7a77a8bf1250a6b (patch) | |
tree | 11f60598f603a8d18811cddf690145e266de8433 /src/mongo/db/op_observer_impl.cpp | |
parent | 2ad330e831461b8451979716faf27a34af9bb8d2 (diff) | |
download | mongo-88d59bfe9d5ee2c9938ae251f7a77a8bf1250a6b.tar.gz |
SERVER-58694 Implement writing of pre-images for transactional update/replace/delete operations
Diffstat (limited to 'src/mongo/db/op_observer_impl.cpp')
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 84 |
1 files changed, 80 insertions, 4 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 895845a0fde..b8a7b4f28e3 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -80,6 +80,7 @@ namespace mongo { using repl::DurableOplogEntry; using repl::MutableOplogEntry; +using ChangeStreamPreImageRecordingMode = repl::ReplOperation::ChangeStreamPreImageRecordingMode; const OperationContext::Decoration<boost::optional<repl::DocumentKey>> documentKeyDecoration = OperationContext::declareDecoration<boost::optional<repl::DocumentKey>>(); @@ -639,6 +640,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg if (args.updateArgs->storeDocOption == CollectionUpdateArgs::StoreDocOption::PreImage) { invariant(args.updateArgs->preImageDoc); operation.setPreImage(args.updateArgs->preImageDoc->getOwned()); + operation.setPreImageRecordedForRetryableInternalTransaction(); if (args.retryableFindAndModifyLocation == RetryableFindAndModifyLocation::kSideCollection) { operation.setNeedsRetryImage(repl::RetryImageEnum::kPreImage); @@ -655,7 +657,26 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg } } else if (args.updateArgs->preImageRecordingEnabledForCollection) { invariant(args.updateArgs->preImageDoc); + tassert( + 5869402, + "Change stream pre-image recording to the oplog and to the pre-image collection " + "requested at the same time", + !args.updateArgs->changeStreamPreAndPostImagesEnabledForCollection); operation.setPreImage(args.updateArgs->preImageDoc->getOwned()); + operation.setChangeStreamPreImageRecordingMode( + ChangeStreamPreImageRecordingMode::kOplog); + } + + if (args.updateArgs->changeStreamPreAndPostImagesEnabledForCollection) { + invariant(args.updateArgs->preImageDoc); + tassert( + 5869403, + "Change stream pre-image recording to the oplog and to the pre-image collection " + "requested at the same time", + !args.updateArgs->preImageRecordingEnabledForCollection); + operation.setPreImage(args.updateArgs->preImageDoc->getOwned()); + operation.setChangeStreamPreImageRecordingMode( + ChangeStreamPreImageRecordingMode::kPreImagesCollection); } operation.setDestinedRecipient( shardingWriteRouter.getReshardingDestinedRecipient(args.updateArgs->updatedDoc)); @@ -813,17 +834,33 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, "Deleted document must be present for pre-image recording", args.deletedDoc); operation.setPreImage(args.deletedDoc->getOwned()); + operation.setPreImageRecordedForRetryableInternalTransaction(); if (args.retryableFindAndModifyLocation == RetryableFindAndModifyLocation::kSideCollection) { operation.setNeedsRetryImage(repl::RetryImageEnum::kPreImage); } } } - if (args.preImageRecordingEnabledForCollection) { + + if (args.changeStreamPreAndPostImagesEnabledForCollection) { + tassert(5869400, + "Deleted document must be present for pre-image recording", + args.deletedDoc); + tassert( + 5869401, + "Change stream pre-image recording to the oplog and to the pre-image collection " + "requested at the same time", + !args.preImageRecordingEnabledForCollection); + operation.setPreImage(args.deletedDoc->getOwned()); + operation.setChangeStreamPreImageRecordingMode( + ChangeStreamPreImageRecordingMode::kPreImagesCollection); + } else if (args.preImageRecordingEnabledForCollection) { tassert(5868701, "Deleted document must be present for pre-image recording", args.deletedDoc); operation.setPreImage(args.deletedDoc->getOwned()); + operation.setChangeStreamPreImageRecordingMode( + ChangeStreamPreImageRecordingMode::kOplog); } operation.setDestinedRecipient(destinedRecipientDecoration(opCtx)); @@ -1251,6 +1288,38 @@ void OpObserverImpl::onEmptyCapped(OperationContext* opCtx, } namespace { + +/** + * Writes pre-images for update/replace/delete operations packed into a single "applyOps" entry to + * the change stream pre-images collection if required. The operations are defined by sequence + * ['stmtBegin', 'stmtEnd'). 'applyOpsTimestamp' and 'operationTime' are the timestamp and the wall + * clock time, respectively, of the "applyOps" entry. A pre-image is recorded for an operation only + * if pre-images are enabled for the collection the operation is issued on. + */ +void writeChangeStreamPreImagesForApplyOpsEntries( + OperationContext* opCtx, + const std::vector<repl::ReplOperation>::iterator& stmtBegin, + const std::vector<repl::ReplOperation>::iterator& stmtEnd, + Timestamp applyOpsTimestamp, + Date_t operationTime) { + int64_t applyOpsIndex{0}; + for (auto stmtIterator = stmtBegin; stmtIterator != stmtEnd; ++stmtIterator) { + auto& operation = *stmtIterator; + if (operation.isChangeStreamPreImageRecordedInPreImagesCollection() && + !operation.getNss().isTemporaryReshardingCollection()) { + invariant(operation.getUuid()); + invariant(!operation.getPreImage().isEmpty()); + writeToChangeStreamPreImagesCollection( + opCtx, + ChangeStreamPreImage{ + ChangeStreamPreImageId{*operation.getUuid(), applyOpsTimestamp, applyOpsIndex}, + operationTime, + operation.getPreImage()}); + } + ++applyOpsIndex; + } +} + // Accepts an empty BSON builder and appends the given transaction statements to an 'applyOps' array // field. Appends as many operations as possible to the array (and their corresponding statement // ids to 'stmtIdsWritten') until either the constructed object exceeds the 16MB limit or the @@ -1448,8 +1517,11 @@ int logOplogEntriesForTransaction( if (numberOfPrePostImagesToWrite > 0 && !migrationRecipientInfo) { for (auto& statement : *stmts) { - if (!statement.getPreImage().isEmpty() && - statement.getNeedsRetryImage() != repl::RetryImageEnum::kPreImage) { + if (statement.isChangeStreamPreImageRecordedInOplog() || + (statement.isPreImageRecordedForRetryableInternalTransaction() && + statement.getNeedsRetryImage() != repl::RetryImageEnum::kPreImage)) { + invariant(!statement.getPreImage().isEmpty()); + // Note that 'needsRetryImage' stores the image kind that needs to stored in the // image collection. Therefore, when 'needsRetryImage' is equal to kPreImage, the // pre-image will be written to the image collection (after all the applyOps oplog @@ -1573,6 +1645,10 @@ int logOplogEntriesForTransaction( prevWriteOpTime.writeOpTime.getTimestamp()}; } + const auto applyOpsEntryTimestamp = prevWriteOpTime.writeOpTime.getTimestamp(); + writeChangeStreamPreImagesForApplyOpsEntries( + opCtx, stmtsIter, nextStmt, applyOpsEntryTimestamp, oplogEntry.getWallClockTime()); + // Advance the iterator to the beginning of the remaining unpacked statements. stmtsIter = nextStmt; numEntriesWritten++; @@ -1626,7 +1702,7 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx, }); } -} // namespace +} // namespace void OpObserverImpl::onUnpreparedTransactionCommit(OperationContext* opCtx, std::vector<repl::ReplOperation>* statements, |