diff options
author | Sergi Mateo Bellido <sergi.mateo-bellido@mongodb.com> | 2022-01-28 17:54:51 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-01-28 18:30:53 +0000 |
commit | 392ad3ef3d5a8b9a8dbd83ffe1d9a4f780e6fa61 (patch) | |
tree | f3928e1672c9652e59bb33d058626007bdf3d8a8 /src/mongo/db | |
parent | 1b18cd9b5dcb352c6fdd6b15d4bd0a40c0da35c3 (diff) | |
download | mongo-392ad3ef3d5a8b9a8dbd83ffe1d9a4f780e6fa61.tar.gz |
SERVER-62581 Handle direct inserts to shards introducing orphan documents
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 20 |
1 files changed, 20 insertions, 0 deletions
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 37831e6e01a..61049e2df5d 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -67,6 +67,7 @@ #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/dbhelpers.h" +#include "mongo/db/exec/write_stage_common.h" #include "mongo/db/index/index_access_method.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/index_builds_coordinator.h" @@ -91,6 +92,7 @@ #include "mongo/db/repl/tenant_migration_decoration.h" #include "mongo/db/repl/timestamp_block.h" #include "mongo/db/repl/transaction_oplog_application.h" +#include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/service_context.h" #include "mongo/db/stats/counters.h" #include "mongo/db/stats/server_write_concern_metrics.h" @@ -506,6 +508,8 @@ std::vector<OpTime> logInsertOps( AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kLogOp); auto oplogInfo = oplogWrite.getOplogInfo(); + write_stage_common::PreWriteFilter preWriteFilter(opCtx, nss); + WriteUnitOfWork wuow(opCtx); std::vector<OpTime> opTimes(count); @@ -534,6 +538,22 @@ std::vector<OpTime> logInsertOps( OplogLink oplogLink; if (i > 0) oplogLink.prevOpTime = opTimes[i - 1]; + + // Direct inserts to shards of orphan documents should not generate change stream events. + if (serverGlobalParams.clusterRole == ClusterRole::ShardServer && + (!oplogEntry.getFromMigrate() || !*oplogEntry.getFromMigrate()) && + !OperationShardingState::isOperationVersioned(opCtx) && + preWriteFilter.computeAction(Document(begin[i].doc)) == + write_stage_common::PreWriteFilter::Action::kWriteAsFromMigrate) { + LOGV2_DEBUG(6258100, + 3, + "Marking insert operation of orphan document with the 'fromMigrate' flag " + "to prevent a wrong change stream event", + "namespace"_attr = nss, + "document"_attr = begin[i].doc); + + oplogEntry.setFromMigrateIfTrue(true); + } appendOplogEntryChainInfo(opCtx, &oplogEntry, &oplogLink, begin[i].stmtIds); opTimes[i] = insertStatementOplogSlot; |