diff options
author | Haley Connelly <haley.connelly@mongodb.com> | 2022-04-13 07:10:22 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-04-13 08:01:54 +0000 |
commit | 799265431351f4c93fb21e5552b33393dfc61737 (patch) | |
tree | 03436369f2e51798352aa08de7b5977f2c4585c5 | |
parent | 9a41daf3be5ac63a7a5d5c9dde1f71f5cd01152b (diff) | |
download | mongo-799265431351f4c93fb21e5552b33393dfc61737.tar.gz |
SERVER-65157 Use WorkingSetMembers to track staged documents in BatchedDeleteStage
-rw-r--r-- | src/mongo/db/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/exec/batched_delete_stage.cpp | 146 | ||||
-rw-r--r-- | src/mongo/db/exec/batched_delete_stage.h | 36 | ||||
-rw-r--r-- | src/mongo/db/exec/batched_delete_stage_buffer.cpp | 90 | ||||
-rw-r--r-- | src/mongo/db/exec/batched_delete_stage_buffer.h | 85 |
5 files changed, 255 insertions, 103 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index d562fb7cbc1..b08a246c84d 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1439,6 +1439,7 @@ env.Library( 'exec/and_sorted.cpp', 'exec/batched_delete_stage.idl', 'exec/batched_delete_stage.cpp', + 'exec/batched_delete_stage_buffer.cpp', 'exec/cached_plan.cpp', 'exec/collection_scan.cpp', 'exec/count.cpp', diff --git a/src/mongo/db/exec/batched_delete_stage.cpp b/src/mongo/db/exec/batched_delete_stage.cpp index b8a237ccfc3..d04271f9c05 100644 --- a/src/mongo/db/exec/batched_delete_stage.cpp +++ b/src/mongo/db/exec/batched_delete_stage.cpp @@ -76,30 +76,6 @@ void incrementSSSMetricNoOverflow(AtomicWord<long long>& metric, long long value metric.fetchAndAdd(value); } } - -// Returns true to if the Record exists and its data still matches the query. Returns false -// otherwise. -bool ensureStillMatches(OperationContext* opCtx, - const CollectionPtr& collection, - RecordId rid, - SnapshotId snapshotId, - const CanonicalQuery* cq) { - - if (opCtx->recoveryUnit()->getSnapshotId() != snapshotId) { - Snapshotted<BSONObj> docData; - bool docExists = collection->findDoc(opCtx, rid, &docData); - if (!docExists) { - return false; - } - - // Make sure the re-fetched doc still matches the predicate. - if (cq && !cq->root()->matchesBSON(docData.value(), nullptr)) { - // No longer matches. - return false; - } - } - return true; -} } // namespace /** @@ -129,7 +105,6 @@ struct BatchedDeletesSSS : ServerStatusSection { AtomicWord<long long> timeMillis; } batchedDeletesSSS; - BatchedDeleteStage::BatchedDeleteStage(ExpressionContext* expCtx, std::unique_ptr<DeleteStageParams> params, std::unique_ptr<BatchedDeleteStageBatchParams> batchParams, @@ -138,7 +113,9 @@ BatchedDeleteStage::BatchedDeleteStage(ExpressionContext* expCtx, PlanStage* child) : DeleteStage::DeleteStage( kStageType.rawData(), expCtx, std::move(params), ws, collection, child), - _batchParams(std::move(batchParams)) { + _batchParams(std::move(batchParams)), + _stagedDeletesBuffer(ws), + _drainRemainingBuffer(false) { tassert(6303800, "batched deletions only support multi-document deletions (multi: true)", _params->isMulti); @@ -179,11 +156,11 @@ PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) { } - std::set<RecordId> recordsThatNoLongerMatch; + std::set<WorkingSetID> recordsThatNoLongerMatch; Timer batchTimer(opCtx()->getServiceContext()->getTickSource()); unsigned int docsDeleted = 0; - unsigned int batchIdx = 0; + unsigned int bufferOffset = 0; // Estimate the size of the oplog entry that would result from committing the batch, // to ensure we emit an oplog entry that's within the 16MB BSON limit. @@ -193,37 +170,40 @@ PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) { // Start a WUOW with 'groupOplogEntries' which groups a delete batch into a single timestamp // and oplog entry. WriteUnitOfWork wuow(opCtx(), true /* groupOplogEntries */); - for (; batchIdx < _stagedDeletesBuffer.size(); ++batchIdx) { + for (; bufferOffset < _stagedDeletesBuffer.size(); ++bufferOffset) { if (MONGO_unlikely(throwWriteConflictExceptionInBatchedDeleteStage.shouldFail())) { throw WriteConflictException(); } - auto& stagedDocument = _stagedDeletesBuffer.at(batchIdx); + auto workingSetMemberId = _stagedDeletesBuffer.at(bufferOffset); // The PlanExecutor YieldPolicy may change snapshots between calls to 'doWork()'. // Different documents may have different snapshots. - bool docStillMatches = ensureStillMatches(opCtx(), - collection(), - stagedDocument.rid, - stagedDocument.snapshotId, - _params->canonicalQuery); + bool docStillMatches = write_stage_common::ensureStillMatches( + collection(), opCtx(), _ws, workingSetMemberId, _params->canonicalQuery); + + WorkingSetMember* member = _ws->get(workingSetMemberId); if (docStillMatches) { + Snapshotted<Document> memberDoc = member->doc; + BSONObj bsonObjDoc = memberDoc.value().toBson(); applyOpsBytes += kApplyOpsArrayEntryPaddingBytes; - // TODO (SERVER-65157): get accurate _id size from the snapshotted document. - applyOpsBytes += (stagedDocument.rid.isLong() ? OID::kOIDSize - : stagedDocument.rid.getStr().size()); + tassert(6515700, + "Expected document to have an _id field present", + bsonObjDoc.hasField("_id")); + applyOpsBytes += bsonObjDoc.getField("_id").size(); if (applyOpsBytes > BSONObjMaxUserSize) { // There's no room to fit this deletion in the current batch, as doing so would // exceed 16MB of oplog entry: put this deletion back into the staging buffer // and commit the batch. - invariant(batchIdx > 0); - batchIdx--; + invariant(bufferOffset > 0); + bufferOffset--; break; } collection()->deleteDocument(opCtx(), + Snapshotted(memberDoc.snapshotId(), bsonObjDoc), _params->stmtId, - stagedDocument.rid, + member->recordId, _params->opDebug, _params->fromMigrate, false, @@ -249,14 +229,13 @@ PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) { data.getIntField("nDocs"); }); } else { - recordsThatNoLongerMatch.insert(stagedDocument.rid); + recordsThatNoLongerMatch.insert(workingSetMemberId); } const Milliseconds elapsedMillis(batchTimer.millis()); if (_batchParams->targetBatchTimeMS != Milliseconds(0) && elapsedMillis >= _batchParams->targetBatchTimeMS) { - // Met targetBatchTimeMS after evaluating _ridSnapShotBuffer[batchIdx]: commit the - // batch. + // Met 'targetBatchTimeMS' after evaluating the staged delete at 'bufferOffset'. break; } } @@ -272,24 +251,20 @@ PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) { // TODO (SERVER-63039): report batch size _specificStats.docsDeleted += docsDeleted; - if (batchIdx < _stagedDeletesBuffer.size() - 1) { - // _stagedDeletesBuffer[batchIdx] is the last document evaluated in this batch - and it is - // not the last element in the buffer. targetBatchTimeMS was exceeded. Remove all records - // that have been evaluated (deleted or skipped because they no longer match the query) from - // the buffer before retrying. - _stagedDeletesBuffer.erase(_stagedDeletesBuffer.begin(), - _stagedDeletesBuffer.begin() + batchIdx + 1); - - _drainRemainingBuffer = true; - return _tryRestoreState(out); + if (bufferOffset < _stagedDeletesBuffer.size()) { + // targetBatchTimeMS was met. Remove staged deletes that have been evaluated + // (executed or skipped because they no longer match the query) from the buffer. If any + // staged deletes remain in the buffer, they will be retried in a subsequent batch. + _stagedDeletesBuffer.eraseUpToOffsetInclusive(bufferOffset); + } else { + // The individual deletes staged in the buffer are preserved until the batch is committed so + // they can be retried in case of a write conflict. + // No write conflict occurred, all staged deletes were successfully evaluated/executed, it + // is safe to clear the buffer. + _stagedDeletesBuffer.clear(); } - // The elements in the buffer are preserved during document deletion so deletes can be retried - // in case of a write conflict. No write conflict occurred, update the buffer that all documents - // are deleted. - _stagedDeletesBuffer.clear(); - _drainRemainingBuffer = false; - + _signalIfDrainComplete(); return _tryRestoreState(out); } @@ -313,10 +288,10 @@ PlanStage::StageState BatchedDeleteStage::doWork(WorkingSetID* out) { if (!_stagedDeletesBuffer.empty()) { // Drain the outstanding deletions. auto ret = _deleteBatch(out); - if (ret != NEED_TIME || (ret == NEED_TIME && _drainRemainingBuffer == true)) { + if (ret != NEED_TIME || (ret == NEED_TIME && _drainRemainingBuffer)) { // Only return NEED_TIME if there is more to drain in the buffer. Otherwise, - // there is no more to fetch and NEED_TIME signals all documents have been - // sucessfully deleted. + // there is no more to fetch and NEED_TIME signals all staged deletes have + // been sucessfully executed. return ret; } } @@ -328,27 +303,29 @@ PlanStage::StageState BatchedDeleteStage::doWork(WorkingSetID* out) { WorkingSetMember* member = _ws->get(id); - // Free the WSM at the end of this scope. Retries will re-fetch by the RecordId and will not - // need to keep the WSM around ScopeGuard memberFreer([&] { _ws->free(id); }); - invariant(member->hasRecordId()); - RecordId recordId = member->recordId; // Deletes can't have projections. This means that covering analysis will always add // a fetch. We should always get fetched data, and never just key data. invariant(member->hasObj()); if (!_params->isExplain) { - _stagedDeletesBuffer.push_back({recordId, member->doc.snapshotId()}); + // Preserve the member until the delete is committed. Once a delete is staged in the + // buffer, its resources are freed when it is removed from the buffer. + memberFreer.dismiss(); + + // Ensure that the BSONObj underlying the WSM associated with 'id' is owned because + // saveState() is allowed to free the memory the BSONObj points to. The BSONObj will be + // needed later when it is passed to Collection::deleteDocument(). Note that the call to + // makeObjOwnedIfNeeded() will leave the WSM in the RID_AND_OBJ state in case we need to + // retry deleting it. + member->makeObjOwnedIfNeeded(); + _stagedDeletesBuffer.append(id); } } - if (!_params->isExplain && - (_drainRemainingBuffer || - (_batchParams->targetBatchDocs && - _stagedDeletesBuffer.size() >= - static_cast<unsigned long long>(_batchParams->targetBatchDocs)))) { + if (!_params->isExplain && (_drainRemainingBuffer || _batchTargetMet())) { return _deleteBatch(out); } @@ -366,19 +343,20 @@ PlanStage::StageState BatchedDeleteStage::_tryRestoreState(WorkingSetID* out) { } PlanStage::StageState BatchedDeleteStage::_prepareToRetryDrainAfterWCE( - WorkingSetID* out, const std::set<RecordId>& recordsThatNoLongerMatch) { - // Remove records that no longer match the query before retrying. - _stagedDeletesBuffer.erase(std::remove_if(_stagedDeletesBuffer.begin(), - _stagedDeletesBuffer.end(), - [&](const auto& stagedDelete) { - return recordsThatNoLongerMatch.find( - stagedDelete.rid) != - recordsThatNoLongerMatch.end(); - }), - _stagedDeletesBuffer.end()); + WorkingSetID* out, const std::set<WorkingSetID>& recordsThatNoLongerMatch) { + _stagedDeletesBuffer.erase(recordsThatNoLongerMatch); + _signalIfDrainComplete(); *out = WorkingSet::INVALID_ID; - _drainRemainingBuffer = true; return NEED_YIELD; } +void BatchedDeleteStage::_signalIfDrainComplete() { + _drainRemainingBuffer = !_stagedDeletesBuffer.empty(); +} + +bool BatchedDeleteStage::_batchTargetMet() { + return _batchParams->targetBatchDocs && + _stagedDeletesBuffer.size() >= + static_cast<unsigned long long>(_batchParams->targetBatchDocs); +} } // namespace mongo diff --git a/src/mongo/db/exec/batched_delete_stage.h b/src/mongo/db/exec/batched_delete_stage.h index ffb16a2564b..4aae7b0264b 100644 --- a/src/mongo/db/exec/batched_delete_stage.h +++ b/src/mongo/db/exec/batched_delete_stage.h @@ -29,6 +29,7 @@ #pragma once +#include "mongo/db/exec/batched_delete_stage_buffer.h" #include "mongo/db/exec/batched_delete_stage_gen.h" #include "mongo/db/exec/delete_stage.h" #include "mongo/db/exec/write_stage_common.h" @@ -37,7 +38,7 @@ namespace mongo { /** - * Batch sizing parameters. A batch of documents staged for deletion is committed as soon + * Batch sizing parameters. A batch of staged document deletes is committed as soon * as one of the targets below is met, or upon reaching EOF. */ struct BatchedDeleteStageBatchParams { @@ -57,10 +58,9 @@ struct BatchedDeleteStageBatchParams { }; /** - * The BATCHED_DELETE stage deletes documents in batches, using RecordId's that are returned from - * its child. In comparison, the base class DeleteStage deletes documents one by one. The stage - * returns NEED_TIME after deleting a document, or after staging a document to be deleted in the - * next batch. + * The BATCHED_DELETE stage deletes documents in batches. In comparison, the base class DeleteStage + * deletes documents one by one. The stage returns NEED_TIME after executing a batch of deletes, or + * after staging a delete for the next batch. * * Callers of work() must be holding a write lock (and, for replicated deletes, callers must have * had the replication coordinator approve the write). @@ -100,26 +100,24 @@ private: // Prepares to retry draining the _stagedDeletesBuffer after a write conflict. Removes // 'recordsThatNoLongerMatch' then yields. PlanStage::StageState _prepareToRetryDrainAfterWCE( - WorkingSetID* out, const std::set<RecordId>& recordsThatNoLongerMatch); + WorkingSetID* out, const std::set<WorkingSetID>& recordsThatNoLongerMatch); - // Metadata of a document staged for deletion. - struct StagedDocumentMetadata { - RecordId rid; + // Either signals that all the elements in the buffer have been drained or that there are more + // elements to drain. + void _signalIfDrainComplete(); - // SnapshotId associated with the document when it is staged for deletion. Must be checked - // before deletion to ensure the document still matches the query. - SnapshotId snapshotId; - }; + // Returns true if one or more of the batch targets are met and it is time to delete the batch. + bool _batchTargetMet(); - // Stores documents staged for deletion. - std::vector<StagedDocumentMetadata> _stagedDeletesBuffer; + // Batch targeting parameters. + std::unique_ptr<BatchedDeleteStageBatchParams> _batchParams; + + // Holds information for each document staged for delete. + BatchedDeleteStageBuffer _stagedDeletesBuffer; // Whether there are remaining docs in the buffer from a previous call to doWork() that should // be drained before fetching more documents. - bool _drainRemainingBuffer = false; - - // Batch targeting parameters. - std::unique_ptr<BatchedDeleteStageBatchParams> _batchParams; + bool _drainRemainingBuffer; }; } // namespace mongo diff --git a/src/mongo/db/exec/batched_delete_stage_buffer.cpp b/src/mongo/db/exec/batched_delete_stage_buffer.cpp new file mode 100644 index 00000000000..56617ad24ad --- /dev/null +++ b/src/mongo/db/exec/batched_delete_stage_buffer.cpp @@ -0,0 +1,90 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery + +#include "mongo/platform/basic.h" + +#include "mongo/db/exec/batched_delete_stage_buffer.h" + +#include <fmt/format.h> + +#include "mongo/logv2/log.h" + +namespace mongo { +using namespace fmt::literals; + +BatchedDeleteStageBuffer::BatchedDeleteStageBuffer(WorkingSet* ws) : _ws(ws) {} + +void BatchedDeleteStageBuffer::append(WorkingSetID id) { + _buffer.emplace_back(id); +} + +void BatchedDeleteStageBuffer::eraseUpToOffsetInclusive(size_t bufferOffset) { + tassert(6515701, + "Cannot erase offset '{}' - beyond the size of the BatchedDeleteStageBuffer {}"_format( + bufferOffset, _buffer.size()), + bufferOffset < _buffer.size()); + for (unsigned int i = 0; i <= bufferOffset; i++) { + auto id = _buffer.at(i); + _ws->free(id); + } + + _buffer.erase(_buffer.begin(), _buffer.begin() + bufferOffset + 1); +} + +void BatchedDeleteStageBuffer::erase(const std::set<WorkingSetID>& idsToRemove) { + for (auto& workingSetMemberId : idsToRemove) { + tassert( + 6515702, + "Attempted to free member with WorkingSetId '{}', which does not exist in the BatchedDeleteStageBuffer"_format( + workingSetMemberId), + std::find(_buffer.begin(), _buffer.end(), workingSetMemberId) != _buffer.end()); + + _ws->free(workingSetMemberId); + } + + _buffer.erase(std::remove_if(_buffer.begin(), + _buffer.end(), + [&](auto& workingSetMemberId) { + return idsToRemove.find(workingSetMemberId) != + idsToRemove.end(); + }), + _buffer.end()); +} + +void BatchedDeleteStageBuffer::clear() { + for (auto& workingSetMemberId : _buffer) { + _ws->free(workingSetMemberId); + } + + _buffer.clear(); + invariant(empty()); +} +} // namespace mongo diff --git a/src/mongo/db/exec/batched_delete_stage_buffer.h b/src/mongo/db/exec/batched_delete_stage_buffer.h new file mode 100644 index 00000000000..11624e90cab --- /dev/null +++ b/src/mongo/db/exec/batched_delete_stage_buffer.h @@ -0,0 +1,85 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/exec/working_set_common.h" + +namespace mongo { + +/** + * Buffers documents staged for a batch delete. A document is represented by working set member id + * (WorkingSetID). Frees the documents whenever they are removed from the buffer. + */ +class BatchedDeleteStageBuffer { +public: + BatchedDeleteStageBuffer(WorkingSet* ws); + + size_t size() { + return _buffer.size(); + } + + bool empty() { + return _buffer.empty(); + } + + void append(WorkingSetID id); + + /** + * Returns the WorkingSetID associated with the staged document at 'bufferOffset'. + */ + WorkingSetID at(size_t bufferOffset) { + return _buffer.at(bufferOffset); + } + + /** + * Erases up to 'bufferOffset' where 'bufferOffset' is inclusive. Frees up resources of + * WorkingSetMembers associated with the removed entries. + */ + void eraseUpToOffsetInclusive(size_t bufferOffset); + + /** + * Erases the subset of 'idsToRemove' that exist in the buffer. Frees up resources of the + * WorkingSetMembers associated with the removed entries. + */ + void erase(const std::set<WorkingSetID>& idsToRemove); + + /** + * Clears the buffer and frees up resources of the WorkingSetMembers associated with the removed + * entries. + */ + void clear(); + + +private: + WorkingSet* _ws; + std::vector<WorkingSetID> _buffer; +}; + +} // namespace mongo |