summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/pipeline/SConscript4
-rw-r--r--src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp106
-rw-r--r--src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h23
-rw-r--r--src/mongo/db/pipeline/change_stream_expired_pre_image_remover_test.cpp145
4 files changed, 223 insertions, 55 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index c98887ab88e..84ffd9cca7c 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -501,6 +501,7 @@ env.CppUnitTest(
'accumulator_js_test.cpp' if get_option('js-engine') != 'none' else [],
'accumulator_test.cpp',
'aggregation_request_test.cpp',
+ 'change_stream_expired_pre_image_remover_test.cpp',
'dependencies_test.cpp',
'dispatch_shard_pipeline_test.cpp',
'document_path_support_test.cpp',
@@ -605,10 +606,13 @@ env.CppUnitTest(
LIBDEPS=[
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/auth/authmocks',
+ '$BUILD_DIR/mongo/db/change_stream_options_manager',
+ '$BUILD_DIR/mongo/db/commands/change_stream_options',
'$BUILD_DIR/mongo/db/cst/cst',
'$BUILD_DIR/mongo/db/exec/document_value/document_value',
'$BUILD_DIR/mongo/db/exec/document_value/document_value_test_util',
'$BUILD_DIR/mongo/db/mongohasher',
+ '$BUILD_DIR/mongo/db/pipeline/change_stream_expired_pre_image_remover',
'$BUILD_DIR/mongo/db/query/collation/collator_interface_mock',
'$BUILD_DIR/mongo/db/query/optimizer/optimizer',
'$BUILD_DIR/mongo/db/query/optimizer/unit_test_utils',
diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp
index d2dd00901be..9027c8d52de 100644
--- a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp
+++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp
@@ -51,6 +51,50 @@ namespace mongo {
// Fail point to set current time for time-based expiration of pre-images.
MONGO_FAIL_POINT_DEFINE(changeStreamPreImageRemoverCurrentTime);
+namespace preImageRemoverInternal {
+
+bool PreImageAttributes::isExpiredPreImage(const boost::optional<Date_t>& preImageExpirationTime,
+ const Timestamp& earliestOplogEntryTimestamp) {
+ // Pre-image oplog entry is no longer present in the oplog if its timestamp is smaller
+ // than the 'earliestOplogEntryTimestamp'.
+ const bool preImageOplogEntryIsDeleted = ts < earliestOplogEntryTimestamp;
+ const auto expirationTime = preImageExpirationTime.get_value_or(Date_t::min());
+
+ // Pre-image is expired if its corresponding oplog entry is deleted or its operation
+ // time is less than or equal to the expiration time.
+ return preImageOplogEntryIsDeleted || operationTime <= expirationTime;
+}
+
+// Get the 'expireAfterSeconds' from the 'ChangeStreamOptions' if present, boost::none otherwise.
+boost::optional<std::int64_t> getExpireAfterSecondsFromChangeStreamOptions(
+ ChangeStreamOptions& changeStreamOptions) {
+ if (auto preAndPostImages = changeStreamOptions.getPreAndPostImages(); preAndPostImages &&
+ preAndPostImages->getExpireAfterSeconds() &&
+ !stdx::holds_alternative<std::string>(*preAndPostImages->getExpireAfterSeconds())) {
+ return stdx::get<std::int64_t>(*preAndPostImages->getExpireAfterSeconds());
+ }
+
+ return boost::none;
+}
+
+// Returns pre-images expiry time in milliseconds since the epoch time if configured, boost::none
+// otherwise.
+boost::optional<Date_t> getPreImageExpirationTime(OperationContext* opCtx, Date_t currentTime) {
+ boost::optional<std::int64_t> expireAfterSeconds = boost::none;
+
+ // Get the expiration time directly from the change stream manager.
+ if (auto changeStreamOptions = ChangeStreamOptionsManager::get(opCtx).getOptions(opCtx)) {
+ expireAfterSeconds = getExpireAfterSecondsFromChangeStreamOptions(*changeStreamOptions);
+ }
+
+ // A pre-image is eligible for deletion if:
+ // pre-image's op-time + expireAfterSeconds < currentTime.
+ return expireAfterSeconds ? boost::optional<Date_t>(currentTime - Seconds(*expireAfterSeconds))
+ : boost::none;
+}
+
+} // namespace preImageRemoverInternal
+
namespace {
RecordId toRecordId(ChangeStreamPreImageId id) {
@@ -91,12 +135,6 @@ public:
// pre-image documents of one collection eligible for deletion due to expiration. Lower and
// upper bounds of a range are inclusive.
class Iterator {
- struct PreImageAttributes {
- mongo::UUID collectionUUID;
- Timestamp ts;
- Date_t operationTime;
- };
-
public:
using RecordIdRange = std::pair<RecordId, RecordId>;
@@ -143,7 +181,7 @@ public:
void advance() {
const auto getNextPreImageAttributes =
[&](std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>& planExecutor)
- -> boost::optional<PreImageAttributes> {
+ -> boost::optional<preImageRemoverInternal::PreImageAttributes> {
BSONObj preImageObj;
if (planExecutor->getNext(&preImageObj, nullptr) == PlanExecutor::IS_EOF) {
return boost::none;
@@ -177,7 +215,8 @@ public:
// If the first pre-image in the current collection is not expired, fetch the first
// pre-image from the next collection.
- if (!isExpiredPreImage(*preImageAttributes)) {
+ if (!preImageAttributes->isExpiredPreImage(_preImageExpirationTime,
+ _earliestOplogEntryTimestamp)) {
continue;
}
@@ -199,7 +238,8 @@ public:
// Iterate over all the expired pre-images in the collection in order to find
// the max RecordId.
while ((preImageAttributes = getNextPreImageAttributes(planExecutor)) &&
- isExpiredPreImage(*preImageAttributes) &&
+ preImageAttributes->isExpiredPreImage(_preImageExpirationTime,
+ _earliestOplogEntryTimestamp) &&
preImageAttributes->collectionUUID == currentCollectionUUID) {
lastExpiredPreimageTs = preImageAttributes->ts;
}
@@ -224,22 +264,6 @@ public:
}
}
- // Computes if the pre-image is considered expired based on the expiration parameter being
- // set.
- bool isExpiredPreImage(const PreImageAttributes& preImageAttributes) const {
- // Pre-image oplog entry is no longer present in the oplog if its timestamp is smaller
- // than the 'earliestOplogEntryTimestamp'.
- const bool preImageOplogEntryIsDeleted =
- preImageAttributes.ts < _earliestOplogEntryTimestamp;
- const auto expirationTime = _preImageExpirationTime.get_value_or(Date_t::min());
-
- // Pre-image is expired if its corresponding oplog entry is deleted or its operation
- // time is less than or equal to the expiration time.
- return preImageOplogEntryIsDeleted ||
- preImageAttributes.operationTime <= expirationTime;
- }
-
-
// Set up the new collection scan to start from the 'minKey'.
std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> createCollectionScan(
boost::optional<RecordIdBound> minKey) const {
@@ -292,34 +316,6 @@ private:
const boost::optional<Date_t> _preImageExpirationTime;
};
-// Get the 'expireAfterSeconds' from the 'ChangeStreamOptions' if present, boost::none otherwise.
-boost::optional<std::int64_t> getExpireAfterSecondsFromChangeStreamOptions(
- ChangeStreamOptions& changeStreamOptions) {
- if (auto preAndPostImages = changeStreamOptions.getPreAndPostImages(); preAndPostImages &&
- preAndPostImages->getExpireAfterSeconds() &&
- !stdx::holds_alternative<std::string>(*preAndPostImages->getExpireAfterSeconds())) {
- return stdx::get<std::int64_t>(*preAndPostImages->getExpireAfterSeconds());
- }
-
- return boost::none;
-}
-
-// Returns pre-images expiry time in milliseconds since the epoch time if configured, boost::none
-// otherwise.
-boost::optional<Date_t> getPreImageExpirationTime(OperationContext* opCtx, Date_t currentTime) {
- boost::optional<std::int64_t> expireAfterSeconds = boost::none;
-
- // Get the expiration time directly from the change stream manager.
- if (auto changeStreamOptions = ChangeStreamOptionsManager::get(opCtx).getOptions(opCtx)) {
- expireAfterSeconds = getExpireAfterSecondsFromChangeStreamOptions(*changeStreamOptions);
- }
-
- // A pre-image is eligible for deletion if:
- // pre-image's op-time + expireAfterSeconds < currentTime.
- return expireAfterSeconds ? boost::optional<Date_t>(currentTime - Seconds(*expireAfterSeconds))
- : boost::none;
-}
-
void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTimeBasedExpiration) {
const auto startTime = Date_t::now();
auto opCtx = client->makeOperationContext();
@@ -350,7 +346,8 @@ void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTim
opCtx.get(),
&preImagesColl,
currentEarliestOplogEntryTs,
- getPreImageExpirationTime(opCtx.get(), currentTimeForTimeBasedExpiration));
+ ::mongo::preImageRemoverInternal::getPreImageExpirationTime(
+ opCtx.get(), currentTimeForTimeBasedExpiration));
for (const auto& collectionRange : expiredPreImages) {
writeConflictRetry(opCtx.get(),
@@ -439,5 +436,4 @@ void PeriodicChangeStreamExpiredPreImagesRemover::_init(ServiceContext* serviceC
_anchor = std::make_shared<PeriodicJobAnchor>(periodicRunner->makeJob(std::move(job)));
}
-
} // namespace mongo
diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h
index b711b7b21c5..57f6442328f 100644
--- a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h
+++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h
@@ -29,6 +29,7 @@
#pragma once
+#include "mongo/db/commands/change_stream_options_gen.h"
#include "mongo/db/service_context.h"
#include "mongo/platform/mutex.h"
#include "mongo/util/hierarchical_acquisition.h"
@@ -36,6 +37,28 @@
namespace mongo {
+namespace preImageRemoverInternal {
+
+/**
+ * Specifies attributes that determines if the pre-image has been expired or not.
+ */
+struct PreImageAttributes {
+ mongo::UUID collectionUUID;
+ Timestamp ts;
+ Date_t operationTime;
+
+ /**
+ * Determines if the pre-image is considered expired based on the expiration parameter being
+ * set.
+ */
+ bool isExpiredPreImage(const boost::optional<Date_t>& preImageExpirationTime,
+ const Timestamp& earliestOplogEntryTimestamp);
+};
+
+boost::optional<Date_t> getPreImageExpirationTime(OperationContext* opCtx, Date_t currentTime);
+
+} // namespace preImageRemoverInternal
+
class ServiceContext;
/**
diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover_test.cpp b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover_test.cpp
new file mode 100644
index 00000000000..ec49c8453d3
--- /dev/null
+++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover_test.cpp
@@ -0,0 +1,145 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/change_stream_options_manager.h"
+#include "mongo/db/pipeline/change_stream_expired_pre_image_remover.h"
+#include "mongo/db/service_context_test_fixture.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/assert_util.h"
+
+namespace mongo {
+namespace {
+
+class ChangeStreamPreImageExpirationPolicyTest : public ServiceContextTest {
+public:
+ ChangeStreamPreImageExpirationPolicyTest() {
+ ChangeStreamOptionsManager::create(getServiceContext());
+ }
+
+ std::unique_ptr<ChangeStreamOptions> populateChangeStreamPreImageOptions(
+ stdx::variant<std::string, std::int64_t> expireAfterSeconds) {
+ PreAndPostImagesOptions preAndPostImagesOptions;
+ preAndPostImagesOptions.setExpireAfterSeconds(expireAfterSeconds);
+
+ auto changeStreamOptions = std::make_unique<ChangeStreamOptions>();
+ changeStreamOptions->setPreAndPostImages(std::move(preAndPostImagesOptions));
+
+ return changeStreamOptions;
+ }
+
+ void setChangeStreamOptionsToManager(OperationContext* opCtx,
+ ChangeStreamOptions& changeStreamOptions) {
+ auto& changeStreamOptionsManager = ChangeStreamOptionsManager::get(opCtx);
+ ASSERT_EQ(changeStreamOptionsManager.setOptions(opCtx, changeStreamOptions).getStatus(),
+ ErrorCodes::OK);
+ }
+
+ bool isExpiredPreImage(const Timestamp& preImageTs,
+ const Date_t& preImageOperationTime,
+ const boost::optional<Date_t>& preImageExpirationTime,
+ const Timestamp& earliestOplogEntryTimestamp) {
+ preImageRemoverInternal::PreImageAttributes preImageAttributes{
+ UUID::gen(), preImageTs, preImageOperationTime};
+ return preImageAttributes.isExpiredPreImage(preImageExpirationTime,
+ earliestOplogEntryTimestamp);
+ }
+};
+
+TEST_F(ChangeStreamPreImageExpirationPolicyTest, getPreImageExpirationTimeWithValidIntegralValue) {
+ auto opCtx = cc().makeOperationContext();
+ const int64_t expireAfterSeconds = 10;
+
+ auto changeStreamOptions = populateChangeStreamPreImageOptions(expireAfterSeconds);
+ setChangeStreamOptionsToManager(opCtx.get(), *changeStreamOptions.get());
+
+ auto currentTime = Date_t::now();
+ auto receivedExpireAfterSeconds =
+ preImageRemoverInternal::getPreImageExpirationTime(opCtx.get(), currentTime);
+ ASSERT(receivedExpireAfterSeconds);
+ ASSERT_EQ(*receivedExpireAfterSeconds, currentTime - Seconds(expireAfterSeconds));
+}
+
+TEST_F(ChangeStreamPreImageExpirationPolicyTest, getPreImageExpirationTimeWithUnsetValue) {
+ auto opCtx = cc().makeOperationContext();
+
+ auto currentTime = Date_t::now();
+ auto receivedExpireAfterSeconds =
+ preImageRemoverInternal::getPreImageExpirationTime(opCtx.get(), currentTime);
+ ASSERT_FALSE(receivedExpireAfterSeconds);
+}
+
+TEST_F(ChangeStreamPreImageExpirationPolicyTest, getPreImageExpirationTimeWithOffValue) {
+ auto opCtx = cc().makeOperationContext();
+
+ auto changeStreamOptions = populateChangeStreamPreImageOptions("off");
+ setChangeStreamOptionsToManager(opCtx.get(), *changeStreamOptions.get());
+
+ auto currentTime = Date_t::now();
+ auto receivedExpireAfterSeconds =
+ preImageRemoverInternal::getPreImageExpirationTime(opCtx.get(), currentTime);
+ ASSERT_FALSE(receivedExpireAfterSeconds);
+}
+
+TEST_F(ChangeStreamPreImageExpirationPolicyTest, preImageShouldHaveExpiredWithOlderTimestamp) {
+ ASSERT_TRUE(
+ isExpiredPreImage(Timestamp(Seconds(100000), 0U) /* preImageTs */,
+ Date_t::now() /* preImageOperationTime */,
+ Date_t::now() /* preImageExpirationTime */,
+ Timestamp(Seconds(100000), 1U)) /* earliestOplogEntryTimestamp */);
+}
+
+TEST_F(ChangeStreamPreImageExpirationPolicyTest, preImageShouldNotHaveExpired) {
+ ASSERT_FALSE(
+ isExpiredPreImage(Timestamp(Seconds(100000), 1U) /* preImageTs */,
+ Date_t::now() + Seconds(1) /* preImageOperationTime */,
+ Date_t::now() /* preImageExpirationTime */,
+ Timestamp(Seconds(100000), 0U)) /* earliestOplogEntryTimestamp */);
+}
+
+TEST_F(ChangeStreamPreImageExpirationPolicyTest, preImageShouldHaveExpiredWithOlderOperationTime) {
+ ASSERT_TRUE(
+ isExpiredPreImage(Timestamp(Seconds(100000), 1U) /* preImageTs */,
+ Date_t::now() /* preImageOperationTime */,
+ Date_t::now() + Seconds(1) /* preImageExpirationTime */,
+ Timestamp(Seconds(100000), 0U)) /* earliestOplogEntryTimestamp */);
+}
+
+TEST_F(ChangeStreamPreImageExpirationPolicyTest,
+ preImageShouldNotHaveExpiredWithNullExpirationTime) {
+ ASSERT_TRUE(
+ isExpiredPreImage(Timestamp(Seconds(100000), 0U) /* preImageTs */,
+ Date_t::now() /* preImageOperationTime */,
+ boost::none /* preImageExpirationTime */,
+ Timestamp(Seconds(100000), 1U)) /* earliestOplogEntryTimestamp */);
+}
+
+} // namespace
+} // namespace mongo