diff options
Diffstat (limited to 'src/mongo/db/op_observer_impl.cpp')
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 48 |
1 files changed, 26 insertions, 22 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index cf107fab9d3..b4b93f6baef 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -41,6 +41,7 @@ #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" #include "mongo/db/catalog/import_collection_oplog_entry_gen.h" +#include "mongo/db/catalog_raii.h" #include "mongo/db/commands/txn_cmds_gen.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" @@ -61,7 +62,7 @@ #include "mongo/db/repl/tenant_migration_access_blocker_util.h" #include "mongo/db/repl/tenant_migration_decoration.h" #include "mongo/db/s/collection_sharding_state.h" -#include "mongo/db/s/resharding_util.h" +#include "mongo/db/s/sharding_write_router.h" #include "mongo/db/server_options.h" #include "mongo/db/session_catalog_mongod.h" #include "mongo/db/timeseries/bucket_catalog.h" @@ -486,8 +487,8 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, std::vector<repl::OpTime> opTimeList; repl::OpTime lastOpTime; - auto* const css = CollectionShardingState::get(opCtx, nss); - auto collDesc = css->getCollectionDescription(opCtx); + ShardingWriteRouter shardingWriteRouter(opCtx, nss, Grid::get(opCtx)->catalogCache()); + if (inMultiDocumentTransaction) { // Do not add writes to the profile collection to the list of transaction operations, since // these are done outside the transaction. There is no top-level WriteUnitOfWork when we are @@ -499,14 +500,16 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, for (auto iter = first; iter != last; iter++) { auto operation = MutableOplogEntry::makeInsertOperation(nss, uuid.get(), iter->doc); - shardAnnotateOplogEntry(opCtx, nss, iter->doc, operation, css, collDesc); + operation.setDestinedRecipient( + shardingWriteRouter.getReshardingDestinedRecipient(iter->doc)); txnParticipant.addTransactionOperation(opCtx, operation); } } else { std::function<boost::optional<ShardId>(const BSONObj& doc)> getDestinedRecipientFn = - [&](const BSONObj& doc) { - return getDestinedRecipient(opCtx, nss, doc, css, collDesc); + [&shardingWriteRouter](const BSONObj& doc) { + return shardingWriteRouter.getReshardingDestinedRecipient(doc); }; + MutableOplogEntry oplogEntryTemplate; oplogEntryTemplate.setNss(nss); oplogEntryTemplate.setUuid(uuid); @@ -538,8 +541,13 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, size_t index = 0; for (auto it = first; it != last; it++, index++) { auto opTime = opTimeList.empty() ? repl::OpTime() : opTimeList[index]; - shardObserveInsertOp( - opCtx, nss, it->doc, opTime, css, fromMigrate, inMultiDocumentTransaction); + shardObserveInsertOp(opCtx, + nss, + it->doc, + opTime, + shardingWriteRouter, + fromMigrate, + inMultiDocumentTransaction); } if (nss.coll() == "system.js") { @@ -594,16 +602,15 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg const bool inMultiDocumentTransaction = txnParticipant && opCtx->writesAreReplicated() && txnParticipant.transactionIsOpen(); - auto* const css = CollectionShardingState::get(opCtx, args.nss); - auto collDesc = css->getCollectionDescription(opCtx); + ShardingWriteRouter shardingWriteRouter(opCtx, args.nss, Grid::get(opCtx)->catalogCache()); OpTimeBundle opTime; if (inMultiDocumentTransaction) { auto operation = MutableOplogEntry::makeUpdateOperation( args.nss, args.uuid, args.updateArgs.update, args.updateArgs.criteria); - shardAnnotateOplogEntry( - opCtx, args.nss, args.updateArgs.updatedDoc, operation, css, collDesc); + operation.setDestinedRecipient( + shardingWriteRouter.getReshardingDestinedRecipient(args.updateArgs.updatedDoc)); if (args.updateArgs.preImageRecordingEnabledForCollection) { invariant(args.updateArgs.preImageDoc); @@ -613,12 +620,8 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg txnParticipant.addTransactionOperation(opCtx, operation); } else { MutableOplogEntry oplogEntry; - shardAnnotateOplogEntry(opCtx, - args.nss, - args.updateArgs.updatedDoc, - oplogEntry.getDurableReplOperation(), - css, - collDesc); + oplogEntry.getDurableReplOperation().setDestinedRecipient( + shardingWriteRouter.getReshardingDestinedRecipient(args.updateArgs.updatedDoc)); if (opCtx->getTxnNumber() && args.updateArgs.storeImageInSideCollection) { // If we've stored a preImage: @@ -665,7 +668,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg args.updateArgs.preImageDoc, args.updateArgs.updatedDoc, opTime.writeOpTime, - css, + shardingWriteRouter, opTime.prePostImageOpTime, inMultiDocumentTransaction); } @@ -697,9 +700,10 @@ void OpObserverImpl::aboutToDelete(OperationContext* opCtx, auto* const css = CollectionShardingState::get(opCtx, nss); auto collDesc = css->getCollectionDescription(opCtx); + ShardingWriteRouter shardingWriteRouter(opCtx, nss, Grid::get(opCtx)->catalogCache()); repl::DurableReplOperation op; - shardAnnotateOplogEntry(opCtx, nss, doc, op, css, collDesc); + op.setDestinedRecipient(shardingWriteRouter.getReshardingDestinedRecipient(doc)); destinedRecipientDecoration(opCtx) = op.getDestinedRecipient(); shardObserveAboutToDelete(opCtx, nss, doc); @@ -772,12 +776,12 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, if (nss != NamespaceString::kSessionTransactionsTableNamespace) { if (!args.fromMigrate) { - auto* const css = CollectionShardingState::get(opCtx, nss); + ShardingWriteRouter shardingWriteRouter(opCtx, nss, Grid::get(opCtx)->catalogCache()); shardObserveDeleteOp(opCtx, nss, documentKey.getShardKeyAndId(), opTime.writeOpTime, - css, + shardingWriteRouter, opTime.prePostImageOpTime, inMultiDocumentTransaction); } |