-rw-r--r--  Dockerfile | 1
-rw-r--r--  doc/source/developer/model-changelog.rst | 7
-rw-r--r--  doc/source/monitoring.rst | 11
-rw-r--r--  noxfile.py | 2
-rw-r--r--  requirements.txt | 2
-rwxr-xr-x  tests/make_playbooks.py | 3
-rw-r--r--  tests/unit/test_connection.py | 192
-rw-r--r--  tests/unit/test_gerrit.py | 45
-rw-r--r--  tests/unit/test_git_driver.py | 3
-rw-r--r--  tests/unit/test_github_driver.py | 43
-rw-r--r--  tests/unit/test_merger_repo.py | 58
-rw-r--r--  tests/unit/test_model_upgrade.py | 27
-rw-r--r--  tests/unit/test_reporting.py | 4
-rw-r--r--  tests/unit/test_scheduler.py | 48
-rw-r--r--  tests/unit/test_v3.py | 37
-rw-r--r--  tests/unit/test_zk.py | 84
-rw-r--r--  tools/docker-compose.yaml | 2
-rwxr-xr-x  tools/test-setup-docker.sh | 3
-rw-r--r--  web/public/openapi.yaml | 40
-rw-r--r--  zuul/ansible/logconfig.py | 3
-rw-r--r--  zuul/configloader.py | 48
-rw-r--r--  zuul/driver/gerrit/gerritconnection.py | 9
-rw-r--r--  zuul/driver/github/githubconnection.py | 1
-rw-r--r--  zuul/driver/gitlab/gitlabconnection.py | 7
-rw-r--r--  zuul/driver/sql/alembic/env.py | 3
-rw-r--r--  zuul/driver/sql/alembic/versions/60c119eb1e3f_use_build_set_results.py | 17
-rw-r--r--  zuul/driver/sql/alembic/versions/c7467b642498_buildset_updated.py | 17
-rw-r--r--  zuul/driver/sql/sqlconnection.py | 40
-rw-r--r--  zuul/lib/repl.py | 6
-rw-r--r--  zuul/manager/__init__.py | 72
-rw-r--r--  zuul/merger/merger.py | 28
-rw-r--r--  zuul/model.py | 220
-rw-r--r--  zuul/model_api.py | 2
-rw-r--r--  zuul/scheduler.py | 139
-rw-r--r--  zuul/source/__init__.py | 3
-rw-r--r--  zuul/zk/zkobject.py | 11
36 files changed, 925 insertions, 313 deletions
diff --git a/Dockerfile b/Dockerfile
index 5a17739f0..df326bd8a 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -29,6 +29,7 @@ ARG REACT_APP_ENABLE_SERVICE_WORKER
# Kubectl/Openshift version/sha
ARG OPENSHIFT_URL=https://mirror.openshift.com/pub/openshift-v4/x86_64/clients/ocp/4.11.20/openshift-client-linux-4.11.20.tar.gz
ARG OPENSHIFT_SHA=74f252c812932425ca19636b2be168df8fe57b114af6b114283975e67d987d11
+ARG PBR_VERSION=
COPY . /tmp/src
COPY --from=js-builder /tmp/src/build /tmp/src/zuul/web/static
diff --git a/doc/source/developer/model-changelog.rst b/doc/source/developer/model-changelog.rst
index b80979362..f78b3f0a0 100644
--- a/doc/source/developer/model-changelog.rst
+++ b/doc/source/developer/model-changelog.rst
@@ -112,3 +112,10 @@ Version 12
:Prior Zuul version: 8.0.1
:Description: Adds job_versions and build_versions to BuildSet.
Affects schedulers.
+
+Version 13
+----------
+:Prior Zuul version: 8.2.0
+:Description: Stores only the necessary event info as part of a queue item
+ instead of the full trigger event.
+ Affects schedulers.
diff --git a/doc/source/monitoring.rst b/doc/source/monitoring.rst
index 7f99c7f7f..f40bee445 100644
--- a/doc/source/monitoring.rst
+++ b/doc/source/monitoring.rst
@@ -444,6 +444,11 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
Indicates if the executor is paused. 1 means paused else 0.
+ .. stat:: pct_used_hdd
+ :type: gauge
+
+ The used disk on this executor, as a percentage multiplied by 100.
+
.. stat:: pct_used_ram
:type: gauge
@@ -711,6 +716,12 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
The size of the current connection event queue.
+ .. stat:: run_handler
+ :type: timer
+
+ A timer metric reporting the time taken for one scheduler run
+ handler iteration.
+
.. stat:: time_query
:type: timer
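
For context on the new pct_used_hdd gauge documented above, the following is a minimal sketch, not part of this change, of how a "percentage multiplied by 100" value is typically computed before being reported; the path and function name are illustrative only.

    import shutil

    def pct_used_hdd(path="/var/lib/zuul"):
        # 42.13% used disk is reported as the integer 4213.
        usage = shutil.disk_usage(path)
        return int(usage.used / usage.total * 10000)
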
diff --git a/noxfile.py b/noxfile.py
index e920f053e..30058cef7 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -32,7 +32,6 @@ def set_standard_env_vars(session):
set_env(session, 'OS_STDERR_CAPTURE', '1')
set_env(session, 'OS_STDOUT_CAPTURE', '1')
set_env(session, 'OS_TEST_TIMEOUT', '360')
- set_env(session, 'SQLALCHEMY_WARN_20', '1')
session.env['PYTHONWARNINGS'] = ','.join([
'always::DeprecationWarning:zuul.driver.sql.sqlconnection',
'always::DeprecationWarning:tests.base',
@@ -49,7 +48,6 @@ def set_standard_env_vars(session):
@nox.session(python='3')
def bindep(session):
set_standard_env_vars(session)
- set_env(session, 'SQLALCHEMY_WARN_20', '1')
session.install('bindep')
session.run('bindep', 'test')
diff --git a/requirements.txt b/requirements.txt
index 082a18043..e12e9cf72 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -18,7 +18,7 @@ PrettyTable>=0.6,<0.8
babel>=1.0
netaddr
kazoo>=2.9.0
-sqlalchemy<2.0.0
+sqlalchemy>=2.0.0
alembic
cryptography>=39.0.0
cachecontrol<0.12.7
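
The bump to sqlalchemy>=2.0.0 is what drives the test and migration changes below: select() no longer accepts a list of selectables, raw SQL strings must be wrapped in sa.text(), and result rows are accessed as attributes rather than by string key. A minimal sketch of the new call style (the table and column names here are illustrative, not Zuul's schema):

    import sqlalchemy as sa

    metadata = sa.MetaData()
    buildset = sa.Table(
        "buildset", metadata,
        sa.Column("id", sa.Integer, primary_key=True),
        sa.Column("pipeline", sa.String(255)),
    )

    engine = sa.create_engine("sqlite://")
    metadata.create_all(engine)

    with engine.begin() as conn:
        conn.execute(buildset.insert().values(pipeline="check"))
        # 1.x style was sa.select([buildset]); 2.0 takes selectables
        # positionally, and rows expose columns as attributes.
        row = conn.execute(
            sa.select(buildset).where(buildset.c.pipeline == "check")
        ).fetchone()
        assert row.pipeline == "check"      # instead of row['pipeline']
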
diff --git a/tests/make_playbooks.py b/tests/make_playbooks.py
index 93c37bc81..cb7a98096 100755
--- a/tests/make_playbooks.py
+++ b/tests/make_playbooks.py
@@ -40,7 +40,8 @@ def handle_repo(path):
config_path = os.path.join(path, fn)
break
try:
- config = yaml.safe_load(open(config_path))
+ with open(config_path) as f:
+ config = yaml.safe_load(f)
except Exception:
print(" Has yaml errors")
return
diff --git a/tests/unit/test_connection.py b/tests/unit/test_connection.py
index c30e1743d..26a99215e 100644
--- a/tests/unit/test_connection.py
+++ b/tests/unit/test_connection.py
@@ -65,7 +65,7 @@ class TestSQLConnectionMysql(ZuulTestCase):
def _sql_tables_created(self, connection_name):
connection = self.scheds.first.connections.connections[connection_name]
- insp = sa.engine.reflection.Inspector(connection.engine)
+ insp = sa.inspect(connection.engine)
table_prefix = connection.table_prefix
self.assertEqual(self.expected_table_prefix, table_prefix)
@@ -82,7 +82,7 @@ class TestSQLConnectionMysql(ZuulTestCase):
def _sql_indexes_created(self, connection_name):
connection = self.scheds.first.connections.connections[connection_name]
- insp = sa.engine.reflection.Inspector(connection.engine)
+ insp = sa.inspect(connection.engine)
table_prefix = connection.table_prefix
self.assertEqual(self.expected_table_prefix, table_prefix)
@@ -127,7 +127,7 @@ class TestSQLConnectionMysql(ZuulTestCase):
engine.connect() as conn:
result = conn.execute(
- sa.sql.select([reporter.connection.zuul_buildset_table]))
+ sa.sql.select(reporter.connection.zuul_buildset_table))
buildsets = result.fetchall()
self.assertEqual(5, len(buildsets))
@@ -137,107 +137,107 @@ class TestSQLConnectionMysql(ZuulTestCase):
buildset3 = buildsets[3]
buildset4 = buildsets[4]
- self.assertEqual('check', buildset0['pipeline'])
- self.assertEqual('org/project', buildset0['project'])
- self.assertEqual(1, buildset0['change'])
- self.assertEqual('1', buildset0['patchset'])
- self.assertEqual('SUCCESS', buildset0['result'])
- self.assertEqual('Build succeeded.', buildset0['message'])
- self.assertEqual('tenant-one', buildset0['tenant'])
+ self.assertEqual('check', buildset0.pipeline)
+ self.assertEqual('org/project', buildset0.project)
+ self.assertEqual(1, buildset0.change)
+ self.assertEqual('1', buildset0.patchset)
+ self.assertEqual('SUCCESS', buildset0.result)
+ self.assertEqual('Build succeeded.', buildset0.message)
+ self.assertEqual('tenant-one', buildset0.tenant)
self.assertEqual(
- 'https://review.example.com/%d' % buildset0['change'],
- buildset0['ref_url'])
- self.assertNotEqual(None, buildset0['event_id'])
- self.assertNotEqual(None, buildset0['event_timestamp'])
+ 'https://review.example.com/%d' % buildset0.change,
+ buildset0.ref_url)
+ self.assertNotEqual(None, buildset0.event_id)
+ self.assertNotEqual(None, buildset0.event_timestamp)
buildset0_builds = conn.execute(
- sa.sql.select([
+ sa.sql.select(
reporter.connection.zuul_build_table
- ]).where(
+ ).where(
reporter.connection.zuul_build_table.c.buildset_id ==
- buildset0['id']
+ buildset0.id
)
).fetchall()
# Check the first result, which should be the project-merge job
self.assertEqual(
- 'project-merge', buildset0_builds[0]['job_name'])
- self.assertEqual("SUCCESS", buildset0_builds[0]['result'])
- self.assertEqual(None, buildset0_builds[0]['log_url'])
- self.assertEqual('check', buildset1['pipeline'])
- self.assertEqual('master', buildset1['branch'])
- self.assertEqual('org/project', buildset1['project'])
- self.assertEqual(2, buildset1['change'])
- self.assertEqual('1', buildset1['patchset'])
- self.assertEqual('FAILURE', buildset1['result'])
- self.assertEqual('Build failed.', buildset1['message'])
+ 'project-merge', buildset0_builds[0].job_name)
+ self.assertEqual("SUCCESS", buildset0_builds[0].result)
+ self.assertEqual(None, buildset0_builds[0].log_url)
+ self.assertEqual('check', buildset1.pipeline)
+ self.assertEqual('master', buildset1.branch)
+ self.assertEqual('org/project', buildset1.project)
+ self.assertEqual(2, buildset1.change)
+ self.assertEqual('1', buildset1.patchset)
+ self.assertEqual('FAILURE', buildset1.result)
+ self.assertEqual('Build failed.', buildset1.message)
buildset1_builds = conn.execute(
- sa.sql.select([
+ sa.sql.select(
reporter.connection.zuul_build_table
- ]).where(
+ ).where(
reporter.connection.zuul_build_table.c.buildset_id ==
- buildset1['id']
+ buildset1.id
)
).fetchall()
# Check the second result, which should be the project-test1
# job which failed
self.assertEqual(
- 'project-test1', buildset1_builds[1]['job_name'])
- self.assertEqual("FAILURE", buildset1_builds[1]['result'])
- self.assertEqual(None, buildset1_builds[1]['log_url'])
+ 'project-test1', buildset1_builds[1].job_name)
+ self.assertEqual("FAILURE", buildset1_builds[1].result)
+ self.assertEqual(None, buildset1_builds[1].log_url)
buildset2_builds = conn.execute(
- sa.sql.select([
+ sa.sql.select(
reporter.connection.zuul_build_table
- ]).where(
+ ).where(
reporter.connection.zuul_build_table.c.buildset_id ==
- buildset2['id']
+ buildset2.id
)
).fetchall()
# Check the first result, which should be the project-publish
# job
self.assertEqual('project-publish',
- buildset2_builds[0]['job_name'])
- self.assertEqual("SUCCESS", buildset2_builds[0]['result'])
+ buildset2_builds[0].job_name)
+ self.assertEqual("SUCCESS", buildset2_builds[0].result)
buildset3_builds = conn.execute(
- sa.sql.select([
+ sa.sql.select(
reporter.connection.zuul_build_table
- ]).where(
+ ).where(
reporter.connection.zuul_build_table.c.buildset_id ==
- buildset3['id']
+ buildset3.id
)
).fetchall()
self.assertEqual(
- 'project-test1', buildset3_builds[1]['job_name'])
- self.assertEqual('NODE_FAILURE', buildset3_builds[1]['result'])
- self.assertEqual(None, buildset3_builds[1]['log_url'])
- self.assertIsNotNone(buildset3_builds[1]['start_time'])
- self.assertIsNotNone(buildset3_builds[1]['end_time'])
+ 'project-test1', buildset3_builds[1].job_name)
+ self.assertEqual('NODE_FAILURE', buildset3_builds[1].result)
+ self.assertEqual(None, buildset3_builds[1].log_url)
+ self.assertIsNotNone(buildset3_builds[1].start_time)
+ self.assertIsNotNone(buildset3_builds[1].end_time)
self.assertGreaterEqual(
- buildset3_builds[1]['end_time'],
- buildset3_builds[1]['start_time'])
+ buildset3_builds[1].end_time,
+ buildset3_builds[1].start_time)
# Check the paused build result
buildset4_builds = conn.execute(
- sa.sql.select([
+ sa.sql.select(
reporter.connection.zuul_build_table
- ]).where(
+ ).where(
reporter.connection.zuul_build_table.c.buildset_id ==
- buildset4['id']
+ buildset4.id
).order_by(reporter.connection.zuul_build_table.c.id)
).fetchall()
paused_build_events = conn.execute(
- sa.sql.select([
+ sa.sql.select(
reporter.connection.zuul_build_event_table
- ]).where(
+ ).where(
reporter.connection.zuul_build_event_table.c.build_id
- == buildset4_builds[0]["id"]
+ == buildset4_builds[0].id
)
).fetchall()
@@ -245,16 +245,16 @@ class TestSQLConnectionMysql(ZuulTestCase):
pause_event = paused_build_events[0]
resume_event = paused_build_events[1]
self.assertEqual(
- pause_event["event_type"], "paused")
- self.assertIsNotNone(pause_event["event_time"])
- self.assertIsNone(pause_event["description"])
+ pause_event.event_type, "paused")
+ self.assertIsNotNone(pause_event.event_time)
+ self.assertIsNone(pause_event.description)
self.assertEqual(
- resume_event["event_type"], "resumed")
- self.assertIsNotNone(resume_event["event_time"])
- self.assertIsNone(resume_event["description"])
+ resume_event.event_type, "resumed")
+ self.assertIsNotNone(resume_event.event_time)
+ self.assertIsNone(resume_event.description)
self.assertGreater(
- resume_event["event_time"], pause_event["event_time"])
+ resume_event.event_time, pause_event.event_time)
self.executor_server.hold_jobs_in_build = True
@@ -333,51 +333,51 @@ class TestSQLConnectionMysql(ZuulTestCase):
engine.connect() as conn:
result = conn.execute(
- sa.sql.select([reporter.connection.zuul_buildset_table])
+ sa.sql.select(reporter.connection.zuul_buildset_table)
)
buildsets = result.fetchall()
self.assertEqual(1, len(buildsets))
buildset0 = buildsets[0]
- self.assertEqual('check', buildset0['pipeline'])
- self.assertEqual('org/project', buildset0['project'])
- self.assertEqual(1, buildset0['change'])
- self.assertEqual('1', buildset0['patchset'])
- self.assertEqual('SUCCESS', buildset0['result'])
- self.assertEqual('Build succeeded.', buildset0['message'])
- self.assertEqual('tenant-one', buildset0['tenant'])
+ self.assertEqual('check', buildset0.pipeline)
+ self.assertEqual('org/project', buildset0.project)
+ self.assertEqual(1, buildset0.change)
+ self.assertEqual('1', buildset0.patchset)
+ self.assertEqual('SUCCESS', buildset0.result)
+ self.assertEqual('Build succeeded.', buildset0.message)
+ self.assertEqual('tenant-one', buildset0.tenant)
self.assertEqual(
- 'https://review.example.com/%d' % buildset0['change'],
- buildset0['ref_url'])
+ 'https://review.example.com/%d' % buildset0.change,
+ buildset0.ref_url)
buildset0_builds = conn.execute(
sa.sql.select(
- [reporter.connection.zuul_build_table]
+ reporter.connection.zuul_build_table
).where(
reporter.connection.zuul_build_table.c.buildset_id ==
- buildset0['id']
+ buildset0.id
)
).fetchall()
# Check the retry results
- self.assertEqual('project-merge', buildset0_builds[0]['job_name'])
- self.assertEqual('SUCCESS', buildset0_builds[0]['result'])
- self.assertTrue(buildset0_builds[0]['final'])
-
- self.assertEqual('project-test1', buildset0_builds[1]['job_name'])
- self.assertEqual('RETRY', buildset0_builds[1]['result'])
- self.assertFalse(buildset0_builds[1]['final'])
- self.assertEqual('project-test2', buildset0_builds[2]['job_name'])
- self.assertEqual('RETRY', buildset0_builds[2]['result'])
- self.assertFalse(buildset0_builds[2]['final'])
-
- self.assertEqual('project-test1', buildset0_builds[3]['job_name'])
- self.assertEqual('SUCCESS', buildset0_builds[3]['result'])
- self.assertTrue(buildset0_builds[3]['final'])
- self.assertEqual('project-test2', buildset0_builds[4]['job_name'])
- self.assertEqual('SUCCESS', buildset0_builds[4]['result'])
- self.assertTrue(buildset0_builds[4]['final'])
+ self.assertEqual('project-merge', buildset0_builds[0].job_name)
+ self.assertEqual('SUCCESS', buildset0_builds[0].result)
+ self.assertTrue(buildset0_builds[0].final)
+
+ self.assertEqual('project-test1', buildset0_builds[1].job_name)
+ self.assertEqual('RETRY', buildset0_builds[1].result)
+ self.assertFalse(buildset0_builds[1].final)
+ self.assertEqual('project-test2', buildset0_builds[2].job_name)
+ self.assertEqual('RETRY', buildset0_builds[2].result)
+ self.assertFalse(buildset0_builds[2].final)
+
+ self.assertEqual('project-test1', buildset0_builds[3].job_name)
+ self.assertEqual('SUCCESS', buildset0_builds[3].result)
+ self.assertTrue(buildset0_builds[3].final)
+ self.assertEqual('project-test2', buildset0_builds[4].job_name)
+ self.assertEqual('SUCCESS', buildset0_builds[4].result)
+ self.assertTrue(buildset0_builds[4].final)
self.executor_server.hold_jobs_in_build = True
@@ -430,7 +430,7 @@ class TestSQLConnectionMysql(ZuulTestCase):
engine.connect() as conn:
result = conn.execute(
- sa.sql.select([reporter.connection.zuul_buildset_table])
+ sa.sql.select(reporter.connection.zuul_buildset_table)
)
buildsets = result.fetchall()
@@ -439,10 +439,10 @@ class TestSQLConnectionMysql(ZuulTestCase):
buildset0_builds = conn.execute(
sa.sql.select(
- [reporter.connection.zuul_build_table]
+ reporter.connection.zuul_build_table
).where(
reporter.connection.zuul_build_table.c.buildset_id ==
- buildset0['id']
+ buildset0.id
)
).fetchall()
@@ -488,7 +488,7 @@ class TestSQLConnectionMysql(ZuulTestCase):
engine.connect() as conn:
result = conn.execute(
- sa.sql.select([reporter.connection.zuul_buildset_table])
+ sa.sql.select(reporter.connection.zuul_buildset_table)
)
buildsets = result.fetchall()
@@ -497,10 +497,10 @@ class TestSQLConnectionMysql(ZuulTestCase):
buildset0_builds = conn.execute(
sa.sql.select(
- [reporter.connection.zuul_build_table]
+ reporter.connection.zuul_build_table
).where(
reporter.connection.zuul_build_table.c.buildset_id ==
- buildset0['id']
+ buildset0.id
)
).fetchall()
diff --git a/tests/unit/test_gerrit.py b/tests/unit/test_gerrit.py
index 2e3057af6..2a63d5ef8 100644
--- a/tests/unit/test_gerrit.py
+++ b/tests/unit/test_gerrit.py
@@ -957,3 +957,48 @@ class TestGerritConnection(ZuulTestCase):
self.assertEqual(B.queried, 2)
self.assertEqual(A.data['status'], 'MERGED')
self.assertEqual(B.data['status'], 'MERGED')
+
+
+class TestGerritUnicodeRefs(ZuulTestCase):
+ config_file = 'zuul-gerrit-web.conf'
+ tenant_config_file = 'config/single-tenant/main.yaml'
+
+ upload_pack_data = (b'014452944ee370db5c87691e62e0f9079b6281319b4e HEAD'
+ b'\x00multi_ack thin-pack side-band side-band-64k '
+ b'ofs-delta shallow deepen-since deepen-not '
+ b'deepen-relative no-progress include-tag '
+ b'multi_ack_detailed allow-tip-sha1-in-want '
+ b'allow-reachable-sha1-in-want '
+ b'symref=HEAD:refs/heads/faster filter '
+ b'object-format=sha1 agent=git/2.37.1.gl1\n'
+ b'003d5f42665d737b3fd4ec22ca0209e6191859f09fd6 '
+ b'refs/for/faster\n'
+ b'004952944ee370db5c87691e62e0f9079b6281319b4e '
+ b'refs/heads/foo/\xf0\x9f\x94\xa5\xf0\x9f\x94\xa5'
+ b'\xf0\x9f\x94\xa5\n'
+ b'003f52944ee370db5c87691e62e0f9079b6281319b4e '
+ b'refs/heads/faster\n0000').decode("utf-8")
+
+ def test_mb_unicode_refs(self):
+ gerrit_config = {
+ 'user': 'gerrit',
+ 'server': 'localhost',
+ }
+ driver = GerritDriver()
+ gerrit = GerritConnection(driver, 'review_gerrit', gerrit_config)
+
+ def _uploadPack(project):
+ return self.upload_pack_data
+
+ self.patch(gerrit, '_uploadPack', _uploadPack)
+
+ project = gerrit.source.getProject('org/project')
+ refs = gerrit.getInfoRefs(project)
+
+ self.assertEqual(refs,
+ {'refs/for/faster':
+ '5f42665d737b3fd4ec22ca0209e6191859f09fd6',
+ 'refs/heads/foo/🔥🔥🔥':
+ '52944ee370db5c87691e62e0f9079b6281319b4e',
+ 'refs/heads/faster':
+ '52944ee370db5c87691e62e0f9079b6281319b4e'})
diff --git a/tests/unit/test_git_driver.py b/tests/unit/test_git_driver.py
index 06e2ac7c8..95fca30d3 100644
--- a/tests/unit/test_git_driver.py
+++ b/tests/unit/test_git_driver.py
@@ -62,7 +62,8 @@ class TestGitDriver(ZuulTestCase):
# Update zuul.yaml to force a tenant reconfiguration
path = os.path.join(self.upstream_root, 'common-config', 'zuul.yaml')
- config = yaml.safe_load(open(path, 'r').read())
+ with open(path, 'r') as f:
+ config = yaml.safe_load(f)
change = {
'name': 'org/project',
'check': {
diff --git a/tests/unit/test_github_driver.py b/tests/unit/test_github_driver.py
index e3706440b..fb46aa7d1 100644
--- a/tests/unit/test_github_driver.py
+++ b/tests/unit/test_github_driver.py
@@ -18,8 +18,10 @@ import re
from testtools.matchers import MatchesRegex, Not, StartsWith
import urllib
import socket
+import threading
import time
import textwrap
+from concurrent.futures import ThreadPoolExecutor
from unittest import mock, skip
import git
@@ -32,10 +34,11 @@ from zuul.zk.layout import LayoutState
from zuul.lib import strings
from zuul.merger.merger import Repo
from zuul.model import MergeRequest, EnqueueEvent, DequeueEvent
+from zuul.zk.change_cache import ChangeKey
from tests.base import (AnsibleZuulTestCase, BaseTestCase,
ZuulGithubAppTestCase, ZuulTestCase,
- simple_layout, random_sha1)
+ simple_layout, random_sha1, iterate_timeout)
from tests.base import ZuulWebFixture
EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}, -1)
@@ -1484,6 +1487,44 @@ class TestGithubDriver(ZuulTestCase):
"rebase not supported",
str(loading_errors[0].error))
+ @simple_layout("layouts/basic-github.yaml", driver="github")
+ def test_concurrent_get_change(self):
+ """
+ Test that getting a change concurrently returns the same
+ object from the cache.
+ """
+ conn = self.scheds.first.sched.connections.connections["github"]
+
+ # Create a new change object and remove it from the cache so
+ # the concurrent call will try to create a new change object.
+ A = self.fake_github.openFakePullRequest("org/project", "master", "A")
+ change_key = ChangeKey(conn.connection_name, "org/project",
+ "PullRequest", str(A.number), str(A.head_sha))
+ change = conn.getChange(change_key, refresh=True)
+ conn._change_cache.delete(change_key)
+
+ # Acquire the update lock so the concurrent get task needs to
+        # wait for the lock to be released.
+ lock = conn._change_update_lock.setdefault(change_key,
+ threading.Lock())
+ lock.acquire()
+ try:
+ executor = ThreadPoolExecutor(max_workers=1)
+ task = executor.submit(conn.getChange, change_key, refresh=True)
+ for _ in iterate_timeout(5, "task to be running"):
+ if task.running():
+ break
+ # Add the change back so the waiting task can get the
+ # change from the cache.
+ conn._change_cache.set(change_key, change)
+ finally:
+ lock.release()
+ executor.shutdown()
+
+ other_change = task.result()
+ self.assertIsNotNone(other_change.cache_stat)
+ self.assertIs(change, other_change)
+
class TestMultiGithubDriver(ZuulTestCase):
config_file = 'zuul-multi-github.conf'
diff --git a/tests/unit/test_merger_repo.py b/tests/unit/test_merger_repo.py
index 7806db347..f907cb8b4 100644
--- a/tests/unit/test_merger_repo.py
+++ b/tests/unit/test_merger_repo.py
@@ -163,6 +163,64 @@ class TestMergerRepo(ZuulTestCase):
work_repo.reset()
work_repo.checkout("foobar")
+ def test_rebase_merge_conflict_abort(self):
+ """Test that a failed rebase is properly aborted and related
+ directories are cleaned up."""
+ parent_path = os.path.join(self.upstream_root, 'org/project1')
+ parent_repo = git.Repo(parent_path)
+ parent_repo.create_head("feature")
+
+ files = {"test.txt": "master"}
+ self.create_commit("org/project1", files=files, head="master",
+ message="Add master file")
+
+ files = {"test.txt": "feature"}
+ self.create_commit("org/project1", files=files, head="feature",
+ message="Add feature file")
+
+ work_repo = Repo(parent_path, self.workspace_root,
+ "none@example.org", "User Name", "0", "0")
+
+ item = {"ref": "refs/heads/feature"}
+ # We expect the rebase to fail because of a conflict, but the
+ # rebase will be aborted.
+ with testtools.ExpectedException(git.exc.GitCommandError):
+ work_repo.rebaseMerge(item, "master")
+
+ # Assert that the failed rebase doesn't leave any temporary
+ # directories behind.
+ self.assertFalse(
+ os.path.exists(f"{work_repo.local_path}/.git/rebase-merge"))
+ self.assertFalse(
+ os.path.exists(f"{work_repo.local_path}/.git/rebase-apply"))
+
+ def test_rebase_merge_conflict_reset_cleanup(self):
+ """Test temporary directories of a failed rebase merge are
+ removed on repo reset."""
+ parent_path = os.path.join(self.upstream_root, 'org/project1')
+ parent_repo = git.Repo(parent_path)
+ parent_repo.create_head("feature")
+
+ files = {"master.txt": "master"}
+ self.create_commit("org/project1", files=files, head="master",
+ message="Add master file")
+
+ files = {"feature.txt": "feature"}
+ self.create_commit("org/project1", files=files, head="feature",
+ message="Add feature file")
+
+ work_repo = Repo(parent_path, self.workspace_root,
+ "none@example.org", "User Name", "0", "0")
+
+ # Simulate leftovers from a failed rebase
+ os.mkdir(f"{work_repo.local_path}/.git/rebase-merge")
+ os.mkdir(f"{work_repo.local_path}/.git/rebase-apply")
+
+ # Resetting the repo should clean up any leaked directories
+ work_repo.reset()
+ item = {"ref": "refs/heads/feature"}
+ work_repo.rebaseMerge(item, "master")
+
def test_set_refs(self):
parent_path = os.path.join(self.upstream_root, 'org/project1')
remote_sha = self.create_commit('org/project1')
diff --git a/tests/unit/test_model_upgrade.py b/tests/unit/test_model_upgrade.py
index a5a49bed4..c6cdee7ea 100644
--- a/tests/unit/test_model_upgrade.py
+++ b/tests/unit/test_model_upgrade.py
@@ -293,6 +293,33 @@ class TestModelUpgrade(ZuulTestCase):
result='SUCCESS', changes='1,1'),
], ordered=False)
+ @model_version(12)
+ def test_model_12_13(self):
+ # Initially queue items will still have the full trigger event
+ # stored in Zookeeper. The trigger event will be converted to
+ # an event info object after the model API update.
+ self.executor_server.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.builds), 1)
+
+ # Upgrade our component
+ self.model_test_component_info.model_api = 13
+
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ self.assertHistory([
+ dict(name='project-merge', result='SUCCESS', changes='1,1'),
+ dict(name='project-test1', result='SUCCESS', changes='1,1'),
+ dict(name='project-test2', result='SUCCESS', changes='1,1'),
+ dict(name='project1-project2-integration',
+ result='SUCCESS', changes='1,1'),
+ ], ordered=False)
+
class TestGithubModelUpgrade(ZuulTestCase):
config_file = 'zuul-github-driver.conf'
diff --git a/tests/unit/test_reporting.py b/tests/unit/test_reporting.py
index 2cf93cdcb..0c5c5fbc9 100644
--- a/tests/unit/test_reporting.py
+++ b/tests/unit/test_reporting.py
@@ -151,7 +151,7 @@ class TestReporting(ZuulTestCase):
engine.connect() as conn:
result = conn.execute(
- sa.sql.select([reporter.connection.zuul_buildset_table]))
+ sa.sql.select(reporter.connection.zuul_buildset_table))
buildsets = result.fetchall()
for x in buildsets:
@@ -180,7 +180,7 @@ class TestReporting(ZuulTestCase):
engine.connect() as conn:
result = conn.execute(
- sa.sql.select([reporter.connection.zuul_buildset_table]))
+ sa.sql.select(reporter.connection.zuul_buildset_table))
buildsets = result.fetchall()
for x in buildsets:
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 35e632d46..172ed34dc 100644
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -54,7 +54,7 @@ from tests.base import (
from zuul.zk.change_cache import ChangeKey
from zuul.zk.event_queues import PIPELINE_NAME_ROOT
from zuul.zk.layout import LayoutState
-from zuul.zk.locks import management_queue_lock
+from zuul.zk.locks import management_queue_lock, pipeline_lock
from zuul.zk import zkobject
EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}, -1)
@@ -461,6 +461,7 @@ class TestScheduler(ZuulTestCase):
'zuul.mergers.online', value='1', kind='g')
self.assertReportedStat('zuul.scheduler.eventqueues.connection.gerrit',
value='0', kind='g')
+ self.assertReportedStat('zuul.scheduler.run_handler', kind='ms')
# Catch time / monotonic errors
for key in [
@@ -490,9 +491,10 @@ class TestScheduler(ZuulTestCase):
'zuul.tenant.tenant-one.pipeline.gate.write_objects',
'zuul.tenant.tenant-one.pipeline.gate.read_znodes',
'zuul.tenant.tenant-one.pipeline.gate.write_znodes',
- 'zuul.tenant.tenant-one.pipeline.gate.read_bytes',
'zuul.tenant.tenant-one.pipeline.gate.write_bytes',
]:
+ # 'zuul.tenant.tenant-one.pipeline.gate.read_bytes' is
+ # expected to be zero since it's initialized after reading
val = self.assertReportedStat(key, kind='g')
self.assertTrue(0.0 < float(val) < 60000.0)
@@ -3587,8 +3589,11 @@ class TestScheduler(ZuulTestCase):
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
- with self.createZKContext() as ctx,\
- gate.manager.currentContext(ctx):
+ with pipeline_lock(
+ self.zk_client, tenant.name,
+ gate.name) as lock,\
+ self.createZKContext(lock) as ctx,\
+ gate.manager.currentContext(ctx):
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1.canonical_name, None)
@@ -3610,8 +3615,11 @@ class TestScheduler(ZuulTestCase):
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
- with self.createZKContext() as ctx,\
- gate.manager.currentContext(ctx):
+ with pipeline_lock(
+ self.zk_client, tenant.name,
+ gate.name) as lock,\
+ self.createZKContext(lock) as ctx,\
+ gate.manager.currentContext(ctx):
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1.canonical_name, None)
@@ -3633,8 +3641,11 @@ class TestScheduler(ZuulTestCase):
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
- with self.createZKContext() as ctx,\
- gate.manager.currentContext(ctx):
+ with pipeline_lock(
+ self.zk_client, tenant.name,
+ gate.name) as lock,\
+ self.createZKContext(lock) as ctx,\
+ gate.manager.currentContext(ctx):
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1.canonical_name, None)
@@ -3655,8 +3666,11 @@ class TestScheduler(ZuulTestCase):
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
- with self.createZKContext() as ctx,\
- gate.manager.currentContext(ctx):
+ with pipeline_lock(
+ self.zk_client, tenant.name,
+ gate.name) as lock,\
+ self.createZKContext(lock) as ctx,\
+ gate.manager.currentContext(ctx):
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1.canonical_name, None)
@@ -3678,8 +3692,11 @@ class TestScheduler(ZuulTestCase):
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
- with self.createZKContext() as ctx,\
- gate.manager.currentContext(ctx):
+ with pipeline_lock(
+ self.zk_client, tenant.name,
+ gate.name) as lock,\
+ self.createZKContext(lock) as ctx,\
+ gate.manager.currentContext(ctx):
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1.canonical_name, None)
@@ -3943,6 +3960,10 @@ class TestScheduler(ZuulTestCase):
else:
time.sleep(0)
+ self.assertGreater(new.last_reconfigured, old.last_reconfigured)
+ self.assertGreater(new.last_reconfigure_event_ltime,
+ old.last_reconfigure_event_ltime)
+
def test_tenant_reconfiguration_command_socket(self):
"Test that single-tenant reconfiguration via command socket works"
@@ -6200,7 +6221,8 @@ For CI problems and help debugging, contact ci@example.org"""
build = self.getBuildByName('check-job')
inv_path = os.path.join(build.jobdir.root, 'ansible', 'inventory.yaml')
- inventory = yaml.safe_load(open(inv_path, 'r'))
+ with open(inv_path, 'r') as f:
+ inventory = yaml.safe_load(f)
label = inventory['all']['hosts']['controller']['nodepool']['label']
self.assertEqual('slow-label', label)
diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py
index de8b8f3ad..004ede862 100644
--- a/tests/unit/test_v3.py
+++ b/tests/unit/test_v3.py
@@ -1557,6 +1557,43 @@ class TestInRepoConfig(ZuulTestCase):
'start_line': 5},
})
+ def test_dynamic_config_job_anchors(self):
+ # Test the use of anchors in job configuration. This is a
+ # regression test designed to catch a failure where we freeze
+ # the first job and in doing so, mutate the vars dict. The
+ # intended behavior is that the two jobs end up with two
+ # separate python objects for their vars dicts.
+ in_repo_conf = textwrap.dedent(
+ """
+ - job:
+ name: myvars
+ vars: &anchor
+ plugins:
+ foo: bar
+
+ - job:
+ name: project-test1
+ timeout: 999999999999
+ vars: *anchor
+
+ - project:
+ name: org/project
+ check:
+ jobs:
+ - project-test1
+ """)
+
+ file_dict = {'.zuul.yaml': in_repo_conf}
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
+ files=file_dict)
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+ self.assertEqual(A.reported, 1,
+ "A should report failure")
+ self.assertEqual(A.patchsets[0]['approvals'][0]['value'], "-1")
+ self.assertIn('max-job-timeout', A.messages[0])
+ self.assertHistory([])
+
def test_dynamic_config_non_existing_job_in_template(self):
"""Test that requesting a non existent job fails"""
in_repo_conf = textwrap.dedent(
diff --git a/tests/unit/test_zk.py b/tests/unit/test_zk.py
index 7e3c19dfe..b5697ee36 100644
--- a/tests/unit/test_zk.py
+++ b/tests/unit/test_zk.py
@@ -18,6 +18,7 @@ import json
import queue
import threading
import uuid
+from unittest import mock
import testtools
@@ -53,10 +54,12 @@ from tests.base import (
BaseTestCase, HoldableExecutorApi, HoldableMergerApi,
iterate_timeout
)
-from zuul.zk.zkobject import ShardedZKObject, ZKObject, ZKContext
+from zuul.zk.zkobject import (
+ ShardedZKObject, ZKObject, ZKContext
+)
from zuul.zk.locks import tenant_write_lock
-from kazoo.exceptions import ZookeeperError, OperationTimeoutError
+from kazoo.exceptions import ZookeeperError, OperationTimeoutError, NoNodeError
class ZooKeeperBaseTestCase(BaseTestCase):
@@ -2037,3 +2040,80 @@ class TestBlobStore(ZooKeeperBaseTestCase):
with testtools.ExpectedException(KeyError):
bs.get(path)
+
+
+class TestPipelineInit(ZooKeeperBaseTestCase):
+ # Test the initialize-on-refresh code paths of various pipeline objects
+
+ def test_pipeline_state_new_object(self):
+ # Test the initialize-on-refresh code path with no existing object
+ tenant = model.Tenant('tenant')
+ pipeline = model.Pipeline('gate', tenant)
+ layout = model.Layout(tenant)
+ tenant.layout = layout
+ pipeline.state = model.PipelineState.create(
+ pipeline, pipeline.state)
+ context = ZKContext(self.zk_client, None, None, self.log)
+ pipeline.state.refresh(context)
+ self.assertTrue(self.zk_client.client.exists(pipeline.state.getPath()))
+ self.assertEqual(pipeline.state.layout_uuid, layout.uuid)
+
+ def test_pipeline_state_existing_object(self):
+ # Test the initialize-on-refresh code path with a pre-existing object
+ tenant = model.Tenant('tenant')
+ pipeline = model.Pipeline('gate', tenant)
+ layout = model.Layout(tenant)
+ tenant.layout = layout
+ pipeline.manager = mock.Mock()
+ pipeline.state = model.PipelineState.create(
+ pipeline, pipeline.state)
+ pipeline.change_list = model.PipelineChangeList.create(
+ pipeline)
+ context = ZKContext(self.zk_client, None, None, self.log)
+ # We refresh the change list here purely for the side effect
+ # of creating the pipeline state object with no data (the list
+ # is a subpath of the state object).
+ pipeline.change_list.refresh(context)
+ pipeline.state.refresh(context)
+ self.assertTrue(
+ self.zk_client.client.exists(pipeline.change_list.getPath()))
+ self.assertTrue(self.zk_client.client.exists(pipeline.state.getPath()))
+ self.assertEqual(pipeline.state.layout_uuid, layout.uuid)
+
+ def test_pipeline_change_list_new_object(self):
+ # Test the initialize-on-refresh code path with no existing object
+ tenant = model.Tenant('tenant')
+ pipeline = model.Pipeline('gate', tenant)
+ layout = model.Layout(tenant)
+ tenant.layout = layout
+ pipeline.state = model.PipelineState.create(
+ pipeline, pipeline.state)
+ pipeline.change_list = model.PipelineChangeList.create(
+ pipeline)
+ context = ZKContext(self.zk_client, None, None, self.log)
+ pipeline.change_list.refresh(context)
+ self.assertTrue(
+ self.zk_client.client.exists(pipeline.change_list.getPath()))
+ pipeline.manager = mock.Mock()
+ pipeline.state.refresh(context)
+ self.assertEqual(pipeline.state.layout_uuid, layout.uuid)
+
+ def test_pipeline_change_list_new_object_without_lock(self):
+ # Test the initialize-on-refresh code path if we don't have
+ # the lock. This should fail.
+ tenant = model.Tenant('tenant')
+ pipeline = model.Pipeline('gate', tenant)
+ layout = model.Layout(tenant)
+ tenant.layout = layout
+ pipeline.state = model.PipelineState.create(
+ pipeline, pipeline.state)
+ pipeline.change_list = model.PipelineChangeList.create(
+ pipeline)
+ context = ZKContext(self.zk_client, None, None, self.log)
+ with testtools.ExpectedException(NoNodeError):
+ pipeline.change_list.refresh(context, allow_init=False)
+ self.assertIsNone(
+ self.zk_client.client.exists(pipeline.change_list.getPath()))
+ pipeline.manager = mock.Mock()
+ pipeline.state.refresh(context)
+ self.assertEqual(pipeline.state.layout_uuid, layout.uuid)
diff --git a/tools/docker-compose.yaml b/tools/docker-compose.yaml
index 83ab9f930..05b4905e2 100644
--- a/tools/docker-compose.yaml
+++ b/tools/docker-compose.yaml
@@ -3,7 +3,7 @@ version: "3"
services:
mysql:
container_name: zuul-test-mysql
- image: mysql:5.7
+ image: mysql:8.0
environment:
- MYSQL_ROOT_PASSWORD=insecure_worker
ports:
diff --git a/tools/test-setup-docker.sh b/tools/test-setup-docker.sh
index a0fcf9f5a..1601b11a7 100755
--- a/tools/test-setup-docker.sh
+++ b/tools/test-setup-docker.sh
@@ -58,7 +58,8 @@ timeout 30 bash -c "until ${ROOTCMD} ${MYSQL} -e 'show databases'; do sleep 0.5;
echo
echo "Setting up permissions for zuul tests"
-${ROOTCMD} ${MYSQL} -e "GRANT ALL PRIVILEGES ON *.* TO 'openstack_citest'@'%' identified by 'openstack_citest' WITH GRANT OPTION;"
+${ROOTCMD} ${MYSQL} -e "CREATE USER 'openstack_citest'@'%' identified by 'openstack_citest';"
+${ROOTCMD} ${MYSQL} -e "GRANT ALL PRIVILEGES ON *.* TO 'openstack_citest'@'%' WITH GRANT OPTION;"
${ROOTCMD} ${MYSQL} -u openstack_citest -popenstack_citest -e "SET default_storage_engine=MYISAM; DROP DATABASE IF EXISTS openstack_citest; CREATE DATABASE openstack_citest CHARACTER SET utf8;"
echo "Finished"
diff --git a/web/public/openapi.yaml b/web/public/openapi.yaml
index d69111cf8..cb2e10b37 100644
--- a/web/public/openapi.yaml
+++ b/web/public/openapi.yaml
@@ -249,7 +249,7 @@ paths:
- tenant
/api/tenant/{tenant}/key/{project}.pub:
get:
- operationId: get-project-key
+ operationId: get-project-secrets-key
parameters:
- description: The tenant name
in: path
@@ -275,12 +275,44 @@ paths:
'
schema:
- description: The project public key
+ description: The project secrets public key in PKCS8 format
type: string
- description: Returns the project public key
+ description: Returns the project public key that is used to encrypt secrets
'404':
description: Tenant or Project not found
- summary: Get a project public key
+ summary: Get a project public key that is used to encrypt secrets
+ tags:
+ - tenant
+ /api/tenant/{tenant}/project-ssh-key/{project}.pub:
+ get:
+ operationId: get-project-ssh-key
+ parameters:
+ - description: The tenant name
+ in: path
+ name: tenant
+ required: true
+ schema:
+ type: string
+ - description: The project name
+ in: path
+ name: project
+ required: true
+ schema:
+ type: string
+ responses:
+ '200':
+ content:
+ text/plain:
+ example: 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAACA
+
+ '
+ schema:
+ description: The project ssh public key in SSH2 format
+ type: string
+        description: Returns the project public key that the executor adds to the SSH agent
+ '404':
+ description: Tenant or Project not found
+ summary: Get a project public key that is used for SSH in post-merge pipelines
tags:
- tenant
/api/tenant/{tenant}/semaphores:
diff --git a/zuul/ansible/logconfig.py b/zuul/ansible/logconfig.py
index 66881336a..2d7c37463 100644
--- a/zuul/ansible/logconfig.py
+++ b/zuul/ansible/logconfig.py
@@ -140,7 +140,8 @@ def _read_config_file(filename: str):
raise ValueError("Unable to read logging config file at %s" % filename)
if os.path.splitext(filename)[1] in ('.yml', '.yaml', '.json'):
- return yaml.safe_load(open(filename, 'r'))
+ with open(filename, 'r') as f:
+ return yaml.safe_load(f)
return filename
diff --git a/zuul/configloader.py b/zuul/configloader.py
index 16bff1401..fe22fe0f8 100644
--- a/zuul/configloader.py
+++ b/zuul/configloader.py
@@ -437,6 +437,30 @@ def ansible_vars_dict(value):
ansible_var_name(key)
+def copy_safe_config(conf):
+ """Return a deep copy of a config dictionary.
+
+ This lets us assign values of a config dictionary to configuration
+ objects, even if those values are nested dictionaries. This way
+ we can safely freeze the configuration object (the process of
+ which mutates dictionaries) without mutating the original
+ configuration.
+
+    Meanwhile, this does retain the original context information as a
+    single object, since some behaviors (e.g., pragma) rely on mutating
+    the source context.
+
+ """
+ ret = copy.deepcopy(conf)
+ for key in (
+ '_source_context',
+ '_start_mark',
+ ):
+ if key in conf:
+ ret[key] = conf[key]
+ return ret
+
+
class PragmaParser(object):
pragma = {
'implied-branch-matchers': bool,
@@ -452,6 +476,7 @@ class PragmaParser(object):
self.pcontext = pcontext
def fromYaml(self, conf):
+ conf = copy_safe_config(conf)
self.schema(conf)
bm = conf.get('implied-branch-matchers')
@@ -512,6 +537,7 @@ class NodeSetParser(object):
return vs.Schema(nodeset)
def fromYaml(self, conf, anonymous=False):
+ conf = copy_safe_config(conf)
if anonymous:
self.anon_schema(conf)
self.anonymous = True
@@ -599,6 +625,7 @@ class SecretParser(object):
return vs.Schema(secret)
def fromYaml(self, conf):
+ conf = copy_safe_config(conf)
self.schema(conf)
s = model.Secret(conf['name'], conf['_source_context'])
s.source_context = conf['_source_context']
@@ -723,6 +750,7 @@ class JobParser(object):
def fromYaml(self, conf, project_pipeline=False, name=None,
validate=True):
+ conf = copy_safe_config(conf)
if validate:
self.schema(conf)
@@ -1075,6 +1103,7 @@ class ProjectTemplateParser(object):
return vs.Schema(project)
def fromYaml(self, conf, validate=True, freeze=True):
+ conf = copy_safe_config(conf)
if validate:
self.schema(conf)
source_context = conf['_source_context']
@@ -1165,6 +1194,7 @@ class ProjectParser(object):
return vs.Schema(project)
def fromYaml(self, conf):
+ conf = copy_safe_config(conf)
self.schema(conf)
project_name = conf.get('name')
@@ -1328,6 +1358,7 @@ class PipelineParser(object):
return vs.Schema(pipeline)
def fromYaml(self, conf):
+ conf = copy_safe_config(conf)
self.schema(conf)
pipeline = model.Pipeline(conf['name'], self.pcontext.tenant)
pipeline.source_context = conf['_source_context']
@@ -1469,6 +1500,7 @@ class SemaphoreParser(object):
return vs.Schema(semaphore)
def fromYaml(self, conf):
+ conf = copy_safe_config(conf)
self.schema(conf)
semaphore = model.Semaphore(conf['name'], conf.get('max', 1))
semaphore.source_context = conf.get('_source_context')
@@ -1494,6 +1526,7 @@ class QueueParser:
return vs.Schema(queue)
def fromYaml(self, conf):
+ conf = copy_safe_config(conf)
self.schema(conf)
queue = model.Queue(
conf['name'],
@@ -1523,6 +1556,7 @@ class AuthorizationRuleParser(object):
return vs.Schema(authRule)
def fromYaml(self, conf):
+ conf = copy_safe_config(conf)
self.schema(conf)
a = model.AuthZRuleTree(conf['name'])
@@ -1556,6 +1590,7 @@ class GlobalSemaphoreParser(object):
return vs.Schema(semaphore)
def fromYaml(self, conf):
+ conf = copy_safe_config(conf)
self.schema(conf)
semaphore = model.Semaphore(conf['name'], conf.get('max', 1),
global_scope=True)
@@ -1576,6 +1611,7 @@ class ApiRootParser(object):
return vs.Schema(api_root)
def fromYaml(self, conf):
+ conf = copy_safe_config(conf)
self.schema(conf)
api_root = model.ApiRoot(conf.get('authentication-realm'))
api_root.access_rules = conf.get('access-rules', [])
@@ -1697,7 +1733,7 @@ class TenantParser(object):
'disallowed-labels': to_list(str),
'allow-circular-dependencies': bool,
'default-parent': str,
- 'default-ansible-version': vs.Any(str, float),
+ 'default-ansible-version': vs.Any(str, float, int),
'access-rules': to_list(str),
'admin-rules': to_list(str),
'semaphores': to_list(str),
@@ -1782,8 +1818,8 @@ class TenantParser(object):
# Set default ansible version
default_ansible_version = conf.get('default-ansible-version')
if default_ansible_version is not None:
- # The ansible version can be interpreted as float by yaml so make
- # sure it's a string.
+ # The ansible version can be interpreted as float or int
+ # by yaml so make sure it's a string.
default_ansible_version = str(default_ansible_version)
ansible_manager.requestVersion(default_ansible_version)
else:
@@ -2223,6 +2259,12 @@ class TenantParser(object):
job.source_context.branch)
with self.unparsed_config_cache.writeLock(
job.source_context.project_canonical_name):
+ # Prevent files cache ltime from going backward
+ if files_cache.ltime >= job.ltime:
+ self.log.info(
+ "Discarding job %s result since the files cache was "
+ "updated in the meantime", job)
+ continue
# Since the cat job returns all required config files
# for ALL tenants the project is a part of, we can
# clear the whole cache and then populate it with the
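
The behaviour copy_safe_config aims for can be illustrated with a tiny, self-contained sketch (the conf dict here is hypothetical): nested values are deep-copied so that freezing the resulting object later cannot mutate the original configuration, while the context objects keep their identity because some behaviors (e.g., pragma handling) rely on mutating the shared source context.

    import copy

    source_context = object()          # stands in for a source context object
    conf = {
        "_source_context": source_context,
        "vars": {"plugins": {"foo": "bar"}},
    }

    safe = copy.deepcopy(conf)
    safe["_source_context"] = conf["_source_context"]

    assert safe["_source_context"] is source_context   # identity preserved
    assert safe["vars"] is not conf["vars"]            # nested dict copied
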
diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py
index 0a1f0ee61..276365e1d 100644
--- a/zuul/driver/gerrit/gerritconnection.py
+++ b/zuul/driver/gerrit/gerritconnection.py
@@ -1643,7 +1643,10 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
def getInfoRefs(self, project: Project) -> Dict[str, str]:
try:
- data = self._uploadPack(project)
+ # Encode the UTF-8 data back to a byte array, as the size of
+ # each record in the pack is in bytes, and so the slicing must
+ # also be done on a byte-basis.
+ data = self._uploadPack(project).encode("utf-8")
except Exception:
self.log.error("Cannot get references from %s" % project)
raise # keeps error information
@@ -1662,7 +1665,9 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
plen -= 4
if len(data) - i < plen:
raise Exception("Invalid data in info/refs")
- line = data[i:i + plen]
+ # Once the pack data is sliced, we can safely decode it back
+ # into a (UTF-8) string.
+ line = data[i:i + plen].decode("utf-8")
i += plen
if not read_advertisement:
read_advertisement = True
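
The reasoning in the comments above can be seen with a short sketch (the ref name is taken from the new test; everything else is illustrative): the pkt-line length is a byte count, so a multi-byte UTF-8 ref name would be truncated if the advertised length were applied to the decoded string instead of the raw bytes.

    ref = "refs/heads/foo/\U0001f525"            # one emoji character
    data = ref.encode("utf-8")

    assert len(ref) == 16                        # characters
    assert len(data) == 19                       # bytes on the wire

    line = data[:19].decode("utf-8")             # slice bytes, then decode
    assert line == ref
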
diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py
index 631d98998..182c83bae 100644
--- a/zuul/driver/github/githubconnection.py
+++ b/zuul/driver/github/githubconnection.py
@@ -1461,6 +1461,7 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
log.debug("Change %s is currently being updated, "
"waiting for it to finish", change)
with lock:
+ change = self._change_cache.get(change_key)
log.debug('Finished updating change %s', change)
return change
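
The one-line fix above re-reads the change from the cache after the update lock is released, so a thread that lost the race returns the freshly updated object rather than its stale local reference. A rough sketch of that pattern with invented names (cache, update_lock, fetch) rather than the real connection internals:

    import threading

    cache = {}
    update_lock = threading.Lock()

    def get_change(key, fetch):
        change = cache.get(key)
        if change is not None:
            return change
        if update_lock.acquire(blocking=False):
            try:
                change = cache[key] = fetch(key)
            finally:
                update_lock.release()
        else:
            # Another thread is updating; wait for it, then reuse its result.
            with update_lock:
                change = cache.get(key)
        return change
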
diff --git a/zuul/driver/gitlab/gitlabconnection.py b/zuul/driver/gitlab/gitlabconnection.py
index 40cac241a..1515db8df 100644
--- a/zuul/driver/gitlab/gitlabconnection.py
+++ b/zuul/driver/gitlab/gitlabconnection.py
@@ -607,17 +607,12 @@ class GitlabConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
return change
project = self.source.getProject(change_key.project_name)
if not change:
- if not event:
- self.log.error("Change %s not found in cache and no event",
- change_key)
- if event:
- url = event.change_url
change = MergeRequest(project.name)
change.project = project
change.number = number
# patch_number is the tips commit SHA of the MR
change.patchset = change_key.revision
- change.url = url or self.getMRUrl(project.name, number)
+ change.url = self.getMRUrl(project.name, number)
change.uris = [change.url.split('://', 1)[-1]] # remove scheme
log.debug("Getting change mr#%s from project %s" % (
diff --git a/zuul/driver/sql/alembic/env.py b/zuul/driver/sql/alembic/env.py
index da7b3207f..17b67805e 100644
--- a/zuul/driver/sql/alembic/env.py
+++ b/zuul/driver/sql/alembic/env.py
@@ -53,7 +53,8 @@ def run_migrations_online():
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix='sqlalchemy.',
- poolclass=pool.NullPool)
+ poolclass=pool.NullPool,
+ future=True)
# we can get the table prefix via the tag object
tag = context.get_tag_argument()
diff --git a/zuul/driver/sql/alembic/versions/60c119eb1e3f_use_build_set_results.py b/zuul/driver/sql/alembic/versions/60c119eb1e3f_use_build_set_results.py
index 67581a6f9..1735d35f3 100644
--- a/zuul/driver/sql/alembic/versions/60c119eb1e3f_use_build_set_results.py
+++ b/zuul/driver/sql/alembic/versions/60c119eb1e3f_use_build_set_results.py
@@ -24,13 +24,16 @@ def upgrade(table_prefix=''):
connection = op.get_bind()
connection.execute(
- """
- UPDATE {buildset_table}
- SET result=(
- SELECT CASE score
- WHEN 1 THEN 'SUCCESS'
- ELSE 'FAILURE' END)
- """.format(buildset_table=table_prefix + BUILDSET_TABLE))
+ sa.text(
+ """
+ UPDATE {buildset_table}
+ SET result=(
+ SELECT CASE score
+ WHEN 1 THEN 'SUCCESS'
+ ELSE 'FAILURE' END)
+ """.format(buildset_table=table_prefix + BUILDSET_TABLE)
+ )
+ )
op.drop_column(table_prefix + BUILDSET_TABLE, 'score')
diff --git a/zuul/driver/sql/alembic/versions/c7467b642498_buildset_updated.py b/zuul/driver/sql/alembic/versions/c7467b642498_buildset_updated.py
index abfba7247..99d12d750 100644
--- a/zuul/driver/sql/alembic/versions/c7467b642498_buildset_updated.py
+++ b/zuul/driver/sql/alembic/versions/c7467b642498_buildset_updated.py
@@ -34,13 +34,16 @@ def upgrade(table_prefix=''):
connection = op.get_bind()
connection.execute(
- """
- UPDATE {buildset_table}
- SET updated=greatest(
- coalesce(first_build_start_time, '1970-01-01 00:00:00'),
- coalesce(last_build_end_time, '1970-01-01 00:00:00'),
- coalesce(event_timestamp, '1970-01-01 00:00:00'))
- """.format(buildset_table=table_prefix + "zuul_buildset"))
+ sa.text(
+ """
+ UPDATE {buildset_table}
+ SET updated=greatest(
+ coalesce(first_build_start_time, '1970-01-01 00:00:00'),
+ coalesce(last_build_end_time, '1970-01-01 00:00:00'),
+ coalesce(event_timestamp, '1970-01-01 00:00:00'))
+ """.format(buildset_table=table_prefix + "zuul_buildset")
+ )
+ )
def downgrade():
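
Both migration tweaks above follow from the same SQLAlchemy 2.0 rule noted for the test changes: Connection.execute() no longer accepts a bare SQL string, so raw statements must be wrapped in sa.text(). A minimal, self-contained illustration (the table and values are made up):

    import sqlalchemy as sa

    engine = sa.create_engine("sqlite://")
    with engine.begin() as conn:
        conn.execute(sa.text("CREATE TABLE t (x INTEGER)"))
        conn.execute(sa.text("UPDATE t SET x = :x"), {"x": 1})
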
diff --git a/zuul/driver/sql/sqlconnection.py b/zuul/driver/sql/sqlconnection.py
index b89653bba..2d5c39ec3 100644
--- a/zuul/driver/sql/sqlconnection.py
+++ b/zuul/driver/sql/sqlconnection.py
@@ -308,27 +308,31 @@ class SQLConnection(BaseConnection):
def _migrate(self, revision='head'):
"""Perform the alembic migrations for this connection"""
+ # Note that this method needs to be called with an external lock held.
+        # The reason for this is that we retrieve the alembic version and
+        # run the alembic migrations in different database transactions,
+        # which opens us to races without an external lock.
with self.engine.begin() as conn:
context = alembic.migration.MigrationContext.configure(conn)
current_rev = context.get_current_revision()
- self.log.debug('Current migration revision: %s' % current_rev)
-
- config = alembic.config.Config()
- config.set_main_option("script_location",
- "zuul:driver/sql/alembic")
- config.set_main_option("sqlalchemy.url",
- self.connection_config.get('dburi').
- replace('%', '%%'))
-
- # Alembic lets us add arbitrary data in the tag argument. We can
- # leverage that to tell the upgrade scripts about the table prefix.
- tag = {'table_prefix': self.table_prefix}
-
- if current_rev is None and not self.force_migrations:
- self.metadata.create_all(self.engine)
- alembic.command.stamp(config, revision, tag=tag)
- else:
- alembic.command.upgrade(config, revision, tag=tag)
+ self.log.debug('Current migration revision: %s' % current_rev)
+
+ config = alembic.config.Config()
+ config.set_main_option("script_location",
+ "zuul:driver/sql/alembic")
+ config.set_main_option("sqlalchemy.url",
+ self.connection_config.get('dburi').
+ replace('%', '%%'))
+
+ # Alembic lets us add arbitrary data in the tag argument. We can
+ # leverage that to tell the upgrade scripts about the table prefix.
+ tag = {'table_prefix': self.table_prefix}
+
+ if current_rev is None and not self.force_migrations:
+ self.metadata.create_all(self.engine)
+ alembic.command.stamp(config, revision, tag=tag)
+ else:
+ alembic.command.upgrade(config, revision, tag=tag)
def onLoad(self, zk_client, component_registry=None):
safe_connection = quote_plus(self.connection_name)
diff --git a/zuul/lib/repl.py b/zuul/lib/repl.py
index ecefae9ea..63a800406 100644
--- a/zuul/lib/repl.py
+++ b/zuul/lib/repl.py
@@ -26,14 +26,14 @@ class ThreadLocalProxy(object):
self.default = default
def __getattr__(self, name):
- obj = self.files.get(threading.currentThread(), self.default)
+ obj = self.files.get(threading.current_thread(), self.default)
return getattr(obj, name)
def register(self, obj):
- self.files[threading.currentThread()] = obj
+ self.files[threading.current_thread()] = obj
def unregister(self):
- self.files.pop(threading.currentThread())
+ self.files.pop(threading.current_thread())
class REPLHandler(socketserver.StreamRequestHandler):
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index bf49737dd..e87e553d3 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -28,6 +28,8 @@ from zuul.model import (
)
from zuul.zk.change_cache import ChangeKey
from zuul.zk.components import COMPONENT_REGISTRY
+from zuul.zk.exceptions import LockException
+from zuul.zk.locks import pipeline_lock
from opentelemetry import trace
@@ -95,21 +97,46 @@ class PipelineManager(metaclass=ABCMeta):
def _postConfig(self):
layout = self.pipeline.tenant.layout
self.buildChangeQueues(layout)
- with self.sched.createZKContext(None, self.log) as ctx,\
- self.currentContext(ctx):
- # Make sure we have state and change list objects, and
- # ensure that they exist in ZK. We don't hold the
- # pipeline lock, but if they don't exist, that means they
- # are new, so no one else will either, so the write on
- # create is okay. If they do exist and we have an old
- # object, we'll just reuse it. If it does exist and we
- # don't have an old object, we'll get a new empty one.
- # Regardless, these will not automatically refresh now, so
- # they will be out of date until they are refreshed later.
- self.pipeline.state = PipelineState.create(
- self.pipeline, layout.uuid, self.pipeline.state)
- self.pipeline.change_list = PipelineChangeList.create(
- self.pipeline)
+ # Make sure we have state and change list objects. We
+ # don't actually ensure they exist in ZK here; these are
+ # just local objects until they are serialized the first
+ # time. Since we don't hold the pipeline lock, we can't
+ # reliably perform any read or write operations; we just
+ # need to ensure we have in-memory objects to work with
+ # and they will be initialized or loaded on the next
+ # refresh.
+
+ # These will be out of date until they are refreshed later.
+ self.pipeline.state = PipelineState.create(
+ self.pipeline, self.pipeline.state)
+ self.pipeline.change_list = PipelineChangeList.create(
+ self.pipeline)
+
+ # Now, try to acquire a non-blocking pipeline lock and refresh
+ # them for the side effect of initializing them if necessary.
+ # In the case of a new pipeline, no one else should have a
+ # lock anyway, and this helps us avoid emitting a whole bunch
+ # of errors elsewhere on startup when these objects don't
+ # exist. If the pipeline already exists and we can't acquire
+ # the lock, that's fine, we're much less likely to encounter
+ # read errors elsewhere in that case anyway.
+ try:
+ with pipeline_lock(
+ self.sched.zk_client, self.pipeline.tenant.name,
+ self.pipeline.name, blocking=False) as lock,\
+ self.sched.createZKContext(lock, self.log) as ctx,\
+ self.currentContext(ctx):
+ if not self.pipeline.state.exists(ctx):
+ # We only do this if the pipeline doesn't exist in
+ # ZK because in that case, this process should be
+ # fast since it's empty. If it does exist,
+ # refreshing it may be slow and since other actors
+ # won't encounter errors due to its absence, we
+ # would rather defer the work to later.
+ self.pipeline.state.refresh(ctx)
+ self.pipeline.change_list.refresh(ctx)
+ except LockException:
+ pass
def buildChangeQueues(self, layout):
self.log.debug("Building relative_priority queues")
@@ -610,7 +637,7 @@ class PipelineManager(metaclass=ABCMeta):
if enqueue_time:
item.enqueue_time = enqueue_time
item.live = live
- self.reportStats(item, added=True)
+ self.reportStats(item, trigger_event=event)
item.quiet = quiet
if item.live:
@@ -2204,7 +2231,7 @@ class PipelineManager(metaclass=ABCMeta):
log.error("Reporting item %s received: %s", item, ret)
return action, (not ret)
- def reportStats(self, item, added=False):
+ def reportStats(self, item, trigger_event=None):
if not self.sched.statsd:
return
try:
@@ -2243,18 +2270,21 @@ class PipelineManager(metaclass=ABCMeta):
if dt:
self.sched.statsd.timing(key + '.resident_time', dt)
self.sched.statsd.incr(key + '.total_changes')
- if added and hasattr(item.event, 'arrived_at_scheduler_timestamp'):
+ if (
+ trigger_event
+ and hasattr(trigger_event, 'arrived_at_scheduler_timestamp')
+ ):
now = time.time()
- arrived = item.event.arrived_at_scheduler_timestamp
+ arrived = trigger_event.arrived_at_scheduler_timestamp
processing = (now - arrived) * 1000
- elapsed = (now - item.event.timestamp) * 1000
+ elapsed = (now - trigger_event.timestamp) * 1000
self.sched.statsd.timing(
basekey + '.event_enqueue_processing_time',
processing)
self.sched.statsd.timing(
basekey + '.event_enqueue_time', elapsed)
self.reportPipelineTiming('event_enqueue_time',
- item.event.timestamp)
+ trigger_event.timestamp)
except Exception:
self.log.exception("Exception reporting pipeline stats")
diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py
index 1e2de27b8..e4688a1b7 100644
--- a/zuul/merger/merger.py
+++ b/zuul/merger/merger.py
@@ -333,6 +333,26 @@ class Repo(object):
os.rmdir(root)
@staticmethod
+ def _cleanup_leaked_rebase_dirs(local_path, log, messages):
+ for rebase_dir in [".git/rebase-merge", ".git/rebase-apply"]:
+ leaked_dir = os.path.join(local_path, rebase_dir)
+ if not os.path.exists(leaked_dir):
+ continue
+ if log:
+ log.debug("Cleaning leaked %s dir", leaked_dir)
+ else:
+ messages.append(
+ f"Cleaning leaked {leaked_dir} dir")
+ try:
+ shutil.rmtree(leaked_dir)
+ except Exception as exc:
+ msg = f"Failed to remove leaked {leaked_dir} dir:"
+ if log:
+ log.exception(msg)
+ else:
+ messages.append(f"{msg}\n{exc}")
+
+ @staticmethod
def refNameToZuulRef(ref_name: str) -> str:
return "refs/zuul/{}".format(
hashlib.sha1(ref_name.encode("utf-8")).hexdigest()
@@ -384,6 +404,8 @@ class Repo(object):
messages.append("Delete stale Zuul ref {}".format(ref))
Repo._deleteRef(ref.path, repo)
+ Repo._cleanup_leaked_rebase_dirs(local_path, log, messages)
+
         # Note: Before git 2.13 deleting a ref foo/bar leaves an empty
# directory foo behind that will block creating the reference foo
# in the future. As a workaround we must clean up empty directories
@@ -615,7 +637,11 @@ class Repo(object):
self.fetch(ref, zuul_event_id=zuul_event_id)
log.debug("Rebasing %s with args %s", ref, args)
repo.git.checkout('FETCH_HEAD')
- repo.git.rebase(*args)
+ try:
+ repo.git.rebase(*args)
+ except Exception:
+ repo.git.rebase(abort=True)
+ raise
return repo.head.commit
def fetch(self, ref, zuul_event_id=None):
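
A standalone sketch of the abort-on-failure rebase introduced above, using GitPython directly; the repository path and rebase arguments are placeholders, and FETCH_HEAD is assumed to have been populated by a prior fetch as in the hunk:

import git

def rebase_or_abort(repo_path, rebase_args):
    repo = git.Repo(repo_path)
    repo.git.checkout("FETCH_HEAD")
    try:
        # GitPython turns the positional args into "git rebase <args>".
        repo.git.rebase(*rebase_args)
    except git.GitCommandError:
        # A failed rebase leaves .git/rebase-merge or .git/rebase-apply
        # behind; aborting restores the work tree before re-raising, so
        # later resets don't find a leaked rebase in progress.
        repo.git.rebase(abort=True)
        raise
    return repo.head.commit
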
diff --git a/zuul/model.py b/zuul/model.py
index 8e0ae4eee..e526b749c 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -620,6 +620,18 @@ class PipelineState(zkobject.ZKObject):
_read_only=False,
)
+ def _lateInitData(self):
+ # If we're initializing the object on our initial refresh,
+ # reset the data to this.
+ return dict(
+ state=Pipeline.STATE_NORMAL,
+ queues=[],
+ old_queues=[],
+ consecutive_failures=0,
+ disabled=False,
+ layout_uuid=self.pipeline.tenant.layout.uuid,
+ )
+
@classmethod
def fromZK(klass, context, path, pipeline, **kw):
obj = klass()
@@ -631,21 +643,23 @@ class PipelineState(zkobject.ZKObject):
return obj
@classmethod
- def create(cls, pipeline, layout_uuid, old_state=None):
- # If the object does not exist in ZK, create it with the
- # default attributes and the supplied layout UUID. Otherwise,
- # return an initialized object (or the old object for reuse)
- # without loading any data so that data can be loaded on the
- # next refresh.
- ctx = pipeline.manager.current_context
+ def create(cls, pipeline, old_state=None):
+ # If we are resetting an existing pipeline, we will have an
+ # old_state, so just clean up the object references there and
+ # let the next refresh handle updating any data.
+ if old_state:
+ old_state._resetObjectRefs()
+ return old_state
+
+ # Otherwise, we are initializing a pipeline that we haven't
+ # seen before. It still might exist in ZK, but since we
+ # haven't seen it, we don't have any object references to
+ # clean up. We can just start with a clean object, set the
+ # pipeline reference, and let the next refresh deal with
+ # whether there might be any data in ZK.
state = cls()
state._set(pipeline=pipeline)
- if state.exists(ctx):
- if old_state:
- old_state._resetObjectRefs()
- return old_state
- return state
- return cls.new(ctx, pipeline=pipeline, layout_uuid=layout_uuid)
+ return state
def _resetObjectRefs(self):
# Update the pipeline references on the queue objects.
@@ -712,8 +726,34 @@ class PipelineState(zkobject.ZKObject):
# This is so that we can refresh the object in circumstances
# where we haven't verified that our local layout matches
# what's in ZK.
+
+ # Notably, this need not prevent us from performing the
+ # initialization below if necessary. The case of the object
+        # being brand new in ZK supersedes our worry that our old copy
+ # might be out of date since our old copy is, itself, brand
+ # new.
self._set(_read_only=read_only)
- return super().refresh(context)
+ try:
+ return super().refresh(context)
+ except NoNodeError:
+ # If the object doesn't exist we will receive a
+ # NoNodeError. This happens because the postConfig call
+ # creates this object without holding the pipeline lock,
+ # so it can't determine whether or not it exists in ZK.
+ # We do hold the pipeline lock here, so if we get this
+ # error, we know we're initializing the object, and we
+ # should write it to ZK.
+
+ # Note that typically this code is not used since
+ # currently other objects end up creating the pipeline
+ # path in ZK first. It is included in case that ever
+ # changes. Currently the empty byte-string code path in
+ # deserialize() is used instead.
+ context.log.warning("Initializing pipeline state for %s; "
+ "this is expected only for new pipelines",
+ self.pipeline.name)
+ self._set(**self._lateInitData())
+ self.internalCreate(context)
def deserialize(self, raw, context):
# We may have old change objects in the pipeline cache, so
@@ -721,6 +761,20 @@ class PipelineState(zkobject.ZKObject):
# source change cache.
self.pipeline.manager.clearCache()
+ # If the object doesn't exist we will get back an empty byte
+ # string. This happens because the postConfig call creates
+ # this object without holding the pipeline lock, so it can't
+ # determine whether or not it exists in ZK. We do hold the
+ # pipeline lock here, so if we get the empty byte string, we
+ # know we're initializing the object. In that case, we should
+ # initialize the layout id to the current layout. Nothing
+ # else needs to be set.
+ if raw == b'':
+ context.log.warning("Initializing pipeline state for %s; "
+ "this is expected only for new pipelines",
+ self.pipeline.name)
+ return self._lateInitData()
+
data = super().deserialize(raw, context)
if not self._read_only:
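
Condensed sketch of the two late-initialization paths added to PipelineState above (a missing node raising NoNodeError, and an empty read), both falling back to default data; the paths, defaults, and helper are illustrative, and the caller is assumed to hold the pipeline lock:

import json

from kazoo.exceptions import NoNodeError

DEFAULTS = {"state": "normal", "queues": [], "old_queues": [],
            "consecutive_failures": 0, "disabled": False}

def load_pipeline_state(zk, path):
    try:
        raw, _ = zk.get(path)
    except NoNodeError:
        # The node was never written: create it with defaults now that
        # we hold the lock, so other readers stop seeing errors.
        zk.create(path, json.dumps(DEFAULTS).encode("utf8"), makepath=True)
        return dict(DEFAULTS)
    if raw == b"":
        # Another actor created the path but wrote no data yet; treat
        # it exactly like a brand new object.
        return dict(DEFAULTS)
    return json.loads(raw)
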
@@ -898,9 +952,31 @@ class PipelineChangeList(zkobject.ShardedZKObject):
_change_keys=[],
)
- def refresh(self, context):
- self._retry(context, super().refresh,
- context, max_tries=5)
+ def refresh(self, context, allow_init=True):
+ # Set allow_init to false to indicate that we don't hold the
+ # lock and we should not try to initialize the object in ZK if
+ # it does not exist.
+ try:
+ self._retry(context, super().refresh,
+ context, max_tries=5)
+ except NoNodeError:
+ # If the object doesn't exist we will receive a
+ # NoNodeError. This happens because the postConfig call
+ # creates this object without holding the pipeline lock,
+ # so it can't determine whether or not it exists in ZK.
+ # We do hold the pipeline lock here, so if we get this
+ # error, we know we're initializing the object, and
+ # we should write it to ZK.
+ if allow_init:
+ context.log.warning(
+ "Initializing pipeline change list for %s; "
+ "this is expected only for new pipelines",
+ self.pipeline.name)
+ self.internalCreate(context)
+ else:
+ # If we're called from a context where we can't
+ # initialize the change list, re-raise the exception.
+ raise
def getPath(self):
return self.getChangeListPath(self.pipeline)
@@ -911,19 +987,14 @@ class PipelineChangeList(zkobject.ShardedZKObject):
return pipeline_path + '/change_list'
@classmethod
- def create(cls, pipeline, old_list=None):
- # If the object does not exist in ZK, create it with the
- # default attributes. Otherwise, return an initialized object
- # (or the old object for reuse) without loading any data so
- # that data can be loaded on the next refresh.
- ctx = pipeline.manager.current_context
+ def create(cls, pipeline):
+        # This object may or may not exist in ZK, but we aren't using
+        # any of that data here.  We can just start with a clean
+        # object, set the pipeline reference, and let the next refresh
+        # deal with whether there might be any data in ZK.
change_list = cls()
change_list._set(pipeline=pipeline)
- if change_list.exists(ctx):
- if old_list:
- return old_list
- return change_list
- return cls.new(ctx, pipeline=pipeline)
+ return change_list
def serialize(self, context):
data = {
@@ -931,8 +1002,8 @@ class PipelineChangeList(zkobject.ShardedZKObject):
}
return json.dumps(data, sort_keys=True).encode("utf8")
- def deserialize(self, data, context):
- data = super().deserialize(data, context)
+ def deserialize(self, raw, context):
+ data = super().deserialize(raw, context)
change_keys = []
# We must have a dictionary with a 'changes' key; otherwise we
# may be reading immediately after truncating. Allow the
@@ -4378,9 +4449,8 @@ class BuildSet(zkobject.ZKObject):
version = build.getZKVersion()
# If zstat is None, we created the object
if version is not None:
- versions = self.build_versions.copy()
- versions[build.uuid] = version + 1
- self.updateAttributes(context, build_versions=versions)
+ self.build_versions[build.uuid] = version + 1
+ self.updateAttributes(context, build_versions=self.build_versions)
def updateJobVersion(self, context, job):
if (COMPONENT_REGISTRY.model_api < 12):
@@ -4388,9 +4458,8 @@ class BuildSet(zkobject.ZKObject):
version = job.getZKVersion()
if version is not None:
- versions = self.job_versions.copy()
- versions[job.name] = version + 1
- self.updateAttributes(context, job_versions=versions)
+ self.job_versions[job.name] = version + 1
+ self.updateAttributes(context, job_versions=self.job_versions)
def shouldRefreshBuild(self, build):
# Unless all schedulers are updating versions, we can't trust
@@ -4398,19 +4467,15 @@ class BuildSet(zkobject.ZKObject):
if (COMPONENT_REGISTRY.model_api < 12):
return True
current = build.getZKVersion()
- if current is None:
- current = -1
expected = self.build_versions.get(build.uuid, 0)
- return expected > current
+ return expected != current
def shouldRefreshJob(self, job):
if (COMPONENT_REGISTRY.model_api < 12):
return True
current = job.getZKVersion()
- if current is None:
- current = -1
expected = self.job_versions.get(job.name, 0)
- return expected > current
+ return expected != current
@property
def ref(self):
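
The refresh decision above reduced to a few lines; any mismatch between the recorded and observed ZK versions (including a freshly created object whose local version is still None) now triggers a refresh:

def should_refresh(recorded_versions, key, current_version):
    # The buildset records the version it last wrote for each build or
    # job; 0 is assumed when nothing was recorded yet.
    expected = recorded_versions.get(key, 0)
    # Only an exact match means our copy is current.
    return expected != current_version

versions = {"build-uuid-1": 4}
assert should_refresh(versions, "build-uuid-1", 3)       # stale copy
assert not should_refresh(versions, "build-uuid-1", 4)   # up to date
assert should_refresh(versions, "build-uuid-2", None)    # just created
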
@@ -4601,6 +4666,37 @@ class BuildSet(zkobject.ZKObject):
return Attributes(uuid=self.uuid)
+class EventInfo:
+
+ def __init__(self):
+ self.zuul_event_id = None
+ self.timestamp = time.time()
+ self.span_context = None
+
+ @classmethod
+ def fromEvent(cls, event):
+ tinfo = cls()
+ tinfo.zuul_event_id = event.zuul_event_id
+ tinfo.timestamp = event.timestamp
+ tinfo.span_context = event.span_context
+ return tinfo
+
+ @classmethod
+ def fromDict(cls, d):
+ tinfo = cls()
+ tinfo.zuul_event_id = d["zuul_event_id"]
+ tinfo.timestamp = d["timestamp"]
+ tinfo.span_context = d["span_context"]
+ return tinfo
+
+ def toDict(self):
+ return {
+ "zuul_event_id": self.zuul_event_id,
+ "timestamp": self.timestamp,
+ "span_context": self.span_context,
+ }
+
+
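
A quick round-trip of the new EventInfo container (the import assumes a source tree with this change applied); only the three fields needed later for logging, metrics, and tracing survive serialization instead of the full trigger event:

import time

from zuul.model import EventInfo  # requires this change to be applied

class FakeTriggerEvent:
    # Only the attributes EventInfo.fromEvent() reads are needed here.
    zuul_event_id = "abc123"
    timestamp = time.time()
    span_context = None

info = EventInfo.fromEvent(FakeTriggerEvent())
restored = EventInfo.fromDict(info.toDict())
assert restored.zuul_event_id == "abc123"
assert restored.timestamp == FakeTriggerEvent.timestamp
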
class QueueItem(zkobject.ZKObject):
"""Represents the position of a Change in a ChangeQueue.
@@ -4635,7 +4731,7 @@ class QueueItem(zkobject.ZKObject):
live=True, # Whether an item is intended to be processed at all
layout_uuid=None,
_cached_sql_results={},
- event=None, # The trigger event that lead to this queue item
+            event=None,  # Info about the event that led to this queue item
             # Additional container for connection specific information to be
# used by reporters throughout the lifecycle
@@ -4657,6 +4753,9 @@ class QueueItem(zkobject.ZKObject):
def new(klass, context, **kw):
obj = klass()
obj._set(**kw)
+ if COMPONENT_REGISTRY.model_api >= 13:
+ obj._set(event=obj.event and EventInfo.fromEvent(obj.event))
+
data = obj._trySerialize(context)
obj._save(context, data, create=True)
files_state = (BuildSet.COMPLETE if obj.change.files is not None
@@ -4685,10 +4784,18 @@ class QueueItem(zkobject.ZKObject):
return (tenant, pipeline, uuid)
def serialize(self, context):
- if isinstance(self.event, TriggerEvent):
- event_type = "TriggerEvent"
+ if COMPONENT_REGISTRY.model_api < 13:
+ if isinstance(self.event, TriggerEvent):
+ event_type = "TriggerEvent"
+ else:
+ event_type = self.event.__class__.__name__
else:
- event_type = self.event.__class__.__name__
+ event_type = "EventInfo"
+ if not isinstance(self.event, EventInfo):
+ # Convert our local trigger event to a trigger info
+ # object. This will only happen on the transition to
+ # model API version 13.
+ self._set(event=EventInfo.fromEvent(self.event))
data = {
"uuid": self.uuid,
@@ -4730,14 +4837,18 @@ class QueueItem(zkobject.ZKObject):
# child objects.
self._set(uuid=data["uuid"])
- event_type = data["event"]["type"]
- if event_type == "TriggerEvent":
- event_class = (
- self.pipeline.manager.sched.connections.getTriggerEventClass(
- data["event"]["data"]["driver_name"])
- )
+ if COMPONENT_REGISTRY.model_api < 13:
+ event_type = data["event"]["type"]
+ if event_type == "TriggerEvent":
+ event_class = (
+ self.pipeline.manager.sched.connections
+ .getTriggerEventClass(
+ data["event"]["data"]["driver_name"])
+ )
+ else:
+ event_class = EventTypeIndex.event_type_mapping.get(event_type)
else:
- event_class = EventTypeIndex.event_type_mapping.get(event_type)
+ event_class = EventInfo
if event_class is None:
raise NotImplementedError(
@@ -5959,8 +6070,7 @@ class Bundle:
def deserialize(cls, context, queue, items_by_path, data):
bundle = cls(data["uuid"])
bundle.items = [
- items_by_path.get(p) or QueueItem.fromZK(
- context, p, pipeline=queue.pipeline, queue=queue)
+ items_by_path.get(p) or QueueItem.fromZK(context, p, queue=queue)
for p in data["items"]
]
bundle.started_reporting = data["started_reporting"]
diff --git a/zuul/model_api.py b/zuul/model_api.py
index ccb12077d..0244296dd 100644
--- a/zuul/model_api.py
+++ b/zuul/model_api.py
@@ -14,4 +14,4 @@
# When making ZK schema changes, increment this and add a record to
# doc/source/developer/model-changelog.rst
-MODEL_API = 12
+MODEL_API = 13
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 2b78b4c84..cd15a878c 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -1528,7 +1528,7 @@ class Scheduler(threading.Thread):
with self.createZKContext(lock, self.log) as ctx:
if tenant is not None:
self._reconfigureTenant(ctx, min_ltimes,
- -1,
+ event.zuul_event_ltime,
tenant, old_tenant)
else:
self._reconfigureDeleteTenant(ctx, old_tenant)
@@ -1548,7 +1548,6 @@ class Scheduler(threading.Thread):
# This is called in the scheduler loop after another thread submits
# a request
if self.unparsed_abide.ltime < self.system_config_cache.ltime:
- self.log.debug("Updating system config")
self.updateSystemConfig()
with self.layout_lock:
@@ -2126,70 +2125,74 @@ class Scheduler(threading.Thread):
return
self.log.debug("Run handler awake")
self.run_handler_lock.acquire()
- try:
- if not self._stopped:
- self.process_reconfigure_queue()
+ with self.statsd_timer("zuul.scheduler.run_handler"):
+ try:
+ self._run()
+ except Exception:
+ self.log.exception("Exception in run handler:")
+ # There may still be more events to process
+ self.wake_event.set()
+ finally:
+ self.run_handler_lock.release()
- if self.unparsed_abide.ltime < self.system_config_cache.ltime:
- self.updateSystemConfig()
+ def _run(self):
+ if not self._stopped:
+ self.process_reconfigure_queue()
- for tenant_name in self.unparsed_abide.tenants:
- if self._stopped:
- break
+ if self.unparsed_abide.ltime < self.system_config_cache.ltime:
+ self.updateSystemConfig()
- tenant = self.abide.tenants.get(tenant_name)
- if not tenant:
- continue
+ for tenant_name in self.unparsed_abide.tenants:
+ if self._stopped:
+ break
- # This will also forward events for the pipelines
- # (e.g. enqueue or dequeue events) to the matching
- # pipeline event queues that are processed afterwards.
- self.process_tenant_management_queue(tenant)
+ tenant = self.abide.tenants.get(tenant_name)
+ if not tenant:
+ continue
- if self._stopped:
- break
+ # This will also forward events for the pipelines
+ # (e.g. enqueue or dequeue events) to the matching
+ # pipeline event queues that are processed afterwards.
+ self.process_tenant_management_queue(tenant)
- try:
- with tenant_read_lock(
- self.zk_client, tenant_name, blocking=False
- ) as tlock:
- if not self.isTenantLayoutUpToDate(tenant_name):
- continue
+ if self._stopped:
+ break
- # Get tenant again, as it might have been updated
- # by a tenant reconfig or layout change.
- tenant = self.abide.tenants[tenant_name]
+ try:
+ with tenant_read_lock(
+ self.zk_client, tenant_name, blocking=False
+ ) as tlock:
+ if not self.isTenantLayoutUpToDate(tenant_name):
+ continue
- if not self._stopped:
- # This will forward trigger events to pipeline
- # event queues that are processed below.
- self.process_tenant_trigger_queue(tenant)
+ # Get tenant again, as it might have been updated
+ # by a tenant reconfig or layout change.
+ tenant = self.abide.tenants[tenant_name]
- self.process_pipelines(tenant, tlock)
- except LockException:
- self.log.debug("Skipping locked tenant %s",
- tenant.name)
- remote_state = self.tenant_layout_state.get(
- tenant_name)
- local_state = self.local_layout_state.get(
- tenant_name)
- if (remote_state is None or
- local_state is None or
- remote_state > local_state):
- # Let's keep looping until we've updated to the
- # latest tenant layout.
- self.wake_event.set()
- except Exception:
- self.log.exception("Exception processing tenant %s:",
- tenant_name)
- # There may still be more events to process
- self.wake_event.set()
+ if not self._stopped:
+ # This will forward trigger events to pipeline
+ # event queues that are processed below.
+ self.process_tenant_trigger_queue(tenant)
+
+ self.process_pipelines(tenant, tlock)
+ except LockException:
+ self.log.debug("Skipping locked tenant %s",
+ tenant.name)
+ remote_state = self.tenant_layout_state.get(
+ tenant_name)
+ local_state = self.local_layout_state.get(
+ tenant_name)
+ if (remote_state is None or
+ local_state is None or
+ remote_state > local_state):
+ # Let's keep looping until we've updated to the
+ # latest tenant layout.
+ self.wake_event.set()
except Exception:
- self.log.exception("Exception in run handler:")
+ self.log.exception("Exception processing tenant %s:",
+ tenant_name)
# There may still be more events to process
self.wake_event.set()
- finally:
- self.run_handler_lock.release()
def primeSystemConfig(self):
with self.layout_lock:
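
The zuul.scheduler.run_handler timer added above wraps one full iteration of the run handler. A minimal sketch of the same shape, using the plain statsd package as a stand-in for Zuul's statsd helper:

import statsd

client = statsd.StatsClient("localhost", 8125, prefix="zuul")

def run_once():
    # Placeholder for one scheduler iteration (reconfigure queue,
    # tenant management/trigger queues, pipeline processing).
    pass

with client.timer("scheduler.run_handler"):
    try:
        run_once()
    except Exception:
        # In the real handler the error is logged and the loop keeps
        # going; either way the timer records how long the iteration
        # took.
        pass
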
@@ -2207,6 +2210,7 @@ class Scheduler(threading.Thread):
def updateSystemConfig(self):
with self.layout_lock:
+ self.log.debug("Updating system config")
self.unparsed_abide, self.globals = self.system_config_cache.get()
self.ansible_manager = AnsibleManager(
default_version=self.globals.default_ansible_version)
@@ -2241,7 +2245,12 @@ class Scheduler(threading.Thread):
self.zk_client, tenant.name, pipeline.name,
blocking=False) as lock,\
self.createZKContext(lock, self.log) as ctx:
+ self.log.debug("Processing pipeline %s in tenant %s",
+ pipeline.name, tenant.name)
with pipeline.manager.currentContext(ctx):
+ if ((tenant.name, pipeline.name) in
+ self._profile_pipelines):
+ ctx.profile = True
with self.statsd_timer(f'{stats_key}.handling'):
refreshed = self._process_pipeline(
tenant, pipeline)
@@ -2310,14 +2319,10 @@ class Scheduler(threading.Thread):
stats_key = f'zuul.tenant.{tenant.name}.pipeline.{pipeline.name}'
ctx = pipeline.manager.current_context
- if (tenant.name, pipeline.name) in self._profile_pipelines:
- ctx.profile = True
with self.statsd_timer(f'{stats_key}.refresh'):
pipeline.change_list.refresh(ctx)
pipeline.summary.refresh(ctx)
pipeline.state.refresh(ctx)
- if (tenant.name, pipeline.name) in self._profile_pipelines:
- ctx.profile = False
pipeline.state.setDirty(self.zk_client.client)
if pipeline.state.old_queues:
@@ -2354,7 +2359,9 @@ class Scheduler(threading.Thread):
for pipeline in tenant.layout.pipelines.values():
self.log.debug("Gather relevant cache items for: %s %s",
tenant.name, pipeline.name)
- pipeline.change_list.refresh(ctx)
+ # This will raise an exception and abort the process if
+ # unable to refresh the change list.
+ pipeline.change_list.refresh(ctx, allow_init=False)
change_keys = pipeline.change_list.getChangeKeys()
relevant_changes = pipeline.manager.resolveChangeKeys(
change_keys)
@@ -2380,11 +2387,21 @@ class Scheduler(threading.Thread):
with trigger_queue_lock(
self.zk_client, tenant.name, blocking=False
):
+ self.log.debug("Processing tenant trigger events in %s",
+ tenant.name)
# Update the pipeline changes
ctx = self.createZKContext(None, self.log)
for pipeline in tenant.layout.pipelines.values():
+ # This will raise an exception if it is unable to
+ # refresh the change list. We will proceed anyway
+ # and use our data from the last time we did
+ # refresh in order to avoid stalling trigger
+ # processing. In this case we may not forward
+ # some events which are related to changes in the
+ # pipeline but don't match the pipeline trigger
+ # criteria.
try:
- pipeline.change_list.refresh(ctx)
+ pipeline.change_list.refresh(ctx, allow_init=False)
except Exception:
self.log.exception(
"Unable to refresh pipeline change list for %s",
@@ -2597,6 +2614,8 @@ class Scheduler(threading.Thread):
"Skipping management event queue for tenant %s",
tenant.name)
return
+ self.log.debug("Processing tenant management events in %s",
+ tenant.name)
self._process_tenant_management_queue(tenant)
except LockException:
self.log.debug("Skipping locked management event queue"
diff --git a/zuul/source/__init__.py b/zuul/source/__init__.py
index 01a683b23..c2487af88 100644
--- a/zuul/source/__init__.py
+++ b/zuul/source/__init__.py
@@ -97,7 +97,7 @@ class BaseSource(object, metaclass=abc.ABCMeta):
# info on subsequent requests we can continue to do the
# requested job work.
try:
- dep = self.getChangeByURL(url, event)
+ return self.getChangeByURL(url, event)
except Exception:
# Note that if the change isn't found dep is None.
# We do not raise in that case and do not need to handle it
@@ -109,7 +109,6 @@ class BaseSource(object, metaclass=abc.ABCMeta):
time.sleep(1)
else:
raise
- return dep
@abc.abstractmethod
def getChangesDependingOn(self, change, projects, tenant):
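
A generic sketch of the control-flow fix above: return the successful lookup from inside the retry loop instead of threading it through a local variable that is only returned after the loop. The fetch callable and retry counts are placeholders:

import time

def get_with_retries(fetch, url, attempts=3, delay=1):
    for attempt in range(attempts):
        try:
            # Return immediately on success rather than falling through
            # to a return statement after the loop.
            return fetch(url)
        except Exception:
            if attempt == attempts - 1:
                raise
            # Assume a transient failure (e.g. connection error) and
            # retry after a short pause.
            time.sleep(delay)
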
diff --git a/zuul/zk/zkobject.py b/zuul/zk/zkobject.py
index b228ecaa4..87d76bca6 100644
--- a/zuul/zk/zkobject.py
+++ b/zuul/zk/zkobject.py
@@ -233,7 +233,18 @@ class ZKObject:
obj._load(context, path=path)
return obj
+ def internalCreate(self, context):
+ """Create the object in ZK from an existing ZKObject
+
+ This should only be used in special circumstances: when we
+ know it's safe to start using a ZKObject before it's actually
+ created in ZK. Normally use .new()
+ """
+ data = self._trySerialize(context)
+ self._save(context, data, create=True)
+
def refresh(self, context):
+
"""Update data from ZK"""
self._load(context)
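
A toy illustration of the difference between .new() and the internalCreate() helper above, using an in-memory dict in place of ZooKeeper; real ZKObjects serialize through a locked ZK context instead:

class ToyObject:
    def __init__(self, **attrs):
        self.__dict__.update(attrs)

    def serialize(self):
        return repr(sorted(self.__dict__.items())).encode("utf8")

    @classmethod
    def new(cls, store, path, **attrs):
        # Normal path: build the object and write it out immediately.
        obj = cls(**attrs)
        store[path] = obj.serialize()
        return obj

    def internal_create(self, store, path):
        # Late path: the object was already built and used in memory
        # (e.g. by postConfig, without the lock); write it out now that
        # the caller holds the lock and knows the node is missing.
        store[path] = self.serialize()

store = {}
late = ToyObject(state="normal", queues=[])
if "/pipelines/check/state" not in store:
    late.internal_create(store, "/pipelines/check/state")
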