diff options
author | Yuhong Zhang <yuhong.zhang@mongodb.com> | 2023-04-21 16:19:50 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-04-24 02:09:29 +0000 |
commit | 62a6c820b41e884cfd7930bdfff48e826c8df348 (patch) | |
tree | 5063a56a96ddb2922f8616bafd914582f0007596 /src/mongo/db/exec | |
parent | e3e4b4d6a54a671e564d8bd9e4b34188ab4accd0 (diff) | |
download | mongo-62a6c820b41e884cfd7930bdfff48e826c8df348.tar.gz |
SERVER-73286 Extend timeseries::performAtomicWrites() to support user updates
Diffstat (limited to 'src/mongo/db/exec')
-rw-r--r-- | src/mongo/db/exec/timeseries_modify.cpp | 37 |
1 files changed, 18 insertions, 19 deletions
diff --git a/src/mongo/db/exec/timeseries_modify.cpp b/src/mongo/db/exec/timeseries_modify.cpp index e1c1d077581..26c10ebf3af 100644 --- a/src/mongo/db/exec/timeseries_modify.cpp +++ b/src/mongo/db/exec/timeseries_modify.cpp @@ -133,7 +133,7 @@ PlanStage::StageState TimeseriesModifyStage::_writeToTimeseriesBuckets( write_ops::DeleteCommandRequest op(collection()->ns(), {deleteEntry}); auto result = timeseries::performAtomicWrites( - opCtx(), collection(), recordId, op, bucketFromMigrate, _params->stmtId); + opCtx(), collection(), recordId, op, {}, bucketFromMigrate, _params->stmtId); if (!result.isOK()) { return yieldAndRetry(7309300); } @@ -160,16 +160,17 @@ PlanStage::StageState TimeseriesModifyStage::_writeToTimeseriesBuckets( write_ops::UpdateCommandRequest op(collection()->ns(), {updateEntry}); auto result = timeseries::performAtomicWrites( - opCtx(), collection(), recordId, op, bucketFromMigrate, _params->stmtId); + opCtx(), collection(), recordId, op, {}, bucketFromMigrate, _params->stmtId); if (!result.isOK()) { return yieldAndRetry(7309301); } } + _specificStats.nMeasurementsDeleted += deletedMeasurements.size(); - // As restoreState may restore (recreate) cursors, cursors are tied to the - // transaction in which they are created, and a WriteUnitOfWork is a transaction, - // make sure to restore the state outside of the WriteUnitOfWork. + // As restoreState may restore (recreate) cursors, cursors are tied to the transaction in which + // they are created, and a WriteUnitOfWork is a transaction, make sure to restore the state + // outside of the WriteUnitOfWork. return handlePlanStageYield( expCtx(), "TimeseriesModifyStage restoreState", @@ -179,9 +180,8 @@ PlanStage::StageState TimeseriesModifyStage::_writeToTimeseriesBuckets( return PlanStage::NEED_TIME; }, // yieldHandler - // Note we don't need to retry anything in this case since the delete already - // was committed. However, we still need to return the deleted document (if it - // was requested). + // Note we don't need to retry anything in this case since the delete already was committed. + // However, we still need to return the deleted document (if it was requested). // TODO SERVER-73089 for findAndModify we need to return the deleted doc. [&] { /* noop */ }); } @@ -190,9 +190,9 @@ template <typename F> std::pair<boost::optional<PlanStage::StageState>, bool> TimeseriesModifyStage::_checkIfWritingToOrphanedBucket(ScopeGuard<F>& bucketFreer, WorkingSetID id) { - // If we are in explain mode, we do not need to check if the bucket is orphaned since - // we're not writing to bucket. If we are migrating a bucket, we also do not need to - // check if the bucket is not writable and just return it. + // If we are in explain mode, we do not need to check if the bucket is orphaned since we're not + // writing to bucket. If we are migrating a bucket, we also do not need to check if the bucket + // is not writable and just return it. if (_params->isExplain || _params->fromMigrate) { return {boost::none, _params->fromMigrate}; } @@ -220,9 +220,8 @@ PlanStage::StageState TimeseriesModifyStage::_getNextBucket(WorkingSetID& id) { _retryBucketId = WorkingSet::INVALID_ID; } - // We may not have an up-to-date bucket for this RecordId. Fetch it and ensure that it - // still exists and matches our bucket-level predicate if it is not believed to be - // up-to-date. + // We may not have an up-to-date bucket for this RecordId. Fetch it and ensure that it still + // exists and matches our bucket-level predicate if it is not believed to be up-to-date. bool docStillMatches; const auto status = handlePlanStageYield( @@ -274,16 +273,16 @@ PlanStage::StageState TimeseriesModifyStage::doWork(WorkingSetID* out) { return status; } - // We want to free this member when we return because we either have an owned copy of - // the bucket for normal write and write to orphan cases, or we skip the bucket. + // We want to free this member when we return because we either have an owned copy of the bucket + // for normal write and write to orphan cases, or we skip the bucket. ScopeGuard bucketFreer([&] { _ws->free(id); }); auto member = _ws->get(id); tassert(7459100, "Expected a RecordId from the child stage", member->hasRecordId()); - // Determine if we are writing to an orphaned bucket - such writes should be excluded - // from user-visible change stream events. This will be achieved later by setting - // 'fromMigrate' flag when calling performAtomicWrites(). + // Determine if we are writing to an orphaned bucket - such writes should be excluded from + // user-visible change stream events. This will be achieved later by setting 'fromMigrate' flag + // when calling performAtomicWrites(). auto [immediateReturnStageState, bucketFromMigrate] = _checkIfWritingToOrphanedBucket(bucketFreer, id); if (immediateReturnStageState) { |