summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMindaugas Malinauskas <mindaugas.malinauskas@mongodb.com>2022-07-13 15:53:21 +0100
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-07-25 17:24:35 +0000
commit0e465a0d7adefc42b9399fcbde011d712785ad76 (patch)
treefb2bc3d7bf4c97f3545eb4e633d203004db94c20
parenta29bfd88f98f6acb0340a3ac500e95a5aee1d15e (diff)
downloadmongo-0e465a0d7adefc42b9399fcbde011d712785ad76.tar.gz
SERVER-67615 Stop using ErrorCategory::Interruption in the change stream pre-images purging job
-rw-r--r--src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp172
1 files changed, 89 insertions, 83 deletions
diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp
index 5be57605e9d..81931be466c 100644
--- a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp
+++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp
@@ -325,100 +325,106 @@ private:
void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTimeBasedExpiration) {
const auto startTime = Date_t::now();
- auto opCtx = client->makeOperationContext();
-
- // Acquire intent-exclusive lock on the pre-images collection. Early exit if the collection
- // doesn't exist.
- AutoGetCollection autoColl(
- opCtx.get(), NamespaceString::kChangeStreamPreImagesNamespace, MODE_IX);
- const auto& preImagesColl = autoColl.getCollection();
- if (!preImagesColl) {
- return;
- }
-
- // Do not run the job on secondaries.
- if (!repl::ReplicationCoordinator::get(opCtx.get())
- ->canAcceptWritesForDatabase(opCtx.get(), NamespaceString::kAdminDb)) {
- return;
- }
+ ServiceContext::UniqueOperationContext opCtx;
+ try {
+ opCtx = client->makeOperationContext();
+
+ // Acquire intent-exclusive lock on the pre-images collection. Early exit if the collection
+ // doesn't exist.
+ AutoGetCollection autoColl(
+ opCtx.get(), NamespaceString::kChangeStreamPreImagesNamespace, MODE_IX);
+ const auto& preImagesColl = autoColl.getCollection();
+ if (!preImagesColl) {
+ return;
+ }
- // Get the timestamp of the ealiest oplog entry.
- const auto currentEarliestOplogEntryTs =
- repl::StorageInterface::get(client->getServiceContext())
- ->getEarliestOplogTimestamp(opCtx.get());
+ // Do not run the job on secondaries.
+ if (!repl::ReplicationCoordinator::get(opCtx.get())
+ ->canAcceptWritesForDatabase(opCtx.get(), NamespaceString::kAdminDb)) {
+ return;
+ }
- const bool isBatchedRemoval = gBatchedExpiredChangeStreamPreImageRemoval.load();
- size_t numberOfRemovals = 0;
+ // Get the timestamp of the earliest oplog entry.
+ const auto currentEarliestOplogEntryTs =
+ repl::StorageInterface::get(client->getServiceContext())
+ ->getEarliestOplogTimestamp(opCtx.get());
- ChangeStreamExpiredPreImageIterator expiredPreImages(
- opCtx.get(),
- &preImagesColl,
- currentEarliestOplogEntryTs,
- ::mongo::preImageRemoverInternal::getPreImageExpirationTime(
- opCtx.get(), currentTimeForTimeBasedExpiration));
+ const bool isBatchedRemoval = gBatchedExpiredChangeStreamPreImageRemoval.load();
+ size_t numberOfRemovals = 0;
- for (const auto& collectionRange : expiredPreImages) {
- writeConflictRetry(
+ ChangeStreamExpiredPreImageIterator expiredPreImages(
opCtx.get(),
- "ChangeStreamExpiredPreImagesRemover",
- NamespaceString::kChangeStreamPreImagesNamespace.ns(),
- [&] {
- auto params = std::make_unique<DeleteStageParams>();
- params->isMulti = true;
-
- std::unique_ptr<BatchedDeleteStageParams> batchedDeleteParams;
- if (isBatchedRemoval) {
- batchedDeleteParams = std::make_unique<BatchedDeleteStageParams>();
- }
+ &preImagesColl,
+ currentEarliestOplogEntryTs,
+ ::mongo::preImageRemoverInternal::getPreImageExpirationTime(
+ opCtx.get(), currentTimeForTimeBasedExpiration));
+
+ for (const auto& collectionRange : expiredPreImages) {
+ writeConflictRetry(
+ opCtx.get(),
+ "ChangeStreamExpiredPreImagesRemover",
+ NamespaceString::kChangeStreamPreImagesNamespace.ns(),
+ [&] {
+ auto params = std::make_unique<DeleteStageParams>();
+ params->isMulti = true;
+
+ std::unique_ptr<BatchedDeleteStageParams> batchedDeleteParams;
+ if (isBatchedRemoval) {
+ batchedDeleteParams = std::make_unique<BatchedDeleteStageParams>();
+ }
- auto exec = InternalPlanner::deleteWithCollectionScan(
- opCtx.get(),
- &preImagesColl,
- std::move(params),
- PlanYieldPolicy::YieldPolicy::YIELD_AUTO,
- InternalPlanner::Direction::FORWARD,
- RecordIdBound(collectionRange.first),
- RecordIdBound(collectionRange.second),
- CollectionScanParams::ScanBoundInclusion::kIncludeBothStartAndEndRecords,
- std::move(batchedDeleteParams));
- numberOfRemovals += exec->executeDelete();
- });
- }
+ auto exec = InternalPlanner::deleteWithCollectionScan(
+ opCtx.get(),
+ &preImagesColl,
+ std::move(params),
+ PlanYieldPolicy::YieldPolicy::YIELD_AUTO,
+ InternalPlanner::Direction::FORWARD,
+ RecordIdBound(collectionRange.first),
+ RecordIdBound(collectionRange.second),
+ CollectionScanParams::ScanBoundInclusion::kIncludeBothStartAndEndRecords,
+ std::move(batchedDeleteParams));
+ numberOfRemovals += exec->executeDelete();
+ });
+ }
- if (numberOfRemovals > 0) {
- LOGV2_DEBUG(5869104,
- 3,
- "Periodic expired pre-images removal job finished executing",
- "numberOfRemovals"_attr = numberOfRemovals,
- "jobDuration"_attr = (Date_t::now() - startTime).toString());
+ if (numberOfRemovals > 0) {
+ LOGV2_DEBUG(5869104,
+ 3,
+ "Periodic expired pre-images removal job finished executing",
+ "numberOfRemovals"_attr = numberOfRemovals,
+ "jobDuration"_attr = (Date_t::now() - startTime).toString());
+ }
+ } catch (const DBException& exception) {
+ if (opCtx && opCtx.get()->getKillStatus() != ErrorCodes::OK) {
+ LOGV2_DEBUG(5869105,
+ 3,
+ "Periodic expired pre-images removal job operation was killed",
+ "errorCode"_attr = opCtx.get()->getKillStatus());
+ } else {
+ LOGV2_ERROR(5869106,
+ "Periodic expired pre-images removal job failed",
+ "reason"_attr = exception.reason());
+ }
}
}
void performExpiredChangeStreamPreImagesRemovalPass(Client* client) {
- try {
- Date_t currentTimeForTimeBasedExpiration = Date_t::now();
-
- changeStreamPreImageRemoverCurrentTime.execute([&](const BSONObj& data) {
- // Populate the current time for time based expiration of pre-images.
- if (auto currentTimeElem = data["currentTimeForTimeBasedExpiration"]) {
- const BSONType bsonType = currentTimeElem.type();
- tassert(5869300,
- str::stream() << "Expected type for 'currentTimeForTimeBasedExpiration' is "
- "'date', but found: "
- << bsonType,
- bsonType == BSONType::Date);
-
- currentTimeForTimeBasedExpiration = currentTimeElem.Date();
- }
- });
- deleteExpiredChangeStreamPreImages(client, currentTimeForTimeBasedExpiration);
- } catch (const ExceptionForCat<ErrorCategory::Interruption>&) {
- LOGV2_WARNING(5869105, "Periodic expired pre-images removal job was interrupted");
- } catch (const DBException& exception) {
- LOGV2_ERROR(5869106,
- "Periodic expired pre-images removal job failed",
- "reason"_attr = exception.reason());
- }
+ Date_t currentTimeForTimeBasedExpiration = Date_t::now();
+
+ changeStreamPreImageRemoverCurrentTime.execute([&](const BSONObj& data) {
+ // Populate the current time for time based expiration of pre-images.
+ if (auto currentTimeElem = data["currentTimeForTimeBasedExpiration"]) {
+ const BSONType bsonType = currentTimeElem.type();
+ tassert(5869300,
+ str::stream() << "Expected type for 'currentTimeForTimeBasedExpiration' is "
+ "'date', but found: "
+ << bsonType,
+ bsonType == BSONType::Date);
+
+ currentTimeForTimeBasedExpiration = currentTimeElem.Date();
+ }
+ });
+ deleteExpiredChangeStreamPreImages(client, currentTimeForTimeBasedExpiration);
}
} // namespace