summaryrefslogtreecommitdiff
path: root/src/mongo/db/exec/timeseries_modify.cpp
diff options
context:
space:
mode:
authorAlyssa Wagenmaker <alyssa.wagenmaker@mongodb.com>2023-05-04 17:34:22 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-05-04 19:09:24 +0000
commit41d8e65dc85cf7a693fa3d46d7caf84b9dff802f (patch)
tree0d6cdf475ea7380e0502f4cea4757718d32d472f /src/mongo/db/exec/timeseries_modify.cpp
parent4973d6f85a1a394aff14d4a2124ba993d666c604 (diff)
downloadmongo-41d8e65dc85cf7a693fa3d46d7caf84b9dff802f.tar.gz
SERVER-76625 Support basic modifier updates in TS_MODIFY
Diffstat (limited to 'src/mongo/db/exec/timeseries_modify.cpp')
-rw-r--r--src/mongo/db/exec/timeseries_modify.cpp97
1 files changed, 88 insertions, 9 deletions
diff --git a/src/mongo/db/exec/timeseries_modify.cpp b/src/mongo/db/exec/timeseries_modify.cpp
index a998e23e46a..f1b96a0c80f 100644
--- a/src/mongo/db/exec/timeseries_modify.cpp
+++ b/src/mongo/db/exec/timeseries_modify.cpp
@@ -53,7 +53,6 @@ TimeseriesModifyStage::TimeseriesModifyStage(ExpressionContext* expCtx,
_bucketUnpacker{std::move(bucketUnpacker)},
_residualPredicate(std::move(residualPredicate)),
_preWriteFilter(opCtx(), coll->ns()) {
- uassert(ErrorCodes::InvalidOptions, "Arbitrary updates not yet enabled", !_params.isUpdate);
tassert(7308200,
"Multi deletes must have a residual predicate",
_isSingletonWrite() || _residualPredicate || _params.isUpdate);
@@ -84,7 +83,7 @@ TimeseriesModifyStage::TimeseriesModifyStage(ExpressionContext* expCtx,
// TODO SERVER-73143 Enable these cases.
uassert(ErrorCodes::InvalidOptions,
"Timeseries arbitrary updates must be modifier updates",
- !_specificStats.isModUpdate);
+ !_params.isUpdate || _specificStats.isModUpdate);
}
bool TimeseriesModifyStage::isEOF() {
@@ -106,14 +105,96 @@ std::unique_ptr<PlanStageStats> TimeseriesModifyStage::getStats() {
return ret;
}
+
+std::pair<std::vector<BSONObj>, std::vector<write_ops::InsertCommandRequest>>
+TimeseriesModifyStage::_buildInsertOps(const std::vector<BSONObj>& matchedMeasurements,
+ std::vector<BSONObj>& unchangedMeasurements) {
+ // Determine which documents to update based on which ones are actually being changed.
+ std::vector<BSONObj> modifiedMeasurements;
+
+ const bool isUserInitiatedWrite = opCtx()->writesAreReplicated() &&
+ !(_params.isFromOplogApplication ||
+ _params.updateDriver->type() == UpdateDriver::UpdateType::kDelta || _params.fromMigrate);
+
+ for (auto&& measurement : matchedMeasurements) {
+ // Timeseries updates are never in place, because we execute them as a delete of the old
+ // measurement plus an insert of the modified one.
+ mutablebson::Document doc(measurement, mutablebson::Document::kInPlaceDisabled);
+
+ // Note that timeseries measurment _ids are allowed to be changed, so we don't have any
+ // immutable fields.
+ FieldRefSet immutablePaths;
+ const bool isInsert = false;
+ bool docWasModified = false;
+
+ if (!_params.updateDriver->needMatchDetails()) {
+ uassertStatusOK(_params.updateDriver->update(opCtx(),
+ "",
+ &doc,
+ isUserInitiatedWrite,
+ immutablePaths,
+ isInsert,
+ nullptr,
+ &docWasModified));
+ } else {
+ // If there was a matched field, obtain it.
+ MatchDetails matchDetails;
+ matchDetails.requestElemMatchKey();
+
+ // We have to re-apply the filter to get the matched element.
+ tassert(7662500,
+ "measurement must pass filter",
+ _residualPredicate->matchesBSON(measurement, &matchDetails));
+
+ uassertStatusOK(_params.updateDriver->update(
+ opCtx(),
+ matchDetails.hasElemMatchKey() ? matchDetails.elemMatchKey() : "",
+ &doc,
+ isUserInitiatedWrite,
+ immutablePaths,
+ isInsert,
+ nullptr,
+ &docWasModified));
+ }
+
+ if (docWasModified) {
+ modifiedMeasurements.emplace_back(doc.getObject());
+ } else {
+ // The document wasn't modified, write it back to the original bucket unchanged.
+ unchangedMeasurements.emplace_back(std::move(measurement));
+ }
+ }
+
+ auto insertOps = timeseries::makeInsertsToNewBuckets(modifiedMeasurements,
+ collection()->ns(),
+ *collection()->getTimeseriesOptions(),
+ collection()->getDefaultCollator());
+ return std::make_pair<std::vector<BSONObj>, std::vector<write_ops::InsertCommandRequest>>(
+ std::move(modifiedMeasurements), std::move(insertOps));
+}
+
template <typename F>
std::pair<bool, PlanStage::StageState> TimeseriesModifyStage::_writeToTimeseriesBuckets(
ScopeGuard<F>& bucketFreer,
WorkingSetID bucketWsmId,
- const std::vector<BSONObj>& unchangedMeasurements,
- const std::vector<BSONObj>& modifiedMeasurements,
+ std::vector<BSONObj>&& unchangedMeasurements,
+ const std::vector<BSONObj>& matchedMeasurements,
bool bucketFromMigrate) {
- // No measurements needed to be deleted from the bucket document.
+ // No measurements needed to be updated or deleted from the bucket document.
+ if (matchedMeasurements.empty()) {
+ return {false, PlanStage::NEED_TIME};
+ }
+ _specificStats.nMeasurementsMatched += matchedMeasurements.size();
+
+ auto updateResult = _params.isUpdate
+ ? _buildInsertOps(matchedMeasurements, unchangedMeasurements)
+ : std::pair<std::vector<BSONObj>, std::vector<write_ops::InsertCommandRequest>>{};
+
+ // If this is a delete, we will be deleting all matched measurements. If this is an update, we
+ // may not need to modify all measurements, since some may be no-op updates.
+ const auto& modifiedMeasurements = _params.isUpdate ? updateResult.first : matchedMeasurements;
+
+ // After applying the updates, no measurements needed to be updated in the bucket document.
if (modifiedMeasurements.empty()) {
return {false, PlanStage::NEED_TIME};
}
@@ -122,7 +203,6 @@ std::pair<bool, PlanStage::StageState> TimeseriesModifyStage::_writeToTimeseries
// stats and let the caller think as if the write succeeded if there's any deleted measurement.
if (_params.isExplain) {
_specificStats.nMeasurementsModified += modifiedMeasurements.size();
- _specificStats.nMeasurementsMatched += modifiedMeasurements.size();
return {true, PlanStage::NEED_TIME};
}
@@ -154,7 +234,7 @@ std::pair<bool, PlanStage::StageState> TimeseriesModifyStage::_writeToTimeseries
collection(),
recordId,
modificationOp,
- {},
+ updateResult.second,
bucketFromMigrate,
_params.stmtId);
return PlanStage::NEED_TIME;
@@ -183,7 +263,6 @@ std::pair<bool, PlanStage::StageState> TimeseriesModifyStage::_writeToTimeseries
}
throw;
}
- _specificStats.nMeasurementsMatched += modifiedMeasurements.size();
_specificStats.nMeasurementsModified += modifiedMeasurements.size();
// As restoreState may restore (recreate) cursors, cursors are tied to the transaction in which
@@ -356,7 +435,7 @@ PlanStage::StageState TimeseriesModifyStage::doWork(WorkingSetID* out) {
auto isWriteSuccessful = false;
std::tie(isWriteSuccessful, status) = _writeToTimeseriesBuckets(
- bucketFreer, id, unchangedMeasurements, modifiedMeasurements, bucketFromMigrate);
+ bucketFreer, id, std::move(unchangedMeasurements), modifiedMeasurements, bucketFromMigrate);
if (status != PlanStage::NEED_TIME) {
*out = WorkingSet::INVALID_ID;
if (_params.returnDeleted && isWriteSuccessful) {