summary refs log tree commit diff
path: root/src/mongo/db/op_observer_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/op_observer_impl.cpp')
-rw-r--r-- src/mongo/db/op_observer_impl.cpp 84
1 files changed, 80 insertions, 4 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 895845a0fde..b8a7b4f28e3 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -80,6 +80,7 @@
namespace mongo {
using repl::DurableOplogEntry;
using repl::MutableOplogEntry;
+using ChangeStreamPreImageRecordingMode = repl::ReplOperation::ChangeStreamPreImageRecordingMode;
const OperationContext::Decoration<boost::optional<repl::DocumentKey>> documentKeyDecoration =
OperationContext::declareDecoration<boost::optional<repl::DocumentKey>>();
@@ -639,6 +640,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
if (args.updateArgs->storeDocOption == CollectionUpdateArgs::StoreDocOption::PreImage) {
invariant(args.updateArgs->preImageDoc);
operation.setPreImage(args.updateArgs->preImageDoc->getOwned());
+ operation.setPreImageRecordedForRetryableInternalTransaction();
if (args.retryableFindAndModifyLocation ==
RetryableFindAndModifyLocation::kSideCollection) {
operation.setNeedsRetryImage(repl::RetryImageEnum::kPreImage);
@@ -655,7 +657,26 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
}
} else if (args.updateArgs->preImageRecordingEnabledForCollection) {
invariant(args.updateArgs->preImageDoc);
+ tassert(
+ 5869402,
+ "Change stream pre-image recording to the oplog and to the pre-image collection "
+ "requested at the same time",
+ !args.updateArgs->changeStreamPreAndPostImagesEnabledForCollection);
operation.setPreImage(args.updateArgs->preImageDoc->getOwned());
+ operation.setChangeStreamPreImageRecordingMode(
+ ChangeStreamPreImageRecordingMode::kOplog);
+ }
+
+ if (args.updateArgs->changeStreamPreAndPostImagesEnabledForCollection) {
+ invariant(args.updateArgs->preImageDoc);
+ tassert(
+ 5869403,
+ "Change stream pre-image recording to the oplog and to the pre-image collection "
+ "requested at the same time",
+ !args.updateArgs->preImageRecordingEnabledForCollection);
+ operation.setPreImage(args.updateArgs->preImageDoc->getOwned());
+ operation.setChangeStreamPreImageRecordingMode(
+ ChangeStreamPreImageRecordingMode::kPreImagesCollection);
}
operation.setDestinedRecipient(
shardingWriteRouter.getReshardingDestinedRecipient(args.updateArgs->updatedDoc));
@@ -813,17 +834,33 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
"Deleted document must be present for pre-image recording",
args.deletedDoc);
operation.setPreImage(args.deletedDoc->getOwned());
+ operation.setPreImageRecordedForRetryableInternalTransaction();
if (args.retryableFindAndModifyLocation ==
RetryableFindAndModifyLocation::kSideCollection) {
operation.setNeedsRetryImage(repl::RetryImageEnum::kPreImage);
}
}
}
- if (args.preImageRecordingEnabledForCollection) {
+
+ if (args.changeStreamPreAndPostImagesEnabledForCollection) {
+ tassert(5869400,
+ "Deleted document must be present for pre-image recording",
+ args.deletedDoc);
+ tassert(
+ 5869401,
+ "Change stream pre-image recording to the oplog and to the pre-image collection "
+ "requested at the same time",
+ !args.preImageRecordingEnabledForCollection);
+ operation.setPreImage(args.deletedDoc->getOwned());
+ operation.setChangeStreamPreImageRecordingMode(
+ ChangeStreamPreImageRecordingMode::kPreImagesCollection);
+ } else if (args.preImageRecordingEnabledForCollection) {
tassert(5868701,
"Deleted document must be present for pre-image recording",
args.deletedDoc);
operation.setPreImage(args.deletedDoc->getOwned());
+ operation.setChangeStreamPreImageRecordingMode(
+ ChangeStreamPreImageRecordingMode::kOplog);
}
operation.setDestinedRecipient(destinedRecipientDecoration(opCtx));
@@ -1251,6 +1288,38 @@ void OpObserverImpl::onEmptyCapped(OperationContext* opCtx,
}
namespace {
+
+/**
+ * Writes pre-images of the operations in ['stmtBegin', 'stmtEnd'), all packed into one "applyOps"
+ * entry, to the change stream pre-images collection. 'applyOpsTimestamp' and 'operationTime' are
+ * the timestamp and wall clock time of that entry. Only operations flagged for recording in the
+ * pre-images collection are written, and temporary resharding collections are skipped. Each
+ * pre-image id is (collection UUID, 'applyOpsTimestamp', position of the operation in the range).
+ */
+void writeChangeStreamPreImagesForApplyOpsEntries(
+    OperationContext* opCtx,
+    const std::vector<repl::ReplOperation>::iterator& stmtBegin,
+    const std::vector<repl::ReplOperation>::iterator& stmtEnd,
+    Timestamp applyOpsTimestamp,
+    Date_t operationTime) {
+    int64_t applyOpsIndex{0};  // Position of the current operation within the range.
+    for (auto stmtIterator = stmtBegin; stmtIterator != stmtEnd; ++stmtIterator) {
+        auto& operation = *stmtIterator;
+        if (operation.isChangeStreamPreImageRecordedInPreImagesCollection() &&
+            !operation.getNss().isTemporaryReshardingCollection()) {
+            invariant(operation.getUuid());  // The UUID is part of the pre-image id.
+            invariant(!operation.getPreImage().isEmpty());
+            writeToChangeStreamPreImagesCollection(
+                opCtx,
+                ChangeStreamPreImage{
+                    ChangeStreamPreImageId{*operation.getUuid(), applyOpsTimestamp, applyOpsIndex},
+                    operationTime,
+                    operation.getPreImage()});
+        }
+        ++applyOpsIndex;  // Incremented for every operation, including the skipped ones.
+    }
+}
+
// Accepts an empty BSON builder and appends the given transaction statements to an 'applyOps' array
// field. Appends as many operations as possible to the array (and their corresponding statement
// ids to 'stmtIdsWritten') until either the constructed object exceeds the 16MB limit or the
@@ -1448,8 +1517,11 @@ int logOplogEntriesForTransaction(
if (numberOfPrePostImagesToWrite > 0 && !migrationRecipientInfo) {
for (auto& statement : *stmts) {
- if (!statement.getPreImage().isEmpty() &&
- statement.getNeedsRetryImage() != repl::RetryImageEnum::kPreImage) {
+ if (statement.isChangeStreamPreImageRecordedInOplog() ||
+ (statement.isPreImageRecordedForRetryableInternalTransaction() &&
+ statement.getNeedsRetryImage() != repl::RetryImageEnum::kPreImage)) {
+ invariant(!statement.getPreImage().isEmpty());
+
// Note that 'needsRetryImage' stores the image kind that needs to be stored in the
// image collection. Therefore, when 'needsRetryImage' is equal to kPreImage, the
// pre-image will be written to the image collection (after all the applyOps oplog
@@ -1573,6 +1645,10 @@ int logOplogEntriesForTransaction(
prevWriteOpTime.writeOpTime.getTimestamp()};
}
+ const auto applyOpsEntryTimestamp = prevWriteOpTime.writeOpTime.getTimestamp();
+ writeChangeStreamPreImagesForApplyOpsEntries(
+ opCtx, stmtsIter, nextStmt, applyOpsEntryTimestamp, oplogEntry.getWallClockTime());
+
// Advance the iterator to the beginning of the remaining unpacked statements.
stmtsIter = nextStmt;
numEntriesWritten++;
@@ -1626,7 +1702,7 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx,
});
}
-} // namespace
+} // namespace
void OpObserverImpl::onUnpreparedTransactionCommit(OperationContext* opCtx,
std::vector<repl::ReplOperation>* statements,