diff options
Diffstat (limited to 'zuul')
-rw-r--r-- | zuul/ansible/base/callback/zuul_stream.py | 5 | ||||
-rwxr-xr-x | zuul/cmd/client.py | 44 | ||||
-rw-r--r-- | zuul/configloader.py | 112 | ||||
-rw-r--r-- | zuul/driver/elasticsearch/reporter.py | 4 | ||||
-rw-r--r-- | zuul/driver/gerrit/gerritconnection.py | 52 | ||||
-rw-r--r-- | zuul/driver/gerrit/gerritmodel.py | 4 | ||||
-rw-r--r-- | zuul/driver/gerrit/gerritsource.py | 11 | ||||
-rw-r--r-- | zuul/driver/github/githubconnection.py | 22 | ||||
-rw-r--r-- | zuul/driver/github/githubreporter.py | 13 | ||||
-rw-r--r-- | zuul/driver/gitlab/gitlabconnection.py | 51 | ||||
-rw-r--r-- | zuul/driver/mqtt/mqttconnection.py | 15 | ||||
-rw-r--r-- | zuul/driver/sql/sqlconnection.py | 39 | ||||
-rw-r--r-- | zuul/executor/server.py | 5 | ||||
-rw-r--r-- | zuul/manager/__init__.py | 92 | ||||
-rw-r--r-- | zuul/merger/merger.py | 28 | ||||
-rw-r--r-- | zuul/model.py | 129 | ||||
-rw-r--r-- | zuul/model_api.py | 2 | ||||
-rw-r--r-- | zuul/scheduler.py | 19 | ||||
-rw-r--r-- | zuul/zk/job_request_queue.py | 2 |
19 files changed, 455 insertions, 194 deletions
diff --git a/zuul/ansible/base/callback/zuul_stream.py b/zuul/ansible/base/callback/zuul_stream.py index b5c14691b..3f886c797 100644 --- a/zuul/ansible/base/callback/zuul_stream.py +++ b/zuul/ansible/base/callback/zuul_stream.py @@ -44,6 +44,7 @@ import time from ansible.plugins.callback import default from ansible.module_utils._text import to_text +from ansible.module_utils.parsing.convert_bool import boolean from zuul.ansible import paths from zuul.ansible import logconfig @@ -333,6 +334,10 @@ class CallbackModule(default.CallbackModule): if (ip in ('localhost', '127.0.0.1')): # Don't try to stream from localhost continue + if boolean(play_vars[host].get( + 'zuul_console_disabled', False)): + # The user has told us not to even try + continue if play_vars[host].get('ansible_connection') in ('winrm',): # The winrm connections don't support streaming for now continue diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py index 031b10a1e..6fa20c7c4 100755 --- a/zuul/cmd/client.py +++ b/zuul/cmd/client.py @@ -30,16 +30,14 @@ import time import textwrap import requests import urllib.parse -from uuid import uuid4 import zuul.cmd from zuul.lib.config import get_default -from zuul.model import SystemAttributes, PipelineState +from zuul.model import SystemAttributes, PipelineState, PipelineChangeList from zuul.zk import ZooKeeperClient from zuul.lib.keystorage import KeyStorage -from zuul.zk.locks import tenant_write_lock +from zuul.zk.locks import tenant_read_lock, pipeline_lock from zuul.zk.zkobject import ZKContext -from zuul.zk.layout import LayoutState, LayoutStateStore from zuul.zk.components import COMPONENT_REGISTRY @@ -542,6 +540,10 @@ class Client(zuul.cmd.ZuulApp): cmd_prune_database.add_argument( '--older-than', help='relative time (e.g., "24h" or "180d")') + cmd_prune_database.add_argument( + '--batch-size', + default=10000, + help='transaction batch size') cmd_prune_database.set_defaults(func=self.prune_database) return parser @@ -1029,28 +1031,18 @@ class Client(zuul.cmd.ZuulApp): safe_tenant = urllib.parse.quote_plus(args.tenant) safe_pipeline = urllib.parse.quote_plus(args.pipeline) COMPONENT_REGISTRY.create(zk_client) - with tenant_write_lock(zk_client, args.tenant) as lock: + self.log.info('get tenant') + with tenant_read_lock(zk_client, args.tenant): path = f'/zuul/tenant/{safe_tenant}/pipeline/{safe_pipeline}' - layout_uuid = None - zk_client.client.delete( - f'/zuul/tenant/{safe_tenant}/pipeline/{safe_pipeline}', - recursive=True) - with ZKContext(zk_client, lock, None, self.log) as context: - ps = PipelineState.new(context, _path=path, - layout_uuid=layout_uuid) - # Force everyone to make a new layout for this tenant in - # order to rebuild the shared change queues. - layout_state = LayoutState( - tenant_name=args.tenant, - hostname='admin command', - last_reconfigured=int(time.time()), - last_reconfigure_event_ltime=-1, - uuid=uuid4().hex, - branch_cache_min_ltimes={}, - ltime=ps._zstat.last_modified_transaction_id, - ) - tenant_layout_state = LayoutStateStore(zk_client, lambda: None) - tenant_layout_state[args.tenant] = layout_state + self.log.info('get pipe') + with pipeline_lock( + zk_client, args.tenant, args.pipeline + ) as plock: + self.log.info('got locks') + zk_client.client.delete(path, recursive=True) + with ZKContext(zk_client, plock, None, self.log) as context: + PipelineState.new(context, _path=path, layout_uuid=None) + PipelineChangeList.new(context) sys.exit(0) @@ -1061,7 +1053,7 @@ class Client(zuul.cmd.ZuulApp): cutoff = parse_cutoff(now, args.before, args.older_than) self.configure_connections(source_only=False, require_sql=True) connection = self.connections.getSqlConnection() - connection.deleteBuildsets(cutoff) + connection.deleteBuildsets(cutoff, args.batch_size) sys.exit(0) diff --git a/zuul/configloader.py b/zuul/configloader.py index fe22fe0f8..4f472cb4e 100644 --- a/zuul/configloader.py +++ b/zuul/configloader.py @@ -2151,21 +2151,26 @@ class TenantParser(object): for future in futures: future.result() - try: - self._processCatJobs(abide, tenant, loading_errors, jobs, - min_ltimes) - except Exception: - self.log.exception("Error processing cat jobs, canceling") - for job in jobs: + for i, job in enumerate(jobs, start=1): + try: try: - self.log.debug("Canceling cat job %s", job) + self._processCatJob(abide, tenant, loading_errors, job, + min_ltimes) + except TimeoutError: self.merger.cancel(job) - except Exception: - self.log.exception("Unable to cancel job %s", job) - if not ignore_cat_exception: - raise - if not ignore_cat_exception: - raise + raise + except Exception: + self.log.exception("Error processing cat job") + if not ignore_cat_exception: + # Cancel remaining jobs + for cancel_job in jobs[i:]: + self.log.debug("Canceling cat job %s", cancel_job) + try: + self.merger.cancel(cancel_job) + except Exception: + self.log.exception( + "Unable to cancel job %s", cancel_job) + raise def _cacheTenantYAMLBranch(self, abide, tenant, loading_errors, min_ltimes, tpc, project, branch, jobs): @@ -2234,49 +2239,48 @@ class TenantParser(object): job.source_context = source_context jobs.append(job) - def _processCatJobs(self, abide, tenant, loading_errors, jobs, min_ltimes): + def _processCatJob(self, abide, tenant, loading_errors, job, min_ltimes): # Called at the end of _cacheTenantYAML after all cat jobs # have been submitted - for job in jobs: - self.log.debug("Waiting for cat job %s" % (job,)) - res = job.wait(self.merger.git_timeout) - if not res: - # We timed out - raise Exception("Cat job %s timed out; consider setting " - "merger.git_timeout in zuul.conf" % (job,)) - if not job.updated: - raise Exception("Cat job %s failed" % (job,)) - self.log.debug("Cat job %s got files %s" % - (job, job.files.keys())) - - self._updateUnparsedBranchCache(abide, tenant, job.source_context, - job.files, loading_errors, - job.ltime, min_ltimes) - - # Save all config files in Zookeeper (not just for the current tpc) - files_cache = self.unparsed_config_cache.getFilesCache( - job.source_context.project_canonical_name, - job.source_context.branch) - with self.unparsed_config_cache.writeLock( - job.source_context.project_canonical_name): - # Prevent files cache ltime from going backward - if files_cache.ltime >= job.ltime: - self.log.info( - "Discarding job %s result since the files cache was " - "updated in the meantime", job) - continue - # Since the cat job returns all required config files - # for ALL tenants the project is a part of, we can - # clear the whole cache and then populate it with the - # updated content. - files_cache.clear() - for fn, content in job.files.items(): - # Cache file in Zookeeper - if content is not None: - files_cache[fn] = content - files_cache.setValidFor(job.extra_config_files, - job.extra_config_dirs, - job.ltime) + self.log.debug("Waiting for cat job %s" % (job,)) + res = job.wait(self.merger.git_timeout) + if not res: + # We timed out + raise TimeoutError(f"Cat job {job} timed out; consider setting " + "merger.git_timeout in zuul.conf") + if not job.updated: + raise Exception("Cat job %s failed" % (job,)) + self.log.debug("Cat job %s got files %s" % + (job, job.files.keys())) + + self._updateUnparsedBranchCache(abide, tenant, job.source_context, + job.files, loading_errors, + job.ltime, min_ltimes) + + # Save all config files in Zookeeper (not just for the current tpc) + files_cache = self.unparsed_config_cache.getFilesCache( + job.source_context.project_canonical_name, + job.source_context.branch) + with self.unparsed_config_cache.writeLock( + job.source_context.project_canonical_name): + # Prevent files cache ltime from going backward + if files_cache.ltime >= job.ltime: + self.log.info( + "Discarding job %s result since the files cache was " + "updated in the meantime", job) + return + # Since the cat job returns all required config files + # for ALL tenants the project is a part of, we can + # clear the whole cache and then populate it with the + # updated content. + files_cache.clear() + for fn, content in job.files.items(): + # Cache file in Zookeeper + if content is not None: + files_cache[fn] = content + files_cache.setValidFor(job.extra_config_files, + job.extra_config_dirs, + job.ltime) def _updateUnparsedBranchCache(self, abide, tenant, source_context, files, loading_errors, ltime, min_ltimes): diff --git a/zuul/driver/elasticsearch/reporter.py b/zuul/driver/elasticsearch/reporter.py index 7802cb609..e5e90e052 100644 --- a/zuul/driver/elasticsearch/reporter.py +++ b/zuul/driver/elasticsearch/reporter.py @@ -103,7 +103,9 @@ class ElasticsearchReporter(BaseReporter): build_doc['job_vars'] = job.variables if self.index_returned_vars: - build_doc['job_returned_vars'] = build.result_data + rdata = build.result_data.copy() + rdata.pop('zuul', None) + build_doc['job_returned_vars'] = rdata docs.append(build_doc) diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py index 276365e1d..990b7b235 100644 --- a/zuul/driver/gerrit/gerritconnection.py +++ b/zuul/driver/gerrit/gerritconnection.py @@ -161,6 +161,8 @@ class GerritEventConnector(threading.Thread): IGNORED_EVENTS = ( 'cache-eviction', # evict-cache plugin + 'fetch-ref-replicated', + 'fetch-ref-replication-scheduled', 'ref-replicated', 'ref-replication-scheduled', 'ref-replication-done' @@ -1180,9 +1182,34 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): return True if change.wip: return False - if change.missing_labels <= set(allow_needs): - return True - return False + if change.missing_labels > set(allow_needs): + self.log.debug("Unable to merge due to " + "missing labels: %s", change.missing_labels) + return False + for sr in change.submit_requirements: + if sr.get('status') == 'UNSATISFIED': + # Otherwise, we don't care and should skip. + + # We're going to look at each unsatisfied submit + # requirement, and if one of the involved labels is an + # "allow_needs" label, we will assume that Zuul may be + # able to take an action which can cause the + # requirement to be satisfied, and we will ignore it. + # Otherwise, it is likely a requirement that Zuul can + # not alter in which case the requirement should stand + # and block merging. + result = sr.get("submittability_expression_result", {}) + expression = result.get("expression", '') + expr_contains_allow = False + for allow in allow_needs: + if f'label:{allow}' in expression: + expr_contains_allow = True + break + if not expr_contains_allow: + self.log.debug("Unable to merge due to " + "submit requirement: %s", sr) + return False + return True def getProjectOpenChanges(self, project: Project) -> List[GerritChange]: # This is a best-effort function in case Gerrit is unable to return @@ -1441,13 +1468,22 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): return data def queryChangeHTTP(self, number, event=None): - data = self.get('changes/%s?o=DETAILED_ACCOUNTS&o=CURRENT_REVISION&' - 'o=CURRENT_COMMIT&o=CURRENT_FILES&o=LABELS&' - 'o=DETAILED_LABELS' % (number,)) + query = ('changes/%s?o=DETAILED_ACCOUNTS&o=CURRENT_REVISION&' + 'o=CURRENT_COMMIT&o=CURRENT_FILES&o=LABELS&' + 'o=DETAILED_LABELS' % (number,)) + if self.version >= (3, 5, 0): + query += '&o=SUBMIT_REQUIREMENTS' + data = self.get(query) related = self.get('changes/%s/revisions/%s/related' % ( number, data['current_revision'])) - files = self.get('changes/%s/revisions/%s/files?parent=1' % ( - number, data['current_revision'])) + + files_query = 'changes/%s/revisions/%s/files' % ( + number, data['current_revision']) + + if data['revisions'][data['current_revision']]['commit']['parents']: + files_query += '?parent=1' + + files = self.get(files_query) return data, related, files def queryChange(self, number, event=None): diff --git a/zuul/driver/gerrit/gerritmodel.py b/zuul/driver/gerrit/gerritmodel.py index 0ac3e7f9d..4ac291f2b 100644 --- a/zuul/driver/gerrit/gerritmodel.py +++ b/zuul/driver/gerrit/gerritmodel.py @@ -34,6 +34,7 @@ class GerritChange(Change): self.wip = None self.approvals = [] self.missing_labels = set() + self.submit_requirements = [] self.commit = None self.zuul_query_ltime = None @@ -52,6 +53,7 @@ class GerritChange(Change): "wip": self.wip, "approvals": self.approvals, "missing_labels": list(self.missing_labels), + "submit_requirements": self.submit_requirements, "commit": self.commit, "zuul_query_ltime": self.zuul_query_ltime, }) @@ -64,6 +66,7 @@ class GerritChange(Change): self.wip = data["wip"] self.approvals = data["approvals"] self.missing_labels = set(data["missing_labels"]) + self.submit_requirements = data.get("submit_requirements", []) self.commit = data.get("commit") self.zuul_query_ltime = data.get("zuul_query_ltime") @@ -189,6 +192,7 @@ class GerritChange(Change): if 'approved' in label_data: continue self.missing_labels.add(label_name) + self.submit_requirements = data.get('submit_requirements', []) self.open = data['status'] == 'NEW' self.status = data['status'] self.wip = data.get('work_in_progress', False) diff --git a/zuul/driver/gerrit/gerritsource.py b/zuul/driver/gerrit/gerritsource.py index f42e93254..b0bd3c448 100644 --- a/zuul/driver/gerrit/gerritsource.py +++ b/zuul/driver/gerrit/gerritsource.py @@ -164,11 +164,14 @@ class GerritSource(BaseSource): change = self.connection._getChange(change_key) changes[change_key] = change - for change in changes.values(): - for git_key in change.git_needs_changes: - if git_key in changes: + # Convert to list here because the recursive call can mutate + # the set. + for change in list(changes.values()): + for git_change_ref in change.git_needs_changes: + change_key = ChangeKey.fromReference(git_change_ref) + if change_key in changes: continue - git_change = self.getChange(git_key) + git_change = self.getChange(change_key) if not git_change.topic or git_change.topic == topic: continue self.getChangesByTopic(git_change.topic, changes) diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py index 182c83bae..a1353cb4d 100644 --- a/zuul/driver/github/githubconnection.py +++ b/zuul/driver/github/githubconnection.py @@ -81,6 +81,10 @@ ANNOTATION_LEVELS = { "warning": "warning", "error": "failure", } +# The maximum size for the 'message' field is 64 KB. Since it's unclear +# from the Github docs if the unit is KiB or KB we'll use KB to be on +# the safe side. +ANNOTATION_MAX_MESSAGE_SIZE = 64 * 1000 EventTuple = collections.namedtuple( "EventTuple", [ @@ -403,7 +407,8 @@ class GithubEventProcessor(object): # Returns empty on unhandled events return - self.log.debug("Handling %s event", self.event_type) + self.log.debug("Handling %s event with installation id %s", + self.event_type, installation_id) events = [] try: events = method() @@ -439,7 +444,11 @@ class GithubEventProcessor(object): # branch is now protected. if hasattr(event, "branch") and event.branch: protected = None - if change: + # Only use the `branch_protected` flag if the + # target branch of change and event are the same. + # The base branch could have changed in the + # meantime. + if change and change.branch == event.branch: # PR based events already have the information if the # target branch is protected so take the information # from there. @@ -675,6 +684,11 @@ class GithubEventProcessor(object): branch, project_name) events.append( self._branch_protection_rule_to_event(project_name, branch)) + + for event in events: + # Make sure every event has a branch cache ltime + self.connection.clearConnectionCacheOnBranchEvent(event) + return events def _branch_protection_rule_to_event(self, project_name, branch): @@ -2428,7 +2442,9 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): raw_annotation = { "path": fn, "annotation_level": annotation_level, - "message": comment["message"], + "message": comment["message"].encode( + "utf8")[:ANNOTATION_MAX_MESSAGE_SIZE].decode( + "utf8", "ignore"), "start_line": start_line, "end_line": end_line, "start_column": start_column, diff --git a/zuul/driver/github/githubreporter.py b/zuul/driver/github/githubreporter.py index 1f44303bd..396516038 100644 --- a/zuul/driver/github/githubreporter.py +++ b/zuul/driver/github/githubreporter.py @@ -193,13 +193,13 @@ class GithubReporter(BaseReporter): self.log.warning('Merge mode %s not supported by Github', mode) raise MergeFailure('Merge mode %s not supported by Github' % mode) - merge_mode = self.merge_modes[merge_mode] project = item.change.project.name pr_number = item.change.number sha = item.change.patchset log.debug('Reporting change %s, params %s, merging via API', item.change, self.config) - message = self._formatMergeMessage(item.change) + message = self._formatMergeMessage(item.change, merge_mode) + merge_mode = self.merge_modes[merge_mode] for i in [1, 2]: try: @@ -319,10 +319,13 @@ class GithubReporter(BaseReporter): self.connection.unlabelPull(project, pr_number, label, zuul_event_id=item.event) - def _formatMergeMessage(self, change): + def _formatMergeMessage(self, change, merge_mode): message = [] - if change.title: - message.append(change.title) + # For squash merges we don't need to add the title to the body + # as it will already be set as the commit subject. + if merge_mode != model.MERGER_SQUASH_MERGE: + if change.title: + message.append(change.title) if change.body_text: message.append(change.body_text) merge_message = "\n\n".join(message) diff --git a/zuul/driver/gitlab/gitlabconnection.py b/zuul/driver/gitlab/gitlabconnection.py index 40cac241a..da423f085 100644 --- a/zuul/driver/gitlab/gitlabconnection.py +++ b/zuul/driver/gitlab/gitlabconnection.py @@ -281,6 +281,7 @@ class GitlabAPIClient(): self.api_token = api_token self.keepalive = keepalive self.disable_pool = disable_pool + self.get_mr_wait_factor = 2 self.headers = {'Authorization': 'Bearer %s' % ( self.api_token)} @@ -342,11 +343,36 @@ class GitlabAPIClient(): # https://docs.gitlab.com/ee/api/merge_requests.html#get-single-mr def get_mr(self, project_name, number, zuul_event_id=None): - path = "/projects/%s/merge_requests/%s" % ( - quote_plus(project_name), number) - resp = self.get(self.baseurl + path, zuul_event_id=zuul_event_id) - self._manage_error(*resp, zuul_event_id=zuul_event_id) - return resp[0] + log = get_annotated_logger(self.log, zuul_event_id) + attempts = 0 + + def _get_mr(): + path = "/projects/%s/merge_requests/%s" % ( + quote_plus(project_name), number) + resp = self.get(self.baseurl + path, zuul_event_id=zuul_event_id) + self._manage_error(*resp, zuul_event_id=zuul_event_id) + return resp[0] + + # The Gitlab API might not return a complete MR description as + # some attributes are updated asynchronously. This loop ensures + # we query the API until all async attributes are available or until + # a defined delay is reached. + while True: + attempts += 1 + mr = _get_mr() + # The diff_refs attribute is updated asynchronously + if all(map(lambda k: mr.get(k, None), ['diff_refs'])): + return mr + if attempts > 4: + log.warning( + "Fetched MR %s#%s with imcomplete data" % ( + project_name, number)) + return mr + wait_delay = attempts * self.get_mr_wait_factor + log.info( + "Will retry to fetch %s#%s due to imcomplete data " + "(in %s seconds) ..." % (project_name, number, wait_delay)) + time.sleep(wait_delay) # https://docs.gitlab.com/ee/api/branches.html#list-repository-branches def get_project_branches(self, project_name, exclude_unprotected, @@ -607,17 +633,12 @@ class GitlabConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): return change project = self.source.getProject(change_key.project_name) if not change: - if not event: - self.log.error("Change %s not found in cache and no event", - change_key) - if event: - url = event.change_url change = MergeRequest(project.name) change.project = project change.number = number # patch_number is the tips commit SHA of the MR change.patchset = change_key.revision - change.url = url or self.getMRUrl(project.name, number) + change.url = self.getMRUrl(project.name, number) change.uris = [change.url.split('://', 1)[-1]] # remove scheme log.debug("Getting change mr#%s from project %s" % ( @@ -672,8 +693,12 @@ class GitlabConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): change.ref = "refs/merge-requests/%s/head" % change.number change.branch = change.mr['target_branch'] change.is_current_patchset = (change.mr['sha'] == change.patchset) - change.base_sha = change.mr['diff_refs'].get('base_sha') - change.commit_id = change.mr['diff_refs'].get('head_sha') + change.commit_id = event.patch_number + diff_refs = change.mr.get("diff_refs", {}) + if diff_refs: + change.base_sha = diff_refs.get('base_sha') + else: + change.base_sha = None change.owner = change.mr['author'].get('username') # Files changes are not part of the Merge Request data # See api/merge_requests.html#get-single-mr-changes diff --git a/zuul/driver/mqtt/mqttconnection.py b/zuul/driver/mqtt/mqttconnection.py index 7f221282f..4a028ba23 100644 --- a/zuul/driver/mqtt/mqttconnection.py +++ b/zuul/driver/mqtt/mqttconnection.py @@ -64,6 +64,12 @@ class MQTTConnection(BaseConnection): def onLoad(self, zk_client, component_registry): self.log.debug("Starting MQTT Connection") + + # If the connection was not loaded by a scheduler, but by e.g. + # zuul-web, we want to stop here. + if not self.sched: + return + try: self.client.connect( self.connection_config.get('server', 'localhost'), @@ -76,10 +82,11 @@ class MQTTConnection(BaseConnection): self.client.loop_start() def onStop(self): - self.log.debug("Stopping MQTT Connection") - self.client.loop_stop() - self.client.disconnect() - self.connected = False + if self.connected: + self.log.debug("Stopping MQTT Connection") + self.client.loop_stop() + self.client.disconnect() + self.connected = False def publish(self, topic, message, qos, zuul_event_id): log = get_annotated_logger(self.log, zuul_event_id) diff --git a/zuul/driver/sql/sqlconnection.py b/zuul/driver/sql/sqlconnection.py index 2d5c39ec3..7a4aea626 100644 --- a/zuul/driver/sql/sqlconnection.py +++ b/zuul/driver/sql/sqlconnection.py @@ -247,12 +247,25 @@ class DatabaseSession(object): except sqlalchemy.orm.exc.MultipleResultsFound: raise Exception("Multiple buildset found with uuid %s", uuid) - def deleteBuildsets(self, cutoff): + def deleteBuildsets(self, cutoff, batch_size): """Delete buildsets before the cutoff""" # delete buildsets updated before the cutoff - for buildset in self.getBuildsets(updated_max=cutoff): - self.session().delete(buildset) + deleted = True + while deleted: + deleted = False + oldest = None + for buildset in self.getBuildsets( + updated_max=cutoff, limit=batch_size): + deleted = True + if oldest is None: + oldest = buildset.updated + else: + oldest = min(oldest, buildset.updated) + self.session().delete(buildset) + self.session().commit() + if deleted: + self.log.info("Deleted from %s to %s", oldest, cutoff) class SQLConnection(BaseConnection): @@ -409,7 +422,10 @@ class SQLConnection(BaseConnection): final = sa.Column(sa.Boolean) held = sa.Column(sa.Boolean) nodeset = sa.Column(sa.String(255)) - buildset = orm.relationship(BuildSetModel, backref="builds") + buildset = orm.relationship(BuildSetModel, + backref=orm.backref( + "builds", + cascade="all, delete-orphan")) sa.Index(self.table_prefix + 'job_name_buildset_id_idx', job_name, buildset_id) @@ -468,7 +484,10 @@ class SQLConnection(BaseConnection): name = sa.Column(sa.String(255)) url = sa.Column(sa.TEXT()) meta = sa.Column('metadata', sa.TEXT()) - build = orm.relationship(BuildModel, backref="artifacts") + build = orm.relationship(BuildModel, + backref=orm.backref( + "artifacts", + cascade="all, delete-orphan")) class ProvidesModel(Base): __tablename__ = self.table_prefix + PROVIDES_TABLE @@ -476,7 +495,10 @@ class SQLConnection(BaseConnection): build_id = sa.Column(sa.Integer, sa.ForeignKey( self.table_prefix + BUILD_TABLE + ".id")) name = sa.Column(sa.String(255)) - build = orm.relationship(BuildModel, backref="provides") + build = orm.relationship(BuildModel, + backref=orm.backref( + "provides", + cascade="all, delete-orphan")) class BuildEventModel(Base): __tablename__ = self.table_prefix + BUILD_EVENTS_TABLE @@ -486,7 +508,10 @@ class SQLConnection(BaseConnection): event_time = sa.Column(sa.DateTime) event_type = sa.Column(sa.String(255)) description = sa.Column(sa.TEXT()) - build = orm.relationship(BuildModel, backref="build_events") + build = orm.relationship(BuildModel, + backref=orm.backref( + "build_events", + cascade="all, delete-orphan")) self.buildEventModel = BuildEventModel self.zuul_build_event_table = self.buildEventModel.__table__ diff --git a/zuul/executor/server.py b/zuul/executor/server.py index a49bbbbbf..6dbf62de0 100644 --- a/zuul/executor/server.py +++ b/zuul/executor/server.py @@ -1931,6 +1931,7 @@ class AnsibleJob(object): region=node.region, host_id=node.host_id, external_id=getattr(node, 'external_id', None), + slot=node.slot, interface_ip=node.interface_ip, public_ipv4=node.public_ipv4, private_ipv4=node.private_ipv4, @@ -3632,6 +3633,10 @@ class ExecutorServer(BaseMergeServer): log.exception('Process pool got broken') self.resetProcessPool() task.transient_error = True + except IOError: + log.exception('Got I/O error while updating repo %s/%s', + task.connection_name, task.project_name) + task.transient_error = True except Exception: log.exception('Got exception while updating repo %s/%s', task.connection_name, task.project_name) diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index 36361df11..c3d082a47 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -243,7 +243,7 @@ class PipelineManager(metaclass=ABCMeta): and self.useDependenciesByTopic(change.project)) if (update_commit_dependencies or update_topic_dependencies): - self.updateCommitDependencies(change, None, event=None) + self.updateCommitDependencies(change, event=None) self._change_cache[change.cache_key] = change resolved_changes.append(change) return resolved_changes @@ -285,11 +285,18 @@ class PipelineManager(metaclass=ABCMeta): return True return False - def isAnyVersionOfChangeInPipeline(self, change): - # Checks any items in the pipeline + def isChangeRelevantToPipeline(self, change): + # Checks if any version of the change or its deps matches any + # item in the pipeline. for change_key in self.pipeline.change_list.getChangeKeys(): if change.cache_stat.key.isSameChange(change_key): return True + if isinstance(change, model.Change): + for dep_change_ref in change.getNeedsChanges( + self.useDependenciesByTopic(change.project)): + dep_change_key = ChangeKey.fromReference(dep_change_ref) + if change.cache_stat.key.isSameChange(dep_change_key): + return True return False def isChangeAlreadyInQueue(self, change, change_queue): @@ -315,7 +322,7 @@ class PipelineManager(metaclass=ABCMeta): to_refresh.add(item.change) for existing_change in to_refresh: - self.updateCommitDependencies(existing_change, None, event) + self.updateCommitDependencies(existing_change, event) def reportEnqueue(self, item): if not self.pipeline.state.disabled: @@ -516,7 +523,8 @@ class PipelineManager(metaclass=ABCMeta): def addChange(self, change, event, quiet=False, enqueue_time=None, ignore_requirements=False, live=True, - change_queue=None, history=None, dependency_graph=None): + change_queue=None, history=None, dependency_graph=None, + skip_presence_check=False): log = get_annotated_logger(self.log, event) log.debug("Considering adding change %s" % change) @@ -531,7 +539,9 @@ class PipelineManager(metaclass=ABCMeta): # If we are adding a live change, check if it's a live item # anywhere in the pipeline. Otherwise, we will perform the # duplicate check below on the specific change_queue. - if live and self.isChangeAlreadyInPipeline(change): + if (live and + self.isChangeAlreadyInPipeline(change) and + not skip_presence_check): log.debug("Change %s is already in pipeline, ignoring" % change) return True @@ -564,7 +574,7 @@ class PipelineManager(metaclass=ABCMeta): # to date and this is a noop; otherwise, we need to refresh # them anyway. if isinstance(change, model.Change): - self.updateCommitDependencies(change, None, event) + self.updateCommitDependencies(change, event) with self.getChangeQueue(change, event, change_queue) as change_queue: if not change_queue: @@ -590,8 +600,10 @@ class PipelineManager(metaclass=ABCMeta): log.debug("History after enqueuing changes ahead: %s", history) if self.isChangeAlreadyInQueue(change, change_queue): - log.debug("Change %s is already in queue, ignoring" % change) - return True + if not skip_presence_check: + log.debug("Change %s is already in queue, ignoring", + change) + return True cycle = [] if isinstance(change, model.Change): @@ -625,7 +637,7 @@ class PipelineManager(metaclass=ABCMeta): if enqueue_time: item.enqueue_time = enqueue_time item.live = live - self.reportStats(item, added=True) + self.reportStats(item, trigger_event=event) item.quiet = quiet if item.live: @@ -857,7 +869,7 @@ class PipelineManager(metaclass=ABCMeta): self.pipeline.tenant.name][other_pipeline.name].put( event, needs_result=False) - def updateCommitDependencies(self, change, change_queue, event): + def updateCommitDependencies(self, change, event): log = get_annotated_logger(self.log, event) must_update_commit_deps = ( @@ -1448,16 +1460,17 @@ class PipelineManager(metaclass=ABCMeta): item.bundle and item.bundle.updatesConfig(tenant) and tpc is not None ): - extra_config_files = set(tpc.extra_config_files) - extra_config_dirs = set(tpc.extra_config_dirs) - # Merge extra_config_files and extra_config_dirs of the - # dependent change - for item_ahead in item.items_ahead: - tpc_ahead = tenant.project_configs.get( - item_ahead.change.project.canonical_name) - if tpc_ahead: - extra_config_files.update(tpc_ahead.extra_config_files) - extra_config_dirs.update(tpc_ahead.extra_config_dirs) + # Collect extra config files and dirs of required changes. + extra_config_files = set() + extra_config_dirs = set() + for merger_item in item.current_build_set.merger_items: + source = self.sched.connections.getSource( + merger_item["connection"]) + project = source.getProject(merger_item["project"]) + tpc = tenant.project_configs.get(project.canonical_name) + if tpc: + extra_config_files.update(tpc.extra_config_files) + extra_config_dirs.update(tpc.extra_config_dirs) ready = self.scheduleMerge( item, @@ -1554,6 +1567,7 @@ class PipelineManager(metaclass=ABCMeta): log.info("Dequeuing change %s because " "it can no longer merge" % item.change) self.cancelJobs(item) + quiet_dequeue = False if item.isBundleFailing(): item.setDequeuedBundleFailing('Bundle is failing') elif not meets_reqs: @@ -1565,7 +1579,28 @@ class PipelineManager(metaclass=ABCMeta): else: msg = f'Change {clist} is needed.' item.setDequeuedNeedingChange(msg) - if item.live: + # If all the dependencies are already in the pipeline + # (but not ahead of this change), then we probably + # just added updated versions of them, possibly + # updating a cycle. In that case, attempt to + # re-enqueue this change with the updated deps. + if (item.live and + all([self.isChangeAlreadyInPipeline(c) + for c in needs_changes])): + # Try enqueue, if that succeeds, keep this dequeue quiet + try: + log.info("Attempting re-enqueue of change %s", + item.change) + quiet_dequeue = self.addChange( + item.change, item.event, + enqueue_time=item.enqueue_time, + quiet=True, + skip_presence_check=True) + except Exception: + log.exception("Unable to re-enqueue change %s " + "which is missing dependencies", + item.change) + if item.live and not quiet_dequeue: try: self.reportItem(item) except exceptions.MergeFailure: @@ -2197,7 +2232,7 @@ class PipelineManager(metaclass=ABCMeta): log.error("Reporting item %s received: %s", item, ret) return action, (not ret) - def reportStats(self, item, added=False): + def reportStats(self, item, trigger_event=None): if not self.sched.statsd: return try: @@ -2236,18 +2271,21 @@ class PipelineManager(metaclass=ABCMeta): if dt: self.sched.statsd.timing(key + '.resident_time', dt) self.sched.statsd.incr(key + '.total_changes') - if added and hasattr(item.event, 'arrived_at_scheduler_timestamp'): + if ( + trigger_event + and hasattr(trigger_event, 'arrived_at_scheduler_timestamp') + ): now = time.time() - arrived = item.event.arrived_at_scheduler_timestamp + arrived = trigger_event.arrived_at_scheduler_timestamp processing = (now - arrived) * 1000 - elapsed = (now - item.event.timestamp) * 1000 + elapsed = (now - trigger_event.timestamp) * 1000 self.sched.statsd.timing( basekey + '.event_enqueue_processing_time', processing) self.sched.statsd.timing( basekey + '.event_enqueue_time', elapsed) self.reportPipelineTiming('event_enqueue_time', - item.event.timestamp) + trigger_event.timestamp) except Exception: self.log.exception("Exception reporting pipeline stats") diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py index e4688a1b7..845925bfa 100644 --- a/zuul/merger/merger.py +++ b/zuul/merger/merger.py @@ -595,14 +595,32 @@ class Repo(object): log = get_annotated_logger(self.log, zuul_event_id) repo = self.createRepoObject(zuul_event_id) self.fetch(ref, zuul_event_id=zuul_event_id) - if len(repo.commit("FETCH_HEAD").parents) > 1: + fetch_head = repo.commit("FETCH_HEAD") + if len(fetch_head.parents) > 1: args = ["-s", "resolve", "FETCH_HEAD"] log.debug("Merging %s with args %s instead of cherry-picking", ref, args) repo.git.merge(*args) else: log.debug("Cherry-picking %s", ref) - repo.git.cherry_pick("FETCH_HEAD") + # Git doesn't have an option to ignore commits that are already + # applied to the working tree when cherry-picking, so pass the + # --keep-redundant-commits option, which will cause it to make an + # empty commit + repo.git.cherry_pick("FETCH_HEAD", keep_redundant_commits=True) + + # If the newly applied commit is empty, it means either: + # 1) The commit being cherry-picked was empty, in which the empty + # commit should be kept + # 2) The commit being cherry-picked was already applied to the + # tree, in which case the empty commit should be backed out + head = repo.commit("HEAD") + parent = head.parents[0] + if not any(head.diff(parent)) and \ + any(fetch_head.diff(fetch_head.parents[0])): + log.debug("%s was already applied. Removing it", ref) + self._checkout(repo, parent) + return repo.head.commit def merge(self, ref, strategy=None, zuul_event_id=None): @@ -704,9 +722,11 @@ class Repo(object): ret = {} repo = self.createRepoObject(zuul_event_id) if branch: - tree = repo.heads[branch].commit.tree + head = repo.heads[branch].commit else: - tree = repo.commit(commit).tree + head = repo.commit(commit) + log.debug("Getting files for %s at %s", self.local_path, head.hexsha) + tree = head.tree for fn in files: if fn in tree: if tree[fn].type != 'blob': diff --git a/zuul/model.py b/zuul/model.py index 1d82b5f2c..e36f9d670 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -1408,6 +1408,7 @@ class Node(ConfigObject): self.private_ipv6 = None self.connection_port = 22 self.connection_type = None + self.slot = None self._keys = [] self.az = None self.provider = None @@ -3006,20 +3007,30 @@ class Job(ConfigObject): # possibility of success, which may help prevent errors in # most cases. If we don't raise an error here, the # possibility of later failure still remains. - nonfinal_parents = [p for p in parents if not p.final] - if not nonfinal_parents: + nonfinal_parent_found = False + nonintermediate_parent_found = False + nonprotected_parent_found = False + for p in parents: + if not p.final: + nonfinal_parent_found = True + if not p.intermediate: + nonintermediate_parent_found = True + if not p.protected: + nonprotected_parent_found = True + if (nonfinal_parent_found and + nonintermediate_parent_found and + nonprotected_parent_found): + break + + if not nonfinal_parent_found: raise Exception( f'The parent of job "{self.name}", "{self.parent}" ' 'is final and can not act as a parent') - nonintermediate_parents = [ - p for p in parents if not p.intermediate] - if not nonintermediate_parents and not self.abstract: + if not nonintermediate_parent_found and not self.abstract: raise Exception( f'The parent of job "{self.name}", "{self.parent}" ' f'is intermediate but "{self.name}" is not abstract') - nonprotected_parents = [ - p for p in parents if not p.protected] - if (not nonprotected_parents and + if (not nonprotected_parent_found and parents[0].source_context.project_canonical_name != self.source_context.project_canonical_name): raise Exception( @@ -4666,6 +4677,37 @@ class BuildSet(zkobject.ZKObject): return Attributes(uuid=self.uuid) +class EventInfo: + + def __init__(self): + self.zuul_event_id = None + self.timestamp = time.time() + self.span_context = None + + @classmethod + def fromEvent(cls, event): + tinfo = cls() + tinfo.zuul_event_id = event.zuul_event_id + tinfo.timestamp = event.timestamp + tinfo.span_context = event.span_context + return tinfo + + @classmethod + def fromDict(cls, d): + tinfo = cls() + tinfo.zuul_event_id = d["zuul_event_id"] + tinfo.timestamp = d["timestamp"] + tinfo.span_context = d["span_context"] + return tinfo + + def toDict(self): + return { + "zuul_event_id": self.zuul_event_id, + "timestamp": self.timestamp, + "span_context": self.span_context, + } + + class QueueItem(zkobject.ZKObject): """Represents the position of a Change in a ChangeQueue. @@ -4700,7 +4742,7 @@ class QueueItem(zkobject.ZKObject): live=True, # Whether an item is intended to be processed at all layout_uuid=None, _cached_sql_results={}, - event=None, # The trigger event that lead to this queue item + event=None, # Info about the event that lead to this queue item # Additional container for connection specifig information to be # used by reporters throughout the lifecycle @@ -4722,6 +4764,9 @@ class QueueItem(zkobject.ZKObject): def new(klass, context, **kw): obj = klass() obj._set(**kw) + if COMPONENT_REGISTRY.model_api >= 13: + obj._set(event=obj.event and EventInfo.fromEvent(obj.event)) + data = obj._trySerialize(context) obj._save(context, data, create=True) files_state = (BuildSet.COMPLETE if obj.change.files is not None @@ -4750,10 +4795,18 @@ class QueueItem(zkobject.ZKObject): return (tenant, pipeline, uuid) def serialize(self, context): - if isinstance(self.event, TriggerEvent): - event_type = "TriggerEvent" + if COMPONENT_REGISTRY.model_api < 13: + if isinstance(self.event, TriggerEvent): + event_type = "TriggerEvent" + else: + event_type = self.event.__class__.__name__ else: - event_type = self.event.__class__.__name__ + event_type = "EventInfo" + if not isinstance(self.event, EventInfo): + # Convert our local trigger event to a trigger info + # object. This will only happen on the transition to + # model API version 13. + self._set(event=EventInfo.fromEvent(self.event)) data = { "uuid": self.uuid, @@ -4795,14 +4848,18 @@ class QueueItem(zkobject.ZKObject): # child objects. self._set(uuid=data["uuid"]) - event_type = data["event"]["type"] - if event_type == "TriggerEvent": - event_class = ( - self.pipeline.manager.sched.connections.getTriggerEventClass( - data["event"]["data"]["driver_name"]) - ) + if COMPONENT_REGISTRY.model_api < 13: + event_type = data["event"]["type"] + if event_type == "TriggerEvent": + event_class = ( + self.pipeline.manager.sched.connections + .getTriggerEventClass( + data["event"]["data"]["driver_name"]) + ) + else: + event_class = EventTypeIndex.event_type_mapping.get(event_type) else: - event_class = EventTypeIndex.event_type_mapping.get(event_type) + event_class = EventInfo if event_class is None: raise NotImplementedError( @@ -7740,31 +7797,33 @@ class Layout(object): def addJob(self, job): # We can have multiple variants of a job all with the same # name, but these variants must all be defined in the same repo. - prior_jobs = [j for j in self.getJobs(job.name) if - j.source_context.project_canonical_name != - job.source_context.project_canonical_name] # Unless the repo is permitted to shadow another. If so, and # the job we are adding is from a repo that is permitted to # shadow the one with the older jobs, skip adding this job. job_project = job.source_context.project_canonical_name job_tpc = self.tenant.project_configs[job_project] skip_add = False - for prior_job in prior_jobs[:]: - prior_project = prior_job.source_context.project_canonical_name - if prior_project in job_tpc.shadow_projects: - prior_jobs.remove(prior_job) - skip_add = True - + prior_jobs = self.jobs.get(job.name, []) if prior_jobs: - raise Exception("Job %s in %s is not permitted to shadow " - "job %s in %s" % ( - job, - job.source_context.project_name, - prior_jobs[0], - prior_jobs[0].source_context.project_name)) + # All jobs we've added so far should be from the same + # project, so pick the first one. + prior_job = prior_jobs[0] + if (prior_job.source_context.project_canonical_name != + job.source_context.project_canonical_name): + prior_project = prior_job.source_context.project_canonical_name + if prior_project in job_tpc.shadow_projects: + skip_add = True + else: + raise Exception("Job %s in %s is not permitted to shadow " + "job %s in %s" % ( + job, + job.source_context.project_name, + prior_job, + prior_job.source_context.project_name)) + if skip_add: return False - if job.name in self.jobs: + if prior_jobs: self.jobs[job.name].append(job) else: self.jobs[job.name] = [job] diff --git a/zuul/model_api.py b/zuul/model_api.py index ccb12077d..0244296dd 100644 --- a/zuul/model_api.py +++ b/zuul/model_api.py @@ -14,4 +14,4 @@ # When making ZK schema changes, increment this and add a record to # doc/source/developer/model-changelog.rst -MODEL_API = 12 +MODEL_API = 13 diff --git a/zuul/scheduler.py b/zuul/scheduler.py index e646c09b8..cd15a878c 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -2519,9 +2519,26 @@ class Scheduler(threading.Thread): event.span_context = tracing.getSpanContext(span) for pipeline in tenant.layout.pipelines.values(): + # For most kinds of dependencies, it's sufficient to check + # if this change is already in the pipeline, because the + # only way to update a dependency cycle is to update one + # of the changes in it. However, dependencies-by-topic + # can have changes added to the cycle without updating any + # of the existing changes in the cycle. That means in + # order to detect whether a new change is added to an + # existing cycle in the pipeline, we need to know all of + # the dependencies of the new change, and check if *they* + # are in the pipeline. Therefore, go ahead and update our + # dependencies here so they are available for comparison + # against the pipeline contents. This front-loads some + # work that otherwise would happen in the pipeline + # manager, but the result of the work goes into the change + # cache, so it's not wasted; it's just less parallelized. + if isinstance(change, Change): + pipeline.manager.updateCommitDependencies(change, event) if ( pipeline.manager.eventMatches(event, change) - or pipeline.manager.isAnyVersionOfChangeInPipeline(change) + or pipeline.manager.isChangeRelevantToPipeline(change) ): self.pipeline_trigger_events[tenant.name][ pipeline.name diff --git a/zuul/zk/job_request_queue.py b/zuul/zk/job_request_queue.py index 175c57b90..7c85ae95e 100644 --- a/zuul/zk/job_request_queue.py +++ b/zuul/zk/job_request_queue.py @@ -609,7 +609,7 @@ class JobRequestQueue(ZooKeeperSimpleBase): self.kazoo_client.delete(lock_path, recursive=True) except Exception: self.log.exception( - "Unable to delete lock %s", path) + "Unable to delete lock %s", lock_path) except Exception: self.log.exception("Error cleaning up locks %s", self) |