author     Haley Connelly <haley.connelly@mongodb.com>        2022-03-30 15:45:22 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>   2022-03-30 16:18:20 +0000
commit     9676cf4ad8d537518eb1b570fc79bad4f31d8a79 (patch)
tree       9fc287f245b571582b7eb9aa414950f5619e1bf0
parent     38c4df685dd246e27677ff7a0092828f378c3aa5 (diff)
download   mongo-9676cf4ad8d537518eb1b570fc79bad4f31d8a79.tar.gz

SERVER-63899 Add a target batch execution time to BatchedDeleteStage
-rw-r--r--  jstests/noPassthrough/batched_multi_deletes.js                                                                    3
-rw-r--r--  jstests/noPassthrough/batched_multi_deletes_write_conflict.js (renamed from jstests/noPassthrough/batched_multi_deletes_WC.js)    0
-rw-r--r--  src/mongo/db/exec/batched_delete_stage.cpp                                                                       112
-rw-r--r--  src/mongo/db/exec/batched_delete_stage.h                                                                          28
-rw-r--r--  src/mongo/dbtests/query_stage_batched_delete.cpp                                                                 232
5 files changed, 334 insertions(+), 41 deletions(-)
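
The change caps how long a single delete batch may run. Documents are still staged until targetBatchDocs is reached, but once a batch starts executing, _deleteBatch() checks a timer after every document and commits the partial batch as soon as targetBatchTimeMS has elapsed (the tests below fix it at 5 ms; a value of 0 disables the limit). Whatever remains in the buffer is drained on a later call to doWork(). Below is a minimal standalone sketch of that cutoff pattern, using std::chrono in place of MongoDB's Timer/TickSource types; runTimeBoundedBatch, processOne, and stagedDocs are illustrative names, not identifiers from the tree.

    #include <chrono>
    #include <cstddef>
    #include <vector>

    using std::chrono::duration_cast;
    using std::chrono::milliseconds;
    using std::chrono::steady_clock;

    // Evaluate staged items until the buffer is exhausted or the time budget is met.
    // Returns one past the index of the last item evaluated so the caller can erase
    // the consumed prefix and drain the remainder in a later batch.
    template <typename T, typename Fn>
    std::size_t runTimeBoundedBatch(const std::vector<T>& stagedDocs,
                                    milliseconds targetBatchTimeMS,
                                    Fn processOne) {
        const auto start = steady_clock::now();
        std::size_t idx = 0;
        for (; idx < stagedDocs.size(); ++idx) {
            processOne(stagedDocs[idx]);
            const auto elapsed = duration_cast<milliseconds>(steady_clock::now() - start);
            // A target of 0 means "no time limit", matching the check added in the diff below.
            if (targetBatchTimeMS != milliseconds(0) && elapsed >= targetBatchTimeMS) {
                ++idx;  // count the document that was just evaluated
                break;
            }
        }
        return idx;
    }

If the returned index is smaller than stagedDocs.size(), the caller keeps the tail of the buffer and marks it for draining, which is what the stage does with _drainRemainingBuffer when targetBatchTimeMS is exceeded.
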
diff --git a/jstests/noPassthrough/batched_multi_deletes.js b/jstests/noPassthrough/batched_multi_deletes.js
index 98aedceddb8..d77b5bb1b21 100644
--- a/jstests/noPassthrough/batched_multi_deletes.js
+++ b/jstests/noPassthrough/batched_multi_deletes.js
@@ -51,6 +51,9 @@ function validateBatchedDeletes(conn) {
assert.commandWorked(
db.adminCommand({setParameter: 1, internalBatchUserMultiDeletesForTest: 1}));
+ // For consistent results, don't enforce the targetBatchTimeMS.
+ assert.commandWorked(db.adminCommand({setParameter: 1, batchedDeletesTargetBatchTimeMS: 0}));
+
// Explain plan and executionStats.
{
const expl = db.runCommand({
diff --git a/jstests/noPassthrough/batched_multi_deletes_WC.js b/jstests/noPassthrough/batched_multi_deletes_write_conflict.js
index a59d0549101..a59d0549101 100644
--- a/jstests/noPassthrough/batched_multi_deletes_WC.js
+++ b/jstests/noPassthrough/batched_multi_deletes_write_conflict.js
diff --git a/src/mongo/db/exec/batched_delete_stage.cpp b/src/mongo/db/exec/batched_delete_stage.cpp
index 7c4fac03e24..c91b7daf16d 100644
--- a/src/mongo/db/exec/batched_delete_stage.cpp
+++ b/src/mongo/db/exec/batched_delete_stage.cpp
@@ -87,7 +87,6 @@ bool ensureStillMatches(OperationContext* opCtx,
}
return true;
}
-
} // namespace
/**
@@ -159,33 +158,42 @@ BatchedDeleteStage::BatchedDeleteStage(ExpressionContext* expCtx,
BatchedDeleteStage::~BatchedDeleteStage() {}
PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) {
+ tassert(6389900, "Expected documents for batched deletion", _stagedDeletesBuffer.size() != 0);
try {
child()->saveState();
} catch (const WriteConflictException&) {
std::terminate();
}
- const auto startOfBatchTimestampMillis = Date_t::now().toMillisSinceEpoch();
+
+ std::set<RecordId> recordsThatNoLongerMatch;
+ Timer batchTimer(opCtx()->getServiceContext()->getTickSource());
+
unsigned int docsDeleted = 0;
- std::vector<RecordId> recordsThatNoLongerMatch;
+ unsigned int batchIdx = 0;
+
try {
// Start a WUOW with 'groupOplogEntries' which groups a delete batch into a single timestamp
// and oplog entry
WriteUnitOfWork wuow(opCtx(), true /* groupOplogEntries */);
-
- for (auto& [rid, snapshotId] : _ridMap) {
+ for (; batchIdx < _stagedDeletesBuffer.size(); ++batchIdx) {
if (MONGO_unlikely(throwWriteConflictExceptionInBatchedDeleteStage.shouldFail())) {
throw WriteConflictException();
}
+ auto& stagedDocument = _stagedDeletesBuffer.at(batchIdx);
+
// The PlanExecutor YieldPolicy may change snapshots between calls to 'doWork()'.
// Different documents may have different snapshots.
- bool docStillMatches =
- ensureStillMatches(opCtx(), collection(), rid, snapshotId, _params->canonicalQuery);
+ bool docStillMatches = ensureStillMatches(opCtx(),
+ collection(),
+ stagedDocument.rid,
+ stagedDocument.snapshotId,
+ _params->canonicalQuery);
if (docStillMatches) {
collection()->deleteDocument(opCtx(),
_params->stmtId,
- rid,
+ stagedDocument.rid,
_params->opDebug,
_params->fromMigrate,
false,
@@ -195,41 +203,47 @@ PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) {
docsDeleted++;
} else {
- recordsThatNoLongerMatch.emplace_back(rid);
+ recordsThatNoLongerMatch.insert(stagedDocument.rid);
+ }
+
+ const Milliseconds elapsedMillis(batchTimer.millis());
+ if (_batchParams->targetBatchTimeMS != Milliseconds(0) &&
+ elapsedMillis >= _batchParams->targetBatchTimeMS) {
+ // Met targetBatchTimeMS after evaluating _stagedDeletesBuffer[batchIdx].
+ break;
}
}
+
wuow.commit();
} catch (const WriteConflictException&) {
- for (auto rid : recordsThatNoLongerMatch) {
- // Lazily delete records that no longer match from the buffer. They should be skipped
- // when the deletes are retryed.
- _ridMap.erase(rid);
- }
-
- _drainRemainingBuffer = true;
- *out = WorkingSet::INVALID_ID;
- return NEED_YIELD;
+ return _prepareToRetryDrainAfterWCE(out, recordsThatNoLongerMatch);
}
- const auto endOfBatchTimestampMillis = Date_t::now().toMillisSinceEpoch();
incrementSSSMetricNoOverflow(batchedDeletesSSS.docs, docsDeleted);
incrementSSSMetricNoOverflow(batchedDeletesSSS.batches, 1);
- incrementSSSMetricNoOverflow(batchedDeletesSSS.timeMillis,
- endOfBatchTimestampMillis - startOfBatchTimestampMillis);
+ incrementSSSMetricNoOverflow(batchedDeletesSSS.timeMillis, batchTimer.millis());
// TODO (SERVER-63039): report batch size
_specificStats.docsDeleted += docsDeleted;
- _ridMap.clear();
- _drainRemainingBuffer = false;
- try {
- child()->restoreState(&collection());
- } catch (const WriteConflictException&) {
- // Note we don't need to retry anything in this case since the delete already was committed.
- *out = WorkingSet::INVALID_ID;
- return NEED_YIELD;
+ if (batchIdx < _stagedDeletesBuffer.size() - 1) {
+ // _stagedDeletesBuffer[batchIdx] is the last document evaluated in this batch, and it is
+ // not the last element in the buffer, which means targetBatchTimeMS was exceeded. Remove all
+ // records that have been evaluated (deleted, or skipped because they no longer match the
+ // query) from the buffer before draining the rest in a later batch.
+ _stagedDeletesBuffer.erase(_stagedDeletesBuffer.begin(),
+ _stagedDeletesBuffer.begin() + batchIdx + 1);
+
+ _drainRemainingBuffer = true;
+ return _tryRestoreState(out);
}
- return NEED_TIME;
+ // The elements in the buffer are preserved during document deletion so the deletes can be
+ // retried in case of a write conflict. No write conflict occurred, so clear the buffer to
+ // reflect that all of its documents have been deleted.
+ _stagedDeletesBuffer.clear();
+ _drainRemainingBuffer = false;
+
+ return _tryRestoreState(out);
}
PlanStage::StageState BatchedDeleteStage::doWork(WorkingSetID* out) {
@@ -249,10 +263,13 @@ PlanStage::StageState BatchedDeleteStage::doWork(WorkingSetID* out) {
return status;
case PlanStage::IS_EOF:
- if (!_ridMap.empty()) {
+ if (!_stagedDeletesBuffer.empty()) {
// Drain the outstanding deletions.
auto ret = _deleteBatch(out);
- if (ret != NEED_TIME) {
+ if (ret != NEED_TIME || _drainRemainingBuffer) {
+ // Only return NEED_TIME when there is more to drain in the buffer. Otherwise,
+ // there is nothing left to fetch and all staged documents have been
+ // successfully deleted.
return ret;
}
}
@@ -276,18 +293,45 @@ PlanStage::StageState BatchedDeleteStage::doWork(WorkingSetID* out) {
invariant(member->hasObj());
if (!_params->isExplain) {
- _ridMap.insert(std::make_pair(recordId, member->doc.snapshotId()));
+ _stagedDeletesBuffer.push_back({recordId, member->doc.snapshotId()});
}
}
if (!_params->isExplain &&
(_drainRemainingBuffer ||
(_batchParams->targetBatchDocs &&
- _ridMap.size() >= static_cast<unsigned long long>(_batchParams->targetBatchDocs)))) {
+ _stagedDeletesBuffer.size() >=
+ static_cast<unsigned long long>(_batchParams->targetBatchDocs)))) {
return _deleteBatch(out);
}
return PlanStage::NEED_TIME;
}
+PlanStage::StageState BatchedDeleteStage::_tryRestoreState(WorkingSetID* out) {
+ try {
+ child()->restoreState(&collection());
+ } catch (const WriteConflictException&) {
+ *out = WorkingSet::INVALID_ID;
+ return NEED_YIELD;
+ }
+ return NEED_TIME;
+}
+
+PlanStage::StageState BatchedDeleteStage::_prepareToRetryDrainAfterWCE(
+ WorkingSetID* out, const std::set<RecordId>& recordsThatNoLongerMatch) {
+ // Remove records that no longer match the query before retrying.
+ _stagedDeletesBuffer.erase(std::remove_if(_stagedDeletesBuffer.begin(),
+ _stagedDeletesBuffer.end(),
+ [&](const auto& stagedDelete) {
+ return recordsThatNoLongerMatch.find(
+ stagedDelete.rid) !=
+ recordsThatNoLongerMatch.end();
+ }),
+ _stagedDeletesBuffer.end());
+ *out = WorkingSet::INVALID_ID;
+ _drainRemainingBuffer = true;
+ return NEED_YIELD;
+}
+
} // namespace mongo
diff --git a/src/mongo/db/exec/batched_delete_stage.h b/src/mongo/db/exec/batched_delete_stage.h
index d0434013b3f..ffb16a2564b 100644
--- a/src/mongo/db/exec/batched_delete_stage.h
+++ b/src/mongo/db/exec/batched_delete_stage.h
@@ -88,15 +88,31 @@ public:
private:
/**
- * Deletes the documents staged in _ridMap in a batch.
- * Returns NEED_TIME on success.
+ * Returns NEED_YIELD when there is a write conflict. Otherwise, returns NEED_TIME when
+ * some, or all, of the documents staged in the _stagedDeletesBuffer are successfully deleted.
*/
PlanStage::StageState _deleteBatch(WorkingSetID* out);
- // Maps records to delete to the latest snapshot their data matched the query. Records must be
- // deleted in a single WriteUnitOrWork. Operation order has no impact on the outcome of the
- // WriteUnitOfWork since all operations become visible at the same time.
- stdx::unordered_map<RecordId, SnapshotId, RecordId::Hasher> _ridMap;
+ // Tries to restore the child's state. Returns NEED_TIME if the restore succeeds, NEED_YIELD
+ // upon write conflict.
+ PlanStage::StageState _tryRestoreState(WorkingSetID* out);
+
+ // Prepares to retry draining the _stagedDeletesBuffer after a write conflict. Removes
+ // 'recordsThatNoLongerMatch' then yields.
+ PlanStage::StageState _prepareToRetryDrainAfterWCE(
+ WorkingSetID* out, const std::set<RecordId>& recordsThatNoLongerMatch);
+
+ // Metadata of a document staged for deletion.
+ struct StagedDocumentMetadata {
+ RecordId rid;
+
+ // SnapshotId associated with the document when it is staged for deletion. Must be checked
+ // before deletion to ensure the document still matches the query.
+ SnapshotId snapshotId;
+ };
+
+ // Stores documents staged for deletion.
+ std::vector<StagedDocumentMetadata> _stagedDeletesBuffer;
// Whether there are remaining docs in the buffer from a previous call to doWork() that should
// be drained before fetching more documents.
diff --git a/src/mongo/dbtests/query_stage_batched_delete.cpp b/src/mongo/dbtests/query_stage_batched_delete.cpp
index 3970036378a..fdf55709d99 100644
--- a/src/mongo/dbtests/query_stage_batched_delete.cpp
+++ b/src/mongo/dbtests/query_stage_batched_delete.cpp
@@ -39,9 +39,11 @@
#include "mongo/db/exec/collection_scan.h"
#include "mongo/db/exec/delete_stage.h"
#include "mongo/db/exec/queued_data_stage.h"
+#include "mongo/db/op_observer_noop.h"
#include "mongo/db/query/canonical_query.h"
#include "mongo/db/service_context.h"
#include "mongo/dbtests/dbtests.h"
+#include "mongo/util/tick_source_mock.h"
namespace mongo {
namespace QueryStageBatchedDelete {
@@ -50,15 +52,57 @@ static const NamespaceString nss("unittests.QueryStageBatchedDelete");
// For the following tests, fix the targetBatchDocs to 10 documents.
static const int targetBatchDocs = 10;
+static const Milliseconds targetBatchTimeMS = Milliseconds(5);
+
+/**
+ * Simulates how long each document takes to delete.
+ *
+ * Deletes on a batch of documents are executed in a single call to BatchedDeleteStage::work(). The
+ * ClockAdvancingOpObserver is necessary to advance time per document delete, rather than per batch
+ * delete.
+ */
+class ClockAdvancingOpObserver : public OpObserverNoop {
+public:
+ void aboutToDelete(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const UUID& uuid,
+ const BSONObj& doc) override {
+
+ if (docDurationMap.find(doc) != docDurationMap.end()) {
+ tickSource->advance(docDurationMap.find(doc)->second);
+ }
+ }
+
+ void setDeleteRecordDurationMillis(BSONObj targetDoc, Milliseconds duration) {
+ docDurationMap.insert(std::make_pair(targetDoc, duration));
+ }
+
+ SimpleBSONObjUnorderedMap<Milliseconds> docDurationMap;
+ TickSourceMock<Milliseconds>* tickSource;
+};
class QueryStageBatchedDeleteTest : public unittest::Test {
public:
- QueryStageBatchedDeleteTest() : _client(&_opCtx) {}
+ QueryStageBatchedDeleteTest() : _client(&_opCtx) {
+ auto tickSource = std::make_unique<TickSourceMock<Milliseconds>>();
+ tickSource->reset(1);
+ _tickSource = tickSource.get();
+ _opCtx.getServiceContext()->setTickSource(std::move(tickSource));
+ std::unique_ptr<ClockAdvancingOpObserver> opObserverUniquePtr =
+ std::make_unique<ClockAdvancingOpObserver>();
+ opObserverUniquePtr->tickSource = _tickSource;
+ _opObserver = opObserverUniquePtr.get();
+ _opCtx.getServiceContext()->setOpObserver(std::move(opObserverUniquePtr));
+ }
virtual ~QueryStageBatchedDeleteTest() {
_client.dropCollection(nss.ns());
}
+ TickSourceMock<Milliseconds>* tickSource() {
+ return _tickSource;
+ }
+
// Populates the collection with nDocs of shape {_id: <int i>, a: <int i>}.
void prePopulateCollection(int nDocs) {
for (int i = 0; i < nDocs; i++) {
@@ -70,6 +114,26 @@ public:
_client.insert(nss.ns(), obj);
}
+ // Inserts documents later deleted in a single 'batch' due to targetBatchTimeMS or
+ // targetBatchDocs. Tells the opObserver how much to advance the clock when a document is about
+ // to be deleted.
+ void insertTimedBatch(std::vector<std::pair<BSONObj, Milliseconds>> timedBatch) {
+ Milliseconds totalDurationOfBatch{0};
+ for (auto [doc, duration] : timedBatch) {
+ _client.insert(nss.ns(), doc);
+ _opObserver->setDeleteRecordDurationMillis(doc, duration);
+ totalDurationOfBatch += duration;
+ }
+
+ // Enforce test correctness:
+ // If totalDurationOfBatch exceeds targetBatchTimeMS, it must be the last document of
+ // 'timedBatch' that pushed the batch past targetBatchTimeMS.
+ if (totalDurationOfBatch > targetBatchTimeMS) {
+ auto batchSize = timedBatch.size();
+ ASSERT_LT(totalDurationOfBatch - timedBatch[batchSize - 1].second, targetBatchTimeMS);
+ }
+ }
+
void remove(const BSONObj& obj) {
_client.remove(nss.ns(), obj);
}
@@ -123,6 +187,7 @@ public:
CollectionScanParams collScanParams;
auto batchedDeleteParams = std::make_unique<BatchedDeleteStageBatchParams>();
batchedDeleteParams->targetBatchDocs = targetBatchDocs;
+ batchedDeleteParams->targetBatchTimeMS = targetBatchTimeMS;
// DeleteStageParams must always be multi.
auto deleteParams = std::make_unique<DeleteStageParams>();
@@ -144,6 +209,8 @@ protected:
boost::intrusive_ptr<ExpressionContext> _expCtx =
make_intrusive<ExpressionContext>(&_opCtx, nullptr, nss);
+ ClockAdvancingOpObserver* _opObserver;
+ TickSourceMock<Milliseconds>* _tickSource;
private:
DBDirectClient _client;
@@ -422,5 +489,168 @@ TEST_F(QueryStageBatchedDeleteTest, BatchedDeleteStagedDocIsUpdatedToNotMatchCli
ASSERT_EQUALS(state, PlanStage::IS_EOF);
ASSERT_EQUALS(stats->docsDeleted, nDocs - 1);
}
+
+// Tests that targetBatchTimeMS is enforced.
+TEST_F(QueryStageBatchedDeleteTest, BatchedDeleteTargetBatchTimeMSBasic) {
+ dbtests::WriteContextForTests ctx(&_opCtx, nss.ns());
+
+ std::vector<std::pair<BSONObj, Milliseconds>> timedBatch0{
+ {BSON("_id" << 1 << "a" << 1), Milliseconds(2)},
+ {BSON("_id" << 2 << "a" << 2), Milliseconds(2)},
+ {BSON("_id" << 3 << "a" << 3), Milliseconds(2)},
+ };
+ std::vector<std::pair<BSONObj, Milliseconds>> timedBatch1{
+ {BSON("_id" << 4 << "a" << 4), Milliseconds(2)},
+ {BSON("_id" << 5 << "a" << 5), Milliseconds(2)},
+ };
+
+ insertTimedBatch(timedBatch0);
+ insertTimedBatch(timedBatch1);
+
+ int batchSize0 = timedBatch0.size();
+ int batchSize1 = timedBatch1.size();
+ int nDocs = batchSize0 + batchSize1;
+
+ const CollectionPtr& coll = ctx.getCollection();
+ ASSERT(coll);
+
+ WorkingSet ws;
+ auto deleteStage = makeBatchedDeleteStage(&ws, coll);
+ const DeleteStats* stats = static_cast<const DeleteStats*>(deleteStage->getSpecificStats());
+
+ PlanStage::StageState state = PlanStage::NEED_TIME;
+ WorkingSetID id = WorkingSet::INVALID_ID;
+
+ // Stages all documents in the buffer before executing deletes since nDocs <
+ // targetBatchDocs.
+ {
+ ASSERT_LTE(nDocs, targetBatchDocs);
+ for (auto i = 0; i <= nDocs; i++) {
+ state = deleteStage->work(&id);
+ ASSERT_EQ(stats->docsDeleted, 0);
+ ASSERT_EQ(state, PlanStage::NEED_TIME);
+ }
+ }
+
+ // Batch 0 deletions.
+ {
+ Timer timer(tickSource());
+ state = deleteStage->work(&id);
+ ASSERT_EQ(stats->docsDeleted, batchSize0);
+ ASSERT_EQ(state, PlanStage::NEED_TIME);
+ ASSERT_GTE(Milliseconds(timer.millis()), targetBatchTimeMS);
+ }
+
+ // Batch 1 deletions.
+ {
+ // Drain the rest of the buffer before fetching from a new WorkingSetMember.
+ Timer timer(tickSource());
+ state = deleteStage->work(&id);
+ ASSERT_EQ(stats->docsDeleted, nDocs);
+ ASSERT_EQ(state, PlanStage::NEED_TIME);
+ ASSERT_LTE(Milliseconds(timer.millis()), targetBatchTimeMS);
+ }
+
+ // Completes multi delete execution.
+ {
+ state = deleteStage->work(&id);
+ ASSERT_EQ(stats->docsDeleted, nDocs);
+ ASSERT_EQ(state, PlanStage::IS_EOF);
+ }
+}
+
+// Tests the case where the total time needed to delete targetBatchDocs documents exceeds targetBatchTimeMS.
+TEST_F(QueryStageBatchedDeleteTest, BatchedDeleteTargetBatchTimeMSWithTargetBatchDocs) {
+ dbtests::WriteContextForTests ctx(&_opCtx, nss.ns());
+
+ std::vector<std::pair<BSONObj, Milliseconds>> timedBatch0{
+ {BSON("_id" << 1 << "a" << 1), Milliseconds(1)},
+ {BSON("_id" << 2 << "a" << 2), Milliseconds(0)},
+ {BSON("_id" << 3 << "a" << 3), Milliseconds(0)},
+ {BSON("_id" << 4 << "a" << 4), Milliseconds(0)},
+ {BSON("_id" << 5 << "a" << 5), Milliseconds(0)},
+ {BSON("_id" << 6 << "a" << 6), Milliseconds(0)},
+ {BSON("_id" << 7 << "a" << 7), Milliseconds(0)},
+ {BSON("_id" << 8 << "a" << 8), Milliseconds(4)},
+ };
+
+ std::vector<std::pair<BSONObj, Milliseconds>> timedBatch1{
+ {BSON("_id" << 9 << "a" << 9), Milliseconds(1)},
+ {BSON("_id" << 10 << "a" << 10), Milliseconds(1)},
+ };
+
+ std::vector<std::pair<BSONObj, Milliseconds>> timedBatch2{
+ {BSON("_id" << 11 << "a" << 11), Milliseconds(1)},
+ {BSON("_id" << 12 << "a" << 12), Milliseconds(1)},
+ };
+
+ // Populate the collection before executing the BatchedDeleteStage.
+ insertTimedBatch(timedBatch0);
+ insertTimedBatch(timedBatch1);
+ insertTimedBatch(timedBatch2);
+
+ int batchSize0 = timedBatch0.size();
+ int batchSize1 = timedBatch1.size();
+ int batchSize2 = timedBatch2.size();
+ int nDocs = batchSize0 + batchSize1 + batchSize2;
+
+ const CollectionPtr& coll = ctx.getCollection();
+ ASSERT(coll);
+
+ WorkingSet ws;
+ auto deleteStage = makeBatchedDeleteStage(&ws, coll);
+
+ const DeleteStats* stats = static_cast<const DeleteStats*>(deleteStage->getSpecificStats());
+
+ PlanStage::StageState state = PlanStage::NEED_TIME;
+ WorkingSetID id = WorkingSet::INVALID_ID;
+
+ // Stages up to targetBatchDocs - 1 documents in the buffer.
+ {
+ for (auto i = 0; i < targetBatchDocs; i++) {
+ state = deleteStage->work(&id);
+ ASSERT_EQ(stats->docsDeleted, 0);
+ ASSERT_EQ(state, PlanStage::NEED_TIME);
+ }
+ }
+
+ // Batch0 deletions.
+ {
+ Timer timer(tickSource());
+ state = deleteStage->work(&id);
+ ASSERT_EQ(stats->docsDeleted, batchSize0);
+ ASSERT_EQ(state, PlanStage::NEED_TIME);
+ ASSERT_GTE(Milliseconds(timer.millis()), targetBatchTimeMS);
+ }
+
+ // Batch1 deletions.
+ {
+ Timer timer(tickSource());
+
+ // Drain the rest of the buffer before fetching from a new WorkingSetMember.
+ state = deleteStage->work(&id);
+ ASSERT_EQ(stats->docsDeleted, batchSize0 + batchSize1);
+ ASSERT_EQ(state, PlanStage::NEED_TIME);
+ ASSERT_LTE(Milliseconds(timer.millis()), targetBatchTimeMS);
+ }
+
+ // Stages the remaining documents.
+ {
+ for (auto i = 0; i < batchSize2; i++) {
+ state = deleteStage->work(&id);
+ ASSERT_EQ(stats->docsDeleted, batchSize0 + batchSize1);
+ ASSERT_EQ(state, PlanStage::NEED_TIME);
+ }
+ }
+
+ // Batch 2 deletions.
+ {
+ Timer timer(tickSource());
+ state = deleteStage->work(&id);
+ ASSERT_EQ(stats->docsDeleted, nDocs);
+ ASSERT_EQ(state, PlanStage::IS_EOF);
+ ASSERT_LT(Milliseconds(timer.millis()), targetBatchTimeMS);
+ }
+}
} // namespace QueryStageBatchedDelete
} // namespace mongo
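
To keep the timing assertions in these tests deterministic, the elapsed time is driven by a TickSourceMock installed on the service context: the ClockAdvancingOpObserver advances the mocked tick source by a pre-registered duration just before each document is deleted, so every document "costs" a known number of milliseconds instead of whatever the wall clock happens to measure. A standalone sketch of the same idea with plain standard-library types; FakeClock, CostTable, and the document ids are illustrative and not MongoDB APIs.

    #include <chrono>
    #include <iostream>
    #include <map>
    #include <string>
    #include <vector>

    using std::chrono::milliseconds;

    // A controllable clock: the test advances it explicitly instead of sleeping.
    struct FakeClock {
        milliseconds now{0};
        void advance(milliseconds d) { now += d; }
    };

    // Per-document "cost" table consulted by a hook that runs before each delete,
    // analogous to ClockAdvancingOpObserver::aboutToDelete() advancing the tick source.
    struct CostTable {
        std::map<std::string, milliseconds> costs;
        void record(const std::string& id, milliseconds d) { costs[id] = d; }
        void beforeDelete(FakeClock& clock, const std::string& id) const {
            auto it = costs.find(id);
            if (it != costs.end())
                clock.advance(it->second);
        }
    };

    int main() {
        FakeClock clock;
        CostTable table;
        table.record("doc1", milliseconds(2));
        table.record("doc2", milliseconds(4));

        std::vector<std::string> batch{"doc1", "doc2"};
        for (const auto& id : batch) {
            table.beforeDelete(clock, id);  // simulate the time this delete takes
            // ... the document would be deleted here ...
        }
        std::cout << "simulated batch time: " << clock.now.count() << "ms\n";  // prints 6ms
        return 0;
    }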