diff options
author | James E. Blair <jeblair@redhat.com> | 2019-09-06 14:50:05 -0700 |
---|---|---|
committer | James E. Blair <jeblair@redhat.com> | 2019-09-17 14:15:18 -0700 |
commit | e78e948284392477d385d493fc9ec194d544483f (patch) | |
tree | facf580009af1a9587cae68b570120736588e603 /zuul | |
parent | 48aa3ebd98c60b10f2a1c6c02311203564fa6b04 (diff) | |
download | zuul-e78e948284392477d385d493fc9ec194d544483f.tar.gz |
Add support for the Gerrit checks plugin
This adds initial support for the Gerrit checks plugin.
Development of that plugin is still in progress, and hopefully it
(and our support for it) will change over time. Because we expect
to change how we interact with it in the near future, this is
documented as experimental support for now. A release note is
intentionally omitted -- that's more appropriate when we remove
the 'experimental' label.
Change-Id: Ida0cdef682ca2ce117617eacfb67f371426a3131
Diffstat (limited to 'zuul')
-rw-r--r-- | zuul/driver/gerrit/__init__.py | 19 | ||||
-rw-r--r-- | zuul/driver/gerrit/gerritconnection.py | 213 | ||||
-rw-r--r-- | zuul/driver/gerrit/gerritmodel.py | 18 | ||||
-rw-r--r-- | zuul/driver/gerrit/gerritreporter.py | 2 | ||||
-rw-r--r-- | zuul/driver/gerrit/gerrittrigger.py | 7 |
5 files changed, 237 insertions, 22 deletions
diff --git a/zuul/driver/gerrit/__init__.py b/zuul/driver/gerrit/__init__.py index c720ba6a3..4930c7631 100644 --- a/zuul/driver/gerrit/__init__.py +++ b/zuul/driver/gerrit/__init__.py @@ -18,12 +18,31 @@ from zuul.driver.gerrit import gerritconnection from zuul.driver.gerrit import gerrittrigger from zuul.driver.gerrit import gerritsource from zuul.driver.gerrit import gerritreporter +from zuul.driver.util import to_list class GerritDriver(Driver, ConnectionInterface, TriggerInterface, SourceInterface, ReporterInterface): name = 'gerrit' + def reconfigure(self, tenant): + connection_checker_map = {} + for pipeline in tenant.layout.pipelines.values(): + for trigger in pipeline.triggers: + if isinstance(trigger, gerrittrigger.GerritTrigger): + con = trigger.connection + checkers = connection_checker_map.setdefault(con, []) + for trigger_item in to_list(trigger.config): + if trigger_item['event'] == 'pending-check': + d = {} + if 'uuid' in trigger_item: + d['uuid'] = trigger_item['uuid'] + elif 'scheme' in trigger_item: + d['scheme'] = trigger_item['scheme'] + checkers.append(d) + for (con, checkers) in connection_checker_map.items(): + con.setWatchedCheckers(checkers) + def getConnection(self, name, config): return gerritconnection.GerritConnection(self, name, config) diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py index 1c4173005..8a10a5e73 100644 --- a/zuul/driver/gerrit/gerritconnection.py +++ b/zuul/driver/gerrit/gerritconnection.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +import datetime import json import re import re2 @@ -80,6 +81,7 @@ class GerritEventConnector(threading.Thread): log = get_annotated_logger(self.log, event) event.type = data.get('type') + event.uuid = data.get('uuid') # This catches when a change is merged, as it could potentially # have merged layout info which will need to be read in. # Ideally this would be done with a refupdate event so as to catch @@ -131,6 +133,7 @@ class GerritEventConnector(threading.Thread): 'ref-replication-scheduled': None, 'topic-changed': 'changer', 'project-created': None, # Gerrit 2.14 + 'pending-check': None, # Gerrit 3.0+ } event.account = None if event.type in accountfield_from_type: @@ -294,6 +297,58 @@ class GerritWatcher(threading.Thread): self._stopped = True +class GerritPoller(threading.Thread): + # Poll gerrit without stream-events + log = logging.getLogger("gerrit.GerritPoller") + poll_interval = 30 + + def __init__(self, connection): + threading.Thread.__init__(self) + self.connection = connection + self._stopped = False + self._stop_event = threading.Event() + + def _makeEvent(self, change, uuid, check): + return {'type': 'pending-check', + 'uuid': uuid, + 'change': { + 'project': change['patch_set']['repository'], + 'number': change['patch_set']['change_number'], + }, + 'patchSet': { + 'number': change['patch_set']['patch_set_id'], + }} + + def _run(self): + try: + for checker in self.connection.watched_checkers: + changes = self.connection.get( + 'plugins/checks/checks.pending/?' + 'query=checker:%s+(state:NOT_STARTED)' % checker) + for change in changes: + for uuid, check in change['pending_checks'].items(): + event = self._makeEvent(change, uuid, check) + self.connection.addEvent(event) + except Exception: + self.log.exception("Exception on Gerrit poll with %s:", + self.connection.connection_name) + + def run(self): + last_start = time.time() + while not self._stopped: + next_start = last_start + self.poll_interval + self._stop_event.wait(max(next_start - time.time(), 0)) + if self._stopped: + break + last_start = time.time() + self._run() + + def stop(self): + self.log.debug("Stopping watcher") + self._stopped = True + self._stop_event.set() + + class GerritConnection(BaseConnection): driver_name = 'gerrit' log = logging.getLogger("zuul.GerritConnection") @@ -305,6 +360,7 @@ class GerritConnection(BaseConnection): r"@{|\.\.|\.$|^@$|/$|^/|//+") # everything else we can check with re2 replication_timeout = 300 replication_retry_interval = 5 + _poller_class = GerritPoller def __init__(self, driver, connection_name, connection_config): super(GerritConnection, self).__init__(driver, connection_name, @@ -325,8 +381,11 @@ class GerritConnection(BaseConnection): self.keyfile = self.connection_config.get('sshkey', None) self.keepalive = int(self.connection_config.get('keepalive', 60)) self.watcher_thread = None + self.poller_thread = None self.event_queue = queue.Queue() self.client = None + self.watched_checkers = [] + self.project_checker_map = {} self.baseurl = self.connection_config.get( 'baseurl', 'https://%s' % self.server).rstrip('/') @@ -362,6 +421,42 @@ class GerritConnection(BaseConnection): self.auth = authclass( self.user, self.password) + def setWatchedCheckers(self, checkers_to_watch): + self.log.debug("Setting watched checkers to %s", checkers_to_watch) + self.watched_checkers = set() + self.project_checker_map = {} + schemes_to_watch = set() + uuids_to_watch = set() + for x in checkers_to_watch: + if 'scheme' in x: + schemes_to_watch.add(x['scheme']) + if 'uuid' in x: + uuids_to_watch.add(x['uuid']) + if schemes_to_watch: + # get a list of all configured checkers + try: + configured_checkers = self.get('plugins/checks/checkers/') + except Exception: + self.log.exception("Unable to get checkers") + configured_checkers = [] + + # filter it through scheme matches in checkers_to_watch + for checker in configured_checkers: + if checker['status'] != 'ENABLED': + continue + checker_scheme, checker_id = checker['uuid'].split(':') + repo = checker['repository'] + repo = self.canonical_hostname + '/' + repo + # map scheme matches to project names + if checker_scheme in schemes_to_watch: + repo_checkers = self.project_checker_map.setdefault( + repo, set()) + repo_checkers.add(checker['uuid']) + self.watched_checkers.add(checker['uuid']) + # add uuids from checkers_to_watch + for x in uuids_to_watch: + self.watched_checkers.add(x) + def toDict(self): d = super().toDict() d.update({ @@ -375,6 +470,28 @@ class GerritConnection(BaseConnection): def url(self, path): return self.baseurl + '/a/' + path + def get(self, path): + url = self.url(path) + self.log.debug('GET: %s' % (url,)) + r = self.session.get( + url, + verify=self.verify_ssl, + auth=self.auth, timeout=TIMEOUT, + headers={'User-Agent': self.user_agent}) + self.log.debug('Received: %s %s' % (r.status_code, r.text,)) + if r.status_code != 200: + raise Exception("Received response %s" % (r.status_code,)) + ret = None + if r.text and len(r.text) > 4: + try: + ret = json.loads(r.text[4:]) + except Exception: + self.log.exception( + "Unable to parse result %s from post to %s" % + (r.text, url)) + raise + return ret + def post(self, path, data): url = self.url(path) self.log.debug('POST: %s' % (url,)) @@ -836,17 +953,18 @@ class GerritConnection(BaseConnection): def eventDone(self): self.event_queue.task_done() - def review(self, change, message, action={}, + def review(self, item, message, action={}, file_comments={}, zuul_event_id=None): if self.session: meth = self.review_http else: meth = self.review_ssh - return meth(change, message, action=action, + return meth(item, message, action=action, file_comments=file_comments, zuul_event_id=zuul_event_id) - def review_ssh(self, change, message, action={}, + def review_ssh(self, item, message, action={}, file_comments={}, zuul_event_id=None): + change = item.change project = change.project.name cmd = 'gerrit review --project %s' % project if message: @@ -861,14 +979,56 @@ class GerritConnection(BaseConnection): out, err = self._ssh(cmd, zuul_event_id=zuul_event_id) return err - def review_http(self, change, message, action={}, + def report_checks(self, log, item, changeid, checkinfo): + change = item.change + checkinfo = checkinfo.copy() + uuid = checkinfo.pop('uuid', None) + scheme = checkinfo.pop('scheme', None) + if uuid is None: + uuids = self.project_checker_map.get( + change.project.canonical_name, set()) + for u in uuids: + if u.split(':')[0] == scheme: + uuid = u + break + if uuid is None: + log.error("Unable to find matching checker for %s %s", + item, checkinfo) + return + + def fmt(t): + return str(datetime.datetime.fromtimestamp(t)) + + if item.enqueue_time: + checkinfo['started'] = fmt(item.enqueue_time) + if item.report_time: + checkinfo['finished'] = fmt(item.report_time) + url = item.formatStatusUrl() + if url: + checkinfo['url'] = url + if checkinfo: + for x in range(1, 4): + try: + self.post('changes/%s/revisions/%s/checks/%s' % + (changeid, change.commit, uuid), + checkinfo) + break + except Exception: + log.exception("Error submitting check data to gerrit, " + "attempt %s", x) + time.sleep(x * 10) + + def review_http(self, item, message, action={}, file_comments={}, zuul_event_id=None): + change = item.change log = get_annotated_logger(self.log, zuul_event_id) data = dict(message=message, strict_labels=False) submit = False labels = {} for key, val in action.items(): + if key == 'checks_api': + continue if val is True: if key == 'submit': submit = True @@ -883,21 +1043,27 @@ class GerritConnection(BaseConnection): urllib.parse.quote(str(change.project), safe=''), urllib.parse.quote(str(change.branch), safe=''), change.id) - for x in range(1, 4): - try: - self.post('changes/%s/revisions/%s/review' % - (changeid, change.commit), - data) - break - except Exception: - log.exception("Error submitting data to gerrit, attempt %s", x) - time.sleep(x * 10) + if 'checks_api' in action: + self.report_checks(log, item, changeid, action['checks_api']) + if (message or data.get('labels') or data.get('comments')): + for x in range(1, 4): + try: + self.post('changes/%s/revisions/%s/review' % + (changeid, change.commit), + data) + break + except Exception: + log.exception( + "Error submitting data to gerrit, attempt %s", x) + time.sleep(x * 10) if change.is_current_patchset and submit: - try: - self.post('changes/%s/submit' % (changeid,), {}) - except Exception: - log.exception("Error submitting data to gerrit, attempt %s", x) - time.sleep(x * 10) + for x in range(1, 4): + try: + self.post('changes/%s/submit' % (changeid,), {}) + except Exception: + log.exception( + "Error submitting data to gerrit, attempt %s", x) + time.sleep(x * 10) def query(self, query, event=None): args = '--all-approvals --comments --commit-message' @@ -1078,11 +1244,13 @@ class GerritConnection(BaseConnection): def onLoad(self): self.log.debug("Starting Gerrit Connection/Watchers") self._start_watcher_thread() + self._start_poller_thread() self._start_event_connector() def onStop(self): self.log.debug("Stopping Gerrit Connection/Watchers") self._stop_watcher_thread() + self._stop_poller_thread() self._stop_event_connector() def _stop_watcher_thread(self): @@ -1100,6 +1268,15 @@ class GerritConnection(BaseConnection): keepalive=self.keepalive) self.watcher_thread.start() + def _stop_poller_thread(self): + if self.poller_thread: + self.poller_thread.stop() + self.poller_thread.join() + + def _start_poller_thread(self): + self.poller_thread = self._poller_class(self) + self.poller_thread.start() + def _stop_event_connector(self): if self.gerrit_event_connector: self.gerrit_event_connector.stop() diff --git a/zuul/driver/gerrit/gerritmodel.py b/zuul/driver/gerrit/gerritmodel.py index 46a034a09..07c746dcf 100644 --- a/zuul/driver/gerrit/gerritmodel.py +++ b/zuul/driver/gerrit/gerritmodel.py @@ -35,6 +35,8 @@ class GerritTriggerEvent(TriggerEvent): def __init__(self): super(GerritTriggerEvent, self).__init__() self.approvals = [] + self.uuid = None + self.scheme = None def __repr__(self): ret = '<GerritTriggerEvent %s %s' % (self.type, @@ -154,8 +156,8 @@ class GerritApprovalFilter(object): class GerritEventFilter(EventFilter, GerritApprovalFilter): def __init__(self, trigger, types=[], branches=[], refs=[], event_approvals={}, comments=[], emails=[], usernames=[], - required_approvals=[], reject_approvals=[], - ignore_deletes=True): + required_approvals=[], reject_approvals=[], uuid=None, + scheme=None, ignore_deletes=True): EventFilter.__init__(self, trigger) @@ -176,6 +178,8 @@ class GerritEventFilter(EventFilter, GerritApprovalFilter): self.emails = [re.compile(x) for x in emails] self.usernames = [re.compile(x) for x in usernames] self.event_approvals = event_approvals + self.uuid = uuid + self.scheme = scheme self.ignore_deletes = ignore_deletes def __repr__(self): @@ -183,6 +187,10 @@ class GerritEventFilter(EventFilter, GerritApprovalFilter): if self._types: ret += ' types: %s' % ', '.join(self._types) + if self.uuid: + ret += ' uuid: %s' % (self.uuid,) + if self.scheme: + ret += ' scheme: %s' % (self.scheme,) if self._branches: ret += ' branches: %s' % ', '.join(self._branches) if self._refs: @@ -217,6 +225,12 @@ class GerritEventFilter(EventFilter, GerritApprovalFilter): if self.types and not matches_type: return False + if event.type == 'pending-check': + if self.uuid and event.uuid != self.uuid: + return False + if self.scheme and event.uuid.split(':')[0] != self.scheme: + return False + # branches are ORed matches_branch = False for branch in self.branches: diff --git a/zuul/driver/gerrit/gerritreporter.py b/zuul/driver/gerrit/gerritreporter.py index 90de89cc6..fa7968be9 100644 --- a/zuul/driver/gerrit/gerritreporter.py +++ b/zuul/driver/gerrit/gerritreporter.py @@ -61,7 +61,7 @@ class GerritReporter(BaseReporter): item.change._ref_sha = item.change.project.source.getRefSha( item.change.project, 'refs/heads/' + item.change.branch) - return self.connection.review(item.change, message, self.config, + return self.connection.review(item, message, self.config, comments, zuul_event_id=item.event) def getSubmitAllowNeeds(self): diff --git a/zuul/driver/gerrit/gerrittrigger.py b/zuul/driver/gerrit/gerrittrigger.py index 67608ad81..88d198d32 100644 --- a/zuul/driver/gerrit/gerrittrigger.py +++ b/zuul/driver/gerrit/gerrittrigger.py @@ -56,6 +56,8 @@ class GerritTrigger(BaseTrigger): reject_approvals=to_list( trigger.get('reject-approval') ), + uuid=trigger.get('uuid'), + scheme=trigger.get('scheme'), ignore_deletes=ignore_deletes ) efilters.append(f) @@ -80,7 +82,10 @@ def getSchema(): 'change-restored', 'change-merged', 'comment-added', - 'ref-updated')), + 'ref-updated', + 'pending-check')), + 'uuid': str, + 'scheme': str, 'comment_filter': scalar_or_list(str), 'comment': scalar_or_list(str), 'email_filter': scalar_or_list(str), |