summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/source/developer/model-changelog.rst7
-rw-r--r--tests/base.py15
-rw-r--r--tests/fakegitlab.py24
-rw-r--r--tests/fixtures/layouts/deps-by-topic.yaml13
-rw-r--r--tests/unit/test_circular_dependencies.py93
-rw-r--r--tests/unit/test_client.py158
-rw-r--r--tests/unit/test_github_driver.py35
-rw-r--r--tests/unit/test_gitlab_driver.py61
-rw-r--r--tests/unit/test_model_upgrade.py27
-rwxr-xr-xzuul/cmd/client.py38
-rw-r--r--zuul/driver/gerrit/gerritconnection.py2
-rw-r--r--zuul/driver/github/githubconnection.py6
-rw-r--r--zuul/driver/gitlab/gitlabconnection.py44
-rw-r--r--zuul/manager/__init__.py71
-rw-r--r--zuul/model.py68
-rw-r--r--zuul/model_api.py2
-rw-r--r--zuul/scheduler.py19
17 files changed, 532 insertions, 151 deletions
diff --git a/doc/source/developer/model-changelog.rst b/doc/source/developer/model-changelog.rst
index b80979362..f78b3f0a0 100644
--- a/doc/source/developer/model-changelog.rst
+++ b/doc/source/developer/model-changelog.rst
@@ -112,3 +112,10 @@ Version 12
:Prior Zuul version: 8.0.1
:Description: Adds job_versions and build_versions to BuildSet.
Affects schedulers.
+
+Version 13
+----------
+:Prior Zuul version: 8.2.0
+:Description: Stores only the necessary event info as part of a queue item
+ instead of the full trigger event.
+ Affects schedulers.
diff --git a/tests/base.py b/tests/base.py
index dcd316fab..fd927a92c 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -2118,6 +2118,21 @@ class FakeGitlabConnection(gitlabconnection.GitlabConnection):
yield
self._test_web_server.options['community_edition'] = False
+ @contextmanager
+ def enable_delayed_complete_mr(self, complete_at):
+ self._test_web_server.options['delayed_complete_mr'] = complete_at
+ yield
+ self._test_web_server.options['delayed_complete_mr'] = 0
+
+ @contextmanager
+ def enable_uncomplete_mr(self):
+ self._test_web_server.options['uncomplete_mr'] = True
+ orig = self.gl_client.get_mr_wait_factor
+ self.gl_client.get_mr_wait_factor = 0.1
+ yield
+ self.gl_client.get_mr_wait_factor = orig
+ self._test_web_server.options['uncomplete_mr'] = False
+
class GitlabChangeReference(git.Reference):
_common_path_default = "refs/merge-requests"
diff --git a/tests/fakegitlab.py b/tests/fakegitlab.py
index 294887af0..e4a3e1ac8 100644
--- a/tests/fakegitlab.py
+++ b/tests/fakegitlab.py
@@ -21,6 +21,7 @@ import re
import socketserver
import threading
import urllib.parse
+import time
from git.util import IterableList
@@ -32,12 +33,17 @@ class GitlabWebServer(object):
self.merge_requests = merge_requests
self.fake_repos = defaultdict(lambda: IterableList('name'))
# A dictionary so we can mutate it
- self.options = dict(community_edition=False)
+ self.options = dict(
+ community_edition=False,
+ delayed_complete_mr=0,
+ uncomplete_mr=False)
+ self.stats = {"get_mr": 0}
def start(self):
merge_requests = self.merge_requests
fake_repos = self.fake_repos
options = self.options
+ stats = self.stats
class Server(http.server.SimpleHTTPRequestHandler):
log = logging.getLogger("zuul.test.GitlabWebServer")
@@ -146,6 +152,7 @@ class GitlabWebServer(object):
self.wfile.write(data)
def get_mr(self, project, mr):
+ stats["get_mr"] += 1
mr = self._get_mr(project, mr)
data = {
'target_branch': mr.branch,
@@ -162,13 +169,20 @@ class GitlabWebServer(object):
'labels': mr.labels,
'merged_at': mr.merged_at.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
if mr.merged_at else mr.merged_at,
- 'diff_refs': {
+ 'merge_status': mr.merge_status,
+ }
+ if options['delayed_complete_mr'] and \
+ time.monotonic() < options['delayed_complete_mr']:
+ diff_refs = None
+ elif options['uncomplete_mr']:
+ diff_refs = None
+ else:
+ diff_refs = {
'base_sha': mr.base_sha,
'head_sha': mr.sha,
'start_sha': 'c380d3acebd181f13629a25d2e2acca46ffe1e00'
- },
- 'merge_status': mr.merge_status,
- }
+ }
+ data['diff_refs'] = diff_refs
self.send_data(data)
def get_mr_approvals(self, project, mr):
diff --git a/tests/fixtures/layouts/deps-by-topic.yaml b/tests/fixtures/layouts/deps-by-topic.yaml
index 3824c5c2c..e7e8fc465 100644
--- a/tests/fixtures/layouts/deps-by-topic.yaml
+++ b/tests/fixtures/layouts/deps-by-topic.yaml
@@ -47,24 +47,27 @@
run: playbooks/run.yaml
- job:
- name: test-job
+ name: check-job
+
+- job:
+ name: gate-job
- project:
name: org/project1
queue: integrated
check:
jobs:
- - test-job
+ - check-job
gate:
jobs:
- - test-job
+ - gate-job
- project:
name: org/project2
queue: integrated
check:
jobs:
- - test-job
+ - check-job
gate:
jobs:
- - test-job
+ - gate-job
diff --git a/tests/unit/test_circular_dependencies.py b/tests/unit/test_circular_dependencies.py
index f534b2596..a3f9dda33 100644
--- a/tests/unit/test_circular_dependencies.py
+++ b/tests/unit/test_circular_dependencies.py
@@ -2267,8 +2267,8 @@ class TestGerritCircularDependencies(ZuulTestCase):
self.assertEqual(B.patchsets[-1]["approvals"][0]["value"], "1")
self.assertHistory([
- dict(name="test-job", result="SUCCESS", changes="2,1 1,1"),
- dict(name="test-job", result="SUCCESS", changes="1,1 2,1"),
+ dict(name="check-job", result="SUCCESS", changes="2,1 1,1"),
+ dict(name="check-job", result="SUCCESS", changes="1,1 2,1"),
], ordered=False)
A.addPatchset()
@@ -2277,10 +2277,10 @@ class TestGerritCircularDependencies(ZuulTestCase):
self.assertHistory([
# Original check run
- dict(name="test-job", result="SUCCESS", changes="2,1 1,1"),
- dict(name="test-job", result="SUCCESS", changes="1,1 2,1"),
+ dict(name="check-job", result="SUCCESS", changes="2,1 1,1"),
+ dict(name="check-job", result="SUCCESS", changes="1,1 2,1"),
# Second check run
- dict(name="test-job", result="SUCCESS", changes="2,1 1,2"),
+ dict(name="check-job", result="SUCCESS", changes="2,1 1,2"),
], ordered=False)
def test_deps_by_topic_multi_tenant(self):
@@ -2368,16 +2368,85 @@ class TestGerritCircularDependencies(ZuulTestCase):
self.executor_server.release()
self.waitUntilSettled()
- # A quirk: at the end of this process, the first change in
- # Gerrit has a complete run because the process of updating it
- # involves a new patchset that is enqueued. Compare to the
- # same test in GitHub.
self.assertHistory([
dict(name="project-job", result="ABORTED", changes="1,1"),
dict(name="project-job", result="ABORTED", changes="1,1 2,1"),
+ dict(name="project-job", result="SUCCESS", changes="1,2 2,1"),
dict(name="project-job", result="SUCCESS", changes="2,1 1,2"),
], ordered=False)
+ @simple_layout('layouts/deps-by-topic.yaml')
+ def test_dependency_refresh_by_topic_check(self):
+ # Test that when two changes are put into a cycle, the
+ # dependencies are refreshed and items already in pipelines
+ # are updated.
+ self.executor_server.hold_jobs_in_build = True
+
+ # This simulates the typical workflow where a developer
+ # uploads changes one at a time.
+ # The first change:
+ A = self.fake_gerrit.addFakeChange('org/project1', "master", "A",
+ topic='test-topic')
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ # Now that it has been uploaded, upload the second change
+ # in the same topic.
+ B = self.fake_gerrit.addFakeChange('org/project2', "master", "B",
+ topic='test-topic')
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ self.assertHistory([
+ dict(name="check-job", result="ABORTED", changes="1,1"),
+ dict(name="check-job", result="SUCCESS", changes="2,1 1,1"),
+ dict(name="check-job", result="SUCCESS", changes="1,1 2,1"),
+ ], ordered=False)
+
+ @simple_layout('layouts/deps-by-topic.yaml')
+ def test_dependency_refresh_by_topic_gate(self):
+ # Test that when two changes are put into a cycle, the
+ # dependencies are refreshed and items already in pipelines
+ # are updated.
+ self.executor_server.hold_jobs_in_build = True
+
+ # This simulates a workflow where a developer adds a change to
+ # a cycle already in gate.
+ A = self.fake_gerrit.addFakeChange('org/project1', "master", "A",
+ topic='test-topic')
+ B = self.fake_gerrit.addFakeChange('org/project2', "master", "B",
+ topic='test-topic')
+ A.addApproval("Code-Review", 2)
+ B.addApproval("Code-Review", 2)
+ A.addApproval("Approved", 1)
+ self.fake_gerrit.addEvent(B.addApproval("Approved", 1))
+ self.waitUntilSettled()
+
+ # Add a new change to the cycle.
+ C = self.fake_gerrit.addFakeChange('org/project1', "master", "C",
+ topic='test-topic')
+ self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ # At the end of this process, the gate jobs should be aborted
+ # because the new dpendency showed up.
+ self.assertEqual(A.data["status"], "NEW")
+ self.assertEqual(B.data["status"], "NEW")
+ self.assertEqual(C.data["status"], "NEW")
+ self.assertHistory([
+ dict(name="gate-job", result="ABORTED", changes="1,1 2,1"),
+ dict(name="gate-job", result="ABORTED", changes="1,1 2,1"),
+ dict(name="check-job", result="SUCCESS", changes="2,1 1,1 3,1"),
+ ], ordered=False)
+
class TestGithubCircularDependencies(ZuulTestCase):
config_file = "zuul-gerrit-github.conf"
@@ -2607,13 +2676,11 @@ class TestGithubCircularDependencies(ZuulTestCase):
self.executor_server.release()
self.waitUntilSettled()
- # A quirk: at the end of this process, the second PR in GitHub
- # has a complete run because the process of updating the first
- # change is not disruptive to the second. Compare to the same
- # test in Gerrit.
self.assertHistory([
dict(name="project-job", result="ABORTED",
changes=f"{A.number},{A.head_sha}"),
dict(name="project-job", result="SUCCESS",
changes=f"{A.number},{A.head_sha} {B.number},{B.head_sha}"),
+ dict(name="project-job", result="SUCCESS",
+ changes=f"{B.number},{B.head_sha} {A.number},{A.head_sha}"),
], ordered=False)
diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py
index b51639952..f241147eb 100644
--- a/tests/unit/test_client.py
+++ b/tests/unit/test_client.py
@@ -27,10 +27,11 @@ import jwt
import testtools
from zuul.zk import ZooKeeperClient
+from zuul.zk.locks import SessionAwareLock
from zuul.cmd.client import parse_cutoff
from tests.base import BaseTestCase, ZuulTestCase
-from tests.base import FIXTURE_DIR
+from tests.base import FIXTURE_DIR, iterate_timeout
from kazoo.exceptions import NoNodeError
@@ -362,81 +363,112 @@ class TestOnlineZKOperations(ZuulTestCase):
def assertSQLState(self):
pass
- def test_delete_pipeline_check(self):
- self.executor_server.hold_jobs_in_build = True
- A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
- self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
- self.waitUntilSettled()
-
- config_file = os.path.join(self.test_root, 'zuul.conf')
- with open(config_file, 'w') as f:
- self.config.write(f)
-
- # Make sure the pipeline exists
- self.getZKTree('/zuul/tenant/tenant-one/pipeline/check/item')
- p = subprocess.Popen(
- [os.path.join(sys.prefix, 'bin/zuul-admin'),
- '-c', config_file,
- 'delete-pipeline-state',
- 'tenant-one', 'check',
- ],
- stdout=subprocess.PIPE)
- out, _ = p.communicate()
- self.log.debug(out.decode('utf8'))
- # Make sure it's deleted
- with testtools.ExpectedException(NoNodeError):
- self.getZKTree('/zuul/tenant/tenant-one/pipeline/check/item')
-
- self.executor_server.hold_jobs_in_build = False
- self.executor_server.release()
- B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
- self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ def _test_delete_pipeline(self, pipeline):
+ sched = self.scheds.first.sched
+ tenant = sched.abide.tenants['tenant-one']
+ # Force a reconfiguration due to a config change (so that the
+ # tenant trigger event queue gets a minimum timestamp set)
+ file_dict = {'zuul.yaml': ''}
+ M = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
+ files=file_dict)
+ M.setMerged()
+ self.fake_gerrit.addEvent(M.getChangeMergedEvent())
self.waitUntilSettled()
- self.assertHistory([
- dict(name='project-merge', result='SUCCESS', changes='1,1'),
- dict(name='project-merge', result='SUCCESS', changes='2,1'),
- dict(name='project-test1', result='SUCCESS', changes='2,1'),
- dict(name='project-test2', result='SUCCESS', changes='2,1'),
- ], ordered=False)
- def test_delete_pipeline_gate(self):
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
- A.addApproval('Code-Review', 2)
- self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+ if pipeline == 'check':
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ else:
+ A.addApproval('Code-Review', 2)
+ self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
self.waitUntilSettled()
- config_file = os.path.join(self.test_root, 'zuul.conf')
- with open(config_file, 'w') as f:
- self.config.write(f)
-
- # Make sure the pipeline exists
- self.getZKTree('/zuul/tenant/tenant-one/pipeline/gate/item')
- p = subprocess.Popen(
- [os.path.join(sys.prefix, 'bin/zuul-admin'),
- '-c', config_file,
- 'delete-pipeline-state',
- 'tenant-one', 'gate',
- ],
- stdout=subprocess.PIPE)
- out, _ = p.communicate()
- self.log.debug(out.decode('utf8'))
- # Make sure it's deleted
- with testtools.ExpectedException(NoNodeError):
- self.getZKTree('/zuul/tenant/tenant-one/pipeline/gate/item')
+ # Lock the check pipeline so we don't process the event we're
+ # about to submit (it should go into the pipeline trigger event
+ # queue and stay there while we delete the pipeline state).
+ # This way we verify that events arrived before the deletion
+ # still work.
+ plock = SessionAwareLock(
+ self.zk_client.client,
+ f"/zuul/locks/pipeline/{tenant.name}/{pipeline}")
+ plock.acquire(blocking=True, timeout=None)
+ try:
+ self.log.debug('Got pipeline lock')
+ # Add a new event while our old last reconfigure time is
+ # in place.
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ if pipeline == 'check':
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ else:
+ B.addApproval('Code-Review', 2)
+ self.fake_gerrit.addEvent(B.addApproval('Approved', 1))
+
+ # Wait until it appears in the pipeline trigger event queue
+ self.log.debug('Waiting for event')
+ for x in iterate_timeout(30, 'trigger event queue has events'):
+ if sched.pipeline_trigger_events[
+ tenant.name][pipeline].hasEvents():
+ break
+ self.log.debug('Got event')
+ except Exception:
+ plock.release()
+ raise
+ # Grab the run handler lock so that we will continue to avoid
+ # further processing of the event after we release the
+ # pipeline lock (which the delete command needs to acquire).
+ sched.run_handler_lock.acquire()
+ try:
+ plock.release()
+ self.log.debug('Got run lock')
+ config_file = os.path.join(self.test_root, 'zuul.conf')
+ with open(config_file, 'w') as f:
+ self.config.write(f)
+
+ # Make sure the pipeline exists
+ self.getZKTree(
+ f'/zuul/tenant/{tenant.name}/pipeline/{pipeline}/item')
+ # Save the old layout uuid
+ tenant = sched.abide.tenants[tenant.name]
+ old_layout_uuid = tenant.layout.uuid
+ self.log.debug('Deleting pipeline state')
+ p = subprocess.Popen(
+ [os.path.join(sys.prefix, 'bin/zuul-admin'),
+ '-c', config_file,
+ 'delete-pipeline-state',
+ tenant.name, pipeline,
+ ],
+ stdout=subprocess.PIPE)
+ # Delete the pipeline state
+ out, _ = p.communicate()
+ self.log.debug(out.decode('utf8'))
+ # Make sure it's deleted
+ with testtools.ExpectedException(NoNodeError):
+ self.getZKTree(
+ f'/zuul/tenant/{tenant.name}/pipeline/{pipeline}/item')
+ finally:
+ sched.run_handler_lock.release()
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
- B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
- B.addApproval('Code-Review', 2)
- self.fake_gerrit.addEvent(B.addApproval('Approved', 1))
self.waitUntilSettled()
self.assertHistory([
- dict(name='project-merge', result='SUCCESS', changes='1,1'),
dict(name='project-merge', result='SUCCESS', changes='2,1'),
- dict(name='project-test1', result='SUCCESS', changes='2,1'),
- dict(name='project-test2', result='SUCCESS', changes='2,1'),
+ dict(name='project-merge', result='SUCCESS', changes='3,1'),
+ dict(name='project-test1', result='SUCCESS', changes='3,1'),
+ dict(name='project-test2', result='SUCCESS', changes='3,1'),
], ordered=False)
+ tenant = sched.abide.tenants[tenant.name]
+ new_layout_uuid = tenant.layout.uuid
+ self.assertEqual(old_layout_uuid, new_layout_uuid)
+ self.assertEqual(tenant.layout.pipelines[pipeline].state.layout_uuid,
+ old_layout_uuid)
+
+ def test_delete_pipeline_check(self):
+ self._test_delete_pipeline('check')
+
+ def test_delete_pipeline_gate(self):
+ self._test_delete_pipeline('gate')
class TestDBPruneParse(BaseTestCase):
diff --git a/tests/unit/test_github_driver.py b/tests/unit/test_github_driver.py
index fb46aa7d1..47e84ca7f 100644
--- a/tests/unit/test_github_driver.py
+++ b/tests/unit/test_github_driver.py
@@ -1741,6 +1741,41 @@ class TestGithubUnprotectedBranches(ZuulTestCase):
# branch
self.assertLess(old, new)
+ def test_base_branch_updated(self):
+ self.create_branch('org/project2', 'feature')
+ github = self.fake_github.getGithubClient()
+ repo = github.repo_from_project('org/project2')
+ repo._set_branch_protection('master', True)
+
+ # Make sure Zuul picked up and cached the configured branches
+ self.scheds.execute(lambda app: app.sched.reconfigure(app.config))
+ self.waitUntilSettled()
+
+ github_connection = self.scheds.first.connections.connections['github']
+ tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
+ project = github_connection.source.getProject('org/project2')
+
+ # Verify that only the master branch is considered protected
+ branches = github_connection.getProjectBranches(project, tenant)
+ self.assertEqual(branches, ["master"])
+
+ A = self.fake_github.openFakePullRequest('org/project2', 'master',
+ 'A')
+ # Fake an event from a pull-request that changed the base
+ # branch from "feature" to "master". The PR is already
+ # using "master" as base, but the event still references
+ # the old "feature" branch.
+ event = A.getPullRequestOpenedEvent()
+ event[1]["pull_request"]["base"]["ref"] = "feature"
+
+ self.fake_github.emitEvent(event)
+ self.waitUntilSettled()
+
+ # Make sure we are still only considering "master" to be
+ # protected.
+ branches = github_connection.getProjectBranches(project, tenant)
+ self.assertEqual(branches, ["master"])
+
# This test verifies that a PR is considered in case it was created for
# a branch just has been set to protected before a tenant reconfiguration
# took place.
diff --git a/tests/unit/test_gitlab_driver.py b/tests/unit/test_gitlab_driver.py
index 6c4d4eeb9..e04a3b5d6 100644
--- a/tests/unit/test_gitlab_driver.py
+++ b/tests/unit/test_gitlab_driver.py
@@ -126,6 +126,27 @@ class TestGitlabDriver(ZuulTestCase):
self.assertTrue(A.approved)
@simple_layout('layouts/basic-gitlab.yaml', driver='gitlab')
+ def test_merge_request_opened_imcomplete(self):
+
+ now = time.monotonic()
+ complete_at = now + 3
+ with self.fake_gitlab.enable_delayed_complete_mr(complete_at):
+ description = "This is the\nMR description."
+ A = self.fake_gitlab.openFakeMergeRequest(
+ 'org/project', 'master', 'A', description=description)
+ self.fake_gitlab.emitEvent(
+ A.getMergeRequestOpenedEvent(), project='org/project')
+ self.waitUntilSettled()
+
+ self.assertEqual('SUCCESS',
+ self.getJobFromHistory('project-test1').result)
+
+ self.assertEqual('SUCCESS',
+ self.getJobFromHistory('project-test2').result)
+
+ self.assertTrue(self.fake_gitlab._test_web_server.stats["get_mr"] > 2)
+
+ @simple_layout('layouts/basic-gitlab.yaml', driver='gitlab')
def test_merge_request_updated(self):
A = self.fake_gitlab.openFakeMergeRequest('org/project', 'master', 'A')
@@ -407,7 +428,7 @@ class TestGitlabDriver(ZuulTestCase):
@simple_layout('layouts/basic-gitlab.yaml', driver='gitlab')
def test_pull_request_with_dyn_reconf(self):
-
+ path = os.path.join(self.upstream_root, 'org/project')
zuul_yaml = [
{'job': {
'name': 'project-test3',
@@ -424,11 +445,13 @@ class TestGitlabDriver(ZuulTestCase):
playbook = "- hosts: all\n tasks: []"
A = self.fake_gitlab.openFakeMergeRequest(
- 'org/project', 'master', 'A')
+ 'org/project', 'master', 'A',
+ base_sha=git.Repo(path).head.object.hexsha)
A.addCommit(
{'.zuul.yaml': yaml.dump(zuul_yaml),
'job.yaml': playbook}
)
+ A.addCommit({"dummy.file": ""})
self.fake_gitlab.emitEvent(A.getMergeRequestOpenedEvent())
self.waitUntilSettled()
@@ -440,6 +463,40 @@ class TestGitlabDriver(ZuulTestCase):
self.getJobFromHistory('project-test3').result)
@simple_layout('layouts/basic-gitlab.yaml', driver='gitlab')
+ def test_pull_request_with_dyn_reconf_alt(self):
+ with self.fake_gitlab.enable_uncomplete_mr():
+ zuul_yaml = [
+ {'job': {
+ 'name': 'project-test3',
+ 'run': 'job.yaml'
+ }},
+ {'project': {
+ 'check': {
+ 'jobs': [
+ 'project-test3'
+ ]
+ }
+ }}
+ ]
+ playbook = "- hosts: all\n tasks: []"
+ A = self.fake_gitlab.openFakeMergeRequest(
+ 'org/project', 'master', 'A')
+ A.addCommit(
+ {'.zuul.yaml': yaml.dump(zuul_yaml),
+ 'job.yaml': playbook}
+ )
+ A.addCommit({"dummy.file": ""})
+ self.fake_gitlab.emitEvent(A.getMergeRequestOpenedEvent())
+ self.waitUntilSettled()
+
+ self.assertEqual('SUCCESS',
+ self.getJobFromHistory('project-test1').result)
+ self.assertEqual('SUCCESS',
+ self.getJobFromHistory('project-test2').result)
+ self.assertEqual('SUCCESS',
+ self.getJobFromHistory('project-test3').result)
+
+ @simple_layout('layouts/basic-gitlab.yaml', driver='gitlab')
def test_ref_updated_and_tenant_reconfigure(self):
self.waitUntilSettled()
diff --git a/tests/unit/test_model_upgrade.py b/tests/unit/test_model_upgrade.py
index a5a49bed4..c6cdee7ea 100644
--- a/tests/unit/test_model_upgrade.py
+++ b/tests/unit/test_model_upgrade.py
@@ -293,6 +293,33 @@ class TestModelUpgrade(ZuulTestCase):
result='SUCCESS', changes='1,1'),
], ordered=False)
+ @model_version(12)
+ def test_model_12_13(self):
+ # Initially queue items will still have the full trigger event
+ # stored in Zookeeper. The trigger event will be converted to
+ # an event info object after the model API update.
+ self.executor_server.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.builds), 1)
+
+ # Upgrade our component
+ self.model_test_component_info.model_api = 13
+
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ self.assertHistory([
+ dict(name='project-merge', result='SUCCESS', changes='1,1'),
+ dict(name='project-test1', result='SUCCESS', changes='1,1'),
+ dict(name='project-test2', result='SUCCESS', changes='1,1'),
+ dict(name='project1-project2-integration',
+ result='SUCCESS', changes='1,1'),
+ ], ordered=False)
+
class TestGithubModelUpgrade(ZuulTestCase):
config_file = 'zuul-github-driver.conf'
diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py
index 031b10a1e..62e51ac3f 100755
--- a/zuul/cmd/client.py
+++ b/zuul/cmd/client.py
@@ -30,16 +30,14 @@ import time
import textwrap
import requests
import urllib.parse
-from uuid import uuid4
import zuul.cmd
from zuul.lib.config import get_default
-from zuul.model import SystemAttributes, PipelineState
+from zuul.model import SystemAttributes, PipelineState, PipelineChangeList
from zuul.zk import ZooKeeperClient
from zuul.lib.keystorage import KeyStorage
-from zuul.zk.locks import tenant_write_lock
+from zuul.zk.locks import tenant_read_lock, pipeline_lock
from zuul.zk.zkobject import ZKContext
-from zuul.zk.layout import LayoutState, LayoutStateStore
from zuul.zk.components import COMPONENT_REGISTRY
@@ -1029,28 +1027,18 @@ class Client(zuul.cmd.ZuulApp):
safe_tenant = urllib.parse.quote_plus(args.tenant)
safe_pipeline = urllib.parse.quote_plus(args.pipeline)
COMPONENT_REGISTRY.create(zk_client)
- with tenant_write_lock(zk_client, args.tenant) as lock:
+ self.log.info('get tenant')
+ with tenant_read_lock(zk_client, args.tenant):
path = f'/zuul/tenant/{safe_tenant}/pipeline/{safe_pipeline}'
- layout_uuid = None
- zk_client.client.delete(
- f'/zuul/tenant/{safe_tenant}/pipeline/{safe_pipeline}',
- recursive=True)
- with ZKContext(zk_client, lock, None, self.log) as context:
- ps = PipelineState.new(context, _path=path,
- layout_uuid=layout_uuid)
- # Force everyone to make a new layout for this tenant in
- # order to rebuild the shared change queues.
- layout_state = LayoutState(
- tenant_name=args.tenant,
- hostname='admin command',
- last_reconfigured=int(time.time()),
- last_reconfigure_event_ltime=-1,
- uuid=uuid4().hex,
- branch_cache_min_ltimes={},
- ltime=ps._zstat.last_modified_transaction_id,
- )
- tenant_layout_state = LayoutStateStore(zk_client, lambda: None)
- tenant_layout_state[args.tenant] = layout_state
+ self.log.info('get pipe')
+ with pipeline_lock(
+ zk_client, args.tenant, args.pipeline
+ ) as plock:
+ self.log.info('got locks')
+ zk_client.client.delete(path, recursive=True)
+ with ZKContext(zk_client, plock, None, self.log) as context:
+ PipelineState.new(context, _path=path, layout_uuid=None)
+ PipelineChangeList.new(context)
sys.exit(0)
diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py
index 276365e1d..f871671aa 100644
--- a/zuul/driver/gerrit/gerritconnection.py
+++ b/zuul/driver/gerrit/gerritconnection.py
@@ -161,6 +161,8 @@ class GerritEventConnector(threading.Thread):
IGNORED_EVENTS = (
'cache-eviction', # evict-cache plugin
+ 'fetch-ref-replicated',
+ 'fetch-ref-replication-scheduled',
'ref-replicated',
'ref-replication-scheduled',
'ref-replication-done'
diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py
index 182c83bae..cffbd6769 100644
--- a/zuul/driver/github/githubconnection.py
+++ b/zuul/driver/github/githubconnection.py
@@ -439,7 +439,11 @@ class GithubEventProcessor(object):
# branch is now protected.
if hasattr(event, "branch") and event.branch:
protected = None
- if change:
+ # Only use the `branch_protected` flag if the
+ # target branch of change and event are the same.
+ # The base branch could have changed in the
+ # meantime.
+ if change and change.branch == event.branch:
# PR based events already have the information if the
# target branch is protected so take the information
# from there.
diff --git a/zuul/driver/gitlab/gitlabconnection.py b/zuul/driver/gitlab/gitlabconnection.py
index 1515db8df..da423f085 100644
--- a/zuul/driver/gitlab/gitlabconnection.py
+++ b/zuul/driver/gitlab/gitlabconnection.py
@@ -281,6 +281,7 @@ class GitlabAPIClient():
self.api_token = api_token
self.keepalive = keepalive
self.disable_pool = disable_pool
+ self.get_mr_wait_factor = 2
self.headers = {'Authorization': 'Bearer %s' % (
self.api_token)}
@@ -342,11 +343,36 @@ class GitlabAPIClient():
# https://docs.gitlab.com/ee/api/merge_requests.html#get-single-mr
def get_mr(self, project_name, number, zuul_event_id=None):
- path = "/projects/%s/merge_requests/%s" % (
- quote_plus(project_name), number)
- resp = self.get(self.baseurl + path, zuul_event_id=zuul_event_id)
- self._manage_error(*resp, zuul_event_id=zuul_event_id)
- return resp[0]
+ log = get_annotated_logger(self.log, zuul_event_id)
+ attempts = 0
+
+ def _get_mr():
+ path = "/projects/%s/merge_requests/%s" % (
+ quote_plus(project_name), number)
+ resp = self.get(self.baseurl + path, zuul_event_id=zuul_event_id)
+ self._manage_error(*resp, zuul_event_id=zuul_event_id)
+ return resp[0]
+
+ # The Gitlab API might not return a complete MR description as
+ # some attributes are updated asynchronously. This loop ensures
+ # we query the API until all async attributes are available or until
+ # a defined delay is reached.
+ while True:
+ attempts += 1
+ mr = _get_mr()
+ # The diff_refs attribute is updated asynchronously
+ if all(map(lambda k: mr.get(k, None), ['diff_refs'])):
+ return mr
+ if attempts > 4:
+ log.warning(
+ "Fetched MR %s#%s with imcomplete data" % (
+ project_name, number))
+ return mr
+ wait_delay = attempts * self.get_mr_wait_factor
+ log.info(
+ "Will retry to fetch %s#%s due to imcomplete data "
+ "(in %s seconds) ..." % (project_name, number, wait_delay))
+ time.sleep(wait_delay)
# https://docs.gitlab.com/ee/api/branches.html#list-repository-branches
def get_project_branches(self, project_name, exclude_unprotected,
@@ -667,8 +693,12 @@ class GitlabConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
change.ref = "refs/merge-requests/%s/head" % change.number
change.branch = change.mr['target_branch']
change.is_current_patchset = (change.mr['sha'] == change.patchset)
- change.base_sha = change.mr['diff_refs'].get('base_sha')
- change.commit_id = change.mr['diff_refs'].get('head_sha')
+ change.commit_id = event.patch_number
+ diff_refs = change.mr.get("diff_refs", {})
+ if diff_refs:
+ change.base_sha = diff_refs.get('base_sha')
+ else:
+ change.base_sha = None
change.owner = change.mr['author'].get('username')
# Files changes are not part of the Merge Request data
# See api/merge_requests.html#get-single-mr-changes
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index d7a95e9b5..c3d082a47 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -243,7 +243,7 @@ class PipelineManager(metaclass=ABCMeta):
and self.useDependenciesByTopic(change.project))
if (update_commit_dependencies
or update_topic_dependencies):
- self.updateCommitDependencies(change, None, event=None)
+ self.updateCommitDependencies(change, event=None)
self._change_cache[change.cache_key] = change
resolved_changes.append(change)
return resolved_changes
@@ -285,11 +285,18 @@ class PipelineManager(metaclass=ABCMeta):
return True
return False
- def isAnyVersionOfChangeInPipeline(self, change):
- # Checks any items in the pipeline
+ def isChangeRelevantToPipeline(self, change):
+ # Checks if any version of the change or its deps matches any
+ # item in the pipeline.
for change_key in self.pipeline.change_list.getChangeKeys():
if change.cache_stat.key.isSameChange(change_key):
return True
+ if isinstance(change, model.Change):
+ for dep_change_ref in change.getNeedsChanges(
+ self.useDependenciesByTopic(change.project)):
+ dep_change_key = ChangeKey.fromReference(dep_change_ref)
+ if change.cache_stat.key.isSameChange(dep_change_key):
+ return True
return False
def isChangeAlreadyInQueue(self, change, change_queue):
@@ -315,7 +322,7 @@ class PipelineManager(metaclass=ABCMeta):
to_refresh.add(item.change)
for existing_change in to_refresh:
- self.updateCommitDependencies(existing_change, None, event)
+ self.updateCommitDependencies(existing_change, event)
def reportEnqueue(self, item):
if not self.pipeline.state.disabled:
@@ -516,7 +523,8 @@ class PipelineManager(metaclass=ABCMeta):
def addChange(self, change, event, quiet=False, enqueue_time=None,
ignore_requirements=False, live=True,
- change_queue=None, history=None, dependency_graph=None):
+ change_queue=None, history=None, dependency_graph=None,
+ skip_presence_check=False):
log = get_annotated_logger(self.log, event)
log.debug("Considering adding change %s" % change)
@@ -531,7 +539,9 @@ class PipelineManager(metaclass=ABCMeta):
# If we are adding a live change, check if it's a live item
# anywhere in the pipeline. Otherwise, we will perform the
# duplicate check below on the specific change_queue.
- if live and self.isChangeAlreadyInPipeline(change):
+ if (live and
+ self.isChangeAlreadyInPipeline(change) and
+ not skip_presence_check):
log.debug("Change %s is already in pipeline, ignoring" % change)
return True
@@ -564,7 +574,7 @@ class PipelineManager(metaclass=ABCMeta):
# to date and this is a noop; otherwise, we need to refresh
# them anyway.
if isinstance(change, model.Change):
- self.updateCommitDependencies(change, None, event)
+ self.updateCommitDependencies(change, event)
with self.getChangeQueue(change, event, change_queue) as change_queue:
if not change_queue:
@@ -590,8 +600,10 @@ class PipelineManager(metaclass=ABCMeta):
log.debug("History after enqueuing changes ahead: %s", history)
if self.isChangeAlreadyInQueue(change, change_queue):
- log.debug("Change %s is already in queue, ignoring" % change)
- return True
+ if not skip_presence_check:
+ log.debug("Change %s is already in queue, ignoring",
+ change)
+ return True
cycle = []
if isinstance(change, model.Change):
@@ -625,7 +637,7 @@ class PipelineManager(metaclass=ABCMeta):
if enqueue_time:
item.enqueue_time = enqueue_time
item.live = live
- self.reportStats(item, added=True)
+ self.reportStats(item, trigger_event=event)
item.quiet = quiet
if item.live:
@@ -857,7 +869,7 @@ class PipelineManager(metaclass=ABCMeta):
self.pipeline.tenant.name][other_pipeline.name].put(
event, needs_result=False)
- def updateCommitDependencies(self, change, change_queue, event):
+ def updateCommitDependencies(self, change, event):
log = get_annotated_logger(self.log, event)
must_update_commit_deps = (
@@ -1555,6 +1567,7 @@ class PipelineManager(metaclass=ABCMeta):
log.info("Dequeuing change %s because "
"it can no longer merge" % item.change)
self.cancelJobs(item)
+ quiet_dequeue = False
if item.isBundleFailing():
item.setDequeuedBundleFailing('Bundle is failing')
elif not meets_reqs:
@@ -1566,7 +1579,28 @@ class PipelineManager(metaclass=ABCMeta):
else:
msg = f'Change {clist} is needed.'
item.setDequeuedNeedingChange(msg)
- if item.live:
+ # If all the dependencies are already in the pipeline
+ # (but not ahead of this change), then we probably
+ # just added updated versions of them, possibly
+ # updating a cycle. In that case, attempt to
+ # re-enqueue this change with the updated deps.
+ if (item.live and
+ all([self.isChangeAlreadyInPipeline(c)
+ for c in needs_changes])):
+ # Try enqueue, if that succeeds, keep this dequeue quiet
+ try:
+ log.info("Attempting re-enqueue of change %s",
+ item.change)
+ quiet_dequeue = self.addChange(
+ item.change, item.event,
+ enqueue_time=item.enqueue_time,
+ quiet=True,
+ skip_presence_check=True)
+ except Exception:
+ log.exception("Unable to re-enqueue change %s "
+ "which is missing dependencies",
+ item.change)
+ if item.live and not quiet_dequeue:
try:
self.reportItem(item)
except exceptions.MergeFailure:
@@ -2198,7 +2232,7 @@ class PipelineManager(metaclass=ABCMeta):
log.error("Reporting item %s received: %s", item, ret)
return action, (not ret)
- def reportStats(self, item, added=False):
+ def reportStats(self, item, trigger_event=None):
if not self.sched.statsd:
return
try:
@@ -2237,18 +2271,21 @@ class PipelineManager(metaclass=ABCMeta):
if dt:
self.sched.statsd.timing(key + '.resident_time', dt)
self.sched.statsd.incr(key + '.total_changes')
- if added and hasattr(item.event, 'arrived_at_scheduler_timestamp'):
+ if (
+ trigger_event
+ and hasattr(trigger_event, 'arrived_at_scheduler_timestamp')
+ ):
now = time.time()
- arrived = item.event.arrived_at_scheduler_timestamp
+ arrived = trigger_event.arrived_at_scheduler_timestamp
processing = (now - arrived) * 1000
- elapsed = (now - item.event.timestamp) * 1000
+ elapsed = (now - trigger_event.timestamp) * 1000
self.sched.statsd.timing(
basekey + '.event_enqueue_processing_time',
processing)
self.sched.statsd.timing(
basekey + '.event_enqueue_time', elapsed)
self.reportPipelineTiming('event_enqueue_time',
- item.event.timestamp)
+ trigger_event.timestamp)
except Exception:
self.log.exception("Exception reporting pipeline stats")
diff --git a/zuul/model.py b/zuul/model.py
index 1d82b5f2c..e526b749c 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -4666,6 +4666,37 @@ class BuildSet(zkobject.ZKObject):
return Attributes(uuid=self.uuid)
+class EventInfo:
+
+ def __init__(self):
+ self.zuul_event_id = None
+ self.timestamp = time.time()
+ self.span_context = None
+
+ @classmethod
+ def fromEvent(cls, event):
+ tinfo = cls()
+ tinfo.zuul_event_id = event.zuul_event_id
+ tinfo.timestamp = event.timestamp
+ tinfo.span_context = event.span_context
+ return tinfo
+
+ @classmethod
+ def fromDict(cls, d):
+ tinfo = cls()
+ tinfo.zuul_event_id = d["zuul_event_id"]
+ tinfo.timestamp = d["timestamp"]
+ tinfo.span_context = d["span_context"]
+ return tinfo
+
+ def toDict(self):
+ return {
+ "zuul_event_id": self.zuul_event_id,
+ "timestamp": self.timestamp,
+ "span_context": self.span_context,
+ }
+
+
class QueueItem(zkobject.ZKObject):
"""Represents the position of a Change in a ChangeQueue.
@@ -4700,7 +4731,7 @@ class QueueItem(zkobject.ZKObject):
live=True, # Whether an item is intended to be processed at all
layout_uuid=None,
_cached_sql_results={},
- event=None, # The trigger event that lead to this queue item
+ event=None, # Info about the event that lead to this queue item
# Additional container for connection specifig information to be
# used by reporters throughout the lifecycle
@@ -4722,6 +4753,9 @@ class QueueItem(zkobject.ZKObject):
def new(klass, context, **kw):
obj = klass()
obj._set(**kw)
+ if COMPONENT_REGISTRY.model_api >= 13:
+ obj._set(event=obj.event and EventInfo.fromEvent(obj.event))
+
data = obj._trySerialize(context)
obj._save(context, data, create=True)
files_state = (BuildSet.COMPLETE if obj.change.files is not None
@@ -4750,10 +4784,18 @@ class QueueItem(zkobject.ZKObject):
return (tenant, pipeline, uuid)
def serialize(self, context):
- if isinstance(self.event, TriggerEvent):
- event_type = "TriggerEvent"
+ if COMPONENT_REGISTRY.model_api < 13:
+ if isinstance(self.event, TriggerEvent):
+ event_type = "TriggerEvent"
+ else:
+ event_type = self.event.__class__.__name__
else:
- event_type = self.event.__class__.__name__
+ event_type = "EventInfo"
+ if not isinstance(self.event, EventInfo):
+ # Convert our local trigger event to a trigger info
+ # object. This will only happen on the transition to
+ # model API version 13.
+ self._set(event=EventInfo.fromEvent(self.event))
data = {
"uuid": self.uuid,
@@ -4795,14 +4837,18 @@ class QueueItem(zkobject.ZKObject):
# child objects.
self._set(uuid=data["uuid"])
- event_type = data["event"]["type"]
- if event_type == "TriggerEvent":
- event_class = (
- self.pipeline.manager.sched.connections.getTriggerEventClass(
- data["event"]["data"]["driver_name"])
- )
+ if COMPONENT_REGISTRY.model_api < 13:
+ event_type = data["event"]["type"]
+ if event_type == "TriggerEvent":
+ event_class = (
+ self.pipeline.manager.sched.connections
+ .getTriggerEventClass(
+ data["event"]["data"]["driver_name"])
+ )
+ else:
+ event_class = EventTypeIndex.event_type_mapping.get(event_type)
else:
- event_class = EventTypeIndex.event_type_mapping.get(event_type)
+ event_class = EventInfo
if event_class is None:
raise NotImplementedError(
diff --git a/zuul/model_api.py b/zuul/model_api.py
index ccb12077d..0244296dd 100644
--- a/zuul/model_api.py
+++ b/zuul/model_api.py
@@ -14,4 +14,4 @@
# When making ZK schema changes, increment this and add a record to
# doc/source/developer/model-changelog.rst
-MODEL_API = 12
+MODEL_API = 13
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index e646c09b8..cd15a878c 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -2519,9 +2519,26 @@ class Scheduler(threading.Thread):
event.span_context = tracing.getSpanContext(span)
for pipeline in tenant.layout.pipelines.values():
+ # For most kinds of dependencies, it's sufficient to check
+ # if this change is already in the pipeline, because the
+ # only way to update a dependency cycle is to update one
+ # of the changes in it. However, dependencies-by-topic
+ # can have changes added to the cycle without updating any
+ # of the existing changes in the cycle. That means in
+ # order to detect whether a new change is added to an
+ # existing cycle in the pipeline, we need to know all of
+ # the dependencies of the new change, and check if *they*
+ # are in the pipeline. Therefore, go ahead and update our
+ # dependencies here so they are available for comparison
+ # against the pipeline contents. This front-loads some
+ # work that otherwise would happen in the pipeline
+ # manager, but the result of the work goes into the change
+ # cache, so it's not wasted; it's just less parallelized.
+ if isinstance(change, Change):
+ pipeline.manager.updateCommitDependencies(change, event)
if (
pipeline.manager.eventMatches(event, change)
- or pipeline.manager.isAnyVersionOfChangeInPipeline(change)
+ or pipeline.manager.isChangeRelevantToPipeline(change)
):
self.pipeline_trigger_events[tenant.name][
pipeline.name