diff options
-rw-r--r-- | doc/source/developer/model-changelog.rst | 7 | ||||
-rw-r--r-- | tests/base.py | 15 | ||||
-rw-r--r-- | tests/fakegitlab.py | 24 | ||||
-rw-r--r-- | tests/fixtures/layouts/deps-by-topic.yaml | 13 | ||||
-rw-r--r-- | tests/unit/test_circular_dependencies.py | 93 | ||||
-rw-r--r-- | tests/unit/test_client.py | 158 | ||||
-rw-r--r-- | tests/unit/test_github_driver.py | 35 | ||||
-rw-r--r-- | tests/unit/test_gitlab_driver.py | 61 | ||||
-rw-r--r-- | tests/unit/test_model_upgrade.py | 27 | ||||
-rwxr-xr-x | zuul/cmd/client.py | 38 | ||||
-rw-r--r-- | zuul/driver/gerrit/gerritconnection.py | 2 | ||||
-rw-r--r-- | zuul/driver/github/githubconnection.py | 6 | ||||
-rw-r--r-- | zuul/driver/gitlab/gitlabconnection.py | 44 | ||||
-rw-r--r-- | zuul/manager/__init__.py | 71 | ||||
-rw-r--r-- | zuul/model.py | 68 | ||||
-rw-r--r-- | zuul/model_api.py | 2 | ||||
-rw-r--r-- | zuul/scheduler.py | 19 |
17 files changed, 532 insertions, 151 deletions
diff --git a/doc/source/developer/model-changelog.rst b/doc/source/developer/model-changelog.rst index b80979362..f78b3f0a0 100644 --- a/doc/source/developer/model-changelog.rst +++ b/doc/source/developer/model-changelog.rst @@ -112,3 +112,10 @@ Version 12 :Prior Zuul version: 8.0.1 :Description: Adds job_versions and build_versions to BuildSet. Affects schedulers. + +Version 13 +---------- +:Prior Zuul version: 8.2.0 +:Description: Stores only the necessary event info as part of a queue item + instead of the full trigger event. + Affects schedulers. diff --git a/tests/base.py b/tests/base.py index dcd316fab..fd927a92c 100644 --- a/tests/base.py +++ b/tests/base.py @@ -2118,6 +2118,21 @@ class FakeGitlabConnection(gitlabconnection.GitlabConnection): yield self._test_web_server.options['community_edition'] = False + @contextmanager + def enable_delayed_complete_mr(self, complete_at): + self._test_web_server.options['delayed_complete_mr'] = complete_at + yield + self._test_web_server.options['delayed_complete_mr'] = 0 + + @contextmanager + def enable_uncomplete_mr(self): + self._test_web_server.options['uncomplete_mr'] = True + orig = self.gl_client.get_mr_wait_factor + self.gl_client.get_mr_wait_factor = 0.1 + yield + self.gl_client.get_mr_wait_factor = orig + self._test_web_server.options['uncomplete_mr'] = False + class GitlabChangeReference(git.Reference): _common_path_default = "refs/merge-requests" diff --git a/tests/fakegitlab.py b/tests/fakegitlab.py index 294887af0..e4a3e1ac8 100644 --- a/tests/fakegitlab.py +++ b/tests/fakegitlab.py @@ -21,6 +21,7 @@ import re import socketserver import threading import urllib.parse +import time from git.util import IterableList @@ -32,12 +33,17 @@ class GitlabWebServer(object): self.merge_requests = merge_requests self.fake_repos = defaultdict(lambda: IterableList('name')) # A dictionary so we can mutate it - self.options = dict(community_edition=False) + self.options = dict( + community_edition=False, + 
delayed_complete_mr=0, + uncomplete_mr=False) + self.stats = {"get_mr": 0} def start(self): merge_requests = self.merge_requests fake_repos = self.fake_repos options = self.options + stats = self.stats class Server(http.server.SimpleHTTPRequestHandler): log = logging.getLogger("zuul.test.GitlabWebServer") @@ -146,6 +152,7 @@ class GitlabWebServer(object): self.wfile.write(data) def get_mr(self, project, mr): + stats["get_mr"] += 1 mr = self._get_mr(project, mr) data = { 'target_branch': mr.branch, @@ -162,13 +169,20 @@ class GitlabWebServer(object): 'labels': mr.labels, 'merged_at': mr.merged_at.strftime('%Y-%m-%dT%H:%M:%S.%fZ') if mr.merged_at else mr.merged_at, - 'diff_refs': { + 'merge_status': mr.merge_status, + } + if options['delayed_complete_mr'] and \ + time.monotonic() < options['delayed_complete_mr']: + diff_refs = None + elif options['uncomplete_mr']: + diff_refs = None + else: + diff_refs = { 'base_sha': mr.base_sha, 'head_sha': mr.sha, 'start_sha': 'c380d3acebd181f13629a25d2e2acca46ffe1e00' - }, - 'merge_status': mr.merge_status, - } + } + data['diff_refs'] = diff_refs self.send_data(data) def get_mr_approvals(self, project, mr): diff --git a/tests/fixtures/layouts/deps-by-topic.yaml b/tests/fixtures/layouts/deps-by-topic.yaml index 3824c5c2c..e7e8fc465 100644 --- a/tests/fixtures/layouts/deps-by-topic.yaml +++ b/tests/fixtures/layouts/deps-by-topic.yaml @@ -47,24 +47,27 @@ run: playbooks/run.yaml - job: - name: test-job + name: check-job + +- job: + name: gate-job - project: name: org/project1 queue: integrated check: jobs: - - test-job + - check-job gate: jobs: - - test-job + - gate-job - project: name: org/project2 queue: integrated check: jobs: - - test-job + - check-job gate: jobs: - - test-job + - gate-job diff --git a/tests/unit/test_circular_dependencies.py b/tests/unit/test_circular_dependencies.py index f534b2596..a3f9dda33 100644 --- a/tests/unit/test_circular_dependencies.py +++ b/tests/unit/test_circular_dependencies.py @@ -2267,8 +2267,8 
@@ class TestGerritCircularDependencies(ZuulTestCase): self.assertEqual(B.patchsets[-1]["approvals"][0]["value"], "1") self.assertHistory([ - dict(name="test-job", result="SUCCESS", changes="2,1 1,1"), - dict(name="test-job", result="SUCCESS", changes="1,1 2,1"), + dict(name="check-job", result="SUCCESS", changes="2,1 1,1"), + dict(name="check-job", result="SUCCESS", changes="1,1 2,1"), ], ordered=False) A.addPatchset() @@ -2277,10 +2277,10 @@ class TestGerritCircularDependencies(ZuulTestCase): self.assertHistory([ # Original check run - dict(name="test-job", result="SUCCESS", changes="2,1 1,1"), - dict(name="test-job", result="SUCCESS", changes="1,1 2,1"), + dict(name="check-job", result="SUCCESS", changes="2,1 1,1"), + dict(name="check-job", result="SUCCESS", changes="1,1 2,1"), # Second check run - dict(name="test-job", result="SUCCESS", changes="2,1 1,2"), + dict(name="check-job", result="SUCCESS", changes="2,1 1,2"), ], ordered=False) def test_deps_by_topic_multi_tenant(self): @@ -2368,16 +2368,85 @@ class TestGerritCircularDependencies(ZuulTestCase): self.executor_server.release() self.waitUntilSettled() - # A quirk: at the end of this process, the first change in - # Gerrit has a complete run because the process of updating it - # involves a new patchset that is enqueued. Compare to the - # same test in GitHub. self.assertHistory([ dict(name="project-job", result="ABORTED", changes="1,1"), dict(name="project-job", result="ABORTED", changes="1,1 2,1"), + dict(name="project-job", result="SUCCESS", changes="1,2 2,1"), dict(name="project-job", result="SUCCESS", changes="2,1 1,2"), ], ordered=False) + @simple_layout('layouts/deps-by-topic.yaml') + def test_dependency_refresh_by_topic_check(self): + # Test that when two changes are put into a cycle, the + # dependencies are refreshed and items already in pipelines + # are updated. 
+ self.executor_server.hold_jobs_in_build = True + + # This simulates the typical workflow where a developer + # uploads changes one at a time. + # The first change: + A = self.fake_gerrit.addFakeChange('org/project1', "master", "A", + topic='test-topic') + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + + # Now that it has been uploaded, upload the second change + # in the same topic. + B = self.fake_gerrit.addFakeChange('org/project2', "master", "B", + topic='test-topic') + self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + + self.executor_server.hold_jobs_in_build = False + self.executor_server.release() + self.waitUntilSettled() + + self.assertHistory([ + dict(name="check-job", result="ABORTED", changes="1,1"), + dict(name="check-job", result="SUCCESS", changes="2,1 1,1"), + dict(name="check-job", result="SUCCESS", changes="1,1 2,1"), + ], ordered=False) + + @simple_layout('layouts/deps-by-topic.yaml') + def test_dependency_refresh_by_topic_gate(self): + # Test that when two changes are put into a cycle, the + # dependencies are refreshed and items already in pipelines + # are updated. + self.executor_server.hold_jobs_in_build = True + + # This simulates a workflow where a developer adds a change to + # a cycle already in gate. + A = self.fake_gerrit.addFakeChange('org/project1', "master", "A", + topic='test-topic') + B = self.fake_gerrit.addFakeChange('org/project2', "master", "B", + topic='test-topic') + A.addApproval("Code-Review", 2) + B.addApproval("Code-Review", 2) + A.addApproval("Approved", 1) + self.fake_gerrit.addEvent(B.addApproval("Approved", 1)) + self.waitUntilSettled() + + # Add a new change to the cycle. 
+ C = self.fake_gerrit.addFakeChange('org/project1', "master", "C", + topic='test-topic') + self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + + self.executor_server.hold_jobs_in_build = False + self.executor_server.release() + self.waitUntilSettled() + + # At the end of this process, the gate jobs should be aborted + # because the new dpendency showed up. + self.assertEqual(A.data["status"], "NEW") + self.assertEqual(B.data["status"], "NEW") + self.assertEqual(C.data["status"], "NEW") + self.assertHistory([ + dict(name="gate-job", result="ABORTED", changes="1,1 2,1"), + dict(name="gate-job", result="ABORTED", changes="1,1 2,1"), + dict(name="check-job", result="SUCCESS", changes="2,1 1,1 3,1"), + ], ordered=False) + class TestGithubCircularDependencies(ZuulTestCase): config_file = "zuul-gerrit-github.conf" @@ -2607,13 +2676,11 @@ class TestGithubCircularDependencies(ZuulTestCase): self.executor_server.release() self.waitUntilSettled() - # A quirk: at the end of this process, the second PR in GitHub - # has a complete run because the process of updating the first - # change is not disruptive to the second. Compare to the same - # test in Gerrit. 
self.assertHistory([ dict(name="project-job", result="ABORTED", changes=f"{A.number},{A.head_sha}"), dict(name="project-job", result="SUCCESS", changes=f"{A.number},{A.head_sha} {B.number},{B.head_sha}"), + dict(name="project-job", result="SUCCESS", + changes=f"{B.number},{B.head_sha} {A.number},{A.head_sha}"), ], ordered=False) diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index b51639952..f241147eb 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -27,10 +27,11 @@ import jwt import testtools from zuul.zk import ZooKeeperClient +from zuul.zk.locks import SessionAwareLock from zuul.cmd.client import parse_cutoff from tests.base import BaseTestCase, ZuulTestCase -from tests.base import FIXTURE_DIR +from tests.base import FIXTURE_DIR, iterate_timeout from kazoo.exceptions import NoNodeError @@ -362,81 +363,112 @@ class TestOnlineZKOperations(ZuulTestCase): def assertSQLState(self): pass - def test_delete_pipeline_check(self): - self.executor_server.hold_jobs_in_build = True - A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') - self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) - self.waitUntilSettled() - - config_file = os.path.join(self.test_root, 'zuul.conf') - with open(config_file, 'w') as f: - self.config.write(f) - - # Make sure the pipeline exists - self.getZKTree('/zuul/tenant/tenant-one/pipeline/check/item') - p = subprocess.Popen( - [os.path.join(sys.prefix, 'bin/zuul-admin'), - '-c', config_file, - 'delete-pipeline-state', - 'tenant-one', 'check', - ], - stdout=subprocess.PIPE) - out, _ = p.communicate() - self.log.debug(out.decode('utf8')) - # Make sure it's deleted - with testtools.ExpectedException(NoNodeError): - self.getZKTree('/zuul/tenant/tenant-one/pipeline/check/item') - - self.executor_server.hold_jobs_in_build = False - self.executor_server.release() - B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') - self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1)) + def 
_test_delete_pipeline(self, pipeline): + sched = self.scheds.first.sched + tenant = sched.abide.tenants['tenant-one'] + # Force a reconfiguration due to a config change (so that the + # tenant trigger event queue gets a minimum timestamp set) + file_dict = {'zuul.yaml': ''} + M = self.fake_gerrit.addFakeChange('org/project', 'master', 'A', + files=file_dict) + M.setMerged() + self.fake_gerrit.addEvent(M.getChangeMergedEvent()) self.waitUntilSettled() - self.assertHistory([ - dict(name='project-merge', result='SUCCESS', changes='1,1'), - dict(name='project-merge', result='SUCCESS', changes='2,1'), - dict(name='project-test1', result='SUCCESS', changes='2,1'), - dict(name='project-test2', result='SUCCESS', changes='2,1'), - ], ordered=False) - def test_delete_pipeline_gate(self): self.executor_server.hold_jobs_in_build = True A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') - A.addApproval('Code-Review', 2) - self.fake_gerrit.addEvent(A.addApproval('Approved', 1)) + if pipeline == 'check': + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + else: + A.addApproval('Code-Review', 2) + self.fake_gerrit.addEvent(A.addApproval('Approved', 1)) self.waitUntilSettled() - config_file = os.path.join(self.test_root, 'zuul.conf') - with open(config_file, 'w') as f: - self.config.write(f) - - # Make sure the pipeline exists - self.getZKTree('/zuul/tenant/tenant-one/pipeline/gate/item') - p = subprocess.Popen( - [os.path.join(sys.prefix, 'bin/zuul-admin'), - '-c', config_file, - 'delete-pipeline-state', - 'tenant-one', 'gate', - ], - stdout=subprocess.PIPE) - out, _ = p.communicate() - self.log.debug(out.decode('utf8')) - # Make sure it's deleted - with testtools.ExpectedException(NoNodeError): - self.getZKTree('/zuul/tenant/tenant-one/pipeline/gate/item') + # Lock the check pipeline so we don't process the event we're + # about to submit (it should go into the pipeline trigger event + # queue and stay there while we delete the pipeline state). 
+ # This way we verify that events arrived before the deletion + # still work. + plock = SessionAwareLock( + self.zk_client.client, + f"/zuul/locks/pipeline/{tenant.name}/{pipeline}") + plock.acquire(blocking=True, timeout=None) + try: + self.log.debug('Got pipeline lock') + # Add a new event while our old last reconfigure time is + # in place. + B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') + if pipeline == 'check': + self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1)) + else: + B.addApproval('Code-Review', 2) + self.fake_gerrit.addEvent(B.addApproval('Approved', 1)) + + # Wait until it appears in the pipeline trigger event queue + self.log.debug('Waiting for event') + for x in iterate_timeout(30, 'trigger event queue has events'): + if sched.pipeline_trigger_events[ + tenant.name][pipeline].hasEvents(): + break + self.log.debug('Got event') + except Exception: + plock.release() + raise + # Grab the run handler lock so that we will continue to avoid + # further processing of the event after we release the + # pipeline lock (which the delete command needs to acquire). 
+ sched.run_handler_lock.acquire() + try: + plock.release() + self.log.debug('Got run lock') + config_file = os.path.join(self.test_root, 'zuul.conf') + with open(config_file, 'w') as f: + self.config.write(f) + + # Make sure the pipeline exists + self.getZKTree( + f'/zuul/tenant/{tenant.name}/pipeline/{pipeline}/item') + # Save the old layout uuid + tenant = sched.abide.tenants[tenant.name] + old_layout_uuid = tenant.layout.uuid + self.log.debug('Deleting pipeline state') + p = subprocess.Popen( + [os.path.join(sys.prefix, 'bin/zuul-admin'), + '-c', config_file, + 'delete-pipeline-state', + tenant.name, pipeline, + ], + stdout=subprocess.PIPE) + # Delete the pipeline state + out, _ = p.communicate() + self.log.debug(out.decode('utf8')) + # Make sure it's deleted + with testtools.ExpectedException(NoNodeError): + self.getZKTree( + f'/zuul/tenant/{tenant.name}/pipeline/{pipeline}/item') + finally: + sched.run_handler_lock.release() self.executor_server.hold_jobs_in_build = False self.executor_server.release() - B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') - B.addApproval('Code-Review', 2) - self.fake_gerrit.addEvent(B.addApproval('Approved', 1)) self.waitUntilSettled() self.assertHistory([ - dict(name='project-merge', result='SUCCESS', changes='1,1'), dict(name='project-merge', result='SUCCESS', changes='2,1'), - dict(name='project-test1', result='SUCCESS', changes='2,1'), - dict(name='project-test2', result='SUCCESS', changes='2,1'), + dict(name='project-merge', result='SUCCESS', changes='3,1'), + dict(name='project-test1', result='SUCCESS', changes='3,1'), + dict(name='project-test2', result='SUCCESS', changes='3,1'), ], ordered=False) + tenant = sched.abide.tenants[tenant.name] + new_layout_uuid = tenant.layout.uuid + self.assertEqual(old_layout_uuid, new_layout_uuid) + self.assertEqual(tenant.layout.pipelines[pipeline].state.layout_uuid, + old_layout_uuid) + + def test_delete_pipeline_check(self): + self._test_delete_pipeline('check') + + def 
test_delete_pipeline_gate(self): + self._test_delete_pipeline('gate') class TestDBPruneParse(BaseTestCase): diff --git a/tests/unit/test_github_driver.py b/tests/unit/test_github_driver.py index fb46aa7d1..47e84ca7f 100644 --- a/tests/unit/test_github_driver.py +++ b/tests/unit/test_github_driver.py @@ -1741,6 +1741,41 @@ class TestGithubUnprotectedBranches(ZuulTestCase): # branch self.assertLess(old, new) + def test_base_branch_updated(self): + self.create_branch('org/project2', 'feature') + github = self.fake_github.getGithubClient() + repo = github.repo_from_project('org/project2') + repo._set_branch_protection('master', True) + + # Make sure Zuul picked up and cached the configured branches + self.scheds.execute(lambda app: app.sched.reconfigure(app.config)) + self.waitUntilSettled() + + github_connection = self.scheds.first.connections.connections['github'] + tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') + project = github_connection.source.getProject('org/project2') + + # Verify that only the master branch is considered protected + branches = github_connection.getProjectBranches(project, tenant) + self.assertEqual(branches, ["master"]) + + A = self.fake_github.openFakePullRequest('org/project2', 'master', + 'A') + # Fake an event from a pull-request that changed the base + # branch from "feature" to "master". The PR is already + # using "master" as base, but the event still references + # the old "feature" branch. + event = A.getPullRequestOpenedEvent() + event[1]["pull_request"]["base"]["ref"] = "feature" + + self.fake_github.emitEvent(event) + self.waitUntilSettled() + + # Make sure we are still only considering "master" to be + # protected. + branches = github_connection.getProjectBranches(project, tenant) + self.assertEqual(branches, ["master"]) + # This test verifies that a PR is considered in case it was created for # a branch just has been set to protected before a tenant reconfiguration # took place. 
diff --git a/tests/unit/test_gitlab_driver.py b/tests/unit/test_gitlab_driver.py index 6c4d4eeb9..e04a3b5d6 100644 --- a/tests/unit/test_gitlab_driver.py +++ b/tests/unit/test_gitlab_driver.py @@ -126,6 +126,27 @@ class TestGitlabDriver(ZuulTestCase): self.assertTrue(A.approved) @simple_layout('layouts/basic-gitlab.yaml', driver='gitlab') + def test_merge_request_opened_imcomplete(self): + + now = time.monotonic() + complete_at = now + 3 + with self.fake_gitlab.enable_delayed_complete_mr(complete_at): + description = "This is the\nMR description." + A = self.fake_gitlab.openFakeMergeRequest( + 'org/project', 'master', 'A', description=description) + self.fake_gitlab.emitEvent( + A.getMergeRequestOpenedEvent(), project='org/project') + self.waitUntilSettled() + + self.assertEqual('SUCCESS', + self.getJobFromHistory('project-test1').result) + + self.assertEqual('SUCCESS', + self.getJobFromHistory('project-test2').result) + + self.assertTrue(self.fake_gitlab._test_web_server.stats["get_mr"] > 2) + + @simple_layout('layouts/basic-gitlab.yaml', driver='gitlab') def test_merge_request_updated(self): A = self.fake_gitlab.openFakeMergeRequest('org/project', 'master', 'A') @@ -407,7 +428,7 @@ class TestGitlabDriver(ZuulTestCase): @simple_layout('layouts/basic-gitlab.yaml', driver='gitlab') def test_pull_request_with_dyn_reconf(self): - + path = os.path.join(self.upstream_root, 'org/project') zuul_yaml = [ {'job': { 'name': 'project-test3', @@ -424,11 +445,13 @@ class TestGitlabDriver(ZuulTestCase): playbook = "- hosts: all\n tasks: []" A = self.fake_gitlab.openFakeMergeRequest( - 'org/project', 'master', 'A') + 'org/project', 'master', 'A', + base_sha=git.Repo(path).head.object.hexsha) A.addCommit( {'.zuul.yaml': yaml.dump(zuul_yaml), 'job.yaml': playbook} ) + A.addCommit({"dummy.file": ""}) self.fake_gitlab.emitEvent(A.getMergeRequestOpenedEvent()) self.waitUntilSettled() @@ -440,6 +463,40 @@ class TestGitlabDriver(ZuulTestCase): 
self.getJobFromHistory('project-test3').result) @simple_layout('layouts/basic-gitlab.yaml', driver='gitlab') + def test_pull_request_with_dyn_reconf_alt(self): + with self.fake_gitlab.enable_uncomplete_mr(): + zuul_yaml = [ + {'job': { + 'name': 'project-test3', + 'run': 'job.yaml' + }}, + {'project': { + 'check': { + 'jobs': [ + 'project-test3' + ] + } + }} + ] + playbook = "- hosts: all\n tasks: []" + A = self.fake_gitlab.openFakeMergeRequest( + 'org/project', 'master', 'A') + A.addCommit( + {'.zuul.yaml': yaml.dump(zuul_yaml), + 'job.yaml': playbook} + ) + A.addCommit({"dummy.file": ""}) + self.fake_gitlab.emitEvent(A.getMergeRequestOpenedEvent()) + self.waitUntilSettled() + + self.assertEqual('SUCCESS', + self.getJobFromHistory('project-test1').result) + self.assertEqual('SUCCESS', + self.getJobFromHistory('project-test2').result) + self.assertEqual('SUCCESS', + self.getJobFromHistory('project-test3').result) + + @simple_layout('layouts/basic-gitlab.yaml', driver='gitlab') def test_ref_updated_and_tenant_reconfigure(self): self.waitUntilSettled() diff --git a/tests/unit/test_model_upgrade.py b/tests/unit/test_model_upgrade.py index a5a49bed4..c6cdee7ea 100644 --- a/tests/unit/test_model_upgrade.py +++ b/tests/unit/test_model_upgrade.py @@ -293,6 +293,33 @@ class TestModelUpgrade(ZuulTestCase): result='SUCCESS', changes='1,1'), ], ordered=False) + @model_version(12) + def test_model_12_13(self): + # Initially queue items will still have the full trigger event + # stored in Zookeeper. The trigger event will be converted to + # an event info object after the model API update. 
+ self.executor_server.hold_jobs_in_build = True + A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + + self.assertEqual(len(self.builds), 1) + + # Upgrade our component + self.model_test_component_info.model_api = 13 + + self.executor_server.hold_jobs_in_build = False + self.executor_server.release() + self.waitUntilSettled() + + self.assertHistory([ + dict(name='project-merge', result='SUCCESS', changes='1,1'), + dict(name='project-test1', result='SUCCESS', changes='1,1'), + dict(name='project-test2', result='SUCCESS', changes='1,1'), + dict(name='project1-project2-integration', + result='SUCCESS', changes='1,1'), + ], ordered=False) + class TestGithubModelUpgrade(ZuulTestCase): config_file = 'zuul-github-driver.conf' diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py index 031b10a1e..62e51ac3f 100755 --- a/zuul/cmd/client.py +++ b/zuul/cmd/client.py @@ -30,16 +30,14 @@ import time import textwrap import requests import urllib.parse -from uuid import uuid4 import zuul.cmd from zuul.lib.config import get_default -from zuul.model import SystemAttributes, PipelineState +from zuul.model import SystemAttributes, PipelineState, PipelineChangeList from zuul.zk import ZooKeeperClient from zuul.lib.keystorage import KeyStorage -from zuul.zk.locks import tenant_write_lock +from zuul.zk.locks import tenant_read_lock, pipeline_lock from zuul.zk.zkobject import ZKContext -from zuul.zk.layout import LayoutState, LayoutStateStore from zuul.zk.components import COMPONENT_REGISTRY @@ -1029,28 +1027,18 @@ class Client(zuul.cmd.ZuulApp): safe_tenant = urllib.parse.quote_plus(args.tenant) safe_pipeline = urllib.parse.quote_plus(args.pipeline) COMPONENT_REGISTRY.create(zk_client) - with tenant_write_lock(zk_client, args.tenant) as lock: + self.log.info('get tenant') + with tenant_read_lock(zk_client, args.tenant): path = f'/zuul/tenant/{safe_tenant}/pipeline/{safe_pipeline}' - 
layout_uuid = None - zk_client.client.delete( - f'/zuul/tenant/{safe_tenant}/pipeline/{safe_pipeline}', - recursive=True) - with ZKContext(zk_client, lock, None, self.log) as context: - ps = PipelineState.new(context, _path=path, - layout_uuid=layout_uuid) - # Force everyone to make a new layout for this tenant in - # order to rebuild the shared change queues. - layout_state = LayoutState( - tenant_name=args.tenant, - hostname='admin command', - last_reconfigured=int(time.time()), - last_reconfigure_event_ltime=-1, - uuid=uuid4().hex, - branch_cache_min_ltimes={}, - ltime=ps._zstat.last_modified_transaction_id, - ) - tenant_layout_state = LayoutStateStore(zk_client, lambda: None) - tenant_layout_state[args.tenant] = layout_state + self.log.info('get pipe') + with pipeline_lock( + zk_client, args.tenant, args.pipeline + ) as plock: + self.log.info('got locks') + zk_client.client.delete(path, recursive=True) + with ZKContext(zk_client, plock, None, self.log) as context: + PipelineState.new(context, _path=path, layout_uuid=None) + PipelineChangeList.new(context) sys.exit(0) diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py index 276365e1d..f871671aa 100644 --- a/zuul/driver/gerrit/gerritconnection.py +++ b/zuul/driver/gerrit/gerritconnection.py @@ -161,6 +161,8 @@ class GerritEventConnector(threading.Thread): IGNORED_EVENTS = ( 'cache-eviction', # evict-cache plugin + 'fetch-ref-replicated', + 'fetch-ref-replication-scheduled', 'ref-replicated', 'ref-replication-scheduled', 'ref-replication-done' diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py index 182c83bae..cffbd6769 100644 --- a/zuul/driver/github/githubconnection.py +++ b/zuul/driver/github/githubconnection.py @@ -439,7 +439,11 @@ class GithubEventProcessor(object): # branch is now protected. 
if hasattr(event, "branch") and event.branch: protected = None - if change: + # Only use the `branch_protected` flag if the + # target branch of change and event are the same. + # The base branch could have changed in the + # meantime. + if change and change.branch == event.branch: # PR based events already have the information if the # target branch is protected so take the information # from there. diff --git a/zuul/driver/gitlab/gitlabconnection.py b/zuul/driver/gitlab/gitlabconnection.py index 1515db8df..da423f085 100644 --- a/zuul/driver/gitlab/gitlabconnection.py +++ b/zuul/driver/gitlab/gitlabconnection.py @@ -281,6 +281,7 @@ class GitlabAPIClient(): self.api_token = api_token self.keepalive = keepalive self.disable_pool = disable_pool + self.get_mr_wait_factor = 2 self.headers = {'Authorization': 'Bearer %s' % ( self.api_token)} @@ -342,11 +343,36 @@ class GitlabAPIClient(): # https://docs.gitlab.com/ee/api/merge_requests.html#get-single-mr def get_mr(self, project_name, number, zuul_event_id=None): - path = "/projects/%s/merge_requests/%s" % ( - quote_plus(project_name), number) - resp = self.get(self.baseurl + path, zuul_event_id=zuul_event_id) - self._manage_error(*resp, zuul_event_id=zuul_event_id) - return resp[0] + log = get_annotated_logger(self.log, zuul_event_id) + attempts = 0 + + def _get_mr(): + path = "/projects/%s/merge_requests/%s" % ( + quote_plus(project_name), number) + resp = self.get(self.baseurl + path, zuul_event_id=zuul_event_id) + self._manage_error(*resp, zuul_event_id=zuul_event_id) + return resp[0] + + # The Gitlab API might not return a complete MR description as + # some attributes are updated asynchronously. This loop ensures + # we query the API until all async attributes are available or until + # a defined delay is reached. 
+ while True: + attempts += 1 + mr = _get_mr() + # The diff_refs attribute is updated asynchronously + if all(map(lambda k: mr.get(k, None), ['diff_refs'])): + return mr + if attempts > 4: + log.warning( + "Fetched MR %s#%s with imcomplete data" % ( + project_name, number)) + return mr + wait_delay = attempts * self.get_mr_wait_factor + log.info( + "Will retry to fetch %s#%s due to imcomplete data " + "(in %s seconds) ..." % (project_name, number, wait_delay)) + time.sleep(wait_delay) # https://docs.gitlab.com/ee/api/branches.html#list-repository-branches def get_project_branches(self, project_name, exclude_unprotected, @@ -667,8 +693,12 @@ class GitlabConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection): change.ref = "refs/merge-requests/%s/head" % change.number change.branch = change.mr['target_branch'] change.is_current_patchset = (change.mr['sha'] == change.patchset) - change.base_sha = change.mr['diff_refs'].get('base_sha') - change.commit_id = change.mr['diff_refs'].get('head_sha') + change.commit_id = event.patch_number + diff_refs = change.mr.get("diff_refs", {}) + if diff_refs: + change.base_sha = diff_refs.get('base_sha') + else: + change.base_sha = None change.owner = change.mr['author'].get('username') # Files changes are not part of the Merge Request data # See api/merge_requests.html#get-single-mr-changes diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index d7a95e9b5..c3d082a47 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -243,7 +243,7 @@ class PipelineManager(metaclass=ABCMeta): and self.useDependenciesByTopic(change.project)) if (update_commit_dependencies or update_topic_dependencies): - self.updateCommitDependencies(change, None, event=None) + self.updateCommitDependencies(change, event=None) self._change_cache[change.cache_key] = change resolved_changes.append(change) return resolved_changes @@ -285,11 +285,18 @@ class PipelineManager(metaclass=ABCMeta): return True return False - def 
isAnyVersionOfChangeInPipeline(self, change): - # Checks any items in the pipeline + def isChangeRelevantToPipeline(self, change): + # Checks if any version of the change or its deps matches any + # item in the pipeline. for change_key in self.pipeline.change_list.getChangeKeys(): if change.cache_stat.key.isSameChange(change_key): return True + if isinstance(change, model.Change): + for dep_change_ref in change.getNeedsChanges( + self.useDependenciesByTopic(change.project)): + dep_change_key = ChangeKey.fromReference(dep_change_ref) + if change.cache_stat.key.isSameChange(dep_change_key): + return True return False def isChangeAlreadyInQueue(self, change, change_queue): @@ -315,7 +322,7 @@ class PipelineManager(metaclass=ABCMeta): to_refresh.add(item.change) for existing_change in to_refresh: - self.updateCommitDependencies(existing_change, None, event) + self.updateCommitDependencies(existing_change, event) def reportEnqueue(self, item): if not self.pipeline.state.disabled: @@ -516,7 +523,8 @@ class PipelineManager(metaclass=ABCMeta): def addChange(self, change, event, quiet=False, enqueue_time=None, ignore_requirements=False, live=True, - change_queue=None, history=None, dependency_graph=None): + change_queue=None, history=None, dependency_graph=None, + skip_presence_check=False): log = get_annotated_logger(self.log, event) log.debug("Considering adding change %s" % change) @@ -531,7 +539,9 @@ class PipelineManager(metaclass=ABCMeta): # If we are adding a live change, check if it's a live item # anywhere in the pipeline. Otherwise, we will perform the # duplicate check below on the specific change_queue. - if live and self.isChangeAlreadyInPipeline(change): + if (live and + self.isChangeAlreadyInPipeline(change) and + not skip_presence_check): log.debug("Change %s is already in pipeline, ignoring" % change) return True @@ -564,7 +574,7 @@ class PipelineManager(metaclass=ABCMeta): # to date and this is a noop; otherwise, we need to refresh # them anyway. 
if isinstance(change, model.Change): - self.updateCommitDependencies(change, None, event) + self.updateCommitDependencies(change, event) with self.getChangeQueue(change, event, change_queue) as change_queue: if not change_queue: @@ -590,8 +600,10 @@ class PipelineManager(metaclass=ABCMeta): log.debug("History after enqueuing changes ahead: %s", history) if self.isChangeAlreadyInQueue(change, change_queue): - log.debug("Change %s is already in queue, ignoring" % change) - return True + if not skip_presence_check: + log.debug("Change %s is already in queue, ignoring", + change) + return True cycle = [] if isinstance(change, model.Change): @@ -625,7 +637,7 @@ class PipelineManager(metaclass=ABCMeta): if enqueue_time: item.enqueue_time = enqueue_time item.live = live - self.reportStats(item, added=True) + self.reportStats(item, trigger_event=event) item.quiet = quiet if item.live: @@ -857,7 +869,7 @@ class PipelineManager(metaclass=ABCMeta): self.pipeline.tenant.name][other_pipeline.name].put( event, needs_result=False) - def updateCommitDependencies(self, change, change_queue, event): + def updateCommitDependencies(self, change, event): log = get_annotated_logger(self.log, event) must_update_commit_deps = ( @@ -1555,6 +1567,7 @@ class PipelineManager(metaclass=ABCMeta): log.info("Dequeuing change %s because " "it can no longer merge" % item.change) self.cancelJobs(item) + quiet_dequeue = False if item.isBundleFailing(): item.setDequeuedBundleFailing('Bundle is failing') elif not meets_reqs: @@ -1566,7 +1579,28 @@ class PipelineManager(metaclass=ABCMeta): else: msg = f'Change {clist} is needed.' item.setDequeuedNeedingChange(msg) - if item.live: + # If all the dependencies are already in the pipeline + # (but not ahead of this change), then we probably + # just added updated versions of them, possibly + # updating a cycle. In that case, attempt to + # re-enqueue this change with the updated deps. 
+ if (item.live and + all([self.isChangeAlreadyInPipeline(c) + for c in needs_changes])): + # Try enqueue, if that succeeds, keep this dequeue quiet + try: + log.info("Attempting re-enqueue of change %s", + item.change) + quiet_dequeue = self.addChange( + item.change, item.event, + enqueue_time=item.enqueue_time, + quiet=True, + skip_presence_check=True) + except Exception: + log.exception("Unable to re-enqueue change %s " + "which is missing dependencies", + item.change) + if item.live and not quiet_dequeue: try: self.reportItem(item) except exceptions.MergeFailure: @@ -2198,7 +2232,7 @@ class PipelineManager(metaclass=ABCMeta): log.error("Reporting item %s received: %s", item, ret) return action, (not ret) - def reportStats(self, item, added=False): + def reportStats(self, item, trigger_event=None): if not self.sched.statsd: return try: @@ -2237,18 +2271,21 @@ class PipelineManager(metaclass=ABCMeta): if dt: self.sched.statsd.timing(key + '.resident_time', dt) self.sched.statsd.incr(key + '.total_changes') - if added and hasattr(item.event, 'arrived_at_scheduler_timestamp'): + if ( + trigger_event + and hasattr(trigger_event, 'arrived_at_scheduler_timestamp') + ): now = time.time() - arrived = item.event.arrived_at_scheduler_timestamp + arrived = trigger_event.arrived_at_scheduler_timestamp processing = (now - arrived) * 1000 - elapsed = (now - item.event.timestamp) * 1000 + elapsed = (now - trigger_event.timestamp) * 1000 self.sched.statsd.timing( basekey + '.event_enqueue_processing_time', processing) self.sched.statsd.timing( basekey + '.event_enqueue_time', elapsed) self.reportPipelineTiming('event_enqueue_time', - item.event.timestamp) + trigger_event.timestamp) except Exception: self.log.exception("Exception reporting pipeline stats") diff --git a/zuul/model.py b/zuul/model.py index 1d82b5f2c..e526b749c 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -4666,6 +4666,37 @@ class BuildSet(zkobject.ZKObject): return Attributes(uuid=self.uuid) +class EventInfo: 
+
+    def __init__(self):
+        self.zuul_event_id = None
+        self.timestamp = time.time()
+        self.span_context = None
+
+    @classmethod
+    def fromEvent(cls, event):
+        tinfo = cls()
+        tinfo.zuul_event_id = event.zuul_event_id
+        tinfo.timestamp = event.timestamp
+        tinfo.span_context = event.span_context
+        return tinfo
+
+    @classmethod
+    def fromDict(cls, d):
+        tinfo = cls()
+        tinfo.zuul_event_id = d["zuul_event_id"]
+        tinfo.timestamp = d["timestamp"]
+        tinfo.span_context = d["span_context"]
+        return tinfo
+
+    def toDict(self):
+        return {
+            "zuul_event_id": self.zuul_event_id,
+            "timestamp": self.timestamp,
+            "span_context": self.span_context,
+        }
+
+
 class QueueItem(zkobject.ZKObject):
 
     """Represents the position of a Change in a ChangeQueue.
@@ -4700,7 +4731,7 @@ class QueueItem(zkobject.ZKObject):
         live=True,  # Whether an item is intended to be processed at all
         layout_uuid=None,
         _cached_sql_results={},
-        event=None,  # The trigger event that lead to this queue item
+        event=None,  # Info about the event that led to this queue item
 
         # Additional container for connection specifig information to be
         # used by reporters throughout the lifecycle
@@ -4722,6 +4753,9 @@ class QueueItem(zkobject.ZKObject):
     def new(klass, context, **kw):
         obj = klass()
         obj._set(**kw)
+        if COMPONENT_REGISTRY.model_api >= 13:
+            obj._set(event=obj.event and EventInfo.fromEvent(obj.event))
+
         data = obj._trySerialize(context)
         obj._save(context, data, create=True)
         files_state = (BuildSet.COMPLETE if obj.change.files is not None
@@ -4750,10 +4784,18 @@ class QueueItem(zkobject.ZKObject):
         return (tenant, pipeline, uuid)
 
     def serialize(self, context):
-        if isinstance(self.event, TriggerEvent):
-            event_type = "TriggerEvent"
+        if COMPONENT_REGISTRY.model_api < 13:
+            if isinstance(self.event, TriggerEvent):
+                event_type = "TriggerEvent"
+            else:
+                event_type = self.event.__class__.__name__
         else:
-            event_type = self.event.__class__.__name__
+            event_type = "EventInfo"
+            if not isinstance(self.event, EventInfo):
+                # Convert 
our local trigger event to a trigger info + # object. This will only happen on the transition to + # model API version 13. + self._set(event=EventInfo.fromEvent(self.event)) data = { "uuid": self.uuid, @@ -4795,14 +4837,18 @@ class QueueItem(zkobject.ZKObject): # child objects. self._set(uuid=data["uuid"]) - event_type = data["event"]["type"] - if event_type == "TriggerEvent": - event_class = ( - self.pipeline.manager.sched.connections.getTriggerEventClass( - data["event"]["data"]["driver_name"]) - ) + if COMPONENT_REGISTRY.model_api < 13: + event_type = data["event"]["type"] + if event_type == "TriggerEvent": + event_class = ( + self.pipeline.manager.sched.connections + .getTriggerEventClass( + data["event"]["data"]["driver_name"]) + ) + else: + event_class = EventTypeIndex.event_type_mapping.get(event_type) else: - event_class = EventTypeIndex.event_type_mapping.get(event_type) + event_class = EventInfo if event_class is None: raise NotImplementedError( diff --git a/zuul/model_api.py b/zuul/model_api.py index ccb12077d..0244296dd 100644 --- a/zuul/model_api.py +++ b/zuul/model_api.py @@ -14,4 +14,4 @@ # When making ZK schema changes, increment this and add a record to # doc/source/developer/model-changelog.rst -MODEL_API = 12 +MODEL_API = 13 diff --git a/zuul/scheduler.py b/zuul/scheduler.py index e646c09b8..cd15a878c 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -2519,9 +2519,26 @@ class Scheduler(threading.Thread): event.span_context = tracing.getSpanContext(span) for pipeline in tenant.layout.pipelines.values(): + # For most kinds of dependencies, it's sufficient to check + # if this change is already in the pipeline, because the + # only way to update a dependency cycle is to update one + # of the changes in it. However, dependencies-by-topic + # can have changes added to the cycle without updating any + # of the existing changes in the cycle. 
That means in + # order to detect whether a new change is added to an + # existing cycle in the pipeline, we need to know all of + # the dependencies of the new change, and check if *they* + # are in the pipeline. Therefore, go ahead and update our + # dependencies here so they are available for comparison + # against the pipeline contents. This front-loads some + # work that otherwise would happen in the pipeline + # manager, but the result of the work goes into the change + # cache, so it's not wasted; it's just less parallelized. + if isinstance(change, Change): + pipeline.manager.updateCommitDependencies(change, event) if ( pipeline.manager.eventMatches(event, change) - or pipeline.manager.isAnyVersionOfChangeInPipeline(change) + or pipeline.manager.isChangeRelevantToPipeline(change) ): self.pipeline_trigger_events[tenant.name][ pipeline.name |