summaryrefslogtreecommitdiff
path: root/src/mongo/db/ops/write_ops_exec.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/ops/write_ops_exec.cpp')
-rw-r--r--src/mongo/db/ops/write_ops_exec.cpp27
1 files changed, 21 insertions, 6 deletions
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index 34d10394657..f8548e2a18d 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -397,6 +397,12 @@ bool insertBatchAndHandleErrors(OperationContext* opCtx,
return true;
}
+template <typename T>
+StmtId getStmtIdForWriteOp(OperationContext* opCtx, const T& wholeOp, size_t opIndex) {
+ return opCtx->getTxnNumber() ? write_ops::getStmtIdForWriteAt(wholeOp, opIndex)
+ : kUninitializedStmtId;
+}
+
} // namespace
WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& wholeOp) {
@@ -452,11 +458,8 @@ WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& who
// correct order. In an ordered insert, if one of the docs ahead of us fails, we should
// behave as-if we never got to this document.
} else {
- const StmtId stmtId = opCtx->getTxnNumber()
- ? write_ops::getStmtIdForWriteAt(wholeOp, stmtIdIndex++)
- : kUninitializedStmtId;
BSONObj toInsert = fixedDoc.getValue().isEmpty() ? doc : std::move(fixedDoc.getValue());
- batch.emplace_back(stmtId, toInsert);
+ batch.emplace_back(getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++), toInsert);
bytesInBatch += batch.back().doc.objsize();
if (!isLastDoc && batch.size() < maxBatchSize && bytesInBatch < insertVectorMaxBytes)
continue; // Add more to batch before inserting.
@@ -485,6 +488,7 @@ WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& who
static SingleWriteResult performSingleUpdateOp(OperationContext* opCtx,
const NamespaceString& ns,
+ StmtId stmtId,
const write_ops::UpdateOpEntry& op) {
globalOpCounters.gotUpdate();
auto& curOp = *CurOp::get(opCtx);
@@ -503,6 +507,7 @@ static SingleWriteResult performSingleUpdateOp(OperationContext* opCtx,
request.setQuery(op.getQ());
request.setUpdates(op.getU());
request.setCollation(write_ops::collationOf(op));
+ request.setStmtId(stmtId);
request.setArrayFilters(write_ops::arrayFiltersOf(op));
request.setMulti(op.getMulti());
request.setUpsert(op.getUpsert());
@@ -582,6 +587,7 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who
opCtx, wholeOp.getWriteCommandBase().getBypassDocumentValidation());
LastOpFixer lastOpFixer(opCtx, wholeOp.getNamespace());
+ size_t stmtIdIndex = 0;
WriteResult out;
out.results.reserve(wholeOp.getUpdates().size());
for (auto&& singleOp : wholeOp.getUpdates()) {
@@ -598,7 +604,10 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who
try {
lastOpFixer.startingOp();
out.results.emplace_back(
- performSingleUpdateOp(opCtx, wholeOp.getNamespace(), singleOp));
+ performSingleUpdateOp(opCtx,
+ wholeOp.getNamespace(),
+ getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++),
+ singleOp));
lastOpFixer.finishedOpSuccessfully();
} catch (const DBException& ex) {
const bool canContinue =
@@ -613,6 +622,7 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who
static SingleWriteResult performSingleDeleteOp(OperationContext* opCtx,
const NamespaceString& ns,
+ StmtId stmtId,
const write_ops::DeleteOpEntry& op) {
globalOpCounters.gotDelete();
auto& curOp = *CurOp::get(opCtx);
@@ -632,6 +642,7 @@ static SingleWriteResult performSingleDeleteOp(OperationContext* opCtx,
request.setCollation(write_ops::collationOf(op));
request.setMulti(op.getMulti());
request.setYieldPolicy(PlanExecutor::YIELD_AUTO); // ParsedDelete overrides this for $isolated.
+ request.setStmtId(stmtId);
ParsedDelete parsedDelete(opCtx, &request);
uassertStatusOK(parsedDelete.parseRequest());
@@ -692,6 +703,7 @@ WriteResult performDeletes(OperationContext* opCtx, const write_ops::Delete& who
opCtx, wholeOp.getWriteCommandBase().getBypassDocumentValidation());
LastOpFixer lastOpFixer(opCtx, wholeOp.getNamespace());
+ size_t stmtIdIndex = 0;
WriteResult out;
out.results.reserve(wholeOp.getDeletes().size());
for (auto&& singleOp : wholeOp.getDeletes()) {
@@ -708,7 +720,10 @@ WriteResult performDeletes(OperationContext* opCtx, const write_ops::Delete& who
try {
lastOpFixer.startingOp();
out.results.emplace_back(
- performSingleDeleteOp(opCtx, wholeOp.getNamespace(), singleOp));
+ performSingleDeleteOp(opCtx,
+ wholeOp.getNamespace(),
+ getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++),
+ singleOp));
lastOpFixer.finishedOpSuccessfully();
} catch (const DBException& ex) {
const bool canContinue =