summaryrefslogtreecommitdiff
path: root/src/mongo/db/exec
diff options
context:
space:
mode:
authorYuhong Zhang <yuhong.zhang@mongodb.com>2023-04-21 16:19:50 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-04-24 02:09:29 +0000
commit62a6c820b41e884cfd7930bdfff48e826c8df348 (patch)
tree5063a56a96ddb2922f8616bafd914582f0007596 /src/mongo/db/exec
parente3e4b4d6a54a671e564d8bd9e4b34188ab4accd0 (diff)
downloadmongo-62a6c820b41e884cfd7930bdfff48e826c8df348.tar.gz
SERVER-73286 Extend timeseries::performAtomicWrites() to support user updates
Diffstat (limited to 'src/mongo/db/exec')
-rw-r--r--src/mongo/db/exec/timeseries_modify.cpp37
1 files changed, 18 insertions, 19 deletions
diff --git a/src/mongo/db/exec/timeseries_modify.cpp b/src/mongo/db/exec/timeseries_modify.cpp
index e1c1d077581..26c10ebf3af 100644
--- a/src/mongo/db/exec/timeseries_modify.cpp
+++ b/src/mongo/db/exec/timeseries_modify.cpp
@@ -133,7 +133,7 @@ PlanStage::StageState TimeseriesModifyStage::_writeToTimeseriesBuckets(
write_ops::DeleteCommandRequest op(collection()->ns(), {deleteEntry});
auto result = timeseries::performAtomicWrites(
- opCtx(), collection(), recordId, op, bucketFromMigrate, _params->stmtId);
+ opCtx(), collection(), recordId, op, {}, bucketFromMigrate, _params->stmtId);
if (!result.isOK()) {
return yieldAndRetry(7309300);
}
@@ -160,16 +160,17 @@ PlanStage::StageState TimeseriesModifyStage::_writeToTimeseriesBuckets(
write_ops::UpdateCommandRequest op(collection()->ns(), {updateEntry});
auto result = timeseries::performAtomicWrites(
- opCtx(), collection(), recordId, op, bucketFromMigrate, _params->stmtId);
+ opCtx(), collection(), recordId, op, {}, bucketFromMigrate, _params->stmtId);
if (!result.isOK()) {
return yieldAndRetry(7309301);
}
}
+
_specificStats.nMeasurementsDeleted += deletedMeasurements.size();
- // As restoreState may restore (recreate) cursors, cursors are tied to the
- // transaction in which they are created, and a WriteUnitOfWork is a transaction,
- // make sure to restore the state outside of the WriteUnitOfWork.
+ // As restoreState may restore (recreate) cursors, cursors are tied to the transaction in which
+ // they are created, and a WriteUnitOfWork is a transaction, make sure to restore the state
+ // outside of the WriteUnitOfWork.
return handlePlanStageYield(
expCtx(),
"TimeseriesModifyStage restoreState",
@@ -179,9 +180,8 @@ PlanStage::StageState TimeseriesModifyStage::_writeToTimeseriesBuckets(
return PlanStage::NEED_TIME;
},
// yieldHandler
- // Note we don't need to retry anything in this case since the delete already
- // was committed. However, we still need to return the deleted document (if it
- // was requested).
+ // Note we don't need to retry anything in this case since the delete already was committed.
+ // However, we still need to return the deleted document (if it was requested).
// TODO SERVER-73089 for findAndModify we need to return the deleted doc.
[&] { /* noop */ });
}
@@ -190,9 +190,9 @@ template <typename F>
std::pair<boost::optional<PlanStage::StageState>, bool>
TimeseriesModifyStage::_checkIfWritingToOrphanedBucket(ScopeGuard<F>& bucketFreer,
WorkingSetID id) {
- // If we are in explain mode, we do not need to check if the bucket is orphaned since
- // we're not writing to bucket. If we are migrating a bucket, we also do not need to
- // check if the bucket is not writable and just return it.
+ // If we are in explain mode, we do not need to check if the bucket is orphaned since we're not
+ // writing to bucket. If we are migrating a bucket, we also do not need to check if the bucket
+ // is not writable and just return it.
if (_params->isExplain || _params->fromMigrate) {
return {boost::none, _params->fromMigrate};
}
@@ -220,9 +220,8 @@ PlanStage::StageState TimeseriesModifyStage::_getNextBucket(WorkingSetID& id) {
_retryBucketId = WorkingSet::INVALID_ID;
}
- // We may not have an up-to-date bucket for this RecordId. Fetch it and ensure that it
- // still exists and matches our bucket-level predicate if it is not believed to be
- // up-to-date.
+ // We may not have an up-to-date bucket for this RecordId. Fetch it and ensure that it still
+ // exists and matches our bucket-level predicate if it is not believed to be up-to-date.
bool docStillMatches;
const auto status = handlePlanStageYield(
@@ -274,16 +273,16 @@ PlanStage::StageState TimeseriesModifyStage::doWork(WorkingSetID* out) {
return status;
}
- // We want to free this member when we return because we either have an owned copy of
- // the bucket for normal write and write to orphan cases, or we skip the bucket.
+ // We want to free this member when we return because we either have an owned copy of the bucket
+ // for normal write and write to orphan cases, or we skip the bucket.
ScopeGuard bucketFreer([&] { _ws->free(id); });
auto member = _ws->get(id);
tassert(7459100, "Expected a RecordId from the child stage", member->hasRecordId());
- // Determine if we are writing to an orphaned bucket - such writes should be excluded
- // from user-visible change stream events. This will be achieved later by setting
- // 'fromMigrate' flag when calling performAtomicWrites().
+ // Determine if we are writing to an orphaned bucket - such writes should be excluded from
+ // user-visible change stream events. This will be achieved later by setting 'fromMigrate' flag
+ // when calling performAtomicWrites().
auto [immediateReturnStageState, bucketFromMigrate] =
_checkIfWritingToOrphanedBucket(bucketFreer, id);
if (immediateReturnStageState) {