summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRishab Joshi <rishab.joshi@mongodb.com>2022-01-20 20:54:36 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-02-15 00:27:46 +0000
commit5c03327ab12e2ff0e1d49ce8a4517623dd8cc7ba (patch)
tree998fbeb55847b412fa4615aff9ec03ec15f9d13f
parent792eaed783453a5f840112919fc0adb628961d24 (diff)
downloadmongo-5c03327ab12e2ff0e1d49ce8a4517623dd8cc7ba.tar.gz
SERVER-58693 Implement deletion of expired pre-images.
-rw-r--r--jstests/libs/fail_point_util.js4
-rw-r--r--jstests/noPassthrough/change_stream_pre_image_time_based_expiration.js206
-rw-r--r--src/mongo/db/pipeline/SConscript1
-rw-r--r--src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp68
4 files changed, 269 insertions, 10 deletions
diff --git a/jstests/libs/fail_point_util.js b/jstests/libs/fail_point_util.js
index d8592a653fb..c01c1d593e0 100644
--- a/jstests/libs/fail_point_util.js
+++ b/jstests/libs/fail_point_util.js
@@ -23,13 +23,13 @@ configureFailPoint = function(conn, failPointName, data = {}, failPointMode = "a
{configureFailPoint: failPointName, mode: failPointMode, data: data}))
.count,
wait:
- function(maxTimeMS = kDefaultWaitForFailPointTimeout) {
+ function(maxTimeMS = kDefaultWaitForFailPointTimeout, timesEntered = 1) {
// Can only be called once because this function does not keep track of the
// number of times the fail point is entered between the time it returns
// and the next time it gets called.
assert.commandWorked(conn.adminCommand({
waitForFailPoint: failPointName,
- timesEntered: this.timesEntered + 1,
+ timesEntered: this.timesEntered + timesEntered,
maxTimeMS: maxTimeMS
}));
},
diff --git a/jstests/noPassthrough/change_stream_pre_image_time_based_expiration.js b/jstests/noPassthrough/change_stream_pre_image_time_based_expiration.js
new file mode 100644
index 00000000000..6684f16742f
--- /dev/null
+++ b/jstests/noPassthrough/change_stream_pre_image_time_based_expiration.js
@@ -0,0 +1,206 @@
+// Tests time-based pre-image retention policy of change stream pre-images remover job.
+// @tags: [
+// requires_fcv_53,
+// featureFlagChangeStreamPreAndPostImages,
+// featureFlagChangeStreamPreAndPostImagesTimeBasedRetentionPolicy,
+// ]
+(function() {
+"use strict";
+
+load("jstests/libs/fail_point_util.js"); // For configureFailPoint.
+
+// Helper to verify if expected pre-images are present in pre-image collection.
+function verifyPreImages(preImageColl, expectedPreImages, collectionsInfo) {
+ const preImageDocs = preImageColl.find().toArray();
+
+ assert.eq(preImageDocs.length, expectedPreImages.length, preImageDocs);
+
+ for (let idx = 0; idx < preImageDocs.length; idx++) {
+ const [collIdx, preImageId] = expectedPreImages[idx];
+ const nsUUID = collectionsInfo[collIdx]["info"].uuid;
+
+ assert.eq(preImageDocs[idx]._id.nsUUID,
+ nsUUID,
+ "pre-image in collection: " + tojson(preImageDocs[idx]) +
+ ", expected collIdx: " + collIdx + ", docIdx: " + idx);
+
+ assert.eq(preImageDocs[idx].preImage._id,
+ preImageId,
+ "pre-image in collection: " + tojson(preImageDocs[idx]) +
+ ", expected _id: " + preImageId);
+ }
+}
+
+// Tests time-based change stream pre-image retention policy.
+function testTimeBasedPreImageRetentionPolicy(conn, primary) {
+ // Annotations for pre-images that define if pre-image is expected to expire or not.
+ const shouldExpire = "shouldExpire";
+ const shouldRetain = "shouldRetain";
+
+ // Each element defines a sequence of documents belonging to one collection.
+ // Each documents has has associated status - 'expire' or 'retain'. This structure models the
+ // state of the pre-images associated with a particular document of the collection, ie. the
+ // pre-images corresponding to the documents 'shouldExpire' will be deleted by the remover job
+ // and those with 'shouldRetain' will be retained.
+ const docsStatePerCollection = [
+ [shouldRetain],
+ [shouldExpire],
+ [shouldRetain, shouldExpire],
+ [shouldExpire, shouldRetain],
+ [shouldRetain, shouldRetain],
+ [shouldExpire, shouldExpire]
+ ];
+
+ const collectionCount = docsStatePerCollection.length;
+ const testDB = conn.getDB("test");
+
+ // Create several collections with pre- and post-images enabled.
+ for (let collIdx = 0; collIdx < collectionCount; collIdx++) {
+ const collName = "coll" + collIdx;
+ assert.commandWorked(
+ testDB.createCollection(collName, {changeStreamPreAndPostImages: {enabled: true}}));
+ }
+
+ // Get the collections information and sort them by uuid. The pre-image documents are naturally
+ // sorted first by uuid and then by timestamp in a pre-images collection. Sorting of collection
+ // helps in setting up of the pre-images in the required order. The 'collectionsInfo' has
+ // one-to-one mapping with 'docsStatePerCollection'.
+ let collectionsInfo = testDB.getCollectionInfos();
+ assert.eq(collectionsInfo.length, collectionCount);
+ collectionsInfo.sort((coll1, coll2) => {
+ return coll1.info.uuid <= coll2.info.uuid ? -1 : 1;
+ });
+
+ // Disable pre-image time-based expiration policy.
+ assert.commandWorked(conn.getDB("admin").runCommand(
+ {setChangeStreamOptions: 1, preAndPostImages: {expireAfterSeconds: "off"}}));
+
+ let shouldRetainDocs = [];
+ let shouldExpireDocs = [];
+ let allDocs = [];
+
+ // Insert documents to each collection. Iterate through the documents and group them as
+ // 'shouldExpire' or 'shouldRetain' based on the document states. Each element of these groups
+ // is an array, where the first element is the collection index and the second is the document
+ // index.
+ for (let collIdx = 0; collIdx < collectionCount; collIdx++) {
+ const collName = collectionsInfo[collIdx]["name"];
+ const coll = testDB.getCollection(collName);
+ const docs = docsStatePerCollection[collIdx];
+
+ for (let docIdx = 0; docIdx < docs.length; docIdx++) {
+ assert.commandWorked(coll.insert({_id: docIdx}, {$set: {documentState: "inserted"}}));
+
+ const documentState = docs[docIdx];
+ allDocs.push([collIdx, docIdx]);
+ if (documentState !== shouldExpire) {
+ shouldRetainDocs.push([collIdx, docIdx]);
+ } else {
+ shouldExpireDocs.push([collIdx, docIdx]);
+ }
+ }
+ }
+
+ // The test case will first update the documents with documentState as 'shouldExpire' and then
+ // with 'shouldRetain'. To correctly infer the ordering of the pre-images in the collection, the
+ // sorting has to be done in such a way that for a each document, the documentState
+ // 'shouldExpire' should come before the documentState 'shouldRetain'.
+ allDocs.sort((doc1, doc2) => {
+ const [collIdx1, docIdx1] = doc1;
+ const [collIdx2, docIdx2] = doc2;
+
+ const annotation1 = docsStatePerCollection[collIdx1][docIdx1];
+ const annotation2 = docsStatePerCollection[collIdx2][docIdx2];
+
+ // If documents are from different collections or if they are from the same collection but
+ // have same document states, the preserve the original ordering.
+ if (collIdx1 != collIdx2 || annotation1 == annotation2) {
+ return 0;
+ }
+
+ // If documents belong to the same collection and have different document states, then
+ // document with 'shouldExpire' should come first.
+ return annotation1 == shouldExpire ? -1 : 1;
+ });
+
+ // Helper to update the document in the collection.
+ const updateDocument = (docInfo) => {
+ const [collIdx, docIdx] = docInfo;
+ const collName = collectionsInfo[collIdx]["name"];
+ const coll = testDB.getCollection(collName);
+
+ assert.commandWorked(coll.updateOne({_id: docIdx}, {$set: {documentState: "updated"}}));
+ };
+
+ // Update each document that should expire, this will create pre-images.
+ shouldExpireDocs.forEach(updateDocument);
+
+ const preImageColl = primary.getDB("config").getCollection("system.preimages");
+ const expireAfterSeconds = 1;
+
+ // Verify that pre-images to be expired is recorded.
+ verifyPreImages(preImageColl, shouldExpireDocs, collectionsInfo);
+
+ // Get the last pre-image that should expire and compute the current time using that. The
+ // current time is computed by adding (expireAfterSeconds + 0.001) seconds to the operation time
+ // of the last recorded pre-image.
+ const lastPreImageToExpire = preImageColl.find().sort({"_id.ts": -1}).limit(1).toArray();
+ assert.eq(lastPreImageToExpire.length, 1, lastPreImageToExpire);
+ const preImageShouldExpireAfter =
+ lastPreImageToExpire[0].operationTime.getTime() + expireAfterSeconds * 1000;
+ const currentTime = new Date(preImageShouldExpireAfter + 1);
+
+ // Sleep for 1 ms before doing the next updates. The will ensure that the difference in
+ // operation time between pre-images to be retained and pre-images to be expired is at least
+ // 1 ms.
+ sleep(1);
+
+ // Update each document for which pre-images will be retain. These pre-images will be followed
+ // by pre-images that should expire for a particular collection.
+ shouldRetainDocs.forEach(updateDocument);
+
+ // Verify that all pre-images are recorded.
+ verifyPreImages(preImageColl, allDocs, collectionsInfo);
+
+ // Configure the current time for the pre-image remover job. At this point, the time-based
+ // pre-image expiration is still disabled.
+ const currentTimeFailPoint =
+ configureFailPoint(primary,
+ "changeStreamPreImageRemoverCurrentTime",
+ {currentTimeForTimeBasedExpiration: currentTime});
+
+ // Wait until at least 1 complete cycle of pre-image removal job is completed.
+ currentTimeFailPoint.wait(kDefaultWaitForFailPointTimeout, 2);
+
+ // Verify that when time-based pre-image expiration disabled, no pre-images are not deleted.
+ verifyPreImages(preImageColl, allDocs, collectionsInfo);
+
+ // Enable time-based pre-image expiration and configure the 'expireAfterSeconds' to 1 seconds.
+ assert.commandWorked(conn.getDB("admin").runCommand(
+ {setChangeStreamOptions: 1, preAndPostImages: {expireAfterSeconds: expireAfterSeconds}}));
+
+ // Verify that at some point in time, all expired pre-images will be deleted.
+ assert.soon(() => {
+ return preImageColl.find().toArray().length == shouldRetainDocs.length;
+ });
+
+ // Verify that pre-images corresponding to documents with document states 'shouldRetain' are
+ // present.
+ verifyPreImages(preImageColl, shouldRetainDocs, collectionsInfo);
+
+ currentTimeFailPoint.off();
+}
+
+// Tests pre-image time based expiration on a replica-set.
+// TODO SERVER-61802: Add test cases for shared cluster.
+(function testChangeStreamPreImagesforTimeBasedExpirationOnReplicaSet() {
+ const replSetTest = new ReplSetTest({name: "replSet", nodes: 1});
+ replSetTest.startSet();
+ replSetTest.initiate();
+
+ const conn = replSetTest.getPrimary();
+ const primary = replSetTest.getPrimary();
+ testTimeBasedPreImageRetentionPolicy(conn, primary);
+ replSetTest.stopSet();
+})();
+}());
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 54c5aa8a498..0edf1244058 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -465,6 +465,7 @@ env.Library(
'change_stream_expired_pre_image_remover.cpp'
],
LIBDEPS=[
+ '$BUILD_DIR/mongo/db/change_stream_options_manager',
'$BUILD_DIR/mongo/db/db_raii',
'$BUILD_DIR/mongo/db/query_exec',
'$BUILD_DIR/mongo/db/record_id_helpers',
diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp
index 614992ea99b..d2dd00901be 100644
--- a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp
+++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp
@@ -33,6 +33,7 @@
#include "change_stream_expired_pre_image_remover.h"
+#include "mongo/db/change_stream_options_manager.h"
#include "mongo/db/client.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db_raii.h"
@@ -43,10 +44,15 @@
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/service_context.h"
#include "mongo/logv2/log.h"
+#include "mongo/s/cluster_commands_helpers.h"
+#include "mongo/util/fail_point.h"
namespace mongo {
+// Fail point to set current time for time-based expiration of pre-images.
+MONGO_FAIL_POINT_DEFINE(changeStreamPreImageRemoverCurrentTime);
namespace {
+
RecordId toRecordId(ChangeStreamPreImageId id) {
return record_id_helpers::keyForElem(
BSON(ChangeStreamPreImage::kIdFieldName << id.toBSON()).firstElement());
@@ -286,7 +292,35 @@ private:
const boost::optional<Date_t> _preImageExpirationTime;
};
-void deleteExpiredChangeStreamPreImages(Client* client) {
+// Get the 'expireAfterSeconds' from the 'ChangeStreamOptions' if present, boost::none otherwise.
+boost::optional<std::int64_t> getExpireAfterSecondsFromChangeStreamOptions(
+ ChangeStreamOptions& changeStreamOptions) {
+ if (auto preAndPostImages = changeStreamOptions.getPreAndPostImages(); preAndPostImages &&
+ preAndPostImages->getExpireAfterSeconds() &&
+ !stdx::holds_alternative<std::string>(*preAndPostImages->getExpireAfterSeconds())) {
+ return stdx::get<std::int64_t>(*preAndPostImages->getExpireAfterSeconds());
+ }
+
+ return boost::none;
+}
+
+// Returns pre-images expiry time in milliseconds since the epoch time if configured, boost::none
+// otherwise.
+boost::optional<Date_t> getPreImageExpirationTime(OperationContext* opCtx, Date_t currentTime) {
+ boost::optional<std::int64_t> expireAfterSeconds = boost::none;
+
+ // Get the expiration time directly from the change stream manager.
+ if (auto changeStreamOptions = ChangeStreamOptionsManager::get(opCtx).getOptions(opCtx)) {
+ expireAfterSeconds = getExpireAfterSecondsFromChangeStreamOptions(*changeStreamOptions);
+ }
+
+ // A pre-image is eligible for deletion if:
+ // pre-image's op-time + expireAfterSeconds < currentTime.
+ return expireAfterSeconds ? boost::optional<Date_t>(currentTime - Seconds(*expireAfterSeconds))
+ : boost::none;
+}
+
+void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTimeBasedExpiration) {
const auto startTime = Date_t::now();
auto opCtx = client->makeOperationContext();
@@ -300,9 +334,8 @@ void deleteExpiredChangeStreamPreImages(Client* client) {
}
// Do not run the job on secondaries.
- const auto isPrimary = repl::ReplicationCoordinator::get(opCtx.get())
- ->canAcceptWritesForDatabase(opCtx.get(), NamespaceString::kAdminDb);
- if (!isPrimary) {
+ if (!repl::ReplicationCoordinator::get(opCtx.get())
+ ->canAcceptWritesForDatabase(opCtx.get(), NamespaceString::kAdminDb)) {
return;
}
@@ -311,11 +344,13 @@ void deleteExpiredChangeStreamPreImages(Client* client) {
repl::StorageInterface::get(client->getServiceContext())
->getEarliestOplogTimestamp(opCtx.get());
- // Iterate over all expired pre-images and remove them.
size_t numberOfRemovals = 0;
- // TODO SERVER-58693: pass expiration duration parameter to the iterator.
+
ChangeStreamExpiredPreImageIterator expiredPreImages(
- opCtx.get(), &preImagesColl, currentEarliestOplogEntryTs);
+ opCtx.get(),
+ &preImagesColl,
+ currentEarliestOplogEntryTs,
+ getPreImageExpirationTime(opCtx.get(), currentTimeForTimeBasedExpiration));
for (const auto& collectionRange : expiredPreImages) {
writeConflictRetry(opCtx.get(),
@@ -374,7 +409,24 @@ void PeriodicChangeStreamExpiredPreImagesRemover::_init(ServiceContext* serviceC
"ChangeStreamExpiredPreImagesRemover",
[](Client* client) {
try {
- deleteExpiredChangeStreamPreImages(client);
+ Date_t currentTimeForTimeBasedExpiration = Date_t::now();
+
+ changeStreamPreImageRemoverCurrentTime.execute([&](const BSONObj& data) {
+ // Populate the current time for time based expiration of pre-images.
+ if (auto currentTimeElem = data["currentTimeForTimeBasedExpiration"]) {
+ const BSONType bsonType = currentTimeElem.type();
+ tassert(5869300,
+ str::stream()
+ << "Expected type for 'currentTimeForTimeBasedExpiration' is "
+ "'date', but found: "
+ << bsonType,
+ bsonType == BSONType::Date);
+
+ currentTimeForTimeBasedExpiration = currentTimeElem.Date();
+ }
+ });
+
+ deleteExpiredChangeStreamPreImages(client, currentTimeForTimeBasedExpiration);
} catch (const ExceptionForCat<ErrorCategory::Interruption>&) {
LOGV2_WARNING(5869105, "Periodic expired pre-images removal job was interrupted");
} catch (const DBException& exception) {