summaryrefslogtreecommitdiff
path: root/src/mongo/db/commands/bulk_write.cpp
diff options
context:
space:
mode:
authorseanzimm <sean.zimmerman@mongodb.com>2023-04-13 13:38:49 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-04-13 14:23:24 +0000
commit820e0cdd1b0d89a1c4eb6f753ff7d197ee4e2943 (patch)
tree727e44b0f7cafab2d5c5dd37f3b2f454e0b13e37 /src/mongo/db/commands/bulk_write.cpp
parentd593c4224e75133128bf884da5580df2ae197697 (diff)
downloadmongo-820e0cdd1b0d89a1c4eb6f753ff7d197ee4e2943.tar.gz
SERVER-72988: Support retryable writes for bulkWrite on mongod
Diffstat (limited to 'src/mongo/db/commands/bulk_write.cpp')
-rw-r--r--src/mongo/db/commands/bulk_write.cpp285
1 files changed, 251 insertions, 34 deletions
diff --git a/src/mongo/db/commands/bulk_write.cpp b/src/mongo/db/commands/bulk_write.cpp
index 34c5b905c0a..8790651596b 100644
--- a/src/mongo/db/commands/bulk_write.cpp
+++ b/src/mongo/db/commands/bulk_write.cpp
@@ -29,6 +29,7 @@
#include "mongo/platform/basic.h"
+#include "mongo/util/assert_util.h"
#include <string>
#include <vector>
@@ -55,6 +56,7 @@
#include "mongo/db/ops/update_request.h"
#include "mongo/db/ops/write_ops_exec.h"
#include "mongo/db/ops/write_ops_gen.h"
+#include "mongo/db/ops/write_ops_retryability.h"
#include "mongo/db/query/plan_executor_factory.h"
#include "mongo/db/query/query_knobs_gen.h"
#include "mongo/db/repl/oplog.h"
@@ -62,6 +64,8 @@
#include "mongo/db/server_feature_flags_gen.h"
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/transaction/retryable_writes_stats.h"
+#include "mongo/db/transaction/transaction_participant.h"
#include "mongo/db/transaction_validation.h"
#include "mongo/logv2/log.h"
#include "mongo/util/log_and_backoff.h"
@@ -73,11 +77,15 @@ namespace {
MONGO_FAIL_POINT_DEFINE(hangBeforeBulkWritePerformsUpdate);
-using UpdateCallback = std::function<void(
- int /* currentOpIdx */, const UpdateResult&, const boost::optional<BSONObj>& /* value */)>;
+using UpdateCallback = std::function<void(int /* currentOpIdx */,
+ const UpdateResult&,
+ const boost::optional<BSONObj>& /* value */,
+ const boost::optional<int32_t>& /* stmtId */)>;
-using DeleteCallback = std::function<void(
- int /* currentOpIdx */, long long /* nDeleted */, const boost::optional<BSONObj>& /* value */)>;
+using DeleteCallback = std::function<void(int /* currentOpIdx */,
+ long long /* nDeleted */,
+ const boost::optional<BSONObj>& /* value */,
+ const boost::optional<int32_t>& /* stmtId */)>;
using ErrorCallback = std::function<void(int /* currentOpIdx */, const Status&)>;
@@ -108,7 +116,18 @@ public:
return _batch.empty();
}
- // Returns true if the write was successful and did not encounter errors.
+ void addRetryableWriteResult(OperationContext* opCtx, size_t idx, int32_t stmtId) {
+ write_ops_exec::WriteResult out;
+ SingleWriteResult res;
+ res.setN(1);
+ res.setNModified(0);
+ out.retriedStmtIds.push_back(stmtId);
+ out.results.emplace_back(res);
+
+ _replyFn(opCtx, idx, out);
+ }
+
+ // Returns true if the bulkWrite operation can continue and false if it should stop.
bool flush(OperationContext* opCtx) {
if (empty()) {
return true;
@@ -205,6 +224,11 @@ public:
write_ops_exec::WriteResult& writes) {
invariant(!writes.results.empty());
+ // Copy over retriedStmtIds.
+ for (auto& stmtId : writes.retriedStmtIds) {
+ _retriedStmtIds.emplace_back(stmtId);
+ }
+
for (size_t i = 0; i < writes.results.size(); ++i) {
auto idx = firstOpIdx + i;
// We do not pass in a proper numErrors since it causes unwanted truncation in error
@@ -223,41 +247,60 @@ public:
void addUpdateReply(size_t currentOpIdx,
const UpdateResult& result,
- const boost::optional<BSONObj>& value) {
+ const boost::optional<BSONObj>& value,
+ const boost::optional<int32_t>& stmtId) {
auto replyItem = BulkWriteReplyItem(currentOpIdx);
replyItem.setNModified(result.numDocsModified);
if (!result.upsertedId.isEmpty()) {
replyItem.setUpserted(
write_ops::Upserted(0, IDLAnyTypeOwned(result.upsertedId.firstElement())));
}
+
if (value) {
replyItem.setValue(value);
}
+
+ if (stmtId) {
+ _retriedStmtIds.emplace_back(*stmtId);
+ }
+
_replies.emplace_back(replyItem);
}
void addDeleteReply(size_t currentOpIdx,
long long nDeleted,
- const boost::optional<BSONObj>& value) {
+ const boost::optional<BSONObj>& value,
+ const boost::optional<int32_t>& stmtId) {
auto replyItem = BulkWriteReplyItem(currentOpIdx);
replyItem.setN(nDeleted);
+
if (value) {
replyItem.setValue(value);
}
+
+ if (stmtId) {
+ _retriedStmtIds.emplace_back(*stmtId);
+ }
+
_replies.emplace_back(replyItem);
}
+ void addErrorReply(size_t currentOpIdx, const Status& status) {
+ _replies.emplace_back(currentOpIdx, status);
+ }
+
std::vector<BulkWriteReplyItem>& getReplies() {
return _replies;
}
- void addErrorReply(size_t currentOpIdx, const Status& status) {
- _replies.emplace_back(currentOpIdx, status);
+ std::vector<int>& getRetriedStmtIds() {
+ return _retriedStmtIds;
}
private:
const BulkWriteCommandRequest& _req;
std::vector<BulkWriteReplyItem> _replies;
+ std::vector<int32_t> _retriedStmtIds;
};
void finishCurOp(OperationContext* opCtx, CurOp* curOp) {
@@ -320,6 +363,103 @@ int32_t getStatementId(OperationContext* opCtx,
return kUninitializedStmtId;
}
+std::tuple<long long, boost::optional<BSONObj>> getRetryResultForDelete(
+ OperationContext* opCtx,
+ const NamespaceString& nsString,
+ const boost::optional<repl::OplogEntry>& entry) {
+ // Use a SideTransactionBlock since 'parseOplogEntryForFindAndModify' might need
+ // to fetch a pre/post image from the oplog and if this is a retry inside an
+ // in-progress retryable internal transaction, this 'opCtx' would have an active
+ // WriteUnitOfWork and it is illegal to read the the oplog when there is an
+ // active WriteUnitOfWork.
+ TransactionParticipant::SideTransactionBlock sideTxn(opCtx);
+
+ // Need to create a dummy FindAndModifyRequest to use to parse the oplog entry
+ // using existing helpers.
+ // The helper only checks a couple of booleans for validation so we do not need
+ // to copy over all fields.
+ auto findAndModifyReq = write_ops::FindAndModifyCommandRequest(nsString);
+ findAndModifyReq.setRemove(true);
+ findAndModifyReq.setNew(false);
+
+ auto findAndModifyReply = parseOplogEntryForFindAndModify(opCtx, findAndModifyReq, *entry);
+
+ return std::make_tuple(findAndModifyReply.getLastErrorObject().getNumDocs(),
+ findAndModifyReply.getValue());
+}
+
+std::tuple<UpdateResult, boost::optional<BSONObj>> getRetryResultForUpdate(
+ OperationContext* opCtx,
+ const NamespaceString& nsString,
+ const BulkWriteUpdateOp* op,
+ const boost::optional<repl::OplogEntry>& entry) {
+ // If 'return' is not specified then fetch this statement using the normal update
+ // helpers. If 'return' is specified we need to use the findAndModify helpers.
+ // findAndModify helpers do not support Updates executed with a none return so this
+ // split is necessary.
+ if (!op->getReturn()) {
+ auto writeResult = parseOplogEntryForUpdate(*entry);
+
+ // Since multi cannot be true for retryable writes numDocsModified + upserted should be 1
+ tassert(ErrorCodes::BadValue,
+ "bulkWrite retryable update must only modify one document",
+ writeResult.getNModified() + (writeResult.getUpsertedId().isEmpty() ? 0 : 1) == 1);
+
+ // We only care about the values of numDocsModified and upserted from the Update
+ // result.
+ // TODO SERVER-75946 Set numMatched correctly.
+ return std::make_tuple(
+ UpdateResult(false, false, writeResult.getNModified(), 0, writeResult.getUpsertedId()),
+ boost::none);
+ }
+
+ // Use a SideTransactionBlock since 'parseOplogEntryForFindAndModify' might need
+ // to fetch a pre/post image from the oplog and if this is a retry inside an
+ // in-progress retryable internal transaction, this 'opCtx' would have an active
+ // WriteUnitOfWork and it is illegal to read the the oplog when there is an
+ // active WriteUnitOfWork.
+ TransactionParticipant::SideTransactionBlock sideTxn(opCtx);
+
+ // Need to create a dummy FindAndModifyRequest to use to parse the oplog entry
+ // using existing helpers.
+ // The helper only checks a couple of booleans for validation so we do not need
+ // to copy over all fields.
+ auto findAndModifyReq = write_ops::FindAndModifyCommandRequest(nsString);
+ findAndModifyReq.setUpsert(op->getUpsert());
+ findAndModifyReq.setRemove(false);
+ if (op->getReturn() && op->getReturn().get() == "post") {
+ findAndModifyReq.setNew(true);
+ }
+
+ auto findAndModifyReply = parseOplogEntryForFindAndModify(opCtx, findAndModifyReq, *entry);
+
+ int numDocsModified = findAndModifyReply.getLastErrorObject().getNumDocs();
+
+ // TODO SERVER-75946 we should use IDLAnyTypeOwned from the findAndModifyReply instead of this.
+ BSONObj upserted = BSONObj();
+ if (entry->getOpType() == repl::OpTypeEnum::kInsert) {
+ auto owned = entry->getObject().getOwned();
+ auto id = owned.getField("_id");
+ if (id) {
+ upserted = owned;
+ // An 'upserted' doc does not count as a modified doc but counts in the
+ // numDocs total. Since numDocs is either 1 or 0 it should be 0 here.
+ numDocsModified = 0;
+ }
+ }
+
+ // Since multi cannot be true for retryable writes numDocsModified + upserted should be 1
+ tassert(ErrorCodes::BadValue,
+ "bulkWrite retryable update must only modify one document",
+ numDocsModified + (upserted.isEmpty() ? 0 : 1) == 1);
+
+ // We only care about the values of numDocsModified and upserted from the Update
+ // result.
+ // TODO SERVER-75946 Set numMatched correctly.
+ return std::make_tuple(UpdateResult(false, false, numDocsModified, 0, upserted),
+ findAndModifyReply.getValue());
+}
+
bool handleInsertOp(OperationContext* opCtx,
const BulkWriteInsertOp* op,
const BulkWriteCommandRequest& req,
@@ -330,10 +470,23 @@ bool handleInsertOp(OperationContext* opCtx,
auto idx = op->getInsert();
auto stmtId = getStatementId(opCtx, req, currentOpIdx);
+
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+
+ if (opCtx->isRetryableWrite() &&
+ txnParticipant.checkStatementExecutedNoOplogEntryFetch(opCtx, stmtId)) {
+ if (!batch.flush(opCtx)) {
+ return false;
+ }
+
+ RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount();
+ batch.addRetryableWriteResult(opCtx, currentOpIdx, stmtId);
+ return true;
+ }
+
bool containsDotsAndDollarsField = false;
auto fixedDoc = fixDocumentForInsert(opCtx, op->getDocument(), &containsDotsAndDollarsField);
- // TODO SERVER-72988: handle retryable writes.
if (!fixedDoc.isOK()) {
if (!batch.flush(opCtx)) {
return false;
@@ -379,6 +532,10 @@ bool handleUpdateOp(OperationContext* opCtx,
uassert(ErrorCodes::InvalidOptions,
"May not specify both multi and return in bulkWrite command.",
!op->getReturn());
+
+ uassert(ErrorCodes::InvalidOptions,
+ "Cannot use retryable writes with multi=true",
+ !opCtx->isRetryableWrite());
}
if (op->getReturnFields()) {
@@ -393,6 +550,20 @@ bool handleUpdateOp(OperationContext* opCtx,
doTransactionValidationForWrites(opCtx, nsString);
+ auto stmtId = getStatementId(opCtx, req, currentOpIdx);
+ if (opCtx->isRetryableWrite()) {
+ const auto txnParticipant = TransactionParticipant::get(opCtx);
+ if (auto entry = txnParticipant.checkStatementExecuted(opCtx, stmtId)) {
+ RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount();
+
+ auto [updateResult, image] = getRetryResultForUpdate(opCtx, nsString, op, entry);
+
+ replyCB(currentOpIdx, updateResult, image, stmtId);
+
+ return true;
+ }
+ }
+
const bool inTransaction = opCtx->inMultiDocumentTransaction();
auto updateRequest = UpdateRequest();
@@ -419,11 +590,8 @@ bool handleUpdateOp(OperationContext* opCtx,
updateRequest.setYieldPolicy(inTransaction ? PlanYieldPolicy::YieldPolicy::INTERRUPT_ONLY
: PlanYieldPolicy::YieldPolicy::YIELD_AUTO);
- if (req.getStmtIds()) {
- updateRequest.setStmtIds(*req.getStmtIds());
- } else if (req.getStmtId()) {
- updateRequest.setStmtIds({*req.getStmtId()});
- }
+ // We only execute one update op at a time.
+ updateRequest.setStmtIds({stmtId});
// Although usually the PlanExecutor handles WCE internally, it will throw WCEs when it
// is executing an update. This is done to ensure that we can always match,
@@ -454,7 +622,7 @@ bool handleUpdateOp(OperationContext* opCtx,
updateRequest.isUpsert(),
docFound,
&parsedUpdate);
- replyCB(currentOpIdx, result, docFound);
+ replyCB(currentOpIdx, result, docFound, boost::none);
return true;
} catch (const ExceptionFor<ErrorCodes::DuplicateKey>& ex) {
if (!parsedUpdate.hasParsedQuery()) {
@@ -477,6 +645,10 @@ bool handleUpdateOp(OperationContext* opCtx,
}
});
} catch (const DBException& ex) {
+ // IncompleteTrasactionHistory should always be command fatal.
+ if (ex.code() == ErrorCodes::IncompleteTransactionHistory) {
+ throw;
+ }
errorCB(currentOpIdx, ex.toStatus());
write_ops_exec::WriteResult out;
return write_ops_exec::handleError(
@@ -498,6 +670,10 @@ bool handleDeleteOp(OperationContext* opCtx,
uassert(ErrorCodes::InvalidOptions,
"May not specify both multi and return in bulkWrite command.",
!op->getReturn());
+
+ uassert(ErrorCodes::InvalidOptions,
+ "Cannot use retryable writes with multi=true",
+ !opCtx->isRetryableWrite());
}
if (op->getReturnFields()) {
@@ -512,6 +688,28 @@ bool handleDeleteOp(OperationContext* opCtx,
doTransactionValidationForWrites(opCtx, nsString);
+ auto stmtId = getStatementId(opCtx, req, currentOpIdx);
+ if (opCtx->isRetryableWrite()) {
+ const auto txnParticipant = TransactionParticipant::get(opCtx);
+ // If 'return' is not specified then we do not need to parse the statement. Since
+ // multi:true is not allowed with retryable writes if the statement was executed
+ // there will always be 1 document deleted.
+ if (!op->getReturn()) {
+ if (txnParticipant.checkStatementExecutedNoOplogEntryFetch(opCtx, stmtId)) {
+ RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount();
+ replyCB(currentOpIdx, 1, boost::none, stmtId);
+ return true;
+ }
+ } else {
+ if (auto entry = txnParticipant.checkStatementExecuted(opCtx, stmtId)) {
+ RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount();
+ auto [numDocs, image] = getRetryResultForDelete(opCtx, nsString, entry);
+ replyCB(currentOpIdx, numDocs, image, stmtId);
+ return true;
+ }
+ }
+ }
+
auto deleteRequest = DeleteRequest();
deleteRequest.setNsString(nsString);
deleteRequest.setQuery(op->getFilter());
@@ -529,9 +727,7 @@ bool handleDeleteOp(OperationContext* opCtx,
? PlanYieldPolicy::YieldPolicy::INTERRUPT_ONLY
: PlanYieldPolicy::YieldPolicy::YIELD_AUTO);
- if (opCtx->getTxnNumber() && req.getStmtId()) {
- deleteRequest.setStmtId(*req.getStmtId());
- }
+ deleteRequest.setStmtId(stmtId);
const bool inTransaction = opCtx->inMultiDocumentTransaction();
@@ -539,10 +735,14 @@ bool handleDeleteOp(OperationContext* opCtx,
boost::optional<BSONObj> docFound;
auto nDeleted = write_ops_exec::writeConflictRetryRemove(
opCtx, nsString, &deleteRequest, curOp, opDebug, inTransaction, docFound);
- replyCB(currentOpIdx, nDeleted, docFound);
+ replyCB(currentOpIdx, nDeleted, docFound, boost::none);
return true;
});
} catch (const DBException& ex) {
+ // IncompleteTrasactionHistory should always be command fatal.
+ if (ex.code() == ErrorCodes::IncompleteTransactionHistory) {
+ throw;
+ }
errorCB(currentOpIdx, ex.toStatus());
write_ops_exec::WriteResult out;
return write_ops_exec::handleError(
@@ -618,9 +818,9 @@ public:
bulk_write_common::validateRequest(req);
// Apply all of the write operations.
- auto replies = bulk_write::performWrites(opCtx, req);
+ auto [replies, retriedStmtIds] = bulk_write::performWrites(opCtx, req);
- return _populateCursorReply(opCtx, req, std::move(replies));
+ return _populateCursorReply(opCtx, req, std::move(replies), std::move(retriedStmtIds));
}
void doCheckAuthorization(OperationContext* opCtx) const final try {
@@ -639,7 +839,8 @@ public:
private:
Reply _populateCursorReply(OperationContext* opCtx,
const BulkWriteCommandRequest& req,
- std::vector<BulkWriteReplyItem> replies) {
+ bulk_write::BulkWriteReplyItems replies,
+ bulk_write::RetriedStmtIds retriedStmtIds) {
const NamespaceString cursorNss = NamespaceString::makeBulkWriteNSS();
auto expCtx = make_intrusive<ExpressionContext>(
opCtx, std::unique_ptr<CollatorInterface>(nullptr), ns());
@@ -695,8 +896,12 @@ public:
}
if (exec->isEOF()) {
invariant(numReplies == replies.size());
- return BulkWriteCommandReply(BulkWriteCommandResponseCursor(
+ auto reply = BulkWriteCommandReply(BulkWriteCommandResponseCursor(
0, std::vector<BulkWriteReplyItem>(std::move(replies))));
+ if (!retriedStmtIds.empty()) {
+ reply.setRetriedStmtIds(std::move(retriedStmtIds));
+ }
+ return reply;
}
exec->saveState();
@@ -719,8 +924,12 @@ public:
pinnedCursor->incNReturnedSoFar(replies.size());
replies.resize(numReplies);
- return BulkWriteCommandReply(BulkWriteCommandResponseCursor(
+ auto reply = BulkWriteCommandReply(BulkWriteCommandResponseCursor(
cursorId, std::vector<BulkWriteReplyItem>(std::move(replies))));
+ if (!retriedStmtIds.empty()) {
+ reply.setRetriedStmtIds(std::move(retriedStmtIds));
+ }
+ return reply;
}
};
@@ -730,8 +939,7 @@ public:
namespace bulk_write {
-std::vector<BulkWriteReplyItem> performWrites(OperationContext* opCtx,
- const BulkWriteCommandRequest& req) {
+BulkWriteReply performWrites(OperationContext* opCtx, const BulkWriteCommandRequest& req) {
const auto& ops = req.getOps();
const auto& bypassDocumentValidation = req.getBypassDocumentValidation();
@@ -751,13 +959,16 @@ std::vector<BulkWriteReplyItem> performWrites(OperationContext* opCtx,
};
auto updateCB = [&responses](int currentOpIdx,
const UpdateResult& result,
- const boost::optional<BSONObj>& value) {
- responses.addUpdateReply(currentOpIdx, result, value);
+ const boost::optional<BSONObj>& value,
+ const boost::optional<int32_t>& stmtId) {
+ responses.addUpdateReply(currentOpIdx, result, value, stmtId);
+ };
+ auto deleteCB = [&responses](int currentOpIdx,
+ long long nDeleted,
+ const boost::optional<BSONObj>& value,
+ const boost::optional<int32_t>& stmtId) {
+ responses.addDeleteReply(currentOpIdx, nDeleted, value, stmtId);
};
- auto deleteCB =
- [&responses](int currentOpIdx, long long nDeleted, const boost::optional<BSONObj>& value) {
- responses.addDeleteReply(currentOpIdx, nDeleted, value);
- };
auto errorCB = [&responses](int currentOpIdx, const Status& status) {
responses.addErrorReply(currentOpIdx, status);
@@ -776,6 +987,12 @@ std::vector<BulkWriteReplyItem> performWrites(OperationContext* opCtx,
if (curOp) {
finishCurOp(opCtx, &*curOp);
}
+
+ const auto& retriedStmtIds = responses.getRetriedStmtIds();
+ // If any statements were retried then incremement command counter.
+ if (!retriedStmtIds.empty()) {
+ RetryableWritesStats::get(opCtx)->incrementRetriedCommandsCount();
+ }
});
// Tell mongod what the shard and database versions are. This will cause writes to fail in case
@@ -823,7 +1040,7 @@ std::vector<BulkWriteReplyItem> performWrites(OperationContext* opCtx,
invariant(batch.empty());
- return responses.getReplies();
+ return make_tuple(responses.getReplies(), responses.getRetriedStmtIds());
}
} // namespace bulk_write