summaryrefslogtreecommitdiff
path: root/src/mongo/db/exec
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/exec')
-rw-r--r--src/mongo/db/exec/timeseries_modify.cpp266
-rw-r--r--src/mongo/db/exec/timeseries_modify.h33
2 files changed, 118 insertions, 181 deletions
diff --git a/src/mongo/db/exec/timeseries_modify.cpp b/src/mongo/db/exec/timeseries_modify.cpp
index c2298ff645d..f0e0f274923 100644
--- a/src/mongo/db/exec/timeseries_modify.cpp
+++ b/src/mongo/db/exec/timeseries_modify.cpp
@@ -55,8 +55,7 @@ TimeseriesModifyStage::TimeseriesModifyStage(ExpressionContext* expCtx,
}
bool TimeseriesModifyStage::isEOF() {
- return !_bucketUnpacker.hasNext() && child()->isEOF() &&
- _retryBucketId == WorkingSet::INVALID_ID;
+ return child()->isEOF() && _retryBucketId == WorkingSet::INVALID_ID;
}
std::unique_ptr<PlanStageStats> TimeseriesModifyStage::getStats() {
@@ -69,33 +68,18 @@ std::unique_ptr<PlanStageStats> TimeseriesModifyStage::getStats() {
return ret;
}
-void TimeseriesModifyStage::resetCurrentBucket() {
- _deletedMeasurements.clear();
- _unchangedMeasurements.clear();
- _currentBucketFromMigrate = false;
- _currentBucketRid = RecordId{};
- _currentBucketSnapshotId = SnapshotId{};
-}
-
-PlanStage::StageState TimeseriesModifyStage::_writeToTimeseriesBuckets() {
- ON_BLOCK_EXIT([&] { resetCurrentBucket(); });
-
+PlanStage::StageState TimeseriesModifyStage::_writeToTimeseriesBuckets(
+ WorkingSetID bucketWsmId,
+ const std::vector<BSONObj>& unchangedMeasurements,
+ const std::vector<BSONObj>& deletedMeasurements,
+ bool bucketFromMigrate) {
if (_params->isExplain) {
- _specificStats.measurementsDeleted += _deletedMeasurements.size();
+ _specificStats.measurementsDeleted += deletedMeasurements.size();
return PlanStage::NEED_TIME;
}
// No measurements needed to be deleted from the bucket document.
- if (_deletedMeasurements.empty()) {
- return PlanStage::NEED_TIME;
- }
-
- if (opCtx()->recoveryUnit()->getSnapshotId() != _currentBucketSnapshotId) {
- // The snapshot has changed, so we have no way to prove that the bucket we're
- // unwinding still exists in the same shape it did originally. If it has changed, we
- // risk re-inserting a measurement that we can see in our cache but which has
- // actually since been deleted. So we have to fetch and retry this bucket.
- _retryBucket(_currentBucketRid);
+ if (deletedMeasurements.empty()) {
return PlanStage::NEED_TIME;
}
@@ -112,29 +96,35 @@ PlanStage::StageState TimeseriesModifyStage::_writeToTimeseriesBuckets() {
std::terminate();
});
- OID bucketId = record_id_helpers::toBSONAs(_currentBucketRid, "_id")["_id"].OID();
- if (_unchangedMeasurements.empty()) {
+ auto recordId = _ws->get(bucketWsmId)->recordId;
+
+ auto yieldAndRetry = [&](unsigned logId) {
+ LOGV2_DEBUG(logId,
+ 5,
+ "Retrying bucket due to conflict attempting to write out changes",
+ "bucket_rid"_attr = recordId);
+ _retryBucket(bucketWsmId);
+ return PlanStage::NEED_YIELD;
+ };
+
+ OID bucketId = record_id_helpers::toBSONAs(recordId, "_id")["_id"].OID();
+ if (unchangedMeasurements.empty()) {
write_ops::DeleteOpEntry deleteEntry(BSON("_id" << bucketId), false);
write_ops::DeleteCommandRequest op(collection()->ns(), {deleteEntry});
- auto result = timeseries::performAtomicWrites(
- opCtx(), collection(), _currentBucketRid, op, _currentBucketFromMigrate);
+ auto result =
+ timeseries::performAtomicWrites(opCtx(), collection(), recordId, op, bucketFromMigrate);
if (!result.isOK()) {
- LOGV2_DEBUG(7309300,
- 5,
- "Retrying bucket due to conflict attempting to write out changes",
- "bucket_rid"_attr = _currentBucketRid);
- _retryBucket(_currentBucketRid);
- return PlanStage::NEED_YIELD;
+ return yieldAndRetry(7309300);
}
} else {
auto timeseriesOptions = collection()->getTimeseriesOptions();
auto metaFieldName = timeseriesOptions->getMetaField();
auto metadata =
- metaFieldName ? _unchangedMeasurements[0].getField(*metaFieldName).wrap() : BSONObj();
+ metaFieldName ? unchangedMeasurements[0].getField(*metaFieldName).wrap() : BSONObj();
auto replaceBucket =
timeseries::makeNewDocumentForWrite(bucketId,
- _unchangedMeasurements,
+ unchangedMeasurements,
metadata,
timeseriesOptions,
collection()->getDefaultCollator());
@@ -143,22 +133,17 @@ PlanStage::StageState TimeseriesModifyStage::_writeToTimeseriesBuckets() {
write_ops::UpdateOpEntry updateEntry(BSON("_id" << bucketId), std::move(u));
write_ops::UpdateCommandRequest op(collection()->ns(), {updateEntry});
- auto result = timeseries::performAtomicWrites(
- opCtx(), collection(), _currentBucketRid, op, _currentBucketFromMigrate);
+ auto result =
+ timeseries::performAtomicWrites(opCtx(), collection(), recordId, op, bucketFromMigrate);
if (!result.isOK()) {
- LOGV2_DEBUG(7309301,
- 5,
- "Retrying bucket due to conflict attempting to write out changes",
- "bucket_rid"_attr = _currentBucketRid);
- _retryBucket(_currentBucketRid);
- return PlanStage::NEED_YIELD;
+ return yieldAndRetry(7309301);
}
}
- _specificStats.measurementsDeleted += _deletedMeasurements.size();
+ _specificStats.measurementsDeleted += deletedMeasurements.size();
- // As restoreState may restore (recreate) cursors, cursors are tied to the transaction in
- // which they are created, and a WriteUnitOfWork is a transaction, make sure to restore the
- // state outside of the WriteUnitOfWork.
+ // As restoreState may restore (recreate) cursors, cursors are tied to the transaction in which
+ // they are created, and a WriteUnitOfWork is a transaction, make sure to restore the state
+ // outside of the WriteUnitOfWork.
return handlePlanStageYield(
expCtx(),
"TimeseriesModifyStage restoreState",
@@ -168,46 +153,33 @@ PlanStage::StageState TimeseriesModifyStage::_writeToTimeseriesBuckets() {
return PlanStage::NEED_TIME;
},
// yieldHandler
- // Note we don't need to retry anything in this case since the
- // delete already was committed. However, we still need to
- // return the deleted document (if it was requested).
- // TODO for findAndModify we need to return the deleted doc.
+ // Note we don't need to retry anything in this case since the delete already was committed.
+ // However, we still need to return the deleted document (if it was requested).
+ // TODO SERVER-73089 for findAndModify we need to return the deleted doc.
[&] { /* noop */ });
}
template <typename F>
-boost::optional<PlanStage::StageState> TimeseriesModifyStage::_rememberIfWritingToOrphanedBucket(
- ScopeGuard<F>& bucketFreer, WorkingSetID id) {
+std::pair<boost::optional<PlanStage::StageState>, bool>
+TimeseriesModifyStage::_checkIfWritingToOrphanedBucket(ScopeGuard<F>& bucketFreer,
+ WorkingSetID id) {
// If we are in explain mode, we do not need to check if the bucket is orphaned since we're not
// writing to bucket. If we are migrating a bucket, we also do not need to check if the bucket
- // is not writable and just remember it.
+ // is not writable and just return it.
if (_params->isExplain || _params->fromMigrate) {
- _currentBucketFromMigrate = _params->fromMigrate;
- return boost::none;
+ return {boost::none, _params->fromMigrate};
}
-
- auto [immediateReturnStageState, currentBucketFromMigrate] =
- _preWriteFilter.checkIfNotWritable(_ws->get(id)->doc.value(),
- "timeseriesDelete"_sd,
- collection()->ns(),
- [&](const ExceptionFor<ErrorCodes::StaleConfig>& ex) {
- planExecutorShardingCriticalSectionFuture(opCtx()) =
- ex->getCriticalSectionSignal();
- // Retry the write if we're in the sharding critical
- // section.
- bucketFreer.dismiss();
- _retryBucket(id);
- });
-
- // We need to immediately return if the bucket is orphaned or we're in the sharding critical
- // section and hence should yield.
- if (immediateReturnStageState) {
- return *immediateReturnStageState;
- }
-
- _currentBucketFromMigrate = currentBucketFromMigrate;
-
- return boost::none;
+ return _preWriteFilter.checkIfNotWritable(_ws->get(id)->doc.value(),
+ "timeseriesDelete"_sd,
+ collection()->ns(),
+ [&](const ExceptionFor<ErrorCodes::StaleConfig>& ex) {
+ planExecutorShardingCriticalSectionFuture(
+ opCtx()) = ex->getCriticalSectionSignal();
+ // Retry the write if we're in the sharding
+ // critical section.
+ bucketFreer.dismiss();
+ _retryBucket(id);
+ });
}
PlanStage::StageState TimeseriesModifyStage::_getNextBucket(WorkingSetID& id) {
@@ -216,25 +188,16 @@ PlanStage::StageState TimeseriesModifyStage::_getNextBucket(WorkingSetID& id) {
if (status != PlanStage::ADVANCED) {
return status;
}
-
- auto member = _ws->get(id);
- // TODO SERVER-73142 remove this assert, we may not have an object if we have a spool child.
- tassert(7443600, "Child should have provided the whole document", member->hasObj());
- if (member->hasObj()) {
- // We already got the whole document from our child, no need to fetch.
- return PlanStage::ADVANCED;
- }
} else {
- // We have a bucket that we need to fetch before we can unpack it.
id = _retryBucketId;
_retryBucketId = WorkingSet::INVALID_ID;
}
- // We don't have an up-to-date document for this RecordId. Fetch it and ensure that it still
- // exists and matches our predicate.
+ // We may not have an up-to-date bucket for this RecordId. Fetch it and ensure that it still
+ // exists and matches our bucket-level predicate if it is not believed to be up-to-date.
bool docStillMatches;
- const auto ret = handlePlanStageYield(
+ const auto status = handlePlanStageYield(
expCtx(),
"TimeseriesModifyStage:: ensureStillMatches",
collection()->ns().ns(),
@@ -248,14 +211,13 @@ PlanStage::StageState TimeseriesModifyStage::_getNextBucket(WorkingSetID& id) {
// There was a problem trying to detect if the document still exists, so retry.
_retryBucket(id);
});
- if (ret != PlanStage::NEED_TIME) {
- return ret;
+ if (status != PlanStage::NEED_TIME) {
+ return status;
}
-
return docStillMatches ? PlanStage::ADVANCED : PlanStage::NEED_TIME;
}
-void TimeseriesModifyStage::_retryBucket(const stdx::variant<WorkingSetID, RecordId>& bucketId) {
+void TimeseriesModifyStage::_retryBucket(WorkingSetID bucketId) {
tassert(7309302,
"Cannot be in the middle of unpacking a bucket if retrying",
!_bucketUnpacker.hasNext());
@@ -263,20 +225,7 @@ void TimeseriesModifyStage::_retryBucket(const stdx::variant<WorkingSetID, Recor
"Cannot retry two buckets at the same time",
_retryBucketId == WorkingSet::INVALID_ID);
- stdx::visit(OverloadedVisitor{
- [&](WorkingSetID id) { _retryBucketId = id; },
- [&](const RecordId& rid) {
- // We don't have a working set member referencing this bucket, allocate one.
- _retryBucketId = _ws->allocate();
- auto member = _ws->get(_retryBucketId);
- member->recordId = rid;
- member->doc.setSnapshotId(_currentBucketSnapshotId);
- member->transitionToRecordIdAndObj();
- },
- },
- bucketId);
-
- resetCurrentBucket();
+ _retryBucketId = bucketId;
}
PlanStage::StageState TimeseriesModifyStage::doWork(WorkingSetID* out) {
@@ -284,61 +233,62 @@ PlanStage::StageState TimeseriesModifyStage::doWork(WorkingSetID* out) {
return PlanStage::IS_EOF;
}
- // The current bucket is exhausted. Perform an atomic write to modify the bucket and move on to
- // the next.
- if (!_bucketUnpacker.hasNext()) {
- auto status = _writeToTimeseriesBuckets();
- if (status != PlanStage::NEED_TIME) {
- *out = WorkingSet::INVALID_ID;
- return status;
+ tassert(7495500,
+ "Expected bucketUnpacker's current bucket to be exhausted",
+ !_bucketUnpacker.hasNext());
+
+ auto id = WorkingSet::INVALID_ID;
+ auto status = _getNextBucket(id);
+ if (status != PlanStage::ADVANCED) {
+ if (status == PlanStage::NEED_YIELD) {
+ *out = id;
}
+ return status;
+ }
+
+ // We want to free this member when we return because we either have an owned copy of the bucket
+ // for normal write and write to orphan cases, or we skip the bucket.
+ ScopeGuard bucketFreer([&] { _ws->free(id); });
+
+ auto member = _ws->get(id);
+ tassert(7459100, "Expected a RecordId from the child stage", member->hasRecordId());
+
+ // Determine if we are writing to an orphaned bucket - such writes should be excluded from
+ // user-visible change stream events. This will be achieved later by setting 'fromMigrate' flag
+ // when calling performAtomicWrites().
+ auto [immediateReturnStageState, bucketFromMigrate] =
+ _checkIfWritingToOrphanedBucket(bucketFreer, id);
+ if (immediateReturnStageState) {
+ return *immediateReturnStageState;
+ }
+ tassert(7309304,
+ "Expected no bucket to retry after getting a new bucket",
+ _retryBucketId == WorkingSet::INVALID_ID);
+
+ // Unpack the bucket and determine which measurements match the residual predicate.
+ auto ownedBucket = member->doc.value().toBson().getOwned();
+ _bucketUnpacker.reset(std::move(ownedBucket));
+ ++_specificStats.bucketsUnpacked;
- auto id = WorkingSet::INVALID_ID;
- status = _getNextBucket(id);
-
- if (PlanStage::ADVANCED == status) {
- // We want to free this member when we return because we either have an owned copy
- // of the bucket for normal write and write to orphan cases, or we skip the bucket.
- ScopeGuard bucketFreer([&] { _ws->free(id); });
-
- auto member = _ws->get(id);
- tassert(7459100, "Expected a RecordId from the child stage", member->hasRecordId());
-
- if (auto immediateReturnStageState =
- _rememberIfWritingToOrphanedBucket(bucketFreer, id)) {
- return *immediateReturnStageState;
- }
-
- tassert(7309304,
- "Expected no bucket to retry after getting a new bucket",
- _retryBucketId == WorkingSet::INVALID_ID);
- _currentBucketRid = member->recordId;
- _currentBucketSnapshotId = member->doc.snapshotId();
-
- // Make an owned copy of the bucket document if necessary. The bucket will be
- // unwound across multiple calls to 'doWork()', so we need to hold our own copy in
- // the query execution layer in case the storage engine reclaims the memory for the
- // bucket between calls to 'doWork()'.
- auto ownedBucket = member->doc.value().toBson().getOwned();
- _bucketUnpacker.reset(std::move(ownedBucket));
- ++_specificStats.bucketsUnpacked;
+ std::vector<BSONObj> unchangedMeasurements;
+ std::vector<BSONObj> deletedMeasurements;
+
+ while (_bucketUnpacker.hasNext()) {
+ auto measurement = _bucketUnpacker.getNext().toBson();
+ if (_residualPredicate->matchesBSON(measurement)) {
+ deletedMeasurements.push_back(measurement);
} else {
- if (PlanStage::NEED_YIELD == status) {
- *out = id;
- }
- return status;
+ unchangedMeasurements.push_back(measurement);
}
}
- invariant(_bucketUnpacker.hasNext());
- auto measurement = _bucketUnpacker.getNext().toBson().getOwned();
- if (_residualPredicate->matchesBSON(measurement)) {
- _deletedMeasurements.push_back(measurement);
- } else {
- _unchangedMeasurements.push_back(measurement);
+ status = _writeToTimeseriesBuckets(
+ id, unchangedMeasurements, deletedMeasurements, bucketFromMigrate);
+ if (status != PlanStage::NEED_TIME) {
+ *out = WorkingSet::INVALID_ID;
+ bucketFreer.dismiss();
}
-
- return PlanStage::NEED_TIME;
+ return status;
}
void TimeseriesModifyStage::doRestoreStateRequiresCollection() {
diff --git a/src/mongo/db/exec/timeseries_modify.h b/src/mongo/db/exec/timeseries_modify.h
index 2c14996f83c..e428698309b 100644
--- a/src/mongo/db/exec/timeseries_modify.h
+++ b/src/mongo/db/exec/timeseries_modify.h
@@ -40,8 +40,8 @@ namespace mongo {
/**
* Unpacks time-series bucket documents and writes the modified documents.
*
- * The stage processes one measurement at a time, but only performs a write after each bucket is
- * exhausted.
+ * The stage processes one bucket at a time, unpacking all the measurements and writing the output
+ * bucket in a single doWork() call.
*/
class TimeseriesModifyStage final : public RequiresMutableCollectionStage {
public:
@@ -78,18 +78,22 @@ protected:
private:
/**
- * Writes the modifications to a bucket when the end of the bucket is detected.
+ * Writes the modifications to a bucket.
*/
- PlanStage::StageState _writeToTimeseriesBuckets();
+ PlanStage::StageState _writeToTimeseriesBuckets(
+ WorkingSetID bucketWsmId,
+ const std::vector<BSONObj>& unchangedMeasurements,
+ const std::vector<BSONObj>& deletedMeasurements,
+ bool bucketFromMigrate);
/**
* Helper to set up state to retry 'bucketId' after yielding and establishing a new storage
* snapshot.
*/
- void _retryBucket(const stdx::variant<WorkingSetID, RecordId>& bucketId);
+ void _retryBucket(WorkingSetID bucketId);
template <typename F>
- boost::optional<PlanStage::StageState> _rememberIfWritingToOrphanedBucket(
+ std::pair<boost::optional<PlanStage::StageState>, bool> _checkIfWritingToOrphanedBucket(
ScopeGuard<F>& bucketFreer, WorkingSetID id);
/**
@@ -97,8 +101,6 @@ private:
*/
PlanStage::StageState _getNextBucket(WorkingSetID& id);
- void resetCurrentBucket();
-
std::unique_ptr<DeleteStageParams> _params;
WorkingSet* _ws;
@@ -113,16 +115,6 @@ private:
// unmodified.
std::unique_ptr<MatchExpression> _residualPredicate;
- // The RecordId (also "_id" for the clustered collection) value of the current bucket.
- RecordId _currentBucketRid = RecordId{};
- // Maintained similarly to '_currentBucketRid', but used to determine if we can actually use the
- // results of unpacking to do a write. If the storage engine snapshot has changed, all bets are
- // off and it's unsafe to proceed - more details in the implementation which reads this value.
- SnapshotId _currentBucketSnapshotId = SnapshotId{};
-
- std::vector<BSONObj> _unchangedMeasurements;
- std::vector<BSONObj> _deletedMeasurements;
-
/**
* This member is used to check whether the write should be performed, and if so, any other
* behavior that should be done as part of the write (e.g. skipping it because it affects an
@@ -133,11 +125,6 @@ private:
*/
write_stage_common::PreWriteFilter _preWriteFilter;
- // True if the current bucket is an orphan and we're writing to an orphaned bucket, when such
- // writes should be excluded from user-visible change stream events. This can be achieved by
- // setting 'fromMigrate' flag when calling performAtomicWrites().
- bool _currentBucketFromMigrate = false;
-
TimeseriesModifyStats _specificStats{};
// A pending retry to get to after a NEED_YIELD propagation and a new storage snapshot is