diff options
author | Rishab Joshi <rishab.joshi@mongodb.com> | 2022-01-20 20:54:36 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-02-15 00:27:46 +0000 |
commit | 5c03327ab12e2ff0e1d49ce8a4517623dd8cc7ba (patch) | |
tree | 998fbeb55847b412fa4615aff9ec03ec15f9d13f | |
parent | 792eaed783453a5f840112919fc0adb628961d24 (diff) | |
download | mongo-5c03327ab12e2ff0e1d49ce8a4517623dd8cc7ba.tar.gz |
SERVER-58693 Implement deletion of expired pre-images.
-rw-r--r-- | jstests/libs/fail_point_util.js | 4 | ||||
-rw-r--r-- | jstests/noPassthrough/change_stream_pre_image_time_based_expiration.js | 206 | ||||
-rw-r--r-- | src/mongo/db/pipeline/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp | 68 |
4 files changed, 269 insertions, 10 deletions
diff --git a/jstests/libs/fail_point_util.js b/jstests/libs/fail_point_util.js index d8592a653fb..c01c1d593e0 100644 --- a/jstests/libs/fail_point_util.js +++ b/jstests/libs/fail_point_util.js @@ -23,13 +23,13 @@ configureFailPoint = function(conn, failPointName, data = {}, failPointMode = "a {configureFailPoint: failPointName, mode: failPointMode, data: data})) .count, wait: - function(maxTimeMS = kDefaultWaitForFailPointTimeout) { + function(maxTimeMS = kDefaultWaitForFailPointTimeout, timesEntered = 1) { // Can only be called once because this function does not keep track of the // number of times the fail point is entered between the time it returns // and the next time it gets called. assert.commandWorked(conn.adminCommand({ waitForFailPoint: failPointName, - timesEntered: this.timesEntered + 1, + timesEntered: this.timesEntered + timesEntered, maxTimeMS: maxTimeMS })); }, diff --git a/jstests/noPassthrough/change_stream_pre_image_time_based_expiration.js b/jstests/noPassthrough/change_stream_pre_image_time_based_expiration.js new file mode 100644 index 00000000000..6684f16742f --- /dev/null +++ b/jstests/noPassthrough/change_stream_pre_image_time_based_expiration.js @@ -0,0 +1,206 @@ +// Tests time-based pre-image retention policy of change stream pre-images remover job. +// @tags: [ +// requires_fcv_53, +// featureFlagChangeStreamPreAndPostImages, +// featureFlagChangeStreamPreAndPostImagesTimeBasedRetentionPolicy, +// ] +(function() { +"use strict"; + +load("jstests/libs/fail_point_util.js"); // For configureFailPoint. + +// Helper to verify if expected pre-images are present in pre-image collection. 
+function verifyPreImages(preImageColl, expectedPreImages, collectionsInfo) { + const preImageDocs = preImageColl.find().toArray(); + + assert.eq(preImageDocs.length, expectedPreImages.length, preImageDocs); + + for (let idx = 0; idx < preImageDocs.length; idx++) { + const [collIdx, preImageId] = expectedPreImages[idx]; + const nsUUID = collectionsInfo[collIdx]["info"].uuid; + + assert.eq(preImageDocs[idx]._id.nsUUID, + nsUUID, + "pre-image in collection: " + tojson(preImageDocs[idx]) + + ", expected collIdx: " + collIdx + ", docIdx: " + idx); + + assert.eq(preImageDocs[idx].preImage._id, + preImageId, + "pre-image in collection: " + tojson(preImageDocs[idx]) + + ", expected _id: " + preImageId); + } +} + +// Tests time-based change stream pre-image retention policy. +function testTimeBasedPreImageRetentionPolicy(conn, primary) { + // Annotations for pre-images that define if pre-image is expected to expire or not. + const shouldExpire = "shouldExpire"; + const shouldRetain = "shouldRetain"; + + // Each element defines a sequence of documents belonging to one collection. + // Each document has an associated status - 'shouldExpire' or 'shouldRetain'. This structure + // models the state of the pre-images associated with a particular document of the collection, + // i.e. the pre-images corresponding to the documents marked 'shouldExpire' will be deleted by + // the remover job and those marked 'shouldRetain' will be retained. + const docsStatePerCollection = [ + [shouldRetain], + [shouldExpire], + [shouldRetain, shouldExpire], + [shouldExpire, shouldRetain], + [shouldRetain, shouldRetain], + [shouldExpire, shouldExpire] + ]; + + const collectionCount = docsStatePerCollection.length; + const testDB = conn.getDB("test"); + + // Create several collections with pre- and post-images enabled. 
+ for (let collIdx = 0; collIdx < collectionCount; collIdx++) { + const collName = "coll" + collIdx; + assert.commandWorked( + testDB.createCollection(collName, {changeStreamPreAndPostImages: {enabled: true}})); + } + + // Get the collections information and sort them by uuid. The pre-image documents are naturally + // sorted first by uuid and then by timestamp in a pre-images collection. Sorting of collection + // helps in setting up of the pre-images in the required order. The 'collectionsInfo' has + // one-to-one mapping with 'docsStatePerCollection'. + let collectionsInfo = testDB.getCollectionInfos(); + assert.eq(collectionsInfo.length, collectionCount); + collectionsInfo.sort((coll1, coll2) => { + return coll1.info.uuid <= coll2.info.uuid ? -1 : 1; + }); + + // Disable pre-image time-based expiration policy. + assert.commandWorked(conn.getDB("admin").runCommand( + {setChangeStreamOptions: 1, preAndPostImages: {expireAfterSeconds: "off"}})); + + let shouldRetainDocs = []; + let shouldExpireDocs = []; + let allDocs = []; + + // Insert documents to each collection. Iterate through the documents and group them as + // 'shouldExpire' or 'shouldRetain' based on the document states. Each element of these groups + // is an array, where the first element is the collection index and the second is the document + // index. 
+ for (let collIdx = 0; collIdx < collectionCount; collIdx++) { + const collName = collectionsInfo[collIdx]["name"]; + const coll = testDB.getCollection(collName); + const docs = docsStatePerCollection[collIdx]; + + for (let docIdx = 0; docIdx < docs.length; docIdx++) { + assert.commandWorked(coll.insert({_id: docIdx}, {$set: {documentState: "inserted"}})); + + const documentState = docs[docIdx]; + allDocs.push([collIdx, docIdx]); + if (documentState !== shouldExpire) { + shouldRetainDocs.push([collIdx, docIdx]); + } else { + shouldExpireDocs.push([collIdx, docIdx]); + } + } + } + + // The test case will first update the documents with documentState as 'shouldExpire' and then + // with 'shouldRetain'. To correctly infer the ordering of the pre-images in the collection, the + // sorting has to be done in such a way that, within each collection, documents with + // documentState 'shouldExpire' come before documents with documentState 'shouldRetain'. + allDocs.sort((doc1, doc2) => { + const [collIdx1, docIdx1] = doc1; + const [collIdx2, docIdx2] = doc2; + + const annotation1 = docsStatePerCollection[collIdx1][docIdx1]; + const annotation2 = docsStatePerCollection[collIdx2][docIdx2]; + + // If documents are from different collections or if they are from the same collection but + // have the same document state, then preserve the original ordering. + if (collIdx1 != collIdx2 || annotation1 == annotation2) { + return 0; + } + + // If documents belong to the same collection and have different document states, then + // the document with 'shouldExpire' should come first. + return annotation1 == shouldExpire ? -1 : 1; + }); + + // Helper to update the document in the collection. 
+ const updateDocument = (docInfo) => { + const [collIdx, docIdx] = docInfo; + const collName = collectionsInfo[collIdx]["name"]; + const coll = testDB.getCollection(collName); + + assert.commandWorked(coll.updateOne({_id: docIdx}, {$set: {documentState: "updated"}})); + }; + + // Update each document that should expire; this will create pre-images. + shouldExpireDocs.forEach(updateDocument); + + const preImageColl = primary.getDB("config").getCollection("system.preimages"); + const expireAfterSeconds = 1; + + // Verify that the pre-images to be expired are recorded. + verifyPreImages(preImageColl, shouldExpireDocs, collectionsInfo); + + // Get the last pre-image that should expire and compute the current time using that. The + // current time is computed by adding (expireAfterSeconds + 0.001) seconds to the operation time + // of the last recorded pre-image. + const lastPreImageToExpire = preImageColl.find().sort({"_id.ts": -1}).limit(1).toArray(); + assert.eq(lastPreImageToExpire.length, 1, lastPreImageToExpire); + const preImageShouldExpireAfter = + lastPreImageToExpire[0].operationTime.getTime() + expireAfterSeconds * 1000; + const currentTime = new Date(preImageShouldExpireAfter + 1); + + // Sleep for 1 ms before doing the next updates. This will ensure that the difference in + // operation time between pre-images to be retained and pre-images to be expired is at least + // 1 ms. + sleep(1); + + // Update each document whose pre-image will be retained. These pre-images will come after the + // pre-images that should expire within each collection. + shouldRetainDocs.forEach(updateDocument); + + // Verify that all pre-images are recorded. + verifyPreImages(preImageColl, allDocs, collectionsInfo); + + // Configure the current time for the pre-image remover job. At this point, the time-based + // pre-image expiration is still disabled. 
+ const currentTimeFailPoint = + configureFailPoint(primary, + "changeStreamPreImageRemoverCurrentTime", + {currentTimeForTimeBasedExpiration: currentTime}); + + // Wait until at least one complete cycle of the pre-image removal job has finished. + currentTimeFailPoint.wait(kDefaultWaitForFailPointTimeout, 2); + + // Verify that when time-based pre-image expiration is disabled, no pre-images are deleted. + verifyPreImages(preImageColl, allDocs, collectionsInfo); + + // Enable time-based pre-image expiration and configure the 'expireAfterSeconds' to 1 second. + assert.commandWorked(conn.getDB("admin").runCommand( + {setChangeStreamOptions: 1, preAndPostImages: {expireAfterSeconds: expireAfterSeconds}})); + + // Verify that at some point in time, all expired pre-images will be deleted. + assert.soon(() => { + return preImageColl.find().toArray().length == shouldRetainDocs.length; + }); + + // Verify that pre-images corresponding to documents with document states 'shouldRetain' are + // present. + verifyPreImages(preImageColl, shouldRetainDocs, collectionsInfo); + + currentTimeFailPoint.off(); +} + +// Tests pre-image time-based expiration on a replica-set. +// TODO SERVER-61802: Add test cases for sharded cluster. 
+(function testChangeStreamPreImagesforTimeBasedExpirationOnReplicaSet() { + const replSetTest = new ReplSetTest({name: "replSet", nodes: 1}); + replSetTest.startSet(); + replSetTest.initiate(); + + const conn = replSetTest.getPrimary(); + const primary = replSetTest.getPrimary(); + testTimeBasedPreImageRetentionPolicy(conn, primary); + replSetTest.stopSet(); +})(); +}()); diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 54c5aa8a498..0edf1244058 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -465,6 +465,7 @@ env.Library( 'change_stream_expired_pre_image_remover.cpp' ], LIBDEPS=[ + '$BUILD_DIR/mongo/db/change_stream_options_manager', '$BUILD_DIR/mongo/db/db_raii', '$BUILD_DIR/mongo/db/query_exec', '$BUILD_DIR/mongo/db/record_id_helpers', diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp index 614992ea99b..d2dd00901be 100644 --- a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp +++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp @@ -33,6 +33,7 @@ #include "change_stream_expired_pre_image_remover.h" +#include "mongo/db/change_stream_options_manager.h" #include "mongo/db/client.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/db_raii.h" @@ -43,10 +44,15 @@ #include "mongo/db/repl/storage_interface.h" #include "mongo/db/service_context.h" #include "mongo/logv2/log.h" +#include "mongo/s/cluster_commands_helpers.h" +#include "mongo/util/fail_point.h" namespace mongo { +// Fail point to set current time for time-based expiration of pre-images. 
+MONGO_FAIL_POINT_DEFINE(changeStreamPreImageRemoverCurrentTime); namespace { + RecordId toRecordId(ChangeStreamPreImageId id) { return record_id_helpers::keyForElem( BSON(ChangeStreamPreImage::kIdFieldName << id.toBSON()).firstElement()); @@ -286,7 +292,35 @@ private: const boost::optional<Date_t> _preImageExpirationTime; }; -void deleteExpiredChangeStreamPreImages(Client* client) { +// Get the 'expireAfterSeconds' from the 'ChangeStreamOptions' if present, boost::none otherwise. +boost::optional<std::int64_t> getExpireAfterSecondsFromChangeStreamOptions( + ChangeStreamOptions& changeStreamOptions) { + if (auto preAndPostImages = changeStreamOptions.getPreAndPostImages(); preAndPostImages && + preAndPostImages->getExpireAfterSeconds() && + !stdx::holds_alternative<std::string>(*preAndPostImages->getExpireAfterSeconds())) { + return stdx::get<std::int64_t>(*preAndPostImages->getExpireAfterSeconds()); + } + + return boost::none; +} + +// Returns pre-images expiry time in milliseconds since the epoch time if configured, boost::none +// otherwise. +boost::optional<Date_t> getPreImageExpirationTime(OperationContext* opCtx, Date_t currentTime) { + boost::optional<std::int64_t> expireAfterSeconds = boost::none; + + // Get the expiration time directly from the change stream manager. + if (auto changeStreamOptions = ChangeStreamOptionsManager::get(opCtx).getOptions(opCtx)) { + expireAfterSeconds = getExpireAfterSecondsFromChangeStreamOptions(*changeStreamOptions); + } + + // A pre-image is eligible for deletion if: + // pre-image's op-time + expireAfterSeconds < currentTime. + return expireAfterSeconds ? 
boost::optional<Date_t>(currentTime - Seconds(*expireAfterSeconds)) + : boost::none; +} + +void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTimeBasedExpiration) { const auto startTime = Date_t::now(); auto opCtx = client->makeOperationContext(); @@ -300,9 +334,8 @@ void deleteExpiredChangeStreamPreImages(Client* client) { } // Do not run the job on secondaries. - const auto isPrimary = repl::ReplicationCoordinator::get(opCtx.get()) - ->canAcceptWritesForDatabase(opCtx.get(), NamespaceString::kAdminDb); - if (!isPrimary) { + if (!repl::ReplicationCoordinator::get(opCtx.get()) + ->canAcceptWritesForDatabase(opCtx.get(), NamespaceString::kAdminDb)) { return; } @@ -311,11 +344,13 @@ void deleteExpiredChangeStreamPreImages(Client* client) { repl::StorageInterface::get(client->getServiceContext()) ->getEarliestOplogTimestamp(opCtx.get()); - // Iterate over all expired pre-images and remove them. size_t numberOfRemovals = 0; - // TODO SERVER-58693: pass expiration duration parameter to the iterator. + ChangeStreamExpiredPreImageIterator expiredPreImages( - opCtx.get(), &preImagesColl, currentEarliestOplogEntryTs); + opCtx.get(), + &preImagesColl, + currentEarliestOplogEntryTs, + getPreImageExpirationTime(opCtx.get(), currentTimeForTimeBasedExpiration)); for (const auto& collectionRange : expiredPreImages) { writeConflictRetry(opCtx.get(), @@ -374,7 +409,24 @@ void PeriodicChangeStreamExpiredPreImagesRemover::_init(ServiceContext* serviceC "ChangeStreamExpiredPreImagesRemover", [](Client* client) { try { - deleteExpiredChangeStreamPreImages(client); + Date_t currentTimeForTimeBasedExpiration = Date_t::now(); + + changeStreamPreImageRemoverCurrentTime.execute([&](const BSONObj& data) { + // Populate the current time for time based expiration of pre-images. 
+ if (auto currentTimeElem = data["currentTimeForTimeBasedExpiration"]) { + const BSONType bsonType = currentTimeElem.type(); + tassert(5869300, + str::stream() + << "Expected type for 'currentTimeForTimeBasedExpiration' is " + "'date', but found: " + << bsonType, + bsonType == BSONType::Date); + + currentTimeForTimeBasedExpiration = currentTimeElem.Date(); + } + }); + + deleteExpiredChangeStreamPreImages(client, currentTimeForTimeBasedExpiration); } catch (const ExceptionForCat<ErrorCategory::Interruption>&) { LOGV2_WARNING(5869105, "Periodic expired pre-images removal job was interrupted"); } catch (const DBException& exception) { |