author     Denis Grebennicov <denis.grebennicov@mongodb.com>   2022-03-30 13:50:06 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>    2022-03-30 14:44:30 +0000
commit     b52ac902f140dba84dd104f76153f3bd08077a97 (patch)
tree       c5ed0a820803e021d61d845247695bdd48b3b847
parent     d4206bdeb19f4c5862280c04cd6b831c9ec03a40 (diff)
download   mongo-b52ac902f140dba84dd104f76153f3bd08077a97.tar.gz
SERVER-61339 Improve change stream pre-image purging job to leverage efficient multi-deletes
-rw-r--r--  jstests/noPassthrough/change_streams_pre_image_removal_job.js        157
-rw-r--r--  src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp     45
-rw-r--r--  src/mongo/db/pipeline/change_stream_preimage.idl                        8
-rw-r--r--  src/mongo/db/query/internal_plans.cpp                                  16
-rw-r--r--  src/mongo/db/query/internal_plans.h                                     6
5 files changed, 148 insertions, 84 deletions
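
At a high level, the purging job iterates over ranges of expired pre-image records and issues one collection-scan multi-delete per range; with this change that delete can run as a batched delete whenever both the batched multi-delete feature flag and the new 'batchedExpiredChangeStreamPreImageRemoval' server parameter are enabled. A minimal sketch of that control flow, using illustrative placeholder types and function names rather than the real server classes:

    #include <cstddef>
    #include <utility>
    #include <vector>

    // Illustrative placeholder types: the real job works with RecordIdBound pairs produced by
    // ChangeStreamExpiredPreImageIterator and a collection-scan delete plan executor.
    struct RecordId {};
    using RecordIdRange = std::pair<RecordId, RecordId>;

    // Stand-in for building and running one delete plan over a range of expired records.
    std::size_t deleteRange(const RecordIdRange& range, bool batched) {
        (void)range;
        (void)batched;
        return 0;
    }

    // Rough control flow after this change: one multi-delete per expired range, batched only
    // when both the multi-delete feature flag and the new server parameter are enabled.
    std::size_t removeExpiredPreImages(const std::vector<RecordIdRange>& expiredRanges,
                                       bool featureFlagEnabled,
                                       bool batchedRemovalParam) {
        const bool batched = featureFlagEnabled && batchedRemovalParam;
        std::size_t numberOfRemovals = 0;
        for (const auto& range : expiredRanges) {
            numberOfRemovals += deleteRange(range, batched);
        }
        return numberOfRemovals;
    }
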
diff --git a/jstests/noPassthrough/change_streams_pre_image_removal_job.js b/jstests/noPassthrough/change_streams_pre_image_removal_job.js
index 98282febf4b..7b87be92ca7 100644
--- a/jstests/noPassthrough/change_streams_pre_image_removal_job.js
+++ b/jstests/noPassthrough/change_streams_pre_image_removal_job.js
@@ -13,7 +13,7 @@
"use strict";
load('jstests/replsets/rslib.js'); // For getLatestOp, getFirstOplogEntry.
-load("jstests/libs/collection_drop_recreate.js"); // For assertCreateCollection.
+load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection.
const docA = {
_id: 12345,
@@ -31,86 +31,119 @@ const preImagesCollectionDatabase = "config";
const preImagesCollectionName = "system.preimages";
const oplogSizeMB = 1;
-// Returns documents from the pre-images collection from 'node'.
-function getPreImages(node) {
- return node.getDB(preImagesCollectionDatabase)[preImagesCollectionName].find().toArray();
-}
-
// Set up the replica set with two nodes and two collections with 'changeStreamPreAndPostImages'
// enabled and run expired pre-image removal job every second.
const rst = new ReplSetTest({nodes: 2, oplogSize: oplogSizeMB});
rst.startSet({setParameter: {expiredChangeStreamPreImageRemovalJobSleepSecs: 1}});
rst.initiate();
+const largeStr = 'abcdefghi'.repeat(4 * 1024);
const primaryNode = rst.getPrimary();
const testDB = primaryNode.getDB(jsTestName());
const localDB = primaryNode.getDB("local");
-const collA =
- assertCreateCollection(testDB, "collA", {changeStreamPreAndPostImages: {enabled: true}});
-const collB =
- assertCreateCollection(testDB, "collB", {changeStreamPreAndPostImages: {enabled: true}});
-
-// Pre-images collection must be empty.
-let preImages = getPreImages(primaryNode);
-assert.eq(preImages.length, 0, preImages);
-
-// Perform insert and update operations.
-for (const coll of [collA, collB]) {
- assert.commandWorked(coll.insert(docA, {writeConcern: {w: "majority"}}));
- assert.commandWorked(coll.update(docA, {$inc: {version: 1}}));
- assert.commandWorked(coll.update(docB, {$inc: {version: 1}}));
-}
-// Pre-images collection should contain four pre-images.
-preImages = getPreImages(primaryNode);
-const preImagesToExpire = 4;
-assert.eq(preImages.length, preImagesToExpire, preImages);
-
-// Roll over all current oplog entries.
-const lastOplogEntryToBeRemoved = getLatestOp(primaryNode);
-assert.neq(lastOplogEntryToBeRemoved, null);
-const largeStr = 'abcdefghi'.repeat(4 * 1024);
+// Returns documents from the pre-images collection from 'node'.
+function getPreImages(node) {
+ return node.getDB(preImagesCollectionDatabase)[preImagesCollectionName].find().toArray();
+}
// Checks if the oplog has been rolled over from the timestamp of
-// 'lastOplogEntryToBeRemoved', ie. the timestamp of the first entry in the oplog is greater
-// than the timestamp of the 'lastOplogEntryToBeRemoved' on each node of the replica set.
-function oplogIsRolledOver() {
+// 'lastOplogEntryTsToBeRemoved', ie. the timestamp of the first entry in the oplog is greater
+// than the 'lastOplogEntryTsToBeRemoved' on each node of the replica set.
+function oplogIsRolledOver(lastOplogEntryTsToBeRemoved) {
return [primaryNode, rst.getSecondary()].every(
- (node) => timestampCmp(lastOplogEntryToBeRemoved.ts,
+ (node) => timestampCmp(lastOplogEntryTsToBeRemoved,
getFirstOplogEntry(node, {readConcern: "majority"}).ts) <= 0);
}
-while (!oplogIsRolledOver()) {
- assert.commandWorked(collA.insert({long_str: largeStr}, {writeConcern: {w: "majority"}}));
+// Tests that the pre-image removal job deletes only the expired pre-images. Four updates are
+// performed, recording four pre-images, and the oplog is then rolled over, removing the oplog
+// entries of those pre-images. Afterwards two more updates are performed, recording two new
+// pre-images. The pre-image removal job must remove only the first four recorded pre-images.
+// 'batchedDelete' determines whether pre-images will be removed in batches or document-by-document.
+function testPreImageRemovalJob(batchedDelete) {
+ // Roll over the oplog, leading to 'PeriodicChangeStreamExpiredPreImagesRemover' periodic job
+ // deleting all pre-images.
+ let lastOplogEntryToBeRemoved = getLatestOp(primaryNode);
+ while (!oplogIsRolledOver(lastOplogEntryToBeRemoved.ts)) {
+ assert.commandWorked(
+ testDB.tmp.insert({long_str: largeStr}, {writeConcern: {w: "majority"}}));
+ }
+ assert.soon(() => getPreImages(primaryNode).length == 0);
+
+    // Set the 'batchedExpiredChangeStreamPreImageRemoval' server parameter.
+ assert.commandWorked(primaryNode.adminCommand(
+ {setParameter: 1, batchedExpiredChangeStreamPreImageRemoval: batchedDelete}));
+
+ // Drop and recreate the collections with pre-images recording.
+ const collA = assertDropAndRecreateCollection(
+ testDB, "collA", {changeStreamPreAndPostImages: {enabled: true}});
+ const collB = assertDropAndRecreateCollection(
+ testDB, "collB", {changeStreamPreAndPostImages: {enabled: true}});
+
+ // Perform insert and update operations.
+ for (const coll of [collA, collB]) {
+ assert.commandWorked(coll.insert(docA, {writeConcern: {w: "majority"}}));
+ assert.commandWorked(coll.update(docA, {$inc: {version: 1}}));
+ assert.commandWorked(coll.update(docB, {$inc: {version: 1}}));
+ }
+
+ // Pre-images collection should contain four pre-images.
+ let preImages = getPreImages(primaryNode);
+ const preImagesToExpire = 4;
+ assert.eq(preImages.length, preImagesToExpire, preImages);
+
+ // Roll over all current oplog entries.
+ lastOplogEntryToBeRemoved = getLatestOp(primaryNode);
+ assert.neq(lastOplogEntryToBeRemoved, null);
+
+ // Checks if the oplog has been rolled over from the timestamp of
+ // 'lastOplogEntryToBeRemoved', ie. the timestamp of the first entry in the oplog is greater
+ // than the timestamp of the 'lastOplogEntryToBeRemoved' on each node of the replica set.
+ while (!oplogIsRolledOver(lastOplogEntryToBeRemoved.ts)) {
+ assert.commandWorked(collA.insert({long_str: largeStr}, {writeConcern: {w: "majority"}}));
+ }
+
+ // Perform update operations that insert new pre-images that are not expired yet.
+ for (const coll of [collA, collB]) {
+ assert.commandWorked(coll.update(docC, {$inc: {version: 1}}));
+ }
+
+    // Wait until the 'PeriodicChangeStreamExpiredPreImagesRemover' periodic job deletes the
+    // expired pre-images.
+ assert.soon(() => {
+ // Only two pre-images should still be there, as their timestamp is greater than the oldest
+ // oplog entry timestamp.
+ preImages = getPreImages(primaryNode);
+ const onlyTwoPreImagesLeft = preImages.length == 2;
+ const allPreImagesHaveBiggerTimestamp = preImages.every(
+ preImage => timestampCmp(preImage._id.ts, lastOplogEntryToBeRemoved.ts) == 1);
+ return onlyTwoPreImagesLeft && allPreImagesHaveBiggerTimestamp;
+ });
+
+ // Because the pre-images collection is implicitly replicated, validate that writes do not
+ // generate oplog entries, with the exception of deletions.
+ const preimagesNs = 'config.system.preimages';
+ assert.eq(preImagesToExpire, localDB.oplog.rs.find({op: 'd', ns: preimagesNs}).itcount());
+ assert.eq(0, localDB.oplog.rs.find({op: {'$ne': 'd'}, ns: preimagesNs}).itcount());
+
+ if (batchedDelete) {
+ const serverStatusBatches = testDB.serverStatus()['batchedDeletes']['batches'];
+ const serverStatusDocs = testDB.serverStatus()['batchedDeletes']['docs'];
+ assert.eq(serverStatusBatches, 2);
+ assert.eq(serverStatusDocs, preImagesToExpire);
+ }
+
+ // Verify that pre-images collection content on the primary node is the same as on the
+ // secondary.
+ rst.awaitReplication();
+ assert(bsonWoCompare(getPreImages(primaryNode), getPreImages(rst.getSecondary())) === 0);
}
-// Perform update operations that insert new pre-images that are not expired yet.
-for (const coll of [collA, collB]) {
- assert.commandWorked(coll.update(docC, {$inc: {version: 1}}));
+for (const batchedDelete of [false, true]) {
+ testPreImageRemovalJob(batchedDelete);
}
-// Wait until 'PeriodicChangeStreamExpiredPreImagesRemover' periodic job will delete the expired
-// pre-images.
-assert.soon(() => {
- // Only two pre-images should still be there, as their timestamp is greater than the oldest
- // oplog entry timestamp.
- preImages = getPreImages(primaryNode);
- const onlyTwoPreImagesLeft = preImages.length == 2;
- const allPreImagesHaveBiggerTimestamp = preImages.every(
- preImage => timestampCmp(preImage._id.ts, lastOplogEntryToBeRemoved.ts) == 1);
- return onlyTwoPreImagesLeft && allPreImagesHaveBiggerTimestamp;
-});
-
-// Because the pre-images collection is implicitly replicated, validate that writes do not generate
-// oplog entries, with the exception of deletions.
-const preimagesNs = 'config.system.preimages';
-assert.eq(preImagesToExpire, localDB.oplog.rs.find({op: 'd', ns: preimagesNs}).itcount());
-assert.eq(0, localDB.oplog.rs.find({op: {'$ne': 'd'}, ns: preimagesNs}).itcount());
-
-// Verify that pre-images collection content on the primary node is the same as on the
-// secondary.
-rst.awaitReplication();
-assert(bsonWoCompare(getPreImages(primaryNode), getPreImages(rst.getSecondary())) === 0);
-
// Increase oplog size on each node to prevent oplog entries from being deleted which removes a
// risk of replica set consistency check failure during tear down of the replica set.
const largeOplogSizeMB = 1000;
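
For context on the assertions in the test above: a pre-image becomes eligible for removal once its '_id.ts' is no newer than the earliest remaining oplog entry, which is why the test rolls the oplog over and then checks that the surviving pre-images have timestamps strictly greater than the last rolled-over entry. A self-contained sketch of that comparison, with a hypothetical (seconds, increment) pair standing in for the server's Timestamp type (the exact boundary condition the server uses may differ):

    #include <tuple>

    // Hypothetical stand-in for a BSON Timestamp: (seconds, increment).
    struct Ts {
        unsigned int secs;
        unsigned int inc;
    };

    // Mirrors the shell helper timestampCmp(): negative if a < b, zero if equal, positive if a > b.
    int timestampCmp(const Ts& a, const Ts& b) {
        if (std::tie(a.secs, a.inc) < std::tie(b.secs, b.inc)) return -1;
        if (std::tie(b.secs, b.inc) < std::tie(a.secs, a.inc)) return 1;
        return 0;
    }

    // A pre-image whose recording timestamp is no newer than the earliest remaining oplog entry
    // has lost its corresponding oplog entry and may be removed.
    bool isExpiredByOplog(const Ts& preImageTs, const Ts& earliestOplogTs) {
        return timestampCmp(preImageTs, earliestOplogTs) <= 0;
    }
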
diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp
index 91f99d83668..ed65baf5abb 100644
--- a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp
+++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp
@@ -46,6 +46,7 @@
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/storage/storage_parameters_gen.h"
#include "mongo/logv2/log.h"
#include "mongo/util/background.h"
#include "mongo/util/concurrency/idle_thread_block.h"
@@ -344,6 +345,9 @@ void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTim
repl::StorageInterface::get(client->getServiceContext())
->getEarliestOplogTimestamp(opCtx.get());
+ const bool isBatchedRemoval = gBatchedExpiredChangeStreamPreImageRemoval.load();
+ const bool isMultiDeletesFeatureFlagEnabled =
+ feature_flags::gBatchMultiDeletes.isEnabled(serverGlobalParams.featureCompatibility);
size_t numberOfRemovals = 0;
ChangeStreamExpiredPreImageIterator expiredPreImages(
@@ -354,23 +358,30 @@ void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTim
opCtx.get(), currentTimeForTimeBasedExpiration));
for (const auto& collectionRange : expiredPreImages) {
- writeConflictRetry(opCtx.get(),
- "ChangeStreamExpiredPreImagesRemover",
- NamespaceString::kChangeStreamPreImagesNamespace.ns(),
- [&] {
- auto params = std::make_unique<DeleteStageParams>();
- params->isMulti = true;
-
- const auto exec = InternalPlanner::deleteWithCollectionScan(
- opCtx.get(),
- &preImagesColl,
- std::move(params),
- PlanYieldPolicy::YieldPolicy::YIELD_AUTO,
- InternalPlanner::Direction::FORWARD,
- RecordIdBound(collectionRange.first),
- RecordIdBound(collectionRange.second));
- numberOfRemovals += exec->executeDelete();
- });
+ writeConflictRetry(
+ opCtx.get(),
+ "ChangeStreamExpiredPreImagesRemover",
+ NamespaceString::kChangeStreamPreImagesNamespace.ns(),
+ [&] {
+ auto params = std::make_unique<DeleteStageParams>();
+ params->isMulti = true;
+
+ boost::optional<std::unique_ptr<BatchedDeleteStageBatchParams>> batchParams;
+ if (isMultiDeletesFeatureFlagEnabled && isBatchedRemoval) {
+ batchParams = std::make_unique<BatchedDeleteStageBatchParams>();
+ }
+
+ auto exec = InternalPlanner::deleteWithCollectionScan(
+ opCtx.get(),
+ &preImagesColl,
+ std::move(params),
+ PlanYieldPolicy::YieldPolicy::YIELD_AUTO,
+ InternalPlanner::Direction::FORWARD,
+ RecordIdBound(collectionRange.first),
+ RecordIdBound(collectionRange.second),
+ std::move(batchParams));
+ numberOfRemovals += exec->executeDelete();
+ });
}
LOGV2(5869104,
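
Each range deletion above is wrapped in writeConflictRetry(), which re-runs the unit of work when the storage engine reports a write conflict. A rough, self-contained analogue of that pattern (the exception type and helper name below are illustrative stand-ins, and the real helper also rolls back the storage transaction and logs before retrying):

    #include <exception>

    // Hypothetical stand-in for the storage engine's write conflict exception.
    struct WriteConflictException : std::exception {};

    // Re-run the unit of work until it completes without a write conflict.
    template <typename Work>
    auto writeConflictRetrySketch(Work&& work) {
        while (true) {
            try {
                return work();
            } catch (const WriteConflictException&) {
                // Conflict detected: retry the whole unit of work.
            }
        }
    }
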
diff --git a/src/mongo/db/pipeline/change_stream_preimage.idl b/src/mongo/db/pipeline/change_stream_preimage.idl
index f23637192ef..4d3fa3e3b73 100644
--- a/src/mongo/db/pipeline/change_stream_preimage.idl
+++ b/src/mongo/db/pipeline/change_stream_preimage.idl
@@ -45,6 +45,14 @@ server_parameters:
gte: 1
default: 10
+ batchedExpiredChangeStreamPreImageRemoval:
+ description: >-
+ Specifies if expired pre-images are removed in batches rather than document-by-document.
+ set_at: [ startup, runtime ]
+ cpp_vartype: AtomicWord<bool>
+ cpp_varname: gBatchedExpiredChangeStreamPreImageRemoval
+ default: true
+
structs:
ChangeStreamPreImageId:
description: Uniquely identifies a pre-image for a given node or replica set.
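
The IDL block added above generates a runtime-settable server parameter backed by an atomic boolean, gBatchedExpiredChangeStreamPreImageRemoval, which the remover reads with .load() as shown in the .cpp diff. A simplified stand-alone analogue using std::atomic (only an approximation of the generated AtomicWord<bool> storage):

    #include <atomic>

    // Simplified analogue of the storage generated for the parameter: the server uses an
    // AtomicWord<bool> named gBatchedExpiredChangeStreamPreImageRemoval, defaulting to true.
    std::atomic<bool> gBatchedExpiredChangeStreamPreImageRemovalSketch{true};

    // The remover takes a snapshot of the current value with .load(), as in the .cpp diff above.
    bool shouldBatchRemoval() {
        return gBatchedExpiredChangeStreamPreImageRemovalSketch.load();
    }

    // A runtime setParameter ultimately stores the new value atomically, which is why the
    // parameter can be declared settable at both startup and runtime.
    void setBatchRemoval(bool newValue) {
        gBatchedExpiredChangeStreamPreImageRemovalSketch.store(newValue);
    }
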
diff --git a/src/mongo/db/query/internal_plans.cpp b/src/mongo/db/query/internal_plans.cpp
index 81ffba55a4e..c6128069392 100644
--- a/src/mongo/db/query/internal_plans.cpp
+++ b/src/mongo/db/query/internal_plans.cpp
@@ -207,7 +207,8 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::deleteWith
PlanYieldPolicy::YieldPolicy yieldPolicy,
Direction direction,
boost::optional<RecordIdBound> minRecord,
- boost::optional<RecordIdBound> maxRecord) {
+ boost::optional<RecordIdBound> maxRecord,
+ boost::optional<std::unique_ptr<BatchedDeleteStageBatchParams>> batchParams) {
const auto& collection = *coll;
invariant(collection);
auto ws = std::make_unique<WorkingSet>();
@@ -221,8 +222,17 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::deleteWith
auto root = _collectionScan(
expCtx, ws.get(), &collection, collScanParams, true /* relaxCappedConstraints */);
- root = std::make_unique<DeleteStage>(
- expCtx.get(), std::move(params), ws.get(), collection, root.release());
+ if (batchParams) {
+ root = std::make_unique<BatchedDeleteStage>(expCtx.get(),
+ std::move(params),
+ std::move(*batchParams),
+ ws.get(),
+ collection,
+ root.release());
+ } else {
+ root = std::make_unique<DeleteStage>(
+ expCtx.get(), std::move(params), ws.get(), collection, root.release());
+ }
auto executor = plan_executor_factory::make(expCtx,
std::move(ws),
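
The new 'batchParams' argument is an optional that owns the batch parameters: when it is engaged, ownership moves into the BatchedDeleteStage, otherwise the plain DeleteStage is built. A self-contained sketch of that pattern with placeholder stage types, using std::optional in place of boost::optional:

    #include <memory>
    #include <optional>
    #include <utility>

    // Placeholder types; the real ones are DeleteStage, BatchedDeleteStage and
    // BatchedDeleteStageBatchParams.
    struct BatchParams {};
    struct Stage {
        virtual ~Stage() = default;
    };
    struct DeleteStage : Stage {};
    struct BatchedDeleteStage : Stage {
        explicit BatchedDeleteStage(std::unique_ptr<BatchParams> params) : _params(std::move(params)) {}
        std::unique_ptr<BatchParams> _params;
    };

    // When 'batchParams' is engaged, ownership of the inner unique_ptr moves into the batched
    // stage via std::move(*batchParams); otherwise the plain delete stage is built, matching
    // the shape of the branch added to deleteWithCollectionScan() above.
    std::unique_ptr<Stage> makeDeleteStage(std::optional<std::unique_ptr<BatchParams>> batchParams) {
        if (batchParams) {
            return std::make_unique<BatchedDeleteStage>(std::move(*batchParams));
        }
        return std::make_unique<DeleteStage>();
    }
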
diff --git a/src/mongo/db/query/internal_plans.h b/src/mongo/db/query/internal_plans.h
index 967f9c3b073..79614b09233 100644
--- a/src/mongo/db/query/internal_plans.h
+++ b/src/mongo/db/query/internal_plans.h
@@ -31,6 +31,7 @@
#include "mongo/base/string_data.h"
#include "mongo/db/catalog/index_catalog.h"
+#include "mongo/db/exec/batched_delete_stage.h"
#include "mongo/db/exec/delete_stage.h"
#include "mongo/db/query/index_bounds.h"
#include "mongo/db/query/plan_executor.h"
@@ -89,7 +90,7 @@ public:
PlanYieldPolicy::YieldPolicy yieldPolicy);
/**
- * Returns a FETCH => DELETE plan.
+ * Returns a FETCH => DELETE plan, or a FETCH => BATCHED_DELETE plan if 'batchParams' is set.
*/
static std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> deleteWithCollectionScan(
OperationContext* opCtx,
@@ -98,7 +99,8 @@ public:
PlanYieldPolicy::YieldPolicy yieldPolicy,
Direction direction = FORWARD,
boost::optional<RecordIdBound> minRecord = boost::none,
- boost::optional<RecordIdBound> maxRecord = boost::none);
+ boost::optional<RecordIdBound> maxRecord = boost::none,
+ boost::optional<std::unique_ptr<BatchedDeleteStageBatchParams>> batchParams = boost::none);
/**
* Returns an index scan. Caller owns returned pointer.
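
Since 'batchParams' defaults to boost::none, existing callers of deleteWithCollectionScan() compile unchanged and keep getting the FETCH => DELETE plan; only callers that opt in, such as the pre-image remover, receive the FETCH => BATCHED_DELETE plan. A minimal illustration of that defaulted-optional-argument pattern (hypothetical names, std::optional standing in for boost::optional):

    #include <iostream>
    #include <memory>
    #include <optional>

    struct BatchParams {};  // placeholder for BatchedDeleteStageBatchParams

    // The new trailing argument defaults to an empty optional, so pre-existing call sites compile
    // unchanged and keep the plain DELETE plan; callers that opt in get the batched path.
    void deleteWithScanSketch(std::optional<std::unique_ptr<BatchParams>> batchParams = std::nullopt) {
        std::cout << (batchParams ? "FETCH => BATCHED_DELETE plan" : "FETCH => DELETE plan") << '\n';
    }

    int main() {
        deleteWithScanSketch();                                 // existing caller: plain delete
        deleteWithScanSketch(std::make_unique<BatchParams>());  // opted-in caller: batched delete
        return 0;
    }
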