summaryrefslogtreecommitdiff
path: root/app/services/ci/update_build_queue_service.rb
blob: 5a011a8cac629bc7fabedbfd636467686e15e040 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# frozen_string_literal: true

module Ci
  ##
  # Keeps the pending builds queue and the shared runner build tracking
  # records in step with build state transitions, and ticks the runners
  # available to a build's project.
  #
  # The queue-maintaining operations (+push+, +pop+, +track+, +untrack+) are
  # no-ops unless +::Ci::PendingBuild.maintain_denormalized_data?+ is truthy.
  # Each of them returns whatever +transition.within_transaction+ returns.
  #
  class UpdateBuildQueueService
    # Raised when an operation is called with a transition whose source or
    # target state does not match the one the operation requires.
    InvalidQueueTransition = Class.new(StandardError)

    attr_reader :metrics

    ##
    # +metrics+ is the collaborator that receives +increment_queue_operation+,
    # +observe_active_runners+ and +increment_runner_tick+ calls.
    #
    def initialize(metrics = ::Gitlab::Ci::Queue::Metrics)
      @metrics = metrics
    end

    ##
    # Create a queuing entry for a build that is transitioning to 'pending'.
    #
    # Raises InvalidQueueTransition when the transition does not target
    # 'pending'. Inside the transaction the block evaluates to nil when the
    # creation result is empty, otherwise to the first cell of the first
    # result row.
    #
    def push(build, transition)
      return unless maintain_pending_builds_queue?

      assert_state(transition.to, 'pending')

      transition.within_transaction do
        entry = build.create_queuing_entry!
        next if entry.empty?

        metrics.increment_queue_operation(:build_queue_push)
        entry.rows.dig(0, 0)
      end
    end

    ##
    # Delete every queuing entry of a build that is leaving 'pending'.
    #
    # Raises InvalidQueueTransition when the transition does not start from
    # 'pending'. Inside the transaction the block evaluates to the build id
    # when at least one entry was deleted, otherwise to nil.
    #
    def pop(build, transition)
      return unless maintain_pending_builds_queue?

      assert_state(transition.from, 'pending')

      transition.within_transaction do
        deleted = build.all_queuing_entries.delete_all
        next unless deleted > 0

        metrics.increment_queue_operation(:build_queue_pop)
        build.id
      end
    end

    ##
    # Upsert the tracking entry of a shared runner build that is
    # transitioning to 'running' (used for queuing).
    #
    # Does nothing for builds that are not shared runner builds. Raises
    # InvalidQueueTransition when the transition does not target 'running'.
    #
    def track(build, transition)
      return unless maintain_pending_builds_queue? && build.shared_runner_build?

      assert_state(transition.to, 'running')

      transition.within_transaction do
        upserted = ::Ci::RunningBuild.upsert_shared_runner_build!(build)
        next if upserted.empty?

        metrics.increment_queue_operation(:shared_runner_build_new)
        upserted.rows.dig(0, 0)
      end
    end

    ##
    # Delete the runtime tracking entries of a shared runner build that is
    # leaving 'running' (used for queuing).
    #
    # Does nothing for builds that are not shared runner builds. Raises
    # InvalidQueueTransition when the transition does not start from
    # 'running'.
    #
    def untrack(build, transition)
      return unless maintain_pending_builds_queue? && build.shared_runner_build?

      assert_state(transition.from, 'running')

      transition.within_transaction do
        deleted = build.all_runtime_metadata.delete_all
        next unless deleted > 0

        metrics.increment_queue_operation(:shared_runner_build_done)
        build.id
      end
    end

    ##
    # Tick every runner available to the build's project so that it can pick
    # the given build.
    #
    def tick(build)
      available_runners = build.project.all_available_runners

      tick_for(build, available_runners)
    end

    private

    # Narrows +runners+ with the +with_recent_runner_queue+ and +with_tags+
    # scopes, reports their number to the metrics and asks each of them to
    # pick +build+.
    def tick_for(build, runners)
      candidates = runners.with_recent_runner_queue.with_tags

      # The count is handed over as a lambda; it is up to the metrics
      # collaborator to decide whether (and when) to evaluate it.
      metrics.observe_active_runners(-> { candidates.to_a.size })

      candidates.each do |runner|
        metrics.increment_runner_tick(runner)
        runner.pick_build!(build)
      end
    end

    # Raises InvalidQueueTransition unless +actual+ equals +expected+.
    def assert_state(actual, expected)
      raise InvalidQueueTransition unless actual == expected
    end

    def maintain_pending_builds_queue?
      ::Ci::PendingBuild.maintain_denormalized_data?
    end
  end
end