summaryrefslogtreecommitdiff
path: root/src/mongo/db/change_stream_pre_images_collection_manager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/change_stream_pre_images_collection_manager.cpp')
-rw-r--r--src/mongo/db/change_stream_pre_images_collection_manager.cpp154
1 files changed, 97 insertions, 57 deletions
diff --git a/src/mongo/db/change_stream_pre_images_collection_manager.cpp b/src/mongo/db/change_stream_pre_images_collection_manager.cpp
index 103ce9a2fb1..7d606635708 100644
--- a/src/mongo/db/change_stream_pre_images_collection_manager.cpp
+++ b/src/mongo/db/change_stream_pre_images_collection_manager.cpp
@@ -77,6 +77,7 @@ boost::optional<std::int64_t> getExpireAfterSecondsFromChangeStreamOptions(
// Returns pre-images expiry time in milliseconds since the epoch time if configured, boost::none
// otherwise.
boost::optional<Date_t> getPreImageExpirationTime(OperationContext* opCtx, Date_t currentTime) {
+ invariant(!change_stream_serverless_helpers::isChangeCollectionsModeActive());
boost::optional<std::int64_t> expireAfterSeconds = boost::none;
// Get the expiration time directly from the change stream manager.
@@ -140,7 +141,6 @@ void ChangeStreamPreImagesCollectionManager::insertPreImage(OperationContext* op
<< preImage.getId().getApplyOpsIndex(),
preImage.getId().getApplyOpsIndex() >= 0);
- // TODO SERVER-66642 Consider using internal test-tenant id if applicable.
const auto preImagesCollectionNamespace = NamespaceString::makePreImageCollectionNSS(
change_stream_serverless_helpers::resolveTenantId(tenantId));
@@ -231,49 +231,15 @@ boost::optional<UUID> findNextCollectionUUID(OperationContext* opCtx,
* | applyIndex: 0 | | applyIndex: 0 | | applyIndex: 0 | | applyIndex: 1 |
* +-------------------+ +-------------------+ +-------------------+ +-------------------+
*/
-size_t deleteExpiredChangeStreamPreImages(OperationContext* opCtx,
- Date_t currentTimeForTimeBasedExpiration) {
- // Acquire intent-exclusive lock on the pre-images collection. Early exit if the collection
- // doesn't exist.
- // TODO SERVER-66642 Account for multitenancy.
- AutoGetCollection autoColl(
- opCtx, NamespaceString::makePreImageCollectionNSS(boost::none), MODE_IX);
- const auto& preImagesColl = autoColl.getCollection();
- if (!preImagesColl) {
- return 0;
- }
-
- // Do not run the job on secondaries.
- if (!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(
- opCtx, NamespaceString::kConfigDb)) {
- return 0;
- }
-
- // Get the timestamp of the earliest oplog entry.
- const auto currentEarliestOplogEntryTs =
- repl::StorageInterface::get(opCtx->getServiceContext())->getEarliestOplogTimestamp(opCtx);
-
- const bool isBatchedRemoval = gBatchedExpiredChangeStreamPreImageRemoval.load();
+size_t _deleteExpiredChangeStreamPreImagesCommon(OperationContext* opCtx,
+ const CollectionPtr& preImageColl,
+ const MatchExpression* filterPtr,
+ Timestamp maxRecordIdTimestamp) {
size_t numberOfRemovals = 0;
- const auto preImageExpirationTime = change_stream_pre_image_helpers::getPreImageExpirationTime(
- opCtx, currentTimeForTimeBasedExpiration);
-
- // Configure the filter for the case when expiration parameter is set.
- OrMatchExpression filter;
- const MatchExpression* filterPtr = nullptr;
- if (preImageExpirationTime) {
- filter.add(
- std::make_unique<LTMatchExpression>("_id.ts"_sd, Value(currentEarliestOplogEntryTs)));
- filter.add(std::make_unique<LTEMatchExpression>("operationTime"_sd,
- Value(*preImageExpirationTime)));
- filterPtr = &filter;
- }
- const bool shouldReturnEofOnFilterMismatch = preImageExpirationTime.has_value();
-
- // TODO SERVER-66642 Account for multitenancy.
+ const bool isBatchedRemoval = gBatchedExpiredChangeStreamPreImageRemoval.load();
boost::optional<UUID> currentCollectionUUID = boost::none;
while ((currentCollectionUUID =
- findNextCollectionUUID(opCtx, &preImagesColl, currentCollectionUUID))) {
+ findNextCollectionUUID(opCtx, &preImageColl, currentCollectionUUID))) {
writeConflictRetry(
opCtx,
"ChangeStreamExpiredPreImagesRemover",
@@ -288,22 +254,14 @@ size_t deleteExpiredChangeStreamPreImages(OperationContext* opCtx,
}
RecordIdBound minRecordId(
toRecordId(ChangeStreamPreImageId(*currentCollectionUUID, Timestamp(), 0)));
-
- // If the expiration parameter is set, the 'maxRecord' is set to the maximum
- // RecordId for this collection. Whether the pre-image has to be deleted will be
- // determined by the filtering MatchExpression.
- //
- // If the expiration parameter is not set, then the last expired pre-image timestamp
- // equals to one increment before the 'currentEarliestOplogEntryTs'.
- RecordIdBound maxRecordId = RecordIdBound(toRecordId(ChangeStreamPreImageId(
- *currentCollectionUUID,
- preImageExpirationTime ? Timestamp::max()
- : Timestamp(currentEarliestOplogEntryTs.asULL() - 1),
- std::numeric_limits<int64_t>::max())));
+ RecordIdBound maxRecordId = RecordIdBound(
+ toRecordId(ChangeStreamPreImageId(*currentCollectionUUID,
+ maxRecordIdTimestamp,
+ std::numeric_limits<int64_t>::max())));
auto exec = InternalPlanner::deleteWithCollectionScan(
opCtx,
- &preImagesColl,
+ &preImageColl,
std::move(params),
PlanYieldPolicy::YieldPolicy::YIELD_AUTO,
InternalPlanner::Direction::FORWARD,
@@ -312,12 +270,83 @@ size_t deleteExpiredChangeStreamPreImages(OperationContext* opCtx,
CollectionScanParams::ScanBoundInclusion::kIncludeBothStartAndEndRecords,
std::move(batchedDeleteParams),
filterPtr,
- shouldReturnEofOnFilterMismatch);
+ filterPtr != nullptr);
numberOfRemovals += exec->executeDelete();
});
}
return numberOfRemovals;
}
+
+size_t deleteExpiredChangeStreamPreImages(OperationContext* opCtx,
+ Date_t currentTimeForTimeBasedExpiration) {
+ // Acquire intent-exclusive lock on the change collection.
+ AutoGetCollection preImageColl(
+ opCtx, NamespaceString::makePreImageCollectionNSS(boost::none), MODE_IX);
+
+ // Early exit if the collection doesn't exist or running on a secondary.
+ if (!preImageColl ||
+ !repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(
+ opCtx, NamespaceString::kConfigDb)) {
+ return 0;
+ }
+
+ // Get the timestamp of the earliest oplog entry.
+ const auto currentEarliestOplogEntryTs =
+ repl::StorageInterface::get(opCtx->getServiceContext())->getEarliestOplogTimestamp(opCtx);
+
+ const auto preImageExpirationTime = change_stream_pre_image_helpers::getPreImageExpirationTime(
+ opCtx, currentTimeForTimeBasedExpiration);
+
+ // Configure the filter for the case when expiration parameter is set.
+ if (preImageExpirationTime) {
+ OrMatchExpression filter;
+ filter.add(
+ std::make_unique<LTMatchExpression>("_id.ts"_sd, Value(currentEarliestOplogEntryTs)));
+ filter.add(std::make_unique<LTEMatchExpression>("operationTime"_sd,
+ Value(*preImageExpirationTime)));
+ // If 'preImageExpirationTime' is set, set 'maxRecordIdTimestamp' is set to the maximum
+ // RecordId for this collection. Whether the pre-image has to be deleted will be determined
+ // by the 'filter' parameter.
+ return _deleteExpiredChangeStreamPreImagesCommon(
+ opCtx, *preImageColl, &filter, Timestamp::max() /* maxRecordIdTimestamp */);
+ }
+
+ // 'preImageExpirationTime' is not set, so the last expired pre-image timestamp is less than
+ // 'currentEarliestOplogEntryTs'.
+ return _deleteExpiredChangeStreamPreImagesCommon(
+ opCtx,
+ *preImageColl,
+ nullptr /* filterPtr */,
+ Timestamp(currentEarliestOplogEntryTs.asULL() - 1) /* maxRecordIdTimestamp */);
+}
+
+size_t deleteExpiredChangeStreamPreImagesForTenants(OperationContext* opCtx,
+ const TenantId& tenantId,
+ Date_t currentTimeForTimeBasedExpiration) {
+
+ // Acquire intent-exclusive lock on the change collection.
+ AutoGetCollection preImageColl(opCtx,
+ NamespaceString::makePreImageCollectionNSS(
+ change_stream_serverless_helpers::resolveTenantId(tenantId)),
+ MODE_IX);
+
+ // Early exit if the collection doesn't exist or running on a secondary.
+ if (!preImageColl ||
+ !repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(
+ opCtx, NamespaceString::kConfigDb)) {
+ return 0;
+ }
+
+ auto expiredAfterSeconds = change_stream_serverless_helpers::getExpireAfterSeconds(tenantId);
+ LTEMatchExpression filter{
+ "operationTime"_sd,
+ Value(currentTimeForTimeBasedExpiration - Seconds(expiredAfterSeconds))};
+
+ // Set the 'maxRecordIdTimestamp' parameter (upper scan boundary) to maximum possible. Whether
+ // the pre-image has to be deleted will be determined by the 'filter' parameter.
+ return _deleteExpiredChangeStreamPreImagesCommon(
+ opCtx, *preImageColl, &filter, Timestamp::max() /* maxRecordIdTimestamp */);
+}
} // namespace
void ChangeStreamPreImagesCollectionManager::performExpiredChangeStreamPreImagesRemovalPass(
@@ -342,9 +371,20 @@ void ChangeStreamPreImagesCollectionManager::performExpiredChangeStreamPreImages
ServiceContext::UniqueOperationContext opCtx;
try {
opCtx = client->makeOperationContext();
+ size_t numberOfRemovals = 0;
+
+ if (change_stream_serverless_helpers::isChangeCollectionsModeActive()) {
+ const auto tenantIds =
+ change_stream_serverless_helpers::getConfigDbTenants(opCtx.get());
+ for (const auto& tenantId : tenantIds) {
+ numberOfRemovals += deleteExpiredChangeStreamPreImagesForTenants(
+ opCtx.get(), tenantId, currentTimeForTimeBasedExpiration);
+ }
+ } else {
+ numberOfRemovals =
+ deleteExpiredChangeStreamPreImages(opCtx.get(), currentTimeForTimeBasedExpiration);
+ }
- auto numberOfRemovals =
- deleteExpiredChangeStreamPreImages(opCtx.get(), currentTimeForTimeBasedExpiration);
if (numberOfRemovals > 0) {
LOGV2_DEBUG(5869104,
3,