summaryrefslogtreecommitdiff
path: root/app/models/projects/build_artifacts_size_refresh.rb
blob: b791cb1254c3c58c23aa86b83ecd9f37ad5b78f4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# frozen_string_literal: true

module Projects
  # Tracks a pending or in-progress refresh of a project's
  # `build_artifacts_size` statistics counter.
  #
  # A record is inserted by `.enqueue_refresh`, picked up (and locked) by
  # `.process_next_refresh!`, advanced through the state machine below while
  # job artifacts are summed in batches (see `#next_batch`), and finally
  # destroyed by `#finalize!`.
  class BuildArtifactsSizeRefresh < ApplicationRecord
    include AfterCommitQueue
    include BulkInsertSafe

    # A refresh still in the `running` state whose `updated_at` is older than
    # this window is treated as stale and becomes eligible to be picked up
    # again (see the `stale` scope).
    STALE_WINDOW = 2.hours

    # This delay is set to 10 minutes to accommodate any ongoing
    # deletion that might have happened.
    # The delete on the database may have been committed before
    # the refresh completed its batching. If the resulting decrement is
    # pushed into Redis after the refresh has ended, it would result in a net negative value.
    # The delay is needed to ensure this negative value is ignored.
    FINALIZE_DELAY = 10.minutes

    self.table_name = 'project_build_artifacts_size_refreshes'

    # Name of the project statistics counter attribute this refresh recomputes.
    COUNTER_ATTRIBUTE_NAME = :build_artifacts_size

    belongs_to :project

    validates :project, presence: true

    # The refresh of the project statistics counter is performed in 4 stages:
    # 1. created - The refresh is on the queue to be processed by Projects::RefreshBuildArtifactsSizeStatisticsWorker
    # 2. running - The refresh is ongoing. The project statistics counter switches to the temporary refresh counter key.
    #    Counter increments are deduplicated.
    # 3. pending - The refresh is pending to be picked up by Projects::RefreshBuildArtifactsSizeStatisticsWorker again.
    # 4. finalizing - The refresh has finished summing existing job artifact size into the refresh counter key.
    #    The sum will need to be moved into the counter key.
    STATES = {
      created: 1,
      running: 2,
      pending: 3,
      finalizing: 4
    }.freeze

    state_machine :state, initial: :created do
      # created -> running <-> pending
      #            running  -> running    (a stale refresh being picked up again)
      #            running  -> finalizing
      state :created, value: STATES[:created]
      state :running, value: STATES[:running]
      state :pending, value: STATES[:pending]
      state :finalizing, value: STATES[:finalizing]

      # Start (or resume) processing. `running` is an allowed source state so
      # that a stale running refresh can be processed again.
      event :process do
        transition [:created, :pending, :running] => :running
      end

      # Park the refresh until it is picked up again. The first event argument
      # is stored as `last_job_artifact_id` (see the callback below).
      event :requeue do
        transition running: :pending
      end

      # All batches are done; hand over to the finalize worker.
      event :schedule_finalize do
        transition running: :finalizing
      end

      # First run only: reset the statistics and record where the refresh
      # started (time and the id of the project's last job artifact, if any).
      before_transition created: :running do |refresh|
        refresh.reset_project_statistics!
        refresh.refresh_started_at = Time.zone.now
        refresh.last_job_artifact_id_on_refresh_start = refresh.project.job_artifacts.last&.id
      end

      # Bump `updated_at` on every transition out of `running` (including
      # running -> running); the `stale` scope compares against this timestamp.
      before_transition running: any do |refresh|
        refresh.updated_at = Time.zone.now
      end

      # Remember the position passed as the first argument of the `requeue`
      # event so that the next batch can continue from there.
      before_transition running: :pending do |refresh, transition|
        refresh.last_job_artifact_id = transition.args.first
      end

      before_transition running: :finalizing do |refresh|
        refresh.schedule_finalize_worker
      end
    end

    # Running refreshes that have not been updated within STALE_WINDOW.
    scope :stale, -> { with_state(:running).where('updated_at < ?', STALE_WINDOW.ago) }
    # Everything that still needs a worker: created, pending, or stale running.
    scope :remaining, -> { with_state(:created, :pending).or(stale) }
    # `remaining`, ordered by state value descending, i.e. pending (3) first,
    # then stale running (2), then created (1).
    scope :processing_queue, -> { remaining.order(state: :desc) }

    after_destroy :schedule_namespace_aggregation_worker

    # Inserts one refresh record in the `created` state for each given project.
    #
    # `projects` may be a single project or a collection (it is wrapped with
    # `Array()`). Rows are written through `bulk_insert!` with
    # `skip_duplicates: true`; what counts as a duplicate is decided by that
    # helper / the database constraints, which are not visible in this file.
    def self.enqueue_refresh(projects)
      now = Time.zone.now

      records = Array(projects).map do |project|
        new(project: project, state: STATES[:created], created_at: now, updated_at: now)
      end

      bulk_insert!(records, skip_duplicates: true)
    end

    # Picks the next refresh from `processing_queue` and fires `process!` on it.
    #
    # The row is selected with `FOR UPDATE SKIP LOCKED` inside a transaction so
    # that rows currently locked by another transaction are skipped rather than
    # waited on.
    #
    # Returns the refresh that was picked, or nil when the queue had no
    # available row.
    def self.process_next_refresh!
      next_refresh = nil

      transaction do
        next_refresh = processing_queue
          .lock('FOR UPDATE SKIP LOCKED')
          .take

        next_refresh&.process!
      end

      next_refresh
    end

    # Asks the project statistics to initiate a refresh of the
    # COUNTER_ATTRIBUTE_NAME counter. Called when the refresh first starts.
    def reset_project_statistics!
      project.statistics.initiate_refresh!(COUNTER_ATTRIBUTE_NAME)
    end

    # Returns a relation with the next `limit` job artifacts (only `id` and
    # `size` selected) to be summed, ordered by id.
    #
    # The window is bounded by `last_job_artifact_id_on_refresh_start` and by
    # `last_job_artifact_id` (nil is coerced to 0 via `to_i`).
    # NOTE(review): whether `id_before` / `id_after` are inclusive is defined by
    # those scopes on the job artifact model, not here - confirm there.
    def next_batch(limit:)
      project.job_artifacts.select(:id, :size)
        .id_before(last_job_artifact_id_on_refresh_start)
        .id_after(last_job_artifact_id.to_i)
        .ordered_by_id
        .limit(limit)
    end

    # True once the refresh has left the `created` state.
    def started?
      !created?
    end

    # Finalizes the statistics refresh for COUNTER_ATTRIBUTE_NAME and then
    # destroys this record (which triggers the `after_destroy` callback).
    # Raises if the record cannot be destroyed (`destroy!`).
    def finalize!
      project.statistics.finalize_refresh(COUNTER_ATTRIBUTE_NAME)

      destroy!
    end

    # After the current transaction commits, enqueues the finalize worker to
    # run FINALIZE_DELAY from now, passing this class name and the record id.
    def schedule_finalize_worker
      run_after_commit do
        Projects::FinalizeProjectStatisticsRefreshWorker.perform_in(FINALIZE_DELAY, self.class.to_s, id)
      end
    end

    private

    # After the destroy commits, enqueues the namespace aggregation scheduling
    # worker for the project's namespace.
    def schedule_namespace_aggregation_worker
      run_after_commit do
        Namespaces::ScheduleAggregationWorker.perform_async(project.namespace_id)
      end
    end
  end
end