summaryrefslogtreecommitdiff
path: root/zuul/driver/git/gitwatcher.py
blob: 07161247df7c934fbb3ce9f6c44412b669c3a840 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# Copyright 2011 OpenStack, LLC.
# Copyright 2012 Hewlett-Packard Development Company, L.P.
# Copyright 2020 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import os
import logging
import threading

import git
from opentelemetry import trace

from zuul.driver.git.gitmodel import EMPTY_GIT_REF
from zuul.zk.event_queues import EventReceiverElection


# This class may be used by any driver to implement git head polling.
class GitWatcher(threading.Thread):
    log = logging.getLogger("zuul.connection.git.watcher")
    tracer = trace.get_tracer("zuul")

    def __init__(self, connection, baseurl, poll_delay, callback,
                 election_name="watcher"):
        """Watch for branch changes

        Watch every project listed in the connection and call a
        callback method with information about branch changes.

        :param zuul.Connection connection:
           The Connection to watch.
        :param str baseurl:
           The HTTP(S) URL where git repos are hosted.
        :param int poll_delay:
           The interval between polls.
        :param function callback:
           A callback method to be called for each updated ref.  The sole
           argument is a dictionary describing the update.
        :param str election_name:
           Name to use in the Zookeeper election of the watcher.
        """
        threading.Thread.__init__(self)
        self.daemon = True
        self.connection = connection
        self.baseurl = baseurl
        self.poll_delay = poll_delay
        self._stopped = False
        self._stop_event = threading.Event()
        self.projects_refs = {}
        self.callback = callback
        self.watcher_election = EventReceiverElection(
            connection.sched.zk_client,
            connection.connection_name,
            election_name)
        # This is used by the test framework
        self._event_count = 0
        self._pause = False

    def lsRemote(self, project):
        refs = {}
        client = git.cmd.Git()
        output = client.ls_remote(
            "--heads", "--tags",
            os.path.join(self.baseurl, project))
        for line in output.splitlines():
            sha, ref = line.split('\t')
            if ref.startswith('refs/'):
                refs[ref] = sha
        return refs

    def compareRefs(self, project, refs):
        events = []
        # Fetch previous refs state
        base_refs = self.projects_refs.get(project)
        # Create list of created refs
        rcreateds = set(refs.keys()) - set(base_refs.keys())
        # Create list of deleted refs
        rdeleteds = set(base_refs.keys()) - set(refs.keys())
        # Create the list of updated refs
        updateds = {}
        for ref, sha in refs.items():
            if ref in base_refs and base_refs[ref] != sha:
                updateds[ref] = sha
        for ref in rcreateds:
            event = {
                'project': project,
                'ref': ref,
                'branch_created': True,
                'oldrev': EMPTY_GIT_REF,
                'newrev': refs[ref]
            }
            events.append(event)
        for ref in rdeleteds:
            event = {
                'project': project,
                'ref': ref,
                'branch_deleted': True,
                'oldrev': base_refs[ref],
                'newrev': EMPTY_GIT_REF
            }
            events.append(event)
        for ref, sha in updateds.items():
            event = {
                'project': project,
                'ref': ref,
                'branch_updated': True,
                'oldrev': base_refs[ref],
                'newrev': sha
            }
            events.append(event)
        return events

    def _poll(self):
        self.log.debug("Walk through projects refs for connection: %s" %
                       self.connection.connection_name)
        for project in list(self.connection.projects)[:]:
            refs = self.lsRemote(project)
            self.log.debug("Read %s refs for project %s",
                           len(refs), project)
            if not self.projects_refs.get(project):
                # State for this project does not exist yet so add it.
                # No event will be triggered in this loop as
                # projects_refs['project'] and refs are equal
                self.projects_refs[project] = refs
            events = self.compareRefs(project, refs)
            self.projects_refs[project] = refs
            # Send events to the scheduler
            for event in events:
                with self.tracer.start_as_current_span("GitEvent"):
                    self.log.debug("Sending event: %s" % event)
                    self.callback(event)
                self._event_count += 1

    def _run(self):
        while not self._stopped and self.watcher_election.is_still_valid():
            if not self._pause:
                # during tests, a sub-class _poll method is used to send
                # notifications
                self._poll()
                # Polling wait delay
            else:
                self.log.debug("Watcher is on pause")
            self._stop_event.wait(self.poll_delay)

    def run(self):
        while not self._stopped:
            try:
                self.watcher_election.run(self._run)
            except Exception:
                self.log.exception("Unexpected issue in _run loop:")

    def stop(self):
        self.log.debug("Stopping watcher")
        self._stopped = True
        self._stop_event.set()
        self.watcher_election.cancel()