summaryrefslogtreecommitdiff
path: root/src/mongo/db/op_observer_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/op_observer_impl.cpp')
-rw-r--r--src/mongo/db/op_observer_impl.cpp61
1 files changed, 52 insertions, 9 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 527d989203f..e0df138b418 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -42,17 +42,21 @@
#include "mongo/db/commands/txn_cmds_gen.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbhelpers.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/logical_time_validator.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/op_observer_util.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/image_collection_entry_gen.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_entry_gen.h"
#include "mongo/db/repl/repl_server_parameters_gen.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/server_options.h"
+#include "mongo/db/session_catalog.h"
#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/storage/durable_catalog.h"
#include "mongo/db/transaction_participant.h"
@@ -167,21 +171,39 @@ struct OpTimeBundle {
Date_t wallClockTime;
};
+void writeToImagesCollection(OperationContext* opCtx,
+ BSONObj image,
+ repl::RetryImageEnum imageKind,
+ Timestamp ts) {
+ repl::ImageEntry imageEntry;
+ invariant(opCtx->getLogicalSessionId());
+ imageEntry.set_id(*opCtx->getLogicalSessionId());
+ imageEntry.setTs(ts);
+ imageEntry.setImage(std::move(image));
+ imageEntry.setImageKind(imageKind);
+ repl::UnreplicatedWritesBlock unreplicated(opCtx);
+ AutoGetCollection imageCollectionRaii(
+ opCtx, NamespaceString::kConfigImagesNamespace, LockMode::MODE_IX);
+ Helpers::upsert(opCtx, NamespaceString::kConfigImagesNamespace.toString(), imageEntry.toBSON());
+}
+
/**
* Write oplog entry(ies) for the update operation.
*/
-OpTimeBundle replLogUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) {
+OpTimeBundle replLogUpdate(OperationContext* opCtx,
+ const OplogUpdateEntryArgs& args,
+ const bool storeImagesInSideCollection) {
BSONObj storeObj;
boost::optional<repl::RetryImageEnum> needsRetryImage;
if (args.updateArgs.storeDocOption == CollectionUpdateArgs::StoreDocOption::PreImage) {
invariant(args.updateArgs.preImageDoc);
storeObj = *args.updateArgs.preImageDoc;
- if (repl::gStoreFindAndModifyImagesInSideCollection.load() && opCtx->getTxnNumber()) {
+ if (storeImagesInSideCollection && opCtx->getTxnNumber()) {
needsRetryImage = repl::RetryImageEnum::kPreImage;
}
} else if (args.updateArgs.storeDocOption == CollectionUpdateArgs::StoreDocOption::PostImage) {
storeObj = args.updateArgs.updatedDoc;
- if (repl::gStoreFindAndModifyImagesInSideCollection.load() && opCtx->getTxnNumber()) {
+ if (storeImagesInSideCollection && opCtx->getTxnNumber()) {
needsRetryImage = repl::RetryImageEnum::kPostImage;
}
}
@@ -199,8 +221,7 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs&
OpTimeBundle opTimes;
opTimes.wallClockTime = getWallClockTimeForOpLog(opCtx);
- if (!repl::gStoreFindAndModifyImagesInSideCollection.load() && !storeObj.isEmpty() &&
- opCtx->getTxnNumber()) {
+ if (!storeImagesInSideCollection && !storeObj.isEmpty() && opCtx->getTxnNumber()) {
auto noteUpdateOpTime = logOperation(opCtx,
"n",
args.nss,
@@ -250,7 +271,8 @@ OpTimeBundle replLogDelete(OperationContext* opCtx,
OptionalCollectionUUID uuid,
StmtId stmtId,
bool fromMigrate,
- const boost::optional<BSONObj>& deletedDoc) {
+ const boost::optional<BSONObj>& deletedDoc,
+ const bool storeImagesInSideCollection) {
OperationSessionInfo sessionInfo;
repl::OplogLink oplogLink;
@@ -266,7 +288,7 @@ OpTimeBundle replLogDelete(OperationContext* opCtx,
boost::optional<repl::RetryImageEnum> needsRetryImage;
if (deletedDoc && opCtx->getTxnNumber()) {
- if (repl::gStoreFindAndModifyImagesInSideCollection.load()) {
+ if (storeImagesInSideCollection) {
needsRetryImage = repl::RetryImageEnum::kPreImage;
} else {
auto noteOplog = logOperation(opCtx,
@@ -575,6 +597,8 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
auto txnParticipant = TransactionParticipant::get(opCtx);
const bool inMultiDocumentTransaction =
txnParticipant && opCtx->writesAreReplicated() && txnParticipant.transactionIsOpen();
+ // Load only once to avoid race.
+ const bool storeImagesInSideCollection = repl::gStoreFindAndModifyImagesInSideCollection.load();
OpTimeBundle opTime;
if (inMultiDocumentTransaction) {
@@ -582,7 +606,23 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
args.nss, args.uuid, args.updateArgs.update, args.updateArgs.criteria);
txnParticipant.addTransactionOperation(opCtx, operation);
} else {
- opTime = replLogUpdate(opCtx, args);
+ opTime = replLogUpdate(opCtx, args, storeImagesInSideCollection);
+ if (storeImagesInSideCollection && opCtx->getTxnNumber() &&
+ args.updateArgs.storeDocOption != CollectionUpdateArgs::StoreDocOption::None) {
+ BSONObj imageDoc;
+ repl::RetryImageEnum imageKind;
+ if (args.updateArgs.storeDocOption == CollectionUpdateArgs::StoreDocOption::PreImage) {
+ invariant(args.updateArgs.preImageDoc);
+ imageDoc = *args.updateArgs.preImageDoc;
+ imageKind = repl::RetryImageEnum::kPreImage;
+ } else {
+ invariant(args.updateArgs.storeDocOption ==
+ CollectionUpdateArgs::StoreDocOption::PostImage);
+ imageDoc = args.updateArgs.updatedDoc;
+ imageKind = repl::RetryImageEnum::kPostImage;
+ }
+ writeToImagesCollection(opCtx, imageDoc, imageKind, opTime.writeOpTime.getTimestamp());
+ }
SessionTxnRecord sessionTxnRecord;
sessionTxnRecord.setLastWriteOpTime(opTime.writeOpTime);
sessionTxnRecord.setLastWriteDate(opTime.wallClockTime);
@@ -635,6 +675,8 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
auto txnParticipant = TransactionParticipant::get(opCtx);
const bool inMultiDocumentTransaction =
txnParticipant && opCtx->writesAreReplicated() && txnParticipant.transactionIsOpen();
+ // Load only once to avoid race.
+ const bool storeImagesInSideCollection = repl::gStoreFindAndModifyImagesInSideCollection.load();
OpTimeBundle opTime;
if (inMultiDocumentTransaction) {
@@ -643,7 +685,8 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
} else {
boost::optional<BSONObj> deletedDoc =
args.deletedDoc ? boost::optional<BSONObj>(*(args.deletedDoc)) : boost::none;
- opTime = replLogDelete(opCtx, nss, uuid, stmtId, args.fromMigrate, deletedDoc);
+ opTime = replLogDelete(
+ opCtx, nss, uuid, stmtId, args.fromMigrate, deletedDoc, storeImagesInSideCollection);
SessionTxnRecord sessionTxnRecord;
sessionTxnRecord.setLastWriteOpTime(opTime.writeOpTime);
sessionTxnRecord.setLastWriteDate(opTime.wallClockTime);