summaryrefslogtreecommitdiff
path: root/zuul
diff options
context:
space:
mode:
Diffstat (limited to 'zuul')
-rwxr-xr-xzuul/ansible/base/library/zuul_console.py17
-rwxr-xr-xzuul/cmd/client.py32
-rwxr-xr-xzuul/cmd/scheduler.py2
-rw-r--r--zuul/driver/gerrit/__init__.py19
-rw-r--r--zuul/driver/gerrit/gerritconnection.py589
-rw-r--r--zuul/driver/gerrit/gerritmodel.py137
-rw-r--r--zuul/driver/gerrit/gerritreporter.py2
-rw-r--r--zuul/driver/gerrit/gerritsource.py9
-rw-r--r--zuul/driver/gerrit/gerrittrigger.py7
-rw-r--r--zuul/model.py73
-rw-r--r--zuul/nodepool.py44
-rw-r--r--zuul/rpcclient.py4
-rw-r--r--zuul/rpclistener.py22
-rw-r--r--zuul/scheduler.py104
-rwxr-xr-xzuul/web/__init__.py26
-rw-r--r--zuul/zk.py182
16 files changed, 1015 insertions, 254 deletions
diff --git a/zuul/ansible/base/library/zuul_console.py b/zuul/ansible/base/library/zuul_console.py
index 6f988e682..9dffbbc3a 100755
--- a/zuul/ansible/base/library/zuul_console.py
+++ b/zuul/ansible/base/library/zuul_console.py
@@ -185,7 +185,7 @@ class Server(object):
console = self.chunkConsole(conn, log_uuid)
if console:
break
- conn.send('[Zuul] Log not found\n')
+ conn.send(b'[Zuul] Log not found\n')
time.sleep(0.5)
while True:
if self.followConsole(console, conn):
@@ -307,6 +307,19 @@ def main():
from ansible.module_utils.basic import * # noqa
from ansible.module_utils.basic import AnsibleModule
+#
+# To debug this, you can run it with arguments specified on the
+# command-line in a json file. e.g.
+#
+# $ cat args.json
+# {"ANSIBLE_MODULE_ARGS": {
+# "state": "present"
+# }
+# }
+#
+# Then from a virtualenv with Ansible installed you can run
+#
+# $ ./ansible-env/bin/python ./zuul_console.py args.json
+#
if __name__ == '__main__':
main()
-# test()
diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py
index 404fbc203..bef204dde 100755
--- a/zuul/cmd/client.py
+++ b/zuul/cmd/client.py
@@ -202,6 +202,13 @@ class Client(zuul.cmd.ZuulApp):
required=False, type=int, default=0)
cmd_autohold.set_defaults(func=self.autohold)
+ cmd_autohold_delete = subparsers.add_parser(
+ 'autohold-delete', help='delete autohold request')
+ cmd_autohold_delete.set_defaults(func=self.autohold_delete)
+ cmd_autohold_delete.add_argument('--id',
+ help='request ID',
+ required=True)
+
cmd_autohold_list = subparsers.add_parser(
'autohold-list', help='list autohold requests')
cmd_autohold_list.add_argument('--tenant', help='tenant name',
@@ -430,28 +437,35 @@ class Client(zuul.cmd.ZuulApp):
node_hold_expiration=node_hold_expiration)
return r
+ def autohold_delete(self):
+ client = self.get_client()
+ return client.autohold_delete(self.args.id)
+
def autohold_list(self):
client = self.get_client()
autohold_requests = client.autohold_list(tenant=self.args.tenant)
- if len(autohold_requests.keys()) == 0:
+ if not autohold_requests:
print("No autohold requests found")
return True
table = prettytable.PrettyTable(
field_names=[
- 'Tenant', 'Project', 'Job', 'Ref Filter', 'Count', 'Reason'
+ 'ID', 'Tenant', 'Project', 'Job', 'Ref Filter',
+ 'Max Count', 'Reason'
])
- for key, value in autohold_requests.items():
- # The key comes to us as a CSV string because json doesn't like
- # non-str keys.
- tenant_name, project_name, job_name, ref_filter = key.split(',')
- count, reason, node_hold_expiration = value
-
+ for request in autohold_requests:
table.add_row([
- tenant_name, project_name, job_name, ref_filter, count, reason
+ request['id'],
+ request['tenant'],
+ request['project'],
+ request['job'],
+ request['ref_filter'],
+ request['max_count'],
+ request['reason'],
])
+
print(table)
return True
diff --git a/zuul/cmd/scheduler.py b/zuul/cmd/scheduler.py
index 67484bf87..1eefbe713 100755
--- a/zuul/cmd/scheduler.py
+++ b/zuul/cmd/scheduler.py
@@ -136,7 +136,7 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
merger = zuul.merger.client.MergeClient(self.config, self.sched)
nodepool = zuul.nodepool.Nodepool(self.sched)
- zookeeper = zuul.zk.ZooKeeper()
+ zookeeper = zuul.zk.ZooKeeper(enable_cache=True)
zookeeper_hosts = get_default(self.config, 'zookeeper', 'hosts', None)
if not zookeeper_hosts:
raise Exception("The zookeeper hosts config value is required")
diff --git a/zuul/driver/gerrit/__init__.py b/zuul/driver/gerrit/__init__.py
index c720ba6a3..4930c7631 100644
--- a/zuul/driver/gerrit/__init__.py
+++ b/zuul/driver/gerrit/__init__.py
@@ -18,12 +18,31 @@ from zuul.driver.gerrit import gerritconnection
from zuul.driver.gerrit import gerrittrigger
from zuul.driver.gerrit import gerritsource
from zuul.driver.gerrit import gerritreporter
+from zuul.driver.util import to_list
class GerritDriver(Driver, ConnectionInterface, TriggerInterface,
SourceInterface, ReporterInterface):
name = 'gerrit'
+ def reconfigure(self, tenant):
+ connection_checker_map = {}
+ for pipeline in tenant.layout.pipelines.values():
+ for trigger in pipeline.triggers:
+ if isinstance(trigger, gerrittrigger.GerritTrigger):
+ con = trigger.connection
+ checkers = connection_checker_map.setdefault(con, [])
+ for trigger_item in to_list(trigger.config):
+ if trigger_item['event'] == 'pending-check':
+ d = {}
+ if 'uuid' in trigger_item:
+ d['uuid'] = trigger_item['uuid']
+ elif 'scheme' in trigger_item:
+ d['scheme'] = trigger_item['scheme']
+ checkers.append(d)
+ for (con, checkers) in connection_checker_map.items():
+ con.setWatchedCheckers(checkers)
+
def getConnection(self, name, config):
return gerritconnection.GerritConnection(self, name, config)
diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py
index 1c4173005..8a64bad7c 100644
--- a/zuul/driver/gerrit/gerritconnection.py
+++ b/zuul/driver/gerrit/gerritconnection.py
@@ -13,36 +13,100 @@
# License for the specific language governing permissions and limitations
# under the License.
+import copy
+import datetime
+import itertools
import json
+import logging
+import paramiko
+import pprint
+import queue
import re
import re2
+import requests
import select
+import shlex
import threading
import time
import urllib
-import paramiko
-import logging
-import pprint
-import shlex
-import queue
import urllib.parse
-import requests
from typing import Dict, List
from uuid import uuid4
+from zuul import version as zuul_version
from zuul.connection import BaseConnection
+from zuul.driver.gerrit.auth import FormAuth
+from zuul.driver.gerrit.gerritmodel import GerritChange, GerritTriggerEvent
from zuul.lib.logutil import get_annotated_logger
from zuul.model import Ref, Tag, Branch, Project
-from zuul import exceptions
-from zuul.driver.gerrit.gerritmodel import GerritChange, GerritTriggerEvent
-from zuul.driver.gerrit.auth import FormAuth
-from zuul import version as zuul_version
# HTTP timeout in seconds
TIMEOUT = 30
+class GerritChangeData(object):
+ """Compatability layer for SSH/HTTP
+
+ This class holds the raw data returned from a change query over
+ SSH or HTTP. Most of the work of parsing the data and storing it
+ on the change is in the gerritmodel.GerritChange class, however
+ this does perform a small amount of parsing of dependencies since
+ they are handled outside of that class. This provides an API to
+ that data independent of the source.
+
+ """
+
+ SSH = 1
+ HTTP = 2
+
+ def __init__(self, fmt, data, related=None):
+ self.format = fmt
+ self.data = data
+
+ if fmt == self.SSH:
+ self.parseSSH(data)
+ else:
+ self.parseHTTP(data)
+ if related:
+ self.parseRelatedHTTP(data, related)
+
+ def parseSSH(self, data):
+ self.needed_by = []
+ self.depends_on = None
+ self.message = data['commitMessage']
+ self.current_patchset = str(data['currentPatchSet']['number'])
+ self.number = str(data['number'])
+
+ if 'dependsOn' in data:
+ parts = data['dependsOn'][0]['ref'].split('/')
+ self.depends_on = (parts[3], parts[4])
+
+ for needed in data.get('neededBy', []):
+ parts = needed['ref'].split('/')
+ self.needed_by.append((parts[3], parts[4]))
+
+ def parseHTTP(self, data):
+ rev = data['revisions'][data['current_revision']]
+ self.message = rev['commit']['message']
+ self.current_patchset = str(rev['_number'])
+ self.number = str(data['_number'])
+
+ def parseRelatedHTTP(self, data, related):
+ self.needed_by = []
+ self.depends_on = None
+ current_rev = data['revisions'][data['current_revision']]
+ for change in related['changes']:
+ for parent in current_rev['commit']['parents']:
+ if change['commit']['commit'] == parent['commit']:
+ self.depends_on = (change['_change_number'],
+ change['_revision_number'])
+ break
+ else:
+ self.needed_by.append((change['_change_number'],
+ change['_revision_number']))
+
+
class GerritEventConnector(threading.Thread):
"""Move events from Gerrit to the scheduler."""
@@ -80,6 +144,7 @@ class GerritEventConnector(threading.Thread):
log = get_annotated_logger(self.log, event)
event.type = data.get('type')
+ event.uuid = data.get('uuid')
# This catches when a change is merged, as it could potentially
# have merged layout info which will need to be read in.
# Ideally this would be done with a refupdate event so as to catch
@@ -131,6 +196,7 @@ class GerritEventConnector(threading.Thread):
'ref-replication-scheduled': None,
'topic-changed': 'changer',
'project-created': None, # Gerrit 2.14
+ 'pending-check': None, # Gerrit 3.0+
}
event.account = None
if event.type in accountfield_from_type:
@@ -294,6 +360,58 @@ class GerritWatcher(threading.Thread):
self._stopped = True
+class GerritPoller(threading.Thread):
+ # Poll gerrit without stream-events
+ log = logging.getLogger("gerrit.GerritPoller")
+ poll_interval = 30
+
+ def __init__(self, connection):
+ threading.Thread.__init__(self)
+ self.connection = connection
+ self._stopped = False
+ self._stop_event = threading.Event()
+
+ def _makeEvent(self, change, uuid, check):
+ return {'type': 'pending-check',
+ 'uuid': uuid,
+ 'change': {
+ 'project': change['patch_set']['repository'],
+ 'number': change['patch_set']['change_number'],
+ },
+ 'patchSet': {
+ 'number': change['patch_set']['patch_set_id'],
+ }}
+
+ def _run(self):
+ try:
+ for checker in self.connection.watched_checkers:
+ changes = self.connection.get(
+ 'plugins/checks/checks.pending/?'
+ 'query=checker:%s+(state:NOT_STARTED)' % checker)
+ for change in changes:
+ for uuid, check in change['pending_checks'].items():
+ event = self._makeEvent(change, uuid, check)
+ self.connection.addEvent(event)
+ except Exception:
+ self.log.exception("Exception on Gerrit poll with %s:",
+ self.connection.connection_name)
+
+ def run(self):
+ last_start = time.time()
+ while not self._stopped:
+ next_start = last_start + self.poll_interval
+ self._stop_event.wait(max(next_start - time.time(), 0))
+ if self._stopped:
+ break
+ last_start = time.time()
+ self._run()
+
+ def stop(self):
+ self.log.debug("Stopping watcher")
+ self._stopped = True
+ self._stop_event.set()
+
+
class GerritConnection(BaseConnection):
driver_name = 'gerrit'
log = logging.getLogger("zuul.GerritConnection")
@@ -305,6 +423,7 @@ class GerritConnection(BaseConnection):
r"@{|\.\.|\.$|^@$|/$|^/|//+") # everything else we can check with re2
replication_timeout = 300
replication_retry_interval = 5
+ _poller_class = GerritPoller
def __init__(self, driver, connection_name, connection_config):
super(GerritConnection, self).__init__(driver, connection_name,
@@ -324,9 +443,20 @@ class GerritConnection(BaseConnection):
self.port = int(self.connection_config.get('port', 29418))
self.keyfile = self.connection_config.get('sshkey', None)
self.keepalive = int(self.connection_config.get('keepalive', 60))
+ # TODO(corvus): Document this when the checks api is stable;
+ # it's not useful without it.
+ self.enable_stream_events = self.connection_config.get(
+ 'stream_events', True)
+ if self.enable_stream_events not in [
+ 'true', 'True', '1', 1, 'TRUE', True]:
+ self.enable_stream_events = False
self.watcher_thread = None
+ self.poller_thread = None
self.event_queue = queue.Queue()
self.client = None
+ self.watched_checkers = []
+ self.project_checker_map = {}
+ self.version = (0, 0, 0)
self.baseurl = self.connection_config.get(
'baseurl', 'https://%s' % self.server).rstrip('/')
@@ -362,6 +492,42 @@ class GerritConnection(BaseConnection):
self.auth = authclass(
self.user, self.password)
+ def setWatchedCheckers(self, checkers_to_watch):
+ self.log.debug("Setting watched checkers to %s", checkers_to_watch)
+ self.watched_checkers = set()
+ self.project_checker_map = {}
+ schemes_to_watch = set()
+ uuids_to_watch = set()
+ for x in checkers_to_watch:
+ if 'scheme' in x:
+ schemes_to_watch.add(x['scheme'])
+ if 'uuid' in x:
+ uuids_to_watch.add(x['uuid'])
+ if schemes_to_watch:
+ # get a list of all configured checkers
+ try:
+ configured_checkers = self.get('plugins/checks/checkers/')
+ except Exception:
+ self.log.exception("Unable to get checkers")
+ configured_checkers = []
+
+ # filter it through scheme matches in checkers_to_watch
+ for checker in configured_checkers:
+ if checker['status'] != 'ENABLED':
+ continue
+ checker_scheme, checker_id = checker['uuid'].split(':')
+ repo = checker['repository']
+ repo = self.canonical_hostname + '/' + repo
+ # map scheme matches to project names
+ if checker_scheme in schemes_to_watch:
+ repo_checkers = self.project_checker_map.setdefault(
+ repo, set())
+ repo_checkers.add(checker['uuid'])
+ self.watched_checkers.add(checker['uuid'])
+ # add uuids from checkers_to_watch
+ for x in uuids_to_watch:
+ self.watched_checkers.add(x)
+
def toDict(self):
d = super().toDict()
d.update({
@@ -375,6 +541,28 @@ class GerritConnection(BaseConnection):
def url(self, path):
return self.baseurl + '/a/' + path
+ def get(self, path):
+ url = self.url(path)
+ self.log.debug('GET: %s' % (url,))
+ r = self.session.get(
+ url,
+ verify=self.verify_ssl,
+ auth=self.auth, timeout=TIMEOUT,
+ headers={'User-Agent': self.user_agent})
+ self.log.debug('Received: %s %s' % (r.status_code, r.text,))
+ if r.status_code != 200:
+ raise Exception("Received response %s" % (r.status_code,))
+ ret = None
+ if r.text and len(r.text) > 4:
+ try:
+ ret = json.loads(r.text[4:])
+ except Exception:
+ self.log.exception(
+ "Unable to parse result %s from post to %s" %
+ (r.text, url))
+ raise
+ return ret
+
def post(self, path, data):
url = self.url(path)
self.log.debug('POST: %s' % (url,))
@@ -510,7 +698,7 @@ class GerritConnection(BaseConnection):
log.debug("Updating %s: Running query %s to find needed changes",
change, query)
records.extend(self.simpleQuery(query, event=event))
- return records
+ return [(x.number, x.current_patchset) for x in records]
def _getNeededByFromCommit(self, change_id, change, event):
log = get_annotated_logger(self.log, event)
@@ -522,11 +710,10 @@ class GerritConnection(BaseConnection):
results = self.simpleQuery(query, event=event)
for result in results:
for match in self.depends_on_re.findall(
- result['commitMessage']):
+ result.message):
if match != change_id:
continue
- key = (str(result['number']),
- str(result['currentPatchSet']['number']))
+ key = (result.number, result.current_patchset)
if key in seen:
continue
log.debug("Updating %s: Found change %s,%s "
@@ -534,7 +721,7 @@ class GerritConnection(BaseConnection):
change, key[0], key[1], change_id)
seen.add(key)
records.append(result)
- return records
+ return [(x.number, x.current_patchset) for x in records]
def _updateChange(self, change, event, history):
log = get_annotated_logger(self.log, event)
@@ -562,54 +749,16 @@ class GerritConnection(BaseConnection):
log.info("Updating %s", change)
data = self.queryChange(change.number, event=event)
- change._data = data
-
- if change.patchset is None:
- change.patchset = str(data['currentPatchSet']['number'])
- if 'project' not in data:
- raise exceptions.ChangeNotFound(change.number, change.patchset)
- change.project = self.source.getProject(data['project'])
- change.id = data['id']
- change.branch = data['branch']
- change.url = data['url']
- urlparse = urllib.parse.urlparse(self.baseurl)
- baseurl = "%s%s" % (urlparse.netloc, urlparse.path)
- baseurl = baseurl.rstrip('/')
- change.uris = [
- '%s/%s' % (baseurl, change.number),
- '%s/#/c/%s' % (baseurl, change.number),
- '%s/c/%s/+/%s' % (baseurl, change.project.name, change.number),
- ]
-
- max_ps = 0
- files = []
- for ps in data['patchSets']:
- if str(ps['number']) == change.patchset:
- change.ref = ps['ref']
- change.commit = ps['revision']
- for f in ps.get('files', []):
- files.append(f['file'])
- if int(ps['number']) > int(max_ps):
- max_ps = str(ps['number'])
- if max_ps == change.patchset:
- change.is_current_patchset = True
- else:
- change.is_current_patchset = False
- change.files = files
+ change.update(data, self)
- change.is_merged = self._isMerged(change)
- change.approvals = data['currentPatchSet'].get('approvals', [])
- change.open = data['open']
- change.status = data['status']
- change.owner = data['owner']
- change.message = data['commitMessage']
+ if not change.is_merged:
+ self._updateChangeDependencies(log, change, data, event, history)
- if change.is_merged:
- # This change is merged, so we don't need to look any further
- # for dependencies.
- log.debug("Updating %s: change is merged", change)
- return change
+ self.sched.onChangeUpdated(change, event)
+ return change
+
+ def _updateChangeDependencies(self, log, change, data, event, history):
if history is None:
history = []
else:
@@ -618,9 +767,8 @@ class GerritConnection(BaseConnection):
needs_changes = set()
git_needs_changes = []
- if 'dependsOn' in data:
- parts = data['dependsOn'][0]['ref'].split('/')
- dep_num, dep_ps = parts[3], parts[4]
+ if data.depends_on is not None:
+ dep_num, dep_ps = data.depends_on
log.debug("Updating %s: Getting git-dependent change %s,%s",
change, dep_num, dep_ps)
dep = self._getChange(dep_num, dep_ps, history=history,
@@ -634,10 +782,8 @@ class GerritConnection(BaseConnection):
change.git_needs_changes = git_needs_changes
compat_needs_changes = []
- for record in self._getDependsOnFromCommit(data['commitMessage'],
- change, event):
- dep_num = str(record['number'])
- dep_ps = str(record['currentPatchSet']['number'])
+ for (dep_num, dep_ps) in self._getDependsOnFromCommit(
+ change.message, change, event):
log.debug("Updating %s: Getting commit-dependent "
"change %s,%s", change, dep_num, dep_ps)
dep = self._getChange(dep_num, dep_ps, history=history,
@@ -649,24 +795,20 @@ class GerritConnection(BaseConnection):
needed_by_changes = set()
git_needed_by_changes = []
- if 'neededBy' in data:
- for needed in data['neededBy']:
- parts = needed['ref'].split('/')
- dep_num, dep_ps = parts[3], parts[4]
- log.debug("Updating %s: Getting git-needed change %s,%s",
- change, dep_num, dep_ps)
- dep = self._getChange(dep_num, dep_ps, history=history,
- event=event)
- if (dep.open and dep.is_current_patchset and
- dep not in needed_by_changes):
- git_needed_by_changes.append(dep)
- needed_by_changes.add(dep)
+ for (dep_num, dep_ps) in data.needed_by:
+ log.debug("Updating %s: Getting git-needed change %s,%s",
+ change, dep_num, dep_ps)
+ dep = self._getChange(dep_num, dep_ps, history=history,
+ event=event)
+ if (dep.open and dep.is_current_patchset and
+ dep not in needed_by_changes):
+ git_needed_by_changes.append(dep)
+ needed_by_changes.add(dep)
change.git_needed_by_changes = git_needed_by_changes
compat_needed_by_changes = []
- for record in self._getNeededByFromCommit(data['id'], change, event):
- dep_num = str(record['number'])
- dep_ps = str(record['currentPatchSet']['number'])
+ for (dep_num, dep_ps) in self._getNeededByFromCommit(
+ change.id, change, event):
log.debug("Updating %s: Getting commit-needed change %s,%s",
change, dep_num, dep_ps)
# Because a commit needed-by may be a cross-repo
@@ -683,10 +825,6 @@ class GerritConnection(BaseConnection):
needed_by_changes.add(dep)
change.compat_needed_by_changes = compat_needed_by_changes
- self.sched.onChangeUpdated(change, event)
-
- return change
-
def isMerged(self, change, head=None):
self.log.debug("Checking if change %s is merged" % change)
if not change.number:
@@ -696,8 +834,7 @@ class GerritConnection(BaseConnection):
return True
data = self.queryChange(change.number)
- change._data = data
- change.is_merged = self._isMerged(change)
+ change.update(data, self)
if change.is_merged:
self.log.debug("Change %s is merged" % (change,))
else:
@@ -717,17 +854,6 @@ class GerritConnection(BaseConnection):
(change))
return False
- def _isMerged(self, change):
- data = change._data
- if not data:
- return False
- status = data.get('status')
- if not status:
- return False
- if status == 'MERGED':
- return True
- return False
-
def _waitForRefSha(self, project: Project,
ref: str, old_sha: str='') -> bool:
# Wait for the ref to show up in the repo
@@ -756,35 +882,9 @@ class GerritConnection(BaseConnection):
# Good question. It's probably ref-updated, which, ah,
# means it's merged.
return True
- data = change._data
- if not data:
- return False
- if 'submitRecords' not in data:
- return False
- try:
- for sr in data['submitRecords']:
- if sr['status'] == 'OK':
- return True
- elif sr['status'] == 'NOT_READY':
- for label in sr['labels']:
- if label['status'] in ['OK', 'MAY']:
- continue
- elif label['status'] in ['NEED', 'REJECT']:
- # It may be our own rejection, so we ignore
- if label['label'] not in allow_needs:
- return False
- continue
- else:
- # IMPOSSIBLE
- return False
- else:
- # CLOSED, RULE_ERROR
- return False
- except Exception:
- log.exception("Exception determining whether change"
- "%s can merge:", change)
- return False
- return True
+ if change.missing_labels < set(allow_needs):
+ return True
+ return False
def getProjectOpenChanges(self, project: Project) -> List[GerritChange]:
# This is a best-effort function in case Gerrit is unable to return
@@ -797,11 +897,10 @@ class GerritConnection(BaseConnection):
for record in data:
try:
changes.append(
- self._getChange(record['number'],
- record['currentPatchSet']['number']))
+ self._getChange(record.number, record.current_patchset))
except Exception:
- self.log.exception("Unable to query change %s" %
- (record.get('number'),))
+ self.log.exception("Unable to query change %s",
+ record.number)
return changes
@staticmethod
@@ -836,17 +935,18 @@ class GerritConnection(BaseConnection):
def eventDone(self):
self.event_queue.task_done()
- def review(self, change, message, action={},
+ def review(self, item, message, action={},
file_comments={}, zuul_event_id=None):
if self.session:
meth = self.review_http
else:
meth = self.review_ssh
- return meth(change, message, action=action,
+ return meth(item, message, action=action,
file_comments=file_comments, zuul_event_id=zuul_event_id)
- def review_ssh(self, change, message, action={},
+ def review_ssh(self, item, message, action={},
file_comments={}, zuul_event_id=None):
+ change = item.change
project = change.project.name
cmd = 'gerrit review --project %s' % project
if message:
@@ -861,14 +961,56 @@ class GerritConnection(BaseConnection):
out, err = self._ssh(cmd, zuul_event_id=zuul_event_id)
return err
- def review_http(self, change, message, action={},
+ def report_checks(self, log, item, changeid, checkinfo):
+ change = item.change
+ checkinfo = checkinfo.copy()
+ uuid = checkinfo.pop('uuid', None)
+ scheme = checkinfo.pop('scheme', None)
+ if uuid is None:
+ uuids = self.project_checker_map.get(
+ change.project.canonical_name, set())
+ for u in uuids:
+ if u.split(':')[0] == scheme:
+ uuid = u
+ break
+ if uuid is None:
+ log.error("Unable to find matching checker for %s %s",
+ item, checkinfo)
+ return
+
+ def fmt(t):
+ return str(datetime.datetime.fromtimestamp(t))
+
+ if item.enqueue_time:
+ checkinfo['started'] = fmt(item.enqueue_time)
+ if item.report_time:
+ checkinfo['finished'] = fmt(item.report_time)
+ url = item.formatStatusUrl()
+ if url:
+ checkinfo['url'] = url
+ if checkinfo:
+ for x in range(1, 4):
+ try:
+ self.post('changes/%s/revisions/%s/checks/%s' %
+ (changeid, change.commit, uuid),
+ checkinfo)
+ break
+ except Exception:
+ log.exception("Error submitting check data to gerrit, "
+ "attempt %s", x)
+ time.sleep(x * 10)
+
+ def review_http(self, item, message, action={},
file_comments={}, zuul_event_id=None):
+ change = item.change
log = get_annotated_logger(self.log, zuul_event_id)
data = dict(message=message,
strict_labels=False)
submit = False
labels = {}
for key, val in action.items():
+ if key == 'checks_api':
+ continue
if val is True:
if key == 'submit':
submit = True
@@ -878,33 +1020,51 @@ class GerritConnection(BaseConnection):
if labels:
data['labels'] = labels
if file_comments:
- data['comments'] = file_comments
+ if self.version >= (2, 15, 0):
+ file_comments = copy.deepcopy(file_comments)
+ url = item.formatStatusUrl()
+ for comments in itertools.chain(file_comments.values()):
+ for comment in comments:
+ comment['robot_id'] = 'zuul'
+ comment['robot_run_id'] = \
+ item.current_build_set.uuid
+ if url:
+ comment['url'] = url
+ data['robot_comments'] = file_comments
+ else:
+ data['comments'] = file_comments
+ data['tag'] = 'autogenerated:zuul:%s' % (item.pipeline.name)
changeid = "%s~%s~%s" % (
urllib.parse.quote(str(change.project), safe=''),
urllib.parse.quote(str(change.branch), safe=''),
change.id)
- for x in range(1, 4):
- try:
- self.post('changes/%s/revisions/%s/review' %
- (changeid, change.commit),
- data)
- break
- except Exception:
- log.exception("Error submitting data to gerrit, attempt %s", x)
- time.sleep(x * 10)
+ if 'checks_api' in action:
+ self.report_checks(log, item, changeid, action['checks_api'])
+ if (message or data.get('labels') or data.get('comments')):
+ for x in range(1, 4):
+ try:
+ self.post('changes/%s/revisions/%s/review' %
+ (changeid, change.commit),
+ data)
+ break
+ except Exception:
+ log.exception(
+ "Error submitting data to gerrit, attempt %s", x)
+ time.sleep(x * 10)
if change.is_current_patchset and submit:
- try:
- self.post('changes/%s/submit' % (changeid,), {})
- except Exception:
- log.exception("Error submitting data to gerrit, attempt %s", x)
- time.sleep(x * 10)
-
- def query(self, query, event=None):
+ for x in range(1, 4):
+ try:
+ self.post('changes/%s/submit' % (changeid,), {})
+ except Exception:
+ log.exception(
+ "Error submitting data to gerrit, attempt %s", x)
+ time.sleep(x * 10)
+
+ def queryChangeSSH(self, number, event=None):
args = '--all-approvals --comments --commit-message'
args += ' --current-patch-set --dependencies --files'
args += ' --patch-sets --submit-records'
- cmd = 'gerrit query --format json %s %s' % (
- args, query)
+ cmd = 'gerrit query --format json %s %s' % (args, number)
out, err = self._ssh(cmd)
if not out:
return False
@@ -919,10 +1079,23 @@ class GerritConnection(BaseConnection):
pprint.pformat(data))
return data
+ def queryChangeHTTP(self, number, event=None):
+ data = self.get('changes/%s?o=DETAILED_ACCOUNTS&o=CURRENT_REVISION&'
+ 'o=CURRENT_COMMIT&o=CURRENT_FILES&o=LABELS&'
+ 'o=DETAILED_LABELS' % (number,))
+ related = self.get('changes/%s/revisions/%s/related' % (
+ number, data['current_revision']))
+ return data, related
+
def queryChange(self, number, event=None):
- return self.query('change:%s' % number, event=event)
+ if self.session:
+ data, related = self.queryChangeHTTP(number, event=event)
+ return GerritChangeData(GerritChangeData.HTTP, data, related)
+ else:
+ data = self.queryChangeSSH(number, event=event)
+ return GerritChangeData(GerritChangeData.SSH, data)
- def simpleQuery(self, query, event=None):
+ def simpleQuerySSH(self, query, event=None):
def _query_chunk(query, event):
args = '--commit-message --current-patch-set'
@@ -974,9 +1147,63 @@ class GerritConnection(BaseConnection):
"%s %s" % (query, resume), event)
return alldata
+ def simpleQueryHTTP(self, query, event=None):
+ iolog = get_annotated_logger(self.iolog, event)
+ changes = []
+ sortkey = ''
+ done = False
+ offset = 0
+ while not done:
+ # We don't actually want to limit to 500, but that's the
+ # server-side default, and if we don't specify this, we
+ # won't get a _more_changes flag.
+ q = ('changes/?n=500%s&o=CURRENT_REVISION&o=CURRENT_COMMIT&'
+ 'q=%s' % (sortkey, query))
+ iolog.debug('Query: %s', q)
+ batch = self.get(q)
+ iolog.debug("Received data from Gerrit query: \n%s",
+ pprint.pformat(batch))
+ done = True
+ if batch:
+ changes += batch
+ if '_more_changes' in batch[-1]:
+ done = False
+ if '_sortkey' in batch[-1]:
+ sortkey = '&N=%s' % (batch[-1]['_sortkey'],)
+ else:
+ offset += len(batch)
+ sortkey = '&start=%s' % (offset,)
+ return changes
+
+ def simpleQuery(self, query, event=None):
+ if self.session:
+ # None of the users of this method require dependency
+ # data, so we only perform the change query and omit the
+ # related changes query.
+ alldata = self.simpleQueryHTTP(query, event=event)
+ return [GerritChangeData(GerritChangeData.HTTP, data)
+ for data in alldata]
+ else:
+ alldata = self.simpleQuerySSH(query, event=event)
+ return [GerritChangeData(GerritChangeData.SSH, data)
+ for data in alldata]
+
def _uploadPack(self, project: Project) -> str:
- cmd = "git-upload-pack %s" % project.name
- out, err = self._ssh(cmd, "0000")
+ if self.session:
+ url = ('%s/%s/info/refs?service=git-upload-pack' %
+ (self.baseurl, project.name))
+ r = self.session.get(
+ url,
+ verify=self.verify_ssl,
+ auth=self.auth, timeout=TIMEOUT,
+ headers={'User-Agent': self.user_agent})
+ self.iolog.debug('Received: %s %s' % (r.status_code, r.text,))
+ if r.status_code != 200:
+ raise Exception("Received response %s" % (r.status_code,))
+ out = r.text[r.text.find('\n') + 5:]
+ else:
+ cmd = "git-upload-pack %s" % project.name
+ out, err = self._ssh(cmd, "0000")
return out
def _open(self):
@@ -1065,8 +1292,14 @@ class GerritConnection(BaseConnection):
return ret
def getGitUrl(self, project: Project) -> str:
- url = 'ssh://%s@%s:%s/%s' % (self.user, self.server, self.port,
- project.name)
+ if self.session:
+ baseurl = list(urllib.parse.urlparse(self.baseurl))
+ baseurl[1] = '%s:%s@%s' % (self.user, self.password, baseurl[1])
+ baseurl = urllib.parse.urlunparse(baseurl)
+ url = ('%s/%s' % (baseurl, project.name))
+ else:
+ url = 'ssh://%s@%s:%s/%s' % (self.user, self.server, self.port,
+ project.name)
return url
def _getWebUrl(self, project: Project, sha: str=None) -> str:
@@ -1075,14 +1308,37 @@ class GerritConnection(BaseConnection):
project=project.getSafeAttributes(),
sha=sha)
+ def _getRemoteVersion(self):
+ version = self.get('config/server/version')
+ base = version.split('-')[0]
+ parts = base.split('.')
+ major = minor = micro = 0
+ if len(parts) > 0:
+ major = int(parts[0])
+ if len(parts) > 1:
+ minor = int(parts[1])
+ if len(parts) > 2:
+ micro = int(parts[2])
+ self.version = (major, minor, micro)
+ self.log.info("Remote version is: %s (parsed as %s)" %
+ (version, self.version))
+
def onLoad(self):
self.log.debug("Starting Gerrit Connection/Watchers")
- self._start_watcher_thread()
+ try:
+ self._getRemoteVersion()
+ except Exception:
+ self.log.exception("Unable to determine remote Gerrit version")
+
+ if self.enable_stream_events:
+ self._start_watcher_thread()
+ self._start_poller_thread()
self._start_event_connector()
def onStop(self):
self.log.debug("Stopping Gerrit Connection/Watchers")
self._stop_watcher_thread()
+ self._stop_poller_thread()
self._stop_event_connector()
def _stop_watcher_thread(self):
@@ -1100,6 +1356,15 @@ class GerritConnection(BaseConnection):
keepalive=self.keepalive)
self.watcher_thread.start()
+ def _stop_poller_thread(self):
+ if self.poller_thread:
+ self.poller_thread.stop()
+ self.poller_thread.join()
+
+ def _start_poller_thread(self):
+ self.poller_thread = self._poller_class(self)
+ self.poller_thread.start()
+
def _stop_event_connector(self):
if self.gerrit_event_connector:
self.gerrit_event_connector.stop()
diff --git a/zuul/driver/gerrit/gerritmodel.py b/zuul/driver/gerrit/gerritmodel.py
index 46a034a09..3967fbf82 100644
--- a/zuul/driver/gerrit/gerritmodel.py
+++ b/zuul/driver/gerrit/gerritmodel.py
@@ -15,10 +15,14 @@
import copy
import re
import time
+import urllib.parse
+
+import dateutil.parser
from zuul.model import EventFilter, RefFilter
from zuul.model import Change, TriggerEvent
from zuul.driver.util import time_to_seconds
+from zuul import exceptions
EMPTY_GIT_REF = '0' * 40 # git sha of all zeros, used during creates/deletes
@@ -29,12 +33,129 @@ class GerritChange(Change):
super(GerritChange, self).__init__(project)
self.approvals = []
+ def update(self, data, connection):
+ if data.format == data.SSH:
+ self.updateFromSSH(data.data, connection)
+ else:
+ self.updateFromHTTP(data.data, connection)
+
+ def updateFromSSH(self, data, connection):
+ if self.patchset is None:
+ self.patchset = str(data['currentPatchSet']['number'])
+ if 'project' not in data:
+ raise exceptions.ChangeNotFound(self.number, self.patchset)
+ self.project = connection.source.getProject(data['project'])
+ self.id = data['id']
+ self.branch = data['branch']
+ self.url = data['url']
+ urlparse = urllib.parse.urlparse(connection.baseurl)
+ baseurl = "%s%s" % (urlparse.netloc, urlparse.path)
+ baseurl = baseurl.rstrip('/')
+ self.uris = [
+ '%s/%s' % (baseurl, self.number),
+ '%s/#/c/%s' % (baseurl, self.number),
+ '%s/c/%s/+/%s' % (baseurl, self.project.name, self.number),
+ ]
+
+ max_ps = 0
+ files = []
+ for ps in data['patchSets']:
+ if str(ps['number']) == self.patchset:
+ self.ref = ps['ref']
+ self.commit = ps['revision']
+ for f in ps.get('files', []):
+ files.append(f['file'])
+ if int(ps['number']) > int(max_ps):
+ max_ps = str(ps['number'])
+ if max_ps == self.patchset:
+ self.is_current_patchset = True
+ else:
+ self.is_current_patchset = False
+ self.files = files
+
+ self.is_merged = data.get('status', '') == 'MERGED'
+ self.approvals = data['currentPatchSet'].get('approvals', [])
+ self.open = data['open']
+ self.status = data['status']
+ self.owner = data['owner']
+ self.message = data['commitMessage']
+
+ self.missing_labels = set()
+ for sr in data.get('submitRecords', []):
+ if sr['status'] == 'NOT_READY':
+ for label in sr['labels']:
+ if label['status'] in ['OK', 'MAY']:
+ continue
+ elif label['status'] in ['NEED', 'REJECT']:
+ self.missing_labels.add(label['label'])
+
+ def updateFromHTTP(self, data, connection):
+ urlparse = urllib.parse.urlparse(connection.baseurl)
+ baseurl = "%s%s" % (urlparse.netloc, urlparse.path)
+ baseurl = baseurl.rstrip('/')
+ current_revision = data['revisions'][data['current_revision']]
+ if self.patchset is None:
+ self.patchset = str(current_revision['_number'])
+ self.project = connection.source.getProject(data['project'])
+ self.id = data['change_id']
+ self.branch = data['branch']
+ self.url = '%s/%s' % (baseurl, self.number)
+ self.uris = [
+ '%s/%s' % (baseurl, self.number),
+ '%s/#/c/%s' % (baseurl, self.number),
+ '%s/c/%s/+/%s' % (baseurl, self.project.name, self.number),
+ ]
+
+ files = []
+ if str(current_revision['_number']) == self.patchset:
+ self.ref = current_revision['ref']
+ self.commit = data['current_revision']
+ files = current_revision.get('files', []).keys()
+ self.is_current_patchset = True
+ else:
+ self.is_current_patchset = False
+ self.files = files
+
+ self.is_merged = data['status'] == 'MERGED'
+ self.approvals = []
+ self.missing_labels = set()
+ for label_name, label_data in data.get('labels', {}).items():
+ for app in label_data.get('all', []):
+ if app.get('value', 0) == 0:
+ continue
+ by = {}
+ for k in ('name', 'username', 'email'):
+ if k in app:
+ by[k] = app[k]
+ self.approvals.append({
+ "type": label_name,
+ "description": label_name,
+ "value": app['value'],
+ "grantedOn":
+ dateutil.parser.parse(app['date']).timestamp(),
+ "by": by,
+ })
+ if label_data.get('optional', False):
+ continue
+ if label_data.get('blocking', False):
+ self.missing_labels.add(label_name)
+ continue
+ if 'approved' in label_data:
+ continue
+ self.missing_labels.add(label_name)
+ self.open = data['status'] == 'NEW'
+ self.status = data['status']
+ self.owner = data['owner']
+ self.message = current_revision['commit']['message']
+
class GerritTriggerEvent(TriggerEvent):
"""Incoming event from an external system."""
def __init__(self):
super(GerritTriggerEvent, self).__init__()
self.approvals = []
+ self.uuid = None
+ self.scheme = None
def __repr__(self):
ret = '<GerritTriggerEvent %s %s' % (self.type,
@@ -154,8 +275,8 @@ class GerritApprovalFilter(object):
class GerritEventFilter(EventFilter, GerritApprovalFilter):
def __init__(self, trigger, types=[], branches=[], refs=[],
event_approvals={}, comments=[], emails=[], usernames=[],
- required_approvals=[], reject_approvals=[],
- ignore_deletes=True):
+ required_approvals=[], reject_approvals=[], uuid=None,
+ scheme=None, ignore_deletes=True):
EventFilter.__init__(self, trigger)
@@ -176,6 +297,8 @@ class GerritEventFilter(EventFilter, GerritApprovalFilter):
self.emails = [re.compile(x) for x in emails]
self.usernames = [re.compile(x) for x in usernames]
self.event_approvals = event_approvals
+ self.uuid = uuid
+ self.scheme = scheme
self.ignore_deletes = ignore_deletes
def __repr__(self):
@@ -183,6 +306,10 @@ class GerritEventFilter(EventFilter, GerritApprovalFilter):
if self._types:
ret += ' types: %s' % ', '.join(self._types)
+ if self.uuid:
+ ret += ' uuid: %s' % (self.uuid,)
+ if self.scheme:
+ ret += ' scheme: %s' % (self.scheme,)
if self._branches:
ret += ' branches: %s' % ', '.join(self._branches)
if self._refs:
@@ -217,6 +344,12 @@ class GerritEventFilter(EventFilter, GerritApprovalFilter):
if self.types and not matches_type:
return False
+ if event.type == 'pending-check':
+ if self.uuid and event.uuid != self.uuid:
+ return False
+ if self.scheme and event.uuid.split(':')[0] != self.scheme:
+ return False
+
# branches are ORed
matches_branch = False
for branch in self.branches:
diff --git a/zuul/driver/gerrit/gerritreporter.py b/zuul/driver/gerrit/gerritreporter.py
index 90de89cc6..fa7968be9 100644
--- a/zuul/driver/gerrit/gerritreporter.py
+++ b/zuul/driver/gerrit/gerritreporter.py
@@ -61,7 +61,7 @@ class GerritReporter(BaseReporter):
item.change._ref_sha = item.change.project.source.getRefSha(
item.change.project, 'refs/heads/' + item.change.branch)
- return self.connection.review(item.change, message, self.config,
+ return self.connection.review(item, message, self.config,
comments, zuul_event_id=item.event)
def getSubmitAllowNeeds(self):
diff --git a/zuul/driver/gerrit/gerritsource.py b/zuul/driver/gerrit/gerritsource.py
index df33b443f..ad7fcd2fb 100644
--- a/zuul/driver/gerrit/gerritsource.py
+++ b/zuul/driver/gerrit/gerritsource.py
@@ -75,7 +75,7 @@ class GerritSource(BaseSource):
if not results:
return None
change = self.connection._getChange(
- results[0]['number'], results[0]['currentPatchSet']['number'])
+ results[0].number, results[0].current_patchset)
return change
def getChangesDependingOn(self, change, projects, tenant):
@@ -89,7 +89,7 @@ class GerritSource(BaseSource):
results = self.connection.simpleQuery(query)
seen = set()
for result in results:
- for match in find_dependency_headers(result['commitMessage']):
+ for match in find_dependency_headers(result.message):
found = False
for uri in change.uris:
if uri in match:
@@ -97,13 +97,12 @@ class GerritSource(BaseSource):
break
if not found:
continue
- key = (str(result['number']),
- str(result['currentPatchSet']['number']))
+ key = (result.number, result.current_patchset)
if key in seen:
continue
seen.add(key)
change = self.connection._getChange(
- result['number'], result['currentPatchSet']['number'])
+ result.number, result.current_patchset)
changes.append(change)
return changes
diff --git a/zuul/driver/gerrit/gerrittrigger.py b/zuul/driver/gerrit/gerrittrigger.py
index 67608ad81..88d198d32 100644
--- a/zuul/driver/gerrit/gerrittrigger.py
+++ b/zuul/driver/gerrit/gerrittrigger.py
@@ -56,6 +56,8 @@ class GerritTrigger(BaseTrigger):
reject_approvals=to_list(
trigger.get('reject-approval')
),
+ uuid=trigger.get('uuid'),
+ scheme=trigger.get('scheme'),
ignore_deletes=ignore_deletes
)
efilters.append(f)
@@ -80,7 +82,10 @@ def getSchema():
'change-restored',
'change-merged',
'comment-added',
- 'ref-updated')),
+ 'ref-updated',
+ 'pending-check')),
+ 'uuid': str,
+ 'scheme': str,
'comment_filter': scalar_or_list(str),
'comment': scalar_or_list(str),
'email_filter': scalar_or_list(str),
diff --git a/zuul/model.py b/zuul/model.py
index 82c8821d0..91c492980 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -4674,6 +4674,79 @@ class WebInfo(object):
return d
+class HoldRequest(object):
+ def __init__(self):
+ self.lock = None
+ self.stat = None
+ self.id = None
+ self.tenant = None
+ self.project = None
+ self.job = None
+ self.ref_filter = None
+ self.reason = None
+ self.node_expiration = None
+ # When max_count == current_count, hold request can no longer be used.
+ self.max_count = 1
+ self.current_count = 0
+
+ def __str__(self):
+ return "<HoldRequest %s: tenant=%s project=%s job=%s ref_filter=%s>" \
+ % (self.id, self.tenant, self.project, self.job, self.ref_filter)
+
+ @staticmethod
+ def fromDict(data):
+ '''
+ Return a new object from the given data dictionary.
+ '''
+ obj = HoldRequest()
+ obj.tenant = data.get('tenant')
+ obj.project = data.get('project')
+ obj.job = data.get('job')
+ obj.ref_filter = data.get('ref_filter')
+ obj.max_count = data.get('max_count')
+ obj.current_count = data.get('current_count')
+ obj.reason = data.get('reason')
+ obj.node_expiration = data.get('node_expiration')
+ return obj
+
+ def toDict(self):
+ '''
+ Return a dictionary representation of the object.
+ '''
+ d = dict()
+ d['id'] = self.id
+ d['tenant'] = self.tenant
+ d['project'] = self.project
+ d['job'] = self.job
+ d['ref_filter'] = self.ref_filter
+ d['max_count'] = self.max_count
+ d['current_count'] = self.current_count
+ d['reason'] = self.reason
+ d['node_expiration'] = self.node_expiration
+ return d
+
+ def updateFromDict(self, d):
+ '''
+ Update current object with data from the given dictionary.
+ '''
+ self.tenant = d.get('tenant')
+ self.project = d.get('project')
+ self.job = d.get('job')
+ self.ref_filter = d.get('ref_filter')
+ self.max_count = d.get('max_count', 1)
+ self.current_count = d.get('current_count', 0)
+ self.reason = d.get('reason')
+ self.node_expiration = d.get('node_expiration')
+
+ def serialize(self):
+ '''
+        Return a representation of the object as a UTF-8 encoded JSON byte string.
+
+ Used for storing the object data in ZooKeeper.
+ '''
+ return json.dumps(self.toDict()).encode('utf8')
+
+
# AuthZ models
class AuthZRule(object):
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index 64450e87d..703e7937f 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -168,38 +168,46 @@ class Nodepool(object):
except Exception:
log.exception("Unable to unlock node request %s", request)
- def holdNodeSet(self, nodeset, autohold_key):
+ def holdNodeSet(self, nodeset, request):
'''
Perform a hold on the given set of nodes.
:param NodeSet nodeset: The object containing the set of nodes to hold.
- :param set autohold_key: A set with the tenant/project/job names
- associated with the given NodeSet.
+ :param HoldRequest request: Hold request associated with the NodeSet
'''
self.log.info("Holding nodeset %s" % (nodeset,))
- (hold_iterations,
- reason,
- node_hold_expiration) = self.sched.autohold_requests[autohold_key]
nodes = nodeset.getNodes()
for node in nodes:
if node.lock is None:
raise Exception("Node %s is not locked" % (node,))
node.state = model.STATE_HOLD
- node.hold_job = " ".join(autohold_key)
- node.comment = reason
- if node_hold_expiration:
- node.hold_expiration = node_hold_expiration
+ node.hold_job = " ".join([request.tenant,
+ request.project,
+ request.job,
+ request.ref_filter])
+ node.comment = request.reason
+ if request.node_expiration:
+ node.hold_expiration = request.node_expiration
self.sched.zk.storeNode(node)
- # We remove the autohold when the number of nodes in hold
- # is equal to or greater than (run iteration count can be
- # altered) the number of nodes used in a single job run
- # times the number of run iterations requested.
- nodes_in_hold = self.sched.zk.heldNodeCount(autohold_key)
- if nodes_in_hold >= len(nodes) * hold_iterations:
- self.log.debug("Removing autohold for %s", autohold_key)
- del self.sched.autohold_requests[autohold_key]
+ request.current_count += 1
+
+ # Give ourselves a few seconds to try to obtain the lock rather than
+ # immediately give up.
+ self.sched.zk.lockHoldRequest(request, timeout=5)
+
+ try:
+ self.sched.zk.storeHoldRequest(request)
+ except Exception:
+ # If we fail to update the request count, we won't consider it
+ # a real autohold error by passing the exception up. It will
+ # just get used more than the original count specified.
+ self.log.exception("Unable to update hold request %s:", request)
+ finally:
+ # Although any exceptions thrown here are handled higher up in
+ # _doBuildCompletedEvent, we always want to try to unlock it.
+ self.sched.zk.unlockHoldRequest(request)
def useNodeSet(self, nodeset, build_set=None, event=None):
self.log.info("Setting nodeset %s in use" % (nodeset,))
diff --git a/zuul/rpcclient.py b/zuul/rpcclient.py
index 9d1b877dc..45417a805 100644
--- a/zuul/rpcclient.py
+++ b/zuul/rpcclient.py
@@ -62,6 +62,10 @@ class RPCClient(object):
'node_hold_expiration': node_hold_expiration}
return not self.submitJob('zuul:autohold', data).failure
+ def autohold_delete(self, request_id):
+ data = {'request_id': request_id}
+ return not self.submitJob('zuul:autohold_delete', data).failure
+
# todo allow filtering per tenant, like in the REST API
def autohold_list(self, *args, **kwargs):
data = {}
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
index 2e1441323..eecf559e3 100644
--- a/zuul/rpclistener.py
+++ b/zuul/rpclistener.py
@@ -34,6 +34,7 @@ class RPCListener(object):
self.jobs = {}
functions = [
'autohold',
+ 'autohold_delete',
'autohold_list',
'allowed_labels_get',
'dequeue',
@@ -92,16 +93,19 @@ class RPCListener(object):
return
job.sendWorkComplete()
- def handle_autohold_list(self, job):
- req = {}
-
- # The json.dumps() call cannot handle dict keys that are not strings
- # so we convert our key to a CSV string that the caller can parse.
- for key, value in self.sched.autohold_requests.items():
- new_key = ','.join(key)
- req[new_key] = value
+ def handle_autohold_delete(self, job):
+ args = json.loads(job.arguments)
+ request_id = args['request_id']
+ try:
+ self.sched.autohold_delete(request_id)
+ except Exception as e:
+ job.sendWorkException(str(e).encode('utf8'))
+ return
+ job.sendWorkComplete()
- job.sendWorkComplete(json.dumps(req))
+ def handle_autohold_list(self, job):
+ data = self.sched.autohold_list()
+ job.sendWorkComplete(json.dumps(data))
def handle_autohold(self, job):
args = json.loads(job.arguments)
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index e5d4da5da..ee89c08bb 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -40,7 +40,7 @@ from zuul.lib.logutil import get_annotated_logger
from zuul.lib.statsd import get_statsd
import zuul.lib.queue
import zuul.lib.repl
-from zuul.model import Build
+from zuul.model import Build, HoldRequest
COMMANDS = ['full-reconfigure', 'stop', 'repl', 'norepl']
@@ -326,7 +326,6 @@ class Scheduler(threading.Thread):
self.zuul_version = zuul_version.release_string
self.last_reconfigured = None
self.tenant_last_reconfigured = {}
- self.autohold_requests = {}
self.use_relative_priority = False
if self.config.has_option('scheduler', 'relative_priority'):
if self.config.getboolean('scheduler', 'relative_priority'):
@@ -555,12 +554,57 @@ class Scheduler(threading.Thread):
def autohold(self, tenant_name, project_name, job_name, ref_filter,
reason, count, node_hold_expiration):
key = (tenant_name, project_name, job_name, ref_filter)
- if count == 0 and key in self.autohold_requests:
- self.log.debug("Removing autohold for %s", key)
- del self.autohold_requests[key]
- else:
- self.log.debug("Autohold requested for %s", key)
- self.autohold_requests[key] = (count, reason, node_hold_expiration)
+ self.log.debug("Autohold requested for %s", key)
+
+ request = HoldRequest()
+ request.tenant = tenant_name
+ request.project = project_name
+ request.job = job_name
+ request.ref_filter = ref_filter
+ request.reason = reason
+ request.max_count = count
+ request.node_expiration = node_hold_expiration
+
+ # No need to lock it since we are creating a new one.
+ self.zk.storeHoldRequest(request)
+
+ def autohold_list(self):
+ '''
+ Return current hold requests as a list of dicts.
+ '''
+ data = []
+ for request_id in self.zk.getHoldRequests():
+ request = self.zk.getHoldRequest(request_id)
+ if not request:
+ continue
+ data.append(request.toDict())
+ return data
+
+ def autohold_delete(self, hold_request_id):
+ '''
+ Delete an autohold request.
+
+ :param str hold_request_id: The unique ID of the request to delete.
+ '''
+ try:
+ hold_request = self.zk.getHoldRequest(hold_request_id)
+ except Exception:
+ self.log.exception(
+ "Error retrieving autohold ID %s:", hold_request_id)
+
+ if not hold_request:
+ self.log.info("Ignored request to remove invalid autohold ID %s",
+ hold_request_id)
+ return
+
+ # (TODO): Release any nodes held for this request here
+
+ self.log.debug("Removing autohold %s", hold_request)
+ try:
+ self.zk.deleteHoldRequest(hold_request)
+ except Exception:
+ self.log.exception(
+ "Error removing autohold request %s:", hold_request)
def promote(self, tenant_name, pipeline_name, change_ids):
event = PromoteEvent(tenant_name, pipeline_name, change_ids)
@@ -1232,7 +1276,7 @@ class Scheduler(threading.Thread):
return
pipeline.manager.onBuildPaused(event.build)
- def _getAutoholdRequestKey(self, build):
+ def _getAutoholdRequest(self, build):
change = build.build_set.item.change
autohold_key_base = (build.pipeline.tenant.name,
@@ -1254,16 +1298,6 @@ class Scheduler(threading.Thread):
CHANGE = 2
REF = 3
- def autohold_key_base_issubset(base, request_key):
- """check whether the given key is a subset of the build key"""
- index = 0
- base_len = len(base)
- while index < base_len:
- if base[index] != request_key[index]:
- return False
- index += 1
- return True
-
# Do a partial match of the autohold key against all autohold
# requests, ignoring the last element of the key (ref filter),
# and finally do a regex match between ref filter from
@@ -1271,13 +1305,23 @@ class Scheduler(threading.Thread):
# if it matches. Lastly, make sure that we match the most
# specific autohold request by comparing "scopes"
# of requests - the most specific is selected.
- autohold_key = None
+ autohold = None
scope = Scope.NONE
self.log.debug("Checking build autohold key %s", autohold_key_base)
- for request in self.autohold_requests:
- ref_filter = request[-1]
- if not autohold_key_base_issubset(autohold_key_base, request) \
- or not re.match(ref_filter, change.ref):
+ for request_id in self.zk.getHoldRequests():
+ request = self.zk.getHoldRequest(request_id)
+ if not request:
+ continue
+ ref_filter = request.ref_filter
+
+ if request.current_count >= request.max_count:
+ # This request has been used the max number of times
+ continue
+ elif not (request.tenant == autohold_key_base[0] and
+ request.project == autohold_key_base[1] and
+ request.job == autohold_key_base[2]):
+ continue
+ elif not re.match(ref_filter, change.ref):
continue
if ref_filter == ".*":
@@ -1291,9 +1335,9 @@ class Scheduler(threading.Thread):
autohold_key_base, candidate_scope)
if candidate_scope > scope:
scope = candidate_scope
- autohold_key = request
+ autohold = request
- return autohold_key
+ return autohold
def _processAutohold(self, build):
# We explicitly only want to hold nodes for jobs if they have
@@ -1302,10 +1346,10 @@ class Scheduler(threading.Thread):
if build.result not in hold_list:
return
- autohold_key = self._getAutoholdRequestKey(build)
- self.log.debug("Got autohold key %s", autohold_key)
- if autohold_key is not None:
- self.nodepool.holdNodeSet(build.nodeset, autohold_key)
+ request = self._getAutoholdRequest(build)
+ self.log.debug("Got autohold %s", request)
+ if request is not None:
+ self.nodepool.holdNodeSet(build.nodeset, request)
def _doBuildCompletedEvent(self, event):
build = event.build
diff --git a/zuul/web/__init__.py b/zuul/web/__init__.py
index 4f6ce1384..4c5447365 100755
--- a/zuul/web/__init__.py
+++ b/zuul/web/__init__.py
@@ -417,19 +417,19 @@ class ZuulWebAPI(object):
else:
payload = json.loads(job.data[0])
result = []
- for key in payload:
- _tenant, _project, job, ref_filter = key.split(',')
- count, reason, hold_expiration = payload[key]
- if tenant == _tenant:
- if project is None or _project.endswith(project):
+ for request in payload:
+ if tenant == request['tenant']:
+ if (project is None or
+ request['project'].endswith(project)):
result.append(
- {'tenant': _tenant,
- 'project': _project,
- 'job': job,
- 'ref_filter': ref_filter,
- 'count': count,
- 'reason': reason,
- 'node_hold_expiration': hold_expiration})
+ {'tenant': request['tenant'],
+ 'project': request['project'],
+ 'job': request['job'],
+ 'ref_filter': request['ref_filter'],
+ 'count': request['max_count'],
+ 'reason': request['reason'],
+ 'node_hold_expiration': request['node_expiration']
+ })
return result
@cherrypy.expose
@@ -984,7 +984,7 @@ class ZuulWeb(object):
# instanciate handlers
self.rpc = zuul.rpcclient.RPCClient(gear_server, gear_port,
ssl_key, ssl_cert, ssl_ca)
- self.zk = zuul.zk.ZooKeeper()
+ self.zk = zuul.zk.ZooKeeper(enable_cache=True)
if zk_hosts:
self.zk.connect(hosts=zk_hosts, read_only=True)
self.connections = connections
diff --git a/zuul/zk.py b/zuul/zk.py
index 433fc983f..e3691ce95 100644
--- a/zuul/zk.py
+++ b/zuul/zk.py
@@ -17,6 +17,7 @@ import time
from kazoo.client import KazooClient, KazooState
from kazoo import exceptions as kze
from kazoo.handlers.threading import KazooTimeoutError
+from kazoo.recipe.cache import TreeCache, TreeEvent
from kazoo.recipe.lock import Lock
import zuul.model
@@ -43,17 +44,31 @@ class ZooKeeper(object):
REQUEST_ROOT = '/nodepool/requests'
REQUEST_LOCK_ROOT = "/nodepool/requests-lock"
NODE_ROOT = '/nodepool/nodes'
+ HOLD_REQUEST_ROOT = '/zuul/hold-requests'
# Log zookeeper retry every 10 seconds
retry_log_rate = 10
- def __init__(self):
+ def __init__(self, enable_cache=True):
'''
Initialize the ZooKeeper object.
+
+ :param bool enable_cache: When True, enables caching of ZooKeeper
+ objects (e.g., HoldRequests).
'''
self.client = None
self._became_lost = False
self._last_retry_log = 0
+ self.enable_cache = enable_cache
+
+ # The caching model we use is designed around handing out model
+ # data as objects. To do this, we use two caches: one is a TreeCache
+ # which contains raw znode data (among other details), and one for
+ # storing that data serialized as objects. This allows us to return
+ # objects from the APIs, and avoids calling the methods to serialize
+ # the data into objects more than once.
+ self._hold_request_tree = None
+ self._cached_hold_requests = {}
def _dictToStr(self, data):
return json.dumps(data).encode('utf8')
@@ -125,6 +140,67 @@ class ZooKeeper(object):
except KazooTimeoutError:
self.logConnectionRetryEvent()
+ if self.enable_cache:
+ self._hold_request_tree = TreeCache(self.client,
+ self.HOLD_REQUEST_ROOT)
+ self._hold_request_tree.listen_fault(self.cacheFaultListener)
+ self._hold_request_tree.listen(self.holdRequestCacheListener)
+ self._hold_request_tree.start()
+
+ def cacheFaultListener(self, e):
+ self.log.exception(e)
+
+ def holdRequestCacheListener(self, event):
+ '''
+ Keep the hold request object cache in sync with the TreeCache.
+ '''
+ try:
+ self._holdRequestCacheListener(event)
+ except Exception:
+ self.log.exception(
+ "Exception in hold request cache update for event: %s", event)
+
+ def _holdRequestCacheListener(self, event):
+ if hasattr(event.event_data, 'path'):
+ # Ignore root node
+ path = event.event_data.path
+ if path == self.HOLD_REQUEST_ROOT:
+ return
+
+ if event.event_type not in (TreeEvent.NODE_ADDED,
+ TreeEvent.NODE_UPDATED,
+ TreeEvent.NODE_REMOVED):
+ return
+
+ path = event.event_data.path
+ request_id = path.rsplit('/', 1)[1]
+
+ if event.event_type in (TreeEvent.NODE_ADDED, TreeEvent.NODE_UPDATED):
+ # Requests with no data are invalid
+ if not event.event_data.data:
+ return
+
+ # Perform an in-place update of the already cached request
+ d = self._bytesToDict(event.event_data.data)
+ old_request = self._cached_hold_requests.get(request_id)
+ if old_request:
+ if event.event_data.stat.version <= old_request.stat.version:
+ # Don't update to older data
+ return
+ old_request.updateFromDict(d)
+ old_request.stat = event.event_data.stat
+ else:
+ request = zuul.model.HoldRequest.fromDict(d)
+ request.id = request_id
+ request.stat = event.event_data.stat
+ self._cached_hold_requests[request_id] = request
+
+ elif event.event_type == TreeEvent.NODE_REMOVED:
+ try:
+ del self._cached_hold_requests[request_id]
+ except KeyError:
+ pass
+
def disconnect(self):
'''
Close the ZooKeeper cluster connection.
@@ -132,6 +208,10 @@ class ZooKeeper(object):
You should call this method if you used connect() to establish a
cluster connection.
'''
+ if self._hold_request_tree is not None:
+ self._hold_request_tree.close()
+ self._hold_request_tree = None
+
if self.client is not None and self.client.connected:
self.client.stop()
self.client.close()
@@ -459,6 +539,106 @@ class ZooKeeper(object):
if node:
yield node
+ def getHoldRequests(self):
+ '''
+ Get the current list of all hold requests.
+ '''
+ try:
+ return self.client.get_children(self.HOLD_REQUEST_ROOT)
+ except kze.NoNodeError:
+ return []
+
+ def getHoldRequest(self, hold_request_id):
+ path = self.HOLD_REQUEST_ROOT + "/" + hold_request_id
+ try:
+ data, stat = self.client.get(path)
+ except kze.NoNodeError:
+ return None
+ if not data:
+ return None
+
+ obj = zuul.model.HoldRequest.fromDict(self._strToDict(data))
+ obj.id = hold_request_id
+ obj.stat = stat
+ return obj
+
+ def storeHoldRequest(self, hold_request):
+ '''
+ Create or update a hold request.
+
+ If this is a new request with no value for the `id` attribute of the
+ passed in request, then `id` will be set with the unique request
+ identifier after successful creation.
+
+ :param HoldRequest hold_request: Object representing the hold request.
+ '''
+ if hold_request.id is None:
+ path = self.client.create(
+ self.HOLD_REQUEST_ROOT + "/",
+ value=hold_request.serialize(),
+ sequence=True,
+ makepath=True)
+ hold_request.id = path.split('/')[-1]
+ else:
+ path = self.HOLD_REQUEST_ROOT + "/" + hold_request.id
+ self.client.set(path, hold_request.serialize())
+
+ def deleteHoldRequest(self, hold_request):
+ '''
+ Delete a hold request.
+
+ :param HoldRequest hold_request: Object representing the hold request.
+ '''
+ path = self.HOLD_REQUEST_ROOT + "/" + hold_request.id
+ try:
+ self.client.delete(path, recursive=True)
+ except kze.NoNodeError:
+ pass
+
+ def lockHoldRequest(self, request, blocking=True, timeout=None):
+ '''
+    Lock a hold request.
+
+ This will set the `lock` attribute of the request object when the
+ lock is successfully acquired.
+
+ :param HoldRequest request: The hold request to lock.
+ '''
+ if not request.id:
+ raise LockException(
+ "Hold request without an ID cannot be locked: %s" % request)
+
+ path = "%s/%s/lock" % (self.HOLD_REQUEST_ROOT, request.id)
+ try:
+ lock = Lock(self.client, path)
+ have_lock = lock.acquire(blocking, timeout)
+ except kze.LockTimeout:
+ raise LockException(
+ "Timeout trying to acquire lock %s" % path)
+
+ # If we aren't blocking, it's possible we didn't get the lock
+ # because someone else has it.
+ if not have_lock:
+ raise LockException("Did not get lock on %s" % path)
+
+ request.lock = lock
+
+ def unlockHoldRequest(self, request):
+ '''
+ Unlock a hold request.
+
+ The request must already have been locked.
+
+ :param HoldRequest request: The request to unlock.
+
+    :raises: LockException if the request is not currently locked.
+ '''
+ if request.lock is None:
+ raise LockException(
+ "Request %s does not hold a lock" % request)
+ request.lock.release()
+ request.lock = None
+
class Launcher():
'''