summaryrefslogtreecommitdiff
path: root/src/mongo/db/exec
diff options
context:
space:
mode:
authorHaley Connelly <haley.connelly@mongodb.com>2022-04-29 17:10:55 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-04-29 17:39:15 +0000
commit320e415445fcbac769c7d5124a7b910f6edff827 (patch)
tree45957abf47f31e4a9211e44a8e312ceeddc56a72 /src/mongo/db/exec
parente2abe094a39dda8d4b8f623de23423868c7efb8f (diff)
downloadmongo-320e415445fcbac769c7d5124a7b910f6edff827.tar.gz
SERVER-63043 Add bounds to batched deletion passes
Diffstat (limited to 'src/mongo/db/exec')
-rw-r--r--src/mongo/db/exec/batched_delete_stage.cpp216
-rw-r--r--src/mongo/db/exec/batched_delete_stage.h85
-rw-r--r--src/mongo/db/exec/batched_delete_stage.idl1
-rw-r--r--src/mongo/db/exec/delete_stage.h2
4 files changed, 201 insertions, 103 deletions
diff --git a/src/mongo/db/exec/batched_delete_stage.cpp b/src/mongo/db/exec/batched_delete_stage.cpp
index 78bfd05e352..5b246c02b6b 100644
--- a/src/mongo/db/exec/batched_delete_stage.cpp
+++ b/src/mongo/db/exec/batched_delete_stage.cpp
@@ -42,8 +42,6 @@
#include "mongo/db/exec/write_stage_common.h"
#include "mongo/db/op_observer.h"
#include "mongo/db/query/canonical_query.h"
-#include "mongo/db/repl/replication_coordinator.h"
-#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/service_context.h"
#include "mongo/logv2/log.h"
#include "mongo/s/pm2423_feature_flags_gen.h"
@@ -87,7 +85,7 @@ struct BatchedDeletesSSS : ServerStatusSection {
batches(0),
docs(0),
stagedSizeBytes(0),
- timeMillis(0) {}
+ timeInBatchMillis(0) {}
bool includeByDefault() const override {
return true;
@@ -98,7 +96,7 @@ struct BatchedDeletesSSS : ServerStatusSection {
bob.appendNumber("batches", batches.loadRelaxed());
bob.appendNumber("docs", docs.loadRelaxed());
bob.appendNumber("stagedSizeBytes", stagedSizeBytes.loadRelaxed());
- bob.append("timeMillis", timeMillis.loadRelaxed());
+ bob.appendNumber("timeInBatchMillis", timeInBatchMillis.loadRelaxed());
return bob.obj();
}
@@ -106,7 +104,7 @@ struct BatchedDeletesSSS : ServerStatusSection {
AtomicWord<long long> batches;
AtomicWord<long long> docs;
AtomicWord<long long> stagedSizeBytes;
- AtomicWord<long long> timeMillis;
+ AtomicWord<long long> timeInBatchMillis;
} batchedDeletesSSS;
BatchedDeleteStage::BatchedDeleteStage(ExpressionContext* expCtx,
@@ -115,12 +113,31 @@ BatchedDeleteStage::BatchedDeleteStage(ExpressionContext* expCtx,
WorkingSet* ws,
const CollectionPtr& collection,
PlanStage* child)
+ : BatchedDeleteStage(expCtx,
+ std::move(params),
+ std::move(batchParams),
+ std::make_unique<BatchedDeleteStagePassParams>(),
+ ws,
+ collection,
+ child) {}
+
+BatchedDeleteStage::BatchedDeleteStage(ExpressionContext* expCtx,
+ std::unique_ptr<DeleteStageParams> params,
+ std::unique_ptr<BatchedDeleteStageBatchParams> batchParams,
+ std::unique_ptr<BatchedDeleteStagePassParams> passParams,
+ WorkingSet* ws,
+ const CollectionPtr& collection,
+ PlanStage* child)
: DeleteStage::DeleteStage(
kStageType.rawData(), expCtx, std::move(params), ws, collection, child),
_batchParams(std::move(batchParams)),
+ _passParams(std::move(passParams)),
_stagedDeletesBuffer(ws),
_stagedDeletesWatermarkBytes(0),
- _drainRemainingBuffer(false) {
+ _passTotalDocsStaged(0),
+ _passTimer(expCtx->opCtx->getServiceContext()->getTickSource()),
+ _commitStagedDeletes(false),
+ _passStagingComplete(false) {
tassert(6303800,
"batched deletions only support multi-document deletions (multi: true)",
_params->isMulti);
@@ -146,15 +163,65 @@ BatchedDeleteStage::BatchedDeleteStage(ExpressionContext* expCtx,
BatchedDeleteStage::~BatchedDeleteStage() {}
+bool BatchedDeleteStage::isEOF() {
+ return _stagedDeletesBuffer.empty() && _passStagingComplete;
+}
+
+PlanStage::StageState BatchedDeleteStage::doWork(WorkingSetID* out) {
+ WorkingSetID idToReturn = WorkingSet::INVALID_ID;
+ PlanStage::StageState planStageState = PlanStage::NEED_TIME;
+
+ if (!_commitStagedDeletes && !_passStagingComplete) {
+ // It's okay to stage more documents.
+ planStageState = _doStaging(&idToReturn);
+
+ _passStagingComplete = planStageState == PlanStage::IS_EOF;
+ _commitStagedDeletes = _passStagingComplete || _batchTargetMet();
+ }
+
+ if (!_params->isExplain && _commitStagedDeletes) {
+ // Overwriting 'planStageState' potentially means throwing away the result produced from
+ // staging. We expect to commit deletes after a new documet is staged and the batch targets
+ // are met (planStageState = PlanStage::NEED_TIME), after there are no more documents to
+ // stage (planStageState = PlanStage::IS_EOF), or when resuming to commit deletes in the
+ // buffer before more can be staged (planStageState = PlanStage::NEED_TIME by default).
+ //
+ // Enforce that if staging occurred, the resulting 'planStageState' is only overwritten when
+ // we should be committing deletes.
+ tassert(6304300,
+ "Fetched unexpected plan stage state before committing deletes",
+ planStageState == PlanStage::NEED_TIME || planStageState == PlanStage::IS_EOF);
+
+ _stagedDeletesWatermarkBytes = 0;
+ planStageState = _deleteBatch(&idToReturn);
+
+ _passStagingComplete = _passStagingComplete || _passTargetMet();
+ _commitStagedDeletes = _passStagingComplete || !_stagedDeletesBuffer.empty();
+ }
+
+ if (isEOF()) {
+ invariant(planStageState != PlanStage::NEED_YIELD);
+ return PlanStage::IS_EOF;
+ }
+
+ if (planStageState == PlanStage::NEED_YIELD) {
+ *out = idToReturn;
+ }
+
+ return planStageState;
+}
+
PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) {
- tassert(6389900, "Expected documents for batched deletion", _stagedDeletesBuffer.size() != 0);
+ if (!_stagedDeletesBuffer.size()) {
+ return PlanStage::NEED_TIME;
+ }
+
try {
child()->saveState();
} catch (const WriteConflictException&) {
std::terminate();
}
-
std::set<WorkingSetID> recordsThatNoLongerMatch;
Timer batchTimer(opCtx()->getServiceContext()->getTickSource());
@@ -174,14 +241,14 @@ PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) {
throw WriteConflictException();
}
- auto workingSetMemberId = _stagedDeletesBuffer.at(bufferOffset);
+ auto workingSetMemberID = _stagedDeletesBuffer.at(bufferOffset);
// The PlanExecutor YieldPolicy may change snapshots between calls to 'doWork()'.
// Different documents may have different snapshots.
bool docStillMatches = write_stage_common::ensureStillMatches(
- collection(), opCtx(), _ws, workingSetMemberId, _params->canonicalQuery);
+ collection(), opCtx(), _ws, workingSetMemberID, _params->canonicalQuery);
- WorkingSetMember* member = _ws->get(workingSetMemberId);
+ WorkingSetMember* member = _ws->get(workingSetMemberID);
if (docStillMatches) {
Snapshotted<Document> memberDoc = member->doc;
BSONObj bsonObjDoc = memberDoc.value().toBson();
@@ -221,14 +288,16 @@ PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) {
// hangAfterApproxNDocs is roughly estimated as the number of deletes
// committed
// + the number of documents deleted in the current unit of work.
+
+ // Assume nDocs is positive.
return data.hasField("sleepMs") && data.hasField("ns") &&
data.getStringField("ns") == collection()->ns().toString() &&
data.hasField("nDocs") &&
- static_cast<int>(_specificStats.docsDeleted + docsDeleted) >=
- data.getIntField("nDocs");
+ _specificStats.docsDeleted + docsDeleted >=
+ static_cast<unsigned int>(data.getIntField("nDocs"));
});
} else {
- recordsThatNoLongerMatch.insert(workingSetMemberId);
+ recordsThatNoLongerMatch.insert(workingSetMemberID);
}
const Milliseconds elapsedMillis(batchTimer.millis());
@@ -246,7 +315,7 @@ PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) {
incrementSSSMetricNoOverflow(batchedDeletesSSS.docs, docsDeleted);
incrementSSSMetricNoOverflow(batchedDeletesSSS.batches, 1);
- incrementSSSMetricNoOverflow(batchedDeletesSSS.timeMillis, batchTimer.millis());
+ incrementSSSMetricNoOverflow(batchedDeletesSSS.timeInBatchMillis, batchTimer.millis());
_specificStats.docsDeleted += docsDeleted;
if (bufferOffset < _stagedDeletesBuffer.size()) {
@@ -262,77 +331,51 @@ PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) {
_stagedDeletesBuffer.clear();
}
- _signalIfDrainComplete();
return _tryRestoreState(out);
}
-PlanStage::StageState BatchedDeleteStage::doWork(WorkingSetID* out) {
- if (!_drainRemainingBuffer) {
- WorkingSetID id;
- auto status = child()->work(&id);
+PlanStage::StageState BatchedDeleteStage::_doStaging(WorkingSetID* idToReturn) {
+ auto status = child()->work(idToReturn);
- switch (status) {
- case PlanStage::ADVANCED:
- break;
+ switch (status) {
+ case PlanStage::ADVANCED: {
+ _stageNewDelete(idToReturn);
+ return PlanStage::NEED_TIME;
+ }
+ default:
+ return status;
+ }
+}
- case PlanStage::NEED_TIME:
- return status;
-
- case PlanStage::NEED_YIELD:
- *out = id;
- return status;
-
- case PlanStage::IS_EOF:
- if (!_stagedDeletesBuffer.empty()) {
- // Drain the outstanding deletions.
- auto ret = _deleteBatch(out);
- if (ret != NEED_TIME || (ret == NEED_TIME && _drainRemainingBuffer)) {
- // Only return NEED_TIME if there is more to drain in the buffer. Otherwise,
- // there is no more to fetch and NEED_TIME signals all staged deletes have
- // been sucessfully executed.
- return ret;
- }
- }
- return status;
+void BatchedDeleteStage::_stageNewDelete(WorkingSetID* workingSetMemberID) {
- default:
- MONGO_UNREACHABLE;
- }
+ WorkingSetMember* member = _ws->get(*workingSetMemberID);
- WorkingSetMember* member = _ws->get(id);
-
- ScopeGuard memberFreer([&] { _ws->free(id); });
- invariant(member->hasRecordId());
-
- // Deletes can't have projections. This means that covering analysis will always add
- // a fetch. We should always get fetched data, and never just key data.
- invariant(member->hasObj());
-
- if (!_params->isExplain) {
- // Preserve the member until the delete is committed. Once a delete is staged in the
- // buffer, its resources are freed when it is removed from the buffer.
- memberFreer.dismiss();
-
- // Ensure that the BSONObj underlying the WSM associated with 'id' is owned because
- // saveState() is allowed to free the memory the BSONObj points to. The BSONObj will be
- // needed later when it is passed to Collection::deleteDocument(). Note that the call to
- // makeObjOwnedIfNeeded() will leave the WSM in the RID_AND_OBJ state in case we need to
- // retry deleting it.
- member->makeObjOwnedIfNeeded();
- _stagedDeletesBuffer.append(id);
- const auto memberMemFootprintBytes = member->getMemUsage();
- _stagedDeletesWatermarkBytes += memberMemFootprintBytes;
- incrementSSSMetricNoOverflow(batchedDeletesSSS.stagedSizeBytes,
- memberMemFootprintBytes);
- }
- }
+ ScopeGuard memberFreer([&] { _ws->free(*workingSetMemberID); });
+ invariant(member->hasRecordId());
- if (!_params->isExplain && (_drainRemainingBuffer || _batchTargetMet())) {
- _stagedDeletesWatermarkBytes = 0;
- return _deleteBatch(out);
- }
+ // Deletes can't have projections. This means that covering analysis will always add
+ // a fetch. We should always get fetched data, and never just key data.
+ invariant(member->hasObj());
+
+ if (!_params->isExplain) {
+ // Preserve the member until the delete is committed. Once a delete is staged in the
+ // buffer, its resources are freed when it is removed from the buffer.
+ memberFreer.dismiss();
+
+ // Ensure that the BSONObj underlying the WSM associated with 'id' is owned because
+ // saveState() is allowed to free the memory the BSONObj points to. The BSONObj will be
+ // needed later when it is passed to Collection::deleteDocument(). Note that the call to
+ // makeObjOwnedIfNeeded() will leave the WSM in the RID_AND_OBJ state in case we need to
+ // retry deleting it.
+ member->makeObjOwnedIfNeeded();
- return PlanStage::NEED_TIME;
+ _stagedDeletesBuffer.append(*workingSetMemberID);
+ const auto memberMemFootprintBytes = member->getMemUsage();
+ _stagedDeletesWatermarkBytes += memberMemFootprintBytes;
+ _passTotalDocsStaged += 1;
+ incrementSSSMetricNoOverflow(batchedDeletesSSS.stagedSizeBytes, memberMemFootprintBytes);
+ }
}
PlanStage::StageState BatchedDeleteStage::_tryRestoreState(WorkingSetID* out) {
@@ -348,25 +391,22 @@ PlanStage::StageState BatchedDeleteStage::_tryRestoreState(WorkingSetID* out) {
PlanStage::StageState BatchedDeleteStage::_prepareToRetryDrainAfterWCE(
WorkingSetID* out, const std::set<WorkingSetID>& recordsThatNoLongerMatch) {
_stagedDeletesBuffer.erase(recordsThatNoLongerMatch);
- _signalIfDrainComplete();
*out = WorkingSet::INVALID_ID;
return NEED_YIELD;
}
-void BatchedDeleteStage::_signalIfDrainComplete() {
- _drainRemainingBuffer = !_stagedDeletesBuffer.empty();
-}
-
bool BatchedDeleteStage::_batchTargetMet() {
- tassert(6303900,
- "not expecting to be still draining staged deletions while evaluating whether to "
- "commit staged deletions",
- !_drainRemainingBuffer);
return (_batchParams->targetBatchDocs &&
- _stagedDeletesBuffer.size() >=
- static_cast<unsigned long long>(_batchParams->targetBatchDocs)) ||
+ _stagedDeletesBuffer.size() >= static_cast<size_t>(_batchParams->targetBatchDocs)) ||
(_batchParams->targetStagedDocBytes &&
_stagedDeletesWatermarkBytes >=
static_cast<unsigned long long>(_batchParams->targetStagedDocBytes));
}
+
+bool BatchedDeleteStage::_passTargetMet() {
+ return (_passParams->targetPassDocs && _passTotalDocsStaged >= _passParams->targetPassDocs) ||
+ (_passParams->targetPassTimeMS != Milliseconds(0) &&
+ Milliseconds(_passTimer.millis()) >= _passParams->targetPassTimeMS);
+}
+
} // namespace mongo
diff --git a/src/mongo/db/exec/batched_delete_stage.h b/src/mongo/db/exec/batched_delete_stage.h
index 6c8d859e97e..89e99a5e6db 100644
--- a/src/mongo/db/exec/batched_delete_stage.h
+++ b/src/mongo/db/exec/batched_delete_stage.h
@@ -38,7 +38,7 @@
namespace mongo {
/**
- * Batch sizing parameters. A batch of staged document deletes is committed as soon
+ * 'Batch' sizing parameters. A batch of staged document deletes is committed as soon
* as one of the targets below is met, or upon reaching EOF.
*/
struct BatchedDeleteStageBatchParams {
@@ -46,6 +46,9 @@ struct BatchedDeleteStageBatchParams {
: targetBatchDocs(gBatchedDeletesTargetBatchDocs.load()),
targetBatchTimeMS(Milliseconds(gBatchedDeletesTargetBatchTimeMS.load())),
targetStagedDocBytes(gBatchedDeletesTargetStagedDocBytes.load()) {}
+ //
+ // A 'batch' refers to the deletes executed in a single WriteUnitOfWork.
+ //
// Documents staged for deletions are processed in a batch once this document count target is
// met. A value of zero means unlimited.
@@ -58,6 +61,28 @@ struct BatchedDeleteStageBatchParams {
};
/**
+ * 'Pass' parameters. A 'pass' defines an approximate target number of documents or runtime after
+ * which the deletion stops staging documents, executes any remaining deletes, and eventually
+ * returns completion. 'Pass' parameters are approximate because they are checked at a per batch
+ * commit granularity.
+ */
+struct BatchedDeleteStagePassParams {
+ BatchedDeleteStagePassParams() : targetPassDocs(0), targetPassTimeMS(Milliseconds(0)) {}
+
+ // Limits the amount of documents processed in a single pass. Once met, no more documents will
+ // be fetched for delete - any remaining staged deletes will be executed provided they still
+ // match the query and haven't been deleted by a concurrent operation. A value of zero means
+ // unlimited.
+ long long targetPassDocs;
+
+ // Limits the time spent staging and executing deletes in a single pass. Once met, no more
+ // documents will be fetched for delete - any remaining staged deletes will be executed provided
+ // they still match the query and haven't been deleted by a concurrent operation. A value of
+ // zero means unlimited.
+ Milliseconds targetPassTimeMS;
+};
+
+/**
* The BATCHED_DELETE stage deletes documents in batches. In comparison, the base class DeleteStage
* deletes documents one by one. The stage returns NEED_TIME after executing a batch of deletes, or
* after staging a delete for the next batch.
@@ -71,15 +96,28 @@ class BatchedDeleteStage final : public DeleteStage {
public:
static constexpr StringData kStageType = "BATCHED_DELETE"_sd;
-
+ // Preferred constructor that uses default 'BatchedDeleteStagePassParams'. Changing the
+ // 'BatchedDeleteStagePassParams' from their default may cause the delete operation to only partially
+ // delete results that match the query and should only be done for specific internal operations.
+ BatchedDeleteStage(ExpressionContext* expCtx,
+ std::unique_ptr<DeleteStageParams> params,
+ std::unique_ptr<BatchedDeleteStageBatchParams> batchParams,
+ WorkingSet* ws,
+ const CollectionPtr& collection,
+ PlanStage* child);
BatchedDeleteStage(ExpressionContext* expCtx,
std::unique_ptr<DeleteStageParams> params,
std::unique_ptr<BatchedDeleteStageBatchParams> batchParams,
+ std::unique_ptr<BatchedDeleteStagePassParams> passParams,
WorkingSet* ws,
const CollectionPtr& collection,
PlanStage* child);
+
~BatchedDeleteStage();
+ // Returns true when no more work can be done (there are no more deletes to commit).
+ bool isEOF() final;
+
StageState doWork(WorkingSetID* out);
StageType stageType() const final {
@@ -87,12 +125,20 @@ public:
}
private:
- /**
- * Returns NEED_YIELD when there is a write conflict. Otherwise, returns NEED_TIME when
- * some, or all, of the documents staged in the _stagedDeletesBuffer are successfully deleted.
- */
+ // Returns NEED_YIELD when there is a write conflict. Otherwise, returns NEED_TIME when
+ // some, or all, of the documents staged in the _stagedDeletesBuffer are successfully deleted.
PlanStage::StageState _deleteBatch(WorkingSetID* out);
+ // Attempts to stage a new delete in the _stagedDeletesBuffer. Returns the PlanStage::StageState
+ // fetched directly from the child except when there is a document to stage. Converts
+ // PlanStage::ADVANCED to PlanStage::NEED_TIME before returning when a document is staged for
+ // delete - PlanStage:ADVANCED doesn't hold meaning in a batched delete since nothing will ever
+ // be directly returned from this stage.
+ PlanStage::StageState _doStaging(WorkingSetID* out);
+
+ // Stages the document tied to workingSetMemberID into the _stagedDeletesBuffer.
+ void _stageNewDelete(WorkingSetID* workingSetMemberID);
+
// Tries to restore the child's state. Returns NEED_TIME if the restore succeeds, NEED_YIELD
// upon write conflict.
PlanStage::StageState _tryRestoreState(WorkingSetID* out);
@@ -102,16 +148,20 @@ private:
PlanStage::StageState _prepareToRetryDrainAfterWCE(
WorkingSetID* out, const std::set<WorkingSetID>& recordsThatNoLongerMatch);
- // Either signals that all the elements in the buffer have been drained or that there are more
- // elements to drain.
- void _signalIfDrainComplete();
-
// Returns true if one or more of the batch targets are met and it is time to delete the batch.
bool _batchTargetMet();
+ // Returns true if one or more of the pass targets are met and it is time to drain the remaining
+ // buffer and return completion. Note - this method checks a timer and repeated calls can become
+ // expensive.
+ bool _passTargetMet();
+
// Batch targeting parameters.
std::unique_ptr<BatchedDeleteStageBatchParams> _batchParams;
+ // Pass targeting parameters.
+ std::unique_ptr<BatchedDeleteStagePassParams> _passParams;
+
// Holds information for each document staged for delete.
BatchedDeleteStageBuffer _stagedDeletesBuffer;
@@ -120,9 +170,18 @@ private:
// regardless of whether all staged deletes have been committed yet.
size_t _stagedDeletesWatermarkBytes;
- // Whether there are remaining docs in the buffer from a previous call to doWork() that should
- // be drained before fetching more documents.
- bool _drainRemainingBuffer;
+ // Tracks the cumulative number of documents staged for deletes over the operation.
+ long long _passTotalDocsStaged;
+
+ // Tracks the cumulative elapsed time since the operation began.
+ Timer _passTimer;
+
+ // True when the deletes in the buffer must be committed before more documents can be staged.
+ bool _commitStagedDeletes;
+
+ // True when the operation is done staging new documents. The only work left is to drain the
+ // remaining buffer.
+ bool _passStagingComplete;
};
} // namespace mongo
diff --git a/src/mongo/db/exec/batched_delete_stage.idl b/src/mongo/db/exec/batched_delete_stage.idl
index 5ccf0cdab72..73b0918f9ec 100644
--- a/src/mongo/db/exec/batched_delete_stage.idl
+++ b/src/mongo/db/exec/batched_delete_stage.idl
@@ -57,4 +57,3 @@ server_parameters:
default: 5
validator:
gte: 0
-
diff --git a/src/mongo/db/exec/delete_stage.h b/src/mongo/db/exec/delete_stage.h
index fbf41decd21..244c8fb7c16 100644
--- a/src/mongo/db/exec/delete_stage.h
+++ b/src/mongo/db/exec/delete_stage.h
@@ -121,7 +121,7 @@ public:
const CollectionPtr& collection,
PlanStage* child);
- bool isEOF() final;
+ bool isEOF();
StageState doWork(WorkingSetID* out);
StageType stageType() const {