| author | Svilen Mihaylov <svilen.mihaylov@mongodb.com> | 2020-04-30 14:41:39 -0400 |
|---|---|---|
| committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-05-07 18:01:40 +0000 |
| commit | 19cf490787824b918df12c9a932501c0f6e01cda (patch) | |
| tree | 97ebe844550821ccb4009ab1de98eaf45c537ec2 | |
| parent | a8ad5db5bd820c8a9a851356fa394eecf2d2ea07 (diff) | |
| download | mongo-19cf490787824b918df12c9a932501c0f6e01cda.tar.gz | |
SERVER-47212: Retry full upsert path when duplicate key exception matches exact query predicate in findAndModify.
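The race this change addresses can be sketched in the mongo shell. The sketch below is illustrative only and is not part of the commit; the collection and variable names are invented, and it assumes a shell connected to a mongod whose storage engine supports document-level locking. When a unique index exists on the queried field, two concurrent identical upserts may both decide to insert, and the loser previously surfaced a DuplicateKey error to the client. With this change, findAndModify retries its full upsert path and completes as an update whenever the duplicate key matches the exact query predicate.

```javascript
// Illustrative sketch only, not part of this commit. Assumes a mongo shell connected to a
// mongod; 'upsert_retry_demo' is a made-up collection name.
const coll = db.getSiblingDB("test").upsert_retry_demo;
coll.drop();
assert.commandWorked(coll.createIndex({x: 1}, {unique: true}));

// Two identical upserts race on the same non-existent key. Each may decide to insert; the loser
// hits the unique-index constraint. Without the failpoint used by the new test the collision is
// timing-dependent, so this sketch may simply run the two upserts back to back.
const upsert = function() {
    db.getSiblingDB("test").upsert_retry_demo.findAndModify(
        {query: {x: 3}, update: {$inc: {y: 1}}, upsert: true});
};
const join1 = startParallelShell(upsert);
const join2 = startParallelShell(upsert);
join1();
join2();

// With the retry in place, both operations apply: one insert followed by one update.
printjson(coll.findOne({x: 3}));  // expected: {_id: ..., x: 3, y: 2}
```

The new jstest below makes the same scenario deterministic by parking both upserts on a failpoint before releasing them together.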
| mode | path | lines changed |
|---|---|---|
| -rw-r--r-- | jstests/noPassthrough/upsert_duplicate_key_retry_findAndModify.js | 96 |
| -rw-r--r-- | src/mongo/db/commands/SConscript | 1 |
| -rw-r--r-- | src/mongo/db/commands/find_and_modify.cpp | 297 |

3 files changed, 279 insertions, 115 deletions
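The new test drives the race deterministically using the failpoint introduced by this patch, `hangBeforeFindAndModifyPerformsUpdate`, together with a `$currentOp` poll. That synchronization pattern is isolated below as a hedged sketch (only the failpoint name comes from this commit; the rest are standard shell and test helpers); the full test follows in the diff.

```javascript
// Park upserts just before they perform the update/insert, wait until two operations are
// blocked on the failpoint, then release them so both proceed concurrently.
assert.commandWorked(db.adminCommand(
    {configureFailPoint: "hangBeforeFindAndModifyPerformsUpdate", mode: "alwaysOn"}));

// ... start the two parallel upserts here (see the jstest below) ...

assert.soon(() => {
    const waiting = db.getSiblingDB("admin")
                        .aggregate([
                            {$currentOp: {}},
                            {$match: {failpointMsg: "hangBeforeFindAndModifyPerformsUpdate"}}
                        ])
                        .toArray();
    return waiting.length === 2;
});

assert.commandWorked(db.adminCommand(
    {configureFailPoint: "hangBeforeFindAndModifyPerformsUpdate", mode: "off"}));
```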
```diff
diff --git a/jstests/noPassthrough/upsert_duplicate_key_retry_findAndModify.js b/jstests/noPassthrough/upsert_duplicate_key_retry_findAndModify.js
new file mode 100644
index 00000000000..76a4564f0ae
--- /dev/null
+++ b/jstests/noPassthrough/upsert_duplicate_key_retry_findAndModify.js
@@ -0,0 +1,96 @@
+/**
+ * When two concurrent identical upsert operations are performed, for which a unique index exists on
+ * the query values, it is possible that they will both attempt to perform an insert with one of
+ * the two failing on the unique index constraint. This test confirms that the failed insert will be
+ * retried, resulting in an update.
+ *
+ * In order for one of the two conflicting upserts to make progress we require a storage engine
+ * which supports document level locking.
+ * @tags: [requires_replication, requires_document_locking]
+ */
+
+(function() {
+"use strict";
+
+const rst = new ReplSetTest({nodes: 1});
+rst.startSet();
+rst.initiate();
+
+const testDB = rst.getPrimary().getDB("test");
+const adminDB = testDB.getSiblingDB("admin");
+const collName = "upsert_duplicate_key_retry_findAndModify";
+const testColl = testDB.getCollection(collName);
+
+testDB.runCommand({drop: collName});
+
+// Queries current operations until 'count' matching operations are found.
+function awaitMatchingCurrentOpCount(message, count) {
+    assert.soon(() => {
+        const currentOp =
+            adminDB.aggregate([{$currentOp: {}}, {$match: {failpointMsg: message}}]).toArray();
+        return (currentOp.length === count);
+    });
+}
+
+function performUpsert() {
+    // This function is called from startParallelShell(), so closed-over variables will not be
+    // available. We must re-obtain the value of 'testColl' in the function body.
+    const testColl =
+        db.getMongo().getDB("test").getCollection("upsert_duplicate_key_retry_findAndModify");
+    testColl.findAndModify({query: {x: 3}, update: {$inc: {y: 1}}, upsert: true});
+}
+
+assert.commandWorked(testColl.createIndex({x: 1}, {unique: true}));
+
+// Will hang upsert operations just prior to performing an insert.
+assert.commandWorked(testDB.adminCommand(
+    {configureFailPoint: "hangBeforeFindAndModifyPerformsUpdate", mode: "alwaysOn"}));
+
+const awaitUpdate1 = startParallelShell(performUpsert, rst.ports[0]);
+const awaitUpdate2 = startParallelShell(performUpsert, rst.ports[0]);
+
+awaitMatchingCurrentOpCount("hangBeforeFindAndModifyPerformsUpdate", 2);
+
+assert.commandWorked(testDB.adminCommand(
+    {configureFailPoint: "hangBeforeFindAndModifyPerformsUpdate", mode: "off"}));
+
+awaitUpdate1();
+awaitUpdate2();
+
+const cursor = testColl.find({}, {_id: 0});
+assert.eq(cursor.next(), {x: 3, y: 2});
+assert(!cursor.hasNext(), cursor.toArray());
+
+// Confirm that oplog entries exist for both insert and update operation.
+const oplogColl = testDB.getSiblingDB("local").getCollection("oplog.rs");
+assert.eq(
+    1,
+    oplogColl.find({"op": "i", "ns": "test.upsert_duplicate_key_retry_findAndModify"}).itcount());
+assert.eq(
+    1,
+    oplogColl.find({"op": "u", "ns": "test.upsert_duplicate_key_retry_findAndModify"}).itcount());
+
+//
+// Confirm DuplicateKey error for cases that should not be retried.
+//
+assert.commandWorked(testDB.runCommand({drop: collName}));
+assert.commandWorked(testColl.createIndex({x: 1}, {unique: true}));
+
+// DuplicateKey error on replacement-style upsert, where the unique index key value to be
+// written does not match the value of the query predicate.
+assert.commandWorked(testColl.insert({_id: 1, 'a': 12345}));
+assert.throws(function() {
+    testColl.findAndModify({query: {x: 3}, update: {}, upsert: true});
+}, []);
+
+// DuplicateKey error on update-style upsert, where the unique index key value to be written
+// does not match the value of the query predicate.
+assert.commandWorked(testColl.remove({}));
+assert.commandWorked(testColl.insert({x: 3}));
+assert.commandWorked(testColl.insert({x: 4}));
+assert.throws(function() {
+    testColl.findAndModify({query: {x: 3}, update: {$inc: {x: 1}}, upsert: true});
+}, []);
+
+rst.stopSet();
+})();
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index fc737ef2bf0..c5faad444fe 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -309,6 +309,7 @@ env.Library(
         '$BUILD_DIR/mongo/db/storage/storage_engine_common',
         '$BUILD_DIR/mongo/db/transaction',
         '$BUILD_DIR/mongo/db/views/views_mongod',
+        '$BUILD_DIR/mongo/util/log_and_backoff',
         '$BUILD_DIR/mongo/util/net/http_client',
         'core',
         'current_op_common',
diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp
index de384efe645..dbb33df7899 100644
--- a/src/mongo/db/commands/find_and_modify.cpp
+++ b/src/mongo/db/commands/find_and_modify.cpp
@@ -42,6 +42,7 @@
 #include "mongo/db/commands.h"
 #include "mongo/db/commands/find_and_modify_common.h"
 #include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/curop_failpoint_helpers.h"
 #include "mongo/db/db_raii.h"
 #include "mongo/db/exec/delete.h"
 #include "mongo/db/exec/update_stage.h"
@@ -68,12 +69,17 @@
 #include "mongo/db/retryable_writes_stats.h"
 #include "mongo/db/s/collection_sharding_state.h"
 #include "mongo/db/stats/top.h"
+#include "mongo/db/storage/duplicate_key_error_info.h"
 #include "mongo/db/transaction_participant.h"
 #include "mongo/db/write_concern.h"
 #include "mongo/logv2/log.h"
+#include "mongo/util/log_and_backoff.h"
 #include "mongo/util/scopeguard.h"

 namespace mongo {
+
+MONGO_FAIL_POINT_DEFINE(hangBeforeFindAndModifyPerformsUpdate);
+
 namespace {

 /**
@@ -362,155 +368,216 @@ public:
         // return the document under concurrency, if a matching document exists.
         return writeConflictRetry(opCtx, "findAndModify", nsString.ns(), [&] {
             if (args.isRemove()) {
-                auto request = DeleteRequest{};
-                request.setNsString(nsString);
-                const bool isExplain = false;
-                makeDeleteRequest(opCtx, args, isExplain, &request);
+                return writeConflictRetryRemove(
+                    opCtx, nsString, args, stmtId, curOp, opDebug, inTransaction, result);
+            } else {
+                if (MONGO_unlikely(hangBeforeFindAndModifyPerformsUpdate.shouldFail())) {
+                    CurOpFailpointHelpers::waitWhileFailPointEnabled(
+                        &hangBeforeFindAndModifyPerformsUpdate,
+                        opCtx,
+                        "hangBeforeFindAndModifyPerformsUpdate");
+                }
+
+                // Nested retry loop to handle concurrent conflicting upserts with equality match.
+                int retryAttempts = 0;
+                for (;;) {
+                    auto request = UpdateRequest();
+                    request.setNamespaceString(nsString);
+                    const bool isExplain = false;
+                    makeUpdateRequest(opCtx, args, isExplain, &request);

-                if (opCtx->getTxnNumber()) {
-                    request.setStmtId(stmtId);
+                    if (opCtx->getTxnNumber()) {
+                        request.setStmtId(stmtId);
+                    }
+
+                    const ExtensionsCallbackReal extensionsCallback(opCtx,
+                                                                    &request.getNamespaceString());
+                    ParsedUpdate parsedUpdate(opCtx, &request, extensionsCallback);
+                    uassertStatusOK(parsedUpdate.parseRequest());
+
+                    try {
+                        return writeConflictRetryUpsert(opCtx,
+                                                        nsString,
+                                                        args,
+                                                        curOp,
+                                                        opDebug,
+                                                        inTransaction,
+                                                        &parsedUpdate,
+                                                        result);
+                    } catch (const ExceptionFor<ErrorCodes::DuplicateKey>& ex) {
+                        if (!parsedUpdate.hasParsedQuery()) {
+                            uassertStatusOK(parsedUpdate.parseQueryToCQ());
+                        }
+
+                        if (!UpdateStage::shouldRetryDuplicateKeyException(
+                                parsedUpdate, *ex.extraInfo<DuplicateKeyErrorInfo>())) {
+                            throw;
+                        }
+
+                        ++retryAttempts;
+                        logAndBackoff(4721200,
+                                      ::mongo::logv2::LogComponent::kWrite,
+                                      logv2::LogSeverity::Debug(1),
+                                      retryAttempts,
+                                      "Caught DuplicateKey exception during findAndModify upsert",
+                                      "namespace"_attr = nsString.ns());
+                    }
                 }
+            }

-                ParsedDelete parsedDelete(opCtx, &request);
-                uassertStatusOK(parsedDelete.parseRequest());
+            return true;
+        });
+    }

-                AutoGetCollection autoColl(opCtx, nsString, MODE_IX);
+    static bool writeConflictRetryRemove(OperationContext* opCtx,
+                                         const NamespaceString& nsString,
+                                         const FindAndModifyRequest& args,
+                                         const int stmtId,
+                                         CurOp* const curOp,
+                                         OpDebug* const opDebug,
+                                         const bool inTransaction,
+                                         BSONObjBuilder& result) {
+        auto request = DeleteRequest{};
+        request.setNsString(nsString);
+        const bool isExplain = false;
+        makeDeleteRequest(opCtx, args, isExplain, &request);
+
+        if (opCtx->getTxnNumber()) {
+            request.setStmtId(stmtId);
+        }

-                {
-                    boost::optional<int> dbProfilingLevel;
-                    if (autoColl.getDb())
-                        dbProfilingLevel = autoColl.getDb()->getProfilingLevel();
+        ParsedDelete parsedDelete(opCtx, &request);
+        uassertStatusOK(parsedDelete.parseRequest());

-                    stdx::lock_guard<Client> lk(*opCtx->getClient());
-                    CurOp::get(opCtx)->enter_inlock(nsString.ns().c_str(), dbProfilingLevel);
-                }
+        AutoGetCollection autoColl(opCtx, nsString, MODE_IX);

-                assertCanWrite(opCtx, nsString);
+        {
+            boost::optional<int> dbProfilingLevel;
+            if (autoColl.getDb())
+                dbProfilingLevel = autoColl.getDb()->getProfilingLevel();

-                Collection* const collection = autoColl.getCollection();
-                checkIfTransactionOnCappedColl(collection, inTransaction);
+            stdx::lock_guard<Client> lk(*opCtx->getClient());
+            CurOp::get(opCtx)->enter_inlock(nsString.ns().c_str(), dbProfilingLevel);
+        }

-                const auto exec = uassertStatusOK(getExecutorDelete(
-                    opDebug, collection, &parsedDelete, boost::none /* verbosity */));
+        assertCanWrite(opCtx, nsString);

-                {
-                    stdx::lock_guard<Client> lk(*opCtx->getClient());
-                    CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get()));
-                }
+        Collection* const collection = autoColl.getCollection();
+        checkIfTransactionOnCappedColl(collection, inTransaction);

-                auto docFound = advanceExecutor(opCtx, exec.get(), args.isRemove());
-                // Nothing after advancing the plan executor should throw a WriteConflictException,
-                // so the following bookkeeping with execution stats won't end up being done
-                // multiple times.
+        const auto exec = uassertStatusOK(
+            getExecutorDelete(opDebug, collection, &parsedDelete, boost::none /* verbosity */));

-                PlanSummaryStats summaryStats;
-                Explain::getSummaryStats(*exec, &summaryStats);
-                if (collection) {
-                    CollectionQueryInfo::get(collection).notifyOfQuery(opCtx, summaryStats);
-                }
-                opDebug->setPlanSummaryMetrics(summaryStats);
+        {
+            stdx::lock_guard<Client> lk(*opCtx->getClient());
+            CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get()));
+        }

-                // Fill out OpDebug with the number of deleted docs.
-                opDebug->additiveMetrics.ndeleted = DeleteStage::getNumDeleted(*exec);
+        auto docFound = advanceExecutor(opCtx, exec.get(), args.isRemove());
+        // Nothing after advancing the plan executor should throw a WriteConflictException,
+        // so the following bookkeeping with execution stats won't end up being done
+        // multiple times.

-                if (curOp->shouldDBProfile()) {
-                    BSONObjBuilder execStatsBob;
-                    Explain::getWinningPlanStats(exec.get(), &execStatsBob);
-                    curOp->debug().execStats = execStatsBob.obj();
-                }
-                recordStatsForTopCommand(opCtx);
+        PlanSummaryStats summaryStats;
+        Explain::getSummaryStats(*exec, &summaryStats);
+        if (collection) {
+            CollectionQueryInfo::get(collection).notifyOfQuery(opCtx, summaryStats);
+        }
+        opDebug->setPlanSummaryMetrics(summaryStats);

-                appendCommandResponse(exec.get(), args.isRemove(), docFound, &result);
-            } else {
-                auto request = UpdateRequest();
-                request.setNamespaceString(nsString);
-                const bool isExplain = false;
-                makeUpdateRequest(opCtx, args, isExplain, &request);
+        // Fill out OpDebug with the number of deleted docs.
+        opDebug->additiveMetrics.ndeleted = DeleteStage::getNumDeleted(*exec);

-                if (opCtx->getTxnNumber()) {
-                    request.setStmtId(stmtId);
-                }
+        if (curOp->shouldDBProfile()) {
+            BSONObjBuilder execStatsBob;
+            Explain::getWinningPlanStats(exec.get(), &execStatsBob);
+            curOp->debug().execStats = execStatsBob.obj();
+        }
+        recordStatsForTopCommand(opCtx);

-                const ExtensionsCallbackReal extensionsCallback(opCtx,
-                                                                &request.getNamespaceString());
-                ParsedUpdate parsedUpdate(opCtx, &request, extensionsCallback);
-                uassertStatusOK(parsedUpdate.parseRequest());
+        appendCommandResponse(exec.get(), args.isRemove(), docFound, &result);

-                AutoGetCollection autoColl(opCtx, nsString, MODE_IX);
-                Database* db = autoColl.ensureDbExists();
+        return true;
+    }

-                {
-                    boost::optional<int> dbProfilingLevel;
-                    dbProfilingLevel = db->getProfilingLevel();
+    static bool writeConflictRetryUpsert(OperationContext* opCtx,
+                                         const NamespaceString& nsString,
+                                         const FindAndModifyRequest& args,
+                                         CurOp* const curOp,
+                                         OpDebug* const opDebug,
+                                         const bool inTransaction,
+                                         ParsedUpdate* parsedUpdate,
+                                         BSONObjBuilder& result) {
+        AutoGetCollection autoColl(opCtx, nsString, MODE_IX);
+        Database* db = autoColl.ensureDbExists();
+
+        {
+            boost::optional<int> dbProfilingLevel;
+            dbProfilingLevel = db->getProfilingLevel();
+
+            stdx::lock_guard<Client> lk(*opCtx->getClient());
+            CurOp::get(opCtx)->enter_inlock(nsString.ns().c_str(), dbProfilingLevel);
+        }

-                    stdx::lock_guard<Client> lk(*opCtx->getClient());
-                    CurOp::get(opCtx)->enter_inlock(nsString.ns().c_str(), dbProfilingLevel);
-                }
+        assertCanWrite(opCtx, nsString);

-                assertCanWrite(opCtx, nsString);
+        Collection* collection = autoColl.getCollection();

-                Collection* collection = autoColl.getCollection();
+        // Create the collection if it does not exist when performing an upsert because the
+        // update stage does not create its own collection
+        if (!collection && args.isUpsert()) {
+            assertCanWrite(opCtx, nsString);

-                // Create the collection if it does not exist when performing an upsert because the
-                // update stage does not create its own collection
-                if (!collection && args.isUpsert()) {
-                    assertCanWrite(opCtx, nsString);
+            collection = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nsString);

-                    collection =
-                        CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nsString);
+            // If someone else beat us to creating the collection, do nothing
+            if (!collection) {
+                uassertStatusOK(userAllowedCreateNS(nsString.db(), nsString.coll()));
+                WriteUnitOfWork wuow(opCtx);
+                CollectionOptions defaultCollectionOptions;
+                uassertStatusOK(db->userCreateNS(opCtx, nsString, defaultCollectionOptions));
+                wuow.commit();

-                    // If someone else beat us to creating the collection, do nothing
-                    if (!collection) {
-                        uassertStatusOK(userAllowedCreateNS(nsString.db(), nsString.coll()));
-                        WriteUnitOfWork wuow(opCtx);
-                        CollectionOptions defaultCollectionOptions;
-                        uassertStatusOK(
-                            db->userCreateNS(opCtx, nsString, defaultCollectionOptions));
-                        wuow.commit();
+                collection =
+                    CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, nsString);
+            }

-                        collection = CollectionCatalog::get(opCtx).lookupCollectionByNamespace(
-                            opCtx, nsString);
-                    }
+            invariant(collection);
+        }

-                    invariant(collection);
-                }
+        checkIfTransactionOnCappedColl(collection, inTransaction);

-                checkIfTransactionOnCappedColl(collection, inTransaction);
+        const auto exec = uassertStatusOK(
+            getExecutorUpdate(opDebug, collection, parsedUpdate, boost::none /* verbosity */));

-                const auto exec = uassertStatusOK(getExecutorUpdate(
-                    opDebug, collection, &parsedUpdate, boost::none /* verbosity */));
+        {
+            stdx::lock_guard<Client> lk(*opCtx->getClient());
+            CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get()));
+        }

-                {
-                    stdx::lock_guard<Client> lk(*opCtx->getClient());
-                    CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get()));
-                }
+        auto docFound = advanceExecutor(opCtx, exec.get(), args.isRemove());
+        // Nothing after advancing the plan executor should throw a WriteConflictException,
+        // so the following bookkeeping with execution stats won't end up being done
+        // multiple times.

-                auto docFound = advanceExecutor(opCtx, exec.get(), args.isRemove());
-                // Nothing after advancing the plan executor should throw a WriteConflictException,
-                // so the following bookkeeping with execution stats won't end up being done
-                // multiple times.
+        PlanSummaryStats summaryStats;
+        Explain::getSummaryStats(*exec, &summaryStats);
+        if (collection) {
+            CollectionQueryInfo::get(collection).notifyOfQuery(opCtx, summaryStats);
+        }
+        UpdateStage::recordUpdateStatsInOpDebug(UpdateStage::getUpdateStats(exec.get()), opDebug);
+        opDebug->setPlanSummaryMetrics(summaryStats);

-                PlanSummaryStats summaryStats;
-                Explain::getSummaryStats(*exec, &summaryStats);
-                if (collection) {
-                    CollectionQueryInfo::get(collection).notifyOfQuery(opCtx, summaryStats);
-                }
-                UpdateStage::recordUpdateStatsInOpDebug(UpdateStage::getUpdateStats(exec.get()),
-                                                        opDebug);
-                opDebug->setPlanSummaryMetrics(summaryStats);
-
-                if (curOp->shouldDBProfile()) {
-                    BSONObjBuilder execStatsBob;
-                    Explain::getWinningPlanStats(exec.get(), &execStatsBob);
-                    curOp->debug().execStats = execStatsBob.obj();
-                }
-                recordStatsForTopCommand(opCtx);
+        if (curOp->shouldDBProfile()) {
+            BSONObjBuilder execStatsBob;
+            Explain::getWinningPlanStats(exec.get(), &execStatsBob);
+            curOp->debug().execStats = execStatsBob.obj();
+        }
+        recordStatsForTopCommand(opCtx);

-                appendCommandResponse(exec.get(), args.isRemove(), docFound, &result);
-            }
+        appendCommandResponse(exec.get(), args.isRemove(), docFound, &result);

-            return true;
-        });
+        return true;
     }

     void appendMirrorableRequest(BSONObjBuilder* bob, const BSONObj& cmdObj) const override {
```
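As a summary of the retry condition used above: `UpdateStage::shouldRetryDuplicateKeyException` only permits a retry when the duplicate key reported by the exception matches the exact equality predicate of the upsert's query, because only then will a re-run of the upsert find the winner's document and update it. The shell sketch below mirrors the retried and non-retried shapes exercised by the new jstest; it is illustrative only and the collection name is invented.

```javascript
// Assumes a mongo shell connected to a mongod; 'upsert_retry_cases' is a throwaway collection.
const coll = db.getSiblingDB("test").upsert_retry_cases;
coll.drop();
assert.commandWorked(coll.createIndex({x: 1}, {unique: true}));

// Retriable shape: the conflicting key ({x: 3}) is exactly the equality predicate of the query,
// so once the winning insert is visible the same upsert simply matches and updates it.
assert.commandWorked(coll.insert({x: 3}));  // stands in for the concurrent winner's insert
coll.findAndModify({query: {x: 3}, update: {$inc: {y: 1}}, upsert: true});
printjson(coll.findOne({x: 3}));  // expected: {_id: ..., x: 3, y: 1}

// Non-retriable shape: the update moves x off the queried value, so the key that conflicts
// ({x: 4}) does not match the query predicate and the DuplicateKey error is surfaced.
assert.commandWorked(coll.insert({x: 4}));
assert.throws(function() {
    coll.findAndModify({query: {x: 3}, update: {$inc: {x: 1}}, upsert: true});
});
```

In the non-retriable shape, retrying could never succeed: the conflicting key is produced by the update itself rather than by a concurrent insert of the queried value, so surfacing the DuplicateKey error is the correct behavior.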