summaryrefslogtreecommitdiff
path: root/zuul
diff options
context:
space:
mode:
Diffstat (limited to 'zuul')
-rw-r--r--zuul/ansible/base/callback/zuul_stream.py5
-rwxr-xr-xzuul/cmd/client.py44
-rw-r--r--zuul/configloader.py112
-rw-r--r--zuul/driver/elasticsearch/reporter.py4
-rw-r--r--zuul/driver/gerrit/gerritconnection.py52
-rw-r--r--zuul/driver/gerrit/gerritmodel.py4
-rw-r--r--zuul/driver/gerrit/gerritsource.py11
-rw-r--r--zuul/driver/github/githubconnection.py22
-rw-r--r--zuul/driver/github/githubreporter.py13
-rw-r--r--zuul/driver/gitlab/gitlabconnection.py51
-rw-r--r--zuul/driver/mqtt/mqttconnection.py15
-rw-r--r--zuul/driver/sql/sqlconnection.py39
-rw-r--r--zuul/executor/server.py5
-rw-r--r--zuul/manager/__init__.py92
-rw-r--r--zuul/merger/merger.py28
-rw-r--r--zuul/model.py129
-rw-r--r--zuul/model_api.py2
-rw-r--r--zuul/scheduler.py19
-rw-r--r--zuul/zk/job_request_queue.py2
19 files changed, 455 insertions, 194 deletions
diff --git a/zuul/ansible/base/callback/zuul_stream.py b/zuul/ansible/base/callback/zuul_stream.py
index b5c14691b..3f886c797 100644
--- a/zuul/ansible/base/callback/zuul_stream.py
+++ b/zuul/ansible/base/callback/zuul_stream.py
@@ -44,6 +44,7 @@ import time
from ansible.plugins.callback import default
from ansible.module_utils._text import to_text
+from ansible.module_utils.parsing.convert_bool import boolean
from zuul.ansible import paths
from zuul.ansible import logconfig
@@ -333,6 +334,10 @@ class CallbackModule(default.CallbackModule):
if (ip in ('localhost', '127.0.0.1')):
# Don't try to stream from localhost
continue
+ if boolean(play_vars[host].get(
+ 'zuul_console_disabled', False)):
+ # The user has told us not to even try
+ continue
if play_vars[host].get('ansible_connection') in ('winrm',):
# The winrm connections don't support streaming for now
continue
diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py
index 031b10a1e..6fa20c7c4 100755
--- a/zuul/cmd/client.py
+++ b/zuul/cmd/client.py
@@ -30,16 +30,14 @@ import time
import textwrap
import requests
import urllib.parse
-from uuid import uuid4
import zuul.cmd
from zuul.lib.config import get_default
-from zuul.model import SystemAttributes, PipelineState
+from zuul.model import SystemAttributes, PipelineState, PipelineChangeList
from zuul.zk import ZooKeeperClient
from zuul.lib.keystorage import KeyStorage
-from zuul.zk.locks import tenant_write_lock
+from zuul.zk.locks import tenant_read_lock, pipeline_lock
from zuul.zk.zkobject import ZKContext
-from zuul.zk.layout import LayoutState, LayoutStateStore
from zuul.zk.components import COMPONENT_REGISTRY
@@ -542,6 +540,10 @@ class Client(zuul.cmd.ZuulApp):
cmd_prune_database.add_argument(
'--older-than',
help='relative time (e.g., "24h" or "180d")')
+ cmd_prune_database.add_argument(
+ '--batch-size',
+ default=10000,
+ help='transaction batch size')
cmd_prune_database.set_defaults(func=self.prune_database)
return parser
@@ -1029,28 +1031,18 @@ class Client(zuul.cmd.ZuulApp):
safe_tenant = urllib.parse.quote_plus(args.tenant)
safe_pipeline = urllib.parse.quote_plus(args.pipeline)
COMPONENT_REGISTRY.create(zk_client)
- with tenant_write_lock(zk_client, args.tenant) as lock:
+ self.log.info('get tenant')
+ with tenant_read_lock(zk_client, args.tenant):
path = f'/zuul/tenant/{safe_tenant}/pipeline/{safe_pipeline}'
- layout_uuid = None
- zk_client.client.delete(
- f'/zuul/tenant/{safe_tenant}/pipeline/{safe_pipeline}',
- recursive=True)
- with ZKContext(zk_client, lock, None, self.log) as context:
- ps = PipelineState.new(context, _path=path,
- layout_uuid=layout_uuid)
- # Force everyone to make a new layout for this tenant in
- # order to rebuild the shared change queues.
- layout_state = LayoutState(
- tenant_name=args.tenant,
- hostname='admin command',
- last_reconfigured=int(time.time()),
- last_reconfigure_event_ltime=-1,
- uuid=uuid4().hex,
- branch_cache_min_ltimes={},
- ltime=ps._zstat.last_modified_transaction_id,
- )
- tenant_layout_state = LayoutStateStore(zk_client, lambda: None)
- tenant_layout_state[args.tenant] = layout_state
+ self.log.info('get pipe')
+ with pipeline_lock(
+ zk_client, args.tenant, args.pipeline
+ ) as plock:
+ self.log.info('got locks')
+ zk_client.client.delete(path, recursive=True)
+ with ZKContext(zk_client, plock, None, self.log) as context:
+ PipelineState.new(context, _path=path, layout_uuid=None)
+ PipelineChangeList.new(context)
sys.exit(0)
@@ -1061,7 +1053,7 @@ class Client(zuul.cmd.ZuulApp):
cutoff = parse_cutoff(now, args.before, args.older_than)
self.configure_connections(source_only=False, require_sql=True)
connection = self.connections.getSqlConnection()
- connection.deleteBuildsets(cutoff)
+ connection.deleteBuildsets(cutoff, args.batch_size)
sys.exit(0)
diff --git a/zuul/configloader.py b/zuul/configloader.py
index fe22fe0f8..4f472cb4e 100644
--- a/zuul/configloader.py
+++ b/zuul/configloader.py
@@ -2151,21 +2151,26 @@ class TenantParser(object):
for future in futures:
future.result()
- try:
- self._processCatJobs(abide, tenant, loading_errors, jobs,
- min_ltimes)
- except Exception:
- self.log.exception("Error processing cat jobs, canceling")
- for job in jobs:
+ for i, job in enumerate(jobs, start=1):
+ try:
try:
- self.log.debug("Canceling cat job %s", job)
+ self._processCatJob(abide, tenant, loading_errors, job,
+ min_ltimes)
+ except TimeoutError:
self.merger.cancel(job)
- except Exception:
- self.log.exception("Unable to cancel job %s", job)
- if not ignore_cat_exception:
- raise
- if not ignore_cat_exception:
- raise
+ raise
+ except Exception:
+ self.log.exception("Error processing cat job")
+ if not ignore_cat_exception:
+ # Cancel remaining jobs
+ for cancel_job in jobs[i:]:
+ self.log.debug("Canceling cat job %s", cancel_job)
+ try:
+ self.merger.cancel(cancel_job)
+ except Exception:
+ self.log.exception(
+ "Unable to cancel job %s", cancel_job)
+ raise
def _cacheTenantYAMLBranch(self, abide, tenant, loading_errors, min_ltimes,
tpc, project, branch, jobs):
@@ -2234,49 +2239,48 @@ class TenantParser(object):
job.source_context = source_context
jobs.append(job)
- def _processCatJobs(self, abide, tenant, loading_errors, jobs, min_ltimes):
+ def _processCatJob(self, abide, tenant, loading_errors, job, min_ltimes):
# Called at the end of _cacheTenantYAML after all cat jobs
# have been submitted
- for job in jobs:
- self.log.debug("Waiting for cat job %s" % (job,))
- res = job.wait(self.merger.git_timeout)
- if not res:
- # We timed out
- raise Exception("Cat job %s timed out; consider setting "
- "merger.git_timeout in zuul.conf" % (job,))
- if not job.updated:
- raise Exception("Cat job %s failed" % (job,))
- self.log.debug("Cat job %s got files %s" %
- (job, job.files.keys()))
-
- self._updateUnparsedBranchCache(abide, tenant, job.source_context,
- job.files, loading_errors,
- job.ltime, min_ltimes)
-
- # Save all config files in Zookeeper (not just for the current tpc)
- files_cache = self.unparsed_config_cache.getFilesCache(
- job.source_context.project_canonical_name,
- job.source_context.branch)
- with self.unparsed_config_cache.writeLock(
- job.source_context.project_canonical_name):
- # Prevent files cache ltime from going backward
- if files_cache.ltime >= job.ltime:
- self.log.info(
- "Discarding job %s result since the files cache was "
- "updated in the meantime", job)
- continue
- # Since the cat job returns all required config files
- # for ALL tenants the project is a part of, we can
- # clear the whole cache and then populate it with the
- # updated content.
- files_cache.clear()
- for fn, content in job.files.items():
- # Cache file in Zookeeper
- if content is not None:
- files_cache[fn] = content
- files_cache.setValidFor(job.extra_config_files,
- job.extra_config_dirs,
- job.ltime)
+ self.log.debug("Waiting for cat job %s" % (job,))
+ res = job.wait(self.merger.git_timeout)
+ if not res:
+ # We timed out
+ raise TimeoutError(f"Cat job {job} timed out; consider setting "
+ "merger.git_timeout in zuul.conf")
+ if not job.updated:
+ raise Exception("Cat job %s failed" % (job,))
+ self.log.debug("Cat job %s got files %s" %
+ (job, job.files.keys()))
+
+ self._updateUnparsedBranchCache(abide, tenant, job.source_context,
+ job.files, loading_errors,
+ job.ltime, min_ltimes)
+
+ # Save all config files in Zookeeper (not just for the current tpc)
+ files_cache = self.unparsed_config_cache.getFilesCache(
+ job.source_context.project_canonical_name,
+ job.source_context.branch)
+ with self.unparsed_config_cache.writeLock(
+ job.source_context.project_canonical_name):
+ # Prevent files cache ltime from going backward
+ if files_cache.ltime >= job.ltime:
+ self.log.info(
+ "Discarding job %s result since the files cache was "
+ "updated in the meantime", job)
+ return
+ # Since the cat job returns all required config files
+ # for ALL tenants the project is a part of, we can
+ # clear the whole cache and then populate it with the
+ # updated content.
+ files_cache.clear()
+ for fn, content in job.files.items():
+ # Cache file in Zookeeper
+ if content is not None:
+ files_cache[fn] = content
+ files_cache.setValidFor(job.extra_config_files,
+ job.extra_config_dirs,
+ job.ltime)
def _updateUnparsedBranchCache(self, abide, tenant, source_context, files,
loading_errors, ltime, min_ltimes):
diff --git a/zuul/driver/elasticsearch/reporter.py b/zuul/driver/elasticsearch/reporter.py
index 7802cb609..e5e90e052 100644
--- a/zuul/driver/elasticsearch/reporter.py
+++ b/zuul/driver/elasticsearch/reporter.py
@@ -103,7 +103,9 @@ class ElasticsearchReporter(BaseReporter):
build_doc['job_vars'] = job.variables
if self.index_returned_vars:
- build_doc['job_returned_vars'] = build.result_data
+ rdata = build.result_data.copy()
+ rdata.pop('zuul', None)
+ build_doc['job_returned_vars'] = rdata
docs.append(build_doc)
diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py
index 276365e1d..990b7b235 100644
--- a/zuul/driver/gerrit/gerritconnection.py
+++ b/zuul/driver/gerrit/gerritconnection.py
@@ -161,6 +161,8 @@ class GerritEventConnector(threading.Thread):
IGNORED_EVENTS = (
'cache-eviction', # evict-cache plugin
+ 'fetch-ref-replicated',
+ 'fetch-ref-replication-scheduled',
'ref-replicated',
'ref-replication-scheduled',
'ref-replication-done'
@@ -1180,9 +1182,34 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
return True
if change.wip:
return False
- if change.missing_labels <= set(allow_needs):
- return True
- return False
+ if change.missing_labels > set(allow_needs):
+ self.log.debug("Unable to merge due to "
+ "missing labels: %s", change.missing_labels)
+ return False
+ for sr in change.submit_requirements:
+ if sr.get('status') == 'UNSATISFIED':
+ # Otherwise, we don't care and should skip.
+
+ # We're going to look at each unsatisfied submit
+ # requirement, and if one of the involved labels is an
+ # "allow_needs" label, we will assume that Zuul may be
+ # able to take an action which can cause the
+ # requirement to be satisfied, and we will ignore it.
+ # Otherwise, it is likely a requirement that Zuul can
+ # not alter in which case the requirement should stand
+ # and block merging.
+ result = sr.get("submittability_expression_result", {})
+ expression = result.get("expression", '')
+ expr_contains_allow = False
+ for allow in allow_needs:
+ if f'label:{allow}' in expression:
+ expr_contains_allow = True
+ break
+ if not expr_contains_allow:
+ self.log.debug("Unable to merge due to "
+ "submit requirement: %s", sr)
+ return False
+ return True
def getProjectOpenChanges(self, project: Project) -> List[GerritChange]:
# This is a best-effort function in case Gerrit is unable to return
@@ -1441,13 +1468,22 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
return data
def queryChangeHTTP(self, number, event=None):
- data = self.get('changes/%s?o=DETAILED_ACCOUNTS&o=CURRENT_REVISION&'
- 'o=CURRENT_COMMIT&o=CURRENT_FILES&o=LABELS&'
- 'o=DETAILED_LABELS' % (number,))
+ query = ('changes/%s?o=DETAILED_ACCOUNTS&o=CURRENT_REVISION&'
+ 'o=CURRENT_COMMIT&o=CURRENT_FILES&o=LABELS&'
+ 'o=DETAILED_LABELS' % (number,))
+ if self.version >= (3, 5, 0):
+ query += '&o=SUBMIT_REQUIREMENTS'
+ data = self.get(query)
related = self.get('changes/%s/revisions/%s/related' % (
number, data['current_revision']))
- files = self.get('changes/%s/revisions/%s/files?parent=1' % (
- number, data['current_revision']))
+
+ files_query = 'changes/%s/revisions/%s/files' % (
+ number, data['current_revision'])
+
+ if data['revisions'][data['current_revision']]['commit']['parents']:
+ files_query += '?parent=1'
+
+ files = self.get(files_query)
return data, related, files
def queryChange(self, number, event=None):
diff --git a/zuul/driver/gerrit/gerritmodel.py b/zuul/driver/gerrit/gerritmodel.py
index 0ac3e7f9d..4ac291f2b 100644
--- a/zuul/driver/gerrit/gerritmodel.py
+++ b/zuul/driver/gerrit/gerritmodel.py
@@ -34,6 +34,7 @@ class GerritChange(Change):
self.wip = None
self.approvals = []
self.missing_labels = set()
+ self.submit_requirements = []
self.commit = None
self.zuul_query_ltime = None
@@ -52,6 +53,7 @@ class GerritChange(Change):
"wip": self.wip,
"approvals": self.approvals,
"missing_labels": list(self.missing_labels),
+ "submit_requirements": self.submit_requirements,
"commit": self.commit,
"zuul_query_ltime": self.zuul_query_ltime,
})
@@ -64,6 +66,7 @@ class GerritChange(Change):
self.wip = data["wip"]
self.approvals = data["approvals"]
self.missing_labels = set(data["missing_labels"])
+ self.submit_requirements = data.get("submit_requirements", [])
self.commit = data.get("commit")
self.zuul_query_ltime = data.get("zuul_query_ltime")
@@ -189,6 +192,7 @@ class GerritChange(Change):
if 'approved' in label_data:
continue
self.missing_labels.add(label_name)
+ self.submit_requirements = data.get('submit_requirements', [])
self.open = data['status'] == 'NEW'
self.status = data['status']
self.wip = data.get('work_in_progress', False)
diff --git a/zuul/driver/gerrit/gerritsource.py b/zuul/driver/gerrit/gerritsource.py
index f42e93254..b0bd3c448 100644
--- a/zuul/driver/gerrit/gerritsource.py
+++ b/zuul/driver/gerrit/gerritsource.py
@@ -164,11 +164,14 @@ class GerritSource(BaseSource):
change = self.connection._getChange(change_key)
changes[change_key] = change
- for change in changes.values():
- for git_key in change.git_needs_changes:
- if git_key in changes:
+ # Convert to list here because the recursive call can mutate
+ # the set.
+ for change in list(changes.values()):
+ for git_change_ref in change.git_needs_changes:
+ change_key = ChangeKey.fromReference(git_change_ref)
+ if change_key in changes:
continue
- git_change = self.getChange(git_key)
+ git_change = self.getChange(change_key)
if not git_change.topic or git_change.topic == topic:
continue
self.getChangesByTopic(git_change.topic, changes)
diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py
index 182c83bae..a1353cb4d 100644
--- a/zuul/driver/github/githubconnection.py
+++ b/zuul/driver/github/githubconnection.py
@@ -81,6 +81,10 @@ ANNOTATION_LEVELS = {
"warning": "warning",
"error": "failure",
}
+# The maximum size for the 'message' field is 64 KB. Since it's unclear
+# from the Github docs if the unit is KiB or KB we'll use KB to be on
+# the safe side.
+ANNOTATION_MAX_MESSAGE_SIZE = 64 * 1000
EventTuple = collections.namedtuple(
"EventTuple", [
@@ -403,7 +407,8 @@ class GithubEventProcessor(object):
# Returns empty on unhandled events
return
- self.log.debug("Handling %s event", self.event_type)
+ self.log.debug("Handling %s event with installation id %s",
+ self.event_type, installation_id)
events = []
try:
events = method()
@@ -439,7 +444,11 @@ class GithubEventProcessor(object):
# branch is now protected.
if hasattr(event, "branch") and event.branch:
protected = None
- if change:
+ # Only use the `branch_protected` flag if the
+ # target branch of change and event are the same.
+ # The base branch could have changed in the
+ # meantime.
+ if change and change.branch == event.branch:
# PR based events already have the information if the
# target branch is protected so take the information
# from there.
@@ -675,6 +684,11 @@ class GithubEventProcessor(object):
branch, project_name)
events.append(
self._branch_protection_rule_to_event(project_name, branch))
+
+ for event in events:
+ # Make sure every event has a branch cache ltime
+ self.connection.clearConnectionCacheOnBranchEvent(event)
+
return events
def _branch_protection_rule_to_event(self, project_name, branch):
@@ -2428,7 +2442,9 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
raw_annotation = {
"path": fn,
"annotation_level": annotation_level,
- "message": comment["message"],
+ "message": comment["message"].encode(
+ "utf8")[:ANNOTATION_MAX_MESSAGE_SIZE].decode(
+ "utf8", "ignore"),
"start_line": start_line,
"end_line": end_line,
"start_column": start_column,
diff --git a/zuul/driver/github/githubreporter.py b/zuul/driver/github/githubreporter.py
index 1f44303bd..396516038 100644
--- a/zuul/driver/github/githubreporter.py
+++ b/zuul/driver/github/githubreporter.py
@@ -193,13 +193,13 @@ class GithubReporter(BaseReporter):
self.log.warning('Merge mode %s not supported by Github', mode)
raise MergeFailure('Merge mode %s not supported by Github' % mode)
- merge_mode = self.merge_modes[merge_mode]
project = item.change.project.name
pr_number = item.change.number
sha = item.change.patchset
log.debug('Reporting change %s, params %s, merging via API',
item.change, self.config)
- message = self._formatMergeMessage(item.change)
+ message = self._formatMergeMessage(item.change, merge_mode)
+ merge_mode = self.merge_modes[merge_mode]
for i in [1, 2]:
try:
@@ -319,10 +319,13 @@ class GithubReporter(BaseReporter):
self.connection.unlabelPull(project, pr_number, label,
zuul_event_id=item.event)
- def _formatMergeMessage(self, change):
+ def _formatMergeMessage(self, change, merge_mode):
message = []
- if change.title:
- message.append(change.title)
+ # For squash merges we don't need to add the title to the body
+ # as it will already be set as the commit subject.
+ if merge_mode != model.MERGER_SQUASH_MERGE:
+ if change.title:
+ message.append(change.title)
if change.body_text:
message.append(change.body_text)
merge_message = "\n\n".join(message)
diff --git a/zuul/driver/gitlab/gitlabconnection.py b/zuul/driver/gitlab/gitlabconnection.py
index 40cac241a..da423f085 100644
--- a/zuul/driver/gitlab/gitlabconnection.py
+++ b/zuul/driver/gitlab/gitlabconnection.py
@@ -281,6 +281,7 @@ class GitlabAPIClient():
self.api_token = api_token
self.keepalive = keepalive
self.disable_pool = disable_pool
+ self.get_mr_wait_factor = 2
self.headers = {'Authorization': 'Bearer %s' % (
self.api_token)}
@@ -342,11 +343,36 @@ class GitlabAPIClient():
# https://docs.gitlab.com/ee/api/merge_requests.html#get-single-mr
def get_mr(self, project_name, number, zuul_event_id=None):
- path = "/projects/%s/merge_requests/%s" % (
- quote_plus(project_name), number)
- resp = self.get(self.baseurl + path, zuul_event_id=zuul_event_id)
- self._manage_error(*resp, zuul_event_id=zuul_event_id)
- return resp[0]
+ log = get_annotated_logger(self.log, zuul_event_id)
+ attempts = 0
+
+ def _get_mr():
+ path = "/projects/%s/merge_requests/%s" % (
+ quote_plus(project_name), number)
+ resp = self.get(self.baseurl + path, zuul_event_id=zuul_event_id)
+ self._manage_error(*resp, zuul_event_id=zuul_event_id)
+ return resp[0]
+
+ # The Gitlab API might not return a complete MR description as
+ # some attributes are updated asynchronously. This loop ensures
+ # we query the API until all async attributes are available or until
+ # a defined delay is reached.
+ while True:
+ attempts += 1
+ mr = _get_mr()
+ # The diff_refs attribute is updated asynchronously
+ if all(map(lambda k: mr.get(k, None), ['diff_refs'])):
+ return mr
+ if attempts > 4:
+ log.warning(
+ "Fetched MR %s#%s with imcomplete data" % (
+ project_name, number))
+ return mr
+ wait_delay = attempts * self.get_mr_wait_factor
+ log.info(
+ "Will retry to fetch %s#%s due to imcomplete data "
+ "(in %s seconds) ..." % (project_name, number, wait_delay))
+ time.sleep(wait_delay)
# https://docs.gitlab.com/ee/api/branches.html#list-repository-branches
def get_project_branches(self, project_name, exclude_unprotected,
@@ -607,17 +633,12 @@ class GitlabConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
return change
project = self.source.getProject(change_key.project_name)
if not change:
- if not event:
- self.log.error("Change %s not found in cache and no event",
- change_key)
- if event:
- url = event.change_url
change = MergeRequest(project.name)
change.project = project
change.number = number
# patch_number is the tips commit SHA of the MR
change.patchset = change_key.revision
- change.url = url or self.getMRUrl(project.name, number)
+ change.url = self.getMRUrl(project.name, number)
change.uris = [change.url.split('://', 1)[-1]] # remove scheme
log.debug("Getting change mr#%s from project %s" % (
@@ -672,8 +693,12 @@ class GitlabConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
change.ref = "refs/merge-requests/%s/head" % change.number
change.branch = change.mr['target_branch']
change.is_current_patchset = (change.mr['sha'] == change.patchset)
- change.base_sha = change.mr['diff_refs'].get('base_sha')
- change.commit_id = change.mr['diff_refs'].get('head_sha')
+ change.commit_id = event.patch_number
+ diff_refs = change.mr.get("diff_refs", {})
+ if diff_refs:
+ change.base_sha = diff_refs.get('base_sha')
+ else:
+ change.base_sha = None
change.owner = change.mr['author'].get('username')
# Files changes are not part of the Merge Request data
# See api/merge_requests.html#get-single-mr-changes
diff --git a/zuul/driver/mqtt/mqttconnection.py b/zuul/driver/mqtt/mqttconnection.py
index 7f221282f..4a028ba23 100644
--- a/zuul/driver/mqtt/mqttconnection.py
+++ b/zuul/driver/mqtt/mqttconnection.py
@@ -64,6 +64,12 @@ class MQTTConnection(BaseConnection):
def onLoad(self, zk_client, component_registry):
self.log.debug("Starting MQTT Connection")
+
+ # If the connection was not loaded by a scheduler, but by e.g.
+ # zuul-web, we want to stop here.
+ if not self.sched:
+ return
+
try:
self.client.connect(
self.connection_config.get('server', 'localhost'),
@@ -76,10 +82,11 @@ class MQTTConnection(BaseConnection):
self.client.loop_start()
def onStop(self):
- self.log.debug("Stopping MQTT Connection")
- self.client.loop_stop()
- self.client.disconnect()
- self.connected = False
+ if self.connected:
+ self.log.debug("Stopping MQTT Connection")
+ self.client.loop_stop()
+ self.client.disconnect()
+ self.connected = False
def publish(self, topic, message, qos, zuul_event_id):
log = get_annotated_logger(self.log, zuul_event_id)
diff --git a/zuul/driver/sql/sqlconnection.py b/zuul/driver/sql/sqlconnection.py
index 2d5c39ec3..7a4aea626 100644
--- a/zuul/driver/sql/sqlconnection.py
+++ b/zuul/driver/sql/sqlconnection.py
@@ -247,12 +247,25 @@ class DatabaseSession(object):
except sqlalchemy.orm.exc.MultipleResultsFound:
raise Exception("Multiple buildset found with uuid %s", uuid)
- def deleteBuildsets(self, cutoff):
+ def deleteBuildsets(self, cutoff, batch_size):
"""Delete buildsets before the cutoff"""
# delete buildsets updated before the cutoff
- for buildset in self.getBuildsets(updated_max=cutoff):
- self.session().delete(buildset)
+ deleted = True
+ while deleted:
+ deleted = False
+ oldest = None
+ for buildset in self.getBuildsets(
+ updated_max=cutoff, limit=batch_size):
+ deleted = True
+ if oldest is None:
+ oldest = buildset.updated
+ else:
+ oldest = min(oldest, buildset.updated)
+ self.session().delete(buildset)
+ self.session().commit()
+ if deleted:
+ self.log.info("Deleted from %s to %s", oldest, cutoff)
class SQLConnection(BaseConnection):
@@ -409,7 +422,10 @@ class SQLConnection(BaseConnection):
final = sa.Column(sa.Boolean)
held = sa.Column(sa.Boolean)
nodeset = sa.Column(sa.String(255))
- buildset = orm.relationship(BuildSetModel, backref="builds")
+ buildset = orm.relationship(BuildSetModel,
+ backref=orm.backref(
+ "builds",
+ cascade="all, delete-orphan"))
sa.Index(self.table_prefix + 'job_name_buildset_id_idx',
job_name, buildset_id)
@@ -468,7 +484,10 @@ class SQLConnection(BaseConnection):
name = sa.Column(sa.String(255))
url = sa.Column(sa.TEXT())
meta = sa.Column('metadata', sa.TEXT())
- build = orm.relationship(BuildModel, backref="artifacts")
+ build = orm.relationship(BuildModel,
+ backref=orm.backref(
+ "artifacts",
+ cascade="all, delete-orphan"))
class ProvidesModel(Base):
__tablename__ = self.table_prefix + PROVIDES_TABLE
@@ -476,7 +495,10 @@ class SQLConnection(BaseConnection):
build_id = sa.Column(sa.Integer, sa.ForeignKey(
self.table_prefix + BUILD_TABLE + ".id"))
name = sa.Column(sa.String(255))
- build = orm.relationship(BuildModel, backref="provides")
+ build = orm.relationship(BuildModel,
+ backref=orm.backref(
+ "provides",
+ cascade="all, delete-orphan"))
class BuildEventModel(Base):
__tablename__ = self.table_prefix + BUILD_EVENTS_TABLE
@@ -486,7 +508,10 @@ class SQLConnection(BaseConnection):
event_time = sa.Column(sa.DateTime)
event_type = sa.Column(sa.String(255))
description = sa.Column(sa.TEXT())
- build = orm.relationship(BuildModel, backref="build_events")
+ build = orm.relationship(BuildModel,
+ backref=orm.backref(
+ "build_events",
+ cascade="all, delete-orphan"))
self.buildEventModel = BuildEventModel
self.zuul_build_event_table = self.buildEventModel.__table__
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index a49bbbbbf..6dbf62de0 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -1931,6 +1931,7 @@ class AnsibleJob(object):
region=node.region,
host_id=node.host_id,
external_id=getattr(node, 'external_id', None),
+ slot=node.slot,
interface_ip=node.interface_ip,
public_ipv4=node.public_ipv4,
private_ipv4=node.private_ipv4,
@@ -3632,6 +3633,10 @@ class ExecutorServer(BaseMergeServer):
log.exception('Process pool got broken')
self.resetProcessPool()
task.transient_error = True
+ except IOError:
+ log.exception('Got I/O error while updating repo %s/%s',
+ task.connection_name, task.project_name)
+ task.transient_error = True
except Exception:
log.exception('Got exception while updating repo %s/%s',
task.connection_name, task.project_name)
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 36361df11..c3d082a47 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -243,7 +243,7 @@ class PipelineManager(metaclass=ABCMeta):
and self.useDependenciesByTopic(change.project))
if (update_commit_dependencies
or update_topic_dependencies):
- self.updateCommitDependencies(change, None, event=None)
+ self.updateCommitDependencies(change, event=None)
self._change_cache[change.cache_key] = change
resolved_changes.append(change)
return resolved_changes
@@ -285,11 +285,18 @@ class PipelineManager(metaclass=ABCMeta):
return True
return False
- def isAnyVersionOfChangeInPipeline(self, change):
- # Checks any items in the pipeline
+ def isChangeRelevantToPipeline(self, change):
+ # Checks if any version of the change or its deps matches any
+ # item in the pipeline.
for change_key in self.pipeline.change_list.getChangeKeys():
if change.cache_stat.key.isSameChange(change_key):
return True
+ if isinstance(change, model.Change):
+ for dep_change_ref in change.getNeedsChanges(
+ self.useDependenciesByTopic(change.project)):
+ dep_change_key = ChangeKey.fromReference(dep_change_ref)
+ if change.cache_stat.key.isSameChange(dep_change_key):
+ return True
return False
def isChangeAlreadyInQueue(self, change, change_queue):
@@ -315,7 +322,7 @@ class PipelineManager(metaclass=ABCMeta):
to_refresh.add(item.change)
for existing_change in to_refresh:
- self.updateCommitDependencies(existing_change, None, event)
+ self.updateCommitDependencies(existing_change, event)
def reportEnqueue(self, item):
if not self.pipeline.state.disabled:
@@ -516,7 +523,8 @@ class PipelineManager(metaclass=ABCMeta):
def addChange(self, change, event, quiet=False, enqueue_time=None,
ignore_requirements=False, live=True,
- change_queue=None, history=None, dependency_graph=None):
+ change_queue=None, history=None, dependency_graph=None,
+ skip_presence_check=False):
log = get_annotated_logger(self.log, event)
log.debug("Considering adding change %s" % change)
@@ -531,7 +539,9 @@ class PipelineManager(metaclass=ABCMeta):
# If we are adding a live change, check if it's a live item
# anywhere in the pipeline. Otherwise, we will perform the
# duplicate check below on the specific change_queue.
- if live and self.isChangeAlreadyInPipeline(change):
+ if (live and
+ self.isChangeAlreadyInPipeline(change) and
+ not skip_presence_check):
log.debug("Change %s is already in pipeline, ignoring" % change)
return True
@@ -564,7 +574,7 @@ class PipelineManager(metaclass=ABCMeta):
# to date and this is a noop; otherwise, we need to refresh
# them anyway.
if isinstance(change, model.Change):
- self.updateCommitDependencies(change, None, event)
+ self.updateCommitDependencies(change, event)
with self.getChangeQueue(change, event, change_queue) as change_queue:
if not change_queue:
@@ -590,8 +600,10 @@ class PipelineManager(metaclass=ABCMeta):
log.debug("History after enqueuing changes ahead: %s", history)
if self.isChangeAlreadyInQueue(change, change_queue):
- log.debug("Change %s is already in queue, ignoring" % change)
- return True
+ if not skip_presence_check:
+ log.debug("Change %s is already in queue, ignoring",
+ change)
+ return True
cycle = []
if isinstance(change, model.Change):
@@ -625,7 +637,7 @@ class PipelineManager(metaclass=ABCMeta):
if enqueue_time:
item.enqueue_time = enqueue_time
item.live = live
- self.reportStats(item, added=True)
+ self.reportStats(item, trigger_event=event)
item.quiet = quiet
if item.live:
@@ -857,7 +869,7 @@ class PipelineManager(metaclass=ABCMeta):
self.pipeline.tenant.name][other_pipeline.name].put(
event, needs_result=False)
- def updateCommitDependencies(self, change, change_queue, event):
+ def updateCommitDependencies(self, change, event):
log = get_annotated_logger(self.log, event)
must_update_commit_deps = (
@@ -1448,16 +1460,17 @@ class PipelineManager(metaclass=ABCMeta):
item.bundle and
item.bundle.updatesConfig(tenant) and tpc is not None
):
- extra_config_files = set(tpc.extra_config_files)
- extra_config_dirs = set(tpc.extra_config_dirs)
- # Merge extra_config_files and extra_config_dirs of the
- # dependent change
- for item_ahead in item.items_ahead:
- tpc_ahead = tenant.project_configs.get(
- item_ahead.change.project.canonical_name)
- if tpc_ahead:
- extra_config_files.update(tpc_ahead.extra_config_files)
- extra_config_dirs.update(tpc_ahead.extra_config_dirs)
+ # Collect extra config files and dirs of required changes.
+ extra_config_files = set()
+ extra_config_dirs = set()
+ for merger_item in item.current_build_set.merger_items:
+ source = self.sched.connections.getSource(
+ merger_item["connection"])
+ project = source.getProject(merger_item["project"])
+ tpc = tenant.project_configs.get(project.canonical_name)
+ if tpc:
+ extra_config_files.update(tpc.extra_config_files)
+ extra_config_dirs.update(tpc.extra_config_dirs)
ready = self.scheduleMerge(
item,
@@ -1554,6 +1567,7 @@ class PipelineManager(metaclass=ABCMeta):
log.info("Dequeuing change %s because "
"it can no longer merge" % item.change)
self.cancelJobs(item)
+ quiet_dequeue = False
if item.isBundleFailing():
item.setDequeuedBundleFailing('Bundle is failing')
elif not meets_reqs:
@@ -1565,7 +1579,28 @@ class PipelineManager(metaclass=ABCMeta):
else:
msg = f'Change {clist} is needed.'
item.setDequeuedNeedingChange(msg)
- if item.live:
+ # If all the dependencies are already in the pipeline
+ # (but not ahead of this change), then we probably
+ # just added updated versions of them, possibly
+ # updating a cycle. In that case, attempt to
+ # re-enqueue this change with the updated deps.
+ if (item.live and
+ all([self.isChangeAlreadyInPipeline(c)
+ for c in needs_changes])):
+ # Try enqueue, if that succeeds, keep this dequeue quiet
+ try:
+ log.info("Attempting re-enqueue of change %s",
+ item.change)
+ quiet_dequeue = self.addChange(
+ item.change, item.event,
+ enqueue_time=item.enqueue_time,
+ quiet=True,
+ skip_presence_check=True)
+ except Exception:
+ log.exception("Unable to re-enqueue change %s "
+ "which is missing dependencies",
+ item.change)
+ if item.live and not quiet_dequeue:
try:
self.reportItem(item)
except exceptions.MergeFailure:
@@ -2197,7 +2232,7 @@ class PipelineManager(metaclass=ABCMeta):
log.error("Reporting item %s received: %s", item, ret)
return action, (not ret)
- def reportStats(self, item, added=False):
+ def reportStats(self, item, trigger_event=None):
if not self.sched.statsd:
return
try:
@@ -2236,18 +2271,21 @@ class PipelineManager(metaclass=ABCMeta):
if dt:
self.sched.statsd.timing(key + '.resident_time', dt)
self.sched.statsd.incr(key + '.total_changes')
- if added and hasattr(item.event, 'arrived_at_scheduler_timestamp'):
+ if (
+ trigger_event
+ and hasattr(trigger_event, 'arrived_at_scheduler_timestamp')
+ ):
now = time.time()
- arrived = item.event.arrived_at_scheduler_timestamp
+ arrived = trigger_event.arrived_at_scheduler_timestamp
processing = (now - arrived) * 1000
- elapsed = (now - item.event.timestamp) * 1000
+ elapsed = (now - trigger_event.timestamp) * 1000
self.sched.statsd.timing(
basekey + '.event_enqueue_processing_time',
processing)
self.sched.statsd.timing(
basekey + '.event_enqueue_time', elapsed)
self.reportPipelineTiming('event_enqueue_time',
- item.event.timestamp)
+ trigger_event.timestamp)
except Exception:
self.log.exception("Exception reporting pipeline stats")
diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py
index e4688a1b7..845925bfa 100644
--- a/zuul/merger/merger.py
+++ b/zuul/merger/merger.py
@@ -595,14 +595,32 @@ class Repo(object):
log = get_annotated_logger(self.log, zuul_event_id)
repo = self.createRepoObject(zuul_event_id)
self.fetch(ref, zuul_event_id=zuul_event_id)
- if len(repo.commit("FETCH_HEAD").parents) > 1:
+ fetch_head = repo.commit("FETCH_HEAD")
+ if len(fetch_head.parents) > 1:
args = ["-s", "resolve", "FETCH_HEAD"]
log.debug("Merging %s with args %s instead of cherry-picking",
ref, args)
repo.git.merge(*args)
else:
log.debug("Cherry-picking %s", ref)
- repo.git.cherry_pick("FETCH_HEAD")
+ # Git doesn't have an option to ignore commits that are already
+ # applied to the working tree when cherry-picking, so pass the
+ # --keep-redundant-commits option, which will cause it to make an
+ # empty commit
+ repo.git.cherry_pick("FETCH_HEAD", keep_redundant_commits=True)
+
+ # If the newly applied commit is empty, it means either:
+ # 1) The commit being cherry-picked was empty, in which the empty
+ # commit should be kept
+ # 2) The commit being cherry-picked was already applied to the
+ # tree, in which case the empty commit should be backed out
+ head = repo.commit("HEAD")
+ parent = head.parents[0]
+ if not any(head.diff(parent)) and \
+ any(fetch_head.diff(fetch_head.parents[0])):
+ log.debug("%s was already applied. Removing it", ref)
+ self._checkout(repo, parent)
+
return repo.head.commit
def merge(self, ref, strategy=None, zuul_event_id=None):
@@ -704,9 +722,11 @@ class Repo(object):
ret = {}
repo = self.createRepoObject(zuul_event_id)
if branch:
- tree = repo.heads[branch].commit.tree
+ head = repo.heads[branch].commit
else:
- tree = repo.commit(commit).tree
+ head = repo.commit(commit)
+ log.debug("Getting files for %s at %s", self.local_path, head.hexsha)
+ tree = head.tree
for fn in files:
if fn in tree:
if tree[fn].type != 'blob':
diff --git a/zuul/model.py b/zuul/model.py
index 1d82b5f2c..e36f9d670 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -1408,6 +1408,7 @@ class Node(ConfigObject):
self.private_ipv6 = None
self.connection_port = 22
self.connection_type = None
+ self.slot = None
self._keys = []
self.az = None
self.provider = None
@@ -3006,20 +3007,30 @@ class Job(ConfigObject):
# possibility of success, which may help prevent errors in
# most cases. If we don't raise an error here, the
# possibility of later failure still remains.
- nonfinal_parents = [p for p in parents if not p.final]
- if not nonfinal_parents:
+ nonfinal_parent_found = False
+ nonintermediate_parent_found = False
+ nonprotected_parent_found = False
+ for p in parents:
+ if not p.final:
+ nonfinal_parent_found = True
+ if not p.intermediate:
+ nonintermediate_parent_found = True
+ if not p.protected:
+ nonprotected_parent_found = True
+ if (nonfinal_parent_found and
+ nonintermediate_parent_found and
+ nonprotected_parent_found):
+ break
+
+ if not nonfinal_parent_found:
raise Exception(
f'The parent of job "{self.name}", "{self.parent}" '
'is final and can not act as a parent')
- nonintermediate_parents = [
- p for p in parents if not p.intermediate]
- if not nonintermediate_parents and not self.abstract:
+ if not nonintermediate_parent_found and not self.abstract:
raise Exception(
f'The parent of job "{self.name}", "{self.parent}" '
f'is intermediate but "{self.name}" is not abstract')
- nonprotected_parents = [
- p for p in parents if not p.protected]
- if (not nonprotected_parents and
+ if (not nonprotected_parent_found and
parents[0].source_context.project_canonical_name !=
self.source_context.project_canonical_name):
raise Exception(
@@ -4666,6 +4677,37 @@ class BuildSet(zkobject.ZKObject):
return Attributes(uuid=self.uuid)
+class EventInfo:
+
+ def __init__(self):
+ self.zuul_event_id = None
+ self.timestamp = time.time()
+ self.span_context = None
+
+ @classmethod
+ def fromEvent(cls, event):
+ tinfo = cls()
+ tinfo.zuul_event_id = event.zuul_event_id
+ tinfo.timestamp = event.timestamp
+ tinfo.span_context = event.span_context
+ return tinfo
+
+ @classmethod
+ def fromDict(cls, d):
+ tinfo = cls()
+ tinfo.zuul_event_id = d["zuul_event_id"]
+ tinfo.timestamp = d["timestamp"]
+ tinfo.span_context = d["span_context"]
+ return tinfo
+
+ def toDict(self):
+ return {
+ "zuul_event_id": self.zuul_event_id,
+ "timestamp": self.timestamp,
+ "span_context": self.span_context,
+ }
+
+
class QueueItem(zkobject.ZKObject):
"""Represents the position of a Change in a ChangeQueue.
@@ -4700,7 +4742,7 @@ class QueueItem(zkobject.ZKObject):
live=True, # Whether an item is intended to be processed at all
layout_uuid=None,
_cached_sql_results={},
- event=None, # The trigger event that lead to this queue item
+ event=None, # Info about the event that lead to this queue item
# Additional container for connection specifig information to be
# used by reporters throughout the lifecycle
@@ -4722,6 +4764,9 @@ class QueueItem(zkobject.ZKObject):
def new(klass, context, **kw):
obj = klass()
obj._set(**kw)
+ if COMPONENT_REGISTRY.model_api >= 13:
+ obj._set(event=obj.event and EventInfo.fromEvent(obj.event))
+
data = obj._trySerialize(context)
obj._save(context, data, create=True)
files_state = (BuildSet.COMPLETE if obj.change.files is not None
@@ -4750,10 +4795,18 @@ class QueueItem(zkobject.ZKObject):
return (tenant, pipeline, uuid)
def serialize(self, context):
- if isinstance(self.event, TriggerEvent):
- event_type = "TriggerEvent"
+ if COMPONENT_REGISTRY.model_api < 13:
+ if isinstance(self.event, TriggerEvent):
+ event_type = "TriggerEvent"
+ else:
+ event_type = self.event.__class__.__name__
else:
- event_type = self.event.__class__.__name__
+ event_type = "EventInfo"
+ if not isinstance(self.event, EventInfo):
+ # Convert our local trigger event to a trigger info
+ # object. This will only happen on the transition to
+ # model API version 13.
+ self._set(event=EventInfo.fromEvent(self.event))
data = {
"uuid": self.uuid,
@@ -4795,14 +4848,18 @@ class QueueItem(zkobject.ZKObject):
# child objects.
self._set(uuid=data["uuid"])
- event_type = data["event"]["type"]
- if event_type == "TriggerEvent":
- event_class = (
- self.pipeline.manager.sched.connections.getTriggerEventClass(
- data["event"]["data"]["driver_name"])
- )
+ if COMPONENT_REGISTRY.model_api < 13:
+ event_type = data["event"]["type"]
+ if event_type == "TriggerEvent":
+ event_class = (
+ self.pipeline.manager.sched.connections
+ .getTriggerEventClass(
+ data["event"]["data"]["driver_name"])
+ )
+ else:
+ event_class = EventTypeIndex.event_type_mapping.get(event_type)
else:
- event_class = EventTypeIndex.event_type_mapping.get(event_type)
+ event_class = EventInfo
if event_class is None:
raise NotImplementedError(
@@ -7740,31 +7797,33 @@ class Layout(object):
def addJob(self, job):
# We can have multiple variants of a job all with the same
# name, but these variants must all be defined in the same repo.
- prior_jobs = [j for j in self.getJobs(job.name) if
- j.source_context.project_canonical_name !=
- job.source_context.project_canonical_name]
# Unless the repo is permitted to shadow another. If so, and
# the job we are adding is from a repo that is permitted to
# shadow the one with the older jobs, skip adding this job.
job_project = job.source_context.project_canonical_name
job_tpc = self.tenant.project_configs[job_project]
skip_add = False
- for prior_job in prior_jobs[:]:
- prior_project = prior_job.source_context.project_canonical_name
- if prior_project in job_tpc.shadow_projects:
- prior_jobs.remove(prior_job)
- skip_add = True
-
+ prior_jobs = self.jobs.get(job.name, [])
if prior_jobs:
- raise Exception("Job %s in %s is not permitted to shadow "
- "job %s in %s" % (
- job,
- job.source_context.project_name,
- prior_jobs[0],
- prior_jobs[0].source_context.project_name))
+ # All jobs we've added so far should be from the same
+ # project, so pick the first one.
+ prior_job = prior_jobs[0]
+ if (prior_job.source_context.project_canonical_name !=
+ job.source_context.project_canonical_name):
+ prior_project = prior_job.source_context.project_canonical_name
+ if prior_project in job_tpc.shadow_projects:
+ skip_add = True
+ else:
+ raise Exception("Job %s in %s is not permitted to shadow "
+ "job %s in %s" % (
+ job,
+ job.source_context.project_name,
+ prior_job,
+ prior_job.source_context.project_name))
+
if skip_add:
return False
- if job.name in self.jobs:
+ if prior_jobs:
self.jobs[job.name].append(job)
else:
self.jobs[job.name] = [job]
diff --git a/zuul/model_api.py b/zuul/model_api.py
index ccb12077d..0244296dd 100644
--- a/zuul/model_api.py
+++ b/zuul/model_api.py
@@ -14,4 +14,4 @@
# When making ZK schema changes, increment this and add a record to
# doc/source/developer/model-changelog.rst
-MODEL_API = 12
+MODEL_API = 13
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index e646c09b8..cd15a878c 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -2519,9 +2519,26 @@ class Scheduler(threading.Thread):
event.span_context = tracing.getSpanContext(span)
for pipeline in tenant.layout.pipelines.values():
+ # For most kinds of dependencies, it's sufficient to check
+ # if this change is already in the pipeline, because the
+ # only way to update a dependency cycle is to update one
+ # of the changes in it. However, dependencies-by-topic
+ # can have changes added to the cycle without updating any
+ # of the existing changes in the cycle. That means in
+ # order to detect whether a new change is added to an
+ # existing cycle in the pipeline, we need to know all of
+ # the dependencies of the new change, and check if *they*
+ # are in the pipeline. Therefore, go ahead and update our
+ # dependencies here so they are available for comparison
+ # against the pipeline contents. This front-loads some
+ # work that otherwise would happen in the pipeline
+ # manager, but the result of the work goes into the change
+ # cache, so it's not wasted; it's just less parallelized.
+ if isinstance(change, Change):
+ pipeline.manager.updateCommitDependencies(change, event)
if (
pipeline.manager.eventMatches(event, change)
- or pipeline.manager.isAnyVersionOfChangeInPipeline(change)
+ or pipeline.manager.isChangeRelevantToPipeline(change)
):
self.pipeline_trigger_events[tenant.name][
pipeline.name
diff --git a/zuul/zk/job_request_queue.py b/zuul/zk/job_request_queue.py
index 175c57b90..7c85ae95e 100644
--- a/zuul/zk/job_request_queue.py
+++ b/zuul/zk/job_request_queue.py
@@ -609,7 +609,7 @@ class JobRequestQueue(ZooKeeperSimpleBase):
self.kazoo_client.delete(lock_path, recursive=True)
except Exception:
self.log.exception(
- "Unable to delete lock %s", path)
+ "Unable to delete lock %s", lock_path)
except Exception:
self.log.exception("Error cleaning up locks %s", self)