diff options
Diffstat (limited to 'src/mongo/db/op_observer_impl.cpp')
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 61 |
1 files changed, 52 insertions, 9 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 527d989203f..e0df138b418 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -42,17 +42,21 @@ #include "mongo/db/commands/txn_cmds_gen.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/dbhelpers.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/logical_time_validator.h" #include "mongo/db/namespace_string.h" #include "mongo/db/op_observer_util.h" #include "mongo/db/operation_context.h" +#include "mongo/db/repl/image_collection_entry_gen.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplog_entry_gen.h" #include "mongo/db/repl/repl_server_parameters_gen.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/server_options.h" +#include "mongo/db/session_catalog.h" #include "mongo/db/session_catalog_mongod.h" #include "mongo/db/storage/durable_catalog.h" #include "mongo/db/transaction_participant.h" @@ -167,21 +171,39 @@ struct OpTimeBundle { Date_t wallClockTime; }; +void writeToImagesCollection(OperationContext* opCtx, + BSONObj image, + repl::RetryImageEnum imageKind, + Timestamp ts) { + repl::ImageEntry imageEntry; + invariant(opCtx->getLogicalSessionId()); + imageEntry.set_id(*opCtx->getLogicalSessionId()); + imageEntry.setTs(ts); + imageEntry.setImage(std::move(image)); + imageEntry.setImageKind(imageKind); + repl::UnreplicatedWritesBlock unreplicated(opCtx); + AutoGetCollection imageCollectionRaii( + opCtx, NamespaceString::kConfigImagesNamespace, LockMode::MODE_IX); + Helpers::upsert(opCtx, NamespaceString::kConfigImagesNamespace.toString(), imageEntry.toBSON()); +} + /** * Write oplog entry(ies) for the update operation. */ -OpTimeBundle replLogUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) { +OpTimeBundle replLogUpdate(OperationContext* opCtx, + const OplogUpdateEntryArgs& args, + const bool storeImagesInSideCollection) { BSONObj storeObj; boost::optional<repl::RetryImageEnum> needsRetryImage; if (args.updateArgs.storeDocOption == CollectionUpdateArgs::StoreDocOption::PreImage) { invariant(args.updateArgs.preImageDoc); storeObj = *args.updateArgs.preImageDoc; - if (repl::gStoreFindAndModifyImagesInSideCollection.load() && opCtx->getTxnNumber()) { + if (storeImagesInSideCollection && opCtx->getTxnNumber()) { needsRetryImage = repl::RetryImageEnum::kPreImage; } } else if (args.updateArgs.storeDocOption == CollectionUpdateArgs::StoreDocOption::PostImage) { storeObj = args.updateArgs.updatedDoc; - if (repl::gStoreFindAndModifyImagesInSideCollection.load() && opCtx->getTxnNumber()) { + if (storeImagesInSideCollection && opCtx->getTxnNumber()) { needsRetryImage = repl::RetryImageEnum::kPostImage; } } @@ -199,8 +221,7 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& OpTimeBundle opTimes; opTimes.wallClockTime = getWallClockTimeForOpLog(opCtx); - if (!repl::gStoreFindAndModifyImagesInSideCollection.load() && !storeObj.isEmpty() && - opCtx->getTxnNumber()) { + if (!storeImagesInSideCollection && !storeObj.isEmpty() && opCtx->getTxnNumber()) { auto noteUpdateOpTime = logOperation(opCtx, "n", args.nss, @@ -250,7 +271,8 @@ OpTimeBundle replLogDelete(OperationContext* opCtx, OptionalCollectionUUID uuid, StmtId stmtId, bool fromMigrate, - const boost::optional<BSONObj>& deletedDoc) { + const boost::optional<BSONObj>& deletedDoc, + const bool storeImagesInSideCollection) { OperationSessionInfo sessionInfo; repl::OplogLink oplogLink; @@ -266,7 +288,7 @@ OpTimeBundle replLogDelete(OperationContext* opCtx, boost::optional<repl::RetryImageEnum> needsRetryImage; if (deletedDoc && opCtx->getTxnNumber()) { - if (repl::gStoreFindAndModifyImagesInSideCollection.load()) { + if (storeImagesInSideCollection) { needsRetryImage = repl::RetryImageEnum::kPreImage; } else { auto noteOplog = logOperation(opCtx, @@ -575,6 +597,8 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg auto txnParticipant = TransactionParticipant::get(opCtx); const bool inMultiDocumentTransaction = txnParticipant && opCtx->writesAreReplicated() && txnParticipant.transactionIsOpen(); + // Load only once to avoid race. + const bool storeImagesInSideCollection = repl::gStoreFindAndModifyImagesInSideCollection.load(); OpTimeBundle opTime; if (inMultiDocumentTransaction) { @@ -582,7 +606,23 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg args.nss, args.uuid, args.updateArgs.update, args.updateArgs.criteria); txnParticipant.addTransactionOperation(opCtx, operation); } else { - opTime = replLogUpdate(opCtx, args); + opTime = replLogUpdate(opCtx, args, storeImagesInSideCollection); + if (storeImagesInSideCollection && opCtx->getTxnNumber() && + args.updateArgs.storeDocOption != CollectionUpdateArgs::StoreDocOption::None) { + BSONObj imageDoc; + repl::RetryImageEnum imageKind; + if (args.updateArgs.storeDocOption == CollectionUpdateArgs::StoreDocOption::PreImage) { + invariant(args.updateArgs.preImageDoc); + imageDoc = *args.updateArgs.preImageDoc; + imageKind = repl::RetryImageEnum::kPreImage; + } else { + invariant(args.updateArgs.storeDocOption == + CollectionUpdateArgs::StoreDocOption::PostImage); + imageDoc = args.updateArgs.updatedDoc; + imageKind = repl::RetryImageEnum::kPostImage; + } + writeToImagesCollection(opCtx, imageDoc, imageKind, opTime.writeOpTime.getTimestamp()); + } SessionTxnRecord sessionTxnRecord; sessionTxnRecord.setLastWriteOpTime(opTime.writeOpTime); sessionTxnRecord.setLastWriteDate(opTime.wallClockTime); @@ -635,6 +675,8 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, auto txnParticipant = TransactionParticipant::get(opCtx); const bool inMultiDocumentTransaction = txnParticipant && opCtx->writesAreReplicated() && txnParticipant.transactionIsOpen(); + // Load only once to avoid race. + const bool storeImagesInSideCollection = repl::gStoreFindAndModifyImagesInSideCollection.load(); OpTimeBundle opTime; if (inMultiDocumentTransaction) { @@ -643,7 +685,8 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, } else { boost::optional<BSONObj> deletedDoc = args.deletedDoc ? boost::optional<BSONObj>(*(args.deletedDoc)) : boost::none; - opTime = replLogDelete(opCtx, nss, uuid, stmtId, args.fromMigrate, deletedDoc); + opTime = replLogDelete( + opCtx, nss, uuid, stmtId, args.fromMigrate, deletedDoc, storeImagesInSideCollection); SessionTxnRecord sessionTxnRecord; sessionTxnRecord.setLastWriteOpTime(opTime.writeOpTime); sessionTxnRecord.setLastWriteDate(opTime.wallClockTime); |