summaryrefslogtreecommitdiff
path: root/workhorse/internal/redis/keywatcher.go
diff options
context:
space:
mode:
Diffstat (limited to 'workhorse/internal/redis/keywatcher.go')
-rw-r--r--workhorse/internal/redis/keywatcher.go230
1 files changed, 149 insertions, 81 deletions
diff --git a/workhorse/internal/redis/keywatcher.go b/workhorse/internal/redis/keywatcher.go
index 82cb082f5f0..cdf6ccd7e83 100644
--- a/workhorse/internal/redis/keywatcher.go
+++ b/workhorse/internal/redis/keywatcher.go
@@ -15,61 +15,99 @@ import (
"gitlab.com/gitlab-org/gitlab/workhorse/internal/log"
)
-var (
- keyWatcher = make(map[string][]chan string)
- keyWatcherMutex sync.Mutex
- shutdown = make(chan struct{})
- redisReconnectTimeout = backoff.Backoff{
- //These are the defaults
- Min: 100 * time.Millisecond,
- Max: 60 * time.Second,
- Factor: 2,
- Jitter: true,
+type KeyWatcher struct {
+ mu sync.Mutex
+ subscribers map[string][]chan string
+ shutdown chan struct{}
+ reconnectBackoff backoff.Backoff
+ conn *redis.PubSubConn
+}
+
+func NewKeyWatcher() *KeyWatcher {
+ return &KeyWatcher{
+ shutdown: make(chan struct{}),
+ reconnectBackoff: backoff.Backoff{
+ Min: 100 * time.Millisecond,
+ Max: 60 * time.Second,
+ Factor: 2,
+ Jitter: true,
+ },
}
+}
+
+var (
keyWatchers = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "gitlab_workhorse_keywatcher_keywatchers",
Help: "The number of keys that is being watched by gitlab-workhorse",
},
)
+ redisSubscriptions = promauto.NewGauge(
+ prometheus.GaugeOpts{
+ Name: "gitlab_workhorse_keywatcher_redis_subscriptions",
+ Help: "Current number of keywatcher Redis pubsub subscriptions",
+ },
+ )
totalMessages = promauto.NewCounter(
prometheus.CounterOpts{
Name: "gitlab_workhorse_keywatcher_total_messages",
Help: "How many messages gitlab-workhorse has received in total on pubsub.",
},
)
+ totalActions = promauto.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "gitlab_workhorse_keywatcher_actions_total",
+ Help: "Counts of various keywatcher actions",
+ },
+ []string{"action"},
+ )
+ receivedBytes = promauto.NewCounter(
+ prometheus.CounterOpts{
+ Name: "gitlab_workhorse_keywatcher_received_bytes_total",
+ Help: "How many bytes of messages gitlab-workhorse has received in total on pubsub.",
+ },
+ )
)
-const (
- keySubChannel = "workhorse:notifications"
-)
+const channelPrefix = "workhorse:notifications:"
-// KeyChan holds a key and a channel
-type KeyChan struct {
- Key string
- Chan chan string
-}
+func countAction(action string) { totalActions.WithLabelValues(action).Add(1) }
-func processInner(conn redis.Conn) error {
- defer conn.Close()
- psc := redis.PubSubConn{Conn: conn}
- if err := psc.Subscribe(keySubChannel); err != nil {
- return err
- }
- defer psc.Unsubscribe(keySubChannel)
+func (kw *KeyWatcher) receivePubSubStream(conn redis.Conn) error {
+ kw.mu.Lock()
+ // We must share kw.conn with the goroutines that call SUBSCRIBE and
+ // UNSUBSCRIBE because Redis pubsub subscriptions are tied to the
+ // connection.
+ kw.conn = &redis.PubSubConn{Conn: conn}
+ kw.mu.Unlock()
+
+ defer func() {
+ kw.mu.Lock()
+ defer kw.mu.Unlock()
+ kw.conn.Close()
+ kw.conn = nil
+
+ // Reset kw.subscribers because it is tied to Redis server side state of
+ // kw.conn and we just closed that connection.
+ for _, chans := range kw.subscribers {
+ for _, ch := range chans {
+ close(ch)
+ keyWatchers.Dec()
+ }
+ }
+ kw.subscribers = nil
+ }()
for {
- switch v := psc.Receive().(type) {
+ switch v := kw.conn.Receive().(type) {
case redis.Message:
totalMessages.Inc()
- dataStr := string(v.Data)
- msg := strings.SplitN(dataStr, "=", 2)
- if len(msg) != 2 {
- log.WithError(fmt.Errorf("keywatcher: invalid notification: %q", dataStr)).Error()
- continue
+ receivedBytes.Add(float64(len(v.Data)))
+ if strings.HasPrefix(v.Channel, channelPrefix) {
+ kw.notifySubscribers(v.Channel[len(channelPrefix):], string(v.Data))
}
- key, value := msg[0], msg[1]
- notifyChanWatchers(key, value)
+ case redis.Subscription:
+ redisSubscriptions.Set(float64(v.Count))
case error:
log.WithError(fmt.Errorf("keywatcher: pubsub receive: %v", v)).Error()
// Intermittent error, return nil so that it doesn't wait before reconnect
@@ -94,72 +132,106 @@ func dialPubSub(dialer redisDialerFunc) (redis.Conn, error) {
return conn, nil
}
-// Process redis subscriptions
-//
-// NOTE: There Can Only Be One!
-func Process() {
+func (kw *KeyWatcher) Process() {
log.Info("keywatcher: starting process loop")
for {
conn, err := dialPubSub(workerDialFunc)
if err != nil {
log.WithError(fmt.Errorf("keywatcher: %v", err)).Error()
- time.Sleep(redisReconnectTimeout.Duration())
+ time.Sleep(kw.reconnectBackoff.Duration())
continue
}
- redisReconnectTimeout.Reset()
+ kw.reconnectBackoff.Reset()
- if err = processInner(conn); err != nil {
- log.WithError(fmt.Errorf("keywatcher: process loop: %v", err)).Error()
+ if err = kw.receivePubSubStream(conn); err != nil {
+ log.WithError(fmt.Errorf("keywatcher: receivePubSubStream: %v", err)).Error()
}
}
}
-func Shutdown() {
+func (kw *KeyWatcher) Shutdown() {
log.Info("keywatcher: shutting down")
- keyWatcherMutex.Lock()
- defer keyWatcherMutex.Unlock()
+ kw.mu.Lock()
+ defer kw.mu.Unlock()
select {
- case <-shutdown:
+ case <-kw.shutdown:
// already closed
default:
- close(shutdown)
+ close(kw.shutdown)
}
}
-func notifyChanWatchers(key, value string) {
- keyWatcherMutex.Lock()
- defer keyWatcherMutex.Unlock()
- if chanList, ok := keyWatcher[key]; ok {
- for _, c := range chanList {
- c <- value
- keyWatchers.Dec()
+func (kw *KeyWatcher) notifySubscribers(key, value string) {
+ kw.mu.Lock()
+ defer kw.mu.Unlock()
+
+ chanList, ok := kw.subscribers[key]
+ if !ok {
+ countAction("drop-message")
+ return
+ }
+
+ countAction("deliver-message")
+ for _, c := range chanList {
+ select {
+ case c <- value:
+ default:
}
- delete(keyWatcher, key)
}
}
-func addKeyChan(kc *KeyChan) {
- keyWatcherMutex.Lock()
- defer keyWatcherMutex.Unlock()
- keyWatcher[kc.Key] = append(keyWatcher[kc.Key], kc.Chan)
+func (kw *KeyWatcher) addSubscription(key string, notify chan string) error {
+ kw.mu.Lock()
+ defer kw.mu.Unlock()
+
+ if kw.conn == nil {
+ // This can happen because CI long polling is disabled in this Workhorse
+ // process. It can also be that we are waiting for the pubsub connection
+ // to be established. Either way it is OK to fail fast.
+ return errors.New("no redis connection")
+ }
+
+ if len(kw.subscribers[key]) == 0 {
+ countAction("create-subscription")
+ if err := kw.conn.Subscribe(channelPrefix + key); err != nil {
+ return err
+ }
+ }
+
+ if kw.subscribers == nil {
+ kw.subscribers = make(map[string][]chan string)
+ }
+ kw.subscribers[key] = append(kw.subscribers[key], notify)
keyWatchers.Inc()
+
+ return nil
}
-func delKeyChan(kc *KeyChan) {
- keyWatcherMutex.Lock()
- defer keyWatcherMutex.Unlock()
- if chans, ok := keyWatcher[kc.Key]; ok {
- for i, c := range chans {
- if kc.Chan == c {
- keyWatcher[kc.Key] = append(chans[:i], chans[i+1:]...)
- keyWatchers.Dec()
- break
- }
+func (kw *KeyWatcher) delSubscription(key string, notify chan string) {
+ kw.mu.Lock()
+ defer kw.mu.Unlock()
+
+ chans, ok := kw.subscribers[key]
+ if !ok {
+ // This can happen if the pubsub connection dropped while we were
+ // waiting.
+ return
+ }
+
+ for i, c := range chans {
+ if notify == c {
+ kw.subscribers[key] = append(chans[:i], chans[i+1:]...)
+ keyWatchers.Dec()
+ break
}
- if len(keyWatcher[kc.Key]) == 0 {
- delete(keyWatcher, kc.Key)
+ }
+ if len(kw.subscribers[key]) == 0 {
+ delete(kw.subscribers, key)
+ countAction("delete-subscription")
+ if kw.conn != nil {
+ kw.conn.Unsubscribe(channelPrefix + key)
}
}
}
@@ -179,15 +251,12 @@ const (
WatchKeyStatusNoChange
)
-// WatchKey waits for a key to be updated or expired
-func WatchKey(key, value string, timeout time.Duration) (WatchKeyStatus, error) {
- kw := &KeyChan{
- Key: key,
- Chan: make(chan string, 1),
+func (kw *KeyWatcher) WatchKey(key, value string, timeout time.Duration) (WatchKeyStatus, error) {
+ notify := make(chan string, 1)
+ if err := kw.addSubscription(key, notify); err != nil {
+ return WatchKeyStatusNoChange, err
}
-
- addKeyChan(kw)
- defer delKeyChan(kw)
+ defer kw.delSubscription(key, notify)
currentValue, err := GetString(key)
if errors.Is(err, redis.ErrNil) {
@@ -200,10 +269,10 @@ func WatchKey(key, value string, timeout time.Duration) (WatchKeyStatus, error)
}
select {
- case <-shutdown:
+ case <-kw.shutdown:
log.WithFields(log.Fields{"key": key}).Info("stopping watch due to shutdown")
return WatchKeyStatusNoChange, nil
- case currentValue := <-kw.Chan:
+ case currentValue := <-notify:
if currentValue == "" {
return WatchKeyStatusNoChange, fmt.Errorf("keywatcher: redis GET failed")
}
@@ -211,7 +280,6 @@ func WatchKey(key, value string, timeout time.Duration) (WatchKeyStatus, error)
return WatchKeyStatusNoChange, nil
}
return WatchKeyStatusSeenChange, nil
-
case <-time.After(timeout):
return WatchKeyStatusTimeout, nil
}