diff options
author | Denis Grebennicov <denis.grebennicov@mongodb.com> | 2022-03-30 13:50:06 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-03-30 14:44:30 +0000 |
commit | b52ac902f140dba84dd104f76153f3bd08077a97 (patch) | |
tree | c5ed0a820803e021d61d845247695bdd48b3b847 | |
parent | d4206bdeb19f4c5862280c04cd6b831c9ec03a40 (diff) | |
download | mongo-b52ac902f140dba84dd104f76153f3bd08077a97.tar.gz |
SERVER-61339 Improve change stream pre-image purging job to leverage efficient multi-deletes
-rw-r--r-- | jstests/noPassthrough/change_streams_pre_image_removal_job.js | 157 | ||||
-rw-r--r-- | src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp | 45 | ||||
-rw-r--r-- | src/mongo/db/pipeline/change_stream_preimage.idl | 8 | ||||
-rw-r--r-- | src/mongo/db/query/internal_plans.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/query/internal_plans.h | 6 |
5 files changed, 148 insertions, 84 deletions
diff --git a/jstests/noPassthrough/change_streams_pre_image_removal_job.js b/jstests/noPassthrough/change_streams_pre_image_removal_job.js index 98282febf4b..7b87be92ca7 100644 --- a/jstests/noPassthrough/change_streams_pre_image_removal_job.js +++ b/jstests/noPassthrough/change_streams_pre_image_removal_job.js @@ -13,7 +13,7 @@ "use strict"; load('jstests/replsets/rslib.js'); // For getLatestOp, getFirstOplogEntry. -load("jstests/libs/collection_drop_recreate.js"); // For assertCreateCollection. +load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection. const docA = { _id: 12345, @@ -31,86 +31,119 @@ const preImagesCollectionDatabase = "config"; const preImagesCollectionName = "system.preimages"; const oplogSizeMB = 1; -// Returns documents from the pre-images collection from 'node'. -function getPreImages(node) { - return node.getDB(preImagesCollectionDatabase)[preImagesCollectionName].find().toArray(); -} - // Set up the replica set with two nodes and two collections with 'changeStreamPreAndPostImages' // enabled and run expired pre-image removal job every second. const rst = new ReplSetTest({nodes: 2, oplogSize: oplogSizeMB}); rst.startSet({setParameter: {expiredChangeStreamPreImageRemovalJobSleepSecs: 1}}); rst.initiate(); +const largeStr = 'abcdefghi'.repeat(4 * 1024); const primaryNode = rst.getPrimary(); const testDB = primaryNode.getDB(jsTestName()); const localDB = primaryNode.getDB("local"); -const collA = - assertCreateCollection(testDB, "collA", {changeStreamPreAndPostImages: {enabled: true}}); -const collB = - assertCreateCollection(testDB, "collB", {changeStreamPreAndPostImages: {enabled: true}}); - -// Pre-images collection must be empty. -let preImages = getPreImages(primaryNode); -assert.eq(preImages.length, 0, preImages); - -// Perform insert and update operations. -for (const coll of [collA, collB]) { - assert.commandWorked(coll.insert(docA, {writeConcern: {w: "majority"}})); - assert.commandWorked(coll.update(docA, {$inc: {version: 1}})); - assert.commandWorked(coll.update(docB, {$inc: {version: 1}})); -} -// Pre-images collection should contain four pre-images. -preImages = getPreImages(primaryNode); -const preImagesToExpire = 4; -assert.eq(preImages.length, preImagesToExpire, preImages); - -// Roll over all current oplog entries. -const lastOplogEntryToBeRemoved = getLatestOp(primaryNode); -assert.neq(lastOplogEntryToBeRemoved, null); -const largeStr = 'abcdefghi'.repeat(4 * 1024); +// Returns documents from the pre-images collection from 'node'. +function getPreImages(node) { + return node.getDB(preImagesCollectionDatabase)[preImagesCollectionName].find().toArray(); +} // Checks if the oplog has been rolled over from the timestamp of -// 'lastOplogEntryToBeRemoved', ie. the timestamp of the first entry in the oplog is greater -// than the timestamp of the 'lastOplogEntryToBeRemoved' on each node of the replica set. -function oplogIsRolledOver() { +// 'lastOplogEntryTsToBeRemoved', ie. the timestamp of the first entry in the oplog is greater +// than the 'lastOplogEntryTsToBeRemoved' on each node of the replica set. +function oplogIsRolledOver(lastOplogEntryTsToBeRemoved) { return [primaryNode, rst.getSecondary()].every( - (node) => timestampCmp(lastOplogEntryToBeRemoved.ts, + (node) => timestampCmp(lastOplogEntryTsToBeRemoved, getFirstOplogEntry(node, {readConcern: "majority"}).ts) <= 0); } -while (!oplogIsRolledOver()) { - assert.commandWorked(collA.insert({long_str: largeStr}, {writeConcern: {w: "majority"}})); +// Tests that the pre-image removal job deletes only the expired pre-images by performing four +// updates leading to four pre-images being recorded, then the oplog is rolled over, removing the +// oplog entries of the previously recorded pre-images. Afterwards two updates are performed and +// therefore two new pre-images are recorded. The pre-images removal job must remove only the first +// four recorded pre-images. +// 'batchedDelete' determines whether pre-images will be removed in batches or document-by-document. +function testPreImageRemovalJob(batchedDelete) { + // Roll over the oplog, leading to 'PeriodicChangeStreamExpiredPreImagesRemover' periodic job + // deleting all pre-images. + let lastOplogEntryToBeRemoved = getLatestOp(primaryNode); + while (!oplogIsRolledOver(lastOplogEntryToBeRemoved.ts)) { + assert.commandWorked( + testDB.tmp.insert({long_str: largeStr}, {writeConcern: {w: "majority"}})); + } + assert.soon(() => getPreImages(primaryNode).length == 0); + + // Set the 'batchedExpiredChangeStreamPreImageRemoval'. + assert.commandWorked(primaryNode.adminCommand( + {setParameter: 1, batchedExpiredChangeStreamPreImageRemoval: batchedDelete})); + + // Drop and recreate the collections with pre-images recording. + const collA = assertDropAndRecreateCollection( + testDB, "collA", {changeStreamPreAndPostImages: {enabled: true}}); + const collB = assertDropAndRecreateCollection( + testDB, "collB", {changeStreamPreAndPostImages: {enabled: true}}); + + // Perform insert and update operations. + for (const coll of [collA, collB]) { + assert.commandWorked(coll.insert(docA, {writeConcern: {w: "majority"}})); + assert.commandWorked(coll.update(docA, {$inc: {version: 1}})); + assert.commandWorked(coll.update(docB, {$inc: {version: 1}})); + } + + // Pre-images collection should contain four pre-images. + let preImages = getPreImages(primaryNode); + const preImagesToExpire = 4; + assert.eq(preImages.length, preImagesToExpire, preImages); + + // Roll over all current oplog entries. + lastOplogEntryToBeRemoved = getLatestOp(primaryNode); + assert.neq(lastOplogEntryToBeRemoved, null); + + // Checks if the oplog has been rolled over from the timestamp of + // 'lastOplogEntryToBeRemoved', ie. the timestamp of the first entry in the oplog is greater + // than the timestamp of the 'lastOplogEntryToBeRemoved' on each node of the replica set. + while (!oplogIsRolledOver(lastOplogEntryToBeRemoved.ts)) { + assert.commandWorked(collA.insert({long_str: largeStr}, {writeConcern: {w: "majority"}})); + } + + // Perform update operations that insert new pre-images that are not expired yet. + for (const coll of [collA, collB]) { + assert.commandWorked(coll.update(docC, {$inc: {version: 1}})); + } + + // Wait until 'PeriodicChangeStreamExpiredPreImagesRemover' periodic job will delete the expired + // pre-images. + assert.soon(() => { + // Only two pre-images should still be there, as their timestamp is greater than the oldest + // oplog entry timestamp. + preImages = getPreImages(primaryNode); + const onlyTwoPreImagesLeft = preImages.length == 2; + const allPreImagesHaveBiggerTimestamp = preImages.every( + preImage => timestampCmp(preImage._id.ts, lastOplogEntryToBeRemoved.ts) == 1); + return onlyTwoPreImagesLeft && allPreImagesHaveBiggerTimestamp; + }); + + // Because the pre-images collection is implicitly replicated, validate that writes do not + // generate oplog entries, with the exception of deletions. + const preimagesNs = 'config.system.preimages'; + assert.eq(preImagesToExpire, localDB.oplog.rs.find({op: 'd', ns: preimagesNs}).itcount()); + assert.eq(0, localDB.oplog.rs.find({op: {'$ne': 'd'}, ns: preimagesNs}).itcount()); + + if (batchedDelete) { + const serverStatusBatches = testDB.serverStatus()['batchedDeletes']['batches']; + const serverStatusDocs = testDB.serverStatus()['batchedDeletes']['docs']; + assert.eq(serverStatusBatches, 2); + assert.eq(serverStatusDocs, preImagesToExpire); + } + + // Verify that pre-images collection content on the primary node is the same as on the + // secondary. + rst.awaitReplication(); + assert(bsonWoCompare(getPreImages(primaryNode), getPreImages(rst.getSecondary())) === 0); } -// Perform update operations that insert new pre-images that are not expired yet. -for (const coll of [collA, collB]) { - assert.commandWorked(coll.update(docC, {$inc: {version: 1}})); +for (const batchedDelete of [false, true]) { + testPreImageRemovalJob(batchedDelete); } -// Wait until 'PeriodicChangeStreamExpiredPreImagesRemover' periodic job will delete the expired -// pre-images. -assert.soon(() => { - // Only two pre-images should still be there, as their timestamp is greater than the oldest - // oplog entry timestamp. - preImages = getPreImages(primaryNode); - const onlyTwoPreImagesLeft = preImages.length == 2; - const allPreImagesHaveBiggerTimestamp = preImages.every( - preImage => timestampCmp(preImage._id.ts, lastOplogEntryToBeRemoved.ts) == 1); - return onlyTwoPreImagesLeft && allPreImagesHaveBiggerTimestamp; -}); - -// Because the pre-images collection is implicitly replicated, validate that writes do not generate -// oplog entries, with the exception of deletions. -const preimagesNs = 'config.system.preimages'; -assert.eq(preImagesToExpire, localDB.oplog.rs.find({op: 'd', ns: preimagesNs}).itcount()); -assert.eq(0, localDB.oplog.rs.find({op: {'$ne': 'd'}, ns: preimagesNs}).itcount()); - -// Verify that pre-images collection content on the primary node is the same as on the -// secondary. -rst.awaitReplication(); -assert(bsonWoCompare(getPreImages(primaryNode), getPreImages(rst.getSecondary())) === 0); - // Increase oplog size on each node to prevent oplog entries from being deleted which removes a // risk of replica set consistency check failure during tear down of the replica set. const largeOplogSizeMB = 1000; diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp index 91f99d83668..ed65baf5abb 100644 --- a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp +++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp @@ -46,6 +46,7 @@ #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/service_context.h" +#include "mongo/db/storage/storage_parameters_gen.h" #include "mongo/logv2/log.h" #include "mongo/util/background.h" #include "mongo/util/concurrency/idle_thread_block.h" @@ -344,6 +345,9 @@ void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTim repl::StorageInterface::get(client->getServiceContext()) ->getEarliestOplogTimestamp(opCtx.get()); + const bool isBatchedRemoval = gBatchedExpiredChangeStreamPreImageRemoval.load(); + const bool isMultiDeletesFeatureFlagEnabled = + feature_flags::gBatchMultiDeletes.isEnabled(serverGlobalParams.featureCompatibility); size_t numberOfRemovals = 0; ChangeStreamExpiredPreImageIterator expiredPreImages( @@ -354,23 +358,30 @@ void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTim opCtx.get(), currentTimeForTimeBasedExpiration)); for (const auto& collectionRange : expiredPreImages) { - writeConflictRetry(opCtx.get(), - "ChangeStreamExpiredPreImagesRemover", - NamespaceString::kChangeStreamPreImagesNamespace.ns(), - [&] { - auto params = std::make_unique<DeleteStageParams>(); - params->isMulti = true; - - const auto exec = InternalPlanner::deleteWithCollectionScan( - opCtx.get(), - &preImagesColl, - std::move(params), - PlanYieldPolicy::YieldPolicy::YIELD_AUTO, - InternalPlanner::Direction::FORWARD, - RecordIdBound(collectionRange.first), - RecordIdBound(collectionRange.second)); - numberOfRemovals += exec->executeDelete(); - }); + writeConflictRetry( + opCtx.get(), + "ChangeStreamExpiredPreImagesRemover", + NamespaceString::kChangeStreamPreImagesNamespace.ns(), + [&] { + auto params = std::make_unique<DeleteStageParams>(); + params->isMulti = true; + + boost::optional<std::unique_ptr<BatchedDeleteStageBatchParams>> batchParams; + if (isMultiDeletesFeatureFlagEnabled && isBatchedRemoval) { + batchParams = std::make_unique<BatchedDeleteStageBatchParams>(); + } + + auto exec = InternalPlanner::deleteWithCollectionScan( + opCtx.get(), + &preImagesColl, + std::move(params), + PlanYieldPolicy::YieldPolicy::YIELD_AUTO, + InternalPlanner::Direction::FORWARD, + RecordIdBound(collectionRange.first), + RecordIdBound(collectionRange.second), + std::move(batchParams)); + numberOfRemovals += exec->executeDelete(); + }); } LOGV2(5869104, diff --git a/src/mongo/db/pipeline/change_stream_preimage.idl b/src/mongo/db/pipeline/change_stream_preimage.idl index f23637192ef..4d3fa3e3b73 100644 --- a/src/mongo/db/pipeline/change_stream_preimage.idl +++ b/src/mongo/db/pipeline/change_stream_preimage.idl @@ -45,6 +45,14 @@ server_parameters: gte: 1 default: 10 + batchedExpiredChangeStreamPreImageRemoval: + description: >- + Specifies if expired pre-images are removed in batches rather than document-by-document. + set_at: [ startup, runtime ] + cpp_vartype: AtomicWord<bool> + cpp_varname: gBatchedExpiredChangeStreamPreImageRemoval + default: true + structs: ChangeStreamPreImageId: description: Uniquely identifies a pre-image for a given node or replica set. diff --git a/src/mongo/db/query/internal_plans.cpp b/src/mongo/db/query/internal_plans.cpp index 81ffba55a4e..c6128069392 100644 --- a/src/mongo/db/query/internal_plans.cpp +++ b/src/mongo/db/query/internal_plans.cpp @@ -207,7 +207,8 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::deleteWith PlanYieldPolicy::YieldPolicy yieldPolicy, Direction direction, boost::optional<RecordIdBound> minRecord, - boost::optional<RecordIdBound> maxRecord) { + boost::optional<RecordIdBound> maxRecord, + boost::optional<std::unique_ptr<BatchedDeleteStageBatchParams>> batchParams) { const auto& collection = *coll; invariant(collection); auto ws = std::make_unique<WorkingSet>(); @@ -221,8 +222,17 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::deleteWith auto root = _collectionScan( expCtx, ws.get(), &collection, collScanParams, true /* relaxCappedConstraints */); - root = std::make_unique<DeleteStage>( - expCtx.get(), std::move(params), ws.get(), collection, root.release()); + if (batchParams) { + root = std::make_unique<BatchedDeleteStage>(expCtx.get(), + std::move(params), + std::move(*batchParams), + ws.get(), + collection, + root.release()); + } else { + root = std::make_unique<DeleteStage>( + expCtx.get(), std::move(params), ws.get(), collection, root.release()); + } auto executor = plan_executor_factory::make(expCtx, std::move(ws), diff --git a/src/mongo/db/query/internal_plans.h b/src/mongo/db/query/internal_plans.h index 967f9c3b073..79614b09233 100644 --- a/src/mongo/db/query/internal_plans.h +++ b/src/mongo/db/query/internal_plans.h @@ -31,6 +31,7 @@ #include "mongo/base/string_data.h" #include "mongo/db/catalog/index_catalog.h" +#include "mongo/db/exec/batched_delete_stage.h" #include "mongo/db/exec/delete_stage.h" #include "mongo/db/query/index_bounds.h" #include "mongo/db/query/plan_executor.h" @@ -89,7 +90,7 @@ public: PlanYieldPolicy::YieldPolicy yieldPolicy); /** - * Returns a FETCH => DELETE plan. + * Returns a FETCH => DELETE plan, or a FETCH => BATCHED_DELETE plan if 'batchParams' is set. */ static std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> deleteWithCollectionScan( OperationContext* opCtx, @@ -98,7 +99,8 @@ public: PlanYieldPolicy::YieldPolicy yieldPolicy, Direction direction = FORWARD, boost::optional<RecordIdBound> minRecord = boost::none, - boost::optional<RecordIdBound> maxRecord = boost::none); + boost::optional<RecordIdBound> maxRecord = boost::none, + boost::optional<std::unique_ptr<BatchedDeleteStageBatchParams>> batchParams = boost::none); /** * Returns an index scan. Caller owns returned pointer. |