-rw-r--r--  Dockerfile | 1
-rw-r--r--  doc/source/developer/model-changelog.rst | 6
-rw-r--r--  doc/source/job-content.rst | 19
-rw-r--r--  doc/source/monitoring.rst | 11
-rw-r--r--  noxfile.py | 6
-rw-r--r--  releasenotes/notes/change_message-18207e18b5dfffd3.yaml | 12
-rw-r--r--  requirements.txt | 2
-rw-r--r--  tests/base.py | 3
-rw-r--r--  tests/fakegitlab.py | 1
-rw-r--r--  tests/fixtures/config/inventory/git/common-config/playbooks/jinja2-message.yaml | 4
-rwxr-xr-x  tests/make_playbooks.py | 3
-rw-r--r--  tests/unit/test_circular_dependencies.py | 93
-rw-r--r--  tests/unit/test_connection.py | 192
-rw-r--r--  tests/unit/test_event_queues.py | 30
-rw-r--r--  tests/unit/test_gerrit.py | 45
-rw-r--r--  tests/unit/test_git_driver.py | 3
-rw-r--r--  tests/unit/test_inventory.py | 23
-rw-r--r--  tests/unit/test_model.py | 4
-rw-r--r--  tests/unit/test_model_upgrade.py | 39
-rw-r--r--  tests/unit/test_reporting.py | 4
-rw-r--r--  tests/unit/test_scheduler.py | 72
-rw-r--r--  tests/unit/test_sos.py | 73
-rw-r--r--  tests/unit/test_streaming.py | 28
-rw-r--r--  tests/unit/test_v3.py | 40
-rw-r--r--  tests/unit/test_zk.py | 84
-rw-r--r--  web/src/App.jsx | 105
-rw-r--r--  web/src/App.test.jsx | 5
-rw-r--r--  web/src/containers/timezone/SelectTz.jsx | 22
-rw-r--r--  web/src/index.css | 15
-rw-r--r--  zuul/ansible/logconfig.py | 3
-rw-r--r--  zuul/configloader.py | 47
-rw-r--r--  zuul/driver/gerrit/gerritconnection.py | 9
-rw-r--r--  zuul/driver/sql/alembic/env.py | 3
-rw-r--r--  zuul/driver/sql/alembic/versions/60c119eb1e3f_use_build_set_results.py | 17
-rw-r--r--  zuul/driver/sql/alembic/versions/c7467b642498_buildset_updated.py | 17
-rw-r--r--  zuul/driver/sql/sqlconnection.py | 40
-rw-r--r--  zuul/executor/common.py | 1
-rw-r--r--  zuul/executor/server.py | 25
-rw-r--r--  zuul/lib/fingergw.py | 30
-rw-r--r--  zuul/lib/repl.py | 6
-rw-r--r--  zuul/lib/streamer_utils.py | 2
-rw-r--r--  zuul/manager/__init__.py | 69
-rw-r--r--  zuul/model.py | 217
-rw-r--r--  zuul/model_api.py | 4
-rw-r--r--  zuul/scheduler.py | 347
-rwxr-xr-x  zuul/web/__init__.py | 2
-rw-r--r--  zuul/zk/event_queues.py | 14
-rw-r--r--  zuul/zk/zkobject.py | 22
48 files changed, 1411 insertions, 409 deletions
diff --git a/Dockerfile b/Dockerfile
index 5a17739f0..df326bd8a 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -29,6 +29,7 @@ ARG REACT_APP_ENABLE_SERVICE_WORKER
# Kubectl/Openshift version/sha
ARG OPENSHIFT_URL=https://mirror.openshift.com/pub/openshift-v4/x86_64/clients/ocp/4.11.20/openshift-client-linux-4.11.20.tar.gz
ARG OPENSHIFT_SHA=74f252c812932425ca19636b2be168df8fe57b114af6b114283975e67d987d11
+ARG PBR_VERSION=
COPY . /tmp/src
COPY --from=js-builder /tmp/src/build /tmp/src/zuul/web/static
diff --git a/doc/source/developer/model-changelog.rst b/doc/source/developer/model-changelog.rst
index a14ff9895..b80979362 100644
--- a/doc/source/developer/model-changelog.rst
+++ b/doc/source/developer/model-changelog.rst
@@ -106,3 +106,9 @@ Version 11
:Prior Zuul version: 8.0.1
:Description: Adds merge_modes to branch cache. Affects schedulers and web.
+
+Version 12
+----------
+:Prior Zuul version: 8.0.1
+:Description: Adds job_versions and build_versions to BuildSet.
+ Affects schedulers.
diff --git a/doc/source/job-content.rst b/doc/source/job-content.rst
index 75044cf1c..d6bb07683 100644
--- a/doc/source/job-content.rst
+++ b/doc/source/job-content.rst
@@ -612,7 +612,6 @@ of item.
The patchset identifier for the change. If a change is
revised, this will have a different value.
-
.. var:: resources
:type: dict
@@ -706,14 +705,18 @@ are available:
The commit or pull request message of the change base64 encoded. Use the
`b64decode` filter in ansible when working with it.
- .. code-block:: yaml
+ .. warning:: This variable is deprecated and will be removed in
+ a future version. Use :var:`zuul.change_message`
+ instead.
+
+ .. var:: change_message
- - hosts: all
- tasks:
- - name: Dump commit message
- copy:
- content: "{{ zuul.message | b64decode }}"
- dest: "{{ zuul.executor.log_root }}/commit-message.txt"
+ The commit or pull request message of the change. When Zuul
+ runs Ansible, this variable is tagged with the ``!unsafe`` YAML
+ tag so that Ansible will not interpolate values into it. Note,
+      however, that the ``inventory.yaml`` file placed in the build's
+      workspace for debugging and inspection purposes does not include
+ the ``!unsafe`` tag.
Branch Items
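
The ``!unsafe`` tag referenced in the documentation change above is a plain YAML local tag that Ansible treats as a marker to skip Jinja templating. A minimal PyYAML sketch of emitting such a tagged scalar (generic PyYAML usage for illustration, not Zuul's actual inventory serializer):

    # Illustrative only: emit a string with the local !unsafe YAML tag so
    # that Ansible skips Jinja templating on it. Generic PyYAML usage,
    # not Zuul's inventory writer.
    import yaml

    class UnsafeText(str):
        """Marker for text that Ansible must not template."""

    def represent_unsafe(dumper, data):
        # Attach the !unsafe tag to the scalar node.
        return dumper.represent_scalar('!unsafe', str(data))

    yaml.SafeDumper.add_representer(UnsafeText, represent_unsafe)

    msg = UnsafeText("Do not expand {{ this }}")
    print(yaml.dump({'change_message': msg}, Dumper=yaml.SafeDumper))
    # prints roughly: change_message: !unsafe Do not expand {{ this }}
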
diff --git a/doc/source/monitoring.rst b/doc/source/monitoring.rst
index 7f99c7f7f..f40bee445 100644
--- a/doc/source/monitoring.rst
+++ b/doc/source/monitoring.rst
@@ -444,6 +444,11 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
Indicates if the executor is paused. 1 means paused else 0.
+ .. stat:: pct_used_hdd
+ :type: gauge
+
+ The used disk on this executor, as a percentage multiplied by 100.
+
.. stat:: pct_used_ram
:type: gauge
@@ -711,6 +716,12 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
The size of the current connection event queue.
+ .. stat:: run_handler
+ :type: timer
+
+ A timer metric reporting the time taken for one scheduler run
+ handler iteration.
+
.. stat:: time_query
:type: timer
diff --git a/noxfile.py b/noxfile.py
index cdba605f0..30058cef7 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -32,7 +32,6 @@ def set_standard_env_vars(session):
set_env(session, 'OS_STDERR_CAPTURE', '1')
set_env(session, 'OS_STDOUT_CAPTURE', '1')
set_env(session, 'OS_TEST_TIMEOUT', '360')
- set_env(session, 'SQLALCHEMY_WARN_20', '1')
session.env['PYTHONWARNINGS'] = ','.join([
'always::DeprecationWarning:zuul.driver.sql.sqlconnection',
'always::DeprecationWarning:tests.base',
@@ -40,12 +39,15 @@ def set_standard_env_vars(session):
'always::DeprecationWarning:zuul.driver.sql.alembic.env',
'always::DeprecationWarning:zuul.driver.sql.alembic.script',
])
+ # Set PYTHONTRACEMALLOC to a value greater than 0 in the calling env
+ # to get tracebacks of that depth for ResourceWarnings. Disabled by
+ # default as this consumes more resources and is slow.
+ set_env(session, 'PYTHONTRACEMALLOC', '0')
@nox.session(python='3')
def bindep(session):
set_standard_env_vars(session)
- set_env(session, 'SQLALCHEMY_WARN_20', '1')
session.install('bindep')
session.run('bindep', 'test')
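
The PYTHONTRACEMALLOC note added above relies on standard CPython behavior: when tracemalloc is tracing, ResourceWarnings are followed by an allocation traceback. A small illustrative script (not part of the change) showing the effect:

    # Illustrative only: run with PYTHONTRACEMALLOC=5 in the environment.
    # The "unclosed file" ResourceWarning below is then followed by an
    # "Object allocated at" traceback (up to 5 frames) into leak_handle().
    import warnings

    # ResourceWarning is ignored by default; make it visible here.
    warnings.simplefilter("always", ResourceWarning)

    def leak_handle():
        open(__file__)  # never closed; warns when the object is collected

    leak_handle()
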
diff --git a/releasenotes/notes/change_message-18207e18b5dfffd3.yaml b/releasenotes/notes/change_message-18207e18b5dfffd3.yaml
new file mode 100644
index 000000000..1fd005684
--- /dev/null
+++ b/releasenotes/notes/change_message-18207e18b5dfffd3.yaml
@@ -0,0 +1,12 @@
+---
+features:
+ - |
+ The change message (commit message, or pull request message
+ depending on the driver) is now available in plain text form
+ annotated with the Ansible `!unsafe` tag as
+ :var:`zuul.change_message`.
+deprecations:
+ - |
+ The base64 encoded version of the change message available as
+ :var:`zuul.message` is deprecated and will be removed in a future
+ version of Zuul. Use :var:`zuul.change_message` instead.
diff --git a/requirements.txt b/requirements.txt
index 082a18043..e12e9cf72 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -18,7 +18,7 @@ PrettyTable>=0.6,<0.8
babel>=1.0
netaddr
kazoo>=2.9.0
-sqlalchemy<2.0.0
+sqlalchemy>=2.0.0
alembic
cryptography>=39.0.0
cachecontrol<0.12.7
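
Raising the SQLAlchemy floor to 2.0 is what drives the mechanical query updates in the test modules below: select() no longer takes a list, result rows are accessed as attributes, and sa.inspect() replaces direct Inspector construction. A minimal sketch of the old and new idioms, using a stand-in table rather than Zuul's schema:

    # Illustrative only: the 1.x idioms this change removes vs. the 2.0
    # idioms the tests now use. The table is a stand-in, not Zuul's schema.
    import sqlalchemy as sa

    engine = sa.create_engine("sqlite://")
    meta = sa.MetaData()
    buildset = sa.Table(
        "example_buildset", meta,
        sa.Column("id", sa.Integer, primary_key=True),
        sa.Column("pipeline", sa.String(255)))
    meta.create_all(engine)

    insp = sa.inspect(engine)  # 2.0 style; replaces Inspector(engine)
    print(insp.get_table_names())

    with engine.connect() as conn:
        conn.execute(buildset.insert().values(pipeline="check"))
        # 1.x: sa.sql.select([buildset]) and row['pipeline']
        # 2.0: select() takes the table directly; rows expose attributes
        row = conn.execute(sa.sql.select(buildset)).fetchall()[0]
        print(row.pipeline)
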
diff --git a/tests/base.py b/tests/base.py
index cdcf63390..dcd316fab 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -1227,6 +1227,7 @@ class GerritWebServer(object):
def stop(self):
self.httpd.shutdown()
self.thread.join()
+ self.httpd.server_close()
class FakeGerritPoller(gerritconnection.GerritPoller):
@@ -4082,6 +4083,7 @@ class WebProxyFixture(fixtures.Fixture):
def _cleanup(self):
self.httpd.shutdown()
self.thread.join()
+ self.httpd.server_close()
class ZuulWebFixture(fixtures.Fixture):
@@ -5622,6 +5624,7 @@ class ZuulTestCase(BaseTestCase):
time.sleep(0.1)
def refreshPipelines(self, sched):
+ ctx = None
for tenant in sched.abide.tenants.values():
with tenant_read_lock(self.zk_client, tenant.name):
for pipeline in tenant.layout.pipelines.values():
diff --git a/tests/fakegitlab.py b/tests/fakegitlab.py
index c4706b3b1..294887af0 100644
--- a/tests/fakegitlab.py
+++ b/tests/fakegitlab.py
@@ -262,3 +262,4 @@ class GitlabWebServer(object):
def stop(self):
self.httpd.shutdown()
self.thread.join()
+ self.httpd.server_close()
diff --git a/tests/fixtures/config/inventory/git/common-config/playbooks/jinja2-message.yaml b/tests/fixtures/config/inventory/git/common-config/playbooks/jinja2-message.yaml
index 834c3cbe8..cea1ffe8c 100644
--- a/tests/fixtures/config/inventory/git/common-config/playbooks/jinja2-message.yaml
+++ b/tests/fixtures/config/inventory/git/common-config/playbooks/jinja2-message.yaml
@@ -4,3 +4,7 @@
copy:
content: "{{ zuul.message | b64decode }}"
dest: "{{ zuul.executor.log_root }}/commit-message.txt"
+ - name: Dump commit message
+ copy:
+ content: "{{ zuul.change_message }}"
+ dest: "{{ zuul.executor.log_root }}/change-message.txt"
diff --git a/tests/make_playbooks.py b/tests/make_playbooks.py
index 93c37bc81..cb7a98096 100755
--- a/tests/make_playbooks.py
+++ b/tests/make_playbooks.py
@@ -40,7 +40,8 @@ def handle_repo(path):
config_path = os.path.join(path, fn)
break
try:
- config = yaml.safe_load(open(config_path))
+ with open(config_path) as f:
+ config = yaml.safe_load(f)
except Exception:
print(" Has yaml errors")
return
diff --git a/tests/unit/test_circular_dependencies.py b/tests/unit/test_circular_dependencies.py
index 74d2cedf0..f534b2596 100644
--- a/tests/unit/test_circular_dependencies.py
+++ b/tests/unit/test_circular_dependencies.py
@@ -2332,6 +2332,52 @@ class TestGerritCircularDependencies(ZuulTestCase):
dict(name="project6-job-t2", result="SUCCESS", changes="1,1 2,1"),
], ordered=False)
+ def test_dependency_refresh(self):
+ # Test that when two changes are put into a cycle, the
+ # dependencies are refreshed and items already in pipelines
+ # are updated.
+ self.executor_server.hold_jobs_in_build = True
+
+ # This simulates the typical workflow where a developer only
+ # knows the change id of changes one at a time.
+ # The first change:
+ A = self.fake_gerrit.addFakeChange("org/project", "master", "A")
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ # Now that it has been uploaded, upload the second change and
+ # point it at the first.
+ # B -> A
+ B = self.fake_gerrit.addFakeChange("org/project", "master", "B")
+ B.data["commitMessage"] = "{}\n\nDepends-On: {}\n".format(
+ B.subject, A.data["url"]
+ )
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ # Now that the second change is known, update the first change
+ # B <-> A
+ A.addPatchset()
+ A.data["commitMessage"] = "{}\n\nDepends-On: {}\n".format(
+ A.subject, B.data["url"]
+ )
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(2))
+ self.waitUntilSettled()
+
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ # A quirk: at the end of this process, the first change in
+ # Gerrit has a complete run because the process of updating it
+ # involves a new patchset that is enqueued. Compare to the
+ # same test in GitHub.
+ self.assertHistory([
+ dict(name="project-job", result="ABORTED", changes="1,1"),
+ dict(name="project-job", result="ABORTED", changes="1,1 2,1"),
+ dict(name="project-job", result="SUCCESS", changes="2,1 1,2"),
+ ], ordered=False)
+
class TestGithubCircularDependencies(ZuulTestCase):
config_file = "zuul-gerrit-github.conf"
@@ -2524,3 +2570,50 @@ class TestGithubCircularDependencies(ZuulTestCase):
B.comments[-1]))
self.assertFalse(re.search('Change .*? is needed',
B.comments[-1]))
+
+ def test_dependency_refresh(self):
+ # Test that when two changes are put into a cycle, the
+ # dependencies are refreshed and items already in pipelines
+ # are updated.
+ self.executor_server.hold_jobs_in_build = True
+
+ # This simulates the typical workflow where a developer only
+ # knows the PR id of changes one at a time.
+ # The first change:
+ A = self.fake_github.openFakePullRequest("gh/project", "master", "A")
+ self.fake_github.emitEvent(A.getPullRequestOpenedEvent())
+ self.waitUntilSettled()
+
+ # Now that it has been uploaded, upload the second change and
+ # point it at the first.
+ # B -> A
+ B = self.fake_github.openFakePullRequest("gh/project", "master", "B")
+ B.body = "{}\n\nDepends-On: {}\n".format(
+ B.subject, A.url
+ )
+ self.fake_github.emitEvent(B.getPullRequestOpenedEvent())
+ self.waitUntilSettled()
+
+ # Now that the second change is known, update the first change
+ # B <-> A
+ A.body = "{}\n\nDepends-On: {}\n".format(
+ A.subject, B.url
+ )
+
+ self.fake_github.emitEvent(A.getPullRequestEditedEvent(A.subject))
+ self.waitUntilSettled()
+
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ # A quirk: at the end of this process, the second PR in GitHub
+ # has a complete run because the process of updating the first
+ # change is not disruptive to the second. Compare to the same
+ # test in Gerrit.
+ self.assertHistory([
+ dict(name="project-job", result="ABORTED",
+ changes=f"{A.number},{A.head_sha}"),
+ dict(name="project-job", result="SUCCESS",
+ changes=f"{A.number},{A.head_sha} {B.number},{B.head_sha}"),
+ ], ordered=False)
diff --git a/tests/unit/test_connection.py b/tests/unit/test_connection.py
index c30e1743d..26a99215e 100644
--- a/tests/unit/test_connection.py
+++ b/tests/unit/test_connection.py
@@ -65,7 +65,7 @@ class TestSQLConnectionMysql(ZuulTestCase):
def _sql_tables_created(self, connection_name):
connection = self.scheds.first.connections.connections[connection_name]
- insp = sa.engine.reflection.Inspector(connection.engine)
+ insp = sa.inspect(connection.engine)
table_prefix = connection.table_prefix
self.assertEqual(self.expected_table_prefix, table_prefix)
@@ -82,7 +82,7 @@ class TestSQLConnectionMysql(ZuulTestCase):
def _sql_indexes_created(self, connection_name):
connection = self.scheds.first.connections.connections[connection_name]
- insp = sa.engine.reflection.Inspector(connection.engine)
+ insp = sa.inspect(connection.engine)
table_prefix = connection.table_prefix
self.assertEqual(self.expected_table_prefix, table_prefix)
@@ -127,7 +127,7 @@ class TestSQLConnectionMysql(ZuulTestCase):
engine.connect() as conn:
result = conn.execute(
- sa.sql.select([reporter.connection.zuul_buildset_table]))
+ sa.sql.select(reporter.connection.zuul_buildset_table))
buildsets = result.fetchall()
self.assertEqual(5, len(buildsets))
@@ -137,107 +137,107 @@ class TestSQLConnectionMysql(ZuulTestCase):
buildset3 = buildsets[3]
buildset4 = buildsets[4]
- self.assertEqual('check', buildset0['pipeline'])
- self.assertEqual('org/project', buildset0['project'])
- self.assertEqual(1, buildset0['change'])
- self.assertEqual('1', buildset0['patchset'])
- self.assertEqual('SUCCESS', buildset0['result'])
- self.assertEqual('Build succeeded.', buildset0['message'])
- self.assertEqual('tenant-one', buildset0['tenant'])
+ self.assertEqual('check', buildset0.pipeline)
+ self.assertEqual('org/project', buildset0.project)
+ self.assertEqual(1, buildset0.change)
+ self.assertEqual('1', buildset0.patchset)
+ self.assertEqual('SUCCESS', buildset0.result)
+ self.assertEqual('Build succeeded.', buildset0.message)
+ self.assertEqual('tenant-one', buildset0.tenant)
self.assertEqual(
- 'https://review.example.com/%d' % buildset0['change'],
- buildset0['ref_url'])
- self.assertNotEqual(None, buildset0['event_id'])
- self.assertNotEqual(None, buildset0['event_timestamp'])
+ 'https://review.example.com/%d' % buildset0.change,
+ buildset0.ref_url)
+ self.assertNotEqual(None, buildset0.event_id)
+ self.assertNotEqual(None, buildset0.event_timestamp)
buildset0_builds = conn.execute(
- sa.sql.select([
+ sa.sql.select(
reporter.connection.zuul_build_table
- ]).where(
+ ).where(
reporter.connection.zuul_build_table.c.buildset_id ==
- buildset0['id']
+ buildset0.id
)
).fetchall()
# Check the first result, which should be the project-merge job
self.assertEqual(
- 'project-merge', buildset0_builds[0]['job_name'])
- self.assertEqual("SUCCESS", buildset0_builds[0]['result'])
- self.assertEqual(None, buildset0_builds[0]['log_url'])
- self.assertEqual('check', buildset1['pipeline'])
- self.assertEqual('master', buildset1['branch'])
- self.assertEqual('org/project', buildset1['project'])
- self.assertEqual(2, buildset1['change'])
- self.assertEqual('1', buildset1['patchset'])
- self.assertEqual('FAILURE', buildset1['result'])
- self.assertEqual('Build failed.', buildset1['message'])
+ 'project-merge', buildset0_builds[0].job_name)
+ self.assertEqual("SUCCESS", buildset0_builds[0].result)
+ self.assertEqual(None, buildset0_builds[0].log_url)
+ self.assertEqual('check', buildset1.pipeline)
+ self.assertEqual('master', buildset1.branch)
+ self.assertEqual('org/project', buildset1.project)
+ self.assertEqual(2, buildset1.change)
+ self.assertEqual('1', buildset1.patchset)
+ self.assertEqual('FAILURE', buildset1.result)
+ self.assertEqual('Build failed.', buildset1.message)
buildset1_builds = conn.execute(
- sa.sql.select([
+ sa.sql.select(
reporter.connection.zuul_build_table
- ]).where(
+ ).where(
reporter.connection.zuul_build_table.c.buildset_id ==
- buildset1['id']
+ buildset1.id
)
).fetchall()
# Check the second result, which should be the project-test1
# job which failed
self.assertEqual(
- 'project-test1', buildset1_builds[1]['job_name'])
- self.assertEqual("FAILURE", buildset1_builds[1]['result'])
- self.assertEqual(None, buildset1_builds[1]['log_url'])
+ 'project-test1', buildset1_builds[1].job_name)
+ self.assertEqual("FAILURE", buildset1_builds[1].result)
+ self.assertEqual(None, buildset1_builds[1].log_url)
buildset2_builds = conn.execute(
- sa.sql.select([
+ sa.sql.select(
reporter.connection.zuul_build_table
- ]).where(
+ ).where(
reporter.connection.zuul_build_table.c.buildset_id ==
- buildset2['id']
+ buildset2.id
)
).fetchall()
# Check the first result, which should be the project-publish
# job
self.assertEqual('project-publish',
- buildset2_builds[0]['job_name'])
- self.assertEqual("SUCCESS", buildset2_builds[0]['result'])
+ buildset2_builds[0].job_name)
+ self.assertEqual("SUCCESS", buildset2_builds[0].result)
buildset3_builds = conn.execute(
- sa.sql.select([
+ sa.sql.select(
reporter.connection.zuul_build_table
- ]).where(
+ ).where(
reporter.connection.zuul_build_table.c.buildset_id ==
- buildset3['id']
+ buildset3.id
)
).fetchall()
self.assertEqual(
- 'project-test1', buildset3_builds[1]['job_name'])
- self.assertEqual('NODE_FAILURE', buildset3_builds[1]['result'])
- self.assertEqual(None, buildset3_builds[1]['log_url'])
- self.assertIsNotNone(buildset3_builds[1]['start_time'])
- self.assertIsNotNone(buildset3_builds[1]['end_time'])
+ 'project-test1', buildset3_builds[1].job_name)
+ self.assertEqual('NODE_FAILURE', buildset3_builds[1].result)
+ self.assertEqual(None, buildset3_builds[1].log_url)
+ self.assertIsNotNone(buildset3_builds[1].start_time)
+ self.assertIsNotNone(buildset3_builds[1].end_time)
self.assertGreaterEqual(
- buildset3_builds[1]['end_time'],
- buildset3_builds[1]['start_time'])
+ buildset3_builds[1].end_time,
+ buildset3_builds[1].start_time)
# Check the paused build result
buildset4_builds = conn.execute(
- sa.sql.select([
+ sa.sql.select(
reporter.connection.zuul_build_table
- ]).where(
+ ).where(
reporter.connection.zuul_build_table.c.buildset_id ==
- buildset4['id']
+ buildset4.id
).order_by(reporter.connection.zuul_build_table.c.id)
).fetchall()
paused_build_events = conn.execute(
- sa.sql.select([
+ sa.sql.select(
reporter.connection.zuul_build_event_table
- ]).where(
+ ).where(
reporter.connection.zuul_build_event_table.c.build_id
- == buildset4_builds[0]["id"]
+ == buildset4_builds[0].id
)
).fetchall()
@@ -245,16 +245,16 @@ class TestSQLConnectionMysql(ZuulTestCase):
pause_event = paused_build_events[0]
resume_event = paused_build_events[1]
self.assertEqual(
- pause_event["event_type"], "paused")
- self.assertIsNotNone(pause_event["event_time"])
- self.assertIsNone(pause_event["description"])
+ pause_event.event_type, "paused")
+ self.assertIsNotNone(pause_event.event_time)
+ self.assertIsNone(pause_event.description)
self.assertEqual(
- resume_event["event_type"], "resumed")
- self.assertIsNotNone(resume_event["event_time"])
- self.assertIsNone(resume_event["description"])
+ resume_event.event_type, "resumed")
+ self.assertIsNotNone(resume_event.event_time)
+ self.assertIsNone(resume_event.description)
self.assertGreater(
- resume_event["event_time"], pause_event["event_time"])
+ resume_event.event_time, pause_event.event_time)
self.executor_server.hold_jobs_in_build = True
@@ -333,51 +333,51 @@ class TestSQLConnectionMysql(ZuulTestCase):
engine.connect() as conn:
result = conn.execute(
- sa.sql.select([reporter.connection.zuul_buildset_table])
+ sa.sql.select(reporter.connection.zuul_buildset_table)
)
buildsets = result.fetchall()
self.assertEqual(1, len(buildsets))
buildset0 = buildsets[0]
- self.assertEqual('check', buildset0['pipeline'])
- self.assertEqual('org/project', buildset0['project'])
- self.assertEqual(1, buildset0['change'])
- self.assertEqual('1', buildset0['patchset'])
- self.assertEqual('SUCCESS', buildset0['result'])
- self.assertEqual('Build succeeded.', buildset0['message'])
- self.assertEqual('tenant-one', buildset0['tenant'])
+ self.assertEqual('check', buildset0.pipeline)
+ self.assertEqual('org/project', buildset0.project)
+ self.assertEqual(1, buildset0.change)
+ self.assertEqual('1', buildset0.patchset)
+ self.assertEqual('SUCCESS', buildset0.result)
+ self.assertEqual('Build succeeded.', buildset0.message)
+ self.assertEqual('tenant-one', buildset0.tenant)
self.assertEqual(
- 'https://review.example.com/%d' % buildset0['change'],
- buildset0['ref_url'])
+ 'https://review.example.com/%d' % buildset0.change,
+ buildset0.ref_url)
buildset0_builds = conn.execute(
sa.sql.select(
- [reporter.connection.zuul_build_table]
+ reporter.connection.zuul_build_table
).where(
reporter.connection.zuul_build_table.c.buildset_id ==
- buildset0['id']
+ buildset0.id
)
).fetchall()
# Check the retry results
- self.assertEqual('project-merge', buildset0_builds[0]['job_name'])
- self.assertEqual('SUCCESS', buildset0_builds[0]['result'])
- self.assertTrue(buildset0_builds[0]['final'])
-
- self.assertEqual('project-test1', buildset0_builds[1]['job_name'])
- self.assertEqual('RETRY', buildset0_builds[1]['result'])
- self.assertFalse(buildset0_builds[1]['final'])
- self.assertEqual('project-test2', buildset0_builds[2]['job_name'])
- self.assertEqual('RETRY', buildset0_builds[2]['result'])
- self.assertFalse(buildset0_builds[2]['final'])
-
- self.assertEqual('project-test1', buildset0_builds[3]['job_name'])
- self.assertEqual('SUCCESS', buildset0_builds[3]['result'])
- self.assertTrue(buildset0_builds[3]['final'])
- self.assertEqual('project-test2', buildset0_builds[4]['job_name'])
- self.assertEqual('SUCCESS', buildset0_builds[4]['result'])
- self.assertTrue(buildset0_builds[4]['final'])
+ self.assertEqual('project-merge', buildset0_builds[0].job_name)
+ self.assertEqual('SUCCESS', buildset0_builds[0].result)
+ self.assertTrue(buildset0_builds[0].final)
+
+ self.assertEqual('project-test1', buildset0_builds[1].job_name)
+ self.assertEqual('RETRY', buildset0_builds[1].result)
+ self.assertFalse(buildset0_builds[1].final)
+ self.assertEqual('project-test2', buildset0_builds[2].job_name)
+ self.assertEqual('RETRY', buildset0_builds[2].result)
+ self.assertFalse(buildset0_builds[2].final)
+
+ self.assertEqual('project-test1', buildset0_builds[3].job_name)
+ self.assertEqual('SUCCESS', buildset0_builds[3].result)
+ self.assertTrue(buildset0_builds[3].final)
+ self.assertEqual('project-test2', buildset0_builds[4].job_name)
+ self.assertEqual('SUCCESS', buildset0_builds[4].result)
+ self.assertTrue(buildset0_builds[4].final)
self.executor_server.hold_jobs_in_build = True
@@ -430,7 +430,7 @@ class TestSQLConnectionMysql(ZuulTestCase):
engine.connect() as conn:
result = conn.execute(
- sa.sql.select([reporter.connection.zuul_buildset_table])
+ sa.sql.select(reporter.connection.zuul_buildset_table)
)
buildsets = result.fetchall()
@@ -439,10 +439,10 @@ class TestSQLConnectionMysql(ZuulTestCase):
buildset0_builds = conn.execute(
sa.sql.select(
- [reporter.connection.zuul_build_table]
+ reporter.connection.zuul_build_table
).where(
reporter.connection.zuul_build_table.c.buildset_id ==
- buildset0['id']
+ buildset0.id
)
).fetchall()
@@ -488,7 +488,7 @@ class TestSQLConnectionMysql(ZuulTestCase):
engine.connect() as conn:
result = conn.execute(
- sa.sql.select([reporter.connection.zuul_buildset_table])
+ sa.sql.select(reporter.connection.zuul_buildset_table)
)
buildsets = result.fetchall()
@@ -497,10 +497,10 @@ class TestSQLConnectionMysql(ZuulTestCase):
buildset0_builds = conn.execute(
sa.sql.select(
- [reporter.connection.zuul_build_table]
+ reporter.connection.zuul_build_table
).where(
reporter.connection.zuul_build_table.c.buildset_id ==
- buildset0['id']
+ buildset0.id
)
).fetchall()
diff --git a/tests/unit/test_event_queues.py b/tests/unit/test_event_queues.py
index eafd601e9..d93e088d8 100644
--- a/tests/unit/test_event_queues.py
+++ b/tests/unit/test_event_queues.py
@@ -637,6 +637,36 @@ class TestEventWatchers(EventQueueBaseTestCase):
result_queues["other-tenant"]["post"].put(result_event)
self._wait_for_event(event)
+ def test_pipeline_event_watcher_recreate(self):
+ event = threading.Event()
+ watcher = event_queues.EventWatcher(self.zk_client, event.set)
+
+ management_queues = (
+ event_queues.PipelineManagementEventQueue.createRegistry(
+ self.zk_client
+ )
+ )
+ self.assertFalse(event.is_set())
+
+ management_queues["tenant"]["check"].put(model.ReconfigureEvent())
+ self._wait_for_event(event)
+
+ # Wait for the watch to be fully established to avoid race
+ # conditions, since the event watcher will also ensure that the
+ # trigger and result event paths exist.
+ for _ in iterate_timeout(5, "all watches to be established"):
+ if watcher.watched_pipelines:
+ break
+
+ self.zk_client.client.delete(
+ event_queues.PIPELINE_NAME_ROOT.format(
+ tenant="tenant", pipeline="check"), recursive=True)
+ event.clear()
+
+ management_queues["tenant"]["check"].initialize()
+ management_queues["tenant"]["check"].put(model.ReconfigureEvent())
+ self._wait_for_event(event)
+
class TestConnectionEventQueue(EventQueueBaseTestCase):
diff --git a/tests/unit/test_gerrit.py b/tests/unit/test_gerrit.py
index 2e3057af6..2a63d5ef8 100644
--- a/tests/unit/test_gerrit.py
+++ b/tests/unit/test_gerrit.py
@@ -957,3 +957,48 @@ class TestGerritConnection(ZuulTestCase):
self.assertEqual(B.queried, 2)
self.assertEqual(A.data['status'], 'MERGED')
self.assertEqual(B.data['status'], 'MERGED')
+
+
+class TestGerritUnicodeRefs(ZuulTestCase):
+ config_file = 'zuul-gerrit-web.conf'
+ tenant_config_file = 'config/single-tenant/main.yaml'
+
+ upload_pack_data = (b'014452944ee370db5c87691e62e0f9079b6281319b4e HEAD'
+ b'\x00multi_ack thin-pack side-band side-band-64k '
+ b'ofs-delta shallow deepen-since deepen-not '
+ b'deepen-relative no-progress include-tag '
+ b'multi_ack_detailed allow-tip-sha1-in-want '
+ b'allow-reachable-sha1-in-want '
+ b'symref=HEAD:refs/heads/faster filter '
+ b'object-format=sha1 agent=git/2.37.1.gl1\n'
+ b'003d5f42665d737b3fd4ec22ca0209e6191859f09fd6 '
+ b'refs/for/faster\n'
+ b'004952944ee370db5c87691e62e0f9079b6281319b4e '
+ b'refs/heads/foo/\xf0\x9f\x94\xa5\xf0\x9f\x94\xa5'
+ b'\xf0\x9f\x94\xa5\n'
+ b'003f52944ee370db5c87691e62e0f9079b6281319b4e '
+ b'refs/heads/faster\n0000').decode("utf-8")
+
+ def test_mb_unicode_refs(self):
+ gerrit_config = {
+ 'user': 'gerrit',
+ 'server': 'localhost',
+ }
+ driver = GerritDriver()
+ gerrit = GerritConnection(driver, 'review_gerrit', gerrit_config)
+
+ def _uploadPack(project):
+ return self.upload_pack_data
+
+ self.patch(gerrit, '_uploadPack', _uploadPack)
+
+ project = gerrit.source.getProject('org/project')
+ refs = gerrit.getInfoRefs(project)
+
+ self.assertEqual(refs,
+ {'refs/for/faster':
+ '5f42665d737b3fd4ec22ca0209e6191859f09fd6',
+ 'refs/heads/foo/🔥🔥🔥':
+ '52944ee370db5c87691e62e0f9079b6281319b4e',
+ 'refs/heads/faster':
+ '52944ee370db5c87691e62e0f9079b6281319b4e'})
diff --git a/tests/unit/test_git_driver.py b/tests/unit/test_git_driver.py
index 06e2ac7c8..95fca30d3 100644
--- a/tests/unit/test_git_driver.py
+++ b/tests/unit/test_git_driver.py
@@ -62,7 +62,8 @@ class TestGitDriver(ZuulTestCase):
# Update zuul.yaml to force a tenant reconfiguration
path = os.path.join(self.upstream_root, 'common-config', 'zuul.yaml')
- config = yaml.safe_load(open(path, 'r').read())
+ with open(path, 'r') as f:
+ config = yaml.safe_load(f)
change = {
'name': 'org/project',
'check': {
diff --git a/tests/unit/test_inventory.py b/tests/unit/test_inventory.py
index 5b30a139f..40f858624 100644
--- a/tests/unit/test_inventory.py
+++ b/tests/unit/test_inventory.py
@@ -57,7 +57,8 @@ class TestInventoryBase(ZuulTestCase):
build = self.getBuildByName(name)
inv_path = os.path.join(build.jobdir.root, 'ansible', 'inventory.yaml')
- inventory = yaml.safe_load(open(inv_path, 'r'))
+ with open(inv_path, 'r') as f:
+ inventory = yaml.safe_load(f)
return inventory
def _get_setup_inventory(self, name):
@@ -65,7 +66,9 @@ class TestInventoryBase(ZuulTestCase):
build = self.getBuildByName(name)
setup_inv_path = build.jobdir.setup_playbook.inventory
- return yaml.ansible_unsafe_load(open(setup_inv_path, 'r'))
+ with open(setup_inv_path, 'r') as f:
+ inventory = yaml.ansible_unsafe_load(f)
+ return inventory
def runJob(self, name):
self.hold_jobs_in_queue = False
@@ -409,26 +412,34 @@ class TestAnsibleInventory(AnsibleZuulTestCase):
build = self.history[0]
inv_path = os.path.join(build.jobdir.root, 'ansible', 'inventory.yaml')
- inventory = yaml.safe_load(open(inv_path, 'r'))
+ with open(inv_path, 'r') as f:
+ inventory = yaml.safe_load(f)
zv_path = os.path.join(build.jobdir.root, 'ansible', 'zuul_vars.yaml')
- zv = yaml.safe_load(open(zv_path, 'r'))
+ with open(zv_path, 'r') as f:
+ zv = yaml.ansible_unsafe_load(f)
# TODO(corvus): zuul vars aren't really stored here anymore;
# rework these tests to examine them separately.
inventory['all']['vars'] = {'zuul': zv['zuul']}
+ # The deprecated base64 version
decoded_message = base64.b64decode(
inventory['all']['vars']['zuul']['message']).decode('utf-8')
self.assertEqual(decoded_message, expected_message)
-
obtained_message = self._get_file(self.history[0],
'work/logs/commit-message.txt')
+ self.assertEqual(obtained_message, expected_message)
+ # The new !unsafe version
+ decoded_message = inventory['all']['vars']['zuul']['change_message']
+ self.assertEqual(decoded_message, expected_message)
+ obtained_message = self._get_file(self.history[0],
+ 'work/logs/change-message.txt')
self.assertEqual(obtained_message, expected_message)
def test_jinja2_message_brackets(self):
- self._jinja2_message("This message has {{ jinja2 }} in it ")
+ self._jinja2_message("This message has {{ ansible_host }} in it ")
def test_jinja2_message_raw(self):
self._jinja2_message("This message has {% raw %} in {% endraw %} it ")
diff --git a/tests/unit/test_model.py b/tests/unit/test_model.py
index aed40b94e..98f971948 100644
--- a/tests/unit/test_model.py
+++ b/tests/unit/test_model.py
@@ -32,7 +32,9 @@ import zuul.lib.connections
from tests.base import BaseTestCase, FIXTURE_DIR
from zuul.lib.ansible import AnsibleManager
from zuul.lib import tracing
+from zuul.model_api import MODEL_API
from zuul.zk.zkobject import LocalZKContext
+from zuul.zk.components import COMPONENT_REGISTRY
from zuul import change_matcher
@@ -44,6 +46,8 @@ class Dummy(object):
class TestJob(BaseTestCase):
def setUp(self):
+ COMPONENT_REGISTRY.registry = Dummy()
+ COMPONENT_REGISTRY.registry.model_api = MODEL_API
self._env_fixture = self.useFixture(
fixtures.EnvironmentVariable('HISTTIMEFORMAT', '%Y-%m-%dT%T%z '))
super(TestJob, self).setUp()
diff --git a/tests/unit/test_model_upgrade.py b/tests/unit/test_model_upgrade.py
index f392e8c3e..a5a49bed4 100644
--- a/tests/unit/test_model_upgrade.py
+++ b/tests/unit/test_model_upgrade.py
@@ -254,6 +254,45 @@ class TestModelUpgrade(ZuulTestCase):
result='SUCCESS', changes='1,1'),
], ordered=False)
+ @model_version(11)
+ def test_model_11_12(self):
+        # This exercises the upgrade to store build/job versions
+ first = self.scheds.first
+ second = self.createScheduler()
+ second.start()
+ self.assertEqual(len(self.scheds), 2)
+ for _ in iterate_timeout(10, "until priming is complete"):
+ state_one = first.sched.local_layout_state.get("tenant-one")
+ if state_one:
+ break
+
+ for _ in iterate_timeout(
+ 10, "all schedulers to have the same layout state"):
+ if (second.sched.local_layout_state.get(
+ "tenant-one") == state_one):
+ break
+
+ self.executor_server.hold_jobs_in_build = True
+ with second.sched.layout_update_lock, second.sched.run_handler_lock:
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled(matcher=[first])
+
+ self.model_test_component_info.model_api = 12
+ with first.sched.layout_update_lock, first.sched.run_handler_lock:
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled(matcher=[second])
+
+ self.waitUntilSettled()
+ self.assertHistory([
+ dict(name='project-merge', result='SUCCESS', changes='1,1'),
+ dict(name='project-test1', result='SUCCESS', changes='1,1'),
+ dict(name='project-test2', result='SUCCESS', changes='1,1'),
+ dict(name='project1-project2-integration',
+ result='SUCCESS', changes='1,1'),
+ ], ordered=False)
+
class TestGithubModelUpgrade(ZuulTestCase):
config_file = 'zuul-github-driver.conf'
diff --git a/tests/unit/test_reporting.py b/tests/unit/test_reporting.py
index 2cf93cdcb..0c5c5fbc9 100644
--- a/tests/unit/test_reporting.py
+++ b/tests/unit/test_reporting.py
@@ -151,7 +151,7 @@ class TestReporting(ZuulTestCase):
engine.connect() as conn:
result = conn.execute(
- sa.sql.select([reporter.connection.zuul_buildset_table]))
+ sa.sql.select(reporter.connection.zuul_buildset_table))
buildsets = result.fetchall()
for x in buildsets:
@@ -180,7 +180,7 @@ class TestReporting(ZuulTestCase):
engine.connect() as conn:
result = conn.execute(
- sa.sql.select([reporter.connection.zuul_buildset_table]))
+ sa.sql.select(reporter.connection.zuul_buildset_table))
buildsets = result.fetchall()
for x in buildsets:
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 3a6544937..1a5657ed6 100644
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -52,8 +52,9 @@ from tests.base import (
skipIfMultiScheduler,
)
from zuul.zk.change_cache import ChangeKey
+from zuul.zk.event_queues import PIPELINE_NAME_ROOT
from zuul.zk.layout import LayoutState
-from zuul.zk.locks import management_queue_lock
+from zuul.zk.locks import management_queue_lock, pipeline_lock
from zuul.zk import zkobject
EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}, -1)
@@ -460,6 +461,7 @@ class TestScheduler(ZuulTestCase):
'zuul.mergers.online', value='1', kind='g')
self.assertReportedStat('zuul.scheduler.eventqueues.connection.gerrit',
value='0', kind='g')
+ self.assertReportedStat('zuul.scheduler.run_handler', kind='ms')
# Catch time / monotonic errors
for key in [
@@ -489,9 +491,10 @@ class TestScheduler(ZuulTestCase):
'zuul.tenant.tenant-one.pipeline.gate.write_objects',
'zuul.tenant.tenant-one.pipeline.gate.read_znodes',
'zuul.tenant.tenant-one.pipeline.gate.write_znodes',
- 'zuul.tenant.tenant-one.pipeline.gate.read_bytes',
'zuul.tenant.tenant-one.pipeline.gate.write_bytes',
]:
+ # 'zuul.tenant.tenant-one.pipeline.gate.read_bytes' is
+ # expected to be zero since it's initialized after reading
val = self.assertReportedStat(key, kind='g')
self.assertTrue(0.0 < float(val) < 60000.0)
@@ -3586,8 +3589,11 @@ class TestScheduler(ZuulTestCase):
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
- with self.createZKContext() as ctx,\
- gate.manager.currentContext(ctx):
+ with pipeline_lock(
+ self.zk_client, tenant.name,
+ gate.name) as lock,\
+ self.createZKContext(lock) as ctx,\
+ gate.manager.currentContext(ctx):
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1.canonical_name, None)
@@ -3609,8 +3615,11 @@ class TestScheduler(ZuulTestCase):
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
- with self.createZKContext() as ctx,\
- gate.manager.currentContext(ctx):
+ with pipeline_lock(
+ self.zk_client, tenant.name,
+ gate.name) as lock,\
+ self.createZKContext(lock) as ctx,\
+ gate.manager.currentContext(ctx):
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1.canonical_name, None)
@@ -3632,8 +3641,11 @@ class TestScheduler(ZuulTestCase):
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
- with self.createZKContext() as ctx,\
- gate.manager.currentContext(ctx):
+ with pipeline_lock(
+ self.zk_client, tenant.name,
+ gate.name) as lock,\
+ self.createZKContext(lock) as ctx,\
+ gate.manager.currentContext(ctx):
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1.canonical_name, None)
@@ -3654,8 +3666,11 @@ class TestScheduler(ZuulTestCase):
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
- with self.createZKContext() as ctx,\
- gate.manager.currentContext(ctx):
+ with pipeline_lock(
+ self.zk_client, tenant.name,
+ gate.name) as lock,\
+ self.createZKContext(lock) as ctx,\
+ gate.manager.currentContext(ctx):
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1.canonical_name, None)
@@ -3677,8 +3692,11 @@ class TestScheduler(ZuulTestCase):
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
- with self.createZKContext() as ctx,\
- gate.manager.currentContext(ctx):
+ with pipeline_lock(
+ self.zk_client, tenant.name,
+ gate.name) as lock,\
+ self.createZKContext(lock) as ctx,\
+ gate.manager.currentContext(ctx):
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1.canonical_name, None)
@@ -6199,7 +6217,8 @@ For CI problems and help debugging, contact ci@example.org"""
build = self.getBuildByName('check-job')
inv_path = os.path.join(build.jobdir.root, 'ansible', 'inventory.yaml')
- inventory = yaml.safe_load(open(inv_path, 'r'))
+ with open(inv_path, 'r') as f:
+ inventory = yaml.safe_load(f)
label = inventory['all']['hosts']['controller']['nodepool']['label']
self.assertEqual('slow-label', label)
@@ -6508,6 +6527,33 @@ For CI problems and help debugging, contact ci@example.org"""
self.assertEqual(A.data['status'], 'MERGED')
self.assertEqual(B.data['status'], 'MERGED')
+ def test_leaked_pipeline_cleanup(self):
+ self.waitUntilSettled()
+ sched = self.scheds.first.sched
+
+ pipeline_state_path = "/zuul/tenant/tenant-one/pipeline/invalid"
+ self.zk_client.client.ensure_path(pipeline_state_path)
+
+ # Create the ZK path as a side-effect of getting the event queue.
+ sched.pipeline_management_events["tenant-one"]["invalid"]
+ pipeline_event_queue_path = PIPELINE_NAME_ROOT.format(
+ tenant="tenant-one", pipeline="invalid")
+
+ self.assertIsNotNone(self.zk_client.client.exists(pipeline_state_path))
+ # Wait for the event watcher to create the event queues
+ for _ in iterate_timeout(30, "create event queues"):
+ for event_queue in ("management", "trigger", "result"):
+ if self.zk_client.client.exists(
+ f"{pipeline_event_queue_path}/{event_queue}") is None:
+ break
+ else:
+ break
+
+ sched._runLeakedPipelineCleanup()
+ self.assertIsNone(
+ self.zk_client.client.exists(pipeline_event_queue_path))
+ self.assertIsNone(self.zk_client.client.exists(pipeline_state_path))
+
class TestChangeQueues(ZuulTestCase):
tenant_config_file = 'config/change-queues/main.yaml'
diff --git a/tests/unit/test_sos.py b/tests/unit/test_sos.py
index 37a47c6ae..4f2110f3e 100644
--- a/tests/unit/test_sos.py
+++ b/tests/unit/test_sos.py
@@ -244,6 +244,79 @@ class TestScaleOutScheduler(ZuulTestCase):
self.assertTrue(all(l == new.uuid for l in layout_uuids))
self.waitUntilSettled()
+ def test_live_reconfiguration_del_pipeline(self):
+ # Test pipeline deletion while changes are enqueued
+
+ # Create a second scheduler instance
+ app = self.createScheduler()
+ app.start()
+ self.assertEqual(len(self.scheds), 2)
+
+ for _ in iterate_timeout(10, "Wait until priming is complete"):
+ old = self.scheds.first.sched.tenant_layout_state.get("tenant-one")
+ if old is not None:
+ break
+
+ for _ in iterate_timeout(
+ 10, "Wait for all schedulers to have the same layout state"):
+ layout_states = [a.sched.local_layout_state.get("tenant-one")
+ for a in self.scheds.instances]
+ if all(l == old for l in layout_states):
+ break
+
+ pipeline_zk_path = app.sched.abide.tenants[
+ "tenant-one"].layout.pipelines["check"].state.getPath()
+
+ self.executor_server.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+
+ # Let the first scheduler enqueue the change into the pipeline that
+ # will be removed later on.
+ with app.sched.run_handler_lock:
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled(matcher=[self.scheds.first])
+
+ # Process item only on second scheduler so the first scheduler has
+ # an outdated pipeline state.
+ with self.scheds.first.sched.run_handler_lock:
+ self.executor_server.release('.*-merge')
+ self.waitUntilSettled(matcher=[app])
+ self.assertEqual(len(self.builds), 2)
+
+ self.commitConfigUpdate(
+ 'common-config',
+ 'layouts/live-reconfiguration-del-pipeline.yaml')
+ # Trigger a reconfiguration on the first scheduler with the outdated
+ # pipeline state of the pipeline that will be removed.
+ self.scheds.execute(lambda a: a.sched.reconfigure(a.config),
+ matcher=[self.scheds.first])
+
+ new = self.scheds.first.sched.tenant_layout_state.get("tenant-one")
+ for _ in iterate_timeout(
+ 10, "Wait for all schedulers to have the same layout state"):
+ layout_states = [a.sched.local_layout_state.get("tenant-one")
+ for a in self.scheds.instances]
+ if all(l == new for l in layout_states):
+ break
+
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(A.data['status'], 'NEW')
+ self.assertEqual(A.reported, 0)
+
+ self.assertHistory([
+ dict(name='project-merge', result='SUCCESS', changes='1,1'),
+ dict(name='project-test1', result='ABORTED', changes='1,1'),
+ dict(name='project-test2', result='ABORTED', changes='1,1'),
+ ], ordered=False)
+
+ tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
+ self.assertEqual(len(tenant.layout.pipelines), 0)
+ stat = self.zk_client.client.exists(pipeline_zk_path)
+ self.assertIsNone(stat)
+
def test_change_cache(self):
# Test re-using a change from the change cache.
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
diff --git a/tests/unit/test_streaming.py b/tests/unit/test_streaming.py
index ba3117f59..7e6a2e635 100644
--- a/tests/unit/test_streaming.py
+++ b/tests/unit/test_streaming.py
@@ -138,6 +138,17 @@ class TestStreamingBase(tests.base.AnsibleZuulTestCase):
s.close()
self.streamer.stop()
+ def _readSocket(self, sock, build_uuid, event, name):
+ msg = "%s\r\n" % build_uuid
+ sock.sendall(msg.encode('utf-8'))
+ event.set() # notify we are connected and req sent
+ while True:
+ data = sock.recv(1024)
+ if not data:
+ break
+ self.streaming_data[name] += data.decode('utf-8')
+ sock.shutdown(socket.SHUT_RDWR)
+
def runFingerClient(self, build_uuid, gateway_address, event, name=None):
# Wait until the gateway is started
for x in iterate_timeout(30, "finger client to start"):
@@ -154,7 +165,7 @@ class TestStreamingBase(tests.base.AnsibleZuulTestCase):
self.streaming_data[name] = ''
with socket.create_connection(gateway_address) as s:
if self.fingergw_use_ssl:
- context = ssl.SSLContext(ssl.PROTOCOL_TLS)
+ context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.verify_mode = ssl.CERT_REQUIRED
context.check_hostname = False
context.load_cert_chain(
@@ -162,17 +173,10 @@ class TestStreamingBase(tests.base.AnsibleZuulTestCase):
os.path.join(FIXTURE_DIR, 'fingergw/fingergw.key'))
context.load_verify_locations(
os.path.join(FIXTURE_DIR, 'fingergw/root-ca.pem'))
- s = context.wrap_socket(s)
-
- msg = "%s\r\n" % build_uuid
- s.sendall(msg.encode('utf-8'))
- event.set() # notify we are connected and req sent
- while True:
- data = s.recv(1024)
- if not data:
- break
- self.streaming_data[name] += data.decode('utf-8')
- s.shutdown(socket.SHUT_RDWR)
+ with context.wrap_socket(s) as s:
+ self._readSocket(s, build_uuid, event, name)
+ else:
+ self._readSocket(s, build_uuid, event, name)
def runFingerGateway(self, zone=None):
self.log.info('Starting fingergw with zone %s', zone)
diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py
index 81d95927a..004ede862 100644
--- a/tests/unit/test_v3.py
+++ b/tests/unit/test_v3.py
@@ -1557,6 +1557,43 @@ class TestInRepoConfig(ZuulTestCase):
'start_line': 5},
})
+ def test_dynamic_config_job_anchors(self):
+ # Test the use of anchors in job configuration. This is a
+ # regression test designed to catch a failure where we freeze
+ # the first job and in doing so, mutate the vars dict. The
+ # intended behavior is that the two jobs end up with two
+ # separate python objects for their vars dicts.
+ in_repo_conf = textwrap.dedent(
+ """
+ - job:
+ name: myvars
+ vars: &anchor
+ plugins:
+ foo: bar
+
+ - job:
+ name: project-test1
+ timeout: 999999999999
+ vars: *anchor
+
+ - project:
+ name: org/project
+ check:
+ jobs:
+ - project-test1
+ """)
+
+ file_dict = {'.zuul.yaml': in_repo_conf}
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
+ files=file_dict)
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+ self.assertEqual(A.reported, 1,
+ "A should report failure")
+ self.assertEqual(A.patchsets[0]['approvals'][0]['value'], "-1")
+ self.assertIn('max-job-timeout', A.messages[0])
+ self.assertHistory([])
+
def test_dynamic_config_non_existing_job_in_template(self):
"""Test that requesting a non existent job fails"""
in_repo_conf = textwrap.dedent(
@@ -5283,7 +5320,8 @@ class TestRoleBranches(RoleTestCase):
def getBuildInventory(self, name):
build = self.getBuildByName(name)
inv_path = os.path.join(build.jobdir.root, 'ansible', 'inventory.yaml')
- inventory = yaml.safe_load(open(inv_path, 'r'))
+ with open(inv_path, 'r') as f:
+ inventory = yaml.safe_load(f)
return inventory
def getCheckout(self, build, path):
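
The anchor regression test added to test_v3.py above hinges on the fact that a YAML anchor and its alias deserialize to one shared Python object, so freezing the first job in place can mutate the second job's vars. A short sketch of that behavior with plain PyYAML (illustrative, not Zuul code):

    # Illustrative only: an anchored mapping and its alias load as the
    # very same dict object, so an in-place mutation made through one
    # reference is visible through the other -- the failure mode the
    # regression test guards against.
    import yaml

    doc = """
    jobs:
      - name: myvars
        vars: &anchor
          plugins:
            foo: bar
      - name: project-test1
        vars: *anchor
    """

    data = yaml.safe_load(doc)
    vars_a = data['jobs'][0]['vars']
    vars_b = data['jobs'][1]['vars']
    print(vars_a is vars_b)          # True: one shared object

    vars_a['plugins']['foo'] = 'mutated'
    print(vars_b['plugins']['foo'])  # 'mutated' -- the other job sees it
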
diff --git a/tests/unit/test_zk.py b/tests/unit/test_zk.py
index 7e3c19dfe..b5697ee36 100644
--- a/tests/unit/test_zk.py
+++ b/tests/unit/test_zk.py
@@ -18,6 +18,7 @@ import json
import queue
import threading
import uuid
+from unittest import mock
import testtools
@@ -53,10 +54,12 @@ from tests.base import (
BaseTestCase, HoldableExecutorApi, HoldableMergerApi,
iterate_timeout
)
-from zuul.zk.zkobject import ShardedZKObject, ZKObject, ZKContext
+from zuul.zk.zkobject import (
+ ShardedZKObject, ZKObject, ZKContext
+)
from zuul.zk.locks import tenant_write_lock
-from kazoo.exceptions import ZookeeperError, OperationTimeoutError
+from kazoo.exceptions import ZookeeperError, OperationTimeoutError, NoNodeError
class ZooKeeperBaseTestCase(BaseTestCase):
@@ -2037,3 +2040,80 @@ class TestBlobStore(ZooKeeperBaseTestCase):
with testtools.ExpectedException(KeyError):
bs.get(path)
+
+
+class TestPipelineInit(ZooKeeperBaseTestCase):
+ # Test the initialize-on-refresh code paths of various pipeline objects
+
+ def test_pipeline_state_new_object(self):
+ # Test the initialize-on-refresh code path with no existing object
+ tenant = model.Tenant('tenant')
+ pipeline = model.Pipeline('gate', tenant)
+ layout = model.Layout(tenant)
+ tenant.layout = layout
+ pipeline.state = model.PipelineState.create(
+ pipeline, pipeline.state)
+ context = ZKContext(self.zk_client, None, None, self.log)
+ pipeline.state.refresh(context)
+ self.assertTrue(self.zk_client.client.exists(pipeline.state.getPath()))
+ self.assertEqual(pipeline.state.layout_uuid, layout.uuid)
+
+ def test_pipeline_state_existing_object(self):
+ # Test the initialize-on-refresh code path with a pre-existing object
+ tenant = model.Tenant('tenant')
+ pipeline = model.Pipeline('gate', tenant)
+ layout = model.Layout(tenant)
+ tenant.layout = layout
+ pipeline.manager = mock.Mock()
+ pipeline.state = model.PipelineState.create(
+ pipeline, pipeline.state)
+ pipeline.change_list = model.PipelineChangeList.create(
+ pipeline)
+ context = ZKContext(self.zk_client, None, None, self.log)
+ # We refresh the change list here purely for the side effect
+ # of creating the pipeline state object with no data (the list
+ # is a subpath of the state object).
+ pipeline.change_list.refresh(context)
+ pipeline.state.refresh(context)
+ self.assertTrue(
+ self.zk_client.client.exists(pipeline.change_list.getPath()))
+ self.assertTrue(self.zk_client.client.exists(pipeline.state.getPath()))
+ self.assertEqual(pipeline.state.layout_uuid, layout.uuid)
+
+ def test_pipeline_change_list_new_object(self):
+ # Test the initialize-on-refresh code path with no existing object
+ tenant = model.Tenant('tenant')
+ pipeline = model.Pipeline('gate', tenant)
+ layout = model.Layout(tenant)
+ tenant.layout = layout
+ pipeline.state = model.PipelineState.create(
+ pipeline, pipeline.state)
+ pipeline.change_list = model.PipelineChangeList.create(
+ pipeline)
+ context = ZKContext(self.zk_client, None, None, self.log)
+ pipeline.change_list.refresh(context)
+ self.assertTrue(
+ self.zk_client.client.exists(pipeline.change_list.getPath()))
+ pipeline.manager = mock.Mock()
+ pipeline.state.refresh(context)
+ self.assertEqual(pipeline.state.layout_uuid, layout.uuid)
+
+ def test_pipeline_change_list_new_object_without_lock(self):
+ # Test the initialize-on-refresh code path if we don't have
+ # the lock. This should fail.
+ tenant = model.Tenant('tenant')
+ pipeline = model.Pipeline('gate', tenant)
+ layout = model.Layout(tenant)
+ tenant.layout = layout
+ pipeline.state = model.PipelineState.create(
+ pipeline, pipeline.state)
+ pipeline.change_list = model.PipelineChangeList.create(
+ pipeline)
+ context = ZKContext(self.zk_client, None, None, self.log)
+ with testtools.ExpectedException(NoNodeError):
+ pipeline.change_list.refresh(context, allow_init=False)
+ self.assertIsNone(
+ self.zk_client.client.exists(pipeline.change_list.getPath()))
+ pipeline.manager = mock.Mock()
+ pipeline.state.refresh(context)
+ self.assertEqual(pipeline.state.layout_uuid, layout.uuid)
diff --git a/web/src/App.jsx b/web/src/App.jsx
index 6a5c6e010..9a0caf551 100644
--- a/web/src/App.jsx
+++ b/web/src/App.jsx
@@ -32,6 +32,8 @@ import {
ButtonVariant,
Dropdown,
DropdownItem,
+ DropdownToggle,
+ DropdownSeparator,
KebabToggle,
Modal,
Nav,
@@ -54,6 +56,7 @@ import {
import {
BellIcon,
BookIcon,
+ ChevronDownIcon,
CodeIcon,
ServiceIcon,
UsersIcon,
@@ -67,6 +70,7 @@ import ConfigModal from './containers/config/Config'
import logo from './images/logo.svg'
import { clearNotification } from './actions/notifications'
import { fetchConfigErrorsAction, clearConfigErrorsAction } from './actions/configErrors'
+import { fetchTenantsIfNeeded } from './actions/tenants'
import { routes } from './routes'
import { setTenantAction } from './actions/tenant'
import { configureAuthFromTenant, configureAuthFromInfo } from './actions/auth'
@@ -81,6 +85,7 @@ class App extends React.Component {
configErrorsReady: PropTypes.bool,
info: PropTypes.object,
tenant: PropTypes.object,
+ tenants: PropTypes.object,
timezone: PropTypes.string,
location: PropTypes.object,
history: PropTypes.object,
@@ -93,6 +98,7 @@ class App extends React.Component {
state = {
showErrors: false,
+ isTenantDropdownOpen: false,
}
renderMenu() {
@@ -199,6 +205,7 @@ class App extends React.Component {
} else if (!info.tenant) {
// Multi tenant, look for tenant name in url
whiteLabel = false
+ this.props.dispatch(fetchTenantsIfNeeded())
const match = matchPath(
this.props.location.pathname, { path: '/t/:tenant' })
@@ -368,6 +375,91 @@ class App extends React.Component {
)
}
+ renderTenantDropdown() {
+ const { tenant, tenants } = this.props
+ const { isTenantDropdownOpen } = this.state
+
+ if (tenant.whiteLabel) {
+ return (
+ <PageHeaderToolsItem>
+ <strong>Tenant</strong> {tenant.name}
+ </PageHeaderToolsItem>
+ )
+ } else {
+ const tenantLink = (_tenant) => {
+ const currentPath = this.props.location.pathname
+ let suffix
+ switch (currentPath) {
+ case '/t/' + tenant.name + '/projects':
+ suffix = '/projects'
+ break
+ case '/t/' + tenant.name + '/jobs':
+ suffix = '/jobs'
+ break
+ case '/t/' + tenant.name + '/labels':
+ suffix = '/labels'
+ break
+ case '/t/' + tenant.name + '/nodes':
+ suffix = '/nodes'
+ break
+ case '/t/' + tenant.name + '/autoholds':
+ suffix = '/autoholds'
+ break
+ case '/t/' + tenant.name + '/builds':
+ suffix = '/builds'
+ break
+ case '/t/' + tenant.name + '/buildsets':
+ suffix = '/buildsets'
+ break
+ case '/t/' + tenant.name + '/status':
+ default:
+ // all other paths point to tenant-specific resources that would most likely result in a 404
+ suffix = '/status'
+ break
+ }
+ return <Link to={'/t/' + _tenant.name + suffix}>{_tenant.name}</Link>
+ }
+
+ const options = tenants.tenants.filter(
+ (_tenant) => (_tenant.name !== tenant.name)
+ ).map(
+ (_tenant, idx) => {
+ return (
+ <DropdownItem key={'tenant-dropdown-' + idx} component={tenantLink(_tenant)} />
+ )
+ })
+ options.push(
+ <DropdownSeparator key="tenant-dropdown-separator" />,
+ <DropdownItem
+ key="tenant-dropdown-tenants_page"
+ component={<Link to={tenant.defaultRoute}>Go to tenants page</Link>} />
+ )
+
+ return (tenants.isFetching ?
+ <PageHeaderToolsItem>
+ Loading tenants ...
+ </PageHeaderToolsItem> :
+ <>
+ <PageHeaderToolsItem>
+ <Dropdown
+ isOpen={isTenantDropdownOpen}
+ toggle={
+ <DropdownToggle
+ className={`zuul-menu-dropdown-toggle${isTenantDropdownOpen ? '-expanded' : ''}`}
+ id="tenant-dropdown-toggle-id"
+ onToggle={(isOpen) => { this.setState({ isTenantDropdownOpen: isOpen }) }}
+ toggleIndicator={ChevronDownIcon}
+ >
+ <strong>Tenant</strong> {tenant.name}
+ </DropdownToggle>}
+ onSelect={() => { this.setState({ isTenantDropdownOpen: !isTenantDropdownOpen }) }}
+ dropdownItems={options}
+ />
+ </PageHeaderToolsItem>
+ </>)
+ }
+ }
+
render() {
const { isKebabDropdownOpen } = this.state
const { notifications, configErrors, tenant, info, auth } = this.props
@@ -406,7 +498,7 @@ class App extends React.Component {
key="tenant"
onClick={event => this.handleTenantLink(event)}
>
- <UsersIcon /> Tenant
+ <UsersIcon /> Tenants
</DropdownItem>
)
}
@@ -445,15 +537,7 @@ class App extends React.Component {
</Button>
</a>
</PageHeaderToolsItem>
- {tenant.name && (
- <PageHeaderToolsItem>
- <Link to={tenant.defaultRoute}>
- <Button variant={ButtonVariant.plain}>
- <strong>Tenant</strong> {tenant.name}
- </Button>
- </Link>
- </PageHeaderToolsItem>
- )}
+ {tenant.name && (this.renderTenantDropdown())}
</PageHeaderToolsGroup>
<PageHeaderToolsGroup>
{/* this kebab dropdown replaces the icon buttons and is hidden for
@@ -521,6 +605,7 @@ export default withRouter(connect(
configErrorsReady: state.configErrors.ready,
info: state.info,
tenant: state.tenant,
+ tenants: state.tenants,
timezone: state.timezone,
user: state.user,
auth: state.auth,
diff --git a/web/src/App.test.jsx b/web/src/App.test.jsx
index a1d0234d9..519980c7a 100644
--- a/web/src/App.test.jsx
+++ b/web/src/App.test.jsx
@@ -135,8 +135,9 @@ it('renders single tenant', async () => {
// Link should be white-label scoped
const topMenuLinks = application.root.findAllByType(Link)
expect(topMenuLinks[0].props.to).toEqual('/status')
- expect(topMenuLinks[3].props.to.pathname).toEqual('/status')
- expect(topMenuLinks[4].props.to.pathname).toEqual('/projects')
+ expect(topMenuLinks[1].props.to).toEqual('/openapi')
+ expect(topMenuLinks[2].props.to.pathname).toEqual('/status')
+ expect(topMenuLinks[3].props.to.pathname).toEqual('/projects')
// Location should be /status
expect(location.pathname).toEqual('/status')
// Info should tell white label tenant openstack
diff --git a/web/src/containers/timezone/SelectTz.jsx b/web/src/containers/timezone/SelectTz.jsx
index aaa585336..576645f6c 100644
--- a/web/src/containers/timezone/SelectTz.jsx
+++ b/web/src/containers/timezone/SelectTz.jsx
@@ -12,9 +12,9 @@
import PropTypes from 'prop-types'
import React from 'react'
-import Select from 'react-select'
+import Select, { components } from 'react-select'
import moment from 'moment-timezone'
-import { OutlinedClockIcon } from '@patternfly/react-icons'
+import { OutlinedClockIcon, ChevronDownIcon } from '@patternfly/react-icons'
import { connect } from 'react-redux'
import { setTimezoneAction } from '../../actions/timezone'
@@ -58,7 +58,7 @@ class SelectTz extends React.Component {
}
render() {
- const textColor = '#d1d1d1'
+ const textColor = '#fff'
const containerStyles= {
border: 'solid #2b2b2b',
borderWidth: '0 0 0 1px',
@@ -83,7 +83,11 @@ class SelectTz extends React.Component {
}),
dropdownIndicator:(provided) => ({
...provided,
- padding: '3px'
+ color: '#fff',
+ padding: '3px',
+ ':hover': {
+ color: '#fff'
+ }
}),
indicatorSeparator: () => {},
menu: (provided) => ({
@@ -93,12 +97,22 @@ class SelectTz extends React.Component {
top: '22px',
})
}
+
+ const DropdownIndicator = (props) => {
+ return (
+ <components.DropdownIndicator {...props}>
+ <ChevronDownIcon />
+ </components.DropdownIndicator>
+ )
+ }
+
return (
<div style={containerStyles}>
<OutlinedClockIcon/>
<Select
className="zuul-select-tz"
styles={customStyles}
+ components={{ DropdownIndicator }}
value={this.state.currentValue}
onChange={this.handleChange}
options={this.state.availableTz}
diff --git a/web/src/index.css b/web/src/index.css
index 6fec50911..587804cfa 100644
--- a/web/src/index.css
+++ b/web/src/index.css
@@ -66,6 +66,21 @@ a.refresh {
font-weight: bold;
}
+.zuul-menu-dropdown-toggle:before {
+ content: none !important;
+}
+
+.zuul-menu-dropdown-toggle:hover {
+ border-bottom: none;
+}
+
+.zuul-menu-dropdown-toggle-expanded:before {
+ border-left: none;
+ border-right: none;
+ border-top: none;
+ border-bottom: none;
+}
+
/* Remove ugly outline when a Switch is selected */
.pf-c-switch {
--pf-c-switch__input--focus__toggle--OutlineWidth: 0;
diff --git a/zuul/ansible/logconfig.py b/zuul/ansible/logconfig.py
index 66881336a..2d7c37463 100644
--- a/zuul/ansible/logconfig.py
+++ b/zuul/ansible/logconfig.py
@@ -140,7 +140,8 @@ def _read_config_file(filename: str):
raise ValueError("Unable to read logging config file at %s" % filename)
if os.path.splitext(filename)[1] in ('.yml', '.yaml', '.json'):
- return yaml.safe_load(open(filename, 'r'))
+ with open(filename, 'r') as f:
+ return yaml.safe_load(f)
return filename
diff --git a/zuul/configloader.py b/zuul/configloader.py
index 1e7e010cc..f9e52595e 100644
--- a/zuul/configloader.py
+++ b/zuul/configloader.py
@@ -437,6 +437,30 @@ def ansible_vars_dict(value):
ansible_var_name(key)
+def copy_safe_config(conf):
+ """Return a deep copy of a config dictionary.
+
+ This lets us assign values of a config dictionary to configuration
+ objects, even if those values are nested dictionaries. This way
+ we can safely freeze the configuration object (the process of
+ which mutates dictionaries) without mutating the original
+ configuration.
+
+ Meanwhile, this does retain the original context information as a
+ single object (some behaviors rely on mutating the source context
+ (e.g., pragma)).
+
+ """
+ ret = copy.deepcopy(conf)
+ for key in (
+ '_source_context',
+ '_start_mark',
+ ):
+ if key in conf:
+ ret[key] = conf[key]
+ return ret
+
+
class PragmaParser(object):
pragma = {
'implied-branch-matchers': bool,
@@ -452,6 +476,7 @@ class PragmaParser(object):
self.pcontext = pcontext
def fromYaml(self, conf):
+ conf = copy_safe_config(conf)
self.schema(conf)
bm = conf.get('implied-branch-matchers')
@@ -512,6 +537,7 @@ class NodeSetParser(object):
return vs.Schema(nodeset)
def fromYaml(self, conf, anonymous=False):
+ conf = copy_safe_config(conf)
if anonymous:
self.anon_schema(conf)
self.anonymous = True
@@ -599,6 +625,7 @@ class SecretParser(object):
return vs.Schema(secret)
def fromYaml(self, conf):
+ conf = copy_safe_config(conf)
self.schema(conf)
s = model.Secret(conf['name'], conf['_source_context'])
s.source_context = conf['_source_context']
@@ -723,6 +750,7 @@ class JobParser(object):
def fromYaml(self, conf, project_pipeline=False, name=None,
validate=True):
+ conf = copy_safe_config(conf)
if validate:
self.schema(conf)
@@ -1075,6 +1103,7 @@ class ProjectTemplateParser(object):
return vs.Schema(project)
def fromYaml(self, conf, validate=True, freeze=True):
+ conf = copy_safe_config(conf)
if validate:
self.schema(conf)
source_context = conf['_source_context']
@@ -1165,6 +1194,7 @@ class ProjectParser(object):
return vs.Schema(project)
def fromYaml(self, conf):
+ conf = copy_safe_config(conf)
self.schema(conf)
project_name = conf.get('name')
@@ -1328,6 +1358,7 @@ class PipelineParser(object):
return vs.Schema(pipeline)
def fromYaml(self, conf):
+ conf = copy_safe_config(conf)
self.schema(conf)
pipeline = model.Pipeline(conf['name'], self.pcontext.tenant)
pipeline.source_context = conf['_source_context']
@@ -1469,6 +1500,7 @@ class SemaphoreParser(object):
return vs.Schema(semaphore)
def fromYaml(self, conf):
+ conf = copy_safe_config(conf)
self.schema(conf)
semaphore = model.Semaphore(conf['name'], conf.get('max', 1))
semaphore.source_context = conf.get('_source_context')
@@ -1494,6 +1526,7 @@ class QueueParser:
return vs.Schema(queue)
def fromYaml(self, conf):
+ conf = copy_safe_config(conf)
self.schema(conf)
queue = model.Queue(
conf['name'],
@@ -1523,6 +1556,7 @@ class AuthorizationRuleParser(object):
return vs.Schema(authRule)
def fromYaml(self, conf):
+ conf = copy_safe_config(conf)
self.schema(conf)
a = model.AuthZRuleTree(conf['name'])
@@ -1556,6 +1590,7 @@ class GlobalSemaphoreParser(object):
return vs.Schema(semaphore)
def fromYaml(self, conf):
+ conf = copy_safe_config(conf)
self.schema(conf)
semaphore = model.Semaphore(conf['name'], conf.get('max', 1),
global_scope=True)
@@ -1576,6 +1611,7 @@ class ApiRootParser(object):
return vs.Schema(api_root)
def fromYaml(self, conf):
+ conf = copy_safe_config(conf)
self.schema(conf)
api_root = model.ApiRoot(conf.get('authentication-realm'))
api_root.access_rules = conf.get('access-rules', [])
@@ -1770,8 +1806,10 @@ class TenantParser(object):
for branch_future in as_completed(branch_futures.keys()):
tpc = branch_futures[branch_future]
- source_context = model.ProjectContext(
- tpc.project.canonical_name, tpc.project.name)
+ trusted, _ = tenant.getProject(tpc.project.canonical_name)
+ source_context = model.SourceContext(
+ tpc.project.canonical_name, tpc.project.name,
+ tpc.project.connection_name, None, None, trusted)
with project_configuration_exceptions(source_context,
loading_errors):
self._getProjectBranches(tenant, tpc, branch_cache_min_ltimes)
@@ -2602,8 +2640,9 @@ class TenantParser(object):
project_metadata.merge_mode = model.MERGER_MAP[mode]
tpc = tenant.project_configs[project.canonical_name]
if tpc.merge_modes is not None:
- source_context = model.ProjectContext(
- project.canonical_name, project.name)
+ source_context = model.SourceContext(
+ project.canonical_name, project.name,
+ project.connection_name, None, None, trusted)
with project_configuration_exceptions(source_context,
layout.loading_errors):
if project_metadata.merge_mode not in tpc.merge_modes:
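
The copy_safe_config() helper introduced above can be exercised on its own. The sketch below (standalone, with a stand-in source-context object) illustrates the behavior its docstring describes: nested values are deep-copied so that freezing cannot mutate the caller's dictionary, while the listed context keys keep their original identity.

    import copy

    def copy_safe_config(conf, shared_keys=('_source_context', '_start_mark')):
        # Deep-copy everything, then restore the shared keys so they still
        # reference the original objects.
        ret = copy.deepcopy(conf)
        for key in shared_keys:
            if key in conf:
                ret[key] = conf[key]
        return ret

    class FakeSourceContext:
        pass

    ctx = FakeSourceContext()
    conf = {'name': 'base', 'vars': {'a': 1}, '_source_context': ctx}
    frozen = copy_safe_config(conf)
    assert frozen['_source_context'] is ctx       # identity preserved
    assert frozen['vars'] is not conf['vars']     # nested dict was copied
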
diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py
index 0a1f0ee61..276365e1d 100644
--- a/zuul/driver/gerrit/gerritconnection.py
+++ b/zuul/driver/gerrit/gerritconnection.py
@@ -1643,7 +1643,10 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
def getInfoRefs(self, project: Project) -> Dict[str, str]:
try:
- data = self._uploadPack(project)
+ # Encode the UTF-8 data back to a byte array, as the size of
+ # each record in the pack is in bytes, and so the slicing must
+ # also be done on a byte-basis.
+ data = self._uploadPack(project).encode("utf-8")
except Exception:
self.log.error("Cannot get references from %s" % project)
raise # keeps error information
@@ -1662,7 +1665,9 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
plen -= 4
if len(data) - i < plen:
raise Exception("Invalid data in info/refs")
- line = data[i:i + plen]
+ # Once the pack data is sliced, we can safely decode it back
+ # into a (UTF-8) string.
+ line = data[i:i + plen].decode("utf-8")
i += plen
if not read_advertisement:
read_advertisement = True
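
The encode/decode change above matters because git's info/refs response is framed as pkt-lines whose 4-byte hex length prefix counts bytes, not characters, so the slicing has to happen on a byte string. A minimal, self-contained parser sketch (not Zuul's implementation) that slices on bytes and only decodes each record afterwards:

    def iter_pkt_lines(data: bytes):
        i = 0
        while i < len(data):
            plen = int(data[i:i + 4], 16)   # 4 hex digits, length incl. prefix
            i += 4
            if plen == 0:                   # "0000" flush packet
                continue
            plen -= 4
            yield data[i:i + plen].decode("utf-8")
            i += plen

    # The second record contains multi-byte UTF-8; slicing a decoded string
    # by character count would mis-measure it.
    payload = (b"0010hello world\n"
               + "0010r\u00e9f\u00e9rence\n".encode("utf-8")
               + b"0000")
    assert list(iter_pkt_lines(payload)) == ["hello world\n", "r\u00e9f\u00e9rence\n"]
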
diff --git a/zuul/driver/sql/alembic/env.py b/zuul/driver/sql/alembic/env.py
index da7b3207f..17b67805e 100644
--- a/zuul/driver/sql/alembic/env.py
+++ b/zuul/driver/sql/alembic/env.py
@@ -53,7 +53,8 @@ def run_migrations_online():
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix='sqlalchemy.',
- poolclass=pool.NullPool)
+ poolclass=pool.NullPool,
+ future=True)
# we can get the table prefix via the tag object
tag = context.get_tag_argument()
diff --git a/zuul/driver/sql/alembic/versions/60c119eb1e3f_use_build_set_results.py b/zuul/driver/sql/alembic/versions/60c119eb1e3f_use_build_set_results.py
index 67581a6f9..1735d35f3 100644
--- a/zuul/driver/sql/alembic/versions/60c119eb1e3f_use_build_set_results.py
+++ b/zuul/driver/sql/alembic/versions/60c119eb1e3f_use_build_set_results.py
@@ -24,13 +24,16 @@ def upgrade(table_prefix=''):
connection = op.get_bind()
connection.execute(
- """
- UPDATE {buildset_table}
- SET result=(
- SELECT CASE score
- WHEN 1 THEN 'SUCCESS'
- ELSE 'FAILURE' END)
- """.format(buildset_table=table_prefix + BUILDSET_TABLE))
+ sa.text(
+ """
+ UPDATE {buildset_table}
+ SET result=(
+ SELECT CASE score
+ WHEN 1 THEN 'SUCCESS'
+ ELSE 'FAILURE' END)
+ """.format(buildset_table=table_prefix + BUILDSET_TABLE)
+ )
+ )
op.drop_column(table_prefix + BUILDSET_TABLE, 'score')
diff --git a/zuul/driver/sql/alembic/versions/c7467b642498_buildset_updated.py b/zuul/driver/sql/alembic/versions/c7467b642498_buildset_updated.py
index abfba7247..99d12d750 100644
--- a/zuul/driver/sql/alembic/versions/c7467b642498_buildset_updated.py
+++ b/zuul/driver/sql/alembic/versions/c7467b642498_buildset_updated.py
@@ -34,13 +34,16 @@ def upgrade(table_prefix=''):
connection = op.get_bind()
connection.execute(
- """
- UPDATE {buildset_table}
- SET updated=greatest(
- coalesce(first_build_start_time, '1970-01-01 00:00:00'),
- coalesce(last_build_end_time, '1970-01-01 00:00:00'),
- coalesce(event_timestamp, '1970-01-01 00:00:00'))
- """.format(buildset_table=table_prefix + "zuul_buildset"))
+ sa.text(
+ """
+ UPDATE {buildset_table}
+ SET updated=greatest(
+ coalesce(first_build_start_time, '1970-01-01 00:00:00'),
+ coalesce(last_build_end_time, '1970-01-01 00:00:00'),
+ coalesce(event_timestamp, '1970-01-01 00:00:00'))
+ """.format(buildset_table=table_prefix + "zuul_buildset")
+ )
+ )
def downgrade():
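
These migration edits follow from the env.py change earlier in this diff: with future=True the connection behaves like a SQLAlchemy 2.0 connection, and Connection.execute() no longer accepts bare SQL strings. A standalone illustration of the required sa.text() wrapping (SQLite in memory, simplified table definition):

    import sqlalchemy as sa

    engine = sa.create_engine("sqlite://", future=True)
    with engine.begin() as conn:
        conn.execute(sa.text(
            "CREATE TABLE zuul_buildset (id INTEGER PRIMARY KEY, score INTEGER, result TEXT)"))
        conn.execute(sa.text("INSERT INTO zuul_buildset (score) VALUES (1)"))
        # Passing the raw string here instead of sa.text(...) would raise an
        # ObjectNotExecutableError on a future-style connection.
        conn.execute(sa.text(
            "UPDATE zuul_buildset SET result="
            "(SELECT CASE score WHEN 1 THEN 'SUCCESS' ELSE 'FAILURE' END)"))
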
diff --git a/zuul/driver/sql/sqlconnection.py b/zuul/driver/sql/sqlconnection.py
index b89653bba..2d5c39ec3 100644
--- a/zuul/driver/sql/sqlconnection.py
+++ b/zuul/driver/sql/sqlconnection.py
@@ -308,27 +308,31 @@ class SQLConnection(BaseConnection):
def _migrate(self, revision='head'):
"""Perform the alembic migrations for this connection"""
+ # Note that this method needs to be called with an external lock held.
+ # The reason for this is we retrieve the alembic version and run the
+ # alembic migrations in different database transactions which opens
+ # us to races without an external lock.
with self.engine.begin() as conn:
context = alembic.migration.MigrationContext.configure(conn)
current_rev = context.get_current_revision()
- self.log.debug('Current migration revision: %s' % current_rev)
-
- config = alembic.config.Config()
- config.set_main_option("script_location",
- "zuul:driver/sql/alembic")
- config.set_main_option("sqlalchemy.url",
- self.connection_config.get('dburi').
- replace('%', '%%'))
-
- # Alembic lets us add arbitrary data in the tag argument. We can
- # leverage that to tell the upgrade scripts about the table prefix.
- tag = {'table_prefix': self.table_prefix}
-
- if current_rev is None and not self.force_migrations:
- self.metadata.create_all(self.engine)
- alembic.command.stamp(config, revision, tag=tag)
- else:
- alembic.command.upgrade(config, revision, tag=tag)
+ self.log.debug('Current migration revision: %s' % current_rev)
+
+ config = alembic.config.Config()
+ config.set_main_option("script_location",
+ "zuul:driver/sql/alembic")
+ config.set_main_option("sqlalchemy.url",
+ self.connection_config.get('dburi').
+ replace('%', '%%'))
+
+ # Alembic lets us add arbitrary data in the tag argument. We can
+ # leverage that to tell the upgrade scripts about the table prefix.
+ tag = {'table_prefix': self.table_prefix}
+
+ if current_rev is None and not self.force_migrations:
+ self.metadata.create_all(self.engine)
+ alembic.command.stamp(config, revision, tag=tag)
+ else:
+ alembic.command.upgrade(config, revision, tag=tag)
def onLoad(self, zk_client, component_registry=None):
safe_connection = quote_plus(self.connection_name)
diff --git a/zuul/executor/common.py b/zuul/executor/common.py
index ff4522d22..b8393903e 100644
--- a/zuul/executor/common.py
+++ b/zuul/executor/common.py
@@ -65,6 +65,7 @@ def construct_build_params(uuid, connections, job, item, pipeline,
zuul_params['patchset'] = str(item.change.patchset)
if hasattr(item.change, 'message'):
zuul_params['message'] = strings.b64encode(item.change.message)
+ zuul_params['change_message'] = item.change.message
if (hasattr(item.change, 'oldrev') and item.change.oldrev
and item.change.oldrev != '0' * 40):
zuul_params['oldrev'] = item.change.oldrev
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index c3737b5cc..a49bbbbbf 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -14,6 +14,7 @@
# under the License.
import collections
+import copy
import datetime
import json
import logging
@@ -1049,7 +1050,7 @@ class AnsibleJob(object):
# The same, but frozen
self.frozen_hostvars = {}
# The zuul.* vars
- self.zuul_vars = {}
+ self.debug_zuul_vars = {}
self.waiting_for_semaphores = False
def run(self):
@@ -1888,7 +1889,8 @@ class AnsibleJob(object):
logfile=json_output))
return
try:
- output = json.load(open(json_output, 'r'))
+ with open(json_output, 'r') as f:
+ output = json.load(f)
last_playbook = output[-1]
# Transform json to yaml - because it's easier to read and given
# the size of the data it'll be extra-hard to read this as an
@@ -2332,7 +2334,8 @@ class AnsibleJob(object):
def prepareKubeConfig(self, jobdir, data):
kube_cfg_path = jobdir.kubeconfig
if os.path.exists(kube_cfg_path):
- kube_cfg = yaml.safe_load(open(kube_cfg_path))
+ with open(kube_cfg_path) as f:
+ kube_cfg = yaml.safe_load(f)
else:
kube_cfg = {
'apiVersion': 'v1',
@@ -2495,10 +2498,18 @@ class AnsibleJob(object):
if ri.role_path is not None],
))
+ # The zuul vars in the debug inventory.yaml file should not
+ # have any !unsafe tags, so save those before we update the
+ # execution version of those.
+ self.debug_zuul_vars = copy.deepcopy(zuul_vars)
+ if 'change_message' in zuul_vars:
+ zuul_vars['change_message'] = yaml.mark_strings_unsafe(
+ zuul_vars['change_message'])
+
with open(self.jobdir.zuul_vars, 'w') as zuul_vars_yaml:
zuul_vars_yaml.write(
- yaml.safe_dump({'zuul': zuul_vars}, default_flow_style=False))
- self.zuul_vars = zuul_vars
+ yaml.ansible_unsafe_dump({'zuul': zuul_vars},
+ default_flow_style=False))
# Squash all and extra vars into localhost (it's not
# explicitly listed).
@@ -2552,7 +2563,7 @@ class AnsibleJob(object):
inventory = make_inventory_dict(
self.host_list, self.nodeset, self.original_hostvars)
- inventory['all']['vars']['zuul'] = self.zuul_vars
+ inventory['all']['vars']['zuul'] = self.debug_zuul_vars
with open(self.jobdir.inventory, 'w') as inventory_yaml:
inventory_yaml.write(
yaml.ansible_unsafe_dump(
@@ -3481,6 +3492,8 @@ class ExecutorServer(BaseMergeServer):
self.statsd.gauge(base_key + '.load_average', 0)
self.statsd.gauge(base_key + '.pct_used_ram', 0)
self.statsd.gauge(base_key + '.running_builds', 0)
+ self.statsd.close()
+ self.statsd = None
# Use the BaseMergeServer's stop method to disconnect from
# ZooKeeper. We do this as one of the last steps to ensure
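
The change_message handling above uses Zuul's internal yaml helpers (mark_strings_unsafe, ansible_unsafe_dump). As a rough, hedged illustration of the underlying idea with plain PyYAML, a value can be emitted with Ansible's !unsafe tag so that Jinja syntax inside a commit message is not templated by Ansible:

    import yaml

    class UnsafeText(str):
        """Marker type for strings that must not be templated."""

    def _represent_unsafe(dumper, data):
        return dumper.represent_scalar('!unsafe', str(data))

    yaml.SafeDumper.add_representer(UnsafeText, _represent_unsafe)

    doc = {'zuul': {'change_message': UnsafeText('fix: drop {{ lookup }} usage')}}
    print(yaml.dump(doc, Dumper=yaml.SafeDumper, default_flow_style=False))
    # Emits roughly:
    #   zuul:
    #     change_message: !unsafe 'fix: drop {{ lookup }} usage'
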
diff --git a/zuul/lib/fingergw.py b/zuul/lib/fingergw.py
index ad945c1b7..184c9762d 100644
--- a/zuul/lib/fingergw.py
+++ b/zuul/lib/fingergw.py
@@ -47,6 +47,18 @@ class RequestHandler(streamer_utils.BaseFingerRequestHandler):
self.fingergw = kwargs.pop('fingergw')
super(RequestHandler, self).__init__(*args, **kwargs)
+ def _readSocket(self, sock, build_uuid):
+ # timeout only on the connection, let recv() wait forever
+ sock.settimeout(None)
+ msg = "%s\n" % build_uuid # Must have a trailing newline!
+ sock.sendall(msg.encode('utf-8'))
+ while True:
+ data = sock.recv(1024)
+ if data:
+ self.request.sendall(data)
+ else:
+ break
+
def _fingerClient(self, server, port, build_uuid, use_ssl):
'''
Open a finger connection and return all streaming results.
@@ -59,24 +71,16 @@ class RequestHandler(streamer_utils.BaseFingerRequestHandler):
'''
with socket.create_connection((server, port), timeout=10) as s:
if use_ssl:
- context = ssl.SSLContext(ssl.PROTOCOL_TLS)
+ context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.verify_mode = ssl.CERT_REQUIRED
context.check_hostname = self.fingergw.tls_verify_hostnames
context.load_cert_chain(self.fingergw.tls_cert,
self.fingergw.tls_key)
context.load_verify_locations(self.fingergw.tls_ca)
- s = context.wrap_socket(s, server_hostname=server)
-
- # timeout only on the connection, let recv() wait forever
- s.settimeout(None)
- msg = "%s\n" % build_uuid # Must have a trailing newline!
- s.sendall(msg.encode('utf-8'))
- while True:
- data = s.recv(1024)
- if data:
- self.request.sendall(data)
- else:
- break
+ with context.wrap_socket(s, server_hostname=server) as s:
+ self._readSocket(s, build_uuid)
+ else:
+ self._readSocket(s, build_uuid)
def handle(self):
'''
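
Both this hunk and the zuul/web change later in this diff replace PROTOCOL_TLS with PROTOCOL_TLS_CLIENT, which already defaults to certificate verification and hostname checking, and use wrap_socket() as a context manager so the TLS session is shut down together with the socket. A standalone sketch of the resulting client pattern (host, port, and file paths are placeholders):

    import socket
    import ssl

    def finger_fetch(server, port, build_uuid, ca, cert, key, verify_hostname=True):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.check_hostname = verify_hostname
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_cert_chain(cert, key)
        context.load_verify_locations(ca)
        chunks = []
        with socket.create_connection((server, port), timeout=10) as sock:
            with context.wrap_socket(sock, server_hostname=server) as tls:
                tls.sendall(("%s\n" % build_uuid).encode("utf-8"))
                while True:
                    data = tls.recv(1024)
                    if not data:
                        break
                    chunks.append(data)
        return b"".join(chunks)
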
diff --git a/zuul/lib/repl.py b/zuul/lib/repl.py
index ecefae9ea..63a800406 100644
--- a/zuul/lib/repl.py
+++ b/zuul/lib/repl.py
@@ -26,14 +26,14 @@ class ThreadLocalProxy(object):
self.default = default
def __getattr__(self, name):
- obj = self.files.get(threading.currentThread(), self.default)
+ obj = self.files.get(threading.current_thread(), self.default)
return getattr(obj, name)
def register(self, obj):
- self.files[threading.currentThread()] = obj
+ self.files[threading.current_thread()] = obj
def unregister(self):
- self.files.pop(threading.currentThread())
+ self.files.pop(threading.current_thread())
class REPLHandler(socketserver.StreamRequestHandler):
diff --git a/zuul/lib/streamer_utils.py b/zuul/lib/streamer_utils.py
index 04de4b8cb..a50fb4142 100644
--- a/zuul/lib/streamer_utils.py
+++ b/zuul/lib/streamer_utils.py
@@ -168,7 +168,7 @@ class CustomThreadingTCPServer(socketserver.ThreadingTCPServer):
if all([self.server_ssl_key, self.server_ssl_cert,
self.server_ssl_ca]):
- context = ssl.SSLContext(ssl.PROTOCOL_TLS)
+ context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
context.load_cert_chain(self.server_ssl_cert, self.server_ssl_key)
context.load_verify_locations(self.server_ssl_ca)
context.verify_mode = ssl.CERT_REQUIRED
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 60eb479e0..36361df11 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -28,6 +28,8 @@ from zuul.model import (
)
from zuul.zk.change_cache import ChangeKey
from zuul.zk.components import COMPONENT_REGISTRY
+from zuul.zk.exceptions import LockException
+from zuul.zk.locks import pipeline_lock
from opentelemetry import trace
@@ -95,21 +97,46 @@ class PipelineManager(metaclass=ABCMeta):
def _postConfig(self):
layout = self.pipeline.tenant.layout
self.buildChangeQueues(layout)
- with self.sched.createZKContext(None, self.log) as ctx,\
- self.currentContext(ctx):
- # Make sure we have state and change list objects, and
- # ensure that they exist in ZK. We don't hold the
- # pipeline lock, but if they don't exist, that means they
- # are new, so no one else will either, so the write on
- # create is okay. If they do exist and we have an old
- # object, we'll just reuse it. If it does exist and we
- # don't have an old object, we'll get a new empty one.
- # Regardless, these will not automatically refresh now, so
- # they will be out of date until they are refreshed later.
- self.pipeline.state = PipelineState.create(
- self.pipeline, layout.uuid, self.pipeline.state)
- self.pipeline.change_list = PipelineChangeList.create(
- self.pipeline)
+ # Make sure we have state and change list objects. We
+ # don't actually ensure they exist in ZK here; these are
+ # just local objects until they are serialized the first
+ # time. Since we don't hold the pipeline lock, we can't
+ # reliably perform any read or write operations; we just
+ # need to ensure we have in-memory objects to work with
+ # and they will be initialized or loaded on the next
+ # refresh.
+
+ # These will be out of date until they are refreshed later.
+ self.pipeline.state = PipelineState.create(
+ self.pipeline, self.pipeline.state)
+ self.pipeline.change_list = PipelineChangeList.create(
+ self.pipeline)
+
+ # Now, try to acquire a non-blocking pipeline lock and refresh
+ # them for the side effect of initializing them if necessary.
+ # In the case of a new pipeline, no one else should have a
+ # lock anyway, and this helps us avoid emitting a whole bunch
+ # of errors elsewhere on startup when these objects don't
+ # exist. If the pipeline already exists and we can't acquire
+ # the lock, that's fine, we're much less likely to encounter
+ # read errors elsewhere in that case anyway.
+ try:
+ with pipeline_lock(
+ self.sched.zk_client, self.pipeline.tenant.name,
+ self.pipeline.name, blocking=False) as lock,\
+ self.sched.createZKContext(lock, self.log) as ctx,\
+ self.currentContext(ctx):
+ if not self.pipeline.state.exists(ctx):
+ # We only do this if the pipeline doesn't exist in
+ # ZK because in that case, this process should be
+ # fast since it's empty. If it does exist,
+ # refreshing it may be slow and since other actors
+ # won't encounter errors due to its absence, we
+ # would rather defer the work to later.
+ self.pipeline.state.refresh(ctx)
+ self.pipeline.change_list.refresh(ctx)
+ except LockException:
+ pass
def buildChangeQueues(self, layout):
self.log.debug("Building relative_priority queues")
@@ -276,19 +303,19 @@ class PipelineManager(metaclass=ABCMeta):
if not isinstance(change, model.Change):
return
- change_in_pipeline = False
+ to_refresh = set()
for item in self.pipeline.getAllItems():
if not isinstance(item.change, model.Change):
continue
+ if item.change.equals(change):
+ to_refresh.add(item.change)
for dep_change_ref in item.change.commit_needs_changes:
- if item.change.equals(change):
- change_in_pipeline = True
dep_change_key = ChangeKey.fromReference(dep_change_ref)
if dep_change_key.isSameChange(change.cache_stat.key):
- self.updateCommitDependencies(item.change, None, event)
+ to_refresh.add(item.change)
- if change_in_pipeline:
- self.updateCommitDependencies(change, None, event)
+ for existing_change in to_refresh:
+ self.updateCommitDependencies(existing_change, None, event)
def reportEnqueue(self, item):
if not self.pipeline.state.disabled:
diff --git a/zuul/model.py b/zuul/model.py
index 0f17e4e2e..1d82b5f2c 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -620,6 +620,18 @@ class PipelineState(zkobject.ZKObject):
_read_only=False,
)
+ def _lateInitData(self):
+ # If we're initializing the object on our initial refresh,
+ # reset the data to this.
+ return dict(
+ state=Pipeline.STATE_NORMAL,
+ queues=[],
+ old_queues=[],
+ consecutive_failures=0,
+ disabled=False,
+ layout_uuid=self.pipeline.tenant.layout.uuid,
+ )
+
@classmethod
def fromZK(klass, context, path, pipeline, **kw):
obj = klass()
@@ -631,21 +643,23 @@ class PipelineState(zkobject.ZKObject):
return obj
@classmethod
- def create(cls, pipeline, layout_uuid, old_state=None):
- # If the object does not exist in ZK, create it with the
- # default attributes and the supplied layout UUID. Otherwise,
- # return an initialized object (or the old object for reuse)
- # without loading any data so that data can be loaded on the
- # next refresh.
- ctx = pipeline.manager.current_context
+ def create(cls, pipeline, old_state=None):
+ # If we are resetting an existing pipeline, we will have an
+ # old_state, so just clean up the object references there and
+ # let the next refresh handle updating any data.
+ if old_state:
+ old_state._resetObjectRefs()
+ return old_state
+
+ # Otherwise, we are initializing a pipeline that we haven't
+ # seen before. It still might exist in ZK, but since we
+ # haven't seen it, we don't have any object references to
+ # clean up. We can just start with a clean object, set the
+ # pipeline reference, and let the next refresh deal with
+ # whether there might be any data in ZK.
state = cls()
state._set(pipeline=pipeline)
- if state.exists(ctx):
- if old_state:
- old_state._resetObjectRefs()
- return old_state
- return state
- return cls.new(ctx, pipeline=pipeline, layout_uuid=layout_uuid)
+ return state
def _resetObjectRefs(self):
# Update the pipeline references on the queue objects.
@@ -712,8 +726,34 @@ class PipelineState(zkobject.ZKObject):
# This is so that we can refresh the object in circumstances
# where we haven't verified that our local layout matches
# what's in ZK.
+
+ # Notably, this need not prevent us from performing the
+ # initialization below if necessary. The case of the object
+        # being brand new in ZK supersedes our worry that our old copy
+ # might be out of date since our old copy is, itself, brand
+ # new.
self._set(_read_only=read_only)
- return super().refresh(context)
+ try:
+ return super().refresh(context)
+ except NoNodeError:
+ # If the object doesn't exist we will receive a
+ # NoNodeError. This happens because the postConfig call
+ # creates this object without holding the pipeline lock,
+ # so it can't determine whether or not it exists in ZK.
+ # We do hold the pipeline lock here, so if we get this
+ # error, we know we're initializing the object, and we
+ # should write it to ZK.
+
+ # Note that typically this code is not used since
+ # currently other objects end up creating the pipeline
+ # path in ZK first. It is included in case that ever
+ # changes. Currently the empty byte-string code path in
+ # deserialize() is used instead.
+ context.log.warning("Initializing pipeline state for %s; "
+ "this is expected only for new pipelines",
+ self.pipeline.name)
+ self._set(**self._lateInitData())
+ self.internalCreate(context)
def deserialize(self, raw, context):
# We may have old change objects in the pipeline cache, so
@@ -721,6 +761,20 @@ class PipelineState(zkobject.ZKObject):
# source change cache.
self.pipeline.manager.clearCache()
+ # If the object doesn't exist we will get back an empty byte
+ # string. This happens because the postConfig call creates
+ # this object without holding the pipeline lock, so it can't
+ # determine whether or not it exists in ZK. We do hold the
+ # pipeline lock here, so if we get the empty byte string, we
+ # know we're initializing the object. In that case, we should
+ # initialize the layout id to the current layout. Nothing
+ # else needs to be set.
+ if raw == b'':
+ context.log.warning("Initializing pipeline state for %s; "
+ "this is expected only for new pipelines",
+ self.pipeline.name)
+ return self._lateInitData()
+
data = super().deserialize(raw, context)
if not self._read_only:
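
The empty byte-string branch above encodes a convention used throughout this change: a pipeline state znode may exist only as a bare placeholder (b'') before any real JSON has been written, and only the lock holder fills in the late-initialization defaults. A minimal standalone sketch of that convention (field names are illustrative):

    import json

    LATE_INIT_DEFAULTS = {
        'state': 'normal',
        'queues': [],
        'old_queues': [],
        'consecutive_failures': 0,
        'disabled': False,
    }

    def deserialize_state(raw: bytes) -> dict:
        if raw == b'':
            # Placeholder node created without the pipeline lock: fall back
            # to the late-initialization defaults.
            return dict(LATE_INIT_DEFAULTS)
        return json.loads(raw)

    assert deserialize_state(b'') == LATE_INIT_DEFAULTS
    assert deserialize_state(b'{"state": "error"}') == {'state': 'error'}
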
@@ -895,11 +949,34 @@ class PipelineChangeList(zkobject.ShardedZKObject):
super().__init__()
self._set(
changes=[],
+ _change_keys=[],
)
- def refresh(self, context):
- self._retry(context, super().refresh,
- context, max_tries=5)
+ def refresh(self, context, allow_init=True):
+ # Set allow_init to false to indicate that we don't hold the
+ # lock and we should not try to initialize the object in ZK if
+ # it does not exist.
+ try:
+ self._retry(context, super().refresh,
+ context, max_tries=5)
+ except NoNodeError:
+ # If the object doesn't exist we will receive a
+ # NoNodeError. This happens because the postConfig call
+ # creates this object without holding the pipeline lock,
+ # so it can't determine whether or not it exists in ZK.
+ # We do hold the pipeline lock here, so if we get this
+ # error, we know we're initializing the object, and
+ # we should write it to ZK.
+ if allow_init:
+ context.log.warning(
+ "Initializing pipeline change list for %s; "
+ "this is expected only for new pipelines",
+ self.pipeline.name)
+ self.internalCreate(context)
+ else:
+ # If we're called from a context where we can't
+ # initialize the change list, re-raise the exception.
+ raise
def getPath(self):
return self.getChangeListPath(self.pipeline)
@@ -910,19 +987,14 @@ class PipelineChangeList(zkobject.ShardedZKObject):
return pipeline_path + '/change_list'
@classmethod
- def create(cls, pipeline, old_list=None):
- # If the object does not exist in ZK, create it with the
- # default attributes. Otherwise, return an initialized object
- # (or the old object for reuse) without loading any data so
- # that data can be loaded on the next refresh.
- ctx = pipeline.manager.current_context
+ def create(cls, pipeline):
+        # This object may or may not exist in ZK, but we aren't using
+        # any of that data here.  We can just start with a clean object,
+        # set the pipeline reference, and let the next refresh deal with
+        # whether there might be any data in ZK.
change_list = cls()
change_list._set(pipeline=pipeline)
- if change_list.exists(ctx):
- if old_list:
- return old_list
- return change_list
- return cls.new(ctx, pipeline=pipeline)
+ return change_list
def serialize(self, context):
data = {
@@ -930,8 +1002,8 @@ class PipelineChangeList(zkobject.ShardedZKObject):
}
return json.dumps(data, sort_keys=True).encode("utf8")
- def deserialize(self, data, context):
- data = super().deserialize(data, context)
+ def deserialize(self, raw, context):
+ data = super().deserialize(raw, context)
change_keys = []
# We must have a dictionary with a 'changes' key; otherwise we
# may be reading immediately after truncating. Allow the
@@ -1810,24 +1882,6 @@ class FrozenSecret(ConfigObject):
)
-class ProjectContext(ConfigObject):
-
- def __init__(self, project_canonical_name, project_name):
- super().__init__()
- self.project_canonical_name = project_canonical_name
- self.project_name = project_name
- self.branch = None
- self.path = None
-
- def __str__(self):
- return self.project_name
-
- def toDict(self):
- return dict(
- project=self.project_name,
- )
-
-
class SourceContext(ConfigObject):
"""A reference to the branch of a project in configuration.
@@ -2396,6 +2450,12 @@ class FrozenJob(zkobject.ZKObject):
data['_' + job_data_key] = None
return data
+ def _save(self, context, *args, **kw):
+ # Before saving, update the buildset with the new job version
+ # so that future readers know to refresh it.
+ self.buildset.updateJobVersion(context, self)
+ return super()._save(context, *args, **kw)
+
def setWaitingStatus(self, status):
if self.waiting_status == status:
return
@@ -3878,6 +3938,12 @@ class Build(zkobject.ZKObject):
def getPath(self):
return f"{self.job.getPath()}/build/{self.uuid}"
+ def _save(self, context, *args, **kw):
+ # Before saving, update the buildset with the new job version
+ # so that future readers know to refresh it.
+ self.job.buildset.updateBuildVersion(context, self)
+ return super()._save(context, *args, **kw)
+
def __repr__(self):
return ('<Build %s of %s voting:%s>' %
(self.uuid, self.job.name, self.job.voting))
@@ -4088,6 +4154,8 @@ class BuildSet(zkobject.ZKObject):
job_graph=None,
jobs={},
deduplicated_jobs=[],
+ job_versions={},
+ build_versions={},
# Cached job graph of previous layout; not serialized
_old_job_graph=None,
_old_jobs={},
@@ -4199,6 +4267,8 @@ class BuildSet(zkobject.ZKObject):
"configured_time": self.configured_time,
"start_time": self.start_time,
"repo_state_request_time": self.repo_state_request_time,
+ "job_versions": self.job_versions,
+ "build_versions": self.build_versions,
# jobs (serialize as separate objects)
}
return json.dumps(data, sort_keys=True).encode("utf8")
@@ -4296,7 +4366,8 @@ class BuildSet(zkobject.ZKObject):
if job_name in self.jobs:
job = self.jobs[job_name]
- if not old_build_exists:
+ if ((not old_build_exists) or
+ self.shouldRefreshJob(job)):
tpe_jobs.append((None, job_name,
tpe.submit(job.refresh, context)))
else:
@@ -4308,7 +4379,8 @@ class BuildSet(zkobject.ZKObject):
build = self.builds.get(job_name)
builds[job_name] = build
if build and build.getPath() == build_path:
- if not build.result:
+ if ((not build.result) or
+ self.shouldRefreshBuild(build)):
tpe_jobs.append((
None, job_name, tpe.submit(
build.refresh, context)))
@@ -4363,6 +4435,48 @@ class BuildSet(zkobject.ZKObject):
})
return data
+ def updateBuildVersion(self, context, build):
+ # It's tempting to update versions regardless of the model
+ # API, but if we start writing versions before all components
+ # are upgraded we could get out of sync.
+ if (COMPONENT_REGISTRY.model_api < 12):
+ return True
+
+ # It is common for a lot of builds/jobs to be added at once,
+ # so to avoid writing this buildset object repeatedly during
+ # that time, we only update the version after the initial
+ # creation.
+ version = build.getZKVersion()
+ # If zstat is None, we created the object
+ if version is not None:
+ self.build_versions[build.uuid] = version + 1
+ self.updateAttributes(context, build_versions=self.build_versions)
+
+ def updateJobVersion(self, context, job):
+ if (COMPONENT_REGISTRY.model_api < 12):
+ return True
+
+ version = job.getZKVersion()
+ if version is not None:
+ self.job_versions[job.name] = version + 1
+ self.updateAttributes(context, job_versions=self.job_versions)
+
+ def shouldRefreshBuild(self, build):
+ # Unless all schedulers are updating versions, we can't trust
+ # the data.
+ if (COMPONENT_REGISTRY.model_api < 12):
+ return True
+ current = build.getZKVersion()
+ expected = self.build_versions.get(build.uuid, 0)
+ return expected != current
+
+ def shouldRefreshJob(self, job):
+ if (COMPONENT_REGISTRY.model_api < 12):
+ return True
+ current = job.getZKVersion()
+ expected = self.job_versions.get(job.name, 0)
+ return expected != current
+
@property
def ref(self):
# NOTE(jamielennox): The concept of buildset ref is to be removed and a
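
updateJobVersion/updateBuildVersion and the matching shouldRefresh* checks implement per-child version bookkeeping on top of ZooKeeper's znode versions: record the version expected after each write, and only re-read a child when the recorded version differs from the one last loaded. A hedged sketch of the same idea with plain kazoo (requires a reachable ZooKeeper; the path is illustrative, and this is not Zuul's ZKObject layer):

    from kazoo.client import KazooClient

    client = KazooClient(hosts="127.0.0.1:2181")
    client.start()

    path = "/example/buildset/1234/job/linters"
    client.ensure_path(path)

    cached_versions = {}    # version of the copy last loaded by the reader
    expected_versions = {}  # version recorded by the writer after each save

    # Writer side: after saving, record the resulting znode version so
    # readers can tell whether their cached copy is stale.
    stat = client.set(path, b'{"result": "SUCCESS"}')
    expected_versions[path] = stat.version

    # Reader side: only re-read when the recorded version differs.
    def should_refresh(p):
        return cached_versions.get(p) != expected_versions.get(p, 0)

    if should_refresh(path):
        data, stat = client.get(path)
        cached_versions[path] = stat.version

    client.stop()
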
@@ -5910,8 +6024,7 @@ class Bundle:
def deserialize(cls, context, queue, items_by_path, data):
bundle = cls(data["uuid"])
bundle.items = [
- items_by_path.get(p) or QueueItem.fromZK(
- context, p, pipeline=queue.pipeline, queue=queue)
+ items_by_path.get(p) or QueueItem.fromZK(context, p, queue=queue)
for p in data["items"]
]
bundle.started_reporting = data["started_reporting"]
diff --git a/zuul/model_api.py b/zuul/model_api.py
index 6c93a5177..ccb12077d 100644
--- a/zuul/model_api.py
+++ b/zuul/model_api.py
@@ -13,5 +13,5 @@
# under the License.
# When making ZK schema changes, increment this and add a record to
-# docs/developer/model-changelog.rst
-MODEL_API = 11
+# doc/source/developer/model-changelog.rst
+MODEL_API = 12
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 8d85140cd..b8314f162 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -23,6 +23,7 @@ import sys
import threading
import time
import traceback
+import urllib.parse
import uuid
from contextlib import suppress
from zuul.vendor.contextlib import nullcontext
@@ -99,6 +100,8 @@ from zuul.zk.event_queues import (
PipelineManagementEventQueue,
PipelineResultEventQueue,
PipelineTriggerEventQueue,
+ PIPELINE_ROOT,
+ PIPELINE_NAME_ROOT,
TENANT_ROOT,
)
from zuul.zk.exceptions import LockException
@@ -711,6 +714,7 @@ class Scheduler(threading.Thread):
self._runMergerApiCleanup()
self._runLayoutDataCleanup()
self._runBlobStoreCleanup()
+ self._runLeakedPipelineCleanup()
self.maintainConnectionCache()
except Exception:
self.log.exception("Error in general cleanup:")
@@ -753,6 +757,54 @@ class Scheduler(threading.Thread):
except Exception:
self.log.exception("Error in layout data cleanup:")
+ def _runLeakedPipelineCleanup(self):
+ for tenant in self.abide.tenants.values():
+ try:
+ with tenant_read_lock(self.zk_client, tenant.name,
+ blocking=False):
+ if not self.isTenantLayoutUpToDate(tenant.name):
+ self.log.debug(
+ "Skipping leaked pipeline cleanup for tenant %s",
+ tenant.name)
+ continue
+ valid_pipelines = tenant.layout.pipelines.values()
+ valid_state_paths = set(
+ p.state.getPath() for p in valid_pipelines)
+ valid_event_root_paths = set(
+ PIPELINE_NAME_ROOT.format(
+ tenant=p.tenant.name, pipeline=p.name)
+ for p in valid_pipelines)
+
+ safe_tenant = urllib.parse.quote_plus(tenant.name)
+ state_root = f"/zuul/tenant/{safe_tenant}/pipeline"
+ event_root = PIPELINE_ROOT.format(tenant=tenant.name)
+
+ all_state_paths = set(
+ f"{state_root}/{p}" for p in
+ self.zk_client.client.get_children(state_root))
+ all_event_root_paths = set(
+ f"{event_root}/{p}" for p in
+ self.zk_client.client.get_children(event_root))
+
+ leaked_state_paths = all_state_paths - valid_state_paths
+ leaked_event_root_paths = (
+ all_event_root_paths - valid_event_root_paths)
+
+ for leaked_path in (
+ leaked_state_paths | leaked_event_root_paths):
+ self.log.info("Removing leaked pipeline path %s",
+ leaked_path)
+ try:
+ self.zk_client.client.delete(leaked_path,
+ recursive=True)
+ except Exception:
+ self.log.exception(
+ "Error removing leaked pipeline path %s in "
+ "tenant %s", leaked_path, tenant.name)
+ except LockException:
+ # We'll cleanup this tenant on the next iteration
+ pass
+
def _runBlobStoreCleanup(self):
self.log.debug("Starting blob store cleanup")
try:
@@ -1292,6 +1344,21 @@ class Scheduler(threading.Thread):
self.log.info("Local layout update complete for %s (duration: %s "
"seconds)", tenant_name, duration)
+ def isTenantLayoutUpToDate(self, tenant_name):
+ remote_state = self.tenant_layout_state.get(tenant_name)
+ if remote_state is None:
+ # The tenant may still be in the
+ # process of initial configuration
+ self.wake_event.set()
+ return False
+ local_state = self.local_layout_state.get(tenant_name)
+ if local_state is None or remote_state > local_state:
+ self.log.debug("Local layout of tenant %s not up to date",
+ tenant_name)
+ self.layout_update_event.set()
+ return False
+ return True
+
def _checkTenantSourceConf(self, config):
tenant_config = None
script = False
@@ -1481,7 +1548,6 @@ class Scheduler(threading.Thread):
# This is called in the scheduler loop after another thread submits
# a request
if self.unparsed_abide.ltime < self.system_config_cache.ltime:
- self.log.debug("Updating system config")
self.updateSystemConfig()
with self.layout_lock:
@@ -1691,12 +1757,25 @@ class Scheduler(threading.Thread):
new_pipeline = tenant.layout.pipelines.get(name)
if not new_pipeline:
with old_pipeline.manager.currentContext(context):
- self._reconfigureDeletePipeline(old_pipeline)
+ try:
+ self._reconfigureDeletePipeline(old_pipeline)
+ except Exception:
+ self.log.exception(
+ "Failed to cleanup deleted pipeline %s:",
+ old_pipeline)
+ self.management_events[tenant.name].initialize()
+ self.trigger_events[tenant.name].initialize()
self.connections.reconfigureDrivers(tenant)
# TODOv3(jeblair): remove postconfig calls?
for pipeline in tenant.layout.pipelines.values():
+ self.pipeline_management_events[tenant.name][
+ pipeline.name].initialize()
+ self.pipeline_trigger_events[tenant.name][
+ pipeline.name].initialize()
+ self.pipeline_result_events[tenant.name
+ ][pipeline.name].initialize()
for trigger in pipeline.triggers:
trigger.postConfig(pipeline)
for reporter in pipeline.actions:
@@ -1754,7 +1833,11 @@ class Scheduler(threading.Thread):
(tenant,))
for pipeline in tenant.layout.pipelines.values():
with pipeline.manager.currentContext(context):
- self._reconfigureDeletePipeline(pipeline)
+ try:
+ self._reconfigureDeletePipeline(pipeline)
+ except Exception:
+ self.log.exception(
+ "Failed to cleanup deleted pipeline %s:", pipeline)
# Delete the tenant root path for this tenant in ZooKeeper to remove
# all tenant specific event queues
@@ -1770,45 +1853,80 @@ class Scheduler(threading.Thread):
def _reconfigureDeletePipeline(self, pipeline):
self.log.info("Removing pipeline %s during reconfiguration" %
(pipeline,))
- for shared_queue in pipeline.queues:
- builds_to_cancel = []
- requests_to_cancel = []
- for item in shared_queue.queue:
- with item.activeContext(pipeline.manager.current_context):
- item.item_ahead = None
- item.items_behind = []
- self.log.info(
- "Removing item %s during reconfiguration" % (item,))
- for build in item.current_build_set.getBuilds():
- builds_to_cancel.append(build)
- for request_job, request in \
- item.current_build_set.getNodeRequests():
- requests_to_cancel.append(
- (
- item.current_build_set,
- request,
- item.getJob(request_job),
- )
+
+ ctx = pipeline.manager.current_context
+ pipeline.state.refresh(ctx)
+
+ builds_to_cancel = []
+ requests_to_cancel = []
+ for item in pipeline.getAllItems():
+ with item.activeContext(pipeline.manager.current_context):
+ item.item_ahead = None
+ item.items_behind = []
+ self.log.info(
+ "Removing item %s during reconfiguration" % (item,))
+ for build in item.current_build_set.getBuilds():
+ builds_to_cancel.append(build)
+ for request_job, request in \
+ item.current_build_set.getNodeRequests():
+ requests_to_cancel.append(
+ (
+ item.current_build_set,
+ request,
+ item.getJob(request_job),
)
- try:
- self.sql.reportBuildsetEnd(
- item.current_build_set, 'dequeue',
- final=False, result='DEQUEUED')
- except Exception:
- self.log.exception(
- "Error reporting buildset completion to DB:")
+ )
+ try:
+ self.sql.reportBuildsetEnd(
+ item.current_build_set, 'dequeue',
+ final=False, result='DEQUEUED')
+ except Exception:
+ self.log.exception(
+ "Error reporting buildset completion to DB:")
- for build in builds_to_cancel:
- self.log.info(
- "Canceling build %s during reconfiguration" % (build,))
+ for build in builds_to_cancel:
+ self.log.info(
+ "Canceling build %s during reconfiguration", build)
+ try:
self.cancelJob(build.build_set, build.job,
build=build, force=True)
- for build_set, request, request_job in requests_to_cancel:
- self.log.info(
- "Canceling node request %s during reconfiguration",
- request)
+ except Exception:
+ self.log.exception(
+ "Error canceling build %s during reconfiguration", build)
+ for build_set, request, request_job in requests_to_cancel:
+ self.log.info(
+ "Canceling node request %s during reconfiguration", request)
+ try:
self.cancelJob(build_set, request_job, force=True)
- shared_queue.delete(pipeline.manager.current_context)
+ except Exception:
+ self.log.exception(
+ "Error canceling node request %s during reconfiguration",
+ request)
+
+ # Delete the pipeline event root path in ZooKeeper to remove
+ # all pipeline specific event queues.
+ try:
+ self.zk_client.client.delete(
+ PIPELINE_NAME_ROOT.format(
+ tenant=pipeline.tenant.name,
+ pipeline=pipeline.name),
+ recursive=True)
+ except Exception:
+ # In case a pipeline event has been submitted during
+            # reconfiguration, this cleanup will fail.
+ self.log.exception(
+ "Error removing event queues for deleted pipeline %s in "
+ "tenant %s", pipeline.name, pipeline.tenant.name)
+
+ # Delete the pipeline root path in ZooKeeper to remove all pipeline
+ # state.
+ try:
+ self.zk_client.client.delete(pipeline.state.getPath(),
+ recursive=True)
+ except Exception:
+ self.log.exception(
+ "Error removing state for deleted pipeline %s in tenant %s",
+ pipeline.name, pipeline.tenant.name)
def _doPromoteEvent(self, event):
tenant = self.abide.tenants.get(event.tenant_name)
@@ -2007,84 +2125,74 @@ class Scheduler(threading.Thread):
return
self.log.debug("Run handler awake")
self.run_handler_lock.acquire()
- try:
- if not self._stopped:
- self.process_reconfigure_queue()
+ with self.statsd_timer("zuul.scheduler.run_handler"):
+ try:
+ self._run()
+ except Exception:
+ self.log.exception("Exception in run handler:")
+ # There may still be more events to process
+ self.wake_event.set()
+ finally:
+ self.run_handler_lock.release()
- if self.unparsed_abide.ltime < self.system_config_cache.ltime:
- self.updateSystemConfig()
+ def _run(self):
+ if not self._stopped:
+ self.process_reconfigure_queue()
- for tenant_name in self.unparsed_abide.tenants:
- if self._stopped:
- break
+ if self.unparsed_abide.ltime < self.system_config_cache.ltime:
+ self.updateSystemConfig()
- tenant = self.abide.tenants.get(tenant_name)
- if not tenant:
- continue
+ for tenant_name in self.unparsed_abide.tenants:
+ if self._stopped:
+ break
- # This will also forward events for the pipelines
- # (e.g. enqueue or dequeue events) to the matching
- # pipeline event queues that are processed afterwards.
- self.process_tenant_management_queue(tenant)
+ tenant = self.abide.tenants.get(tenant_name)
+ if not tenant:
+ continue
- if self._stopped:
- break
+ # This will also forward events for the pipelines
+ # (e.g. enqueue or dequeue events) to the matching
+ # pipeline event queues that are processed afterwards.
+ self.process_tenant_management_queue(tenant)
- try:
- with tenant_read_lock(
- self.zk_client, tenant_name, blocking=False
- ) as tlock:
- remote_state = self.tenant_layout_state.get(
- tenant_name)
- if remote_state is None:
- # The tenant may still be in the
- # process of initial configuration
- self.wake_event.set()
- continue
- local_state = self.local_layout_state.get(
- tenant_name)
- if (local_state is None or
- remote_state > local_state):
- self.log.debug(
- "Local layout of tenant %s not up to date",
- tenant.name)
- self.layout_update_event.set()
- continue
+ if self._stopped:
+ break
- # Get tenant again, as it might have been updated
- # by a tenant reconfig or layout change.
- tenant = self.abide.tenants[tenant_name]
+ try:
+ with tenant_read_lock(
+ self.zk_client, tenant_name, blocking=False
+ ) as tlock:
+ if not self.isTenantLayoutUpToDate(tenant_name):
+ continue
- if not self._stopped:
- # This will forward trigger events to pipeline
- # event queues that are processed below.
- self.process_tenant_trigger_queue(tenant)
+ # Get tenant again, as it might have been updated
+ # by a tenant reconfig or layout change.
+ tenant = self.abide.tenants[tenant_name]
- self.process_pipelines(tenant, tlock)
- except LockException:
- self.log.debug("Skipping locked tenant %s",
- tenant.name)
- remote_state = self.tenant_layout_state.get(
- tenant_name)
- local_state = self.local_layout_state.get(
- tenant_name)
- if (remote_state is None or
- local_state is None or
- remote_state > local_state):
- # Let's keep looping until we've updated to the
- # latest tenant layout.
- self.wake_event.set()
- except Exception:
- self.log.exception("Exception processing tenant %s:",
- tenant_name)
- # There may still be more events to process
- self.wake_event.set()
+ if not self._stopped:
+ # This will forward trigger events to pipeline
+ # event queues that are processed below.
+ self.process_tenant_trigger_queue(tenant)
+
+ self.process_pipelines(tenant, tlock)
+ except LockException:
+ self.log.debug("Skipping locked tenant %s",
+ tenant.name)
+ remote_state = self.tenant_layout_state.get(
+ tenant_name)
+ local_state = self.local_layout_state.get(
+ tenant_name)
+ if (remote_state is None or
+ local_state is None or
+ remote_state > local_state):
+ # Let's keep looping until we've updated to the
+ # latest tenant layout.
+ self.wake_event.set()
except Exception:
- self.log.exception("Exception in run handler:")
+ self.log.exception("Exception processing tenant %s:",
+ tenant_name)
# There may still be more events to process
self.wake_event.set()
- finally:
- self.run_handler_lock.release()
def primeSystemConfig(self):
with self.layout_lock:
@@ -2102,6 +2210,7 @@ class Scheduler(threading.Thread):
def updateSystemConfig(self):
with self.layout_lock:
+ self.log.debug("Updating system config")
self.unparsed_abide, self.globals = self.system_config_cache.get()
self.ansible_manager = AnsibleManager(
default_version=self.globals.default_ansible_version)
@@ -2136,7 +2245,12 @@ class Scheduler(threading.Thread):
self.zk_client, tenant.name, pipeline.name,
blocking=False) as lock,\
self.createZKContext(lock, self.log) as ctx:
+ self.log.debug("Processing pipeline %s in tenant %s",
+ pipeline.name, tenant.name)
with pipeline.manager.currentContext(ctx):
+ if ((tenant.name, pipeline.name) in
+ self._profile_pipelines):
+ ctx.profile = True
with self.statsd_timer(f'{stats_key}.handling'):
refreshed = self._process_pipeline(
tenant, pipeline)
@@ -2205,14 +2319,10 @@ class Scheduler(threading.Thread):
stats_key = f'zuul.tenant.{tenant.name}.pipeline.{pipeline.name}'
ctx = pipeline.manager.current_context
- if (tenant.name, pipeline.name) in self._profile_pipelines:
- ctx.profile = True
with self.statsd_timer(f'{stats_key}.refresh'):
pipeline.change_list.refresh(ctx)
pipeline.summary.refresh(ctx)
pipeline.state.refresh(ctx)
- if (tenant.name, pipeline.name) in self._profile_pipelines:
- ctx.profile = False
pipeline.state.setDirty(self.zk_client.client)
if pipeline.state.old_queues:
@@ -2249,7 +2359,9 @@ class Scheduler(threading.Thread):
for pipeline in tenant.layout.pipelines.values():
self.log.debug("Gather relevant cache items for: %s %s",
tenant.name, pipeline.name)
- pipeline.change_list.refresh(ctx)
+ # This will raise an exception and abort the process if
+ # unable to refresh the change list.
+ pipeline.change_list.refresh(ctx, allow_init=False)
change_keys = pipeline.change_list.getChangeKeys()
relevant_changes = pipeline.manager.resolveChangeKeys(
change_keys)
@@ -2275,11 +2387,21 @@ class Scheduler(threading.Thread):
with trigger_queue_lock(
self.zk_client, tenant.name, blocking=False
):
+ self.log.debug("Processing tenant trigger events in %s",
+ tenant.name)
# Update the pipeline changes
ctx = self.createZKContext(None, self.log)
for pipeline in tenant.layout.pipelines.values():
+ # This will raise an exception if it is unable to
+ # refresh the change list. We will proceed anyway
+                # and use our data from the last time we
+                # refreshed in order to avoid stalling trigger
+ # processing. In this case we may not forward
+ # some events which are related to changes in the
+ # pipeline but don't match the pipeline trigger
+ # criteria.
try:
- pipeline.change_list.refresh(ctx)
+ pipeline.change_list.refresh(ctx, allow_init=False)
except Exception:
self.log.exception(
"Unable to refresh pipeline change list for %s",
@@ -2470,6 +2592,13 @@ class Scheduler(threading.Thread):
with management_queue_lock(
self.zk_client, tenant.name, blocking=False
):
+ if not self.isTenantLayoutUpToDate(tenant.name):
+ self.log.debug(
+ "Skipping management event queue for tenant %s",
+ tenant.name)
+ return
+ self.log.debug("Processing tenant management events in %s",
+ tenant.name)
self._process_tenant_management_queue(tenant)
except LockException:
self.log.debug("Skipping locked management event queue"
diff --git a/zuul/web/__init__.py b/zuul/web/__init__.py
index 7f27cd970..47226fd7d 100755
--- a/zuul/web/__init__.py
+++ b/zuul/web/__init__.py
@@ -395,7 +395,7 @@ class LogStreamer(object):
self.finger_socket = socket.create_connection(
(server, port), timeout=10)
if use_ssl:
- context = ssl.SSLContext(ssl.PROTOCOL_TLS)
+ context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.verify_mode = ssl.CERT_REQUIRED
context.check_hostname = self.zuulweb.finger_tls_verify_hostnames
context.load_cert_chain(
diff --git a/zuul/zk/event_queues.py b/zuul/zk/event_queues.py
index ebb33ec88..ce38cf23e 100644
--- a/zuul/zk/event_queues.py
+++ b/zuul/zk/event_queues.py
@@ -144,6 +144,17 @@ class EventWatcher(ZooKeeperSimpleBase):
self.watched_tenants.add(tenant_name)
def _pipelineWatch(self, tenant_name, pipelines):
+        # Remove pipelines that no longer exist from the watch list so
+ # we re-register the children watch in case the pipeline is
+ # added again.
+ for watched_tenant, pipeline_name in list(self.watched_pipelines):
+ if watched_tenant != tenant_name:
+ continue
+ if pipeline_name in pipelines:
+ continue
+ with suppress(KeyError):
+ self.watched_pipelines.remove((tenant_name, pipeline_name))
+
for pipeline_name in pipelines:
key = (tenant_name, pipeline_name)
if key in self.watched_pipelines:
@@ -205,6 +216,9 @@ class ZooKeeperEventQueue(ZooKeeperSimpleBase, Iterable):
self.queue_root = queue_root
self.event_root = f'{queue_root}/queue'
self.data_root = f'{queue_root}/data'
+ self.initialize()
+
+ def initialize(self):
self.kazoo_client.ensure_path(self.event_root)
self.kazoo_client.ensure_path(self.data_root)
diff --git a/zuul/zk/zkobject.py b/zuul/zk/zkobject.py
index 73adf5954..87d76bca6 100644
--- a/zuul/zk/zkobject.py
+++ b/zuul/zk/zkobject.py
@@ -233,7 +233,18 @@ class ZKObject:
obj._load(context, path=path)
return obj
+ def internalCreate(self, context):
+ """Create the object in ZK from an existing ZKObject
+
+ This should only be used in special circumstances: when we
+ know it's safe to start using a ZKObject before it's actually
+ created in ZK. Normally use .new()
+ """
+ data = self._trySerialize(context)
+ self._save(context, data, create=True)
+
def refresh(self, context):
"""Update data from ZK"""
self._load(context)
@@ -308,6 +319,17 @@ class ZKObject:
return (compressed_size, uncompressed_size)
+ def getZKVersion(self):
+ """Return the ZK version of the object as of the last load/refresh.
+
+ Returns None if the object is newly created.
+ """
+ zstat = getattr(self, '_zstat', None)
+ # If zstat is None, we created the object
+ if zstat is None:
+ return None
+ return zstat.version
+
# Private methods below
def _retry(self, context, func, *args, max_tries=-1, **kw):