summaryrefslogtreecommitdiff
path: root/src/mongo/db/ops/write_ops_exec.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/ops/write_ops_exec.cpp')
-rw-r--r--src/mongo/db/ops/write_ops_exec.cpp138
1 files changed, 59 insertions, 79 deletions
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index 929d8990828..a92256312f2 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -1124,7 +1124,7 @@ WriteResult performDeletes(OperationContext* opCtx,
Status performAtomicTimeseriesWrites(
OperationContext* opCtx,
const std::vector<write_ops::InsertCommandRequest>& insertOps,
- const std::vector<write_ops::UpdateCommandRequest>& updateOps) {
+ const std::vector<write_ops::UpdateCommandRequest>& updateOps) try {
invariant(!opCtx->lockState()->inAWriteUnitOfWork());
invariant(!opCtx->inMultiDocumentTransaction());
invariant(!insertOps.empty() || !updateOps.empty());
@@ -1145,99 +1145,79 @@ Status performAtomicTimeseriesWrites(
assertCanWrite_inlock(opCtx, ns);
- try {
- WriteUnitOfWork wuow{opCtx};
+ WriteUnitOfWork wuow{opCtx};
- std::vector<repl::OpTime> oplogSlots;
- boost::optional<std::vector<repl::OpTime>::iterator> slot;
- if (!repl::ReplicationCoordinator::get(opCtx)->isOplogDisabledFor(opCtx, ns)) {
- oplogSlots = repl::getNextOpTimes(opCtx, insertOps.size() + updateOps.size());
- slot = oplogSlots.begin();
- }
+ std::vector<repl::OpTime> oplogSlots;
+ boost::optional<std::vector<repl::OpTime>::iterator> slot;
+ if (!repl::ReplicationCoordinator::get(opCtx)->isOplogDisabledFor(opCtx, ns)) {
+ oplogSlots = repl::getNextOpTimes(opCtx, insertOps.size() + updateOps.size());
+ slot = oplogSlots.begin();
+ }
- std::vector<InsertStatement> inserts;
- inserts.reserve(insertOps.size());
+ std::vector<InsertStatement> inserts;
+ inserts.reserve(insertOps.size());
- for (auto& op : insertOps) {
- invariant(op.getDocuments().size() == 1);
+ for (auto& op : insertOps) {
+ invariant(op.getDocuments().size() == 1);
- inserts.emplace_back(op.getStmtIds() ? *op.getStmtIds()
- : std::vector<StmtId>{kUninitializedStmtId},
- op.getDocuments().front(),
- slot ? *(*slot)++ : OplogSlot{});
- }
+ inserts.emplace_back(op.getStmtIds() ? *op.getStmtIds()
+ : std::vector<StmtId>{kUninitializedStmtId},
+ op.getDocuments().front(),
+ slot ? *(*slot)++ : OplogSlot{});
+ }
- if (!insertOps.empty()) {
- auto status =
- coll->insertDocuments(opCtx, inserts.begin(), inserts.end(), &curOp->debug());
- if (!status.isOK()) {
- return status;
- }
+ if (!insertOps.empty()) {
+ auto status = coll->insertDocuments(opCtx, inserts.begin(), inserts.end(), &curOp->debug());
+ if (!status.isOK()) {
+ return status;
}
+ }
- for (auto& op : updateOps) {
- invariant(op.getUpdates().size() == 1);
- auto& update = op.getUpdates().front();
-
- // TODO (SERVER-56270): Remove handling for non-clustered time-series collections.
- auto recordId = coll->isClustered()
- ? record_id_helpers::keyForOID(update.getQ()["_id"].OID())
- : Helpers::findOne(opCtx, *coll, update.getQ(), false);
- if (recordId.isNull()) {
- return {ErrorCodes::TimeseriesBucketCleared, "Could not find time-series bucket"};
- }
-
- auto record = coll->getCursor(opCtx)->seekExact(recordId);
- if (!record) {
- return {ErrorCodes::TimeseriesBucketCleared, "Could not find time-series bucket"};
- }
-
- auto original = record->data.toBson();
- const bool mustCheckExistenceForInsertOperations =
- static_cast<bool>(repl::tenantMigrationRecipientInfo(opCtx));
- auto [updated, indexesAffected] =
- doc_diff::applyDiff(original,
- update.getU().getDiff(),
- &CollectionQueryInfo::get(*coll).getIndexKeys(opCtx),
- mustCheckExistenceForInsertOperations);
-
- CollectionUpdateArgs args;
- if (const auto& stmtIds = op.getStmtIds()) {
- args.stmtIds = *stmtIds;
- }
- args.preImageDoc = original;
- args.update = update_oplog_entry::makeDeltaOplogEntry(update.getU().getDiff());
- args.criteria = update.getQ();
- args.source = OperationSource::kTimeseries;
- if (slot) {
- args.oplogSlot = *(*slot)++;
- fassert(5481600,
- opCtx->recoveryUnit()->setTimestamp(args.oplogSlot->getTimestamp()));
- }
-
- coll->updateDocument(
- opCtx,
- recordId,
- Snapshotted<BSONObj>{opCtx->recoveryUnit()->getSnapshotId(), std::move(original)},
- updated,
- indexesAffected,
- &curOp->debug(),
- &args);
+ for (auto& op : updateOps) {
+ invariant(op.getUpdates().size() == 1);
+ auto& update = op.getUpdates().front();
+
+ // TODO (SERVER-56270): Remove handling for non-clustered time-series collections.
+ auto recordId = coll->isClustered()
+ ? record_id_helpers::keyForOID(update.getQ()["_id"].OID())
+ : Helpers::findOne(opCtx, *coll, update.getQ(), false);
+
+ auto original = coll->docFor(opCtx, recordId);
+ auto [updated, indexesAffected] =
+ doc_diff::applyDiff(original.value(),
+ update.getU().getDiff(),
+ &CollectionQueryInfo::get(*coll).getIndexKeys(opCtx),
+ static_cast<bool>(repl::tenantMigrationRecipientInfo(opCtx)));
+
+ CollectionUpdateArgs args;
+ if (const auto& stmtIds = op.getStmtIds()) {
+ args.stmtIds = *stmtIds;
}
-
- if (MONGO_unlikely(failAtomicTimeseriesWrites.shouldFail())) {
- return {ErrorCodes::FailPointEnabled,
- "Failing time-series writes due to failAtomicTimeseriesWrites fail point"};
+ args.preImageDoc = original.value();
+ args.update = update_oplog_entry::makeDeltaOplogEntry(update.getU().getDiff());
+ args.criteria = update.getQ();
+ args.source = OperationSource::kTimeseries;
+ if (slot) {
+ args.oplogSlot = *(*slot)++;
+ fassert(5481600, opCtx->recoveryUnit()->setTimestamp(args.oplogSlot->getTimestamp()));
}
- wuow.commit();
- } catch (const DBException& ex) {
- return ex.toStatus();
+ coll->updateDocument(
+ opCtx, recordId, original, updated, indexesAffected, &curOp->debug(), &args);
+ }
+
+ if (MONGO_unlikely(failAtomicTimeseriesWrites.shouldFail())) {
+ return {ErrorCodes::FailPointEnabled,
+ "Failing time-series writes due to failAtomicTimeseriesWrites fail point"};
}
+ wuow.commit();
+
lastOpFixer.finishedOpSuccessfully();
return Status::OK();
+} catch (const DBException& ex) {
+ return ex.toStatus();
}
void recordUpdateResultInOpDebug(const UpdateResult& updateResult, OpDebug* opDebug) {