diff options
author | Mindaugas Malinauskas <mindaugas.malinauskas@mongodb.com> | 2022-07-13 15:53:21 +0100 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-07-25 17:24:35 +0000 |
commit | 0e465a0d7adefc42b9399fcbde011d712785ad76 (patch) | |
tree | fb2bc3d7bf4c97f3545eb4e633d203004db94c20 | |
parent | a29bfd88f98f6acb0340a3ac500e95a5aee1d15e (diff) | |
download | mongo-0e465a0d7adefc42b9399fcbde011d712785ad76.tar.gz |
SERVER-67615 Stop using ErrorCategory::Interruption in the change stream pre-images purging job
-rw-r--r-- | src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp | 172 |
1 files changed, 89 insertions, 83 deletions
diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp index 5be57605e9d..81931be466c 100644 --- a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp +++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp @@ -325,100 +325,106 @@ private: void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTimeBasedExpiration) { const auto startTime = Date_t::now(); - auto opCtx = client->makeOperationContext(); - - // Acquire intent-exclusive lock on the pre-images collection. Early exit if the collection - // doesn't exist. - AutoGetCollection autoColl( - opCtx.get(), NamespaceString::kChangeStreamPreImagesNamespace, MODE_IX); - const auto& preImagesColl = autoColl.getCollection(); - if (!preImagesColl) { - return; - } - - // Do not run the job on secondaries. - if (!repl::ReplicationCoordinator::get(opCtx.get()) - ->canAcceptWritesForDatabase(opCtx.get(), NamespaceString::kAdminDb)) { - return; - } + ServiceContext::UniqueOperationContext opCtx; + try { + opCtx = client->makeOperationContext(); + + // Acquire intent-exclusive lock on the pre-images collection. Early exit if the collection + // doesn't exist. + AutoGetCollection autoColl( + opCtx.get(), NamespaceString::kChangeStreamPreImagesNamespace, MODE_IX); + const auto& preImagesColl = autoColl.getCollection(); + if (!preImagesColl) { + return; + } - // Get the timestamp of the ealiest oplog entry. - const auto currentEarliestOplogEntryTs = - repl::StorageInterface::get(client->getServiceContext()) - ->getEarliestOplogTimestamp(opCtx.get()); + // Do not run the job on secondaries. + if (!repl::ReplicationCoordinator::get(opCtx.get()) + ->canAcceptWritesForDatabase(opCtx.get(), NamespaceString::kAdminDb)) { + return; + } - const bool isBatchedRemoval = gBatchedExpiredChangeStreamPreImageRemoval.load(); - size_t numberOfRemovals = 0; + // Get the timestamp of the earliest oplog entry. + const auto currentEarliestOplogEntryTs = + repl::StorageInterface::get(client->getServiceContext()) + ->getEarliestOplogTimestamp(opCtx.get()); - ChangeStreamExpiredPreImageIterator expiredPreImages( - opCtx.get(), - &preImagesColl, - currentEarliestOplogEntryTs, - ::mongo::preImageRemoverInternal::getPreImageExpirationTime( - opCtx.get(), currentTimeForTimeBasedExpiration)); + const bool isBatchedRemoval = gBatchedExpiredChangeStreamPreImageRemoval.load(); + size_t numberOfRemovals = 0; - for (const auto& collectionRange : expiredPreImages) { - writeConflictRetry( + ChangeStreamExpiredPreImageIterator expiredPreImages( opCtx.get(), - "ChangeStreamExpiredPreImagesRemover", - NamespaceString::kChangeStreamPreImagesNamespace.ns(), - [&] { - auto params = std::make_unique<DeleteStageParams>(); - params->isMulti = true; - - std::unique_ptr<BatchedDeleteStageParams> batchedDeleteParams; - if (isBatchedRemoval) { - batchedDeleteParams = std::make_unique<BatchedDeleteStageParams>(); - } + &preImagesColl, + currentEarliestOplogEntryTs, + ::mongo::preImageRemoverInternal::getPreImageExpirationTime( + opCtx.get(), currentTimeForTimeBasedExpiration)); + + for (const auto& collectionRange : expiredPreImages) { + writeConflictRetry( + opCtx.get(), + "ChangeStreamExpiredPreImagesRemover", + NamespaceString::kChangeStreamPreImagesNamespace.ns(), + [&] { + auto params = std::make_unique<DeleteStageParams>(); + params->isMulti = true; + + std::unique_ptr<BatchedDeleteStageParams> batchedDeleteParams; + if (isBatchedRemoval) { + batchedDeleteParams = std::make_unique<BatchedDeleteStageParams>(); + } - auto exec = InternalPlanner::deleteWithCollectionScan( - opCtx.get(), - &preImagesColl, - std::move(params), - PlanYieldPolicy::YieldPolicy::YIELD_AUTO, - InternalPlanner::Direction::FORWARD, - RecordIdBound(collectionRange.first), - RecordIdBound(collectionRange.second), - CollectionScanParams::ScanBoundInclusion::kIncludeBothStartAndEndRecords, - std::move(batchedDeleteParams)); - numberOfRemovals += exec->executeDelete(); - }); - } + auto exec = InternalPlanner::deleteWithCollectionScan( + opCtx.get(), + &preImagesColl, + std::move(params), + PlanYieldPolicy::YieldPolicy::YIELD_AUTO, + InternalPlanner::Direction::FORWARD, + RecordIdBound(collectionRange.first), + RecordIdBound(collectionRange.second), + CollectionScanParams::ScanBoundInclusion::kIncludeBothStartAndEndRecords, + std::move(batchedDeleteParams)); + numberOfRemovals += exec->executeDelete(); + }); + } - if (numberOfRemovals > 0) { - LOGV2_DEBUG(5869104, - 3, - "Periodic expired pre-images removal job finished executing", - "numberOfRemovals"_attr = numberOfRemovals, - "jobDuration"_attr = (Date_t::now() - startTime).toString()); + if (numberOfRemovals > 0) { + LOGV2_DEBUG(5869104, + 3, + "Periodic expired pre-images removal job finished executing", + "numberOfRemovals"_attr = numberOfRemovals, + "jobDuration"_attr = (Date_t::now() - startTime).toString()); + } + } catch (const DBException& exception) { + if (opCtx && opCtx.get()->getKillStatus() != ErrorCodes::OK) { + LOGV2_DEBUG(5869105, + 3, + "Periodic expired pre-images removal job operation was killed", + "errorCode"_attr = opCtx.get()->getKillStatus()); + } else { + LOGV2_ERROR(5869106, + "Periodic expired pre-images removal job failed", + "reason"_attr = exception.reason()); + } } } void performExpiredChangeStreamPreImagesRemovalPass(Client* client) { - try { - Date_t currentTimeForTimeBasedExpiration = Date_t::now(); - - changeStreamPreImageRemoverCurrentTime.execute([&](const BSONObj& data) { - // Populate the current time for time based expiration of pre-images. - if (auto currentTimeElem = data["currentTimeForTimeBasedExpiration"]) { - const BSONType bsonType = currentTimeElem.type(); - tassert(5869300, - str::stream() << "Expected type for 'currentTimeForTimeBasedExpiration' is " - "'date', but found: " - << bsonType, - bsonType == BSONType::Date); - - currentTimeForTimeBasedExpiration = currentTimeElem.Date(); - } - }); - deleteExpiredChangeStreamPreImages(client, currentTimeForTimeBasedExpiration); - } catch (const ExceptionForCat<ErrorCategory::Interruption>&) { - LOGV2_WARNING(5869105, "Periodic expired pre-images removal job was interrupted"); - } catch (const DBException& exception) { - LOGV2_ERROR(5869106, - "Periodic expired pre-images removal job failed", - "reason"_attr = exception.reason()); - } + Date_t currentTimeForTimeBasedExpiration = Date_t::now(); + + changeStreamPreImageRemoverCurrentTime.execute([&](const BSONObj& data) { + // Populate the current time for time based expiration of pre-images. + if (auto currentTimeElem = data["currentTimeForTimeBasedExpiration"]) { + const BSONType bsonType = currentTimeElem.type(); + tassert(5869300, + str::stream() << "Expected type for 'currentTimeForTimeBasedExpiration' is " + "'date', but found: " + << bsonType, + bsonType == BSONType::Date); + + currentTimeForTimeBasedExpiration = currentTimeElem.Date(); + } + }); + deleteExpiredChangeStreamPreImages(client, currentTimeForTimeBasedExpiration); } } // namespace |