author     Svilen Mihaylov <svilen.mihaylov@mongodb.com>    2020-04-30 14:41:39 -0400
committer  Evergreen Agent <no-reply@evergreen.mongodb.com> 2020-05-07 18:01:40 +0000
commit     19cf490787824b918df12c9a932501c0f6e01cda (patch)
tree       97ebe844550821ccb4009ab1de98eaf45c537ec2
parent     a8ad5db5bd820c8a9a851356fa394eecf2d2ea07 (diff)
download   mongo-19cf490787824b918df12c9a932501c0f6e01cda.tar.gz
SERVER-47212: Retry full upsert path when duplicate key exception matches exact query predicate in findAndModify.
-rw-r--r--  jstests/noPassthrough/upsert_duplicate_key_retry_findAndModify.js |  96
-rw-r--r--  src/mongo/db/commands/SConscript                                   |   1
-rw-r--r--  src/mongo/db/commands/find_and_modify.cpp                          | 297
3 files changed, 279 insertions(+), 115 deletions(-)
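
The scenario this commit targets, sketched in mongo shell syntax. The collection name "c" is illustrative and not part of the commit; the behavior mirrors the new jstest below.

    // Two clients issue the same upsert concurrently against a unique index on {x: 1}.
    db.c.createIndex({x: 1}, {unique: true});

    // Executed from two connections at roughly the same time:
    db.c.findAndModify({query: {x: 3}, update: {$inc: {y: 1}}, upsert: true});

    // Neither client finds a match, so both attempt an insert, and one insert fails with
    // a DuplicateKey error on the unique index. With this change, findAndModify retries
    // the full upsert path when the duplicate key matches the exact query predicate, so
    // the losing operation is applied as an update and the collection ends up holding a
    // single document {x: 3, y: 2}.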
diff --git a/jstests/noPassthrough/upsert_duplicate_key_retry_findAndModify.js b/jstests/noPassthrough/upsert_duplicate_key_retry_findAndModify.js
new file mode 100644
index 00000000000..76a4564f0ae
--- /dev/null
+++ b/jstests/noPassthrough/upsert_duplicate_key_retry_findAndModify.js
@@ -0,0 +1,96 @@
+/**
+ * When two concurrent identical upsert operations are performed, for which a unique index exists on
+ * the query values, it is possible that they will both attempt to perform an insert with one of
+ * the two failing on the unique index constraint. This test confirms that the failed insert will be
+ * retried, resulting in an update.
+ *
+ * In order for one of the two conflicting upserts to make progress we require a storage engine
+ * which supports document level locking.
+ * @tags: [requires_replication, requires_document_locking]
+ */
+
+(function() {
+"use strict";
+
+const rst = new ReplSetTest({nodes: 1});
+rst.startSet();
+rst.initiate();
+
+const testDB = rst.getPrimary().getDB("test");
+const adminDB = testDB.getSiblingDB("admin");
+const collName = "upsert_duplicate_key_retry_findAndModify";
+const testColl = testDB.getCollection(collName);
+
+testDB.runCommand({drop: collName});
+
+// Queries current operations until 'count' matching operations are found.
+function awaitMatchingCurrentOpCount(message, count) {
+ assert.soon(() => {
+ const currentOp =
+ adminDB.aggregate([{$currentOp: {}}, {$match: {failpointMsg: message}}]).toArray();
+ return (currentOp.length === count);
+ });
+}
+
+function performUpsert() {
+ // This function is called from startParallelShell(), so closed-over variables will not be
+ // available. We must re-obtain the value of 'testColl' in the function body.
+ const testColl =
+ db.getMongo().getDB("test").getCollection("upsert_duplicate_key_retry_findAndModify");
+ testColl.findAndModify({query: {x: 3}, update: {$inc: {y: 1}}, upsert: true});
+}
+
+assert.commandWorked(testColl.createIndex({x: 1}, {unique: true}));
+
+// Will hang upsert operations just prior to performing an insert.
+assert.commandWorked(testDB.adminCommand(
+ {configureFailPoint: "hangBeforeFindAndModifyPerformsUpdate", mode: "alwaysOn"}));
+
+const awaitUpdate1 = startParallelShell(performUpsert, rst.ports[0]);
+const awaitUpdate2 = startParallelShell(performUpsert, rst.ports[0]);
+
+awaitMatchingCurrentOpCount("hangBeforeFindAndModifyPerformsUpdate", 2);
+
+assert.commandWorked(testDB.adminCommand(
+ {configureFailPoint: "hangBeforeFindAndModifyPerformsUpdate", mode: "off"}));
+
+awaitUpdate1();
+awaitUpdate2();
+
+const cursor = testColl.find({}, {_id: 0});
+assert.eq(cursor.next(), {x: 3, y: 2});
+assert(!cursor.hasNext(), cursor.toArray());
+
+// Confirm that oplog entries exist for both insert and update operation.
+const oplogColl = testDB.getSiblingDB("local").getCollection("oplog.rs");
+assert.eq(
+ 1,
+ oplogColl.find({"op": "i", "ns": "test.upsert_duplicate_key_retry_findAndModify"}).itcount());
+assert.eq(
+ 1,
+ oplogColl.find({"op": "u", "ns": "test.upsert_duplicate_key_retry_findAndModify"}).itcount());
+
+//
+// Confirm DuplicateKey error for cases that should not be retried.
+//
+assert.commandWorked(testDB.runCommand({drop: collName}));
+assert.commandWorked(testColl.createIndex({x: 1}, {unique: true}));
+
+// DuplicateKey error on replacement-style upsert, where the unique index key value to be
+// written does not match the value of the query predicate.
+assert.commandWorked(testColl.insert({_id: 1, 'a': 12345}));
+assert.throws(function() {
+ testColl.findAndModify({query: {x: 3}, update: {}, upsert: true});
+}, []);
+
+// DuplicateKey error on update-style upsert, where the unique index key value to be written
+// does not match the value of the query predicate.
+assert.commandWorked(testColl.remove({}));
+assert.commandWorked(testColl.insert({x: 3}));
+assert.commandWorked(testColl.insert({x: 4}));
+assert.throws(function() {
+ testColl.findAndModify({query: {x: 3}, update: {$inc: {x: 1}}, upsert: true});
+}, []);
+
+rst.stopSet();
+})();
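
For contrast with the retried case above: per the commit title and the second half of the test, the retry only applies when the unique index key value that would be written matches the value of the equality query predicate. A hedged shell sketch of a case that is deliberately not retried (collection name "c" is illustrative):

    // Unique index on {x: 1}; documents {x: 3} and {x: 4} already exist.
    db.c.findAndModify({query: {x: 3}, update: {$inc: {x: 1}}, upsert: true});
    // The update rewrites the unique key to 4 and collides with the existing {x: 4}
    // document. Retrying could never succeed, so the DuplicateKey error is surfaced
    // to the caller (see UpdateStage::shouldRetryDuplicateKeyException in the C++
    // change below).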
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index fc737ef2bf0..c5faad444fe 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -309,6 +309,7 @@ env.Library(
'$BUILD_DIR/mongo/db/storage/storage_engine_common',
'$BUILD_DIR/mongo/db/transaction',
'$BUILD_DIR/mongo/db/views/views_mongod',
+ '$BUILD_DIR/mongo/util/log_and_backoff',
'$BUILD_DIR/mongo/util/net/http_client',
'core',
'current_op_common',
diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp
index de384efe645..dbb33df7899 100644
--- a/src/mongo/db/commands/find_and_modify.cpp
+++ b/src/mongo/db/commands/find_and_modify.cpp
@@ -42,6 +42,7 @@
#include "mongo/db/commands.h"
#include "mongo/db/commands/find_and_modify_common.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/curop_failpoint_helpers.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/exec/delete.h"
#include "mongo/db/exec/update_stage.h"
@@ -68,12 +69,17 @@
#include "mongo/db/retryable_writes_stats.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/stats/top.h"
+#include "mongo/db/storage/duplicate_key_error_info.h"
#include "mongo/db/transaction_participant.h"
#include "mongo/db/write_concern.h"
#include "mongo/logv2/log.h"
+#include "mongo/util/log_and_backoff.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
+
+MONGO_FAIL_POINT_DEFINE(hangBeforeFindAndModifyPerformsUpdate);
+
namespace {
/**
@@ -362,155 +368,216 @@ public:
// return the document under concurrency, if a matching document exists.
return writeConflictRetry(opCtx, "findAndModify", nsString.ns(), [&] {
if (args.isRemove()) {
- auto request = DeleteRequest{};
- request.setNsString(nsString);
- const bool isExplain = false;
- makeDeleteRequest(opCtx, args, isExplain, &request);
+ return writeConflictRetryRemove(
+ opCtx, nsString, args, stmtId, curOp, opDebug, inTransaction, result);
+ } else {
+ if (MONGO_unlikely(hangBeforeFindAndModifyPerformsUpdate.shouldFail())) {
+ CurOpFailpointHelpers::waitWhileFailPointEnabled(
+ &hangBeforeFindAndModifyPerformsUpdate,
+ opCtx,
+ "hangBeforeFindAndModifyPerformsUpdate");
+ }
+
+ // Nested retry loop to handle concurrent conflicting upserts with equality match.
+ int retryAttempts = 0;
+ for (;;) {
+ auto request = UpdateRequest();
+ request.setNamespaceString(nsString);
+ const bool isExplain = false;
+ makeUpdateRequest(opCtx, args, isExplain, &request);
- if (opCtx->getTxnNumber()) {
- request.setStmtId(stmtId);
+ if (opCtx->getTxnNumber()) {
+ request.setStmtId(stmtId);
+ }
+
+ const ExtensionsCallbackReal extensionsCallback(opCtx,
+ &request.getNamespaceString());
+ ParsedUpdate parsedUpdate(opCtx, &request, extensionsCallback);
+ uassertStatusOK(parsedUpdate.parseRequest());
+
+ try {
+ return writeConflictRetryUpsert(opCtx,
+ nsString,
+ args,
+ curOp,
+ opDebug,
+ inTransaction,
+ &parsedUpdate,
+ result);
+ } catch (const ExceptionFor<ErrorCodes::DuplicateKey>& ex) {
+ if (!parsedUpdate.hasParsedQuery()) {
+ uassertStatusOK(parsedUpdate.parseQueryToCQ());
+ }
+
+ if (!UpdateStage::shouldRetryDuplicateKeyException(
+ parsedUpdate, *ex.extraInfo<DuplicateKeyErrorInfo>())) {
+ throw;
+ }
+
+ ++retryAttempts;
+ logAndBackoff(4721200,
+ ::mongo::logv2::LogComponent::kWrite,
+ logv2::LogSeverity::Debug(1),
+ retryAttempts,
+ "Caught DuplicateKey exception during findAndModify upsert",
+ "namespace"_attr = nsString.ns());
+ }
}
+ }
- ParsedDelete parsedDelete(opCtx, &request);
- uassertStatusOK(parsedDelete.parseRequest());
+ return true;
+ });
+ }
- AutoGetCollection autoColl(opCtx, nsString, MODE_IX);
+ static bool writeConflictRetryRemove(OperationContext* opCtx,
+ const NamespaceString& nsString,
+ const FindAndModifyRequest& args,
+ const int stmtId,
+ CurOp* const curOp,
+ OpDebug* const opDebug,
+ const bool inTransaction,
+ BSONObjBuilder& result) {
+ auto request = DeleteRequest{};
+ request.setNsString(nsString);
+ const bool isExplain = false;
+ makeDeleteRequest(opCtx, args, isExplain, &request);
+
+ if (opCtx->getTxnNumber()) {
+ request.setStmtId(stmtId);
+ }
- {
- boost::optional<int> dbProfilingLevel;
- if (autoColl.getDb())
- dbProfilingLevel = autoColl.getDb()->getProfilingLevel();
+ ParsedDelete parsedDelete(opCtx, &request);
+ uassertStatusOK(parsedDelete.parseRequest());
- stdx::lock_guard<Client> lk(*opCtx->getClient());
- CurOp::get(opCtx)->enter_inlock(nsString.ns().c_str(), dbProfilingLevel);
- }
+ AutoGetCollection autoColl(opCtx, nsString, MODE_IX);
- assertCanWrite(opCtx, nsString);
+ {
+ boost::optional<int> dbProfilingLevel;
+ if (autoColl.getDb())
+ dbProfilingLevel = autoColl.getDb()->getProfilingLevel();
- Collection* const collection = autoColl.getCollection();
- checkIfTransactionOnCappedColl(collection, inTransaction);
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ CurOp::get(opCtx)->enter_inlock(nsString.ns().c_str(), dbProfilingLevel);
+ }
- const auto exec = uassertStatusOK(getExecutorDelete(
- opDebug, collection, &parsedDelete, boost::none /* verbosity */));
+ assertCanWrite(opCtx, nsString);
- {
- stdx::lock_guard<Client> lk(*opCtx->getClient());
- CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get()));
- }
+ Collection* const collection = autoColl.getCollection();
+ checkIfTransactionOnCappedColl(collection, inTransaction);
- auto docFound = advanceExecutor(opCtx, exec.get(), args.isRemove());
- // Nothing after advancing the plan executor should throw a WriteConflictException,
- // so the following bookkeeping with execution stats won't end up being done
- // multiple times.
+ const auto exec = uassertStatusOK(
+ getExecutorDelete(opDebug, collection, &parsedDelete, boost::none /* verbosity */));
- PlanSummaryStats summaryStats;
- Explain::getSummaryStats(*exec, &summaryStats);
- if (collection) {
- CollectionQueryInfo::get(collection).notifyOfQuery(opCtx, summaryStats);
- }
- opDebug->setPlanSummaryMetrics(summaryStats);
+ {
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get()));
+ }
- // Fill out OpDebug with the number of deleted docs.
- opDebug->additiveMetrics.ndeleted = DeleteStage::getNumDeleted(*exec);
+ auto docFound = advanceExecutor(opCtx, exec.get(), args.isRemove());
+ // Nothing after advancing the plan executor should throw a WriteConflictException,
+ // so the following bookkeeping with execution stats won't end up being done
+ // multiple times.
- if (curOp->shouldDBProfile()) {
- BSONObjBuilder execStatsBob;
- Explain::getWinningPlanStats(exec.get(), &execStatsBob);
- curOp->debug().execStats = execStatsBob.obj();
- }
- recordStatsForTopCommand(opCtx);
+ PlanSummaryStats summaryStats;
+ Explain::getSummaryStats(*exec, &summaryStats);
+ if (collection) {
+ CollectionQueryInfo::get(collection).notifyOfQuery(opCtx, summaryStats);
+ }
+ opDebug->setPlanSummaryMetrics(summaryStats);
- appendCommandResponse(exec.get(), args.isRemove(), docFound, &result);
- } else {
- auto request = UpdateRequest();
- request.setNamespaceString(nsString);
- const bool isExplain = false;
- makeUpdateRequest(opCtx, args, isExplain, &request);
+ // Fill out OpDebug with the number of deleted docs.
+ opDebug->additiveMetrics.ndeleted = DeleteStage::getNumDeleted(*exec);
- if (opCtx->getTxnNumber()) {
- request.setStmtId(stmtId);
- }
+ if (curOp->shouldDBProfile()) {
+ BSONObjBuilder execStatsBob;
+ Explain::getWinningPlanStats(exec.get(), &execStatsBob);
+ curOp->debug().execStats = execStatsBob.obj();
+ }
+ recordStatsForTopCommand(opCtx);
- const ExtensionsCallbackReal extensionsCallback(opCtx,
- &request.getNamespaceString());
- ParsedUpdate parsedUpdate(opCtx, &request, extensionsCallback);
- uassertStatusOK(parsedUpdate.parseRequest());
+ appendCommandResponse(exec.get(), args.isRemove(), docFound, &result);
- AutoGetCollection autoColl(opCtx, nsString, MODE_IX);
- Database* db = autoColl.ensureDbExists();
+ return true;
+ }
- {
- boost::optional<int> dbProfilingLevel;
- dbProfilingLevel = db->getProfilingLevel();
+ static bool writeConflictRetryUpsert(OperationContext* opCtx,
+ const NamespaceString& nsString,
+ const FindAndModifyRequest& args,
+ CurOp* const curOp,
+ OpDebug* const opDebug,
+ const bool inTransaction,
+ ParsedUpdate* parsedUpdate,
+ BSONObjBuilder& result) {
+ AutoGetCollection autoColl(opCtx, nsString, MODE_IX);
+ Database* db = autoColl.ensureDbExists();
+
+ {
+ boost::optional<int> dbProfilingLevel;
+ dbProfilingLevel = db->getProfilingLevel();
+
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ CurOp::get(opCtx)->enter_inlock(nsString.ns().c_str(), dbProfilingLevel);
+ }
- stdx::lock_guard<Client> lk(*opCtx->getClient());
- CurOp::get(opCtx)->enter_inlock(nsString.ns().c_str(), dbProfilingLevel);
- }
+ assertCanWrite(opCtx, nsString);
- assertCanWrite(opCtx, nsString);
+ Collection* collection = autoColl.getCollection();
- Collection* collection = autoColl.getCollection();
+ // Create the collection if it does not exist when performing an upsert because the
+ // update stage does not create its own collection
+ if (!collection && args.isUpsert()) {
+ assertCanWrite(opCtx, nsString);
- // Create the collection if it does not exist when performing an upsert because the
- // update stage does not create its own collection
- if (!collection && args.isUpsert()) {
- assertCanWrite(opCtx, nsString);
+ collection = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nsString);
- collection =
- CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nsString);
+ // If someone else beat us to creating the collection, do nothing
+ if (!collection) {
+ uassertStatusOK(userAllowedCreateNS(nsString.db(), nsString.coll()));
+ WriteUnitOfWork wuow(opCtx);
+ CollectionOptions defaultCollectionOptions;
+ uassertStatusOK(db->userCreateNS(opCtx, nsString, defaultCollectionOptions));
+ wuow.commit();
- // If someone else beat us to creating the collection, do nothing
- if (!collection) {
- uassertStatusOK(userAllowedCreateNS(nsString.db(), nsString.coll()));
- WriteUnitOfWork wuow(opCtx);
- CollectionOptions defaultCollectionOptions;
- uassertStatusOK(
- db->userCreateNS(opCtx, nsString, defaultCollectionOptions));
- wuow.commit();
+ collection =
+ CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nsString);
+ }
- collection = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(
- opCtx, nsString);
- }
+ invariant(collection);
+ }
- invariant(collection);
- }
+ checkIfTransactionOnCappedColl(collection, inTransaction);
- checkIfTransactionOnCappedColl(collection, inTransaction);
+ const auto exec = uassertStatusOK(
+ getExecutorUpdate(opDebug, collection, parsedUpdate, boost::none /* verbosity */));
- const auto exec = uassertStatusOK(getExecutorUpdate(
- opDebug, collection, &parsedUpdate, boost::none /* verbosity */));
+ {
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get()));
+ }
- {
- stdx::lock_guard<Client> lk(*opCtx->getClient());
- CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get()));
- }
+ auto docFound = advanceExecutor(opCtx, exec.get(), args.isRemove());
+ // Nothing after advancing the plan executor should throw a WriteConflictException,
+ // so the following bookkeeping with execution stats won't end up being done
+ // multiple times.
- auto docFound = advanceExecutor(opCtx, exec.get(), args.isRemove());
- // Nothing after advancing the plan executor should throw a WriteConflictException,
- // so the following bookkeeping with execution stats won't end up being done
- // multiple times.
+ PlanSummaryStats summaryStats;
+ Explain::getSummaryStats(*exec, &summaryStats);
+ if (collection) {
+ CollectionQueryInfo::get(collection).notifyOfQuery(opCtx, summaryStats);
+ }
+ UpdateStage::recordUpdateStatsInOpDebug(UpdateStage::getUpdateStats(exec.get()), opDebug);
+ opDebug->setPlanSummaryMetrics(summaryStats);
- PlanSummaryStats summaryStats;
- Explain::getSummaryStats(*exec, &summaryStats);
- if (collection) {
- CollectionQueryInfo::get(collection).notifyOfQuery(opCtx, summaryStats);
- }
- UpdateStage::recordUpdateStatsInOpDebug(UpdateStage::getUpdateStats(exec.get()),
- opDebug);
- opDebug->setPlanSummaryMetrics(summaryStats);
-
- if (curOp->shouldDBProfile()) {
- BSONObjBuilder execStatsBob;
- Explain::getWinningPlanStats(exec.get(), &execStatsBob);
- curOp->debug().execStats = execStatsBob.obj();
- }
- recordStatsForTopCommand(opCtx);
+ if (curOp->shouldDBProfile()) {
+ BSONObjBuilder execStatsBob;
+ Explain::getWinningPlanStats(exec.get(), &execStatsBob);
+ curOp->debug().execStats = execStatsBob.obj();
+ }
+ recordStatsForTopCommand(opCtx);
- appendCommandResponse(exec.get(), args.isRemove(), docFound, &result);
- }
+ appendCommandResponse(exec.get(), args.isRemove(), docFound, &result);
- return true;
- });
+ return true;
}
void appendMirrorableRequest(BSONObjBuilder* bob, const BSONObj& cmdObj) const override {