summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Shuvalov <andrew.shuvalov@mongodb.com>2021-05-21 19:05:01 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-05-21 19:41:53 +0000
commit7e2afe22b658e2724f0a35714c74d171868934b6 (patch)
treec3010fa0cb8b9226bdbe43e3a88751437f44fee4
parent3394ccd20ee2a77f3526c481799943c7418bff00 (diff)
downloadmongo-7e2afe22b658e2724f0a35714c74d171868934b6.tar.gz
SERVER-56374: BACKPORT-8902 Add ability to write retryable findAndModify updates to config.image_collection
-rw-r--r--src/mongo/db/SConscript2
-rw-r--r--src/mongo/db/namespace_string.cpp3
-rw-r--r--src/mongo/db/namespace_string.h3
-rw-r--r--src/mongo/db/op_observer_impl.cpp61
-rw-r--r--src/mongo/db/ops/update.cpp13
-rw-r--r--src/mongo/db/ops/update_result.h2
-rw-r--r--src/mongo/db/repl/SConscript1
-rw-r--r--src/mongo/db/repl/oplog.cpp51
8 files changed, 125 insertions, 11 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 5fc695bb2d7..0736608db0f 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -874,6 +874,7 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/repl/repl_server_parameters',
],
)
@@ -914,6 +915,7 @@ env.Library(
LIBDEPS_PRIVATE=[
'transaction',
'$BUILD_DIR/mongo/db/commands/mongod_fcv',
+ '$BUILD_DIR/mongo/db/repl/image_collection_entry',
],
)
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index 5e87c48e59c..bc333362e7f 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -77,6 +77,9 @@ const NamespaceString NamespaceString::kSystemReplSetNamespace(NamespaceString::
const NamespaceString NamespaceString::kIndexBuildEntryNamespace(NamespaceString::kConfigDb,
"system.indexBuilds");
+const NamespaceString NamespaceString::kConfigImagesNamespace(NamespaceString::kConfigDb,
+ "image_collection");
+
bool NamespaceString::isListCollectionsCursorNS() const {
return coll() == listCollectionsCursorCol;
}
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index f29febc2505..ae05ea7e097 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -101,6 +101,9 @@ public:
// Namespace for index build entries.
static const NamespaceString kIndexBuildEntryNamespace;
+ // Namespace used for storing retryable findAndModify images.
+ static const NamespaceString kConfigImagesNamespace;
+
/**
* Constructs an empty NamespaceString.
*/
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 527d989203f..e0df138b418 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -42,17 +42,21 @@
#include "mongo/db/commands/txn_cmds_gen.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbhelpers.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/logical_time_validator.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/op_observer_util.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/image_collection_entry_gen.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_entry_gen.h"
#include "mongo/db/repl/repl_server_parameters_gen.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/server_options.h"
+#include "mongo/db/session_catalog.h"
#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/storage/durable_catalog.h"
#include "mongo/db/transaction_participant.h"
@@ -167,21 +171,39 @@ struct OpTimeBundle {
Date_t wallClockTime;
};
+void writeToImagesCollection(OperationContext* opCtx,
+ BSONObj image,
+ repl::RetryImageEnum imageKind,
+ Timestamp ts) {
+ repl::ImageEntry imageEntry;
+ invariant(opCtx->getLogicalSessionId());
+ imageEntry.set_id(*opCtx->getLogicalSessionId());
+ imageEntry.setTs(ts);
+ imageEntry.setImage(std::move(image));
+ imageEntry.setImageKind(imageKind);
+ repl::UnreplicatedWritesBlock unreplicated(opCtx);
+ AutoGetCollection imageCollectionRaii(
+ opCtx, NamespaceString::kConfigImagesNamespace, LockMode::MODE_IX);
+ Helpers::upsert(opCtx, NamespaceString::kConfigImagesNamespace.toString(), imageEntry.toBSON());
+}
+
/**
* Write oplog entry(ies) for the update operation.
*/
-OpTimeBundle replLogUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) {
+OpTimeBundle replLogUpdate(OperationContext* opCtx,
+ const OplogUpdateEntryArgs& args,
+ const bool storeImagesInSideCollection) {
BSONObj storeObj;
boost::optional<repl::RetryImageEnum> needsRetryImage;
if (args.updateArgs.storeDocOption == CollectionUpdateArgs::StoreDocOption::PreImage) {
invariant(args.updateArgs.preImageDoc);
storeObj = *args.updateArgs.preImageDoc;
- if (repl::gStoreFindAndModifyImagesInSideCollection.load() && opCtx->getTxnNumber()) {
+ if (storeImagesInSideCollection && opCtx->getTxnNumber()) {
needsRetryImage = repl::RetryImageEnum::kPreImage;
}
} else if (args.updateArgs.storeDocOption == CollectionUpdateArgs::StoreDocOption::PostImage) {
storeObj = args.updateArgs.updatedDoc;
- if (repl::gStoreFindAndModifyImagesInSideCollection.load() && opCtx->getTxnNumber()) {
+ if (storeImagesInSideCollection && opCtx->getTxnNumber()) {
needsRetryImage = repl::RetryImageEnum::kPostImage;
}
}
@@ -199,8 +221,7 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs&
OpTimeBundle opTimes;
opTimes.wallClockTime = getWallClockTimeForOpLog(opCtx);
- if (!repl::gStoreFindAndModifyImagesInSideCollection.load() && !storeObj.isEmpty() &&
- opCtx->getTxnNumber()) {
+ if (!storeImagesInSideCollection && !storeObj.isEmpty() && opCtx->getTxnNumber()) {
auto noteUpdateOpTime = logOperation(opCtx,
"n",
args.nss,
@@ -250,7 +271,8 @@ OpTimeBundle replLogDelete(OperationContext* opCtx,
OptionalCollectionUUID uuid,
StmtId stmtId,
bool fromMigrate,
- const boost::optional<BSONObj>& deletedDoc) {
+ const boost::optional<BSONObj>& deletedDoc,
+ const bool storeImagesInSideCollection) {
OperationSessionInfo sessionInfo;
repl::OplogLink oplogLink;
@@ -266,7 +288,7 @@ OpTimeBundle replLogDelete(OperationContext* opCtx,
boost::optional<repl::RetryImageEnum> needsRetryImage;
if (deletedDoc && opCtx->getTxnNumber()) {
- if (repl::gStoreFindAndModifyImagesInSideCollection.load()) {
+ if (storeImagesInSideCollection) {
needsRetryImage = repl::RetryImageEnum::kPreImage;
} else {
auto noteOplog = logOperation(opCtx,
@@ -575,6 +597,8 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
auto txnParticipant = TransactionParticipant::get(opCtx);
const bool inMultiDocumentTransaction =
txnParticipant && opCtx->writesAreReplicated() && txnParticipant.transactionIsOpen();
+ // Load only once to avoid race.
+ const bool storeImagesInSideCollection = repl::gStoreFindAndModifyImagesInSideCollection.load();
OpTimeBundle opTime;
if (inMultiDocumentTransaction) {
@@ -582,7 +606,23 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
args.nss, args.uuid, args.updateArgs.update, args.updateArgs.criteria);
txnParticipant.addTransactionOperation(opCtx, operation);
} else {
- opTime = replLogUpdate(opCtx, args);
+ opTime = replLogUpdate(opCtx, args, storeImagesInSideCollection);
+ if (storeImagesInSideCollection && opCtx->getTxnNumber() &&
+ args.updateArgs.storeDocOption != CollectionUpdateArgs::StoreDocOption::None) {
+ BSONObj imageDoc;
+ repl::RetryImageEnum imageKind;
+ if (args.updateArgs.storeDocOption == CollectionUpdateArgs::StoreDocOption::PreImage) {
+ invariant(args.updateArgs.preImageDoc);
+ imageDoc = *args.updateArgs.preImageDoc;
+ imageKind = repl::RetryImageEnum::kPreImage;
+ } else {
+ invariant(args.updateArgs.storeDocOption ==
+ CollectionUpdateArgs::StoreDocOption::PostImage);
+ imageDoc = args.updateArgs.updatedDoc;
+ imageKind = repl::RetryImageEnum::kPostImage;
+ }
+ writeToImagesCollection(opCtx, imageDoc, imageKind, opTime.writeOpTime.getTimestamp());
+ }
SessionTxnRecord sessionTxnRecord;
sessionTxnRecord.setLastWriteOpTime(opTime.writeOpTime);
sessionTxnRecord.setLastWriteDate(opTime.wallClockTime);
@@ -635,6 +675,8 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
auto txnParticipant = TransactionParticipant::get(opCtx);
const bool inMultiDocumentTransaction =
txnParticipant && opCtx->writesAreReplicated() && txnParticipant.transactionIsOpen();
+ // Load only once to avoid race.
+ const bool storeImagesInSideCollection = repl::gStoreFindAndModifyImagesInSideCollection.load();
OpTimeBundle opTime;
if (inMultiDocumentTransaction) {
@@ -643,7 +685,8 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
} else {
boost::optional<BSONObj> deletedDoc =
args.deletedDoc ? boost::optional<BSONObj>(*(args.deletedDoc)) : boost::none;
- opTime = replLogDelete(opCtx, nss, uuid, stmtId, args.fromMigrate, deletedDoc);
+ opTime = replLogDelete(
+ opCtx, nss, uuid, stmtId, args.fromMigrate, deletedDoc, storeImagesInSideCollection);
SessionTxnRecord sessionTxnRecord;
sessionTxnRecord.setLastWriteOpTime(opTime.writeOpTime);
sessionTxnRecord.setLastWriteDate(opTime.wallClockTime);
diff --git a/src/mongo/db/ops/update.cpp b/src/mongo/db/ops/update.cpp
index a600f37a543..4b92274f55a 100644
--- a/src/mongo/db/ops/update.cpp
+++ b/src/mongo/db/ops/update.cpp
@@ -99,11 +99,22 @@ UpdateResult update(OperationContext* opCtx, Database* db, const UpdateRequest&
OpDebug* const nullOpDebug = nullptr;
auto exec = uassertStatusOK(getExecutorUpdate(opCtx, nullOpDebug, collection, &parsedUpdate));
+ BSONObj docImage;
+ PlanExecutor::ExecState state = PlanExecutor::ADVANCED;
+ if (request.shouldReturnAnyDocs()) {
+ state = exec->getNext(&docImage, nullptr);
+ }
+
+ while (state == PlanExecutor::ADVANCED) {
+ state = exec->getNext(nullptr, nullptr);
+ }
uassertStatusOK(exec->executePlan());
const UpdateStats* updateStats = UpdateStage::getUpdateStats(exec.get());
- return UpdateStage::makeUpdateResult(updateStats);
+ UpdateResult result = UpdateStage::makeUpdateResult(updateStats);
+ result.requestedDocImage = docImage.getOwned();
+ return result;
}
BSONObj applyUpdateOperators(OperationContext* opCtx,
diff --git a/src/mongo/db/ops/update_result.h b/src/mongo/db/ops/update_result.h
index f50be92037d..feb3a756d4d 100644
--- a/src/mongo/db/ops/update_result.h
+++ b/src/mongo/db/ops/update_result.h
@@ -56,6 +56,8 @@ struct UpdateResult {
// if something was upserted, the new _id of the object
BSONObj upserted;
+
+ BSONObj requestedDocImage;
};
} // namespace mongo
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 639e1db964a..b7c03be5476 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -40,6 +40,7 @@ env.Library(
LIBDEPS_PRIVATE=[
'dbcheck',
'local_oplog_info',
+ 'image_collection_entry',
'repl_coordinator_interface',
'repl_settings',
'timestamp_block',
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 04b693f8285..8e842732f86 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -72,6 +72,7 @@
#include "mongo/db/repl/apply_ops.h"
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/dbcheck.h"
+#include "mongo/db/repl/image_collection_entry_gen.h"
#include "mongo/db/repl/local_oplog_info.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/repl_client_info.h"
@@ -329,6 +330,38 @@ void createIndexForApplyOps(OperationContext* opCtx,
}
}
+void writeToImageCollection(OperationContext* opCtx,
+ const BSONObj& op,
+ const BSONObj& image,
+ repl::RetryImageEnum imageKind,
+ bool* upsertConfigImage) {
+ AutoGetCollection autoColl(opCtx, NamespaceString::kConfigImagesNamespace, LockMode::MODE_IX);
+ repl::ImageEntry imageEntry;
+ LogicalSessionId sessionId =
+ LogicalSessionId::parse(IDLParserErrorContext("ParseSessionIdWhenWritingToImageCollection"),
+ op.getField(OplogEntryBase::kSessionIdFieldName).Obj());
+ imageEntry.set_id(sessionId);
+ imageEntry.setTs(op["ts"].timestamp());
+ imageEntry.setImageKind(imageKind);
+ imageEntry.setImage(image);
+
+ UpdateRequest request(NamespaceString::kConfigImagesNamespace);
+ request.setQuery(
+ BSON("_id" << imageEntry.get_id().toBSON() << "ts" << BSON("$lt" << imageEntry.getTs())));
+ request.setUpsert(*upsertConfigImage);
+ request.setUpdateModification(imageEntry.toBSON());
+ request.setFromOplogApplication(true);
+ try {
+ // This code path can also be hit by `applyOps`.
+ repl::UnreplicatedWritesBlock dontReplicate(opCtx);
+ ::mongo::update(opCtx, autoColl.getDb(), request);
+ } catch (const ExceptionFor<ErrorCodes::DuplicateKey>&) {
+ // We can get a duplicate key when two upserts race on inserting a document.
+ *upsertConfigImage = false;
+ throw WriteConflictException();
+ }
+}
+
namespace {
/**
@@ -1656,6 +1689,17 @@ Status applyOperation_inlock(OperationContext* opCtx,
request.setUpdateModification(o);
request.setUpsert(upsert);
request.setFromOplogApplication(true);
+ boost::optional<repl::RetryImageEnum> imageKind;
+ if (op.hasField(OplogEntryBase::kNeedsRetryImageFieldName)) {
+ imageKind = repl::RetryImage_parse(
+ IDLParserErrorContext("applyUpdate"),
+ op.getField(OplogEntryBase::kNeedsRetryImageFieldName).String());
+ if (imageKind == repl::RetryImageEnum::kPreImage) {
+ request.setReturnDocs(UpdateRequest::ReturnDocOption::RETURN_OLD);
+ } else if (imageKind == repl::RetryImageEnum::kPostImage) {
+ request.setReturnDocs(UpdateRequest::ReturnDocOption::RETURN_NEW);
+ }
+ }
Timestamp timestamp;
if (assignOperationTimestamp) {
@@ -1663,6 +1707,7 @@ Status applyOperation_inlock(OperationContext* opCtx,
}
const StringData ns = fieldNs.valueStringDataSafe();
+ bool upsertConfigImage = true;
auto status = writeConflictRetry(opCtx, "applyOps_update", ns, [&] {
WriteUnitOfWork wuow(opCtx);
if (timestamp != Timestamp::min()) {
@@ -1708,7 +1753,11 @@ Status applyOperation_inlock(OperationContext* opCtx,
}
}
}
-
+ if (op.hasField(OplogEntryBase::kNeedsRetryImageFieldName)) {
+ invariant(imageKind);
+ writeToImageCollection(
+ opCtx, op, ur.requestedDocImage, *imageKind, &upsertConfigImage);
+ }
wuow.commit();
return Status::OK();
});