path: root/workhorse/internal/queueing/queue.go
blob: c8d322803551439a7c6b21014e2f64f0ed4dd892 (plain)
package queueing

import (
	"errors"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

type errTooManyRequests struct{ error }
type errQueueingTimedout struct{ error }

// ErrTooManyRequests is returned by Acquire when the queue is already full.
var ErrTooManyRequests = &errTooManyRequests{errors.New("too many requests queued")}

// ErrQueueingTimedout is returned by Acquire when a request waits in the queue
// longer than the configured timeout.
var ErrQueueingTimedout = &errQueueingTimedout{errors.New("queueing timedout")}

type queueMetrics struct {
	queueingLimit        prometheus.Gauge
	queueingQueueLimit   prometheus.Gauge
	queueingQueueTimeout prometheus.Gauge
	queueingBusy         prometheus.Gauge
	queueingWaiting      prometheus.Gauge
	queueingWaitingTime  prometheus.Histogram
	queueingErrors       *prometheus.CounterVec
}

// newQueueMetrics prepares the Prometheus metrics for the queueing mechanism.
// name is the name of the queue, used to label every metric with the ConstLabel `queue_name`.
// timeout is the maximum time a request may be held in the queue; queueMetrics
// uses it to derive the histogram buckets of the
// gitlab_workhorse_queueing_waiting_time metric.
func newQueueMetrics(name string, timeout time.Duration, reg prometheus.Registerer) *queueMetrics {
	waitingTimeBuckets := []float64{
		timeout.Seconds() * 0.01,
		timeout.Seconds() * 0.05,
		timeout.Seconds() * 0.10,
		timeout.Seconds() * 0.25,
		timeout.Seconds() * 0.50,
		timeout.Seconds() * 0.75,
		timeout.Seconds() * 0.90,
		timeout.Seconds() * 0.95,
		timeout.Seconds() * 0.99,
		timeout.Seconds(),
	}
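	// For example, with a 30-second timeout the buckets above work out to
	// 0.3, 1.5, 3, 7.5, 15, 22.5, 27, 28.5, 29.7 and 30 seconds.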

	promFactory := promauto.With(reg)
	metrics := &queueMetrics{
		queueingLimit: promFactory.NewGauge(prometheus.GaugeOpts{
			Name: "gitlab_workhorse_queueing_limit",
			Help: "Current limit set for the queueing mechanism",
			ConstLabels: prometheus.Labels{
				"queue_name": name,
			},
		}),

		queueingQueueLimit: promFactory.NewGauge(prometheus.GaugeOpts{
			Name: "gitlab_workhorse_queueing_queue_limit",
			Help: "Current queueLimit set for the queueing mechanism",
			ConstLabels: prometheus.Labels{
				"queue_name": name,
			},
		}),

		queueingQueueTimeout: promFactory.NewGauge(prometheus.GaugeOpts{
			Name: "gitlab_workhorse_queueing_queue_timeout",
			Help: "Current queueTimeout set for the queueing mechanism",
			ConstLabels: prometheus.Labels{
				"queue_name": name,
			},
		}),

		queueingBusy: promFactory.NewGauge(prometheus.GaugeOpts{
			Name: "gitlab_workhorse_queueing_busy",
			Help: "How many queued requests are now processed",
			ConstLabels: prometheus.Labels{
				"queue_name": name,
			},
		}),

		queueingWaiting: promFactory.NewGauge(prometheus.GaugeOpts{
			Name: "gitlab_workhorse_queueing_waiting",
			Help: "How many requests are now queued",
			ConstLabels: prometheus.Labels{
				"queue_name": name,
			},
		}),

		queueingWaitingTime: promFactory.NewHistogram(prometheus.HistogramOpts{
			Name: "gitlab_workhorse_queueing_waiting_time",
			Help: "How many time a request spent in queue",
			ConstLabels: prometheus.Labels{
				"queue_name": name,
			},
			Buckets: waitingTimeBuckets,
		}),

		queueingErrors: promFactory.NewCounterVec(
			prometheus.CounterOpts{
				Name: "gitlab_workhorse_queueing_errors",
				Help: "How many times the TooManyRequests or QueueintTimedout errors were returned while queueing, partitioned by error type",
				ConstLabels: prometheus.Labels{
					"queue_name": name,
				},
			},
			[]string{"type"},
		),
	}

	return metrics
}

// Queue limits how many requests are processed concurrently and how many may
// wait for a processing slot, and records Prometheus metrics about both.
type Queue struct {
	*queueMetrics

	name      string
	busyCh    chan struct{}
	waitingCh chan time.Time
	timeout   time.Duration
}

// newQueue creates a new queue.
// name specifies the name used to label the queue metrics.
// limit specifies how many requests may be processed concurrently.
// queueLimit specifies the maximum number of requests that can be queued.
// timeout specifies how long a request may be held in the queue when the
// number of in-flight requests is at the limit.
func newQueue(name string, limit, queueLimit uint, timeout time.Duration, reg prometheus.Registerer) *Queue {
	queue := &Queue{
		name:      name,
		busyCh:    make(chan struct{}, limit),
		waitingCh: make(chan time.Time, limit+queueLimit),
		timeout:   timeout,
	}

	queue.queueMetrics = newQueueMetrics(name, timeout, reg)
	queue.queueingLimit.Set(float64(limit))
	queue.queueingQueueLimit.Set(float64(queueLimit))
	queue.queueingQueueTimeout.Set(timeout.Seconds())

	return queue
}
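
// runLimited is a hypothetical usage sketch (it is not referenced anywhere in
// this package): it shows how a caller that owns a long-lived *Queue might
// bound concurrent work with it. The queue would be constructed once, for
// example with newQueue("example", 2, 10, 30*time.Second,
// prometheus.NewRegistry()), where the name, limits, timeout and registry are
// placeholder values.
func runLimited(q *Queue, work func()) error {
	// Acquire returns immediately when a processing slot is free, waits up
	// to the queue timeout for one, or fails fast when the queue is full.
	if err := q.Acquire(); err != nil {
		return err // ErrTooManyRequests or ErrQueueingTimedout
	}
	defer q.Release()

	work()
	return nil
}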

// Acquire takes one slot from the Queue and returns when the request should
// be processed. It allows up to limit requests to run at a time and up to
// queueLimit requests to wait for a free slot.
func (s *Queue) Acquire() (err error) {
	// push item to a queue to claim your own slot (non-blocking)
	select {
	case s.waitingCh <- time.Now():
		s.queueingWaiting.Inc()
	default:
		s.queueingErrors.WithLabelValues("too_many_requests").Inc()
		return ErrTooManyRequests
	}

	// If no processing slot is obtained below, give the waiting slot back
	// and record how long the request spent in the queue.
	defer func() {
		if err != nil {
			waitStarted := <-s.waitingCh
			s.queueingWaiting.Dec()
			s.queueingWaitingTime.Observe(time.Since(waitStarted).Seconds())
		}
	}()

	// fast path: claim a processing slot (non-blocking)
	select {
	case s.busyCh <- struct{}{}:
		s.queueingBusy.Inc()
		return nil
	default:
	}

	timer := time.NewTimer(s.timeout)
	defer timer.Stop()

	// claim a processing slot, waiting up to the queue timeout (blocking)
	select {
	case s.busyCh <- struct{}{}:
		s.queueingBusy.Inc()
		return nil

	case <-timer.C:
		s.queueingErrors.WithLabelValues("queueing_timedout").Inc()
		return ErrQueueingTimedout
	}
}
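
// A caller in an HTTP context will typically want to map the two failure
// modes of Acquire to different status codes. A hypothetical middleware
// sketch (not part of this package; the 429/503 mapping and the names w, r,
// next and queue are assumptions) could look like:
//
//	if err := queue.Acquire(); err != nil {
//		switch err {
//		case ErrTooManyRequests:
//			http.Error(w, "Too Many Requests", http.StatusTooManyRequests)
//		case ErrQueueingTimedout:
//			http.Error(w, "Service Unavailable", http.StatusServiceUnavailable)
//		}
//		return
//	}
//	defer queue.Release()
//	next.ServeHTTP(w, r)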

// Release marks the end of processing for a request.
// It allows the next queued request, if any, to be processed.
func (s *Queue) Release() {
	// dequeue from queue to allow next request to be processed
	waitStarted := <-s.waitingCh
	s.queueingWaiting.Dec()
	s.queueingWaitingTime.Observe(time.Since(waitStarted).Seconds())

	<-s.busyCh
	s.queueingBusy.Dec()
}