diff options
author | seanzimm <sean.zimmerman@mongodb.com> | 2023-04-13 13:38:49 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-04-13 14:23:24 +0000 |
commit | 820e0cdd1b0d89a1c4eb6f753ff7d197ee4e2943 (patch) | |
tree | 727e44b0f7cafab2d5c5dd37f3b2f454e0b13e37 /src/mongo/db/commands/bulk_write.cpp | |
parent | d593c4224e75133128bf884da5580df2ae197697 (diff) | |
download | mongo-820e0cdd1b0d89a1c4eb6f753ff7d197ee4e2943.tar.gz |
SERVER-72988: Support retryable writes for bulkWrite on mongod
Diffstat (limited to 'src/mongo/db/commands/bulk_write.cpp')
-rw-r--r-- | src/mongo/db/commands/bulk_write.cpp | 285 |
1 files changed, 251 insertions, 34 deletions
diff --git a/src/mongo/db/commands/bulk_write.cpp b/src/mongo/db/commands/bulk_write.cpp index 34c5b905c0a..8790651596b 100644 --- a/src/mongo/db/commands/bulk_write.cpp +++ b/src/mongo/db/commands/bulk_write.cpp @@ -29,6 +29,7 @@ #include "mongo/platform/basic.h" +#include "mongo/util/assert_util.h" #include <string> #include <vector> @@ -55,6 +56,7 @@ #include "mongo/db/ops/update_request.h" #include "mongo/db/ops/write_ops_exec.h" #include "mongo/db/ops/write_ops_gen.h" +#include "mongo/db/ops/write_ops_retryability.h" #include "mongo/db/query/plan_executor_factory.h" #include "mongo/db/query/query_knobs_gen.h" #include "mongo/db/repl/oplog.h" @@ -62,6 +64,8 @@ #include "mongo/db/server_feature_flags_gen.h" #include "mongo/db/server_options.h" #include "mongo/db/service_context.h" +#include "mongo/db/transaction/retryable_writes_stats.h" +#include "mongo/db/transaction/transaction_participant.h" #include "mongo/db/transaction_validation.h" #include "mongo/logv2/log.h" #include "mongo/util/log_and_backoff.h" @@ -73,11 +77,15 @@ namespace { MONGO_FAIL_POINT_DEFINE(hangBeforeBulkWritePerformsUpdate); -using UpdateCallback = std::function<void( - int /* currentOpIdx */, const UpdateResult&, const boost::optional<BSONObj>& /* value */)>; +using UpdateCallback = std::function<void(int /* currentOpIdx */, + const UpdateResult&, + const boost::optional<BSONObj>& /* value */, + const boost::optional<int32_t>& /* stmtId */)>; -using DeleteCallback = std::function<void( - int /* currentOpIdx */, long long /* nDeleted */, const boost::optional<BSONObj>& /* value */)>; +using DeleteCallback = std::function<void(int /* currentOpIdx */, + long long /* nDeleted */, + const boost::optional<BSONObj>& /* value */, + const boost::optional<int32_t>& /* stmtId */)>; using ErrorCallback = std::function<void(int /* currentOpIdx */, const Status&)>; @@ -108,7 +116,18 @@ public: return _batch.empty(); } - // Returns true if the write was successful and did not encounter errors. 
+ void addRetryableWriteResult(OperationContext* opCtx, size_t idx, int32_t stmtId) { + write_ops_exec::WriteResult out; + SingleWriteResult res; + res.setN(1); + res.setNModified(0); + out.retriedStmtIds.push_back(stmtId); + out.results.emplace_back(res); + + _replyFn(opCtx, idx, out); + } + + // Returns true if the bulkWrite operation can continue and false if it should stop. bool flush(OperationContext* opCtx) { if (empty()) { return true; @@ -205,6 +224,11 @@ public: write_ops_exec::WriteResult& writes) { invariant(!writes.results.empty()); + // Copy over retriedStmtIds. + for (auto& stmtId : writes.retriedStmtIds) { + _retriedStmtIds.emplace_back(stmtId); + } + for (size_t i = 0; i < writes.results.size(); ++i) { auto idx = firstOpIdx + i; // We do not pass in a proper numErrors since it causes unwanted truncation in error @@ -223,41 +247,60 @@ public: void addUpdateReply(size_t currentOpIdx, const UpdateResult& result, - const boost::optional<BSONObj>& value) { + const boost::optional<BSONObj>& value, + const boost::optional<int32_t>& stmtId) { auto replyItem = BulkWriteReplyItem(currentOpIdx); replyItem.setNModified(result.numDocsModified); if (!result.upsertedId.isEmpty()) { replyItem.setUpserted( write_ops::Upserted(0, IDLAnyTypeOwned(result.upsertedId.firstElement()))); } + if (value) { replyItem.setValue(value); } + + if (stmtId) { + _retriedStmtIds.emplace_back(*stmtId); + } + _replies.emplace_back(replyItem); } void addDeleteReply(size_t currentOpIdx, long long nDeleted, - const boost::optional<BSONObj>& value) { + const boost::optional<BSONObj>& value, + const boost::optional<int32_t>& stmtId) { auto replyItem = BulkWriteReplyItem(currentOpIdx); replyItem.setN(nDeleted); + if (value) { replyItem.setValue(value); } + + if (stmtId) { + _retriedStmtIds.emplace_back(*stmtId); + } + _replies.emplace_back(replyItem); } + void addErrorReply(size_t currentOpIdx, const Status& status) { + _replies.emplace_back(currentOpIdx, status); + } + 
std::vector<BulkWriteReplyItem>& getReplies() { return _replies; } - void addErrorReply(size_t currentOpIdx, const Status& status) { - _replies.emplace_back(currentOpIdx, status); + std::vector<int>& getRetriedStmtIds() { + return _retriedStmtIds; } private: const BulkWriteCommandRequest& _req; std::vector<BulkWriteReplyItem> _replies; + std::vector<int32_t> _retriedStmtIds; }; void finishCurOp(OperationContext* opCtx, CurOp* curOp) { @@ -320,6 +363,103 @@ int32_t getStatementId(OperationContext* opCtx, return kUninitializedStmtId; } +std::tuple<long long, boost::optional<BSONObj>> getRetryResultForDelete( + OperationContext* opCtx, + const NamespaceString& nsString, + const boost::optional<repl::OplogEntry>& entry) { + // Use a SideTransactionBlock since 'parseOplogEntryForFindAndModify' might need + // to fetch a pre/post image from the oplog and if this is a retry inside an + // in-progress retryable internal transaction, this 'opCtx' would have an active + // WriteUnitOfWork and it is illegal to read the oplog when there is an + // active WriteUnitOfWork. + TransactionParticipant::SideTransactionBlock sideTxn(opCtx); + + // Need to create a dummy FindAndModifyRequest to use to parse the oplog entry + // using existing helpers. + // The helper only checks a couple of booleans for validation so we do not need + // to copy over all fields. 
+ auto findAndModifyReq = write_ops::FindAndModifyCommandRequest(nsString); + findAndModifyReq.setRemove(true); + findAndModifyReq.setNew(false); + + auto findAndModifyReply = parseOplogEntryForFindAndModify(opCtx, findAndModifyReq, *entry); + + return std::make_tuple(findAndModifyReply.getLastErrorObject().getNumDocs(), + findAndModifyReply.getValue()); +} + +std::tuple<UpdateResult, boost::optional<BSONObj>> getRetryResultForUpdate( + OperationContext* opCtx, + const NamespaceString& nsString, + const BulkWriteUpdateOp* op, + const boost::optional<repl::OplogEntry>& entry) { + // If 'return' is not specified then fetch this statement using the normal update + // helpers. If 'return' is specified we need to use the findAndModify helpers. + // findAndModify helpers do not support Updates executed with a none return so this + // split is necessary. + if (!op->getReturn()) { + auto writeResult = parseOplogEntryForUpdate(*entry); + + // Since multi cannot be true for retryable writes numDocsModified + upserted should be 1 + tassert(ErrorCodes::BadValue, + "bulkWrite retryable update must only modify one document", + writeResult.getNModified() + (writeResult.getUpsertedId().isEmpty() ? 0 : 1) == 1); + + // We only care about the values of numDocsModified and upserted from the Update + // result. + // TODO SERVER-75946 Set numMatched correctly. + return std::make_tuple( + UpdateResult(false, false, writeResult.getNModified(), 0, writeResult.getUpsertedId()), + boost::none); + } + + // Use a SideTransactionBlock since 'parseOplogEntryForFindAndModify' might need + // to fetch a pre/post image from the oplog and if this is a retry inside an + // in-progress retryable internal transaction, this 'opCtx' would have an active + // WriteUnitOfWork and it is illegal to read the oplog when there is an + // active WriteUnitOfWork. 
+ TransactionParticipant::SideTransactionBlock sideTxn(opCtx); + + // Need to create a dummy FindAndModifyRequest to use to parse the oplog entry + // using existing helpers. + // The helper only checks a couple of booleans for validation so we do not need + // to copy over all fields. + auto findAndModifyReq = write_ops::FindAndModifyCommandRequest(nsString); + findAndModifyReq.setUpsert(op->getUpsert()); + findAndModifyReq.setRemove(false); + if (op->getReturn() && op->getReturn().get() == "post") { + findAndModifyReq.setNew(true); + } + + auto findAndModifyReply = parseOplogEntryForFindAndModify(opCtx, findAndModifyReq, *entry); + + int numDocsModified = findAndModifyReply.getLastErrorObject().getNumDocs(); + + // TODO SERVER-75946 we should use IDLAnyTypeOwned from the findAndModifyReply instead of this. + BSONObj upserted = BSONObj(); + if (entry->getOpType() == repl::OpTypeEnum::kInsert) { + auto owned = entry->getObject().getOwned(); + auto id = owned.getField("_id"); + if (id) { + upserted = owned; + // An 'upserted' doc does not count as a modified doc but counts in the + // numDocs total. Since numDocs is either 1 or 0 it should be 0 here. + numDocsModified = 0; + } + } + + // Since multi cannot be true for retryable writes numDocsModified + upserted should be 1 + tassert(ErrorCodes::BadValue, + "bulkWrite retryable update must only modify one document", + numDocsModified + (upserted.isEmpty() ? 0 : 1) == 1); + + // We only care about the values of numDocsModified and upserted from the Update + // result. + // TODO SERVER-75946 Set numMatched correctly. 
+ return std::make_tuple(UpdateResult(false, false, numDocsModified, 0, upserted), + findAndModifyReply.getValue()); +} + bool handleInsertOp(OperationContext* opCtx, const BulkWriteInsertOp* op, const BulkWriteCommandRequest& req, @@ -330,10 +470,23 @@ bool handleInsertOp(OperationContext* opCtx, auto idx = op->getInsert(); auto stmtId = getStatementId(opCtx, req, currentOpIdx); + + auto txnParticipant = TransactionParticipant::get(opCtx); + + if (opCtx->isRetryableWrite() && + txnParticipant.checkStatementExecutedNoOplogEntryFetch(opCtx, stmtId)) { + if (!batch.flush(opCtx)) { + return false; + } + + RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount(); + batch.addRetryableWriteResult(opCtx, currentOpIdx, stmtId); + return true; + } + bool containsDotsAndDollarsField = false; auto fixedDoc = fixDocumentForInsert(opCtx, op->getDocument(), &containsDotsAndDollarsField); - // TODO SERVER-72988: handle retryable writes. if (!fixedDoc.isOK()) { if (!batch.flush(opCtx)) { return false; @@ -379,6 +532,10 @@ bool handleUpdateOp(OperationContext* opCtx, uassert(ErrorCodes::InvalidOptions, "May not specify both multi and return in bulkWrite command.", !op->getReturn()); + + uassert(ErrorCodes::InvalidOptions, + "Cannot use retryable writes with multi=true", + !opCtx->isRetryableWrite()); } if (op->getReturnFields()) { @@ -393,6 +550,20 @@ bool handleUpdateOp(OperationContext* opCtx, doTransactionValidationForWrites(opCtx, nsString); + auto stmtId = getStatementId(opCtx, req, currentOpIdx); + if (opCtx->isRetryableWrite()) { + const auto txnParticipant = TransactionParticipant::get(opCtx); + if (auto entry = txnParticipant.checkStatementExecuted(opCtx, stmtId)) { + RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount(); + + auto [updateResult, image] = getRetryResultForUpdate(opCtx, nsString, op, entry); + + replyCB(currentOpIdx, updateResult, image, stmtId); + + return true; + } + } + const bool inTransaction = 
opCtx->inMultiDocumentTransaction(); auto updateRequest = UpdateRequest(); @@ -419,11 +590,8 @@ bool handleUpdateOp(OperationContext* opCtx, updateRequest.setYieldPolicy(inTransaction ? PlanYieldPolicy::YieldPolicy::INTERRUPT_ONLY : PlanYieldPolicy::YieldPolicy::YIELD_AUTO); - if (req.getStmtIds()) { - updateRequest.setStmtIds(*req.getStmtIds()); - } else if (req.getStmtId()) { - updateRequest.setStmtIds({*req.getStmtId()}); - } + // We only execute one update op at a time. + updateRequest.setStmtIds({stmtId}); // Although usually the PlanExecutor handles WCE internally, it will throw WCEs when it // is executing an update. This is done to ensure that we can always match, @@ -454,7 +622,7 @@ bool handleUpdateOp(OperationContext* opCtx, updateRequest.isUpsert(), docFound, &parsedUpdate); - replyCB(currentOpIdx, result, docFound); + replyCB(currentOpIdx, result, docFound, boost::none); return true; } catch (const ExceptionFor<ErrorCodes::DuplicateKey>& ex) { if (!parsedUpdate.hasParsedQuery()) { @@ -477,6 +645,10 @@ bool handleUpdateOp(OperationContext* opCtx, } }); } catch (const DBException& ex) { + // IncompleteTransactionHistory should always be command fatal. 
+ if (ex.code() == ErrorCodes::IncompleteTransactionHistory) { + throw; + } errorCB(currentOpIdx, ex.toStatus()); write_ops_exec::WriteResult out; return write_ops_exec::handleError( @@ -498,6 +670,10 @@ bool handleDeleteOp(OperationContext* opCtx, uassert(ErrorCodes::InvalidOptions, "May not specify both multi and return in bulkWrite command.", !op->getReturn()); + + uassert(ErrorCodes::InvalidOptions, + "Cannot use retryable writes with multi=true", + !opCtx->isRetryableWrite()); } if (op->getReturnFields()) { @@ -512,6 +688,28 @@ bool handleDeleteOp(OperationContext* opCtx, doTransactionValidationForWrites(opCtx, nsString); + auto stmtId = getStatementId(opCtx, req, currentOpIdx); + if (opCtx->isRetryableWrite()) { + const auto txnParticipant = TransactionParticipant::get(opCtx); + // If 'return' is not specified then we do not need to parse the statement. Since + // multi:true is not allowed with retryable writes if the statement was executed + // there will always be 1 document deleted. + if (!op->getReturn()) { + if (txnParticipant.checkStatementExecutedNoOplogEntryFetch(opCtx, stmtId)) { + RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount(); + replyCB(currentOpIdx, 1, boost::none, stmtId); + return true; + } + } else { + if (auto entry = txnParticipant.checkStatementExecuted(opCtx, stmtId)) { + RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount(); + auto [numDocs, image] = getRetryResultForDelete(opCtx, nsString, entry); + replyCB(currentOpIdx, numDocs, image, stmtId); + return true; + } + } + } + auto deleteRequest = DeleteRequest(); deleteRequest.setNsString(nsString); deleteRequest.setQuery(op->getFilter()); @@ -529,9 +727,7 @@ bool handleDeleteOp(OperationContext* opCtx, ? 
PlanYieldPolicy::YieldPolicy::INTERRUPT_ONLY : PlanYieldPolicy::YieldPolicy::YIELD_AUTO); - if (opCtx->getTxnNumber() && req.getStmtId()) { - deleteRequest.setStmtId(*req.getStmtId()); - } + deleteRequest.setStmtId(stmtId); const bool inTransaction = opCtx->inMultiDocumentTransaction(); @@ -539,10 +735,14 @@ bool handleDeleteOp(OperationContext* opCtx, boost::optional<BSONObj> docFound; auto nDeleted = write_ops_exec::writeConflictRetryRemove( opCtx, nsString, &deleteRequest, curOp, opDebug, inTransaction, docFound); - replyCB(currentOpIdx, nDeleted, docFound); + replyCB(currentOpIdx, nDeleted, docFound, boost::none); return true; }); } catch (const DBException& ex) { + // IncompleteTransactionHistory should always be command fatal. + if (ex.code() == ErrorCodes::IncompleteTransactionHistory) { + throw; + } errorCB(currentOpIdx, ex.toStatus()); write_ops_exec::WriteResult out; return write_ops_exec::handleError( @@ -618,9 +818,9 @@ public: bulk_write_common::validateRequest(req); // Apply all of the write operations. 
- auto replies = bulk_write::performWrites(opCtx, req); + auto [replies, retriedStmtIds] = bulk_write::performWrites(opCtx, req); - return _populateCursorReply(opCtx, req, std::move(replies)); + return _populateCursorReply(opCtx, req, std::move(replies), std::move(retriedStmtIds)); } void doCheckAuthorization(OperationContext* opCtx) const final try { @@ -639,7 +839,8 @@ public: private: Reply _populateCursorReply(OperationContext* opCtx, const BulkWriteCommandRequest& req, - std::vector<BulkWriteReplyItem> replies) { + bulk_write::BulkWriteReplyItems replies, + bulk_write::RetriedStmtIds retriedStmtIds) { const NamespaceString cursorNss = NamespaceString::makeBulkWriteNSS(); auto expCtx = make_intrusive<ExpressionContext>( opCtx, std::unique_ptr<CollatorInterface>(nullptr), ns()); @@ -695,8 +896,12 @@ public: } if (exec->isEOF()) { invariant(numReplies == replies.size()); - return BulkWriteCommandReply(BulkWriteCommandResponseCursor( + auto reply = BulkWriteCommandReply(BulkWriteCommandResponseCursor( 0, std::vector<BulkWriteReplyItem>(std::move(replies)))); + if (!retriedStmtIds.empty()) { + reply.setRetriedStmtIds(std::move(retriedStmtIds)); + } + return reply; } exec->saveState(); @@ -719,8 +924,12 @@ public: pinnedCursor->incNReturnedSoFar(replies.size()); replies.resize(numReplies); - return BulkWriteCommandReply(BulkWriteCommandResponseCursor( + auto reply = BulkWriteCommandReply(BulkWriteCommandResponseCursor( cursorId, std::vector<BulkWriteReplyItem>(std::move(replies)))); + if (!retriedStmtIds.empty()) { + reply.setRetriedStmtIds(std::move(retriedStmtIds)); + } + return reply; } }; @@ -730,8 +939,7 @@ public: namespace bulk_write { -std::vector<BulkWriteReplyItem> performWrites(OperationContext* opCtx, - const BulkWriteCommandRequest& req) { +BulkWriteReply performWrites(OperationContext* opCtx, const BulkWriteCommandRequest& req) { const auto& ops = req.getOps(); const auto& bypassDocumentValidation = req.getBypassDocumentValidation(); @@ -751,13 +959,16 
@@ std::vector<BulkWriteReplyItem> performWrites(OperationContext* opCtx, }; auto updateCB = [&responses](int currentOpIdx, const UpdateResult& result, - const boost::optional<BSONObj>& value) { - responses.addUpdateReply(currentOpIdx, result, value); + const boost::optional<BSONObj>& value, + const boost::optional<int32_t>& stmtId) { + responses.addUpdateReply(currentOpIdx, result, value, stmtId); + }; + auto deleteCB = [&responses](int currentOpIdx, + long long nDeleted, + const boost::optional<BSONObj>& value, + const boost::optional<int32_t>& stmtId) { + responses.addDeleteReply(currentOpIdx, nDeleted, value, stmtId); }; - auto deleteCB = - [&responses](int currentOpIdx, long long nDeleted, const boost::optional<BSONObj>& value) { - responses.addDeleteReply(currentOpIdx, nDeleted, value); - }; auto errorCB = [&responses](int currentOpIdx, const Status& status) { responses.addErrorReply(currentOpIdx, status); @@ -776,6 +987,12 @@ std::vector<BulkWriteReplyItem> performWrites(OperationContext* opCtx, if (curOp) { finishCurOp(opCtx, &*curOp); } + + const auto& retriedStmtIds = responses.getRetriedStmtIds(); + // If any statements were retried then increment command counter. + if (!retriedStmtIds.empty()) { + RetryableWritesStats::get(opCtx)->incrementRetriedCommandsCount(); + } }); // Tell mongod what the shard and database versions are. This will cause writes to fail in case @@ -823,7 +1040,7 @@ std::vector<BulkWriteReplyItem> performWrites(OperationContext* opCtx, invariant(batch.empty()); - return responses.getReplies(); + return make_tuple(responses.getReplies(), responses.getRetriedStmtIds()); } } // namespace bulk_write |