# Copyright 2015 Hewlett-Packard Development Company, L.P. # Copyright 2017 IBM Corp. # Copyright 2017 Red Hat, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import copy import re import re2 import time from zuul.model import Change, TriggerEvent, EventFilter, RefFilter from zuul.model import FalseWithReason from zuul.driver.util import time_to_seconds EMPTY_GIT_REF = '0' * 40 # git sha of all zeros, used during creates/deletes class PullRequest(Change): def __init__(self, project): super(PullRequest, self).__init__(project) self.pr = None self.updated_at = None self.title = None self.body_text = None self.reviews = [] self.files = [] self.labels = [] self.draft = None self.review_decision = None self.required_contexts = set() self.contexts = set() self.branch_protected = False @property def status(self): return ["{}:{}:{}".format(*c) for c in self.contexts] @property def successful_contexts(self) -> set: if not self.contexts: return set() return set( s[1] for s in self.contexts if s[2] == 'success' ) def isUpdateOf(self, other): if (self.project == other.project and hasattr(other, 'number') and self.number == other.number and hasattr(other, 'patchset') and self.patchset != other.patchset and hasattr(other, 'updated_at') and self.updated_at > other.updated_at): return True return False def serialize(self): d = super().serialize() d.update({ "pr": self.pr, "updated_at": self.updated_at, "title": self.title, "body_text": self.body_text, "reviews": list(self.reviews), "labels": self.labels, "draft": self.draft, "review_decision": self.review_decision, "required_contexts": list(self.required_contexts), "contexts": list(self.contexts), "branch_protected": self.branch_protected, }) return d def deserialize(self, data): super().deserialize(data) self.pr = data.get("pr") self.updated_at = data.get("updated_at") self.title = data.get("title") self.body_text = data.get("body_text") self.reviews = data.get("reviews", []) self.labels = data.get("labels", []) self.draft = data.get("draft") self.review_decision = data.get("review_decision") self.required_contexts = set(data.get("required_contexts", [])) self.contexts = set(tuple(c) for c in data.get("contexts", [])) self.branch_protected = data.get("branch_protected", False) class GithubTriggerEvent(TriggerEvent): def __init__(self): super(GithubTriggerEvent, self).__init__() self.title = None self.label = None self.unlabel = None self.action = None self.delivery = None self.check_run = None self.status = None self.commits = [] self.body_edited = None self.branch_protection_changed = None def toDict(self): d = super().toDict() d["title"] = self.title d["label"] = self.label d["unlabel"] = self.unlabel d["action"] = self.action d["delivery"] = self.delivery d["check_run"] = self.check_run d["status"] = self.status d["commits"] = self.commits d["body_edited"] = self.body_edited d["branch_protection_changed"] = self.branch_protection_changed return d def updateFromDict(self, d): super().updateFromDict(d) self.title = d["title"] self.label = d["label"] self.unlabel = d["unlabel"] self.action = d["action"] self.delivery = d["delivery"] self.check_run = d["check_run"] self.status = d["status"] self.commits = d["commits"] self.body_edited = d["body_edited"] self.branch_protection_changed = d.get("branch_protection_changed") def isBranchProtectionChanged(self): return bool(self.branch_protection_changed) def isPatchsetCreated(self): if self.type == 'pull_request': return self.action in ['opened', 'changed'] return False def isMessageChanged(self): return bool(self.body_edited) def isChangeAbandoned(self): if self.type == 'pull_request': return 'closed' == self.action return False def _repr(self): r = [super(GithubTriggerEvent, self)._repr()] if self.action: r.append(self.action) r.append(self.canonical_project_name) if self.change_number: r.append('%s,%s' % (self.change_number, self.patch_number)) if self.delivery: r.append('delivery: %s' % self.delivery) if self.check_run: r.append('check_run: %s' % self.check_run) return ' '.join(r) class GithubCommonFilter(object): def __init__(self, required_reviews=[], required_statuses=[], reject_reviews=[], reject_statuses=[]): self._required_reviews = copy.deepcopy(required_reviews) self._reject_reviews = copy.deepcopy(reject_reviews) self.required_reviews = self._tidy_reviews(self._required_reviews) self.reject_reviews = self._tidy_reviews(self._reject_reviews) self.required_statuses = required_statuses self.reject_statuses = reject_statuses def _tidy_reviews(self, reviews): for r in reviews: for k, v in r.items(): if k == 'username': r['username'] = re.compile(v) elif k == 'email': r['email'] = re.compile(v) elif k == 'newer-than': r[k] = time_to_seconds(v) elif k == 'older-than': r[k] = time_to_seconds(v) return reviews def _match_review_required_review(self, rreview, review): # Check if the required review and review match now = time.time() by = review.get('by', {}) for k, v in rreview.items(): if k == 'username': if (not v.search(by.get('username', ''))): return False elif k == 'email': if (not v.search(by.get('email', ''))): return False elif k == 'newer-than': t = now - v if (review['grantedOn'] < t): return False elif k == 'older-than': t = now - v if (review['grantedOn'] >= t): return False elif k == 'type': if review['type'] != v: return False elif k == 'permission': # If permission is read, we've matched. You must have read # to provide a review. if v != 'read': # Admins have implicit write. if v == 'write': if review['permission'] not in ('write', 'admin'): return False elif v == 'admin': if review['permission'] != 'admin': return False return True def matchesReviews(self, change): if self.required_reviews or self.reject_reviews: if not hasattr(change, 'number'): # not a PR, no reviews return FalseWithReason("Change is not a PR") if self.required_reviews and not change.reviews: # No reviews means no matching of required bits # having reject reviews but no reviews on the change is okay return FalseWithReason("Reviews %s does not match %s" % ( self.required_reviews, change.reviews)) return (self.matchesRequiredReviews(change) and self.matchesNoRejectReviews(change)) def matchesRequiredReviews(self, change): for rreview in self.required_reviews: matches_review = False for review in change.reviews: if self._match_review_required_review(rreview, review): # Consider matched if any review matches matches_review = True break if not matches_review: return FalseWithReason( "Required reviews %s does not match %s" % ( self.required_reviews, change.reviews)) return True def matchesNoRejectReviews(self, change): for rreview in self.reject_reviews: for review in change.reviews: if self._match_review_required_review(rreview, review): # A review matched, we can reject right away return FalseWithReason("Reject reviews %s matches %s" % ( self.reject_reviews, change.reviews)) return True def matchesStatuses(self, change): if self.required_statuses or self.reject_statuses: if not hasattr(change, 'number'): # not a PR, no status return FalseWithReason("Can't match statuses without PR") if self.required_statuses and not change.status: return FalseWithReason( "Required statuses %s does not match %s" % ( self.required_statuses, change.status)) required_statuses_results = self.matchesRequiredStatuses(change) if not required_statuses_results: return required_statuses_results return self.matchesNoRejectStatuses(change) def matchesRequiredStatuses(self, change): # statuses are ORed # A PR head can have multiple statuses on it. If the change # statuses and the filter statuses are a null intersection, there # are no matches and we return false if self.required_statuses: for required_status in self.required_statuses: for status in change.status: if re2.fullmatch(required_status, status): return True return FalseWithReason("RequiredStatuses %s does not match %s" % ( self.required_statuses, change.status)) return True def matchesNoRejectStatuses(self, change): # statuses are ANDed # If any of the rejected statusses are present, we return false for rstatus in self.reject_statuses: for status in change.status: if re2.fullmatch(rstatus, status): return FalseWithReason("NoRejectStatuses %s matches %s" % ( self.reject_statuses, change.status)) return True class GithubEventFilter(EventFilter, GithubCommonFilter): def __init__(self, connection_name, trigger, types=[], branches=[], refs=[], comments=[], actions=[], labels=[], unlabels=[], states=[], statuses=[], required_statuses=[], check_runs=[], ignore_deletes=True): EventFilter.__init__(self, connection_name, trigger) GithubCommonFilter.__init__(self, required_statuses=required_statuses) self._types = types self._branches = branches self._refs = refs self._comments = comments self.types = [re.compile(x) for x in types] self.branches = [re.compile(x) for x in branches] self.refs = [re.compile(x) for x in refs] self.comments = [re.compile(x) for x in comments] self.actions = actions self.labels = labels self.unlabels = unlabels self.states = states self.statuses = statuses self.required_statuses = required_statuses self.check_runs = check_runs self.ignore_deletes = ignore_deletes def __repr__(self): ret = '