diff options
Diffstat (limited to 'src/mongo/db/ops/write_ops_exec.cpp')
-rw-r--r-- | src/mongo/db/ops/write_ops_exec.cpp | 27 |
1 files changed, 21 insertions, 6 deletions
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index 34d10394657..f8548e2a18d 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -397,6 +397,12 @@ bool insertBatchAndHandleErrors(OperationContext* opCtx, return true; } +template <typename T> +StmtId getStmtIdForWriteOp(OperationContext* opCtx, const T& wholeOp, size_t opIndex) { + return opCtx->getTxnNumber() ? write_ops::getStmtIdForWriteAt(wholeOp, opIndex) + : kUninitializedStmtId; +} + } // namespace WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& wholeOp) { @@ -452,11 +458,8 @@ WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& who // correct order. In an ordered insert, if one of the docs ahead of us fails, we should // behave as-if we never got to this document. } else { - const StmtId stmtId = opCtx->getTxnNumber() - ? write_ops::getStmtIdForWriteAt(wholeOp, stmtIdIndex++) - : kUninitializedStmtId; BSONObj toInsert = fixedDoc.getValue().isEmpty() ? doc : std::move(fixedDoc.getValue()); - batch.emplace_back(stmtId, toInsert); + batch.emplace_back(getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++), toInsert); bytesInBatch += batch.back().doc.objsize(); if (!isLastDoc && batch.size() < maxBatchSize && bytesInBatch < insertVectorMaxBytes) continue; // Add more to batch before inserting. @@ -485,6 +488,7 @@ WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& who static SingleWriteResult performSingleUpdateOp(OperationContext* opCtx, const NamespaceString& ns, + StmtId stmtId, const write_ops::UpdateOpEntry& op) { globalOpCounters.gotUpdate(); auto& curOp = *CurOp::get(opCtx); @@ -503,6 +507,7 @@ static SingleWriteResult performSingleUpdateOp(OperationContext* opCtx, request.setQuery(op.getQ()); request.setUpdates(op.getU()); request.setCollation(write_ops::collationOf(op)); + request.setStmtId(stmtId); request.setArrayFilters(write_ops::arrayFiltersOf(op)); request.setMulti(op.getMulti()); request.setUpsert(op.getUpsert()); @@ -582,6 +587,7 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who opCtx, wholeOp.getWriteCommandBase().getBypassDocumentValidation()); LastOpFixer lastOpFixer(opCtx, wholeOp.getNamespace()); + size_t stmtIdIndex = 0; WriteResult out; out.results.reserve(wholeOp.getUpdates().size()); for (auto&& singleOp : wholeOp.getUpdates()) { @@ -598,7 +604,10 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who try { lastOpFixer.startingOp(); out.results.emplace_back( - performSingleUpdateOp(opCtx, wholeOp.getNamespace(), singleOp)); + performSingleUpdateOp(opCtx, + wholeOp.getNamespace(), + getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++), + singleOp)); lastOpFixer.finishedOpSuccessfully(); } catch (const DBException& ex) { const bool canContinue = @@ -613,6 +622,7 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who static SingleWriteResult performSingleDeleteOp(OperationContext* opCtx, const NamespaceString& ns, + StmtId stmtId, const write_ops::DeleteOpEntry& op) { globalOpCounters.gotDelete(); auto& curOp = *CurOp::get(opCtx); @@ -632,6 +642,7 @@ static SingleWriteResult performSingleDeleteOp(OperationContext* opCtx, request.setCollation(write_ops::collationOf(op)); request.setMulti(op.getMulti()); request.setYieldPolicy(PlanExecutor::YIELD_AUTO); // ParsedDelete overrides this for $isolated. + request.setStmtId(stmtId); ParsedDelete parsedDelete(opCtx, &request); uassertStatusOK(parsedDelete.parseRequest()); @@ -692,6 +703,7 @@ WriteResult performDeletes(OperationContext* opCtx, const write_ops::Delete& who opCtx, wholeOp.getWriteCommandBase().getBypassDocumentValidation()); LastOpFixer lastOpFixer(opCtx, wholeOp.getNamespace()); + size_t stmtIdIndex = 0; WriteResult out; out.results.reserve(wholeOp.getDeletes().size()); for (auto&& singleOp : wholeOp.getDeletes()) { @@ -708,7 +720,10 @@ WriteResult performDeletes(OperationContext* opCtx, const write_ops::Delete& who try { lastOpFixer.startingOp(); out.results.emplace_back( - performSingleDeleteOp(opCtx, wholeOp.getNamespace(), singleOp)); + performSingleDeleteOp(opCtx, + wholeOp.getNamespace(), + getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++), + singleOp)); lastOpFixer.finishedOpSuccessfully(); } catch (const DBException& ex) { const bool canContinue = |