diff options
Diffstat (limited to 'src/mongo/db')
4 files changed, 223 insertions, 55 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index c98887ab88e..84ffd9cca7c 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -501,6 +501,7 @@ env.CppUnitTest( 'accumulator_js_test.cpp' if get_option('js-engine') != 'none' else [], 'accumulator_test.cpp', 'aggregation_request_test.cpp', + 'change_stream_expired_pre_image_remover_test.cpp', 'dependencies_test.cpp', 'dispatch_shard_pipeline_test.cpp', 'document_path_support_test.cpp', @@ -605,10 +606,13 @@ env.CppUnitTest( LIBDEPS=[ '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/auth/authmocks', + '$BUILD_DIR/mongo/db/change_stream_options_manager', + '$BUILD_DIR/mongo/db/commands/change_stream_options', '$BUILD_DIR/mongo/db/cst/cst', '$BUILD_DIR/mongo/db/exec/document_value/document_value', '$BUILD_DIR/mongo/db/exec/document_value/document_value_test_util', '$BUILD_DIR/mongo/db/mongohasher', + '$BUILD_DIR/mongo/db/pipeline/change_stream_expired_pre_image_remover', '$BUILD_DIR/mongo/db/query/collation/collator_interface_mock', '$BUILD_DIR/mongo/db/query/optimizer/optimizer', '$BUILD_DIR/mongo/db/query/optimizer/unit_test_utils', diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp index d2dd00901be..9027c8d52de 100644 --- a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp +++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp @@ -51,6 +51,50 @@ namespace mongo { // Fail point to set current time for time-based expiration of pre-images. MONGO_FAIL_POINT_DEFINE(changeStreamPreImageRemoverCurrentTime); +namespace preImageRemoverInternal { + +bool PreImageAttributes::isExpiredPreImage(const boost::optional<Date_t>& preImageExpirationTime, + const Timestamp& earliestOplogEntryTimestamp) { + // Pre-image oplog entry is no longer present in the oplog if its timestamp is smaller + // than the 'earliestOplogEntryTimestamp'. + const bool preImageOplogEntryIsDeleted = ts < earliestOplogEntryTimestamp; + const auto expirationTime = preImageExpirationTime.get_value_or(Date_t::min()); + + // Pre-image is expired if its corresponding oplog entry is deleted or its operation + // time is less than or equal to the expiration time. + return preImageOplogEntryIsDeleted || operationTime <= expirationTime; +} + +// Get the 'expireAfterSeconds' from the 'ChangeStreamOptions' if present, boost::none otherwise. +boost::optional<std::int64_t> getExpireAfterSecondsFromChangeStreamOptions( + ChangeStreamOptions& changeStreamOptions) { + if (auto preAndPostImages = changeStreamOptions.getPreAndPostImages(); preAndPostImages && + preAndPostImages->getExpireAfterSeconds() && + !stdx::holds_alternative<std::string>(*preAndPostImages->getExpireAfterSeconds())) { + return stdx::get<std::int64_t>(*preAndPostImages->getExpireAfterSeconds()); + } + + return boost::none; +} + +// Returns pre-images expiry time in milliseconds since the epoch time if configured, boost::none +// otherwise. +boost::optional<Date_t> getPreImageExpirationTime(OperationContext* opCtx, Date_t currentTime) { + boost::optional<std::int64_t> expireAfterSeconds = boost::none; + + // Get the expiration time directly from the change stream manager. + if (auto changeStreamOptions = ChangeStreamOptionsManager::get(opCtx).getOptions(opCtx)) { + expireAfterSeconds = getExpireAfterSecondsFromChangeStreamOptions(*changeStreamOptions); + } + + // A pre-image is eligible for deletion if: + // pre-image's op-time + expireAfterSeconds < currentTime. + return expireAfterSeconds ? boost::optional<Date_t>(currentTime - Seconds(*expireAfterSeconds)) + : boost::none; +} + +} // namespace preImageRemoverInternal + namespace { RecordId toRecordId(ChangeStreamPreImageId id) { @@ -91,12 +135,6 @@ public: // pre-image documents of one collection eligible for deletion due to expiration. Lower and // upper bounds of a range are inclusive. class Iterator { - struct PreImageAttributes { - mongo::UUID collectionUUID; - Timestamp ts; - Date_t operationTime; - }; - public: using RecordIdRange = std::pair<RecordId, RecordId>; @@ -143,7 +181,7 @@ public: void advance() { const auto getNextPreImageAttributes = [&](std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>& planExecutor) - -> boost::optional<PreImageAttributes> { + -> boost::optional<preImageRemoverInternal::PreImageAttributes> { BSONObj preImageObj; if (planExecutor->getNext(&preImageObj, nullptr) == PlanExecutor::IS_EOF) { return boost::none; @@ -177,7 +215,8 @@ public: // If the first pre-image in the current collection is not expired, fetch the first // pre-image from the next collection. - if (!isExpiredPreImage(*preImageAttributes)) { + if (!preImageAttributes->isExpiredPreImage(_preImageExpirationTime, + _earliestOplogEntryTimestamp)) { continue; } @@ -199,7 +238,8 @@ public: // Iterate over all the expired pre-images in the collection in order to find // the max RecordId. while ((preImageAttributes = getNextPreImageAttributes(planExecutor)) && - isExpiredPreImage(*preImageAttributes) && + preImageAttributes->isExpiredPreImage(_preImageExpirationTime, + _earliestOplogEntryTimestamp) && preImageAttributes->collectionUUID == currentCollectionUUID) { lastExpiredPreimageTs = preImageAttributes->ts; } @@ -224,22 +264,6 @@ public: } } - // Computes if the pre-image is considered expired based on the expiration parameter being - // set. - bool isExpiredPreImage(const PreImageAttributes& preImageAttributes) const { - // Pre-image oplog entry is no longer present in the oplog if its timestamp is smaller - // than the 'earliestOplogEntryTimestamp'. - const bool preImageOplogEntryIsDeleted = - preImageAttributes.ts < _earliestOplogEntryTimestamp; - const auto expirationTime = _preImageExpirationTime.get_value_or(Date_t::min()); - - // Pre-image is expired if its corresponding oplog entry is deleted or its operation - // time is less than or equal to the expiration time. - return preImageOplogEntryIsDeleted || - preImageAttributes.operationTime <= expirationTime; - } - - // Set up the new collection scan to start from the 'minKey'. std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> createCollectionScan( boost::optional<RecordIdBound> minKey) const { @@ -292,34 +316,6 @@ private: const boost::optional<Date_t> _preImageExpirationTime; }; -// Get the 'expireAfterSeconds' from the 'ChangeStreamOptions' if present, boost::none otherwise. -boost::optional<std::int64_t> getExpireAfterSecondsFromChangeStreamOptions( - ChangeStreamOptions& changeStreamOptions) { - if (auto preAndPostImages = changeStreamOptions.getPreAndPostImages(); preAndPostImages && - preAndPostImages->getExpireAfterSeconds() && - !stdx::holds_alternative<std::string>(*preAndPostImages->getExpireAfterSeconds())) { - return stdx::get<std::int64_t>(*preAndPostImages->getExpireAfterSeconds()); - } - - return boost::none; -} - -// Returns pre-images expiry time in milliseconds since the epoch time if configured, boost::none -// otherwise. -boost::optional<Date_t> getPreImageExpirationTime(OperationContext* opCtx, Date_t currentTime) { - boost::optional<std::int64_t> expireAfterSeconds = boost::none; - - // Get the expiration time directly from the change stream manager. - if (auto changeStreamOptions = ChangeStreamOptionsManager::get(opCtx).getOptions(opCtx)) { - expireAfterSeconds = getExpireAfterSecondsFromChangeStreamOptions(*changeStreamOptions); - } - - // A pre-image is eligible for deletion if: - // pre-image's op-time + expireAfterSeconds < currentTime. - return expireAfterSeconds ? boost::optional<Date_t>(currentTime - Seconds(*expireAfterSeconds)) - : boost::none; -} - void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTimeBasedExpiration) { const auto startTime = Date_t::now(); auto opCtx = client->makeOperationContext(); @@ -350,7 +346,8 @@ void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTim opCtx.get(), &preImagesColl, currentEarliestOplogEntryTs, - getPreImageExpirationTime(opCtx.get(), currentTimeForTimeBasedExpiration)); + ::mongo::preImageRemoverInternal::getPreImageExpirationTime( + opCtx.get(), currentTimeForTimeBasedExpiration)); for (const auto& collectionRange : expiredPreImages) { writeConflictRetry(opCtx.get(), @@ -439,5 +436,4 @@ void PeriodicChangeStreamExpiredPreImagesRemover::_init(ServiceContext* serviceC _anchor = std::make_shared<PeriodicJobAnchor>(periodicRunner->makeJob(std::move(job))); } - } // namespace mongo diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h index b711b7b21c5..57f6442328f 100644 --- a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h +++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h @@ -29,6 +29,7 @@ #pragma once +#include "mongo/db/commands/change_stream_options_gen.h" #include "mongo/db/service_context.h" #include "mongo/platform/mutex.h" #include "mongo/util/hierarchical_acquisition.h" @@ -36,6 +37,28 @@ namespace mongo { +namespace preImageRemoverInternal { + +/** + * Specifies attributes that determines if the pre-image has been expired or not. + */ +struct PreImageAttributes { + mongo::UUID collectionUUID; + Timestamp ts; + Date_t operationTime; + + /** + * Determines if the pre-image is considered expired based on the expiration parameter being + * set. + */ + bool isExpiredPreImage(const boost::optional<Date_t>& preImageExpirationTime, + const Timestamp& earliestOplogEntryTimestamp); +}; + +boost::optional<Date_t> getPreImageExpirationTime(OperationContext* opCtx, Date_t currentTime); + +} // namespace preImageRemoverInternal + class ServiceContext; /** diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover_test.cpp b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover_test.cpp new file mode 100644 index 00000000000..ec49c8453d3 --- /dev/null +++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover_test.cpp @@ -0,0 +1,145 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/change_stream_options_manager.h" +#include "mongo/db/pipeline/change_stream_expired_pre_image_remover.h" +#include "mongo/db/service_context_test_fixture.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/assert_util.h" + +namespace mongo { +namespace { + +class ChangeStreamPreImageExpirationPolicyTest : public ServiceContextTest { +public: + ChangeStreamPreImageExpirationPolicyTest() { + ChangeStreamOptionsManager::create(getServiceContext()); + } + + std::unique_ptr<ChangeStreamOptions> populateChangeStreamPreImageOptions( + stdx::variant<std::string, std::int64_t> expireAfterSeconds) { + PreAndPostImagesOptions preAndPostImagesOptions; + preAndPostImagesOptions.setExpireAfterSeconds(expireAfterSeconds); + + auto changeStreamOptions = std::make_unique<ChangeStreamOptions>(); + changeStreamOptions->setPreAndPostImages(std::move(preAndPostImagesOptions)); + + return changeStreamOptions; + } + + void setChangeStreamOptionsToManager(OperationContext* opCtx, + ChangeStreamOptions& changeStreamOptions) { + auto& changeStreamOptionsManager = ChangeStreamOptionsManager::get(opCtx); + ASSERT_EQ(changeStreamOptionsManager.setOptions(opCtx, changeStreamOptions).getStatus(), + ErrorCodes::OK); + } + + bool isExpiredPreImage(const Timestamp& preImageTs, + const Date_t& preImageOperationTime, + const boost::optional<Date_t>& preImageExpirationTime, + const Timestamp& earliestOplogEntryTimestamp) { + preImageRemoverInternal::PreImageAttributes preImageAttributes{ + UUID::gen(), preImageTs, preImageOperationTime}; + return preImageAttributes.isExpiredPreImage(preImageExpirationTime, + earliestOplogEntryTimestamp); + } +}; + +TEST_F(ChangeStreamPreImageExpirationPolicyTest, getPreImageExpirationTimeWithValidIntegralValue) { + auto opCtx = cc().makeOperationContext(); + const int64_t expireAfterSeconds = 10; + + auto changeStreamOptions = populateChangeStreamPreImageOptions(expireAfterSeconds); + setChangeStreamOptionsToManager(opCtx.get(), *changeStreamOptions.get()); + + auto currentTime = Date_t::now(); + auto receivedExpireAfterSeconds = + preImageRemoverInternal::getPreImageExpirationTime(opCtx.get(), currentTime); + ASSERT(receivedExpireAfterSeconds); + ASSERT_EQ(*receivedExpireAfterSeconds, currentTime - Seconds(expireAfterSeconds)); +} + +TEST_F(ChangeStreamPreImageExpirationPolicyTest, getPreImageExpirationTimeWithUnsetValue) { + auto opCtx = cc().makeOperationContext(); + + auto currentTime = Date_t::now(); + auto receivedExpireAfterSeconds = + preImageRemoverInternal::getPreImageExpirationTime(opCtx.get(), currentTime); + ASSERT_FALSE(receivedExpireAfterSeconds); +} + +TEST_F(ChangeStreamPreImageExpirationPolicyTest, getPreImageExpirationTimeWithOffValue) { + auto opCtx = cc().makeOperationContext(); + + auto changeStreamOptions = populateChangeStreamPreImageOptions("off"); + setChangeStreamOptionsToManager(opCtx.get(), *changeStreamOptions.get()); + + auto currentTime = Date_t::now(); + auto receivedExpireAfterSeconds = + preImageRemoverInternal::getPreImageExpirationTime(opCtx.get(), currentTime); + ASSERT_FALSE(receivedExpireAfterSeconds); +} + +TEST_F(ChangeStreamPreImageExpirationPolicyTest, preImageShouldHaveExpiredWithOlderTimestamp) { + ASSERT_TRUE( + isExpiredPreImage(Timestamp(Seconds(100000), 0U) /* preImageTs */, + Date_t::now() /* preImageOperationTime */, + Date_t::now() /* preImageExpirationTime */, + Timestamp(Seconds(100000), 1U)) /* earliestOplogEntryTimestamp */); +} + +TEST_F(ChangeStreamPreImageExpirationPolicyTest, preImageShouldNotHaveExpired) { + ASSERT_FALSE( + isExpiredPreImage(Timestamp(Seconds(100000), 1U) /* preImageTs */, + Date_t::now() + Seconds(1) /* preImageOperationTime */, + Date_t::now() /* preImageExpirationTime */, + Timestamp(Seconds(100000), 0U)) /* earliestOplogEntryTimestamp */); +} + +TEST_F(ChangeStreamPreImageExpirationPolicyTest, preImageShouldHaveExpiredWithOlderOperationTime) { + ASSERT_TRUE( + isExpiredPreImage(Timestamp(Seconds(100000), 1U) /* preImageTs */, + Date_t::now() /* preImageOperationTime */, + Date_t::now() + Seconds(1) /* preImageExpirationTime */, + Timestamp(Seconds(100000), 0U)) /* earliestOplogEntryTimestamp */); +} + +TEST_F(ChangeStreamPreImageExpirationPolicyTest, + preImageShouldNotHaveExpiredWithNullExpirationTime) { + ASSERT_TRUE( + isExpiredPreImage(Timestamp(Seconds(100000), 0U) /* preImageTs */, + Date_t::now() /* preImageOperationTime */, + boost::none /* preImageExpirationTime */, + Timestamp(Seconds(100000), 1U)) /* earliestOplogEntryTimestamp */); +} + +} // namespace +} // namespace mongo |