1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
# frozen_string_literal: true
module Projects
  # Tracks one refresh (recalculation) of a project's build artifacts size
  # statistic. Each record is stored in `project_build_artifacts_size_refreshes`
  # and moves through the states listed in STATES until #finalize! destroys it.
  class BuildArtifactsSizeRefresh < ApplicationRecord
    include AfterCommitQueue
    include BulkInsertSafe

    # A refresh that is still `running` but whose `updated_at` is older than
    # this window is considered stale (see the `stale` scope) and becomes
    # eligible to be picked up again.
    STALE_WINDOW = 2.hours

    # This delay is set to 10 minutes to accommodate any ongoing
    # deletion that might have happened.
    # The delete on the database may have been committed before
    # the refresh completed its batching. If the resulting decrement is
    # pushed into Redis after the refresh has ended, it would result in net negative value.
    # The delay is needed to ensure this negative value is ignored.
    FINALIZE_DELAY = 10.minutes

    self.table_name = 'project_build_artifacts_size_refreshes'

    # Name of the project statistics counter this refresh operates on.
    COUNTER_ATTRIBUTE_NAME = :build_artifacts_size

    belongs_to :project

    validates :project, presence: true

    # The refresh of the project statistics counter is performed in 4 stages:
    # 1. created - The refresh is on the queue to be processed by Projects::RefreshBuildArtifactsSizeStatisticsWorker
    # 2. running - The refresh is ongoing. The project statistics counter switches to the temporary refresh counter key.
    #    Counter increments are deduplicated.
    # 3. pending - The refresh is pending to be picked up by Projects::RefreshBuildArtifactsSizeStatisticsWorker again.
    # 4. finalizing - The refresh has finished summing existing job artifact size into the refresh counter key.
    #    The sum will need to be moved into the counter key.
    STATES = {
      created: 1,
      running: 2,
      pending: 3,
      finalizing: 4
    }.freeze

    state_machine :state, initial: :created do
      # created -> running <-> pending
      #            running  -> finalizing
      state :created, value: STATES[:created]
      state :running, value: STATES[:running]
      state :pending, value: STATES[:pending]
      state :finalizing, value: STATES[:finalizing]

      # `running` is an allowed source state so that a stale running refresh
      # can be processed again.
      event :process do
        transition [:created, :pending, :running] => :running
      end

      event :requeue do
        transition running: :pending
      end

      event :schedule_finalize do
        transition running: :finalizing
      end

      # First pick-up only: reset the statistics counter and record when the
      # refresh started and which job artifact was the last one at that moment.
      before_transition created: :running do |refresh|
        refresh.reset_project_statistics!
        refresh.refresh_started_at = Time.zone.now
        refresh.last_job_artifact_id_on_refresh_start = refresh.project.job_artifacts.last&.id
      end

      # Bump `updated_at` on every transition out of `running` (including
      # running -> running); the `stale` scope compares against this column.
      before_transition running: any do |refresh|
        refresh.updated_at = Time.zone.now
      end

      # The first argument given to the `requeue` event is stored as the
      # cursor that #next_batch resumes from.
      before_transition running: :pending do |refresh, transition|
        refresh.last_job_artifact_id = transition.args.first
      end

      before_transition running: :finalizing do |refresh|
        refresh.schedule_finalize_worker
      end
    end

    # Running refreshes that have not been updated within STALE_WINDOW.
    scope :stale, -> { with_state(:running).where('updated_at < ?', STALE_WINDOW.ago) }
    # Everything that still needs processing: created, pending, or stale running.
    scope :remaining, -> { with_state(:created, :pending).or(stale) }
    # `remaining` ordered by state value descending, i.e. pending (3) first,
    # then stale running (2), then created (1).
    scope :processing_queue, -> { remaining.order(state: :desc) }

    after_destroy :schedule_namespace_aggregation_worker

    # Builds one refresh in the `created` state per given project and
    # bulk-inserts them with `skip_duplicates: true`.
    # Accepts a single project or a collection (coerced with Array()).
    # NOTE(review): presumably `skip_duplicates` ignores projects that already
    # have a refresh row - confirm against BulkInsertSafe.
    def self.enqueue_refresh(projects)
      now = Time.zone.now

      records = Array(projects).map do |project|
        new(project: project, state: STATES[:created], created_at: now, updated_at: now)
      end

      bulk_insert!(records, skip_duplicates: true)
    end

    # Inside a transaction, takes the first row of `processing_queue` using
    # `FOR UPDATE SKIP LOCKED` and fires the `process` event on it.
    # Returns the refresh that was picked, or nil when none was available.
    def self.process_next_refresh!
      next_refresh = nil

      transaction do
        next_refresh = processing_queue
          .lock('FOR UPDATE SKIP LOCKED')
          .take

        next_refresh&.process!
      end

      next_refresh
    end

    # Calls `initiate_refresh!` on the project's statistics for the
    # build artifacts size counter.
    def reset_project_statistics!
      project.statistics.initiate_refresh!(COUNTER_ATTRIBUTE_NAME)
    end

    # Returns up to `limit` job artifacts of the project (only `id` and `size`
    # are selected), ordered by id, bounded by the artifact id recorded at
    # refresh start and the last processed artifact id.
    # `last_job_artifact_id.to_i` turns a nil cursor into 0 for the first batch.
    # NOTE(review): whether `id_before` / `id_after` are inclusive depends on
    # scopes defined outside this file - confirm.
    def next_batch(limit:)
      project.job_artifacts.select(:id, :size)
        .id_before(last_job_artifact_id_on_refresh_start)
        .id_after(last_job_artifact_id.to_i)
        .ordered_by_id
        .limit(limit)
    end

    # True once the refresh has left the `created` state.
    def started?
      !created?
    end

    # Calls `finalize_refresh` on the project's statistics for the counter and
    # then destroys this record. Raises if the destroy fails (`destroy!`).
    def finalize!
      project.statistics.finalize_refresh(COUNTER_ATTRIBUTE_NAME)

      destroy!
    end

    # After the current transaction commits, enqueues
    # Projects::FinalizeProjectStatisticsRefreshWorker to run in FINALIZE_DELAY,
    # passing this record's class name and id.
    def schedule_finalize_worker
      run_after_commit do
        Projects::FinalizeProjectStatisticsRefreshWorker.perform_in(FINALIZE_DELAY, self.class.to_s, id)
      end
    end

    private

    # after_destroy hook: once the transaction commits, enqueues
    # Namespaces::ScheduleAggregationWorker for the project's namespace.
    def schedule_namespace_aggregation_worker
      run_after_commit do
        Namespaces::ScheduleAggregationWorker.perform_async(project.namespace_id)
      end
    end
  end
end
|