summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/exec/timeseries_modify.cpp66
-rw-r--r--src/mongo/db/exec/timeseries_modify.h28
-rw-r--r--src/mongo/db/query/get_executor.cpp1
3 files changed, 90 insertions, 5 deletions
diff --git a/src/mongo/db/exec/timeseries_modify.cpp b/src/mongo/db/exec/timeseries_modify.cpp
index 739e142593b..0fa1273b551 100644
--- a/src/mongo/db/exec/timeseries_modify.cpp
+++ b/src/mongo/db/exec/timeseries_modify.cpp
@@ -37,15 +37,18 @@ namespace mongo {
const char* TimeseriesModifyStage::kStageType = "TS_MODIFY";
TimeseriesModifyStage::TimeseriesModifyStage(ExpressionContext* expCtx,
+ std::unique_ptr<DeleteStageParams> params,
WorkingSet* ws,
std::unique_ptr<PlanStage> child,
const CollectionPtr& coll,
BucketUnpacker bucketUnpacker,
std::unique_ptr<MatchExpression> residualPredicate)
: RequiresCollectionStage(kStageType, expCtx, coll),
+ _params(std::move(params)),
_ws(ws),
_bucketUnpacker{std::move(bucketUnpacker)},
- _residualPredicate(std::move(residualPredicate)) {
+ _residualPredicate(std::move(residualPredicate)),
+ _preWriteFilter(opCtx(), coll->ns()) {
_children.emplace_back(std::move(child));
}
@@ -69,8 +72,13 @@ void TimeseriesModifyStage::_writeToTimeseriesBuckets() {
_specificStats.measurementsDeleted += _deletedMeasurements.size();
_deletedMeasurements.clear();
_unchangedMeasurements.clear();
+ _currentBucketFromMigrate = false;
});
+ if (_params->isExplain) {
+ return;
+ }
+
// No measurements needed to be deleted from the bucket document.
if (_deletedMeasurements.empty()) {
return;
@@ -82,7 +90,7 @@ void TimeseriesModifyStage::_writeToTimeseriesBuckets() {
write_ops::DeleteCommandRequest op(collection()->ns(), {deleteEntry});
// TODO (SERVER-73093): Handles the write failures through retry.
auto result = timeseries::performAtomicWrites(
- opCtx(), collection(), _currentBucketRid, op, /*fromMigrate=*/false);
+ opCtx(), collection(), _currentBucketRid, op, _currentBucketFromMigrate);
} else {
auto timeseriesOptions = collection()->getTimeseriesOptions();
auto metaFieldName = timeseriesOptions->getMetaField();
@@ -100,8 +108,40 @@ void TimeseriesModifyStage::_writeToTimeseriesBuckets() {
write_ops::UpdateCommandRequest op(collection()->ns(), {updateEntry});
// TODO (SERVER-73093): Handles the write failures through retry.
auto result = timeseries::performAtomicWrites(
- opCtx(), collection(), _currentBucketRid, op, /*fromMigrate=*/false);
+ opCtx(), collection(), _currentBucketRid, op, _currentBucketFromMigrate);
+ }
+}
+
+boost::optional<PlanStage::StageState> TimeseriesModifyStage::rememberIfWritingToOrphanedBucket(
+ WorkingSetMember* member) {
+ // If we are in explain mode, we do not need to check if the bucket is orphaned since we're not
+ // writing to bucket. If we are migrating a bucket, we also do not need to check if the bucket
+ // is not writable and just remember it.
+ if (_params->isExplain || _params->fromMigrate) {
+ _currentBucketFromMigrate = _params->fromMigrate;
+ return boost::none;
+ }
+
+ auto [immediateReturnStageState, currentBucketFromMigrate] =
+ _preWriteFilter.checkIfNotWritable(member->doc.value(),
+ "timeseriesDelete"_sd,
+ collection()->ns(),
+ [&](const ExceptionFor<ErrorCodes::StaleConfig>& ex) {
+ planExecutorShardingCriticalSectionFuture(opCtx()) =
+ ex->getCriticalSectionSignal();
+ // TODO (SERVER-73093): Retry the write if we're in
+ // the sharding critical section.
+ });
+
+ // We need to immediately return if the bucket is orphaned or we're in the sharding critical
+ // section and hence should yield.
+ if (immediateReturnStageState) {
+ return *immediateReturnStageState;
}
+
+ _currentBucketFromMigrate = currentBucketFromMigrate;
+
+ return boost::none;
}
PlanStage::StageState TimeseriesModifyStage::_fetchBucket(WorkingSetID id) {
@@ -159,8 +199,18 @@ PlanStage::StageState TimeseriesModifyStage::doWork(WorkingSetID* out) {
auto status = _getNextBucket(id);
if (PlanStage::ADVANCED == status) {
+ // We want to free this member when we return because we either have an owned copy of
+ // the bucket for normal write and write to orphan cases, or we skip the bucket, or we
+ // don't retry as of now.
+ // TODO (SERVER-73093): Need to dismiss 'memberFreer' if we're going to retry the write.
+ ScopeGuard memberFreer([&] { _ws->free(id); });
+
auto member = _ws->get(id);
tassert(7459100, "Expected a RecordId from the child stage", member->hasRecordId());
+
+ if (auto immediateReturnStageState = rememberIfWritingToOrphanedBucket(member)) {
+ return *immediateReturnStageState;
+ }
_currentBucketRid = member->recordId;
// Make an owned copy of the bucket document if necessary. The bucket will be
@@ -188,4 +238,14 @@ PlanStage::StageState TimeseriesModifyStage::doWork(WorkingSetID* out) {
return PlanStage::NEED_TIME;
}
+
+void TimeseriesModifyStage::doRestoreStateRequiresCollection() {
+ const NamespaceString& ns = collection()->ns();
+ uassert(ErrorCodes::PrimarySteppedDown,
+ "Demoted from primary while removing from {}"_format(ns.ns()),
+ !opCtx()->writesAreReplicated() ||
+ repl::ReplicationCoordinator::get(opCtx())->canAcceptWritesFor(opCtx(), ns));
+
+ _preWriteFilter.restoreState();
+}
} // namespace mongo
diff --git a/src/mongo/db/exec/timeseries_modify.h b/src/mongo/db/exec/timeseries_modify.h
index 99340bd04f0..0c29cfe0897 100644
--- a/src/mongo/db/exec/timeseries_modify.h
+++ b/src/mongo/db/exec/timeseries_modify.h
@@ -31,6 +31,7 @@
#pragma once
#include "mongo/db/exec/bucket_unpacker.h"
+#include "mongo/db/exec/delete_stage.h"
#include "mongo/db/exec/plan_stats.h"
#include "mongo/db/exec/requires_collection_stage.h"
@@ -47,6 +48,7 @@ public:
static const char* kStageType;
TimeseriesModifyStage(ExpressionContext* expCtx,
+ std::unique_ptr<DeleteStageParams> params,
WorkingSet* ws,
std::unique_ptr<PlanStage> child,
const CollectionPtr& coll,
@@ -68,9 +70,11 @@ public:
PlanStage::StageState doWork(WorkingSetID* id);
protected:
- void doSaveStateRequiresCollection() final {}
+ void doSaveStateRequiresCollection() final {
+ _preWriteFilter.saveState();
+ }
- void doRestoreStateRequiresCollection() final {}
+ void doRestoreStateRequiresCollection() final;
private:
/**
@@ -78,6 +82,9 @@ private:
*/
void _writeToTimeseriesBuckets();
+ boost::optional<PlanStage::StageState> rememberIfWritingToOrphanedBucket(
+ WorkingSetMember* member);
+
/**
* Fetches the document for the bucket pointed to by this WSM.
*/
@@ -88,6 +95,8 @@ private:
*/
PlanStage::StageState _getNextBucket(WorkingSetID& id);
+ std::unique_ptr<DeleteStageParams> _params;
+
WorkingSet* _ws;
//
@@ -111,6 +120,21 @@ private:
std::vector<BSONObj> _unchangedMeasurements;
std::vector<BSONObj> _deletedMeasurements;
+ /**
+ * This member is used to check whether the write should be performed, and if so, any other
+ * behavior that should be done as part of the write (e.g. skipping it because it affects an
+ * orphan document). A yield cannot happen between the check and the write, so the checks are
+ * embedded in the stage.
+ *
+ * It's refreshed after yielding and reacquiring the locks.
+ */
+ write_stage_common::PreWriteFilter _preWriteFilter;
+
+ // True if the current bucket is an orphan and we're writing to an orphaned bucket, when such
+ // writes should be excluded from user-visible change stream events. This can be achieved by
+ // setting 'fromMigrate' flag when calling performAtomicWrites().
+ bool _currentBucketFromMigrate = false;
+
TimeseriesModifyStats _specificStats{};
};
} // namespace mongo
diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp
index 34928a81434..c12b1a15a13 100644
--- a/src/mongo/db/query/get_executor.cpp
+++ b/src/mongo/db/query/get_executor.cpp
@@ -1983,6 +1983,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorDele
// directly.
root = std::make_unique<TimeseriesModifyStage>(
expCtxRaw,
+ std::move(deleteStageParams),
ws.get(),
std::move(root),
collection,