summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp')
-rw-r--r--src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp68
1 files changed, 60 insertions, 8 deletions
diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp
index 614992ea99b..d2dd00901be 100644
--- a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp
+++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp
@@ -33,6 +33,7 @@
#include "change_stream_expired_pre_image_remover.h"
+#include "mongo/db/change_stream_options_manager.h"
#include "mongo/db/client.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db_raii.h"
@@ -43,10 +44,15 @@
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/service_context.h"
#include "mongo/logv2/log.h"
+#include "mongo/s/cluster_commands_helpers.h"
+#include "mongo/util/fail_point.h"
namespace mongo {
+// Fail point to set current time for time-based expiration of pre-images.
+MONGO_FAIL_POINT_DEFINE(changeStreamPreImageRemoverCurrentTime);
namespace {
+
RecordId toRecordId(ChangeStreamPreImageId id) {
return record_id_helpers::keyForElem(
BSON(ChangeStreamPreImage::kIdFieldName << id.toBSON()).firstElement());
@@ -286,7 +292,35 @@ private:
const boost::optional<Date_t> _preImageExpirationTime;
};
-void deleteExpiredChangeStreamPreImages(Client* client) {
+// Get the 'expireAfterSeconds' from the 'ChangeStreamOptions' if present, boost::none otherwise.
+boost::optional<std::int64_t> getExpireAfterSecondsFromChangeStreamOptions(
+ ChangeStreamOptions& changeStreamOptions) {
+ if (auto preAndPostImages = changeStreamOptions.getPreAndPostImages(); preAndPostImages &&
+ preAndPostImages->getExpireAfterSeconds() &&
+ !stdx::holds_alternative<std::string>(*preAndPostImages->getExpireAfterSeconds())) {
+ return stdx::get<std::int64_t>(*preAndPostImages->getExpireAfterSeconds());
+ }
+
+ return boost::none;
+}
+
+// Returns pre-images expiry time in milliseconds since the epoch time if configured, boost::none
+// otherwise.
+boost::optional<Date_t> getPreImageExpirationTime(OperationContext* opCtx, Date_t currentTime) {
+ boost::optional<std::int64_t> expireAfterSeconds = boost::none;
+
+ // Get the expiration time directly from the change stream manager.
+ if (auto changeStreamOptions = ChangeStreamOptionsManager::get(opCtx).getOptions(opCtx)) {
+ expireAfterSeconds = getExpireAfterSecondsFromChangeStreamOptions(*changeStreamOptions);
+ }
+
+ // A pre-image is eligible for deletion if:
+ // pre-image's op-time + expireAfterSeconds < currentTime.
+ return expireAfterSeconds ? boost::optional<Date_t>(currentTime - Seconds(*expireAfterSeconds))
+ : boost::none;
+}
+
+void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTimeBasedExpiration) {
const auto startTime = Date_t::now();
auto opCtx = client->makeOperationContext();
@@ -300,9 +334,8 @@ void deleteExpiredChangeStreamPreImages(Client* client) {
}
// Do not run the job on secondaries.
- const auto isPrimary = repl::ReplicationCoordinator::get(opCtx.get())
- ->canAcceptWritesForDatabase(opCtx.get(), NamespaceString::kAdminDb);
- if (!isPrimary) {
+ if (!repl::ReplicationCoordinator::get(opCtx.get())
+ ->canAcceptWritesForDatabase(opCtx.get(), NamespaceString::kAdminDb)) {
return;
}
@@ -311,11 +344,13 @@ void deleteExpiredChangeStreamPreImages(Client* client) {
repl::StorageInterface::get(client->getServiceContext())
->getEarliestOplogTimestamp(opCtx.get());
- // Iterate over all expired pre-images and remove them.
size_t numberOfRemovals = 0;
- // TODO SERVER-58693: pass expiration duration parameter to the iterator.
+
ChangeStreamExpiredPreImageIterator expiredPreImages(
- opCtx.get(), &preImagesColl, currentEarliestOplogEntryTs);
+ opCtx.get(),
+ &preImagesColl,
+ currentEarliestOplogEntryTs,
+ getPreImageExpirationTime(opCtx.get(), currentTimeForTimeBasedExpiration));
for (const auto& collectionRange : expiredPreImages) {
writeConflictRetry(opCtx.get(),
@@ -374,7 +409,24 @@ void PeriodicChangeStreamExpiredPreImagesRemover::_init(ServiceContext* serviceC
"ChangeStreamExpiredPreImagesRemover",
[](Client* client) {
try {
- deleteExpiredChangeStreamPreImages(client);
+ Date_t currentTimeForTimeBasedExpiration = Date_t::now();
+
+ changeStreamPreImageRemoverCurrentTime.execute([&](const BSONObj& data) {
+ // Populate the current time for time based expiration of pre-images.
+ if (auto currentTimeElem = data["currentTimeForTimeBasedExpiration"]) {
+ const BSONType bsonType = currentTimeElem.type();
+ tassert(5869300,
+ str::stream()
+ << "Expected type for 'currentTimeForTimeBasedExpiration' is "
+ "'date', but found: "
+ << bsonType,
+ bsonType == BSONType::Date);
+
+ currentTimeForTimeBasedExpiration = currentTimeElem.Date();
+ }
+ });
+
+ deleteExpiredChangeStreamPreImages(client, currentTimeForTimeBasedExpiration);
} catch (const ExceptionForCat<ErrorCategory::Interruption>&) {
LOGV2_WARNING(5869105, "Periodic expired pre-images removal job was interrupted");
} catch (const DBException& exception) {