summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorSergi Mateo Bellido <sergi.mateo-bellido@mongodb.com>2022-01-28 17:54:51 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-01-28 18:30:53 +0000
commit392ad3ef3d5a8b9a8dbd83ffe1d9a4f780e6fa61 (patch)
treef3928e1672c9652e59bb33d058626007bdf3d8a8 /src/mongo/db
parent1b18cd9b5dcb352c6fdd6b15d4bd0a40c0da35c3 (diff)
downloadmongo-392ad3ef3d5a8b9a8dbd83ffe1d9a4f780e6fa61.tar.gz
SERVER-62581 Handle direct inserts to shards introducing orphan documents
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/repl/oplog.cpp20
1 files changed, 20 insertions, 0 deletions
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 37831e6e01a..61049e2df5d 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -67,6 +67,7 @@
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/dbhelpers.h"
+#include "mongo/db/exec/write_stage_common.h"
#include "mongo/db/index/index_access_method.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/index_builds_coordinator.h"
@@ -91,6 +92,7 @@
#include "mongo/db/repl/tenant_migration_decoration.h"
#include "mongo/db/repl/timestamp_block.h"
#include "mongo/db/repl/transaction_oplog_application.h"
+#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/service_context.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/stats/server_write_concern_metrics.h"
@@ -506,6 +508,8 @@ std::vector<OpTime> logInsertOps(
AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kLogOp);
auto oplogInfo = oplogWrite.getOplogInfo();
+ write_stage_common::PreWriteFilter preWriteFilter(opCtx, nss);
+
WriteUnitOfWork wuow(opCtx);
std::vector<OpTime> opTimes(count);
@@ -534,6 +538,22 @@ std::vector<OpTime> logInsertOps(
OplogLink oplogLink;
if (i > 0)
oplogLink.prevOpTime = opTimes[i - 1];
+
+ // Direct inserts to shards of orphan documents should not generate change stream events.
+ if (serverGlobalParams.clusterRole == ClusterRole::ShardServer &&
+ (!oplogEntry.getFromMigrate() || !*oplogEntry.getFromMigrate()) &&
+ !OperationShardingState::isOperationVersioned(opCtx) &&
+ preWriteFilter.computeAction(Document(begin[i].doc)) ==
+ write_stage_common::PreWriteFilter::Action::kWriteAsFromMigrate) {
+ LOGV2_DEBUG(6258100,
+ 3,
+ "Marking insert operation of orphan document with the 'fromMigrate' flag "
+ "to prevent a wrong change stream event",
+ "namespace"_attr = nss,
+ "document"_attr = begin[i].doc);
+
+ oplogEntry.setFromMigrateIfTrue(true);
+ }
appendOplogEntryChainInfo(opCtx, &oplogEntry, &oplogLink, begin[i].stmtIds);
opTimes[i] = insertStatementOplogSlot;