diff options
Diffstat (limited to 'src/mongo/db/ops/write_ops_exec.cpp')
-rw-r--r-- | src/mongo/db/ops/write_ops_exec.cpp | 138 |
1 files changed, 59 insertions, 79 deletions
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index 929d8990828..a92256312f2 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -1124,7 +1124,7 @@ WriteResult performDeletes(OperationContext* opCtx, Status performAtomicTimeseriesWrites( OperationContext* opCtx, const std::vector<write_ops::InsertCommandRequest>& insertOps, - const std::vector<write_ops::UpdateCommandRequest>& updateOps) { + const std::vector<write_ops::UpdateCommandRequest>& updateOps) try { invariant(!opCtx->lockState()->inAWriteUnitOfWork()); invariant(!opCtx->inMultiDocumentTransaction()); invariant(!insertOps.empty() || !updateOps.empty()); @@ -1145,99 +1145,79 @@ Status performAtomicTimeseriesWrites( assertCanWrite_inlock(opCtx, ns); - try { - WriteUnitOfWork wuow{opCtx}; + WriteUnitOfWork wuow{opCtx}; - std::vector<repl::OpTime> oplogSlots; - boost::optional<std::vector<repl::OpTime>::iterator> slot; - if (!repl::ReplicationCoordinator::get(opCtx)->isOplogDisabledFor(opCtx, ns)) { - oplogSlots = repl::getNextOpTimes(opCtx, insertOps.size() + updateOps.size()); - slot = oplogSlots.begin(); - } + std::vector<repl::OpTime> oplogSlots; + boost::optional<std::vector<repl::OpTime>::iterator> slot; + if (!repl::ReplicationCoordinator::get(opCtx)->isOplogDisabledFor(opCtx, ns)) { + oplogSlots = repl::getNextOpTimes(opCtx, insertOps.size() + updateOps.size()); + slot = oplogSlots.begin(); + } - std::vector<InsertStatement> inserts; - inserts.reserve(insertOps.size()); + std::vector<InsertStatement> inserts; + inserts.reserve(insertOps.size()); - for (auto& op : insertOps) { - invariant(op.getDocuments().size() == 1); + for (auto& op : insertOps) { + invariant(op.getDocuments().size() == 1); - inserts.emplace_back(op.getStmtIds() ? *op.getStmtIds() - : std::vector<StmtId>{kUninitializedStmtId}, - op.getDocuments().front(), - slot ? *(*slot)++ : OplogSlot{}); - } + inserts.emplace_back(op.getStmtIds() ? *op.getStmtIds() + : std::vector<StmtId>{kUninitializedStmtId}, + op.getDocuments().front(), + slot ? *(*slot)++ : OplogSlot{}); + } - if (!insertOps.empty()) { - auto status = - coll->insertDocuments(opCtx, inserts.begin(), inserts.end(), &curOp->debug()); - if (!status.isOK()) { - return status; - } + if (!insertOps.empty()) { + auto status = coll->insertDocuments(opCtx, inserts.begin(), inserts.end(), &curOp->debug()); + if (!status.isOK()) { + return status; } + } - for (auto& op : updateOps) { - invariant(op.getUpdates().size() == 1); - auto& update = op.getUpdates().front(); - - // TODO (SERVER-56270): Remove handling for non-clustered time-series collections. - auto recordId = coll->isClustered() - ? record_id_helpers::keyForOID(update.getQ()["_id"].OID()) - : Helpers::findOne(opCtx, *coll, update.getQ(), false); - if (recordId.isNull()) { - return {ErrorCodes::TimeseriesBucketCleared, "Could not find time-series bucket"}; - } - - auto record = coll->getCursor(opCtx)->seekExact(recordId); - if (!record) { - return {ErrorCodes::TimeseriesBucketCleared, "Could not find time-series bucket"}; - } - - auto original = record->data.toBson(); - const bool mustCheckExistenceForInsertOperations = - static_cast<bool>(repl::tenantMigrationRecipientInfo(opCtx)); - auto [updated, indexesAffected] = - doc_diff::applyDiff(original, - update.getU().getDiff(), - &CollectionQueryInfo::get(*coll).getIndexKeys(opCtx), - mustCheckExistenceForInsertOperations); - - CollectionUpdateArgs args; - if (const auto& stmtIds = op.getStmtIds()) { - args.stmtIds = *stmtIds; - } - args.preImageDoc = original; - args.update = update_oplog_entry::makeDeltaOplogEntry(update.getU().getDiff()); - args.criteria = update.getQ(); - args.source = OperationSource::kTimeseries; - if (slot) { - args.oplogSlot = *(*slot)++; - fassert(5481600, - opCtx->recoveryUnit()->setTimestamp(args.oplogSlot->getTimestamp())); - } - - coll->updateDocument( - opCtx, - recordId, - Snapshotted<BSONObj>{opCtx->recoveryUnit()->getSnapshotId(), std::move(original)}, - updated, - indexesAffected, - &curOp->debug(), - &args); + for (auto& op : updateOps) { + invariant(op.getUpdates().size() == 1); + auto& update = op.getUpdates().front(); + + // TODO (SERVER-56270): Remove handling for non-clustered time-series collections. + auto recordId = coll->isClustered() + ? record_id_helpers::keyForOID(update.getQ()["_id"].OID()) + : Helpers::findOne(opCtx, *coll, update.getQ(), false); + + auto original = coll->docFor(opCtx, recordId); + auto [updated, indexesAffected] = + doc_diff::applyDiff(original.value(), + update.getU().getDiff(), + &CollectionQueryInfo::get(*coll).getIndexKeys(opCtx), + static_cast<bool>(repl::tenantMigrationRecipientInfo(opCtx))); + + CollectionUpdateArgs args; + if (const auto& stmtIds = op.getStmtIds()) { + args.stmtIds = *stmtIds; } - - if (MONGO_unlikely(failAtomicTimeseriesWrites.shouldFail())) { - return {ErrorCodes::FailPointEnabled, - "Failing time-series writes due to failAtomicTimeseriesWrites fail point"}; + args.preImageDoc = original.value(); + args.update = update_oplog_entry::makeDeltaOplogEntry(update.getU().getDiff()); + args.criteria = update.getQ(); + args.source = OperationSource::kTimeseries; + if (slot) { + args.oplogSlot = *(*slot)++; + fassert(5481600, opCtx->recoveryUnit()->setTimestamp(args.oplogSlot->getTimestamp())); } - wuow.commit(); - } catch (const DBException& ex) { - return ex.toStatus(); + coll->updateDocument( + opCtx, recordId, original, updated, indexesAffected, &curOp->debug(), &args); + } + + if (MONGO_unlikely(failAtomicTimeseriesWrites.shouldFail())) { + return {ErrorCodes::FailPointEnabled, + "Failing time-series writes due to failAtomicTimeseriesWrites fail point"}; } + wuow.commit(); + lastOpFixer.finishedOpSuccessfully(); return Status::OK(); +} catch (const DBException& ex) { + return ex.toStatus(); } void recordUpdateResultInOpDebug(const UpdateResult& updateResult, OpDebug* opDebug) { |