diff options
author | Yoonsoo Kim <yoonsoo.kim@mongodb.com> | 2023-03-14 04:32:31 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-03-14 05:30:54 +0000 |
commit | fc06eeaff4df9563dc25db706fe8f415ad2f8218 (patch) | |
tree | b6532d49bf8b0abcc82052e599923932501d31f9 /src/mongo/db | |
parent | 7f1c519cd8cc0c4304a5d67b03e2682f542b580c (diff) | |
download | mongo-fc06eeaff4df9563dc25db706fe8f415ad2f8218.tar.gz |
SERVER-73079 Skip unowned bucket or track write to orphaned bucket
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/exec/timeseries_modify.cpp | 66 | ||||
-rw-r--r-- | src/mongo/db/exec/timeseries_modify.h | 28 | ||||
-rw-r--r-- | src/mongo/db/query/get_executor.cpp | 1 |
3 files changed, 90 insertions, 5 deletions
diff --git a/src/mongo/db/exec/timeseries_modify.cpp b/src/mongo/db/exec/timeseries_modify.cpp index 739e142593b..0fa1273b551 100644 --- a/src/mongo/db/exec/timeseries_modify.cpp +++ b/src/mongo/db/exec/timeseries_modify.cpp @@ -37,15 +37,18 @@ namespace mongo { const char* TimeseriesModifyStage::kStageType = "TS_MODIFY"; TimeseriesModifyStage::TimeseriesModifyStage(ExpressionContext* expCtx, + std::unique_ptr<DeleteStageParams> params, WorkingSet* ws, std::unique_ptr<PlanStage> child, const CollectionPtr& coll, BucketUnpacker bucketUnpacker, std::unique_ptr<MatchExpression> residualPredicate) : RequiresCollectionStage(kStageType, expCtx, coll), + _params(std::move(params)), _ws(ws), _bucketUnpacker{std::move(bucketUnpacker)}, - _residualPredicate(std::move(residualPredicate)) { + _residualPredicate(std::move(residualPredicate)), + _preWriteFilter(opCtx(), coll->ns()) { _children.emplace_back(std::move(child)); } @@ -69,8 +72,13 @@ void TimeseriesModifyStage::_writeToTimeseriesBuckets() { _specificStats.measurementsDeleted += _deletedMeasurements.size(); _deletedMeasurements.clear(); _unchangedMeasurements.clear(); + _currentBucketFromMigrate = false; }); + if (_params->isExplain) { + return; + } + // No measurements needed to be deleted from the bucket document. if (_deletedMeasurements.empty()) { return; @@ -82,7 +90,7 @@ void TimeseriesModifyStage::_writeToTimeseriesBuckets() { write_ops::DeleteCommandRequest op(collection()->ns(), {deleteEntry}); // TODO (SERVER-73093): Handles the write failures through retry. auto result = timeseries::performAtomicWrites( - opCtx(), collection(), _currentBucketRid, op, /*fromMigrate=*/false); + opCtx(), collection(), _currentBucketRid, op, _currentBucketFromMigrate); } else { auto timeseriesOptions = collection()->getTimeseriesOptions(); auto metaFieldName = timeseriesOptions->getMetaField(); @@ -100,8 +108,40 @@ void TimeseriesModifyStage::_writeToTimeseriesBuckets() { write_ops::UpdateCommandRequest op(collection()->ns(), {updateEntry}); // TODO (SERVER-73093): Handles the write failures through retry. auto result = timeseries::performAtomicWrites( - opCtx(), collection(), _currentBucketRid, op, /*fromMigrate=*/false); + opCtx(), collection(), _currentBucketRid, op, _currentBucketFromMigrate); + } +} + +boost::optional<PlanStage::StageState> TimeseriesModifyStage::rememberIfWritingToOrphanedBucket( + WorkingSetMember* member) { + // If we are in explain mode, we do not need to check if the bucket is orphaned since we're not + // writing to bucket. If we are migrating a bucket, we also do not need to check if the bucket + // is not writable and just remember it. + if (_params->isExplain || _params->fromMigrate) { + _currentBucketFromMigrate = _params->fromMigrate; + return boost::none; + } + + auto [immediateReturnStageState, currentBucketFromMigrate] = + _preWriteFilter.checkIfNotWritable(member->doc.value(), + "timeseriesDelete"_sd, + collection()->ns(), + [&](const ExceptionFor<ErrorCodes::StaleConfig>& ex) { + planExecutorShardingCriticalSectionFuture(opCtx()) = + ex->getCriticalSectionSignal(); + // TODO (SERVER-73093): Retry the write if we're in + // the sharding critical section. + }); + + // We need to immediately return if the bucket is orphaned or we're in the sharding critical + // section and hence should yield. + if (immediateReturnStageState) { + return *immediateReturnStageState; } + + _currentBucketFromMigrate = currentBucketFromMigrate; + + return boost::none; } PlanStage::StageState TimeseriesModifyStage::_fetchBucket(WorkingSetID id) { @@ -159,8 +199,18 @@ PlanStage::StageState TimeseriesModifyStage::doWork(WorkingSetID* out) { auto status = _getNextBucket(id); if (PlanStage::ADVANCED == status) { + // We want to free this member when we return because we either have an owned copy of + // the bucket for normal write and write to orphan cases, or we skip the bucket, or we + // don't retry as of now. + // TODO (SERVER-73093): Need to dismiss 'memberFreer' if we're going to retry the write. + ScopeGuard memberFreer([&] { _ws->free(id); }); + auto member = _ws->get(id); tassert(7459100, "Expected a RecordId from the child stage", member->hasRecordId()); + + if (auto immediateReturnStageState = rememberIfWritingToOrphanedBucket(member)) { + return *immediateReturnStageState; + } _currentBucketRid = member->recordId; // Make an owned copy of the bucket document if necessary. The bucket will be @@ -188,4 +238,14 @@ PlanStage::StageState TimeseriesModifyStage::doWork(WorkingSetID* out) { return PlanStage::NEED_TIME; } + +void TimeseriesModifyStage::doRestoreStateRequiresCollection() { + const NamespaceString& ns = collection()->ns(); + uassert(ErrorCodes::PrimarySteppedDown, + "Demoted from primary while removing from {}"_format(ns.ns()), + !opCtx()->writesAreReplicated() || + repl::ReplicationCoordinator::get(opCtx())->canAcceptWritesFor(opCtx(), ns)); + + _preWriteFilter.restoreState(); +} } // namespace mongo diff --git a/src/mongo/db/exec/timeseries_modify.h b/src/mongo/db/exec/timeseries_modify.h index 99340bd04f0..0c29cfe0897 100644 --- a/src/mongo/db/exec/timeseries_modify.h +++ b/src/mongo/db/exec/timeseries_modify.h @@ -31,6 +31,7 @@ #pragma once #include "mongo/db/exec/bucket_unpacker.h" +#include "mongo/db/exec/delete_stage.h" #include "mongo/db/exec/plan_stats.h" #include "mongo/db/exec/requires_collection_stage.h" @@ -47,6 +48,7 @@ public: static const char* kStageType; TimeseriesModifyStage(ExpressionContext* expCtx, + std::unique_ptr<DeleteStageParams> params, WorkingSet* ws, std::unique_ptr<PlanStage> child, const CollectionPtr& coll, @@ -68,9 +70,11 @@ public: PlanStage::StageState doWork(WorkingSetID* id); protected: - void doSaveStateRequiresCollection() final {} + void doSaveStateRequiresCollection() final { + _preWriteFilter.saveState(); + } - void doRestoreStateRequiresCollection() final {} + void doRestoreStateRequiresCollection() final; private: /** @@ -78,6 +82,9 @@ private: */ void _writeToTimeseriesBuckets(); + boost::optional<PlanStage::StageState> rememberIfWritingToOrphanedBucket( + WorkingSetMember* member); + /** * Fetches the document for the bucket pointed to by this WSM. */ @@ -88,6 +95,8 @@ private: */ PlanStage::StageState _getNextBucket(WorkingSetID& id); + std::unique_ptr<DeleteStageParams> _params; + WorkingSet* _ws; // @@ -111,6 +120,21 @@ private: std::vector<BSONObj> _unchangedMeasurements; std::vector<BSONObj> _deletedMeasurements; + /** + * This member is used to check whether the write should be performed, and if so, any other + * behavior that should be done as part of the write (e.g. skipping it because it affects an + * orphan document). A yield cannot happen between the check and the write, so the checks are + * embedded in the stage. + * + * It's refreshed after yielding and reacquiring the locks. + */ + write_stage_common::PreWriteFilter _preWriteFilter; + + // True if the current bucket is an orphan and we're writing to an orphaned bucket, when such + // writes should be excluded from user-visible change stream events. This can be achieved by + // setting 'fromMigrate' flag when calling performAtomicWrites(). + bool _currentBucketFromMigrate = false; + TimeseriesModifyStats _specificStats{}; }; } // namespace mongo diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index 34928a81434..c12b1a15a13 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -1983,6 +1983,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorDele // directly. root = std::make_unique<TimeseriesModifyStage>( expCtxRaw, + std::move(deleteStageParams), ws.get(), std::move(root), collection, |