diff options
Diffstat (limited to 'zuul')
-rwxr-xr-x  zuul/ansible/base/library/zuul_console.py   |  17
-rwxr-xr-x  zuul/cmd/client.py                          |  32
-rwxr-xr-x  zuul/cmd/scheduler.py                       |   2
-rw-r--r--  zuul/driver/gerrit/__init__.py              |  19
-rw-r--r--  zuul/driver/gerrit/gerritconnection.py      | 589
-rw-r--r--  zuul/driver/gerrit/gerritmodel.py           | 137
-rw-r--r--  zuul/driver/gerrit/gerritreporter.py        |   2
-rw-r--r--  zuul/driver/gerrit/gerritsource.py          |   9
-rw-r--r--  zuul/driver/gerrit/gerrittrigger.py         |   7
-rw-r--r--  zuul/model.py                               |  73
-rw-r--r--  zuul/nodepool.py                            |  44
-rw-r--r--  zuul/rpcclient.py                           |   4
-rw-r--r--  zuul/rpclistener.py                         |  22
-rw-r--r--  zuul/scheduler.py                           | 104
-rwxr-xr-x  zuul/web/__init__.py                        |  26
-rw-r--r--  zuul/zk.py                                  | 182
16 files changed, 1015 insertions, 254 deletions
diff --git a/zuul/ansible/base/library/zuul_console.py b/zuul/ansible/base/library/zuul_console.py index 6f988e682..9dffbbc3a 100755 --- a/zuul/ansible/base/library/zuul_console.py +++ b/zuul/ansible/base/library/zuul_console.py @@ -185,7 +185,7 @@ class Server(object): console = self.chunkConsole(conn, log_uuid) if console: break - conn.send('[Zuul] Log not found\n') + conn.send(b'[Zuul] Log not found\n') time.sleep(0.5) while True: if self.followConsole(console, conn): @@ -307,6 +307,19 @@ def main(): from ansible.module_utils.basic import * # noqa from ansible.module_utils.basic import AnsibleModule +# +# To debug this, you can run it with arguments specified on the +# command-line in a json file. e.g. +# +# $ cat args.json +# {"ANSIBLE_MODULE_ARGS": { +# "state": "present" +# } +# } +# +# Then from a virtualenv with Ansible installed you can run +# +# $ ./ansible-env/bin/python ./zuul_console.py args.json +# if __name__ == '__main__': main() -# test() diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py index 404fbc203..bef204dde 100755 --- a/zuul/cmd/client.py +++ b/zuul/cmd/client.py @@ -202,6 +202,13 @@ class Client(zuul.cmd.ZuulApp): required=False, type=int, default=0) cmd_autohold.set_defaults(func=self.autohold) + cmd_autohold_delete = subparsers.add_parser( + 'autohold-delete', help='delete autohold request') + cmd_autohold_delete.set_defaults(func=self.autohold_delete) + cmd_autohold_delete.add_argument('--id', + help='request ID', + required=True) + cmd_autohold_list = subparsers.add_parser( 'autohold-list', help='list autohold requests') cmd_autohold_list.add_argument('--tenant', help='tenant name', @@ -430,28 +437,35 @@ class Client(zuul.cmd.ZuulApp): node_hold_expiration=node_hold_expiration) return r + def autohold_delete(self): + client = self.get_client() + return client.autohold_delete(self.args.id) + def autohold_list(self): client = self.get_client() autohold_requests = client.autohold_list(tenant=self.args.tenant) - if 
len(autohold_requests.keys()) == 0: + if not autohold_requests: print("No autohold requests found") return True table = prettytable.PrettyTable( field_names=[ - 'Tenant', 'Project', 'Job', 'Ref Filter', 'Count', 'Reason' + 'ID', 'Tenant', 'Project', 'Job', 'Ref Filter', + 'Max Count', 'Reason' ]) - for key, value in autohold_requests.items(): - # The key comes to us as a CSV string because json doesn't like - # non-str keys. - tenant_name, project_name, job_name, ref_filter = key.split(',') - count, reason, node_hold_expiration = value - + for request in autohold_requests: table.add_row([ - tenant_name, project_name, job_name, ref_filter, count, reason + request['id'], + request['tenant'], + request['project'], + request['job'], + request['ref_filter'], + request['max_count'], + request['reason'], ]) + print(table) return True diff --git a/zuul/cmd/scheduler.py b/zuul/cmd/scheduler.py index 67484bf87..1eefbe713 100755 --- a/zuul/cmd/scheduler.py +++ b/zuul/cmd/scheduler.py @@ -136,7 +136,7 @@ class Scheduler(zuul.cmd.ZuulDaemonApp): merger = zuul.merger.client.MergeClient(self.config, self.sched) nodepool = zuul.nodepool.Nodepool(self.sched) - zookeeper = zuul.zk.ZooKeeper() + zookeeper = zuul.zk.ZooKeeper(enable_cache=True) zookeeper_hosts = get_default(self.config, 'zookeeper', 'hosts', None) if not zookeeper_hosts: raise Exception("The zookeeper hosts config value is required") diff --git a/zuul/driver/gerrit/__init__.py b/zuul/driver/gerrit/__init__.py index c720ba6a3..4930c7631 100644 --- a/zuul/driver/gerrit/__init__.py +++ b/zuul/driver/gerrit/__init__.py @@ -18,12 +18,31 @@ from zuul.driver.gerrit import gerritconnection from zuul.driver.gerrit import gerrittrigger from zuul.driver.gerrit import gerritsource from zuul.driver.gerrit import gerritreporter +from zuul.driver.util import to_list class GerritDriver(Driver, ConnectionInterface, TriggerInterface, SourceInterface, ReporterInterface): name = 'gerrit' + def reconfigure(self, tenant): + 
connection_checker_map = {} + for pipeline in tenant.layout.pipelines.values(): + for trigger in pipeline.triggers: + if isinstance(trigger, gerrittrigger.GerritTrigger): + con = trigger.connection + checkers = connection_checker_map.setdefault(con, []) + for trigger_item in to_list(trigger.config): + if trigger_item['event'] == 'pending-check': + d = {} + if 'uuid' in trigger_item: + d['uuid'] = trigger_item['uuid'] + elif 'scheme' in trigger_item: + d['scheme'] = trigger_item['scheme'] + checkers.append(d) + for (con, checkers) in connection_checker_map.items(): + con.setWatchedCheckers(checkers) + def getConnection(self, name, config): return gerritconnection.GerritConnection(self, name, config) diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py index 1c4173005..8a64bad7c 100644 --- a/zuul/driver/gerrit/gerritconnection.py +++ b/zuul/driver/gerrit/gerritconnection.py @@ -13,36 +13,100 @@ # License for the specific language governing permissions and limitations # under the License. 
+import copy +import datetime +import itertools import json +import logging +import paramiko +import pprint +import queue import re import re2 +import requests import select +import shlex import threading import time import urllib -import paramiko -import logging -import pprint -import shlex -import queue import urllib.parse -import requests from typing import Dict, List from uuid import uuid4 +from zuul import version as zuul_version from zuul.connection import BaseConnection +from zuul.driver.gerrit.auth import FormAuth +from zuul.driver.gerrit.gerritmodel import GerritChange, GerritTriggerEvent from zuul.lib.logutil import get_annotated_logger from zuul.model import Ref, Tag, Branch, Project -from zuul import exceptions -from zuul.driver.gerrit.gerritmodel import GerritChange, GerritTriggerEvent -from zuul.driver.gerrit.auth import FormAuth -from zuul import version as zuul_version # HTTP timeout in seconds TIMEOUT = 30 +class GerritChangeData(object): + """Compatability layer for SSH/HTTP + + This class holds the raw data returned from a change query over + SSH or HTTP. Most of the work of parsing the data and storing it + on the change is in the gerritmodel.GerritChange class, however + this does perform a small amount of parsing of dependencies since + they are handled outside of that class. This provides an API to + that data independent of the source. 
+ + """ + + SSH = 1 + HTTP = 2 + + def __init__(self, fmt, data, related=None): + self.format = fmt + self.data = data + + if fmt == self.SSH: + self.parseSSH(data) + else: + self.parseHTTP(data) + if related: + self.parseRelatedHTTP(data, related) + + def parseSSH(self, data): + self.needed_by = [] + self.depends_on = None + self.message = data['commitMessage'] + self.current_patchset = str(data['currentPatchSet']['number']) + self.number = str(data['number']) + + if 'dependsOn' in data: + parts = data['dependsOn'][0]['ref'].split('/') + self.depends_on = (parts[3], parts[4]) + + for needed in data.get('neededBy', []): + parts = needed['ref'].split('/') + self.needed_by.append((parts[3], parts[4])) + + def parseHTTP(self, data): + rev = data['revisions'][data['current_revision']] + self.message = rev['commit']['message'] + self.current_patchset = str(rev['_number']) + self.number = str(data['_number']) + + def parseRelatedHTTP(self, data, related): + self.needed_by = [] + self.depends_on = None + current_rev = data['revisions'][data['current_revision']] + for change in related['changes']: + for parent in current_rev['commit']['parents']: + if change['commit']['commit'] == parent['commit']: + self.depends_on = (change['_change_number'], + change['_revision_number']) + break + else: + self.needed_by.append((change['_change_number'], + change['_revision_number'])) + + class GerritEventConnector(threading.Thread): """Move events from Gerrit to the scheduler.""" @@ -80,6 +144,7 @@ class GerritEventConnector(threading.Thread): log = get_annotated_logger(self.log, event) event.type = data.get('type') + event.uuid = data.get('uuid') # This catches when a change is merged, as it could potentially # have merged layout info which will need to be read in. 
# Ideally this would be done with a refupdate event so as to catch @@ -131,6 +196,7 @@ class GerritEventConnector(threading.Thread): 'ref-replication-scheduled': None, 'topic-changed': 'changer', 'project-created': None, # Gerrit 2.14 + 'pending-check': None, # Gerrit 3.0+ } event.account = None if event.type in accountfield_from_type: @@ -294,6 +360,58 @@ class GerritWatcher(threading.Thread): self._stopped = True +class GerritPoller(threading.Thread): + # Poll gerrit without stream-events + log = logging.getLogger("gerrit.GerritPoller") + poll_interval = 30 + + def __init__(self, connection): + threading.Thread.__init__(self) + self.connection = connection + self._stopped = False + self._stop_event = threading.Event() + + def _makeEvent(self, change, uuid, check): + return {'type': 'pending-check', + 'uuid': uuid, + 'change': { + 'project': change['patch_set']['repository'], + 'number': change['patch_set']['change_number'], + }, + 'patchSet': { + 'number': change['patch_set']['patch_set_id'], + }} + + def _run(self): + try: + for checker in self.connection.watched_checkers: + changes = self.connection.get( + 'plugins/checks/checks.pending/?' 
+ 'query=checker:%s+(state:NOT_STARTED)' % checker) + for change in changes: + for uuid, check in change['pending_checks'].items(): + event = self._makeEvent(change, uuid, check) + self.connection.addEvent(event) + except Exception: + self.log.exception("Exception on Gerrit poll with %s:", + self.connection.connection_name) + + def run(self): + last_start = time.time() + while not self._stopped: + next_start = last_start + self.poll_interval + self._stop_event.wait(max(next_start - time.time(), 0)) + if self._stopped: + break + last_start = time.time() + self._run() + + def stop(self): + self.log.debug("Stopping watcher") + self._stopped = True + self._stop_event.set() + + class GerritConnection(BaseConnection): driver_name = 'gerrit' log = logging.getLogger("zuul.GerritConnection") @@ -305,6 +423,7 @@ class GerritConnection(BaseConnection): r"@{|\.\.|\.$|^@$|/$|^/|//+") # everything else we can check with re2 replication_timeout = 300 replication_retry_interval = 5 + _poller_class = GerritPoller def __init__(self, driver, connection_name, connection_config): super(GerritConnection, self).__init__(driver, connection_name, @@ -324,9 +443,20 @@ class GerritConnection(BaseConnection): self.port = int(self.connection_config.get('port', 29418)) self.keyfile = self.connection_config.get('sshkey', None) self.keepalive = int(self.connection_config.get('keepalive', 60)) + # TODO(corvus): Document this when the checks api is stable; + # it's not useful without it. 
+ self.enable_stream_events = self.connection_config.get( + 'stream_events', True) + if self.enable_stream_events not in [ + 'true', 'True', '1', 1, 'TRUE', True]: + self.enable_stream_events = False self.watcher_thread = None + self.poller_thread = None self.event_queue = queue.Queue() self.client = None + self.watched_checkers = [] + self.project_checker_map = {} + self.version = (0, 0, 0) self.baseurl = self.connection_config.get( 'baseurl', 'https://%s' % self.server).rstrip('/') @@ -362,6 +492,42 @@ class GerritConnection(BaseConnection): self.auth = authclass( self.user, self.password) + def setWatchedCheckers(self, checkers_to_watch): + self.log.debug("Setting watched checkers to %s", checkers_to_watch) + self.watched_checkers = set() + self.project_checker_map = {} + schemes_to_watch = set() + uuids_to_watch = set() + for x in checkers_to_watch: + if 'scheme' in x: + schemes_to_watch.add(x['scheme']) + if 'uuid' in x: + uuids_to_watch.add(x['uuid']) + if schemes_to_watch: + # get a list of all configured checkers + try: + configured_checkers = self.get('plugins/checks/checkers/') + except Exception: + self.log.exception("Unable to get checkers") + configured_checkers = [] + + # filter it through scheme matches in checkers_to_watch + for checker in configured_checkers: + if checker['status'] != 'ENABLED': + continue + checker_scheme, checker_id = checker['uuid'].split(':') + repo = checker['repository'] + repo = self.canonical_hostname + '/' + repo + # map scheme matches to project names + if checker_scheme in schemes_to_watch: + repo_checkers = self.project_checker_map.setdefault( + repo, set()) + repo_checkers.add(checker['uuid']) + self.watched_checkers.add(checker['uuid']) + # add uuids from checkers_to_watch + for x in uuids_to_watch: + self.watched_checkers.add(x) + def toDict(self): d = super().toDict() d.update({ @@ -375,6 +541,28 @@ class GerritConnection(BaseConnection): def url(self, path): return self.baseurl + '/a/' + path + def get(self, path): 
+ url = self.url(path) + self.log.debug('GET: %s' % (url,)) + r = self.session.get( + url, + verify=self.verify_ssl, + auth=self.auth, timeout=TIMEOUT, + headers={'User-Agent': self.user_agent}) + self.log.debug('Received: %s %s' % (r.status_code, r.text,)) + if r.status_code != 200: + raise Exception("Received response %s" % (r.status_code,)) + ret = None + if r.text and len(r.text) > 4: + try: + ret = json.loads(r.text[4:]) + except Exception: + self.log.exception( + "Unable to parse result %s from post to %s" % + (r.text, url)) + raise + return ret + def post(self, path, data): url = self.url(path) self.log.debug('POST: %s' % (url,)) @@ -510,7 +698,7 @@ class GerritConnection(BaseConnection): log.debug("Updating %s: Running query %s to find needed changes", change, query) records.extend(self.simpleQuery(query, event=event)) - return records + return [(x.number, x.current_patchset) for x in records] def _getNeededByFromCommit(self, change_id, change, event): log = get_annotated_logger(self.log, event) @@ -522,11 +710,10 @@ class GerritConnection(BaseConnection): results = self.simpleQuery(query, event=event) for result in results: for match in self.depends_on_re.findall( - result['commitMessage']): + result.message): if match != change_id: continue - key = (str(result['number']), - str(result['currentPatchSet']['number'])) + key = (result.number, result.current_patchset) if key in seen: continue log.debug("Updating %s: Found change %s,%s " @@ -534,7 +721,7 @@ class GerritConnection(BaseConnection): change, key[0], key[1], change_id) seen.add(key) records.append(result) - return records + return [(x.number, x.current_patchset) for x in records] def _updateChange(self, change, event, history): log = get_annotated_logger(self.log, event) @@ -562,54 +749,16 @@ class GerritConnection(BaseConnection): log.info("Updating %s", change) data = self.queryChange(change.number, event=event) - change._data = data - - if change.patchset is None: - change.patchset = 
str(data['currentPatchSet']['number']) - if 'project' not in data: - raise exceptions.ChangeNotFound(change.number, change.patchset) - change.project = self.source.getProject(data['project']) - change.id = data['id'] - change.branch = data['branch'] - change.url = data['url'] - urlparse = urllib.parse.urlparse(self.baseurl) - baseurl = "%s%s" % (urlparse.netloc, urlparse.path) - baseurl = baseurl.rstrip('/') - change.uris = [ - '%s/%s' % (baseurl, change.number), - '%s/#/c/%s' % (baseurl, change.number), - '%s/c/%s/+/%s' % (baseurl, change.project.name, change.number), - ] - - max_ps = 0 - files = [] - for ps in data['patchSets']: - if str(ps['number']) == change.patchset: - change.ref = ps['ref'] - change.commit = ps['revision'] - for f in ps.get('files', []): - files.append(f['file']) - if int(ps['number']) > int(max_ps): - max_ps = str(ps['number']) - if max_ps == change.patchset: - change.is_current_patchset = True - else: - change.is_current_patchset = False - change.files = files + change.update(data, self) - change.is_merged = self._isMerged(change) - change.approvals = data['currentPatchSet'].get('approvals', []) - change.open = data['open'] - change.status = data['status'] - change.owner = data['owner'] - change.message = data['commitMessage'] + if not change.is_merged: + self._updateChangeDependencies(log, change, data, event, history) - if change.is_merged: - # This change is merged, so we don't need to look any further - # for dependencies. 
- log.debug("Updating %s: change is merged", change) - return change + self.sched.onChangeUpdated(change, event) + return change + + def _updateChangeDependencies(self, log, change, data, event, history): if history is None: history = [] else: @@ -618,9 +767,8 @@ class GerritConnection(BaseConnection): needs_changes = set() git_needs_changes = [] - if 'dependsOn' in data: - parts = data['dependsOn'][0]['ref'].split('/') - dep_num, dep_ps = parts[3], parts[4] + if data.depends_on is not None: + dep_num, dep_ps = data.depends_on log.debug("Updating %s: Getting git-dependent change %s,%s", change, dep_num, dep_ps) dep = self._getChange(dep_num, dep_ps, history=history, @@ -634,10 +782,8 @@ class GerritConnection(BaseConnection): change.git_needs_changes = git_needs_changes compat_needs_changes = [] - for record in self._getDependsOnFromCommit(data['commitMessage'], - change, event): - dep_num = str(record['number']) - dep_ps = str(record['currentPatchSet']['number']) + for (dep_num, dep_ps) in self._getDependsOnFromCommit( + change.message, change, event): log.debug("Updating %s: Getting commit-dependent " "change %s,%s", change, dep_num, dep_ps) dep = self._getChange(dep_num, dep_ps, history=history, @@ -649,24 +795,20 @@ class GerritConnection(BaseConnection): needed_by_changes = set() git_needed_by_changes = [] - if 'neededBy' in data: - for needed in data['neededBy']: - parts = needed['ref'].split('/') - dep_num, dep_ps = parts[3], parts[4] - log.debug("Updating %s: Getting git-needed change %s,%s", - change, dep_num, dep_ps) - dep = self._getChange(dep_num, dep_ps, history=history, - event=event) - if (dep.open and dep.is_current_patchset and - dep not in needed_by_changes): - git_needed_by_changes.append(dep) - needed_by_changes.add(dep) + for (dep_num, dep_ps) in data.needed_by: + log.debug("Updating %s: Getting git-needed change %s,%s", + change, dep_num, dep_ps) + dep = self._getChange(dep_num, dep_ps, history=history, + event=event) + if (dep.open and 
dep.is_current_patchset and + dep not in needed_by_changes): + git_needed_by_changes.append(dep) + needed_by_changes.add(dep) change.git_needed_by_changes = git_needed_by_changes compat_needed_by_changes = [] - for record in self._getNeededByFromCommit(data['id'], change, event): - dep_num = str(record['number']) - dep_ps = str(record['currentPatchSet']['number']) + for (dep_num, dep_ps) in self._getNeededByFromCommit( + change.id, change, event): log.debug("Updating %s: Getting commit-needed change %s,%s", change, dep_num, dep_ps) # Because a commit needed-by may be a cross-repo @@ -683,10 +825,6 @@ class GerritConnection(BaseConnection): needed_by_changes.add(dep) change.compat_needed_by_changes = compat_needed_by_changes - self.sched.onChangeUpdated(change, event) - - return change - def isMerged(self, change, head=None): self.log.debug("Checking if change %s is merged" % change) if not change.number: @@ -696,8 +834,7 @@ class GerritConnection(BaseConnection): return True data = self.queryChange(change.number) - change._data = data - change.is_merged = self._isMerged(change) + change.update(data, self) if change.is_merged: self.log.debug("Change %s is merged" % (change,)) else: @@ -717,17 +854,6 @@ class GerritConnection(BaseConnection): (change)) return False - def _isMerged(self, change): - data = change._data - if not data: - return False - status = data.get('status') - if not status: - return False - if status == 'MERGED': - return True - return False - def _waitForRefSha(self, project: Project, ref: str, old_sha: str='') -> bool: # Wait for the ref to show up in the repo @@ -756,35 +882,9 @@ class GerritConnection(BaseConnection): # Good question. It's probably ref-updated, which, ah, # means it's merged. 
return True - data = change._data - if not data: - return False - if 'submitRecords' not in data: - return False - try: - for sr in data['submitRecords']: - if sr['status'] == 'OK': - return True - elif sr['status'] == 'NOT_READY': - for label in sr['labels']: - if label['status'] in ['OK', 'MAY']: - continue - elif label['status'] in ['NEED', 'REJECT']: - # It may be our own rejection, so we ignore - if label['label'] not in allow_needs: - return False - continue - else: - # IMPOSSIBLE - return False - else: - # CLOSED, RULE_ERROR - return False - except Exception: - log.exception("Exception determining whether change" - "%s can merge:", change) - return False - return True + if change.missing_labels < set(allow_needs): + return True + return False def getProjectOpenChanges(self, project: Project) -> List[GerritChange]: # This is a best-effort function in case Gerrit is unable to return @@ -797,11 +897,10 @@ class GerritConnection(BaseConnection): for record in data: try: changes.append( - self._getChange(record['number'], - record['currentPatchSet']['number'])) + self._getChange(record.number, record.current_patchset)) except Exception: - self.log.exception("Unable to query change %s" % - (record.get('number'),)) + self.log.exception("Unable to query change %s", + record.number) return changes @staticmethod @@ -836,17 +935,18 @@ class GerritConnection(BaseConnection): def eventDone(self): self.event_queue.task_done() - def review(self, change, message, action={}, + def review(self, item, message, action={}, file_comments={}, zuul_event_id=None): if self.session: meth = self.review_http else: meth = self.review_ssh - return meth(change, message, action=action, + return meth(item, message, action=action, file_comments=file_comments, zuul_event_id=zuul_event_id) - def review_ssh(self, change, message, action={}, + def review_ssh(self, item, message, action={}, file_comments={}, zuul_event_id=None): + change = item.change project = change.project.name cmd = 'gerrit 
review --project %s' % project if message: @@ -861,14 +961,56 @@ class GerritConnection(BaseConnection): out, err = self._ssh(cmd, zuul_event_id=zuul_event_id) return err - def review_http(self, change, message, action={}, + def report_checks(self, log, item, changeid, checkinfo): + change = item.change + checkinfo = checkinfo.copy() + uuid = checkinfo.pop('uuid', None) + scheme = checkinfo.pop('scheme', None) + if uuid is None: + uuids = self.project_checker_map.get( + change.project.canonical_name, set()) + for u in uuids: + if u.split(':')[0] == scheme: + uuid = u + break + if uuid is None: + log.error("Unable to find matching checker for %s %s", + item, checkinfo) + return + + def fmt(t): + return str(datetime.datetime.fromtimestamp(t)) + + if item.enqueue_time: + checkinfo['started'] = fmt(item.enqueue_time) + if item.report_time: + checkinfo['finished'] = fmt(item.report_time) + url = item.formatStatusUrl() + if url: + checkinfo['url'] = url + if checkinfo: + for x in range(1, 4): + try: + self.post('changes/%s/revisions/%s/checks/%s' % + (changeid, change.commit, uuid), + checkinfo) + break + except Exception: + log.exception("Error submitting check data to gerrit, " + "attempt %s", x) + time.sleep(x * 10) + + def review_http(self, item, message, action={}, file_comments={}, zuul_event_id=None): + change = item.change log = get_annotated_logger(self.log, zuul_event_id) data = dict(message=message, strict_labels=False) submit = False labels = {} for key, val in action.items(): + if key == 'checks_api': + continue if val is True: if key == 'submit': submit = True @@ -878,33 +1020,51 @@ class GerritConnection(BaseConnection): if labels: data['labels'] = labels if file_comments: - data['comments'] = file_comments + if self.version >= (2, 15, 0): + file_comments = copy.deepcopy(file_comments) + url = item.formatStatusUrl() + for comments in itertools.chain(file_comments.values()): + for comment in comments: + comment['robot_id'] = 'zuul' + comment['robot_run_id'] 
= \ + item.current_build_set.uuid + if url: + comment['url'] = url + data['robot_comments'] = file_comments + else: + data['comments'] = file_comments + data['tag'] = 'autogenerated:zuul:%s' % (item.pipeline.name) changeid = "%s~%s~%s" % ( urllib.parse.quote(str(change.project), safe=''), urllib.parse.quote(str(change.branch), safe=''), change.id) - for x in range(1, 4): - try: - self.post('changes/%s/revisions/%s/review' % - (changeid, change.commit), - data) - break - except Exception: - log.exception("Error submitting data to gerrit, attempt %s", x) - time.sleep(x * 10) + if 'checks_api' in action: + self.report_checks(log, item, changeid, action['checks_api']) + if (message or data.get('labels') or data.get('comments')): + for x in range(1, 4): + try: + self.post('changes/%s/revisions/%s/review' % + (changeid, change.commit), + data) + break + except Exception: + log.exception( + "Error submitting data to gerrit, attempt %s", x) + time.sleep(x * 10) if change.is_current_patchset and submit: - try: - self.post('changes/%s/submit' % (changeid,), {}) - except Exception: - log.exception("Error submitting data to gerrit, attempt %s", x) - time.sleep(x * 10) - - def query(self, query, event=None): + for x in range(1, 4): + try: + self.post('changes/%s/submit' % (changeid,), {}) + except Exception: + log.exception( + "Error submitting data to gerrit, attempt %s", x) + time.sleep(x * 10) + + def queryChangeSSH(self, number, event=None): args = '--all-approvals --comments --commit-message' args += ' --current-patch-set --dependencies --files' args += ' --patch-sets --submit-records' - cmd = 'gerrit query --format json %s %s' % ( - args, query) + cmd = 'gerrit query --format json %s %s' % (args, number) out, err = self._ssh(cmd) if not out: return False @@ -919,10 +1079,23 @@ class GerritConnection(BaseConnection): pprint.pformat(data)) return data + def queryChangeHTTP(self, number, event=None): + data = self.get('changes/%s?o=DETAILED_ACCOUNTS&o=CURRENT_REVISION&' + 
'o=CURRENT_COMMIT&o=CURRENT_FILES&o=LABELS&' + 'o=DETAILED_LABELS' % (number,)) + related = self.get('changes/%s/revisions/%s/related' % ( + number, data['current_revision'])) + return data, related + def queryChange(self, number, event=None): - return self.query('change:%s' % number, event=event) + if self.session: + data, related = self.queryChangeHTTP(number, event=event) + return GerritChangeData(GerritChangeData.HTTP, data, related) + else: + data = self.queryChangeSSH(number, event=event) + return GerritChangeData(GerritChangeData.SSH, data) - def simpleQuery(self, query, event=None): + def simpleQuerySSH(self, query, event=None): def _query_chunk(query, event): args = '--commit-message --current-patch-set' @@ -974,9 +1147,63 @@ class GerritConnection(BaseConnection): "%s %s" % (query, resume), event) return alldata + def simpleQueryHTTP(self, query, event=None): + iolog = get_annotated_logger(self.iolog, event) + changes = [] + sortkey = '' + done = False + offset = 0 + while not done: + # We don't actually want to limit to 500, but that's the + # server-side default, and if we don't specify this, we + # won't get a _more_changes flag. + q = ('changes/?n=500%s&o=CURRENT_REVISION&o=CURRENT_COMMIT&' + 'q=%s' % (sortkey, query)) + iolog.debug('Query: %s', q) + batch = self.get(q) + iolog.debug("Received data from Gerrit query: \n%s", + pprint.pformat(batch)) + done = True + if batch: + changes += batch + if '_more_changes' in batch[-1]: + done = False + if '_sortkey' in batch[-1]: + sortkey = '&N=%s' % (batch[-1]['_sortkey'],) + else: + offset += len(batch) + sortkey = '&start=%s' % (offset,) + return changes + + def simpleQuery(self, query, event=None): + if self.session: + # None of the users of this method require dependency + # data, so we only perform the change query and omit the + # related changes query. 
+ alldata = self.simpleQueryHTTP(query, event=event) + return [GerritChangeData(GerritChangeData.HTTP, data) + for data in alldata] + else: + alldata = self.simpleQuerySSH(query, event=event) + return [GerritChangeData(GerritChangeData.SSH, data) + for data in alldata] + def _uploadPack(self, project: Project) -> str: - cmd = "git-upload-pack %s" % project.name - out, err = self._ssh(cmd, "0000") + if self.session: + url = ('%s/%s/info/refs?service=git-upload-pack' % + (self.baseurl, project.name)) + r = self.session.get( + url, + verify=self.verify_ssl, + auth=self.auth, timeout=TIMEOUT, + headers={'User-Agent': self.user_agent}) + self.iolog.debug('Received: %s %s' % (r.status_code, r.text,)) + if r.status_code != 200: + raise Exception("Received response %s" % (r.status_code,)) + out = r.text[r.text.find('\n') + 5:] + else: + cmd = "git-upload-pack %s" % project.name + out, err = self._ssh(cmd, "0000") return out def _open(self): @@ -1065,8 +1292,14 @@ class GerritConnection(BaseConnection): return ret def getGitUrl(self, project: Project) -> str: - url = 'ssh://%s@%s:%s/%s' % (self.user, self.server, self.port, - project.name) + if self.session: + baseurl = list(urllib.parse.urlparse(self.baseurl)) + baseurl[1] = '%s:%s@%s' % (self.user, self.password, baseurl[1]) + baseurl = urllib.parse.urlunparse(baseurl) + url = ('%s/%s' % (baseurl, project.name)) + else: + url = 'ssh://%s@%s:%s/%s' % (self.user, self.server, self.port, + project.name) return url def _getWebUrl(self, project: Project, sha: str=None) -> str: @@ -1075,14 +1308,37 @@ class GerritConnection(BaseConnection): project=project.getSafeAttributes(), sha=sha) + def _getRemoteVersion(self): + version = self.get('config/server/version') + base = version.split('-')[0] + parts = base.split('.') + major = minor = micro = 0 + if len(parts) > 0: + major = int(parts[0]) + if len(parts) > 1: + minor = int(parts[1]) + if len(parts) > 2: + micro = int(parts[2]) + self.version = (major, minor, micro) + 
self.log.info("Remote version is: %s (parsed as %s)" % + (version, self.version)) + def onLoad(self): self.log.debug("Starting Gerrit Connection/Watchers") - self._start_watcher_thread() + try: + self._getRemoteVersion() + except Exception: + self.log.exception("Unable to determine remote Gerrit version") + + if self.enable_stream_events: + self._start_watcher_thread() + self._start_poller_thread() self._start_event_connector() def onStop(self): self.log.debug("Stopping Gerrit Connection/Watchers") self._stop_watcher_thread() + self._stop_poller_thread() self._stop_event_connector() def _stop_watcher_thread(self): @@ -1100,6 +1356,15 @@ class GerritConnection(BaseConnection): keepalive=self.keepalive) self.watcher_thread.start() + def _stop_poller_thread(self): + if self.poller_thread: + self.poller_thread.stop() + self.poller_thread.join() + + def _start_poller_thread(self): + self.poller_thread = self._poller_class(self) + self.poller_thread.start() + def _stop_event_connector(self): if self.gerrit_event_connector: self.gerrit_event_connector.stop() diff --git a/zuul/driver/gerrit/gerritmodel.py b/zuul/driver/gerrit/gerritmodel.py index 46a034a09..3967fbf82 100644 --- a/zuul/driver/gerrit/gerritmodel.py +++ b/zuul/driver/gerrit/gerritmodel.py @@ -15,10 +15,14 @@ import copy import re import time +import urllib.parse + +import dateutil.parser from zuul.model import EventFilter, RefFilter from zuul.model import Change, TriggerEvent from zuul.driver.util import time_to_seconds +from zuul import exceptions EMPTY_GIT_REF = '0' * 40 # git sha of all zeros, used during creates/deletes @@ -29,12 +33,129 @@ class GerritChange(Change): super(GerritChange, self).__init__(project) self.approvals = [] + def update(self, data, connection): + if data.format == data.SSH: + self.updateFromSSH(data.data, connection) + else: + self.updateFromHTTP(data.data, connection) + + def updateFromSSH(self, data, connection): + if self.patchset is None: + self.patchset = 
str(data['currentPatchSet']['number']) + if 'project' not in data: + raise exceptions.ChangeNotFound(self.number, self.patchset) + self.project = connection.source.getProject(data['project']) + self.id = data['id'] + self.branch = data['branch'] + self.url = data['url'] + urlparse = urllib.parse.urlparse(connection.baseurl) + baseurl = "%s%s" % (urlparse.netloc, urlparse.path) + baseurl = baseurl.rstrip('/') + self.uris = [ + '%s/%s' % (baseurl, self.number), + '%s/#/c/%s' % (baseurl, self.number), + '%s/c/%s/+/%s' % (baseurl, self.project.name, self.number), + ] + + max_ps = 0 + files = [] + for ps in data['patchSets']: + if str(ps['number']) == self.patchset: + self.ref = ps['ref'] + self.commit = ps['revision'] + for f in ps.get('files', []): + files.append(f['file']) + if int(ps['number']) > int(max_ps): + max_ps = str(ps['number']) + if max_ps == self.patchset: + self.is_current_patchset = True + else: + self.is_current_patchset = False + self.files = files + + self.is_merged = data.get('status', '') == 'MERGED' + self.approvals = data['currentPatchSet'].get('approvals', []) + self.open = data['open'] + self.status = data['status'] + self.owner = data['owner'] + self.message = data['commitMessage'] + + self.missing_labels = set() + for sr in data.get('submitRecords', []): + if sr['status'] == 'NOT_READY': + for label in sr['labels']: + if label['status'] in ['OK', 'MAY']: + continue + elif label['status'] in ['NEED', 'REJECT']: + self.missing_labels.add(label['label']) + + def updateFromHTTP(self, data, connection): + urlparse = urllib.parse.urlparse(connection.baseurl) + baseurl = "%s%s" % (urlparse.netloc, urlparse.path) + baseurl = baseurl.rstrip('/') + current_revision = data['revisions'][data['current_revision']] + if self.patchset is None: + self.patchset = str(current_revision['_number']) + self.project = connection.source.getProject(data['project']) + self.id = data['change_id'] + self.branch = data['branch'] + self.url = '%s/%s' % (baseurl, 
self.number) + self.uris = [ + '%s/%s' % (baseurl, self.number), + '%s/#/c/%s' % (baseurl, self.number), + '%s/c/%s/+/%s' % (baseurl, self.project.name, self.number), + ] + + files = [] + if str(current_revision['_number']) == self.patchset: + self.ref = current_revision['ref'] + self.commit = data['current_revision'] + files = current_revision.get('files', []).keys() + self.is_current_patchset = True + else: + self.is_current_patchset = False + self.files = files + + self.is_merged = data['status'] == 'MERGED' + self.approvals = [] + self.missing_labels = set() + for label_name, label_data in data.get('labels', {}).items(): + for app in label_data.get('all', []): + if app.get('value', 0) == 0: + continue + by = {} + for k in ('name', 'username', 'email'): + if k in app: + by[k] = app[k] + self.approvals.append({ + "type": label_name, + "description": label_name, + "value": app['value'], + "grantedOn": + dateutil.parser.parse(app['date']).timestamp(), + "by": by, + }) + if label_data.get('optional', False): + continue + if label_data.get('blocking', False): + self.missing_labels.add(label_name) + continue + if 'approved' in label_data: + continue + self.missing_labels.add(label_name) + self.open = data['status'] == 'NEW' + self.status = data['status'] + self.owner = data['owner'] + self.message = current_revision['commit']['message'] + class GerritTriggerEvent(TriggerEvent): """Incoming event from an external system.""" def __init__(self): super(GerritTriggerEvent, self).__init__() self.approvals = [] + self.uuid = None + self.scheme = None def __repr__(self): ret = '<GerritTriggerEvent %s %s' % (self.type, @@ -154,8 +275,8 @@ class GerritApprovalFilter(object): class GerritEventFilter(EventFilter, GerritApprovalFilter): def __init__(self, trigger, types=[], branches=[], refs=[], event_approvals={}, comments=[], emails=[], usernames=[], - required_approvals=[], reject_approvals=[], - ignore_deletes=True): + required_approvals=[], reject_approvals=[], uuid=None, + 
scheme=None, ignore_deletes=True): EventFilter.__init__(self, trigger) @@ -176,6 +297,8 @@ class GerritEventFilter(EventFilter, GerritApprovalFilter): self.emails = [re.compile(x) for x in emails] self.usernames = [re.compile(x) for x in usernames] self.event_approvals = event_approvals + self.uuid = uuid + self.scheme = scheme self.ignore_deletes = ignore_deletes def __repr__(self): @@ -183,6 +306,10 @@ class GerritEventFilter(EventFilter, GerritApprovalFilter): if self._types: ret += ' types: %s' % ', '.join(self._types) + if self.uuid: + ret += ' uuid: %s' % (self.uuid,) + if self.scheme: + ret += ' scheme: %s' % (self.scheme,) if self._branches: ret += ' branches: %s' % ', '.join(self._branches) if self._refs: @@ -217,6 +344,12 @@ class GerritEventFilter(EventFilter, GerritApprovalFilter): if self.types and not matches_type: return False + if event.type == 'pending-check': + if self.uuid and event.uuid != self.uuid: + return False + if self.scheme and event.uuid.split(':')[0] != self.scheme: + return False + # branches are ORed matches_branch = False for branch in self.branches: diff --git a/zuul/driver/gerrit/gerritreporter.py b/zuul/driver/gerrit/gerritreporter.py index 90de89cc6..fa7968be9 100644 --- a/zuul/driver/gerrit/gerritreporter.py +++ b/zuul/driver/gerrit/gerritreporter.py @@ -61,7 +61,7 @@ class GerritReporter(BaseReporter): item.change._ref_sha = item.change.project.source.getRefSha( item.change.project, 'refs/heads/' + item.change.branch) - return self.connection.review(item.change, message, self.config, + return self.connection.review(item, message, self.config, comments, zuul_event_id=item.event) def getSubmitAllowNeeds(self): diff --git a/zuul/driver/gerrit/gerritsource.py b/zuul/driver/gerrit/gerritsource.py index df33b443f..ad7fcd2fb 100644 --- a/zuul/driver/gerrit/gerritsource.py +++ b/zuul/driver/gerrit/gerritsource.py @@ -75,7 +75,7 @@ class GerritSource(BaseSource): if not results: return None change = self.connection._getChange( - 
results[0]['number'], results[0]['currentPatchSet']['number']) + results[0].number, results[0].current_patchset) return change def getChangesDependingOn(self, change, projects, tenant): @@ -89,7 +89,7 @@ class GerritSource(BaseSource): results = self.connection.simpleQuery(query) seen = set() for result in results: - for match in find_dependency_headers(result['commitMessage']): + for match in find_dependency_headers(result.message): found = False for uri in change.uris: if uri in match: @@ -97,13 +97,12 @@ class GerritSource(BaseSource): break if not found: continue - key = (str(result['number']), - str(result['currentPatchSet']['number'])) + key = (result.number, result.current_patchset) if key in seen: continue seen.add(key) change = self.connection._getChange( - result['number'], result['currentPatchSet']['number']) + result.number, result.current_patchset) changes.append(change) return changes diff --git a/zuul/driver/gerrit/gerrittrigger.py b/zuul/driver/gerrit/gerrittrigger.py index 67608ad81..88d198d32 100644 --- a/zuul/driver/gerrit/gerrittrigger.py +++ b/zuul/driver/gerrit/gerrittrigger.py @@ -56,6 +56,8 @@ class GerritTrigger(BaseTrigger): reject_approvals=to_list( trigger.get('reject-approval') ), + uuid=trigger.get('uuid'), + scheme=trigger.get('scheme'), ignore_deletes=ignore_deletes ) efilters.append(f) @@ -80,7 +82,10 @@ def getSchema(): 'change-restored', 'change-merged', 'comment-added', - 'ref-updated')), + 'ref-updated', + 'pending-check')), + 'uuid': str, + 'scheme': str, 'comment_filter': scalar_or_list(str), 'comment': scalar_or_list(str), 'email_filter': scalar_or_list(str), diff --git a/zuul/model.py b/zuul/model.py index 82c8821d0..91c492980 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -4674,6 +4674,79 @@ class WebInfo(object): return d +class HoldRequest(object): + def __init__(self): + self.lock = None + self.stat = None + self.id = None + self.tenant = None + self.project = None + self.job = None + self.ref_filter = None + 
self.reason = None + self.node_expiration = None + # When max_count == current_count, hold request can no longer be used. + self.max_count = 1 + self.current_count = 0 + + def __str__(self): + return "<HoldRequest %s: tenant=%s project=%s job=%s ref_filter=%s>" \ + % (self.id, self.tenant, self.project, self.job, self.ref_filter) + + @staticmethod + def fromDict(data): + ''' + Return a new object from the given data dictionary. + ''' + obj = HoldRequest() + obj.tenant = data.get('tenant') + obj.project = data.get('project') + obj.job = data.get('job') + obj.ref_filter = data.get('ref_filter') + obj.max_count = data.get('max_count') + obj.current_count = data.get('current_count') + obj.reason = data.get('reason') + obj.node_expiration = data.get('node_expiration') + return obj + + def toDict(self): + ''' + Return a dictionary representation of the object. + ''' + d = dict() + d['id'] = self.id + d['tenant'] = self.tenant + d['project'] = self.project + d['job'] = self.job + d['ref_filter'] = self.ref_filter + d['max_count'] = self.max_count + d['current_count'] = self.current_count + d['reason'] = self.reason + d['node_expiration'] = self.node_expiration + return d + + def updateFromDict(self, d): + ''' + Update current object with data from the given dictionary. + ''' + self.tenant = d.get('tenant') + self.project = d.get('project') + self.job = d.get('job') + self.ref_filter = d.get('ref_filter') + self.max_count = d.get('max_count', 1) + self.current_count = d.get('current_count', 0) + self.reason = d.get('reason') + self.node_expiration = d.get('node_expiration') + + def serialize(self): + ''' + Return a representation of the object as a string. + + Used for storing the object data in ZooKeeper. 
+ ''' + return json.dumps(self.toDict()).encode('utf8') + + # AuthZ models class AuthZRule(object): diff --git a/zuul/nodepool.py b/zuul/nodepool.py index 64450e87d..703e7937f 100644 --- a/zuul/nodepool.py +++ b/zuul/nodepool.py @@ -168,38 +168,46 @@ class Nodepool(object): except Exception: log.exception("Unable to unlock node request %s", request) - def holdNodeSet(self, nodeset, autohold_key): + def holdNodeSet(self, nodeset, request): ''' Perform a hold on the given set of nodes. :param NodeSet nodeset: The object containing the set of nodes to hold. - :param set autohold_key: A set with the tenant/project/job names - associated with the given NodeSet. + :param HoldRequest request: Hold request associated with the NodeSet ''' self.log.info("Holding nodeset %s" % (nodeset,)) - (hold_iterations, - reason, - node_hold_expiration) = self.sched.autohold_requests[autohold_key] nodes = nodeset.getNodes() for node in nodes: if node.lock is None: raise Exception("Node %s is not locked" % (node,)) node.state = model.STATE_HOLD - node.hold_job = " ".join(autohold_key) - node.comment = reason - if node_hold_expiration: - node.hold_expiration = node_hold_expiration + node.hold_job = " ".join([request.tenant, + request.project, + request.job, + request.ref_filter]) + node.comment = request.reason + if request.node_expiration: + node.hold_expiration = request.node_expiration self.sched.zk.storeNode(node) - # We remove the autohold when the number of nodes in hold - # is equal to or greater than (run iteration count can be - # altered) the number of nodes used in a single job run - # times the number of run iterations requested. - nodes_in_hold = self.sched.zk.heldNodeCount(autohold_key) - if nodes_in_hold >= len(nodes) * hold_iterations: - self.log.debug("Removing autohold for %s", autohold_key) - del self.sched.autohold_requests[autohold_key] + request.current_count += 1 + + # Give ourselves a few seconds to try to obtain the lock rather than + # immediately give up. 
+ self.sched.zk.lockHoldRequest(request, timeout=5) + + try: + self.sched.zk.storeHoldRequest(request) + except Exception: + # If we fail to update the request count, we won't consider it + # a real autohold error by passing the exception up. It will + # just get used more than the original count specified. + self.log.exception("Unable to update hold request %s:", request) + finally: + # Although any exceptions thrown here are handled higher up in + # _doBuildCompletedEvent, we always want to try to unlock it. + self.sched.zk.unlockHoldRequest(request) def useNodeSet(self, nodeset, build_set=None, event=None): self.log.info("Setting nodeset %s in use" % (nodeset,)) diff --git a/zuul/rpcclient.py b/zuul/rpcclient.py index 9d1b877dc..45417a805 100644 --- a/zuul/rpcclient.py +++ b/zuul/rpcclient.py @@ -62,6 +62,10 @@ class RPCClient(object): 'node_hold_expiration': node_hold_expiration} return not self.submitJob('zuul:autohold', data).failure + def autohold_delete(self, request_id): + data = {'request_id': request_id} + return not self.submitJob('zuul:autohold_delete', data).failure + # todo allow filtering per tenant, like in the REST API def autohold_list(self, *args, **kwargs): data = {} diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py index 2e1441323..eecf559e3 100644 --- a/zuul/rpclistener.py +++ b/zuul/rpclistener.py @@ -34,6 +34,7 @@ class RPCListener(object): self.jobs = {} functions = [ 'autohold', + 'autohold_delete', 'autohold_list', 'allowed_labels_get', 'dequeue', @@ -92,16 +93,19 @@ class RPCListener(object): return job.sendWorkComplete() - def handle_autohold_list(self, job): - req = {} - - # The json.dumps() call cannot handle dict keys that are not strings - # so we convert our key to a CSV string that the caller can parse. 
- for key, value in self.sched.autohold_requests.items(): - new_key = ','.join(key) - req[new_key] = value + def handle_autohold_delete(self, job): + args = json.loads(job.arguments) + request_id = args['request_id'] + try: + self.sched.autohold_delete(request_id) + except Exception as e: + job.sendWorkException(str(e).encode('utf8')) + return + job.sendWorkComplete() - job.sendWorkComplete(json.dumps(req)) + def handle_autohold_list(self, job): + data = self.sched.autohold_list() + job.sendWorkComplete(json.dumps(data)) def handle_autohold(self, job): args = json.loads(job.arguments) diff --git a/zuul/scheduler.py b/zuul/scheduler.py index e5d4da5da..ee89c08bb 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -40,7 +40,7 @@ from zuul.lib.logutil import get_annotated_logger from zuul.lib.statsd import get_statsd import zuul.lib.queue import zuul.lib.repl -from zuul.model import Build +from zuul.model import Build, HoldRequest COMMANDS = ['full-reconfigure', 'stop', 'repl', 'norepl'] @@ -326,7 +326,6 @@ class Scheduler(threading.Thread): self.zuul_version = zuul_version.release_string self.last_reconfigured = None self.tenant_last_reconfigured = {} - self.autohold_requests = {} self.use_relative_priority = False if self.config.has_option('scheduler', 'relative_priority'): if self.config.getboolean('scheduler', 'relative_priority'): @@ -555,12 +554,57 @@ class Scheduler(threading.Thread): def autohold(self, tenant_name, project_name, job_name, ref_filter, reason, count, node_hold_expiration): key = (tenant_name, project_name, job_name, ref_filter) - if count == 0 and key in self.autohold_requests: - self.log.debug("Removing autohold for %s", key) - del self.autohold_requests[key] - else: - self.log.debug("Autohold requested for %s", key) - self.autohold_requests[key] = (count, reason, node_hold_expiration) + self.log.debug("Autohold requested for %s", key) + + request = HoldRequest() + request.tenant = tenant_name + request.project = project_name + 
request.job = job_name + request.ref_filter = ref_filter + request.reason = reason + request.max_count = count + request.node_expiration = node_hold_expiration + + # No need to lock it since we are creating a new one. + self.zk.storeHoldRequest(request) + + def autohold_list(self): + ''' + Return current hold requests as a list of dicts. + ''' + data = [] + for request_id in self.zk.getHoldRequests(): + request = self.zk.getHoldRequest(request_id) + if not request: + continue + data.append(request.toDict()) + return data + + def autohold_delete(self, hold_request_id): + ''' + Delete an autohold request. + + :param str hold_request_id: The unique ID of the request to delete. + ''' + try: + hold_request = self.zk.getHoldRequest(hold_request_id) + except Exception: + self.log.exception( + "Error retrieving autohold ID %s:", hold_request_id) + + if not hold_request: + self.log.info("Ignored request to remove invalid autohold ID %s", + hold_request_id) + return + + # (TODO): Release any nodes held for this request here + + self.log.debug("Removing autohold %s", hold_request) + try: + self.zk.deleteHoldRequest(hold_request) + except Exception: + self.log.exception( + "Error removing autohold request %s:", hold_request) def promote(self, tenant_name, pipeline_name, change_ids): event = PromoteEvent(tenant_name, pipeline_name, change_ids) @@ -1232,7 +1276,7 @@ class Scheduler(threading.Thread): return pipeline.manager.onBuildPaused(event.build) - def _getAutoholdRequestKey(self, build): + def _getAutoholdRequest(self, build): change = build.build_set.item.change autohold_key_base = (build.pipeline.tenant.name, @@ -1254,16 +1298,6 @@ class Scheduler(threading.Thread): CHANGE = 2 REF = 3 - def autohold_key_base_issubset(base, request_key): - """check whether the given key is a subset of the build key""" - index = 0 - base_len = len(base) - while index < base_len: - if base[index] != request_key[index]: - return False - index += 1 - return True - # Do a partial match of the 
autohold key against all autohold # requests, ignoring the last element of the key (ref filter), # and finally do a regex match between ref filter from @@ -1271,13 +1305,23 @@ class Scheduler(threading.Thread): # if it matches. Lastly, make sure that we match the most # specific autohold request by comparing "scopes" # of requests - the most specific is selected. - autohold_key = None + autohold = None scope = Scope.NONE self.log.debug("Checking build autohold key %s", autohold_key_base) - for request in self.autohold_requests: - ref_filter = request[-1] - if not autohold_key_base_issubset(autohold_key_base, request) \ - or not re.match(ref_filter, change.ref): + for request_id in self.zk.getHoldRequests(): + request = self.zk.getHoldRequest(request_id) + if not request: + continue + ref_filter = request.ref_filter + + if request.current_count >= request.max_count: + # This request has been used the max number of times + continue + elif not (request.tenant == autohold_key_base[0] and + request.project == autohold_key_base[1] and + request.job == autohold_key_base[2]): + continue + elif not re.match(ref_filter, change.ref): continue if ref_filter == ".*": @@ -1291,9 +1335,9 @@ class Scheduler(threading.Thread): autohold_key_base, candidate_scope) if candidate_scope > scope: scope = candidate_scope - autohold_key = request + autohold = request - return autohold_key + return autohold def _processAutohold(self, build): # We explicitly only want to hold nodes for jobs if they have @@ -1302,10 +1346,10 @@ class Scheduler(threading.Thread): if build.result not in hold_list: return - autohold_key = self._getAutoholdRequestKey(build) - self.log.debug("Got autohold key %s", autohold_key) - if autohold_key is not None: - self.nodepool.holdNodeSet(build.nodeset, autohold_key) + request = self._getAutoholdRequest(build) + self.log.debug("Got autohold %s", request) + if request is not None: + self.nodepool.holdNodeSet(build.nodeset, request) def _doBuildCompletedEvent(self, 
event): build = event.build diff --git a/zuul/web/__init__.py b/zuul/web/__init__.py index 4f6ce1384..4c5447365 100755 --- a/zuul/web/__init__.py +++ b/zuul/web/__init__.py @@ -417,19 +417,19 @@ class ZuulWebAPI(object): else: payload = json.loads(job.data[0]) result = [] - for key in payload: - _tenant, _project, job, ref_filter = key.split(',') - count, reason, hold_expiration = payload[key] - if tenant == _tenant: - if project is None or _project.endswith(project): + for request in payload: + if tenant == request['tenant']: + if (project is None or + request['project'].endswith(project)): result.append( - {'tenant': _tenant, - 'project': _project, - 'job': job, - 'ref_filter': ref_filter, - 'count': count, - 'reason': reason, - 'node_hold_expiration': hold_expiration}) + {'tenant': request['tenant'], + 'project': request['project'], + 'job': request['job'], + 'ref_filter': request['ref_filter'], + 'count': request['max_count'], + 'reason': request['reason'], + 'node_hold_expiration': request['node_expiration'] + }) return result @cherrypy.expose @@ -984,7 +984,7 @@ class ZuulWeb(object): # instanciate handlers self.rpc = zuul.rpcclient.RPCClient(gear_server, gear_port, ssl_key, ssl_cert, ssl_ca) - self.zk = zuul.zk.ZooKeeper() + self.zk = zuul.zk.ZooKeeper(enable_cache=True) if zk_hosts: self.zk.connect(hosts=zk_hosts, read_only=True) self.connections = connections diff --git a/zuul/zk.py b/zuul/zk.py index 433fc983f..e3691ce95 100644 --- a/zuul/zk.py +++ b/zuul/zk.py @@ -17,6 +17,7 @@ import time from kazoo.client import KazooClient, KazooState from kazoo import exceptions as kze from kazoo.handlers.threading import KazooTimeoutError +from kazoo.recipe.cache import TreeCache, TreeEvent from kazoo.recipe.lock import Lock import zuul.model @@ -43,17 +44,31 @@ class ZooKeeper(object): REQUEST_ROOT = '/nodepool/requests' REQUEST_LOCK_ROOT = "/nodepool/requests-lock" NODE_ROOT = '/nodepool/nodes' + HOLD_REQUEST_ROOT = '/zuul/hold-requests' # Log zookeeper retry 
every 10 seconds retry_log_rate = 10 - def __init__(self): + def __init__(self, enable_cache=True): ''' Initialize the ZooKeeper object. + + :param bool enable_cache: When True, enables caching of ZooKeeper + objects (e.g., HoldRequests). ''' self.client = None self._became_lost = False self._last_retry_log = 0 + self.enable_cache = enable_cache + + # The caching model we use is designed around handing out model + # data as objects. To do this, we use two caches: one is a TreeCache + # which contains raw znode data (among other details), and one for + # storing that data serialized as objects. This allows us to return + # objects from the APIs, and avoids calling the methods to serialize + # the data into objects more than once. + self._hold_request_tree = None + self._cached_hold_requests = {} def _dictToStr(self, data): return json.dumps(data).encode('utf8') @@ -125,6 +140,67 @@ class ZooKeeper(object): except KazooTimeoutError: self.logConnectionRetryEvent() + if self.enable_cache: + self._hold_request_tree = TreeCache(self.client, + self.HOLD_REQUEST_ROOT) + self._hold_request_tree.listen_fault(self.cacheFaultListener) + self._hold_request_tree.listen(self.holdRequestCacheListener) + self._hold_request_tree.start() + + def cacheFaultListener(self, e): + self.log.exception(e) + + def holdRequestCacheListener(self, event): + ''' + Keep the hold request object cache in sync with the TreeCache. 
+ ''' + try: + self._holdRequestCacheListener(event) + except Exception: + self.log.exception( + "Exception in hold request cache update for event: %s", event) + + def _holdRequestCacheListener(self, event): + if hasattr(event.event_data, 'path'): + # Ignore root node + path = event.event_data.path + if path == self.HOLD_REQUEST_ROOT: + return + + if event.event_type not in (TreeEvent.NODE_ADDED, + TreeEvent.NODE_UPDATED, + TreeEvent.NODE_REMOVED): + return + + path = event.event_data.path + request_id = path.rsplit('/', 1)[1] + + if event.event_type in (TreeEvent.NODE_ADDED, TreeEvent.NODE_UPDATED): + # Requests with no data are invalid + if not event.event_data.data: + return + + # Perform an in-place update of the already cached request + d = self._bytesToDict(event.event_data.data) + old_request = self._cached_hold_requests.get(request_id) + if old_request: + if event.event_data.stat.version <= old_request.stat.version: + # Don't update to older data + return + old_request.updateFromDict(d) + old_request.stat = event.event_data.stat + else: + request = zuul.model.HoldRequest.fromDict(d) + request.id = request_id + request.stat = event.event_data.stat + self._cached_hold_requests[request_id] = request + + elif event.event_type == TreeEvent.NODE_REMOVED: + try: + del self._cached_hold_requests[request_id] + except KeyError: + pass + def disconnect(self): ''' Close the ZooKeeper cluster connection. @@ -132,6 +208,10 @@ class ZooKeeper(object): You should call this method if you used connect() to establish a cluster connection. ''' + if self._hold_request_tree is not None: + self._hold_request_tree.close() + self._hold_request_tree = None + if self.client is not None and self.client.connected: self.client.stop() self.client.close() @@ -459,6 +539,106 @@ class ZooKeeper(object): if node: yield node + def getHoldRequests(self): + ''' + Get the current list of all hold requests. 
+ ''' + try: + return self.client.get_children(self.HOLD_REQUEST_ROOT) + except kze.NoNodeError: + return [] + + def getHoldRequest(self, hold_request_id): + path = self.HOLD_REQUEST_ROOT + "/" + hold_request_id + try: + data, stat = self.client.get(path) + except kze.NoNodeError: + return None + if not data: + return None + + obj = zuul.model.HoldRequest.fromDict(self._strToDict(data)) + obj.id = hold_request_id + obj.stat = stat + return obj + + def storeHoldRequest(self, hold_request): + ''' + Create or update a hold request. + + If this is a new request with no value for the `id` attribute of the + passed in request, then `id` will be set with the unique request + identifier after successful creation. + + :param HoldRequest hold_request: Object representing the hold request. + ''' + if hold_request.id is None: + path = self.client.create( + self.HOLD_REQUEST_ROOT + "/", + value=hold_request.serialize(), + sequence=True, + makepath=True) + hold_request.id = path.split('/')[-1] + else: + path = self.HOLD_REQUEST_ROOT + "/" + hold_request.id + self.client.set(path, hold_request.serialize()) + + def deleteHoldRequest(self, hold_request): + ''' + Delete a hold request. + + :param HoldRequest hold_request: Object representing the hold request. + ''' + path = self.HOLD_REQUEST_ROOT + "/" + hold_request.id + try: + self.client.delete(path, recursive=True) + except kze.NoNodeError: + pass + + def lockHoldRequest(self, request, blocking=True, timeout=None): + ''' + Lock a node request. + + This will set the `lock` attribute of the request object when the + lock is successfully acquired. + + :param HoldRequest request: The hold request to lock. 
+ ''' + if not request.id: + raise LockException( + "Hold request without an ID cannot be locked: %s" % request) + + path = "%s/%s/lock" % (self.HOLD_REQUEST_ROOT, request.id) + try: + lock = Lock(self.client, path) + have_lock = lock.acquire(blocking, timeout) + except kze.LockTimeout: + raise LockException( + "Timeout trying to acquire lock %s" % path) + + # If we aren't blocking, it's possible we didn't get the lock + # because someone else has it. + if not have_lock: + raise LockException("Did not get lock on %s" % path) + + request.lock = lock + + def unlockHoldRequest(self, request): + ''' + Unlock a hold request. + + The request must already have been locked. + + :param HoldRequest request: The request to unlock. + + :raises: ZKLockException if the request is not currently locked. + ''' + if request.lock is None: + raise LockException( + "Request %s does not hold a lock" % request) + request.lock.release() + request.lock = None + class Launcher(): ''' |