diff options
Diffstat (limited to 'src/mongo/db/op_observer_impl.cpp')
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 23 |
1 files changed, 22 insertions, 1 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 8e3c1c93b7d..6b7bd4ef6a8 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -48,6 +48,7 @@ #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/dbhelpers.h" +#include "mongo/db/exec/write_stage_common.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/internal_transactions_feature_flag_gen.h" #include "mongo/db/keys_collection_document_gen.h" @@ -65,6 +66,7 @@ #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/tenant_migration_access_blocker_util.h" #include "mongo/db/repl/tenant_migration_decoration.h" +#include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/sharding_write_router.h" #include "mongo/db/server_options.h" #include "mongo/db/session_catalog_mongod.h" @@ -513,6 +515,8 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, ShardingWriteRouter shardingWriteRouter(opCtx, nss, Grid::get(opCtx)->catalogCache()); if (inMultiDocumentTransaction) { + invariant(!fromMigrate); + // Do not add writes to the profile collection to the list of transaction operations, since // these are done outside the transaction. There is no top-level WriteUnitOfWork when we are // in a SideTransactionBlock. @@ -523,6 +527,7 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, const bool inRetryableInternalTransaction = isInternalSessionForRetryableWrite(*opCtx->getLogicalSessionId()); + write_stage_common::PreWriteFilter preWriteFilter(opCtx, nss); for (auto iter = first; iter != last; iter++) { const auto docKey = repl::getDocumentKey(opCtx, nss, iter->doc).getShardKeyAndId(); @@ -532,6 +537,20 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, } operation.setDestinedRecipient( shardingWriteRouter.getReshardingDestinedRecipient(iter->doc)); + + if (!OperationShardingState::isComingFromRouter(opCtx) && + preWriteFilter.computeAction(Document(iter->doc)) == + write_stage_common::PreWriteFilter::Action::kWriteAsFromMigrate) { + LOGV2_DEBUG(6585801, + 3, + "Marking insert operation of orphan document with the 'fromMigrate' " + "flag to prevent a wrong change stream event", + "namespace"_attr = nss, + "document"_attr = iter->doc); + + operation.setFromMigrate(true); + } + txnParticipant.addTransactionOperation(opCtx, operation); } } else { @@ -697,7 +716,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg } operation.setDestinedRecipient( shardingWriteRouter.getReshardingDestinedRecipient(args.updateArgs->updatedDoc)); - + operation.setFromMigrateIfTrue(args.updateArgs->source == OperationSource::kFromMigrate); txnParticipant.addTransactionOperation(opCtx, operation); } else { MutableOplogEntry oplogEntry; @@ -839,6 +858,7 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, if (inBatchedWrite) { auto operation = MutableOplogEntry::makeDeleteOperation(nss, uuid, documentKey.getShardKeyAndId()); + operation.setFromMigrateIfTrue(args.fromMigrate); batchedWriteContext.addBatchedOperation(opCtx, operation); } else if (inMultiDocumentTransaction) { const bool inRetryableInternalTransaction = @@ -897,6 +917,7 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, } operation.setDestinedRecipient(destinedRecipientDecoration(opCtx)); + operation.setFromMigrateIfTrue(args.fromMigrate); txnParticipant.addTransactionOperation(opCtx, operation); } else { MutableOplogEntry oplogEntry; |