diff options
author | GitLab Bot <gitlab-bot@gitlab.com> | 2022-08-19 15:11:58 +0000 |
---|---|---|
committer | GitLab Bot <gitlab-bot@gitlab.com> | 2022-08-19 15:11:58 +0000 |
commit | 4c083c816333ef903fe7c32f412eaa53d7b959d3 (patch) | |
tree | 199c0a0034a2620374a92a47762bf4a4c07be7ca /workhorse | |
parent | 35d5ae4e3de6444c02725b965ef59774d6256d8e (diff) | |
download | gitlab-ce-4c083c816333ef903fe7c32f412eaa53d7b959d3.tar.gz |
Add latest changes from gitlab-org/gitlab@master
Diffstat (limited to 'workhorse')
-rw-r--r-- | workhorse/internal/redis/keywatcher.go | 67 |
1 files changed, 48 insertions, 19 deletions
diff --git a/workhorse/internal/redis/keywatcher.go b/workhorse/internal/redis/keywatcher.go index 82cb082f5f0..20e86daf5af 100644 --- a/workhorse/internal/redis/keywatcher.go +++ b/workhorse/internal/redis/keywatcher.go @@ -38,6 +38,19 @@ var ( Help: "How many messages gitlab-workhorse has received in total on pubsub.", }, ) + totalActions = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "gitlab_workhorse_keywatcher_actions_total", + Help: "Counts of various keywatcher actions", + }, + []string{"action"}, + ) + receivedBytes = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "gitlab_workhorse_keywatcher_received_bytes_total", + Help: "How many bytes of messages gitlab-workhorse has received in total on pubsub.", + }, + ) ) const ( @@ -50,6 +63,8 @@ type KeyChan struct { Chan chan string } +func countAction(action string) { totalActions.WithLabelValues(action).Add(1) } + func processInner(conn redis.Conn) error { defer conn.Close() psc := redis.PubSubConn{Conn: conn} @@ -63,13 +78,13 @@ func processInner(conn redis.Conn) error { case redis.Message: totalMessages.Inc() dataStr := string(v.Data) + receivedBytes.Add(float64(len(dataStr))) msg := strings.SplitN(dataStr, "=", 2) if len(msg) != 2 { log.WithError(fmt.Errorf("keywatcher: invalid notification: %q", dataStr)).Error() continue } - key, value := msg[0], msg[1] - notifyChanWatchers(key, value) + notifyChanWatchers(msg[0], msg[1]) case error: log.WithError(fmt.Errorf("keywatcher: pubsub receive: %v", v)).Error() // Intermittent error, return nil so that it doesn't wait before reconnect @@ -131,37 +146,52 @@ func Shutdown() { func notifyChanWatchers(key, value string) { keyWatcherMutex.Lock() defer keyWatcherMutex.Unlock() - if chanList, ok := keyWatcher[key]; ok { - for _, c := range chanList { - c <- value - keyWatchers.Dec() - } - delete(keyWatcher, key) + + chanList, ok := keyWatcher[key] + if !ok { + countAction("drop-message") + return + } + + countAction("deliver-message") + for _, c := range chanList { + c <- value + keyWatchers.Dec() } + delete(keyWatcher, key) } func addKeyChan(kc *KeyChan) { keyWatcherMutex.Lock() defer keyWatcherMutex.Unlock() + keyWatcher[kc.Key] = append(keyWatcher[kc.Key], kc.Chan) keyWatchers.Inc() + if len(keyWatcher[kc.Key]) == 1 { + countAction("create-subscription") + } } func delKeyChan(kc *KeyChan) { keyWatcherMutex.Lock() defer keyWatcherMutex.Unlock() - if chans, ok := keyWatcher[kc.Key]; ok { - for i, c := range chans { - if kc.Chan == c { - keyWatcher[kc.Key] = append(chans[:i], chans[i+1:]...) - keyWatchers.Dec() - break - } - } - if len(keyWatcher[kc.Key]) == 0 { - delete(keyWatcher, kc.Key) + + chans, ok := keyWatcher[kc.Key] + if !ok { + return + } + + for i, c := range chans { + if kc.Chan == c { + keyWatcher[kc.Key] = append(chans[:i], chans[i+1:]...) + keyWatchers.Dec() + break } } + if len(keyWatcher[kc.Key]) == 0 { + delete(keyWatcher, kc.Key) + countAction("delete-subscription") + } } // WatchKeyStatus is used to tell how WatchKey returned @@ -211,7 +241,6 @@ func WatchKey(key, value string, timeout time.Duration) (WatchKeyStatus, error) return WatchKeyStatusNoChange, nil } return WatchKeyStatusSeenChange, nil - case <-time.After(timeout): return WatchKeyStatusTimeout, nil } |