diff options
author | Rishab Joshi <rishab.joshi@mongodb.com> | 2022-01-20 20:54:36 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-02-15 00:27:46 +0000 |
commit | 5c03327ab12e2ff0e1d49ce8a4517623dd8cc7ba (patch) | |
tree | 998fbeb55847b412fa4615aff9ec03ec15f9d13f /src/mongo | |
parent | 792eaed783453a5f840112919fc0adb628961d24 (diff) | |
download | mongo-5c03327ab12e2ff0e1d49ce8a4517623dd8cc7ba.tar.gz |
SERVER-58693 Implement deletion of expired pre-images.
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/pipeline/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp | 68 |
2 files changed, 61 insertions, 8 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 54c5aa8a498..0edf1244058 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -465,6 +465,7 @@ env.Library( 'change_stream_expired_pre_image_remover.cpp' ], LIBDEPS=[ + '$BUILD_DIR/mongo/db/change_stream_options_manager', '$BUILD_DIR/mongo/db/db_raii', '$BUILD_DIR/mongo/db/query_exec', '$BUILD_DIR/mongo/db/record_id_helpers', diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp index 614992ea99b..d2dd00901be 100644 --- a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp +++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp @@ -33,6 +33,7 @@ #include "change_stream_expired_pre_image_remover.h" +#include "mongo/db/change_stream_options_manager.h" #include "mongo/db/client.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/db_raii.h" @@ -43,10 +44,15 @@ #include "mongo/db/repl/storage_interface.h" #include "mongo/db/service_context.h" #include "mongo/logv2/log.h" +#include "mongo/s/cluster_commands_helpers.h" +#include "mongo/util/fail_point.h" namespace mongo { +// Fail point to set current time for time-based expiration of pre-images. +MONGO_FAIL_POINT_DEFINE(changeStreamPreImageRemoverCurrentTime); namespace { + RecordId toRecordId(ChangeStreamPreImageId id) { return record_id_helpers::keyForElem( BSON(ChangeStreamPreImage::kIdFieldName << id.toBSON()).firstElement()); @@ -286,7 +292,35 @@ private: const boost::optional<Date_t> _preImageExpirationTime; }; -void deleteExpiredChangeStreamPreImages(Client* client) { +// Get the 'expireAfterSeconds' from the 'ChangeStreamOptions' if present, boost::none otherwise. +boost::optional<std::int64_t> getExpireAfterSecondsFromChangeStreamOptions( + ChangeStreamOptions& changeStreamOptions) { + if (auto preAndPostImages = changeStreamOptions.getPreAndPostImages(); preAndPostImages && + preAndPostImages->getExpireAfterSeconds() && + !stdx::holds_alternative<std::string>(*preAndPostImages->getExpireAfterSeconds())) { + return stdx::get<std::int64_t>(*preAndPostImages->getExpireAfterSeconds()); + } + + return boost::none; +} + +// Returns pre-images expiry time in milliseconds since the epoch time if configured, boost::none +// otherwise. +boost::optional<Date_t> getPreImageExpirationTime(OperationContext* opCtx, Date_t currentTime) { + boost::optional<std::int64_t> expireAfterSeconds = boost::none; + + // Get the expiration time directly from the change stream manager. + if (auto changeStreamOptions = ChangeStreamOptionsManager::get(opCtx).getOptions(opCtx)) { + expireAfterSeconds = getExpireAfterSecondsFromChangeStreamOptions(*changeStreamOptions); + } + + // A pre-image is eligible for deletion if: + // pre-image's op-time + expireAfterSeconds < currentTime. + return expireAfterSeconds ? boost::optional<Date_t>(currentTime - Seconds(*expireAfterSeconds)) + : boost::none; +} + +void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTimeBasedExpiration) { const auto startTime = Date_t::now(); auto opCtx = client->makeOperationContext(); @@ -300,9 +334,8 @@ void deleteExpiredChangeStreamPreImages(Client* client) { } // Do not run the job on secondaries. - const auto isPrimary = repl::ReplicationCoordinator::get(opCtx.get()) - ->canAcceptWritesForDatabase(opCtx.get(), NamespaceString::kAdminDb); - if (!isPrimary) { + if (!repl::ReplicationCoordinator::get(opCtx.get()) + ->canAcceptWritesForDatabase(opCtx.get(), NamespaceString::kAdminDb)) { return; } @@ -311,11 +344,13 @@ void deleteExpiredChangeStreamPreImages(Client* client) { repl::StorageInterface::get(client->getServiceContext()) ->getEarliestOplogTimestamp(opCtx.get()); - // Iterate over all expired pre-images and remove them. size_t numberOfRemovals = 0; - // TODO SERVER-58693: pass expiration duration parameter to the iterator. + ChangeStreamExpiredPreImageIterator expiredPreImages( - opCtx.get(), &preImagesColl, currentEarliestOplogEntryTs); + opCtx.get(), + &preImagesColl, + currentEarliestOplogEntryTs, + getPreImageExpirationTime(opCtx.get(), currentTimeForTimeBasedExpiration)); for (const auto& collectionRange : expiredPreImages) { writeConflictRetry(opCtx.get(), @@ -374,7 +409,24 @@ void PeriodicChangeStreamExpiredPreImagesRemover::_init(ServiceContext* serviceC "ChangeStreamExpiredPreImagesRemover", [](Client* client) { try { - deleteExpiredChangeStreamPreImages(client); + Date_t currentTimeForTimeBasedExpiration = Date_t::now(); + + changeStreamPreImageRemoverCurrentTime.execute([&](const BSONObj& data) { + // Populate the current time for time based expiration of pre-images. + if (auto currentTimeElem = data["currentTimeForTimeBasedExpiration"]) { + const BSONType bsonType = currentTimeElem.type(); + tassert(5869300, + str::stream() + << "Expected type for 'currentTimeForTimeBasedExpiration' is " + "'date', but found: " + << bsonType, + bsonType == BSONType::Date); + + currentTimeForTimeBasedExpiration = currentTimeElem.Date(); + } + }); + + deleteExpiredChangeStreamPreImages(client, currentTimeForTimeBasedExpiration); } catch (const ExceptionForCat<ErrorCategory::Interruption>&) { LOGV2_WARNING(5869105, "Periodic expired pre-images removal job was interrupted"); } catch (const DBException& exception) { |