diff options
author | Christopher Caplinger <christopher.caplinger@mongodb.com> | 2023-04-24 19:12:57 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-04-24 20:52:31 +0000 |
commit | 5993f36ebb8e7adc26a51262558156a5c3a9e8ed (patch) | |
tree | a9b9a2ee092f485c84d7ad9a164485ca29edffda /src/mongo/db/op_observer | |
parent | 2765240643c83cdc06711083e5c1bb8b4c4052f5 (diff) | |
download | mongo-5993f36ebb8e7adc26a51262558156a5c3a9e8ed.tar.gz |
SERVER-74024: Shard Merge supports change stream pre-images
Diffstat (limited to 'src/mongo/db/op_observer')
-rw-r--r-- | src/mongo/db/op_observer/op_observer_impl.cpp | 22 |
1 files changed, 17 insertions, 5 deletions
diff --git a/src/mongo/db/op_observer/op_observer_impl.cpp b/src/mongo/db/op_observer/op_observer_impl.cpp index 67dc07d6e6b..5cbed7b2166 100644 --- a/src/mongo/db/op_observer/op_observer_impl.cpp +++ b/src/mongo/db/op_observer/op_observer_impl.cpp @@ -120,6 +120,19 @@ repl::OpTime logOperation(OperationContext* opCtx, return opTime; } +void writeChangeStreamPreImageEntry( + OperationContext* opCtx, + // Skip the pre-image insert if we are in the middle of a tenant migration. Pre-image inserts + // for writes during the oplog catchup phase are handled in the oplog application code. + boost::optional<TenantId> tenantId, + const ChangeStreamPreImage& preImage) { + if (repl::tenantMigrationInfo(opCtx)) { + return; + } + + ChangeStreamPreImagesCollectionManager::get(opCtx).insertPreImage(opCtx, tenantId, preImage); +} + /** * Generic function that logs an operation. * Intended to reduce branching at call-sites by accepting the least common denominator @@ -917,8 +930,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, ChangeStreamPreImageId id(args.coll->uuid(), opTime.writeOpTime.getTimestamp(), 0); ChangeStreamPreImage preImage(id, opTime.wallClockTime, preImageDoc); - ChangeStreamPreImagesCollectionManager::get(opCtx).insertPreImage( - opCtx, args.coll->ns().tenantId(), preImage); + writeChangeStreamPreImageEntry(opCtx, args.coll->ns().tenantId(), preImage); } SessionTxnRecord sessionTxnRecord; @@ -1044,6 +1056,7 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, oplogEntry.setOpTime(args.oplogSlots.back()); } } + opTime = replLogDelete( opCtx, nss, &oplogEntry, uuid, stmtId, args.fromMigrate, _oplogWriter.get()); if (opAccumulator) { @@ -1077,8 +1090,7 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, ChangeStreamPreImageId id(uuid, opTime.writeOpTime.getTimestamp(), 0); ChangeStreamPreImage preImage(id, opTime.wallClockTime, *args.deletedDoc); - ChangeStreamPreImagesCollectionManager::get(opCtx).insertPreImage( - opCtx, nss.tenantId(), preImage); + writeChangeStreamPreImageEntry(opCtx, nss.tenantId(), preImage); } SessionTxnRecord sessionTxnRecord; @@ -1546,7 +1558,7 @@ void writeChangeStreamPreImagesForApplyOpsEntries( invariant(operation.getUuid()); invariant(!operation.getPreImage().isEmpty()); - ChangeStreamPreImagesCollectionManager::get(opCtx).insertPreImage( + writeChangeStreamPreImageEntry( opCtx, operation.getTid(), ChangeStreamPreImage{ |