diff options
Diffstat (limited to 'src/mongo/db/change_stream_pre_images_collection_manager.cpp')
-rw-r--r-- | src/mongo/db/change_stream_pre_images_collection_manager.cpp | 154 |
1 files changed, 97 insertions, 57 deletions
diff --git a/src/mongo/db/change_stream_pre_images_collection_manager.cpp b/src/mongo/db/change_stream_pre_images_collection_manager.cpp index 103ce9a2fb1..7d606635708 100644 --- a/src/mongo/db/change_stream_pre_images_collection_manager.cpp +++ b/src/mongo/db/change_stream_pre_images_collection_manager.cpp @@ -77,6 +77,7 @@ boost::optional<std::int64_t> getExpireAfterSecondsFromChangeStreamOptions( // Returns pre-images expiry time in milliseconds since the epoch time if configured, boost::none // otherwise. boost::optional<Date_t> getPreImageExpirationTime(OperationContext* opCtx, Date_t currentTime) { + invariant(!change_stream_serverless_helpers::isChangeCollectionsModeActive()); boost::optional<std::int64_t> expireAfterSeconds = boost::none; // Get the expiration time directly from the change stream manager. @@ -140,7 +141,6 @@ void ChangeStreamPreImagesCollectionManager::insertPreImage(OperationContext* op << preImage.getId().getApplyOpsIndex(), preImage.getId().getApplyOpsIndex() >= 0); - // TODO SERVER-66642 Consider using internal test-tenant id if applicable. const auto preImagesCollectionNamespace = NamespaceString::makePreImageCollectionNSS( change_stream_serverless_helpers::resolveTenantId(tenantId)); @@ -231,49 +231,15 @@ boost::optional<UUID> findNextCollectionUUID(OperationContext* opCtx, * | applyIndex: 0 | | applyIndex: 0 | | applyIndex: 0 | | applyIndex: 1 | * +-------------------+ +-------------------+ +-------------------+ +-------------------+ */ -size_t deleteExpiredChangeStreamPreImages(OperationContext* opCtx, - Date_t currentTimeForTimeBasedExpiration) { - // Acquire intent-exclusive lock on the pre-images collection. Early exit if the collection - // doesn't exist. - // TODO SERVER-66642 Account for multitenancy. 
- AutoGetCollection autoColl( - opCtx, NamespaceString::makePreImageCollectionNSS(boost::none), MODE_IX); - const auto& preImagesColl = autoColl.getCollection(); - if (!preImagesColl) { - return 0; - } - - // Do not run the job on secondaries. - if (!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase( - opCtx, NamespaceString::kConfigDb)) { - return 0; - } - - // Get the timestamp of the earliest oplog entry. - const auto currentEarliestOplogEntryTs = - repl::StorageInterface::get(opCtx->getServiceContext())->getEarliestOplogTimestamp(opCtx); - - const bool isBatchedRemoval = gBatchedExpiredChangeStreamPreImageRemoval.load(); +size_t _deleteExpiredChangeStreamPreImagesCommon(OperationContext* opCtx, + const CollectionPtr& preImageColl, + const MatchExpression* filterPtr, + Timestamp maxRecordIdTimestamp) { size_t numberOfRemovals = 0; - const auto preImageExpirationTime = change_stream_pre_image_helpers::getPreImageExpirationTime( - opCtx, currentTimeForTimeBasedExpiration); - - // Configure the filter for the case when expiration parameter is set. - OrMatchExpression filter; - const MatchExpression* filterPtr = nullptr; - if (preImageExpirationTime) { - filter.add( - std::make_unique<LTMatchExpression>("_id.ts"_sd, Value(currentEarliestOplogEntryTs))); - filter.add(std::make_unique<LTEMatchExpression>("operationTime"_sd, - Value(*preImageExpirationTime))); - filterPtr = &filter; - } - const bool shouldReturnEofOnFilterMismatch = preImageExpirationTime.has_value(); - - // TODO SERVER-66642 Account for multitenancy. 
+ const bool isBatchedRemoval = gBatchedExpiredChangeStreamPreImageRemoval.load(); boost::optional<UUID> currentCollectionUUID = boost::none; while ((currentCollectionUUID = - findNextCollectionUUID(opCtx, &preImagesColl, currentCollectionUUID))) { + findNextCollectionUUID(opCtx, &preImageColl, currentCollectionUUID))) { writeConflictRetry( opCtx, "ChangeStreamExpiredPreImagesRemover", @@ -288,22 +254,14 @@ size_t deleteExpiredChangeStreamPreImages(OperationContext* opCtx, } RecordIdBound minRecordId( toRecordId(ChangeStreamPreImageId(*currentCollectionUUID, Timestamp(), 0))); - - // If the expiration parameter is set, the 'maxRecord' is set to the maximum - // RecordId for this collection. Whether the pre-image has to be deleted will be - // determined by the filtering MatchExpression. - // - // If the expiration parameter is not set, then the last expired pre-image timestamp - // equals to one increment before the 'currentEarliestOplogEntryTs'. - RecordIdBound maxRecordId = RecordIdBound(toRecordId(ChangeStreamPreImageId( - *currentCollectionUUID, - preImageExpirationTime ? 
Timestamp::max() - : Timestamp(currentEarliestOplogEntryTs.asULL() - 1), - std::numeric_limits<int64_t>::max()))); + RecordIdBound maxRecordId = RecordIdBound( + toRecordId(ChangeStreamPreImageId(*currentCollectionUUID, + maxRecordIdTimestamp, + std::numeric_limits<int64_t>::max()))); auto exec = InternalPlanner::deleteWithCollectionScan( opCtx, - &preImagesColl, + &preImageColl, std::move(params), PlanYieldPolicy::YieldPolicy::YIELD_AUTO, InternalPlanner::Direction::FORWARD, @@ -312,12 +270,83 @@ size_t deleteExpiredChangeStreamPreImages(OperationContext* opCtx, CollectionScanParams::ScanBoundInclusion::kIncludeBothStartAndEndRecords, std::move(batchedDeleteParams), filterPtr, - shouldReturnEofOnFilterMismatch); + filterPtr != nullptr); numberOfRemovals += exec->executeDelete(); }); } return numberOfRemovals; } + +size_t deleteExpiredChangeStreamPreImages(OperationContext* opCtx, + Date_t currentTimeForTimeBasedExpiration) { + // Acquire intent-exclusive lock on the pre-images collection. + AutoGetCollection preImageColl( + opCtx, NamespaceString::makePreImageCollectionNSS(boost::none), MODE_IX); + + // Early exit if the collection doesn't exist or running on a secondary. + if (!preImageColl || + !repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase( + opCtx, NamespaceString::kConfigDb)) { + return 0; + } + + // Get the timestamp of the earliest oplog entry. + const auto currentEarliestOplogEntryTs = + repl::StorageInterface::get(opCtx->getServiceContext())->getEarliestOplogTimestamp(opCtx); + + const auto preImageExpirationTime = change_stream_pre_image_helpers::getPreImageExpirationTime( + opCtx, currentTimeForTimeBasedExpiration); + + // Configure the filter for the case when expiration parameter is set. 
+ if (preImageExpirationTime) { + OrMatchExpression filter; + filter.add( + std::make_unique<LTMatchExpression>("_id.ts"_sd, Value(currentEarliestOplogEntryTs))); + filter.add(std::make_unique<LTEMatchExpression>("operationTime"_sd, + Value(*preImageExpirationTime))); + // If 'preImageExpirationTime' is set, 'maxRecordIdTimestamp' is set to 'Timestamp::max()', + // the maximum possible timestamp. Whether the pre-image has to be deleted will be determined + // by the 'filter' parameter. + return _deleteExpiredChangeStreamPreImagesCommon( + opCtx, *preImageColl, &filter, Timestamp::max() /* maxRecordIdTimestamp */); + } + + // 'preImageExpirationTime' is not set, so the last expired pre-image timestamp is less than + // 'currentEarliestOplogEntryTs'. + return _deleteExpiredChangeStreamPreImagesCommon( + opCtx, + *preImageColl, + nullptr /* filterPtr */, + Timestamp(currentEarliestOplogEntryTs.asULL() - 1) /* maxRecordIdTimestamp */); +} + +size_t deleteExpiredChangeStreamPreImagesForTenants(OperationContext* opCtx, + const TenantId& tenantId, + Date_t currentTimeForTimeBasedExpiration) { + + // Acquire intent-exclusive lock on the tenant's pre-images collection. + AutoGetCollection preImageColl(opCtx, + NamespaceString::makePreImageCollectionNSS( + change_stream_serverless_helpers::resolveTenantId(tenantId)), + MODE_IX); + + // Early exit if the collection doesn't exist or running on a secondary. + if (!preImageColl || + !repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase( + opCtx, NamespaceString::kConfigDb)) { + return 0; + } + + auto expiredAfterSeconds = change_stream_serverless_helpers::getExpireAfterSeconds(tenantId); + LTEMatchExpression filter{ + "operationTime"_sd, + Value(currentTimeForTimeBasedExpiration - Seconds(expiredAfterSeconds))}; + + // Set the 'maxRecordIdTimestamp' parameter (upper scan boundary) to the maximum possible value. Whether + // the pre-image has to be deleted will be determined by the 'filter' parameter. 
+ return _deleteExpiredChangeStreamPreImagesCommon( + opCtx, *preImageColl, &filter, Timestamp::max() /* maxRecordIdTimestamp */); +} } // namespace void ChangeStreamPreImagesCollectionManager::performExpiredChangeStreamPreImagesRemovalPass( @@ -342,9 +371,20 @@ void ChangeStreamPreImagesCollectionManager::performExpiredChangeStreamPreImages ServiceContext::UniqueOperationContext opCtx; try { opCtx = client->makeOperationContext(); + size_t numberOfRemovals = 0; + + if (change_stream_serverless_helpers::isChangeCollectionsModeActive()) { + const auto tenantIds = + change_stream_serverless_helpers::getConfigDbTenants(opCtx.get()); + for (const auto& tenantId : tenantIds) { + numberOfRemovals += deleteExpiredChangeStreamPreImagesForTenants( + opCtx.get(), tenantId, currentTimeForTimeBasedExpiration); + } + } else { + numberOfRemovals = + deleteExpiredChangeStreamPreImages(opCtx.get(), currentTimeForTimeBasedExpiration); + } - auto numberOfRemovals = - deleteExpiredChangeStreamPreImages(opCtx.get(), currentTimeForTimeBasedExpiration); if (numberOfRemovals > 0) { LOGV2_DEBUG(5869104, 3, |