-rw-r--r--  .gitignore | 1
-rw-r--r--  .zuul.yaml | 94
-rw-r--r--  Dockerfile | 25
-rw-r--r--  doc/source/config/job.rst | 16
-rw-r--r--  doc/source/config/pipeline.rst | 11
-rw-r--r--  doc/source/developer/model-changelog.rst | 19
-rw-r--r--  doc/source/governance.rst | 2
-rw-r--r--  doc/source/job-content.rst | 19
-rw-r--r--  doc/source/monitoring.rst | 11
-rw-r--r--  noxfile.py | 128
-rw-r--r--  playbooks/zuul-nox/post-system-logs.yaml (renamed from playbooks/zuul-tox/post-system-logs.yaml) | 0
-rw-r--r--  playbooks/zuul-nox/pre.yaml (renamed from playbooks/zuul-tox/pre.yaml) | 0
-rw-r--r--  releasenotes/notes/change_message-18207e18b5dfffd3.yaml | 12
-rw-r--r--  releasenotes/notes/fix-exclude-include-priority-ea4a21ab1e53cb4a.yaml | 12
-rw-r--r--  releasenotes/notes/non-live-pipeline-requirements-aa173bd86b332e63.yaml | 29
-rw-r--r--  releasenotes/notes/paused-events-4adaade5e29fc10e.yaml | 5
-rw-r--r--  requirements.txt | 6
-rw-r--r--  setup.cfg | 7
-rw-r--r--  tests/base.py | 21
-rw-r--r--  tests/fakegithub.py | 17
-rw-r--r--  tests/fakegitlab.py | 1
-rw-r--r--  tests/fixtures/config/inventory/git/common-config/playbooks/jinja2-message.yaml | 4
-rw-r--r--  tests/fixtures/config/requirements/trusted-check/git/common-config/playbooks/base.yaml | 2
-rw-r--r--  tests/fixtures/config/requirements/trusted-check/git/common-config/zuul.yaml | 41
-rw-r--r--  tests/fixtures/config/requirements/trusted-check/git/gh_project/README | 1
-rw-r--r--  tests/fixtures/config/requirements/trusted-check/git/org_project/README | 1
-rw-r--r--  tests/fixtures/config/requirements/trusted-check/main.yaml | 11
-rw-r--r--  tests/fixtures/config/tenant-parser/exclude-include-branches.yaml | 16
-rw-r--r--  tests/fixtures/config/zuultrigger/parent-change-enqueued/git/common-config/zuul.yaml | 4
-rw-r--r--  tests/fixtures/layouts/deps-by-topic.yaml | 13
-rw-r--r--  tests/fixtures/layouts/github-merge-mode.yaml | 21
-rw-r--r--  tests/fixtures/layouts/two-check.yaml | 47
-rw-r--r--  tests/fixtures/zuul-gerrit-github.conf | 1
-rwxr-xr-x  tests/make_playbooks.py | 3
-rw-r--r--  tests/unit/test_circular_dependencies.py | 170
-rw-r--r--  tests/unit/test_configloader.py | 6
-rw-r--r--  tests/unit/test_connection.py | 253
-rw-r--r--  tests/unit/test_event_queues.py | 30
-rw-r--r--  tests/unit/test_gerrit.py | 45
-rw-r--r--  tests/unit/test_git_driver.py | 3
-rw-r--r--  tests/unit/test_github_driver.py | 64
-rw-r--r--  tests/unit/test_inventory.py | 23
-rw-r--r--  tests/unit/test_merger_repo.py | 113
-rw-r--r--  tests/unit/test_model.py | 4
-rw-r--r--  tests/unit/test_model_upgrade.py | 146
-rw-r--r--  tests/unit/test_reporting.py | 4
-rw-r--r--  tests/unit/test_requirements.py | 37
-rw-r--r--  tests/unit/test_scheduler.py | 93
-rw-r--r--  tests/unit/test_sos.py | 73
-rw-r--r--  tests/unit/test_streaming.py | 28
-rw-r--r--  tests/unit/test_v3.py | 40
-rw-r--r--  tests/unit/test_zk.py | 84
-rw-r--r--  tests/unit/test_zuultrigger.py | 12
-rw-r--r--  tools/docker-compose.yaml | 2
-rwxr-xr-x  tools/test-setup-docker.sh | 3
-rwxr-xr-x  tools/yarn-build.sh | 73
-rw-r--r--  tox.ini | 7
-rw-r--r--  web/public/openapi.yaml | 40
-rw-r--r--  web/src/App.jsx | 105
-rw-r--r--  web/src/App.test.jsx | 5
-rw-r--r--  web/src/containers/timezone/SelectTz.jsx | 22
-rw-r--r--  web/src/index.css | 15
-rw-r--r--  zuul/ansible/logconfig.py | 3
-rwxr-xr-x  zuul/cmd/__init__.py | 8
-rw-r--r--  zuul/configloader.py | 80
-rw-r--r--  zuul/connection/__init__.py | 103
-rw-r--r--  zuul/driver/gerrit/gerritconnection.py | 9
-rw-r--r--  zuul/driver/github/githubconnection.py | 47
-rw-r--r--  zuul/driver/github/githubsource.py | 3
-rw-r--r--  zuul/driver/gitlab/gitlabconnection.py | 7
-rw-r--r--  zuul/driver/mqtt/mqttreporter.py | 3
-rw-r--r--  zuul/driver/sql/alembic/env.py | 3
-rw-r--r--  zuul/driver/sql/alembic/versions/0ed5def089e2_add_build_event_table.py | 49
-rw-r--r--  zuul/driver/sql/alembic/versions/60c119eb1e3f_use_build_set_results.py | 17
-rw-r--r--  zuul/driver/sql/alembic/versions/c7467b642498_buildset_updated.py | 17
-rw-r--r--  zuul/driver/sql/sqlconnection.py | 63
-rw-r--r--  zuul/driver/sql/sqlreporter.py | 11
-rw-r--r--  zuul/driver/timer/__init__.py | 5
-rw-r--r--  zuul/executor/client.py | 5
-rw-r--r--  zuul/executor/common.py | 1
-rw-r--r--  zuul/executor/server.py | 25
-rw-r--r--  zuul/lib/encryption.py | 17
-rw-r--r--  zuul/lib/fingergw.py | 30
-rw-r--r--  zuul/lib/repl.py | 6
-rw-r--r--  zuul/lib/streamer_utils.py | 2
-rw-r--r--  zuul/manager/__init__.py | 229
-rw-r--r--  zuul/manager/independent.py | 12
-rw-r--r--  zuul/merger/merger.py | 43
-rw-r--r--  zuul/model.py | 353
-rw-r--r--  zuul/model_api.py | 4
-rw-r--r--  zuul/scheduler.py | 392
-rw-r--r--  zuul/source/__init__.py | 19
-rw-r--r--  zuul/version.py | 5
-rwxr-xr-x  zuul/web/__init__.py | 2
-rw-r--r--  zuul/zk/branch_cache.py | 83
-rw-r--r--  zuul/zk/config_cache.py | 2
-rw-r--r--  zuul/zk/event_queues.py | 14
-rw-r--r--  zuul/zk/locks.py | 2
-rw-r--r--  zuul/zk/vendor/lock.py | 759
-rw-r--r--  zuul/zk/zkobject.py | 22
100 files changed, 3183 insertions(+), 1373 deletions(-)
diff --git a/.gitignore b/.gitignore
index c2f47953b..ee87c9d50 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,6 +7,7 @@
.mypy_cache
.test
.testrepository
+.nox
.tox
.venv
.coverage
diff --git a/.zuul.yaml b/.zuul.yaml
index 66a3de1e3..420ac11ad 100644
--- a/.zuul.yaml
+++ b/.zuul.yaml
@@ -40,44 +40,47 @@
zuul_ansible_version: 6
- job:
- name: zuul-tox
+ name: zuul-nox
description: |
Zuul unit tests with ZooKeeper running
- parent: tox
+ parent: nox
nodeset: ubuntu-jammy
- pre-run: playbooks/zuul-tox/pre.yaml
- post-run: playbooks/zuul-tox/post-system-logs.yaml
+ pre-run: playbooks/zuul-nox/pre.yaml
+ post-run: playbooks/zuul-nox/post-system-logs.yaml
vars:
- tox_environment:
+ nox_environment:
ZUUL_ZK_CA: /opt/zookeeper/ca/certs/cacert.pem
ZUUL_ZK_CERT: /opt/zookeeper/ca/certs/client.pem
ZUUL_ZK_KEY: /opt/zookeeper/ca/keys/clientkey.pem
ZUUL_TEST_ROOT: /tmp/zuul-test
YARN_REGISTRY: "https://{{ zuul_site_mirror_fqdn }}:4443/registry.npmjs"
+ CI: "1"
test_setup_environment:
ZUUL_TEST_ROOT: /tmp/zuul-test
YARN_REGISTRY: "https://{{ zuul_site_mirror_fqdn }}:4443/registry.npmjs"
- job:
- name: zuul-tox-remote
- parent: tox
+ name: zuul-nox-remote
+ parent: nox
nodeset: ubuntu-jammy
timeout: 2700 # 45 minutes
- pre-run: playbooks/zuul-tox/pre.yaml
- post-run: playbooks/zuul-tox/post-system-logs.yaml
+ pre-run: playbooks/zuul-nox/pre.yaml
+ post-run: playbooks/zuul-nox/post-system-logs.yaml
vars:
- tox_envlist: remote
- tox_environment:
+ nox_session: remote
+ nox_environment:
ZUUL_ZK_CA: /opt/zookeeper/ca/certs/cacert.pem
ZUUL_ZK_CERT: /opt/zookeeper/ca/certs/client.pem
ZUUL_ZK_KEY: /opt/zookeeper/ca/keys/clientkey.pem
ZUUL_SSH_KEY: /home/zuul/.ssh/id_rsa
ZUUL_REMOTE_IPV4: "{{ nodepool.interface_ip }}"
ZUUL_REMOTE_KEEP: "true"
+ CI: "1"
- job:
+      # Zuul client uses this job so we can't just delete it yet.
name: zuul-tox-zuul-client
- parent: zuul-tox
+ parent: zuul-nox
description: |
Test that Zuul and zuul-client work together.
required-projects:
@@ -88,28 +91,42 @@
tox_envlist: zuul_client
- job:
- name: zuul-tox-py311
- parent: zuul-tox
+ name: zuul-nox-zuul-client
+ parent: zuul-nox
+ description: |
+ Test that Zuul and zuul-client work together.
+ required-projects:
+ - zuul/zuul
+ - zuul/zuul-client
+ vars:
+ zuul_work_dir: "{{ zuul.projects['opendev.org/zuul/zuul'].src_dir }}"
+ nox_session: zuul_client
+
+- job:
+ name: zuul-nox-py311
+ parent: zuul-nox
timeout: 7200 # 120 minutes
vars:
- tox_envlist: py311
+ nox_keyword: tests
+ nox_force_python: "3.11"
python_version: "3.11"
- job:
- name: zuul-tox-py38
- parent: zuul-tox
+ name: zuul-nox-py38
+ parent: zuul-nox
timeout: 7200 # 120 minutes
vars:
- tox_envlist: py38
+ nox_keyword: tests
+ nox_force_python: "3.8"
python_version: "3.8"
nodeset: ubuntu-focal
- job:
- name: zuul-tox-py311-multi-scheduler
- parent: zuul-tox-py311
+ name: zuul-nox-py311-multi-scheduler
+ parent: zuul-nox-py311
voting: false
vars:
- tox_environment:
+ nox_environment:
ZUUL_SCHEDULER_COUNT: 2
- job:
@@ -278,17 +295,18 @@
vars:
node_version: 16
release_python: python3
+ ensure_tox_version: "<4"
check:
jobs:
- zuul-build-image
- - zuul-tox-docs
- - tox-linters:
+ - zuul-nox-docs
+ - nox-linters:
vars:
- tox_install_bindep: false
+ nox_install_bindep: false
nodeset: ubuntu-jammy
- - zuul-tox-py38
- - zuul-tox-py311
- - zuul-tox-py311-multi-scheduler
+ - zuul-nox-py38
+ - zuul-nox-py311
+ - zuul-nox-py311-multi-scheduler
- zuul-build-dashboard-openstack-whitelabel
- zuul-build-dashboard-software-factory
- zuul-build-dashboard-opendev
@@ -303,22 +321,22 @@
- web/.*
nodeset: ubuntu-jammy
- zuul-stream-functional-6
- - zuul-tox-remote
+ - zuul-nox-remote
- zuul-quick-start:
requires: nodepool-container-image
dependencies: zuul-build-image
- - zuul-tox-zuul-client
+ - zuul-nox-zuul-client
- zuul-build-python-release
gate:
jobs:
- zuul-upload-image
- - zuul-tox-docs
- - tox-linters:
+ - zuul-nox-docs
+ - nox-linters:
vars:
- tox_install_bindep: false
+ nox_install_bindep: false
nodeset: ubuntu-jammy
- - zuul-tox-py38
- - zuul-tox-py311
+ - zuul-nox-py38
+ - zuul-nox-py311
- zuul-build-dashboard
- nodejs-run-lint:
vars:
@@ -331,16 +349,16 @@
- web/.*
nodeset: ubuntu-jammy
- zuul-stream-functional-6
- - zuul-tox-remote
+ - zuul-nox-remote
- zuul-quick-start:
requires: nodepool-container-image
dependencies: zuul-upload-image
- - zuul-tox-zuul-client
+ - zuul-nox-zuul-client
- zuul-build-python-release
promote:
jobs:
- zuul-promote-image
- - zuul-promote-docs
+ - zuul-promote-nox-docs
- opendev-promote-python:
vars:
download_artifact_job: zuul-build-python-release
@@ -351,7 +369,7 @@
release:
jobs:
- zuul-release-python
- - zuul-publish-tox-docs
+ - zuul-publish-nox-docs
- upload-docker-image:
secrets:
name: docker_credentials
diff --git a/Dockerfile b/Dockerfile
index 506ed7462..df326bd8a 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -27,8 +27,9 @@ ARG REACT_APP_ZUUL_API
# Optional flag to enable React Service Worker. (set to true to enable)
ARG REACT_APP_ENABLE_SERVICE_WORKER
# Kubectl/Openshift version/sha
-ARG OPENSHIFT_URL=https://github.com/openshift/origin/releases/download/v3.11.0/openshift-origin-client-tools-v3.11.0-0cbc58b-linux-64bit.tar.gz
-ARG OPENSHIFT_SHA=4b0f07428ba854174c58d2e38287e5402964c9a9355f6c359d1242efd0990da3
+ARG OPENSHIFT_URL=https://mirror.openshift.com/pub/openshift-v4/x86_64/clients/ocp/4.11.20/openshift-client-linux-4.11.20.tar.gz
+ARG OPENSHIFT_SHA=74f252c812932425ca19636b2be168df8fe57b114af6b114283975e67d987d11
+ARG PBR_VERSION=
COPY . /tmp/src
COPY --from=js-builder /tmp/src/build /tmp/src/zuul/web/static
@@ -46,7 +47,16 @@ RUN /output/install-from-bindep \
&& curl -L $OPENSHIFT_URL -o /tmp/openshift-install/openshift-client.tgz \
&& cd /tmp/openshift-install/ \
&& echo $OPENSHIFT_SHA /tmp/openshift-install/openshift-client.tgz | sha256sum --check \
- && tar xvfz openshift-client.tgz --strip-components=1 -C /tmp/openshift-install
+ && tar xvfz openshift-client.tgz -C /tmp/openshift-install
+
+
+FROM docker.io/library/golang:1.19-bullseye AS skopeo-builder
+RUN apt-get update \
+ && apt-get install -y git build-essential libgpgme-dev libassuan-dev libbtrfs-dev libdevmapper-dev pkg-config \
+ && git clone https://github.com/containers/skopeo.git \
+ && cd skopeo && git checkout v1.9.3 \
+ && make bin/skopeo \
+ && cp bin/skopeo /tmp/skopeo
FROM docker.io/opendevorg/python-base:3.11-bullseye as zuul
ENV DEBIAN_FRONTEND=noninteractive
@@ -66,11 +76,16 @@ CMD ["/usr/local/bin/zuul"]
FROM zuul as zuul-executor
ENV DEBIAN_FRONTEND=noninteractive
COPY --from=builder /usr/local/lib/zuul/ /usr/local/lib/zuul
-COPY --from=builder /tmp/openshift-install/kubectl /usr/local/bin/kubectl
COPY --from=builder /tmp/openshift-install/oc /usr/local/bin/oc
+# The oc and kubectl binaries are large and have the same hash.
+# Copy them only once and use a symlink to save space.
+RUN ln -s /usr/local/bin/oc /usr/local/bin/kubectl
+# See note above about this workaround. These are the runtime
+# dependencies; this should just install skopeo when we upgrade.
+COPY --from=skopeo-builder /tmp/skopeo /usr/local/bin/skopeo
RUN apt-get update \
- && apt-get install -y skopeo \
+ && apt-get install -y libdevmapper1.02.1 libgpgme11 \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
diff --git a/doc/source/config/job.rst b/doc/source/config/job.rst
index 435d68e7e..f9e7b7b71 100644
--- a/doc/source/config/job.rst
+++ b/doc/source/config/job.rst
@@ -1046,6 +1046,14 @@ Here is an example of two job definitions:
value is used to determine if the job should run. This is a
:ref:`regular expression <regex>` or list of regular expressions.
+ .. warning::
+
+ File filters will be ignored for refs that don't have any
+ files. This will be the case for merge commits (e.g. in a post
+ pipeline) or empty commits created with
+ ``git commit --allow-empty`` (which can be used in order to
+ run all jobs).
+
.. attr:: irrelevant-files
This is a negative complement of **files**. It indicates that
@@ -1055,6 +1063,14 @@ Here is an example of two job definitions:
are in the docs directory. A :ref:`regular expression <regex>`
or list of regular expressions.
+ .. warning::
+
+ File filters will be ignored for refs that don't have any
+ files. This will be the case for merge commits (e.g. in a post
+ pipeline) or empty commits created with
+ ``git commit --allow-empty`` (which can be used in order to
+ run all jobs).
+
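A minimal sketch of how the file filter interacts with the warning above (the job name and pattern here are purely illustrative, not part of this change):

.. code-block:: yaml

   # Illustrative only: this job normally runs when files under doc/
   # change, but per the warning above it will also run for a merge
   # commit or an empty commit, since those refs carry no file list.
   - job:
       name: build-docs
       files:
         - doc/.*
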
.. attr:: match-on-config-updates
:default: true
diff --git a/doc/source/config/pipeline.rst b/doc/source/config/pipeline.rst
index f4d7cce69..41bedfbf2 100644
--- a/doc/source/config/pipeline.rst
+++ b/doc/source/config/pipeline.rst
@@ -266,6 +266,17 @@ success, the pipeline reports back to Gerrit with ``Verified`` vote of
type of the connection will dictate which options are available.
See :ref:`drivers`.
+ .. attr:: allow-other-connections
+ :default: true
+
+ If this is set to `false` then any change enqueued into the
+ pipeline (whether it is enqueued to run jobs or merely as a
+ dependency) must be from one of the connections specified in the
+ pipeline configuration (this includes any trigger, reporter, or
+      source requirement). When used in conjunction with
+ :attr:`pipeline.require`, this can ensure that pipeline
+ requirements are exhaustive.
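
As a sketch, a pipeline restricted in this way might look like the following (this mirrors the trusted-check test fixture added later in this change; names and values are illustrative):

.. code-block:: yaml

   # With allow-other-connections set to false, only changes from the
   # gerrit connection referenced below (including dependencies of
   # enqueued changes) may enter the pipeline.
   - pipeline:
       name: trusted-check
       manager: independent
       allow-other-connections: false
       require:
         gerrit:
           approval:
             - Code-Review: 2
       trigger:
         gerrit:
           - event: patchset-created
       success:
         gerrit:
           Verified: 1
       failure:
         gerrit:
           Verified: -1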
+
.. attr:: supercedes
The name of a pipeline, or a list of names, that this pipeline
diff --git a/doc/source/developer/model-changelog.rst b/doc/source/developer/model-changelog.rst
index c6d8fedbd..f78b3f0a0 100644
--- a/doc/source/developer/model-changelog.rst
+++ b/doc/source/developer/model-changelog.rst
@@ -100,3 +100,22 @@ Version 10
:Prior Zuul version: 6.4.0
:Description: Renames admin_rules to authz_rules in unparsed abide.
Affects schedulers and web.
+
+Version 11
+----------
+
+:Prior Zuul version: 8.0.1
+:Description: Adds merge_modes to branch cache. Affects schedulers and web.
+
+Version 12
+----------
+:Prior Zuul version: 8.0.1
+:Description: Adds job_versions and build_versions to BuildSet.
+ Affects schedulers.
+
+Version 13
+----------
+:Prior Zuul version: 8.2.0
+:Description: Stores only the necessary event info as part of a queue item
+ instead of the full trigger event.
+ Affects schedulers.
diff --git a/doc/source/governance.rst b/doc/source/governance.rst
index 7393481cd..7bcdac44f 100644
--- a/doc/source/governance.rst
+++ b/doc/source/governance.rst
@@ -99,7 +99,7 @@ The Project Lead is elected to a term of one year. The election
process shall be a Condorcet election and the candidates shall be
self-nominated from among the existing Maintainers.
-The Project Lead is James E. Blair (term expires 2023-01-13).
+The Project Lead is James E. Blair (term expires 2024-01-16).
Zuul-Jobs Maintainers
---------------------
diff --git a/doc/source/job-content.rst b/doc/source/job-content.rst
index 75044cf1c..d6bb07683 100644
--- a/doc/source/job-content.rst
+++ b/doc/source/job-content.rst
@@ -612,7 +612,6 @@ of item.
The patchset identifier for the change. If a change is
revised, this will have a different value.
-
.. var:: resources
:type: dict
@@ -706,14 +705,18 @@ are available:
The commit or pull request message of the change base64 encoded. Use the
`b64decode` filter in ansible when working with it.
- .. code-block:: yaml
+ .. warning:: This variable is deprecated and will be removed in
+ a future version. Use :var:`zuul.change_message`
+ instead.
+
+ .. var:: change_message
- - hosts: all
- tasks:
- - name: Dump commit message
- copy:
- content: "{{ zuul.message | b64decode }}"
- dest: "{{ zuul.executor.log_root }}/commit-message.txt"
+ The commit or pull request message of the change. When Zuul
+ runs Ansible, this variable is tagged with the ``!unsafe`` YAML
+ tag so that Ansible will not interpolate values into it. Note,
+ however, that the `inventory.yaml` file placed in the build's
+      workspace for debugging and inspection purposes does not include
+ the ``!unsafe`` tag.
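
A short usage sketch (mirroring the jinja2-message.yaml test fixture updated in this change); no base64 decoding is needed because the value is plain text:

.. code-block:: yaml

   # Write the change message to the job's log directory.
   - hosts: all
     tasks:
       - name: Dump commit message
         copy:
           content: "{{ zuul.change_message }}"
           dest: "{{ zuul.executor.log_root }}/change-message.txt"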
Branch Items
diff --git a/doc/source/monitoring.rst b/doc/source/monitoring.rst
index 7f99c7f7f..f40bee445 100644
--- a/doc/source/monitoring.rst
+++ b/doc/source/monitoring.rst
@@ -444,6 +444,11 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
Indicates if the executor is paused. 1 means paused else 0.
+ .. stat:: pct_used_hdd
+ :type: gauge
+
+ The used disk on this executor, as a percentage multiplied by 100.
+
.. stat:: pct_used_ram
:type: gauge
@@ -711,6 +716,12 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
The size of the current connection event queue.
+ .. stat:: run_handler
+ :type: timer
+
+ A timer metric reporting the time taken for one scheduler run
+ handler iteration.
+
.. stat:: time_query
:type: timer
diff --git a/noxfile.py b/noxfile.py
new file mode 100644
index 000000000..30058cef7
--- /dev/null
+++ b/noxfile.py
@@ -0,0 +1,128 @@
+# Copyright 2022 Acme Gating, LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import multiprocessing
+import os
+
+import nox
+
+
+nox.options.error_on_external_run = True
+nox.options.reuse_existing_virtualenvs = True
+nox.options.sessions = ["tests-3", "linters"]
+
+
+def set_env(session, var, default):
+ session.env[var] = os.environ.get(var, default)
+
+
+def set_standard_env_vars(session):
+ set_env(session, 'OS_LOG_CAPTURE', '1')
+ set_env(session, 'OS_STDERR_CAPTURE', '1')
+ set_env(session, 'OS_STDOUT_CAPTURE', '1')
+ set_env(session, 'OS_TEST_TIMEOUT', '360')
+ session.env['PYTHONWARNINGS'] = ','.join([
+ 'always::DeprecationWarning:zuul.driver.sql.sqlconnection',
+ 'always::DeprecationWarning:tests.base',
+ 'always::DeprecationWarning:tests.unit.test_database',
+ 'always::DeprecationWarning:zuul.driver.sql.alembic.env',
+ 'always::DeprecationWarning:zuul.driver.sql.alembic.script',
+ ])
+ # Set PYTHONTRACEMALLOC to a value greater than 0 in the calling env
+ # to get tracebacks of that depth for ResourceWarnings. Disabled by
+ # default as this consumes more resources and is slow.
+ set_env(session, 'PYTHONTRACEMALLOC', '0')
+
+
+@nox.session(python='3')
+def bindep(session):
+ set_standard_env_vars(session)
+ session.install('bindep')
+ session.run('bindep', 'test')
+
+
+@nox.session(python='3')
+def cover(session):
+ set_standard_env_vars(session)
+ session.env['PYTHON'] = 'coverage run --source zuul --parallel-mode'
+ session.install('-r', 'requirements.txt',
+ '-r', 'test-requirements.txt')
+ session.install('-e', '.')
+ session.run('stestr', 'run')
+ session.run('coverage', 'combine')
+ session.run('coverage', 'html', '-d', 'cover')
+ session.run('coverage', 'xml', '-o', 'cover/coverage.xml')
+
+
+@nox.session(python='3')
+def docs(session):
+ set_standard_env_vars(session)
+ session.install('-r', 'doc/requirements.txt',
+ '-r', 'test-requirements.txt')
+ session.install('-e', '.')
+ session.run('sphinx-build', '-E', '-W', '-d', 'doc/build/doctrees',
+ '-b', 'html', 'doc/source/', 'doc/build/html')
+
+
+@nox.session(python='3')
+def linters(session):
+ set_standard_env_vars(session)
+ session.install('flake8', 'openapi-spec-validator')
+ session.run('flake8')
+ session.run('openapi-spec-validator', 'web/public/openapi.yaml')
+
+
+@nox.session(python='3')
+def tests(session):
+ set_standard_env_vars(session)
+ session.install('-r', 'requirements.txt',
+ '-r', 'test-requirements.txt')
+ session.install('-e', '.')
+ session.run_always('tools/yarn-build.sh', external=True)
+ session.run_always('zuul-manage-ansible', '-v')
+ procs = max(int(multiprocessing.cpu_count() - 1), 1)
+ session.run('stestr', 'run', '--slowest', f'--concurrency={procs}',
+ *session.posargs)
+
+
+@nox.session(python='3')
+def remote(session):
+ set_standard_env_vars(session)
+ session.install('-r', 'requirements.txt',
+ '-r', 'test-requirements.txt')
+ session.install('-e', '.')
+ session.run_always('zuul-manage-ansible', '-v')
+ session.run('stestr', 'run', '--test-path', './tests/remote')
+
+
+@nox.session(python='3')
+def venv(session):
+ set_standard_env_vars(session)
+ session.install('-r', 'requirements.txt',
+ '-r', 'test-requirements.txt')
+ session.install('-e', '.')
+ session.run(*session.posargs)
+
+
+@nox.session(python='3')
+def zuul_client(session):
+ set_standard_env_vars(session)
+ session.install('zuul-client',
+ '-r', 'test-requirements.txt',
+ '-r', 'requirements.txt')
+ session.install('-e', '.')
+ session.run_always('zuul-manage-ansible', '-v')
+ session.run(
+ 'stestr', 'run', '--concurrency=1',
+ '--test-path', './tests/zuul_client')
diff --git a/playbooks/zuul-tox/post-system-logs.yaml b/playbooks/zuul-nox/post-system-logs.yaml
index 437411914..437411914 100644
--- a/playbooks/zuul-tox/post-system-logs.yaml
+++ b/playbooks/zuul-nox/post-system-logs.yaml
diff --git a/playbooks/zuul-tox/pre.yaml b/playbooks/zuul-nox/pre.yaml
index c8c1c6500..c8c1c6500 100644
--- a/playbooks/zuul-tox/pre.yaml
+++ b/playbooks/zuul-nox/pre.yaml
diff --git a/releasenotes/notes/change_message-18207e18b5dfffd3.yaml b/releasenotes/notes/change_message-18207e18b5dfffd3.yaml
new file mode 100644
index 000000000..1fd005684
--- /dev/null
+++ b/releasenotes/notes/change_message-18207e18b5dfffd3.yaml
@@ -0,0 +1,12 @@
+---
+features:
+ - |
+ The change message (commit message, or pull request message
+ depending on the driver) is now available in plain text form
+ annotated with the Ansible `!unsafe` tag as
+ :var:`zuul.change_message`.
+deprecations:
+ - |
+ The base64 encoded version of the change message available as
+ :var:`zuul.message` is deprecated and will be removed in a future
+ version of Zuul. Use :var:`zuul.change_message` instead.
diff --git a/releasenotes/notes/fix-exclude-include-priority-ea4a21ab1e53cb4a.yaml b/releasenotes/notes/fix-exclude-include-priority-ea4a21ab1e53cb4a.yaml
new file mode 100644
index 000000000..9dd238faf
--- /dev/null
+++ b/releasenotes/notes/fix-exclude-include-priority-ea4a21ab1e53cb4a.yaml
@@ -0,0 +1,12 @@
+---
+upgrade:
+ - |
+ Fixes `tenant.untrusted-projects.<project>.include-branches` being lower
+ priority than `tenant.untrusted-projects.<project>.exclude-branches` to
+ match the documentation and expected user behavior.
+
+ This only affects projects that are using both `include-branches` and
+ `exclude-branches` at the same time. Now, `include-branches` has priority
+ over `exclude-branches` for any branches that match both. Practically
+ speaking, this means that `exclude-branches` is ignored if
+ `include-branches` is set.
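
For illustration, a tenant configuration exercising the new priority (this mirrors the exclude-include-branches test fixture added in this change):

.. code-block:: yaml

   # Branches "foo" and "bar" match both lists; with this fix they are
   # included because include-branches takes priority.
   - tenant:
       name: tenant-one
       source:
         gerrit:
           untrusted-projects:
             - org/project1:
                 exclude-branches:
                   - foo
                   - bar
                 include-branches:
                   - foo
                   - bar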
diff --git a/releasenotes/notes/non-live-pipeline-requirements-aa173bd86b332e63.yaml b/releasenotes/notes/non-live-pipeline-requirements-aa173bd86b332e63.yaml
new file mode 100644
index 000000000..052d5b255
--- /dev/null
+++ b/releasenotes/notes/non-live-pipeline-requirements-aa173bd86b332e63.yaml
@@ -0,0 +1,29 @@
+---
+features:
+ - |
+ A new pipeline attribute,
+ :attr:`pipeline.allow-other-connections`, has been added
+ to ensure that only changes from connections which
+ are mentioned in the pipeline configuration (such as triggers,
+ reporters, or pipeline requirements) are enqueued.
+security:
+ - |
+ Non-live items are now subject to pipeline requirements for
+ independent pipelines.
+
+ Previously, an optimization for independent pipelines skipped
+ checking that a change met the pipeline requirements. If an
+ independent pipeline is intended only to run reviewed code, this
+ could allow running unreviewed code by updating dependent changes.
+
+ Now both non-live and live items are subject to pipeline
+ requirements in all pipeline managers.
+
+ - |
+ The new `allow-other-connections` pipeline configuration option
+ may now be used to ensure that only changes from connections which
+ are mentioned in the pipeline configuration (such as triggers,
+ reporters, or pipeline requirements) are enqueued. This allows
+ the construction of a pipeline where, for example, code review
+ requirements are strictly enforced, even for dependencies which
+ are not normally directly enqueued.
diff --git a/releasenotes/notes/paused-events-4adaade5e29fc10e.yaml b/releasenotes/notes/paused-events-4adaade5e29fc10e.yaml
new file mode 100644
index 000000000..8db16102d
--- /dev/null
+++ b/releasenotes/notes/paused-events-4adaade5e29fc10e.yaml
@@ -0,0 +1,5 @@
+---
+features:
+ - |
+ Zuul now stores the pause and resume events for a build together with
+ their timestamp and reports them to the SQL database and via MQTT.
diff --git a/requirements.txt b/requirements.txt
index a4aeb7f09..e12e9cf72 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -17,10 +17,10 @@ tzlocal<3.0 # https://github.com/agronholm/apscheduler/discussions/570
PrettyTable>=0.6,<0.8
babel>=1.0
netaddr
-kazoo>=2.8.0
-sqlalchemy
+kazoo>=2.9.0
+sqlalchemy>=2.0.0
alembic
-cryptography>=1.6
+cryptography>=39.0.0
cachecontrol<0.12.7
cachetools
pyjwt>=2.0.0
diff --git a/setup.cfg b/setup.cfg
index 79157c9ed..e2eec4ba4 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -60,3 +60,10 @@ allow_redefinition = True
files = zuul
ignore_missing_imports = True
python_version = 3.6
+
+[flake8]
+# These are ignored intentionally in zuul projects;
+# please don't submit patches that solely correct them or enable them.
+ignore = E124,E125,E129,E252,E402,E741,H,W503,W504
+show-source = True
+exclude = .venv,.tox,.nox,dist,doc,build,*.egg,node_modules
diff --git a/tests/base.py b/tests/base.py
index 731d6c3cc..fd927a92c 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -1227,6 +1227,7 @@ class GerritWebServer(object):
def stop(self):
self.httpd.shutdown()
self.thread.join()
+ self.httpd.server_close()
class FakeGerritPoller(gerritconnection.GerritPoller):
@@ -3625,10 +3626,11 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
self.log.debug('No running builds to release')
return
- self.log.debug("Releasing build %s (%s)" % (regex, len(builds)))
+ self.log.debug("Releasing build %s %s (%s)" % (
+ regex, change, len(builds)))
for build in builds:
- if (not regex or re.match(regex, build.name) and
- not change or build.change == change):
+ if ((not regex or re.match(regex, build.name)) and
+ (not change or build.change == change)):
self.log.debug("Releasing build %s" %
(build.parameters['zuul']['build']))
build.release()
@@ -4096,6 +4098,7 @@ class WebProxyFixture(fixtures.Fixture):
def _cleanup(self):
self.httpd.shutdown()
self.thread.join()
+ self.httpd.server_close()
class ZuulWebFixture(fixtures.Fixture):
@@ -4345,6 +4348,11 @@ class BaseTestCase(testtools.TestCase):
handler.setFormatter(formatter)
logger = logging.getLogger()
+ # It is possible that a stderr log handler is inserted before our
+ # addHandler below. If that happens we will emit all logs to stderr
+ # even when we don't want to. Error here to make it clear there is
+ # a problem as early as possible as it is easy to overlook.
+ self.assertEqual(logger.handlers, [])
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
@@ -5173,6 +5181,11 @@ class ZuulTestCase(BaseTestCase):
self.assertIsNotNone(build.start_time)
self.assertIsNotNone(build.end_time)
+ def assertNoPipelineExceptions(self):
+ for tenant in self.scheds.first.sched.abide.tenants.values():
+ for pipeline in tenant.layout.pipelines.values():
+ self.assertEqual(0, pipeline._exception_count)
+
def assertFinalState(self):
self.log.debug("Assert final state")
# Make sure no jobs are running
@@ -5199,6 +5212,7 @@ class ZuulTestCase(BaseTestCase):
for pipeline in tenant.layout.pipelines.values():
if isinstance(pipeline.manager, ipm):
self.assertEqual(len(pipeline.queues), 0)
+ self.assertNoPipelineExceptions()
def shutdown(self):
self.log.debug("Shutting down after tests")
@@ -5625,6 +5639,7 @@ class ZuulTestCase(BaseTestCase):
time.sleep(0.1)
def refreshPipelines(self, sched):
+ ctx = None
for tenant in sched.abide.tenants.values():
with tenant_read_lock(self.zk_client, tenant.name):
for pipeline in tenant.layout.pipelines.values():
diff --git a/tests/fakegithub.py b/tests/fakegithub.py
index 4255d5022..725c083e2 100644
--- a/tests/fakegithub.py
+++ b/tests/fakegithub.py
@@ -238,6 +238,11 @@ class FakeRepository(object):
# List of branch protection rules
self._branch_protection_rules = defaultdict(FakeBranchProtectionRule)
+ self._repodata = {
+ 'allow_merge_commit': True,
+ 'allow_squash_merge': True,
+ 'allow_rebase_merge': True,
+ }
# fail the next commit requests with 404
self.fail_not_found = 0
@@ -350,6 +355,8 @@ class FakeRepository(object):
return self.get_url_collaborators(request)
if entity == 'commits':
return self.get_url_commits(request, params=params)
+ if entity == '':
+ return self.get_url_repo()
else:
return None
@@ -444,6 +451,9 @@ class FakeRepository(object):
}
return FakeResponse(data)
+ def get_url_repo(self):
+ return FakeResponse(self._repodata)
+
def pull_requests(self, state=None, sort=None, direction=None):
# sort and direction are unused currently, but present to match
# real world call signatures.
@@ -752,7 +762,12 @@ class FakeGithubSession(object):
return FakeResponse(check_run.as_dict(), 200)
def get_repo(self, request, params=None):
- org, project, request = request.split('/', 2)
+ parts = request.split('/', 2)
+ if len(parts) == 2:
+ org, project = parts
+ request = ''
+ else:
+ org, project, request = parts
project_name = '{}/{}'.format(org, project)
repo = self.client.repo_from_project(project_name)
diff --git a/tests/fakegitlab.py b/tests/fakegitlab.py
index 59b85ca96..e4a3e1ac8 100644
--- a/tests/fakegitlab.py
+++ b/tests/fakegitlab.py
@@ -276,3 +276,4 @@ class GitlabWebServer(object):
def stop(self):
self.httpd.shutdown()
self.thread.join()
+ self.httpd.server_close()
diff --git a/tests/fixtures/config/inventory/git/common-config/playbooks/jinja2-message.yaml b/tests/fixtures/config/inventory/git/common-config/playbooks/jinja2-message.yaml
index 834c3cbe8..cea1ffe8c 100644
--- a/tests/fixtures/config/inventory/git/common-config/playbooks/jinja2-message.yaml
+++ b/tests/fixtures/config/inventory/git/common-config/playbooks/jinja2-message.yaml
@@ -4,3 +4,7 @@
copy:
content: "{{ zuul.message | b64decode }}"
dest: "{{ zuul.executor.log_root }}/commit-message.txt"
+ - name: Dump commit message
+ copy:
+ content: "{{ zuul.change_message }}"
+ dest: "{{ zuul.executor.log_root }}/change-message.txt"
diff --git a/tests/fixtures/config/requirements/trusted-check/git/common-config/playbooks/base.yaml b/tests/fixtures/config/requirements/trusted-check/git/common-config/playbooks/base.yaml
new file mode 100644
index 000000000..f679dceae
--- /dev/null
+++ b/tests/fixtures/config/requirements/trusted-check/git/common-config/playbooks/base.yaml
@@ -0,0 +1,2 @@
+- hosts: all
+ tasks: []
diff --git a/tests/fixtures/config/requirements/trusted-check/git/common-config/zuul.yaml b/tests/fixtures/config/requirements/trusted-check/git/common-config/zuul.yaml
new file mode 100644
index 000000000..494cd3cc7
--- /dev/null
+++ b/tests/fixtures/config/requirements/trusted-check/git/common-config/zuul.yaml
@@ -0,0 +1,41 @@
+- pipeline:
+ name: trusted-check
+ manager: independent
+ allow-other-connections: false
+ require:
+ gerrit:
+ approval:
+ - Code-Review: 2
+ trigger:
+ gerrit:
+ - event: patchset-created
+ success:
+ gerrit:
+ Verified: 1
+ failure:
+ gerrit:
+ Verified: -1
+
+- job:
+ name: base
+ parent: null
+ run: playbooks/base.yaml
+ nodeset:
+ nodes:
+ - label: ubuntu-xenial
+ name: controller
+
+- job:
+ name: check-job
+
+- project:
+ name: org/project
+ trusted-check:
+ jobs:
+ - check-job
+
+- project:
+ name: gh/project
+ trusted-check:
+ jobs:
+ - check-job
diff --git a/tests/fixtures/config/requirements/trusted-check/git/gh_project/README b/tests/fixtures/config/requirements/trusted-check/git/gh_project/README
new file mode 100644
index 000000000..9daeafb98
--- /dev/null
+++ b/tests/fixtures/config/requirements/trusted-check/git/gh_project/README
@@ -0,0 +1 @@
+test
diff --git a/tests/fixtures/config/requirements/trusted-check/git/org_project/README b/tests/fixtures/config/requirements/trusted-check/git/org_project/README
new file mode 100644
index 000000000..9daeafb98
--- /dev/null
+++ b/tests/fixtures/config/requirements/trusted-check/git/org_project/README
@@ -0,0 +1 @@
+test
diff --git a/tests/fixtures/config/requirements/trusted-check/main.yaml b/tests/fixtures/config/requirements/trusted-check/main.yaml
new file mode 100644
index 000000000..c8deb36c1
--- /dev/null
+++ b/tests/fixtures/config/requirements/trusted-check/main.yaml
@@ -0,0 +1,11 @@
+- tenant:
+ name: tenant-one
+ source:
+ gerrit:
+ config-projects:
+ - common-config
+ untrusted-projects:
+ - org/project
+ github:
+ untrusted-projects:
+ - gh/project
diff --git a/tests/fixtures/config/tenant-parser/exclude-include-branches.yaml b/tests/fixtures/config/tenant-parser/exclude-include-branches.yaml
new file mode 100644
index 000000000..6d455802a
--- /dev/null
+++ b/tests/fixtures/config/tenant-parser/exclude-include-branches.yaml
@@ -0,0 +1,16 @@
+- tenant:
+ name: tenant-one
+ source:
+ gerrit:
+ config-projects:
+ - common-config
+ untrusted-projects:
+ - org/project1:
+ exclude-branches:
+ - foo
+ - bar
+ # Include branches has higher priority than exclude branches
+ include-branches:
+ - foo
+ - bar
+ - org/project2
diff --git a/tests/fixtures/config/zuultrigger/parent-change-enqueued/git/common-config/zuul.yaml b/tests/fixtures/config/zuultrigger/parent-change-enqueued/git/common-config/zuul.yaml
index 3fcc43ce6..975b9713e 100644
--- a/tests/fixtures/config/zuultrigger/parent-change-enqueued/git/common-config/zuul.yaml
+++ b/tests/fixtures/config/zuultrigger/parent-change-enqueued/git/common-config/zuul.yaml
@@ -4,7 +4,7 @@
require:
gerrit:
approval:
- - Verified: -1
+ - email: for-check@example.com
trigger:
gerrit:
- event: patchset-created
@@ -24,7 +24,7 @@
require:
gerrit:
approval:
- - Verified: 1
+ - email: for-gate@example.com
trigger:
gerrit:
- event: comment-added
diff --git a/tests/fixtures/layouts/deps-by-topic.yaml b/tests/fixtures/layouts/deps-by-topic.yaml
index 3824c5c2c..e7e8fc465 100644
--- a/tests/fixtures/layouts/deps-by-topic.yaml
+++ b/tests/fixtures/layouts/deps-by-topic.yaml
@@ -47,24 +47,27 @@
run: playbooks/run.yaml
- job:
- name: test-job
+ name: check-job
+
+- job:
+ name: gate-job
- project:
name: org/project1
queue: integrated
check:
jobs:
- - test-job
+ - check-job
gate:
jobs:
- - test-job
+ - gate-job
- project:
name: org/project2
queue: integrated
check:
jobs:
- - test-job
+ - check-job
gate:
jobs:
- - test-job
+ - gate-job
diff --git a/tests/fixtures/layouts/github-merge-mode.yaml b/tests/fixtures/layouts/github-merge-mode.yaml
new file mode 100644
index 000000000..139db886a
--- /dev/null
+++ b/tests/fixtures/layouts/github-merge-mode.yaml
@@ -0,0 +1,21 @@
+- pipeline:
+ name: check
+ manager: independent
+ trigger:
+ github:
+ - event: pull_request_review
+ action: submitted
+ state: approve
+ start:
+ github: {}
+ success:
+ github: {}
+ failure:
+ github: {}
+
+- project:
+ name: org/project
+ merge-mode: rebase
+ gate:
+ jobs:
+ - noop
diff --git a/tests/fixtures/layouts/two-check.yaml b/tests/fixtures/layouts/two-check.yaml
new file mode 100644
index 000000000..bc3e59818
--- /dev/null
+++ b/tests/fixtures/layouts/two-check.yaml
@@ -0,0 +1,47 @@
+- pipeline:
+ name: check1
+ manager: independent
+ trigger:
+ gerrit:
+ - event: patchset-created
+ success:
+ gerrit:
+ Verified: 1
+ failure:
+ gerrit:
+ Verified: -1
+
+- pipeline:
+ name: check2
+ manager: independent
+ trigger:
+ gerrit:
+ - event: patchset-created
+ success:
+ gerrit:
+ Verified: 1
+ failure:
+ gerrit:
+ Verified: -1
+
+- job:
+ name: base
+ parent: null
+ run: playbooks/base.yaml
+ nodeset:
+ nodes:
+ - label: ubuntu-xenial
+ name: controller
+
+- job:
+ name: check-job
+ run: playbooks/check.yaml
+
+- project:
+ name: org/project
+ check1:
+ jobs:
+ - check-job
+ check2:
+ jobs:
+ - check-job
diff --git a/tests/fixtures/zuul-gerrit-github.conf b/tests/fixtures/zuul-gerrit-github.conf
index 0d03bacc2..8114159f7 100644
--- a/tests/fixtures/zuul-gerrit-github.conf
+++ b/tests/fixtures/zuul-gerrit-github.conf
@@ -5,6 +5,7 @@ server=127.0.0.1
[scheduler]
tenant_config=main.yaml
+relative_priority=true
[merger]
git_dir=/tmp/zuul-test/merger-git
diff --git a/tests/make_playbooks.py b/tests/make_playbooks.py
index 93c37bc81..cb7a98096 100755
--- a/tests/make_playbooks.py
+++ b/tests/make_playbooks.py
@@ -40,7 +40,8 @@ def handle_repo(path):
config_path = os.path.join(path, fn)
break
try:
- config = yaml.safe_load(open(config_path))
+ with open(config_path) as f:
+ config = yaml.safe_load(f)
except Exception:
print(" Has yaml errors")
return
diff --git a/tests/unit/test_circular_dependencies.py b/tests/unit/test_circular_dependencies.py
index 74d2cedf0..a3f9dda33 100644
--- a/tests/unit/test_circular_dependencies.py
+++ b/tests/unit/test_circular_dependencies.py
@@ -2267,8 +2267,8 @@ class TestGerritCircularDependencies(ZuulTestCase):
self.assertEqual(B.patchsets[-1]["approvals"][0]["value"], "1")
self.assertHistory([
- dict(name="test-job", result="SUCCESS", changes="2,1 1,1"),
- dict(name="test-job", result="SUCCESS", changes="1,1 2,1"),
+ dict(name="check-job", result="SUCCESS", changes="2,1 1,1"),
+ dict(name="check-job", result="SUCCESS", changes="1,1 2,1"),
], ordered=False)
A.addPatchset()
@@ -2277,10 +2277,10 @@ class TestGerritCircularDependencies(ZuulTestCase):
self.assertHistory([
# Original check run
- dict(name="test-job", result="SUCCESS", changes="2,1 1,1"),
- dict(name="test-job", result="SUCCESS", changes="1,1 2,1"),
+ dict(name="check-job", result="SUCCESS", changes="2,1 1,1"),
+ dict(name="check-job", result="SUCCESS", changes="1,1 2,1"),
# Second check run
- dict(name="test-job", result="SUCCESS", changes="2,1 1,2"),
+ dict(name="check-job", result="SUCCESS", changes="2,1 1,2"),
], ordered=False)
def test_deps_by_topic_multi_tenant(self):
@@ -2332,6 +2332,121 @@ class TestGerritCircularDependencies(ZuulTestCase):
dict(name="project6-job-t2", result="SUCCESS", changes="1,1 2,1"),
], ordered=False)
+ def test_dependency_refresh(self):
+ # Test that when two changes are put into a cycle, the
+ # dependencies are refreshed and items already in pipelines
+ # are updated.
+ self.executor_server.hold_jobs_in_build = True
+
+ # This simulates the typical workflow where a developer only
+ # knows the change id of changes one at a time.
+ # The first change:
+ A = self.fake_gerrit.addFakeChange("org/project", "master", "A")
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ # Now that it has been uploaded, upload the second change and
+ # point it at the first.
+ # B -> A
+ B = self.fake_gerrit.addFakeChange("org/project", "master", "B")
+ B.data["commitMessage"] = "{}\n\nDepends-On: {}\n".format(
+ B.subject, A.data["url"]
+ )
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ # Now that the second change is known, update the first change
+ # B <-> A
+ A.addPatchset()
+ A.data["commitMessage"] = "{}\n\nDepends-On: {}\n".format(
+ A.subject, B.data["url"]
+ )
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(2))
+ self.waitUntilSettled()
+
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ self.assertHistory([
+ dict(name="project-job", result="ABORTED", changes="1,1"),
+ dict(name="project-job", result="ABORTED", changes="1,1 2,1"),
+ dict(name="project-job", result="SUCCESS", changes="1,2 2,1"),
+ dict(name="project-job", result="SUCCESS", changes="2,1 1,2"),
+ ], ordered=False)
+
+ @simple_layout('layouts/deps-by-topic.yaml')
+ def test_dependency_refresh_by_topic_check(self):
+ # Test that when two changes are put into a cycle, the
+ # dependencies are refreshed and items already in pipelines
+ # are updated.
+ self.executor_server.hold_jobs_in_build = True
+
+ # This simulates the typical workflow where a developer
+ # uploads changes one at a time.
+ # The first change:
+ A = self.fake_gerrit.addFakeChange('org/project1', "master", "A",
+ topic='test-topic')
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ # Now that it has been uploaded, upload the second change
+ # in the same topic.
+ B = self.fake_gerrit.addFakeChange('org/project2', "master", "B",
+ topic='test-topic')
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ self.assertHistory([
+ dict(name="check-job", result="ABORTED", changes="1,1"),
+ dict(name="check-job", result="SUCCESS", changes="2,1 1,1"),
+ dict(name="check-job", result="SUCCESS", changes="1,1 2,1"),
+ ], ordered=False)
+
+ @simple_layout('layouts/deps-by-topic.yaml')
+ def test_dependency_refresh_by_topic_gate(self):
+ # Test that when two changes are put into a cycle, the
+ # dependencies are refreshed and items already in pipelines
+ # are updated.
+ self.executor_server.hold_jobs_in_build = True
+
+ # This simulates a workflow where a developer adds a change to
+ # a cycle already in gate.
+ A = self.fake_gerrit.addFakeChange('org/project1', "master", "A",
+ topic='test-topic')
+ B = self.fake_gerrit.addFakeChange('org/project2', "master", "B",
+ topic='test-topic')
+ A.addApproval("Code-Review", 2)
+ B.addApproval("Code-Review", 2)
+ A.addApproval("Approved", 1)
+ self.fake_gerrit.addEvent(B.addApproval("Approved", 1))
+ self.waitUntilSettled()
+
+ # Add a new change to the cycle.
+ C = self.fake_gerrit.addFakeChange('org/project1', "master", "C",
+ topic='test-topic')
+ self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ # At the end of this process, the gate jobs should be aborted
+        # because the new dependency showed up.
+ self.assertEqual(A.data["status"], "NEW")
+ self.assertEqual(B.data["status"], "NEW")
+ self.assertEqual(C.data["status"], "NEW")
+ self.assertHistory([
+ dict(name="gate-job", result="ABORTED", changes="1,1 2,1"),
+ dict(name="gate-job", result="ABORTED", changes="1,1 2,1"),
+ dict(name="check-job", result="SUCCESS", changes="2,1 1,1 3,1"),
+ ], ordered=False)
+
class TestGithubCircularDependencies(ZuulTestCase):
config_file = "zuul-gerrit-github.conf"
@@ -2524,3 +2639,48 @@ class TestGithubCircularDependencies(ZuulTestCase):
B.comments[-1]))
self.assertFalse(re.search('Change .*? is needed',
B.comments[-1]))
+
+ def test_dependency_refresh(self):
+ # Test that when two changes are put into a cycle, the
+ # dependencies are refreshed and items already in pipelines
+ # are updated.
+ self.executor_server.hold_jobs_in_build = True
+
+ # This simulates the typical workflow where a developer only
+ # knows the PR id of changes one at a time.
+ # The first change:
+ A = self.fake_github.openFakePullRequest("gh/project", "master", "A")
+ self.fake_github.emitEvent(A.getPullRequestOpenedEvent())
+ self.waitUntilSettled()
+
+ # Now that it has been uploaded, upload the second change and
+ # point it at the first.
+ # B -> A
+ B = self.fake_github.openFakePullRequest("gh/project", "master", "B")
+ B.body = "{}\n\nDepends-On: {}\n".format(
+ B.subject, A.url
+ )
+ self.fake_github.emitEvent(B.getPullRequestOpenedEvent())
+ self.waitUntilSettled()
+
+ # Now that the second change is known, update the first change
+ # B <-> A
+ A.body = "{}\n\nDepends-On: {}\n".format(
+ A.subject, B.url
+ )
+
+ self.fake_github.emitEvent(A.getPullRequestEditedEvent(A.subject))
+ self.waitUntilSettled()
+
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ self.assertHistory([
+ dict(name="project-job", result="ABORTED",
+ changes=f"{A.number},{A.head_sha}"),
+ dict(name="project-job", result="SUCCESS",
+ changes=f"{A.number},{A.head_sha} {B.number},{B.head_sha}"),
+ dict(name="project-job", result="SUCCESS",
+ changes=f"{B.number},{B.head_sha} {A.number},{A.head_sha}"),
+ ], ordered=False)
diff --git a/tests/unit/test_configloader.py b/tests/unit/test_configloader.py
index d76eece12..81a05fc37 100644
--- a/tests/unit/test_configloader.py
+++ b/tests/unit/test_configloader.py
@@ -611,6 +611,12 @@ class TestTenantExcludeBranches(TestTenantIncludeBranches):
# Same test results as include-branches
+class TestTenantExcludeIncludeBranches(TestTenantIncludeBranches):
+ tenant_config_file = 'config/tenant-parser/exclude-include-branches.yaml'
+
+ # Same test results as include-branches
+
+
class TestTenantExcludeAll(TenantParserTestCase):
tenant_config_file = 'config/tenant-parser/exclude-all.yaml'
diff --git a/tests/unit/test_connection.py b/tests/unit/test_connection.py
index bae4ff258..26a99215e 100644
--- a/tests/unit/test_connection.py
+++ b/tests/unit/test_connection.py
@@ -65,7 +65,7 @@ class TestSQLConnectionMysql(ZuulTestCase):
def _sql_tables_created(self, connection_name):
connection = self.scheds.first.connections.connections[connection_name]
- insp = sa.engine.reflection.Inspector(connection.engine)
+ insp = sa.inspect(connection.engine)
table_prefix = connection.table_prefix
self.assertEqual(self.expected_table_prefix, table_prefix)
@@ -82,7 +82,7 @@ class TestSQLConnectionMysql(ZuulTestCase):
def _sql_indexes_created(self, connection_name):
connection = self.scheds.first.connections.connections[connection_name]
- insp = sa.engine.reflection.Inspector(connection.engine)
+ insp = sa.inspect(connection.engine)
table_prefix = connection.table_prefix
self.assertEqual(self.expected_table_prefix, table_prefix)
@@ -127,99 +127,134 @@ class TestSQLConnectionMysql(ZuulTestCase):
engine.connect() as conn:
result = conn.execute(
- sa.sql.select([reporter.connection.zuul_buildset_table]))
+ sa.sql.select(reporter.connection.zuul_buildset_table))
buildsets = result.fetchall()
- self.assertEqual(4, len(buildsets))
+ self.assertEqual(5, len(buildsets))
buildset0 = buildsets[0]
buildset1 = buildsets[1]
buildset2 = buildsets[2]
buildset3 = buildsets[3]
-
- self.assertEqual('check', buildset0['pipeline'])
- self.assertEqual('org/project', buildset0['project'])
- self.assertEqual(1, buildset0['change'])
- self.assertEqual('1', buildset0['patchset'])
- self.assertEqual('SUCCESS', buildset0['result'])
- self.assertEqual('Build succeeded.', buildset0['message'])
- self.assertEqual('tenant-one', buildset0['tenant'])
+ buildset4 = buildsets[4]
+
+ self.assertEqual('check', buildset0.pipeline)
+ self.assertEqual('org/project', buildset0.project)
+ self.assertEqual(1, buildset0.change)
+ self.assertEqual('1', buildset0.patchset)
+ self.assertEqual('SUCCESS', buildset0.result)
+ self.assertEqual('Build succeeded.', buildset0.message)
+ self.assertEqual('tenant-one', buildset0.tenant)
self.assertEqual(
- 'https://review.example.com/%d' % buildset0['change'],
- buildset0['ref_url'])
- self.assertNotEqual(None, buildset0['event_id'])
- self.assertNotEqual(None, buildset0['event_timestamp'])
+ 'https://review.example.com/%d' % buildset0.change,
+ buildset0.ref_url)
+ self.assertNotEqual(None, buildset0.event_id)
+ self.assertNotEqual(None, buildset0.event_timestamp)
buildset0_builds = conn.execute(
- sa.sql.select([
+ sa.sql.select(
reporter.connection.zuul_build_table
- ]).where(
+ ).where(
reporter.connection.zuul_build_table.c.buildset_id ==
- buildset0['id']
+ buildset0.id
)
).fetchall()
# Check the first result, which should be the project-merge job
self.assertEqual(
- 'project-merge', buildset0_builds[0]['job_name'])
- self.assertEqual("SUCCESS", buildset0_builds[0]['result'])
- self.assertEqual(None, buildset0_builds[0]['log_url'])
- self.assertEqual('check', buildset1['pipeline'])
- self.assertEqual('master', buildset1['branch'])
- self.assertEqual('org/project', buildset1['project'])
- self.assertEqual(2, buildset1['change'])
- self.assertEqual('1', buildset1['patchset'])
- self.assertEqual('FAILURE', buildset1['result'])
- self.assertEqual('Build failed.', buildset1['message'])
+ 'project-merge', buildset0_builds[0].job_name)
+ self.assertEqual("SUCCESS", buildset0_builds[0].result)
+ self.assertEqual(None, buildset0_builds[0].log_url)
+ self.assertEqual('check', buildset1.pipeline)
+ self.assertEqual('master', buildset1.branch)
+ self.assertEqual('org/project', buildset1.project)
+ self.assertEqual(2, buildset1.change)
+ self.assertEqual('1', buildset1.patchset)
+ self.assertEqual('FAILURE', buildset1.result)
+ self.assertEqual('Build failed.', buildset1.message)
buildset1_builds = conn.execute(
- sa.sql.select([
+ sa.sql.select(
reporter.connection.zuul_build_table
- ]).where(
+ ).where(
reporter.connection.zuul_build_table.c.buildset_id ==
- buildset1['id']
+ buildset1.id
)
).fetchall()
# Check the second result, which should be the project-test1
# job which failed
self.assertEqual(
- 'project-test1', buildset1_builds[1]['job_name'])
- self.assertEqual("FAILURE", buildset1_builds[1]['result'])
- self.assertEqual(None, buildset1_builds[1]['log_url'])
+ 'project-test1', buildset1_builds[1].job_name)
+ self.assertEqual("FAILURE", buildset1_builds[1].result)
+ self.assertEqual(None, buildset1_builds[1].log_url)
buildset2_builds = conn.execute(
- sa.sql.select([
+ sa.sql.select(
reporter.connection.zuul_build_table
- ]).where(
+ ).where(
reporter.connection.zuul_build_table.c.buildset_id ==
- buildset2['id']
+ buildset2.id
)
).fetchall()
# Check the first result, which should be the project-publish
# job
self.assertEqual('project-publish',
- buildset2_builds[0]['job_name'])
- self.assertEqual("SUCCESS", buildset2_builds[0]['result'])
+ buildset2_builds[0].job_name)
+ self.assertEqual("SUCCESS", buildset2_builds[0].result)
buildset3_builds = conn.execute(
- sa.sql.select([
+ sa.sql.select(
reporter.connection.zuul_build_table
- ]).where(
+ ).where(
reporter.connection.zuul_build_table.c.buildset_id ==
- buildset3['id']
+ buildset3.id
)
).fetchall()
self.assertEqual(
- 'project-test1', buildset3_builds[1]['job_name'])
- self.assertEqual('NODE_FAILURE', buildset3_builds[1]['result'])
- self.assertEqual(None, buildset3_builds[1]['log_url'])
- self.assertIsNotNone(buildset3_builds[1]['start_time'])
- self.assertIsNotNone(buildset3_builds[1]['end_time'])
+ 'project-test1', buildset3_builds[1].job_name)
+ self.assertEqual('NODE_FAILURE', buildset3_builds[1].result)
+ self.assertEqual(None, buildset3_builds[1].log_url)
+ self.assertIsNotNone(buildset3_builds[1].start_time)
+ self.assertIsNotNone(buildset3_builds[1].end_time)
self.assertGreaterEqual(
- buildset3_builds[1]['end_time'],
- buildset3_builds[1]['start_time'])
+ buildset3_builds[1].end_time,
+ buildset3_builds[1].start_time)
+
+ # Check the paused build result
+ buildset4_builds = conn.execute(
+ sa.sql.select(
+ reporter.connection.zuul_build_table
+ ).where(
+ reporter.connection.zuul_build_table.c.buildset_id ==
+ buildset4.id
+ ).order_by(reporter.connection.zuul_build_table.c.id)
+ ).fetchall()
+
+ paused_build_events = conn.execute(
+ sa.sql.select(
+ reporter.connection.zuul_build_event_table
+ ).where(
+ reporter.connection.zuul_build_event_table.c.build_id
+ == buildset4_builds[0].id
+ )
+ ).fetchall()
+
+ self.assertEqual(len(paused_build_events), 2)
+ pause_event = paused_build_events[0]
+ resume_event = paused_build_events[1]
+ self.assertEqual(
+ pause_event.event_type, "paused")
+ self.assertIsNotNone(pause_event.event_time)
+ self.assertIsNone(pause_event.description)
+ self.assertEqual(
+ resume_event.event_type, "resumed")
+ self.assertIsNotNone(resume_event.event_time)
+ self.assertIsNone(resume_event.description)
+
+ self.assertGreater(
+ resume_event.event_time, pause_event.event_time)
self.executor_server.hold_jobs_in_build = True
@@ -264,6 +299,23 @@ class TestSQLConnectionMysql(ZuulTestCase):
self.orderedRelease()
self.waitUntilSettled()
+ # We are pausing a job within this test, so holding the jobs in
+ # build and releasing them in order becomes difficult as the
+ # paused job will either be paused or waiting on the child jobs
+ # to start.
+ # As we are not interested in the order the jobs are running but
+ # only on the results in the database, simply deactivate
+ # hold_jobs_in_build.
+ self.executor_server.hold_jobs_in_build = False
+
+ # Add a paused build result
+ self.log.debug("Adding paused build result")
+ D = self.fake_gerrit.addFakeChange("org/project", "master", "D")
+ self.executor_server.returnData(
+ "project-merge", D, {"zuul": {"pause": True}})
+ self.fake_gerrit.addEvent(D.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
check_results()
def test_sql_results_retry_builds(self):
@@ -281,51 +333,51 @@ class TestSQLConnectionMysql(ZuulTestCase):
engine.connect() as conn:
result = conn.execute(
- sa.sql.select([reporter.connection.zuul_buildset_table])
+ sa.sql.select(reporter.connection.zuul_buildset_table)
)
buildsets = result.fetchall()
self.assertEqual(1, len(buildsets))
buildset0 = buildsets[0]
- self.assertEqual('check', buildset0['pipeline'])
- self.assertEqual('org/project', buildset0['project'])
- self.assertEqual(1, buildset0['change'])
- self.assertEqual('1', buildset0['patchset'])
- self.assertEqual('SUCCESS', buildset0['result'])
- self.assertEqual('Build succeeded.', buildset0['message'])
- self.assertEqual('tenant-one', buildset0['tenant'])
+ self.assertEqual('check', buildset0.pipeline)
+ self.assertEqual('org/project', buildset0.project)
+ self.assertEqual(1, buildset0.change)
+ self.assertEqual('1', buildset0.patchset)
+ self.assertEqual('SUCCESS', buildset0.result)
+ self.assertEqual('Build succeeded.', buildset0.message)
+ self.assertEqual('tenant-one', buildset0.tenant)
self.assertEqual(
- 'https://review.example.com/%d' % buildset0['change'],
- buildset0['ref_url'])
+ 'https://review.example.com/%d' % buildset0.change,
+ buildset0.ref_url)
buildset0_builds = conn.execute(
sa.sql.select(
- [reporter.connection.zuul_build_table]
+ reporter.connection.zuul_build_table
).where(
reporter.connection.zuul_build_table.c.buildset_id ==
- buildset0['id']
+ buildset0.id
)
).fetchall()
# Check the retry results
- self.assertEqual('project-merge', buildset0_builds[0]['job_name'])
- self.assertEqual('SUCCESS', buildset0_builds[0]['result'])
- self.assertTrue(buildset0_builds[0]['final'])
-
- self.assertEqual('project-test1', buildset0_builds[1]['job_name'])
- self.assertEqual('RETRY', buildset0_builds[1]['result'])
- self.assertFalse(buildset0_builds[1]['final'])
- self.assertEqual('project-test2', buildset0_builds[2]['job_name'])
- self.assertEqual('RETRY', buildset0_builds[2]['result'])
- self.assertFalse(buildset0_builds[2]['final'])
-
- self.assertEqual('project-test1', buildset0_builds[3]['job_name'])
- self.assertEqual('SUCCESS', buildset0_builds[3]['result'])
- self.assertTrue(buildset0_builds[3]['final'])
- self.assertEqual('project-test2', buildset0_builds[4]['job_name'])
- self.assertEqual('SUCCESS', buildset0_builds[4]['result'])
- self.assertTrue(buildset0_builds[4]['final'])
+ self.assertEqual('project-merge', buildset0_builds[0].job_name)
+ self.assertEqual('SUCCESS', buildset0_builds[0].result)
+ self.assertTrue(buildset0_builds[0].final)
+
+ self.assertEqual('project-test1', buildset0_builds[1].job_name)
+ self.assertEqual('RETRY', buildset0_builds[1].result)
+ self.assertFalse(buildset0_builds[1].final)
+ self.assertEqual('project-test2', buildset0_builds[2].job_name)
+ self.assertEqual('RETRY', buildset0_builds[2].result)
+ self.assertFalse(buildset0_builds[2].final)
+
+ self.assertEqual('project-test1', buildset0_builds[3].job_name)
+ self.assertEqual('SUCCESS', buildset0_builds[3].result)
+ self.assertTrue(buildset0_builds[3].final)
+ self.assertEqual('project-test2', buildset0_builds[4].job_name)
+ self.assertEqual('SUCCESS', buildset0_builds[4].result)
+ self.assertTrue(buildset0_builds[4].final)
self.executor_server.hold_jobs_in_build = True
@@ -378,7 +430,7 @@ class TestSQLConnectionMysql(ZuulTestCase):
engine.connect() as conn:
result = conn.execute(
- sa.sql.select([reporter.connection.zuul_buildset_table])
+ sa.sql.select(reporter.connection.zuul_buildset_table)
)
buildsets = result.fetchall()
@@ -387,10 +439,10 @@ class TestSQLConnectionMysql(ZuulTestCase):
buildset0_builds = conn.execute(
sa.sql.select(
- [reporter.connection.zuul_build_table]
+ reporter.connection.zuul_build_table
).where(
reporter.connection.zuul_build_table.c.buildset_id ==
- buildset0['id']
+ buildset0.id
)
).fetchall()
@@ -436,7 +488,7 @@ class TestSQLConnectionMysql(ZuulTestCase):
engine.connect() as conn:
result = conn.execute(
- sa.sql.select([reporter.connection.zuul_buildset_table])
+ sa.sql.select(reporter.connection.zuul_buildset_table)
)
buildsets = result.fetchall()
@@ -445,10 +497,10 @@ class TestSQLConnectionMysql(ZuulTestCase):
buildset0_builds = conn.execute(
sa.sql.select(
- [reporter.connection.zuul_build_table]
+ reporter.connection.zuul_build_table
).where(
reporter.connection.zuul_build_table.c.buildset_id ==
- buildset0['id']
+ buildset0.id
)
).fetchall()
@@ -743,6 +795,37 @@ class TestMQTTConnection(ZuulTestCase):
self.assertIn('uuid', mqtt_payload)
self.assertEquals(dependent_test_job['dependencies'], ['test'])
+ def test_mqtt_paused_job(self):
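+ # Verify that pause and resume events show up in the MQTT
+ # buildset report with plausible timestamps.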
+
+ A = self.fake_gerrit.addFakeChange("org/project", "master", "A")
+ # Let the job be paused via the executor
+ self.executor_server.returnData("test", A, {"zuul": {"pause": True}})
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ success_event = self.mqtt_messages.pop()
+
+ mqtt_payload = success_event["msg"]
+ self.assertEquals(mqtt_payload["project"], "org/project")
+ builds = mqtt_payload["buildset"]["builds"]
+ paused_job = [b for b in builds if b["job_name"] == "test"][0]
+ self.assertEquals(len(paused_job["events"]), 2)
+
+ pause_event = paused_job["events"][0]
+ self.assertEquals(pause_event["event_type"], "paused")
+ self.assertGreater(
+ pause_event["event_time"], paused_job["start_time"])
+ self.assertLess(pause_event["event_time"], paused_job["end_time"])
+
+ resume_event = paused_job["events"][1]
+ self.assertEquals(resume_event["event_type"], "resumed")
+ self.assertGreater(
+ resume_event["event_time"], paused_job["start_time"])
+ self.assertLess(resume_event["event_time"], paused_job["end_time"])
+
+ self.assertGreater(
+ resume_event["event_time"], pause_event["event_time"])
+
def test_mqtt_invalid_topic(self):
in_repo_conf = textwrap.dedent(
"""
diff --git a/tests/unit/test_event_queues.py b/tests/unit/test_event_queues.py
index eafd601e9..d93e088d8 100644
--- a/tests/unit/test_event_queues.py
+++ b/tests/unit/test_event_queues.py
@@ -637,6 +637,36 @@ class TestEventWatchers(EventQueueBaseTestCase):
result_queues["other-tenant"]["post"].put(result_event)
self._wait_for_event(event)
+ def test_pipeline_event_watcher_recreate(self):
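+ # Ensure the event watcher keeps working after a pipeline's
+ # event queue paths are deleted and recreated.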
+ event = threading.Event()
+ watcher = event_queues.EventWatcher(self.zk_client, event.set)
+
+ management_queues = (
+ event_queues.PipelineManagementEventQueue.createRegistry(
+ self.zk_client
+ )
+ )
+ self.assertFalse(event.is_set())
+
+ management_queues["tenant"]["check"].put(model.ReconfigureEvent())
+ self._wait_for_event(event)
+
+ # Wait for the watch to be fully established to avoid race
+ # conditions, since the event watcher will also ensure that the
+ # trigger and result event paths exist.
+ for _ in iterate_timeout(5, "all watches to be established"):
+ if watcher.watched_pipelines:
+ break
+
+ self.zk_client.client.delete(
+ event_queues.PIPELINE_NAME_ROOT.format(
+ tenant="tenant", pipeline="check"), recursive=True)
+ event.clear()
+
+ management_queues["tenant"]["check"].initialize()
+ management_queues["tenant"]["check"].put(model.ReconfigureEvent())
+ self._wait_for_event(event)
+
class TestConnectionEventQueue(EventQueueBaseTestCase):
diff --git a/tests/unit/test_gerrit.py b/tests/unit/test_gerrit.py
index 2e3057af6..2a63d5ef8 100644
--- a/tests/unit/test_gerrit.py
+++ b/tests/unit/test_gerrit.py
@@ -957,3 +957,48 @@ class TestGerritConnection(ZuulTestCase):
self.assertEqual(B.queried, 2)
self.assertEqual(A.data['status'], 'MERGED')
self.assertEqual(B.data['status'], 'MERGED')
+
+
+class TestGerritUnicodeRefs(ZuulTestCase):
+ config_file = 'zuul-gerrit-web.conf'
+ tenant_config_file = 'config/single-tenant/main.yaml'
+
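+ # Fake 'git upload-pack' advertisement in pkt-line framing: each
+ # record starts with a 4-char hex length prefix, the HEAD line
+ # carries the capability list after a NUL byte, and one ref name
+ # is multi-byte UTF-8.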
+ upload_pack_data = (b'014452944ee370db5c87691e62e0f9079b6281319b4e HEAD'
+ b'\x00multi_ack thin-pack side-band side-band-64k '
+ b'ofs-delta shallow deepen-since deepen-not '
+ b'deepen-relative no-progress include-tag '
+ b'multi_ack_detailed allow-tip-sha1-in-want '
+ b'allow-reachable-sha1-in-want '
+ b'symref=HEAD:refs/heads/faster filter '
+ b'object-format=sha1 agent=git/2.37.1.gl1\n'
+ b'003d5f42665d737b3fd4ec22ca0209e6191859f09fd6 '
+ b'refs/for/faster\n'
+ b'004952944ee370db5c87691e62e0f9079b6281319b4e '
+ b'refs/heads/foo/\xf0\x9f\x94\xa5\xf0\x9f\x94\xa5'
+ b'\xf0\x9f\x94\xa5\n'
+ b'003f52944ee370db5c87691e62e0f9079b6281319b4e '
+ b'refs/heads/faster\n0000').decode("utf-8")
+
+ def test_mb_unicode_refs(self):
+ gerrit_config = {
+ 'user': 'gerrit',
+ 'server': 'localhost',
+ }
+ driver = GerritDriver()
+ gerrit = GerritConnection(driver, 'review_gerrit', gerrit_config)
+
+ def _uploadPack(project):
+ return self.upload_pack_data
+
+ self.patch(gerrit, '_uploadPack', _uploadPack)
+
+ project = gerrit.source.getProject('org/project')
+ refs = gerrit.getInfoRefs(project)
+
+ self.assertEqual(refs,
+ {'refs/for/faster':
+ '5f42665d737b3fd4ec22ca0209e6191859f09fd6',
+ 'refs/heads/foo/🔥🔥🔥':
+ '52944ee370db5c87691e62e0f9079b6281319b4e',
+ 'refs/heads/faster':
+ '52944ee370db5c87691e62e0f9079b6281319b4e'})
diff --git a/tests/unit/test_git_driver.py b/tests/unit/test_git_driver.py
index 06e2ac7c8..95fca30d3 100644
--- a/tests/unit/test_git_driver.py
+++ b/tests/unit/test_git_driver.py
@@ -62,7 +62,8 @@ class TestGitDriver(ZuulTestCase):
# Update zuul.yaml to force a tenant reconfiguration
path = os.path.join(self.upstream_root, 'common-config', 'zuul.yaml')
- config = yaml.safe_load(open(path, 'r').read())
+ with open(path, 'r') as f:
+ config = yaml.safe_load(f)
change = {
'name': 'org/project',
'check': {
diff --git a/tests/unit/test_github_driver.py b/tests/unit/test_github_driver.py
index 47cc6c624..fb46aa7d1 100644
--- a/tests/unit/test_github_driver.py
+++ b/tests/unit/test_github_driver.py
@@ -18,8 +18,10 @@ import re
from testtools.matchers import MatchesRegex, Not, StartsWith
import urllib
import socket
+import threading
import time
import textwrap
+from concurrent.futures import ThreadPoolExecutor
from unittest import mock, skip
import git
@@ -32,10 +34,11 @@ from zuul.zk.layout import LayoutState
from zuul.lib import strings
from zuul.merger.merger import Repo
from zuul.model import MergeRequest, EnqueueEvent, DequeueEvent
+from zuul.zk.change_cache import ChangeKey
from tests.base import (AnsibleZuulTestCase, BaseTestCase,
ZuulGithubAppTestCase, ZuulTestCase,
- simple_layout, random_sha1)
+ simple_layout, random_sha1, iterate_timeout)
from tests.base import ZuulWebFixture
EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}, -1)
@@ -1463,6 +1466,65 @@ class TestGithubDriver(ZuulTestCase):
self.assertEqual('SUCCESS',
self.getJobFromHistory('project-test2').result)
+ @simple_layout('layouts/github-merge-mode.yaml', driver='github')
+ def test_merge_method_syntax_check(self):
+ """
+ Tests that a merge mode not supported by the GitHub repository
+ is reported as a configuration error.
+ """
+ github = self.fake_github.getGithubClient()
+ repo = github.repo_from_project('org/project')
+ repo._repodata['allow_rebase_merge'] = False
+ self.scheds.execute(lambda app: app.sched.reconfigure(app.config))
+ self.waitUntilSettled()
+
+ tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
+ loading_errors = tenant.layout.loading_errors
+ self.assertEquals(
+ len(tenant.layout.loading_errors), 1,
+ "An error should have been stored")
+ self.assertIn(
+ "rebase not supported",
+ str(loading_errors[0].error))
+
+ @simple_layout("layouts/basic-github.yaml", driver="github")
+ def test_concurrent_get_change(self):
+ """
+ Test that getting a change concurrently returns the same
+ object from the cache.
+ """
+ conn = self.scheds.first.sched.connections.connections["github"]
+
+ # Create a new change object and remove it from the cache so
+ # the concurrent call will try to create a new change object.
+ A = self.fake_github.openFakePullRequest("org/project", "master", "A")
+ change_key = ChangeKey(conn.connection_name, "org/project",
+ "PullRequest", str(A.number), str(A.head_sha))
+ change = conn.getChange(change_key, refresh=True)
+ conn._change_cache.delete(change_key)
+
+ # Acquire the update lock so the concurrent get task needs to
+ # wait for the lock to be released.
+ lock = conn._change_update_lock.setdefault(change_key,
+ threading.Lock())
+ lock.acquire()
+ try:
+ executor = ThreadPoolExecutor(max_workers=1)
+ task = executor.submit(conn.getChange, change_key, refresh=True)
+ for _ in iterate_timeout(5, "task to be running"):
+ if task.running():
+ break
+ # Add the change back so the waiting task can get the
+ # change from the cache.
+ conn._change_cache.set(change_key, change)
+ finally:
+ lock.release()
+ executor.shutdown()
+
+ other_change = task.result()
+ self.assertIsNotNone(other_change.cache_stat)
+ self.assertIs(change, other_change)
+
class TestMultiGithubDriver(ZuulTestCase):
config_file = 'zuul-multi-github.conf'
diff --git a/tests/unit/test_inventory.py b/tests/unit/test_inventory.py
index 5b30a139f..40f858624 100644
--- a/tests/unit/test_inventory.py
+++ b/tests/unit/test_inventory.py
@@ -57,7 +57,8 @@ class TestInventoryBase(ZuulTestCase):
build = self.getBuildByName(name)
inv_path = os.path.join(build.jobdir.root, 'ansible', 'inventory.yaml')
- inventory = yaml.safe_load(open(inv_path, 'r'))
+ with open(inv_path, 'r') as f:
+ inventory = yaml.safe_load(f)
return inventory
def _get_setup_inventory(self, name):
@@ -65,7 +66,9 @@ class TestInventoryBase(ZuulTestCase):
build = self.getBuildByName(name)
setup_inv_path = build.jobdir.setup_playbook.inventory
- return yaml.ansible_unsafe_load(open(setup_inv_path, 'r'))
+ with open(setup_inv_path, 'r') as f:
+ inventory = yaml.ansible_unsafe_load(f)
+ return inventory
def runJob(self, name):
self.hold_jobs_in_queue = False
@@ -409,26 +412,34 @@ class TestAnsibleInventory(AnsibleZuulTestCase):
build = self.history[0]
inv_path = os.path.join(build.jobdir.root, 'ansible', 'inventory.yaml')
- inventory = yaml.safe_load(open(inv_path, 'r'))
+ with open(inv_path, 'r') as f:
+ inventory = yaml.safe_load(f)
zv_path = os.path.join(build.jobdir.root, 'ansible', 'zuul_vars.yaml')
- zv = yaml.safe_load(open(zv_path, 'r'))
+ with open(zv_path, 'r') as f:
+ zv = yaml.ansible_unsafe_load(f)
# TODO(corvus): zuul vars aren't really stored here anymore;
# rework these tests to examine them separately.
inventory['all']['vars'] = {'zuul': zv['zuul']}
+ # The deprecated base64 version
decoded_message = base64.b64decode(
inventory['all']['vars']['zuul']['message']).decode('utf-8')
self.assertEqual(decoded_message, expected_message)
-
obtained_message = self._get_file(self.history[0],
'work/logs/commit-message.txt')
+ self.assertEqual(obtained_message, expected_message)
+ # The new !unsafe version
+ decoded_message = inventory['all']['vars']['zuul']['change_message']
+ self.assertEqual(decoded_message, expected_message)
+ obtained_message = self._get_file(self.history[0],
+ 'work/logs/change-message.txt')
self.assertEqual(obtained_message, expected_message)
def test_jinja2_message_brackets(self):
- self._jinja2_message("This message has {{ jinja2 }} in it ")
+ self._jinja2_message("This message has {{ ansible_host }} in it ")
def test_jinja2_message_raw(self):
self._jinja2_message("This message has {% raw %} in {% endraw %} it ")
diff --git a/tests/unit/test_merger_repo.py b/tests/unit/test_merger_repo.py
index fd78726ab..f907cb8b4 100644
--- a/tests/unit/test_merger_repo.py
+++ b/tests/unit/test_merger_repo.py
@@ -16,6 +16,8 @@
import datetime
import logging
import os
+import shutil
+from unittest import mock
import git
import testtools
@@ -161,6 +163,64 @@ class TestMergerRepo(ZuulTestCase):
work_repo.reset()
work_repo.checkout("foobar")
+ def test_rebase_merge_conflict_abort(self):
+ """Test that a failed rebase is properly aborted and related
+ directories are cleaned up."""
+ parent_path = os.path.join(self.upstream_root, 'org/project1')
+ parent_repo = git.Repo(parent_path)
+ parent_repo.create_head("feature")
+
+ files = {"test.txt": "master"}
+ self.create_commit("org/project1", files=files, head="master",
+ message="Add master file")
+
+ files = {"test.txt": "feature"}
+ self.create_commit("org/project1", files=files, head="feature",
+ message="Add feature file")
+
+ work_repo = Repo(parent_path, self.workspace_root,
+ "none@example.org", "User Name", "0", "0")
+
+ item = {"ref": "refs/heads/feature"}
+ # We expect the rebase to fail because of a conflict; the failed
+ # rebase should then be aborted.
+ with testtools.ExpectedException(git.exc.GitCommandError):
+ work_repo.rebaseMerge(item, "master")
+
+ # Assert that the failed rebase doesn't leave any temporary
+ # directories behind.
+ self.assertFalse(
+ os.path.exists(f"{work_repo.local_path}/.git/rebase-merge"))
+ self.assertFalse(
+ os.path.exists(f"{work_repo.local_path}/.git/rebase-apply"))
+
+ def test_rebase_merge_conflict_reset_cleanup(self):
+ """Test temporary directories of a failed rebase merge are
+ removed on repo reset."""
+ parent_path = os.path.join(self.upstream_root, 'org/project1')
+ parent_repo = git.Repo(parent_path)
+ parent_repo.create_head("feature")
+
+ files = {"master.txt": "master"}
+ self.create_commit("org/project1", files=files, head="master",
+ message="Add master file")
+
+ files = {"feature.txt": "feature"}
+ self.create_commit("org/project1", files=files, head="feature",
+ message="Add feature file")
+
+ work_repo = Repo(parent_path, self.workspace_root,
+ "none@example.org", "User Name", "0", "0")
+
+ # Simulate leftovers from a failed rebase
+ os.mkdir(f"{work_repo.local_path}/.git/rebase-merge")
+ os.mkdir(f"{work_repo.local_path}/.git/rebase-apply")
+
+ # Resetting the repo should clean up any leaked directories
+ work_repo.reset()
+ item = {"ref": "refs/heads/feature"}
+ work_repo.rebaseMerge(item, "master")
+
def test_set_refs(self):
parent_path = os.path.join(self.upstream_root, 'org/project1')
remote_sha = self.create_commit('org/project1')
@@ -678,6 +738,59 @@ class TestMergerRepo(ZuulTestCase):
new_result = work_repo_underlying.commit('testtag')
self.assertEqual(new_commit, new_result)
+ def test_set_remote_url_clone(self):
+ """Test that we always use the new Git URL for cloning.
+
+ This is a regression test to make sure we always use the new
+ Git URL when a clone of the repo is necessary before updating
+ the config.
+ """
+ parent_path = os.path.join(self.upstream_root, 'org/project1')
+ work_repo = Repo(parent_path, self.workspace_root,
+ 'none@example.org', 'User Name', '0', '0')
+
+ # Simulate an invalid/outdated remote URL with the repo no
+ # longer existing on the file system.
+ work_repo.remote_url = "file:///dev/null"
+ shutil.rmtree(work_repo.local_path)
+
+ # Setting a valid remote URL should update the attribute and
+ # clone the repository.
+ work_repo.setRemoteUrl(parent_path)
+ self.assertEqual(work_repo.remote_url, parent_path)
+ self.assertTrue(os.path.exists(work_repo.local_path))
+
+ def test_set_remote_url_invalid(self):
+ """Test that we don't store the Git URL when failing to set it.
+
+ This is a regression test to make sure we will always update
+ the Git URL after a previously failed attempt.
+ """
+ parent_path = os.path.join(self.upstream_root, 'org/project1')
+ work_repo = Repo(parent_path, self.workspace_root,
+ 'none@example.org', 'User Name', '0', '0')
+
+ # Set the Git remote URL to an invalid value.
+ invalid_url = "file:///dev/null"
+ repo = work_repo.createRepoObject(None)
+ work_repo._git_set_remote_url(repo, invalid_url)
+ work_repo.remote_url = invalid_url
+
+ # Simulate a failed attempt to update the remote URL
+ with mock.patch.object(work_repo, "_git_set_remote_url",
+ side_effect=RuntimeError):
+ with testtools.ExpectedException(RuntimeError):
+ work_repo.setRemoteUrl(parent_path)
+
+ # Make sure we cleared out the remote URL.
+ self.assertIsNone(work_repo.remote_url)
+
+ # Setting a valid remote URL should update the attribute and
+ # clone the repository.
+ work_repo.setRemoteUrl(parent_path)
+ self.assertEqual(work_repo.remote_url, parent_path)
+ self.assertTrue(os.path.exists(work_repo.local_path))
+
class TestMergerWithAuthUrl(ZuulTestCase):
config_file = 'zuul-github-driver.conf'
diff --git a/tests/unit/test_model.py b/tests/unit/test_model.py
index aed40b94e..98f971948 100644
--- a/tests/unit/test_model.py
+++ b/tests/unit/test_model.py
@@ -32,7 +32,9 @@ import zuul.lib.connections
from tests.base import BaseTestCase, FIXTURE_DIR
from zuul.lib.ansible import AnsibleManager
from zuul.lib import tracing
+from zuul.model_api import MODEL_API
from zuul.zk.zkobject import LocalZKContext
+from zuul.zk.components import COMPONENT_REGISTRY
from zuul import change_matcher
@@ -44,6 +46,8 @@ class Dummy(object):
class TestJob(BaseTestCase):
def setUp(self):
+ COMPONENT_REGISTRY.registry = Dummy()
+ COMPONENT_REGISTRY.registry.model_api = MODEL_API
self._env_fixture = self.useFixture(
fixtures.EnvironmentVariable('HISTTIMEFORMAT', '%Y-%m-%dT%T%z '))
super(TestJob, self).setUp()
diff --git a/tests/unit/test_model_upgrade.py b/tests/unit/test_model_upgrade.py
index 8e4cdc20c..c6cdee7ea 100644
--- a/tests/unit/test_model_upgrade.py
+++ b/tests/unit/test_model_upgrade.py
@@ -17,6 +17,7 @@ import json
from zuul.zk.components import ComponentRegistry
from tests.base import ZuulTestCase, simple_layout, iterate_timeout
+from tests.base import ZuulWebFixture
def model_version(version):
@@ -253,6 +254,72 @@ class TestModelUpgrade(ZuulTestCase):
result='SUCCESS', changes='1,1'),
], ordered=False)
+ @model_version(11)
+ def test_model_11_12(self):
+ # This exercises the upgrade to store build/job versions
+ first = self.scheds.first
+ second = self.createScheduler()
+ second.start()
+ self.assertEqual(len(self.scheds), 2)
+ for _ in iterate_timeout(10, "until priming is complete"):
+ state_one = first.sched.local_layout_state.get("tenant-one")
+ if state_one:
+ break
+
+ for _ in iterate_timeout(
+ 10, "all schedulers to have the same layout state"):
+ if (second.sched.local_layout_state.get(
+ "tenant-one") == state_one):
+ break
+
+ self.executor_server.hold_jobs_in_build = True
+ with second.sched.layout_update_lock, second.sched.run_handler_lock:
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled(matcher=[first])
+
+ self.model_test_component_info.model_api = 12
+ with first.sched.layout_update_lock, first.sched.run_handler_lock:
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled(matcher=[second])
+
+ self.waitUntilSettled()
+ self.assertHistory([
+ dict(name='project-merge', result='SUCCESS', changes='1,1'),
+ dict(name='project-test1', result='SUCCESS', changes='1,1'),
+ dict(name='project-test2', result='SUCCESS', changes='1,1'),
+ dict(name='project1-project2-integration',
+ result='SUCCESS', changes='1,1'),
+ ], ordered=False)
+
+ @model_version(12)
+ def test_model_12_13(self):
+ # Initially queue items will still have the full trigger event
+ # stored in Zookeeper. The trigger event will be converted to
+ # an event info object after the model API update.
+ self.executor_server.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+
+ self.assertEqual(len(self.builds), 1)
+
+ # Upgrade our component
+ self.model_test_component_info.model_api = 13
+
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ self.assertHistory([
+ dict(name='project-merge', result='SUCCESS', changes='1,1'),
+ dict(name='project-test1', result='SUCCESS', changes='1,1'),
+ dict(name='project-test2', result='SUCCESS', changes='1,1'),
+ dict(name='project1-project2-integration',
+ result='SUCCESS', changes='1,1'),
+ ], ordered=False)
+
class TestGithubModelUpgrade(ZuulTestCase):
config_file = 'zuul-github-driver.conf'
@@ -305,6 +372,85 @@ class TestGithubModelUpgrade(ZuulTestCase):
], ordered=False)
self.assertTrue(A.is_merged)
+ @model_version(10)
+ @simple_layout('layouts/github-merge-mode.yaml', driver='github')
+ def test_merge_method_syntax_check(self):
+ """
+ Tests that the check of the merge mode against the supported
+ merge methods only takes effect after the model API upgrade and
+ a full reconfiguration.
+ """
+ webfixture = self.useFixture(
+ ZuulWebFixture(self.changes, self.config,
+ self.additional_event_queues, self.upstream_root,
+ self.poller_events,
+ self.git_url_with_auth, self.addCleanup,
+ self.test_root))
+ sched = self.scheds.first.sched
+ web = webfixture.web
+
+ github = self.fake_github.getGithubClient()
+ repo = github.repo_from_project('org/project')
+ repo._repodata['allow_rebase_merge'] = False
+ self.scheds.execute(lambda app: app.sched.reconfigure(app.config))
+ self.waitUntilSettled()
+
+ # Verify that there are no errors with model version 9 (we
+ # should be using the defaultdict that indicates all merge
+ # modes are supported).
+ tenant = sched.abide.tenants.get('tenant-one')
+ self.assertEquals(len(tenant.layout.loading_errors), 0)
+
+ # Upgrade our component
+ self.model_test_component_info.model_api = 11
+
+ # Perform a smart reconfiguration which should not clear the
+ # cache; we should continue to see no errors because we should
+ # still be using the defaultdict.
+ self.scheds.first.smartReconfigure()
+ tenant = sched.abide.tenants.get('tenant-one')
+ self.assertEquals(len(tenant.layout.loading_errors), 0)
+
+ # Wait for web to have the same config
+ for _ in iterate_timeout(10, "config is synced"):
+ if (web.tenant_layout_state.get('tenant-one') ==
+ web.local_layout_state.get('tenant-one')):
+ break
+
+ # Repeat the check
+ tenant = web.abide.tenants.get('tenant-one')
+ self.assertEquals(len(tenant.layout.loading_errors), 0)
+
+ # Perform a full reconfiguration which should cause us to
+ # actually query, update the branch cache, and report an
+ # error.
+ self.scheds.first.fullReconfigure()
+ self.waitUntilSettled()
+
+ tenant = sched.abide.tenants.get('tenant-one')
+ loading_errors = tenant.layout.loading_errors
+ self.assertEquals(
+ len(tenant.layout.loading_errors), 1,
+ "An error should have been stored in sched")
+ self.assertIn(
+ "rebase not supported",
+ str(loading_errors[0].error))
+
+ # Wait for web to have the same config
+ for _ in iterate_timeout(10, "config is synced"):
+ if (web.tenant_layout_state.get('tenant-one') ==
+ web.local_layout_state.get('tenant-one')):
+ break
+
+ # Repeat the check for web
+ tenant = web.abide.tenants.get('tenant-one')
+ loading_errors = tenant.layout.loading_errors
+ self.assertEquals(
+ len(tenant.layout.loading_errors), 1,
+ "An error should have been stored in web")
+ self.assertIn(
+ "rebase not supported",
+ str(loading_errors[0].error))
+
class TestDeduplication(ZuulTestCase):
config_file = "zuul-gerrit-github.conf"
diff --git a/tests/unit/test_reporting.py b/tests/unit/test_reporting.py
index 2cf93cdcb..0c5c5fbc9 100644
--- a/tests/unit/test_reporting.py
+++ b/tests/unit/test_reporting.py
@@ -151,7 +151,7 @@ class TestReporting(ZuulTestCase):
engine.connect() as conn:
result = conn.execute(
- sa.sql.select([reporter.connection.zuul_buildset_table]))
+ sa.sql.select(reporter.connection.zuul_buildset_table))
buildsets = result.fetchall()
for x in buildsets:
@@ -180,7 +180,7 @@ class TestReporting(ZuulTestCase):
engine.connect() as conn:
result = conn.execute(
- sa.sql.select([reporter.connection.zuul_buildset_table]))
+ sa.sql.select(reporter.connection.zuul_buildset_table))
buildsets = result.fetchall()
for x in buildsets:
diff --git a/tests/unit/test_requirements.py b/tests/unit/test_requirements.py
index 9a32e4b21..9f3b87187 100644
--- a/tests/unit/test_requirements.py
+++ b/tests/unit/test_requirements.py
@@ -453,3 +453,40 @@ class TestRequirementsReject(ZuulTestCase):
self.fake_gerrit.addEvent(comment)
self.waitUntilSettled()
self.assertEqual(len(self.history), 3)
+
+
+class TestRequirementsTrustedCheck(ZuulTestCase):
+ config_file = "zuul-gerrit-github.conf"
+ tenant_config_file = "config/requirements/trusted-check/main.yaml"
+
+ def test_non_live_requirements(self):
+ # Test that pipeline requirements are applied to non-live
+ # changes.
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ B.setDependsOn(A, 1)
+ B.addApproval('Code-Review', 2)
+
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+ self.assertHistory([])
+
+ self.fake_gerrit.addEvent(A.addApproval('Code-Review', 2))
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+ self.assertHistory([
+ dict(name='check-job', result='SUCCESS', changes='1,1 2,1')],
+ ordered=False)
+
+ def test_other_connections(self):
+ # Test allow-other-connections: False
+ A = self.fake_github.openFakePullRequest("gh/project", "master", "A")
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ B.data["commitMessage"] = "{}\n\nDepends-On: {}\n".format(
+ B.subject, A.url,
+ )
+ B.addApproval('Code-Review', 2)
+
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+ self.assertHistory([])
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 6a0da1279..172ed34dc 100644
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -52,8 +52,9 @@ from tests.base import (
skipIfMultiScheduler,
)
from zuul.zk.change_cache import ChangeKey
+from zuul.zk.event_queues import PIPELINE_NAME_ROOT
from zuul.zk.layout import LayoutState
-from zuul.zk.locks import management_queue_lock
+from zuul.zk.locks import management_queue_lock, pipeline_lock
from zuul.zk import zkobject
EMPTY_LAYOUT_STATE = LayoutState("", "", 0, None, {}, -1)
@@ -460,6 +461,7 @@ class TestScheduler(ZuulTestCase):
'zuul.mergers.online', value='1', kind='g')
self.assertReportedStat('zuul.scheduler.eventqueues.connection.gerrit',
value='0', kind='g')
+ self.assertReportedStat('zuul.scheduler.run_handler', kind='ms')
# Catch time / monotonic errors
for key in [
@@ -489,9 +491,10 @@ class TestScheduler(ZuulTestCase):
'zuul.tenant.tenant-one.pipeline.gate.write_objects',
'zuul.tenant.tenant-one.pipeline.gate.read_znodes',
'zuul.tenant.tenant-one.pipeline.gate.write_znodes',
- 'zuul.tenant.tenant-one.pipeline.gate.read_bytes',
'zuul.tenant.tenant-one.pipeline.gate.write_bytes',
]:
+ # 'zuul.tenant.tenant-one.pipeline.gate.read_bytes' is
+ # expected to be zero since it's initialized after reading
val = self.assertReportedStat(key, kind='g')
self.assertTrue(0.0 < float(val) < 60000.0)
@@ -2807,6 +2810,7 @@ class TestScheduler(ZuulTestCase):
B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
B.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
B.subject, A.data['url'])
+ A.addApproval('Code-Review', 2)
B.addApproval('Code-Review', 2)
self.executor_server.hold_jobs_in_build = True
@@ -3585,8 +3589,11 @@ class TestScheduler(ZuulTestCase):
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
- with self.createZKContext() as ctx,\
- gate.manager.currentContext(ctx):
+ with pipeline_lock(
+ self.zk_client, tenant.name,
+ gate.name) as lock,\
+ self.createZKContext(lock) as ctx,\
+ gate.manager.currentContext(ctx):
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1.canonical_name, None)
@@ -3608,8 +3615,11 @@ class TestScheduler(ZuulTestCase):
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
- with self.createZKContext() as ctx,\
- gate.manager.currentContext(ctx):
+ with pipeline_lock(
+ self.zk_client, tenant.name,
+ gate.name) as lock,\
+ self.createZKContext(lock) as ctx,\
+ gate.manager.currentContext(ctx):
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1.canonical_name, None)
@@ -3631,8 +3641,11 @@ class TestScheduler(ZuulTestCase):
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
- with self.createZKContext() as ctx,\
- gate.manager.currentContext(ctx):
+ with pipeline_lock(
+ self.zk_client, tenant.name,
+ gate.name) as lock,\
+ self.createZKContext(lock) as ctx,\
+ gate.manager.currentContext(ctx):
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1.canonical_name, None)
@@ -3653,8 +3666,11 @@ class TestScheduler(ZuulTestCase):
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
- with self.createZKContext() as ctx,\
- gate.manager.currentContext(ctx):
+ with pipeline_lock(
+ self.zk_client, tenant.name,
+ gate.name) as lock,\
+ self.createZKContext(lock) as ctx,\
+ gate.manager.currentContext(ctx):
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1.canonical_name, None)
@@ -3676,8 +3692,11 @@ class TestScheduler(ZuulTestCase):
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
- with self.createZKContext() as ctx,\
- gate.manager.currentContext(ctx):
+ with pipeline_lock(
+ self.zk_client, tenant.name,
+ gate.name) as lock,\
+ self.createZKContext(lock) as ctx,\
+ gate.manager.currentContext(ctx):
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1.canonical_name, None)
@@ -3713,6 +3732,22 @@ class TestScheduler(ZuulTestCase):
self.assertEqual(self.history[4].pipeline, 'check')
self.assertEqual(self.history[5].pipeline, 'check')
+ @simple_layout('layouts/two-check.yaml')
+ def test_query_dependency_count(self):
+ # Test that we efficiently query dependent changes
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+ B.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
+ B.subject, A.data['url'])
+ self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+ # 1. The query to find the change id
+ # from the Depends-On string "change:1" (simpleQuery)
+ # 2. The query to populate the change once we know the id
+ # (queryChange)
+ self.assertEqual(A.queried, 2)
+ self.assertEqual(B.queried, 1)
+
def test_reconfigure_merge(self):
"""Test that two reconfigure events are merged"""
# Wrap the reconfiguration handler so we can count how many
@@ -3925,6 +3960,10 @@ class TestScheduler(ZuulTestCase):
else:
time.sleep(0)
+ self.assertGreater(new.last_reconfigured, old.last_reconfigured)
+ self.assertGreater(new.last_reconfigure_event_ltime,
+ old.last_reconfigure_event_ltime)
+
def test_tenant_reconfiguration_command_socket(self):
"Test that single-tenant reconfiguration via command socket works"
@@ -6182,7 +6221,8 @@ For CI problems and help debugging, contact ci@example.org"""
build = self.getBuildByName('check-job')
inv_path = os.path.join(build.jobdir.root, 'ansible', 'inventory.yaml')
- inventory = yaml.safe_load(open(inv_path, 'r'))
+ with open(inv_path, 'r') as f:
+ inventory = yaml.safe_load(f)
label = inventory['all']['hosts']['controller']['nodepool']['label']
self.assertEqual('slow-label', label)
@@ -6491,6 +6531,33 @@ For CI problems and help debugging, contact ci@example.org"""
self.assertEqual(A.data['status'], 'MERGED')
self.assertEqual(B.data['status'], 'MERGED')
+ def test_leaked_pipeline_cleanup(self):
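+ # Create orphaned pipeline state and event queues, then verify
+ # that the cleanup routine removes them.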
+ self.waitUntilSettled()
+ sched = self.scheds.first.sched
+
+ pipeline_state_path = "/zuul/tenant/tenant-one/pipeline/invalid"
+ self.zk_client.client.ensure_path(pipeline_state_path)
+
+ # Create the ZK path as a side-effect of getting the event queue.
+ sched.pipeline_management_events["tenant-one"]["invalid"]
+ pipeline_event_queue_path = PIPELINE_NAME_ROOT.format(
+ tenant="tenant-one", pipeline="invalid")
+
+ self.assertIsNotNone(self.zk_client.client.exists(pipeline_state_path))
+ # Wait for the event watcher to create the event queues
+ for _ in iterate_timeout(30, "create event queues"):
+ for event_queue in ("management", "trigger", "result"):
+ if self.zk_client.client.exists(
+ f"{pipeline_event_queue_path}/{event_queue}") is None:
+ break
+ else:
+ break
+
+ sched._runLeakedPipelineCleanup()
+ self.assertIsNone(
+ self.zk_client.client.exists(pipeline_event_queue_path))
+ self.assertIsNone(self.zk_client.client.exists(pipeline_state_path))
+
class TestChangeQueues(ZuulTestCase):
tenant_config_file = 'config/change-queues/main.yaml'
diff --git a/tests/unit/test_sos.py b/tests/unit/test_sos.py
index 37a47c6ae..4f2110f3e 100644
--- a/tests/unit/test_sos.py
+++ b/tests/unit/test_sos.py
@@ -244,6 +244,79 @@ class TestScaleOutScheduler(ZuulTestCase):
self.assertTrue(all(l == new.uuid for l in layout_uuids))
self.waitUntilSettled()
+ def test_live_reconfiguration_del_pipeline(self):
+ # Test pipeline deletion while changes are enqueued
+
+ # Create a second scheduler instance
+ app = self.createScheduler()
+ app.start()
+ self.assertEqual(len(self.scheds), 2)
+
+ for _ in iterate_timeout(10, "Wait until priming is complete"):
+ old = self.scheds.first.sched.tenant_layout_state.get("tenant-one")
+ if old is not None:
+ break
+
+ for _ in iterate_timeout(
+ 10, "Wait for all schedulers to have the same layout state"):
+ layout_states = [a.sched.local_layout_state.get("tenant-one")
+ for a in self.scheds.instances]
+ if all(l == old for l in layout_states):
+ break
+
+ pipeline_zk_path = app.sched.abide.tenants[
+ "tenant-one"].layout.pipelines["check"].state.getPath()
+
+ self.executor_server.hold_jobs_in_build = True
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+
+ # Let the first scheduler enqueue the change into the pipeline that
+ # will be removed later on.
+ with app.sched.run_handler_lock:
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled(matcher=[self.scheds.first])
+
+ # Process the item only on the second scheduler so the first
+ # scheduler has an outdated pipeline state.
+ with self.scheds.first.sched.run_handler_lock:
+ self.executor_server.release('.*-merge')
+ self.waitUntilSettled(matcher=[app])
+ self.assertEqual(len(self.builds), 2)
+
+ self.commitConfigUpdate(
+ 'common-config',
+ 'layouts/live-reconfiguration-del-pipeline.yaml')
+ # Trigger a reconfiguration on the first scheduler with the outdated
+ # pipeline state of the pipeline that will be removed.
+ self.scheds.execute(lambda a: a.sched.reconfigure(a.config),
+ matcher=[self.scheds.first])
+
+ new = self.scheds.first.sched.tenant_layout_state.get("tenant-one")
+ for _ in iterate_timeout(
+ 10, "Wait for all schedulers to have the same layout state"):
+ layout_states = [a.sched.local_layout_state.get("tenant-one")
+ for a in self.scheds.instances]
+ if all(l == new for l in layout_states):
+ break
+
+ self.executor_server.hold_jobs_in_build = False
+ self.executor_server.release()
+ self.waitUntilSettled()
+
+ self.assertEqual(A.data['status'], 'NEW')
+ self.assertEqual(A.reported, 0)
+
+ self.assertHistory([
+ dict(name='project-merge', result='SUCCESS', changes='1,1'),
+ dict(name='project-test1', result='ABORTED', changes='1,1'),
+ dict(name='project-test2', result='ABORTED', changes='1,1'),
+ ], ordered=False)
+
+ tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
+ self.assertEqual(len(tenant.layout.pipelines), 0)
+ stat = self.zk_client.client.exists(pipeline_zk_path)
+ self.assertIsNone(stat)
+
def test_change_cache(self):
# Test re-using a change from the change cache.
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
diff --git a/tests/unit/test_streaming.py b/tests/unit/test_streaming.py
index ba3117f59..7e6a2e635 100644
--- a/tests/unit/test_streaming.py
+++ b/tests/unit/test_streaming.py
@@ -138,6 +138,17 @@ class TestStreamingBase(tests.base.AnsibleZuulTestCase):
s.close()
self.streamer.stop()
+ def _readSocket(self, sock, build_uuid, event, name):
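+ # Request the log stream for the given build UUID and read until
+ # the server closes the connection.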
+ msg = "%s\r\n" % build_uuid
+ sock.sendall(msg.encode('utf-8'))
+ event.set() # notify we are connected and req sent
+ while True:
+ data = sock.recv(1024)
+ if not data:
+ break
+ self.streaming_data[name] += data.decode('utf-8')
+ sock.shutdown(socket.SHUT_RDWR)
+
def runFingerClient(self, build_uuid, gateway_address, event, name=None):
# Wait until the gateway is started
for x in iterate_timeout(30, "finger client to start"):
@@ -154,7 +165,7 @@ class TestStreamingBase(tests.base.AnsibleZuulTestCase):
self.streaming_data[name] = ''
with socket.create_connection(gateway_address) as s:
if self.fingergw_use_ssl:
- context = ssl.SSLContext(ssl.PROTOCOL_TLS)
+ context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.verify_mode = ssl.CERT_REQUIRED
context.check_hostname = False
context.load_cert_chain(
@@ -162,17 +173,10 @@ class TestStreamingBase(tests.base.AnsibleZuulTestCase):
os.path.join(FIXTURE_DIR, 'fingergw/fingergw.key'))
context.load_verify_locations(
os.path.join(FIXTURE_DIR, 'fingergw/root-ca.pem'))
- s = context.wrap_socket(s)
-
- msg = "%s\r\n" % build_uuid
- s.sendall(msg.encode('utf-8'))
- event.set() # notify we are connected and req sent
- while True:
- data = s.recv(1024)
- if not data:
- break
- self.streaming_data[name] += data.decode('utf-8')
- s.shutdown(socket.SHUT_RDWR)
+ with context.wrap_socket(s) as s:
+ self._readSocket(s, build_uuid, event, name)
+ else:
+ self._readSocket(s, build_uuid, event, name)
def runFingerGateway(self, zone=None):
self.log.info('Starting fingergw with zone %s', zone)
diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py
index 81d95927a..004ede862 100644
--- a/tests/unit/test_v3.py
+++ b/tests/unit/test_v3.py
@@ -1557,6 +1557,43 @@ class TestInRepoConfig(ZuulTestCase):
'start_line': 5},
})
+ def test_dynamic_config_job_anchors(self):
+ # Test the use of anchors in job configuration. This is a
+ # regression test designed to catch a failure where we freeze
+ # the first job and in doing so, mutate the vars dict. The
+ # intended behavior is that the two jobs end up with two
+ # separate python objects for their vars dicts.
+ in_repo_conf = textwrap.dedent(
+ """
+ - job:
+ name: myvars
+ vars: &anchor
+ plugins:
+ foo: bar
+
+ - job:
+ name: project-test1
+ timeout: 999999999999
+ vars: *anchor
+
+ - project:
+ name: org/project
+ check:
+ jobs:
+ - project-test1
+ """)
+
+ file_dict = {'.zuul.yaml': in_repo_conf}
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
+ files=file_dict)
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+ self.waitUntilSettled()
+ self.assertEqual(A.reported, 1,
+ "A should report failure")
+ self.assertEqual(A.patchsets[0]['approvals'][0]['value'], "-1")
+ self.assertIn('max-job-timeout', A.messages[0])
+ self.assertHistory([])
+
def test_dynamic_config_non_existing_job_in_template(self):
"""Test that requesting a non existent job fails"""
in_repo_conf = textwrap.dedent(
@@ -5283,7 +5320,8 @@ class TestRoleBranches(RoleTestCase):
def getBuildInventory(self, name):
build = self.getBuildByName(name)
inv_path = os.path.join(build.jobdir.root, 'ansible', 'inventory.yaml')
- inventory = yaml.safe_load(open(inv_path, 'r'))
+ with open(inv_path, 'r') as f:
+ inventory = yaml.safe_load(f)
return inventory
def getCheckout(self, build, path):
diff --git a/tests/unit/test_zk.py b/tests/unit/test_zk.py
index 7e3c19dfe..b5697ee36 100644
--- a/tests/unit/test_zk.py
+++ b/tests/unit/test_zk.py
@@ -18,6 +18,7 @@ import json
import queue
import threading
import uuid
+from unittest import mock
import testtools
@@ -53,10 +54,12 @@ from tests.base import (
BaseTestCase, HoldableExecutorApi, HoldableMergerApi,
iterate_timeout
)
-from zuul.zk.zkobject import ShardedZKObject, ZKObject, ZKContext
+from zuul.zk.zkobject import (
+ ShardedZKObject, ZKObject, ZKContext
+)
from zuul.zk.locks import tenant_write_lock
-from kazoo.exceptions import ZookeeperError, OperationTimeoutError
+from kazoo.exceptions import ZookeeperError, OperationTimeoutError, NoNodeError
class ZooKeeperBaseTestCase(BaseTestCase):
@@ -2037,3 +2040,80 @@ class TestBlobStore(ZooKeeperBaseTestCase):
with testtools.ExpectedException(KeyError):
bs.get(path)
+
+
+class TestPipelineInit(ZooKeeperBaseTestCase):
+ # Test the initialize-on-refresh code paths of various pipeline objects
+
+ def test_pipeline_state_new_object(self):
+ # Test the initialize-on-refresh code path with no existing object
+ tenant = model.Tenant('tenant')
+ pipeline = model.Pipeline('gate', tenant)
+ layout = model.Layout(tenant)
+ tenant.layout = layout
+ pipeline.state = model.PipelineState.create(
+ pipeline, pipeline.state)
+ context = ZKContext(self.zk_client, None, None, self.log)
+ pipeline.state.refresh(context)
+ self.assertTrue(self.zk_client.client.exists(pipeline.state.getPath()))
+ self.assertEqual(pipeline.state.layout_uuid, layout.uuid)
+
+ def test_pipeline_state_existing_object(self):
+ # Test the initialize-on-refresh code path with a pre-existing object
+ tenant = model.Tenant('tenant')
+ pipeline = model.Pipeline('gate', tenant)
+ layout = model.Layout(tenant)
+ tenant.layout = layout
+ pipeline.manager = mock.Mock()
+ pipeline.state = model.PipelineState.create(
+ pipeline, pipeline.state)
+ pipeline.change_list = model.PipelineChangeList.create(
+ pipeline)
+ context = ZKContext(self.zk_client, None, None, self.log)
+ # We refresh the change list here purely for the side effect
+ # of creating the pipeline state object with no data (the list
+ # is a subpath of the state object).
+ pipeline.change_list.refresh(context)
+ pipeline.state.refresh(context)
+ self.assertTrue(
+ self.zk_client.client.exists(pipeline.change_list.getPath()))
+ self.assertTrue(self.zk_client.client.exists(pipeline.state.getPath()))
+ self.assertEqual(pipeline.state.layout_uuid, layout.uuid)
+
+ def test_pipeline_change_list_new_object(self):
+ # Test the initialize-on-refresh code path with no existing object
+ tenant = model.Tenant('tenant')
+ pipeline = model.Pipeline('gate', tenant)
+ layout = model.Layout(tenant)
+ tenant.layout = layout
+ pipeline.state = model.PipelineState.create(
+ pipeline, pipeline.state)
+ pipeline.change_list = model.PipelineChangeList.create(
+ pipeline)
+ context = ZKContext(self.zk_client, None, None, self.log)
+ pipeline.change_list.refresh(context)
+ self.assertTrue(
+ self.zk_client.client.exists(pipeline.change_list.getPath()))
+ pipeline.manager = mock.Mock()
+ pipeline.state.refresh(context)
+ self.assertEqual(pipeline.state.layout_uuid, layout.uuid)
+
+ def test_pipeline_change_list_new_object_without_lock(self):
+ # Test the initialize-on-refresh code path if we don't have
+ # the lock. This should fail.
+ tenant = model.Tenant('tenant')
+ pipeline = model.Pipeline('gate', tenant)
+ layout = model.Layout(tenant)
+ tenant.layout = layout
+ pipeline.state = model.PipelineState.create(
+ pipeline, pipeline.state)
+ pipeline.change_list = model.PipelineChangeList.create(
+ pipeline)
+ context = ZKContext(self.zk_client, None, None, self.log)
+ with testtools.ExpectedException(NoNodeError):
+ pipeline.change_list.refresh(context, allow_init=False)
+ self.assertIsNone(
+ self.zk_client.client.exists(pipeline.change_list.getPath()))
+ pipeline.manager = mock.Mock()
+ pipeline.state.refresh(context)
+ self.assertEqual(pipeline.state.layout_uuid, layout.uuid)
diff --git a/tests/unit/test_zuultrigger.py b/tests/unit/test_zuultrigger.py
index 25bfd4c30..f649f4723 100644
--- a/tests/unit/test_zuultrigger.py
+++ b/tests/unit/test_zuultrigger.py
@@ -35,9 +35,10 @@ class TestZuulTriggerParentChangeEnqueued(ZuulTestCase):
A.addApproval('Code-Review', 2)
B1.addApproval('Code-Review', 2)
B2.addApproval('Code-Review', 2)
- A.addApproval('Verified', 1) # required by gate
- B1.addApproval('Verified', -1) # should go to check
- B2.addApproval('Verified', 1) # should go to gate
+ A.addApproval('Verified', 1, username="for-check") # reqd by check
+ A.addApproval('Verified', 1, username="for-gate") # reqd by gate
+ B1.addApproval('Verified', 1, username="for-check") # go to check
+ B2.addApproval('Verified', 1, username="for-gate") # go to gate
B1.addApproval('Approved', 1)
B2.addApproval('Approved', 1)
B1.setDependsOn(A, 1)
@@ -75,9 +76,9 @@ class TestZuulTriggerParentChangeEnqueued(ZuulTestCase):
self.scheds.first.sched, "addTriggerEvent", addTriggerEvent
):
C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
- C.addApproval('Verified', -1)
+ C.addApproval('Verified', 1, username="for-check")
D = self.fake_gerrit.addFakeChange('org/project', 'master', 'D')
- D.addApproval('Verified', -1)
+ D.addApproval('Verified', 1, username="for-check")
D.setDependsOn(C, 1)
self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
@@ -108,6 +109,7 @@ class TestZuulTriggerParentChangeEnqueuedGithub(ZuulGithubAppTestCase):
B1.addReview('derp', 'APPROVED')
B2.addReview('derp', 'APPROVED')
A.addLabel('for-gate') # required by gate
+ A.addLabel('for-check') # required by check
B1.addLabel('for-check') # should go to check
B2.addLabel('for-gate') # should go to gate
diff --git a/tools/docker-compose.yaml b/tools/docker-compose.yaml
index 83ab9f930..05b4905e2 100644
--- a/tools/docker-compose.yaml
+++ b/tools/docker-compose.yaml
@@ -3,7 +3,7 @@ version: "3"
services:
mysql:
container_name: zuul-test-mysql
- image: mysql:5.7
+ image: mysql:8.0
environment:
- MYSQL_ROOT_PASSWORD=insecure_worker
ports:
diff --git a/tools/test-setup-docker.sh b/tools/test-setup-docker.sh
index a0fcf9f5a..1601b11a7 100755
--- a/tools/test-setup-docker.sh
+++ b/tools/test-setup-docker.sh
@@ -58,7 +58,8 @@ timeout 30 bash -c "until ${ROOTCMD} ${MYSQL} -e 'show databases'; do sleep 0.5;
echo
echo "Setting up permissions for zuul tests"
-${ROOTCMD} ${MYSQL} -e "GRANT ALL PRIVILEGES ON *.* TO 'openstack_citest'@'%' identified by 'openstack_citest' WITH GRANT OPTION;"
+${ROOTCMD} ${MYSQL} -e "CREATE USER 'openstack_citest'@'%' identified by 'openstack_citest';"
+${ROOTCMD} ${MYSQL} -e "GRANT ALL PRIVILEGES ON *.* TO 'openstack_citest'@'%' WITH GRANT OPTION;"
${ROOTCMD} ${MYSQL} -u openstack_citest -popenstack_citest -e "SET default_storage_engine=MYISAM; DROP DATABASE IF EXISTS openstack_citest; CREATE DATABASE openstack_citest CHARACTER SET utf8;"
echo "Finished"
diff --git a/tools/yarn-build.sh b/tools/yarn-build.sh
new file mode 100755
index 000000000..42eaffc7b
--- /dev/null
+++ b/tools/yarn-build.sh
@@ -0,0 +1,73 @@
+#!/bin/bash
+# Copyright 2018 Red Hat, Inc.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -ex
+
+# This script checks if yarn is installed in the current path. If it is not,
+# it will use nodeenv to install node, npm and yarn.
+# Finally, it builds the web UI if it has not already been built.
+if [[ ! $(command -v yarn) ]]
+then
+ pip install nodeenv
+ # Initialize nodeenv and tell it to re-use the currently active virtualenv
+ attempts=0
+ set +e
+ until nodeenv --python-virtualenv -n 16.14.0 ; do
+ ((attempts++))
+ if [[ $attempts > 2 ]]
+ then
+ echo "Failed creating nodeenv"
+ exit 1
+ fi
+ done
+ set -e
+ # Use -g because inside of the virtualenv '-g' means 'install into the
+ # virtualenv' - as opposed to installing into the local node_modules.
+ # Avoid writing a package-lock.json file since we don't use it.
+ # Avoid writing yarn into package.json.
+ npm install -g --no-package-lock --no-save yarn
+fi
+if [[ ! -f zuul/web/static/index.html ]]
+then
+ mkdir -p zuul/web/static
+ ln -sfn ../zuul/web/static web/build
+ pushd web/
+ if [[ -n "${YARN_REGISTRY}" ]]
+ then
+ echo "Using yarn registry: ${YARN_REGISTRY}"
+ sed -i "s#https://registry.yarnpkg.com#${YARN_REGISTRY}#" yarn.lock
+ fi
+
+ # Be forgiving of package retrieval errors
+ attempts=0
+ set +e
+ until yarn install; do
+ ((attempts++))
+ if [[ $attempts > 2 ]]
+ then
+ echo "Failed installing npm packages"
+ exit 1
+ fi
+ done
+ set -e
+
+ yarn build
+ if [[ -n "${YARN_REGISTRY}" ]]
+ then
+ echo "Resetting yarn registry"
+ sed -i "s#${YARN_REGISTRY}#https://registry.yarnpkg.com#" yarn.lock
+ fi
+ popd
+fi
diff --git a/tox.ini b/tox.ini
index 8d7fb8c22..9c7fc5d28 100644
--- a/tox.ini
+++ b/tox.ini
@@ -105,10 +105,3 @@ passenv =
ZUUL_ZK_KEY
commands =
stestr run --test-path ./tests/remote {posargs}
-
-[flake8]
-# These are ignored intentionally in zuul projects;
-# please don't submit patches that solely correct them or enable them.
-ignore = E124,E125,E129,E252,E402,E741,H,W503,W504
-show-source = True
-exclude = .venv,.tox,dist,doc,build,*.egg,node_modules
diff --git a/web/public/openapi.yaml b/web/public/openapi.yaml
index d69111cf8..cb2e10b37 100644
--- a/web/public/openapi.yaml
+++ b/web/public/openapi.yaml
@@ -249,7 +249,7 @@ paths:
- tenant
/api/tenant/{tenant}/key/{project}.pub:
get:
- operationId: get-project-key
+ operationId: get-project-secrets-key
parameters:
- description: The tenant name
in: path
@@ -275,12 +275,44 @@ paths:
'
schema:
- description: The project public key
+ description: The project secrets public key in PKCS8 format
type: string
- description: Returns the project public key
+ description: Returns the project public key that is used to encrypt secrets
'404':
description: Tenant or Project not found
- summary: Get a project public key
+ summary: Get a project public key that is used to encrypt secrets
+ tags:
+ - tenant
+ /api/tenant/{tenant}/project-ssh-key/{project}.pub:
+ get:
+ operationId: get-project-ssh-key
+ parameters:
+ - description: The tenant name
+ in: path
+ name: tenant
+ required: true
+ schema:
+ type: string
+ - description: The project name
+ in: path
+ name: project
+ required: true
+ schema:
+ type: string
+ responses:
+ '200':
+ content:
+ text/plain:
+ example: 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAACA
+
+ '
+ schema:
+ description: The project ssh public key in SSH2 format
+ type: string
+ description: Returns the project public key that the executor adds to its SSH agent
+ '404':
+ description: Tenant or Project not found
+ summary: Get a project public key that is used for SSH in post-merge pipelines
tags:
- tenant
/api/tenant/{tenant}/semaphores:
diff --git a/web/src/App.jsx b/web/src/App.jsx
index 6a5c6e010..9a0caf551 100644
--- a/web/src/App.jsx
+++ b/web/src/App.jsx
@@ -32,6 +32,8 @@ import {
ButtonVariant,
Dropdown,
DropdownItem,
+ DropdownToggle,
+ DropdownSeparator,
KebabToggle,
Modal,
Nav,
@@ -54,6 +56,7 @@ import {
import {
BellIcon,
BookIcon,
+ ChevronDownIcon,
CodeIcon,
ServiceIcon,
UsersIcon,
@@ -67,6 +70,7 @@ import ConfigModal from './containers/config/Config'
import logo from './images/logo.svg'
import { clearNotification } from './actions/notifications'
import { fetchConfigErrorsAction, clearConfigErrorsAction } from './actions/configErrors'
+import { fetchTenantsIfNeeded } from './actions/tenants'
import { routes } from './routes'
import { setTenantAction } from './actions/tenant'
import { configureAuthFromTenant, configureAuthFromInfo } from './actions/auth'
@@ -81,6 +85,7 @@ class App extends React.Component {
configErrorsReady: PropTypes.bool,
info: PropTypes.object,
tenant: PropTypes.object,
+ tenants: PropTypes.object,
timezone: PropTypes.string,
location: PropTypes.object,
history: PropTypes.object,
@@ -93,6 +98,7 @@ class App extends React.Component {
state = {
showErrors: false,
+ isTenantDropdownOpen: false,
}
renderMenu() {
@@ -199,6 +205,7 @@ class App extends React.Component {
} else if (!info.tenant) {
// Multi tenant, look for tenant name in url
whiteLabel = false
+ this.props.dispatch(fetchTenantsIfNeeded())
const match = matchPath(
this.props.location.pathname, { path: '/t/:tenant' })
@@ -368,6 +375,91 @@ class App extends React.Component {
)
}
+ renderTenantDropdown() {
+ const { tenant, tenants } = this.props
+ const { isTenantDropdownOpen } = this.state
+
+ if (tenant.whiteLabel) {
+ return (
+ <PageHeaderToolsItem>
+ <strong>Tenant</strong> {tenant.name}
+ </PageHeaderToolsItem>
+ )
+ } else {
+ const tenantLink = (_tenant) => {
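+ // Link to the equivalent page in the target tenant; unknown
+ // pages fall back to the tenant status page.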
+ const currentPath = this.props.location.pathname
+ let suffix
+ switch (currentPath) {
+ case '/t/' + tenant.name + '/projects':
+ suffix = '/projects'
+ break
+ case '/t/' + tenant.name + '/jobs':
+ suffix = '/jobs'
+ break
+ case '/t/' + tenant.name + '/labels':
+ suffix = '/labels'
+ break
+ case '/t/' + tenant.name + '/nodes':
+ suffix = '/nodes'
+ break
+ case '/t/' + tenant.name + '/autoholds':
+ suffix = '/autoholds'
+ break
+ case '/t/' + tenant.name + '/builds':
+ suffix = '/builds'
+ break
+ case '/t/' + tenant.name + '/buildsets':
+ suffix = '/buildsets'
+ break
+ case '/t/' + tenant.name + '/status':
+ default:
+ // all other paths point to tenant-specific resources that would most likely result in a 404
+ suffix = '/status'
+ break
+ }
+ return <Link to={'/t/' + _tenant.name + suffix}>{_tenant.name}</Link>
+ }
+
+ const options = tenants.tenants.filter(
+ (_tenant) => (_tenant.name !== tenant.name)
+ ).map(
+ (_tenant, idx) => {
+ return (
+ <DropdownItem key={'tenant-dropdown-' + idx} component={tenantLink(_tenant)} />
+ )
+ })
+ options.push(
+ <DropdownSeparator key="tenant-dropdown-separator" />,
+ <DropdownItem
+ key="tenant-dropdown-tenants_page"
+ component={<Link to={tenant.defaultRoute}>Go to tenants page</Link>} />
+ )
+
+ return (tenants.isFetching ?
+ <PageHeaderToolsItem>
+ Loading tenants ...
+ </PageHeaderToolsItem> :
+ <>
+ <PageHeaderToolsItem>
+ <Dropdown
+ isOpen={isTenantDropdownOpen}
+ toggle={
+ <DropdownToggle
+ className={`zuul-menu-dropdown-toggle${isTenantDropdownOpen ? '-expanded' : ''}`}
+ id="tenant-dropdown-toggle-id"
+ onToggle={(isOpen) => { this.setState({ isTenantDropdownOpen: isOpen }) }}
+ toggleIndicator={ChevronDownIcon}
+ >
+ <strong>Tenant</strong> {tenant.name}
+ </DropdownToggle>}
+ onSelect={() => { this.setState({ isTenantDropdownOpen: !isTenantDropdownOpen }) }}
+ dropdownItems={options}
+ />
+ </PageHeaderToolsItem>
+ </>)
+ }
+ }
+
render() {
const { isKebabDropdownOpen } = this.state
const { notifications, configErrors, tenant, info, auth } = this.props
@@ -406,7 +498,7 @@ class App extends React.Component {
key="tenant"
onClick={event => this.handleTenantLink(event)}
>
- <UsersIcon /> Tenant
+ <UsersIcon /> Tenants
</DropdownItem>
)
}
@@ -445,15 +537,7 @@ class App extends React.Component {
</Button>
</a>
</PageHeaderToolsItem>
- {tenant.name && (
- <PageHeaderToolsItem>
- <Link to={tenant.defaultRoute}>
- <Button variant={ButtonVariant.plain}>
- <strong>Tenant</strong> {tenant.name}
- </Button>
- </Link>
- </PageHeaderToolsItem>
- )}
+ {tenant.name && (this.renderTenantDropdown())}
</PageHeaderToolsGroup>
<PageHeaderToolsGroup>
{/* this kebab dropdown replaces the icon buttons and is hidden for
@@ -521,6 +605,7 @@ export default withRouter(connect(
configErrorsReady: state.configErrors.ready,
info: state.info,
tenant: state.tenant,
+ tenants: state.tenants,
timezone: state.timezone,
user: state.user,
auth: state.auth,
diff --git a/web/src/App.test.jsx b/web/src/App.test.jsx
index a1d0234d9..519980c7a 100644
--- a/web/src/App.test.jsx
+++ b/web/src/App.test.jsx
@@ -135,8 +135,9 @@ it('renders single tenant', async () => {
// Link should be white-label scoped
const topMenuLinks = application.root.findAllByType(Link)
expect(topMenuLinks[0].props.to).toEqual('/status')
- expect(topMenuLinks[3].props.to.pathname).toEqual('/status')
- expect(topMenuLinks[4].props.to.pathname).toEqual('/projects')
+ expect(topMenuLinks[1].props.to).toEqual('/openapi')
+ expect(topMenuLinks[2].props.to.pathname).toEqual('/status')
+ expect(topMenuLinks[3].props.to.pathname).toEqual('/projects')
// Location should be /status
expect(location.pathname).toEqual('/status')
// Info should tell white label tenant openstack
diff --git a/web/src/containers/timezone/SelectTz.jsx b/web/src/containers/timezone/SelectTz.jsx
index aaa585336..576645f6c 100644
--- a/web/src/containers/timezone/SelectTz.jsx
+++ b/web/src/containers/timezone/SelectTz.jsx
@@ -12,9 +12,9 @@
import PropTypes from 'prop-types'
import React from 'react'
-import Select from 'react-select'
+import Select, { components } from 'react-select'
import moment from 'moment-timezone'
-import { OutlinedClockIcon } from '@patternfly/react-icons'
+import { OutlinedClockIcon, ChevronDownIcon } from '@patternfly/react-icons'
import { connect } from 'react-redux'
import { setTimezoneAction } from '../../actions/timezone'
@@ -58,7 +58,7 @@ class SelectTz extends React.Component {
}
render() {
- const textColor = '#d1d1d1'
+ const textColor = '#fff'
const containerStyles= {
border: 'solid #2b2b2b',
borderWidth: '0 0 0 1px',
@@ -83,7 +83,11 @@ class SelectTz extends React.Component {
}),
dropdownIndicator:(provided) => ({
...provided,
- padding: '3px'
+ color: '#fff',
+ padding: '3px',
+ ':hover': {
+ color: '#fff'
+ }
}),
indicatorSeparator: () => {},
menu: (provided) => ({
@@ -93,12 +97,22 @@ class SelectTz extends React.Component {
top: '22px',
})
}
+
+ const DropdownIndicator = (props) => {
+ return (
+ <components.DropdownIndicator {...props}>
+ <ChevronDownIcon />
+ </components.DropdownIndicator>
+ )
+ }
+
return (
<div style={containerStyles}>
<OutlinedClockIcon/>
<Select
className="zuul-select-tz"
styles={customStyles}
+ components={{ DropdownIndicator }}
value={this.state.currentValue}
onChange={this.handleChange}
options={this.state.availableTz}
diff --git a/web/src/index.css b/web/src/index.css
index 6fec50911..587804cfa 100644
--- a/web/src/index.css
+++ b/web/src/index.css
@@ -66,6 +66,21 @@ a.refresh {
font-weight: bold;
}
+.zuul-menu-dropdown-toggle:before {
+ content: none !important;
+}
+
+.zuul-menu-dropdown-toggle:hover {
+ border-bottom: none;
+}
+
+.zuul-menu-dropdown-toggle-expanded:before {
+ border-left: none;
+ border-right: none;
+ border-top: none;
+ border-bottom: none;
+}
+
/* Remove ugly outline when a Switch is selected */
.pf-c-switch {
--pf-c-switch__input--focus__toggle--OutlineWidth: 0;
diff --git a/zuul/ansible/logconfig.py b/zuul/ansible/logconfig.py
index 66881336a..2d7c37463 100644
--- a/zuul/ansible/logconfig.py
+++ b/zuul/ansible/logconfig.py
@@ -140,7 +140,8 @@ def _read_config_file(filename: str):
raise ValueError("Unable to read logging config file at %s" % filename)
if os.path.splitext(filename)[1] in ('.yml', '.yaml', '.json'):
- return yaml.safe_load(open(filename, 'r'))
+ with open(filename, 'r') as f:
+ return yaml.safe_load(f)
return filename
diff --git a/zuul/cmd/__init__.py b/zuul/cmd/__init__.py
index 6df796bca..9ce342502 100755
--- a/zuul/cmd/__init__.py
+++ b/zuul/cmd/__init__.py
@@ -105,8 +105,8 @@ class ZuulApp(object):
self.commands = {}
def _get_version(self):
- from zuul.version import version_info as zuul_version_info
- return "Zuul version: %s" % zuul_version_info.release_string()
+ from zuul.version import release_string
+ return "Zuul version: %s" % release_string
def createParser(self):
parser = argparse.ArgumentParser(
@@ -226,12 +226,12 @@ class ZuulDaemonApp(ZuulApp, metaclass=abc.ABCMeta):
def setup_logging(self, section, parameter):
super(ZuulDaemonApp, self).setup_logging(section, parameter)
- from zuul.version import version_info as zuul_version_info
+ from zuul.version import release_string
log = logging.getLogger(
"zuul.{section}".format(section=section.title()))
log.debug(
"Configured logging: {version}".format(
- version=zuul_version_info.release_string()))
+ version=release_string))
def main(self):
self.parseArguments()
diff --git a/zuul/configloader.py b/zuul/configloader.py
index 67d4494c4..fe22fe0f8 100644
--- a/zuul/configloader.py
+++ b/zuul/configloader.py
@@ -437,6 +437,30 @@ def ansible_vars_dict(value):
ansible_var_name(key)
+def copy_safe_config(conf):
+ """Return a deep copy of a config dictionary.
+
+ This lets us assign values of a config dictionary to configuration
+ objects, even if those values are nested dictionaries. This way
+ we can safely freeze the configuration object (the process of
+ which mutates dictionaries) without mutating the original
+ configuration.
+
+    However, it retains the original context information as a
+    single shared object, since some behaviors (e.g., pragma)
+    rely on mutating the source context.
+
+ """
+ ret = copy.deepcopy(conf)
+ for key in (
+ '_source_context',
+ '_start_mark',
+ ):
+ if key in conf:
+ ret[key] = conf[key]
+ return ret
+
+
class PragmaParser(object):
pragma = {
'implied-branch-matchers': bool,
@@ -452,6 +476,7 @@ class PragmaParser(object):
self.pcontext = pcontext
def fromYaml(self, conf):
+ conf = copy_safe_config(conf)
self.schema(conf)
bm = conf.get('implied-branch-matchers')
@@ -512,6 +537,7 @@ class NodeSetParser(object):
return vs.Schema(nodeset)
def fromYaml(self, conf, anonymous=False):
+ conf = copy_safe_config(conf)
if anonymous:
self.anon_schema(conf)
self.anonymous = True
@@ -599,6 +625,7 @@ class SecretParser(object):
return vs.Schema(secret)
def fromYaml(self, conf):
+ conf = copy_safe_config(conf)
self.schema(conf)
s = model.Secret(conf['name'], conf['_source_context'])
s.source_context = conf['_source_context']
@@ -723,6 +750,7 @@ class JobParser(object):
def fromYaml(self, conf, project_pipeline=False, name=None,
validate=True):
+ conf = copy_safe_config(conf)
if validate:
self.schema(conf)
@@ -1075,6 +1103,7 @@ class ProjectTemplateParser(object):
return vs.Schema(project)
def fromYaml(self, conf, validate=True, freeze=True):
+ conf = copy_safe_config(conf)
if validate:
self.schema(conf)
source_context = conf['_source_context']
@@ -1165,6 +1194,7 @@ class ProjectParser(object):
return vs.Schema(project)
def fromYaml(self, conf):
+ conf = copy_safe_config(conf)
self.schema(conf)
project_name = conf.get('name')
@@ -1292,6 +1322,7 @@ class PipelineParser(object):
pipeline = {vs.Required('name'): str,
vs.Required('manager'): manager,
+ 'allow-other-connections': bool,
'precedence': precedence,
'supercedes': to_list(str),
'description': str,
@@ -1327,10 +1358,13 @@ class PipelineParser(object):
return vs.Schema(pipeline)
def fromYaml(self, conf):
+ conf = copy_safe_config(conf)
self.schema(conf)
pipeline = model.Pipeline(conf['name'], self.pcontext.tenant)
pipeline.source_context = conf['_source_context']
pipeline.start_mark = conf['_start_mark']
+ pipeline.allow_other_connections = conf.get(
+ 'allow-other-connections', True)
pipeline.description = conf.get('description')
pipeline.supercedes = as_list(conf.get('supercedes', []))
@@ -1366,6 +1400,7 @@ class PipelineParser(object):
# Make a copy to manipulate for backwards compat.
conf_copy = conf.copy()
+ seen_connections = set()
for conf_key, action in self.reporter_actions.items():
reporter_set = []
allowed_reporters = self.pcontext.tenant.allowed_reporters
@@ -1379,6 +1414,7 @@ class PipelineParser(object):
reporter_name, pipeline, params)
reporter.setAction(conf_key)
reporter_set.append(reporter)
+ seen_connections.add(reporter_name)
setattr(pipeline, action, reporter_set)
# If merge-conflict actions aren't explicit, use the failure actions
@@ -1423,11 +1459,13 @@ class PipelineParser(object):
source = self.pcontext.connections.getSource(source_name)
manager.ref_filters.extend(
source.getRequireFilters(require_config))
+ seen_connections.add(source_name)
for source_name, reject_config in conf.get('reject', {}).items():
source = self.pcontext.connections.getSource(source_name)
manager.ref_filters.extend(
source.getRejectFilters(reject_config))
+ seen_connections.add(source_name)
for connection_name, trigger_config in conf.get('trigger').items():
if self.pcontext.tenant.allowed_triggers is not None and \
@@ -1439,7 +1477,9 @@ class PipelineParser(object):
manager.event_filters.extend(
trigger.getEventFilters(connection_name,
conf['trigger'][connection_name]))
+ seen_connections.add(connection_name)
+ pipeline.connections = list(seen_connections)
# Pipelines don't get frozen
return pipeline
@@ -1460,6 +1500,7 @@ class SemaphoreParser(object):
return vs.Schema(semaphore)
def fromYaml(self, conf):
+ conf = copy_safe_config(conf)
self.schema(conf)
semaphore = model.Semaphore(conf['name'], conf.get('max', 1))
semaphore.source_context = conf.get('_source_context')
@@ -1485,6 +1526,7 @@ class QueueParser:
return vs.Schema(queue)
def fromYaml(self, conf):
+ conf = copy_safe_config(conf)
self.schema(conf)
queue = model.Queue(
conf['name'],
@@ -1514,6 +1556,7 @@ class AuthorizationRuleParser(object):
return vs.Schema(authRule)
def fromYaml(self, conf):
+ conf = copy_safe_config(conf)
self.schema(conf)
a = model.AuthZRuleTree(conf['name'])
@@ -1547,6 +1590,7 @@ class GlobalSemaphoreParser(object):
return vs.Schema(semaphore)
def fromYaml(self, conf):
+ conf = copy_safe_config(conf)
self.schema(conf)
semaphore = model.Semaphore(conf['name'], conf.get('max', 1),
global_scope=True)
@@ -1567,6 +1611,7 @@ class ApiRootParser(object):
return vs.Schema(api_root)
def fromYaml(self, conf):
+ conf = copy_safe_config(conf)
self.schema(conf)
api_root = model.ApiRoot(conf.get('authentication-realm'))
api_root.access_rules = conf.get('access-rules', [])
@@ -1688,7 +1733,7 @@ class TenantParser(object):
'disallowed-labels': to_list(str),
'allow-circular-dependencies': bool,
'default-parent': str,
- 'default-ansible-version': vs.Any(str, float),
+ 'default-ansible-version': vs.Any(str, float, int),
'access-rules': to_list(str),
'admin-rules': to_list(str),
'semaphores': to_list(str),
@@ -1761,8 +1806,10 @@ class TenantParser(object):
for branch_future in as_completed(branch_futures.keys()):
tpc = branch_futures[branch_future]
- source_context = model.ProjectContext(
- tpc.project.canonical_name, tpc.project.name)
+ trusted, _ = tenant.getProject(tpc.project.canonical_name)
+ source_context = model.SourceContext(
+ tpc.project.canonical_name, tpc.project.name,
+ tpc.project.connection_name, None, None, trusted)
with project_configuration_exceptions(source_context,
loading_errors):
self._getProjectBranches(tenant, tpc, branch_cache_min_ltimes)
@@ -1771,8 +1818,8 @@ class TenantParser(object):
# Set default ansible version
default_ansible_version = conf.get('default-ansible-version')
if default_ansible_version is not None:
- # The ansible version can be interpreted as float by yaml so make
- # sure it's a string.
+ # The ansible version can be interpreted as float or int
+ # by yaml so make sure it's a string.
default_ansible_version = str(default_ansible_version)
ansible_manager.requestVersion(default_ansible_version)
else:
@@ -1858,6 +1905,9 @@ class TenantParser(object):
tpc.branches = static_branches
tpc.dynamic_branches = always_dynamic_branches
+ tpc.merge_modes = tpc.project.source.getProjectMergeModes(
+ tpc.project, tenant, min_ltime)
+
def _loadProjectKeys(self, connection_name, project):
project.private_secrets_key, project.public_secrets_key = (
self.keystorage.getProjectSecretsKeys(
@@ -2209,6 +2259,12 @@ class TenantParser(object):
job.source_context.branch)
with self.unparsed_config_cache.writeLock(
job.source_context.project_canonical_name):
+ # Prevent files cache ltime from going backward
+ if files_cache.ltime >= job.ltime:
+ self.log.info(
+ "Discarding job %s result since the files cache was "
+ "updated in the meantime", job)
+ continue
# Since the cat job returns all required config files
# for ALL tenants the project is a part of, we can
# clear the whole cache and then populate it with the
@@ -2577,11 +2633,23 @@ class TenantParser(object):
# Set a merge mode if we don't have one for this project.
# This can happen if there are only regex project stanzas
# but no specific project stanzas.
+ (trusted, project) = tenant.getProject(project_name)
project_metadata = layout.getProjectMetadata(project_name)
if project_metadata.merge_mode is None:
- (trusted, project) = tenant.getProject(project_name)
mode = project.source.getProjectDefaultMergeMode(project)
project_metadata.merge_mode = model.MERGER_MAP[mode]
+ tpc = tenant.project_configs[project.canonical_name]
+ if tpc.merge_modes is not None:
+ source_context = model.SourceContext(
+ project.canonical_name, project.name,
+ project.connection_name, None, None, trusted)
+ with project_configuration_exceptions(source_context,
+ layout.loading_errors):
+ if project_metadata.merge_mode not in tpc.merge_modes:
+ mode = model.get_merge_mode_name(
+ project_metadata.merge_mode)
+ raise Exception(f'Merge mode {mode} not supported '
+ f'by project {project_name}')
def _parseLayout(self, tenant, data, loading_errors, layout_uuid=None):
# Don't call this method from dynamic reconfiguration because
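The schema change above also accepts an int for default-ansible-version because YAML parses unquoted version numbers as plain numbers; a minimal sketch of that coercion (plain PyYAML assumed, outside of Zuul):

```python
# Unquoted YAML scalars arrive as float/int, which is why the loader
# normalizes default-ansible-version back to a string with str().
import yaml

for raw in ("default-ansible-version: 2.9",
            "default-ansible-version: 6",
            "default-ansible-version: '6'"):
    value = yaml.safe_load(raw)["default-ansible-version"]
    print(type(value).__name__, str(value))
# float 2.9 / int 6 / str 6 -- str() yields a uniform value in each case
```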
diff --git a/zuul/connection/__init__.py b/zuul/connection/__init__.py
index 53be27d38..3d4ceec55 100644
--- a/zuul/connection/__init__.py
+++ b/zuul/connection/__init__.py
@@ -15,10 +15,8 @@
import abc
import logging
-from typing import List, Optional
-
from zuul.lib.logutil import get_annotated_logger
-from zuul.model import Project
+from zuul import model
class ReadOnlyBranchCacheError(RuntimeError):
@@ -143,8 +141,8 @@ class ZKBranchCacheMixin:
read_only = False
@abc.abstractmethod
- def isBranchProtected(self, project_name: str, branch_name: str,
- zuul_event_id) -> Optional[bool]:
+ def isBranchProtected(self, project_name, branch_name,
+ zuul_event_id):
"""Return if the branch is protected or None if the branch is unknown.
:param str project_name:
@@ -157,9 +155,35 @@ class ZKBranchCacheMixin:
pass
@abc.abstractmethod
- def _fetchProjectBranches(self, project: Project,
- exclude_unprotected: bool) -> List[str]:
- pass
+ def _fetchProjectBranches(self, project, exclude_unprotected):
+ """Perform a remote query to determine the project's branches.
+
+ Connection subclasses should implement this method.
+
+ :param model.Project project:
+ The project.
+ :param bool exclude_unprotected:
+ Whether the query should exclude unprotected branches from
+ the response.
+
+ :returns: A list of branch names.
+ """
+
+ def _fetchProjectMergeModes(self, project):
+ """Perform a remote query to determine the project's supported merge
+ modes.
+
+ Connection subclasses should implement this method if they are
+ able to determine which merge modes apply for a project. The
+        default implementation returns all merge modes as valid.
+
+ :param model.Project project:
+ The project.
+
+ :returns: A list of merge modes as model IDs.
+
+ """
+ return model.ALL_MERGE_MODES
def clearConnectionCacheOnBranchEvent(self, event):
"""Update event and clear connection cache if needed.
@@ -214,8 +238,7 @@ class ZKBranchCacheMixin:
# Update them if we have them
if protected_branches is not None:
- protected_branches = self._fetchProjectBranches(
- project, True)
+ protected_branches = self._fetchProjectBranches(project, True)
self._branch_cache.setProjectBranches(
project.name, True, protected_branches)
@@ -223,6 +246,10 @@ class ZKBranchCacheMixin:
all_branches = self._fetchProjectBranches(project, False)
self._branch_cache.setProjectBranches(
project.name, False, all_branches)
+
+ merge_modes = self._fetchProjectMergeModes(project)
+ self._branch_cache.setProjectMergeModes(
+ project.name, merge_modes)
self.log.info("Got branches for %s" % project.name)
def getProjectBranches(self, project, tenant, min_ltime=-1):
@@ -282,6 +309,62 @@ class ZKBranchCacheMixin:
return sorted(branches)
+ def getProjectMergeModes(self, project, tenant, min_ltime=-1):
+ """Get the merge modes for the given project.
+
+ :param zuul.model.Project project:
+ The project for which the merge modes are returned.
+ :param zuul.model.Tenant tenant:
+ The related tenant.
+ :param int min_ltime:
+ The minimum ltime to determine if we need to refresh the cache.
+
+ :returns: The list of merge modes by model id.
+ """
+ merge_modes = None
+
+ if self._branch_cache:
+ try:
+ merge_modes = self._branch_cache.getProjectMergeModes(
+ project.name, min_ltime)
+ except LookupError:
+ if self.read_only:
+ # A scheduler hasn't attempted to fetch them yet
+ raise ReadOnlyBranchCacheError(
+ "Will not fetch merge modes as read-only is set")
+ else:
+ merge_modes = None
+
+ if merge_modes is not None:
+ return merge_modes
+ elif self.read_only:
+ # A scheduler has previously attempted a fetch, but got
+                    # None back due to an error; we can't retry since we're
+ # read-only.
+ raise RuntimeError(
+ "Will not fetch merge_modes as read-only is set")
+
+ # We need to perform a query
+ try:
+ merge_modes = self._fetchProjectMergeModes(project)
+ except Exception:
+ # We weren't able to get the merge modes. We need to tell
+ # future schedulers to try again but tell zuul-web that we
+ # tried and failed. Set the merge modes to None to indicate
+ # that we have performed a fetch and retrieved no data. Any
+ # time we encounter None in the cache, we will try again.
+ if self._branch_cache:
+ self._branch_cache.setProjectMergeModes(
+ project.name, None)
+ raise
+ self.log.info("Got merge modes for %s" % project.name)
+
+ if self._branch_cache:
+ self._branch_cache.setProjectMergeModes(
+ project.name, merge_modes)
+
+ return merge_modes
+
def checkBranchCache(self, project_name: str, event,
protected: bool = None) -> None:
"""Clear the cache for a project when a branch event is processed
diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py
index 0a1f0ee61..276365e1d 100644
--- a/zuul/driver/gerrit/gerritconnection.py
+++ b/zuul/driver/gerrit/gerritconnection.py
@@ -1643,7 +1643,10 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
def getInfoRefs(self, project: Project) -> Dict[str, str]:
try:
- data = self._uploadPack(project)
+ # Encode the UTF-8 data back to a byte array, as the size of
+ # each record in the pack is in bytes, and so the slicing must
+            # also be done on a byte basis.
+ data = self._uploadPack(project).encode("utf-8")
except Exception:
self.log.error("Cannot get references from %s" % project)
raise # keeps error information
@@ -1662,7 +1665,9 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
plen -= 4
if len(data) - i < plen:
raise Exception("Invalid data in info/refs")
- line = data[i:i + plen]
+ # Once the pack data is sliced, we can safely decode it back
+ # into a (UTF-8) string.
+ line = data[i:i + plen].decode("utf-8")
i += plen
if not read_advertisement:
read_advertisement = True
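For context, the info/refs payload sliced above uses git's pkt-line framing: each record starts with a four-character hex length that includes the length field itself, so offsets must be computed over bytes rather than decoded text. A standalone sketch of that framing (error handling omitted):

```python
# Parse pkt-line records from a raw info/refs byte payload.
def parse_pkt_lines(data: bytes):
    records = []
    i = 0
    while i < len(data):
        plen = int(data[i:i + 4], 16)
        i += 4
        if plen == 0:          # "0000" flush packet
            continue
        plen -= 4              # the length field counts itself
        records.append(data[i:i + plen].decode("utf-8"))
        i += plen
    return records
```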
diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py
index a2a55b050..182c83bae 100644
--- a/zuul/driver/github/githubconnection.py
+++ b/zuul/driver/github/githubconnection.py
@@ -48,6 +48,7 @@ from zuul.driver.github.graphql import GraphQLClient
from zuul.lib import tracing
from zuul.web.handler import BaseWebController
from zuul.lib.logutil import get_annotated_logger
+from zuul import model
from zuul.model import Ref, Branch, Tag, Project
from zuul.exceptions import MergeFailure
from zuul.driver.github.githubmodel import PullRequest, GithubTriggerEvent
@@ -64,6 +65,12 @@ GITHUB_BASE_URL = 'https://api.github.com'
PREVIEW_JSON_ACCEPT = 'application/vnd.github.machine-man-preview+json'
PREVIEW_DRAFT_ACCEPT = 'application/vnd.github.shadow-cat-preview+json'
PREVIEW_CHECKS_ACCEPT = 'application/vnd.github.antiope-preview+json'
+ALL_MERGE_MODES = [
+ model.MERGER_MERGE,
+ model.MERGER_MERGE_RESOLVE,
+ model.MERGER_SQUASH_MERGE,
+ model.MERGER_REBASE,
+]
# NOTE (felix): Using log levels for file comments / annotations is IMHO more
# convenient than the values Github expects. Having in mind that those comments
@@ -956,8 +963,9 @@ class GithubClientManager:
if 'cache-control' in response.headers:
del response.headers['cache-control']
+ self._cache = DictCache()
self.cache_adapter = cachecontrol.CacheControlAdapter(
- DictCache(),
+ self._cache,
cache_etags=True,
heuristic=NoAgeHeuristic())
@@ -1453,6 +1461,7 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
log.debug("Change %s is currently being updated, "
"waiting for it to finish", change)
with lock:
+ change = self._change_cache.get(change_key)
log.debug('Finished updating change %s', change)
return change
@@ -1776,6 +1785,42 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
return branches
+ def _fetchProjectMergeModes(self, project):
+ github = self.getGithubClient(project.name)
+ url = github.session.build_url('repos', project.name)
+ headers = {'Accept': 'application/vnd.github.loki-preview+json'}
+ merge_modes = []
+
+ # GitHub API bug: if the allow_* attributes below are changed,
+ # the ETag is not updated, meaning that once we cache the repo
+ # URL, we'll never update it. To avoid this, clear this URL
+ # from the cache before performing the request.
+ self._github_client_manager._cache.data.pop(url, None)
+
+ resp = github.session.get(url, headers=headers)
+
+ if resp.status_code == 403:
+ self.log.error(str(resp))
+ rate_limit = github.rate_limit()
+ if rate_limit['resources']['core']['remaining'] == 0:
+ self.log.warning(
+ "Rate limit exceeded, using full merge method list")
+ return ALL_MERGE_MODES
+ elif resp.status_code == 404:
+ raise Exception("Got status code 404 when fetching "
+ "project %s" % project.name)
+
+ resp = resp.json()
+ if resp.get('allow_merge_commit'):
+ merge_modes.append(model.MERGER_MERGE)
+ merge_modes.append(model.MERGER_MERGE_RESOLVE)
+ if resp.get('allow_squash_merge'):
+ merge_modes.append(model.MERGER_SQUASH_MERGE)
+ if resp.get('allow_rebase_merge'):
+ merge_modes.append(model.MERGER_REBASE)
+
+ return merge_modes
+
def isBranchProtected(self, project_name: str, branch_name: str,
zuul_event_id=None) -> Optional[bool]:
github = self.getGithubClient(
diff --git a/zuul/driver/github/githubsource.py b/zuul/driver/github/githubsource.py
index bdc373f79..0a94a1730 100644
--- a/zuul/driver/github/githubsource.py
+++ b/zuul/driver/github/githubsource.py
@@ -135,6 +135,9 @@ class GithubSource(BaseSource):
def getProjectBranches(self, project, tenant, min_ltime=-1):
return self.connection.getProjectBranches(project, tenant, min_ltime)
+ def getProjectMergeModes(self, project, tenant, min_ltime=-1):
+ return self.connection.getProjectMergeModes(project, tenant, min_ltime)
+
def getProjectBranchCacheLtime(self):
return self.connection._branch_cache.ltime
diff --git a/zuul/driver/gitlab/gitlabconnection.py b/zuul/driver/gitlab/gitlabconnection.py
index 0d56744e4..da423f085 100644
--- a/zuul/driver/gitlab/gitlabconnection.py
+++ b/zuul/driver/gitlab/gitlabconnection.py
@@ -633,17 +633,12 @@ class GitlabConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
return change
project = self.source.getProject(change_key.project_name)
if not change:
- if not event:
- self.log.error("Change %s not found in cache and no event",
- change_key)
- if event:
- url = event.change_url
change = MergeRequest(project.name)
change.project = project
change.number = number
            # patch_number is the tip commit SHA of the MR
change.patchset = change_key.revision
- change.url = url or self.getMRUrl(project.name, number)
+ change.url = self.getMRUrl(project.name, number)
change.uris = [change.url.split('://', 1)[-1]] # remove scheme
log.debug("Getting change mr#%s from project %s" % (
diff --git a/zuul/driver/mqtt/mqttreporter.py b/zuul/driver/mqtt/mqttreporter.py
index 5c95a19ea..8c3905e35 100644
--- a/zuul/driver/mqtt/mqttreporter.py
+++ b/zuul/driver/mqtt/mqttreporter.py
@@ -79,7 +79,8 @@ class MQTTReporter(BaseReporter):
'result': result,
'dependencies': [j.name for j in job.dependencies],
'artifacts': get_artifacts_from_result_data(
- build.result_data, logger=log)
+ build.result_data, logger=log),
+ 'events': [e.toDict() for e in build.events],
})
if include_returned_data:
rdata = build.result_data.copy()
diff --git a/zuul/driver/sql/alembic/env.py b/zuul/driver/sql/alembic/env.py
index da7b3207f..17b67805e 100644
--- a/zuul/driver/sql/alembic/env.py
+++ b/zuul/driver/sql/alembic/env.py
@@ -53,7 +53,8 @@ def run_migrations_online():
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix='sqlalchemy.',
- poolclass=pool.NullPool)
+ poolclass=pool.NullPool,
+ future=True)
# we can get the table prefix via the tag object
tag = context.get_tag_argument()
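The future=True flag added above opts the Alembic engine into SQLAlchemy 2.0-style behavior, which is also why the migration scripts below now wrap raw SQL in sa.text(). A minimal illustration outside of Alembic:

```python
# With a 2.0-style engine, raw SQL strings must be wrapped in text()
# before execution.
import sqlalchemy as sa

engine = sa.create_engine("sqlite://", future=True)
with engine.begin() as conn:
    conn.execute(sa.text("CREATE TABLE demo (id INTEGER PRIMARY KEY)"))
    conn.execute(sa.text("INSERT INTO demo (id) VALUES (1)"))
    print(conn.execute(sa.text("SELECT count(*) FROM demo")).scalar())
```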
diff --git a/zuul/driver/sql/alembic/versions/0ed5def089e2_add_build_event_table.py b/zuul/driver/sql/alembic/versions/0ed5def089e2_add_build_event_table.py
new file mode 100644
index 000000000..94892038c
--- /dev/null
+++ b/zuul/driver/sql/alembic/versions/0ed5def089e2_add_build_event_table.py
@@ -0,0 +1,49 @@
+# Copyright 2022 BMW Group
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""add_build_event_table
+
+Revision ID: 0ed5def089e2
+Revises: c7467b642498
+Create Date: 2022-12-12 12:08:20.882790
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '0ed5def089e2'
+down_revision = 'c7467b642498'
+branch_labels = None
+depends_on = None
+
+from alembic import op
+import sqlalchemy as sa
+
+BUILD_EVENT_TABLE = "zuul_build_event"
+BUILD_TABLE = "zuul_build"
+
+
+def upgrade(table_prefix=''):
+ op.create_table(
+ table_prefix + BUILD_EVENT_TABLE,
+ sa.Column("id", sa.Integer, primary_key=True),
+ sa.Column("build_id", sa.Integer,
+ sa.ForeignKey(table_prefix + BUILD_TABLE + ".id")),
+ sa.Column("event_time", sa.DateTime),
+ sa.Column("event_type", sa.String(255)),
+ sa.Column("description", sa.TEXT()),
+ )
+
+
+def downgrade():
+ raise Exception("Downgrades not supported")
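Once the migration has run, the new table joins back to builds via its foreign key; a quick sketch (table names assume an empty table_prefix, and the database URL is a placeholder):

```python
# List build events together with the build they belong to.
import sqlalchemy as sa

engine = sa.create_engine("sqlite:///zuul.db", future=True)  # placeholder
with engine.connect() as conn:
    rows = conn.execute(sa.text(
        "SELECT e.build_id, e.event_type, e.event_time "
        "FROM zuul_build_event e "
        "JOIN zuul_build b ON e.build_id = b.id "
        "ORDER BY e.event_time"))
    for row in rows:
        print(row.build_id, row.event_type, row.event_time)
```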
diff --git a/zuul/driver/sql/alembic/versions/60c119eb1e3f_use_build_set_results.py b/zuul/driver/sql/alembic/versions/60c119eb1e3f_use_build_set_results.py
index 67581a6f9..1735d35f3 100644
--- a/zuul/driver/sql/alembic/versions/60c119eb1e3f_use_build_set_results.py
+++ b/zuul/driver/sql/alembic/versions/60c119eb1e3f_use_build_set_results.py
@@ -24,13 +24,16 @@ def upgrade(table_prefix=''):
connection = op.get_bind()
connection.execute(
- """
- UPDATE {buildset_table}
- SET result=(
- SELECT CASE score
- WHEN 1 THEN 'SUCCESS'
- ELSE 'FAILURE' END)
- """.format(buildset_table=table_prefix + BUILDSET_TABLE))
+ sa.text(
+ """
+ UPDATE {buildset_table}
+ SET result=(
+ SELECT CASE score
+ WHEN 1 THEN 'SUCCESS'
+ ELSE 'FAILURE' END)
+ """.format(buildset_table=table_prefix + BUILDSET_TABLE)
+ )
+ )
op.drop_column(table_prefix + BUILDSET_TABLE, 'score')
diff --git a/zuul/driver/sql/alembic/versions/c7467b642498_buildset_updated.py b/zuul/driver/sql/alembic/versions/c7467b642498_buildset_updated.py
index abfba7247..99d12d750 100644
--- a/zuul/driver/sql/alembic/versions/c7467b642498_buildset_updated.py
+++ b/zuul/driver/sql/alembic/versions/c7467b642498_buildset_updated.py
@@ -34,13 +34,16 @@ def upgrade(table_prefix=''):
connection = op.get_bind()
connection.execute(
- """
- UPDATE {buildset_table}
- SET updated=greatest(
- coalesce(first_build_start_time, '1970-01-01 00:00:00'),
- coalesce(last_build_end_time, '1970-01-01 00:00:00'),
- coalesce(event_timestamp, '1970-01-01 00:00:00'))
- """.format(buildset_table=table_prefix + "zuul_buildset"))
+ sa.text(
+ """
+ UPDATE {buildset_table}
+ SET updated=greatest(
+ coalesce(first_build_start_time, '1970-01-01 00:00:00'),
+ coalesce(last_build_end_time, '1970-01-01 00:00:00'),
+ coalesce(event_timestamp, '1970-01-01 00:00:00'))
+ """.format(buildset_table=table_prefix + "zuul_buildset")
+ )
+ )
def downgrade():
diff --git a/zuul/driver/sql/sqlconnection.py b/zuul/driver/sql/sqlconnection.py
index 8ab528c39..2d5c39ec3 100644
--- a/zuul/driver/sql/sqlconnection.py
+++ b/zuul/driver/sql/sqlconnection.py
@@ -29,6 +29,7 @@ from zuul.zk.locks import CONNECTION_LOCK_ROOT, locked, SessionAwareLock
BUILDSET_TABLE = 'zuul_buildset'
BUILD_TABLE = 'zuul_build'
+BUILD_EVENTS_TABLE = 'zuul_build_event'
ARTIFACT_TABLE = 'zuul_artifact'
PROVIDES_TABLE = 'zuul_provides'
@@ -307,27 +308,31 @@ class SQLConnection(BaseConnection):
def _migrate(self, revision='head'):
"""Perform the alembic migrations for this connection"""
+ # Note that this method needs to be called with an external lock held.
+ # The reason for this is we retrieve the alembic version and run the
+        # alembic migrations in different database transactions, which opens
+ # us to races without an external lock.
with self.engine.begin() as conn:
context = alembic.migration.MigrationContext.configure(conn)
current_rev = context.get_current_revision()
- self.log.debug('Current migration revision: %s' % current_rev)
-
- config = alembic.config.Config()
- config.set_main_option("script_location",
- "zuul:driver/sql/alembic")
- config.set_main_option("sqlalchemy.url",
- self.connection_config.get('dburi').
- replace('%', '%%'))
-
- # Alembic lets us add arbitrary data in the tag argument. We can
- # leverage that to tell the upgrade scripts about the table prefix.
- tag = {'table_prefix': self.table_prefix}
-
- if current_rev is None and not self.force_migrations:
- self.metadata.create_all(self.engine)
- alembic.command.stamp(config, revision, tag=tag)
- else:
- alembic.command.upgrade(config, revision, tag=tag)
+ self.log.debug('Current migration revision: %s' % current_rev)
+
+ config = alembic.config.Config()
+ config.set_main_option("script_location",
+ "zuul:driver/sql/alembic")
+ config.set_main_option("sqlalchemy.url",
+ self.connection_config.get('dburi').
+ replace('%', '%%'))
+
+ # Alembic lets us add arbitrary data in the tag argument. We can
+ # leverage that to tell the upgrade scripts about the table prefix.
+ tag = {'table_prefix': self.table_prefix}
+
+ if current_rev is None and not self.force_migrations:
+ self.metadata.create_all(self.engine)
+ alembic.command.stamp(config, revision, tag=tag)
+ else:
+ alembic.command.upgrade(config, revision, tag=tag)
def onLoad(self, zk_client, component_registry=None):
safe_connection = quote_plus(self.connection_name)
@@ -446,6 +451,15 @@ class SQLConnection(BaseConnection):
session.flush()
return p
+ def createBuildEvent(self, *args, **kw):
+ session = orm.session.Session.object_session(self)
+ e = BuildEventModel(*args, **kw)
+ e.build_id = self.id
+ self.build_events.append(e)
+ session.add(e)
+ session.flush()
+ return e
+
class ArtifactModel(Base):
__tablename__ = self.table_prefix + ARTIFACT_TABLE
id = sa.Column(sa.Integer, primary_key=True)
@@ -464,6 +478,19 @@ class SQLConnection(BaseConnection):
name = sa.Column(sa.String(255))
build = orm.relationship(BuildModel, backref="provides")
+ class BuildEventModel(Base):
+ __tablename__ = self.table_prefix + BUILD_EVENTS_TABLE
+ id = sa.Column(sa.Integer, primary_key=True)
+ build_id = sa.Column(sa.Integer, sa.ForeignKey(
+ self.table_prefix + BUILD_TABLE + ".id"))
+ event_time = sa.Column(sa.DateTime)
+ event_type = sa.Column(sa.String(255))
+ description = sa.Column(sa.TEXT())
+ build = orm.relationship(BuildModel, backref="build_events")
+
+ self.buildEventModel = BuildEventModel
+ self.zuul_build_event_table = self.buildEventModel.__table__
+
self.providesModel = ProvidesModel
self.zuul_provides_table = self.providesModel.__table__
diff --git a/zuul/driver/sql/sqlreporter.py b/zuul/driver/sql/sqlreporter.py
index d16f50fcb..4d4970f35 100644
--- a/zuul/driver/sql/sqlreporter.py
+++ b/zuul/driver/sql/sqlreporter.py
@@ -163,6 +163,17 @@ class SQLReporter(BaseReporter):
artifact['metadata'] = json.dumps(
artifact['metadata'])
db_build.createArtifact(**artifact)
+
+ for event in build.events:
+                        # Reformat the event_time so it's compatible with SQL.
+                        # Don't update the event object in place; only adjust
+                        # the generated dict representation so we don't alter
+                        # the data structure for other reporters.
+ ev = event.toDict()
+ ev["event_time"] = datetime.datetime.fromtimestamp(
+ event.event_time, tz=datetime.timezone.utc)
+ db_build.createBuildEvent(**ev)
+
return db_build
except sqlalchemy.exc.DBAPIError:
if retry_count < self.retry_count - 1:
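The event_time conversion above in isolation: build events carry a Unix timestamp (float), while the SQL column is a DateTime, so the reporter converts to an aware UTC datetime before insertion. A minimal sketch with an example value:

```python
import datetime

event_time = 1670846900.88279           # example epoch value
sql_value = datetime.datetime.fromtimestamp(
    event_time, tz=datetime.timezone.utc)
print(sql_value.isoformat())            # 2022-12-12T12:08:20.882790+00:00
```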
diff --git a/zuul/driver/timer/__init__.py b/zuul/driver/timer/__init__.py
index 37fdcf580..dcc013788 100644
--- a/zuul/driver/timer/__init__.py
+++ b/zuul/driver/timer/__init__.py
@@ -226,6 +226,7 @@ class TimerDriver(Driver, TriggerInterface):
event.branch = branch
event.zuul_event_id = str(uuid4().hex)
event.timestamp = time.time()
+ event.arrived_at_scheduler_timestamp = event.timestamp
# Refresh the branch in order to update the item in the
# change cache.
change_key = project.source.getChangeKey(event)
@@ -234,7 +235,9 @@ class TimerDriver(Driver, TriggerInterface):
event=event)
log = get_annotated_logger(self.log, event)
log.debug("Adding event")
- self.sched.addTriggerEvent(self.name, event)
+ self.sched.pipeline_trigger_events[tenant.name][
+ pipeline_name
+ ].put(self.name, event)
except Exception:
self.log.exception("Error dispatching timer event for "
"tenant %s project %s branch %s",
diff --git a/zuul/executor/client.py b/zuul/executor/client.py
index 9aa38cbca..41e5e6916 100644
--- a/zuul/executor/client.py
+++ b/zuul/executor/client.py
@@ -118,6 +118,11 @@ class ExecutorClient(object):
# Store the NodeRequest ID in the job arguments, so we can look it up
# on the executor side to lock the nodes.
req_id = build.build_set.getJobNodeRequestID(job.name)
+ if isinstance(req_id, dict):
+ # This should never happen
+ raise Exception(
+ "Attempt to start build with deduplicated node request ID "
+ f"{req_id}")
if req_id:
params["noderequest_id"] = req_id
diff --git a/zuul/executor/common.py b/zuul/executor/common.py
index ff4522d22..b8393903e 100644
--- a/zuul/executor/common.py
+++ b/zuul/executor/common.py
@@ -65,6 +65,7 @@ def construct_build_params(uuid, connections, job, item, pipeline,
zuul_params['patchset'] = str(item.change.patchset)
if hasattr(item.change, 'message'):
zuul_params['message'] = strings.b64encode(item.change.message)
+ zuul_params['change_message'] = item.change.message
if (hasattr(item.change, 'oldrev') and item.change.oldrev
and item.change.oldrev != '0' * 40):
zuul_params['oldrev'] = item.change.oldrev
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index c3737b5cc..a49bbbbbf 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -14,6 +14,7 @@
# under the License.
import collections
+import copy
import datetime
import json
import logging
@@ -1049,7 +1050,7 @@ class AnsibleJob(object):
# The same, but frozen
self.frozen_hostvars = {}
# The zuul.* vars
- self.zuul_vars = {}
+ self.debug_zuul_vars = {}
self.waiting_for_semaphores = False
def run(self):
@@ -1888,7 +1889,8 @@ class AnsibleJob(object):
logfile=json_output))
return
try:
- output = json.load(open(json_output, 'r'))
+ with open(json_output, 'r') as f:
+ output = json.load(f)
last_playbook = output[-1]
# Transform json to yaml - because it's easier to read and given
# the size of the data it'll be extra-hard to read this as an
@@ -2332,7 +2334,8 @@ class AnsibleJob(object):
def prepareKubeConfig(self, jobdir, data):
kube_cfg_path = jobdir.kubeconfig
if os.path.exists(kube_cfg_path):
- kube_cfg = yaml.safe_load(open(kube_cfg_path))
+ with open(kube_cfg_path) as f:
+ kube_cfg = yaml.safe_load(f)
else:
kube_cfg = {
'apiVersion': 'v1',
@@ -2495,10 +2498,18 @@ class AnsibleJob(object):
if ri.role_path is not None],
))
+        # The zuul vars in the debug inventory.yaml file should not
+        # have any !unsafe tags, so save a copy of them now, before we
+        # update the execution version below.
+ self.debug_zuul_vars = copy.deepcopy(zuul_vars)
+ if 'change_message' in zuul_vars:
+ zuul_vars['change_message'] = yaml.mark_strings_unsafe(
+ zuul_vars['change_message'])
+
with open(self.jobdir.zuul_vars, 'w') as zuul_vars_yaml:
zuul_vars_yaml.write(
- yaml.safe_dump({'zuul': zuul_vars}, default_flow_style=False))
- self.zuul_vars = zuul_vars
+ yaml.ansible_unsafe_dump({'zuul': zuul_vars},
+ default_flow_style=False))
# Squash all and extra vars into localhost (it's not
# explicitly listed).
@@ -2552,7 +2563,7 @@ class AnsibleJob(object):
inventory = make_inventory_dict(
self.host_list, self.nodeset, self.original_hostvars)
- inventory['all']['vars']['zuul'] = self.zuul_vars
+ inventory['all']['vars']['zuul'] = self.debug_zuul_vars
with open(self.jobdir.inventory, 'w') as inventory_yaml:
inventory_yaml.write(
yaml.ansible_unsafe_dump(
@@ -3481,6 +3492,8 @@ class ExecutorServer(BaseMergeServer):
self.statsd.gauge(base_key + '.load_average', 0)
self.statsd.gauge(base_key + '.pct_used_ram', 0)
self.statsd.gauge(base_key + '.running_builds', 0)
+ self.statsd.close()
+ self.statsd = None
# Use the BaseMergeServer's stop method to disconnect from
# ZooKeeper. We do this as one of the last steps to ensure
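A hedged sketch of the !unsafe handling added above: commit messages may contain Jinja, so the copy written to the execution vars file is tagged !unsafe to keep Ansible from templating it, while the debug inventory keeps the plain value. The yaml helpers are Zuul's own wrappers; the module path below is an assumption.

```python
# Assumes zuul.lib.yamlutil provides the wrappers referenced in the diff.
from zuul.lib import yamlutil as yaml

zuul_vars = {"change_message": "Oops: {{ lookup('env', 'HOME') }}"}
zuul_vars["change_message"] = yaml.mark_strings_unsafe(
    zuul_vars["change_message"])
print(yaml.ansible_unsafe_dump({"zuul": zuul_vars},
                               default_flow_style=False))
# The dumped value carries an !unsafe tag, so Ansible loads the commit
# message as a literal string instead of evaluating the lookup.
```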
diff --git a/zuul/lib/encryption.py b/zuul/lib/encryption.py
index 79e92e366..fd637b278 100644
--- a/zuul/lib/encryption.py
+++ b/zuul/lib/encryption.py
@@ -20,18 +20,6 @@ from cryptography.hazmat.primitives import hashes
from functools import lru_cache
-# OpenSSL 3.0.0 performs key validation in a very slow manner. Since
-# our keys are internally generated and securely stored, we can skip
-# validation. See https://github.com/pyca/cryptography/issues/7236
-backend = default_backend()
-if hasattr(backend, '_rsa_skip_check_key'):
- backend._rsa_skip_check_key = True
-else:
- import logging
- logging.warning("Cryptography backend lacks _rsa_skip_check_key flag, "
- "key loading may be slow")
-
-
# https://cryptography.io/en/latest/hazmat/primitives/asymmetric/rsa/#generation
def generate_rsa_keypair():
"""Generate an RSA keypair.
@@ -42,7 +30,7 @@ def generate_rsa_keypair():
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=4096,
- backend=backend,
+ backend=default_backend(),
)
public_key = private_key.public_key()
return (private_key, public_key)
@@ -110,7 +98,8 @@ def deserialize_rsa_keypair(data, password=None):
private_key = serialization.load_pem_private_key(
data,
password=password,
- backend=backend,
+ backend=default_backend(),
+ unsafe_skip_rsa_key_validation=True,
)
public_key = private_key.public_key()
return (private_key, public_key)
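For context, the change above replaces the private _rsa_skip_check_key workaround with the public flag on load_pem_private_key. A minimal sketch, assuming a cryptography release new enough to support unsafe_skip_rsa_key_validation:

```python
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization


def load_private_key(pem_data: bytes, password: bytes = None):
    # Skip the slow RSA key validation; Zuul generates and stores
    # these keys itself, so they are trusted.
    return serialization.load_pem_private_key(
        pem_data,
        password=password,
        backend=default_backend(),
        unsafe_skip_rsa_key_validation=True,
    )
```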
diff --git a/zuul/lib/fingergw.py b/zuul/lib/fingergw.py
index ad945c1b7..184c9762d 100644
--- a/zuul/lib/fingergw.py
+++ b/zuul/lib/fingergw.py
@@ -47,6 +47,18 @@ class RequestHandler(streamer_utils.BaseFingerRequestHandler):
self.fingergw = kwargs.pop('fingergw')
super(RequestHandler, self).__init__(*args, **kwargs)
+ def _readSocket(self, sock, build_uuid):
+ # timeout only on the connection, let recv() wait forever
+ sock.settimeout(None)
+ msg = "%s\n" % build_uuid # Must have a trailing newline!
+ sock.sendall(msg.encode('utf-8'))
+ while True:
+ data = sock.recv(1024)
+ if data:
+ self.request.sendall(data)
+ else:
+ break
+
def _fingerClient(self, server, port, build_uuid, use_ssl):
'''
Open a finger connection and return all streaming results.
@@ -59,24 +71,16 @@ class RequestHandler(streamer_utils.BaseFingerRequestHandler):
'''
with socket.create_connection((server, port), timeout=10) as s:
if use_ssl:
- context = ssl.SSLContext(ssl.PROTOCOL_TLS)
+ context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.verify_mode = ssl.CERT_REQUIRED
context.check_hostname = self.fingergw.tls_verify_hostnames
context.load_cert_chain(self.fingergw.tls_cert,
self.fingergw.tls_key)
context.load_verify_locations(self.fingergw.tls_ca)
- s = context.wrap_socket(s, server_hostname=server)
-
- # timeout only on the connection, let recv() wait forever
- s.settimeout(None)
- msg = "%s\n" % build_uuid # Must have a trailing newline!
- s.sendall(msg.encode('utf-8'))
- while True:
- data = s.recv(1024)
- if data:
- self.request.sendall(data)
- else:
- break
+ with context.wrap_socket(s, server_hostname=server) as s:
+ self._readSocket(s, build_uuid)
+ else:
+ self._readSocket(s, build_uuid)
def handle(self):
'''
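The protocol the refactored _readSocket() speaks is simple enough to show standalone: send the build UUID followed by a newline, then read until the remote side closes the stream. A plain (non-TLS) client sketch:

```python
import socket


def stream_build_log(host, port, build_uuid):
    with socket.create_connection((host, port), timeout=10) as sock:
        sock.settimeout(None)                   # block forever on recv()
        sock.sendall(f"{build_uuid}\n".encode("utf-8"))
        while True:
            data = sock.recv(1024)
            if not data:
                break
            yield data
```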
diff --git a/zuul/lib/repl.py b/zuul/lib/repl.py
index ecefae9ea..63a800406 100644
--- a/zuul/lib/repl.py
+++ b/zuul/lib/repl.py
@@ -26,14 +26,14 @@ class ThreadLocalProxy(object):
self.default = default
def __getattr__(self, name):
- obj = self.files.get(threading.currentThread(), self.default)
+ obj = self.files.get(threading.current_thread(), self.default)
return getattr(obj, name)
def register(self, obj):
- self.files[threading.currentThread()] = obj
+ self.files[threading.current_thread()] = obj
def unregister(self):
- self.files.pop(threading.currentThread())
+ self.files.pop(threading.current_thread())
class REPLHandler(socketserver.StreamRequestHandler):
diff --git a/zuul/lib/streamer_utils.py b/zuul/lib/streamer_utils.py
index 04de4b8cb..a50fb4142 100644
--- a/zuul/lib/streamer_utils.py
+++ b/zuul/lib/streamer_utils.py
@@ -168,7 +168,7 @@ class CustomThreadingTCPServer(socketserver.ThreadingTCPServer):
if all([self.server_ssl_key, self.server_ssl_cert,
self.server_ssl_ca]):
- context = ssl.SSLContext(ssl.PROTOCOL_TLS)
+ context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
context.load_cert_chain(self.server_ssl_cert, self.server_ssl_key)
context.load_verify_locations(self.server_ssl_ca)
context.verify_mode = ssl.CERT_REQUIRED
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 65fa58d5b..e87e553d3 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -28,6 +28,8 @@ from zuul.model import (
)
from zuul.zk.change_cache import ChangeKey
from zuul.zk.components import COMPONENT_REGISTRY
+from zuul.zk.exceptions import LockException
+from zuul.zk.locks import pipeline_lock
from opentelemetry import trace
@@ -95,21 +97,46 @@ class PipelineManager(metaclass=ABCMeta):
def _postConfig(self):
layout = self.pipeline.tenant.layout
self.buildChangeQueues(layout)
- with self.sched.createZKContext(None, self.log) as ctx,\
- self.currentContext(ctx):
- # Make sure we have state and change list objects, and
- # ensure that they exist in ZK. We don't hold the
- # pipeline lock, but if they don't exist, that means they
- # are new, so no one else will either, so the write on
- # create is okay. If they do exist and we have an old
- # object, we'll just reuse it. If it does exist and we
- # don't have an old object, we'll get a new empty one.
- # Regardless, these will not automatically refresh now, so
- # they will be out of date until they are refreshed later.
- self.pipeline.state = PipelineState.create(
- self.pipeline, layout.uuid, self.pipeline.state)
- self.pipeline.change_list = PipelineChangeList.create(
- self.pipeline)
+ # Make sure we have state and change list objects. We
+ # don't actually ensure they exist in ZK here; these are
+ # just local objects until they are serialized the first
+ # time. Since we don't hold the pipeline lock, we can't
+ # reliably perform any read or write operations; we just
+ # need to ensure we have in-memory objects to work with
+ # and they will be initialized or loaded on the next
+ # refresh.
+
+ # These will be out of date until they are refreshed later.
+ self.pipeline.state = PipelineState.create(
+ self.pipeline, self.pipeline.state)
+ self.pipeline.change_list = PipelineChangeList.create(
+ self.pipeline)
+
+ # Now, try to acquire a non-blocking pipeline lock and refresh
+ # them for the side effect of initializing them if necessary.
+ # In the case of a new pipeline, no one else should have a
+ # lock anyway, and this helps us avoid emitting a whole bunch
+ # of errors elsewhere on startup when these objects don't
+ # exist. If the pipeline already exists and we can't acquire
+ # the lock, that's fine, we're much less likely to encounter
+ # read errors elsewhere in that case anyway.
+ try:
+ with pipeline_lock(
+ self.sched.zk_client, self.pipeline.tenant.name,
+ self.pipeline.name, blocking=False) as lock,\
+ self.sched.createZKContext(lock, self.log) as ctx,\
+ self.currentContext(ctx):
+ if not self.pipeline.state.exists(ctx):
+ # We only do this if the pipeline doesn't exist in
+ # ZK because in that case, this process should be
+ # fast since it's empty. If it does exist,
+ # refreshing it may be slow and since other actors
+ # won't encounter errors due to its absence, we
+ # would rather defer the work to later.
+ self.pipeline.state.refresh(ctx)
+ self.pipeline.change_list.refresh(ctx)
+ except LockException:
+ pass
def buildChangeQueues(self, layout):
self.log.debug("Building relative_priority queues")
@@ -216,7 +243,7 @@ class PipelineManager(metaclass=ABCMeta):
and self.useDependenciesByTopic(change.project))
if (update_commit_dependencies
or update_topic_dependencies):
- self.updateCommitDependencies(change, None, event=None)
+ self.updateCommitDependencies(change, event=None)
self._change_cache[change.cache_key] = change
resolved_changes.append(change)
return resolved_changes
@@ -258,11 +285,18 @@ class PipelineManager(metaclass=ABCMeta):
return True
return False
- def isAnyVersionOfChangeInPipeline(self, change):
- # Checks any items in the pipeline
+ def isChangeRelevantToPipeline(self, change):
+ # Checks if any version of the change or its deps matches any
+ # item in the pipeline.
for change_key in self.pipeline.change_list.getChangeKeys():
if change.cache_stat.key.isSameChange(change_key):
return True
+ if isinstance(change, model.Change):
+ for dep_change_ref in change.getNeedsChanges(
+ self.useDependenciesByTopic(change.project)):
+ dep_change_key = ChangeKey.fromReference(dep_change_ref)
+ if change.cache_stat.key.isSameChange(dep_change_key):
+ return True
return False
def isChangeAlreadyInQueue(self, change, change_queue):
@@ -276,19 +310,19 @@ class PipelineManager(metaclass=ABCMeta):
if not isinstance(change, model.Change):
return
- change_in_pipeline = False
+ to_refresh = set()
for item in self.pipeline.getAllItems():
if not isinstance(item.change, model.Change):
continue
+ if item.change.equals(change):
+ to_refresh.add(item.change)
for dep_change_ref in item.change.commit_needs_changes:
- if item.change.equals(change):
- change_in_pipeline = True
dep_change_key = ChangeKey.fromReference(dep_change_ref)
if dep_change_key.isSameChange(change.cache_stat.key):
- self.updateCommitDependencies(item.change, None, event)
+ to_refresh.add(item.change)
- if change_in_pipeline:
- self.updateCommitDependencies(change, None, event)
+ for existing_change in to_refresh:
+ self.updateCommitDependencies(existing_change, event)
def reportEnqueue(self, item):
if not self.pipeline.state.disabled:
@@ -489,7 +523,8 @@ class PipelineManager(metaclass=ABCMeta):
def addChange(self, change, event, quiet=False, enqueue_time=None,
ignore_requirements=False, live=True,
- change_queue=None, history=None, dependency_graph=None):
+ change_queue=None, history=None, dependency_graph=None,
+ skip_presence_check=False):
log = get_annotated_logger(self.log, event)
log.debug("Considering adding change %s" % change)
@@ -504,10 +539,18 @@ class PipelineManager(metaclass=ABCMeta):
# If we are adding a live change, check if it's a live item
# anywhere in the pipeline. Otherwise, we will perform the
# duplicate check below on the specific change_queue.
- if live and self.isChangeAlreadyInPipeline(change):
+ if (live and
+ self.isChangeAlreadyInPipeline(change) and
+ not skip_presence_check):
log.debug("Change %s is already in pipeline, ignoring" % change)
return True
+ if ((not self.pipeline.allow_other_connections) and
+ (change.project.connection_name not in self.pipeline.connections)):
+ log.debug("Change %s is not from a connection known to %s ",
+ change, self.pipeline)
+ return False
+
if not ignore_requirements:
for f in self.ref_filters:
if f.connection_name != change.project.connection_name:
@@ -531,7 +574,7 @@ class PipelineManager(metaclass=ABCMeta):
# to date and this is a noop; otherwise, we need to refresh
# them anyway.
if isinstance(change, model.Change):
- self.updateCommitDependencies(change, None, event)
+ self.updateCommitDependencies(change, event)
with self.getChangeQueue(change, event, change_queue) as change_queue:
if not change_queue:
@@ -557,8 +600,10 @@ class PipelineManager(metaclass=ABCMeta):
log.debug("History after enqueuing changes ahead: %s", history)
if self.isChangeAlreadyInQueue(change, change_queue):
- log.debug("Change %s is already in queue, ignoring" % change)
- return True
+ if not skip_presence_check:
+ log.debug("Change %s is already in queue, ignoring",
+ change)
+ return True
cycle = []
if isinstance(change, model.Change):
@@ -592,7 +637,7 @@ class PipelineManager(metaclass=ABCMeta):
if enqueue_time:
item.enqueue_time = enqueue_time
item.live = live
- self.reportStats(item, added=True)
+ self.reportStats(item, trigger_event=event)
item.quiet = quiet
if item.live:
@@ -824,40 +869,56 @@ class PipelineManager(metaclass=ABCMeta):
self.pipeline.tenant.name][other_pipeline.name].put(
event, needs_result=False)
- def updateCommitDependencies(self, change, change_queue, event):
+ def updateCommitDependencies(self, change, event):
log = get_annotated_logger(self.log, event)
- # Search for Depends-On headers and find appropriate changes
- log.debug(" Updating commit dependencies for %s", change)
- dependencies = []
- seen = set()
- for match in find_dependency_headers(change.message):
- log.debug(" Found Depends-On header: %s", match)
- if match in seen:
- continue
- seen.add(match)
- try:
- url = urllib.parse.urlparse(match)
- except ValueError:
- continue
- source = self.sched.connections.getSourceByHostname(
- url.hostname)
- if not source:
- continue
- log.debug(" Found source: %s", source)
- dep = source.getChangeByURLWithRetry(match, event)
- if dep and (not dep.is_merged) and dep not in dependencies:
- log.debug(" Adding dependency: %s", dep)
- dependencies.append(dep)
- new_commit_needs_changes = [d.cache_key for d in dependencies]
+ must_update_commit_deps = (
+ not hasattr(event, "zuul_event_ltime")
+ or change.commit_needs_changes is None
+ or change.cache_stat.mzxid <= event.zuul_event_ltime
+ )
- update_attrs = dict(commit_needs_changes=new_commit_needs_changes)
+ must_update_topic_deps = (
+ self.useDependenciesByTopic(change.project) and (
+ not hasattr(event, "zuul_event_ltime")
+ or change.topic_needs_changes is None
+ or change.cache_stat.mzxid <= event.zuul_event_ltime
+ )
+ )
+
+ update_attrs = {}
+ if must_update_commit_deps:
+ # Search for Depends-On headers and find appropriate changes
+ log.debug(" Updating commit dependencies for %s", change)
+ dependencies = []
+ seen = set()
+ for match in find_dependency_headers(change.message):
+ log.debug(" Found Depends-On header: %s", match)
+ if match in seen:
+ continue
+ seen.add(match)
+ try:
+ url = urllib.parse.urlparse(match)
+ except ValueError:
+ continue
+ source = self.sched.connections.getSourceByHostname(
+ url.hostname)
+ if not source:
+ continue
+ log.debug(" Found source: %s", source)
+ dep = source.getChangeByURLWithRetry(match, event)
+ if dep and (not dep.is_merged) and dep not in dependencies:
+ log.debug(" Adding dependency: %s", dep)
+ dependencies.append(dep)
+ new_commit_needs_changes = [d.cache_key for d in dependencies]
+
+ update_attrs = dict(commit_needs_changes=new_commit_needs_changes)
# Ask the source for any tenant-specific changes (this allows
# drivers to implement their own way of collecting deps):
source = self.sched.connections.getSource(
change.project.connection_name)
- if self.useDependenciesByTopic(change.project):
+ if must_update_topic_deps:
log.debug(" Updating topic dependencies for %s", change)
new_topic_needs_changes = []
for dep in source.getChangesByTopic(change.topic):
@@ -866,7 +927,8 @@ class PipelineManager(metaclass=ABCMeta):
new_topic_needs_changes.append(dep.cache_key)
update_attrs['topic_needs_changes'] = new_topic_needs_changes
- source.setChangeAttributes(change, **update_attrs)
+ if update_attrs:
+ source.setChangeAttributes(change, **update_attrs)
def provisionNodes(self, item):
log = item.annotateLogger(self.log)
@@ -1286,8 +1348,9 @@ class PipelineManager(metaclass=ABCMeta):
build_set = item.current_build_set
# if base_sha is not available, fallback to branch
- to_sha = getattr(item.change, "base_sha",
- getattr(item.change, "branch", None))
+ to_sha = getattr(item.change, "base_sha", None)
+ if to_sha is None:
+ to_sha = getattr(item.change, "branch", None)
self.sched.merger.getFilesChanges(
item.change.project.connection_name, item.change.project.name,
@@ -1503,6 +1566,7 @@ class PipelineManager(metaclass=ABCMeta):
log.info("Dequeuing change %s because "
"it can no longer merge" % item.change)
self.cancelJobs(item)
+ quiet_dequeue = False
if item.isBundleFailing():
item.setDequeuedBundleFailing('Bundle is failing')
elif not meets_reqs:
@@ -1514,7 +1578,28 @@ class PipelineManager(metaclass=ABCMeta):
else:
msg = f'Change {clist} is needed.'
item.setDequeuedNeedingChange(msg)
- if item.live:
+ # If all the dependencies are already in the pipeline
+ # (but not ahead of this change), then we probably
+ # just added updated versions of them, possibly
+ # updating a cycle. In that case, attempt to
+ # re-enqueue this change with the updated deps.
+ if (item.live and
+ all([self.isChangeAlreadyInPipeline(c)
+ for c in needs_changes])):
+                    # Try to re-enqueue; if that succeeds, keep this dequeue quiet
+ try:
+ log.info("Attempting re-enqueue of change %s",
+ item.change)
+ quiet_dequeue = self.addChange(
+ item.change, item.event,
+ enqueue_time=item.enqueue_time,
+ quiet=True,
+ skip_presence_check=True)
+ except Exception:
+ log.exception("Unable to re-enqueue change %s "
+ "which is missing dependencies",
+ item.change)
+ if item.live and not quiet_dequeue:
try:
self.reportItem(item)
except exceptions.MergeFailure:
@@ -1647,7 +1732,7 @@ class PipelineManager(metaclass=ABCMeta):
if (item.live and not dequeued
and self.sched.globals.use_relative_priority):
priority = item.getNodePriority()
- for request_id in item.current_build_set.node_requests.values():
+ for _, request_id in item.current_build_set.getNodeRequests():
node_request = self.sched.nodepool.zk_nodepool.getNodeRequest(
request_id, cached=True)
if not node_request:
@@ -1759,9 +1844,12 @@ class PipelineManager(metaclass=ABCMeta):
if all_completed:
self.sched.executor.resumeBuild(build)
- build.updateAttributes(
- build_set.item.pipeline.manager.current_context,
- paused=False)
+ with build.activeContext(self.current_context):
+ build.paused = False
+ build.addEvent(
+ model.BuildEvent(
+ event_time=time.time(),
+ event_type=model.BuildEvent.TYPE_RESUMED))
def _resetDependentBuilds(self, build_set, build):
job_graph = build_set.job_graph
@@ -2143,7 +2231,7 @@ class PipelineManager(metaclass=ABCMeta):
log.error("Reporting item %s received: %s", item, ret)
return action, (not ret)
- def reportStats(self, item, added=False):
+ def reportStats(self, item, trigger_event=None):
if not self.sched.statsd:
return
try:
@@ -2182,18 +2270,21 @@ class PipelineManager(metaclass=ABCMeta):
if dt:
self.sched.statsd.timing(key + '.resident_time', dt)
self.sched.statsd.incr(key + '.total_changes')
- if added and hasattr(item.event, 'arrived_at_scheduler_timestamp'):
+ if (
+ trigger_event
+ and hasattr(trigger_event, 'arrived_at_scheduler_timestamp')
+ ):
now = time.time()
- arrived = item.event.arrived_at_scheduler_timestamp
+ arrived = trigger_event.arrived_at_scheduler_timestamp
processing = (now - arrived) * 1000
- elapsed = (now - item.event.timestamp) * 1000
+ elapsed = (now - trigger_event.timestamp) * 1000
self.sched.statsd.timing(
basekey + '.event_enqueue_processing_time',
processing)
self.sched.statsd.timing(
basekey + '.event_enqueue_time', elapsed)
self.reportPipelineTiming('event_enqueue_time',
- item.event.timestamp)
+ trigger_event.timestamp)
except Exception:
self.log.exception("Exception reporting pipeline stats")
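For reference, the Depends-On handling in updateCommitDependencies() boils down to parsing each header value as a URL and mapping its hostname to a source connection; find_dependency_headers and getSourceByHostname are Zuul internals, so this sketch only illustrates the URL side:

```python
import urllib.parse

message = """Add feature X

Depends-On: https://review.example.com/c/other/project/+/12345
"""

for line in message.splitlines():
    if line.lower().startswith("depends-on:"):
        url = urllib.parse.urlparse(line.split(":", 1)[1].strip())
        print(url.hostname)   # "review.example.com" -> source lookup key
```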
diff --git a/zuul/manager/independent.py b/zuul/manager/independent.py
index aeeff5adc..ca52b37fe 100644
--- a/zuul/manager/independent.py
+++ b/zuul/manager/independent.py
@@ -61,13 +61,15 @@ class IndependentPipelineManager(PipelineManager):
for needed_change in needed_changes:
# This differs from the dependent pipeline by enqueuing
# changes ahead as "not live", that is, not intended to
- # have jobs run. Also, pipeline requirements are always
- # ignored (which is safe because the changes are not
- # live).
+ # have jobs run. Pipeline requirements are still in place
+ # in order to avoid unreviewed code being executed in
+ # pipelines that require review.
if needed_change not in history:
r = self.addChange(needed_change, event, quiet=True,
- ignore_requirements=True, live=False,
- change_queue=change_queue, history=history,
+ ignore_requirements=ignore_requirements,
+ live=False,
+ change_queue=change_queue,
+ history=history,
dependency_graph=dependency_graph)
if not r:
return False
diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py
index d400c253b..e4688a1b7 100644
--- a/zuul/merger/merger.py
+++ b/zuul/merger/merger.py
@@ -333,6 +333,26 @@ class Repo(object):
os.rmdir(root)
@staticmethod
+ def _cleanup_leaked_rebase_dirs(local_path, log, messages):
+ for rebase_dir in [".git/rebase-merge", ".git/rebase-apply"]:
+ leaked_dir = os.path.join(local_path, rebase_dir)
+ if not os.path.exists(leaked_dir):
+ continue
+ if log:
+ log.debug("Cleaning leaked %s dir", leaked_dir)
+ else:
+ messages.append(
+ f"Cleaning leaked {leaked_dir} dir")
+ try:
+ shutil.rmtree(leaked_dir)
+ except Exception as exc:
+ msg = f"Failed to remove leaked {leaked_dir} dir:"
+ if log:
+ log.exception(msg)
+ else:
+ messages.append(f"{msg}\n{exc}")
+
+ @staticmethod
def refNameToZuulRef(ref_name: str) -> str:
return "refs/zuul/{}".format(
hashlib.sha1(ref_name.encode("utf-8")).hexdigest()
@@ -384,6 +404,8 @@ class Repo(object):
messages.append("Delete stale Zuul ref {}".format(ref))
Repo._deleteRef(ref.path, repo)
+ Repo._cleanup_leaked_rebase_dirs(local_path, log, messages)
+
 # Note: Before git 2.13 deleting a ref foo/bar leaves an empty
# directory foo behind that will block creating the reference foo
# in the future. As a workaround we must clean up empty directories
@@ -615,7 +637,11 @@ class Repo(object):
self.fetch(ref, zuul_event_id=zuul_event_id)
log.debug("Rebasing %s with args %s", ref, args)
repo.git.checkout('FETCH_HEAD')
- repo.git.rebase(*args)
+ try:
+ repo.git.rebase(*args)
+ except Exception:
+ repo.git.rebase(abort=True)
+ raise
return repo.head.commit
def fetch(self, ref, zuul_event_id=None):
@@ -757,8 +783,19 @@ class Repo(object):
return
log = get_annotated_logger(self.log, zuul_event_id)
log.debug("Set remote url to %s", redact_url(url))
- self._git_set_remote_url(self.createRepoObject(zuul_event_id), url)
- self.remote_url = url
+ try:
+ # Update the remote URL as it is used for the clone if the
+ # repo doesn't exist.
+ self.remote_url = url
+ self._git_set_remote_url(
+ self.createRepoObject(zuul_event_id), self.remote_url)
+ except Exception:
+ # Clear out the stored remote URL so we will always set
+ # the Git URL after a failed attempt. This prevents us from
+ # using outdated credentials that might still be stored in
+ # the Git config as part of the URL.
+ self.remote_url = None
+ raise
def mapLine(self, commit, filename, lineno, zuul_event_id=None):
repo = self.createRepoObject(zuul_event_id)
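Editor's note: the merger hunks above make a failed rebase abort immediately (so the work tree is never left mid-rebase) and remove any .git/rebase-merge or .git/rebase-apply directories leaked by an interrupted process. A minimal standalone sketch of the same guard pattern, using GitPython as the merger does; the repository path and ref here are hypothetical:

    import os
    import shutil

    import git  # GitPython, the library the merger wraps

    def cleanup_leaked_rebase_dirs(repo_path):
        # A process killed mid-rebase leaves one of these directories
        # behind, and git then refuses to start another rebase.
        for rebase_dir in (".git/rebase-merge", ".git/rebase-apply"):
            leaked = os.path.join(repo_path, rebase_dir)
            if os.path.exists(leaked):
                shutil.rmtree(leaked, ignore_errors=True)

    def safe_rebase(repo_path, base_ref):
        # Abort on any failure so the repo stays usable for the next
        # merge operation, then re-raise for the caller.
        repo = git.Repo(repo_path)
        try:
            repo.git.rebase(base_ref)
        except Exception:
            repo.git.rebase(abort=True)
            raise
        return repo.head.commit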
diff --git a/zuul/model.py b/zuul/model.py
index e339b1ffa..e526b749c 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -64,6 +64,7 @@ MERGER_MAP = {
'squash-merge': MERGER_SQUASH_MERGE,
'rebase': MERGER_REBASE,
}
+ALL_MERGE_MODES = list(MERGER_MAP.values())
PRECEDENCE_NORMAL = 0
PRECEDENCE_LOW = 1
@@ -437,6 +438,8 @@ class Pipeline(object):
# reconfigured). A pipeline requires a tenant in order to
# reach the currently active layout for that tenant.
self.tenant = tenant
+ self.allow_other_connections = True
+ self.connections = []
self.source_context = None
self.start_mark = None
self.description = None
@@ -472,6 +475,8 @@ class Pipeline(object):
self.window_decrease_factor = None
self.state = None
self.change_list = None
+ # Only used by the unit tests for assertions
+ self._exception_count = 0
@property
def queues(self):
@@ -615,6 +620,18 @@ class PipelineState(zkobject.ZKObject):
_read_only=False,
)
+ def _lateInitData(self):
+ # If we're initializing the object on our initial refresh,
+ # reset the data to this.
+ return dict(
+ state=Pipeline.STATE_NORMAL,
+ queues=[],
+ old_queues=[],
+ consecutive_failures=0,
+ disabled=False,
+ layout_uuid=self.pipeline.tenant.layout.uuid,
+ )
+
@classmethod
def fromZK(klass, context, path, pipeline, **kw):
obj = klass()
@@ -626,21 +643,23 @@ class PipelineState(zkobject.ZKObject):
return obj
@classmethod
- def create(cls, pipeline, layout_uuid, old_state=None):
- # If the object does not exist in ZK, create it with the
- # default attributes and the supplied layout UUID. Otherwise,
- # return an initialized object (or the old object for reuse)
- # without loading any data so that data can be loaded on the
- # next refresh.
- ctx = pipeline.manager.current_context
+ def create(cls, pipeline, old_state=None):
+ # If we are resetting an existing pipeline, we will have an
+ # old_state, so just clean up the object references there and
+ # let the next refresh handle updating any data.
+ if old_state:
+ old_state._resetObjectRefs()
+ return old_state
+
+ # Otherwise, we are initializing a pipeline that we haven't
+ # seen before. It still might exist in ZK, but since we
+ # haven't seen it, we don't have any object references to
+ # clean up. We can just start with a clean object, set the
+ # pipeline reference, and let the next refresh deal with
+ # whether there might be any data in ZK.
state = cls()
state._set(pipeline=pipeline)
- if state.exists(ctx):
- if old_state:
- old_state._resetObjectRefs()
- return old_state
- return state
- return cls.new(ctx, pipeline=pipeline, layout_uuid=layout_uuid)
+ return state
def _resetObjectRefs(self):
# Update the pipeline references on the queue objects.
@@ -707,8 +726,34 @@ class PipelineState(zkobject.ZKObject):
# This is so that we can refresh the object in circumstances
# where we haven't verified that our local layout matches
# what's in ZK.
+
+ # Notably, this need not prevent us from performing the
+ # initialization below if necessary. The case of the object
+ # being brand new in ZK supersedes our worry that our old copy
+ # might be out of date since our old copy is, itself, brand
+ # new.
self._set(_read_only=read_only)
- return super().refresh(context)
+ try:
+ return super().refresh(context)
+ except NoNodeError:
+ # If the object doesn't exist we will receive a
+ # NoNodeError. This happens because the postConfig call
+ # creates this object without holding the pipeline lock,
+ # so it can't determine whether or not it exists in ZK.
+ # We do hold the pipeline lock here, so if we get this
+ # error, we know we're initializing the object, and we
+ # should write it to ZK.
+
+ # Note that typically this code is not used since
+ # currently other objects end up creating the pipeline
+ # path in ZK first. It is included in case that ever
+ # changes. Currently the empty byte-string code path in
+ # deserialize() is used instead.
+ context.log.warning("Initializing pipeline state for %s; "
+ "this is expected only for new pipelines",
+ self.pipeline.name)
+ self._set(**self._lateInitData())
+ self.internalCreate(context)
def deserialize(self, raw, context):
# We may have old change objects in the pipeline cache, so
@@ -716,6 +761,20 @@ class PipelineState(zkobject.ZKObject):
# source change cache.
self.pipeline.manager.clearCache()
+ # If the object doesn't exist we will get back an empty byte
+ # string. This happens because the postConfig call creates
+ # this object without holding the pipeline lock, so it can't
+ # determine whether or not it exists in ZK. We do hold the
+ # pipeline lock here, so if we get the empty byte string, we
+ # know we're initializing the object. In that case, we should
+ # initialize the layout id to the current layout. Nothing
+ # else needs to be set.
+ if raw == b'':
+ context.log.warning("Initializing pipeline state for %s; "
+ "this is expected only for new pipelines",
+ self.pipeline.name)
+ return self._lateInitData()
+
data = super().deserialize(raw, context)
if not self._read_only:
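Editor's note: the two additions above implement late initialization for PipelineState: postConfig creates the object without holding the pipeline lock, so it cannot know whether the ZNode exists; under the lock, a NoNodeError on refresh (or an empty byte string in deserialize) therefore means we are the first holder to see it and may safely write the defaults. A simplified sketch of the pattern with plain kazoo rather than Zuul's zkobject layer; the class and path are hypothetical:

    import json
    from kazoo.exceptions import NoNodeError

    class LazyInitState:
        # Hypothetical stand-in for a ZK-backed state object.
        def __init__(self, client, path):
            self.client = client
            self.path = path
            self.data = {}

        def _late_init_data(self):
            # Defaults used only when first initializing under the lock.
            return {"state": "normal", "queues": []}

        def refresh(self):
            try:
                raw, _ = self.client.get(self.path)
            except NoNodeError:
                # We hold the lock, so it is safe to create the node now.
                self.data = self._late_init_data()
                self.client.create(
                    self.path, json.dumps(self.data).encode("utf8"),
                    makepath=True)
                return
            if raw == b"":
                # Node exists but was never written; same situation,
                # the next save will persist the defaults.
                self.data = self._late_init_data()
                return
            self.data = json.loads(raw)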
@@ -890,11 +949,34 @@ class PipelineChangeList(zkobject.ShardedZKObject):
super().__init__()
self._set(
changes=[],
+ _change_keys=[],
)
- def refresh(self, context):
- self._retry(context, super().refresh,
- context, max_tries=5)
+ def refresh(self, context, allow_init=True):
+ # Set allow_init to false to indicate that we don't hold the
+ # lock and we should not try to initialize the object in ZK if
+ # it does not exist.
+ try:
+ self._retry(context, super().refresh,
+ context, max_tries=5)
+ except NoNodeError:
+ # If the object doesn't exist we will receive a
+ # NoNodeError. This happens because the postConfig call
+ # creates this object without holding the pipeline lock,
+ # so it can't determine whether or not it exists in ZK.
+ # We do hold the pipeline lock here, so if we get this
+ # error, we know we're initializing the object, and
+ # we should write it to ZK.
+ if allow_init:
+ context.log.warning(
+ "Initializing pipeline change list for %s; "
+ "this is expected only for new pipelines",
+ self.pipeline.name)
+ self.internalCreate(context)
+ else:
+ # If we're called from a context where we can't
+ # initialize the change list, re-raise the exception.
+ raise
def getPath(self):
return self.getChangeListPath(self.pipeline)
@@ -905,19 +987,14 @@ class PipelineChangeList(zkobject.ShardedZKObject):
return pipeline_path + '/change_list'
@classmethod
- def create(cls, pipeline, old_list=None):
- # If the object does not exist in ZK, create it with the
- # default attributes. Otherwise, return an initialized object
- # (or the old object for reuse) without loading any data so
- # that data can be loaded on the next refresh.
- ctx = pipeline.manager.current_context
+ def create(cls, pipeline):
+ # This object may or may not exist in ZK, but we aren't using any of
+ # that data here. We can just start with a clean object, set
+ # the pipeline reference, and let the next refresh deal with
+ # whether there might be any data in ZK.
change_list = cls()
change_list._set(pipeline=pipeline)
- if change_list.exists(ctx):
- if old_list:
- return old_list
- return change_list
- return cls.new(ctx, pipeline=pipeline)
+ return change_list
def serialize(self, context):
data = {
@@ -925,8 +1002,8 @@ class PipelineChangeList(zkobject.ShardedZKObject):
}
return json.dumps(data, sort_keys=True).encode("utf8")
- def deserialize(self, data, context):
- data = super().deserialize(data, context)
+ def deserialize(self, raw, context):
+ data = super().deserialize(raw, context)
change_keys = []
# We must have a dictionary with a 'changes' key; otherwise we
# may be reading immediately after truncating. Allow the
@@ -1805,24 +1882,6 @@ class FrozenSecret(ConfigObject):
)
-class ProjectContext(ConfigObject):
-
- def __init__(self, project_canonical_name, project_name):
- super().__init__()
- self.project_canonical_name = project_canonical_name
- self.project_name = project_name
- self.branch = None
- self.path = None
-
- def __str__(self):
- return self.project_name
-
- def toDict(self):
- return dict(
- project=self.project_name,
- )
-
-
class SourceContext(ConfigObject):
"""A reference to the branch of a project in configuration.
@@ -2391,6 +2450,12 @@ class FrozenJob(zkobject.ZKObject):
data['_' + job_data_key] = None
return data
+ def _save(self, context, *args, **kw):
+ # Before saving, update the buildset with the new job version
+ # so that future readers know to refresh it.
+ self.buildset.updateJobVersion(context, self)
+ return super()._save(context, *args, **kw)
+
def setWaitingStatus(self, status):
if self.waiting_status == status:
return
@@ -3720,6 +3785,27 @@ class BuildReference:
self._path = _path
+class BuildEvent:
+ TYPE_PAUSED = "paused"
+ TYPE_RESUMED = "resumed"
+
+ def __init__(self, event_time, event_type, description=None):
+ self.event_time = event_time
+ self.event_type = event_type
+ self.description = description
+
+ def toDict(self):
+ return {
+ "event_time": self.event_time,
+ "event_type": self.event_type,
+ "description": self.description,
+ }
+
+ @classmethod
+ def fromDict(cls, data):
+ return cls(data["event_time"], data["event_type"], data["description"])
+
+
class Build(zkobject.ZKObject):
"""A Build is an instance of a single execution of a Job.
@@ -3760,6 +3846,8 @@ class Build(zkobject.ZKObject):
zuul_event_id=None,
build_request_ref=None,
span_info=None,
+ # A list of build events like paused, resume, ...
+ events=[],
)
def serialize(self, context):
@@ -3779,6 +3867,7 @@ class Build(zkobject.ZKObject):
"zuul_event_id": self.zuul_event_id,
"build_request_ref": self.build_request_ref,
"span_info": self.span_info,
+ "events": [e.toDict() for e in self.events],
}
if COMPONENT_REGISTRY.model_api < 5:
data["_result_data"] = (self._result_data.getPath()
@@ -3801,6 +3890,11 @@ class Build(zkobject.ZKObject):
def deserialize(self, raw, context):
data = super().deserialize(raw, context)
+ # Deserialize build events
+ data["events"] = [
+ BuildEvent.fromDict(e) for e in data.get("events", [])
+ ]
+
# Result data can change (between a pause and build
# completion).
@@ -3844,6 +3938,12 @@ class Build(zkobject.ZKObject):
def getPath(self):
return f"{self.job.getPath()}/build/{self.uuid}"
+ def _save(self, context, *args, **kw):
+ # Before saving, update the buildset with the new job version
+ # so that future readers know to refresh it.
+ self.job.buildset.updateBuildVersion(context, self)
+ return super()._save(context, *args, **kw)
+
def __repr__(self):
return ('<Build %s of %s voting:%s>' %
(self.uuid, self.job.name, self.job.voting))
@@ -3875,6 +3975,12 @@ class Build(zkobject.ZKObject):
data=secret_result_data,
_path=self.getPath() + '/secret_result_data')
+ def addEvent(self, event):
+ if not self._active_context:
+ raise Exception(
+ "addEvent must be used with a context manager")
+ self.events.append(event)
+
@property
def failed(self):
if self.result and self.result not in ['SUCCESS', 'SKIPPED']:
@@ -4048,6 +4154,8 @@ class BuildSet(zkobject.ZKObject):
job_graph=None,
jobs={},
deduplicated_jobs=[],
+ job_versions={},
+ build_versions={},
# Cached job graph of previous layout; not serialized
_old_job_graph=None,
_old_jobs={},
@@ -4159,6 +4267,8 @@ class BuildSet(zkobject.ZKObject):
"configured_time": self.configured_time,
"start_time": self.start_time,
"repo_state_request_time": self.repo_state_request_time,
+ "job_versions": self.job_versions,
+ "build_versions": self.build_versions,
# jobs (serialize as separate objects)
}
return json.dumps(data, sort_keys=True).encode("utf8")
@@ -4256,7 +4366,8 @@ class BuildSet(zkobject.ZKObject):
if job_name in self.jobs:
job = self.jobs[job_name]
- if not old_build_exists:
+ if ((not old_build_exists) or
+ self.shouldRefreshJob(job)):
tpe_jobs.append((None, job_name,
tpe.submit(job.refresh, context)))
else:
@@ -4268,7 +4379,8 @@ class BuildSet(zkobject.ZKObject):
build = self.builds.get(job_name)
builds[job_name] = build
if build and build.getPath() == build_path:
- if not build.result:
+ if ((not build.result) or
+ self.shouldRefreshBuild(build)):
tpe_jobs.append((
None, job_name, tpe.submit(
build.refresh, context)))
@@ -4323,6 +4435,48 @@ class BuildSet(zkobject.ZKObject):
})
return data
+ def updateBuildVersion(self, context, build):
+ # It's tempting to update versions regardless of the model
+ # API, but if we start writing versions before all components
+ # are upgraded we could get out of sync.
+ if (COMPONENT_REGISTRY.model_api < 12):
+ return True
+
+ # It is common for a lot of builds/jobs to be added at once,
+ # so to avoid writing this buildset object repeatedly during
+ # that time, we only update the version after the initial
+ # creation.
+ version = build.getZKVersion()
+ # If zstat is None, we created the object
+ if version is not None:
+ self.build_versions[build.uuid] = version + 1
+ self.updateAttributes(context, build_versions=self.build_versions)
+
+ def updateJobVersion(self, context, job):
+ if (COMPONENT_REGISTRY.model_api < 12):
+ return True
+
+ version = job.getZKVersion()
+ if version is not None:
+ self.job_versions[job.name] = version + 1
+ self.updateAttributes(context, job_versions=self.job_versions)
+
+ def shouldRefreshBuild(self, build):
+ # Unless all schedulers are updating versions, we can't trust
+ # the data.
+ if (COMPONENT_REGISTRY.model_api < 12):
+ return True
+ current = build.getZKVersion()
+ expected = self.build_versions.get(build.uuid, 0)
+ return expected != current
+
+ def shouldRefreshJob(self, job):
+ if (COMPONENT_REGISTRY.model_api < 12):
+ return True
+ current = job.getZKVersion()
+ expected = self.job_versions.get(job.name, 0)
+ return expected != current
+
@property
def ref(self):
# NOTE(jamielennox): The concept of buildset ref is to be removed and a
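Editor's note: the job_versions/build_versions maps above let a scheduler skip refreshing child objects it already has an up-to-date copy of: every save of a build or job bumps the expected ZK version recorded on the buildset, and a refresh only re-reads an object whose cached version differs from that expectation. The comparison itself is tiny; a sketch detached from the Zuul classes, with a made-up uuid:

    def should_refresh(expected_versions, key, cached_version):
        # A missing entry defaults to 0, matching shouldRefreshBuild
        # and shouldRefreshJob above.
        return expected_versions.get(key, 0) != cached_version

    build_versions = {"b1d2c3": 4}   # uuid -> expected ZK version
    print(should_refresh(build_versions, "b1d2c3", 4))  # False: skip refresh
    print(should_refresh(build_versions, "b1d2c3", 3))  # True: re-read from ZK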
@@ -4428,8 +4582,18 @@ class BuildSet(zkobject.ZKObject):
with self.activeContext(self.item.pipeline.manager.current_context):
self.node_requests[job_name] = request_id
- def getJobNodeRequestID(self, job_name):
- return self.node_requests.get(job_name)
+ def getJobNodeRequestID(self, job_name, ignore_deduplicate=False):
+ r = self.node_requests.get(job_name)
+ if ignore_deduplicate and isinstance(r, dict):
+ return None
+ return r
+
+ def getNodeRequests(self):
+ # This ignores deduplicated node requests
+ for job_name, request in self.node_requests.items():
+ if isinstance(request, dict):
+ continue
+ yield job_name, request
def removeJobNodeRequestID(self, job_name):
if job_name in self.node_requests:
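Editor's note: node_requests can now hold either a node request id or a dict marking a request deduplicated from another job; getNodeRequests() and the ignore_deduplicate flag skip the dict entries so callers such as the node-request cleanup only ever see real ids. A sketch of the two shapes (the dict layout below is illustrative, not Zuul's exact record):

    node_requests = {
        "unit-tests": "300-0000000123",                     # real request id
        "docs-build": {"deduplicated_from": "unit-tests"},  # hypothetical marker
    }

    real_requests = [(job, req) for job, req in node_requests.items()
                     if not isinstance(req, dict)]
    print(real_requests)   # [('unit-tests', '300-0000000123')]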
@@ -4502,6 +4666,37 @@ class BuildSet(zkobject.ZKObject):
return Attributes(uuid=self.uuid)
+class EventInfo:
+
+ def __init__(self):
+ self.zuul_event_id = None
+ self.timestamp = time.time()
+ self.span_context = None
+
+ @classmethod
+ def fromEvent(cls, event):
+ tinfo = cls()
+ tinfo.zuul_event_id = event.zuul_event_id
+ tinfo.timestamp = event.timestamp
+ tinfo.span_context = event.span_context
+ return tinfo
+
+ @classmethod
+ def fromDict(cls, d):
+ tinfo = cls()
+ tinfo.zuul_event_id = d["zuul_event_id"]
+ tinfo.timestamp = d["timestamp"]
+ tinfo.span_context = d["span_context"]
+ return tinfo
+
+ def toDict(self):
+ return {
+ "zuul_event_id": self.zuul_event_id,
+ "timestamp": self.timestamp,
+ "span_context": self.span_context,
+ }
+
+
class QueueItem(zkobject.ZKObject):
"""Represents the position of a Change in a ChangeQueue.
@@ -4536,7 +4731,7 @@ class QueueItem(zkobject.ZKObject):
live=True, # Whether an item is intended to be processed at all
layout_uuid=None,
_cached_sql_results={},
- event=None, # The trigger event that lead to this queue item
+ event=None, # Info about the event that led to this queue item
# Additional container for connection specifig information to be
# used by reporters throughout the lifecycle
@@ -4558,6 +4753,9 @@ class QueueItem(zkobject.ZKObject):
def new(klass, context, **kw):
obj = klass()
obj._set(**kw)
+ if COMPONENT_REGISTRY.model_api >= 13:
+ obj._set(event=obj.event and EventInfo.fromEvent(obj.event))
+
data = obj._trySerialize(context)
obj._save(context, data, create=True)
files_state = (BuildSet.COMPLETE if obj.change.files is not None
@@ -4586,10 +4784,18 @@ class QueueItem(zkobject.ZKObject):
return (tenant, pipeline, uuid)
def serialize(self, context):
- if isinstance(self.event, TriggerEvent):
- event_type = "TriggerEvent"
+ if COMPONENT_REGISTRY.model_api < 13:
+ if isinstance(self.event, TriggerEvent):
+ event_type = "TriggerEvent"
+ else:
+ event_type = self.event.__class__.__name__
else:
- event_type = self.event.__class__.__name__
+ event_type = "EventInfo"
+ if not isinstance(self.event, EventInfo):
+ # Convert our local trigger event to an EventInfo
+ # object. This will only happen on the transition to
+ # model API version 13.
+ self._set(event=EventInfo.fromEvent(self.event))
data = {
"uuid": self.uuid,
@@ -4631,14 +4837,18 @@ class QueueItem(zkobject.ZKObject):
# child objects.
self._set(uuid=data["uuid"])
- event_type = data["event"]["type"]
- if event_type == "TriggerEvent":
- event_class = (
- self.pipeline.manager.sched.connections.getTriggerEventClass(
- data["event"]["data"]["driver_name"])
- )
+ if COMPONENT_REGISTRY.model_api < 13:
+ event_type = data["event"]["type"]
+ if event_type == "TriggerEvent":
+ event_class = (
+ self.pipeline.manager.sched.connections
+ .getTriggerEventClass(
+ data["event"]["data"]["driver_name"])
+ )
+ else:
+ event_class = EventTypeIndex.event_type_mapping.get(event_type)
else:
- event_class = EventTypeIndex.event_type_mapping.get(event_type)
+ event_class = EventInfo
if event_class is None:
raise NotImplementedError(
@@ -5860,8 +6070,7 @@ class Bundle:
def deserialize(cls, context, queue, items_by_path, data):
bundle = cls(data["uuid"])
bundle.items = [
- items_by_path.get(p) or QueueItem.fromZK(
- context, p, pipeline=queue.pipeline, queue=queue)
+ items_by_path.get(p) or QueueItem.fromZK(context, p, queue=queue)
for p in data["items"]
]
bundle.started_reporting = data["started_reporting"]
@@ -6968,6 +7177,7 @@ class TenantProjectConfig(object):
self.extra_config_dirs = ()
# Load config from a different branch if this is a config project
self.load_branch = None
+ self.merge_modes = None
def isAlwaysDynamicBranch(self, branch):
if self.always_dynamic_branches is None:
@@ -6978,24 +7188,15 @@ class TenantProjectConfig(object):
def includesBranch(self, branch):
if self.include_branches is not None:
- included = False
for r in self.include_branches:
if r.fullmatch(branch):
- included = True
- break
- else:
- included = True
- if not included:
+ return True
return False
- excluded = False
if self.exclude_branches is not None:
for r in self.exclude_branches:
if r.fullmatch(branch):
- excluded = True
- break
- if excluded:
- return False
+ return False
return True
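Editor's note: the rewritten includesBranch makes the precedence explicit: when include-branches is configured it alone decides the result, and exclude-branches is only consulted when no include list is given. A standalone sketch of the same logic:

    import re

    def includes_branch(branch, include_branches=None, exclude_branches=None):
        # Same precedence as TenantProjectConfig.includesBranch above.
        if include_branches is not None:
            return any(r.fullmatch(branch) for r in include_branches)
        if exclude_branches is not None:
            return not any(r.fullmatch(branch) for r in exclude_branches)
        return True

    include = [re.compile(r"stable/.*")]
    exclude = [re.compile(r"stable/.*")]
    print(includes_branch("stable/xena", include_branches=include,
                          exclude_branches=exclude))              # True: include wins
    print(includes_branch("master", include_branches=include))    # False
    print(includes_branch("stable/xena", exclude_branches=exclude))  # False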
diff --git a/zuul/model_api.py b/zuul/model_api.py
index 9542cf415..0244296dd 100644
--- a/zuul/model_api.py
+++ b/zuul/model_api.py
@@ -13,5 +13,5 @@
# under the License.
# When making ZK schema changes, increment this and add a record to
-# docs/developer/model-changelog.rst
-MODEL_API = 10
+# doc/source/developer/model-changelog.rst
+MODEL_API = 13
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index e30495d20..cd15a878c 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -23,6 +23,7 @@ import sys
import threading
import time
import traceback
+import urllib.parse
import uuid
from contextlib import suppress
from zuul.vendor.contextlib import nullcontext
@@ -53,6 +54,7 @@ from zuul.model import (
Abide,
Build,
BuildCompletedEvent,
+ BuildEvent,
BuildPausedEvent,
BuildStartedEvent,
BuildStatusEvent,
@@ -98,6 +100,8 @@ from zuul.zk.event_queues import (
PipelineManagementEventQueue,
PipelineResultEventQueue,
PipelineTriggerEventQueue,
+ PIPELINE_ROOT,
+ PIPELINE_NAME_ROOT,
TENANT_ROOT,
)
from zuul.zk.exceptions import LockException
@@ -689,8 +693,8 @@ class Scheduler(threading.Thread):
# In case we're in the middle of a reconfig,
# include the old queue items.
for item in pipeline.getAllItems(include_old=True):
- nrs = item.current_build_set.node_requests
- for req_id in (nrs.values()):
+ nrs = item.current_build_set.getNodeRequests()
+ for _, req_id in nrs:
outstanding_requests.add(req_id)
leaked_requests = zk_requests - outstanding_requests
for req_id in leaked_requests:
@@ -710,6 +714,7 @@ class Scheduler(threading.Thread):
self._runMergerApiCleanup()
self._runLayoutDataCleanup()
self._runBlobStoreCleanup()
+ self._runLeakedPipelineCleanup()
self.maintainConnectionCache()
except Exception:
self.log.exception("Error in general cleanup:")
@@ -752,6 +757,54 @@ class Scheduler(threading.Thread):
except Exception:
self.log.exception("Error in layout data cleanup:")
+ def _runLeakedPipelineCleanup(self):
+ for tenant in self.abide.tenants.values():
+ try:
+ with tenant_read_lock(self.zk_client, tenant.name,
+ blocking=False):
+ if not self.isTenantLayoutUpToDate(tenant.name):
+ self.log.debug(
+ "Skipping leaked pipeline cleanup for tenant %s",
+ tenant.name)
+ continue
+ valid_pipelines = tenant.layout.pipelines.values()
+ valid_state_paths = set(
+ p.state.getPath() for p in valid_pipelines)
+ valid_event_root_paths = set(
+ PIPELINE_NAME_ROOT.format(
+ tenant=p.tenant.name, pipeline=p.name)
+ for p in valid_pipelines)
+
+ safe_tenant = urllib.parse.quote_plus(tenant.name)
+ state_root = f"/zuul/tenant/{safe_tenant}/pipeline"
+ event_root = PIPELINE_ROOT.format(tenant=tenant.name)
+
+ all_state_paths = set(
+ f"{state_root}/{p}" for p in
+ self.zk_client.client.get_children(state_root))
+ all_event_root_paths = set(
+ f"{event_root}/{p}" for p in
+ self.zk_client.client.get_children(event_root))
+
+ leaked_state_paths = all_state_paths - valid_state_paths
+ leaked_event_root_paths = (
+ all_event_root_paths - valid_event_root_paths)
+
+ for leaked_path in (
+ leaked_state_paths | leaked_event_root_paths):
+ self.log.info("Removing leaked pipeline path %s",
+ leaked_path)
+ try:
+ self.zk_client.client.delete(leaked_path,
+ recursive=True)
+ except Exception:
+ self.log.exception(
+ "Error removing leaked pipeline path %s in "
+ "tenant %s", leaked_path, tenant.name)
+ except LockException:
+ # We'll cleanup this tenant on the next iteration
+ pass
+
def _runBlobStoreCleanup(self):
self.log.debug("Starting blob store cleanup")
try:
@@ -1291,6 +1344,21 @@ class Scheduler(threading.Thread):
self.log.info("Local layout update complete for %s (duration: %s "
"seconds)", tenant_name, duration)
+ def isTenantLayoutUpToDate(self, tenant_name):
+ remote_state = self.tenant_layout_state.get(tenant_name)
+ if remote_state is None:
+ # The tenant may still be in the
+ # process of initial configuration
+ self.wake_event.set()
+ return False
+ local_state = self.local_layout_state.get(tenant_name)
+ if local_state is None or remote_state > local_state:
+ self.log.debug("Local layout of tenant %s not up to date",
+ tenant_name)
+ self.layout_update_event.set()
+ return False
+ return True
+
def _checkTenantSourceConf(self, config):
tenant_config = None
script = False
@@ -1460,7 +1528,7 @@ class Scheduler(threading.Thread):
with self.createZKContext(lock, self.log) as ctx:
if tenant is not None:
self._reconfigureTenant(ctx, min_ltimes,
- -1,
+ event.zuul_event_ltime,
tenant, old_tenant)
else:
self._reconfigureDeleteTenant(ctx, old_tenant)
@@ -1480,7 +1548,6 @@ class Scheduler(threading.Thread):
# This is called in the scheduler loop after another thread submits
# a request
if self.unparsed_abide.ltime < self.system_config_cache.ltime:
- self.log.debug("Updating system config")
self.updateSystemConfig()
with self.layout_lock:
@@ -1632,7 +1699,7 @@ class Scheduler(threading.Thread):
item.removeBuild(build)
builds_to_cancel.append(build)
for request_job, request in \
- item.current_build_set.node_requests.items():
+ item.current_build_set.getNodeRequests():
new_job = item.getJob(request_job)
if not new_job:
requests_to_cancel.append(
@@ -1654,7 +1721,7 @@ class Scheduler(threading.Thread):
for build in item.current_build_set.getBuilds():
builds_to_cancel.append(build)
for request_job, request in \
- item.current_build_set.node_requests.items():
+ item.current_build_set.getNodeRequests():
requests_to_cancel.append(
(
item.current_build_set,
@@ -1690,12 +1757,25 @@ class Scheduler(threading.Thread):
new_pipeline = tenant.layout.pipelines.get(name)
if not new_pipeline:
with old_pipeline.manager.currentContext(context):
- self._reconfigureDeletePipeline(old_pipeline)
+ try:
+ self._reconfigureDeletePipeline(old_pipeline)
+ except Exception:
+ self.log.exception(
+ "Failed to cleanup deleted pipeline %s:",
+ old_pipeline)
+ self.management_events[tenant.name].initialize()
+ self.trigger_events[tenant.name].initialize()
self.connections.reconfigureDrivers(tenant)
# TODOv3(jeblair): remove postconfig calls?
for pipeline in tenant.layout.pipelines.values():
+ self.pipeline_management_events[tenant.name][
+ pipeline.name].initialize()
+ self.pipeline_trigger_events[tenant.name][
+ pipeline.name].initialize()
+ self.pipeline_result_events[tenant.name
+ ][pipeline.name].initialize()
for trigger in pipeline.triggers:
trigger.postConfig(pipeline)
for reporter in pipeline.actions:
@@ -1753,7 +1833,11 @@ class Scheduler(threading.Thread):
(tenant,))
for pipeline in tenant.layout.pipelines.values():
with pipeline.manager.currentContext(context):
- self._reconfigureDeletePipeline(pipeline)
+ try:
+ self._reconfigureDeletePipeline(pipeline)
+ except Exception:
+ self.log.exception(
+ "Failed to cleanup deleted pipeline %s:", pipeline)
# Delete the tenant root path for this tenant in ZooKeeper to remove
# all tenant specific event queues
@@ -1769,45 +1853,80 @@ class Scheduler(threading.Thread):
def _reconfigureDeletePipeline(self, pipeline):
self.log.info("Removing pipeline %s during reconfiguration" %
(pipeline,))
- for shared_queue in pipeline.queues:
- builds_to_cancel = []
- requests_to_cancel = []
- for item in shared_queue.queue:
- with item.activeContext(pipeline.manager.current_context):
- item.item_ahead = None
- item.items_behind = []
- self.log.info(
- "Removing item %s during reconfiguration" % (item,))
- for build in item.current_build_set.getBuilds():
- builds_to_cancel.append(build)
- for request_job, request in \
- item.current_build_set.node_requests.items():
- requests_to_cancel.append(
- (
- item.current_build_set,
- request,
- item.getJob(request_job),
- )
+
+ ctx = pipeline.manager.current_context
+ pipeline.state.refresh(ctx)
+
+ builds_to_cancel = []
+ requests_to_cancel = []
+ for item in pipeline.getAllItems():
+ with item.activeContext(pipeline.manager.current_context):
+ item.item_ahead = None
+ item.items_behind = []
+ self.log.info(
+ "Removing item %s during reconfiguration" % (item,))
+ for build in item.current_build_set.getBuilds():
+ builds_to_cancel.append(build)
+ for request_job, request in \
+ item.current_build_set.getNodeRequests():
+ requests_to_cancel.append(
+ (
+ item.current_build_set,
+ request,
+ item.getJob(request_job),
)
- try:
- self.sql.reportBuildsetEnd(
- item.current_build_set, 'dequeue',
- final=False, result='DEQUEUED')
- except Exception:
- self.log.exception(
- "Error reporting buildset completion to DB:")
+ )
+ try:
+ self.sql.reportBuildsetEnd(
+ item.current_build_set, 'dequeue',
+ final=False, result='DEQUEUED')
+ except Exception:
+ self.log.exception(
+ "Error reporting buildset completion to DB:")
- for build in builds_to_cancel:
- self.log.info(
- "Canceling build %s during reconfiguration" % (build,))
+ for build in builds_to_cancel:
+ self.log.info(
+ "Canceling build %s during reconfiguration", build)
+ try:
self.cancelJob(build.build_set, build.job,
build=build, force=True)
- for build_set, request, request_job in requests_to_cancel:
- self.log.info(
- "Canceling node request %s during reconfiguration",
- request)
+ except Exception:
+ self.log.exception(
+ "Error canceling build %s during reconfiguration", build)
+ for build_set, request, request_job in requests_to_cancel:
+ self.log.info(
+ "Canceling node request %s during reconfiguration", request)
+ try:
self.cancelJob(build_set, request_job, force=True)
- shared_queue.delete(pipeline.manager.current_context)
+ except Exception:
+ self.log.exception(
+ "Error canceling node request %s during reconfiguration",
+ request)
+
+ # Delete the pipeline event root path in ZooKeeper to remove
+ # all pipeline specific event queues.
+ try:
+ self.zk_client.client.delete(
+ PIPELINE_NAME_ROOT.format(
+ tenant=pipeline.tenant.name,
+ pipeline=pipeline.name),
+ recursive=True)
+ except Exception:
+ # In case a pipeline event has been submitted during
+ # reconfiguration this cleanup will fail.
+ self.log.exception(
+ "Error removing event queues for deleted pipeline %s in "
+ "tenant %s", pipeline.name, pipeline.tenant.name)
+
+ # Delete the pipeline root path in ZooKeeper to remove all pipeline
+ # state.
+ try:
+ self.zk_client.client.delete(pipeline.state.getPath(),
+ recursive=True)
+ except Exception:
+ self.log.exception(
+ "Error removing state for deleted pipeline %s in tenant %s",
+ pipeline.name, pipeline.tenant.name)
def _doPromoteEvent(self, event):
tenant = self.abide.tenants.get(event.tenant_name)
@@ -2006,84 +2125,74 @@ class Scheduler(threading.Thread):
return
self.log.debug("Run handler awake")
self.run_handler_lock.acquire()
- try:
- if not self._stopped:
- self.process_reconfigure_queue()
+ with self.statsd_timer("zuul.scheduler.run_handler"):
+ try:
+ self._run()
+ except Exception:
+ self.log.exception("Exception in run handler:")
+ # There may still be more events to process
+ self.wake_event.set()
+ finally:
+ self.run_handler_lock.release()
- if self.unparsed_abide.ltime < self.system_config_cache.ltime:
- self.updateSystemConfig()
+ def _run(self):
+ if not self._stopped:
+ self.process_reconfigure_queue()
- for tenant_name in self.unparsed_abide.tenants:
- if self._stopped:
- break
+ if self.unparsed_abide.ltime < self.system_config_cache.ltime:
+ self.updateSystemConfig()
- tenant = self.abide.tenants.get(tenant_name)
- if not tenant:
- continue
+ for tenant_name in self.unparsed_abide.tenants:
+ if self._stopped:
+ break
- # This will also forward events for the pipelines
- # (e.g. enqueue or dequeue events) to the matching
- # pipeline event queues that are processed afterwards.
- self.process_tenant_management_queue(tenant)
+ tenant = self.abide.tenants.get(tenant_name)
+ if not tenant:
+ continue
- if self._stopped:
- break
+ # This will also forward events for the pipelines
+ # (e.g. enqueue or dequeue events) to the matching
+ # pipeline event queues that are processed afterwards.
+ self.process_tenant_management_queue(tenant)
- try:
- with tenant_read_lock(
- self.zk_client, tenant_name, blocking=False
- ) as tlock:
- remote_state = self.tenant_layout_state.get(
- tenant_name)
- if remote_state is None:
- # The tenant may still be in the
- # process of initial configuration
- self.wake_event.set()
- continue
- local_state = self.local_layout_state.get(
- tenant_name)
- if (local_state is None or
- remote_state > local_state):
- self.log.debug(
- "Local layout of tenant %s not up to date",
- tenant.name)
- self.layout_update_event.set()
- continue
+ if self._stopped:
+ break
- # Get tenant again, as it might have been updated
- # by a tenant reconfig or layout change.
- tenant = self.abide.tenants[tenant_name]
+ try:
+ with tenant_read_lock(
+ self.zk_client, tenant_name, blocking=False
+ ) as tlock:
+ if not self.isTenantLayoutUpToDate(tenant_name):
+ continue
- if not self._stopped:
- # This will forward trigger events to pipeline
- # event queues that are processed below.
- self.process_tenant_trigger_queue(tenant)
+ # Get tenant again, as it might have been updated
+ # by a tenant reconfig or layout change.
+ tenant = self.abide.tenants[tenant_name]
- self.process_pipelines(tenant, tlock)
- except LockException:
- self.log.debug("Skipping locked tenant %s",
- tenant.name)
- remote_state = self.tenant_layout_state.get(
- tenant_name)
- local_state = self.local_layout_state.get(
- tenant_name)
- if (remote_state is None or
- local_state is None or
- remote_state > local_state):
- # Let's keep looping until we've updated to the
- # latest tenant layout.
- self.wake_event.set()
- except Exception:
- self.log.exception("Exception processing tenant %s:",
- tenant_name)
- # There may still be more events to process
- self.wake_event.set()
+ if not self._stopped:
+ # This will forward trigger events to pipeline
+ # event queues that are processed below.
+ self.process_tenant_trigger_queue(tenant)
+
+ self.process_pipelines(tenant, tlock)
+ except LockException:
+ self.log.debug("Skipping locked tenant %s",
+ tenant.name)
+ remote_state = self.tenant_layout_state.get(
+ tenant_name)
+ local_state = self.local_layout_state.get(
+ tenant_name)
+ if (remote_state is None or
+ local_state is None or
+ remote_state > local_state):
+ # Let's keep looping until we've updated to the
+ # latest tenant layout.
+ self.wake_event.set()
except Exception:
- self.log.exception("Exception in run handler:")
+ self.log.exception("Exception processing tenant %s:",
+ tenant_name)
# There may still be more events to process
self.wake_event.set()
- finally:
- self.run_handler_lock.release()
def primeSystemConfig(self):
with self.layout_lock:
@@ -2101,6 +2210,7 @@ class Scheduler(threading.Thread):
def updateSystemConfig(self):
with self.layout_lock:
+ self.log.debug("Updating system config")
self.unparsed_abide, self.globals = self.system_config_cache.get()
self.ansible_manager = AnsibleManager(
default_version=self.globals.default_ansible_version)
@@ -2135,7 +2245,12 @@ class Scheduler(threading.Thread):
self.zk_client, tenant.name, pipeline.name,
blocking=False) as lock,\
self.createZKContext(lock, self.log) as ctx:
+ self.log.debug("Processing pipeline %s in tenant %s",
+ pipeline.name, tenant.name)
with pipeline.manager.currentContext(ctx):
+ if ((tenant.name, pipeline.name) in
+ self._profile_pipelines):
+ ctx.profile = True
with self.statsd_timer(f'{stats_key}.handling'):
refreshed = self._process_pipeline(
tenant, pipeline)
@@ -2204,14 +2319,10 @@ class Scheduler(threading.Thread):
stats_key = f'zuul.tenant.{tenant.name}.pipeline.{pipeline.name}'
ctx = pipeline.manager.current_context
- if (tenant.name, pipeline.name) in self._profile_pipelines:
- ctx.profile = True
with self.statsd_timer(f'{stats_key}.refresh'):
pipeline.change_list.refresh(ctx)
pipeline.summary.refresh(ctx)
pipeline.state.refresh(ctx)
- if (tenant.name, pipeline.name) in self._profile_pipelines:
- ctx.profile = False
pipeline.state.setDirty(self.zk_client.client)
if pipeline.state.old_queues:
@@ -2230,6 +2341,7 @@ class Scheduler(threading.Thread):
pass
except Exception:
self.log.exception("Exception in pipeline processing:")
+ pipeline._exception_count += 1
pipeline.state.updateAttributes(
ctx, state=pipeline.STATE_ERROR)
# Continue processing other pipelines+tenants
@@ -2247,7 +2359,9 @@ class Scheduler(threading.Thread):
for pipeline in tenant.layout.pipelines.values():
self.log.debug("Gather relevant cache items for: %s %s",
tenant.name, pipeline.name)
- pipeline.change_list.refresh(ctx)
+ # This will raise an exception and abort the process if
+ # unable to refresh the change list.
+ pipeline.change_list.refresh(ctx, allow_init=False)
change_keys = pipeline.change_list.getChangeKeys()
relevant_changes = pipeline.manager.resolveChangeKeys(
change_keys)
@@ -2273,11 +2387,21 @@ class Scheduler(threading.Thread):
with trigger_queue_lock(
self.zk_client, tenant.name, blocking=False
):
+ self.log.debug("Processing tenant trigger events in %s",
+ tenant.name)
# Update the pipeline changes
ctx = self.createZKContext(None, self.log)
for pipeline in tenant.layout.pipelines.values():
+ # This will raise an exception if it is unable to
+ # refresh the change list. We will proceed anyway
+ # and use our data from the last time we
+ # refreshed in order to avoid stalling trigger
+ # processing. In this case we may not forward
+ # some events which are related to changes in the
+ # pipeline but don't match the pipeline trigger
+ # criteria.
try:
- pipeline.change_list.refresh(ctx)
+ pipeline.change_list.refresh(ctx, allow_init=False)
except Exception:
self.log.exception(
"Unable to refresh pipeline change list for %s",
@@ -2395,9 +2519,26 @@ class Scheduler(threading.Thread):
event.span_context = tracing.getSpanContext(span)
for pipeline in tenant.layout.pipelines.values():
+ # For most kinds of dependencies, it's sufficient to check
+ # if this change is already in the pipeline, because the
+ # only way to update a dependency cycle is to update one
+ # of the changes in it. However, dependencies-by-topic
+ # can have changes added to the cycle without updating any
+ # of the existing changes in the cycle. That means in
+ # order to detect whether a new change is added to an
+ # existing cycle in the pipeline, we need to know all of
+ # the dependencies of the new change, and check if *they*
+ # are in the pipeline. Therefore, go ahead and update our
+ # dependencies here so they are available for comparison
+ # against the pipeline contents. This front-loads some
+ # work that otherwise would happen in the pipeline
+ # manager, but the result of the work goes into the change
+ # cache, so it's not wasted; it's just less parallelized.
+ if isinstance(change, Change):
+ pipeline.manager.updateCommitDependencies(change, event)
if (
pipeline.manager.eventMatches(event, change)
- or pipeline.manager.isAnyVersionOfChangeInPipeline(change)
+ or pipeline.manager.isChangeRelevantToPipeline(change)
):
self.pipeline_trigger_events[tenant.name][
pipeline.name
@@ -2468,6 +2609,13 @@ class Scheduler(threading.Thread):
with management_queue_lock(
self.zk_client, tenant.name, blocking=False
):
+ if not self.isTenantLayoutUpToDate(tenant.name):
+ self.log.debug(
+ "Skipping management event queue for tenant %s",
+ tenant.name)
+ return
+ self.log.debug("Processing tenant management events in %s",
+ tenant.name)
self._process_tenant_management_queue(tenant)
except LockException:
self.log.debug("Skipping locked management event queue"
@@ -2698,6 +2846,10 @@ class Scheduler(threading.Thread):
# with child job skipping.
with build.activeContext(pipeline.manager.current_context):
build.paused = True
+ build.addEvent(
+ BuildEvent(
+ event_time=time.time(),
+ event_type=BuildEvent.TYPE_PAUSED))
build.setResultData(
event.data.get("data", {}),
event.data.get("secret_data", {}))
@@ -2825,7 +2977,8 @@ class Scheduler(threading.Thread):
# In case the build didn't show up on any executor, the node
# request does still exist, so we have to make sure it is
# removed from ZK.
- request_id = build.build_set.getJobNodeRequestID(build.job.name)
+ request_id = build.build_set.getJobNodeRequestID(
+ build.job.name, ignore_deduplicate=True)
if request_id:
self.nodepool.deleteNodeRequest(
request_id, event_id=build.zuul_event_id)
@@ -2926,9 +3079,10 @@ class Scheduler(threading.Thread):
# Cancel node request if needed
req_id = buildset.getJobNodeRequestID(job_name)
if req_id:
- req = self.nodepool.zk_nodepool.getNodeRequest(req_id)
- if req:
- self.nodepool.cancelRequest(req)
+ if not isinstance(req_id, dict):
+ req = self.nodepool.zk_nodepool.getNodeRequest(req_id)
+ if req:
+ self.nodepool.cancelRequest(req)
buildset.removeJobNodeRequestID(job_name)
# Cancel build if needed
diff --git a/zuul/source/__init__.py b/zuul/source/__init__.py
index faf97b48b..c2487af88 100644
--- a/zuul/source/__init__.py
+++ b/zuul/source/__init__.py
@@ -15,6 +15,8 @@
import abc
import time
+from zuul import model
+
class BaseSource(object, metaclass=abc.ABCMeta):
"""Base class for sources.
@@ -95,7 +97,7 @@ class BaseSource(object, metaclass=abc.ABCMeta):
# info on subsequent requests we can continue to do the
# requested job work.
try:
- dep = self.getChangeByURL(url, event)
+ return self.getChangeByURL(url, event)
except Exception:
# Note that if the change isn't found dep is None.
# We do not raise in that case and do not need to handle it
@@ -107,7 +109,6 @@ class BaseSource(object, metaclass=abc.ABCMeta):
time.sleep(1)
else:
raise
- return dep
@abc.abstractmethod
def getChangesDependingOn(self, change, projects, tenant):
@@ -183,6 +184,20 @@ class BaseSource(object, metaclass=abc.ABCMeta):
"""
+ def getProjectMergeModes(self, project, tenant, min_ltime=-1):
+ """Get supported merge modes for a project
+
+ This method is called very frequently, and should generally
+ return quickly. The connection is expected to cache merge
+ modes for all projects queried.
+
+ The default implementation indicates that all merge modes are
+ supported.
+
+ """
+
+ return model.ALL_MERGE_MODES
+
@abc.abstractmethod
def getProjectBranchCacheLtime(self):
"""Return the current ltime of the project branch cache."""
diff --git a/zuul/version.py b/zuul/version.py
index eafa6c2c7..80512842b 100644
--- a/zuul/version.py
+++ b/zuul/version.py
@@ -17,11 +17,10 @@
import json
-import pbr.version
+from importlib import metadata as importlib_metadata
import pkg_resources
-version_info = pbr.version.VersionInfo('zuul')
-release_string = version_info.release_string()
+release_string = importlib_metadata.distribution('zuul').version
is_release = None
git_version = None
diff --git a/zuul/web/__init__.py b/zuul/web/__init__.py
index 7f27cd970..47226fd7d 100755
--- a/zuul/web/__init__.py
+++ b/zuul/web/__init__.py
@@ -395,7 +395,7 @@ class LogStreamer(object):
self.finger_socket = socket.create_connection(
(server, port), timeout=10)
if use_ssl:
- context = ssl.SSLContext(ssl.PROTOCOL_TLS)
+ context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.verify_mode = ssl.CERT_REQUIRED
context.check_hostname = self.zuulweb.finger_tls_verify_hostnames
context.load_cert_chain(
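Editor's note: switching the finger streaming client from PROTOCOL_TLS to PROTOCOL_TLS_CLIENT tightens the defaults: with the client-side constant, certificate verification and hostname checking are already enabled, so the explicit assignments above keep their previous meaning while avoiding the generic constant that newer Python releases deprecate. A quick check of those defaults:

    import ssl

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    assert context.verify_mode == ssl.CERT_REQUIRED
    assert context.check_hostname is True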
diff --git a/zuul/zk/branch_cache.py b/zuul/zk/branch_cache.py
index 6fa531fad..0a8872158 100644
--- a/zuul/zk/branch_cache.py
+++ b/zuul/zk/branch_cache.py
@@ -13,11 +13,15 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+
+import collections
import logging
import json
from zuul.zk.zkobject import ZKContext, ShardedZKObject
from zuul.zk.locks import SessionAwareReadLock, SessionAwareWriteLock, locked
+from zuul.zk.components import COMPONENT_REGISTRY
+from zuul import model
from kazoo.exceptions import NoNodeError
@@ -63,15 +67,28 @@ class BranchCacheZKObject(ShardedZKObject):
def __init__(self):
super().__init__()
self._set(protected={},
- remainder={})
+ remainder={},
+ merge_modes={})
def serialize(self, context):
data = {
"protected": self.protected,
"remainder": self.remainder,
}
+ # This is mostly here to enable unit tests of upgrades; it's
+ # safe to move into the dict above at any time.
+ if (COMPONENT_REGISTRY.model_api >= 11):
+ data["merge_modes"] = self.merge_modes
return json.dumps(data, sort_keys=True).encode("utf8")
+ def deserialize(self, raw, context):
+ data = super().deserialize(raw, context)
+ # MODEL_API < 11
+ if "merge_modes" not in data:
+ data["merge_modes"] = collections.defaultdict(
+ lambda: model.ALL_MERGE_MODES)
+ return data
+
def _save(self, context, data, create=False):
super()._save(context, data, create)
zstat = context.client.exists(self.getPath())
@@ -119,10 +136,12 @@ class BranchCache:
if projects is None:
self.cache.protected.clear()
self.cache.remainder.clear()
+ self.cache.merge_modes.clear()
else:
for p in projects:
self.cache.protected.pop(p, None)
self.cache.remainder.pop(p, None)
+ self.cache.merge_modes.pop(p, None)
def getProjectBranches(self, project_name, exclude_unprotected,
min_ltime=-1, default=RAISE_EXCEPTION):
@@ -255,6 +274,68 @@ class BranchCache:
if branch not in remainder_branches:
remainder_branches.append(branch)
+ def getProjectMergeModes(self, project_name,
+ min_ltime=-1, default=RAISE_EXCEPTION):
+ """Get the merge modes for the given project.
+
+ Checking the branch cache we need to distinguish three different
+ cases:
+
+ 1. cache miss (not queried yet)
+ 2. cache hit (including empty list of merge modes)
+ 3. error when fetching merge modes
+
+ If the cache doesn't contain any merge modes for the project and no
+ default value is provided a LookupError is raised.
+
+ If there was an error fetching the merge modes, the return value
+ will be None.
+
+ Otherwise the list of merge modes will be returned.
+
+ :param str project_name:
+ The project for which the merge modes are returned.
+ :param int min_ltime:
+ The minimum cache ltime to consider the cache valid.
+ :param any default:
+ Optional default value to return if no cache entry exists.
+
+ :returns: The list of merge modes by model id, or None if there was
+ an error when fetching the merge modes.
+ """
+ if self.ltime < min_ltime:
+ with locked(self.rlock):
+ self.cache.refresh(self.zk_context)
+
+ merge_modes = None
+ try:
+ merge_modes = self.cache.merge_modes[project_name]
+ except KeyError:
+ if default is RAISE_EXCEPTION:
+ raise LookupError(
+ f"No merge modes for project {project_name}")
+ else:
+ return default
+
+ return merge_modes
+
+ def setProjectMergeModes(self, project_name, merge_modes):
+ """Set the supported merge modes for the given project.
+
+ Use None as a sentinel value for the merge modes to indicate
+ that there was a fetch error.
+
+ :param str project_name:
+ The project for the merge modes.
+ :param list[int] merge_modes:
+ The list of merge modes (by model ID) or None.
+
+ """
+
+ with locked(self.wlock):
+ with self.cache.activeContext(self.zk_context):
+ self.cache.merge_modes[project_name] = merge_modes
+
@property
def ltime(self):
return self.cache._zstat.last_modified_transaction_id
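Editor's note: the getProjectMergeModes docstring above distinguishes a cache miss (LookupError, or the supplied default), a cache hit, and a recorded fetch error (a stored None). A hypothetical caller-side helper showing how the three cases might be told apart:

    def cached_merge_modes(branch_cache, project_name):
        # Returns a list of merge modes, or None when the connection
        # must be asked (cache miss) or a fetch error was recorded.
        try:
            modes = branch_cache.getProjectMergeModes(project_name)
        except LookupError:
            return None   # never queried: fall back to the connection
        return modes      # may be None if a previous fetch failed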
diff --git a/zuul/zk/config_cache.py b/zuul/zk/config_cache.py
index 45e355c83..4fcfe94c8 100644
--- a/zuul/zk/config_cache.py
+++ b/zuul/zk/config_cache.py
@@ -20,10 +20,10 @@ from collections.abc import MutableMapping
from urllib.parse import quote_plus, unquote_plus
from kazoo.exceptions import NoNodeError
+from kazoo.recipe import lock
from zuul import model
from zuul.zk import sharding, ZooKeeperSimpleBase
-from zuul.zk.vendor import lock
CONFIG_ROOT = "/zuul/config"
diff --git a/zuul/zk/event_queues.py b/zuul/zk/event_queues.py
index ebb33ec88..ce38cf23e 100644
--- a/zuul/zk/event_queues.py
+++ b/zuul/zk/event_queues.py
@@ -144,6 +144,17 @@ class EventWatcher(ZooKeeperSimpleBase):
self.watched_tenants.add(tenant_name)
def _pipelineWatch(self, tenant_name, pipelines):
+ # Remove pipelines that no longer exist from the watch list so
+ # we re-register the children watch in case the pipeline is
+ # added again.
+ for watched_tenant, pipeline_name in list(self.watched_pipelines):
+ if watched_tenant != tenant_name:
+ continue
+ if pipeline_name in pipelines:
+ continue
+ with suppress(KeyError):
+ self.watched_pipelines.remove((tenant_name, pipeline_name))
+
for pipeline_name in pipelines:
key = (tenant_name, pipeline_name)
if key in self.watched_pipelines:
@@ -205,6 +216,9 @@ class ZooKeeperEventQueue(ZooKeeperSimpleBase, Iterable):
self.queue_root = queue_root
self.event_root = f'{queue_root}/queue'
self.data_root = f'{queue_root}/data'
+ self.initialize()
+
+ def initialize(self):
self.kazoo_client.ensure_path(self.event_root)
self.kazoo_client.ensure_path(self.data_root)
diff --git a/zuul/zk/locks.py b/zuul/zk/locks.py
index ade25dd75..ff098c5c5 100644
--- a/zuul/zk/locks.py
+++ b/zuul/zk/locks.py
@@ -17,9 +17,9 @@ from contextlib import contextmanager
from urllib.parse import quote_plus
from kazoo.protocol.states import KazooState
+from kazoo.recipe.lock import Lock, ReadLock, WriteLock
from zuul.zk.exceptions import LockException
-from zuul.zk.vendor.lock import Lock, ReadLock, WriteLock
LOCK_ROOT = "/zuul/locks"
TENANT_LOCK_ROOT = f"{LOCK_ROOT}/tenant"
diff --git a/zuul/zk/vendor/lock.py b/zuul/zk/vendor/lock.py
deleted file mode 100644
index fb387f70b..000000000
--- a/zuul/zk/vendor/lock.py
+++ /dev/null
@@ -1,759 +0,0 @@
-# This file is from the Kazoo project and contains fixes proposed in
-# https://github.com/python-zk/kazoo/pull/650
-#
-# https://github.com/python-zk/kazoo/blob/master/kazoo/recipe/lock.py
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""Zookeeper Locking Implementations
-
-:Maintainer: Ben Bangert <ben@groovie.org>
-:Status: Production
-
-Error Handling
-==============
-
-It's highly recommended to add a state listener with
-:meth:`~KazooClient.add_listener` and watch for
-:attr:`~KazooState.LOST` and :attr:`~KazooState.SUSPENDED` state
-changes and re-act appropriately. In the event that a
-:attr:`~KazooState.LOST` state occurs, its certain that the lock
-and/or the lease has been lost.
-
-"""
-import re
-import sys
-
-try:
- from time import monotonic as now
-except ImportError:
- from time import time as now
-import uuid
-
-import six
-
-from kazoo.exceptions import (
- CancelledError,
- KazooException,
- LockTimeout,
- NoNodeError,
-)
-from kazoo.protocol.states import KazooState
-from kazoo.retry import (
- ForceRetryError,
- KazooRetry,
- RetryFailedError,
-)
-
-
-class _Watch(object):
- def __init__(self, duration=None):
- self.duration = duration
- self.started_at = None
-
- def start(self):
- self.started_at = now()
-
- def leftover(self):
- if self.duration is None:
- return None
- else:
- elapsed = now() - self.started_at
- return max(0, self.duration - elapsed)
-
-
-class Lock(object):
- """Kazoo Lock
-
- Example usage with a :class:`~kazoo.client.KazooClient` instance:
-
- .. code-block:: python
-
- zk = KazooClient()
- zk.start()
- lock = zk.Lock("/lockpath", "my-identifier")
- with lock: # blocks waiting for lock acquisition
- # do something with the lock
-
- Note: This lock is not *re-entrant*. Repeated calls after already
- acquired will block.
-
- This is an exclusive lock. For a read/write lock, see :class:`WriteLock`
- and :class:`ReadLock`.
-
- """
-
- # Node name, after the contender UUID, before the sequence
- # number. Involved in read/write locks.
- _NODE_NAME = "__lock__"
-
- # Node names which exclude this contender when present at a lower
- # sequence number. Involved in read/write locks.
- _EXCLUDE_NAMES = ["__lock__"]
-
- def __init__(self, client, path, identifier=None, extra_lock_patterns=()):
- """Create a Kazoo lock.
-
- :param client: A :class:`~kazoo.client.KazooClient` instance.
- :param path: The lock path to use.
- :param identifier: Name to use for this lock contender. This can be
- useful for querying to see who the current lock
- contenders are.
- :param extra_lock_patterns: Strings that will be used to
- identify other znode in the path
- that should be considered contenders
- for this lock.
- Use this for cross-implementation
- compatibility.
-
- .. versionadded:: 2.7.1
- The extra_lock_patterns option.
- """
- self.client = client
- self.path = path
- self._exclude_names = set(
- self._EXCLUDE_NAMES + list(extra_lock_patterns)
- )
- self._contenders_re = re.compile(
- r"(?:{patterns})(-?\d{{10}})$".format(
- patterns="|".join(self._exclude_names)
- )
- )
-
- # some data is written to the node. this can be queried via
- # contenders() to see who is contending for the lock
- self.data = str(identifier or "").encode("utf-8")
- self.node = None
-
- self.wake_event = client.handler.event_object()
-
- # props to Netflix Curator for this trick. It is possible for our
- # create request to succeed on the server, but for a failure to
- # prevent us from getting back the full path name. We prefix our
- # lock name with a uuid and can check for its presence on retry.
- self.prefix = uuid.uuid4().hex + self._NODE_NAME
- self.create_path = self.path + "/" + self.prefix
-
- self.create_tried = False
- self.is_acquired = False
- self.assured_path = False
- self.cancelled = False
- self._retry = KazooRetry(
- max_tries=None, sleep_func=client.handler.sleep_func
- )
- self._lock = client.handler.lock_object()
-
- def _ensure_path(self):
- self.client.ensure_path(self.path)
- self.assured_path = True
-
- def cancel(self):
- """Cancel a pending lock acquire."""
- self.cancelled = True
- self.wake_event.set()
-
- def acquire(self, blocking=True, timeout=None, ephemeral=True):
- """
- Acquire the lock. By defaults blocks and waits forever.
-
- :param blocking: Block until lock is obtained or return immediately.
- :type blocking: bool
- :param timeout: Don't wait forever to acquire the lock.
- :type timeout: float or None
- :param ephemeral: Don't use ephemeral znode for the lock.
- :type ephemeral: bool
-
- :returns: Was the lock acquired?
- :rtype: bool
-
- :raises: :exc:`~kazoo.exceptions.LockTimeout` if the lock
- wasn't acquired within `timeout` seconds.
-
- .. warning::
-
- When :attr:`ephemeral` is set to False session expiration
- will not release the lock and must be handled separately.
-
- .. versionadded:: 1.1
- The timeout option.
-
- .. versionadded:: 2.4.1
- The ephemeral option.
- """
-
- def _acquire_lock():
- got_it = self._lock.acquire(False)
- if not got_it:
- raise ForceRetryError()
- return True
-
- retry = self._retry.copy()
- retry.deadline = timeout
-
- # Ensure we are locked so that we avoid multiple threads in
- # this acquistion routine at the same time...
- locked = self._lock.acquire(False)
- if not locked and not blocking:
- return False
- if not locked:
- # Lock acquire doesn't take a timeout, so simulate it...
- # XXX: This is not true in Py3 >= 3.2
- try:
- locked = retry(_acquire_lock)
- except RetryFailedError:
- return False
- already_acquired = self.is_acquired
- try:
- gotten = False
- try:
- gotten = retry(
- self._inner_acquire,
- blocking=blocking,
- timeout=timeout,
- ephemeral=ephemeral,
- )
- except RetryFailedError:
- pass
- except KazooException:
- # if we did ultimately fail, attempt to clean up
- exc_info = sys.exc_info()
- if not already_acquired:
- self._best_effort_cleanup()
- self.cancelled = False
- six.reraise(exc_info[0], exc_info[1], exc_info[2])
- if gotten:
- self.is_acquired = gotten
- if not gotten and not already_acquired:
- self._best_effort_cleanup()
- return gotten
- finally:
- self._lock.release()
-
- def _watch_session(self, state):
- self.wake_event.set()
- return True
-
- def _inner_acquire(self, blocking, timeout, ephemeral=True):
-
- # wait until it's our chance to get it..
- if self.is_acquired:
- if not blocking:
- return False
- raise ForceRetryError()
-
- # make sure our election parent node exists
- if not self.assured_path:
- self._ensure_path()
-
- node = None
- if self.create_tried:
- node = self._find_node()
- else:
- self.create_tried = True
-
- if not node:
- node = self.client.create(
- self.create_path, self.data, ephemeral=ephemeral, sequence=True
- )
- # strip off path to node
- node = node[len(self.path) + 1:]
-
- self.node = node
-
- while True:
- self.wake_event.clear()
-
- # bail out with an exception if cancellation has been requested
- if self.cancelled:
- raise CancelledError()
-
- predecessor = self._get_predecessor(node)
- if predecessor is None:
- return True
-
- if not blocking:
- return False
-
- # otherwise we are in the mix. watch predecessor and bide our time
- predecessor = self.path + "/" + predecessor
- self.client.add_listener(self._watch_session)
- try:
- self.client.get(predecessor, self._watch_predecessor)
- except NoNodeError:
- pass # predecessor has already been deleted
- else:
- self.wake_event.wait(timeout)
- if not self.wake_event.isSet():
- raise LockTimeout(
- "Failed to acquire lock on %s after %s seconds"
- % (self.path, timeout)
- )
- finally:
- self.client.remove_listener(self._watch_session)
-
- def _watch_predecessor(self, event):
- self.wake_event.set()
-
- def _get_predecessor(self, node):
- """returns `node`'s predecessor or None
-
- Note: This handles the case where the current lock is not a contender
- (e.g. rlock), as well as edge cases where the lock's ephemeral node
- is gone.
- """
- node_sequence = node[len(self.prefix):]
- children = self.client.get_children(self.path)
- found_self = False
- # Filter out the contenders using the computed regex
- contender_matches = []
- for child in children:
- match = self._contenders_re.search(child)
- if match is not None:
- contender_sequence = match.group(1)
- # Only consider contenders with a smaller sequence number.
- # A contender with a smaller sequence number has a higher
- # priority.
- if contender_sequence < node_sequence:
- contender_matches.append(match)
- if child == node:
- # Remember the node's match object so we can short circuit
- # below.
- found_self = match
-
- if found_self is False: # pragma: nocover
- # somehow we aren't in the children -- probably we are
- # recovering from a session failure and our ephemeral
- # node was removed.
- raise ForceRetryError()
-
- if not contender_matches:
- return None
-
- # Sort the contenders using the sequence number extracted by the regex
- # and return the original string of the predecessor.
- sorted_matches = sorted(contender_matches, key=lambda m: m.groups())
- return sorted_matches[-1].string
-
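As an illustration of the contender filtering above (not part of this change), here is how a regex of the same shape as self._contenders_re orders contenders by their 10-digit sequence suffix. The znode names are made up, and WriteLock's exclude patterns are used:

import re

# Same shape as self._contenders_re, using WriteLock's exclude patterns.
contenders_re = re.compile(r"(?:__lock__|__rlock__)(-?\d{10})$")

children = [
    "3fa1__lock__0000000007",   # writer, sequence 7
    "9bc2__rlock__0000000003",  # reader, sequence 3
    "not-a-lock-node",          # ignored: no pattern/sequence suffix
]

matches = [m for m in (contenders_re.search(c) for c in children) if m]
# Sorting on the captured sequence number orders contenders by priority;
# _get_predecessor() keeps only smaller sequences and takes the largest
# of those as the predecessor to watch.
ordered = sorted(matches, key=lambda m: m.groups())
print([m.string for m in ordered])
# ['9bc2__rlock__0000000003', '3fa1__lock__0000000007']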
- def _find_node(self):
- children = self.client.get_children(self.path)
- for child in children:
- if child.startswith(self.prefix):
- return child
- return None
-
- def _delete_node(self, node):
- self.client.delete(self.path + "/" + node)
-
- def _best_effort_cleanup(self):
- try:
- node = self.node or self._find_node()
- if node:
- self._delete_node(node)
- except KazooException: # pragma: nocover
- pass
-
- def release(self):
- """Release the lock immediately."""
- return self.client.retry(self._inner_release)
-
- def _inner_release(self):
- if not self.is_acquired:
- return False
-
- try:
- self._delete_node(self.node)
- except NoNodeError: # pragma: nocover
- pass
-
- self.is_acquired = False
- self.node = None
- return True
-
- def contenders(self):
- """Return an ordered list of the current contenders for the
- lock.
-
- .. note::
-
- If a contender did not set an identifier, it will appear
- as a blank string.
-
- """
- # make sure our election parent node exists
- if not self.assured_path:
- self._ensure_path()
-
- children = self.client.get_children(self.path)
- # We want all contenders, including self (this is especially important
- # for r/w locks). This is similar to the logic of `_get_predecessor`
- # except we include our own pattern.
- all_contenders_re = re.compile(
- r"(?:{patterns})(-?\d{{10}})$".format(
- patterns="|".join(self._exclude_names | {self._NODE_NAME})
- )
- )
- # Filter out the contenders using the computed regex
- contender_matches = []
- for child in children:
- match = all_contenders_re.search(child)
- if match is not None:
- contender_matches.append(match)
- # Sort the contenders using the sequence number extracted by the regex,
- # then extract the original string.
- contender_nodes = [
- match.string
- for match in sorted(contender_matches, key=lambda m: m.groups())
- ]
- # Retrieve all the contender nodes data (preserving order).
- contenders = []
- for node in contender_nodes:
- try:
- data, stat = self.client.get(self.path + "/" + node)
- if data is not None:
- contenders.append(data.decode("utf-8"))
- except NoNodeError: # pragma: nocover
- pass
-
- return contenders
-
- def __enter__(self):
- self.acquire()
-
- def __exit__(self, exc_type, exc_value, traceback):
- self.release()
-
-
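A minimal usage sketch of the acquire()/contenders()/release() API documented above (not part of this change), assuming a ZooKeeper server reachable at 127.0.0.1:2181 and the equivalent upstream kazoo Lock recipe:

from kazoo.client import KazooClient
from kazoo.exceptions import LockTimeout

zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()

lock = zk.Lock("/locks/example", "worker-1")

# Non-blocking attempt: returns True or False immediately.
if lock.acquire(blocking=False):
    try:
        print(lock.contenders())  # e.g. ['worker-1']
    finally:
        lock.release()

# Blocking attempt with a timeout raises LockTimeout on failure.
try:
    if lock.acquire(timeout=5.0):
        lock.release()
except LockTimeout:
    print("could not acquire within 5 seconds")

zk.stop()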
-class WriteLock(Lock):
- """Kazoo Write Lock
-
- Example usage with a :class:`~kazoo.client.KazooClient` instance:
-
- .. code-block:: python
-
- zk = KazooClient()
- zk.start()
- lock = zk.WriteLock("/lockpath", "my-identifier")
- with lock: # blocks waiting for lock acquisition
- # do something with the lock
-
- The lock path passed to WriteLock and ReadLock must match for them to
- communicate. The write lock can not be acquired if it is held by
- any readers or writers.
-
- Note: This lock is not *re-entrant*. Repeated calls after the lock is
- already acquired will block.
-
- This is the write-side of a shared lock. See :class:`Lock` for a
- standard exclusive lock and :class:`ReadLock` for the read-side of a
- shared lock.
-
- """
-
- _NODE_NAME = "__lock__"
- _EXCLUDE_NAMES = ["__lock__", "__rlock__"]
-
-
-class ReadLock(Lock):
- """Kazoo Read Lock
-
- Example usage with a :class:`~kazoo.client.KazooClient` instance:
-
- .. code-block:: python
-
- zk = KazooClient()
- zk.start()
- lock = zk.ReadLock("/lockpath", "my-identifier")
- with lock: # blocks waiting for outstanding writers
- # do something with the lock
-
- The lock path passed to WriteLock and ReadLock must match for them to
- communicate. The read lock blocks if it is held by any writers,
- but multiple readers may hold the lock.
-
- Note: This lock is not *re-entrant*. Repeated calls after the lock is
- already acquired will block.
-
- This is the read-side of a shared lock. See :class:`Lock` for a
- standard exclusive lock and :class:`WriteLock` for the write-side of a
- shared lock.
-
- """
-
- _NODE_NAME = "__rlock__"
- _EXCLUDE_NAMES = ["__lock__"]
-
-
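A sketch of the shared-lock semantics described in the two docstrings above (not part of this change), again assuming a local ZooKeeper: multiple readers may share the path while a writer excludes everyone.

from kazoo.client import KazooClient

zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()

reader1 = zk.ReadLock("/locks/config", "reader-1")
reader2 = zk.ReadLock("/locks/config", "reader-2")
writer = zk.WriteLock("/locks/config", "writer-1")

assert reader1.acquire(blocking=False)      # first reader succeeds
assert reader2.acquire(blocking=False)      # readers may share the lock
assert not writer.acquire(blocking=False)   # writer is blocked by readers

reader1.release()
reader2.release()
assert writer.acquire(blocking=False)       # writer proceeds once readers go
writer.release()
zk.stop()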
-class Semaphore(object):
- """A Zookeeper-based Semaphore
-
- This synchronization primitive operates in the same manner as the
- Python threading version, except that it uses the concept of leases
- to indicate how many leases are available for the lock, rather than
- counting.
-
- Note: This lock is not meant to be *re-entrant*.
-
- Example:
-
- .. code-block:: python
-
- zk = KazooClient()
- semaphore = zk.Semaphore("/leasepath", "my-identifier")
- with semaphore: # blocks waiting for lock acquisition
- # do something with the semaphore
-
- .. warning::
-
- This class stores the allowed max_leases as the data on the
- top-level semaphore node. The stored value is checked once
- against the max_leases of each instance. This check is
- performed when acquire is called the first time. The semaphore
- node needs to be deleted to change the allowed leases.
-
- .. versionadded:: 0.6
- The Semaphore class.
-
- .. versionadded:: 1.1
- The max_leases check.
-
- """
-
- def __init__(self, client, path, identifier=None, max_leases=1):
- """Create a Kazoo Lock
-
- :param client: A :class:`~kazoo.client.KazooClient` instance.
- :param path: The semaphore path to use.
- :param identifier: Name to use for this lock contender. This
- can be useful for querying to see who the
- current lock contenders are.
- :param max_leases: The maximum number of leases available for
- the semaphore.
-
- """
- # Implementation notes about how excessive thundering herd
- # and watches are avoided
- # - A node (lease pool) holds children for each lease in use
- # - A lock is acquired for a process attempting to acquire a
- # lease. If a lease is available, the ephemeral node is
- # created in the lease pool and the lock is released.
- # - Only the lock holder watches for children changes in the
- # lease pool
- self.client = client
- self.path = path
-
- # some data is written to the node. this can be queried via
- # contenders() to see who is contending for the lock
- self.data = str(identifier or "").encode("utf-8")
- self.max_leases = max_leases
- self.wake_event = client.handler.event_object()
-
- self.create_path = self.path + "/" + uuid.uuid4().hex
- self.lock_path = path + "-" + "__lock__"
- self.is_acquired = False
- self.assured_path = False
- self.cancelled = False
- self._session_expired = False
-
- def _ensure_path(self):
- result = self.client.ensure_path(self.path)
- self.assured_path = True
- if result is True:
- # the node already existed
- data, _ = self.client.get(self.path)
- try:
- leases = int(data.decode("utf-8"))
- except (ValueError, TypeError):
- # ignore non-numeric data, maybe the node data is used
- # for other purposes
- pass
- else:
- if leases != self.max_leases:
- raise ValueError(
- "Inconsistent max leases: %s, expected: %s"
- % (leases, self.max_leases)
- )
- else:
- self.client.set(self.path, str(self.max_leases).encode("utf-8"))
-
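The stored max_leases check performed in _ensure_path() can be seen with a short sketch (not part of this change), assuming a local ZooKeeper and the equivalent upstream kazoo Semaphore: two instances that disagree on max_leases cannot share the same semaphore node.

from kazoo.client import KazooClient

zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()

s1 = zk.Semaphore("/leases/build", "worker-1", max_leases=2)
s1.acquire()  # first use stores "2" as the data of /leases/build

# A second instance with a different max_leases fails the consistency
# check against the stored value and raises ValueError.
s2 = zk.Semaphore("/leases/build", "worker-2", max_leases=5)
try:
    s2.acquire(blocking=False)
except ValueError as exc:
    print("inconsistent max_leases:", exc)

s1.release()
zk.stop()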
- def cancel(self):
- """Cancel a pending semaphore acquire."""
- self.cancelled = True
- self.wake_event.set()
-
- def acquire(self, blocking=True, timeout=None):
- """Acquire the semaphore. By defaults blocks and waits forever.
-
- :param blocking: Block until semaphore is obtained or
- return immediately.
- :type blocking: bool
- :param timeout: Don't wait forever to acquire the semaphore.
- :type timeout: float or None
-
- :returns: Was the semaphore acquired?
- :rtype: bool
-
- :raises:
- ValueError if the max_leases value doesn't match the
- stored value.
-
- :exc:`~kazoo.exceptions.LockTimeout` if the semaphore
- wasn't acquired within `timeout` seconds.
-
- .. versionadded:: 1.1
- The blocking, timeout arguments and the max_leases check.
- """
- # If the semaphore had previously been canceled, make sure to
- # reset that state.
- self.cancelled = False
-
- try:
- self.is_acquired = self.client.retry(
- self._inner_acquire, blocking=blocking, timeout=timeout
- )
- except KazooException:
- # if we did ultimately fail, attempt to clean up
- self._best_effort_cleanup()
- self.cancelled = False
- raise
-
- return self.is_acquired
-
- def _inner_acquire(self, blocking, timeout=None):
- """Inner loop that runs from the top anytime a command hits a
- retryable Zookeeper exception."""
- self._session_expired = False
- self.client.add_listener(self._watch_session)
-
- if not self.assured_path:
- self._ensure_path()
-
- # Do we already have a lease?
- if self.client.exists(self.create_path):
- return True
-
- w = _Watch(duration=timeout)
- w.start()
- lock = self.client.Lock(self.lock_path, self.data)
- try:
- gotten = lock.acquire(blocking=blocking, timeout=w.leftover())
- if not gotten:
- return False
- while True:
- self.wake_event.clear()
-
- # Attempt to grab our lease...
- if self._get_lease():
- return True
-
- if blocking:
- # If blocking, wait until self._watch_lease_change() is
- # called before returning
- self.wake_event.wait(w.leftover())
- if not self.wake_event.isSet():
- raise LockTimeout(
- "Failed to acquire semaphore on %s"
- " after %s seconds" % (self.path, timeout)
- )
- else:
- return False
- finally:
- lock.release()
-
- def _watch_lease_change(self, event):
- self.wake_event.set()
-
- def _get_lease(self, data=None):
- # Make sure the session is still valid
- if self._session_expired:
- raise ForceRetryError("Retry on session loss at top")
-
- # Make sure that the request hasn't been canceled
- if self.cancelled:
- raise CancelledError("Semaphore cancelled")
-
- # Get a list of the current potential lock holders. If they change,
- # notify our wake_event object. This is used to unblock a blocking
- # self._inner_acquire call.
- children = self.client.get_children(
- self.path, self._watch_lease_change
- )
-
- # If there are leases available, acquire one
- if len(children) < self.max_leases:
- self.client.create(self.create_path, self.data, ephemeral=True)
-
- # Check if our acquisition was successful or not. Update our state.
- if self.client.exists(self.create_path):
- self.is_acquired = True
- else:
- self.is_acquired = False
-
- # Return current state
- return self.is_acquired
-
- def _watch_session(self, state):
- if state == KazooState.LOST:
- self._session_expired = True
- self.wake_event.set()
-
- # Return true to de-register
- return True
-
- def _best_effort_cleanup(self):
- try:
- self.client.delete(self.create_path)
- except KazooException: # pragma: nocover
- pass
-
- def release(self):
- """Release the lease immediately."""
- return self.client.retry(self._inner_release)
-
- def _inner_release(self):
- if not self.is_acquired:
- return False
- try:
- self.client.delete(self.create_path)
- except NoNodeError: # pragma: nocover
- pass
- self.is_acquired = False
- return True
-
- def lease_holders(self):
- """Return an unordered list of the current lease holders.
-
- .. note::
-
- If the lease holder did not set an identifier, it will
- appear as a blank string.
-
- """
- if not self.client.exists(self.path):
- return []
-
- children = self.client.get_children(self.path)
-
- lease_holders = []
- for child in children:
- try:
- data, stat = self.client.get(self.path + "/" + child)
- lease_holders.append(data.decode("utf-8"))
- except NoNodeError: # pragma: nocover
- pass
- return lease_holders
-
- def __enter__(self):
- self.acquire()
-
- def __exit__(self, exc_type, exc_value, traceback):
- self.release()
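And a sketch of the lease accounting itself (not part of this change), again assuming a local ZooKeeper: with max_leases=2 a third holder is turned away until a lease is released.

from kazoo.client import KazooClient

zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()

a = zk.Semaphore("/leases/upload", "job-a", max_leases=2)
b = zk.Semaphore("/leases/upload", "job-b", max_leases=2)
c = zk.Semaphore("/leases/upload", "job-c", max_leases=2)

assert a.acquire(blocking=False)
assert b.acquire(blocking=False)
assert not c.acquire(blocking=False)     # only two leases exist

print(sorted(a.lease_holders()))         # ['job-a', 'job-b']

a.release()
assert c.acquire(blocking=False)         # the freed lease is available again

b.release()
c.release()
zk.stop()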
diff --git a/zuul/zk/zkobject.py b/zuul/zk/zkobject.py
index 73adf5954..87d76bca6 100644
--- a/zuul/zk/zkobject.py
+++ b/zuul/zk/zkobject.py
@@ -233,7 +233,18 @@ class ZKObject:
obj._load(context, path=path)
return obj
+ def internalCreate(self, context):
+ """Create the object in ZK from an existing ZKObject
+
+ This should only be used in special circumstances: when we
+ know it's safe to start using a ZKObject before it's actually
+ created in ZK. Normally use .new()
+ """
+ data = self._trySerialize(context)
+ self._save(context, data, create=True)
+
def refresh(self, context):
"""Update data from ZK"""
self._load(context)
@@ -308,6 +319,17 @@ class ZKObject:
return (compressed_size, uncompressed_size)
+ def getZKVersion(self):
+ """Return the ZK version of the object as of the last load/refresh.
+
+ Returns None if the object is newly created.
+ """
+ zstat = getattr(self, '_zstat', None)
+ # If zstat is None, we created the object
+ if zstat is None:
+ return None
+ return zstat.version
+
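A small self-contained toy (not zuul code; the class and names are hypothetical) illustrating the getZKVersion() contract added above: it returns None until a load or refresh has populated _zstat, after which it mirrors the recorded znode version.

from types import SimpleNamespace

class FakeZKObject:
    # Mimics only the getZKVersion() behaviour shown in the hunk above.
    def __init__(self):
        self._zstat = None  # nothing loaded from ZK yet

    def getZKVersion(self):
        zstat = getattr(self, '_zstat', None)
        if zstat is None:
            return None     # newly created object, never loaded
        return zstat.version

obj = FakeZKObject()
assert obj.getZKVersion() is None          # freshly created
obj._zstat = SimpleNamespace(version=3)    # as if a load stored the znode stat
assert obj.getZKVersion() == 3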
# Private methods below
def _retry(self, context, func, *args, max_tries=-1, **kw):