-rw-r--r--  jstests/noPassthrough/batched_multi_deletes.js                                                               |   3
-rw-r--r--  jstests/noPassthrough/batched_multi_deletes_write_conflict.js (renamed from jstests/noPassthrough/batched_multi_deletes_WC.js) |   0
-rw-r--r--  src/mongo/db/exec/batched_delete_stage.cpp                                                                   | 112
-rw-r--r--  src/mongo/db/exec/batched_delete_stage.h                                                                     |  28
-rw-r--r--  src/mongo/dbtests/query_stage_batched_delete.cpp                                                             | 232
5 files changed, 334 insertions, 41 deletions
diff --git a/jstests/noPassthrough/batched_multi_deletes.js b/jstests/noPassthrough/batched_multi_deletes.js
index 98aedceddb8..d77b5bb1b21 100644
--- a/jstests/noPassthrough/batched_multi_deletes.js
+++ b/jstests/noPassthrough/batched_multi_deletes.js
@@ -51,6 +51,9 @@ function validateBatchedDeletes(conn) {
     assert.commandWorked(
         db.adminCommand({setParameter: 1, internalBatchUserMultiDeletesForTest: 1}));
 
+    // For consistent results, don't enforce the targetBatchTimeMS.
+    assert.commandWorked(db.adminCommand({setParameter: 1, batchedDeletesTargetBatchTimeMS: 0}));
+
     // Explain plan and executionStats.
     {
         const expl = db.runCommand({
diff --git a/jstests/noPassthrough/batched_multi_deletes_WC.js b/jstests/noPassthrough/batched_multi_deletes_write_conflict.js
index a59d0549101..a59d0549101 100644
--- a/jstests/noPassthrough/batched_multi_deletes_WC.js
+++ b/jstests/noPassthrough/batched_multi_deletes_write_conflict.js
diff --git a/src/mongo/db/exec/batched_delete_stage.cpp b/src/mongo/db/exec/batched_delete_stage.cpp
index 7c4fac03e24..c91b7daf16d 100644
--- a/src/mongo/db/exec/batched_delete_stage.cpp
+++ b/src/mongo/db/exec/batched_delete_stage.cpp
@@ -87,7 +87,6 @@ bool ensureStillMatches(OperationContext* opCtx,
     }
     return true;
 }
-
 } // namespace
 
 /**
@@ -159,33 +158,42 @@ BatchedDeleteStage::BatchedDeleteStage(ExpressionContext* expCtx,
 BatchedDeleteStage::~BatchedDeleteStage() {}
 
 PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) {
+    tassert(6389900, "Expected documents for batched deletion", _stagedDeletesBuffer.size() != 0);
     try {
         child()->saveState();
     } catch (const WriteConflictException&) {
         std::terminate();
     }
-    const auto startOfBatchTimestampMillis = Date_t::now().toMillisSinceEpoch();
+
+    std::set<RecordId> recordsThatNoLongerMatch;
+    Timer batchTimer(opCtx()->getServiceContext()->getTickSource());
+
     unsigned int docsDeleted = 0;
-    std::vector<RecordId> recordsThatNoLongerMatch;
+    unsigned int batchIdx = 0;
+
     try {
         // Start a WUOW with 'groupOplogEntries', which groups a delete batch into a single
         // timestamp and oplog entry.
         WriteUnitOfWork wuow(opCtx(), true /* groupOplogEntries */);
-
-        for (auto& [rid, snapshotId] : _ridMap) {
+        for (; batchIdx < _stagedDeletesBuffer.size(); ++batchIdx) {
             if (MONGO_unlikely(throwWriteConflictExceptionInBatchedDeleteStage.shouldFail())) {
                 throw WriteConflictException();
             }
 
+            auto& stagedDocument = _stagedDeletesBuffer.at(batchIdx);
+
             // The PlanExecutor YieldPolicy may change snapshots between calls to 'doWork()'.
             // Different documents may have different snapshots.
-            bool docStillMatches =
-                ensureStillMatches(opCtx(), collection(), rid, snapshotId, _params->canonicalQuery);
+            bool docStillMatches = ensureStillMatches(opCtx(),
+                                                      collection(),
+                                                      stagedDocument.rid,
+                                                      stagedDocument.snapshotId,
+                                                      _params->canonicalQuery);
 
             if (docStillMatches) {
                 collection()->deleteDocument(opCtx(),
                                              _params->stmtId,
-                                             rid,
+                                             stagedDocument.rid,
                                              _params->opDebug,
                                              _params->fromMigrate,
                                              false,
@@ -195,41 +203,47 @@ PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) {
                 docsDeleted++;
             } else {
-                recordsThatNoLongerMatch.emplace_back(rid);
+                recordsThatNoLongerMatch.insert(stagedDocument.rid);
+            }
+
+            const Milliseconds elapsedMillis(batchTimer.millis());
+            if (_batchParams->targetBatchTimeMS != Milliseconds(0) &&
+                elapsedMillis >= _batchParams->targetBatchTimeMS) {
+                // Met targetBatchTimeMS after evaluating _stagedDeletesBuffer[batchIdx].
+                break;
             }
         }
+        wuow.commit();
     } catch (const WriteConflictException&) {
-        for (auto rid : recordsThatNoLongerMatch) {
-            // Lazily delete records that no longer match from the buffer. They should be skipped
-            // when the deletes are retryed.
-            _ridMap.erase(rid);
-        }
-
-        _drainRemainingBuffer = true;
-        *out = WorkingSet::INVALID_ID;
-        return NEED_YIELD;
+        return _prepareToRetryDrainAfterWCE(out, recordsThatNoLongerMatch);
     }
 
-    const auto endOfBatchTimestampMillis = Date_t::now().toMillisSinceEpoch();
     incrementSSSMetricNoOverflow(batchedDeletesSSS.docs, docsDeleted);
     incrementSSSMetricNoOverflow(batchedDeletesSSS.batches, 1);
-    incrementSSSMetricNoOverflow(batchedDeletesSSS.timeMillis,
-                                 endOfBatchTimestampMillis - startOfBatchTimestampMillis);
+    incrementSSSMetricNoOverflow(batchedDeletesSSS.timeMillis, batchTimer.millis());
     // TODO (SERVER-63039): report batch size
     _specificStats.docsDeleted += docsDeleted;
 
-    _ridMap.clear();
-    _drainRemainingBuffer = false;
-
-    try {
-        child()->restoreState(&collection());
-    } catch (const WriteConflictException&) {
-        // Note we don't need to retry anything in this case since the delete already was committed.
-        *out = WorkingSet::INVALID_ID;
-        return NEED_YIELD;
+    if (batchIdx < _stagedDeletesBuffer.size() - 1) {
+        // _stagedDeletesBuffer[batchIdx] is the last document evaluated in this batch, and it is
+        // not the last element in the buffer, which means targetBatchTimeMS was exceeded. Remove
+        // all records that have been evaluated (deleted, or skipped because they no longer match
+        // the query) from the buffer before retrying.
+        _stagedDeletesBuffer.erase(_stagedDeletesBuffer.begin(),
+                                   _stagedDeletesBuffer.begin() + batchIdx + 1);
+
+        _drainRemainingBuffer = true;
+        return _tryRestoreState(out);
     }
 
-    return NEED_TIME;
+    // The elements in the buffer are preserved during document deletion so deletes can be retried
+    // in case of a write conflict. No write conflict occurred, so clear the buffer: all of its
+    // documents have been deleted.
+    _stagedDeletesBuffer.clear();
+    _drainRemainingBuffer = false;
+
+    return _tryRestoreState(out);
 }
 
 PlanStage::StageState BatchedDeleteStage::doWork(WorkingSetID* out) {
@@ -249,10 +263,13 @@ PlanStage::StageState BatchedDeleteStage::doWork(WorkingSetID* out) {
             return status;
 
         case PlanStage::IS_EOF:
-            if (!_ridMap.empty()) {
+            if (!_stagedDeletesBuffer.empty()) {
                 // Drain the outstanding deletions.
                 auto ret = _deleteBatch(out);
-                if (ret != NEED_TIME) {
+                if (ret != NEED_TIME || (ret == NEED_TIME && _drainRemainingBuffer == true)) {
+                    // Only return NEED_TIME if there is more to drain in the buffer. Otherwise,
+                    // there is nothing more to fetch, and NEED_TIME signals that all documents
+                    // have been successfully deleted.
                     return ret;
                 }
             }
@@ -276,18 +293,45 @@ PlanStage::StageState BatchedDeleteStage::doWork(WorkingSetID* out) {
     invariant(member->hasObj());
 
     if (!_params->isExplain) {
-        _ridMap.insert(std::make_pair(recordId, member->doc.snapshotId()));
+        _stagedDeletesBuffer.push_back({recordId, member->doc.snapshotId()});
     }
 
     if (!_params->isExplain &&
         (_drainRemainingBuffer ||
          (_batchParams->targetBatchDocs &&
-          _ridMap.size() >= static_cast<unsigned long long>(_batchParams->targetBatchDocs)))) {
+          _stagedDeletesBuffer.size() >=
+              static_cast<unsigned long long>(_batchParams->targetBatchDocs)))) {
         return _deleteBatch(out);
     }
 
     return PlanStage::NEED_TIME;
 }
 
+PlanStage::StageState BatchedDeleteStage::_tryRestoreState(WorkingSetID* out) {
+    try {
+        child()->restoreState(&collection());
+    } catch (const WriteConflictException&) {
+        *out = WorkingSet::INVALID_ID;
+        return NEED_YIELD;
+    }
+    return NEED_TIME;
+}
+
+PlanStage::StageState BatchedDeleteStage::_prepareToRetryDrainAfterWCE(
+    WorkingSetID* out, const std::set<RecordId>& recordsThatNoLongerMatch) {
+    // Remove records that no longer match the query before retrying.
+    _stagedDeletesBuffer.erase(std::remove_if(_stagedDeletesBuffer.begin(),
+                                              _stagedDeletesBuffer.end(),
+                                              [&](const auto& stagedDelete) {
+                                                  return recordsThatNoLongerMatch.find(
+                                                             stagedDelete.rid) !=
+                                                      recordsThatNoLongerMatch.end();
+                                              }),
+                               _stagedDeletesBuffer.end());
+    *out = WorkingSet::INVALID_ID;
+    _drainRemainingBuffer = true;
+    return NEED_YIELD;
+}
+
 } // namespace mongo
diff --git a/src/mongo/db/exec/batched_delete_stage.h b/src/mongo/db/exec/batched_delete_stage.h
index d0434013b3f..ffb16a2564b 100644
--- a/src/mongo/db/exec/batched_delete_stage.h
+++ b/src/mongo/db/exec/batched_delete_stage.h
@@ -88,15 +88,31 @@ public:
 
 private:
     /**
-     * Deletes the documents staged in _ridMap in a batch.
-     * Returns NEED_TIME on success.
+     * Returns NEED_YIELD when there is a write conflict. Otherwise, returns NEED_TIME when
+     * some, or all, of the documents staged in the _stagedDeletesBuffer are successfully deleted.
      */
     PlanStage::StageState _deleteBatch(WorkingSetID* out);
 
-    // Maps records to delete to the latest snapshot their data matched the query. Records must be
-    // deleted in a single WriteUnitOrWork. Operation order has no impact on the outcome of the
-    // WriteUnitOfWork since all operations become visible at the same time.
-    stdx::unordered_map<RecordId, SnapshotId, RecordId::Hasher> _ridMap;
+    // Tries to restore the child's state. Returns NEED_TIME if the restore succeeds, NEED_YIELD
+    // upon write conflict.
+    PlanStage::StageState _tryRestoreState(WorkingSetID* out);
+
+    // Prepares to retry draining the _stagedDeletesBuffer after a write conflict. Removes
+    // 'recordsThatNoLongerMatch', then yields.
+    PlanStage::StageState _prepareToRetryDrainAfterWCE(
+        WorkingSetID* out, const std::set<RecordId>& recordsThatNoLongerMatch);
+
+    // Metadata of a document staged for deletion.
+    struct StagedDocumentMetadata {
+        RecordId rid;
+
+        // SnapshotId associated with the document when it is staged for deletion. Must be checked
+        // before deletion to ensure the document still matches the query.
+        SnapshotId snapshotId;
+    };
+
+    // Stores documents staged for deletion.
+    std::vector<StagedDocumentMetadata> _stagedDeletesBuffer;
 
     // Whether there are remaining docs in the buffer from a previous call to doWork() that should
     // be drained before fetching more documents.
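
For reference, the control flow the new _deleteBatch() introduces is a time-bounded drain over a staged buffer: evaluate entries in order, stop once the target batch time is met, erase only the evaluated prefix, and signal that another pass is needed. The standalone sketch below illustrates that pattern under stated assumptions: StagedDocument, drainBatch, and the use of std::chrono::steady_clock are hypothetical stand-ins, not the stage's actual APIs (which use MongoDB's TickSource/Timer, wrap the loop in a WriteUnitOfWork, and return PlanStage states rather than a bool).

    #include <chrono>
    #include <cstddef>
    #include <iostream>
    #include <vector>

    struct StagedDocument {
        long recordId;  // Stand-in for RecordId + SnapshotId metadata.
    };

    using Clock = std::chrono::steady_clock;

    // Evaluates staged documents until either the buffer is exhausted or the
    // target batch time is met. Returns true when work remains to be drained.
    bool drainBatch(std::vector<StagedDocument>& buffer,
                    std::chrono::milliseconds targetBatchTime) {
        if (buffer.empty()) {
            return false;  // Mirrors the tassert: a batch needs documents.
        }
        const auto start = Clock::now();
        std::size_t idx = 0;
        for (; idx < buffer.size(); ++idx) {
            // ... delete buffer[idx] here ...
            const auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
                Clock::now() - start);
            if (targetBatchTime.count() != 0 && elapsed >= targetBatchTime) {
                break;  // Met the target batch time after evaluating buffer[idx].
            }
        }
        if (idx < buffer.size() - 1) {
            // Time limit hit early: drop only the evaluated prefix so a
            // follow-up call can drain the remainder.
            buffer.erase(buffer.begin(), buffer.begin() + idx + 1);
            return true;
        }
        buffer.clear();  // Everything was evaluated; nothing left to drain.
        return false;
    }

    int main() {
        std::vector<StagedDocument> buffer{{1}, {2}, {3}};
        while (drainBatch(buffer, std::chrono::milliseconds(5))) {
            // In the real stage this corresponds to another doWork() call
            // draining the buffer.
        }
        std::cout << "drained; " << buffer.size() << " staged deletes remain\n";
        return 0;
    }

Returning true here plays the role of setting _drainRemainingBuffer = true in the stage: it tells the caller that the buffer still holds staged deletes and must be drained before fetching more documents.
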
diff --git a/src/mongo/dbtests/query_stage_batched_delete.cpp b/src/mongo/dbtests/query_stage_batched_delete.cpp
index 3970036378a..fdf55709d99 100644
--- a/src/mongo/dbtests/query_stage_batched_delete.cpp
+++ b/src/mongo/dbtests/query_stage_batched_delete.cpp
@@ -39,9 +39,11 @@
 #include "mongo/db/exec/collection_scan.h"
 #include "mongo/db/exec/delete_stage.h"
 #include "mongo/db/exec/queued_data_stage.h"
+#include "mongo/db/op_observer_noop.h"
 #include "mongo/db/query/canonical_query.h"
 #include "mongo/db/service_context.h"
 #include "mongo/dbtests/dbtests.h"
+#include "mongo/util/tick_source_mock.h"
 
 namespace mongo {
 namespace QueryStageBatchedDelete {
@@ -50,15 +52,57 @@ static const NamespaceString nss("unittests.QueryStageBatchedDelete");
 
 // For the following tests, fix the targetBatchDocs to 10 documents.
 static const int targetBatchDocs = 10;
+static const Milliseconds targetBatchTimeMS = Milliseconds(5);
+
+/**
+ * Simulates how long each document takes to delete.
+ *
+ * Deletes on a batch of documents are executed in a single call to BatchedDeleteStage::work(). The
+ * ClockAdvancingOpObserver is necessary to advance time per document delete, rather than per batch
+ * delete.
+ */
+class ClockAdvancingOpObserver : public OpObserverNoop {
+public:
+    void aboutToDelete(OperationContext* opCtx,
+                       const NamespaceString& nss,
+                       const UUID& uuid,
+                       const BSONObj& doc) override {
+        if (docDurationMap.find(doc) != docDurationMap.end()) {
+            tickSource->advance(docDurationMap.find(doc)->second);
+        }
+    }
+
+    void setDeleteRecordDurationMillis(BSONObj targetDoc, Milliseconds duration) {
+        docDurationMap.insert(std::make_pair(targetDoc, duration));
+    }
+
+    SimpleBSONObjUnorderedMap<Milliseconds> docDurationMap;
+    TickSourceMock<Milliseconds>* tickSource;
+};
 
 class QueryStageBatchedDeleteTest : public unittest::Test {
 public:
-    QueryStageBatchedDeleteTest() : _client(&_opCtx) {}
+    QueryStageBatchedDeleteTest() : _client(&_opCtx) {
+        auto tickSource = std::make_unique<TickSourceMock<Milliseconds>>();
+        tickSource->reset(1);
+        _tickSource = tickSource.get();
+        _opCtx.getServiceContext()->setTickSource(std::move(tickSource));
+
+        std::unique_ptr<ClockAdvancingOpObserver> opObserverUniquePtr =
+            std::make_unique<ClockAdvancingOpObserver>();
+        opObserverUniquePtr->tickSource = _tickSource;
+        _opObserver = opObserverUniquePtr.get();
+        _opCtx.getServiceContext()->setOpObserver(std::move(opObserverUniquePtr));
+    }
 
     virtual ~QueryStageBatchedDeleteTest() {
         _client.dropCollection(nss.ns());
     }
 
+    TickSourceMock<Milliseconds>* tickSource() {
+        return _tickSource;
+    }
+
     // Populates the collection with nDocs of shape {_id: <int i>, a: <int i>}.
     void prePopulateCollection(int nDocs) {
         for (int i = 0; i < nDocs; i++) {
@@ -70,6 +114,26 @@ public:
         _client.insert(nss.ns(), obj);
     }
 
+    // Inserts documents later deleted in a single 'batch' due to targetBatchTimeMS or
+    // targetBatchDocs. Tells the opObserver how much to advance the clock when a document is about
+    // to be deleted.
+    void insertTimedBatch(std::vector<std::pair<BSONObj, Milliseconds>> timedBatch) {
+        Milliseconds totalDurationOfBatch{0};
+        for (auto [doc, duration] : timedBatch) {
+            _client.insert(nss.ns(), doc);
+            _opObserver->setDeleteRecordDurationMillis(doc, duration);
+            totalDurationOfBatch += duration;
+        }
+
+        // Enforce test correctness: if totalDurationOfBatch is larger than targetBatchTimeMS, the
+        // last document of the 'timedBatch' must be the one that makes the batch exceed
+        // targetBatchTimeMS.
+        if (totalDurationOfBatch > targetBatchTimeMS) {
+            auto batchSize = timedBatch.size();
+            ASSERT_LT(totalDurationOfBatch - timedBatch[batchSize - 1].second, targetBatchTimeMS);
+        }
+    }
+
     void remove(const BSONObj& obj) {
         _client.remove(nss.ns(), obj);
     }
@@ -123,6 +187,7 @@ public:
         CollectionScanParams collScanParams;
         auto batchedDeleteParams = std::make_unique<BatchedDeleteStageBatchParams>();
         batchedDeleteParams->targetBatchDocs = targetBatchDocs;
+        batchedDeleteParams->targetBatchTimeMS = targetBatchTimeMS;
 
         // DeleteStageParams must always be multi.
         auto deleteParams = std::make_unique<DeleteStageParams>();
@@ -144,6 +209,8 @@ protected:
 
     boost::intrusive_ptr<ExpressionContext> _expCtx =
         make_intrusive<ExpressionContext>(&_opCtx, nullptr, nss);
+    ClockAdvancingOpObserver* _opObserver;
+    TickSourceMock<Milliseconds>* _tickSource;
 
 private:
     DBDirectClient _client;
@@ -422,5 +489,168 @@ TEST_F(QueryStageBatchedDeleteTest, BatchedDeleteStagedDocIsUpdatedToNotMatchCli
     ASSERT_EQUALS(state, PlanStage::IS_EOF);
     ASSERT_EQUALS(stats->docsDeleted, nDocs - 1);
 }
+
+// Tests that targetBatchTimeMS is enforced.
+TEST_F(QueryStageBatchedDeleteTest, BatchedDeleteTargetBatchTimeMSBasic) {
+    dbtests::WriteContextForTests ctx(&_opCtx, nss.ns());
+
+    std::vector<std::pair<BSONObj, Milliseconds>> timedBatch0{
+        {BSON("_id" << 1 << "a" << 1), Milliseconds(2)},
+        {BSON("_id" << 2 << "a" << 2), Milliseconds(2)},
+        {BSON("_id" << 3 << "a" << 3), Milliseconds(2)},
+    };
+    std::vector<std::pair<BSONObj, Milliseconds>> timedBatch1{
+        {BSON("_id" << 4 << "a" << 4), Milliseconds(2)},
+        {BSON("_id" << 5 << "a" << 5), Milliseconds(2)},
+    };
+
+    insertTimedBatch(timedBatch0);
+    insertTimedBatch(timedBatch1);
+
+    int batchSize0 = timedBatch0.size();
+    int batchSize1 = timedBatch1.size();
+    int nDocs = batchSize0 + batchSize1;
+
+    const CollectionPtr& coll = ctx.getCollection();
+    ASSERT(coll);
+
+    WorkingSet ws;
+    auto deleteStage = makeBatchedDeleteStage(&ws, coll);
+    const DeleteStats* stats = static_cast<const DeleteStats*>(deleteStage->getSpecificStats());
+
+    PlanStage::StageState state = PlanStage::NEED_TIME;
+    WorkingSetID id = WorkingSet::INVALID_ID;
+
+    // Stages all documents in the buffer before executing deletes since nDocs <
+    // targetBatchDocs.
+    {
+        ASSERT_LTE(nDocs, targetBatchDocs);
+        for (auto i = 0; i <= nDocs; i++) {
+            state = deleteStage->work(&id);
+            ASSERT_EQ(stats->docsDeleted, 0);
+            ASSERT_EQ(state, PlanStage::NEED_TIME);
+        }
+    }
+
+    // Batch 0 deletions.
+    {
+        Timer timer(tickSource());
+        state = deleteStage->work(&id);
+        ASSERT_EQ(stats->docsDeleted, batchSize0);
+        ASSERT_EQ(state, PlanStage::NEED_TIME);
+        ASSERT_GTE(Milliseconds(timer.millis()), targetBatchTimeMS);
+    }
+
+    // Batch 1 deletions.
+    {
+        // Drain the rest of the buffer before fetching from a new WorkingSetMember.
+        Timer timer(tickSource());
+        state = deleteStage->work(&id);
+        ASSERT_EQ(stats->docsDeleted, nDocs);
+        ASSERT_EQ(state, PlanStage::NEED_TIME);
+        ASSERT_LTE(Milliseconds(timer.millis()), targetBatchTimeMS);
+    }
+
+    // Completes multi delete execution.
+    {
+        state = deleteStage->work(&id);
+        ASSERT_EQ(stats->docsDeleted, nDocs);
+        ASSERT_EQ(state, PlanStage::IS_EOF);
+    }
+}
+
+// Tests the case where the total time it takes to delete targetBatchDocs documents exceeds
+// targetBatchTimeMS.
+TEST_F(QueryStageBatchedDeleteTest, BatchedDeleteTargetBatchTimeMSWithTargetBatchDocs) {
+    dbtests::WriteContextForTests ctx(&_opCtx, nss.ns());
+
+    std::vector<std::pair<BSONObj, Milliseconds>> timedBatch0{
+        {BSON("_id" << 1 << "a" << 1), Milliseconds(1)},
+        {BSON("_id" << 2 << "a" << 2), Milliseconds(0)},
+        {BSON("_id" << 3 << "a" << 3), Milliseconds(0)},
+        {BSON("_id" << 4 << "a" << 4), Milliseconds(0)},
+        {BSON("_id" << 5 << "a" << 5), Milliseconds(0)},
+        {BSON("_id" << 6 << "a" << 6), Milliseconds(0)},
+        {BSON("_id" << 7 << "a" << 7), Milliseconds(0)},
+        {BSON("_id" << 8 << "a" << 8), Milliseconds(4)},
+    };
+
+    std::vector<std::pair<BSONObj, Milliseconds>> timedBatch1{
+        {BSON("_id" << 9 << "a" << 9), Milliseconds(1)},
+        {BSON("_id" << 10 << "a" << 10), Milliseconds(1)},
+    };
+
+    std::vector<std::pair<BSONObj, Milliseconds>> timedBatch2{
+        {BSON("_id" << 11 << "a" << 11), Milliseconds(1)},
+        {BSON("_id" << 12 << "a" << 12), Milliseconds(1)},
+    };
+
+    // Populate the collection before executing the BatchedDeleteStage.
+    insertTimedBatch(timedBatch0);
+    insertTimedBatch(timedBatch1);
+    insertTimedBatch(timedBatch2);
+
+    int batchSize0 = timedBatch0.size();
+    int batchSize1 = timedBatch1.size();
+    int batchSize2 = timedBatch2.size();
+    int nDocs = batchSize0 + batchSize1 + batchSize2;
+
+    const CollectionPtr& coll = ctx.getCollection();
+    ASSERT(coll);
+
+    WorkingSet ws;
+    auto deleteStage = makeBatchedDeleteStage(&ws, coll);
+
+    const DeleteStats* stats = static_cast<const DeleteStats*>(deleteStage->getSpecificStats());
+
+    PlanStage::StageState state = PlanStage::NEED_TIME;
+    WorkingSetID id = WorkingSet::INVALID_ID;
+
+    // Stages up to targetBatchDocs - 1 documents in the buffer.
+    {
+        for (auto i = 0; i < targetBatchDocs; i++) {
+            state = deleteStage->work(&id);
+            ASSERT_EQ(stats->docsDeleted, 0);
+            ASSERT_EQ(state, PlanStage::NEED_TIME);
+        }
+    }
+
+    // Batch0 deletions.
+    {
+        Timer timer(tickSource());
+        state = deleteStage->work(&id);
+        ASSERT_EQ(stats->docsDeleted, batchSize0);
+        ASSERT_EQ(state, PlanStage::NEED_TIME);
+        ASSERT_GTE(Milliseconds(timer.millis()), targetBatchTimeMS);
+    }
+
+    // Batch1 deletions.
+    {
+        Timer timer(tickSource());
+
+        // Drain the rest of the buffer before fetching from a new WorkingSetMember.
+        state = deleteStage->work(&id);
+        ASSERT_EQ(stats->docsDeleted, batchSize0 + batchSize1);
+        ASSERT_EQ(state, PlanStage::NEED_TIME);
+        ASSERT_LTE(Milliseconds(timer.millis()), targetBatchTimeMS);
+    }
+
+    // Stages the remaining documents.
+    {
+        for (auto i = 0; i < batchSize2; i++) {
+            state = deleteStage->work(&id);
+            ASSERT_EQ(stats->docsDeleted, batchSize0 + batchSize1);
+            ASSERT_EQ(state, PlanStage::NEED_TIME);
+        }
+    }
+
+    // Batch 2 deletions.
+    {
+        Timer timer(tickSource());
+        state = deleteStage->work(&id);
+        ASSERT_EQ(stats->docsDeleted, nDocs);
+        ASSERT_EQ(state, PlanStage::IS_EOF);
+        ASSERT_LT(Milliseconds(timer.millis()), targetBatchTimeMS);
+    }
+}
 } // namespace QueryStageBatchedDelete
 } // namespace mongo
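
The tests above depend on deterministic timing: the TickSourceMock only moves when the ClockAdvancingOpObserver advances it for a deleted document, so a batch timer observes exactly the sum of the configured per-document durations. The sketch below shows that clock-mocking idea in isolation; MockClock and DeleteHook are illustrative stand-ins, not MongoDB's TickSource or OpObserver interfaces.

    #include <cassert>
    #include <chrono>
    #include <map>

    using Ms = std::chrono::milliseconds;

    // A manually advanced clock; nothing moves it except explicit advance() calls.
    class MockClock {
    public:
        void advance(Ms d) { _now += d; }
        Ms now() const { return _now; }

    private:
        Ms _now{0};
    };

    // Stands in for the test OpObserver: advances the mock clock by a
    // per-document duration whenever that document is "deleted".
    class DeleteHook {
    public:
        explicit DeleteHook(MockClock* clock) : _clock(clock) {}

        void setDuration(int docId, Ms d) { _durations[docId] = d; }

        void onDelete(int docId) {
            auto it = _durations.find(docId);
            if (it != _durations.end()) {
                _clock->advance(it->second);
            }
        }

    private:
        MockClock* _clock;
        std::map<int, Ms> _durations;
    };

    int main() {
        MockClock clock;
        DeleteHook hook(&clock);
        hook.setDuration(1, Ms(2));
        hook.setDuration(2, Ms(2));

        const Ms start = clock.now();
        hook.onDelete(1);
        hook.onDelete(2);

        // The "batch" measured exactly the sum of the simulated per-document costs.
        assert(clock.now() - start == Ms(4));
        return 0;
    }

Because nothing else advances the mock clock, elapsed time is fully controlled by the test data, which is what lets the tests assert precise batch boundaries against targetBatchTimeMS instead of racing a real clock.
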