diff options
Diffstat (limited to 'app/workers/pipeline_schedule_worker.rb')
-rw-r--r-- | app/workers/pipeline_schedule_worker.rb | 43 |
1 files changed, 38 insertions, 5 deletions
diff --git a/app/workers/pipeline_schedule_worker.rb b/app/workers/pipeline_schedule_worker.rb index 5a53d53ccf9..fb843bd421c 100644 --- a/app/workers/pipeline_schedule_worker.rb +++ b/app/workers/pipeline_schedule_worker.rb @@ -6,19 +6,52 @@ class PipelineScheduleWorker # rubocop:disable Scalability/IdempotentWorker data_consistency :always include CronjobQueue + include ::Gitlab::ExclusiveLeaseHelpers + + LOCK_RETRY = 3 + LOCK_TTL = 5.minutes feature_category :continuous_integration worker_resource_boundary :cpu def perform - Ci::PipelineSchedule.runnable_schedules.preloaded.find_in_batches do |schedules| - schedules.each do |schedule| - next unless schedule.project + if Feature.enabled?(:ci_use_run_pipeline_schedule_worker) + in_lock(lock_key, **lock_params) do + Ci::PipelineSchedule + .select(:id, :owner_id, :project_id) # Minimize the selected columns + .runnable_schedules + .preloaded + .find_in_batches do |schedules| + RunPipelineScheduleWorker.bulk_perform_async_with_contexts( + schedules, + arguments_proc: ->(schedule) { [schedule.id, schedule.owner_id] }, + context_proc: ->(schedule) { { project: schedule.project, user: schedule.owner } } + ) + end + end + else + Ci::PipelineSchedule.runnable_schedules.preloaded.find_in_batches do |schedules| + schedules.each do |schedule| + next unless schedule.project - with_context(project: schedule.project, user: schedule.owner) do - Ci::PipelineScheduleService.new(schedule.project, schedule.owner).execute(schedule) + with_context(project: schedule.project, user: schedule.owner) do + Ci::PipelineScheduleService.new(schedule.project, schedule.owner).execute(schedule) + end end end end end + + private + + def lock_key + self.class.name.underscore + end + + def lock_params + { + ttl: LOCK_TTL, + retries: LOCK_RETRY + } + end end |