summaryrefslogtreecommitdiff
path: root/src/mongo/db/op_observer_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/op_observer_impl.cpp')
-rw-r--r--src/mongo/db/op_observer_impl.cpp48
1 files changed, 26 insertions, 22 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index cf107fab9d3..b4b93f6baef 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/catalog/import_collection_oplog_entry_gen.h"
+#include "mongo/db/catalog_raii.h"
#include "mongo/db/commands/txn_cmds_gen.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
@@ -61,7 +62,7 @@
#include "mongo/db/repl/tenant_migration_access_blocker_util.h"
#include "mongo/db/repl/tenant_migration_decoration.h"
#include "mongo/db/s/collection_sharding_state.h"
-#include "mongo/db/s/resharding_util.h"
+#include "mongo/db/s/sharding_write_router.h"
#include "mongo/db/server_options.h"
#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/timeseries/bucket_catalog.h"
@@ -486,8 +487,8 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
std::vector<repl::OpTime> opTimeList;
repl::OpTime lastOpTime;
- auto* const css = CollectionShardingState::get(opCtx, nss);
- auto collDesc = css->getCollectionDescription(opCtx);
+ ShardingWriteRouter shardingWriteRouter(opCtx, nss, Grid::get(opCtx)->catalogCache());
+
if (inMultiDocumentTransaction) {
// Do not add writes to the profile collection to the list of transaction operations, since
// these are done outside the transaction. There is no top-level WriteUnitOfWork when we are
@@ -499,14 +500,16 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
for (auto iter = first; iter != last; iter++) {
auto operation = MutableOplogEntry::makeInsertOperation(nss, uuid.get(), iter->doc);
- shardAnnotateOplogEntry(opCtx, nss, iter->doc, operation, css, collDesc);
+ operation.setDestinedRecipient(
+ shardingWriteRouter.getReshardingDestinedRecipient(iter->doc));
txnParticipant.addTransactionOperation(opCtx, operation);
}
} else {
std::function<boost::optional<ShardId>(const BSONObj& doc)> getDestinedRecipientFn =
- [&](const BSONObj& doc) {
- return getDestinedRecipient(opCtx, nss, doc, css, collDesc);
+ [&shardingWriteRouter](const BSONObj& doc) {
+ return shardingWriteRouter.getReshardingDestinedRecipient(doc);
};
+
MutableOplogEntry oplogEntryTemplate;
oplogEntryTemplate.setNss(nss);
oplogEntryTemplate.setUuid(uuid);
@@ -538,8 +541,13 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
size_t index = 0;
for (auto it = first; it != last; it++, index++) {
auto opTime = opTimeList.empty() ? repl::OpTime() : opTimeList[index];
- shardObserveInsertOp(
- opCtx, nss, it->doc, opTime, css, fromMigrate, inMultiDocumentTransaction);
+ shardObserveInsertOp(opCtx,
+ nss,
+ it->doc,
+ opTime,
+ shardingWriteRouter,
+ fromMigrate,
+ inMultiDocumentTransaction);
}
if (nss.coll() == "system.js") {
@@ -594,16 +602,15 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
const bool inMultiDocumentTransaction =
txnParticipant && opCtx->writesAreReplicated() && txnParticipant.transactionIsOpen();
- auto* const css = CollectionShardingState::get(opCtx, args.nss);
- auto collDesc = css->getCollectionDescription(opCtx);
+ ShardingWriteRouter shardingWriteRouter(opCtx, args.nss, Grid::get(opCtx)->catalogCache());
OpTimeBundle opTime;
if (inMultiDocumentTransaction) {
auto operation = MutableOplogEntry::makeUpdateOperation(
args.nss, args.uuid, args.updateArgs.update, args.updateArgs.criteria);
- shardAnnotateOplogEntry(
- opCtx, args.nss, args.updateArgs.updatedDoc, operation, css, collDesc);
+ operation.setDestinedRecipient(
+ shardingWriteRouter.getReshardingDestinedRecipient(args.updateArgs.updatedDoc));
if (args.updateArgs.preImageRecordingEnabledForCollection) {
invariant(args.updateArgs.preImageDoc);
@@ -613,12 +620,8 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
txnParticipant.addTransactionOperation(opCtx, operation);
} else {
MutableOplogEntry oplogEntry;
- shardAnnotateOplogEntry(opCtx,
- args.nss,
- args.updateArgs.updatedDoc,
- oplogEntry.getDurableReplOperation(),
- css,
- collDesc);
+ oplogEntry.getDurableReplOperation().setDestinedRecipient(
+ shardingWriteRouter.getReshardingDestinedRecipient(args.updateArgs.updatedDoc));
if (opCtx->getTxnNumber() && args.updateArgs.storeImageInSideCollection) {
// If we've stored a preImage:
@@ -665,7 +668,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
args.updateArgs.preImageDoc,
args.updateArgs.updatedDoc,
opTime.writeOpTime,
- css,
+ shardingWriteRouter,
opTime.prePostImageOpTime,
inMultiDocumentTransaction);
}
@@ -697,9 +700,10 @@ void OpObserverImpl::aboutToDelete(OperationContext* opCtx,
auto* const css = CollectionShardingState::get(opCtx, nss);
auto collDesc = css->getCollectionDescription(opCtx);
+ ShardingWriteRouter shardingWriteRouter(opCtx, nss, Grid::get(opCtx)->catalogCache());
repl::DurableReplOperation op;
- shardAnnotateOplogEntry(opCtx, nss, doc, op, css, collDesc);
+ op.setDestinedRecipient(shardingWriteRouter.getReshardingDestinedRecipient(doc));
destinedRecipientDecoration(opCtx) = op.getDestinedRecipient();
shardObserveAboutToDelete(opCtx, nss, doc);
@@ -772,12 +776,12 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
if (nss != NamespaceString::kSessionTransactionsTableNamespace) {
if (!args.fromMigrate) {
- auto* const css = CollectionShardingState::get(opCtx, nss);
+ ShardingWriteRouter shardingWriteRouter(opCtx, nss, Grid::get(opCtx)->catalogCache());
shardObserveDeleteOp(opCtx,
nss,
documentKey.getShardKeyAndId(),
opTime.writeOpTime,
- css,
+ shardingWriteRouter,
opTime.prePostImageOpTime,
inMultiDocumentTransaction);
}