diff options
Diffstat (limited to 'zuul')
-rw-r--r-- | zuul/driver/gerrit/__init__.py | 19 | ||||
-rw-r--r-- | zuul/driver/gerrit/gerritconnection.py | 213 | ||||
-rw-r--r-- | zuul/driver/gerrit/gerritmodel.py | 18 | ||||
-rw-r--r-- | zuul/driver/gerrit/gerritreporter.py | 2 | ||||
-rw-r--r-- | zuul/driver/gerrit/gerrittrigger.py | 7 |
5 files changed, 237 insertions, 22 deletions
diff --git a/zuul/driver/gerrit/__init__.py b/zuul/driver/gerrit/__init__.py index c720ba6a3..4930c7631 100644 --- a/zuul/driver/gerrit/__init__.py +++ b/zuul/driver/gerrit/__init__.py @@ -18,12 +18,31 @@ from zuul.driver.gerrit import gerritconnection from zuul.driver.gerrit import gerrittrigger from zuul.driver.gerrit import gerritsource from zuul.driver.gerrit import gerritreporter +from zuul.driver.util import to_list class GerritDriver(Driver, ConnectionInterface, TriggerInterface, SourceInterface, ReporterInterface): name = 'gerrit' + def reconfigure(self, tenant): + connection_checker_map = {} + for pipeline in tenant.layout.pipelines.values(): + for trigger in pipeline.triggers: + if isinstance(trigger, gerrittrigger.GerritTrigger): + con = trigger.connection + checkers = connection_checker_map.setdefault(con, []) + for trigger_item in to_list(trigger.config): + if trigger_item['event'] == 'pending-check': + d = {} + if 'uuid' in trigger_item: + d['uuid'] = trigger_item['uuid'] + elif 'scheme' in trigger_item: + d['scheme'] = trigger_item['scheme'] + checkers.append(d) + for (con, checkers) in connection_checker_map.items(): + con.setWatchedCheckers(checkers) + def getConnection(self, name, config): return gerritconnection.GerritConnection(self, name, config) diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py index 1c4173005..8a10a5e73 100644 --- a/zuul/driver/gerrit/gerritconnection.py +++ b/zuul/driver/gerrit/gerritconnection.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +import datetime import json import re import re2 @@ -80,6 +81,7 @@ class GerritEventConnector(threading.Thread): log = get_annotated_logger(self.log, event) event.type = data.get('type') + event.uuid = data.get('uuid') # This catches when a change is merged, as it could potentially # have merged layout info which will need to be read in. # Ideally this would be done with a refupdate event so as to catch @@ -131,6 +133,7 @@ class GerritEventConnector(threading.Thread): 'ref-replication-scheduled': None, 'topic-changed': 'changer', 'project-created': None, # Gerrit 2.14 + 'pending-check': None, # Gerrit 3.0+ } event.account = None if event.type in accountfield_from_type: @@ -294,6 +297,58 @@ class GerritWatcher(threading.Thread): self._stopped = True +class GerritPoller(threading.Thread): + # Poll gerrit without stream-events + log = logging.getLogger("gerrit.GerritPoller") + poll_interval = 30 + + def __init__(self, connection): + threading.Thread.__init__(self) + self.connection = connection + self._stopped = False + self._stop_event = threading.Event() + + def _makeEvent(self, change, uuid, check): + return {'type': 'pending-check', + 'uuid': uuid, + 'change': { + 'project': change['patch_set']['repository'], + 'number': change['patch_set']['change_number'], + }, + 'patchSet': { + 'number': change['patch_set']['patch_set_id'], + }} + + def _run(self): + try: + for checker in self.connection.watched_checkers: + changes = self.connection.get( + 'plugins/checks/checks.pending/?' + 'query=checker:%s+(state:NOT_STARTED)' % checker) + for change in changes: + for uuid, check in change['pending_checks'].items(): + event = self._makeEvent(change, uuid, check) + self.connection.addEvent(event) + except Exception: + self.log.exception("Exception on Gerrit poll with %s:", + self.connection.connection_name) + + def run(self): + last_start = time.time() + while not self._stopped: + next_start = last_start + self.poll_interval + self._stop_event.wait(max(next_start - time.time(), 0)) + if self._stopped: + break + last_start = time.time() + self._run() + + def stop(self): + self.log.debug("Stopping watcher") + self._stopped = True + self._stop_event.set() + + class GerritConnection(BaseConnection): driver_name = 'gerrit' log = logging.getLogger("zuul.GerritConnection") @@ -305,6 +360,7 @@ class GerritConnection(BaseConnection): r"@{|\.\.|\.$|^@$|/$|^/|//+") # everything else we can check with re2 replication_timeout = 300 replication_retry_interval = 5 + _poller_class = GerritPoller def __init__(self, driver, connection_name, connection_config): super(GerritConnection, self).__init__(driver, connection_name, @@ -325,8 +381,11 @@ class GerritConnection(BaseConnection): self.keyfile = self.connection_config.get('sshkey', None) self.keepalive = int(self.connection_config.get('keepalive', 60)) self.watcher_thread = None + self.poller_thread = None self.event_queue = queue.Queue() self.client = None + self.watched_checkers = [] + self.project_checker_map = {} self.baseurl = self.connection_config.get( 'baseurl', 'https://%s' % self.server).rstrip('/') @@ -362,6 +421,42 @@ class GerritConnection(BaseConnection): self.auth = authclass( self.user, self.password) + def setWatchedCheckers(self, checkers_to_watch): + self.log.debug("Setting watched checkers to %s", checkers_to_watch) + self.watched_checkers = set() + self.project_checker_map = {} + schemes_to_watch = set() + uuids_to_watch = set() + for x in checkers_to_watch: + if 'scheme' in x: + schemes_to_watch.add(x['scheme']) + if 'uuid' in x: + uuids_to_watch.add(x['uuid']) + if schemes_to_watch: + # get a list of all configured checkers + try: + configured_checkers = self.get('plugins/checks/checkers/') + except Exception: + self.log.exception("Unable to get checkers") + configured_checkers = [] + + # filter it through scheme matches in checkers_to_watch + for checker in configured_checkers: + if checker['status'] != 'ENABLED': + continue + checker_scheme, checker_id = checker['uuid'].split(':') + repo = checker['repository'] + repo = self.canonical_hostname + '/' + repo + # map scheme matches to project names + if checker_scheme in schemes_to_watch: + repo_checkers = self.project_checker_map.setdefault( + repo, set()) + repo_checkers.add(checker['uuid']) + self.watched_checkers.add(checker['uuid']) + # add uuids from checkers_to_watch + for x in uuids_to_watch: + self.watched_checkers.add(x) + def toDict(self): d = super().toDict() d.update({ @@ -375,6 +470,28 @@ class GerritConnection(BaseConnection): def url(self, path): return self.baseurl + '/a/' + path + def get(self, path): + url = self.url(path) + self.log.debug('GET: %s' % (url,)) + r = self.session.get( + url, + verify=self.verify_ssl, + auth=self.auth, timeout=TIMEOUT, + headers={'User-Agent': self.user_agent}) + self.log.debug('Received: %s %s' % (r.status_code, r.text,)) + if r.status_code != 200: + raise Exception("Received response %s" % (r.status_code,)) + ret = None + if r.text and len(r.text) > 4: + try: + ret = json.loads(r.text[4:]) + except Exception: + self.log.exception( + "Unable to parse result %s from post to %s" % + (r.text, url)) + raise + return ret + def post(self, path, data): url = self.url(path) self.log.debug('POST: %s' % (url,)) @@ -836,17 +953,18 @@ class GerritConnection(BaseConnection): def eventDone(self): self.event_queue.task_done() - def review(self, change, message, action={}, + def review(self, item, message, action={}, file_comments={}, zuul_event_id=None): if self.session: meth = self.review_http else: meth = self.review_ssh - return meth(change, message, action=action, + return meth(item, message, action=action, file_comments=file_comments, zuul_event_id=zuul_event_id) - def review_ssh(self, change, message, action={}, + def review_ssh(self, item, message, action={}, file_comments={}, zuul_event_id=None): + change = item.change project = change.project.name cmd = 'gerrit review --project %s' % project if message: @@ -861,14 +979,56 @@ class GerritConnection(BaseConnection): out, err = self._ssh(cmd, zuul_event_id=zuul_event_id) return err - def review_http(self, change, message, action={}, + def report_checks(self, log, item, changeid, checkinfo): + change = item.change + checkinfo = checkinfo.copy() + uuid = checkinfo.pop('uuid', None) + scheme = checkinfo.pop('scheme', None) + if uuid is None: + uuids = self.project_checker_map.get( + change.project.canonical_name, set()) + for u in uuids: + if u.split(':')[0] == scheme: + uuid = u + break + if uuid is None: + log.error("Unable to find matching checker for %s %s", + item, checkinfo) + return + + def fmt(t): + return str(datetime.datetime.fromtimestamp(t)) + + if item.enqueue_time: + checkinfo['started'] = fmt(item.enqueue_time) + if item.report_time: + checkinfo['finished'] = fmt(item.report_time) + url = item.formatStatusUrl() + if url: + checkinfo['url'] = url + if checkinfo: + for x in range(1, 4): + try: + self.post('changes/%s/revisions/%s/checks/%s' % + (changeid, change.commit, uuid), + checkinfo) + break + except Exception: + log.exception("Error submitting check data to gerrit, " + "attempt %s", x) + time.sleep(x * 10) + + def review_http(self, item, message, action={}, file_comments={}, zuul_event_id=None): + change = item.change log = get_annotated_logger(self.log, zuul_event_id) data = dict(message=message, strict_labels=False) submit = False labels = {} for key, val in action.items(): + if key == 'checks_api': + continue if val is True: if key == 'submit': submit = True @@ -883,21 +1043,27 @@ class GerritConnection(BaseConnection): urllib.parse.quote(str(change.project), safe=''), urllib.parse.quote(str(change.branch), safe=''), change.id) - for x in range(1, 4): - try: - self.post('changes/%s/revisions/%s/review' % - (changeid, change.commit), - data) - break - except Exception: - log.exception("Error submitting data to gerrit, attempt %s", x) - time.sleep(x * 10) + if 'checks_api' in action: + self.report_checks(log, item, changeid, action['checks_api']) + if (message or data.get('labels') or data.get('comments')): + for x in range(1, 4): + try: + self.post('changes/%s/revisions/%s/review' % + (changeid, change.commit), + data) + break + except Exception: + log.exception( + "Error submitting data to gerrit, attempt %s", x) + time.sleep(x * 10) if change.is_current_patchset and submit: - try: - self.post('changes/%s/submit' % (changeid,), {}) - except Exception: - log.exception("Error submitting data to gerrit, attempt %s", x) - time.sleep(x * 10) + for x in range(1, 4): + try: + self.post('changes/%s/submit' % (changeid,), {}) + except Exception: + log.exception( + "Error submitting data to gerrit, attempt %s", x) + time.sleep(x * 10) def query(self, query, event=None): args = '--all-approvals --comments --commit-message' @@ -1078,11 +1244,13 @@ class GerritConnection(BaseConnection): def onLoad(self): self.log.debug("Starting Gerrit Connection/Watchers") self._start_watcher_thread() + self._start_poller_thread() self._start_event_connector() def onStop(self): self.log.debug("Stopping Gerrit Connection/Watchers") self._stop_watcher_thread() + self._stop_poller_thread() self._stop_event_connector() def _stop_watcher_thread(self): @@ -1100,6 +1268,15 @@ class GerritConnection(BaseConnection): keepalive=self.keepalive) self.watcher_thread.start() + def _stop_poller_thread(self): + if self.poller_thread: + self.poller_thread.stop() + self.poller_thread.join() + + def _start_poller_thread(self): + self.poller_thread = self._poller_class(self) + self.poller_thread.start() + def _stop_event_connector(self): if self.gerrit_event_connector: self.gerrit_event_connector.stop() diff --git a/zuul/driver/gerrit/gerritmodel.py b/zuul/driver/gerrit/gerritmodel.py index 46a034a09..07c746dcf 100644 --- a/zuul/driver/gerrit/gerritmodel.py +++ b/zuul/driver/gerrit/gerritmodel.py @@ -35,6 +35,8 @@ class GerritTriggerEvent(TriggerEvent): def __init__(self): super(GerritTriggerEvent, self).__init__() self.approvals = [] + self.uuid = None + self.scheme = None def __repr__(self): ret = '<GerritTriggerEvent %s %s' % (self.type, @@ -154,8 +156,8 @@ class GerritApprovalFilter(object): class GerritEventFilter(EventFilter, GerritApprovalFilter): def __init__(self, trigger, types=[], branches=[], refs=[], event_approvals={}, comments=[], emails=[], usernames=[], - required_approvals=[], reject_approvals=[], - ignore_deletes=True): + required_approvals=[], reject_approvals=[], uuid=None, + scheme=None, ignore_deletes=True): EventFilter.__init__(self, trigger) @@ -176,6 +178,8 @@ class GerritEventFilter(EventFilter, GerritApprovalFilter): self.emails = [re.compile(x) for x in emails] self.usernames = [re.compile(x) for x in usernames] self.event_approvals = event_approvals + self.uuid = uuid + self.scheme = scheme self.ignore_deletes = ignore_deletes def __repr__(self): @@ -183,6 +187,10 @@ class GerritEventFilter(EventFilter, GerritApprovalFilter): if self._types: ret += ' types: %s' % ', '.join(self._types) + if self.uuid: + ret += ' uuid: %s' % (self.uuid,) + if self.scheme: + ret += ' scheme: %s' % (self.scheme,) if self._branches: ret += ' branches: %s' % ', '.join(self._branches) if self._refs: @@ -217,6 +225,12 @@ class GerritEventFilter(EventFilter, GerritApprovalFilter): if self.types and not matches_type: return False + if event.type == 'pending-check': + if self.uuid and event.uuid != self.uuid: + return False + if self.scheme and event.uuid.split(':')[0] != self.scheme: + return False + # branches are ORed matches_branch = False for branch in self.branches: diff --git a/zuul/driver/gerrit/gerritreporter.py b/zuul/driver/gerrit/gerritreporter.py index 90de89cc6..fa7968be9 100644 --- a/zuul/driver/gerrit/gerritreporter.py +++ b/zuul/driver/gerrit/gerritreporter.py @@ -61,7 +61,7 @@ class GerritReporter(BaseReporter): item.change._ref_sha = item.change.project.source.getRefSha( item.change.project, 'refs/heads/' + item.change.branch) - return self.connection.review(item.change, message, self.config, + return self.connection.review(item, message, self.config, comments, zuul_event_id=item.event) def getSubmitAllowNeeds(self): diff --git a/zuul/driver/gerrit/gerrittrigger.py b/zuul/driver/gerrit/gerrittrigger.py index 67608ad81..88d198d32 100644 --- a/zuul/driver/gerrit/gerrittrigger.py +++ b/zuul/driver/gerrit/gerrittrigger.py @@ -56,6 +56,8 @@ class GerritTrigger(BaseTrigger): reject_approvals=to_list( trigger.get('reject-approval') ), + uuid=trigger.get('uuid'), + scheme=trigger.get('scheme'), ignore_deletes=ignore_deletes ) efilters.append(f) @@ -80,7 +82,10 @@ def getSchema(): 'change-restored', 'change-merged', 'comment-added', - 'ref-updated')), + 'ref-updated', + 'pending-check')), + 'uuid': str, + 'scheme': str, 'comment_filter': scalar_or_list(str), 'comment': scalar_or_list(str), 'email_filter': scalar_or_list(str), |