diff options
author | Haley Connelly <haley.connelly@mongodb.com> | 2022-04-29 17:10:55 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-04-29 17:39:15 +0000 |
commit | 320e415445fcbac769c7d5124a7b910f6edff827 (patch) | |
tree | 45957abf47f31e4a9211e44a8e312ceeddc56a72 /src/mongo/db/exec | |
parent | e2abe094a39dda8d4b8f623de23423868c7efb8f (diff) | |
download | mongo-320e415445fcbac769c7d5124a7b910f6edff827.tar.gz |
SERVER-63043 Add bounds to batched deletion passes
Diffstat (limited to 'src/mongo/db/exec')
-rw-r--r-- | src/mongo/db/exec/batched_delete_stage.cpp | 216 | ||||
-rw-r--r-- | src/mongo/db/exec/batched_delete_stage.h | 85 | ||||
-rw-r--r-- | src/mongo/db/exec/batched_delete_stage.idl | 1 | ||||
-rw-r--r-- | src/mongo/db/exec/delete_stage.h | 2 |
4 files changed, 201 insertions, 103 deletions
diff --git a/src/mongo/db/exec/batched_delete_stage.cpp b/src/mongo/db/exec/batched_delete_stage.cpp index 78bfd05e352..5b246c02b6b 100644 --- a/src/mongo/db/exec/batched_delete_stage.cpp +++ b/src/mongo/db/exec/batched_delete_stage.cpp @@ -42,8 +42,6 @@ #include "mongo/db/exec/write_stage_common.h" #include "mongo/db/op_observer.h" #include "mongo/db/query/canonical_query.h" -#include "mongo/db/repl/replication_coordinator.h" -#include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/service_context.h" #include "mongo/logv2/log.h" #include "mongo/s/pm2423_feature_flags_gen.h" @@ -87,7 +85,7 @@ struct BatchedDeletesSSS : ServerStatusSection { batches(0), docs(0), stagedSizeBytes(0), - timeMillis(0) {} + timeInBatchMillis(0) {} bool includeByDefault() const override { return true; @@ -98,7 +96,7 @@ struct BatchedDeletesSSS : ServerStatusSection { bob.appendNumber("batches", batches.loadRelaxed()); bob.appendNumber("docs", docs.loadRelaxed()); bob.appendNumber("stagedSizeBytes", stagedSizeBytes.loadRelaxed()); - bob.append("timeMillis", timeMillis.loadRelaxed()); + bob.appendNumber("timeInBatchMillis", timeInBatchMillis.loadRelaxed()); return bob.obj(); } @@ -106,7 +104,7 @@ struct BatchedDeletesSSS : ServerStatusSection { AtomicWord<long long> batches; AtomicWord<long long> docs; AtomicWord<long long> stagedSizeBytes; - AtomicWord<long long> timeMillis; + AtomicWord<long long> timeInBatchMillis; } batchedDeletesSSS; BatchedDeleteStage::BatchedDeleteStage(ExpressionContext* expCtx, @@ -115,12 +113,31 @@ BatchedDeleteStage::BatchedDeleteStage(ExpressionContext* expCtx, WorkingSet* ws, const CollectionPtr& collection, PlanStage* child) + : BatchedDeleteStage(expCtx, + std::move(params), + std::move(batchParams), + std::make_unique<BatchedDeleteStagePassParams>(), + ws, + collection, + child) {} + +BatchedDeleteStage::BatchedDeleteStage(ExpressionContext* expCtx, + std::unique_ptr<DeleteStageParams> params, + std::unique_ptr<BatchedDeleteStageBatchParams> 
batchParams, + std::unique_ptr<BatchedDeleteStagePassParams> passParams, + WorkingSet* ws, + const CollectionPtr& collection, + PlanStage* child) : DeleteStage::DeleteStage( kStageType.rawData(), expCtx, std::move(params), ws, collection, child), _batchParams(std::move(batchParams)), + _passParams(std::move(passParams)), _stagedDeletesBuffer(ws), _stagedDeletesWatermarkBytes(0), - _drainRemainingBuffer(false) { + _passTotalDocsStaged(0), + _passTimer(expCtx->opCtx->getServiceContext()->getTickSource()), + _commitStagedDeletes(false), + _passStagingComplete(false) { tassert(6303800, "batched deletions only support multi-document deletions (multi: true)", _params->isMulti); @@ -146,15 +163,65 @@ BatchedDeleteStage::BatchedDeleteStage(ExpressionContext* expCtx, BatchedDeleteStage::~BatchedDeleteStage() {} +bool BatchedDeleteStage::isEOF() { + return _stagedDeletesBuffer.empty() && _passStagingComplete; +} + +PlanStage::StageState BatchedDeleteStage::doWork(WorkingSetID* out) { + WorkingSetID idToReturn = WorkingSet::INVALID_ID; + PlanStage::StageState planStageState = PlanStage::NEED_TIME; + + if (!_commitStagedDeletes && !_passStagingComplete) { + // It's okay to stage more documents. + planStageState = _doStaging(&idToReturn); + + _passStagingComplete = planStageState == PlanStage::IS_EOF; + _commitStagedDeletes = _passStagingComplete || _batchTargetMet(); + } + + if (!_params->isExplain && _commitStagedDeletes) { + // Overwriting 'planStageState' potentially means throwing away the result produced from + // staging. We expect to commit deletes after a new documet is staged and the batch targets + // are met (planStageState = PlanStage::NEED_TIME), after there are no more documents to + // stage (planStageState = PlanStage::IS_EOF), or when resuming to commit deletes in the + // buffer before more can be staged (planStageState = PlanStage::NEED_TIME by default). 
+ // + // Enforce that if staging occurred, the resulting 'planStageState' is only overwritten when + // we should be committing deletes. + tassert(6304300, + "Fetched unexpected plan stage state before committing deletes", + planStageState == PlanStage::NEED_TIME || planStageState == PlanStage::IS_EOF); + + _stagedDeletesWatermarkBytes = 0; + planStageState = _deleteBatch(&idToReturn); + + _passStagingComplete = _passStagingComplete || _passTargetMet(); + _commitStagedDeletes = _passStagingComplete || !_stagedDeletesBuffer.empty(); + } + + if (isEOF()) { + invariant(planStageState != PlanStage::NEED_YIELD); + return PlanStage::IS_EOF; + } + + if (planStageState == PlanStage::NEED_YIELD) { + *out = idToReturn; + } + + return planStageState; +} + PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) { - tassert(6389900, "Expected documents for batched deletion", _stagedDeletesBuffer.size() != 0); + if (!_stagedDeletesBuffer.size()) { + return PlanStage::NEED_TIME; + } + try { child()->saveState(); } catch (const WriteConflictException&) { std::terminate(); } - std::set<WorkingSetID> recordsThatNoLongerMatch; Timer batchTimer(opCtx()->getServiceContext()->getTickSource()); @@ -174,14 +241,14 @@ PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) { throw WriteConflictException(); } - auto workingSetMemberId = _stagedDeletesBuffer.at(bufferOffset); + auto workingSetMemberID = _stagedDeletesBuffer.at(bufferOffset); // The PlanExecutor YieldPolicy may change snapshots between calls to 'doWork()'. // Different documents may have different snapshots. 
bool docStillMatches = write_stage_common::ensureStillMatches( - collection(), opCtx(), _ws, workingSetMemberId, _params->canonicalQuery); + collection(), opCtx(), _ws, workingSetMemberID, _params->canonicalQuery); - WorkingSetMember* member = _ws->get(workingSetMemberId); + WorkingSetMember* member = _ws->get(workingSetMemberID); if (docStillMatches) { Snapshotted<Document> memberDoc = member->doc; BSONObj bsonObjDoc = memberDoc.value().toBson(); @@ -221,14 +288,16 @@ PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) { // hangAfterApproxNDocs is roughly estimated as the number of deletes // committed // + the number of documents deleted in the current unit of work. + + // Assume nDocs is positive. return data.hasField("sleepMs") && data.hasField("ns") && data.getStringField("ns") == collection()->ns().toString() && data.hasField("nDocs") && - static_cast<int>(_specificStats.docsDeleted + docsDeleted) >= - data.getIntField("nDocs"); + _specificStats.docsDeleted + docsDeleted >= + static_cast<unsigned int>(data.getIntField("nDocs")); }); } else { - recordsThatNoLongerMatch.insert(workingSetMemberId); + recordsThatNoLongerMatch.insert(workingSetMemberID); } const Milliseconds elapsedMillis(batchTimer.millis()); @@ -246,7 +315,7 @@ PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) { incrementSSSMetricNoOverflow(batchedDeletesSSS.docs, docsDeleted); incrementSSSMetricNoOverflow(batchedDeletesSSS.batches, 1); - incrementSSSMetricNoOverflow(batchedDeletesSSS.timeMillis, batchTimer.millis()); + incrementSSSMetricNoOverflow(batchedDeletesSSS.timeInBatchMillis, batchTimer.millis()); _specificStats.docsDeleted += docsDeleted; if (bufferOffset < _stagedDeletesBuffer.size()) { @@ -262,77 +331,51 @@ PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) { _stagedDeletesBuffer.clear(); } - _signalIfDrainComplete(); return _tryRestoreState(out); } -PlanStage::StageState BatchedDeleteStage::doWork(WorkingSetID* 
out) { - if (!_drainRemainingBuffer) { - WorkingSetID id; - auto status = child()->work(&id); +PlanStage::StageState BatchedDeleteStage::_doStaging(WorkingSetID* idToReturn) { + auto status = child()->work(idToReturn); - switch (status) { - case PlanStage::ADVANCED: - break; + switch (status) { + case PlanStage::ADVANCED: { + _stageNewDelete(idToReturn); + return PlanStage::NEED_TIME; + } + default: + return status; + } +} - case PlanStage::NEED_TIME: - return status; - - case PlanStage::NEED_YIELD: - *out = id; - return status; - - case PlanStage::IS_EOF: - if (!_stagedDeletesBuffer.empty()) { - // Drain the outstanding deletions. - auto ret = _deleteBatch(out); - if (ret != NEED_TIME || (ret == NEED_TIME && _drainRemainingBuffer)) { - // Only return NEED_TIME if there is more to drain in the buffer. Otherwise, - // there is no more to fetch and NEED_TIME signals all staged deletes have - // been sucessfully executed. - return ret; - } - } - return status; +void BatchedDeleteStage::_stageNewDelete(WorkingSetID* workingSetMemberID) { - default: - MONGO_UNREACHABLE; - } + WorkingSetMember* member = _ws->get(*workingSetMemberID); - WorkingSetMember* member = _ws->get(id); - - ScopeGuard memberFreer([&] { _ws->free(id); }); - invariant(member->hasRecordId()); - - // Deletes can't have projections. This means that covering analysis will always add - // a fetch. We should always get fetched data, and never just key data. - invariant(member->hasObj()); - - if (!_params->isExplain) { - // Preserve the member until the delete is committed. Once a delete is staged in the - // buffer, its resources are freed when it is removed from the buffer. - memberFreer.dismiss(); - - // Ensure that the BSONObj underlying the WSM associated with 'id' is owned because - // saveState() is allowed to free the memory the BSONObj points to. The BSONObj will be - // needed later when it is passed to Collection::deleteDocument(). 
Note that the call to - // makeObjOwnedIfNeeded() will leave the WSM in the RID_AND_OBJ state in case we need to - // retry deleting it. - member->makeObjOwnedIfNeeded(); - _stagedDeletesBuffer.append(id); - const auto memberMemFootprintBytes = member->getMemUsage(); - _stagedDeletesWatermarkBytes += memberMemFootprintBytes; - incrementSSSMetricNoOverflow(batchedDeletesSSS.stagedSizeBytes, - memberMemFootprintBytes); - } - } + ScopeGuard memberFreer([&] { _ws->free(*workingSetMemberID); }); + invariant(member->hasRecordId()); - if (!_params->isExplain && (_drainRemainingBuffer || _batchTargetMet())) { - _stagedDeletesWatermarkBytes = 0; - return _deleteBatch(out); - } + // Deletes can't have projections. This means that covering analysis will always add + // a fetch. We should always get fetched data, and never just key data. + invariant(member->hasObj()); + + if (!_params->isExplain) { + // Preserve the member until the delete is committed. Once a delete is staged in the + // buffer, its resources are freed when it is removed from the buffer. + memberFreer.dismiss(); + + // Ensure that the BSONObj underlying the WSM associated with 'id' is owned because + // saveState() is allowed to free the memory the BSONObj points to. The BSONObj will be + // needed later when it is passed to Collection::deleteDocument(). Note that the call to + // makeObjOwnedIfNeeded() will leave the WSM in the RID_AND_OBJ state in case we need to + // retry deleting it. 
+ member->makeObjOwnedIfNeeded(); - return PlanStage::NEED_TIME; + _stagedDeletesBuffer.append(*workingSetMemberID); + const auto memberMemFootprintBytes = member->getMemUsage(); + _stagedDeletesWatermarkBytes += memberMemFootprintBytes; + _passTotalDocsStaged += 1; + incrementSSSMetricNoOverflow(batchedDeletesSSS.stagedSizeBytes, memberMemFootprintBytes); + } } PlanStage::StageState BatchedDeleteStage::_tryRestoreState(WorkingSetID* out) { @@ -348,25 +391,22 @@ PlanStage::StageState BatchedDeleteStage::_tryRestoreState(WorkingSetID* out) { PlanStage::StageState BatchedDeleteStage::_prepareToRetryDrainAfterWCE( WorkingSetID* out, const std::set<WorkingSetID>& recordsThatNoLongerMatch) { _stagedDeletesBuffer.erase(recordsThatNoLongerMatch); - _signalIfDrainComplete(); *out = WorkingSet::INVALID_ID; return NEED_YIELD; } -void BatchedDeleteStage::_signalIfDrainComplete() { - _drainRemainingBuffer = !_stagedDeletesBuffer.empty(); -} - bool BatchedDeleteStage::_batchTargetMet() { - tassert(6303900, - "not expecting to be still draining staged deletions while evaluating whether to " - "commit staged deletions", - !_drainRemainingBuffer); return (_batchParams->targetBatchDocs && - _stagedDeletesBuffer.size() >= - static_cast<unsigned long long>(_batchParams->targetBatchDocs)) || + _stagedDeletesBuffer.size() >= static_cast<size_t>(_batchParams->targetBatchDocs)) || (_batchParams->targetStagedDocBytes && _stagedDeletesWatermarkBytes >= static_cast<unsigned long long>(_batchParams->targetStagedDocBytes)); } + +bool BatchedDeleteStage::_passTargetMet() { + return (_passParams->targetPassDocs && _passTotalDocsStaged >= _passParams->targetPassDocs) || + (_passParams->targetPassTimeMS != Milliseconds(0) && + Milliseconds(_passTimer.millis()) >= _passParams->targetPassTimeMS); +} + } // namespace mongo diff --git a/src/mongo/db/exec/batched_delete_stage.h b/src/mongo/db/exec/batched_delete_stage.h index 6c8d859e97e..89e99a5e6db 100644 --- 
a/src/mongo/db/exec/batched_delete_stage.h +++ b/src/mongo/db/exec/batched_delete_stage.h @@ -38,7 +38,7 @@ namespace mongo { /** - * Batch sizing parameters. A batch of staged document deletes is committed as soon + * 'Batch' sizing parameters. A batch of staged document deletes is committed as soon * as one of the targets below is met, or upon reaching EOF. */ struct BatchedDeleteStageBatchParams { @@ -46,6 +46,9 @@ struct BatchedDeleteStageBatchParams { : targetBatchDocs(gBatchedDeletesTargetBatchDocs.load()), targetBatchTimeMS(Milliseconds(gBatchedDeletesTargetBatchTimeMS.load())), targetStagedDocBytes(gBatchedDeletesTargetStagedDocBytes.load()) {} + // + // A 'batch' refers to the deletes executed in a single WriteUnitOfWork. + // // Documents staged for deletions are processed in a batch once this document count target is // met. A value of zero means unlimited. @@ -58,6 +61,28 @@ struct BatchedDeleteStageBatchParams { }; /** + * 'Pass' parameters. A 'pass' defines a approximate target number of documents or runtime after + * which the deletion stops staging documents, executes any remaining deletes, and eventually + * returns completion. 'Pass' parameters are approximate because they are checked at a per batch + * commit granularity. + */ +struct BatchedDeleteStagePassParams { + BatchedDeleteStagePassParams() : targetPassDocs(0), targetPassTimeMS(Milliseconds(0)) {} + + // Limits the amount of documents processed in a single pass. Once met, no more documents will + // be fetched for delete - any remaining staged deletes will be executed provided they still + // match the query and haven't been deleted by a concurrent operation. A value of zero means + // unlimited. + long long targetPassDocs; + + // Limits the time spent staging and executing deletes in a single pass. 
Once met, no more + // documents will be fetched for delete - any remaining staged deletes will be executed provided + // they still match the query and haven't been deleted by a concurrent operation. A value of + // zero means unlimited. + Milliseconds targetPassTimeMS; +}; + +/** * The BATCHED_DELETE stage deletes documents in batches. In comparison, the base class DeleteStage * deletes documents one by one. The stage returns NEED_TIME after executing a batch of deletes, or * after staging a delete for the next batch. @@ -71,15 +96,28 @@ class BatchedDeleteStage final : public DeleteStage { public: static constexpr StringData kStageType = "BATCHED_DELETE"_sd; - + // Preferred constructor that uses default 'BatchedDeleteStagePassParams'. Changing the + // 'BatchedDeletePassParams' from their default may cause the delete operation to only partially + // delete results that match the query and should only be done for specific internal operations. + BatchedDeleteStage(ExpressionContext* expCtx, + std::unique_ptr<DeleteStageParams> params, + std::unique_ptr<BatchedDeleteStageBatchParams> batchParams, + WorkingSet* ws, + const CollectionPtr& collection, + PlanStage* child); BatchedDeleteStage(ExpressionContext* expCtx, std::unique_ptr<DeleteStageParams> params, std::unique_ptr<BatchedDeleteStageBatchParams> batchParams, + std::unique_ptr<BatchedDeleteStagePassParams> passParams, WorkingSet* ws, const CollectionPtr& collection, PlanStage* child); + ~BatchedDeleteStage(); + // Returns true when no more work can be done (there are no more deletes to commit). + bool isEOF() final; + StageState doWork(WorkingSetID* out); StageType stageType() const final { @@ -87,12 +125,20 @@ public: } private: - /** - * Returns NEED_YIELD when there is a write conflict. Otherwise, returns NEED_TIME when - * some, or all, of the documents staged in the _stagedDeletesBuffer are successfully deleted. - */ + // Returns NEED_YIELD when there is a write conflict. 
Otherwise, returns NEED_TIME when + // some, or all, of the documents staged in the _stagedDeletesBuffer are successfully deleted. PlanStage::StageState _deleteBatch(WorkingSetID* out); + // Attempts to stage a new delete in the _stagedDeletesBuffer. Returns the PlanStage::StageState + // fetched directly from the child except when there is a document to stage. Converts + // PlanStage::ADVANCED to PlanStage::NEED_TIME before returning when a document is staged for + // delete - PlanStage:ADVANCED doesn't hold meaning in a batched delete since nothing will ever + // be directly returned from this stage. + PlanStage::StageState _doStaging(WorkingSetID* out); + + // Stages the document tied to workingSetMemberID into the _stagedDeletesBuffer. + void _stageNewDelete(WorkingSetID* workingSetMemberID); + // Tries to restore the child's state. Returns NEED_TIME if the restore succeeds, NEED_YIELD // upon write conflict. PlanStage::StageState _tryRestoreState(WorkingSetID* out); @@ -102,16 +148,20 @@ private: PlanStage::StageState _prepareToRetryDrainAfterWCE( WorkingSetID* out, const std::set<WorkingSetID>& recordsThatNoLongerMatch); - // Either signals that all the elements in the buffer have been drained or that there are more - // elements to drain. - void _signalIfDrainComplete(); - // Returns true if one or more of the batch targets are met and it is time to delete the batch. bool _batchTargetMet(); + // Returns true if one or more of the pass targets are met and it is time to drain the remaining + // buffer and return completion. Note - this method checks a timer and repeated calls can become + // expensive. + bool _passTargetMet(); + // Batch targeting parameters. std::unique_ptr<BatchedDeleteStageBatchParams> _batchParams; + // Pass targeting parameters. + std::unique_ptr<BatchedDeleteStagePassParams> _passParams; + // Holds information for each document staged for delete. 
BatchedDeleteStageBuffer _stagedDeletesBuffer; @@ -120,9 +170,18 @@ private: // regardless of whether all staged deletes have been committed yet. size_t _stagedDeletesWatermarkBytes; - // Whether there are remaining docs in the buffer from a previous call to doWork() that should - // be drained before fetching more documents. - bool _drainRemainingBuffer; + // Tracks the cumulative number of documents staged for deletes over the operation. + long long _passTotalDocsStaged; + + // Tracks the cumulative elapsed time since the operation began. + Timer _passTimer; + + // True when the deletes in the buffer must be committed before more documents can be staged. + bool _commitStagedDeletes; + + // True when the operation is done staging new documents. The only work left is to drain the + // remaining buffer. + bool _passStagingComplete; }; } // namespace mongo diff --git a/src/mongo/db/exec/batched_delete_stage.idl b/src/mongo/db/exec/batched_delete_stage.idl index 5ccf0cdab72..73b0918f9ec 100644 --- a/src/mongo/db/exec/batched_delete_stage.idl +++ b/src/mongo/db/exec/batched_delete_stage.idl @@ -57,4 +57,3 @@ server_parameters: default: 5 validator: gte: 0 - diff --git a/src/mongo/db/exec/delete_stage.h b/src/mongo/db/exec/delete_stage.h index fbf41decd21..244c8fb7c16 100644 --- a/src/mongo/db/exec/delete_stage.h +++ b/src/mongo/db/exec/delete_stage.h @@ -121,7 +121,7 @@ public: const CollectionPtr& collection, PlanStage* child); - bool isEOF() final; + bool isEOF(); StageState doWork(WorkingSetID* out); StageType stageType() const { |