summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHaley Connelly <haley.connelly@mongodb.com>2022-04-13 07:10:22 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-04-13 08:01:54 +0000
commit799265431351f4c93fb21e5552b33393dfc61737 (patch)
tree03436369f2e51798352aa08de7b5977f2c4585c5
parent9a41daf3be5ac63a7a5d5c9dde1f71f5cd01152b (diff)
downloadmongo-799265431351f4c93fb21e5552b33393dfc61737.tar.gz
SERVER-65157 Use WorkingSetMembers to track staged documents in BatchedDeleteStage
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/exec/batched_delete_stage.cpp146
-rw-r--r--src/mongo/db/exec/batched_delete_stage.h36
-rw-r--r--src/mongo/db/exec/batched_delete_stage_buffer.cpp90
-rw-r--r--src/mongo/db/exec/batched_delete_stage_buffer.h85
5 files changed, 255 insertions, 103 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index d562fb7cbc1..b08a246c84d 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1439,6 +1439,7 @@ env.Library(
'exec/and_sorted.cpp',
'exec/batched_delete_stage.idl',
'exec/batched_delete_stage.cpp',
+ 'exec/batched_delete_stage_buffer.cpp',
'exec/cached_plan.cpp',
'exec/collection_scan.cpp',
'exec/count.cpp',
diff --git a/src/mongo/db/exec/batched_delete_stage.cpp b/src/mongo/db/exec/batched_delete_stage.cpp
index b8a237ccfc3..d04271f9c05 100644
--- a/src/mongo/db/exec/batched_delete_stage.cpp
+++ b/src/mongo/db/exec/batched_delete_stage.cpp
@@ -76,30 +76,6 @@ void incrementSSSMetricNoOverflow(AtomicWord<long long>& metric, long long value
metric.fetchAndAdd(value);
}
}
-
-// Returns true to if the Record exists and its data still matches the query. Returns false
-// otherwise.
-bool ensureStillMatches(OperationContext* opCtx,
- const CollectionPtr& collection,
- RecordId rid,
- SnapshotId snapshotId,
- const CanonicalQuery* cq) {
-
- if (opCtx->recoveryUnit()->getSnapshotId() != snapshotId) {
- Snapshotted<BSONObj> docData;
- bool docExists = collection->findDoc(opCtx, rid, &docData);
- if (!docExists) {
- return false;
- }
-
- // Make sure the re-fetched doc still matches the predicate.
- if (cq && !cq->root()->matchesBSON(docData.value(), nullptr)) {
- // No longer matches.
- return false;
- }
- }
- return true;
-}
} // namespace
/**
@@ -129,7 +105,6 @@ struct BatchedDeletesSSS : ServerStatusSection {
AtomicWord<long long> timeMillis;
} batchedDeletesSSS;
-
BatchedDeleteStage::BatchedDeleteStage(ExpressionContext* expCtx,
std::unique_ptr<DeleteStageParams> params,
std::unique_ptr<BatchedDeleteStageBatchParams> batchParams,
@@ -138,7 +113,9 @@ BatchedDeleteStage::BatchedDeleteStage(ExpressionContext* expCtx,
PlanStage* child)
: DeleteStage::DeleteStage(
kStageType.rawData(), expCtx, std::move(params), ws, collection, child),
- _batchParams(std::move(batchParams)) {
+ _batchParams(std::move(batchParams)),
+ _stagedDeletesBuffer(ws),
+ _drainRemainingBuffer(false) {
tassert(6303800,
"batched deletions only support multi-document deletions (multi: true)",
_params->isMulti);
@@ -179,11 +156,11 @@ PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) {
}
- std::set<RecordId> recordsThatNoLongerMatch;
+ std::set<WorkingSetID> recordsThatNoLongerMatch;
Timer batchTimer(opCtx()->getServiceContext()->getTickSource());
unsigned int docsDeleted = 0;
- unsigned int batchIdx = 0;
+ unsigned int bufferOffset = 0;
// Estimate the size of the oplog entry that would result from committing the batch,
// to ensure we emit an oplog entry that's within the 16MB BSON limit.
@@ -193,37 +170,40 @@ PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) {
// Start a WUOW with 'groupOplogEntries' which groups a delete batch into a single timestamp
// and oplog entry.
WriteUnitOfWork wuow(opCtx(), true /* groupOplogEntries */);
- for (; batchIdx < _stagedDeletesBuffer.size(); ++batchIdx) {
+ for (; bufferOffset < _stagedDeletesBuffer.size(); ++bufferOffset) {
if (MONGO_unlikely(throwWriteConflictExceptionInBatchedDeleteStage.shouldFail())) {
throw WriteConflictException();
}
- auto& stagedDocument = _stagedDeletesBuffer.at(batchIdx);
+ auto workingSetMemberId = _stagedDeletesBuffer.at(bufferOffset);
// The PlanExecutor YieldPolicy may change snapshots between calls to 'doWork()'.
// Different documents may have different snapshots.
- bool docStillMatches = ensureStillMatches(opCtx(),
- collection(),
- stagedDocument.rid,
- stagedDocument.snapshotId,
- _params->canonicalQuery);
+ bool docStillMatches = write_stage_common::ensureStillMatches(
+ collection(), opCtx(), _ws, workingSetMemberId, _params->canonicalQuery);
+
+ WorkingSetMember* member = _ws->get(workingSetMemberId);
if (docStillMatches) {
+ Snapshotted<Document> memberDoc = member->doc;
+ BSONObj bsonObjDoc = memberDoc.value().toBson();
applyOpsBytes += kApplyOpsArrayEntryPaddingBytes;
- // TODO (SERVER-65157): get accurate _id size from the snapshotted document.
- applyOpsBytes += (stagedDocument.rid.isLong() ? OID::kOIDSize
- : stagedDocument.rid.getStr().size());
+ tassert(6515700,
+ "Expected document to have an _id field present",
+ bsonObjDoc.hasField("_id"));
+ applyOpsBytes += bsonObjDoc.getField("_id").size();
if (applyOpsBytes > BSONObjMaxUserSize) {
// There's no room to fit this deletion in the current batch, as doing so would
// exceed 16MB of oplog entry: put this deletion back into the staging buffer
// and commit the batch.
- invariant(batchIdx > 0);
- batchIdx--;
+ invariant(bufferOffset > 0);
+ bufferOffset--;
break;
}
collection()->deleteDocument(opCtx(),
+ Snapshotted(memberDoc.snapshotId(), bsonObjDoc),
_params->stmtId,
- stagedDocument.rid,
+ member->recordId,
_params->opDebug,
_params->fromMigrate,
false,
@@ -249,14 +229,13 @@ PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) {
data.getIntField("nDocs");
});
} else {
- recordsThatNoLongerMatch.insert(stagedDocument.rid);
+ recordsThatNoLongerMatch.insert(workingSetMemberId);
}
const Milliseconds elapsedMillis(batchTimer.millis());
if (_batchParams->targetBatchTimeMS != Milliseconds(0) &&
elapsedMillis >= _batchParams->targetBatchTimeMS) {
- // Met targetBatchTimeMS after evaluating _ridSnapShotBuffer[batchIdx]: commit the
- // batch.
+ // Met 'targetBatchTimeMS' after evaluating the staged delete at 'bufferOffset'.
break;
}
}
@@ -272,24 +251,20 @@ PlanStage::StageState BatchedDeleteStage::_deleteBatch(WorkingSetID* out) {
// TODO (SERVER-63039): report batch size
_specificStats.docsDeleted += docsDeleted;
- if (batchIdx < _stagedDeletesBuffer.size() - 1) {
- // _stagedDeletesBuffer[batchIdx] is the last document evaluated in this batch - and it is
- // not the last element in the buffer. targetBatchTimeMS was exceeded. Remove all records
- // that have been evaluated (deleted or skipped because they no longer match the query) from
- // the buffer before retrying.
- _stagedDeletesBuffer.erase(_stagedDeletesBuffer.begin(),
- _stagedDeletesBuffer.begin() + batchIdx + 1);
-
- _drainRemainingBuffer = true;
- return _tryRestoreState(out);
+ if (bufferOffset < _stagedDeletesBuffer.size()) {
+ // targetBatchTimeMS was met. Remove staged deletes that have been evaluated
+ // (executed or skipped because they no longer match the query) from the buffer. If any
+ // staged deletes remain in the buffer, they will be retried in a subsequent batch.
+ _stagedDeletesBuffer.eraseUpToOffsetInclusive(bufferOffset);
+ } else {
+ // The individual deletes staged in the buffer are preserved until the batch is committed so
+ // they can be retried in case of a write conflict.
+ // No write conflict occurred, all staged deletes were successfully evaluated/executed, it
+ // is safe to clear the buffer.
+ _stagedDeletesBuffer.clear();
}
- // The elements in the buffer are preserved during document deletion so deletes can be retried
- // in case of a write conflict. No write conflict occurred, update the buffer that all documents
- // are deleted.
- _stagedDeletesBuffer.clear();
- _drainRemainingBuffer = false;
-
+ _signalIfDrainComplete();
return _tryRestoreState(out);
}
@@ -313,10 +288,10 @@ PlanStage::StageState BatchedDeleteStage::doWork(WorkingSetID* out) {
if (!_stagedDeletesBuffer.empty()) {
// Drain the outstanding deletions.
auto ret = _deleteBatch(out);
- if (ret != NEED_TIME || (ret == NEED_TIME && _drainRemainingBuffer == true)) {
+ if (ret != NEED_TIME || (ret == NEED_TIME && _drainRemainingBuffer)) {
// Only return NEED_TIME if there is more to drain in the buffer. Otherwise,
- // there is no more to fetch and NEED_TIME signals all documents have been
- // sucessfully deleted.
+ // there is no more to fetch and NEED_TIME signals all staged deletes have
+ // been sucessfully executed.
return ret;
}
}
@@ -328,27 +303,29 @@ PlanStage::StageState BatchedDeleteStage::doWork(WorkingSetID* out) {
WorkingSetMember* member = _ws->get(id);
- // Free the WSM at the end of this scope. Retries will re-fetch by the RecordId and will not
- // need to keep the WSM around
ScopeGuard memberFreer([&] { _ws->free(id); });
-
invariant(member->hasRecordId());
- RecordId recordId = member->recordId;
// Deletes can't have projections. This means that covering analysis will always add
// a fetch. We should always get fetched data, and never just key data.
invariant(member->hasObj());
if (!_params->isExplain) {
- _stagedDeletesBuffer.push_back({recordId, member->doc.snapshotId()});
+ // Preserve the member until the delete is committed. Once a delete is staged in the
+ // buffer, its resources are freed when it is removed from the buffer.
+ memberFreer.dismiss();
+
+ // Ensure that the BSONObj underlying the WSM associated with 'id' is owned because
+ // saveState() is allowed to free the memory the BSONObj points to. The BSONObj will be
+ // needed later when it is passed to Collection::deleteDocument(). Note that the call to
+ // makeObjOwnedIfNeeded() will leave the WSM in the RID_AND_OBJ state in case we need to
+ // retry deleting it.
+ member->makeObjOwnedIfNeeded();
+ _stagedDeletesBuffer.append(id);
}
}
- if (!_params->isExplain &&
- (_drainRemainingBuffer ||
- (_batchParams->targetBatchDocs &&
- _stagedDeletesBuffer.size() >=
- static_cast<unsigned long long>(_batchParams->targetBatchDocs)))) {
+ if (!_params->isExplain && (_drainRemainingBuffer || _batchTargetMet())) {
return _deleteBatch(out);
}
@@ -366,19 +343,20 @@ PlanStage::StageState BatchedDeleteStage::_tryRestoreState(WorkingSetID* out) {
}
PlanStage::StageState BatchedDeleteStage::_prepareToRetryDrainAfterWCE(
- WorkingSetID* out, const std::set<RecordId>& recordsThatNoLongerMatch) {
- // Remove records that no longer match the query before retrying.
- _stagedDeletesBuffer.erase(std::remove_if(_stagedDeletesBuffer.begin(),
- _stagedDeletesBuffer.end(),
- [&](const auto& stagedDelete) {
- return recordsThatNoLongerMatch.find(
- stagedDelete.rid) !=
- recordsThatNoLongerMatch.end();
- }),
- _stagedDeletesBuffer.end());
+ WorkingSetID* out, const std::set<WorkingSetID>& recordsThatNoLongerMatch) {
+ _stagedDeletesBuffer.erase(recordsThatNoLongerMatch);
+ _signalIfDrainComplete();
*out = WorkingSet::INVALID_ID;
- _drainRemainingBuffer = true;
return NEED_YIELD;
}
+void BatchedDeleteStage::_signalIfDrainComplete() {
+ _drainRemainingBuffer = !_stagedDeletesBuffer.empty();
+}
+
+bool BatchedDeleteStage::_batchTargetMet() {
+ return _batchParams->targetBatchDocs &&
+ _stagedDeletesBuffer.size() >=
+ static_cast<unsigned long long>(_batchParams->targetBatchDocs);
+}
} // namespace mongo
diff --git a/src/mongo/db/exec/batched_delete_stage.h b/src/mongo/db/exec/batched_delete_stage.h
index ffb16a2564b..4aae7b0264b 100644
--- a/src/mongo/db/exec/batched_delete_stage.h
+++ b/src/mongo/db/exec/batched_delete_stage.h
@@ -29,6 +29,7 @@
#pragma once
+#include "mongo/db/exec/batched_delete_stage_buffer.h"
#include "mongo/db/exec/batched_delete_stage_gen.h"
#include "mongo/db/exec/delete_stage.h"
#include "mongo/db/exec/write_stage_common.h"
@@ -37,7 +38,7 @@
namespace mongo {
/**
- * Batch sizing parameters. A batch of documents staged for deletion is committed as soon
+ * Batch sizing parameters. A batch of staged document deletes is committed as soon
* as one of the targets below is met, or upon reaching EOF.
*/
struct BatchedDeleteStageBatchParams {
@@ -57,10 +58,9 @@ struct BatchedDeleteStageBatchParams {
};
/**
- * The BATCHED_DELETE stage deletes documents in batches, using RecordId's that are returned from
- * its child. In comparison, the base class DeleteStage deletes documents one by one. The stage
- * returns NEED_TIME after deleting a document, or after staging a document to be deleted in the
- * next batch.
+ * The BATCHED_DELETE stage deletes documents in batches. In comparison, the base class DeleteStage
+ * deletes documents one by one. The stage returns NEED_TIME after executing a batch of deletes, or
+ * after staging a delete for the next batch.
*
* Callers of work() must be holding a write lock (and, for replicated deletes, callers must have
* had the replication coordinator approve the write).
@@ -100,26 +100,24 @@ private:
// Prepares to retry draining the _stagedDeletesBuffer after a write conflict. Removes
// 'recordsThatNoLongerMatch' then yields.
PlanStage::StageState _prepareToRetryDrainAfterWCE(
- WorkingSetID* out, const std::set<RecordId>& recordsThatNoLongerMatch);
+ WorkingSetID* out, const std::set<WorkingSetID>& recordsThatNoLongerMatch);
- // Metadata of a document staged for deletion.
- struct StagedDocumentMetadata {
- RecordId rid;
+ // Either signals that all the elements in the buffer have been drained or that there are more
+ // elements to drain.
+ void _signalIfDrainComplete();
- // SnapshotId associated with the document when it is staged for deletion. Must be checked
- // before deletion to ensure the document still matches the query.
- SnapshotId snapshotId;
- };
+ // Returns true if one or more of the batch targets are met and it is time to delete the batch.
+ bool _batchTargetMet();
- // Stores documents staged for deletion.
- std::vector<StagedDocumentMetadata> _stagedDeletesBuffer;
+ // Batch targeting parameters.
+ std::unique_ptr<BatchedDeleteStageBatchParams> _batchParams;
+
+ // Holds information for each document staged for delete.
+ BatchedDeleteStageBuffer _stagedDeletesBuffer;
// Whether there are remaining docs in the buffer from a previous call to doWork() that should
// be drained before fetching more documents.
- bool _drainRemainingBuffer = false;
-
- // Batch targeting parameters.
- std::unique_ptr<BatchedDeleteStageBatchParams> _batchParams;
+ bool _drainRemainingBuffer;
};
} // namespace mongo
diff --git a/src/mongo/db/exec/batched_delete_stage_buffer.cpp b/src/mongo/db/exec/batched_delete_stage_buffer.cpp
new file mode 100644
index 00000000000..56617ad24ad
--- /dev/null
+++ b/src/mongo/db/exec/batched_delete_stage_buffer.cpp
@@ -0,0 +1,90 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/exec/batched_delete_stage_buffer.h"
+
+#include <fmt/format.h>
+
+#include "mongo/logv2/log.h"
+
+namespace mongo {
+using namespace fmt::literals;
+
+BatchedDeleteStageBuffer::BatchedDeleteStageBuffer(WorkingSet* ws) : _ws(ws) {}
+
+void BatchedDeleteStageBuffer::append(WorkingSetID id) {
+ _buffer.emplace_back(id);
+}
+
+void BatchedDeleteStageBuffer::eraseUpToOffsetInclusive(size_t bufferOffset) {
+ tassert(6515701,
+ "Cannot erase offset '{}' - beyond the size of the BatchedDeleteStageBuffer {}"_format(
+ bufferOffset, _buffer.size()),
+ bufferOffset < _buffer.size());
+ for (unsigned int i = 0; i <= bufferOffset; i++) {
+ auto id = _buffer.at(i);
+ _ws->free(id);
+ }
+
+ _buffer.erase(_buffer.begin(), _buffer.begin() + bufferOffset + 1);
+}
+
+void BatchedDeleteStageBuffer::erase(const std::set<WorkingSetID>& idsToRemove) {
+ for (auto& workingSetMemberId : idsToRemove) {
+ tassert(
+ 6515702,
+ "Attempted to free member with WorkingSetId '{}', which does not exist in the BatchedDeleteStageBuffer"_format(
+ workingSetMemberId),
+ std::find(_buffer.begin(), _buffer.end(), workingSetMemberId) != _buffer.end());
+
+ _ws->free(workingSetMemberId);
+ }
+
+ _buffer.erase(std::remove_if(_buffer.begin(),
+ _buffer.end(),
+ [&](auto& workingSetMemberId) {
+ return idsToRemove.find(workingSetMemberId) !=
+ idsToRemove.end();
+ }),
+ _buffer.end());
+}
+
+void BatchedDeleteStageBuffer::clear() {
+ for (auto& workingSetMemberId : _buffer) {
+ _ws->free(workingSetMemberId);
+ }
+
+ _buffer.clear();
+ invariant(empty());
+}
+} // namespace mongo
diff --git a/src/mongo/db/exec/batched_delete_stage_buffer.h b/src/mongo/db/exec/batched_delete_stage_buffer.h
new file mode 100644
index 00000000000..11624e90cab
--- /dev/null
+++ b/src/mongo/db/exec/batched_delete_stage_buffer.h
@@ -0,0 +1,85 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/exec/working_set_common.h"
+
+namespace mongo {
+
+/**
+ * Buffers documents staged for a batch delete. A document is represented by working set member id
+ * (WorkingSetID). Frees the documents whenever they are removed from the buffer.
+ */
+class BatchedDeleteStageBuffer {
+public:
+ BatchedDeleteStageBuffer(WorkingSet* ws);
+
+ size_t size() {
+ return _buffer.size();
+ }
+
+ bool empty() {
+ return _buffer.empty();
+ }
+
+ void append(WorkingSetID id);
+
+ /**
+ * Returns the WorkingSetID associated with the staged document at 'bufferOffset'.
+ */
+ WorkingSetID at(size_t bufferOffset) {
+ return _buffer.at(bufferOffset);
+ }
+
+ /**
+ * Erases up to 'bufferOffset' where 'bufferOffset' is inclusive. Frees up resources of
+ * WorkingSetMembers associated with the removed entries.
+ */
+ void eraseUpToOffsetInclusive(size_t bufferOffset);
+
+ /**
+ * Erases the subset of 'idsToRemove' that exist in the buffer. Frees up resources of the
+ * WorkingSetMembers associated with the removed entries.
+ */
+ void erase(const std::set<WorkingSetID>& idsToRemove);
+
+ /**
+ * Clears the buffer and frees up resources of the WorkingSetMembers associated with the removed
+ * entries.
+ */
+ void clear();
+
+
+private:
+ WorkingSet* _ws;
+ std::vector<WorkingSetID> _buffer;
+};
+
+} // namespace mongo