summaryrefslogtreecommitdiff
path: root/src/mongo/db/op_observer_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/op_observer_impl.cpp')
-rw-r--r--src/mongo/db/op_observer_impl.cpp23
1 files changed, 22 insertions, 1 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 8e3c1c93b7d..6b7bd4ef6a8 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -48,6 +48,7 @@
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/dbhelpers.h"
+#include "mongo/db/exec/write_stage_common.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/internal_transactions_feature_flag_gen.h"
#include "mongo/db/keys_collection_document_gen.h"
@@ -65,6 +66,7 @@
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/tenant_migration_access_blocker_util.h"
#include "mongo/db/repl/tenant_migration_decoration.h"
+#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/sharding_write_router.h"
#include "mongo/db/server_options.h"
#include "mongo/db/session_catalog_mongod.h"
@@ -513,6 +515,8 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
ShardingWriteRouter shardingWriteRouter(opCtx, nss, Grid::get(opCtx)->catalogCache());
if (inMultiDocumentTransaction) {
+ invariant(!fromMigrate);
+
// Do not add writes to the profile collection to the list of transaction operations, since
// these are done outside the transaction. There is no top-level WriteUnitOfWork when we are
// in a SideTransactionBlock.
@@ -523,6 +527,7 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
const bool inRetryableInternalTransaction =
isInternalSessionForRetryableWrite(*opCtx->getLogicalSessionId());
+ write_stage_common::PreWriteFilter preWriteFilter(opCtx, nss);
for (auto iter = first; iter != last; iter++) {
const auto docKey = repl::getDocumentKey(opCtx, nss, iter->doc).getShardKeyAndId();
@@ -532,6 +537,20 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
}
operation.setDestinedRecipient(
shardingWriteRouter.getReshardingDestinedRecipient(iter->doc));
+
+ if (!OperationShardingState::isComingFromRouter(opCtx) &&
+ preWriteFilter.computeAction(Document(iter->doc)) ==
+ write_stage_common::PreWriteFilter::Action::kWriteAsFromMigrate) {
+ LOGV2_DEBUG(6585801,
+ 3,
+ "Marking insert operation of orphan document with the 'fromMigrate' "
+ "flag to prevent a wrong change stream event",
+ "namespace"_attr = nss,
+ "document"_attr = iter->doc);
+
+ operation.setFromMigrate(true);
+ }
+
txnParticipant.addTransactionOperation(opCtx, operation);
}
} else {
@@ -697,7 +716,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
}
operation.setDestinedRecipient(
shardingWriteRouter.getReshardingDestinedRecipient(args.updateArgs->updatedDoc));
-
+ operation.setFromMigrateIfTrue(args.updateArgs->source == OperationSource::kFromMigrate);
txnParticipant.addTransactionOperation(opCtx, operation);
} else {
MutableOplogEntry oplogEntry;
@@ -839,6 +858,7 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
if (inBatchedWrite) {
auto operation =
MutableOplogEntry::makeDeleteOperation(nss, uuid, documentKey.getShardKeyAndId());
+ operation.setFromMigrateIfTrue(args.fromMigrate);
batchedWriteContext.addBatchedOperation(opCtx, operation);
} else if (inMultiDocumentTransaction) {
const bool inRetryableInternalTransaction =
@@ -897,6 +917,7 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
}
operation.setDestinedRecipient(destinedRecipientDecoration(opCtx));
+ operation.setFromMigrateIfTrue(args.fromMigrate);
txnParticipant.addTransactionOperation(opCtx, operation);
} else {
MutableOplogEntry oplogEntry;