diff options
author | Alyssa Wagenmaker <alyssa.wagenmaker@mongodb.com> | 2023-05-04 17:34:22 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-05-04 19:09:24 +0000 |
commit | 41d8e65dc85cf7a693fa3d46d7caf84b9dff802f (patch) | |
tree | 0d6cdf475ea7380e0502f4cea4757718d32d472f /src/mongo/db/exec/timeseries_modify.cpp | |
parent | 4973d6f85a1a394aff14d4a2124ba993d666c604 (diff) | |
download | mongo-41d8e65dc85cf7a693fa3d46d7caf84b9dff802f.tar.gz |
SERVER-76625 Support basic modifier updates in TS_MODIFY
Diffstat (limited to 'src/mongo/db/exec/timeseries_modify.cpp')
-rw-r--r-- | src/mongo/db/exec/timeseries_modify.cpp | 97 |
1 files changed, 88 insertions, 9 deletions
diff --git a/src/mongo/db/exec/timeseries_modify.cpp b/src/mongo/db/exec/timeseries_modify.cpp index a998e23e46a..f1b96a0c80f 100644 --- a/src/mongo/db/exec/timeseries_modify.cpp +++ b/src/mongo/db/exec/timeseries_modify.cpp @@ -53,7 +53,6 @@ TimeseriesModifyStage::TimeseriesModifyStage(ExpressionContext* expCtx, _bucketUnpacker{std::move(bucketUnpacker)}, _residualPredicate(std::move(residualPredicate)), _preWriteFilter(opCtx(), coll->ns()) { - uassert(ErrorCodes::InvalidOptions, "Arbitrary updates not yet enabled", !_params.isUpdate); tassert(7308200, "Multi deletes must have a residual predicate", _isSingletonWrite() || _residualPredicate || _params.isUpdate); @@ -84,7 +83,7 @@ TimeseriesModifyStage::TimeseriesModifyStage(ExpressionContext* expCtx, // TODO SERVER-73143 Enable these cases. uassert(ErrorCodes::InvalidOptions, "Timeseries arbitrary updates must be modifier updates", - !_specificStats.isModUpdate); + !_params.isUpdate || _specificStats.isModUpdate); } bool TimeseriesModifyStage::isEOF() { @@ -106,14 +105,96 @@ std::unique_ptr<PlanStageStats> TimeseriesModifyStage::getStats() { return ret; } + +std::pair<std::vector<BSONObj>, std::vector<write_ops::InsertCommandRequest>> +TimeseriesModifyStage::_buildInsertOps(const std::vector<BSONObj>& matchedMeasurements, + std::vector<BSONObj>& unchangedMeasurements) { + // Determine which documents to update based on which ones are actually being changed. + std::vector<BSONObj> modifiedMeasurements; + + const bool isUserInitiatedWrite = opCtx()->writesAreReplicated() && + !(_params.isFromOplogApplication || + _params.updateDriver->type() == UpdateDriver::UpdateType::kDelta || _params.fromMigrate); + + for (auto&& measurement : matchedMeasurements) { + // Timeseries updates are never in place, because we execute them as a delete of the old + // measurement plus an insert of the modified one. 
+ mutablebson::Document doc(measurement, mutablebson::Document::kInPlaceDisabled); + + // Note that timeseries measurement _ids are allowed to be changed, so we don't have any + // immutable fields. + FieldRefSet immutablePaths; + const bool isInsert = false; + bool docWasModified = false; + + if (!_params.updateDriver->needMatchDetails()) { + uassertStatusOK(_params.updateDriver->update(opCtx(), + "", + &doc, + isUserInitiatedWrite, + immutablePaths, + isInsert, + nullptr, + &docWasModified)); + } else { + // If there was a matched field, obtain it. + MatchDetails matchDetails; + matchDetails.requestElemMatchKey(); + + // We have to re-apply the filter to get the matched element. + tassert(7662500, + "measurement must pass filter", + _residualPredicate->matchesBSON(measurement, &matchDetails)); + + uassertStatusOK(_params.updateDriver->update( + opCtx(), + matchDetails.hasElemMatchKey() ? matchDetails.elemMatchKey() : "", + &doc, + isUserInitiatedWrite, + immutablePaths, + isInsert, + nullptr, + &docWasModified)); + } + + if (docWasModified) { + modifiedMeasurements.emplace_back(doc.getObject()); + } else { + // The document wasn't modified, write it back to the original bucket unchanged. 
+ unchangedMeasurements.emplace_back(std::move(measurement)); + } + } + + auto insertOps = timeseries::makeInsertsToNewBuckets(modifiedMeasurements, + collection()->ns(), + *collection()->getTimeseriesOptions(), + collection()->getDefaultCollator()); + return std::make_pair<std::vector<BSONObj>, std::vector<write_ops::InsertCommandRequest>>( + std::move(modifiedMeasurements), std::move(insertOps)); +} + template <typename F> std::pair<bool, PlanStage::StageState> TimeseriesModifyStage::_writeToTimeseriesBuckets( ScopeGuard<F>& bucketFreer, WorkingSetID bucketWsmId, - const std::vector<BSONObj>& unchangedMeasurements, - const std::vector<BSONObj>& modifiedMeasurements, + std::vector<BSONObj>&& unchangedMeasurements, + const std::vector<BSONObj>& matchedMeasurements, bool bucketFromMigrate) { - // No measurements needed to be deleted from the bucket document. + // No measurements needed to be updated or deleted from the bucket document. + if (matchedMeasurements.empty()) { + return {false, PlanStage::NEED_TIME}; + } + _specificStats.nMeasurementsMatched += matchedMeasurements.size(); + + auto updateResult = _params.isUpdate + ? _buildInsertOps(matchedMeasurements, unchangedMeasurements) + : std::pair<std::vector<BSONObj>, std::vector<write_ops::InsertCommandRequest>>{}; + + // If this is a delete, we will be deleting all matched measurements. If this is an update, we + // may not need to modify all measurements, since some may be no-op updates. + const auto& modifiedMeasurements = _params.isUpdate ? updateResult.first : matchedMeasurements; + + // After applying the updates, no measurements needed to be updated in the bucket document. if (modifiedMeasurements.empty()) { return {false, PlanStage::NEED_TIME}; } @@ -122,7 +203,6 @@ std::pair<bool, PlanStage::StageState> TimeseriesModifyStage::_writeToTimeseries // stats and let the caller think as if the write succeeded if there's any deleted measurement. 
if (_params.isExplain) { _specificStats.nMeasurementsModified += modifiedMeasurements.size(); - _specificStats.nMeasurementsMatched += modifiedMeasurements.size(); return {true, PlanStage::NEED_TIME}; } @@ -154,7 +234,7 @@ std::pair<bool, PlanStage::StageState> TimeseriesModifyStage::_writeToTimeseries collection(), recordId, modificationOp, - {}, + updateResult.second, bucketFromMigrate, _params.stmtId); return PlanStage::NEED_TIME; @@ -183,7 +263,6 @@ std::pair<bool, PlanStage::StageState> TimeseriesModifyStage::_writeToTimeseries } throw; } - _specificStats.nMeasurementsMatched += modifiedMeasurements.size(); _specificStats.nMeasurementsModified += modifiedMeasurements.size(); // As restoreState may restore (recreate) cursors, cursors are tied to the transaction in which @@ -356,7 +435,7 @@ PlanStage::StageState TimeseriesModifyStage::doWork(WorkingSetID* out) { auto isWriteSuccessful = false; std::tie(isWriteSuccessful, status) = _writeToTimeseriesBuckets( - bucketFreer, id, unchangedMeasurements, modifiedMeasurements, bucketFromMigrate); + bucketFreer, id, std::move(unchangedMeasurements), modifiedMeasurements, bucketFromMigrate); if (status != PlanStage::NEED_TIME) { *out = WorkingSet::INVALID_ID; if (_params.returnDeleted && isWriteSuccessful) { |