summaryrefslogtreecommitdiff
path: root/src/mongo/db/op_observer
diff options
context:
space:
mode:
authorChristopher Caplinger <christopher.caplinger@mongodb.com>2023-04-24 19:12:57 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-04-24 20:52:31 +0000
commit5993f36ebb8e7adc26a51262558156a5c3a9e8ed (patch)
treea9b9a2ee092f485c84d7ad9a164485ca29edffda /src/mongo/db/op_observer
parent2765240643c83cdc06711083e5c1bb8b4c4052f5 (diff)
downloadmongo-5993f36ebb8e7adc26a51262558156a5c3a9e8ed.tar.gz
SERVER-74024: Shard Merge supports change stream pre-images
Diffstat (limited to 'src/mongo/db/op_observer')
-rw-r--r--src/mongo/db/op_observer/op_observer_impl.cpp22
1 files changed, 17 insertions, 5 deletions
diff --git a/src/mongo/db/op_observer/op_observer_impl.cpp b/src/mongo/db/op_observer/op_observer_impl.cpp
index 67dc07d6e6b..5cbed7b2166 100644
--- a/src/mongo/db/op_observer/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer/op_observer_impl.cpp
@@ -120,6 +120,19 @@ repl::OpTime logOperation(OperationContext* opCtx,
return opTime;
}
+void writeChangeStreamPreImageEntry(
+ OperationContext* opCtx,
+ // Skip the pre-image insert if we are in the middle of a tenant migration. Pre-image inserts
+ // for writes during the oplog catchup phase are handled in the oplog application code.
+ boost::optional<TenantId> tenantId,
+ const ChangeStreamPreImage& preImage) {
+ if (repl::tenantMigrationInfo(opCtx)) {
+ return;
+ }
+
+ ChangeStreamPreImagesCollectionManager::get(opCtx).insertPreImage(opCtx, tenantId, preImage);
+}
+
/**
* Generic function that logs an operation.
* Intended to reduce branching at call-sites by accepting the least common denominator
@@ -917,8 +930,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx,
ChangeStreamPreImageId id(args.coll->uuid(), opTime.writeOpTime.getTimestamp(), 0);
ChangeStreamPreImage preImage(id, opTime.wallClockTime, preImageDoc);
- ChangeStreamPreImagesCollectionManager::get(opCtx).insertPreImage(
- opCtx, args.coll->ns().tenantId(), preImage);
+ writeChangeStreamPreImageEntry(opCtx, args.coll->ns().tenantId(), preImage);
}
SessionTxnRecord sessionTxnRecord;
@@ -1044,6 +1056,7 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
oplogEntry.setOpTime(args.oplogSlots.back());
}
}
+
opTime = replLogDelete(
opCtx, nss, &oplogEntry, uuid, stmtId, args.fromMigrate, _oplogWriter.get());
if (opAccumulator) {
@@ -1077,8 +1090,7 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
ChangeStreamPreImageId id(uuid, opTime.writeOpTime.getTimestamp(), 0);
ChangeStreamPreImage preImage(id, opTime.wallClockTime, *args.deletedDoc);
- ChangeStreamPreImagesCollectionManager::get(opCtx).insertPreImage(
- opCtx, nss.tenantId(), preImage);
+ writeChangeStreamPreImageEntry(opCtx, nss.tenantId(), preImage);
}
SessionTxnRecord sessionTxnRecord;
@@ -1546,7 +1558,7 @@ void writeChangeStreamPreImagesForApplyOpsEntries(
invariant(operation.getUuid());
invariant(!operation.getPreImage().isEmpty());
- ChangeStreamPreImagesCollectionManager::get(opCtx).insertPreImage(
+ writeChangeStreamPreImageEntry(
opCtx,
operation.getTid(),
ChangeStreamPreImage{